Line data Source code
1 : //! A DeltaLayer represents a collection of WAL records or page images in a range of
2 : //! LSNs, and in a range of Keys. It is stored on a file on disk.
3 : //!
4 : //! Usually a delta layer only contains differences, in the form of WAL records
5 : //! against a base LSN. However, if a relation is extended or a whole new relation
6 : //! is created, there would be no base for the new pages. The entries for them
7 : //! must be page images or WAL records with the 'will_init' flag set, so that
8 : //! they can be replayed without referring to an older page version.
9 : //!
10 : //! The delta files are stored in `timelines/<timeline_id>` directory. Currently,
11 : //! there are no subdirectories, and each delta file is named like this:
12 : //!
13 : //! ```text
14 : //! <key start>-<key end>__<start LSN>-<end LSN>
15 : //! ```
16 : //!
17 : //! For example:
18 : //!
19 : //! ```text
20 : //! 000000067F000032BE0000400000000020B6-000000067F000032BE0000400000000030B6__00000000578C6B29-0000000057A50051
21 : //! ```
22 : //!
23 : //! Every delta file consists of three parts: "summary", "values", and
24 : //! "index". The summary is a fixed size header at the beginning of the file,
25 : //! and it contains basic information about the layer, and offsets to the other
26 : //! parts. The "index" is a B-tree, mapping from Key and LSN to an offset in the
27 : //! "values" part. The actual page images and WAL records are stored in the
28 : //! "values" part.
29 : //!
30 : use crate::config::PageServerConf;
31 : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
32 : use crate::page_cache::{self, FileId, PAGE_SZ};
33 : use crate::repository::{Key, Value, KEY_SIZE};
34 : use crate::tenant::blob_io::BlobWriter;
35 : use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
36 : use crate::tenant::disk_btree::{
37 : DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
38 : };
39 : use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
40 : use crate::tenant::timeline::GetVectoredError;
41 : use crate::tenant::vectored_blob_io::{
42 : BlobFlag, MaxVectoredReadBytes, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
43 : VectoredReadPlanner,
44 : };
45 : use crate::tenant::{PageReconstructError, Timeline};
46 : use crate::virtual_file::{self, VirtualFile};
47 : use crate::{walrecord, TEMP_FILE_SUFFIX};
48 : use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
49 : use anyhow::{anyhow, bail, ensure, Context, Result};
50 : use bytes::BytesMut;
51 : use camino::{Utf8Path, Utf8PathBuf};
52 : use futures::StreamExt;
53 : use itertools::Itertools;
54 : use pageserver_api::keyspace::KeySpace;
55 : use pageserver_api::models::{ImageCompressionAlgorithm, LayerAccessKind};
56 : use pageserver_api::shard::TenantShardId;
57 : use rand::{distributions::Alphanumeric, Rng};
58 : use serde::{Deserialize, Serialize};
59 : use std::collections::VecDeque;
60 : use std::fs::File;
61 : use std::io::SeekFrom;
62 : use std::ops::Range;
63 : use std::os::unix::fs::FileExt;
64 : use std::str::FromStr;
65 : use std::sync::Arc;
66 : use tokio::sync::OnceCell;
67 : use tracing::*;
68 :
69 : use utils::{
70 : bin_ser::BeSer,
71 : id::{TenantId, TimelineId},
72 : lsn::Lsn,
73 : };
74 :
75 : use super::{
76 : AsLayerDesc, LayerAccessStats, LayerName, PersistentLayerDesc, ResidentLayer,
77 : ValuesReconstructState,
78 : };
79 :
80 : ///
81 : /// Header stored in the beginning of the file
82 : ///
83 : /// After this comes the 'values' part, starting on block 1. After that,
84 : /// the 'index' starts at the block indicated by 'index_start_blk'
85 : ///
86 1018 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
87 : pub struct Summary {
88 : /// Magic value to identify this as a neon delta file. Always DELTA_FILE_MAGIC.
89 : pub magic: u16,
90 : pub format_version: u16,
91 :
92 : pub tenant_id: TenantId,
93 : pub timeline_id: TimelineId,
94 : pub key_range: Range<Key>,
95 : pub lsn_range: Range<Lsn>,
96 :
97 : /// Block number where the 'index' part of the file begins.
98 : pub index_start_blk: u32,
99 : /// Block within the 'index', where the B-tree root page is stored
100 : pub index_root_blk: u32,
101 : }
102 :
103 : impl From<&DeltaLayer> for Summary {
104 0 : fn from(layer: &DeltaLayer) -> Self {
105 0 : Self::expected(
106 0 : layer.desc.tenant_shard_id.tenant_id,
107 0 : layer.desc.timeline_id,
108 0 : layer.desc.key_range.clone(),
109 0 : layer.desc.lsn_range.clone(),
110 0 : )
111 0 : }
112 : }
113 :
114 : impl Summary {
115 1018 : pub(super) fn expected(
116 1018 : tenant_id: TenantId,
117 1018 : timeline_id: TimelineId,
118 1018 : keys: Range<Key>,
119 1018 : lsns: Range<Lsn>,
120 1018 : ) -> Self {
121 1018 : Self {
122 1018 : magic: DELTA_FILE_MAGIC,
123 1018 : format_version: STORAGE_FORMAT_VERSION,
124 1018 :
125 1018 : tenant_id,
126 1018 : timeline_id,
127 1018 : key_range: keys,
128 1018 : lsn_range: lsns,
129 1018 :
130 1018 : index_start_blk: 0,
131 1018 : index_root_blk: 0,
132 1018 : }
133 1018 : }
134 : }
135 :
136 : // Flag indicating that this version initialize the page
137 : const WILL_INIT: u64 = 1;
138 :
139 : /// Struct representing reference to BLOB in layers. Reference contains BLOB
140 : /// offset, and for WAL records it also contains `will_init` flag. The flag
141 : /// helps to determine the range of records that needs to be applied, without
142 : /// reading/deserializing records themselves.
143 0 : #[derive(Debug, Serialize, Deserialize, Copy, Clone)]
144 : pub struct BlobRef(pub u64);
145 :
146 : impl BlobRef {
147 240547 : pub fn will_init(&self) -> bool {
148 240547 : (self.0 & WILL_INIT) != 0
149 240547 : }
150 :
151 4474241 : pub fn pos(&self) -> u64 {
152 4474241 : self.0 >> 1
153 4474241 : }
154 :
155 6458920 : pub fn new(pos: u64, will_init: bool) -> BlobRef {
156 6458920 : let mut blob_ref = pos << 1;
157 6458920 : if will_init {
158 6457656 : blob_ref |= WILL_INIT;
159 6457656 : }
160 6458920 : BlobRef(blob_ref)
161 6458920 : }
162 : }
163 :
164 : pub const DELTA_KEY_SIZE: usize = KEY_SIZE + 8;
165 : struct DeltaKey([u8; DELTA_KEY_SIZE]);
166 :
167 : /// This is the key of the B-tree index stored in the delta layer. It consists
168 : /// of the serialized representation of a Key and LSN.
169 : impl DeltaKey {
170 2064198 : fn from_slice(buf: &[u8]) -> Self {
171 2064198 : let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
172 2064198 : bytes.copy_from_slice(buf);
173 2064198 : DeltaKey(bytes)
174 2064198 : }
175 :
176 6705952 : fn from_key_lsn(key: &Key, lsn: Lsn) -> Self {
177 6705952 : let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
178 6705952 : key.write_to_byte_slice(&mut bytes[0..KEY_SIZE]);
179 6705952 : bytes[KEY_SIZE..].copy_from_slice(&u64::to_be_bytes(lsn.0));
180 6705952 : DeltaKey(bytes)
181 6705952 : }
182 :
183 2064198 : fn key(&self) -> Key {
184 2064198 : Key::from_slice(&self.0)
185 2064198 : }
186 :
187 2064198 : fn lsn(&self) -> Lsn {
188 2064198 : Lsn(u64::from_be_bytes(self.0[KEY_SIZE..].try_into().unwrap()))
189 2064198 : }
190 :
191 345977 : fn extract_lsn_from_buf(buf: &[u8]) -> Lsn {
192 345977 : let mut lsn_buf = [0u8; 8];
193 345977 : lsn_buf.copy_from_slice(&buf[KEY_SIZE..]);
194 345977 : Lsn(u64::from_be_bytes(lsn_buf))
195 345977 : }
196 : }
197 :
198 : /// This is used only from `pagectl`. Within pageserver, all layers are
199 : /// [`crate::tenant::storage_layer::Layer`], which can hold a [`DeltaLayerInner`].
200 : pub struct DeltaLayer {
201 : path: Utf8PathBuf,
202 : pub desc: PersistentLayerDesc,
203 : access_stats: LayerAccessStats,
204 : inner: OnceCell<Arc<DeltaLayerInner>>,
205 : }
206 :
207 : impl std::fmt::Debug for DeltaLayer {
208 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
209 0 : use super::RangeDisplayDebug;
210 0 :
211 0 : f.debug_struct("DeltaLayer")
212 0 : .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
213 0 : .field("lsn_range", &self.desc.lsn_range)
214 0 : .field("file_size", &self.desc.file_size)
215 0 : .field("inner", &self.inner)
216 0 : .finish()
217 0 : }
218 : }
219 :
220 : /// `DeltaLayerInner` is the in-memory data structure associated with an on-disk delta
221 : /// file.
222 : pub struct DeltaLayerInner {
223 : // values copied from summary
224 : index_start_blk: u32,
225 : index_root_blk: u32,
226 :
227 : file: VirtualFile,
228 : file_id: FileId,
229 :
230 : #[allow(dead_code)]
231 : layer_key_range: Range<Key>,
232 : #[allow(dead_code)]
233 : layer_lsn_range: Range<Lsn>,
234 :
235 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
236 : }
237 :
238 : impl std::fmt::Debug for DeltaLayerInner {
239 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
240 0 : f.debug_struct("DeltaLayerInner")
241 0 : .field("index_start_blk", &self.index_start_blk)
242 0 : .field("index_root_blk", &self.index_root_blk)
243 0 : .finish()
244 0 : }
245 : }
246 :
247 : /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
248 : impl std::fmt::Display for DeltaLayer {
249 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
250 0 : write!(f, "{}", self.layer_desc().short_id())
251 0 : }
252 : }
253 :
254 : impl AsLayerDesc for DeltaLayer {
255 0 : fn layer_desc(&self) -> &PersistentLayerDesc {
256 0 : &self.desc
257 0 : }
258 : }
259 :
260 : impl DeltaLayer {
261 0 : pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
262 0 : self.desc.dump();
263 0 :
264 0 : if !verbose {
265 0 : return Ok(());
266 0 : }
267 :
268 0 : let inner = self.load(LayerAccessKind::Dump, ctx).await?;
269 :
270 0 : inner.dump(ctx).await
271 0 : }
272 :
273 1346 : fn temp_path_for(
274 1346 : conf: &PageServerConf,
275 1346 : tenant_shard_id: &TenantShardId,
276 1346 : timeline_id: &TimelineId,
277 1346 : key_start: Key,
278 1346 : lsn_range: &Range<Lsn>,
279 1346 : ) -> Utf8PathBuf {
280 1346 : let rand_string: String = rand::thread_rng()
281 1346 : .sample_iter(&Alphanumeric)
282 1346 : .take(8)
283 1346 : .map(char::from)
284 1346 : .collect();
285 1346 :
286 1346 : conf.timeline_path(tenant_shard_id, timeline_id)
287 1346 : .join(format!(
288 1346 : "{}-XXX__{:016X}-{:016X}.{}.{}",
289 1346 : key_start,
290 1346 : u64::from(lsn_range.start),
291 1346 : u64::from(lsn_range.end),
292 1346 : rand_string,
293 1346 : TEMP_FILE_SUFFIX,
294 1346 : ))
295 1346 : }
296 :
297 : ///
298 : /// Open the underlying file and read the metadata into memory, if it's
299 : /// not loaded already.
300 : ///
301 0 : async fn load(
302 0 : &self,
303 0 : access_kind: LayerAccessKind,
304 0 : ctx: &RequestContext,
305 0 : ) -> Result<&Arc<DeltaLayerInner>> {
306 0 : self.access_stats.record_access(access_kind, ctx);
307 0 : // Quick exit if already loaded
308 0 : self.inner
309 0 : .get_or_try_init(|| self.load_inner(ctx))
310 0 : .await
311 0 : .with_context(|| format!("Failed to load delta layer {}", self.path()))
312 0 : }
313 :
314 0 : async fn load_inner(&self, ctx: &RequestContext) -> Result<Arc<DeltaLayerInner>> {
315 0 : let path = self.path();
316 :
317 0 : let loaded = DeltaLayerInner::load(&path, None, None, ctx)
318 0 : .await
319 0 : .and_then(|res| res)?;
320 :
321 : // not production code
322 0 : let actual_layer_name = LayerName::from_str(path.file_name().unwrap()).unwrap();
323 0 : let expected_layer_name = self.layer_desc().layer_name();
324 0 :
325 0 : if actual_layer_name != expected_layer_name {
326 0 : println!("warning: filename does not match what is expected from in-file summary");
327 0 : println!("actual: {:?}", actual_layer_name.to_string());
328 0 : println!("expected: {:?}", expected_layer_name.to_string());
329 0 : }
330 :
331 0 : Ok(Arc::new(loaded))
332 0 : }
333 :
334 : /// Create a DeltaLayer struct representing an existing file on disk.
335 : ///
336 : /// This variant is only used for debugging purposes, by the 'pagectl' binary.
337 0 : pub fn new_for_path(path: &Utf8Path, file: File) -> Result<Self> {
338 0 : let mut summary_buf = vec![0; PAGE_SZ];
339 0 : file.read_exact_at(&mut summary_buf, 0)?;
340 0 : let summary = Summary::des_prefix(&summary_buf)?;
341 :
342 0 : let metadata = file
343 0 : .metadata()
344 0 : .context("get file metadata to determine size")?;
345 :
346 : // This function is never used for constructing layers in a running pageserver,
347 : // so it does not need an accurate TenantShardId.
348 0 : let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
349 0 :
350 0 : Ok(DeltaLayer {
351 0 : path: path.to_path_buf(),
352 0 : desc: PersistentLayerDesc::new_delta(
353 0 : tenant_shard_id,
354 0 : summary.timeline_id,
355 0 : summary.key_range,
356 0 : summary.lsn_range,
357 0 : metadata.len(),
358 0 : ),
359 0 : access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
360 0 : inner: OnceCell::new(),
361 0 : })
362 0 : }
363 :
364 : /// Path to the layer file in pageserver workdir.
365 0 : fn path(&self) -> Utf8PathBuf {
366 0 : self.path.clone()
367 0 : }
368 : }
369 :
370 : /// A builder object for constructing a new delta layer.
371 : ///
372 : /// Usage:
373 : ///
374 : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
375 : ///
376 : /// 2. Write the contents by calling `put_value` for every page
377 : /// version to store in the layer.
378 : ///
379 : /// 3. Call `finish`.
380 : ///
381 : struct DeltaLayerWriterInner {
382 : conf: &'static PageServerConf,
383 : pub path: Utf8PathBuf,
384 : timeline_id: TimelineId,
385 : tenant_shard_id: TenantShardId,
386 :
387 : key_start: Key,
388 : lsn_range: Range<Lsn>,
389 :
390 : tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>,
391 :
392 : blob_writer: BlobWriter<true>,
393 : }
394 :
395 : impl DeltaLayerWriterInner {
396 : ///
397 : /// Start building a new delta layer.
398 : ///
399 1346 : async fn new(
400 1346 : conf: &'static PageServerConf,
401 1346 : timeline_id: TimelineId,
402 1346 : tenant_shard_id: TenantShardId,
403 1346 : key_start: Key,
404 1346 : lsn_range: Range<Lsn>,
405 1346 : ctx: &RequestContext,
406 1346 : ) -> anyhow::Result<Self> {
407 1346 : // Create the file initially with a temporary filename. We don't know
408 1346 : // the end key yet, so we cannot form the final filename yet. We will
409 1346 : // rename it when we're done.
410 1346 : //
411 1346 : // Note: This overwrites any existing file. There shouldn't be any.
412 1346 : // FIXME: throw an error instead?
413 1346 : let path =
414 1346 : DeltaLayer::temp_path_for(conf, &tenant_shard_id, &timeline_id, key_start, &lsn_range);
415 :
416 1346 : let mut file = VirtualFile::create(&path, ctx).await?;
417 : // make room for the header block
418 1346 : file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
419 1346 : let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
420 1346 :
421 1346 : // Initialize the b-tree index builder
422 1346 : let block_buf = BlockBuf::new();
423 1346 : let tree_builder = DiskBtreeBuilder::new(block_buf);
424 1346 :
425 1346 : Ok(Self {
426 1346 : conf,
427 1346 : path,
428 1346 : timeline_id,
429 1346 : tenant_shard_id,
430 1346 : key_start,
431 1346 : lsn_range,
432 1346 : tree: tree_builder,
433 1346 : blob_writer,
434 1346 : })
435 1346 : }
436 :
437 : ///
438 : /// Append a key-value pair to the file.
439 : ///
440 : /// The values must be appended in key, lsn order.
441 : ///
442 2072154 : async fn put_value(
443 2072154 : &mut self,
444 2072154 : key: Key,
445 2072154 : lsn: Lsn,
446 2072154 : val: Value,
447 2072154 : ctx: &RequestContext,
448 2072154 : ) -> anyhow::Result<()> {
449 2072154 : let (_, res) = self
450 2072154 : .put_value_bytes(key, lsn, Value::ser(&val)?, val.will_init(), ctx)
451 1566 : .await;
452 2072154 : res
453 2072154 : }
454 :
455 6458816 : async fn put_value_bytes(
456 6458816 : &mut self,
457 6458816 : key: Key,
458 6458816 : lsn: Lsn,
459 6458816 : val: Vec<u8>,
460 6458816 : will_init: bool,
461 6458816 : ctx: &RequestContext,
462 6458816 : ) -> (Vec<u8>, anyhow::Result<()>) {
463 6458816 : assert!(self.lsn_range.start <= lsn);
464 : // We don't want to use compression in delta layer creation
465 6458816 : let compression = ImageCompressionAlgorithm::Disabled;
466 6458816 : let (val, res) = self
467 6458816 : .blob_writer
468 6458816 : .write_blob_maybe_compressed(val, ctx, compression)
469 4517 : .await;
470 6458816 : let off = match res {
471 6458816 : Ok(off) => off,
472 0 : Err(e) => return (val, Err(anyhow::anyhow!(e))),
473 : };
474 :
475 6458816 : let blob_ref = BlobRef::new(off, will_init);
476 6458816 :
477 6458816 : let delta_key = DeltaKey::from_key_lsn(&key, lsn);
478 6458816 : let res = self.tree.append(&delta_key.0, blob_ref.0);
479 6458816 : (val, res.map_err(|e| anyhow::anyhow!(e)))
480 6458816 : }
481 :
482 2023972 : fn size(&self) -> u64 {
483 2023972 : self.blob_writer.size() + self.tree.borrow_writer().size()
484 2023972 : }
485 :
486 : ///
487 : /// Finish writing the delta layer.
488 : ///
489 1346 : async fn finish(
490 1346 : self,
491 1346 : key_end: Key,
492 1346 : timeline: &Arc<Timeline>,
493 1346 : ctx: &RequestContext,
494 1346 : ) -> anyhow::Result<ResidentLayer> {
495 1346 : let temp_path = self.path.clone();
496 9573 : let result = self.finish0(key_end, timeline, ctx).await;
497 1346 : if result.is_err() {
498 0 : tracing::info!(%temp_path, "cleaning up temporary file after error during writing");
499 0 : if let Err(e) = std::fs::remove_file(&temp_path) {
500 0 : tracing::warn!(error=%e, %temp_path, "error cleaning up temporary layer file after error during writing");
501 0 : }
502 1346 : }
503 1346 : result
504 1346 : }
505 :
506 1346 : async fn finish0(
507 1346 : self,
508 1346 : key_end: Key,
509 1346 : timeline: &Arc<Timeline>,
510 1346 : ctx: &RequestContext,
511 1346 : ) -> anyhow::Result<ResidentLayer> {
512 1346 : let index_start_blk =
513 1346 : ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
514 :
515 1346 : let mut file = self.blob_writer.into_inner(ctx).await?;
516 :
517 : // Write out the index
518 1346 : let (index_root_blk, block_buf) = self.tree.finish()?;
519 1346 : file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
520 0 : .await?;
521 14904 : for buf in block_buf.blocks {
522 13558 : let (_buf, res) = file.write_all(buf, ctx).await;
523 13558 : res?;
524 : }
525 1346 : assert!(self.lsn_range.start < self.lsn_range.end);
526 : // Fill in the summary on blk 0
527 1346 : let summary = Summary {
528 1346 : magic: DELTA_FILE_MAGIC,
529 1346 : format_version: STORAGE_FORMAT_VERSION,
530 1346 : tenant_id: self.tenant_shard_id.tenant_id,
531 1346 : timeline_id: self.timeline_id,
532 1346 : key_range: self.key_start..key_end,
533 1346 : lsn_range: self.lsn_range.clone(),
534 1346 : index_start_blk,
535 1346 : index_root_blk,
536 1346 : };
537 1346 :
538 1346 : let mut buf = Vec::with_capacity(PAGE_SZ);
539 1346 : // TODO: could use smallvec here but it's a pain with Slice<T>
540 1346 : Summary::ser_into(&summary, &mut buf)?;
541 1346 : file.seek(SeekFrom::Start(0)).await?;
542 1346 : let (_buf, res) = file.write_all(buf, ctx).await;
543 1346 : res?;
544 :
545 1346 : let metadata = file
546 1346 : .metadata()
547 677 : .await
548 1346 : .context("get file metadata to determine size")?;
549 :
550 : // 5GB limit for objects without multipart upload (which we don't want to use)
551 : // Make it a little bit below to account for differing GB units
552 : // https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html
553 : const S3_UPLOAD_LIMIT: u64 = 4_500_000_000;
554 1346 : ensure!(
555 1346 : metadata.len() <= S3_UPLOAD_LIMIT,
556 0 : "Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
557 0 : file.path,
558 0 : metadata.len()
559 : );
560 :
561 : // Note: Because we opened the file in write-only mode, we cannot
562 : // reuse the same VirtualFile for reading later. That's why we don't
563 : // set inner.file here. The first read will have to re-open it.
564 :
565 1346 : let desc = PersistentLayerDesc::new_delta(
566 1346 : self.tenant_shard_id,
567 1346 : self.timeline_id,
568 1346 : self.key_start..key_end,
569 1346 : self.lsn_range.clone(),
570 1346 : metadata.len(),
571 1346 : );
572 1346 :
573 1346 : // fsync the file
574 1346 : file.sync_all().await?;
575 :
576 1346 : let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
577 :
578 1346 : trace!("created delta layer {}", layer.local_path());
579 :
580 1346 : Ok(layer)
581 1346 : }
582 : }
583 :
584 : /// A builder object for constructing a new delta layer.
585 : ///
586 : /// Usage:
587 : ///
588 : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
589 : ///
590 : /// 2. Write the contents by calling `put_value` for every page
591 : /// version to store in the layer.
592 : ///
593 : /// 3. Call `finish`.
594 : ///
595 : /// # Note
596 : ///
597 : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
598 : /// possible for the writer to drop before `finish` is actually called. So this
599 : /// could lead to odd temporary files in the directory, exhausting file system.
600 : /// This structure wraps `DeltaLayerWriterInner` and also contains `Drop`
601 : /// implementation that cleans up the temporary file in failure. It's not
602 : /// possible to do this directly in `DeltaLayerWriterInner` since `finish` moves
603 : /// out some fields, making it impossible to implement `Drop`.
604 : ///
605 : #[must_use]
606 : pub struct DeltaLayerWriter {
607 : inner: Option<DeltaLayerWriterInner>,
608 : }
609 :
610 : impl DeltaLayerWriter {
611 : ///
612 : /// Start building a new delta layer.
613 : ///
614 1346 : pub async fn new(
615 1346 : conf: &'static PageServerConf,
616 1346 : timeline_id: TimelineId,
617 1346 : tenant_shard_id: TenantShardId,
618 1346 : key_start: Key,
619 1346 : lsn_range: Range<Lsn>,
620 1346 : ctx: &RequestContext,
621 1346 : ) -> anyhow::Result<Self> {
622 1346 : Ok(Self {
623 1346 : inner: Some(
624 1346 : DeltaLayerWriterInner::new(
625 1346 : conf,
626 1346 : timeline_id,
627 1346 : tenant_shard_id,
628 1346 : key_start,
629 1346 : lsn_range,
630 1346 : ctx,
631 1346 : )
632 686 : .await?,
633 : ),
634 : })
635 1346 : }
636 :
637 : ///
638 : /// Append a key-value pair to the file.
639 : ///
640 : /// The values must be appended in key, lsn order.
641 : ///
642 2072154 : pub async fn put_value(
643 2072154 : &mut self,
644 2072154 : key: Key,
645 2072154 : lsn: Lsn,
646 2072154 : val: Value,
647 2072154 : ctx: &RequestContext,
648 2072154 : ) -> anyhow::Result<()> {
649 2072154 : self.inner
650 2072154 : .as_mut()
651 2072154 : .unwrap()
652 2072154 : .put_value(key, lsn, val, ctx)
653 1566 : .await
654 2072154 : }
655 :
656 4386662 : pub async fn put_value_bytes(
657 4386662 : &mut self,
658 4386662 : key: Key,
659 4386662 : lsn: Lsn,
660 4386662 : val: Vec<u8>,
661 4386662 : will_init: bool,
662 4386662 : ctx: &RequestContext,
663 4386662 : ) -> (Vec<u8>, anyhow::Result<()>) {
664 4386662 : self.inner
665 4386662 : .as_mut()
666 4386662 : .unwrap()
667 4386662 : .put_value_bytes(key, lsn, val, will_init, ctx)
668 2951 : .await
669 4386662 : }
670 :
671 2023972 : pub fn size(&self) -> u64 {
672 2023972 : self.inner.as_ref().unwrap().size()
673 2023972 : }
674 :
675 : ///
676 : /// Finish writing the delta layer.
677 : ///
678 1346 : pub(crate) async fn finish(
679 1346 : mut self,
680 1346 : key_end: Key,
681 1346 : timeline: &Arc<Timeline>,
682 1346 : ctx: &RequestContext,
683 1346 : ) -> anyhow::Result<ResidentLayer> {
684 1346 : self.inner
685 1346 : .take()
686 1346 : .unwrap()
687 1346 : .finish(key_end, timeline, ctx)
688 9573 : .await
689 1346 : }
690 : }
691 :
692 : impl Drop for DeltaLayerWriter {
693 1346 : fn drop(&mut self) {
694 1346 : if let Some(inner) = self.inner.take() {
695 0 : // We want to remove the virtual file here, so it's fine to not
696 0 : // having completely flushed unwritten data.
697 0 : let vfile = inner.blob_writer.into_inner_no_flush();
698 0 : vfile.remove();
699 1346 : }
700 1346 : }
701 : }
702 :
703 0 : #[derive(thiserror::Error, Debug)]
704 : pub enum RewriteSummaryError {
705 : #[error("magic mismatch")]
706 : MagicMismatch,
707 : #[error(transparent)]
708 : Other(#[from] anyhow::Error),
709 : }
710 :
711 : impl From<std::io::Error> for RewriteSummaryError {
712 0 : fn from(e: std::io::Error) -> Self {
713 0 : Self::Other(anyhow::anyhow!(e))
714 0 : }
715 : }
716 :
717 : impl DeltaLayer {
718 0 : pub async fn rewrite_summary<F>(
719 0 : path: &Utf8Path,
720 0 : rewrite: F,
721 0 : ctx: &RequestContext,
722 0 : ) -> Result<(), RewriteSummaryError>
723 0 : where
724 0 : F: Fn(Summary) -> Summary,
725 0 : {
726 0 : let mut file = VirtualFile::open_with_options(
727 0 : path,
728 0 : virtual_file::OpenOptions::new().read(true).write(true),
729 0 : ctx,
730 0 : )
731 0 : .await
732 0 : .with_context(|| format!("Failed to open file '{}'", path))?;
733 0 : let file_id = page_cache::next_file_id();
734 0 : let block_reader = FileBlockReader::new(&file, file_id);
735 0 : let summary_blk = block_reader.read_blk(0, ctx).await?;
736 0 : let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
737 0 : if actual_summary.magic != DELTA_FILE_MAGIC {
738 0 : return Err(RewriteSummaryError::MagicMismatch);
739 0 : }
740 0 :
741 0 : let new_summary = rewrite(actual_summary);
742 0 :
743 0 : let mut buf = Vec::with_capacity(PAGE_SZ);
744 0 : // TODO: could use smallvec here, but it's a pain with Slice<T>
745 0 : Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
746 0 : file.seek(SeekFrom::Start(0)).await?;
747 0 : let (_buf, res) = file.write_all(buf, ctx).await;
748 0 : res?;
749 0 : Ok(())
750 0 : }
751 : }
752 :
753 : impl DeltaLayerInner {
754 34 : pub(crate) fn key_range(&self) -> &Range<Key> {
755 34 : &self.layer_key_range
756 34 : }
757 :
758 34 : pub(crate) fn lsn_range(&self) -> &Range<Lsn> {
759 34 : &self.layer_lsn_range
760 34 : }
761 :
762 : /// Returns nested result following Result<Result<_, OpErr>, Critical>:
763 : /// - inner has the success or transient failure
764 : /// - outer has the permanent failure
765 1018 : pub(super) async fn load(
766 1018 : path: &Utf8Path,
767 1018 : summary: Option<Summary>,
768 1018 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
769 1018 : ctx: &RequestContext,
770 1018 : ) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
771 1018 : let file = match VirtualFile::open(path, ctx).await {
772 1018 : Ok(file) => file,
773 0 : Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
774 : };
775 1018 : let file_id = page_cache::next_file_id();
776 1018 :
777 1018 : let block_reader = FileBlockReader::new(&file, file_id);
778 :
779 1018 : let summary_blk = match block_reader.read_blk(0, ctx).await {
780 1018 : Ok(blk) => blk,
781 0 : Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
782 : };
783 :
784 : // TODO: this should be an assertion instead; see ImageLayerInner::load
785 1018 : let actual_summary =
786 1018 : Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
787 :
788 1018 : if let Some(mut expected_summary) = summary {
789 : // production code path
790 1018 : expected_summary.index_start_blk = actual_summary.index_start_blk;
791 1018 : expected_summary.index_root_blk = actual_summary.index_root_blk;
792 1018 : // mask out the timeline_id, but still require the layers to be from the same tenant
793 1018 : expected_summary.timeline_id = actual_summary.timeline_id;
794 1018 :
795 1018 : if actual_summary != expected_summary {
796 0 : bail!(
797 0 : "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
798 0 : actual_summary,
799 0 : expected_summary
800 0 : );
801 1018 : }
802 0 : }
803 :
804 1018 : Ok(Ok(DeltaLayerInner {
805 1018 : file,
806 1018 : file_id,
807 1018 : index_start_blk: actual_summary.index_start_blk,
808 1018 : index_root_blk: actual_summary.index_root_blk,
809 1018 : max_vectored_read_bytes,
810 1018 : layer_key_range: actual_summary.key_range,
811 1018 : layer_lsn_range: actual_summary.lsn_range,
812 1018 : }))
813 1018 : }
814 :
815 204429 : pub(super) async fn get_value_reconstruct_data(
816 204429 : &self,
817 204429 : key: Key,
818 204429 : lsn_range: Range<Lsn>,
819 204429 : reconstruct_state: &mut ValueReconstructState,
820 204429 : ctx: &RequestContext,
821 204429 : ) -> anyhow::Result<ValueReconstructResult> {
822 204429 : let mut need_image = true;
823 204429 : // Scan the page versions backwards, starting from `lsn`.
824 204429 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
825 204429 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
826 204429 : self.index_start_blk,
827 204429 : self.index_root_blk,
828 204429 : &block_reader,
829 204429 : );
830 204429 : let search_key = DeltaKey::from_key_lsn(&key, Lsn(lsn_range.end.0 - 1));
831 204429 :
832 204429 : let mut offsets: Vec<(Lsn, u64)> = Vec::new();
833 204429 :
834 204429 : tree_reader
835 204429 : .visit(
836 204429 : &search_key.0,
837 204429 : VisitDirection::Backwards,
838 204429 : |key, value| {
839 198369 : let blob_ref = BlobRef(value);
840 198369 : if key[..KEY_SIZE] != search_key.0[..KEY_SIZE] {
841 76385 : return false;
842 121984 : }
843 121984 : let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
844 121984 : if entry_lsn < lsn_range.start {
845 32 : return false;
846 121952 : }
847 121952 : offsets.push((entry_lsn, blob_ref.pos()));
848 121952 :
849 121952 : !blob_ref.will_init()
850 204429 : },
851 204429 : &RequestContextBuilder::extend(ctx)
852 204429 : .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
853 204429 : .build(),
854 204429 : )
855 20923 : .await?;
856 :
857 204429 : let ctx = &RequestContextBuilder::extend(ctx)
858 204429 : .page_content_kind(PageContentKind::DeltaLayerValue)
859 204429 : .build();
860 204429 :
861 204429 : // Ok, 'offsets' now contains the offsets of all the entries we need to read
862 204429 : let cursor = block_reader.block_cursor();
863 204429 : let mut buf = Vec::new();
864 204473 : for (entry_lsn, pos) in offsets {
865 121952 : cursor
866 121952 : .read_blob_into_buf(pos, &mut buf, ctx)
867 7958 : .await
868 121952 : .with_context(|| {
869 0 : format!("Failed to read blob from virtual file {}", self.file.path)
870 121952 : })?;
871 121952 : let val = Value::des(&buf).with_context(|| {
872 0 : format!(
873 0 : "Failed to deserialize file blob from virtual file {}",
874 0 : self.file.path
875 0 : )
876 121952 : })?;
877 121952 : match val {
878 121908 : Value::Image(img) => {
879 121908 : reconstruct_state.img = Some((entry_lsn, img));
880 121908 : need_image = false;
881 121908 : break;
882 : }
883 44 : Value::WalRecord(rec) => {
884 44 : let will_init = rec.will_init();
885 44 : reconstruct_state.records.push((entry_lsn, rec));
886 44 : if will_init {
887 : // This WAL record initializes the page, so no need to go further back
888 0 : need_image = false;
889 0 : break;
890 44 : }
891 : }
892 : }
893 : }
894 :
895 : // If an older page image is needed to reconstruct the page, let the
896 : // caller know.
897 204429 : if need_image {
898 82521 : Ok(ValueReconstructResult::Continue)
899 : } else {
900 121908 : Ok(ValueReconstructResult::Complete)
901 : }
902 204429 : }
903 :
904 : // Look up the keys in the provided keyspace and update
905 : // the reconstruct state with whatever is found.
906 : //
907 : // If the key is cached, go no further than the cached Lsn.
908 : //
909 : // Currently, the index is visited for each range, but this
910 : // can be further optimised to visit the index only once.
911 152 : pub(super) async fn get_values_reconstruct_data(
912 152 : &self,
913 152 : keyspace: KeySpace,
914 152 : lsn_range: Range<Lsn>,
915 152 : reconstruct_state: &mut ValuesReconstructState,
916 152 : ctx: &RequestContext,
917 152 : ) -> Result<(), GetVectoredError> {
918 152 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
919 152 : let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
920 152 : self.index_start_blk,
921 152 : self.index_root_blk,
922 152 : block_reader,
923 152 : );
924 152 :
925 152 : let planner = VectoredReadPlanner::new(
926 152 : self.max_vectored_read_bytes
927 152 : .expect("Layer is loaded with max vectored bytes config")
928 152 : .0
929 152 : .into(),
930 152 : );
931 152 :
932 152 : let data_end_offset = self.index_start_offset();
933 :
934 152 : let reads = Self::plan_reads(
935 152 : &keyspace,
936 152 : lsn_range.clone(),
937 152 : data_end_offset,
938 152 : index_reader,
939 152 : planner,
940 152 : reconstruct_state,
941 152 : ctx,
942 152 : )
943 1425 : .await
944 152 : .map_err(GetVectoredError::Other)?;
945 :
946 152 : self.do_reads_and_update_state(reads, reconstruct_state, ctx)
947 8676 : .await;
948 :
949 152 : reconstruct_state.on_lsn_advanced(&keyspace, lsn_range.start);
950 152 :
951 152 : Ok(())
952 152 : }
953 :
954 : /// Load all key-values in the delta layer, should be replaced by an iterator-based interface in the future.
955 0 : pub(super) async fn load_key_values(
956 0 : &self,
957 0 : ctx: &RequestContext,
958 0 : ) -> anyhow::Result<Vec<(Key, Lsn, Value)>> {
959 0 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
960 0 : let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
961 0 : self.index_start_blk,
962 0 : self.index_root_blk,
963 0 : block_reader,
964 0 : );
965 0 : let mut result = Vec::new();
966 0 : let mut stream =
967 0 : Box::pin(self.stream_index_forwards(index_reader, &[0; DELTA_KEY_SIZE], ctx));
968 0 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
969 0 : let cursor = block_reader.block_cursor();
970 0 : let mut buf = Vec::new();
971 0 : while let Some(item) = stream.next().await {
972 0 : let (key, lsn, pos) = item?;
973 : // TODO: dedup code with get_reconstruct_value
974 : // TODO: ctx handling and sharding
975 0 : cursor
976 0 : .read_blob_into_buf(pos.pos(), &mut buf, ctx)
977 0 : .await
978 0 : .with_context(|| {
979 0 : format!("Failed to read blob from virtual file {}", self.file.path)
980 0 : })?;
981 0 : let val = Value::des(&buf).with_context(|| {
982 0 : format!(
983 0 : "Failed to deserialize file blob from virtual file {}",
984 0 : self.file.path
985 0 : )
986 0 : })?;
987 0 : result.push((key, lsn, val));
988 : }
989 0 : Ok(result)
990 0 : }
991 :
992 354 : async fn plan_reads<Reader>(
993 354 : keyspace: &KeySpace,
994 354 : lsn_range: Range<Lsn>,
995 354 : data_end_offset: u64,
996 354 : index_reader: DiskBtreeReader<Reader, DELTA_KEY_SIZE>,
997 354 : mut planner: VectoredReadPlanner,
998 354 : reconstruct_state: &mut ValuesReconstructState,
999 354 : ctx: &RequestContext,
1000 354 : ) -> anyhow::Result<Vec<VectoredRead>>
1001 354 : where
1002 354 : Reader: BlockReader + Clone,
1003 354 : {
1004 354 : let ctx = RequestContextBuilder::extend(ctx)
1005 354 : .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
1006 354 : .build();
1007 :
1008 42673 : for range in keyspace.ranges.iter() {
1009 42673 : let mut range_end_handled = false;
1010 42673 :
1011 42673 : let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start);
1012 42673 : let index_stream = index_reader.clone().into_stream(&start_key.0, &ctx);
1013 42673 : let mut index_stream = std::pin::pin!(index_stream);
1014 :
1015 189810 : while let Some(index_entry) = index_stream.next().await {
1016 189599 : let (raw_key, value) = index_entry?;
1017 189599 : let key = Key::from_slice(&raw_key[..KEY_SIZE]);
1018 189599 : let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
1019 189599 : let blob_ref = BlobRef(value);
1020 189599 :
1021 189599 : // Lsns are not monotonically increasing across keys, so we don't assert on them.
1022 189599 : assert!(key >= range.start);
1023 :
1024 189599 : let outside_lsn_range = !lsn_range.contains(&lsn);
1025 189599 : let below_cached_lsn = reconstruct_state.get_cached_lsn(&key) >= Some(lsn);
1026 :
1027 189599 : let flag = {
1028 189599 : if outside_lsn_range || below_cached_lsn {
1029 71004 : BlobFlag::Ignore
1030 118595 : } else if blob_ref.will_init() {
1031 60947 : BlobFlag::ReplaceAll
1032 : } else {
1033 : // Usual path: add blob to the read
1034 57648 : BlobFlag::None
1035 : }
1036 : };
1037 :
1038 189599 : if key >= range.end || (key.next() == range.end && lsn >= lsn_range.end) {
1039 42462 : planner.handle_range_end(blob_ref.pos());
1040 42462 : range_end_handled = true;
1041 42462 : break;
1042 147137 : } else {
1043 147137 : planner.handle(key, lsn, blob_ref.pos(), flag);
1044 147137 : }
1045 : }
1046 :
1047 42673 : if !range_end_handled {
1048 211 : tracing::debug!("Handling range end fallback at {}", data_end_offset);
1049 211 : planner.handle_range_end(data_end_offset);
1050 42462 : }
1051 : }
1052 :
1053 354 : Ok(planner.finish())
1054 354 : }
1055 :
1056 352 : fn get_min_read_buffer_size(
1057 352 : planned_reads: &[VectoredRead],
1058 352 : read_size_soft_max: usize,
1059 352 : ) -> usize {
1060 36924 : let Some(largest_read) = planned_reads.iter().max_by_key(|read| read.size()) else {
1061 12 : return read_size_soft_max;
1062 : };
1063 :
1064 340 : let largest_read_size = largest_read.size();
1065 340 : if largest_read_size > read_size_soft_max {
1066 : // If the read is oversized, it should only contain one key.
1067 200 : let offenders = largest_read
1068 200 : .blobs_at
1069 200 : .as_slice()
1070 200 : .iter()
1071 200 : .map(|(_, blob_meta)| format!("{}@{}", blob_meta.key, blob_meta.lsn))
1072 200 : .join(", ");
1073 200 : tracing::warn!(
1074 0 : "Oversized vectored read ({} > {}) for keys {}",
1075 : largest_read_size,
1076 : read_size_soft_max,
1077 : offenders
1078 : );
1079 140 : }
1080 :
1081 340 : largest_read_size
1082 352 : }
1083 :
1084 152 : async fn do_reads_and_update_state(
1085 152 : &self,
1086 152 : reads: Vec<VectoredRead>,
1087 152 : reconstruct_state: &mut ValuesReconstructState,
1088 152 : ctx: &RequestContext,
1089 152 : ) {
1090 152 : let vectored_blob_reader = VectoredBlobReader::new(&self.file);
1091 152 : let mut ignore_key_with_err = None;
1092 152 :
1093 152 : let max_vectored_read_bytes = self
1094 152 : .max_vectored_read_bytes
1095 152 : .expect("Layer is loaded with max vectored bytes config")
1096 152 : .0
1097 152 : .into();
1098 152 : let buf_size = Self::get_min_read_buffer_size(&reads, max_vectored_read_bytes);
1099 152 : let mut buf = Some(BytesMut::with_capacity(buf_size));
1100 :
1101 : // Note that reads are processed in reverse order (from highest key+lsn).
1102 : // This is the order that `ReconstructState` requires such that it can
1103 : // track when a key is done.
1104 17200 : for read in reads.into_iter().rev() {
1105 17200 : let res = vectored_blob_reader
1106 17200 : .read_blobs(&read, buf.take().expect("Should have a buffer"), ctx)
1107 8676 : .await;
1108 :
1109 17200 : let blobs_buf = match res {
1110 17200 : Ok(blobs_buf) => blobs_buf,
1111 0 : Err(err) => {
1112 0 : let kind = err.kind();
1113 0 : for (_, blob_meta) in read.blobs_at.as_slice() {
1114 0 : reconstruct_state.on_key_error(
1115 0 : blob_meta.key,
1116 0 : PageReconstructError::from(anyhow!(
1117 0 : "Failed to read blobs from virtual file {}: {}",
1118 0 : self.file.path,
1119 0 : kind
1120 0 : )),
1121 0 : );
1122 0 : }
1123 :
1124 : // We have "lost" the buffer since the lower level IO api
1125 : // doesn't return the buffer on error. Allocate a new one.
1126 0 : buf = Some(BytesMut::with_capacity(buf_size));
1127 0 :
1128 0 : continue;
1129 : }
1130 : };
1131 :
1132 28120 : for meta in blobs_buf.blobs.iter().rev() {
1133 28120 : if Some(meta.meta.key) == ignore_key_with_err {
1134 0 : continue;
1135 28120 : }
1136 28120 :
1137 28120 : let value = Value::des(&blobs_buf.buf[meta.start..meta.end]);
1138 28120 : let value = match value {
1139 28120 : Ok(v) => v,
1140 0 : Err(e) => {
1141 0 : reconstruct_state.on_key_error(
1142 0 : meta.meta.key,
1143 0 : PageReconstructError::from(anyhow!(e).context(format!(
1144 0 : "Failed to deserialize blob from virtual file {}",
1145 0 : self.file.path,
1146 0 : ))),
1147 0 : );
1148 0 :
1149 0 : ignore_key_with_err = Some(meta.meta.key);
1150 0 : continue;
1151 : }
1152 : };
1153 :
1154 : // Invariant: once a key reaches [`ValueReconstructSituation::Complete`]
1155 : // state, no further updates shall be made to it. The call below will
1156 : // panic if the invariant is violated.
1157 28120 : reconstruct_state.update_key(&meta.meta.key, meta.meta.lsn, value);
1158 : }
1159 :
1160 17200 : buf = Some(blobs_buf.buf);
1161 : }
1162 152 : }
1163 :
1164 406 : pub(super) async fn load_keys<'a>(
1165 406 : &'a self,
1166 406 : ctx: &RequestContext,
1167 406 : ) -> Result<Vec<DeltaEntry<'a>>> {
1168 406 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
1169 406 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1170 406 : self.index_start_blk,
1171 406 : self.index_root_blk,
1172 406 : block_reader,
1173 406 : );
1174 406 :
1175 406 : let mut all_keys: Vec<DeltaEntry<'_>> = Vec::new();
1176 406 :
1177 406 : tree_reader
1178 406 : .visit(
1179 406 : &[0u8; DELTA_KEY_SIZE],
1180 406 : VisitDirection::Forwards,
1181 2064046 : |key, value| {
1182 2064046 : let delta_key = DeltaKey::from_slice(key);
1183 2064046 : let val_ref = ValueRef {
1184 2064046 : blob_ref: BlobRef(value),
1185 2064046 : layer: self,
1186 2064046 : };
1187 2064046 : let pos = BlobRef(value).pos();
1188 2064046 : if let Some(last) = all_keys.last_mut() {
1189 2063640 : // subtract offset of the current and last entries to get the size
1190 2063640 : // of the value associated with this (key, lsn) tuple
1191 2063640 : let first_pos = last.size;
1192 2063640 : last.size = pos - first_pos;
1193 2063640 : }
1194 2064046 : let entry = DeltaEntry {
1195 2064046 : key: delta_key.key(),
1196 2064046 : lsn: delta_key.lsn(),
1197 2064046 : size: pos,
1198 2064046 : val: val_ref,
1199 2064046 : };
1200 2064046 : all_keys.push(entry);
1201 2064046 : true
1202 2064046 : },
1203 406 : &RequestContextBuilder::extend(ctx)
1204 406 : .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
1205 406 : .build(),
1206 406 : )
1207 2170 : .await?;
1208 406 : if let Some(last) = all_keys.last_mut() {
1209 406 : // Last key occupies all space till end of value storage,
1210 406 : // which corresponds to beginning of the index
1211 406 : last.size = self.index_start_offset() - last.size;
1212 406 : }
1213 406 : Ok(all_keys)
1214 406 : }
1215 :
/// Using the given writer, write out a copy of this layer containing only the
/// entries whose Lsn is earlier than `until`.
///
/// The index is streamed forwards; consecutive qualifying blobs are coalesced
/// into vectored reads, and each blob is pushed to `writer` as raw bytes.
///
/// Returns the number of key-value records pushed to the writer.
///
/// # Errors
///
/// Fails if the index cannot be traversed, a vectored read fails, or the
/// writer rejects a value.
pub(super) async fn copy_prefix(
    &self,
    writer: &mut DeltaLayerWriter,
    until: Lsn,
    ctx: &RequestContext,
) -> anyhow::Result<usize> {
    use crate::tenant::vectored_blob_io::{
        BlobMeta, VectoredReadBuilder, VectoredReadExtended,
    };
    use futures::stream::TryStreamExt;

    // One element of the index stream: either a real entry or the marker
    // appended after the last entry.
    #[derive(Debug)]
    enum Item {
        Actual(Key, Lsn, BlobRef),
        Sentinel,
    }

    impl From<Item> for Option<(Key, Lsn, BlobRef)> {
        fn from(value: Item) -> Self {
            match value {
                Item::Actual(key, lsn, blob) => Some((key, lsn, blob)),
                Item::Sentinel => None,
            }
        }
    }

    impl Item {
        // Blob reference of a real entry; `None` for the sentinel.
        fn offset(&self) -> Option<BlobRef> {
            match self {
                Item::Actual(_, _, blob) => Some(*blob),
                Item::Sentinel => None,
            }
        }

        fn is_last(&self) -> bool {
            matches!(self, Item::Sentinel)
        }
    }

    let block_reader = FileBlockReader::new(&self.file, self.file_id);
    let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
        self.index_start_blk,
        self.index_root_blk,
        block_reader,
    );

    let stream = self.stream_index_forwards(tree_reader, &[0u8; DELTA_KEY_SIZE], ctx);
    let stream = stream.map_ok(|(key, lsn, pos)| Item::Actual(key, lsn, pos));
    // Append a sentinel so that the end offset of the last real item is
    // computed by the same loop body, without repeating the whole read part.
    let stream = stream.chain(futures::stream::once(futures::future::ready(Ok(
        Item::Sentinel,
    ))));
    let mut stream = std::pin::pin!(stream);

    // The previously popped entry. It can only be acted on once the next
    // item's offset (its end offset) is known.
    let mut prev: Option<(Key, Lsn, BlobRef)> = None;

    // The vectored read currently being accumulated, if any.
    let mut read_builder: Option<VectoredReadBuilder> = None;

    let max_read_size = self
        .max_vectored_read_bytes
        .map(|x| x.0.get())
        .unwrap_or(8192);

    // Read buffer passed from read to read; `None` only while a read owns it.
    let mut buffer = Some(BytesMut::with_capacity(max_read_size));

    // FIXME: buffering of DeltaLayerWriter
    let mut per_blob_copy = Vec::new();

    let mut records = 0;

    while let Some(item) = stream.try_next().await? {
        tracing::debug!(?item, "popped");
        // For the sentinel, fall back to the start of the index, which is
        // where the last value ends.
        let offset = item
            .offset()
            .unwrap_or(BlobRef::new(self.index_start_offset(), false));

        let actionable = if let Some((key, lsn, start_offset)) = prev.take() {
            let end_offset = offset;

            Some((BlobMeta { key, lsn }, start_offset..end_offset))
        } else {
            None
        };

        let is_last = item.is_last();

        prev = Option::from(item);

        // Only entries strictly below `until` are copied.
        let actionable = actionable.filter(|x| x.0.lsn < until);

        let builder = if let Some((meta, offsets)) = actionable {
            // extend or create a new builder
            if read_builder
                .as_mut()
                .map(|x| x.extend(offsets.start.pos(), offsets.end.pos(), meta))
                .unwrap_or(VectoredReadExtended::No)
                == VectoredReadExtended::Yes
            {
                None
            } else {
                // `Option::replace` hands back the previous builder (if any),
                // which is flushed below, and installs the fresh one.
                read_builder.replace(VectoredReadBuilder::new(
                    offsets.start.pos(),
                    offsets.end.pos(),
                    meta,
                    max_read_size,
                ))
            }
        } else {
            // nothing to do, except perhaps flush any existing for the last element
            None
        };

        // flush the possible older builder and also the new one if the item was the last one
        let builders = builder.into_iter();
        let builders = if is_last {
            builders.chain(read_builder.take())
        } else {
            builders.chain(None)
        };

        for builder in builders {
            let read = builder.build();

            let reader = VectoredBlobReader::new(&self.file);

            let mut buf = buffer.take().unwrap();

            buf.clear();
            buf.reserve(read.size());
            let res = reader.read_blobs(&read, buf, ctx).await?;

            for blob in res.blobs {
                let key = blob.meta.key;
                let lsn = blob.meta.lsn;
                let data = &res.buf[blob.start..blob.end];

                // Debug builds only: check that the bytes deserialize into a value.
                #[cfg(debug_assertions)]
                Value::des(data)
                    .with_context(|| {
                        format!(
                            "blob failed to deserialize for {}@{}, {}..{}: {:?}",
                            blob.meta.key,
                            blob.meta.lsn,
                            blob.start,
                            blob.end,
                            utils::Hex(data)
                        )
                    })
                    .unwrap();

                // is it an image or will_init walrecord?
                // FIXME: this could be handled by threading the BlobRef to the
                // VectoredReadBuilder
                let will_init = crate::repository::ValueBytes::will_init(data)
                    .inspect_err(|_e| {
                        #[cfg(feature = "testing")]
                        tracing::error!(data=?utils::Hex(data), err=?_e, %key, %lsn, "failed to parse will_init out of serialized value");
                    })
                    .unwrap_or(false);

                per_blob_copy.clear();
                per_blob_copy.extend_from_slice(data);

                // The writer takes the buffer by value and returns it, so the
                // same allocation is reused for the next blob.
                let (tmp, res) = writer
                    .put_value_bytes(
                        key,
                        lsn,
                        std::mem::take(&mut per_blob_copy),
                        will_init,
                        ctx,
                    )
                    .await;
                per_blob_copy = tmp;

                res?;

                records += 1;
            }

            buffer = Some(res.buf);
        }
    }

    assert!(
        read_builder.is_none(),
        "with the sentinel above loop should had handled all"
    );

    Ok(records)
}
1410 :
1411 4 : pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
1412 4 : println!(
1413 4 : "index_start_blk: {}, root {}",
1414 4 : self.index_start_blk, self.index_root_blk
1415 4 : );
1416 4 :
1417 4 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
1418 4 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1419 4 : self.index_start_blk,
1420 4 : self.index_root_blk,
1421 4 : block_reader,
1422 4 : );
1423 4 :
1424 4 : tree_reader.dump().await?;
1425 :
1426 4 : let keys = self.load_keys(ctx).await?;
1427 :
1428 8 : async fn dump_blob(val: &ValueRef<'_>, ctx: &RequestContext) -> anyhow::Result<String> {
1429 8 : let buf = val.load_raw(ctx).await?;
1430 8 : let val = Value::des(&buf)?;
1431 8 : let desc = match val {
1432 8 : Value::Image(img) => {
1433 8 : format!(" img {} bytes", img.len())
1434 : }
1435 0 : Value::WalRecord(rec) => {
1436 0 : let wal_desc = walrecord::describe_wal_record(&rec)?;
1437 0 : format!(
1438 0 : " rec {} bytes will_init: {} {}",
1439 0 : buf.len(),
1440 0 : rec.will_init(),
1441 0 : wal_desc
1442 0 : )
1443 : }
1444 : };
1445 8 : Ok(desc)
1446 8 : }
1447 :
1448 12 : for entry in keys {
1449 8 : let DeltaEntry { key, lsn, val, .. } = entry;
1450 8 : let desc = match dump_blob(&val, ctx).await {
1451 8 : Ok(desc) => desc,
1452 0 : Err(err) => {
1453 0 : format!("ERROR: {err}")
1454 : }
1455 : };
1456 8 : println!(" key {key} at {lsn}: {desc}");
1457 8 :
1458 8 : // Print more details about CHECKPOINT records. Would be nice to print details
1459 8 : // of many other record types too, but these are particularly interesting, as
1460 8 : // have a lot of special processing for them in walingest.rs.
1461 8 : use pageserver_api::key::CHECKPOINT_KEY;
1462 8 : use postgres_ffi::CheckPoint;
1463 8 : if key == CHECKPOINT_KEY {
1464 0 : let val = val.load(ctx).await?;
1465 0 : match val {
1466 0 : Value::Image(img) => {
1467 0 : let checkpoint = CheckPoint::decode(&img)?;
1468 0 : println!(" CHECKPOINT: {:?}", checkpoint);
1469 : }
1470 0 : Value::WalRecord(_rec) => {
1471 0 : println!(" unexpected walrecord value for checkpoint key");
1472 0 : }
1473 : }
1474 8 : }
1475 : }
1476 :
1477 4 : Ok(())
1478 4 : }
1479 :
1480 30 : fn stream_index_forwards<'a, R>(
1481 30 : &'a self,
1482 30 : reader: DiskBtreeReader<R, DELTA_KEY_SIZE>,
1483 30 : start: &'a [u8; DELTA_KEY_SIZE],
1484 30 : ctx: &'a RequestContext,
1485 30 : ) -> impl futures::stream::Stream<
1486 30 : Item = Result<(Key, Lsn, BlobRef), crate::tenant::disk_btree::DiskBtreeError>,
1487 30 : > + 'a
1488 30 : where
1489 30 : R: BlockReader + 'a,
1490 30 : {
1491 30 : use futures::stream::TryStreamExt;
1492 30 : let stream = reader.into_stream(start, ctx);
1493 152 : stream.map_ok(|(key, value)| {
1494 152 : let key = DeltaKey::from_slice(&key);
1495 152 : let (key, lsn) = (key.key(), key.lsn());
1496 152 : let offset = BlobRef(value);
1497 152 :
1498 152 : (key, lsn, offset)
1499 152 : })
1500 30 : }
1501 :
1502 : /// The file offset to the first block of index.
1503 : ///
1504 : /// The file structure is summary, values, and index. We often need this for the size of last blob.
1505 690 : fn index_start_offset(&self) -> u64 {
1506 690 : let offset = self.index_start_blk as u64 * PAGE_SZ as u64;
1507 690 : let bref = BlobRef(offset);
1508 690 : tracing::debug!(
1509 : index_start_blk = self.index_start_blk,
1510 : offset,
1511 0 : pos = bref.pos(),
1512 0 : "index_start_offset"
1513 : );
1514 690 : offset
1515 690 : }
1516 :
1517 90 : pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> DeltaLayerIterator<'a> {
1518 90 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
1519 90 : let tree_reader =
1520 90 : DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
1521 90 : DeltaLayerIterator {
1522 90 : delta_layer: self,
1523 90 : ctx,
1524 90 : index_iter: tree_reader.iter(&[0; DELTA_KEY_SIZE], ctx),
1525 90 : key_values_batch: std::collections::VecDeque::new(),
1526 90 : is_end: false,
1527 90 : planner: StreamingVectoredReadPlanner::new(
1528 90 : 1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
1529 90 : 1024, // The default value. Unit tests might use a different value
1530 90 : ),
1531 90 : }
1532 90 : }
1533 : }
1534 :
/// A set of data associated with a delta layer key and its value.
///
/// The value itself is not loaded; `val` refers to it on disk.
pub struct DeltaEntry<'a> {
    /// Key part of the index entry
    pub key: Key,
    /// Lsn part of the index entry
    pub lsn: Lsn,
    /// Size of the stored value
    pub size: u64,
    /// Reference to the on-disk value
    pub val: ValueRef<'a>,
}
1544 :
/// Reference to an on-disk value
pub struct ValueRef<'a> {
    /// Blob reference taken from the index entry
    blob_ref: BlobRef,
    /// The layer whose file holds the value
    layer: &'a DeltaLayerInner,
}
1550 :
1551 : impl<'a> ValueRef<'a> {
1552 : /// Loads the value from disk
1553 2064038 : pub async fn load(&self, ctx: &RequestContext) -> Result<Value> {
1554 2064038 : let buf = self.load_raw(ctx).await?;
1555 2064038 : let val = Value::des(&buf)?;
1556 2064038 : Ok(val)
1557 2064038 : }
1558 :
1559 2064046 : async fn load_raw(&self, ctx: &RequestContext) -> Result<Vec<u8>> {
1560 2064046 : let reader = BlockCursor::new(crate::tenant::block_io::BlockReaderRef::Adapter(Adapter(
1561 2064046 : self.layer,
1562 2064046 : )));
1563 2064046 : let buf = reader.read_blob(self.blob_ref.pos(), ctx).await?;
1564 2064046 : Ok(buf)
1565 2064046 : }
1566 : }
1567 :
1568 : pub(crate) struct Adapter<T>(T);
1569 :
1570 : impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
1571 2083002 : pub(crate) async fn read_blk(
1572 2083002 : &self,
1573 2083002 : blknum: u32,
1574 2083002 : ctx: &RequestContext,
1575 2083002 : ) -> Result<BlockLease, std::io::Error> {
1576 2083002 : let block_reader = FileBlockReader::new(&self.0.as_ref().file, self.0.as_ref().file_id);
1577 2083002 : block_reader.read_blk(blknum, ctx).await
1578 2083002 : }
1579 : }
1580 :
// Identity conversion, so that a plain `&DeltaLayerInner` satisfies the
// `T: AsRef<DeltaLayerInner>` bound required by `Adapter`.
impl AsRef<DeltaLayerInner> for DeltaLayerInner {
    fn as_ref(&self) -> &DeltaLayerInner {
        self
    }
}
1586 :
// Exposes the key, Lsn and stored size of a `DeltaEntry` through the
// compaction crate's entry interface.
impl<'a> pageserver_compaction::interface::CompactionDeltaEntry<'a, Key> for DeltaEntry<'a> {
    fn key(&self) -> Key {
        self.key
    }
    fn lsn(&self) -> Lsn {
        self.lsn
    }
    fn size(&self) -> u64 {
        self.size
    }
}
1598 :
/// Iterator over the `(key, lsn, value)` entries of a delta layer that loads
/// values in batches using vectored reads.
pub struct DeltaLayerIterator<'a> {
    /// The layer being iterated
    delta_layer: &'a DeltaLayerInner,
    /// Request context used for the batch reads
    ctx: &'a RequestContext,
    /// Groups index entries into batch reads
    planner: StreamingVectoredReadPlanner,
    /// Cursor over the layer's index
    index_iter: DiskBtreeIterator<'a>,
    /// Entries already loaded but not yet handed out
    key_values_batch: VecDeque<(Key, Lsn, Value)>,
    /// Set once the index has been exhausted
    is_end: bool,
}
1607 :
1608 : impl<'a> DeltaLayerIterator<'a> {
1609 : /// Retrieve a batch of key-value pairs into the iterator buffer.
1610 18976 : async fn next_batch(&mut self) -> anyhow::Result<()> {
1611 18976 : assert!(self.key_values_batch.is_empty());
1612 18976 : assert!(!self.is_end);
1613 :
1614 18976 : let plan = loop {
1615 34456 : if let Some(res) = self.index_iter.next().await {
1616 34394 : let (raw_key, value) = res?;
1617 34394 : let key = Key::from_slice(&raw_key[..KEY_SIZE]);
1618 34394 : let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
1619 34394 : let blob_ref = BlobRef(value);
1620 34394 : let offset = blob_ref.pos();
1621 34394 : if let Some(batch_plan) = self.planner.handle(key, lsn, offset) {
1622 18914 : break batch_plan;
1623 15480 : }
1624 : } else {
1625 62 : self.is_end = true;
1626 62 : let data_end_offset = self.delta_layer.index_start_offset();
1627 62 : if let Some(item) = self.planner.handle_range_end(data_end_offset) {
1628 62 : break item;
1629 : } else {
1630 0 : return Ok(()); // TODO: test empty iterator
1631 : }
1632 : }
1633 : };
1634 18976 : let vectored_blob_reader = VectoredBlobReader::new(&self.delta_layer.file);
1635 18976 : let mut next_batch = std::collections::VecDeque::new();
1636 18976 : let buf_size = plan.size();
1637 18976 : let buf = BytesMut::with_capacity(buf_size);
1638 18976 : let blobs_buf = vectored_blob_reader
1639 18976 : .read_blobs(&plan, buf, self.ctx)
1640 9637 : .await?;
1641 18976 : let frozen_buf = blobs_buf.buf.freeze();
1642 34366 : for meta in blobs_buf.blobs.iter() {
1643 34366 : let value = Value::des(&frozen_buf[meta.start..meta.end])?;
1644 34366 : next_batch.push_back((meta.meta.key, meta.meta.lsn, value));
1645 : }
1646 18976 : self.key_values_batch = next_batch;
1647 18976 : Ok(())
1648 18976 : }
1649 :
1650 34192 : pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
1651 34192 : if self.key_values_batch.is_empty() {
1652 18988 : if self.is_end {
1653 96 : return Ok(None);
1654 18892 : }
1655 18892 : self.next_batch().await?;
1656 15204 : }
1657 34096 : Ok(Some(
1658 34096 : self.key_values_batch
1659 34096 : .pop_front()
1660 34096 : .expect("should not be empty"),
1661 34096 : ))
1662 34192 : }
1663 : }
1664 :
1665 : #[cfg(test)]
1666 : pub(crate) mod test {
1667 : use std::collections::BTreeMap;
1668 :
1669 : use itertools::MinMaxResult;
1670 : use rand::prelude::{SeedableRng, SliceRandom, StdRng};
1671 : use rand::RngCore;
1672 :
1673 : use super::*;
1674 : use crate::repository::Value;
1675 : use crate::tenant::harness::TIMELINE_ID;
1676 : use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner;
1677 : use crate::tenant::Tenant;
1678 : use crate::{
1679 : context::DownloadBehavior,
1680 : task_mgr::TaskKind,
1681 : tenant::{disk_btree::tests::TestDisk, harness::TenantHarness},
1682 : DEFAULT_PG_VERSION,
1683 : };
1684 : use bytes::Bytes;
1685 :
/// Construct an index for a fictional delta layer and then
/// traverse it in order to plan vectored reads for a query. Finally,
/// verify that the traversal fed the right index key and value
/// pairs into the planner.
#[tokio::test]
async fn test_delta_layer_index_traversal() {
    let base_key = Key {
        field1: 0,
        field2: 1663,
        field3: 12972,
        field4: 16396,
        field5: 0,
        field6: 246080,
    };

    // Populate the index with some entries
    let entries: BTreeMap<Key, Vec<Lsn>> = BTreeMap::from([
        (base_key, vec![Lsn(1), Lsn(5), Lsn(25), Lsn(26), Lsn(28)]),
        (base_key.add(1), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
        (base_key.add(2), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
        (base_key.add(5), vec![Lsn(10), Lsn(15), Lsn(16), Lsn(20)]),
    ]);

    let mut disk = TestDisk::default();
    let mut writer = DiskBtreeBuilder::<_, DELTA_KEY_SIZE>::new(&mut disk);

    // Each entry gets a distinct fictional blob offset: 0, 1, 2, ...
    let mut disk_offset = 0;
    for (key, lsns) in &entries {
        for lsn in lsns {
            let index_key = DeltaKey::from_key_lsn(key, *lsn);
            let blob_ref = BlobRef::new(disk_offset, false);
            writer
                .append(&index_key.0, blob_ref.0)
                .expect("In memory disk append should never fail");

            disk_offset += 1;
        }
    }

    // Prepare all the arguments for the call into `plan_reads` below
    let (root_offset, _writer) = writer
        .finish()
        .expect("In memory disk finish should never fail");
    let reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(0, root_offset, disk);
    let planner = VectoredReadPlanner::new(100);
    let mut reconstruct_state = ValuesReconstructState::new();
    let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);

    let keyspace = KeySpace {
        ranges: vec![
            base_key..base_key.add(3),
            base_key.add(3)..base_key.add(100),
        ],
    };
    let lsn_range = Lsn(2)..Lsn(40);

    // Plan and validate
    let vectored_reads = DeltaLayerInner::plan_reads(
        &keyspace,
        lsn_range.clone(),
        disk_offset,
        reader,
        planner,
        &mut reconstruct_state,
        &ctx,
    )
    .await
    .expect("Read planning should not fail");

    validate(keyspace, lsn_range, vectored_reads, entries);
}
1757 :
1758 2 : fn validate(
1759 2 : keyspace: KeySpace,
1760 2 : lsn_range: Range<Lsn>,
1761 2 : vectored_reads: Vec<VectoredRead>,
1762 2 : index_entries: BTreeMap<Key, Vec<Lsn>>,
1763 2 : ) {
1764 2 : #[derive(Debug, PartialEq, Eq)]
1765 2 : struct BlobSpec {
1766 2 : key: Key,
1767 2 : lsn: Lsn,
1768 2 : at: u64,
1769 2 : }
1770 2 :
1771 2 : let mut planned_blobs = Vec::new();
1772 8 : for read in vectored_reads {
1773 28 : for (at, meta) in read.blobs_at.as_slice() {
1774 28 : planned_blobs.push(BlobSpec {
1775 28 : key: meta.key,
1776 28 : lsn: meta.lsn,
1777 28 : at: *at,
1778 28 : });
1779 28 : }
1780 : }
1781 :
1782 2 : let mut expected_blobs = Vec::new();
1783 2 : let mut disk_offset = 0;
1784 10 : for (key, lsns) in index_entries {
1785 42 : for lsn in lsns {
1786 42 : let key_included = keyspace.ranges.iter().any(|range| range.contains(&key));
1787 34 : let lsn_included = lsn_range.contains(&lsn);
1788 34 :
1789 34 : if key_included && lsn_included {
1790 28 : expected_blobs.push(BlobSpec {
1791 28 : key,
1792 28 : lsn,
1793 28 : at: disk_offset,
1794 28 : });
1795 28 : }
1796 :
1797 34 : disk_offset += 1;
1798 : }
1799 : }
1800 :
1801 2 : assert_eq!(planned_blobs, expected_blobs);
1802 2 : }
1803 :
/// Tunables for the randomized delta layer read tests.
mod constants {
    use utils::lsn::Lsn;

    /// Offset used by all lsns in this test
    pub(super) const LSN_OFFSET: Lsn = Lsn(0x08);
    /// Number of unique keys included in the test data
    pub(super) const KEY_COUNT: u8 = 60;
    /// Max number of different lsns for each key
    pub(super) const MAX_ENTRIES_PER_KEY: u8 = 20;
    /// Possible value sizes for each key along with a probability weight
    pub(super) const VALUE_SIZES: [(usize, u8); 3] = [(100, 2), (1024, 2), (1024 * 1024, 1)];
    /// Weighted choices for whether there will be a gap between the current key
    /// and the next one (a gap is chosen 33.3% of the time)
    pub(super) const KEY_GAP_CHANGES: [(bool, u8); 2] = [(true, 1), (false, 2)];
    /// The minimum size of a key range in all the generated reads
    pub(super) const MIN_RANGE_SIZE: i128 = 10;
    /// The number of ranges included in each vectored read
    pub(super) const RANGES_COUNT: u8 = 2;
    /// The number of vectored reads performed
    pub(super) const READS_COUNT: u8 = 100;
    /// Soft max size of a vectored read. Will be violated if we have to read keys
    /// with values larger than the limit
    pub(super) const MAX_VECTORED_READ_BYTES: usize = 64 * 1024;
}
1827 :
/// One generated test record: a key, an Lsn and the raw bytes stored for them.
struct Entry {
    key: Key,
    lsn: Lsn,
    value: Vec<u8>,
}
1833 :
1834 2 : fn generate_entries(rng: &mut StdRng) -> Vec<Entry> {
1835 2 : let mut current_key = Key::MIN;
1836 2 :
1837 2 : let mut entries = Vec::new();
1838 122 : for _ in 0..constants::KEY_COUNT {
1839 120 : let count = rng.gen_range(1..constants::MAX_ENTRIES_PER_KEY);
1840 120 : let mut lsns_iter =
1841 2260 : std::iter::successors(Some(Lsn(constants::LSN_OFFSET.0 + 0x08)), |lsn| {
1842 2260 : Some(Lsn(lsn.0 + 0x08))
1843 2260 : });
1844 120 : let mut lsns = Vec::new();
1845 2380 : while lsns.len() < count as usize {
1846 2260 : let take = rng.gen_bool(0.5);
1847 2260 : let lsn = lsns_iter.next().unwrap();
1848 2260 : if take {
1849 1112 : lsns.push(lsn);
1850 1148 : }
1851 : }
1852 :
1853 1232 : for lsn in lsns {
1854 1112 : let size = constants::VALUE_SIZES
1855 3336 : .choose_weighted(rng, |item| item.1)
1856 1112 : .unwrap()
1857 1112 : .0;
1858 1112 : let mut buf = vec![0; size];
1859 1112 : rng.fill_bytes(&mut buf);
1860 1112 :
1861 1112 : entries.push(Entry {
1862 1112 : key: current_key,
1863 1112 : lsn,
1864 1112 : value: buf,
1865 1112 : })
1866 : }
1867 :
1868 120 : let gap = constants::KEY_GAP_CHANGES
1869 240 : .choose_weighted(rng, |item| item.1)
1870 120 : .unwrap()
1871 120 : .0;
1872 120 : if gap {
1873 38 : current_key = current_key.add(2);
1874 82 : } else {
1875 82 : current_key = current_key.add(1);
1876 82 : }
1877 : }
1878 :
1879 2 : entries
1880 2 : }
1881 :
1882 : struct EntriesMeta {
1883 : key_range: Range<Key>,
1884 : lsn_range: Range<Lsn>,
1885 : index: BTreeMap<(Key, Lsn), Vec<u8>>,
1886 : }
1887 :
1888 2 : fn get_entries_meta(entries: &[Entry]) -> EntriesMeta {
1889 1112 : let key_range = match entries.iter().minmax_by_key(|e| e.key) {
1890 2 : MinMaxResult::MinMax(min, max) => min.key..max.key.next(),
1891 0 : _ => panic!("More than one entry is always expected"),
1892 : };
1893 :
1894 1112 : let lsn_range = match entries.iter().minmax_by_key(|e| e.lsn) {
1895 2 : MinMaxResult::MinMax(min, max) => min.lsn..Lsn(max.lsn.0 + 1),
1896 0 : _ => panic!("More than one entry is always expected"),
1897 : };
1898 :
1899 2 : let mut index = BTreeMap::new();
1900 1112 : for entry in entries.iter() {
1901 1112 : index.insert((entry.key, entry.lsn), entry.value.clone());
1902 1112 : }
1903 :
1904 2 : EntriesMeta {
1905 2 : key_range,
1906 2 : lsn_range,
1907 2 : index,
1908 2 : }
1909 2 : }
1910 :
1911 200 : fn pick_random_keyspace(rng: &mut StdRng, key_range: &Range<Key>) -> KeySpace {
1912 200 : let start = key_range.start.to_i128();
1913 200 : let end = key_range.end.to_i128();
1914 200 :
1915 200 : let mut keyspace = KeySpace::default();
1916 :
1917 600 : for _ in 0..constants::RANGES_COUNT {
1918 400 : let mut range: Option<Range<Key>> = Option::default();
1919 1244 : while range.is_none() || keyspace.overlaps(range.as_ref().unwrap()) {
1920 844 : let range_start = rng.gen_range(start..end);
1921 844 : let range_end_offset = range_start + constants::MIN_RANGE_SIZE;
1922 844 : if range_end_offset >= end {
1923 100 : range = Some(Key::from_i128(range_start)..Key::from_i128(end));
1924 744 : } else {
1925 744 : let range_end = rng.gen_range((range_start + constants::MIN_RANGE_SIZE)..end);
1926 744 : range = Some(Key::from_i128(range_start)..Key::from_i128(range_end));
1927 744 : }
1928 : }
1929 400 : keyspace.ranges.push(range.unwrap());
1930 : }
1931 :
1932 200 : keyspace
1933 200 : }
1934 :
1935 : #[tokio::test]
1936 2 : async fn test_delta_layer_vectored_read_end_to_end() -> anyhow::Result<()> {
1937 2 : let harness = TenantHarness::create("test_delta_layer_oversized_vectored_read").await?;
1938 8 : let (tenant, ctx) = harness.load().await;
1939 2 :
1940 2 : let timeline_id = TimelineId::generate();
1941 2 : let timeline = tenant
1942 2 : .create_test_timeline(timeline_id, constants::LSN_OFFSET, DEFAULT_PG_VERSION, &ctx)
1943 6 : .await?;
1944 2 :
1945 2 : tracing::info!("Generating test data ...");
1946 2 :
1947 2 : let rng = &mut StdRng::seed_from_u64(0);
1948 2 : let entries = generate_entries(rng);
1949 2 : let entries_meta = get_entries_meta(&entries);
1950 2 :
1951 2 : tracing::info!("Done generating {} entries", entries.len());
1952 2 :
1953 2 : tracing::info!("Writing test data to delta layer ...");
1954 2 : let mut writer = DeltaLayerWriter::new(
1955 2 : harness.conf,
1956 2 : timeline_id,
1957 2 : harness.tenant_shard_id,
1958 2 : entries_meta.key_range.start,
1959 2 : entries_meta.lsn_range.clone(),
1960 2 : &ctx,
1961 2 : )
1962 2 : .await?;
1963 2 :
1964 1114 : for entry in entries {
1965 1112 : let (_, res) = writer
1966 1112 : .put_value_bytes(entry.key, entry.lsn, entry.value, false, &ctx)
1967 215 : .await;
1968 1112 : res?;
1969 2 : }
1970 2 :
1971 2 : let resident = writer
1972 2 : .finish(entries_meta.key_range.end, &timeline, &ctx)
1973 5 : .await?;
1974 2 :
1975 2 : let inner = resident.get_as_delta(&ctx).await?;
1976 2 :
1977 2 : let file_size = inner.file.metadata().await?.len();
1978 2 : tracing::info!(
1979 2 : "Done writing test data to delta layer. Resulting file size is: {}",
1980 2 : file_size
1981 2 : );
1982 2 :
1983 202 : for i in 0..constants::READS_COUNT {
1984 200 : tracing::info!("Doing vectored read {}/{}", i + 1, constants::READS_COUNT);
1985 2 :
1986 200 : let block_reader = FileBlockReader::new(&inner.file, inner.file_id);
1987 200 : let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1988 200 : inner.index_start_blk,
1989 200 : inner.index_root_blk,
1990 200 : block_reader,
1991 200 : );
1992 200 :
1993 200 : let planner = VectoredReadPlanner::new(constants::MAX_VECTORED_READ_BYTES);
1994 200 : let mut reconstruct_state = ValuesReconstructState::new();
1995 200 : let keyspace = pick_random_keyspace(rng, &entries_meta.key_range);
1996 200 : let data_end_offset = inner.index_start_blk as u64 * PAGE_SZ as u64;
1997 2 :
1998 200 : let vectored_reads = DeltaLayerInner::plan_reads(
1999 200 : &keyspace,
2000 200 : entries_meta.lsn_range.clone(),
2001 200 : data_end_offset,
2002 200 : index_reader,
2003 200 : planner,
2004 200 : &mut reconstruct_state,
2005 200 : &ctx,
2006 200 : )
2007 4 : .await?;
2008 2 :
2009 200 : let vectored_blob_reader = VectoredBlobReader::new(&inner.file);
2010 200 : let buf_size = DeltaLayerInner::get_min_read_buffer_size(
2011 200 : &vectored_reads,
2012 200 : constants::MAX_VECTORED_READ_BYTES,
2013 200 : );
2014 200 : let mut buf = Some(BytesMut::with_capacity(buf_size));
2015 2 :
2016 19924 : for read in vectored_reads {
2017 19724 : let blobs_buf = vectored_blob_reader
2018 19724 : .read_blobs(&read, buf.take().expect("Should have a buffer"), &ctx)
2019 10016 : .await?;
2020 57304 : for meta in blobs_buf.blobs.iter() {
2021 57304 : let value = &blobs_buf.buf[meta.start..meta.end];
2022 57304 : assert_eq!(value, entries_meta.index[&(meta.meta.key, meta.meta.lsn)]);
2023 2 : }
2024 2 :
2025 19724 : buf = Some(blobs_buf.buf);
2026 2 : }
2027 2 : }
2028 2 :
2029 2 : Ok(())
2030 2 : }
2031 :
2032 : #[tokio::test]
2033 2 : async fn copy_delta_prefix_smoke() {
2034 2 : use crate::walrecord::NeonWalRecord;
2035 2 : use bytes::Bytes;
2036 2 :
2037 2 : let h = crate::tenant::harness::TenantHarness::create("truncate_delta_smoke")
2038 2 : .await
2039 2 : .unwrap();
2040 8 : let (tenant, ctx) = h.load().await;
2041 2 : let ctx = &ctx;
2042 2 : let timeline = tenant
2043 2 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, ctx)
2044 6 : .await
2045 2 : .unwrap();
2046 2 :
2047 2 : let initdb_layer = timeline
2048 2 : .layers
2049 2 : .read()
2050 2 : .await
2051 2 : .likely_resident_layers()
2052 2 : .next()
2053 2 : .unwrap();
2054 2 :
2055 2 : {
2056 2 : let mut writer = timeline.writer().await;
2057 2 :
2058 2 : let data = [
2059 2 : (0x20, 12, Value::Image(Bytes::from_static(b"foobar"))),
2060 2 : (
2061 2 : 0x30,
2062 2 : 12,
2063 2 : Value::WalRecord(NeonWalRecord::Postgres {
2064 2 : will_init: false,
2065 2 : rec: Bytes::from_static(b"1"),
2066 2 : }),
2067 2 : ),
2068 2 : (
2069 2 : 0x40,
2070 2 : 12,
2071 2 : Value::WalRecord(NeonWalRecord::Postgres {
2072 2 : will_init: true,
2073 2 : rec: Bytes::from_static(b"2"),
2074 2 : }),
2075 2 : ),
2076 2 : // build an oversized value so we cannot extend an existing read over
2077 2 : // this
2078 2 : (
2079 2 : 0x50,
2080 2 : 12,
2081 2 : Value::WalRecord(NeonWalRecord::Postgres {
2082 2 : will_init: true,
2083 2 : rec: {
2084 2 : let mut buf =
2085 2 : vec![0u8; tenant.conf.max_vectored_read_bytes.0.get() + 1024];
2086 2 : buf.iter_mut()
2087 2 : .enumerate()
2088 264192 : .for_each(|(i, slot)| *slot = (i % 256) as u8);
2089 2 : Bytes::from(buf)
2090 2 : },
2091 2 : }),
2092 2 : ),
2093 2 : // because the oversized read cannot be extended further, we are sure to exercise the
2094 2 : // builder created on the last round with this:
2095 2 : (
2096 2 : 0x60,
2097 2 : 12,
2098 2 : Value::WalRecord(NeonWalRecord::Postgres {
2099 2 : will_init: true,
2100 2 : rec: Bytes::from_static(b"3"),
2101 2 : }),
2102 2 : ),
2103 2 : (
2104 2 : 0x60,
2105 2 : 9,
2106 2 : Value::Image(Bytes::from_static(b"something for a different key")),
2107 2 : ),
2108 2 : ];
2109 2 :
2110 2 : let mut last_lsn = None;
2111 2 :
2112 14 : for (lsn, key, value) in data {
2113 12 : let key = Key::from_i128(key);
2114 12 : writer.put(key, Lsn(lsn), &value, ctx).await.unwrap();
2115 12 : last_lsn = Some(lsn);
2116 2 : }
2117 2 :
2118 2 : writer.finish_write(Lsn(last_lsn.unwrap()));
2119 2 : }
2120 2 : timeline.freeze_and_flush().await.unwrap();
2121 2 :
2122 2 : let new_layer = timeline
2123 2 : .layers
2124 2 : .read()
2125 2 : .await
2126 2 : .likely_resident_layers()
2127 4 : .find(|x| x != &initdb_layer)
2128 2 : .unwrap();
2129 2 :
2130 2 : // create a copy for the timeline, so we don't overwrite the file
2131 2 : let branch = tenant
2132 2 : .branch_timeline_test(&timeline, TimelineId::generate(), None, ctx)
2133 2 : .await
2134 2 : .unwrap();
2135 2 :
2136 2 : assert_eq!(branch.get_ancestor_lsn(), Lsn(0x60));
2137 2 :
2138 2 : // truncating at 0x61 gives us a full copy, otherwise just go backwards until there's just
2139 2 : // a single key
2140 2 :
2141 12 : for truncate_at in [0x61, 0x51, 0x41, 0x31, 0x21] {
2142 10 : let truncate_at = Lsn(truncate_at);
2143 2 :
2144 10 : let mut writer = DeltaLayerWriter::new(
2145 10 : tenant.conf,
2146 10 : branch.timeline_id,
2147 10 : tenant.tenant_shard_id,
2148 10 : Key::MIN,
2149 10 : Lsn(0x11)..truncate_at,
2150 10 : ctx,
2151 10 : )
2152 5 : .await
2153 10 : .unwrap();
2154 2 :
2155 10 : let new_layer = new_layer.download_and_keep_resident().await.unwrap();
2156 10 :
2157 10 : new_layer
2158 10 : .copy_delta_prefix(&mut writer, truncate_at, ctx)
2159 15 : .await
2160 10 : .unwrap();
2161 2 :
2162 24 : let copied_layer = writer.finish(Key::MAX, &branch, ctx).await.unwrap();
2163 10 :
2164 11 : copied_layer.get_as_delta(ctx).await.unwrap();
2165 10 :
2166 10 : assert_keys_and_values_eq(
2167 10 : new_layer.get_as_delta(ctx).await.unwrap(),
2168 10 : copied_layer.get_as_delta(ctx).await.unwrap(),
2169 10 : truncate_at,
2170 10 : ctx,
2171 2 : )
2172 50 : .await;
2173 2 : }
2174 2 : }
2175 :
2176 10 : async fn assert_keys_and_values_eq(
2177 10 : source: &DeltaLayerInner,
2178 10 : truncated: &DeltaLayerInner,
2179 10 : truncated_at: Lsn,
2180 10 : ctx: &RequestContext,
2181 10 : ) {
2182 10 : use futures::future::ready;
2183 10 : use futures::stream::TryStreamExt;
2184 10 :
2185 10 : let start_key = [0u8; DELTA_KEY_SIZE];
2186 10 :
2187 10 : let source_reader = FileBlockReader::new(&source.file, source.file_id);
2188 10 : let source_tree = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
2189 10 : source.index_start_blk,
2190 10 : source.index_root_blk,
2191 10 : &source_reader,
2192 10 : );
2193 10 : let source_stream = source.stream_index_forwards(source_tree, &start_key, ctx);
2194 60 : let source_stream = source_stream.filter(|res| match res {
2195 60 : Ok((_, lsn, _)) => ready(lsn < &truncated_at),
2196 0 : _ => ready(true),
2197 60 : });
2198 10 : let mut source_stream = std::pin::pin!(source_stream);
2199 10 :
2200 10 : let truncated_reader = FileBlockReader::new(&truncated.file, truncated.file_id);
2201 10 : let truncated_tree = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
2202 10 : truncated.index_start_blk,
2203 10 : truncated.index_root_blk,
2204 10 : &truncated_reader,
2205 10 : );
2206 10 : let truncated_stream = truncated.stream_index_forwards(truncated_tree, &start_key, ctx);
2207 10 : let mut truncated_stream = std::pin::pin!(truncated_stream);
2208 10 :
2209 10 : let mut scratch_left = Vec::new();
2210 10 : let mut scratch_right = Vec::new();
2211 :
2212 : loop {
2213 42 : let (src, truncated) = (source_stream.try_next(), truncated_stream.try_next());
2214 42 : let (src, truncated) = tokio::try_join!(src, truncated).unwrap();
2215 42 :
2216 42 : if src.is_none() {
2217 10 : assert!(truncated.is_none());
2218 10 : break;
2219 32 : }
2220 32 :
2221 32 : let (src, truncated) = (src.unwrap(), truncated.unwrap());
2222 32 :
2223 32 : // because we've filtered the source with Lsn, we should always have the same keys from both.
2224 32 : assert_eq!(src.0, truncated.0);
2225 32 : assert_eq!(src.1, truncated.1);
2226 :
2227 : // if this is needed for something else, just drop this assert.
2228 32 : assert!(
2229 32 : src.2.pos() >= truncated.2.pos(),
2230 0 : "value position should not go backwards {} vs. {}",
2231 0 : src.2.pos(),
2232 0 : truncated.2.pos()
2233 : );
2234 :
2235 32 : scratch_left.clear();
2236 32 : let src_cursor = source_reader.block_cursor();
2237 32 : let left = src_cursor.read_blob_into_buf(src.2.pos(), &mut scratch_left, ctx);
2238 32 : scratch_right.clear();
2239 32 : let trunc_cursor = truncated_reader.block_cursor();
2240 32 : let right = trunc_cursor.read_blob_into_buf(truncated.2.pos(), &mut scratch_right, ctx);
2241 :
2242 32 : tokio::try_join!(left, right).unwrap();
2243 32 :
2244 32 : assert_eq!(utils::Hex(&scratch_left), utils::Hex(&scratch_right));
2245 : }
2246 10 : }
2247 :
2248 18026 : pub(crate) fn sort_delta(
2249 18026 : (k1, l1, _): &(Key, Lsn, Value),
2250 18026 : (k2, l2, _): &(Key, Lsn, Value),
2251 18026 : ) -> std::cmp::Ordering {
2252 18026 : (k1, l1).cmp(&(k2, l2))
2253 18026 : }
2254 :
2255 94 : pub(crate) fn sort_delta_value(
2256 94 : (k1, l1, v1): &(Key, Lsn, Value),
2257 94 : (k2, l2, v2): &(Key, Lsn, Value),
2258 94 : ) -> std::cmp::Ordering {
2259 94 : let order_1 = if v1.is_image() { 0 } else { 1 };
2260 94 : let order_2 = if v2.is_image() { 0 } else { 1 };
2261 94 : (k1, l1, order_1).cmp(&(k2, l2, order_2))
2262 94 : }
2263 :
2264 20 : pub(crate) async fn produce_delta_layer(
2265 20 : tenant: &Tenant,
2266 20 : tline: &Arc<Timeline>,
2267 20 : mut deltas: Vec<(Key, Lsn, Value)>,
2268 20 : ctx: &RequestContext,
2269 20 : ) -> anyhow::Result<ResidentLayer> {
2270 20 : deltas.sort_by(sort_delta);
2271 20 : let (key_start, _, _) = deltas.first().unwrap();
2272 20 : let (key_max, _, _) = deltas.last().unwrap();
2273 8040 : let lsn_min = deltas.iter().map(|(_, lsn, _)| lsn).min().unwrap();
2274 8040 : let lsn_max = deltas.iter().map(|(_, lsn, _)| lsn).max().unwrap();
2275 20 : let lsn_end = Lsn(lsn_max.0 + 1);
2276 20 : let mut writer = DeltaLayerWriter::new(
2277 20 : tenant.conf,
2278 20 : tline.timeline_id,
2279 20 : tenant.tenant_shard_id,
2280 20 : *key_start,
2281 20 : (*lsn_min)..lsn_end,
2282 20 : ctx,
2283 20 : )
2284 10 : .await?;
2285 20 : let key_end = key_max.next();
2286 :
2287 8060 : for (key, lsn, value) in deltas {
2288 8040 : writer.put_value(key, lsn, value, ctx).await?;
2289 : }
2290 58 : let delta_layer = writer.finish(key_end, tline, ctx).await?;
2291 :
2292 20 : Ok::<_, anyhow::Error>(delta_layer)
2293 20 : }
2294 :
2295 28 : async fn assert_delta_iter_equal(
2296 28 : delta_iter: &mut DeltaLayerIterator<'_>,
2297 28 : expect: &[(Key, Lsn, Value)],
2298 28 : ) {
2299 28 : let mut expect_iter = expect.iter();
2300 : loop {
2301 28028 : let o1 = delta_iter.next().await.unwrap();
2302 28028 : let o2 = expect_iter.next();
2303 28028 : assert_eq!(o1.is_some(), o2.is_some());
2304 28028 : if o1.is_none() && o2.is_none() {
2305 28 : break;
2306 28000 : }
2307 28000 : let (k1, l1, v1) = o1.unwrap();
2308 28000 : let (k2, l2, v2) = o2.unwrap();
2309 28000 : assert_eq!(&k1, k2);
2310 28000 : assert_eq!(l1, *l2);
2311 28000 : assert_eq!(&v1, v2);
2312 : }
2313 28 : }
2314 :
2315 : #[tokio::test]
2316 2 : async fn delta_layer_iterator() {
2317 2 : let harness = TenantHarness::create("delta_layer_iterator").await.unwrap();
2318 8 : let (tenant, ctx) = harness.load().await;
2319 2 :
2320 2 : let tline = tenant
2321 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
2322 6 : .await
2323 2 : .unwrap();
2324 2 :
2325 2000 : fn get_key(id: u32) -> Key {
2326 2000 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
2327 2000 : key.field6 = id;
2328 2000 : key
2329 2000 : }
2330 2 : const N: usize = 1000;
2331 2 : let test_deltas = (0..N)
2332 2000 : .map(|idx| {
2333 2000 : (
2334 2000 : get_key(idx as u32 / 10),
2335 2000 : Lsn(0x10 * ((idx as u64) % 10 + 1)),
2336 2000 : Value::Image(Bytes::from(format!("img{idx:05}"))),
2337 2000 : )
2338 2000 : })
2339 2 : .collect_vec();
2340 2 : let resident_layer = produce_delta_layer(&tenant, &tline, test_deltas.clone(), &ctx)
2341 8 : .await
2342 2 : .unwrap();
2343 2 : let delta_layer = resident_layer.get_as_delta(&ctx).await.unwrap();
2344 6 : for max_read_size in [1, 1024] {
2345 32 : for batch_size in [1, 2, 4, 8, 3, 7, 13] {
2346 28 : println!("running with batch_size={batch_size} max_read_size={max_read_size}");
2347 28 : // Test if the batch size is correctly determined
2348 28 : let mut iter = delta_layer.iter(&ctx);
2349 28 : iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
2350 28 : let mut num_items = 0;
2351 112 : for _ in 0..3 {
2352 84 : iter.next_batch().await.unwrap();
2353 84 : num_items += iter.key_values_batch.len();
2354 84 : if max_read_size == 1 {
2355 2 : // every key should be a batch b/c the value is larger than max_read_size
2356 42 : assert_eq!(iter.key_values_batch.len(), 1);
2357 2 : } else {
2358 42 : assert_eq!(iter.key_values_batch.len(), batch_size);
2359 2 : }
2360 84 : if num_items >= N {
2361 2 : break;
2362 84 : }
2363 84 : iter.key_values_batch.clear();
2364 2 : }
2365 2 : // Test if the result is correct
2366 28 : let mut iter = delta_layer.iter(&ctx);
2367 28 : iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
2368 9577 : assert_delta_iter_equal(&mut iter, &test_deltas).await;
2369 2 : }
2370 2 : }
2371 2 : }
2372 : }
|