Line data Source code
1 : //! A DeltaLayer represents a collection of WAL records or page images in a range of
2 : //! LSNs, and in a range of Keys. It is stored in a file on disk.
3 : //!
4 : //! Usually a delta layer only contains differences, in the form of WAL records
5 : //! against a base LSN. However, if a relation is extended or a whole new
6 : //! relation is created, there is no base for the new pages. The entries for them
7 : //! must be page images or WAL records with the 'will_init' flag set, so that
8 : //! they can be replayed without referring to an older page version.
9 : //!
10 : //! The delta files are stored in the `timelines/<timeline_id>` directory. Currently,
11 : //! there are no subdirectories, and each delta file is named like this:
12 : //!
13 : //! ```text
14 : //! <key start>-<key end>__<start LSN>-<end LSN>
15 : //! ```
16 : //!
17 : //! For example:
18 : //!
19 : //! ```text
20 : //! 000000067F000032BE0000400000000020B6-000000067F000032BE0000400000000030B6__000000578C6B29-0000000057A50051
21 : //! ```
22 : //!
23 : //! Every delta file consists of three parts: "summary", "values", and
24 : //! "index". The summary is a fixed size header at the beginning of the file,
25 : //! and it contains basic information about the layer, and offsets to the other
26 : //! parts. The "index" is a B-tree, mapping from Key and LSN to an offset in the
27 : //! "values" part. The actual page images and WAL records are stored in the
28 : //! "values" part.
29 : //!
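//! A sketch of the physical layout described above (each block is `PAGE_SZ`
//! bytes, assumed here to be the usual 8 KiB; `index_start_blk` is recorded
//! in the summary):
//!
//! ```text
//! block 0           | blocks 1 .. index_start_blk | index_start_blk .. EOF
//! ------------------+-----------------------------+------------------------
//! summary header    | "values" (blobs)            | "index" (B-tree pages)
//! ```
//!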
30 : use std::collections::{HashMap, VecDeque};
31 : use std::fs::File;
32 : use std::ops::Range;
33 : use std::os::unix::fs::FileExt;
34 : use std::str::FromStr;
35 : use std::sync::Arc;
36 : use std::sync::atomic::AtomicU64;
37 :
38 : use anyhow::{Context, Result, bail, ensure};
39 : use camino::{Utf8Path, Utf8PathBuf};
40 : use futures::StreamExt;
41 : use itertools::Itertools;
42 : use pageserver_api::config::MaxVectoredReadBytes;
43 : use pageserver_api::key::{DBDIR_KEY, KEY_SIZE, Key};
44 : use pageserver_api::keyspace::KeySpace;
45 : use pageserver_api::models::ImageCompressionAlgorithm;
46 : use pageserver_api::shard::TenantShardId;
47 : use serde::{Deserialize, Serialize};
48 : use tokio::sync::OnceCell;
49 : use tokio_epoll_uring::IoBuf;
50 : use tokio_util::sync::CancellationToken;
51 : use tracing::*;
52 : use utils::bin_ser::BeSer;
53 : use utils::bin_ser::SerializeError;
54 : use utils::id::{TenantId, TimelineId};
55 : use utils::lsn::Lsn;
56 : use wal_decoder::models::value::Value;
57 :
58 : use super::errors::PutError;
59 : use super::{
60 : AsLayerDesc, LayerName, OnDiskValue, OnDiskValueIo, PersistentLayerDesc, ResidentLayer,
61 : ValuesReconstructState,
62 : };
63 : use crate::config::PageServerConf;
64 : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
65 : use crate::page_cache::{self, FileId, PAGE_SZ};
66 : use crate::tenant::blob_io::BlobWriter;
67 : use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
68 : use crate::tenant::disk_btree::{
69 : DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
70 : };
71 : use crate::tenant::storage_layer::layer::S3_UPLOAD_LIMIT;
72 : use crate::tenant::timeline::GetVectoredError;
73 : use crate::tenant::vectored_blob_io::{
74 : BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
75 : VectoredReadPlanner,
76 : };
77 : use crate::virtual_file::TempVirtualFile;
78 : use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
79 : use crate::virtual_file::owned_buffers_io::write::{Buffer, BufferedWriterShutdownMode};
80 : use crate::virtual_file::{self, IoBuffer, IoBufferMut, MaybeFatalIo, VirtualFile};
81 : use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
82 :
83 : ///
84 : /// Header stored at the beginning of the file
85 : ///
86 : /// After this comes the 'values' part, starting on block 1. After that,
87 : /// the 'index' starts at the block indicated by 'index_start_blk'
88 : ///
89 0 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
90 : pub struct Summary {
91 : /// Magic value to identify this as a neon delta file. Always DELTA_FILE_MAGIC.
92 : pub magic: u16,
93 : pub format_version: u16,
94 :
95 : pub tenant_id: TenantId,
96 : pub timeline_id: TimelineId,
97 : pub key_range: Range<Key>,
98 : pub lsn_range: Range<Lsn>,
99 :
100 : /// Block number where the 'index' part of the file begins.
101 : pub index_start_blk: u32,
102 : /// Block within the 'index', where the B-tree root page is stored
103 : pub index_root_blk: u32,
104 : }
105 :
106 : impl From<&DeltaLayer> for Summary {
107 0 : fn from(layer: &DeltaLayer) -> Self {
108 0 : Self::expected(
109 0 : layer.desc.tenant_shard_id.tenant_id,
110 0 : layer.desc.timeline_id,
111 0 : layer.desc.key_range.clone(),
112 0 : layer.desc.lsn_range.clone(),
113 : )
114 0 : }
115 : }
116 :
117 : impl Summary {
118 : /// Serializes the summary header into an aligned buffer of length `PAGE_SZ`.
119 736 : pub fn ser_into_page(&self) -> Result<IoBuffer, SerializeError> {
120 736 : let mut buf = IoBufferMut::with_capacity(PAGE_SZ);
121 736 : Self::ser_into(self, &mut buf)?;
122 : // Pad zeroes to the buffer so the length is a multiple of the alignment.
123 736 : buf.extend_with(0, buf.capacity() - buf.len());
124 736 : Ok(buf.freeze())
125 736 : }
126 :
127 560 : pub(super) fn expected(
128 560 : tenant_id: TenantId,
129 560 : timeline_id: TimelineId,
130 560 : keys: Range<Key>,
131 560 : lsns: Range<Lsn>,
132 560 : ) -> Self {
133 560 : Self {
134 560 : magic: DELTA_FILE_MAGIC,
135 560 : format_version: STORAGE_FORMAT_VERSION,
136 560 :
137 560 : tenant_id,
138 560 : timeline_id,
139 560 : key_range: keys,
140 560 : lsn_range: lsns,
141 560 :
142 560 : index_start_blk: 0,
143 560 : index_root_blk: 0,
144 560 : }
145 560 : }
146 : }
147 :
148 : // Flag indicating that this version initializes the page
149 : const WILL_INIT: u64 = 1;
150 :
151 : /// Struct representing a reference to a BLOB in a layer.
152 : ///
153 : /// The reference contains the BLOB offset, and for WAL records it also contains
154 : /// the `will_init` flag. The flag helps to determine the range of records
155 : /// that need to be applied, without reading/deserializing the records themselves.
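///
/// A sketch of the packing (the low bit stores `will_init`; the remaining
/// 63 bits store the blob's byte offset):
///
/// ```text
/// let r = BlobRef::new(4096, true);
/// assert_eq!(r.0, (4096 << 1) | 1);
/// assert!(r.will_init());
/// assert_eq!(r.pos(), 4096);
/// ```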
156 0 : #[derive(Debug, Serialize, Deserialize, Copy, Clone)]
157 : pub struct BlobRef(pub u64);
158 :
159 : impl BlobRef {
160 2421170 : pub fn will_init(&self) -> bool {
161 2421170 : (self.0 & WILL_INIT) != 0
162 2421170 : }
163 :
164 3882368 : pub fn pos(&self) -> u64 {
165 3882368 : self.0 >> 1
166 3882368 : }
167 :
168 3246043 : pub fn new(pos: u64, will_init: bool) -> BlobRef {
169 3246043 : let mut blob_ref = pos << 1;
170 3246043 : if will_init {
171 3235085 : blob_ref |= WILL_INIT;
172 3235085 : }
173 3246043 : BlobRef(blob_ref)
174 3246043 : }
175 : }
176 :
177 : pub const DELTA_KEY_SIZE: usize = KEY_SIZE + 8;
178 : struct DeltaKey([u8; DELTA_KEY_SIZE]);
179 :
180 : /// This is the key of the B-tree index stored in the delta layer. It consists
181 : /// of the serialized representation of a Key and LSN.
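///
/// A sketch of the byte layout (`DELTA_KEY_SIZE` = `KEY_SIZE` + 8):
///
/// ```text
/// bytes 0 .. KEY_SIZE             serialized Key
/// bytes KEY_SIZE .. KEY_SIZE + 8  Lsn as a big-endian u64
/// ```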
182 : impl DeltaKey {
183 1032099 : fn from_slice(buf: &[u8]) -> Self {
184 1032099 : let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
185 1032099 : bytes.copy_from_slice(buf);
186 1032099 : DeltaKey(bytes)
187 1032099 : }
188 :
189 3373437 : fn from_key_lsn(key: &Key, lsn: Lsn) -> Self {
190 3373437 : let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
191 3373437 : key.write_to_byte_slice(&mut bytes[0..KEY_SIZE]);
192 3373437 : bytes[KEY_SIZE..].copy_from_slice(&u64::to_be_bytes(lsn.0));
193 3373437 : DeltaKey(bytes)
194 3373437 : }
195 :
196 1032099 : fn key(&self) -> Key {
197 1032099 : Key::from_slice(&self.0)
198 1032099 : }
199 :
200 1032099 : fn lsn(&self) -> Lsn {
201 1032099 : Lsn(u64::from_be_bytes(self.0[KEY_SIZE..].try_into().unwrap()))
202 1032099 : }
203 :
204 2850239 : fn extract_lsn_from_buf(buf: &[u8]) -> Lsn {
205 2850239 : let mut lsn_buf = [0u8; 8];
206 2850239 : lsn_buf.copy_from_slice(&buf[KEY_SIZE..]);
207 2850239 : Lsn(u64::from_be_bytes(lsn_buf))
208 2850239 : }
209 : }
210 :
211 : /// This is used only from `pagectl`. Within pageserver, all layers are
212 : /// [`crate::tenant::storage_layer::Layer`], which can hold a [`DeltaLayerInner`].
213 : pub struct DeltaLayer {
214 : path: Utf8PathBuf,
215 : pub desc: PersistentLayerDesc,
216 : inner: OnceCell<Arc<DeltaLayerInner>>,
217 : }
218 :
219 : impl std::fmt::Debug for DeltaLayer {
220 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
221 : use super::RangeDisplayDebug;
222 :
223 0 : f.debug_struct("DeltaLayer")
224 0 : .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
225 0 : .field("lsn_range", &self.desc.lsn_range)
226 0 : .field("file_size", &self.desc.file_size)
227 0 : .field("inner", &self.inner)
228 0 : .finish()
229 0 : }
230 : }
231 :
232 : /// `DeltaLayerInner` is the in-memory data structure associated with an on-disk delta
233 : /// file.
234 : pub struct DeltaLayerInner {
235 : // values copied from summary
236 : index_start_blk: u32,
237 : index_root_blk: u32,
238 :
239 : file: Arc<VirtualFile>,
240 : file_id: FileId,
241 :
242 : layer_key_range: Range<Key>,
243 : layer_lsn_range: Range<Lsn>,
244 :
245 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
246 : }
247 :
248 : impl DeltaLayerInner {
249 0 : pub(crate) fn layer_dbg_info(&self) -> String {
250 0 : format!(
251 0 : "delta {}..{} {}..{}",
252 0 : self.key_range().start,
253 0 : self.key_range().end,
254 0 : self.lsn_range().start,
255 0 : self.lsn_range().end
256 : )
257 0 : }
258 : }
259 :
260 : impl std::fmt::Debug for DeltaLayerInner {
261 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
262 0 : f.debug_struct("DeltaLayerInner")
263 0 : .field("index_start_blk", &self.index_start_blk)
264 0 : .field("index_root_blk", &self.index_root_blk)
265 0 : .finish()
266 0 : }
267 : }
268 :
269 : /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
270 : impl std::fmt::Display for DeltaLayer {
271 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
272 0 : write!(f, "{}", self.layer_desc().short_id())
273 0 : }
274 : }
275 :
276 : impl AsLayerDesc for DeltaLayer {
277 0 : fn layer_desc(&self) -> &PersistentLayerDesc {
278 0 : &self.desc
279 0 : }
280 : }
281 :
282 : impl DeltaLayer {
283 0 : pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
284 0 : self.desc.dump();
285 :
286 0 : if !verbose {
287 0 : return Ok(());
288 0 : }
289 :
290 0 : let inner = self.load(ctx).await?;
291 :
292 0 : inner.dump(ctx).await
293 0 : }
294 :
295 749 : fn temp_path_for(
296 749 : conf: &PageServerConf,
297 749 : tenant_shard_id: &TenantShardId,
298 749 : timeline_id: &TimelineId,
299 749 : key_start: Key,
300 749 : lsn_range: &Range<Lsn>,
301 749 : ) -> Utf8PathBuf {
302 : // TempVirtualFile requires us to never reuse a filename while an old
303 : // instance of TempVirtualFile created with that filename has not finished dropping yet.
304 : // So, we use a monotonic counter to disambiguate the filenames.
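        // For example (hypothetical names, with the key/LSN parts elided), two
        // temp files created back-to-back for the same key and LSN range differ
        // only in the hex counter before the suffix:
        //   <key start>-XXX__<start LSN>-<end LSN>.1.<suffix>
        //   <key start>-XXX__<start LSN>-<end LSN>.2.<suffix>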
305 : static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
306 749 : let filename_disambiguator =
307 749 : NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
308 :
309 749 : conf.timeline_path(tenant_shard_id, timeline_id)
310 749 : .join(format!(
311 749 : "{}-XXX__{:016X}-{:016X}.{:x}.{}",
312 749 : key_start,
313 749 : u64::from(lsn_range.start),
314 749 : u64::from(lsn_range.end),
315 749 : filename_disambiguator,
316 749 : TEMP_FILE_SUFFIX,
317 749 : ))
318 749 : }
319 :
320 : ///
321 : /// Open the underlying file and read the metadata into memory, if it's
322 : /// not loaded already.
323 : ///
324 0 : async fn load(&self, ctx: &RequestContext) -> Result<&Arc<DeltaLayerInner>> {
325 : // Quick exit if already loaded
326 0 : self.inner
327 0 : .get_or_try_init(|| self.load_inner(ctx))
328 0 : .await
329 0 : .with_context(|| format!("Failed to load delta layer {}", self.path()))
330 0 : }
331 :
332 0 : async fn load_inner(&self, ctx: &RequestContext) -> anyhow::Result<Arc<DeltaLayerInner>> {
333 0 : let path = self.path();
334 :
335 0 : let loaded = DeltaLayerInner::load(&path, None, None, ctx).await?;
336 :
337 : // not production code
338 0 : let actual_layer_name = LayerName::from_str(path.file_name().unwrap()).unwrap();
339 0 : let expected_layer_name = self.layer_desc().layer_name();
340 :
341 0 : if actual_layer_name != expected_layer_name {
342 0 : println!("warning: filename does not match what is expected from in-file summary");
343 0 : println!("actual: {:?}", actual_layer_name.to_string());
344 0 : println!("expected: {:?}", expected_layer_name.to_string());
345 0 : }
346 :
347 0 : Ok(Arc::new(loaded))
348 0 : }
349 :
350 : /// Create a DeltaLayer struct representing an existing file on disk.
351 : ///
352 : /// This variant is only used for debugging purposes, by the 'pagectl' binary.
353 0 : pub fn new_for_path(path: &Utf8Path, file: File) -> Result<Self> {
354 0 : let mut summary_buf = vec![0; PAGE_SZ];
355 0 : file.read_exact_at(&mut summary_buf, 0)?;
356 0 : let summary = Summary::des_prefix(&summary_buf)?;
357 :
358 0 : let metadata = file
359 0 : .metadata()
360 0 : .context("get file metadata to determine size")?;
361 :
362 : // This function is never used for constructing layers in a running pageserver,
363 : // so it does not need an accurate TenantShardId.
364 0 : let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
365 :
366 0 : Ok(DeltaLayer {
367 0 : path: path.to_path_buf(),
368 0 : desc: PersistentLayerDesc::new_delta(
369 0 : tenant_shard_id,
370 0 : summary.timeline_id,
371 0 : summary.key_range,
372 0 : summary.lsn_range,
373 0 : metadata.len(),
374 0 : ),
375 0 : inner: OnceCell::new(),
376 0 : })
377 0 : }
378 :
379 : /// Path to the layer file in pageserver workdir.
380 0 : fn path(&self) -> Utf8PathBuf {
381 0 : self.path.clone()
382 0 : }
383 : }
384 :
385 : /// A builder object for constructing a new delta layer.
386 : ///
387 : /// Usage:
388 : ///
389 : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
390 : ///
391 : /// 2. Write the contents by calling `put_value` for every page
392 : /// version to store in the layer.
393 : ///
394 : /// 3. Call `finish`.
395 : ///
396 : struct DeltaLayerWriterInner {
397 : pub path: Utf8PathBuf,
398 : timeline_id: TimelineId,
399 : tenant_shard_id: TenantShardId,
400 :
401 : key_start: Key,
402 : lsn_range: Range<Lsn>,
403 :
404 : tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>,
405 :
406 : blob_writer: BlobWriter<TempVirtualFile>,
407 :
408 : // Number of key-LSN pairs in the layer.
409 : num_keys: usize,
410 : }
411 :
412 : impl DeltaLayerWriterInner {
413 : ///
414 : /// Start building a new delta layer.
415 : ///
416 : #[allow(clippy::too_many_arguments)]
417 749 : async fn new(
418 749 : conf: &'static PageServerConf,
419 749 : timeline_id: TimelineId,
420 749 : tenant_shard_id: TenantShardId,
421 749 : key_start: Key,
422 749 : lsn_range: Range<Lsn>,
423 749 : gate: &utils::sync::gate::Gate,
424 749 : cancel: CancellationToken,
425 749 : ctx: &RequestContext,
426 749 : ) -> anyhow::Result<Self> {
427 : // Create the file initially with a temporary filename. We don't know
428 : // the end key yet, so we cannot form the final filename yet. We will
429 : // rename it when we're done.
430 749 : let path =
431 749 : DeltaLayer::temp_path_for(conf, &tenant_shard_id, &timeline_id, key_start, &lsn_range);
432 749 : let file = TempVirtualFile::new(
433 749 : VirtualFile::open_with_options_v2(
434 749 : &path,
435 749 : virtual_file::OpenOptions::new()
436 749 : .create_new(true)
437 749 : .write(true),
438 749 : ctx,
439 749 : )
440 749 : .await?,
441 749 : gate.enter()?,
442 : );
443 :
444 : // Start at PAGE_SZ, make room for the header block
445 749 : let blob_writer = BlobWriter::new(
446 749 : file,
447 749 : PAGE_SZ as u64,
448 749 : gate,
449 749 : cancel,
450 749 : ctx,
451 749 : info_span!(parent: None, "delta_layer_writer_flush_task", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %path),
452 0 : )?;
453 :
454 : // Initialize the b-tree index builder
455 749 : let block_buf = BlockBuf::new();
456 749 : let tree_builder = DiskBtreeBuilder::new(block_buf);
457 :
458 749 : Ok(Self {
459 749 : path,
460 749 : timeline_id,
461 749 : tenant_shard_id,
462 749 : key_start,
463 749 : lsn_range,
464 749 : tree: tree_builder,
465 749 : blob_writer,
466 749 : num_keys: 0,
467 749 : })
468 749 : }
469 :
470 : ///
471 : /// Append a key-value pair to the file.
472 : ///
473 : /// The values must be appended in key, lsn order.
474 : ///
475 1052659 : async fn put_value(
476 1052659 : &mut self,
477 1052659 : key: Key,
478 1052659 : lsn: Lsn,
479 1052659 : val: Value,
480 1052659 : ctx: &RequestContext,
481 1052659 : ) -> Result<(), PutError> {
482 1052659 : let (_, res) = self
483 1052659 : .put_value_bytes(
484 1052659 : key,
485 1052659 : lsn,
486 1052659 : Value::ser(&val)
487 1052659 : .map_err(anyhow::Error::new)
488 1052659 : .map_err(PutError::Other)?
489 1052659 : .slice_len(),
490 1052659 : val.will_init(),
491 1052659 : ctx,
492 : )
493 1052659 : .await;
494 1052659 : res
495 1052659 : }
496 :
497 3245991 : async fn put_value_bytes<Buf>(
498 3245991 : &mut self,
499 3245991 : key: Key,
500 3245991 : lsn: Lsn,
501 3245991 : val: FullSlice<Buf>,
502 3245991 : will_init: bool,
503 3245991 : ctx: &RequestContext,
504 3245991 : ) -> (FullSlice<Buf>, Result<(), PutError>)
505 3245991 : where
506 3245991 : Buf: IoBuf + Send,
507 3245991 : {
508 3245991 : assert!(
509 3245991 : self.lsn_range.start <= lsn,
510 0 : "lsn_start={}, lsn={}",
511 : self.lsn_range.start,
512 : lsn
513 : );
514 : // We don't want to use compression in delta layer creation
515 3245991 : let compression = ImageCompressionAlgorithm::Disabled;
516 3245991 : let (val, res) = self
517 3245991 : .blob_writer
518 3245991 : .write_blob_maybe_compressed(val, ctx, compression)
519 3245991 : .await;
520 3245991 : let res = res.map_err(PutError::WriteBlob);
521 3245991 : let off = match res {
522 3245991 : Ok((off, _)) => off,
523 0 : Err(e) => return (val, Err(e)),
524 : };
525 :
526 3245991 : let blob_ref = BlobRef::new(off, will_init);
527 :
528 3245991 : let delta_key = DeltaKey::from_key_lsn(&key, lsn);
529 3245991 : let res = self
530 3245991 : .tree
531 3245991 : .append(&delta_key.0, blob_ref.0)
532 3245991 : .map_err(anyhow::Error::new)
533 3245991 : .map_err(PutError::Other);
534 :
535 3245991 : self.num_keys += 1;
536 :
537 3245991 : (val, res)
538 3245991 : }
539 :
540 1017680 : fn size(&self) -> u64 {
541 1017680 : self.blob_writer.size() + self.tree.borrow_writer().size()
542 1017680 : }
543 :
544 : ///
545 : /// Finish writing the delta layer.
546 : ///
547 736 : async fn finish(
548 736 : self,
549 736 : key_end: Key,
550 736 : ctx: &RequestContext,
551 736 : ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
552 736 : let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
553 :
554 736 : let file = self
555 736 : .blob_writer
556 736 : .shutdown(
557 736 : BufferedWriterShutdownMode::ZeroPadToNextMultiple(PAGE_SZ),
558 736 : ctx,
559 736 : )
560 736 : .await?;
561 :
562 : // Write out the index
563 736 : let (index_root_blk, block_buf) = self.tree.finish()?;
564 736 : let mut offset = index_start_blk as u64 * PAGE_SZ as u64;
565 :
566 : // TODO(yuchen): https://github.com/neondatabase/neon/issues/10092
567 : // Should we just replace BlockBuf::blocks with one big buffer?
568 7600 : for buf in block_buf.blocks {
569 6864 : let (_buf, res) = file.write_all_at(buf.slice_len(), offset, ctx).await;
570 6864 : res?;
571 6864 : offset += PAGE_SZ as u64;
572 : }
573 736 : assert!(self.lsn_range.start < self.lsn_range.end);
574 : // Fill in the summary on blk 0
575 736 : let summary = Summary {
576 736 : magic: DELTA_FILE_MAGIC,
577 736 : format_version: STORAGE_FORMAT_VERSION,
578 736 : tenant_id: self.tenant_shard_id.tenant_id,
579 736 : timeline_id: self.timeline_id,
580 736 : key_range: self.key_start..key_end,
581 736 : lsn_range: self.lsn_range.clone(),
582 736 : index_start_blk,
583 736 : index_root_blk,
584 736 : };
585 :
586 : // Writes summary at the first block (offset 0).
587 736 : let buf = summary.ser_into_page()?;
588 736 : let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
589 736 : res?;
590 :
591 736 : let metadata = file
592 736 : .metadata()
593 736 : .await
594 736 : .context("get file metadata to determine size")?;
595 :
596 : // 5GB limit for objects without multipart upload (which we don't want to use)
597 : // Make it a little bit below to account for differing GB units
598 : // https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html
599 736 : ensure!(
600 736 : metadata.len() <= S3_UPLOAD_LIMIT,
601 0 : "Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
602 0 : file.path(),
603 0 : metadata.len()
604 : );
605 :
606 : // Note: Because we opened the file in write-only mode, we cannot
607 : // reuse the same VirtualFile for reading later. That's why we don't
608 : // set inner.file here. The first read will have to re-open it.
609 :
610 736 : let desc = PersistentLayerDesc::new_delta(
611 736 : self.tenant_shard_id,
612 736 : self.timeline_id,
613 736 : self.key_start..key_end,
614 736 : self.lsn_range.clone(),
615 736 : metadata.len(),
616 : );
617 :
618 : // fsync the file
619 736 : file.sync_all()
620 736 : .await
621 736 : .maybe_fatal_err("delta_layer sync_all")?;
622 :
623 736 : trace!("created delta layer {}", self.path);
624 :
625 : // The gate guard stored in the temp file is dropped. Callers (e.g. the flush loop or compaction)
626 : // also keep the gate open, so it's safe for them to rename the file to its final destination.
627 736 : file.disarm_into_inner();
628 :
629 736 : Ok((desc, self.path))
630 736 : }
631 : }
632 :
633 : /// A builder object for constructing a new delta layer.
634 : ///
635 : /// Usage:
636 : ///
637 : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
638 : ///
639 : /// 2. Write the contents by calling `put_value` for every page
640 : /// version to store in the layer.
641 : ///
642 : /// 3. Call `finish`.
643 : ///
644 : /// # Note
645 : ///
646 : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
647 : /// possible for the writer to be dropped before `finish` is actually called. This
648 : /// could leave odd temporary files in the directory, exhausting the file system.
649 : /// This structure wraps `DeltaLayerWriterInner` and also contains a `Drop`
650 : /// implementation that cleans up the temporary file on failure. It's not
651 : /// possible to do this directly in `DeltaLayerWriterInner` since `finish` moves
652 : /// out some fields, making it impossible to implement `Drop`.
653 : ///
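/// A minimal usage sketch (the variables passed in are placeholders for
/// values the caller already has):
///
/// ```text
/// let mut writer = DeltaLayerWriter::new(
///     conf, timeline_id, tenant_shard_id, key_start, lsn_start..lsn_end,
///     &gate, cancel.clone(), &ctx,
/// ).await?;
/// writer.put_value(key, lsn, value, &ctx).await?;
/// let (desc, path) = writer.finish(key_end, &ctx).await?;
/// ```
///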
654 : #[must_use]
655 : pub struct DeltaLayerWriter {
656 : inner: Option<DeltaLayerWriterInner>,
657 : }
658 :
659 : impl DeltaLayerWriter {
660 : ///
661 : /// Start building a new delta layer.
662 : ///
663 : #[allow(clippy::too_many_arguments)]
664 749 : pub async fn new(
665 749 : conf: &'static PageServerConf,
666 749 : timeline_id: TimelineId,
667 749 : tenant_shard_id: TenantShardId,
668 749 : key_start: Key,
669 749 : lsn_range: Range<Lsn>,
670 749 : gate: &utils::sync::gate::Gate,
671 749 : cancel: CancellationToken,
672 749 : ctx: &RequestContext,
673 749 : ) -> anyhow::Result<Self> {
674 : Ok(Self {
675 : inner: Some(
676 749 : DeltaLayerWriterInner::new(
677 749 : conf,
678 749 : timeline_id,
679 749 : tenant_shard_id,
680 749 : key_start,
681 749 : lsn_range,
682 749 : gate,
683 749 : cancel,
684 749 : ctx,
685 749 : )
686 749 : .await?,
687 : ),
688 : })
689 749 : }
690 :
691 0 : pub fn is_empty(&self) -> bool {
692 0 : self.inner.as_ref().unwrap().num_keys == 0
693 0 : }
694 :
695 : ///
696 : /// Append a key-value pair to the file.
697 : ///
698 : /// The values must be appended in key, lsn order.
699 : ///
700 1052659 : pub async fn put_value(
701 1052659 : &mut self,
702 1052659 : key: Key,
703 1052659 : lsn: Lsn,
704 1052659 : val: Value,
705 1052659 : ctx: &RequestContext,
706 1052659 : ) -> Result<(), PutError> {
707 1052659 : self.inner
708 1052659 : .as_mut()
709 1052659 : .unwrap()
710 1052659 : .put_value(key, lsn, val, ctx)
711 1052659 : .await
712 1052659 : }
713 :
714 2193332 : pub async fn put_value_bytes<Buf>(
715 2193332 : &mut self,
716 2193332 : key: Key,
717 2193332 : lsn: Lsn,
718 2193332 : val: FullSlice<Buf>,
719 2193332 : will_init: bool,
720 2193332 : ctx: &RequestContext,
721 2193332 : ) -> (FullSlice<Buf>, Result<(), PutError>)
722 2193332 : where
723 2193332 : Buf: IoBuf + Send,
724 2193332 : {
725 2193332 : self.inner
726 2193332 : .as_mut()
727 2193332 : .unwrap()
728 2193332 : .put_value_bytes(key, lsn, val, will_init, ctx)
729 2193332 : .await
730 2193332 : }
731 :
732 1017680 : pub fn size(&self) -> u64 {
733 1017680 : self.inner.as_ref().unwrap().size()
734 1017680 : }
735 :
736 : ///
737 : /// Finish writing the delta layer.
738 : ///
739 736 : pub(crate) async fn finish(
740 736 : mut self,
741 736 : key_end: Key,
742 736 : ctx: &RequestContext,
743 736 : ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
744 736 : self.inner.take().unwrap().finish(key_end, ctx).await
745 736 : }
746 :
747 6127 : pub(crate) fn num_keys(&self) -> usize {
748 6127 : self.inner.as_ref().unwrap().num_keys
749 6127 : }
750 :
751 7570 : pub(crate) fn estimated_size(&self) -> u64 {
752 7570 : let inner = self.inner.as_ref().unwrap();
753 7570 : inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64
754 7570 : }
755 : }
756 :
757 : #[derive(thiserror::Error, Debug)]
758 : pub enum RewriteSummaryError {
759 : #[error("magic mismatch")]
760 : MagicMismatch,
761 : #[error(transparent)]
762 : Other(#[from] anyhow::Error),
763 : }
764 :
765 : impl From<std::io::Error> for RewriteSummaryError {
766 0 : fn from(e: std::io::Error) -> Self {
767 0 : Self::Other(anyhow::anyhow!(e))
768 0 : }
769 : }
770 :
771 : impl DeltaLayer {
772 0 : pub async fn rewrite_summary<F>(
773 0 : path: &Utf8Path,
774 0 : rewrite: F,
775 0 : ctx: &RequestContext,
776 0 : ) -> Result<(), RewriteSummaryError>
777 0 : where
778 0 : F: Fn(Summary) -> Summary,
779 0 : {
780 0 : let file = VirtualFile::open_with_options_v2(
781 0 : path,
782 0 : virtual_file::OpenOptions::new().read(true).write(true),
783 0 : ctx,
784 0 : )
785 0 : .await
786 0 : .with_context(|| format!("Failed to open file '{path}'"))?;
787 0 : let file_id = page_cache::next_file_id();
788 0 : let block_reader = FileBlockReader::new(&file, file_id);
789 0 : let summary_blk = block_reader.read_blk(0, ctx).await?;
790 0 : let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
791 0 : if actual_summary.magic != DELTA_FILE_MAGIC {
792 0 : return Err(RewriteSummaryError::MagicMismatch);
793 0 : }
794 :
795 0 : let new_summary = rewrite(actual_summary);
796 :
797 0 : let buf = new_summary.ser_into_page().context("serialize")?;
798 0 : let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
799 0 : res?;
800 0 : Ok(())
801 0 : }
802 : }
803 :
804 : impl DeltaLayerInner {
805 520 : pub(crate) fn key_range(&self) -> &Range<Key> {
806 520 : &self.layer_key_range
807 520 : }
808 :
809 520 : pub(crate) fn lsn_range(&self) -> &Range<Lsn> {
810 520 : &self.layer_lsn_range
811 520 : }
812 :
813 560 : pub(super) async fn load(
814 560 : path: &Utf8Path,
815 560 : summary: Option<Summary>,
816 560 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
817 560 : ctx: &RequestContext,
818 560 : ) -> anyhow::Result<Self> {
819 560 : let file = Arc::new(
820 560 : VirtualFile::open_v2(path, ctx)
821 560 : .await
822 560 : .context("open layer file")?,
823 : );
824 :
825 560 : let file_id = page_cache::next_file_id();
826 :
827 560 : let block_reader = FileBlockReader::new(&file, file_id);
828 :
829 560 : let summary_blk = block_reader
830 560 : .read_blk(0, ctx)
831 560 : .await
832 560 : .context("read first block")?;
833 :
834 : // TODO: this should be an assertion instead; see ImageLayerInner::load
835 560 : let actual_summary =
836 560 : Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
837 :
838 560 : if let Some(mut expected_summary) = summary {
839 : // production code path
840 560 : expected_summary.index_start_blk = actual_summary.index_start_blk;
841 560 : expected_summary.index_root_blk = actual_summary.index_root_blk;
842 : // mask out the timeline_id, but still require the layers to be from the same tenant
843 560 : expected_summary.timeline_id = actual_summary.timeline_id;
844 :
845 560 : if actual_summary != expected_summary {
846 0 : bail!(
847 0 : "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
848 : actual_summary,
849 : expected_summary
850 : );
851 560 : }
852 0 : }
853 :
854 560 : Ok(DeltaLayerInner {
855 560 : file,
856 560 : file_id,
857 560 : index_start_blk: actual_summary.index_start_blk,
858 560 : index_root_blk: actual_summary.index_root_blk,
859 560 : max_vectored_read_bytes,
860 560 : layer_key_range: actual_summary.key_range,
861 560 : layer_lsn_range: actual_summary.lsn_range,
862 560 : })
863 560 : }
864 :
865 : // Look up the keys in the provided keyspace and update
866 : // the reconstruct state with whatever is found.
867 : //
868 : // Currently, the index is visited for each range, but this
869 : // can be further optimised to visit the index only once.
870 124723 : pub(super) async fn get_values_reconstruct_data(
871 124723 : &self,
872 124723 : this: ResidentLayer,
873 124723 : keyspace: KeySpace,
874 124723 : lsn_range: Range<Lsn>,
875 124723 : reconstruct_state: &mut ValuesReconstructState,
876 124723 : ctx: &RequestContext,
877 124723 : ) -> Result<(), GetVectoredError> {
878 124723 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
879 124723 : let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
880 124723 : self.index_start_blk,
881 124723 : self.index_root_blk,
882 124723 : block_reader,
883 : );
884 :
885 124723 : let planner = VectoredReadPlanner::new(
886 124723 : self.max_vectored_read_bytes
887 124723 : .expect("Layer is loaded with max vectored bytes config")
888 124723 : .0
889 124723 : .into(),
890 : );
891 :
892 124723 : let data_end_offset = self.index_start_offset();
893 :
894 124723 : let reads = Self::plan_reads(
895 124723 : &keyspace,
896 124723 : lsn_range.clone(),
897 124723 : data_end_offset,
898 124723 : index_reader,
899 124723 : planner,
900 124723 : ctx,
901 124723 : )
902 124723 : .await
903 124723 : .map_err(GetVectoredError::Other)?;
904 :
905 124723 : self.do_reads_and_update_state(this, reads, reconstruct_state, ctx)
906 124723 : .await;
907 :
908 124723 : Ok(())
909 124723 : }
910 :
911 124824 : async fn plan_reads<Reader>(
912 124824 : keyspace: &KeySpace,
913 124824 : lsn_range: Range<Lsn>,
914 124824 : data_end_offset: u64,
915 124824 : index_reader: DiskBtreeReader<Reader, DELTA_KEY_SIZE>,
916 124824 : mut planner: VectoredReadPlanner,
917 124824 : ctx: &RequestContext,
918 124824 : ) -> anyhow::Result<Vec<VectoredRead>>
919 124824 : where
920 124824 : Reader: BlockReader + Clone,
921 124824 : {
922 124824 : let ctx = RequestContextBuilder::from(ctx)
923 124824 : .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
924 124824 : .attached_child();
925 :
926 127429 : for range in keyspace.ranges.iter() {
927 127429 : let mut range_end_handled = false;
928 :
929 127429 : let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start);
930 127429 : let index_stream = index_reader.clone().into_stream(&start_key.0, &ctx);
931 127429 : let mut index_stream = std::pin::pin!(index_stream);
932 :
933 1806369 : while let Some(index_entry) = index_stream.next().await {
934 1800698 : let (raw_key, value) = index_entry?;
935 1800698 : let key = Key::from_slice(&raw_key[..KEY_SIZE]);
936 1800698 : let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
937 1800698 : let blob_ref = BlobRef(value);
938 :
939 : // Lsns are not monotonically increasing across keys, so we don't assert on them.
940 1800698 : assert!(key >= range.start);
941 :
942 1800698 : let outside_lsn_range = !lsn_range.contains(&lsn);
943 :
944 1800698 : let flag = {
945 1800698 : if outside_lsn_range {
946 429069 : BlobFlag::Ignore
947 1371629 : } else if blob_ref.will_init() {
948 292986 : BlobFlag::ReplaceAll
949 : } else {
950 : // Usual path: add blob to the read
951 1078643 : BlobFlag::None
952 : }
953 : };
954 :
955 1800698 : if key >= range.end || (key.next() == range.end && lsn >= lsn_range.end) {
956 121758 : planner.handle_range_end(blob_ref.pos());
957 121758 : range_end_handled = true;
958 121758 : break;
959 1678940 : } else {
960 1678940 : planner.handle(key, lsn, blob_ref.pos(), flag);
961 1678940 : }
962 : }
963 :
964 127429 : if !range_end_handled {
965 5671 : tracing::debug!("Handling range end fallback at {}", data_end_offset);
966 5671 : planner.handle_range_end(data_end_offset);
967 121758 : }
968 : }
969 :
970 124824 : Ok(planner.finish())
971 124824 : }
972 :
973 124823 : fn get_min_read_buffer_size(
974 124823 : planned_reads: &[VectoredRead],
975 124823 : read_size_soft_max: usize,
976 124823 : ) -> usize {
977 124823 : let Some(largest_read) = planned_reads.iter().max_by_key(|read| read.size()) else {
978 45880 : return read_size_soft_max;
979 : };
980 :
981 78943 : let largest_read_size = largest_read.size();
982 78943 : if largest_read_size > read_size_soft_max {
983 : // If the read is oversized, it should only contain one key.
984 100 : let offenders = largest_read
985 100 : .blobs_at
986 100 : .as_slice()
987 100 : .iter()
988 100 : .filter_map(|(_, blob_meta)| {
989 100 : if blob_meta.key.is_rel_dir_key()
990 100 : || blob_meta.key == DBDIR_KEY
991 100 : || blob_meta.key.is_aux_file_key()
992 : {
993 : // The size of values for these keys is unbounded and can
994 : // grow very large in pathological cases.
995 0 : None
996 : } else {
997 100 : Some(format!("{}@{}", blob_meta.key, blob_meta.lsn))
998 : }
999 100 : })
1000 100 : .join(", ");
1001 :
1002 100 : if !offenders.is_empty() {
1003 100 : tracing::warn!(
1004 0 : "Oversized vectored read ({} > {}) for keys {}",
1005 : largest_read_size,
1006 : read_size_soft_max,
1007 : offenders
1008 : );
1009 0 : }
1010 78843 : }
1011 :
1012 78943 : largest_read_size
1013 124823 : }
1014 :
1015 124723 : async fn do_reads_and_update_state(
1016 124723 : &self,
1017 124723 : this: ResidentLayer,
1018 124723 : reads: Vec<VectoredRead>,
1019 124723 : reconstruct_state: &mut ValuesReconstructState,
1020 124723 : ctx: &RequestContext,
1021 124723 : ) {
1022 124723 : let max_vectored_read_bytes = self
1023 124723 : .max_vectored_read_bytes
1024 124723 : .expect("Layer is loaded with max vectored bytes config")
1025 124723 : .0
1026 124723 : .into();
1027 124723 : let buf_size = Self::get_min_read_buffer_size(&reads, max_vectored_read_bytes);
1028 :
1029 : // Note that reads are processed in reverse order (from highest key+lsn).
1030 : // This is the order that `ReconstructState` requires such that it can
1031 : // track when a key is done.
1032 124723 : for read in reads.into_iter().rev() {
1033 88817 : let mut ios: HashMap<(Key, Lsn), OnDiskValueIo> = Default::default();
1034 797352 : for (_, blob_meta) in read.blobs_at.as_slice().iter().rev() {
1035 797352 : let io = reconstruct_state.update_key(
1036 797352 : &blob_meta.key,
1037 797352 : blob_meta.lsn,
1038 797352 : blob_meta.will_init,
1039 797352 : );
1040 797352 : ios.insert((blob_meta.key, blob_meta.lsn), io);
1041 797352 : }
1042 :
1043 88817 : let read_extend_residency = this.clone();
1044 88817 : let read_from = self.file.clone();
1045 88817 : let read_ctx = ctx.attached_child();
1046 88817 : reconstruct_state
1047 88817 : .spawn_io(async move {
1048 88817 : let vectored_blob_reader = VectoredBlobReader::new(&read_from);
1049 88817 : let buf = IoBufferMut::with_capacity(buf_size);
1050 :
1051 88817 : let res = vectored_blob_reader.read_blobs(&read, buf, &read_ctx).await;
1052 88817 : match res {
1053 88817 : Ok(blobs_buf) => {
1054 88817 : let view = BufView::new_slice(&blobs_buf.buf);
1055 797352 : for meta in blobs_buf.blobs.iter().rev() {
1056 797352 : let io = ios.remove(&(meta.meta.key, meta.meta.lsn)).unwrap();
1057 :
1058 797352 : let blob_read = meta.read(&view).await;
1059 797352 : let blob_read = match blob_read {
1060 797352 : Ok(buf) => buf,
1061 0 : Err(e) => {
1062 0 : io.complete(Err(e));
1063 0 : continue;
1064 : }
1065 : };
1066 :
1067 797352 : io.complete(Ok(OnDiskValue::WalRecordOrImage(
1068 797352 : blob_read.into_bytes(),
1069 797352 : )));
1070 : }
1071 :
1072 88817 : assert!(ios.is_empty());
1073 : }
1074 0 : Err(err) => {
1075 0 : for (_, sender) in ios {
1076 0 : sender.complete(Err(std::io::Error::new(
1077 0 : err.kind(),
1078 0 : "vec read failed",
1079 0 : )));
1080 0 : }
1081 : }
1082 : }
1083 :
1084 : // keep layer resident until this IO is done; this spawned IO future generally outlives the
1085 : // call to `self` / the `Arc<DownloadedLayer>` / the `ResidentLayer` that guarantees residency
1086 88817 : drop(read_extend_residency);
1087 88817 : })
1088 88817 : .await;
1089 : }
1090 124723 : }
1091 :
1092 203 : pub(crate) async fn index_entries<'a>(
1093 203 : &'a self,
1094 203 : ctx: &RequestContext,
1095 203 : ) -> Result<Vec<DeltaEntry<'a>>> {
1096 203 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
1097 203 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1098 203 : self.index_start_blk,
1099 203 : self.index_root_blk,
1100 203 : block_reader,
1101 : );
1102 :
1103 203 : let mut all_keys: Vec<DeltaEntry<'_>> = Vec::new();
1104 :
1105 203 : tree_reader
1106 203 : .visit(
1107 203 : &[0u8; DELTA_KEY_SIZE],
1108 203 : VisitDirection::Forwards,
1109 1032023 : |key, value| {
1110 1032023 : let delta_key = DeltaKey::from_slice(key);
1111 1032023 : let val_ref = ValueRef {
1112 1032023 : blob_ref: BlobRef(value),
1113 1032023 : layer: self,
1114 1032023 : };
1115 1032023 : let pos = BlobRef(value).pos();
1116 1032023 : if let Some(last) = all_keys.last_mut() {
1117 1031820 : // subtract offset of the current and last entries to get the size
1118 1031820 : // of the value associated with this (key, lsn) tuple
1119 1031820 : let first_pos = last.size;
1120 1031820 : last.size = pos - first_pos;
1121 1031820 : }
1122 1032023 : let entry = DeltaEntry {
1123 1032023 : key: delta_key.key(),
1124 1032023 : lsn: delta_key.lsn(),
1125 1032023 : size: pos,
1126 1032023 : val: val_ref,
1127 1032023 : };
1128 1032023 : all_keys.push(entry);
1129 1032023 : true
1130 1032023 : },
1131 203 : &RequestContextBuilder::from(ctx)
1132 203 : .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
1133 203 : .attached_child(),
1134 : )
1135 203 : .await?;
1136 203 : if let Some(last) = all_keys.last_mut() {
1137 203 : // The last key occupies all space till the end of the value storage,
1138 203 : // which corresponds to the beginning of the index.
1139 203 : last.size = self.index_start_offset() - last.size;
1140 203 : }
1141 203 : Ok(all_keys)
1142 203 : }
1143 :
1144 : /// Using the given writer, write out a version containing only the entries with Lsns earlier than `until`.
1145 : ///
1146 : /// Returns the number of key-value records pushed to the writer.
1147 5 : pub(super) async fn copy_prefix(
1148 5 : &self,
1149 5 : writer: &mut DeltaLayerWriter,
1150 5 : until: Lsn,
1151 5 : ctx: &RequestContext,
1152 5 : ) -> anyhow::Result<usize> {
1153 : use futures::stream::TryStreamExt;
1154 :
1155 : use crate::tenant::vectored_blob_io::{
1156 : BlobMeta, ChunkedVectoredReadBuilder, VectoredReadExtended,
1157 : };
1158 :
1159 : #[derive(Debug)]
1160 : enum Item {
1161 : Actual(Key, Lsn, BlobRef),
1162 : Sentinel,
1163 : }
1164 :
1165 : impl From<Item> for Option<(Key, Lsn, BlobRef)> {
1166 35 : fn from(value: Item) -> Self {
1167 35 : match value {
1168 30 : Item::Actual(key, lsn, blob) => Some((key, lsn, blob)),
1169 5 : Item::Sentinel => None,
1170 : }
1171 35 : }
1172 : }
1173 :
1174 : impl Item {
1175 35 : fn offset(&self) -> Option<BlobRef> {
1176 35 : match self {
1177 30 : Item::Actual(_, _, blob) => Some(*blob),
1178 5 : Item::Sentinel => None,
1179 : }
1180 35 : }
1181 :
1182 35 : fn is_last(&self) -> bool {
1183 35 : matches!(self, Item::Sentinel)
1184 35 : }
1185 : }
1186 :
1187 5 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
1188 5 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1189 5 : self.index_start_blk,
1190 5 : self.index_root_blk,
1191 5 : block_reader,
1192 : );
1193 :
1194 5 : let stream = self.stream_index_forwards(tree_reader, &[0u8; DELTA_KEY_SIZE], ctx);
1195 30 : let stream = stream.map_ok(|(key, lsn, pos)| Item::Actual(key, lsn, pos));
1196 : // put in a sentinel value for getting the end offset for last item, and not having to
1197 : // repeat the whole read part
1198 5 : let stream = stream.chain(futures::stream::once(futures::future::ready(Ok(
1199 5 : Item::Sentinel,
1200 5 : ))));
1201 5 : let mut stream = std::pin::pin!(stream);
1202 :
1203 5 : let mut prev: Option<(Key, Lsn, BlobRef)> = None;
1204 :
1205 5 : let mut read_builder: Option<ChunkedVectoredReadBuilder> = None;
1206 :
1207 5 : let max_read_size = self
1208 5 : .max_vectored_read_bytes
1209 5 : .map(|x| x.0.get())
1210 5 : .unwrap_or(8192);
1211 :
1212 5 : let mut buffer = Some(IoBufferMut::with_capacity(max_read_size));
1213 :
1214 : // FIXME: buffering of DeltaLayerWriter
1215 5 : let mut per_blob_copy = Vec::new();
1216 :
1217 5 : let mut records = 0;
1218 :
1219 40 : while let Some(item) = stream.try_next().await? {
1220 35 : tracing::debug!(?item, "popped");
1221 35 : let offset = item
1222 35 : .offset()
1223 35 : .unwrap_or(BlobRef::new(self.index_start_offset(), false));
1224 :
1225 35 : let actionable = if let Some((key, lsn, start_offset)) = prev.take() {
1226 30 : let end_offset = offset;
1227 :
1228 30 : Some((
1229 30 : BlobMeta {
1230 30 : key,
1231 30 : lsn,
1232 30 : will_init: false,
1233 30 : },
1234 30 : start_offset..end_offset,
1235 30 : ))
1236 : } else {
1237 5 : None
1238 : };
1239 :
1240 35 : let is_last = item.is_last();
1241 :
1242 35 : prev = Option::from(item);
1243 :
1244 35 : let actionable = actionable.filter(|x| x.0.lsn < until);
1245 :
1246 35 : let builder = if let Some((meta, offsets)) = actionable {
1247 : // extend or create a new builder
1248 16 : if read_builder
1249 16 : .as_mut()
1250 16 : .map(|x| x.extend(offsets.start.pos(), offsets.end.pos(), meta))
1251 16 : .unwrap_or(VectoredReadExtended::No)
1252 16 : == VectoredReadExtended::Yes
1253 : {
1254 8 : None
1255 : } else {
1256 8 : read_builder.replace(ChunkedVectoredReadBuilder::new(
1257 8 : offsets.start.pos(),
1258 8 : offsets.end.pos(),
1259 8 : meta,
1260 8 : max_read_size,
1261 : ))
1262 : }
1263 : } else {
1264 : // nothing to do, except perhaps flush an existing builder for the last element
1265 19 : None
1266 : };
1267 :
1268 : // flush the possible older builder and also the new one if the item was the last one
1269 35 : let builders = builder.into_iter();
1270 35 : let builders = if is_last {
1271 5 : builders.chain(read_builder.take())
1272 : } else {
1273 30 : builders.chain(None)
1274 : };
1275 :
1276 43 : for builder in builders {
1277 8 : let read = builder.build();
1278 :
1279 8 : let reader = VectoredBlobReader::new(&self.file);
1280 :
1281 8 : let mut buf = buffer.take().unwrap();
1282 :
1283 8 : buf.clear();
1284 8 : buf.reserve(read.size());
1285 8 : let res = reader.read_blobs(&read, buf, ctx).await?;
1286 :
1287 8 : let view = BufView::new_slice(&res.buf);
1288 :
1289 24 : for blob in res.blobs {
1290 16 : let key = blob.meta.key;
1291 16 : let lsn = blob.meta.lsn;
1292 :
1293 16 : let data = blob.read(&view).await?;
1294 :
1295 : #[cfg(debug_assertions)]
1296 16 : Value::des(&data)
1297 16 : .with_context(|| {
1298 0 : format!(
1299 0 : "blob failed to deserialize for {}: {:?}",
1300 : blob,
1301 0 : utils::Hex(&data)
1302 : )
1303 0 : })
1304 16 : .unwrap();
1305 :
1306 : // is it an image or will_init walrecord?
1307 : // FIXME: this could be handled by threading the BlobRef to the
1308 : // VectoredReadBuilder
1309 16 : let will_init = wal_decoder::models::value::ValueBytes::will_init(&data)
1310 16 : .inspect_err(|_e| {
1311 : #[cfg(feature = "testing")]
1312 0 : tracing::error!(data=?utils::Hex(&data), err=?_e, %key, %lsn, "failed to parse will_init out of serialized value");
1313 0 : })
1314 16 : .unwrap_or(false);
1315 :
1316 16 : per_blob_copy.clear();
1317 16 : per_blob_copy.extend_from_slice(&data);
1318 :
1319 16 : let (tmp, res) = writer
1320 16 : .put_value_bytes(
1321 16 : key,
1322 16 : lsn,
1323 16 : std::mem::take(&mut per_blob_copy).slice_len(),
1324 16 : will_init,
1325 16 : ctx,
1326 : )
1327 16 : .await;
1328 16 : per_blob_copy = tmp.into_raw_slice().into_inner();
1329 :
1330 16 : res?;
1331 :
1332 16 : records += 1;
1333 : }
1334 :
1335 8 : buffer = Some(res.buf);
1336 : }
1337 : }
1338 :
1339 5 : assert!(
1340 5 : read_builder.is_none(),
1341 0 : "with the sentinel above loop should had handled all"
1342 : );
1343 :
1344 5 : Ok(records)
1345 5 : }
1346 :
1347 2 : pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
1348 2 : println!(
1349 2 : "index_start_blk: {}, root {}",
1350 : self.index_start_blk, self.index_root_blk
1351 : );
1352 :
1353 2 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
1354 2 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1355 2 : self.index_start_blk,
1356 2 : self.index_root_blk,
1357 2 : block_reader,
1358 : );
1359 :
1360 2 : tree_reader.dump(ctx).await?;
1361 :
1362 2 : let keys = self.index_entries(ctx).await?;
1363 :
1364 4 : async fn dump_blob(val: &ValueRef<'_>, ctx: &RequestContext) -> anyhow::Result<String> {
1365 4 : let buf = val.load_raw(ctx).await?;
1366 4 : let val = Value::des(&buf)?;
1367 4 : let desc = match val {
1368 4 : Value::Image(img) => {
1369 4 : format!(" img {} bytes", img.len())
1370 : }
1371 0 : Value::WalRecord(rec) => {
1372 0 : let wal_desc = wal_decoder::models::record::describe_wal_record(&rec)?;
1373 0 : format!(
1374 0 : " rec {} bytes will_init: {} {}",
1375 0 : buf.len(),
1376 0 : rec.will_init(),
1377 : wal_desc
1378 : )
1379 : }
1380 : };
1381 4 : Ok(desc)
1382 4 : }
1383 :
1384 6 : for entry in keys {
1385 4 : let DeltaEntry { key, lsn, val, .. } = entry;
1386 4 : let desc = match dump_blob(&val, ctx).await {
1387 4 : Ok(desc) => desc,
1388 0 : Err(err) => {
1389 0 : format!("ERROR: {err}")
1390 : }
1391 : };
1392 4 : println!(" key {key} at {lsn}: {desc}");
1393 :
1394 : // Print more details about CHECKPOINT records. Would be nice to print details
1395 : // of many other record types too, but these are particularly interesting, as
1396 : // there is a lot of special processing for them in walingest.rs.
1397 : use pageserver_api::key::CHECKPOINT_KEY;
1398 : use postgres_ffi::CheckPoint;
1399 4 : if key == CHECKPOINT_KEY {
1400 0 : let val = val.load(ctx).await?;
1401 0 : match val {
1402 0 : Value::Image(img) => {
1403 0 : let checkpoint = CheckPoint::decode(&img)?;
1404 0 : println!(" CHECKPOINT: {checkpoint:?}");
1405 : }
1406 0 : Value::WalRecord(_rec) => {
1407 0 : println!(" unexpected walrecord value for checkpoint key");
1408 0 : }
1409 : }
1410 4 : }
1411 : }
1412 :
1413 2 : Ok(())
1414 2 : }
1415 :
1416 15 : fn stream_index_forwards<'a, R>(
1417 15 : &'a self,
1418 15 : reader: DiskBtreeReader<R, DELTA_KEY_SIZE>,
1419 15 : start: &'a [u8; DELTA_KEY_SIZE],
1420 15 : ctx: &'a RequestContext,
1421 15 : ) -> impl futures::stream::Stream<
1422 15 : Item = Result<(Key, Lsn, BlobRef), crate::tenant::disk_btree::DiskBtreeError>,
1423 15 : > + 'a
1424 15 : where
1425 15 : R: BlockReader + 'a,
1426 : {
1427 : use futures::stream::TryStreamExt;
1428 15 : let stream = reader.into_stream(start, ctx);
1429 76 : stream.map_ok(|(key, value)| {
1430 76 : let key = DeltaKey::from_slice(&key);
1431 76 : let (key, lsn) = (key.key(), key.lsn());
1432 76 : let offset = BlobRef(value);
1433 :
1434 76 : (key, lsn, offset)
1435 76 : })
1436 15 : }
1437 :
1438 : /// The file offset of the first block of the index.
1439 : ///
1440 : /// The file structure is summary, values, and index. We often need this for the size of the last blob.
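///
/// For example (assuming the usual 8 KiB `PAGE_SZ`): with `index_start_blk` = 3,
/// the index starts at byte offset 3 * 8192 = 24576, which is also where the
/// last blob's data ends.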
1441 125235 : fn index_start_offset(&self) -> u64 {
1442 125235 : let offset = self.index_start_blk as u64 * PAGE_SZ as u64;
1443 125235 : let bref = BlobRef(offset);
1444 125235 : tracing::debug!(
1445 : index_start_blk = self.index_start_blk,
1446 : offset,
1447 0 : pos = bref.pos(),
1448 0 : "index_start_offset"
1449 : );
1450 125235 : offset
1451 125235 : }
1452 :
1453 288 : pub fn iter_with_options<'a>(
1454 288 : &'a self,
1455 288 : ctx: &'a RequestContext,
1456 288 : max_read_size: u64,
1457 288 : max_batch_size: usize,
1458 288 : ) -> DeltaLayerIterator<'a> {
1459 288 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
1460 288 : let tree_reader =
1461 288 : DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
1462 288 : DeltaLayerIterator {
1463 288 : delta_layer: self,
1464 288 : ctx,
1465 288 : index_iter: tree_reader.iter(&[0; DELTA_KEY_SIZE], ctx),
1466 288 : key_values_batch: std::collections::VecDeque::new(),
1467 288 : is_end: false,
1468 288 : planner: StreamingVectoredReadPlanner::new(max_read_size, max_batch_size),
1469 288 : }
1470 288 : }
1471 :
1472 : /// NB: not super efficient, but not terrible either. Should probably be an iterator.
1473 : //
1474 : // We're reusing the index traversal logic in plan_reads; would be nice to
1475 : // factor that out.
1476 0 : pub(crate) async fn load_keys(&self, ctx: &RequestContext) -> anyhow::Result<Vec<Key>> {
1477 0 : self.index_entries(ctx)
1478 0 : .await
1479 0 : .map(|entries| entries.into_iter().map(|entry| entry.key).collect())
1480 0 : }
1481 : }
1482 :
1483 : /// A set of data associated with a delta layer key and its value
1484 : pub struct DeltaEntry<'a> {
1485 : pub key: Key,
1486 : pub lsn: Lsn,
1487 : /// Size of the stored value
1488 : pub size: u64,
1489 : /// Reference to the on-disk value
1490 : pub val: ValueRef<'a>,
1491 : }
1492 :
1493 : /// Reference to an on-disk value
1494 : pub struct ValueRef<'a> {
1495 : blob_ref: BlobRef,
1496 : layer: &'a DeltaLayerInner,
1497 : }
1498 :
1499 : impl ValueRef<'_> {
1500 : /// Loads the value from disk
1501 0 : pub async fn load(&self, ctx: &RequestContext) -> Result<Value> {
1502 0 : let buf = self.load_raw(ctx).await?;
1503 0 : let val = Value::des(&buf)?;
1504 0 : Ok(val)
1505 0 : }
1506 :
1507 4 : async fn load_raw(&self, ctx: &RequestContext) -> Result<Vec<u8>> {
1508 4 : let reader = BlockCursor::new(crate::tenant::block_io::BlockReaderRef::Adapter(Adapter(
1509 4 : self.layer,
1510 4 : )));
1511 4 : let buf = reader.read_blob(self.blob_ref.pos(), ctx).await?;
1512 4 : Ok(buf)
1513 4 : }
1514 : }
1515 :
1516 : pub(crate) struct Adapter<T>(T);
1517 :
1518 : impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
1519 4 : pub(crate) async fn read_blk(
1520 4 : &self,
1521 4 : blknum: u32,
1522 4 : ctx: &RequestContext,
1523 4 : ) -> Result<BlockLease, std::io::Error> {
1524 4 : let block_reader = FileBlockReader::new(&self.0.as_ref().file, self.0.as_ref().file_id);
1525 4 : block_reader.read_blk(blknum, ctx).await
1526 4 : }
1527 : }
1528 :
1529 : impl AsRef<DeltaLayerInner> for DeltaLayerInner {
1530 8 : fn as_ref(&self) -> &DeltaLayerInner {
1531 8 : self
1532 8 : }
1533 : }
1534 :
1535 : impl<'a> pageserver_compaction::interface::CompactionDeltaEntry<'a, Key> for DeltaEntry<'a> {
1536 0 : fn key(&self) -> Key {
1537 0 : self.key
1538 0 : }
1539 0 : fn lsn(&self) -> Lsn {
1540 0 : self.lsn
1541 0 : }
1542 0 : fn size(&self) -> u64 {
1543 0 : self.size
1544 0 : }
1545 : }
1546 :
1547 : pub struct DeltaLayerIterator<'a> {
1548 : delta_layer: &'a DeltaLayerInner,
1549 : ctx: &'a RequestContext,
1550 : planner: StreamingVectoredReadPlanner,
1551 : index_iter: DiskBtreeIterator<'a>,
1552 : key_values_batch: VecDeque<(Key, Lsn, Value)>,
1553 : is_end: bool,
1554 : }
1555 :
1556 : impl DeltaLayerIterator<'_> {
1557 0 : pub(crate) fn layer_dbg_info(&self) -> String {
1558 0 : self.delta_layer.layer_dbg_info()
1559 0 : }
1560 :
1561 : /// Retrieve a batch of key-value pairs into the iterator buffer.
1562 10703 : async fn next_batch(&mut self) -> anyhow::Result<()> {
1563 10703 : assert!(self.key_values_batch.is_empty());
1564 10703 : assert!(!self.is_end);
1565 :
1566 10703 : let plan = loop {
1567 1049815 : if let Some(res) = self.index_iter.next().await {
1568 1049541 : let (raw_key, value) = res?;
1569 1049541 : let key = Key::from_slice(&raw_key[..KEY_SIZE]);
1570 1049541 : let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
1571 1049541 : let blob_ref = BlobRef(value);
1572 1049541 : let offset = blob_ref.pos();
1573 10429 : if let Some(batch_plan) =
1574 1049541 : self.planner.handle(key, lsn, offset, blob_ref.will_init())
1575 : {
1576 10429 : break batch_plan;
1577 1039112 : }
1578 : } else {
1579 274 : self.is_end = true;
1580 274 : let data_end_offset = self.delta_layer.index_start_offset();
1581 274 : if let Some(item) = self.planner.handle_range_end(data_end_offset) {
1582 274 : break item;
1583 : } else {
1584 0 : return Ok(()); // TODO: test empty iterator
1585 : }
1586 : }
1587 : };
1588 10703 : let vectored_blob_reader = VectoredBlobReader::new(&self.delta_layer.file);
1589 10703 : let mut next_batch = std::collections::VecDeque::new();
1590 10703 : let buf_size = plan.size();
1591 10703 : let buf = IoBufferMut::with_capacity(buf_size);
1592 10703 : let blobs_buf = vectored_blob_reader
1593 10703 : .read_blobs(&plan, buf, self.ctx)
1594 10703 : .await?;
1595 10703 : let view = BufView::new_slice(&blobs_buf.buf);
1596 1049527 : for meta in blobs_buf.blobs.iter() {
1597 1049527 : let blob_read = meta.read(&view).await?;
1598 1049527 : let value = Value::des(&blob_read)?;
1599 :
1600 1049527 : next_batch.push_back((meta.meta.key, meta.meta.lsn, value));
1601 : }
1602 10703 : self.key_values_batch = next_batch;
1603 10703 : Ok(())
1604 10703 : }
1605 :
1606 1049921 : pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
1607 1049921 : if self.key_values_batch.is_empty() {
1608 11192 : if self.is_end {
1609 531 : return Ok(None);
1610 10661 : }
1611 10661 : self.next_batch().await?;
1612 1038729 : }
1613 1049390 : Ok(Some(
1614 1049390 : self.key_values_batch
1615 1049390 : .pop_front()
1616 1049390 : .expect("should not be empty"),
1617 1049390 : ))
1618 1049921 : }
1619 : }
1620 :
1621 : #[cfg(test)]
1622 : pub(crate) mod test {
1623 : use std::collections::BTreeMap;
1624 :
1625 : use super::*;
1626 : use crate::DEFAULT_PG_VERSION;
1627 : use crate::context::DownloadBehavior;
1628 : use crate::task_mgr::TaskKind;
1629 : use crate::tenant::disk_btree::tests::TestDisk;
1630 : use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
1631 : use crate::tenant::storage_layer::{Layer, ResidentLayer};
1632 : use crate::tenant::timeline::layer_manager::LayerManagerLockHolder;
1633 : use crate::tenant::{TenantShard, Timeline};
1634 : use bytes::Bytes;
1635 : use itertools::MinMaxResult;
1636 : use postgres_ffi::PgMajorVersion;
1637 : use rand::prelude::{SeedableRng, SliceRandom, StdRng};
1638 : use rand::{Rng, RngCore};
1639 :
1640 : /// Construct an index for a fictional delta layer and then
1641 : /// traverse in order to plan vectored reads for a query. Finally,
1642 : /// verify that the traversal fed the right index key and value
1643 : /// pairs into the planner.
1644 : #[tokio::test]
1645 1 : async fn test_delta_layer_index_traversal() {
1646 1 : let base_key = Key {
1647 1 : field1: 0,
1648 1 : field2: 1663,
1649 1 : field3: 12972,
1650 1 : field4: 16396,
1651 1 : field5: 0,
1652 1 : field6: 246080,
1653 1 : };
1654 :
1655 : // Populate the index with some entries
1656 1 : let entries: BTreeMap<Key, Vec<Lsn>> = BTreeMap::from([
1657 1 : (base_key, vec![Lsn(1), Lsn(5), Lsn(25), Lsn(26), Lsn(28)]),
1658 1 : (base_key.add(1), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
1659 1 : (base_key.add(2), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
1660 1 : (base_key.add(5), vec![Lsn(10), Lsn(15), Lsn(16), Lsn(20)]),
1661 1 : ]);
1662 :
1663 1 : let mut disk = TestDisk::default();
1664 1 : let mut writer = DiskBtreeBuilder::<_, DELTA_KEY_SIZE>::new(&mut disk);
1665 :
1666 1 : let mut disk_offset = 0;
1667 5 : for (key, lsns) in &entries {
1668 21 : for lsn in lsns {
1669 17 : let index_key = DeltaKey::from_key_lsn(key, *lsn);
1670 17 : let blob_ref = BlobRef::new(disk_offset, false);
1671 17 : writer
1672 17 : .append(&index_key.0, blob_ref.0)
1673 17 : .expect("In memory disk append should never fail");
1674 17 :
1675 17 : disk_offset += 1;
1676 17 : }
1677 : }
1678 :
1679 : // Prepare all the arguments for the call into `plan_reads` below
1680 1 : let (root_offset, _writer) = writer
1681 1 : .finish()
1682 1 : .expect("In memory disk finish should never fail");
1683 1 : let reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(0, root_offset, disk);
1684 1 : let planner = VectoredReadPlanner::new(100);
1685 1 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
1686 :
1687 1 : let keyspace = KeySpace {
1688 1 : ranges: vec![
1689 1 : base_key..base_key.add(3),
1690 1 : base_key.add(3)..base_key.add(100),
1691 1 : ],
1692 1 : };
1693 1 : let lsn_range = Lsn(2)..Lsn(40);
1694 :
1695 : // Plan and validate
1696 1 : let vectored_reads = DeltaLayerInner::plan_reads(
1697 1 : &keyspace,
1698 1 : lsn_range.clone(),
1699 1 : disk_offset,
1700 1 : reader,
1701 1 : planner,
1702 1 : &ctx,
1703 1 : )
1704 1 : .await
1705 1 : .expect("Read planning should not fail");
1706 :
1707 1 : validate(keyspace, lsn_range, vectored_reads, entries);
1708 1 : }
1709 :
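 : /// Check that the planned reads cover exactly those index entries that
 : /// fall inside both the queried keyspace and the LSN range, each at its
 : /// expected disk offset.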
1710 1 : fn validate(
1711 1 : keyspace: KeySpace,
1712 1 : lsn_range: Range<Lsn>,
1713 1 : vectored_reads: Vec<VectoredRead>,
1714 1 : index_entries: BTreeMap<Key, Vec<Lsn>>,
1715 1 : ) {
1716 : #[derive(Debug, PartialEq, Eq)]
1717 : struct BlobSpec {
1718 : key: Key,
1719 : lsn: Lsn,
1720 : at: u64,
1721 : }
1722 :
1723 1 : let mut planned_blobs = Vec::new();
1724 15 : for read in vectored_reads {
1725 14 : for (at, meta) in read.blobs_at.as_slice() {
1726 14 : planned_blobs.push(BlobSpec {
1727 14 : key: meta.key,
1728 14 : lsn: meta.lsn,
1729 14 : at: *at,
1730 14 : });
1731 14 : }
1732 : }
1733 :
1734 1 : let mut expected_blobs = Vec::new();
1735 1 : let mut disk_offset = 0;
1736 5 : for (key, lsns) in index_entries {
1737 21 : for lsn in lsns {
1738 21 : let key_included = keyspace.ranges.iter().any(|range| range.contains(&key));
1739 17 : let lsn_included = lsn_range.contains(&lsn);
1740 :
1741 17 : if key_included && lsn_included {
1742 14 : expected_blobs.push(BlobSpec {
1743 14 : key,
1744 14 : lsn,
1745 14 : at: disk_offset,
1746 14 : });
1747 14 : }
1748 :
1749 17 : disk_offset += 1;
1750 : }
1751 : }
1752 :
1753 1 : assert_eq!(planned_blobs, expected_blobs);
1754 1 : }
1755 :
1756 : mod constants {
1757 : use utils::lsn::Lsn;
1758 :
1759 : /// Base offset used by all LSNs in this test
1760 : pub(super) const LSN_OFFSET: Lsn = Lsn(0x08);
1761 : /// Number of unique keys included in the test data
1762 : pub(super) const KEY_COUNT: u8 = 60;
1763 : /// Max number of different LSNs for each key
1764 : pub(super) const MAX_ENTRIES_PER_KEY: u8 = 20;
1765 : /// Possible value sizes for each entry along with a probability weight
1766 : pub(super) const VALUE_SIZES: [(usize, u8); 3] = [(100, 2), (1024, 2), (1024 * 1024, 1)];
1767 : /// Probability that there will be a gap between the current key and the next one (33.3%)
1768 : pub(super) const KEY_GAP_CHANGES: [(bool, u8); 2] = [(true, 1), (false, 2)];
1769 : /// The minimum size of a key range in all the generated reads
1770 : pub(super) const MIN_RANGE_SIZE: i128 = 10;
1771 : /// The number of ranges included in each vectored read
1772 : pub(super) const RANGES_COUNT: u8 = 2;
1773 : /// The number of vectored reads performed
1774 : pub(super) const READS_COUNT: u8 = 100;
1775 : /// Soft max size of a vectored read. Will be violated if we have to read keys
1776 : /// with values larger than the limit
1777 : pub(super) const MAX_VECTORED_READ_BYTES: usize = 64 * 1024;
1778 : }
1779 :
1780 : struct Entry {
1781 : key: Key,
1782 : lsn: Lsn,
1783 : value: Vec<u8>,
1784 : }
1785 :
1786 1 : fn generate_entries(rng: &mut StdRng) -> Vec<Entry> {
1787 1 : let mut current_key = Key::MIN;
1788 :
1789 1 : let mut entries = Vec::new();
1790 61 : for _ in 0..constants::KEY_COUNT {
1791 60 : let count = rng.gen_range(1..constants::MAX_ENTRIES_PER_KEY);
1792 60 : let mut lsns_iter =
1793 1130 : std::iter::successors(Some(Lsn(constants::LSN_OFFSET.0 + 0x08)), |lsn| {
1794 1130 : Some(Lsn(lsn.0 + 0x08))
1795 1130 : });
1796 60 : let mut lsns = Vec::new();
1797 1190 : while lsns.len() < count as usize {
1798 1130 : let take = rng.gen_bool(0.5);
1799 1130 : let lsn = lsns_iter.next().unwrap();
1800 1130 : if take {
1801 556 : lsns.push(lsn);
1802 574 : }
1803 : }
1804 :
1805 616 : for lsn in lsns {
1806 556 : let size = constants::VALUE_SIZES
1807 556 : .choose_weighted(rng, |item| item.1)
1808 556 : .unwrap()
1809 : .0;
1810 556 : let mut buf = vec![0; size];
1811 556 : rng.fill_bytes(&mut buf);
1812 :
1813 556 : entries.push(Entry {
1814 556 : key: current_key,
1815 556 : lsn,
1816 556 : value: buf,
1817 556 : })
1818 : }
1819 :
1820 60 : let gap = constants::KEY_GAP_CHANGES
1821 60 : .choose_weighted(rng, |item| item.1)
1822 60 : .unwrap()
1823 : .0;
1824 60 : if gap {
1825 19 : current_key = current_key.add(2);
1826 41 : } else {
1827 41 : current_key = current_key.add(1);
1828 41 : }
1829 : }
1830 :
1831 1 : entries
1832 1 : }
1833 :
1834 : struct EntriesMeta {
1835 : key_range: Range<Key>,
1836 : lsn_range: Range<Lsn>,
1837 : index: BTreeMap<(Key, Lsn), Vec<u8>>,
1838 : }
1839 :
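 : /// Derive the covering key and LSN ranges (both end-exclusive) plus a
 : /// lookup index from (Key, Lsn) to the expected value bytes.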
1840 1 : fn get_entries_meta(entries: &[Entry]) -> EntriesMeta {
1841 1 : let key_range = match entries.iter().minmax_by_key(|e| e.key) {
1842 1 : MinMaxResult::MinMax(min, max) => min.key..max.key.next(),
1843 0 : _ => panic!("More than one entry is always expected"),
1844 : };
1845 :
1846 1 : let lsn_range = match entries.iter().minmax_by_key(|e| e.lsn) {
1847 1 : MinMaxResult::MinMax(min, max) => min.lsn..Lsn(max.lsn.0 + 1),
1848 0 : _ => panic!("More than one entry is always expected"),
1849 : };
1850 :
1851 1 : let mut index = BTreeMap::new();
1852 556 : for entry in entries.iter() {
1853 556 : index.insert((entry.key, entry.lsn), entry.value.clone());
1854 556 : }
1855 :
1856 1 : EntriesMeta {
1857 1 : key_range,
1858 1 : lsn_range,
1859 1 : index,
1860 1 : }
1861 1 : }
1862 :
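 : /// Pick RANGES_COUNT random, non-overlapping key ranges, each at least
 : /// MIN_RANGE_SIZE keys long unless clamped at the end of the key range.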
1863 100 : fn pick_random_keyspace(rng: &mut StdRng, key_range: &Range<Key>) -> KeySpace {
1864 100 : let start = key_range.start.to_i128();
1865 100 : let end = key_range.end.to_i128();
1866 :
1867 100 : let mut keyspace = KeySpace::default();
1868 :
1869 300 : for _ in 0..constants::RANGES_COUNT {
1870 200 : let mut range: Option<Range<Key>> = Option::default();
1871 622 : while range.is_none() || keyspace.overlaps(range.as_ref().unwrap()) {
1872 422 : let range_start = rng.gen_range(start..end);
1873 422 : let range_end_offset = range_start + constants::MIN_RANGE_SIZE;
1874 422 : if range_end_offset >= end {
1875 50 : range = Some(Key::from_i128(range_start)..Key::from_i128(end));
1876 372 : } else {
1877 372 : let range_end = rng.gen_range((range_start + constants::MIN_RANGE_SIZE)..end);
1878 372 : range = Some(Key::from_i128(range_start)..Key::from_i128(range_end));
1879 372 : }
1880 : }
1881 200 : keyspace.ranges.push(range.unwrap());
1882 : }
1883 :
1884 100 : keyspace
1885 100 : }
1886 :
1887 : #[tokio::test]
1888 1 : async fn test_delta_layer_vectored_read_end_to_end() -> anyhow::Result<()> {
1889 1 : let harness = TenantHarness::create("test_delta_layer_oversized_vectored_read").await?;
1890 1 : let (tenant, ctx) = harness.load().await;
1891 :
1892 1 : let timeline_id = TimelineId::generate();
1893 1 : let timeline = tenant
1894 1 : .create_test_timeline(timeline_id, constants::LSN_OFFSET, DEFAULT_PG_VERSION, &ctx)
1895 1 : .await?;
1896 :
1897 1 : tracing::info!("Generating test data ...");
1898 :
1899 1 : let rng = &mut StdRng::seed_from_u64(0);
1900 1 : let entries = generate_entries(rng);
1901 1 : let entries_meta = get_entries_meta(&entries);
1902 :
1903 1 : tracing::info!("Done generating {} entries", entries.len());
1904 :
1905 1 : tracing::info!("Writing test data to delta layer ...");
1906 1 : let mut writer = DeltaLayerWriter::new(
1907 1 : harness.conf,
1908 1 : timeline_id,
1909 1 : harness.tenant_shard_id,
1910 1 : entries_meta.key_range.start,
1911 1 : entries_meta.lsn_range.clone(),
1912 1 : &timeline.gate,
1913 1 : timeline.cancel.clone(),
1914 1 : &ctx,
1915 1 : )
1916 1 : .await?;
1917 :
1918 557 : for entry in entries {
1919 556 : let (_, res) = writer
1920 556 : .put_value_bytes(entry.key, entry.lsn, entry.value.slice_len(), false, &ctx)
1921 556 : .await;
1922 556 : res?;
1923 : }
1924 :
1925 1 : let (desc, path) = writer.finish(entries_meta.key_range.end, &ctx).await?;
1926 1 : let resident = Layer::finish_creating(harness.conf, &timeline, desc, &path)?;
1927 :
1928 1 : let inner = resident.get_as_delta(&ctx).await?;
1929 :
1930 1 : let file_size = inner.file.metadata().await?.len();
1931 1 : tracing::info!(
1932 0 : "Done writing test data to delta layer. Resulting file size is: {}",
1933 : file_size
1934 : );
1935 :
1936 101 : for i in 0..constants::READS_COUNT {
1937 100 : tracing::info!("Doing vectored read {}/{}", i + 1, constants::READS_COUNT);
1938 1 :
1939 100 : let block_reader = FileBlockReader::new(&inner.file, inner.file_id);
1940 100 : let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1941 100 : inner.index_start_blk,
1942 100 : inner.index_root_blk,
1943 100 : block_reader,
1944 1 : );
1945 1 :
1946 100 : let planner = VectoredReadPlanner::new(constants::MAX_VECTORED_READ_BYTES);
1947 100 : let keyspace = pick_random_keyspace(rng, &entries_meta.key_range);
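 : // The values section ends where the index begins: the first index block
 : // number times the page size bounds the read plan.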
1948 100 : let data_end_offset = inner.index_start_blk as u64 * PAGE_SZ as u64;
1949 1 :
1950 100 : let vectored_reads = DeltaLayerInner::plan_reads(
1951 100 : &keyspace,
1952 100 : entries_meta.lsn_range.clone(),
1953 100 : data_end_offset,
1954 100 : index_reader,
1955 100 : planner,
1956 100 : &ctx,
1957 100 : )
1958 100 : .await?;
1959 1 :
1960 100 : let vectored_blob_reader = VectoredBlobReader::new(&inner.file);
1961 100 : let buf_size = DeltaLayerInner::get_min_read_buffer_size(
1962 100 : &vectored_reads,
1963 1 : constants::MAX_VECTORED_READ_BYTES,
1964 1 : );
1965 100 : let mut buf = Some(IoBufferMut::with_capacity(buf_size));
1966 1 :
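 : // Reuse a single buffer across all reads: read_blobs takes ownership and
 : // hands it back via blobs_buf.buf at the bottom of the loop.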
1967 9962 : for read in vectored_reads {
1968 9862 : let blobs_buf = vectored_blob_reader
1969 9862 : .read_blobs(&read, buf.take().expect("Should have a buffer"), &ctx)
1970 9862 : .await?;
1971 9862 : let view = BufView::new_slice(&blobs_buf.buf);
1972 28652 : for meta in blobs_buf.blobs.iter() {
1973 28652 : let value = meta.read(&view).await?;
1974 28652 : assert_eq!(
1975 28652 : &value[..],
1976 28652 : &entries_meta.index[&(meta.meta.key, meta.meta.lsn)]
1977 1 : );
1978 1 : }
1979 1 :
1980 9862 : buf = Some(blobs_buf.buf);
1981 1 : }
1982 1 : }
1983 1 :
1984 1 : Ok(())
1985 1 : }
1986 :
1987 : #[tokio::test]
1988 1 : async fn copy_delta_prefix_smoke() {
1989 : use bytes::Bytes;
1990 : use wal_decoder::models::record::NeonWalRecord;
1991 :
1992 1 : let h = crate::tenant::harness::TenantHarness::create("truncate_delta_smoke")
1993 1 : .await
1994 1 : .unwrap();
1995 1 : let (tenant, ctx) = h.load().await;
1996 1 : let ctx = &ctx;
1997 1 : let timeline = tenant
1998 1 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), PgMajorVersion::PG14, ctx)
1999 1 : .await
2000 1 : .unwrap();
2001 1 : let ctx = &ctx.with_scope_timeline(&timeline);
2002 :
2003 1 : let initdb_layer = timeline
2004 1 : .layers
2005 1 : .read(crate::tenant::timeline::layer_manager::LayerManagerLockHolder::Testing)
2006 1 : .await
2007 1 : .likely_resident_layers()
2008 1 : .next()
2009 1 : .cloned()
2010 1 : .unwrap();
2011 :
2012 : {
2013 1 : let mut writer = timeline.writer().await;
2014 :
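 : // (lsn, key-as-i128, value) entries: key 12 gets a value at each LSN from
 : // 0x20 through 0x60, and key 9 a single image at 0x60, so every truncation
 : // point exercised below sits just above one entry's LSN.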
2015 1 : let data = [
2016 1 : (0x20, 12, Value::Image(Bytes::from_static(b"foobar"))),
2017 1 : (
2018 1 : 0x30,
2019 1 : 12,
2020 1 : Value::WalRecord(NeonWalRecord::Postgres {
2021 1 : will_init: false,
2022 1 : rec: Bytes::from_static(b"1"),
2023 1 : }),
2024 1 : ),
2025 1 : (
2026 1 : 0x40,
2027 1 : 12,
2028 1 : Value::WalRecord(NeonWalRecord::Postgres {
2029 1 : will_init: true,
2030 1 : rec: Bytes::from_static(b"2"),
2031 1 : }),
2032 1 : ),
2033 : // build an oversized value so we cannot extend an existing read over
2034 : // this
2035 : (
2036 : 0x50,
2037 : 12,
2038 : Value::WalRecord(NeonWalRecord::Postgres {
2039 : will_init: true,
2040 : rec: {
2041 1 : let mut buf =
2042 1 : vec![0u8; tenant.conf.max_vectored_read_bytes.0.get() + 1024];
2043 1 : buf.iter_mut()
2044 1 : .enumerate()
2045 134144 : .for_each(|(i, slot)| *slot = (i % 256) as u8);
2046 1 : Bytes::from(buf)
2047 : },
2048 : }),
2049 : ),
2050 : // because the oversized read cannot be extended further, we are sure to exercise the
2051 : // builder created on the last round with this:
2052 1 : (
2053 1 : 0x60,
2054 1 : 12,
2055 1 : Value::WalRecord(NeonWalRecord::Postgres {
2056 1 : will_init: true,
2057 1 : rec: Bytes::from_static(b"3"),
2058 1 : }),
2059 1 : ),
2060 1 : (
2061 1 : 0x60,
2062 1 : 9,
2063 1 : Value::Image(Bytes::from_static(b"something for a different key")),
2064 1 : ),
2065 : ];
2066 :
2067 1 : let mut last_lsn = None;
2068 :
2069 7 : for (lsn, key, value) in data {
2070 6 : let key = Key::from_i128(key);
2071 6 : writer.put(key, Lsn(lsn), &value, ctx).await.unwrap();
2072 6 : last_lsn = Some(lsn);
2073 : }
2074 :
2075 1 : writer.finish_write(Lsn(last_lsn.unwrap()));
2076 : }
2077 1 : timeline.freeze_and_flush().await.unwrap();
2078 :
2079 1 : let new_layer = timeline
2080 1 : .layers
2081 1 : .read(LayerManagerLockHolder::Testing)
2082 1 : .await
2083 1 : .likely_resident_layers()
2084 1 : .find(|&x| x != &initdb_layer)
2085 1 : .cloned()
2086 1 : .unwrap();
2087 :
2088 : // create a branch of the timeline, so we don't overwrite the original file
2089 1 : let branch = tenant
2090 1 : .branch_timeline_test(&timeline, TimelineId::generate(), None, ctx)
2091 1 : .await
2092 1 : .unwrap();
2093 :
2094 1 : assert_eq!(branch.get_ancestor_lsn(), Lsn(0x60));
2095 :
2096 : // truncating at 0x61 gives us a full copy; the earlier truncation points
2097 : // walk backwards until only a single key remains
2098 :
2099 6 : for truncate_at in [0x61, 0x51, 0x41, 0x31, 0x21] {
2100 5 : let truncate_at = Lsn(truncate_at);
2101 1 :
2102 5 : let mut writer = DeltaLayerWriter::new(
2103 5 : tenant.conf,
2104 5 : branch.timeline_id,
2105 5 : tenant.tenant_shard_id,
2106 5 : Key::MIN,
2107 5 : Lsn(0x11)..truncate_at,
2108 5 : &branch.gate,
2109 5 : branch.cancel.clone(),
2110 5 : ctx,
2111 5 : )
2112 5 : .await
2113 5 : .unwrap();
2114 1 :
2115 5 : let new_layer = new_layer.download_and_keep_resident(ctx).await.unwrap();
2116 1 :
2117 5 : new_layer
2118 5 : .copy_delta_prefix(&mut writer, truncate_at, ctx)
2119 5 : .await
2120 5 : .unwrap();
2121 1 :
2122 5 : let (desc, path) = writer.finish(Key::MAX, ctx).await.unwrap();
2123 5 : let copied_layer = Layer::finish_creating(tenant.conf, &branch, desc, &path).unwrap();
2124 1 :
2125 5 : copied_layer.get_as_delta(ctx).await.unwrap();
2126 1 :
2127 5 : assert_keys_and_values_eq(
2128 5 : new_layer.get_as_delta(ctx).await.unwrap(),
2129 5 : copied_layer.get_as_delta(ctx).await.unwrap(),
2130 5 : truncate_at,
2131 5 : ctx,
2132 1 : )
2133 5 : .await;
2134 1 : }
2135 1 : }
2136 :
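 : /// Walk both layers' indexes in lockstep, with the source filtered to
 : /// entries below `truncated_at`, and assert that keys, LSNs, and value
 : /// bytes all match.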
2137 5 : async fn assert_keys_and_values_eq(
2138 5 : source: &DeltaLayerInner,
2139 5 : truncated: &DeltaLayerInner,
2140 5 : truncated_at: Lsn,
2141 5 : ctx: &RequestContext,
2142 5 : ) {
2143 : use futures::future::ready;
2144 : use futures::stream::TryStreamExt;
2145 :
2146 5 : let start_key = [0u8; DELTA_KEY_SIZE];
2147 :
2148 5 : let source_reader = FileBlockReader::new(&source.file, source.file_id);
2149 5 : let source_tree = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
2150 5 : source.index_start_blk,
2151 5 : source.index_root_blk,
2152 5 : &source_reader,
2153 : );
2154 5 : let source_stream = source.stream_index_forwards(source_tree, &start_key, ctx);
2155 30 : let source_stream = source_stream.filter(|res| match res {
2156 30 : Ok((_, lsn, _)) => ready(lsn < &truncated_at),
2157 0 : _ => ready(true),
2158 30 : });
2159 5 : let mut source_stream = std::pin::pin!(source_stream);
2160 :
2161 5 : let truncated_reader = FileBlockReader::new(&truncated.file, truncated.file_id);
2162 5 : let truncated_tree = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
2163 5 : truncated.index_start_blk,
2164 5 : truncated.index_root_blk,
2165 5 : &truncated_reader,
2166 : );
2167 5 : let truncated_stream = truncated.stream_index_forwards(truncated_tree, &start_key, ctx);
2168 5 : let mut truncated_stream = std::pin::pin!(truncated_stream);
2169 :
2170 5 : let mut scratch_left = Vec::new();
2171 5 : let mut scratch_right = Vec::new();
2172 :
2173 : loop {
2174 21 : let (src, truncated) = (source_stream.try_next(), truncated_stream.try_next());
2175 21 : let (src, truncated) = tokio::try_join!(src, truncated).unwrap();
2176 :
2177 21 : if src.is_none() {
2178 5 : assert!(truncated.is_none());
2179 5 : break;
2180 16 : }
2181 :
2182 16 : let (src, truncated) = (src.unwrap(), truncated.unwrap());
2183 :
2184 : // because we've filtered the source by LSN, we should always have the same keys from both.
2185 16 : assert_eq!(src.0, truncated.0);
2186 16 : assert_eq!(src.1, truncated.1);
2187 :
2188 : // if this is needed for something else, just drop this assert.
2189 16 : assert!(
2190 16 : src.2.pos() >= truncated.2.pos(),
2191 0 : "value position should not go backwards {} vs. {}",
2192 0 : src.2.pos(),
2193 0 : truncated.2.pos()
2194 : );
2195 :
2196 16 : scratch_left.clear();
2197 16 : let src_cursor = source_reader.block_cursor();
2198 16 : let left = src_cursor.read_blob_into_buf(src.2.pos(), &mut scratch_left, ctx);
2199 16 : scratch_right.clear();
2200 16 : let trunc_cursor = truncated_reader.block_cursor();
2201 16 : let right = trunc_cursor.read_blob_into_buf(truncated.2.pos(), &mut scratch_right, ctx);
2202 :
2203 16 : tokio::try_join!(left, right).unwrap();
2204 :
2205 16 : assert_eq!(utils::Hex(&scratch_left), utils::Hex(&scratch_right));
2206 : }
2207 5 : }
2208 :
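 : /// Order deltas by (key, LSN), ignoring the value.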
2209 9112 : pub(crate) fn sort_delta(
2210 9112 : (k1, l1, _): &(Key, Lsn, Value),
2211 9112 : (k2, l2, _): &(Key, Lsn, Value),
2212 9112 : ) -> std::cmp::Ordering {
2213 9112 : (k1, l1).cmp(&(k2, l2))
2214 9112 : }
2215 :
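 : /// Order deltas by (key, LSN), placing images before WAL records at the
 : /// same position.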
2216 : #[cfg(feature = "testing")]
2217 47 : pub(crate) fn sort_delta_value(
2218 47 : (k1, l1, v1): &(Key, Lsn, Value),
2219 47 : (k2, l2, v2): &(Key, Lsn, Value),
2220 47 : ) -> std::cmp::Ordering {
2221 47 : let order_1 = if v1.is_image() { 0 } else { 1 };
2222 47 : let order_2 = if v2.is_image() { 0 } else { 1 };
2223 47 : (k1, l1, order_1).cmp(&(k2, l2, order_2))
2224 47 : }
2225 :
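 : /// Sort the given deltas and write them into a fresh delta layer covering
 : /// exactly their key and LSN ranges, returning it as a resident layer.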
2226 11 : pub(crate) async fn produce_delta_layer(
2227 11 : tenant: &TenantShard,
2228 11 : tline: &Arc<Timeline>,
2229 11 : mut deltas: Vec<(Key, Lsn, Value)>,
2230 11 : ctx: &RequestContext,
2231 11 : ) -> anyhow::Result<ResidentLayer> {
2232 11 : deltas.sort_by(sort_delta);
2233 11 : let (key_start, _, _) = deltas.first().unwrap();
2234 11 : let (key_max, _, _) = deltas.last().unwrap();
2235 11 : let lsn_min = deltas.iter().map(|(_, lsn, _)| lsn).min().unwrap();
2236 11 : let lsn_max = deltas.iter().map(|(_, lsn, _)| lsn).max().unwrap();
2237 11 : let lsn_end = Lsn(lsn_max.0 + 1);
2238 11 : let mut writer = DeltaLayerWriter::new(
2239 11 : tenant.conf,
2240 11 : tline.timeline_id,
2241 11 : tenant.tenant_shard_id,
2242 11 : *key_start,
2243 11 : (*lsn_min)..lsn_end,
2244 11 : &tline.gate,
2245 11 : tline.cancel.clone(),
2246 11 : ctx,
2247 11 : )
2248 11 : .await?;
2249 11 : let key_end = key_max.next();
2250 :
2251 4131 : for (key, lsn, value) in deltas {
2252 4120 : writer.put_value(key, lsn, value, ctx).await?;
2253 : }
2254 :
2255 11 : let (desc, path) = writer.finish(key_end, ctx).await?;
2256 11 : let delta_layer = Layer::finish_creating(tenant.conf, tline, desc, &path)?;
2257 :
2258 11 : Ok::<_, anyhow::Error>(delta_layer)
2259 11 : }
2260 :
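 : /// Drain the iterator and compare every yielded tuple against `expect`,
 : /// asserting that both run out at the same point.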
2261 14 : async fn assert_delta_iter_equal(
2262 14 : delta_iter: &mut DeltaLayerIterator<'_>,
2263 14 : expect: &[(Key, Lsn, Value)],
2264 14 : ) {
2265 14 : let mut expect_iter = expect.iter();
2266 : loop {
2267 14014 : let o1 = delta_iter.next().await.unwrap();
2268 14014 : let o2 = expect_iter.next();
2269 14014 : assert_eq!(o1.is_some(), o2.is_some());
2270 14014 : if o1.is_none() && o2.is_none() {
2271 14 : break;
2272 14000 : }
2273 14000 : let (k1, l1, v1) = o1.unwrap();
2274 14000 : let (k2, l2, v2) = o2.unwrap();
2275 14000 : assert_eq!(&k1, k2);
2276 14000 : assert_eq!(l1, *l2);
2277 14000 : assert_eq!(&v1, v2);
2278 : }
2279 14 : }
2280 :
2281 : #[tokio::test]
2282 1 : async fn delta_layer_iterator() {
2283 1 : let harness = TenantHarness::create("delta_layer_iterator").await.unwrap();
2284 1 : let (tenant, ctx) = harness.load().await;
2285 :
2286 1 : let tline = tenant
2287 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
2288 1 : .await
2289 1 : .unwrap();
2290 :
2291 1000 : fn get_key(id: u32) -> Key {
2292 1000 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
2293 1000 : key.field6 = id;
2294 1000 : key
2295 1000 : }
2296 : const N: usize = 1000;
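 : // 1000 entries: 100 distinct keys with 10 LSNs each.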
2297 1 : let test_deltas = (0..N)
2298 1000 : .map(|idx| {
2299 1000 : (
2300 1000 : get_key(idx as u32 / 10),
2301 1000 : Lsn(0x10 * ((idx as u64) % 10 + 1)),
2302 1000 : Value::Image(Bytes::from(format!("img{idx:05}"))),
2303 1000 : )
2304 1000 : })
2305 1 : .collect_vec();
2306 1 : let resident_layer = produce_delta_layer(&tenant, &tline, test_deltas.clone(), &ctx)
2307 1 : .await
2308 1 : .unwrap();
2309 1 : let delta_layer = resident_layer.get_as_delta(&ctx).await.unwrap();
2310 3 : for max_read_size in [1, 1024] {
2311 16 : for batch_size in [1, 2, 4, 8, 3, 7, 13] {
2312 14 : println!("running with batch_size={batch_size} max_read_size={max_read_size}");
2313 1 : // Test if the batch size is correctly determined
2314 14 : let mut iter = delta_layer.iter_with_options(&ctx, max_read_size, batch_size);
2315 14 : let mut num_items = 0;
2316 56 : for _ in 0..3 {
2317 42 : iter.next_batch().await.unwrap();
2318 42 : num_items += iter.key_values_batch.len();
2319 42 : if max_read_size == 1 {
2320 1 : // every key should be a batch b/c the value is larger than max_read_size
2321 21 : assert_eq!(iter.key_values_batch.len(), 1);
2322 1 : } else {
2323 21 : assert!(iter.key_values_batch.len() <= batch_size);
2324 1 : }
2325 42 : if num_items >= N {
2326 1 : break;
2327 42 : }
2328 42 : iter.key_values_batch.clear();
2329 1 : }
2330 1 : // Test if the result is correct
2331 14 : let mut iter = delta_layer.iter_with_options(&ctx, max_read_size, batch_size);
2332 14 : assert_delta_iter_equal(&mut iter, &test_deltas).await;
2333 1 : }
2334 1 : }
2335 1 : }
2336 : }