LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - image_layer.rs (source / functions) Coverage Total Hit
Test: 691a4c28fe7169edd60b367c52d448a0a6605f1f.info Lines: 60.8 % 518 315
Test Date: 2024-05-10 13:18:37 Functions: 40.6 % 64 26

            Line data    Source code
       1              : //! An ImageLayer represents an image or a snapshot of a key-range at
       2              : //! one particular LSN. It contains an image of all key-value pairs
       3              : //! in its key-range. Any key that falls into the image layer's range
       4              : //! but does not exist in the layer, does not exist.
       5              : //!
       6              : //! An image layer is stored in a file on disk. The file is stored in
       7              : //! timelines/<timeline_id> directory.  Currently, there are no
       8              : //! subdirectories, and each image layer file is named like this:
       9              : //!
      10              : //! ```text
      11              : //!    <key start>-<key end>__<LSN>
      12              : //! ```
      13              : //!
      14              : //! For example:
      15              : //!
      16              : //! ```text
      17              : //!    000000067F000032BE0000400000000070B6-000000067F000032BE0000400000000080B6__00000000346BC568
      18              : //! ```
      19              : //!
      20              : //! Every image layer file consists of three parts: "summary",
      21              : //! "index", and "values".  The summary is a fixed size header at the
      22              : //! beginning of the file, and it contains basic information about the
      23              : //! layer, and offsets to the other parts. The "index" is a B-tree,
      24              : //! mapping from Key to an offset in the "values" part.  The
      25              : //! actual page images are stored in the "values" part.
      26              : use crate::config::PageServerConf;
      27              : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
      28              : use crate::page_cache::{self, FileId, PAGE_SZ};
      29              : use crate::repository::{Key, Value, KEY_SIZE};
      30              : use crate::tenant::blob_io::BlobWriter;
      31              : use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
      32              : use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
      33              : use crate::tenant::storage_layer::{
      34              :     LayerAccessStats, ValueReconstructResult, ValueReconstructState,
      35              : };
      36              : use crate::tenant::timeline::GetVectoredError;
      37              : use crate::tenant::vectored_blob_io::{
      38              :     BlobFlag, MaxVectoredReadBytes, VectoredBlobReader, VectoredRead, VectoredReadPlanner,
      39              : };
      40              : use crate::tenant::{PageReconstructError, Timeline};
      41              : use crate::virtual_file::{self, VirtualFile};
      42              : use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
      43              : use anyhow::{anyhow, bail, ensure, Context, Result};
      44              : use bytes::{Bytes, BytesMut};
      45              : use camino::{Utf8Path, Utf8PathBuf};
      46              : use hex;
      47              : use itertools::Itertools;
      48              : use pageserver_api::keyspace::KeySpace;
      49              : use pageserver_api::models::LayerAccessKind;
      50              : use pageserver_api::shard::TenantShardId;
      51              : use rand::{distributions::Alphanumeric, Rng};
      52              : use serde::{Deserialize, Serialize};
      53              : use std::fs::File;
      54              : use std::io::SeekFrom;
      55              : use std::ops::Range;
      56              : use std::os::unix::prelude::FileExt;
      57              : use std::str::FromStr;
      58              : use std::sync::Arc;
      59              : use tokio::sync::OnceCell;
      60              : use tokio_stream::StreamExt;
      61              : use tracing::*;
      62              : 
      63              : use utils::{
      64              :     bin_ser::BeSer,
      65              :     id::{TenantId, TimelineId},
      66              :     lsn::Lsn,
      67              : };
      68              : 
      69              : use super::layer_name::ImageLayerName;
      70              : use super::{
      71              :     AsLayerDesc, Layer, LayerName, PersistentLayerDesc, ResidentLayer, ValuesReconstructState,
      72              : };
      73              : 
