Line data Source code
1 : //! An ImageLayer represents an image or a snapshot of a key-range at
2 : //! one particular LSN. It contains an image of all key-value pairs
3 : //! in its key-range. Any key that falls into the image layer's range
4 : //! but does not exist in the layer, does not exist.
5 : //!
6 : //! An image layer is stored in a file on disk. The file is stored in
7 : //! timelines/<timeline_id> directory. Currently, there are no
8 : //! subdirectories, and each image layer file is named like this:
9 : //!
10 : //! ```text
11 : //! <key start>-<key end>__<LSN>
12 : //! ```
13 : //!
14 : //! For example:
15 : //!
16 : //! ```text
17 : //! 000000067F000032BE0000400000000070B6-000000067F000032BE0000400000000080B6__00000000346BC568
18 : //! ```
19 : //!
//! Every image layer file consists of three parts: "summary",
//! "values", and "index", in that order on disk. The summary is a
//! fixed-size header in the first block of the file; it contains basic
//! information about the layer and the location of the index. The
//! "values" part starts at block 1 and holds the actual page images.
//! The "index" follows the values and is a B-tree mapping each Key to
//! the offset of its image in the "values" part.
26 : use crate::config::PageServerConf;
27 : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
28 : use crate::page_cache::{self, FileId, PAGE_SZ};
29 : use crate::repository::{Key, Value, KEY_SIZE};
30 : use crate::tenant::blob_io::BlobWriter;
31 : use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
32 : use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
33 : use crate::tenant::storage_layer::{
34 : LayerAccessStats, ValueReconstructResult, ValueReconstructState,
35 : };
36 : use crate::tenant::timeline::GetVectoredError;
37 : use crate::tenant::vectored_blob_io::{
38 : BlobFlag, MaxVectoredReadBytes, VectoredBlobReader, VectoredRead, VectoredReadPlanner,
39 : };
40 : use crate::tenant::{PageReconstructError, Timeline};
41 : use crate::virtual_file::{self, VirtualFile};
42 : use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
43 : use anyhow::{anyhow, bail, ensure, Context, Result};
44 : use bytes::{Bytes, BytesMut};
45 : use camino::{Utf8Path, Utf8PathBuf};
46 : use hex;
47 : use itertools::Itertools;
48 : use pageserver_api::keyspace::KeySpace;
49 : use pageserver_api::models::LayerAccessKind;
50 : use pageserver_api::shard::TenantShardId;
51 : use rand::{distributions::Alphanumeric, Rng};
52 : use serde::{Deserialize, Serialize};
53 : use std::fs::File;
54 : use std::io::SeekFrom;
55 : use std::ops::Range;
56 : use std::os::unix::prelude::FileExt;
57 : use std::sync::Arc;
58 : use tokio::sync::OnceCell;
59 : use tokio_stream::StreamExt;
60 : use tracing::*;
61 :
62 : use utils::{
63 : bin_ser::BeSer,
64 : id::{TenantId, TimelineId},
65 : lsn::Lsn,
66 : };
67 :
68 : use super::filename::ImageFileName;
69 : use super::{AsLayerDesc, Layer, PersistentLayerDesc, ResidentLayer, ValuesReconstructState};
70 :
71 : ///
72 : /// Header stored in the beginning of the file
73 : ///
74 : /// After this comes the 'values' part, starting on block 1. After that,
75 : /// the 'index' starts at the block indicated by 'index_start_blk'
76 : ///
77 24 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
78 : pub struct Summary {
79 : /// Magic value to identify this as a neon image file. Always IMAGE_FILE_MAGIC.
80 : pub magic: u16,
81 : pub format_version: u16,
82 :
83 : pub tenant_id: TenantId,
84 : pub timeline_id: TimelineId,
85 : pub key_range: Range<Key>,
86 : pub lsn: Lsn,
87 :
88 : /// Block number where the 'index' part of the file begins.
89 : pub index_start_blk: u32,
90 : /// Block within the 'index', where the B-tree root page is stored
91 : pub index_root_blk: u32,
92 : // the 'values' part starts after the summary header, on block 1.
93 : }
94 :
95 : impl From<&ImageLayer> for Summary {
96 0 : fn from(layer: &ImageLayer) -> Self {
97 0 : Self::expected(
98 0 : layer.desc.tenant_shard_id.tenant_id,
99 0 : layer.desc.timeline_id,
100 0 : layer.desc.key_range.clone(),
101 0 : layer.lsn,
102 0 : )
103 0 : }
104 : }
105 :
106 : impl Summary {
107 24 : pub(super) fn expected(
108 24 : tenant_id: TenantId,
109 24 : timeline_id: TimelineId,
110 24 : key_range: Range<Key>,
111 24 : lsn: Lsn,
112 24 : ) -> Self {
113 24 : Self {
114 24 : magic: IMAGE_FILE_MAGIC,
115 24 : format_version: STORAGE_FORMAT_VERSION,
116 24 : tenant_id,
117 24 : timeline_id,
118 24 : key_range,
119 24 : lsn,
120 24 :
121 24 : index_start_blk: 0,
122 24 : index_root_blk: 0,
123 24 : }
124 24 : }
125 : }
126 :
127 : /// This is used only from `pagectl`. Within pageserver, all layers are
128 : /// [`crate::tenant::storage_layer::Layer`], which can hold an [`ImageLayerInner`].
129 : pub struct ImageLayer {
130 : path: Utf8PathBuf,
131 : pub desc: PersistentLayerDesc,
132 : // This entry contains an image of all pages as of this LSN, should be the same as desc.lsn
133 : pub lsn: Lsn,
134 : access_stats: LayerAccessStats,
135 : inner: OnceCell<ImageLayerInner>,
136 : }
137 :
138 : impl std::fmt::Debug for ImageLayer {
139 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
140 0 : use super::RangeDisplayDebug;
141 0 :
142 0 : f.debug_struct("ImageLayer")
143 0 : .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
144 0 : .field("file_size", &self.desc.file_size)
145 0 : .field("lsn", &self.lsn)
146 0 : .field("inner", &self.inner)
147 0 : .finish()
148 0 : }
149 : }
150 :
151 : /// ImageLayer is the in-memory data structure associated with an on-disk image
152 : /// file.
153 : pub struct ImageLayerInner {
154 : // values copied from summary
155 : index_start_blk: u32,
156 : index_root_blk: u32,
157 :
158 : lsn: Lsn,
159 :
160 : file: VirtualFile,
161 : file_id: FileId,
162 :
163 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
164 : }
165 :
166 : impl std::fmt::Debug for ImageLayerInner {
167 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
168 0 : f.debug_struct("ImageLayerInner")
169 0 : .field("index_start_blk", &self.index_start_blk)
170 0 : .field("index_root_blk", &self.index_root_blk)
171 0 : .finish()
172 0 : }
173 : }
174 :
175 : impl ImageLayerInner {
176 0 : pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
177 0 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
178 0 : let tree_reader = DiskBtreeReader::<_, KEY_SIZE>::new(
179 0 : self.index_start_blk,
180 0 : self.index_root_blk,
181 0 : block_reader,
182 0 : );
183 0 :
184 0 : tree_reader.dump().await?;
185 :
186 0 : tree_reader
187 0 : .visit(
188 0 : &[0u8; KEY_SIZE],
189 0 : VisitDirection::Forwards,
190 0 : |key, value| {
191 0 : println!("key: {} offset {}", hex::encode(key), value);
192 0 : true
193 0 : },
194 0 : ctx,
195 0 : )
196 0 : .await?;
197 :
198 0 : Ok(())
199 0 : }
200 : }
201 :
202 : /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
203 : impl std::fmt::Display for ImageLayer {
204 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
205 0 : write!(f, "{}", self.layer_desc().short_id())
206 0 : }
207 : }
208 :
209 : impl AsLayerDesc for ImageLayer {
210 0 : fn layer_desc(&self) -> &PersistentLayerDesc {
211 0 : &self.desc
212 0 : }
213 : }
214 :
215 : impl ImageLayer {
216 0 : pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
217 0 : self.desc.dump();
218 0 :
219 0 : if !verbose {
220 0 : return Ok(());
221 0 : }
222 :
223 0 : let inner = self.load(LayerAccessKind::Dump, ctx).await?;
224 :
225 0 : inner.dump(ctx).await?;
226 :
227 0 : Ok(())
228 0 : }
229 :
230 104 : fn temp_path_for(
231 104 : conf: &PageServerConf,
232 104 : timeline_id: TimelineId,
233 104 : tenant_shard_id: TenantShardId,
234 104 : fname: &ImageFileName,
235 104 : ) -> Utf8PathBuf {
236 104 : let rand_string: String = rand::thread_rng()
237 104 : .sample_iter(&Alphanumeric)
238 104 : .take(8)
239 104 : .map(char::from)
240 104 : .collect();
241 104 :
242 104 : conf.timeline_path(&tenant_shard_id, &timeline_id)
243 104 : .join(format!("{fname}.{rand_string}.{TEMP_FILE_SUFFIX}"))
244 104 : }
245 :
246 : ///
247 : /// Open the underlying file and read the metadata into memory, if it's
248 : /// not loaded already.
249 : ///
250 0 : async fn load(
251 0 : &self,
252 0 : access_kind: LayerAccessKind,
253 0 : ctx: &RequestContext,
254 0 : ) -> Result<&ImageLayerInner> {
255 0 : self.access_stats.record_access(access_kind, ctx);
256 0 : self.inner
257 0 : .get_or_try_init(|| self.load_inner(ctx))
258 0 : .await
259 0 : .with_context(|| format!("Failed to load image layer {}", self.path()))
260 0 : }
261 :
262 0 : async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
263 0 : let path = self.path();
264 :
265 0 : let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, None, ctx)
266 0 : .await
267 0 : .and_then(|res| res)?;
268 :
269 : // not production code
270 0 : let actual_filename = path.file_name().unwrap().to_owned();
271 0 : let expected_filename = self.layer_desc().filename().file_name();
272 0 :
273 0 : if actual_filename != expected_filename {
274 0 : println!("warning: filename does not match what is expected from in-file summary");
275 0 : println!("actual: {:?}", actual_filename);
276 0 : println!("expected: {:?}", expected_filename);
277 0 : }
278 :
279 0 : Ok(loaded)
280 0 : }
281 :
282 : /// Create an ImageLayer struct representing an existing file on disk.
283 : ///
284 : /// This variant is only used for debugging purposes, by the 'pagectl' binary.
285 0 : pub fn new_for_path(path: &Utf8Path, file: File) -> Result<ImageLayer> {
286 0 : let mut summary_buf = vec![0; PAGE_SZ];
287 0 : file.read_exact_at(&mut summary_buf, 0)?;
288 0 : let summary = Summary::des_prefix(&summary_buf)?;
289 0 : let metadata = file
290 0 : .metadata()
291 0 : .context("get file metadata to determine size")?;
292 :
293 : // This function is never used for constructing layers in a running pageserver,
294 : // so it does not need an accurate TenantShardId.
295 0 : let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
296 0 :
297 0 : Ok(ImageLayer {
298 0 : path: path.to_path_buf(),
299 0 : desc: PersistentLayerDesc::new_img(
300 0 : tenant_shard_id,
301 0 : summary.timeline_id,
302 0 : summary.key_range,
303 0 : summary.lsn,
304 0 : metadata.len(),
305 0 : ), // Now we assume image layer ALWAYS covers the full range. This may change in the future.
306 0 : lsn: summary.lsn,
307 0 : access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
308 0 : inner: OnceCell::new(),
309 0 : })
310 0 : }
311 :
312 0 : fn path(&self) -> Utf8PathBuf {
313 0 : self.path.clone()
314 0 : }
315 : }
316 :
317 0 : #[derive(thiserror::Error, Debug)]
318 : pub enum RewriteSummaryError {
319 : #[error("magic mismatch")]
320 : MagicMismatch,
321 : #[error(transparent)]
322 : Other(#[from] anyhow::Error),
323 : }
324 :
325 : impl From<std::io::Error> for RewriteSummaryError {
326 0 : fn from(e: std::io::Error) -> Self {
327 0 : Self::Other(anyhow::anyhow!(e))
328 0 : }
329 : }
330 :
331 : impl ImageLayer {
332 0 : pub async fn rewrite_summary<F>(
333 0 : path: &Utf8Path,
334 0 : rewrite: F,
335 0 : ctx: &RequestContext,
336 0 : ) -> Result<(), RewriteSummaryError>
337 0 : where
338 0 : F: Fn(Summary) -> Summary,
339 0 : {
340 0 : let mut file = VirtualFile::open_with_options(
341 0 : path,
342 0 : virtual_file::OpenOptions::new().read(true).write(true),
343 0 : )
344 0 : .await
345 0 : .with_context(|| format!("Failed to open file '{}'", path))?;
346 0 : let file_id = page_cache::next_file_id();
347 0 : let block_reader = FileBlockReader::new(&file, file_id);
348 0 : let summary_blk = block_reader.read_blk(0, ctx).await?;
349 0 : let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
350 0 : if actual_summary.magic != IMAGE_FILE_MAGIC {
351 0 : return Err(RewriteSummaryError::MagicMismatch);
352 0 : }
353 0 :
354 0 : let new_summary = rewrite(actual_summary);
355 0 :
356 0 : let mut buf = Vec::with_capacity(PAGE_SZ);
357 0 : // TODO: could use smallvec here but it's a pain with Slice<T>
358 0 : Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
359 0 : file.seek(SeekFrom::Start(0)).await?;
360 0 : let (_buf, res) = file.write_all(buf).await;
361 0 : res?;
362 0 : Ok(())
363 0 : }
364 : }
365 :
366 : impl ImageLayerInner {
367 : /// Returns nested result following Result<Result<_, OpErr>, Critical>:
368 : /// - inner has the success or transient failure
369 : /// - outer has the permanent failure
370 24 : pub(super) async fn load(
371 24 : path: &Utf8Path,
372 24 : lsn: Lsn,
373 24 : summary: Option<Summary>,
374 24 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
375 24 : ctx: &RequestContext,
376 24 : ) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
377 24 : let file = match VirtualFile::open(path).await {
378 24 : Ok(file) => file,
379 0 : Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
380 : };
381 24 : let file_id = page_cache::next_file_id();
382 24 : let block_reader = FileBlockReader::new(&file, file_id);
383 24 : let summary_blk = match block_reader.read_blk(0, ctx).await {
384 24 : Ok(blk) => blk,
385 0 : Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
386 : };
387 :
388 : // length is the only way how this could fail, so it's not actually likely at all unless
389 : // read_blk returns wrong sized block.
390 : //
391 : // TODO: confirm and make this into assertion
392 24 : let actual_summary =
393 24 : Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
394 :
395 24 : if let Some(mut expected_summary) = summary {
396 : // production code path
397 24 : expected_summary.index_start_blk = actual_summary.index_start_blk;
398 24 : expected_summary.index_root_blk = actual_summary.index_root_blk;
399 24 :
400 24 : if actual_summary != expected_summary {
401 0 : bail!(
402 0 : "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
403 0 : actual_summary,
404 0 : expected_summary
405 0 : );
406 24 : }
407 0 : }
408 :
409 24 : Ok(Ok(ImageLayerInner {
410 24 : index_start_blk: actual_summary.index_start_blk,
411 24 : index_root_blk: actual_summary.index_root_blk,
412 24 : lsn,
413 24 : file,
414 24 : file_id,
415 24 : max_vectored_read_bytes,
416 24 : }))
417 24 : }
418 :
419 526 : pub(super) async fn get_value_reconstruct_data(
420 526 : &self,
421 526 : key: Key,
422 526 : reconstruct_state: &mut ValueReconstructState,
423 526 : ctx: &RequestContext,
424 526 : ) -> anyhow::Result<ValueReconstructResult> {
425 526 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
426 526 : let tree_reader =
427 526 : DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, &block_reader);
428 526 :
429 526 : let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
430 526 : key.write_to_byte_slice(&mut keybuf);
431 526 : if let Some(offset) = tree_reader
432 526 : .get(
433 526 : &keybuf,
434 526 : &RequestContextBuilder::extend(ctx)
435 526 : .page_content_kind(PageContentKind::ImageLayerBtreeNode)
436 526 : .build(),
437 526 : )
438 295 : .await?
439 : {
440 522 : let blob = block_reader
441 522 : .block_cursor()
442 522 : .read_blob(
443 522 : offset,
444 522 : &RequestContextBuilder::extend(ctx)
445 522 : .page_content_kind(PageContentKind::ImageLayerValue)
446 522 : .build(),
447 522 : )
448 153 : .await
449 522 : .with_context(|| format!("failed to read value from offset {}", offset))?;
450 522 : let value = Bytes::from(blob);
451 522 :
452 522 : reconstruct_state.img = Some((self.lsn, value));
453 522 : Ok(ValueReconstructResult::Complete)
454 : } else {
455 4 : Ok(ValueReconstructResult::Missing)
456 : }
457 526 : }
458 :
459 : // Look up the keys in the provided keyspace and update
460 : // the reconstruct state with whatever is found.
461 0 : pub(super) async fn get_values_reconstruct_data(
462 0 : &self,
463 0 : keyspace: KeySpace,
464 0 : reconstruct_state: &mut ValuesReconstructState,
465 0 : ctx: &RequestContext,
466 0 : ) -> Result<(), GetVectoredError> {
467 0 : let reads = self
468 0 : .plan_reads(keyspace, ctx)
469 0 : .await
470 0 : .map_err(GetVectoredError::Other)?;
471 :
472 0 : self.do_reads_and_update_state(reads, reconstruct_state)
473 0 : .await;
474 :
475 0 : Ok(())
476 0 : }
477 :
478 0 : async fn plan_reads(
479 0 : &self,
480 0 : keyspace: KeySpace,
481 0 : ctx: &RequestContext,
482 0 : ) -> anyhow::Result<Vec<VectoredRead>> {
483 0 : let mut planner = VectoredReadPlanner::new(
484 0 : self.max_vectored_read_bytes
485 0 : .expect("Layer is loaded with max vectored bytes config")
486 0 : .0
487 0 : .into(),
488 0 : );
489 0 :
490 0 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
491 0 : let tree_reader =
492 0 : DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
493 0 :
494 0 : let ctx = RequestContextBuilder::extend(ctx)
495 0 : .page_content_kind(PageContentKind::ImageLayerBtreeNode)
496 0 : .build();
497 :
498 0 : for range in keyspace.ranges.iter() {
499 0 : let mut range_end_handled = false;
500 0 :
501 0 : let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
502 0 : range.start.write_to_byte_slice(&mut search_key);
503 0 :
504 0 : let index_stream = tree_reader.get_stream_from(&search_key, &ctx);
505 0 : let mut index_stream = std::pin::pin!(index_stream);
506 :
507 0 : while let Some(index_entry) = index_stream.next().await {
508 0 : let (raw_key, offset) = index_entry?;
509 :
510 0 : let key = Key::from_slice(&raw_key[..KEY_SIZE]);
511 0 : assert!(key >= range.start);
512 :
513 0 : if key >= range.end {
514 0 : planner.handle_range_end(offset);
515 0 : range_end_handled = true;
516 0 : break;
517 0 : } else {
518 0 : planner.handle(key, self.lsn, offset, BlobFlag::None);
519 0 : }
520 : }
521 :
522 0 : if !range_end_handled {
523 0 : let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64;
524 0 : planner.handle_range_end(payload_end);
525 0 : }
526 : }
527 :
528 0 : Ok(planner.finish())
529 0 : }
530 :
531 0 : async fn do_reads_and_update_state(
532 0 : &self,
533 0 : reads: Vec<VectoredRead>,
534 0 : reconstruct_state: &mut ValuesReconstructState,
535 0 : ) {
536 0 : let max_vectored_read_bytes = self
537 0 : .max_vectored_read_bytes
538 0 : .expect("Layer is loaded with max vectored bytes config")
539 0 : .0
540 0 : .into();
541 0 :
542 0 : let vectored_blob_reader = VectoredBlobReader::new(&self.file);
543 0 : for read in reads.into_iter() {
544 0 : let buf_size = read.size();
545 0 :
546 0 : if buf_size > max_vectored_read_bytes {
547 : // If the read is oversized, it should only contain one key.
548 0 : let offenders = read
549 0 : .blobs_at
550 0 : .as_slice()
551 0 : .iter()
552 0 : .map(|(_, blob_meta)| format!("{}@{}", blob_meta.key, blob_meta.lsn))
553 0 : .join(", ");
554 0 : tracing::warn!(
555 0 : "Oversized vectored read ({} > {}) for keys {}",
556 0 : buf_size,
557 0 : max_vectored_read_bytes,
558 0 : offenders
559 0 : );
560 0 : }
561 :
562 0 : let buf = BytesMut::with_capacity(buf_size);
563 0 : let res = vectored_blob_reader.read_blobs(&read, buf).await;
564 :
565 0 : match res {
566 0 : Ok(blobs_buf) => {
567 0 : let frozen_buf = blobs_buf.buf.freeze();
568 :
569 0 : for meta in blobs_buf.blobs.iter() {
570 0 : let img_buf = frozen_buf.slice(meta.start..meta.end);
571 0 : reconstruct_state.update_key(
572 0 : &meta.meta.key,
573 0 : self.lsn,
574 0 : Value::Image(img_buf),
575 0 : );
576 0 : }
577 : }
578 0 : Err(err) => {
579 0 : let kind = err.kind();
580 0 : for (_, blob_meta) in read.blobs_at.as_slice() {
581 0 : reconstruct_state.on_key_error(
582 0 : blob_meta.key,
583 0 : PageReconstructError::from(anyhow!(
584 0 : "Failed to read blobs from virtual file {}: {}",
585 0 : self.file.path,
586 0 : kind
587 0 : )),
588 0 : );
589 0 : }
590 : }
591 : };
592 : }
593 0 : }
594 : }
595 :
596 : /// A builder object for constructing a new image layer.
597 : ///
598 : /// Usage:
599 : ///
600 : /// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
601 : ///
602 : /// 2. Write the contents by calling `put_page_image` for every key-value
603 : /// pair in the key range.
604 : ///
605 : /// 3. Call `finish`.
606 : ///
607 : struct ImageLayerWriterInner {
608 : conf: &'static PageServerConf,
609 : path: Utf8PathBuf,
610 : timeline_id: TimelineId,
611 : tenant_shard_id: TenantShardId,
612 : key_range: Range<Key>,
613 : lsn: Lsn,
614 :
615 : blob_writer: BlobWriter<false>,
616 : tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,
617 : }
618 :
619 : impl ImageLayerWriterInner {
620 : ///
621 : /// Start building a new image layer.
622 : ///
623 104 : async fn new(
624 104 : conf: &'static PageServerConf,
625 104 : timeline_id: TimelineId,
626 104 : tenant_shard_id: TenantShardId,
627 104 : key_range: &Range<Key>,
628 104 : lsn: Lsn,
629 104 : ) -> anyhow::Result<Self> {
630 104 : // Create the file initially with a temporary filename.
631 104 : // We'll atomically rename it to the final name when we're done.
632 104 : let path = ImageLayer::temp_path_for(
633 104 : conf,
634 104 : timeline_id,
635 104 : tenant_shard_id,
636 104 : &ImageFileName {
637 104 : key_range: key_range.clone(),
638 104 : lsn,
639 104 : },
640 104 : );
641 104 : info!("new image layer {path}");
642 104 : let mut file = {
643 104 : VirtualFile::open_with_options(
644 104 : &path,
645 104 : virtual_file::OpenOptions::new()
646 104 : .write(true)
647 104 : .create_new(true),
648 104 : )
649 97 : .await?
650 : };
651 : // make room for the header block
652 104 : file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
653 104 : let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
654 104 :
655 104 : // Initialize the b-tree index builder
656 104 : let block_buf = BlockBuf::new();
657 104 : let tree_builder = DiskBtreeBuilder::new(block_buf);
658 104 :
659 104 : let writer = Self {
660 104 : conf,
661 104 : path,
662 104 : timeline_id,
663 104 : tenant_shard_id,
664 104 : key_range: key_range.clone(),
665 104 : lsn,
666 104 : tree: tree_builder,
667 104 : blob_writer,
668 104 : };
669 104 :
670 104 : Ok(writer)
671 104 : }
672 :
673 : ///
674 : /// Write next value to the file.
675 : ///
676 : /// The page versions must be appended in blknum order.
677 : ///
678 720 : async fn put_image(&mut self, key: Key, img: Bytes) -> anyhow::Result<()> {
679 720 : ensure!(self.key_range.contains(&key));
680 776 : let (_img, res) = self.blob_writer.write_blob(img).await;
681 : // TODO: re-use the buffer for `img` further upstack
682 720 : let off = res?;
683 :
684 720 : let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
685 720 : key.write_to_byte_slice(&mut keybuf);
686 720 : self.tree.append(&keybuf, off)?;
687 :
688 720 : Ok(())
689 720 : }
690 :
691 : ///
692 : /// Finish writing the image layer.
693 : ///
694 104 : async fn finish(self, timeline: &Arc<Timeline>) -> anyhow::Result<ResidentLayer> {
695 104 : let index_start_blk =
696 104 : ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
697 104 :
698 104 : let mut file = self.blob_writer.into_inner();
699 104 :
700 104 : // Write out the index
701 104 : file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
702 0 : .await?;
703 104 : let (index_root_blk, block_buf) = self.tree.finish()?;
704 208 : for buf in block_buf.blocks {
705 104 : let (_buf, res) = file.write_all(buf).await;
706 104 : res?;
707 : }
708 :
709 : // Fill in the summary on blk 0
710 104 : let summary = Summary {
711 104 : magic: IMAGE_FILE_MAGIC,
712 104 : format_version: STORAGE_FORMAT_VERSION,
713 104 : tenant_id: self.tenant_shard_id.tenant_id,
714 104 : timeline_id: self.timeline_id,
715 104 : key_range: self.key_range.clone(),
716 104 : lsn: self.lsn,
717 104 : index_start_blk,
718 104 : index_root_blk,
719 104 : };
720 104 :
721 104 : let mut buf = Vec::with_capacity(PAGE_SZ);
722 104 : // TODO: could use smallvec here but it's a pain with Slice<T>
723 104 : Summary::ser_into(&summary, &mut buf)?;
724 104 : file.seek(SeekFrom::Start(0)).await?;
725 104 : let (_buf, res) = file.write_all(buf).await;
726 104 : res?;
727 :
728 104 : let metadata = file
729 104 : .metadata()
730 52 : .await
731 104 : .context("get metadata to determine file size")?;
732 :
733 104 : let desc = PersistentLayerDesc::new_img(
734 104 : self.tenant_shard_id,
735 104 : self.timeline_id,
736 104 : self.key_range.clone(),
737 104 : self.lsn,
738 104 : metadata.len(),
739 104 : );
740 104 :
741 104 : // Note: Because we open the file in write-only mode, we cannot
742 104 : // reuse the same VirtualFile for reading later. That's why we don't
743 104 : // set inner.file here. The first read will have to re-open it.
744 104 :
745 104 : // fsync the file
746 104 : file.sync_all().await?;
747 :
748 : // FIXME: why not carry the virtualfile here, it supports renaming?
749 104 : let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
750 :
751 104 : trace!("created image layer {}", layer.local_path());
752 :
753 104 : Ok(layer)
754 104 : }
755 : }
756 :
757 : /// A builder object for constructing a new image layer.
758 : ///
759 : /// Usage:
760 : ///
761 : /// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
762 : ///
763 : /// 2. Write the contents by calling `put_page_image` for every key-value
764 : /// pair in the key range.
765 : ///
766 : /// 3. Call `finish`.
767 : ///
768 : /// # Note
769 : ///
770 : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
771 : /// possible for the writer to drop before `finish` is actually called. So this
772 : /// could lead to odd temporary files in the directory, exhausting file system.
773 : /// This structure wraps `ImageLayerWriterInner` and also contains `Drop`
774 : /// implementation that cleans up the temporary file in failure. It's not
775 : /// possible to do this directly in `ImageLayerWriterInner` since `finish` moves
776 : /// out some fields, making it impossible to implement `Drop`.
777 : ///
778 : #[must_use]
779 : pub struct ImageLayerWriter {
780 : inner: Option<ImageLayerWriterInner>,
781 : }
782 :
783 : impl ImageLayerWriter {
784 : ///
785 : /// Start building a new image layer.
786 : ///
787 104 : pub async fn new(
788 104 : conf: &'static PageServerConf,
789 104 : timeline_id: TimelineId,
790 104 : tenant_shard_id: TenantShardId,
791 104 : key_range: &Range<Key>,
792 104 : lsn: Lsn,
793 104 : ) -> anyhow::Result<ImageLayerWriter> {
794 104 : Ok(Self {
795 104 : inner: Some(
796 104 : ImageLayerWriterInner::new(conf, timeline_id, tenant_shard_id, key_range, lsn)
797 97 : .await?,
798 : ),
799 : })
800 104 : }
801 :
802 : ///
803 : /// Write next value to the file.
804 : ///
805 : /// The page versions must be appended in blknum order.
806 : ///
807 720 : pub async fn put_image(&mut self, key: Key, img: Bytes) -> anyhow::Result<()> {
808 776 : self.inner.as_mut().unwrap().put_image(key, img).await
809 720 : }
810 :
811 : ///
812 : /// Finish writing the image layer.
813 : ///
814 104 : pub(crate) async fn finish(
815 104 : mut self,
816 104 : timeline: &Arc<Timeline>,
817 104 : ) -> anyhow::Result<super::ResidentLayer> {
818 208 : self.inner.take().unwrap().finish(timeline).await
819 104 : }
820 : }
821 :
822 : impl Drop for ImageLayerWriter {
823 104 : fn drop(&mut self) {
824 104 : if let Some(inner) = self.inner.take() {
825 0 : inner.blob_writer.into_inner().remove();
826 104 : }
827 104 : }
828 : }
|