Line data Source code
1 : //! A DeltaLayer represents a collection of WAL records or page images in a range of
2 : //! LSNs, and in a range of Keys. It is stored in a file on disk.
3 : //!
4 : //! Usually a delta layer only contains differences, in the form of WAL records
5 : //! against a base LSN. However, if a relation is extended or a whole new relation
6 : //! is created, there would be no base for the new pages. The entries for them
7 : //! must be page images or WAL records with the 'will_init' flag set, so that
8 : //! they can be replayed without referring to an older page version.
9 : //!
10 : //! The delta files are stored in the `timelines/<timeline_id>` directory. Currently,
11 : //! there are no subdirectories, and each delta file is named like this:
12 : //!
13 : //! ```text
14 : //! <key start>-<key end>__<start LSN>-<end LSN>
15 : //! ```
16 : //!
17 : //! For example:
18 : //!
19 : //! ```text
20 : //! 000000067F000032BE0000400000000020B6-000000067F000032BE0000400000000030B6__000000578C6B29-0000000057A50051
21 : //! ```
22 : //!
23 : //! Every delta file consists of three parts: "summary", "index", and
24 : //! "values". The summary is a fixed size header at the beginning of the file,
25 : //! and it contains basic information about the layer, and offsets to the other
26 : //! parts. The "index" is a B-tree, mapping from Key and LSN to an offset in the
27 : //! "values" part. The actual page images and WAL records are stored in the
28 : //! "values" part.
29 : //!
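//! Illustratively, the resulting physical layout of a delta file is (blocks are
//! PAGE_SZ bytes):
//!
//! ```text
//! block 0:                    summary header
//! blocks 1..index_start_blk:  "values": page images and WAL records, stored as blobs
//! blocks index_start_blk..:   "index": B-tree keyed by (Key, LSN), whose values point
//!                             into the "values" part; the root is at index_root_blk
//! ```
//!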
30 : use crate::config::PageServerConf;
31 : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
32 : use crate::page_cache::PAGE_SZ;
33 : use crate::repository::{Key, Value, KEY_SIZE};
34 : use crate::tenant::blob_io::BlobWriter;
35 : use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
36 : use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
37 : use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
38 : use crate::tenant::timeline::GetVectoredError;
39 : use crate::tenant::{PageReconstructError, Timeline};
40 : use crate::virtual_file::{self, VirtualFile};
41 : use crate::{walrecord, TEMP_FILE_SUFFIX};
42 : use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
43 : use anyhow::{anyhow, bail, ensure, Context, Result};
44 : use camino::{Utf8Path, Utf8PathBuf};
45 : use pageserver_api::keyspace::KeySpace;
46 : use pageserver_api::models::LayerAccessKind;
47 : use pageserver_api::shard::TenantShardId;
48 : use rand::{distributions::Alphanumeric, Rng};
49 : use serde::{Deserialize, Serialize};
50 : use std::collections::BTreeMap;
51 : use std::fs::File;
52 : use std::io::SeekFrom;
53 : use std::ops::Range;
54 : use std::os::unix::fs::FileExt;
55 : use std::sync::Arc;
56 : use tokio::sync::OnceCell;
57 : use tracing::*;
58 :
59 : use utils::{
60 : bin_ser::BeSer,
61 : id::{TenantId, TimelineId},
62 : lsn::Lsn,
63 : };
64 :
65 : use super::{
66 : AsLayerDesc, LayerAccessStats, PersistentLayerDesc, ResidentLayer, ValueReconstructSituation,
67 : ValuesReconstructState,
68 : };
69 :
70 : ///
71 : /// Header stored at the beginning of the file
72 : ///
73 : /// After this comes the 'values' part, starting on block 1. After that,
74 : /// the 'index' starts at the block indicated by 'index_start_blk'
75 : ///
76 480 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
77 : pub struct Summary {
78 : /// Magic value to identify this as a neon delta file. Always DELTA_FILE_MAGIC.
79 : pub magic: u16,
80 : pub format_version: u16,
81 :
82 : pub tenant_id: TenantId,
83 : pub timeline_id: TimelineId,
84 : pub key_range: Range<Key>,
85 : pub lsn_range: Range<Lsn>,
86 :
87 : /// Block number where the 'index' part of the file begins.
88 : pub index_start_blk: u32,
89 : /// Block within the 'index', where the B-tree root page is stored
90 : pub index_root_blk: u32,
91 : }
92 :
93 : impl From<&DeltaLayer> for Summary {
94 0 : fn from(layer: &DeltaLayer) -> Self {
95 0 : Self::expected(
96 0 : layer.desc.tenant_shard_id.tenant_id,
97 0 : layer.desc.timeline_id,
98 0 : layer.desc.key_range.clone(),
99 0 : layer.desc.lsn_range.clone(),
100 0 : )
101 0 : }
102 : }
103 :
104 : impl Summary {
105 440 : pub(super) fn expected(
106 440 : tenant_id: TenantId,
107 440 : timeline_id: TimelineId,
108 440 : keys: Range<Key>,
109 440 : lsns: Range<Lsn>,
110 440 : ) -> Self {
111 440 : Self {
112 440 : magic: DELTA_FILE_MAGIC,
113 440 : format_version: STORAGE_FORMAT_VERSION,
114 440 :
115 440 : tenant_id,
116 440 : timeline_id,
117 440 : key_range: keys,
118 440 : lsn_range: lsns,
119 440 :
120 440 : index_start_blk: 0,
121 440 : index_root_blk: 0,
122 440 : }
123 440 : }
124 : }
125 :
126 : // Flag indicating that this version initializes the page
127 : const WILL_INIT: u64 = 1;
128 :
129 : /// A reference to a BLOB in a layer file. The reference contains the BLOB
130 : /// offset, and for WAL records it also carries the `will_init` flag. The flag
131 : /// helps to determine the range of records that need to be applied, without
132 : /// reading/deserializing the records themselves.
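///
/// The two are packed into a single `u64`: the offset is shifted left by one bit
/// and the low bit stores `will_init`. For example (values are illustrative),
/// `BlobRef::new(0x10, true).0 == 0x21`, and then `pos() == 0x10` and
/// `will_init() == true`.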
133 0 : #[derive(Debug, Serialize, Deserialize, Copy, Clone)]
134 : pub struct BlobRef(pub u64);
135 :
136 : impl BlobRef {
137 72371 : pub fn will_init(&self) -> bool {
138 72371 : (self.0 & WILL_INIT) != 0
139 72371 : }
140 :
141 4276387 : pub fn pos(&self) -> u64 {
142 4276387 : self.0 >> 1
143 4276387 : }
144 :
145 4311466 : pub fn new(pos: u64, will_init: bool) -> BlobRef {
146 4311466 : let mut blob_ref = pos << 1;
147 4311466 : if will_init {
148 4311466 : blob_ref |= WILL_INIT;
149 4311466 : }
150 4311466 : BlobRef(blob_ref)
151 4311466 : }
152 : }
153 :
154 : pub const DELTA_KEY_SIZE: usize = KEY_SIZE + 8;
155 : struct DeltaKey([u8; DELTA_KEY_SIZE]);
156 :
157 : /// This is the key of the B-tree index stored in the delta layer. It consists
158 : /// of the serialized representation of a Key and LSN.
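/// The layout is the `KEY_SIZE` bytes of the serialized `Key` followed by the LSN
/// as 8 big-endian bytes, so byte-wise comparison of `DeltaKey`s orders entries by
/// key first and then by LSN, which the B-tree visits below rely on.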
159 : impl DeltaKey {
160 2102008 : fn from_slice(buf: &[u8]) -> Self {
161 2102008 : let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
162 2102008 : bytes.copy_from_slice(buf);
163 2102008 : DeltaKey(bytes)
164 2102008 : }
165 :
166 4434815 : fn from_key_lsn(key: &Key, lsn: Lsn) -> Self {
167 4434815 : let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
168 4434815 : key.write_to_byte_slice(&mut bytes[0..KEY_SIZE]);
169 4434815 : bytes[KEY_SIZE..].copy_from_slice(&u64::to_be_bytes(lsn.0));
170 4434815 : DeltaKey(bytes)
171 4434815 : }
172 :
173 2102008 : fn key(&self) -> Key {
174 2102008 : Key::from_slice(&self.0)
175 2102008 : }
176 :
177 2102008 : fn lsn(&self) -> Lsn {
178 2102008 : Lsn(u64::from_be_bytes(self.0[KEY_SIZE..].try_into().unwrap()))
179 2102008 : }
180 :
181 72381 : fn extract_lsn_from_buf(buf: &[u8]) -> Lsn {
182 72381 : let mut lsn_buf = [0u8; 8];
183 72381 : lsn_buf.copy_from_slice(&buf[KEY_SIZE..]);
184 72381 : Lsn(u64::from_be_bytes(lsn_buf))
185 72381 : }
186 : }
187 :
188 : /// This is used only from `pagectl`. Within the pageserver, all layers are
189 : /// [`crate::tenant::storage_layer::Layer`], which can hold a [`DeltaLayerInner`].
190 : pub struct DeltaLayer {
191 : path: Utf8PathBuf,
192 : pub desc: PersistentLayerDesc,
193 : access_stats: LayerAccessStats,
194 : inner: OnceCell<Arc<DeltaLayerInner>>,
195 : }
196 :
197 : impl std::fmt::Debug for DeltaLayer {
198 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
199 0 : use super::RangeDisplayDebug;
200 0 :
201 0 : f.debug_struct("DeltaLayer")
202 0 : .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
203 0 : .field("lsn_range", &self.desc.lsn_range)
204 0 : .field("file_size", &self.desc.file_size)
205 0 : .field("inner", &self.inner)
206 0 : .finish()
207 0 : }
208 : }
209 :
210 : /// `DeltaLayerInner` is the in-memory data structure associated with an on-disk delta
211 : /// file.
212 : pub struct DeltaLayerInner {
213 : // values copied from summary
214 : index_start_blk: u32,
215 : index_root_blk: u32,
216 :
217 : /// Reader object for reading blocks from the file.
218 : file: FileBlockReader,
219 : }
220 :
221 : impl std::fmt::Debug for DeltaLayerInner {
222 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
223 0 : f.debug_struct("DeltaLayerInner")
224 0 : .field("index_start_blk", &self.index_start_blk)
225 0 : .field("index_root_blk", &self.index_root_blk)
226 0 : .finish()
227 0 : }
228 : }
229 :
230 : /// Boilerplate to implement the Layer trait; always use `layer_desc` for persistent layers.
231 : impl std::fmt::Display for DeltaLayer {
232 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
233 0 : write!(f, "{}", self.layer_desc().short_id())
234 0 : }
235 : }
236 :
237 : impl AsLayerDesc for DeltaLayer {
238 0 : fn layer_desc(&self) -> &PersistentLayerDesc {
239 0 : &self.desc
240 0 : }
241 : }
242 :
243 : impl DeltaLayer {
244 0 : pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
245 0 : self.desc.dump();
246 0 :
247 0 : if !verbose {
248 0 : return Ok(());
249 0 : }
250 :
251 0 : let inner = self.load(LayerAccessKind::Dump, ctx).await?;
252 :
253 0 : inner.dump(ctx).await
254 0 : }
255 :
256 480 : fn temp_path_for(
257 480 : conf: &PageServerConf,
258 480 : tenant_shard_id: &TenantShardId,
259 480 : timeline_id: &TimelineId,
260 480 : key_start: Key,
261 480 : lsn_range: &Range<Lsn>,
262 480 : ) -> Utf8PathBuf {
263 480 : let rand_string: String = rand::thread_rng()
264 480 : .sample_iter(&Alphanumeric)
265 480 : .take(8)
266 480 : .map(char::from)
267 480 : .collect();
268 480 :
269 480 : conf.timeline_path(tenant_shard_id, timeline_id)
270 480 : .join(format!(
271 480 : "{}-XXX__{:016X}-{:016X}.{}.{}",
272 480 : key_start,
273 480 : u64::from(lsn_range.start),
274 480 : u64::from(lsn_range.end),
275 480 : rand_string,
276 480 : TEMP_FILE_SUFFIX,
277 480 : ))
278 480 : }
279 :
280 : ///
281 : /// Open the underlying file and read the metadata into memory, if it's
282 : /// not loaded already.
283 : ///
284 0 : async fn load(
285 0 : &self,
286 0 : access_kind: LayerAccessKind,
287 0 : ctx: &RequestContext,
288 0 : ) -> Result<&Arc<DeltaLayerInner>> {
289 0 : self.access_stats.record_access(access_kind, ctx);
290 0 : // Quick exit if already loaded
291 0 : self.inner
292 0 : .get_or_try_init(|| self.load_inner(ctx))
293 0 : .await
294 0 : .with_context(|| format!("Failed to load delta layer {}", self.path()))
295 0 : }
296 :
297 0 : async fn load_inner(&self, ctx: &RequestContext) -> Result<Arc<DeltaLayerInner>> {
298 0 : let path = self.path();
299 :
300 0 : let loaded = DeltaLayerInner::load(&path, None, ctx)
301 0 : .await
302 0 : .and_then(|res| res)?;
303 :
304 : // not production code
305 0 : let actual_filename = path.file_name().unwrap().to_owned();
306 0 : let expected_filename = self.layer_desc().filename().file_name();
307 0 :
308 0 : if actual_filename != expected_filename {
309 0 : println!("warning: filename does not match what is expected from in-file summary");
310 0 : println!("actual: {:?}", actual_filename);
311 0 : println!("expected: {:?}", expected_filename);
312 0 : }
313 :
314 0 : Ok(Arc::new(loaded))
315 0 : }
316 :
317 : /// Create a DeltaLayer struct representing an existing file on disk.
318 : ///
319 : /// This variant is only used for debugging purposes, by the 'pagectl' binary.
320 0 : pub fn new_for_path(path: &Utf8Path, file: File) -> Result<Self> {
321 0 : let mut summary_buf = vec![0; PAGE_SZ];
322 0 : file.read_exact_at(&mut summary_buf, 0)?;
323 0 : let summary = Summary::des_prefix(&summary_buf)?;
324 :
325 0 : let metadata = file
326 0 : .metadata()
327 0 : .context("get file metadata to determine size")?;
328 :
329 : // This function is never used for constructing layers in a running pageserver,
330 : // so it does not need an accurate TenantShardId.
331 0 : let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
332 0 :
333 0 : Ok(DeltaLayer {
334 0 : path: path.to_path_buf(),
335 0 : desc: PersistentLayerDesc::new_delta(
336 0 : tenant_shard_id,
337 0 : summary.timeline_id,
338 0 : summary.key_range,
339 0 : summary.lsn_range,
340 0 : metadata.len(),
341 0 : ),
342 0 : access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
343 0 : inner: OnceCell::new(),
344 0 : })
345 0 : }
346 :
347 : /// Path to the layer file in pageserver workdir.
348 0 : fn path(&self) -> Utf8PathBuf {
349 0 : self.path.clone()
350 0 : }
351 : }
352 :
353 : /// A builder object for constructing a new delta layer.
354 : ///
355 : /// Usage:
356 : ///
357 : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
358 : ///
359 : /// 2. Write the contents by calling `put_value` for every page
360 : /// version to store in the layer.
361 : ///
362 : /// 3. Call `finish`.
363 : ///
364 : struct DeltaLayerWriterInner {
365 : conf: &'static PageServerConf,
366 : pub path: Utf8PathBuf,
367 : timeline_id: TimelineId,
368 : tenant_shard_id: TenantShardId,
369 :
370 : key_start: Key,
371 : lsn_range: Range<Lsn>,
372 :
373 : tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>,
374 :
375 : blob_writer: BlobWriter<true>,
376 : }
377 :
378 : impl DeltaLayerWriterInner {
379 : ///
380 : /// Start building a new delta layer.
381 : ///
382 480 : async fn new(
383 480 : conf: &'static PageServerConf,
384 480 : timeline_id: TimelineId,
385 480 : tenant_shard_id: TenantShardId,
386 480 : key_start: Key,
387 480 : lsn_range: Range<Lsn>,
388 480 : ) -> anyhow::Result<Self> {
389 480 : // Create the file initially with a temporary filename. We don't know
390 480 : // the end key yet, so we cannot form the final filename yet. We will
391 480 : // rename it when we're done.
392 480 : //
393 480 : // Note: This overwrites any existing file. There shouldn't be any.
394 480 : // FIXME: throw an error instead?
395 480 : let path =
396 480 : DeltaLayer::temp_path_for(conf, &tenant_shard_id, &timeline_id, key_start, &lsn_range);
397 :
398 480 : let mut file = VirtualFile::create(&path).await?;
399 : // make room for the header block
400 480 : file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
401 480 : let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
402 480 :
403 480 : // Initialize the b-tree index builder
404 480 : let block_buf = BlockBuf::new();
405 480 : let tree_builder = DiskBtreeBuilder::new(block_buf);
406 480 :
407 480 : Ok(Self {
408 480 : conf,
409 480 : path,
410 480 : timeline_id,
411 480 : tenant_shard_id,
412 480 : key_start,
413 480 : lsn_range,
414 480 : tree: tree_builder,
415 480 : blob_writer,
416 480 : })
417 480 : }
418 :
419 : ///
420 : /// Append a key-value pair to the file.
421 : ///
422 : /// The values must be appended in key, lsn order.
423 : ///
424 2102000 : async fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
425 2102000 : let (_, res) = self
426 2102000 : .put_value_bytes(key, lsn, Value::ser(&val)?, val.will_init())
427 158 : .await;
428 2102000 : res
429 2102000 : }
430 :
431 4311466 : async fn put_value_bytes(
432 4311466 : &mut self,
433 4311466 : key: Key,
434 4311466 : lsn: Lsn,
435 4311466 : val: Vec<u8>,
436 4311466 : will_init: bool,
437 4311466 : ) -> (Vec<u8>, anyhow::Result<()>) {
438 4311466 : assert!(self.lsn_range.start <= lsn);
439 4311466 : let (val, res) = self.blob_writer.write_blob(val).await;
440 4311466 : let off = match res {
441 4311466 : Ok(off) => off,
442 0 : Err(e) => return (val, Err(anyhow::anyhow!(e))),
443 : };
444 :
445 4311466 : let blob_ref = BlobRef::new(off, will_init);
446 4311466 :
447 4311466 : let delta_key = DeltaKey::from_key_lsn(&key, lsn);
448 4311466 : let res = self.tree.append(&delta_key.0, blob_ref.0);
449 4311466 : (val, res.map_err(|e| anyhow::anyhow!(e)))
450 4311466 : }
451 :
452 2009969 : fn size(&self) -> u64 {
453 2009969 : self.blob_writer.size() + self.tree.borrow_writer().size()
454 2009969 : }
455 :
456 : ///
457 : /// Finish writing the delta layer.
458 : ///
459 480 : async fn finish(self, key_end: Key, timeline: &Arc<Timeline>) -> anyhow::Result<ResidentLayer> {
460 480 : let index_start_blk =
461 480 : ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
462 :
463 480 : let mut file = self.blob_writer.into_inner().await?;
464 :
465 : // Write out the index
466 480 : let (index_root_blk, block_buf) = self.tree.finish()?;
467 480 : file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
468 0 : .await?;
469 9114 : for buf in block_buf.blocks {
470 8634 : let (_buf, res) = file.write_all(buf).await;
471 8634 : res?;
472 : }
473 480 : assert!(self.lsn_range.start < self.lsn_range.end);
474 : // Fill in the summary on blk 0
475 480 : let summary = Summary {
476 480 : magic: DELTA_FILE_MAGIC,
477 480 : format_version: STORAGE_FORMAT_VERSION,
478 480 : tenant_id: self.tenant_shard_id.tenant_id,
479 480 : timeline_id: self.timeline_id,
480 480 : key_range: self.key_start..key_end,
481 480 : lsn_range: self.lsn_range.clone(),
482 480 : index_start_blk,
483 480 : index_root_blk,
484 480 : };
485 480 :
486 480 : let mut buf = Vec::with_capacity(PAGE_SZ);
487 480 : // TODO: could use smallvec here but it's a pain with Slice<T>
488 480 : Summary::ser_into(&summary, &mut buf)?;
489 480 : file.seek(SeekFrom::Start(0)).await?;
490 480 : let (_buf, res) = file.write_all(buf).await;
491 480 : res?;
492 :
493 480 : let metadata = file
494 480 : .metadata()
495 0 : .await
496 480 : .context("get file metadata to determine size")?;
497 :
498 : // 5GB limit for objects without multipart upload (which we don't want to use)
499 : // Make it a little bit below to account for differing GB units
500 : // https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html
501 : const S3_UPLOAD_LIMIT: u64 = 4_500_000_000;
502 480 : ensure!(
503 480 : metadata.len() <= S3_UPLOAD_LIMIT,
504 0 : "Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
505 0 : file.path,
506 0 : metadata.len()
507 : );
508 :
509 : // Note: Because we opened the file in write-only mode, we cannot
510 : // reuse the same VirtualFile for reading later. That's why we don't
511 : // set inner.file here. The first read will have to re-open it.
512 :
513 480 : let desc = PersistentLayerDesc::new_delta(
514 480 : self.tenant_shard_id,
515 480 : self.timeline_id,
516 480 : self.key_start..key_end,
517 480 : self.lsn_range.clone(),
518 480 : metadata.len(),
519 480 : );
520 480 :
521 480 : // fsync the file
522 480 : file.sync_all().await?;
523 :
524 480 : let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
525 :
526 0 : trace!("created delta layer {}", layer.local_path());
527 :
528 480 : Ok(layer)
529 480 : }
530 : }
531 :
532 : /// A builder object for constructing a new delta layer.
533 : ///
534 : /// Usage:
535 : ///
536 : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
537 : ///
538 : /// 2. Write the contents by calling `put_value` for every page
539 : /// version to store in the layer.
540 : ///
541 : /// 3. Call `finish`.
542 : ///
543 : /// # Note
544 : ///
545 : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
546 : /// possible for the writer to be dropped before `finish` is actually called, which
547 : /// could leave stray temporary files in the directory and exhaust file system space.
548 : /// This structure wraps `DeltaLayerWriterInner` and also contains a `Drop`
549 : /// implementation that cleans up the temporary file on failure. It's not
550 : /// possible to do this directly in `DeltaLayerWriterInner` since `finish` moves
551 : /// out some fields, making it impossible to implement `Drop`.
552 : ///
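/// A minimal sketch of the usage steps above (illustrative, not a compiling
/// doctest): it assumes a `conf: &'static PageServerConf`, a `TimelineId`, a
/// `TenantShardId`, the key and LSN ranges, values already sorted in (key, lsn)
/// order, and an open `timeline: Arc<Timeline>` from the surrounding pageserver
/// code.
///
/// ```text
/// let mut writer =
///     DeltaLayerWriter::new(conf, timeline_id, tenant_shard_id, key_start, lsn_range).await?;
/// for (key, lsn, value) in values_in_key_lsn_order {
///     writer.put_value(key, lsn, value).await?;
/// }
/// let resident_layer = writer.finish(key_end, &timeline).await?;
/// ```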
553 : #[must_use]
554 : pub struct DeltaLayerWriter {
555 : inner: Option<DeltaLayerWriterInner>,
556 : }
557 :
558 : impl DeltaLayerWriter {
559 : ///
560 : /// Start building a new delta layer.
561 : ///
562 480 : pub async fn new(
563 480 : conf: &'static PageServerConf,
564 480 : timeline_id: TimelineId,
565 480 : tenant_shard_id: TenantShardId,
566 480 : key_start: Key,
567 480 : lsn_range: Range<Lsn>,
568 480 : ) -> anyhow::Result<Self> {
569 480 : Ok(Self {
570 480 : inner: Some(
571 480 : DeltaLayerWriterInner::new(
572 480 : conf,
573 480 : timeline_id,
574 480 : tenant_shard_id,
575 480 : key_start,
576 480 : lsn_range,
577 480 : )
578 282 : .await?,
579 : ),
580 : })
581 480 : }
582 :
583 : ///
584 : /// Append a key-value pair to the file.
585 : ///
586 : /// The values must be appended in key, lsn order.
587 : ///
588 2102000 : pub async fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
589 2102000 : self.inner.as_mut().unwrap().put_value(key, lsn, val).await
590 2102000 : }
591 :
592 2209466 : pub async fn put_value_bytes(
593 2209466 : &mut self,
594 2209466 : key: Key,
595 2209466 : lsn: Lsn,
596 2209466 : val: Vec<u8>,
597 2209466 : will_init: bool,
598 2209466 : ) -> (Vec<u8>, anyhow::Result<()>) {
599 2209466 : self.inner
600 2209466 : .as_mut()
601 2209466 : .unwrap()
602 2209466 : .put_value_bytes(key, lsn, val, will_init)
603 162 : .await
604 2209466 : }
605 :
606 2009969 : pub fn size(&self) -> u64 {
607 2009969 : self.inner.as_ref().unwrap().size()
608 2009969 : }
609 :
610 : ///
611 : /// Finish writing the delta layer.
612 : ///
613 480 : pub(crate) async fn finish(
614 480 : mut self,
615 480 : key_end: Key,
616 480 : timeline: &Arc<Timeline>,
617 480 : ) -> anyhow::Result<ResidentLayer> {
618 480 : let inner = self.inner.take().unwrap();
619 480 : let temp_path = inner.path.clone();
620 480 : let result = inner.finish(key_end, timeline).await;
621 : // The delta layer files can sometimes be really large. Clean them up.
622 480 : if result.is_err() {
623 0 : tracing::warn!(
624 0 : "Cleaning up temporary delta file {temp_path} after error during writing"
625 0 : );
626 0 : if let Err(e) = std::fs::remove_file(&temp_path) {
627 0 : tracing::warn!("Error cleaning up temporary delta layer file {temp_path}: {e:?}")
628 0 : }
629 480 : }
630 480 : result
631 480 : }
632 : }
633 :
634 : impl Drop for DeltaLayerWriter {
635 480 : fn drop(&mut self) {
636 480 : if let Some(inner) = self.inner.take() {
637 0 : // We want to remove the virtual file here, so it's fine to not
638 0 : // have completely flushed unwritten data.
639 0 : let vfile = inner.blob_writer.into_inner_no_flush();
640 0 : vfile.remove();
641 480 : }
642 480 : }
643 : }
644 :
645 0 : #[derive(thiserror::Error, Debug)]
646 : pub enum RewriteSummaryError {
647 : #[error("magic mismatch")]
648 : MagicMismatch,
649 : #[error(transparent)]
650 : Other(#[from] anyhow::Error),
651 : }
652 :
653 : impl From<std::io::Error> for RewriteSummaryError {
654 0 : fn from(e: std::io::Error) -> Self {
655 0 : Self::Other(anyhow::anyhow!(e))
656 0 : }
657 : }
658 :
659 : impl DeltaLayer {
660 0 : pub async fn rewrite_summary<F>(
661 0 : path: &Utf8Path,
662 0 : rewrite: F,
663 0 : ctx: &RequestContext,
664 0 : ) -> Result<(), RewriteSummaryError>
665 0 : where
666 0 : F: Fn(Summary) -> Summary,
667 0 : {
668 0 : let file = VirtualFile::open_with_options(
669 0 : path,
670 0 : virtual_file::OpenOptions::new().read(true).write(true),
671 0 : )
672 0 : .await
673 0 : .with_context(|| format!("Failed to open file '{}'", path))?;
674 0 : let file = FileBlockReader::new(file);
675 0 : let summary_blk = file.read_blk(0, ctx).await?;
676 0 : let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
677 0 : let mut file = file.file;
678 0 : if actual_summary.magic != DELTA_FILE_MAGIC {
679 0 : return Err(RewriteSummaryError::MagicMismatch);
680 0 : }
681 0 :
682 0 : let new_summary = rewrite(actual_summary);
683 0 :
684 0 : let mut buf = Vec::with_capacity(PAGE_SZ);
685 0 : // TODO: could use smallvec here, but it's a pain with Slice<T>
686 0 : Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
687 0 : file.seek(SeekFrom::Start(0)).await?;
688 0 : let (_buf, res) = file.write_all(buf).await;
689 0 : res?;
690 0 : Ok(())
691 0 : }
692 : }
693 :
694 : impl DeltaLayerInner {
695 : /// Returns a nested result following `Result<Result<_, OpErr>, Critical>`:
696 : /// - inner has the success or transient failure
697 : /// - outer has the permanent failure
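///
/// Callers that want a single fallible result flatten the two layers, e.g.
/// `DeltaLayerInner::load(path, None, ctx).await.and_then(|res| res)?`, as
/// `DeltaLayer::load_inner` above does.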
698 440 : pub(super) async fn load(
699 440 : path: &Utf8Path,
700 440 : summary: Option<Summary>,
701 440 : ctx: &RequestContext,
702 440 : ) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
703 440 : let file = match VirtualFile::open(path).await {
704 440 : Ok(file) => file,
705 0 : Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
706 : };
707 440 : let file = FileBlockReader::new(file);
708 :
709 440 : let summary_blk = match file.read_blk(0, ctx).await {
710 440 : Ok(blk) => blk,
711 0 : Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
712 : };
713 :
714 : // TODO: this should be an assertion instead; see ImageLayerInner::load
715 440 : let actual_summary =
716 440 : Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
717 :
718 440 : if let Some(mut expected_summary) = summary {
719 : // production code path
720 440 : expected_summary.index_start_blk = actual_summary.index_start_blk;
721 440 : expected_summary.index_root_blk = actual_summary.index_root_blk;
722 440 : if actual_summary != expected_summary {
723 0 : bail!(
724 0 : "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
725 0 : actual_summary,
726 0 : expected_summary
727 0 : );
728 440 : }
729 0 : }
730 :
731 440 : Ok(Ok(DeltaLayerInner {
732 440 : file,
733 440 : index_start_blk: actual_summary.index_start_blk,
734 440 : index_root_blk: actual_summary.index_root_blk,
735 440 : }))
736 440 : }
737 :
738 123339 : pub(super) async fn get_value_reconstruct_data(
739 123339 : &self,
740 123339 : key: Key,
741 123339 : lsn_range: Range<Lsn>,
742 123339 : reconstruct_state: &mut ValueReconstructState,
743 123339 : ctx: &RequestContext,
744 123339 : ) -> anyhow::Result<ValueReconstructResult> {
745 123339 : let mut need_image = true;
746 123339 : // Scan the page versions backwards, starting from just below `lsn_range.end`.
747 123339 : let file = &self.file;
748 123339 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
749 123339 : self.index_start_blk,
750 123339 : self.index_root_blk,
751 123339 : file,
752 123339 : );
753 123339 : let search_key = DeltaKey::from_key_lsn(&key, Lsn(lsn_range.end.0 - 1));
754 123339 :
755 123339 : let mut offsets: Vec<(Lsn, u64)> = Vec::new();
756 123339 :
757 123339 : tree_reader
758 123339 : .visit(
759 123339 : &search_key.0,
760 123339 : VisitDirection::Backwards,
761 123339 : |key, value| {
762 118175 : let blob_ref = BlobRef(value);
763 118175 : if key[..KEY_SIZE] != search_key.0[..KEY_SIZE] {
764 46124 : return false;
765 72051 : }
766 72051 : let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
767 72051 : if entry_lsn < lsn_range.start {
768 0 : return false;
769 72051 : }
770 72051 : offsets.push((entry_lsn, blob_ref.pos()));
771 72051 :
772 72051 : !blob_ref.will_init()
773 123339 : },
774 123339 : &RequestContextBuilder::extend(ctx)
775 123339 : .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
776 123339 : .build(),
777 123339 : )
778 17381 : .await?;
779 :
780 123339 : let ctx = &RequestContextBuilder::extend(ctx)
781 123339 : .page_content_kind(PageContentKind::DeltaLayerValue)
782 123339 : .build();
783 123339 :
784 123339 : // Ok, 'offsets' now contains the offsets of all the entries we need to read
785 123339 : let cursor = file.block_cursor();
786 123339 : let mut buf = Vec::new();
787 123339 : for (entry_lsn, pos) in offsets {
788 72051 : cursor
789 72051 : .read_blob_into_buf(pos, &mut buf, ctx)
790 5537 : .await
791 72051 : .with_context(|| {
792 0 : format!("Failed to read blob from virtual file {}", file.file.path)
793 72051 : })?;
794 72051 : let val = Value::des(&buf).with_context(|| {
795 0 : format!(
796 0 : "Failed to deserialize file blob from virtual file {}",
797 0 : file.file.path
798 0 : )
799 72051 : })?;
800 72051 : match val {
801 72051 : Value::Image(img) => {
802 72051 : reconstruct_state.img = Some((entry_lsn, img));
803 72051 : need_image = false;
804 72051 : break;
805 : }
806 0 : Value::WalRecord(rec) => {
807 0 : let will_init = rec.will_init();
808 0 : reconstruct_state.records.push((entry_lsn, rec));
809 0 : if will_init {
810 : // This WAL record initializes the page, so no need to go further back
811 0 : need_image = false;
812 0 : break;
813 0 : }
814 : }
815 : }
816 : }
817 :
818 : // If an older page image is needed to reconstruct the page, let the
819 : // caller know.
820 123339 : if need_image {
821 51288 : Ok(ValueReconstructResult::Continue)
822 : } else {
823 72051 : Ok(ValueReconstructResult::Complete)
824 : }
825 123339 : }
826 :
827 : // Look up the keys in the provided keyspace and update
828 : // the reconstruct state with whatever is found.
829 : //
830 : // If the key is cached, go no further than the cached Lsn.
831 : //
832 : // Currently, the index is visited for each range, but this
833 : // can be further optimised to visit the index only once.
834 10 : pub(super) async fn get_values_reconstruct_data(
835 10 : &self,
836 10 : keyspace: KeySpace,
837 10 : end_lsn: Lsn,
838 10 : reconstruct_state: &mut ValuesReconstructState,
839 10 : ctx: &RequestContext,
840 10 : ) -> Result<(), GetVectoredError> {
841 10 : let file = &self.file;
842 10 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
843 10 : self.index_start_blk,
844 10 : self.index_root_blk,
845 10 : file,
846 10 : );
847 10 :
848 10 : let mut offsets: BTreeMap<Key, Vec<(Lsn, u64)>> = BTreeMap::new();
849 :
850 10 : for range in keyspace.ranges.iter() {
851 10 : let mut ignore_key = None;
852 10 :
853 10 : // Scan the page versions backwards, starting from the last key in the range,
854 10 : // to collect all the offsets that need to be read.
855 10 : let end_key = DeltaKey::from_key_lsn(&range.end, Lsn(end_lsn.0 - 1));
856 10 : tree_reader
857 10 : .visit(
858 10 : &end_key.0,
859 10 : VisitDirection::Backwards,
860 330 : |raw_key, value| {
861 330 : let key = Key::from_slice(&raw_key[..KEY_SIZE]);
862 330 : let entry_lsn = DeltaKey::extract_lsn_from_buf(raw_key);
863 330 :
864 330 : if entry_lsn >= end_lsn {
865 0 : return true;
866 330 : }
867 330 :
868 330 : if key < range.start {
869 0 : return false;
870 330 : }
871 330 :
872 330 : if key >= range.end {
873 10 : return true;
874 320 : }
875 320 :
876 320 : if Some(key) == ignore_key {
877 0 : return true;
878 320 : }
879 :
880 320 : if let Some(cached_lsn) = reconstruct_state.get_cached_lsn(&key) {
881 0 : if entry_lsn <= cached_lsn {
882 0 : return key != range.start;
883 0 : }
884 320 : }
885 :
886 320 : let blob_ref = BlobRef(value);
887 320 : let lsns_at = offsets.entry(key).or_default();
888 320 : lsns_at.push((entry_lsn, blob_ref.pos()));
889 320 :
890 320 : if blob_ref.will_init() {
891 320 : if key == range.start {
892 10 : return false;
893 : } else {
894 310 : ignore_key = Some(key);
895 310 : return true;
896 : }
897 0 : }
898 0 :
899 0 : true
900 330 : },
901 10 : &RequestContextBuilder::extend(ctx)
902 10 : .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
903 10 : .build(),
904 10 : )
905 10 : .await
906 10 : .map_err(|err| GetVectoredError::Other(anyhow!(err)))?;
907 : }
908 :
909 10 : let ctx = &RequestContextBuilder::extend(ctx)
910 10 : .page_content_kind(PageContentKind::DeltaLayerValue)
911 10 : .build();
912 10 :
913 10 : let cursor = file.block_cursor();
914 10 : let mut buf = Vec::new();
915 330 : for (key, lsns_at) in offsets {
916 320 : for (lsn, block_offset) in lsns_at {
917 320 : let res = cursor.read_blob_into_buf(block_offset, &mut buf, ctx).await;
918 :
919 320 : if let Err(e) = res {
920 0 : reconstruct_state.on_key_error(
921 0 : key,
922 0 : PageReconstructError::from(anyhow!(e).context(format!(
923 0 : "Failed to read blob from virtual file {}",
924 0 : file.file.path
925 0 : ))),
926 0 : );
927 0 :
928 0 : break;
929 320 : }
930 320 :
931 320 : let value = Value::des(&buf);
932 320 : if let Err(e) = value {
933 0 : reconstruct_state.on_key_error(
934 0 : key,
935 0 : PageReconstructError::from(anyhow!(e).context(format!(
936 0 : "Failed to deserialize file blob from virtual file {}",
937 0 : file.file.path
938 0 : ))),
939 0 : );
940 0 :
941 0 : break;
942 320 : }
943 320 :
944 320 : let key_situation = reconstruct_state.update_key(&key, lsn, value.unwrap());
945 320 : if key_situation == ValueReconstructSituation::Complete {
946 320 : break;
947 0 : }
948 : }
949 : }
950 :
951 10 : Ok(())
952 10 : }
953 :
954 304 : pub(super) async fn load_keys<'a>(
955 304 : &'a self,
956 304 : ctx: &RequestContext,
957 304 : ) -> Result<Vec<DeltaEntry<'a>>> {
958 304 : let file = &self.file;
959 304 :
960 304 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
961 304 : self.index_start_blk,
962 304 : self.index_root_blk,
963 304 : file,
964 304 : );
965 304 :
966 304 : let mut all_keys: Vec<DeltaEntry<'_>> = Vec::new();
967 304 :
968 304 : tree_reader
969 304 : .visit(
970 304 : &[0u8; DELTA_KEY_SIZE],
971 304 : VisitDirection::Forwards,
972 2102008 : |key, value| {
973 2102008 : let delta_key = DeltaKey::from_slice(key);
974 2102008 : let val_ref = ValueRef {
975 2102008 : blob_ref: BlobRef(value),
976 2102008 : reader: BlockCursor::new(crate::tenant::block_io::BlockReaderRef::Adapter(
977 2102008 : Adapter(self),
978 2102008 : )),
979 2102008 : };
980 2102008 : let pos = BlobRef(value).pos();
981 2102008 : if let Some(last) = all_keys.last_mut() {
982 2101704 : // subtract the previous entry's offset from the current one to get the
983 2101704 : // size of the value associated with the previous (key, lsn) tuple
984 2101704 : let first_pos = last.size;
985 2101704 : last.size = pos - first_pos;
986 2101704 : }
987 2102008 : let entry = DeltaEntry {
988 2102008 : key: delta_key.key(),
989 2102008 : lsn: delta_key.lsn(),
990 2102008 : size: pos,
991 2102008 : val: val_ref,
992 2102008 : };
993 2102008 : all_keys.push(entry);
994 2102008 : true
995 2102008 : },
996 304 : &RequestContextBuilder::extend(ctx)
997 304 : .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
998 304 : .build(),
999 304 : )
1000 2212 : .await?;
1001 304 : if let Some(last) = all_keys.last_mut() {
1002 304 : // The last key occupies all the space up to the end of the value storage,
1003 304 : // which corresponds to the beginning of the index
1004 304 : last.size = self.index_start_blk as u64 * PAGE_SZ as u64 - last.size;
1005 304 : }
1006 304 : Ok(all_keys)
1007 304 : }
1008 :
1009 4 : pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
1010 4 : println!(
1011 4 : "index_start_blk: {}, root {}",
1012 4 : self.index_start_blk, self.index_root_blk
1013 4 : );
1014 4 :
1015 4 : let file = &self.file;
1016 4 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1017 4 : self.index_start_blk,
1018 4 : self.index_root_blk,
1019 4 : file,
1020 4 : );
1021 4 :
1022 4 : tree_reader.dump().await?;
1023 :
1024 4 : let keys = self.load_keys(ctx).await?;
1025 :
1026 8 : async fn dump_blob(val: &ValueRef<'_>, ctx: &RequestContext) -> anyhow::Result<String> {
1027 8 : let buf = val.reader.read_blob(val.blob_ref.pos(), ctx).await?;
1028 8 : let val = Value::des(&buf)?;
1029 8 : let desc = match val {
1030 8 : Value::Image(img) => {
1031 8 : format!(" img {} bytes", img.len())
1032 : }
1033 0 : Value::WalRecord(rec) => {
1034 0 : let wal_desc = walrecord::describe_wal_record(&rec)?;
1035 0 : format!(
1036 0 : " rec {} bytes will_init: {} {}",
1037 0 : buf.len(),
1038 0 : rec.will_init(),
1039 0 : wal_desc
1040 0 : )
1041 : }
1042 : };
1043 8 : Ok(desc)
1044 8 : }
1045 :
1046 12 : for entry in keys {
1047 8 : let DeltaEntry { key, lsn, val, .. } = entry;
1048 8 : let desc = match dump_blob(&val, ctx).await {
1049 8 : Ok(desc) => desc,
1050 0 : Err(err) => {
1051 0 : format!("ERROR: {err}")
1052 : }
1053 : };
1054 8 : println!(" key {key} at {lsn}: {desc}");
1055 8 :
1056 8 : // Print more details about CHECKPOINT records. Would be nice to print details
1057 8 : // of many other record types too, but these are particularly interesting, as
1058 8 : // there is a lot of special processing for them in walingest.rs.
1059 8 : use pageserver_api::key::CHECKPOINT_KEY;
1060 8 : use postgres_ffi::CheckPoint;
1061 8 : if key == CHECKPOINT_KEY {
1062 0 : let buf = val.reader.read_blob(val.blob_ref.pos(), ctx).await?;
1063 0 : let val = Value::des(&buf)?;
1064 0 : match val {
1065 0 : Value::Image(img) => {
1066 0 : let checkpoint = CheckPoint::decode(&img)?;
1067 0 : println!(" CHECKPOINT: {:?}", checkpoint);
1068 : }
1069 0 : Value::WalRecord(_rec) => {
1070 0 : println!(" unexpected walrecord value for checkpoint key");
1071 0 : }
1072 : }
1073 8 : }
1074 : }
1075 :
1076 4 : Ok(())
1077 4 : }
1078 : }
1079 :
1080 : /// A set of data associated with a delta layer key and its value
1081 : pub struct DeltaEntry<'a> {
1082 : pub key: Key,
1083 : pub lsn: Lsn,
1084 : /// Size of the stored value
1085 : pub size: u64,
1086 : /// Reference to the on-disk value
1087 : pub val: ValueRef<'a>,
1088 : }
1089 :
1090 : /// Reference to an on-disk value
1091 : pub struct ValueRef<'a> {
1092 : blob_ref: BlobRef,
1093 : reader: BlockCursor<'a>,
1094 : }
1095 :
1096 : impl<'a> ValueRef<'a> {
1097 : /// Loads the value from disk
1098 2102000 : pub async fn load(&self, ctx: &RequestContext) -> Result<Value> {
1099 : // theoretically we *could* record an access time for each, but it does not really matter
1100 2102000 : let buf = self.reader.read_blob(self.blob_ref.pos(), ctx).await?;
1101 2102000 : let val = Value::des(&buf)?;
1102 2102000 : Ok(val)
1103 2102000 : }
1104 : }
1105 :
1106 : pub(crate) struct Adapter<T>(T);
1107 :
1108 : impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
1109 2121326 : pub(crate) async fn read_blk(
1110 2121326 : &self,
1111 2121326 : blknum: u32,
1112 2121326 : ctx: &RequestContext,
1113 2121326 : ) -> Result<BlockLease, std::io::Error> {
1114 2121326 : self.0.as_ref().file.read_blk(blknum, ctx).await
1115 2121326 : }
1116 : }
1117 :
1118 : impl AsRef<DeltaLayerInner> for DeltaLayerInner {
1119 2121326 : fn as_ref(&self) -> &DeltaLayerInner {
1120 2121326 : self
1121 2121326 : }
1122 : }