Line data Source code
1 : //! An ImageLayer represents an image or a snapshot of a key-range at
2 : //! one particular LSN. It contains an image of all key-value pairs
3 : //! in its key-range. Any key that falls into the image layer's range
4 : //! but does not exist in the layer, does not exist.
5 : //!
6 : //! An image layer is stored in a file on disk. The file is stored in
7 : //! timelines/<timeline_id> directory. Currently, there are no
8 : //! subdirectories, and each image layer file is named like this:
9 : //!
10 : //! ```text
11 : //! <key start>-<key end>__<LSN>
12 : //! ```
13 : //!
14 : //! For example:
15 : //!
16 : //! ```text
17 : //! 000000067F000032BE0000400000000070B6-000000067F000032BE0000400000000080B6__00000000346BC568
18 : //! ```
19 : //!
20 : //! Every image layer file consists of three parts: "summary",
21 : //! "index", and "values". The summary is a fixed size header at the
22 : //! beginning of the file, and it contains basic information about the
23 : //! layer, and offsets to the other parts. The "index" is a B-tree,
24 : //! mapping from Key to an offset in the "values" part. The
25 : //! actual page images are stored in the "values" part.
26 : use crate::config::PageServerConf;
27 : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
28 : use crate::page_cache::{self, FileId, PAGE_SZ};
29 : use crate::repository::{Key, Value, KEY_SIZE};
30 : use crate::tenant::blob_io::BlobWriter;
31 : use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
32 : use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
33 : use crate::tenant::storage_layer::{
34 : LayerAccessStats, ValueReconstructResult, ValueReconstructState,
35 : };
36 : use crate::tenant::timeline::GetVectoredError;
37 : use crate::tenant::vectored_blob_io::{
38 : BlobFlag, MaxVectoredReadBytes, VectoredBlobReader, VectoredRead, VectoredReadPlanner,
39 : };
40 : use crate::tenant::{PageReconstructError, Timeline};
41 : use crate::virtual_file::{self, VirtualFile};
42 : use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
43 : use anyhow::{anyhow, bail, ensure, Context, Result};
44 : use bytes::{Bytes, BytesMut};
45 : use camino::{Utf8Path, Utf8PathBuf};
46 : use hex;
47 : use itertools::Itertools;
48 : use pageserver_api::keyspace::KeySpace;
49 : use pageserver_api::models::LayerAccessKind;
50 : use pageserver_api::shard::TenantShardId;
51 : use rand::{distributions::Alphanumeric, Rng};
52 : use serde::{Deserialize, Serialize};
53 : use std::fs::File;
54 : use std::io::SeekFrom;
55 : use std::ops::Range;
56 : use std::os::unix::prelude::FileExt;
57 : use std::str::FromStr;
58 : use std::sync::Arc;
59 : use tokio::sync::OnceCell;
60 : use tokio_stream::StreamExt;
61 : use tracing::*;
62 :
63 : use utils::{
64 : bin_ser::BeSer,
65 : id::{TenantId, TimelineId},
66 : lsn::Lsn,
67 : };
68 :
69 : use super::layer_name::ImageLayerName;
70 : use super::{
71 : AsLayerDesc, Layer, LayerName, PersistentLayerDesc, ResidentLayer, ValuesReconstructState,
72 : };
73 :
///
/// Header stored in the beginning of the file
///
/// After this comes the 'values' part, starting on block 1. After that,
/// the 'index' starts at the block indicated by 'index_start_blk'
///
/// The struct is (de)serialized as a whole into/from block 0 of the layer
/// file, so the set and order of fields is part of the on-disk format.
///
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct Summary {
    /// Magic value to identify this as a neon image file. Always IMAGE_FILE_MAGIC.
    pub magic: u16,
    /// Storage format version; writers in this file always store
    /// STORAGE_FORMAT_VERSION here.
    pub format_version: u16,

    /// Tenant that owns the layer.
    pub tenant_id: TenantId,
    /// Timeline the layer was written for.
    pub timeline_id: TimelineId,
    /// Key range covered by the layer.
    pub key_range: Range<Key>,
    /// LSN of the images stored in the layer.
    pub lsn: Lsn,

    /// Block number where the 'index' part of the file begins.
    pub index_start_blk: u32,
    /// Block within the 'index', where the B-tree root page is stored
    pub index_root_blk: u32,
    // the 'values' part starts after the summary header, on block 1.
}
97 :
98 : impl From<&ImageLayer> for Summary {
99 0 : fn from(layer: &ImageLayer) -> Self {
100 0 : Self::expected(
101 0 : layer.desc.tenant_shard_id.tenant_id,
102 0 : layer.desc.timeline_id,
103 0 : layer.desc.key_range.clone(),
104 0 : layer.lsn,
105 0 : )
106 0 : }
107 : }
108 :
109 : impl Summary {
110 26 : pub(super) fn expected(
111 26 : tenant_id: TenantId,
112 26 : timeline_id: TimelineId,
113 26 : key_range: Range<Key>,
114 26 : lsn: Lsn,
115 26 : ) -> Self {
116 26 : Self {
117 26 : magic: IMAGE_FILE_MAGIC,
118 26 : format_version: STORAGE_FORMAT_VERSION,
119 26 : tenant_id,
120 26 : timeline_id,
121 26 : key_range,
122 26 : lsn,
123 26 :
124 26 : index_start_blk: 0,
125 26 : index_root_blk: 0,
126 26 : }
127 26 : }
128 : }
129 :
/// This is used only from `pagectl`. Within pageserver, all layers are
/// [`crate::tenant::storage_layer::Layer`], which can hold an [`ImageLayerInner`].
pub struct ImageLayer {
    /// Location of the layer file on disk.
    path: Utf8PathBuf,
    /// Descriptor of the layer (tenant, timeline, key range, LSN, file size).
    pub desc: PersistentLayerDesc,
    // This entry contains an image of all pages as of this LSN, should be the same as desc.lsn
    pub lsn: Lsn,
    /// Access statistics; updated on every `load` call.
    access_stats: LayerAccessStats,
    /// Lazily loaded file handle and index location, initialized on first `load`.
    inner: OnceCell<ImageLayerInner>,
}
140 :
141 : impl std::fmt::Debug for ImageLayer {
142 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
143 0 : use super::RangeDisplayDebug;
144 0 :
145 0 : f.debug_struct("ImageLayer")
146 0 : .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
147 0 : .field("file_size", &self.desc.file_size)
148 0 : .field("lsn", &self.lsn)
149 0 : .field("inner", &self.inner)
150 0 : .finish()
151 0 : }
152 : }
153 :
/// ImageLayerInner is the in-memory data structure associated with an on-disk
/// image file: the open file plus the location of its B-tree index.
pub struct ImageLayerInner {
    // values copied from summary
    index_start_blk: u32,
    index_root_blk: u32,

