Line data Source code
1 : //! An ImageLayer represents an image or a snapshot of a key-range at
2 : //! one particular LSN. It contains an image of all key-value pairs
3 : //! in its key-range. Any key that falls into the image layer's range
4 : //! but does not exist in the layer, does not exist.
5 : //!
6 : //! An image layer is stored in a file on disk. The file is stored in
7 : //! timelines/<timeline_id> directory. Currently, there are no
8 : //! subdirectories, and each image layer file is named like this:
9 : //!
10 : //! ```text
11 : //! <key start>-<key end>__<LSN>
12 : //! ```
13 : //!
14 : //! For example:
15 : //!
16 : //! ```text
17 : //! 000000067F000032BE0000400000000070B6-000000067F000032BE0000400000000080B6__00000000346BC568
18 : //! ```
19 : //!
20 : //! Every image layer file consists of three parts: "summary",
21 : //! "index", and "values". The summary is a fixed size header at the
22 : //! beginning of the file, and it contains basic information about the
23 : //! layer, and offsets to the other parts. The "index" is a B-tree,
24 : //! mapping from Key to an offset in the "values" part. The
25 : //! actual page images are stored in the "values" part.
26 : use crate::config::PageServerConf;
27 : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
28 : use crate::page_cache::PAGE_SZ;
29 : use crate::repository::{Key, KEY_SIZE};
30 : use crate::tenant::blob_io::BlobWriter;
31 : use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
32 : use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
33 : use crate::tenant::storage_layer::{
34 : LayerAccessStats, ValueReconstructResult, ValueReconstructState,
35 : };
36 : use crate::tenant::Timeline;
37 : use crate::virtual_file::{self, VirtualFile};
38 : use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
39 : use anyhow::{bail, ensure, Context, Result};
40 : use bytes::Bytes;
41 : use camino::{Utf8Path, Utf8PathBuf};
42 : use hex;
43 : use pageserver_api::models::LayerAccessKind;
44 : use pageserver_api::shard::TenantShardId;
45 : use rand::{distributions::Alphanumeric, Rng};
46 : use serde::{Deserialize, Serialize};
47 : use std::fs::File;
48 : use std::io::SeekFrom;
49 : use std::ops::Range;
50 : use std::os::unix::prelude::FileExt;
51 : use std::sync::Arc;
52 : use tokio::sync::OnceCell;
53 : use tracing::*;
54 :
55 : use utils::{
56 : bin_ser::BeSer,
57 : id::{TenantId, TimelineId},
58 : lsn::Lsn,
59 : };
60 :
61 : use super::filename::ImageFileName;
62 : use super::{AsLayerDesc, Layer, PersistentLayerDesc, ResidentLayer};
63 :
64 : ///
65 : /// Header stored in the beginning of the file
66 : ///
67 : /// After this comes the 'values' part, starting on block 1. After that,
68 : /// the 'index' starts at the block indicated by 'index_start_blk'
69 : ///
70 22361 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
71 : pub struct Summary {
72 : /// Magic value to identify this as a neon image file. Always IMAGE_FILE_MAGIC.
73 : pub magic: u16,
74 : pub format_version: u16,
75 :
76 : pub tenant_id: TenantId,
77 : pub timeline_id: TimelineId,
78 : pub key_range: Range<Key>,
79 : pub lsn: Lsn,
80 :
81 : /// Block number where the 'index' part of the file begins.
82 : pub index_start_blk: u32,
83 : /// Block within the 'index', where the B-tree root page is stored
84 : pub index_root_blk: u32,
85 : // the 'values' part starts after the summary header, on block 1.
86 : }
87 :
88 : impl From<&ImageLayer> for Summary {
89 0 : fn from(layer: &ImageLayer) -> Self {
90 0 : Self::expected(
91 0 : layer.desc.tenant_shard_id.tenant_id,
92 0 : layer.desc.timeline_id,
93 0 : layer.desc.key_range.clone(),
94 0 : layer.lsn,
95 0 : )
96 0 : }
97 : }
98 :
99 : impl Summary {
100 22361 : pub(super) fn expected(
101 22361 : tenant_id: TenantId,
102 22361 : timeline_id: TimelineId,
103 22361 : key_range: Range<Key>,
104 22361 : lsn: Lsn,
105 22361 : ) -> Self {
106 22361 : Self {
107 22361 : magic: IMAGE_FILE_MAGIC,
108 22361 : format_version: STORAGE_FORMAT_VERSION,
109 22361 : tenant_id,
110 22361 : timeline_id,
111 22361 : key_range,
112 22361 : lsn,
113 22361 :
114 22361 : index_start_blk: 0,
115 22361 : index_root_blk: 0,
116 22361 : }
117 22361 : }
118 : }
119 :
120 : /// This is used only from `pagectl`. Within pageserver, all layers are
121 : /// [`crate::tenant::storage_layer::Layer`], which can hold an [`ImageLayerInner`].
122 : pub struct ImageLayer {
123 : path: Utf8PathBuf,
124 : pub desc: PersistentLayerDesc,
125 : // This entry contains an image of all pages as of this LSN, should be the same as desc.lsn
126 : pub lsn: Lsn,
127 : access_stats: LayerAccessStats,
128 : inner: OnceCell<ImageLayerInner>,
129 : }
130 :
131 : impl std::fmt::Debug for ImageLayer {
132 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133 0 : use super::RangeDisplayDebug;
134 0 :
135 0 : f.debug_struct("ImageLayer")
136 0 : .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
137 0 : .field("file_size", &self.desc.file_size)
138 0 : .field("lsn", &self.lsn)
139 0 : .field("inner", &self.inner)
140 0 : .finish()
141 0 : }
142 : }
143 :
144 : /// ImageLayer is the in-memory data structure associated with an on-disk image
145 : /// file.
146 : pub struct ImageLayerInner {
147 : // values copied from summary
148 : index_start_blk: u32,
149 : index_root_blk: u32,
150 :
151 : lsn: Lsn,
152 :
153 : /// Reader object for reading blocks from the file.
154 : file: FileBlockReader,
155 : }
156 :
157 : impl std::fmt::Debug for ImageLayerInner {
158 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159 0 : f.debug_struct("ImageLayerInner")
160 0 : .field("index_start_blk", &self.index_start_blk)
161 0 : .field("index_root_blk", &self.index_root_blk)
162 0 : .finish()
163 0 : }
164 : }
165 :
166 : impl ImageLayerInner {
167 0 : pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
168 0 : let file = &self.file;
169 0 : let tree_reader =
170 0 : DiskBtreeReader::<_, KEY_SIZE>::new(self.index_start_blk, self.index_root_blk, file);
171 0 :
172 0 : tree_reader.dump().await?;
173 :
174 0 : tree_reader
175 0 : .visit(
176 0 : &[0u8; KEY_SIZE],
177 0 : VisitDirection::Forwards,
178 0 : |key, value| {
179 0 : println!("key: {} offset {}", hex::encode(key), value);
180 0 : true
181 0 : },
182 0 : ctx,
183 0 : )
184 0 : .await?;
185 :
186 0 : Ok(())
187 0 : }
188 : }
189 :
190 : /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
191 : impl std::fmt::Display for ImageLayer {
192 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
193 0 : write!(f, "{}", self.layer_desc().short_id())
194 0 : }
195 : }
196 :
197 : impl AsLayerDesc for ImageLayer {
198 0 : fn layer_desc(&self) -> &PersistentLayerDesc {
199 0 : &self.desc
200 0 : }
201 : }
202 :
203 : impl ImageLayer {
204 0 : pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
205 0 : self.desc.dump();
206 0 :
207 0 : if !verbose {
208 0 : return Ok(());
209 0 : }
210 :
211 0 : let inner = self.load(LayerAccessKind::Dump, ctx).await?;
212 :
213 0 : inner.dump(ctx).await?;
214 :
215 0 : Ok(())
216 0 : }
217 :
218 6029 : fn temp_path_for(
219 6029 : conf: &PageServerConf,
220 6029 : timeline_id: TimelineId,
221 6029 : tenant_shard_id: TenantShardId,
222 6029 : fname: &ImageFileName,
223 6029 : ) -> Utf8PathBuf {
224 6029 : let rand_string: String = rand::thread_rng()
225 6029 : .sample_iter(&Alphanumeric)
226 6029 : .take(8)
227 6029 : .map(char::from)
228 6029 : .collect();
229 6029 :
230 6029 : conf.timeline_path(&tenant_shard_id, &timeline_id)
231 6029 : .join(format!("{fname}.{rand_string}.{TEMP_FILE_SUFFIX}"))
232 6029 : }
233 :
234 : ///
235 : /// Open the underlying file and read the metadata into memory, if it's
236 : /// not loaded already.
237 : ///
238 0 : async fn load(
239 0 : &self,
240 0 : access_kind: LayerAccessKind,
241 0 : ctx: &RequestContext,
242 0 : ) -> Result<&ImageLayerInner> {
243 0 : self.access_stats.record_access(access_kind, ctx);
244 0 : self.inner
245 0 : .get_or_try_init(|| self.load_inner(ctx))
246 0 : .await
247 0 : .with_context(|| format!("Failed to load image layer {}", self.path()))
248 0 : }
249 :
250 0 : async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
251 0 : let path = self.path();
252 :
253 0 : let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, ctx)
254 0 : .await
255 0 : .and_then(|res| res)?;
256 :
257 : // not production code
258 0 : let actual_filename = path.file_name().unwrap().to_owned();
259 0 : let expected_filename = self.layer_desc().filename().file_name();
260 0 :
261 0 : if actual_filename != expected_filename {
262 0 : println!("warning: filename does not match what is expected from in-file summary");
263 0 : println!("actual: {:?}", actual_filename);
264 0 : println!("expected: {:?}", expected_filename);
265 0 : }
266 :
267 0 : Ok(loaded)
268 0 : }
269 :
270 : /// Create an ImageLayer struct representing an existing file on disk.
271 : ///
272 : /// This variant is only used for debugging purposes, by the 'pagectl' binary.
273 0 : pub fn new_for_path(path: &Utf8Path, file: File) -> Result<ImageLayer> {
274 0 : let mut summary_buf = vec![0; PAGE_SZ];
275 0 : file.read_exact_at(&mut summary_buf, 0)?;
276 0 : let summary = Summary::des_prefix(&summary_buf)?;
277 0 : let metadata = file
278 0 : .metadata()
279 0 : .context("get file metadata to determine size")?;
280 :
281 : // This function is never used for constructing layers in a running pageserver,
282 : // so it does not need an accurate TenantShardId.
283 0 : let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
284 0 :
285 0 : Ok(ImageLayer {
286 0 : path: path.to_path_buf(),
287 0 : desc: PersistentLayerDesc::new_img(
288 0 : tenant_shard_id,
289 0 : summary.timeline_id,
290 0 : summary.key_range,
291 0 : summary.lsn,
292 0 : metadata.len(),
293 0 : ), // Now we assume image layer ALWAYS covers the full range. This may change in the future.
294 0 : lsn: summary.lsn,
295 0 : access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
296 0 : inner: OnceCell::new(),
297 0 : })
298 0 : }
299 :
300 0 : fn path(&self) -> Utf8PathBuf {
301 0 : self.path.clone()
302 0 : }
303 : }
304 :
305 0 : #[derive(thiserror::Error, Debug)]
306 : pub enum RewriteSummaryError {
307 : #[error("magic mismatch")]
308 : MagicMismatch,
309 : #[error(transparent)]
310 : Other(#[from] anyhow::Error),
311 : }
312 :
313 : impl From<std::io::Error> for RewriteSummaryError {
314 0 : fn from(e: std::io::Error) -> Self {
315 0 : Self::Other(anyhow::anyhow!(e))
316 0 : }
317 : }
318 :
319 : impl ImageLayer {
320 0 : pub async fn rewrite_summary<F>(
321 0 : path: &Utf8Path,
322 0 : rewrite: F,
323 0 : ctx: &RequestContext,
324 0 : ) -> Result<(), RewriteSummaryError>
325 0 : where
326 0 : F: Fn(Summary) -> Summary,
327 0 : {
328 0 : let file = VirtualFile::open_with_options(
329 0 : path,
330 0 : virtual_file::OpenOptions::new().read(true).write(true),
331 0 : )
332 0 : .await
333 0 : .with_context(|| format!("Failed to open file '{}'", path))?;
334 0 : let file = FileBlockReader::new(file);
335 0 : let summary_blk = file.read_blk(0, ctx).await?;
336 0 : let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
337 0 : let mut file = file.file;
338 0 : if actual_summary.magic != IMAGE_FILE_MAGIC {
339 0 : return Err(RewriteSummaryError::MagicMismatch);
340 0 : }
341 0 :
342 0 : let new_summary = rewrite(actual_summary);
343 0 :
344 0 : let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
345 0 : Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
346 0 : if buf.spilled() {
347 : // The code in ImageLayerWriterInner just warn!()s for this.
348 : // It should probably error out as well.
349 0 : return Err(RewriteSummaryError::Other(anyhow::anyhow!(
350 0 : "Used more than one page size for summary buffer: {}",
351 0 : buf.len()
352 0 : )));
353 0 : }
354 0 : file.seek(SeekFrom::Start(0)).await?;
355 0 : file.write_all(&buf).await?;
356 0 : Ok(())
357 0 : }
358 : }
359 :
360 : impl ImageLayerInner {
361 : /// Returns nested result following Result<Result<_, OpErr>, Critical>:
362 : /// - inner has the success or transient failure
363 : /// - outer has the permanent failure
364 22361 : pub(super) async fn load(
365 22361 : path: &Utf8Path,
366 22361 : lsn: Lsn,
367 22361 : summary: Option<Summary>,
368 22361 : ctx: &RequestContext,
369 22361 : ) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
370 22361 : let file = match VirtualFile::open(path).await {
371 22361 : Ok(file) => file,
372 0 : Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
373 : };
374 22361 : let file = FileBlockReader::new(file);
375 22361 : let summary_blk = match file.read_blk(0, ctx).await {
376 22361 : Ok(blk) => blk,
377 0 : Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
378 : };
379 :
380 : // length is the only way how this could fail, so it's not actually likely at all unless
381 : // read_blk returns wrong sized block.
382 : //
383 : // TODO: confirm and make this into assertion
384 22361 : let actual_summary =
385 22361 : Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
386 :
387 22361 : if let Some(mut expected_summary) = summary {
388 : // production code path
389 22361 : expected_summary.index_start_blk = actual_summary.index_start_blk;
390 22361 : expected_summary.index_root_blk = actual_summary.index_root_blk;
391 22361 :
392 22361 : if actual_summary != expected_summary {
393 0 : bail!(
394 0 : "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
395 0 : actual_summary,
396 0 : expected_summary
397 0 : );
398 22361 : }
399 0 : }
400 :
401 22361 : Ok(Ok(ImageLayerInner {
402 22361 : index_start_blk: actual_summary.index_start_blk,
403 22361 : index_root_blk: actual_summary.index_root_blk,
404 22361 : lsn,
405 22361 : file,
406 22361 : }))
407 22361 : }
408 :
409 1135219 : pub(super) async fn get_value_reconstruct_data(
410 1135219 : &self,
411 1135219 : key: Key,
412 1135219 : reconstruct_state: &mut ValueReconstructState,
413 1135219 : ctx: &RequestContext,
414 1135219 : ) -> anyhow::Result<ValueReconstructResult> {
415 1135219 : let file = &self.file;
416 1135219 : let tree_reader = DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, file);
417 1135219 :
418 1135219 : let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
419 1135219 : key.write_to_byte_slice(&mut keybuf);
420 1135219 : if let Some(offset) = tree_reader
421 1135219 : .get(
422 1135219 : &keybuf,
423 1135219 : &RequestContextBuilder::extend(ctx)
424 1135219 : .page_content_kind(PageContentKind::ImageLayerBtreeNode)
425 1135219 : .build(),
426 1135219 : )
427 6280 : .await?
428 : {
429 1135215 : let blob = file
430 1135215 : .block_cursor()
431 1135215 : .read_blob(
432 1135215 : offset,
433 1135215 : &RequestContextBuilder::extend(ctx)
434 1135215 : .page_content_kind(PageContentKind::ImageLayerValue)
435 1135215 : .build(),
436 1135215 : )
437 8274 : .await
438 1135215 : .with_context(|| format!("failed to read value from offset {}", offset))?;
439 1135215 : let value = Bytes::from(blob);
440 1135215 :
441 1135215 : reconstruct_state.img = Some((self.lsn, value));
442 1135215 : Ok(ValueReconstructResult::Complete)
443 : } else {
444 4 : Ok(ValueReconstructResult::Missing)
445 : }
446 1135219 : }
447 : }
448 :
449 : /// A builder object for constructing a new image layer.
450 : ///
451 : /// Usage:
452 : ///
453 : /// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
454 : ///
455 : /// 2. Write the contents by calling `put_page_image` for every key-value
456 : /// pair in the key range.
457 : ///
458 : /// 3. Call `finish`.
459 : ///
460 : struct ImageLayerWriterInner {
461 : conf: &'static PageServerConf,
462 : path: Utf8PathBuf,
463 : timeline_id: TimelineId,
464 : tenant_shard_id: TenantShardId,
465 : key_range: Range<Key>,
466 : lsn: Lsn,
467 :
468 : blob_writer: BlobWriter<false>,
469 : tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,
470 : }
471 :
472 : impl ImageLayerWriterInner {
473 : ///
474 : /// Start building a new image layer.
475 : ///
476 6029 : async fn new(
477 6029 : conf: &'static PageServerConf,
478 6029 : timeline_id: TimelineId,
479 6029 : tenant_shard_id: TenantShardId,
480 6029 : key_range: &Range<Key>,
481 6029 : lsn: Lsn,
482 6029 : ) -> anyhow::Result<Self> {
483 6029 : // Create the file initially with a temporary filename.
484 6029 : // We'll atomically rename it to the final name when we're done.
485 6029 : let path = ImageLayer::temp_path_for(
486 6029 : conf,
487 6029 : timeline_id,
488 6029 : tenant_shard_id,
489 6029 : &ImageFileName {
490 6029 : key_range: key_range.clone(),
491 6029 : lsn,
492 6029 : },
493 6029 : );
494 6029 : info!("new image layer {path}");
495 6029 : let mut file = {
496 6029 : VirtualFile::open_with_options(
497 6029 : &path,
498 6029 : virtual_file::OpenOptions::new()
499 6029 : .write(true)
500 6029 : .create_new(true),
501 6029 : )
502 70 : .await?
503 : };
504 : // make room for the header block
505 6029 : file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
506 6029 : let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
507 6029 :
508 6029 : // Initialize the b-tree index builder
509 6029 : let block_buf = BlockBuf::new();
510 6029 : let tree_builder = DiskBtreeBuilder::new(block_buf);
511 6029 :
512 6029 : let writer = Self {
513 6029 : conf,
514 6029 : path,
515 6029 : timeline_id,
516 6029 : tenant_shard_id,
517 6029 : key_range: key_range.clone(),
518 6029 : lsn,
519 6029 : tree: tree_builder,
520 6029 : blob_writer,
521 6029 : };
522 6029 :
523 6029 : Ok(writer)
524 6029 : }
525 :
526 : ///
527 : /// Write next value to the file.
528 : ///
529 : /// The page versions must be appended in blknum order.
530 : ///
531 229356 : async fn put_image(&mut self, key: Key, img: &[u8]) -> anyhow::Result<()> {
532 229356 : ensure!(self.key_range.contains(&key));
533 229356 : let off = self.blob_writer.write_blob(img).await?;
534 :
535 229356 : let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
536 229356 : key.write_to_byte_slice(&mut keybuf);
537 229356 : self.tree.append(&keybuf, off)?;
538 :
539 229356 : Ok(())
540 229356 : }
541 :
542 : ///
543 : /// Finish writing the image layer.
544 : ///
545 6029 : async fn finish(self, timeline: &Arc<Timeline>) -> anyhow::Result<ResidentLayer> {
546 6029 : let index_start_blk =
547 6029 : ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
548 6029 :
549 6029 : let mut file = self.blob_writer.into_inner();
550 6029 :
551 6029 : // Write out the index
552 6029 : file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
553 0 : .await?;
554 6029 : let (index_root_blk, block_buf) = self.tree.finish()?;
555 12248 : for buf in block_buf.blocks {
556 6219 : file.write_all(buf.as_ref()).await?;
557 : }
558 :
559 : // Fill in the summary on blk 0
560 6029 : let summary = Summary {
561 6029 : magic: IMAGE_FILE_MAGIC,
562 6029 : format_version: STORAGE_FORMAT_VERSION,
563 6029 : tenant_id: self.tenant_shard_id.tenant_id,
564 6029 : timeline_id: self.timeline_id,
565 6029 : key_range: self.key_range.clone(),
566 6029 : lsn: self.lsn,
567 6029 : index_start_blk,
568 6029 : index_root_blk,
569 6029 : };
570 6029 :
571 6029 : let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
572 6029 : Summary::ser_into(&summary, &mut buf)?;
573 6029 : if buf.spilled() {
574 : // This is bad as we only have one free block for the summary
575 0 : warn!(
576 0 : "Used more than one page size for summary buffer: {}",
577 0 : buf.len()
578 0 : );
579 6029 : }
580 6029 : file.seek(SeekFrom::Start(0)).await?;
581 6029 : file.write_all(&buf).await?;
582 :
583 6029 : let metadata = file
584 6029 : .metadata()
585 82 : .await
586 6029 : .context("get metadata to determine file size")?;
587 :
588 6029 : let desc = PersistentLayerDesc::new_img(
589 6029 : self.tenant_shard_id,
590 6029 : self.timeline_id,
591 6029 : self.key_range.clone(),
592 6029 : self.lsn,
593 6029 : metadata.len(),
594 6029 : );
595 6029 :
596 6029 : // Note: Because we open the file in write-only mode, we cannot
597 6029 : // reuse the same VirtualFile for reading later. That's why we don't
598 6029 : // set inner.file here. The first read will have to re-open it.
599 6029 :
600 6029 : // fsync the file
601 6029 : file.sync_all().await?;
602 :
603 : // FIXME: why not carry the virtualfile here, it supports renaming?
604 6029 : let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
605 :
606 0 : trace!("created image layer {}", layer.local_path());
607 :
608 6029 : Ok(layer)
609 6029 : }
610 : }
611 :
612 : /// A builder object for constructing a new image layer.
613 : ///
614 : /// Usage:
615 : ///
616 : /// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
617 : ///
618 : /// 2. Write the contents by calling `put_page_image` for every key-value
619 : /// pair in the key range.
620 : ///
621 : /// 3. Call `finish`.
622 : ///
623 : /// # Note
624 : ///
625 : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
626 : /// possible for the writer to drop before `finish` is actually called. So this
627 : /// could lead to odd temporary files in the directory, exhausting file system.
628 : /// This structure wraps `ImageLayerWriterInner` and also contains `Drop`
629 : /// implementation that cleans up the temporary file in failure. It's not
630 : /// possible to do this directly in `ImageLayerWriterInner` since `finish` moves
631 : /// out some fields, making it impossible to implement `Drop`.
632 : ///
633 : #[must_use]
634 : pub struct ImageLayerWriter {
635 : inner: Option<ImageLayerWriterInner>,
636 : }
637 :
638 : impl ImageLayerWriter {
639 : ///
640 : /// Start building a new image layer.
641 : ///
642 6029 : pub async fn new(
643 6029 : conf: &'static PageServerConf,
644 6029 : timeline_id: TimelineId,
645 6029 : tenant_shard_id: TenantShardId,
646 6029 : key_range: &Range<Key>,
647 6029 : lsn: Lsn,
648 6029 : ) -> anyhow::Result<ImageLayerWriter> {
649 6029 : Ok(Self {
650 6029 : inner: Some(
651 6029 : ImageLayerWriterInner::new(conf, timeline_id, tenant_shard_id, key_range, lsn)
652 70 : .await?,
653 : ),
654 : })
655 6029 : }
656 :
657 : ///
658 : /// Write next value to the file.
659 : ///
660 : /// The page versions must be appended in blknum order.
661 : ///
662 229356 : pub async fn put_image(&mut self, key: Key, img: &[u8]) -> anyhow::Result<()> {
663 229356 : self.inner.as_mut().unwrap().put_image(key, img).await
664 229356 : }
665 :
666 : ///
667 : /// Finish writing the image layer.
668 : ///
669 6029 : pub(crate) async fn finish(
670 6029 : mut self,
671 6029 : timeline: &Arc<Timeline>,
672 6029 : ) -> anyhow::Result<super::ResidentLayer> {
673 6029 : self.inner.take().unwrap().finish(timeline).await
674 6029 : }
675 : }
676 :
677 : impl Drop for ImageLayerWriter {
678 6029 : fn drop(&mut self) {
679 6029 : if let Some(inner) = self.inner.take() {
680 0 : inner.blob_writer.into_inner().remove();
681 6029 : }
682 6029 : }
683 : }
|