/// Header stored at the beginning of an image layer file.
///
/// After this comes the 'values' part, starting on block 1. After that,
/// the 'index' starts at the block indicated by 'index_start_blk'.
///
/// NOTE(review): the struct is written and read through its derived
/// `Serialize`/`Deserialize` impls, so the declaration order of the fields is
/// presumably part of the on-disk format -- confirm before reordering.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct Summary {
    /// Magic value to identify this as a neon image file. Always IMAGE_FILE_MAGIC.
    pub magic: u16,
    /// Storage format version; `Summary::expected` fills in STORAGE_FORMAT_VERSION.
    pub format_version: u16,

    /// Tenant the layer belongs to.
    pub tenant_id: TenantId,
    /// Timeline the layer belongs to.
    pub timeline_id: TimelineId,
    /// Key range covered by the layer.
    pub key_range: Range<Key>,
    /// LSN of the layer.
    pub lsn: Lsn,

    /// Block number where the 'index' part of the file begins.
    pub index_start_blk: u32,
    /// Block within the 'index', where the B-tree root page is stored
    pub index_root_blk: u32,
    // the 'values' part starts after the summary header, on block 1.
}
      97              : 
      98              : impl From<&ImageLayer> for Summary {
      99            0 :     fn from(layer: &ImageLayer) -> Self {
     100            0 :         Self::expected(
     101            0 :             layer.desc.tenant_shard_id.tenant_id,
     102            0 :             layer.desc.timeline_id,
     103            0 :             layer.desc.key_range.clone(),
     104            0 :             layer.lsn,
     105            0 :         )
     106            0 :     }
     107              : }
     108              : 
     109              : impl Summary {
     110           26 :     pub(super) fn expected(
     111           26 :         tenant_id: TenantId,
     112           26 :         timeline_id: TimelineId,
     113           26 :         key_range: Range<Key>,
     114           26 :         lsn: Lsn,
     115           26 :     ) -> Self {
     116           26 :         Self {
     117           26 :             magic: IMAGE_FILE_MAGIC,
     118           26 :             format_version: STORAGE_FORMAT_VERSION,
     119           26 :             tenant_id,
     120           26 :             timeline_id,
     121           26 :             key_range,
     122           26 :             lsn,
     123           26 : 
     124           26 :             index_start_blk: 0,
     125           26 :             index_root_blk: 0,
     126           26 :         }
     127           26 :     }
     128              : }
     129              : 
/// This is used only from `pagectl`. Within pageserver, all layers are
/// [`crate::tenant::storage_layer::Layer`], which can hold an [`ImageLayerInner`].
pub struct ImageLayer {
    /// Location of the layer file on disk.
    path: Utf8PathBuf,
    /// Descriptor of the layer (tenant, timeline, key range, file size).
    pub desc: PersistentLayerDesc,
    // This entry contains an image of all pages as of this LSN, should be the same as desc.lsn
    pub lsn: Lsn,
    /// Access statistics; an access is recorded each time the layer is loaded.
    access_stats: LayerAccessStats,
    /// Opened file plus index location, initialized on the first successful load.
    inner: OnceCell<ImageLayerInner>,
}
     140              : 
     141              : impl std::fmt::Debug for ImageLayer {
     142            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     143            0 :         use super::RangeDisplayDebug;
     144            0 : 
     145            0 :         f.debug_struct("ImageLayer")
     146            0 :             .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
     147            0 :             .field("file_size", &self.desc.file_size)
     148            0 :             .field("lsn", &self.lsn)
     149            0 :             .field("inner", &self.inner)
     150            0 :             .finish()
     151            0 :     }
     152              : }
     153              : 
/// ImageLayerInner is the in-memory data structure associated with an on-disk
/// image file.
pub struct ImageLayerInner {
    // values copied from summary
    /// Block number where the 'index' part of the file begins.
    index_start_blk: u32,
    /// Block within the 'index' where the B-tree root page is stored.
    index_root_blk: u32,

    /// LSN supplied when the layer was loaded; attached to every image returned.
    lsn: Lsn,

    /// Open handle to the layer file.
    file: VirtualFile,
    /// Id obtained from `page_cache::next_file_id()` at load time; passed to
    /// `FileBlockReader` together with `file`.
    file_id: FileId,