    /// LSN attached to every image returned from this layer.
    lsn: Lsn,

    /// Open handle to the layer file.
    file: VirtualFile,
    /// Identifier obtained from `page_cache::next_file_id()` when the file was opened.
    file_id: FileId,

    /// Size limit for vectored reads; must be `Some` for the vectored read
    /// path (`plan_reads` / `do_reads_and_update_state` panic otherwise).
    max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
}
168 :
169 : impl std::fmt::Debug for ImageLayerInner {
170 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
171 0 : f.debug_struct("ImageLayerInner")
172 0 : .field("index_start_blk", &self.index_start_blk)
173 0 : .field("index_root_blk", &self.index_root_blk)
174 0 : .finish()
175 0 : }
176 : }
177 :
178 : impl ImageLayerInner {
179 0 : pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
180 0 : let block_reader = FileBlockReader::new(&self.file, self.file_id);
181 0 : let tree_reader = DiskBtreeReader::<_, KEY_SIZE>::new(
182 0 : self.index_start_blk,
183 0 : self.index_root_blk,
184 0 : block_reader,
185 0 : );
186 0 :
187 0 : tree_reader.dump().await?;
188 :
189 0 : tree_reader
190 0 : .visit(
191 0 : &[0u8; KEY_SIZE],
192 0 : VisitDirection::Forwards,
193 0 : |key, value| {
194 0 : println!("key: {} offset {}", hex::encode(key), value);
195 0 : true
196 0 : },
197 0 : ctx,
198 0 : )
199 0 : .await?;
200 :
201 0 : Ok(())
202 0 : }
203 : }
204 :
205 : /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
206 : impl std::fmt::Display for ImageLayer {
207 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
208 0 : write!(f, "{}", self.layer_desc().short_id())
209 0 : }
210 : }
211 :
212 : impl AsLayerDesc for ImageLayer {
213 0 : fn layer_desc(&self) -> &PersistentLayerDesc {
214 0 : &self.desc
215 0 : }
216 : }
217 :
218 : impl ImageLayer {
219 0 : pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
220 0 : self.desc.dump();
221 0 :
222 0 : if !verbose {
223 0 : return Ok(());
224 0 : }
225 :
226 0 : let inner = self.load(LayerAccessKind::Dump, ctx).await?;
227 :
228 0 : inner.dump(ctx).await?;
229 :
230 0 : Ok(())
231 0 : }
232 :
233 116 : fn temp_path_for(
234 116 : conf: &PageServerConf,
235 116 : timeline_id: TimelineId,
236 116 : tenant_shard_id: TenantShardId,
237 116 : fname: &ImageLayerName,
238 116 : ) -> Utf8PathBuf {
239 116 : let rand_string: String = rand::thread_rng()
240 116 : .sample_iter(&Alphanumeric)
241 116 : .take(8)
242 116 : .map(char::from)
243 116 : .collect();
244 116 :
245 116 : conf.timeline_path(&tenant_shard_id, &timeline_id)
246 116 : .join(format!("{fname}.{rand_string}.{TEMP_FILE_SUFFIX}"))
247 116 : }
248 :
249 : ///
250 : /// Open the underlying file and read the metadata into memory, if it's
251 : /// not loaded already.
252 : ///
253 0 : async fn load(
254 0 : &self,
255 0 : access_kind: LayerAccessKind,
256 0 : ctx: &RequestContext,
257 0 : ) -> Result<&ImageLayerInner> {
258 0 : self.access_stats.record_access(access_kind, ctx);
259 0 : self.inner
260 0 : .get_or_try_init(|| self.load_inner(ctx))
261 0 : .await
262 0 : .with_context(|| format!("Failed to load image layer {}", self.path()))
263 0 : }
264 :
265 0 : async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
266 0 : let path = self.path();
267 :
268 0 : let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, None, ctx)
269 0 : .await
270 0 : .and_then(|res| res)?;
271 :
272 : // not production code
273 0 : let actual_layer_name = LayerName::from_str(path.file_name().unwrap()).unwrap();
274 0 : let expected_layer_name = self.layer_desc().layer_name();
275 0 :
276 0 : if actual_layer_name != expected_layer_name {
277 0 : println!("warning: filename does not match what is expected from in-file summary");
278 0 : println!("actual: {:?}", actual_layer_name.to_string());
279 0 : println!("expected: {:?}", expected_layer_name.to_string());
280 0 : }
281 :
282 0 : Ok(loaded)
283 0 : }
284 :
285 : /// Create an ImageLayer struct representing an existing file on disk.
286 : ///
287 : /// This variant is only used for debugging purposes, by the 'pagectl' binary.
288 0 : pub fn new_for_path(path: &Utf8Path, file: File) -> Result<ImageLayer> {
289 0 : let mut summary_buf = vec![0; PAGE_SZ];
290 0 : file.read_exact_at(&mut summary_buf, 0)?;
291 0 : let summary = Summary::des_prefix(&summary_buf)?;
292 0 : let metadata = file
293 0 : .metadata()
294 0 : .context("get file metadata to determine size")?;
295 :
296 : // This function is never used for constructing layers in a running pageserver,
297 : // so it does not need an accurate TenantShardId.
298 0 : let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
299 0 :
300 0 : Ok(ImageLayer {
301 0 : path: path.to_path_buf(),
302 0 : desc: PersistentLayerDesc::new_img(
303 0 : tenant_shard_id,
304 0 : summary.timeline_id,
305 0 : summary.key_range,
306 0 : summary.lsn,
307 0 : metadata.len(),
308 0 : ), // Now we assume image layer ALWAYS covers the full range. This may change in the future.
309 0 : lsn: summary.lsn,
310 0 : access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
311 0 : inner: OnceCell::new(),
312 0 : })
313 0 : }
314 :
315 0 : fn path(&self) -> Utf8PathBuf {
316 0 : self.path.clone()
317 0 : }
318 : }
319 :
/// Error returned by [`ImageLayer::rewrite_summary`].
#[derive(thiserror::Error, Debug)]
pub enum RewriteSummaryError {
    /// The summary read from the file does not start with IMAGE_FILE_MAGIC.
    #[error("magic mismatch")]
    MagicMismatch,
    /// Any other failure (opening, reading, (de)serializing, writing).
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
327 :
328 : impl From<std::io::Error> for RewriteSummaryError {
329 0 : fn from(e: std::io::Error) -> Self {
330 0 : Self::Other(anyhow::anyhow!(e))
331 0 : }
332 : }
333 :
334 : impl ImageLayer {
335 0 : pub async fn rewrite_summary<F>(
336 0 : path: &Utf8Path,
337 0 : rewrite: F,
338 0 : ctx: &RequestContext,
339 0 : ) -> Result<(), RewriteSummaryError>
340 0 : where
341 0 : F: Fn(Summary) -> Summary,
342 0 : {
343 0 : let mut file = VirtualFile::open_with_options(
344 0 : path,
345 0 : virtual_file::OpenOptions::new().read(true).write(true),
346 0 : )
347 0 : .await
348 0 : .with_context(|| format!("Failed to open file '{}'", path))?;
349 0 : let file_id = page_cache::next_file_id();
350 0 : let block_reader = FileBlockReader::new(&file, file_id);
351 0 : let summary_blk = block_reader.read_blk(0, ctx).await?;
352 0 : let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
353 0 : if actual_summary.magic != IMAGE_FILE_MAGIC {
354 0 : return Err(RewriteSummaryError::MagicMismatch);
355 0 : }
356 0 :
357 0 : let new_summary = rewrite(actual_summary);
358 0 :
359 0 : let mut buf = Vec::with_capacity(PAGE_SZ);
360 0 : // TODO: could use smallvec here but it's a pain with Slice<T>
361 0 : Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
362 0 : file.seek(SeekFrom::Start(0)).await?;
363 0 : let (_buf, res) = file.write_all(buf, ctx).await;
364 0 : res?;
365 0 : Ok(())
366 0 : }
367 : }
368 :
impl ImageLayerInner {
    /// Opens the layer file at `path` and reads its summary from block 0.
    ///
    /// If `summary` is given, the in-file summary must match it, ignoring the
    /// index block numbers and the timeline id (those are taken from the file
    /// before comparing).
    ///
    /// Returns nested result following Result<Result<_, OpErr>, Critical>:
    /// - inner has the success or transient failure
    /// - outer has the permanent failure
    ///
    /// In this implementation, failing to open the file or to read its first
    /// block yields an inner error; a summary that fails to deserialize or
    /// does not match the expectation yields an outer error.
    pub(super) async fn load(
        path: &Utf8Path,
        lsn: Lsn,
        summary: Option<Summary>,
        max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
        ctx: &RequestContext,
    ) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
        let file = match VirtualFile::open(path).await {
            Ok(file) => file,
            Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
        };
        let file_id = page_cache::next_file_id();
        let block_reader = FileBlockReader::new(&file, file_id);
        let summary_blk = match block_reader.read_blk(0, ctx).await {
            Ok(blk) => blk,
            Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
        };

