TLA Line data Source code
1 : //! A DeltaLayer represents a collection of WAL records or page images in a range of
2 : //! LSNs, and in a range of Keys. It is stored on a file on disk.
3 : //!
4 : //! Usually a delta layer only contains differences, in the form of WAL records
5 : //! against a base LSN. However, if a relation is extended or a whole new relation
6 : //! is created, there would be no base for the new pages. The entries for them
7 : //! must be page images or WAL records with the 'will_init' flag set, so that
8 : //! they can be replayed without referring to an older page version.
9 : //!
10 : //! The delta files are stored in the `timelines/<timeline_id>` directory. Currently,
11 : //! there are no subdirectories, and each delta file is named like this:
12 : //!
13 : //! ```text
14 : //! <key start>-<key end>__<start LSN>-<end LSN>
15 : //! ```
16 : //!
17 : //! For example:
18 : //!
19 : //! ```text
20 : //! 000000067F000032BE0000400000000020B6-000000067F000032BE0000400000000030B6__000000578C6B29-0000000057A50051
21 : //! ```
22 : //!
23 : //! Every delta file consists of three parts: "summary", "index", and
24 : //! "values". The summary is a fixed size header at the beginning of the file,
25 : //! and it contains basic information about the layer, and offsets to the other
26 : //! parts. The "index" is a B-tree, mapping from Key and LSN to an offset in the
27 : //! "values" part. The actual page images and WAL records are stored in the
28 : //! "values" part.
29 : //!
30 : use crate::config::PageServerConf;
31 : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
32 : use crate::page_cache::PAGE_SZ;
33 : use crate::repository::{Key, Value, KEY_SIZE};
34 : use crate::tenant::blob_io::BlobWriter;
35 : use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
36 : use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
37 : use crate::tenant::storage_layer::{
38 : PersistentLayer, ValueReconstructResult, ValueReconstructState,
39 : };
40 : use crate::virtual_file::VirtualFile;
41 : use crate::{walrecord, TEMP_FILE_SUFFIX};
42 : use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
43 : use anyhow::{bail, ensure, Context, Result};
44 : use camino::{Utf8Path, Utf8PathBuf};
45 : use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind};
46 : use rand::{distributions::Alphanumeric, Rng};
47 : use serde::{Deserialize, Serialize};
48 : use std::fs::{self, File};
49 : use std::io::SeekFrom;
50 : use std::ops::Range;
51 : use std::os::unix::fs::FileExt;
52 : use std::sync::Arc;
53 : use tokio::sync::OnceCell;
54 : use tracing::*;
55 :
56 : use utils::{
57 : bin_ser::BeSer,
58 : id::{TenantId, TimelineId},
59 : lsn::Lsn,
60 : };
61 :
62 : use super::{
63 : AsLayerDesc, DeltaFileName, Layer, LayerAccessStats, LayerAccessStatsReset, PathOrConf,
64 : PersistentLayerDesc,
65 : };
66 :
67 : ///
68 : /// Header stored in the beginning of the file
69 : ///
70 : /// After this comes the 'values' part, starting on block 1. After that,
71 : /// the 'index' starts at the block indicated by 'index_start_blk'
72 : ///
73 CBC 15721 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
74 : pub struct Summary {
75 : /// Magic value to identify this as a neon delta file. Always DELTA_FILE_MAGIC.
76 : magic: u16,
77 : format_version: u16,
78 :
79 : tenant_id: TenantId,
80 : timeline_id: TimelineId,
81 : key_range: Range<Key>,
82 : lsn_range: Range<Lsn>,
83 :
84 : /// Block number where the 'index' part of the file begins.
85 : pub index_start_blk: u32,
86 : /// Block within the 'index', where the B-tree root page is stored
87 : pub index_root_blk: u32,
88 : }
89 :
90 : impl From<&DeltaLayer> for Summary {
91 10807 : fn from(layer: &DeltaLayer) -> Self {
92 10807 : Self::expected(
93 10807 : layer.desc.tenant_id,
94 10807 : layer.desc.timeline_id,
95 10807 : layer.desc.key_range.clone(),
96 10807 : layer.desc.lsn_range.clone(),
97 10807 : )
98 10807 : }
99 : }
100 :
101 : impl Summary {
102 10807 : pub(super) fn expected(
103 10807 : tenant_id: TenantId,
104 10807 : timeline_id: TimelineId,
105 10807 : keys: Range<Key>,
106 10807 : lsns: Range<Lsn>,
107 10807 : ) -> Self {
108 10807 : Self {
109 10807 : magic: DELTA_FILE_MAGIC,
110 10807 : format_version: STORAGE_FORMAT_VERSION,
111 10807 :
112 10807 : tenant_id,
113 10807 : timeline_id,
114 10807 : key_range: keys,
115 10807 : lsn_range: lsns,
116 10807 :
117 10807 : index_start_blk: 0,
118 10807 : index_root_blk: 0,
119 10807 : }
120 10807 : }
121 : }
122 :
123 : // Flag indicating that this version initialize the page
124 : const WILL_INIT: u64 = 1;
125 :
126 : /// Struct representing reference to BLOB in layers. Reference contains BLOB
127 : /// offset, and for WAL records it also contains `will_init` flag. The flag
128 : /// helps to determine the range of records that needs to be applied, without
129 : /// reading/deserializing records themselves.
130 UBC 0 : #[derive(Debug, Serialize, Deserialize, Copy, Clone)]
131 : pub struct BlobRef(pub u64);
132 :
133 : impl BlobRef {
134 CBC 131433446 : pub fn will_init(&self) -> bool {
135 131433446 : (self.0 & WILL_INIT) != 0
136 131433446 : }
137 :
138 191246763 : pub fn pos(&self) -> u64 {
139 191246763 : self.0 >> 1
140 191246763 : }
141 :
142 82730190 : pub fn new(pos: u64, will_init: bool) -> BlobRef {
143 82730190 : let mut blob_ref = pos << 1;
144 82730190 : if will_init {
145 8619558 : blob_ref |= WILL_INIT;
146 74110632 : }
147 82730190 : BlobRef(blob_ref)
148 82730190 : }
149 : }
150 :
151 : pub const DELTA_KEY_SIZE: usize = KEY_SIZE + 8;
152 : struct DeltaKey([u8; DELTA_KEY_SIZE]);
153 :
154 : /// This is the key of the B-tree index stored in the delta layer. It consists
155 : /// of the serialized representation of a Key and LSN.
156 : impl DeltaKey {
157 30235446 : fn from_slice(buf: &[u8]) -> Self {
158 30235446 : let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
159 30235446 : bytes.copy_from_slice(buf);
160 30235446 : DeltaKey(bytes)
161 30235446 : }
162 :
163 106149550 : fn from_key_lsn(key: &Key, lsn: Lsn) -> Self {
164 106149550 : let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
165 106149550 : key.write_to_byte_slice(&mut bytes[0..KEY_SIZE]);
166 106149550 : bytes[KEY_SIZE..].copy_from_slice(&u64::to_be_bytes(lsn.0));
167 106149550 : DeltaKey(bytes)
168 106149550 : }
169 :
170 30235446 : fn key(&self) -> Key {
171 30235446 : Key::from_slice(&self.0)
172 30235446 : }
173 :
174 30235446 : fn lsn(&self) -> Lsn {
175 30235446 : Lsn(u64::from_be_bytes(self.0[KEY_SIZE..].try_into().unwrap()))
176 30235446 : }
177 :
178 132123317 : fn extract_lsn_from_buf(buf: &[u8]) -> Lsn {
179 132123317 : let mut lsn_buf = [0u8; 8];
180 132123317 : lsn_buf.copy_from_slice(&buf[KEY_SIZE..]);
181 132123317 : Lsn(u64::from_be_bytes(lsn_buf))
182 132123317 : }
183 : }
184 :
185 : /// DeltaLayer is the in-memory data structure associated with an on-disk delta
186 : /// file.
187 : ///
188 : /// We keep a DeltaLayer in memory for each file, in the LayerMap. If a layer
189 : /// is in "loaded" state, we have a copy of the index in memory, in 'inner'.
190 : /// Otherwise the struct is just a placeholder for a file that exists on disk,
191 : /// and it needs to be loaded before using it in queries.
192 : pub struct DeltaLayer {
193 : path_or_conf: PathOrConf,
194 :
195 : pub desc: PersistentLayerDesc,
196 :
197 : access_stats: LayerAccessStats,
198 :
199 : inner: OnceCell<Arc<DeltaLayerInner>>,
200 : }
201 :
202 : impl std::fmt::Debug for DeltaLayer {
203 UBC 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
204 0 : use super::RangeDisplayDebug;
205 0 :
206 0 : f.debug_struct("DeltaLayer")
207 0 : .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
208 0 : .field("lsn_range", &self.desc.lsn_range)
209 0 : .field("file_size", &self.desc.file_size)
210 0 : .field("inner", &self.inner)
211 0 : .finish()
212 0 : }
213 : }
214 :
215 : pub struct DeltaLayerInner {
216 : // values copied from summary
217 : index_start_blk: u32,
218 : index_root_blk: u32,
219 :
220 : /// Reader object for reading blocks from the file.
221 : file: FileBlockReader,
222 : }
223 :
224 : impl AsRef<DeltaLayerInner> for DeltaLayerInner {
225 CBC 31066950 : fn as_ref(&self) -> &DeltaLayerInner {
226 31066950 : self
227 31066950 : }
228 : }
229 :
230 : impl std::fmt::Debug for DeltaLayerInner {
231 UBC 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
232 0 : f.debug_struct("DeltaLayerInner")
233 0 : .field("index_start_blk", &self.index_start_blk)
234 0 : .field("index_root_blk", &self.index_root_blk)
235 0 : .finish()
236 0 : }
237 : }
238 :
239 : #[async_trait::async_trait]
240 : impl Layer for DeltaLayer {
241 CBC 23419370 : async fn get_value_reconstruct_data(
242 23419370 : &self,
243 23419370 : key: Key,
244 23419370 : lsn_range: Range<Lsn>,
245 23419370 : reconstruct_state: &mut ValueReconstructState,
246 23419370 : ctx: &RequestContext,
247 23419370 : ) -> anyhow::Result<ValueReconstructResult> {
248 23419349 : self.get_value_reconstruct_data(key, lsn_range, reconstruct_state, ctx)
249 2118353 : .await
250 46838692 : }
251 : }
252 : /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
253 : impl std::fmt::Display for DeltaLayer {
254 4571 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
255 4571 : write!(f, "{}", self.layer_desc().short_id())
256 4571 : }
257 : }
258 :
259 : impl AsLayerDesc for DeltaLayer {
260 143635 : fn layer_desc(&self) -> &PersistentLayerDesc {
261 143635 : &self.desc
262 143635 : }
263 : }
264 :
265 : impl PersistentLayer for DeltaLayer {
266 4363 : fn downcast_delta_layer(self: Arc<Self>) -> Option<std::sync::Arc<DeltaLayer>> {
267 4363 : Some(self)
268 4363 : }
269 :
270 218 : fn local_path(&self) -> Option<Utf8PathBuf> {
271 218 : self.local_path()
272 218 : }
273 :
274 6951 : fn delete_resident_layer_file(&self) -> Result<()> {
275 6951 : self.delete_resident_layer_file()
276 6951 : }
277 :
278 1370 : fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
279 1370 : self.info(reset)
280 1370 : }
281 :
282 1455 : fn access_stats(&self) -> &LayerAccessStats {
283 1455 : self.access_stats()
284 1455 : }
285 : }
286 :
287 : impl DeltaLayer {
288 4 : pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
289 4 : println!(
290 4 : "----- delta layer for ten {} tli {} keys {}-{} lsn {}-{} size {} ----",
291 4 : self.desc.tenant_id,
292 4 : self.desc.timeline_id,
293 4 : self.desc.key_range.start,
294 4 : self.desc.key_range.end,
295 4 : self.desc.lsn_range.start,
296 4 : self.desc.lsn_range.end,
297 4 : self.desc.file_size,
298 4 : );
299 4 :
300 4 : if !verbose {
301 2 : return Ok(());
302 2 : }
303 :
304 2 : let inner = self.load(LayerAccessKind::Dump, ctx).await?;
305 :
306 2 : println!(
307 2 : "index_start_blk: {}, root {}",
308 2 : inner.index_start_blk, inner.index_root_blk
309 2 : );
310 2 :
311 2 : let file = &inner.file;
312 2 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
313 2 : inner.index_start_blk,
314 2 : inner.index_root_blk,
315 2 : file,
316 2 : );
317 2 :
318 2 : tree_reader.dump().await?;
319 :
320 2 : let keys = DeltaLayerInner::load_keys(&inner, ctx).await?;
321 :
322 : // A subroutine to dump a single blob
323 4 : async fn dump_blob(val: ValueRef<'_>, ctx: &RequestContext) -> Result<String> {
324 4 : let buf = val.reader.read_blob(val.blob_ref.pos(), ctx).await?;
325 4 : let val = Value::des(&buf)?;
326 4 : let desc = match val {
327 4 : Value::Image(img) => {
328 4 : format!(" img {} bytes", img.len())
329 : }
330 UBC 0 : Value::WalRecord(rec) => {
331 0 : let wal_desc = walrecord::describe_wal_record(&rec)?;
332 0 : format!(
333 0 : " rec {} bytes will_init: {} {}",
334 0 : buf.len(),
335 0 : rec.will_init(),
336 0 : wal_desc
337 0 : )
338 : }
339 : };
340 CBC 4 : Ok(desc)
341 4 : }
342 :
343 6 : for entry in keys {
344 4 : let DeltaEntry { key, lsn, val, .. } = entry;
345 4 : let desc = match dump_blob(val, ctx).await {
346 4 : Ok(desc) => desc,
347 UBC 0 : Err(err) => {
348 0 : let err: anyhow::Error = err;
349 0 : format!("ERROR: {err}")
350 : }
351 : };
352 CBC 4 : println!(" key {key} at {lsn}: {desc}");
353 : }
354 :
355 2 : Ok(())
356 4 : }
357 :
358 23419370 : pub(crate) async fn get_value_reconstruct_data(
359 23419370 : &self,
360 23419370 : key: Key,
361 23419370 : lsn_range: Range<Lsn>,
362 23419370 : reconstruct_state: &mut ValueReconstructState,
363 23419370 : ctx: &RequestContext,
364 23419370 : ) -> anyhow::Result<ValueReconstructResult> {
365 23419340 : ensure!(lsn_range.start >= self.desc.lsn_range.start);
366 :
367 23419340 : ensure!(self.desc.key_range.contains(&key));
368 :
369 23419340 : let inner = self
370 23419340 : .load(LayerAccessKind::GetValueReconstructData, ctx)
371 309 : .await?;
372 23419330 : inner
373 23419330 : .get_value_reconstruct_data(key, lsn_range, reconstruct_state, ctx)
374 2118044 : .await
375 23419336 : }
376 :
377 218 : pub(crate) fn local_path(&self) -> Option<Utf8PathBuf> {
378 218 : Some(self.path())
379 218 : }
380 :
381 : pub(crate) fn delete_resident_layer_file(&self) -> Result<()> {
382 : // delete underlying file
383 6951 : fs::remove_file(self.path())?;
384 6951 : Ok(())
385 6951 : }
386 :
387 1370 : pub(crate) fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
388 1370 : let layer_file_name = self.layer_desc().filename().file_name();
389 1370 : let lsn_range = self.layer_desc().lsn_range.clone();
390 1370 :
391 1370 : let access_stats = self.access_stats.as_api_model(reset);
392 1370 :
393 1370 : HistoricLayerInfo::Delta {
394 1370 : layer_file_name,
395 1370 : layer_file_size: self.desc.file_size,
396 1370 : lsn_start: lsn_range.start,
397 1370 : lsn_end: lsn_range.end,
398 1370 : remote: false,
399 1370 : access_stats,
400 1370 : }
401 1370 : }
402 :
403 16653 : pub(crate) fn access_stats(&self) -> &LayerAccessStats {
404 16653 : &self.access_stats
405 16653 : }
406 :
407 59302 : fn path_for(
408 59302 : path_or_conf: &PathOrConf,
409 59302 : tenant_id: &TenantId,
410 59302 : timeline_id: &TimelineId,
411 59302 : fname: &DeltaFileName,
412 59302 : ) -> Utf8PathBuf {
413 59302 : match path_or_conf {
414 UBC 0 : PathOrConf::Path(path) => path.clone(),
415 CBC 59302 : PathOrConf::Conf(conf) => conf
416 59302 : .timeline_path(tenant_id, timeline_id)
417 59302 : .join(fname.to_string()),
418 : }
419 59302 : }
420 :
421 15728 : fn temp_path_for(
422 15728 : conf: &PageServerConf,
423 15728 : tenant_id: &TenantId,
424 15728 : timeline_id: &TimelineId,
425 15728 : key_start: Key,
426 15728 : lsn_range: &Range<Lsn>,
427 15728 : ) -> Utf8PathBuf {
428 15728 : let rand_string: String = rand::thread_rng()
429 15728 : .sample_iter(&Alphanumeric)
430 15728 : .take(8)
431 15728 : .map(char::from)
432 15728 : .collect();
433 15728 :
434 15728 : conf.timeline_path(tenant_id, timeline_id).join(format!(
435 15728 : "{}-XXX__{:016X}-{:016X}.{}.{}",
436 15728 : key_start,
437 15728 : u64::from(lsn_range.start),
438 15728 : u64::from(lsn_range.end),
439 15728 : rand_string,
440 15728 : TEMP_FILE_SUFFIX,
441 15728 : ))
442 15728 : }
443 :
444 : ///
445 : /// Open the underlying file and read the metadata into memory, if it's
446 : /// not loaded already.
447 : ///
448 23423705 : async fn load(
449 23423705 : &self,
450 23423705 : access_kind: LayerAccessKind,
451 23423705 : ctx: &RequestContext,
452 23423705 : ) -> Result<&Arc<DeltaLayerInner>> {
453 23423675 : self.access_stats.record_access(access_kind, ctx);
454 23423675 : // Quick exit if already loaded
455 23423675 : self.inner
456 23423675 : .get_or_try_init(|| self.load_inner(ctx))
457 311 : .await
458 23423675 : .with_context(|| format!("Failed to load delta layer {}", self.path()))
459 23423675 : }
460 :
461 10807 : async fn load_inner(&self, ctx: &RequestContext) -> Result<Arc<DeltaLayerInner>> {
462 10807 : let path = self.path();
463 :
464 10807 : let summary = match &self.path_or_conf {
465 10807 : PathOrConf::Conf(_) => Some(Summary::from(self)),
466 UBC 0 : PathOrConf::Path(_) => None,
467 : };
468 :
469 CBC 10807 : let loaded = DeltaLayerInner::load(&path, summary, ctx).await?;
470 :
471 10797 : if let PathOrConf::Path(ref path) = self.path_or_conf {
472 : // not production code
473 :
474 UBC 0 : let actual_filename = path.file_name().unwrap().to_owned();
475 0 : let expected_filename = self.filename().file_name();
476 0 :
477 0 : if actual_filename != expected_filename {
478 0 : println!("warning: filename does not match what is expected from in-file summary");
479 0 : println!("actual: {:?}", actual_filename);
480 0 : println!("expected: {:?}", expected_filename);
481 0 : }
482 CBC 10797 : }
483 :
484 10797 : Ok(Arc::new(loaded))
485 10807 : }
486 :
487 : /// Create a DeltaLayer struct representing an existing file on disk.
488 2690 : pub fn new(
489 2690 : conf: &'static PageServerConf,
490 2690 : timeline_id: TimelineId,
491 2690 : tenant_id: TenantId,
492 2690 : filename: &DeltaFileName,
493 2690 : file_size: u64,
494 2690 : access_stats: LayerAccessStats,
495 2690 : ) -> DeltaLayer {
496 2690 : DeltaLayer {
497 2690 : path_or_conf: PathOrConf::Conf(conf),
498 2690 : desc: PersistentLayerDesc::new_delta(
499 2690 : tenant_id,
500 2690 : timeline_id,
501 2690 : filename.key_range.clone(),
502 2690 : filename.lsn_range.clone(),
503 2690 : file_size,
504 2690 : ),
505 2690 : access_stats,
506 2690 : inner: OnceCell::new(),
507 2690 : }
508 2690 : }
509 :
510 : /// Create a DeltaLayer struct representing an existing file on disk.
511 : ///
512 : /// This variant is only used for debugging purposes, by the 'pagectl' binary.
513 UBC 0 : pub fn new_for_path(path: &Utf8Path, file: File) -> Result<Self> {
514 0 : let mut summary_buf = vec![0; PAGE_SZ];
515 0 : file.read_exact_at(&mut summary_buf, 0)?;
516 0 : let summary = Summary::des_prefix(&summary_buf)?;
517 :
518 0 : let metadata = file
519 0 : .metadata()
520 0 : .context("get file metadata to determine size")?;
521 :
522 0 : Ok(DeltaLayer {
523 0 : path_or_conf: PathOrConf::Path(path.to_path_buf()),
524 0 : desc: PersistentLayerDesc::new_delta(
525 0 : summary.tenant_id,
526 0 : summary.timeline_id,
527 0 : summary.key_range,
528 0 : summary.lsn_range,
529 0 : metadata.len(),
530 0 : ),
531 0 : access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
532 0 : inner: OnceCell::new(),
533 0 : })
534 0 : }
535 :
536 CBC 43581 : fn layer_name(&self) -> DeltaFileName {
537 43581 : self.desc.delta_file_name()
538 43581 : }
539 : /// Path to the layer file in pageserver workdir.
540 43581 : pub fn path(&self) -> Utf8PathBuf {
541 43581 : Self::path_for(
542 43581 : &self.path_or_conf,
543 43581 : &self.desc.tenant_id,
544 43581 : &self.desc.timeline_id,
545 43581 : &self.layer_name(),
546 43581 : )
547 43581 : }
548 : /// Loads all keys stored in the layer. Returns key, lsn, value size and value reference.
549 : ///
550 : /// The value can be obtained via the [`ValueRef::load`] function.
551 4333 : pub(crate) async fn load_keys(&self, ctx: &RequestContext) -> Result<Vec<DeltaEntry<'_>>> {
552 4333 : let inner = self
553 4333 : .load(LayerAccessKind::KeyIter, ctx)
554 2 : .await
555 4333 : .context("load delta layer keys")?;
556 4333 : DeltaLayerInner::load_keys(inner, ctx)
557 1504 : .await
558 4333 : .context("Layer index is corrupted")
559 4333 : }
560 : }
561 :
562 : /// A builder object for constructing a new delta layer.
563 : ///
564 : /// Usage:
565 : ///
566 : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
567 : ///
568 : /// 2. Write the contents by calling `put_value` for every page
569 : /// version to store in the layer.
570 : ///
571 : /// 3. Call `finish`.
572 : ///
573 : struct DeltaLayerWriterInner {
574 : conf: &'static PageServerConf,
575 : pub path: Utf8PathBuf,
576 : timeline_id: TimelineId,
577 : tenant_id: TenantId,
578 :
579 : key_start: Key,
580 : lsn_range: Range<Lsn>,
581 :
582 : tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>,
583 :
584 : blob_writer: BlobWriter<true>,
585 : }
586 :
587 : impl DeltaLayerWriterInner {
588 : ///
589 : /// Start building a new delta layer.
590 : ///
591 15728 : async fn new(
592 15728 : conf: &'static PageServerConf,
593 15728 : timeline_id: TimelineId,
594 15728 : tenant_id: TenantId,
595 15728 : key_start: Key,
596 15728 : lsn_range: Range<Lsn>,
597 15728 : ) -> anyhow::Result<Self> {
598 15728 : // Create the file initially with a temporary filename. We don't know
599 15728 : // the end key yet, so we cannot form the final filename yet. We will
600 15728 : // rename it when we're done.
601 15728 : //
602 15728 : // Note: This overwrites any existing file. There shouldn't be any.
603 15728 : // FIXME: throw an error instead?
604 15728 : let path = DeltaLayer::temp_path_for(conf, &tenant_id, &timeline_id, key_start, &lsn_range);
605 :
606 15728 : let mut file = VirtualFile::create(&path).await?;
607 : // make room for the header block
608 15728 : file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
609 15728 : let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
610 15728 :
611 15728 : // Initialize the b-tree index builder
612 15728 : let block_buf = BlockBuf::new();
613 15728 : let tree_builder = DiskBtreeBuilder::new(block_buf);
614 15728 :
615 15728 : Ok(Self {
616 15728 : conf,
617 15728 : path,
618 15728 : timeline_id,
619 15728 : tenant_id,
620 15728 : key_start,
621 15728 : lsn_range,
622 15728 : tree: tree_builder,
623 15728 : blob_writer,
624 15728 : })
625 15728 : }
626 :
627 : ///
628 : /// Append a key-value pair to the file.
629 : ///
630 : /// The values must be appended in key, lsn order.
631 : ///
632 29577866 : async fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
633 29577757 : self.put_value_bytes(key, lsn, &Value::ser(&val)?, val.will_init())
634 UBC 0 : .await
635 CBC 29577757 : }
636 :
637 82730191 : async fn put_value_bytes(
638 82730191 : &mut self,
639 82730191 : key: Key,
640 82730191 : lsn: Lsn,
641 82730191 : val: &[u8],
642 82730191 : will_init: bool,
643 82730191 : ) -> anyhow::Result<()> {
644 82729950 : assert!(self.lsn_range.start <= lsn);
645 :
646 82729950 : let off = self.blob_writer.write_blob(val).await?;
647 :
648 82729948 : let blob_ref = BlobRef::new(off, will_init);
649 82729948 :
650 82729948 : let delta_key = DeltaKey::from_key_lsn(&key, lsn);
651 82729948 : self.tree.append(&delta_key.0, blob_ref.0)?;
652 :
653 82729948 : Ok(())
654 82729948 : }
655 :
656 1870701 : fn size(&self) -> u64 {
657 1870701 : self.blob_writer.size() + self.tree.borrow_writer().size()
658 1870701 : }
659 :
660 : ///
661 : /// Finish writing the delta layer.
662 : ///
663 15722 : async fn finish(self, key_end: Key) -> anyhow::Result<DeltaLayer> {
664 15721 : let index_start_blk =
665 15721 : ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
666 :
667 15721 : let mut file = self.blob_writer.into_inner().await?;
668 :
669 : // Write out the index
670 15721 : let (index_root_blk, block_buf) = self.tree.finish()?;
671 15721 : file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
672 UBC 0 : .await?;
673 CBC 192976 : for buf in block_buf.blocks {
674 177255 : file.write_all(buf.as_ref()).await?;
675 : }
676 15721 : assert!(self.lsn_range.start < self.lsn_range.end);
677 : // Fill in the summary on blk 0
678 15721 : let summary = Summary {
679 15721 : magic: DELTA_FILE_MAGIC,
680 15721 : format_version: STORAGE_FORMAT_VERSION,
681 15721 : tenant_id: self.tenant_id,
682 15721 : timeline_id: self.timeline_id,
683 15721 : key_range: self.key_start..key_end,
684 15721 : lsn_range: self.lsn_range.clone(),
685 15721 : index_start_blk,
686 15721 : index_root_blk,
687 15721 : };
688 15721 :
689 15721 : let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
690 15721 : Summary::ser_into(&summary, &mut buf)?;
691 15721 : if buf.spilled() {
692 : // This is bad as we only have one free block for the summary
693 UBC 0 : warn!(
694 0 : "Used more than one page size for summary buffer: {}",
695 0 : buf.len()
696 0 : );
697 CBC 15721 : }
698 15721 : file.seek(SeekFrom::Start(0)).await?;
699 15721 : file.write_all(&buf).await?;
700 :
701 15721 : let metadata = file
702 15721 : .metadata()
703 UBC 0 : .await
704 CBC 15721 : .context("get file metadata to determine size")?;
705 :
706 : // 5GB limit for objects without multipart upload (which we don't want to use)
707 : // Make it a little bit below to account for differing GB units
708 : // https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html
709 : const S3_UPLOAD_LIMIT: u64 = 4_500_000_000;
710 15721 : ensure!(
711 15721 : metadata.len() <= S3_UPLOAD_LIMIT,
712 UBC 0 : "Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
713 0 : file.path,
714 0 : metadata.len()
715 : );
716 :
717 : // Note: Because we opened the file in write-only mode, we cannot
718 : // reuse the same VirtualFile for reading later. That's why we don't
719 : // set inner.file here. The first read will have to re-open it.
720 CBC 15721 : let layer = DeltaLayer {
721 15721 : path_or_conf: PathOrConf::Conf(self.conf),
722 15721 : desc: PersistentLayerDesc::new_delta(
723 15721 : self.tenant_id,
724 15721 : self.timeline_id,
725 15721 : self.key_start..key_end,
726 15721 : self.lsn_range.clone(),
727 15721 : metadata.len(),
728 15721 : ),
729 15721 : access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
730 15721 : inner: OnceCell::new(),
731 15721 : };
732 15721 :
733 15721 : // fsync the file
734 15721 : file.sync_all().await?;
735 : // Rename the file to its final name
736 : //
737 : // Note: This overwrites any existing file. There shouldn't be any.
738 : // FIXME: throw an error instead?
739 15721 : let final_path = DeltaLayer::path_for(
740 15721 : &PathOrConf::Conf(self.conf),
741 15721 : &self.tenant_id,
742 15721 : &self.timeline_id,
743 15721 : &DeltaFileName {
744 15721 : key_range: self.key_start..key_end,
745 15721 : lsn_range: self.lsn_range,
746 15721 : },
747 15721 : );
748 15721 : std::fs::rename(self.path, &final_path)?;
749 :
750 UBC 0 : trace!("created delta layer {final_path}");
751 :
752 CBC 15721 : Ok(layer)
753 15721 : }
754 : }
755 :
756 : /// A builder object for constructing a new delta layer.
757 : ///
758 : /// Usage:
759 : ///
760 : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
761 : ///
762 : /// 2. Write the contents by calling `put_value` for every page
763 : /// version to store in the layer.
764 : ///
765 : /// 3. Call `finish`.
766 : ///
767 : /// # Note
768 : ///
769 : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
770 : /// possible for the writer to drop before `finish` is actually called. So this
771 : /// could lead to odd temporary files in the directory, exhausting file system.
772 : /// This structure wraps `DeltaLayerWriterInner` and also contains `Drop`
773 : /// implementation that cleans up the temporary file in failure. It's not
774 : /// possible to do this directly in `DeltaLayerWriterInner` since `finish` moves
775 : /// out some fields, making it impossible to implement `Drop`.
776 : ///
777 : #[must_use]
778 : pub struct DeltaLayerWriter {
779 : inner: Option<DeltaLayerWriterInner>,
780 : }
781 :
782 : impl DeltaLayerWriter {
783 : ///
784 : /// Start building a new delta layer.
785 : ///
786 15728 : pub async fn new(
787 15728 : conf: &'static PageServerConf,
788 15728 : timeline_id: TimelineId,
789 15728 : tenant_id: TenantId,
790 15728 : key_start: Key,
791 15728 : lsn_range: Range<Lsn>,
792 15728 : ) -> anyhow::Result<Self> {
793 : Ok(Self {
794 : inner: Some(
795 15728 : DeltaLayerWriterInner::new(conf, timeline_id, tenant_id, key_start, lsn_range)
796 UBC 0 : .await?,
797 : ),
798 : })
799 CBC 15728 : }
800 :
801 : ///
802 : /// Append a key-value pair to the file.
803 : ///
804 : /// The values must be appended in key, lsn order.
805 : ///
806 29577866 : pub async fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
807 29577757 : self.inner.as_mut().unwrap().put_value(key, lsn, val).await
808 29577757 : }
809 :
810 53152325 : pub async fn put_value_bytes(
811 53152325 : &mut self,
812 53152325 : key: Key,
813 53152325 : lsn: Lsn,
814 53152325 : val: &[u8],
815 53152325 : will_init: bool,
816 53152325 : ) -> anyhow::Result<()> {
817 53152193 : self.inner
818 53152193 : .as_mut()
819 53152193 : .unwrap()
820 53152193 : .put_value_bytes(key, lsn, val, will_init)
821 UBC 0 : .await
822 CBC 53152191 : }
823 :
824 1870701 : pub fn size(&self) -> u64 {
825 1870701 : self.inner.as_ref().unwrap().size()
826 1870701 : }
827 :
828 : ///
829 : /// Finish writing the delta layer.
830 : ///
831 15722 : pub async fn finish(mut self, key_end: Key) -> anyhow::Result<DeltaLayer> {
832 15721 : self.inner.take().unwrap().finish(key_end).await
833 15721 : }
834 : }
835 :
836 : impl Drop for DeltaLayerWriter {
837 : fn drop(&mut self) {
838 15721 : if let Some(inner) = self.inner.take() {
839 UBC 0 : // We want to remove the virtual file here, so it's fine to not
840 0 : // having completely flushed unwritten data.
841 0 : let vfile = inner.blob_writer.into_inner_no_flush();
842 0 : vfile.remove();
843 CBC 15721 : }
844 15721 : }
845 : }
846 :
847 : impl DeltaLayerInner {
848 10807 : pub(super) async fn load(
849 10807 : path: &Utf8Path,
850 10807 : summary: Option<Summary>,
851 10807 : ctx: &RequestContext,
852 10807 : ) -> anyhow::Result<Self> {
853 10807 : let file = VirtualFile::open(path)
854 UBC 0 : .await
855 CBC 10807 : .with_context(|| format!("Failed to open file '{path}'"))?;
856 10807 : let file = FileBlockReader::new(file);
857 :
858 10807 : let summary_blk = file.read_blk(0, ctx).await?;
859 10807 : let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
860 :
861 10807 : if let Some(mut expected_summary) = summary {
862 : // production code path
863 10807 : expected_summary.index_start_blk = actual_summary.index_start_blk;
864 10807 : expected_summary.index_root_blk = actual_summary.index_root_blk;
865 10807 : if actual_summary != expected_summary {
866 10 : bail!(
867 10 : "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
868 10 : actual_summary,
869 10 : expected_summary
870 10 : );
871 10797 : }
872 UBC 0 : }
873 :
874 CBC 10797 : Ok(DeltaLayerInner {
875 10797 : file,
876 10797 : index_start_blk: actual_summary.index_start_blk,
877 10797 : index_root_blk: actual_summary.index_root_blk,
878 10797 : })
879 10807 : }
880 :
881 23419360 : pub(super) async fn get_value_reconstruct_data(
882 23419360 : &self,
883 23419360 : key: Key,
884 23419360 : lsn_range: Range<Lsn>,
885 23419360 : reconstruct_state: &mut ValueReconstructState,
886 23419360 : ctx: &RequestContext,
887 23419360 : ) -> anyhow::Result<ValueReconstructResult> {
888 23419330 : let mut need_image = true;
889 23419330 : // Scan the page versions backwards, starting from `lsn`.
890 23419330 : let file = &self.file;
891 23419330 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
892 23419330 : self.index_start_blk,
893 23419330 : self.index_root_blk,
894 23419330 : file,
895 23419330 : );
896 23419330 : let search_key = DeltaKey::from_key_lsn(&key, Lsn(lsn_range.end.0 - 1));
897 23419330 :
898 23419330 : let mut offsets: Vec<(Lsn, u64)> = Vec::new();
899 23419330 :
900 23419330 : tree_reader
901 23419330 : .visit(
902 23419330 : &search_key.0,
903 23419330 : VisitDirection::Backwards,
904 140907650 : |key, value| {
905 140907650 : let blob_ref = BlobRef(value);
906 140907650 : if key[..KEY_SIZE] != search_key.0[..KEY_SIZE] {
907 8784530 : return false;
908 132123120 : }
909 132123120 : let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
910 132123120 : if entry_lsn < lsn_range.start {
911 689871 : return false;
912 131433249 : }
913 131433249 : offsets.push((entry_lsn, blob_ref.pos()));
914 131433249 :
915 131433249 : !blob_ref.will_init()
916 140907650 : },
917 23419330 : &RequestContextBuilder::extend(ctx)
918 23419330 : .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
919 23419330 : .build(),
920 23419330 : )
921 277660 : .await?;
922 :
923 23419337 : let ctx = &RequestContextBuilder::extend(ctx)
924 23419337 : .page_content_kind(PageContentKind::DeltaLayerValue)
925 23419337 : .build();
926 23419337 :
927 23419337 : // Ok, 'offsets' now contains the offsets of all the entries we need to read
928 23419337 : let cursor = file.block_cursor();
929 23419337 : let mut buf = Vec::new();
930 152135744 : for (entry_lsn, pos) in offsets {
931 131432736 : cursor
932 131432736 : .read_blob_into_buf(pos, &mut buf, ctx)
933 1840384 : .await
934 131432732 : .with_context(|| {
935 UBC 0 : format!("Failed to read blob from virtual file {}", file.file.path)
936 CBC 131432732 : })?;
937 131432732 : let val = Value::des(&buf).with_context(|| {
938 UBC 0 : format!(
939 0 : "Failed to deserialize file blob from virtual file {}",
940 0 : file.file.path
941 0 : )
942 CBC 131432732 : })?;
943 131432732 : match val {
944 1434401 : Value::Image(img) => {
945 1434401 : reconstruct_state.img = Some((entry_lsn, img));
946 1434401 : need_image = false;
947 1434401 : break;
948 : }
949 129998331 : Value::WalRecord(rec) => {
950 129998331 : let will_init = rec.will_init();
951 129998331 : reconstruct_state.records.push((entry_lsn, rec));
952 129998331 : if will_init {
953 : // This WAL record initializes the page, so no need to go further back
954 1281924 : need_image = false;
955 1281924 : break;
956 128716407 : }
957 : }
958 : }
959 : }
960 :
961 : // If an older page image is needed to reconstruct the page, let the
962 : // caller know.
963 23419333 : if need_image {
964 20703008 : Ok(ValueReconstructResult::Continue)
965 : } else {
966 2716325 : Ok(ValueReconstructResult::Complete)
967 : }
968 23419333 : }
969 :
970 4335 : pub(super) async fn load_keys<'a, 'b, T: AsRef<DeltaLayerInner> + Clone>(
971 4335 : this: &'a T,
972 4335 : ctx: &'b RequestContext,
973 4335 : ) -> Result<Vec<DeltaEntry<'a>>> {
974 4335 : let dl = this.as_ref();
975 4335 : let file = &dl.file;
976 4335 :
977 4335 : let tree_reader =
978 4335 : DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(dl.index_start_blk, dl.index_root_blk, file);
979 4335 :
980 4335 : let mut all_keys: Vec<DeltaEntry<'_>> = Vec::new();
981 4335 :
982 4335 : tree_reader
983 4335 : .visit(
984 4335 : &[0u8; DELTA_KEY_SIZE],
985 4335 : VisitDirection::Forwards,
986 30235446 : |key, value| {
987 30235446 : let delta_key = DeltaKey::from_slice(key);
988 30235446 : let val_ref = ValueRef {
989 30235446 : blob_ref: BlobRef(value),
990 30235446 : reader: BlockCursor::new(crate::tenant::block_io::BlockReaderRef::Adapter(
991 30235446 : Adapter(dl),
992 30235446 : )),
993 30235446 : };
994 30235446 : let pos = BlobRef(value).pos();
995 30235446 : if let Some(last) = all_keys.last_mut() {
996 30231111 : // subtract offset of the current and last entries to get the size
997 30231111 : // of the value associated with this (key, lsn) tuple
998 30231111 : let first_pos = last.size;
999 30231111 : last.size = pos - first_pos;
1000 30231111 : }
1001 30235446 : let entry = DeltaEntry {
1002 30235446 : key: delta_key.key(),
1003 30235446 : lsn: delta_key.lsn(),
1004 30235446 : size: pos,
1005 30235446 : val: val_ref,
1006 30235446 : };
1007 30235446 : all_keys.push(entry);
1008 30235446 : true
1009 30235446 : },
1010 4335 : &RequestContextBuilder::extend(ctx)
1011 4335 : .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
1012 4335 : .build(),
1013 4335 : )
1014 1504 : .await?;
1015 4335 : if let Some(last) = all_keys.last_mut() {
1016 4335 : // Last key occupies all space till end of value storage,
1017 4335 : // which corresponds to beginning of the index
1018 4335 : last.size = dl.index_start_blk as u64 * PAGE_SZ as u64 - last.size;
1019 4335 : }
1020 4335 : Ok(all_keys)
1021 4335 : }
1022 : }
1023 :
/// A set of data associated with a delta layer key and its value
///
/// `load_keys` builds one of these for every entry it visits in the layer's
/// index.
pub struct DeltaEntry<'a> {
    /// Key part of the index entry
    pub key: Key,
    /// LSN part of the index entry
    pub lsn: Lsn,
    /// Size of the stored value
    ///
    /// As filled in by `load_keys`, this is the distance from this value's
    /// offset to the offset of the next entry, or, for the last entry, to the
    /// start of the index.
    pub size: u64,
    /// Reference to the on-disk value
    pub val: ValueRef<'a>,
}
1033 :
1034 : /// Reference to an on-disk value
1035 : pub struct ValueRef<'a> {
1036 : blob_ref: BlobRef,
1037 : reader: BlockCursor<'a>,
1038 : }
1039 :
1040 : impl<'a> ValueRef<'a> {
1041 : /// Loads the value from disk
1042 29577867 : pub async fn load(&self, ctx: &RequestContext) -> Result<Value> {
1043 : // theoretically we *could* record an access time for each, but it does not really matter
1044 29577759 : let buf = self.reader.read_blob(self.blob_ref.pos(), ctx).await?;
1045 29577759 : let val = Value::des(&buf)?;
1046 29577759 : Ok(val)
1047 29577759 : }
1048 : }
1049 :
1050 : pub(crate) struct Adapter<T>(T);
1051 :
1052 : impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
1053 31066841 : pub(crate) async fn read_blk(
1054 31066841 : &self,
1055 31066841 : blknum: u32,
1056 31066841 : ctx: &RequestContext,
1057 31066841 : ) -> Result<BlockLease, std::io::Error> {
1058 31066841 : self.0.as_ref().file.read_blk(blknum, ctx).await
1059 31066841 : }
1060 : }
|