Line data Source code
1 : //! A DeltaLayer represents a collection of WAL records or page images in a range of
2 : //! LSNs, and in a range of Keys. It is stored on a file on disk.
3 : //!
4 : //! Usually a delta layer only contains differences, in the form of WAL records
5 : //! against a base LSN. However, if a relation is extended or a whole new relation
6 : //! is created, there would be no base for the new pages. The entries for them
7 : //! must be page images or WAL records with the 'will_init' flag set, so that
8 : //! they can be replayed without referring to an older page version.
9 : //!
10 : //! The delta files are stored in `timelines/<timeline_id>` directory. Currently,
11 : //! there are no subdirectories, and each delta file is named like this:
12 : //!
13 : //! ```text
14 : //! <key start>-<key end>__<start LSN>-<end LSN>
15 : //! ```
16 : //!
17 : //! For example:
18 : //!
19 : //! ```text
20 : //! 000000067F000032BE0000400000000020B6-000000067F000032BE0000400000000030B6__000000578C6B29-0000000057A50051
21 : //! ```
22 : //!
23 : //! Every delta file consists of three parts: "summary", "index", and
24 : //! "values". The summary is a fixed size header at the beginning of the file,
25 : //! and it contains basic information about the layer, and offsets to the other
26 : //! parts. The "index" is a B-tree, mapping from Key and LSN to an offset in the
27 : //! "values" part. The actual page images and WAL records are stored in the
28 : //! "values" part.
29 : //!
30 : use crate::config::PageServerConf;
31 : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
32 : use crate::page_cache::{self, FileId, PAGE_SZ};
33 : use crate::repository::{Key, Value, KEY_SIZE};
34 : use crate::tenant::blob_io::BlobWriter;
35 : use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
36 : use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
37 : use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
38 : use crate::tenant::timeline::GetVectoredError;
39 : use crate::tenant::vectored_blob_io::{
40 : BlobFlag, MaxVectoredReadBytes, VectoredBlobReader, VectoredRead, VectoredReadPlanner,
41 : };
42 : use crate::tenant::{PageReconstructError, Timeline};
43 : use crate::virtual_file::{self, VirtualFile};
44 : use crate::{walrecord, TEMP_FILE_SUFFIX};
45 : use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
46 : use anyhow::{anyhow, bail, ensure, Context, Result};
47 : use bytes::BytesMut;
48 : use camino::{Utf8Path, Utf8PathBuf};
49 : use futures::StreamExt;
50 : use itertools::Itertools;
51 : use pageserver_api::keyspace::KeySpace;
52 : use pageserver_api::models::LayerAccessKind;
53 : use pageserver_api::shard::TenantShardId;
54 : use rand::{distributions::Alphanumeric, Rng};
55 : use serde::{Deserialize, Serialize};
56 : use std::fs::File;
57 : use std::io::SeekFrom;
58 : use std::ops::Range;
59 : use std::os::unix::fs::FileExt;
60 : use std::sync::Arc;
61 : use tokio::sync::OnceCell;
62 : use tracing::*;
63 :
64 : use utils::{
65 : bin_ser::BeSer,
66 : id::{TenantId, TimelineId},
67 : lsn::Lsn,
68 : };
69 :
70 : use super::{
71 : AsLayerDesc, LayerAccessStats, PersistentLayerDesc, ResidentLayer, ValuesReconstructState,
72 : };
73 :
74 : ///
75 : /// Header stored in the beginning of the file
76 : ///
77 : /// After this comes the 'values' part, starting on block 1. After that,
78 : /// the 'index' starts at the block indicated by 'index_start_blk'
79 : ///
80 594 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
81 : pub struct Summary {
82 : /// Magic value to identify this as a neon delta file. Always DELTA_FILE_MAGIC.
83 : pub magic: u16,
84 : pub format_version: u16,
85 :
86 : pub tenant_id: TenantId,
87 : pub timeline_id: TimelineId,
88 : pub key_range: Range<Key>,
89 : pub lsn_range: Range<Lsn>,
90 :
91 : /// Block number where the 'index' part of the file begins.
92 : pub index_start_blk: u32,
93 : /// Block within the 'index', where the B-tree root page is stored
94 : pub index_root_blk: u32,
95 : }
96 :
97 : impl From<&DeltaLayer> for Summary {
98 0 : fn from(layer: &DeltaLayer) -> Self {
99 0 : Self::expected(
100 0 : layer.desc.tenant_shard_id.tenant_id,
101 0 : layer.desc.timeline_id,
102 0 : layer.desc.key_range.clone(),
103 0 : layer.desc.lsn_range.clone(),
104 0 : )
105 0 : }
106 : }
107 :
108 : impl Summary {
109 594 : pub(super) fn expected(
110 594 : tenant_id: TenantId,
111 594 : timeline_id: TimelineId,
112 594 : keys: Range<Key>,
113 594 : lsns: Range<Lsn>,
114 594 : ) -> Self {
115 594 : Self {
116 594 : magic: DELTA_FILE_MAGIC,
117 594 : format_version: STORAGE_FORMAT_VERSION,
118 594 :
119 594 : tenant_id,
120 594 : timeline_id,
121 594 : key_range: keys,
122 594 : lsn_range: lsns,
123 594 :
124 594 : index_start_blk: 0,
125 594 : index_root_blk: 0,
126 594 : }
127 594 : }
128 : }
129 :
130 : // Flag indicating that this version initialize the page
131 : const WILL_INIT: u64 = 1;
132 :
133 : /// Struct representing reference to BLOB in layers. Reference contains BLOB
134 : /// offset, and for WAL records it also contains `will_init` flag. The flag
135 : /// helps to determine the range of records that needs to be applied, without
136 : /// reading/deserializing records themselves.
137 0 : #[derive(Debug, Serialize, Deserialize, Copy, Clone)]
138 : pub struct BlobRef(pub u64);
139 :
140 : impl BlobRef {
141 130136 : pub fn will_init(&self) -> bool {
142 130136 : (self.0 & WILL_INIT) != 0
143 130136 : }
144 :
145 6374152 : pub fn pos(&self) -> u64 {
146 6374152 : self.0 >> 1
147 6374152 : }
148 :
149 6352610 : pub fn new(pos: u64, will_init: bool) -> BlobRef {
150 6352610 : let mut blob_ref = pos << 1;
151 6352610 : if will_init {
152 6351464 : blob_ref |= WILL_INIT;
153 6351464 : }
154 6352610 : BlobRef(blob_ref)
155 6352610 : }
156 : }
157 :
158 : pub const DELTA_KEY_SIZE: usize = KEY_SIZE + 8;
159 : struct DeltaKey([u8; DELTA_KEY_SIZE]);
160 :
161 : /// This is the key of the B-tree index stored in the delta layer. It consists
162 : /// of the serialized representation of a Key and LSN.
163 : impl DeltaKey {
164 3122006 : fn from_slice(buf: &[u8]) -> Self {
165 3122006 : let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
166 3122006 : bytes.copy_from_slice(buf);
167 3122006 : DeltaKey(bytes)
168 3122006 : }
169 :
170 6477048 : fn from_key_lsn(key: &Key, lsn: Lsn) -> Self {
171 6477048 : let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
172 6477048 : key.write_to_byte_slice(&mut bytes[0..KEY_SIZE]);
173 6477048 : bytes[KEY_SIZE..].copy_from_slice(&u64::to_be_bytes(lsn.0));
174 6477048 : DeltaKey(bytes)
175 6477048 : }
176 :
177 3122006 : fn key(&self) -> Key {
178 3122006 : Key::from_slice(&self.0)
179 3122006 : }
180 :
181 3122006 : fn lsn(&self) -> Lsn {
182 3122006 : Lsn(u64::from_be_bytes(self.0[KEY_SIZE..].try_into().unwrap()))
183 3122006 : }
184 :
185 130140 : fn extract_lsn_from_buf(buf: &[u8]) -> Lsn {
186 130140 : let mut lsn_buf = [0u8; 8];
187 130140 : lsn_buf.copy_from_slice(&buf[KEY_SIZE..]);
188 130140 : Lsn(u64::from_be_bytes(lsn_buf))
189 130140 : }
190 : }
191 :
192 : /// This is used only from `pagectl`. Within pageserver, all layers are
193 : /// [`crate::tenant::storage_layer::Layer`], which can hold a [`DeltaLayerInner`].
194 : pub struct DeltaLayer {
195 : path: Utf8PathBuf,
196 : pub desc: PersistentLayerDesc,
197 : access_stats: LayerAccessStats,
198 : inner: OnceCell<Arc<DeltaLayerInner>>,
199 : }
200 :
201 : impl std::fmt::Debug for DeltaLayer {
202 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
203 0 : use super::RangeDisplayDebug;
204 0 :
205 0 : f.debug_struct("DeltaLayer")
206 0 : .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
207 0 : .field("lsn_range", &self.desc.lsn_range)
208 0 : .field("file_size", &self.desc.file_size)
209 0 : .field("inner", &self.inner)
210 0 : .finish()
211 0 : }
212 : }
213 :
214 : /// `DeltaLayerInner` is the in-memory data structure associated with an on-disk delta
215 : /// file.
216 : pub struct DeltaLayerInner {
217 : // values copied from summary
218 : index_start_blk: u32,
219 : index_root_blk: u32,
220 :
221 : file: VirtualFile,
222 : file_id: FileId,
223 :
224 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
225 : }
226 :
227 : impl std::fmt::Debug for DeltaLayerInner {
228 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
229 0 : f.debug_struct("DeltaLayerInner")
230 0 : .field("index_start_blk", &self.index_start_blk)
231 0 : .field("index_root_blk", &self.index_root_blk)
232 0 : .finish()
233 0 : }
234 : }
235 :
236 : /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
237 : impl std::fmt::Display for DeltaLayer {
238 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
239 0 : write!(f, "{}", self.layer_desc().short_id())
240 0 : }
241 : }
242 :
243 : impl AsLayerDesc for DeltaLayer {
244 0 : fn layer_desc(&self) -> &PersistentLayerDesc {
245 0 : &self.desc
246 0 : }
247 : }
248 :
249 : impl DeltaLayer {
250 0 : pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
251 0 : self.desc.dump();
252 0 :
253 0 : if !verbose {
254 0 : return Ok(());
255 0 : }
256 :
257 0 : let inner = self.load(LayerAccessKind::Dump, ctx).await?;
258 :
259 0 : inner.dump(ctx).await
260 0 : }
261 :
262 838 : fn temp_path_for(
263 838 : conf: &PageServerConf,
264 838 : tenant_shard_id: &TenantShardId,
265 838 : timeline_id: &TimelineId,
266 838 : key_start: Key,
267 838 : lsn_range: &Range<Lsn>,
268 838 : ) -> Utf8PathBuf {
269 838 : let rand_string: String = rand::thread_rng()
270 838 : .sample_iter(&Alphanumeric)
271 838 : .take(8)
272 838 : .map(char::from)
273 838 : .collect();
274 838 :
275 838 : conf.timeline_path(tenant_shard_id, timeline_id)
276 838 : .join(format!(
277 838 : "{}-XXX__{:016X}-{:016X}.{}.{}",
278 838 : key_start,
279 838 : u64::from(lsn_range.start),
280 838 : u64::from(lsn_range.end),
281 838 : rand_string,
282 838 : TEMP_FILE_SUFFIX,
283 838 : ))
284 838 : }
285 :
286 : ///
287 : /// Open the underlying file and read the metadata into memory, if it's
288 : /// not loaded already.
289 : ///
290 0 : async fn load(
291 0 : &self,
292 0 : access_kind: LayerAccessKind,
293 0 : ctx: &RequestContext,
294 0 : ) -> Result<&Arc<DeltaLayerInner>> {
295 0 : self.access_stats.record_access(access_kind, ctx);
296 0 : // Quick exit if already loaded
297 0 : self.inner
298 0 : .get_or_try_init(|| self.load_inner(ctx))
299 0 : .await
300 0 : .with_context(|| format!("Failed to load delta layer {}", self.path()))
301 0 : }
302 :
303 0 : async fn load_inner(&self, ctx: &RequestContext) -> Result<Arc<DeltaLayerInner>> {
304 0 : let path = self.path();
305 :
306 0 : let loaded = DeltaLayerInner::load(&path, None, None, ctx)
307 0 : .await
308 0 : .and_then(|res| res)?;
309 :
310 : // not production code
311 0 : let actual_filename = path.file_name().unwrap().to_owned();
312 0 : let expected_filename = self.layer_desc().filename().file_name();
313 0 :
314 0 : if actual_filename != expected_filename {
315 0 : println!("warning: filename does not match what is expected from in-file summary");
316 0 : println!("actual: {:?}", actual_filename);
317 0 : println!("expected: {:?}", expected_filename);
318 0 : }
319 :
320 0 : Ok(Arc::new(loaded))
321 0 : }
322 :
323 : /// Create a DeltaLayer struct representing an existing file on disk.
324 : ///
325 : /// This variant is only used for debugging purposes, by the 'pagectl' binary.
326 0 : pub fn new_for_path(path: &Utf8Path, file: File) -> Result<Self> {
327 0 : let mut summary_buf = vec![0; PAGE_SZ];
328 0 : file.read_exact_at(&mut summary_buf, 0)?;
329 0 : let summary = Summary::des_prefix(&summary_buf)?;
330 :
331 0 : let metadata = file
332 0 : .metadata()
333 0 : .context("get file metadata to determine size")?;
334 :
335 : // This function is never used for constructing layers in a running pageserver,
336 : // so it does not need an accurate TenantShardId.
337 0 : let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
338 0 :
339 0 : Ok(DeltaLayer {
340 0 : path: path.to_path_buf(),
341 0 : desc: PersistentLayerDesc::new_delta(
342 0 : tenant_shard_id,
343 0 : summary.timeline_id,
344 0 : summary.key_range,
345 0 : summary.lsn_range,
346 0 : metadata.len(),
347 0 : ),
348 0 : access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
349 0 : inner: OnceCell::new(),
350 0 : })
351 0 : }
352 :
353 : /// Path to the layer file in pageserver workdir.
354 0 : fn path(&self) -> Utf8PathBuf {
355 0 : self.path.clone()
356 0 : }
357 : }
358 :
359 : /// A builder object for constructing a new delta layer.
360 : ///
361 : /// Usage:
362 : ///
363 : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
364 : ///
365 : /// 2. Write the contents by calling `put_value` for every page
366 : /// version to store in the layer.
367 : ///
368 : /// 3. Call `finish`.
369 : ///
370 : struct DeltaLayerWriterInner {
371 : conf: &'static PageServerConf,
372 : pub path: Utf8PathBuf,
373 : timeline_id: TimelineId,
374 : tenant_shard_id: TenantShardId,
375 :
376 : key_start: Key,
377 : lsn_range: Range<Lsn>,
378 :
379 : tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>,
380 :
381 : blob_writer: BlobWriter<true>,
382 : }
383 :
384 : impl DeltaLayerWriterInner {
385 : ///
386 : /// Start building a new delta layer.
387 : ///
388 838 : async fn new(
389 838 : conf: &'static PageServerConf,
390 838 : timeline_id: TimelineId,
391 838 : tenant_shard_id: TenantShardId,
392 838 : key_start: Key,
393 838 : lsn_range: Range<Lsn>,
394 838 : ) -> anyhow::Result<Self> {
395 838 : // Create the file initially with a temporary filename. We don't know
396 838 : // the end key yet, so we cannot form the final filename yet. We will
397 838 : // rename it when we're done.
398 838 : //
399 838 : // Note: This overwrites any existing file. There shouldn't be any.
400 838 : // FIXME: throw an error instead?
401 838 : let path =
402 838 : DeltaLayer::temp_path_for(conf, &tenant_shard_id, &timeline_id, key_start, &lsn_range);
403 :
404 838 : let mut file = VirtualFile::create(&path).await?;
405 : // make room for the header block
406 838 : file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
407 838 : let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
408 838 :
409 838 : // Initialize the b-tree index builder
410 838 : let block_buf = BlockBuf::new();
411 838 : let tree_builder = DiskBtreeBuilder::new(block_buf);
412 838 :
413 838 : Ok(Self {
414 838 : conf,
415 838 : path,
416 838 : timeline_id,
417 838 : tenant_shard_id,
418 838 : key_start,
419 838 : lsn_range,
420 838 : tree: tree_builder,
421 838 : blob_writer,
422 838 : })
423 838 : }
424 :
425 : ///
426 : /// Append a key-value pair to the file.
427 : ///
428 : /// The values must be appended in key, lsn order.
429 : ///
430 3121998 : async fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
431 3121998 : let (_, res) = self
432 3121998 : .put_value_bytes(key, lsn, Value::ser(&val)?, val.will_init())
433 14791 : .await;
434 3121998 : res
435 3121998 : }
436 :
437 6352576 : async fn put_value_bytes(
438 6352576 : &mut self,
439 6352576 : key: Key,
440 6352576 : lsn: Lsn,
441 6352576 : val: Vec<u8>,
442 6352576 : will_init: bool,
443 6352576 : ) -> (Vec<u8>, anyhow::Result<()>) {
444 6352576 : assert!(self.lsn_range.start <= lsn);
445 6352576 : let (val, res) = self.blob_writer.write_blob(val).await;
446 6352576 : let off = match res {
447 6352576 : Ok(off) => off,
448 0 : Err(e) => return (val, Err(anyhow::anyhow!(e))),
449 : };
450 :
451 6352576 : let blob_ref = BlobRef::new(off, will_init);
452 6352576 :
453 6352576 : let delta_key = DeltaKey::from_key_lsn(&key, lsn);
454 6352576 : let res = self.tree.append(&delta_key.0, blob_ref.0);
455 6352576 : (val, res.map_err(|e| anyhow::anyhow!(e)))
456 6352576 : }
457 :
458 3029956 : fn size(&self) -> u64 {
459 3029956 : self.blob_writer.size() + self.tree.borrow_writer().size()
460 3029956 : }
461 :
462 : ///
463 : /// Finish writing the delta layer.
464 : ///
465 838 : async fn finish(self, key_end: Key, timeline: &Arc<Timeline>) -> anyhow::Result<ResidentLayer> {
466 838 : let index_start_blk =
467 838 : ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
468 :
469 838 : let mut file = self.blob_writer.into_inner().await?;
470 :
471 : // Write out the index
472 838 : let (index_root_blk, block_buf) = self.tree.finish()?;
473 838 : file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
474 0 : .await?;
475 13575 : for buf in block_buf.blocks {
476 12737 : let (_buf, res) = file.write_all(buf).await;
477 12737 : res?;
478 : }
479 838 : assert!(self.lsn_range.start < self.lsn_range.end);
480 : // Fill in the summary on blk 0
481 838 : let summary = Summary {
482 838 : magic: DELTA_FILE_MAGIC,
483 838 : format_version: STORAGE_FORMAT_VERSION,
484 838 : tenant_id: self.tenant_shard_id.tenant_id,
485 838 : timeline_id: self.timeline_id,
486 838 : key_range: self.key_start..key_end,
487 838 : lsn_range: self.lsn_range.clone(),
488 838 : index_start_blk,
489 838 : index_root_blk,
490 838 : };
491 838 :
492 838 : let mut buf = Vec::with_capacity(PAGE_SZ);
493 838 : // TODO: could use smallvec here but it's a pain with Slice<T>
494 838 : Summary::ser_into(&summary, &mut buf)?;
495 838 : file.seek(SeekFrom::Start(0)).await?;
496 838 : let (_buf, res) = file.write_all(buf).await;
497 838 : res?;
498 :
499 838 : let metadata = file
500 838 : .metadata()
501 421 : .await
502 838 : .context("get file metadata to determine size")?;
503 :
504 : // 5GB limit for objects without multipart upload (which we don't want to use)
505 : // Make it a little bit below to account for differing GB units
506 : // https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html
507 : const S3_UPLOAD_LIMIT: u64 = 4_500_000_000;
508 838 : ensure!(
509 838 : metadata.len() <= S3_UPLOAD_LIMIT,
510 0 : "Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
511 0 : file.path,
512 0 : metadata.len()
513 : );
514 :
515 : // Note: Because we opened the file in write-only mode, we cannot
516 : // reuse the same VirtualFile for reading later. That's why we don't
517 : // set inner.file here. The first read will have to re-open it.
518 :
519 838 : let desc = PersistentLayerDesc::new_delta(
520 838 : self.tenant_shard_id,
521 838 : self.timeline_id,
522 838 : self.key_start..key_end,
523 838 : self.lsn_range.clone(),
524 838 : metadata.len(),
525 838 : );
526 838 :
527 838 : // fsync the file
528 838 : file.sync_all().await?;
529 :
530 838 : let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
531 :
532 838 : trace!("created delta layer {}", layer.local_path());
533 :
534 838 : Ok(layer)
535 838 : }
536 : }
537 :
538 : /// A builder object for constructing a new delta layer.
539 : ///
540 : /// Usage:
541 : ///
542 : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
543 : ///
544 : /// 2. Write the contents by calling `put_value` for every page
545 : /// version to store in the layer.
546 : ///
547 : /// 3. Call `finish`.
548 : ///
549 : /// # Note
550 : ///
551 : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
552 : /// possible for the writer to drop before `finish` is actually called. So this
553 : /// could lead to odd temporary files in the directory, exhausting file system.
554 : /// This structure wraps `DeltaLayerWriterInner` and also contains `Drop`
555 : /// implementation that cleans up the temporary file in failure. It's not
556 : /// possible to do this directly in `DeltaLayerWriterInner` since `finish` moves
557 : /// out some fields, making it impossible to implement `Drop`.
558 : ///
559 : #[must_use]
560 : pub struct DeltaLayerWriter {
561 : inner: Option<DeltaLayerWriterInner>,
562 : }
563 :
564 : impl DeltaLayerWriter {
565 : ///
566 : /// Start building a new delta layer.
567 : ///
568 838 : pub async fn new(
569 838 : conf: &'static PageServerConf,
570 838 : timeline_id: TimelineId,
571 838 : tenant_shard_id: TenantShardId,
572 838 : key_start: Key,
573 838 : lsn_range: Range<Lsn>,
574 838 : ) -> anyhow::Result<Self> {
575 838 : Ok(Self {
576 838 : inner: Some(
577 838 : DeltaLayerWriterInner::new(
578 838 : conf,
579 838 : timeline_id,
580 838 : tenant_shard_id,
581 838 : key_start,
582 838 : lsn_range,
583 838 : )
584 428 : .await?,
585 : ),
586 : })
587 838 : }
588 :
589 : ///
590 : /// Append a key-value pair to the file.
591 : ///
592 : /// The values must be appended in key, lsn order.
593 : ///
594 3121998 : pub async fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
595 3121998 : self.inner.as_mut().unwrap().put_value(key, lsn, val).await
596 3121998 : }
597 :
598 3230578 : pub async fn put_value_bytes(
599 3230578 : &mut self,
600 3230578 : key: Key,
601 3230578 : lsn: Lsn,
602 3230578 : val: Vec<u8>,
603 3230578 : will_init: bool,
604 3230578 : ) -> (Vec<u8>, anyhow::Result<()>) {
605 3230578 : self.inner
606 3230578 : .as_mut()
607 3230578 : .unwrap()
608 3230578 : .put_value_bytes(key, lsn, val, will_init)
609 18106 : .await
610 3230578 : }
611 :
612 3029956 : pub fn size(&self) -> u64 {
613 3029956 : self.inner.as_ref().unwrap().size()
614 3029956 : }
615 :
616 : ///
617 : /// Finish writing the delta layer.
618 : ///
619 838 : pub(crate) async fn finish(
620 838 : mut self,
621 838 : key_end: Key,
622 838 : timeline: &Arc<Timeline>,
623 838 : ) -> anyhow::Result<ResidentLayer> {
624 838 : let inner = self.inner.take().unwrap();
625 838 : let temp_path = inner.path.clone();
626 8107 : let result = inner.finish(key_end, timeline).await;
627 : // The delta layer files can sometimes be really large. Clean them up.
628 838 : if result.is_err() {
629 0 : tracing::warn!(
630 0 : "Cleaning up temporary delta file {temp_path} after error during writing"
631 0 : );
632 0 : if let Err(e) = std::fs::remove_file(&temp_path) {
633 0 : tracing::warn!("Error cleaning up temporary delta layer file {temp_path}: {e:?}")
634 0 : }
635 838 : }
636 838 : result
637 838 : }
638 : }
639 :
640 : impl Drop for DeltaLayerWriter {
641 838 : fn drop(&mut self) {
642 838 : if let Some(inner) = self.inner.take() {
643 0 : // We want to remove the virtual file here, so it's fine to not
644 0 : // having completely flushed unwritten data.
645 0 : let vfile = inner.blob_writer.into_inner_no_flush();
646 0 : vfile.remove();
647 838 : }
648 838 : }
649 : }
650 :
651 0 : #[derive(thiserror::Error, Debug)]
652 : pub enum RewriteSummaryError {
653 : #[error("magic mismatch")]
654 : MagicMismatch,
655 : #[error(transparent)]
656 : Other(#[from] anyhow::Error),
657 : }
658 :
659 : impl From<std::io::Error> for RewriteSummaryError {
660 0 : fn from(e: std::io::Error) -> Self {
661 0 : Self::Other(anyhow::anyhow!(e))
662 0 : }
663 : }
664 :
665 : impl DeltaLayer {
666 0 : pub async fn rewrite_summary<F>(
667 0 : path: &Utf8Path,
668 0 : rewrite: F,
669 0 : ctx: &RequestContext,
670 0 : ) -> Result<(), RewriteSummaryError>
671 0 : where
672 0 : F: Fn(Summary) -> Summary,
673 0 : {
674 0 : let mut file = VirtualFile::open_with_options(
675 0 : path,
676 0 : virtual_file::OpenOptions::new().read(true).write(true),
677 0 : )
678 0 : .await
679 0 : .with_context(|| format!("Failed to open file '{}'", path))?;
680 0 : let file_id = page_cache::next_file_id();
681 0 : let block_reader = FileBlockReader::new(&file, file_id);
682 0 : let summary_blk = block_reader.read_blk(0, ctx).await?;
683 0 : let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
684 0 : if actual_summary.magic != DELTA_FILE_MAGIC {
685 0 : return Err(RewriteSummaryError::MagicMismatch);
686 0 : }
687 0 :
688 0 : let new_summary = rewrite(actual_summary);
689 0 :
690 0 : let mut buf = Vec::with_capacity(PAGE_SZ);
691 0 : // TODO: could use smallvec here, but it's a pain with Slice<T>
692 0 : Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
693 0 : file.seek(SeekFrom::Start(0)).await?;
694 0 : let (_buf, res) = file.write_all(buf).await;
695 0 : res?;
696 0 : Ok(())
697 0 : }
698 : }
699 :
700 : impl DeltaLayerInner {
701 : /// Returns nested result following Result<Result<_, OpErr>, Critical>:
702 : /// - inner has the success or transient failure
703 : /// - outer has the permanent failure
704 594 : pub(super) async fn load(
705 594 : path: &Utf8Path,
706 594 : summary: Option<Summary>,
707 594 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
708 594 : ctx: &RequestContext,
709 594 : ) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
710 594 : let file = match VirtualFile::open(path).await {
711 594 : Ok(file) => file,
712 0 : Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
713 : };
714 594 : let file_id = page_cache::next_file_id();
715 594 :
716 594 : let block_reader = FileBlockReader::new(&file, file_id);
717 :
718 594 : let summary_blk = match block_reader.read_blk(0, ctx).await {
719 594 : Ok(blk) => blk,
720 0 : Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
721 : };
722 :
723 : // TODO: this should be an assertion instead; see ImageLayerInner::load
724 594 : let actual_summary =
725 594 : Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
726 :
727 594 : if let Some(mut expected_summary) = summary {
728 : // production code path
729 594 : expected_summary.index_start_blk = actual_summary.index_start_blk;
730 594 : expected_summary.index_root_blk = actual_summary.index_root_blk;
731 594 : if actual_summary != expected_summary {
732 0 : bail!(
733 0 : "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
734 0 : actual_summary,
735 0 : expected_summary
736 0 : );
737 594 : }
738 0 : }
739 :
740 594 : Ok(Ok(DeltaLayerInner {
741 594 : file,
742 594 : file_id,
743 594 : index_start_blk: actual_summary.index_start_blk,
744 594 : index_root_blk: actual_summary.index_root_blk,
745 594 : max_vectored_read_bytes,
746 594 : }))
747 594 : }
748 :
749 124016 : pub(super) async fn get_value_reconstruct_data(
750 124016 : &self,
751 124016 : key: Key,
752 124016 : lsn_range: Range<Lsn>,
753 124016 : reconstruct_state: &mut ValueReconstructState,
754 124016 : ctx: &RequestContext,
755 124016 : ) -> anyhow::Result<ValueReconstructResult> {
756 124016 : let mut need_image = true;
757 124016 : // Scan the page versions backwards, starting from `lsn`.
758 124016 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
759 124016 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
760 124016 : self.index_start_blk,
761 124016 : self.index_root_blk,
762 124016 : &block_reader,
763 124016 : );
764 124016 : let search_key = DeltaKey::from_key_lsn(&key, Lsn(lsn_range.end.0 - 1));
765 124016 :
766 124016 : let mut offsets: Vec<(Lsn, u64)> = Vec::new();
767 124016 :
768 124016 : tree_reader
769 124016 : .visit(
770 124016 : &search_key.0,
771 124016 : VisitDirection::Backwards,
772 124016 : |key, value| {
773 118772 : let blob_ref = BlobRef(value);
774 118772 : if key[..KEY_SIZE] != search_key.0[..KEY_SIZE] {
775 46658 : return false;
776 72114 : }
777 72114 : let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
778 72114 : if entry_lsn < lsn_range.start {
779 0 : return false;
780 72114 : }
781 72114 : offsets.push((entry_lsn, blob_ref.pos()));
782 72114 :
783 72114 : !blob_ref.will_init()
784 124016 : },
785 124016 : &RequestContextBuilder::extend(ctx)
786 124016 : .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
787 124016 : .build(),
788 124016 : )
789 17292 : .await?;
790 :
791 124016 : let ctx = &RequestContextBuilder::extend(ctx)
792 124016 : .page_content_kind(PageContentKind::DeltaLayerValue)
793 124016 : .build();
794 124016 :
795 124016 : // Ok, 'offsets' now contains the offsets of all the entries we need to read
796 124016 : let cursor = block_reader.block_cursor();
797 124016 : let mut buf = Vec::new();
798 124016 : for (entry_lsn, pos) in offsets {
799 72114 : cursor
800 72114 : .read_blob_into_buf(pos, &mut buf, ctx)
801 5573 : .await
802 72114 : .with_context(|| {
803 0 : format!("Failed to read blob from virtual file {}", self.file.path)
804 72114 : })?;
805 72114 : let val = Value::des(&buf).with_context(|| {
806 0 : format!(
807 0 : "Failed to deserialize file blob from virtual file {}",
808 0 : self.file.path
809 0 : )
810 72114 : })?;
811 72114 : match val {
812 72114 : Value::Image(img) => {
813 72114 : reconstruct_state.img = Some((entry_lsn, img));
814 72114 : need_image = false;
815 72114 : break;
816 : }
817 0 : Value::WalRecord(rec) => {
818 0 : let will_init = rec.will_init();
819 0 : reconstruct_state.records.push((entry_lsn, rec));
820 0 : if will_init {
821 : // This WAL record initializes the page, so no need to go further back
822 0 : need_image = false;
823 0 : break;
824 0 : }
825 : }
826 : }
827 : }
828 :
829 : // If an older page image is needed to reconstruct the page, let the
830 : // caller know.
831 124016 : if need_image {
832 51902 : Ok(ValueReconstructResult::Continue)
833 : } else {
834 72114 : Ok(ValueReconstructResult::Complete)
835 : }
836 124016 : }
837 :
838 : // Look up the keys in the provided keyspace and update
839 : // the reconstruct state with whatever is found.
840 : //
841 : // If the key is cached, go no further than the cached Lsn.
842 : //
843 : // Currently, the index is visited for each range, but this
844 : // can be further optimised to visit the index only once.
845 18 : pub(super) async fn get_values_reconstruct_data(
846 18 : &self,
847 18 : keyspace: KeySpace,
848 18 : lsn_range: Range<Lsn>,
849 18 : reconstruct_state: &mut ValuesReconstructState,
850 18 : ctx: &RequestContext,
851 18 : ) -> Result<(), GetVectoredError> {
852 18 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
853 18 : let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
854 18 : self.index_start_blk,
855 18 : self.index_root_blk,
856 18 : block_reader,
857 18 : );
858 18 :
859 18 : let planner = VectoredReadPlanner::new(
860 18 : self.max_vectored_read_bytes
861 18 : .expect("Layer is loaded with max vectored bytes config")
862 18 : .0
863 18 : .into(),
864 18 : );
865 18 :
866 18 : let data_end_offset = self.index_start_blk as u64 * PAGE_SZ as u64;
867 :
868 18 : let reads = Self::plan_reads(
869 18 : keyspace,
870 18 : lsn_range,
871 18 : data_end_offset,
872 18 : index_reader,
873 18 : planner,
874 18 : reconstruct_state,
875 18 : ctx,
876 18 : )
877 15 : .await
878 18 : .map_err(GetVectoredError::Other)?;
879 :
880 18 : self.do_reads_and_update_state(reads, reconstruct_state)
881 9 : .await;
882 :
883 18 : Ok(())
884 18 : }
885 :
886 220 : async fn plan_reads<Reader>(
887 220 : keyspace: KeySpace,
888 220 : lsn_range: Range<Lsn>,
889 220 : data_end_offset: u64,
890 220 : index_reader: DiskBtreeReader<Reader, DELTA_KEY_SIZE>,
891 220 : mut planner: VectoredReadPlanner,
892 220 : reconstruct_state: &mut ValuesReconstructState,
893 220 : ctx: &RequestContext,
894 220 : ) -> anyhow::Result<Vec<VectoredRead>>
895 220 : where
896 220 : Reader: BlockReader,
897 220 : {
898 220 : let ctx = RequestContextBuilder::extend(ctx)
899 220 : .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
900 220 : .build();
901 :
902 422 : for range in keyspace.ranges.iter() {
903 422 : let mut range_end_handled = false;
904 422 :
905 422 : let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start);
906 422 : let index_stream = index_reader.get_stream_from(&start_key.0, &ctx);
907 422 : let mut index_stream = std::pin::pin!(index_stream);
908 :
909 58118 : while let Some(index_entry) = index_stream.next().await {
910 58026 : let (raw_key, value) = index_entry?;
911 58026 : let key = Key::from_slice(&raw_key[..KEY_SIZE]);
912 58026 : let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
913 58026 : let blob_ref = BlobRef(value);
914 58026 :
915 58026 : // Lsns are not monotonically increasing across keys, so we don't assert on them.
916 58026 : assert!(key >= range.start);
917 :
918 58026 : let outside_lsn_range = !lsn_range.contains(&lsn);
919 58026 : let below_cached_lsn = reconstruct_state.get_cached_lsn(&key) >= Some(lsn);
920 :
921 58026 : let flag = {
922 58026 : if outside_lsn_range || below_cached_lsn {
923 4 : BlobFlag::Ignore
924 58022 : } else if blob_ref.will_init() {
925 374 : BlobFlag::ReplaceAll
926 : } else {
927 : // Usual path: add blob to the read
928 57648 : BlobFlag::None
929 : }
930 : };
931 :
932 58026 : if key >= range.end || (key.next() == range.end && lsn >= lsn_range.end) {
933 330 : planner.handle_range_end(blob_ref.pos());
934 330 : range_end_handled = true;
935 330 : break;
936 57696 : } else {
937 57696 : planner.handle(key, lsn, blob_ref.pos(), flag);
938 57696 : }
939 : }
940 :
941 422 : if !range_end_handled {
942 92 : tracing::info!("Handling range end fallback at {}", data_end_offset);
943 92 : planner.handle_range_end(data_end_offset);
944 330 : }
945 : }
946 :
947 220 : Ok(planner.finish())
948 220 : }
949 :
950 218 : fn get_min_read_buffer_size(
951 218 : planned_reads: &[VectoredRead],
952 218 : read_size_soft_max: usize,
953 218 : ) -> usize {
954 19742 : let Some(largest_read) = planned_reads.iter().max_by_key(|read| read.size()) else {
955 0 : return read_size_soft_max;
956 : };
957 :
958 218 : let largest_read_size = largest_read.size();
959 218 : if largest_read_size > read_size_soft_max {
960 : // If the read is oversized, it should only contain one key.
961 200 : let offenders = largest_read
962 200 : .blobs_at
963 200 : .as_slice()
964 200 : .iter()
965 200 : .map(|(_, blob_meta)| format!("{}@{}", blob_meta.key, blob_meta.lsn))
966 200 : .join(", ");
967 200 : tracing::warn!(
968 200 : "Oversized vectored read ({} > {}) for keys {}",
969 200 : largest_read_size,
970 200 : read_size_soft_max,
971 200 : offenders
972 200 : );
973 18 : }
974 :
975 218 : largest_read_size
976 218 : }
977 :
978 18 : async fn do_reads_and_update_state(
979 18 : &self,
980 18 : reads: Vec<VectoredRead>,
981 18 : reconstruct_state: &mut ValuesReconstructState,
982 18 : ) {
983 18 : let vectored_blob_reader = VectoredBlobReader::new(&self.file);
984 18 : let mut ignore_key_with_err = None;
985 18 :
986 18 : let max_vectored_read_bytes = self
987 18 : .max_vectored_read_bytes
988 18 : .expect("Layer is loaded with max vectored bytes config")
989 18 : .0
990 18 : .into();
991 18 : let buf_size = Self::get_min_read_buffer_size(&reads, max_vectored_read_bytes);
992 18 : let mut buf = Some(BytesMut::with_capacity(buf_size));
993 :
994 : // Note that reads are processed in reverse order (from highest key+lsn).
995 : // This is the order that `ReconstructState` requires such that it can
996 : // track when a key is done.
997 18 : for read in reads.into_iter().rev() {
998 18 : let res = vectored_blob_reader
999 18 : .read_blobs(&read, buf.take().expect("Should have a buffer"))
1000 9 : .await;
1001 :
1002 18 : let blobs_buf = match res {
1003 18 : Ok(blobs_buf) => blobs_buf,
1004 0 : Err(err) => {
1005 0 : let kind = err.kind();
1006 0 : for (_, blob_meta) in read.blobs_at.as_slice() {
1007 0 : reconstruct_state.on_key_error(
1008 0 : blob_meta.key,
1009 0 : PageReconstructError::from(anyhow!(
1010 0 : "Failed to read blobs from virtual file {}: {}",
1011 0 : self.file.path,
1012 0 : kind
1013 0 : )),
1014 0 : );
1015 0 : }
1016 :
1017 : // We have "lost" the buffer since the lower level IO api
1018 : // doesn't return the buffer on error. Allocate a new one.
1019 0 : buf = Some(BytesMut::with_capacity(buf_size));
1020 0 :
1021 0 : continue;
1022 : }
1023 : };
1024 :
1025 362 : for meta in blobs_buf.blobs.iter().rev() {
1026 362 : if Some(meta.meta.key) == ignore_key_with_err {
1027 0 : continue;
1028 362 : }
1029 362 :
1030 362 : let value = Value::des(&blobs_buf.buf[meta.start..meta.end]);
1031 362 : let value = match value {
1032 362 : Ok(v) => v,
1033 0 : Err(e) => {
1034 0 : reconstruct_state.on_key_error(
1035 0 : meta.meta.key,
1036 0 : PageReconstructError::from(anyhow!(e).context(format!(
1037 0 : "Failed to deserialize blob from virtual file {}",
1038 0 : self.file.path,
1039 0 : ))),
1040 0 : );
1041 0 :
1042 0 : ignore_key_with_err = Some(meta.meta.key);
1043 0 : continue;
1044 : }
1045 : };
1046 :
1047 : // Invariant: once a key reaches [`ValueReconstructSituation::Complete`]
1048 : // state, no further updates shall be made to it. The call below will
1049 : // panic if the invariant is violated.
1050 362 : reconstruct_state.update_key(&meta.meta.key, meta.meta.lsn, value);
1051 : }
1052 :
1053 18 : buf = Some(blobs_buf.buf);
1054 : }
1055 18 : }
1056 :
1057 446 : pub(super) async fn load_keys<'a>(
1058 446 : &'a self,
1059 446 : ctx: &RequestContext,
1060 446 : ) -> Result<Vec<DeltaEntry<'a>>> {
1061 446 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
1062 446 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1063 446 : self.index_start_blk,
1064 446 : self.index_root_blk,
1065 446 : block_reader,
1066 446 : );
1067 446 :
1068 446 : let mut all_keys: Vec<DeltaEntry<'_>> = Vec::new();
1069 446 :
1070 446 : tree_reader
1071 446 : .visit(
1072 446 : &[0u8; DELTA_KEY_SIZE],
1073 446 : VisitDirection::Forwards,
1074 3122006 : |key, value| {
1075 3122006 : let delta_key = DeltaKey::from_slice(key);
1076 3122006 : let val_ref = ValueRef {
1077 3122006 : blob_ref: BlobRef(value),
1078 3122006 : reader: BlockCursor::new(crate::tenant::block_io::BlockReaderRef::Adapter(
1079 3122006 : Adapter(self),
1080 3122006 : )),
1081 3122006 : };
1082 3122006 : let pos = BlobRef(value).pos();
1083 3122006 : if let Some(last) = all_keys.last_mut() {
1084 3121560 : // subtract offset of the current and last entries to get the size
1085 3121560 : // of the value associated with this (key, lsn) tuple
1086 3121560 : let first_pos = last.size;
1087 3121560 : last.size = pos - first_pos;
1088 3121560 : }
1089 3122006 : let entry = DeltaEntry {
1090 3122006 : key: delta_key.key(),
1091 3122006 : lsn: delta_key.lsn(),
1092 3122006 : size: pos,
1093 3122006 : val: val_ref,
1094 3122006 : };
1095 3122006 : all_keys.push(entry);
1096 3122006 : true
1097 3122006 : },
1098 446 : &RequestContextBuilder::extend(ctx)
1099 446 : .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
1100 446 : .build(),
1101 446 : )
1102 3244 : .await?;
1103 446 : if let Some(last) = all_keys.last_mut() {
1104 446 : // Last key occupies all space till end of value storage,
1105 446 : // which corresponds to beginning of the index
1106 446 : last.size = self.index_start_blk as u64 * PAGE_SZ as u64 - last.size;
1107 446 : }
1108 446 : Ok(all_keys)
1109 446 : }
1110 :
1111 4 : pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
1112 4 : println!(
1113 4 : "index_start_blk: {}, root {}",
1114 4 : self.index_start_blk, self.index_root_blk
1115 4 : );
1116 4 :
1117 4 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
1118 4 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1119 4 : self.index_start_blk,
1120 4 : self.index_root_blk,
1121 4 : block_reader,
1122 4 : );
1123 4 :
1124 4 : tree_reader.dump().await?;
1125 :
1126 4 : let keys = self.load_keys(ctx).await?;
1127 :
1128 8 : async fn dump_blob(val: &ValueRef<'_>, ctx: &RequestContext) -> anyhow::Result<String> {
1129 8 : let buf = val.reader.read_blob(val.blob_ref.pos(), ctx).await?;
1130 8 : let val = Value::des(&buf)?;
1131 8 : let desc = match val {
1132 8 : Value::Image(img) => {
1133 8 : format!(" img {} bytes", img.len())
1134 : }
1135 0 : Value::WalRecord(rec) => {
1136 0 : let wal_desc = walrecord::describe_wal_record(&rec)?;
1137 0 : format!(
1138 0 : " rec {} bytes will_init: {} {}",
1139 0 : buf.len(),
1140 0 : rec.will_init(),
1141 0 : wal_desc
1142 0 : )
1143 : }
1144 : };
1145 8 : Ok(desc)
1146 8 : }
1147 :
1148 12 : for entry in keys {
1149 8 : let DeltaEntry { key, lsn, val, .. } = entry;
1150 8 : let desc = match dump_blob(&val, ctx).await {
1151 8 : Ok(desc) => desc,
1152 0 : Err(err) => {
1153 0 : format!("ERROR: {err}")
1154 : }
1155 : };
1156 8 : println!(" key {key} at {lsn}: {desc}");
1157 8 :
1158 8 : // Print more details about CHECKPOINT records. Would be nice to print details
1159 8 : // of many other record types too, but these are particularly interesting, as
1160 8 : // have a lot of special processing for them in walingest.rs.
1161 8 : use pageserver_api::key::CHECKPOINT_KEY;
1162 8 : use postgres_ffi::CheckPoint;
1163 8 : if key == CHECKPOINT_KEY {
1164 0 : let buf = val.reader.read_blob(val.blob_ref.pos(), ctx).await?;
1165 0 : let val = Value::des(&buf)?;
1166 0 : match val {
1167 0 : Value::Image(img) => {
1168 0 : let checkpoint = CheckPoint::decode(&img)?;
1169 0 : println!(" CHECKPOINT: {:?}", checkpoint);
1170 : }
1171 0 : Value::WalRecord(_rec) => {
1172 0 : println!(" unexpected walrecord value for checkpoint key");
1173 0 : }
1174 : }
1175 8 : }
1176 : }
1177 :
1178 4 : Ok(())
1179 4 : }
1180 : }
1181 :
1182 : /// One `(key, lsn)` entry of a delta layer, together with a handle for loading its value
1183 : pub struct DeltaEntry<'a> {
1184 : pub key: Key,
1185 : pub lsn: Lsn,
1186 : /// Size of the stored value, i.e. the distance from this value's offset to the next one
1187 : pub size: u64,
1188 : /// Reference to the on-disk value; call `load` on it to read and decode the value
1189 : pub val: ValueRef<'a>,
1190 : }
1191 :
1192 : /// Reference to an on-disk value
1193 : pub struct ValueRef<'a> {
1194 : blob_ref: BlobRef,
1195 : reader: BlockCursor<'a>,
1196 : }
1197 :
1198 : impl<'a> ValueRef<'a> {
1199 : /// Loads the value from disk
1200 3121998 : pub async fn load(&self, ctx: &RequestContext) -> Result<Value> {
1201 : // theoretically we *could* record an access time for each, but it does not really matter
1202 3121998 : let buf = self.reader.read_blob(self.blob_ref.pos(), ctx).await?;
1203 3121998 : let val = Value::des(&buf)?;
1204 3121998 : Ok(val)
1205 3121998 : }
1206 : }
1207 :
1208 : pub(crate) struct Adapter<T>(T);
1209 :
1210 : impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
1211 3150684 : pub(crate) async fn read_blk(
1212 3150684 : &self,
1213 3150684 : blknum: u32,
1214 3150684 : ctx: &RequestContext,
1215 3150684 : ) -> Result<BlockLease, std::io::Error> {
1216 3150684 : let block_reader = FileBlockReader::new(&self.0.as_ref().file, self.0.as_ref().file_id);
1217 3150684 : block_reader.read_blk(blknum, ctx).await
1218 3150684 : }
1219 : }
1220 :
1221 : impl AsRef<DeltaLayerInner> for DeltaLayerInner {
1222 6301368 : fn as_ref(&self) -> &DeltaLayerInner {
1223 6301368 : self
1224 6301368 : }
1225 : }
1226 :
// Exposes the fields of `DeltaEntry` through the compaction interface's
// `CompactionDeltaEntry` trait; each accessor returns a copy of the field of the same name.
1227 : impl<'a> pageserver_compaction::interface::CompactionDeltaEntry<'a, Key> for DeltaEntry<'a> {
1228 0 : fn key(&self) -> Key {
1229 0 : self.key
1230 0 : }
1231 0 : fn lsn(&self) -> Lsn {
1232 0 : self.lsn
1233 0 : }
1234 0 : fn size(&self) -> u64 {
1235 0 : self.size
1236 0 : }
1237 : }
1238 :
1239 : #[cfg(test)]
1240 : mod test {
1241 : use std::collections::BTreeMap;
1242 :
1243 : use itertools::MinMaxResult;
1244 : use rand::prelude::{SeedableRng, SliceRandom, StdRng};
1245 : use rand::RngCore;
1246 :
1247 : use super::*;
1248 : use crate::{
1249 : context::DownloadBehavior,
1250 : task_mgr::TaskKind,
1251 : tenant::{disk_btree::tests::TestDisk, harness::TenantHarness},
1252 : DEFAULT_PG_VERSION,
1253 : };
1254 :
1255 : /// Construct an index for a fictional delta layer and and then
1256 : /// traverse in order to plan vectored reads for a query. Finally,
1257 : /// verify that the traversal fed the right index key and value
1258 : /// pairs into the planner.
1259 : #[tokio::test]
1260 2 : async fn test_delta_layer_index_traversal() {
1261 2 : let base_key = Key {
1262 2 : field1: 0,
1263 2 : field2: 1663,
1264 2 : field3: 12972,
1265 2 : field4: 16396,
1266 2 : field5: 0,
1267 2 : field6: 246080,
1268 2 : };
1269 2 :
1270 2 : // Populate the index with some entries
1271 2 : let entries: BTreeMap<Key, Vec<Lsn>> = BTreeMap::from([
1272 2 : (base_key, vec![Lsn(1), Lsn(5), Lsn(25), Lsn(26), Lsn(28)]),
1273 2 : (base_key.add(1), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
1274 2 : (base_key.add(2), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
1275 2 : (base_key.add(5), vec![Lsn(10), Lsn(15), Lsn(16), Lsn(20)]),
1276 2 : ]);
1277 2 :
1278 2 : let mut disk = TestDisk::default();
1279 2 : let mut writer = DiskBtreeBuilder::<_, DELTA_KEY_SIZE>::new(&mut disk);
1280 2 :
1281 2 : let mut disk_offset = 0;
1282 10 : for (key, lsns) in &entries {
1283 42 : for lsn in lsns {
1284 34 : let index_key = DeltaKey::from_key_lsn(key, *lsn);
1285 34 : let blob_ref = BlobRef::new(disk_offset, false);
1286 34 : writer
1287 34 : .append(&index_key.0, blob_ref.0)
1288 34 : .expect("In memory disk append should never fail");
1289 34 :
1290 34 : disk_offset += 1;
1291 34 : }
1292 2 : }
1293 2 :
1294 2 : // Prepare all the arguments for the call into `plan_reads` below
1295 2 : let (root_offset, _writer) = writer
1296 2 : .finish()
1297 2 : .expect("In memory disk finish should never fail");
1298 2 : let reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(0, root_offset, disk);
1299 2 : let planner = VectoredReadPlanner::new(100);
1300 2 : let mut reconstruct_state = ValuesReconstructState::new();
1301 2 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
1302 2 :
1303 2 : let keyspace = KeySpace {
1304 2 : ranges: vec![
1305 2 : base_key..base_key.add(3),
1306 2 : base_key.add(3)..base_key.add(100),
1307 2 : ],
1308 2 : };
1309 2 : let lsn_range = Lsn(2)..Lsn(40);
1310 2 :
1311 2 : // Plan and validate
1312 2 : let vectored_reads = DeltaLayerInner::plan_reads(
1313 2 : keyspace.clone(),
1314 2 : lsn_range.clone(),
1315 2 : disk_offset,
1316 2 : reader,
1317 2 : planner,
1318 2 : &mut reconstruct_state,
1319 2 : &ctx,
1320 2 : )
1321 2 : .await
1322 2 : .expect("Read planning should not fail");
1323 2 :
1324 2 : validate(keyspace, lsn_range, vectored_reads, entries);
1325 2 : }
1326 :
1327 2 : fn validate(
1328 2 : keyspace: KeySpace,
1329 2 : lsn_range: Range<Lsn>,
1330 2 : vectored_reads: Vec<VectoredRead>,
1331 2 : index_entries: BTreeMap<Key, Vec<Lsn>>,
1332 2 : ) {
1333 2 : #[derive(Debug, PartialEq, Eq)]
1334 2 : struct BlobSpec {
1335 2 : key: Key,
1336 2 : lsn: Lsn,
1337 2 : at: u64,
1338 2 : }
1339 2 :
1340 2 : let mut planned_blobs = Vec::new();
1341 8 : for read in vectored_reads {
1342 28 : for (at, meta) in read.blobs_at.as_slice() {
1343 28 : planned_blobs.push(BlobSpec {
1344 28 : key: meta.key,
1345 28 : lsn: meta.lsn,
1346 28 : at: *at,
1347 28 : });
1348 28 : }
1349 : }
1350 :
1351 2 : let mut expected_blobs = Vec::new();
1352 2 : let mut disk_offset = 0;
1353 10 : for (key, lsns) in index_entries {
1354 42 : for lsn in lsns {
1355 42 : let key_included = keyspace.ranges.iter().any(|range| range.contains(&key));
1356 34 : let lsn_included = lsn_range.contains(&lsn);
1357 34 :
1358 34 : if key_included && lsn_included {
1359 28 : expected_blobs.push(BlobSpec {
1360 28 : key,
1361 28 : lsn,
1362 28 : at: disk_offset,
1363 28 : });
1364 28 : }
1365 :
1366 34 : disk_offset += 1;
1367 : }
1368 : }
1369 :
1370 2 : assert_eq!(planned_blobs, expected_blobs);
1371 2 : }
1372 :
1373 : mod constants {
1374 : use utils::lsn::Lsn;
1375 :
1376 : /// Offset used by all lsns in this test
1377 : pub(super) const LSN_OFFSET: Lsn = Lsn(0x08);
1378 : /// Number of unique keys included in the test data
1379 : pub(super) const KEY_COUNT: u8 = 60;
1380 : /// Exclusive upper bound on the number of different lsns generated for each key
1381 : pub(super) const MAX_ENTRIES_PER_KEY: u8 = 20;
1382 : /// Possible value sizes for each key along with a probability weight
1383 : pub(super) const VALUE_SIZES: [(usize, u8); 3] = [(100, 2), (1024, 2), (1024 * 1024, 1)];
1384 : /// Probability that there will be a gap between the current key and the next one (33.3%)
1385 : pub(super) const KEY_GAP_CHANGES: [(bool, u8); 2] = [(true, 1), (false, 2)];
1386 : /// The minimum size of a key range in all the generated reads
1387 : pub(super) const MIN_RANGE_SIZE: i128 = 10;
1388 : /// The number of ranges included in each vectored read
1389 : pub(super) const RANGES_COUNT: u8 = 2;
1390 : /// The number of vectored reads performed
1391 : pub(super) const READS_COUNT: u8 = 100;
1392 : /// Soft max size of a vectored read. Will be violated if we have to read keys
1393 : /// with values larger than the limit
1394 : pub(super) const MAX_VECTORED_READ_BYTES: usize = 64 * 1024;
1395 : }
1396 :
1397 : struct Entry {
1398 : key: Key,
1399 : lsn: Lsn,
1400 : value: Vec<u8>,
1401 : }
1402 :
1403 2 : fn generate_entries(rng: &mut StdRng) -> Vec<Entry> {
1404 2 : let mut current_key = Key::MIN;
1405 2 :
1406 2 : let mut entries = Vec::new();
1407 122 : for _ in 0..constants::KEY_COUNT {
1408 120 : let count = rng.gen_range(1..constants::MAX_ENTRIES_PER_KEY);
1409 120 : let mut lsns_iter =
1410 2260 : std::iter::successors(Some(Lsn(constants::LSN_OFFSET.0 + 0x08)), |lsn| {
1411 2260 : Some(Lsn(lsn.0 + 0x08))
1412 2260 : });
1413 120 : let mut lsns = Vec::new();
1414 2380 : while lsns.len() < count as usize {
1415 2260 : let take = rng.gen_bool(0.5);
1416 2260 : let lsn = lsns_iter.next().unwrap();
1417 2260 : if take {
1418 1112 : lsns.push(lsn);
1419 1148 : }
1420 : }
1421 :
1422 1232 : for lsn in lsns {
1423 1112 : let size = constants::VALUE_SIZES
1424 3336 : .choose_weighted(rng, |item| item.1)
1425 1112 : .unwrap()
1426 1112 : .0;
1427 1112 : let mut buf = vec![0; size];
1428 1112 : rng.fill_bytes(&mut buf);
1429 1112 :
1430 1112 : entries.push(Entry {
1431 1112 : key: current_key,
1432 1112 : lsn,
1433 1112 : value: buf,
1434 1112 : })
1435 : }
1436 :
1437 120 : let gap = constants::KEY_GAP_CHANGES
1438 240 : .choose_weighted(rng, |item| item.1)
1439 120 : .unwrap()
1440 120 : .0;
1441 120 : if gap {
1442 38 : current_key = current_key.add(2);
1443 82 : } else {
1444 82 : current_key = current_key.add(1);
1445 82 : }
1446 : }
1447 :
1448 2 : entries
1449 2 : }
1450 :
1451 : struct EntriesMeta {
1452 : key_range: Range<Key>,
1453 : lsn_range: Range<Lsn>,
1454 : index: BTreeMap<(Key, Lsn), Vec<u8>>,
1455 : }
1456 :
1457 2 : fn get_entries_meta(entries: &[Entry]) -> EntriesMeta {
1458 1112 : let key_range = match entries.iter().minmax_by_key(|e| e.key) {
1459 2 : MinMaxResult::MinMax(min, max) => min.key..max.key.next(),
1460 0 : _ => panic!("More than one entry is always expected"),
1461 : };
1462 :
1463 1112 : let lsn_range = match entries.iter().minmax_by_key(|e| e.lsn) {
1464 2 : MinMaxResult::MinMax(min, max) => min.lsn..Lsn(max.lsn.0 + 1),
1465 0 : _ => panic!("More than one entry is always expected"),
1466 : };
1467 :
1468 2 : let mut index = BTreeMap::new();
1469 1112 : for entry in entries.iter() {
1470 1112 : index.insert((entry.key, entry.lsn), entry.value.clone());
1471 1112 : }
1472 :
1473 2 : EntriesMeta {
1474 2 : key_range,
1475 2 : lsn_range,
1476 2 : index,
1477 2 : }
1478 2 : }
1479 :
1480 200 : fn pick_random_keyspace(rng: &mut StdRng, key_range: &Range<Key>) -> KeySpace {
1481 200 : let start = key_range.start.to_i128();
1482 200 : let end = key_range.end.to_i128();
1483 200 :
1484 200 : let mut keyspace = KeySpace::default();
1485 :
1486 600 : for _ in 0..constants::RANGES_COUNT {
1487 400 : let mut range: Option<Range<Key>> = Option::default();
1488 1244 : while range.is_none() || keyspace.overlaps(range.as_ref().unwrap()) {
1489 844 : let range_start = rng.gen_range(start..end);
1490 844 : let range_end_offset = range_start + constants::MIN_RANGE_SIZE;
1491 844 : if range_end_offset >= end {
1492 100 : range = Some(Key::from_i128(range_start)..Key::from_i128(end));
1493 744 : } else {
1494 744 : let range_end = rng.gen_range((range_start + constants::MIN_RANGE_SIZE)..end);
1495 744 : range = Some(Key::from_i128(range_start)..Key::from_i128(range_end));
1496 744 : }
1497 : }
1498 400 : keyspace.ranges.push(range.unwrap());
1499 : }
1500 :
1501 200 : keyspace
1502 200 : }
1503 :
1504 : #[tokio::test]
1505 2 : async fn test_delta_layer_vectored_read_end_to_end() -> anyhow::Result<()> {
1506 2 : let harness = TenantHarness::create("test_delta_layer_oversized_vectored_read")?;
1507 2 : let (tenant, ctx) = harness.load().await;
1508 2 :
1509 2 : let timeline_id = TimelineId::generate();
1510 2 : let timeline = tenant
1511 2 : .create_test_timeline(timeline_id, constants::LSN_OFFSET, DEFAULT_PG_VERSION, &ctx)
1512 6 : .await?;
1513 2 :
1514 2 : tracing::info!("Generating test data ...");
1515 2 :
1516 2 : let rng = &mut StdRng::seed_from_u64(0);
1517 2 : let entries = generate_entries(rng);
1518 2 : let entries_meta = get_entries_meta(&entries);
1519 2 :
1520 2 : tracing::info!("Done generating {} entries", entries.len());
1521 2 :
1522 2 : tracing::info!("Writing test data to delta layer ...");
1523 2 : let mut writer = DeltaLayerWriter::new(
1524 2 : harness.conf,
1525 2 : timeline_id,
1526 2 : harness.tenant_shard_id,
1527 2 : entries_meta.key_range.start,
1528 2 : entries_meta.lsn_range.clone(),
1529 2 : )
1530 2 : .await?;
1531 2 :
1532 1114 : for entry in entries {
1533 1112 : let (_, res) = writer
1534 1112 : .put_value_bytes(entry.key, entry.lsn, entry.value, false)
1535 220 : .await;
1536 1112 : res?;
1537 2 : }
1538 2 :
1539 5 : let resident = writer.finish(entries_meta.key_range.end, &timeline).await?;
1540 2 :
1541 2 : let inner = resident.get_inner_delta(&ctx).await?;
1542 2 :
1543 2 : let file_size = inner.file.metadata().await?.len();
1544 2 : tracing::info!(
1545 2 : "Done writing test data to delta layer. Resulting file size is: {}",
1546 2 : file_size
1547 2 : );
1548 2 :
1549 202 : for i in 0..constants::READS_COUNT {
1550 200 : tracing::info!("Doing vectored read {}/{}", i + 1, constants::READS_COUNT);
1551 2 :
1552 200 : let block_reader = FileBlockReader::new(&inner.file, inner.file_id);
1553 200 : let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1554 200 : inner.index_start_blk,
1555 200 : inner.index_root_blk,
1556 200 : block_reader,
1557 200 : );
1558 200 :
1559 200 : let planner = VectoredReadPlanner::new(constants::MAX_VECTORED_READ_BYTES);
1560 200 : let mut reconstruct_state = ValuesReconstructState::new();
1561 200 : let keyspace = pick_random_keyspace(rng, &entries_meta.key_range);
1562 200 : let data_end_offset = inner.index_start_blk as u64 * PAGE_SZ as u64;
1563 2 :
1564 200 : let vectored_reads = DeltaLayerInner::plan_reads(
1565 200 : keyspace.clone(),
1566 200 : entries_meta.lsn_range.clone(),
1567 200 : data_end_offset,
1568 200 : index_reader,
1569 200 : planner,
1570 200 : &mut reconstruct_state,
1571 200 : &ctx,
1572 200 : )
1573 3 : .await?;
1574 2 :
1575 200 : let vectored_blob_reader = VectoredBlobReader::new(&inner.file);
1576 200 : let buf_size = DeltaLayerInner::get_min_read_buffer_size(
1577 200 : &vectored_reads,
1578 200 : constants::MAX_VECTORED_READ_BYTES,
1579 200 : );
1580 200 : let mut buf = Some(BytesMut::with_capacity(buf_size));
1581 2 :
1582 19924 : for read in vectored_reads {
1583 19724 : let blobs_buf = vectored_blob_reader
1584 19724 : .read_blobs(&read, buf.take().expect("Should have a buffer"))
1585 10017 : .await?;
1586 57304 : for meta in blobs_buf.blobs.iter() {
1587 57304 : let value = &blobs_buf.buf[meta.start..meta.end];
1588 57304 : assert_eq!(value, entries_meta.index[&(meta.meta.key, meta.meta.lsn)]);
1589 2 : }
1590 2 :
1591 19724 : buf = Some(blobs_buf.buf);
1592 2 : }
1593 2 : }
1594 2 :
1595 2 : Ok(())
1596 2 : }
1597 : }
|