LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - image_layer.rs (source / functions) Coverage Total Hit
Test: 53536e7d038dd1afd98124ffab7571882048d4d5.info Lines: 79.9 % 1024 818
Test Date: 2025-04-24 12:00:37 Functions: 59.3 % 91 54

            Line data    Source code
       1              : //! An ImageLayer represents an image or a snapshot of a key-range at
       2              : //! one particular LSN.
       3              : //!
       4              : //! It contains an image of all key-value pairs in its key-range. Any key
       5              : //! that falls into the image layer's range but does not exist in the layer,
       6              : //! does not exist.
       7              : //!
       8              : //! An image layer is stored in a file on disk. The file is stored in
       9              : //! timelines/<timeline_id> directory.  Currently, there are no
      10              : //! subdirectories, and each image layer file is named like this:
      11              : //!
      12              : //! ```text
      13              : //!    <key start>-<key end>__<LSN>
      14              : //! ```
      15              : //!
      16              : //! For example:
      17              : //!
      18              : //! ```text
      19              : //!    000000067F000032BE0000400000000070B6-000000067F000032BE0000400000000080B6__00000000346BC568
      20              : //! ```
      21              : //!
      22              : //! Every image layer file consists of three parts: "summary",
      23              : //! "index", and "values".  The summary is a fixed size header at the
      24              : //! beginning of the file, and it contains basic information about the
      25              : //! layer, and offsets to the other parts. The "index" is a B-tree,
      26              : //! mapping from Key to an offset in the "values" part.  The
      27              : //! actual page images are stored in the "values" part.
      28              : use std::collections::{HashMap, VecDeque};
      29              : use std::fs::File;
      30              : use std::io::SeekFrom;
      31              : use std::ops::Range;
      32              : use std::os::unix::prelude::FileExt;
      33              : use std::str::FromStr;
      34              : use std::sync::Arc;
      35              : 
      36              : use anyhow::{Context, Result, bail, ensure};
      37              : use bytes::Bytes;
      38              : use camino::{Utf8Path, Utf8PathBuf};
      39              : use hex;
      40              : use itertools::Itertools;
      41              : use pageserver_api::config::MaxVectoredReadBytes;
      42              : use pageserver_api::key::{DBDIR_KEY, KEY_SIZE, Key};
      43              : use pageserver_api::keyspace::KeySpace;
      44              : use pageserver_api::shard::{ShardIdentity, TenantShardId};
      45              : use pageserver_api::value::Value;
      46              : use rand::Rng;
      47              : use rand::distributions::Alphanumeric;
      48              : use serde::{Deserialize, Serialize};
      49              : use tokio::sync::OnceCell;
      50              : use tokio_stream::StreamExt;
      51              : use tokio_util::sync::CancellationToken;
      52              : use tracing::*;
      53              : use utils::bin_ser::BeSer;
      54              : use utils::id::{TenantId, TimelineId};
      55              : use utils::lsn::Lsn;
      56              : 
      57              : use super::layer_name::ImageLayerName;
      58              : use super::{
      59              :     AsLayerDesc, LayerName, OnDiskValue, OnDiskValueIo, PersistentLayerDesc, ResidentLayer,
      60              :     ValuesReconstructState,
      61              : };
      62              : use crate::config::PageServerConf;
      63              : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
      64              : use crate::page_cache::{self, FileId, PAGE_SZ};
      65              : use crate::tenant::blob_io::BlobWriter;
      66              : use crate::tenant::block_io::{BlockBuf, FileBlockReader};
      67              : use crate::tenant::disk_btree::{
      68              :     DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
      69              : };
      70              : use crate::tenant::timeline::GetVectoredError;
      71              : use crate::tenant::vectored_blob_io::{
      72              :     BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
      73              :     VectoredReadPlanner,
      74              : };
      75              : use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
      76              : use crate::virtual_file::{self, IoBufferMut, MaybeFatalIo, VirtualFile};
      77              : use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
      78              : 
      79              : ///
      80              : /// Header stored in the beginning of the file
      81              : ///
      82              : /// After this comes the 'values' part, starting on block 1. After that,
      83              : /// the 'index' starts at the block indicated by 'index_start_blk'
      84              : ///
      85            0 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
      86              : pub struct Summary {
      87              :     /// Magic value to identify this as a neon image file. Always IMAGE_FILE_MAGIC.
      88              :     pub magic: u16,
      89              :     pub format_version: u16,
      90              : 
      91              :     pub tenant_id: TenantId,
      92              :     pub timeline_id: TimelineId,
      93              :     pub key_range: Range<Key>,
      94              :     pub lsn: Lsn,
      95              : 
      96              :     /// Block number where the 'index' part of the file begins.
      97              :     pub index_start_blk: u32,
      98              :     /// Block within the 'index', where the B-tree root page is stored
      99              :     pub index_root_blk: u32,
     100              :     // the 'values' part starts after the summary header, on block 1.
     101              : }
     102              : 
     103              : impl From<&ImageLayer> for Summary {
     104            0 :     fn from(layer: &ImageLayer) -> Self {
     105            0 :         Self::expected(
     106            0 :             layer.desc.tenant_shard_id.tenant_id,
     107            0 :             layer.desc.timeline_id,
     108            0 :             layer.desc.key_range.clone(),
     109            0 :             layer.lsn,
     110            0 :         )
     111            0 :     }
     112              : }
     113              : 
     114              : impl Summary {
     115          900 :     pub(super) fn expected(
     116          900 :         tenant_id: TenantId,
     117          900 :         timeline_id: TimelineId,
     118          900 :         key_range: Range<Key>,
     119          900 :         lsn: Lsn,
     120          900 :     ) -> Self {
     121          900 :         Self {
     122          900 :             magic: IMAGE_FILE_MAGIC,
     123          900 :             format_version: STORAGE_FORMAT_VERSION,
     124          900 :             tenant_id,
     125          900 :             timeline_id,
     126          900 :             key_range,
     127          900 :             lsn,
     128          900 : 
     129          900 :             index_start_blk: 0,
     130          900 :             index_root_blk: 0,
     131          900 :         }
     132          900 :     }
     133              : }
     134              : 
     135              : /// This is used only from `pagectl`. Within pageserver, all layers are
     136              : /// [`crate::tenant::storage_layer::Layer`], which can hold an [`ImageLayerInner`].
     137              : pub struct ImageLayer {
     138              :     path: Utf8PathBuf,
     139              :     pub desc: PersistentLayerDesc,
     140              :     // This entry contains an image of all pages as of this LSN, should be the same as desc.lsn
     141              :     pub lsn: Lsn,
     142              :     inner: OnceCell<ImageLayerInner>,
     143              : }
     144              : 
     145              : impl std::fmt::Debug for ImageLayer {
     146            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     147              :         use super::RangeDisplayDebug;
     148              : 
     149            0 :         f.debug_struct("ImageLayer")
     150            0 :             .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
     151            0 :             .field("file_size", &self.desc.file_size)
     152            0 :             .field("lsn", &self.lsn)
     153            0 :             .field("inner", &self.inner)
     154            0 :             .finish()
     155            0 :     }
     156              : }
     157              : 
     158              : /// ImageLayer is the in-memory data structure associated with an on-disk image
     159              : /// file.
     160              : pub struct ImageLayerInner {
     161              :     // values copied from summary
     162              :     index_start_blk: u32,
     163              :     index_root_blk: u32,
     164              : 
     165              :     key_range: Range<Key>,
     166              :     lsn: Lsn,
     167              : 
     168              :     file: Arc<VirtualFile>,
     169              :     file_id: FileId,
     170              : 
     171              :     max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
     172              : }
     173              : 
     174              : impl ImageLayerInner {
     175            0 :     pub(crate) fn layer_dbg_info(&self) -> String {
     176            0 :         format!(
     177            0 :             "image {}..{} {}",
     178            0 :             self.key_range().start,
     179            0 :             self.key_range().end,
     180            0 :             self.lsn()
     181            0 :         )
     182            0 :     }
     183              : }
     184              : 
     185              : impl std::fmt::Debug for ImageLayerInner {
     186            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     187            0 :         f.debug_struct("ImageLayerInner")
     188            0 :             .field("index_start_blk", &self.index_start_blk)
     189            0 :             .field("index_root_blk", &self.index_root_blk)
     190            0 :             .finish()
     191            0 :     }
     192              : }
     193              : 
     194              : impl ImageLayerInner {
     195            0 :     pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
     196            0 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
     197            0 :         let tree_reader = DiskBtreeReader::<_, KEY_SIZE>::new(
     198            0 :             self.index_start_blk,
     199            0 :             self.index_root_blk,
     200            0 :             block_reader,
     201            0 :         );
     202            0 : 
     203            0 :         tree_reader.dump(ctx).await?;
     204              : 
     205            0 :         tree_reader
     206            0 :             .visit(
     207            0 :                 &[0u8; KEY_SIZE],
     208            0 :                 VisitDirection::Forwards,
     209            0 :                 |key, value| {
     210            0 :                     println!("key: {} offset {}", hex::encode(key), value);
     211            0 :                     true
     212            0 :                 },
     213            0 :                 ctx,
     214            0 :             )
     215            0 :             .await?;
     216              : 
     217            0 :         Ok(())
     218            0 :     }
     219              : }
     220              : 
     221              : /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
     222              : impl std::fmt::Display for ImageLayer {
     223            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     224            0 :         write!(f, "{}", self.layer_desc().short_id())
     225            0 :     }
     226              : }
     227              : 
     228              : impl AsLayerDesc for ImageLayer {
     229            0 :     fn layer_desc(&self) -> &PersistentLayerDesc {
     230            0 :         &self.desc
     231            0 :     }
     232              : }
     233              : 
     234              : impl ImageLayer {
     235            0 :     pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
     236            0 :         self.desc.dump();
     237            0 : 
     238            0 :         if !verbose {
     239            0 :             return Ok(());
     240            0 :         }
     241              : 
     242            0 :         let inner = self.load(ctx).await?;
     243              : 
     244            0 :         inner.dump(ctx).await?;
     245              : 
     246            0 :         Ok(())
     247            0 :     }
     248              : 
     249         3744 :     fn temp_path_for(
     250         3744 :         conf: &PageServerConf,
     251         3744 :         timeline_id: TimelineId,
     252         3744 :         tenant_shard_id: TenantShardId,
     253         3744 :         fname: &ImageLayerName,
     254         3744 :     ) -> Utf8PathBuf {
     255         3744 :         let rand_string: String = rand::thread_rng()
     256         3744 :             .sample_iter(&Alphanumeric)
     257         3744 :             .take(8)
     258         3744 :             .map(char::from)
     259         3744 :             .collect();
     260         3744 : 
     261         3744 :         conf.timeline_path(&tenant_shard_id, &timeline_id)
     262         3744 :             .join(format!("{fname}.{rand_string}.{TEMP_FILE_SUFFIX}"))
     263         3744 :     }
     264              : 
     265              :     ///
     266              :     /// Open the underlying file and read the metadata into memory, if it's
     267              :     /// not loaded already.
     268              :     ///
     269            0 :     async fn load(&self, ctx: &RequestContext) -> Result<&ImageLayerInner> {
     270            0 :         self.inner
     271            0 :             .get_or_try_init(|| self.load_inner(ctx))
     272            0 :             .await
     273            0 :             .with_context(|| format!("Failed to load image layer {}", self.path()))
     274            0 :     }
     275              : 
     276            0 :     async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
     277            0 :         let path = self.path();
     278              : 
     279            0 :         let loaded =
     280            0 :             ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, None, ctx).await?;
     281              : 
     282              :         // not production code
     283            0 :         let actual_layer_name = LayerName::from_str(path.file_name().unwrap()).unwrap();
     284            0 :         let expected_layer_name = self.layer_desc().layer_name();
     285            0 : 
     286            0 :         if actual_layer_name != expected_layer_name {
     287            0 :             println!("warning: filename does not match what is expected from in-file summary");
     288            0 :             println!("actual: {:?}", actual_layer_name.to_string());
     289            0 :             println!("expected: {:?}", expected_layer_name.to_string());
     290            0 :         }
     291              : 
     292            0 :         Ok(loaded)
     293            0 :     }
     294              : 
     295              :     /// Create an ImageLayer struct representing an existing file on disk.
     296              :     ///
     297              :     /// This variant is only used for debugging purposes, by the 'pagectl' binary.
     298            0 :     pub fn new_for_path(path: &Utf8Path, file: File) -> Result<ImageLayer> {
     299            0 :         let mut summary_buf = vec![0; PAGE_SZ];
     300            0 :         file.read_exact_at(&mut summary_buf, 0)?;
     301            0 :         let summary = Summary::des_prefix(&summary_buf)?;
     302            0 :         let metadata = file
     303            0 :             .metadata()
     304            0 :             .context("get file metadata to determine size")?;
     305              : 
     306              :         // This function is never used for constructing layers in a running pageserver,
     307              :         // so it does not need an accurate TenantShardId.
     308            0 :         let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
     309            0 : 
     310            0 :         Ok(ImageLayer {
     311            0 :             path: path.to_path_buf(),
     312            0 :             desc: PersistentLayerDesc::new_img(
     313            0 :                 tenant_shard_id,
     314            0 :                 summary.timeline_id,
     315            0 :                 summary.key_range,
     316            0 :                 summary.lsn,
     317            0 :                 metadata.len(),
     318            0 :             ), // Now we assume image layer ALWAYS covers the full range. This may change in the future.
     319            0 :             lsn: summary.lsn,
     320            0 :             inner: OnceCell::new(),
     321            0 :         })
     322            0 :     }
     323              : 
     324            0 :     fn path(&self) -> Utf8PathBuf {
     325            0 :         self.path.clone()
     326            0 :     }
     327              : }
     328              : 
     329              : #[derive(thiserror::Error, Debug)]
     330              : pub enum RewriteSummaryError {
     331              :     #[error("magic mismatch")]
     332              :     MagicMismatch,
     333              :     #[error(transparent)]
     334              :     Other(#[from] anyhow::Error),
     335              : }
     336              : 
     337              : impl From<std::io::Error> for RewriteSummaryError {
     338            0 :     fn from(e: std::io::Error) -> Self {
     339            0 :         Self::Other(anyhow::anyhow!(e))
     340            0 :     }
     341              : }
     342              : 
     343              : impl ImageLayer {
     344            0 :     pub async fn rewrite_summary<F>(
     345            0 :         path: &Utf8Path,
     346            0 :         rewrite: F,
     347            0 :         ctx: &RequestContext,
     348            0 :     ) -> Result<(), RewriteSummaryError>
     349            0 :     where
     350            0 :         F: Fn(Summary) -> Summary,
     351            0 :     {
     352            0 :         let mut file = VirtualFile::open_with_options(
     353            0 :             path,
     354            0 :             virtual_file::OpenOptions::new().read(true).write(true),
     355            0 :             ctx,
     356            0 :         )
     357            0 :         .await
     358            0 :         .with_context(|| format!("Failed to open file '{}'", path))?;
     359            0 :         let file_id = page_cache::next_file_id();
     360            0 :         let block_reader = FileBlockReader::new(&file, file_id);
     361            0 :         let summary_blk = block_reader.read_blk(0, ctx).await?;
     362            0 :         let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
     363            0 :         if actual_summary.magic != IMAGE_FILE_MAGIC {
     364            0 :             return Err(RewriteSummaryError::MagicMismatch);
     365            0 :         }
     366            0 : 
     367            0 :         let new_summary = rewrite(actual_summary);
     368            0 : 
     369            0 :         let mut buf = Vec::with_capacity(PAGE_SZ);
     370            0 :         // TODO: could use smallvec here but it's a pain with Slice<T>
     371            0 :         Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
     372            0 :         file.seek(SeekFrom::Start(0)).await?;
     373            0 :         let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
     374            0 :         res?;
     375            0 :         Ok(())
     376            0 :     }
     377              : }
     378              : 
     379              : impl ImageLayerInner {
     380          840 :     pub(crate) fn key_range(&self) -> &Range<Key> {
     381          840 :         &self.key_range
     382          840 :     }
     383              : 
     384          840 :     pub(crate) fn lsn(&self) -> Lsn {
     385          840 :         self.lsn
     386          840 :     }
     387              : 
     388          900 :     pub(super) async fn load(
     389          900 :         path: &Utf8Path,
     390          900 :         lsn: Lsn,
     391          900 :         summary: Option<Summary>,
     392          900 :         max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
     393          900 :         ctx: &RequestContext,
     394          900 :     ) -> anyhow::Result<Self> {
     395          900 :         let file = Arc::new(
     396          900 :             VirtualFile::open_v2(path, ctx)
     397          900 :                 .await
     398          900 :                 .context("open layer file")?,
     399              :         );
     400          900 :         let file_id = page_cache::next_file_id();
     401          900 :         let block_reader = FileBlockReader::new(&file, file_id);
     402          900 :         let summary_blk = block_reader
     403          900 :             .read_blk(0, ctx)
     404          900 :             .await
     405          900 :             .context("read first block")?;
     406              : 
     407              :         // length is the only way how this could fail, so it's not actually likely at all unless
     408              :         // read_blk returns wrong sized block.
     409              :         //
     410              :         // TODO: confirm and make this into assertion
     411          900 :         let actual_summary =
     412          900 :             Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
     413              : 
     414          900 :         if let Some(mut expected_summary) = summary {
     415              :             // production code path
     416          900 :             expected_summary.index_start_blk = actual_summary.index_start_blk;
     417          900 :             expected_summary.index_root_blk = actual_summary.index_root_blk;
     418          900 :             // mask out the timeline_id, but still require the layers to be from the same tenant
     419          900 :             expected_summary.timeline_id = actual_summary.timeline_id;
     420          900 : 
     421          900 :             if actual_summary != expected_summary {
     422            0 :                 bail!(
     423            0 :                     "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
     424            0 :                     actual_summary,
     425            0 :                     expected_summary
     426            0 :                 );
     427          900 :             }
     428            0 :         }
     429              : 
     430          900 :         Ok(ImageLayerInner {
     431          900 :             index_start_blk: actual_summary.index_start_blk,
     432          900 :             index_root_blk: actual_summary.index_root_blk,
     433          900 :             lsn,
     434          900 :             file,
     435          900 :             file_id,
     436          900 :             max_vectored_read_bytes,
     437          900 :             key_range: actual_summary.key_range,
     438          900 :         })
     439          900 :     }
     440              : 
     441              :     // Look up the keys in the provided keyspace and update
     442              :     // the reconstruct state with whatever is found.
     443       181512 :     pub(super) async fn get_values_reconstruct_data(
     444       181512 :         &self,
     445       181512 :         this: ResidentLayer,
     446       181512 :         keyspace: KeySpace,
     447       181512 :         reconstruct_state: &mut ValuesReconstructState,
     448       181512 :         ctx: &RequestContext,
     449       181512 :     ) -> Result<(), GetVectoredError> {
     450       181512 :         let reads = self
     451       181512 :             .plan_reads(keyspace, None, ctx)
     452       181512 :             .await
     453       181512 :             .map_err(GetVectoredError::Other)?;
     454              : 
     455       181512 :         self.do_reads_and_update_state(this, reads, reconstruct_state, ctx)
     456       181512 :             .await;
     457              : 
     458       181512 :         reconstruct_state.on_image_layer_visited(&self.key_range);
     459       181512 : 
     460       181512 :         Ok(())
     461       181512 :     }
     462              : 
     463              :     /// Traverse the layer's index to build read operations on the overlap of the input keyspace
     464              :     /// and the keys in this layer.
     465              :     ///
     466              :     /// If shard_identity is provided, it will be used to filter keys down to those stored on
     467              :     /// this shard.
     468       181560 :     async fn plan_reads(
     469       181560 :         &self,
     470       181560 :         keyspace: KeySpace,
     471       181560 :         shard_identity: Option<&ShardIdentity>,
     472       181560 :         ctx: &RequestContext,
     473       181560 :     ) -> anyhow::Result<Vec<VectoredRead>> {
     474       181560 :         let mut planner = VectoredReadPlanner::new(
     475       181560 :             self.max_vectored_read_bytes
     476       181560 :                 .expect("Layer is loaded with max vectored bytes config")
     477       181560 :                 .0
     478       181560 :                 .into(),
     479       181560 :         );
     480       181560 : 
     481       181560 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
     482       181560 :         let tree_reader =
     483       181560 :             DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
     484       181560 : 
     485       181560 :         let ctx = RequestContextBuilder::from(ctx)
     486       181560 :             .page_content_kind(PageContentKind::ImageLayerBtreeNode)
     487       181560 :             .attached_child();
     488              : 
     489       269148 :         for range in keyspace.ranges.iter() {
     490       269148 :             let mut range_end_handled = false;
     491       269148 :             let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
     492       269148 :             range.start.write_to_byte_slice(&mut search_key);
     493       269148 : 
     494       269148 :             let index_stream = tree_reader.clone().into_stream(&search_key, &ctx);
     495       269148 :             let mut index_stream = std::pin::pin!(index_stream);
     496              : 
     497      1209540 :             while let Some(index_entry) = index_stream.next().await {
     498      1204944 :                 let (raw_key, offset) = index_entry?;
     499              : 
     500      1204944 :                 let key = Key::from_slice(&raw_key[..KEY_SIZE]);
     501      1204944 :                 assert!(key >= range.start);
     502              : 
     503      1204944 :                 let flag = if let Some(shard_identity) = shard_identity {
     504       393216 :                     if shard_identity.is_key_disposable(&key) {
     505       294912 :                         BlobFlag::Ignore
     506              :                     } else {
     507        98304 :                         BlobFlag::None
     508              :                     }
     509              :                 } else {
     510       811728 :                     BlobFlag::None
     511              :                 };
     512              : 
     513      1204944 :                 if key >= range.end {
     514       264552 :                     planner.handle_range_end(offset);
     515       264552 :                     range_end_handled = true;
     516       264552 :                     break;
     517       940392 :                 } else {
     518       940392 :                     planner.handle(key, self.lsn, offset, flag);
     519       940392 :                 }
     520              :             }
     521              : 
     522       269148 :             if !range_end_handled {
     523         4596 :                 let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64;
     524         4596 :                 planner.handle_range_end(payload_end);
     525       264552 :             }
     526              :         }
     527              : 
     528       181560 :         Ok(planner.finish())
     529       181560 :     }
     530              : 
     531              :     /// Given a key range, select the parts of that range that should be retained by the ShardIdentity,
     532              :     /// then execute vectored GET operations, passing the results of all read keys into the writer.
     533           48 :     pub(super) async fn filter(
     534           48 :         &self,
     535           48 :         shard_identity: &ShardIdentity,
     536           48 :         writer: &mut ImageLayerWriter,
     537           48 :         ctx: &RequestContext,
     538           48 :     ) -> anyhow::Result<usize> {
     539              :         // Fragment the range into the regions owned by this ShardIdentity
     540           48 :         let plan = self
     541           48 :             .plan_reads(
     542           48 :                 KeySpace {
     543           48 :                     // If asked for the total key space, plan_reads will give us all the keys in the layer
     544           48 :                     ranges: vec![Key::MIN..Key::MAX],
     545           48 :                 },
     546           48 :                 Some(shard_identity),
     547           48 :                 ctx,
     548           48 :             )
     549           48 :             .await?;
     550              : 
     551           48 :         let vectored_blob_reader = VectoredBlobReader::new(&self.file);
     552           48 :         let mut key_count = 0;
     553           48 :         for read in plan.into_iter() {
     554           48 :             let buf_size = read.size();
     555           48 : 
     556           48 :             let buf = IoBufferMut::with_capacity(buf_size);
     557           48 :             let blobs_buf = vectored_blob_reader.read_blobs(&read, buf, ctx).await?;
     558              : 
     559           48 :             let view = BufView::new_slice(&blobs_buf.buf);
     560              : 
     561        98304 :             for meta in blobs_buf.blobs.iter() {
     562              :                 // Just read the raw header+data and pass it through to the target layer, without
     563              :                 // decoding and recompressing it.
     564        98304 :                 let raw = meta.raw_with_header(&view);
     565        98304 :                 key_count += 1;
     566        98304 :                 writer
     567        98304 :                     .put_image_raw(meta.meta.key, raw.into_bytes(), ctx)
     568        98304 :                     .await
     569        98304 :                     .context(format!("Storing key {}", meta.meta.key))?;
     570              :             }
     571              :         }
     572              : 
     573           48 :         Ok(key_count)
     574           48 :     }
     575              : 
     576       181512 :     async fn do_reads_and_update_state(
     577       181512 :         &self,
     578       181512 :         this: ResidentLayer,
     579       181512 :         reads: Vec<VectoredRead>,
     580       181512 :         reconstruct_state: &mut ValuesReconstructState,
     581       181512 :         ctx: &RequestContext,
     582       181512 :     ) {
     583       181512 :         let max_vectored_read_bytes = self
     584       181512 :             .max_vectored_read_bytes
     585       181512 :             .expect("Layer is loaded with max vectored bytes config")
     586       181512 :             .0
     587       181512 :             .into();
     588              : 
     589       207264 :         for read in reads.into_iter() {
     590       207264 :             let mut ios: HashMap<(Key, Lsn), OnDiskValueIo> = Default::default();
     591       547176 :             for (_, blob_meta) in read.blobs_at.as_slice() {
     592       547176 :                 let io = reconstruct_state.update_key(&blob_meta.key, blob_meta.lsn, true);
     593       547176 :                 ios.insert((blob_meta.key, blob_meta.lsn), io);
     594       547176 :             }
     595              : 
     596       207264 :             let buf_size = read.size();
     597       207264 : 
     598       207264 :             if buf_size > max_vectored_read_bytes {
     599              :                 // If the read is oversized, it should only contain one key.
     600            0 :                 let offenders = read
     601            0 :                     .blobs_at
     602            0 :                     .as_slice()
     603            0 :                     .iter()
     604            0 :                     .filter_map(|(_, blob_meta)| {
     605            0 :                         if blob_meta.key.is_rel_dir_key()
     606            0 :                             || blob_meta.key == DBDIR_KEY
     607            0 :                             || blob_meta.key.is_aux_file_key()
     608              :                         {
     609              :                             // The size of values for these keys is unbounded and can
     610              :                             // grow very large in pathological cases.
     611            0 :                             None
     612              :                         } else {
     613            0 :                             Some(format!("{}@{}", blob_meta.key, blob_meta.lsn))
     614              :                         }
     615            0 :                     })
     616            0 :                     .join(", ");
     617            0 : 
     618            0 :                 if !offenders.is_empty() {
     619            0 :                     tracing::warn!(
     620            0 :                         "Oversized vectored read ({} > {}) for keys {}",
     621              :                         buf_size,
     622              :                         max_vectored_read_bytes,
     623              :                         offenders
     624              :                     );
     625            0 :                 }
     626       207264 :             }
     627              : 
     628       207264 :             let read_extend_residency = this.clone();
     629       207264 :             let read_from = self.file.clone();
     630       207264 :             let read_ctx = ctx.attached_child();
     631       207264 :             reconstruct_state
     632       207264 :                 .spawn_io(async move {
     633       207264 :                     let buf = IoBufferMut::with_capacity(buf_size);
     634       207264 :                     let vectored_blob_reader = VectoredBlobReader::new(&read_from);
     635       207264 :                     let res = vectored_blob_reader.read_blobs(&read, buf, &read_ctx).await;
     636              : 
     637       207264 :                     match res {
     638       207264 :                         Ok(blobs_buf) => {
     639       207264 :                             let view = BufView::new_slice(&blobs_buf.buf);
     640       547176 :                             for meta in blobs_buf.blobs.iter() {
     641       547176 :                                 let io: OnDiskValueIo =
     642       547176 :                                     ios.remove(&(meta.meta.key, meta.meta.lsn)).unwrap();
     643       547176 :                                 let img_buf = meta.read(&view).await;
     644              : 
     645       547176 :                                 let img_buf = match img_buf {
     646       547176 :                                     Ok(img_buf) => img_buf,
     647            0 :                                     Err(e) => {
     648            0 :                                         io.complete(Err(e));
     649            0 :                                         continue;
     650              :                                     }
     651              :                                 };
     652              : 
     653       547176 :                                 io.complete(Ok(OnDiskValue::RawImage(img_buf.into_bytes())));
     654              :                             }
     655              : 
     656       207264 :                             assert!(ios.is_empty());
     657              :                         }
     658            0 :                         Err(err) => {
     659            0 :                             for (_, io) in ios {
     660            0 :                                 io.complete(Err(std::io::Error::new(
     661            0 :                                     err.kind(),
     662            0 :                                     "vec read failed",
     663            0 :                                 )));
     664            0 :                             }
     665              :                         }
     666              :                     }
     667              : 
     668              :                     // keep layer resident until this IO is done; this spawned IO future generally outlives the
     669              :                     // call to `self` / the `Arc<DownloadedLayer>` / the `ResidentLayer` that guarantees residency
     670       207264 :                     drop(read_extend_residency);
     671       207264 :                 })
     672       207264 :                 .await;
     673              :         }
     674       181512 :     }
     675              : 
     676          756 :     pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> ImageLayerIterator<'a> {
     677          756 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
     678          756 :         let tree_reader =
     679          756 :             DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
     680          756 :         ImageLayerIterator {
     681          756 :             image_layer: self,
     682          756 :             ctx,
     683          756 :             index_iter: tree_reader.iter(&[0; KEY_SIZE], ctx),
     684          756 :             key_values_batch: VecDeque::new(),
     685          756 :             is_end: false,
     686          756 :             planner: StreamingVectoredReadPlanner::new(
     687          756 :                 1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
     688          756 :                 1024,        // The default value. Unit tests might use a different value
     689          756 :             ),
     690          756 :         }
     691          756 :     }
     692              : 
     693              :     /// NB: not super efficient, but not terrible either. Should prob be an iterator.
     694              :     //
     695              :     // We're reusing the index traversal logical in plan_reads; would be nice to
     696              :     // factor that out.
     697            0 :     pub(crate) async fn load_keys(&self, ctx: &RequestContext) -> anyhow::Result<Vec<Key>> {
     698            0 :         let plan = self
     699            0 :             .plan_reads(KeySpace::single(self.key_range.clone()), None, ctx)
     700            0 :             .await?;
     701            0 :         Ok(plan
     702            0 :             .into_iter()
     703            0 :             .flat_map(|read| read.blobs_at)
     704            0 :             .map(|(_, blob_meta)| blob_meta.key)
     705            0 :             .collect())
     706            0 :     }
     707              : }
     708              : 
