LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - image_layer.rs (source / functions) Coverage Total Hit
Test: 90b23405d17e36048d3bb64e314067f397803f1b.info Lines: 79.7 % 916 730
Test Date: 2024-09-20 13:14:58 Functions: 55.8 % 86 48

            Line data    Source code
       1              : //! An ImageLayer represents an image or a snapshot of a key-range at
       2              : //! one particular LSN.
       3              : //!
       4              : //! It contains an image of all key-value pairs in its key-range. Any key
       5              : //! that falls into the image layer's range but does not exist in the layer,
       6              : //! does not exist.
       7              : //!
       8              : //! An image layer is stored in a file on disk. The file is stored in
       9              : //! timelines/<timeline_id> directory.  Currently, there are no
      10              : //! subdirectories, and each image layer file is named like this:
      11              : //!
      12              : //! ```text
      13              : //!    <key start>-<key end>__<LSN>
      14              : //! ```
      15              : //!
      16              : //! For example:
      17              : //!
      18              : //! ```text
      19              : //!    000000067F000032BE0000400000000070B6-000000067F000032BE0000400000000080B6__00000000346BC568
      20              : //! ```
      21              : //!
      22              : //! Every image layer file consists of three parts: "summary",
      23              : //! "index", and "values".  The summary is a fixed size header at the
      24              : //! beginning of the file, and it contains basic information about the
      25              : //! layer, and offsets to the other parts. The "index" is a B-tree,
      26              : //! mapping from Key to an offset in the "values" part.  The
      27              : //! actual page images are stored in the "values" part.
      28              : use crate::config::PageServerConf;
      29              : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
      30              : use crate::page_cache::{self, FileId, PAGE_SZ};
      31              : use crate::repository::{Key, Value, KEY_SIZE};
      32              : use crate::tenant::blob_io::BlobWriter;
      33              : use crate::tenant::block_io::{BlockBuf, FileBlockReader};
      34              : use crate::tenant::disk_btree::{
      35              :     DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
      36              : };
      37              : use crate::tenant::timeline::GetVectoredError;
      38              : use crate::tenant::vectored_blob_io::{
      39              :     BlobFlag, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, VectoredReadPlanner,
      40              : };
      41              : use crate::tenant::PageReconstructError;
      42              : use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
      43              : use crate::virtual_file::{self, VirtualFile};
      44              : use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
      45              : use anyhow::{anyhow, bail, ensure, Context, Result};
      46              : use bytes::{Bytes, BytesMut};
      47              : use camino::{Utf8Path, Utf8PathBuf};
      48              : use hex;
      49              : use itertools::Itertools;
      50              : use pageserver_api::config::MaxVectoredReadBytes;
      51              : use pageserver_api::keyspace::KeySpace;
      52              : use pageserver_api::shard::{ShardIdentity, TenantShardId};
      53              : use rand::{distributions::Alphanumeric, Rng};
      54              : use serde::{Deserialize, Serialize};
      55              : use std::collections::VecDeque;
      56              : use std::fs::File;
      57              : use std::io::SeekFrom;
      58              : use std::ops::Range;
      59              : use std::os::unix::prelude::FileExt;
      60              : use std::str::FromStr;
      61              : use tokio::sync::OnceCell;
      62              : use tokio_stream::StreamExt;
      63              : use tracing::*;
      64              : 
      65              : use utils::{
      66              :     bin_ser::BeSer,
      67              :     id::{TenantId, TimelineId},
      68              :     lsn::Lsn,
      69              : };
      70              : 
      71              : use super::layer_name::ImageLayerName;
      72              : use super::{AsLayerDesc, LayerName, PersistentLayerDesc, ValuesReconstructState};
      73              : 
      74              : ///
      75              : /// Header stored in the beginning of the file
      76              : ///
      77              : /// After this comes the 'values' part, starting on block 1. After that,
      78              : /// the 'index' starts at the block indicated by 'index_start_blk'
      79              : ///
      80          318 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
      81              : pub struct Summary {
      82              :     /// Magic value to identify this as a neon image file. Always IMAGE_FILE_MAGIC.
      83              :     pub magic: u16,
      84              :     pub format_version: u16,
      85              : 
      86              :     pub tenant_id: TenantId,
      87              :     pub timeline_id: TimelineId,
      88              :     pub key_range: Range<Key>,
      89              :     pub lsn: Lsn,
      90              : 
      91              :     /// Block number where the 'index' part of the file begins.
      92              :     pub index_start_blk: u32,
      93              :     /// Block within the 'index', where the B-tree root page is stored
      94              :     pub index_root_blk: u32,
      95              :     // the 'values' part starts after the summary header, on block 1.
      96              : }
      97              : 
      98              : impl From<&ImageLayer> for Summary {
      99            0 :     fn from(layer: &ImageLayer) -> Self {
     100            0 :         Self::expected(
     101            0 :             layer.desc.tenant_shard_id.tenant_id,
     102            0 :             layer.desc.timeline_id,
     103            0 :             layer.desc.key_range.clone(),
     104            0 :             layer.lsn,
     105            0 :         )
     106            0 :     }
     107              : }
     108              : 
     109              : impl Summary {
     110          318 :     pub(super) fn expected(
     111          318 :         tenant_id: TenantId,
     112          318 :         timeline_id: TimelineId,
     113          318 :         key_range: Range<Key>,
     114          318 :         lsn: Lsn,
     115          318 :     ) -> Self {
     116          318 :         Self {
     117          318 :             magic: IMAGE_FILE_MAGIC,
     118          318 :             format_version: STORAGE_FORMAT_VERSION,
     119          318 :             tenant_id,
     120          318 :             timeline_id,
     121          318 :             key_range,
     122          318 :             lsn,
     123          318 : 
     124          318 :             index_start_blk: 0,
     125          318 :             index_root_blk: 0,
     126          318 :         }
     127          318 :     }
     128              : }
     129              : 
     130              : /// This is used only from `pagectl`. Within pageserver, all layers are
     131              : /// [`crate::tenant::storage_layer::Layer`], which can hold an [`ImageLayerInner`].
     132              : pub struct ImageLayer {
     133              :     path: Utf8PathBuf,
     134              :     pub desc: PersistentLayerDesc,
     135              :     // This entry contains an image of all pages as of this LSN, should be the same as desc.lsn
     136              :     pub lsn: Lsn,
     137              :     inner: OnceCell<ImageLayerInner>,
     138              : }
     139              : 
     140              : impl std::fmt::Debug for ImageLayer {
     141            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     142              :         use super::RangeDisplayDebug;
     143              : 
     144            0 :         f.debug_struct("ImageLayer")
     145            0 :             .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
     146            0 :             .field("file_size", &self.desc.file_size)
     147            0 :             .field("lsn", &self.lsn)
     148            0 :             .field("inner", &self.inner)
     149            0 :             .finish()
     150            0 :     }
     151              : }
     152              : 
     153              : /// ImageLayer is the in-memory data structure associated with an on-disk image
     154              : /// file.
     155              : pub struct ImageLayerInner {
     156              :     // values copied from summary
     157              :     index_start_blk: u32,
     158              :     index_root_blk: u32,
     159              : 
     160              :     key_range: Range<Key>,
     161              :     lsn: Lsn,
     162              : 
     163              :     file: VirtualFile,
     164              :     file_id: FileId,
     165              : 
     166              :     max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
     167              : }
     168              : 
     169              : impl ImageLayerInner {
     170            0 :     pub(crate) fn layer_dbg_info(&self) -> String {
     171            0 :         format!(
     172            0 :             "image {}..{} {}",
     173            0 :             self.key_range().start,
     174            0 :             self.key_range().end,
     175            0 :             self.lsn()
     176            0 :         )
     177            0 :     }
     178              : }
     179              : 
     180              : impl std::fmt::Debug for ImageLayerInner {
     181            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     182            0 :         f.debug_struct("ImageLayerInner")
     183            0 :             .field("index_start_blk", &self.index_start_blk)
     184            0 :             .field("index_root_blk", &self.index_root_blk)
     185            0 :             .finish()
     186            0 :     }
     187              : }
     188              : 
     189              : impl ImageLayerInner {
     190            0 :     pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
     191            0 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
     192            0 :         let tree_reader = DiskBtreeReader::<_, KEY_SIZE>::new(
     193            0 :             self.index_start_blk,
     194            0 :             self.index_root_blk,
     195            0 :             block_reader,
     196            0 :         );
     197            0 : 
     198            0 :         tree_reader.dump().await?;
     199              : 
     200            0 :         tree_reader
     201            0 :             .visit(
     202            0 :                 &[0u8; KEY_SIZE],
     203            0 :                 VisitDirection::Forwards,
     204            0 :                 |key, value| {
     205            0 :                     println!("key: {} offset {}", hex::encode(key), value);
     206            0 :                     true
     207            0 :                 },
     208            0 :                 ctx,
     209            0 :             )
     210            0 :             .await?;
     211              : 
     212            0 :         Ok(())
     213            0 :     }
     214              : }
     215              : 
     216              : /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
     217              : impl std::fmt::Display for ImageLayer {
     218            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     219            0 :         write!(f, "{}", self.layer_desc().short_id())
     220            0 :     }
     221              : }
     222              : 
     223              : impl AsLayerDesc for ImageLayer {
     224            0 :     fn layer_desc(&self) -> &PersistentLayerDesc {
     225            0 :         &self.desc
     226            0 :     }
     227              : }
     228              : 
     229              : impl ImageLayer {
     230            0 :     pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
     231            0 :         self.desc.dump();
     232            0 : 
     233            0 :         if !verbose {
     234            0 :             return Ok(());
     235            0 :         }
     236              : 
     237            0 :         let inner = self.load(ctx).await?;
     238              : 
     239            0 :         inner.dump(ctx).await?;
     240              : 
     241            0 :         Ok(())
     242            0 :     }
     243              : 
     244         1494 :     fn temp_path_for(
     245         1494 :         conf: &PageServerConf,
     246         1494 :         timeline_id: TimelineId,
     247         1494 :         tenant_shard_id: TenantShardId,
     248         1494 :         fname: &ImageLayerName,
     249         1494 :     ) -> Utf8PathBuf {
     250         1494 :         let rand_string: String = rand::thread_rng()
     251         1494 :             .sample_iter(&Alphanumeric)
     252         1494 :             .take(8)
     253         1494 :             .map(char::from)
     254         1494 :             .collect();
     255         1494 : 
     256         1494 :         conf.timeline_path(&tenant_shard_id, &timeline_id)
     257         1494 :             .join(format!("{fname}.{rand_string}.{TEMP_FILE_SUFFIX}"))
     258         1494 :     }
     259              : 
     260              :     ///
     261              :     /// Open the underlying file and read the metadata into memory, if it's
     262              :     /// not loaded already.
     263              :     ///
     264            0 :     async fn load(&self, ctx: &RequestContext) -> Result<&ImageLayerInner> {
     265            0 :         self.inner
     266            0 :             .get_or_try_init(|| self.load_inner(ctx))
     267            0 :             .await
     268            0 :             .with_context(|| format!("Failed to load image layer {}", self.path()))
     269            0 :     }
     270              : 
     271            0 :     async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
     272            0 :         let path = self.path();
     273              : 
     274            0 :         let loaded =
     275            0 :             ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, None, ctx).await?;
     276              : 
     277              :         // not production code
     278            0 :         let actual_layer_name = LayerName::from_str(path.file_name().unwrap()).unwrap();
     279            0 :         let expected_layer_name = self.layer_desc().layer_name();
     280            0 : 
     281            0 :         if actual_layer_name != expected_layer_name {
     282            0 :             println!("warning: filename does not match what is expected from in-file summary");
     283            0 :             println!("actual: {:?}", actual_layer_name.to_string());
     284            0 :             println!("expected: {:?}", expected_layer_name.to_string());
     285            0 :         }
     286              : 
     287            0 :         Ok(loaded)
     288            0 :     }
     289              : 
     290              :     /// Create an ImageLayer struct representing an existing file on disk.
     291              :     ///
     292              :     /// This variant is only used for debugging purposes, by the 'pagectl' binary.
     293            0 :     pub fn new_for_path(path: &Utf8Path, file: File) -> Result<ImageLayer> {
     294            0 :         let mut summary_buf = vec![0; PAGE_SZ];
     295            0 :         file.read_exact_at(&mut summary_buf, 0)?;
     296            0 :         let summary = Summary::des_prefix(&summary_buf)?;
     297            0 :         let metadata = file
     298            0 :             .metadata()
     299            0 :             .context("get file metadata to determine size")?;
     300              : 
     301              :         // This function is never used for constructing layers in a running pageserver,
     302              :         // so it does not need an accurate TenantShardId.
     303            0 :         let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
     304            0 : 
     305            0 :         Ok(ImageLayer {
     306            0 :             path: path.to_path_buf(),
     307            0 :             desc: PersistentLayerDesc::new_img(
     308            0 :                 tenant_shard_id,
     309            0 :                 summary.timeline_id,
     310            0 :                 summary.key_range,
     311            0 :                 summary.lsn,
     312            0 :                 metadata.len(),
     313            0 :             ), // Now we assume image layer ALWAYS covers the full range. This may change in the future.
     314            0 :             lsn: summary.lsn,
     315            0 :             inner: OnceCell::new(),
     316            0 :         })
     317            0 :     }
     318              : 
     319            0 :     fn path(&self) -> Utf8PathBuf {
     320            0 :         self.path.clone()
     321            0 :     }
     322              : }
     323              : 
     324            0 : #[derive(thiserror::Error, Debug)]
     325              : pub enum RewriteSummaryError {
     326              :     #[error("magic mismatch")]
     327              :     MagicMismatch,
     328              :     #[error(transparent)]
     329              :     Other(#[from] anyhow::Error),
     330              : }
     331              : 
     332              : impl From<std::io::Error> for RewriteSummaryError {
     333            0 :     fn from(e: std::io::Error) -> Self {
     334            0 :         Self::Other(anyhow::anyhow!(e))
     335            0 :     }
     336              : }
     337              : 
     338              : impl ImageLayer {
     339            0 :     pub async fn rewrite_summary<F>(
     340            0 :         path: &Utf8Path,
     341            0 :         rewrite: F,
     342            0 :         ctx: &RequestContext,
     343            0 :     ) -> Result<(), RewriteSummaryError>
     344            0 :     where
     345            0 :         F: Fn(Summary) -> Summary,
     346            0 :     {
     347            0 :         let mut file = VirtualFile::open_with_options(
     348            0 :             path,
     349            0 :             virtual_file::OpenOptions::new().read(true).write(true),
     350            0 :             ctx,
     351            0 :         )
     352            0 :         .await
     353            0 :         .with_context(|| format!("Failed to open file '{}'", path))?;
     354            0 :         let file_id = page_cache::next_file_id();
     355            0 :         let block_reader = FileBlockReader::new(&file, file_id);
     356            0 :         let summary_blk = block_reader.read_blk(0, ctx).await?;
     357            0 :         let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
     358            0 :         if actual_summary.magic != IMAGE_FILE_MAGIC {
     359            0 :             return Err(RewriteSummaryError::MagicMismatch);
     360            0 :         }
     361            0 : 
     362            0 :         let new_summary = rewrite(actual_summary);
     363            0 : 
     364            0 :         let mut buf = Vec::with_capacity(PAGE_SZ);
     365            0 :         // TODO: could use smallvec here but it's a pain with Slice<T>
     366            0 :         Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
     367            0 :         file.seek(SeekFrom::Start(0)).await?;
     368            0 :         let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
     369            0 :         res?;
     370            0 :         Ok(())
     371            0 :     }
     372              : }
     373              : 
     374              : impl ImageLayerInner {
     375          108 :     pub(crate) fn key_range(&self) -> &Range<Key> {
     376          108 :         &self.key_range
     377          108 :     }
     378              : 
     379          108 :     pub(crate) fn lsn(&self) -> Lsn {
     380          108 :         self.lsn
     381          108 :     }
     382              : 
     383          318 :     pub(super) async fn load(
     384          318 :         path: &Utf8Path,
     385          318 :         lsn: Lsn,
     386          318 :         summary: Option<Summary>,
     387          318 :         max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
     388          318 :         ctx: &RequestContext,
     389          318 :     ) -> anyhow::Result<Self> {
     390          318 :         let file = VirtualFile::open(path, ctx)
     391          159 :             .await
     392          318 :             .context("open layer file")?;
     393          318 :         let file_id = page_cache::next_file_id();
     394          318 :         let block_reader = FileBlockReader::new(&file, file_id);
     395          318 :         let summary_blk = block_reader
     396          318 :             .read_blk(0, ctx)
     397          162 :             .await
     398          318 :             .context("read first block")?;
     399              : 
     400              :         // length is the only way how this could fail, so it's not actually likely at all unless
     401              :         // read_blk returns wrong sized block.
     402              :         //
     403              :         // TODO: confirm and make this into assertion
     404          318 :         let actual_summary =
     405          318 :             Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
     406              : 
     407          318 :         if let Some(mut expected_summary) = summary {
     408              :             // production code path
     409          318 :             expected_summary.index_start_blk = actual_summary.index_start_blk;
     410          318 :             expected_summary.index_root_blk = actual_summary.index_root_blk;
     411          318 :             // mask out the timeline_id, but still require the layers to be from the same tenant
     412          318 :             expected_summary.timeline_id = actual_summary.timeline_id;
     413          318 : 
     414          318 :             if actual_summary != expected_summary {
     415            0 :                 bail!(
     416            0 :                     "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
     417            0 :                     actual_summary,
     418            0 :                     expected_summary
     419            0 :                 );
     420          318 :             }
     421            0 :         }
     422              : 
     423          318 :         Ok(ImageLayerInner {
     424          318 :             index_start_blk: actual_summary.index_start_blk,
     425          318 :             index_root_blk: actual_summary.index_root_blk,
     426          318 :             lsn,
     427          318 :             file,
     428          318 :             file_id,
     429          318 :             max_vectored_read_bytes,
     430          318 :             key_range: actual_summary.key_range,
     431          318 :         })
     432          318 :     }
     433              : 
     434              :     // Look up the keys in the provided keyspace and update
     435              :     // the reconstruct state with whatever is found.
     436        24184 :     pub(super) async fn get_values_reconstruct_data(
     437        24184 :         &self,
     438        24184 :         keyspace: KeySpace,
     439        24184 :         reconstruct_state: &mut ValuesReconstructState,
     440        24184 :         ctx: &RequestContext,
     441        24184 :     ) -> Result<(), GetVectoredError> {
     442        24184 :         let reads = self
     443        24184 :             .plan_reads(keyspace, None, ctx)
     444         2129 :             .await
     445        24184 :             .map_err(GetVectoredError::Other)?;
     446              : 
     447        24184 :         self.do_reads_and_update_state(reads, reconstruct_state, ctx)
     448        14255 :             .await;
     449              : 
     450        24184 :         reconstruct_state.on_image_layer_visited(&self.key_range);
     451        24184 : 
     452        24184 :         Ok(())
     453        24184 :     }
     454              : 
     455              :     /// Traverse the layer's index to build read operations on the overlap of the input keyspace
     456              :     /// and the keys in this layer.
     457              :     ///
     458              :     /// If shard_identity is provided, it will be used to filter keys down to those stored on
     459              :     /// this shard.
     460        24208 :     async fn plan_reads(
     461        24208 :         &self,
     462        24208 :         keyspace: KeySpace,
     463        24208 :         shard_identity: Option<&ShardIdentity>,
     464        24208 :         ctx: &RequestContext,
     465        24208 :     ) -> anyhow::Result<Vec<VectoredRead>> {
     466        24208 :         let mut planner = VectoredReadPlanner::new(
     467        24208 :             self.max_vectored_read_bytes
     468        24208 :                 .expect("Layer is loaded with max vectored bytes config")
     469        24208 :                 .0
     470        24208 :                 .into(),
     471        24208 :         );
     472        24208 : 
     473        24208 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
     474        24208 :         let tree_reader =
     475        24208 :             DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
     476        24208 : 
     477        24208 :         let ctx = RequestContextBuilder::extend(ctx)
     478        24208 :             .page_content_kind(PageContentKind::ImageLayerBtreeNode)
     479        24208 :             .build();
     480              : 
     481        89136 :         for range in keyspace.ranges.iter() {
     482        89136 :             let mut range_end_handled = false;
     483        89136 :             let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
     484        89136 :             range.start.write_to_byte_slice(&mut search_key);
     485        89136 : 
     486        89136 :             let index_stream = tree_reader.clone().into_stream(&search_key, &ctx);
     487        89136 :             let mut index_stream = std::pin::pin!(index_stream);
     488              : 
     489      3290134 :             while let Some(index_entry) = index_stream.next().await {
     490      3289527 :                 let (raw_key, offset) = index_entry?;
     491              : 
     492      3289527 :                 let key = Key::from_slice(&raw_key[..KEY_SIZE]);
     493      3289527 :                 assert!(key >= range.start);
     494              : 
     495      3289527 :                 let flag = if let Some(shard_identity) = shard_identity {
     496      3145728 :                     if shard_identity.is_key_disposable(&key) {
     497      2359296 :                         BlobFlag::Ignore
     498              :                     } else {
     499       786432 :                         BlobFlag::None
     500              :                     }
     501              :                 } else {
     502       143799 :                     BlobFlag::None
     503              :                 };
     504              : 
     505      3289527 :                 if key >= range.end {
     506        88529 :                     planner.handle_range_end(offset);
     507        88529 :                     range_end_handled = true;
     508        88529 :                     break;
     509      3200998 :                 } else {
     510      3200998 :                     planner.handle(key, self.lsn, offset, flag);
     511      3200998 :                 }
     512              :             }
     513              : 
     514        89136 :             if !range_end_handled {
     515          607 :                 let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64;
     516          607 :                 planner.handle_range_end(payload_end);
     517        88529 :             }
     518              :         }
     519              : 
     520        24208 :         Ok(planner.finish())
     521        24208 :     }
     522              : 
     523              :     /// Given a key range, select the parts of that range that should be retained by the ShardIdentity,
     524              :     /// then execute vectored GET operations, passing the results of all read keys into the writer.
     525           24 :     pub(super) async fn filter(
     526           24 :         &self,
     527           24 :         shard_identity: &ShardIdentity,
     528           24 :         writer: &mut ImageLayerWriter,
     529           24 :         ctx: &RequestContext,
     530           24 :     ) -> anyhow::Result<usize> {
     531              :         // Fragment the range into the regions owned by this ShardIdentity
     532           24 :         let plan = self
     533           24 :             .plan_reads(
     534           24 :                 KeySpace {
     535           24 :                     // If asked for the total key space, plan_reads will give us all the keys in the layer
     536           24 :                     ranges: vec![Key::MIN..Key::MAX],
     537           24 :                 },
     538           24 :                 Some(shard_identity),
     539           24 :                 ctx,
     540           24 :             )
     541         1407 :             .await?;
     542              : 
     543           24 :         let vectored_blob_reader = VectoredBlobReader::new(&self.file);
     544           24 :         let mut key_count = 0;
     545           48 :         for read in plan.into_iter() {
     546           48 :             let buf_size = read.size();
     547           48 : 
     548           48 :             let buf = BytesMut::with_capacity(buf_size);
     549           48 :             let blobs_buf = vectored_blob_reader.read_blobs(&read, buf, ctx).await?;
     550              : 
     551           48 :             let frozen_buf = blobs_buf.buf.freeze();
     552              : 
     553       786432 :             for meta in blobs_buf.blobs.iter() {
     554       786432 :                 let img_buf = frozen_buf.slice(meta.start..meta.end);
     555       786432 : 
     556       786432 :                 key_count += 1;
     557       786432 :                 writer
     558       786432 :                     .put_image(meta.meta.key, img_buf, ctx)
     559       798720 :                     .await
     560       786432 :                     .context(format!("Storing key {}", meta.meta.key))?;
     561              :             }
     562              :         }
     563              : 
     564           24 :         Ok(key_count)
     565           24 :     }
     566              : 
     567        24184 :     async fn do_reads_and_update_state(
     568        24184 :         &self,
     569        24184 :         reads: Vec<VectoredRead>,
     570        24184 :         reconstruct_state: &mut ValuesReconstructState,
     571        24184 :         ctx: &RequestContext,
     572        24184 :     ) {
     573        24184 :         let max_vectored_read_bytes = self
     574        24184 :             .max_vectored_read_bytes
     575        24184 :             .expect("Layer is loaded with max vectored bytes config")
     576        24184 :             .0
     577        24184 :             .into();
     578        24184 : 
     579        24184 :         let vectored_blob_reader = VectoredBlobReader::new(&self.file);
     580        27813 :         for read in reads.into_iter() {
     581        27813 :             let buf_size = read.size();
     582        27813 : 
     583        27813 :             if buf_size > max_vectored_read_bytes {
     584              :                 // If the read is oversized, it should only contain one key.
     585            0 :                 let offenders = read
     586            0 :                     .blobs_at
     587            0 :                     .as_slice()
     588            0 :                     .iter()
     589            0 :                     .map(|(_, blob_meta)| format!("{}@{}", blob_meta.key, blob_meta.lsn))
     590            0 :                     .join(", ");
     591            0 :                 tracing::warn!(
     592            0 :                     "Oversized vectored read ({} > {}) for keys {}",
     593              :                     buf_size,
     594              :                     max_vectored_read_bytes,
     595              :                     offenders
     596              :                 );
     597        27813 :             }
     598              : 
     599        27813 :             let buf = BytesMut::with_capacity(buf_size);
     600        27813 :             let res = vectored_blob_reader.read_blobs(&read, buf, ctx).await;
     601              : 
     602        27813 :             match res {
     603        27813 :                 Ok(blobs_buf) => {
     604        27813 :                     let frozen_buf = blobs_buf.buf.freeze();
     605              : 
     606        55270 :                     for meta in blobs_buf.blobs.iter() {
     607        55270 :                         let img_buf = frozen_buf.slice(meta.start..meta.end);
     608        55270 :                         reconstruct_state.update_key(
     609        55270 :                             &meta.meta.key,
     610        55270 :                             self.lsn,
     611        55270 :                             Value::Image(img_buf),
     612        55270 :                         );
     613        55270 :                     }
     614              :                 }
     615            0 :                 Err(err) => {
     616            0 :                     let kind = err.kind();
     617            0 :                     for (_, blob_meta) in read.blobs_at.as_slice() {
     618            0 :                         reconstruct_state.on_key_error(
     619            0 :                             blob_meta.key,
     620            0 :                             PageReconstructError::from(anyhow!(
     621            0 :                                 "Failed to read blobs from virtual file {}: {}",
     622            0 :                                 self.file.path,
     623            0 :                                 kind
     624            0 :                             )),
     625            0 :                         );
     626            0 :                     }
     627              :                 }
     628              :             };
     629              :         }
     630        24184 :     }
     631              : 
     632          276 :     pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> ImageLayerIterator<'a> {
     633          276 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
     634          276 :         let tree_reader =
     635          276 :             DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
     636          276 :         ImageLayerIterator {
     637          276 :             image_layer: self,
     638          276 :             ctx,
     639          276 :             index_iter: tree_reader.iter(&[0; KEY_SIZE], ctx),
     640          276 :             key_values_batch: VecDeque::new(),
     641          276 :             is_end: false,
     642          276 :             planner: StreamingVectoredReadPlanner::new(
     643          276 :                 1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
     644          276 :                 1024,        // The default value. Unit tests might use a different value
     645          276 :             ),
     646          276 :         }
     647          276 :     }
     648              : }
     649              : 
     650              : /// A builder object for constructing a new image layer.
     651              : ///
     652              : /// Usage:
     653              : ///
     654              : /// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
     655              : ///
     656              : /// 2. Write the contents by calling `put_page_image` for every key-value
     657              : ///    pair in the key range.
     658              : ///
     659              : /// 3. Call `finish`.
     660              : ///
     661              : struct ImageLayerWriterInner {
     662              :     conf: &'static PageServerConf,
     663              :     path: Utf8PathBuf,
     664              :     timeline_id: TimelineId,
     665              :     tenant_shard_id: TenantShardId,
     666              :     key_range: Range<Key>,
     667              :     lsn: Lsn,
     668              : 
     669              :     // Total uncompressed bytes passed into put_image
     670              :     uncompressed_bytes: u64,
     671              : 
     672              :     // Like `uncompressed_bytes`,
     673              :     // but only of images we might consider for compression
     674              :     uncompressed_bytes_eligible: u64,
     675              : 
     676              :     // Like `uncompressed_bytes`, but only of images
     677              :     // where we have chosen their compressed form
     678              :     uncompressed_bytes_chosen: u64,
     679              : 
     680              :     // Number of keys in the layer.
     681              :     num_keys: usize,
     682              : 
     683              :     blob_writer: BlobWriter<false>,
     684              :     tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,
     685              : 
     686              :     #[cfg(feature = "testing")]
     687              :     last_written_key: Key,
     688              : }
     689              : 
     690              : impl ImageLayerWriterInner {
     691              :     ///
     692              :     /// Start building a new image layer.
     693              :     ///
     694         1494 :     async fn new(
     695         1494 :         conf: &'static PageServerConf,
     696         1494 :         timeline_id: TimelineId,
     697         1494 :         tenant_shard_id: TenantShardId,
     698         1494 :         key_range: &Range<Key>,
     699         1494 :         lsn: Lsn,
     700         1494 :         ctx: &RequestContext,
     701         1494 :     ) -> anyhow::Result<Self> {
     702         1494 :         // Create the file initially with a temporary filename.
     703         1494 :         // We'll atomically rename it to the final name when we're done.
     704         1494 :         let path = ImageLayer::temp_path_for(
     705         1494 :             conf,
     706         1494 :             timeline_id,
     707         1494 :             tenant_shard_id,
     708         1494 :             &ImageLayerName {
     709         1494 :                 key_range: key_range.clone(),
     710         1494 :                 lsn,
     711         1494 :             },
     712         1494 :         );
     713         1494 :         trace!("creating image layer {}", path);
     714         1494 :         let mut file = {
     715         1494 :             VirtualFile::open_with_options(
     716         1494 :                 &path,
     717         1494 :                 virtual_file::OpenOptions::new()
     718         1494 :                     .write(true)
     719         1494 :                     .create_new(true),
     720         1494 :                 ctx,
     721         1494 :             )
     722          998 :             .await?
     723              :         };
     724              :         // make room for the header block
     725         1494 :         file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
     726         1494 :         let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
     727         1494 : 
     728         1494 :         // Initialize the b-tree index builder
     729         1494 :         let block_buf = BlockBuf::new();
     730         1494 :         let tree_builder = DiskBtreeBuilder::new(block_buf);
     731         1494 : 
     732         1494 :         let writer = Self {
     733         1494 :             conf,
     734         1494 :             path,
     735         1494 :             timeline_id,
     736         1494 :             tenant_shard_id,
     737         1494 :             key_range: key_range.clone(),
     738         1494 :             lsn,
     739         1494 :             tree: tree_builder,
     740         1494 :             blob_writer,
     741         1494 :             uncompressed_bytes: 0,
     742         1494 :             uncompressed_bytes_eligible: 0,
     743         1494 :             uncompressed_bytes_chosen: 0,
     744         1494 :             num_keys: 0,
     745         1494 :             #[cfg(feature = "testing")]
     746         1494 :             last_written_key: Key::MIN,
     747         1494 :         };
     748         1494 : 
     749         1494 :         Ok(writer)
     750         1494 :     }
     751              : 
     752              :     ///
     753              :     /// Write next value to the file.
     754              :     ///
     755              :     /// The page versions must be appended in blknum order.
     756              :     ///
     757      1638552 :     async fn put_image(
     758      1638552 :         &mut self,
     759      1638552 :         key: Key,
     760      1638552 :         img: Bytes,
     761      1638552 :         ctx: &RequestContext,
     762      1638552 :     ) -> anyhow::Result<()> {
     763      1638552 :         ensure!(self.key_range.contains(&key));
     764      1638552 :         let compression = self.conf.image_compression;
     765      1638552 :         let uncompressed_len = img.len() as u64;
     766      1638552 :         self.uncompressed_bytes += uncompressed_len;
     767      1638552 :         self.num_keys += 1;
     768      1638552 :         let (_img, res) = self
     769      1638552 :             .blob_writer
     770      1638552 :             .write_blob_maybe_compressed(img.slice_len(), ctx, compression)
     771      1664445 :             .await;
     772              :         // TODO: re-use the buffer for `img` further upstack
     773      1638552 :         let (off, compression_info) = res?;
     774      1638552 :         if compression_info.compressed_size.is_some() {
     775        24006 :             // The image has been considered for compression at least
     776        24006 :             self.uncompressed_bytes_eligible += uncompressed_len;
     777      1614546 :         }
     778      1638552 :         if compression_info.written_compressed {
     779            0 :             // The image has been compressed
     780            0 :             self.uncompressed_bytes_chosen += uncompressed_len;
     781      1638552 :         }
     782              : 
     783      1638552 :         let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
     784      1638552 :         key.write_to_byte_slice(&mut keybuf);
     785      1638552 :         self.tree.append(&keybuf, off)?;
     786              : 
     787              :         #[cfg(feature = "testing")]
     788      1638552 :         {
     789      1638552 :             self.last_written_key = key;
     790      1638552 :         }
     791      1638552 : 
     792      1638552 :         Ok(())
     793      1638552 :     }
     794              : 
     795              :     ///
     796              :     /// Finish writing the image layer.
     797              :     ///
     798          918 :     async fn finish(
     799          918 :         self,
     800          918 :         ctx: &RequestContext,
     801          918 :         end_key: Option<Key>,
     802          918 :     ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
     803          918 :         let index_start_blk =
     804          918 :             ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
     805          918 : 
     806          918 :         // Calculate compression ratio
     807          918 :         let compressed_size = self.blob_writer.size() - PAGE_SZ as u64; // Subtract PAGE_SZ for header
     808          918 :         crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES.inc_by(self.uncompressed_bytes);
     809          918 :         crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CONSIDERED
     810          918 :             .inc_by(self.uncompressed_bytes_eligible);
     811          918 :         crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CHOSEN.inc_by(self.uncompressed_bytes_chosen);
     812          918 :         crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size);
     813          918 : 
     814          918 :         let mut file = self.blob_writer.into_inner();
     815          918 : 
     816          918 :         // Write out the index
     817          918 :         file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
     818            0 :             .await?;
     819          918 :         let (index_root_blk, block_buf) = self.tree.finish()?;
     820         3204 :         for buf in block_buf.blocks {
     821         2286 :             let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
     822         2286 :             res?;
     823              :         }
     824              : 
     825          918 :         let final_key_range = if let Some(end_key) = end_key {
     826          102 :             self.key_range.start..end_key
     827              :         } else {
     828          816 :             self.key_range.clone()
     829              :         };
     830              : 
     831              :         // Fill in the summary on blk 0
     832          918 :         let summary = Summary {
     833          918 :             magic: IMAGE_FILE_MAGIC,
     834          918 :             format_version: STORAGE_FORMAT_VERSION,
     835          918 :             tenant_id: self.tenant_shard_id.tenant_id,
     836          918 :             timeline_id: self.timeline_id,
     837          918 :             key_range: final_key_range.clone(),
     838          918 :             lsn: self.lsn,
     839          918 :             index_start_blk,
     840          918 :             index_root_blk,
     841          918 :         };
     842          918 : 
     843          918 :         let mut buf = Vec::with_capacity(PAGE_SZ);
     844          918 :         // TODO: could use smallvec here but it's a pain with Slice<T>
     845          918 :         Summary::ser_into(&summary, &mut buf)?;
     846          918 :         file.seek(SeekFrom::Start(0)).await?;
     847          918 :         let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
     848          918 :         res?;
     849              : 
     850          918 :         let metadata = file
     851          918 :             .metadata()
     852          464 :             .await
     853          918 :             .context("get metadata to determine file size")?;
     854              : 
     855          918 :         let desc = PersistentLayerDesc::new_img(
     856          918 :             self.tenant_shard_id,
     857          918 :             self.timeline_id,
     858          918 :             final_key_range,
     859          918 :             self.lsn,
     860          918 :             metadata.len(),
     861          918 :         );
     862              : 
     863              :         #[cfg(feature = "testing")]
     864          918 :         if let Some(end_key) = end_key {
     865          102 :             assert!(
     866          102 :                 self.last_written_key < end_key,
     867            0 :                 "written key violates end_key range"
     868              :             );
     869          816 :         }
     870              : 
     871              :         // Note: Because we open the file in write-only mode, we cannot
     872              :         // reuse the same VirtualFile for reading later. That's why we don't
     873              :         // set inner.file here. The first read will have to re-open it.
     874              : 
     875              :         // fsync the file
     876          918 :         file.sync_all().await?;
     877              : 
     878          918 :         trace!("created image layer {}", self.path);
     879              : 
     880          918 :         Ok((desc, self.path))
     881          918 :     }
     882              : }
     883              : 
     884              : /// A builder object for constructing a new image layer.
     885              : ///
     886              : /// Usage:
     887              : ///
     888              : /// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
     889              : ///
     890              : /// 2. Write the contents by calling `put_page_image` for every key-value
     891              : ///    pair in the key range.
     892              : ///
     893              : /// 3. Call `finish`.
     894              : ///
     895              : /// # Note
     896              : ///
     897              : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
     898              : /// possible for the writer to drop before `finish` is actually called. So this
     899              : /// could lead to odd temporary files in the directory, exhausting file system.
     900              : /// This structure wraps `ImageLayerWriterInner` and also contains `Drop`
     901              : /// implementation that cleans up the temporary file in failure. It's not
     902              : /// possible to do this directly in `ImageLayerWriterInner` since `finish` moves
     903              : /// out some fields, making it impossible to implement `Drop`.
     904              : ///
     905              : #[must_use]
     906              : pub struct ImageLayerWriter {
     907              :     inner: Option<ImageLayerWriterInner>,
     908              : }
     909              : 
     910              : impl ImageLayerWriter {
     911              :     ///
     912              :     /// Start building a new image layer.
     913              :     ///
     914         1494 :     pub async fn new(
     915         1494 :         conf: &'static PageServerConf,
     916         1494 :         timeline_id: TimelineId,
     917         1494 :         tenant_shard_id: TenantShardId,
     918         1494 :         key_range: &Range<Key>,
     919         1494 :         lsn: Lsn,
     920         1494 :         ctx: &RequestContext,
     921         1494 :     ) -> anyhow::Result<ImageLayerWriter> {
     922         1494 :         Ok(Self {
     923         1494 :             inner: Some(
     924         1494 :                 ImageLayerWriterInner::new(conf, timeline_id, tenant_shard_id, key_range, lsn, ctx)
     925          998 :                     .await?,
     926              :             ),
     927              :         })
     928         1494 :     }
     929              : 
     930              :     ///
     931              :     /// Write next value to the file.
     932              :     ///
     933              :     /// The page versions must be appended in blknum order.
     934              :     ///
     935      1638552 :     pub async fn put_image(
     936      1638552 :         &mut self,
     937      1638552 :         key: Key,
     938      1638552 :         img: Bytes,
     939      1638552 :         ctx: &RequestContext,
     940      1638552 :     ) -> anyhow::Result<()> {
     941      1664445 :         self.inner.as_mut().unwrap().put_image(key, img, ctx).await
     942      1638552 :     }
     943              : 
     944              :     /// Estimated size of the image layer.
     945        25146 :     pub(crate) fn estimated_size(&self) -> u64 {
     946        25146 :         let inner = self.inner.as_ref().unwrap();
     947        25146 :         inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64
     948        25146 :     }
     949              : 
     950        25326 :     pub(crate) fn num_keys(&self) -> usize {
     951        25326 :         self.inner.as_ref().unwrap().num_keys
     952        25326 :     }
     953              : 
     954              :     ///
     955              :     /// Finish writing the image layer.
     956              :     ///
     957          816 :     pub(crate) async fn finish(
     958          816 :         mut self,
     959          816 :         ctx: &RequestContext,
     960          816 :     ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
     961         2344 :         self.inner.take().unwrap().finish(ctx, None).await
     962          816 :     }
     963              : 
     964              :     /// Finish writing the image layer with an end key, used in [`super::split_writer::SplitImageLayerWriter`]. The end key determines the end of the image layer's covered range and is exclusive.
     965          102 :     pub(super) async fn finish_with_end_key(
     966          102 :         mut self,
     967          102 :         end_key: Key,
     968          102 :         ctx: &RequestContext,
     969          102 :     ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
     970          210 :         self.inner.take().unwrap().finish(ctx, Some(end_key)).await
     971          102 :     }
     972              : }
     973              : 
     974              : impl Drop for ImageLayerWriter {
     975         1494 :     fn drop(&mut self) {
     976         1494 :         if let Some(inner) = self.inner.take() {
     977          576 :             inner.blob_writer.into_inner().remove();
     978          918 :         }
     979         1494 :     }
     980              : }
     981              : 
     982              : pub struct ImageLayerIterator<'a> {
     983              :     image_layer: &'a ImageLayerInner,
     984              :     ctx: &'a RequestContext,
     985              :     planner: StreamingVectoredReadPlanner,
     986              :     index_iter: DiskBtreeIterator<'a>,
     987              :     key_values_batch: VecDeque<(Key, Lsn, Value)>,
     988              :     is_end: bool,
     989              : }
     990              : 
     991              : impl<'a> ImageLayerIterator<'a> {
     992            0 :     pub(crate) fn layer_dbg_info(&self) -> String {
     993            0 :         self.image_layer.layer_dbg_info()
     994            0 :     }
     995              : 
     996              :     /// Retrieve a batch of key-value pairs into the iterator buffer.
     997        57052 :     async fn next_batch(&mut self) -> anyhow::Result<()> {
     998        57052 :         assert!(self.key_values_batch.is_empty());
     999        57052 :         assert!(!self.is_end);
    1000              : 
    1001        57052 :         let plan = loop {
    1002        86310 :             if let Some(res) = self.index_iter.next().await {
    1003        86118 :                 let (raw_key, offset) = res?;
    1004        86118 :                 if let Some(batch_plan) = self.planner.handle(
    1005        86118 :                     Key::from_slice(&raw_key[..KEY_SIZE]),
    1006        86118 :                     self.image_layer.lsn,
    1007        86118 :                     offset,
    1008        86118 :                 ) {
    1009        56860 :                     break batch_plan;
    1010        29258 :                 }
    1011              :             } else {
    1012          192 :                 self.is_end = true;
    1013          192 :                 let payload_end = self.image_layer.index_start_blk as u64 * PAGE_SZ as u64;
    1014          192 :                 if let Some(item) = self.planner.handle_range_end(payload_end) {
    1015          192 :                     break item;
    1016              :                 } else {
    1017            0 :                     return Ok(()); // TODO: a test case on empty iterator
    1018              :                 }
    1019              :             }
    1020              :         };
    1021        57052 :         let vectored_blob_reader = VectoredBlobReader::new(&self.image_layer.file);
    1022        57052 :         let mut next_batch = std::collections::VecDeque::new();
    1023        57052 :         let buf_size = plan.size();
    1024        57052 :         let buf = BytesMut::with_capacity(buf_size);
    1025        57052 :         let blobs_buf = vectored_blob_reader
    1026        57052 :             .read_blobs(&plan, buf, self.ctx)
    1027        28970 :             .await?;
    1028        57052 :         let frozen_buf: Bytes = blobs_buf.buf.freeze();
    1029        86034 :         for meta in blobs_buf.blobs.iter() {
    1030        86034 :             let img_buf = frozen_buf.slice(meta.start..meta.end);
    1031        86034 :             next_batch.push_back((meta.meta.key, self.image_layer.lsn, Value::Image(img_buf)));
    1032        86034 :         }
    1033        57052 :         self.key_values_batch = next_batch;
    1034        57052 :         Ok(())
    1035        57052 :     }
    1036              : 
    1037        85524 :     pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
    1038        85524 :         if self.key_values_batch.is_empty() {
    1039        57100 :             if self.is_end {
    1040          300 :                 return Ok(None);
    1041        56800 :             }
    1042        56800 :             self.next_batch().await?;
    1043        28424 :         }
    1044        85224 :         Ok(Some(
    1045        85224 :             self.key_values_batch
    1046        85224 :                 .pop_front()
    1047        85224 :                 .expect("should not be empty"),
    1048        85224 :         ))
    1049        85524 :     }
    1050              : }
    1051              : 
    1052              : #[cfg(test)]
    1053              : mod test {
    1054              :     use std::{sync::Arc, time::Duration};
    1055              : 
    1056              :     use bytes::Bytes;
    1057              :     use itertools::Itertools;
    1058              :     use pageserver_api::{
    1059              :         key::Key,
    1060              :         shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize},
    1061              :     };
    1062              :     use utils::{
    1063              :         generation::Generation,
    1064              :         id::{TenantId, TimelineId},
    1065              :         lsn::Lsn,
    1066              :     };
    1067              : 
    1068              :     use crate::{
    1069              :         context::RequestContext,
    1070              :         repository::Value,
    1071              :         tenant::{
    1072              :             config::TenantConf,
    1073              :             harness::{TenantHarness, TIMELINE_ID},
    1074              :             storage_layer::{Layer, ResidentLayer},
    1075              :             vectored_blob_io::StreamingVectoredReadPlanner,
    1076              :             Tenant, Timeline,
    1077              :         },
    1078              :         DEFAULT_PG_VERSION,
    1079              :     };
    1080              : 
    1081              :     use super::{ImageLayerIterator, ImageLayerWriter};
    1082              : 
    1083              :     #[tokio::test]
    1084            6 :     async fn image_layer_rewrite() {
    1085            6 :         let tenant_conf = TenantConf {
    1086            6 :             gc_period: Duration::ZERO,
    1087            6 :             compaction_period: Duration::ZERO,
    1088            6 :             ..TenantConf::default()
    1089            6 :         };
    1090            6 :         let tenant_id = TenantId::generate();
    1091            6 :         let mut gen = Generation::new(0xdead0001);
    1092           30 :         let mut get_next_gen = || {
    1093           30 :             let ret = gen;
    1094           30 :             gen = gen.next();
    1095           30 :             ret
    1096           30 :         };
    1097            6 :         // The LSN at which we will create an image layer to filter
    1098            6 :         let lsn = Lsn(0xdeadbeef0000);
    1099            6 :         let timeline_id = TimelineId::generate();
    1100            6 : 
    1101            6 :         //
    1102            6 :         // Create an unsharded parent with a layer.
    1103            6 :         //
    1104            6 : 
    1105            6 :         let harness = TenantHarness::create_custom(
    1106            6 :             "test_image_layer_rewrite--parent",
    1107            6 :             tenant_conf.clone(),
    1108            6 :             tenant_id,
    1109            6 :             ShardIdentity::unsharded(),
    1110            6 :             get_next_gen(),
    1111            6 :         )
    1112            6 :         .await
    1113            6 :         .unwrap();
    1114           24 :         let (tenant, ctx) = harness.load().await;
    1115            6 :         let timeline = tenant
    1116            6 :             .create_test_timeline(timeline_id, lsn, DEFAULT_PG_VERSION, &ctx)
    1117           12 :             .await
    1118            6 :             .unwrap();
    1119            6 : 
    1120            6 :         // This key range contains several 0x8000 page stripes, only one of which belongs to shard zero
    1121            6 :         let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
    1122            6 :         let input_end = Key::from_hex("000000067f00000001000000ae0000020000").unwrap();
    1123            6 :         let range = input_start..input_end;
    1124            6 : 
    1125            6 :         // Build an image layer to filter
    1126            6 :         let resident = {
    1127            6 :             let mut writer = ImageLayerWriter::new(
    1128            6 :                 harness.conf,
    1129            6 :                 timeline_id,
    1130            6 :                 harness.tenant_shard_id,
    1131            6 :                 &range,
    1132            6 :                 lsn,
    1133            6 :                 &ctx,
    1134            6 :             )
    1135            6 :             .await
    1136            6 :             .unwrap();
    1137            6 : 
    1138            6 :             let foo_img = Bytes::from_static(&[1, 2, 3, 4]);
    1139            6 :             let mut key = range.start;
    1140       786438 :             while key < range.end {
    1141       798717 :                 writer.put_image(key, foo_img.clone(), &ctx).await.unwrap();
    1142       786432 : 
    1143       786432 :                 key = key.next();
    1144            6 :             }
    1145          357 :             let (desc, path) = writer.finish(&ctx).await.unwrap();
    1146            6 :             Layer::finish_creating(tenant.conf, &timeline, desc, &path).unwrap()
    1147            6 :         };
    1148            6 :         let original_size = resident.metadata().file_size;
    1149            6 : 
    1150            6 :         //
    1151            6 :         // Create child shards and do the rewrite, exercising filter().
    1152            6 :         // TODO: abstraction in TenantHarness for splits.
    1153            6 :         //
    1154            6 : 
    1155            6 :         // Filter for various shards: this exercises cases like values at start of key range, end of key
    1156            6 :         // range, middle of key range.
    1157            6 :         let shard_count = ShardCount::new(4);
    1158           24 :         for shard_number in 0..shard_count.count() {
    1159            6 :             //
    1160            6 :             // mimic the shard split
    1161            6 :             //
    1162           24 :             let shard_identity = ShardIdentity::new(
    1163           24 :                 ShardNumber(shard_number),
    1164           24 :                 shard_count,
    1165           24 :                 ShardStripeSize(0x8000),
    1166           24 :             )
    1167           24 :             .unwrap();
    1168           24 :             let harness = TenantHarness::create_custom(
    1169           24 :                 Box::leak(Box::new(format!(
    1170           24 :                     "test_image_layer_rewrite--child{}",
    1171           24 :                     shard_identity.shard_slug()
    1172           24 :                 ))),
    1173           24 :                 tenant_conf.clone(),
    1174           24 :                 tenant_id,
    1175           24 :                 shard_identity,
    1176           24 :                 // NB: in reality, the shards would each fork off their own gen number sequence from the parent.
    1177           24 :                 // But here, all we care about is that the gen number is unique.
    1178           24 :                 get_next_gen(),
    1179           24 :             )
    1180            6 :             .await
    1181           24 :             .unwrap();
    1182           96 :             let (tenant, ctx) = harness.load().await;
    1183           24 :             let timeline = tenant
    1184           24 :                 .create_test_timeline(timeline_id, lsn, DEFAULT_PG_VERSION, &ctx)
    1185           36 :                 .await
    1186           24 :                 .unwrap();
    1187            6 : 
    1188            6 :             //
    1189            6 :             // use filter() and make assertions
    1190            6 :             //
    1191            6 : 
    1192           24 :             let mut filtered_writer = ImageLayerWriter::new(
    1193           24 :                 harness.conf,
    1194           24 :                 timeline_id,
    1195           24 :                 harness.tenant_shard_id,
    1196           24 :                 &range,
    1197           24 :                 lsn,
    1198           24 :                 &ctx,
    1199           24 :             )
    1200           12 :             .await
    1201           24 :             .unwrap();
    1202            6 : 
    1203           24 :             let wrote_keys = resident
    1204           24 :                 .filter(&shard_identity, &mut filtered_writer, &ctx)
    1205       800157 :                 .await
    1206           24 :                 .unwrap();
    1207           24 :             let replacement = if wrote_keys > 0 {
    1208          387 :                 let (desc, path) = filtered_writer.finish(&ctx).await.unwrap();
    1209           18 :                 let resident = Layer::finish_creating(tenant.conf, &timeline, desc, &path).unwrap();
    1210           18 :                 Some(resident)
    1211            6 :             } else {
    1212            6 :                 None
    1213            6 :             };
    1214            6 : 
    1215            6 :             // This exact size and those below will need updating as/when the layer encoding changes, but
    1216            6 :             // should be deterministic for a given version of the format, as we used no randomness generating the input.
    1217           24 :             assert_eq!(original_size, 1597440);
    1218            6 : 
    1219           24 :             match shard_number {
    1220            6 :                 0 => {
    1221            6 :                     // We should have written out just one stripe for our shard identity
    1222            6 :                     assert_eq!(wrote_keys, 0x8000);
    1223            6 :                     let replacement = replacement.unwrap();
    1224            6 : 
    1225            6 :                     // We should have dropped some of the data
    1226            6 :                     assert!(replacement.metadata().file_size < original_size);
    1227            6 :                     assert!(replacement.metadata().file_size > 0);
    1228            6 : 
    1229            6 :                     // Assert that we dropped ~3/4 of the data.
    1230            6 :                     assert_eq!(replacement.metadata().file_size, 417792);
    1231            6 :                 }
    1232            6 :                 1 => {
    1233            6 :                     // Shard 1 has no keys in our input range
    1234            6 :                     assert_eq!(wrote_keys, 0x0);
    1235            6 :                     assert!(replacement.is_none());
    1236            6 :                 }
    1237            6 :                 2 => {
    1238            6 :                     // Shard 2 has one stripes in the input range
    1239            6 :                     assert_eq!(wrote_keys, 0x8000);
    1240            6 :                     let replacement = replacement.unwrap();
    1241            6 :                     assert!(replacement.metadata().file_size < original_size);
    1242            6 :                     assert!(replacement.metadata().file_size > 0);
    1243            6 :                     assert_eq!(replacement.metadata().file_size, 417792);
    1244            6 :                 }
    1245            6 :                 3 => {
    1246            6 :                     // Shard 3 has two stripes in the input range
    1247            6 :                     assert_eq!(wrote_keys, 0x10000);
    1248            6 :                     let replacement = replacement.unwrap();
    1249            6 :                     assert!(replacement.metadata().file_size < original_size);
    1250            6 :                     assert!(replacement.metadata().file_size > 0);
    1251            6 :                     assert_eq!(replacement.metadata().file_size, 811008);
    1252            6 :                 }
    1253            6 :                 _ => unreachable!(),
    1254            6 :             }
    1255            6 :         }
    1256            6 :     }
    1257              : 
    1258            6 :     async fn produce_image_layer(
    1259            6 :         tenant: &Tenant,
    1260            6 :         tline: &Arc<Timeline>,
    1261            6 :         mut images: Vec<(Key, Bytes)>,
    1262            6 :         lsn: Lsn,
    1263            6 :         ctx: &RequestContext,
    1264            6 :     ) -> anyhow::Result<ResidentLayer> {
    1265            6 :         images.sort();
    1266            6 :         let (key_start, _) = images.first().unwrap();
    1267            6 :         let (key_last, _) = images.last().unwrap();
    1268            6 :         let key_end = key_last.next();
    1269            6 :         let key_range = *key_start..key_end;
    1270            6 :         let mut writer = ImageLayerWriter::new(
    1271            6 :             tenant.conf,
    1272            6 :             tline.timeline_id,
    1273            6 :             tenant.tenant_shard_id,
    1274            6 :             &key_range,
    1275            6 :             lsn,
    1276            6 :             ctx,
    1277            6 :         )
    1278            3 :         .await?;
    1279              : 
    1280         6006 :         for (key, img) in images {
    1281         6093 :             writer.put_image(key, img, ctx).await?;
    1282              :         }
    1283           12 :         let (desc, path) = writer.finish(ctx).await?;
    1284            6 :         let img_layer = Layer::finish_creating(tenant.conf, tline, desc, &path)?;
    1285              : 
    1286            6 :         Ok::<_, anyhow::Error>(img_layer)
    1287            6 :     }
    1288              : 
    1289           84 :     async fn assert_img_iter_equal(
    1290           84 :         img_iter: &mut ImageLayerIterator<'_>,
    1291           84 :         expect: &[(Key, Bytes)],
    1292           84 :         expect_lsn: Lsn,
    1293           84 :     ) {
    1294           84 :         let mut expect_iter = expect.iter();
    1295              :         loop {
    1296        84084 :             let o1 = img_iter.next().await.unwrap();
    1297        84084 :             let o2 = expect_iter.next();
    1298        84084 :             match (o1, o2) {
    1299           84 :                 (None, None) => break,
    1300        84000 :                 (Some((k1, l1, v1)), Some((k2, i2))) => {
    1301        84000 :                     let Value::Image(i1) = v1 else {
    1302            0 :                         panic!("expect Value::Image")
    1303              :                     };
    1304        84000 :                     assert_eq!(&k1, k2);
    1305        84000 :                     assert_eq!(l1, expect_lsn);
    1306        84000 :                     assert_eq!(&i1, i2);
    1307              :                 }
    1308            0 :                 (o1, o2) => panic!("iterators length mismatch: {:?}, {:?}", o1, o2),
    1309              :             }
    1310              :         }
    1311           84 :     }
    1312              : 
    1313              :     #[tokio::test]
    1314            6 :     async fn image_layer_iterator() {
    1315            6 :         let harness = TenantHarness::create("image_layer_iterator").await.unwrap();
    1316           24 :         let (tenant, ctx) = harness.load().await;
    1317            6 : 
    1318            6 :         let tline = tenant
    1319            6 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    1320           12 :             .await
    1321            6 :             .unwrap();
    1322            6 : 
    1323         6000 :         fn get_key(id: u32) -> Key {
    1324         6000 :             let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
    1325         6000 :             key.field6 = id;
    1326         6000 :             key
    1327         6000 :         }
    1328            6 :         const N: usize = 1000;
    1329            6 :         let test_imgs = (0..N)
    1330         6000 :             .map(|idx| (get_key(idx as u32), Bytes::from(format!("img{idx:05}"))))
    1331            6 :             .collect_vec();
    1332            6 :         let resident_layer =
    1333            6 :             produce_image_layer(&tenant, &tline, test_imgs.clone(), Lsn(0x10), &ctx)
    1334         6108 :                 .await
    1335            6 :                 .unwrap();
    1336            6 :         let img_layer = resident_layer.get_as_image(&ctx).await.unwrap();
    1337           18 :         for max_read_size in [1, 1024] {
    1338           96 :             for batch_size in [1, 2, 4, 8, 3, 7, 13] {
    1339           84 :                 println!("running with batch_size={batch_size} max_read_size={max_read_size}");
    1340           84 :                 // Test if the batch size is correctly determined
    1341           84 :                 let mut iter = img_layer.iter(&ctx);
    1342           84 :                 iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
    1343           84 :                 let mut num_items = 0;
    1344          336 :                 for _ in 0..3 {
    1345          252 :                     iter.next_batch().await.unwrap();
    1346          252 :                     num_items += iter.key_values_batch.len();
    1347          252 :                     if max_read_size == 1 {
    1348            6 :                         // every key should be a batch b/c the value is larger than max_read_size
    1349          126 :                         assert_eq!(iter.key_values_batch.len(), 1);
    1350            6 :                     } else {
    1351          126 :                         assert!(iter.key_values_batch.len() <= batch_size);
    1352            6 :                     }
    1353          252 :                     if num_items >= N {
    1354            6 :                         break;
    1355          252 :                     }
    1356          252 :                     iter.key_values_batch.clear();
    1357            6 :                 }
    1358            6 :                 // Test if the result is correct
    1359           84 :                 let mut iter = img_layer.iter(&ctx);
    1360           84 :                 iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
    1361        28787 :                 assert_img_iter_equal(&mut iter, &test_imgs, Lsn(0x10)).await;
    1362            6 :             }
    1363            6 :         }
    1364            6 :     }
    1365              : }
        

Generated by: LCOV version 2.1-beta