Line data Source code
1 : //! An ImageLayer represents an image or a snapshot of a key-range at
2 : //! one particular LSN. It contains an image of all key-value pairs
3 : //! in its key-range. Any key that falls into the image layer's range
4 : //! but does not exist in the layer, does not exist.
5 : //!
6 : //! An image layer is stored in a file on disk. The file is stored in
7 : //! timelines/<timeline_id> directory. Currently, there are no
8 : //! subdirectories, and each image layer file is named like this:
9 : //!
10 : //! ```text
11 : //! <key start>-<key end>__<LSN>
12 : //! ```
13 : //!
14 : //! For example:
15 : //!
16 : //! ```text
17 : //! 000000067F000032BE0000400000000070B6-000000067F000032BE0000400000000080B6__00000000346BC568
18 : //! ```
19 : //!
20 : //! Every image layer file consists of three parts: "summary",
21 : //! "index", and "values". The summary is a fixed size header at the
22 : //! beginning of the file, and it contains basic information about the
23 : //! layer, and offsets to the other parts. The "index" is a B-tree,
24 : //! mapping from Key to an offset in the "values" part. The
25 : //! actual page images are stored in the "values" part.
26 : use crate::config::PageServerConf;
27 : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
28 : use crate::page_cache::{self, FileId, PAGE_SZ};
29 : use crate::repository::{Key, Value, KEY_SIZE};
30 : use crate::tenant::blob_io::BlobWriter;
31 : use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
32 : use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
33 : use crate::tenant::storage_layer::{
34 : LayerAccessStats, ValueReconstructResult, ValueReconstructState,
35 : };
36 : use crate::tenant::timeline::GetVectoredError;
37 : use crate::tenant::vectored_blob_io::{
38 : BlobFlag, MaxVectoredReadBytes, VectoredBlobReader, VectoredRead, VectoredReadPlanner,
39 : };
40 : use crate::tenant::{PageReconstructError, Timeline};
41 : use crate::virtual_file::{self, VirtualFile};
42 : use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
43 : use anyhow::{anyhow, bail, ensure, Context, Result};
44 : use bytes::{Bytes, BytesMut};
45 : use camino::{Utf8Path, Utf8PathBuf};
46 : use hex;
47 : use pageserver_api::keyspace::KeySpace;
48 : use pageserver_api::models::LayerAccessKind;
49 : use pageserver_api::shard::TenantShardId;
50 : use rand::{distributions::Alphanumeric, Rng};
51 : use serde::{Deserialize, Serialize};
52 : use std::fs::File;
53 : use std::io::SeekFrom;
54 : use std::ops::Range;
55 : use std::os::unix::prelude::FileExt;
56 : use std::sync::Arc;
57 : use tokio::sync::OnceCell;
58 : use tracing::*;
59 :
60 : use utils::{
61 : bin_ser::BeSer,
62 : id::{TenantId, TimelineId},
63 : lsn::Lsn,
64 : };
65 :
66 : use super::filename::ImageFileName;
67 : use super::{AsLayerDesc, Layer, PersistentLayerDesc, ResidentLayer, ValuesReconstructState};
68 :
/// Header stored in the beginning of the file.
///
/// After this comes the 'values' part, starting on block 1. After that,
/// the 'index' starts at the block indicated by 'index_start_blk'.
///
/// The struct is serialized to and deserialized from the first block of the
/// file, so the field order below is part of the stored representation.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct Summary {
    /// Magic value to identify this as a neon image file. Always IMAGE_FILE_MAGIC.
    pub magic: u16,
    /// Storage format version; freshly built summaries use STORAGE_FORMAT_VERSION.
    pub format_version: u16,

    /// Tenant the layer belongs to.
    pub tenant_id: TenantId,
    /// Timeline the layer belongs to.
    pub timeline_id: TimelineId,
    /// Key range covered by the layer.
    pub key_range: Range<Key>,
    /// LSN of the image stored in the layer.
    pub lsn: Lsn,

    /// Block number where the 'index' part of the file begins.
    pub index_start_blk: u32,
    /// Block within the 'index', where the B-tree root page is stored
    pub index_root_blk: u32,
    // the 'values' part starts after the summary header, on block 1.
}
92 :
93 : impl From<&ImageLayer> for Summary {
94 0 : fn from(layer: &ImageLayer) -> Self {
95 0 : Self::expected(
96 0 : layer.desc.tenant_shard_id.tenant_id,
97 0 : layer.desc.timeline_id,
98 0 : layer.desc.key_range.clone(),
99 0 : layer.lsn,
100 0 : )
101 0 : }
102 : }
103 :
104 : impl Summary {
105 10 : pub(super) fn expected(
106 10 : tenant_id: TenantId,
107 10 : timeline_id: TimelineId,
108 10 : key_range: Range<Key>,
109 10 : lsn: Lsn,
110 10 : ) -> Self {
111 10 : Self {
112 10 : magic: IMAGE_FILE_MAGIC,
113 10 : format_version: STORAGE_FORMAT_VERSION,
114 10 : tenant_id,
115 10 : timeline_id,
116 10 : key_range,
117 10 : lsn,
118 10 :
119 10 : index_start_blk: 0,
120 10 : index_root_blk: 0,
121 10 : }
122 10 : }
123 : }
124 :
/// This is used only from `pagectl`. Within pageserver, all layers are
/// [`crate::tenant::storage_layer::Layer`], which can hold an [`ImageLayerInner`].
pub struct ImageLayer {
    /// Path of the layer file on disk.
    path: Utf8PathBuf,
    /// Descriptor of the layer (tenant, timeline, key range, LSN, file size).
    pub desc: PersistentLayerDesc,
    // This entry contains an image of all pages as of this LSN, should be the same as desc.lsn
    pub lsn: Lsn,
    /// Access statistics; an access is recorded each time the layer is loaded.
    access_stats: LayerAccessStats,
    /// Lazily initialized state read from the file; filled in on first load.
    inner: OnceCell<ImageLayerInner>,
}
135 :
136 : impl std::fmt::Debug for ImageLayer {
137 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138 0 : use super::RangeDisplayDebug;
139 0 :
140 0 : f.debug_struct("ImageLayer")
141 0 : .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
142 0 : .field("file_size", &self.desc.file_size)
143 0 : .field("lsn", &self.lsn)
144 0 : .field("inner", &self.inner)
145 0 : .finish()
146 0 : }
147 : }
148 :
/// ImageLayerInner is the in-memory data structure associated with an on-disk image
/// file.
pub struct ImageLayerInner {
    // values copied from summary
    /// Block number where the 'index' part of the file begins.
    index_start_blk: u32,
    /// Block within the 'index' where the B-tree root page is stored.
    index_root_blk: u32,

