1 : //! A DeltaLayer represents a collection of WAL records or page images in a range of
2 : //! LSNs and in a range of Keys. It is stored in a file on disk.
3 : //!
4 : //! Usually a delta layer only contains differences, in the form of WAL records
5 : //! against a base LSN. However, if a relation is extended or a whole new relation
6 : //! is created, there would be no base for the new pages. The entries for them
7 : //! must be page images or WAL records with the 'will_init' flag set, so that
8 : //! they can be replayed without referring to an older page version.
9 : //!
10 : //! The delta files are stored in the `timelines/<timeline_id>` directory. Currently,
11 : //! there are no subdirectories, and each delta file is named like this:
12 : //!
13 : //! ```text
14 : //! <key start>-<key end>__<start LSN>-<end LSN>
15 : //! ```
16 : //!
17 : //! For example:
18 : //!
19 : //! ```text
20 : //! 000000067F000032BE0000400000000020B6-000000067F000032BE0000400000000030B6__000000578C6B29-0000000057A50051
21 : //! ```
22 : //!
23 : //! Every delta file consists of three parts: "summary", "values", and
24 : //! "index". The summary is a fixed size header at the beginning of the file,
25 : //! and it contains basic information about the layer, and offsets to the other
26 : //! parts. The "index" is a B-tree, mapping from Key and LSN to an offset in the
27 : //! "values" part. The actual page images and WAL records are stored in the
28 : //! "values" part.
29 : //!
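As a quick illustration of the naming scheme documented above, here is a minimal, hypothetical parser for a delta file name. It assumes only the `<keys>__<lsns>` layout shown in the example; it is not the pageserver's actual `LayerName` parser.

```rust
/// Sketch: split a delta layer file name into its key range and LSN range.
/// Hypothetical helper for illustration, not the real `LayerName` parser.
fn parse_delta_file_name(name: &str) -> Option<(String, String, u64, u64)> {
    let (keys, lsns) = name.split_once("__")?;
    let (key_start, key_end) = keys.split_once('-')?;
    let (lsn_start, lsn_end) = lsns.split_once('-')?;
    // LSNs are hex-encoded u64s (the writer prints them with `{:016X}`).
    let lsn_start = u64::from_str_radix(lsn_start, 16).ok()?;
    let lsn_end = u64::from_str_radix(lsn_end, 16).ok()?;
    Some((key_start.to_owned(), key_end.to_owned(), lsn_start, lsn_end))
}
```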
30 : use std::collections::{HashMap, VecDeque};
31 : use std::fs::File;
32 : use std::io::SeekFrom;
33 : use std::ops::Range;
34 : use std::os::unix::fs::FileExt;
35 : use std::str::FromStr;
36 : use std::sync::Arc;
37 :
38 : use anyhow::{Context, Result, bail, ensure};
39 : use camino::{Utf8Path, Utf8PathBuf};
40 : use futures::StreamExt;
41 : use itertools::Itertools;
42 : use pageserver_api::config::MaxVectoredReadBytes;
43 : use pageserver_api::key::{DBDIR_KEY, KEY_SIZE, Key};
44 : use pageserver_api::keyspace::KeySpace;
45 : use pageserver_api::models::ImageCompressionAlgorithm;
46 : use pageserver_api::shard::TenantShardId;
47 : use pageserver_api::value::Value;
48 : use rand::Rng;
49 : use rand::distributions::Alphanumeric;
50 : use serde::{Deserialize, Serialize};
51 : use tokio::sync::OnceCell;
52 : use tokio_epoll_uring::IoBuf;
53 : use tracing::*;
54 : use utils::bin_ser::BeSer;
55 : use utils::id::{TenantId, TimelineId};
56 : use utils::lsn::Lsn;
57 :
58 : use super::{
59 : AsLayerDesc, LayerName, OnDiskValue, OnDiskValueIo, PersistentLayerDesc, ResidentLayer,
60 : ValuesReconstructState,
61 : };
62 : use crate::config::PageServerConf;
63 : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
64 : use crate::page_cache::{self, FileId, PAGE_SZ};
65 : use crate::tenant::blob_io::BlobWriter;
66 : use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
67 : use crate::tenant::disk_btree::{
68 : DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
69 : };
70 : use crate::tenant::storage_layer::layer::S3_UPLOAD_LIMIT;
71 : use crate::tenant::timeline::GetVectoredError;
72 : use crate::tenant::vectored_blob_io::{
73 : BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
74 : VectoredReadPlanner,
75 : };
76 : use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
77 : use crate::virtual_file::{self, IoBufferMut, MaybeFatalIo, VirtualFile};
78 : use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
79 :
80 : ///
81 : /// Header stored at the beginning of the file.
82 : ///
83 : /// After this comes the 'values' part, starting on block 1. After that,
84 : /// the 'index' starts at the block indicated by 'index_start_blk'.
85 : ///
86 0 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
87 : pub struct Summary {
88 : /// Magic value to identify this as a neon delta file. Always DELTA_FILE_MAGIC.
89 : pub magic: u16,
90 : pub format_version: u16,
91 :
92 : pub tenant_id: TenantId,
93 : pub timeline_id: TimelineId,
94 : pub key_range: Range<Key>,
95 : pub lsn_range: Range<Lsn>,
96 :
97 : /// Block number where the 'index' part of the file begins.
98 : pub index_start_blk: u32,
99 : /// Block within the 'index' where the B-tree root page is stored.
100 : pub index_root_blk: u32,
101 : }
102 :
103 : impl From<&DeltaLayer> for Summary {
104 0 : fn from(layer: &DeltaLayer) -> Self {
105 0 : Self::expected(
106 0 : layer.desc.tenant_shard_id.tenant_id,
107 0 : layer.desc.timeline_id,
108 0 : layer.desc.key_range.clone(),
109 0 : layer.desc.lsn_range.clone(),
110 0 : )
111 0 : }
112 : }
113 :
114 : impl Summary {
115 2188 : pub(super) fn expected(
116 2188 : tenant_id: TenantId,
117 2188 : timeline_id: TimelineId,
118 2188 : keys: Range<Key>,
119 2188 : lsns: Range<Lsn>,
120 2188 : ) -> Self {
121 2188 : Self {
122 2188 : magic: DELTA_FILE_MAGIC,
123 2188 : format_version: STORAGE_FORMAT_VERSION,
124 2188 :
125 2188 : tenant_id,
126 2188 : timeline_id,
127 2188 : key_range: keys,
128 2188 : lsn_range: lsns,
129 2188 :
130 2188 : index_start_blk: 0,
131 2188 : index_root_blk: 0,
132 2188 : }
133 2188 : }
134 : }
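To make the layout concrete: block 0 holds the serialized `Summary`, the 'values' part starts at block 1, and the 'index' begins at the block recorded in `index_start_blk`. A small sketch of the offset arithmetic, assuming the 8 KiB `PAGE_SZ` block size used throughout this file:

```rust
const PAGE_SZ: u64 = 8192; // assumed block size, matching `page_cache::PAGE_SZ`

/// Byte offset where the 'values' part starts: right after the summary block.
fn values_start_offset() -> u64 {
    PAGE_SZ
}

/// Byte offset where the 'index' part starts, given `index_start_blk` from
/// the summary; mirrors `DeltaLayerInner::index_start_offset` below.
fn index_start_offset(index_start_blk: u32) -> u64 {
    index_start_blk as u64 * PAGE_SZ
}
```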
135 :
136 : // Flag indicating that this version initializes the page
137 : const WILL_INIT: u64 = 1;
138 :
139 : /// Struct representing reference to BLOB in layers.
140 : ///
141 : /// The reference contains the BLOB offset, and for WAL records it also contains
142 : /// the `will_init` flag. The flag helps to determine the range of records
143 : /// that needs to be applied, without reading/deserializing the records themselves.
144 0 : #[derive(Debug, Serialize, Deserialize, Copy, Clone)]
145 : pub struct BlobRef(pub u64);
146 :
147 : impl BlobRef {
148 5366709 : pub fn will_init(&self) -> bool {
149 5366709 : (self.0 & WILL_INIT) != 0
150 5366709 : }
151 :
152 9579856 : pub fn pos(&self) -> u64 {
153 9579856 : self.0 >> 1
154 9579856 : }
155 :
156 12942956 : pub fn new(pos: u64, will_init: bool) -> BlobRef {
157 12942956 : let mut blob_ref = pos << 1;
158 12942956 : if will_init {
159 12939836 : blob_ref |= WILL_INIT;
160 12939836 : }
161 12942956 : BlobRef(blob_ref)
162 12942956 : }
163 : }
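`BlobRef` packs the blob offset into the upper 63 bits and the `will_init` flag into the least significant bit, so the two never disturb each other. A small round-trip sketch using the type defined above:

```rust
// Sketch: the flag and the offset are independent of each other.
fn blob_ref_round_trip() {
    let r = BlobRef::new(0x1234, true);
    assert_eq!(r.pos(), 0x1234);
    assert!(r.will_init());
    assert!(!BlobRef::new(0x1234, false).will_init());
}
```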
164 :
165 : pub const DELTA_KEY_SIZE: usize = KEY_SIZE + 8;
166 : struct DeltaKey([u8; DELTA_KEY_SIZE]);
167 :
168 : /// This is the key of the B-tree index stored in the delta layer. It consists
169 : /// of the serialized representation of a Key and LSN.
170 : impl DeltaKey {
171 4128396 : fn from_slice(buf: &[u8]) -> Self {
172 4128396 : let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
173 4128396 : bytes.copy_from_slice(buf);
174 4128396 : DeltaKey(bytes)
175 4128396 : }
176 :
177 13378020 : fn from_key_lsn(key: &Key, lsn: Lsn) -> Self {
178 13378020 : let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
179 13378020 : key.write_to_byte_slice(&mut bytes[0..KEY_SIZE]);
180 13378020 : bytes[KEY_SIZE..].copy_from_slice(&u64::to_be_bytes(lsn.0));
181 13378020 : DeltaKey(bytes)
182 13378020 : }
183 :
184 4128396 : fn key(&self) -> Key {
185 4128396 : Key::from_slice(&self.0)
186 4128396 : }
187 :
188 4128396 : fn lsn(&self) -> Lsn {
189 4128396 : Lsn(u64::from_be_bytes(self.0[KEY_SIZE..].try_into().unwrap()))
190 4128396 : }
191 :
192 5451340 : fn extract_lsn_from_buf(buf: &[u8]) -> Lsn {
193 5451340 : let mut lsn_buf = [0u8; 8];
194 5451340 : lsn_buf.copy_from_slice(&buf[KEY_SIZE..]);
195 5451340 : Lsn(u64::from_be_bytes(lsn_buf))
196 5451340 : }
197 : }
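Because the serialized key bytes come first and the LSN is appended big-endian, byte-wise comparison of two `DeltaKey`s agrees with comparison of the corresponding `(Key, Lsn)` tuples (assuming, as elsewhere in this file, that `Key`'s byte serialization preserves its ordering); this is what lets the B-tree index iterate in key/LSN order. A property-check sketch that could live in this module's tests:

```rust
// Sketch: byte order of serialized DeltaKeys equals (Key, Lsn) tuple order.
fn delta_key_order_matches_tuple_order(a: (Key, Lsn), b: (Key, Lsn)) -> bool {
    let ka = DeltaKey::from_key_lsn(&a.0, a.1);
    let kb = DeltaKey::from_key_lsn(&b.0, b.1);
    (ka.0 < kb.0) == (a < b)
}
```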
198 :
199 : /// This is used only from `pagectl`. Within pageserver, all layers are
200 : /// [`crate::tenant::storage_layer::Layer`], which can hold a [`DeltaLayerInner`].
201 : pub struct DeltaLayer {
202 : path: Utf8PathBuf,
203 : pub desc: PersistentLayerDesc,
204 : inner: OnceCell<Arc<DeltaLayerInner>>,
205 : }
206 :
207 : impl std::fmt::Debug for DeltaLayer {
208 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
209 : use super::RangeDisplayDebug;
210 :
211 0 : f.debug_struct("DeltaLayer")
212 0 : .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
213 0 : .field("lsn_range", &self.desc.lsn_range)
214 0 : .field("file_size", &self.desc.file_size)
215 0 : .field("inner", &self.inner)
216 0 : .finish()
217 0 : }
218 : }
219 :
220 : /// `DeltaLayerInner` is the in-memory data structure associated with an on-disk delta
221 : /// file.
222 : pub struct DeltaLayerInner {
223 : // values copied from summary
224 : index_start_blk: u32,
225 : index_root_blk: u32,
226 :
227 : file: Arc<VirtualFile>,
228 : file_id: FileId,
229 :
230 : layer_key_range: Range<Key>,
231 : layer_lsn_range: Range<Lsn>,
232 :
233 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
234 : }
235 :
236 : impl DeltaLayerInner {
237 0 : pub(crate) fn layer_dbg_info(&self) -> String {
238 0 : format!(
239 0 : "delta {}..{} {}..{}",
240 0 : self.key_range().start,
241 0 : self.key_range().end,
242 0 : self.lsn_range().start,
243 0 : self.lsn_range().end
244 0 : )
245 0 : }
246 : }
247 :
248 : impl std::fmt::Debug for DeltaLayerInner {
249 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
250 0 : f.debug_struct("DeltaLayerInner")
251 0 : .field("index_start_blk", &self.index_start_blk)
252 0 : .field("index_root_blk", &self.index_root_blk)
253 0 : .finish()
254 0 : }
255 : }
256 :
257 : /// Boilerplate to implement the Layer trait; always use `layer_desc` for persistent layers.
258 : impl std::fmt::Display for DeltaLayer {
259 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
260 0 : write!(f, "{}", self.layer_desc().short_id())
261 0 : }
262 : }
263 :
264 : impl AsLayerDesc for DeltaLayer {
265 0 : fn layer_desc(&self) -> &PersistentLayerDesc {
266 0 : &self.desc
267 0 : }
268 : }
269 :
270 : impl DeltaLayer {
271 0 : pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
272 0 : self.desc.dump();
273 0 :
274 0 : if !verbose {
275 0 : return Ok(());
276 0 : }
277 :
278 0 : let inner = self.load(ctx).await?;
279 :
280 0 : inner.dump(ctx).await
281 0 : }
282 :
283 2936 : fn temp_path_for(
284 2936 : conf: &PageServerConf,
285 2936 : tenant_shard_id: &TenantShardId,
286 2936 : timeline_id: &TimelineId,
287 2936 : key_start: Key,
288 2936 : lsn_range: &Range<Lsn>,
289 2936 : ) -> Utf8PathBuf {
290 2936 : let rand_string: String = rand::thread_rng()
291 2936 : .sample_iter(&Alphanumeric)
292 2936 : .take(8)
293 2936 : .map(char::from)
294 2936 : .collect();
295 2936 :
296 2936 : conf.timeline_path(tenant_shard_id, timeline_id)
297 2936 : .join(format!(
298 2936 : "{}-XXX__{:016X}-{:016X}.{}.{}",
299 2936 : key_start,
300 2936 : u64::from(lsn_range.start),
301 2936 : u64::from(lsn_range.end),
302 2936 : rand_string,
303 2936 : TEMP_FILE_SUFFIX,
304 2936 : ))
305 2936 : }
306 :
307 : ///
308 : /// Open the underlying file and read the metadata into memory, if it's
309 : /// not loaded already.
310 : ///
311 0 : async fn load(&self, ctx: &RequestContext) -> Result<&Arc<DeltaLayerInner>> {
312 0 : // Quick exit if already loaded
313 0 : self.inner
314 0 : .get_or_try_init(|| self.load_inner(ctx))
315 0 : .await
316 0 : .with_context(|| format!("Failed to load delta layer {}", self.path()))
317 0 : }
318 :
319 0 : async fn load_inner(&self, ctx: &RequestContext) -> anyhow::Result<Arc<DeltaLayerInner>> {
320 0 : let path = self.path();
321 :
322 0 : let loaded = DeltaLayerInner::load(&path, None, None, ctx).await?;
323 :
324 : // not production code
325 0 : let actual_layer_name = LayerName::from_str(path.file_name().unwrap()).unwrap();
326 0 : let expected_layer_name = self.layer_desc().layer_name();
327 0 :
328 0 : if actual_layer_name != expected_layer_name {
329 0 : println!("warning: filename does not match what is expected from in-file summary");
330 0 : println!("actual: {:?}", actual_layer_name.to_string());
331 0 : println!("expected: {:?}", expected_layer_name.to_string());
332 0 : }
333 :
334 0 : Ok(Arc::new(loaded))
335 0 : }
336 :
337 : /// Create a DeltaLayer struct representing an existing file on disk.
338 : ///
339 : /// This variant is only used for debugging purposes, by the 'pagectl' binary.
340 0 : pub fn new_for_path(path: &Utf8Path, file: File) -> Result<Self> {
341 0 : let mut summary_buf = vec![0; PAGE_SZ];
342 0 : file.read_exact_at(&mut summary_buf, 0)?;
343 0 : let summary = Summary::des_prefix(&summary_buf)?;
344 :
345 0 : let metadata = file
346 0 : .metadata()
347 0 : .context("get file metadata to determine size")?;
348 :
349 : // This function is never used for constructing layers in a running pageserver,
350 : // so it does not need an accurate TenantShardId.
351 0 : let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
352 0 :
353 0 : Ok(DeltaLayer {
354 0 : path: path.to_path_buf(),
355 0 : desc: PersistentLayerDesc::new_delta(
356 0 : tenant_shard_id,
357 0 : summary.timeline_id,
358 0 : summary.key_range,
359 0 : summary.lsn_range,
360 0 : metadata.len(),
361 0 : ),
362 0 : inner: OnceCell::new(),
363 0 : })
364 0 : }
365 :
366 : /// Path to the layer file in pageserver workdir.
367 0 : fn path(&self) -> Utf8PathBuf {
368 0 : self.path.clone()
369 0 : }
370 : }
371 :
372 : /// A builder object for constructing a new delta layer.
373 : ///
374 : /// Usage:
375 : ///
376 : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
377 : ///
378 : /// 2. Write the contents by calling `put_value` for every page
379 : /// version to store in the layer.
380 : ///
381 : /// 3. Call `finish`.
382 : ///
383 : struct DeltaLayerWriterInner {
384 : pub path: Utf8PathBuf,
385 : timeline_id: TimelineId,
386 : tenant_shard_id: TenantShardId,
387 :
388 : key_start: Key,
389 : lsn_range: Range<Lsn>,
390 :
391 : tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>,
392 :
393 : blob_writer: BlobWriter<true>,
394 :
395 : // Number of key-LSN pairs in the layer.
396 : num_keys: usize,
397 : }
398 :
399 : impl DeltaLayerWriterInner {
400 : ///
401 : /// Start building a new delta layer.
402 : ///
403 2936 : async fn new(
404 2936 : conf: &'static PageServerConf,
405 2936 : timeline_id: TimelineId,
406 2936 : tenant_shard_id: TenantShardId,
407 2936 : key_start: Key,
408 2936 : lsn_range: Range<Lsn>,
409 2936 : ctx: &RequestContext,
410 2936 : ) -> anyhow::Result<Self> {
411 2936 : // Create the file initially with a temporary filename. We don't know
412 2936 : // the end key yet, so we cannot form the final filename yet. We will
413 2936 : // rename it when we're done.
414 2936 : //
415 2936 : // Note: This overwrites any existing file. There shouldn't be any.
416 2936 : // FIXME: throw an error instead?
417 2936 : let path =
418 2936 : DeltaLayer::temp_path_for(conf, &tenant_shard_id, &timeline_id, key_start, &lsn_range);
419 :
420 2936 : let mut file = VirtualFile::create(&path, ctx).await?;
421 : // make room for the header block
422 2936 : file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
423 2936 : let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
424 2936 :
425 2936 : // Initialize the b-tree index builder
426 2936 : let block_buf = BlockBuf::new();
427 2936 : let tree_builder = DiskBtreeBuilder::new(block_buf);
428 2936 :
429 2936 : Ok(Self {
430 2936 : path,
431 2936 : timeline_id,
432 2936 : tenant_shard_id,
433 2936 : key_start,
434 2936 : lsn_range,
435 2936 : tree: tree_builder,
436 2936 : blob_writer,
437 2936 : num_keys: 0,
438 2936 : })
439 2936 : }
440 :
441 : ///
442 : /// Append a key-value pair to the file.
443 : ///
444 : /// The values must be appended in key, lsn order.
445 : ///
446 4169428 : async fn put_value(
447 4169428 : &mut self,
448 4169428 : key: Key,
449 4169428 : lsn: Lsn,
450 4169428 : val: Value,
451 4169428 : ctx: &RequestContext,
452 4169428 : ) -> anyhow::Result<()> {
453 4169428 : let (_, res) = self
454 4169428 : .put_value_bytes(
455 4169428 : key,
456 4169428 : lsn,
457 4169428 : Value::ser(&val)?.slice_len(),
458 4169428 : val.will_init(),
459 4169428 : ctx,
460 4169428 : )
461 4169428 : .await;
462 4169428 : res
463 4169428 : }
464 :
465 12942748 : async fn put_value_bytes<Buf>(
466 12942748 : &mut self,
467 12942748 : key: Key,
468 12942748 : lsn: Lsn,
469 12942748 : val: FullSlice<Buf>,
470 12942748 : will_init: bool,
471 12942748 : ctx: &RequestContext,
472 12942748 : ) -> (FullSlice<Buf>, anyhow::Result<()>)
473 12942748 : where
474 12942748 : Buf: IoBuf + Send,
475 12942748 : {
476 12942748 : assert!(
477 12942748 : self.lsn_range.start <= lsn,
478 0 : "lsn_start={}, lsn={}",
479 : self.lsn_range.start,
480 : lsn
481 : );
482 : // We don't want to use compression in delta layer creation
483 12942748 : let compression = ImageCompressionAlgorithm::Disabled;
484 12942748 : let (val, res) = self
485 12942748 : .blob_writer
486 12942748 : .write_blob_maybe_compressed(val, ctx, compression)
487 12942748 : .await;
488 12942748 : let off = match res {
489 12942748 : Ok((off, _)) => off,
490 0 : Err(e) => return (val, Err(anyhow::anyhow!(e))),
491 : };
492 :
493 12942748 : let blob_ref = BlobRef::new(off, will_init);
494 12942748 :
495 12942748 : let delta_key = DeltaKey::from_key_lsn(&key, lsn);
496 12942748 : let res = self.tree.append(&delta_key.0, blob_ref.0);
497 12942748 :
498 12942748 : self.num_keys += 1;
499 12942748 :
500 12942748 : (val, res.map_err(|e| anyhow::anyhow!(e)))
501 12942748 : }
502 :
503 4047944 : fn size(&self) -> u64 {
504 4047944 : self.blob_writer.size() + self.tree.borrow_writer().size()
505 4047944 : }
506 :
507 : ///
508 : /// Finish writing the delta layer.
509 : ///
510 2884 : async fn finish(
511 2884 : self,
512 2884 : key_end: Key,
513 2884 : ctx: &RequestContext,
514 2884 : ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
515 2884 : let temp_path = self.path.clone();
516 2884 : let result = self.finish0(key_end, ctx).await;
517 2884 : if let Err(ref e) = result {
518 0 : tracing::info!(%temp_path, "cleaning up temporary file after error during writing: {e}");
519 0 : if let Err(e) = std::fs::remove_file(&temp_path) {
520 0 : tracing::warn!(error=%e, %temp_path, "error cleaning up temporary layer file after error during writing");
521 0 : }
522 2884 : }
523 2884 : result
524 2884 : }
525 :
526 2884 : async fn finish0(
527 2884 : self,
528 2884 : key_end: Key,
529 2884 : ctx: &RequestContext,
530 2884 : ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
531 2884 : let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
532 :
533 2884 : let mut file = self.blob_writer.into_inner(ctx).await?;
534 :
535 : // Write out the index
536 2884 : let (index_root_blk, block_buf) = self.tree.finish()?;
537 2884 : file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
538 2884 : .await?;
539 30211 : for buf in block_buf.blocks {
540 27327 : let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
541 27327 : res?;
542 : }
543 2884 : assert!(self.lsn_range.start < self.lsn_range.end);
544 : // Fill in the summary on blk 0
545 2884 : let summary = Summary {
546 2884 : magic: DELTA_FILE_MAGIC,
547 2884 : format_version: STORAGE_FORMAT_VERSION,
548 2884 : tenant_id: self.tenant_shard_id.tenant_id,
549 2884 : timeline_id: self.timeline_id,
550 2884 : key_range: self.key_start..key_end,
551 2884 : lsn_range: self.lsn_range.clone(),
552 2884 : index_start_blk,
553 2884 : index_root_blk,
554 2884 : };
555 2884 :
556 2884 : let mut buf = Vec::with_capacity(PAGE_SZ);
557 2884 : // TODO: could use smallvec here but it's a pain with Slice<T>
558 2884 : Summary::ser_into(&summary, &mut buf)?;
559 2884 : file.seek(SeekFrom::Start(0)).await?;
560 2884 : let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
561 2884 : res?;
562 :
563 2884 : let metadata = file
564 2884 : .metadata()
565 2884 : .await
566 2884 : .context("get file metadata to determine size")?;
567 :
568 : // 5GB limit for objects without multipart upload (which we don't want to use)
569 : // Make the limit a little bit lower to account for differing GB units
570 : // https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html
571 2884 : ensure!(
572 2884 : metadata.len() <= S3_UPLOAD_LIMIT,
573 0 : "Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
574 0 : file.path(),
575 0 : metadata.len()
576 : );
577 :
578 : // Note: Because we opened the file in write-only mode, we cannot
579 : // reuse the same VirtualFile for reading later. That's why we don't
580 : // set inner.file here. The first read will have to re-open it.
581 :
582 2884 : let desc = PersistentLayerDesc::new_delta(
583 2884 : self.tenant_shard_id,
584 2884 : self.timeline_id,
585 2884 : self.key_start..key_end,
586 2884 : self.lsn_range.clone(),
587 2884 : metadata.len(),
588 2884 : );
589 2884 :
590 2884 : // fsync the file
591 2884 : file.sync_all()
592 2884 : .await
593 2884 : .maybe_fatal_err("delta_layer sync_all")?;
594 :
595 2884 : trace!("created delta layer {}", self.path);
596 :
597 2884 : Ok((desc, self.path))
598 2884 : }
599 : }
600 :
601 : /// A builder object for constructing a new delta layer.
602 : ///
603 : /// Usage:
604 : ///
605 : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
606 : ///
607 : /// 2. Write the contents by calling `put_value` for every page
608 : /// version to store in the layer.
609 : ///
610 : /// 3. Call `finish`.
611 : ///
612 : /// # Note
613 : ///
614 : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
615 : /// possible for the writer to be dropped before `finish` is actually called. This
616 : /// could leave odd temporary files in the directory, exhausting the file system.
617 : /// This structure wraps `DeltaLayerWriterInner` and also contains a `Drop`
618 : /// implementation that cleans up the temporary file on failure. It's not
619 : /// possible to do this directly in `DeltaLayerWriterInner` since `finish` moves
620 : /// out some fields, making it impossible to implement `Drop`.
621 : ///
622 : #[must_use]
623 : pub struct DeltaLayerWriter {
624 : inner: Option<DeltaLayerWriterInner>,
625 : }
626 :
627 : impl DeltaLayerWriter {
628 : ///
629 : /// Start building a new delta layer.
630 : ///
631 2936 : pub async fn new(
632 2936 : conf: &'static PageServerConf,
633 2936 : timeline_id: TimelineId,
634 2936 : tenant_shard_id: TenantShardId,
635 2936 : key_start: Key,
636 2936 : lsn_range: Range<Lsn>,
637 2936 : ctx: &RequestContext,
638 2936 : ) -> anyhow::Result<Self> {
639 2936 : Ok(Self {
640 2936 : inner: Some(
641 2936 : DeltaLayerWriterInner::new(
642 2936 : conf,
643 2936 : timeline_id,
644 2936 : tenant_shard_id,
645 2936 : key_start,
646 2936 : lsn_range,
647 2936 : ctx,
648 2936 : )
649 2936 : .await?,
650 : ),
651 : })
652 2936 : }
653 :
654 0 : pub fn is_empty(&self) -> bool {
655 0 : self.inner.as_ref().unwrap().num_keys == 0
656 0 : }
657 :
658 : ///
659 : /// Append a key-value pair to the file.
660 : ///
661 : /// The values must be appended in key, lsn order.
662 : ///
663 4169428 : pub async fn put_value(
664 4169428 : &mut self,
665 4169428 : key: Key,
666 4169428 : lsn: Lsn,
667 4169428 : val: Value,
668 4169428 : ctx: &RequestContext,
669 4169428 : ) -> anyhow::Result<()> {
670 4169428 : self.inner
671 4169428 : .as_mut()
672 4169428 : .unwrap()
673 4169428 : .put_value(key, lsn, val, ctx)
674 4169428 : .await
675 4169428 : }
676 :
677 8773320 : pub async fn put_value_bytes<Buf>(
678 8773320 : &mut self,
679 8773320 : key: Key,
680 8773320 : lsn: Lsn,
681 8773320 : val: FullSlice<Buf>,
682 8773320 : will_init: bool,
683 8773320 : ctx: &RequestContext,
684 8773320 : ) -> (FullSlice<Buf>, anyhow::Result<()>)
685 8773320 : where
686 8773320 : Buf: IoBuf + Send,
687 8773320 : {
688 8773320 : self.inner
689 8773320 : .as_mut()
690 8773320 : .unwrap()
691 8773320 : .put_value_bytes(key, lsn, val, will_init, ctx)
692 8773320 : .await
693 8773320 : }
694 :
695 4047944 : pub fn size(&self) -> u64 {
696 4047944 : self.inner.as_ref().unwrap().size()
697 4047944 : }
698 :
699 : ///
700 : /// Finish writing the delta layer.
701 : ///
702 2884 : pub(crate) async fn finish(
703 2884 : mut self,
704 2884 : key_end: Key,
705 2884 : ctx: &RequestContext,
706 2884 : ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
707 2884 : self.inner.take().unwrap().finish(key_end, ctx).await
708 2884 : }
709 :
710 24508 : pub(crate) fn num_keys(&self) -> usize {
711 24508 : self.inner.as_ref().unwrap().num_keys
712 24508 : }
713 :
714 30280 : pub(crate) fn estimated_size(&self) -> u64 {
715 30280 : let inner = self.inner.as_ref().unwrap();
716 30280 : inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64
717 30280 : }
718 : }
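A condensed sketch of the three-step lifecycle from the doc comment above, writing an arbitrary one-entry layer; all arguments (`conf`, `timeline_id`, and so on) are assumed to be supplied by the caller:

```rust
// Sketch: create a writer, append values in (key, lsn) order, then finish.
async fn write_one_entry_layer(
    conf: &'static PageServerConf,
    timeline_id: TimelineId,
    tenant_shard_id: TenantShardId,
    key: Key,
    lsn: Lsn,
    value: Value,
    ctx: &RequestContext,
) -> anyhow::Result<()> {
    // 1. The file starts out under a temporary name.
    let lsn_range = lsn..Lsn(lsn.0 + 1);
    let mut writer =
        DeltaLayerWriter::new(conf, timeline_id, tenant_shard_id, key, lsn_range, ctx).await?;
    // 2. Values must be appended in (key, lsn) order.
    writer.put_value(key, lsn, value, ctx).await?;
    // 3. finish() writes out the index and summary and fsyncs the file.
    let (_desc, _path) = writer.finish(key.next(), ctx).await?;
    Ok(())
}
```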
719 :
720 : impl Drop for DeltaLayerWriter {
721 2936 : fn drop(&mut self) {
722 2936 : if let Some(inner) = self.inner.take() {
723 52 : // We want to remove the virtual file here, so it's fine not to
724 52 : // have completely flushed unwritten data.
725 52 : let vfile = inner.blob_writer.into_inner_no_flush();
726 52 : vfile.remove();
727 2884 : }
728 2936 : }
729 : }
730 :
731 : #[derive(thiserror::Error, Debug)]
732 : pub enum RewriteSummaryError {
733 : #[error("magic mismatch")]
734 : MagicMismatch,
735 : #[error(transparent)]
736 : Other(#[from] anyhow::Error),
737 : }
738 :
739 : impl From<std::io::Error> for RewriteSummaryError {
740 0 : fn from(e: std::io::Error) -> Self {
741 0 : Self::Other(anyhow::anyhow!(e))
742 0 : }
743 : }
744 :
745 : impl DeltaLayer {
746 0 : pub async fn rewrite_summary<F>(
747 0 : path: &Utf8Path,
748 0 : rewrite: F,
749 0 : ctx: &RequestContext,
750 0 : ) -> Result<(), RewriteSummaryError>
751 0 : where
752 0 : F: Fn(Summary) -> Summary,
753 0 : {
754 0 : let mut file = VirtualFile::open_with_options(
755 0 : path,
756 0 : virtual_file::OpenOptions::new().read(true).write(true),
757 0 : ctx,
758 0 : )
759 0 : .await
760 0 : .with_context(|| format!("Failed to open file '{}'", path))?;
761 0 : let file_id = page_cache::next_file_id();
762 0 : let block_reader = FileBlockReader::new(&file, file_id);
763 0 : let summary_blk = block_reader.read_blk(0, ctx).await?;
764 0 : let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
765 0 : if actual_summary.magic != DELTA_FILE_MAGIC {
766 0 : return Err(RewriteSummaryError::MagicMismatch);
767 0 : }
768 0 :
769 0 : let new_summary = rewrite(actual_summary);
770 0 :
771 0 : let mut buf = Vec::with_capacity(PAGE_SZ);
772 0 : // TODO: could use smallvec here, but it's a pain with Slice<T>
773 0 : Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
774 0 : file.seek(SeekFrom::Start(0)).await?;
775 0 : let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
776 0 : res?;
777 0 : Ok(())
778 0 : }
779 : }
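A hypothetical call site for `rewrite_summary`, patching one field and writing the summary block back in place; `path`, `new_timeline_id`, and `ctx` are assumed to be in scope:

```rust
// Sketch: stamp a different timeline_id into the on-disk summary.
DeltaLayer::rewrite_summary(
    path,
    |mut summary| {
        summary.timeline_id = new_timeline_id;
        summary
    },
    ctx,
)
.await?;
```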
780 :
781 : impl DeltaLayerInner {
782 2072 : pub(crate) fn key_range(&self) -> &Range<Key> {
783 2072 : &self.layer_key_range
784 2072 : }
785 :
786 2072 : pub(crate) fn lsn_range(&self) -> &Range<Lsn> {
787 2072 : &self.layer_lsn_range
788 2072 : }
789 :
790 2188 : pub(super) async fn load(
791 2188 : path: &Utf8Path,
792 2188 : summary: Option<Summary>,
793 2188 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
794 2188 : ctx: &RequestContext,
795 2188 : ) -> anyhow::Result<Self> {
796 2188 : let file = Arc::new(
797 2188 : VirtualFile::open_v2(path, ctx)
798 2188 : .await
799 2188 : .context("open layer file")?,
800 : );
801 :
802 2188 : let file_id = page_cache::next_file_id();
803 2188 :
804 2188 : let block_reader = FileBlockReader::new(&file, file_id);
805 :
806 2188 : let summary_blk = block_reader
807 2188 : .read_blk(0, ctx)
808 2188 : .await
809 2188 : .context("read first block")?;
810 :
811 : // TODO: this should be an assertion instead; see ImageLayerInner::load
812 2188 : let actual_summary =
813 2188 : Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
814 :
815 2188 : if let Some(mut expected_summary) = summary {
816 : // production code path
817 2188 : expected_summary.index_start_blk = actual_summary.index_start_blk;
818 2188 : expected_summary.index_root_blk = actual_summary.index_root_blk;
819 2188 : // mask out the timeline_id, but still require the layers to be from the same tenant
820 2188 : expected_summary.timeline_id = actual_summary.timeline_id;
821 2188 :
822 2188 : if actual_summary != expected_summary {
823 0 : bail!(
824 0 : "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
825 0 : actual_summary,
826 0 : expected_summary
827 0 : );
828 2188 : }
829 0 : }
830 :
831 2188 : Ok(DeltaLayerInner {
832 2188 : file,
833 2188 : file_id,
834 2188 : index_start_blk: actual_summary.index_start_blk,
835 2188 : index_root_blk: actual_summary.index_root_blk,
836 2188 : max_vectored_read_bytes,
837 2188 : layer_key_range: actual_summary.key_range,
838 2188 : layer_lsn_range: actual_summary.lsn_range,
839 2188 : })
840 2188 : }
841 :
842 : // Look up the keys in the provided keyspace and update
843 : // the reconstruct state with whatever is found.
844 : //
845 : // Currently, the index is visited for each range, but this
846 : // can be further optimised to visit the index only once.
847 434348 : pub(super) async fn get_values_reconstruct_data(
848 434348 : &self,
849 434348 : this: ResidentLayer,
850 434348 : keyspace: KeySpace,
851 434348 : lsn_range: Range<Lsn>,
852 434348 : reconstruct_state: &mut ValuesReconstructState,
853 434348 : ctx: &RequestContext,
854 434348 : ) -> Result<(), GetVectoredError> {
855 434348 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
856 434348 : let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
857 434348 : self.index_start_blk,
858 434348 : self.index_root_blk,
859 434348 : block_reader,
860 434348 : );
861 434348 :
862 434348 : let planner = VectoredReadPlanner::new(
863 434348 : self.max_vectored_read_bytes
864 434348 : .expect("Layer is loaded with max vectored bytes config")
865 434348 : .0
866 434348 : .into(),
867 434348 : );
868 434348 :
869 434348 : let data_end_offset = self.index_start_offset();
870 :
871 434348 : let reads = Self::plan_reads(
872 434348 : &keyspace,
873 434348 : lsn_range.clone(),
874 434348 : data_end_offset,
875 434348 : index_reader,
876 434348 : planner,
877 434348 : ctx,
878 434348 : )
879 434348 : .await
880 434348 : .map_err(GetVectoredError::Other)?;
881 :
882 434348 : self.do_reads_and_update_state(this, reads, reconstruct_state, ctx)
883 434348 : .await;
884 :
885 434348 : Ok(())
886 434348 : }
887 :
888 434752 : async fn plan_reads<Reader>(
889 434752 : keyspace: &KeySpace,
890 434752 : lsn_range: Range<Lsn>,
891 434752 : data_end_offset: u64,
892 434752 : index_reader: DiskBtreeReader<Reader, DELTA_KEY_SIZE>,
893 434752 : mut planner: VectoredReadPlanner,
894 434752 : ctx: &RequestContext,
895 434752 : ) -> anyhow::Result<Vec<VectoredRead>>
896 434752 : where
897 434752 : Reader: BlockReader + Clone,
898 434752 : {
899 434752 : let ctx = RequestContextBuilder::extend(ctx)
900 434752 : .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
901 434752 : .build();
902 :
903 435204 : for range in keyspace.ranges.iter() {
904 435204 : let mut range_end_handled = false;
905 435204 :
906 435204 : let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start);
907 435204 : let index_stream = index_reader.clone().into_stream(&start_key.0, &ctx);
908 435204 : let mut index_stream = std::pin::pin!(index_stream);
909 :
910 1269999 : while let Some(index_entry) = index_stream.next().await {
911 1253188 : let (raw_key, value) = index_entry?;
912 1253188 : let key = Key::from_slice(&raw_key[..KEY_SIZE]);
913 1253188 : let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
914 1253188 : let blob_ref = BlobRef(value);
915 1253188 :
916 1253188 : // Lsns are not monotonically increasing across keys, so we don't assert on them.
917 1253188 : assert!(key >= range.start);
918 :
919 1253188 : let outside_lsn_range = !lsn_range.contains(&lsn);
920 :
921 1253188 : let flag = {
922 1253188 : if outside_lsn_range {
923 84631 : BlobFlag::Ignore
924 1168557 : } else if blob_ref.will_init() {
925 1051305 : BlobFlag::ReplaceAll
926 : } else {
927 : // Usual path: add blob to the read
928 117252 : BlobFlag::None
929 : }
930 : };
931 :
932 1253188 : if key >= range.end || (key.next() == range.end && lsn >= lsn_range.end) {
933 418393 : planner.handle_range_end(blob_ref.pos());
934 418393 : range_end_handled = true;
935 418393 : break;
936 834795 : } else {
937 834795 : planner.handle(key, lsn, blob_ref.pos(), flag);
938 834795 : }
939 : }
940 :
941 435204 : if !range_end_handled {
942 16811 : tracing::debug!("Handling range end fallback at {}", data_end_offset);
943 16811 : planner.handle_range_end(data_end_offset);
944 418393 : }
945 : }
946 :
947 434752 : Ok(planner.finish())
948 434752 : }
949 :
950 434748 : fn get_min_read_buffer_size(
951 434748 : planned_reads: &[VectoredRead],
952 434748 : read_size_soft_max: usize,
953 434748 : ) -> usize {
954 434748 : let Some(largest_read) = planned_reads.iter().max_by_key(|read| read.size()) else {
955 173148 : return read_size_soft_max;
956 : };
957 :
958 261600 : let largest_read_size = largest_read.size();
959 261600 : if largest_read_size > read_size_soft_max {
960 : // If the read is oversized, it should only contain one key.
961 400 : let offenders = largest_read
962 400 : .blobs_at
963 400 : .as_slice()
964 400 : .iter()
965 400 : .filter_map(|(_, blob_meta)| {
966 400 : if blob_meta.key.is_rel_dir_key()
967 400 : || blob_meta.key == DBDIR_KEY
968 400 : || blob_meta.key.is_aux_file_key()
969 : {
970 : // The size of values for these keys is unbounded and can
971 : // grow very large in pathological cases.
972 0 : None
973 : } else {
974 400 : Some(format!("{}@{}", blob_meta.key, blob_meta.lsn))
975 : }
976 400 : })
977 400 : .join(", ");
978 400 :
979 400 : if !offenders.is_empty() {
980 400 : tracing::warn!(
981 0 : "Oversized vectored read ({} > {}) for keys {}",
982 : largest_read_size,
983 : read_size_soft_max,
984 : offenders
985 : );
986 0 : }
987 261200 : }
988 :
989 261600 : largest_read_size
990 434748 : }
991 :
992 434348 : async fn do_reads_and_update_state(
993 434348 : &self,
994 434348 : this: ResidentLayer,
995 434348 : reads: Vec<VectoredRead>,
996 434348 : reconstruct_state: &mut ValuesReconstructState,
997 434348 : ctx: &RequestContext,
998 434348 : ) {
999 434348 : let max_vectored_read_bytes = self
1000 434348 : .max_vectored_read_bytes
1001 434348 : .expect("Layer is loaded with max vectored bytes config")
1002 434348 : .0
1003 434348 : .into();
1004 434348 : let buf_size = Self::get_min_read_buffer_size(&reads, max_vectored_read_bytes);
1005 :
1006 : // Note that reads are processed in reverse order (from highest key+lsn).
1007 : // This is the order that `ReconstructState` requires such that it can
1008 : // track when a key is done.
1009 434348 : for read in reads.into_iter().rev() {
1010 267206 : let mut ios: HashMap<(Key, Lsn), OnDiskValueIo> = Default::default();
1011 374767 : for (_, blob_meta) in read.blobs_at.as_slice().iter().rev() {
1012 374767 : let io = reconstruct_state.update_key(
1013 374767 : &blob_meta.key,
1014 374767 : blob_meta.lsn,
1015 374767 : blob_meta.will_init,
1016 374767 : );
1017 374767 : ios.insert((blob_meta.key, blob_meta.lsn), io);
1018 374767 : }
1019 :
1020 267206 : let read_extend_residency = this.clone();
1021 267206 : let read_from = self.file.clone();
1022 267206 : let read_ctx = ctx.attached_child();
1023 267206 : reconstruct_state
1024 267206 : .spawn_io(async move {
1025 267206 : let vectored_blob_reader = VectoredBlobReader::new(&read_from);
1026 267206 : let buf = IoBufferMut::with_capacity(buf_size);
1027 :
1028 267206 : let res = vectored_blob_reader.read_blobs(&read, buf, &read_ctx).await;
1029 267206 : match res {
1030 267206 : Ok(blobs_buf) => {
1031 267206 : let view = BufView::new_slice(&blobs_buf.buf);
1032 374767 : for meta in blobs_buf.blobs.iter().rev() {
1033 374767 : let io = ios.remove(&(meta.meta.key, meta.meta.lsn)).unwrap();
1034 :
1035 374767 : let blob_read = meta.read(&view).await;
1036 374767 : let blob_read = match blob_read {
1037 374767 : Ok(buf) => buf,
1038 0 : Err(e) => {
1039 0 : io.complete(Err(e));
1040 0 : continue;
1041 : }
1042 : };
1043 :
1044 374767 : io.complete(Ok(OnDiskValue::WalRecordOrImage(
1045 374767 : blob_read.into_bytes(),
1046 374767 : )));
1047 : }
1048 :
1049 267206 : assert!(ios.is_empty());
1050 : }
1051 0 : Err(err) => {
1052 0 : for (_, sender) in ios {
1053 0 : sender.complete(Err(std::io::Error::new(
1054 0 : err.kind(),
1055 0 : "vec read failed",
1056 0 : )));
1057 0 : }
1058 : }
1059 : }
1060 :
1061 : // keep layer resident until this IO is done; this spawned IO future generally outlives the
1062 : // call to `self` / the `Arc<DownloadedLayer>` / the `ResidentLayer` that guarantees residency
1063 267206 : drop(read_extend_residency);
1064 267206 : })
1065 267206 : .await;
1066 : }
1067 434348 : }
1068 :
1069 812 : pub(crate) async fn index_entries<'a>(
1070 812 : &'a self,
1071 812 : ctx: &RequestContext,
1072 812 : ) -> Result<Vec<DeltaEntry<'a>>> {
1073 812 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
1074 812 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1075 812 : self.index_start_blk,
1076 812 : self.index_root_blk,
1077 812 : block_reader,
1078 812 : );
1079 812 :
1080 812 : let mut all_keys: Vec<DeltaEntry<'_>> = Vec::new();
1081 812 :
1082 812 : tree_reader
1083 812 : .visit(
1084 812 : &[0u8; DELTA_KEY_SIZE],
1085 812 : VisitDirection::Forwards,
1086 4128092 : |key, value| {
1087 4128092 : let delta_key = DeltaKey::from_slice(key);
1088 4128092 : let val_ref = ValueRef {
1089 4128092 : blob_ref: BlobRef(value),
1090 4128092 : layer: self,
1091 4128092 : };
1092 4128092 : let pos = BlobRef(value).pos();
1093 4128092 : if let Some(last) = all_keys.last_mut() {
1094 4127280 : // subtract the offsets of the current and previous entries to get the
1095 4127280 : // size of the value associated with the previous (key, lsn) tuple
1096 4127280 : let first_pos = last.size;
1097 4127280 : last.size = pos - first_pos;
1098 4127280 : }
1099 4128092 : let entry = DeltaEntry {
1100 4128092 : key: delta_key.key(),
1101 4128092 : lsn: delta_key.lsn(),
1102 4128092 : size: pos,
1103 4128092 : val: val_ref,
1104 4128092 : };
1105 4128092 : all_keys.push(entry);
1106 4128092 : true
1107 4128092 : },
1108 812 : &RequestContextBuilder::extend(ctx)
1109 812 : .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
1110 812 : .build(),
1111 812 : )
1112 812 : .await?;
1113 812 : if let Some(last) = all_keys.last_mut() {
1114 812 : // The last key occupies all space up to the end of the value storage,
1115 812 : // which corresponds to the beginning of the index.
1116 812 : last.size = self.index_start_offset() - last.size;
1117 812 : }
1118 812 : Ok(all_keys)
1119 812 : }
1120 :
1121 : /// Using the given writer, write out a version which has only the LSNs earlier than `until`.
1122 : ///
1123 : /// Returns the number of key-value records pushed to the writer.
1124 20 : pub(super) async fn copy_prefix(
1125 20 : &self,
1126 20 : writer: &mut DeltaLayerWriter,
1127 20 : until: Lsn,
1128 20 : ctx: &RequestContext,
1129 20 : ) -> anyhow::Result<usize> {
1130 : use futures::stream::TryStreamExt;
1131 :
1132 : use crate::tenant::vectored_blob_io::{
1133 : BlobMeta, ChunkedVectoredReadBuilder, VectoredReadExtended,
1134 : };
1135 :
1136 : #[derive(Debug)]
1137 : enum Item {
1138 : Actual(Key, Lsn, BlobRef),
1139 : Sentinel,
1140 : }
1141 :
1142 : impl From<Item> for Option<(Key, Lsn, BlobRef)> {
1143 140 : fn from(value: Item) -> Self {
1144 140 : match value {
1145 120 : Item::Actual(key, lsn, blob) => Some((key, lsn, blob)),
1146 20 : Item::Sentinel => None,
1147 : }
1148 140 : }
1149 : }
1150 :
1151 : impl Item {
1152 140 : fn offset(&self) -> Option<BlobRef> {
1153 140 : match self {
1154 120 : Item::Actual(_, _, blob) => Some(*blob),
1155 20 : Item::Sentinel => None,
1156 : }
1157 140 : }
1158 :
1159 140 : fn is_last(&self) -> bool {
1160 140 : matches!(self, Item::Sentinel)
1161 140 : }
1162 : }
1163 :
1164 20 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
1165 20 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1166 20 : self.index_start_blk,
1167 20 : self.index_root_blk,
1168 20 : block_reader,
1169 20 : );
1170 20 :
1171 20 : let stream = self.stream_index_forwards(tree_reader, &[0u8; DELTA_KEY_SIZE], ctx);
1172 120 : let stream = stream.map_ok(|(key, lsn, pos)| Item::Actual(key, lsn, pos));
1173 20 : // Put in a sentinel value so we can get the end offset of the last item
1174 20 : // without having to repeat the whole read part.
1175 20 : let stream = stream.chain(futures::stream::once(futures::future::ready(Ok(
1176 20 : Item::Sentinel,
1177 20 : ))));
1178 20 : let mut stream = std::pin::pin!(stream);
1179 20 :
1180 20 : let mut prev: Option<(Key, Lsn, BlobRef)> = None;
1181 20 :
1182 20 : let mut read_builder: Option<ChunkedVectoredReadBuilder> = None;
1183 20 :
1184 20 : let max_read_size = self
1185 20 : .max_vectored_read_bytes
1186 20 : .map(|x| x.0.get())
1187 20 : .unwrap_or(8192);
1188 20 :
1189 20 : let mut buffer = Some(IoBufferMut::with_capacity(max_read_size));
1190 20 :
1191 20 : // FIXME: buffering of DeltaLayerWriter
1192 20 : let mut per_blob_copy = Vec::new();
1193 20 :
1194 20 : let mut records = 0;
1195 :
1196 160 : while let Some(item) = stream.try_next().await? {
1197 140 : tracing::debug!(?item, "popped");
1198 140 : let offset = item
1199 140 : .offset()
1200 140 : .unwrap_or(BlobRef::new(self.index_start_offset(), false));
1201 :
1202 140 : let actionable = if let Some((key, lsn, start_offset)) = prev.take() {
1203 120 : let end_offset = offset;
1204 120 :
1205 120 : Some((
1206 120 : BlobMeta {
1207 120 : key,
1208 120 : lsn,
1209 120 : will_init: false,
1210 120 : },
1211 120 : start_offset..end_offset,
1212 120 : ))
1213 : } else {
1214 20 : None
1215 : };
1216 :
1217 140 : let is_last = item.is_last();
1218 140 :
1219 140 : prev = Option::from(item);
1220 140 :
1221 140 : let actionable = actionable.filter(|x| x.0.lsn < until);
1222 :
1223 140 : let builder = if let Some((meta, offsets)) = actionable {
1224 : // extend or create a new builder
1225 64 : if read_builder
1226 64 : .as_mut()
1227 64 : .map(|x| x.extend(offsets.start.pos(), offsets.end.pos(), meta))
1228 64 : .unwrap_or(VectoredReadExtended::No)
1229 64 : == VectoredReadExtended::Yes
1230 : {
1231 32 : None
1232 : } else {
1233 32 : read_builder.replace(ChunkedVectoredReadBuilder::new(
1234 32 : offsets.start.pos(),
1235 32 : offsets.end.pos(),
1236 32 : meta,
1237 32 : max_read_size,
1238 32 : ))
1239 : }
1240 : } else {
1241 : // nothing to do, except perhaps flush any existing builder for the last element
1242 76 : None
1243 : };
1244 :
1245 : // flush the possible older builder and also the new one if the item was the last one
1246 140 : let builders = builder.into_iter();
1247 140 : let builders = if is_last {
1248 20 : builders.chain(read_builder.take())
1249 : } else {
1250 120 : builders.chain(None)
1251 : };
1252 :
1253 172 : for builder in builders {
1254 32 : let read = builder.build();
1255 32 :
1256 32 : let reader = VectoredBlobReader::new(&self.file);
1257 32 :
1258 32 : let mut buf = buffer.take().unwrap();
1259 32 :
1260 32 : buf.clear();
1261 32 : buf.reserve(read.size());
1262 32 : let res = reader.read_blobs(&read, buf, ctx).await?;
1263 :
1264 32 : let view = BufView::new_slice(&res.buf);
1265 :
1266 96 : for blob in res.blobs {
1267 64 : let key = blob.meta.key;
1268 64 : let lsn = blob.meta.lsn;
1269 :
1270 64 : let data = blob.read(&view).await?;
1271 :
1272 : #[cfg(debug_assertions)]
1273 64 : Value::des(&data)
1274 64 : .with_context(|| {
1275 0 : format!(
1276 0 : "blob failed to deserialize for {}: {:?}",
1277 0 : blob,
1278 0 : utils::Hex(&data)
1279 0 : )
1280 64 : })
1281 64 : .unwrap();
1282 64 :
1283 64 : // is it an image or will_init walrecord?
1284 64 : // FIXME: this could be handled by threading the BlobRef to the
1285 64 : // VectoredReadBuilder
1286 64 : let will_init = pageserver_api::value::ValueBytes::will_init(&data)
1287 64 : .inspect_err(|_e| {
1288 0 : #[cfg(feature = "testing")]
1289 0 : tracing::error!(data=?utils::Hex(&data), err=?_e, %key, %lsn, "failed to parse will_init out of serialized value");
1290 64 : })
1291 64 : .unwrap_or(false);
1292 64 :
1293 64 : per_blob_copy.clear();
1294 64 : per_blob_copy.extend_from_slice(&data);
1295 :
1296 64 : let (tmp, res) = writer
1297 64 : .put_value_bytes(
1298 64 : key,
1299 64 : lsn,
1300 64 : std::mem::take(&mut per_blob_copy).slice_len(),
1301 64 : will_init,
1302 64 : ctx,
1303 64 : )
1304 64 : .await;
1305 64 : per_blob_copy = tmp.into_raw_slice().into_inner();
1306 64 :
1307 64 : res?;
1308 :
1309 64 : records += 1;
1310 : }
1311 :
1312 32 : buffer = Some(res.buf);
1313 : }
1314 : }
1315 :
1316 20 : assert!(
1317 20 : read_builder.is_none(),
1318 0 : "with the sentinel above loop should had handled all"
1319 : );
1320 :
1321 20 : Ok(records)
1322 20 : }
1323 :
1324 8 : pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
1325 8 : println!(
1326 8 : "index_start_blk: {}, root {}",
1327 8 : self.index_start_blk, self.index_root_blk
1328 8 : );
1329 8 :
1330 8 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
1331 8 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1332 8 : self.index_start_blk,
1333 8 : self.index_root_blk,
1334 8 : block_reader,
1335 8 : );
1336 8 :
1337 8 : tree_reader.dump(ctx).await?;
1338 :
1339 8 : let keys = self.index_entries(ctx).await?;
1340 :
1341 16 : async fn dump_blob(val: &ValueRef<'_>, ctx: &RequestContext) -> anyhow::Result<String> {
1342 16 : let buf = val.load_raw(ctx).await?;
1343 16 : let val = Value::des(&buf)?;
1344 16 : let desc = match val {
1345 16 : Value::Image(img) => {
1346 16 : format!(" img {} bytes", img.len())
1347 : }
1348 0 : Value::WalRecord(rec) => {
1349 0 : let wal_desc = pageserver_api::record::describe_wal_record(&rec)?;
1350 0 : format!(
1351 0 : " rec {} bytes will_init: {} {}",
1352 0 : buf.len(),
1353 0 : rec.will_init(),
1354 0 : wal_desc
1355 0 : )
1356 : }
1357 : };
1358 16 : Ok(desc)
1359 16 : }
1360 :
1361 24 : for entry in keys {
1362 16 : let DeltaEntry { key, lsn, val, .. } = entry;
1363 16 : let desc = match dump_blob(&val, ctx).await {
1364 16 : Ok(desc) => desc,
1365 0 : Err(err) => {
1366 0 : format!("ERROR: {err}")
1367 : }
1368 : };
1369 16 : println!(" key {key} at {lsn}: {desc}");
1370 :
1371 : // Print more details about CHECKPOINT records. Would be nice to print details
1372 : // of many other record types too, but these are particularly interesting, as
1373 : // there is a lot of special processing for them in walingest.rs.
1374 8 : use pageserver_api::key::CHECKPOINT_KEY;
1375 8 : use postgres_ffi::CheckPoint;
1376 16 : if key == CHECKPOINT_KEY {
1377 0 : let val = val.load(ctx).await?;
1378 0 : match val {
1379 0 : Value::Image(img) => {
1380 0 : let checkpoint = CheckPoint::decode(&img)?;
1381 0 : println!(" CHECKPOINT: {:?}", checkpoint);
1382 : }
1383 0 : Value::WalRecord(_rec) => {
1384 0 : println!(" unexpected walrecord value for checkpoint key");
1385 0 : }
1386 : }
1387 16 : }
1388 : }
1389 :
1390 8 : Ok(())
1391 8 : }
1392 :
1393 60 : fn stream_index_forwards<'a, R>(
1394 60 : &'a self,
1395 60 : reader: DiskBtreeReader<R, DELTA_KEY_SIZE>,
1396 60 : start: &'a [u8; DELTA_KEY_SIZE],
1397 60 : ctx: &'a RequestContext,
1398 60 : ) -> impl futures::stream::Stream<
1399 60 : Item = Result<(Key, Lsn, BlobRef), crate::tenant::disk_btree::DiskBtreeError>,
1400 60 : > + 'a
1401 60 : where
1402 60 : R: BlockReader + 'a,
1403 60 : {
1404 : use futures::stream::TryStreamExt;
1405 60 : let stream = reader.into_stream(start, ctx);
1406 304 : stream.map_ok(|(key, value)| {
1407 304 : let key = DeltaKey::from_slice(&key);
1408 304 : let (key, lsn) = (key.key(), key.lsn());
1409 304 : let offset = BlobRef(value);
1410 304 :
1411 304 : (key, lsn, offset)
1412 304 : })
1413 60 : }
1414 :
1415 : /// The file offset to the first block of index.
1416 : ///
1417 : /// The file structure is summary, values, and index. We often need this for the size of the last blob.
1418 436392 : fn index_start_offset(&self) -> u64 {
1419 436392 : let offset = self.index_start_blk as u64 * PAGE_SZ as u64;
1420 436392 : let bref = BlobRef(offset);
1421 436392 : tracing::debug!(
1422 : index_start_blk = self.index_start_blk,
1423 : offset,
1424 0 : pos = bref.pos(),
1425 0 : "index_start_offset"
1426 : );
1427 436392 : offset
1428 436392 : }
1429 :
1430 1148 : pub fn iter<'a>(&'a self, ctx: &'a RequestContext) -> DeltaLayerIterator<'a> {
1431 1148 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
1432 1148 : let tree_reader =
1433 1148 : DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
1434 1148 : DeltaLayerIterator {
1435 1148 : delta_layer: self,
1436 1148 : ctx,
1437 1148 : index_iter: tree_reader.iter(&[0; DELTA_KEY_SIZE], ctx),
1438 1148 : key_values_batch: std::collections::VecDeque::new(),
1439 1148 : is_end: false,
1440 1148 : planner: StreamingVectoredReadPlanner::new(
1441 1148 : 1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
1442 1148 : 1024, // The default value. Unit tests might use a different value
1443 1148 : ),
1444 1148 : }
1445 1148 : }
1446 :
1447 : /// NB: not super efficient, but not terrible either. Should probably be an iterator.
1448 : //
1449 : // We're reusing the index traversal logic in plan_reads; would be nice to
1450 : // factor that out.
1451 0 : pub(crate) async fn load_keys(&self, ctx: &RequestContext) -> anyhow::Result<Vec<Key>> {
1452 0 : self.index_entries(ctx)
1453 0 : .await
1454 0 : .map(|entries| entries.into_iter().map(|entry| entry.key).collect())
1455 0 : }
1456 : }
1457 :
1458 : /// A set of data associated with a delta layer key and its value
1459 : pub struct DeltaEntry<'a> {
1460 : pub key: Key,
1461 : pub lsn: Lsn,
1462 : /// Size of the stored value
1463 : pub size: u64,
1464 : /// Reference to the on-disk value
1465 : pub val: ValueRef<'a>,
1466 : }
1467 :
1468 : /// Reference to an on-disk value
1469 : pub struct ValueRef<'a> {
1470 : blob_ref: BlobRef,
1471 : layer: &'a DeltaLayerInner,
1472 : }
1473 :
1474 : impl ValueRef<'_> {
1475 : /// Loads the value from disk
1476 0 : pub async fn load(&self, ctx: &RequestContext) -> Result<Value> {
1477 0 : let buf = self.load_raw(ctx).await?;
1478 0 : let val = Value::des(&buf)?;
1479 0 : Ok(val)
1480 0 : }
1481 :
1482 16 : async fn load_raw(&self, ctx: &RequestContext) -> Result<Vec<u8>> {
1483 16 : let reader = BlockCursor::new(crate::tenant::block_io::BlockReaderRef::Adapter(Adapter(
1484 16 : self.layer,
1485 16 : )));
1486 16 : let buf = reader.read_blob(self.blob_ref.pos(), ctx).await?;
1487 16 : Ok(buf)
1488 16 : }
1489 : }
1490 :
1491 : pub(crate) struct Adapter<T>(T);
1492 :
1493 : impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
1494 16 : pub(crate) async fn read_blk(
1495 16 : &self,
1496 16 : blknum: u32,
1497 16 : ctx: &RequestContext,
1498 16 : ) -> Result<BlockLease, std::io::Error> {
1499 16 : let block_reader = FileBlockReader::new(&self.0.as_ref().file, self.0.as_ref().file_id);
1500 16 : block_reader.read_blk(blknum, ctx).await
1501 16 : }
1502 : }
1503 :
1504 : impl AsRef<DeltaLayerInner> for DeltaLayerInner {
1505 32 : fn as_ref(&self) -> &DeltaLayerInner {
1506 32 : self
1507 32 : }
1508 : }
1509 :
1510 : impl<'a> pageserver_compaction::interface::CompactionDeltaEntry<'a, Key> for DeltaEntry<'a> {
1511 0 : fn key(&self) -> Key {
1512 0 : self.key
1513 0 : }
1514 0 : fn lsn(&self) -> Lsn {
1515 0 : self.lsn
1516 0 : }
1517 0 : fn size(&self) -> u64 {
1518 0 : self.size
1519 0 : }
1520 : }
1521 :
1522 : pub struct DeltaLayerIterator<'a> {
1523 : delta_layer: &'a DeltaLayerInner,
1524 : ctx: &'a RequestContext,
1525 : planner: StreamingVectoredReadPlanner,
1526 : index_iter: DiskBtreeIterator<'a>,
1527 : key_values_batch: VecDeque<(Key, Lsn, Value)>,
1528 : is_end: bool,
1529 : }
1530 :
1531 : impl DeltaLayerIterator<'_> {
1532 0 : pub(crate) fn layer_dbg_info(&self) -> String {
1533 0 : self.delta_layer.layer_dbg_info()
1534 0 : }
1535 :
1536 : /// Retrieve a batch of key-value pairs into the iterator buffer.
1537 42808 : async fn next_batch(&mut self) -> anyhow::Result<()> {
1538 42808 : assert!(self.key_values_batch.is_empty());
1539 42808 : assert!(!self.is_end);
1540 :
1541 42808 : let plan = loop {
1542 4199244 : if let Some(res) = self.index_iter.next().await {
1543 4198152 : let (raw_key, value) = res?;
1544 4198152 : let key = Key::from_slice(&raw_key[..KEY_SIZE]);
1545 4198152 : let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
1546 4198152 : let blob_ref = BlobRef(value);
1547 4198152 : let offset = blob_ref.pos();
1548 41716 : if let Some(batch_plan) =
1549 4198152 : self.planner.handle(key, lsn, offset, blob_ref.will_init())
1550 : {
1551 41716 : break batch_plan;
1552 4156436 : }
1553 : } else {
1554 1092 : self.is_end = true;
1555 1092 : let data_end_offset = self.delta_layer.index_start_offset();
1556 1092 : if let Some(item) = self.planner.handle_range_end(data_end_offset) {
1557 1092 : break item;
1558 : } else {
1559 0 : return Ok(()); // TODO: test empty iterator
1560 : }
1561 : }
1562 : };
1563 42808 : let vectored_blob_reader = VectoredBlobReader::new(&self.delta_layer.file);
1564 42808 : let mut next_batch = std::collections::VecDeque::new();
1565 42808 : let buf_size = plan.size();
1566 42808 : let buf = IoBufferMut::with_capacity(buf_size);
1567 42808 : let blobs_buf = vectored_blob_reader
1568 42808 : .read_blobs(&plan, buf, self.ctx)
1569 42808 : .await?;
1570 42808 : let view = BufView::new_slice(&blobs_buf.buf);
1571 4198096 : for meta in blobs_buf.blobs.iter() {
1572 4198096 : let blob_read = meta.read(&view).await?;
1573 4198096 : let value = Value::des(&blob_read)?;
1574 :
1575 4198096 : next_batch.push_back((meta.meta.key, meta.meta.lsn, value));
1576 : }
1577 42808 : self.key_values_batch = next_batch;
1578 42808 : Ok(())
1579 42808 : }
1580 :
1581 4199668 : pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
1582 4199668 : if self.key_values_batch.is_empty() {
1583 44760 : if self.is_end {
1584 2120 : return Ok(None);
1585 42640 : }
1586 42640 : self.next_batch().await?;
1587 4154908 : }
1588 4197548 : Ok(Some(
1589 4197548 : self.key_values_batch
1590 4197548 : .pop_front()
1591 4197548 : .expect("should not be empty"),
1592 4197548 : ))
1593 4199668 : }
1594 : }
1595 :
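A brief sketch of driving `DeltaLayerIterator`: `next` refills its internal batch transparently, so callers simply loop until `None`. Assumes a loaded `DeltaLayerInner` and a `RequestContext` in scope:

```rust
// Sketch: iterate all (key, lsn, value) entries and count them.
async fn count_entries(inner: &DeltaLayerInner, ctx: &RequestContext) -> anyhow::Result<usize> {
    let mut iter = inner.iter(ctx);
    let mut n = 0;
    while let Some((_key, _lsn, _value)) = iter.next().await? {
        n += 1;
    }
    Ok(n)
}
```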
1596 : #[cfg(test)]
1597 : pub(crate) mod test {
1598 : use std::collections::BTreeMap;
1599 :
1600 : use bytes::Bytes;
1601 : use itertools::MinMaxResult;
1602 : use pageserver_api::value::Value;
1603 : use rand::RngCore;
1604 : use rand::prelude::{SeedableRng, SliceRandom, StdRng};
1605 :
1606 : use super::*;
1607 : use crate::DEFAULT_PG_VERSION;
1608 : use crate::context::DownloadBehavior;
1609 : use crate::task_mgr::TaskKind;
1610 : use crate::tenant::disk_btree::tests::TestDisk;
1611 : use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
1612 : use crate::tenant::storage_layer::{Layer, ResidentLayer};
1613 : use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner;
1614 : use crate::tenant::{Tenant, Timeline};
1615 :
1616 : /// Construct an index for a fictional delta layer and and then
1617 : /// traverse in order to plan vectored reads for a query. Finally,
1618 : /// verify that the traversal fed the right index key and value
1619 : /// pairs into the planner.
1620 : #[tokio::test]
1621 4 : async fn test_delta_layer_index_traversal() {
1622 4 : let base_key = Key {
1623 4 : field1: 0,
1624 4 : field2: 1663,
1625 4 : field3: 12972,
1626 4 : field4: 16396,
1627 4 : field5: 0,
1628 4 : field6: 246080,
1629 4 : };
1630 4 :
1631 4 : // Populate the index with some entries
1632 4 : let entries: BTreeMap<Key, Vec<Lsn>> = BTreeMap::from([
1633 4 : (base_key, vec![Lsn(1), Lsn(5), Lsn(25), Lsn(26), Lsn(28)]),
1634 4 : (base_key.add(1), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
1635 4 : (base_key.add(2), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
1636 4 : (base_key.add(5), vec![Lsn(10), Lsn(15), Lsn(16), Lsn(20)]),
1637 4 : ]);
1638 4 :
1639 4 : let mut disk = TestDisk::default();
1640 4 : let mut writer = DiskBtreeBuilder::<_, DELTA_KEY_SIZE>::new(&mut disk);
1641 4 :
1642 4 : let mut disk_offset = 0;
1643 20 : for (key, lsns) in &entries {
1644 84 : for lsn in lsns {
1645 68 : let index_key = DeltaKey::from_key_lsn(key, *lsn);
1646 68 : let blob_ref = BlobRef::new(disk_offset, false);
1647 68 : writer
1648 68 : .append(&index_key.0, blob_ref.0)
1649 68 : .expect("In memory disk append should never fail");
1650 68 :
1651 68 : disk_offset += 1;
1652 68 : }
1653 4 : }
1654 4 :
1655 4 : // Prepare all the arguments for the call into `plan_reads` below
1656 4 : let (root_offset, _writer) = writer
1657 4 : .finish()
1658 4 : .expect("In memory disk finish should never fail");
1659 4 : let reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(0, root_offset, disk);
1660 4 : let planner = VectoredReadPlanner::new(100);
1661 4 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
1662 4 :
1663 4 : let keyspace = KeySpace {
1664 4 : ranges: vec![
1665 4 : base_key..base_key.add(3),
1666 4 : base_key.add(3)..base_key.add(100),
1667 4 : ],
1668 4 : };
1669 4 : let lsn_range = Lsn(2)..Lsn(40);
1670 4 :
1671 4 : // Plan and validate
1672 4 : let vectored_reads = DeltaLayerInner::plan_reads(
1673 4 : &keyspace,
1674 4 : lsn_range.clone(),
1675 4 : disk_offset,
1676 4 : reader,
1677 4 : planner,
1678 4 : &ctx,
1679 4 : )
1680 4 : .await
1681 4 : .expect("Read planning should not fail");
1682 4 :
1683 4 : validate(keyspace, lsn_range, vectored_reads, entries);
1684 4 : }
1685 :
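     : /// Check the planner's output against ground truth: recompute the expected
     : /// (key, lsn, offset) triples directly from the index entries, keep only those
     : /// inside the queried keyspace and LSN range, and compare with the planned blobs.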
1686 4 : fn validate(
1687 4 : keyspace: KeySpace,
1688 4 : lsn_range: Range<Lsn>,
1689 4 : vectored_reads: Vec<VectoredRead>,
1690 4 : index_entries: BTreeMap<Key, Vec<Lsn>>,
1691 4 : ) {
1692 : #[derive(Debug, PartialEq, Eq)]
1693 : struct BlobSpec {
1694 : key: Key,
1695 : lsn: Lsn,
1696 : at: u64,
1697 : }
1698 :
1699 4 : let mut planned_blobs = Vec::new();
1700 60 : for read in vectored_reads {
1701 56 : for (at, meta) in read.blobs_at.as_slice() {
1702 56 : planned_blobs.push(BlobSpec {
1703 56 : key: meta.key,
1704 56 : lsn: meta.lsn,
1705 56 : at: *at,
1706 56 : });
1707 56 : }
1708 : }
1709 :
1710 4 : let mut expected_blobs = Vec::new();
1711 4 : let mut disk_offset = 0;
1712 20 : for (key, lsns) in index_entries {
1713 84 : for lsn in lsns {
1714 84 : let key_included = keyspace.ranges.iter().any(|range| range.contains(&key));
1715 68 : let lsn_included = lsn_range.contains(&lsn);
1716 68 :
1717 68 : if key_included && lsn_included {
1718 56 : expected_blobs.push(BlobSpec {
1719 56 : key,
1720 56 : lsn,
1721 56 : at: disk_offset,
1722 56 : });
1723 56 : }
1724 :
1725 68 : disk_offset += 1;
1726 : }
1727 : }
1728 :
1729 4 : assert_eq!(planned_blobs, expected_blobs);
1730 4 : }
1731 :
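     : /// Tunables for the randomized delta layer tests below.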
1732 : mod constants {
1733 : use utils::lsn::Lsn;
1734 :
1735 : /// Offset used by all lsns in this test
1736 : pub(super) const LSN_OFFSET: Lsn = Lsn(0x08);
1737 : /// Number of unique keys including in the test data
1738 : pub(super) const KEY_COUNT: u8 = 60;
1739 : /// Max number of different lsns for each key
1740 : pub(super) const MAX_ENTRIES_PER_KEY: u8 = 20;
1741 : /// Possible value sizes for each key along with a probability weight
1742 : pub(super) const VALUE_SIZES: [(usize, u8); 3] = [(100, 2), (1024, 2), (1024 * 1024, 1)];
1743 : /// Probability that there will be a gap between the current key and the next one (33.3%)
1744 : pub(super) const KEY_GAP_CHANGES: [(bool, u8); 2] = [(true, 1), (false, 2)];
1745 : /// The minimum size of a key range in all the generated reads
1746 : pub(super) const MIN_RANGE_SIZE: i128 = 10;
1747 : /// The number of ranges included in each vectored read
1748 : pub(super) const RANGES_COUNT: u8 = 2;
1749 : /// The number of vectored reads performed
1750 : pub(super) const READS_COUNT: u8 = 100;
1751 : /// Soft max size of a vectored read. Will be violated if we have to read keys
1752 : /// with values larger than the limit
1753 : pub(super) const MAX_VECTORED_READ_BYTES: usize = 64 * 1024;
1754 : }
1755 :
1756 : struct Entry {
1757 : key: Key,
1758 : lsn: Lsn,
1759 : value: Vec<u8>,
1760 : }
1761 :
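     : /// Generate `KEY_COUNT` keys, each with a random set of increasing LSNs and
     : /// values of randomly weighted sizes; keys advance by one or two, so the
     : /// generated key space may contain gaps.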
1762 4 : fn generate_entries(rng: &mut StdRng) -> Vec<Entry> {
1763 4 : let mut current_key = Key::MIN;
1764 4 :
1765 4 : let mut entries = Vec::new();
1766 244 : for _ in 0..constants::KEY_COUNT {
1767 240 : let count = rng.gen_range(1..constants::MAX_ENTRIES_PER_KEY);
1768 240 : let mut lsns_iter =
1769 4520 : std::iter::successors(Some(Lsn(constants::LSN_OFFSET.0 + 0x08)), |lsn| {
1770 4520 : Some(Lsn(lsn.0 + 0x08))
1771 4520 : });
1772 240 : let mut lsns = Vec::new();
1773 4760 : while lsns.len() < count as usize {
1774 4520 : let take = rng.gen_bool(0.5);
1775 4520 : let lsn = lsns_iter.next().unwrap();
1776 4520 : if take {
1777 2224 : lsns.push(lsn);
1778 2296 : }
1779 : }
1780 :
1781 2464 : for lsn in lsns {
1782 2224 : let size = constants::VALUE_SIZES
1783 6672 : .choose_weighted(rng, |item| item.1)
1784 2224 : .unwrap()
1785 2224 : .0;
1786 2224 : let mut buf = vec![0; size];
1787 2224 : rng.fill_bytes(&mut buf);
1788 2224 :
1789 2224 : entries.push(Entry {
1790 2224 : key: current_key,
1791 2224 : lsn,
1792 2224 : value: buf,
1793 2224 : })
1794 : }
1795 :
1796 240 : let gap = constants::KEY_GAP_CHANGES
1797 480 : .choose_weighted(rng, |item| item.1)
1798 240 : .unwrap()
1799 240 : .0;
1800 240 : if gap {
1801 76 : current_key = current_key.add(2);
1802 164 : } else {
1803 164 : current_key = current_key.add(1);
1804 164 : }
1805 : }
1806 :
1807 4 : entries
1808 4 : }
1809 :
1810 : struct EntriesMeta {
1811 : key_range: Range<Key>,
1812 : lsn_range: Range<Lsn>,
1813 : index: BTreeMap<(Key, Lsn), Vec<u8>>,
1814 : }
1815 :
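     : /// Derive the covering key range, the covering LSN range, and a
     : /// (key, lsn) -> value lookup index from the generated entries.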
1816 4 : fn get_entries_meta(entries: &[Entry]) -> EntriesMeta {
1817 2224 : let key_range = match entries.iter().minmax_by_key(|e| e.key) {
1818 4 : MinMaxResult::MinMax(min, max) => min.key..max.key.next(),
1819 0 : _ => panic!("More than one entry is always expected"),
1820 : };
1821 :
1822 2224 : let lsn_range = match entries.iter().minmax_by_key(|e| e.lsn) {
1823 4 : MinMaxResult::MinMax(min, max) => min.lsn..Lsn(max.lsn.0 + 1),
1824 0 : _ => panic!("More than one entry is always expected"),
1825 : };
1826 :
1827 4 : let mut index = BTreeMap::new();
1828 2224 : for entry in entries.iter() {
1829 2224 : index.insert((entry.key, entry.lsn), entry.value.clone());
1830 2224 : }
1831 :
1832 4 : EntriesMeta {
1833 4 : key_range,
1834 4 : lsn_range,
1835 4 : index,
1836 4 : }
1837 4 : }
1838 :
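     : /// Pick `RANGES_COUNT` random, non-overlapping key ranges of at least
     : /// `MIN_RANGE_SIZE` keys each from within `key_range`.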
1839 400 : fn pick_random_keyspace(rng: &mut StdRng, key_range: &Range<Key>) -> KeySpace {
1840 400 : let start = key_range.start.to_i128();
1841 400 : let end = key_range.end.to_i128();
1842 400 :
1843 400 : let mut keyspace = KeySpace::default();
1844 :
1845 1200 : for _ in 0..constants::RANGES_COUNT {
1846 800 : let mut range: Option<Range<Key>> = Option::default();
1847 2488 : while range.is_none() || keyspace.overlaps(range.as_ref().unwrap()) {
1848 1688 : let range_start = rng.gen_range(start..end);
1849 1688 : let range_end_offset = range_start + constants::MIN_RANGE_SIZE;
1850 1688 : if range_end_offset >= end {
1851 200 : range = Some(Key::from_i128(range_start)..Key::from_i128(end));
1852 1488 : } else {
1853 1488 : let range_end = rng.gen_range((range_start + constants::MIN_RANGE_SIZE)..end);
1854 1488 : range = Some(Key::from_i128(range_start)..Key::from_i128(range_end));
1855 1488 : }
1856 : }
1857 800 : keyspace.ranges.push(range.unwrap());
1858 : }
1859 :
1860 400 : keyspace
1861 400 : }
1862 :
1863 : #[tokio::test]
1864 4 : async fn test_delta_layer_vectored_read_end_to_end() -> anyhow::Result<()> {
1865 4 : let harness = TenantHarness::create("test_delta_layer_oversized_vectored_read").await?;
1866 4 : let (tenant, ctx) = harness.load().await;
1867 4 :
1868 4 : let timeline_id = TimelineId::generate();
1869 4 : let timeline = tenant
1870 4 : .create_test_timeline(timeline_id, constants::LSN_OFFSET, DEFAULT_PG_VERSION, &ctx)
1871 4 : .await?;
1872 4 :
1873 4 : tracing::info!("Generating test data ...");
1874 4 :
1875 4 : let rng = &mut StdRng::seed_from_u64(0);
1876 4 : let entries = generate_entries(rng);
1877 4 : let entries_meta = get_entries_meta(&entries);
1878 4 :
1879 4 : tracing::info!("Done generating {} entries", entries.len());
1880 4 :
1881 4 : tracing::info!("Writing test data to delta layer ...");
1882 4 : let mut writer = DeltaLayerWriter::new(
1883 4 : harness.conf,
1884 4 : timeline_id,
1885 4 : harness.tenant_shard_id,
1886 4 : entries_meta.key_range.start,
1887 4 : entries_meta.lsn_range.clone(),
1888 4 : &ctx,
1889 4 : )
1890 4 : .await?;
1891 4 :
1892 2228 : for entry in entries {
1893 2224 : let (_, res) = writer
1894 2224 : .put_value_bytes(entry.key, entry.lsn, entry.value.slice_len(), false, &ctx)
1895 2224 : .await;
1896 2224 : res?;
1897 4 : }
1898 4 :
1899 4 : let (desc, path) = writer.finish(entries_meta.key_range.end, &ctx).await?;
1900 4 : let resident = Layer::finish_creating(harness.conf, &timeline, desc, &path)?;
1901 4 :
1902 4 : let inner = resident.get_as_delta(&ctx).await?;
1903 4 :
1904 4 : let file_size = inner.file.metadata().await?.len();
1905 4 : tracing::info!(
1906 4 : "Done writing test data to delta layer. Resulting file size is: {}",
1907 4 : file_size
1908 4 : );
1909 4 :
1910 404 : for i in 0..constants::READS_COUNT {
1911 400 : tracing::info!("Doing vectored read {}/{}", i + 1, constants::READS_COUNT);
1912 4 :
1913 400 : let block_reader = FileBlockReader::new(&inner.file, inner.file_id);
1914 400 : let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1915 400 : inner.index_start_blk,
1916 400 : inner.index_root_blk,
1917 400 : block_reader,
1918 400 : );
1919 400 :
1920 400 : let planner = VectoredReadPlanner::new(constants::MAX_VECTORED_READ_BYTES);
1921 400 : let keyspace = pick_random_keyspace(rng, &entries_meta.key_range);
1922 400 : let data_end_offset = inner.index_start_blk as u64 * PAGE_SZ as u64;
1923 4 :
1924 400 : let vectored_reads = DeltaLayerInner::plan_reads(
1925 400 : &keyspace,
1926 400 : entries_meta.lsn_range.clone(),
1927 400 : data_end_offset,
1928 400 : index_reader,
1929 400 : planner,
1930 400 : &ctx,
1931 400 : )
1932 400 : .await?;
1933 4 :
1934 400 : let vectored_blob_reader = VectoredBlobReader::new(&inner.file);
1935 400 : let buf_size = DeltaLayerInner::get_min_read_buffer_size(
1936 400 : &vectored_reads,
1937 400 : constants::MAX_VECTORED_READ_BYTES,
1938 400 : );
1939 400 : let mut buf = Some(IoBufferMut::with_capacity(buf_size));
1940 4 :
1941 39848 : for read in vectored_reads {
1942 39448 : let blobs_buf = vectored_blob_reader
1943 39448 : .read_blobs(&read, buf.take().expect("Should have a buffer"), &ctx)
1944 39448 : .await?;
1945 39448 : let view = BufView::new_slice(&blobs_buf.buf);
1946 114608 : for meta in blobs_buf.blobs.iter() {
1947 114608 : let value = meta.read(&view).await?;
1948 114608 : assert_eq!(
1949 114608 : &value[..],
1950 114608 : &entries_meta.index[&(meta.meta.key, meta.meta.lsn)]
1951 114608 : );
1952 4 : }
1953 4 :
1954 39448 : buf = Some(blobs_buf.buf);
1955 4 : }
1956 4 : }
1957 4 :
1958 4 : Ok(())
1959 4 : }
1960 :
1961 : #[tokio::test]
1962 4 : async fn copy_delta_prefix_smoke() {
1963 4 : use bytes::Bytes;
1964 4 : use pageserver_api::record::NeonWalRecord;
1965 4 :
1966 4 : let h = crate::tenant::harness::TenantHarness::create("truncate_delta_smoke")
1967 4 : .await
1968 4 : .unwrap();
1969 4 : let (tenant, ctx) = h.load().await;
1970 4 : let ctx = &ctx;
1971 4 : let timeline = tenant
1972 4 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, ctx)
1973 4 : .await
1974 4 : .unwrap();
1975 4 : let ctx = &ctx.with_scope_timeline(&timeline);
1976 4 :
1977 4 : let initdb_layer = timeline
1978 4 : .layers
1979 4 : .read()
1980 4 : .await
1981 4 : .likely_resident_layers()
1982 4 : .next()
1983 4 : .cloned()
1984 4 : .unwrap();
1985 4 :
1986 4 : {
1987 4 : let mut writer = timeline.writer().await;
1988 4 :
1989 4 : let data = [
1990 4 : (0x20, 12, Value::Image(Bytes::from_static(b"foobar"))),
1991 4 : (
1992 4 : 0x30,
1993 4 : 12,
1994 4 : Value::WalRecord(NeonWalRecord::Postgres {
1995 4 : will_init: false,
1996 4 : rec: Bytes::from_static(b"1"),
1997 4 : }),
1998 4 : ),
1999 4 : (
2000 4 : 0x40,
2001 4 : 12,
2002 4 : Value::WalRecord(NeonWalRecord::Postgres {
2003 4 : will_init: true,
2004 4 : rec: Bytes::from_static(b"2"),
2005 4 : }),
2006 4 : ),
2007 4 : // build an oversized value so we cannot extend an existing read over
2008 4 : // this
2009 4 : (
2010 4 : 0x50,
2011 4 : 12,
2012 4 : Value::WalRecord(NeonWalRecord::Postgres {
2013 4 : will_init: true,
2014 4 : rec: {
2015 4 : let mut buf =
2016 4 : vec![0u8; tenant.conf.max_vectored_read_bytes.0.get() + 1024];
2017 4 : buf.iter_mut()
2018 4 : .enumerate()
2019 536576 : .for_each(|(i, slot)| *slot = (i % 256) as u8);
2020 4 : Bytes::from(buf)
2021 4 : },
2022 4 : }),
2023 4 : ),
2024 4 : // because the oversized read cannot be extended further, we are sure to exercise the
2025 4 : // builder created on the last round with this:
2026 4 : (
2027 4 : 0x60,
2028 4 : 12,
2029 4 : Value::WalRecord(NeonWalRecord::Postgres {
2030 4 : will_init: true,
2031 4 : rec: Bytes::from_static(b"3"),
2032 4 : }),
2033 4 : ),
2034 4 : (
2035 4 : 0x60,
2036 4 : 9,
2037 4 : Value::Image(Bytes::from_static(b"something for a different key")),
2038 4 : ),
2039 4 : ];
2040 4 :
2041 4 : let mut last_lsn = None;
2042 4 :
2043 28 : for (lsn, key, value) in data {
2044 24 : let key = Key::from_i128(key);
2045 24 : writer.put(key, Lsn(lsn), &value, ctx).await.unwrap();
2046 24 : last_lsn = Some(lsn);
2047 4 : }
2048 4 :
2049 4 : writer.finish_write(Lsn(last_lsn.unwrap()));
2050 4 : }
2051 4 : timeline.freeze_and_flush().await.unwrap();
2052 4 :
2053 4 : let new_layer = timeline
2054 4 : .layers
2055 4 : .read()
2056 4 : .await
2057 4 : .likely_resident_layers()
2058 5 : .find(|&x| x != &initdb_layer)
2059 4 : .cloned()
2060 4 : .unwrap();
2061 4 :
2062 4 : // create a branch of the timeline, so we don't overwrite the original file
2063 4 : let branch = tenant
2064 4 : .branch_timeline_test(&timeline, TimelineId::generate(), None, ctx)
2065 4 : .await
2066 4 : .unwrap();
2067 4 :
2068 4 : assert_eq!(branch.get_ancestor_lsn(), Lsn(0x60));
2069 4 :
2070 4 : // truncating at 0x61 gives us a full copy; the earlier cutoffs walk backwards until only
2071 4 : // a single key remains
2072 4 :
2073 24 : for truncate_at in [0x61, 0x51, 0x41, 0x31, 0x21] {
2074 20 : let truncate_at = Lsn(truncate_at);
2075 4 :
2076 20 : let mut writer = DeltaLayerWriter::new(
2077 20 : tenant.conf,
2078 20 : branch.timeline_id,
2079 20 : tenant.tenant_shard_id,
2080 20 : Key::MIN,
2081 20 : Lsn(0x11)..truncate_at,
2082 20 : ctx,
2083 20 : )
2084 20 : .await
2085 20 : .unwrap();
2086 4 :
2087 20 : let new_layer = new_layer.download_and_keep_resident(ctx).await.unwrap();
2088 20 :
2089 20 : new_layer
2090 20 : .copy_delta_prefix(&mut writer, truncate_at, ctx)
2091 20 : .await
2092 20 : .unwrap();
2093 4 :
2094 20 : let (desc, path) = writer.finish(Key::MAX, ctx).await.unwrap();
2095 20 : let copied_layer = Layer::finish_creating(tenant.conf, &branch, desc, &path).unwrap();
2096 20 :
2097 20 : copied_layer.get_as_delta(ctx).await.unwrap();
2098 20 :
2099 20 : assert_keys_and_values_eq(
2100 20 : new_layer.get_as_delta(ctx).await.unwrap(),
2101 20 : copied_layer.get_as_delta(ctx).await.unwrap(),
2102 20 : truncate_at,
2103 20 : ctx,
2104 20 : )
2105 20 : .await;
2106 4 : }
2107 4 : }
2108 :
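     : /// Stream both indexes forwards in lockstep, filtering the source down to
     : /// entries below `truncated_at`, and assert that the layers agree on every
     : /// (key, lsn) pair as well as on the blob bytes behind each index entry.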
2109 20 : async fn assert_keys_and_values_eq(
2110 20 : source: &DeltaLayerInner,
2111 20 : truncated: &DeltaLayerInner,
2112 20 : truncated_at: Lsn,
2113 20 : ctx: &RequestContext,
2114 20 : ) {
2115 : use futures::future::ready;
2116 : use futures::stream::TryStreamExt;
2117 :
2118 20 : let start_key = [0u8; DELTA_KEY_SIZE];
2119 20 :
2120 20 : let source_reader = FileBlockReader::new(&source.file, source.file_id);
2121 20 : let source_tree = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
2122 20 : source.index_start_blk,
2123 20 : source.index_root_blk,
2124 20 : &source_reader,
2125 20 : );
2126 20 : let source_stream = source.stream_index_forwards(source_tree, &start_key, ctx);
2127 120 : let source_stream = source_stream.filter(|res| match res {
2128 120 : Ok((_, lsn, _)) => ready(lsn < &truncated_at),
2129 0 : _ => ready(true),
2130 120 : });
2131 20 : let mut source_stream = std::pin::pin!(source_stream);
2132 20 :
2133 20 : let truncated_reader = FileBlockReader::new(&truncated.file, truncated.file_id);
2134 20 : let truncated_tree = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
2135 20 : truncated.index_start_blk,
2136 20 : truncated.index_root_blk,
2137 20 : &truncated_reader,
2138 20 : );
2139 20 : let truncated_stream = truncated.stream_index_forwards(truncated_tree, &start_key, ctx);
2140 20 : let mut truncated_stream = std::pin::pin!(truncated_stream);
2141 20 :
2142 20 : let mut scratch_left = Vec::new();
2143 20 : let mut scratch_right = Vec::new();
2144 :
2145 : loop {
2146 84 : let (src, truncated) = (source_stream.try_next(), truncated_stream.try_next());
2147 84 : let (src, truncated) = tokio::try_join!(src, truncated).unwrap();
2148 84 :
2149 84 : if src.is_none() {
2150 20 : assert!(truncated.is_none());
2151 20 : break;
2152 64 : }
2153 64 :
2154 64 : let (src, truncated) = (src.unwrap(), truncated.unwrap());
2155 64 :
2156 64 : // because we've filtered the source by LSN, we should always get the same keys from both.
2157 64 : assert_eq!(src.0, truncated.0);
2158 64 : assert_eq!(src.1, truncated.1);
2159 :
2160 : // if this invariant gets in the way of something else, just drop this assert.
2161 64 : assert!(
2162 64 : src.2.pos() >= truncated.2.pos(),
2163 0 : "value position should not go backwards {} vs. {}",
2164 0 : src.2.pos(),
2165 0 : truncated.2.pos()
2166 : );
2167 :
2168 64 : scratch_left.clear();
2169 64 : let src_cursor = source_reader.block_cursor();
2170 64 : let left = src_cursor.read_blob_into_buf(src.2.pos(), &mut scratch_left, ctx);
2171 64 : scratch_right.clear();
2172 64 : let trunc_cursor = truncated_reader.block_cursor();
2173 64 : let right = trunc_cursor.read_blob_into_buf(truncated.2.pos(), &mut scratch_right, ctx);
2174 64 :
2175 64 : tokio::try_join!(left, right).unwrap();
2176 64 :
2177 64 : assert_eq!(utils::Hex(&scratch_left), utils::Hex(&scratch_right));
2178 : }
2179 20 : }
2180 :
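     : /// Order deltas by (key, lsn), ignoring the value.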
2181 36448 : pub(crate) fn sort_delta(
2182 36448 : (k1, l1, _): &(Key, Lsn, Value),
2183 36448 : (k2, l2, _): &(Key, Lsn, Value),
2184 36448 : ) -> std::cmp::Ordering {
2185 36448 : (k1, l1).cmp(&(k2, l2))
2186 36448 : }
2187 :
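     : /// Like `sort_delta`, but at equal (key, lsn) it orders images before WAL records.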
2188 : #[cfg(feature = "testing")]
2189 188 : pub(crate) fn sort_delta_value(
2190 188 : (k1, l1, v1): &(Key, Lsn, Value),
2191 188 : (k2, l2, v2): &(Key, Lsn, Value),
2192 188 : ) -> std::cmp::Ordering {
2193 188 : let order_1 = if v1.is_image() { 0 } else { 1 };
2194 188 : let order_2 = if v2.is_image() { 0 } else { 1 };
2195 188 : (k1, l1, order_1).cmp(&(k2, l2, order_2))
2196 188 : }
2197 :
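     : /// Sort the given deltas and write them into a fresh delta layer covering
     : /// exactly their key and LSN ranges, returning the resulting resident layer.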
2198 44 : pub(crate) async fn produce_delta_layer(
2199 44 : tenant: &Tenant,
2200 44 : tline: &Arc<Timeline>,
2201 44 : mut deltas: Vec<(Key, Lsn, Value)>,
2202 44 : ctx: &RequestContext,
2203 44 : ) -> anyhow::Result<ResidentLayer> {
2204 44 : deltas.sort_by(sort_delta);
2205 44 : let (key_start, _, _) = deltas.first().unwrap();
2206 44 : let (key_max, _, _) = deltas.last().unwrap();
2207 16480 : let lsn_min = deltas.iter().map(|(_, lsn, _)| lsn).min().unwrap();
2208 16480 : let lsn_max = deltas.iter().map(|(_, lsn, _)| lsn).max().unwrap();
2209 44 : let lsn_end = Lsn(lsn_max.0 + 1);
2210 44 : let mut writer = DeltaLayerWriter::new(
2211 44 : tenant.conf,
2212 44 : tline.timeline_id,
2213 44 : tenant.tenant_shard_id,
2214 44 : *key_start,
2215 44 : (*lsn_min)..lsn_end,
2216 44 : ctx,
2217 44 : )
2218 44 : .await?;
2219 44 : let key_end = key_max.next();
2220 :
2221 16524 : for (key, lsn, value) in deltas {
2222 16480 : writer.put_value(key, lsn, value, ctx).await?;
2223 : }
2224 :
2225 44 : let (desc, path) = writer.finish(key_end, ctx).await?;
2226 44 : let delta_layer = Layer::finish_creating(tenant.conf, tline, desc, &path)?;
2227 :
2228 44 : Ok::<_, anyhow::Error>(delta_layer)
2229 44 : }
2230 :
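     : /// Drain `delta_iter` and assert it yields exactly the expected
     : /// (key, lsn, value) tuples, in order.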
2231 56 : async fn assert_delta_iter_equal(
2232 56 : delta_iter: &mut DeltaLayerIterator<'_>,
2233 56 : expect: &[(Key, Lsn, Value)],
2234 56 : ) {
2235 56 : let mut expect_iter = expect.iter();
2236 : loop {
2237 56056 : let o1 = delta_iter.next().await.unwrap();
2238 56056 : let o2 = expect_iter.next();
2239 56056 : assert_eq!(o1.is_some(), o2.is_some());
2240 56056 : if o1.is_none() && o2.is_none() {
2241 56 : break;
2242 56000 : }
2243 56000 : let (k1, l1, v1) = o1.unwrap();
2244 56000 : let (k2, l2, v2) = o2.unwrap();
2245 56000 : assert_eq!(&k1, k2);
2246 56000 : assert_eq!(l1, *l2);
2247 56000 : assert_eq!(&v1, v2);
2248 : }
2249 56 : }
2250 :
2251 : #[tokio::test]
2252 4 : async fn delta_layer_iterator() {
2253 4 : let harness = TenantHarness::create("delta_layer_iterator").await.unwrap();
2254 4 : let (tenant, ctx) = harness.load().await;
2255 4 :
2256 4 : let tline = tenant
2257 4 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
2258 4 : .await
2259 4 : .unwrap();
2260 4 :
2261 4000 : fn get_key(id: u32) -> Key {
2262 4000 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
2263 4000 : key.field6 = id;
2264 4000 : key
2265 4000 : }
2266 4 : const N: usize = 1000;
2267 4 : let test_deltas = (0..N)
2268 4000 : .map(|idx| {
2269 4000 : (
2270 4000 : get_key(idx as u32 / 10),
2271 4000 : Lsn(0x10 * ((idx as u64) % 10 + 1)),
2272 4000 : Value::Image(Bytes::from(format!("img{idx:05}"))),
2273 4000 : )
2274 4000 : })
2275 4 : .collect_vec();
2276 4 : let resident_layer = produce_delta_layer(&tenant, &tline, test_deltas.clone(), &ctx)
2277 4 : .await
2278 4 : .unwrap();
2279 4 : let delta_layer = resident_layer.get_as_delta(&ctx).await.unwrap();
2280 12 : for max_read_size in [1, 1024] {
2281 64 : for batch_size in [1, 2, 4, 8, 3, 7, 13] {
2282 56 : println!("running with batch_size={batch_size} max_read_size={max_read_size}");
2283 56 : // Test if the batch size is correctly determined
2284 56 : let mut iter = delta_layer.iter(&ctx);
2285 56 : iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
2286 56 : let mut num_items = 0;
2287 224 : for _ in 0..3 {
2288 168 : iter.next_batch().await.unwrap();
2289 168 : num_items += iter.key_values_batch.len();
2290 168 : if max_read_size == 1 {
2291 4 : // every key should be its own batch because the value is larger than max_read_size
2292 84 : assert_eq!(iter.key_values_batch.len(), 1);
2293 4 : } else {
2294 84 : assert!(iter.key_values_batch.len() <= batch_size);
2295 4 : }
2296 168 : if num_items >= N {
2297 4 : break;
2298 168 : }
2299 168 : iter.key_values_batch.clear();
2300 4 : }
2301 4 : // Test if the result is correct
2302 56 : let mut iter = delta_layer.iter(&ctx);
2303 56 : iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
2304 56 : assert_delta_iter_equal(&mut iter, &test_deltas).await;
2305 4 : }
2306 4 : }
2307 4 : }
2308 : }