Line data Source code
1 : //! An ImageLayer represents an image or a snapshot of a key-range at
2 : //! one particular LSN. It contains an image of all key-value pairs
3 : //! in its key-range. Any key that falls into the image layer's range
4 : //! but does not exist in the layer, does not exist.
5 : //!
6 : //! An image layer is stored in a file on disk. The file is stored in
7 : //! timelines/<timeline_id> directory. Currently, there are no
8 : //! subdirectories, and each image layer file is named like this:
9 : //!
10 : //! ```text
11 : //! <key start>-<key end>__<LSN>
12 : //! ```
13 : //!
14 : //! For example:
15 : //!
16 : //! ```text
17 : //! 000000067F000032BE0000400000000070B6-000000067F000032BE0000400000000080B6__00000000346BC568
18 : //! ```
19 : //!
20 : //! Every image layer file consists of three parts: "summary",
21 : //! "index", and "values". The summary is a fixed size header at the
22 : //! beginning of the file, and it contains basic information about the
23 : //! layer, and offsets to the other parts. The "index" is a B-tree,
24 : //! mapping from Key to an offset in the "values" part. The
25 : //! actual page images are stored in the "values" part.
26 : use crate::config::PageServerConf;
27 : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
28 : use crate::page_cache::PAGE_SZ;
29 : use crate::repository::{Key, KEY_SIZE};
30 : use crate::tenant::blob_io::BlobWriter;
31 : use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
32 : use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
33 : use crate::tenant::storage_layer::{
34 : LayerAccessStats, ValueReconstructResult, ValueReconstructState,
35 : };
36 : use crate::tenant::Timeline;
37 : use crate::virtual_file::{self, VirtualFile};
38 : use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
39 : use anyhow::{bail, ensure, Context, Result};
40 : use bytes::Bytes;
41 : use camino::{Utf8Path, Utf8PathBuf};
42 : use hex;
43 : use pageserver_api::models::LayerAccessKind;
44 : use pageserver_api::shard::TenantShardId;
45 : use rand::{distributions::Alphanumeric, Rng};
46 : use serde::{Deserialize, Serialize};
47 : use std::fs::File;
48 : use std::io::SeekFrom;
49 : use std::ops::Range;
50 : use std::os::unix::prelude::FileExt;
51 : use std::sync::Arc;
52 : use tokio::sync::OnceCell;
53 : use tracing::*;
54 :
55 : use utils::{
56 : bin_ser::BeSer,
57 : id::{TenantId, TimelineId},
58 : lsn::Lsn,
59 : };
60 :
61 : use super::filename::ImageFileName;
62 : use super::{AsLayerDesc, Layer, PersistentLayerDesc, ResidentLayer};
63 :
///
/// Header stored in the beginning of the file
///
/// After this comes the 'values' part, starting on block 1. After that,
/// the 'index' starts at the block indicated by 'index_start_blk'
///
/// NOTE(review): this struct derives `Serialize`/`Deserialize` and is written to /
/// read from block 0 of the layer file; presumably the field order is part of the
/// on-disk format, so do not reorder fields without confirming.
///
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct Summary {
    /// Magic value to identify this as a neon image file. Always IMAGE_FILE_MAGIC.
    pub magic: u16,
    /// Storage format version. Both [`Summary::expected`] and the layer writer set
    /// this to `STORAGE_FORMAT_VERSION`.
    pub format_version: u16,

    // Identity of the layer: owning tenant and timeline, covered key range, and
    // the LSN of the images.
    pub tenant_id: TenantId,
    pub timeline_id: TimelineId,
    pub key_range: Range<Key>,
    pub lsn: Lsn,

    /// Block number where the 'index' part of the file begins.
    pub index_start_blk: u32,
    /// Block within the 'index', where the B-tree root page is stored
    pub index_root_blk: u32,
    // the 'values' part starts after the summary header, on block 1.
}
87 :
88 : impl From<&ImageLayer> for Summary {
89 0 : fn from(layer: &ImageLayer) -> Self {
90 0 : Self::expected(
91 0 : layer.desc.tenant_shard_id.tenant_id,
92 0 : layer.desc.timeline_id,
93 0 : layer.desc.key_range.clone(),
94 0 : layer.lsn,
95 0 : )
96 0 : }
97 : }
98 :
99 : impl Summary {
100 21294 : pub(super) fn expected(
101 21294 : tenant_id: TenantId,
102 21294 : timeline_id: TimelineId,
103 21294 : key_range: Range<Key>,
104 21294 : lsn: Lsn,
105 21294 : ) -> Self {
106 21294 : Self {
107 21294 : magic: IMAGE_FILE_MAGIC,
108 21294 : format_version: STORAGE_FORMAT_VERSION,
109 21294 : tenant_id,
110 21294 : timeline_id,
111 21294 : key_range,
112 21294 : lsn,
113 21294 :
114 21294 : index_start_blk: 0,
115 21294 : index_root_blk: 0,
116 21294 : }
117 21294 : }
118 : }
119 :
/// This is used only from `pagectl`. Within pageserver, all layers are
/// [`crate::tenant::storage_layer::Layer`], which can hold an [`ImageLayerInner`].
pub struct ImageLayer {
    /// Path of the layer file, as given to [`ImageLayer::new_for_path`].
    path: Utf8PathBuf,
    /// Layer descriptor, built from the in-file summary and the file size.
    pub desc: PersistentLayerDesc,
    // This entry contains an image of all pages as of this LSN, should be the same as desc.lsn
    pub lsn: Lsn,
    /// Access statistics; every call to `load` records an access here.
    access_stats: LayerAccessStats,
    /// Lazily initialized on the first successful `load`.
    inner: OnceCell<ImageLayerInner>,
}
130 :
131 : impl std::fmt::Debug for ImageLayer {
132 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133 0 : use super::RangeDisplayDebug;
134 0 :
135 0 : f.debug_struct("ImageLayer")
136 0 : .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
137 0 : .field("file_size", &self.desc.file_size)
138 0 : .field("lsn", &self.lsn)
139 0 : .field("inner", &self.inner)
140 0 : .finish()
141 0 : }
142 : }
143 :
/// ImageLayerInner is the in-memory data structure associated with an opened
/// on-disk image file: the index location copied from the file's summary, the
/// layer's LSN, and a reader for the file's blocks.
pub struct ImageLayerInner {
    // values copied from summary
    index_start_blk: u32,
    index_root_blk: u32,

