Line data Source code
1 : //! An ImageLayer represents an image or a snapshot of a key-range at
2 : //! one particular LSN. It contains an image of all key-value pairs
3 : //! in its key-range. Any key that falls within the image layer's key
4 : //! range but is not present in the layer is known not to exist as of that LSN.
5 : //!
6 : //! An image layer is stored in a file on disk. The file is stored in
7 : //! timelines/<timeline_id> directory. Currently, there are no
8 : //! subdirectories, and each image layer file is named like this:
9 : //!
10 : //! ```text
11 : //! <key start>-<key end>__<LSN>
12 : //! ```
13 : //!
14 : //! For example:
15 : //!
16 : //! ```text
17 : //! 000000067F000032BE0000400000000070B6-000000067F000032BE0000400000000080B6__00000000346BC568
18 : //! ```
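//!
//! As a rough illustration only (this is not the pageserver's actual parsing code;
//! that lives in `LayerName`/`ImageLayerName`), a name of this form could be split
//! into its parts like so, keeping the key bounds as hex strings and reading the
//! LSN as a hexadecimal u64:
//!
//! ```text
//! fn parse_image_layer_name(name: &str) -> Option<(String, String, u64)> {
//!     let (keys, lsn_hex) = name.split_once("__")?;
//!     let (key_start, key_end) = keys.split_once('-')?;
//!     let lsn = u64::from_str_radix(lsn_hex, 16).ok()?;
//!     Some((key_start.to_string(), key_end.to_string(), lsn))
//! }
//! ```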
19 : //!
20 : //! Every image layer file consists of three parts: "summary",
21 : //! "index", and "values". The summary is a fixed size header at the
22 : //! beginning of the file, and it contains basic information about the
23 : //! layer, and offsets to the other parts. The "index" is a B-tree,
24 : //! mapping from Key to an offset in the "values" part. The
25 : //! actual page images are stored in the "values" part.
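//!
//! Putting these together, the on-disk layout looks roughly like this
//! (illustrative; the actual index start is whatever `index_start_blk` in the
//! summary records):
//!
//! ```text
//! block 0                      summary (fixed-size header)
//! blocks 1..index_start_blk    "values": page images, addressed by byte offset
//! blocks index_start_blk..     "index": B-tree mapping Key -> offset into "values"
//! ```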
26 : use crate::config::PageServerConf;
27 : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
28 : use crate::page_cache::{self, FileId, PAGE_SZ};
29 : use crate::repository::{Key, Value, KEY_SIZE};
30 : use crate::tenant::blob_io::BlobWriter;
31 : use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
32 : use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
33 : use crate::tenant::storage_layer::{
34 : LayerAccessStats, ValueReconstructResult, ValueReconstructState,
35 : };
36 : use crate::tenant::timeline::GetVectoredError;
37 : use crate::tenant::vectored_blob_io::{
38 : BlobFlag, MaxVectoredReadBytes, VectoredBlobReader, VectoredRead, VectoredReadPlanner,
39 : };
40 : use crate::tenant::{PageReconstructError, Timeline};
41 : use crate::virtual_file::{self, VirtualFile};
42 : use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
43 : use anyhow::{anyhow, bail, ensure, Context, Result};
44 : use bytes::{Bytes, BytesMut};
45 : use camino::{Utf8Path, Utf8PathBuf};
46 : use hex;
47 : use itertools::Itertools;
48 : use pageserver_api::keyspace::KeySpace;
49 : use pageserver_api::models::LayerAccessKind;
50 : use pageserver_api::shard::TenantShardId;
51 : use rand::{distributions::Alphanumeric, Rng};
52 : use serde::{Deserialize, Serialize};
53 : use std::fs::File;
54 : use std::io::SeekFrom;
55 : use std::ops::Range;
56 : use std::os::unix::prelude::FileExt;
57 : use std::str::FromStr;
58 : use std::sync::Arc;
59 : use tokio::sync::OnceCell;
60 : use tokio_stream::StreamExt;
61 : use tracing::*;
62 :
63 : use utils::{
64 : bin_ser::BeSer,
65 : id::{TenantId, TimelineId},
66 : lsn::Lsn,
67 : };
68 :
69 : use super::layer_name::ImageLayerName;
70 : use super::{
71 : AsLayerDesc, Layer, LayerName, PersistentLayerDesc, ResidentLayer, ValuesReconstructState,
72 : };
73 :
74 : ///
75 : /// Header stored at the beginning of the file
76 : ///
77 : /// After this comes the 'values' part, starting on block 1. After that,
78 : /// the 'index' starts at the block indicated by 'index_start_blk'.
79 : ///
80 62 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
81 : pub struct Summary {
82 : /// Magic value to identify this as a neon image file. Always IMAGE_FILE_MAGIC.
83 : pub magic: u16,
84 : pub format_version: u16,
85 :
86 : pub tenant_id: TenantId,
87 : pub timeline_id: TimelineId,
88 : pub key_range: Range<Key>,
89 : pub lsn: Lsn,
90 :
91 : /// Block number where the 'index' part of the file begins.
92 : pub index_start_blk: u32,
93 : /// Block within the 'index', where the B-tree root page is stored
94 : pub index_root_blk: u32,
95 : // the 'values' part starts after the summary header, on block 1.
96 : }
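// Illustrative note: block numbers translate to byte offsets by multiplying with
// PAGE_SZ (the page cache's 8 KiB block size), so e.g. index_start_blk = 5 means the
// "values" section occupies bytes [8192, 40960) and the B-tree index begins at byte
// offset 5 * 8192 = 40960. The same product is used as `payload_end` when planning
// vectored reads further down in this file.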
97 :
98 : impl From<&ImageLayer> for Summary {
99 0 : fn from(layer: &ImageLayer) -> Self {
100 0 : Self::expected(
101 0 : layer.desc.tenant_shard_id.tenant_id,
102 0 : layer.desc.timeline_id,
103 0 : layer.desc.key_range.clone(),
104 0 : layer.lsn,
105 0 : )
106 0 : }
107 : }
108 :
109 : impl Summary {
110 62 : pub(super) fn expected(
111 62 : tenant_id: TenantId,
112 62 : timeline_id: TimelineId,
113 62 : key_range: Range<Key>,
114 62 : lsn: Lsn,
115 62 : ) -> Self {
116 62 : Self {
117 62 : magic: IMAGE_FILE_MAGIC,
118 62 : format_version: STORAGE_FORMAT_VERSION,
119 62 : tenant_id,
120 62 : timeline_id,
121 62 : key_range,
122 62 : lsn,
123 62 :
124 62 : index_start_blk: 0,
125 62 : index_root_blk: 0,
126 62 : }
127 62 : }
128 : }
129 :
130 : /// This is used only from `pagectl`. Within pageserver, all layers are
131 : /// [`crate::tenant::storage_layer::Layer`], which can hold an [`ImageLayerInner`].
132 : pub struct ImageLayer {
133 : path: Utf8PathBuf,
134 : pub desc: PersistentLayerDesc,
135 : // This entry contains an image of all pages as of this LSN; it should be the same as desc.lsn
136 : pub lsn: Lsn,
137 : access_stats: LayerAccessStats,
138 : inner: OnceCell<ImageLayerInner>,
139 : }
140 :
141 : impl std::fmt::Debug for ImageLayer {
142 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
143 0 : use super::RangeDisplayDebug;
144 0 :
145 0 : f.debug_struct("ImageLayer")
146 0 : .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
147 0 : .field("file_size", &self.desc.file_size)
148 0 : .field("lsn", &self.lsn)
149 0 : .field("inner", &self.inner)
150 0 : .finish()
151 0 : }
152 : }
153 :
154 : /// ImageLayerInner is the in-memory data structure associated with an on-disk image
155 : /// file.
156 : pub struct ImageLayerInner {
157 : // values copied from summary
158 : index_start_blk: u32,
159 : index_root_blk: u32,
160 :
161 : key_range: Range<Key>,
162 : lsn: Lsn,
163 :
164 : file: VirtualFile,
165 : file_id: FileId,
166 :
167 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
168 : }
169 :
170 : impl std::fmt::Debug for ImageLayerInner {
171 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
172 0 : f.debug_struct("ImageLayerInner")
173 0 : .field("index_start_blk", &self.index_start_blk)
174 0 : .field("index_root_blk", &self.index_root_blk)
175 0 : .finish()
176 0 : }
177 : }
178 :
179 : impl ImageLayerInner {
180 0 : pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
181 0 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
182 0 : let tree_reader = DiskBtreeReader::<_, KEY_SIZE>::new(
183 0 : self.index_start_blk,
184 0 : self.index_root_blk,
185 0 : block_reader,
186 0 : );
187 0 :
188 0 : tree_reader.dump().await?;
189 :
190 0 : tree_reader
191 0 : .visit(
192 0 : &[0u8; KEY_SIZE],
193 0 : VisitDirection::Forwards,
194 0 : |key, value| {
195 0 : println!("key: {} offset {}", hex::encode(key), value);
196 0 : true
197 0 : },
198 0 : ctx,
199 0 : )
200 0 : .await?;
201 :
202 0 : Ok(())
203 0 : }
204 : }
205 :
206 : /// Boilerplate to implement the Layer trait; always use layer_desc for persistent layers.
207 : impl std::fmt::Display for ImageLayer {
208 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
209 0 : write!(f, "{}", self.layer_desc().short_id())
210 0 : }
211 : }
212 :
213 : impl AsLayerDesc for ImageLayer {
214 0 : fn layer_desc(&self) -> &PersistentLayerDesc {
215 0 : &self.desc
216 0 : }
217 : }
218 :
219 : impl ImageLayer {
220 0 : pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
221 0 : self.desc.dump();
222 0 :
223 0 : if !verbose {
224 0 : return Ok(());
225 0 : }
226 :
227 0 : let inner = self.load(LayerAccessKind::Dump, ctx).await?;
228 :
229 0 : inner.dump(ctx).await?;
230 :
231 0 : Ok(())
232 0 : }
233 :
234 508 : fn temp_path_for(
235 508 : conf: &PageServerConf,
236 508 : timeline_id: TimelineId,
237 508 : tenant_shard_id: TenantShardId,
238 508 : fname: &ImageLayerName,
239 508 : ) -> Utf8PathBuf {
240 508 : let rand_string: String = rand::thread_rng()
241 508 : .sample_iter(&Alphanumeric)
242 508 : .take(8)
243 508 : .map(char::from)
244 508 : .collect();
245 508 :
246 508 : conf.timeline_path(&tenant_shard_id, &timeline_id)
247 508 : .join(format!("{fname}.{rand_string}.{TEMP_FILE_SUFFIX}"))
248 508 : }
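// For illustration, a temp path built above looks like
// `<timeline dir>/<layer file name>.<8 random alphanumeric chars>.<TEMP_FILE_SUFFIX>`;
// the random infix keeps a retry or a concurrent writer of the same layer name from
// clobbering an existing temp file.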
249 :
250 : ///
251 : /// Open the underlying file and read the metadata into memory, if it's
252 : /// not loaded already.
253 : ///
254 0 : async fn load(
255 0 : &self,
256 0 : access_kind: LayerAccessKind,
257 0 : ctx: &RequestContext,
258 0 : ) -> Result<&ImageLayerInner> {
259 0 : self.access_stats.record_access(access_kind, ctx);
260 0 : self.inner
261 0 : .get_or_try_init(|| self.load_inner(ctx))
262 0 : .await
263 0 : .with_context(|| format!("Failed to load image layer {}", self.path()))
264 0 : }
265 :
266 0 : async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
267 0 : let path = self.path();
268 :
269 0 : let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, None, ctx)
270 0 : .await
271 0 : .and_then(|res| res)?;
272 :
273 : // not production code
274 0 : let actual_layer_name = LayerName::from_str(path.file_name().unwrap()).unwrap();
275 0 : let expected_layer_name = self.layer_desc().layer_name();
276 0 :
277 0 : if actual_layer_name != expected_layer_name {
278 0 : println!("warning: filename does not match what is expected from in-file summary");
279 0 : println!("actual: {:?}", actual_layer_name.to_string());
280 0 : println!("expected: {:?}", expected_layer_name.to_string());
281 0 : }
282 :
283 0 : Ok(loaded)
284 0 : }
285 :
286 : /// Create an ImageLayer struct representing an existing file on disk.
287 : ///
288 : /// This variant is only used for debugging purposes, by the 'pagectl' binary.
289 0 : pub fn new_for_path(path: &Utf8Path, file: File) -> Result<ImageLayer> {
290 0 : let mut summary_buf = vec![0; PAGE_SZ];
291 0 : file.read_exact_at(&mut summary_buf, 0)?;
292 0 : let summary = Summary::des_prefix(&summary_buf)?;
293 0 : let metadata = file
294 0 : .metadata()
295 0 : .context("get file metadata to determine size")?;
296 :
297 : // This function is never used for constructing layers in a running pageserver,
298 : // so it does not need an accurate TenantShardId.
299 0 : let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
300 0 :
301 0 : Ok(ImageLayer {
302 0 : path: path.to_path_buf(),
303 0 : desc: PersistentLayerDesc::new_img(
304 0 : tenant_shard_id,
305 0 : summary.timeline_id,
306 0 : summary.key_range,
307 0 : summary.lsn,
308 0 : metadata.len(),
309 0 : ), // Now we assume image layer ALWAYS covers the full range. This may change in the future.
310 0 : lsn: summary.lsn,
311 0 : access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
312 0 : inner: OnceCell::new(),
313 0 : })
314 0 : }
315 :
316 0 : fn path(&self) -> Utf8PathBuf {
317 0 : self.path.clone()
318 0 : }
319 : }
320 :
321 0 : #[derive(thiserror::Error, Debug)]
322 : pub enum RewriteSummaryError {
323 : #[error("magic mismatch")]
324 : MagicMismatch,
325 : #[error(transparent)]
326 : Other(#[from] anyhow::Error),
327 : }
328 :
329 : impl From<std::io::Error> for RewriteSummaryError {
330 0 : fn from(e: std::io::Error) -> Self {
331 0 : Self::Other(anyhow::anyhow!(e))
332 0 : }
333 : }
334 :
335 : impl ImageLayer {
336 0 : pub async fn rewrite_summary<F>(
337 0 : path: &Utf8Path,
338 0 : rewrite: F,
339 0 : ctx: &RequestContext,
340 0 : ) -> Result<(), RewriteSummaryError>
341 0 : where
342 0 : F: Fn(Summary) -> Summary,
343 0 : {
344 0 : let mut file = VirtualFile::open_with_options(
345 0 : path,
346 0 : virtual_file::OpenOptions::new().read(true).write(true),
347 0 : ctx,
348 0 : )
349 0 : .await
350 0 : .with_context(|| format!("Failed to open file '{}'", path))?;
351 0 : let file_id = page_cache::next_file_id();
352 0 : let block_reader = FileBlockReader::new(&file, file_id);
353 0 : let summary_blk = block_reader.read_blk(0, ctx).await?;
354 0 : let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
355 0 : if actual_summary.magic != IMAGE_FILE_MAGIC {
356 0 : return Err(RewriteSummaryError::MagicMismatch);
357 0 : }
358 0 :
359 0 : let new_summary = rewrite(actual_summary);
360 0 :
361 0 : let mut buf = Vec::with_capacity(PAGE_SZ);
362 0 : // TODO: could use smallvec here but it's a pain with Slice<T>
363 0 : Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
364 0 : file.seek(SeekFrom::Start(0)).await?;
365 0 : let (_buf, res) = file.write_all(buf, ctx).await;
366 0 : res?;
367 0 : Ok(())
368 0 : }
369 : }
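// Illustrative usage of `rewrite_summary` (hypothetical call site, error handling
// elided): patch the summary of a copied layer file so it records a different
// tenant, leaving every other field intact. `new_tenant_id` is assumed to exist.
//
//     ImageLayer::rewrite_summary(
//         &path,
//         |mut summary| {
//             summary.tenant_id = new_tenant_id;
//             summary
//         },
//         ctx,
//     )
//     .await?;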
370 :
371 : impl ImageLayerInner {
372 : /// Returns a nested result following `Result<Result<_, OpErr>, Critical>`:
373 : /// - the inner result carries success or a transient failure
374 : /// - the outer result carries a permanent failure
375 62 : pub(super) async fn load(
376 62 : path: &Utf8Path,
377 62 : lsn: Lsn,
378 62 : summary: Option<Summary>,
379 62 : max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
380 62 : ctx: &RequestContext,
381 62 : ) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
382 62 : let file = match VirtualFile::open(path, ctx).await {
383 62 : Ok(file) => file,
384 0 : Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
385 : };
386 62 : let file_id = page_cache::next_file_id();
387 62 : let block_reader = FileBlockReader::new(&file, file_id);
388 62 : let summary_blk = match block_reader.read_blk(0, ctx).await {
389 62 : Ok(blk) => blk,
390 0 : Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
391 : };
392 :
393 : // a length mismatch is the only way this could fail, so it's not actually likely at all unless
394 : // read_blk returns a wrong-sized block.
395 : //
396 : // TODO: confirm and make this into assertion
397 62 : let actual_summary =
398 62 : Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
399 :
400 62 : if let Some(mut expected_summary) = summary {
401 : // production code path
402 62 : expected_summary.index_start_blk = actual_summary.index_start_blk;
403 62 : expected_summary.index_root_blk = actual_summary.index_root_blk;
404 62 : // mask out the timeline_id, but still require the layers to be from the same tenant
405 62 : expected_summary.timeline_id = actual_summary.timeline_id;
406 62 :
407 62 : if actual_summary != expected_summary {
408 0 : bail!(
409 0 : "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
410 0 : actual_summary,
411 0 : expected_summary
412 0 : );
413 62 : }
414 0 : }
415 :
416 62 : Ok(Ok(ImageLayerInner {
417 62 : index_start_blk: actual_summary.index_start_blk,
418 62 : index_root_blk: actual_summary.index_root_blk,
419 62 : lsn,
420 62 : file,
421 62 : file_id,
422 62 : max_vectored_read_bytes,
423 62 : key_range: actual_summary.key_range,
424 62 : }))
425 62 : }
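// Illustrative caller pattern for the nested result above (hypothetical, not from
// this crate): the outer error is treated as unrecoverable, while the inner error
// is an ordinary failure that the caller may log or retry.
//
//     match ImageLayerInner::load(path, lsn, None, None, ctx).await? {
//         Ok(inner) => { /* use the loaded layer */ }
//         Err(transient) => warn!("failed to load image layer: {transient:#}"),
//     }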
426 :
427 6939 : pub(super) async fn get_value_reconstruct_data(
428 6939 : &self,
429 6939 : key: Key,
430 6939 : reconstruct_state: &mut ValueReconstructState,
431 6939 : ctx: &RequestContext,
432 6939 : ) -> anyhow::Result<ValueReconstructResult> {
433 6939 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
434 6939 : let tree_reader =
435 6939 : DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, &block_reader);
436 6939 :
437 6939 : let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
438 6939 : key.write_to_byte_slice(&mut keybuf);
439 6939 : if let Some(offset) = tree_reader
440 6939 : .get(
441 6939 : &keybuf,
442 6939 : &RequestContextBuilder::extend(ctx)
443 6939 : .page_content_kind(PageContentKind::ImageLayerBtreeNode)
444 6939 : .build(),
445 6939 : )
446 401 : .await?
447 : {
448 6935 : let blob = block_reader
449 6935 : .block_cursor()
450 6935 : .read_blob(
451 6935 : offset,
452 6935 : &RequestContextBuilder::extend(ctx)
453 6935 : .page_content_kind(PageContentKind::ImageLayerValue)
454 6935 : .build(),
455 6935 : )
456 310 : .await
457 6935 : .with_context(|| format!("failed to read value from offset {}", offset))?;
458 6935 : let value = Bytes::from(blob);
459 6935 :
460 6935 : reconstruct_state.img = Some((self.lsn, value));
461 6935 : Ok(ValueReconstructResult::Complete)
462 : } else {
463 4 : Ok(ValueReconstructResult::Missing)
464 : }
465 6939 : }
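// Note that this lookup never yields ValueReconstructResult::Continue: an image
// layer stores complete page images, so either the key is found and reconstruction
// is Complete, or the key is absent from the layer's snapshot and reported Missing.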
466 :
467 : // Look up the keys in the provided keyspace and update
468 : // the reconstruct state with whatever is found.
469 102 : pub(super) async fn get_values_reconstruct_data(
470 102 : &self,
471 102 : keyspace: KeySpace,
472 102 : reconstruct_state: &mut ValuesReconstructState,
473 102 : ctx: &RequestContext,
474 102 : ) -> Result<(), GetVectoredError> {
475 102 : let reads = self
476 102 : .plan_reads(keyspace, ctx)
477 350 : .await
478 102 : .map_err(GetVectoredError::Other)?;
479 :
480 102 : self.do_reads_and_update_state(reads, reconstruct_state, ctx)
481 848 : .await;
482 :
483 102 : reconstruct_state.on_image_layer_visited(&self.key_range);
484 102 :
485 102 : Ok(())
486 102 : }
487 :
488 102 : async fn plan_reads(
489 102 : &self,
490 102 : keyspace: KeySpace,
491 102 : ctx: &RequestContext,
492 102 : ) -> anyhow::Result<Vec<VectoredRead>> {
493 102 : let mut planner = VectoredReadPlanner::new(
494 102 : self.max_vectored_read_bytes
495 102 : .expect("Layer is loaded with max vectored bytes config")
496 102 : .0
497 102 : .into(),
498 102 : );
499 102 :
500 102 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
501 102 : let tree_reader =
502 102 : DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
503 102 :
504 102 : let ctx = RequestContextBuilder::extend(ctx)
505 102 : .page_content_kind(PageContentKind::ImageLayerBtreeNode)
506 102 : .build();
507 :
508 21826 : for range in keyspace.ranges.iter() {
509 21826 : let mut range_end_handled = false;
510 21826 :
511 21826 : let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
512 21826 : range.start.write_to_byte_slice(&mut search_key);
513 21826 :
514 21826 : let index_stream = tree_reader.get_stream_from(&search_key, &ctx);
515 21826 : let mut index_stream = std::pin::pin!(index_stream);
516 :
517 70087 : while let Some(index_entry) = index_stream.next().await {
518 70005 : let (raw_key, offset) = index_entry?;
519 :
520 70005 : let key = Key::from_slice(&raw_key[..KEY_SIZE]);
521 70005 : assert!(key >= range.start);
522 :
523 70005 : if key >= range.end {
524 21744 : planner.handle_range_end(offset);
525 21744 : range_end_handled = true;
526 21744 : break;
527 48261 : } else {
528 48261 : planner.handle(key, self.lsn, offset, BlobFlag::None);
529 48261 : }
530 : }
531 :
532 21826 : if !range_end_handled {
533 82 : let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64;
534 82 : planner.handle_range_end(payload_end);
535 21744 : }
536 : }
537 :
538 102 : Ok(planner.finish())
539 102 : }
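// Worked example for the planning loop above (illustrative): for a single keyspace
// range [K0, K3) whose index holds entries K0@off0, K1@off1, K2@off2, K3@off3, the
// loop feeds K0, K1, K2 to the planner and then calls handle_range_end(off3). Blob
// extents are delimited by consecutive offsets, so off3 is still needed to bound
// the last value even though K3 itself lies outside the requested range.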
540 :
541 102 : async fn do_reads_and_update_state(
542 102 : &self,
543 102 : reads: Vec<VectoredRead>,
544 102 : reconstruct_state: &mut ValuesReconstructState,
545 102 : ctx: &RequestContext,
546 102 : ) {
547 102 : let max_vectored_read_bytes = self
548 102 : .max_vectored_read_bytes
549 102 : .expect("Layer is loaded with max vectored bytes config")
550 102 : .0
551 102 : .into();
552 102 :
553 102 : let vectored_blob_reader = VectoredBlobReader::new(&self.file);
554 1708 : for read in reads.into_iter() {
555 1708 : let buf_size = read.size();
556 1708 :
557 1708 : if buf_size > max_vectored_read_bytes {
558 : // If the read is oversized, it should only contain one key.
559 0 : let offenders = read
560 0 : .blobs_at
561 0 : .as_slice()
562 0 : .iter()
563 0 : .map(|(_, blob_meta)| format!("{}@{}", blob_meta.key, blob_meta.lsn))
564 0 : .join(", ");
565 0 : tracing::warn!(
566 0 : "Oversized vectored read ({} > {}) for keys {}",
567 : buf_size,
568 : max_vectored_read_bytes,
569 : offenders
570 : );
571 1708 : }
572 :
573 1708 : let buf = BytesMut::with_capacity(buf_size);
574 1708 : let res = vectored_blob_reader.read_blobs(&read, buf, ctx).await;
575 :
576 1708 : match res {
577 1708 : Ok(blobs_buf) => {
578 1708 : let frozen_buf = blobs_buf.buf.freeze();
579 :
580 48261 : for meta in blobs_buf.blobs.iter() {
581 48261 : let img_buf = frozen_buf.slice(meta.start..meta.end);
582 48261 : reconstruct_state.update_key(
583 48261 : &meta.meta.key,
584 48261 : self.lsn,
585 48261 : Value::Image(img_buf),
586 48261 : );
587 48261 : }
588 : }
589 0 : Err(err) => {
590 0 : let kind = err.kind();
591 0 : for (_, blob_meta) in read.blobs_at.as_slice() {
592 0 : reconstruct_state.on_key_error(
593 0 : blob_meta.key,
594 0 : PageReconstructError::from(anyhow!(
595 0 : "Failed to read blobs from virtual file {}: {}",
596 0 : self.file.path,
597 0 : kind
598 0 : )),
599 0 : );
600 0 : }
601 : }
602 : };
603 : }
604 102 : }
605 : }
606 :
607 : /// A builder object for constructing a new image layer.
608 : ///
609 : /// Usage:
610 : ///
611 : /// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
612 : ///
613 : /// 2. Write the contents by calling `put_image` for every key-value
614 : /// pair in the key range.
615 : ///
616 : /// 3. Call `finish`.
617 : ///
618 : struct ImageLayerWriterInner {
619 : conf: &'static PageServerConf,
620 : path: Utf8PathBuf,
621 : timeline_id: TimelineId,
622 : tenant_shard_id: TenantShardId,
623 : key_range: Range<Key>,
624 : lsn: Lsn,
625 :
626 : blob_writer: BlobWriter<false>,
627 : tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,
628 : }
629 :
630 : impl ImageLayerWriterInner {
631 : ///
632 : /// Start building a new image layer.
633 : ///
634 508 : async fn new(
635 508 : conf: &'static PageServerConf,
636 508 : timeline_id: TimelineId,
637 508 : tenant_shard_id: TenantShardId,
638 508 : key_range: &Range<Key>,
639 508 : lsn: Lsn,
640 508 : ctx: &RequestContext,
641 508 : ) -> anyhow::Result<Self> {
642 508 : // Create the file initially with a temporary filename.
643 508 : // We'll atomically rename it to the final name when we're done.
644 508 : let path = ImageLayer::temp_path_for(
645 508 : conf,
646 508 : timeline_id,
647 508 : tenant_shard_id,
648 508 : &ImageLayerName {
649 508 : key_range: key_range.clone(),
650 508 : lsn,
651 508 : },
652 508 : );
653 508 : info!("new image layer {path}");
654 508 : let mut file = {
655 508 : VirtualFile::open_with_options(
656 508 : &path,
657 508 : virtual_file::OpenOptions::new()
658 508 : .write(true)
659 508 : .create_new(true),
660 508 : ctx,
661 508 : )
662 310 : .await?
663 : };
664 : // make room for the header block
665 508 : file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
666 508 : let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
667 508 :
668 508 : // Initialize the b-tree index builder
669 508 : let block_buf = BlockBuf::new();
670 508 : let tree_builder = DiskBtreeBuilder::new(block_buf);
671 508 :
672 508 : let writer = Self {
673 508 : conf,
674 508 : path,
675 508 : timeline_id,
676 508 : tenant_shard_id,
677 508 : key_range: key_range.clone(),
678 508 : lsn,
679 508 : tree: tree_builder,
680 508 : blob_writer,
681 508 : };
682 508 :
683 508 : Ok(writer)
684 508 : }
685 :
686 : ///
687 : /// Write next value to the file.
688 : ///
689 : /// The page images must be appended in ascending key order.
690 : ///
691 11030 : async fn put_image(
692 11030 : &mut self,
693 11030 : key: Key,
694 11030 : img: Bytes,
695 11030 : ctx: &RequestContext,
696 11030 : ) -> anyhow::Result<()> {
697 11030 : ensure!(self.key_range.contains(&key));
698 11251 : let (_img, res) = self.blob_writer.write_blob(img, ctx).await;
699 : // TODO: re-use the buffer for `img` further upstack
700 11030 : let off = res?;
701 :
702 11030 : let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
703 11030 : key.write_to_byte_slice(&mut keybuf);
704 11030 : self.tree.append(&keybuf, off)?;
705 :
706 11030 : Ok(())
707 11030 : }
708 :
709 : ///
710 : /// Finish writing the image layer.
711 : ///
712 158 : async fn finish(
713 158 : self,
714 158 : timeline: &Arc<Timeline>,
715 158 : ctx: &RequestContext,
716 158 : ) -> anyhow::Result<ResidentLayer> {
717 158 : let index_start_blk =
718 158 : ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
719 158 :
720 158 : let mut file = self.blob_writer.into_inner();
721 158 :
722 158 : // Write out the index
723 158 : file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
724 0 : .await?;
725 158 : let (index_root_blk, block_buf) = self.tree.finish()?;
726 316 : for buf in block_buf.blocks {
727 158 : let (_buf, res) = file.write_all(buf, ctx).await;
728 158 : res?;
729 : }
730 :
731 : // Fill in the summary on blk 0
732 158 : let summary = Summary {
733 158 : magic: IMAGE_FILE_MAGIC,
734 158 : format_version: STORAGE_FORMAT_VERSION,
735 158 : tenant_id: self.tenant_shard_id.tenant_id,
736 158 : timeline_id: self.timeline_id,
737 158 : key_range: self.key_range.clone(),
738 158 : lsn: self.lsn,
739 158 : index_start_blk,
740 158 : index_root_blk,
741 158 : };
742 158 :
743 158 : let mut buf = Vec::with_capacity(PAGE_SZ);
744 158 : // TODO: could use smallvec here but it's a pain with Slice<T>
745 158 : Summary::ser_into(&summary, &mut buf)?;
746 158 : file.seek(SeekFrom::Start(0)).await?;
747 158 : let (_buf, res) = file.write_all(buf, ctx).await;
748 158 : res?;
749 :
750 158 : let metadata = file
751 158 : .metadata()
752 79 : .await
753 158 : .context("get metadata to determine file size")?;
754 :
755 158 : let desc = PersistentLayerDesc::new_img(
756 158 : self.tenant_shard_id,
757 158 : self.timeline_id,
758 158 : self.key_range.clone(),
759 158 : self.lsn,
760 158 : metadata.len(),
761 158 : );
762 158 :
763 158 : // Note: Because we open the file in write-only mode, we cannot
764 158 : // reuse the same VirtualFile for reading later. That's why we don't
765 158 : // set inner.file here. The first read will have to re-open it.
766 158 :
767 158 : // fsync the file
768 158 : file.sync_all().await?;
769 :
770 : // FIXME: why not carry the virtualfile here, it supports renaming?
771 158 : let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
772 :
773 158 : trace!("created image layer {}", layer.local_path());
774 :
775 158 : Ok(layer)
776 158 : }
777 : }
778 :
779 : /// A builder object for constructing a new image layer.
780 : ///
781 : /// Usage:
782 : ///
783 : /// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
784 : ///
785 : /// 2. Write the contents by calling `put_image` for every key-value
786 : /// pair in the key range.
787 : ///
788 : /// 3. Call `finish`.
789 : ///
790 : /// # Note
791 : ///
792 : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
793 : /// possible for the writer to be dropped before `finish` is actually called. That
794 : /// could leave stray temporary files in the directory, eventually exhausting the file system.
795 : /// This structure wraps `ImageLayerWriterInner` and also contains a `Drop`
796 : /// implementation that cleans up the temporary file on failure. It's not
797 : /// possible to do this directly in `ImageLayerWriterInner` since `finish` moves
798 : /// out some fields, making it impossible to implement `Drop`.
799 : ///
800 : #[must_use]
801 : pub struct ImageLayerWriter {
802 : inner: Option<ImageLayerWriterInner>,
803 : }
804 :
805 : impl ImageLayerWriter {
806 : ///
807 : /// Start building a new image layer.
808 : ///
809 508 : pub async fn new(
810 508 : conf: &'static PageServerConf,
811 508 : timeline_id: TimelineId,
812 508 : tenant_shard_id: TenantShardId,
813 508 : key_range: &Range<Key>,
814 508 : lsn: Lsn,
815 508 : ctx: &RequestContext,
816 508 : ) -> anyhow::Result<ImageLayerWriter> {
817 508 : Ok(Self {
818 508 : inner: Some(
819 508 : ImageLayerWriterInner::new(conf, timeline_id, tenant_shard_id, key_range, lsn, ctx)
820 310 : .await?,
821 : ),
822 : })
823 508 : }
824 :
825 : ///
826 : /// Write next value to the file.
827 : ///
828 : /// The page images must be appended in ascending key order.
829 : ///
830 11030 : pub async fn put_image(
831 11030 : &mut self,
832 11030 : key: Key,
833 11030 : img: Bytes,
834 11030 : ctx: &RequestContext,
835 11030 : ) -> anyhow::Result<()> {
836 11251 : self.inner.as_mut().unwrap().put_image(key, img, ctx).await
837 11030 : }
838 :
839 : ///
840 : /// Finish writing the image layer.
841 : ///
842 158 : pub(crate) async fn finish(
843 158 : mut self,
844 158 : timeline: &Arc<Timeline>,
845 158 : ctx: &RequestContext,
846 158 : ) -> anyhow::Result<super::ResidentLayer> {
847 318 : self.inner.take().unwrap().finish(timeline, ctx).await
848 158 : }
849 : }
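// Illustrative end-to-end usage (hypothetical call site; `timeline`, `key_range`,
// `lsn` and the key-ordered image iterator are assumed to exist, and error handling
// is elided):
//
//     let mut writer =
//         ImageLayerWriter::new(conf, timeline_id, tenant_shard_id, &key_range, lsn, ctx)
//             .await?;
//     for (key, img) in images_in_key_order {
//         writer.put_image(key, img, ctx).await?;
//     }
//     let resident_layer: ResidentLayer = writer.finish(&timeline, ctx).await?;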
850 :
851 : impl Drop for ImageLayerWriter {
852 508 : fn drop(&mut self) {
853 508 : if let Some(inner) = self.inner.take() {
854 350 : inner.blob_writer.into_inner().remove();
855 350 : }
856 508 : }
857 : }