Line data Source code
1 : //! An ImageLayer represents an image or a snapshot of a key-range at
2 : //! one particular LSN. It contains an image of all key-value pairs
3 : //! in its key-range. Any key that falls into the image layer's range
4 : //! but does not exist in the layer, does not exist.
5 : //!
6 : //! An image layer is stored in a file on disk. The file is stored in
7 : //! timelines/<timeline_id> directory. Currently, there are no
8 : //! subdirectories, and each image layer file is named like this:
9 : //!
10 : //! ```text
11 : //! <key start>-<key end>__<LSN>
12 : //! ```
13 : //!
14 : //! For example:
15 : //!
16 : //! ```text
17 : //! 000000067F000032BE0000400000000070B6-000000067F000032BE0000400000000080B6__00000000346BC568
18 : //! ```
19 : //!
20 : //! Every image layer file consists of three parts: "summary",
21 : //! "index", and "values". The summary is a fixed size header at the
22 : //! beginning of the file, and it contains basic information about the
23 : //! layer, and offsets to the other parts. The "index" is a B-tree,
24 : //! mapping from Key to an offset in the "values" part. The
25 : //! actual page images are stored in the "values" part.
26 : use crate::config::PageServerConf;
27 : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
28 : use crate::page_cache::PAGE_SZ;
29 : use crate::repository::{Key, Value, KEY_SIZE};
30 : use crate::tenant::blob_io::BlobWriter;
31 : use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
32 : use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
33 : use crate::tenant::storage_layer::{
34 : LayerAccessStats, ValueReconstructResult, ValueReconstructState,
35 : };
36 : use crate::tenant::timeline::GetVectoredError;
37 : use crate::tenant::{PageReconstructError, Timeline};
38 : use crate::virtual_file::{self, VirtualFile};
39 : use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
40 : use anyhow::{anyhow, bail, ensure, Context, Result};
41 : use bytes::Bytes;
42 : use camino::{Utf8Path, Utf8PathBuf};
43 : use hex;
44 : use pageserver_api::keyspace::KeySpace;
45 : use pageserver_api::models::LayerAccessKind;
46 : use pageserver_api::shard::TenantShardId;
47 : use rand::{distributions::Alphanumeric, Rng};
48 : use serde::{Deserialize, Serialize};
49 : use std::fs::File;
50 : use std::io::SeekFrom;
51 : use std::ops::Range;
52 : use std::os::unix::prelude::FileExt;
53 : use std::sync::Arc;
54 : use tokio::sync::OnceCell;
55 : use tracing::*;
56 :
57 : use utils::{
58 : bin_ser::BeSer,
59 : id::{TenantId, TimelineId},
60 : lsn::Lsn,
61 : };
62 :
63 : use super::filename::ImageFileName;
64 : use super::{AsLayerDesc, Layer, PersistentLayerDesc, ResidentLayer, ValuesReconstructState};
65 :
66 : ///
67 : /// Header stored in the beginning of the file
68 : ///
69 : /// After this comes the 'values' part, starting on block 1. After that,
70 : /// the 'index' starts at the block indicated by 'index_start_blk'
71 : ///
72 70 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
73 : pub struct Summary {
74 : /// Magic value to identify this as a neon image file. Always IMAGE_FILE_MAGIC.
75 : pub magic: u16,
76 : pub format_version: u16,
77 :
78 : pub tenant_id: TenantId,
79 : pub timeline_id: TimelineId,
80 : pub key_range: Range<Key>,
81 : pub lsn: Lsn,
82 :
83 : /// Block number where the 'index' part of the file begins.
84 : pub index_start_blk: u32,
85 : /// Block within the 'index', where the B-tree root page is stored
86 : pub index_root_blk: u32,
87 : // the 'values' part starts after the summary header, on block 1.
88 : }
89 :
90 : impl From<&ImageLayer> for Summary {
91 0 : fn from(layer: &ImageLayer) -> Self {
92 0 : Self::expected(
93 0 : layer.desc.tenant_shard_id.tenant_id,
94 0 : layer.desc.timeline_id,
95 0 : layer.desc.key_range.clone(),
96 0 : layer.lsn,
97 0 : )
98 0 : }
99 : }
100 :
101 : impl Summary {
102 10 : pub(super) fn expected(
103 10 : tenant_id: TenantId,
104 10 : timeline_id: TimelineId,
105 10 : key_range: Range<Key>,
106 10 : lsn: Lsn,
107 10 : ) -> Self {
108 10 : Self {
109 10 : magic: IMAGE_FILE_MAGIC,
110 10 : format_version: STORAGE_FORMAT_VERSION,
111 10 : tenant_id,
112 10 : timeline_id,
113 10 : key_range,
114 10 : lsn,
115 10 :
116 10 : index_start_blk: 0,
117 10 : index_root_blk: 0,
118 10 : }
119 10 : }
120 : }
121 :
122 : /// This is used only from `pagectl`. Within pageserver, all layers are
123 : /// [`crate::tenant::storage_layer::Layer`], which can hold an [`ImageLayerInner`].
124 : pub struct ImageLayer {
125 : path: Utf8PathBuf,
126 : pub desc: PersistentLayerDesc,
127 : // This entry contains an image of all pages as of this LSN, should be the same as desc.lsn
128 : pub lsn: Lsn,
129 : access_stats: LayerAccessStats,
130 : inner: OnceCell<ImageLayerInner>,
131 : }
132 :
133 : impl std::fmt::Debug for ImageLayer {
134 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
135 0 : use super::RangeDisplayDebug;
136 0 :
137 0 : f.debug_struct("ImageLayer")
138 0 : .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
139 0 : .field("file_size", &self.desc.file_size)
140 0 : .field("lsn", &self.lsn)
141 0 : .field("inner", &self.inner)
142 0 : .finish()
143 0 : }
144 : }
145 :
146 : /// ImageLayer is the in-memory data structure associated with an on-disk image
147 : /// file.
148 : pub struct ImageLayerInner {
149 : // values copied from summary
150 : index_start_blk: u32,
151 : index_root_blk: u32,
152 :
153 : lsn: Lsn,
154 :
155 : /// Reader object for reading blocks from the file.
156 : file: FileBlockReader,
157 : }
158 :
159 : impl std::fmt::Debug for ImageLayerInner {
160 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
161 0 : f.debug_struct("ImageLayerInner")
162 0 : .field("index_start_blk", &self.index_start_blk)
163 0 : .field("index_root_blk", &self.index_root_blk)
164 0 : .finish()
165 0 : }
166 : }
167 :
168 : impl ImageLayerInner {
169 0 : pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
170 0 : let file = &self.file;
171 0 : let tree_reader =
172 0 : DiskBtreeReader::<_, KEY_SIZE>::new(self.index_start_blk, self.index_root_blk, file);
173 0 :
174 0 : tree_reader.dump().await?;
175 :
176 0 : tree_reader
177 0 : .visit(
178 0 : &[0u8; KEY_SIZE],
179 0 : VisitDirection::Forwards,
180 0 : |key, value| {
181 0 : println!("key: {} offset {}", hex::encode(key), value);
182 0 : true
183 0 : },
184 0 : ctx,
185 0 : )
186 0 : .await?;
187 :
188 0 : Ok(())
189 0 : }
190 : }
191 :
192 : /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
193 : impl std::fmt::Display for ImageLayer {
194 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
195 0 : write!(f, "{}", self.layer_desc().short_id())
196 0 : }
197 : }
198 :
199 : impl AsLayerDesc for ImageLayer {
200 0 : fn layer_desc(&self) -> &PersistentLayerDesc {
201 0 : &self.desc
202 0 : }
203 : }
204 :
205 : impl ImageLayer {
206 0 : pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
207 0 : self.desc.dump();
208 0 :
209 0 : if !verbose {
210 0 : return Ok(());
211 0 : }
212 :
213 0 : let inner = self.load(LayerAccessKind::Dump, ctx).await?;
214 :
215 0 : inner.dump(ctx).await?;
216 :
217 0 : Ok(())
218 0 : }
219 :
220 70 : fn temp_path_for(
221 70 : conf: &PageServerConf,
222 70 : timeline_id: TimelineId,
223 70 : tenant_shard_id: TenantShardId,
224 70 : fname: &ImageFileName,
225 70 : ) -> Utf8PathBuf {
226 70 : let rand_string: String = rand::thread_rng()
227 70 : .sample_iter(&Alphanumeric)
228 70 : .take(8)
229 70 : .map(char::from)
230 70 : .collect();
231 70 :
232 70 : conf.timeline_path(&tenant_shard_id, &timeline_id)
233 70 : .join(format!("{fname}.{rand_string}.{TEMP_FILE_SUFFIX}"))
234 70 : }
235 :
236 : ///
237 : /// Open the underlying file and read the metadata into memory, if it's
238 : /// not loaded already.
239 : ///
240 0 : async fn load(
241 0 : &self,
242 0 : access_kind: LayerAccessKind,
243 0 : ctx: &RequestContext,
244 0 : ) -> Result<&ImageLayerInner> {
245 0 : self.access_stats.record_access(access_kind, ctx);
246 0 : self.inner
247 0 : .get_or_try_init(|| self.load_inner(ctx))
248 0 : .await
249 0 : .with_context(|| format!("Failed to load image layer {}", self.path()))
250 0 : }
251 :
252 0 : async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
253 0 : let path = self.path();
254 :
255 0 : let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, ctx)
256 0 : .await
257 0 : .and_then(|res| res)?;
258 :
259 : // not production code
260 0 : let actual_filename = path.file_name().unwrap().to_owned();
261 0 : let expected_filename = self.layer_desc().filename().file_name();
262 0 :
263 0 : if actual_filename != expected_filename {
264 0 : println!("warning: filename does not match what is expected from in-file summary");
265 0 : println!("actual: {:?}", actual_filename);
266 0 : println!("expected: {:?}", expected_filename);
267 0 : }
268 :
269 0 : Ok(loaded)
270 0 : }
271 :
272 : /// Create an ImageLayer struct representing an existing file on disk.
273 : ///
274 : /// This variant is only used for debugging purposes, by the 'pagectl' binary.
275 0 : pub fn new_for_path(path: &Utf8Path, file: File) -> Result<ImageLayer> {
276 0 : let mut summary_buf = vec![0; PAGE_SZ];
277 0 : file.read_exact_at(&mut summary_buf, 0)?;
278 0 : let summary = Summary::des_prefix(&summary_buf)?;
279 0 : let metadata = file
280 0 : .metadata()
281 0 : .context("get file metadata to determine size")?;
282 :
283 : // This function is never used for constructing layers in a running pageserver,
284 : // so it does not need an accurate TenantShardId.
285 0 : let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
286 0 :
287 0 : Ok(ImageLayer {
288 0 : path: path.to_path_buf(),
289 0 : desc: PersistentLayerDesc::new_img(
290 0 : tenant_shard_id,
291 0 : summary.timeline_id,
292 0 : summary.key_range,
293 0 : summary.lsn,
294 0 : metadata.len(),
295 0 : ), // Now we assume image layer ALWAYS covers the full range. This may change in the future.
296 0 : lsn: summary.lsn,
297 0 : access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
298 0 : inner: OnceCell::new(),
299 0 : })
300 0 : }
301 :
302 0 : fn path(&self) -> Utf8PathBuf {
303 0 : self.path.clone()
304 0 : }
305 : }
306 :
307 0 : #[derive(thiserror::Error, Debug)]
308 : pub enum RewriteSummaryError {
309 : #[error("magic mismatch")]
310 : MagicMismatch,
311 : #[error(transparent)]
312 : Other(#[from] anyhow::Error),
313 : }
314 :
315 : impl From<std::io::Error> for RewriteSummaryError {
316 0 : fn from(e: std::io::Error) -> Self {
317 0 : Self::Other(anyhow::anyhow!(e))
318 0 : }
319 : }
320 :
321 : impl ImageLayer {
322 0 : pub async fn rewrite_summary<F>(
323 0 : path: &Utf8Path,
324 0 : rewrite: F,
325 0 : ctx: &RequestContext,
326 0 : ) -> Result<(), RewriteSummaryError>
327 0 : where
328 0 : F: Fn(Summary) -> Summary,
329 0 : {
330 0 : let file = VirtualFile::open_with_options(
331 0 : path,
332 0 : virtual_file::OpenOptions::new().read(true).write(true),
333 0 : )
334 0 : .await
335 0 : .with_context(|| format!("Failed to open file '{}'", path))?;
336 0 : let file = FileBlockReader::new(file);
337 0 : let summary_blk = file.read_blk(0, ctx).await?;
338 0 : let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
339 0 : let mut file = file.file;
340 0 : if actual_summary.magic != IMAGE_FILE_MAGIC {
341 0 : return Err(RewriteSummaryError::MagicMismatch);
342 0 : }
343 0 :
344 0 : let new_summary = rewrite(actual_summary);
345 0 :
346 0 : let mut buf = Vec::with_capacity(PAGE_SZ);
347 0 : // TODO: could use smallvec here but it's a pain with Slice<T>
348 0 : Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
349 0 : file.seek(SeekFrom::Start(0)).await?;
350 0 : let (_buf, res) = file.write_all(buf).await;
351 0 : res?;
352 0 : Ok(())
353 0 : }
354 : }
355 :
356 : impl ImageLayerInner {
357 : /// Returns nested result following Result<Result<_, OpErr>, Critical>:
358 : /// - inner has the success or transient failure
359 : /// - outer has the permanent failure
360 10 : pub(super) async fn load(
361 10 : path: &Utf8Path,
362 10 : lsn: Lsn,
363 10 : summary: Option<Summary>,
364 10 : ctx: &RequestContext,
365 10 : ) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
366 10 : let file = match VirtualFile::open(path).await {
367 10 : Ok(file) => file,
368 0 : Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
369 : };
370 10 : let file = FileBlockReader::new(file);
371 10 : let summary_blk = match file.read_blk(0, ctx).await {
372 10 : Ok(blk) => blk,
373 0 : Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
374 : };
375 :
376 : // length is the only way how this could fail, so it's not actually likely at all unless
377 : // read_blk returns wrong sized block.
378 : //
379 : // TODO: confirm and make this into assertion
380 10 : let actual_summary =
381 10 : Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
382 :
383 10 : if let Some(mut expected_summary) = summary {
384 : // production code path
385 10 : expected_summary.index_start_blk = actual_summary.index_start_blk;
386 10 : expected_summary.index_root_blk = actual_summary.index_root_blk;
387 10 :
388 10 : if actual_summary != expected_summary {
389 0 : bail!(
390 0 : "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
391 0 : actual_summary,
392 0 : expected_summary
393 0 : );
394 10 : }
395 0 : }
396 :
397 10 : Ok(Ok(ImageLayerInner {
398 10 : index_start_blk: actual_summary.index_start_blk,
399 10 : index_root_blk: actual_summary.index_root_blk,
400 10 : lsn,
401 10 : file,
402 10 : }))
403 10 : }
404 :
405 512 : pub(super) async fn get_value_reconstruct_data(
406 512 : &self,
407 512 : key: Key,
408 512 : reconstruct_state: &mut ValueReconstructState,
409 512 : ctx: &RequestContext,
410 512 : ) -> anyhow::Result<ValueReconstructResult> {
411 512 : let file = &self.file;
412 512 : let tree_reader = DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, file);
413 512 :
414 512 : let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
415 512 : key.write_to_byte_slice(&mut keybuf);
416 512 : if let Some(offset) = tree_reader
417 512 : .get(
418 512 : &keybuf,
419 512 : &RequestContextBuilder::extend(ctx)
420 512 : .page_content_kind(PageContentKind::ImageLayerBtreeNode)
421 512 : .build(),
422 512 : )
423 299 : .await?
424 : {
425 508 : let blob = file
426 508 : .block_cursor()
427 508 : .read_blob(
428 508 : offset,
429 508 : &RequestContextBuilder::extend(ctx)
430 508 : .page_content_kind(PageContentKind::ImageLayerValue)
431 508 : .build(),
432 508 : )
433 153 : .await
434 508 : .with_context(|| format!("failed to read value from offset {}", offset))?;
435 508 : let value = Bytes::from(blob);
436 508 :
437 508 : reconstruct_state.img = Some((self.lsn, value));
438 508 : Ok(ValueReconstructResult::Complete)
439 : } else {
440 4 : Ok(ValueReconstructResult::Missing)
441 : }
442 512 : }
443 :
444 : // Look up the keys in the provided keyspace and update
445 : // the reconstruct state with whatever is found.
446 0 : pub(super) async fn get_values_reconstruct_data(
447 0 : &self,
448 0 : keyspace: KeySpace,
449 0 : reconstruct_state: &mut ValuesReconstructState,
450 0 : ctx: &RequestContext,
451 0 : ) -> Result<(), GetVectoredError> {
452 0 : let file = &self.file;
453 0 : let tree_reader = DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, file);
454 0 :
455 0 : let mut offsets = Vec::new();
456 :
457 0 : for range in keyspace.ranges.iter() {
458 0 : let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
459 0 : range.start.write_to_byte_slice(&mut search_key);
460 0 :
461 0 : tree_reader
462 0 : .visit(
463 0 : &search_key,
464 0 : VisitDirection::Forwards,
465 0 : |raw_key, value| {
466 0 : let key = Key::from_slice(&raw_key[..KEY_SIZE]);
467 0 : assert!(key >= range.start);
468 :
469 0 : if !range.contains(&key) {
470 0 : return false;
471 0 : }
472 0 :
473 0 : offsets.push((key, value));
474 0 :
475 0 : true
476 0 : },
477 0 : &RequestContextBuilder::extend(ctx)
478 0 : .page_content_kind(PageContentKind::ImageLayerBtreeNode)
479 0 : .build(),
480 0 : )
481 0 : .await
482 0 : .map_err(|err| GetVectoredError::Other(anyhow!(err)))?;
483 : }
484 :
485 0 : let ctx = &RequestContextBuilder::extend(ctx)
486 0 : .page_content_kind(PageContentKind::ImageLayerValue)
487 0 : .build();
488 0 :
489 0 : let cursor = file.block_cursor();
490 0 : let mut buf = Vec::new();
491 0 : for (key, offset) in offsets {
492 0 : let res = cursor.read_blob_into_buf(offset, &mut buf, ctx).await;
493 0 : if let Err(e) = res {
494 0 : reconstruct_state.on_key_error(
495 0 : key,
496 0 : PageReconstructError::from(anyhow!(e).context(format!(
497 0 : "Failed to read blob from virtual file {}",
498 0 : file.file.path
499 0 : ))),
500 0 : );
501 0 :
502 0 : continue;
503 0 : }
504 0 :
505 0 : let blob = Bytes::copy_from_slice(buf.as_slice());
506 0 : reconstruct_state.update_key(&key, self.lsn, Value::Image(blob));
507 : }
508 :
509 0 : Ok(())
510 0 : }
511 : }
512 :
513 : /// A builder object for constructing a new image layer.
514 : ///
515 : /// Usage:
516 : ///
517 : /// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
518 : ///
519 : /// 2. Write the contents by calling `put_page_image` for every key-value
520 : /// pair in the key range.
521 : ///
522 : /// 3. Call `finish`.
523 : ///
524 : struct ImageLayerWriterInner {
525 : conf: &'static PageServerConf,
526 : path: Utf8PathBuf,
527 : timeline_id: TimelineId,
528 : tenant_shard_id: TenantShardId,
529 : key_range: Range<Key>,
530 : lsn: Lsn,
531 :
532 : blob_writer: BlobWriter<false>,
533 : tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,
534 : }
535 :
536 : impl ImageLayerWriterInner {
537 : ///
538 : /// Start building a new image layer.
539 : ///
540 70 : async fn new(
541 70 : conf: &'static PageServerConf,
542 70 : timeline_id: TimelineId,
543 70 : tenant_shard_id: TenantShardId,
544 70 : key_range: &Range<Key>,
545 70 : lsn: Lsn,
546 70 : ) -> anyhow::Result<Self> {
547 70 : // Create the file initially with a temporary filename.
548 70 : // We'll atomically rename it to the final name when we're done.
549 70 : let path = ImageLayer::temp_path_for(
550 70 : conf,
551 70 : timeline_id,
552 70 : tenant_shard_id,
553 70 : &ImageFileName {
554 70 : key_range: key_range.clone(),
555 70 : lsn,
556 70 : },
557 70 : );
558 70 : info!("new image layer {path}");
559 70 : let mut file = {
560 70 : VirtualFile::open_with_options(
561 70 : &path,
562 70 : virtual_file::OpenOptions::new()
563 70 : .write(true)
564 70 : .create_new(true),
565 70 : )
566 70 : .await?
567 : };
568 : // make room for the header block
569 70 : file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
570 70 : let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
571 70 :
572 70 : // Initialize the b-tree index builder
573 70 : let block_buf = BlockBuf::new();
574 70 : let tree_builder = DiskBtreeBuilder::new(block_buf);
575 70 :
576 70 : let writer = Self {
577 70 : conf,
578 70 : path,
579 70 : timeline_id,
580 70 : tenant_shard_id,
581 70 : key_range: key_range.clone(),
582 70 : lsn,
583 70 : tree: tree_builder,
584 70 : blob_writer,
585 70 : };
586 70 :
587 70 : Ok(writer)
588 70 : }
589 :
590 : ///
591 : /// Write next value to the file.
592 : ///
593 : /// The page versions must be appended in blknum order.
594 : ///
595 560 : async fn put_image(&mut self, key: Key, img: Bytes) -> anyhow::Result<()> {
596 560 : ensure!(self.key_range.contains(&key));
597 560 : let (_img, res) = self.blob_writer.write_blob(img).await;
598 : // TODO: re-use the buffer for `img` further upstack
599 560 : let off = res?;
600 :
601 560 : let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
602 560 : key.write_to_byte_slice(&mut keybuf);
603 560 : self.tree.append(&keybuf, off)?;
604 :
605 560 : Ok(())
606 560 : }
607 :
608 : ///
609 : /// Finish writing the image layer.
610 : ///
611 70 : async fn finish(self, timeline: &Arc<Timeline>) -> anyhow::Result<ResidentLayer> {
612 70 : let index_start_blk =
613 70 : ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
614 70 :
615 70 : let mut file = self.blob_writer.into_inner();
616 70 :
617 70 : // Write out the index
618 70 : file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
619 0 : .await?;
620 70 : let (index_root_blk, block_buf) = self.tree.finish()?;
621 140 : for buf in block_buf.blocks {
622 70 : let (_buf, res) = file.write_all(buf).await;
623 70 : res?;
624 : }
625 :
626 : // Fill in the summary on blk 0
627 70 : let summary = Summary {
628 70 : magic: IMAGE_FILE_MAGIC,
629 70 : format_version: STORAGE_FORMAT_VERSION,
630 70 : tenant_id: self.tenant_shard_id.tenant_id,
631 70 : timeline_id: self.timeline_id,
632 70 : key_range: self.key_range.clone(),
633 70 : lsn: self.lsn,
634 70 : index_start_blk,
635 70 : index_root_blk,
636 70 : };
637 70 :
638 70 : let mut buf = Vec::with_capacity(PAGE_SZ);
639 70 : // TODO: could use smallvec here but it's a pain with Slice<T>
640 70 : Summary::ser_into(&summary, &mut buf)?;
641 70 : file.seek(SeekFrom::Start(0)).await?;
642 70 : let (_buf, res) = file.write_all(buf).await;
643 70 : res?;
644 :
645 70 : let metadata = file
646 70 : .metadata()
647 0 : .await
648 70 : .context("get metadata to determine file size")?;
649 :
650 70 : let desc = PersistentLayerDesc::new_img(
651 70 : self.tenant_shard_id,
652 70 : self.timeline_id,
653 70 : self.key_range.clone(),
654 70 : self.lsn,
655 70 : metadata.len(),
656 70 : );
657 70 :
658 70 : // Note: Because we open the file in write-only mode, we cannot
659 70 : // reuse the same VirtualFile for reading later. That's why we don't
660 70 : // set inner.file here. The first read will have to re-open it.
661 70 :
662 70 : // fsync the file
663 70 : file.sync_all().await?;
664 :
665 : // FIXME: why not carry the virtualfile here, it supports renaming?
666 70 : let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
667 :
668 0 : trace!("created image layer {}", layer.local_path());
669 :
670 70 : Ok(layer)
671 70 : }
672 : }
673 :
674 : /// A builder object for constructing a new image layer.
675 : ///
676 : /// Usage:
677 : ///
678 : /// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
679 : ///
680 : /// 2. Write the contents by calling `put_page_image` for every key-value
681 : /// pair in the key range.
682 : ///
683 : /// 3. Call `finish`.
684 : ///
685 : /// # Note
686 : ///
687 : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
688 : /// possible for the writer to drop before `finish` is actually called. So this
689 : /// could lead to odd temporary files in the directory, exhausting file system.
690 : /// This structure wraps `ImageLayerWriterInner` and also contains `Drop`
691 : /// implementation that cleans up the temporary file in failure. It's not
692 : /// possible to do this directly in `ImageLayerWriterInner` since `finish` moves
693 : /// out some fields, making it impossible to implement `Drop`.
694 : ///
695 : #[must_use]
696 : pub struct ImageLayerWriter {
697 : inner: Option<ImageLayerWriterInner>,
698 : }
699 :
700 : impl ImageLayerWriter {
701 : ///
702 : /// Start building a new image layer.
703 : ///
704 70 : pub async fn new(
705 70 : conf: &'static PageServerConf,
706 70 : timeline_id: TimelineId,
707 70 : tenant_shard_id: TenantShardId,
708 70 : key_range: &Range<Key>,
709 70 : lsn: Lsn,
710 70 : ) -> anyhow::Result<ImageLayerWriter> {
711 70 : Ok(Self {
712 70 : inner: Some(
713 70 : ImageLayerWriterInner::new(conf, timeline_id, tenant_shard_id, key_range, lsn)
714 70 : .await?,
715 : ),
716 : })
717 70 : }
718 :
719 : ///
720 : /// Write next value to the file.
721 : ///
722 : /// The page versions must be appended in blknum order.
723 : ///
724 560 : pub async fn put_image(&mut self, key: Key, img: Bytes) -> anyhow::Result<()> {
725 560 : self.inner.as_mut().unwrap().put_image(key, img).await
726 560 : }
727 :
728 : ///
729 : /// Finish writing the image layer.
730 : ///
731 70 : pub(crate) async fn finish(
732 70 : mut self,
733 70 : timeline: &Arc<Timeline>,
734 70 : ) -> anyhow::Result<super::ResidentLayer> {
735 70 : self.inner.take().unwrap().finish(timeline).await
736 70 : }
737 : }
738 :
739 : impl Drop for ImageLayerWriter {
740 70 : fn drop(&mut self) {
741 70 : if let Some(inner) = self.inner.take() {
742 0 : inner.blob_writer.into_inner().remove();
743 70 : }
744 70 : }
745 : }
|