Line data Source code
1 : //! A DeltaLayer represents a collection of WAL records or page images in a range of
2 : //! LSNs, and in a range of Keys. It is stored in a file on disk.
3 : //!
4 : //! Usually a delta layer only contains differences, in the form of WAL records
5 : //! against a base LSN. However, if a relation extended or a whole new relation
6 : //! is created, there would be no base for the new pages. The entries for them
7 : //! must be page images or WAL records with the 'will_init' flag set, so that
8 : //! they can be replayed without referring to an older page version.
9 : //!
10 : //! The delta files are stored in the `timelines/<timeline_id>` directory. Currently,
11 : //! there are no subdirectories, and each delta file is named like this:
12 : //!
13 : //! ```text
14 : //! <key start>-<key end>__<start LSN>-<end LSN>
15 : //! ```
16 : //!
17 : //! For example:
18 : //!
19 : //! ```text
20 : //! 000000067F000032BE0000400000000020B6-000000067F000032BE0000400000000030B6__000000578C6B29-0000000057A50051
21 : //! ```
22 : //!
23 : //! Every delta file consists of three parts: "summary", "index", and
24 : //! "values". The summary is a fixed size header at the beginning of the file,
25 : //! and it contains basic information about the layer, and offsets to the other
26 : //! parts. The "index" is a B-tree, mapping from Key and LSN to an offset in the
27 : //! "values" part. The actual page images and WAL records are stored in the
28 : //! "values" part.
29 : //!
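//!
//! For illustration only (not part of the original source): a minimal sketch of
//! splitting a delta file name of the form above back into its components. The
//! helper name `parse_delta_file_name` is hypothetical; the pageserver's real
//! file-name parsing lives elsewhere.
//!
//! ```
//! fn parse_delta_file_name(name: &str) -> Option<(&str, &str, u64, u64)> {
//!     // "<key start>-<key end>__<start LSN>-<end LSN>", LSNs are hex-encoded
//!     let (keys, lsns) = name.split_once("__")?;
//!     let (key_start, key_end) = keys.split_once('-')?;
//!     let (lsn_start, lsn_end) = lsns.split_once('-')?;
//!     Some((
//!         key_start,
//!         key_end,
//!         u64::from_str_radix(lsn_start, 16).ok()?,
//!         u64::from_str_radix(lsn_end, 16).ok()?,
//!     ))
//! }
//!
//! assert!(parse_delta_file_name(
//!     "000000067F000032BE0000400000000020B6-000000067F000032BE0000400000000030B6__000000578C6B29-0000000057A50051"
//! ).is_some());
//! ```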
30 : use crate::config::PageServerConf;
31 : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
32 : use crate::page_cache::{self, FileId, PAGE_SZ};
33 : use crate::repository::{Key, Value, KEY_SIZE};
34 : use crate::tenant::blob_io::BlobWriter;
35 : use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
36 : use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
37 : use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
38 : use crate::tenant::timeline::GetVectoredError;
39 : use crate::tenant::vectored_blob_io::{
40 : BlobFlag, MaxVectoredReadBytes, VectoredBlobReader, VectoredRead, VectoredReadPlanner,
41 : };
42 : use crate::tenant::{PageReconstructError, Timeline};
43 : use crate::virtual_file::{self, VirtualFile};
44 : use crate::{walrecord, TEMP_FILE_SUFFIX};
45 : use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
46 : use anyhow::{anyhow, bail, ensure, Context, Result};
47 : use bytes::BytesMut;
48 : use camino::{Utf8Path, Utf8PathBuf};
49 : use pageserver_api::keyspace::KeySpace;
50 : use pageserver_api::models::LayerAccessKind;
51 : use pageserver_api::shard::TenantShardId;
52 : use rand::{distributions::Alphanumeric, Rng};
53 : use serde::{Deserialize, Serialize};
54 : use std::fs::File;
55 : use std::io::SeekFrom;
56 : use std::ops::Range;
57 : use std::os::unix::fs::FileExt;
58 : use std::sync::Arc;
59 : use tokio::sync::OnceCell;
60 : use tracing::*;
61 :
62 : use utils::{
63 : bin_ser::BeSer,
64 : id::{TenantId, TimelineId},
65 : lsn::Lsn,
66 : };
67 :
68 : use super::{
69 : AsLayerDesc, LayerAccessStats, PersistentLayerDesc, ResidentLayer, ValuesReconstructState,
70 : };
71 :
72 : ///
73 : /// Header stored at the beginning of the file
74 : ///
75 : /// After this comes the 'values' part, starting on block 1. After that,
76 : /// the 'index' starts at the block indicated by 'index_start_blk'
77 : ///
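///
/// A rough sketch of the block layout this header describes (blocks are
/// `PAGE_SZ` bytes):
///
/// ```text
/// block 0                     : Summary (this header)
/// blocks 1 .. index_start_blk : "values" (page images / WAL records)
/// blocks index_start_blk ..   : "index" (B-tree; root page at index_root_blk)
/// ```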
78 480 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
79 : pub struct Summary {
80 : /// Magic value to identify this as a neon delta file. Always DELTA_FILE_MAGIC.
81 : pub magic: u16,
82 : pub format_version: u16,
83 :
84 : pub tenant_id: TenantId,
85 : pub timeline_id: TimelineId,
86 : pub key_range: Range<Key>,
87 : pub lsn_range: Range<Lsn>,
88 :
89 : /// Block number where the 'index' part of the file begins.
90 : pub index_start_blk: u32,
91 : /// Block within the 'index', where the B-tree root page is stored
92 : pub index_root_blk: u32,
93 : }
94 :
95 : impl From<&DeltaLayer> for Summary {
96 0 : fn from(layer: &DeltaLayer) -> Self {
97 0 : Self::expected(
98 0 : layer.desc.tenant_shard_id.tenant_id,
99 0 : layer.desc.timeline_id,
100 0 : layer.desc.key_range.clone(),
101 0 : layer.desc.lsn_range.clone(),
102 0 : )
103 0 : }
104 : }
105 :
106 : impl Summary {
107 440 : pub(super) fn expected(
108 440 : tenant_id: TenantId,
109 440 : timeline_id: TimelineId,
110 440 : keys: Range<Key>,
111 440 : lsns: Range<Lsn>,
112 440 : ) -> Self {
113 440 : Self {
114 440 : magic: DELTA_FILE_MAGIC,
115 440 : format_version: STORAGE_FORMAT_VERSION,
116 440 :
117 440 : tenant_id,
118 440 : timeline_id,
119 440 : key_range: keys,
120 440 : lsn_range: lsns,
121 440 :
122 440 : index_start_blk: 0,
123 440 : index_root_blk: 0,
124 440 : }
125 440 : }
126 : }
127 :
128 : // Flag indicating that this version initializes the page
129 : const WILL_INIT: u64 = 1;
130 :
131 : /// Struct representing a reference to a BLOB in the layer file. The reference
132 : /// contains the BLOB offset, and for WAL records it also carries the `will_init`
133 : /// flag. The flag makes it possible to determine the range of records that need
134 : /// to be applied without reading/deserializing the records themselves.
135 0 : #[derive(Debug, Serialize, Deserialize, Copy, Clone)]
136 : pub struct BlobRef(pub u64);
137 :
138 : impl BlobRef {
139 72401 : pub fn will_init(&self) -> bool {
140 72401 : (self.0 & WILL_INIT) != 0
141 72401 : }
142 :
143 4276417 : pub fn pos(&self) -> u64 {
144 4276417 : self.0 >> 1
145 4276417 : }
146 :
147 4311466 : pub fn new(pos: u64, will_init: bool) -> BlobRef {
148 4311466 : let mut blob_ref = pos << 1;
149 4311466 : if will_init {
150 4311466 : blob_ref |= WILL_INIT;
151 4311466 : }
152 4311466 : BlobRef(blob_ref)
153 4311466 : }
154 : }
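
// Illustration only (not part of the original source): the encoding above packs
// the blob offset into the upper bits and `will_init` into the lowest bit, so a
// round trip behaves like this:
//
//     let r = BlobRef::new(4096, true);
//     assert_eq!(r.pos(), 4096);
//     assert!(r.will_init());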
155 :
156 : pub const DELTA_KEY_SIZE: usize = KEY_SIZE + 8;
157 : struct DeltaKey([u8; DELTA_KEY_SIZE]);
158 :
159 : /// This is the key of the B-tree index stored in the delta layer. It consists
160 : /// of the serialized representation of a Key and LSN.
161 : impl DeltaKey {
162 2102008 : fn from_slice(buf: &[u8]) -> Self {
163 2102008 : let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
164 2102008 : bytes.copy_from_slice(buf);
165 2102008 : DeltaKey(bytes)
166 2102008 : }
167 :
168 4434992 : fn from_key_lsn(key: &Key, lsn: Lsn) -> Self {
169 4434992 : let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
170 4434992 : key.write_to_byte_slice(&mut bytes[0..KEY_SIZE]);
171 4434992 : bytes[KEY_SIZE..].copy_from_slice(&u64::to_be_bytes(lsn.0));
172 4434992 : DeltaKey(bytes)
173 4434992 : }
174 :
175 2102008 : fn key(&self) -> Key {
176 2102008 : Key::from_slice(&self.0)
177 2102008 : }
178 :
179 2102008 : fn lsn(&self) -> Lsn {
180 2102008 : Lsn(u64::from_be_bytes(self.0[KEY_SIZE..].try_into().unwrap()))
181 2102008 : }
182 :
183 72401 : fn extract_lsn_from_buf(buf: &[u8]) -> Lsn {
184 72401 : let mut lsn_buf = [0u8; 8];
185 72401 : lsn_buf.copy_from_slice(&buf[KEY_SIZE..]);
186 72401 : Lsn(u64::from_be_bytes(lsn_buf))
187 72401 : }
188 : }
189 :
190 : /// This is used only from `pagectl`. Within pageserver, all layers are
191 : /// [`crate::tenant::storage_layer::Layer`], which can hold a [`DeltaLayerInner`].
192 : pub struct DeltaLayer {
193 : path: Utf8PathBuf,
194 : pub desc: PersistentLayerDesc,
195 : access_stats: LayerAccessStats,
196 : inner: OnceCell<Arc<DeltaLayerInner>>,
197 : }
198 :
199 : impl std::fmt::Debug for DeltaLayer {
200 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
201 0 : use super::RangeDisplayDebug;
202 0 :
203 0 : f.debug_struct("DeltaLayer")
204 0 : .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
205 0 : .field("lsn_range", &self.desc.lsn_range)
206 0 : .field("file_size", &self.desc.file_size)
207 0 : .field("inner", &self.inner)
208 0 : .finish()
209 0 : }
210 : }
211 :
212 : /// `DeltaLayerInner` is the in-memory data structure associated with an on-disk delta
213 : /// file.
214 : pub struct DeltaLayerInner {
215 : // values copied from summary
216 : index_start_blk: u32,
217 : index_root_blk: u32,
218 :
219 : file: VirtualFile,
220 : file_id: FileId,
221 :
222 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
223 : }
224 :
225 : impl std::fmt::Debug for DeltaLayerInner {
226 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
227 0 : f.debug_struct("DeltaLayerInner")
228 0 : .field("index_start_blk", &self.index_start_blk)
229 0 : .field("index_root_blk", &self.index_root_blk)
230 0 : .finish()
231 0 : }
232 : }
233 :
234 : /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
235 : impl std::fmt::Display for DeltaLayer {
236 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
237 0 : write!(f, "{}", self.layer_desc().short_id())
238 0 : }
239 : }
240 :
241 : impl AsLayerDesc for DeltaLayer {
242 0 : fn layer_desc(&self) -> &PersistentLayerDesc {
243 0 : &self.desc
244 0 : }
245 : }
246 :
247 : impl DeltaLayer {
248 0 : pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
249 0 : self.desc.dump();
250 0 :
251 0 : if !verbose {
252 0 : return Ok(());
253 0 : }
254 :
255 0 : let inner = self.load(LayerAccessKind::Dump, ctx).await?;
256 :
257 0 : inner.dump(ctx).await
258 0 : }
259 :
260 480 : fn temp_path_for(
261 480 : conf: &PageServerConf,
262 480 : tenant_shard_id: &TenantShardId,
263 480 : timeline_id: &TimelineId,
264 480 : key_start: Key,
265 480 : lsn_range: &Range<Lsn>,
266 480 : ) -> Utf8PathBuf {
267 480 : let rand_string: String = rand::thread_rng()
268 480 : .sample_iter(&Alphanumeric)
269 480 : .take(8)
270 480 : .map(char::from)
271 480 : .collect();
272 480 :
273 480 : conf.timeline_path(tenant_shard_id, timeline_id)
274 480 : .join(format!(
275 480 : "{}-XXX__{:016X}-{:016X}.{}.{}",
276 480 : key_start,
277 480 : u64::from(lsn_range.start),
278 480 : u64::from(lsn_range.end),
279 480 : rand_string,
280 480 : TEMP_FILE_SUFFIX,
281 480 : ))
282 480 : }
283 :
284 : ///
285 : /// Open the underlying file and read the metadata into memory, if it's
286 : /// not loaded already.
287 : ///
288 0 : async fn load(
289 0 : &self,
290 0 : access_kind: LayerAccessKind,
291 0 : ctx: &RequestContext,
292 0 : ) -> Result<&Arc<DeltaLayerInner>> {
293 0 : self.access_stats.record_access(access_kind, ctx);
294 0 : // Quick exit if already loaded
295 0 : self.inner
296 0 : .get_or_try_init(|| self.load_inner(ctx))
297 0 : .await
298 0 : .with_context(|| format!("Failed to load delta layer {}", self.path()))
299 0 : }
300 :
301 0 : async fn load_inner(&self, ctx: &RequestContext) -> Result<Arc<DeltaLayerInner>> {
302 0 : let path = self.path();
303 :
304 0 : let loaded = DeltaLayerInner::load(&path, None, None, ctx)
305 0 : .await
306 0 : .and_then(|res| res)?;
307 :
308 : // not production code
309 0 : let actual_filename = path.file_name().unwrap().to_owned();
310 0 : let expected_filename = self.layer_desc().filename().file_name();
311 0 :
312 0 : if actual_filename != expected_filename {
313 0 : println!("warning: filename does not match what is expected from in-file summary");
314 0 : println!("actual: {:?}", actual_filename);
315 0 : println!("expected: {:?}", expected_filename);
316 0 : }
317 :
318 0 : Ok(Arc::new(loaded))
319 0 : }
320 :
321 : /// Create a DeltaLayer struct representing an existing file on disk.
322 : ///
323 : /// This variant is only used for debugging purposes, by the 'pagectl' binary.
324 0 : pub fn new_for_path(path: &Utf8Path, file: File) -> Result<Self> {
325 0 : let mut summary_buf = vec![0; PAGE_SZ];
326 0 : file.read_exact_at(&mut summary_buf, 0)?;
327 0 : let summary = Summary::des_prefix(&summary_buf)?;
328 :
329 0 : let metadata = file
330 0 : .metadata()
331 0 : .context("get file metadata to determine size")?;
332 :
333 : // This function is never used for constructing layers in a running pageserver,
334 : // so it does not need an accurate TenantShardId.
335 0 : let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
336 0 :
337 0 : Ok(DeltaLayer {
338 0 : path: path.to_path_buf(),
339 0 : desc: PersistentLayerDesc::new_delta(
340 0 : tenant_shard_id,
341 0 : summary.timeline_id,
342 0 : summary.key_range,
343 0 : summary.lsn_range,
344 0 : metadata.len(),
345 0 : ),
346 0 : access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
347 0 : inner: OnceCell::new(),
348 0 : })
349 0 : }
350 :
351 : /// Path to the layer file in pageserver workdir.
352 0 : fn path(&self) -> Utf8PathBuf {
353 0 : self.path.clone()
354 0 : }
355 : }
356 :
357 : /// A builder object for constructing a new delta layer.
358 : ///
359 : /// Usage:
360 : ///
361 : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
362 : ///
363 : /// 2. Write the contents by calling `put_value` for every page
364 : /// version to store in the layer.
365 : ///
366 : /// 3. Call `finish`.
367 : ///
368 : struct DeltaLayerWriterInner {
369 : conf: &'static PageServerConf,
370 : pub path: Utf8PathBuf,
371 : timeline_id: TimelineId,
372 : tenant_shard_id: TenantShardId,
373 :
374 : key_start: Key,
375 : lsn_range: Range<Lsn>,
376 :
377 : tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>,
378 :
379 : blob_writer: BlobWriter<true>,
380 : }
381 :
382 : impl DeltaLayerWriterInner {
383 : ///
384 : /// Start building a new delta layer.
385 : ///
386 480 : async fn new(
387 480 : conf: &'static PageServerConf,
388 480 : timeline_id: TimelineId,
389 480 : tenant_shard_id: TenantShardId,
390 480 : key_start: Key,
391 480 : lsn_range: Range<Lsn>,
392 480 : ) -> anyhow::Result<Self> {
393 480 : // Create the file initially with a temporary filename. We don't know
394 480 : // the end key yet, so we cannot form the final filename yet. We will
395 480 : // rename it when we're done.
396 480 : //
397 480 : // Note: This overwrites any existing file. There shouldn't be any.
398 480 : // FIXME: throw an error instead?
399 480 : let path =
400 480 : DeltaLayer::temp_path_for(conf, &tenant_shard_id, &timeline_id, key_start, &lsn_range);
401 :
402 480 : let mut file = VirtualFile::create(&path).await?;
403 : // make room for the header block
404 480 : file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
405 480 : let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
406 480 :
407 480 : // Initialize the b-tree index builder
408 480 : let block_buf = BlockBuf::new();
409 480 : let tree_builder = DiskBtreeBuilder::new(block_buf);
410 480 :
411 480 : Ok(Self {
412 480 : conf,
413 480 : path,
414 480 : timeline_id,
415 480 : tenant_shard_id,
416 480 : key_start,
417 480 : lsn_range,
418 480 : tree: tree_builder,
419 480 : blob_writer,
420 480 : })
421 480 : }
422 :
423 : ///
424 : /// Append a key-value pair to the file.
425 : ///
426 : /// The values must be appended in key, lsn order.
427 : ///
428 2102000 : async fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
429 2102000 : let (_, res) = self
430 2102000 : .put_value_bytes(key, lsn, Value::ser(&val)?, val.will_init())
431 153 : .await;
432 2102000 : res
433 2102000 : }
434 :
435 4311466 : async fn put_value_bytes(
436 4311466 : &mut self,
437 4311466 : key: Key,
438 4311466 : lsn: Lsn,
439 4311466 : val: Vec<u8>,
440 4311466 : will_init: bool,
441 4311466 : ) -> (Vec<u8>, anyhow::Result<()>) {
442 4311466 : assert!(self.lsn_range.start <= lsn);
443 4311466 : let (val, res) = self.blob_writer.write_blob(val).await;
444 4311466 : let off = match res {
445 4311466 : Ok(off) => off,
446 0 : Err(e) => return (val, Err(anyhow::anyhow!(e))),
447 : };
448 :
449 4311466 : let blob_ref = BlobRef::new(off, will_init);
450 4311466 :
451 4311466 : let delta_key = DeltaKey::from_key_lsn(&key, lsn);
452 4311466 : let res = self.tree.append(&delta_key.0, blob_ref.0);
453 4311466 : (val, res.map_err(|e| anyhow::anyhow!(e)))
454 4311466 : }
455 :
456 2009970 : fn size(&self) -> u64 {
457 2009970 : self.blob_writer.size() + self.tree.borrow_writer().size()
458 2009970 : }
459 :
460 : ///
461 : /// Finish writing the delta layer.
462 : ///
463 480 : async fn finish(self, key_end: Key, timeline: &Arc<Timeline>) -> anyhow::Result<ResidentLayer> {
464 480 : let index_start_blk =
465 480 : ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
466 :
467 480 : let mut file = self.blob_writer.into_inner().await?;
468 :
469 : // Write out the index
470 480 : let (index_root_blk, block_buf) = self.tree.finish()?;
471 480 : file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
472 0 : .await?;
473 9119 : for buf in block_buf.blocks {
474 8639 : let (_buf, res) = file.write_all(buf).await;
475 8639 : res?;
476 : }
477 480 : assert!(self.lsn_range.start < self.lsn_range.end);
478 : // Fill in the summary on blk 0
479 480 : let summary = Summary {
480 480 : magic: DELTA_FILE_MAGIC,
481 480 : format_version: STORAGE_FORMAT_VERSION,
482 480 : tenant_id: self.tenant_shard_id.tenant_id,
483 480 : timeline_id: self.timeline_id,
484 480 : key_range: self.key_start..key_end,
485 480 : lsn_range: self.lsn_range.clone(),
486 480 : index_start_blk,
487 480 : index_root_blk,
488 480 : };
489 480 :
490 480 : let mut buf = Vec::with_capacity(PAGE_SZ);
491 480 : // TODO: could use smallvec here but it's a pain with Slice<T>
492 480 : Summary::ser_into(&summary, &mut buf)?;
493 480 : file.seek(SeekFrom::Start(0)).await?;
494 480 : let (_buf, res) = file.write_all(buf).await;
495 480 : res?;
496 :
497 480 : let metadata = file
498 480 : .metadata()
499 0 : .await
500 480 : .context("get file metadata to determine size")?;
501 :
502 : // 5GB limit for objects without multipart upload (which we don't want to use)
503 : // Make it a little bit below to account for differing GB units
504 : // https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html
505 : const S3_UPLOAD_LIMIT: u64 = 4_500_000_000;
506 480 : ensure!(
507 480 : metadata.len() <= S3_UPLOAD_LIMIT,
508 0 : "Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
509 0 : file.path,
510 0 : metadata.len()
511 : );
512 :
513 : // Note: Because we opened the file in write-only mode, we cannot
514 : // reuse the same VirtualFile for reading later. That's why we don't
515 : // set inner.file here. The first read will have to re-open it.
516 :
517 480 : let desc = PersistentLayerDesc::new_delta(
518 480 : self.tenant_shard_id,
519 480 : self.timeline_id,
520 480 : self.key_start..key_end,
521 480 : self.lsn_range.clone(),
522 480 : metadata.len(),
523 480 : );
524 480 :
525 480 : // fsync the file
526 480 : file.sync_all().await?;
527 :
528 480 : let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
529 :
530 0 : trace!("created delta layer {}", layer.local_path());
531 :
532 480 : Ok(layer)
533 480 : }
534 : }
535 :
536 : /// A builder object for constructing a new delta layer.
537 : ///
538 : /// Usage:
539 : ///
540 : /// 1. Create the DeltaLayerWriter by calling DeltaLayerWriter::new(...)
541 : ///
542 : /// 2. Write the contents by calling `put_value` for every page
543 : /// version to store in the layer.
544 : ///
545 : /// 3. Call `finish`.
546 : ///
547 : /// # Note
548 : ///
549 : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
550 : /// possible for the writer to be dropped before `finish` is actually called. That
551 : /// could leave odd temporary files in the directory, exhausting file system space.
552 : /// This structure wraps `DeltaLayerWriterInner` and also contains a `Drop`
553 : /// implementation that cleans up the temporary file on failure. It's not
554 : /// possible to do this directly in `DeltaLayerWriterInner`, since `finish` moves
555 : /// out some fields, making it impossible to implement `Drop`.
556 : ///
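///
/// # Usage sketch
///
/// Illustration only: a hypothetical call site following the three steps above.
/// The surrounding variables (`conf`, `deltas`, `timeline`, ...) are assumptions,
/// not part of this module.
///
/// ```ignore
/// let mut writer =
///     DeltaLayerWriter::new(conf, timeline_id, tenant_shard_id, key_start, lsn_range).await?;
/// for (key, lsn, value) in deltas {
///     // Values must be appended in (key, lsn) order.
///     writer.put_value(key, lsn, value).await?;
/// }
/// let resident_layer = writer.finish(key_end, &timeline).await?;
/// ```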
557 : #[must_use]
558 : pub struct DeltaLayerWriter {
559 : inner: Option<DeltaLayerWriterInner>,
560 : }
561 :
562 : impl DeltaLayerWriter {
563 : ///
564 : /// Start building a new delta layer.
565 : ///
566 480 : pub async fn new(
567 480 : conf: &'static PageServerConf,
568 480 : timeline_id: TimelineId,
569 480 : tenant_shard_id: TenantShardId,
570 480 : key_start: Key,
571 480 : lsn_range: Range<Lsn>,
572 480 : ) -> anyhow::Result<Self> {
573 480 : Ok(Self {
574 480 : inner: Some(
575 480 : DeltaLayerWriterInner::new(
576 480 : conf,
577 480 : timeline_id,
578 480 : tenant_shard_id,
579 480 : key_start,
580 480 : lsn_range,
581 480 : )
582 282 : .await?,
583 : ),
584 : })
585 480 : }
586 :
587 : ///
588 : /// Append a key-value pair to the file.
589 : ///
590 : /// The values must be appended in key, lsn order.
591 : ///
592 2102000 : pub async fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> anyhow::Result<()> {
593 2102000 : self.inner.as_mut().unwrap().put_value(key, lsn, val).await
594 2102000 : }
595 :
596 2209466 : pub async fn put_value_bytes(
597 2209466 : &mut self,
598 2209466 : key: Key,
599 2209466 : lsn: Lsn,
600 2209466 : val: Vec<u8>,
601 2209466 : will_init: bool,
602 2209466 : ) -> (Vec<u8>, anyhow::Result<()>) {
603 2209466 : self.inner
604 2209466 : .as_mut()
605 2209466 : .unwrap()
606 2209466 : .put_value_bytes(key, lsn, val, will_init)
607 174 : .await
608 2209466 : }
609 :
610 2009970 : pub fn size(&self) -> u64 {
611 2009970 : self.inner.as_ref().unwrap().size()
612 2009970 : }
613 :
614 : ///
615 : /// Finish writing the delta layer.
616 : ///
617 480 : pub(crate) async fn finish(
618 480 : mut self,
619 480 : key_end: Key,
620 480 : timeline: &Arc<Timeline>,
621 480 : ) -> anyhow::Result<ResidentLayer> {
622 480 : let inner = self.inner.take().unwrap();
623 480 : let temp_path = inner.path.clone();
624 480 : let result = inner.finish(key_end, timeline).await;
625 : // The delta layer files can sometimes be really large. Clean them up.
626 480 : if result.is_err() {
627 0 : tracing::warn!(
628 0 : "Cleaning up temporary delta file {temp_path} after error during writing"
629 0 : );
630 0 : if let Err(e) = std::fs::remove_file(&temp_path) {
631 0 : tracing::warn!("Error cleaning up temporary delta layer file {temp_path}: {e:?}")
632 0 : }
633 480 : }
634 480 : result
635 480 : }
636 : }
637 :
638 : impl Drop for DeltaLayerWriter {
639 480 : fn drop(&mut self) {
640 480 : if let Some(inner) = self.inner.take() {
641 0 : // We want to remove the virtual file here, so it's fine that any
642 0 : // unwritten data has not been completely flushed.
643 0 : let vfile = inner.blob_writer.into_inner_no_flush();
644 0 : vfile.remove();
645 480 : }
646 480 : }
647 : }
648 :
649 0 : #[derive(thiserror::Error, Debug)]
650 : pub enum RewriteSummaryError {
651 : #[error("magic mismatch")]
652 : MagicMismatch,
653 : #[error(transparent)]
654 : Other(#[from] anyhow::Error),
655 : }
656 :
657 : impl From<std::io::Error> for RewriteSummaryError {
658 0 : fn from(e: std::io::Error) -> Self {
659 0 : Self::Other(anyhow::anyhow!(e))
660 0 : }
661 : }
662 :
663 : impl DeltaLayer {
664 0 : pub async fn rewrite_summary<F>(
665 0 : path: &Utf8Path,
666 0 : rewrite: F,
667 0 : ctx: &RequestContext,
668 0 : ) -> Result<(), RewriteSummaryError>
669 0 : where
670 0 : F: Fn(Summary) -> Summary,
671 0 : {
672 0 : let mut file = VirtualFile::open_with_options(
673 0 : path,
674 0 : virtual_file::OpenOptions::new().read(true).write(true),
675 0 : )
676 0 : .await
677 0 : .with_context(|| format!("Failed to open file '{}'", path))?;
678 0 : let file_id = page_cache::next_file_id();
679 0 : let block_reader = FileBlockReader::new(&file, file_id);
680 0 : let summary_blk = block_reader.read_blk(0, ctx).await?;
681 0 : let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
682 0 : if actual_summary.magic != DELTA_FILE_MAGIC {
683 0 : return Err(RewriteSummaryError::MagicMismatch);
684 0 : }
685 0 :
686 0 : let new_summary = rewrite(actual_summary);
687 0 :
688 0 : let mut buf = Vec::with_capacity(PAGE_SZ);
689 0 : // TODO: could use smallvec here, but it's a pain with Slice<T>
690 0 : Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
691 0 : file.seek(SeekFrom::Start(0)).await?;
692 0 : let (_buf, res) = file.write_all(buf).await;
693 0 : res?;
694 0 : Ok(())
695 0 : }
696 : }
697 :
698 : impl DeltaLayerInner {
699 : /// Returns a nested result, following the `Result<Result<_, OpErr>, Critical>` convention:
700 : /// - the inner result carries success or a transient failure
701 : /// - the outer result carries a permanent failure
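///
/// A hypothetical call site (illustration only) would therefore match on both
/// layers of the result:
///
/// ```ignore
/// match DeltaLayerInner::load(path, Some(expected_summary), None, ctx).await {
///     Ok(Ok(inner)) => { /* loaded successfully */ }
///     Ok(Err(transient)) => { /* e.g. could not open or read the file */ }
///     Err(permanent) => { /* e.g. the summary could not be deserialized */ }
/// }
/// ```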
702 440 : pub(super) async fn load(
703 440 : path: &Utf8Path,
704 440 : summary: Option<Summary>,
705 440 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
706 440 : ctx: &RequestContext,
707 440 : ) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
708 440 : let file = match VirtualFile::open(path).await {
709 440 : Ok(file) => file,
710 0 : Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
711 : };
712 440 : let file_id = page_cache::next_file_id();
713 440 :
714 440 : let block_reader = FileBlockReader::new(&file, file_id);
715 :
716 440 : let summary_blk = match block_reader.read_blk(0, ctx).await {
717 440 : Ok(blk) => blk,
718 0 : Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
719 : };
720 :
721 : // TODO: this should be an assertion instead; see ImageLayerInner::load
722 440 : let actual_summary =
723 440 : Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
724 :
725 440 : if let Some(mut expected_summary) = summary {
726 : // production code path
727 440 : expected_summary.index_start_blk = actual_summary.index_start_blk;
728 440 : expected_summary.index_root_blk = actual_summary.index_root_blk;
729 440 : if actual_summary != expected_summary {
730 0 : bail!(
731 0 : "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
732 0 : actual_summary,
733 0 : expected_summary
734 0 : );
735 440 : }
736 0 : }
737 :
738 440 : Ok(Ok(DeltaLayerInner {
739 440 : file,
740 440 : file_id,
741 440 : index_start_blk: actual_summary.index_start_blk,
742 440 : index_root_blk: actual_summary.index_root_blk,
743 440 : max_vectored_read_bytes,
744 440 : }))
745 440 : }
746 :
747 123516 : pub(super) async fn get_value_reconstruct_data(
748 123516 : &self,
749 123516 : key: Key,
750 123516 : lsn_range: Range<Lsn>,
751 123516 : reconstruct_state: &mut ValueReconstructState,
752 123516 : ctx: &RequestContext,
753 123516 : ) -> anyhow::Result<ValueReconstructResult> {
754 123516 : let mut need_image = true;
755 123516 : // Scan the page versions backwards, starting from `lsn`.
756 123516 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
757 123516 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
758 123516 : self.index_start_blk,
759 123516 : self.index_root_blk,
760 123516 : &block_reader,
761 123516 : );
762 123516 : let search_key = DeltaKey::from_key_lsn(&key, Lsn(lsn_range.end.0 - 1));
763 123516 :
764 123516 : let mut offsets: Vec<(Lsn, u64)> = Vec::new();
765 123516 :
766 123516 : tree_reader
767 123516 : .visit(
768 123516 : &search_key.0,
769 123516 : VisitDirection::Backwards,
770 123516 : |key, value| {
771 118341 : let blob_ref = BlobRef(value);
772 118341 : if key[..KEY_SIZE] != search_key.0[..KEY_SIZE] {
773 46270 : return false;
774 72071 : }
775 72071 : let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
776 72071 : if entry_lsn < lsn_range.start {
777 0 : return false;
778 72071 : }
779 72071 : offsets.push((entry_lsn, blob_ref.pos()));
780 72071 :
781 72071 : !blob_ref.will_init()
782 123516 : },
783 123516 : &RequestContextBuilder::extend(ctx)
784 123516 : .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
785 123516 : .build(),
786 123516 : )
787 17171 : .await?;
788 :
789 123516 : let ctx = &RequestContextBuilder::extend(ctx)
790 123516 : .page_content_kind(PageContentKind::DeltaLayerValue)
791 123516 : .build();
792 123516 :
793 123516 : // Ok, 'offsets' now contains the offsets of all the entries we need to read
794 123516 : let cursor = block_reader.block_cursor();
795 123516 : let mut buf = Vec::new();
796 123516 : for (entry_lsn, pos) in offsets {
797 72071 : cursor
798 72071 : .read_blob_into_buf(pos, &mut buf, ctx)
799 5476 : .await
800 72071 : .with_context(|| {
801 0 : format!("Failed to read blob from virtual file {}", self.file.path)
802 72071 : })?;
803 72071 : let val = Value::des(&buf).with_context(|| {
804 0 : format!(
805 0 : "Failed to deserialize file blob from virtual file {}",
806 0 : self.file.path
807 0 : )
808 72071 : })?;
809 72071 : match val {
810 72071 : Value::Image(img) => {
811 72071 : reconstruct_state.img = Some((entry_lsn, img));
812 72071 : need_image = false;
813 72071 : break;
814 : }
815 0 : Value::WalRecord(rec) => {
816 0 : let will_init = rec.will_init();
817 0 : reconstruct_state.records.push((entry_lsn, rec));
818 0 : if will_init {
819 : // This WAL record initializes the page, so no need to go further back
820 0 : need_image = false;
821 0 : break;
822 0 : }
823 : }
824 : }
825 : }
826 :
827 : // If an older page image is needed to reconstruct the page, let the
828 : // caller know.
829 123516 : if need_image {
830 51445 : Ok(ValueReconstructResult::Continue)
831 : } else {
832 72071 : Ok(ValueReconstructResult::Complete)
833 : }
834 123516 : }
835 :
836 : // Look up the keys in the provided keyspace and update
837 : // the reconstruct state with whatever is found.
838 : //
839 : // If the key is cached, go no further than the cached Lsn.
840 : //
841 : // Currently, the index is visited for each range, but this
842 : // can be further optimised to visit the index only once.
843 10 : pub(super) async fn get_values_reconstruct_data(
844 10 : &self,
845 10 : keyspace: KeySpace,
846 10 : lsn_range: Range<Lsn>,
847 10 : reconstruct_state: &mut ValuesReconstructState,
848 10 : ctx: &RequestContext,
849 10 : ) -> Result<(), GetVectoredError> {
850 10 : let reads = self
851 10 : .plan_reads(keyspace, lsn_range, reconstruct_state, ctx)
852 10 : .await
853 10 : .map_err(GetVectoredError::Other)?;
854 :
855 10 : self.do_reads_and_update_state(reads, reconstruct_state)
856 5 : .await;
857 :
858 10 : Ok(())
859 10 : }
860 :
861 10 : async fn plan_reads(
862 10 : &self,
863 10 : keyspace: KeySpace,
864 10 : lsn_range: Range<Lsn>,
865 10 : reconstruct_state: &mut ValuesReconstructState,
866 10 : ctx: &RequestContext,
867 10 : ) -> anyhow::Result<Vec<VectoredRead>> {
868 10 : let mut planner = VectoredReadPlanner::new(
869 10 : self.max_vectored_read_bytes
870 10 : .expect("Layer is loaded with max vectored bytes config")
871 10 : .0
872 10 : .into(),
873 10 : );
874 10 :
875 10 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
876 10 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
877 10 : self.index_start_blk,
878 10 : self.index_root_blk,
879 10 : block_reader,
880 10 : );
881 :
882 10 : for range in keyspace.ranges.iter() {
883 10 : let mut range_end_handled = false;
884 10 :
885 10 : let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start);
886 10 : tree_reader
887 10 : .visit(
888 10 : &start_key.0,
889 10 : VisitDirection::Forwards,
890 330 : |raw_key, value| {
891 330 : let key = Key::from_slice(&raw_key[..KEY_SIZE]);
892 330 : let lsn = DeltaKey::extract_lsn_from_buf(raw_key);
893 330 : let blob_ref = BlobRef(value);
894 330 :
895 330 : assert!(key >= range.start && lsn >= lsn_range.start);
896 :
897 330 : let cached_lsn = reconstruct_state.get_cached_lsn(&key);
898 330 : let flag = {
899 330 : if cached_lsn >= Some(lsn) {
900 0 : BlobFlag::Ignore
901 330 : } else if blob_ref.will_init() {
902 330 : BlobFlag::Replaces
903 : } else {
904 0 : BlobFlag::None
905 : }
906 : };
907 :
908 330 : if key >= range.end || (key.next() == range.end && lsn >= lsn_range.end) {
909 10 : planner.handle_range_end(blob_ref.pos());
910 10 : range_end_handled = true;
911 10 : false
912 : } else {
913 320 : planner.handle(key, lsn, blob_ref.pos(), flag);
914 320 : true
915 : }
916 330 : },
917 10 : &RequestContextBuilder::extend(ctx)
918 10 : .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
919 10 : .build(),
920 10 : )
921 10 : .await
922 10 : .map_err(|err| anyhow!(err))?;
923 :
924 10 : if !range_end_handled {
925 0 : let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64;
926 0 : tracing::info!("Handling range end fallback at {}", payload_end);
927 0 : planner.handle_range_end(payload_end);
928 10 : }
929 : }
930 :
931 10 : Ok(planner.finish())
932 10 : }
933 :
934 10 : async fn do_reads_and_update_state(
935 10 : &self,
936 10 : reads: Vec<VectoredRead>,
937 10 : reconstruct_state: &mut ValuesReconstructState,
938 10 : ) {
939 10 : let vectored_blob_reader = VectoredBlobReader::new(&self.file);
940 10 : let mut ignore_key_with_err = None;
941 10 :
942 10 : let max_vectored_read_bytes = self
943 10 : .max_vectored_read_bytes
944 10 : .expect("Layer is loaded with max vectored bytes config")
945 10 : .0
946 10 : .into();
947 10 : let mut buf = Some(BytesMut::with_capacity(max_vectored_read_bytes));
948 :
949 : // Note that reads are processed in reverse order (from highest key+lsn).
950 : // This is the order that `ReconstructState` requires such that it can
951 : // track when a key is done.
952 10 : for read in reads.into_iter().rev() {
953 10 : let res = vectored_blob_reader
954 10 : .read_blobs(&read, buf.take().expect("Should have a buffer"))
955 5 : .await;
956 :
957 10 : let blobs_buf = match res {
958 10 : Ok(blobs_buf) => blobs_buf,
959 0 : Err(err) => {
960 0 : let kind = err.kind();
961 0 : for (_, blob_meta) in read.blobs_at.as_slice() {
962 0 : reconstruct_state.on_key_error(
963 0 : blob_meta.key,
964 0 : PageReconstructError::from(anyhow!(
965 0 : "Failed to read blobs from virtual file {}: {}",
966 0 : self.file.path,
967 0 : kind
968 0 : )),
969 0 : );
970 0 : }
971 :
972 : // We have "lost" the buffer since the lower level IO api
973 : // doesn't return the buffer on error. Allocate a new one.
974 0 : buf = Some(BytesMut::with_capacity(max_vectored_read_bytes));
975 0 :
976 0 : continue;
977 : }
978 : };
979 :
980 320 : for meta in blobs_buf.blobs.iter().rev() {
981 320 : if Some(meta.meta.key) == ignore_key_with_err {
982 0 : continue;
983 320 : }
984 320 :
985 320 : let value = Value::des(&blobs_buf.buf[meta.start..meta.end]);
986 320 : let value = match value {
987 320 : Ok(v) => v,
988 0 : Err(e) => {
989 0 : reconstruct_state.on_key_error(
990 0 : meta.meta.key,
991 0 : PageReconstructError::from(anyhow!(e).context(format!(
992 0 : "Failed to deserialize blob from virtual file {}",
993 0 : self.file.path,
994 0 : ))),
995 0 : );
996 0 :
997 0 : ignore_key_with_err = Some(meta.meta.key);
998 0 : continue;
999 : }
1000 : };
1001 :
1002 : // Invariant: once a key reaches [`ValueReconstructSituation::Complete`]
1003 : // state, no further updates shall be made to it. The call below will
1004 : // panic if the invariant is violated.
1005 320 : reconstruct_state.update_key(&meta.meta.key, meta.meta.lsn, value);
1006 : }
1007 :
1008 10 : buf = Some(blobs_buf.buf);
1009 : }
1010 10 : }
1011 :
1012 304 : pub(super) async fn load_keys<'a>(
1013 304 : &'a self,
1014 304 : ctx: &RequestContext,
1015 304 : ) -> Result<Vec<DeltaEntry<'a>>> {
1016 304 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
1017 304 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1018 304 : self.index_start_blk,
1019 304 : self.index_root_blk,
1020 304 : block_reader,
1021 304 : );
1022 304 :
1023 304 : let mut all_keys: Vec<DeltaEntry<'_>> = Vec::new();
1024 304 :
1025 304 : tree_reader
1026 304 : .visit(
1027 304 : &[0u8; DELTA_KEY_SIZE],
1028 304 : VisitDirection::Forwards,
1029 2102008 : |key, value| {
1030 2102008 : let delta_key = DeltaKey::from_slice(key);
1031 2102008 : let val_ref = ValueRef {
1032 2102008 : blob_ref: BlobRef(value),
1033 2102008 : reader: BlockCursor::new(crate::tenant::block_io::BlockReaderRef::Adapter(
1034 2102008 : Adapter(self),
1035 2102008 : )),
1036 2102008 : };
1037 2102008 : let pos = BlobRef(value).pos();
1038 2102008 : if let Some(last) = all_keys.last_mut() {
1039 2101704 : // Subtract the offsets of the current and previous entries to get
1040 2101704 : // the size of the value associated with the previous (key, lsn) tuple.
1041 2101704 : let first_pos = last.size;
1042 2101704 : last.size = pos - first_pos;
1043 2101704 : }
1044 2102008 : let entry = DeltaEntry {
1045 2102008 : key: delta_key.key(),
1046 2102008 : lsn: delta_key.lsn(),
1047 2102008 : size: pos,
1048 2102008 : val: val_ref,
1049 2102008 : };
1050 2102008 : all_keys.push(entry);
1051 2102008 : true
1052 2102008 : },
1053 304 : &RequestContextBuilder::extend(ctx)
1054 304 : .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
1055 304 : .build(),
1056 304 : )
1057 2197 : .await?;
1058 304 : if let Some(last) = all_keys.last_mut() {
1059 304 : // The last key's value occupies all remaining space up to the end of value
1060 304 : // storage, which corresponds to the beginning of the index.
1061 304 : last.size = self.index_start_blk as u64 * PAGE_SZ as u64 - last.size;
1062 304 : }
1063 304 : Ok(all_keys)
1064 304 : }
1065 :
1066 4 : pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
1067 4 : println!(
1068 4 : "index_start_blk: {}, root {}",
1069 4 : self.index_start_blk, self.index_root_blk
1070 4 : );
1071 4 :
1072 4 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
1073 4 : let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
1074 4 : self.index_start_blk,
1075 4 : self.index_root_blk,
1076 4 : block_reader,
1077 4 : );
1078 4 :
1079 4 : tree_reader.dump().await?;
1080 :
1081 4 : let keys = self.load_keys(ctx).await?;
1082 :
1083 8 : async fn dump_blob(val: &ValueRef<'_>, ctx: &RequestContext) -> anyhow::Result<String> {
1084 8 : let buf = val.reader.read_blob(val.blob_ref.pos(), ctx).await?;
1085 8 : let val = Value::des(&buf)?;
1086 8 : let desc = match val {
1087 8 : Value::Image(img) => {
1088 8 : format!(" img {} bytes", img.len())
1089 : }
1090 0 : Value::WalRecord(rec) => {
1091 0 : let wal_desc = walrecord::describe_wal_record(&rec)?;
1092 0 : format!(
1093 0 : " rec {} bytes will_init: {} {}",
1094 0 : buf.len(),
1095 0 : rec.will_init(),
1096 0 : wal_desc
1097 0 : )
1098 : }
1099 : };
1100 8 : Ok(desc)
1101 8 : }
1102 :
1103 12 : for entry in keys {
1104 8 : let DeltaEntry { key, lsn, val, .. } = entry;
1105 8 : let desc = match dump_blob(&val, ctx).await {
1106 8 : Ok(desc) => desc,
1107 0 : Err(err) => {
1108 0 : format!("ERROR: {err}")
1109 : }
1110 : };
1111 8 : println!(" key {key} at {lsn}: {desc}");
1112 8 :
1113 8 : // Print more details about CHECKPOINT records. It would be nice to print
1114 8 : // details of many other record types too, but these are particularly
1115 8 : // interesting, as there is a lot of special processing for them in walingest.rs.
1116 8 : use pageserver_api::key::CHECKPOINT_KEY;
1117 8 : use postgres_ffi::CheckPoint;
1118 8 : if key == CHECKPOINT_KEY {
1119 0 : let buf = val.reader.read_blob(val.blob_ref.pos(), ctx).await?;
1120 0 : let val = Value::des(&buf)?;
1121 0 : match val {
1122 0 : Value::Image(img) => {
1123 0 : let checkpoint = CheckPoint::decode(&img)?;
1124 0 : println!(" CHECKPOINT: {:?}", checkpoint);
1125 : }
1126 0 : Value::WalRecord(_rec) => {
1127 0 : println!(" unexpected walrecord value for checkpoint key");
1128 0 : }
1129 : }
1130 8 : }
1131 : }
1132 :
1133 4 : Ok(())
1134 4 : }
1135 : }
1136 :
1137 : /// A set of data associated with a delta layer key and its value
1138 : pub struct DeltaEntry<'a> {
1139 : pub key: Key,
1140 : pub lsn: Lsn,
1141 : /// Size of the stored value
1142 : pub size: u64,
1143 : /// Reference to the on-disk value
1144 : pub val: ValueRef<'a>,
1145 : }
1146 :
1147 : /// Reference to an on-disk value
1148 : pub struct ValueRef<'a> {
1149 : blob_ref: BlobRef,
1150 : reader: BlockCursor<'a>,
1151 : }
1152 :
1153 : impl<'a> ValueRef<'a> {
1154 : /// Loads the value from disk
1155 2102000 : pub async fn load(&self, ctx: &RequestContext) -> Result<Value> {
1156 : // theoretically we *could* record an access time for each, but it does not really matter
1157 2102000 : let buf = self.reader.read_blob(self.blob_ref.pos(), ctx).await?;
1158 2102000 : let val = Value::des(&buf)?;
1159 2102000 : Ok(val)
1160 2102000 : }
1161 : }
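
// Illustration only (hypothetical call site, not part of the original source):
// iterate the index entries of a loaded delta layer and lazily load each value.
//
//     for entry in delta_layer_inner.load_keys(ctx).await? {
//         let value = entry.val.load(ctx).await?;
//         // use (entry.key, entry.lsn, value) ...
//     }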
1162 :
1163 : pub(crate) struct Adapter<T>(T);
1164 :
1165 : impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
1166 2121326 : pub(crate) async fn read_blk(
1167 2121326 : &self,
1168 2121326 : blknum: u32,
1169 2121326 : ctx: &RequestContext,
1170 2121326 : ) -> Result<BlockLease, std::io::Error> {
1171 2121326 : let block_reader = FileBlockReader::new(&self.0.as_ref().file, self.0.as_ref().file_id);
1172 2121326 : block_reader.read_blk(blknum, ctx).await
1173 2121326 : }
1174 : }
1175 :
1176 : impl AsRef<DeltaLayerInner> for DeltaLayerInner {
1177 4242652 : fn as_ref(&self) -> &DeltaLayerInner {
1178 4242652 : self
1179 4242652 : }
1180 : }
1181 :
1182 : impl<'a> pageserver_compaction::interface::CompactionDeltaEntry<'a, Key> for DeltaEntry<'a> {
1183 0 : fn key(&self) -> Key {
1184 0 : self.key
1185 0 : }
1186 0 : fn lsn(&self) -> Lsn {
1187 0 : self.lsn
1188 0 : }
1189 0 : fn size(&self) -> u64 {
1190 0 : self.size
1191 0 : }
1192 : }