    /// LSN passed to `load` by the caller; attached to every image returned by
    /// `get_value_reconstruct_data`.
    lsn: Lsn,

    /// Reader object for reading blocks from the file.
    file: FileBlockReader,
}
156 :
157 : impl std::fmt::Debug for ImageLayerInner {
158 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159 0 : f.debug_struct("ImageLayerInner")
160 0 : .field("index_start_blk", &self.index_start_blk)
161 0 : .field("index_root_blk", &self.index_root_blk)
162 0 : .finish()
163 0 : }
164 : }
165 :
166 : impl ImageLayerInner {
167 0 : pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
168 0 : let file = &self.file;
169 0 : let tree_reader =
170 0 : DiskBtreeReader::<_, KEY_SIZE>::new(self.index_start_blk, self.index_root_blk, file);
171 0 :
172 0 : tree_reader.dump().await?;
173 :
174 0 : tree_reader
175 0 : .visit(
176 0 : &[0u8; KEY_SIZE],
177 0 : VisitDirection::Forwards,
178 0 : |key, value| {
179 0 : println!("key: {} offset {}", hex::encode(key), value);
180 0 : true
181 0 : },
182 0 : ctx,
183 0 : )
184 0 : .await?;
185 :
186 0 : Ok(())
187 0 : }
188 : }
189 :
190 : /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
191 : impl std::fmt::Display for ImageLayer {
192 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
193 0 : write!(f, "{}", self.layer_desc().short_id())
194 0 : }
195 : }
196 :
197 : impl AsLayerDesc for ImageLayer {
198 0 : fn layer_desc(&self) -> &PersistentLayerDesc {
199 0 : &self.desc
200 0 : }
201 : }
202 :
203 : impl ImageLayer {
204 0 : pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
205 0 : self.desc.dump();
206 0 :
207 0 : if !verbose {
208 0 : return Ok(());
209 0 : }
210 :
211 0 : let inner = self.load(LayerAccessKind::Dump, ctx).await?;
212 :
213 0 : inner.dump(ctx).await?;
214 :
215 0 : Ok(())
216 0 : }
217 :
218 6475 : fn temp_path_for(
219 6475 : conf: &PageServerConf,
220 6475 : timeline_id: TimelineId,
221 6475 : tenant_shard_id: TenantShardId,
222 6475 : fname: &ImageFileName,
223 6475 : ) -> Utf8PathBuf {
224 6475 : let rand_string: String = rand::thread_rng()
225 6475 : .sample_iter(&Alphanumeric)
226 6475 : .take(8)
227 6475 : .map(char::from)
228 6475 : .collect();
229 6475 :
230 6475 : conf.timeline_path(&tenant_shard_id, &timeline_id)
231 6475 : .join(format!("{fname}.{rand_string}.{TEMP_FILE_SUFFIX}"))
232 6475 : }
233 :
234 : ///
235 : /// Open the underlying file and read the metadata into memory, if it's
236 : /// not loaded already.
237 : ///
238 0 : async fn load(
239 0 : &self,
240 0 : access_kind: LayerAccessKind,
241 0 : ctx: &RequestContext,
242 0 : ) -> Result<&ImageLayerInner> {
243 0 : self.access_stats.record_access(access_kind, ctx);
244 0 : self.inner
245 0 : .get_or_try_init(|| self.load_inner(ctx))
246 0 : .await
247 0 : .with_context(|| format!("Failed to load image layer {}", self.path()))
248 0 : }
249 :
250 0 : async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
251 0 : let path = self.path();
252 :
253 0 : let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, ctx)
254 0 : .await
255 0 : .and_then(|res| res)?;
256 :
257 : // not production code
258 0 : let actual_filename = path.file_name().unwrap().to_owned();
259 0 : let expected_filename = self.layer_desc().filename().file_name();
260 0 :
261 0 : if actual_filename != expected_filename {
262 0 : println!("warning: filename does not match what is expected from in-file summary");
263 0 : println!("actual: {:?}", actual_filename);
264 0 : println!("expected: {:?}", expected_filename);
265 0 : }
266 :
267 0 : Ok(loaded)
268 0 : }
269 :
270 : /// Create an ImageLayer struct representing an existing file on disk.
271 : ///
272 : /// This variant is only used for debugging purposes, by the 'pagectl' binary.
273 0 : pub fn new_for_path(path: &Utf8Path, file: File) -> Result<ImageLayer> {
274 0 : let mut summary_buf = vec![0; PAGE_SZ];
275 0 : file.read_exact_at(&mut summary_buf, 0)?;
276 0 : let summary = Summary::des_prefix(&summary_buf)?;
277 0 : let metadata = file
278 0 : .metadata()
279 0 : .context("get file metadata to determine size")?;
280 :
281 : // This function is never used for constructing layers in a running pageserver,
282 : // so it does not need an accurate TenantShardId.
283 0 : let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
284 0 :
285 0 : Ok(ImageLayer {
286 0 : path: path.to_path_buf(),
287 0 : desc: PersistentLayerDesc::new_img(
288 0 : tenant_shard_id,
289 0 : summary.timeline_id,
290 0 : summary.key_range,
291 0 : summary.lsn,
292 0 : metadata.len(),
293 0 : ), // Now we assume image layer ALWAYS covers the full range. This may change in the future.
294 0 : lsn: summary.lsn,
295 0 : access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
296 0 : inner: OnceCell::new(),
297 0 : })
298 0 : }
299 :
300 0 : fn path(&self) -> Utf8PathBuf {
301 0 : self.path.clone()
302 0 : }
303 : }
304 :
/// Error returned by [`ImageLayer::rewrite_summary`].
#[derive(thiserror::Error, Debug)]
pub enum RewriteSummaryError {
    /// The summary read from the file does not carry `IMAGE_FILE_MAGIC`.
    #[error("magic mismatch")]
    MagicMismatch,
    /// Any other failure (opening, reading, (de)serializing or writing); I/O
    /// errors are converted into this variant as well.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
312 :
313 : impl From<std::io::Error> for RewriteSummaryError {
314 0 : fn from(e: std::io::Error) -> Self {
315 0 : Self::Other(anyhow::anyhow!(e))
316 0 : }
317 : }
318 :
319 : impl ImageLayer {
320 0 : pub async fn rewrite_summary<F>(
321 0 : path: &Utf8Path,
322 0 : rewrite: F,
323 0 : ctx: &RequestContext,
324 0 : ) -> Result<(), RewriteSummaryError>
325 0 : where
326 0 : F: Fn(Summary) -> Summary,
327 0 : {
328 0 : let file = VirtualFile::open_with_options(
329 0 : path,
330 0 : virtual_file::OpenOptions::new().read(true).write(true),
331 0 : )
332 0 : .await
333 0 : .with_context(|| format!("Failed to open file '{}'", path))?;
334 0 : let file = FileBlockReader::new(file);
335 0 : let summary_blk = file.read_blk(0, ctx).await?;
336 0 : let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
337 0 : let mut file = file.file;
338 0 : if actual_summary.magic != IMAGE_FILE_MAGIC {
339 0 : return Err(RewriteSummaryError::MagicMismatch);
340 0 : }
341 0 :
342 0 : let new_summary = rewrite(actual_summary);
343 0 :
344 0 : let mut buf = Vec::with_capacity(PAGE_SZ);
345 0 : // TODO: could use smallvec here but it's a pain with Slice<T>
346 0 : Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
347 0 : file.seek(SeekFrom::Start(0)).await?;
348 0 : let (_buf, res) = file.write_all(buf).await;
349 0 : res?;
350 0 : Ok(())
351 0 : }
352 : }
353 :
354 : impl ImageLayerInner {
355 : /// Returns nested result following Result<Result<_, OpErr>, Critical>:
356 : /// - inner has the success or transient failure
357 : /// - outer has the permanent failure
358 21294 : pub(super) async fn load(
359 21294 : path: &Utf8Path,
360 21294 : lsn: Lsn,
361 21294 : summary: Option<Summary>,
362 21294 : ctx: &RequestContext,
363 21294 : ) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
364 21294 : let file = match VirtualFile::open(path).await {
365 21294 : Ok(file) => file,
366 0 : Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
367 : };
368 21294 : let file = FileBlockReader::new(file);
369 21294 : let summary_blk = match file.read_blk(0, ctx).await {
370 21294 : Ok(blk) => blk,
371 0 : Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
372 : };
373 :
374 : // length is the only way how this could fail, so it's not actually likely at all unless
375 : // read_blk returns wrong sized block.
376 : //
377 : // TODO: confirm and make this into assertion
378 21294 : let actual_summary =
379 21294 : Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
380 :
381 21294 : if let Some(mut expected_summary) = summary {
382 : // production code path
383 21294 : expected_summary.index_start_blk = actual_summary.index_start_blk;
384 21294 : expected_summary.index_root_blk = actual_summary.index_root_blk;
385 21294 :
386 21294 : if actual_summary != expected_summary {
387 0 : bail!(
388 0 : "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
389 0 : actual_summary,
390 0 : expected_summary
391 0 : );
392 21294 : }
393 0 : }
394 :
395 21294 : Ok(Ok(ImageLayerInner {
396 21294 : index_start_blk: actual_summary.index_start_blk,
397 21294 : index_root_blk: actual_summary.index_root_blk,
398 21294 : lsn,
399 21294 : file,
400 21294 : }))
401 21294 : }
402 :
403 428023 : pub(super) async fn get_value_reconstruct_data(
404 428023 : &self,
405 428023 : key: Key,
406 428023 : reconstruct_state: &mut ValueReconstructState,
407 428023 : ctx: &RequestContext,
408 428023 : ) -> anyhow::Result<ValueReconstructResult> {
409 428023 : let file = &self.file;
410 428023 : let tree_reader = DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, file);
411 428023 :
412 428023 : let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
413 428023 : key.write_to_byte_slice(&mut keybuf);
414 428023 : if let Some(offset) = tree_reader
415 428023 : .get(
416 428023 : &keybuf,
417 428023 : &RequestContextBuilder::extend(ctx)
418 428023 : .page_content_kind(PageContentKind::ImageLayerBtreeNode)
419 428023 : .build(),
420 428023 : )
421 6330 : .await?
422 : {
423 428019 : let blob = file
424 428019 : .block_cursor()
425 428019 : .read_blob(
426 428019 : offset,
427 428019 : &RequestContextBuilder::extend(ctx)
428 428019 : .page_content_kind(PageContentKind::ImageLayerValue)
429 428019 : .build(),
430 428019 : )
431 6938 : .await
432 428019 : .with_context(|| format!("failed to read value from offset {}", offset))?;
433 428019 : let value = Bytes::from(blob);
434 428019 :
435 428019 : reconstruct_state.img = Some((self.lsn, value));
436 428019 : Ok(ValueReconstructResult::Complete)
437 : } else {
438 4 : Ok(ValueReconstructResult::Missing)
439 : }
440 428023 : }
441 : }
442 :
/// Internal state of [`ImageLayerWriter`], the builder for a new image layer.
///
/// Usage (driven through the `ImageLayerWriter` wrapper):
///
/// 1. Create it with `new(...)`, which creates the file under a temporary name.
///
/// 2. Write the contents by calling `put_image` for every key-value
///    pair in the key range.
///
/// 3. Call `finish`.
///
struct ImageLayerWriterInner {
    conf: &'static PageServerConf,
    /// Temporary path the layer file is created at (see `ImageLayer::temp_path_for`).
    path: Utf8PathBuf,
    timeline_id: TimelineId,
    tenant_shard_id: TenantShardId,
    /// Key range of the layer; `put_image` rejects keys outside of it.
    key_range: Range<Key>,
    lsn: Lsn,

