Line data Source code
1 : //! A DeltaLayer represents a collection of WAL records or page images in a range of
2 : //! LSNs, and in a range of Keys. It is stored on a file on disk.
3 : //!
4 : //! Usually a delta layer only contains differences, in the form of WAL records
5 : //! against a base LSN. However, if a relation is extended or a whole new relation
6 : //! is created, there would be no base for the new pages. The entries for them
7 : //! must be page images or WAL records with the 'will_init' flag set, so that
8 : //! they can be replayed without referring to an older page version.
9 : //!
10 : //! The delta files are stored in `timelines/<timeline_id>` directory. Currently,
11 : //! there are no subdirectories, and each delta file is named like this:
12 : //!
13 : //! ```text
14 : //! <key start>-<key end>__<start LSN>-<end LSN>
15 : //! ```
16 : //!
17 : //! For example:
18 : //!
19 : //! ```text
20 : //! 000000067F000032BE0000400000000020B6-000000067F000032BE0000400000000030B6__000000578C6B29-0000000057A50051
21 : //! ```
22 : //!
23 : //! Every delta file consists of three parts: "summary", "values", and
24 : //! "index". The summary is a fixed size header at the beginning of the file,
25 : //! and it contains basic information about the layer, and offsets to the other
26 : //! parts. The "index" is a B-tree, mapping from Key and LSN to an offset in the
27 : //! "values" part. The actual page images and WAL records are stored in the
28 : //! "values" part.
29 : //!
30 : use crate::config::PageServerConf;
31 : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
32 : use crate::page_cache::{self, FileId, PAGE_SZ};
33 : use crate::repository::{Key, Value, KEY_SIZE};
34 : use crate::tenant::blob_io::BlobWriter;
35 : use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
36 : use crate::tenant::disk_btree::{
37 : DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
38 : };
39 : use crate::tenant::timeline::GetVectoredError;
40 : use crate::tenant::vectored_blob_io::{
41 : BlobFlag, MaxVectoredReadBytes, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
42 : VectoredReadPlanner,
43 : };
44 : use crate::tenant::PageReconstructError;
45 : use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
46 : use crate::virtual_file::{self, VirtualFile};
47 : use crate::{walrecord, TEMP_FILE_SUFFIX};
48 : use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
49 : use anyhow::{anyhow, bail, ensure, Context, Result};
50 : use bytes::BytesMut;
51 : use camino::{Utf8Path, Utf8PathBuf};
52 : use futures::StreamExt;
53 : use itertools::Itertools;
54 : use pageserver_api::keyspace::KeySpace;
55 : use pageserver_api::models::ImageCompressionAlgorithm;
56 : use pageserver_api::shard::TenantShardId;
57 : use rand::{distributions::Alphanumeric, Rng};
58 : use serde::{Deserialize, Serialize};
59 : use std::collections::VecDeque;
60 : use std::fs::File;
61 : use std::io::SeekFrom;
62 : use std::ops::Range;
63 : use std::os::unix::fs::FileExt;
64 : use std::str::FromStr;
65 : use std::sync::Arc;
66 : use tokio::sync::OnceCell;
67 : use tokio_epoll_uring::IoBufMut;
68 : use tracing::*;
69 :
70 : use utils::{
71 : bin_ser::BeSer,
72 : id::{TenantId, TimelineId},
73 : lsn::Lsn,
74 : };
75 :
76 : use super::{AsLayerDesc, LayerName, PersistentLayerDesc, ValuesReconstructState};
77 :
/// Header stored in the beginning of the file.
///
/// After this comes the 'values' part, starting on block 1. After that,
/// the 'index' starts at the block indicated by 'index_start_blk'.
///
/// The struct is (de)serialized as-is, so its field order and types are part
/// of the on-disk format.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct Summary {
    /// Magic value to identify this as a neon delta file. Always DELTA_FILE_MAGIC.
    pub magic: u16,
    /// Storage format version; the writers in this file store STORAGE_FORMAT_VERSION.
    pub format_version: u16,

    /// Tenant the layer belongs to.
    pub tenant_id: TenantId,
    /// Timeline the layer was written for.
    pub timeline_id: TimelineId,
    /// Range of keys covered by the layer.
    pub key_range: Range<Key>,
    /// Range of LSNs covered by the layer.
    pub lsn_range: Range<Lsn>,

    /// Block number where the 'index' part of the file begins.
    pub index_start_blk: u32,
    /// Block within the 'index', where the B-tree root page is stored
    pub index_root_blk: u32,
}
100 :
101 : impl From<&DeltaLayer> for Summary {
102 0 : fn from(layer: &DeltaLayer) -> Self {
103 0 : Self::expected(
104 0 : layer.desc.tenant_shard_id.tenant_id,
105 0 : layer.desc.timeline_id,
106 0 : layer.desc.key_range.clone(),
107 0 : layer.desc.lsn_range.clone(),
108 0 : )
109 0 : }
110 : }
111 :
112 : impl Summary {
113 1050 : pub(super) fn expected(
114 1050 : tenant_id: TenantId,
115 1050 : timeline_id: TimelineId,
116 1050 : keys: Range<Key>,
117 1050 : lsns: Range<Lsn>,
118 1050 : ) -> Self {
119 1050 : Self {
120 1050 : magic: DELTA_FILE_MAGIC,
121 1050 : format_version: STORAGE_FORMAT_VERSION,
122 1050 :
123 1050 : tenant_id,
124 1050 : timeline_id,
125 1050 : key_range: keys,
126 1050 : lsn_range: lsns,
127 1050 :
128 1050 : index_start_blk: 0,
129 1050 : index_root_blk: 0,
130 1050 : }
131 1050 : }
132 : }
133 :
// Flag bit indicating that this version initializes the page.
// It occupies the lowest bit of a `BlobRef`.
const WILL_INIT: u64 = 1;

/// Struct representing reference to BLOB in layers. Reference contains BLOB
/// offset, and for WAL records it also contains `will_init` flag. The flag
/// helps to determine the range of records that needs to be applied, without
/// reading/deserializing records themselves.
///
/// Encoding: the lowest bit is the `will_init` flag and the remaining bits
/// hold the offset, shifted left by one.
#[derive(Debug, Serialize, Deserialize, Copy, Clone)]
pub struct BlobRef(pub u64);
143 :
144 : impl BlobRef {
145 511428 : pub fn will_init(&self) -> bool {
146 511428 : (self.0 & WILL_INIT) != 0
147 511428 : }
148 :
149 6811022 : pub fn pos(&self) -> u64 {
150 6811022 : self.0 >> 1
151 6811022 : }
152 :
153 6463022 : pub fn new(pos: u64, will_init: bool) -> BlobRef {
154 6463022 : let mut blob_ref = pos << 1;
155 6463022 : if will_init {
156 6461668 : blob_ref |= WILL_INIT;
157 6461668 : }
158 6463022 : BlobRef(blob_ref)
159 6463022 : }
160 : }
161 :
162 : pub const DELTA_KEY_SIZE: usize = KEY_SIZE + 8;
163 : struct DeltaKey([u8; DELTA_KEY_SIZE]);
164 :
165 : /// This is the key of the B-tree index stored in the delta layer. It consists
166 : /// of the serialized representation of a Key and LSN.
167 : impl DeltaKey {
168 2064198 : fn from_slice(buf: &[u8]) -> Self {
169 2064198 : let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
170 2064198 : bytes.copy_from_slice(buf);
171 2064198 : DeltaKey(bytes)
172 2064198 : }
173 :
174 6709789 : fn from_key_lsn(key: &Key, lsn: Lsn) -> Self {
175 6709789 : let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
176 6709789 : key.write_to_byte_slice(&mut bytes[0..KEY_SIZE]);
177 6709789 : bytes[KEY_SIZE..].copy_from_slice(&u64::to_be_bytes(lsn.0));
178 6709789 : DeltaKey(bytes)
179 6709789 : }
180 :
181 2064198 : fn key(&self) -> Key {
182 2064198 : Key::from_slice(&self.0)
183 2064198 : }
184 :
185 2064198 : fn lsn(&self) -> Lsn {
186 2064198 : Lsn(u64::from_be_bytes(self.0[KEY_SIZE..].try_into().unwrap()))
187 2064198 : }
188 :
189 2682726 : fn extract_lsn_from_buf(buf: &[u8]) -> Lsn {
190 2682726 : let mut lsn_buf = [0u8; 8];
191 2682726 : lsn_buf.copy_from_slice(&buf[KEY_SIZE..]);
192 2682726 : Lsn(u64::from_be_bytes(lsn_buf))
193 2682726 : }
194 : }
195 :
/// This is used only from `pagectl`. Within pageserver, all layers are
/// [`crate::tenant::storage_layer::Layer`], which can hold a [`DeltaLayerInner`].
pub struct DeltaLayer {
    /// Path of the layer file this struct was created for.
    path: Utf8PathBuf,
    /// Descriptor built from the summary stored in the file and the file's size.
    pub desc: PersistentLayerDesc,
    /// Lazily loaded in-memory state; populated on first `load`.
    inner: OnceCell<Arc<DeltaLayerInner>>,
}
203 :
204 : impl std::fmt::Debug for DeltaLayer {
205 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
206 0 : use super::RangeDisplayDebug;
207 0 :
208 0 : f.debug_struct("DeltaLayer")
209 0 : .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
210 0 : .field("lsn_range", &self.desc.lsn_range)
211 0 : .field("file_size", &self.desc.file_size)
212 0 : .field("inner", &self.inner)
213 0 : .finish()
214 0 : }
215 : }
216 :
/// `DeltaLayerInner` is the in-memory data structure associated with an on-disk delta
/// file.
pub struct DeltaLayerInner {
    // values copied from summary
    index_start_blk: u32,
    index_root_blk: u32,

    // Handle to the opened layer file, and the page cache file id allocated
    // for it when the layer was loaded.
    file: VirtualFile,
    file_id: FileId,

    // Key and LSN ranges as recorded in the file's summary.
    #[allow(dead_code)]
    layer_key_range: Range<Key>,
    #[allow(dead_code)]
    layer_lsn_range: Range<Lsn>,

