Line data Source code
1 : //! A DeltaLayer represents a collection of WAL records or page images in a range of
2 : //! LSNs, and in a range of Keys. It is stored in a file on disk.
3 : //!
4 : //! Usually a delta layer only contains differences, in the form of WAL records
5 : //! against a base LSN. However, if a relation extended or a whole new relation
6 : //! is created, there would be no base for the new pages. The entries for them
7 : //! must be page images or WAL records with the 'will_init' flag set, so that
8 : //! they can be replayed without referring to an older page version.
9 : //!
10 : //! The delta files are stored in `timelines/<timeline_id>` directory. Currently,
11 : //! there are no subdirectories, and each delta file is named like this:
12 : //!
13 : //! ```text
14 : //! <key start>-<key end>__<start LSN>-<end LSN>
15 : //! ```
16 : //!
17 : //! For example:
18 : //!
19 : //! ```text
20 : //! 000000067F000032BE0000400000000020B6-000000067F000032BE0000400000000030B6__000000578C6B29-0000000057A50051
21 : //! ```
22 : //!
23 : //! Every delta file consists of three parts: "summary", "index", and
24 : //! "values". The summary is a fixed size header at the beginning of the file,
25 : //! and it contains basic information about the layer, and offsets to the other
26 : //! parts. The "index" is a B-tree, mapping from Key and LSN to an offset in the
27 : //! "values" part. The actual page images and WAL records are stored in the
28 : //! "values" part.
29 : //!
30 : use crate::config::PageServerConf;
31 : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
32 : use crate::page_cache::PAGE_SZ;
33 : use crate::repository::{Key, Value, KEY_SIZE};
34 : use crate::tenant::blob_io::BlobWriter;
35 : use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
36 : use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
37 : use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
38 : use crate::tenant::Timeline;
39 : use crate::virtual_file::{self, VirtualFile};
40 : use crate::{walrecord, TEMP_FILE_SUFFIX};
41 : use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
42 : use anyhow::{bail, ensure, Context, Result};
43 : use camino::{Utf8Path, Utf8PathBuf};
44 : use pageserver_api::models::LayerAccessKind;
45 : use pageserver_api::shard::TenantShardId;
46 : use rand::{distributions::Alphanumeric, Rng};
47 : use serde::{Deserialize, Serialize};
48 : use std::fs::File;
49 : use std::io::SeekFrom;
50 : use std::ops::Range;
51 : use std::os::unix::fs::FileExt;
52 : use std::sync::Arc;
53 : use tokio::sync::OnceCell;
54 : use tracing::*;
55 :
56 : use utils::{
57 : bin_ser::BeSer,
58 : id::{TenantId, TimelineId},
59 : lsn::Lsn,
60 : };
61 :
62 : use super::{AsLayerDesc, LayerAccessStats, PersistentLayerDesc, ResidentLayer};
63 :
64 : ///
65 : /// Header stored in the beginning of the file
66 : ///
67 : /// After this comes the 'values' part, starting on block 1. After that,
68 : /// the 'index' starts at the block indicated by 'index_start_blk'
69 : ///
70 15893 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
71 : pub struct Summary {
72 : /// Magic value to identify this as a neon delta file. Always DELTA_FILE_MAGIC.
73 : pub magic: u16,
74 : pub format_version: u16,
75 :
76 : pub tenant_id: TenantId,
77 : pub timeline_id: TimelineId,
78 : pub key_range: Range<Key>,
79 : pub lsn_range: Range<Lsn>,
80 :
81 : /// Block number where the 'index' part of the file begins.
82 : pub index_start_blk: u32,
83 : /// Block within the 'index', where the B-tree root page is stored
84 : pub index_root_blk: u32,
85 : }
86 :
87 : impl From<&DeltaLayer> for Summary {
88 0 : fn from(layer: &DeltaLayer) -> Self {
89 0 : Self::expected(
90 0 : layer.desc.tenant_shard_id.tenant_id,
91 0 : layer.desc.timeline_id,
92 0 : layer.desc.key_range.clone(),
93 0 : layer.desc.lsn_range.clone(),
94 0 : )
95 0 : }
96 : }
97 :
98 : impl Summary {
99 12147 : pub(super) fn expected(
100 12147 : tenant_id: TenantId,
101 12147 : timeline_id: TimelineId,
102 12147 : keys: Range<Key>,
103 12147 : lsns: Range<Lsn>,
104 12147 : ) -> Self {
105 12147 : Self {
106 12147 : magic: DELTA_FILE_MAGIC,
107 12147 : format_version: STORAGE_FORMAT_VERSION,
108 12147 :
109 12147 : tenant_id,
110 12147 : timeline_id,
111 12147 : key_range: keys,
112 12147 : lsn_range: lsns,
113 12147 :
114 12147 : index_start_blk: 0,
115 12147 : index_root_blk: 0,
116 12147 : }
117 12147 : }
118 : }
119 :
120 : // Flag indicating that this version initialize the page
121 : const WILL_INIT: u64 = 1;
122 :
123 : /// Struct representing reference to BLOB in layers. Reference contains BLOB
124 : /// offset, and for WAL records it also contains `will_init` flag. The flag
125 : /// helps to determine the range of records that needs to be applied, without
126 : /// reading/deserializing records themselves.
127 0 : #[derive(Debug, Serialize, Deserialize, Copy, Clone)]
128 : pub struct BlobRef(pub u64);
129 :
130 : impl BlobRef {
131 49126552 : pub fn will_init(&self) -> bool {
132 49126552 : (self.0 & WILL_INIT) != 0
133 49126552 : }
134 :
135 83723293 : pub fn pos(&self) -> u64 {
136 83723293 : self.0 >> 1
137 83723293 : }
138 :
139 52098927 : pub fn new(pos: u64, will_init: bool) -> BlobRef {
140 52098927 : let mut blob_ref = pos << 1;
141 52098927 : if will_init {
142 9289693 : blob_ref |= WILL_INIT;
143 42809234 : }
144 52098927 : BlobRef(blob_ref)
145 52098927 : }
146 : }
147 :
148 : pub const DELTA_KEY_SIZE: usize = KEY_SIZE + 8;
149 : struct DeltaKey([u8; DELTA_KEY_SIZE]);
150 :
151 : /// This is the key of the B-tree index stored in the delta layer. It consists
152 : /// of the serialized representation of a Key and LSN.
153 : impl DeltaKey {
154 17304147 : fn from_slice(buf: &[u8]) -> Self {
155 17304147 : let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
156 17304147 : bytes.copy_from_slice(buf);
157 17304147 : DeltaKey(bytes)
158 17304147 : }
159 :
160 68493871 : fn from_key_lsn(key: &Key, lsn: Lsn) -> Self {
161 68493871 : let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
162 68493871 : key.write_to_byte_slice(&mut bytes[0..KEY_SIZE]);
163 68493871 : bytes[KEY_SIZE..].copy_from_slice(&u64::to_be_bytes(lsn.0));
164 68493871 : DeltaKey(bytes)
165 68493871 : }
166 :
167 17304147 : fn key(&self) -> Key {
168 17304147 : Key::from_slice(&self.0)
169 17304147 : }
170 :
171 17304147 : fn lsn(&self) -> Lsn {
172 17304147 : Lsn(u64::from_be_bytes(self.0[KEY_SIZE..].try_into().unwrap()))
173 17304147 : }
174 :
175 49585494 : fn extract_lsn_from_buf(buf: &[u8]) -> Lsn {
176 49585494 : let mut lsn_buf = [0u8; 8];
177 49585494 : lsn_buf.copy_from_slice(&buf[KEY_SIZE..]);
178 49585494 : Lsn(u64::from_be_bytes(lsn_buf))
179 49585494 : }
180 : }
181 :
182 : /// This is used only from `pagectl`. Within pageserver, all layers are
183 : /// [`crate::tenant::storage_layer::Layer`], which can hold a [`DeltaLayerInner`].
184 : pub struct DeltaLayer {
185 : path: Utf8PathBuf,
186 : pub desc: PersistentLayerDesc,
187 : access_stats: LayerAccessStats,
188 : inner: OnceCell<Arc<DeltaLayerInner>>,
189 : }
190 :
191 : impl std::fmt::Debug for DeltaLayer {
192 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
193 0 : use super::RangeDisplayDebug;
194 0 :
195 0 : f.debug_struct("DeltaLayer")
196 0 : .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
197 0 : .field("lsn_range", &self.desc.lsn_range)
198 0 : .field("file_size", &self.desc.file_size)
199 0 : .field("inner", &self.inner)
200 0 : .finish()
201 0 : }
202 : }
203 :
204 : /// `DeltaLayerInner` is the in-memory data structure associated with an on-disk delta
205 : /// file.
206 : pub struct DeltaLayerInner {
207 : // values copied from summary
208 : index_start_blk: u32,
209 : index_root_blk: u32,
210 :
211 : /// Reader object for reading blocks from the file.
212 : file: FileBlockReader,
213 : }
214 :
215 : impl std::fmt::Debug for DeltaLayerInner {
216 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
217 0 : f.debug_struct("DeltaLayerInner")
218 0 : .field("index_start_blk", &self.index_start_blk)
219 0 : .field("index_root_blk", &self.index_root_blk)
220 0 : .finish()
221 0 : }
222 : }
223 :
224 : /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
225 : impl std::fmt::Display for DeltaLayer {
226 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
227 0 : write!(f, "{}", self.layer_desc().short_id())
228 0 : }
229 : }
230 :
231 : impl AsLayerDesc for DeltaLayer {
232 0 : fn layer_desc(&self) -> &PersistentLayerDesc {
233 0 : &self.desc
234 0 : }
235 : }
236 :
237 : impl DeltaLayer {
238 0 : pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
239 0 : self.desc.dump();
240 0 :
241 0 : if !verbose {
242 0 : return Ok(());
243 0 : }
244 :
245 0 : let inner = self.load(LayerAccessKind::Dump, ctx).await?;
246 :
247 0 : inner.dump(ctx).await
248 0 : }
249 :
250 15895 : fn temp_path_for(
251 15895 : conf: &PageServerConf,
252 15895 : tenant_shard_id: &TenantShardId,
253 15895 : timeline_id: &TimelineId,
254 15895 : key_start: Key,
255 15895 : lsn_range: &Range<Lsn>,
256 15895 : ) -> Utf8PathBuf {
257 15895 : let rand_string: String = rand::thread_rng()
258 15895 : .sample_iter(&Alphanumeric)
259 15895 : .take(8)
260 15895 : .map(char::from)
261 15895 : .collect();
262 15895 :
263 15895 : conf.timeline_path(tenant_shard_id, timeline_id)
264 15895 : .join(format!(
265 15895 : "{}-XXX__{:016X}-{:016X}.{}.{}",
266 15895 : key_start,
267 15895 : u64::from(lsn_range.start),
268 15895 : u64::from(lsn_range.end),
269 15895 : rand_string,
270 15895 : TEMP_FILE_SUFFIX,
271 15895 : ))
272 15895 : }
273 :
274 : ///
275 : /// Open the underlying file and read the metadata into memory, if it's
276 : /// not loaded already.
277 : ///
278 0 : async fn load(
279 0 : &self,
280 0 : access_kind: LayerAccessKind,
281 0 : ctx: &RequestContext,
282 0 : ) -> Result<&Arc<DeltaLayerInner>> {
283 0 : self.access_stats.record_access(access_kind, ctx);
284 0 : // Quick exit if already loaded
285 0 : self.inner
286 0 : .get_or_try_init(|| self.load_inner(ctx))
287 0 : .await
288 0 : .with_context(|| format!("Failed to load delta layer {}", self.path()))
289 0 : }
290 :
291 0 : async fn load_inner(&self, ctx: &RequestContext) -> Result<Arc<DeltaLayerInner>> {
292 0 : let path = self.path();
293 :
294 0 : let loaded = DeltaLayerInner::load(&path, None, ctx)
295 0 : .await
296 0 : .and_then(|res| res)?;
297 :
298 : // not production code
299 0 : let actual_filename = path.file_name().unwrap().to_owned();
300 0 : let expected_filename = self.layer_desc().filename().file_name();
301 0 :
302 0 : if actual_filename != expected_filename {
303 0 : println!("warning: filename does not match what is expected from in-file summary");
304 0 : println!("actual: {:?}", actual_filename);
305 0 : println!("expected: {:?}", expected_filename);
306 0 : }
307 :
308 0 : Ok(Arc::new(loaded))
309 0 : }
310 :
311 : /// Create a DeltaLayer struct representing an existing file on disk.
312 : ///
313 : /// This variant is only used for debugging purposes, by the 'pagectl' binary.
314 0 : pub fn new_for_path(path: &Utf8Path, file: File) -> Result<Self> {
315 0 : let mut summary_buf = vec![0; PAGE_SZ];
316 0 : file.read_exact_at(&mut summary_buf, 0)?;
317 0 : let summary = Summary::des_prefix(&summary_buf)?;
318 :
319 0 : let metadata = file
320 0 : .metadata()
321 0 : .context("get file metadata to determine size")?;
322 :
323 : // This function is never used for constructing layers in a running pageserver,
324 : // so it does not need an accurate TenantShardId.
325 0 : let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
326 0 :
327 0 : Ok(DeltaLayer {
328 0 : path: path.to_path_buf(),
329 0 : desc: PersistentLayerDesc::new_delta(
330 0 : tenant_shard_id,
331 0 : summary.timeline_id,
332 0 : summary.key_range,
333 0 : summary.lsn_range,
334 0 : metadata.len(),
335 0 : ),
336 0 : access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
337 0 : inner: OnceCell::new(),
338 0 : })
339 0 : }
340 :
341 : /// Path to the layer file in pageserver workdir.
342 0 : fn path(&self) -> Utf8PathBuf {
343 0 : self.path.clone()
344 0 : }
345 : }
346 :
347 : /// A builder object for constructing a new delta layer.
348 : ///
349 : /// Usage:
350 : ///
351 : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
352 : ///
353 : /// 2. Write the contents by calling `put_value` for every page
354 : /// version to store in the layer.
355 : ///
356 : /// 3. Call `finish`.
357 : ///
358 : struct DeltaLayerWriterInner {
359 : conf: &'static PageServerConf,
360 : pub path: Utf8PathBuf,
361 : timeline_id: TimelineId,
362 : tenant_shard_id: TenantShardId,
363 :
364 : key_start: Key,
365 : lsn_range: Range<Lsn>,
366 :
367 : tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>,
368 :
369 : blob_writer: BlobWriter<true>,
370 : }
371 :
372 : impl DeltaLayerWriterInner {
373 : ///
374 : /// Start building a new delta layer.
375 : ///
376 15895 : async fn new(
377 15895 : conf: &'static PageServerConf,
378 15895 : timeline_id: TimelineId,
379 15895 : tenant_shard_id: TenantShardId,
380 15895 : key_start: Key,
381 15895 : lsn_range: Range<Lsn>,
382 15895 : ) -> anyhow::Result<Self> {
383 15895 : // Create the file initially with a temporary filename. We don't know
384 15895 : // the end key yet, so we cannot form the final filename yet. We will
385 15895 : // rename it when we're done.
386 15895 : //
387 15895 : // Note: This overwrites any existing file. There shouldn't be any.
388 15895 : // FIXME: throw an error instead?
389 15895 : let path =
390 15895 : DeltaLayer::temp_path_for(conf, &tenant_shard_id, &timeline_id, key_start, &lsn_range);
391 :
392 15895 : let mut file = VirtualFile::create(&path).await?;
393 : // make room for the header block
394 15895 : file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
395 15895 : let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
396 15895 :
397 15895 : // Initialize the b-tree index builder
398 15895 : let block_buf = BlockBuf::new();
399 15895 : let tree_builder = DiskBtreeBuilder::new(block_buf);
400 15895 :
401 15895 : Ok(Self {
402 15895 : conf,
403 15895 : path,
404 15895 : timeline_id,
405 15895 : tenant_shard_id,
406 15895 : key_start,
407 15895 : lsn_range,
408 15895 : tree: tree_builder,
409 15895 : blob_writer,
410 15895 : })
411 15895 : }
412 :
413 : ///
414 : /// Append a key-value pair to the file.
415 : ///
416 : /// The values must be appended in key, lsn order.
417 : ///
418 17292586 : async fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
419 17292586 : let (_, res) = self
420 17292586 : .put_value_bytes(key, lsn, Value::ser(&val)?, val.will_init())
421 20553 : .await;
422 17292586 : res
423 17292586 : }
424 :
425 52098927 : async fn put_value_bytes(
426 52098927 : &mut self,
427 52098927 : key: Key,
428 52098927 : lsn: Lsn,
429 52098927 : val: Vec<u8>,
430 52098927 : will_init: bool,
431 52098927 : ) -> (Vec<u8>, anyhow::Result<()>) {
432 52098906 : assert!(self.lsn_range.start <= lsn);
433 52098906 : let (val, res) = self.blob_writer.write_blob(val).await;
434 52098905 : let off = match res {
435 52098905 : Ok(off) => off,
436 0 : Err(e) => return (val, Err(anyhow::anyhow!(e))),
437 : };
438 :
439 52098905 : let blob_ref = BlobRef::new(off, will_init);
440 52098905 :
441 52098905 : let delta_key = DeltaKey::from_key_lsn(&key, lsn);
442 52098905 : let res = self.tree.append(&delta_key.0, blob_ref.0);
443 52098905 : (val, res.map_err(|e| anyhow::anyhow!(e)))
444 52098905 : }
445 :
446 2280229 : fn size(&self) -> u64 {
447 2280229 : self.blob_writer.size() + self.tree.borrow_writer().size()
448 2280229 : }
449 :
450 : ///
451 : /// Finish writing the delta layer.
452 : ///
453 15893 : async fn finish(self, key_end: Key, timeline: &Arc<Timeline>) -> anyhow::Result<ResidentLayer> {
454 15893 : let index_start_blk =
455 15893 : ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
456 :
457 15893 : let mut file = self.blob_writer.into_inner().await?;
458 :
459 : // Write out the index
460 15893 : let (index_root_blk, block_buf) = self.tree.finish()?;
461 15893 : file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
462 0 : .await?;
463 130570 : for buf in block_buf.blocks {
464 114677 : file.write_all(buf.as_ref()).await?;
465 : }
466 15893 : assert!(self.lsn_range.start < self.lsn_range.end);
467 : // Fill in the summary on blk 0
468 15893 : let summary = Summary {
469 15893 : magic: DELTA_FILE_MAGIC,
470 15893 : format_version: STORAGE_FORMAT_VERSION,
471 15893 : tenant_id: self.tenant_shard_id.tenant_id,
472 15893 : timeline_id: self.timeline_id,
473 15893 : key_range: self.key_start..key_end,
474 15893 : lsn_range: self.lsn_range.clone(),
475 15893 : index_start_blk,
476 15893 : index_root_blk,
477 15893 : };
478 15893 :
479 15893 : let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
480 15893 : Summary::ser_into(&summary, &mut buf)?;
481 15893 : if buf.spilled() {
482 : // This is bad as we only have one free block for the summary
483 0 : warn!(
484 0 : "Used more than one page size for summary buffer: {}",
485 0 : buf.len()
486 0 : );
487 15893 : }
488 15893 : file.seek(SeekFrom::Start(0)).await?;
489 15893 : file.write_all(&buf).await?;
490 :
491 15893 : let metadata = file
492 15893 : .metadata()
493 225 : .await
494 15893 : .context("get file metadata to determine size")?;
495 :
496 : // 5GB limit for objects without multipart upload (which we don't want to use)
497 : // Make it a little bit below to account for differing GB units
498 : // https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html
499 : const S3_UPLOAD_LIMIT: u64 = 4_500_000_000;
500 15893 : ensure!(
501 15893 : metadata.len() <= S3_UPLOAD_LIMIT,
502 0 : "Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
503 0 : file.path,
504 0 : metadata.len()
505 : );
506 :
507 : // Note: Because we opened the file in write-only mode, we cannot
508 : // reuse the same VirtualFile for reading later. That's why we don't
509 : // set inner.file here. The first read will have to re-open it.
510 :
511 15893 : let desc = PersistentLayerDesc::new_delta(
512 15893 : self.tenant_shard_id,
513 15893 : self.timeline_id,
514 15893 : self.key_start..key_end,
515 15893 : self.lsn_range.clone(),
516 15893 : metadata.len(),
517 15893 : );
518 15893 :
519 15893 : // fsync the file
520 15893 : file.sync_all().await?;
521 :
522 15893 : let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
523 :
524 0 : trace!("created delta layer {}", layer.local_path());
525 :
526 15893 : Ok(layer)
527 15893 : }
528 : }
529 :
530 : /// A builder object for constructing a new delta layer.
531 : ///
532 : /// Usage:
533 : ///
534 : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
535 : ///
536 : /// 2. Write the contents by calling `put_value` for every page
537 : /// version to store in the layer.
538 : ///
539 : /// 3. Call `finish`.
540 : ///
541 : /// # Note
542 : ///
543 : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
544 : /// possible for the writer to drop before `finish` is actually called. So this
545 : /// could lead to odd temporary files in the directory, exhausting file system.
546 : /// This structure wraps `DeltaLayerWriterInner` and also contains `Drop`
547 : /// implementation that cleans up the temporary file in failure. It's not
548 : /// possible to do this directly in `DeltaLayerWriterInner` since `finish` moves
549 : /// out some fields, making it impossible to implement `Drop`.
550 : ///
551 : #[must_use]
552 : pub struct DeltaLayerWriter {
553 : inner: Option<DeltaLayerWriterInner>,
554 : }
555 :
556 : impl DeltaLayerWriter {
557 : ///
558 : /// Start building a new delta layer.
559 : ///
560 15895 : pub async fn new(
561 15895 : conf: &'static PageServerConf,
562 15895 : timeline_id: TimelineId,
563 15895 : tenant_shard_id: TenantShardId,
564 15895 : key_start: Key,
565 15895 : lsn_range: Range<Lsn>,
566 15895 : ) -> anyhow::Result<Self> {
567 15895 : Ok(Self {
568 15895 : inner: Some(
569 15895 : DeltaLayerWriterInner::new(
570 15895 : conf,
571 15895 : timeline_id,
572 15895 : tenant_shard_id,
573 15895 : key_start,
574 15895 : lsn_range,
575 15895 : )
576 217 : .await?,
577 : ),
578 : })
579 15895 : }
580 :
581 : ///
582 : /// Append a key-value pair to the file.
583 : ///
584 : /// The values must be appended in key, lsn order.
585 : ///
586 17292586 : pub async fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
587 17292586 : self.inner.as_mut().unwrap().put_value(key, lsn, val).await
588 17292586 : }
589 :
590 34806341 : pub async fn put_value_bytes(
591 34806341 : &mut self,
592 34806341 : key: Key,
593 34806341 : lsn: Lsn,
594 34806341 : val: Vec<u8>,
595 34806341 : will_init: bool,
596 34806341 : ) -> (Vec<u8>, anyhow::Result<()>) {
597 34806320 : self.inner
598 34806320 : .as_mut()
599 34806320 : .unwrap()
600 34806320 : .put_value_bytes(key, lsn, val, will_init)
601 48280 : .await
602 34806320 : }
603 :
604 2280229 : pub fn size(&self) -> u64 {
605 2280229 : self.inner.as_ref().unwrap().size()
606 2280229 : }
607 :
608 : ///
609 : /// Finish writing the delta layer.
610 : ///
611 15893 : pub(crate) async fn finish(
612 15893 : mut self,
613 15893 : key_end: Key,
614 15893 : timeline: &Arc<Timeline>,
615 15893 : ) -> anyhow::Result<ResidentLayer> {
616 15893 : let inner = self.inner.take().unwrap();
617 15893 : let temp_path = inner.path.clone();
618 15893 : let result = inner.finish(key_end, timeline).await;
619 : // The delta layer files can sometimes be really large. Clean them up.
620 15893 : if result.is_err() {
621 0 : tracing::warn!(
622 0 : "Cleaning up temporary delta file {temp_path} after error during writing"
623 0 : );
624 0 : if let Err(e) = std::fs::remove_file(&temp_path) {
625 0 : tracing::warn!("Error cleaning up temporary delta layer file {temp_path}: {e:?}")
626 0 : }
627 15893 : }
628 15893 : result
629 15893 : }
630 : }
631 :
632 : impl Drop for DeltaLayerWriter {
633 15893 : fn drop(&mut self) {
634 15893 : if let Some(inner) = self.inner.take() {
635 0 : // We want to remove the virtual file here, so it's fine to not
636 0 : // having completely flushed unwritten data.
637 0 : let vfile = inner.blob_writer.into_inner_no_flush();
638 0 : vfile.remove();
639 15893 : }
640 15893 : }
641 : }
642 :
643 0 : #[derive(thiserror::Error, Debug)]
644 : pub enum RewriteSummaryError {
645 : #[error("magic mismatch")]
646 : MagicMismatch,
647 : #[error(transparent)]
648 : Other(#[from] anyhow::Error),
649 : }
650 :
651 : impl From<std::io::Error> for RewriteSummaryError {
652 0 : fn from(e: std::io::Error) -> Self {
653 0 : Self::Other(anyhow::anyhow!(e))
654 0 : }
655 : }
656 :
657 : impl DeltaLayer {
658 0 : pub async fn rewrite_summary<F>(
659 0 : path: &Utf8Path,
660 0 : rewrite: F,
661 0 : ctx: &RequestContext,
662 0 : ) -> Result<(), RewriteSummaryError>
663 0 : where
664 0 : F: Fn(Summary) -> Summary,
665 0 : {
666 0 : let file = VirtualFile::open_with_options(
667 0 : path,
668 0 : virtual_file::OpenOptions::new().read(true).write(true),
669 0 : )
670 0 : .await
671 0 : .with_context(|| format!("Failed to open file '{}'", path))?;
672 0 : let file = FileBlockReader::new(file);
673 0 : let summary_blk = file.read_blk(0, ctx).await?;
674 0 : let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
675 0 : let mut file = file.file;
676 0 : if actual_summary.magic != DELTA_FILE_MAGIC {
677 0 : return Err(RewriteSummaryError::MagicMismatch);
678 0 : }
679 0 :
680 0 : let new_summary = rewrite(actual_summary);
681 0 :
682 0 : let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
683 0 : Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
684 0 : if buf.spilled() {
685 : // The code in DeltaLayerWriterInner just warn!()s for this.
686 : // It should probably error out as well.
687 0 : return Err(RewriteSummaryError::Other(anyhow::anyhow!(
688 0 : "Used more than one page size for summary buffer: {}",
689 0 : buf.len()
690 0 : )));
691 0 : }
692 0 : file.seek(SeekFrom::Start(0)).await?;
693 0 : file.write_all(&buf).await?;
694 0 : Ok(())
695 0 : }
696 : }
697 :
698 : impl DeltaLayerInner {
699 : /// Returns nested result following Result<Result<_, OpErr>, Critical>:
700 : /// - inner has the success or transient failure
701 : /// - outer has the permanent failure
702 12147 : pub(super) async fn load(
703 12147 : path: &Utf8Path,
704 12147 : summary: Option<Summary>,
705 12147 : ctx: &RequestContext,
706 12147 : ) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
707 12147 : let file = match VirtualFile::open(path).await {
708 12147 : Ok(file) => file,
709 0 : Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
710 : };
711 12147 : let file = FileBlockReader::new(file);
712 :
713 12147 : let summary_blk = match file.read_blk(0, ctx).await {
714 12147 : Ok(blk) => blk,
715 0 : Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
716 : };
717 :
718 : // TODO: this should be an assertion instead; see ImageLayerInner::load
719 12147 : let actual_summary =
720 12147 : Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
721 :
722 12147 : if let Some(mut expected_summary) = summary {
723 : // production code path
724 12147 : expected_summary.index_start_blk = actual_summary.index_start_blk;
725 12147 : expected_summary.index_root_blk = actual_summary.index_root_blk;
726 12147 : if actual_summary != expected_summary {
727 1 : bail!(
728 1 : "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
729 1 : actual_summary,
730 1 : expected_summary
731 1 : );
732 12146 : }
733 0 : }
734 :
735 12146 : Ok(Ok(DeltaLayerInner {
736 12146 : file,
737 12146 : index_start_blk: actual_summary.index_start_blk,
738 12146 : index_root_blk: actual_summary.index_root_blk,
739 12146 : }))
740 12147 : }
741 :
742 16394944 : pub(super) async fn get_value_reconstruct_data(
743 16394944 : &self,
744 16394944 : key: Key,
745 16394944 : lsn_range: Range<Lsn>,
746 16394944 : reconstruct_state: &mut ValueReconstructState,
747 16394944 : ctx: &RequestContext,
748 16394944 : ) -> anyhow::Result<ValueReconstructResult> {
749 16394915 : let mut need_image = true;
750 16394915 : // Scan the page versions backwards, starting from `lsn`.
751 16394915 : let file = &self.file;
752 16394915 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
753 16394915 : self.index_start_blk,
754 16394915 : self.index_root_blk,
755 16394915 : file,
756 16394915 : );
757 16394915 : let search_key = DeltaKey::from_key_lsn(&key, Lsn(lsn_range.end.0 - 1));
758 16394915 :
759 16394915 : let mut offsets: Vec<(Lsn, u64)> = Vec::new();
760 16394915 :
761 16394915 : tree_reader
762 16394915 : .visit(
763 16394915 : &search_key.0,
764 16394915 : VisitDirection::Backwards,
765 52241249 : |key, value| {
766 52241249 : let blob_ref = BlobRef(value);
767 52241249 : if key[..KEY_SIZE] != search_key.0[..KEY_SIZE] {
768 2656889 : return false;
769 49584360 : }
770 49584360 : let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
771 49584360 : if entry_lsn < lsn_range.start {
772 458942 : return false;
773 49125417 : }
774 49125417 : offsets.push((entry_lsn, blob_ref.pos()));
775 49125417 :
776 49125417 : !blob_ref.will_init()
777 52241248 : },
778 16394915 : &RequestContextBuilder::extend(ctx)
779 16394915 : .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
780 16394915 : .build(),
781 16394915 : )
782 211694 : .await?;
783 :
784 16394914 : let ctx = &RequestContextBuilder::extend(ctx)
785 16394914 : .page_content_kind(PageContentKind::DeltaLayerValue)
786 16394914 : .build();
787 16394914 :
788 16394914 : // Ok, 'offsets' now contains the offsets of all the entries we need to read
789 16394914 : let cursor = file.block_cursor();
790 16394914 : let mut buf = Vec::new();
791 62485233 : for (entry_lsn, pos) in offsets {
792 49121438 : cursor
793 49121438 : .read_blob_into_buf(pos, &mut buf, ctx)
794 746117 : .await
795 49121436 : .with_context(|| {
796 0 : format!("Failed to read blob from virtual file {}", file.file.path)
797 49121436 : })?;
798 49121436 : let val = Value::des(&buf).with_context(|| {
799 0 : format!(
800 0 : "Failed to deserialize file blob from virtual file {}",
801 0 : file.file.path
802 0 : )
803 49121436 : })?;
804 49121436 : match val {
805 1740816 : Value::Image(img) => {
806 1740816 : reconstruct_state.img = Some((entry_lsn, img));
807 1740816 : need_image = false;
808 1740816 : break;
809 : }
810 47380620 : Value::WalRecord(rec) => {
811 47380620 : let will_init = rec.will_init();
812 47380620 : reconstruct_state.records.push((entry_lsn, rec));
813 47380620 : if will_init {
814 : // This WAL record initializes the page, so no need to go further back
815 1290301 : need_image = false;
816 1290301 : break;
817 46090319 : }
818 : }
819 : }
820 : }
821 :
822 : // If an older page image is needed to reconstruct the page, let the
823 : // caller know.
824 16394912 : if need_image {
825 13363795 : Ok(ValueReconstructResult::Continue)
826 : } else {
827 3031116 : Ok(ValueReconstructResult::Complete)
828 : }
829 16394911 : }
830 :
831 4152 : pub(super) async fn load_keys<'a>(
832 4152 : &'a self,
833 4152 : ctx: &RequestContext,
834 4152 : ) -> Result<Vec<DeltaEntry<'a>>> {
835 4152 : let file = &self.file;
836 4152 :
837 4152 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
838 4152 : self.index_start_blk,
839 4152 : self.index_root_blk,
840 4152 : file,
841 4152 : );
842 4152 :
843 4152 : let mut all_keys: Vec<DeltaEntry<'_>> = Vec::new();
844 4152 :
845 4152 : tree_reader
846 4152 : .visit(
847 4152 : &[0u8; DELTA_KEY_SIZE],
848 4152 : VisitDirection::Forwards,
849 17304147 : |key, value| {
850 17304147 : let delta_key = DeltaKey::from_slice(key);
851 17304147 : let val_ref = ValueRef {
852 17304147 : blob_ref: BlobRef(value),
853 17304147 : reader: BlockCursor::new(crate::tenant::block_io::BlockReaderRef::Adapter(
854 17304147 : Adapter(self),
855 17304147 : )),
856 17304147 : };
857 17304147 : let pos = BlobRef(value).pos();
858 17304147 : if let Some(last) = all_keys.last_mut() {
859 17299995 : // subtract offset of the current and last entries to get the size
860 17299995 : // of the value associated with this (key, lsn) tuple
861 17299995 : let first_pos = last.size;
862 17299995 : last.size = pos - first_pos;
863 17299995 : }
864 17304147 : let entry = DeltaEntry {
865 17304147 : key: delta_key.key(),
866 17304147 : lsn: delta_key.lsn(),
867 17304147 : size: pos,
868 17304147 : val: val_ref,
869 17304147 : };
870 17304147 : all_keys.push(entry);
871 17304147 : true
872 17304147 : },
873 4152 : &RequestContextBuilder::extend(ctx)
874 4152 : .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
875 4152 : .build(),
876 4152 : )
877 2304 : .await?;
878 4152 : if let Some(last) = all_keys.last_mut() {
879 4152 : // Last key occupies all space till end of value storage,
880 4152 : // which corresponds to beginning of the index
881 4152 : last.size = self.index_start_blk as u64 * PAGE_SZ as u64 - last.size;
882 4152 : }
883 4152 : Ok(all_keys)
884 4152 : }
885 :
886 4 : pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
887 4 : println!(
888 4 : "index_start_blk: {}, root {}",
889 4 : self.index_start_blk, self.index_root_blk
890 4 : );
891 4 :
892 4 : let file = &self.file;
893 4 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
894 4 : self.index_start_blk,
895 4 : self.index_root_blk,
896 4 : file,
897 4 : );
898 4 :
899 4 : tree_reader.dump().await?;
900 :
901 4 : let keys = self.load_keys(ctx).await?;
902 :
903 8 : async fn dump_blob(val: &ValueRef<'_>, ctx: &RequestContext) -> anyhow::Result<String> {
904 8 : let buf = val.reader.read_blob(val.blob_ref.pos(), ctx).await?;
905 8 : let val = Value::des(&buf)?;
906 8 : let desc = match val {
907 8 : Value::Image(img) => {
908 8 : format!(" img {} bytes", img.len())
909 : }
910 0 : Value::WalRecord(rec) => {
911 0 : let wal_desc = walrecord::describe_wal_record(&rec)?;
912 0 : format!(
913 0 : " rec {} bytes will_init: {} {}",
914 0 : buf.len(),
915 0 : rec.will_init(),
916 0 : wal_desc
917 0 : )
918 : }
919 : };
920 8 : Ok(desc)
921 8 : }
922 :
923 12 : for entry in keys {
924 8 : let DeltaEntry { key, lsn, val, .. } = entry;
925 8 : let desc = match dump_blob(&val, ctx).await {
926 8 : Ok(desc) => desc,
927 0 : Err(err) => {
928 0 : format!("ERROR: {err}")
929 : }
930 : };
931 8 : println!(" key {key} at {lsn}: {desc}");
932 8 :
933 8 : // Print more details about CHECKPOINT records. Would be nice to print details
934 8 : // of many other record types too, but these are particularly interesting, as
935 8 : // have a lot of special processing for them in walingest.rs.
936 8 : use pageserver_api::key::CHECKPOINT_KEY;
937 8 : use postgres_ffi::CheckPoint;
938 8 : if key == CHECKPOINT_KEY {
939 0 : let buf = val.reader.read_blob(val.blob_ref.pos(), ctx).await?;
940 0 : let val = Value::des(&buf)?;
941 0 : match val {
942 0 : Value::Image(img) => {
943 0 : let checkpoint = CheckPoint::decode(&img)?;
944 0 : println!(" CHECKPOINT: {:?}", checkpoint);
945 : }
946 0 : Value::WalRecord(_rec) => {
947 0 : println!(" unexpected walrecord value for checkpoint key");
948 0 : }
949 : }
950 8 : }
951 : }
952 :
953 4 : Ok(())
954 4 : }
955 : }
956 :
957 : /// A set of data associated with a delta layer key and its value
958 : pub struct DeltaEntry<'a> {
959 : pub key: Key,
960 : pub lsn: Lsn,
961 : /// Size of the stored value
962 : pub size: u64,
963 : /// Reference to the on-disk value
964 : pub val: ValueRef<'a>,
965 : }
966 :
967 : /// Reference to an on-disk value
968 : pub struct ValueRef<'a> {
969 : blob_ref: BlobRef,
970 : reader: BlockCursor<'a>,
971 : }
972 :
973 : impl<'a> ValueRef<'a> {
974 : /// Loads the value from disk
975 17292586 : pub async fn load(&self, ctx: &RequestContext) -> Result<Value> {
976 : // theoretically we *could* record an access time for each, but it does not really matter
977 17292586 : let buf = self.reader.read_blob(self.blob_ref.pos(), ctx).await?;
978 17292586 : let val = Value::des(&buf)?;
979 17292586 : Ok(val)
980 17292586 : }
981 : }
982 :
/// Wraps a handle to a [`DeltaLayerInner`] so that the layer file can be
/// used as a block reader (see `BlockReaderRef::Adapter` in `load_keys`).
pub(crate) struct Adapter<T>(T);
984 :
985 : impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
986 18616583 : pub(crate) async fn read_blk(
987 18616583 : &self,
988 18616583 : blknum: u32,
989 18616583 : ctx: &RequestContext,
990 18616583 : ) -> Result<BlockLease, std::io::Error> {
991 18616583 : self.0.as_ref().file.read_blk(blknum, ctx).await
992 18616583 : }
993 : }
994 :
995 : impl AsRef<DeltaLayerInner> for DeltaLayerInner {
996 18616583 : fn as_ref(&self) -> &DeltaLayerInner {
997 18616583 : self
998 18616583 : }
999 : }
|