/// The state of an image layer that is being written.
///
/// This is the implementation behind [`ImageLayerWriter`], which holds it in an `Option` and
/// forwards its calls here:
///
/// 1. `new` creates the temporary layer file and reserves the first page for the summary.
///
/// 2. `put_image` / `put_image_raw` append one value each and record its offset in the index.
///
/// 3. `finish` writes out the index and the summary and syncs the file.
///
struct ImageLayerWriterInner {
    conf: &'static PageServerConf,
    // Temporary path that the layer file is written to
    path: Utf8PathBuf,
    timeline_id: TimelineId,
    tenant_shard_id: TenantShardId,
    // Every key passed to put_image / put_image_raw must be contained in this range
    key_range: Range<Key>,
    lsn: Lsn,

    // Total uncompressed bytes passed into put_image
    uncompressed_bytes: u64,

    // Like `uncompressed_bytes`,
    // but only of images we might consider for compression
    uncompressed_bytes_eligible: u64,

    // Like `uncompressed_bytes`, but only of images
    // where we have chosen their compressed form
    uncompressed_bytes_chosen: u64,

    // Number of keys in the layer.
    num_keys: usize,

    // Writes the "values" part of the file
    blob_writer: BlobWriter<false>,
    // Builds the "index" part of the file: key -> offset of the value
    tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,