    /// LSN attached to every value read from this layer.
    lsn: Lsn,

    /// Open handle to the layer file.
    file: VirtualFile,
    /// Page cache file id allocated when the file was opened.
    file_id: FileId,

    /// Size limit for vectored reads. The vectored read path panics if this is `None`.
    max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
}
163 :
164 : impl std::fmt::Debug for ImageLayerInner {
165 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
166 0 : f.debug_struct("ImageLayerInner")
167 0 : .field("index_start_blk", &self.index_start_blk)
168 0 : .field("index_root_blk", &self.index_root_blk)
169 0 : .finish()
170 0 : }
171 : }
172 :
173 : impl ImageLayerInner {
174 0 : pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
175 0 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
176 0 : let tree_reader = DiskBtreeReader::<_, KEY_SIZE>::new(
177 0 : self.index_start_blk,
178 0 : self.index_root_blk,
179 0 : block_reader,
180 0 : );
181 0 :
182 0 : tree_reader.dump().await?;
183 :
184 0 : tree_reader
185 0 : .visit(
186 0 : &[0u8; KEY_SIZE],
187 0 : VisitDirection::Forwards,
188 0 : |key, value| {
189 0 : println!("key: {} offset {}", hex::encode(key), value);
190 0 : true
191 0 : },
192 0 : ctx,
193 0 : )
194 0 : .await?;
195 :
196 0 : Ok(())
197 0 : }
198 : }
199 :
200 : /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
201 : impl std::fmt::Display for ImageLayer {
202 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
203 0 : write!(f, "{}", self.layer_desc().short_id())
204 0 : }
205 : }
206 :
207 : impl AsLayerDesc for ImageLayer {
208 0 : fn layer_desc(&self) -> &PersistentLayerDesc {
209 0 : &self.desc
210 0 : }
211 : }
212 :
213 : impl ImageLayer {
214 0 : pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
215 0 : self.desc.dump();
216 0 :
217 0 : if !verbose {
218 0 : return Ok(());
219 0 : }
220 :
221 0 : let inner = self.load(LayerAccessKind::Dump, ctx).await?;
222 :
223 0 : inner.dump(ctx).await?;
224 :
225 0 : Ok(())
226 0 : }
227 :
228 74 : fn temp_path_for(
229 74 : conf: &PageServerConf,
230 74 : timeline_id: TimelineId,
231 74 : tenant_shard_id: TenantShardId,
232 74 : fname: &ImageFileName,
233 74 : ) -> Utf8PathBuf {
234 74 : let rand_string: String = rand::thread_rng()
235 74 : .sample_iter(&Alphanumeric)
236 74 : .take(8)
237 74 : .map(char::from)
238 74 : .collect();
239 74 :
240 74 : conf.timeline_path(&tenant_shard_id, &timeline_id)
241 74 : .join(format!("{fname}.{rand_string}.{TEMP_FILE_SUFFIX}"))
242 74 : }
243 :
244 : ///
245 : /// Open the underlying file and read the metadata into memory, if it's
246 : /// not loaded already.
247 : ///
248 0 : async fn load(
249 0 : &self,
250 0 : access_kind: LayerAccessKind,
251 0 : ctx: &RequestContext,
252 0 : ) -> Result<&ImageLayerInner> {
253 0 : self.access_stats.record_access(access_kind, ctx);
254 0 : self.inner
255 0 : .get_or_try_init(|| self.load_inner(ctx))
256 0 : .await
257 0 : .with_context(|| format!("Failed to load image layer {}", self.path()))
258 0 : }
259 :
260 0 : async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
261 0 : let path = self.path();
262 :
263 0 : let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, None, ctx)
264 0 : .await
265 0 : .and_then(|res| res)?;
266 :
267 : // not production code
268 0 : let actual_filename = path.file_name().unwrap().to_owned();
269 0 : let expected_filename = self.layer_desc().filename().file_name();
270 0 :
271 0 : if actual_filename != expected_filename {
272 0 : println!("warning: filename does not match what is expected from in-file summary");
273 0 : println!("actual: {:?}", actual_filename);
274 0 : println!("expected: {:?}", expected_filename);
275 0 : }
276 :
277 0 : Ok(loaded)
278 0 : }
279 :
280 : /// Create an ImageLayer struct representing an existing file on disk.
281 : ///
282 : /// This variant is only used for debugging purposes, by the 'pagectl' binary.
283 0 : pub fn new_for_path(path: &Utf8Path, file: File) -> Result<ImageLayer> {
284 0 : let mut summary_buf = vec![0; PAGE_SZ];
285 0 : file.read_exact_at(&mut summary_buf, 0)?;
286 0 : let summary = Summary::des_prefix(&summary_buf)?;
287 0 : let metadata = file
288 0 : .metadata()
289 0 : .context("get file metadata to determine size")?;
290 :
291 : // This function is never used for constructing layers in a running pageserver,
292 : // so it does not need an accurate TenantShardId.
293 0 : let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
294 0 :
295 0 : Ok(ImageLayer {
296 0 : path: path.to_path_buf(),
297 0 : desc: PersistentLayerDesc::new_img(
298 0 : tenant_shard_id,
299 0 : summary.timeline_id,
300 0 : summary.key_range,
301 0 : summary.lsn,
302 0 : metadata.len(),
303 0 : ), // Now we assume image layer ALWAYS covers the full range. This may change in the future.
304 0 : lsn: summary.lsn,
305 0 : access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
306 0 : inner: OnceCell::new(),
307 0 : })
308 0 : }
309 :
310 0 : fn path(&self) -> Utf8PathBuf {
311 0 : self.path.clone()
312 0 : }
313 : }
314 :
/// Errors returned by [`ImageLayer::rewrite_summary`].
#[derive(thiserror::Error, Debug)]
pub enum RewriteSummaryError {
    /// The magic value in the on-disk summary is not IMAGE_FILE_MAGIC.
    #[error("magic mismatch")]
    MagicMismatch,
    /// Any other failure (opening, reading, (de)serializing or writing the file).
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
322 :
323 : impl From<std::io::Error> for RewriteSummaryError {
324 0 : fn from(e: std::io::Error) -> Self {
325 0 : Self::Other(anyhow::anyhow!(e))
326 0 : }
327 : }
328 :
329 : impl ImageLayer {
330 0 : pub async fn rewrite_summary<F>(
331 0 : path: &Utf8Path,
332 0 : rewrite: F,
333 0 : ctx: &RequestContext,
334 0 : ) -> Result<(), RewriteSummaryError>
335 0 : where
336 0 : F: Fn(Summary) -> Summary,
337 0 : {
338 0 : let mut file = VirtualFile::open_with_options(
339 0 : path,
340 0 : virtual_file::OpenOptions::new().read(true).write(true),
341 0 : )
342 0 : .await
343 0 : .with_context(|| format!("Failed to open file '{}'", path))?;
344 0 : let file_id = page_cache::next_file_id();
345 0 : let block_reader = FileBlockReader::new(&file, file_id);
346 0 : let summary_blk = block_reader.read_blk(0, ctx).await?;
347 0 : let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
348 0 : if actual_summary.magic != IMAGE_FILE_MAGIC {
349 0 : return Err(RewriteSummaryError::MagicMismatch);
350 0 : }
351 0 :
352 0 : let new_summary = rewrite(actual_summary);
353 0 :
354 0 : let mut buf = Vec::with_capacity(PAGE_SZ);
355 0 : // TODO: could use smallvec here but it's a pain with Slice<T>
356 0 : Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
357 0 : file.seek(SeekFrom::Start(0)).await?;
358 0 : let (_buf, res) = file.write_all(buf).await;
359 0 : res?;
360 0 : Ok(())
361 0 : }
362 : }
363 :
364 : impl ImageLayerInner {
365 : /// Returns nested result following Result<Result<_, OpErr>, Critical>:
366 : /// - inner has the success or transient failure
367 : /// - outer has the permanent failure
368 10 : pub(super) async fn load(
369 10 : path: &Utf8Path,
370 10 : lsn: Lsn,
371 10 : summary: Option<Summary>,
372 10 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
373 10 : ctx: &RequestContext,
374 10 : ) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
375 10 : let file = match VirtualFile::open(path).await {
376 10 : Ok(file) => file,
377 0 : Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
378 : };
379 10 : let file_id = page_cache::next_file_id();
380 10 : let block_reader = FileBlockReader::new(&file, file_id);
381 10 : let summary_blk = match block_reader.read_blk(0, ctx).await {
382 10 : Ok(blk) => blk,
383 0 : Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
384 : };
385 :
386 : // length is the only way how this could fail, so it's not actually likely at all unless
387 : // read_blk returns wrong sized block.
388 : //
389 : // TODO: confirm and make this into assertion
390 10 : let actual_summary =
391 10 : Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
392 :
393 10 : if let Some(mut expected_summary) = summary {
394 : // production code path
395 10 : expected_summary.index_start_blk = actual_summary.index_start_blk;
396 10 : expected_summary.index_root_blk = actual_summary.index_root_blk;
397 10 :
398 10 : if actual_summary != expected_summary {
399 0 : bail!(
400 0 : "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
401 0 : actual_summary,
402 0 : expected_summary
403 0 : );
404 10 : }
405 0 : }
406 :
407 10 : Ok(Ok(ImageLayerInner {
408 10 : index_start_blk: actual_summary.index_start_blk,
409 10 : index_root_blk: actual_summary.index_root_blk,
410 10 : lsn,
411 10 : file,
412 10 : file_id,
413 10 : max_vectored_read_bytes,
414 10 : }))
415 10 : }
416 :
417 512 : pub(super) async fn get_value_reconstruct_data(
418 512 : &self,
419 512 : key: Key,
420 512 : reconstruct_state: &mut ValueReconstructState,
421 512 : ctx: &RequestContext,
422 512 : ) -> anyhow::Result<ValueReconstructResult> {
423 512 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
424 512 : let tree_reader =
425 512 : DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, &block_reader);
426 512 :
427 512 : let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
428 512 : key.write_to_byte_slice(&mut keybuf);
429 512 : if let Some(offset) = tree_reader
430 512 : .get(
431 512 : &keybuf,
432 512 : &RequestContextBuilder::extend(ctx)
433 512 : .page_content_kind(PageContentKind::ImageLayerBtreeNode)
434 512 : .build(),
435 512 : )
436 287 : .await?
437 : {
438 508 : let blob = block_reader
439 508 : .block_cursor()
440 508 : .read_blob(
441 508 : offset,
442 508 : &RequestContextBuilder::extend(ctx)
443 508 : .page_content_kind(PageContentKind::ImageLayerValue)
444 508 : .build(),
445 508 : )
446 148 : .await
447 508 : .with_context(|| format!("failed to read value from offset {}", offset))?;
448 508 : let value = Bytes::from(blob);
449 508 :
450 508 : reconstruct_state.img = Some((self.lsn, value));
451 508 : Ok(ValueReconstructResult::Complete)
452 : } else {
453 4 : Ok(ValueReconstructResult::Missing)
454 : }
455 512 : }
456 :
457 : // Look up the keys in the provided keyspace and update
458 : // the reconstruct state with whatever is found.
459 0 : pub(super) async fn get_values_reconstruct_data(
460 0 : &self,
461 0 : keyspace: KeySpace,
462 0 : reconstruct_state: &mut ValuesReconstructState,
463 0 : ctx: &RequestContext,
464 0 : ) -> Result<(), GetVectoredError> {
465 0 : let reads = self
466 0 : .plan_reads(keyspace, ctx)
467 0 : .await
468 0 : .map_err(GetVectoredError::Other)?;
469 :
470 0 : self.do_reads_and_update_state(reads, reconstruct_state)
471 0 : .await;
472 :
473 0 : Ok(())
474 0 : }
475 :
/// Walk the index for every range in `keyspace` and plan the vectored reads
/// needed to fetch the values of all keys found in those ranges.
///
/// # Panics
///
/// Panics if the layer was loaded without `max_vectored_read_bytes`, or if
/// the index visit yields a key smaller than the start of the range being
/// searched.
async fn plan_reads(
    &self,
    keyspace: KeySpace,
    ctx: &RequestContext,
) -> anyhow::Result<Vec<VectoredRead>> {
    let mut planner = VectoredReadPlanner::new(
        self.max_vectored_read_bytes
            .expect("Layer is loaded with max vectored bytes config")
            .0
            .into(),
    );

    let block_reader = FileBlockReader::new(&self.file, self.file_id);
    let tree_reader =
        DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);

    for range in keyspace.ranges.iter() {
        // Set by the visitor once it has seen the first key at or past `range.end`.
        let mut range_end_handled = false;

        let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
        range.start.write_to_byte_slice(&mut search_key);

        tree_reader
            .visit(
                &search_key,
                VisitDirection::Forwards,
                |raw_key, offset| {
                    let key = Key::from_slice(&raw_key[..KEY_SIZE]);
                    assert!(key >= range.start);

                    if key >= range.end {
                        // First key past the range: report its offset as the
                        // range end and stop the visit.
                        planner.handle_range_end(offset);
                        range_end_handled = true;
                        false
                    } else {
                        planner.handle(key, self.lsn, offset, BlobFlag::None);
                        true
                    }
                },
                &RequestContextBuilder::extend(ctx)
                    .page_content_kind(PageContentKind::ImageLayerBtreeNode)
                    .build(),
            )
            .await
            .map_err(|err| GetVectoredError::Other(anyhow!(err)))?;

        if !range_end_handled {
            // The visit ended without seeing a key at or past `range.end`, so
            // use the byte offset where the index starts as the range end.
            let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64;
            planner.handle_range_end(payload_end);
        }
    }

    Ok(planner.finish())
}
530 :
531 0 : async fn do_reads_and_update_state(
532 0 : &self,
533 0 : reads: Vec<VectoredRead>,
534 0 : reconstruct_state: &mut ValuesReconstructState,
535 0 : ) {
536 0 : let max_vectored_read_bytes = self
537 0 : .max_vectored_read_bytes
538 0 : .expect("Layer is loaded with max vectored bytes config")
539 0 : .0
540 0 : .into();
541 0 :
542 0 : let vectored_blob_reader = VectoredBlobReader::new(&self.file);
543 0 : for read in reads.into_iter() {
544 0 : let buf = BytesMut::with_capacity(max_vectored_read_bytes);
545 0 : let res = vectored_blob_reader.read_blobs(&read, buf).await;
546 :
547 0 : match res {
548 0 : Ok(blobs_buf) => {
549 0 : let frozen_buf = blobs_buf.buf.freeze();
550 :
551 0 : for meta in blobs_buf.blobs.iter() {
552 0 : let img_buf = frozen_buf.slice(meta.start..meta.end);
553 0 : reconstruct_state.update_key(
554 0 : &meta.meta.key,
555 0 : self.lsn,
556 0 : Value::Image(img_buf),
557 0 : );
558 0 : }
559 : }
560 0 : Err(err) => {
561 0 : let kind = err.kind();
562 0 : for (_, blob_meta) in read.blobs_at.as_slice() {
563 0 : reconstruct_state.on_key_error(
564 0 : blob_meta.key,
565 0 : PageReconstructError::from(anyhow!(
566 0 : "Failed to read blobs from virtual file {}: {}",
567 0 : self.file.path,
568 0 : kind
569 0 : )),
570 0 : );
571 0 : }
572 : }
573 : };
574 : }
575 0 : }
576 : }
577 :
/// A builder object for constructing a new image layer.
///
/// This is the implementation behind `ImageLayerWriter`.
///
/// Usage:
///
/// 1. Create the writer by calling `new(...)`
///
/// 2. Write the contents by calling `put_image` for every key-value
///    pair in the key range.
///
/// 3. Call `finish`.
///
struct ImageLayerWriterInner {
    conf: &'static PageServerConf,
    /// Path of the temporary file the layer is written to.
    path: Utf8PathBuf,
    timeline_id: TimelineId,
    tenant_shard_id: TenantShardId,
    /// Key range of the layer; `put_image` rejects keys outside of it.
    key_range: Range<Key>,
    lsn: Lsn,