    /// Size limit handed to the vectored read planner. The vectored read
    /// methods panic if this is `None`.
    max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
}
     168              : 
     169              : impl std::fmt::Debug for ImageLayerInner {
     170            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     171            0 :         f.debug_struct("ImageLayerInner")
     172            0 :             .field("index_start_blk", &self.index_start_blk)
     173            0 :             .field("index_root_blk", &self.index_root_blk)
     174            0 :             .finish()
     175            0 :     }
     176              : }
     177              : 
     178              : impl ImageLayerInner {
     179            0 :     pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
     180            0 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
     181            0 :         let tree_reader = DiskBtreeReader::<_, KEY_SIZE>::new(
     182            0 :             self.index_start_blk,
     183            0 :             self.index_root_blk,
     184            0 :             block_reader,
     185            0 :         );
     186            0 : 
     187            0 :         tree_reader.dump().await?;
     188              : 
     189            0 :         tree_reader
     190            0 :             .visit(
     191            0 :                 &[0u8; KEY_SIZE],
     192            0 :                 VisitDirection::Forwards,
     193            0 :                 |key, value| {
     194            0 :                     println!("key: {} offset {}", hex::encode(key), value);
     195            0 :                     true
     196            0 :                 },
     197            0 :                 ctx,
     198            0 :             )
     199            0 :             .await?;
     200              : 
     201            0 :         Ok(())
     202            0 :     }
     203              : }
     204              : 
     205              : /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
     206              : impl std::fmt::Display for ImageLayer {
     207            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     208            0 :         write!(f, "{}", self.layer_desc().short_id())
     209            0 :     }
     210              : }
     211              : 
     212              : impl AsLayerDesc for ImageLayer {
     213            0 :     fn layer_desc(&self) -> &PersistentLayerDesc {
     214            0 :         &self.desc
     215            0 :     }
     216              : }
     217              : 
     218              : impl ImageLayer {
     219            0 :     pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
     220            0 :         self.desc.dump();
     221            0 : 
     222            0 :         if !verbose {
     223            0 :             return Ok(());
     224            0 :         }
     225              : 
     226            0 :         let inner = self.load(LayerAccessKind::Dump, ctx).await?;
     227              : 
     228            0 :         inner.dump(ctx).await?;
     229              : 
     230            0 :         Ok(())
     231            0 :     }
     232              : 
     233          116 :     fn temp_path_for(
     234          116 :         conf: &PageServerConf,
     235          116 :         timeline_id: TimelineId,
     236          116 :         tenant_shard_id: TenantShardId,
     237          116 :         fname: &ImageLayerName,
     238          116 :     ) -> Utf8PathBuf {
     239          116 :         let rand_string: String = rand::thread_rng()
     240          116 :             .sample_iter(&Alphanumeric)
     241          116 :             .take(8)
     242          116 :             .map(char::from)
     243          116 :             .collect();
     244          116 : 
     245          116 :         conf.timeline_path(&tenant_shard_id, &timeline_id)
     246          116 :             .join(format!("{fname}.{rand_string}.{TEMP_FILE_SUFFIX}"))
     247          116 :     }
     248              : 
     249              :     ///
     250              :     /// Open the underlying file and read the metadata into memory, if it's
     251              :     /// not loaded already.
     252              :     ///
     253            0 :     async fn load(
     254            0 :         &self,
     255            0 :         access_kind: LayerAccessKind,
     256            0 :         ctx: &RequestContext,
     257            0 :     ) -> Result<&ImageLayerInner> {
     258            0 :         self.access_stats.record_access(access_kind, ctx);
     259            0 :         self.inner
     260            0 :             .get_or_try_init(|| self.load_inner(ctx))
     261            0 :             .await
     262            0 :             .with_context(|| format!("Failed to load image layer {}", self.path()))
     263            0 :     }
     264              : 
     265            0 :     async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
     266            0 :         let path = self.path();
     267              : 
     268            0 :         let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, None, ctx)
     269            0 :             .await
     270            0 :             .and_then(|res| res)?;
     271              : 
     272              :         // not production code
     273            0 :         let actual_layer_name = LayerName::from_str(path.file_name().unwrap()).unwrap();
     274            0 :         let expected_layer_name = self.layer_desc().layer_name();
     275            0 : 
     276            0 :         if actual_layer_name != expected_layer_name {
     277            0 :             println!("warning: filename does not match what is expected from in-file summary");
     278            0 :             println!("actual: {:?}", actual_layer_name.to_string());
     279            0 :             println!("expected: {:?}", expected_layer_name.to_string());
     280            0 :         }
     281              : 
     282            0 :         Ok(loaded)
     283            0 :     }
     284              : 
     285              :     /// Create an ImageLayer struct representing an existing file on disk.
     286              :     ///
     287              :     /// This variant is only used for debugging purposes, by the 'pagectl' binary.
     288            0 :     pub fn new_for_path(path: &Utf8Path, file: File) -> Result<ImageLayer> {
     289            0 :         let mut summary_buf = vec![0; PAGE_SZ];
     290            0 :         file.read_exact_at(&mut summary_buf, 0)?;
     291            0 :         let summary = Summary::des_prefix(&summary_buf)?;
     292            0 :         let metadata = file
     293            0 :             .metadata()
     294            0 :             .context("get file metadata to determine size")?;
     295              : 
     296              :         // This function is never used for constructing layers in a running pageserver,
     297              :         // so it does not need an accurate TenantShardId.
     298            0 :         let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
     299            0 : 
     300            0 :         Ok(ImageLayer {
     301            0 :             path: path.to_path_buf(),
     302            0 :             desc: PersistentLayerDesc::new_img(
     303            0 :                 tenant_shard_id,
     304            0 :                 summary.timeline_id,
     305            0 :                 summary.key_range,
     306            0 :                 summary.lsn,
     307            0 :                 metadata.len(),
     308            0 :             ), // Now we assume image layer ALWAYS covers the full range. This may change in the future.
     309            0 :             lsn: summary.lsn,
     310            0 :             access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
     311            0 :             inner: OnceCell::new(),
     312            0 :         })
     313            0 :     }
     314              : 
     315            0 :     fn path(&self) -> Utf8PathBuf {
     316            0 :         self.path.clone()
     317            0 :     }
     318              : }
     319              : 