    // Most recently written key; compared against the end key when finishing
    #[cfg(feature = "testing")]
    last_written_key: Key,
}
     748              : 
     749              : impl ImageLayerWriterInner {
     750              :     ///
     751              :     /// Start building a new image layer.
     752              :     ///
     753              :     #[allow(clippy::too_many_arguments)]
     754         3744 :     async fn new(
     755         3744 :         conf: &'static PageServerConf,
     756         3744 :         timeline_id: TimelineId,
     757         3744 :         tenant_shard_id: TenantShardId,
     758         3744 :         key_range: &Range<Key>,
     759         3744 :         lsn: Lsn,
     760         3744 :         gate: &utils::sync::gate::Gate,
     761         3744 :         cancel: CancellationToken,
     762         3744 :         ctx: &RequestContext,
     763         3744 :     ) -> anyhow::Result<Self> {
     764         3744 :         // Create the file initially with a temporary filename.
     765         3744 :         // We'll atomically rename it to the final name when we're done.
     766         3744 :         let path = ImageLayer::temp_path_for(
     767         3744 :             conf,
     768         3744 :             timeline_id,
     769         3744 :             tenant_shard_id,
     770         3744 :             &ImageLayerName {
     771         3744 :                 key_range: key_range.clone(),
     772         3744 :                 lsn,
     773         3744 :             },
     774         3744 :         );
     775         3744 :         trace!("creating image layer {}", path);
     776         3744 :         let mut file = {
     777         3744 :             VirtualFile::open_with_options(
     778         3744 :                 &path,
     779         3744 :                 virtual_file::OpenOptions::new()
     780         3744 :                     .write(true)
     781         3744 :                     .create_new(true),
     782         3744 :                 ctx,
     783         3744 :             )
     784         3744 :             .await?
     785              :         };
     786              :         // make room for the header block
     787         3744 :         file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
     788         3744 :         let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx);
     789         3744 : 
     790         3744 :         // Initialize the b-tree index builder
     791         3744 :         let block_buf = BlockBuf::new();
     792         3744 :         let tree_builder = DiskBtreeBuilder::new(block_buf);
     793         3744 : 
     794         3744 :         let writer = Self {
     795         3744 :             conf,
     796         3744 :             path,
     797         3744 :             timeline_id,
     798         3744 :             tenant_shard_id,
     799         3744 :             key_range: key_range.clone(),
     800         3744 :             lsn,
     801         3744 :             tree: tree_builder,
     802         3744 :             blob_writer,
     803         3744 :             uncompressed_bytes: 0,
     804         3744 :             uncompressed_bytes_eligible: 0,
     805         3744 :             uncompressed_bytes_chosen: 0,
     806         3744 :             num_keys: 0,
     807         3744 :             #[cfg(feature = "testing")]
     808         3744 :             last_written_key: Key::MIN,
     809         3744 :         };
     810         3744 : 
     811         3744 :         Ok(writer)
     812         3744 :     }
     813              : 
     814              :     ///
     815              :     /// Write next value to the file.
     816              :     ///
     817              :     /// The page versions must be appended in blknum order.
     818              :     ///
     819       234828 :     async fn put_image(
     820       234828 :         &mut self,
     821       234828 :         key: Key,
     822       234828 :         img: Bytes,
     823       234828 :         ctx: &RequestContext,
     824       234828 :     ) -> anyhow::Result<()> {
     825       234828 :         ensure!(self.key_range.contains(&key));
     826       234828 :         let compression = self.conf.image_compression;
     827       234828 :         let uncompressed_len = img.len() as u64;
     828       234828 :         self.uncompressed_bytes += uncompressed_len;
     829       234828 :         self.num_keys += 1;
     830       234828 :         let (_img, res) = self
     831       234828 :             .blob_writer
     832       234828 :             .write_blob_maybe_compressed(img.slice_len(), ctx, compression)
     833       234828 :             .await;
     834              :         // TODO: re-use the buffer for `img` further upstack
     835       234828 :         let (off, compression_info) = res?;
     836       234828 :         if compression_info.compressed_size.is_some() {
     837        48012 :             // The image has been considered for compression at least
     838        48012 :             self.uncompressed_bytes_eligible += uncompressed_len;
     839       186816 :         }
     840       234828 :         if compression_info.written_compressed {
     841            0 :             // The image has been compressed
     842            0 :             self.uncompressed_bytes_chosen += uncompressed_len;
     843       234828 :         }
     844              : 
     845       234828 :         let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
     846       234828 :         key.write_to_byte_slice(&mut keybuf);
     847       234828 :         self.tree.append(&keybuf, off)?;
     848              : 
     849              :         #[cfg(feature = "testing")]
     850       234828 :         {
     851       234828 :             self.last_written_key = key;
     852       234828 :         }
     853       234828 : 
     854       234828 :         Ok(())
     855       234828 :     }
     856              : 
     857              :     ///
     858              :     /// Write the next image to the file, as a raw blob header and data.
     859              :     ///
     860              :     /// The page versions must be appended in blknum order.
     861              :     ///
     862        98304 :     async fn put_image_raw(
     863        98304 :         &mut self,
     864        98304 :         key: Key,
     865        98304 :         raw_with_header: Bytes,
     866        98304 :         ctx: &RequestContext,
     867        98304 :     ) -> anyhow::Result<()> {
     868        98304 :         ensure!(self.key_range.contains(&key));
     869              : 
     870              :         // NB: we don't update the (un)compressed metrics, since we can't determine them without
     871              :         // decompressing the image. This seems okay.
     872        98304 :         self.num_keys += 1;
     873              : 
     874        98304 :         let (_, res) = self
     875        98304 :             .blob_writer
     876        98304 :             .write_blob_raw(raw_with_header.slice_len(), ctx)
     877        98304 :             .await;
     878        98304 :         let offset = res?;
     879              : 
     880        98304 :         let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
     881        98304 :         key.write_to_byte_slice(&mut keybuf);
     882        98304 :         self.tree.append(&keybuf, offset)?;
     883              : 
     884              :         #[cfg(feature = "testing")]
     885        98304 :         {
     886        98304 :             self.last_written_key = key;
     887        98304 :         }
     888        98304 : 
     889        98304 :         Ok(())
     890        98304 :     }
     891              : 
     892              :     ///
     893              :     /// Finish writing the image layer.
     894              :     ///
     895         2280 :     async fn finish(
     896         2280 :         self,
     897         2280 :         ctx: &RequestContext,
     898         2280 :         end_key: Option<Key>,
     899         2280 :     ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
     900         2280 :         let temp_path = self.path.clone();
     901         2280 :         let result = self.finish0(ctx, end_key).await;
     902         2280 :         if let Err(ref e) = result {
     903            0 :             tracing::info!(%temp_path, "cleaning up temporary file after error during writing: {e}");
     904            0 :             if let Err(e) = std::fs::remove_file(&temp_path) {
     905            0 :                 tracing::warn!(error=%e, %temp_path, "error cleaning up temporary layer file after error during writing");
     906            0 :             }
     907         2280 :         }
     908         2280 :         result
     909         2280 :     }
     910              : 
     911              :     ///
     912              :     /// Finish writing the image layer.
     913              :     ///
     914         2280 :     async fn finish0(
     915         2280 :         self,
     916         2280 :         ctx: &RequestContext,
     917         2280 :         end_key: Option<Key>,
     918         2280 :     ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
     919         2280 :         let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
     920         2280 : 
     921         2280 :         // Calculate compression ratio
     922         2280 :         let compressed_size = self.blob_writer.size() - PAGE_SZ as u64; // Subtract PAGE_SZ for header
     923         2280 :         crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES.inc_by(self.uncompressed_bytes);
     924         2280 :         crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CONSIDERED
     925         2280 :             .inc_by(self.uncompressed_bytes_eligible);
     926         2280 :         crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CHOSEN.inc_by(self.uncompressed_bytes_chosen);
     927         2280 : 
     928         2280 :         // NB: filter() may pass through raw pages from a different layer, without looking at
     929         2280 :         // whether these are compressed or not. We don't track metrics for these, so avoid
     930         2280 :         // increasing `COMPRESSION_IMAGE_OUTPUT_BYTES` in this case too.
     931         2280 :         if self.uncompressed_bytes > 0 {
     932         2244 :             crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size);
     933         2244 :         };
     934              : 
     935         2280 :         let mut file = self.blob_writer.into_inner();
     936         2280 : 
     937         2280 :         // Write out the index
     938         2280 :         file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
     939         2280 :             .await?;
     940         2280 :         let (index_root_blk, block_buf) = self.tree.finish()?;
     941         4752 :         for buf in block_buf.blocks {
     942         2472 :             let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
     943         2472 :             res?;
     944              :         }
     945              : 
     946         2280 :         let final_key_range = if let Some(end_key) = end_key {
     947         1788 :             self.key_range.start..end_key
     948              :         } else {
     949          492 :             self.key_range.clone()
     950              :         };
     951              : 
     952              :         // Fill in the summary on blk 0
     953         2280 :         let summary = Summary {
     954         2280 :             magic: IMAGE_FILE_MAGIC,
     955         2280 :             format_version: STORAGE_FORMAT_VERSION,
     956         2280 :             tenant_id: self.tenant_shard_id.tenant_id,
     957         2280 :             timeline_id: self.timeline_id,
     958         2280 :             key_range: final_key_range.clone(),
     959         2280 :             lsn: self.lsn,
     960         2280 :             index_start_blk,
     961         2280 :             index_root_blk,
     962         2280 :         };
     963         2280 : 
     964         2280 :         let mut buf = Vec::with_capacity(PAGE_SZ);
     965         2280 :         // TODO: could use smallvec here but it's a pain with Slice<T>
     966         2280 :         Summary::ser_into(&summary, &mut buf)?;
     967         2280 :         file.seek(SeekFrom::Start(0)).await?;
     968         2280 :         let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
     969         2280 :         res?;
     970              : 
     971         2280 :         let metadata = file
     972         2280 :             .metadata()
     973         2280 :             .await
     974         2280 :             .context("get metadata to determine file size")?;
     975              : 
     976         2280 :         let desc = PersistentLayerDesc::new_img(
     977         2280 :             self.tenant_shard_id,
     978         2280 :             self.timeline_id,
     979         2280 :             final_key_range,
     980         2280 :             self.lsn,
     981         2280 :             metadata.len(),
     982         2280 :         );
     983              : 
     984              :         #[cfg(feature = "testing")]
     985         2280 :         if let Some(end_key) = end_key {
     986         1788 :             assert!(
     987         1788 :                 self.last_written_key < end_key,
     988            0 :                 "written key violates end_key range"
     989              :             );
     990          492 :         }
     991              : 
     992              :         // Note: Because we open the file in write-only mode, we cannot
     993              :         // reuse the same VirtualFile for reading later. That's why we don't
     994              :         // set inner.file here. The first read will have to re-open it.
     995              : 
     996              :         // fsync the file
     997         2280 :         file.sync_all()
     998         2280 :             .await
     999         2280 :             .maybe_fatal_err("image_layer sync_all")?;
    1000              : 
    1001         2280 :         trace!("created image layer {}", self.path);
    1002              : 
    1003         2280 :         Ok((desc, self.path))
    1004         2280 :     }
    1005              : }
    1006              : 