    /// Writes the page images ('values' part) to the file.
    blob_writer: BlobWriter<false>,
    /// Builder for the B-tree index; each key is appended with the value
    /// returned by the blob writer for its image.
    tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,
}
465 :
466 : impl ImageLayerWriterInner {
467 : ///
468 : /// Start building a new image layer.
469 : ///
470 6475 : async fn new(
471 6475 : conf: &'static PageServerConf,
472 6475 : timeline_id: TimelineId,
473 6475 : tenant_shard_id: TenantShardId,
474 6475 : key_range: &Range<Key>,
475 6475 : lsn: Lsn,
476 6475 : ) -> anyhow::Result<Self> {
477 6475 : // Create the file initially with a temporary filename.
478 6475 : // We'll atomically rename it to the final name when we're done.
479 6475 : let path = ImageLayer::temp_path_for(
480 6475 : conf,
481 6475 : timeline_id,
482 6475 : tenant_shard_id,
483 6475 : &ImageFileName {
484 6475 : key_range: key_range.clone(),
485 6475 : lsn,
486 6475 : },
487 6475 : );
488 6475 : info!("new image layer {path}");
489 6475 : let mut file = {
490 6475 : VirtualFile::open_with_options(
491 6475 : &path,
492 6475 : virtual_file::OpenOptions::new()
493 6475 : .write(true)
494 6475 : .create_new(true),
495 6475 : )
496 70 : .await?
497 : };
498 : // make room for the header block
499 6475 : file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
500 6475 : let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
501 6475 :
502 6475 : // Initialize the b-tree index builder
503 6475 : let block_buf = BlockBuf::new();
504 6475 : let tree_builder = DiskBtreeBuilder::new(block_buf);
505 6475 :
506 6475 : let writer = Self {
507 6475 : conf,
508 6475 : path,
509 6475 : timeline_id,
510 6475 : tenant_shard_id,
511 6475 : key_range: key_range.clone(),
512 6475 : lsn,
513 6475 : tree: tree_builder,
514 6475 : blob_writer,
515 6475 : };
516 6475 :
517 6475 : Ok(writer)
518 6475 : }
519 :
520 : ///
521 : /// Write next value to the file.
522 : ///
523 : /// The page versions must be appended in blknum order.
524 : ///
525 229189 : async fn put_image(&mut self, key: Key, img: Bytes) -> anyhow::Result<()> {
526 229189 : ensure!(self.key_range.contains(&key));
527 229189 : let (_img, res) = self.blob_writer.write_blob(img).await;
528 : // TODO: re-use the buffer for `img` further upstack
529 229189 : let off = res?;
530 :
531 229189 : let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
532 229189 : key.write_to_byte_slice(&mut keybuf);
533 229189 : self.tree.append(&keybuf, off)?;
534 :
535 229189 : Ok(())
536 229189 : }
537 :
538 : ///
539 : /// Finish writing the image layer.
540 : ///
541 6475 : async fn finish(self, timeline: &Arc<Timeline>) -> anyhow::Result<ResidentLayer> {
542 6475 : let index_start_blk =
543 6475 : ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
544 6475 :
545 6475 : let mut file = self.blob_writer.into_inner();
546 6475 :
547 6475 : // Write out the index
548 6475 : file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
549 0 : .await?;
550 6475 : let (index_root_blk, block_buf) = self.tree.finish()?;
551 13164 : for buf in block_buf.blocks {
552 6689 : let (_buf, res) = file.write_all(buf).await;
553 6689 : res?;
554 : }
555 :
556 : // Fill in the summary on blk 0
557 6475 : let summary = Summary {
558 6475 : magic: IMAGE_FILE_MAGIC,
559 6475 : format_version: STORAGE_FORMAT_VERSION,
560 6475 : tenant_id: self.tenant_shard_id.tenant_id,
561 6475 : timeline_id: self.timeline_id,
562 6475 : key_range: self.key_range.clone(),
563 6475 : lsn: self.lsn,
564 6475 : index_start_blk,
565 6475 : index_root_blk,
566 6475 : };
567 6475 :
568 6475 : let mut buf = Vec::with_capacity(PAGE_SZ);
569 6475 : // TODO: could use smallvec here but it's a pain with Slice<T>
570 6475 : Summary::ser_into(&summary, &mut buf)?;
571 6475 : file.seek(SeekFrom::Start(0)).await?;
572 6475 : let (_buf, res) = file.write_all(buf).await;
573 6475 : res?;
574 :
575 6475 : let metadata = file
576 6475 : .metadata()
577 61 : .await
578 6475 : .context("get metadata to determine file size")?;
579 :
580 6475 : let desc = PersistentLayerDesc::new_img(
581 6475 : self.tenant_shard_id,
582 6475 : self.timeline_id,
583 6475 : self.key_range.clone(),
584 6475 : self.lsn,
585 6475 : metadata.len(),
586 6475 : );
587 6475 :
588 6475 : // Note: Because we open the file in write-only mode, we cannot
589 6475 : // reuse the same VirtualFile for reading later. That's why we don't
590 6475 : // set inner.file here. The first read will have to re-open it.
591 6475 :
592 6475 : // fsync the file
593 6475 : file.sync_all().await?;
594 :
595 : // FIXME: why not carry the virtualfile here, it supports renaming?
596 6475 : let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
597 :
598 0 : trace!("created image layer {}", layer.local_path());
599 :
600 6475 : Ok(layer)
601 6475 : }
602 : }
603 :
/// A builder object for constructing a new image layer.
///
/// Usage:
///
/// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
///
/// 2. Write the contents by calling `put_image` for every key-value
///    pair in the key range.
///
/// 3. Call `finish`.
///
/// # Note
///
/// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
/// possible for the writer to be dropped before `finish` is actually called.
/// This could leave stray temporary files in the directory and eventually
/// exhaust the file system. This structure wraps `ImageLayerWriterInner` and
/// also contains a `Drop` implementation that cleans up the temporary file in
/// that case. It's not possible to do this directly in `ImageLayerWriterInner`
/// since `finish` moves out some fields, making it impossible to implement
/// `Drop`.
///
#[must_use]
pub struct ImageLayerWriter {
    /// `Some` until `finish` takes the inner writer out; `Drop` only cleans up
    /// while it is still `Some`.
    inner: Option<ImageLayerWriterInner>,
}
629 :
630 : impl ImageLayerWriter {
631 : ///
632 : /// Start building a new image layer.
633 : ///
634 6475 : pub async fn new(
635 6475 : conf: &'static PageServerConf,
636 6475 : timeline_id: TimelineId,
637 6475 : tenant_shard_id: TenantShardId,
638 6475 : key_range: &Range<Key>,
639 6475 : lsn: Lsn,
640 6475 : ) -> anyhow::Result<ImageLayerWriter> {
641 6475 : Ok(Self {
642 6475 : inner: Some(
643 6475 : ImageLayerWriterInner::new(conf, timeline_id, tenant_shard_id, key_range, lsn)
644 70 : .await?,
645 : ),
646 : })
647 6475 : }
648 :
649 : ///
650 : /// Write next value to the file.
651 : ///
652 : /// The page versions must be appended in blknum order.
653 : ///
654 229189 : pub async fn put_image(&mut self, key: Key, img: Bytes) -> anyhow::Result<()> {
655 229189 : self.inner.as_mut().unwrap().put_image(key, img).await
656 229189 : }
657 :
658 : ///
659 : /// Finish writing the image layer.
660 : ///
661 6475 : pub(crate) async fn finish(
662 6475 : mut self,
663 6475 : timeline: &Arc<Timeline>,
664 6475 : ) -> anyhow::Result<super::ResidentLayer> {
665 6475 : self.inner.take().unwrap().finish(timeline).await
666 6475 : }
667 : }
668 :
669 : impl Drop for ImageLayerWriter {
670 6475 : fn drop(&mut self) {
671 6475 : if let Some(inner) = self.inner.take() {
672 0 : inner.blob_writer.into_inner().remove();
673 6475 : }
674 6475 : }
675 : }
|