/// Errors returned by `ImageLayer::rewrite_summary`.
#[derive(thiserror::Error, Debug)]
pub enum RewriteSummaryError {
    /// The `magic` field of the summary read from the file is not IMAGE_FILE_MAGIC.
    #[error("magic mismatch")]
    MagicMismatch,
    /// Any other failure (opening, reading, (de)serializing or writing).
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
     327              : 
     328              : impl From<std::io::Error> for RewriteSummaryError {
     329            0 :     fn from(e: std::io::Error) -> Self {
     330            0 :         Self::Other(anyhow::anyhow!(e))
     331            0 :     }
     332              : }
     333              : 
     334              : impl ImageLayer {
     335            0 :     pub async fn rewrite_summary<F>(
     336            0 :         path: &Utf8Path,
     337            0 :         rewrite: F,
     338            0 :         ctx: &RequestContext,
     339            0 :     ) -> Result<(), RewriteSummaryError>
     340            0 :     where
     341            0 :         F: Fn(Summary) -> Summary,
     342            0 :     {
     343            0 :         let mut file = VirtualFile::open_with_options(
     344            0 :             path,
     345            0 :             virtual_file::OpenOptions::new().read(true).write(true),
     346            0 :         )
     347            0 :         .await
     348            0 :         .with_context(|| format!("Failed to open file '{}'", path))?;
     349            0 :         let file_id = page_cache::next_file_id();
     350            0 :         let block_reader = FileBlockReader::new(&file, file_id);
     351            0 :         let summary_blk = block_reader.read_blk(0, ctx).await?;
     352            0 :         let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
     353            0 :         if actual_summary.magic != IMAGE_FILE_MAGIC {
     354            0 :             return Err(RewriteSummaryError::MagicMismatch);
     355            0 :         }
     356            0 : 
     357            0 :         let new_summary = rewrite(actual_summary);
     358            0 : 
     359            0 :         let mut buf = Vec::with_capacity(PAGE_SZ);
     360            0 :         // TODO: could use smallvec here but it's a pain with Slice<T>
     361            0 :         Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
     362            0 :         file.seek(SeekFrom::Start(0)).await?;
     363            0 :         let (_buf, res) = file.write_all(buf, ctx).await;
     364            0 :         res?;
     365            0 :         Ok(())
     366            0 :     }
     367              : }
     368              : 
     369              : impl ImageLayerInner {
     370              :     /// Returns nested result following Result<Result<_, OpErr>, Critical>:
     371              :     /// - inner has the success or transient failure
     372              :     /// - outer has the permanent failure
     373           26 :     pub(super) async fn load(
     374           26 :         path: &Utf8Path,
     375           26 :         lsn: Lsn,
     376           26 :         summary: Option<Summary>,
     377           26 :         max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
     378           26 :         ctx: &RequestContext,
     379           26 :     ) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
     380           26 :         let file = match VirtualFile::open(path).await {
     381           26 :             Ok(file) => file,
     382            0 :             Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
     383              :         };
     384           26 :         let file_id = page_cache::next_file_id();
     385           26 :         let block_reader = FileBlockReader::new(&file, file_id);
     386           26 :         let summary_blk = match block_reader.read_blk(0, ctx).await {
     387           26 :             Ok(blk) => blk,
     388            0 :             Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
     389              :         };
     390              : 
     391              :         // length is the only way how this could fail, so it's not actually likely at all unless
     392              :         // read_blk returns wrong sized block.
     393              :         //
     394              :         // TODO: confirm and make this into assertion
     395           26 :         let actual_summary =
     396           26 :             Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
     397              : 
     398           26 :         if let Some(mut expected_summary) = summary {
     399              :             // production code path
     400           26 :             expected_summary.index_start_blk = actual_summary.index_start_blk;
     401           26 :             expected_summary.index_root_blk = actual_summary.index_root_blk;
     402           26 :             // mask out the timeline_id, but still require the layers to be from the same tenant
     403           26 :             expected_summary.timeline_id = actual_summary.timeline_id;
     404           26 : 
     405           26 :             if actual_summary != expected_summary {
     406            0 :                 bail!(
     407            0 :                     "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
     408            0 :                     actual_summary,
     409            0 :                     expected_summary
     410            0 :                 );
     411           26 :             }
     412            0 :         }
     413              : 
     414           26 :         Ok(Ok(ImageLayerInner {
     415           26 :             index_start_blk: actual_summary.index_start_blk,
     416           26 :             index_root_blk: actual_summary.index_root_blk,
     417           26 :             lsn,
     418           26 :             file,
     419           26 :             file_id,
     420           26 :             max_vectored_read_bytes,
     421           26 :         }))
     422           26 :     }
     423              : 
     424          526 :     pub(super) async fn get_value_reconstruct_data(
     425          526 :         &self,
     426          526 :         key: Key,
     427          526 :         reconstruct_state: &mut ValueReconstructState,
     428          526 :         ctx: &RequestContext,
     429          526 :     ) -> anyhow::Result<ValueReconstructResult> {
     430          526 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
     431          526 :         let tree_reader =
     432          526 :             DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, &block_reader);
     433          526 : 
     434          526 :         let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
     435          526 :         key.write_to_byte_slice(&mut keybuf);
     436          526 :         if let Some(offset) = tree_reader
     437          526 :             .get(
     438          526 :                 &keybuf,
     439          526 :                 &RequestContextBuilder::extend(ctx)
     440          526 :                     .page_content_kind(PageContentKind::ImageLayerBtreeNode)
     441          526 :                     .build(),
     442          526 :             )
     443          299 :             .await?
     444              :         {
     445          522 :             let blob = block_reader
     446          522 :                 .block_cursor()
     447          522 :                 .read_blob(
     448          522 :                     offset,
     449          522 :                     &RequestContextBuilder::extend(ctx)
     450          522 :                         .page_content_kind(PageContentKind::ImageLayerValue)
     451          522 :                         .build(),
     452          522 :                 )
     453          158 :                 .await
     454          522 :                 .with_context(|| format!("failed to read value from offset {}", offset))?;
     455          522 :             let value = Bytes::from(blob);
     456          522 : 
     457          522 :             reconstruct_state.img = Some((self.lsn, value));
     458          522 :             Ok(ValueReconstructResult::Complete)
     459              :         } else {
     460            4 :             Ok(ValueReconstructResult::Missing)
     461              :         }
     462          526 :     }
     463              : 
     464              :     // Look up the keys in the provided keyspace and update
     465              :     // the reconstruct state with whatever is found.
     466            4 :     pub(super) async fn get_values_reconstruct_data(
     467            4 :         &self,
     468            4 :         keyspace: KeySpace,
     469            4 :         reconstruct_state: &mut ValuesReconstructState,
     470            4 :         ctx: &RequestContext,
     471            4 :     ) -> Result<(), GetVectoredError> {
     472            4 :         let reads = self
     473            4 :             .plan_reads(keyspace, ctx)
     474            1 :             .await
     475            4 :             .map_err(GetVectoredError::Other)?;
     476              : 
     477            4 :         self.do_reads_and_update_state(reads, reconstruct_state)
     478            0 :             .await;
     479              : 
     480            4 :         Ok(())
     481            4 :     }
     482              : 
     483            4 :     async fn plan_reads(
     484            4 :         &self,
     485            4 :         keyspace: KeySpace,
     486            4 :         ctx: &RequestContext,
     487            4 :     ) -> anyhow::Result<Vec<VectoredRead>> {
     488            4 :         let mut planner = VectoredReadPlanner::new(
     489            4 :             self.max_vectored_read_bytes
     490            4 :                 .expect("Layer is loaded with max vectored bytes config")
     491            4 :                 .0
     492            4 :                 .into(),
     493            4 :         );
     494            4 : 
     495            4 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
     496            4 :         let tree_reader =
     497            4 :             DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
     498            4 : 
     499            4 :         let ctx = RequestContextBuilder::extend(ctx)
     500            4 :             .page_content_kind(PageContentKind::ImageLayerBtreeNode)
     501            4 :             .build();
     502              : 
     503            4 :         for range in keyspace.ranges.iter() {
     504            4 :             let mut range_end_handled = false;
     505            4 : 
     506            4 :             let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
     507            4 :             range.start.write_to_byte_slice(&mut search_key);
     508            4 : 
     509            4 :             let index_stream = tree_reader.get_stream_from(&search_key, &ctx);
     510            4 :             let mut index_stream = std::pin::pin!(index_stream);
     511              : 
     512            4 :             while let Some(index_entry) = index_stream.next().await {
     513            4 :                 let (raw_key, offset) = index_entry?;
     514              : 
     515            4 :                 let key = Key::from_slice(&raw_key[..KEY_SIZE]);
     516            4 :                 assert!(key >= range.start);
     517              : 
     518            4 :                 if key >= range.end {
     519            4 :                     planner.handle_range_end(offset);
     520            4 :                     range_end_handled = true;
     521            4 :                     break;
     522            0 :                 } else {
     523            0 :                     planner.handle(key, self.lsn, offset, BlobFlag::None);
     524            0 :                 }
     525              :             }
     526              : 
     527            4 :             if !range_end_handled {
     528            0 :                 let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64;
     529            0 :                 planner.handle_range_end(payload_end);
     530            4 :             }
     531              :         }
     532              : 
     533            4 :         Ok(planner.finish())
     534            4 :     }
     535              : 
     536            4 :     async fn do_reads_and_update_state(
     537            4 :         &self,
     538            4 :         reads: Vec<VectoredRead>,
     539            4 :         reconstruct_state: &mut ValuesReconstructState,
     540            4 :     ) {
     541            4 :         let max_vectored_read_bytes = self
     542            4 :             .max_vectored_read_bytes
     543            4 :             .expect("Layer is loaded with max vectored bytes config")
     544            4 :             .0
     545            4 :             .into();
     546            4 : 
     547            4 :         let vectored_blob_reader = VectoredBlobReader::new(&self.file);
     548            4 :         for read in reads.into_iter() {
     549            0 :             let buf_size = read.size();
     550            0 : 
     551            0 :             if buf_size > max_vectored_read_bytes {
     552              :                 // If the read is oversized, it should only contain one key.
     553            0 :                 let offenders = read
     554            0 :                     .blobs_at
     555            0 :                     .as_slice()
     556            0 :                     .iter()
     557            0 :                     .map(|(_, blob_meta)| format!("{}@{}", blob_meta.key, blob_meta.lsn))
     558            0 :                     .join(", ");
     559            0 :                 tracing::warn!(
     560            0 :                     "Oversized vectored read ({} > {}) for keys {}",
     561              :                     buf_size,
     562              :                     max_vectored_read_bytes,
     563              :                     offenders
     564              :                 );
     565            0 :             }
     566              : 
     567            0 :             let buf = BytesMut::with_capacity(buf_size);
     568            0 :             let res = vectored_blob_reader.read_blobs(&read, buf).await;
     569              : 
     570            0 :             match res {
     571            0 :                 Ok(blobs_buf) => {
     572            0 :                     let frozen_buf = blobs_buf.buf.freeze();
     573              : 
     574            0 :                     for meta in blobs_buf.blobs.iter() {
     575            0 :                         let img_buf = frozen_buf.slice(meta.start..meta.end);
     576            0 :                         reconstruct_state.update_key(
     577            0 :                             &meta.meta.key,
     578            0 :                             self.lsn,
     579            0 :                             Value::Image(img_buf),
     580            0 :                         );
     581            0 :                     }
     582              :                 }
     583            0 :                 Err(err) => {
     584            0 :                     let kind = err.kind();
     585            0 :                     for (_, blob_meta) in read.blobs_at.as_slice() {
     586            0 :                         reconstruct_state.on_key_error(
     587            0 :                             blob_meta.key,
     588            0 :                             PageReconstructError::from(anyhow!(
     589            0 :                                 "Failed to read blobs from virtual file {}: {}",
     590            0 :                                 self.file.path,
     591            0 :                                 kind
     592            0 :                             )),
     593            0 :                         );
     594            0 :                     }
     595              :                 }
     596              :             };
     597              :         }
     598            4 :     }
     599              : }
     600              : 
     601              : /// A builder object for constructing a new image layer.
     602              : ///
     603              : /// Usage:
     604              : ///
     605              : /// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
     606              : ///
     607              : /// 2. Write the contents by calling `put_page_image` for every key-value
     608              : ///    pair in the key range.
     609              : ///
     610              : /// 3. Call `finish`.
     611              : ///
     612              : struct ImageLayerWriterInner {
     613              :     conf: &'static PageServerConf,
     614              :     path: Utf8PathBuf,
     615              :     timeline_id: TimelineId,
     616              :     tenant_shard_id: TenantShardId,
     617              :     key_range: Range<Key>,
     618              :     lsn: Lsn,
     619              : 
     620              :     blob_writer: BlobWriter<false>,
     621              :     tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,
     622              : }
     623              : 
     624              : impl ImageLayerWriterInner {
     625              :     ///
     626              :     /// Start building a new image layer.
     627              :     ///
     628          116 :     async fn new(
     629          116 :         conf: &'static PageServerConf,
     630          116 :         timeline_id: TimelineId,
     631          116 :         tenant_shard_id: TenantShardId,
     632          116 :         key_range: &Range<Key>,
     633          116 :         lsn: Lsn,
     634          116 :     ) -> anyhow::Result<Self> {
     635          116 :         // Create the file initially with a temporary filename.
     636          116 :         // We'll atomically rename it to the final name when we're done.
     637          116 :         let path = ImageLayer::temp_path_for(
     638          116 :             conf,
     639          116 :             timeline_id,
     640          116 :             tenant_shard_id,
     641          116 :             &ImageLayerName {
     642          116 :                 key_range: key_range.clone(),
     643          116 :                 lsn,
     644          116 :             },
     645          116 :         );
     646          116 :         info!("new image layer {path}");
     647          116 :         let mut file = {
     648          116 :             VirtualFile::open_with_options(
     649          116 :                 &path,
     650          116 :                 virtual_file::OpenOptions::new()
     651          116 :                     .write(true)
     652          116 :                     .create_new(true),
     653          116 :             )
     654          108 :             .await?
     655              :         };
     656              :         // make room for the header block
     657          116 :         file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
     658          116 :         let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
     659          116 : 
     660          116 :         // Initialize the b-tree index builder
     661          116 :         let block_buf = BlockBuf::new();
     662          116 :         let tree_builder = DiskBtreeBuilder::new(block_buf);
     663          116 : 
     664          116 :         let writer = Self {
     665          116 :             conf,
     666          116 :             path,
     667          116 :             timeline_id,
     668          116 :             tenant_shard_id,
     669          116 :             key_range: key_range.clone(),
     670          116 :             lsn,
     671          116 :             tree: tree_builder,
     672          116 :             blob_writer,
     673          116 :         };
     674          116 : 
     675          116 :         Ok(writer)
     676          116 :     }
     677              : 
     678              :     ///
     679              :     /// Write next value to the file.
     680              :     ///
     681              :     /// The page versions must be appended in blknum order.
     682              :     ///
     683          816 :     async fn put_image(
     684          816 :         &mut self,
     685          816 :         key: Key,
     686          816 :         img: Bytes,
     687          816 :         ctx: &RequestContext,
     688          816 :     ) -> anyhow::Result<()> {
     689          816 :         ensure!(self.key_range.contains(&key));
     690          870 :         let (_img, res) = self.blob_writer.write_blob(img, ctx).await;
     691              :         // TODO: re-use the buffer for `img` further upstack
     692          816 :         let off = res?;
     693              : 
     694          816 :         let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
     695          816 :         key.write_to_byte_slice(&mut keybuf);
     696          816 :         self.tree.append(&keybuf, off)?;
     697              : 
     698          816 :         Ok(())
     699          816 :     }
     700              : 
     701              :     ///
     702              :     /// Finish writing the image layer.
     703              :     ///
     704          116 :     async fn finish(
     705          116 :         self,
     706          116 :         timeline: &Arc<Timeline>,
     707          116 :         ctx: &RequestContext,
     708          116 :     ) -> anyhow::Result<ResidentLayer> {
     709          116 :         let index_start_blk =
     710          116 :             ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
     711          116 : 
     712          116 :         let mut file = self.blob_writer.into_inner();
     713          116 : 
     714          116 :         // Write out the index
     715          116 :         file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
     716            0 :             .await?;
     717          116 :         let (index_root_blk, block_buf) = self.tree.finish()?;
     718          232 :         for buf in block_buf.blocks {
     719          116 :             let (_buf, res) = file.write_all(buf, ctx).await;
     720          116 :             res?;
     721              :         }
     722              : 
     723              :         // Fill in the summary on blk 0
     724          116 :         let summary = Summary {
     725          116 :             magic: IMAGE_FILE_MAGIC,
     726          116 :             format_version: STORAGE_FORMAT_VERSION,
     727          116 :             tenant_id: self.tenant_shard_id.tenant_id,
     728          116 :             timeline_id: self.timeline_id,
     729          116 :             key_range: self.key_range.clone(),
     730          116 :             lsn: self.lsn,
     731          116 :             index_start_blk,
     732          116 :             index_root_blk,
     733          116 :         };
     734          116 : 
     735          116 :         let mut buf = Vec::with_capacity(PAGE_SZ);
     736          116 :         // TODO: could use smallvec here but it's a pain with Slice<T>
     737          116 :         Summary::ser_into(&summary, &mut buf)?;
     738          116 :         file.seek(SeekFrom::Start(0)).await?;
     739          116 :         let (_buf, res) = file.write_all(buf, ctx).await;
     740          116 :         res?;
     741              : 
     742          116 :         let metadata = file
     743          116 :             .metadata()
     744           58 :             .await
     745          116 :             .context("get metadata to determine file size")?;
     746              : 
     747          116 :         let desc = PersistentLayerDesc::new_img(
     748          116 :             self.tenant_shard_id,
     749          116 :             self.timeline_id,
     750          116 :             self.key_range.clone(),
     751          116 :             self.lsn,
     752          116 :             metadata.len(),
     753          116 :         );
     754          116 : 
     755          116 :         // Note: Because we open the file in write-only mode, we cannot
     756          116 :         // reuse the same VirtualFile for reading later. That's why we don't
     757          116 :         // set inner.file here. The first read will have to re-open it.
     758          116 : 
     759          116 :         // fsync the file
     760          116 :         file.sync_all().await?;
     761              : 
     762              :         // FIXME: why not carry the virtualfile here, it supports renaming?
     763          116 :         let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
     764              : 
     765          116 :         trace!("created image layer {}", layer.local_path());
     766              : 
     767          116 :         Ok(layer)
     768          116 :     }
     769              : }
     770              : 
     771              : /// A builder object for constructing a new image layer.
     772              : ///
     773              : /// Usage:
     774              : ///
     775              : /// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
     776              : ///
     777              : /// 2. Write the contents by calling `put_page_image` for every key-value
     778              : ///    pair in the key range.
     779              : ///
     780              : /// 3. Call `finish`.
     781              : ///
     782              : /// # Note
     783              : ///
     784              : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
     785              : /// possible for the writer to drop before `finish` is actually called. So this
     786              : /// could lead to odd temporary files in the directory, exhausting file system.
     787              : /// This structure wraps `ImageLayerWriterInner` and also contains `Drop`
     788              : /// implementation that cleans up the temporary file in failure. It's not
     789              : /// possible to do this directly in `ImageLayerWriterInner` since `finish` moves
     790              : /// out some fields, making it impossible to implement `Drop`.
     791              : ///
     792              : #[must_use]
     793              : pub struct ImageLayerWriter {
     794              :     inner: Option<ImageLayerWriterInner>,
     795              : }
     796              : 
     797              : impl ImageLayerWriter {
     798              :     ///
     799              :     /// Start building a new image layer.
     800              :     ///
     801          116 :     pub async fn new(
     802          116 :         conf: &'static PageServerConf,
     803          116 :         timeline_id: TimelineId,
     804          116 :         tenant_shard_id: TenantShardId,
     805          116 :         key_range: &Range<Key>,
     806          116 :         lsn: Lsn,
     807          116 :     ) -> anyhow::Result<ImageLayerWriter> {
     808          116 :         Ok(Self {
     809          116 :             inner: Some(
     810          116 :                 ImageLayerWriterInner::new(conf, timeline_id, tenant_shard_id, key_range, lsn)
     811          108 :                     .await?,
     812              :             ),
     813              :         })
     814          116 :     }
     815              : 
     816              :     ///
     817              :     /// Write next value to the file.
     818              :     ///
     819              :     /// The page versions must be appended in blknum order.
     820              :     ///
     821          816 :     pub async fn put_image(
     822          816 :         &mut self,
     823          816 :         key: Key,
     824          816 :         img: Bytes,
     825          816 :         ctx: &RequestContext,
     826          816 :     ) -> anyhow::Result<()> {
     827          870 :         self.inner.as_mut().unwrap().put_image(key, img, ctx).await
     828          816 :     }
     829              : 
     830              :     ///
     831              :     /// Finish writing the image layer.
     832              :     ///
     833          116 :     pub(crate) async fn finish(
     834          116 :         mut self,
     835          116 :         timeline: &Arc<Timeline>,
     836          116 :         ctx: &RequestContext,
     837          116 :     ) -> anyhow::Result<super::ResidentLayer> {
     838          232 :         self.inner.take().unwrap().finish(timeline, ctx).await
     839          116 :     }
     840              : }
     841              : 
     842              : impl Drop for ImageLayerWriter {
     843          116 :     fn drop(&mut self) {
     844          116 :         if let Some(inner) = self.inner.take() {
     845            0 :             inner.blob_writer.into_inner().remove();
     846          116 :         }
     847          116 :     }
     848              : }
        

Generated by: LCOV version 2.1-beta