/// A builder object for constructing a new image layer.
///
/// Usage:
///
/// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
///
/// 2. Write the contents by calling `put_image` (or `put_image_raw`) for every
///    key-value pair in the key range.
///
/// 3. Call `finish` (or `finish_with_end_key`).
///
/// # Note
///
/// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
/// possible for the writer to be dropped before `finish` is actually called.
/// This could leave stray temporary files in the directory, eventually
/// exhausting the file system. This structure wraps `ImageLayerWriterInner`
/// and also contains a `Drop` implementation that cleans up the temporary file
/// in case of failure. It's not possible to do this directly in
/// `ImageLayerWriterInner` since `finish` moves out some fields, making it
/// impossible to implement `Drop`.
///
#[must_use]
pub struct ImageLayerWriter {
    // `Some` until one of the finish methods takes the inner writer out
    inner: Option<ImageLayerWriterInner>,
}
    1032              : 
    1033              : impl ImageLayerWriter {
    1034              :     ///
    1035              :     /// Start building a new image layer.
    1036              :     ///
    1037              :     #[allow(clippy::too_many_arguments)]
    1038         3744 :     pub async fn new(
    1039         3744 :         conf: &'static PageServerConf,
    1040         3744 :         timeline_id: TimelineId,
    1041         3744 :         tenant_shard_id: TenantShardId,
    1042         3744 :         key_range: &Range<Key>,
    1043         3744 :         lsn: Lsn,
    1044         3744 :         gate: &utils::sync::gate::Gate,
    1045         3744 :         cancel: CancellationToken,
    1046         3744 :         ctx: &RequestContext,
    1047         3744 :     ) -> anyhow::Result<ImageLayerWriter> {
    1048         3744 :         Ok(Self {
    1049         3744 :             inner: Some(
    1050         3744 :                 ImageLayerWriterInner::new(
    1051         3744 :                     conf,
    1052         3744 :                     timeline_id,
    1053         3744 :                     tenant_shard_id,
    1054         3744 :                     key_range,
    1055         3744 :                     lsn,
    1056         3744 :                     gate,
    1057         3744 :                     cancel,
    1058         3744 :                     ctx,
    1059         3744 :                 )
    1060         3744 :                 .await?,
    1061              :             ),
    1062              :         })
    1063         3744 :     }
    1064              : 
    1065              :     ///
    1066              :     /// Write next value to the file.
    1067              :     ///
    1068              :     /// The page versions must be appended in blknum order.
    1069              :     ///
    1070       234828 :     pub async fn put_image(
    1071       234828 :         &mut self,
    1072       234828 :         key: Key,
    1073       234828 :         img: Bytes,
    1074       234828 :         ctx: &RequestContext,
    1075       234828 :     ) -> anyhow::Result<()> {
    1076       234828 :         self.inner.as_mut().unwrap().put_image(key, img, ctx).await
    1077       234828 :     }
    1078              : 
    1079              :     ///
    1080              :     /// Write the next value to the file, as a raw header and data. This allows passing through a
    1081              :     /// raw, potentially compressed image from a different layer file without recompressing it.
    1082              :     ///
    1083              :     /// The page versions must be appended in blknum order.
    1084              :     ///
    1085        98304 :     pub async fn put_image_raw(
    1086        98304 :         &mut self,
    1087        98304 :         key: Key,
    1088        98304 :         raw_with_header: Bytes,
    1089        98304 :         ctx: &RequestContext,
    1090        98304 :     ) -> anyhow::Result<()> {
    1091        98304 :         self.inner
    1092        98304 :             .as_mut()
    1093        98304 :             .unwrap()
    1094        98304 :             .put_image_raw(key, raw_with_header, ctx)
    1095        98304 :             .await
    1096        98304 :     }
    1097              : 
    1098              :     /// Estimated size of the image layer.
    1099        51324 :     pub(crate) fn estimated_size(&self) -> u64 {
    1100        51324 :         let inner = self.inner.as_ref().unwrap();
    1101        51324 :         inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64
    1102        51324 :     }
    1103              : 
    1104        51912 :     pub(crate) fn num_keys(&self) -> usize {
    1105        51912 :         self.inner.as_ref().unwrap().num_keys
    1106        51912 :     }
    1107              : 
    1108              :     ///
    1109              :     /// Finish writing the image layer.
    1110              :     ///
    1111          492 :     pub(crate) async fn finish(
    1112          492 :         mut self,
    1113          492 :         ctx: &RequestContext,
    1114          492 :     ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
    1115          492 :         self.inner.take().unwrap().finish(ctx, None).await
    1116          492 :     }
    1117              : 
    1118              :     /// Finish writing the image layer with an end key, used in [`super::batch_split_writer::SplitImageLayerWriter`]. The end key determines the end of the image layer's covered range and is exclusive.
    1119         1788 :     pub(super) async fn finish_with_end_key(
    1120         1788 :         mut self,
    1121         1788 :         end_key: Key,
    1122         1788 :         ctx: &RequestContext,
    1123         1788 :     ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
    1124         1788 :         self.inner.take().unwrap().finish(ctx, Some(end_key)).await
    1125         1788 :     }
    1126              : }
    1127              : 
    1128              : impl Drop for ImageLayerWriter {
    1129         3744 :     fn drop(&mut self) {
    1130         3744 :         if let Some(inner) = self.inner.take() {
    1131         1464 :             inner.blob_writer.into_inner().remove();
    1132         2280 :         }
    1133         3744 :     }
    1134              : }
    1135              : 
    1136              : pub struct ImageLayerIterator<'a> {
    1137              :     image_layer: &'a ImageLayerInner,
    1138              :     ctx: &'a RequestContext,
    1139              :     planner: StreamingVectoredReadPlanner,
    1140              :     index_iter: DiskBtreeIterator<'a>,
    1141              :     key_values_batch: VecDeque<(Key, Lsn, Value)>,
    1142              :     is_end: bool,
    1143              : }
    1144              : 
    1145              : impl ImageLayerIterator<'_> {
    1146            0 :     pub(crate) fn layer_dbg_info(&self) -> String {
    1147            0 :         self.image_layer.layer_dbg_info()
    1148            0 :     }
    1149              : 
    1150              :     /// Retrieve a batch of key-value pairs into the iterator buffer.
    1151       114780 :     async fn next_batch(&mut self) -> anyhow::Result<()> {
    1152       114780 :         assert!(self.key_values_batch.is_empty());
    1153       114780 :         assert!(!self.is_end);
    1154              : 
    1155       114780 :         let plan = loop {
    1156       174444 :             if let Some(res) = self.index_iter.next().await {
    1157       173856 :                 let (raw_key, offset) = res?;
    1158       173856 :                 if let Some(batch_plan) = self.planner.handle(
    1159       173856 :                     Key::from_slice(&raw_key[..KEY_SIZE]),
    1160       173856 :                     self.image_layer.lsn,
    1161       173856 :                     offset,
    1162       173856 :                     true,
    1163       173856 :                 ) {
    1164       114192 :                     break batch_plan;
    1165        59664 :                 }
    1166              :             } else {
    1167          588 :                 self.is_end = true;
    1168          588 :                 let payload_end = self.image_layer.index_start_blk as u64 * PAGE_SZ as u64;
    1169          588 :                 if let Some(item) = self.planner.handle_range_end(payload_end) {
    1170          588 :                     break item;
    1171              :                 } else {
    1172            0 :                     return Ok(()); // TODO: a test case on empty iterator
    1173              :                 }
    1174              :             }
    1175              :         };
    1176       114780 :         let vectored_blob_reader = VectoredBlobReader::new(&self.image_layer.file);
    1177       114780 :         let mut next_batch = std::collections::VecDeque::new();
    1178       114780 :         let buf_size = plan.size();
    1179       114780 :         let buf = IoBufferMut::with_capacity(buf_size);
    1180       114780 :         let blobs_buf = vectored_blob_reader
    1181       114780 :             .read_blobs(&plan, buf, self.ctx)
    1182       114780 :             .await?;
    1183       114780 :         let view = BufView::new_slice(&blobs_buf.buf);
    1184       173688 :         for meta in blobs_buf.blobs.iter() {
    1185       173688 :             let img_buf = meta.read(&view).await?;
    1186       173688 :             next_batch.push_back((
    1187       173688 :                 meta.meta.key,
    1188       173688 :                 self.image_layer.lsn,
    1189       173688 :                 Value::Image(img_buf.into_bytes()),
    1190       173688 :             ));
    1191              :         }
    1192       114780 :         self.key_values_batch = next_batch;
    1193       114780 :         Ok(())
    1194       114780 :     }
    1195              : 
    1196       172968 :     pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
    1197       172968 :         if self.key_values_batch.is_empty() {
    1198       115248 :             if self.is_end {
    1199          972 :                 return Ok(None);
    1200       114276 :             }
    1201       114276 :             self.next_batch().await?;
    1202        57720 :         }
    1203       171996 :         Ok(Some(
    1204       171996 :             self.key_values_batch
    1205       171996 :                 .pop_front()
    1206       171996 :                 .expect("should not be empty"),
    1207       171996 :         ))
    1208       172968 :     }
    1209              : }
    1210              : 
    1211              : #[cfg(test)]
    1212              : mod test {
    1213              :     use std::sync::Arc;
    1214              :     use std::time::Duration;
    1215              : 
    1216              :     use bytes::Bytes;
    1217              :     use itertools::Itertools;
    1218              :     use pageserver_api::key::Key;
    1219              :     use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize};
    1220              :     use pageserver_api::value::Value;
    1221              :     use utils::generation::Generation;
    1222              :     use utils::id::{TenantId, TimelineId};
    1223              :     use utils::lsn::Lsn;
    1224              : 
    1225              :     use super::{ImageLayerIterator, ImageLayerWriter};
    1226              :     use crate::DEFAULT_PG_VERSION;
    1227              :     use crate::context::RequestContext;
    1228              :     use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
    1229              :     use crate::tenant::storage_layer::{Layer, ResidentLayer};
    1230              :     use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner;
    1231              :     use crate::tenant::{TenantShard, Timeline};
    1232              : 
    1233              :     #[tokio::test]
    1234           12 :     async fn image_layer_rewrite() {
    1235           12 :         let tenant_conf = pageserver_api::models::TenantConfig {
    1236           12 :             gc_period: Some(Duration::ZERO),
    1237           12 :             compaction_period: Some(Duration::ZERO),
    1238           12 :             ..Default::default()
    1239           12 :         };
    1240           12 :         let tenant_id = TenantId::generate();
    1241           12 :         let mut gen_ = Generation::new(0xdead0001);
    1242           60 :         let mut get_next_gen = || {
    1243           60 :             let ret = gen_;
    1244           60 :             gen_ = gen_.next();
    1245           60 :             ret
    1246           60 :         };
    1247           12 :         // The LSN at which we will create an image layer to filter
    1248           12 :         let lsn = Lsn(0xdeadbeef0000);
    1249           12 :         let timeline_id = TimelineId::generate();
    1250           12 : 
    1251           12 :         //
    1252           12 :         // Create an unsharded parent with a layer.
    1253           12 :         //
    1254           12 : 
    1255           12 :         let harness = TenantHarness::create_custom(
    1256           12 :             "test_image_layer_rewrite--parent",
    1257           12 :             tenant_conf.clone(),
    1258           12 :             tenant_id,
    1259           12 :             ShardIdentity::unsharded(),
    1260           12 :             get_next_gen(),
    1261           12 :         )
    1262           12 :         .await
    1263           12 :         .unwrap();
    1264           12 :         let (tenant, ctx) = harness.load().await;
    1265           12 :         let timeline = tenant
    1266           12 :             .create_test_timeline(timeline_id, lsn, DEFAULT_PG_VERSION, &ctx)
    1267           12 :             .await
    1268           12 :             .unwrap();
    1269           12 : 
    1270           12 :         // This key range contains several 0x800 page stripes, only one of which belongs to shard zero
    1271           12 :         let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
    1272           12 :         let input_end = Key::from_hex("000000067f00000001000000ae0000002000").unwrap();
    1273           12 :         let range = input_start..input_end;
    1274           12 : 
    1275           12 :         // Build an image layer to filter
    1276           12 :         let resident = {
    1277           12 :             let mut writer = ImageLayerWriter::new(
    1278           12 :                 harness.conf,
    1279           12 :                 timeline_id,
    1280           12 :                 harness.tenant_shard_id,
    1281           12 :                 &range,
    1282           12 :                 lsn,
    1283           12 :                 &timeline.gate,
    1284           12 :                 timeline.cancel.clone(),
    1285           12 :                 &ctx,
    1286           12 :             )
    1287           12 :             .await
    1288           12 :             .unwrap();
    1289           12 : 
    1290           12 :             let foo_img = Bytes::from_static(&[1, 2, 3, 4]);
    1291           12 :             let mut key = range.start;
    1292        98316 :             while key < range.end {
    1293        98304 :                 writer.put_image(key, foo_img.clone(), &ctx).await.unwrap();
    1294        98304 : 
    1295        98304 :                 key = key.next();
    1296           12 :             }
    1297           12 :             let (desc, path) = writer.finish(&ctx).await.unwrap();
    1298           12 :             Layer::finish_creating(tenant.conf, &timeline, desc, &path).unwrap()
    1299           12 :         };
    1300           12 :         let original_size = resident.metadata().file_size;
    1301           12 : 
    1302           12 :         //
    1303           12 :         // Create child shards and do the rewrite, exercising filter().
    1304           12 :         // TODO: abstraction in TenantHarness for splits.
    1305           12 :         //
    1306           12 : 
    1307           12 :         // Filter for various shards: this exercises cases like values at start of key range, end of key
    1308           12 :         // range, middle of key range.
    1309           12 :         let shard_count = ShardCount::new(4);
    1310           48 :         for shard_number in 0..shard_count.count() {
    1311           12 :             //
    1312           12 :             // mimic the shard split
    1313           12 :             //
    1314           48 :             let shard_identity = ShardIdentity::new(
    1315           48 :                 ShardNumber(shard_number),
    1316           48 :                 shard_count,
    1317           48 :                 ShardStripeSize(0x800),
    1318           48 :             )
    1319           48 :             .unwrap();
    1320           48 :             let harness = TenantHarness::create_custom(
    1321           48 :                 Box::leak(Box::new(format!(
    1322           48 :                     "test_image_layer_rewrite--child{}",
    1323           48 :                     shard_identity.shard_slug()
    1324           48 :                 ))),
    1325           48 :                 tenant_conf.clone(),
    1326           48 :                 tenant_id,
    1327           48 :                 shard_identity,
    1328           48 :                 // NB: in reality, the shards would each fork off their own gen number sequence from the parent.
    1329           48 :                 // But here, all we care about is that the gen number is unique.
    1330           48 :                 get_next_gen(),
    1331           48 :             )
    1332           48 :             .await
    1333           48 :             .unwrap();
    1334           48 :             let (tenant, ctx) = harness.load().await;
    1335           48 :             let timeline = tenant
    1336           48 :                 .create_test_timeline(timeline_id, lsn, DEFAULT_PG_VERSION, &ctx)
    1337           48 :                 .await
    1338           48 :                 .unwrap();
    1339           12 : 
    1340           12 :             //
    1341           12 :             // use filter() and make assertions
    1342           12 :             //
    1343           12 : 
    1344           48 :             let mut filtered_writer = ImageLayerWriter::new(
    1345           48 :                 harness.conf,
    1346           48 :                 timeline_id,
    1347           48 :                 harness.tenant_shard_id,
    1348           48 :                 &range,
    1349           48 :                 lsn,
    1350           48 :                 &timeline.gate,
    1351           48 :                 timeline.cancel.clone(),
    1352           48 :                 &ctx,
    1353           48 :             )
    1354           48 :             .await
    1355           48 :             .unwrap();
    1356           12 : 
    1357           48 :             let wrote_keys = resident
    1358           48 :                 .filter(&shard_identity, &mut filtered_writer, &ctx)
    1359           48 :                 .await
    1360           48 :                 .unwrap();
    1361           48 :             let replacement = if wrote_keys > 0 {
    1362           36 :                 let (desc, path) = filtered_writer.finish(&ctx).await.unwrap();
    1363           36 :                 let resident = Layer::finish_creating(tenant.conf, &timeline, desc, &path).unwrap();
    1364           36 :                 Some(resident)
    1365           12 :             } else {
    1366           12 :                 None
    1367           12 :             };
    1368           12 : 
    1369           12 :             // This exact size and those below will need updating as/when the layer encoding changes, but
    1370           12 :             // should be deterministic for a given version of the format, as we used no randomness generating the input.
    1371           48 :             assert_eq!(original_size, 122880);
    1372           12 : 
    1373           48 :             match shard_number {
    1374           12 :                 0 => {
    1375           12 :                     // We should have written out just one stripe for our shard identity
    1376           12 :                     assert_eq!(wrote_keys, 0x800);
    1377           12 :                     let replacement = replacement.unwrap();
    1378           12 : 
    1379           12 :                     // We should have dropped some of the data
    1380           12 :                     assert!(replacement.metadata().file_size < original_size);
    1381           12 :                     assert!(replacement.metadata().file_size > 0);
    1382           12 : 
    1383           12 :                     // Assert that we dropped ~3/4 of the data.
    1384           12 :                     assert_eq!(replacement.metadata().file_size, 49152);
    1385           12 :                 }
    1386           12 :                 1 => {
    1387           12 :                     // Shard 1 has no keys in our input range
    1388           12 :                     assert_eq!(wrote_keys, 0x0);
    1389           12 :                     assert!(replacement.is_none());
    1390           12 :                 }
    1391           12 :                 2 => {
    1392           12 :                     // Shard 2 has one stripe in the input range
    1393           12 :                     assert_eq!(wrote_keys, 0x800);
    1394           12 :                     let replacement = replacement.unwrap();
    1395           12 :                     assert!(replacement.metadata().file_size < original_size);
    1396           12 :                     assert!(replacement.metadata().file_size > 0);
    1397           12 :                     assert_eq!(replacement.metadata().file_size, 49152);
    1398           12 :                 }
    1399           12 :                 3 => {
    1400           12 :                     // Shard 3 has two stripes in the input range
    1401           12 :                     assert_eq!(wrote_keys, 0x1000);
    1402           12 :                     let replacement = replacement.unwrap();
    1403           12 :                     assert!(replacement.metadata().file_size < original_size);
    1404           12 :                     assert!(replacement.metadata().file_size > 0);
    1405           12 :                     assert_eq!(replacement.metadata().file_size, 73728);
    1406           12 :                 }
    1407           12 :                 _ => unreachable!(),
    1408           12 :             }
    1409           12 :         }
    1410           12 :     }
    1411              : 
    1412           12 :     async fn produce_image_layer(
    1413           12 :         tenant: &TenantShard,
    1414           12 :         tline: &Arc<Timeline>,
    1415           12 :         mut images: Vec<(Key, Bytes)>,
    1416           12 :         lsn: Lsn,
    1417           12 :         ctx: &RequestContext,
    1418           12 :     ) -> anyhow::Result<ResidentLayer> {
    1419           12 :         images.sort();
    1420           12 :         let (key_start, _) = images.first().unwrap();
    1421           12 :         let (key_last, _) = images.last().unwrap();
    1422           12 :         let key_end = key_last.next();
    1423           12 :         let key_range = *key_start..key_end;
    1424           12 :         let mut writer = ImageLayerWriter::new(
    1425           12 :             tenant.conf,
    1426           12 :             tline.timeline_id,
    1427           12 :             tenant.tenant_shard_id,
    1428           12 :             &key_range,
    1429           12 :             lsn,
    1430           12 :             &tline.gate,
    1431           12 :             tline.cancel.clone(),
    1432           12 :             ctx,
    1433           12 :         )
    1434           12 :         .await?;
    1435              : 
    1436        12012 :         for (key, img) in images {
    1437        12000 :             writer.put_image(key, img, ctx).await?;
    1438              :         }
    1439           12 :         let (desc, path) = writer.finish(ctx).await?;
    1440           12 :         let img_layer = Layer::finish_creating(tenant.conf, tline, desc, &path)?;
    1441              : 
    1442           12 :         Ok::<_, anyhow::Error>(img_layer)
    1443           12 :     }
    1444              : 
    1445          168 :     async fn assert_img_iter_equal(
    1446          168 :         img_iter: &mut ImageLayerIterator<'_>,
    1447          168 :         expect: &[(Key, Bytes)],
    1448          168 :         expect_lsn: Lsn,
    1449          168 :     ) {
    1450          168 :         let mut expect_iter = expect.iter();
    1451              :         loop {
    1452       168168 :             let o1 = img_iter.next().await.unwrap();
    1453       168168 :             let o2 = expect_iter.next();
    1454       168168 :             match (o1, o2) {
    1455          168 :                 (None, None) => break,
    1456       168000 :                 (Some((k1, l1, v1)), Some((k2, i2))) => {
    1457       168000 :                     let Value::Image(i1) = v1 else {
    1458            0 :                         panic!("expect Value::Image")
    1459              :                     };
    1460       168000 :                     assert_eq!(&k1, k2);
    1461       168000 :                     assert_eq!(l1, expect_lsn);
    1462       168000 :                     assert_eq!(&i1, i2);
    1463              :                 }
    1464            0 :                 (o1, o2) => panic!("iterators length mismatch: {:?}, {:?}", o1, o2),
    1465              :             }
    1466              :         }
    1467          168 :     }
    1468              : 
    1469              :     #[tokio::test]
    1470           12 :     async fn image_layer_iterator() {
    1471           12 :         let harness = TenantHarness::create("image_layer_iterator").await.unwrap();
    1472           12 :         let (tenant, ctx) = harness.load().await;
    1473           12 : 
    1474           12 :         let tline = tenant
    1475           12 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    1476           12 :             .await
    1477           12 :             .unwrap();
    1478           12 : 
    1479        12000 :         fn get_key(id: u32) -> Key {
    1480        12000 :             let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
    1481        12000 :             key.field6 = id;
    1482        12000 :             key
    1483        12000 :         }
    1484           12 :         const N: usize = 1000;
    1485           12 :         let test_imgs = (0..N)
    1486        12000 :             .map(|idx| (get_key(idx as u32), Bytes::from(format!("img{idx:05}"))))
    1487           12 :             .collect_vec();
    1488           12 :         let resident_layer =
    1489           12 :             produce_image_layer(&tenant, &tline, test_imgs.clone(), Lsn(0x10), &ctx)
    1490           12 :                 .await
    1491           12 :                 .unwrap();
    1492           12 :         let img_layer = resident_layer.get_as_image(&ctx).await.unwrap();
    1493           36 :         for max_read_size in [1, 1024] {
    1494          192 :             for batch_size in [1, 2, 4, 8, 3, 7, 13] {
    1495          168 :                 println!("running with batch_size={batch_size} max_read_size={max_read_size}");
    1496          168 :                 // Test if the batch size is correctly determined
    1497          168 :                 let mut iter = img_layer.iter(&ctx);
    1498          168 :                 iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
    1499          168 :                 let mut num_items = 0;
    1500          672 :                 for _ in 0..3 {
    1501          504 :                     iter.next_batch().await.unwrap();
    1502          504 :                     num_items += iter.key_values_batch.len();
    1503          504 :                     if max_read_size == 1 {
    1504           12 :                         // every key should be a batch b/c the value is larger than max_read_size
    1505          252 :                         assert_eq!(iter.key_values_batch.len(), 1);
    1506           12 :                     } else {
    1507          252 :                         assert!(iter.key_values_batch.len() <= batch_size);
    1508           12 :                     }
    1509          504 :                     if num_items >= N {
    1510           12 :                         break;
    1511          504 :                     }
    1512          504 :                     iter.key_values_batch.clear();
    1513           12 :                 }
    1514           12 :                 // Test if the result is correct
    1515          168 :                 let mut iter = img_layer.iter(&ctx);
    1516          168 :                 iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
    1517          168 :                 assert_img_iter_equal(&mut iter, &test_imgs, Lsn(0x10)).await;
    1518           12 :             }
    1519           12 :         }
    1520           12 :     }
    1521              : }
        

Generated by: LCOV version 2.1-beta