    /// Writer for the 'values' part of the file.
    blob_writer: BlobWriter<false>,
    /// Builder for the B-tree index mapping keys to value offsets.
    tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,
}
600 :
601 : impl ImageLayerWriterInner {
602 : ///
603 : /// Start building a new image layer.
604 : ///
605 74 : async fn new(
606 74 : conf: &'static PageServerConf,
607 74 : timeline_id: TimelineId,
608 74 : tenant_shard_id: TenantShardId,
609 74 : key_range: &Range<Key>,
610 74 : lsn: Lsn,
611 74 : ) -> anyhow::Result<Self> {
612 74 : // Create the file initially with a temporary filename.
613 74 : // We'll atomically rename it to the final name when we're done.
614 74 : let path = ImageLayer::temp_path_for(
615 74 : conf,
616 74 : timeline_id,
617 74 : tenant_shard_id,
618 74 : &ImageFileName {
619 74 : key_range: key_range.clone(),
620 74 : lsn,
621 74 : },
622 74 : );
623 74 : info!("new image layer {path}");
624 74 : let mut file = {
625 74 : VirtualFile::open_with_options(
626 74 : &path,
627 74 : virtual_file::OpenOptions::new()
628 74 : .write(true)
629 74 : .create_new(true),
630 74 : )
631 74 : .await?
632 : };
633 : // make room for the header block
634 74 : file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
635 74 : let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
636 74 :
637 74 : // Initialize the b-tree index builder
638 74 : let block_buf = BlockBuf::new();
639 74 : let tree_builder = DiskBtreeBuilder::new(block_buf);
640 74 :
641 74 : let writer = Self {
642 74 : conf,
643 74 : path,
644 74 : timeline_id,
645 74 : tenant_shard_id,
646 74 : key_range: key_range.clone(),
647 74 : lsn,
648 74 : tree: tree_builder,
649 74 : blob_writer,
650 74 : };
651 74 :
652 74 : Ok(writer)
653 74 : }
654 :
655 : ///
656 : /// Write next value to the file.
657 : ///
658 : /// The page versions must be appended in blknum order.
659 : ///
660 592 : async fn put_image(&mut self, key: Key, img: Bytes) -> anyhow::Result<()> {
661 592 : ensure!(self.key_range.contains(&key));
662 592 : let (_img, res) = self.blob_writer.write_blob(img).await;
663 : // TODO: re-use the buffer for `img` further upstack
664 592 : let off = res?;
665 :
666 592 : let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
667 592 : key.write_to_byte_slice(&mut keybuf);
668 592 : self.tree.append(&keybuf, off)?;
669 :
670 592 : Ok(())
671 592 : }
672 :
///
/// Finish writing the image layer.
///
/// Writes the index after the values, fills in the summary on block 0,
/// fsyncs the file and hands the temporary file over to
/// `Layer::finish_creating`, returning the resulting layer.
///
/// If any step fails, the error is returned and the temporary file is not
/// removed by this function.
///
async fn finish(self, timeline: &Arc<Timeline>) -> anyhow::Result<ResidentLayer> {
    // The index starts on the first page boundary at or after the end of the values.
    let index_start_blk =
        ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;

    let mut file = self.blob_writer.into_inner();

    // Write out the index
    file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
        .await?;
    let (index_root_blk, block_buf) = self.tree.finish()?;
    for buf in block_buf.blocks {
        let (_buf, res) = file.write_all(buf).await;
        res?;
    }

    // Fill in the summary on blk 0
    let summary = Summary {
        magic: IMAGE_FILE_MAGIC,
        format_version: STORAGE_FORMAT_VERSION,
        tenant_id: self.tenant_shard_id.tenant_id,
        timeline_id: self.timeline_id,
        key_range: self.key_range.clone(),
        lsn: self.lsn,
        index_start_blk,
        index_root_blk,
    };

    let mut buf = Vec::with_capacity(PAGE_SZ);
    // TODO: could use smallvec here but it's a pain with Slice<T>
    Summary::ser_into(&summary, &mut buf)?;
    file.seek(SeekFrom::Start(0)).await?;
    let (_buf, res) = file.write_all(buf).await;
    res?;

    // The descriptor records the size of the complete file.
    let metadata = file
        .metadata()
        .await
        .context("get metadata to determine file size")?;

    let desc = PersistentLayerDesc::new_img(
        self.tenant_shard_id,
        self.timeline_id,
        self.key_range.clone(),
        self.lsn,
        metadata.len(),
    );

    // Note: Because we open the file in write-only mode, we cannot
    // reuse the same VirtualFile for reading later. That's why we don't
    // set inner.file here. The first read will have to re-open it.

    // fsync the file
    file.sync_all().await?;

    // FIXME: why not carry the virtualfile here, it supports renaming?
    let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;

    trace!("created image layer {}", layer.local_path());

    Ok(layer)
}
737 : }
738 :
/// A builder object for constructing a new image layer.
///
/// Usage:
///
/// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
///
/// 2. Write the contents by calling `put_image` for every key-value
///    pair in the key range.
///
/// 3. Call `finish`.
///
/// # Note
///
/// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
/// possible for the writer to drop before `finish` is actually called. So this
/// could lead to odd temporary files in the directory, exhausting file system.
/// This structure wraps `ImageLayerWriterInner` and also contains `Drop`
/// implementation that cleans up the temporary file in failure. It's not
/// possible to do this directly in `ImageLayerWriterInner` since `finish` moves
/// out some fields, making it impossible to implement `Drop`.
///
#[must_use]
pub struct ImageLayerWriter {
    /// `Some` until `finish` or `drop` takes the inner writer out.
    inner: Option<ImageLayerWriterInner>,
}
764 :
765 : impl ImageLayerWriter {
766 : ///
767 : /// Start building a new image layer.
768 : ///
769 74 : pub async fn new(
770 74 : conf: &'static PageServerConf,
771 74 : timeline_id: TimelineId,
772 74 : tenant_shard_id: TenantShardId,
773 74 : key_range: &Range<Key>,
774 74 : lsn: Lsn,
775 74 : ) -> anyhow::Result<ImageLayerWriter> {
776 74 : Ok(Self {
777 74 : inner: Some(
778 74 : ImageLayerWriterInner::new(conf, timeline_id, tenant_shard_id, key_range, lsn)
779 74 : .await?,
780 : ),
781 : })
782 74 : }
783 :
784 : ///
785 : /// Write next value to the file.
786 : ///
787 : /// The page versions must be appended in blknum order.
788 : ///
789 592 : pub async fn put_image(&mut self, key: Key, img: Bytes) -> anyhow::Result<()> {
790 592 : self.inner.as_mut().unwrap().put_image(key, img).await
791 592 : }
792 :
793 : ///
794 : /// Finish writing the image layer.
795 : ///
796 74 : pub(crate) async fn finish(
797 74 : mut self,
798 74 : timeline: &Arc<Timeline>,
799 74 : ) -> anyhow::Result<super::ResidentLayer> {
800 74 : self.inner.take().unwrap().finish(timeline).await
801 74 : }
802 : }
803 :
804 : impl Drop for ImageLayerWriter {
805 74 : fn drop(&mut self) {
806 74 : if let Some(inner) = self.inner.take() {
807 0 : inner.blob_writer.into_inner().remove();
808 74 : }
809 74 : }
810 : }
|