LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - image_layer.rs (source / functions) Coverage Total Hit
Test: 36bb8dd7c7efcb53483d1a7d9f7cb33e8406dcf0.info Lines: 45.9 % 505 232
Test Date: 2024-04-08 10:22:05 Functions: 31.3 % 67 21

            Line data    Source code
       1              : //! An ImageLayer represents an image or a snapshot of a key-range at
       2              : //! one particular LSN. It contains an image of all key-value pairs
       3              : //! in its key-range. Any key that falls into the image layer's range
       4              : //! but does not exist in the layer, does not exist.
       5              : //!
       6              : //! An image layer is stored in a file on disk. The file is stored in
       7              : //! timelines/<timeline_id> directory.  Currently, there are no
       8              : //! subdirectories, and each image layer file is named like this:
       9              : //!
      10              : //! ```text
      11              : //!    <key start>-<key end>__<LSN>
      12              : //! ```
      13              : //!
      14              : //! For example:
      15              : //!
      16              : //! ```text
      17              : //!    000000067F000032BE0000400000000070B6-000000067F000032BE0000400000000080B6__00000000346BC568
      18              : //! ```
      19              : //!
      20              : //! Every image layer file consists of three parts: "summary",
      21              : //! "index", and "values".  The summary is a fixed size header at the
      22              : //! beginning of the file, and it contains basic information about the
      23              : //! layer, and offsets to the other parts. The "index" is a B-tree,
      24              : //! mapping from Key to an offset in the "values" part.  The
      25              : //! actual page images are stored in the "values" part.
      26              : use crate::config::PageServerConf;
      27              : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
      28              : use crate::page_cache::{self, FileId, PAGE_SZ};
      29              : use crate::repository::{Key, Value, KEY_SIZE};
      30              : use crate::tenant::blob_io::BlobWriter;
      31              : use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
      32              : use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
      33              : use crate::tenant::storage_layer::{
      34              :     LayerAccessStats, ValueReconstructResult, ValueReconstructState,
      35              : };
      36              : use crate::tenant::timeline::GetVectoredError;
      37              : use crate::tenant::vectored_blob_io::{
      38              :     BlobFlag, MaxVectoredReadBytes, VectoredBlobReader, VectoredRead, VectoredReadPlanner,
      39              : };
      40              : use crate::tenant::{PageReconstructError, Timeline};
      41              : use crate::virtual_file::{self, VirtualFile};
      42              : use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
      43              : use anyhow::{anyhow, bail, ensure, Context, Result};
      44              : use bytes::{Bytes, BytesMut};
      45              : use camino::{Utf8Path, Utf8PathBuf};
      46              : use hex;
      47              : use itertools::Itertools;
      48              : use pageserver_api::keyspace::KeySpace;
      49              : use pageserver_api::models::LayerAccessKind;
      50              : use pageserver_api::shard::TenantShardId;
      51              : use rand::{distributions::Alphanumeric, Rng};
      52              : use serde::{Deserialize, Serialize};
      53              : use std::fs::File;
      54              : use std::io::SeekFrom;
      55              : use std::ops::Range;
      56              : use std::os::unix::prelude::FileExt;
      57              : use std::sync::Arc;
      58              : use tokio::sync::OnceCell;
      59              : use tokio_stream::StreamExt;
      60              : use tracing::*;
      61              : 
      62              : use utils::{
      63              :     bin_ser::BeSer,
      64              :     id::{TenantId, TimelineId},
      65              :     lsn::Lsn,
      66              : };
      67              : 
      68              : use super::filename::ImageFileName;
      69              : use super::{AsLayerDesc, Layer, PersistentLayerDesc, ResidentLayer, ValuesReconstructState};
      70              : 
      71              : ///
      72              : /// Header stored in the beginning of the file
      73              : ///
      74              : /// After this comes the 'values' part, starting on block 1. After that,
      75              : /// the 'index' starts at the block indicated by 'index_start_blk'
      76              : ///
      77           24 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
      78              : pub struct Summary {
      79              :     /// Magic value to identify this as a neon image file. Always IMAGE_FILE_MAGIC.
      80              :     pub magic: u16,
      81              :     pub format_version: u16,
      82              : 
      83              :     pub tenant_id: TenantId,
      84              :     pub timeline_id: TimelineId,
      85              :     pub key_range: Range<Key>,
      86              :     pub lsn: Lsn,
      87              : 
      88              :     /// Block number where the 'index' part of the file begins.
      89              :     pub index_start_blk: u32,
      90              :     /// Block within the 'index', where the B-tree root page is stored
      91              :     pub index_root_blk: u32,
      92              :     // the 'values' part starts after the summary header, on block 1.
      93              : }
      94              : 
      95              : impl From<&ImageLayer> for Summary {
      96            0 :     fn from(layer: &ImageLayer) -> Self {
      97            0 :         Self::expected(
      98            0 :             layer.desc.tenant_shard_id.tenant_id,
      99            0 :             layer.desc.timeline_id,
     100            0 :             layer.desc.key_range.clone(),
     101            0 :             layer.lsn,
     102            0 :         )
     103            0 :     }
     104              : }
     105              : 
     106              : impl Summary {
     107           24 :     pub(super) fn expected(
     108           24 :         tenant_id: TenantId,
     109           24 :         timeline_id: TimelineId,
     110           24 :         key_range: Range<Key>,
     111           24 :         lsn: Lsn,
     112           24 :     ) -> Self {
     113           24 :         Self {
     114           24 :             magic: IMAGE_FILE_MAGIC,
     115           24 :             format_version: STORAGE_FORMAT_VERSION,
     116           24 :             tenant_id,
     117           24 :             timeline_id,
     118           24 :             key_range,
     119           24 :             lsn,
     120           24 : 
     121           24 :             index_start_blk: 0,
     122           24 :             index_root_blk: 0,
     123           24 :         }
     124           24 :     }
     125              : }
     126              : 
     127              : /// This is used only from `pagectl`. Within pageserver, all layers are
     128              : /// [`crate::tenant::storage_layer::Layer`], which can hold an [`ImageLayerInner`].
     129              : pub struct ImageLayer {
     130              :     path: Utf8PathBuf,
     131              :     pub desc: PersistentLayerDesc,
     132              :     // This entry contains an image of all pages as of this LSN, should be the same as desc.lsn
     133              :     pub lsn: Lsn,
     134              :     access_stats: LayerAccessStats,
     135              :     inner: OnceCell<ImageLayerInner>,
     136              : }
     137              : 
     138              : impl std::fmt::Debug for ImageLayer {
     139            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     140            0 :         use super::RangeDisplayDebug;
     141            0 : 
     142            0 :         f.debug_struct("ImageLayer")
     143            0 :             .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
     144            0 :             .field("file_size", &self.desc.file_size)
     145            0 :             .field("lsn", &self.lsn)
     146            0 :             .field("inner", &self.inner)
     147            0 :             .finish()
     148            0 :     }
     149              : }
     150              : 
     151              : /// ImageLayer is the in-memory data structure associated with an on-disk image
     152              : /// file.
     153              : pub struct ImageLayerInner {
     154              :     // values copied from summary
     155              :     index_start_blk: u32,
     156              :     index_root_blk: u32,
     157              : 
     158              :     lsn: Lsn,
     159              : 
     160              :     file: VirtualFile,
     161              :     file_id: FileId,
     162              : 
     163              :     max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
     164              : }
     165              : 
     166              : impl std::fmt::Debug for ImageLayerInner {
     167            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     168            0 :         f.debug_struct("ImageLayerInner")
     169            0 :             .field("index_start_blk", &self.index_start_blk)
     170            0 :             .field("index_root_blk", &self.index_root_blk)
     171            0 :             .finish()
     172            0 :     }
     173              : }
     174              : 
     175              : impl ImageLayerInner {
     176            0 :     pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
     177            0 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
     178            0 :         let tree_reader = DiskBtreeReader::<_, KEY_SIZE>::new(
     179            0 :             self.index_start_blk,
     180            0 :             self.index_root_blk,
     181            0 :             block_reader,
     182            0 :         );
     183            0 : 
     184            0 :         tree_reader.dump().await?;
     185              : 
     186            0 :         tree_reader
     187            0 :             .visit(
     188            0 :                 &[0u8; KEY_SIZE],
     189            0 :                 VisitDirection::Forwards,
     190            0 :                 |key, value| {
     191            0 :                     println!("key: {} offset {}", hex::encode(key), value);
     192            0 :                     true
     193            0 :                 },
     194            0 :                 ctx,
     195            0 :             )
     196            0 :             .await?;
     197              : 
     198            0 :         Ok(())
     199            0 :     }
     200              : }
     201              : 
     202              : /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
     203              : impl std::fmt::Display for ImageLayer {
     204            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     205            0 :         write!(f, "{}", self.layer_desc().short_id())
     206            0 :     }
     207              : }
     208              : 
     209              : impl AsLayerDesc for ImageLayer {
     210            0 :     fn layer_desc(&self) -> &PersistentLayerDesc {
     211            0 :         &self.desc
     212            0 :     }
     213              : }
     214              : 
     215              : impl ImageLayer {
     216            0 :     pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
     217            0 :         self.desc.dump();
     218            0 : 
     219            0 :         if !verbose {
     220            0 :             return Ok(());
     221            0 :         }
     222              : 
     223            0 :         let inner = self.load(LayerAccessKind::Dump, ctx).await?;
     224              : 
     225            0 :         inner.dump(ctx).await?;
     226              : 
     227            0 :         Ok(())
     228            0 :     }
     229              : 
     230          104 :     fn temp_path_for(
     231          104 :         conf: &PageServerConf,
     232          104 :         timeline_id: TimelineId,
     233          104 :         tenant_shard_id: TenantShardId,
     234          104 :         fname: &ImageFileName,
     235          104 :     ) -> Utf8PathBuf {
     236          104 :         let rand_string: String = rand::thread_rng()
     237          104 :             .sample_iter(&Alphanumeric)
     238          104 :             .take(8)
     239          104 :             .map(char::from)
     240          104 :             .collect();
     241          104 : 
     242          104 :         conf.timeline_path(&tenant_shard_id, &timeline_id)
     243          104 :             .join(format!("{fname}.{rand_string}.{TEMP_FILE_SUFFIX}"))
     244          104 :     }
     245              : 
     246              :     ///
     247              :     /// Open the underlying file and read the metadata into memory, if it's
     248              :     /// not loaded already.
     249              :     ///
     250            0 :     async fn load(
     251            0 :         &self,
     252            0 :         access_kind: LayerAccessKind,
     253            0 :         ctx: &RequestContext,
     254            0 :     ) -> Result<&ImageLayerInner> {
     255            0 :         self.access_stats.record_access(access_kind, ctx);
     256            0 :         self.inner
     257            0 :             .get_or_try_init(|| self.load_inner(ctx))
     258            0 :             .await
     259            0 :             .with_context(|| format!("Failed to load image layer {}", self.path()))
     260            0 :     }
     261              : 
     262            0 :     async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
     263            0 :         let path = self.path();
     264              : 
     265            0 :         let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, None, ctx)
     266            0 :             .await
     267            0 :             .and_then(|res| res)?;
     268              : 
     269              :         // not production code
     270            0 :         let actual_filename = path.file_name().unwrap().to_owned();
     271            0 :         let expected_filename = self.layer_desc().filename().file_name();
     272            0 : 
     273            0 :         if actual_filename != expected_filename {
     274            0 :             println!("warning: filename does not match what is expected from in-file summary");
     275            0 :             println!("actual: {:?}", actual_filename);
     276            0 :             println!("expected: {:?}", expected_filename);
     277            0 :         }
     278              : 
     279            0 :         Ok(loaded)
     280            0 :     }
     281              : 
     282              :     /// Create an ImageLayer struct representing an existing file on disk.
     283              :     ///
     284              :     /// This variant is only used for debugging purposes, by the 'pagectl' binary.
     285            0 :     pub fn new_for_path(path: &Utf8Path, file: File) -> Result<ImageLayer> {
     286            0 :         let mut summary_buf = vec![0; PAGE_SZ];
     287            0 :         file.read_exact_at(&mut summary_buf, 0)?;
     288            0 :         let summary = Summary::des_prefix(&summary_buf)?;
     289            0 :         let metadata = file
     290            0 :             .metadata()
     291            0 :             .context("get file metadata to determine size")?;
     292              : 
     293              :         // This function is never used for constructing layers in a running pageserver,
     294              :         // so it does not need an accurate TenantShardId.
     295            0 :         let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
     296            0 : 
     297            0 :         Ok(ImageLayer {
     298            0 :             path: path.to_path_buf(),
     299            0 :             desc: PersistentLayerDesc::new_img(
     300            0 :                 tenant_shard_id,
     301            0 :                 summary.timeline_id,
     302            0 :                 summary.key_range,
     303            0 :                 summary.lsn,
     304            0 :                 metadata.len(),
     305            0 :             ), // Now we assume image layer ALWAYS covers the full range. This may change in the future.
     306            0 :             lsn: summary.lsn,
     307            0 :             access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
     308            0 :             inner: OnceCell::new(),
     309            0 :         })
     310            0 :     }
     311              : 
     312            0 :     fn path(&self) -> Utf8PathBuf {
     313            0 :         self.path.clone()
     314            0 :     }
     315              : }
     316              : 
     317            0 : #[derive(thiserror::Error, Debug)]
     318              : pub enum RewriteSummaryError {
     319              :     #[error("magic mismatch")]
     320              :     MagicMismatch,
     321              :     #[error(transparent)]
     322              :     Other(#[from] anyhow::Error),
     323              : }
     324              : 
     325              : impl From<std::io::Error> for RewriteSummaryError {
     326            0 :     fn from(e: std::io::Error) -> Self {
     327            0 :         Self::Other(anyhow::anyhow!(e))
     328            0 :     }
     329              : }
     330              : 
     331              : impl ImageLayer {
     332            0 :     pub async fn rewrite_summary<F>(
     333            0 :         path: &Utf8Path,
     334            0 :         rewrite: F,
     335            0 :         ctx: &RequestContext,
     336            0 :     ) -> Result<(), RewriteSummaryError>
     337            0 :     where
     338            0 :         F: Fn(Summary) -> Summary,
     339            0 :     {
     340            0 :         let mut file = VirtualFile::open_with_options(
     341            0 :             path,
     342            0 :             virtual_file::OpenOptions::new().read(true).write(true),
     343            0 :         )
     344            0 :         .await
     345            0 :         .with_context(|| format!("Failed to open file '{}'", path))?;
     346            0 :         let file_id = page_cache::next_file_id();
     347            0 :         let block_reader = FileBlockReader::new(&file, file_id);
     348            0 :         let summary_blk = block_reader.read_blk(0, ctx).await?;
     349            0 :         let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
     350            0 :         if actual_summary.magic != IMAGE_FILE_MAGIC {
     351            0 :             return Err(RewriteSummaryError::MagicMismatch);
     352            0 :         }
     353            0 : 
     354            0 :         let new_summary = rewrite(actual_summary);
     355            0 : 
     356            0 :         let mut buf = Vec::with_capacity(PAGE_SZ);
     357            0 :         // TODO: could use smallvec here but it's a pain with Slice<T>
     358            0 :         Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
     359            0 :         file.seek(SeekFrom::Start(0)).await?;
     360            0 :         let (_buf, res) = file.write_all(buf).await;
     361            0 :         res?;
     362            0 :         Ok(())
     363            0 :     }
     364              : }
     365              : 
     366              : impl ImageLayerInner {
     367              :     /// Returns nested result following Result<Result<_, OpErr>, Critical>:
     368              :     /// - inner has the success or transient failure
     369              :     /// - outer has the permanent failure
     370           24 :     pub(super) async fn load(
     371           24 :         path: &Utf8Path,
     372           24 :         lsn: Lsn,
     373           24 :         summary: Option<Summary>,
     374           24 :         max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
     375           24 :         ctx: &RequestContext,
     376           24 :     ) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
     377           24 :         let file = match VirtualFile::open(path).await {
     378           24 :             Ok(file) => file,
     379            0 :             Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
     380              :         };
     381           24 :         let file_id = page_cache::next_file_id();
     382           24 :         let block_reader = FileBlockReader::new(&file, file_id);
     383           24 :         let summary_blk = match block_reader.read_blk(0, ctx).await {
     384           24 :             Ok(blk) => blk,
     385            0 :             Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
     386              :         };
     387              : 
     388              :         // length is the only way how this could fail, so it's not actually likely at all unless
     389              :         // read_blk returns wrong sized block.
     390              :         //
     391              :         // TODO: confirm and make this into assertion
     392           24 :         let actual_summary =
     393           24 :             Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
     394              : 
     395           24 :         if let Some(mut expected_summary) = summary {
     396              :             // production code path
     397           24 :             expected_summary.index_start_blk = actual_summary.index_start_blk;
     398           24 :             expected_summary.index_root_blk = actual_summary.index_root_blk;
     399           24 : 
     400           24 :             if actual_summary != expected_summary {
     401            0 :                 bail!(
     402            0 :                     "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
     403            0 :                     actual_summary,
     404            0 :                     expected_summary
     405            0 :                 );
     406           24 :             }
     407            0 :         }
     408              : 
     409           24 :         Ok(Ok(ImageLayerInner {
     410           24 :             index_start_blk: actual_summary.index_start_blk,
     411           24 :             index_root_blk: actual_summary.index_root_blk,
     412           24 :             lsn,
     413           24 :             file,
     414           24 :             file_id,
     415           24 :             max_vectored_read_bytes,
     416           24 :         }))
     417           24 :     }
     418              : 
     419          526 :     pub(super) async fn get_value_reconstruct_data(
     420          526 :         &self,
     421          526 :         key: Key,
     422          526 :         reconstruct_state: &mut ValueReconstructState,
     423          526 :         ctx: &RequestContext,
     424          526 :     ) -> anyhow::Result<ValueReconstructResult> {
     425          526 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
     426          526 :         let tree_reader =
     427          526 :             DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, &block_reader);
     428          526 : 
     429          526 :         let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
     430          526 :         key.write_to_byte_slice(&mut keybuf);
     431          526 :         if let Some(offset) = tree_reader
     432          526 :             .get(
     433          526 :                 &keybuf,
     434          526 :                 &RequestContextBuilder::extend(ctx)
     435          526 :                     .page_content_kind(PageContentKind::ImageLayerBtreeNode)
     436          526 :                     .build(),
     437          526 :             )
     438          295 :             .await?
     439              :         {
     440          522 :             let blob = block_reader
     441          522 :                 .block_cursor()
     442          522 :                 .read_blob(
     443          522 :                     offset,
     444          522 :                     &RequestContextBuilder::extend(ctx)
     445          522 :                         .page_content_kind(PageContentKind::ImageLayerValue)
     446          522 :                         .build(),
     447          522 :                 )
     448          153 :                 .await
     449          522 :                 .with_context(|| format!("failed to read value from offset {}", offset))?;
     450          522 :             let value = Bytes::from(blob);
     451          522 : 
     452          522 :             reconstruct_state.img = Some((self.lsn, value));
     453          522 :             Ok(ValueReconstructResult::Complete)
     454              :         } else {
     455            4 :             Ok(ValueReconstructResult::Missing)
     456              :         }
     457          526 :     }
     458              : 
     459              :     // Look up the keys in the provided keyspace and update
     460              :     // the reconstruct state with whatever is found.
     461            0 :     pub(super) async fn get_values_reconstruct_data(
     462            0 :         &self,
     463            0 :         keyspace: KeySpace,
     464            0 :         reconstruct_state: &mut ValuesReconstructState,
     465            0 :         ctx: &RequestContext,
     466            0 :     ) -> Result<(), GetVectoredError> {
     467            0 :         let reads = self
     468            0 :             .plan_reads(keyspace, ctx)
     469            0 :             .await
     470            0 :             .map_err(GetVectoredError::Other)?;
     471              : 
     472            0 :         self.do_reads_and_update_state(reads, reconstruct_state)
     473            0 :             .await;
     474              : 
     475            0 :         Ok(())
     476            0 :     }
     477              : 
     478            0 :     async fn plan_reads(
     479            0 :         &self,
     480            0 :         keyspace: KeySpace,
     481            0 :         ctx: &RequestContext,
     482            0 :     ) -> anyhow::Result<Vec<VectoredRead>> {
     483            0 :         let mut planner = VectoredReadPlanner::new(
     484            0 :             self.max_vectored_read_bytes
     485            0 :                 .expect("Layer is loaded with max vectored bytes config")
     486            0 :                 .0
     487            0 :                 .into(),
     488            0 :         );
     489            0 : 
     490            0 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
     491            0 :         let tree_reader =
     492            0 :             DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
     493            0 : 
     494            0 :         let ctx = RequestContextBuilder::extend(ctx)
     495            0 :             .page_content_kind(PageContentKind::ImageLayerBtreeNode)
     496            0 :             .build();
     497              : 
     498            0 :         for range in keyspace.ranges.iter() {
     499            0 :             let mut range_end_handled = false;
     500            0 : 
     501            0 :             let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
     502            0 :             range.start.write_to_byte_slice(&mut search_key);
     503            0 : 
     504            0 :             let index_stream = tree_reader.get_stream_from(&search_key, &ctx);
     505            0 :             let mut index_stream = std::pin::pin!(index_stream);
     506              : 
     507            0 :             while let Some(index_entry) = index_stream.next().await {
     508            0 :                 let (raw_key, offset) = index_entry?;
     509              : 
     510            0 :                 let key = Key::from_slice(&raw_key[..KEY_SIZE]);
     511            0 :                 assert!(key >= range.start);
     512              : 
     513            0 :                 if key >= range.end {
     514            0 :                     planner.handle_range_end(offset);
     515            0 :                     range_end_handled = true;
     516            0 :                     break;
     517            0 :                 } else {
     518            0 :                     planner.handle(key, self.lsn, offset, BlobFlag::None);
     519            0 :                 }
     520              :             }
     521              : 
     522            0 :             if !range_end_handled {
     523            0 :                 let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64;
     524            0 :                 planner.handle_range_end(payload_end);
     525            0 :             }
     526              :         }
     527              : 
     528            0 :         Ok(planner.finish())
     529            0 :     }
     530              : 
     531            0 :     async fn do_reads_and_update_state( // Executes each planned read; records an image or an error per key.
     532            0 :         &self,
     533            0 :         reads: Vec<VectoredRead>, // consumed; each entry becomes one `read_blobs` call
     534            0 :         reconstruct_state: &mut ValuesReconstructState, // receives `update_key` / `on_key_error` calls
     535            0 :     ) {
     536            0 :         let max_vectored_read_bytes = self
     537            0 :             .max_vectored_read_bytes
     538            0 :             .expect("Layer is loaded with max vectored bytes config") // panics if the value is absent
     539            0 :             .0
     540            0 :             .into();
     541            0 : 
     542            0 :         let vectored_blob_reader = VectoredBlobReader::new(&self.file);
     543            0 :         for read in reads.into_iter() {
     544            0 :             let buf_size = read.size();
     545            0 : 
     546            0 :             if buf_size > max_vectored_read_bytes {
     547              :                 // If the read is oversized, it should only contain one key. Log every key it covers as `key@lsn`.
     548            0 :                 let offenders = read
     549            0 :                     .blobs_at
     550            0 :                     .as_slice()
     551            0 :                     .iter()
     552            0 :                     .map(|(_, blob_meta)| format!("{}@{}", blob_meta.key, blob_meta.lsn))
     553            0 :                     .join(", ");
     554            0 :                 tracing::warn!(
     555            0 :                     "Oversized vectored read ({} > {}) for keys {}",
     556            0 :                     buf_size,
     557            0 :                     max_vectored_read_bytes,
     558            0 :                     offenders
     559            0 :                 );
     560            0 :             } // warning only: the read below is still issued
     561              : 
     562            0 :             let buf = BytesMut::with_capacity(buf_size); // one buffer sized for the whole read
     563            0 :             let res = vectored_blob_reader.read_blobs(&read, buf).await;
     564              : 
     565            0 :             match res {
     566            0 :                 Ok(blobs_buf) => {
     567            0 :                     let frozen_buf = blobs_buf.buf.freeze(); // frozen so every image below is a `slice` of this one buffer
     568              : 
     569            0 :                     for meta in blobs_buf.blobs.iter() {
     570            0 :                         let img_buf = frozen_buf.slice(meta.start..meta.end); // byte range of this blob within the buffer
     571            0 :                         reconstruct_state.update_key(
     572            0 :                             &meta.meta.key,
     573            0 :                             self.lsn, // every image is recorded at this layer's LSN
     574            0 :                             Value::Image(img_buf),
     575            0 :                         );
     576            0 :                     }
     577              :                 }
     578            0 :                 Err(err) => {
     579            0 :                     let kind = err.kind(); // only the error kind is reported
     580            0 :                     for (_, blob_meta) in read.blobs_at.as_slice() { // the same failure is recorded for every key in this read
     581            0 :                         reconstruct_state.on_key_error(
     582            0 :                             blob_meta.key,
     583            0 :                             PageReconstructError::from(anyhow!(
     584            0 :                                 "Failed to read blobs from virtual file {}: {}",
     585            0 :                                 self.file.path,
     586            0 :                                 kind
     587            0 :                             )),
     588            0 :                         );
     589            0 :                     }
     590              :                 }
     591              :             };
     592              :         }
     593            0 :     }
     594              : }
     595              : 
     596              : /// The implementation behind `ImageLayerWriter`: a builder object for constructing a new image layer.
     597              : ///
     598              : /// Usage:
     599              : ///
     600              : /// 1. Create the ImageLayerWriterInner by calling ImageLayerWriterInner::new(...)
     601              : ///
     602              : /// 2. Write the contents by calling `put_image` for every key-value
     603              : ///    pair in the key range.
     604              : ///
     605              : /// 3. Call `finish`.
     606              : ///
     607              : struct ImageLayerWriterInner {
     608              :     conf: &'static PageServerConf, // passed to `temp_path_for` and `Layer::finish_creating`
     609              :     path: Utf8PathBuf, // temporary path the layer is written to (from `ImageLayer::temp_path_for`)
     610              :     timeline_id: TimelineId,
     611              :     tenant_shard_id: TenantShardId,
     612              :     key_range: Range<Key>, // `put_image` rejects keys outside this range
     613              :     lsn: Lsn, // recorded in the summary and the layer descriptor
     614              : 
     615              :     blob_writer: BlobWriter<false>, // writes the image blobs; created after seeking past the first page
     616              :     tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>, // index builder: serialized key -> blob offset
     617              : }
     618              : 
     619              : impl ImageLayerWriterInner {
     620              :     ///
     621              :     /// Start building a new image layer.
     622              :     /// Creates the file under a temporary name, seeks past the first page and sets up the blob writer and index builder.
     623          104 :     async fn new(
     624          104 :         conf: &'static PageServerConf,
     625          104 :         timeline_id: TimelineId,
     626          104 :         tenant_shard_id: TenantShardId,
     627          104 :         key_range: &Range<Key>,
     628          104 :         lsn: Lsn,
     629          104 :     ) -> anyhow::Result<Self> {
     630          104 :         // Create the file initially with a temporary filename.
     631          104 :         // We'll atomically rename it to the final name when we're done.
     632          104 :         let path = ImageLayer::temp_path_for(
     633          104 :             conf,
     634          104 :             timeline_id,
     635          104 :             tenant_shard_id,
     636          104 :             &ImageFileName {
     637          104 :                 key_range: key_range.clone(),
     638          104 :                 lsn,
     639          104 :             },
     640          104 :         );
     641          104 :         info!("new image layer {path}");
     642          104 :         let mut file = {
     643          104 :             VirtualFile::open_with_options(
     644          104 :                 &path,
     645          104 :                 virtual_file::OpenOptions::new()
     646          104 :                     .write(true)
     647          104 :                     .create_new(true),
     648          104 :             )
     649           97 :             .await?
     650              :         };
     651              :         // make room for the header block
     652          104 :         file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
     653          104 :         let blob_writer = BlobWriter::new(file, PAGE_SZ as u64); // same offset as the seek above
     654          104 : 
     655          104 :         // Initialize the b-tree index builder
     656          104 :         let block_buf = BlockBuf::new();
     657          104 :         let tree_builder = DiskBtreeBuilder::new(block_buf);
     658          104 : 
     659          104 :         let writer = Self {
     660          104 :             conf,
     661          104 :             path,
     662          104 :             timeline_id,
     663          104 :             tenant_shard_id,
     664          104 :             key_range: key_range.clone(),
     665          104 :             lsn,
     666          104 :             tree: tree_builder,
     667          104 :             blob_writer,
     668          104 :         };
     669          104 : 
     670          104 :         Ok(writer)
     671          104 :     }
     672              : 
     673              :     ///
     674              :     /// Write next value to the file.
     675              :     ///
     676              :     /// The page versions must be appended in blknum order.
     677              :     /// Returns an error if `key` is outside `key_range`, or if writing the blob or appending the index entry fails.
     678          720 :     async fn put_image(&mut self, key: Key, img: Bytes) -> anyhow::Result<()> {
     679          720 :         ensure!(self.key_range.contains(&key));
     680          776 :         let (_img, res) = self.blob_writer.write_blob(img).await;
     681              :         // TODO: re-use the buffer for `img` further upstack
     682          720 :         let off = res?; // value returned by `write_blob`; stored as this key's index entry
     683              : 
     684          720 :         let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
     685          720 :         key.write_to_byte_slice(&mut keybuf);
     686          720 :         self.tree.append(&keybuf, off)?; // index entry: serialized key -> `off`
     687              : 
     688          720 :         Ok(())
     689          720 :     }
     690              : 
     691              :     ///
     692              :     /// Finish writing the image layer.
     693              :     /// Writes the index blocks after the values, then the summary at offset 0, syncs the file and calls `Layer::finish_creating`.
     694          104 :     async fn finish(self, timeline: &Arc<Timeline>) -> anyhow::Result<ResidentLayer> {
     695          104 :         let index_start_blk =
     696          104 :             ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32; // round up to a whole number of pages
     697          104 : 
     698          104 :         let mut file = self.blob_writer.into_inner();
     699          104 : 
     700          104 :         // Write out the index
     701          104 :         file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
     702            0 :             .await?;
     703          104 :         let (index_root_blk, block_buf) = self.tree.finish()?;
     704          208 :         for buf in block_buf.blocks {
     705          104 :             let (_buf, res) = file.write_all(buf).await;
     706          104 :             res?;
     707              :         }
     708              : 
     709              :         // Fill in the summary on blk 0
     710          104 :         let summary = Summary {
     711          104 :             magic: IMAGE_FILE_MAGIC,
     712          104 :             format_version: STORAGE_FORMAT_VERSION,
     713          104 :             tenant_id: self.tenant_shard_id.tenant_id,
     714          104 :             timeline_id: self.timeline_id,
     715          104 :             key_range: self.key_range.clone(),
     716          104 :             lsn: self.lsn,
     717          104 :             index_start_blk,
     718          104 :             index_root_blk,
     719          104 :         };
     720          104 : 
     721          104 :         let mut buf = Vec::with_capacity(PAGE_SZ);
     722          104 :         // TODO: could use smallvec here but it's a pain with Slice<T>
     723          104 :         Summary::ser_into(&summary, &mut buf)?;
     724          104 :         file.seek(SeekFrom::Start(0)).await?; // back to the space reserved in `new`
     725          104 :         let (_buf, res) = file.write_all(buf).await;
     726          104 :         res?;
     727              : 
     728          104 :         let metadata = file
     729          104 :             .metadata()
     730           52 :             .await
     731          104 :             .context("get metadata to determine file size")?;
     732              : 
     733          104 :         let desc = PersistentLayerDesc::new_img(
     734          104 :             self.tenant_shard_id,
     735          104 :             self.timeline_id,
     736          104 :             self.key_range.clone(),
     737          104 :             self.lsn,
     738          104 :             metadata.len(), // file size, taken after the index and summary were written
     739          104 :         );
     740          104 : 
     741          104 :         // Note: Because we open the file in write-only mode, we cannot
     742          104 :         // reuse the same VirtualFile for reading later. That's why we don't
     743          104 :         // set inner.file here. The first read will have to re-open it.
     744          104 : 
     745          104 :         // fsync the file
     746          104 :         file.sync_all().await?;
     747              : 
     748              :         // FIXME: why not carry the virtualfile here, it supports renaming?
     749          104 :         let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
     750              : 
     751          104 :         trace!("created image layer {}", layer.local_path());
     752              : 
     753          104 :         Ok(layer)
     754          104 :     }
     755              : }
     756              : 
     757              : /// A builder object for constructing a new image layer.
     758              : ///
     759              : /// Usage:
     760              : ///
     761              : /// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
     762              : ///
     763              : /// 2. Write the contents by calling `put_image` for every key-value
     764              : ///    pair in the key range.
     765              : ///
     766              : /// 3. Call `finish`.
     767              : ///
     768              : /// # Note
     769              : ///
     770              : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
     771              : /// possible for the writer to be dropped before `finish` is actually called. This
     772              : /// could leave stray temporary files in the directory, eventually exhausting the file system.
     773              : /// This structure wraps `ImageLayerWriterInner` and also contains a `Drop`
     774              : /// implementation that cleans up the temporary file on failure. It's not
     775              : /// possible to do this directly in `ImageLayerWriterInner` since `finish` moves
     776              : /// out some fields, making it impossible to implement `Drop`.
     777              : ///
     778              : #[must_use]
     779              : pub struct ImageLayerWriter {
     780              :     inner: Option<ImageLayerWriterInner>, // `Some` from `new` until `finish` or `drop` takes it
     781              : }
     782              : 
     783              : impl ImageLayerWriter {
     784              :     ///
     785              :     /// Start building a new image layer.
     786              :     /// Returns an error if `ImageLayerWriterInner::new` fails.
     787          104 :     pub async fn new(
     788          104 :         conf: &'static PageServerConf,
     789          104 :         timeline_id: TimelineId,
     790          104 :         tenant_shard_id: TenantShardId,
     791          104 :         key_range: &Range<Key>,
     792          104 :         lsn: Lsn,
     793          104 :     ) -> anyhow::Result<ImageLayerWriter> {
     794          104 :         Ok(Self {
     795          104 :             inner: Some(
     796          104 :                 ImageLayerWriterInner::new(conf, timeline_id, tenant_shard_id, key_range, lsn)
     797           97 :                     .await?,
     798              :             ),
     799              :         })
     800          104 :     }
     801              : 
     802              :     ///
     803              :     /// Write next value to the file.
     804              :     ///
     805              :     /// The page versions must be appended in blknum order.
     806              :     /// Forwards to `ImageLayerWriterInner::put_image`. `inner` is only emptied by `finish` (which consumes `self`) and by `drop`.
     807          720 :     pub async fn put_image(&mut self, key: Key, img: Bytes) -> anyhow::Result<()> {
     808          776 :         self.inner.as_mut().unwrap().put_image(key, img).await
     809          720 :     }
     810              : 
     811              :     ///
     812              :     /// Finish writing the image layer.
     813              :     /// NOTE(review): `inner` is taken before the inner `finish` runs, so on error `Drop` sees `None` and presumably leaves the temporary file behind — confirm.
     814          104 :     pub(crate) async fn finish(
     815          104 :         mut self,
     816          104 :         timeline: &Arc<Timeline>,
     817          104 :     ) -> anyhow::Result<super::ResidentLayer> {
     818          208 :         self.inner.take().unwrap().finish(timeline).await
     819          104 :     }
     820              : }
     821              : 
     822              : impl Drop for ImageLayerWriter { // cleans up when the writer is dropped without `finish` having been called
     823          104 :     fn drop(&mut self) {
     824          104 :         if let Some(inner) = self.inner.take() { // `None` here means `finish` already took the inner writer
     825            0 :             inner.blob_writer.into_inner().remove(); // remove the temporary file; the return value, if any, is not inspected
     826          104 :         }
     827          104 :     }
     828              : }
        

Generated by: LCOV version 2.1-beta