LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - image_layer.rs (source / functions) Coverage Total Hit
Test: 727bdccc1d7d53837da843959afb612f56da4e79.info Lines: 78.8 % 964 760
Test Date: 2025-01-30 15:18:43 Functions: 57.5 % 87 50

            Line data    Source code
       1              : //! An ImageLayer represents an image or a snapshot of a key-range at
       2              : //! one particular LSN.
       3              : //!
       4              : //! It contains an image of all key-value pairs in its key-range. Any key
       5              : //! that falls into the image layer's range but is not present in the layer
       6              : //! does not exist.
       7              : //!
       8              : //! An image layer is stored in a file on disk. The file is stored in
       9              : //! the timelines/<timeline_id> directory.  Currently, there are no
      10              : //! subdirectories, and each image layer file is named like this:
      11              : //!
      12              : //! ```text
      13              : //!    <key start>-<key end>__<LSN>
      14              : //! ```
      15              : //!
      16              : //! For example:
      17              : //!
      18              : //! ```text
      19              : //!    000000067F000032BE0000400000000070B6-000000067F000032BE0000400000000080B6__00000000346BC568
      20              : //! ```
      21              : //!
      22              : //! Every image layer file consists of three parts: "summary",
      23              : //! "index", and "values".  The summary is a fixed size header at the
      24              : //! beginning of the file, and it contains basic information about the
      25              : //! layer, and offsets to the other parts. The "index" is a B-tree,
      26              : //! mapping from Key to an offset in the "values" part.  The
      27              : //! actual page images are stored in the "values" part.
      28              : use crate::config::PageServerConf;
      29              : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
      30              : use crate::page_cache::{self, FileId, PAGE_SZ};
      31              : use crate::tenant::blob_io::BlobWriter;
      32              : use crate::tenant::block_io::{BlockBuf, FileBlockReader};
      33              : use crate::tenant::disk_btree::{
      34              :     DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
      35              : };
      36              : use crate::tenant::timeline::GetVectoredError;
      37              : use crate::tenant::vectored_blob_io::{
      38              :     BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
      39              :     VectoredReadPlanner,
      40              : };
      41              : use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
      42              : use crate::virtual_file::IoBufferMut;
      43              : use crate::virtual_file::{self, MaybeFatalIo, VirtualFile};
      44              : use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
      45              : use anyhow::{bail, ensure, Context, Result};
      46              : use bytes::Bytes;
      47              : use camino::{Utf8Path, Utf8PathBuf};
      48              : use hex;
      49              : use itertools::Itertools;
      50              : use pageserver_api::config::MaxVectoredReadBytes;
      51              : use pageserver_api::key::DBDIR_KEY;
      52              : use pageserver_api::key::{Key, KEY_SIZE};
      53              : use pageserver_api::keyspace::KeySpace;
      54              : use pageserver_api::shard::{ShardIdentity, TenantShardId};
      55              : use pageserver_api::value::Value;
      56              : use rand::{distributions::Alphanumeric, Rng};
      57              : use serde::{Deserialize, Serialize};
      58              : use std::collections::{HashMap, VecDeque};
      59              : use std::fs::File;
      60              : use std::io::SeekFrom;
      61              : use std::ops::Range;
      62              : use std::os::unix::prelude::FileExt;
      63              : use std::str::FromStr;
      64              : use std::sync::Arc;
      65              : use tokio::sync::OnceCell;
      66              : use tokio_stream::StreamExt;
      67              : use tracing::*;
      68              : 
      69              : use utils::{
      70              :     bin_ser::BeSer,
      71              :     id::{TenantId, TimelineId},
      72              :     lsn::Lsn,
      73              : };
      74              : 
      75              : use super::layer_name::ImageLayerName;
      76              : use super::{
      77              :     AsLayerDesc, LayerName, OnDiskValue, OnDiskValueIo, PersistentLayerDesc, ResidentLayer,
      78              :     ValuesReconstructState,
      79              : };
      80              : 
      81              : ///
      82              : /// Header stored at the beginning of the file.
      83              : ///
      84              : /// After this comes the 'values' part, starting on block 1. After that,
      85              : /// the 'index' starts at the block indicated by 'index_start_blk'.
      86              : ///
      87            0 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
      88              : pub struct Summary {
      89              :     /// Magic value to identify this as a neon image file. Always IMAGE_FILE_MAGIC.
      90              :     pub magic: u16,
      91              :     pub format_version: u16,
      92              : 
      93              :     pub tenant_id: TenantId,
      94              :     pub timeline_id: TimelineId,
      95              :     pub key_range: Range<Key>,
      96              :     pub lsn: Lsn,
      97              : 
      98              :     /// Block number where the 'index' part of the file begins.
      99              :     pub index_start_blk: u32,
     100              :     /// Block within the 'index', where the B-tree root page is stored
     101              :     pub index_root_blk: u32,
     102              :     // the 'values' part starts after the summary header, on block 1.
     103              : }
     104              : 
     105              : impl From<&ImageLayer> for Summary {
     106            0 :     fn from(layer: &ImageLayer) -> Self {
     107            0 :         Self::expected(
     108            0 :             layer.desc.tenant_shard_id.tenant_id,
     109            0 :             layer.desc.timeline_id,
     110            0 :             layer.desc.key_range.clone(),
     111            0 :             layer.lsn,
     112            0 :         )
     113            0 :     }
     114              : }
     115              : 
     116              : impl Summary {
     117          268 :     pub(super) fn expected(
     118          268 :         tenant_id: TenantId,
     119          268 :         timeline_id: TimelineId,
     120          268 :         key_range: Range<Key>,
     121          268 :         lsn: Lsn,
     122          268 :     ) -> Self {
     123          268 :         Self {
     124          268 :             magic: IMAGE_FILE_MAGIC,
     125          268 :             format_version: STORAGE_FORMAT_VERSION,
     126          268 :             tenant_id,
     127          268 :             timeline_id,
     128          268 :             key_range,
     129          268 :             lsn,
     130          268 : 
     131          268 :             index_start_blk: 0,
     132          268 :             index_root_blk: 0,
     133          268 :         }
     134          268 :     }
     135              : }
     136              : 
     137              : /// This is used only from `pagectl`. Within pageserver, all layers are
     138              : /// [`crate::tenant::storage_layer::Layer`], which can hold an [`ImageLayerInner`].
     139              : pub struct ImageLayer {
     140              :     path: Utf8PathBuf,
     141              :     pub desc: PersistentLayerDesc,
     142              :     // This entry contains an image of all pages as of this LSN; it should be the same as desc.lsn
     143              :     pub lsn: Lsn,
     144              :     inner: OnceCell<ImageLayerInner>,
     145              : }
     146              : 
     147              : impl std::fmt::Debug for ImageLayer {
     148            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     149              :         use super::RangeDisplayDebug;
     150              : 
     151            0 :         f.debug_struct("ImageLayer")
     152            0 :             .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
     153            0 :             .field("file_size", &self.desc.file_size)
     154            0 :             .field("lsn", &self.lsn)
     155            0 :             .field("inner", &self.inner)
     156            0 :             .finish()
     157            0 :     }
     158              : }
     159              : 
     160              : /// ImageLayerInner is the in-memory data structure associated with an on-disk image
     161              : /// file.
     162              : pub struct ImageLayerInner {
     163              :     // values copied from summary
     164              :     index_start_blk: u32,
     165              :     index_root_blk: u32,
     166              : 
     167              :     key_range: Range<Key>,
     168              :     lsn: Lsn,
     169              : 
     170              :     file: Arc<VirtualFile>,
     171              :     file_id: FileId,
     172              : 
     173              :     max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
     174              : }
     175              : 
     176              : impl ImageLayerInner {
     177            0 :     pub(crate) fn layer_dbg_info(&self) -> String {
     178            0 :         format!(
     179            0 :             "image {}..{} {}",
     180            0 :             self.key_range().start,
     181            0 :             self.key_range().end,
     182            0 :             self.lsn()
     183            0 :         )
     184            0 :     }
     185              : }
     186              : 
     187              : impl std::fmt::Debug for ImageLayerInner {
     188            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     189            0 :         f.debug_struct("ImageLayerInner")
     190            0 :             .field("index_start_blk", &self.index_start_blk)
     191            0 :             .field("index_root_blk", &self.index_root_blk)
     192            0 :             .finish()
     193            0 :     }
     194              : }
     195              : 
     196              : impl ImageLayerInner {
     197            0 :     pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
     198            0 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
     199            0 :         let tree_reader = DiskBtreeReader::<_, KEY_SIZE>::new(
     200            0 :             self.index_start_blk,
     201            0 :             self.index_root_blk,
     202            0 :             block_reader,
     203            0 :         );
     204            0 : 
     205            0 :         tree_reader.dump().await?;
     206              : 
     207            0 :         tree_reader
     208            0 :             .visit(
     209            0 :                 &[0u8; KEY_SIZE],
     210            0 :                 VisitDirection::Forwards,
     211            0 :                 |key, value| {
     212            0 :                     println!("key: {} offset {}", hex::encode(key), value);
     213            0 :                     true
     214            0 :                 },
     215            0 :                 ctx,
     216            0 :             )
     217            0 :             .await?;
     218              : 
     219            0 :         Ok(())
     220            0 :     }
     221              : }
     222              : 
     223              : /// Boilerplate to implement the Layer trait. Always use layer_desc for persistent layers.
     224              : impl std::fmt::Display for ImageLayer {
     225            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     226            0 :         write!(f, "{}", self.layer_desc().short_id())
     227            0 :     }
     228              : }
     229              : 
     230              : impl AsLayerDesc for ImageLayer {
     231            0 :     fn layer_desc(&self) -> &PersistentLayerDesc {
     232            0 :         &self.desc
     233            0 :     }
     234              : }
     235              : 
     236              : impl ImageLayer {
     237            0 :     pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
     238            0 :         self.desc.dump();
     239            0 : 
     240            0 :         if !verbose {
     241            0 :             return Ok(());
     242            0 :         }
     243              : 
     244            0 :         let inner = self.load(ctx).await?;
     245              : 
     246            0 :         inner.dump(ctx).await?;
     247              : 
     248            0 :         Ok(())
     249            0 :     }
     250              : 
     251         1168 :     fn temp_path_for(
     252         1168 :         conf: &PageServerConf,
     253         1168 :         timeline_id: TimelineId,
     254         1168 :         tenant_shard_id: TenantShardId,
     255         1168 :         fname: &ImageLayerName,
     256         1168 :     ) -> Utf8PathBuf {
     257         1168 :         let rand_string: String = rand::thread_rng()
     258         1168 :             .sample_iter(&Alphanumeric)
     259         1168 :             .take(8)
     260         1168 :             .map(char::from)
     261         1168 :             .collect();
     262         1168 : 
     263         1168 :         conf.timeline_path(&tenant_shard_id, &timeline_id)
     264         1168 :             .join(format!("{fname}.{rand_string}.{TEMP_FILE_SUFFIX}"))
     265         1168 :     }
     266              : 
     267              :     ///
     268              :     /// Open the underlying file and read the metadata into memory, if it's
     269              :     /// not loaded already.
     270              :     ///
     271            0 :     async fn load(&self, ctx: &RequestContext) -> Result<&ImageLayerInner> {
     272            0 :         self.inner
     273            0 :             .get_or_try_init(|| self.load_inner(ctx))
     274            0 :             .await
     275            0 :             .with_context(|| format!("Failed to load image layer {}", self.path()))
     276            0 :     }
     277              : 
     278            0 :     async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
     279            0 :         let path = self.path();
     280              : 
     281            0 :         let loaded =
     282            0 :             ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, None, ctx).await?;
     283              : 
     284              :         // not production code
     285            0 :         let actual_layer_name = LayerName::from_str(path.file_name().unwrap()).unwrap();
     286            0 :         let expected_layer_name = self.layer_desc().layer_name();
     287            0 : 
     288            0 :         if actual_layer_name != expected_layer_name {
     289            0 :             println!("warning: filename does not match what is expected from in-file summary");
     290            0 :             println!("actual: {:?}", actual_layer_name.to_string());
     291            0 :             println!("expected: {:?}", expected_layer_name.to_string());
     292            0 :         }
     293              : 
     294            0 :         Ok(loaded)
     295            0 :     }
     296              : 
     297              :     /// Create an ImageLayer struct representing an existing file on disk.
     298              :     ///
     299              :     /// This variant is only used for debugging purposes, by the 'pagectl' binary.
     300            0 :     pub fn new_for_path(path: &Utf8Path, file: File) -> Result<ImageLayer> {
     301            0 :         let mut summary_buf = vec![0; PAGE_SZ];
     302            0 :         file.read_exact_at(&mut summary_buf, 0)?;
     303            0 :         let summary = Summary::des_prefix(&summary_buf)?;
     304            0 :         let metadata = file
     305            0 :             .metadata()
     306            0 :             .context("get file metadata to determine size")?;
     307              : 
     308              :         // This function is never used for constructing layers in a running pageserver,
     309              :         // so it does not need an accurate TenantShardId.
     310            0 :         let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
     311            0 : 
     312            0 :         Ok(ImageLayer {
     313            0 :             path: path.to_path_buf(),
     314            0 :             desc: PersistentLayerDesc::new_img(
     315            0 :                 tenant_shard_id,
     316            0 :                 summary.timeline_id,
     317            0 :                 summary.key_range,
     318            0 :                 summary.lsn,
     319            0 :                 metadata.len(),
     320            0 :             ), // Now we assume image layer ALWAYS covers the full range. This may change in the future.
     321            0 :             lsn: summary.lsn,
     322            0 :             inner: OnceCell::new(),
     323            0 :         })
     324            0 :     }
     325              : 
     326            0 :     fn path(&self) -> Utf8PathBuf {
     327            0 :         self.path.clone()
     328            0 :     }
     329              : }
     330              : 
     331              : #[derive(thiserror::Error, Debug)]
     332              : pub enum RewriteSummaryError {
     333              :     #[error("magic mismatch")]
     334              :     MagicMismatch,
     335              :     #[error(transparent)]
     336              :     Other(#[from] anyhow::Error),
     337              : }
     338              : 
     339              : impl From<std::io::Error> for RewriteSummaryError {
     340            0 :     fn from(e: std::io::Error) -> Self {
     341            0 :         Self::Other(anyhow::anyhow!(e))
     342            0 :     }
     343              : }
     344              : 
     345              : impl ImageLayer {
     346            0 :     pub async fn rewrite_summary<F>(
     347            0 :         path: &Utf8Path,
     348            0 :         rewrite: F,
     349            0 :         ctx: &RequestContext,
     350            0 :     ) -> Result<(), RewriteSummaryError>
     351            0 :     where
     352            0 :         F: Fn(Summary) -> Summary,
     353            0 :     {
     354            0 :         let mut file = VirtualFile::open_with_options(
     355            0 :             path,
     356            0 :             virtual_file::OpenOptions::new().read(true).write(true),
     357            0 :             ctx,
     358            0 :         )
     359            0 :         .await
     360            0 :         .with_context(|| format!("Failed to open file '{}'", path))?;
     361            0 :         let file_id = page_cache::next_file_id();
     362            0 :         let block_reader = FileBlockReader::new(&file, file_id);
     363            0 :         let summary_blk = block_reader.read_blk(0, ctx).await?;
     364            0 :         let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
     365            0 :         if actual_summary.magic != IMAGE_FILE_MAGIC {
     366            0 :             return Err(RewriteSummaryError::MagicMismatch);
     367            0 :         }
     368            0 : 
     369            0 :         let new_summary = rewrite(actual_summary);
     370            0 : 
     371            0 :         let mut buf = Vec::with_capacity(PAGE_SZ);
     372            0 :         // TODO: could use smallvec here but it's a pain with Slice<T>
     373            0 :         Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
     374            0 :         file.seek(SeekFrom::Start(0)).await?;
     375            0 :         let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
     376            0 :         res?;
     377            0 :         Ok(())
     378            0 :     }
     379              : }
     380              : 
     381              : impl ImageLayerInner {
     382          264 :     pub(crate) fn key_range(&self) -> &Range<Key> {
     383          264 :         &self.key_range
     384          264 :     }
     385              : 
     386          264 :     pub(crate) fn lsn(&self) -> Lsn {
     387          264 :         self.lsn
     388          264 :     }
     389              : 
     390          268 :     pub(super) async fn load(
     391          268 :         path: &Utf8Path,
     392          268 :         lsn: Lsn,
     393          268 :         summary: Option<Summary>,
     394          268 :         max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
     395          268 :         ctx: &RequestContext,
     396          268 :     ) -> anyhow::Result<Self> {
     397          268 :         let file = Arc::new(
     398          268 :             VirtualFile::open_v2(path, ctx)
     399          268 :                 .await
     400          268 :                 .context("open layer file")?,
     401              :         );
     402          268 :         let file_id = page_cache::next_file_id();
     403          268 :         let block_reader = FileBlockReader::new(&file, file_id);
     404          268 :         let summary_blk = block_reader
     405          268 :             .read_blk(0, ctx)
     406          268 :             .await
     407          268 :             .context("read first block")?;
     408              : 
     409              :         // Length is the only way this could fail, so it is not actually likely at all unless
     410              :         // read_blk returns a wrong-sized block.
     411              :         //
     412              :         // TODO: confirm and make this into an assertion
     413          268 :         let actual_summary =
     414          268 :             Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
     415              : 
     416          268 :         if let Some(mut expected_summary) = summary {
     417              :             // production code path
     418          268 :             expected_summary.index_start_blk = actual_summary.index_start_blk;
     419          268 :             expected_summary.index_root_blk = actual_summary.index_root_blk;
     420          268 :             // mask out the timeline_id, but still require the layers to be from the same tenant
     421          268 :             expected_summary.timeline_id = actual_summary.timeline_id;
     422          268 : 
     423          268 :             if actual_summary != expected_summary {
     424            0 :                 bail!(
     425            0 :                     "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
     426            0 :                     actual_summary,
     427            0 :                     expected_summary
     428            0 :                 );
     429          268 :             }
     430            0 :         }
     431              : 
     432          268 :         Ok(ImageLayerInner {
     433          268 :             index_start_blk: actual_summary.index_start_blk,
     434          268 :             index_root_blk: actual_summary.index_root_blk,
     435          268 :             lsn,
     436          268 :             file,
     437          268 :             file_id,
     438          268 :             max_vectored_read_bytes,
     439          268 :             key_range: actual_summary.key_range,
     440          268 :         })
     441          268 :     }
     442              : 
     443              :     // Look up the keys in the provided keyspace and update
     444              :     // the reconstruct state with whatever is found.
     445        45116 :     pub(super) async fn get_values_reconstruct_data(
     446        45116 :         &self,
     447        45116 :         this: ResidentLayer,
     448        45116 :         keyspace: KeySpace,
     449        45116 :         reconstruct_state: &mut ValuesReconstructState,
     450        45116 :         ctx: &RequestContext,
     451        45116 :     ) -> Result<(), GetVectoredError> {
     452        45116 :         let reads = self
     453        45116 :             .plan_reads(keyspace, None, ctx)
     454        45116 :             .await
     455        45116 :             .map_err(GetVectoredError::Other)?;
     456              : 
     457        45116 :         self.do_reads_and_update_state(this, reads, reconstruct_state, ctx)
     458        45116 :             .await;
     459              : 
     460        45116 :         reconstruct_state.on_image_layer_visited(&self.key_range);
     461        45116 : 
     462        45116 :         Ok(())
     463        45116 :     }
     464              : 
     465              :     /// Traverse the layer's index to build read operations on the overlap of the input keyspace
     466              :     /// and the keys in this layer.
     467              :     ///
     468              :     /// If shard_identity is provided, it will be used to filter keys down to those stored on
     469              :     /// this shard.
     470        45132 :     async fn plan_reads(
     471        45132 :         &self,
     472        45132 :         keyspace: KeySpace,
     473        45132 :         shard_identity: Option<&ShardIdentity>,
     474        45132 :         ctx: &RequestContext,
     475        45132 :     ) -> anyhow::Result<Vec<VectoredRead>> {
     476        45132 :         let mut planner = VectoredReadPlanner::new(
     477        45132 :             self.max_vectored_read_bytes
     478        45132 :                 .expect("Layer is loaded with max vectored bytes config")
     479        45132 :                 .0
     480        45132 :                 .into(),
     481        45132 :         );
     482        45132 : 
     483        45132 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
     484        45132 :         let tree_reader =
     485        45132 :             DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
     486        45132 : 
     487        45132 :         let ctx = RequestContextBuilder::extend(ctx)
     488        45132 :             .page_content_kind(PageContentKind::ImageLayerBtreeNode)
     489        45132 :             .build();
     490              : 
     491        45148 :         for range in keyspace.ranges.iter() {
     492        45148 :             let mut range_end_handled = false;
     493        45148 :             let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
     494        45148 :             range.start.write_to_byte_slice(&mut search_key);
     495        45148 : 
     496        45148 :             let index_stream = tree_reader.clone().into_stream(&search_key, &ctx);
     497        45148 :             let mut index_stream = std::pin::pin!(index_stream);
     498              : 
     499      2251468 :             while let Some(index_entry) = index_stream.next().await {
     500      2250860 :                 let (raw_key, offset) = index_entry?;
     501              : 
     502      2250860 :                 let key = Key::from_slice(&raw_key[..KEY_SIZE]);
     503      2250860 :                 assert!(key >= range.start);
     504              : 
     505      2250860 :                 let flag = if let Some(shard_identity) = shard_identity {
     506      2097152 :                     if shard_identity.is_key_disposable(&key) {
     507      1572864 :                         BlobFlag::Ignore
     508              :                     } else {
     509       524288 :                         BlobFlag::None
     510              :                     }
     511              :                 } else {
     512       153708 :                     BlobFlag::None
     513              :                 };
     514              : 
     515      2250860 :                 if key >= range.end {
     516        44540 :                     planner.handle_range_end(offset);
     517        44540 :                     range_end_handled = true;
     518        44540 :                     break;
     519      2206320 :                 } else {
     520      2206320 :                     planner.handle(key, self.lsn, offset, flag);
     521      2206320 :                 }
     522              :             }
     523              : 
     524        45148 :             if !range_end_handled {
     525          608 :                 let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64;
     526          608 :                 planner.handle_range_end(payload_end);
     527        44540 :             }
     528              :         }
     529              : 
     530        45132 :         Ok(planner.finish())
     531        45132 :     }
     532              : 
     533              :     /// Select the parts of this layer's key range that should be retained by the ShardIdentity,
     534              :     /// then execute vectored GET operations, passing the results of all read keys into the writer.
     535           16 :     pub(super) async fn filter(
     536           16 :         &self,
     537           16 :         shard_identity: &ShardIdentity,
     538           16 :         writer: &mut ImageLayerWriter,
     539           16 :         ctx: &RequestContext,
     540           16 :     ) -> anyhow::Result<usize> {
     541              :         // Fragment the range into the regions owned by this ShardIdentity
     542           16 :         let plan = self
     543           16 :             .plan_reads(
     544           16 :                 KeySpace {
     545           16 :                     // If asked for the total key space, plan_reads will give us all the keys in the layer
     546           16 :                     ranges: vec![Key::MIN..Key::MAX],
     547           16 :                 },
     548           16 :                 Some(shard_identity),
     549           16 :                 ctx,
     550           16 :             )
     551           16 :             .await?;
     552              : 
     553           16 :         let vectored_blob_reader = VectoredBlobReader::new(&self.file);
     554           16 :         let mut key_count = 0;
     555           32 :         for read in plan.into_iter() {
     556           32 :             let buf_size = read.size();
     557           32 : 
     558           32 :             let buf = IoBufferMut::with_capacity(buf_size);
     559           32 :             let blobs_buf = vectored_blob_reader.read_blobs(&read, buf, ctx).await?;
     560              : 
     561           32 :             let view = BufView::new_slice(&blobs_buf.buf);
     562              : 
     563       524288 :             for meta in blobs_buf.blobs.iter() {
     564       524288 :                 let img_buf = meta.read(&view).await?;
     565              : 
     566       524288 :                 key_count += 1;
     567       524288 :                 writer
     568       524288 :                     .put_image(meta.meta.key, img_buf.into_bytes(), ctx)
     569       524288 :                     .await
     570       524288 :                     .context(format!("Storing key {}", meta.meta.key))?;
     571              :             }
     572              :         }
     573              : 
     574           16 :         Ok(key_count)
     575           16 :     }
     576              : 
     577        45116 :     async fn do_reads_and_update_state(
     578        45116 :         &self,
     579        45116 :         this: ResidentLayer,
     580        45116 :         reads: Vec<VectoredRead>,
     581        45116 :         reconstruct_state: &mut ValuesReconstructState,
     582        45116 :         ctx: &RequestContext,
     583        45116 :     ) {
     584        45116 :         let max_vectored_read_bytes = self
     585        45116 :             .max_vectored_read_bytes
     586        45116 :             .expect("Layer is loaded with max vectored bytes config")
     587        45116 :             .0
     588        45116 :             .into();
     589              : 
     590        45116 :         for read in reads.into_iter() {
     591        45040 :             let mut ios: HashMap<(Key, Lsn), OnDiskValueIo> = Default::default();
     592       109168 :             for (_, blob_meta) in read.blobs_at.as_slice() {
     593       109168 :                 let io = reconstruct_state.update_key(&blob_meta.key, blob_meta.lsn, true);
     594       109168 :                 ios.insert((blob_meta.key, blob_meta.lsn), io);
     595       109168 :             }
     596              : 
     597        45040 :             let buf_size = read.size();
     598        45040 : 
     599        45040 :             if buf_size > max_vectored_read_bytes {
     600              :                 // If the read is oversized, it should only contain one key.
     601            0 :                 let offenders = read
     602            0 :                     .blobs_at
     603            0 :                     .as_slice()
     604            0 :                     .iter()
     605            0 :                     .filter_map(|(_, blob_meta)| {
     606            0 :                         if blob_meta.key.is_rel_dir_key() || blob_meta.key == DBDIR_KEY {
     607              :                             // The size of values for these keys is unbounded and can
     608              :                             // grow very large in pathological cases.
     609            0 :                             None
     610              :                         } else {
     611            0 :                             Some(format!("{}@{}", blob_meta.key, blob_meta.lsn))
     612              :                         }
     613            0 :                     })
     614            0 :                     .join(", ");
     615            0 : 
     616            0 :                 if !offenders.is_empty() {
     617            0 :                     tracing::warn!(
     618            0 :                         "Oversized vectored read ({} > {}) for keys {}",
     619              :                         buf_size,
     620              :                         max_vectored_read_bytes,
     621              :                         offenders
     622              :                     );
     623            0 :                 }
     624        45040 :             }
     625              : 
     626        45040 :             let read_extend_residency = this.clone();
     627        45040 :             let read_from = self.file.clone();
     628        45040 :             let read_ctx = ctx.attached_child();
     629        45040 :             reconstruct_state
     630        45040 :                 .spawn_io(async move {
     631        45040 :                     let buf = IoBufferMut::with_capacity(buf_size);
     632        45040 :                     let vectored_blob_reader = VectoredBlobReader::new(&read_from);
     633        45040 :                     let res = vectored_blob_reader.read_blobs(&read, buf, &read_ctx).await;
     634              : 
     635        45040 :                     match res {
     636        45040 :                         Ok(blobs_buf) => {
     637        45040 :                             let view = BufView::new_slice(&blobs_buf.buf);
     638       109168 :                             for meta in blobs_buf.blobs.iter() {
     639       109168 :                                 let io: OnDiskValueIo =
     640       109168 :                                     ios.remove(&(meta.meta.key, meta.meta.lsn)).unwrap();
     641       109168 :                                 let img_buf = meta.read(&view).await;
     642              : 
     643       109168 :                                 let img_buf = match img_buf {
     644       109168 :                                     Ok(img_buf) => img_buf,
     645            0 :                                     Err(e) => {
     646            0 :                                         io.complete(Err(e));
     647            0 :                                         continue;
     648              :                                     }
     649              :                                 };
     650              : 
     651       109168 :                                 io.complete(Ok(OnDiskValue::RawImage(img_buf.into_bytes())));
     652              :                             }
     653              : 
     654        45040 :                             assert!(ios.is_empty());
     655              :                         }
     656            0 :                         Err(err) => {
     657            0 :                             for (_, io) in ios {
     658            0 :                                 io.complete(Err(std::io::Error::new(
     659            0 :                                     err.kind(),
     660            0 :                                     "vec read failed",
     661            0 :                                 )));
     662            0 :                             }
     663              :                         }
     664              :                     }
     665              : 
     666              :                     // keep layer resident until this IO is done; this spawned IO future generally outlives the
     667              :                     // call to `self` / the `Arc<DownloadedLayer>` / the `ResidentLayer` that guarantees residency
     668        45040 :                     drop(read_extend_residency);
     669        45040 :                 })
     670        45040 :                 .await;
     671              :         }
     672        45116 :     }
     673              : 
     674          244 :     pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> ImageLayerIterator<'a> {
     675          244 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
     676          244 :         let tree_reader =
     677          244 :             DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
     678          244 :         ImageLayerIterator {
     679          244 :             image_layer: self,
     680          244 :             ctx,
     681          244 :             index_iter: tree_reader.iter(&[0; KEY_SIZE], ctx),
     682          244 :             key_values_batch: VecDeque::new(),
     683          244 :             is_end: false,
     684          244 :             planner: StreamingVectoredReadPlanner::new(
     685          244 :                 1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
     686          244 :                 1024,        // The default value. Unit tests might use a different value
     687          244 :             ),
     688          244 :         }
     689          244 :     }
     690              : 
     691              :     /// NB: not super efficient, but not terrible either. Should prob be an iterator.
     692              :     //
     693              :     // We're reusing the index traversal logical in plan_reads; would be nice to
     694              :     // factor that out.
     695            0 :     pub(crate) async fn load_keys(&self, ctx: &RequestContext) -> anyhow::Result<Vec<Key>> {
     696            0 :         let plan = self
     697            0 :             .plan_reads(KeySpace::single(self.key_range.clone()), None, ctx)
     698            0 :             .await?;
     699            0 :         Ok(plan
     700            0 :             .into_iter()
     701            0 :             .flat_map(|read| read.blobs_at)
     702            0 :             .map(|(_, blob_meta)| blob_meta.key)
     703            0 :             .collect())
     704            0 :     }
     705              : }
     706              : 
     707              : /// A builder object for constructing a new image layer.
     708              : ///
     709              : /// Usage:
     710              : ///
     711              : /// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
     712              : ///
     713              : /// 2. Write the contents by calling `put_page_image` for every key-value
     714              : ///    pair in the key range.
     715              : ///
     716              : /// 3. Call `finish`.
     717              : ///
     718              : struct ImageLayerWriterInner {
     719              :     conf: &'static PageServerConf,
     720              :     path: Utf8PathBuf,
     721              :     timeline_id: TimelineId,
     722              :     tenant_shard_id: TenantShardId,
     723              :     key_range: Range<Key>,
     724              :     lsn: Lsn,
     725              : 
     726              :     // Total uncompressed bytes passed into put_image
     727              :     uncompressed_bytes: u64,
     728              : 
     729              :     // Like `uncompressed_bytes`,
     730              :     // but only of images we might consider for compression
     731              :     uncompressed_bytes_eligible: u64,
     732              : 
     733              :     // Like `uncompressed_bytes`, but only of images
     734              :     // where we have chosen their compressed form
     735              :     uncompressed_bytes_chosen: u64,
     736              : 
     737              :     // Number of keys in the layer.
     738              :     num_keys: usize,
     739              : 
     740              :     blob_writer: BlobWriter<false>,
     741              :     tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,
     742              : 
     743              :     #[cfg(feature = "testing")]
     744              :     last_written_key: Key,
     745              : }
     746              : 
     747              : impl ImageLayerWriterInner {
     748              :     ///
     749              :     /// Start building a new image layer.
     750              :     ///
     751         1168 :     async fn new(
     752         1168 :         conf: &'static PageServerConf,
     753         1168 :         timeline_id: TimelineId,
     754         1168 :         tenant_shard_id: TenantShardId,
     755         1168 :         key_range: &Range<Key>,
     756         1168 :         lsn: Lsn,
     757         1168 :         ctx: &RequestContext,
     758         1168 :     ) -> anyhow::Result<Self> {
     759         1168 :         // Create the file initially with a temporary filename.
     760         1168 :         // We'll atomically rename it to the final name when we're done.
     761         1168 :         let path = ImageLayer::temp_path_for(
     762         1168 :             conf,
     763         1168 :             timeline_id,
     764         1168 :             tenant_shard_id,
     765         1168 :             &ImageLayerName {
     766         1168 :                 key_range: key_range.clone(),
     767         1168 :                 lsn,
     768         1168 :             },
     769         1168 :         );
     770         1168 :         trace!("creating image layer {}", path);
     771         1168 :         let mut file = {
     772         1168 :             VirtualFile::open_with_options(
     773         1168 :                 &path,
     774         1168 :                 virtual_file::OpenOptions::new()
     775         1168 :                     .write(true)
     776         1168 :                     .create_new(true),
     777         1168 :                 ctx,
     778         1168 :             )
     779         1168 :             .await?
     780              :         };
     781              :         // make room for the header block
     782         1168 :         file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
     783         1168 :         let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
     784         1168 : 
     785         1168 :         // Initialize the b-tree index builder
     786         1168 :         let block_buf = BlockBuf::new();
     787         1168 :         let tree_builder = DiskBtreeBuilder::new(block_buf);
     788         1168 : 
     789         1168 :         let writer = Self {
     790         1168 :             conf,
     791         1168 :             path,
     792         1168 :             timeline_id,
     793         1168 :             tenant_shard_id,
     794         1168 :             key_range: key_range.clone(),
     795         1168 :             lsn,
     796         1168 :             tree: tree_builder,
     797         1168 :             blob_writer,
     798         1168 :             uncompressed_bytes: 0,
     799         1168 :             uncompressed_bytes_eligible: 0,
     800         1168 :             uncompressed_bytes_chosen: 0,
     801         1168 :             num_keys: 0,
     802         1168 :             #[cfg(feature = "testing")]
     803         1168 :             last_written_key: Key::MIN,
     804         1168 :         };
     805         1168 : 
     806         1168 :         Ok(writer)
     807         1168 :     }
     808              : 
     809              :     ///
     810              :     /// Write next value to the file.
     811              :     ///
     812              :     /// The page versions must be appended in blknum order.
     813              :     ///
     814      1093244 :     async fn put_image(
     815      1093244 :         &mut self,
     816      1093244 :         key: Key,
     817      1093244 :         img: Bytes,
     818      1093244 :         ctx: &RequestContext,
     819      1093244 :     ) -> anyhow::Result<()> {
     820      1093244 :         ensure!(self.key_range.contains(&key));
     821      1093244 :         let compression = self.conf.image_compression;
     822      1093244 :         let uncompressed_len = img.len() as u64;
     823      1093244 :         self.uncompressed_bytes += uncompressed_len;
     824      1093244 :         self.num_keys += 1;
     825      1093244 :         let (_img, res) = self
     826      1093244 :             .blob_writer
     827      1093244 :             .write_blob_maybe_compressed(img.slice_len(), ctx, compression)
     828      1093244 :             .await;
     829              :         // TODO: re-use the buffer for `img` further upstack
     830      1093244 :         let (off, compression_info) = res?;
     831      1093244 :         if compression_info.compressed_size.is_some() {
     832        16004 :             // The image has been considered for compression at least
     833        16004 :             self.uncompressed_bytes_eligible += uncompressed_len;
     834      1077240 :         }
     835      1093244 :         if compression_info.written_compressed {
     836            0 :             // The image has been compressed
     837            0 :             self.uncompressed_bytes_chosen += uncompressed_len;
     838      1093244 :         }
     839              : 
     840      1093244 :         let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
     841      1093244 :         key.write_to_byte_slice(&mut keybuf);
     842      1093244 :         self.tree.append(&keybuf, off)?;
     843              : 
     844              :         #[cfg(feature = "testing")]
     845      1093244 :         {
     846      1093244 :             self.last_written_key = key;
     847      1093244 :         }
     848      1093244 : 
     849      1093244 :         Ok(())
     850      1093244 :     }
     851              : 
     852              :     ///
     853              :     /// Finish writing the image layer.
     854              :     ///
     855          708 :     async fn finish(
     856          708 :         self,
     857          708 :         ctx: &RequestContext,
     858          708 :         end_key: Option<Key>,
     859          708 :     ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
     860          708 :         let temp_path = self.path.clone();
     861          708 :         let result = self.finish0(ctx, end_key).await;
     862          708 :         if let Err(ref e) = result {
     863            0 :             tracing::info!(%temp_path, "cleaning up temporary file after error during writing: {e}");
     864            0 :             if let Err(e) = std::fs::remove_file(&temp_path) {
     865            0 :                 tracing::warn!(error=%e, %temp_path, "error cleaning up temporary layer file after error during writing");
     866            0 :             }
     867          708 :         }
     868          708 :         result
     869          708 :     }
     870              : 
     871              :     ///
     872              :     /// Finish writing the image layer.
     873              :     ///
     874          708 :     async fn finish0(
     875          708 :         self,
     876          708 :         ctx: &RequestContext,
     877          708 :         end_key: Option<Key>,
     878          708 :     ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
     879          708 :         let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
     880          708 : 
     881          708 :         // Calculate compression ratio
     882          708 :         let compressed_size = self.blob_writer.size() - PAGE_SZ as u64; // Subtract PAGE_SZ for header
     883          708 :         crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES.inc_by(self.uncompressed_bytes);
     884          708 :         crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CONSIDERED
     885          708 :             .inc_by(self.uncompressed_bytes_eligible);
     886          708 :         crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CHOSEN.inc_by(self.uncompressed_bytes_chosen);
     887          708 :         crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size);
     888          708 : 
     889          708 :         let mut file = self.blob_writer.into_inner();
     890          708 : 
     891          708 :         // Write out the index
     892          708 :         file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
     893          708 :             .await?;
     894          708 :         let (index_root_blk, block_buf) = self.tree.finish()?;
     895         2328 :         for buf in block_buf.blocks {
     896         1620 :             let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
     897         1620 :             res?;
     898              :         }
     899              : 
     900          708 :         let final_key_range = if let Some(end_key) = end_key {
     901          572 :             self.key_range.start..end_key
     902              :         } else {
     903          136 :             self.key_range.clone()
     904              :         };
     905              : 
     906              :         // Fill in the summary on blk 0
     907          708 :         let summary = Summary {
     908          708 :             magic: IMAGE_FILE_MAGIC,
     909          708 :             format_version: STORAGE_FORMAT_VERSION,
     910          708 :             tenant_id: self.tenant_shard_id.tenant_id,
     911          708 :             timeline_id: self.timeline_id,
     912          708 :             key_range: final_key_range.clone(),
     913          708 :             lsn: self.lsn,
     914          708 :             index_start_blk,
     915          708 :             index_root_blk,
     916          708 :         };
     917          708 : 
     918          708 :         let mut buf = Vec::with_capacity(PAGE_SZ);
     919          708 :         // TODO: could use smallvec here but it's a pain with Slice<T>
     920          708 :         Summary::ser_into(&summary, &mut buf)?;
     921          708 :         file.seek(SeekFrom::Start(0)).await?;
     922          708 :         let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
     923          708 :         res?;
     924              : 
     925          708 :         let metadata = file
     926          708 :             .metadata()
     927          708 :             .await
     928          708 :             .context("get metadata to determine file size")?;
     929              : 
     930          708 :         let desc = PersistentLayerDesc::new_img(
     931          708 :             self.tenant_shard_id,
     932          708 :             self.timeline_id,
     933          708 :             final_key_range,
     934          708 :             self.lsn,
     935          708 :             metadata.len(),
     936          708 :         );
     937              : 
     938              :         #[cfg(feature = "testing")]
     939          708 :         if let Some(end_key) = end_key {
     940          572 :             assert!(
     941          572 :                 self.last_written_key < end_key,
     942            0 :                 "written key violates end_key range"
     943              :             );
     944          136 :         }
     945              : 
     946              :         // Note: Because we open the file in write-only mode, we cannot
     947              :         // reuse the same VirtualFile for reading later. That's why we don't
     948              :         // set inner.file here. The first read will have to re-open it.
     949              : 
     950              :         // fsync the file
     951          708 :         file.sync_all()
     952          708 :             .await
     953          708 :             .maybe_fatal_err("image_layer sync_all")?;
     954              : 
     955          708 :         trace!("created image layer {}", self.path);
     956              : 
     957          708 :         Ok((desc, self.path))
     958          708 :     }
     959              : }
     960              : 
     961              : /// A builder object for constructing a new image layer.
     962              : ///
     963              : /// Usage:
     964              : ///
     965              : /// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
     966              : ///
     967              : /// 2. Write the contents by calling `put_page_image` for every key-value
     968              : ///    pair in the key range.
     969              : ///
     970              : /// 3. Call `finish`.
     971              : ///
     972              : /// # Note
     973              : ///
     974              : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
     975              : /// possible for the writer to drop before `finish` is actually called. So this
     976              : /// could lead to odd temporary files in the directory, exhausting file system.
     977              : /// This structure wraps `ImageLayerWriterInner` and also contains `Drop`
     978              : /// implementation that cleans up the temporary file in failure. It's not
     979              : /// possible to do this directly in `ImageLayerWriterInner` since `finish` moves
     980              : /// out some fields, making it impossible to implement `Drop`.
     981              : ///
     982              : #[must_use]
     983              : pub struct ImageLayerWriter {
     984              :     inner: Option<ImageLayerWriterInner>,
     985              : }
     986              : 
     987              : impl ImageLayerWriter {
     988              :     ///
     989              :     /// Start building a new image layer.
     990              :     ///
     991         1168 :     pub async fn new(
     992         1168 :         conf: &'static PageServerConf,
     993         1168 :         timeline_id: TimelineId,
     994         1168 :         tenant_shard_id: TenantShardId,
     995         1168 :         key_range: &Range<Key>,
     996         1168 :         lsn: Lsn,
     997         1168 :         ctx: &RequestContext,
     998         1168 :     ) -> anyhow::Result<ImageLayerWriter> {
     999         1168 :         Ok(Self {
    1000         1168 :             inner: Some(
    1001         1168 :                 ImageLayerWriterInner::new(conf, timeline_id, tenant_shard_id, key_range, lsn, ctx)
    1002         1168 :                     .await?,
    1003              :             ),
    1004              :         })
    1005         1168 :     }
    1006              : 
    1007              :     ///
    1008              :     /// Write next value to the file.
    1009              :     ///
    1010              :     /// The page versions must be appended in blknum order.
    1011              :     ///
    1012      1093244 :     pub async fn put_image(
    1013      1093244 :         &mut self,
    1014      1093244 :         key: Key,
    1015      1093244 :         img: Bytes,
    1016      1093244 :         ctx: &RequestContext,
    1017      1093244 :     ) -> anyhow::Result<()> {
    1018      1093244 :         self.inner.as_mut().unwrap().put_image(key, img, ctx).await
    1019      1093244 :     }
    1020              : 
    1021              :     /// Estimated size of the image layer.
    1022        17080 :     pub(crate) fn estimated_size(&self) -> u64 {
    1023        17080 :         let inner = self.inner.as_ref().unwrap();
    1024        17080 :         inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64
    1025        17080 :     }
    1026              : 
    1027        17272 :     pub(crate) fn num_keys(&self) -> usize {
    1028        17272 :         self.inner.as_ref().unwrap().num_keys
    1029        17272 :     }
    1030              : 
    1031              :     ///
    1032              :     /// Finish writing the image layer.
    1033              :     ///
    1034          136 :     pub(crate) async fn finish(
    1035          136 :         mut self,
    1036          136 :         ctx: &RequestContext,
    1037          136 :     ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
    1038          136 :         self.inner.take().unwrap().finish(ctx, None).await
    1039          136 :     }
    1040              : 
    1041              :     /// Finish writing the image layer with an end key, used in [`super::batch_split_writer::SplitImageLayerWriter`]. The end key determines the end of the image layer's covered range and is exclusive.
    1042          572 :     pub(super) async fn finish_with_end_key(
    1043          572 :         mut self,
    1044          572 :         end_key: Key,
    1045          572 :         ctx: &RequestContext,
    1046          572 :     ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
    1047          572 :         self.inner.take().unwrap().finish(ctx, Some(end_key)).await
    1048          572 :     }
    1049              : }
    1050              : 
    1051              : impl Drop for ImageLayerWriter {
    1052         1168 :     fn drop(&mut self) {
    1053         1168 :         if let Some(inner) = self.inner.take() {
    1054          460 :             inner.blob_writer.into_inner().remove();
    1055          708 :         }
    1056         1168 :     }
    1057              : }
    1058              : 
    1059              : pub struct ImageLayerIterator<'a> {
    1060              :     image_layer: &'a ImageLayerInner,
    1061              :     ctx: &'a RequestContext,
    1062              :     planner: StreamingVectoredReadPlanner,
    1063              :     index_iter: DiskBtreeIterator<'a>,
    1064              :     key_values_batch: VecDeque<(Key, Lsn, Value)>,
    1065              :     is_end: bool,
    1066              : }
    1067              : 
    1068              : impl ImageLayerIterator<'_> {
    1069            0 :     pub(crate) fn layer_dbg_info(&self) -> String {
    1070            0 :         self.image_layer.layer_dbg_info()
    1071            0 :     }
    1072              : 
    1073              :     /// Retrieve a batch of key-value pairs into the iterator buffer.
    1074        38252 :     async fn next_batch(&mut self) -> anyhow::Result<()> {
    1075        38252 :         assert!(self.key_values_batch.is_empty());
    1076        38252 :         assert!(!self.is_end);
    1077              : 
    1078        38252 :         let plan = loop {
    1079        58072 :             if let Some(res) = self.index_iter.next().await {
    1080        57884 :                 let (raw_key, offset) = res?;
    1081        57884 :                 if let Some(batch_plan) = self.planner.handle(
    1082        57884 :                     Key::from_slice(&raw_key[..KEY_SIZE]),
    1083        57884 :                     self.image_layer.lsn,
    1084        57884 :                     offset,
    1085        57884 :                     true,
    1086        57884 :                 ) {
    1087        38064 :                     break batch_plan;
    1088        19820 :                 }
    1089              :             } else {
    1090          188 :                 self.is_end = true;
    1091          188 :                 let payload_end = self.image_layer.index_start_blk as u64 * PAGE_SZ as u64;
    1092          188 :                 if let Some(item) = self.planner.handle_range_end(payload_end) {
    1093          188 :                     break item;
    1094              :                 } else {
    1095            0 :                     return Ok(()); // TODO: a test case on empty iterator
    1096              :                 }
    1097              :             }
    1098              :         };
    1099        38252 :         let vectored_blob_reader = VectoredBlobReader::new(&self.image_layer.file);
    1100        38252 :         let mut next_batch = std::collections::VecDeque::new();
    1101        38252 :         let buf_size = plan.size();
    1102        38252 :         let buf = IoBufferMut::with_capacity(buf_size);
    1103        38252 :         let blobs_buf = vectored_blob_reader
    1104        38252 :             .read_blobs(&plan, buf, self.ctx)
    1105        38252 :             .await?;
    1106        38252 :         let view = BufView::new_slice(&blobs_buf.buf);
    1107        57828 :         for meta in blobs_buf.blobs.iter() {
    1108        57828 :             let img_buf = meta.read(&view).await?;
    1109        57828 :             next_batch.push_back((
    1110        57828 :                 meta.meta.key,
    1111        57828 :                 self.image_layer.lsn,
    1112        57828 :                 Value::Image(img_buf.into_bytes()),
    1113        57828 :             ));
    1114              :         }
    1115        38252 :         self.key_values_batch = next_batch;
    1116        38252 :         Ok(())
    1117        38252 :     }
    1118              : 
    1119        57608 :     pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
    1120        57608 :         if self.key_values_batch.is_empty() {
    1121        38404 :             if self.is_end {
    1122          320 :                 return Ok(None);
    1123        38084 :             }
    1124        38084 :             self.next_batch().await?;
    1125        19204 :         }
    1126        57288 :         Ok(Some(
    1127        57288 :             self.key_values_batch
    1128        57288 :                 .pop_front()
    1129        57288 :                 .expect("should not be empty"),
    1130        57288 :         ))
    1131        57608 :     }
    1132              : }
    1133              : 
    1134              : #[cfg(test)]
    1135              : mod test {
    1136              :     use std::{sync::Arc, time::Duration};
    1137              : 
    1138              :     use bytes::Bytes;
    1139              :     use itertools::Itertools;
    1140              :     use pageserver_api::{
    1141              :         key::Key,
    1142              :         shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize},
    1143              :         value::Value,
    1144              :     };
    1145              :     use utils::{
    1146              :         generation::Generation,
    1147              :         id::{TenantId, TimelineId},
    1148              :         lsn::Lsn,
    1149              :     };
    1150              : 
    1151              :     use crate::{
    1152              :         context::RequestContext,
    1153              :         tenant::{
    1154              :             config::TenantConf,
    1155              :             harness::{TenantHarness, TIMELINE_ID},
    1156              :             storage_layer::{Layer, ResidentLayer},
    1157              :             vectored_blob_io::StreamingVectoredReadPlanner,
    1158              :             Tenant, Timeline,
    1159              :         },
    1160              :         DEFAULT_PG_VERSION,
    1161              :     };
    1162              : 
    1163              :     use super::{ImageLayerIterator, ImageLayerWriter};
    1164              : 
    1165              :     #[tokio::test]
    1166            4 :     async fn image_layer_rewrite() {
    1167            4 :         let tenant_conf = TenantConf {
    1168            4 :             gc_period: Duration::ZERO,
    1169            4 :             compaction_period: Duration::ZERO,
    1170            4 :             ..TenantConf::default()
    1171            4 :         };
    1172            4 :         let tenant_id = TenantId::generate();
    1173            4 :         let mut gen = Generation::new(0xdead0001);
    1174           20 :         let mut get_next_gen = || {
    1175           20 :             let ret = gen;
    1176           20 :             gen = gen.next();
    1177           20 :             ret
    1178           20 :         };
    1179            4 :         // The LSN at which we will create an image layer to filter
    1180            4 :         let lsn = Lsn(0xdeadbeef0000);
    1181            4 :         let timeline_id = TimelineId::generate();
    1182            4 : 
    1183            4 :         //
    1184            4 :         // Create an unsharded parent with a layer.
    1185            4 :         //
    1186            4 : 
    1187            4 :         let harness = TenantHarness::create_custom(
    1188            4 :             "test_image_layer_rewrite--parent",
    1189            4 :             tenant_conf.clone(),
    1190            4 :             tenant_id,
    1191            4 :             ShardIdentity::unsharded(),
    1192            4 :             get_next_gen(),
    1193            4 :         )
    1194            4 :         .await
    1195            4 :         .unwrap();
    1196            4 :         let (tenant, ctx) = harness.load().await;
    1197            4 :         let timeline = tenant
    1198            4 :             .create_test_timeline(timeline_id, lsn, DEFAULT_PG_VERSION, &ctx)
    1199            4 :             .await
    1200            4 :             .unwrap();
    1201            4 : 
    1202            4 :         // This key range contains several 0x8000 page stripes, only one of which belongs to shard zero
    1203            4 :         let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
    1204            4 :         let input_end = Key::from_hex("000000067f00000001000000ae0000020000").unwrap();
    1205            4 :         let range = input_start..input_end;
    1206            4 : 
    1207            4 :         // Build an image layer to filter
    1208            4 :         let resident = {
    1209            4 :             let mut writer = ImageLayerWriter::new(
    1210            4 :                 harness.conf,
    1211            4 :                 timeline_id,
    1212            4 :                 harness.tenant_shard_id,
    1213            4 :                 &range,
    1214            4 :                 lsn,
    1215            4 :                 &ctx,
    1216            4 :             )
    1217            4 :             .await
    1218            4 :             .unwrap();
    1219            4 : 
    1220            4 :             let foo_img = Bytes::from_static(&[1, 2, 3, 4]);
    1221            4 :             let mut key = range.start;
    1222       524292 :             while key < range.end {
    1223       524288 :                 writer.put_image(key, foo_img.clone(), &ctx).await.unwrap();
    1224       524288 : 
    1225       524288 :                 key = key.next();
    1226            4 :             }
    1227            4 :             let (desc, path) = writer.finish(&ctx).await.unwrap();
    1228            4 :             Layer::finish_creating(tenant.conf, &timeline, desc, &path).unwrap()
    1229            4 :         };
    1230            4 :         let original_size = resident.metadata().file_size;
    1231            4 : 
    1232            4 :         //
    1233            4 :         // Create child shards and do the rewrite, exercising filter().
    1234            4 :         // TODO: abstraction in TenantHarness for splits.
    1235            4 :         //
    1236            4 : 
    1237            4 :         // Filter for various shards: this exercises cases like values at start of key range, end of key
    1238            4 :         // range, middle of key range.
    1239            4 :         let shard_count = ShardCount::new(4);
    1240           16 :         for shard_number in 0..shard_count.count() {
    1241            4 :             //
    1242            4 :             // mimic the shard split
    1243            4 :             //
    1244           16 :             let shard_identity = ShardIdentity::new(
    1245           16 :                 ShardNumber(shard_number),
    1246           16 :                 shard_count,
    1247           16 :                 ShardStripeSize(0x8000),
    1248           16 :             )
    1249           16 :             .unwrap();
    1250           16 :             let harness = TenantHarness::create_custom(
    1251           16 :                 Box::leak(Box::new(format!(
    1252           16 :                     "test_image_layer_rewrite--child{}",
    1253           16 :                     shard_identity.shard_slug()
    1254           16 :                 ))),
    1255           16 :                 tenant_conf.clone(),
    1256           16 :                 tenant_id,
    1257           16 :                 shard_identity,
    1258           16 :                 // NB: in reality, the shards would each fork off their own gen number sequence from the parent.
    1259           16 :                 // But here, all we care about is that the gen number is unique.
    1260           16 :                 get_next_gen(),
    1261           16 :             )
    1262           16 :             .await
    1263           16 :             .unwrap();
    1264           16 :             let (tenant, ctx) = harness.load().await;
    1265           16 :             let timeline = tenant
    1266           16 :                 .create_test_timeline(timeline_id, lsn, DEFAULT_PG_VERSION, &ctx)
    1267           16 :                 .await
    1268           16 :                 .unwrap();
    1269            4 : 
    1270            4 :             //
    1271            4 :             // use filter() and make assertions
    1272            4 :             //
    1273            4 : 
    1274           16 :             let mut filtered_writer = ImageLayerWriter::new(
    1275           16 :                 harness.conf,
    1276           16 :                 timeline_id,
    1277           16 :                 harness.tenant_shard_id,
    1278           16 :                 &range,
    1279           16 :                 lsn,
    1280           16 :                 &ctx,
    1281           16 :             )
    1282           16 :             .await
    1283           16 :             .unwrap();
    1284            4 : 
    1285           16 :             let wrote_keys = resident
    1286           16 :                 .filter(&shard_identity, &mut filtered_writer, &ctx)
    1287           16 :                 .await
    1288           16 :                 .unwrap();
    1289           16 :             let replacement = if wrote_keys > 0 {
    1290           12 :                 let (desc, path) = filtered_writer.finish(&ctx).await.unwrap();
    1291           12 :                 let resident = Layer::finish_creating(tenant.conf, &timeline, desc, &path).unwrap();
    1292           12 :                 Some(resident)
    1293            4 :             } else {
    1294            4 :                 None
    1295            4 :             };
    1296            4 : 
    1297            4 :             // This exact size and those below will need updating as/when the layer encoding changes, but
    1298            4 :             // should be deterministic for a given version of the format, as we used no randomness generating the input.
    1299           16 :             assert_eq!(original_size, 1597440);
    1300            4 : 
    1301           16 :             match shard_number {
    1302            4 :                 0 => {
    1303            4 :                     // We should have written out just one stripe for our shard identity
    1304            4 :                     assert_eq!(wrote_keys, 0x8000);
    1305            4 :                     let replacement = replacement.unwrap();
    1306            4 : 
    1307            4 :                     // We should have dropped some of the data
    1308            4 :                     assert!(replacement.metadata().file_size < original_size);
    1309            4 :                     assert!(replacement.metadata().file_size > 0);
    1310            4 : 
    1311            4 :                     // Assert that we dropped ~3/4 of the data.
    1312            4 :                     assert_eq!(replacement.metadata().file_size, 417792);
    1313            4 :                 }
    1314            4 :                 1 => {
    1315            4 :                     // Shard 1 has no keys in our input range
    1316            4 :                     assert_eq!(wrote_keys, 0x0);
    1317            4 :                     assert!(replacement.is_none());
    1318            4 :                 }
    1319            4 :                 2 => {
    1320            4 :                     // Shard 2 has one stripes in the input range
    1321            4 :                     assert_eq!(wrote_keys, 0x8000);
    1322            4 :                     let replacement = replacement.unwrap();
    1323            4 :                     assert!(replacement.metadata().file_size < original_size);
    1324            4 :                     assert!(replacement.metadata().file_size > 0);
    1325            4 :                     assert_eq!(replacement.metadata().file_size, 417792);
    1326            4 :                 }
    1327            4 :                 3 => {
    1328            4 :                     // Shard 3 has two stripes in the input range
    1329            4 :                     assert_eq!(wrote_keys, 0x10000);
    1330            4 :                     let replacement = replacement.unwrap();
    1331            4 :                     assert!(replacement.metadata().file_size < original_size);
    1332            4 :                     assert!(replacement.metadata().file_size > 0);
    1333            4 :                     assert_eq!(replacement.metadata().file_size, 811008);
    1334            4 :                 }
    1335            4 :                 _ => unreachable!(),
    1336            4 :             }
    1337            4 :         }
    1338            4 :     }
    1339              : 
    1340            4 :     async fn produce_image_layer(
    1341            4 :         tenant: &Tenant,
    1342            4 :         tline: &Arc<Timeline>,
    1343            4 :         mut images: Vec<(Key, Bytes)>,
    1344            4 :         lsn: Lsn,
    1345            4 :         ctx: &RequestContext,
    1346            4 :     ) -> anyhow::Result<ResidentLayer> {
    1347            4 :         images.sort();
    1348            4 :         let (key_start, _) = images.first().unwrap();
    1349            4 :         let (key_last, _) = images.last().unwrap();
    1350            4 :         let key_end = key_last.next();
    1351            4 :         let key_range = *key_start..key_end;
    1352            4 :         let mut writer = ImageLayerWriter::new(
    1353            4 :             tenant.conf,
    1354            4 :             tline.timeline_id,
    1355            4 :             tenant.tenant_shard_id,
    1356            4 :             &key_range,
    1357            4 :             lsn,
    1358            4 :             ctx,
    1359            4 :         )
    1360            4 :         .await?;
    1361              : 
    1362         4004 :         for (key, img) in images {
    1363         4000 :             writer.put_image(key, img, ctx).await?;
    1364              :         }
    1365            4 :         let (desc, path) = writer.finish(ctx).await?;
    1366            4 :         let img_layer = Layer::finish_creating(tenant.conf, tline, desc, &path)?;
    1367              : 
    1368            4 :         Ok::<_, anyhow::Error>(img_layer)
    1369            4 :     }
    1370              : 
    1371           56 :     async fn assert_img_iter_equal(
    1372           56 :         img_iter: &mut ImageLayerIterator<'_>,
    1373           56 :         expect: &[(Key, Bytes)],
    1374           56 :         expect_lsn: Lsn,
    1375           56 :     ) {
    1376           56 :         let mut expect_iter = expect.iter();
    1377              :         loop {
    1378        56056 :             let o1 = img_iter.next().await.unwrap();
    1379        56056 :             let o2 = expect_iter.next();
    1380        56056 :             match (o1, o2) {
    1381           56 :                 (None, None) => break,
    1382        56000 :                 (Some((k1, l1, v1)), Some((k2, i2))) => {
    1383        56000 :                     let Value::Image(i1) = v1 else {
    1384            0 :                         panic!("expect Value::Image")
    1385              :                     };
    1386        56000 :                     assert_eq!(&k1, k2);
    1387        56000 :                     assert_eq!(l1, expect_lsn);
    1388        56000 :                     assert_eq!(&i1, i2);
    1389              :                 }
    1390            0 :                 (o1, o2) => panic!("iterators length mismatch: {:?}, {:?}", o1, o2),
    1391              :             }
    1392              :         }
    1393           56 :     }
    1394              : 
    1395              :     #[tokio::test]
    1396            4 :     async fn image_layer_iterator() {
    1397            4 :         let harness = TenantHarness::create("image_layer_iterator").await.unwrap();
    1398            4 :         let (tenant, ctx) = harness.load().await;
    1399            4 : 
    1400            4 :         let tline = tenant
    1401            4 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    1402            4 :             .await
    1403            4 :             .unwrap();
    1404            4 : 
    1405         4000 :         fn get_key(id: u32) -> Key {
    1406         4000 :             let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
    1407         4000 :             key.field6 = id;
    1408         4000 :             key
    1409         4000 :         }
    1410            4 :         const N: usize = 1000;
    1411            4 :         let test_imgs = (0..N)
    1412         4000 :             .map(|idx| (get_key(idx as u32), Bytes::from(format!("img{idx:05}"))))
    1413            4 :             .collect_vec();
    1414            4 :         let resident_layer =
    1415            4 :             produce_image_layer(&tenant, &tline, test_imgs.clone(), Lsn(0x10), &ctx)
    1416            4 :                 .await
    1417            4 :                 .unwrap();
    1418            4 :         let img_layer = resident_layer.get_as_image(&ctx).await.unwrap();
    1419           12 :         for max_read_size in [1, 1024] {
    1420           64 :             for batch_size in [1, 2, 4, 8, 3, 7, 13] {
    1421           56 :                 println!("running with batch_size={batch_size} max_read_size={max_read_size}");
    1422           56 :                 // Test if the batch size is correctly determined
    1423           56 :                 let mut iter = img_layer.iter(&ctx);
    1424           56 :                 iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
    1425           56 :                 let mut num_items = 0;
    1426          224 :                 for _ in 0..3 {
    1427          168 :                     iter.next_batch().await.unwrap();
    1428          168 :                     num_items += iter.key_values_batch.len();
    1429          168 :                     if max_read_size == 1 {
    1430            4 :                         // every key should be a batch b/c the value is larger than max_read_size
    1431           84 :                         assert_eq!(iter.key_values_batch.len(), 1);
    1432            4 :                     } else {
    1433           84 :                         assert!(iter.key_values_batch.len() <= batch_size);
    1434            4 :                     }
    1435          168 :                     if num_items >= N {
    1436            4 :                         break;
    1437          168 :                     }
    1438          168 :                     iter.key_values_batch.clear();
    1439            4 :                 }
    1440            4 :                 // Test if the result is correct
    1441           56 :                 let mut iter = img_layer.iter(&ctx);
    1442           56 :                 iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
    1443           56 :                 assert_img_iter_equal(&mut iter, &test_imgs, Lsn(0x10)).await;
    1444            4 :             }
    1445            4 :         }
    1446            4 :     }
    1447              : }
        

Generated by: LCOV version 2.1-beta