Line data Source code
1 : //! A DeltaLayer represents a collection of WAL records or page images in a range of
2 : //! LSNs, and in a range of Keys. It is stored on a file on disk.
3 : //!
4 : //! Usually a delta layer only contains differences, in the form of WAL records
5 : //! against a base LSN. However, if a relation is extended or a whole new relation
6 : //! is created, there would be no base for the new pages. The entries for them
7 : //! must be page images or WAL records with the 'will_init' flag set, so that
8 : //! they can be replayed without referring to an older page version.
9 : //!
10 : //! The delta files are stored in `timelines/<timeline_id>` directory. Currently,
11 : //! there are no subdirectories, and each delta file is named like this:
12 : //!
13 : //! ```text
14 : //! <key start>-<key end>__<start LSN>-<end LSN>
15 : //! ```
16 : //!
17 : //! For example:
18 : //!
19 : //! ```text
20 : //! 000000067F000032BE0000400000000020B6-000000067F000032BE0000400000000030B6__000000578C6B29-0000000057A50051
21 : //! ```
22 : //!
23 : //! Every delta file consists of three parts: "summary", "values", and
24 : //! "index". The summary is a fixed size header at the beginning of the file,
25 : //! and it contains basic information about the layer, and offsets to the other
26 : //! parts. The "index" is a B-tree, mapping from Key and LSN to an offset in the
27 : //! "values" part. The actual page images and WAL records are stored in the
28 : //! "values" part.
29 : //!
30 : use crate::config::PageServerConf;
31 : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
32 : use crate::page_cache::{self, FileId, PAGE_SZ};
33 : use crate::repository::{Key, Value, KEY_SIZE};
34 : use crate::tenant::blob_io::BlobWriter;
35 : use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
36 : use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
37 : use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
38 : use crate::tenant::timeline::GetVectoredError;
39 : use crate::tenant::vectored_blob_io::{
40 : BlobFlag, MaxVectoredReadBytes, VectoredBlobReader, VectoredRead, VectoredReadPlanner,
41 : };
42 : use crate::tenant::{PageReconstructError, Timeline};
43 : use crate::virtual_file::{self, VirtualFile};
44 : use crate::{walrecord, TEMP_FILE_SUFFIX};
45 : use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
46 : use anyhow::{anyhow, bail, ensure, Context, Result};
47 : use bytes::BytesMut;
48 : use camino::{Utf8Path, Utf8PathBuf};
49 : use futures::StreamExt;
50 : use itertools::Itertools;
51 : use pageserver_api::keyspace::KeySpace;
52 : use pageserver_api::models::LayerAccessKind;
53 : use pageserver_api::shard::TenantShardId;
54 : use rand::{distributions::Alphanumeric, Rng};
55 : use serde::{Deserialize, Serialize};
56 : use std::fs::File;
57 : use std::io::SeekFrom;
58 : use std::ops::Range;
59 : use std::os::unix::fs::FileExt;
60 : use std::sync::Arc;
61 : use tokio::sync::OnceCell;
62 : use tracing::*;
63 :
64 : use utils::{
65 : bin_ser::BeSer,
66 : id::{TenantId, TimelineId},
67 : lsn::Lsn,
68 : };
69 :
70 : use super::{
71 : AsLayerDesc, LayerAccessStats, PersistentLayerDesc, ResidentLayer, ValuesReconstructState,
72 : };
73 :
74 : ///
75 : /// Header stored at the beginning of the file (block 0).
76 : ///
77 : /// After this comes the 'values' part, starting on block 1. After that,
78 : /// the 'index' starts at the block indicated by 'index_start_blk'.
79 : ///
/// NOTE(review): the struct is (de)serialized through `BeSer`; presumably the
/// field order is part of the on-disk format and must not change — confirm.
80 606 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
81 : pub struct Summary {
82 : /// Magic value to identify this as a neon delta file. Always DELTA_FILE_MAGIC.
83 : pub magic: u16,
/// Storage format version; the writers in this file set it to STORAGE_FORMAT_VERSION.
84 : pub format_version: u16,
85 :
/// Tenant the layer belongs to.
86 : pub tenant_id: TenantId,
/// Timeline the layer belongs to.
87 : pub timeline_id: TimelineId,
/// Key range of the layer (a half-open `Range`).
88 : pub key_range: Range<Key>,
/// LSN range of the layer (a half-open `Range`).
89 : pub lsn_range: Range<Lsn>,
90 :
91 : /// Block number where the 'index' part of the file begins.
92 : pub index_start_blk: u32,
93 : /// Block within the 'index', where the B-tree root page is stored
94 : pub index_root_blk: u32,
95 : }
96 :
97 : impl From<&DeltaLayer> for Summary {
98 0 : fn from(layer: &DeltaLayer) -> Self {
99 0 : Self::expected(
100 0 : layer.desc.tenant_shard_id.tenant_id,
101 0 : layer.desc.timeline_id,
102 0 : layer.desc.key_range.clone(),
103 0 : layer.desc.lsn_range.clone(),
104 0 : )
105 0 : }
106 : }
107 :
108 : impl Summary {
109 606 : pub(super) fn expected(
110 606 : tenant_id: TenantId,
111 606 : timeline_id: TimelineId,
112 606 : keys: Range<Key>,
113 606 : lsns: Range<Lsn>,
114 606 : ) -> Self {
115 606 : Self {
116 606 : magic: DELTA_FILE_MAGIC,
117 606 : format_version: STORAGE_FORMAT_VERSION,
118 606 :
119 606 : tenant_id,
120 606 : timeline_id,
121 606 : key_range: keys,
122 606 : lsn_range: lsns,
123 606 :
124 606 : index_start_blk: 0,
125 606 : index_root_blk: 0,
126 606 : }
127 606 : }
128 : }
129 :
130 : // Flag indicating that this version initializes the page.
// It occupies the lowest bit of a `BlobRef`.
131 : const WILL_INIT: u64 = 1;
132 :
133 : /// Struct representing reference to BLOB in layers. Reference contains BLOB
134 : /// offset, and for WAL records it also contains `will_init` flag. The flag
135 : /// helps to determine the range of records that needs to be applied, without
136 : /// reading/deserializing records themselves.
/// 
/// Encoding of the wrapped `u64`: bit 0 is the `will_init` flag, the remaining
/// bits hold the offset shifted left by one.
137 0 : #[derive(Debug, Serialize, Deserialize, Copy, Clone)]
138 : pub struct BlobRef(pub u64);
139 :
140 : impl BlobRef {
141 130003 : pub fn will_init(&self) -> bool {
142 130003 : (self.0 & WILL_INIT) != 0
143 130003 : }
144 :
145 6374223 : pub fn pos(&self) -> u64 {
146 6374223 : self.0 >> 1
147 6374223 : }
148 :
149 6352726 : pub fn new(pos: u64, will_init: bool) -> BlobRef {
150 6352726 : let mut blob_ref = pos << 1;
151 6352726 : if will_init {
152 6351500 : blob_ref |= WILL_INIT;
153 6351500 : }
154 6352726 : BlobRef(blob_ref)
155 6352726 : }
156 : }
157 :
158 : pub const DELTA_KEY_SIZE: usize = KEY_SIZE + 8;
159 : struct DeltaKey([u8; DELTA_KEY_SIZE]);
160 :
161 : /// This is the key of the B-tree index stored in the delta layer. It consists
162 : /// of the serialized representation of a Key and LSN.
163 : impl DeltaKey {
164 3122158 : fn from_slice(buf: &[u8]) -> Self {
165 3122158 : let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
166 3122158 : bytes.copy_from_slice(buf);
167 3122158 : DeltaKey(bytes)
168 3122158 : }
169 :
170 6476320 : fn from_key_lsn(key: &Key, lsn: Lsn) -> Self {
171 6476320 : let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
172 6476320 : key.write_to_byte_slice(&mut bytes[0..KEY_SIZE]);
173 6476320 : bytes[KEY_SIZE..].copy_from_slice(&u64::to_be_bytes(lsn.0));
174 6476320 : DeltaKey(bytes)
175 6476320 : }
176 :
177 3122158 : fn key(&self) -> Key {
178 3122158 : Key::from_slice(&self.0)
179 3122158 : }
180 :
181 3122158 : fn lsn(&self) -> Lsn {
182 3122158 : Lsn(u64::from_be_bytes(self.0[KEY_SIZE..].try_into().unwrap()))
183 3122158 : }
184 :
185 130007 : fn extract_lsn_from_buf(buf: &[u8]) -> Lsn {
186 130007 : let mut lsn_buf = [0u8; 8];
187 130007 : lsn_buf.copy_from_slice(&buf[KEY_SIZE..]);
188 130007 : Lsn(u64::from_be_bytes(lsn_buf))
189 130007 : }
190 : }
191 :
192 : /// This is used only from `pagectl`. Within pageserver, all layers are
193 : /// [`crate::tenant::storage_layer::Layer`], which can hold a [`DeltaLayerInner`].
194 : pub struct DeltaLayer {
// Path of the layer file this struct was created for.
195 : path: Utf8PathBuf,
// Descriptor of the layer (tenant/timeline ids, key and LSN range, file size).
196 : pub desc: PersistentLayerDesc,
// Access statistics; `load` records every access here.
197 : access_stats: LayerAccessStats,
// Loaded file state; filled in on first use by `load`.
198 : inner: OnceCell<Arc<DeltaLayerInner>>,
199 : }
200 :
201 : impl std::fmt::Debug for DeltaLayer {
202 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
203 0 : use super::RangeDisplayDebug;
204 0 :
205 0 : f.debug_struct("DeltaLayer")
206 0 : .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
207 0 : .field("lsn_range", &self.desc.lsn_range)
208 0 : .field("file_size", &self.desc.file_size)
209 0 : .field("inner", &self.inner)
210 0 : .finish()
211 0 : }
212 : }
213 :
214 : /// `DeltaLayerInner` is the in-memory data structure associated with an on-disk delta
215 : /// file.
216 : pub struct DeltaLayerInner {
217 : // values copied from summary
218 : index_start_blk: u32,
219 : index_root_blk: u32,
220 :
// The opened layer file, and the id under which it was registered with the
// page cache (`page_cache::next_file_id()` at load time).
221 : file: VirtualFile,
222 : file_id: FileId,
223 :
// Read size setting handed to `load`. The vectored read path
// (`get_values_reconstruct_data`) panics if this is `None`.
224 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
225 : }
226 :
227 : impl std::fmt::Debug for DeltaLayerInner {
228 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
229 0 : f.debug_struct("DeltaLayerInner")
230 0 : .field("index_start_blk", &self.index_start_blk)
231 0 : .field("index_root_blk", &self.index_root_blk)
232 0 : .finish()
233 0 : }
234 : }
235 :
236 : /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
237 : impl std::fmt::Display for DeltaLayer {
238 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
239 0 : write!(f, "{}", self.layer_desc().short_id())
240 0 : }
241 : }
242 :
243 : impl AsLayerDesc for DeltaLayer {
244 0 : fn layer_desc(&self) -> &PersistentLayerDesc {
245 0 : &self.desc
246 0 : }
247 : }
248 :
249 : impl DeltaLayer {
250 0 : pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
251 0 : self.desc.dump();
252 0 :
253 0 : if !verbose {
254 0 : return Ok(());
255 0 : }
256 :
257 0 : let inner = self.load(LayerAccessKind::Dump, ctx).await?;
258 :
259 0 : inner.dump(ctx).await
260 0 : }
261 :
262 852 : fn temp_path_for(
263 852 : conf: &PageServerConf,
264 852 : tenant_shard_id: &TenantShardId,
265 852 : timeline_id: &TimelineId,
266 852 : key_start: Key,
267 852 : lsn_range: &Range<Lsn>,
268 852 : ) -> Utf8PathBuf {
269 852 : let rand_string: String = rand::thread_rng()
270 852 : .sample_iter(&Alphanumeric)
271 852 : .take(8)
272 852 : .map(char::from)
273 852 : .collect();
274 852 :
275 852 : conf.timeline_path(tenant_shard_id, timeline_id)
276 852 : .join(format!(
277 852 : "{}-XXX__{:016X}-{:016X}.{}.{}",
278 852 : key_start,
279 852 : u64::from(lsn_range.start),
280 852 : u64::from(lsn_range.end),
281 852 : rand_string,
282 852 : TEMP_FILE_SUFFIX,
283 852 : ))
284 852 : }
285 :
286 : ///
287 : /// Open the underlying file and read the metadata into memory, if it's
288 : /// not loaded already.
289 : ///
290 0 : async fn load(
291 0 : &self,
292 0 : access_kind: LayerAccessKind,
293 0 : ctx: &RequestContext,
294 0 : ) -> Result<&Arc<DeltaLayerInner>> {
295 0 : self.access_stats.record_access(access_kind, ctx);
296 0 : // Quick exit if already loaded
297 0 : self.inner
298 0 : .get_or_try_init(|| self.load_inner(ctx))
299 0 : .await
300 0 : .with_context(|| format!("Failed to load delta layer {}", self.path()))
301 0 : }
302 :
303 0 : async fn load_inner(&self, ctx: &RequestContext) -> Result<Arc<DeltaLayerInner>> {
304 0 : let path = self.path();
305 :
306 0 : let loaded = DeltaLayerInner::load(&path, None, None, ctx)
307 0 : .await
308 0 : .and_then(|res| res)?;
309 :
310 : // not production code
311 0 : let actual_filename = path.file_name().unwrap().to_owned();
312 0 : let expected_filename = self.layer_desc().filename().file_name();
313 0 :
314 0 : if actual_filename != expected_filename {
315 0 : println!("warning: filename does not match what is expected from in-file summary");
316 0 : println!("actual: {:?}", actual_filename);
317 0 : println!("expected: {:?}", expected_filename);
318 0 : }
319 :
320 0 : Ok(Arc::new(loaded))
321 0 : }
322 :
323 : /// Create a DeltaLayer struct representing an existing file on disk.
324 : ///
325 : /// This variant is only used for debugging purposes, by the 'pagectl' binary.
326 0 : pub fn new_for_path(path: &Utf8Path, file: File) -> Result<Self> {
327 0 : let mut summary_buf = vec![0; PAGE_SZ];
328 0 : file.read_exact_at(&mut summary_buf, 0)?;
329 0 : let summary = Summary::des_prefix(&summary_buf)?;
330 :
331 0 : let metadata = file
332 0 : .metadata()
333 0 : .context("get file metadata to determine size")?;
334 :
335 : // This function is never used for constructing layers in a running pageserver,
336 : // so it does not need an accurate TenantShardId.
337 0 : let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
338 0 :
339 0 : Ok(DeltaLayer {
340 0 : path: path.to_path_buf(),
341 0 : desc: PersistentLayerDesc::new_delta(
342 0 : tenant_shard_id,
343 0 : summary.timeline_id,
344 0 : summary.key_range,
345 0 : summary.lsn_range,
346 0 : metadata.len(),
347 0 : ),
348 0 : access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
349 0 : inner: OnceCell::new(),
350 0 : })
351 0 : }
352 :
353 : /// Path to the layer file in pageserver workdir.
354 0 : fn path(&self) -> Utf8PathBuf {
355 0 : self.path.clone()
356 0 : }
357 : }
358 :
359 : /// The state behind [`DeltaLayerWriter`]: a builder object for constructing a new delta layer.
360 : ///
361 : /// Usage:
362 : ///
363 : /// 1. Create the writer by calling `new(...)`
364 : ///
365 : /// 2. Write the contents by calling `put_value` for every page
366 : /// version to store in the layer.
367 : ///
368 : /// 3. Call `finish`.
369 : ///
370 : struct DeltaLayerWriterInner {
371 : conf: &'static PageServerConf,
// Temporary path the layer is written to.
372 : pub path: Utf8PathBuf,
373 : timeline_id: TimelineId,
374 : tenant_shard_id: TenantShardId,
375 :
// Start of the layer's key range; the end key is only supplied to `finish`.
376 : key_start: Key,
377 : lsn_range: Range<Lsn>,
378 :
// B-tree index under construction, mapping serialized (key, LSN) to a `BlobRef`.
379 : tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>,
380 :
// Writer for the 'values' part of the file.
381 : blob_writer: BlobWriter<true>,
382 : }
383 :
384 : impl DeltaLayerWriterInner {
385 : ///
386 : /// Start building a new delta layer.
387 : ///
/// Creates the temporary file, skips past the first page (reserved for the
/// summary) and sets up an empty B-tree builder.
388 852 : async fn new(
389 852 : conf: &'static PageServerConf,
390 852 : timeline_id: TimelineId,
391 852 : tenant_shard_id: TenantShardId,
392 852 : key_start: Key,
393 852 : lsn_range: Range<Lsn>,
394 852 : ) -> anyhow::Result<Self> {
395 852 : // Create the file initially with a temporary filename. We don't know
396 852 : // the end key yet, so we cannot form the final filename yet. We will
397 852 : // rename it when we're done.
398 852 : //
399 852 : // Note: This overwrites any existing file. There shouldn't be any.
400 852 : // FIXME: throw an error instead?
401 852 : let path =
402 852 : DeltaLayer::temp_path_for(conf, &tenant_shard_id, &timeline_id, key_start, &lsn_range);
403 :
404 852 : let mut file = VirtualFile::create(&path).await?;
405 : // make room for the header block
406 852 : file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
407 852 : let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
408 852 :
409 852 : // Initialize the b-tree index builder
410 852 : let block_buf = BlockBuf::new();
411 852 : let tree_builder = DiskBtreeBuilder::new(block_buf);
412 852 :
413 852 : Ok(Self {
414 852 : conf,
415 852 : path,
416 852 : timeline_id,
417 852 : tenant_shard_id,
418 852 : key_start,
419 852 : lsn_range,
420 852 : tree: tree_builder,
421 852 : blob_writer,
422 852 : })
423 852 : }
424 :
425 : ///
426 : /// Append a key-value pair to the file.
427 : ///
428 : /// The values must be appended in key, lsn order.
429 : ///
/// The value is serialized with `Value::ser` and handed to `put_value_bytes`
/// together with its `will_init` flag.
430 3121998 : async fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
431 3121998 : let (_, res) = self
432 3121998 : .put_value_bytes(key, lsn, Value::ser(&val)?, val.will_init())
433 14778 : .await;
434 3121998 : res
435 3121998 : }
436 :
/// Appends an already serialized value and adds the matching index entry.
///
/// Returns the buffer handed back by the blob writer together with the
/// outcome, so the buffer comes back to the caller on failure as well.
///
/// Panics if `lsn` is below the start of the writer's LSN range.
437 6352622 : async fn put_value_bytes(
438 6352622 : &mut self,
439 6352622 : key: Key,
440 6352622 : lsn: Lsn,
441 6352622 : val: Vec<u8>,
442 6352622 : will_init: bool,
443 6352622 : ) -> (Vec<u8>, anyhow::Result<()>) {
444 6352622 : assert!(self.lsn_range.start <= lsn);
445 6352622 : let (val, res) = self.blob_writer.write_blob(val).await;
446 6352622 : let off = match res {
447 6352622 : Ok(off) => off,
448 0 : Err(e) => return (val, Err(anyhow::anyhow!(e))),
449 : };
450 :
// The index maps the serialized (key, lsn) to the blob offset plus flag.
451 6352622 : let blob_ref = BlobRef::new(off, will_init);
452 6352622 :
453 6352622 : let delta_key = DeltaKey::from_key_lsn(&key, lsn);
454 6352622 : let res = self.tree.append(&delta_key.0, blob_ref.0);
455 6352622 : (val, res.map_err(|e| anyhow::anyhow!(e)))
456 6352622 : }
457 :
/// Sum of the sizes reported by the blob writer and by the B-tree builder's writer.
458 3029956 : fn size(&self) -> u64 {
459 3029956 : self.blob_writer.size() + self.tree.borrow_writer().size()
460 3029956 : }
461 :
462 : ///
463 : /// Finish writing the delta layer.
464 : ///
/// Steps, in order: write the index blocks after the values (at the next page
/// boundary), write the summary at offset 0, check the file size against the
/// upload limit, fsync, and hand the file over to `Layer::finish_creating`.
///
/// Panics if the writer's LSN range is empty.
465 852 : async fn finish(self, key_end: Key, timeline: &Arc<Timeline>) -> anyhow::Result<ResidentLayer> {
// Round the end of the values part up to a whole number of pages.
// NOTE(review): the `as u32` cast truncates silently if the block count
// does not fit into 32 bits.
466 852 : let index_start_blk =
467 852 : ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
468 :
469 852 : let mut file = self.blob_writer.into_inner().await?;
470 :
471 : // Write out the index
472 852 : let (index_root_blk, block_buf) = self.tree.finish()?;
473 852 : file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
474 0 : .await?;
475 13606 : for buf in block_buf.blocks {
476 12754 : let (_buf, res) = file.write_all(buf).await;
477 12754 : res?;
478 : }
479 852 : assert!(self.lsn_range.start < self.lsn_range.end);
480 : // Fill in the summary on blk 0
481 852 : let summary = Summary {
482 852 : magic: DELTA_FILE_MAGIC,
483 852 : format_version: STORAGE_FORMAT_VERSION,
484 852 : tenant_id: self.tenant_shard_id.tenant_id,
485 852 : timeline_id: self.timeline_id,
486 852 : key_range: self.key_start..key_end,
487 852 : lsn_range: self.lsn_range.clone(),
488 852 : index_start_blk,
489 852 : index_root_blk,
490 852 : };
491 852 :
492 852 : let mut buf = Vec::with_capacity(PAGE_SZ);
493 852 : // TODO: could use smallvec here but it's a pain with Slice<T>
494 852 : Summary::ser_into(&summary, &mut buf)?;
495 852 : file.seek(SeekFrom::Start(0)).await?;
496 852 : let (_buf, res) = file.write_all(buf).await;
497 852 : res?;
498 :
499 852 : let metadata = file
500 852 : .metadata()
501 428 : .await
502 852 : .context("get file metadata to determine size")?;
503 :
504 : // 5GB limit for objects without multipart upload (which we don't want to use)
505 : // Make it a little bit below to account for differing GB units
506 : // https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html
507 : const S3_UPLOAD_LIMIT: u64 = 4_500_000_000;
508 852 : ensure!(
509 852 : metadata.len() <= S3_UPLOAD_LIMIT,
510 0 : "Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
511 0 : file.path,
512 0 : metadata.len()
513 : );
514 :
515 : // Note: Because we opened the file in write-only mode, we cannot
516 : // reuse the same VirtualFile for reading later. That's why we don't
517 : // set inner.file here. The first read will have to re-open it.
518 :
519 852 : let desc = PersistentLayerDesc::new_delta(
520 852 : self.tenant_shard_id,
521 852 : self.timeline_id,
522 852 : self.key_start..key_end,
523 852 : self.lsn_range.clone(),
524 852 : metadata.len(),
525 852 : );
526 852 :
527 852 : // fsync the file
528 852 : file.sync_all().await?;
529 :
530 852 : let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
531 :
532 852 : trace!("created delta layer {}", layer.local_path());
533 :
534 852 : Ok(layer)
535 852 : }
536 : }
537 :
538 : /// A builder object for constructing a new delta layer.
539 : ///
540 : /// Usage:
541 : ///
542 : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
543 : ///
544 : /// 2. Write the contents by calling `put_value` for every page
545 : /// version to store in the layer.
546 : ///
547 : /// 3. Call `finish`.
548 : ///
549 : /// # Note
550 : ///
551 : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
552 : /// possible for the writer to be dropped before `finish` is actually called. This
553 : /// could leave stray temporary files in the directory, filling up the file system.
554 : /// This structure wraps `DeltaLayerWriterInner` and also contains a `Drop`
555 : /// implementation that cleans up the temporary file on failure. It's not
556 : /// possible to do this directly in `DeltaLayerWriterInner` since `finish` moves
557 : /// out some fields, making it impossible to implement `Drop`.
558 : ///
559 : #[must_use]
560 : pub struct DeltaLayerWriter {
// `Some` until `finish` or `drop` takes the inner writer out.
561 : inner: Option<DeltaLayerWriterInner>,
562 : }
563 :
564 : impl DeltaLayerWriter {
565 : ///
566 : /// Start building a new delta layer.
567 : ///
568 852 : pub async fn new(
569 852 : conf: &'static PageServerConf,
570 852 : timeline_id: TimelineId,
571 852 : tenant_shard_id: TenantShardId,
572 852 : key_start: Key,
573 852 : lsn_range: Range<Lsn>,
574 852 : ) -> anyhow::Result<Self> {
575 852 : Ok(Self {
576 852 : inner: Some(
577 852 : DeltaLayerWriterInner::new(
578 852 : conf,
579 852 : timeline_id,
580 852 : tenant_shard_id,
581 852 : key_start,
582 852 : lsn_range,
583 852 : )
584 442 : .await?,
585 : ),
586 : })
587 852 : }
588 :
589 : ///
590 : /// Append a key-value pair to the file.
591 : ///
592 : /// The values must be appended in key, lsn order.
593 : ///
594 3121998 : pub async fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
595 3121998 : self.inner.as_mut().unwrap().put_value(key, lsn, val).await
596 3121998 : }
597 :
598 3230624 : pub async fn put_value_bytes(
599 3230624 : &mut self,
600 3230624 : key: Key,
601 3230624 : lsn: Lsn,
602 3230624 : val: Vec<u8>,
603 3230624 : will_init: bool,
604 3230624 : ) -> (Vec<u8>, anyhow::Result<()>) {
605 3230624 : self.inner
606 3230624 : .as_mut()
607 3230624 : .unwrap()
608 3230624 : .put_value_bytes(key, lsn, val, will_init)
609 18113 : .await
610 3230624 : }
611 :
612 3029956 : pub fn size(&self) -> u64 {
613 3029956 : self.inner.as_ref().unwrap().size()
614 3029956 : }
615 :
616 : ///
617 : /// Finish writing the delta layer.
618 : ///
619 852 : pub(crate) async fn finish(
620 852 : mut self,
621 852 : key_end: Key,
622 852 : timeline: &Arc<Timeline>,
623 852 : ) -> anyhow::Result<ResidentLayer> {
624 852 : let inner = self.inner.take().unwrap();
625 852 : let temp_path = inner.path.clone();
626 8135 : let result = inner.finish(key_end, timeline).await;
627 : // The delta layer files can sometimes be really large. Clean them up.
628 852 : if result.is_err() {
629 0 : tracing::warn!(
630 0 : "Cleaning up temporary delta file {temp_path} after error during writing"
631 0 : );
632 0 : if let Err(e) = std::fs::remove_file(&temp_path) {
633 0 : tracing::warn!("Error cleaning up temporary delta layer file {temp_path}: {e:?}")
634 0 : }
635 852 : }
636 852 : result
637 852 : }
638 : }
639 :
640 : impl Drop for DeltaLayerWriter {
641 852 : fn drop(&mut self) {
642 852 : if let Some(inner) = self.inner.take() {
643 0 : // We want to remove the virtual file here, so it's fine to not
644 0 : // having completely flushed unwritten data.
645 0 : let vfile = inner.blob_writer.into_inner_no_flush();
646 0 : vfile.remove();
647 852 : }
648 852 : }
649 : }
650 :
/// Error returned by `DeltaLayer::rewrite_summary`.
651 0 : #[derive(thiserror::Error, Debug)]
652 : pub enum RewriteSummaryError {
/// The summary read from the file does not carry `DELTA_FILE_MAGIC`.
653 : #[error("magic mismatch")]
654 : MagicMismatch,
/// Any other failure; I/O errors are converted into this variant as well.
655 : #[error(transparent)]
656 : Other(#[from] anyhow::Error),
657 : }
658 :
659 : impl From<std::io::Error> for RewriteSummaryError {
660 0 : fn from(e: std::io::Error) -> Self {
661 0 : Self::Other(anyhow::anyhow!(e))
662 0 : }
663 : }
664 :
665 : impl DeltaLayer {
666 0 : pub async fn rewrite_summary<F>(
667 0 : path: &Utf8Path,
668 0 : rewrite: F,
669 0 : ctx: &RequestContext,
670 0 : ) -> Result<(), RewriteSummaryError>
671 0 : where
672 0 : F: Fn(Summary) -> Summary,
673 0 : {
674 0 : let mut file = VirtualFile::open_with_options(
675 0 : path,
676 0 : virtual_file::OpenOptions::new().read(true).write(true),
677 0 : )
678 0 : .await
679 0 : .with_context(|| format!("Failed to open file '{}'", path))?;
680 0 : let file_id = page_cache::next_file_id();
681 0 : let block_reader = FileBlockReader::new(&file, file_id);
682 0 : let summary_blk = block_reader.read_blk(0, ctx).await?;
683 0 : let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
684 0 : if actual_summary.magic != DELTA_FILE_MAGIC {
685 0 : return Err(RewriteSummaryError::MagicMismatch);
686 0 : }
687 0 :
688 0 : let new_summary = rewrite(actual_summary);
689 0 :
690 0 : let mut buf = Vec::with_capacity(PAGE_SZ);
691 0 : // TODO: could use smallvec here, but it's a pain with Slice<T>
692 0 : Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
693 0 : file.seek(SeekFrom::Start(0)).await?;
694 0 : let (_buf, res) = file.write_all(buf).await;
695 0 : res?;
696 0 : Ok(())
697 0 : }
698 : }
699 :
700 : impl DeltaLayerInner {
701 : /// Returns nested result following Result<Result<_, OpErr>, Critical>:
702 : /// - inner has the success or transient failure
703 : /// - outer has the permanent failure
704 606 : pub(super) async fn load(
705 606 : path: &Utf8Path,
706 606 : summary: Option<Summary>,
707 606 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
708 606 : ctx: &RequestContext,
709 606 : ) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
710 606 : let file = match VirtualFile::open(path).await {
711 606 : Ok(file) => file,
712 0 : Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
713 : };
714 606 : let file_id = page_cache::next_file_id();
715 606 :
716 606 : let block_reader = FileBlockReader::new(&file, file_id);
717 :
718 606 : let summary_blk = match block_reader.read_blk(0, ctx).await {
719 606 : Ok(blk) => blk,
720 0 : Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
721 : };
722 :
723 : // TODO: this should be an assertion instead; see ImageLayerInner::load
724 606 : let actual_summary =
725 606 : Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
726 :
727 606 : if let Some(mut expected_summary) = summary {
728 : // production code path
729 606 : expected_summary.index_start_blk = actual_summary.index_start_blk;
730 606 : expected_summary.index_root_blk = actual_summary.index_root_blk;
731 606 : if actual_summary != expected_summary {
732 0 : bail!(
733 0 : "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
734 0 : actual_summary,
735 0 : expected_summary
736 0 : );
737 606 : }
738 0 : }
739 :
740 606 : Ok(Ok(DeltaLayerInner {
741 606 : file,
742 606 : file_id,
743 606 : index_start_blk: actual_summary.index_start_blk,
744 606 : index_root_blk: actual_summary.index_root_blk,
745 606 : max_vectored_read_bytes,
746 606 : }))
747 606 : }
748 :
/// Collects into `reconstruct_state` the page versions of `key` stored in this
/// layer within `lsn_range`, newest first.
///
/// Returns `ValueReconstructResult::Complete` once a page image or a
/// `will_init` WAL record has been collected, and
/// `ValueReconstructResult::Continue` when an older page image is still needed.
749 123242 : pub(super) async fn get_value_reconstruct_data(
750 123242 : &self,
751 123242 : key: Key,
752 123242 : lsn_range: Range<Lsn>,
753 123242 : reconstruct_state: &mut ValueReconstructState,
754 123242 : ctx: &RequestContext,
755 123242 : ) -> anyhow::Result<ValueReconstructResult> {
756 123242 : let mut need_image = true;
757 123242 : // Scan the page versions backwards, starting from `lsn`.
758 123242 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
759 123242 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
760 123242 : self.index_start_blk,
761 123242 : self.index_root_blk,
762 123242 : &block_reader,
763 123242 : );
// `lsn_range` is half-open, so the search starts at `end - 1`.
// NOTE(review): `lsn_range.end.0 - 1` underflows when `end` is 0 — confirm
// that callers never pass such a range.
764 123242 : let search_key = DeltaKey::from_key_lsn(&key, Lsn(lsn_range.end.0 - 1));
765 123242 :
766 123242 : let mut offsets: Vec<(Lsn, u64)> = Vec::new();
767 123242 :
768 123242 : tree_reader
769 123242 : .visit(
770 123242 : &search_key.0,
771 123242 : VisitDirection::Backwards,
// NOTE(review): presumably returning `false` from the callback ends the
// visit — confirm against `DiskBtreeReader::visit`.
772 123242 : |key, value| {
773 118006 : let blob_ref = BlobRef(value);
// Entry belongs to a different key.
774 118006 : if key[..KEY_SIZE] != search_key.0[..KEY_SIZE] {
775 46025 : return false;
776 71981 : }
777 71981 : let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
// Entry is older than the requested range.
778 71981 : if entry_lsn < lsn_range.start {
779 0 : return false;
780 71981 : }
781 71981 : offsets.push((entry_lsn, blob_ref.pos()));
782 71981 :
// Nothing older than a `will_init` entry is recorded.
783 71981 : !blob_ref.will_init()
784 123242 : },
785 123242 : &RequestContextBuilder::extend(ctx)
786 123242 : .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
787 123242 : .build(),
788 123242 : )
789 17146 : .await?;
790 :
// From here on the reads are value reads, not B-tree node reads.
791 123242 : let ctx = &RequestContextBuilder::extend(ctx)
792 123242 : .page_content_kind(PageContentKind::DeltaLayerValue)
793 123242 : .build();
794 123242 :
795 123242 : // Ok, 'offsets' now contains the offsets of all the entries we need to read
796 123242 : let cursor = block_reader.block_cursor();
797 123242 : let mut buf = Vec::new();
798 123242 : for (entry_lsn, pos) in offsets {
799 71981 : cursor
800 71981 : .read_blob_into_buf(pos, &mut buf, ctx)
801 5553 : .await
802 71981 : .with_context(|| {
803 0 : format!("Failed to read blob from virtual file {}", self.file.path)
804 71981 : })?;
805 71981 : let val = Value::des(&buf).with_context(|| {
806 0 : format!(
807 0 : "Failed to deserialize file blob from virtual file {}",
808 0 : self.file.path
809 0 : )
810 71981 : })?;
811 71981 : match val {
// A full page image ends the search.
812 71981 : Value::Image(img) => {
813 71981 : reconstruct_state.img = Some((entry_lsn, img));
814 71981 : need_image = false;
815 71981 : break;
816 : }
817 0 : Value::WalRecord(rec) => {
818 0 : let will_init = rec.will_init();
819 0 : reconstruct_state.records.push((entry_lsn, rec));
820 0 : if will_init {
821 : // This WAL record initializes the page, so no need to go further back
822 0 : need_image = false;
823 0 : break;
824 0 : }
825 : }
826 : }
827 : }
828 :
829 : // If an older page image is needed to reconstruct the page, let the
830 : // caller know.
831 123242 : if need_image {
832 51261 : Ok(ValueReconstructResult::Continue)
833 : } else {
834 71981 : Ok(ValueReconstructResult::Complete)
835 : }
836 123242 : }
837 :
838 : // Look up the keys in the provided keyspace and update
839 : // the reconstruct state with whatever is found.
840 : //
841 : // If the key is cached, go no further than the cached Lsn.
842 : //
843 : // Currently, the index is visited for each range, but this
844 : // can be further optimised to visit the index only once.
845 18 : pub(super) async fn get_values_reconstruct_data(
846 18 : &self,
847 18 : keyspace: KeySpace,
848 18 : lsn_range: Range<Lsn>,
849 18 : reconstruct_state: &mut ValuesReconstructState,
850 18 : ctx: &RequestContext,
851 18 : ) -> Result<(), GetVectoredError> {
852 18 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
853 18 : let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
854 18 : self.index_start_blk,
855 18 : self.index_root_blk,
856 18 : block_reader,
857 18 : );
858 18 :
859 18 : let planner = VectoredReadPlanner::new(
860 18 : self.max_vectored_read_bytes
861 18 : .expect("Layer is loaded with max vectored bytes config")
862 18 : .0
863 18 : .into(),
864 18 : );
865 18 :
866 18 : let data_end_offset = self.index_start_offset();
867 :
868 18 : let reads = Self::plan_reads(
869 18 : keyspace,
870 18 : lsn_range,
871 18 : data_end_offset,
872 18 : index_reader,
873 18 : planner,
874 18 : reconstruct_state,
875 18 : ctx,
876 18 : )
877 15 : .await
878 18 : .map_err(GetVectoredError::Other)?;
879 :
880 18 : self.do_reads_and_update_state(reads, reconstruct_state)
881 9 : .await;
882 :
883 18 : Ok(())
884 18 : }
885 :
/// Walks the index once per range of `keyspace`, feeds every visited entry to
/// `planner`, and returns the reads the planner produces.
///
/// `data_end_offset` is handed to the planner as the end of a range when the
/// index stream runs out before an entry past the range has been seen.
///
/// Panics if the index yields a key below the start of the range being scanned.
886 220 : async fn plan_reads<Reader>(
887 220 : keyspace: KeySpace,
888 220 : lsn_range: Range<Lsn>,
889 220 : data_end_offset: u64,
890 220 : index_reader: DiskBtreeReader<Reader, DELTA_KEY_SIZE>,
891 220 : mut planner: VectoredReadPlanner,
892 220 : reconstruct_state: &mut ValuesReconstructState,
893 220 : ctx: &RequestContext,
894 220 : ) -> anyhow::Result<Vec<VectoredRead>>
895 220 : where
896 220 : Reader: BlockReader,
897 220 : {
898 220 : let ctx = RequestContextBuilder::extend(ctx)
899 220 : .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
900 220 : .build();
901 :
902 422 : for range in keyspace.ranges.iter() {
903 422 : let mut range_end_handled = false;
904 422 :
// Start streaming index entries at (range.start, lsn_range.start).
905 422 : let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start);
906 422 : let index_stream = index_reader.get_stream_from(&start_key.0, &ctx);
907 422 : let mut index_stream = std::pin::pin!(index_stream);
908 :
909 58118 : while let Some(index_entry) = index_stream.next().await {
910 58026 : let (raw_key, value) = index_entry?;
911 58026 : let key = Key::from_slice(&raw_key[..KEY_SIZE]);
912 58026 : let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
913 58026 : let blob_ref = BlobRef(value);
914 58026 :
915 58026 : // Lsns are not monotonically increasing across keys, so we don't assert on them.
916 58026 : assert!(key >= range.start);
917 :
918 58026 : let outside_lsn_range = !lsn_range.contains(&lsn);
919 58026 : let below_cached_lsn = reconstruct_state.get_cached_lsn(&key) >= Some(lsn);
920 :
// NOTE(review): the meaning of each `BlobFlag` is defined by
// `VectoredReadPlanner`; the names suggest skip / discard earlier blobs of
// the key / plain add — confirm there.
921 58026 : let flag = {
922 58026 : if outside_lsn_range || below_cached_lsn {
923 4 : BlobFlag::Ignore
924 58022 : } else if blob_ref.will_init() {
925 374 : BlobFlag::ReplaceAll
926 : } else {
927 : // Usual path: add blob to the read
928 57648 : BlobFlag::None
929 : }
930 : };
931 :
// The first entry past the range (a later key, or the range's last key at an
// LSN at or above the end of `lsn_range`) closes the range at its offset.
932 58026 : if key >= range.end || (key.next() == range.end && lsn >= lsn_range.end) {
933 330 : planner.handle_range_end(blob_ref.pos());
934 330 : range_end_handled = true;
935 330 : break;
936 57696 : } else {
937 57696 : planner.handle(key, lsn, blob_ref.pos(), flag);
938 57696 : }
939 : }
940 :
// The index stream ended first: close the range at the end of the data.
941 422 : if !range_end_handled {
942 92 : tracing::debug!("Handling range end fallback at {}", data_end_offset);
943 92 : planner.handle_range_end(data_end_offset);
944 330 : }
945 : }
946 :
947 220 : Ok(planner.finish())
948 220 : }
949 :
950 218 : fn get_min_read_buffer_size(
951 218 : planned_reads: &[VectoredRead],
952 218 : read_size_soft_max: usize,
953 218 : ) -> usize {
954 19742 : let Some(largest_read) = planned_reads.iter().max_by_key(|read| read.size()) else {
955 0 : return read_size_soft_max;
956 : };
957 :
958 218 : let largest_read_size = largest_read.size();
959 218 : if largest_read_size > read_size_soft_max {
960 : // If the read is oversized, it should only contain one key.
961 200 : let offenders = largest_read
962 200 : .blobs_at
963 200 : .as_slice()
964 200 : .iter()
965 200 : .map(|(_, blob_meta)| format!("{}@{}", blob_meta.key, blob_meta.lsn))
966 200 : .join(", ");
967 200 : tracing::warn!(
968 200 : "Oversized vectored read ({} > {}) for keys {}",
969 200 : largest_read_size,
970 200 : read_size_soft_max,
971 200 : offenders
972 200 : );
973 18 : }
974 :
975 218 : largest_read_size
976 218 : }
977 :
978 18 : async fn do_reads_and_update_state(
979 18 : &self,
980 18 : reads: Vec<VectoredRead>,
981 18 : reconstruct_state: &mut ValuesReconstructState,
982 18 : ) {
983 18 : let vectored_blob_reader = VectoredBlobReader::new(&self.file);
984 18 : let mut ignore_key_with_err = None;
985 18 :
986 18 : let max_vectored_read_bytes = self
987 18 : .max_vectored_read_bytes
988 18 : .expect("Layer is loaded with max vectored bytes config")
989 18 : .0
990 18 : .into();
991 18 : let buf_size = Self::get_min_read_buffer_size(&reads, max_vectored_read_bytes);
992 18 : let mut buf = Some(BytesMut::with_capacity(buf_size));
993 :
994 : // Note that reads are processed in reverse order (from highest key+lsn).
995 : // This is the order that `ReconstructState` requires such that it can
996 : // track when a key is done.
997 18 : for read in reads.into_iter().rev() {
998 18 : let res = vectored_blob_reader
999 18 : .read_blobs(&read, buf.take().expect("Should have a buffer"))
1000 9 : .await;
1001 :
1002 18 : let blobs_buf = match res {
1003 18 : Ok(blobs_buf) => blobs_buf,
1004 0 : Err(err) => {
1005 0 : let kind = err.kind();
1006 0 : for (_, blob_meta) in read.blobs_at.as_slice() {
1007 0 : reconstruct_state.on_key_error(
1008 0 : blob_meta.key,
1009 0 : PageReconstructError::from(anyhow!(
1010 0 : "Failed to read blobs from virtual file {}: {}",
1011 0 : self.file.path,
1012 0 : kind
1013 0 : )),
1014 0 : );
1015 0 : }
1016 :
1017 : // We have "lost" the buffer since the lower level IO api
1018 : // doesn't return the buffer on error. Allocate a new one.
1019 0 : buf = Some(BytesMut::with_capacity(buf_size));
1020 0 :
1021 0 : continue;
1022 : }
1023 : };
1024 :
1025 362 : for meta in blobs_buf.blobs.iter().rev() {
1026 362 : if Some(meta.meta.key) == ignore_key_with_err {
1027 0 : continue;
1028 362 : }
1029 362 :
1030 362 : let value = Value::des(&blobs_buf.buf[meta.start..meta.end]);
1031 362 : let value = match value {
1032 362 : Ok(v) => v,
1033 0 : Err(e) => {
1034 0 : reconstruct_state.on_key_error(
1035 0 : meta.meta.key,
1036 0 : PageReconstructError::from(anyhow!(e).context(format!(
1037 0 : "Failed to deserialize blob from virtual file {}",
1038 0 : self.file.path,
1039 0 : ))),
1040 0 : );
1041 0 :
1042 0 : ignore_key_with_err = Some(meta.meta.key);
1043 0 : continue;
1044 : }
1045 : };
1046 :
1047 : // Invariant: once a key reaches [`ValueReconstructSituation::Complete`]
1048 : // state, no further updates shall be made to it. The call below will
1049 : // panic if the invariant is violated.
1050 362 : reconstruct_state.update_key(&meta.meta.key, meta.meta.lsn, value);
1051 : }
1052 :
1053 18 : buf = Some(blobs_buf.buf);
1054 : }
1055 18 : }
1056 :
1057 446 : pub(super) async fn load_keys<'a>(
1058 446 : &'a self,
1059 446 : ctx: &RequestContext,
1060 446 : ) -> Result<Vec<DeltaEntry<'a>>> {
1061 446 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
1062 446 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1063 446 : self.index_start_blk,
1064 446 : self.index_root_blk,
1065 446 : block_reader,
1066 446 : );
1067 446 :
1068 446 : let mut all_keys: Vec<DeltaEntry<'_>> = Vec::new();
1069 446 :
1070 446 : tree_reader
1071 446 : .visit(
1072 446 : &[0u8; DELTA_KEY_SIZE],
1073 446 : VisitDirection::Forwards,
1074 3122006 : |key, value| {
1075 3122006 : let delta_key = DeltaKey::from_slice(key);
1076 3122006 : let val_ref = ValueRef {
1077 3122006 : blob_ref: BlobRef(value),
1078 3122006 : reader: BlockCursor::new(crate::tenant::block_io::BlockReaderRef::Adapter(
1079 3122006 : Adapter(self),
1080 3122006 : )),
1081 3122006 : };
1082 3122006 : let pos = BlobRef(value).pos();
1083 3122006 : if let Some(last) = all_keys.last_mut() {
1084 3121560 : // subtract offset of the current and last entries to get the size
1085 3121560 : // of the value associated with this (key, lsn) tuple
1086 3121560 : let first_pos = last.size;
1087 3121560 : last.size = pos - first_pos;
1088 3121560 : }
1089 3122006 : let entry = DeltaEntry {
1090 3122006 : key: delta_key.key(),
1091 3122006 : lsn: delta_key.lsn(),
1092 3122006 : size: pos,
1093 3122006 : val: val_ref,
1094 3122006 : };
1095 3122006 : all_keys.push(entry);
1096 3122006 : true
1097 3122006 : },
1098 446 : &RequestContextBuilder::extend(ctx)
1099 446 : .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
1100 446 : .build(),
1101 446 : )
1102 3238 : .await?;
1103 446 : if let Some(last) = all_keys.last_mut() {
1104 446 : // Last key occupies all space till end of value storage,
1105 446 : // which corresponds to beginning of the index
1106 446 : last.size = self.index_start_offset() - last.size;
1107 446 : }
1108 446 : Ok(all_keys)
1109 446 : }
1110 :
1111 : /// Using the given writer, write out a truncated version, where LSNs higher than the
1112 : /// truncate_at are missing.
1113 : #[cfg(test)]
1114 10 : pub(super) async fn copy_prefix(
1115 10 : &self,
1116 10 : writer: &mut DeltaLayerWriter,
1117 10 : truncate_at: Lsn,
1118 10 : ctx: &RequestContext,
1119 10 : ) -> anyhow::Result<()> {
1120 10 : use crate::tenant::vectored_blob_io::{
1121 10 : BlobMeta, VectoredReadBuilder, VectoredReadExtended,
1122 10 : };
1123 10 : use futures::stream::TryStreamExt;
1124 10 :
1125 10 : #[derive(Debug)]
1126 10 : enum Item {
1127 10 : Actual(Key, Lsn, BlobRef),
1128 10 : Sentinel,
1129 10 : }
1130 10 :
1131 10 : impl From<Item> for Option<(Key, Lsn, BlobRef)> {
1132 70 : fn from(value: Item) -> Self {
1133 70 : match value {
1134 60 : Item::Actual(key, lsn, blob) => Some((key, lsn, blob)),
1135 10 : Item::Sentinel => None,
1136 10 : }
1137 70 : }
1138 10 : }
1139 10 :
1140 10 : impl Item {
1141 70 : fn offset(&self) -> Option<BlobRef> {
1142 70 : match self {
1143 60 : Item::Actual(_, _, blob) => Some(*blob),
1144 10 : Item::Sentinel => None,
1145 10 : }
1146 70 : }
1147 10 :
1148 70 : fn is_last(&self) -> bool {
1149 70 : matches!(self, Item::Sentinel)
1150 70 : }
1151 10 : }
1152 10 :
1153 10 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
1154 10 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1155 10 : self.index_start_blk,
1156 10 : self.index_root_blk,
1157 10 : block_reader,
1158 10 : );
1159 10 :
1160 10 : let stream = self.stream_index_forwards(&tree_reader, &[0u8; DELTA_KEY_SIZE], ctx);
1161 60 : let stream = stream.map_ok(|(key, lsn, pos)| Item::Actual(key, lsn, pos));
1162 10 : // put in a sentinel value for getting the end offset for last item, and not having to
1163 10 : // repeat the whole read part
1164 10 : let stream = stream.chain(futures::stream::once(futures::future::ready(Ok(
1165 10 : Item::Sentinel,
1166 10 : ))));
1167 10 : let mut stream = std::pin::pin!(stream);
1168 10 :
1169 10 : let mut prev: Option<(Key, Lsn, BlobRef)> = None;
1170 10 :
1171 10 : let mut read_builder: Option<VectoredReadBuilder> = None;
1172 10 :
1173 10 : let max_read_size = self
1174 10 : .max_vectored_read_bytes
1175 10 : .map(|x| x.0.get())
1176 10 : .unwrap_or(8192);
1177 10 :
1178 10 : let mut buffer = Some(BytesMut::with_capacity(max_read_size));
1179 10 :
1180 10 : // FIXME: buffering of DeltaLayerWriter
1181 10 : let mut per_blob_copy = Vec::new();
1182 :
1183 80 : while let Some(item) = stream.try_next().await? {
1184 70 : tracing::debug!(?item, "popped");
1185 70 : let offset = item
1186 70 : .offset()
1187 70 : .unwrap_or(BlobRef::new(self.index_start_offset(), false));
1188 :
1189 70 : let actionable = if let Some((key, lsn, start_offset)) = prev.take() {
1190 60 : let end_offset = offset;
1191 60 :
1192 60 : Some((BlobMeta { key, lsn }, start_offset..end_offset))
1193 : } else {
1194 10 : None
1195 : };
1196 :
1197 70 : let is_last = item.is_last();
1198 70 :
1199 70 : prev = Option::from(item);
1200 70 :
1201 70 : let actionable = actionable.filter(|x| x.0.lsn < truncate_at);
1202 :
1203 70 : let builder = if let Some((meta, offsets)) = actionable {
1204 : // extend or create a new builder
1205 32 : if read_builder
1206 32 : .as_mut()
1207 32 : .map(|x| x.extend(offsets.start.pos(), offsets.end.pos(), meta))
1208 32 : .unwrap_or(VectoredReadExtended::No)
1209 32 : == VectoredReadExtended::Yes
1210 : {
1211 16 : None
1212 : } else {
1213 16 : read_builder.replace(VectoredReadBuilder::new(
1214 16 : offsets.start.pos(),
1215 16 : offsets.end.pos(),
1216 16 : meta,
1217 16 : max_read_size,
1218 16 : ))
1219 : }
1220 : } else {
1221 : // nothing to do, except perhaps flush any existing for the last element
1222 38 : None
1223 : };
1224 :
1225 : // flush the possible older builder and also the new one if the item was the last one
1226 70 : let builders = builder.into_iter();
1227 70 : let builders = if is_last {
1228 10 : builders.chain(read_builder.take())
1229 : } else {
1230 60 : builders.chain(None)
1231 : };
1232 :
1233 86 : for builder in builders {
1234 16 : let read = builder.build();
1235 16 :
1236 16 : let reader = VectoredBlobReader::new(&self.file);
1237 16 :
1238 16 : let mut buf = buffer.take().unwrap();
1239 16 :
1240 16 : buf.clear();
1241 16 : buf.reserve(read.size());
1242 16 : let res = reader.read_blobs(&read, buf).await?;
1243 :
1244 48 : for blob in res.blobs {
1245 32 : let key = blob.meta.key;
1246 32 : let lsn = blob.meta.lsn;
1247 32 : let data = &res.buf[blob.start..blob.end];
1248 32 :
1249 32 : #[cfg(debug_assertions)]
1250 32 : Value::des(data)
1251 32 : .with_context(|| {
1252 0 : format!(
1253 0 : "blob failed to deserialize for {}@{}, {}..{}: {:?}",
1254 0 : blob.meta.key,
1255 0 : blob.meta.lsn,
1256 0 : blob.start,
1257 0 : blob.end,
1258 0 : utils::Hex(data)
1259 0 : )
1260 32 : })
1261 32 : .unwrap();
1262 32 :
1263 32 : // is it an image or will_init walrecord?
1264 32 : // FIXME: this could be handled by threading the BlobRef to the
1265 32 : // VectoredReadBuilder
1266 32 : let will_init = crate::repository::ValueBytes::will_init(data)
1267 32 : .inspect_err(|_e| {
1268 0 : #[cfg(feature = "testing")]
1269 0 : tracing::error!(data=?utils::Hex(data), err=?_e, "failed to parse will_init out of serialized value");
1270 32 : })
1271 32 : .unwrap_or(false);
1272 32 :
1273 32 : per_blob_copy.clear();
1274 32 : per_blob_copy.extend_from_slice(data);
1275 :
1276 32 : let (tmp, res) = writer
1277 32 : .put_value_bytes(key, lsn, std::mem::take(&mut per_blob_copy), will_init)
1278 4 : .await;
1279 32 : per_blob_copy = tmp;
1280 32 : res?;
1281 : }
1282 :
1283 16 : buffer = Some(res.buf);
1284 : }
1285 : }
1286 :
1287 10 : assert!(
1288 10 : read_builder.is_none(),
1289 0 : "with the sentinel above loop should had handled all"
1290 : );
1291 :
1292 10 : Ok(())
1293 10 : }
1294 :
1295 4 : pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
1296 4 : println!(
1297 4 : "index_start_blk: {}, root {}",
1298 4 : self.index_start_blk, self.index_root_blk
1299 4 : );
1300 4 :
1301 4 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
1302 4 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1303 4 : self.index_start_blk,
1304 4 : self.index_root_blk,
1305 4 : block_reader,
1306 4 : );
1307 4 :
1308 4 : tree_reader.dump().await?;
1309 :
1310 4 : let keys = self.load_keys(ctx).await?;
1311 :
1312 8 : async fn dump_blob(val: &ValueRef<'_>, ctx: &RequestContext) -> anyhow::Result<String> {
1313 8 : let buf = val.reader.read_blob(val.blob_ref.pos(), ctx).await?;
1314 8 : let val = Value::des(&buf)?;
1315 8 : let desc = match val {
1316 8 : Value::Image(img) => {
1317 8 : format!(" img {} bytes", img.len())
1318 : }
1319 0 : Value::WalRecord(rec) => {
1320 0 : let wal_desc = walrecord::describe_wal_record(&rec)?;
1321 0 : format!(
1322 0 : " rec {} bytes will_init: {} {}",
1323 0 : buf.len(),
1324 0 : rec.will_init(),
1325 0 : wal_desc
1326 0 : )
1327 : }
1328 : };
1329 8 : Ok(desc)
1330 8 : }
1331 :
1332 12 : for entry in keys {
1333 8 : let DeltaEntry { key, lsn, val, .. } = entry;
1334 8 : let desc = match dump_blob(&val, ctx).await {
1335 8 : Ok(desc) => desc,
1336 0 : Err(err) => {
1337 0 : format!("ERROR: {err}")
1338 : }
1339 : };
1340 8 : println!(" key {key} at {lsn}: {desc}");
1341 8 :
1342 8 : // Print more details about CHECKPOINT records. Would be nice to print details
1343 8 : // of many other record types too, but these are particularly interesting, as
1344 8 : // have a lot of special processing for them in walingest.rs.
1345 8 : use pageserver_api::key::CHECKPOINT_KEY;
1346 8 : use postgres_ffi::CheckPoint;
1347 8 : if key == CHECKPOINT_KEY {
1348 0 : let buf = val.reader.read_blob(val.blob_ref.pos(), ctx).await?;
1349 0 : let val = Value::des(&buf)?;
1350 0 : match val {
1351 0 : Value::Image(img) => {
1352 0 : let checkpoint = CheckPoint::decode(&img)?;
1353 0 : println!(" CHECKPOINT: {:?}", checkpoint);
1354 : }
1355 0 : Value::WalRecord(_rec) => {
1356 0 : println!(" unexpected walrecord value for checkpoint key");
1357 0 : }
1358 : }
1359 8 : }
1360 : }
1361 :
1362 4 : Ok(())
1363 4 : }
1364 :
1365 : #[cfg(test)]
1366 30 : fn stream_index_forwards<'a, R>(
1367 30 : &'a self,
1368 30 : reader: &'a DiskBtreeReader<R, DELTA_KEY_SIZE>,
1369 30 : start: &'a [u8; DELTA_KEY_SIZE],
1370 30 : ctx: &'a RequestContext,
1371 30 : ) -> impl futures::stream::Stream<
1372 30 : Item = Result<(Key, Lsn, BlobRef), crate::tenant::disk_btree::DiskBtreeError>,
1373 30 : > + 'a
1374 30 : where
1375 30 : R: BlockReader,
1376 30 : {
1377 30 : use futures::stream::TryStreamExt;
1378 30 : let stream = reader.get_stream_from(start, ctx);
1379 152 : stream.map_ok(|(key, value)| {
1380 152 : let key = DeltaKey::from_slice(&key);
1381 152 : let (key, lsn) = (key.key(), key.lsn());
1382 152 : let offset = BlobRef(value);
1383 152 :
1384 152 : (key, lsn, offset)
1385 152 : })
1386 30 : }
1387 :
1388 : /// The file offset to the first block of index.
1389 : ///
1390 : /// The file structure is summary, values, and index. We often need this for the size of last blob.
1391 534 : fn index_start_offset(&self) -> u64 {
1392 534 : let offset = self.index_start_blk as u64 * PAGE_SZ as u64;
1393 534 : let bref = BlobRef(offset);
1394 534 : tracing::debug!(
1395 0 : index_start_blk = self.index_start_blk,
1396 0 : offset,
1397 0 : pos = bref.pos(),
1398 0 : "index_start_offset"
1399 0 : );
1400 534 : offset
1401 534 : }
1402 : }
1403 :
/// A set of data associated with a delta layer key and its value
///
/// Instances are produced by `DeltaLayerInner::load_keys`, one per `(key, lsn)` pair
/// found in the layer index.
pub struct DeltaEntry<'a> {
    /// Key the value belongs to
    pub key: Key,
    /// LSN of the value
    pub lsn: Lsn,
    /// Size of the stored value
    ///
    /// `load_keys` computes this as the distance between this value's blob offset and
    /// the offset of the next index entry (or the start of the index for the last one).
    pub size: u64,
    /// Reference to the on-disk value
    pub val: ValueRef<'a>,
}
1413 :
/// Reference to an on-disk value
///
/// The value itself is only read when [`ValueRef::load`] is called.
pub struct ValueRef<'a> {
    /// Index entry for the value; its `pos()` is where the blob is read from
    blob_ref: BlobRef,
    /// Cursor used to read the blob
    reader: BlockCursor<'a>,
}
1419 :
1420 : impl<'a> ValueRef<'a> {
1421 : /// Loads the value from disk
1422 3121998 : pub async fn load(&self, ctx: &RequestContext) -> Result<Value> {
1423 : // theoretically we *could* record an access time for each, but it does not really matter
1424 3121998 : let buf = self.reader.read_blob(self.blob_ref.pos(), ctx).await?;
1425 3121998 : let val = Value::des(&buf)?;
1426 3121998 : Ok(val)
1427 3121998 : }
1428 : }
1429 :
/// Thin wrapper that lets anything yielding a [`DeltaLayerInner`] via `AsRef` be read
/// block by block (see `Adapter::read_blk`).
pub(crate) struct Adapter<T>(T);
1431 :
1432 : impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
1433 3150684 : pub(crate) async fn read_blk(
1434 3150684 : &self,
1435 3150684 : blknum: u32,
1436 3150684 : ctx: &RequestContext,
1437 3150684 : ) -> Result<BlockLease, std::io::Error> {
1438 3150684 : let block_reader = FileBlockReader::new(&self.0.as_ref().file, self.0.as_ref().file_id);
1439 3150684 : block_reader.read_blk(blknum, ctx).await
1440 3150684 : }
1441 : }
1442 :
// Identity conversion, so that `DeltaLayerInner` (and references to it) satisfy the
// `AsRef<DeltaLayerInner>` bound required by `Adapter::read_blk`.
impl AsRef<DeltaLayerInner> for DeltaLayerInner {
    fn as_ref(&self) -> &DeltaLayerInner {
        self
    }
}
1448 :
// Exposes `DeltaEntry` through the compaction interface; every method simply returns
// the corresponding field.
impl<'a> pageserver_compaction::interface::CompactionDeltaEntry<'a, Key> for DeltaEntry<'a> {
    fn key(&self) -> Key {
        self.key
    }
    fn lsn(&self) -> Lsn {
        self.lsn
    }
    fn size(&self) -> u64 {
        self.size
    }
}
1460 :
1461 : #[cfg(test)]
1462 : mod test {
1463 : use std::collections::BTreeMap;
1464 :
1465 : use itertools::MinMaxResult;
1466 : use rand::prelude::{SeedableRng, SliceRandom, StdRng};
1467 : use rand::RngCore;
1468 :
1469 : use super::*;
1470 : use crate::{
1471 : context::DownloadBehavior,
1472 : task_mgr::TaskKind,
1473 : tenant::{disk_btree::tests::TestDisk, harness::TenantHarness},
1474 : DEFAULT_PG_VERSION,
1475 : };
1476 :
1477 : /// Construct an index for a fictional delta layer and and then
1478 : /// traverse in order to plan vectored reads for a query. Finally,
1479 : /// verify that the traversal fed the right index key and value
1480 : /// pairs into the planner.
1481 : #[tokio::test]
1482 2 : async fn test_delta_layer_index_traversal() {
1483 2 : let base_key = Key {
1484 2 : field1: 0,
1485 2 : field2: 1663,
1486 2 : field3: 12972,
1487 2 : field4: 16396,
1488 2 : field5: 0,
1489 2 : field6: 246080,
1490 2 : };
1491 2 :
1492 2 : // Populate the index with some entries
1493 2 : let entries: BTreeMap<Key, Vec<Lsn>> = BTreeMap::from([
1494 2 : (base_key, vec![Lsn(1), Lsn(5), Lsn(25), Lsn(26), Lsn(28)]),
1495 2 : (base_key.add(1), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
1496 2 : (base_key.add(2), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
1497 2 : (base_key.add(5), vec![Lsn(10), Lsn(15), Lsn(16), Lsn(20)]),
1498 2 : ]);
1499 2 :
1500 2 : let mut disk = TestDisk::default();
1501 2 : let mut writer = DiskBtreeBuilder::<_, DELTA_KEY_SIZE>::new(&mut disk);
1502 2 :
1503 2 : let mut disk_offset = 0;
1504 10 : for (key, lsns) in &entries {
1505 42 : for lsn in lsns {
1506 34 : let index_key = DeltaKey::from_key_lsn(key, *lsn);
1507 34 : let blob_ref = BlobRef::new(disk_offset, false);
1508 34 : writer
1509 34 : .append(&index_key.0, blob_ref.0)
1510 34 : .expect("In memory disk append should never fail");
1511 34 :
1512 34 : disk_offset += 1;
1513 34 : }
1514 2 : }
1515 2 :
1516 2 : // Prepare all the arguments for the call into `plan_reads` below
1517 2 : let (root_offset, _writer) = writer
1518 2 : .finish()
1519 2 : .expect("In memory disk finish should never fail");
1520 2 : let reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(0, root_offset, disk);
1521 2 : let planner = VectoredReadPlanner::new(100);
1522 2 : let mut reconstruct_state = ValuesReconstructState::new();
1523 2 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
1524 2 :
1525 2 : let keyspace = KeySpace {
1526 2 : ranges: vec![
1527 2 : base_key..base_key.add(3),
1528 2 : base_key.add(3)..base_key.add(100),
1529 2 : ],
1530 2 : };
1531 2 : let lsn_range = Lsn(2)..Lsn(40);
1532 2 :
1533 2 : // Plan and validate
1534 2 : let vectored_reads = DeltaLayerInner::plan_reads(
1535 2 : keyspace.clone(),
1536 2 : lsn_range.clone(),
1537 2 : disk_offset,
1538 2 : reader,
1539 2 : planner,
1540 2 : &mut reconstruct_state,
1541 2 : &ctx,
1542 2 : )
1543 2 : .await
1544 2 : .expect("Read planning should not fail");
1545 2 :
1546 2 : validate(keyspace, lsn_range, vectored_reads, entries);
1547 2 : }
1548 :
1549 2 : fn validate(
1550 2 : keyspace: KeySpace,
1551 2 : lsn_range: Range<Lsn>,
1552 2 : vectored_reads: Vec<VectoredRead>,
1553 2 : index_entries: BTreeMap<Key, Vec<Lsn>>,
1554 2 : ) {
1555 2 : #[derive(Debug, PartialEq, Eq)]
1556 2 : struct BlobSpec {
1557 2 : key: Key,
1558 2 : lsn: Lsn,
1559 2 : at: u64,
1560 2 : }
1561 2 :
1562 2 : let mut planned_blobs = Vec::new();
1563 8 : for read in vectored_reads {
1564 28 : for (at, meta) in read.blobs_at.as_slice() {
1565 28 : planned_blobs.push(BlobSpec {
1566 28 : key: meta.key,
1567 28 : lsn: meta.lsn,
1568 28 : at: *at,
1569 28 : });
1570 28 : }
1571 : }
1572 :
1573 2 : let mut expected_blobs = Vec::new();
1574 2 : let mut disk_offset = 0;
1575 10 : for (key, lsns) in index_entries {
1576 42 : for lsn in lsns {
1577 42 : let key_included = keyspace.ranges.iter().any(|range| range.contains(&key));
1578 34 : let lsn_included = lsn_range.contains(&lsn);
1579 34 :
1580 34 : if key_included && lsn_included {
1581 28 : expected_blobs.push(BlobSpec {
1582 28 : key,
1583 28 : lsn,
1584 28 : at: disk_offset,
1585 28 : });
1586 28 : }
1587 :
1588 34 : disk_offset += 1;
1589 : }
1590 : }
1591 :
1592 2 : assert_eq!(planned_blobs, expected_blobs);
1593 2 : }
1594 :
    /// Tunables for the randomized end-to-end vectored read test below.
    mod constants {
        use utils::lsn::Lsn;

        /// Offset used by all lsns in this test
        pub(super) const LSN_OFFSET: Lsn = Lsn(0x08);
        /// Number of unique keys included in the test data
        pub(super) const KEY_COUNT: u8 = 60;
        /// Exclusive upper bound on the number of different lsns generated for each key
        pub(super) const MAX_ENTRIES_PER_KEY: u8 = 20;
        /// Possible value sizes for each key along with a probability weight
        pub(super) const VALUE_SIZES: [(usize, u8); 3] = [(100, 2), (1024, 2), (1024 * 1024, 1)];
        /// Probability that there will be a gap between the current key and the next one (33.3%)
        pub(super) const KEY_GAP_CHANGES: [(bool, u8); 2] = [(true, 1), (false, 2)];
        /// The minimum size of a key range in all the generated reads
        pub(super) const MIN_RANGE_SIZE: i128 = 10;
        /// The number of ranges included in each vectored read
        pub(super) const RANGES_COUNT: u8 = 2;
        /// The number of vectored reads performed
        pub(super) const READS_COUNT: u8 = 100;
        /// Soft max size of a vectored read. Will be violated if we have to read keys
        /// with values larger than the limit
        pub(super) const MAX_VECTORED_READ_BYTES: usize = 64 * 1024;
    }
1618 :
    /// One generated test record: a raw value stored under `key` at `lsn`.
    struct Entry {
        key: Key,
        lsn: Lsn,
        /// Random bytes written to the layer as-is (see `generate_entries`)
        value: Vec<u8>,
    }
1624 :
    /// Generates the random test data set, driven entirely by `rng`.
    ///
    /// Produces `constants::KEY_COUNT` keys starting at `Key::MIN`. Each key gets a
    /// random number of entries at increasing LSNs, every entry holding random bytes of
    /// a size drawn from `constants::VALUE_SIZES`. Entries are returned ordered by key,
    /// then by LSN.
    fn generate_entries(rng: &mut StdRng) -> Vec<Entry> {
        let mut current_key = Key::MIN;

        let mut entries = Vec::new();
        for _ in 0..constants::KEY_COUNT {
            // Number of entries for this key, in 1..MAX_ENTRIES_PER_KEY (upper bound exclusive)
            let count = rng.gen_range(1..constants::MAX_ENTRIES_PER_KEY);
            // Candidate lsns: LSN_OFFSET + 0x08, then in steps of 0x08
            let mut lsns_iter =
                std::iter::successors(Some(Lsn(constants::LSN_OFFSET.0 + 0x08)), |lsn| {
                    Some(Lsn(lsn.0 + 0x08))
                });
            let mut lsns = Vec::new();
            // Keep each candidate lsn with 50% probability until `count` were picked,
            // which leaves random holes in the lsn sequence.
            while lsns.len() < count as usize {
                let take = rng.gen_bool(0.5);
                let lsn = lsns_iter.next().unwrap();
                if take {
                    lsns.push(lsn);
                }
            }

            for lsn in lsns {
                let size = constants::VALUE_SIZES
                    .choose_weighted(rng, |item| item.1)
                    .unwrap()
                    .0;
                let mut buf = vec![0; size];
                rng.fill_bytes(&mut buf);

                entries.push(Entry {
                    key: current_key,
                    lsn,
                    value: buf,
                })
            }

            // Occasionally skip a key so that the key space has holes as well
            let gap = constants::KEY_GAP_CHANGES
                .choose_weighted(rng, |item| item.1)
                .unwrap()
                .0;
            if gap {
                current_key = current_key.add(2);
            } else {
                current_key = current_key.add(1);
            }
        }

        entries
    }
1672 :
    /// Summary of a generated data set, as computed by `get_entries_meta`.
    struct EntriesMeta {
        /// Smallest key up to (exclusive) the successor of the largest key
        key_range: Range<Key>,
        /// Smallest lsn up to (exclusive) the largest lsn plus one
        lsn_range: Range<Lsn>,
        /// Value bytes by `(key, lsn)`, used to verify what was read back
        index: BTreeMap<(Key, Lsn), Vec<u8>>,
    }
1678 :
1679 2 : fn get_entries_meta(entries: &[Entry]) -> EntriesMeta {
1680 1112 : let key_range = match entries.iter().minmax_by_key(|e| e.key) {
1681 2 : MinMaxResult::MinMax(min, max) => min.key..max.key.next(),
1682 0 : _ => panic!("More than one entry is always expected"),
1683 : };
1684 :
1685 1112 : let lsn_range = match entries.iter().minmax_by_key(|e| e.lsn) {
1686 2 : MinMaxResult::MinMax(min, max) => min.lsn..Lsn(max.lsn.0 + 1),
1687 0 : _ => panic!("More than one entry is always expected"),
1688 : };
1689 :
1690 2 : let mut index = BTreeMap::new();
1691 1112 : for entry in entries.iter() {
1692 1112 : index.insert((entry.key, entry.lsn), entry.value.clone());
1693 1112 : }
1694 :
1695 2 : EntriesMeta {
1696 2 : key_range,
1697 2 : lsn_range,
1698 2 : index,
1699 2 : }
1700 2 : }
1701 :
    /// Picks `constants::RANGES_COUNT` random key ranges inside `key_range`, none of
    /// which overlaps a previously picked one. Ranges are pushed in the order they
    /// were picked.
    fn pick_random_keyspace(rng: &mut StdRng, key_range: &Range<Key>) -> KeySpace {
        let start = key_range.start.to_i128();
        let end = key_range.end.to_i128();

        let mut keyspace = KeySpace::default();

        for _ in 0..constants::RANGES_COUNT {
            let mut range: Option<Range<Key>> = Option::default();
            // Draw candidates until one does not overlap what has been picked so far
            while range.is_none() || keyspace.overlaps(range.as_ref().unwrap()) {
                let range_start = rng.gen_range(start..end);
                let range_end_offset = range_start + constants::MIN_RANGE_SIZE;
                if range_end_offset >= end {
                    // Not enough room for a minimum-size range: run to the end of `key_range`
                    range = Some(Key::from_i128(range_start)..Key::from_i128(end));
                } else {
                    let range_end = rng.gen_range((range_start + constants::MIN_RANGE_SIZE)..end);
                    range = Some(Key::from_i128(range_start)..Key::from_i128(range_end));
                }
            }
            keyspace.ranges.push(range.unwrap());
        }

        keyspace
    }
1725 :
    /// Writes a randomly generated data set into a real delta layer, then performs
    /// `constants::READS_COUNT` vectored reads over random key spaces and checks that
    /// every blob read back matches the bytes that were written.
    #[tokio::test]
    async fn test_delta_layer_vectored_read_end_to_end() -> anyhow::Result<()> {
        let harness = TenantHarness::create("test_delta_layer_oversized_vectored_read")?;
        let (tenant, ctx) = harness.load().await;

        let timeline_id = TimelineId::generate();
        let timeline = tenant
            .create_test_timeline(timeline_id, constants::LSN_OFFSET, DEFAULT_PG_VERSION, &ctx)
            .await?;

        tracing::info!("Generating test data ...");

        // Fixed seed: the same data set and the same reads on every run
        let rng = &mut StdRng::seed_from_u64(0);
        let entries = generate_entries(rng);
        let entries_meta = get_entries_meta(&entries);

        tracing::info!("Done generating {} entries", entries.len());

        tracing::info!("Writing test data to delta layer ...");
        let mut writer = DeltaLayerWriter::new(
            harness.conf,
            timeline_id,
            harness.tenant_shard_id,
            entries_meta.key_range.start,
            entries_meta.lsn_range.clone(),
        )
        .await?;

        // The raw random bytes are stored as-is; they are only compared byte for byte
        // below and never deserialized.
        for entry in entries {
            let (_, res) = writer
                .put_value_bytes(entry.key, entry.lsn, entry.value, false)
                .await;
            res?;
        }

        let resident = writer.finish(entries_meta.key_range.end, &timeline).await?;

        let inner = resident.as_delta(&ctx).await?;

        let file_size = inner.file.metadata().await?.len();
        tracing::info!(
            "Done writing test data to delta layer. Resulting file size is: {}",
            file_size
        );

        for i in 0..constants::READS_COUNT {
            tracing::info!("Doing vectored read {}/{}", i + 1, constants::READS_COUNT);

            let block_reader = FileBlockReader::new(&inner.file, inner.file_id);
            let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
                inner.index_start_blk,
                inner.index_root_blk,
                block_reader,
            );

            let planner = VectoredReadPlanner::new(constants::MAX_VECTORED_READ_BYTES);
            let mut reconstruct_state = ValuesReconstructState::new();
            let keyspace = pick_random_keyspace(rng, &entries_meta.key_range);
            // The values end where the index starts
            let data_end_offset = inner.index_start_blk as u64 * PAGE_SZ as u64;

            let vectored_reads = DeltaLayerInner::plan_reads(
                keyspace.clone(),
                entries_meta.lsn_range.clone(),
                data_end_offset,
                index_reader,
                planner,
                &mut reconstruct_state,
                &ctx,
            )
            .await?;

            let vectored_blob_reader = VectoredBlobReader::new(&inner.file);
            let buf_size = DeltaLayerInner::get_min_read_buffer_size(
                &vectored_reads,
                constants::MAX_VECTORED_READ_BYTES,
            );
            // A single buffer is handed to each read and taken back from its result
            let mut buf = Some(BytesMut::with_capacity(buf_size));

            for read in vectored_reads {
                let blobs_buf = vectored_blob_reader
                    .read_blobs(&read, buf.take().expect("Should have a buffer"))
                    .await?;
                for meta in blobs_buf.blobs.iter() {
                    let value = &blobs_buf.buf[meta.start..meta.end];
                    assert_eq!(value, entries_meta.index[&(meta.meta.key, meta.meta.lsn)]);
                }

                buf = Some(blobs_buf.buf);
            }
        }

        Ok(())
    }
1819 :
1820 : #[tokio::test]
1821 2 : async fn copy_delta_prefix_smoke() {
1822 2 : use crate::walrecord::NeonWalRecord;
1823 2 : use bytes::Bytes;
1824 2 :
1825 2 : let h = crate::tenant::harness::TenantHarness::create("truncate_delta_smoke").unwrap();
1826 2 : let (tenant, ctx) = h.load().await;
1827 2 : let ctx = &ctx;
1828 2 : let timeline = tenant
1829 2 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, ctx)
1830 6 : .await
1831 2 : .unwrap();
1832 2 :
1833 2 : let initdb_layer = timeline
1834 2 : .layers
1835 2 : .read()
1836 2 : .await
1837 2 : .likely_resident_layers()
1838 2 : .next()
1839 2 : .unwrap();
1840 2 :
1841 2 : {
1842 2 : let mut writer = timeline.writer().await;
1843 2 :
1844 2 : let data = [
1845 2 : (0x20, 12, Value::Image(Bytes::from_static(b"foobar"))),
1846 2 : (
1847 2 : 0x30,
1848 2 : 12,
1849 2 : Value::WalRecord(NeonWalRecord::Postgres {
1850 2 : will_init: false,
1851 2 : rec: Bytes::from_static(b"1"),
1852 2 : }),
1853 2 : ),
1854 2 : (
1855 2 : 0x40,
1856 2 : 12,
1857 2 : Value::WalRecord(NeonWalRecord::Postgres {
1858 2 : will_init: true,
1859 2 : rec: Bytes::from_static(b"2"),
1860 2 : }),
1861 2 : ),
1862 2 : // build an oversized value so we cannot extend and existing read over
1863 2 : // this
1864 2 : (
1865 2 : 0x50,
1866 2 : 12,
1867 2 : Value::WalRecord(NeonWalRecord::Postgres {
1868 2 : will_init: true,
1869 2 : rec: {
1870 2 : let mut buf =
1871 2 : vec![0u8; tenant.conf.max_vectored_read_bytes.0.get() + 1024];
1872 2 : buf.iter_mut()
1873 2 : .enumerate()
1874 264192 : .for_each(|(i, slot)| *slot = (i % 256) as u8);
1875 2 : Bytes::from(buf)
1876 2 : },
1877 2 : }),
1878 2 : ),
1879 2 : // because the oversized read cannot be extended further, we are sure to exercise the
1880 2 : // builder created on the last round with this:
1881 2 : (
1882 2 : 0x60,
1883 2 : 12,
1884 2 : Value::WalRecord(NeonWalRecord::Postgres {
1885 2 : will_init: true,
1886 2 : rec: Bytes::from_static(b"3"),
1887 2 : }),
1888 2 : ),
1889 2 : (
1890 2 : 0x60,
1891 2 : 9,
1892 2 : Value::Image(Bytes::from_static(b"something for a different key")),
1893 2 : ),
1894 2 : ];
1895 2 :
1896 2 : let mut last_lsn = None;
1897 2 :
1898 14 : for (lsn, key, value) in data {
1899 12 : let key = Key::from_i128(key);
1900 17 : writer.put(key, Lsn(lsn), &value, ctx).await.unwrap();
1901 12 : last_lsn = Some(lsn);
1902 2 : }
1903 2 :
1904 2 : writer.finish_write(Lsn(last_lsn.unwrap()));
1905 2 : }
1906 2 : timeline.freeze_and_flush().await.unwrap();
1907 2 :
1908 2 : let new_layer = timeline
1909 2 : .layers
1910 2 : .read()
1911 2 : .await
1912 2 : .likely_resident_layers()
1913 4 : .find(|x| x != &initdb_layer)
1914 2 : .unwrap();
1915 2 :
1916 2 : // create a copy for the timeline, so we don't overwrite the file
1917 2 : let branch = tenant
1918 2 : .branch_timeline_test(&timeline, TimelineId::generate(), None, ctx)
1919 2 : .await
1920 2 : .unwrap();
1921 2 :
1922 2 : assert_eq!(branch.get_ancestor_lsn(), Lsn(0x60));
1923 2 :
1924 2 : // truncating at 0x61 gives us a full copy, otherwise just go backwards until there's just
1925 2 : // a single key
1926 2 :
1927 12 : for truncate_at in [0x61, 0x51, 0x41, 0x31, 0x21] {
1928 10 : let truncate_at = Lsn(truncate_at);
1929 2 :
1930 10 : let mut writer = DeltaLayerWriter::new(
1931 10 : tenant.conf,
1932 10 : branch.timeline_id,
1933 10 : tenant.tenant_shard_id,
1934 10 : Key::MIN,
1935 10 : Lsn(0x11)..truncate_at,
1936 10 : )
1937 5 : .await
1938 10 : .unwrap();
1939 2 :
1940 10 : let new_layer = new_layer.download_and_keep_resident().await.unwrap();
1941 10 :
1942 10 : new_layer
1943 10 : .copy_delta_prefix(&mut writer, truncate_at, ctx)
1944 15 : .await
1945 10 : .unwrap();
1946 2 :
1947 24 : let copied_layer = writer.finish(Key::MAX, &branch).await.unwrap();
1948 10 :
1949 11 : copied_layer.as_delta(ctx).await.unwrap();
1950 10 :
1951 10 : assert_keys_and_values_eq(
1952 10 : new_layer.as_delta(ctx).await.unwrap(),
1953 10 : copied_layer.as_delta(ctx).await.unwrap(),
1954 10 : truncate_at,
1955 10 : ctx,
1956 2 : )
1957 56 : .await;
1958 2 : }
1959 2 : }
1960 :
1961 10 : async fn assert_keys_and_values_eq(
1962 10 : source: &DeltaLayerInner,
1963 10 : truncated: &DeltaLayerInner,
1964 10 : truncated_at: Lsn,
1965 10 : ctx: &RequestContext,
1966 10 : ) {
1967 10 : use futures::future::ready;
1968 10 : use futures::stream::TryStreamExt;
1969 10 :
1970 10 : let start_key = [0u8; DELTA_KEY_SIZE];
1971 10 :
1972 10 : let source_reader = FileBlockReader::new(&source.file, source.file_id);
1973 10 : let source_tree = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1974 10 : source.index_start_blk,
1975 10 : source.index_root_blk,
1976 10 : &source_reader,
1977 10 : );
1978 10 : let source_stream = source.stream_index_forwards(&source_tree, &start_key, ctx);
1979 60 : let source_stream = source_stream.filter(|res| match res {
1980 60 : Ok((_, lsn, _)) => ready(lsn < &truncated_at),
1981 0 : _ => ready(true),
1982 60 : });
1983 10 : let mut source_stream = std::pin::pin!(source_stream);
1984 10 :
1985 10 : let truncated_reader = FileBlockReader::new(&truncated.file, truncated.file_id);
1986 10 : let truncated_tree = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1987 10 : truncated.index_start_blk,
1988 10 : truncated.index_root_blk,
1989 10 : &truncated_reader,
1990 10 : );
1991 10 : let truncated_stream = truncated.stream_index_forwards(&truncated_tree, &start_key, ctx);
1992 10 : let mut truncated_stream = std::pin::pin!(truncated_stream);
1993 10 :
1994 10 : let mut scratch_left = Vec::new();
1995 10 : let mut scratch_right = Vec::new();
1996 :
1997 : loop {
1998 42 : let (src, truncated) = (source_stream.try_next(), truncated_stream.try_next());
1999 47 : let (src, truncated) = tokio::try_join!(src, truncated).unwrap();
2000 42 :
2001 42 : if src.is_none() {
2002 10 : assert!(truncated.is_none());
2003 10 : break;
2004 32 : }
2005 32 :
2006 32 : let (src, truncated) = (src.unwrap(), truncated.unwrap());
2007 32 :
2008 32 : // because we've filtered the source with Lsn, we should always have the same keys from both.
2009 32 : assert_eq!(src.0, truncated.0);
2010 32 : assert_eq!(src.1, truncated.1);
2011 :
2012 : // if this is needed for something else, just drop this assert.
2013 32 : assert!(
2014 32 : src.2.pos() >= truncated.2.pos(),
2015 0 : "value position should not go backwards {} vs. {}",
2016 0 : src.2.pos(),
2017 0 : truncated.2.pos()
2018 : );
2019 :
2020 32 : scratch_left.clear();
2021 32 : let src_cursor = source_reader.block_cursor();
2022 32 : let left = src_cursor.read_blob_into_buf(src.2.pos(), &mut scratch_left, ctx);
2023 32 : scratch_right.clear();
2024 32 : let trunc_cursor = truncated_reader.block_cursor();
2025 32 : let right = trunc_cursor.read_blob_into_buf(truncated.2.pos(), &mut scratch_right, ctx);
2026 :
2027 83 : tokio::try_join!(left, right).unwrap();
2028 32 :
2029 32 : assert_eq!(utils::Hex(&scratch_left), utils::Hex(&scratch_right));
2030 : }
2031 10 : }
2032 : }
|