Line data Source code
1 : //! A DeltaLayer represents a collection of WAL records or page images in a range of
2 : //! LSNs, and in a range of Keys. It is stored on a file on disk.
3 : //!
4 : //! Usually a delta layer only contains differences, in the form of WAL records
5 : //! against a base LSN. However, if a relation is extended or a whole new relation
6 : //! is created, there would be no base for the new pages. The entries for them
7 : //! must be page images or WAL records with the 'will_init' flag set, so that
8 : //! they can be replayed without referring to an older page version.
9 : //!
10 : //! The delta files are stored in `timelines/<timeline_id>` directory. Currently,
11 : //! there are no subdirectories, and each delta file is named like this:
12 : //!
13 : //! ```text
14 : //! <key start>-<key end>__<start LSN>-<end LSN>
15 : //! ```
16 : //!
17 : //! For example:
18 : //!
19 : //! ```text
20 : //! 000000067F000032BE0000400000000020B6-000000067F000032BE0000400000000030B6__000000578C6B29-0000000057A50051
21 : //! ```
22 : //!
23 : //! Every delta file consists of three parts: "summary", "values", and
24 : //! "index". The summary is a fixed size header at the beginning of the file,
25 : //! and it contains basic information about the layer, and offsets to the other
26 : //! parts. The "index" is a B-tree, mapping from Key and LSN to an offset in the
27 : //! "values" part. The actual page images and WAL records are stored in the
28 : //! "values" part.
29 : //!
30 : use crate::config::PageServerConf;
31 : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
32 : use crate::page_cache::{self, FileId, PAGE_SZ};
33 : use crate::repository::{Key, Value, KEY_SIZE};
34 : use crate::tenant::blob_io::BlobWriter;
35 : use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
36 : use crate::tenant::disk_btree::{
37 : DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
38 : };
39 : use crate::tenant::storage_layer::layer::S3_UPLOAD_LIMIT;
40 : use crate::tenant::timeline::GetVectoredError;
41 : use crate::tenant::vectored_blob_io::{
42 : BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
43 : VectoredReadPlanner,
44 : };
45 : use crate::tenant::PageReconstructError;
46 : use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
47 : use crate::virtual_file::IoBufferMut;
48 : use crate::virtual_file::{self, MaybeFatalIo, VirtualFile};
49 : use crate::{walrecord, TEMP_FILE_SUFFIX};
50 : use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
51 : use anyhow::{anyhow, bail, ensure, Context, Result};
52 : use camino::{Utf8Path, Utf8PathBuf};
53 : use futures::StreamExt;
54 : use itertools::Itertools;
55 : use pageserver_api::config::MaxVectoredReadBytes;
56 : use pageserver_api::key::DBDIR_KEY;
57 : use pageserver_api::keyspace::KeySpace;
58 : use pageserver_api::models::ImageCompressionAlgorithm;
59 : use pageserver_api::shard::TenantShardId;
60 : use rand::{distributions::Alphanumeric, Rng};
61 : use serde::{Deserialize, Serialize};
62 : use std::collections::VecDeque;
63 : use std::fs::File;
64 : use std::io::SeekFrom;
65 : use std::ops::Range;
66 : use std::os::unix::fs::FileExt;
67 : use std::str::FromStr;
68 : use std::sync::Arc;
69 : use tokio::sync::OnceCell;
70 : use tokio_epoll_uring::IoBuf;
71 : use tracing::*;
72 :
73 : use utils::{
74 : bin_ser::BeSer,
75 : id::{TenantId, TimelineId},
76 : lsn::Lsn,
77 : };
78 :
79 : use super::{AsLayerDesc, LayerName, PersistentLayerDesc, ValuesReconstructState};
80 :
81 : ///
82 : /// Header stored at the beginning of the file.
83 : ///
84 : /// After this comes the 'values' part, starting on block 1. After that,
85 : /// the 'index' starts at the block indicated by 'index_start_blk'.
86 : ///
87 1046 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
88 : pub struct Summary {
89 : /// Magic value to identify this as a Neon delta file. Always DELTA_FILE_MAGIC.
90 : pub magic: u16,
91 : pub format_version: u16,
92 :
93 : pub tenant_id: TenantId,
94 : pub timeline_id: TimelineId,
95 : pub key_range: Range<Key>,
96 : pub lsn_range: Range<Lsn>,
97 :
98 : /// Block number where the 'index' part of the file begins.
99 : pub index_start_blk: u32,
100 : /// Block within the 'index', where the B-tree root page is stored
101 : pub index_root_blk: u32,
102 : }
103 :
104 : impl From<&DeltaLayer> for Summary {
105 0 : fn from(layer: &DeltaLayer) -> Self {
106 0 : Self::expected(
107 0 : layer.desc.tenant_shard_id.tenant_id,
108 0 : layer.desc.timeline_id,
109 0 : layer.desc.key_range.clone(),
110 0 : layer.desc.lsn_range.clone(),
111 0 : )
112 0 : }
113 : }
114 :
115 : impl Summary {
116 1046 : pub(super) fn expected(
117 1046 : tenant_id: TenantId,
118 1046 : timeline_id: TimelineId,
119 1046 : keys: Range<Key>,
120 1046 : lsns: Range<Lsn>,
121 1046 : ) -> Self {
122 1046 : Self {
123 1046 : magic: DELTA_FILE_MAGIC,
124 1046 : format_version: STORAGE_FORMAT_VERSION,
125 1046 :
126 1046 : tenant_id,
127 1046 : timeline_id,
128 1046 : key_range: keys,
129 1046 : lsn_range: lsns,
130 1046 :
131 1046 : index_start_blk: 0,
132 1046 : index_root_blk: 0,
133 1046 : }
134 1046 : }
135 : }
136 :
137 : // Flag indicating that this version initializes the page
138 : const WILL_INIT: u64 = 1;
139 :
140 : /// Struct representing a reference to a BLOB in a layer.
141 : ///
142 : /// The reference contains the BLOB offset, and for WAL records it also carries
143 : /// the `will_init` flag. The flag helps to determine the range of records that
144 : /// need to be applied, without reading/deserializing the records themselves.
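///
/// A minimal sketch of the encoding: the lowest bit stores `will_init` and
/// the remaining bits store the blob offset.
///
/// ```ignore
/// let r = BlobRef::new(4096, true);
/// assert_eq!(r.pos(), 4096);
/// assert!(r.will_init());
/// ```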
145 0 : #[derive(Debug, Serialize, Deserialize, Copy, Clone)]
146 : pub struct BlobRef(pub u64);
147 :
148 : impl BlobRef {
149 510541 : pub fn will_init(&self) -> bool {
150 510541 : (self.0 & WILL_INIT) != 0
151 510541 : }
152 :
153 4746528 : pub fn pos(&self) -> u64 {
154 4746528 : self.0 >> 1
155 4746528 : }
156 :
157 6471322 : pub fn new(pos: u64, will_init: bool) -> BlobRef {
158 6471322 : let mut blob_ref = pos << 1;
159 6471322 : if will_init {
160 6469870 : blob_ref |= WILL_INIT;
161 6469870 : }
162 6471322 : BlobRef(blob_ref)
163 6471322 : }
164 : }
165 :
166 : pub const DELTA_KEY_SIZE: usize = KEY_SIZE + 8;
167 : struct DeltaKey([u8; DELTA_KEY_SIZE]);
168 :
169 : /// This is the key of the B-tree index stored in the delta layer. It consists
170 : /// of the serialized representation of a Key and LSN.
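///
/// Layout: the first `KEY_SIZE` bytes hold the serialized `Key`, followed by
/// the LSN as a big-endian `u64`, so comparing the raw bytes orders entries
/// by (key, lsn).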
171 : impl DeltaKey {
172 2064198 : fn from_slice(buf: &[u8]) -> Self {
173 2064198 : let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
174 2064198 : bytes.copy_from_slice(buf);
175 2064198 : DeltaKey(bytes)
176 2064198 : }
177 :
178 6717749 : fn from_key_lsn(key: &Key, lsn: Lsn) -> Self {
179 6717749 : let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
180 6717749 : key.write_to_byte_slice(&mut bytes[0..KEY_SIZE]);
181 6717749 : bytes[KEY_SIZE..].copy_from_slice(&u64::to_be_bytes(lsn.0));
182 6717749 : DeltaKey(bytes)
183 6717749 : }
184 :
185 2064198 : fn key(&self) -> Key {
186 2064198 : Key::from_slice(&self.0)
187 2064198 : }
188 :
189 2064198 : fn lsn(&self) -> Lsn {
190 2064198 : Lsn(u64::from_be_bytes(self.0[KEY_SIZE..].try_into().unwrap()))
191 2064198 : }
192 :
193 2682270 : fn extract_lsn_from_buf(buf: &[u8]) -> Lsn {
194 2682270 : let mut lsn_buf = [0u8; 8];
195 2682270 : lsn_buf.copy_from_slice(&buf[KEY_SIZE..]);
196 2682270 : Lsn(u64::from_be_bytes(lsn_buf))
197 2682270 : }
198 : }
199 :
200 : /// This is used only from `pagectl`. Within pageserver, all layers are
201 : /// [`crate::tenant::storage_layer::Layer`], which can hold a [`DeltaLayerInner`].
202 : pub struct DeltaLayer {
203 : path: Utf8PathBuf,
204 : pub desc: PersistentLayerDesc,
205 : inner: OnceCell<Arc<DeltaLayerInner>>,
206 : }
207 :
208 : impl std::fmt::Debug for DeltaLayer {
209 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
210 : use super::RangeDisplayDebug;
211 :
212 0 : f.debug_struct("DeltaLayer")
213 0 : .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
214 0 : .field("lsn_range", &self.desc.lsn_range)
215 0 : .field("file_size", &self.desc.file_size)
216 0 : .field("inner", &self.inner)
217 0 : .finish()
218 0 : }
219 : }
220 :
221 : /// `DeltaLayerInner` is the in-memory data structure associated with an on-disk delta
222 : /// file.
223 : pub struct DeltaLayerInner {
224 : // values copied from summary
225 : index_start_blk: u32,
226 : index_root_blk: u32,
227 :
228 : file: VirtualFile,
229 : file_id: FileId,
230 :
231 : layer_key_range: Range<Key>,
232 : layer_lsn_range: Range<Lsn>,
233 :
234 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
235 : }
236 :
237 : impl DeltaLayerInner {
238 0 : pub(crate) fn layer_dbg_info(&self) -> String {
239 0 : format!(
240 0 : "delta {}..{} {}..{}",
241 0 : self.key_range().start,
242 0 : self.key_range().end,
243 0 : self.lsn_range().start,
244 0 : self.lsn_range().end
245 0 : )
246 0 : }
247 : }
248 :
249 : impl std::fmt::Debug for DeltaLayerInner {
250 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
251 0 : f.debug_struct("DeltaLayerInner")
252 0 : .field("index_start_blk", &self.index_start_blk)
253 0 : .field("index_root_blk", &self.index_root_blk)
254 0 : .finish()
255 0 : }
256 : }
257 :
258 : /// Boilerplate to implement the Layer trait; always use layer_desc for persistent layers.
259 : impl std::fmt::Display for DeltaLayer {
260 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
261 0 : write!(f, "{}", self.layer_desc().short_id())
262 0 : }
263 : }
264 :
265 : impl AsLayerDesc for DeltaLayer {
266 0 : fn layer_desc(&self) -> &PersistentLayerDesc {
267 0 : &self.desc
268 0 : }
269 : }
270 :
271 : impl DeltaLayer {
272 0 : pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
273 0 : self.desc.dump();
274 0 :
275 0 : if !verbose {
276 0 : return Ok(());
277 0 : }
278 :
279 0 : let inner = self.load(ctx).await?;
280 :
281 0 : inner.dump(ctx).await
282 0 : }
283 :
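/// Build a temporary path for a delta layer that is still being written.
/// The end key is not known yet, so the name uses an `XXX` placeholder plus
/// a random suffix and `TEMP_FILE_SUFFIX`; the file is renamed to its final
/// name once writing completes.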
284 1416 : fn temp_path_for(
285 1416 : conf: &PageServerConf,
286 1416 : tenant_shard_id: &TenantShardId,
287 1416 : timeline_id: &TimelineId,
288 1416 : key_start: Key,
289 1416 : lsn_range: &Range<Lsn>,
290 1416 : ) -> Utf8PathBuf {
291 1416 : let rand_string: String = rand::thread_rng()
292 1416 : .sample_iter(&Alphanumeric)
293 1416 : .take(8)
294 1416 : .map(char::from)
295 1416 : .collect();
296 1416 :
297 1416 : conf.timeline_path(tenant_shard_id, timeline_id)
298 1416 : .join(format!(
299 1416 : "{}-XXX__{:016X}-{:016X}.{}.{}",
300 1416 : key_start,
301 1416 : u64::from(lsn_range.start),
302 1416 : u64::from(lsn_range.end),
303 1416 : rand_string,
304 1416 : TEMP_FILE_SUFFIX,
305 1416 : ))
306 1416 : }
307 :
308 : ///
309 : /// Open the underlying file and read the metadata into memory, if it's
310 : /// not loaded already.
311 : ///
312 0 : async fn load(&self, ctx: &RequestContext) -> Result<&Arc<DeltaLayerInner>> {
313 0 : // Quick exit if already loaded
314 0 : self.inner
315 0 : .get_or_try_init(|| self.load_inner(ctx))
316 0 : .await
317 0 : .with_context(|| format!("Failed to load delta layer {}", self.path()))
318 0 : }
319 :
320 0 : async fn load_inner(&self, ctx: &RequestContext) -> anyhow::Result<Arc<DeltaLayerInner>> {
321 0 : let path = self.path();
322 :
323 0 : let loaded = DeltaLayerInner::load(&path, None, None, ctx).await?;
324 :
325 : // not production code
326 0 : let actual_layer_name = LayerName::from_str(path.file_name().unwrap()).unwrap();
327 0 : let expected_layer_name = self.layer_desc().layer_name();
328 0 :
329 0 : if actual_layer_name != expected_layer_name {
330 0 : println!("warning: filename does not match what is expected from in-file summary");
331 0 : println!("actual: {:?}", actual_layer_name.to_string());
332 0 : println!("expected: {:?}", expected_layer_name.to_string());
333 0 : }
334 :
335 0 : Ok(Arc::new(loaded))
336 0 : }
337 :
338 : /// Create a DeltaLayer struct representing an existing file on disk.
339 : ///
340 : /// This variant is only used for debugging purposes, by the 'pagectl' binary.
341 0 : pub fn new_for_path(path: &Utf8Path, file: File) -> Result<Self> {
342 0 : let mut summary_buf = vec![0; PAGE_SZ];
343 0 : file.read_exact_at(&mut summary_buf, 0)?;
344 0 : let summary = Summary::des_prefix(&summary_buf)?;
345 :
346 0 : let metadata = file
347 0 : .metadata()
348 0 : .context("get file metadata to determine size")?;
349 :
350 : // This function is never used for constructing layers in a running pageserver,
351 : // so it does not need an accurate TenantShardId.
352 0 : let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
353 0 :
354 0 : Ok(DeltaLayer {
355 0 : path: path.to_path_buf(),
356 0 : desc: PersistentLayerDesc::new_delta(
357 0 : tenant_shard_id,
358 0 : summary.timeline_id,
359 0 : summary.key_range,
360 0 : summary.lsn_range,
361 0 : metadata.len(),
362 0 : ),
363 0 : inner: OnceCell::new(),
364 0 : })
365 0 : }
366 :
367 : /// Path to the layer file in pageserver workdir.
368 0 : fn path(&self) -> Utf8PathBuf {
369 0 : self.path.clone()
370 0 : }
371 : }
372 :
373 : /// A builder object for constructing a new delta layer.
374 : ///
375 : /// Usage:
376 : ///
377 : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
378 : ///
379 : /// 2. Write the contents by calling `put_value` for every page
380 : /// version to store in the layer.
381 : ///
382 : /// 3. Call `finish`.
383 : ///
384 : struct DeltaLayerWriterInner {
385 : pub path: Utf8PathBuf,
386 : timeline_id: TimelineId,
387 : tenant_shard_id: TenantShardId,
388 :
389 : key_start: Key,
390 : lsn_range: Range<Lsn>,
391 :
392 : tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>,
393 :
394 : blob_writer: BlobWriter<true>,
395 :
396 : // Number of (key, LSN) entries in the layer.
397 : num_keys: usize,
398 : }
399 :
400 : impl DeltaLayerWriterInner {
401 : ///
402 : /// Start building a new delta layer.
403 : ///
404 1416 : async fn new(
405 1416 : conf: &'static PageServerConf,
406 1416 : timeline_id: TimelineId,
407 1416 : tenant_shard_id: TenantShardId,
408 1416 : key_start: Key,
409 1416 : lsn_range: Range<Lsn>,
410 1416 : ctx: &RequestContext,
411 1416 : ) -> anyhow::Result<Self> {
412 1416 : // Create the file initially with a temporary filename. We don't know
413 1416 : // the end key yet, so we cannot form the final filename yet. We will
414 1416 : // rename it when we're done.
415 1416 : //
416 1416 : // Note: This overwrites any existing file. There shouldn't be any.
417 1416 : // FIXME: throw an error instead?
418 1416 : let path =
419 1416 : DeltaLayer::temp_path_for(conf, &tenant_shard_id, &timeline_id, key_start, &lsn_range);
420 :
421 1416 : let mut file = VirtualFile::create(&path, ctx).await?;
422 : // make room for the header block
423 1416 : file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
424 1416 : let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
425 1416 :
426 1416 : // Initialize the b-tree index builder
427 1416 : let block_buf = BlockBuf::new();
428 1416 : let tree_builder = DiskBtreeBuilder::new(block_buf);
429 1416 :
430 1416 : Ok(Self {
431 1416 : path,
432 1416 : timeline_id,
433 1416 : tenant_shard_id,
434 1416 : key_start,
435 1416 : lsn_range,
436 1416 : tree: tree_builder,
437 1416 : blob_writer,
438 1416 : num_keys: 0,
439 1416 : })
440 1416 : }
441 :
442 : ///
443 : /// Append a key-value pair to the file.
444 : ///
445 : /// The values must be appended in key, lsn order.
446 : ///
447 2084558 : async fn put_value(
448 2084558 : &mut self,
449 2084558 : key: Key,
450 2084558 : lsn: Lsn,
451 2084558 : val: Value,
452 2084558 : ctx: &RequestContext,
453 2084558 : ) -> anyhow::Result<()> {
454 2084558 : let (_, res) = self
455 2084558 : .put_value_bytes(
456 2084558 : key,
457 2084558 : lsn,
458 2084558 : Value::ser(&val)?.slice_len(),
459 2084558 : val.will_init(),
460 2084558 : ctx,
461 : )
462 1982 : .await;
463 2084558 : res
464 2084558 : }
465 :
466 6471218 : async fn put_value_bytes<Buf>(
467 6471218 : &mut self,
468 6471218 : key: Key,
469 6471218 : lsn: Lsn,
470 6471218 : val: FullSlice<Buf>,
471 6471218 : will_init: bool,
472 6471218 : ctx: &RequestContext,
473 6471218 : ) -> (FullSlice<Buf>, anyhow::Result<()>)
474 6471218 : where
475 6471218 : Buf: IoBuf + Send,
476 6471218 : {
477 6471218 : assert!(
478 6471218 : self.lsn_range.start <= lsn,
479 0 : "lsn_start={}, lsn={}",
480 : self.lsn_range.start,
481 : lsn
482 : );
483 : // We don't want to use compression in delta layer creation
484 6471218 : let compression = ImageCompressionAlgorithm::Disabled;
485 6471218 : let (val, res) = self
486 6471218 : .blob_writer
487 6471218 : .write_blob_maybe_compressed(val, ctx, compression)
488 4931 : .await;
489 6471218 : let off = match res {
490 6471218 : Ok((off, _)) => off,
491 0 : Err(e) => return (val, Err(anyhow::anyhow!(e))),
492 : };
493 :
494 6471218 : let blob_ref = BlobRef::new(off, will_init);
495 6471218 :
496 6471218 : let delta_key = DeltaKey::from_key_lsn(&key, lsn);
497 6471218 : let res = self.tree.append(&delta_key.0, blob_ref.0);
498 6471218 :
499 6471218 : self.num_keys += 1;
500 6471218 :
501 6471218 : (val, res.map_err(|e| anyhow::anyhow!(e)))
502 6471218 : }
503 :
504 2023972 : fn size(&self) -> u64 {
505 2023972 : self.blob_writer.size() + self.tree.borrow_writer().size()
506 2023972 : }
507 :
508 : ///
509 : /// Finish writing the delta layer.
510 : ///
511 1396 : async fn finish(
512 1396 : self,
513 1396 : key_end: Key,
514 1396 : ctx: &RequestContext,
515 1396 : ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
516 1396 : let temp_path = self.path.clone();
517 9646 : let result = self.finish0(key_end, ctx).await;
518 1396 : if let Err(ref e) = result {
519 0 : tracing::info!(%temp_path, "cleaning up temporary file after error during writing: {e}");
520 0 : if let Err(e) = std::fs::remove_file(&temp_path) {
521 0 : tracing::warn!(error=%e, %temp_path, "error cleaning up temporary layer file after error during writing");
522 0 : }
523 1396 : }
524 1396 : result
525 1396 : }
526 :
527 1396 : async fn finish0(
528 1396 : self,
529 1396 : key_end: Key,
530 1396 : ctx: &RequestContext,
531 1396 : ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
532 1396 : let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
533 :
534 1396 : let mut file = self.blob_writer.into_inner(ctx).await?;
535 :
536 : // Write out the index
537 1396 : let (index_root_blk, block_buf) = self.tree.finish()?;
538 1396 : file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
539 0 : .await?;
540 15006 : for buf in block_buf.blocks {
541 13610 : let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
542 13610 : res?;
543 : }
544 1396 : assert!(self.lsn_range.start < self.lsn_range.end);
545 : // Fill in the summary on blk 0
546 1396 : let summary = Summary {
547 1396 : magic: DELTA_FILE_MAGIC,
548 1396 : format_version: STORAGE_FORMAT_VERSION,
549 1396 : tenant_id: self.tenant_shard_id.tenant_id,
550 1396 : timeline_id: self.timeline_id,
551 1396 : key_range: self.key_start..key_end,
552 1396 : lsn_range: self.lsn_range.clone(),
553 1396 : index_start_blk,
554 1396 : index_root_blk,
555 1396 : };
556 1396 :
557 1396 : let mut buf = Vec::with_capacity(PAGE_SZ);
558 1396 : // TODO: could use smallvec here but it's a pain with Slice<T>
559 1396 : Summary::ser_into(&summary, &mut buf)?;
560 1396 : file.seek(SeekFrom::Start(0)).await?;
561 1396 : let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
562 1396 : res?;
563 :
564 1396 : let metadata = file
565 1396 : .metadata()
566 701 : .await
567 1396 : .context("get file metadata to determine size")?;
568 :
569 : // 5GB limit for objects without multipart upload (which we don't want to use)
570 : // Make it a little bit below to account for differing GB units
571 : // https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html
572 1396 : ensure!(
573 1396 : metadata.len() <= S3_UPLOAD_LIMIT,
574 0 : "Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
575 0 : file.path(),
576 0 : metadata.len()
577 : );
578 :
579 : // Note: Because we opened the file in write-only mode, we cannot
580 : // reuse the same VirtualFile for reading later. That's why we don't
581 : // set inner.file here. The first read will have to re-open it.
582 :
583 1396 : let desc = PersistentLayerDesc::new_delta(
584 1396 : self.tenant_shard_id,
585 1396 : self.timeline_id,
586 1396 : self.key_start..key_end,
587 1396 : self.lsn_range.clone(),
588 1396 : metadata.len(),
589 1396 : );
590 1396 :
591 1396 : // fsync the file
592 1396 : file.sync_all()
593 701 : .await
594 1396 : .maybe_fatal_err("delta_layer sync_all")?;
595 :
596 1396 : trace!("created delta layer {}", self.path);
597 :
598 1396 : Ok((desc, self.path))
599 1396 : }
600 : }
601 :
602 : /// A builder object for constructing a new delta layer.
603 : ///
604 : /// Usage:
605 : ///
606 : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
607 : ///
608 : /// 2. Write the contents by calling `put_value` for every page
609 : /// version to store in the layer.
610 : ///
611 : /// 3. Call `finish`.
612 : ///
613 : /// # Note
614 : ///
615 : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
616 : /// possible for the writer to be dropped before `finish` is actually called,
617 : /// which could leave odd temporary files in the directory and exhaust the
618 : /// file system. This structure wraps `DeltaLayerWriterInner` and also contains
619 : /// a `Drop` implementation that cleans up the temporary file on failure. It's
620 : /// not possible to do this directly in `DeltaLayerWriterInner` since `finish`
621 : /// moves out some fields, making it impossible to implement `Drop`.
622 : ///
623 : #[must_use]
624 : pub struct DeltaLayerWriter {
625 : inner: Option<DeltaLayerWriterInner>,
626 : }
627 :
628 : impl DeltaLayerWriter {
629 : ///
630 : /// Start building a new delta layer.
631 : ///
632 1416 : pub async fn new(
633 1416 : conf: &'static PageServerConf,
634 1416 : timeline_id: TimelineId,
635 1416 : tenant_shard_id: TenantShardId,
636 1416 : key_start: Key,
637 1416 : lsn_range: Range<Lsn>,
638 1416 : ctx: &RequestContext,
639 1416 : ) -> anyhow::Result<Self> {
640 1416 : Ok(Self {
641 1416 : inner: Some(
642 1416 : DeltaLayerWriterInner::new(
643 1416 : conf,
644 1416 : timeline_id,
645 1416 : tenant_shard_id,
646 1416 : key_start,
647 1416 : lsn_range,
648 1416 : ctx,
649 1416 : )
650 723 : .await?,
651 : ),
652 : })
653 1416 : }
654 :
655 : ///
656 : /// Append a key-value pair to the file.
657 : ///
658 : /// The values must be appended in key, lsn order.
659 : ///
660 2084558 : pub async fn put_value(
661 2084558 : &mut self,
662 2084558 : key: Key,
663 2084558 : lsn: Lsn,
664 2084558 : val: Value,
665 2084558 : ctx: &RequestContext,
666 2084558 : ) -> anyhow::Result<()> {
667 2084558 : self.inner
668 2084558 : .as_mut()
669 2084558 : .unwrap()
670 2084558 : .put_value(key, lsn, val, ctx)
671 1982 : .await
672 2084558 : }
673 :
674 4386660 : pub async fn put_value_bytes<Buf>(
675 4386660 : &mut self,
676 4386660 : key: Key,
677 4386660 : lsn: Lsn,
678 4386660 : val: FullSlice<Buf>,
679 4386660 : will_init: bool,
680 4386660 : ctx: &RequestContext,
681 4386660 : ) -> (FullSlice<Buf>, anyhow::Result<()>)
682 4386660 : where
683 4386660 : Buf: IoBuf + Send,
684 4386660 : {
685 4386660 : self.inner
686 4386660 : .as_mut()
687 4386660 : .unwrap()
688 4386660 : .put_value_bytes(key, lsn, val, will_init, ctx)
689 2949 : .await
690 4386660 : }
691 :
692 2023972 : pub fn size(&self) -> u64 {
693 2023972 : self.inner.as_ref().unwrap().size()
694 2023972 : }
695 :
696 : ///
697 : /// Finish writing the delta layer.
698 : ///
699 1396 : pub(crate) async fn finish(
700 1396 : mut self,
701 1396 : key_end: Key,
702 1396 : ctx: &RequestContext,
703 1396 : ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
704 9646 : self.inner.take().unwrap().finish(key_end, ctx).await
705 1396 : }
706 :
707 12162 : pub(crate) fn num_keys(&self) -> usize {
708 12162 : self.inner.as_ref().unwrap().num_keys
709 12162 : }
710 :
711 15084 : pub(crate) fn estimated_size(&self) -> u64 {
712 15084 : let inner = self.inner.as_ref().unwrap();
713 15084 : inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64
714 15084 : }
715 : }
716 :
717 : impl Drop for DeltaLayerWriter {
718 1416 : fn drop(&mut self) {
719 1416 : if let Some(inner) = self.inner.take() {
720 20 : // We want to remove the virtual file here, so it's fine not to
721 20 : // have completely flushed unwritten data.
722 20 : let vfile = inner.blob_writer.into_inner_no_flush();
723 20 : vfile.remove();
724 1396 : }
725 1416 : }
726 : }
727 :
728 0 : #[derive(thiserror::Error, Debug)]
729 : pub enum RewriteSummaryError {
730 : #[error("magic mismatch")]
731 : MagicMismatch,
732 : #[error(transparent)]
733 : Other(#[from] anyhow::Error),
734 : }
735 :
736 : impl From<std::io::Error> for RewriteSummaryError {
737 0 : fn from(e: std::io::Error) -> Self {
738 0 : Self::Other(anyhow::anyhow!(e))
739 0 : }
740 : }
741 :
742 : impl DeltaLayer {
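/// Rewrite the summary block of a layer file in place, mapping the current
/// `Summary` through the given closure. A hedged sketch of a caller, e.g.
/// re-stamping a copied layer file with a different timeline id
/// (`new_timeline_id` is a placeholder):
///
/// ```ignore
/// DeltaLayer::rewrite_summary(&path, |summary| Summary {
///     timeline_id: new_timeline_id,
///     ..summary
/// }, ctx).await?;
/// ```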
743 0 : pub async fn rewrite_summary<F>(
744 0 : path: &Utf8Path,
745 0 : rewrite: F,
746 0 : ctx: &RequestContext,
747 0 : ) -> Result<(), RewriteSummaryError>
748 0 : where
749 0 : F: Fn(Summary) -> Summary,
750 0 : {
751 0 : let mut file = VirtualFile::open_with_options(
752 0 : path,
753 0 : virtual_file::OpenOptions::new().read(true).write(true),
754 0 : ctx,
755 0 : )
756 0 : .await
757 0 : .with_context(|| format!("Failed to open file '{}'", path))?;
758 0 : let file_id = page_cache::next_file_id();
759 0 : let block_reader = FileBlockReader::new(&file, file_id);
760 0 : let summary_blk = block_reader.read_blk(0, ctx).await?;
761 0 : let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
762 0 : if actual_summary.magic != DELTA_FILE_MAGIC {
763 0 : return Err(RewriteSummaryError::MagicMismatch);
764 0 : }
765 0 :
766 0 : let new_summary = rewrite(actual_summary);
767 0 :
768 0 : let mut buf = Vec::with_capacity(PAGE_SZ);
769 0 : // TODO: could use smallvec here, but it's a pain with Slice<T>
770 0 : Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
771 0 : file.seek(SeekFrom::Start(0)).await?;
772 0 : let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
773 0 : res?;
774 0 : Ok(())
775 0 : }
776 : }
777 :
778 : impl DeltaLayerInner {
779 478 : pub(crate) fn key_range(&self) -> &Range<Key> {
780 478 : &self.layer_key_range
781 478 : }
782 :
783 478 : pub(crate) fn lsn_range(&self) -> &Range<Lsn> {
784 478 : &self.layer_lsn_range
785 478 : }
786 :
787 1046 : pub(super) async fn load(
788 1046 : path: &Utf8Path,
789 1046 : summary: Option<Summary>,
790 1046 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
791 1046 : ctx: &RequestContext,
792 1046 : ) -> anyhow::Result<Self> {
793 1046 : let file = VirtualFile::open_v2(path, ctx)
794 523 : .await
795 1046 : .context("open layer file")?;
796 :
797 1046 : let file_id = page_cache::next_file_id();
798 1046 :
799 1046 : let block_reader = FileBlockReader::new(&file, file_id);
800 :
801 1046 : let summary_blk = block_reader
802 1046 : .read_blk(0, ctx)
803 526 : .await
804 1046 : .context("read first block")?;
805 :
806 : // TODO: this should be an assertion instead; see ImageLayerInner::load
807 1046 : let actual_summary =
808 1046 : Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
809 :
810 1046 : if let Some(mut expected_summary) = summary {
811 : // production code path
812 1046 : expected_summary.index_start_blk = actual_summary.index_start_blk;
813 1046 : expected_summary.index_root_blk = actual_summary.index_root_blk;
814 1046 : // mask out the timeline_id, but still require the layers to be from the same tenant
815 1046 : expected_summary.timeline_id = actual_summary.timeline_id;
816 1046 :
817 1046 : if actual_summary != expected_summary {
818 0 : bail!(
819 0 : "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
820 0 : actual_summary,
821 0 : expected_summary
822 0 : );
823 1046 : }
824 0 : }
825 :
826 1046 : Ok(DeltaLayerInner {
827 1046 : file,
828 1046 : file_id,
829 1046 : index_start_blk: actual_summary.index_start_blk,
830 1046 : index_root_blk: actual_summary.index_root_blk,
831 1046 : max_vectored_read_bytes,
832 1046 : layer_key_range: actual_summary.key_range,
833 1046 : layer_lsn_range: actual_summary.lsn_range,
834 1046 : })
835 1046 : }
836 :
837 : // Look up the keys in the provided keyspace and update
838 : // the reconstruct state with whatever is found.
839 : //
840 : // If the key is cached, go no further than the cached Lsn.
841 : //
842 : // Currently, the index is visited for each range, but this
843 : // can be further optimised to visit the index only once.
844 204092 : pub(super) async fn get_values_reconstruct_data(
845 204092 : &self,
846 204092 : keyspace: KeySpace,
847 204092 : lsn_range: Range<Lsn>,
848 204092 : reconstruct_state: &mut ValuesReconstructState,
849 204092 : ctx: &RequestContext,
850 204092 : ) -> Result<(), GetVectoredError> {
851 204092 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
852 204092 : let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
853 204092 : self.index_start_blk,
854 204092 : self.index_root_blk,
855 204092 : block_reader,
856 204092 : );
857 204092 :
858 204092 : let planner = VectoredReadPlanner::new(
859 204092 : self.max_vectored_read_bytes
860 204092 : .expect("Layer is loaded with max vectored bytes config")
861 204092 : .0
862 204092 : .into(),
863 204092 : );
864 204092 :
865 204092 : let data_end_offset = self.index_start_offset();
866 :
867 204092 : let reads = Self::plan_reads(
868 204092 : &keyspace,
869 204092 : lsn_range.clone(),
870 204092 : data_end_offset,
871 204092 : index_reader,
872 204092 : planner,
873 204092 : reconstruct_state,
874 204092 : ctx,
875 204092 : )
876 20457 : .await
877 204092 : .map_err(GetVectoredError::Other)?;
878 :
879 204092 : self.do_reads_and_update_state(reads, reconstruct_state, ctx)
880 63974 : .await;
881 :
882 204092 : reconstruct_state.on_lsn_advanced(&keyspace, lsn_range.start);
883 204092 :
884 204092 : Ok(())
885 204092 : }
886 :
887 204294 : async fn plan_reads<Reader>(
888 204294 : keyspace: &KeySpace,
889 204294 : lsn_range: Range<Lsn>,
890 204294 : data_end_offset: u64,
891 204294 : index_reader: DiskBtreeReader<Reader, DELTA_KEY_SIZE>,
892 204294 : mut planner: VectoredReadPlanner,
893 204294 : reconstruct_state: &mut ValuesReconstructState,
894 204294 : ctx: &RequestContext,
895 204294 : ) -> anyhow::Result<Vec<VectoredRead>>
896 204294 : where
897 204294 : Reader: BlockReader + Clone,
898 204294 : {
899 204294 : let ctx = RequestContextBuilder::extend(ctx)
900 204294 : .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
901 204294 : .build();
902 :
903 246497 : for range in keyspace.ranges.iter() {
904 246497 : let mut range_end_handled = false;
905 246497 :
906 246497 : let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start);
907 246497 : let index_stream = index_reader.clone().into_stream(&start_key.0, &ctx);
908 246497 : let mut index_stream = std::pin::pin!(index_stream);
909 :
910 591999 : while let Some(index_entry) = index_stream.next().await {
911 583312 : let (raw_key, value) = index_entry?;
912 583312 : let key = Key::from_slice(&raw_key[..KEY_SIZE]);
913 583312 : let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
914 583312 : let blob_ref = BlobRef(value);
915 583312 :
916 583312 : // Lsns are not monotonically increasing across keys, so we don't assert on them.
917 583312 : assert!(key >= range.start);
918 :
919 583312 : let outside_lsn_range = !lsn_range.contains(&lsn);
920 583312 : let below_cached_lsn = reconstruct_state.get_cached_lsn(&key) >= Some(lsn);
921 :
922 583312 : let flag = {
923 583312 : if outside_lsn_range || below_cached_lsn {
924 72771 : BlobFlag::Ignore
925 510541 : } else if blob_ref.will_init() {
926 452251 : BlobFlag::ReplaceAll
927 : } else {
928 : // Usual path: add blob to the read
929 58290 : BlobFlag::None
930 : }
931 : };
932 :
933 583312 : if key >= range.end || (key.next() == range.end && lsn >= lsn_range.end) {
934 237810 : planner.handle_range_end(blob_ref.pos());
935 237810 : range_end_handled = true;
936 237810 : break;
937 345502 : } else {
938 345502 : planner.handle(key, lsn, blob_ref.pos(), flag);
939 345502 : }
940 : }
941 :
942 246497 : if !range_end_handled {
943 8687 : tracing::debug!("Handling range end fallback at {}", data_end_offset);
944 8687 : planner.handle_range_end(data_end_offset);
945 237810 : }
946 : }
947 :
948 204294 : Ok(planner.finish())
949 204294 : }
950 :
951 204292 : fn get_min_read_buffer_size(
952 204292 : planned_reads: &[VectoredRead],
953 204292 : read_size_soft_max: usize,
954 204292 : ) -> usize {
955 204292 : let Some(largest_read) = planned_reads.iter().max_by_key(|read| read.size()) else {
956 82074 : return read_size_soft_max;
957 : };
958 :
959 122218 : let largest_read_size = largest_read.size();
960 122218 : if largest_read_size > read_size_soft_max {
961 : // If the read is oversized, it should only contain one key.
962 200 : let offenders = largest_read
963 200 : .blobs_at
964 200 : .as_slice()
965 200 : .iter()
966 200 : .filter_map(|(_, blob_meta)| {
967 200 : if blob_meta.key.is_rel_dir_key() || blob_meta.key == DBDIR_KEY {
968 : // The size of values for these keys is unbounded and can
969 : // grow very large in pathological cases.
970 0 : None
971 : } else {
972 200 : Some(format!("{}@{}", blob_meta.key, blob_meta.lsn))
973 : }
974 200 : })
975 200 : .join(", ");
976 200 :
977 200 : if !offenders.is_empty() {
978 200 : tracing::warn!(
979 0 : "Oversized vectored read ({} > {}) for keys {}",
980 : largest_read_size,
981 : read_size_soft_max,
982 : offenders
983 : );
984 0 : }
985 122018 : }
986 :
987 122218 : largest_read_size
988 204292 : }
989 :
990 204092 : async fn do_reads_and_update_state(
991 204092 : &self,
992 204092 : reads: Vec<VectoredRead>,
993 204092 : reconstruct_state: &mut ValuesReconstructState,
994 204092 : ctx: &RequestContext,
995 204092 : ) {
996 204092 : let vectored_blob_reader = VectoredBlobReader::new(&self.file);
997 204092 : let mut ignore_key_with_err = None;
998 204092 :
999 204092 : let max_vectored_read_bytes = self
1000 204092 : .max_vectored_read_bytes
1001 204092 : .expect("Layer is loaded with max vectored bytes config")
1002 204092 : .0
1003 204092 : .into();
1004 204092 : let buf_size = Self::get_min_read_buffer_size(&reads, max_vectored_read_bytes);
1005 204092 : let mut buf = Some(IoBufferMut::with_capacity(buf_size));
1006 :
1007 : // Note that reads are processed in reverse order (from highest key+lsn).
1008 : // This is the order that `ReconstructState` requires such that it can
1009 : // track when a key is done.
1010 204092 : for read in reads.into_iter().rev() {
1011 125714 : let res = vectored_blob_reader
1012 125714 : .read_blobs(&read, buf.take().expect("Should have a buffer"), ctx)
1013 63974 : .await;
1014 :
1015 125714 : let blobs_buf = match res {
1016 125714 : Ok(blobs_buf) => blobs_buf,
1017 0 : Err(err) => {
1018 0 : let kind = err.kind();
1019 0 : for (_, blob_meta) in read.blobs_at.as_slice() {
1020 0 : reconstruct_state.on_key_error(
1021 0 : blob_meta.key,
1022 0 : PageReconstructError::Other(anyhow!(
1023 0 : "Failed to read blobs from virtual file {}: {}",
1024 0 : self.file.path(),
1025 0 : kind
1026 0 : )),
1027 0 : );
1028 0 : }
1029 :
1030 : // We have "lost" the buffer since the lower level IO api
1031 : // doesn't return the buffer on error. Allocate a new one.
1032 0 : buf = Some(IoBufferMut::with_capacity(buf_size));
1033 0 :
1034 0 : continue;
1035 : }
1036 : };
1037 125714 : let view = BufView::new_slice(&blobs_buf.buf);
1038 150138 : for meta in blobs_buf.blobs.iter().rev() {
1039 150138 : if Some(meta.meta.key) == ignore_key_with_err {
1040 0 : continue;
1041 150138 : }
1042 150138 : let blob_read = meta.read(&view).await;
1043 150138 : let blob_read = match blob_read {
1044 150138 : Ok(buf) => buf,
1045 0 : Err(e) => {
1046 0 : reconstruct_state.on_key_error(
1047 0 : meta.meta.key,
1048 0 : PageReconstructError::Other(anyhow!(e).context(format!(
1049 0 : "Failed to decompress blob from virtual file {}",
1050 0 : self.file.path(),
1051 0 : ))),
1052 0 : );
1053 0 :
1054 0 : ignore_key_with_err = Some(meta.meta.key);
1055 0 : continue;
1056 : }
1057 : };
1058 :
1059 150138 : let value = Value::des(&blob_read);
1060 :
1061 150138 : let value = match value {
1062 150138 : Ok(v) => v,
1063 0 : Err(e) => {
1064 0 : reconstruct_state.on_key_error(
1065 0 : meta.meta.key,
1066 0 : PageReconstructError::Other(anyhow!(e).context(format!(
1067 0 : "Failed to deserialize blob from virtual file {}",
1068 0 : self.file.path(),
1069 0 : ))),
1070 0 : );
1071 0 :
1072 0 : ignore_key_with_err = Some(meta.meta.key);
1073 0 : continue;
1074 : }
1075 : };
1076 :
1077 : // Invariant: once a key reaches [`ValueReconstructSituation::Complete`]
1078 : // state, no further updates shall be made to it. The call below will
1079 : // panic if the invariant is violated.
1080 150138 : reconstruct_state.update_key(&meta.meta.key, meta.meta.lsn, value);
1081 : }
1082 :
1083 125714 : buf = Some(blobs_buf.buf);
1084 : }
1085 204092 : }
1086 :
1087 406 : pub(super) async fn load_keys<'a>(
1088 406 : &'a self,
1089 406 : ctx: &RequestContext,
1090 406 : ) -> Result<Vec<DeltaEntry<'a>>> {
1091 406 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
1092 406 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1093 406 : self.index_start_blk,
1094 406 : self.index_root_blk,
1095 406 : block_reader,
1096 406 : );
1097 406 :
1098 406 : let mut all_keys: Vec<DeltaEntry<'_>> = Vec::new();
1099 406 :
1100 406 : tree_reader
1101 406 : .visit(
1102 406 : &[0u8; DELTA_KEY_SIZE],
1103 406 : VisitDirection::Forwards,
1104 2064046 : |key, value| {
1105 2064046 : let delta_key = DeltaKey::from_slice(key);
1106 2064046 : let val_ref = ValueRef {
1107 2064046 : blob_ref: BlobRef(value),
1108 2064046 : layer: self,
1109 2064046 : };
1110 2064046 : let pos = BlobRef(value).pos();
1111 2064046 : if let Some(last) = all_keys.last_mut() {
1112 2063640 : // subtract offset of the current and last entries to get the size
1113 2063640 : // of the value associated with this (key, lsn) tuple
1114 2063640 : let first_pos = last.size;
1115 2063640 : last.size = pos - first_pos;
1116 2063640 : }
1117 2064046 : let entry = DeltaEntry {
1118 2064046 : key: delta_key.key(),
1119 2064046 : lsn: delta_key.lsn(),
1120 2064046 : size: pos,
1121 2064046 : val: val_ref,
1122 2064046 : };
1123 2064046 : all_keys.push(entry);
1124 2064046 : true
1125 2064046 : },
1126 406 : &RequestContextBuilder::extend(ctx)
1127 406 : .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
1128 406 : .build(),
1129 406 : )
1130 2124 : .await?;
1131 406 : if let Some(last) = all_keys.last_mut() {
1132 406 : // The last key occupies all the space up to the end of the value
1133 406 : // storage, which corresponds to the beginning of the index.
1134 406 : last.size = self.index_start_offset() - last.size;
1135 406 : }
1136 406 : Ok(all_keys)
1137 406 : }
1138 :
1139 : /// Using the given writer, write out a version containing only the entries with LSNs earlier than `until`.
1140 : ///
1141 : /// Returns the number of key-value records pushed to the writer.
1142 10 : pub(super) async fn copy_prefix(
1143 10 : &self,
1144 10 : writer: &mut DeltaLayerWriter,
1145 10 : until: Lsn,
1146 10 : ctx: &RequestContext,
1147 10 : ) -> anyhow::Result<usize> {
1148 : use crate::tenant::vectored_blob_io::{
1149 : BlobMeta, ChunkedVectoredReadBuilder, VectoredReadExtended,
1150 : };
1151 : use futures::stream::TryStreamExt;
1152 :
1153 : #[derive(Debug)]
1154 : enum Item {
1155 : Actual(Key, Lsn, BlobRef),
1156 : Sentinel,
1157 : }
1158 :
1159 : impl From<Item> for Option<(Key, Lsn, BlobRef)> {
1160 70 : fn from(value: Item) -> Self {
1161 70 : match value {
1162 60 : Item::Actual(key, lsn, blob) => Some((key, lsn, blob)),
1163 10 : Item::Sentinel => None,
1164 : }
1165 70 : }
1166 : }
1167 :
1168 : impl Item {
1169 70 : fn offset(&self) -> Option<BlobRef> {
1170 70 : match self {
1171 60 : Item::Actual(_, _, blob) => Some(*blob),
1172 10 : Item::Sentinel => None,
1173 : }
1174 70 : }
1175 :
1176 70 : fn is_last(&self) -> bool {
1177 70 : matches!(self, Item::Sentinel)
1178 70 : }
1179 : }
1180 :
1181 10 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
1182 10 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1183 10 : self.index_start_blk,
1184 10 : self.index_root_blk,
1185 10 : block_reader,
1186 10 : );
1187 10 :
1188 10 : let stream = self.stream_index_forwards(tree_reader, &[0u8; DELTA_KEY_SIZE], ctx);
1189 60 : let stream = stream.map_ok(|(key, lsn, pos)| Item::Actual(key, lsn, pos));
1190 10 : // put in a sentinel value to get the end offset of the last item, so we don't
1191 10 : // have to repeat the whole read logic
1192 10 : let stream = stream.chain(futures::stream::once(futures::future::ready(Ok(
1193 10 : Item::Sentinel,
1194 10 : ))));
1195 10 : let mut stream = std::pin::pin!(stream);
1196 10 :
1197 10 : let mut prev: Option<(Key, Lsn, BlobRef)> = None;
1198 10 :
1199 10 : let mut read_builder: Option<ChunkedVectoredReadBuilder> = None;
1200 10 :
1201 10 : let max_read_size = self
1202 10 : .max_vectored_read_bytes
1203 10 : .map(|x| x.0.get())
1204 10 : .unwrap_or(8192);
1205 10 :
1206 10 : let mut buffer = Some(IoBufferMut::with_capacity(max_read_size));
1207 10 :
1208 10 : // FIXME: buffering of DeltaLayerWriter
1209 10 : let mut per_blob_copy = Vec::new();
1210 10 :
1211 10 : let mut records = 0;
1212 :
1213 80 : while let Some(item) = stream.try_next().await? {
1214 70 : tracing::debug!(?item, "popped");
1215 70 : let offset = item
1216 70 : .offset()
1217 70 : .unwrap_or(BlobRef::new(self.index_start_offset(), false));
1218 :
1219 70 : let actionable = if let Some((key, lsn, start_offset)) = prev.take() {
1220 60 : let end_offset = offset;
1221 60 :
1222 60 : Some((BlobMeta { key, lsn }, start_offset..end_offset))
1223 : } else {
1224 10 : None
1225 : };
1226 :
1227 70 : let is_last = item.is_last();
1228 70 :
1229 70 : prev = Option::from(item);
1230 70 :
1231 70 : let actionable = actionable.filter(|x| x.0.lsn < until);
1232 :
1233 70 : let builder = if let Some((meta, offsets)) = actionable {
1234 : // extend or create a new builder
1235 32 : if read_builder
1236 32 : .as_mut()
1237 32 : .map(|x| x.extend(offsets.start.pos(), offsets.end.pos(), meta))
1238 32 : .unwrap_or(VectoredReadExtended::No)
1239 32 : == VectoredReadExtended::Yes
1240 : {
1241 16 : None
1242 : } else {
1243 16 : read_builder.replace(ChunkedVectoredReadBuilder::new(
1244 16 : offsets.start.pos(),
1245 16 : offsets.end.pos(),
1246 16 : meta,
1247 16 : max_read_size,
1248 16 : ))
1249 : }
1250 : } else {
1251 : // nothing to do, except perhaps flush any existing builder for the last element
1252 38 : None
1253 : };
1254 :
1255 : // flush the previous builder, and also the new one if the item was the last one
1256 70 : let builders = builder.into_iter();
1257 70 : let builders = if is_last {
1258 10 : builders.chain(read_builder.take())
1259 : } else {
1260 60 : builders.chain(None)
1261 : };
1262 :
1263 86 : for builder in builders {
1264 16 : let read = builder.build();
1265 16 :
1266 16 : let reader = VectoredBlobReader::new(&self.file);
1267 16 :
1268 16 : let mut buf = buffer.take().unwrap();
1269 16 :
1270 16 : buf.clear();
1271 16 : buf.reserve(read.size());
1272 16 : let res = reader.read_blobs(&read, buf, ctx).await?;
1273 :
1274 16 : let view = BufView::new_slice(&res.buf);
1275 :
1276 48 : for blob in res.blobs {
1277 32 : let key = blob.meta.key;
1278 32 : let lsn = blob.meta.lsn;
1279 :
1280 32 : let data = blob.read(&view).await?;
1281 :
1282 : #[cfg(debug_assertions)]
1283 32 : Value::des(&data)
1284 32 : .with_context(|| {
1285 0 : format!(
1286 0 : "blob failed to deserialize for {}: {:?}",
1287 0 : blob,
1288 0 : utils::Hex(&data)
1289 0 : )
1290 32 : })
1291 32 : .unwrap();
1292 32 :
1293 32 : // is it an image or will_init walrecord?
1294 32 : // FIXME: this could be handled by threading the BlobRef to the
1295 32 : // VectoredReadBuilder
1296 32 : let will_init = crate::repository::ValueBytes::will_init(&data)
1297 32 : .inspect_err(|_e| {
1298 0 : #[cfg(feature = "testing")]
1299 0 : tracing::error!(data=?utils::Hex(&data), err=?_e, %key, %lsn, "failed to parse will_init out of serialized value");
1300 32 : })
1301 32 : .unwrap_or(false);
1302 32 :
1303 32 : per_blob_copy.clear();
1304 32 : per_blob_copy.extend_from_slice(&data);
1305 :
1306 32 : let (tmp, res) = writer
1307 32 : .put_value_bytes(
1308 32 : key,
1309 32 : lsn,
1310 32 : std::mem::take(&mut per_blob_copy).slice_len(),
1311 32 : will_init,
1312 32 : ctx,
1313 32 : )
1314 4 : .await;
1315 32 : per_blob_copy = tmp.into_raw_slice().into_inner();
1316 32 :
1317 32 : res?;
1318 :
1319 32 : records += 1;
1320 : }
1321 :
1322 16 : buffer = Some(res.buf);
1323 : }
1324 : }
1325 :
1326 10 : assert!(
1327 10 : read_builder.is_none(),
1328 0 : "with the sentinel above loop should had handled all"
1329 : );
1330 :
1331 10 : Ok(records)
1332 10 : }
1333 :
1334 4 : pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
1335 4 : println!(
1336 4 : "index_start_blk: {}, root {}",
1337 4 : self.index_start_blk, self.index_root_blk
1338 4 : );
1339 4 :
1340 4 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
1341 4 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1342 4 : self.index_start_blk,
1343 4 : self.index_root_blk,
1344 4 : block_reader,
1345 4 : );
1346 4 :
1347 4 : tree_reader.dump().await?;
1348 :
1349 4 : let keys = self.load_keys(ctx).await?;
1350 :
1351 8 : async fn dump_blob(val: &ValueRef<'_>, ctx: &RequestContext) -> anyhow::Result<String> {
1352 8 : let buf = val.load_raw(ctx).await?;
1353 8 : let val = Value::des(&buf)?;
1354 8 : let desc = match val {
1355 8 : Value::Image(img) => {
1356 8 : format!(" img {} bytes", img.len())
1357 : }
1358 0 : Value::WalRecord(rec) => {
1359 0 : let wal_desc = walrecord::describe_wal_record(&rec)?;
1360 0 : format!(
1361 0 : " rec {} bytes will_init: {} {}",
1362 0 : buf.len(),
1363 0 : rec.will_init(),
1364 0 : wal_desc
1365 0 : )
1366 : }
1367 : };
1368 8 : Ok(desc)
1369 8 : }
1370 :
1371 12 : for entry in keys {
1372 8 : let DeltaEntry { key, lsn, val, .. } = entry;
1373 8 : let desc = match dump_blob(&val, ctx).await {
1374 8 : Ok(desc) => desc,
1375 0 : Err(err) => {
1376 0 : format!("ERROR: {err}")
1377 : }
1378 : };
1379 8 : println!(" key {key} at {lsn}: {desc}");
1380 :
1381 : // Print more details about CHECKPOINT records. It would be nice to print
1382 : // details of many other record types too, but these are particularly
1383 : // interesting, as there is a lot of special processing for them in walingest.rs.
1384 4 : use pageserver_api::key::CHECKPOINT_KEY;
1385 4 : use postgres_ffi::CheckPoint;
1386 8 : if key == CHECKPOINT_KEY {
1387 0 : let val = val.load(ctx).await?;
1388 0 : match val {
1389 0 : Value::Image(img) => {
1390 0 : let checkpoint = CheckPoint::decode(&img)?;
1391 0 : println!(" CHECKPOINT: {:?}", checkpoint);
1392 : }
1393 0 : Value::WalRecord(_rec) => {
1394 0 : println!(" unexpected walrecord value for checkpoint key");
1395 0 : }
1396 : }
1397 8 : }
1398 : }
1399 :
1400 4 : Ok(())
1401 4 : }
1402 :
1403 30 : fn stream_index_forwards<'a, R>(
1404 30 : &'a self,
1405 30 : reader: DiskBtreeReader<R, DELTA_KEY_SIZE>,
1406 30 : start: &'a [u8; DELTA_KEY_SIZE],
1407 30 : ctx: &'a RequestContext,
1408 30 : ) -> impl futures::stream::Stream<
1409 30 : Item = Result<(Key, Lsn, BlobRef), crate::tenant::disk_btree::DiskBtreeError>,
1410 30 : > + 'a
1411 30 : where
1412 30 : R: BlockReader + 'a,
1413 30 : {
1414 : use futures::stream::TryStreamExt;
1415 30 : let stream = reader.into_stream(start, ctx);
1416 152 : stream.map_ok(|(key, value)| {
1417 152 : let key = DeltaKey::from_slice(&key);
1418 152 : let (key, lsn) = (key.key(), key.lsn());
1419 152 : let offset = BlobRef(value);
1420 152 :
1421 152 : (key, lsn, offset)
1422 152 : })
1423 30 : }
1424 :
1425 : /// The file offset to the first block of the index.
1426 : ///
1427 : /// The file structure is summary, values, and index. We often need this for the size of the last blob.
1428 205074 : fn index_start_offset(&self) -> u64 {
1429 205074 : let offset = self.index_start_blk as u64 * PAGE_SZ as u64;
1430 205074 : let bref = BlobRef(offset);
1431 205074 : tracing::debug!(
1432 : index_start_blk = self.index_start_blk,
1433 : offset,
1434 0 : pos = bref.pos(),
1435 0 : "index_start_offset"
1436 : );
1437 205074 : offset
1438 205074 : }
1439 :
1440 534 : pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> DeltaLayerIterator<'a> {
1441 534 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
1442 534 : let tree_reader =
1443 534 : DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
1444 534 : DeltaLayerIterator {
1445 534 : delta_layer: self,
1446 534 : ctx,
1447 534 : index_iter: tree_reader.iter(&[0; DELTA_KEY_SIZE], ctx),
1448 534 : key_values_batch: std::collections::VecDeque::new(),
1449 534 : is_end: false,
1450 534 : planner: StreamingVectoredReadPlanner::new(
1451 534 : 1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
1452 534 : 1024, // The default value. Unit tests might use a different value
1453 534 : ),
1454 534 : }
1455 534 : }
1456 : }
1457 :
1458 : /// A set of data associated with a delta layer key and its value
1459 : pub struct DeltaEntry<'a> {
1460 : pub key: Key,
1461 : pub lsn: Lsn,
1462 : /// Size of the stored value
1463 : pub size: u64,
1464 : /// Reference to the on-disk value
1465 : pub val: ValueRef<'a>,
1466 : }
1467 :
1468 : /// Reference to an on-disk value
1469 : pub struct ValueRef<'a> {
1470 : blob_ref: BlobRef,
1471 : layer: &'a DeltaLayerInner,
1472 : }
1473 :
1474 : impl<'a> ValueRef<'a> {
1475 : /// Loads the value from disk
1476 0 : pub async fn load(&self, ctx: &RequestContext) -> Result<Value> {
1477 0 : let buf = self.load_raw(ctx).await?;
1478 0 : let val = Value::des(&buf)?;
1479 0 : Ok(val)
1480 0 : }
1481 :
1482 8 : async fn load_raw(&self, ctx: &RequestContext) -> Result<Vec<u8>> {
1483 8 : let reader = BlockCursor::new(crate::tenant::block_io::BlockReaderRef::Adapter(Adapter(
1484 8 : self.layer,
1485 8 : )));
1486 8 : let buf = reader.read_blob(self.blob_ref.pos(), ctx).await?;
1487 8 : Ok(buf)
1488 8 : }
1489 : }
1490 :
1491 : pub(crate) struct Adapter<T>(T);
1492 :
1493 : impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
1494 8 : pub(crate) async fn read_blk(
1495 8 : &self,
1496 8 : blknum: u32,
1497 8 : ctx: &RequestContext,
1498 8 : ) -> Result<BlockLease, std::io::Error> {
1499 8 : let block_reader = FileBlockReader::new(&self.0.as_ref().file, self.0.as_ref().file_id);
1500 8 : block_reader.read_blk(blknum, ctx).await
1501 8 : }
1502 : }
1503 :
1504 : impl AsRef<DeltaLayerInner> for DeltaLayerInner {
1505 16 : fn as_ref(&self) -> &DeltaLayerInner {
1506 16 : self
1507 16 : }
1508 : }
1509 :
1510 : impl<'a> pageserver_compaction::interface::CompactionDeltaEntry<'a, Key> for DeltaEntry<'a> {
1511 0 : fn key(&self) -> Key {
1512 0 : self.key
1513 0 : }
1514 0 : fn lsn(&self) -> Lsn {
1515 0 : self.lsn
1516 0 : }
1517 0 : fn size(&self) -> u64 {
1518 0 : self.size
1519 0 : }
1520 : }
1521 :
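/// Streaming iterator over the (key, lsn, value) tuples stored in a delta
/// layer, in index order. Obtained via `DeltaLayerInner::iter`. A minimal
/// usage sketch, assuming a loaded `inner: &DeltaLayerInner` and a `ctx`:
///
/// ```ignore
/// let mut iter = inner.iter(ctx);
/// while let Some((key, lsn, value)) = iter.next().await? {
///     // process one entry
/// }
/// ```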
1522 : pub struct DeltaLayerIterator<'a> {
1523 : delta_layer: &'a DeltaLayerInner,
1524 : ctx: &'a RequestContext,
1525 : planner: StreamingVectoredReadPlanner,
1526 : index_iter: DiskBtreeIterator<'a>,
1527 : key_values_batch: VecDeque<(Key, Lsn, Value)>,
1528 : is_end: bool,
1529 : }
1530 :
1531 : impl<'a> DeltaLayerIterator<'a> {
1532 0 : pub(crate) fn layer_dbg_info(&self) -> String {
1533 0 : self.delta_layer.layer_dbg_info()
1534 0 : }
1535 :
1536 : /// Retrieve a batch of key-value pairs into the iterator buffer.
1537 21364 : async fn next_batch(&mut self) -> anyhow::Result<()> {
1538 21364 : assert!(self.key_values_batch.is_empty());
1539 21364 : assert!(!self.is_end);
1540 :
1541 21364 : let plan = loop {
1542 2099464 : if let Some(res) = self.index_iter.next().await {
1543 2098958 : let (raw_key, value) = res?;
1544 2098958 : let key = Key::from_slice(&raw_key[..KEY_SIZE]);
1545 2098958 : let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
1546 2098958 : let blob_ref = BlobRef(value);
1547 2098958 : let offset = blob_ref.pos();
1548 2098958 : if let Some(batch_plan) = self.planner.handle(key, lsn, offset) {
1549 20858 : break batch_plan;
1550 2078100 : }
1551 : } else {
1552 506 : self.is_end = true;
1553 506 : let data_end_offset = self.delta_layer.index_start_offset();
1554 506 : if let Some(item) = self.planner.handle_range_end(data_end_offset) {
1555 506 : break item;
1556 : } else {
1557 0 : return Ok(()); // TODO: test empty iterator
1558 : }
1559 : }
1560 : };
1561 21364 : let vectored_blob_reader = VectoredBlobReader::new(&self.delta_layer.file);
1562 21364 : let mut next_batch = std::collections::VecDeque::new();
1563 21364 : let buf_size = plan.size();
1564 21364 : let buf = IoBufferMut::with_capacity(buf_size);
1565 21364 : let blobs_buf = vectored_blob_reader
1566 21364 : .read_blobs(&plan, buf, self.ctx)
1567 10909 : .await?;
1568 21364 : let view = BufView::new_slice(&blobs_buf.buf);
1569 2098930 : for meta in blobs_buf.blobs.iter() {
1570 2098930 : let blob_read = meta.read(&view).await?;
1571 2098930 : let value = Value::des(&blob_read)?;
1572 :
1573 2098930 : next_batch.push_back((meta.meta.key, meta.meta.lsn, value));
1574 : }
1575 21364 : self.key_values_batch = next_batch;
1576 21364 : Ok(())
1577 21364 : }
1578 :
1579 2099636 : pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
1580 2099636 : if self.key_values_batch.is_empty() {
1581 22260 : if self.is_end {
1582 980 : return Ok(None);
1583 21280 : }
1584 21280 : self.next_batch().await?;
1585 2077376 : }
1586 2098656 : Ok(Some(
1587 2098656 : self.key_values_batch
1588 2098656 : .pop_front()
1589 2098656 : .expect("should not be empty"),
1590 2098656 : ))
1591 2099636 : }
1592 : }
1593 :
1594 : #[cfg(test)]
1595 : pub(crate) mod test {
1596 : use std::collections::BTreeMap;
1597 :
1598 : use itertools::MinMaxResult;
1599 : use rand::prelude::{SeedableRng, SliceRandom, StdRng};
1600 : use rand::RngCore;
1601 :
1602 : use super::*;
1603 : use crate::repository::Value;
1604 : use crate::tenant::harness::TIMELINE_ID;
1605 : use crate::tenant::storage_layer::{Layer, ResidentLayer};
1606 : use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner;
1607 : use crate::tenant::{Tenant, Timeline};
1608 : use crate::{
1609 : context::DownloadBehavior,
1610 : task_mgr::TaskKind,
1611 : tenant::{disk_btree::tests::TestDisk, harness::TenantHarness},
1612 : DEFAULT_PG_VERSION,
1613 : };
1614 : use bytes::Bytes;
1615 :
1616 : /// Construct an index for a fictional delta layer and then traverse it
1617 : /// in order to plan vectored reads for a query. Finally, verify that
1618 : /// the traversal fed the right index key and value pairs into the
1619 : /// planner.
1620 : #[tokio::test]
1621 2 : async fn test_delta_layer_index_traversal() {
1622 2 : let base_key = Key {
1623 2 : field1: 0,
1624 2 : field2: 1663,
1625 2 : field3: 12972,
1626 2 : field4: 16396,
1627 2 : field5: 0,
1628 2 : field6: 246080,
1629 2 : };
1630 2 :
1631 2 : // Populate the index with some entries
1632 2 : let entries: BTreeMap<Key, Vec<Lsn>> = BTreeMap::from([
1633 2 : (base_key, vec![Lsn(1), Lsn(5), Lsn(25), Lsn(26), Lsn(28)]),
1634 2 : (base_key.add(1), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
1635 2 : (base_key.add(2), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
1636 2 : (base_key.add(5), vec![Lsn(10), Lsn(15), Lsn(16), Lsn(20)]),
1637 2 : ]);
1638 2 :
1639 2 : let mut disk = TestDisk::default();
1640 2 : let mut writer = DiskBtreeBuilder::<_, DELTA_KEY_SIZE>::new(&mut disk);
1641 2 :
1642 2 : let mut disk_offset = 0;
1643 10 : for (key, lsns) in &entries {
1644 42 : for lsn in lsns {
1645 34 : let index_key = DeltaKey::from_key_lsn(key, *lsn);
1646 34 : let blob_ref = BlobRef::new(disk_offset, false);
1647 34 : writer
1648 34 : .append(&index_key.0, blob_ref.0)
1649 34 : .expect("In memory disk append should never fail");
1650 34 :
1651 34 : disk_offset += 1;
1652 34 : }
1653 2 : }
1654 2 :
1655 2 : // Prepare all the arguments for the call into `plan_reads` below
1656 2 : let (root_offset, _writer) = writer
1657 2 : .finish()
1658 2 : .expect("In memory disk finish should never fail");
1659 2 : let reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(0, root_offset, disk);
1660 2 : let planner = VectoredReadPlanner::new(100);
1661 2 : let mut reconstruct_state = ValuesReconstructState::new();
1662 2 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
1663 2 :
1664 2 : let keyspace = KeySpace {
1665 2 : ranges: vec![
1666 2 : base_key..base_key.add(3),
1667 2 : base_key.add(3)..base_key.add(100),
1668 2 : ],
1669 2 : };
1670 2 : let lsn_range = Lsn(2)..Lsn(40);
1671 2 :
1672 2 : // Plan and validate
1673 2 : let vectored_reads = DeltaLayerInner::plan_reads(
1674 2 : &keyspace,
1675 2 : lsn_range.clone(),
1676 2 : disk_offset,
1677 2 : reader,
1678 2 : planner,
1679 2 : &mut reconstruct_state,
1680 2 : &ctx,
1681 2 : )
1682 2 : .await
1683 2 : .expect("Read planning should not fail");
1684 2 :
1685 2 : validate(keyspace, lsn_range, vectored_reads, entries);
1686 2 : }
1687 :
1688 2 : fn validate(
1689 2 : keyspace: KeySpace,
1690 2 : lsn_range: Range<Lsn>,
1691 2 : vectored_reads: Vec<VectoredRead>,
1692 2 : index_entries: BTreeMap<Key, Vec<Lsn>>,
1693 2 : ) {
1694 : #[derive(Debug, PartialEq, Eq)]
1695 : struct BlobSpec {
1696 : key: Key,
1697 : lsn: Lsn,
1698 : at: u64,
1699 : }
1700 :
1701 2 : let mut planned_blobs = Vec::new();
1702 30 : for read in vectored_reads {
1703 28 : for (at, meta) in read.blobs_at.as_slice() {
1704 28 : planned_blobs.push(BlobSpec {
1705 28 : key: meta.key,
1706 28 : lsn: meta.lsn,
1707 28 : at: *at,
1708 28 : });
1709 28 : }
1710 : }
1711 :
1712 2 : let mut expected_blobs = Vec::new();
1713 2 : let mut disk_offset = 0;
1714 10 : for (key, lsns) in index_entries {
1715 42 : for lsn in lsns {
1716 42 : let key_included = keyspace.ranges.iter().any(|range| range.contains(&key));
1717 34 : let lsn_included = lsn_range.contains(&lsn);
1718 34 :
1719 34 : if key_included && lsn_included {
1720 28 : expected_blobs.push(BlobSpec {
1721 28 : key,
1722 28 : lsn,
1723 28 : at: disk_offset,
1724 28 : });
1725 28 : }
1726 :
1727 34 : disk_offset += 1;
1728 : }
1729 : }
1730 :
1731 2 : assert_eq!(planned_blobs, expected_blobs);
1732 2 : }
1733 :
1734 : mod constants {
1735 : use utils::lsn::Lsn;
1736 :
1737 : /// Offset used by all lsns in this test
1738 : pub(super) const LSN_OFFSET: Lsn = Lsn(0x08);
1739 : /// Number of unique keys including in the test data
1740 : pub(super) const KEY_COUNT: u8 = 60;
1741 : /// Max number of different lsns for each key
1742 : pub(super) const MAX_ENTRIES_PER_KEY: u8 = 20;
1743 : /// Possible value sizes for each key along with a probability weight
1744 : pub(super) const VALUE_SIZES: [(usize, u8); 3] = [(100, 2), (1024, 2), (1024 * 1024, 1)];
1745 : /// Probability that there will be a gap between the current key and the next one (33.3%)
1746 : pub(super) const KEY_GAP_CHANGES: [(bool, u8); 2] = [(true, 1), (false, 2)];
1747 : /// The minimum size of a key range in all the generated reads
1748 : pub(super) const MIN_RANGE_SIZE: i128 = 10;
1749 : /// The number of ranges included in each vectored read
1750 : pub(super) const RANGES_COUNT: u8 = 2;
1751 : /// The number of vectored reads performed
1752 : pub(super) const READS_COUNT: u8 = 100;
1753 : /// Soft max size of a vectored read. Will be violated if we have to read keys
1754 : /// with values larger than the limit
1755 : pub(super) const MAX_VECTORED_READ_BYTES: usize = 64 * 1024;
1756 : }
1757 :
1758 : struct Entry {
1759 : key: Key,
1760 : lsn: Lsn,
1761 : value: Vec<u8>,
1762 : }
1763 :
1764 2 : fn generate_entries(rng: &mut StdRng) -> Vec<Entry> {
1765 2 : let mut current_key = Key::MIN;
1766 2 :
1767 2 : let mut entries = Vec::new();
1768 122 : for _ in 0..constants::KEY_COUNT {
1769 120 : let count = rng.gen_range(1..constants::MAX_ENTRIES_PER_KEY);
1770 120 : let mut lsns_iter =
1771 2260 : std::iter::successors(Some(Lsn(constants::LSN_OFFSET.0 + 0x08)), |lsn| {
1772 2260 : Some(Lsn(lsn.0 + 0x08))
1773 2260 : });
1774 120 : let mut lsns = Vec::new();
1775 2380 : while lsns.len() < count as usize {
1776 2260 : let take = rng.gen_bool(0.5);
1777 2260 : let lsn = lsns_iter.next().unwrap();
1778 2260 : if take {
1779 1112 : lsns.push(lsn);
1780 1148 : }
1781 : }
1782 :
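 : // Give each retained (key, lsn) pair a random value whose size is drawn from the weighted VALUE_SIZES distribution.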
1783 1232 : for lsn in lsns {
1784 1112 : let size = constants::VALUE_SIZES
1785 3336 : .choose_weighted(rng, |item| item.1)
1786 1112 : .unwrap()
1787 1112 : .0;
1788 1112 : let mut buf = vec![0; size];
1789 1112 : rng.fill_bytes(&mut buf);
1790 1112 :
1791 1112 : entries.push(Entry {
1792 1112 : key: current_key,
1793 1112 : lsn,
1794 1112 : value: buf,
1795 1112 : })
1796 : }
1797 :
1798 120 : let gap = constants::KEY_GAP_CHANGES
1799 240 : .choose_weighted(rng, |item| item.1)
1800 120 : .unwrap()
1801 120 : .0;
1802 120 : if gap {
1803 38 : current_key = current_key.add(2);
1804 82 : } else {
1805 82 : current_key = current_key.add(1);
1806 82 : }
1807 : }
1808 :
1809 2 : entries
1810 2 : }
1811 :
1812 : struct EntriesMeta {
1813 : key_range: Range<Key>,
1814 : lsn_range: Range<Lsn>,
1815 : index: BTreeMap<(Key, Lsn), Vec<u8>>,
1816 : }
1817 :
1818 2 : fn get_entries_meta(entries: &[Entry]) -> EntriesMeta {
1819 1112 : let key_range = match entries.iter().minmax_by_key(|e| e.key) {
1820 2 : MinMaxResult::MinMax(min, max) => min.key..max.key.next(),
1821 0 : _ => panic!("More than one entry is always expected"),
1822 : };
1823 :
1824 1112 : let lsn_range = match entries.iter().minmax_by_key(|e| e.lsn) {
1825 2 : MinMaxResult::MinMax(min, max) => min.lsn..Lsn(max.lsn.0 + 1),
1826 0 : _ => panic!("More than one entry is always expected"),
1827 : };
1828 :
1829 2 : let mut index = BTreeMap::new();
1830 1112 : for entry in entries.iter() {
1831 1112 : index.insert((entry.key, entry.lsn), entry.value.clone());
1832 1112 : }
1833 :
1834 2 : EntriesMeta {
1835 2 : key_range,
1836 2 : lsn_range,
1837 2 : index,
1838 2 : }
1839 2 : }
1840 :
1841 200 : fn pick_random_keyspace(rng: &mut StdRng, key_range: &Range<Key>) -> KeySpace {
1842 200 : let start = key_range.start.to_i128();
1843 200 : let end = key_range.end.to_i128();
1844 200 :
1845 200 : let mut keyspace = KeySpace::default();
1846 :
1847 600 : for _ in 0..constants::RANGES_COUNT {
1848 400 : let mut range: Option<Range<Key>> = Option::default();
1849 1244 : while range.is_none() || keyspace.overlaps(range.as_ref().unwrap()) {
1850 844 : let range_start = rng.gen_range(start..end);
1851 844 : let range_end_offset = range_start + constants::MIN_RANGE_SIZE;
1852 844 : if range_end_offset >= end {
1853 100 : range = Some(Key::from_i128(range_start)..Key::from_i128(end));
1854 744 : } else {
1855 744 : let range_end = rng.gen_range((range_start + constants::MIN_RANGE_SIZE)..end);
1856 744 : range = Some(Key::from_i128(range_start)..Key::from_i128(range_end));
1857 744 : }
1858 : }
1859 400 : keyspace.ranges.push(range.unwrap());
1860 : }
1861 :
1862 200 : keyspace
1863 200 : }
1864 :
1865 : #[tokio::test]
1866 2 : async fn test_delta_layer_vectored_read_end_to_end() -> anyhow::Result<()> {
1867 2 : let harness = TenantHarness::create("test_delta_layer_oversized_vectored_read").await?;
1868 8 : let (tenant, ctx) = harness.load().await;
1869 2 :
1870 2 : let timeline_id = TimelineId::generate();
1871 2 : let timeline = tenant
1872 2 : .create_test_timeline(timeline_id, constants::LSN_OFFSET, DEFAULT_PG_VERSION, &ctx)
1873 4 : .await?;
1874 2 :
1875 2 : tracing::info!("Generating test data ...");
1876 2 :
1877 2 : let rng = &mut StdRng::seed_from_u64(0);
1878 2 : let entries = generate_entries(rng);
1879 2 : let entries_meta = get_entries_meta(&entries);
1880 2 :
1881 2 : tracing::info!("Done generating {} entries", entries.len());
1882 2 :
1883 2 : tracing::info!("Writing test data to delta layer ...");
1884 2 : let mut writer = DeltaLayerWriter::new(
1885 2 : harness.conf,
1886 2 : timeline_id,
1887 2 : harness.tenant_shard_id,
1888 2 : entries_meta.key_range.start,
1889 2 : entries_meta.lsn_range.clone(),
1890 2 : &ctx,
1891 2 : )
1892 2 : .await?;
1893 2 :
1894 1114 : for entry in entries {
1895 1112 : let (_, res) = writer
1896 1112 : .put_value_bytes(entry.key, entry.lsn, entry.value.slice_len(), false, &ctx)
1897 215 : .await;
1898 1112 : res?;
1899 2 : }
1900 2 :
1901 5 : let (desc, path) = writer.finish(entries_meta.key_range.end, &ctx).await?;
1902 2 : let resident = Layer::finish_creating(harness.conf, &timeline, desc, &path)?;
1903 2 :
1904 2 : let inner = resident.get_as_delta(&ctx).await?;
1905 2 :
1906 2 : let file_size = inner.file.metadata().await?.len();
1907 2 : tracing::info!(
1908 2 : "Done writing test data to delta layer. Resulting file size is: {}",
1909 2 : file_size
1910 2 : );
1911 2 :
1912 202 : for i in 0..constants::READS_COUNT {
1913 200 : tracing::info!("Doing vectored read {}/{}", i + 1, constants::READS_COUNT);
1914 2 :
1915 200 : let block_reader = FileBlockReader::new(&inner.file, inner.file_id);
1916 200 : let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1917 200 : inner.index_start_blk,
1918 200 : inner.index_root_blk,
1919 200 : block_reader,
1920 200 : );
1921 200 :
1922 200 : let planner = VectoredReadPlanner::new(constants::MAX_VECTORED_READ_BYTES);
1923 200 : let mut reconstruct_state = ValuesReconstructState::new();
1924 200 : let keyspace = pick_random_keyspace(rng, &entries_meta.key_range);
1925 200 : let data_end_offset = inner.index_start_blk as u64 * PAGE_SZ as u64;
1926 2 :
1927 200 : let vectored_reads = DeltaLayerInner::plan_reads(
1928 200 : &keyspace,
1929 200 : entries_meta.lsn_range.clone(),
1930 200 : data_end_offset,
1931 200 : index_reader,
1932 200 : planner,
1933 200 : &mut reconstruct_state,
1934 200 : &ctx,
1935 200 : )
1936 4 : .await?;
1937 2 :
1938 200 : let vectored_blob_reader = VectoredBlobReader::new(&inner.file);
1939 200 : let buf_size = DeltaLayerInner::get_min_read_buffer_size(
1940 200 : &vectored_reads,
1941 200 : constants::MAX_VECTORED_READ_BYTES,
1942 200 : );
1943 200 : let mut buf = Some(IoBufferMut::with_capacity(buf_size));
1944 2 :
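 : // Execute each planned read, passing a single reusable buffer in and out, and check every returned blob against the in-memory index.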
1945 19924 : for read in vectored_reads {
1946 19724 : let blobs_buf = vectored_blob_reader
1947 19724 : .read_blobs(&read, buf.take().expect("Should have a buffer"), &ctx)
1948 10016 : .await?;
1949 19724 : let view = BufView::new_slice(&blobs_buf.buf);
1950 57304 : for meta in blobs_buf.blobs.iter() {
1951 57304 : let value = meta.read(&view).await?;
1952 57304 : assert_eq!(
1953 57304 : &value[..],
1954 57304 : &entries_meta.index[&(meta.meta.key, meta.meta.lsn)]
1955 57304 : );
1956 2 : }
1957 2 :
1958 19724 : buf = Some(blobs_buf.buf);
1959 2 : }
1960 2 : }
1961 2 :
1962 2 : Ok(())
1963 2 : }
1964 :
1965 : #[tokio::test]
1966 2 : async fn copy_delta_prefix_smoke() {
1967 2 : use crate::walrecord::NeonWalRecord;
1968 2 : use bytes::Bytes;
1969 2 :
1970 2 : let h = crate::tenant::harness::TenantHarness::create("truncate_delta_smoke")
1971 2 : .await
1972 2 : .unwrap();
1973 8 : let (tenant, ctx) = h.load().await;
1974 2 : let ctx = &ctx;
1975 2 : let timeline = tenant
1976 2 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, ctx)
1977 4 : .await
1978 2 : .unwrap();
1979 2 :
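 : // Grab the layer produced by timeline creation (presumably the initdb import) so we can tell it apart from the layer we are about to write.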
1980 2 : let initdb_layer = timeline
1981 2 : .layers
1982 2 : .read()
1983 2 : .await
1984 2 : .likely_resident_layers()
1985 2 : .next()
1986 2 : .cloned()
1987 2 : .unwrap();
1988 2 :
1989 2 : {
1990 2 : let mut writer = timeline.writer().await;
1991 2 :
1992 2 : let data = [
1993 2 : (0x20, 12, Value::Image(Bytes::from_static(b"foobar"))),
1994 2 : (
1995 2 : 0x30,
1996 2 : 12,
1997 2 : Value::WalRecord(NeonWalRecord::Postgres {
1998 2 : will_init: false,
1999 2 : rec: Bytes::from_static(b"1"),
2000 2 : }),
2001 2 : ),
2002 2 : (
2003 2 : 0x40,
2004 2 : 12,
2005 2 : Value::WalRecord(NeonWalRecord::Postgres {
2006 2 : will_init: true,
2007 2 : rec: Bytes::from_static(b"2"),
2008 2 : }),
2009 2 : ),
2010 2 : // build an oversized value so we cannot extend an existing read over
2011 2 : // this
2012 2 : (
2013 2 : 0x50,
2014 2 : 12,
2015 2 : Value::WalRecord(NeonWalRecord::Postgres {
2016 2 : will_init: true,
2017 2 : rec: {
2018 2 : let mut buf =
2019 2 : vec![0u8; tenant.conf.max_vectored_read_bytes.0.get() + 1024];
2020 2 : buf.iter_mut()
2021 2 : .enumerate()
2022 268288 : .for_each(|(i, slot)| *slot = (i % 256) as u8);
2023 2 : Bytes::from(buf)
2024 2 : },
2025 2 : }),
2026 2 : ),
2027 2 : // because the oversized read cannot be extended further, this entry is
2028 2 : // guaranteed to exercise the builder created on the final round:
2029 2 : (
2030 2 : 0x60,
2031 2 : 12,
2032 2 : Value::WalRecord(NeonWalRecord::Postgres {
2033 2 : will_init: true,
2034 2 : rec: Bytes::from_static(b"3"),
2035 2 : }),
2036 2 : ),
2037 2 : (
2038 2 : 0x60,
2039 2 : 9,
2040 2 : Value::Image(Bytes::from_static(b"something for a different key")),
2041 2 : ),
2042 2 : ];
2043 2 :
2044 2 : let mut last_lsn = None;
2045 2 :
2046 14 : for (lsn, key, value) in data {
2047 12 : let key = Key::from_i128(key);
2048 12 : writer.put(key, Lsn(lsn), &value, ctx).await.unwrap();
2049 12 : last_lsn = Some(lsn);
2050 2 : }
2051 2 :
2052 2 : writer.finish_write(Lsn(last_lsn.unwrap()));
2053 2 : }
2054 2 : timeline.freeze_and_flush().await.unwrap();
2055 2 :
2056 2 : let new_layer = timeline
2057 2 : .layers
2058 2 : .read()
2059 2 : .await
2060 2 : .likely_resident_layers()
2061 3 : .find(|&x| x != &initdb_layer)
2062 2 : .cloned()
2063 2 : .unwrap();
2064 2 :
2065 2 : // create a copy for the timeline, so we don't overwrite the file
2066 2 : let branch = tenant
2067 2 : .branch_timeline_test(&timeline, TimelineId::generate(), None, ctx)
2068 2 : .await
2069 2 : .unwrap();
2070 2 :
2071 2 : assert_eq!(branch.get_ancestor_lsn(), Lsn(0x60));
2072 2 :
2073 2 : // truncating at 0x61 gives us a full copy; the earlier cut-offs walk backwards
2074 2 : // until only a single key remains
2075 2 :
2076 12 : for truncate_at in [0x61, 0x51, 0x41, 0x31, 0x21] {
2077 10 : let truncate_at = Lsn(truncate_at);
2078 2 :
2079 10 : let mut writer = DeltaLayerWriter::new(
2080 10 : tenant.conf,
2081 10 : branch.timeline_id,
2082 10 : tenant.tenant_shard_id,
2083 10 : Key::MIN,
2084 10 : Lsn(0x11)..truncate_at,
2085 10 : ctx,
2086 10 : )
2087 5 : .await
2088 10 : .unwrap();
2089 2 :
2090 10 : let new_layer = new_layer.download_and_keep_resident().await.unwrap();
2091 10 :
2092 10 : new_layer
2093 10 : .copy_delta_prefix(&mut writer, truncate_at, ctx)
2094 15 : .await
2095 10 : .unwrap();
2096 2 :
2097 24 : let (desc, path) = writer.finish(Key::MAX, ctx).await.unwrap();
2098 10 : let copied_layer = Layer::finish_creating(tenant.conf, &branch, desc, &path).unwrap();
2099 10 :
2100 10 : copied_layer.get_as_delta(ctx).await.unwrap();
2101 10 :
2102 10 : assert_keys_and_values_eq(
2103 10 : new_layer.get_as_delta(ctx).await.unwrap(),
2104 10 : copied_layer.get_as_delta(ctx).await.unwrap(),
2105 10 : truncate_at,
2106 10 : ctx,
2107 2 : )
2108 44 : .await;
2109 2 : }
2110 2 : }
2111 :
2112 10 : async fn assert_keys_and_values_eq(
2113 10 : source: &DeltaLayerInner,
2114 10 : truncated: &DeltaLayerInner,
2115 10 : truncated_at: Lsn,
2116 10 : ctx: &RequestContext,
2117 10 : ) {
2118 : use futures::future::ready;
2119 : use futures::stream::TryStreamExt;
2120 :
2121 10 : let start_key = [0u8; DELTA_KEY_SIZE];
2122 10 :
2123 10 : let source_reader = FileBlockReader::new(&source.file, source.file_id);
2124 10 : let source_tree = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
2125 10 : source.index_start_blk,
2126 10 : source.index_root_blk,
2127 10 : &source_reader,
2128 10 : );
2129 10 : let source_stream = source.stream_index_forwards(source_tree, &start_key, ctx);
2130 60 : let source_stream = source_stream.filter(|res| match res {
2131 60 : Ok((_, lsn, _)) => ready(lsn < &truncated_at),
2132 0 : _ => ready(true),
2133 60 : });
2134 10 : let mut source_stream = std::pin::pin!(source_stream);
2135 10 :
2136 10 : let truncated_reader = FileBlockReader::new(&truncated.file, truncated.file_id);
2137 10 : let truncated_tree = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
2138 10 : truncated.index_start_blk,
2139 10 : truncated.index_root_blk,
2140 10 : &truncated_reader,
2141 10 : );
2142 10 : let truncated_stream = truncated.stream_index_forwards(truncated_tree, &start_key, ctx);
2143 10 : let mut truncated_stream = std::pin::pin!(truncated_stream);
2144 10 :
2145 10 : let mut scratch_left = Vec::new();
2146 10 : let mut scratch_right = Vec::new();
2147 :
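 : // Advance both index streams in lockstep; because the source stream is filtered to entries below `truncated_at`, it must end exactly when the truncated copy does.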
2148 : loop {
2149 42 : let (src, truncated) = (source_stream.try_next(), truncated_stream.try_next());
2150 42 : let (src, truncated) = tokio::try_join!(src, truncated).unwrap();
2151 42 :
2152 42 : if src.is_none() {
2153 10 : assert!(truncated.is_none());
2154 10 : break;
2155 32 : }
2156 32 :
2157 32 : let (src, truncated) = (src.unwrap(), truncated.unwrap());
2158 32 :
2159 32 : // because we've filtered the source stream by LSN, both streams should always yield the same keys.
2160 32 : assert_eq!(src.0, truncated.0);
2161 32 : assert_eq!(src.1, truncated.1);
2162 :
2163 : // this ordering is not required for correctness; if a future change needs to violate it, just drop this assert.
2164 32 : assert!(
2165 32 : src.2.pos() >= truncated.2.pos(),
2166 0 : "value position should not go backwards {} vs. {}",
2167 0 : src.2.pos(),
2168 0 : truncated.2.pos()
2169 : );
2170 :
2171 32 : scratch_left.clear();
2172 32 : let src_cursor = source_reader.block_cursor();
2173 32 : let left = src_cursor.read_blob_into_buf(src.2.pos(), &mut scratch_left, ctx);
2174 32 : scratch_right.clear();
2175 32 : let trunc_cursor = truncated_reader.block_cursor();
2176 32 : let right = trunc_cursor.read_blob_into_buf(truncated.2.pos(), &mut scratch_right, ctx);
2177 32 :
2178 32 : tokio::try_join!(left, right).unwrap();
2179 32 :
2180 32 : assert_eq!(utils::Hex(&scratch_left), utils::Hex(&scratch_right));
2181 : }
2182 10 : }
2183 :
2184 18224 : pub(crate) fn sort_delta(
2185 18224 : (k1, l1, _): &(Key, Lsn, Value),
2186 18224 : (k2, l2, _): &(Key, Lsn, Value),
2187 18224 : ) -> std::cmp::Ordering {
2188 18224 : (k1, l1).cmp(&(k2, l2))
2189 18224 : }
2190 :
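 : /// Like `sort_delta`, but breaks (key, lsn) ties by ordering page images before WAL records.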
2191 94 : pub(crate) fn sort_delta_value(
2192 94 : (k1, l1, v1): &(Key, Lsn, Value),
2193 94 : (k2, l2, v2): &(Key, Lsn, Value),
2194 94 : ) -> std::cmp::Ordering {
2195 94 : let order_1 = if v1.is_image() { 0 } else { 1 };
2196 94 : let order_2 = if v2.is_image() { 0 } else { 1 };
2197 94 : (k1, l1, order_1).cmp(&(k2, l2, order_2))
2198 94 : }
2199 :
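 : /// Sorts the deltas and writes them into a fresh delta layer whose key and LSN ranges are derived from the data, returning the resident layer.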
2200 22 : pub(crate) async fn produce_delta_layer(
2201 22 : tenant: &Tenant,
2202 22 : tline: &Arc<Timeline>,
2203 22 : mut deltas: Vec<(Key, Lsn, Value)>,
2204 22 : ctx: &RequestContext,
2205 22 : ) -> anyhow::Result<ResidentLayer> {
2206 22 : deltas.sort_by(sort_delta);
2207 22 : let (key_start, _, _) = deltas.first().unwrap();
2208 22 : let (key_max, _, _) = deltas.last().unwrap();
2209 8240 : let lsn_min = deltas.iter().map(|(_, lsn, _)| lsn).min().unwrap();
2210 8240 : let lsn_max = deltas.iter().map(|(_, lsn, _)| lsn).max().unwrap();
2211 22 : let lsn_end = Lsn(lsn_max.0 + 1);
2212 22 : let mut writer = DeltaLayerWriter::new(
2213 22 : tenant.conf,
2214 22 : tline.timeline_id,
2215 22 : tenant.tenant_shard_id,
2216 22 : *key_start,
2217 22 : (*lsn_min)..lsn_end,
2218 22 : ctx,
2219 22 : )
2220 11 : .await?;
2221 22 : let key_end = key_max.next();
2222 :
2223 8262 : for (key, lsn, value) in deltas {
2224 8240 : writer.put_value(key, lsn, value, ctx).await?;
2225 : }
2226 :
2227 63 : let (desc, path) = writer.finish(key_end, ctx).await?;
2228 22 : let delta_layer = Layer::finish_creating(tenant.conf, tline, desc, &path)?;
2229 :
2230 22 : Ok::<_, anyhow::Error>(delta_layer)
2231 22 : }
2232 :
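 : /// Drains `delta_iter` and asserts that it yields exactly the entries in `expect`, in order.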
2233 28 : async fn assert_delta_iter_equal(
2234 28 : delta_iter: &mut DeltaLayerIterator<'_>,
2235 28 : expect: &[(Key, Lsn, Value)],
2236 28 : ) {
2237 28 : let mut expect_iter = expect.iter();
2238 : loop {
2239 28028 : let o1 = delta_iter.next().await.unwrap();
2240 28028 : let o2 = expect_iter.next();
2241 28028 : assert_eq!(o1.is_some(), o2.is_some());
2242 28028 : if o1.is_none() && o2.is_none() {
2243 28 : break;
2244 28000 : }
2245 28000 : let (k1, l1, v1) = o1.unwrap();
2246 28000 : let (k2, l2, v2) = o2.unwrap();
2247 28000 : assert_eq!(&k1, k2);
2248 28000 : assert_eq!(l1, *l2);
2249 28000 : assert_eq!(&v1, v2);
2250 : }
2251 28 : }
2252 :
2253 : #[tokio::test]
2254 2 : async fn delta_layer_iterator() {
2255 2 : let harness = TenantHarness::create("delta_layer_iterator").await.unwrap();
2256 8 : let (tenant, ctx) = harness.load().await;
2257 2 :
2258 2 : let tline = tenant
2259 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
2260 4 : .await
2261 2 : .unwrap();
2262 2 :
2263 2000 : fn get_key(id: u32) -> Key {
2264 2000 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
2265 2000 : key.field6 = id;
2266 2000 : key
2267 2000 : }
2268 2 : const N: usize = 1000;
2269 2 : let test_deltas = (0..N)
2270 2000 : .map(|idx| {
2271 2000 : (
2272 2000 : get_key(idx as u32 / 10),
2273 2000 : Lsn(0x10 * ((idx as u64) % 10 + 1)),
2274 2000 : Value::Image(Bytes::from(format!("img{idx:05}"))),
2275 2000 : )
2276 2000 : })
2277 2 : .collect_vec();
2278 2 : let resident_layer = produce_delta_layer(&tenant, &tline, test_deltas.clone(), &ctx)
2279 8 : .await
2280 2 : .unwrap();
2281 2 : let delta_layer = resident_layer.get_as_delta(&ctx).await.unwrap();
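 : // Sweep a tiny and a generous read-size limit against several batch sizes to cover the planner's boundary behavior.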
2282 6 : for max_read_size in [1, 1024] {
2283 32 : for batch_size in [1, 2, 4, 8, 3, 7, 13] {
2284 28 : println!("running with batch_size={batch_size} max_read_size={max_read_size}");
2285 28 : // Test if the batch size is correctly determined
2286 28 : let mut iter = delta_layer.iter(&ctx);
2287 28 : iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
2288 28 : let mut num_items = 0;
2289 112 : for _ in 0..3 {
2290 84 : iter.next_batch().await.unwrap();
2291 84 : num_items += iter.key_values_batch.len();
2292 84 : if max_read_size == 1 {
2293 2 : // every entry should form its own batch because each value is larger than max_read_size
2294 42 : assert_eq!(iter.key_values_batch.len(), 1);
2295 2 : } else {
2296 42 : assert!(iter.key_values_batch.len() <= batch_size);
2297 2 : }
2298 84 : if num_items >= N {
2299 2 : break;
2300 84 : }
2301 84 : iter.key_values_batch.clear();
2302 2 : }
2303 2 : // Test if the result is correct
2304 28 : let mut iter = delta_layer.iter(&ctx);
2305 28 : iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
2306 9649 : assert_delta_iter_equal(&mut iter, &test_deltas).await;
2307 2 : }
2308 2 : }
2309 2 : }
2310 : }