Line data Source code
1 : //! A DeltaLayer represents a collection of WAL records or page images in a range of
2 : //! LSNs, and in a range of Keys. It is stored on a file on disk.
3 : //!
4 : //! Usually a delta layer only contains differences, in the form of WAL records
5 : //! against a base LSN. However, if a relation is extended or a whole new relation
6 : //! is created, there would be no base for the new pages. The entries for them
7 : //! must be page images or WAL records with the 'will_init' flag set, so that
8 : //! they can be replayed without referring to an older page version.
9 : //!
10 : //! The delta files are stored in `timelines/<timeline_id>` directory. Currently,
11 : //! there are no subdirectories, and each delta file is named like this:
12 : //!
13 : //! ```text
14 : //! <key start>-<key end>__<start LSN>-<end LSN>
15 : //! ```
16 : //!
17 : //! For example:
18 : //!
19 : //! ```text
20 : //! 000000067F000032BE0000400000000020B6-000000067F000032BE0000400000000030B6__000000578C6B29-0000000057A50051
21 : //! ```
22 : //!
23 : //! Every delta file consists of three parts: "summary", "index", and
24 : //! "values". The summary is a fixed size header at the beginning of the file,
25 : //! and it contains basic information about the layer, and offsets to the other
26 : //! parts. The "index" is a B-tree, mapping from Key and LSN to an offset in the
27 : //! "values" part. The actual page images and WAL records are stored in the
28 : //! "values" part.
29 : //!
30 : use crate::config::PageServerConf;
31 : use crate::context::RequestContext;
32 : use crate::page_cache::PAGE_SZ;
33 : use crate::repository::{Key, Value, KEY_SIZE};
34 : use crate::tenant::blob_io::WriteBlobWriter;
35 : use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
36 : use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
37 : use crate::tenant::storage_layer::{
38 : PersistentLayer, ValueReconstructResult, ValueReconstructState,
39 : };
40 : use crate::virtual_file::VirtualFile;
41 : use crate::{walrecord, TEMP_FILE_SUFFIX};
42 : use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
43 : use anyhow::{bail, ensure, Context, Result};
44 : use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind};
45 : use rand::{distributions::Alphanumeric, Rng};
46 : use serde::{Deserialize, Serialize};
47 : use std::fs::{self, File};
48 : use std::io::SeekFrom;
49 : use std::io::{BufWriter, Write};
50 : use std::ops::Range;
51 : use std::os::unix::fs::FileExt;
52 : use std::path::{Path, PathBuf};
53 : use std::sync::Arc;
54 : use tokio::sync::OnceCell;
55 : use tracing::*;
56 :
57 : use utils::{
58 : bin_ser::BeSer,
59 : id::{TenantId, TimelineId},
60 : lsn::Lsn,
61 : };
62 :
63 : use super::{
64 : AsLayerDesc, DeltaFileName, Layer, LayerAccessStats, LayerAccessStatsReset, PathOrConf,
65 : PersistentLayerDesc,
66 : };
67 :
68 : ///
69 : /// Header stored in the beginning of the file
70 : ///
71 : /// After this comes the 'values' part, starting on block 1. After that,
72 : /// the 'index' starts at the block indicated by 'index_start_blk'
73 : ///
74 15508 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
75 : pub struct Summary {
76 : /// Magic value to identify this as a neon delta file. Always DELTA_FILE_MAGIC.
77 : magic: u16,
78 : format_version: u16,
79 :
80 : tenant_id: TenantId,
81 : timeline_id: TimelineId,
82 : key_range: Range<Key>,
83 : lsn_range: Range<Lsn>,
84 :
85 : /// Block number where the 'index' part of the file begins.
86 : pub index_start_blk: u32,
87 : /// Block within the 'index', where the B-tree root page is stored
88 : pub index_root_blk: u32,
89 : }
90 :
91 : impl From<&DeltaLayer> for Summary {
92 10578 : fn from(layer: &DeltaLayer) -> Self {
93 10578 : Self::expected(
94 10578 : layer.desc.tenant_id,
95 10578 : layer.desc.timeline_id,
96 10578 : layer.desc.key_range.clone(),
97 10578 : layer.desc.lsn_range.clone(),
98 10578 : )
99 10578 : }
100 : }
101 :
102 : impl Summary {
103 10578 : pub(super) fn expected(
104 10578 : tenant_id: TenantId,
105 10578 : timeline_id: TimelineId,
106 10578 : keys: Range<Key>,
107 10578 : lsns: Range<Lsn>,
108 10578 : ) -> Self {
109 10578 : Self {
110 10578 : magic: DELTA_FILE_MAGIC,
111 10578 : format_version: STORAGE_FORMAT_VERSION,
112 10578 :
113 10578 : tenant_id,
114 10578 : timeline_id,
115 10578 : key_range: keys,
116 10578 : lsn_range: lsns,
117 10578 :
118 10578 : index_start_blk: 0,
119 10578 : index_root_blk: 0,
120 10578 : }
121 10578 : }
122 : }
123 :
/// Flag bit in a `BlobRef` indicating that this version initializes the page,
/// so it can be replayed without referring to an older page version.
const WILL_INIT: u64 = 1;
126 :
127 : /// Struct representing reference to BLOB in layers. Reference contains BLOB
128 : /// offset, and for WAL records it also contains `will_init` flag. The flag
129 : /// helps to determine the range of records that needs to be applied, without
130 : /// reading/deserializing records themselves.
131 0 : #[derive(Debug, Serialize, Deserialize, Copy, Clone)]
132 : pub struct BlobRef(pub u64);
133 :
134 : impl BlobRef {
135 177283003 : pub fn will_init(&self) -> bool {
136 177283003 : (self.0 & WILL_INIT) != 0
137 177283003 : }
138 :
139 256231626 : pub fn pos(&self) -> u64 {
140 256231626 : self.0 >> 1
141 256231626 : }
142 :
143 91515920 : pub fn new(pos: u64, will_init: bool) -> BlobRef {
144 91515920 : let mut blob_ref = pos << 1;
145 91515920 : if will_init {
146 8798982 : blob_ref |= WILL_INIT;
147 82716938 : }
148 91515920 : BlobRef(blob_ref)
149 91515920 : }
150 : }
151 :
152 : pub const DELTA_KEY_SIZE: usize = KEY_SIZE + 8;
153 : struct DeltaKey([u8; DELTA_KEY_SIZE]);
154 :
155 : /// This is the key of the B-tree index stored in the delta layer. It consists
156 : /// of the serialized representation of a Key and LSN.
157 : impl DeltaKey {
158 46455422 : fn from_slice(buf: &[u8]) -> Self {
159 46455422 : let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
160 46455422 : bytes.copy_from_slice(buf);
161 46455422 : DeltaKey(bytes)
162 46455422 : }
163 :
164 136552371 : fn from_key_lsn(key: &Key, lsn: Lsn) -> Self {
165 136552371 : let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
166 136552371 : key.write_to_byte_slice(&mut bytes[0..KEY_SIZE]);
167 136552371 : bytes[KEY_SIZE..].copy_from_slice(&u64::to_be_bytes(lsn.0));
168 136552371 : DeltaKey(bytes)
169 136552371 : }
170 :
171 46455422 : fn key(&self) -> Key {
172 46455422 : Key::from_slice(&self.0)
173 46455422 : }
174 :
175 46455422 : fn lsn(&self) -> Lsn {
176 46455422 : Lsn(u64::from_be_bytes(self.0[KEY_SIZE..].try_into().unwrap()))
177 46455422 : }
178 :
179 178427717 : fn extract_lsn_from_buf(buf: &[u8]) -> Lsn {
180 178427717 : let mut lsn_buf = [0u8; 8];
181 178427717 : lsn_buf.copy_from_slice(&buf[KEY_SIZE..]);
182 178427717 : Lsn(u64::from_be_bytes(lsn_buf))
183 178427717 : }
184 : }
185 :
186 : /// DeltaLayer is the in-memory data structure associated with an on-disk delta
187 : /// file.
188 : ///
189 : /// We keep a DeltaLayer in memory for each file, in the LayerMap. If a layer
190 : /// is in "loaded" state, we have a copy of the index in memory, in 'inner'.
191 : /// Otherwise the struct is just a placeholder for a file that exists on disk,
192 : /// and it needs to be loaded before using it in queries.
193 : pub struct DeltaLayer {
194 : path_or_conf: PathOrConf,
195 :
196 : pub desc: PersistentLayerDesc,
197 :
198 : access_stats: LayerAccessStats,
199 :
200 : inner: OnceCell<Arc<DeltaLayerInner>>,
201 : }
202 :
203 : impl std::fmt::Debug for DeltaLayer {
204 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
205 0 : use super::RangeDisplayDebug;
206 0 :
207 0 : f.debug_struct("DeltaLayer")
208 0 : .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
209 0 : .field("lsn_range", &self.desc.lsn_range)
210 0 : .field("file_size", &self.desc.file_size)
211 0 : .field("inner", &self.inner)
212 0 : .finish()
213 0 : }
214 : }
215 :
216 : pub struct DeltaLayerInner {
217 : // values copied from summary
218 : index_start_blk: u32,
219 : index_root_blk: u32,
220 :
221 : /// Reader object for reading blocks from the file.
222 : file: FileBlockReader,
223 : }
224 :
225 : impl AsRef<DeltaLayerInner> for DeltaLayerInner {
226 33813857 : fn as_ref(&self) -> &DeltaLayerInner {
227 33813857 : self
228 33813857 : }
229 : }
230 :
231 : impl std::fmt::Debug for DeltaLayerInner {
232 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
233 0 : f.debug_struct("DeltaLayerInner")
234 0 : .field("index_start_blk", &self.index_start_blk)
235 0 : .field("index_root_blk", &self.index_root_blk)
236 0 : .finish()
237 0 : }
238 : }
239 :
240 : #[async_trait::async_trait]
241 : impl Layer for DeltaLayer {
242 45036462 : async fn get_value_reconstruct_data(
243 45036462 : &self,
244 45036462 : key: Key,
245 45036462 : lsn_range: Range<Lsn>,
246 45036462 : reconstruct_state: &mut ValueReconstructState,
247 45036462 : ctx: &RequestContext,
248 45036482 : ) -> anyhow::Result<ValueReconstructResult> {
249 45036482 : self.get_value_reconstruct_data(key, lsn_range, reconstruct_state, ctx)
250 1369957 : .await
251 90072961 : }
252 : }
253 : /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
254 : impl std::fmt::Display for DeltaLayer {
255 5156 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
256 5156 : write!(f, "{}", self.layer_desc().short_id())
257 5156 : }
258 : }
259 :
260 : impl AsLayerDesc for DeltaLayer {
261 152073 : fn layer_desc(&self) -> &PersistentLayerDesc {
262 152073 : &self.desc
263 152073 : }
264 : }
265 :
266 : impl PersistentLayer for DeltaLayer {
267 7794 : fn downcast_delta_layer(self: Arc<Self>) -> Option<std::sync::Arc<DeltaLayer>> {
268 7794 : Some(self)
269 7794 : }
270 :
271 203 : fn local_path(&self) -> Option<PathBuf> {
272 203 : self.local_path()
273 203 : }
274 :
275 5257 : fn delete_resident_layer_file(&self) -> Result<()> {
276 5257 : self.delete_resident_layer_file()
277 5257 : }
278 :
279 1867 : fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
280 1867 : self.info(reset)
281 1867 : }
282 :
283 1440 : fn access_stats(&self) -> &LayerAccessStats {
284 1440 : self.access_stats()
285 1440 : }
286 : }
287 :
288 : impl DeltaLayer {
289 4 : pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
290 4 : println!(
291 4 : "----- delta layer for ten {} tli {} keys {}-{} lsn {}-{} size {} ----",
292 4 : self.desc.tenant_id,
293 4 : self.desc.timeline_id,
294 4 : self.desc.key_range.start,
295 4 : self.desc.key_range.end,
296 4 : self.desc.lsn_range.start,
297 4 : self.desc.lsn_range.end,
298 4 : self.desc.file_size,
299 4 : );
300 4 :
301 4 : if !verbose {
302 2 : return Ok(());
303 2 : }
304 :
305 2 : let inner = self.load(LayerAccessKind::Dump, ctx).await?;
306 :
307 2 : println!(
308 2 : "index_start_blk: {}, root {}",
309 2 : inner.index_start_blk, inner.index_root_blk
310 2 : );
311 2 :
312 2 : let file = &inner.file;
313 2 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
314 2 : inner.index_start_blk,
315 2 : inner.index_root_blk,
316 2 : file,
317 2 : );
318 2 :
319 2 : tree_reader.dump().await?;
320 :
321 2 : let keys = DeltaLayerInner::load_keys(&inner).await?;
322 :
323 : // A subroutine to dump a single blob
324 4 : async fn dump_blob(val: ValueRef<'_>) -> Result<String> {
325 4 : let buf = val.reader.read_blob(val.blob_ref.pos()).await?;
326 4 : let val = Value::des(&buf)?;
327 4 : let desc = match val {
328 4 : Value::Image(img) => {
329 4 : format!(" img {} bytes", img.len())
330 : }
331 0 : Value::WalRecord(rec) => {
332 0 : let wal_desc = walrecord::describe_wal_record(&rec)?;
333 0 : format!(
334 0 : " rec {} bytes will_init: {} {}",
335 0 : buf.len(),
336 0 : rec.will_init(),
337 0 : wal_desc
338 0 : )
339 : }
340 : };
341 4 : Ok(desc)
342 4 : }
343 :
344 6 : for entry in keys {
345 4 : let DeltaEntry { key, lsn, val, .. } = entry;
346 4 : let desc = match dump_blob(val).await {
347 4 : Ok(desc) => desc,
348 0 : Err(err) => {
349 0 : let err: anyhow::Error = err;
350 0 : format!("ERROR: {err}")
351 : }
352 : };
353 4 : println!(" key {key} at {lsn}: {desc}");
354 : }
355 :
356 2 : Ok(())
357 4 : }
358 :
359 45036462 : pub(crate) async fn get_value_reconstruct_data(
360 45036462 : &self,
361 45036462 : key: Key,
362 45036462 : lsn_range: Range<Lsn>,
363 45036462 : reconstruct_state: &mut ValueReconstructState,
364 45036462 : ctx: &RequestContext,
365 45036482 : ) -> anyhow::Result<ValueReconstructResult> {
366 45036482 : ensure!(lsn_range.start >= self.desc.lsn_range.start);
367 :
368 45036482 : ensure!(self.desc.key_range.contains(&key));
369 :
370 45036482 : let inner = self
371 45036482 : .load(LayerAccessKind::GetValueReconstructData, ctx)
372 368 : .await?;
373 45036471 : inner
374 45036471 : .get_value_reconstruct_data(key, lsn_range, reconstruct_state)
375 1369589 : .await
376 45036478 : }
377 :
378 203 : pub(crate) fn local_path(&self) -> Option<PathBuf> {
379 203 : Some(self.path())
380 203 : }
381 :
382 : pub(crate) fn delete_resident_layer_file(&self) -> Result<()> {
383 : // delete underlying file
384 5257 : fs::remove_file(self.path())?;
385 5257 : Ok(())
386 5257 : }
387 :
388 1867 : pub(crate) fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
389 1867 : let layer_file_name = self.layer_desc().filename().file_name();
390 1867 : let lsn_range = self.layer_desc().lsn_range.clone();
391 1867 :
392 1867 : let access_stats = self.access_stats.as_api_model(reset);
393 1867 :
394 1867 : HistoricLayerInfo::Delta {
395 1867 : layer_file_name,
396 1867 : layer_file_size: self.desc.file_size,
397 1867 : lsn_start: lsn_range.start,
398 1867 : lsn_end: lsn_range.end,
399 1867 : remote: false,
400 1867 : access_stats,
401 1867 : }
402 1867 : }
403 :
404 18957 : pub(crate) fn access_stats(&self) -> &LayerAccessStats {
405 18957 : &self.access_stats
406 18957 : }
407 :
408 57812 : fn path_for(
409 57812 : path_or_conf: &PathOrConf,
410 57812 : tenant_id: &TenantId,
411 57812 : timeline_id: &TimelineId,
412 57812 : fname: &DeltaFileName,
413 57812 : ) -> PathBuf {
414 57812 : match path_or_conf {
415 0 : PathOrConf::Path(path) => path.clone(),
416 57812 : PathOrConf::Conf(conf) => conf
417 57812 : .timeline_path(tenant_id, timeline_id)
418 57812 : .join(fname.to_string()),
419 : }
420 57812 : }
421 :
422 15514 : fn temp_path_for(
423 15514 : conf: &PageServerConf,
424 15514 : tenant_id: &TenantId,
425 15514 : timeline_id: &TimelineId,
426 15514 : key_start: Key,
427 15514 : lsn_range: &Range<Lsn>,
428 15514 : ) -> PathBuf {
429 15514 : let rand_string: String = rand::thread_rng()
430 15514 : .sample_iter(&Alphanumeric)
431 15514 : .take(8)
432 15514 : .map(char::from)
433 15514 : .collect();
434 15514 :
435 15514 : conf.timeline_path(tenant_id, timeline_id).join(format!(
436 15514 : "{}-XXX__{:016X}-{:016X}.{}.{}",
437 15514 : key_start,
438 15514 : u64::from(lsn_range.start),
439 15514 : u64::from(lsn_range.end),
440 15514 : rand_string,
441 15514 : TEMP_FILE_SUFFIX,
442 15514 : ))
443 15514 : }
444 :
445 : ///
446 : /// Open the underlying file and read the metadata into memory, if it's
447 : /// not loaded already.
448 : ///
449 45041417 : async fn load(
450 45041417 : &self,
451 45041417 : access_kind: LayerAccessKind,
452 45041417 : ctx: &RequestContext,
453 45041437 : ) -> Result<&Arc<DeltaLayerInner>> {
454 45041437 : self.access_stats.record_access(access_kind, ctx);
455 45041437 : // Quick exit if already loaded
456 45041437 : self.inner
457 45041437 : .get_or_try_init(|| self.load_inner())
458 368 : .await
459 45041437 : .with_context(|| format!("Failed to load delta layer {}", self.path().display()))
460 45041437 : }
461 :
462 10578 : async fn load_inner(&self) -> Result<Arc<DeltaLayerInner>> {
463 10578 : let path = self.path();
464 :
465 10578 : let summary = match &self.path_or_conf {
466 10578 : PathOrConf::Conf(_) => Some(Summary::from(self)),
467 0 : PathOrConf::Path(_) => None,
468 : };
469 :
470 10578 : let loaded = DeltaLayerInner::load(&path, summary).await?;
471 :
472 10567 : if let PathOrConf::Path(ref path) = self.path_or_conf {
473 : // not production code
474 :
475 0 : let actual_filename = path.file_name().unwrap().to_str().unwrap().to_owned();
476 0 : let expected_filename = self.filename().file_name();
477 0 :
478 0 : if actual_filename != expected_filename {
479 0 : println!("warning: filename does not match what is expected from in-file summary");
480 0 : println!("actual: {:?}", actual_filename);
481 0 : println!("expected: {:?}", expected_filename);
482 0 : }
483 10567 : }
484 :
485 10567 : Ok(Arc::new(loaded))
486 10578 : }
487 :
488 : /// Create a DeltaLayer struct representing an existing file on disk.
489 4113 : pub fn new(
490 4113 : conf: &'static PageServerConf,
491 4113 : timeline_id: TimelineId,
492 4113 : tenant_id: TenantId,
493 4113 : filename: &DeltaFileName,
494 4113 : file_size: u64,
495 4113 : access_stats: LayerAccessStats,
496 4113 : ) -> DeltaLayer {
497 4113 : DeltaLayer {
498 4113 : path_or_conf: PathOrConf::Conf(conf),
499 4113 : desc: PersistentLayerDesc::new_delta(
500 4113 : tenant_id,
501 4113 : timeline_id,
502 4113 : filename.key_range.clone(),
503 4113 : filename.lsn_range.clone(),
504 4113 : file_size,
505 4113 : ),
506 4113 : access_stats,
507 4113 : inner: OnceCell::new(),
508 4113 : }
509 4113 : }
510 :
511 : /// Create a DeltaLayer struct representing an existing file on disk.
512 : ///
513 : /// This variant is only used for debugging purposes, by the 'pagectl' binary.
514 0 : pub fn new_for_path(path: &Path, file: File) -> Result<Self> {
515 0 : let mut summary_buf = Vec::new();
516 0 : summary_buf.resize(PAGE_SZ, 0);
517 0 : file.read_exact_at(&mut summary_buf, 0)?;
518 0 : let summary = Summary::des_prefix(&summary_buf)?;
519 :
520 0 : let metadata = file
521 0 : .metadata()
522 0 : .context("get file metadata to determine size")?;
523 :
524 0 : Ok(DeltaLayer {
525 0 : path_or_conf: PathOrConf::Path(path.to_path_buf()),
526 0 : desc: PersistentLayerDesc::new_delta(
527 0 : summary.tenant_id,
528 0 : summary.timeline_id,
529 0 : summary.key_range,
530 0 : summary.lsn_range,
531 0 : metadata.len(),
532 0 : ),
533 0 : access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
534 0 : inner: OnceCell::new(),
535 0 : })
536 0 : }
537 :
538 42304 : fn layer_name(&self) -> DeltaFileName {
539 42304 : self.desc.delta_file_name()
540 42304 : }
541 : /// Path to the layer file in pageserver workdir.
542 42304 : pub fn path(&self) -> PathBuf {
543 42304 : Self::path_for(
544 42304 : &self.path_or_conf,
545 42304 : &self.desc.tenant_id,
546 42304 : &self.desc.timeline_id,
547 42304 : &self.layer_name(),
548 42304 : )
549 42304 : }
550 : /// Loads all keys stored in the layer. Returns key, lsn, value size and value reference.
551 : ///
552 : /// The value can be obtained via the [`ValueRef::load`] function.
553 4953 : pub(crate) async fn load_keys(&self, ctx: &RequestContext) -> Result<Vec<DeltaEntry<'_>>> {
554 4953 : let inner = self
555 4953 : .load(LayerAccessKind::KeyIter, ctx)
556 0 : .await
557 4953 : .context("load delta layer keys")?;
558 4953 : DeltaLayerInner::load_keys(inner)
559 1285 : .await
560 4953 : .context("Layer index is corrupted")
561 4953 : }
562 : }
563 :
564 : /// A builder object for constructing a new delta layer.
565 : ///
566 : /// Usage:
567 : ///
568 : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
569 : ///
570 : /// 2. Write the contents by calling `put_value` for every page
571 : /// version to store in the layer.
572 : ///
573 : /// 3. Call `finish`.
574 : ///
575 : struct DeltaLayerWriterInner {
576 : conf: &'static PageServerConf,
577 : pub path: PathBuf,
578 : timeline_id: TimelineId,
579 : tenant_id: TenantId,
580 :
581 : key_start: Key,
582 : lsn_range: Range<Lsn>,
583 :
584 : tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>,
585 :
586 : blob_writer: WriteBlobWriter<BufWriter<VirtualFile>>,
587 : }
588 :
589 : impl DeltaLayerWriterInner {
590 : ///
591 : /// Start building a new delta layer.
592 : ///
593 15514 : async fn new(
594 15514 : conf: &'static PageServerConf,
595 15514 : timeline_id: TimelineId,
596 15514 : tenant_id: TenantId,
597 15514 : key_start: Key,
598 15514 : lsn_range: Range<Lsn>,
599 15514 : ) -> anyhow::Result<Self> {
600 15514 : // Create the file initially with a temporary filename. We don't know
601 15514 : // the end key yet, so we cannot form the final filename yet. We will
602 15514 : // rename it when we're done.
603 15514 : //
604 15514 : // Note: This overwrites any existing file. There shouldn't be any.
605 15514 : // FIXME: throw an error instead?
606 15514 : let path = DeltaLayer::temp_path_for(conf, &tenant_id, &timeline_id, key_start, &lsn_range);
607 :
608 15514 : let mut file = VirtualFile::create(&path)?;
609 : // make room for the header block
610 15514 : file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
611 15514 : let buf_writer = BufWriter::new(file);
612 15514 : let blob_writer = WriteBlobWriter::new(buf_writer, PAGE_SZ as u64);
613 15514 :
614 15514 : // Initialize the b-tree index builder
615 15514 : let block_buf = BlockBuf::new();
616 15514 : let tree_builder = DiskBtreeBuilder::new(block_buf);
617 15514 :
618 15514 : Ok(Self {
619 15514 : conf,
620 15514 : path,
621 15514 : timeline_id,
622 15514 : tenant_id,
623 15514 : key_start,
624 15514 : lsn_range,
625 15514 : tree: tree_builder,
626 15514 : blob_writer,
627 15514 : })
628 15514 : }
629 :
630 : ///
631 : /// Append a key-value pair to the file.
632 : ///
633 : /// The values must be appended in key, lsn order.
634 : ///
635 32493194 : async fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
636 32493264 : self.put_value_bytes(key, lsn, &Value::ser(&val)?, val.will_init())
637 0 : .await
638 32493262 : }
639 :
640 91515921 : async fn put_value_bytes(
641 91515921 : &mut self,
642 91515921 : key: Key,
643 91515921 : lsn: Lsn,
644 91515921 : val: &[u8],
645 91515921 : will_init: bool,
646 91516026 : ) -> anyhow::Result<()> {
647 91516026 : assert!(self.lsn_range.start <= lsn);
648 :
649 91516027 : let off = self.blob_writer.write_blob(val).await?;
650 :
651 91516027 : let blob_ref = BlobRef::new(off, will_init);
652 91516027 :
653 91516027 : let delta_key = DeltaKey::from_key_lsn(&key, lsn);
654 91516027 : self.tree.append(&delta_key.0, blob_ref.0)?;
655 :
656 91516027 : Ok(())
657 91516027 : }
658 :
659 1751918 : fn size(&self) -> u64 {
660 1751918 : self.blob_writer.size() + self.tree.borrow_writer().size()
661 1751918 : }
662 :
663 : ///
664 : /// Finish writing the delta layer.
665 : ///
666 15508 : async fn finish(self, key_end: Key) -> anyhow::Result<DeltaLayer> {
667 15508 : let index_start_blk =
668 15508 : ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
669 15508 :
670 15508 : let buf_writer = self.blob_writer.into_inner();
671 15508 : let mut file = buf_writer.into_inner()?;
672 :
673 : // Write out the index
674 15508 : let (index_root_blk, block_buf) = self.tree.finish()?;
675 15508 : file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
676 0 : .await?;
677 207575 : for buf in block_buf.blocks {
678 192067 : file.write_all(buf.as_ref())?;
679 : }
680 15508 : assert!(self.lsn_range.start < self.lsn_range.end);
681 : // Fill in the summary on blk 0
682 15508 : let summary = Summary {
683 15508 : magic: DELTA_FILE_MAGIC,
684 15508 : format_version: STORAGE_FORMAT_VERSION,
685 15508 : tenant_id: self.tenant_id,
686 15508 : timeline_id: self.timeline_id,
687 15508 : key_range: self.key_start..key_end,
688 15508 : lsn_range: self.lsn_range.clone(),
689 15508 : index_start_blk,
690 15508 : index_root_blk,
691 15508 : };
692 15508 :
693 15508 : let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
694 15508 : Summary::ser_into(&summary, &mut buf)?;
695 15508 : if buf.spilled() {
696 : // This is bad as we only have one free block for the summary
697 0 : warn!(
698 0 : "Used more than one page size for summary buffer: {}",
699 0 : buf.len()
700 0 : );
701 15508 : }
702 15508 : file.seek(SeekFrom::Start(0)).await?;
703 15508 : file.write_all(&buf)?;
704 :
705 15508 : let metadata = file
706 15508 : .metadata()
707 0 : .await
708 15508 : .context("get file metadata to determine size")?;
709 :
710 : // 5GB limit for objects without multipart upload (which we don't want to use)
711 : // Make it a little bit below to account for differing GB units
712 : // https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html
713 : const S3_UPLOAD_LIMIT: u64 = 4_500_000_000;
714 15508 : ensure!(
715 15508 : metadata.len() <= S3_UPLOAD_LIMIT,
716 0 : "Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
717 0 : file.path.display(),
718 0 : metadata.len()
719 : );
720 :
721 : // Note: Because we opened the file in write-only mode, we cannot
722 : // reuse the same VirtualFile for reading later. That's why we don't
723 : // set inner.file here. The first read will have to re-open it.
724 15508 : let layer = DeltaLayer {
725 15508 : path_or_conf: PathOrConf::Conf(self.conf),
726 15508 : desc: PersistentLayerDesc::new_delta(
727 15508 : self.tenant_id,
728 15508 : self.timeline_id,
729 15508 : self.key_start..key_end,
730 15508 : self.lsn_range.clone(),
731 15508 : metadata.len(),
732 15508 : ),
733 15508 : access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
734 15508 : inner: OnceCell::new(),
735 15508 : };
736 15508 :
737 15508 : // fsync the file
738 15508 : file.sync_all()?;
739 : // Rename the file to its final name
740 : //
741 : // Note: This overwrites any existing file. There shouldn't be any.
742 : // FIXME: throw an error instead?
743 15508 : let final_path = DeltaLayer::path_for(
744 15508 : &PathOrConf::Conf(self.conf),
745 15508 : &self.tenant_id,
746 15508 : &self.timeline_id,
747 15508 : &DeltaFileName {
748 15508 : key_range: self.key_start..key_end,
749 15508 : lsn_range: self.lsn_range,
750 15508 : },
751 15508 : );
752 15508 : std::fs::rename(self.path, &final_path)?;
753 :
754 0 : trace!("created delta layer {}", final_path.display());
755 :
756 15508 : Ok(layer)
757 15508 : }
758 : }
759 :
760 : /// A builder object for constructing a new delta layer.
761 : ///
762 : /// Usage:
763 : ///
764 : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
765 : ///
766 : /// 2. Write the contents by calling `put_value` for every page
767 : /// version to store in the layer.
768 : ///
769 : /// 3. Call `finish`.
770 : ///
771 : /// # Note
772 : ///
773 : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
774 : /// possible for the writer to drop before `finish` is actually called. So this
775 : /// could lead to odd temporary files in the directory, exhausting file system.
776 : /// This structure wraps `DeltaLayerWriterInner` and also contains `Drop`
777 : /// implementation that cleans up the temporary file in failure. It's not
778 : /// possible to do this directly in `DeltaLayerWriterInner` since `finish` moves
779 : /// out some fields, making it impossible to implement `Drop`.
780 : ///
781 : #[must_use]
782 : pub struct DeltaLayerWriter {
783 : inner: Option<DeltaLayerWriterInner>,
784 : }
785 :
786 : impl DeltaLayerWriter {
787 : ///
788 : /// Start building a new delta layer.
789 : ///
790 15514 : pub async fn new(
791 15514 : conf: &'static PageServerConf,
792 15514 : timeline_id: TimelineId,
793 15514 : tenant_id: TenantId,
794 15514 : key_start: Key,
795 15514 : lsn_range: Range<Lsn>,
796 15514 : ) -> anyhow::Result<Self> {
797 : Ok(Self {
798 : inner: Some(
799 15514 : DeltaLayerWriterInner::new(conf, timeline_id, tenant_id, key_start, lsn_range)
800 0 : .await?,
801 : ),
802 : })
803 15514 : }
804 :
805 : ///
806 : /// Append a key-value pair to the file.
807 : ///
808 : /// The values must be appended in key, lsn order.
809 : ///
810 32493194 : pub async fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
811 32493266 : self.inner.as_mut().unwrap().put_value(key, lsn, val).await
812 32493264 : }
813 :
814 59022728 : pub async fn put_value_bytes(
815 59022728 : &mut self,
816 59022728 : key: Key,
817 59022728 : lsn: Lsn,
818 59022728 : val: &[u8],
819 59022728 : will_init: bool,
820 59022728 : ) -> anyhow::Result<()> {
821 59022764 : self.inner
822 59022764 : .as_mut()
823 59022764 : .unwrap()
824 59022764 : .put_value_bytes(key, lsn, val, will_init)
825 0 : .await
826 59022763 : }
827 :
828 1751918 : pub fn size(&self) -> u64 {
829 1751918 : self.inner.as_ref().unwrap().size()
830 1751918 : }
831 :
832 : ///
833 : /// Finish writing the delta layer.
834 : ///
835 15508 : pub async fn finish(mut self, key_end: Key) -> anyhow::Result<DeltaLayer> {
836 15508 : self.inner.take().unwrap().finish(key_end).await
837 15508 : }
838 : }
839 :
840 : impl Drop for DeltaLayerWriter {
841 : fn drop(&mut self) {
842 15508 : if let Some(inner) = self.inner.take() {
843 0 : match inner.blob_writer.into_inner().into_inner() {
844 0 : Ok(vfile) => vfile.remove(),
845 0 : Err(err) => warn!(
846 0 : "error while flushing buffer of image layer temporary file: {}",
847 0 : err
848 0 : ),
849 : }
850 15508 : }
851 15508 : }
852 : }
853 :
854 : impl DeltaLayerInner {
855 10578 : pub(super) async fn load(
856 10578 : path: &std::path::Path,
857 10578 : summary: Option<Summary>,
858 10578 : ) -> anyhow::Result<Self> {
859 10578 : let file = VirtualFile::open(path)
860 10578 : .with_context(|| format!("Failed to open file '{}'", path.display()))?;
861 10578 : let file = FileBlockReader::new(file);
862 :
863 10578 : let summary_blk = file.read_blk(0).await?;
864 10567 : let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
865 :
866 10567 : if let Some(mut expected_summary) = summary {
867 : // production code path
868 10567 : expected_summary.index_start_blk = actual_summary.index_start_blk;
869 10567 : expected_summary.index_root_blk = actual_summary.index_root_blk;
870 10567 : if actual_summary != expected_summary {
871 0 : bail!(
872 0 : "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
873 0 : actual_summary,
874 0 : expected_summary
875 0 : );
876 10567 : }
877 0 : }
878 :
879 10567 : Ok(DeltaLayerInner {
880 10567 : file,
881 10567 : index_start_blk: actual_summary.index_start_blk,
882 10567 : index_root_blk: actual_summary.index_root_blk,
883 10567 : })
884 10578 : }
885 :
    /// Collect the stored versions of `key` within `lsn_range` that are needed
    /// to reconstruct the page, and add them to `reconstruct_state`.
    ///
    /// The index is walked backwards (newest first), starting at
    /// `lsn_range.end - 1`. The walk stops when:
    /// - the key changes,
    /// - an entry is older than `lsn_range.start`, or
    /// - an entry's `will_init` flag is set.
    ///
    /// The collected values are then read newest first. WAL records are pushed
    /// onto `reconstruct_state.records` in that order, and a page image is
    /// stored in `reconstruct_state.img`. Reading stops at an image or at a
    /// record that initializes the page.
    ///
    /// Returns `Complete` if an image or an initializing record was found, and
    /// `Continue` if an older page image is still needed.
    ///
    /// NOTE(review): `lsn_range.end.0 - 1` underflows if `lsn_range.end` is
    /// `Lsn(0)`; callers presumably never pass that — confirm.
    pub(super) async fn get_value_reconstruct_data(
        &self,
        key: Key,
        lsn_range: Range<Lsn>,
        reconstruct_state: &mut ValueReconstructState,
    ) -> anyhow::Result<ValueReconstructResult> {
        let mut need_image = true;
        // Scan the page versions backwards, starting just below `lsn_range.end`.
        let file = &self.file;
        let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
            self.index_start_blk,
            self.index_root_blk,
            file,
        );
        let search_key = DeltaKey::from_key_lsn(&key, Lsn(lsn_range.end.0 - 1));

        // (LSN, blob offset) pairs to read, newest first.
        let mut offsets: Vec<(Lsn, u64)> = Vec::new();

        // The closure's return value says whether to keep visiting older entries.
        tree_reader
            .visit(&search_key.0, VisitDirection::Backwards, |key, value| {
                let blob_ref = BlobRef(value);
                // Walked past all versions of the requested key.
                if key[..KEY_SIZE] != search_key.0[..KEY_SIZE] {
                    return false;
                }
                let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
                // Older than the requested range; the caller handles those.
                if entry_lsn < lsn_range.start {
                    return false;
                }
                offsets.push((entry_lsn, blob_ref.pos()));

                // Nothing older is needed once a version initializes the page.
                !blob_ref.will_init()
            })
            .await?;

        // Ok, 'offsets' now contains the offsets of all the entries we need to read
        let cursor = file.block_cursor();
        // One buffer, reused for every blob read.
        let mut buf = Vec::new();
        for (entry_lsn, pos) in offsets {
            cursor
                .read_blob_into_buf(pos, &mut buf)
                .await
                .with_context(|| {
                    format!(
                        "Failed to read blob from virtual file {}",
                        file.file.path.display()
                    )
                })?;
            let val = Value::des(&buf).with_context(|| {
                format!(
                    "Failed to deserialize file blob from virtual file {}",
                    file.file.path.display()
                )
            })?;
            match val {
                Value::Image(img) => {
                    // A full page image: no older versions are needed.
                    reconstruct_state.img = Some((entry_lsn, img));
                    need_image = false;
                    break;
                }
                Value::WalRecord(rec) => {
                    let will_init = rec.will_init();
                    reconstruct_state.records.push((entry_lsn, rec));
                    if will_init {
                        // This WAL record initializes the page, so no need to go further back
                        need_image = false;
                        break;
                    }
                }
            }
        }

        // If an older page image is needed to reconstruct the page, let the
        // caller know.
        if need_image {
            Ok(ValueReconstructResult::Continue)
        } else {
            Ok(ValueReconstructResult::Complete)
        }
    }
