LCOV - code coverage report
Current view:  top level - pageserver/src/tenant/storage_layer - image_layer.rs (source / functions)
Test:          472031e0b71f3195f7f21b1f2b20de09fd07bb56.info
Test Date:     2025-05-26 10:37:33
Coverage:      Lines: 80.0 % (818 of 1022 hit)    Functions: 58.4 % (52 of 89 hit)

            Line data    Source code
       1              : //! An ImageLayer represents an image or a snapshot of a key-range at
       2              : //! one particular LSN.
       3              : //!
       4              : //! It contains an image of all key-value pairs in its key-range. Any key
       5              : //! that falls within the image layer's key range but is not present in the
       6              : //! layer does not exist.
       7              : //!
       8              : //! An image layer is stored in a file on disk. The file is stored in
       9              : //! timelines/<timeline_id> directory.  Currently, there are no
      10              : //! subdirectories, and each image layer file is named like this:
      11              : //!
      12              : //! ```text
      13              : //!    <key start>-<key end>__<LSN>
      14              : //! ```
      15              : //!
      16              : //! For example:
      17              : //!
      18              : //! ```text
      19              : //!    000000067F000032BE0000400000000070B6-000000067F000032BE0000400000000080B6__00000000346BC568
      20              : //! ```
      21              : //!
      22              : //! Every image layer file consists of three parts: "summary",
      23              : //! "index", and "values".  The summary is a fixed-size header at the
      24              : //! beginning of the file; it contains basic information about the
      25              : //! layer and offsets to the other parts. The "index" is a B-tree,
      26              : //! mapping from Key to an offset in the "values" part.  The
      27              : //! actual page images are stored in the "values" part.
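// A minimal sketch of decoding the file name format above with plain string
// handling, for illustration only: `parse_image_layer_name` is a hypothetical
// helper, not the parser used in this file (that is `LayerName::from_str`, see
// `load_inner` below), and it leaves the key bounds as raw hex strings.
fn parse_image_layer_name(name: &str) -> Option<(String, String, u64)> {
    // "<key start>-<key end>__<LSN>"
    let (keys, lsn_hex) = name.split_once("__")?;
    let (key_start, key_end) = keys.split_once('-')?;
    let lsn = u64::from_str_radix(lsn_hex, 16).ok()?;
    Some((key_start.to_string(), key_end.to_string(), lsn))
}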
      28              : use std::collections::{HashMap, VecDeque};
      29              : use std::fs::File;
      30              : use std::ops::Range;
      31              : use std::os::unix::prelude::FileExt;
      32              : use std::str::FromStr;
      33              : use std::sync::Arc;
      34              : use std::sync::atomic::AtomicU64;
      35              : 
      36              : use anyhow::{Context, Result, bail, ensure};
      37              : use bytes::Bytes;
      38              : use camino::{Utf8Path, Utf8PathBuf};
      39              : use hex;
      40              : use itertools::Itertools;
      41              : use pageserver_api::config::MaxVectoredReadBytes;
      42              : use pageserver_api::key::{DBDIR_KEY, KEY_SIZE, Key};
      43              : use pageserver_api::keyspace::KeySpace;
      44              : use pageserver_api::shard::{ShardIdentity, TenantShardId};
      45              : use pageserver_api::value::Value;
      46              : use serde::{Deserialize, Serialize};
      47              : use tokio::sync::OnceCell;
      48              : use tokio_stream::StreamExt;
      49              : use tokio_util::sync::CancellationToken;
      50              : use tracing::*;
      51              : use utils::bin_ser::BeSer;
      52              : use utils::bin_ser::SerializeError;
      53              : use utils::id::{TenantId, TimelineId};
      54              : use utils::lsn::Lsn;
      55              : 
      56              : use super::errors::PutError;
      57              : use super::layer_name::ImageLayerName;
      58              : use super::{
      59              :     AsLayerDesc, LayerName, OnDiskValue, OnDiskValueIo, PersistentLayerDesc, ResidentLayer,
      60              :     ValuesReconstructState,
      61              : };
      62              : use crate::config::PageServerConf;
      63              : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
      64              : use crate::page_cache::{self, FileId, PAGE_SZ};
      65              : use crate::tenant::blob_io::BlobWriter;
      66              : use crate::tenant::block_io::{BlockBuf, FileBlockReader};
      67              : use crate::tenant::disk_btree::{
      68              :     DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
      69              : };
      70              : use crate::tenant::timeline::GetVectoredError;
      71              : use crate::tenant::vectored_blob_io::{
      72              :     BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
      73              :     VectoredReadPlanner,
      74              : };
      75              : use crate::virtual_file::TempVirtualFile;
      76              : use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
      77              : use crate::virtual_file::owned_buffers_io::write::{Buffer, BufferedWriterShutdownMode};
      78              : use crate::virtual_file::{self, IoBuffer, IoBufferMut, MaybeFatalIo, VirtualFile};
      79              : use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
      80              : 
      81              : ///
      82              : /// Header stored in the beginning of the file
      83              : ///
      84              : /// After this comes the 'values' part, starting on block 1. After that,
      85              : /// the 'index' starts at the block indicated by 'index_start_blk'
      86              : ///
      87            0 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
      88              : pub struct Summary {
      89              :     /// Magic value to identify this as a neon image file. Always IMAGE_FILE_MAGIC.
      90              :     pub magic: u16,
      91              :     pub format_version: u16,
      92              : 
      93              :     pub tenant_id: TenantId,
      94              :     pub timeline_id: TimelineId,
      95              :     pub key_range: Range<Key>,
      96              :     pub lsn: Lsn,
      97              : 
      98              :     /// Block number where the 'index' part of the file begins.
      99              :     pub index_start_blk: u32,
     100              :     /// Block within the 'index', where the B-tree root page is stored
     101              :     pub index_root_blk: u32,
     102              :     // the 'values' part starts after the summary header, on block 1.
     103              : }
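// A small sketch of how the block numbers above translate to byte offsets,
// given the layout described in this file: the summary occupies block 0, the
// values start at block 1, and the index starts at `index_start_blk`. The
// helper name is illustrative only.
fn part_offsets_sketch(summary: &Summary) -> (u64, u64, u64) {
    let summary_offset = 0u64; // block 0
    let values_offset = PAGE_SZ as u64; // block 1 onwards
    let index_offset = summary.index_start_blk as u64 * PAGE_SZ as u64;
    (summary_offset, values_offset, index_offset)
}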
     104              : 
     105              : impl From<&ImageLayer> for Summary {
     106            0 :     fn from(layer: &ImageLayer) -> Self {
     107            0 :         Self::expected(
     108            0 :             layer.desc.tenant_shard_id.tenant_id,
     109            0 :             layer.desc.timeline_id,
     110            0 :             layer.desc.key_range.clone(),
     111            0 :             layer.lsn,
     112            0 :         )
     113            0 :     }
     114              : }
     115              : 
     116              : impl Summary {
      117              :     /// Serializes the summary header into an aligned buffer of length `PAGE_SZ`.
     118          191 :     pub fn ser_into_page(&self) -> Result<IoBuffer, SerializeError> {
     119          191 :         let mut buf = IoBufferMut::with_capacity(PAGE_SZ);
     120          191 :         Self::ser_into(self, &mut buf)?;
     121              :         // Pad zeroes to the buffer so the length is a multiple of the alignment.
     122          191 :         buf.extend_with(0, buf.capacity() - buf.len());
     123          191 :         Ok(buf.freeze())
     124          191 :     }
     125              : 
     126           75 :     pub(super) fn expected(
     127           75 :         tenant_id: TenantId,
     128           75 :         timeline_id: TimelineId,
     129           75 :         key_range: Range<Key>,
     130           75 :         lsn: Lsn,
     131           75 :     ) -> Self {
     132           75 :         Self {
     133           75 :             magic: IMAGE_FILE_MAGIC,
     134           75 :             format_version: STORAGE_FORMAT_VERSION,
     135           75 :             tenant_id,
     136           75 :             timeline_id,
     137           75 :             key_range,
     138           75 :             lsn,
     139           75 : 
     140           75 :             index_start_blk: 0,
     141           75 :             index_root_blk: 0,
     142           75 :         }
     143           75 :     }
     144              : }
     145              : 
     146              : /// This is used only from `pagectl`. Within pageserver, all layers are
     147              : /// [`crate::tenant::storage_layer::Layer`], which can hold an [`ImageLayerInner`].
     148              : pub struct ImageLayer {
     149              :     path: Utf8PathBuf,
     150              :     pub desc: PersistentLayerDesc,
      151              :     // This entry contains an image of all pages as of this LSN; it should be the same as desc.lsn
     152              :     pub lsn: Lsn,
     153              :     inner: OnceCell<ImageLayerInner>,
     154              : }
     155              : 
     156              : impl std::fmt::Debug for ImageLayer {
     157            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     158              :         use super::RangeDisplayDebug;
     159              : 
     160            0 :         f.debug_struct("ImageLayer")
     161            0 :             .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
     162            0 :             .field("file_size", &self.desc.file_size)
     163            0 :             .field("lsn", &self.lsn)
     164            0 :             .field("inner", &self.inner)
     165            0 :             .finish()
     166            0 :     }
     167              : }
     168              : 
     169              : /// ImageLayer is the in-memory data structure associated with an on-disk image
     170              : /// file.
     171              : pub struct ImageLayerInner {
     172              :     // values copied from summary
     173              :     index_start_blk: u32,
     174              :     index_root_blk: u32,
     175              : 
     176              :     key_range: Range<Key>,
     177              :     lsn: Lsn,
     178              : 
     179              :     file: Arc<VirtualFile>,
     180              :     file_id: FileId,
     181              : 
     182              :     max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
     183              : }
     184              : 
     185              : impl ImageLayerInner {
     186            0 :     pub(crate) fn layer_dbg_info(&self) -> String {
     187            0 :         format!(
     188            0 :             "image {}..{} {}",
     189            0 :             self.key_range().start,
     190            0 :             self.key_range().end,
     191            0 :             self.lsn()
     192            0 :         )
     193            0 :     }
     194              : }
     195              : 
     196              : impl std::fmt::Debug for ImageLayerInner {
     197            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     198            0 :         f.debug_struct("ImageLayerInner")
     199            0 :             .field("index_start_blk", &self.index_start_blk)
     200            0 :             .field("index_root_blk", &self.index_root_blk)
     201            0 :             .finish()
     202            0 :     }
     203              : }
     204              : 
     205              : impl ImageLayerInner {
     206            0 :     pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
     207            0 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
     208            0 :         let tree_reader = DiskBtreeReader::<_, KEY_SIZE>::new(
     209            0 :             self.index_start_blk,
     210            0 :             self.index_root_blk,
     211            0 :             block_reader,
     212            0 :         );
     213            0 : 
     214            0 :         tree_reader.dump(ctx).await?;
     215              : 
     216            0 :         tree_reader
     217            0 :             .visit(
     218            0 :                 &[0u8; KEY_SIZE],
     219            0 :                 VisitDirection::Forwards,
     220            0 :                 |key, value| {
     221            0 :                     println!("key: {} offset {}", hex::encode(key), value);
     222            0 :                     true
     223            0 :                 },
     224            0 :                 ctx,
     225            0 :             )
     226            0 :             .await?;
     227              : 
     228            0 :         Ok(())
     229            0 :     }
     230              : }
     231              : 
      232              : /// Boilerplate to implement the Layer trait; always use layer_desc for persistent layers.
     233              : impl std::fmt::Display for ImageLayer {
     234            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     235            0 :         write!(f, "{}", self.layer_desc().short_id())
     236            0 :     }
     237              : }
     238              : 
     239              : impl AsLayerDesc for ImageLayer {
     240            0 :     fn layer_desc(&self) -> &PersistentLayerDesc {
     241            0 :         &self.desc
     242            0 :     }
     243              : }
     244              : 
     245              : impl ImageLayer {
     246            0 :     pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
     247            0 :         self.desc.dump();
     248            0 : 
     249            0 :         if !verbose {
     250            0 :             return Ok(());
     251            0 :         }
     252              : 
     253            0 :         let inner = self.load(ctx).await?;
     254              : 
     255            0 :         inner.dump(ctx).await?;
     256              : 
     257            0 :         Ok(())
     258            0 :     }
     259              : 
     260          314 :     fn temp_path_for(
     261          314 :         conf: &PageServerConf,
     262          314 :         timeline_id: TimelineId,
     263          314 :         tenant_shard_id: TenantShardId,
     264          314 :         fname: &ImageLayerName,
     265          314 :     ) -> Utf8PathBuf {
     266              :         // TempVirtualFile requires us to never reuse a filename while an old
      267              :         // instance of TempVirtualFile created with that filename has not finished dropping yet.
     268              :         // So, we use a monotonic counter to disambiguate the filenames.
     269              :         static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
     270          314 :         let filename_disambiguator =
     271          314 :             NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
     272          314 : 
     273          314 :         conf.timeline_path(&tenant_shard_id, &timeline_id)
     274          314 :             .join(format!(
     275          314 :                 "{fname}.{:x}.{TEMP_FILE_SUFFIX}",
     276          314 :                 filename_disambiguator
     277          314 :             ))
     278          314 :     }
     279              : 
     280              :     ///
     281              :     /// Open the underlying file and read the metadata into memory, if it's
     282              :     /// not loaded already.
     283              :     ///
     284            0 :     async fn load(&self, ctx: &RequestContext) -> Result<&ImageLayerInner> {
     285            0 :         self.inner
     286            0 :             .get_or_try_init(|| self.load_inner(ctx))
     287            0 :             .await
     288            0 :             .with_context(|| format!("Failed to load image layer {}", self.path()))
     289            0 :     }
     290              : 
     291            0 :     async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
     292            0 :         let path = self.path();
     293              : 
     294            0 :         let loaded =
     295            0 :             ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, None, ctx).await?;
     296              : 
     297              :         // not production code
     298            0 :         let actual_layer_name = LayerName::from_str(path.file_name().unwrap()).unwrap();
     299            0 :         let expected_layer_name = self.layer_desc().layer_name();
     300            0 : 
     301            0 :         if actual_layer_name != expected_layer_name {
     302            0 :             println!("warning: filename does not match what is expected from in-file summary");
     303            0 :             println!("actual: {:?}", actual_layer_name.to_string());
     304            0 :             println!("expected: {:?}", expected_layer_name.to_string());
     305            0 :         }
     306              : 
     307            0 :         Ok(loaded)
     308            0 :     }
     309              : 
     310              :     /// Create an ImageLayer struct representing an existing file on disk.
     311              :     ///
     312              :     /// This variant is only used for debugging purposes, by the 'pagectl' binary.
     313            0 :     pub fn new_for_path(path: &Utf8Path, file: File) -> Result<ImageLayer> {
     314            0 :         let mut summary_buf = vec![0; PAGE_SZ];
     315            0 :         file.read_exact_at(&mut summary_buf, 0)?;
     316            0 :         let summary = Summary::des_prefix(&summary_buf)?;
     317            0 :         let metadata = file
     318            0 :             .metadata()
     319            0 :             .context("get file metadata to determine size")?;
     320              : 
     321              :         // This function is never used for constructing layers in a running pageserver,
     322              :         // so it does not need an accurate TenantShardId.
     323            0 :         let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
     324            0 : 
     325            0 :         Ok(ImageLayer {
     326            0 :             path: path.to_path_buf(),
     327            0 :             desc: PersistentLayerDesc::new_img(
     328            0 :                 tenant_shard_id,
     329            0 :                 summary.timeline_id,
     330            0 :                 summary.key_range,
     331            0 :                 summary.lsn,
     332            0 :                 metadata.len(),
     333            0 :             ), // Now we assume image layer ALWAYS covers the full range. This may change in the future.
     334            0 :             lsn: summary.lsn,
     335            0 :             inner: OnceCell::new(),
     336            0 :         })
     337            0 :     }
     338              : 
     339            0 :     fn path(&self) -> Utf8PathBuf {
     340            0 :         self.path.clone()
     341            0 :     }
     342              : }
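// A minimal sketch of the pagectl-style debugging path above: open an existing
// layer file directly and dump its index without a running pageserver. The
// function name is illustrative; errors are propagated via `anyhow`.
async fn dump_layer_file_sketch(path: &Utf8Path, ctx: &RequestContext) -> anyhow::Result<()> {
    let file = File::open(path)?;
    let layer = ImageLayer::new_for_path(path, file)?;
    layer.dump(true, ctx).await
}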
     343              : 
     344              : #[derive(thiserror::Error, Debug)]
     345              : pub enum RewriteSummaryError {
     346              :     #[error("magic mismatch")]
     347              :     MagicMismatch,
     348              :     #[error(transparent)]
     349              :     Other(#[from] anyhow::Error),
     350              : }
     351              : 
     352              : impl From<std::io::Error> for RewriteSummaryError {
     353            0 :     fn from(e: std::io::Error) -> Self {
     354            0 :         Self::Other(anyhow::anyhow!(e))
     355            0 :     }
     356              : }
     357              : 
     358              : impl ImageLayer {
     359            0 :     pub async fn rewrite_summary<F>(
     360            0 :         path: &Utf8Path,
     361            0 :         rewrite: F,
     362            0 :         ctx: &RequestContext,
     363            0 :     ) -> Result<(), RewriteSummaryError>
     364            0 :     where
     365            0 :         F: Fn(Summary) -> Summary,
     366            0 :     {
     367            0 :         let file = VirtualFile::open_with_options_v2(
     368            0 :             path,
     369            0 :             virtual_file::OpenOptions::new().read(true).write(true),
     370            0 :             ctx,
     371            0 :         )
     372            0 :         .await
     373            0 :         .with_context(|| format!("Failed to open file '{}'", path))?;
     374            0 :         let file_id = page_cache::next_file_id();
     375            0 :         let block_reader = FileBlockReader::new(&file, file_id);
     376            0 :         let summary_blk = block_reader.read_blk(0, ctx).await?;
     377            0 :         let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
     378            0 :         if actual_summary.magic != IMAGE_FILE_MAGIC {
     379            0 :             return Err(RewriteSummaryError::MagicMismatch);
     380            0 :         }
     381            0 : 
     382            0 :         let new_summary = rewrite(actual_summary);
     383              : 
     384            0 :         let buf = new_summary.ser_into_page().context("serialize")?;
     385            0 :         let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
     386            0 :         res?;
     387            0 :         Ok(())
     388            0 :     }
     389              : }
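// A minimal usage sketch of `rewrite_summary` (hypothetical call site, in the
// spirit of a pagectl-style tool): patch the tenant id stored in the on-disk
// summary while leaving everything else, including the index offsets, untouched.
async fn retarget_tenant_sketch(
    path: &Utf8Path,
    new_tenant_id: TenantId,
    ctx: &RequestContext,
) -> Result<(), RewriteSummaryError> {
    ImageLayer::rewrite_summary(
        path,
        |mut summary| {
            summary.tenant_id = new_tenant_id;
            summary
        },
        ctx,
    )
    .await
}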
     390              : 
     391              : impl ImageLayerInner {
     392           70 :     pub(crate) fn key_range(&self) -> &Range<Key> {
     393           70 :         &self.key_range
     394           70 :     }
     395              : 
     396           70 :     pub(crate) fn lsn(&self) -> Lsn {
     397           70 :         self.lsn
     398           70 :     }
     399              : 
     400           75 :     pub(super) async fn load(
     401           75 :         path: &Utf8Path,
     402           75 :         lsn: Lsn,
     403           75 :         summary: Option<Summary>,
     404           75 :         max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
     405           75 :         ctx: &RequestContext,
     406           75 :     ) -> anyhow::Result<Self> {
     407           75 :         let file = Arc::new(
     408           75 :             VirtualFile::open_v2(path, ctx)
     409           75 :                 .await
     410           75 :                 .context("open layer file")?,
     411              :         );
     412           75 :         let file_id = page_cache::next_file_id();
     413           75 :         let block_reader = FileBlockReader::new(&file, file_id);
     414           75 :         let summary_blk = block_reader
     415           75 :             .read_blk(0, ctx)
     416           75 :             .await
     417           75 :             .context("read first block")?;
     418              : 
      419              :         // A too-short buffer is the only way this could fail, so it's not actually likely at all unless
      420              :         // read_blk returns a wrong-sized block.
      421              :         //
      422              :         // TODO: confirm and turn this into an assertion
     423           75 :         let actual_summary =
     424           75 :             Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
     425              : 
     426           75 :         if let Some(mut expected_summary) = summary {
     427              :             // production code path
     428           75 :             expected_summary.index_start_blk = actual_summary.index_start_blk;
     429           75 :             expected_summary.index_root_blk = actual_summary.index_root_blk;
     430           75 :             // mask out the timeline_id, but still require the layers to be from the same tenant
     431           75 :             expected_summary.timeline_id = actual_summary.timeline_id;
     432           75 : 
     433           75 :             if actual_summary != expected_summary {
     434            0 :                 bail!(
     435            0 :                     "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
     436            0 :                     actual_summary,
     437            0 :                     expected_summary
     438            0 :                 );
     439           75 :             }
     440            0 :         }
     441              : 
     442           75 :         Ok(ImageLayerInner {
     443           75 :             index_start_blk: actual_summary.index_start_blk,
     444           75 :             index_root_blk: actual_summary.index_root_blk,
     445           75 :             lsn,
     446           75 :             file,
     447           75 :             file_id,
     448           75 :             max_vectored_read_bytes,
     449           75 :             key_range: actual_summary.key_range,
     450           75 :         })
     451           75 :     }
     452              : 
     453              :     // Look up the keys in the provided keyspace and update
     454              :     // the reconstruct state with whatever is found.
     455        15126 :     pub(super) async fn get_values_reconstruct_data(
     456        15126 :         &self,
     457        15126 :         this: ResidentLayer,
     458        15126 :         keyspace: KeySpace,
     459        15126 :         reconstruct_state: &mut ValuesReconstructState,
     460        15126 :         ctx: &RequestContext,
     461        15126 :     ) -> Result<(), GetVectoredError> {
     462        15126 :         let reads = self
     463        15126 :             .plan_reads(keyspace, None, ctx)
     464        15126 :             .await
     465        15126 :             .map_err(GetVectoredError::Other)?;
     466              : 
     467        15126 :         self.do_reads_and_update_state(this, reads, reconstruct_state, ctx)
     468        15126 :             .await;
     469              : 
     470        15126 :         reconstruct_state.on_image_layer_visited(&self.key_range);
     471        15126 : 
     472        15126 :         Ok(())
     473        15126 :     }
     474              : 
     475              :     /// Traverse the layer's index to build read operations on the overlap of the input keyspace
     476              :     /// and the keys in this layer.
     477              :     ///
     478              :     /// If shard_identity is provided, it will be used to filter keys down to those stored on
     479              :     /// this shard.
     480        15130 :     async fn plan_reads(
     481        15130 :         &self,
     482        15130 :         keyspace: KeySpace,
     483        15130 :         shard_identity: Option<&ShardIdentity>,
     484        15130 :         ctx: &RequestContext,
     485        15130 :     ) -> anyhow::Result<Vec<VectoredRead>> {
     486        15130 :         let mut planner = VectoredReadPlanner::new(
     487        15130 :             self.max_vectored_read_bytes
     488        15130 :                 .expect("Layer is loaded with max vectored bytes config")
     489        15130 :                 .0
     490        15130 :                 .into(),
     491        15130 :         );
     492        15130 : 
     493        15130 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
     494        15130 :         let tree_reader =
     495        15130 :             DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
     496        15130 : 
     497        15130 :         let ctx = RequestContextBuilder::from(ctx)
     498        15130 :             .page_content_kind(PageContentKind::ImageLayerBtreeNode)
     499        15130 :             .attached_child();
     500              : 
     501        22429 :         for range in keyspace.ranges.iter() {
     502        22429 :             let mut range_end_handled = false;
     503        22429 :             let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
     504        22429 :             range.start.write_to_byte_slice(&mut search_key);
     505        22429 : 
     506        22429 :             let index_stream = tree_reader.clone().into_stream(&search_key, &ctx);
     507        22429 :             let mut index_stream = std::pin::pin!(index_stream);
     508              : 
     509       100795 :             while let Some(index_entry) = index_stream.next().await {
     510       100412 :                 let (raw_key, offset) = index_entry?;
     511              : 
     512       100412 :                 let key = Key::from_slice(&raw_key[..KEY_SIZE]);
     513       100412 :                 assert!(key >= range.start);
     514              : 
     515       100412 :                 let flag = if let Some(shard_identity) = shard_identity {
     516        32768 :                     if shard_identity.is_key_disposable(&key) {
     517        24576 :                         BlobFlag::Ignore
     518              :                     } else {
     519         8192 :                         BlobFlag::None
     520              :                     }
     521              :                 } else {
     522        67644 :                     BlobFlag::None
     523              :                 };
     524              : 
     525       100412 :                 if key >= range.end {
     526        22046 :                     planner.handle_range_end(offset);
     527        22046 :                     range_end_handled = true;
     528        22046 :                     break;
     529        78366 :                 } else {
     530        78366 :                     planner.handle(key, self.lsn, offset, flag);
     531        78366 :                 }
     532              :             }
     533              : 
     534        22429 :             if !range_end_handled {
     535          383 :                 let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64;
     536          383 :                 planner.handle_range_end(payload_end);
     537        22046 :             }
     538              :         }
     539              : 
     540        15130 :         Ok(planner.finish())
     541        15130 :     }
     542              : 
     543              :     /// Given a key range, select the parts of that range that should be retained by the ShardIdentity,
     544              :     /// then execute vectored GET operations, passing the results of all read keys into the writer.
     545            4 :     pub(super) async fn filter(
     546            4 :         &self,
     547            4 :         shard_identity: &ShardIdentity,
     548            4 :         writer: &mut ImageLayerWriter,
     549            4 :         ctx: &RequestContext,
     550            4 :     ) -> anyhow::Result<usize> {
     551              :         // Fragment the range into the regions owned by this ShardIdentity
     552            4 :         let plan = self
     553            4 :             .plan_reads(
     554            4 :                 KeySpace {
     555            4 :                     // If asked for the total key space, plan_reads will give us all the keys in the layer
     556            4 :                     ranges: vec![Key::MIN..Key::MAX],
     557            4 :                 },
     558            4 :                 Some(shard_identity),
     559            4 :                 ctx,
     560            4 :             )
     561            4 :             .await?;
     562              : 
     563            4 :         let vectored_blob_reader = VectoredBlobReader::new(&self.file);
     564            4 :         let mut key_count = 0;
     565            4 :         for read in plan.into_iter() {
     566            4 :             let buf_size = read.size();
     567            4 : 
     568            4 :             let buf = IoBufferMut::with_capacity(buf_size);
     569            4 :             let blobs_buf = vectored_blob_reader.read_blobs(&read, buf, ctx).await?;
     570              : 
     571            4 :             let view = BufView::new_slice(&blobs_buf.buf);
     572              : 
     573         8192 :             for meta in blobs_buf.blobs.iter() {
     574              :                 // Just read the raw header+data and pass it through to the target layer, without
     575              :                 // decoding and recompressing it.
     576         8192 :                 let raw = meta.raw_with_header(&view);
     577         8192 :                 key_count += 1;
     578         8192 :                 writer
     579         8192 :                     .put_image_raw(meta.meta.key, raw.into_bytes(), ctx)
     580         8192 :                     .await
     581         8192 :                     .context(format!("Storing key {}", meta.meta.key))?;
     582              :             }
     583              :         }
     584              : 
     585            4 :         Ok(key_count)
     586            4 :     }
     587              : 
     588        15126 :     async fn do_reads_and_update_state(
     589        15126 :         &self,
     590        15126 :         this: ResidentLayer,
     591        15126 :         reads: Vec<VectoredRead>,
     592        15126 :         reconstruct_state: &mut ValuesReconstructState,
     593        15126 :         ctx: &RequestContext,
     594        15126 :     ) {
     595        15126 :         let max_vectored_read_bytes = self
     596        15126 :             .max_vectored_read_bytes
     597        15126 :             .expect("Layer is loaded with max vectored bytes config")
     598        15126 :             .0
     599        15126 :             .into();
     600              : 
     601        17272 :         for read in reads.into_iter() {
     602        17272 :             let mut ios: HashMap<(Key, Lsn), OnDiskValueIo> = Default::default();
     603        45598 :             for (_, blob_meta) in read.blobs_at.as_slice() {
     604        45598 :                 let io = reconstruct_state.update_key(&blob_meta.key, blob_meta.lsn, true);
     605        45598 :                 ios.insert((blob_meta.key, blob_meta.lsn), io);
     606        45598 :             }
     607              : 
     608        17272 :             let buf_size = read.size();
     609        17272 : 
     610        17272 :             if buf_size > max_vectored_read_bytes {
     611              :                 // If the read is oversized, it should only contain one key.
     612            0 :                 let offenders = read
     613            0 :                     .blobs_at
     614            0 :                     .as_slice()
     615            0 :                     .iter()
     616            0 :                     .filter_map(|(_, blob_meta)| {
     617            0 :                         if blob_meta.key.is_rel_dir_key()
     618            0 :                             || blob_meta.key == DBDIR_KEY
     619            0 :                             || blob_meta.key.is_aux_file_key()
     620              :                         {
     621              :                             // The size of values for these keys is unbounded and can
     622              :                             // grow very large in pathological cases.
     623            0 :                             None
     624              :                         } else {
     625            0 :                             Some(format!("{}@{}", blob_meta.key, blob_meta.lsn))
     626              :                         }
     627            0 :                     })
     628            0 :                     .join(", ");
     629            0 : 
     630            0 :                 if !offenders.is_empty() {
     631            0 :                     tracing::warn!(
     632            0 :                         "Oversized vectored read ({} > {}) for keys {}",
     633              :                         buf_size,
     634              :                         max_vectored_read_bytes,
     635              :                         offenders
     636              :                     );
     637            0 :                 }
     638        17272 :             }
     639              : 
     640        17272 :             let read_extend_residency = this.clone();
     641        17272 :             let read_from = self.file.clone();
     642        17272 :             let read_ctx = ctx.attached_child();
     643        17272 :             reconstruct_state
     644        17272 :                 .spawn_io(async move {
     645        17272 :                     let buf = IoBufferMut::with_capacity(buf_size);
     646        17272 :                     let vectored_blob_reader = VectoredBlobReader::new(&read_from);
     647        17272 :                     let res = vectored_blob_reader.read_blobs(&read, buf, &read_ctx).await;
     648              : 
     649        17272 :                     match res {
     650        17272 :                         Ok(blobs_buf) => {
     651        17272 :                             let view = BufView::new_slice(&blobs_buf.buf);
     652        45598 :                             for meta in blobs_buf.blobs.iter() {
     653        45598 :                                 let io: OnDiskValueIo =
     654        45598 :                                     ios.remove(&(meta.meta.key, meta.meta.lsn)).unwrap();
     655        45598 :                                 let img_buf = meta.read(&view).await;
     656              : 
     657        45598 :                                 let img_buf = match img_buf {
     658        45598 :                                     Ok(img_buf) => img_buf,
     659            0 :                                     Err(e) => {
     660            0 :                                         io.complete(Err(e));
     661            0 :                                         continue;
     662              :                                     }
     663              :                                 };
     664              : 
     665        45598 :                                 io.complete(Ok(OnDiskValue::RawImage(img_buf.into_bytes())));
     666              :                             }
     667              : 
     668        17272 :                             assert!(ios.is_empty());
     669              :                         }
     670            0 :                         Err(err) => {
     671            0 :                             for (_, io) in ios {
     672            0 :                                 io.complete(Err(std::io::Error::new(
     673            0 :                                     err.kind(),
     674            0 :                                     "vec read failed",
     675            0 :                                 )));
     676            0 :                             }
     677              :                         }
     678              :                     }
     679              : 
     680              :                     // keep layer resident until this IO is done; this spawned IO future generally outlives the
     681              :                     // call to `self` / the `Arc<DownloadedLayer>` / the `ResidentLayer` that guarantees residency
     682        17272 :                     drop(read_extend_residency);
     683        17272 :                 })
     684        17272 :                 .await;
     685              :         }
     686        15126 :     }
     687              : 
     688           63 :     pub(crate) fn iter_with_options<'a>(
     689           63 :         &'a self,
     690           63 :         ctx: &'a RequestContext,
     691           63 :         max_read_size: u64,
     692           63 :         max_batch_size: usize,
     693           63 :     ) -> ImageLayerIterator<'a> {
     694           63 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
     695           63 :         let tree_reader =
     696           63 :             DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
     697           63 :         ImageLayerIterator {
     698           63 :             image_layer: self,
     699           63 :             ctx,
     700           63 :             index_iter: tree_reader.iter(&[0; KEY_SIZE], ctx),
     701           63 :             key_values_batch: VecDeque::new(),
     702           63 :             is_end: false,
     703           63 :             planner: StreamingVectoredReadPlanner::new(max_read_size, max_batch_size),
     704           63 :         }
     705           63 :     }
     706              : 
      707              :     /// NB: not super efficient, but not terrible either. Should probably be an iterator.
      708              :     //
      709              :     // We're reusing the index traversal logic in plan_reads; would be nice to
     710              :     // factor that out.
     711            0 :     pub(crate) async fn load_keys(&self, ctx: &RequestContext) -> anyhow::Result<Vec<Key>> {
     712            0 :         let plan = self
     713            0 :             .plan_reads(KeySpace::single(self.key_range.clone()), None, ctx)
     714            0 :             .await?;
     715            0 :         Ok(plan
     716            0 :             .into_iter()
     717            0 :             .flat_map(|read| read.blobs_at)
     718            0 :             .map(|(_, blob_meta)| blob_meta.key)
     719            0 :             .collect())
     720            0 :     }
     721              : }
     722              : 
     723              : /// A builder object for constructing a new image layer.
     724              : ///
     725              : /// Usage:
     726              : ///
     727              : /// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
     728              : ///
      729              : /// 2. Write the contents by calling `put_image` for every key-value
     730              : ///    pair in the key range.
     731              : ///
     732              : /// 3. Call `finish`.
     733              : ///
     734              : struct ImageLayerWriterInner {
     735              :     conf: &'static PageServerConf,
     736              :     path: Utf8PathBuf,
     737              :     timeline_id: TimelineId,
     738              :     tenant_shard_id: TenantShardId,
     739              :     key_range: Range<Key>,
     740              :     lsn: Lsn,
     741              : 
     742              :     // Total uncompressed bytes passed into put_image
     743              :     uncompressed_bytes: u64,
     744              : 
     745              :     // Like `uncompressed_bytes`,
     746              :     // but only of images we might consider for compression
     747              :     uncompressed_bytes_eligible: u64,
     748              : 
     749              :     // Like `uncompressed_bytes`, but only of images
     750              :     // where we have chosen their compressed form
     751              :     uncompressed_bytes_chosen: u64,
     752              : 
     753              :     // Number of keys in the layer.
     754              :     num_keys: usize,
     755              : 
     756              :     blob_writer: BlobWriter<TempVirtualFile>,
     757              :     tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,
     758              : 
     759              :     #[cfg(feature = "testing")]
     760              :     last_written_key: Key,
     761              : }
     762              : 
     763              : impl ImageLayerWriterInner {
     764              :     ///
     765              :     /// Start building a new image layer.
     766              :     ///
     767              :     #[allow(clippy::too_many_arguments)]
     768          314 :     async fn new(
     769          314 :         conf: &'static PageServerConf,
     770          314 :         timeline_id: TimelineId,
     771          314 :         tenant_shard_id: TenantShardId,
     772          314 :         key_range: &Range<Key>,
     773          314 :         lsn: Lsn,
     774          314 :         gate: &utils::sync::gate::Gate,
     775          314 :         cancel: CancellationToken,
     776          314 :         ctx: &RequestContext,
     777          314 :     ) -> anyhow::Result<Self> {
     778          314 :         // Create the file initially with a temporary filename.
     779          314 :         // We'll atomically rename it to the final name when we're done.
     780          314 :         let path = ImageLayer::temp_path_for(
     781          314 :             conf,
     782          314 :             timeline_id,
     783          314 :             tenant_shard_id,
     784          314 :             &ImageLayerName {
     785          314 :                 key_range: key_range.clone(),
     786          314 :                 lsn,
     787          314 :             },
     788          314 :         );
     789          314 :         trace!("creating image layer {}", path);
     790          314 :         let file = TempVirtualFile::new(
     791          314 :             VirtualFile::open_with_options_v2(
     792          314 :                 &path,
     793          314 :                 virtual_file::OpenOptions::new()
     794          314 :                     .create_new(true)
     795          314 :                     .write(true),
     796          314 :                 ctx,
     797          314 :             )
     798          314 :             .await?,
     799          314 :             gate.enter()?,
     800              :         );
     801              : 
     802              :         // Start at `PAGE_SZ` to make room for the header block.
     803          314 :         let blob_writer = BlobWriter::new(
     804          314 :             file,
     805          314 :             PAGE_SZ as u64,
     806          314 :             gate,
     807          314 :             cancel,
     808          314 :             ctx,
     809          314 :             info_span!(parent: None, "image_layer_writer_flush_task", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %path),
     810            0 :         )?;
     811              : 
     812              :         // Initialize the b-tree index builder
     813          314 :         let block_buf = BlockBuf::new();
     814          314 :         let tree_builder = DiskBtreeBuilder::new(block_buf);
     815          314 : 
     816          314 :         let writer = Self {
     817          314 :             conf,
     818          314 :             path,
     819          314 :             timeline_id,
     820          314 :             tenant_shard_id,
     821          314 :             key_range: key_range.clone(),
     822          314 :             lsn,
     823          314 :             tree: tree_builder,
     824          314 :             blob_writer,
     825          314 :             uncompressed_bytes: 0,
     826          314 :             uncompressed_bytes_eligible: 0,
     827          314 :             uncompressed_bytes_chosen: 0,
     828          314 :             num_keys: 0,
     829          314 :             #[cfg(feature = "testing")]
     830          314 :             last_written_key: Key::MIN,
     831          314 :         };
     832          314 : 
     833          314 :         Ok(writer)
     834          314 :     }
     835              : 
     836              :     ///
      837              :     /// Write the next value to the file.
     838              :     ///
     839              :     /// The page versions must be appended in blknum order.
     840              :     ///
     841        19576 :     async fn put_image(
     842        19576 :         &mut self,
     843        19576 :         key: Key,
     844        19576 :         img: Bytes,
     845        19576 :         ctx: &RequestContext,
     846        19576 :     ) -> Result<(), PutError> {
     847        19576 :         if !self.key_range.contains(&key) {
     848            0 :             return Err(PutError::Other(anyhow::anyhow!(
     849            0 :                 "key {:?} not in range {:?}",
     850            0 :                 key,
     851            0 :                 self.key_range
     852            0 :             )));
     853        19576 :         }
     854        19576 :         let compression = self.conf.image_compression;
     855        19576 :         let uncompressed_len = img.len() as u64;
     856        19576 :         self.uncompressed_bytes += uncompressed_len;
     857        19576 :         self.num_keys += 1;
     858        19576 :         let (_img, res) = self
     859        19576 :             .blob_writer
     860        19576 :             .write_blob_maybe_compressed(img.slice_len(), ctx, compression)
     861        19576 :             .await;
     862              :         // TODO: re-use the buffer for `img` further upstack
     863        19576 :         let (off, compression_info) = res.map_err(PutError::WriteBlob)?;
     864        19576 :         if compression_info.compressed_size.is_some() {
     865         4001 :             // The image has been considered for compression at least
     866         4001 :             self.uncompressed_bytes_eligible += uncompressed_len;
     867        15575 :         }
     868        19576 :         if compression_info.written_compressed {
     869            0 :             // The image has been compressed
     870            0 :             self.uncompressed_bytes_chosen += uncompressed_len;
     871        19576 :         }
     872              : 
     873        19576 :         let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
     874        19576 :         key.write_to_byte_slice(&mut keybuf);
     875        19576 :         self.tree
     876        19576 :             .append(&keybuf, off)
     877        19576 :             .map_err(anyhow::Error::new)
     878        19576 :             .map_err(PutError::Other)?;
     879              : 
     880              :         #[cfg(feature = "testing")]
     881        19576 :         {
     882        19576 :             self.last_written_key = key;
     883        19576 :         }
     884        19576 : 
     885        19576 :         Ok(())
     886        19576 :     }
     887              : 
     888              :     ///
     889              :     /// Write the next image to the file, as a raw blob header and data.
     890              :     ///
     891              :     /// The page versions must be appended in blknum order.
     892              :     ///
     893         8192 :     async fn put_image_raw(
     894         8192 :         &mut self,
     895         8192 :         key: Key,
     896         8192 :         raw_with_header: Bytes,
     897         8192 :         ctx: &RequestContext,
     898         8192 :     ) -> anyhow::Result<()> {
     899         8192 :         ensure!(self.key_range.contains(&key));
     900              : 
     901              :         // NB: we don't update the (un)compressed metrics, since we can't determine them without
     902              :         // decompressing the image. This seems okay.
     903         8192 :         self.num_keys += 1;
     904              : 
     905         8192 :         let (_, res) = self
     906         8192 :             .blob_writer
     907         8192 :             .write_blob_raw(raw_with_header.slice_len(), ctx)
     908         8192 :             .await;
     909         8192 :         let offset = res?;
     910              : 
     911         8192 :         let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
     912         8192 :         key.write_to_byte_slice(&mut keybuf);
     913         8192 :         self.tree.append(&keybuf, offset)?;
     914              : 
     915              :         #[cfg(feature = "testing")]
     916         8192 :         {
     917         8192 :             self.last_written_key = key;
     918         8192 :         }
     919         8192 : 
     920         8192 :         Ok(())
     921         8192 :     }
     922              : 
     923              :     ///
     924              :     /// Finish writing the image layer.
     925              :     ///
     926          191 :     async fn finish(
     927          191 :         self,
     928          191 :         ctx: &RequestContext,
     929          191 :         end_key: Option<Key>,
     930          191 :     ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
     931          191 :         let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
     932          191 : 
     933          191 :         // Calculate compression ratio
     934          191 :         let compressed_size = self.blob_writer.size() - PAGE_SZ as u64; // Subtract PAGE_SZ for header
     935          191 :         crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES.inc_by(self.uncompressed_bytes);
     936          191 :         crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CONSIDERED
     937          191 :             .inc_by(self.uncompressed_bytes_eligible);
     938          191 :         crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CHOSEN.inc_by(self.uncompressed_bytes_chosen);
     939          191 : 
     940          191 :         // NB: filter() may pass through raw pages from a different layer, without looking at
     941          191 :         // whether these are compressed or not. We don't track metrics for these, so avoid
     942          191 :         // increasing `COMPRESSION_IMAGE_OUTPUT_BYTES` in this case too.
     943          191 :         if self.uncompressed_bytes > 0 {
     944          188 :             crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size);
     945          188 :         };
     946              : 
     947          191 :         let file = self
     948          191 :             .blob_writer
     949          191 :             .shutdown(
     950          191 :                 BufferedWriterShutdownMode::ZeroPadToNextMultiple(PAGE_SZ),
     951          191 :                 ctx,
     952          191 :             )
     953          191 :             .await?;
     954              : 
     955              :         // Write out the index
     956          191 :         let mut offset = index_start_blk as u64 * PAGE_SZ as u64;
     957          191 :         let (index_root_blk, block_buf) = self.tree.finish()?;
     958              : 
     959              :         // TODO(yuchen): https://github.com/neondatabase/neon/issues/10092
     960              :         // Should we just replace BlockBuf::blocks with one big buffer?
     961          398 :         for buf in block_buf.blocks {
     962          207 :             let (_buf, res) = file.write_all_at(buf.slice_len(), offset, ctx).await;
     963          207 :             res?;
     964          207 :             offset += PAGE_SZ as u64;
     965              :         }
     966              : 
     967          191 :         let final_key_range = if let Some(end_key) = end_key {
     968          150 :             self.key_range.start..end_key
     969              :         } else {
     970           41 :             self.key_range.clone()
     971              :         };
     972              : 
     973              :         // Fill in the summary on blk 0
     974          191 :         let summary = Summary {
     975          191 :             magic: IMAGE_FILE_MAGIC,
     976          191 :             format_version: STORAGE_FORMAT_VERSION,
     977          191 :             tenant_id: self.tenant_shard_id.tenant_id,
     978          191 :             timeline_id: self.timeline_id,
     979          191 :             key_range: final_key_range.clone(),
     980          191 :             lsn: self.lsn,
     981          191 :             index_start_blk,
     982          191 :             index_root_blk,
     983          191 :         };
     984              : 
     985              :         // Writes summary at the first block (offset 0).
     986          191 :         let buf = summary.ser_into_page()?;
     987          191 :         let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
     988          191 :         res?;
     989              : 
     990          191 :         let metadata = file
     991          191 :             .metadata()
     992          191 :             .await
     993          191 :             .context("get metadata to determine file size")?;
     994              : 
     995          191 :         let desc = PersistentLayerDesc::new_img(
     996          191 :             self.tenant_shard_id,
     997          191 :             self.timeline_id,
     998          191 :             final_key_range,
     999          191 :             self.lsn,
    1000          191 :             metadata.len(),
    1001          191 :         );
    1002              : 
    1003              :         #[cfg(feature = "testing")]
    1004          191 :         if let Some(end_key) = end_key {
    1005          150 :             assert!(
    1006          150 :                 self.last_written_key < end_key,
    1007            0 :                 "written key violates end_key range"
    1008              :             );
    1009           41 :         }
    1010              : 
    1011              :         // Note: Because we open the file in write-only mode, we cannot
    1012              :         // reuse the same VirtualFile for reading later. That's why we don't
    1013              :         // set inner.file here. The first read will have to re-open it.
    1014              : 
    1015              :         // fsync the file
    1016          191 :         file.sync_all()
    1017          191 :             .await
    1018          191 :             .maybe_fatal_err("image_layer sync_all")?;
    1019              : 
    1020          191 :         trace!("created image layer {}", self.path);
    1021              : 
    1022              :         // The gate guard stored in `destination_file` is dropped. Callers (e.g., the flush loop or compaction)
    1023              :         // also keep the gate open, so that it's safe for them to rename the file to its final destination.
    1024          191 :         file.disarm_into_inner();
    1025          191 : 
    1026          191 :         Ok((desc, self.path))
    1027          191 :     }
    1028              : }
    1029              : 
    1030              : /// A builder object for constructing a new image layer.
    1031              : ///
    1032              : /// Usage:
    1033              : ///
    1034              : /// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
    1035              : ///
    1036              : /// 2. Write the contents by calling `put_image` for every key-value
    1037              : ///    pair in the key range.
    1038              : ///
    1039              : /// 3. Call `finish`.
    1040              : ///
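                      : /// A minimal usage sketch (illustrative only, mirroring the tests at the bottom of
                      : /// this file; `conf`, `timeline_id`, `tenant_shard_id`, `key_range`, `lsn`, `gate`,
                      : /// `cancel` and `ctx` are assumed to be in scope, `images` is a key-ordered list of
                      : /// `(Key, Bytes)` pairs, and error handling is elided):
                      : ///
                      : /// ```text
                      : /// let mut writer = ImageLayerWriter::new(
                      : ///     conf, timeline_id, tenant_shard_id, &key_range, lsn, &gate, cancel.clone(), ctx,
                      : /// ).await?;
                      : /// for (key, img) in images {
                      : ///     writer.put_image(key, img, ctx).await?;
                      : /// }
                      : /// let (desc, path) = writer.finish(ctx).await?;
                      : /// ```
                      : ///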
    1041              : /// # Note
    1042              : ///
    1043              : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
    1044              : /// possible for the writer to be dropped before `finish` is actually called, which
    1045              : /// could leave stray temporary files in the directory, eventually exhausting the
    1046              : /// file system. This structure wraps `ImageLayerWriterInner` and also contains a
    1047              : /// `Drop` implementation that cleans up the temporary file on failure. It's not
    1048              : /// possible to do this directly in `ImageLayerWriterInner` since `finish` moves
    1049              : /// out some fields, making it impossible to implement `Drop`.
    1050              : ///
    1051              : #[must_use]
    1052              : pub struct ImageLayerWriter {
    1053              :     inner: Option<ImageLayerWriterInner>,
    1054              : }
    1055              : 
    1056              : impl ImageLayerWriter {
    1057              :     ///
    1058              :     /// Start building a new image layer.
    1059              :     ///
    1060              :     #[allow(clippy::too_many_arguments)]
    1061          314 :     pub async fn new(
    1062          314 :         conf: &'static PageServerConf,
    1063          314 :         timeline_id: TimelineId,
    1064          314 :         tenant_shard_id: TenantShardId,
    1065          314 :         key_range: &Range<Key>,
    1066          314 :         lsn: Lsn,
    1067          314 :         gate: &utils::sync::gate::Gate,
    1068          314 :         cancel: CancellationToken,
    1069          314 :         ctx: &RequestContext,
    1070          314 :     ) -> anyhow::Result<ImageLayerWriter> {
    1071          314 :         Ok(Self {
    1072          314 :             inner: Some(
    1073          314 :                 ImageLayerWriterInner::new(
    1074          314 :                     conf,
    1075          314 :                     timeline_id,
    1076          314 :                     tenant_shard_id,
    1077          314 :                     key_range,
    1078          314 :                     lsn,
    1079          314 :                     gate,
    1080          314 :                     cancel,
    1081          314 :                     ctx,
    1082          314 :                 )
    1083          314 :                 .await?,
    1084              :             ),
    1085              :         })
    1086          314 :     }
    1087              : 
    1088              :     ///
    1089              :     /// Write the next value to the file.
    1090              :     ///
    1091              :     /// The page versions must be appended in blknum order.
    1092              :     ///
    1093        19576 :     pub async fn put_image(
    1094        19576 :         &mut self,
    1095        19576 :         key: Key,
    1096        19576 :         img: Bytes,
    1097        19576 :         ctx: &RequestContext,
    1098        19576 :     ) -> Result<(), PutError> {
    1099        19576 :         self.inner.as_mut().unwrap().put_image(key, img, ctx).await
    1100        19576 :     }
    1101              : 
    1102              :     ///
    1103              :     /// Write the next value to the file, as a raw header and data. This allows passing through a
    1104              :     /// raw, potentially compressed image from a different layer file without recompressing it.
    1105              :     ///
    1106              :     /// The page versions must be appended in blknum order.
    1107              :     ///
    1108         8192 :     pub async fn put_image_raw(
    1109         8192 :         &mut self,
    1110         8192 :         key: Key,
    1111         8192 :         raw_with_header: Bytes,
    1112         8192 :         ctx: &RequestContext,
    1113         8192 :     ) -> anyhow::Result<()> {
    1114         8192 :         self.inner
    1115         8192 :             .as_mut()
    1116         8192 :             .unwrap()
    1117         8192 :             .put_image_raw(key, raw_with_header, ctx)
    1118         8192 :             .await
    1119         8192 :     }
    1120              : 
    1121              :     /// Estimated size of the image layer.
    1122         4277 :     pub(crate) fn estimated_size(&self) -> u64 {
    1123         4277 :         let inner = self.inner.as_ref().unwrap();
    1124         4277 :         inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64
    1125         4277 :     }
    1126              : 
    1127         4326 :     pub(crate) fn num_keys(&self) -> usize {
    1128         4326 :         self.inner.as_ref().unwrap().num_keys
    1129         4326 :     }
    1130              : 
    1131              :     ///
    1132              :     /// Finish writing the image layer.
    1133              :     ///
    1134           41 :     pub(crate) async fn finish(
    1135           41 :         mut self,
    1136           41 :         ctx: &RequestContext,
    1137           41 :     ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
    1138           41 :         self.inner.take().unwrap().finish(ctx, None).await
    1139           41 :     }
    1140              : 
    1141              :     /// Finish writing the image layer with an end key, used in [`super::batch_split_writer::SplitImageLayerWriter`]. The end key determines the end of the image layer's covered range and is exclusive.
    1142          150 :     pub(super) async fn finish_with_end_key(
    1143          150 :         mut self,
    1144          150 :         end_key: Key,
    1145          150 :         ctx: &RequestContext,
    1146          150 :     ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
    1147          150 :         self.inner.take().unwrap().finish(ctx, Some(end_key)).await
    1148          150 :     }
    1149              : }
    1150              : 
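                      : /// An iterator over the key-value pairs stored in an [`ImageLayerInner`].
                      : ///
                      : /// Illustrative usage sketch (an assumption based on the tests at the bottom of this
                      : /// file, not original documentation); `img_layer` is an `ImageLayerInner`, `ctx` a
                      : /// `RequestContext`, and `max_read_size`/`batch_size` are read-planning knobs:
                      : ///
                      : /// ```text
                      : /// let mut iter = img_layer.iter_with_options(&ctx, max_read_size, batch_size);
                      : /// while let Some((key, lsn, value)) = iter.next().await? {
                      : ///     // every entry is a `Value::Image` materialized at the layer's LSN
                      : /// }
                      : /// ```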
    1151              : pub struct ImageLayerIterator<'a> {
    1152              :     image_layer: &'a ImageLayerInner,
    1153              :     ctx: &'a RequestContext,
    1154              :     planner: StreamingVectoredReadPlanner,
    1155              :     index_iter: DiskBtreeIterator<'a>,
    1156              :     key_values_batch: VecDeque<(Key, Lsn, Value)>,
    1157              :     is_end: bool,
    1158              : }
    1159              : 
    1160              : impl ImageLayerIterator<'_> {
    1161            0 :     pub(crate) fn layer_dbg_info(&self) -> String {
    1162            0 :         self.image_layer.layer_dbg_info()
    1163            0 :     }
    1164              : 
    1165              :     /// Retrieve a batch of key-value pairs into the iterator buffer.
    1166         9565 :     async fn next_batch(&mut self) -> anyhow::Result<()> {
    1167         9565 :         assert!(self.key_values_batch.is_empty());
    1168         9565 :         assert!(!self.is_end);
    1169              : 
    1170         9565 :         let plan = loop {
    1171        14537 :             if let Some(res) = self.index_iter.next().await {
    1172        14488 :                 let (raw_key, offset) = res?;
    1173        14488 :                 if let Some(batch_plan) = self.planner.handle(
    1174        14488 :                     Key::from_slice(&raw_key[..KEY_SIZE]),
    1175        14488 :                     self.image_layer.lsn,
    1176        14488 :                     offset,
    1177        14488 :                     true,
    1178        14488 :                 ) {
    1179         9516 :                     break batch_plan;
    1180         4972 :                 }
    1181              :             } else {
    1182           49 :                 self.is_end = true;
    1183           49 :                 let payload_end = self.image_layer.index_start_blk as u64 * PAGE_SZ as u64;
    1184           49 :                 if let Some(item) = self.planner.handle_range_end(payload_end) {
    1185           49 :                     break item;
    1186              :                 } else {
    1187            0 :                     return Ok(()); // TODO: a test case on empty iterator
    1188              :                 }
    1189              :             }
    1190              :         };
    1191         9565 :         let vectored_blob_reader = VectoredBlobReader::new(&self.image_layer.file);
    1192         9565 :         let mut next_batch = std::collections::VecDeque::new();
    1193         9565 :         let buf_size = plan.size();
    1194         9565 :         let buf = IoBufferMut::with_capacity(buf_size);
    1195         9565 :         let blobs_buf = vectored_blob_reader
    1196         9565 :             .read_blobs(&plan, buf, self.ctx)
    1197         9565 :             .await?;
    1198         9565 :         let view = BufView::new_slice(&blobs_buf.buf);
    1199        14474 :         for meta in blobs_buf.blobs.iter() {
    1200        14474 :             let img_buf = meta.read(&view).await?;
    1201        14474 :             next_batch.push_back((
    1202        14474 :                 meta.meta.key,
    1203        14474 :                 self.image_layer.lsn,
    1204        14474 :                 Value::Image(img_buf.into_bytes()),
    1205        14474 :             ));
    1206              :         }
    1207         9565 :         self.key_values_batch = next_batch;
    1208         9565 :         Ok(())
    1209         9565 :     }
    1210              : 
    1211        14414 :     pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
    1212        14414 :         if self.key_values_batch.is_empty() {
    1213         9604 :             if self.is_end {
    1214           81 :                 return Ok(None);
    1215         9523 :             }
    1216         9523 :             self.next_batch().await?;
    1217         4810 :         }
    1218        14333 :         Ok(Some(
    1219        14333 :             self.key_values_batch
    1220        14333 :                 .pop_front()
    1221        14333 :                 .expect("should not be empty"),
    1222        14333 :         ))
    1223        14414 :     }
    1224              : }
    1225              : 
    1226              : #[cfg(test)]
    1227              : mod test {
    1228              :     use std::sync::Arc;
    1229              :     use std::time::Duration;
    1230              : 
    1231              :     use bytes::Bytes;
    1232              :     use itertools::Itertools;
    1233              :     use pageserver_api::key::Key;
    1234              :     use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize};
    1235              :     use pageserver_api::value::Value;
    1236              :     use utils::generation::Generation;
    1237              :     use utils::id::{TenantId, TimelineId};
    1238              :     use utils::lsn::Lsn;
    1239              : 
    1240              :     use super::{ImageLayerIterator, ImageLayerWriter};
    1241              :     use crate::DEFAULT_PG_VERSION;
    1242              :     use crate::context::RequestContext;
    1243              :     use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
    1244              :     use crate::tenant::storage_layer::{Layer, ResidentLayer};
    1245              :     use crate::tenant::{TenantShard, Timeline};
    1246              : 
    1247              :     #[tokio::test]
    1248            1 :     async fn image_layer_rewrite() {
    1249            1 :         let tenant_conf = pageserver_api::models::TenantConfig {
    1250            1 :             gc_period: Some(Duration::ZERO),
    1251            1 :             compaction_period: Some(Duration::ZERO),
    1252            1 :             ..Default::default()
    1253            1 :         };
    1254            1 :         let tenant_id = TenantId::generate();
    1255            1 :         let mut gen_ = Generation::new(0xdead0001);
    1256            5 :         let mut get_next_gen = || {
    1257            5 :             let ret = gen_;
    1258            5 :             gen_ = gen_.next();
    1259            5 :             ret
    1260            5 :         };
    1261            1 :         // The LSN at which we will create an image layer to filter
    1262            1 :         let lsn = Lsn(0xdeadbeef0000);
    1263            1 :         let timeline_id = TimelineId::generate();
    1264            1 : 
    1265            1 :         //
    1266            1 :         // Create an unsharded parent with a layer.
    1267            1 :         //
    1268            1 : 
    1269            1 :         let harness = TenantHarness::create_custom(
    1270            1 :             "test_image_layer_rewrite--parent",
    1271            1 :             tenant_conf.clone(),
    1272            1 :             tenant_id,
    1273            1 :             ShardIdentity::unsharded(),
    1274            1 :             get_next_gen(),
    1275            1 :         )
    1276            1 :         .await
    1277            1 :         .unwrap();
    1278            1 :         let (tenant, ctx) = harness.load().await;
    1279            1 :         let timeline = tenant
    1280            1 :             .create_test_timeline(timeline_id, lsn, DEFAULT_PG_VERSION, &ctx)
    1281            1 :             .await
    1282            1 :             .unwrap();
    1283            1 : 
    1284            1 :         // This key range contains several 0x800-page stripes, only one of which belongs to shard zero
    1285            1 :         let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
    1286            1 :         let input_end = Key::from_hex("000000067f00000001000000ae0000002000").unwrap();
    1287            1 :         let range = input_start..input_end;
    1288            1 : 
    1289            1 :         // Build an image layer to filter
    1290            1 :         let resident = {
    1291            1 :             let mut writer = ImageLayerWriter::new(
    1292            1 :                 harness.conf,
    1293            1 :                 timeline_id,
    1294            1 :                 harness.tenant_shard_id,
    1295            1 :                 &range,
    1296            1 :                 lsn,
    1297            1 :                 &timeline.gate,
    1298            1 :                 timeline.cancel.clone(),
    1299            1 :                 &ctx,
    1300            1 :             )
    1301            1 :             .await
    1302            1 :             .unwrap();
    1303            1 : 
    1304            1 :             let foo_img = Bytes::from_static(&[1, 2, 3, 4]);
    1305            1 :             let mut key = range.start;
    1306         8193 :             while key < range.end {
    1307         8192 :                 writer.put_image(key, foo_img.clone(), &ctx).await.unwrap();
    1308         8192 : 
    1309         8192 :                 key = key.next();
    1310            1 :             }
    1311            1 :             let (desc, path) = writer.finish(&ctx).await.unwrap();
    1312            1 :             Layer::finish_creating(tenant.conf, &timeline, desc, &path).unwrap()
    1313            1 :         };
    1314            1 :         let original_size = resident.metadata().file_size;
    1315            1 : 
    1316            1 :         //
    1317            1 :         // Create child shards and do the rewrite, exercising filter().
    1318            1 :         // TODO: abstraction in TenantHarness for splits.
    1319            1 :         //
    1320            1 : 
    1321            1 :         // Filter for various shards: this exercises cases like values at start of key range, end of key
    1322            1 :         // range, middle of key range.
    1323            1 :         let shard_count = ShardCount::new(4);
    1324            4 :         for shard_number in 0..shard_count.count() {
    1325            1 :             //
    1326            1 :             // mimic the shard split
    1327            1 :             //
    1328            4 :             let shard_identity = ShardIdentity::new(
    1329            4 :                 ShardNumber(shard_number),
    1330            4 :                 shard_count,
    1331            4 :                 ShardStripeSize(0x800),
    1332            4 :             )
    1333            4 :             .unwrap();
    1334            4 :             let harness = TenantHarness::create_custom(
    1335            4 :                 Box::leak(Box::new(format!(
    1336            4 :                     "test_image_layer_rewrite--child{}",
    1337            4 :                     shard_identity.shard_slug()
    1338            4 :                 ))),
    1339            4 :                 tenant_conf.clone(),
    1340            4 :                 tenant_id,
    1341            4 :                 shard_identity,
    1342            4 :                 // NB: in reality, the shards would each fork off their own gen number sequence from the parent.
    1343            4 :                 // But here, all we care about is that the gen number is unique.
    1344            4 :                 get_next_gen(),
    1345            4 :             )
    1346            4 :             .await
    1347            4 :             .unwrap();
    1348            4 :             let (tenant, ctx) = harness.load().await;
    1349            4 :             let timeline = tenant
    1350            4 :                 .create_test_timeline(timeline_id, lsn, DEFAULT_PG_VERSION, &ctx)
    1351            4 :                 .await
    1352            4 :                 .unwrap();
    1353            1 : 
    1354            1 :             //
    1355            1 :             // use filter() and make assertions
    1356            1 :             //
    1357            1 : 
    1358            4 :             let mut filtered_writer = ImageLayerWriter::new(
    1359            4 :                 harness.conf,
    1360            4 :                 timeline_id,
    1361            4 :                 harness.tenant_shard_id,
    1362            4 :                 &range,
    1363            4 :                 lsn,
    1364            4 :                 &timeline.gate,
    1365            4 :                 timeline.cancel.clone(),
    1366            4 :                 &ctx,
    1367            4 :             )
    1368            4 :             .await
    1369            4 :             .unwrap();
    1370            1 : 
    1371            4 :             let wrote_keys = resident
    1372            4 :                 .filter(&shard_identity, &mut filtered_writer, &ctx)
    1373            4 :                 .await
    1374            4 :                 .unwrap();
    1375            4 :             let replacement = if wrote_keys > 0 {
    1376            3 :                 let (desc, path) = filtered_writer.finish(&ctx).await.unwrap();
    1377            3 :                 let resident = Layer::finish_creating(tenant.conf, &timeline, desc, &path).unwrap();
    1378            3 :                 Some(resident)
    1379            1 :             } else {
    1380            1 :                 None
    1381            1 :             };
    1382            1 : 
    1383            1 :             // This exact size and those below will need updating as/when the layer encoding changes, but
    1384            1 :             // should be deterministic for a given version of the format, as we used no randomness when generating the input.
    1385            4 :             assert_eq!(original_size, 122880);
    1386            1 : 
    1387            4 :             match shard_number {
    1388            1 :                 0 => {
    1389            1 :                     // We should have written out just one stripe for our shard identity
    1390            1 :                     assert_eq!(wrote_keys, 0x800);
    1391            1 :                     let replacement = replacement.unwrap();
    1392            1 : 
    1393            1 :                     // We should have dropped some of the data
    1394            1 :                     assert!(replacement.metadata().file_size < original_size);
    1395            1 :                     assert!(replacement.metadata().file_size > 0);
    1396            1 : 
    1397            1 :                     // Assert that we dropped ~3/4 of the data.
    1398            1 :                     assert_eq!(replacement.metadata().file_size, 49152);
    1399            1 :                 }
    1400            1 :                 1 => {
    1401            1 :                     // Shard 1 has no keys in our input range
    1402            1 :                     assert_eq!(wrote_keys, 0x0);
    1403            1 :                     assert!(replacement.is_none());
    1404            1 :                 }
    1405            1 :                 2 => {
    1406            1 :                     // Shard 2 has one stripe in the input range
    1407            1 :                     assert_eq!(wrote_keys, 0x800);
    1408            1 :                     let replacement = replacement.unwrap();
    1409            1 :                     assert!(replacement.metadata().file_size < original_size);
    1410            1 :                     assert!(replacement.metadata().file_size > 0);
    1411            1 :                     assert_eq!(replacement.metadata().file_size, 49152);
    1412            1 :                 }
    1413            1 :                 3 => {
    1414            1 :                     // Shard 3 has two stripes in the input range
    1415            1 :                     assert_eq!(wrote_keys, 0x1000);
    1416            1 :                     let replacement = replacement.unwrap();
    1417            1 :                     assert!(replacement.metadata().file_size < original_size);
    1418            1 :                     assert!(replacement.metadata().file_size > 0);
    1419            1 :                     assert_eq!(replacement.metadata().file_size, 73728);
    1420            1 :                 }
    1421            1 :                 _ => unreachable!(),
    1422            1 :             }
    1423            1 :         }
    1424            1 :     }
    1425              : 
    1426            1 :     async fn produce_image_layer(
    1427            1 :         tenant: &TenantShard,
    1428            1 :         tline: &Arc<Timeline>,
    1429            1 :         mut images: Vec<(Key, Bytes)>,
    1430            1 :         lsn: Lsn,
    1431            1 :         ctx: &RequestContext,
    1432            1 :     ) -> anyhow::Result<ResidentLayer> {
    1433            1 :         images.sort();
    1434            1 :         let (key_start, _) = images.first().unwrap();
    1435            1 :         let (key_last, _) = images.last().unwrap();
    1436            1 :         let key_end = key_last.next();
    1437            1 :         let key_range = *key_start..key_end;
    1438            1 :         let mut writer = ImageLayerWriter::new(
    1439            1 :             tenant.conf,
    1440            1 :             tline.timeline_id,
    1441            1 :             tenant.tenant_shard_id,
    1442            1 :             &key_range,
    1443            1 :             lsn,
    1444            1 :             &tline.gate,
    1445            1 :             tline.cancel.clone(),
    1446            1 :             ctx,
    1447            1 :         )
    1448            1 :         .await?;
    1449              : 
    1450         1001 :         for (key, img) in images {
    1451         1000 :             writer.put_image(key, img, ctx).await?;
    1452              :         }
    1453            1 :         let (desc, path) = writer.finish(ctx).await?;
    1454            1 :         let img_layer = Layer::finish_creating(tenant.conf, tline, desc, &path)?;
    1455              : 
    1456            1 :         Ok::<_, anyhow::Error>(img_layer)
    1457            1 :     }
    1458              : 
    1459           14 :     async fn assert_img_iter_equal(
    1460           14 :         img_iter: &mut ImageLayerIterator<'_>,
    1461           14 :         expect: &[(Key, Bytes)],
    1462           14 :         expect_lsn: Lsn,
    1463           14 :     ) {
    1464           14 :         let mut expect_iter = expect.iter();
    1465              :         loop {
    1466        14014 :             let o1 = img_iter.next().await.unwrap();
    1467        14014 :             let o2 = expect_iter.next();
    1468        14014 :             match (o1, o2) {
    1469           14 :                 (None, None) => break,
    1470        14000 :                 (Some((k1, l1, v1)), Some((k2, i2))) => {
    1471        14000 :                     let Value::Image(i1) = v1 else {
    1472            0 :                         panic!("expect Value::Image")
    1473              :                     };
    1474        14000 :                     assert_eq!(&k1, k2);
    1475        14000 :                     assert_eq!(l1, expect_lsn);
    1476        14000 :                     assert_eq!(&i1, i2);
    1477              :                 }
    1478            0 :                 (o1, o2) => panic!("iterators length mismatch: {:?}, {:?}", o1, o2),
    1479              :             }
    1480              :         }
    1481           14 :     }
    1482              : 
    1483              :     #[tokio::test]
    1484            1 :     async fn image_layer_iterator() {
    1485            1 :         let harness = TenantHarness::create("image_layer_iterator").await.unwrap();
    1486            1 :         let (tenant, ctx) = harness.load().await;
    1487            1 : 
    1488            1 :         let tline = tenant
    1489            1 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    1490            1 :             .await
    1491            1 :             .unwrap();
    1492            1 : 
    1493         1000 :         fn get_key(id: u32) -> Key {
    1494         1000 :             let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
    1495         1000 :             key.field6 = id;
    1496         1000 :             key
    1497         1000 :         }
    1498            1 :         const N: usize = 1000;
    1499            1 :         let test_imgs = (0..N)
    1500         1000 :             .map(|idx| (get_key(idx as u32), Bytes::from(format!("img{idx:05}"))))
    1501            1 :             .collect_vec();
    1502            1 :         let resident_layer =
    1503            1 :             produce_image_layer(&tenant, &tline, test_imgs.clone(), Lsn(0x10), &ctx)
    1504            1 :                 .await
    1505            1 :                 .unwrap();
    1506            1 :         let img_layer = resident_layer.get_as_image(&ctx).await.unwrap();
    1507            3 :         for max_read_size in [1, 1024] {
    1508           16 :             for batch_size in [1, 2, 4, 8, 3, 7, 13] {
    1509           14 :                 println!("running with batch_size={batch_size} max_read_size={max_read_size}");
    1510           14 :                 // Test if the batch size is correctly determined
    1511           14 :                 let mut iter = img_layer.iter_with_options(&ctx, max_read_size, batch_size);
    1512           14 :                 let mut num_items = 0;
    1513           56 :                 for _ in 0..3 {
    1514           42 :                     iter.next_batch().await.unwrap();
    1515           42 :                     num_items += iter.key_values_batch.len();
    1516           42 :                     if max_read_size == 1 {
    1517            1 :                         // every key should be a batch b/c the value is larger than max_read_size
    1518           21 :                         assert_eq!(iter.key_values_batch.len(), 1);
    1519            1 :                     } else {
    1520           21 :                         assert!(iter.key_values_batch.len() <= batch_size);
    1521            1 :                     }
    1522           42 :                     if num_items >= N {
    1523            1 :                         break;
    1524           42 :                     }
    1525           42 :                     iter.key_values_batch.clear();
    1526            1 :                 }
    1527            1 :                 // Test if the result is correct
    1528           14 :                 let mut iter = img_layer.iter_with_options(&ctx, max_read_size, batch_size);
    1529           14 :                 assert_img_iter_equal(&mut iter, &test_imgs, Lsn(0x10)).await;
    1530            1 :             }
    1531            1 :         }
    1532            1 :     }
    1533              : }
        

Generated by: LCOV version 2.1-beta