LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - image_layer.rs (source / functions) Coverage Total Hit
Test: 49aa928ec5b4b510172d8b5c6d154da28e70a46c.info Lines: 77.4 % 962 745
Test Date: 2024-11-13 18:23:39 Functions: 54.3 % 92 50

            Line data    Source code
       1              : //! An ImageLayer represents an image or a snapshot of a key-range at
       2              : //! one particular LSN.
       3              : //!
       4              : //! It contains an image of all key-value pairs in its key-range. Any key
       5              : //! that falls into the image layer's range but does not exist in the layer,
       6              : //! does not exist.
       7              : //!
       8              : //! An image layer is stored in a file on disk. The file is stored in
       9              : //! timelines/<timeline_id> directory.  Currently, there are no
      10              : //! subdirectories, and each image layer file is named like this:
      11              : //!
      12              : //! ```text
      13              : //!    <key start>-<key end>__<LSN>
      14              : //! ```
      15              : //!
      16              : //! For example:
      17              : //!
      18              : //! ```text
      19              : //!    000000067F000032BE0000400000000070B6-000000067F000032BE0000400000000080B6__00000000346BC568
      20              : //! ```
      21              : //!
      22              : //! Every image layer file consists of three parts: "summary",
      23              : //! "index", and "values".  The summary is a fixed size header at the
      24              : //! beginning of the file, and it contains basic information about the
      25              : //! layer, and offsets to the other parts. The "index" is a B-tree,
      26              : //! mapping from Key to an offset in the "values" part.  The
      27              : //! actual page images are stored in the "values" part.
      28              : use crate::config::PageServerConf;
      29              : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
      30              : use crate::page_cache::{self, FileId, PAGE_SZ};
      31              : use crate::tenant::blob_io::BlobWriter;
      32              : use crate::tenant::block_io::{BlockBuf, FileBlockReader};
      33              : use crate::tenant::disk_btree::{
      34              :     DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
      35              : };
      36              : use crate::tenant::timeline::GetVectoredError;
      37              : use crate::tenant::vectored_blob_io::{
      38              :     BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
      39              :     VectoredReadPlanner,
      40              : };
      41              : use crate::tenant::PageReconstructError;
      42              : use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
      43              : use crate::virtual_file::IoBufferMut;
      44              : use crate::virtual_file::{self, MaybeFatalIo, VirtualFile};
      45              : use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
      46              : use anyhow::{anyhow, bail, ensure, Context, Result};
      47              : use bytes::Bytes;
      48              : use camino::{Utf8Path, Utf8PathBuf};
      49              : use hex;
      50              : use itertools::Itertools;
      51              : use pageserver_api::config::MaxVectoredReadBytes;
      52              : use pageserver_api::key::DBDIR_KEY;
      53              : use pageserver_api::key::{Key, KEY_SIZE};
      54              : use pageserver_api::keyspace::KeySpace;
      55              : use pageserver_api::shard::{ShardIdentity, TenantShardId};
      56              : use pageserver_api::value::Value;
      57              : use rand::{distributions::Alphanumeric, Rng};
      58              : use serde::{Deserialize, Serialize};
      59              : use std::collections::VecDeque;
      60              : use std::fs::File;
      61              : use std::io::SeekFrom;
      62              : use std::ops::Range;
      63              : use std::os::unix::prelude::FileExt;
      64              : use std::str::FromStr;
      65              : use tokio::sync::OnceCell;
      66              : use tokio_stream::StreamExt;
      67              : use tracing::*;
      68              : 
      69              : use utils::{
      70              :     bin_ser::BeSer,
      71              :     id::{TenantId, TimelineId},
      72              :     lsn::Lsn,
      73              : };
      74              : 
      75              : use super::layer_name::ImageLayerName;
      76              : use super::{AsLayerDesc, LayerName, PersistentLayerDesc, ValuesReconstructState};
      77              : 
      78              : ///
      79              : /// Header stored in the beginning of the file
      80              : ///
      81              : /// After this comes the 'values' part, starting on block 1. After that,
      82              : /// the 'index' starts at the block indicated by 'index_start_blk'
      83              : ///
      84          122 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
      85              : pub struct Summary {
      86              :     /// Magic value to identify this as a neon image file. Always IMAGE_FILE_MAGIC.
      87              :     pub magic: u16,
      88              :     pub format_version: u16,
      89              : 
      90              :     pub tenant_id: TenantId,
      91              :     pub timeline_id: TimelineId,
      92              :     pub key_range: Range<Key>,
      93              :     pub lsn: Lsn,
      94              : 
      95              :     /// Block number where the 'index' part of the file begins.
      96              :     pub index_start_blk: u32,
      97              :     /// Block within the 'index', where the B-tree root page is stored
      98              :     pub index_root_blk: u32,
      99              :     // the 'values' part starts after the summary header, on block 1.
     100              : }
     101              : 
     102              : impl From<&ImageLayer> for Summary {
     103            0 :     fn from(layer: &ImageLayer) -> Self {
     104            0 :         Self::expected(
     105            0 :             layer.desc.tenant_shard_id.tenant_id,
     106            0 :             layer.desc.timeline_id,
     107            0 :             layer.desc.key_range.clone(),
     108            0 :             layer.lsn,
     109            0 :         )
     110            0 :     }
     111              : }
     112              : 
     113              : impl Summary {
     114          122 :     pub(super) fn expected(
     115          122 :         tenant_id: TenantId,
     116          122 :         timeline_id: TimelineId,
     117          122 :         key_range: Range<Key>,
     118          122 :         lsn: Lsn,
     119          122 :     ) -> Self {
     120          122 :         Self {
     121          122 :             magic: IMAGE_FILE_MAGIC,
     122          122 :             format_version: STORAGE_FORMAT_VERSION,
     123          122 :             tenant_id,
     124          122 :             timeline_id,
     125          122 :             key_range,
     126          122 :             lsn,
     127          122 : 
     128          122 :             index_start_blk: 0,
     129          122 :             index_root_blk: 0,
     130          122 :         }
     131          122 :     }
     132              : }
     133              : 
     134              : /// This is used only from `pagectl`. Within pageserver, all layers are
     135              : /// [`crate::tenant::storage_layer::Layer`], which can hold an [`ImageLayerInner`].
     136              : pub struct ImageLayer {
     137              :     path: Utf8PathBuf,
     138              :     pub desc: PersistentLayerDesc,
     139              :     // This entry contains an image of all pages as of this LSN, should be the same as desc.lsn
     140              :     pub lsn: Lsn,
     141              :     inner: OnceCell<ImageLayerInner>,
     142              : }
     143              : 
     144              : impl std::fmt::Debug for ImageLayer {
     145            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     146              :         use super::RangeDisplayDebug;
     147              : 
     148            0 :         f.debug_struct("ImageLayer")
     149            0 :             .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
     150            0 :             .field("file_size", &self.desc.file_size)
     151            0 :             .field("lsn", &self.lsn)
     152            0 :             .field("inner", &self.inner)
     153            0 :             .finish()
     154            0 :     }
     155              : }
     156              : 
     157              : /// ImageLayer is the in-memory data structure associated with an on-disk image
     158              : /// file.
     159              : pub struct ImageLayerInner {
     160              :     // values copied from summary
     161              :     index_start_blk: u32,
     162              :     index_root_blk: u32,
     163              : 
     164              :     key_range: Range<Key>,
     165              :     lsn: Lsn,
     166              : 
     167              :     file: VirtualFile,
     168              :     file_id: FileId,
     169              : 
     170              :     max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
     171              : }
     172              : 
     173              : impl ImageLayerInner {
     174            0 :     pub(crate) fn layer_dbg_info(&self) -> String {
     175            0 :         format!(
     176            0 :             "image {}..{} {}",
     177            0 :             self.key_range().start,
     178            0 :             self.key_range().end,
     179            0 :             self.lsn()
     180            0 :         )
     181            0 :     }
     182              : }
     183              : 
     184              : impl std::fmt::Debug for ImageLayerInner {
     185            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     186            0 :         f.debug_struct("ImageLayerInner")
     187            0 :             .field("index_start_blk", &self.index_start_blk)
     188            0 :             .field("index_root_blk", &self.index_root_blk)
     189            0 :             .finish()
     190            0 :     }
     191              : }
     192              : 
     193              : impl ImageLayerInner {
     194            0 :     pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
     195            0 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
     196            0 :         let tree_reader = DiskBtreeReader::<_, KEY_SIZE>::new(
     197            0 :             self.index_start_blk,
     198            0 :             self.index_root_blk,
     199            0 :             block_reader,
     200            0 :         );
     201            0 : 
     202            0 :         tree_reader.dump().await?;
     203              : 
     204            0 :         tree_reader
     205            0 :             .visit(
     206            0 :                 &[0u8; KEY_SIZE],
     207            0 :                 VisitDirection::Forwards,
     208            0 :                 |key, value| {
     209            0 :                     println!("key: {} offset {}", hex::encode(key), value);
     210            0 :                     true
     211            0 :                 },
     212            0 :                 ctx,
     213            0 :             )
     214            0 :             .await?;
     215              : 
     216            0 :         Ok(())
     217            0 :     }
     218              : }
     219              : 
     220              : /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
     221              : impl std::fmt::Display for ImageLayer {
     222            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     223            0 :         write!(f, "{}", self.layer_desc().short_id())
     224            0 :     }
     225              : }
     226              : 
     227              : impl AsLayerDesc for ImageLayer {
     228            0 :     fn layer_desc(&self) -> &PersistentLayerDesc {
     229            0 :         &self.desc
     230            0 :     }
     231              : }
     232              : 
     233              : impl ImageLayer {
     234            0 :     pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
     235            0 :         self.desc.dump();
     236            0 : 
     237            0 :         if !verbose {
     238            0 :             return Ok(());
     239            0 :         }
     240              : 
     241            0 :         let inner = self.load(ctx).await?;
     242              : 
     243            0 :         inner.dump(ctx).await?;
     244              : 
     245            0 :         Ok(())
     246            0 :     }
     247              : 
     248          514 :     fn temp_path_for(
     249          514 :         conf: &PageServerConf,
     250          514 :         timeline_id: TimelineId,
     251          514 :         tenant_shard_id: TenantShardId,
     252          514 :         fname: &ImageLayerName,
     253          514 :     ) -> Utf8PathBuf {
     254          514 :         let rand_string: String = rand::thread_rng()
     255          514 :             .sample_iter(&Alphanumeric)
     256          514 :             .take(8)
     257          514 :             .map(char::from)
     258          514 :             .collect();
     259          514 : 
     260          514 :         conf.timeline_path(&tenant_shard_id, &timeline_id)
     261          514 :             .join(format!("{fname}.{rand_string}.{TEMP_FILE_SUFFIX}"))
     262          514 :     }
     263              : 
     264              :     ///
     265              :     /// Open the underlying file and read the metadata into memory, if it's
     266              :     /// not loaded already.
     267              :     ///
     268            0 :     async fn load(&self, ctx: &RequestContext) -> Result<&ImageLayerInner> {
     269            0 :         self.inner
     270            0 :             .get_or_try_init(|| self.load_inner(ctx))
     271            0 :             .await
     272            0 :             .with_context(|| format!("Failed to load image layer {}", self.path()))
     273            0 :     }
     274              : 
     275            0 :     async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
     276            0 :         let path = self.path();
     277              : 
     278            0 :         let loaded =
     279            0 :             ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, None, ctx).await?;
     280              : 
     281              :         // not production code
     282            0 :         let actual_layer_name = LayerName::from_str(path.file_name().unwrap()).unwrap();
     283            0 :         let expected_layer_name = self.layer_desc().layer_name();
     284            0 : 
     285            0 :         if actual_layer_name != expected_layer_name {
     286            0 :             println!("warning: filename does not match what is expected from in-file summary");
     287            0 :             println!("actual: {:?}", actual_layer_name.to_string());
     288            0 :             println!("expected: {:?}", expected_layer_name.to_string());
     289            0 :         }
     290              : 
     291            0 :         Ok(loaded)
     292            0 :     }
     293              : 
     294              :     /// Create an ImageLayer struct representing an existing file on disk.
     295              :     ///
     296              :     /// This variant is only used for debugging purposes, by the 'pagectl' binary.
     297            0 :     pub fn new_for_path(path: &Utf8Path, file: File) -> Result<ImageLayer> {
     298            0 :         let mut summary_buf = vec![0; PAGE_SZ];
     299            0 :         file.read_exact_at(&mut summary_buf, 0)?;
     300            0 :         let summary = Summary::des_prefix(&summary_buf)?;
     301            0 :         let metadata = file
     302            0 :             .metadata()
     303            0 :             .context("get file metadata to determine size")?;
     304              : 
     305              :         // This function is never used for constructing layers in a running pageserver,
     306              :         // so it does not need an accurate TenantShardId.
     307            0 :         let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
     308            0 : 
     309            0 :         Ok(ImageLayer {
     310            0 :             path: path.to_path_buf(),
     311            0 :             desc: PersistentLayerDesc::new_img(
     312            0 :                 tenant_shard_id,
     313            0 :                 summary.timeline_id,
     314            0 :                 summary.key_range,
     315            0 :                 summary.lsn,
     316            0 :                 metadata.len(),
     317            0 :             ), // Now we assume image layer ALWAYS covers the full range. This may change in the future.
     318            0 :             lsn: summary.lsn,
     319            0 :             inner: OnceCell::new(),
     320            0 :         })
     321            0 :     }
     322              : 
     323            0 :     fn path(&self) -> Utf8PathBuf {
     324            0 :         self.path.clone()
     325            0 :     }
     326              : }
     327              : 
     328            0 : #[derive(thiserror::Error, Debug)]
     329              : pub enum RewriteSummaryError {
     330              :     #[error("magic mismatch")]
     331              :     MagicMismatch,
     332              :     #[error(transparent)]
     333              :     Other(#[from] anyhow::Error),
     334              : }
     335              : 
     336              : impl From<std::io::Error> for RewriteSummaryError {
     337            0 :     fn from(e: std::io::Error) -> Self {
     338            0 :         Self::Other(anyhow::anyhow!(e))
     339            0 :     }
     340              : }
     341              : 
     342              : impl ImageLayer {
     343            0 :     pub async fn rewrite_summary<F>(
     344            0 :         path: &Utf8Path,
     345            0 :         rewrite: F,
     346            0 :         ctx: &RequestContext,
     347            0 :     ) -> Result<(), RewriteSummaryError>
     348            0 :     where
     349            0 :         F: Fn(Summary) -> Summary,
     350            0 :     {
     351            0 :         let mut file = VirtualFile::open_with_options(
     352            0 :             path,
     353            0 :             virtual_file::OpenOptions::new().read(true).write(true),
     354            0 :             ctx,
     355            0 :         )
     356            0 :         .await
     357            0 :         .with_context(|| format!("Failed to open file '{}'", path))?;
     358            0 :         let file_id = page_cache::next_file_id();
     359            0 :         let block_reader = FileBlockReader::new(&file, file_id);
     360            0 :         let summary_blk = block_reader.read_blk(0, ctx).await?;
     361            0 :         let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
     362            0 :         if actual_summary.magic != IMAGE_FILE_MAGIC {
     363            0 :             return Err(RewriteSummaryError::MagicMismatch);
     364            0 :         }
     365            0 : 
     366            0 :         let new_summary = rewrite(actual_summary);
     367            0 : 
     368            0 :         let mut buf = Vec::with_capacity(PAGE_SZ);
     369            0 :         // TODO: could use smallvec here but it's a pain with Slice<T>
     370            0 :         Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
     371            0 :         file.seek(SeekFrom::Start(0)).await?;
     372            0 :         let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
     373            0 :         res?;
     374            0 :         Ok(())
     375            0 :     }
     376              : }
     377              : 
     378              : impl ImageLayerInner {
     379          116 :     pub(crate) fn key_range(&self) -> &Range<Key> {
     380          116 :         &self.key_range
     381          116 :     }
     382              : 
     383          116 :     pub(crate) fn lsn(&self) -> Lsn {
     384          116 :         self.lsn
     385          116 :     }
     386              : 
     387          122 :     pub(super) async fn load(
     388          122 :         path: &Utf8Path,
     389          122 :         lsn: Lsn,
     390          122 :         summary: Option<Summary>,
     391          122 :         max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
     392          122 :         ctx: &RequestContext,
     393          122 :     ) -> anyhow::Result<Self> {
     394          122 :         let file = VirtualFile::open_v2(path, ctx)
     395           61 :             .await
     396          122 :             .context("open layer file")?;
     397          122 :         let file_id = page_cache::next_file_id();
     398          122 :         let block_reader = FileBlockReader::new(&file, file_id);
     399          122 :         let summary_blk = block_reader
     400          122 :             .read_blk(0, ctx)
     401           62 :             .await
     402          122 :             .context("read first block")?;
     403              : 
     404              :         // length is the only way how this could fail, so it's not actually likely at all unless
     405              :         // read_blk returns wrong sized block.
     406              :         //
     407              :         // TODO: confirm and make this into assertion
     408          122 :         let actual_summary =
     409          122 :             Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
     410              : 
     411          122 :         if let Some(mut expected_summary) = summary {
     412              :             // production code path
     413          122 :             expected_summary.index_start_blk = actual_summary.index_start_blk;
     414          122 :             expected_summary.index_root_blk = actual_summary.index_root_blk;
     415          122 :             // mask out the timeline_id, but still require the layers to be from the same tenant
     416          122 :             expected_summary.timeline_id = actual_summary.timeline_id;
     417          122 : 
     418          122 :             if actual_summary != expected_summary {
     419            0 :                 bail!(
     420            0 :                     "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
     421            0 :                     actual_summary,
     422            0 :                     expected_summary
     423            0 :                 );
     424          122 :             }
     425            0 :         }
     426              : 
     427          122 :         Ok(ImageLayerInner {
     428          122 :             index_start_blk: actual_summary.index_start_blk,
     429          122 :             index_root_blk: actual_summary.index_root_blk,
     430          122 :             lsn,
     431          122 :             file,
     432          122 :             file_id,
     433          122 :             max_vectored_read_bytes,
     434          122 :             key_range: actual_summary.key_range,
     435          122 :         })
     436          122 :     }
     437              : 
     438              :     // Look up the keys in the provided keyspace and update
     439              :     // the reconstruct state with whatever is found.
     440        21820 :     pub(super) async fn get_values_reconstruct_data(
     441        21820 :         &self,
     442        21820 :         keyspace: KeySpace,
     443        21820 :         reconstruct_state: &mut ValuesReconstructState,
     444        21820 :         ctx: &RequestContext,
     445        21820 :     ) -> Result<(), GetVectoredError> {
     446        21820 :         let reads = self
     447        21820 :             .plan_reads(keyspace, None, ctx)
     448          487 :             .await
     449        21820 :             .map_err(GetVectoredError::Other)?;
     450              : 
     451        21820 :         self.do_reads_and_update_state(reads, reconstruct_state, ctx)
     452        11097 :             .await;
     453              : 
     454        21820 :         reconstruct_state.on_image_layer_visited(&self.key_range);
     455        21820 : 
     456        21820 :         Ok(())
     457        21820 :     }
     458              : 
     459              :     /// Traverse the layer's index to build read operations on the overlap of the input keyspace
     460              :     /// and the keys in this layer.
     461              :     ///
     462              :     /// If shard_identity is provided, it will be used to filter keys down to those stored on
     463              :     /// this shard.
     464        21828 :     async fn plan_reads(
     465        21828 :         &self,
     466        21828 :         keyspace: KeySpace,
     467        21828 :         shard_identity: Option<&ShardIdentity>,
     468        21828 :         ctx: &RequestContext,
     469        21828 :     ) -> anyhow::Result<Vec<VectoredRead>> {
     470        21828 :         let mut planner = VectoredReadPlanner::new(
     471        21828 :             self.max_vectored_read_bytes
     472        21828 :                 .expect("Layer is loaded with max vectored bytes config")
     473        21828 :                 .0
     474        21828 :                 .into(),
     475        21828 :         );
     476        21828 : 
     477        21828 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
     478        21828 :         let tree_reader =
     479        21828 :             DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
     480        21828 : 
     481        21828 :         let ctx = RequestContextBuilder::extend(ctx)
     482        21828 :             .page_content_kind(PageContentKind::ImageLayerBtreeNode)
     483        21828 :             .build();
     484              : 
     485        21836 :         for range in keyspace.ranges.iter() {
     486        21836 :             let mut range_end_handled = false;
     487        21836 :             let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
     488        21836 :             range.start.write_to_byte_slice(&mut search_key);
     489        21836 : 
     490        21836 :             let index_stream = tree_reader.clone().into_stream(&search_key, &ctx);
     491        21836 :             let mut index_stream = std::pin::pin!(index_stream);
     492              : 
     493      1124260 :             while let Some(index_entry) = index_stream.next().await {
     494      1124032 :                 let (raw_key, offset) = index_entry?;
     495              : 
     496      1124032 :                 let key = Key::from_slice(&raw_key[..KEY_SIZE]);
     497      1124032 :                 assert!(key >= range.start);
     498              : 
     499      1124032 :                 let flag = if let Some(shard_identity) = shard_identity {
     500      1048576 :                     if shard_identity.is_key_disposable(&key) {
     501       786432 :                         BlobFlag::Ignore
     502              :                     } else {
     503       262144 :                         BlobFlag::None
     504              :                     }
     505              :                 } else {
     506        75456 :                     BlobFlag::None
     507              :                 };
     508              : 
     509      1124032 :                 if key >= range.end {
     510        21608 :                     planner.handle_range_end(offset);
     511        21608 :                     range_end_handled = true;
     512        21608 :                     break;
     513      1102424 :                 } else {
     514      1102424 :                     planner.handle(key, self.lsn, offset, flag);
     515      1102424 :                 }
     516              :             }
     517              : 
     518        21836 :             if !range_end_handled {
     519          228 :                 let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64;
     520          228 :                 planner.handle_range_end(payload_end);
     521        21608 :             }
     522              :         }
     523              : 
     524        21828 :         Ok(planner.finish())
     525        21828 :     }
     526              : 
     527              :     /// Given a key range, select the parts of that range that should be retained by the ShardIdentity,
     528              :     /// then execute vectored GET operations, passing the results of all read keys into the writer.
     529            8 :     pub(super) async fn filter(
     530            8 :         &self,
     531            8 :         shard_identity: &ShardIdentity,
     532            8 :         writer: &mut ImageLayerWriter,
     533            8 :         ctx: &RequestContext,
     534            8 :     ) -> anyhow::Result<usize> {
     535              :         // Fragment the range into the regions owned by this ShardIdentity
     536            8 :         let plan = self
     537            8 :             .plan_reads(
     538            8 :                 KeySpace {
     539            8 :                     // If asked for the total key space, plan_reads will give us all the keys in the layer
     540            8 :                     ranges: vec![Key::MIN..Key::MAX],
     541            8 :                 },
     542            8 :                 Some(shard_identity),
     543            8 :                 ctx,
     544            8 :             )
     545          469 :             .await?;
     546              : 
     547            8 :         let vectored_blob_reader = VectoredBlobReader::new(&self.file);
     548            8 :         let mut key_count = 0;
     549           16 :         for read in plan.into_iter() {
     550           16 :             let buf_size = read.size();
     551           16 : 
     552           16 :             let buf = IoBufferMut::with_capacity(buf_size);
     553           16 :             let blobs_buf = vectored_blob_reader.read_blobs(&read, buf, ctx).await?;
     554              : 
     555           16 :             let view = BufView::new_slice(&blobs_buf.buf);
     556              : 
     557       262144 :             for meta in blobs_buf.blobs.iter() {
     558       262144 :                 let img_buf = meta.read(&view).await?;
     559              : 
     560       262144 :                 key_count += 1;
     561       262144 :                 writer
     562       262144 :                     .put_image(meta.meta.key, img_buf.into_bytes(), ctx)
     563       266240 :                     .await
     564       262144 :                     .context(format!("Storing key {}", meta.meta.key))?;
     565              :             }
     566              :         }
     567              : 
     568            8 :         Ok(key_count)
     569            8 :     }
     570              : 
     571        21820 :     async fn do_reads_and_update_state(
     572        21820 :         &self,
     573        21820 :         reads: Vec<VectoredRead>,
     574        21820 :         reconstruct_state: &mut ValuesReconstructState,
     575        21820 :         ctx: &RequestContext,
     576        21820 :     ) {
     577        21820 :         let max_vectored_read_bytes = self
     578        21820 :             .max_vectored_read_bytes
     579        21820 :             .expect("Layer is loaded with max vectored bytes config")
     580        21820 :             .0
     581        21820 :             .into();
     582        21820 : 
     583        21820 :         let vectored_blob_reader = VectoredBlobReader::new(&self.file);
     584        21820 :         for read in reads.into_iter() {
     585        21796 :             let buf_size = read.size();
     586        21796 : 
     587        21796 :             if buf_size > max_vectored_read_bytes {
     588              :                 // If the read is oversized, it should only contain one key.
     589            0 :                 let offenders = read
     590            0 :                     .blobs_at
     591            0 :                     .as_slice()
     592            0 :                     .iter()
     593            0 :                     .filter_map(|(_, blob_meta)| {
     594            0 :                         if blob_meta.key.is_rel_dir_key() || blob_meta.key == DBDIR_KEY {
     595              :                             // The size of values for these keys is unbounded and can
     596              :                             // grow very large in pathological cases.
     597            0 :                             None
     598              :                         } else {
     599            0 :                             Some(format!("{}@{}", blob_meta.key, blob_meta.lsn))
     600              :                         }
     601            0 :                     })
     602            0 :                     .join(", ");
     603            0 : 
     604            0 :                 if !offenders.is_empty() {
     605            0 :                     tracing::warn!(
     606            0 :                         "Oversized vectored read ({} > {}) for keys {}",
     607              :                         buf_size,
     608              :                         max_vectored_read_bytes,
     609              :                         offenders
     610              :                     );
     611            0 :                 }
     612        21796 :             }
     613              : 
     614        21796 :             let buf = IoBufferMut::with_capacity(buf_size);
     615        21796 :             let res = vectored_blob_reader.read_blobs(&read, buf, ctx).await;
     616              : 
     617        21796 :             match res {
     618        21796 :                 Ok(blobs_buf) => {
     619        21796 :                     let view = BufView::new_slice(&blobs_buf.buf);
     620        53848 :                     for meta in blobs_buf.blobs.iter() {
     621        53848 :                         let img_buf = meta.read(&view).await;
     622              : 
     623        53848 :                         let img_buf = match img_buf {
     624        53848 :                             Ok(img_buf) => img_buf,
     625            0 :                             Err(e) => {
     626            0 :                                 reconstruct_state.on_key_error(
     627            0 :                                     meta.meta.key,
     628            0 :                                     PageReconstructError::Other(anyhow!(e).context(format!(
     629            0 :                                         "Failed to decompress blob from virtual file {}",
     630            0 :                                         self.file.path(),
     631            0 :                                     ))),
     632            0 :                                 );
     633            0 : 
     634            0 :                                 continue;
     635              :                             }
     636              :                         };
     637        53848 :                         reconstruct_state.update_key(
     638        53848 :                             &meta.meta.key,
     639        53848 :                             self.lsn,
     640        53848 :                             Value::Image(img_buf.into_bytes()),
     641        53848 :                         );
     642              :                     }
     643              :                 }
     644            0 :                 Err(err) => {
     645            0 :                     let kind = err.kind();
     646            0 :                     for (_, blob_meta) in read.blobs_at.as_slice() {
     647            0 :                         reconstruct_state.on_key_error(
     648            0 :                             blob_meta.key,
     649            0 :                             PageReconstructError::from(anyhow!(
     650            0 :                                 "Failed to read blobs from virtual file {}: {}",
     651            0 :                                 self.file.path(),
     652            0 :                                 kind
     653            0 :                             )),
     654            0 :                         );
     655            0 :                     }
     656              :                 }
     657              :             };
     658              :         }
     659        21820 :     }
     660              : 
     661          114 :     pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> ImageLayerIterator<'a> {
     662          114 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
     663          114 :         let tree_reader =
     664          114 :             DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
     665          114 :         ImageLayerIterator {
     666          114 :             image_layer: self,
     667          114 :             ctx,
     668          114 :             index_iter: tree_reader.iter(&[0; KEY_SIZE], ctx),
     669          114 :             key_values_batch: VecDeque::new(),
     670          114 :             is_end: false,
     671          114 :             planner: StreamingVectoredReadPlanner::new(
     672          114 :                 1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
     673          114 :                 1024,        // The default value. Unit tests might use a different value
     674          114 :             ),
     675          114 :         }
     676          114 :     }
     677              : 
     678              :     /// NB: not super efficient, but not terrible either. Should prob be an iterator.
     679              :     //
     680              :     // We're reusing the index traversal logical in plan_reads; would be nice to
     681              :     // factor that out.
     682            0 :     pub(crate) async fn load_keys(&self, ctx: &RequestContext) -> anyhow::Result<Vec<Key>> {
     683            0 :         let plan = self
     684            0 :             .plan_reads(KeySpace::single(self.key_range.clone()), None, ctx)
     685            0 :             .await?;
     686            0 :         Ok(plan
     687            0 :             .into_iter()
     688            0 :             .flat_map(|read| read.blobs_at)
     689            0 :             .map(|(_, blob_meta)| blob_meta.key)
     690            0 :             .collect())
     691            0 :     }
     692              : }
     693              : 
     694              : /// A builder object for constructing a new image layer.
     695              : ///
     696              : /// Usage:
     697              : ///
     698              : /// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
     699              : ///
     700              : /// 2. Write the contents by calling `put_page_image` for every key-value
     701              : ///    pair in the key range.
     702              : ///
     703              : /// 3. Call `finish`.
     704              : ///
     705              : struct ImageLayerWriterInner {
     706              :     conf: &'static PageServerConf,
     707              :     path: Utf8PathBuf,
     708              :     timeline_id: TimelineId,
     709              :     tenant_shard_id: TenantShardId,
     710              :     key_range: Range<Key>,
     711              :     lsn: Lsn,
     712              : 
     713              :     // Total uncompressed bytes passed into put_image
     714              :     uncompressed_bytes: u64,
     715              : 
     716              :     // Like `uncompressed_bytes`,
     717              :     // but only of images we might consider for compression
     718              :     uncompressed_bytes_eligible: u64,
     719              : 
     720              :     // Like `uncompressed_bytes`, but only of images
     721              :     // where we have chosen their compressed form
     722              :     uncompressed_bytes_chosen: u64,
     723              : 
     724              :     // Number of keys in the layer.
     725              :     num_keys: usize,
     726              : 
     727              :     blob_writer: BlobWriter<false>,
     728              :     tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,
     729              : 
     730              :     #[cfg(feature = "testing")]
     731              :     last_written_key: Key,
     732              : }
     733              : 
     734              : impl ImageLayerWriterInner {
     735              :     ///
     736              :     /// Start building a new image layer.
     737              :     ///
     738          514 :     async fn new(
     739          514 :         conf: &'static PageServerConf,
     740          514 :         timeline_id: TimelineId,
     741          514 :         tenant_shard_id: TenantShardId,
     742          514 :         key_range: &Range<Key>,
     743          514 :         lsn: Lsn,
     744          514 :         ctx: &RequestContext,
     745          514 :     ) -> anyhow::Result<Self> {
     746          514 :         // Create the file initially with a temporary filename.
     747          514 :         // We'll atomically rename it to the final name when we're done.
     748          514 :         let path = ImageLayer::temp_path_for(
     749          514 :             conf,
     750          514 :             timeline_id,
     751          514 :             tenant_shard_id,
     752          514 :             &ImageLayerName {
     753          514 :                 key_range: key_range.clone(),
     754          514 :                 lsn,
     755          514 :             },
     756          514 :         );
     757          514 :         trace!("creating image layer {}", path);
     758          514 :         let mut file = {
     759          514 :             VirtualFile::open_with_options(
     760          514 :                 &path,
     761          514 :                 virtual_file::OpenOptions::new()
     762          514 :                     .write(true)
     763          514 :                     .create_new(true),
     764          514 :                 ctx,
     765          514 :             )
     766          340 :             .await?
     767              :         };
     768              :         // make room for the header block
     769          514 :         file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
     770          514 :         let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
     771          514 : 
     772          514 :         // Initialize the b-tree index builder
     773          514 :         let block_buf = BlockBuf::new();
     774          514 :         let tree_builder = DiskBtreeBuilder::new(block_buf);
     775          514 : 
     776          514 :         let writer = Self {
     777          514 :             conf,
     778          514 :             path,
     779          514 :             timeline_id,
     780          514 :             tenant_shard_id,
     781          514 :             key_range: key_range.clone(),
     782          514 :             lsn,
     783          514 :             tree: tree_builder,
     784          514 :             blob_writer,
     785          514 :             uncompressed_bytes: 0,
     786          514 :             uncompressed_bytes_eligible: 0,
     787          514 :             uncompressed_bytes_chosen: 0,
     788          514 :             num_keys: 0,
     789          514 :             #[cfg(feature = "testing")]
     790          514 :             last_written_key: Key::MIN,
     791          514 :         };
     792          514 : 
     793          514 :         Ok(writer)
     794          514 :     }
     795              : 
     796              :     ///
     797              :     /// Write next value to the file.
     798              :     ///
     799              :     /// The page versions must be appended in blknum order.
     800              :     ///
     801       546308 :     async fn put_image(
     802       546308 :         &mut self,
     803       546308 :         key: Key,
     804       546308 :         img: Bytes,
     805       546308 :         ctx: &RequestContext,
     806       546308 :     ) -> anyhow::Result<()> {
     807       546308 :         ensure!(self.key_range.contains(&key));
     808       546308 :         let compression = self.conf.image_compression;
     809       546308 :         let uncompressed_len = img.len() as u64;
     810       546308 :         self.uncompressed_bytes += uncompressed_len;
     811       546308 :         self.num_keys += 1;
     812       546308 :         let (_img, res) = self
     813       546308 :             .blob_writer
     814       546308 :             .write_blob_maybe_compressed(img.slice_len(), ctx, compression)
     815       554961 :             .await;
     816              :         // TODO: re-use the buffer for `img` further upstack
     817       546308 :         let (off, compression_info) = res?;
     818       546308 :         if compression_info.compressed_size.is_some() {
     819         8002 :             // The image has been considered for compression at least
     820         8002 :             self.uncompressed_bytes_eligible += uncompressed_len;
     821       538306 :         }
     822       546308 :         if compression_info.written_compressed {
     823            0 :             // The image has been compressed
     824            0 :             self.uncompressed_bytes_chosen += uncompressed_len;
     825       546308 :         }
     826              : 
     827       546308 :         let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
     828       546308 :         key.write_to_byte_slice(&mut keybuf);
     829       546308 :         self.tree.append(&keybuf, off)?;
     830              : 
     831              :         #[cfg(feature = "testing")]
     832       546308 :         {
     833       546308 :             self.last_written_key = key;
     834       546308 :         }
     835       546308 : 
     836       546308 :         Ok(())
     837       546308 :     }
     838              : 
     839              :     ///
     840              :     /// Finish writing the image layer.
     841              :     ///
     842          314 :     async fn finish(
     843          314 :         self,
     844          314 :         ctx: &RequestContext,
     845          314 :         end_key: Option<Key>,
     846          314 :     ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
     847          314 :         let temp_path = self.path.clone();
     848          866 :         let result = self.finish0(ctx, end_key).await;
     849          314 :         if let Err(ref e) = result {
     850            0 :             tracing::info!(%temp_path, "cleaning up temporary file after error during writing: {e}");
     851            0 :             if let Err(e) = std::fs::remove_file(&temp_path) {
     852            0 :                 tracing::warn!(error=%e, %temp_path, "error cleaning up temporary layer file after error during writing");
     853            0 :             }
     854          314 :         }
     855          314 :         result
     856          314 :     }
     857              : 
     858              :     ///
     859              :     /// Finish writing the image layer.
     860              :     ///
     861          314 :     async fn finish0(
     862          314 :         self,
     863          314 :         ctx: &RequestContext,
     864          314 :         end_key: Option<Key>,
     865          314 :     ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
     866          314 :         let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
     867          314 : 
     868          314 :         // Calculate compression ratio
     869          314 :         let compressed_size = self.blob_writer.size() - PAGE_SZ as u64; // Subtract PAGE_SZ for header
     870          314 :         crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES.inc_by(self.uncompressed_bytes);
     871          314 :         crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CONSIDERED
     872          314 :             .inc_by(self.uncompressed_bytes_eligible);
     873          314 :         crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CHOSEN.inc_by(self.uncompressed_bytes_chosen);
     874          314 :         crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size);
     875          314 : 
     876          314 :         let mut file = self.blob_writer.into_inner();
     877          314 : 
     878          314 :         // Write out the index
     879          314 :         file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
     880            0 :             .await?;
     881          314 :         let (index_root_blk, block_buf) = self.tree.finish()?;
     882         1084 :         for buf in block_buf.blocks {
     883          770 :             let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
     884          770 :             res?;
     885              :         }
     886              : 
     887          314 :         let final_key_range = if let Some(end_key) = end_key {
     888           40 :             self.key_range.start..end_key
     889              :         } else {
     890          274 :             self.key_range.clone()
     891              :         };
     892              : 
     893              :         // Fill in the summary on blk 0
     894          314 :         let summary = Summary {
     895          314 :             magic: IMAGE_FILE_MAGIC,
     896          314 :             format_version: STORAGE_FORMAT_VERSION,
     897          314 :             tenant_id: self.tenant_shard_id.tenant_id,
     898          314 :             timeline_id: self.timeline_id,
     899          314 :             key_range: final_key_range.clone(),
     900          314 :             lsn: self.lsn,
     901          314 :             index_start_blk,
     902          314 :             index_root_blk,
     903          314 :         };
     904          314 : 
     905          314 :         let mut buf = Vec::with_capacity(PAGE_SZ);
     906          314 :         // TODO: could use smallvec here but it's a pain with Slice<T>
     907          314 :         Summary::ser_into(&summary, &mut buf)?;
     908          314 :         file.seek(SeekFrom::Start(0)).await?;
     909          314 :         let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
     910          314 :         res?;
     911              : 
     912          314 :         let metadata = file
     913          314 :             .metadata()
     914          159 :             .await
     915          314 :             .context("get metadata to determine file size")?;
     916              : 
     917          314 :         let desc = PersistentLayerDesc::new_img(
     918          314 :             self.tenant_shard_id,
     919          314 :             self.timeline_id,
     920          314 :             final_key_range,
     921          314 :             self.lsn,
     922          314 :             metadata.len(),
     923          314 :         );
     924              : 
     925              :         #[cfg(feature = "testing")]
     926          314 :         if let Some(end_key) = end_key {
     927           40 :             assert!(
     928           40 :                 self.last_written_key < end_key,
     929            0 :                 "written key violates end_key range"
     930              :             );
     931          274 :         }
     932              : 
     933              :         // Note: Because we open the file in write-only mode, we cannot
     934              :         // reuse the same VirtualFile for reading later. That's why we don't
     935              :         // set inner.file here. The first read will have to re-open it.
     936              : 
     937              :         // fsync the file
     938          314 :         file.sync_all()
     939          158 :             .await
     940          314 :             .maybe_fatal_err("image_layer sync_all")?;
     941              : 
     942          314 :         trace!("created image layer {}", self.path);
     943              : 
     944          314 :         Ok((desc, self.path))
     945          314 :     }
     946              : }
     947              : 
     948              : /// A builder object for constructing a new image layer.
     949              : ///
     950              : /// Usage:
     951              : ///
     952              : /// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
     953              : ///
     954              : /// 2. Write the contents by calling `put_page_image` for every key-value
     955              : ///    pair in the key range.
     956              : ///
     957              : /// 3. Call `finish`.
     958              : ///
     959              : /// # Note
     960              : ///
     961              : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
     962              : /// possible for the writer to drop before `finish` is actually called. So this
     963              : /// could lead to odd temporary files in the directory, exhausting file system.
     964              : /// This structure wraps `ImageLayerWriterInner` and also contains `Drop`
     965              : /// implementation that cleans up the temporary file in failure. It's not
     966              : /// possible to do this directly in `ImageLayerWriterInner` since `finish` moves
     967              : /// out some fields, making it impossible to implement `Drop`.
     968              : ///
     969              : #[must_use]
     970              : pub struct ImageLayerWriter {
     971              :     inner: Option<ImageLayerWriterInner>,
     972              : }
     973              : 
     974              : impl ImageLayerWriter {
     975              :     ///
     976              :     /// Start building a new image layer.
     977              :     ///
     978          514 :     pub async fn new(
     979          514 :         conf: &'static PageServerConf,
     980          514 :         timeline_id: TimelineId,
     981          514 :         tenant_shard_id: TenantShardId,
     982          514 :         key_range: &Range<Key>,
     983          514 :         lsn: Lsn,
     984          514 :         ctx: &RequestContext,
     985          514 :     ) -> anyhow::Result<ImageLayerWriter> {
     986          514 :         Ok(Self {
     987          514 :             inner: Some(
     988          514 :                 ImageLayerWriterInner::new(conf, timeline_id, tenant_shard_id, key_range, lsn, ctx)
     989          340 :                     .await?,
     990              :             ),
     991              :         })
     992          514 :     }
     993              : 
     994              :     ///
     995              :     /// Write next value to the file.
     996              :     ///
     997              :     /// The page versions must be appended in blknum order.
     998              :     ///
     999       546308 :     pub async fn put_image(
    1000       546308 :         &mut self,
    1001       546308 :         key: Key,
    1002       546308 :         img: Bytes,
    1003       546308 :         ctx: &RequestContext,
    1004       546308 :     ) -> anyhow::Result<()> {
    1005       554961 :         self.inner.as_mut().unwrap().put_image(key, img, ctx).await
    1006       546308 :     }
    1007              : 
    1008              :     /// Estimated size of the image layer.
    1009         8476 :     pub(crate) fn estimated_size(&self) -> u64 {
    1010         8476 :         let inner = self.inner.as_ref().unwrap();
    1011         8476 :         inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64
    1012         8476 :     }
    1013              : 
    1014         8564 :     pub(crate) fn num_keys(&self) -> usize {
    1015         8564 :         self.inner.as_ref().unwrap().num_keys
    1016         8564 :     }
    1017              : 
    1018              :     ///
    1019              :     /// Finish writing the image layer.
    1020              :     ///
    1021          274 :     pub(crate) async fn finish(
    1022          274 :         mut self,
    1023          274 :         ctx: &RequestContext,
    1024          274 :     ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
    1025          786 :         self.inner.take().unwrap().finish(ctx, None).await
    1026          274 :     }
    1027              : 
    1028              :     /// Finish writing the image layer with an end key, used in [`super::batch_split_writer::SplitImageLayerWriter`]. The end key determines the end of the image layer's covered range and is exclusive.
    1029           40 :     pub(super) async fn finish_with_end_key(
    1030           40 :         mut self,
    1031           40 :         end_key: Key,
    1032           40 :         ctx: &RequestContext,
    1033           40 :     ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
    1034           80 :         self.inner.take().unwrap().finish(ctx, Some(end_key)).await
    1035           40 :     }
    1036              : }
    1037              : 
    1038              : impl Drop for ImageLayerWriter {
    1039          514 :     fn drop(&mut self) {
    1040          514 :         if let Some(inner) = self.inner.take() {
    1041          200 :             inner.blob_writer.into_inner().remove();
    1042          314 :         }
    1043          514 :     }
    1044              : }
    1045              : 
    1046              : pub struct ImageLayerIterator<'a> {
    1047              :     image_layer: &'a ImageLayerInner,
    1048              :     ctx: &'a RequestContext,
    1049              :     planner: StreamingVectoredReadPlanner,
    1050              :     index_iter: DiskBtreeIterator<'a>,
    1051              :     key_values_batch: VecDeque<(Key, Lsn, Value)>,
    1052              :     is_end: bool,
    1053              : }
    1054              : 
    1055              : impl<'a> ImageLayerIterator<'a> {
    1056            0 :     pub(crate) fn layer_dbg_info(&self) -> String {
    1057            0 :         self.image_layer.layer_dbg_info()
    1058            0 :     }
    1059              : 
    1060              :     /// Retrieve a batch of key-value pairs into the iterator buffer.
    1061        19118 :     async fn next_batch(&mut self) -> anyhow::Result<()> {
    1062        19118 :         assert!(self.key_values_batch.is_empty());
    1063        19118 :         assert!(!self.is_end);
    1064              : 
    1065        19118 :         let plan = loop {
    1066        28960 :             if let Some(res) = self.index_iter.next().await {
    1067        28874 :                 let (raw_key, offset) = res?;
    1068        28874 :                 if let Some(batch_plan) = self.planner.handle(
    1069        28874 :                     Key::from_slice(&raw_key[..KEY_SIZE]),
    1070        28874 :                     self.image_layer.lsn,
    1071        28874 :                     offset,
    1072        28874 :                 ) {
    1073        19032 :                     break batch_plan;
    1074         9842 :                 }
    1075              :             } else {
    1076           86 :                 self.is_end = true;
    1077           86 :                 let payload_end = self.image_layer.index_start_blk as u64 * PAGE_SZ as u64;
    1078           86 :                 if let Some(item) = self.planner.handle_range_end(payload_end) {
    1079           86 :                     break item;
    1080              :                 } else {
    1081            0 :                     return Ok(()); // TODO: a test case on empty iterator
    1082              :                 }
    1083              :             }
    1084              :         };
    1085        19118 :         let vectored_blob_reader = VectoredBlobReader::new(&self.image_layer.file);
    1086        19118 :         let mut next_batch = std::collections::VecDeque::new();
    1087        19118 :         let buf_size = plan.size();
    1088        19118 :         let buf = IoBufferMut::with_capacity(buf_size);
    1089        19118 :         let blobs_buf = vectored_blob_reader
    1090        19118 :             .read_blobs(&plan, buf, self.ctx)
    1091         9707 :             .await?;
    1092        19118 :         let view = BufView::new_slice(&blobs_buf.buf);
    1093        28846 :         for meta in blobs_buf.blobs.iter() {
    1094        28846 :             let img_buf = meta.read(&view).await?;
    1095        28846 :             next_batch.push_back((
    1096        28846 :                 meta.meta.key,
    1097        28846 :                 self.image_layer.lsn,
    1098        28846 :                 Value::Image(img_buf.into_bytes()),
    1099        28846 :             ));
    1100              :         }
    1101        19118 :         self.key_values_batch = next_batch;
    1102        19118 :         Ok(())
    1103        19118 :     }
    1104              : 
    1105        28720 :     pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
    1106        28720 :         if self.key_values_batch.is_empty() {
    1107        19178 :             if self.is_end {
    1108          144 :                 return Ok(None);
    1109        19034 :             }
    1110        19034 :             self.next_batch().await?;
    1111         9542 :         }
    1112        28576 :         Ok(Some(
    1113        28576 :             self.key_values_batch
    1114        28576 :                 .pop_front()
    1115        28576 :                 .expect("should not be empty"),
    1116        28576 :         ))
    1117        28720 :     }
    1118              : }
    1119              : 
    1120              : #[cfg(test)]
    1121              : mod test {
    1122              :     use std::{sync::Arc, time::Duration};
    1123              : 
    1124              :     use bytes::Bytes;
    1125              :     use itertools::Itertools;
    1126              :     use pageserver_api::{
    1127              :         key::Key,
    1128              :         shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize},
    1129              :         value::Value,
    1130              :     };
    1131              :     use utils::{
    1132              :         generation::Generation,
    1133              :         id::{TenantId, TimelineId},
    1134              :         lsn::Lsn,
    1135              :     };
    1136              : 
    1137              :     use crate::{
    1138              :         context::RequestContext,
    1139              :         tenant::{
    1140              :             config::TenantConf,
    1141              :             harness::{TenantHarness, TIMELINE_ID},
    1142              :             storage_layer::{Layer, ResidentLayer},
    1143              :             vectored_blob_io::StreamingVectoredReadPlanner,
    1144              :             Tenant, Timeline,
    1145              :         },
    1146              :         DEFAULT_PG_VERSION,
    1147              :     };
    1148              : 
    1149              :     use super::{ImageLayerIterator, ImageLayerWriter};
    1150              : 
    1151              :     #[tokio::test]
    1152            2 :     async fn image_layer_rewrite() {
    1153            2 :         let tenant_conf = TenantConf {
    1154            2 :             gc_period: Duration::ZERO,
    1155            2 :             compaction_period: Duration::ZERO,
    1156            2 :             ..TenantConf::default()
    1157            2 :         };
    1158            2 :         let tenant_id = TenantId::generate();
    1159            2 :         let mut gen = Generation::new(0xdead0001);
    1160           10 :         let mut get_next_gen = || {
    1161           10 :             let ret = gen;
    1162           10 :             gen = gen.next();
    1163           10 :             ret
    1164           10 :         };
    1165            2 :         // The LSN at which we will create an image layer to filter
    1166            2 :         let lsn = Lsn(0xdeadbeef0000);
    1167            2 :         let timeline_id = TimelineId::generate();
    1168            2 : 
    1169            2 :         //
    1170            2 :         // Create an unsharded parent with a layer.
    1171            2 :         //
    1172            2 : 
    1173            2 :         let harness = TenantHarness::create_custom(
    1174            2 :             "test_image_layer_rewrite--parent",
    1175            2 :             tenant_conf.clone(),
    1176            2 :             tenant_id,
    1177            2 :             ShardIdentity::unsharded(),
    1178            2 :             get_next_gen(),
    1179            2 :         )
    1180            2 :         .await
    1181            2 :         .unwrap();
    1182           20 :         let (tenant, ctx) = harness.load().await;
    1183            2 :         let timeline = tenant
    1184            2 :             .create_test_timeline(timeline_id, lsn, DEFAULT_PG_VERSION, &ctx)
    1185            6 :             .await
    1186            2 :             .unwrap();
    1187            2 : 
    1188            2 :         // This key range contains several 0x8000 page stripes, only one of which belongs to shard zero
    1189            2 :         let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
    1190            2 :         let input_end = Key::from_hex("000000067f00000001000000ae0000020000").unwrap();
    1191            2 :         let range = input_start..input_end;
    1192            2 : 
    1193            2 :         // Build an image layer to filter
    1194            2 :         let resident = {
    1195            2 :             let mut writer = ImageLayerWriter::new(
    1196            2 :                 harness.conf,
    1197            2 :                 timeline_id,
    1198            2 :                 harness.tenant_shard_id,
    1199            2 :                 &range,
    1200            2 :                 lsn,
    1201            2 :                 &ctx,
    1202            2 :             )
    1203            2 :             .await
    1204            2 :             .unwrap();
    1205            2 : 
    1206            2 :             let foo_img = Bytes::from_static(&[1, 2, 3, 4]);
    1207            2 :             let mut key = range.start;
    1208       262146 :             while key < range.end {
    1209       266239 :                 writer.put_image(key, foo_img.clone(), &ctx).await.unwrap();
    1210       262144 : 
    1211       262144 :                 key = key.next();
    1212            2 :             }
    1213          119 :             let (desc, path) = writer.finish(&ctx).await.unwrap();
    1214            2 :             Layer::finish_creating(tenant.conf, &timeline, desc, &path).unwrap()
    1215            2 :         };
    1216            2 :         let original_size = resident.metadata().file_size;
    1217            2 : 
    1218            2 :         //
    1219            2 :         // Create child shards and do the rewrite, exercising filter().
    1220            2 :         // TODO: abstraction in TenantHarness for splits.
    1221            2 :         //
    1222            2 : 
    1223            2 :         // Filter for various shards: this exercises cases like values at start of key range, end of key
    1224            2 :         // range, middle of key range.
    1225            2 :         let shard_count = ShardCount::new(4);
    1226            8 :         for shard_number in 0..shard_count.count() {
    1227            2 :             //
    1228            2 :             // mimic the shard split
    1229            2 :             //
    1230            8 :             let shard_identity = ShardIdentity::new(
    1231            8 :                 ShardNumber(shard_number),
    1232            8 :                 shard_count,
    1233            8 :                 ShardStripeSize(0x8000),
    1234            8 :             )
    1235            8 :             .unwrap();
    1236            8 :             let harness = TenantHarness::create_custom(
    1237            8 :                 Box::leak(Box::new(format!(
    1238            8 :                     "test_image_layer_rewrite--child{}",
    1239            8 :                     shard_identity.shard_slug()
    1240            8 :                 ))),
    1241            8 :                 tenant_conf.clone(),
    1242            8 :                 tenant_id,
    1243            8 :                 shard_identity,
    1244            8 :                 // NB: in reality, the shards would each fork off their own gen number sequence from the parent.
    1245            8 :                 // But here, all we care about is that the gen number is unique.
    1246            8 :                 get_next_gen(),
    1247            8 :             )
    1248            2 :             .await
    1249            8 :             .unwrap();
    1250           80 :             let (tenant, ctx) = harness.load().await;
    1251            8 :             let timeline = tenant
    1252            8 :                 .create_test_timeline(timeline_id, lsn, DEFAULT_PG_VERSION, &ctx)
    1253           20 :                 .await
    1254            8 :                 .unwrap();
    1255            2 : 
    1256            2 :             //
    1257            2 :             // use filter() and make assertions
    1258            2 :             //
    1259            2 : 
    1260            8 :             let mut filtered_writer = ImageLayerWriter::new(
    1261            8 :                 harness.conf,
    1262            8 :                 timeline_id,
    1263            8 :                 harness.tenant_shard_id,
    1264            8 :                 &range,
    1265            8 :                 lsn,
    1266            8 :                 &ctx,
    1267            8 :             )
    1268            4 :             .await
    1269            8 :             .unwrap();
    1270            2 : 
    1271            8 :             let wrote_keys = resident
    1272            8 :                 .filter(&shard_identity, &mut filtered_writer, &ctx)
    1273       266719 :                 .await
    1274            8 :                 .unwrap();
    1275            8 :             let replacement = if wrote_keys > 0 {
    1276          129 :                 let (desc, path) = filtered_writer.finish(&ctx).await.unwrap();
    1277            6 :                 let resident = Layer::finish_creating(tenant.conf, &timeline, desc, &path).unwrap();
    1278            6 :                 Some(resident)
    1279            2 :             } else {
    1280            2 :                 None
    1281            2 :             };
    1282            2 : 
    1283            2 :             // This exact size and those below will need updating as/when the layer encoding changes, but
    1284            2 :             // should be deterministic for a given version of the format, as we used no randomness generating the input.
    1285            8 :             assert_eq!(original_size, 1597440);
    1286            2 : 
    1287            8 :             match shard_number {
    1288            2 :                 0 => {
    1289            2 :                     // We should have written out just one stripe for our shard identity
    1290            2 :                     assert_eq!(wrote_keys, 0x8000);
    1291            2 :                     let replacement = replacement.unwrap();
    1292            2 : 
    1293            2 :                     // We should have dropped some of the data
    1294            2 :                     assert!(replacement.metadata().file_size < original_size);
    1295            2 :                     assert!(replacement.metadata().file_size > 0);
    1296            2 : 
    1297            2 :                     // Assert that we dropped ~3/4 of the data.
    1298            2 :                     assert_eq!(replacement.metadata().file_size, 417792);
    1299            2 :                 }
    1300            2 :                 1 => {
    1301            2 :                     // Shard 1 has no keys in our input range
    1302            2 :                     assert_eq!(wrote_keys, 0x0);
    1303            2 :                     assert!(replacement.is_none());
    1304            2 :                 }
    1305            2 :                 2 => {
    1306            2 :                     // Shard 2 has one stripes in the input range
    1307            2 :                     assert_eq!(wrote_keys, 0x8000);
    1308            2 :                     let replacement = replacement.unwrap();
    1309            2 :                     assert!(replacement.metadata().file_size < original_size);
    1310            2 :                     assert!(replacement.metadata().file_size > 0);
    1311            2 :                     assert_eq!(replacement.metadata().file_size, 417792);
    1312            2 :                 }
    1313            2 :                 3 => {
    1314            2 :                     // Shard 3 has two stripes in the input range
    1315            2 :                     assert_eq!(wrote_keys, 0x10000);
    1316            2 :                     let replacement = replacement.unwrap();
    1317            2 :                     assert!(replacement.metadata().file_size < original_size);
    1318            2 :                     assert!(replacement.metadata().file_size > 0);
    1319            2 :                     assert_eq!(replacement.metadata().file_size, 811008);
    1320            2 :                 }
    1321            2 :                 _ => unreachable!(),
    1322            2 :             }
    1323            2 :         }
    1324            2 :     }
    1325              : 
    1326            2 :     async fn produce_image_layer(
    1327            2 :         tenant: &Tenant,
    1328            2 :         tline: &Arc<Timeline>,
    1329            2 :         mut images: Vec<(Key, Bytes)>,
    1330            2 :         lsn: Lsn,
    1331            2 :         ctx: &RequestContext,
    1332            2 :     ) -> anyhow::Result<ResidentLayer> {
    1333            2 :         images.sort();
    1334            2 :         let (key_start, _) = images.first().unwrap();
    1335            2 :         let (key_last, _) = images.last().unwrap();
    1336            2 :         let key_end = key_last.next();
    1337            2 :         let key_range = *key_start..key_end;
    1338            2 :         let mut writer = ImageLayerWriter::new(
    1339            2 :             tenant.conf,
    1340            2 :             tline.timeline_id,
    1341            2 :             tenant.tenant_shard_id,
    1342            2 :             &key_range,
    1343            2 :             lsn,
    1344            2 :             ctx,
    1345            2 :         )
    1346            1 :         .await?;
    1347              : 
    1348         2002 :         for (key, img) in images {
    1349         2031 :             writer.put_image(key, img, ctx).await?;
    1350              :         }
    1351            4 :         let (desc, path) = writer.finish(ctx).await?;
    1352            2 :         let img_layer = Layer::finish_creating(tenant.conf, tline, desc, &path)?;
    1353              : 
    1354            2 :         Ok::<_, anyhow::Error>(img_layer)
    1355            2 :     }
    1356              : 
    1357           28 :     async fn assert_img_iter_equal(
    1358           28 :         img_iter: &mut ImageLayerIterator<'_>,
    1359           28 :         expect: &[(Key, Bytes)],
    1360           28 :         expect_lsn: Lsn,
    1361           28 :     ) {
    1362           28 :         let mut expect_iter = expect.iter();
    1363              :         loop {
    1364        28028 :             let o1 = img_iter.next().await.unwrap();
    1365        28028 :             let o2 = expect_iter.next();
    1366        28028 :             match (o1, o2) {
    1367           28 :                 (None, None) => break,
    1368        28000 :                 (Some((k1, l1, v1)), Some((k2, i2))) => {
    1369        28000 :                     let Value::Image(i1) = v1 else {
    1370            0 :                         panic!("expect Value::Image")
    1371              :                     };
    1372        28000 :                     assert_eq!(&k1, k2);
    1373        28000 :                     assert_eq!(l1, expect_lsn);
    1374        28000 :                     assert_eq!(&i1, i2);
    1375              :                 }
    1376            0 :                 (o1, o2) => panic!("iterators length mismatch: {:?}, {:?}", o1, o2),
    1377              :             }
    1378              :         }
    1379           28 :     }
    1380              : 
    1381              :     #[tokio::test]
    1382            2 :     async fn image_layer_iterator() {
    1383            2 :         let harness = TenantHarness::create("image_layer_iterator").await.unwrap();
    1384           20 :         let (tenant, ctx) = harness.load().await;
    1385            2 : 
    1386            2 :         let tline = tenant
    1387            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    1388            6 :             .await
    1389            2 :             .unwrap();
    1390            2 : 
    1391         2000 :         fn get_key(id: u32) -> Key {
    1392         2000 :             let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
    1393         2000 :             key.field6 = id;
    1394         2000 :             key
    1395         2000 :         }
    1396            2 :         const N: usize = 1000;
    1397            2 :         let test_imgs = (0..N)
    1398         2000 :             .map(|idx| (get_key(idx as u32), Bytes::from(format!("img{idx:05}"))))
    1399            2 :             .collect_vec();
    1400            2 :         let resident_layer =
    1401            2 :             produce_image_layer(&tenant, &tline, test_imgs.clone(), Lsn(0x10), &ctx)
    1402         2036 :                 .await
    1403            2 :                 .unwrap();
    1404            2 :         let img_layer = resident_layer.get_as_image(&ctx).await.unwrap();
    1405            6 :         for max_read_size in [1, 1024] {
    1406           32 :             for batch_size in [1, 2, 4, 8, 3, 7, 13] {
    1407           28 :                 println!("running with batch_size={batch_size} max_read_size={max_read_size}");
    1408           28 :                 // Test if the batch size is correctly determined
    1409           28 :                 let mut iter = img_layer.iter(&ctx);
    1410           28 :                 iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
    1411           28 :                 let mut num_items = 0;
    1412          112 :                 for _ in 0..3 {
    1413           84 :                     iter.next_batch().await.unwrap();
    1414           84 :                     num_items += iter.key_values_batch.len();
    1415           84 :                     if max_read_size == 1 {
    1416            2 :                         // every key should be a batch b/c the value is larger than max_read_size
    1417           42 :                         assert_eq!(iter.key_values_batch.len(), 1);
    1418            2 :                     } else {
    1419           42 :                         assert!(iter.key_values_batch.len() <= batch_size);
    1420            2 :                     }
    1421           84 :                     if num_items >= N {
    1422            2 :                         break;
    1423           84 :                     }
    1424           84 :                     iter.key_values_batch.clear();
    1425            2 :                 }
    1426            2 :                 // Test if the result is correct
    1427           28 :                 let mut iter = img_layer.iter(&ctx);
    1428           28 :                 iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
    1429         9635 :                 assert_img_iter_equal(&mut iter, &test_imgs, Lsn(0x10)).await;
    1430            2 :             }
    1431            2 :         }
    1432            2 :     }
    1433              : }
        

Generated by: LCOV version 2.1-beta