        // length is the only way how this could fail, so it's not actually likely at all unless
        // read_blk returns wrong sized block.
        //
        // TODO: confirm and make this into assertion
        let actual_summary =
            Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;

        if let Some(mut expected_summary) = summary {
            // production code path
            // The caller cannot know the index location, so adopt it from the file.
            expected_summary.index_start_blk = actual_summary.index_start_blk;
            expected_summary.index_root_blk = actual_summary.index_root_blk;
            // mask out the timeline_id, but still require the layers to be from the same tenant
            expected_summary.timeline_id = actual_summary.timeline_id;

            if actual_summary != expected_summary {
                bail!(
                    "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
                    actual_summary,
                    expected_summary
                );
            }
        }

        Ok(Ok(ImageLayerInner {
            index_start_blk: actual_summary.index_start_blk,
            index_root_blk: actual_summary.index_root_blk,
            lsn,
            file,
            file_id,
            max_vectored_read_bytes,
        }))
    }

    /// Looks up a single `key` in this layer.
    ///
    /// If the index contains the key, the image is read from the values part,
    /// stored in `reconstruct_state.img` together with this layer's LSN, and
    /// `Complete` is returned. Otherwise `Missing` is returned and
    /// `reconstruct_state` is left untouched.
    pub(super) async fn get_value_reconstruct_data(
        &self,
        key: Key,
        reconstruct_state: &mut ValueReconstructState,
        ctx: &RequestContext,
    ) -> anyhow::Result<ValueReconstructResult> {
        let block_reader = FileBlockReader::new(&self.file, self.file_id);
        let tree_reader =
            DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, &block_reader);

