1 : //! A DeltaLayer represents a collection of WAL records or page images in a range of
2 : //! LSNs, and in a range of Keys. It is stored on a file on disk.
3 : //!
4 : //! Usually a delta layer only contains differences, in the form of WAL records
5 : //! against a base LSN. However, if a relation is extended or a whole new relation
6 : //! is created, there would be no base for the new pages. The entries for them
7 : //! must be page images or WAL records with the 'will_init' flag set, so that
8 : //! they can be replayed without referring to an older page version.
9 : //!
10 : //! The delta files are stored in `timelines/<timeline_id>` directory. Currently,
11 : //! there are no subdirectories, and each delta file is named like this:
12 : //!
13 : //! ```text
14 : //! <key start>-<key end>__<start LSN>-<end LSN>
15 : //! ```
16 : //!
17 : //! For example:
18 : //!
19 : //! ```text
20 : //! 000000067F000032BE0000400000000020B6-000000067F000032BE0000400000000030B6__000000578C6B29-0000000057A50051
21 : //! ```
22 : //!
23 : //! Every delta file consists of three parts: "summary", "values", and
24 : //! "index". The summary is a fixed size header at the beginning of the file,
25 : //! and it contains basic information about the layer, and offsets to the other
26 : //! parts. The "index" is a B-tree, mapping from Key and LSN to an offset in the
27 : //! "values" part. The actual page images and WAL records are stored in the
28 : //! "values" part.
29 : //!
30 : use crate::config::PageServerConf;
31 : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
32 : use crate::page_cache::{self, FileId, PAGE_SZ};
33 : use crate::repository::{Key, Value, KEY_SIZE};
34 : use crate::tenant::blob_io::BlobWriter;
35 : use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
36 : use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
37 : use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
38 : use crate::tenant::timeline::GetVectoredError;
39 : use crate::tenant::vectored_blob_io::{
40 : BlobFlag, MaxVectoredReadBytes, VectoredBlobReader, VectoredRead, VectoredReadPlanner,
41 : };
42 : use crate::tenant::{PageReconstructError, Timeline};
43 : use crate::virtual_file::{self, VirtualFile};
44 : use crate::{walrecord, TEMP_FILE_SUFFIX};
45 : use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
46 : use anyhow::{anyhow, bail, ensure, Context, Result};
47 : use bytes::BytesMut;
48 : use camino::{Utf8Path, Utf8PathBuf};
49 : use futures::StreamExt;
50 : use itertools::Itertools;
51 : use pageserver_api::keyspace::KeySpace;
52 : use pageserver_api::models::LayerAccessKind;
53 : use pageserver_api::shard::TenantShardId;
54 : use rand::{distributions::Alphanumeric, Rng};
55 : use serde::{Deserialize, Serialize};
56 : use std::fs::File;
57 : use std::io::SeekFrom;
58 : use std::ops::Range;
59 : use std::os::unix::fs::FileExt;
60 : use std::str::FromStr;
61 : use std::sync::Arc;
62 : use tokio::sync::OnceCell;
63 : use tracing::*;
64 :
65 : use utils::{
66 : bin_ser::BeSer,
67 : id::{TenantId, TimelineId},
68 : lsn::Lsn,
69 : };
70 :
71 : use super::{
72 : AsLayerDesc, LayerAccessStats, LayerName, PersistentLayerDesc, ResidentLayer,
73 : ValuesReconstructState,
74 : };
75 :
76 : ///
77 : /// Header stored at the beginning of the file
78 : ///
79 : /// After this comes the 'values' part, starting on block 1. After that,
80 : /// the 'index' starts at the block indicated by 'index_start_blk'
81 : ///
82 894 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
83 : pub struct Summary {
84 : /// Magic value to identify this as a neon delta file. Always DELTA_FILE_MAGIC.
85 : pub magic: u16,
86 : pub format_version: u16,
87 :
88 : pub tenant_id: TenantId,
89 : pub timeline_id: TimelineId,
90 : pub key_range: Range<Key>,
91 : pub lsn_range: Range<Lsn>,
92 :
93 : /// Block number where the 'index' part of the file begins.
94 : pub index_start_blk: u32,
95 : /// Block within the 'index', where the B-tree root page is stored
96 : pub index_root_blk: u32,
97 : }
98 :
99 : impl From<&DeltaLayer> for Summary {
100 0 : fn from(layer: &DeltaLayer) -> Self {
101 0 : Self::expected(
102 0 : layer.desc.tenant_shard_id.tenant_id,
103 0 : layer.desc.timeline_id,
104 0 : layer.desc.key_range.clone(),
105 0 : layer.desc.lsn_range.clone(),
106 0 : )
107 0 : }
108 : }
109 :
110 : impl Summary {
111 894 : pub(super) fn expected(
112 894 : tenant_id: TenantId,
113 894 : timeline_id: TimelineId,
114 894 : keys: Range<Key>,
115 894 : lsns: Range<Lsn>,
116 894 : ) -> Self {
117 894 : Self {
118 894 : magic: DELTA_FILE_MAGIC,
119 894 : format_version: STORAGE_FORMAT_VERSION,
120 894 :
121 894 : tenant_id,
122 894 : timeline_id,
123 894 : key_range: keys,
124 894 : lsn_range: lsns,
125 894 :
126 894 : index_start_blk: 0,
127 894 : index_root_blk: 0,
128 894 : }
129 894 : }
130 : }
131 :
132 : // Flag indicating that this version initializes the page
133 : const WILL_INIT: u64 = 1;
134 :
135 : /// Struct representing a reference to a BLOB in the layer file. The reference
136 : /// contains the BLOB offset, and for WAL records it also carries the `will_init`
137 : /// flag. The flag helps to determine the range of records that need to be
138 : /// applied, without reading/deserializing the records themselves.
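///
/// A minimal sketch of the packing: the low bit stores `will_init` and the
/// remaining bits store the offset, so the round trip looks like this:
///
/// ```text
/// let r = BlobRef::new(0x1234, true);
/// assert_eq!(r.pos(), 0x1234);
/// assert!(r.will_init());
/// ```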
139 0 : #[derive(Debug, Serialize, Deserialize, Copy, Clone)]
140 : pub struct BlobRef(pub u64);
141 :
142 : impl BlobRef {
143 240231 : pub fn will_init(&self) -> bool {
144 240231 : (self.0 & WILL_INIT) != 0
145 240231 : }
146 :
147 4439445 : pub fn pos(&self) -> u64 {
148 4439445 : self.0 >> 1
149 4439445 : }
150 :
151 6450812 : pub fn new(pos: u64, will_init: bool) -> BlobRef {
152 6450812 : let mut blob_ref = pos << 1;
153 6450812 : if will_init {
154 6449586 : blob_ref |= WILL_INIT;
155 6449586 : }
156 6450812 : BlobRef(blob_ref)
157 6450812 : }
158 : }
159 :
160 : pub const DELTA_KEY_SIZE: usize = KEY_SIZE + 8;
161 : struct DeltaKey([u8; DELTA_KEY_SIZE]);
162 :
163 : /// This is the key of the B-tree index stored in the delta layer: the serialized
164 : /// representation of a Key followed by the LSN, both in big-endian byte order.
165 : impl DeltaKey {
166 2064198 : fn from_slice(buf: &[u8]) -> Self {
167 2064198 : let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
168 2064198 : bytes.copy_from_slice(buf);
169 2064198 : DeltaKey(bytes)
170 2064198 : }
171 :
172 6696894 : fn from_key_lsn(key: &Key, lsn: Lsn) -> Self {
173 6696894 : let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
174 6696894 : key.write_to_byte_slice(&mut bytes[0..KEY_SIZE]);
175 6696894 : bytes[KEY_SIZE..].copy_from_slice(&u64::to_be_bytes(lsn.0));
176 6696894 : DeltaKey(bytes)
177 6696894 : }
178 :
179 2064198 : fn key(&self) -> Key {
180 2064198 : Key::from_slice(&self.0)
181 2064198 : }
182 :
183 2064198 : fn lsn(&self) -> Lsn {
184 2064198 : Lsn(u64::from_be_bytes(self.0[KEY_SIZE..].try_into().unwrap()))
185 2064198 : }
186 :
187 311177 : fn extract_lsn_from_buf(buf: &[u8]) -> Lsn {
188 311177 : let mut lsn_buf = [0u8; 8];
189 311177 : lsn_buf.copy_from_slice(&buf[KEY_SIZE..]);
190 311177 : Lsn(u64::from_be_bytes(lsn_buf))
191 311177 : }
192 : }
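// Byte layout of a DeltaKey, as built by `from_key_lsn` above:
//
//   +----------------------+--------------------------+
//   | key (KEY_SIZE bytes) | lsn (8 bytes, big-endian)|
//   +----------------------+--------------------------+
//
// Since both parts are serialized big-endian, byte-wise comparison of
// DeltaKeys agrees with (key, lsn) ordering.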
193 :
194 : /// This is used only from `pagectl`. Within pageserver, all layers are
195 : /// [`crate::tenant::storage_layer::Layer`], which can hold a [`DeltaLayerInner`].
196 : pub struct DeltaLayer {
197 : path: Utf8PathBuf,
198 : pub desc: PersistentLayerDesc,
199 : access_stats: LayerAccessStats,
200 : inner: OnceCell<Arc<DeltaLayerInner>>,
201 : }
202 :
203 : impl std::fmt::Debug for DeltaLayer {
204 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
205 0 : use super::RangeDisplayDebug;
206 0 :
207 0 : f.debug_struct("DeltaLayer")
208 0 : .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
209 0 : .field("lsn_range", &self.desc.lsn_range)
210 0 : .field("file_size", &self.desc.file_size)
211 0 : .field("inner", &self.inner)
212 0 : .finish()
213 0 : }
214 : }
215 :
216 : /// `DeltaLayerInner` is the in-memory data structure associated with an on-disk delta
217 : /// file.
218 : pub struct DeltaLayerInner {
219 : // values copied from summary
220 : index_start_blk: u32,
221 : index_root_blk: u32,
222 : lsn_range: Range<Lsn>,
223 :
224 : file: VirtualFile,
225 : file_id: FileId,
226 :
227 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
228 : }
229 :
230 : impl std::fmt::Debug for DeltaLayerInner {
231 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
232 0 : f.debug_struct("DeltaLayerInner")
233 0 : .field("index_start_blk", &self.index_start_blk)
234 0 : .field("index_root_blk", &self.index_root_blk)
235 0 : .finish()
236 0 : }
237 : }
238 :
239 : /// Boilerplate to implement the Layer trait; always use `layer_desc` for persistent layers.
240 : impl std::fmt::Display for DeltaLayer {
241 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
242 0 : write!(f, "{}", self.layer_desc().short_id())
243 0 : }
244 : }
245 :
246 : impl AsLayerDesc for DeltaLayer {
247 0 : fn layer_desc(&self) -> &PersistentLayerDesc {
248 0 : &self.desc
249 0 : }
250 : }
251 :
252 : impl DeltaLayer {
253 0 : pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
254 0 : self.desc.dump();
255 0 :
256 0 : if !verbose {
257 0 : return Ok(());
258 0 : }
259 :
260 0 : let inner = self.load(LayerAccessKind::Dump, ctx).await?;
261 :
262 0 : inner.dump(ctx).await
263 0 : }
264 :
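/// Names the temporary file like this (a sketch; the end key is not known
/// yet, hence the literal `XXX`, and the file is renamed to its final name
/// when writing finishes):
///
/// ```text
/// <key start>-XXX__<start LSN>-<end LSN>.<8 random chars>.<TEMP_FILE_SUFFIX>
/// ```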
265 1136 : fn temp_path_for(
266 1136 : conf: &PageServerConf,
267 1136 : tenant_shard_id: &TenantShardId,
268 1136 : timeline_id: &TimelineId,
269 1136 : key_start: Key,
270 1136 : lsn_range: &Range<Lsn>,
271 1136 : ) -> Utf8PathBuf {
272 1136 : let rand_string: String = rand::thread_rng()
273 1136 : .sample_iter(&Alphanumeric)
274 1136 : .take(8)
275 1136 : .map(char::from)
276 1136 : .collect();
277 1136 :
278 1136 : conf.timeline_path(tenant_shard_id, timeline_id)
279 1136 : .join(format!(
280 1136 : "{}-XXX__{:016X}-{:016X}.{}.{}",
281 1136 : key_start,
282 1136 : u64::from(lsn_range.start),
283 1136 : u64::from(lsn_range.end),
284 1136 : rand_string,
285 1136 : TEMP_FILE_SUFFIX,
286 1136 : ))
287 1136 : }
288 :
289 : ///
290 : /// Open the underlying file and read the metadata into memory, if it's
291 : /// not loaded already.
292 : ///
293 0 : async fn load(
294 0 : &self,
295 0 : access_kind: LayerAccessKind,
296 0 : ctx: &RequestContext,
297 0 : ) -> Result<&Arc<DeltaLayerInner>> {
298 0 : self.access_stats.record_access(access_kind, ctx);
299 0 : // Quick exit if already loaded
300 0 : self.inner
301 0 : .get_or_try_init(|| self.load_inner(ctx))
302 0 : .await
303 0 : .with_context(|| format!("Failed to load delta layer {}", self.path()))
304 0 : }
305 :
306 0 : async fn load_inner(&self, ctx: &RequestContext) -> Result<Arc<DeltaLayerInner>> {
307 0 : let path = self.path();
308 :
309 0 : let loaded = DeltaLayerInner::load(&path, None, None, ctx)
310 0 : .await
311 0 : .and_then(|res| res)?;
312 :
313 : // not production code
314 0 : let actual_layer_name = LayerName::from_str(path.file_name().unwrap()).unwrap();
315 0 : let expected_layer_name = self.layer_desc().layer_name();
316 0 :
317 0 : if actual_layer_name != expected_layer_name {
318 0 : println!("warning: filename does not match what is expected from in-file summary");
319 0 : println!("actual: {:?}", actual_layer_name.to_string());
320 0 : println!("expected: {:?}", expected_layer_name.to_string());
321 0 : }
322 :
323 0 : Ok(Arc::new(loaded))
324 0 : }
325 :
326 : /// Create a DeltaLayer struct representing an existing file on disk.
327 : ///
328 : /// This variant is only used for debugging purposes, by the 'pagectl' binary.
329 0 : pub fn new_for_path(path: &Utf8Path, file: File) -> Result<Self> {
330 0 : let mut summary_buf = vec![0; PAGE_SZ];
331 0 : file.read_exact_at(&mut summary_buf, 0)?;
332 0 : let summary = Summary::des_prefix(&summary_buf)?;
333 :
334 0 : let metadata = file
335 0 : .metadata()
336 0 : .context("get file metadata to determine size")?;
337 :
338 : // This function is never used for constructing layers in a running pageserver,
339 : // so it does not need an accurate TenantShardId.
340 0 : let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
341 0 :
342 0 : Ok(DeltaLayer {
343 0 : path: path.to_path_buf(),
344 0 : desc: PersistentLayerDesc::new_delta(
345 0 : tenant_shard_id,
346 0 : summary.timeline_id,
347 0 : summary.key_range,
348 0 : summary.lsn_range,
349 0 : metadata.len(),
350 0 : ),
351 0 : access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
352 0 : inner: OnceCell::new(),
353 0 : })
354 0 : }
355 :
356 : /// Path to the layer file in pageserver workdir.
357 0 : fn path(&self) -> Utf8PathBuf {
358 0 : self.path.clone()
359 0 : }
360 : }
361 :
362 : /// A builder object for constructing a new delta layer.
363 : ///
364 : /// Usage:
365 : ///
366 : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
367 : ///
368 : /// 2. Write the contents by calling `put_value` for every page
369 : /// version to store in the layer.
370 : ///
371 : /// 3. Call `finish`.
372 : ///
373 : struct DeltaLayerWriterInner {
374 : conf: &'static PageServerConf,
375 : pub path: Utf8PathBuf,
376 : timeline_id: TimelineId,
377 : tenant_shard_id: TenantShardId,
378 :
379 : key_start: Key,
380 : lsn_range: Range<Lsn>,
381 :
382 : tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>,
383 :
384 : blob_writer: BlobWriter<true>,
385 : }
386 :
387 : impl DeltaLayerWriterInner {
388 : ///
389 : /// Start building a new delta layer.
390 : ///
391 1136 : async fn new(
392 1136 : conf: &'static PageServerConf,
393 1136 : timeline_id: TimelineId,
394 1136 : tenant_shard_id: TenantShardId,
395 1136 : key_start: Key,
396 1136 : lsn_range: Range<Lsn>,
397 1136 : ctx: &RequestContext,
398 1136 : ) -> anyhow::Result<Self> {
399 1136 : // Create the file initially with a temporary filename. We don't know
400 1136 : // the end key yet, so we cannot form the final filename yet. We will
401 1136 : // rename it when we're done.
402 1136 : //
403 1136 : // Note: This overwrites any existing file. There shouldn't be any.
404 1136 : // FIXME: throw an error instead?
405 1136 : let path =
406 1136 : DeltaLayer::temp_path_for(conf, &tenant_shard_id, &timeline_id, key_start, &lsn_range);
407 :
408 1136 : let mut file = VirtualFile::create(&path, ctx).await?;
409 : // make room for the header block
410 1136 : file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
411 1136 : let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
412 1136 :
413 1136 : // Initialize the b-tree index builder
414 1136 : let block_buf = BlockBuf::new();
415 1136 : let tree_builder = DiskBtreeBuilder::new(block_buf);
416 1136 :
417 1136 : Ok(Self {
418 1136 : conf,
419 1136 : path,
420 1136 : timeline_id,
421 1136 : tenant_shard_id,
422 1136 : key_start,
423 1136 : lsn_range,
424 1136 : tree: tree_builder,
425 1136 : blob_writer,
426 1136 : })
427 1136 : }
428 :
429 : ///
430 : /// Append a key-value pair to the file.
431 : ///
432 : /// The values must be appended in key, lsn order.
433 : ///
434 2064038 : async fn put_value(
435 2064038 : &mut self,
436 2064038 : key: Key,
437 2064038 : lsn: Lsn,
438 2064038 : val: Value,
439 2064038 : ctx: &RequestContext,
440 2064038 : ) -> anyhow::Result<()> {
441 2064038 : let (_, res) = self
442 2064038 : .put_value_bytes(key, lsn, Value::ser(&val)?, val.will_init(), ctx)
443 1570 : .await;
444 2064038 : res
445 2064038 : }
446 :
447 6450708 : async fn put_value_bytes(
448 6450708 : &mut self,
449 6450708 : key: Key,
450 6450708 : lsn: Lsn,
451 6450708 : val: Vec<u8>,
452 6450708 : will_init: bool,
453 6450708 : ctx: &RequestContext,
454 6450708 : ) -> (Vec<u8>, anyhow::Result<()>) {
455 6450708 : assert!(self.lsn_range.start <= lsn);
456 6450708 : let (val, res) = self.blob_writer.write_blob(val, ctx).await;
457 6450708 : let off = match res {
458 6450708 : Ok(off) => off,
459 0 : Err(e) => return (val, Err(anyhow::anyhow!(e))),
460 : };
461 :
462 6450708 : let blob_ref = BlobRef::new(off, will_init);
463 6450708 :
464 6450708 : let delta_key = DeltaKey::from_key_lsn(&key, lsn);
465 6450708 : let res = self.tree.append(&delta_key.0, blob_ref.0);
466 6450708 : (val, res.map_err(|e| anyhow::anyhow!(e)))
467 6450708 : }
468 :
469 2023972 : fn size(&self) -> u64 {
470 2023972 : self.blob_writer.size() + self.tree.borrow_writer().size()
471 2023972 : }
472 :
473 : ///
474 : /// Finish writing the delta layer.
475 : ///
476 1136 : async fn finish(
477 1136 : self,
478 1136 : key_end: Key,
479 1136 : timeline: &Arc<Timeline>,
480 1136 : ctx: &RequestContext,
481 1136 : ) -> anyhow::Result<ResidentLayer> {
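// Round the size of the 'values' part up to the next PAGE_SZ boundary
// (a plain ceiling division); the index starts on its own block.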
482 1136 : let index_start_blk =
483 1136 : ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
484 :
485 1136 : let mut file = self.blob_writer.into_inner(ctx).await?;
486 :
487 : // Write out the index
488 1136 : let (index_root_blk, block_buf) = self.tree.finish()?;
489 1136 : file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
490 0 : .await?;
491 14487 : for buf in block_buf.blocks {
492 13351 : let (_buf, res) = file.write_all(buf, ctx).await;
493 13351 : res?;
494 : }
495 1136 : assert!(self.lsn_range.start < self.lsn_range.end);
496 : // Fill in the summary on blk 0
497 1136 : let summary = Summary {
498 1136 : magic: DELTA_FILE_MAGIC,
499 1136 : format_version: STORAGE_FORMAT_VERSION,
500 1136 : tenant_id: self.tenant_shard_id.tenant_id,
501 1136 : timeline_id: self.timeline_id,
502 1136 : key_range: self.key_start..key_end,
503 1136 : lsn_range: self.lsn_range.clone(),
504 1136 : index_start_blk,
505 1136 : index_root_blk,
506 1136 : };
507 1136 :
508 1136 : let mut buf = Vec::with_capacity(PAGE_SZ);
509 1136 : // TODO: could use smallvec here but it's a pain with Slice<T>
510 1136 : Summary::ser_into(&summary, &mut buf)?;
511 1136 : file.seek(SeekFrom::Start(0)).await?;
512 1136 : let (_buf, res) = file.write_all(buf, ctx).await;
513 1136 : res?;
514 :
515 1136 : let metadata = file
516 1136 : .metadata()
517 574 : .await
518 1136 : .context("get file metadata to determine size")?;
519 :
520 : // 5GB limit for objects without multipart upload (which we don't want to use)
521 : // Stay a little below that limit to account for differing definitions of "GB".
522 : // https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html
523 : const S3_UPLOAD_LIMIT: u64 = 4_500_000_000;
524 1136 : ensure!(
525 1136 : metadata.len() <= S3_UPLOAD_LIMIT,
526 0 : "Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
527 0 : file.path,
528 0 : metadata.len()
529 : );
530 :
531 : // Note: Because we opened the file in write-only mode, we cannot
532 : // reuse the same VirtualFile for reading later. That's why we don't
533 : // set inner.file here. The first read will have to re-open it.
534 :
535 1136 : let desc = PersistentLayerDesc::new_delta(
536 1136 : self.tenant_shard_id,
537 1136 : self.timeline_id,
538 1136 : self.key_start..key_end,
539 1136 : self.lsn_range.clone(),
540 1136 : metadata.len(),
541 1136 : );
542 1136 :
543 1136 : // fsync the file
544 1136 : file.sync_all().await?;
545 :
546 1136 : let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
547 :
548 1136 : trace!("created delta layer {}", layer.local_path());
549 :
550 1136 : Ok(layer)
551 1136 : }
552 : }
553 :
554 : /// A builder object for constructing a new delta layer.
555 : ///
556 : /// Usage:
557 : ///
558 : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
559 : ///
560 : /// 2. Write the contents by calling `put_value` for every page
561 : /// version to store in the layer.
562 : ///
563 : /// 3. Call `finish`, as sketched below.
564 : ///
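/// A minimal sketch of that sequence (argument values are placeholders and
/// error handling is elided):
///
/// ```text
/// let mut writer = DeltaLayerWriter::new(
///     conf, timeline_id, tenant_shard_id, key_start, lsn_start..lsn_end, ctx,
/// ).await?;
/// for (key, lsn, value) in deltas {
///     // values must arrive in (key, lsn) order
///     writer.put_value(key, lsn, value, ctx).await?;
/// }
/// let resident_layer = writer.finish(key_end, &timeline, ctx).await?;
/// ```
///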
565 : /// # Note
566 : ///
567 : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
568 : /// possible for the writer to be dropped before `finish` is actually called,
569 : /// which could leave stray temporary files in the directory and exhaust the
570 : /// file system. This structure wraps `DeltaLayerWriterInner` and also contains
571 : /// a `Drop` implementation that cleans up the temporary file on failure. It's
572 : /// not possible to do this directly in `DeltaLayerWriterInner`, since `finish`
573 : /// moves out some fields, making it impossible to implement `Drop` there.
574 : ///
575 : #[must_use]
576 : pub struct DeltaLayerWriter {
577 : inner: Option<DeltaLayerWriterInner>,
578 : }
579 :
580 : impl DeltaLayerWriter {
581 : ///
582 : /// Start building a new delta layer.
583 : ///
584 1136 : pub async fn new(
585 1136 : conf: &'static PageServerConf,
586 1136 : timeline_id: TimelineId,
587 1136 : tenant_shard_id: TenantShardId,
588 1136 : key_start: Key,
589 1136 : lsn_range: Range<Lsn>,
590 1136 : ctx: &RequestContext,
591 1136 : ) -> anyhow::Result<Self> {
592 1136 : Ok(Self {
593 1136 : inner: Some(
594 1136 : DeltaLayerWriterInner::new(
595 1136 : conf,
596 1136 : timeline_id,
597 1136 : tenant_shard_id,
598 1136 : key_start,
599 1136 : lsn_range,
600 1136 : ctx,
601 1136 : )
602 579 : .await?,
603 : ),
604 : })
605 1136 : }
606 :
607 : ///
608 : /// Append a key-value pair to the file.
609 : ///
610 : /// The values must be appended in key, lsn order.
611 : ///
612 2064038 : pub async fn put_value(
613 2064038 : &mut self,
614 2064038 : key: Key,
615 2064038 : lsn: Lsn,
616 2064038 : val: Value,
617 2064038 : ctx: &RequestContext,
618 2064038 : ) -> anyhow::Result<()> {
619 2064038 : self.inner
620 2064038 : .as_mut()
621 2064038 : .unwrap()
622 2064038 : .put_value(key, lsn, val, ctx)
623 1570 : .await
624 2064038 : }
625 :
626 4386670 : pub async fn put_value_bytes(
627 4386670 : &mut self,
628 4386670 : key: Key,
629 4386670 : lsn: Lsn,
630 4386670 : val: Vec<u8>,
631 4386670 : will_init: bool,
632 4386670 : ctx: &RequestContext,
633 4386670 : ) -> (Vec<u8>, anyhow::Result<()>) {
634 4386670 : self.inner
635 4386670 : .as_mut()
636 4386670 : .unwrap()
637 4386670 : .put_value_bytes(key, lsn, val, will_init, ctx)
638 2950 : .await
639 4386670 : }
640 :
641 2023972 : pub fn size(&self) -> u64 {
642 2023972 : self.inner.as_ref().unwrap().size()
643 2023972 : }
644 :
645 : ///
646 : /// Finish writing the delta layer.
647 : ///
648 1136 : pub(crate) async fn finish(
649 1136 : mut self,
650 1136 : key_end: Key,
651 1136 : timeline: &Arc<Timeline>,
652 1136 : ctx: &RequestContext,
653 1136 : ) -> anyhow::Result<ResidentLayer> {
654 1136 : let inner = self.inner.take().unwrap();
655 1136 : let temp_path = inner.path.clone();
656 9034 : let result = inner.finish(key_end, timeline, ctx).await;
657 : // The delta layer files can sometimes be really large. Clean them up.
658 1136 : if result.is_err() {
659 0 : tracing::warn!(
660 0 : "Cleaning up temporary delta file {temp_path} after error during writing"
661 : );
662 0 : if let Err(e) = std::fs::remove_file(&temp_path) {
663 0 : tracing::warn!("Error cleaning up temporary delta layer file {temp_path}: {e:?}")
664 0 : }
665 1136 : }
666 1136 : result
667 1136 : }
668 : }
669 :
670 : impl Drop for DeltaLayerWriter {
671 1136 : fn drop(&mut self) {
672 1136 : if let Some(inner) = self.inner.take() {
673 0 : // We want to remove the virtual file here, so it's fine that unwritten
674 0 : // data has not been completely flushed.
675 0 : let vfile = inner.blob_writer.into_inner_no_flush();
676 0 : vfile.remove();
677 1136 : }
678 1136 : }
679 : }
680 :
681 0 : #[derive(thiserror::Error, Debug)]
682 : pub enum RewriteSummaryError {
683 : #[error("magic mismatch")]
684 : MagicMismatch,
685 : #[error(transparent)]
686 : Other(#[from] anyhow::Error),
687 : }
688 :
689 : impl From<std::io::Error> for RewriteSummaryError {
690 0 : fn from(e: std::io::Error) -> Self {
691 0 : Self::Other(anyhow::anyhow!(e))
692 0 : }
693 : }
694 :
695 : impl DeltaLayer {
696 0 : pub async fn rewrite_summary<F>(
697 0 : path: &Utf8Path,
698 0 : rewrite: F,
699 0 : ctx: &RequestContext,
700 0 : ) -> Result<(), RewriteSummaryError>
701 0 : where
702 0 : F: Fn(Summary) -> Summary,
703 0 : {
704 0 : let mut file = VirtualFile::open_with_options(
705 0 : path,
706 0 : virtual_file::OpenOptions::new().read(true).write(true),
707 0 : ctx,
708 0 : )
709 0 : .await
710 0 : .with_context(|| format!("Failed to open file '{}'", path))?;
711 0 : let file_id = page_cache::next_file_id();
712 0 : let block_reader = FileBlockReader::new(&file, file_id);
713 0 : let summary_blk = block_reader.read_blk(0, ctx).await?;
714 0 : let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
715 0 : if actual_summary.magic != DELTA_FILE_MAGIC {
716 0 : return Err(RewriteSummaryError::MagicMismatch);
717 0 : }
718 0 :
719 0 : let new_summary = rewrite(actual_summary);
720 0 :
721 0 : let mut buf = Vec::with_capacity(PAGE_SZ);
722 0 : // TODO: could use smallvec here, but it's a pain with Slice<T>
723 0 : Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
724 0 : file.seek(SeekFrom::Start(0)).await?;
725 0 : let (_buf, res) = file.write_all(buf, ctx).await;
726 0 : res?;
727 0 : Ok(())
728 0 : }
729 : }
730 :
731 : impl DeltaLayerInner {
732 : /// Returns a nested result of the form `Result<Result<_, OpErr>, Critical>`:
733 : /// - the inner result carries success or a transient failure
734 : /// - the outer result carries a permanent failure
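///
/// Callers that treat both failure kinds the same typically flatten the
/// nesting, as `DeltaLayer::load_inner` above does:
///
/// ```text
/// let loaded = DeltaLayerInner::load(&path, None, None, ctx)
///     .await
///     .and_then(|res| res)?;
/// ```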
735 894 : pub(super) async fn load(
736 894 : path: &Utf8Path,
737 894 : summary: Option<Summary>,
738 894 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
739 894 : ctx: &RequestContext,
740 894 : ) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
741 894 : let file = match VirtualFile::open(path, ctx).await {
742 894 : Ok(file) => file,
743 0 : Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
744 : };
745 894 : let file_id = page_cache::next_file_id();
746 894 :
747 894 : let block_reader = FileBlockReader::new(&file, file_id);
748 :
749 894 : let summary_blk = match block_reader.read_blk(0, ctx).await {
750 894 : Ok(blk) => blk,
751 0 : Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
752 : };
753 :
754 : // TODO: this should be an assertion instead; see ImageLayerInner::load
755 894 : let actual_summary =
756 894 : Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
757 :
758 894 : if let Some(mut expected_summary) = summary {
759 : // production code path
760 894 : expected_summary.index_start_blk = actual_summary.index_start_blk;
761 894 : expected_summary.index_root_blk = actual_summary.index_root_blk;
762 894 : // mask out the timeline_id, but still require the layers to be from the same tenant
763 894 : expected_summary.timeline_id = actual_summary.timeline_id;
764 894 :
765 894 : if actual_summary != expected_summary {
766 0 : bail!(
767 0 : "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
768 0 : actual_summary,
769 0 : expected_summary
770 0 : );
771 894 : }
772 0 : }
773 :
774 894 : Ok(Ok(DeltaLayerInner {
775 894 : file,
776 894 : file_id,
777 894 : index_start_blk: actual_summary.index_start_blk,
778 894 : index_root_blk: actual_summary.index_root_blk,
779 894 : lsn_range: actual_summary.lsn_range,
780 894 : max_vectored_read_bytes,
781 894 : }))
782 894 : }
783 :
784 203442 : pub(super) async fn get_value_reconstruct_data(
785 203442 : &self,
786 203442 : key: Key,
787 203442 : lsn_range: Range<Lsn>,
788 203442 : reconstruct_state: &mut ValueReconstructState,
789 203442 : ctx: &RequestContext,
790 203442 : ) -> anyhow::Result<ValueReconstructResult> {
791 203442 : let mut need_image = true;
792 203442 : // Scan the page versions backwards, starting from `lsn`.
793 203442 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
794 203442 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
795 203442 : self.index_start_blk,
796 203442 : self.index_root_blk,
797 203442 : &block_reader,
798 203442 : );
799 203442 : let search_key = DeltaKey::from_key_lsn(&key, Lsn(lsn_range.end.0 - 1));
800 203442 :
801 203442 : let mut offsets: Vec<(Lsn, u64)> = Vec::new();
802 203442 :
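// Walk the index backwards, starting at (key, lsn_range.end - 1). We
// collect blob offsets until we leave the key, drop below lsn_range.start,
// or hit an entry that initializes the page (nothing older is needed then).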
803 203442 : tree_reader
804 203442 : .visit(
805 203442 : &search_key.0,
806 203442 : VisitDirection::Backwards,
807 203442 : |key, value| {
808 197521 : let blob_ref = BlobRef(value);
809 197521 : if key[..KEY_SIZE] != search_key.0[..KEY_SIZE] {
810 75879 : return false;
811 121642 : }
812 121642 : let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
813 121642 : if entry_lsn < lsn_range.start {
814 28 : return false;
815 121614 : }
816 121614 : offsets.push((entry_lsn, blob_ref.pos()));
817 121614 :
818 121614 : !blob_ref.will_init()
819 203442 : },
820 203442 : &RequestContextBuilder::extend(ctx)
821 203442 : .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
822 203442 : .build(),
823 203442 : )
824 20502 : .await?;
825 :
826 203442 : let ctx = &RequestContextBuilder::extend(ctx)
827 203442 : .page_content_kind(PageContentKind::DeltaLayerValue)
828 203442 : .build();
829 203442 :
830 203442 : // Ok, 'offsets' now contains the offsets of all the entries we need to read
831 203442 : let cursor = block_reader.block_cursor();
832 203442 : let mut buf = Vec::new();
833 203442 : for (entry_lsn, pos) in offsets {
834 121614 : cursor
835 121614 : .read_blob_into_buf(pos, &mut buf, ctx)
836 7851 : .await
837 121614 : .with_context(|| {
838 0 : format!("Failed to read blob from virtual file {}", self.file.path)
839 121614 : })?;
840 121614 : let val = Value::des(&buf).with_context(|| {
841 0 : format!(
842 0 : "Failed to deserialize file blob from virtual file {}",
843 0 : self.file.path
844 0 : )
845 121614 : })?;
846 121614 : match val {
847 121614 : Value::Image(img) => {
848 121614 : reconstruct_state.img = Some((entry_lsn, img));
849 121614 : need_image = false;
850 121614 : break;
851 : }
852 0 : Value::WalRecord(rec) => {
853 0 : let will_init = rec.will_init();
854 0 : reconstruct_state.records.push((entry_lsn, rec));
855 0 : if will_init {
856 : // This WAL record initializes the page, so no need to go further back
857 0 : need_image = false;
858 0 : break;
859 0 : }
860 : }
861 : }
862 : }
863 :
864 : // If an older page image is needed to reconstruct the page, let the
865 : // caller know.
866 203442 : if need_image {
867 81828 : Ok(ValueReconstructResult::Continue)
868 : } else {
869 121614 : Ok(ValueReconstructResult::Complete)
870 : }
871 203442 : }
872 :
873 : // Look up the keys in the provided keyspace and update
874 : // the reconstruct state with whatever is found.
875 : //
876 : // If the key is cached, go no further than the cached Lsn.
877 : //
878 : // Currently, the index is visited for each range, but this
879 : // can be further optimised to visit the index only once.
880 252 : pub(super) async fn get_values_reconstruct_data(
881 252 : &self,
882 252 : keyspace: KeySpace,
883 252 : lsn_range: Range<Lsn>,
884 252 : reconstruct_state: &mut ValuesReconstructState,
885 252 : ctx: &RequestContext,
886 252 : ) -> Result<(), GetVectoredError> {
887 252 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
888 252 : let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
889 252 : self.index_start_blk,
890 252 : self.index_root_blk,
891 252 : block_reader,
892 252 : );
893 252 :
894 252 : let planner = VectoredReadPlanner::new(
895 252 : self.max_vectored_read_bytes
896 252 : .expect("Layer is loaded with max vectored bytes config")
897 252 : .0
898 252 : .into(),
899 252 : );
900 252 :
901 252 : let data_end_offset = self.index_start_offset();
902 :
903 252 : let reads = Self::plan_reads(
904 252 : &keyspace,
905 252 : lsn_range,
906 252 : data_end_offset,
907 252 : index_reader,
908 252 : planner,
909 252 : reconstruct_state,
910 252 : ctx,
911 252 : )
912 1422 : .await
913 252 : .map_err(GetVectoredError::Other)?;
914 :
915 252 : self.do_reads_and_update_state(reads, reconstruct_state, ctx)
916 8805 : .await;
917 :
918 252 : reconstruct_state.on_lsn_advanced(&keyspace, self.lsn_range.start);
919 252 :
920 252 : Ok(())
921 252 : }
922 :
923 454 : async fn plan_reads<Reader>(
924 454 : keyspace: &KeySpace,
925 454 : lsn_range: Range<Lsn>,
926 454 : data_end_offset: u64,
927 454 : index_reader: DiskBtreeReader<Reader, DELTA_KEY_SIZE>,
928 454 : mut planner: VectoredReadPlanner,
929 454 : reconstruct_state: &mut ValuesReconstructState,
930 454 : ctx: &RequestContext,
931 454 : ) -> anyhow::Result<Vec<VectoredRead>>
932 454 : where
933 454 : Reader: BlockReader,
934 454 : {
935 454 : let ctx = RequestContextBuilder::extend(ctx)
936 454 : .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
937 454 : .build();
938 :
939 42710 : for range in keyspace.ranges.iter() {
940 42710 : let mut range_end_handled = false;
941 42710 :
942 42710 : let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start);
943 42710 : let index_stream = index_reader.get_stream_from(&start_key.0, &ctx);
944 42710 : let mut index_stream = std::pin::pin!(index_stream);
945 :
946 189836 : while let Some(index_entry) = index_stream.next().await {
947 189535 : let (raw_key, value) = index_entry?;
948 189535 : let key = Key::from_slice(&raw_key[..KEY_SIZE]);
949 189535 : let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
950 189535 : let blob_ref = BlobRef(value);
951 189535 :
952 189535 : // Lsns are not monotonically increasing across keys, so we don't assert on them.
953 189535 : assert!(key >= range.start);
954 :
955 189535 : let outside_lsn_range = !lsn_range.contains(&lsn);
956 189535 : let below_cached_lsn = reconstruct_state.get_cached_lsn(&key) >= Some(lsn);
957 :
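// Classify the blob for the planner. Roughly: `Ignore` skips the blob,
// `ReplaceAll` supersedes earlier blobs planned for this key (the value
// initializes the page), and `None` simply adds the blob to the read.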
958 189535 : let flag = {
959 189535 : if outside_lsn_range || below_cached_lsn {
960 70918 : BlobFlag::Ignore
961 118617 : } else if blob_ref.will_init() {
962 60969 : BlobFlag::ReplaceAll
963 : } else {
964 : // Usual path: add blob to the read
965 57648 : BlobFlag::None
966 : }
967 : };
968 :
969 189535 : if key >= range.end || (key.next() == range.end && lsn >= lsn_range.end) {
970 42409 : planner.handle_range_end(blob_ref.pos());
971 42409 : range_end_handled = true;
972 42409 : break;
973 147126 : } else {
974 147126 : planner.handle(key, lsn, blob_ref.pos(), flag);
975 147126 : }
976 : }
977 :
978 42710 : if !range_end_handled {
979 301 : tracing::debug!("Handling range end fallback at {}", data_end_offset);
980 301 : planner.handle_range_end(data_end_offset);
981 42409 : }
982 : }
983 :
984 454 : Ok(planner.finish())
985 454 : }
986 :
987 452 : fn get_min_read_buffer_size(
988 452 : planned_reads: &[VectoredRead],
989 452 : read_size_soft_max: usize,
990 452 : ) -> usize {
991 36950 : let Some(largest_read) = planned_reads.iter().max_by_key(|read| read.size()) else {
992 120 : return read_size_soft_max;
993 : };
994 :
995 332 : let largest_read_size = largest_read.size();
996 332 : if largest_read_size > read_size_soft_max {
997 : // If the read is oversized, it should only contain one key.
998 200 : let offenders = largest_read
999 200 : .blobs_at
1000 200 : .as_slice()
1001 200 : .iter()
1002 200 : .map(|(_, blob_meta)| format!("{}@{}", blob_meta.key, blob_meta.lsn))
1003 200 : .join(", ");
1004 200 : tracing::warn!(
1005 0 : "Oversized vectored read ({} > {}) for keys {}",
1006 : largest_read_size,
1007 : read_size_soft_max,
1008 : offenders
1009 : );
1010 132 : }
1011 :
1012 332 : largest_read_size
1013 452 : }
1014 :
1015 252 : async fn do_reads_and_update_state(
1016 252 : &self,
1017 252 : reads: Vec<VectoredRead>,
1018 252 : reconstruct_state: &mut ValuesReconstructState,
1019 252 : ctx: &RequestContext,
1020 252 : ) {
1021 252 : let vectored_blob_reader = VectoredBlobReader::new(&self.file);
1022 252 : let mut ignore_key_with_err = None;
1023 252 :
1024 252 : let max_vectored_read_bytes = self
1025 252 : .max_vectored_read_bytes
1026 252 : .expect("Layer is loaded with max vectored bytes config")
1027 252 : .0
1028 252 : .into();
1029 252 : let buf_size = Self::get_min_read_buffer_size(&reads, max_vectored_read_bytes);
1030 252 : let mut buf = Some(BytesMut::with_capacity(buf_size));
1031 :
1032 : // Note that reads are processed in reverse order (from highest key+lsn).
1033 : // This is the order that `ReconstructState` requires such that it can
1034 : // track when a key is done.
1035 17226 : for read in reads.into_iter().rev() {
1036 17226 : let res = vectored_blob_reader
1037 17226 : .read_blobs(&read, buf.take().expect("Should have a buffer"), ctx)
1038 8805 : .await;
1039 :
1040 17226 : let blobs_buf = match res {
1041 17226 : Ok(blobs_buf) => blobs_buf,
1042 0 : Err(err) => {
1043 0 : let kind = err.kind();
1044 0 : for (_, blob_meta) in read.blobs_at.as_slice() {
1045 0 : reconstruct_state.on_key_error(
1046 0 : blob_meta.key,
1047 0 : PageReconstructError::from(anyhow!(
1048 0 : "Failed to read blobs from virtual file {}: {}",
1049 0 : self.file.path,
1050 0 : kind
1051 0 : )),
1052 0 : );
1053 0 : }
1054 :
1055 : // We have "lost" the buffer since the lower level IO api
1056 : // doesn't return the buffer on error. Allocate a new one.
1057 0 : buf = Some(BytesMut::with_capacity(buf_size));
1058 0 :
1059 0 : continue;
1060 : }
1061 : };
1062 :
1063 28141 : for meta in blobs_buf.blobs.iter().rev() {
1064 28141 : if Some(meta.meta.key) == ignore_key_with_err {
1065 0 : continue;
1066 28141 : }
1067 28141 :
1068 28141 : let value = Value::des(&blobs_buf.buf[meta.start..meta.end]);
1069 28141 : let value = match value {
1070 28141 : Ok(v) => v,
1071 0 : Err(e) => {
1072 0 : reconstruct_state.on_key_error(
1073 0 : meta.meta.key,
1074 0 : PageReconstructError::from(anyhow!(e).context(format!(
1075 0 : "Failed to deserialize blob from virtual file {}",
1076 0 : self.file.path,
1077 0 : ))),
1078 0 : );
1079 0 :
1080 0 : ignore_key_with_err = Some(meta.meta.key);
1081 0 : continue;
1082 : }
1083 : };
1084 :
1085 : // Invariant: once a key reaches [`ValueReconstructSituation::Complete`]
1086 : // state, no further updates shall be made to it. The call below will
1087 : // panic if the invariant is violated.
1088 28141 : reconstruct_state.update_key(&meta.meta.key, meta.meta.lsn, value);
1089 : }
1090 :
1091 17226 : buf = Some(blobs_buf.buf);
1092 : }
1093 252 : }
1094 :
1095 326 : pub(super) async fn load_keys<'a>(
1096 326 : &'a self,
1097 326 : ctx: &RequestContext,
1098 326 : ) -> Result<Vec<DeltaEntry<'a>>> {
1099 326 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
1100 326 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1101 326 : self.index_start_blk,
1102 326 : self.index_root_blk,
1103 326 : block_reader,
1104 326 : );
1105 326 :
1106 326 : let mut all_keys: Vec<DeltaEntry<'_>> = Vec::new();
1107 326 :
1108 326 : tree_reader
1109 326 : .visit(
1110 326 : &[0u8; DELTA_KEY_SIZE],
1111 326 : VisitDirection::Forwards,
1112 2064046 : |key, value| {
1113 2064046 : let delta_key = DeltaKey::from_slice(key);
1114 2064046 : let val_ref = ValueRef {
1115 2064046 : blob_ref: BlobRef(value),
1116 2064046 : reader: BlockCursor::new(crate::tenant::block_io::BlockReaderRef::Adapter(
1117 2064046 : Adapter(self),
1118 2064046 : )),
1119 2064046 : };
1120 2064046 : let pos = BlobRef(value).pos();
1121 2064046 : if let Some(last) = all_keys.last_mut() {
1122 2063720 : // subtract offset of the current and last entries to get the size
1123 2063720 : // of the value associated with this (key, lsn) tuple
1124 2063720 : let first_pos = last.size;
1125 2063720 : last.size = pos - first_pos;
1126 2063720 : }
1127 2064046 : let entry = DeltaEntry {
1128 2064046 : key: delta_key.key(),
1129 2064046 : lsn: delta_key.lsn(),
1130 2064046 : size: pos,
1131 2064046 : val: val_ref,
1132 2064046 : };
1133 2064046 : all_keys.push(entry);
1134 2064046 : true
1135 2064046 : },
1136 326 : &RequestContextBuilder::extend(ctx)
1137 326 : .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
1138 326 : .build(),
1139 326 : )
1140 2147 : .await?;
1141 326 : if let Some(last) = all_keys.last_mut() {
1142 326 : // The last key occupies all space up to the end of the value storage,
1143 326 : // which corresponds to the beginning of the index.
1144 326 : last.size = self.index_start_offset() - last.size;
1145 326 : }
1146 326 : Ok(all_keys)
1147 326 : }
1148 :
1149 : /// Using the given writer, write out a version containing only the entries with LSNs earlier than `until`.
1150 : ///
1151 : /// Returns the number of key-value records pushed to the writer.
1152 10 : pub(super) async fn copy_prefix(
1153 10 : &self,
1154 10 : writer: &mut DeltaLayerWriter,
1155 10 : until: Lsn,
1156 10 : ctx: &RequestContext,
1157 10 : ) -> anyhow::Result<usize> {
1158 10 : use crate::tenant::vectored_blob_io::{
1159 10 : BlobMeta, VectoredReadBuilder, VectoredReadExtended,
1160 10 : };
1161 10 : use futures::stream::TryStreamExt;
1162 10 :
1163 10 : #[derive(Debug)]
1164 10 : enum Item {
1165 10 : Actual(Key, Lsn, BlobRef),
1166 10 : Sentinel,
1167 10 : }
1168 10 :
1169 10 : impl From<Item> for Option<(Key, Lsn, BlobRef)> {
1170 70 : fn from(value: Item) -> Self {
1171 70 : match value {
1172 60 : Item::Actual(key, lsn, blob) => Some((key, lsn, blob)),
1173 10 : Item::Sentinel => None,
1174 10 : }
1175 70 : }
1176 10 : }
1177 10 :
1178 10 : impl Item {
1179 70 : fn offset(&self) -> Option<BlobRef> {
1180 70 : match self {
1181 60 : Item::Actual(_, _, blob) => Some(*blob),
1182 10 : Item::Sentinel => None,
1183 10 : }
1184 70 : }
1185 10 :
1186 70 : fn is_last(&self) -> bool {
1187 70 : matches!(self, Item::Sentinel)
1188 70 : }
1189 10 : }
1190 10 :
1191 10 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
1192 10 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1193 10 : self.index_start_blk,
1194 10 : self.index_root_blk,
1195 10 : block_reader,
1196 10 : );
1197 10 :
1198 10 : let stream = self.stream_index_forwards(&tree_reader, &[0u8; DELTA_KEY_SIZE], ctx);
1199 60 : let stream = stream.map_ok(|(key, lsn, pos)| Item::Actual(key, lsn, pos));
1200 10 : // chain a sentinel value so we can compute the end offset of the last item
1201 10 : // without repeating the whole read logic for it
1202 10 : let stream = stream.chain(futures::stream::once(futures::future::ready(Ok(
1203 10 : Item::Sentinel,
1204 10 : ))));
1205 10 : let mut stream = std::pin::pin!(stream);
1206 10 :
1207 10 : let mut prev: Option<(Key, Lsn, BlobRef)> = None;
1208 10 :
1209 10 : let mut read_builder: Option<VectoredReadBuilder> = None;
1210 10 :
1211 10 : let max_read_size = self
1212 10 : .max_vectored_read_bytes
1213 10 : .map(|x| x.0.get())
1214 10 : .unwrap_or(8192);
1215 10 :
1216 10 : let mut buffer = Some(BytesMut::with_capacity(max_read_size));
1217 10 :
1218 10 : // FIXME: buffering of DeltaLayerWriter
1219 10 : let mut per_blob_copy = Vec::new();
1220 10 :
1221 10 : let mut records = 0;
1222 :
1223 80 : while let Some(item) = stream.try_next().await? {
1224 70 : tracing::debug!(?item, "popped");
1225 70 : let offset = item
1226 70 : .offset()
1227 70 : .unwrap_or(BlobRef::new(self.index_start_offset(), false));
1228 :
1229 70 : let actionable = if let Some((key, lsn, start_offset)) = prev.take() {
1230 60 : let end_offset = offset;
1231 60 :
1232 60 : Some((BlobMeta { key, lsn }, start_offset..end_offset))
1233 : } else {
1234 10 : None
1235 : };
1236 :
1237 70 : let is_last = item.is_last();
1238 70 :
1239 70 : prev = Option::from(item);
1240 70 :
1241 70 : let actionable = actionable.filter(|x| x.0.lsn < until);
1242 :
1243 70 : let builder = if let Some((meta, offsets)) = actionable {
1244 : // extend or create a new builder
1245 32 : if read_builder
1246 32 : .as_mut()
1247 32 : .map(|x| x.extend(offsets.start.pos(), offsets.end.pos(), meta))
1248 32 : .unwrap_or(VectoredReadExtended::No)
1249 32 : == VectoredReadExtended::Yes
1250 : {
1251 16 : None
1252 : } else {
1253 16 : read_builder.replace(VectoredReadBuilder::new(
1254 16 : offsets.start.pos(),
1255 16 : offsets.end.pos(),
1256 16 : meta,
1257 16 : max_read_size,
1258 16 : ))
1259 : }
1260 : } else {
1261 : // nothing to do, except perhaps flush any existing builder for the last element
1262 38 : None
1263 : };
1264 :
1265 : // flush the possible older builder and also the new one if the item was the last one
1266 70 : let builders = builder.into_iter();
1267 70 : let builders = if is_last {
1268 10 : builders.chain(read_builder.take())
1269 : } else {
1270 60 : builders.chain(None)
1271 : };
1272 :
1273 86 : for builder in builders {
1274 16 : let read = builder.build();
1275 16 :
1276 16 : let reader = VectoredBlobReader::new(&self.file);
1277 16 :
1278 16 : let mut buf = buffer.take().unwrap();
1279 16 :
1280 16 : buf.clear();
1281 16 : buf.reserve(read.size());
1282 16 : let res = reader.read_blobs(&read, buf, ctx).await?;
1283 :
1284 48 : for blob in res.blobs {
1285 32 : let key = blob.meta.key;
1286 32 : let lsn = blob.meta.lsn;
1287 32 : let data = &res.buf[blob.start..blob.end];
1288 32 :
1289 32 : #[cfg(debug_assertions)]
1290 32 : Value::des(data)
1291 32 : .with_context(|| {
1292 0 : format!(
1293 0 : "blob failed to deserialize for {}@{}, {}..{}: {:?}",
1294 0 : blob.meta.key,
1295 0 : blob.meta.lsn,
1296 0 : blob.start,
1297 0 : blob.end,
1298 0 : utils::Hex(data)
1299 0 : )
1300 32 : })
1301 32 : .unwrap();
1302 32 :
1303 32 : // is it an image or will_init walrecord?
1304 32 : // FIXME: this could be handled by threading the BlobRef to the
1305 32 : // VectoredReadBuilder
1306 32 : let will_init = crate::repository::ValueBytes::will_init(data)
1307 32 : .inspect_err(|_e| {
1308 0 : #[cfg(feature = "testing")]
1309 0 : tracing::error!(data=?utils::Hex(data), err=?_e, %key, %lsn, "failed to parse will_init out of serialized value");
1310 32 : })
1311 32 : .unwrap_or(false);
1312 32 :
1313 32 : per_blob_copy.clear();
1314 32 : per_blob_copy.extend_from_slice(data);
1315 :
1316 32 : let (tmp, res) = writer
1317 32 : .put_value_bytes(
1318 32 : key,
1319 32 : lsn,
1320 32 : std::mem::take(&mut per_blob_copy),
1321 32 : will_init,
1322 32 : ctx,
1323 32 : )
1324 4 : .await;
1325 32 : per_blob_copy = tmp;
1326 32 :
1327 32 : res?;
1328 :
1329 32 : records += 1;
1330 : }
1331 :
1332 16 : buffer = Some(res.buf);
1333 : }
1334 : }
1335 :
1336 10 : assert!(
1337 10 : read_builder.is_none(),
1338 0 : "with the sentinel above loop should had handled all"
1339 : );
1340 :
1341 10 : Ok(records)
1342 10 : }
1343 :
1344 4 : pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
1345 4 : println!(
1346 4 : "index_start_blk: {}, root {}",
1347 4 : self.index_start_blk, self.index_root_blk
1348 4 : );
1349 4 :
1350 4 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
1351 4 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1352 4 : self.index_start_blk,
1353 4 : self.index_root_blk,
1354 4 : block_reader,
1355 4 : );
1356 4 :
1357 4 : tree_reader.dump().await?;
1358 :
1359 4 : let keys = self.load_keys(ctx).await?;
1360 :
1361 8 : async fn dump_blob(val: &ValueRef<'_>, ctx: &RequestContext) -> anyhow::Result<String> {
1362 8 : let buf = val.reader.read_blob(val.blob_ref.pos(), ctx).await?;
1363 8 : let val = Value::des(&buf)?;
1364 8 : let desc = match val {
1365 8 : Value::Image(img) => {
1366 8 : format!(" img {} bytes", img.len())
1367 : }
1368 0 : Value::WalRecord(rec) => {
1369 0 : let wal_desc = walrecord::describe_wal_record(&rec)?;
1370 0 : format!(
1371 0 : " rec {} bytes will_init: {} {}",
1372 0 : buf.len(),
1373 0 : rec.will_init(),
1374 0 : wal_desc
1375 0 : )
1376 : }
1377 : };
1378 8 : Ok(desc)
1379 8 : }
1380 :
1381 12 : for entry in keys {
1382 8 : let DeltaEntry { key, lsn, val, .. } = entry;
1383 8 : let desc = match dump_blob(&val, ctx).await {
1384 8 : Ok(desc) => desc,
1385 0 : Err(err) => {
1386 0 : format!("ERROR: {err}")
1387 : }
1388 : };
1389 8 : println!(" key {key} at {lsn}: {desc}");
1390 8 :
1391 8 : // Print more details about CHECKPOINT records. Would be nice to print details
1392 8 : // of many other record types too, but these are particularly interesting, as
1393 8 : // there is a lot of special processing for them in walingest.rs.
1394 8 : use pageserver_api::key::CHECKPOINT_KEY;
1395 8 : use postgres_ffi::CheckPoint;
1396 8 : if key == CHECKPOINT_KEY {
1397 0 : let buf = val.reader.read_blob(val.blob_ref.pos(), ctx).await?;
1398 0 : let val = Value::des(&buf)?;
1399 0 : match val {
1400 0 : Value::Image(img) => {
1401 0 : let checkpoint = CheckPoint::decode(&img)?;
1402 0 : println!(" CHECKPOINT: {:?}", checkpoint);
1403 : }
1404 0 : Value::WalRecord(_rec) => {
1405 0 : println!(" unexpected walrecord value for checkpoint key");
1406 0 : }
1407 : }
1408 8 : }
1409 : }
1410 :
1411 4 : Ok(())
1412 4 : }
1413 :
1414 30 : fn stream_index_forwards<'a, R>(
1415 30 : &'a self,
1416 30 : reader: &'a DiskBtreeReader<R, DELTA_KEY_SIZE>,
1417 30 : start: &'a [u8; DELTA_KEY_SIZE],
1418 30 : ctx: &'a RequestContext,
1419 30 : ) -> impl futures::stream::Stream<
1420 30 : Item = Result<(Key, Lsn, BlobRef), crate::tenant::disk_btree::DiskBtreeError>,
1421 30 : > + 'a
1422 30 : where
1423 30 : R: BlockReader,
1424 30 : {
1425 30 : use futures::stream::TryStreamExt;
1426 30 : let stream = reader.get_stream_from(start, ctx);
1427 152 : stream.map_ok(|(key, value)| {
1428 152 : let key = DeltaKey::from_slice(&key);
1429 152 : let (key, lsn) = (key.key(), key.lsn());
1430 152 : let offset = BlobRef(value);
1431 152 :
1432 152 : (key, lsn, offset)
1433 152 : })
1434 30 : }
1435 :
1436 : /// The file offset to the first block of index.
1437 : ///
1438 : /// The file structure is summary, values, and index. We often need this to compute the size of the last blob.
1439 648 : fn index_start_offset(&self) -> u64 {
1440 648 : let offset = self.index_start_blk as u64 * PAGE_SZ as u64;
1441 648 : let bref = BlobRef(offset);
1442 648 : tracing::debug!(
1443 : index_start_blk = self.index_start_blk,
1444 : offset,
1445 0 : pos = bref.pos(),
1446 0 : "index_start_offset"
1447 : );
1448 648 : offset
1449 648 : }
1450 : }
1451 :
1452 : /// A set of data associated with a delta layer key and its value
1453 : pub struct DeltaEntry<'a> {
1454 : pub key: Key,
1455 : pub lsn: Lsn,
1456 : /// Size of the stored value
1457 : pub size: u64,
1458 : /// Reference to the on-disk value
1459 : pub val: ValueRef<'a>,
1460 : }
1461 :
1462 : /// Reference to an on-disk value
1463 : pub struct ValueRef<'a> {
1464 : blob_ref: BlobRef,
1465 : reader: BlockCursor<'a>,
1466 : }
1467 :
1468 : impl<'a> ValueRef<'a> {
1469 : /// Loads the value from disk
1470 2064038 : pub async fn load(&self, ctx: &RequestContext) -> Result<Value> {
1471 : // theoretically we *could* record an access time for each, but it does not really matter
1472 2064038 : let buf = self.reader.read_blob(self.blob_ref.pos(), ctx).await?;
1473 2064038 : let val = Value::des(&buf)?;
1474 2064038 : Ok(val)
1475 2064038 : }
1476 : }
1477 :
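/// Adapter that reads blocks out of a `DeltaLayerInner`'s underlying file, so
/// the generic `BlockCursor`/`BlockReaderRef` machinery can be used for value
/// lookups (see `DeltaLayerInner::load_keys`).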
1478 : pub(crate) struct Adapter<T>(T);
1479 :
1480 : impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
1481 2083002 : pub(crate) async fn read_blk(
1482 2083002 : &self,
1483 2083002 : blknum: u32,
1484 2083002 : ctx: &RequestContext,
1485 2083002 : ) -> Result<BlockLease, std::io::Error> {
1486 2083002 : let block_reader = FileBlockReader::new(&self.0.as_ref().file, self.0.as_ref().file_id);
1487 2083002 : block_reader.read_blk(blknum, ctx).await
1488 2083002 : }
1489 : }
1490 :
1491 : impl AsRef<DeltaLayerInner> for DeltaLayerInner {
1492 4166004 : fn as_ref(&self) -> &DeltaLayerInner {
1493 4166004 : self
1494 4166004 : }
1495 : }
1496 :
1497 : impl<'a> pageserver_compaction::interface::CompactionDeltaEntry<'a, Key> for DeltaEntry<'a> {
1498 0 : fn key(&self) -> Key {
1499 0 : self.key
1500 0 : }
1501 0 : fn lsn(&self) -> Lsn {
1502 0 : self.lsn
1503 0 : }
1504 0 : fn size(&self) -> u64 {
1505 0 : self.size
1506 0 : }
1507 : }
1508 :
1509 : #[cfg(test)]
1510 : mod test {
1511 : use std::collections::BTreeMap;
1512 :
1513 : use itertools::MinMaxResult;
1514 : use rand::prelude::{SeedableRng, SliceRandom, StdRng};
1515 : use rand::RngCore;
1516 :
1517 : use super::*;
1518 : use crate::{
1519 : context::DownloadBehavior,
1520 : task_mgr::TaskKind,
1521 : tenant::{disk_btree::tests::TestDisk, harness::TenantHarness},
1522 : DEFAULT_PG_VERSION,
1523 : };
1524 :
1525 : /// Construct an index for a fictional delta layer and then
1526 : /// traverse it in order to plan vectored reads for a query. Finally,
1527 : /// verify that the traversal fed the right index key and value
1528 : /// pairs into the planner.
1529 : #[tokio::test]
1530 2 : async fn test_delta_layer_index_traversal() {
1531 2 : let base_key = Key {
1532 2 : field1: 0,
1533 2 : field2: 1663,
1534 2 : field3: 12972,
1535 2 : field4: 16396,
1536 2 : field5: 0,
1537 2 : field6: 246080,
1538 2 : };
1539 2 :
1540 2 : // Populate the index with some entries
1541 2 : let entries: BTreeMap<Key, Vec<Lsn>> = BTreeMap::from([
1542 2 : (base_key, vec![Lsn(1), Lsn(5), Lsn(25), Lsn(26), Lsn(28)]),
1543 2 : (base_key.add(1), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
1544 2 : (base_key.add(2), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
1545 2 : (base_key.add(5), vec![Lsn(10), Lsn(15), Lsn(16), Lsn(20)]),
1546 2 : ]);
1547 2 :
1548 2 : let mut disk = TestDisk::default();
1549 2 : let mut writer = DiskBtreeBuilder::<_, DELTA_KEY_SIZE>::new(&mut disk);
1550 2 :
1551 2 : let mut disk_offset = 0;
1552 10 : for (key, lsns) in &entries {
1553 42 : for lsn in lsns {
1554 34 : let index_key = DeltaKey::from_key_lsn(key, *lsn);
1555 34 : let blob_ref = BlobRef::new(disk_offset, false);
1556 34 : writer
1557 34 : .append(&index_key.0, blob_ref.0)
1558 34 : .expect("In memory disk append should never fail");
1559 34 :
1560 34 : disk_offset += 1;
1561 34 : }
1562 2 : }
1563 2 :
1564 2 : // Prepare all the arguments for the call into `plan_reads` below
1565 2 : let (root_offset, _writer) = writer
1566 2 : .finish()
1567 2 : .expect("In memory disk finish should never fail");
1568 2 : let reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(0, root_offset, disk);
1569 2 : let planner = VectoredReadPlanner::new(100);
1570 2 : let mut reconstruct_state = ValuesReconstructState::new();
1571 2 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
1572 2 :
1573 2 : let keyspace = KeySpace {
1574 2 : ranges: vec![
1575 2 : base_key..base_key.add(3),
1576 2 : base_key.add(3)..base_key.add(100),
1577 2 : ],
1578 2 : };
1579 2 : let lsn_range = Lsn(2)..Lsn(40);
1580 2 :
1581 2 : // Plan and validate
1582 2 : let vectored_reads = DeltaLayerInner::plan_reads(
1583 2 : &keyspace,
1584 2 : lsn_range.clone(),
1585 2 : disk_offset,
1586 2 : reader,
1587 2 : planner,
1588 2 : &mut reconstruct_state,
1589 2 : &ctx,
1590 2 : )
1591 2 : .await
1592 2 : .expect("Read planning should not fail");
1593 2 :
1594 2 : validate(keyspace, lsn_range, vectored_reads, entries);
1595 2 : }
1596 :
1597 2 : fn validate(
1598 2 : keyspace: KeySpace,
1599 2 : lsn_range: Range<Lsn>,
1600 2 : vectored_reads: Vec<VectoredRead>,
1601 2 : index_entries: BTreeMap<Key, Vec<Lsn>>,
1602 2 : ) {
1603 2 : #[derive(Debug, PartialEq, Eq)]
1604 2 : struct BlobSpec {
1605 2 : key: Key,
1606 2 : lsn: Lsn,
1607 2 : at: u64,
1608 2 : }
1609 2 :
1610 2 : let mut planned_blobs = Vec::new();
1611 8 : for read in vectored_reads {
1612 28 : for (at, meta) in read.blobs_at.as_slice() {
1613 28 : planned_blobs.push(BlobSpec {
1614 28 : key: meta.key,
1615 28 : lsn: meta.lsn,
1616 28 : at: *at,
1617 28 : });
1618 28 : }
1619 : }
1620 :
1621 2 : let mut expected_blobs = Vec::new();
1622 2 : let mut disk_offset = 0;
1623 10 : for (key, lsns) in index_entries {
1624 42 : for lsn in lsns {
1625 42 : let key_included = keyspace.ranges.iter().any(|range| range.contains(&key));
1626 34 : let lsn_included = lsn_range.contains(&lsn);
1627 34 :
1628 34 : if key_included && lsn_included {
1629 28 : expected_blobs.push(BlobSpec {
1630 28 : key,
1631 28 : lsn,
1632 28 : at: disk_offset,
1633 28 : });
1634 28 : }
1635 :
1636 34 : disk_offset += 1;
1637 : }
1638 : }
1639 :
1640 2 : assert_eq!(planned_blobs, expected_blobs);
1641 2 : }
1642 :
1643 : mod constants {
1644 : use utils::lsn::Lsn;
1645 :
1646 : /// Offset used by all lsns in this test
1647 : pub(super) const LSN_OFFSET: Lsn = Lsn(0x08);
1648 : /// Number of unique keys included in the test data
1649 : pub(super) const KEY_COUNT: u8 = 60;
1650 : /// Max number of different lsns for each key
1651 : pub(super) const MAX_ENTRIES_PER_KEY: u8 = 20;
1652 : /// Possible value sizes for each key along with a probability weight
1653 : pub(super) const VALUE_SIZES: [(usize, u8); 3] = [(100, 2), (1024, 2), (1024 * 1024, 1)];
1654 : /// Probability that there will be a gap between the current key and the next one (33.3%)
1655 : pub(super) const KEY_GAP_CHANGES: [(bool, u8); 2] = [(true, 1), (false, 2)];
1656 : /// The minimum size of a key range in all the generated reads
1657 : pub(super) const MIN_RANGE_SIZE: i128 = 10;
1658 : /// The number of ranges included in each vectored read
1659 : pub(super) const RANGES_COUNT: u8 = 2;
1660 : /// The number of vectored reads performed
1661 : pub(super) const READS_COUNT: u8 = 100;
1662 : /// Soft max size of a vectored read. Exceeded when we have to read values
1663 : /// larger than the limit
1664 : pub(super) const MAX_VECTORED_READ_BYTES: usize = 64 * 1024;
1665 : }
1666 :
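     : /// A single generated test value for a (key, LSN) pair.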
1667 : struct Entry {
1668 : key: Key,
1669 : lsn: Lsn,
1670 : value: Vec<u8>,
1671 : }
1672 :
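     : /// Generate random test data: for each of `KEY_COUNT` keys, pick a random
     : /// number of LSNs (spaced 0x08 apart above `LSN_OFFSET`) and draw each value
     : /// size from `VALUE_SIZES`, occasionally leaving a gap before the next key.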
1673 2 : fn generate_entries(rng: &mut StdRng) -> Vec<Entry> {
1674 2 : let mut current_key = Key::MIN;
1675 2 :
1676 2 : let mut entries = Vec::new();
1677 122 : for _ in 0..constants::KEY_COUNT {
1678 120 : let count = rng.gen_range(1..constants::MAX_ENTRIES_PER_KEY);
1679 120 : let mut lsns_iter =
1680 2260 : std::iter::successors(Some(Lsn(constants::LSN_OFFSET.0 + 0x08)), |lsn| {
1681 2260 : Some(Lsn(lsn.0 + 0x08))
1682 2260 : });
1683 120 : let mut lsns = Vec::new();
1684 2380 : while lsns.len() < count as usize {
1685 2260 : let take = rng.gen_bool(0.5);
1686 2260 : let lsn = lsns_iter.next().unwrap();
1687 2260 : if take {
1688 1112 : lsns.push(lsn);
1689 1148 : }
1690 : }
1691 :
1692 1232 : for lsn in lsns {
1693 1112 : let size = constants::VALUE_SIZES
1694 3336 : .choose_weighted(rng, |item| item.1)
1695 1112 : .unwrap()
1696 1112 : .0;
1697 1112 : let mut buf = vec![0; size];
1698 1112 : rng.fill_bytes(&mut buf);
1699 1112 :
1700 1112 : entries.push(Entry {
1701 1112 : key: current_key,
1702 1112 : lsn,
1703 1112 : value: buf,
1704 1112 : })
1705 : }
1706 :
1707 120 : let gap = constants::KEY_GAP_CHANGES
1708 240 : .choose_weighted(rng, |item| item.1)
1709 120 : .unwrap()
1710 120 : .0;
1711 120 : if gap {
1712 38 : current_key = current_key.add(2);
1713 82 : } else {
1714 82 : current_key = current_key.add(1);
1715 82 : }
1716 : }
1717 :
1718 2 : entries
1719 2 : }
1720 :
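     : /// Summary of the generated entries: the covering key and LSN ranges, plus
     : /// an in-memory index used to validate read results.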
1721 : struct EntriesMeta {
1722 : key_range: Range<Key>,
1723 : lsn_range: Range<Lsn>,
1724 : index: BTreeMap<(Key, Lsn), Vec<u8>>,
1725 : }
1726 :
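     : /// Compute the key and LSN ranges spanned by `entries` and build the
     : /// in-memory index.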
1727 2 : fn get_entries_meta(entries: &[Entry]) -> EntriesMeta {
1728 1112 : let key_range = match entries.iter().minmax_by_key(|e| e.key) {
1729 2 : MinMaxResult::MinMax(min, max) => min.key..max.key.next(),
1730 0 : _ => panic!("More than one entry is always expected"),
1731 : };
1732 :
1733 1112 : let lsn_range = match entries.iter().minmax_by_key(|e| e.lsn) {
1734 2 : MinMaxResult::MinMax(min, max) => min.lsn..Lsn(max.lsn.0 + 1),
1735 0 : _ => panic!("More than one entry is always expected"),
1736 : };
1737 :
1738 2 : let mut index = BTreeMap::new();
1739 1112 : for entry in entries.iter() {
1740 1112 : index.insert((entry.key, entry.lsn), entry.value.clone());
1741 1112 : }
1742 :
1743 2 : EntriesMeta {
1744 2 : key_range,
1745 2 : lsn_range,
1746 2 : index,
1747 2 : }
1748 2 : }
1749 :
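     : /// Pick `RANGES_COUNT` random, non-overlapping key ranges within `key_range`,
     : /// each at least `MIN_RANGE_SIZE` keys wide unless clamped at the end of
     : /// `key_range`.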
1750 200 : fn pick_random_keyspace(rng: &mut StdRng, key_range: &Range<Key>) -> KeySpace {
1751 200 : let start = key_range.start.to_i128();
1752 200 : let end = key_range.end.to_i128();
1753 200 :
1754 200 : let mut keyspace = KeySpace::default();
1755 :
1756 600 : for _ in 0..constants::RANGES_COUNT {
1757 400 : let mut range: Option<Range<Key>> = Option::default();
1758 1244 : while range.is_none() || keyspace.overlaps(range.as_ref().unwrap()) {
1759 844 : let range_start = rng.gen_range(start..end);
1760 844 : let range_end_offset = range_start + constants::MIN_RANGE_SIZE;
1761 844 : if range_end_offset >= end {
1762 100 : range = Some(Key::from_i128(range_start)..Key::from_i128(end));
1763 744 : } else {
1764 744 : let range_end = rng.gen_range((range_start + constants::MIN_RANGE_SIZE)..end);
1765 744 : range = Some(Key::from_i128(range_start)..Key::from_i128(range_end));
1766 744 : }
1767 : }
1768 400 : keyspace.ranges.push(range.unwrap());
1769 : }
1770 :
1771 200 : keyspace
1772 200 : }
1773 :
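     : /// End-to-end check of vectored reads against a real delta layer: write the
     : /// generated entries out, then plan and execute `READS_COUNT` random vectored
     : /// reads and compare every returned blob against the in-memory index.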
1774 : #[tokio::test]
1775 2 : async fn test_delta_layer_vectored_read_end_to_end() -> anyhow::Result<()> {
1776 2 : let harness = TenantHarness::create("test_delta_layer_oversized_vectored_read")?;
1777 8 : let (tenant, ctx) = harness.load().await;
1778 2 :
1779 2 : let timeline_id = TimelineId::generate();
1780 2 : let timeline = tenant
1781 2 : .create_test_timeline(timeline_id, constants::LSN_OFFSET, DEFAULT_PG_VERSION, &ctx)
1782 6 : .await?;
1783 2 :
1784 2 : tracing::info!("Generating test data ...");
1785 2 :
1786 2 : let rng = &mut StdRng::seed_from_u64(0);
1787 2 : let entries = generate_entries(rng);
1788 2 : let entries_meta = get_entries_meta(&entries);
1789 2 :
1790 2 : tracing::info!("Done generating {} entries", entries.len());
1791 2 :
1792 2 : tracing::info!("Writing test data to delta layer ...");
1793 2 : let mut writer = DeltaLayerWriter::new(
1794 2 : harness.conf,
1795 2 : timeline_id,
1796 2 : harness.tenant_shard_id,
1797 2 : entries_meta.key_range.start,
1798 2 : entries_meta.lsn_range.clone(),
1799 2 : &ctx,
1800 2 : )
1801 2 : .await?;
1802 2 :
1803 1114 : for entry in entries {
1804 1112 : let (_, res) = writer
1805 1112 : .put_value_bytes(entry.key, entry.lsn, entry.value, false, &ctx)
1806 215 : .await;
1807 1112 : res?;
1808 2 : }
1809 2 :
1810 2 : let resident = writer
1811 2 : .finish(entries_meta.key_range.end, &timeline, &ctx)
1812 5 : .await?;
1813 2 :
1814 2 : let inner = resident.as_delta(&ctx).await?;
1815 2 :
1816 2 : let file_size = inner.file.metadata().await?.len();
1817 2 : tracing::info!(
1818 2 : "Done writing test data to delta layer. Resulting file size is: {}",
1819 2 : file_size
1820 2 : );
1821 2 :
1822 202 : for i in 0..constants::READS_COUNT {
1823 200 : tracing::info!("Doing vectored read {}/{}", i + 1, constants::READS_COUNT);
1824 2 :
1825 200 : let block_reader = FileBlockReader::new(&inner.file, inner.file_id);
1826 200 : let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1827 200 : inner.index_start_blk,
1828 200 : inner.index_root_blk,
1829 200 : block_reader,
1830 200 : );
1831 200 :
1832 200 : let planner = VectoredReadPlanner::new(constants::MAX_VECTORED_READ_BYTES);
1833 200 : let mut reconstruct_state = ValuesReconstructState::new();
1834 200 : let keyspace = pick_random_keyspace(rng, &entries_meta.key_range);
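     : // The values section ends where the index begins; `index_start_blk` is in
     : // units of `PAGE_SZ` blocks.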
1835 200 : let data_end_offset = inner.index_start_blk as u64 * PAGE_SZ as u64;
1836 2 :
1837 200 : let vectored_reads = DeltaLayerInner::plan_reads(
1838 200 : &keyspace,
1839 200 : entries_meta.lsn_range.clone(),
1840 200 : data_end_offset,
1841 200 : index_reader,
1842 200 : planner,
1843 200 : &mut reconstruct_state,
1844 200 : &ctx,
1845 200 : )
1846 4 : .await?;
1847 2 :
1848 200 : let vectored_blob_reader = VectoredBlobReader::new(&inner.file);
1849 200 : let buf_size = DeltaLayerInner::get_min_read_buffer_size(
1850 200 : &vectored_reads,
1851 200 : constants::MAX_VECTORED_READ_BYTES,
1852 200 : );
1853 200 : let mut buf = Some(BytesMut::with_capacity(buf_size));
1854 2 :
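     : // Reuse one buffer across all reads: each iteration takes it and the
     : // reader hands it back via `blobs_buf.buf`.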
1855 19924 : for read in vectored_reads {
1856 19724 : let blobs_buf = vectored_blob_reader
1857 19724 : .read_blobs(&read, buf.take().expect("Should have a buffer"), &ctx)
1858 10016 : .await?;
1859 57304 : for meta in blobs_buf.blobs.iter() {
1860 57304 : let value = &blobs_buf.buf[meta.start..meta.end];
1861 57304 : assert_eq!(value, entries_meta.index[&(meta.meta.key, meta.meta.lsn)]);
1862 2 : }
1863 2 :
1864 19724 : buf = Some(blobs_buf.buf);
1865 2 : }
1866 2 : }
1867 2 :
1868 2 : Ok(())
1869 2 : }
1870 :
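     : /// Smoke test for `copy_delta_prefix`: write a handful of values (including
     : /// one larger than `max_vectored_read_bytes`), flush them into a delta layer,
     : /// then copy prefixes truncated at decreasing LSNs and check that each copy
     : /// matches the source below the truncation point.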
1871 : #[tokio::test]
1872 2 : async fn copy_delta_prefix_smoke() {
1873 2 : use crate::walrecord::NeonWalRecord;
1874 2 : use bytes::Bytes;
1875 2 :
1876 2 : let h = crate::tenant::harness::TenantHarness::create("truncate_delta_smoke").unwrap();
1877 8 : let (tenant, ctx) = h.load().await;
1878 2 : let ctx = &ctx;
1879 2 : let timeline = tenant
1880 2 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, ctx)
1881 6 : .await
1882 2 : .unwrap();
1883 2 :
1884 2 : let initdb_layer = timeline
1885 2 : .layers
1886 2 : .read()
1887 2 : .await
1888 2 : .likely_resident_layers()
1889 2 : .next()
1890 2 : .unwrap();
1891 2 :
1892 2 : {
1893 2 : let mut writer = timeline.writer().await;
1894 2 :
1895 2 : let data = [
1896 2 : (0x20, 12, Value::Image(Bytes::from_static(b"foobar"))),
1897 2 : (
1898 2 : 0x30,
1899 2 : 12,
1900 2 : Value::WalRecord(NeonWalRecord::Postgres {
1901 2 : will_init: false,
1902 2 : rec: Bytes::from_static(b"1"),
1903 2 : }),
1904 2 : ),
1905 2 : (
1906 2 : 0x40,
1907 2 : 12,
1908 2 : Value::WalRecord(NeonWalRecord::Postgres {
1909 2 : will_init: true,
1910 2 : rec: Bytes::from_static(b"2"),
1911 2 : }),
1912 2 : ),
1913 2 : // build an oversized value so we cannot extend and existing read over
1914 2 : // this
1915 2 : (
1916 2 : 0x50,
1917 2 : 12,
1918 2 : Value::WalRecord(NeonWalRecord::Postgres {
1919 2 : will_init: true,
1920 2 : rec: {
1921 2 : let mut buf =
1922 2 : vec![0u8; tenant.conf.max_vectored_read_bytes.0.get() + 1024];
1923 2 : buf.iter_mut()
1924 2 : .enumerate()
1925 264192 : .for_each(|(i, slot)| *slot = (i % 256) as u8);
1926 2 : Bytes::from(buf)
1927 2 : },
1928 2 : }),
1929 2 : ),
1930 2 : // because the oversized read cannot be extended further, this entry is
1931 2 : // guaranteed to exercise the read builder created on the last round:
1932 2 : (
1933 2 : 0x60,
1934 2 : 12,
1935 2 : Value::WalRecord(NeonWalRecord::Postgres {
1936 2 : will_init: true,
1937 2 : rec: Bytes::from_static(b"3"),
1938 2 : }),
1939 2 : ),
1940 2 : (
1941 2 : 0x60,
1942 2 : 9,
1943 2 : Value::Image(Bytes::from_static(b"something for a different key")),
1944 2 : ),
1945 2 : ];
1946 2 :
1947 2 : let mut last_lsn = None;
1948 2 :
1949 14 : for (lsn, key, value) in data {
1950 12 : let key = Key::from_i128(key);
1951 12 : writer.put(key, Lsn(lsn), &value, ctx).await.unwrap();
1952 12 : last_lsn = Some(lsn);
1953 2 : }
1954 2 :
1955 2 : writer.finish_write(Lsn(last_lsn.unwrap()));
1956 2 : }
1957 2 : timeline.freeze_and_flush().await.unwrap();
1958 2 :
1959 2 : let new_layer = timeline
1960 2 : .layers
1961 2 : .read()
1962 2 : .await
1963 2 : .likely_resident_layers()
1964 4 : .find(|x| x != &initdb_layer)
1965 2 : .unwrap();
1966 2 :
1967 2 : // create a branch of the timeline, so we don't overwrite the original file
1968 2 : let branch = tenant
1969 2 : .branch_timeline_test(&timeline, TimelineId::generate(), None, ctx)
1970 2 : .await
1971 2 : .unwrap();
1972 2 :
1973 2 : assert_eq!(branch.get_ancestor_lsn(), Lsn(0x60));
1974 2 :
1975 2 : // truncating at 0x61 gives us a full copy; the lower LSNs walk backwards
1976 2 : // until only a single key remains
1977 2 :
1978 12 : for truncate_at in [0x61, 0x51, 0x41, 0x31, 0x21] {
1979 10 : let truncate_at = Lsn(truncate_at);
1980 2 :
1981 10 : let mut writer = DeltaLayerWriter::new(
1982 10 : tenant.conf,
1983 10 : branch.timeline_id,
1984 10 : tenant.tenant_shard_id,
1985 10 : Key::MIN,
1986 10 : Lsn(0x11)..truncate_at,
1987 10 : ctx,
1988 10 : )
1989 5 : .await
1990 10 : .unwrap();
1991 2 :
1992 10 : let new_layer = new_layer.download_and_keep_resident().await.unwrap();
1993 10 :
1994 10 : new_layer
1995 10 : .copy_delta_prefix(&mut writer, truncate_at, ctx)
1996 15 : .await
1997 10 : .unwrap();
1998 2 :
1999 24 : let copied_layer = writer.finish(Key::MAX, &branch, ctx).await.unwrap();
2000 10 :
2001 11 : copied_layer.as_delta(ctx).await.unwrap();
2002 10 :
2003 10 : assert_keys_and_values_eq(
2004 10 : new_layer.as_delta(ctx).await.unwrap(),
2005 10 : copied_layer.as_delta(ctx).await.unwrap(),
2006 10 : truncate_at,
2007 10 : ctx,
2008 2 : )
2009 58 : .await;
2010 2 : }
2011 2 : }
2012 :
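     : /// Walk both indexes in key order, skipping `source` entries at or above
     : /// `truncated_at`, and assert that keys, LSNs, and blob contents match
     : /// pairwise.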
2013 10 : async fn assert_keys_and_values_eq(
2014 10 : source: &DeltaLayerInner,
2015 10 : truncated: &DeltaLayerInner,
2016 10 : truncated_at: Lsn,
2017 10 : ctx: &RequestContext,
2018 10 : ) {
2019 10 : use futures::future::ready;
2020 10 : use futures::stream::TryStreamExt;
2021 10 :
2022 10 : let start_key = [0u8; DELTA_KEY_SIZE];
2023 10 :
2024 10 : let source_reader = FileBlockReader::new(&source.file, source.file_id);
2025 10 : let source_tree = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
2026 10 : source.index_start_blk,
2027 10 : source.index_root_blk,
2028 10 : &source_reader,
2029 10 : );
2030 10 : let source_stream = source.stream_index_forwards(&source_tree, &start_key, ctx);
2031 60 : let source_stream = source_stream.filter(|res| match res {
2032 60 : Ok((_, lsn, _)) => ready(lsn < &truncated_at),
2033 0 : _ => ready(true),
2034 60 : });
2035 10 : let mut source_stream = std::pin::pin!(source_stream);
2036 10 :
2037 10 : let truncated_reader = FileBlockReader::new(&truncated.file, truncated.file_id);
2038 10 : let truncated_tree = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
2039 10 : truncated.index_start_blk,
2040 10 : truncated.index_root_blk,
2041 10 : &truncated_reader,
2042 10 : );
2043 10 : let truncated_stream = truncated.stream_index_forwards(&truncated_tree, &start_key, ctx);
2044 10 : let mut truncated_stream = std::pin::pin!(truncated_stream);
2045 10 :
2046 10 : let mut scratch_left = Vec::new();
2047 10 : let mut scratch_right = Vec::new();
2048 :
2049 : loop {
2050 42 : let (src, truncated) = (source_stream.try_next(), truncated_stream.try_next());
2051 42 : let (src, truncated) = tokio::try_join!(src, truncated).unwrap();
2052 42 :
2053 42 : if src.is_none() {
2054 10 : assert!(truncated.is_none());
2055 10 : break;
2056 32 : }
2057 32 :
2058 32 : let (src, truncated) = (src.unwrap(), truncated.unwrap());
2059 32 :
2060 32 : // because we've filtered the source by LSN, we should always get the same keys from both.
2061 32 : assert_eq!(src.0, truncated.0);
2062 32 : assert_eq!(src.1, truncated.1);
2063 :
2064 : // if this is needed for something else, just drop this assert.
2065 32 : assert!(
2066 32 : src.2.pos() >= truncated.2.pos(),
2067 0 : "value position should not go backwards {} vs. {}",
2068 0 : src.2.pos(),
2069 0 : truncated.2.pos()
2070 : );
2071 :
2072 32 : scratch_left.clear();
2073 32 : let src_cursor = source_reader.block_cursor();
2074 32 : let left = src_cursor.read_blob_into_buf(src.2.pos(), &mut scratch_left, ctx);
2075 32 : scratch_right.clear();
2076 32 : let trunc_cursor = truncated_reader.block_cursor();
2077 32 : let right = trunc_cursor.read_blob_into_buf(truncated.2.pos(), &mut scratch_right, ctx);
2078 :
2079 32 : tokio::try_join!(left, right).unwrap();
2080 32 :
2081 32 : assert_eq!(utils::Hex(&scratch_left), utils::Hex(&scratch_right));
2082 : }
2083 10 : }
2084 : }