    // Passed in at load time. `get_values_reconstruct_data` expects this to
    // be `Some` and panics otherwise.
    max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
}
234 :
235 : impl std::fmt::Debug for DeltaLayerInner {
236 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
237 0 : f.debug_struct("DeltaLayerInner")
238 0 : .field("index_start_blk", &self.index_start_blk)
239 0 : .field("index_root_blk", &self.index_root_blk)
240 0 : .finish()
241 0 : }
242 : }
243 :
244 : /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
245 : impl std::fmt::Display for DeltaLayer {
246 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
247 0 : write!(f, "{}", self.layer_desc().short_id())
248 0 : }
249 : }
250 :
251 : impl AsLayerDesc for DeltaLayer {
252 0 : fn layer_desc(&self) -> &PersistentLayerDesc {
253 0 : &self.desc
254 0 : }
255 : }
256 :
257 : impl DeltaLayer {
258 0 : pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
259 0 : self.desc.dump();
260 0 :
261 0 : if !verbose {
262 0 : return Ok(());
263 0 : }
264 :
265 0 : let inner = self.load(ctx).await?;
266 :
267 0 : inner.dump(ctx).await
268 0 : }
269 :
270 1406 : fn temp_path_for(
271 1406 : conf: &PageServerConf,
272 1406 : tenant_shard_id: &TenantShardId,
273 1406 : timeline_id: &TimelineId,
274 1406 : key_start: Key,
275 1406 : lsn_range: &Range<Lsn>,
276 1406 : ) -> Utf8PathBuf {
277 1406 : let rand_string: String = rand::thread_rng()
278 1406 : .sample_iter(&Alphanumeric)
279 1406 : .take(8)
280 1406 : .map(char::from)
281 1406 : .collect();
282 1406 :
283 1406 : conf.timeline_path(tenant_shard_id, timeline_id)
284 1406 : .join(format!(
285 1406 : "{}-XXX__{:016X}-{:016X}.{}.{}",
286 1406 : key_start,
287 1406 : u64::from(lsn_range.start),
288 1406 : u64::from(lsn_range.end),
289 1406 : rand_string,
290 1406 : TEMP_FILE_SUFFIX,
291 1406 : ))
292 1406 : }
293 :
294 : ///
295 : /// Open the underlying file and read the metadata into memory, if it's
296 : /// not loaded already.
297 : ///
298 0 : async fn load(&self, ctx: &RequestContext) -> Result<&Arc<DeltaLayerInner>> {
299 0 : // Quick exit if already loaded
300 0 : self.inner
301 0 : .get_or_try_init(|| self.load_inner(ctx))
302 0 : .await
303 0 : .with_context(|| format!("Failed to load delta layer {}", self.path()))
304 0 : }
305 :
306 0 : async fn load_inner(&self, ctx: &RequestContext) -> anyhow::Result<Arc<DeltaLayerInner>> {
307 0 : let path = self.path();
308 :
309 0 : let loaded = DeltaLayerInner::load(&path, None, None, ctx).await?;
310 :
311 : // not production code
312 0 : let actual_layer_name = LayerName::from_str(path.file_name().unwrap()).unwrap();
313 0 : let expected_layer_name = self.layer_desc().layer_name();
314 0 :
315 0 : if actual_layer_name != expected_layer_name {
316 0 : println!("warning: filename does not match what is expected from in-file summary");
317 0 : println!("actual: {:?}", actual_layer_name.to_string());
318 0 : println!("expected: {:?}", expected_layer_name.to_string());
319 0 : }
320 :
321 0 : Ok(Arc::new(loaded))
322 0 : }
323 :
324 : /// Create a DeltaLayer struct representing an existing file on disk.
325 : ///
326 : /// This variant is only used for debugging purposes, by the 'pagectl' binary.
327 0 : pub fn new_for_path(path: &Utf8Path, file: File) -> Result<Self> {
328 0 : let mut summary_buf = vec![0; PAGE_SZ];
329 0 : file.read_exact_at(&mut summary_buf, 0)?;
330 0 : let summary = Summary::des_prefix(&summary_buf)?;
331 :
332 0 : let metadata = file
333 0 : .metadata()
334 0 : .context("get file metadata to determine size")?;
335 :
336 : // This function is never used for constructing layers in a running pageserver,
337 : // so it does not need an accurate TenantShardId.
338 0 : let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
339 0 :
340 0 : Ok(DeltaLayer {
341 0 : path: path.to_path_buf(),
342 0 : desc: PersistentLayerDesc::new_delta(
343 0 : tenant_shard_id,
344 0 : summary.timeline_id,
345 0 : summary.key_range,
346 0 : summary.lsn_range,
347 0 : metadata.len(),
348 0 : ),
349 0 : inner: OnceCell::new(),
350 0 : })
351 0 : }
352 :
353 : /// Path to the layer file in pageserver workdir.
354 0 : fn path(&self) -> Utf8PathBuf {
355 0 : self.path.clone()
356 0 : }
357 : }
358 :
359 : /// A builder object for constructing a new delta layer.
360 : ///
361 : /// Usage:
362 : ///
363 : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
364 : ///
365 : /// 2. Write the contents by calling `put_value` for every page
366 : /// version to store in the layer.
367 : ///
368 : /// 3. Call `finish`.
369 : ///
370 : struct DeltaLayerWriterInner {
371 : pub path: Utf8PathBuf,
372 : timeline_id: TimelineId,
373 : tenant_shard_id: TenantShardId,
374 :
375 : key_start: Key,
376 : lsn_range: Range<Lsn>,
377 :
378 : tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>,
379 :
380 : blob_writer: BlobWriter<true>,
381 :
382 : // Number of key-lsns in the layer.
383 : num_keys: usize,
384 : }
385 :
386 : impl DeltaLayerWriterInner {
387 : ///
388 : /// Start building a new delta layer.
389 : ///
390 1406 : async fn new(
391 1406 : conf: &'static PageServerConf,
392 1406 : timeline_id: TimelineId,
393 1406 : tenant_shard_id: TenantShardId,
394 1406 : key_start: Key,
395 1406 : lsn_range: Range<Lsn>,
396 1406 : ctx: &RequestContext,
397 1406 : ) -> anyhow::Result<Self> {
398 1406 : // Create the file initially with a temporary filename. We don't know
399 1406 : // the end key yet, so we cannot form the final filename yet. We will
400 1406 : // rename it when we're done.
401 1406 : //
402 1406 : // Note: This overwrites any existing file. There shouldn't be any.
403 1406 : // FIXME: throw an error instead?
404 1406 : let path =
405 1406 : DeltaLayer::temp_path_for(conf, &tenant_shard_id, &timeline_id, key_start, &lsn_range);
406 :
407 1406 : let mut file = VirtualFile::create(&path, ctx).await?;
408 : // make room for the header block
409 1406 : file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
410 1406 : let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
411 1406 :
412 1406 : // Initialize the b-tree index builder
413 1406 : let block_buf = BlockBuf::new();
414 1406 : let tree_builder = DiskBtreeBuilder::new(block_buf);
415 1406 :
416 1406 : Ok(Self {
417 1406 : path,
418 1406 : timeline_id,
419 1406 : tenant_shard_id,
420 1406 : key_start,
421 1406 : lsn_range,
422 1406 : tree: tree_builder,
423 1406 : blob_writer,
424 1406 : num_keys: 0,
425 1406 : })
426 1406 : }
427 :
428 : ///
429 : /// Append a key-value pair to the file.
430 : ///
431 : /// The values must be appended in key, lsn order.
432 : ///
433 2076256 : async fn put_value(
434 2076256 : &mut self,
435 2076256 : key: Key,
436 2076256 : lsn: Lsn,
437 2076256 : val: Value,
438 2076256 : ctx: &RequestContext,
439 2076256 : ) -> anyhow::Result<()> {
440 2076256 : let (_, res) = self
441 2076256 : .put_value_bytes(
442 2076256 : key,
443 2076256 : lsn,
444 2076256 : Value::ser(&val)?.slice_len(),
445 2076256 : val.will_init(),
446 2076256 : ctx,
447 : )
448 1472 : .await;
449 2076256 : res
450 2076256 : }
451 :
452 6462918 : async fn put_value_bytes<Buf>(
453 6462918 : &mut self,
454 6462918 : key: Key,
455 6462918 : lsn: Lsn,
456 6462918 : val: FullSlice<Buf>,
457 6462918 : will_init: bool,
458 6462918 : ctx: &RequestContext,
459 6462918 : ) -> (FullSlice<Buf>, anyhow::Result<()>)
460 6462918 : where
461 6462918 : Buf: IoBufMut + Send,
462 6462918 : {
463 6462918 : assert!(
464 6462918 : self.lsn_range.start <= lsn,
465 0 : "lsn_start={}, lsn={}",
466 : self.lsn_range.start,
467 : lsn
468 : );
469 : // We don't want to use compression in delta layer creation
470 6462918 : let compression = ImageCompressionAlgorithm::Disabled;
471 6462918 : let (val, res) = self
472 6462918 : .blob_writer
473 6462918 : .write_blob_maybe_compressed(val, ctx, compression)
474 4423 : .await;
475 6462918 : let off = match res {
476 6462918 : Ok((off, _)) => off,
477 0 : Err(e) => return (val, Err(anyhow::anyhow!(e))),
478 : };
479 :
480 6462918 : let blob_ref = BlobRef::new(off, will_init);
481 6462918 :
482 6462918 : let delta_key = DeltaKey::from_key_lsn(&key, lsn);
483 6462918 : let res = self.tree.append(&delta_key.0, blob_ref.0);
484 6462918 :
485 6462918 : self.num_keys += 1;
486 6462918 :
487 6462918 : (val, res.map_err(|e| anyhow::anyhow!(e)))
488 6462918 : }
489 :
490 2024004 : fn size(&self) -> u64 {
491 2024004 : self.blob_writer.size() + self.tree.borrow_writer().size()
492 2024004 : }
493 :
494 : ///
495 : /// Finish writing the delta layer.
496 : ///
497 1398 : async fn finish(
498 1398 : self,
499 1398 : key_end: Key,
500 1398 : ctx: &RequestContext,
501 1398 : ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
502 1398 : let temp_path = self.path.clone();
503 9666 : let result = self.finish0(key_end, ctx).await;
504 1398 : if result.is_err() {
505 0 : tracing::info!(%temp_path, "cleaning up temporary file after error during writing");
506 0 : if let Err(e) = std::fs::remove_file(&temp_path) {
507 0 : tracing::warn!(error=%e, %temp_path, "error cleaning up temporary layer file after error during writing");
508 0 : }
509 1398 : }
510 1398 : result
511 1398 : }
512 :
513 1398 : async fn finish0(
514 1398 : self,
515 1398 : key_end: Key,
516 1398 : ctx: &RequestContext,
517 1398 : ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
518 1398 : let index_start_blk =
519 1398 : ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
520 :
521 1398 : let mut file = self.blob_writer.into_inner(ctx).await?;
522 :
523 : // Write out the index
524 1398 : let (index_root_blk, block_buf) = self.tree.finish()?;
525 1398 : file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
526 0 : .await?;
527 15011 : for buf in block_buf.blocks {
528 13613 : let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
529 13613 : res?;
530 : }
531 1398 : assert!(self.lsn_range.start < self.lsn_range.end);
532 : // Fill in the summary on blk 0
533 1398 : let summary = Summary {
534 1398 : magic: DELTA_FILE_MAGIC,
535 1398 : format_version: STORAGE_FORMAT_VERSION,
536 1398 : tenant_id: self.tenant_shard_id.tenant_id,
537 1398 : timeline_id: self.timeline_id,
538 1398 : key_range: self.key_start..key_end,
539 1398 : lsn_range: self.lsn_range.clone(),
540 1398 : index_start_blk,
541 1398 : index_root_blk,
542 1398 : };
543 1398 :
544 1398 : let mut buf = Vec::with_capacity(PAGE_SZ);
545 1398 : // TODO: could use smallvec here but it's a pain with Slice<T>
546 1398 : Summary::ser_into(&summary, &mut buf)?;
547 1398 : file.seek(SeekFrom::Start(0)).await?;
548 1398 : let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
549 1398 : res?;
550 :
551 1398 : let metadata = file
552 1398 : .metadata()
553 700 : .await
554 1398 : .context("get file metadata to determine size")?;
555 :
556 : // 5GB limit for objects without multipart upload (which we don't want to use)
557 : // Make it a little bit below to account for differing GB units
558 : // https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html
559 : const S3_UPLOAD_LIMIT: u64 = 4_500_000_000;
560 1398 : ensure!(
561 1398 : metadata.len() <= S3_UPLOAD_LIMIT,
562 0 : "Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
563 0 : file.path,
564 0 : metadata.len()
565 : );
566 :
567 : // Note: Because we opened the file in write-only mode, we cannot
568 : // reuse the same VirtualFile for reading later. That's why we don't
569 : // set inner.file here. The first read will have to re-open it.
570 :
571 1398 : let desc = PersistentLayerDesc::new_delta(
572 1398 : self.tenant_shard_id,
573 1398 : self.timeline_id,
574 1398 : self.key_start..key_end,
575 1398 : self.lsn_range.clone(),
576 1398 : metadata.len(),
577 1398 : );
578 1398 :
579 1398 : // fsync the file
580 1398 : file.sync_all().await?;
581 :
582 1398 : trace!("created delta layer {}", self.path);
583 :
584 1398 : Ok((desc, self.path))
585 1398 : }
586 : }
587 :
588 : /// A builder object for constructing a new delta layer.
589 : ///
590 : /// Usage:
591 : ///
592 : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
593 : ///
594 : /// 2. Write the contents by calling `put_value` for every page
595 : /// version to store in the layer.
596 : ///
597 : /// 3. Call `finish`.
598 : ///
599 : /// # Note
600 : ///
601 : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
602 : /// possible for the writer to drop before `finish` is actually called. So this
603 : /// could lead to odd temporary files in the directory, exhausting file system.
604 : /// This structure wraps `DeltaLayerWriterInner` and also contains `Drop`
605 : /// implementation that cleans up the temporary file in failure. It's not
606 : /// possible to do this directly in `DeltaLayerWriterInner` since `finish` moves
607 : /// out some fields, making it impossible to implement `Drop`.
608 : ///
609 : #[must_use]
610 : pub struct DeltaLayerWriter {
611 : inner: Option<DeltaLayerWriterInner>,
612 : }
613 :
614 : impl DeltaLayerWriter {
615 : ///
616 : /// Start building a new delta layer.
617 : ///
618 1406 : pub async fn new(
619 1406 : conf: &'static PageServerConf,
620 1406 : timeline_id: TimelineId,
621 1406 : tenant_shard_id: TenantShardId,
622 1406 : key_start: Key,
623 1406 : lsn_range: Range<Lsn>,
624 1406 : ctx: &RequestContext,
625 1406 : ) -> anyhow::Result<Self> {
626 1406 : Ok(Self {
627 1406 : inner: Some(
628 1406 : DeltaLayerWriterInner::new(
629 1406 : conf,
630 1406 : timeline_id,
631 1406 : tenant_shard_id,
632 1406 : key_start,
633 1406 : lsn_range,
634 1406 : ctx,
635 1406 : )
636 723 : .await?,
637 : ),
638 : })
639 1406 : }
640 :
641 : ///
642 : /// Append a key-value pair to the file.
643 : ///
644 : /// The values must be appended in key, lsn order.
645 : ///
646 2076256 : pub async fn put_value(
647 2076256 : &mut self,
648 2076256 : key: Key,
649 2076256 : lsn: Lsn,
650 2076256 : val: Value,
651 2076256 : ctx: &RequestContext,
652 2076256 : ) -> anyhow::Result<()> {
653 2076256 : self.inner
654 2076256 : .as_mut()
655 2076256 : .unwrap()
656 2076256 : .put_value(key, lsn, val, ctx)
657 1472 : .await
658 2076256 : }
659 :
660 4386662 : pub async fn put_value_bytes<Buf>(
661 4386662 : &mut self,
662 4386662 : key: Key,
663 4386662 : lsn: Lsn,
664 4386662 : val: FullSlice<Buf>,
665 4386662 : will_init: bool,
666 4386662 : ctx: &RequestContext,
667 4386662 : ) -> (FullSlice<Buf>, anyhow::Result<()>)
668 4386662 : where
669 4386662 : Buf: IoBufMut + Send,
670 4386662 : {
671 4386662 : self.inner
672 4386662 : .as_mut()
673 4386662 : .unwrap()
674 4386662 : .put_value_bytes(key, lsn, val, will_init, ctx)
675 2951 : .await
676 4386662 : }
677 :
678 2024004 : pub fn size(&self) -> u64 {
679 2024004 : self.inner.as_ref().unwrap().size()
680 2024004 : }
681 :
682 : ///
683 : /// Finish writing the delta layer.
684 : ///
685 1398 : pub(crate) async fn finish(
686 1398 : mut self,
687 1398 : key_end: Key,
688 1398 : ctx: &RequestContext,
689 1398 : ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
690 9666 : self.inner.take().unwrap().finish(key_end, ctx).await
691 1398 : }
692 :
693 : #[cfg(test)]
694 4006 : pub(crate) fn num_keys(&self) -> usize {
695 4006 : self.inner.as_ref().unwrap().num_keys
696 4006 : }
697 :
698 : #[cfg(test)]
699 4000 : pub(crate) fn estimated_size(&self) -> u64 {
700 4000 : let inner = self.inner.as_ref().unwrap();
701 4000 : inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64
702 4000 : }
703 : }
704 :
705 : impl Drop for DeltaLayerWriter {
706 1406 : fn drop(&mut self) {
707 1406 : if let Some(inner) = self.inner.take() {
708 8 : // We want to remove the virtual file here, so it's fine to not
709 8 : // having completely flushed unwritten data.
710 8 : let vfile = inner.blob_writer.into_inner_no_flush();
711 8 : vfile.remove();
712 1398 : }
713 1406 : }
714 : }
715 :
/// Error returned by `DeltaLayer::rewrite_summary`.
#[derive(thiserror::Error, Debug)]
pub enum RewriteSummaryError {
    /// The summary read from the file does not carry `DELTA_FILE_MAGIC`.
    #[error("magic mismatch")]
    MagicMismatch,
    /// Any other failure, e.g. opening, reading, (de)serializing or writing.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
723 :
724 : impl From<std::io::Error> for RewriteSummaryError {
725 0 : fn from(e: std::io::Error) -> Self {
726 0 : Self::Other(anyhow::anyhow!(e))
727 0 : }
728 : }
729 :
730 : impl DeltaLayer {
731 0 : pub async fn rewrite_summary<F>(
732 0 : path: &Utf8Path,
733 0 : rewrite: F,
734 0 : ctx: &RequestContext,
735 0 : ) -> Result<(), RewriteSummaryError>
736 0 : where
737 0 : F: Fn(Summary) -> Summary,
738 0 : {
739 0 : let mut file = VirtualFile::open_with_options(
740 0 : path,
741 0 : virtual_file::OpenOptions::new().read(true).write(true),
742 0 : ctx,
743 0 : )
744 0 : .await
745 0 : .with_context(|| format!("Failed to open file '{}'", path))?;
746 0 : let file_id = page_cache::next_file_id();
747 0 : let block_reader = FileBlockReader::new(&file, file_id);
748 0 : let summary_blk = block_reader.read_blk(0, ctx).await?;
749 0 : let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
750 0 : if actual_summary.magic != DELTA_FILE_MAGIC {
751 0 : return Err(RewriteSummaryError::MagicMismatch);
752 0 : }
753 0 :
754 0 : let new_summary = rewrite(actual_summary);
755 0 :
756 0 : let mut buf = Vec::with_capacity(PAGE_SZ);
757 0 : // TODO: could use smallvec here, but it's a pain with Slice<T>
758 0 : Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
759 0 : file.seek(SeekFrom::Start(0)).await?;
760 0 : let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
761 0 : res?;
762 0 : Ok(())
763 0 : }
764 : }
765 :
766 : impl DeltaLayerInner {
767 482 : pub(crate) fn key_range(&self) -> &Range<Key> {
768 482 : &self.layer_key_range
769 482 : }
770 :
771 482 : pub(crate) fn lsn_range(&self) -> &Range<Lsn> {
772 482 : &self.layer_lsn_range
773 482 : }
774 :
775 1050 : pub(super) async fn load(
776 1050 : path: &Utf8Path,
777 1050 : summary: Option<Summary>,
778 1050 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
779 1050 : ctx: &RequestContext,
780 1050 : ) -> anyhow::Result<Self> {
781 1050 : let file = VirtualFile::open(path, ctx)
782 525 : .await
783 1050 : .context("open layer file")?;
784 :
785 1050 : let file_id = page_cache::next_file_id();
786 1050 :
787 1050 : let block_reader = FileBlockReader::new(&file, file_id);
788 :
789 1050 : let summary_blk = block_reader
790 1050 : .read_blk(0, ctx)
791 529 : .await
792 1050 : .context("read first block")?;
793 :
794 : // TODO: this should be an assertion instead; see ImageLayerInner::load
795 1050 : let actual_summary =
796 1050 : Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
797 :
798 1050 : if let Some(mut expected_summary) = summary {
799 : // production code path
800 1050 : expected_summary.index_start_blk = actual_summary.index_start_blk;
801 1050 : expected_summary.index_root_blk = actual_summary.index_root_blk;
802 1050 : // mask out the timeline_id, but still require the layers to be from the same tenant
803 1050 : expected_summary.timeline_id = actual_summary.timeline_id;
804 1050 :
805 1050 : if actual_summary != expected_summary {
806 0 : bail!(
807 0 : "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
808 0 : actual_summary,
809 0 : expected_summary
810 0 : );
811 1050 : }
812 0 : }
813 :
814 1050 : Ok(DeltaLayerInner {
815 1050 : file,
816 1050 : file_id,
817 1050 : index_start_blk: actual_summary.index_start_blk,
818 1050 : index_root_blk: actual_summary.index_root_blk,
819 1050 : max_vectored_read_bytes,
820 1050 : layer_key_range: actual_summary.key_range,
821 1050 : layer_lsn_range: actual_summary.lsn_range,
822 1050 : })
823 1050 : }
824 :
825 : // Look up the keys in the provided keyspace and update
826 : // the reconstruct state with whatever is found.
827 : //
828 : // If the key is cached, go no further than the cached Lsn.
829 : //
830 : // Currently, the index is visited for each range, but this
831 : // can be further optimised to visit the index only once.
832 204452 : pub(super) async fn get_values_reconstruct_data(
833 204452 : &self,
834 204452 : keyspace: KeySpace,
835 204452 : lsn_range: Range<Lsn>,
836 204452 : reconstruct_state: &mut ValuesReconstructState,
837 204452 : ctx: &RequestContext,
838 204452 : ) -> Result<(), GetVectoredError> {
839 204452 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
840 204452 : let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
841 204452 : self.index_start_blk,
842 204452 : self.index_root_blk,
843 204452 : block_reader,
844 204452 : );
845 204452 :
846 204452 : let planner = VectoredReadPlanner::new(
847 204452 : self.max_vectored_read_bytes
848 204452 : .expect("Layer is loaded with max vectored bytes config")
849 204452 : .0
850 204452 : .into(),
851 204452 : );
852 204452 :
853 204452 : let data_end_offset = self.index_start_offset();
854 :
855 204452 : let reads = Self::plan_reads(
856 204452 : &keyspace,
857 204452 : lsn_range.clone(),
858 204452 : data_end_offset,
859 204452 : index_reader,
860 204452 : planner,
861 204452 : reconstruct_state,
862 204452 : ctx,
863 204452 : )
864 21204 : .await
865 204452 : .map_err(GetVectoredError::Other)?;
866 :
867 204452 : self.do_reads_and_update_state(reads, reconstruct_state, ctx)
868 70859 : .await;
869 :
870 204452 : reconstruct_state.on_lsn_advanced(&keyspace, lsn_range.start);
871 204452 :
872 204452 : Ok(())
873 204452 : }
874 :
875 : /// Load all key-values in the delta layer, should be replaced by an iterator-based interface in the future.
876 0 : pub(super) async fn load_key_values(
877 0 : &self,
878 0 : ctx: &RequestContext,
879 0 : ) -> anyhow::Result<Vec<(Key, Lsn, Value)>> {
880 0 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
881 0 : let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
882 0 : self.index_start_blk,
883 0 : self.index_root_blk,
884 0 : block_reader,
885 0 : );
886 0 : let mut result = Vec::new();
887 0 : let mut stream =
888 0 : Box::pin(self.stream_index_forwards(index_reader, &[0; DELTA_KEY_SIZE], ctx));
889 0 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
890 0 : let cursor = block_reader.block_cursor();
891 0 : let mut buf = Vec::new();
892 0 : while let Some(item) = stream.next().await {
893 0 : let (key, lsn, pos) = item?;
894 : // TODO: dedup code with get_reconstruct_value
895 : // TODO: ctx handling and sharding
896 0 : cursor
897 0 : .read_blob_into_buf(pos.pos(), &mut buf, ctx)
898 0 : .await
899 0 : .with_context(|| {
900 0 : format!("Failed to read blob from virtual file {}", self.file.path)
901 0 : })?;
902 0 : let val = Value::des(&buf).with_context(|| {
903 0 : format!(
904 0 : "Failed to deserialize file blob from virtual file {}",
905 0 : self.file.path
906 0 : )
907 0 : })?;
908 0 : result.push((key, lsn, val));
909 : }
910 0 : Ok(result)
911 0 : }
912 :
913 204654 : async fn plan_reads<Reader>(
914 204654 : keyspace: &KeySpace,
915 204654 : lsn_range: Range<Lsn>,
916 204654 : data_end_offset: u64,
917 204654 : index_reader: DiskBtreeReader<Reader, DELTA_KEY_SIZE>,
918 204654 : mut planner: VectoredReadPlanner,
919 204654 : reconstruct_state: &mut ValuesReconstructState,
920 204654 : ctx: &RequestContext,
921 204654 : ) -> anyhow::Result<Vec<VectoredRead>>
922 204654 : where
923 204654 : Reader: BlockReader + Clone,
924 204654 : {
925 204654 : let ctx = RequestContextBuilder::extend(ctx)
926 204654 : .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
927 204654 : .build();
928 :
929 246837 : for range in keyspace.ranges.iter() {
930 246837 : let mut range_end_handled = false;
931 246837 :
932 246837 : let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start);
933 246837 : let index_stream = index_reader.clone().into_stream(&start_key.0, &ctx);
934 246837 : let mut index_stream = std::pin::pin!(index_stream);
935 :
936 593100 : while let Some(index_entry) = index_stream.next().await {
937 584202 : let (raw_key, value) = index_entry?;
938 584202 : let key = Key::from_slice(&raw_key[..KEY_SIZE]);
939 584202 : let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
940 584202 : let blob_ref = BlobRef(value);
941 584202 :
942 584202 : // Lsns are not monotonically increasing across keys, so we don't assert on them.
943 584202 : assert!(key >= range.start);
944 :
945 584202 : let outside_lsn_range = !lsn_range.contains(&lsn);
946 584202 : let below_cached_lsn = reconstruct_state.get_cached_lsn(&key) >= Some(lsn);
947 :
948 584202 : let flag = {
949 584202 : if outside_lsn_range || below_cached_lsn {
950 72774 : BlobFlag::Ignore
951 511428 : } else if blob_ref.will_init() {
952 453366 : BlobFlag::ReplaceAll
953 : } else {
954 : // Usual path: add blob to the read
955 58062 : BlobFlag::None
956 : }
957 : };
958 :
959 584202 : if key >= range.end || (key.next() == range.end && lsn >= lsn_range.end) {
960 237939 : planner.handle_range_end(blob_ref.pos());
961 237939 : range_end_handled = true;
962 237939 : break;
963 346263 : } else {
964 346263 : planner.handle(key, lsn, blob_ref.pos(), flag);
965 346263 : }
966 : }
967 :
968 246837 : if !range_end_handled {
969 8898 : tracing::debug!("Handling range end fallback at {}", data_end_offset);
970 8898 : planner.handle_range_end(data_end_offset);
971 237939 : }
972 : }
973 :
974 204654 : Ok(planner.finish())
975 204654 : }
976 :
977 204652 : fn get_min_read_buffer_size(
978 204652 : planned_reads: &[VectoredRead],
979 204652 : read_size_soft_max: usize,
980 204652 : ) -> usize {
981 204652 : let Some(largest_read) = planned_reads.iter().max_by_key(|read| read.size()) else {
982 82445 : return read_size_soft_max;
983 : };
984 :
985 122207 : let largest_read_size = largest_read.size();
986 122207 : if largest_read_size > read_size_soft_max {
987 : // If the read is oversized, it should only contain one key.
988 200 : let offenders = largest_read
989 200 : .blobs_at
990 200 : .as_slice()
991 200 : .iter()
992 200 : .map(|(_, blob_meta)| format!("{}@{}", blob_meta.key, blob_meta.lsn))
993 200 : .join(", ");
994 200 : tracing::warn!(
995 0 : "Oversized vectored read ({} > {}) for keys {}",
996 : largest_read_size,
997 : read_size_soft_max,
998 : offenders
999 : );
1000 122007 : }
1001 :
1002 122207 : largest_read_size
1003 204652 : }
1004 :
1005 204452 : async fn do_reads_and_update_state(
1006 204452 : &self,
1007 204452 : reads: Vec<VectoredRead>,
1008 204452 : reconstruct_state: &mut ValuesReconstructState,
1009 204452 : ctx: &RequestContext,
1010 204452 : ) {
1011 204452 : let vectored_blob_reader = VectoredBlobReader::new(&self.file);
1012 204452 : let mut ignore_key_with_err = None;
1013 204452 :
1014 204452 : let max_vectored_read_bytes = self
1015 204452 : .max_vectored_read_bytes
1016 204452 : .expect("Layer is loaded with max vectored bytes config")
1017 204452 : .0
1018 204452 : .into();
1019 204452 : let buf_size = Self::get_min_read_buffer_size(&reads, max_vectored_read_bytes);
1020 204452 : let mut buf = Some(BytesMut::with_capacity(buf_size));
1021 :
1022 : // Note that reads are processed in reverse order (from highest key+lsn).
1023 : // This is the order that `ReconstructState` requires such that it can
1024 : // track when a key is done.
1025 204452 : for read in reads.into_iter().rev() {
1026 139164 : let res = vectored_blob_reader
1027 139164 : .read_blobs(&read, buf.take().expect("Should have a buffer"), ctx)
1028 70859 : .await;
1029 :
1030 139164 : let blobs_buf = match res {
1031 139164 : Ok(blobs_buf) => blobs_buf,
1032 0 : Err(err) => {
1033 0 : let kind = err.kind();
1034 0 : for (_, blob_meta) in read.blobs_at.as_slice() {
1035 0 : reconstruct_state.on_key_error(
1036 0 : blob_meta.key,
1037 0 : PageReconstructError::Other(anyhow!(
1038 0 : "Failed to read blobs from virtual file {}: {}",
1039 0 : self.file.path,
1040 0 : kind
1041 0 : )),
1042 0 : );
1043 0 : }
1044 :
1045 : // We have "lost" the buffer since the lower level IO api
1046 : // doesn't return the buffer on error. Allocate a new one.
1047 0 : buf = Some(BytesMut::with_capacity(buf_size));
1048 0 :
1049 0 : continue;
1050 : }
1051 : };
1052 :
1053 149999 : for meta in blobs_buf.blobs.iter().rev() {
1054 149999 : if Some(meta.meta.key) == ignore_key_with_err {
1055 0 : continue;
1056 149999 : }
1057 149999 :
1058 149999 : let value = Value::des(&blobs_buf.buf[meta.start..meta.end]);
1059 149999 : let value = match value {
1060 149999 : Ok(v) => v,
1061 0 : Err(e) => {
1062 0 : reconstruct_state.on_key_error(
1063 0 : meta.meta.key,
1064 0 : PageReconstructError::Other(anyhow!(e).context(format!(
1065 0 : "Failed to deserialize blob from virtual file {}",
1066 0 : self.file.path,
1067 0 : ))),
1068 0 : );
1069 0 :
1070 0 : ignore_key_with_err = Some(meta.meta.key);
1071 0 : continue;
1072 : }
1073 : };
1074 :
1075 : // Invariant: once a key reaches [`ValueReconstructSituation::Complete`]
1076 : // state, no further updates shall be made to it. The call below will
1077 : // panic if the invariant is violated.
1078 149999 : reconstruct_state.update_key(&meta.meta.key, meta.meta.lsn, value);
1079 : }
1080 :
1081 139164 : buf = Some(blobs_buf.buf);
1082 : }
1083 204452 : }
1084 :
1085 406 : pub(super) async fn load_keys<'a>(
1086 406 : &'a self,
1087 406 : ctx: &RequestContext,
1088 406 : ) -> Result<Vec<DeltaEntry<'a>>> {
1089 406 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
1090 406 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1091 406 : self.index_start_blk,
1092 406 : self.index_root_blk,
1093 406 : block_reader,
1094 406 : );
1095 406 :
1096 406 : let mut all_keys: Vec<DeltaEntry<'_>> = Vec::new();
1097 406 :
1098 406 : tree_reader
1099 406 : .visit(
1100 406 : &[0u8; DELTA_KEY_SIZE],
1101 406 : VisitDirection::Forwards,
1102 2064046 : |key, value| {
1103 2064046 : let delta_key = DeltaKey::from_slice(key);
1104 2064046 : let val_ref = ValueRef {
1105 2064046 : blob_ref: BlobRef(value),
1106 2064046 : layer: self,
1107 2064046 : };
1108 2064046 : let pos = BlobRef(value).pos();
1109 2064046 : if let Some(last) = all_keys.last_mut() {
1110 2063640 : // subtract offset of the current and last entries to get the size
1111 2063640 : // of the value associated with this (key, lsn) tuple
1112 2063640 : let first_pos = last.size;
1113 2063640 : last.size = pos - first_pos;
1114 2063640 : }
1115 2064046 : let entry = DeltaEntry {
1116 2064046 : key: delta_key.key(),
1117 2064046 : lsn: delta_key.lsn(),
1118 2064046 : size: pos,
1119 2064046 : val: val_ref,
1120 2064046 : };
1121 2064046 : all_keys.push(entry);
1122 2064046 : true
1123 2064046 : },
1124 406 : &RequestContextBuilder::extend(ctx)
1125 406 : .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
1126 406 : .build(),
1127 406 : )
1128 2122 : .await?;
1129 406 : if let Some(last) = all_keys.last_mut() {
1130 406 : // Last key occupies all space till end of value storage,
1131 406 : // which corresponds to beginning of the index
1132 406 : last.size = self.index_start_offset() - last.size;
1133 406 : }
1134 406 : Ok(all_keys)
1135 406 : }
1136 :
1137 : /// Using the given writer, write out a version which has the earlier Lsns than `until`.
1138 : ///
1139 : /// Return the amount of key value records pushed to the writer.
1140 10 : pub(super) async fn copy_prefix(
1141 10 : &self,
1142 10 : writer: &mut DeltaLayerWriter,
1143 10 : until: Lsn,
1144 10 : ctx: &RequestContext,
1145 10 : ) -> anyhow::Result<usize> {
1146 10 : use crate::tenant::vectored_blob_io::{
1147 10 : BlobMeta, VectoredReadBuilder, VectoredReadExtended,
1148 10 : };
1149 10 : use futures::stream::TryStreamExt;
1150 10 :
1151 10 : #[derive(Debug)]
1152 10 : enum Item {
1153 10 : Actual(Key, Lsn, BlobRef),
1154 10 : Sentinel,
1155 10 : }
1156 10 :
1157 10 : impl From<Item> for Option<(Key, Lsn, BlobRef)> {
1158 70 : fn from(value: Item) -> Self {
1159 70 : match value {
1160 60 : Item::Actual(key, lsn, blob) => Some((key, lsn, blob)),
1161 10 : Item::Sentinel => None,
1162 10 : }
1163 70 : }
1164 10 : }
1165 10 :
1166 10 : impl Item {
1167 70 : fn offset(&self) -> Option<BlobRef> {
1168 70 : match self {
1169 60 : Item::Actual(_, _, blob) => Some(*blob),
1170 10 : Item::Sentinel => None,
1171 10 : }
1172 70 : }
1173 10 :
1174 70 : fn is_last(&self) -> bool {
1175 70 : matches!(self, Item::Sentinel)
1176 70 : }
1177 10 : }
1178 10 :
1179 10 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
1180 10 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1181 10 : self.index_start_blk,
1182 10 : self.index_root_blk,
1183 10 : block_reader,
1184 10 : );
1185 10 :
1186 10 : let stream = self.stream_index_forwards(tree_reader, &[0u8; DELTA_KEY_SIZE], ctx);
1187 60 : let stream = stream.map_ok(|(key, lsn, pos)| Item::Actual(key, lsn, pos));
1188 10 : // put in a sentinel value for getting the end offset for last item, and not having to
1189 10 : // repeat the whole read part
1190 10 : let stream = stream.chain(futures::stream::once(futures::future::ready(Ok(
1191 10 : Item::Sentinel,
1192 10 : ))));
1193 10 : let mut stream = std::pin::pin!(stream);
1194 10 :
1195 10 : let mut prev: Option<(Key, Lsn, BlobRef)> = None;
1196 10 :
1197 10 : let mut read_builder: Option<VectoredReadBuilder> = None;
1198 10 :
1199 10 : let max_read_size = self
1200 10 : .max_vectored_read_bytes
1201 10 : .map(|x| x.0.get())
1202 10 : .unwrap_or(8192);
1203 10 :
1204 10 : let mut buffer = Some(BytesMut::with_capacity(max_read_size));
1205 10 :
1206 10 : // FIXME: buffering of DeltaLayerWriter
1207 10 : let mut per_blob_copy = Vec::new();
1208 10 :
1209 10 : let mut records = 0;
1210 :
1211 80 : while let Some(item) = stream.try_next().await? {
1212 70 : tracing::debug!(?item, "popped");
1213 70 : let offset = item
1214 70 : .offset()
1215 70 : .unwrap_or(BlobRef::new(self.index_start_offset(), false));
1216 :
1217 70 : let actionable = if let Some((key, lsn, start_offset)) = prev.take() {
1218 60 : let end_offset = offset;
1219 60 :
1220 60 : Some((BlobMeta { key, lsn }, start_offset..end_offset))
1221 : } else {
1222 10 : None
1223 : };
1224 :
1225 70 : let is_last = item.is_last();
1226 70 :
1227 70 : prev = Option::from(item);
1228 70 :
1229 70 : let actionable = actionable.filter(|x| x.0.lsn < until);
1230 :
1231 70 : let builder = if let Some((meta, offsets)) = actionable {
1232 : // extend or create a new builder
1233 32 : if read_builder
1234 32 : .as_mut()
1235 32 : .map(|x| x.extend(offsets.start.pos(), offsets.end.pos(), meta))
1236 32 : .unwrap_or(VectoredReadExtended::No)
1237 32 : == VectoredReadExtended::Yes
1238 : {
1239 16 : None
1240 : } else {
1241 16 : read_builder.replace(VectoredReadBuilder::new(
1242 16 : offsets.start.pos(),
1243 16 : offsets.end.pos(),
1244 16 : meta,
1245 16 : max_read_size,
1246 16 : ))
1247 : }
1248 : } else {
1249 : // nothing to do, except perhaps flush any existing for the last element
1250 38 : None
1251 : };
1252 :
1253 : // flush the possible older builder and also the new one if the item was the last one
1254 70 : let builders = builder.into_iter();
1255 70 : let builders = if is_last {
1256 10 : builders.chain(read_builder.take())
1257 : } else {
1258 60 : builders.chain(None)
1259 : };
1260 :
1261 86 : for builder in builders {
1262 16 : let read = builder.build();
1263 16 :
1264 16 : let reader = VectoredBlobReader::new(&self.file);
1265 16 :
1266 16 : let mut buf = buffer.take().unwrap();
1267 16 :
1268 16 : buf.clear();
1269 16 : buf.reserve(read.size());
1270 16 : let res = reader.read_blobs(&read, buf, ctx).await?;
1271 :
1272 48 : for blob in res.blobs {
1273 32 : let key = blob.meta.key;
1274 32 : let lsn = blob.meta.lsn;
1275 32 : let data = &res.buf[blob.start..blob.end];
1276 32 :
1277 32 : #[cfg(debug_assertions)]
1278 32 : Value::des(data)
1279 32 : .with_context(|| {
1280 0 : format!(
1281 0 : "blob failed to deserialize for {}@{}, {}..{}: {:?}",
1282 0 : blob.meta.key,
1283 0 : blob.meta.lsn,
1284 0 : blob.start,
1285 0 : blob.end,
1286 0 : utils::Hex(data)
1287 0 : )
1288 32 : })
1289 32 : .unwrap();
1290 32 :
1291 32 : // is it an image or will_init walrecord?
1292 32 : // FIXME: this could be handled by threading the BlobRef to the
1293 32 : // VectoredReadBuilder
1294 32 : let will_init = crate::repository::ValueBytes::will_init(data)
1295 32 : .inspect_err(|_e| {
1296 0 : #[cfg(feature = "testing")]
1297 0 : tracing::error!(data=?utils::Hex(data), err=?_e, %key, %lsn, "failed to parse will_init out of serialized value");
1298 32 : })
1299 32 : .unwrap_or(false);
1300 32 :
1301 32 : per_blob_copy.clear();
1302 32 : per_blob_copy.extend_from_slice(data);
1303 :
1304 32 : let (tmp, res) = writer
1305 32 : .put_value_bytes(
1306 32 : key,
1307 32 : lsn,
1308 32 : std::mem::take(&mut per_blob_copy).slice_len(),
1309 32 : will_init,
1310 32 : ctx,
1311 32 : )
1312 4 : .await;
1313 32 : per_blob_copy = tmp.into_raw_slice().into_inner();
1314 32 :
1315 32 : res?;
1316 :
1317 32 : records += 1;
1318 : }
1319 :
1320 16 : buffer = Some(res.buf);
1321 : }
1322 : }
1323 :
1324 10 : assert!(
1325 10 : read_builder.is_none(),
1326 0 : "with the sentinel above loop should had handled all"
1327 : );
1328 :
1329 10 : Ok(records)
1330 10 : }
1331 :
1332 4 : pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
1333 4 : println!(
1334 4 : "index_start_blk: {}, root {}",
1335 4 : self.index_start_blk, self.index_root_blk
1336 4 : );
1337 4 :
1338 4 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
1339 4 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1340 4 : self.index_start_blk,
1341 4 : self.index_root_blk,
1342 4 : block_reader,
1343 4 : );
1344 4 :
1345 4 : tree_reader.dump().await?;
1346 :
1347 4 : let keys = self.load_keys(ctx).await?;
1348 :
1349 8 : async fn dump_blob(val: &ValueRef<'_>, ctx: &RequestContext) -> anyhow::Result<String> {
1350 8 : let buf = val.load_raw(ctx).await?;
1351 8 : let val = Value::des(&buf)?;
1352 8 : let desc = match val {
1353 8 : Value::Image(img) => {
1354 8 : format!(" img {} bytes", img.len())
1355 : }
1356 0 : Value::WalRecord(rec) => {
1357 0 : let wal_desc = walrecord::describe_wal_record(&rec)?;
1358 0 : format!(
1359 0 : " rec {} bytes will_init: {} {}",
1360 0 : buf.len(),
1361 0 : rec.will_init(),
1362 0 : wal_desc
1363 0 : )
1364 : }
1365 : };
1366 8 : Ok(desc)
1367 8 : }
1368 :
1369 12 : for entry in keys {
1370 8 : let DeltaEntry { key, lsn, val, .. } = entry;
1371 8 : let desc = match dump_blob(&val, ctx).await {
1372 8 : Ok(desc) => desc,
1373 0 : Err(err) => {
1374 0 : format!("ERROR: {err}")
1375 : }
1376 : };
1377 8 : println!(" key {key} at {lsn}: {desc}");
1378 8 :
1379 8 : // Print more details about CHECKPOINT records. Would be nice to print details
1380 8 : // of many other record types too, but these are particularly interesting, as
1381 8 : // have a lot of special processing for them in walingest.rs.
1382 8 : use pageserver_api::key::CHECKPOINT_KEY;
1383 8 : use postgres_ffi::CheckPoint;
1384 8 : if key == CHECKPOINT_KEY {
1385 0 : let val = val.load(ctx).await?;
1386 0 : match val {
1387 0 : Value::Image(img) => {
1388 0 : let checkpoint = CheckPoint::decode(&img)?;
1389 0 : println!(" CHECKPOINT: {:?}", checkpoint);
1390 : }
1391 0 : Value::WalRecord(_rec) => {
1392 0 : println!(" unexpected walrecord value for checkpoint key");
1393 0 : }
1394 : }
1395 8 : }
1396 : }
1397 :
1398 4 : Ok(())
1399 4 : }
1400 :
1401 30 : fn stream_index_forwards<'a, R>(
1402 30 : &'a self,
1403 30 : reader: DiskBtreeReader<R, DELTA_KEY_SIZE>,
1404 30 : start: &'a [u8; DELTA_KEY_SIZE],
1405 30 : ctx: &'a RequestContext,
1406 30 : ) -> impl futures::stream::Stream<
1407 30 : Item = Result<(Key, Lsn, BlobRef), crate::tenant::disk_btree::DiskBtreeError>,
1408 30 : > + 'a
1409 30 : where
1410 30 : R: BlockReader + 'a,
1411 30 : {
1412 30 : use futures::stream::TryStreamExt;
1413 30 : let stream = reader.into_stream(start, ctx);
1414 152 : stream.map_ok(|(key, value)| {
1415 152 : let key = DeltaKey::from_slice(&key);
1416 152 : let (key, lsn) = (key.key(), key.lsn());
1417 152 : let offset = BlobRef(value);
1418 152 :
1419 152 : (key, lsn, offset)
1420 152 : })
1421 30 : }
1422 :
1423 : /// The file offset to the first block of index.
1424 : ///
1425 : /// The file structure is summary, values, and index. We often need this for the size of last blob.
1426 205438 : fn index_start_offset(&self) -> u64 {
1427 205438 : let offset = self.index_start_blk as u64 * PAGE_SZ as u64;
1428 205438 : let bref = BlobRef(offset);
1429 205438 : tracing::debug!(
1430 : index_start_blk = self.index_start_blk,
1431 : offset,
1432 0 : pos = bref.pos(),
1433 0 : "index_start_offset"
1434 : );
1435 205438 : offset
1436 205438 : }
1437 :
1438 538 : pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> DeltaLayerIterator<'a> {
1439 538 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
1440 538 : let tree_reader =
1441 538 : DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
1442 538 : DeltaLayerIterator {
1443 538 : delta_layer: self,
1444 538 : ctx,
1445 538 : index_iter: tree_reader.iter(&[0; DELTA_KEY_SIZE], ctx),
1446 538 : key_values_batch: std::collections::VecDeque::new(),
1447 538 : is_end: false,
1448 538 : planner: StreamingVectoredReadPlanner::new(
1449 538 : 1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
1450 538 : 1024, // The default value. Unit tests might use a different value
1451 538 : ),
1452 538 : }
1453 538 : }
1454 : }
1455 :
1456 : /// A set of data associated with a delta layer key and its value
1457 : pub struct DeltaEntry<'a> {
1458 : pub key: Key,
1459 : pub lsn: Lsn,
1460 : /// Size of the stored value
1461 : pub size: u64,
1462 : /// Reference to the on-disk value
1463 : pub val: ValueRef<'a>,
1464 : }
1465 :
1466 : /// Reference to an on-disk value
1467 : pub struct ValueRef<'a> {
1468 : blob_ref: BlobRef,
1469 : layer: &'a DeltaLayerInner,
1470 : }
1471 :
1472 : impl<'a> ValueRef<'a> {
1473 : /// Loads the value from disk
1474 2064038 : pub async fn load(&self, ctx: &RequestContext) -> Result<Value> {
1475 2064038 : let buf = self.load_raw(ctx).await?;
1476 2064038 : let val = Value::des(&buf)?;
1477 2064038 : Ok(val)
1478 2064038 : }
1479 :
1480 2064046 : async fn load_raw(&self, ctx: &RequestContext) -> Result<Vec<u8>> {
1481 2064046 : let reader = BlockCursor::new(crate::tenant::block_io::BlockReaderRef::Adapter(Adapter(
1482 2064046 : self.layer,
1483 2064046 : )));
1484 2064046 : let buf = reader.read_blob(self.blob_ref.pos(), ctx).await?;
1485 2064046 : Ok(buf)
1486 2064046 : }
1487 : }
1488 :
1489 : pub(crate) struct Adapter<T>(T);
1490 :
1491 : impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
1492 2083002 : pub(crate) async fn read_blk(
1493 2083002 : &self,
1494 2083002 : blknum: u32,
1495 2083002 : ctx: &RequestContext,
1496 2083002 : ) -> Result<BlockLease, std::io::Error> {
1497 2083002 : let block_reader = FileBlockReader::new(&self.0.as_ref().file, self.0.as_ref().file_id);
1498 2083002 : block_reader.read_blk(blknum, ctx).await
1499 2083002 : }
1500 : }
1501 :
1502 : impl AsRef<DeltaLayerInner> for DeltaLayerInner {
1503 4166004 : fn as_ref(&self) -> &DeltaLayerInner {
1504 4166004 : self
1505 4166004 : }
1506 : }
1507 :
1508 : impl<'a> pageserver_compaction::interface::CompactionDeltaEntry<'a, Key> for DeltaEntry<'a> {
1509 0 : fn key(&self) -> Key {
1510 0 : self.key
1511 0 : }
1512 0 : fn lsn(&self) -> Lsn {
1513 0 : self.lsn
1514 0 : }
1515 0 : fn size(&self) -> u64 {
1516 0 : self.size
1517 0 : }
1518 : }
1519 :
1520 : pub struct DeltaLayerIterator<'a> {
1521 : delta_layer: &'a DeltaLayerInner,
1522 : ctx: &'a RequestContext,
1523 : planner: StreamingVectoredReadPlanner,
1524 : index_iter: DiskBtreeIterator<'a>,
1525 : key_values_batch: VecDeque<(Key, Lsn, Value)>,
1526 : is_end: bool,
1527 : }
1528 :
1529 : impl<'a> DeltaLayerIterator<'a> {
1530 : /// Retrieve a batch of key-value pairs into the iterator buffer.
1531 21228 : async fn next_batch(&mut self) -> anyhow::Result<()> {
1532 21228 : assert!(self.key_values_batch.is_empty());
1533 21228 : assert!(!self.is_end);
1534 :
1535 21228 : let plan = loop {
1536 2099034 : if let Some(res) = self.index_iter.next().await {
1537 2098524 : let (raw_key, value) = res?;
1538 2098524 : let key = Key::from_slice(&raw_key[..KEY_SIZE]);
1539 2098524 : let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
1540 2098524 : let blob_ref = BlobRef(value);
1541 2098524 : let offset = blob_ref.pos();
1542 2098524 : if let Some(batch_plan) = self.planner.handle(key, lsn, offset) {
1543 20718 : break batch_plan;
1544 2077806 : }
1545 : } else {
1546 510 : self.is_end = true;
1547 510 : let data_end_offset = self.delta_layer.index_start_offset();
1548 510 : if let Some(item) = self.planner.handle_range_end(data_end_offset) {
1549 510 : break item;
1550 : } else {
1551 0 : return Ok(()); // TODO: test empty iterator
1552 : }
1553 : }
1554 : };
1555 21228 : let vectored_blob_reader = VectoredBlobReader::new(&self.delta_layer.file);
1556 21228 : let mut next_batch = std::collections::VecDeque::new();
1557 21228 : let buf_size = plan.size();
1558 21228 : let buf = BytesMut::with_capacity(buf_size);
1559 21228 : let blobs_buf = vectored_blob_reader
1560 21228 : .read_blobs(&plan, buf, self.ctx)
1561 10838 : .await?;
1562 21228 : let frozen_buf = blobs_buf.buf.freeze();
1563 2098496 : for meta in blobs_buf.blobs.iter() {
1564 2098496 : let value = Value::des(&frozen_buf[meta.start..meta.end])?;
1565 2098496 : next_batch.push_back((meta.meta.key, meta.meta.lsn, value));
1566 : }
1567 21228 : self.key_values_batch = next_batch;
1568 21228 : Ok(())
1569 21228 : }
1570 :
1571 2099218 : pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
1572 2099218 : if self.key_values_batch.is_empty() {
1573 22136 : if self.is_end {
1574 992 : return Ok(None);
1575 21144 : }
1576 21144 : self.next_batch().await?;
1577 2077082 : }
1578 2098226 : Ok(Some(
1579 2098226 : self.key_values_batch
1580 2098226 : .pop_front()
1581 2098226 : .expect("should not be empty"),
1582 2098226 : ))
1583 2099218 : }
1584 : }
1585 :
1586 : #[cfg(test)]
1587 : pub(crate) mod test {
1588 : use std::collections::BTreeMap;
1589 :
1590 : use itertools::MinMaxResult;
1591 : use rand::prelude::{SeedableRng, SliceRandom, StdRng};
1592 : use rand::RngCore;
1593 :
1594 : use super::*;
1595 : use crate::repository::Value;
1596 : use crate::tenant::harness::TIMELINE_ID;
1597 : use crate::tenant::storage_layer::{Layer, ResidentLayer};
1598 : use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner;
1599 : use crate::tenant::{Tenant, Timeline};
1600 : use crate::{
1601 : context::DownloadBehavior,
1602 : task_mgr::TaskKind,
1603 : tenant::{disk_btree::tests::TestDisk, harness::TenantHarness},
1604 : DEFAULT_PG_VERSION,
1605 : };
1606 : use bytes::Bytes;
1607 :
1608 : /// Construct an index for a fictional delta layer and and then
1609 : /// traverse in order to plan vectored reads for a query. Finally,
1610 : /// verify that the traversal fed the right index key and value
1611 : /// pairs into the planner.
1612 : #[tokio::test]
1613 2 : async fn test_delta_layer_index_traversal() {
1614 2 : let base_key = Key {
1615 2 : field1: 0,
1616 2 : field2: 1663,
1617 2 : field3: 12972,
1618 2 : field4: 16396,
1619 2 : field5: 0,
1620 2 : field6: 246080,
1621 2 : };
1622 2 :
1623 2 : // Populate the index with some entries
1624 2 : let entries: BTreeMap<Key, Vec<Lsn>> = BTreeMap::from([
1625 2 : (base_key, vec![Lsn(1), Lsn(5), Lsn(25), Lsn(26), Lsn(28)]),
1626 2 : (base_key.add(1), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
1627 2 : (base_key.add(2), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
1628 2 : (base_key.add(5), vec![Lsn(10), Lsn(15), Lsn(16), Lsn(20)]),
1629 2 : ]);
1630 2 :
1631 2 : let mut disk = TestDisk::default();
1632 2 : let mut writer = DiskBtreeBuilder::<_, DELTA_KEY_SIZE>::new(&mut disk);
1633 2 :
1634 2 : let mut disk_offset = 0;
1635 10 : for (key, lsns) in &entries {
1636 42 : for lsn in lsns {
1637 34 : let index_key = DeltaKey::from_key_lsn(key, *lsn);
1638 34 : let blob_ref = BlobRef::new(disk_offset, false);
1639 34 : writer
1640 34 : .append(&index_key.0, blob_ref.0)
1641 34 : .expect("In memory disk append should never fail");
1642 34 :
1643 34 : disk_offset += 1;
1644 34 : }
1645 2 : }
1646 2 :
1647 2 : // Prepare all the arguments for the call into `plan_reads` below
1648 2 : let (root_offset, _writer) = writer
1649 2 : .finish()
1650 2 : .expect("In memory disk finish should never fail");
1651 2 : let reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(0, root_offset, disk);
1652 2 : let planner = VectoredReadPlanner::new(100);
1653 2 : let mut reconstruct_state = ValuesReconstructState::new();
1654 2 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
1655 2 :
1656 2 : let keyspace = KeySpace {
1657 2 : ranges: vec![
1658 2 : base_key..base_key.add(3),
1659 2 : base_key.add(3)..base_key.add(100),
1660 2 : ],
1661 2 : };
1662 2 : let lsn_range = Lsn(2)..Lsn(40);
1663 2 :
1664 2 : // Plan and validate
1665 2 : let vectored_reads = DeltaLayerInner::plan_reads(
1666 2 : &keyspace,
1667 2 : lsn_range.clone(),
1668 2 : disk_offset,
1669 2 : reader,
1670 2 : planner,
1671 2 : &mut reconstruct_state,
1672 2 : &ctx,
1673 2 : )
1674 2 : .await
1675 2 : .expect("Read planning should not fail");
1676 2 :
1677 2 : validate(keyspace, lsn_range, vectored_reads, entries);
1678 2 : }
1679 :
1680 2 : fn validate(
1681 2 : keyspace: KeySpace,
1682 2 : lsn_range: Range<Lsn>,
1683 2 : vectored_reads: Vec<VectoredRead>,
1684 2 : index_entries: BTreeMap<Key, Vec<Lsn>>,
1685 2 : ) {
1686 2 : #[derive(Debug, PartialEq, Eq)]
1687 2 : struct BlobSpec {
1688 2 : key: Key,
1689 2 : lsn: Lsn,
1690 2 : at: u64,
1691 2 : }
1692 2 :
1693 2 : let mut planned_blobs = Vec::new();
1694 8 : for read in vectored_reads {
1695 28 : for (at, meta) in read.blobs_at.as_slice() {
1696 28 : planned_blobs.push(BlobSpec {
1697 28 : key: meta.key,
1698 28 : lsn: meta.lsn,
1699 28 : at: *at,
1700 28 : });
1701 28 : }
1702 : }
1703 :
1704 2 : let mut expected_blobs = Vec::new();
1705 2 : let mut disk_offset = 0;
1706 10 : for (key, lsns) in index_entries {
1707 42 : for lsn in lsns {
1708 42 : let key_included = keyspace.ranges.iter().any(|range| range.contains(&key));
1709 34 : let lsn_included = lsn_range.contains(&lsn);
1710 34 :
1711 34 : if key_included && lsn_included {
1712 28 : expected_blobs.push(BlobSpec {
1713 28 : key,
1714 28 : lsn,
1715 28 : at: disk_offset,
1716 28 : });
1717 28 : }
1718 :
1719 34 : disk_offset += 1;
1720 : }
1721 : }
1722 :
1723 2 : assert_eq!(planned_blobs, expected_blobs);
1724 2 : }
1725 :
1726 : mod constants {
1727 : use utils::lsn::Lsn;
1728 :
1729 : /// Offset used by all lsns in this test
1730 : pub(super) const LSN_OFFSET: Lsn = Lsn(0x08);
1731 : /// Number of unique keys including in the test data
1732 : pub(super) const KEY_COUNT: u8 = 60;
1733 : /// Max number of different lsns for each key
1734 : pub(super) const MAX_ENTRIES_PER_KEY: u8 = 20;
1735 : /// Possible value sizes for each key along with a probability weight
1736 : pub(super) const VALUE_SIZES: [(usize, u8); 3] = [(100, 2), (1024, 2), (1024 * 1024, 1)];
1737 : /// Probability that there will be a gap between the current key and the next one (33.3%)
1738 : pub(super) const KEY_GAP_CHANGES: [(bool, u8); 2] = [(true, 1), (false, 2)];
1739 : /// The minimum size of a key range in all the generated reads
1740 : pub(super) const MIN_RANGE_SIZE: i128 = 10;
1741 : /// The number of ranges included in each vectored read
1742 : pub(super) const RANGES_COUNT: u8 = 2;
1743 : /// The number of vectored reads performed
1744 : pub(super) const READS_COUNT: u8 = 100;
1745 : /// Soft max size of a vectored read. Will be violated if we have to read keys
1746 : /// with values larger than the limit
1747 : pub(super) const MAX_VECTORED_READ_BYTES: usize = 64 * 1024;
1748 : }
1749 :
1750 : struct Entry {
1751 : key: Key,
1752 : lsn: Lsn,
1753 : value: Vec<u8>,
1754 : }
1755 :
1756 2 : fn generate_entries(rng: &mut StdRng) -> Vec<Entry> {
1757 2 : let mut current_key = Key::MIN;
1758 2 :
1759 2 : let mut entries = Vec::new();
1760 122 : for _ in 0..constants::KEY_COUNT {
1761 120 : let count = rng.gen_range(1..constants::MAX_ENTRIES_PER_KEY);
1762 120 : let mut lsns_iter =
1763 2260 : std::iter::successors(Some(Lsn(constants::LSN_OFFSET.0 + 0x08)), |lsn| {
1764 2260 : Some(Lsn(lsn.0 + 0x08))
1765 2260 : });
1766 120 : let mut lsns = Vec::new();
1767 2380 : while lsns.len() < count as usize {
1768 2260 : let take = rng.gen_bool(0.5);
1769 2260 : let lsn = lsns_iter.next().unwrap();
1770 2260 : if take {
1771 1112 : lsns.push(lsn);
1772 1148 : }
1773 : }
1774 :
1775 1232 : for lsn in lsns {
1776 1112 : let size = constants::VALUE_SIZES
1777 3336 : .choose_weighted(rng, |item| item.1)
1778 1112 : .unwrap()
1779 1112 : .0;
1780 1112 : let mut buf = vec![0; size];
1781 1112 : rng.fill_bytes(&mut buf);
1782 1112 :
1783 1112 : entries.push(Entry {
1784 1112 : key: current_key,
1785 1112 : lsn,
1786 1112 : value: buf,
1787 1112 : })
1788 : }
1789 :
1790 120 : let gap = constants::KEY_GAP_CHANGES
1791 240 : .choose_weighted(rng, |item| item.1)
1792 120 : .unwrap()
1793 120 : .0;
1794 120 : if gap {
1795 38 : current_key = current_key.add(2);
1796 82 : } else {
1797 82 : current_key = current_key.add(1);
1798 82 : }
1799 : }
1800 :
1801 2 : entries
1802 2 : }
1803 :
1804 : struct EntriesMeta {
1805 : key_range: Range<Key>,
1806 : lsn_range: Range<Lsn>,
1807 : index: BTreeMap<(Key, Lsn), Vec<u8>>,
1808 : }
1809 :
1810 2 : fn get_entries_meta(entries: &[Entry]) -> EntriesMeta {
1811 1112 : let key_range = match entries.iter().minmax_by_key(|e| e.key) {
1812 2 : MinMaxResult::MinMax(min, max) => min.key..max.key.next(),
1813 0 : _ => panic!("More than one entry is always expected"),
1814 : };
1815 :
1816 1112 : let lsn_range = match entries.iter().minmax_by_key(|e| e.lsn) {
1817 2 : MinMaxResult::MinMax(min, max) => min.lsn..Lsn(max.lsn.0 + 1),
1818 0 : _ => panic!("More than one entry is always expected"),
1819 : };
1820 :
1821 2 : let mut index = BTreeMap::new();
1822 1112 : for entry in entries.iter() {
1823 1112 : index.insert((entry.key, entry.lsn), entry.value.clone());
1824 1112 : }
1825 :
1826 2 : EntriesMeta {
1827 2 : key_range,
1828 2 : lsn_range,
1829 2 : index,
1830 2 : }
1831 2 : }
1832 :
1833 200 : fn pick_random_keyspace(rng: &mut StdRng, key_range: &Range<Key>) -> KeySpace {
1834 200 : let start = key_range.start.to_i128();
1835 200 : let end = key_range.end.to_i128();
1836 200 :
1837 200 : let mut keyspace = KeySpace::default();
1838 :
1839 600 : for _ in 0..constants::RANGES_COUNT {
1840 400 : let mut range: Option<Range<Key>> = Option::default();
1841 1244 : while range.is_none() || keyspace.overlaps(range.as_ref().unwrap()) {
1842 844 : let range_start = rng.gen_range(start..end);
1843 844 : let range_end_offset = range_start + constants::MIN_RANGE_SIZE;
1844 844 : if range_end_offset >= end {
1845 100 : range = Some(Key::from_i128(range_start)..Key::from_i128(end));
1846 744 : } else {
1847 744 : let range_end = rng.gen_range((range_start + constants::MIN_RANGE_SIZE)..end);
1848 744 : range = Some(Key::from_i128(range_start)..Key::from_i128(range_end));
1849 744 : }
1850 : }
1851 400 : keyspace.ranges.push(range.unwrap());
1852 : }
1853 :
1854 200 : keyspace
1855 200 : }
1856 :
1857 : #[tokio::test]
1858 2 : async fn test_delta_layer_vectored_read_end_to_end() -> anyhow::Result<()> {
1859 2 : let harness = TenantHarness::create("test_delta_layer_oversized_vectored_read").await?;
1860 8 : let (tenant, ctx) = harness.load().await;
1861 2 :
1862 2 : let timeline_id = TimelineId::generate();
1863 2 : let timeline = tenant
1864 2 : .create_test_timeline(timeline_id, constants::LSN_OFFSET, DEFAULT_PG_VERSION, &ctx)
1865 4 : .await?;
1866 2 :
1867 2 : tracing::info!("Generating test data ...");
1868 2 :
1869 2 : let rng = &mut StdRng::seed_from_u64(0);
1870 2 : let entries = generate_entries(rng);
1871 2 : let entries_meta = get_entries_meta(&entries);
1872 2 :
1873 2 : tracing::info!("Done generating {} entries", entries.len());
1874 2 :
1875 2 : tracing::info!("Writing test data to delta layer ...");
1876 2 : let mut writer = DeltaLayerWriter::new(
1877 2 : harness.conf,
1878 2 : timeline_id,
1879 2 : harness.tenant_shard_id,
1880 2 : entries_meta.key_range.start,
1881 2 : entries_meta.lsn_range.clone(),
1882 2 : &ctx,
1883 2 : )
1884 2 : .await?;
1885 2 :
1886 1114 : for entry in entries {
1887 1112 : let (_, res) = writer
1888 1112 : .put_value_bytes(entry.key, entry.lsn, entry.value.slice_len(), false, &ctx)
1889 215 : .await;
1890 1112 : res?;
1891 2 : }
1892 2 :
1893 5 : let (desc, path) = writer.finish(entries_meta.key_range.end, &ctx).await?;
1894 2 : let resident = Layer::finish_creating(harness.conf, &timeline, desc, &path)?;
1895 2 :
1896 2 : let inner = resident.get_as_delta(&ctx).await?;
1897 2 :
1898 2 : let file_size = inner.file.metadata().await?.len();
1899 2 : tracing::info!(
1900 2 : "Done writing test data to delta layer. Resulting file size is: {}",
1901 2 : file_size
1902 2 : );
1903 2 :
1904 202 : for i in 0..constants::READS_COUNT {
1905 200 : tracing::info!("Doing vectored read {}/{}", i + 1, constants::READS_COUNT);
1906 2 :
1907 200 : let block_reader = FileBlockReader::new(&inner.file, inner.file_id);
1908 200 : let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1909 200 : inner.index_start_blk,
1910 200 : inner.index_root_blk,
1911 200 : block_reader,
1912 200 : );
1913 200 :
1914 200 : let planner = VectoredReadPlanner::new(constants::MAX_VECTORED_READ_BYTES);
1915 200 : let mut reconstruct_state = ValuesReconstructState::new();
1916 200 : let keyspace = pick_random_keyspace(rng, &entries_meta.key_range);
1917 200 : let data_end_offset = inner.index_start_blk as u64 * PAGE_SZ as u64;
1918 2 :
1919 200 : let vectored_reads = DeltaLayerInner::plan_reads(
1920 200 : &keyspace,
1921 200 : entries_meta.lsn_range.clone(),
1922 200 : data_end_offset,
1923 200 : index_reader,
1924 200 : planner,
1925 200 : &mut reconstruct_state,
1926 200 : &ctx,
1927 200 : )
1928 4 : .await?;
1929 2 :
1930 200 : let vectored_blob_reader = VectoredBlobReader::new(&inner.file);
1931 200 : let buf_size = DeltaLayerInner::get_min_read_buffer_size(
1932 200 : &vectored_reads,
1933 200 : constants::MAX_VECTORED_READ_BYTES,
1934 200 : );
1935 200 : let mut buf = Some(BytesMut::with_capacity(buf_size));
1936 2 :
1937 19924 : for read in vectored_reads {
1938 19724 : let blobs_buf = vectored_blob_reader
1939 19724 : .read_blobs(&read, buf.take().expect("Should have a buffer"), &ctx)
1940 10016 : .await?;
1941 57304 : for meta in blobs_buf.blobs.iter() {
1942 57304 : let value = &blobs_buf.buf[meta.start..meta.end];
1943 57304 : assert_eq!(value, entries_meta.index[&(meta.meta.key, meta.meta.lsn)]);
1944 2 : }
1945 2 :
1946 19724 : buf = Some(blobs_buf.buf);
1947 2 : }
1948 2 : }
1949 2 :
1950 2 : Ok(())
1951 2 : }
1952 :
1953 : #[tokio::test]
1954 2 : async fn copy_delta_prefix_smoke() {
1955 2 : use crate::walrecord::NeonWalRecord;
1956 2 : use bytes::Bytes;
1957 2 :
1958 2 : let h = crate::tenant::harness::TenantHarness::create("truncate_delta_smoke")
1959 2 : .await
1960 2 : .unwrap();
1961 8 : let (tenant, ctx) = h.load().await;
1962 2 : let ctx = &ctx;
1963 2 : let timeline = tenant
1964 2 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, ctx)
1965 4 : .await
1966 2 : .unwrap();
1967 2 :
1968 2 : let initdb_layer = timeline
1969 2 : .layers
1970 2 : .read()
1971 2 : .await
1972 2 : .likely_resident_layers()
1973 2 : .next()
1974 2 : .cloned()
1975 2 : .unwrap();
1976 2 :
1977 2 : {
1978 2 : let mut writer = timeline.writer().await;
1979 2 :
1980 2 : let data = [
1981 2 : (0x20, 12, Value::Image(Bytes::from_static(b"foobar"))),
1982 2 : (
1983 2 : 0x30,
1984 2 : 12,
1985 2 : Value::WalRecord(NeonWalRecord::Postgres {
1986 2 : will_init: false,
1987 2 : rec: Bytes::from_static(b"1"),
1988 2 : }),
1989 2 : ),
1990 2 : (
1991 2 : 0x40,
1992 2 : 12,
1993 2 : Value::WalRecord(NeonWalRecord::Postgres {
1994 2 : will_init: true,
1995 2 : rec: Bytes::from_static(b"2"),
1996 2 : }),
1997 2 : ),
1998 2 : // build an oversized value so we cannot extend and existing read over
1999 2 : // this
2000 2 : (
2001 2 : 0x50,
2002 2 : 12,
2003 2 : Value::WalRecord(NeonWalRecord::Postgres {
2004 2 : will_init: true,
2005 2 : rec: {
2006 2 : let mut buf =
2007 2 : vec![0u8; tenant.conf.max_vectored_read_bytes.0.get() + 1024];
2008 2 : buf.iter_mut()
2009 2 : .enumerate()
2010 264192 : .for_each(|(i, slot)| *slot = (i % 256) as u8);
2011 2 : Bytes::from(buf)
2012 2 : },
2013 2 : }),
2014 2 : ),
2015 2 : // because the oversized read cannot be extended further, we are sure to exercise the
2016 2 : // builder created on the last round with this:
2017 2 : (
2018 2 : 0x60,
2019 2 : 12,
2020 2 : Value::WalRecord(NeonWalRecord::Postgres {
2021 2 : will_init: true,
2022 2 : rec: Bytes::from_static(b"3"),
2023 2 : }),
2024 2 : ),
2025 2 : (
2026 2 : 0x60,
2027 2 : 9,
2028 2 : Value::Image(Bytes::from_static(b"something for a different key")),
2029 2 : ),
2030 2 : ];
2031 2 :
2032 2 : let mut last_lsn = None;
2033 2 :
2034 14 : for (lsn, key, value) in data {
2035 12 : let key = Key::from_i128(key);
2036 12 : writer.put(key, Lsn(lsn), &value, ctx).await.unwrap();
2037 12 : last_lsn = Some(lsn);
2038 2 : }
2039 2 :
2040 2 : writer.finish_write(Lsn(last_lsn.unwrap()));
2041 2 : }
2042 2 : timeline.freeze_and_flush().await.unwrap();
2043 2 :
2044 2 : let new_layer = timeline
2045 2 : .layers
2046 2 : .read()
2047 2 : .await
2048 2 : .likely_resident_layers()
2049 3 : .find(|&x| x != &initdb_layer)
2050 2 : .cloned()
2051 2 : .unwrap();
2052 2 :
2053 2 : // create a copy for the timeline, so we don't overwrite the file
2054 2 : let branch = tenant
2055 2 : .branch_timeline_test(&timeline, TimelineId::generate(), None, ctx)
2056 2 : .await
2057 2 : .unwrap();
2058 2 :
2059 2 : assert_eq!(branch.get_ancestor_lsn(), Lsn(0x60));
2060 2 :
2061 2 : // truncating at 0x61 gives us a full copy, otherwise just go backwards until there's just
2062 2 : // a single key
2063 2 :
2064 12 : for truncate_at in [0x61, 0x51, 0x41, 0x31, 0x21] {
2065 10 : let truncate_at = Lsn(truncate_at);
2066 2 :
2067 10 : let mut writer = DeltaLayerWriter::new(
2068 10 : tenant.conf,
2069 10 : branch.timeline_id,
2070 10 : tenant.tenant_shard_id,
2071 10 : Key::MIN,
2072 10 : Lsn(0x11)..truncate_at,
2073 10 : ctx,
2074 10 : )
2075 5 : .await
2076 10 : .unwrap();
2077 2 :
2078 10 : let new_layer = new_layer.download_and_keep_resident().await.unwrap();
2079 10 :
2080 10 : new_layer
2081 10 : .copy_delta_prefix(&mut writer, truncate_at, ctx)
2082 15 : .await
2083 10 : .unwrap();
2084 2 :
2085 24 : let (desc, path) = writer.finish(Key::MAX, ctx).await.unwrap();
2086 10 : let copied_layer = Layer::finish_creating(tenant.conf, &branch, desc, &path).unwrap();
2087 10 :
2088 10 : copied_layer.get_as_delta(ctx).await.unwrap();
2089 10 :
2090 10 : assert_keys_and_values_eq(
2091 10 : new_layer.get_as_delta(ctx).await.unwrap(),
2092 10 : copied_layer.get_as_delta(ctx).await.unwrap(),
2093 10 : truncate_at,
2094 10 : ctx,
2095 2 : )
2096 56 : .await;
2097 2 : }
2098 2 : }
2099 :
2100 10 : async fn assert_keys_and_values_eq(
2101 10 : source: &DeltaLayerInner,
2102 10 : truncated: &DeltaLayerInner,
2103 10 : truncated_at: Lsn,
2104 10 : ctx: &RequestContext,
2105 10 : ) {
2106 10 : use futures::future::ready;
2107 10 : use futures::stream::TryStreamExt;
2108 10 :
2109 10 : let start_key = [0u8; DELTA_KEY_SIZE];
2110 10 :
2111 10 : let source_reader = FileBlockReader::new(&source.file, source.file_id);
2112 10 : let source_tree = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
2113 10 : source.index_start_blk,
2114 10 : source.index_root_blk,
2115 10 : &source_reader,
2116 10 : );
2117 10 : let source_stream = source.stream_index_forwards(source_tree, &start_key, ctx);
2118 60 : let source_stream = source_stream.filter(|res| match res {
2119 60 : Ok((_, lsn, _)) => ready(lsn < &truncated_at),
2120 0 : _ => ready(true),
2121 60 : });
2122 10 : let mut source_stream = std::pin::pin!(source_stream);
2123 10 :
2124 10 : let truncated_reader = FileBlockReader::new(&truncated.file, truncated.file_id);
2125 10 : let truncated_tree = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
2126 10 : truncated.index_start_blk,
2127 10 : truncated.index_root_blk,
2128 10 : &truncated_reader,
2129 10 : );
2130 10 : let truncated_stream = truncated.stream_index_forwards(truncated_tree, &start_key, ctx);
2131 10 : let mut truncated_stream = std::pin::pin!(truncated_stream);
2132 10 :
2133 10 : let mut scratch_left = Vec::new();
2134 10 : let mut scratch_right = Vec::new();
2135 :
2136 : loop {
2137 42 : let (src, truncated) = (source_stream.try_next(), truncated_stream.try_next());
2138 42 : let (src, truncated) = tokio::try_join!(src, truncated).unwrap();
2139 42 :
2140 42 : if src.is_none() {
2141 10 : assert!(truncated.is_none());
2142 10 : break;
2143 32 : }
2144 32 :
2145 32 : let (src, truncated) = (src.unwrap(), truncated.unwrap());
2146 32 :
2147 32 : // because we've filtered the source with Lsn, we should always have the same keys from both.
2148 32 : assert_eq!(src.0, truncated.0);
2149 32 : assert_eq!(src.1, truncated.1);
2150 :
2151 : // if this is needed for something else, just drop this assert.
2152 32 : assert!(
2153 32 : src.2.pos() >= truncated.2.pos(),
2154 0 : "value position should not go backwards {} vs. {}",
2155 0 : src.2.pos(),
2156 0 : truncated.2.pos()
2157 : );
2158 :
2159 32 : scratch_left.clear();
2160 32 : let src_cursor = source_reader.block_cursor();
2161 32 : let left = src_cursor.read_blob_into_buf(src.2.pos(), &mut scratch_left, ctx);
2162 32 : scratch_right.clear();
2163 32 : let trunc_cursor = truncated_reader.block_cursor();
2164 32 : let right = trunc_cursor.read_blob_into_buf(truncated.2.pos(), &mut scratch_right, ctx);
2165 :
2166 32 : tokio::try_join!(left, right).unwrap();
2167 32 :
2168 32 : assert_eq!(utils::Hex(&scratch_left), utils::Hex(&scratch_right));
2169 : }
2170 10 : }
2171 :
2172 18026 : pub(crate) fn sort_delta(
2173 18026 : (k1, l1, _): &(Key, Lsn, Value),
2174 18026 : (k2, l2, _): &(Key, Lsn, Value),
2175 18026 : ) -> std::cmp::Ordering {
2176 18026 : (k1, l1).cmp(&(k2, l2))
2177 18026 : }
2178 :
2179 94 : pub(crate) fn sort_delta_value(
2180 94 : (k1, l1, v1): &(Key, Lsn, Value),
2181 94 : (k2, l2, v2): &(Key, Lsn, Value),
2182 94 : ) -> std::cmp::Ordering {
2183 94 : let order_1 = if v1.is_image() { 0 } else { 1 };
2184 94 : let order_2 = if v2.is_image() { 0 } else { 1 };
2185 94 : (k1, l1, order_1).cmp(&(k2, l2, order_2))
2186 94 : }
2187 :
2188 20 : pub(crate) async fn produce_delta_layer(
2189 20 : tenant: &Tenant,
2190 20 : tline: &Arc<Timeline>,
2191 20 : mut deltas: Vec<(Key, Lsn, Value)>,
2192 20 : ctx: &RequestContext,
2193 20 : ) -> anyhow::Result<ResidentLayer> {
2194 20 : deltas.sort_by(sort_delta);
2195 20 : let (key_start, _, _) = deltas.first().unwrap();
2196 20 : let (key_max, _, _) = deltas.last().unwrap();
2197 8040 : let lsn_min = deltas.iter().map(|(_, lsn, _)| lsn).min().unwrap();
2198 8040 : let lsn_max = deltas.iter().map(|(_, lsn, _)| lsn).max().unwrap();
2199 20 : let lsn_end = Lsn(lsn_max.0 + 1);
2200 20 : let mut writer = DeltaLayerWriter::new(
2201 20 : tenant.conf,
2202 20 : tline.timeline_id,
2203 20 : tenant.tenant_shard_id,
2204 20 : *key_start,
2205 20 : (*lsn_min)..lsn_end,
2206 20 : ctx,
2207 20 : )
2208 10 : .await?;
2209 20 : let key_end = key_max.next();
2210 :
2211 8060 : for (key, lsn, value) in deltas {
2212 8040 : writer.put_value(key, lsn, value, ctx).await?;
2213 : }
2214 :
2215 58 : let (desc, path) = writer.finish(key_end, ctx).await?;
2216 20 : let delta_layer = Layer::finish_creating(tenant.conf, tline, desc, &path)?;
2217 :
2218 20 : Ok::<_, anyhow::Error>(delta_layer)
2219 20 : }
2220 :
2221 28 : async fn assert_delta_iter_equal(
2222 28 : delta_iter: &mut DeltaLayerIterator<'_>,
2223 28 : expect: &[(Key, Lsn, Value)],
2224 28 : ) {
2225 28 : let mut expect_iter = expect.iter();
2226 : loop {
2227 28028 : let o1 = delta_iter.next().await.unwrap();
2228 28028 : let o2 = expect_iter.next();
2229 28028 : assert_eq!(o1.is_some(), o2.is_some());
2230 28028 : if o1.is_none() && o2.is_none() {
2231 28 : break;
2232 28000 : }
2233 28000 : let (k1, l1, v1) = o1.unwrap();
2234 28000 : let (k2, l2, v2) = o2.unwrap();
2235 28000 : assert_eq!(&k1, k2);
2236 28000 : assert_eq!(l1, *l2);
2237 28000 : assert_eq!(&v1, v2);
2238 : }
2239 28 : }
2240 :
2241 : #[tokio::test]
2242 2 : async fn delta_layer_iterator() {
2243 2 : let harness = TenantHarness::create("delta_layer_iterator").await.unwrap();
2244 8 : let (tenant, ctx) = harness.load().await;
2245 2 :
2246 2 : let tline = tenant
2247 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
2248 4 : .await
2249 2 : .unwrap();
2250 2 :
2251 2000 : fn get_key(id: u32) -> Key {
2252 2000 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
2253 2000 : key.field6 = id;
2254 2000 : key
2255 2000 : }
2256 2 : const N: usize = 1000;
2257 2 : let test_deltas = (0..N)
2258 2000 : .map(|idx| {
2259 2000 : (
2260 2000 : get_key(idx as u32 / 10),
2261 2000 : Lsn(0x10 * ((idx as u64) % 10 + 1)),
2262 2000 : Value::Image(Bytes::from(format!("img{idx:05}"))),
2263 2000 : )
2264 2000 : })
2265 2 : .collect_vec();
2266 2 : let resident_layer = produce_delta_layer(&tenant, &tline, test_deltas.clone(), &ctx)
2267 8 : .await
2268 2 : .unwrap();
2269 2 : let delta_layer = resident_layer.get_as_delta(&ctx).await.unwrap();
2270 6 : for max_read_size in [1, 1024] {
2271 32 : for batch_size in [1, 2, 4, 8, 3, 7, 13] {
2272 28 : println!("running with batch_size={batch_size} max_read_size={max_read_size}");
2273 28 : // Test if the batch size is correctly determined
2274 28 : let mut iter = delta_layer.iter(&ctx);
2275 28 : iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
2276 28 : let mut num_items = 0;
2277 112 : for _ in 0..3 {
2278 84 : iter.next_batch().await.unwrap();
2279 84 : num_items += iter.key_values_batch.len();
2280 84 : if max_read_size == 1 {
2281 2 : // every key should be a batch b/c the value is larger than max_read_size
2282 42 : assert_eq!(iter.key_values_batch.len(), 1);
2283 2 : } else {
2284 42 : assert_eq!(iter.key_values_batch.len(), batch_size);
2285 2 : }
2286 84 : if num_items >= N {
2287 2 : break;
2288 84 : }
2289 84 : iter.key_values_batch.clear();
2290 2 : }
2291 2 : // Test if the result is correct
2292 28 : let mut iter = delta_layer.iter(&ctx);
2293 28 : iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
2294 9577 : assert_delta_iter_equal(&mut iter, &test_deltas).await;
2295 2 : }
2296 2 : }
2297 2 : }
2298 : }
|