        // The index is keyed by the fixed-size byte representation of the key.
        let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
        key.write_to_byte_slice(&mut keybuf);
        if let Some(offset) = tree_reader
            .get(
                &keybuf,
                // Tag index page accesses made through this context as B-tree node reads.
                &RequestContextBuilder::extend(ctx)
                    .page_content_kind(PageContentKind::ImageLayerBtreeNode)
                    .build(),
            )
            .await?
        {
            let blob = block_reader
                .block_cursor()
                .read_blob(
                    offset,
                    // Tag value page accesses made through this context as image values.
                    &RequestContextBuilder::extend(ctx)
                        .page_content_kind(PageContentKind::ImageLayerValue)
                        .build(),
                )
                .await
                .with_context(|| format!("failed to read value from offset {}", offset))?;
            let value = Bytes::from(blob);

            reconstruct_state.img = Some((self.lsn, value));
            Ok(ValueReconstructResult::Complete)
        } else {
            Ok(ValueReconstructResult::Missing)
        }
    }

    // Look up the keys in the provided keyspace and update
    // the reconstruct state with whatever is found.
    //
    // Only a failure while planning the reads is returned as an error; read
    // failures are recorded per key in `reconstruct_state`.
    pub(super) async fn get_values_reconstruct_data(
        &self,
        keyspace: KeySpace,
        reconstruct_state: &mut ValuesReconstructState,
        ctx: &RequestContext,
    ) -> Result<(), GetVectoredError> {
        let reads = self
            .plan_reads(keyspace, ctx)
            .await
            .map_err(GetVectoredError::Other)?;

        self.do_reads_and_update_state(reads, reconstruct_state)
            .await;

        Ok(())
    }

    /// Walks the index for every range in `keyspace` and feeds the entries
    /// found to a `VectoredReadPlanner`, returning the planned reads.
    ///
    /// Panics if the layer was loaded without `max_vectored_read_bytes`.
    async fn plan_reads(
        &self,
        keyspace: KeySpace,
        ctx: &RequestContext,
    ) -> anyhow::Result<Vec<VectoredRead>> {
        let mut planner = VectoredReadPlanner::new(
            self.max_vectored_read_bytes
                .expect("Layer is loaded with max vectored bytes config")
                .0
                .into(),
        );

        let block_reader = FileBlockReader::new(&self.file, self.file_id);
        let tree_reader =
            DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);

        let ctx = RequestContextBuilder::extend(ctx)
            .page_content_kind(PageContentKind::ImageLayerBtreeNode)
            .build();

        for range in keyspace.ranges.iter() {
            // Tracks whether the planner was told where this range's data ends.
            let mut range_end_handled = false;

            let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
            range.start.write_to_byte_slice(&mut search_key);

            let index_stream = tree_reader.get_stream_from(&search_key, &ctx);
            let mut index_stream = std::pin::pin!(index_stream);

            while let Some(index_entry) = index_stream.next().await {
                let (raw_key, offset) = index_entry?;

                let key = Key::from_slice(&raw_key[..KEY_SIZE]);
                assert!(key >= range.start);

                if key >= range.end {
                    // First entry past the range: its offset is handed to the
                    // planner as the end of the range's data.
                    planner.handle_range_end(offset);
                    range_end_handled = true;
                    break;
                } else {
                    planner.handle(key, self.lsn, offset, BlobFlag::None);
                }
            }

            if !range_end_handled {
                // The index ran out before reaching the end of the range, so
                // use the byte offset at which the index part starts instead.
                let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64;
                planner.handle_range_end(payload_end);
            }
        }

        Ok(planner.finish())
    }

    /// Executes the planned `reads` and records the outcome for every key in
    /// `reconstruct_state`: an image value on success, a key error on failure.
    ///
    /// Panics if the layer was loaded without `max_vectored_read_bytes`.
    async fn do_reads_and_update_state(
        &self,
        reads: Vec<VectoredRead>,
        reconstruct_state: &mut ValuesReconstructState,
    ) {
        let max_vectored_read_bytes = self
            .max_vectored_read_bytes
            .expect("Layer is loaded with max vectored bytes config")
            .0
            .into();

        let vectored_blob_reader = VectoredBlobReader::new(&self.file);
        for read in reads.into_iter() {
            let buf_size = read.size();

            if buf_size > max_vectored_read_bytes {
                // If the read is oversized, it should only contain one key.
                let offenders = read
                    .blobs_at
                    .as_slice()
                    .iter()
                    .map(|(_, blob_meta)| format!("{}@{}", blob_meta.key, blob_meta.lsn))
                    .join(", ");
                // Only a warning: the read is still performed below.
                tracing::warn!(
                    "Oversized vectored read ({} > {}) for keys {}",
                    buf_size,
                    max_vectored_read_bytes,
                    offenders
                );
            }

            let buf = BytesMut::with_capacity(buf_size);
            let res = vectored_blob_reader.read_blobs(&read, buf).await;

            match res {
                Ok(blobs_buf) => {
                    // Freeze once so that every blob can be handed out as a
                    // slice of the same buffer.
                    let frozen_buf = blobs_buf.buf.freeze();

                    for meta in blobs_buf.blobs.iter() {
                        let img_buf = frozen_buf.slice(meta.start..meta.end);
                        reconstruct_state.update_key(
                            &meta.meta.key,
                            self.lsn,
                            Value::Image(img_buf),
                        );
                    }
                }
                Err(err) => {
                    // A failed read is reported against every key it was
                    // supposed to fetch.
                    let kind = err.kind();
                    for (_, blob_meta) in read.blobs_at.as_slice() {
                        reconstruct_state.on_key_error(
                            blob_meta.key,
                            PageReconstructError::from(anyhow!(
                                "Failed to read blobs from virtual file {}: {}",
                                self.file.path,
                                kind
                            )),
                        );
                    }
                }
            };
        }
    }
}
600 :
/// State of an image layer that is being written; the implementation behind
/// [`ImageLayerWriter`].
///
/// Usage:
///
/// 1. Create the writer by calling `ImageLayerWriterInner::new(...)`
///
/// 2. Write the contents by calling `put_image` for every key-value
///    pair in the key range.
///
/// 3. Call `finish`.
///
struct ImageLayerWriterInner {
    conf: &'static PageServerConf,
    /// Path of the temporary file the layer is written to.
    path: Utf8PathBuf,
    timeline_id: TimelineId,
    tenant_shard_id: TenantShardId,
    /// Key range of the layer; `put_image` rejects keys outside of it.
    key_range: Range<Key>,
    lsn: Lsn,

