LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - image_layer.rs (source / functions)
Test:       42f947419473a288706e86ecdf7c2863d760d5d7.info
Test Date:  2024-08-02 21:34:27
Coverage:   Lines: 78.4 % (738 of 941)    Functions: 53.5 % (46 of 86)

            Line data    Source code
       1              : //! An ImageLayer represents an image or a snapshot of a key-range at
       2              : //! one particular LSN. It contains an image of all key-value pairs
       3              : //! in its key-range. Any key that falls within the image layer's key
       4              : //! range but is not present in the layer does not exist at that LSN.
       5              : //!
       6              : //! An image layer is stored in a file on disk. The file is stored in
       7              : //! the timelines/<timeline_id> directory.  Currently, there are no
       8              : //! subdirectories, and each image layer file is named like this:
       9              : //!
      10              : //! ```text
      11              : //!    <key start>-<key end>__<LSN>
      12              : //! ```
      13              : //!
      14              : //! For example:
      15              : //!
      16              : //! ```text
      17              : //!    000000067F000032BE0000400000000070B6-000000067F000032BE0000400000000080B6__00000000346BC568
      18              : //! ```
      19              : //!
      20              : //! Every image layer file consists of three parts: "summary",
      21              : //! "index", and "values".  The summary is a fixed-size header at the
      22              : //! beginning of the file; it contains basic information about the
      23              : //! layer and offsets to the other parts. The "index" is a B-tree,
      24              : //! mapping from Key to an offset in the "values" part.  The
      25              : //! actual page images are stored in the "values" part.
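A rough illustration of the file-naming convention described above; the helper below is a hypothetical sketch written for this report, not part of this module.

```rust
/// Hypothetical helper: split an image layer file name of the form
/// `<key start>-<key end>__<LSN>` into its hex-encoded parts.
fn split_image_layer_file_name(name: &str) -> Option<(&str, &str, u64)> {
    // "<key start>-<key end>" and "<LSN>" are separated by a double underscore.
    let (key_range, lsn_hex) = name.split_once("__")?;
    let (key_start, key_end) = key_range.split_once('-')?;
    // The LSN part is a 16-digit hex number, e.g. 00000000346BC568.
    let lsn = u64::from_str_radix(lsn_hex, 16).ok()?;
    Some((key_start, key_end, lsn))
}
```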
      26              : use crate::config::PageServerConf;
      27              : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
      28              : use crate::page_cache::{self, FileId, PAGE_SZ};
      29              : use crate::repository::{Key, Value, KEY_SIZE};
      30              : use crate::tenant::blob_io::BlobWriter;
      31              : use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
      32              : use crate::tenant::disk_btree::{
      33              :     DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
      34              : };
      35              : use crate::tenant::storage_layer::{
      36              :     LayerAccessStats, ValueReconstructResult, ValueReconstructState,
      37              : };
      38              : use crate::tenant::timeline::GetVectoredError;
      39              : use crate::tenant::vectored_blob_io::{
      40              :     BlobFlag, MaxVectoredReadBytes, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
      41              :     VectoredReadPlanner,
      42              : };
      43              : use crate::tenant::{PageReconstructError, Timeline};
      44              : use crate::virtual_file::{self, VirtualFile};
      45              : use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
      46              : use anyhow::{anyhow, bail, ensure, Context, Result};
      47              : use bytes::{Bytes, BytesMut};
      48              : use camino::{Utf8Path, Utf8PathBuf};
      49              : use hex;
      50              : use itertools::Itertools;
      51              : use pageserver_api::keyspace::KeySpace;
      52              : use pageserver_api::shard::{ShardIdentity, TenantShardId};
      53              : use rand::{distributions::Alphanumeric, Rng};
      54              : use serde::{Deserialize, Serialize};
      55              : use std::collections::VecDeque;
      56              : use std::fs::File;
      57              : use std::io::SeekFrom;
      58              : use std::ops::Range;
      59              : use std::os::unix::prelude::FileExt;
      60              : use std::str::FromStr;
      61              : use std::sync::Arc;
      62              : use tokio::sync::OnceCell;
      63              : use tokio_stream::StreamExt;
      64              : use tracing::*;
      65              : 
      66              : use utils::{
      67              :     bin_ser::BeSer,
      68              :     id::{TenantId, TimelineId},
      69              :     lsn::Lsn,
      70              : };
      71              : 
      72              : use super::layer_name::ImageLayerName;
      73              : use super::{
      74              :     AsLayerDesc, Layer, LayerName, PersistentLayerDesc, ResidentLayer, ValuesReconstructState,
      75              : };
      76              : 
      77              : ///
      78              : /// Header stored at the beginning of the file.
      79              : ///
      80              : /// After this comes the 'values' part, starting on block 1. After that,
      81              : /// the 'index' starts at the block indicated by 'index_start_blk'.
      82              : ///
      83           98 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
      84              : pub struct Summary {
      85              :     /// Magic value to identify this as a neon image file. Always IMAGE_FILE_MAGIC.
      86              :     pub magic: u16,
      87              :     pub format_version: u16,
      88              : 
      89              :     pub tenant_id: TenantId,
      90              :     pub timeline_id: TimelineId,
      91              :     pub key_range: Range<Key>,
      92              :     pub lsn: Lsn,
      93              : 
      94              :     /// Block number where the 'index' part of the file begins.
      95              :     pub index_start_blk: u32,
      96              :     /// Block within the 'index', where the B-tree root page is stored
      97              :     pub index_root_blk: u32,
      98              :     // the 'values' part starts after the summary header, on block 1.
      99              : }
     100              : 
     101              : impl From<&ImageLayer> for Summary {
     102            0 :     fn from(layer: &ImageLayer) -> Self {
     103            0 :         Self::expected(
     104            0 :             layer.desc.tenant_shard_id.tenant_id,
     105            0 :             layer.desc.timeline_id,
     106            0 :             layer.desc.key_range.clone(),
     107            0 :             layer.lsn,
     108            0 :         )
     109            0 :     }
     110              : }
     111              : 
     112              : impl Summary {
     113           98 :     pub(super) fn expected(
     114           98 :         tenant_id: TenantId,
     115           98 :         timeline_id: TimelineId,
     116           98 :         key_range: Range<Key>,
     117           98 :         lsn: Lsn,
     118           98 :     ) -> Self {
     119           98 :         Self {
     120           98 :             magic: IMAGE_FILE_MAGIC,
     121           98 :             format_version: STORAGE_FORMAT_VERSION,
     122           98 :             tenant_id,
     123           98 :             timeline_id,
     124           98 :             key_range,
     125           98 :             lsn,
     126           98 : 
     127           98 :             index_start_blk: 0,
     128           98 :             index_root_blk: 0,
     129           98 :         }
     130           98 :     }
     131              : }
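Block 0 is written and read back with the BeSer helpers used throughout this file (`Summary::ser_into` when writing, `Summary::des_prefix` when reading). A minimal round-trip sketch, assuming a crate-internal context where `Summary`, `PAGE_SZ` and `anyhow::Context` are in scope:

```rust
// Sketch only: round-trip a Summary through the same helpers this file uses
// for block 0 of the layer file.
fn roundtrip_summary(expected: &Summary) -> anyhow::Result<()> {
    let mut buf = Vec::with_capacity(PAGE_SZ);
    // Serialize into what will become the first PAGE_SZ-sized block on disk.
    Summary::ser_into(expected, &mut buf).context("serialize")?;
    // des_prefix deserializes from the leading bytes of a buffer, which is
    // why reading back a whole block works even though the summary is
    // smaller than PAGE_SZ.
    let actual = Summary::des_prefix(&buf).context("deserialize")?;
    anyhow::ensure!(&actual == expected, "summary did not round-trip");
    Ok(())
}
```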
     132              : 
     133              : /// This is used only from `pagectl`. Within pageserver, all layers are
     134              : /// [`crate::tenant::storage_layer::Layer`], which can hold an [`ImageLayerInner`].
     135              : pub struct ImageLayer {
     136              :     path: Utf8PathBuf,
     137              :     pub desc: PersistentLayerDesc,
     138              :     // This entry contains an image of all pages as of this LSN; it should be the same as desc.lsn
     139              :     pub lsn: Lsn,
     140              :     access_stats: LayerAccessStats,
     141              :     inner: OnceCell<ImageLayerInner>,
     142              : }
     143              : 
     144              : impl std::fmt::Debug for ImageLayer {
     145            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     146            0 :         use super::RangeDisplayDebug;
     147            0 : 
     148            0 :         f.debug_struct("ImageLayer")
     149            0 :             .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
     150            0 :             .field("file_size", &self.desc.file_size)
     151            0 :             .field("lsn", &self.lsn)
     152            0 :             .field("inner", &self.inner)
     153            0 :             .finish()
     154            0 :     }
     155              : }
     156              : 
     157              : /// ImageLayerInner is the in-memory data structure associated with an
     158              : /// on-disk image layer file.
     159              : pub struct ImageLayerInner {
     160              :     // values copied from summary
     161              :     index_start_blk: u32,
     162              :     index_root_blk: u32,
     163              : 
     164              :     key_range: Range<Key>,
     165              :     lsn: Lsn,
     166              : 
     167              :     file: VirtualFile,
     168              :     file_id: FileId,
     169              : 
     170              :     max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
     171              : }
     172              : 
     173              : impl std::fmt::Debug for ImageLayerInner {
     174            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     175            0 :         f.debug_struct("ImageLayerInner")
     176            0 :             .field("index_start_blk", &self.index_start_blk)
     177            0 :             .field("index_root_blk", &self.index_root_blk)
     178            0 :             .finish()
     179            0 :     }
     180              : }
     181              : 
     182              : impl ImageLayerInner {
     183            0 :     pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
     184            0 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
     185            0 :         let tree_reader = DiskBtreeReader::<_, KEY_SIZE>::new(
     186            0 :             self.index_start_blk,
     187            0 :             self.index_root_blk,
     188            0 :             block_reader,
     189            0 :         );
     190            0 : 
     191            0 :         tree_reader.dump().await?;
     192              : 
     193            0 :         tree_reader
     194            0 :             .visit(
     195            0 :                 &[0u8; KEY_SIZE],
     196            0 :                 VisitDirection::Forwards,
     197            0 :                 |key, value| {
     198            0 :                     println!("key: {} offset {}", hex::encode(key), value);
     199            0 :                     true
     200            0 :                 },
     201            0 :                 ctx,
     202            0 :             )
     203            0 :             .await?;
     204              : 
     205            0 :         Ok(())
     206            0 :     }
     207              : }
     208              : 
     209              : /// Boilerplate to implement the Layer trait; always use layer_desc for persistent layers.
     210              : impl std::fmt::Display for ImageLayer {
     211            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     212            0 :         write!(f, "{}", self.layer_desc().short_id())
     213            0 :     }
     214              : }
     215              : 
     216              : impl AsLayerDesc for ImageLayer {
     217            0 :     fn layer_desc(&self) -> &PersistentLayerDesc {
     218            0 :         &self.desc
     219            0 :     }
     220              : }
     221              : 
     222              : impl ImageLayer {
     223            0 :     pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
     224            0 :         self.desc.dump();
     225            0 : 
     226            0 :         if !verbose {
     227            0 :             return Ok(());
     228            0 :         }
     229              : 
     230            0 :         let inner = self.load(ctx).await?;
     231              : 
     232            0 :         inner.dump(ctx).await?;
     233              : 
     234            0 :         Ok(())
     235            0 :     }
     236              : 
     237          272 :     fn temp_path_for(
     238          272 :         conf: &PageServerConf,
     239          272 :         timeline_id: TimelineId,
     240          272 :         tenant_shard_id: TenantShardId,
     241          272 :         fname: &ImageLayerName,
     242          272 :     ) -> Utf8PathBuf {
     243          272 :         let rand_string: String = rand::thread_rng()
     244          272 :             .sample_iter(&Alphanumeric)
     245          272 :             .take(8)
     246          272 :             .map(char::from)
     247          272 :             .collect();
     248          272 : 
     249          272 :         conf.timeline_path(&tenant_shard_id, &timeline_id)
     250          272 :             .join(format!("{fname}.{rand_string}.{TEMP_FILE_SUFFIX}"))
     251          272 :     }
     252              : 
     253              :     ///
     254              :     /// Open the underlying file and read the metadata into memory, if it's
     255              :     /// not loaded already.
     256              :     ///
     257            0 :     async fn load(&self, ctx: &RequestContext) -> Result<&ImageLayerInner> {
     258            0 :         self.access_stats.record_access(ctx);
     259            0 :         self.inner
     260            0 :             .get_or_try_init(|| self.load_inner(ctx))
     261            0 :             .await
     262            0 :             .with_context(|| format!("Failed to load image layer {}", self.path()))
     263            0 :     }
     264              : 
     265            0 :     async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
     266            0 :         let path = self.path();
     267              : 
     268            0 :         let loaded =
     269            0 :             ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, None, ctx).await?;
     270              : 
     271              :         // not production code
     272            0 :         let actual_layer_name = LayerName::from_str(path.file_name().unwrap()).unwrap();
     273            0 :         let expected_layer_name = self.layer_desc().layer_name();
     274            0 : 
     275            0 :         if actual_layer_name != expected_layer_name {
     276            0 :             println!("warning: filename does not match what is expected from in-file summary");
     277            0 :             println!("actual: {:?}", actual_layer_name.to_string());
     278            0 :             println!("expected: {:?}", expected_layer_name.to_string());
     279            0 :         }
     280              : 
     281            0 :         Ok(loaded)
     282            0 :     }
     283              : 
     284              :     /// Create an ImageLayer struct representing an existing file on disk.
     285              :     ///
     286              :     /// This variant is only used for debugging purposes, by the 'pagectl' binary.
     287            0 :     pub fn new_for_path(path: &Utf8Path, file: File) -> Result<ImageLayer> {
     288            0 :         let mut summary_buf = vec![0; PAGE_SZ];
     289            0 :         file.read_exact_at(&mut summary_buf, 0)?;
     290            0 :         let summary = Summary::des_prefix(&summary_buf)?;
     291            0 :         let metadata = file
     292            0 :             .metadata()
     293            0 :             .context("get file metadata to determine size")?;
     294              : 
     295              :         // This function is never used for constructing layers in a running pageserver,
     296              :         // so it does not need an accurate TenantShardId.
     297            0 :         let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
     298            0 : 
     299            0 :         Ok(ImageLayer {
     300            0 :             path: path.to_path_buf(),
     301            0 :             desc: PersistentLayerDesc::new_img(
     302            0 :                 tenant_shard_id,
     303            0 :                 summary.timeline_id,
     304            0 :                 summary.key_range,
     305            0 :                 summary.lsn,
     306            0 :                 metadata.len(),
     307            0 :             ), // Now we assume image layer ALWAYS covers the full range. This may change in the future.
     308            0 :             lsn: summary.lsn,
     309            0 :             access_stats: Default::default(),
     310            0 :             inner: OnceCell::new(),
     311            0 :         })
     312            0 :     }
     313              : 
     314            0 :     fn path(&self) -> Utf8PathBuf {
     315            0 :         self.path.clone()
     316            0 :     }
     317              : }
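A hedged sketch of how `pagectl`-style debugging code might use `new_for_path` together with `dump`; `path` and `ctx` are assumed to already be in scope, and the surrounding async/error-handling context is elided.

```rust
// Sketch under assumptions: `path: &Utf8Path` points at an existing image
// layer file and `ctx: &RequestContext` is available.
let file = std::fs::File::open(path)?;
let layer = ImageLayer::new_for_path(path, file)?;
// With verbose = true this also loads the layer and dumps its index.
layer.dump(true, ctx).await?;
```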
     318              : 
     319            0 : #[derive(thiserror::Error, Debug)]
     320              : pub enum RewriteSummaryError {
     321              :     #[error("magic mismatch")]
     322              :     MagicMismatch,
     323              :     #[error(transparent)]
     324              :     Other(#[from] anyhow::Error),
     325              : }
     326              : 
     327              : impl From<std::io::Error> for RewriteSummaryError {
     328            0 :     fn from(e: std::io::Error) -> Self {
     329            0 :         Self::Other(anyhow::anyhow!(e))
     330            0 :     }
     331              : }
     332              : 
     333              : impl ImageLayer {
     334            0 :     pub async fn rewrite_summary<F>(
     335            0 :         path: &Utf8Path,
     336            0 :         rewrite: F,
     337            0 :         ctx: &RequestContext,
     338            0 :     ) -> Result<(), RewriteSummaryError>
     339            0 :     where
     340            0 :         F: Fn(Summary) -> Summary,
     341            0 :     {
     342            0 :         let mut file = VirtualFile::open_with_options(
     343            0 :             path,
     344            0 :             virtual_file::OpenOptions::new().read(true).write(true),
     345            0 :             ctx,
     346            0 :         )
     347            0 :         .await
     348            0 :         .with_context(|| format!("Failed to open file '{}'", path))?;
     349            0 :         let file_id = page_cache::next_file_id();
     350            0 :         let block_reader = FileBlockReader::new(&file, file_id);
     351            0 :         let summary_blk = block_reader.read_blk(0, ctx).await?;
     352            0 :         let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
     353            0 :         if actual_summary.magic != IMAGE_FILE_MAGIC {
     354            0 :             return Err(RewriteSummaryError::MagicMismatch);
     355            0 :         }
     356            0 : 
     357            0 :         let new_summary = rewrite(actual_summary);
     358            0 : 
     359            0 :         let mut buf = Vec::with_capacity(PAGE_SZ);
     360            0 :         // TODO: could use smallvec here but it's a pain with Slice<T>
     361            0 :         Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
     362            0 :         file.seek(SeekFrom::Start(0)).await?;
     363            0 :         let (_buf, res) = file.write_all(buf, ctx).await;
     364            0 :         res?;
     365            0 :         Ok(())
     366            0 :     }
     367              : }
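A usage sketch for the closure-based rewrite above, assuming a `pagectl`-like caller with `path`, `ctx` and a `new_tenant_id` already in scope (all three names are hypothetical).

```rust
// Hedged sketch: patch the tenant id recorded in the on-disk summary while
// leaving the rest of the header untouched.
ImageLayer::rewrite_summary(
    path,
    |mut summary| {
        summary.tenant_id = new_tenant_id;
        summary
    },
    ctx,
)
.await?;
```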
     368              : 
     369              : impl ImageLayerInner {
     370           22 :     pub(crate) fn key_range(&self) -> &Range<Key> {
     371           22 :         &self.key_range
     372           22 :     }
     373              : 
     374           22 :     pub(crate) fn lsn(&self) -> Lsn {
     375           22 :         self.lsn
     376           22 :     }
     377              : 
     378              :     /// Open the layer file at `path` and read its summary from block 0.
     379              :     /// If an expected `summary` is provided, it is validated against the
     380              :     /// on-disk summary (with the index offsets and timeline_id masked out).
     381           98 :     pub(super) async fn load(
     382           98 :         path: &Utf8Path,
     383           98 :         lsn: Lsn,
     384           98 :         summary: Option<Summary>,
     385           98 :         max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
     386           98 :         ctx: &RequestContext,
     387           98 :     ) -> anyhow::Result<Self> {
     388           98 :         let file = VirtualFile::open(path, ctx)
     389           49 :             .await
     390           98 :             .context("open layer file")?;
     391           98 :         let file_id = page_cache::next_file_id();
     392           98 :         let block_reader = FileBlockReader::new(&file, file_id);
     393           98 :         let summary_blk = block_reader
     394           98 :             .read_blk(0, ctx)
     395           51 :             .await
     396           98 :             .context("read first block")?;
     397              : 
     398              :         // A length mismatch is the only way this could fail, which is unlikely
     399              :         // unless read_blk returns a wrong-sized block.
     400              :         //
     401              :         // TODO: confirm and turn this into an assertion
     402           98 :         let actual_summary =
     403           98 :             Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
     404              : 
     405           98 :         if let Some(mut expected_summary) = summary {
     406              :             // production code path
     407           98 :             expected_summary.index_start_blk = actual_summary.index_start_blk;
     408           98 :             expected_summary.index_root_blk = actual_summary.index_root_blk;
     409           98 :             // mask out the timeline_id, but still require the layers to be from the same tenant
     410           98 :             expected_summary.timeline_id = actual_summary.timeline_id;
     411           98 : 
     412           98 :             if actual_summary != expected_summary {
     413            0 :                 bail!(
     414            0 :                     "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
     415            0 :                     actual_summary,
     416            0 :                     expected_summary
     417            0 :                 );
     418           98 :             }
     419            0 :         }
     420              : 
     421           98 :         Ok(ImageLayerInner {
     422           98 :             index_start_blk: actual_summary.index_start_blk,
     423           98 :             index_root_blk: actual_summary.index_root_blk,
     424           98 :             lsn,
     425           98 :             file,
     426           98 :             file_id,
     427           98 :             max_vectored_read_bytes,
     428           98 :             key_range: actual_summary.key_range,
     429           98 :         })
     430           98 :     }
     431              : 
     432            4 :     pub(super) async fn get_value_reconstruct_data(
     433            4 :         &self,
     434            4 :         key: Key,
     435            4 :         reconstruct_state: &mut ValueReconstructState,
     436            4 :         ctx: &RequestContext,
     437            4 :     ) -> anyhow::Result<ValueReconstructResult> {
     438            4 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
     439            4 :         let tree_reader =
     440            4 :             DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, &block_reader);
     441            4 : 
     442            4 :         let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
     443            4 :         key.write_to_byte_slice(&mut keybuf);
     444            4 :         if let Some(offset) = tree_reader
     445            4 :             .get(
     446            4 :                 &keybuf,
     447            4 :                 &RequestContextBuilder::extend(ctx)
     448            4 :                     .page_content_kind(PageContentKind::ImageLayerBtreeNode)
     449            4 :                     .build(),
     450            4 :             )
     451            2 :             .await?
     452              :         {
     453            4 :             let blob = block_reader
     454            4 :                 .block_cursor()
     455            4 :                 .read_blob(
     456            4 :                     offset,
     457            4 :                     &RequestContextBuilder::extend(ctx)
     458            4 :                         .page_content_kind(PageContentKind::ImageLayerValue)
     459            4 :                         .build(),
     460            4 :                 )
     461            2 :                 .await
     462            4 :                 .with_context(|| format!("failed to read value from offset {}", offset))?;
     463            4 :             let value = Bytes::from(blob);
     464            4 : 
     465            4 :             reconstruct_state.img = Some((self.lsn, value));
     466            4 :             Ok(ValueReconstructResult::Complete)
     467              :         } else {
     468            0 :             Ok(ValueReconstructResult::Missing)
     469              :         }
     470            4 :     }
     471              : 
     472              :     // Look up the keys in the provided keyspace and update
     473              :     // the reconstruct state with whatever is found.
     474         7606 :     pub(super) async fn get_values_reconstruct_data(
     475         7606 :         &self,
     476         7606 :         keyspace: KeySpace,
     477         7606 :         reconstruct_state: &mut ValuesReconstructState,
     478         7606 :         ctx: &RequestContext,
     479         7606 :     ) -> Result<(), GetVectoredError> {
     480         7606 :         let reads = self
     481         7606 :             .plan_reads(keyspace, None, ctx)
     482          730 :             .await
     483         7606 :             .map_err(GetVectoredError::Other)?;
     484              : 
     485         7606 :         self.do_reads_and_update_state(reads, reconstruct_state, ctx)
     486         4633 :             .await;
     487              : 
     488         7606 :         reconstruct_state.on_image_layer_visited(&self.key_range);
     489         7606 : 
     490         7606 :         Ok(())
     491         7606 :     }
     492              : 
     493              :     /// Load all key-value pairs in the image layer; this should be replaced by an iterator-based interface in the future.
     494            0 :     pub(super) async fn load_key_values(
     495            0 :         &self,
     496            0 :         ctx: &RequestContext,
     497            0 :     ) -> anyhow::Result<Vec<(Key, Lsn, Value)>> {
     498            0 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
     499            0 :         let tree_reader =
     500            0 :             DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, &block_reader);
     501            0 :         let mut result = Vec::new();
     502            0 :         let mut stream = Box::pin(tree_reader.into_stream(&[0; KEY_SIZE], ctx));
     503            0 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
     504            0 :         let cursor = block_reader.block_cursor();
     505            0 :         while let Some(item) = stream.next().await {
     506              :             // TODO: dedup code with get_reconstruct_value
     507            0 :             let (raw_key, offset) = item?;
     508            0 :             let key = Key::from_slice(&raw_key[..KEY_SIZE]);
     509              :             // TODO: ctx handling and sharding
     510            0 :             let blob = cursor
     511            0 :                 .read_blob(offset, ctx)
     512            0 :                 .await
     513            0 :                 .with_context(|| format!("failed to read value from offset {}", offset))?;
     514            0 :             let value = Bytes::from(blob);
     515            0 :             result.push((key, self.lsn, Value::Image(value)));
     516              :         }
     517            0 :         Ok(result)
     518            0 :     }
     519              : 
     520              :     /// Traverse the layer's index to build read operations on the overlap of the input keyspace
     521              :     /// and the keys in this layer.
     522              :     ///
     523              :     /// If shard_identity is provided, it will be used to filter keys down to those stored on
     524              :     /// this shard.
     525         7614 :     async fn plan_reads(
     526         7614 :         &self,
     527         7614 :         keyspace: KeySpace,
     528         7614 :         shard_identity: Option<&ShardIdentity>,
     529         7614 :         ctx: &RequestContext,
     530         7614 :     ) -> anyhow::Result<Vec<VectoredRead>> {
     531         7614 :         let mut planner = VectoredReadPlanner::new(
     532         7614 :             self.max_vectored_read_bytes
     533         7614 :                 .expect("Layer is loaded with max vectored bytes config")
     534         7614 :                 .0
     535         7614 :                 .into(),
     536         7614 :         );
     537         7614 : 
     538         7614 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
     539         7614 :         let tree_reader =
     540         7614 :             DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
     541         7614 : 
     542         7614 :         let ctx = RequestContextBuilder::extend(ctx)
     543         7614 :             .page_content_kind(PageContentKind::ImageLayerBtreeNode)
     544         7614 :             .build();
     545              : 
     546        29312 :         for range in keyspace.ranges.iter() {
     547        29312 :             let mut range_end_handled = false;
     548        29312 :             let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
     549        29312 :             range.start.write_to_byte_slice(&mut search_key);
     550        29312 : 
     551        29312 :             let index_stream = tree_reader.clone().into_stream(&search_key, &ctx);
     552        29312 :             let mut index_stream = std::pin::pin!(index_stream);
     553              : 
     554      1095809 :             while let Some(index_entry) = index_stream.next().await {
     555      1095626 :                 let (raw_key, offset) = index_entry?;
     556              : 
     557      1095626 :                 let key = Key::from_slice(&raw_key[..KEY_SIZE]);
     558      1095626 :                 assert!(key >= range.start);
     559              : 
     560      1095626 :                 let flag = if let Some(shard_identity) = shard_identity {
     561      1048576 :                     if shard_identity.is_key_disposable(&key) {
     562       786432 :                         BlobFlag::Ignore
     563              :                     } else {
     564       262144 :                         BlobFlag::None
     565              :                     }
     566              :                 } else {
     567        47050 :                     BlobFlag::None
     568              :                 };
     569              : 
     570      1095626 :                 if key >= range.end {
     571        29129 :                     planner.handle_range_end(offset);
     572        29129 :                     range_end_handled = true;
     573        29129 :                     break;
     574      1066497 :                 } else {
     575      1066497 :                     planner.handle(key, self.lsn, offset, flag);
     576      1066497 :                 }
     577              :             }
     578              : 
     579        29312 :             if !range_end_handled {
     580          183 :                 let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64;
     581          183 :                 planner.handle_range_end(payload_end);
     582        29129 :             }
     583              :         }
     584              : 
     585         7614 :         Ok(planner.finish())
     586         7614 :     }
     587              : 
     588              :     /// Given a key range, select the parts of that range that should be retained by the ShardIdentity,
     589              :     /// then execute vectored GET operations, passing the results of all read keys into the writer.
     590            8 :     pub(super) async fn filter(
     591            8 :         &self,
     592            8 :         shard_identity: &ShardIdentity,
     593            8 :         writer: &mut ImageLayerWriter,
     594            8 :         ctx: &RequestContext,
     595            8 :     ) -> anyhow::Result<usize> {
     596              :         // Fragment the range into the regions owned by this ShardIdentity
     597            8 :         let plan = self
     598            8 :             .plan_reads(
     599            8 :                 KeySpace {
     600            8 :                     // If asked for the total key space, plan_reads will give us all the keys in the layer
     601            8 :                     ranges: vec![Key::MIN..Key::MAX],
     602            8 :                 },
     603            8 :                 Some(shard_identity),
     604            8 :                 ctx,
     605            8 :             )
     606          469 :             .await?;
     607              : 
     608            8 :         let vectored_blob_reader = VectoredBlobReader::new(&self.file);
     609            8 :         let mut key_count = 0;
     610           16 :         for read in plan.into_iter() {
     611           16 :             let buf_size = read.size();
     612           16 : 
     613           16 :             let buf = BytesMut::with_capacity(buf_size);
     614           16 :             let blobs_buf = vectored_blob_reader.read_blobs(&read, buf, ctx).await?;
     615              : 
     616           16 :             let frozen_buf = blobs_buf.buf.freeze();
     617              : 
     618       262144 :             for meta in blobs_buf.blobs.iter() {
     619       262144 :                 let img_buf = frozen_buf.slice(meta.start..meta.end);
     620       262144 : 
     621       262144 :                 key_count += 1;
     622       262144 :                 writer
     623       262144 :                     .put_image(meta.meta.key, img_buf, ctx)
     624       266240 :                     .await
     625       262144 :                     .context(format!("Storing key {}", meta.meta.key))?;
     626              :             }
     627              :         }
     628              : 
     629            8 :         Ok(key_count)
     630            8 :     }
     631              : 
     632         7606 :     async fn do_reads_and_update_state(
     633         7606 :         &self,
     634         7606 :         reads: Vec<VectoredRead>,
     635         7606 :         reconstruct_state: &mut ValuesReconstructState,
     636         7606 :         ctx: &RequestContext,
     637         7606 :     ) {
     638         7606 :         let max_vectored_read_bytes = self
     639         7606 :             .max_vectored_read_bytes
     640         7606 :             .expect("Layer is loaded with max vectored bytes config")
     641         7606 :             .0
     642         7606 :             .into();
     643         7606 : 
     644         7606 :         let vectored_blob_reader = VectoredBlobReader::new(&self.file);
     645         9217 :         for read in reads.into_iter() {
     646         9217 :             let buf_size = read.size();
     647         9217 : 
     648         9217 :             if buf_size > max_vectored_read_bytes {
     649              :                 // If the read is oversized, it should only contain one key.
     650            0 :                 let offenders = read
     651            0 :                     .blobs_at
     652            0 :                     .as_slice()
     653            0 :                     .iter()
     654            0 :                     .map(|(_, blob_meta)| format!("{}@{}", blob_meta.key, blob_meta.lsn))
     655            0 :                     .join(", ");
     656            0 :                 tracing::warn!(
     657            0 :                     "Oversized vectored read ({} > {}) for keys {}",
     658              :                     buf_size,
     659              :                     max_vectored_read_bytes,
     660              :                     offenders
     661              :                 );
     662         9217 :             }
     663              : 
     664         9217 :             let buf = BytesMut::with_capacity(buf_size);
     665         9217 :             let res = vectored_blob_reader.read_blobs(&read, buf, ctx).await;
     666              : 
     667         9217 :             match res {
     668         9217 :                 Ok(blobs_buf) => {
     669         9217 :                     let frozen_buf = blobs_buf.buf.freeze();
     670              : 
     671        17921 :                     for meta in blobs_buf.blobs.iter() {
     672        17921 :                         let img_buf = frozen_buf.slice(meta.start..meta.end);
     673        17921 :                         reconstruct_state.update_key(
     674        17921 :                             &meta.meta.key,
     675        17921 :                             self.lsn,
     676        17921 :                             Value::Image(img_buf),
     677        17921 :                         );
     678        17921 :                     }
     679              :                 }
     680            0 :                 Err(err) => {
     681            0 :                     let kind = err.kind();
     682            0 :                     for (_, blob_meta) in read.blobs_at.as_slice() {
     683            0 :                         reconstruct_state.on_key_error(
     684            0 :                             blob_meta.key,
     685            0 :                             PageReconstructError::from(anyhow!(
     686            0 :                                 "Failed to read blobs from virtual file {}: {}",
     687            0 :                                 self.file.path,
     688            0 :                                 kind
     689            0 :                             )),
     690            0 :                         );
     691            0 :                     }
     692              :                 }
     693              :             };
     694              :         }
     695         7606 :     }
     696              : 
     697           78 :     pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> ImageLayerIterator<'a> {
     698           78 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
     699           78 :         let tree_reader =
     700           78 :             DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
     701           78 :         ImageLayerIterator {
     702           78 :             image_layer: self,
     703           78 :             ctx,
     704           78 :             index_iter: tree_reader.iter(&[0; KEY_SIZE], ctx),
     705           78 :             key_values_batch: VecDeque::new(),
     706           78 :             is_end: false,
     707           78 :             planner: StreamingVectoredReadPlanner::new(
     708           78 :                 1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
     709           78 :                 1024,        // The default value. Unit tests might use a different value
     710           78 :             ),
     711           78 :         }
     712           78 :     }
     713              : }
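The batching inside `ImageLayerIterator` is internal; for ad-hoc inspection, the `load_key_values` helper above can be used directly. A hedged sketch, assuming an already-loaded `inner: &ImageLayerInner` and a `ctx: &RequestContext` in scope:

```rust
// Sketch only: print every image stored in this layer. All values returned
// by an image layer are Value::Image.
for (key, lsn, value) in inner.load_key_values(ctx).await? {
    if let Value::Image(img) = value {
        println!("{} @ {}: {} bytes", key, lsn, img.len());
    }
}
```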
     714              : 
     715              : /// A builder object for constructing a new image layer.
     716              : ///
     717              : /// Usage:
     718              : ///
     719              : /// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
     720              : ///
     721              : /// 2. Write the contents by calling `put_image` for every key-value
     722              : ///    pair in the key range.
     723              : ///
     724              : /// 3. Call `finish`.
     725              : ///
     726              : struct ImageLayerWriterInner {
     727              :     conf: &'static PageServerConf,
     728              :     path: Utf8PathBuf,
     729              :     timeline_id: TimelineId,
     730              :     tenant_shard_id: TenantShardId,
     731              :     key_range: Range<Key>,
     732              :     lsn: Lsn,
     733              : 
     734              :     // Total uncompressed bytes passed into put_image
     735              :     uncompressed_bytes: u64,
     736              : 
     737              :     // Like `uncompressed_bytes`,
     738              :     // but only of images we might consider for compression
     739              :     uncompressed_bytes_eligible: u64,
     740              : 
     741              :     // Like `uncompressed_bytes`, but only of images
     742              :     // where we have chosen their compressed form
     743              :     uncompressed_bytes_chosen: u64,
     744              : 
     745              :     blob_writer: BlobWriter<false>,
     746              :     tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,
     747              : }
     748              : 
     749              : impl ImageLayerWriterInner {
     750              :     ///
     751              :     /// Start building a new image layer.
     752              :     ///
     753          272 :     async fn new(
     754          272 :         conf: &'static PageServerConf,
     755          272 :         timeline_id: TimelineId,
     756          272 :         tenant_shard_id: TenantShardId,
     757          272 :         key_range: &Range<Key>,
     758          272 :         lsn: Lsn,
     759          272 :         ctx: &RequestContext,
     760          272 :     ) -> anyhow::Result<Self> {
     761          272 :         // Create the file initially with a temporary filename.
     762          272 :         // We'll atomically rename it to the final name when we're done.
     763          272 :         let path = ImageLayer::temp_path_for(
     764          272 :             conf,
     765          272 :             timeline_id,
     766          272 :             tenant_shard_id,
     767          272 :             &ImageLayerName {
     768          272 :                 key_range: key_range.clone(),
     769          272 :                 lsn,
     770          272 :             },
     771          272 :         );
     772          272 :         trace!("creating image layer {}", path);
     773          272 :         let mut file = {
     774          272 :             VirtualFile::open_with_options(
     775          272 :                 &path,
     776          272 :                 virtual_file::OpenOptions::new()
     777          272 :                     .write(true)
     778          272 :                     .create_new(true),
     779          272 :                 ctx,
     780          272 :             )
     781          210 :             .await?
     782              :         };
     783              :         // make room for the header block
     784          272 :         file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
     785          272 :         let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
     786          272 : 
     787          272 :         // Initialize the b-tree index builder
     788          272 :         let block_buf = BlockBuf::new();
     789          272 :         let tree_builder = DiskBtreeBuilder::new(block_buf);
     790          272 : 
     791          272 :         let writer = Self {
     792          272 :             conf,
     793          272 :             path,
     794          272 :             timeline_id,
     795          272 :             tenant_shard_id,
     796          272 :             key_range: key_range.clone(),
     797          272 :             lsn,
     798          272 :             tree: tree_builder,
     799          272 :             blob_writer,
     800          272 :             uncompressed_bytes: 0,
     801          272 :             uncompressed_bytes_eligible: 0,
     802          272 :             uncompressed_bytes_chosen: 0,
     803          272 :         };
     804          272 : 
     805          272 :         Ok(writer)
     806          272 :     }
     807              : 
     808              :     ///
     809              :     /// Write the next value to the file.
     810              :     ///
     811              :     /// The images must be appended in key order.
     812              :     ///
     813       538070 :     async fn put_image(
     814       538070 :         &mut self,
     815       538070 :         key: Key,
     816       538070 :         img: Bytes,
     817       538070 :         ctx: &RequestContext,
     818       538070 :     ) -> anyhow::Result<()> {
     819       538070 :         ensure!(self.key_range.contains(&key));
     820       538070 :         let compression = self.conf.image_compression;
     821       538070 :         let uncompressed_len = img.len() as u64;
     822       538070 :         self.uncompressed_bytes += uncompressed_len;
     823       538070 :         let (_img, res) = self
     824       538070 :             .blob_writer
     825       538070 :             .write_blob_maybe_compressed(img, ctx, compression)
     826       546563 :             .await;
     827              :         // TODO: re-use the buffer for `img` further upstack
     828       538070 :         let (off, compression_info) = res?;
     829       538070 :         if compression_info.compressed_size.is_some() {
     830            0 :             // The image has been considered for compression at least
     831            0 :             self.uncompressed_bytes_eligible += uncompressed_len;
     832       538070 :         }
     833       538070 :         if compression_info.written_compressed {
     834            0 :             // The image has been compressed
     835            0 :             self.uncompressed_bytes_chosen += uncompressed_len;
     836       538070 :         }
     837              : 
     838       538070 :         let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
     839       538070 :         key.write_to_byte_slice(&mut keybuf);
     840       538070 :         self.tree.append(&keybuf, off)?;
     841              : 
     842       538070 :         Ok(())
     843       538070 :     }
     844              : 
     845              :     ///
     846              :     /// Finish writing the image layer.
     847              :     ///
     848          260 :     async fn finish(
     849          260 :         self,
     850          260 :         timeline: &Arc<Timeline>,
     851          260 :         ctx: &RequestContext,
     852          260 :     ) -> anyhow::Result<ResidentLayer> {
     853          260 :         let index_start_blk =
     854          260 :             ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
     855          260 : 
     856          260 :         // Calculate compression ratio
     857          260 :         let compressed_size = self.blob_writer.size() - PAGE_SZ as u64; // Subtract PAGE_SZ for header
     858          260 :         crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES.inc_by(self.uncompressed_bytes);
     859          260 :         crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CONSIDERED
     860          260 :             .inc_by(self.uncompressed_bytes_eligible);
     861          260 :         crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CHOSEN.inc_by(self.uncompressed_bytes_chosen);
     862          260 :         crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size);
     863          260 : 
     864          260 :         let mut file = self.blob_writer.into_inner();
     865          260 : 
     866          260 :         // Write out the index
     867          260 :         file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
     868            0 :             .await?;
     869          260 :         let (index_root_blk, block_buf) = self.tree.finish()?;
     870          976 :         for buf in block_buf.blocks {
     871          716 :             let (_buf, res) = file.write_all(buf, ctx).await;
     872          716 :             res?;
     873              :         }
     874              : 
     875              :         // Fill in the summary on blk 0
     876          260 :         let summary = Summary {
     877          260 :             magic: IMAGE_FILE_MAGIC,
     878          260 :             format_version: STORAGE_FORMAT_VERSION,
     879          260 :             tenant_id: self.tenant_shard_id.tenant_id,
     880          260 :             timeline_id: self.timeline_id,
     881          260 :             key_range: self.key_range.clone(),
     882          260 :             lsn: self.lsn,
     883          260 :             index_start_blk,
     884          260 :             index_root_blk,
     885          260 :         };
     886          260 : 
     887          260 :         let mut buf = Vec::with_capacity(PAGE_SZ);
     888          260 :         // TODO: could use smallvec here but it's a pain with Slice<T>
     889          260 :         Summary::ser_into(&summary, &mut buf)?;
     890          260 :         file.seek(SeekFrom::Start(0)).await?;
     891          260 :         let (_buf, res) = file.write_all(buf, ctx).await;
     892          260 :         res?;
     893              : 
     894          260 :         let metadata = file
     895          260 :             .metadata()
     896          131 :             .await
     897          260 :             .context("get metadata to determine file size")?;
     898              : 
     899          260 :         let desc = PersistentLayerDesc::new_img(
     900          260 :             self.tenant_shard_id,
     901          260 :             self.timeline_id,
     902          260 :             self.key_range.clone(),
     903          260 :             self.lsn,
     904          260 :             metadata.len(),
     905          260 :         );
     906          260 : 
     907          260 :         // Note: Because we open the file in write-only mode, we cannot
     908          260 :         // reuse the same VirtualFile for reading later. That's why we don't
     909          260 :         // set inner.file here. The first read will have to re-open it.
     910          260 : 
     911          260 :         // fsync the file
     912          260 :         file.sync_all().await?;
     913              : 
     914              :         // FIXME: why not carry the virtualfile here, it supports renaming?
     915          260 :         let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
     916              : 
     917          260 :         info!("created image layer {}", layer.local_path());
     918              : 
     919          260 :         Ok(layer)
     920          260 :     }
     921              : }
     922              : 
     923              : /// A builder object for constructing a new image layer.
     924              : ///
     925              : /// Usage:
     926              : ///
     927              : /// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
     928              : ///
     929              : /// 2. Write the contents by calling `put_image` for every key-value
     930              : ///    pair in the key range.
     931              : ///
     932              : /// 3. Call `finish`.
     933              : ///
     934              : /// # Note
     935              : ///
     936              : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
     937              : /// possible for the writer to be dropped before `finish` is ever called, which
     938              : /// would leave stray temporary files in the directory and eventually exhaust
     939              : /// the file system. This structure wraps `ImageLayerWriterInner` and adds a
     940              : /// `Drop` implementation that cleans up the temporary file on failure. It's
     941              : /// not possible to do this directly in `ImageLayerWriterInner`, since `finish`
     942              : /// moves out some fields, making it impossible to implement `Drop`.
     943              : ///
     944              : #[must_use]
     945              : pub struct ImageLayerWriter {
     946              :     inner: Option<ImageLayerWriterInner>,
     947              : }
     948              : 
     949              : impl ImageLayerWriter {
     950              :     ///
     951              :     /// Start building a new image layer.
     952              :     ///
     953          272 :     pub async fn new(
     954          272 :         conf: &'static PageServerConf,
     955          272 :         timeline_id: TimelineId,
     956          272 :         tenant_shard_id: TenantShardId,
     957          272 :         key_range: &Range<Key>,
     958          272 :         lsn: Lsn,
     959          272 :         ctx: &RequestContext,
     960          272 :     ) -> anyhow::Result<ImageLayerWriter> {
     961          272 :         Ok(Self {
     962          272 :             inner: Some(
     963          272 :                 ImageLayerWriterInner::new(conf, timeline_id, tenant_shard_id, key_range, lsn, ctx)
     964          210 :                     .await?,
     965              :             ),
     966              :         })
     967          272 :     }
     968              : 
     969              :     ///
     970              :     /// Write the next value to the file.
     971              :     ///
     972              :     /// The images must be appended in key order.
     973              :     ///
     974       538070 :     pub async fn put_image(
     975       538070 :         &mut self,
     976       538070 :         key: Key,
     977       538070 :         img: Bytes,
     978       538070 :         ctx: &RequestContext,
     979       538070 :     ) -> anyhow::Result<()> {
     980       546563 :         self.inner.as_mut().unwrap().put_image(key, img, ctx).await
     981       538070 :     }
     982              : 
     983              :     ///
     984              :     /// Finish writing the image layer.
     985              :     ///
     986          260 :     pub(crate) async fn finish(
     987          260 :         mut self,
     988          260 :         timeline: &Arc<Timeline>,
     989          260 :         ctx: &RequestContext,
     990          260 :     ) -> anyhow::Result<super::ResidentLayer> {
     991          756 :         self.inner.take().unwrap().finish(timeline, ctx).await
     992          260 :     }
     993              : }
     994              : 
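                      : // If the writer is dropped before `finish` has completed, remove the
                      : // temporary file so it does not linger on disk (see the note above).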
     995              : impl Drop for ImageLayerWriter {
     996          272 :     fn drop(&mut self) {
     997          272 :         if let Some(inner) = self.inner.take() {
     998           12 :             inner.blob_writer.into_inner().remove();
     999          260 :         }
    1000          272 :     }
    1001              : }
    1002              : 
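                      : /// Streaming iterator over all key-value pairs of an image layer. Values are
                      : /// read from disk in batches planned by a `StreamingVectoredReadPlanner` and
                      : /// buffered in `key_values_batch`. A minimal usage sketch (assuming an
                      : /// `ImageLayerInner` named `image_layer` and a request context `ctx`):
                      : ///
                      : /// ```ignore
                      : /// let mut iter = image_layer.iter(&ctx);
                      : /// while let Some((key, lsn, value)) = iter.next().await? {
                      : ///     // ... consume the page image ...
                      : /// }
                      : /// ```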
    1003              : pub struct ImageLayerIterator<'a> {
    1004              :     image_layer: &'a ImageLayerInner,
    1005              :     ctx: &'a RequestContext,
    1006              :     planner: StreamingVectoredReadPlanner,
    1007              :     index_iter: DiskBtreeIterator<'a>,
    1008              :     key_values_batch: VecDeque<(Key, Lsn, Value)>,
    1009              :     is_end: bool,
    1010              : }
    1011              : 
    1012              : impl<'a> ImageLayerIterator<'a> {
    1013              :     /// Retrieve a batch of key-value pairs into the iterator buffer.
    1014        18964 :     async fn next_batch(&mut self) -> anyhow::Result<()> {
    1015        18964 :         assert!(self.key_values_batch.is_empty());
    1016        18964 :         assert!(!self.is_end);
    1017              : 
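                      :         // Walk the B-tree index until the planner has collected enough entries
                      :         // for one batch, or until the end of the layer is reached.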
    1018        18964 :         let plan = loop {
    1019        28636 :             if let Some(res) = self.index_iter.next().await {
    1020        28586 :                 let (raw_key, offset) = res?;
    1021        28586 :                 if let Some(batch_plan) = self.planner.handle(
    1022        28586 :                     Key::from_slice(&raw_key[..KEY_SIZE]),
    1023        28586 :                     self.image_layer.lsn,
    1024        28586 :                     offset,
    1025        28586 :                 ) {
    1026        18914 :                     break batch_plan;
    1027         9672 :                 }
    1028              :             } else {
    1029           50 :                 self.is_end = true;
    1030           50 :                 let payload_end = self.image_layer.index_start_blk as u64 * PAGE_SZ as u64;
    1031           50 :                 if let Some(item) = self.planner.handle_range_end(payload_end) {
    1032           50 :                     break item;
    1033              :                 } else {
    1034            0 :                     return Ok(()); // TODO: add a test case for an empty iterator
    1035              :                 }
    1036              :             }
    1037              :         };
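                      :         // Execute the planned vectored read, then slice the shared buffer back
                      :         // into one image per key in the batch.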
    1038        18964 :         let vectored_blob_reader = VectoredBlobReader::new(&self.image_layer.file);
    1039        18964 :         let mut next_batch = std::collections::VecDeque::new();
    1040        18964 :         let buf_size = plan.size();
    1041        18964 :         let buf = BytesMut::with_capacity(buf_size);
    1042        18964 :         let blobs_buf = vectored_blob_reader
    1043        18964 :             .read_blobs(&plan, buf, self.ctx)
    1044         9631 :             .await?;
    1045        18964 :         let frozen_buf: Bytes = blobs_buf.buf.freeze();
    1046        28558 :         for meta in blobs_buf.blobs.iter() {
    1047        28558 :             let img_buf = frozen_buf.slice(meta.start..meta.end);
    1048        28558 :             next_batch.push_back((meta.meta.key, self.image_layer.lsn, Value::Image(img_buf)));
    1049        28558 :         }
    1050        18964 :         self.key_values_batch = next_batch;
    1051        18964 :         Ok(())
    1052        18964 :     }
    1053              : 
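                      :     /// Return the next `(key, lsn, value)` tuple, refilling the buffer from
                      :     /// disk whenever it runs empty. Returns `Ok(None)` once the layer is exhausted.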
    1054        28360 :     pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
    1055        28360 :         if self.key_values_batch.is_empty() {
    1056        18952 :             if self.is_end {
    1057           72 :                 return Ok(None);
    1058        18880 :             }
    1059        18880 :             self.next_batch().await?;
    1060         9408 :         }
    1061        28288 :         Ok(Some(
    1062        28288 :             self.key_values_batch
    1063        28288 :                 .pop_front()
    1064        28288 :                 .expect("should not be empty"),
    1065        28288 :         ))
    1066        28360 :     }
    1067              : }
    1068              : 
    1069              : #[cfg(test)]
    1070              : mod test {
    1071              :     use std::{sync::Arc, time::Duration};
    1072              : 
    1073              :     use bytes::Bytes;
    1074              :     use itertools::Itertools;
    1075              :     use pageserver_api::{
    1076              :         key::Key,
    1077              :         shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize},
    1078              :     };
    1079              :     use utils::{
    1080              :         generation::Generation,
    1081              :         id::{TenantId, TimelineId},
    1082              :         lsn::Lsn,
    1083              :     };
    1084              : 
    1085              :     use crate::{
    1086              :         context::RequestContext,
    1087              :         repository::Value,
    1088              :         tenant::{
    1089              :             config::TenantConf,
    1090              :             harness::{TenantHarness, TIMELINE_ID},
    1091              :             storage_layer::ResidentLayer,
    1092              :             vectored_blob_io::StreamingVectoredReadPlanner,
    1093              :             Tenant, Timeline,
    1094              :         },
    1095              :         DEFAULT_PG_VERSION,
    1096              :     };
    1097              : 
    1098              :     use super::{ImageLayerIterator, ImageLayerWriter};
    1099              : 
    1100              :     #[tokio::test]
    1101            2 :     async fn image_layer_rewrite() {
    1102            2 :         let tenant_conf = TenantConf {
    1103            2 :             gc_period: Duration::ZERO,
    1104            2 :             compaction_period: Duration::ZERO,
    1105            2 :             ..TenantConf::default()
    1106            2 :         };
    1107            2 :         let tenant_id = TenantId::generate();
    1108            2 :         let mut gen = Generation::new(0xdead0001);
    1109           10 :         let mut get_next_gen = || {
    1110           10 :             let ret = gen;
    1111           10 :             gen = gen.next();
    1112           10 :             ret
    1113           10 :         };
    1114            2 :         // The LSN at which we will create an image layer to filter
    1115            2 :         let lsn = Lsn(0xdeadbeef0000);
    1116            2 :         let timeline_id = TimelineId::generate();
    1117            2 : 
    1118            2 :         //
    1119            2 :         // Create an unsharded parent with a layer.
    1120            2 :         //
    1121            2 : 
    1122            2 :         let harness = TenantHarness::create_custom(
    1123            2 :             "test_image_layer_rewrite--parent",
    1124            2 :             tenant_conf.clone(),
    1125            2 :             tenant_id,
    1126            2 :             ShardIdentity::unsharded(),
    1127            2 :             get_next_gen(),
    1128            2 :         )
    1129            2 :         .await
    1130            2 :         .unwrap();
    1131            8 :         let (tenant, ctx) = harness.load().await;
    1132            2 :         let timeline = tenant
    1133            2 :             .create_test_timeline(timeline_id, lsn, DEFAULT_PG_VERSION, &ctx)
    1134            4 :             .await
    1135            2 :             .unwrap();
    1136            2 : 
    1137            2 :         // This key range contains several 0x8000 page stripes, only one of which belongs to shard zero
    1138            2 :         let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
    1139            2 :         let input_end = Key::from_hex("000000067f00000001000000ae0000020000").unwrap();
    1140            2 :         let range = input_start..input_end;
    1141            2 : 
    1142            2 :         // Build an image layer to filter
    1143            2 :         let resident = {
    1144            2 :             let mut writer = ImageLayerWriter::new(
    1145            2 :                 harness.conf,
    1146            2 :                 timeline_id,
    1147            2 :                 harness.tenant_shard_id,
    1148            2 :                 &range,
    1149            2 :                 lsn,
    1150            2 :                 &ctx,
    1151            2 :             )
    1152            2 :             .await
    1153            2 :             .unwrap();
    1154            2 : 
    1155            2 :             let foo_img = Bytes::from_static(&[1, 2, 3, 4]);
    1156            2 :             let mut key = range.start;
    1157       262146 :             while key < range.end {
    1158       266239 :                 writer.put_image(key, foo_img.clone(), &ctx).await.unwrap();
    1159       262144 : 
    1160       262144 :                 key = key.next();
    1161            2 :             }
    1162          119 :             writer.finish(&timeline, &ctx).await.unwrap()
    1163            2 :         };
    1164            2 :         let original_size = resident.metadata().file_size;
    1165            2 : 
    1166            2 :         //
    1167            2 :         // Create child shards and do the rewrite, exercising filter().
    1168            2 :         // TODO: abstraction in TenantHarness for splits.
    1169            2 :         //
    1170            2 : 
    1171            2 :         // Filter for various shards: this exercises cases like values at start of key range, end of key
    1172            2 :         // range, middle of key range.
    1173            2 :         let shard_count = ShardCount::new(4);
    1174            8 :         for shard_number in 0..shard_count.count() {
    1175            2 :             //
    1176            2 :             // mimic the shard split
    1177            2 :             //
    1178            8 :             let shard_identity = ShardIdentity::new(
    1179            8 :                 ShardNumber(shard_number),
    1180            8 :                 shard_count,
    1181            8 :                 ShardStripeSize(0x8000),
    1182            8 :             )
    1183            8 :             .unwrap();
    1184            8 :             let harness = TenantHarness::create_custom(
    1185            8 :                 Box::leak(Box::new(format!(
    1186            8 :                     "test_image_layer_rewrite--child{}",
    1187            8 :                     shard_identity.shard_slug()
    1188            8 :                 ))),
    1189            8 :                 tenant_conf.clone(),
    1190            8 :                 tenant_id,
    1191            8 :                 shard_identity,
    1192            8 :                 // NB: in reality, the shards would each fork off their own gen number sequence from the parent.
    1193            8 :                 // But here, all we care about is that the gen number is unique.
    1194            8 :                 get_next_gen(),
    1195            8 :             )
    1196            2 :             .await
    1197            8 :             .unwrap();
    1198           31 :             let (tenant, ctx) = harness.load().await;
    1199            8 :             let timeline = tenant
    1200            8 :                 .create_test_timeline(timeline_id, lsn, DEFAULT_PG_VERSION, &ctx)
    1201           12 :                 .await
    1202            8 :                 .unwrap();
    1203            2 : 
    1204            2 :             //
    1205            2 :             // use filter() and make assertions
    1206            2 :             //
    1207            2 : 
    1208            8 :             let mut filtered_writer = ImageLayerWriter::new(
    1209            8 :                 harness.conf,
    1210            8 :                 timeline_id,
    1211            8 :                 harness.tenant_shard_id,
    1212            8 :                 &range,
    1213            8 :                 lsn,
    1214            8 :                 &ctx,
    1215            8 :             )
    1216            4 :             .await
    1217            8 :             .unwrap();
    1218            2 : 
    1219            8 :             let wrote_keys = resident
    1220            8 :                 .filter(&shard_identity, &mut filtered_writer, &ctx)
    1221       266719 :                 .await
    1222            8 :                 .unwrap();
    1223            8 :             let replacement = if wrote_keys > 0 {
    1224          129 :                 Some(filtered_writer.finish(&timeline, &ctx).await.unwrap())
    1225            2 :             } else {
    1226            2 :                 None
    1227            2 :             };
    1228            2 : 
    1229            2 :             // This exact size and those below will need updating as/when the layer encoding changes, but
    1230            2 :             // should be deterministic for a given version of the format, since no randomness was used to generate the input.
    1231            8 :             assert_eq!(original_size, 1597440);
    1232            2 : 
    1233            8 :             match shard_number {
    1234            2 :                 0 => {
    1235            2 :                     // We should have written out just one stripe for our shard identity
    1236            2 :                     assert_eq!(wrote_keys, 0x8000);
    1237            2 :                     let replacement = replacement.unwrap();
    1238            2 : 
    1239            2 :                     // We should have dropped some of the data
    1240            2 :                     assert!(replacement.metadata().file_size < original_size);
    1241            2 :                     assert!(replacement.metadata().file_size > 0);
    1242            2 : 
    1243            2 :                     // Assert that we dropped ~3/4 of the data.
    1244            2 :                     assert_eq!(replacement.metadata().file_size, 417792);
    1245            2 :                 }
    1246            2 :                 1 => {
    1247            2 :                     // Shard 1 has no keys in our input range
    1248            2 :                     assert_eq!(wrote_keys, 0x0);
    1249            2 :                     assert!(replacement.is_none());
    1250            2 :                 }
    1251            2 :                 2 => {
    1252            2 :                     // Shard 2 has one stripe in the input range
    1253            2 :                     assert_eq!(wrote_keys, 0x8000);
    1254            2 :                     let replacement = replacement.unwrap();
    1255            2 :                     assert!(replacement.metadata().file_size < original_size);
    1256            2 :                     assert!(replacement.metadata().file_size > 0);
    1257            2 :                     assert_eq!(replacement.metadata().file_size, 417792);
    1258            2 :                 }
    1259            2 :                 3 => {
    1260            2 :                     // Shard 3 has two stripes in the input range
    1261            2 :                     assert_eq!(wrote_keys, 0x10000);
    1262            2 :                     let replacement = replacement.unwrap();
    1263            2 :                     assert!(replacement.metadata().file_size < original_size);
    1264            2 :                     assert!(replacement.metadata().file_size > 0);
    1265            2 :                     assert_eq!(replacement.metadata().file_size, 811008);
    1266            2 :                 }
    1267            2 :                 _ => unreachable!(),
    1268            2 :             }
    1269            2 :         }
    1270            2 :     }
    1271              : 
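                      :     /// Test helper: write `images` into a fresh image layer at `lsn`, covering
                      :     /// exactly the key range spanned by the (sorted) input.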
    1272            2 :     async fn produce_image_layer(
    1273            2 :         tenant: &Tenant,
    1274            2 :         tline: &Arc<Timeline>,
    1275            2 :         mut images: Vec<(Key, Bytes)>,
    1276            2 :         lsn: Lsn,
    1277            2 :         ctx: &RequestContext,
    1278            2 :     ) -> anyhow::Result<ResidentLayer> {
    1279            2 :         images.sort();
    1280            2 :         let (key_start, _) = images.first().unwrap();
    1281            2 :         let (key_last, _) = images.last().unwrap();
    1282            2 :         let key_end = key_last.next();
    1283            2 :         let key_range = *key_start..key_end;
    1284            2 :         let mut writer = ImageLayerWriter::new(
    1285            2 :             tenant.conf,
    1286            2 :             tline.timeline_id,
    1287            2 :             tenant.tenant_shard_id,
    1288            2 :             &key_range,
    1289            2 :             lsn,
    1290            2 :             ctx,
    1291            2 :         )
    1292            1 :         .await?;
    1293              : 
    1294         2002 :         for (key, img) in images {
    1295         2031 :             writer.put_image(key, img, ctx).await?;
    1296              :         }
    1297            4 :         let img_layer = writer.finish(tline, ctx).await?;
    1298              : 
    1299            2 :         Ok::<_, anyhow::Error>(img_layer)
    1300            2 :     }
    1301              : 
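                      :     /// Test helper: drain `img_iter` and assert that it yields exactly the
                      :     /// `expect` key/image pairs, each at `expect_lsn`.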
    1302           28 :     async fn assert_img_iter_equal(
    1303           28 :         img_iter: &mut ImageLayerIterator<'_>,
    1304           28 :         expect: &[(Key, Bytes)],
    1305           28 :         expect_lsn: Lsn,
    1306           28 :     ) {
    1307           28 :         let mut expect_iter = expect.iter();
    1308              :         loop {
    1309        28028 :             let o1 = img_iter.next().await.unwrap();
    1310        28028 :             let o2 = expect_iter.next();
    1311        28028 :             match (o1, o2) {
    1312           28 :                 (None, None) => break,
    1313        28000 :                 (Some((k1, l1, v1)), Some((k2, i2))) => {
    1314        28000 :                     let Value::Image(i1) = v1 else {
    1315            0 :                         panic!("expect Value::Image")
    1316              :                     };
    1317        28000 :                     assert_eq!(&k1, k2);
    1318        28000 :                     assert_eq!(l1, expect_lsn);
    1319        28000 :                     assert_eq!(&i1, i2);
    1320              :                 }
    1321            0 :                 (o1, o2) => panic!("iterators length mismatch: {:?}, {:?}", o1, o2),
    1322              :             }
    1323              :         }
    1324           28 :     }
    1325              : 
    1326              :     #[tokio::test]
    1327            2 :     async fn image_layer_iterator() {
    1328            2 :         let harness = TenantHarness::create("image_layer_iterator").await.unwrap();
    1329            8 :         let (tenant, ctx) = harness.load().await;
    1330            2 : 
    1331            2 :         let tline = tenant
    1332            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    1333            4 :             .await
    1334            2 :             .unwrap();
    1335            2 : 
    1336         2000 :         fn get_key(id: u32) -> Key {
    1337         2000 :             let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
    1338         2000 :             key.field6 = id;
    1339         2000 :             key
    1340         2000 :         }
    1341            2 :         const N: usize = 1000;
    1342            2 :         let test_imgs = (0..N)
    1343         2000 :             .map(|idx| (get_key(idx as u32), Bytes::from(format!("img{idx:05}"))))
    1344            2 :             .collect_vec();
    1345            2 :         let resident_layer =
    1346            2 :             produce_image_layer(&tenant, &tline, test_imgs.clone(), Lsn(0x10), &ctx)
    1347         2036 :                 .await
    1348            2 :                 .unwrap();
    1349            2 :         let img_layer = resident_layer.get_as_image(&ctx).await.unwrap();
    1350            6 :         for max_read_size in [1, 1024] {
    1351           32 :             for batch_size in [1, 2, 4, 8, 3, 7, 13] {
    1352           28 :                 println!("running with batch_size={batch_size} max_read_size={max_read_size}");
    1353           28 :                 // Test if the batch size is correctly determined
    1354           28 :                 let mut iter = img_layer.iter(&ctx);
    1355           28 :                 iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
    1356           28 :                 let mut num_items = 0;
    1357          112 :                 for _ in 0..3 {
    1358           84 :                     iter.next_batch().await.unwrap();
    1359           84 :                     num_items += iter.key_values_batch.len();
    1360           84 :                     if max_read_size == 1 {
    1361            2 :                         // every key should be its own batch because each value is larger than max_read_size
    1362           42 :                         assert_eq!(iter.key_values_batch.len(), 1);
    1363            2 :                     } else {
    1364           42 :                         assert_eq!(iter.key_values_batch.len(), batch_size);
    1365            2 :                     }
    1366           84 :                     if num_items >= N {
    1367            2 :                         break;
    1368           84 :                     }
    1369           84 :                     iter.key_values_batch.clear();
    1370            2 :                 }
    1371            2 :                 // Test if the result is correct
    1372           28 :                 let mut iter = img_layer.iter(&ctx);
    1373           28 :                 iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
    1374         9576 :                 assert_img_iter_equal(&mut iter, &test_imgs, Lsn(0x10)).await;
    1375            2 :             }
    1376            2 :         }
    1377            2 :     }
    1378              : }
        

Generated by: LCOV version 2.1-beta