965 :
966 4955 : pub(super) async fn load_keys<T: AsRef<DeltaLayerInner> + Clone>(
967 4955 : this: &T,
968 4955 : ) -> Result<Vec<DeltaEntry<'_>>> {
969 4955 : let dl = this.as_ref();
970 4955 : let file = &dl.file;
971 4955 :
972 4955 : let tree_reader =
973 4955 : DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(dl.index_start_blk, dl.index_root_blk, file);
974 4955 :
975 4955 : let mut all_keys: Vec<DeltaEntry<'_>> = Vec::new();
976 4955 :
977 4955 : tree_reader
978 4955 : .visit(
979 4955 : &[0u8; DELTA_KEY_SIZE],
980 4955 : VisitDirection::Forwards,
981 46455422 : |key, value| {
982 46455422 : let delta_key = DeltaKey::from_slice(key);
983 46455422 : let val_ref = ValueRef {
984 46455422 : blob_ref: BlobRef(value),
985 46455422 : reader: BlockCursor::new(crate::tenant::block_io::BlockReaderRef::Adapter(
986 46455422 : Adapter(dl),
987 46455422 : )),
988 46455422 : };
989 46455422 : let pos = BlobRef(value).pos();
990 46455422 : if let Some(last) = all_keys.last_mut() {
991 46450467 : // subtract offset of the current and last entries to get the size
992 46450467 : // of the value associated with this (key, lsn) tuple
993 46450467 : let first_pos = last.size;
994 46450467 : last.size = pos - first_pos;
995 46450467 : }
996 46455422 : let entry = DeltaEntry {
997 46455422 : key: delta_key.key(),
998 46455422 : lsn: delta_key.lsn(),
999 46455422 : size: pos,
1000 46455422 : val: val_ref,
1001 46455422 : };
1002 46455422 : all_keys.push(entry);
1003 46455422 : true
1004 46455422 : },
1005 4955 : )
1006 1285 : .await?;
1007 4955 : if let Some(last) = all_keys.last_mut() {
1008 4955 : // Last key occupies all space till end of value storage,
1009 4955 : // which corresponds to beginning of the index
1010 4955 : last.size = dl.index_start_blk as u64 * PAGE_SZ as u64 - last.size;
1011 4955 : }
1012 4955 : Ok(all_keys)
1013 4955 : }
1014 : }
1015 :
1016 : /// A set of data associated with a delta layer key and its value
1017 : pub struct DeltaEntry<'a> {
1018 : pub key: Key,
1019 : pub lsn: Lsn,
1020 : /// Size of the stored value
1021 : pub size: u64,
1022 : /// Reference to the on-disk value
1023 : pub val: ValueRef<'a>,
1024 : }
1025 :
1026 : /// Reference to an on-disk value
1027 : pub struct ValueRef<'a> {
1028 : blob_ref: BlobRef,
1029 : reader: BlockCursor<'a>,
1030 : }
1031 :
1032 : impl<'a> ValueRef<'a> {
1033 : /// Loads the value from disk
1034 32493197 : pub async fn load(&self) -> Result<Value> {
1035 : // theoretically we *could* record an access time for each, but it does not really matter
1036 32493269 : let buf = self.reader.read_blob(self.blob_ref.pos()).await?;
1037 32493267 : let val = Value::des(&buf)?;
1038 32493266 : Ok(val)
1039 32493266 : }
1040 : }
1041 :
1042 : pub(crate) struct Adapter<T>(T);
1043 :
1044 : impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
1045 33813933 : pub(crate) async fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
1046 33813933 : self.0.as_ref().file.read_blk(blknum).await
1047 33813931 : }
1048 : }
|