    /// Writer for the 'values' part of the file.
    blob_writer: BlobWriter<false>,
    /// In-memory builder for the B-tree 'index' part, written out by `finish`.
    tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,
}
623 :
624 : impl ImageLayerWriterInner {
625 : ///
626 : /// Start building a new image layer.
627 : ///
628 116 : async fn new(
629 116 : conf: &'static PageServerConf,
630 116 : timeline_id: TimelineId,
631 116 : tenant_shard_id: TenantShardId,
632 116 : key_range: &Range<Key>,
633 116 : lsn: Lsn,
634 116 : ) -> anyhow::Result<Self> {
635 116 : // Create the file initially with a temporary filename.
636 116 : // We'll atomically rename it to the final name when we're done.
637 116 : let path = ImageLayer::temp_path_for(
638 116 : conf,
639 116 : timeline_id,
640 116 : tenant_shard_id,
641 116 : &ImageLayerName {
642 116 : key_range: key_range.clone(),
643 116 : lsn,
644 116 : },
645 116 : );
646 116 : info!("new image layer {path}");
647 116 : let mut file = {
648 116 : VirtualFile::open_with_options(
649 116 : &path,
650 116 : virtual_file::OpenOptions::new()
651 116 : .write(true)
652 116 : .create_new(true),
653 116 : )
654 108 : .await?
655 : };
656 : // make room for the header block
657 116 : file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
658 116 : let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
659 116 :
660 116 : // Initialize the b-tree index builder
661 116 : let block_buf = BlockBuf::new();
662 116 : let tree_builder = DiskBtreeBuilder::new(block_buf);
663 116 :
664 116 : let writer = Self {
665 116 : conf,
666 116 : path,
667 116 : timeline_id,
668 116 : tenant_shard_id,
669 116 : key_range: key_range.clone(),
670 116 : lsn,
671 116 : tree: tree_builder,
672 116 : blob_writer,
673 116 : };
674 116 :
675 116 : Ok(writer)
676 116 : }
677 :
678 : ///
679 : /// Write next value to the file.
680 : ///
681 : /// The page versions must be appended in blknum order.
682 : ///
683 816 : async fn put_image(
684 816 : &mut self,
685 816 : key: Key,
686 816 : img: Bytes,
687 816 : ctx: &RequestContext,
688 816 : ) -> anyhow::Result<()> {
689 816 : ensure!(self.key_range.contains(&key));
690 870 : let (_img, res) = self.blob_writer.write_blob(img, ctx).await;
691 : // TODO: re-use the buffer for `img` further upstack
692 816 : let off = res?;
693 :
694 816 : let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
695 816 : key.write_to_byte_slice(&mut keybuf);
696 816 : self.tree.append(&keybuf, off)?;
697 :
698 816 : Ok(())
699 816 : }
700 :
701 : ///
702 : /// Finish writing the image layer.
703 : ///
704 116 : async fn finish(
705 116 : self,
706 116 : timeline: &Arc<Timeline>,
707 116 : ctx: &RequestContext,
708 116 : ) -> anyhow::Result<ResidentLayer> {
709 116 : let index_start_blk =
710 116 : ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
711 116 :
712 116 : let mut file = self.blob_writer.into_inner();
713 116 :
714 116 : // Write out the index
715 116 : file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
716 0 : .await?;
717 116 : let (index_root_blk, block_buf) = self.tree.finish()?;
718 232 : for buf in block_buf.blocks {
719 116 : let (_buf, res) = file.write_all(buf, ctx).await;
720 116 : res?;
721 : }
722 :
723 : // Fill in the summary on blk 0
724 116 : let summary = Summary {
725 116 : magic: IMAGE_FILE_MAGIC,
726 116 : format_version: STORAGE_FORMAT_VERSION,
727 116 : tenant_id: self.tenant_shard_id.tenant_id,
728 116 : timeline_id: self.timeline_id,
729 116 : key_range: self.key_range.clone(),
730 116 : lsn: self.lsn,
731 116 : index_start_blk,
732 116 : index_root_blk,
733 116 : };
734 116 :
735 116 : let mut buf = Vec::with_capacity(PAGE_SZ);
736 116 : // TODO: could use smallvec here but it's a pain with Slice<T>
737 116 : Summary::ser_into(&summary, &mut buf)?;
738 116 : file.seek(SeekFrom::Start(0)).await?;
739 116 : let (_buf, res) = file.write_all(buf, ctx).await;
740 116 : res?;
741 :
742 116 : let metadata = file
743 116 : .metadata()
744 58 : .await
745 116 : .context("get metadata to determine file size")?;
746 :
747 116 : let desc = PersistentLayerDesc::new_img(
748 116 : self.tenant_shard_id,
749 116 : self.timeline_id,
750 116 : self.key_range.clone(),
751 116 : self.lsn,
752 116 : metadata.len(),
753 116 : );
754 116 :
755 116 : // Note: Because we open the file in write-only mode, we cannot
756 116 : // reuse the same VirtualFile for reading later. That's why we don't
757 116 : // set inner.file here. The first read will have to re-open it.
758 116 :
759 116 : // fsync the file
760 116 : file.sync_all().await?;
761 :
762 : // FIXME: why not carry the virtualfile here, it supports renaming?
763 116 : let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
764 :
765 116 : trace!("created image layer {}", layer.local_path());
766 :
767 116 : Ok(layer)
768 116 : }
769 : }
770 :
/// A builder object for constructing a new image layer.
///
/// Usage:
///
/// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
///
/// 2. Write the contents by calling `put_image` for every key-value
///    pair in the key range.
///
/// 3. Call `finish`.
///
/// # Note
///
/// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
/// possible for the writer to be dropped before `finish` is actually called.
/// This could leave stray temporary files in the directory and eventually
/// exhaust the file system.
/// This structure wraps `ImageLayerWriterInner` and also contains a `Drop`
/// implementation that cleans up the temporary file on failure. It's not
/// possible to do this directly in `ImageLayerWriterInner` since `finish` moves
/// out some fields, making it impossible to implement `Drop`.
///
#[must_use]
pub struct ImageLayerWriter {
    /// `Some` until `finish` takes the inner writer; `Drop` removes the
    /// temporary file only while this is still `Some`.
    inner: Option<ImageLayerWriterInner>,
}
796 :
797 : impl ImageLayerWriter {
798 : ///
799 : /// Start building a new image layer.
800 : ///
801 116 : pub async fn new(
802 116 : conf: &'static PageServerConf,
803 116 : timeline_id: TimelineId,
804 116 : tenant_shard_id: TenantShardId,
805 116 : key_range: &Range<Key>,
806 116 : lsn: Lsn,
807 116 : ) -> anyhow::Result<ImageLayerWriter> {
808 116 : Ok(Self {
809 116 : inner: Some(
810 116 : ImageLayerWriterInner::new(conf, timeline_id, tenant_shard_id, key_range, lsn)
811 108 : .await?,
812 : ),
813 : })
814 116 : }
815 :
816 : ///
817 : /// Write next value to the file.
818 : ///
819 : /// The page versions must be appended in blknum order.
820 : ///
821 816 : pub async fn put_image(
822 816 : &mut self,
823 816 : key: Key,
824 816 : img: Bytes,
825 816 : ctx: &RequestContext,
826 816 : ) -> anyhow::Result<()> {
827 870 : self.inner.as_mut().unwrap().put_image(key, img, ctx).await
828 816 : }
829 :
830 : ///
831 : /// Finish writing the image layer.
832 : ///
833 116 : pub(crate) async fn finish(
834 116 : mut self,
835 116 : timeline: &Arc<Timeline>,
836 116 : ctx: &RequestContext,
837 116 : ) -> anyhow::Result<super::ResidentLayer> {
838 232 : self.inner.take().unwrap().finish(timeline, ctx).await
839 116 : }
840 : }
841 :
842 : impl Drop for ImageLayerWriter {
843 116 : fn drop(&mut self) {
844 116 : if let Some(inner) = self.inner.take() {
845 0 : inner.blob_writer.into_inner().remove();
846 116 : }
847 116 : }
848 : }
|