LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer - image_layer.rs (source / functions)
Test:      b4ae4c4857f9ef3e144e982a35ee23bc84c71983.info
Test Date: 2024-10-22 22:13:45

                 Coverage    Total    Hit
    Lines:         78.3 %      952    745
    Functions:     56.8 %       88     50

            Line data    Source code
       1              : //! An ImageLayer represents an image or a snapshot of a key-range at
       2              : //! one particular LSN.
       3              : //!
       4              : //! It contains an image of all key-value pairs in its key-range. Any key
       5              : //! that falls within the image layer's range but is not present in the layer
       6              : //! does not exist.
       7              : //!
       8              : //! An image layer is stored in a file on disk. The file is stored in
       9              : //! the timelines/<timeline_id> directory.  Currently, there are no
      10              : //! subdirectories, and each image layer file is named like this:
      11              : //!
      12              : //! ```text
      13              : //!    <key start>-<key end>__<LSN>
      14              : //! ```
      15              : //!
      16              : //! For example:
      17              : //!
      18              : //! ```text
      19              : //!    000000067F000032BE0000400000000070B6-000000067F000032BE0000400000000080B6__00000000346BC568
      20              : //! ```
      21              : //!
      22              : //! Every image layer file consists of three parts: "summary",
      23              : //! "index", and "values".  The summary is a fixed-size header at the
      24              : //! beginning of the file; it contains basic information about the
      25              : //! layer and offsets to the other parts. The "index" is a B-tree,
      26              : //! mapping from Key to an offset in the "values" part.  The
      27              : //! actual page images are stored in the "values" part.
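
As an illustration of the naming scheme described above, the sketch below is hypothetical code (not part of this file): it assembles a layer file name of the form `<key start>-<key end>__<LSN>` from hex-encoded key bounds and an LSN, and reproduces the example given in the doc comment.

```rust
// Hypothetical sketch: build an image layer file name. The key bounds are
// hex-encoded keys and the LSN is rendered as a 16-digit uppercase hex number,
// matching the example in the module documentation above.
fn image_layer_file_name(key_start_hex: &str, key_end_hex: &str, lsn: u64) -> String {
    format!("{key_start_hex}-{key_end_hex}__{lsn:016X}")
}

fn main() {
    let name = image_layer_file_name(
        "000000067F000032BE0000400000000070B6",
        "000000067F000032BE0000400000000080B6",
        0x346BC568,
    );
    assert_eq!(
        name,
        "000000067F000032BE0000400000000070B6-000000067F000032BE0000400000000080B6__00000000346BC568"
    );
}
```
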
      28              : use crate::config::PageServerConf;
      29              : use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
      30              : use crate::page_cache::{self, FileId, PAGE_SZ};
      31              : use crate::repository::{Key, Value, KEY_SIZE};
      32              : use crate::tenant::blob_io::BlobWriter;
      33              : use crate::tenant::block_io::{BlockBuf, FileBlockReader};
      34              : use crate::tenant::disk_btree::{
      35              :     DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
      36              : };
      37              : use crate::tenant::timeline::GetVectoredError;
      38              : use crate::tenant::vectored_blob_io::{
      39              :     BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
      40              :     VectoredReadPlanner,
      41              : };
      42              : use crate::tenant::PageReconstructError;
      43              : use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
      44              : use crate::virtual_file::IoBufferMut;
      45              : use crate::virtual_file::{self, MaybeFatalIo, VirtualFile};
      46              : use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
      47              : use anyhow::{anyhow, bail, ensure, Context, Result};
      48              : use bytes::Bytes;
      49              : use camino::{Utf8Path, Utf8PathBuf};
      50              : use hex;
      51              : use itertools::Itertools;
      52              : use pageserver_api::config::MaxVectoredReadBytes;
      53              : use pageserver_api::key::DBDIR_KEY;
      54              : use pageserver_api::keyspace::KeySpace;
      55              : use pageserver_api::shard::{ShardIdentity, TenantShardId};
      56              : use rand::{distributions::Alphanumeric, Rng};
      57              : use serde::{Deserialize, Serialize};
      58              : use std::collections::VecDeque;
      59              : use std::fs::File;
      60              : use std::io::SeekFrom;
      61              : use std::ops::Range;
      62              : use std::os::unix::prelude::FileExt;
      63              : use std::str::FromStr;
      64              : use tokio::sync::OnceCell;
      65              : use tokio_stream::StreamExt;
      66              : use tracing::*;
      67              : 
      68              : use utils::{
      69              :     bin_ser::BeSer,
      70              :     id::{TenantId, TimelineId},
      71              :     lsn::Lsn,
      72              : };
      73              : 
      74              : use super::layer_name::ImageLayerName;
      75              : use super::{AsLayerDesc, LayerName, PersistentLayerDesc, ValuesReconstructState};
      76              : 
      77              : ///
      78              : /// Header stored in the beginning of the file
      79              : ///
      80              : /// After this comes the 'values' part, starting on block 1. After that,
      81              : /// the 'index' starts at the block indicated by 'index_start_blk'
      82              : ///
      83          102 : #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
      84              : pub struct Summary {
      85              :     /// Magic value to identify this as a neon image file. Always IMAGE_FILE_MAGIC.
      86              :     pub magic: u16,
      87              :     pub format_version: u16,
      88              : 
      89              :     pub tenant_id: TenantId,
      90              :     pub timeline_id: TimelineId,
      91              :     pub key_range: Range<Key>,
      92              :     pub lsn: Lsn,
      93              : 
      94              :     /// Block number where the 'index' part of the file begins.
      95              :     pub index_start_blk: u32,
      96              :     /// Block within the 'index', where the B-tree root page is stored
      97              :     pub index_root_blk: u32,
      98              :     // the 'values' part starts after the summary header, on block 1.
      99              : }
     100              : 
     101              : impl From<&ImageLayer> for Summary {
     102            0 :     fn from(layer: &ImageLayer) -> Self {
     103            0 :         Self::expected(
     104            0 :             layer.desc.tenant_shard_id.tenant_id,
     105            0 :             layer.desc.timeline_id,
     106            0 :             layer.desc.key_range.clone(),
     107            0 :             layer.lsn,
     108            0 :         )
     109            0 :     }
     110              : }
     111              : 
     112              : impl Summary {
     113          102 :     pub(super) fn expected(
     114          102 :         tenant_id: TenantId,
     115          102 :         timeline_id: TimelineId,
     116          102 :         key_range: Range<Key>,
     117          102 :         lsn: Lsn,
     118          102 :     ) -> Self {
     119          102 :         Self {
     120          102 :             magic: IMAGE_FILE_MAGIC,
     121          102 :             format_version: STORAGE_FORMAT_VERSION,
     122          102 :             tenant_id,
     123          102 :             timeline_id,
     124          102 :             key_range,
     125          102 :             lsn,
     126          102 : 
     127          102 :             index_start_blk: 0,
     128          102 :             index_root_blk: 0,
     129          102 :         }
     130          102 :     }
     131              : }
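
To make the block layout described above concrete, here is a minimal sketch (an illustration under stated assumptions, not pageserver code) of how the block numbers recorded in `Summary` map to byte offsets in the layer file, assuming an 8 KiB block size as in `page_cache::PAGE_SZ`.

```rust
// Minimal sketch only. Block 0 holds the summary header, the 'values' part
// starts on block 1, and the 'index' starts at `index_start_blk`.
const PAGE_SZ: u64 = 8192; // assumed block size, matching page_cache::PAGE_SZ

/// Byte offset where the 'values' part begins (right after the one-block summary).
fn values_start_offset() -> u64 {
    PAGE_SZ
}

/// Byte offset where the 'index' part begins.
fn index_start_offset(index_start_blk: u32) -> u64 {
    index_start_blk as u64 * PAGE_SZ
}
```
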
     132              : 
     133              : /// This is used only from `pagectl`. Within pageserver, all layers are
     134              : /// [`crate::tenant::storage_layer::Layer`], which can hold an [`ImageLayerInner`].
     135              : pub struct ImageLayer {
     136              :     path: Utf8PathBuf,
     137              :     pub desc: PersistentLayerDesc,
     138              :     // The layer contains an image of all pages as of this LSN; it should be the same as desc.lsn.
     139              :     pub lsn: Lsn,
     140              :     inner: OnceCell<ImageLayerInner>,
     141              : }
     142              : 
     143              : impl std::fmt::Debug for ImageLayer {
     144            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     145              :         use super::RangeDisplayDebug;
     146              : 
     147            0 :         f.debug_struct("ImageLayer")
     148            0 :             .field("key_range", &RangeDisplayDebug(&self.desc.key_range))
     149            0 :             .field("file_size", &self.desc.file_size)
     150            0 :             .field("lsn", &self.lsn)
     151            0 :             .field("inner", &self.inner)
     152            0 :             .finish()
     153            0 :     }
     154              : }
     155              : 
     156              : /// ImageLayerInner is the in-memory data structure associated with an on-disk image
     157              : /// file.
     158              : pub struct ImageLayerInner {
     159              :     // values copied from summary
     160              :     index_start_blk: u32,
     161              :     index_root_blk: u32,
     162              : 
     163              :     key_range: Range<Key>,
     164              :     lsn: Lsn,
     165              : 
     166              :     file: VirtualFile,
     167              :     file_id: FileId,
     168              : 
     169              :     max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
     170              : }
     171              : 
     172              : impl ImageLayerInner {
     173            0 :     pub(crate) fn layer_dbg_info(&self) -> String {
     174            0 :         format!(
     175            0 :             "image {}..{} {}",
     176            0 :             self.key_range().start,
     177            0 :             self.key_range().end,
     178            0 :             self.lsn()
     179            0 :         )
     180            0 :     }
     181              : }
     182              : 
     183              : impl std::fmt::Debug for ImageLayerInner {
     184            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     185            0 :         f.debug_struct("ImageLayerInner")
     186            0 :             .field("index_start_blk", &self.index_start_blk)
     187            0 :             .field("index_root_blk", &self.index_root_blk)
     188            0 :             .finish()
     189            0 :     }
     190              : }
     191              : 
     192              : impl ImageLayerInner {
     193            0 :     pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
     194            0 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
     195            0 :         let tree_reader = DiskBtreeReader::<_, KEY_SIZE>::new(
     196            0 :             self.index_start_blk,
     197            0 :             self.index_root_blk,
     198            0 :             block_reader,
     199            0 :         );
     200            0 : 
     201            0 :         tree_reader.dump().await?;
     202              : 
     203            0 :         tree_reader
     204            0 :             .visit(
     205            0 :                 &[0u8; KEY_SIZE],
     206            0 :                 VisitDirection::Forwards,
     207            0 :                 |key, value| {
     208            0 :                     println!("key: {} offset {}", hex::encode(key), value);
     209            0 :                     true
     210            0 :                 },
     211            0 :                 ctx,
     212            0 :             )
     213            0 :             .await?;
     214              : 
     215            0 :         Ok(())
     216            0 :     }
     217              : }
     218              : 
     219              : /// Boilerplate to implement the Layer trait; always use layer_desc for persistent layers.
     220              : impl std::fmt::Display for ImageLayer {
     221            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     222            0 :         write!(f, "{}", self.layer_desc().short_id())
     223            0 :     }
     224              : }
     225              : 
     226              : impl AsLayerDesc for ImageLayer {
     227            0 :     fn layer_desc(&self) -> &PersistentLayerDesc {
     228            0 :         &self.desc
     229            0 :     }
     230              : }
     231              : 
     232              : impl ImageLayer {
     233            0 :     pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
     234            0 :         self.desc.dump();
     235            0 : 
     236            0 :         if !verbose {
     237            0 :             return Ok(());
     238            0 :         }
     239              : 
     240            0 :         let inner = self.load(ctx).await?;
     241              : 
     242            0 :         inner.dump(ctx).await?;
     243              : 
     244            0 :         Ok(())
     245            0 :     }
     246              : 
     247          490 :     fn temp_path_for(
     248          490 :         conf: &PageServerConf,
     249          490 :         timeline_id: TimelineId,
     250          490 :         tenant_shard_id: TenantShardId,
     251          490 :         fname: &ImageLayerName,
     252          490 :     ) -> Utf8PathBuf {
     253          490 :         let rand_string: String = rand::thread_rng()
     254          490 :             .sample_iter(&Alphanumeric)
     255          490 :             .take(8)
     256          490 :             .map(char::from)
     257          490 :             .collect();
     258          490 : 
     259          490 :         conf.timeline_path(&tenant_shard_id, &timeline_id)
     260          490 :             .join(format!("{fname}.{rand_string}.{TEMP_FILE_SUFFIX}"))
     261          490 :     }
     262              : 
     263              :     ///
     264              :     /// Open the underlying file and read the metadata into memory, if it's
     265              :     /// not loaded already.
     266              :     ///
     267            0 :     async fn load(&self, ctx: &RequestContext) -> Result<&ImageLayerInner> {
     268            0 :         self.inner
     269            0 :             .get_or_try_init(|| self.load_inner(ctx))
     270            0 :             .await
     271            0 :             .with_context(|| format!("Failed to load image layer {}", self.path()))
     272            0 :     }
     273              : 
     274            0 :     async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
     275            0 :         let path = self.path();
     276              : 
     277            0 :         let loaded =
     278            0 :             ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, None, ctx).await?;
     279              : 
     280              :         // not production code
     281            0 :         let actual_layer_name = LayerName::from_str(path.file_name().unwrap()).unwrap();
     282            0 :         let expected_layer_name = self.layer_desc().layer_name();
     283            0 : 
     284            0 :         if actual_layer_name != expected_layer_name {
     285            0 :             println!("warning: filename does not match what is expected from in-file summary");
     286            0 :             println!("actual: {:?}", actual_layer_name.to_string());
     287            0 :             println!("expected: {:?}", expected_layer_name.to_string());
     288            0 :         }
     289              : 
     290            0 :         Ok(loaded)
     291            0 :     }
     292              : 
     293              :     /// Create an ImageLayer struct representing an existing file on disk.
     294              :     ///
     295              :     /// This variant is only used for debugging purposes, by the 'pagectl' binary.
     296            0 :     pub fn new_for_path(path: &Utf8Path, file: File) -> Result<ImageLayer> {
     297            0 :         let mut summary_buf = vec![0; PAGE_SZ];
     298            0 :         file.read_exact_at(&mut summary_buf, 0)?;
     299            0 :         let summary = Summary::des_prefix(&summary_buf)?;
     300            0 :         let metadata = file
     301            0 :             .metadata()
     302            0 :             .context("get file metadata to determine size")?;
     303              : 
     304              :         // This function is never used for constructing layers in a running pageserver,
     305              :         // so it does not need an accurate TenantShardId.
     306            0 :         let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
     307            0 : 
     308            0 :         Ok(ImageLayer {
     309            0 :             path: path.to_path_buf(),
     310            0 :             desc: PersistentLayerDesc::new_img(
     311            0 :                 tenant_shard_id,
     312            0 :                 summary.timeline_id,
     313            0 :                 summary.key_range,
     314            0 :                 summary.lsn,
     315            0 :                 metadata.len(),
     316            0 :             ), // Now we assume image layer ALWAYS covers the full range. This may change in the future.
     317            0 :             lsn: summary.lsn,
     318            0 :             inner: OnceCell::new(),
     319            0 :         })
     320            0 :     }
     321              : 
     322            0 :     fn path(&self) -> Utf8PathBuf {
     323            0 :         self.path.clone()
     324            0 :     }
     325              : }
     326              : 
     327            0 : #[derive(thiserror::Error, Debug)]
     328              : pub enum RewriteSummaryError {
     329              :     #[error("magic mismatch")]
     330              :     MagicMismatch,
     331              :     #[error(transparent)]
     332              :     Other(#[from] anyhow::Error),
     333              : }
     334              : 
     335              : impl From<std::io::Error> for RewriteSummaryError {
     336            0 :     fn from(e: std::io::Error) -> Self {
     337            0 :         Self::Other(anyhow::anyhow!(e))
     338            0 :     }
     339              : }
     340              : 
     341              : impl ImageLayer {
     342            0 :     pub async fn rewrite_summary<F>(
     343            0 :         path: &Utf8Path,
     344            0 :         rewrite: F,
     345            0 :         ctx: &RequestContext,
     346            0 :     ) -> Result<(), RewriteSummaryError>
     347            0 :     where
     348            0 :         F: Fn(Summary) -> Summary,
     349            0 :     {
     350            0 :         let mut file = VirtualFile::open_with_options(
     351            0 :             path,
     352            0 :             virtual_file::OpenOptions::new().read(true).write(true),
     353            0 :             ctx,
     354            0 :         )
     355            0 :         .await
     356            0 :         .with_context(|| format!("Failed to open file '{}'", path))?;
     357            0 :         let file_id = page_cache::next_file_id();
     358            0 :         let block_reader = FileBlockReader::new(&file, file_id);
     359            0 :         let summary_blk = block_reader.read_blk(0, ctx).await?;
     360            0 :         let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
     361            0 :         if actual_summary.magic != IMAGE_FILE_MAGIC {
     362            0 :             return Err(RewriteSummaryError::MagicMismatch);
     363            0 :         }
     364            0 : 
     365            0 :         let new_summary = rewrite(actual_summary);
     366            0 : 
     367            0 :         let mut buf = Vec::with_capacity(PAGE_SZ);
     368            0 :         // TODO: could use smallvec here but it's a pain with Slice<T>
     369            0 :         Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
     370            0 :         file.seek(SeekFrom::Start(0)).await?;
     371            0 :         let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
     372            0 :         res?;
     373            0 :         Ok(())
     374            0 :     }
     375              : }
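
A hedged call sketch for `rewrite_summary`, in the spirit of a `pagectl`-style tool: `new_timeline_id` and `ctx` are assumed caller-provided inputs, and this is an illustration rather than code from the pageserver.

```rust
// Sketch only: retarget an image layer file to a different timeline by rewriting
// the timeline_id stored in its on-disk summary. All inputs are assumed to exist.
async fn retarget_image_layer(
    path: &Utf8Path,
    new_timeline_id: TimelineId,
    ctx: &RequestContext,
) -> Result<(), RewriteSummaryError> {
    ImageLayer::rewrite_summary(
        path,
        |mut summary| {
            summary.timeline_id = new_timeline_id;
            summary
        },
        ctx,
    )
    .await
}
```
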
     376              : 
     377              : impl ImageLayerInner {
     378           36 :     pub(crate) fn key_range(&self) -> &Range<Key> {
     379           36 :         &self.key_range
     380           36 :     }
     381              : 
     382           36 :     pub(crate) fn lsn(&self) -> Lsn {
     383           36 :         self.lsn
     384           36 :     }
     385              : 
     386          102 :     pub(super) async fn load(
     387          102 :         path: &Utf8Path,
     388          102 :         lsn: Lsn,
     389          102 :         summary: Option<Summary>,
     390          102 :         max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
     391          102 :         ctx: &RequestContext,
     392          102 :     ) -> anyhow::Result<Self> {
     393          102 :         let file = VirtualFile::open_v2(path, ctx)
     394           51 :             .await
     395          102 :             .context("open layer file")?;
     396          102 :         let file_id = page_cache::next_file_id();
     397          102 :         let block_reader = FileBlockReader::new(&file, file_id);
     398          102 :         let summary_blk = block_reader
     399          102 :             .read_blk(0, ctx)
     400           52 :             .await
     401          102 :             .context("read first block")?;
     402              : 
      403              :         // A bad length is the only way this could fail, so it's not actually likely at all unless
      404              :         // read_blk returns a wrong-sized block.
     405              :         //
     406              :         // TODO: confirm and make this into assertion
     407          102 :         let actual_summary =
     408          102 :             Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
     409              : 
     410          102 :         if let Some(mut expected_summary) = summary {
     411              :             // production code path
     412          102 :             expected_summary.index_start_blk = actual_summary.index_start_blk;
     413          102 :             expected_summary.index_root_blk = actual_summary.index_root_blk;
     414          102 :             // mask out the timeline_id, but still require the layers to be from the same tenant
     415          102 :             expected_summary.timeline_id = actual_summary.timeline_id;
     416          102 : 
     417          102 :             if actual_summary != expected_summary {
     418            0 :                 bail!(
     419            0 :                     "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
     420            0 :                     actual_summary,
     421            0 :                     expected_summary
     422            0 :                 );
     423          102 :             }
     424            0 :         }
     425              : 
     426          102 :         Ok(ImageLayerInner {
     427          102 :             index_start_blk: actual_summary.index_start_blk,
     428          102 :             index_root_blk: actual_summary.index_root_blk,
     429          102 :             lsn,
     430          102 :             file,
     431          102 :             file_id,
     432          102 :             max_vectored_read_bytes,
     433          102 :             key_range: actual_summary.key_range,
     434          102 :         })
     435          102 :     }
     436              : 
     437              :     // Look up the keys in the provided keyspace and update
     438              :     // the reconstruct state with whatever is found.
     439         7969 :     pub(super) async fn get_values_reconstruct_data(
     440         7969 :         &self,
     441         7969 :         keyspace: KeySpace,
     442         7969 :         reconstruct_state: &mut ValuesReconstructState,
     443         7969 :         ctx: &RequestContext,
     444         7969 :     ) -> Result<(), GetVectoredError> {
     445         7969 :         let reads = self
     446         7969 :             .plan_reads(keyspace, None, ctx)
     447          710 :             .await
     448         7969 :             .map_err(GetVectoredError::Other)?;
     449              : 
     450         7969 :         self.do_reads_and_update_state(reads, reconstruct_state, ctx)
     451         4235 :             .await;
     452              : 
     453         7969 :         reconstruct_state.on_image_layer_visited(&self.key_range);
     454         7969 : 
     455         7969 :         Ok(())
     456         7969 :     }
     457              : 
     458              :     /// Traverse the layer's index to build read operations on the overlap of the input keyspace
     459              :     /// and the keys in this layer.
     460              :     ///
     461              :     /// If shard_identity is provided, it will be used to filter keys down to those stored on
     462              :     /// this shard.
     463         7977 :     async fn plan_reads(
     464         7977 :         &self,
     465         7977 :         keyspace: KeySpace,
     466         7977 :         shard_identity: Option<&ShardIdentity>,
     467         7977 :         ctx: &RequestContext,
     468         7977 :     ) -> anyhow::Result<Vec<VectoredRead>> {
     469         7977 :         let mut planner = VectoredReadPlanner::new(
     470         7977 :             self.max_vectored_read_bytes
     471         7977 :                 .expect("Layer is loaded with max vectored bytes config")
     472         7977 :                 .0
     473         7977 :                 .into(),
     474         7977 :         );
     475         7977 : 
     476         7977 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
     477         7977 :         let tree_reader =
     478         7977 :             DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
     479         7977 : 
     480         7977 :         let ctx = RequestContextBuilder::extend(ctx)
     481         7977 :             .page_content_kind(PageContentKind::ImageLayerBtreeNode)
     482         7977 :             .build();
     483              : 
     484        29707 :         for range in keyspace.ranges.iter() {
     485        29707 :             let mut range_end_handled = false;
     486        29707 :             let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
     487        29707 :             range.start.write_to_byte_slice(&mut search_key);
     488        29707 : 
     489        29707 :             let index_stream = tree_reader.clone().into_stream(&search_key, &ctx);
     490        29707 :             let mut index_stream = std::pin::pin!(index_stream);
     491              : 
     492      1096536 :             while let Some(index_entry) = index_stream.next().await {
     493      1096337 :                 let (raw_key, offset) = index_entry?;
     494              : 
     495      1096337 :                 let key = Key::from_slice(&raw_key[..KEY_SIZE]);
     496      1096337 :                 assert!(key >= range.start);
     497              : 
     498      1096337 :                 let flag = if let Some(shard_identity) = shard_identity {
     499      1048576 :                     if shard_identity.is_key_disposable(&key) {
     500       786432 :                         BlobFlag::Ignore
     501              :                     } else {
     502       262144 :                         BlobFlag::None
     503              :                     }
     504              :                 } else {
     505        47761 :                     BlobFlag::None
     506              :                 };
     507              : 
     508      1096337 :                 if key >= range.end {
     509        29508 :                     planner.handle_range_end(offset);
     510        29508 :                     range_end_handled = true;
     511        29508 :                     break;
     512      1066829 :                 } else {
     513      1066829 :                     planner.handle(key, self.lsn, offset, flag);
     514      1066829 :                 }
     515              :             }
     516              : 
     517        29707 :             if !range_end_handled {
     518          199 :                 let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64;
     519          199 :                 planner.handle_range_end(payload_end);
     520        29508 :             }
     521              :         }
     522              : 
     523         7977 :         Ok(planner.finish())
     524         7977 :     }
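
To make the planning loop above easier to follow, here is a simplified, self-contained sketch of the idea behind it (this is not the real `VectoredReadPlanner`, which also tracks keys, LSNs and `BlobFlag`s): the index yields blob start offsets in key order, each blob's end is only known once the next offset (or the end-of-payload offset) arrives, and contiguous blobs are coalesced into a single vectored read until a size budget is exceeded.

```rust
// Simplified sketch of the read-planning idea. Offsets fed to `handle` are blob
// starts in index order; `handle_range_end` supplies the offset bounding the last blob.
struct SketchPlanner {
    max_read_bytes: u64,
    prev_start: Option<u64>,     // start offset of the blob whose end is not yet known
    pending: Option<(u64, u64)>, // the coalesced read currently being built
    reads: Vec<(u64, u64)>,      // finished reads as (start, end) byte ranges
}

impl SketchPlanner {
    fn new(max_read_bytes: u64) -> Self {
        Self { max_read_bytes, prev_start: None, pending: None, reads: Vec::new() }
    }

    /// Called for every index entry inside the requested key range.
    fn handle(&mut self, offset: u64) {
        if let Some(start) = self.prev_start.replace(offset) {
            self.extend(start, offset); // the previous blob ends where this one starts
        }
    }

    /// Called with the bounding offset: the next key's offset or the payload end.
    fn handle_range_end(&mut self, end: u64) {
        if let Some(start) = self.prev_start.take() {
            self.extend(start, end);
        }
        if let Some(read) = self.pending.take() {
            self.reads.push(read); // reads never span keyspace ranges
        }
    }

    fn extend(&mut self, start: u64, end: u64) {
        match self.pending.take() {
            // Contiguous with the pending read and within budget: grow it.
            Some((s, e)) if e == start && end - s <= self.max_read_bytes => {
                self.pending = Some((s, end));
            }
            // Otherwise finish the pending read and start a new one.
            Some(read) => {
                self.reads.push(read);
                self.pending = Some((start, end));
            }
            None => self.pending = Some((start, end)),
        }
    }

    fn finish(mut self) -> Vec<(u64, u64)> {
        if let Some(read) = self.pending.take() {
            self.reads.push(read);
        }
        self.reads
    }
}
```

With a large budget, feeding this sketch the start offsets 100, 200 and 300 followed by `handle_range_end(400)` yields a single coalesced read covering bytes 100..400.
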
     525              : 
     526              :     /// Given a key range, select the parts of that range that should be retained by the ShardIdentity,
     527              :     /// then execute vectored GET operations, passing the results of all read keys into the writer.
     528            8 :     pub(super) async fn filter(
     529            8 :         &self,
     530            8 :         shard_identity: &ShardIdentity,
     531            8 :         writer: &mut ImageLayerWriter,
     532            8 :         ctx: &RequestContext,
     533            8 :     ) -> anyhow::Result<usize> {
     534              :         // Fragment the range into the regions owned by this ShardIdentity
     535            8 :         let plan = self
     536            8 :             .plan_reads(
     537            8 :                 KeySpace {
     538            8 :                     // If asked for the total key space, plan_reads will give us all the keys in the layer
     539            8 :                     ranges: vec![Key::MIN..Key::MAX],
     540            8 :                 },
     541            8 :                 Some(shard_identity),
     542            8 :                 ctx,
     543            8 :             )
     544          469 :             .await?;
     545              : 
     546            8 :         let vectored_blob_reader = VectoredBlobReader::new(&self.file);
     547            8 :         let mut key_count = 0;
     548           16 :         for read in plan.into_iter() {
     549           16 :             let buf_size = read.size();
     550           16 : 
     551           16 :             let buf = IoBufferMut::with_capacity(buf_size);
     552           16 :             let blobs_buf = vectored_blob_reader.read_blobs(&read, buf, ctx).await?;
     553              : 
     554           16 :             let view = BufView::new_slice(&blobs_buf.buf);
     555              : 
     556       262144 :             for meta in blobs_buf.blobs.iter() {
     557       262144 :                 let img_buf = meta.read(&view).await?;
     558              : 
     559       262144 :                 key_count += 1;
     560       262144 :                 writer
     561       262144 :                     .put_image(meta.meta.key, img_buf.into_bytes(), ctx)
     562       266240 :                     .await
     563       262144 :                     .context(format!("Storing key {}", meta.meta.key))?;
     564              :             }
     565              :         }
     566              : 
     567            8 :         Ok(key_count)
     568            8 :     }
     569              : 
     570         7969 :     async fn do_reads_and_update_state(
     571         7969 :         &self,
     572         7969 :         reads: Vec<VectoredRead>,
     573         7969 :         reconstruct_state: &mut ValuesReconstructState,
     574         7969 :         ctx: &RequestContext,
     575         7969 :     ) {
     576         7969 :         let max_vectored_read_bytes = self
     577         7969 :             .max_vectored_read_bytes
     578         7969 :             .expect("Layer is loaded with max vectored bytes config")
     579         7969 :             .0
     580         7969 :             .into();
     581         7969 : 
     582         7969 :         let vectored_blob_reader = VectoredBlobReader::new(&self.file);
     583         8230 :         for read in reads.into_iter() {
     584         8230 :             let buf_size = read.size();
     585         8230 : 
     586         8230 :             if buf_size > max_vectored_read_bytes {
     587              :                 // If the read is oversized, it should only contain one key.
     588            0 :                 let offenders = read
     589            0 :                     .blobs_at
     590            0 :                     .as_slice()
     591            0 :                     .iter()
     592            0 :                     .filter_map(|(_, blob_meta)| {
     593            0 :                         if blob_meta.key.is_rel_dir_key() || blob_meta.key == DBDIR_KEY {
     594              :                             // The size of values for these keys is unbounded and can
     595              :                             // grow very large in pathological cases.
     596            0 :                             None
     597              :                         } else {
     598            0 :                             Some(format!("{}@{}", blob_meta.key, blob_meta.lsn))
     599              :                         }
     600            0 :                     })
     601            0 :                     .join(", ");
     602            0 : 
     603            0 :                 if !offenders.is_empty() {
     604            0 :                     tracing::warn!(
     605            0 :                         "Oversized vectored read ({} > {}) for keys {}",
     606              :                         buf_size,
     607              :                         max_vectored_read_bytes,
     608              :                         offenders
     609              :                     );
     610            0 :                 }
     611         8230 :             }
     612              : 
     613         8230 :             let buf = IoBufferMut::with_capacity(buf_size);
     614         8230 :             let res = vectored_blob_reader.read_blobs(&read, buf, ctx).await;
     615              : 
     616         8230 :             match res {
     617         8230 :                 Ok(blobs_buf) => {
     618         8230 :                     let view = BufView::new_slice(&blobs_buf.buf);
     619        18253 :                     for meta in blobs_buf.blobs.iter() {
     620        18253 :                         let img_buf = meta.read(&view).await;
     621              : 
     622        18253 :                         let img_buf = match img_buf {
     623        18253 :                             Ok(img_buf) => img_buf,
     624            0 :                             Err(e) => {
     625            0 :                                 reconstruct_state.on_key_error(
     626            0 :                                     meta.meta.key,
     627            0 :                                     PageReconstructError::Other(anyhow!(e).context(format!(
     628            0 :                                         "Failed to decompress blob from virtual file {}",
     629            0 :                                         self.file.path(),
     630            0 :                                     ))),
     631            0 :                                 );
     632            0 : 
     633            0 :                                 continue;
     634              :                             }
     635              :                         };
     636        18253 :                         reconstruct_state.update_key(
     637        18253 :                             &meta.meta.key,
     638        18253 :                             self.lsn,
     639        18253 :                             Value::Image(img_buf.into_bytes()),
     640        18253 :                         );
     641              :                     }
     642              :                 }
     643            0 :                 Err(err) => {
     644            0 :                     let kind = err.kind();
     645            0 :                     for (_, blob_meta) in read.blobs_at.as_slice() {
     646            0 :                         reconstruct_state.on_key_error(
     647            0 :                             blob_meta.key,
     648            0 :                             PageReconstructError::from(anyhow!(
     649            0 :                                 "Failed to read blobs from virtual file {}: {}",
     650            0 :                                 self.file.path(),
     651            0 :                                 kind
     652            0 :                             )),
     653            0 :                         );
     654            0 :                     }
     655              :                 }
     656              :             };
     657              :         }
     658         7969 :     }
     659              : 
     660           92 :     pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> ImageLayerIterator<'a> {
     661           92 :         let block_reader = FileBlockReader::new(&self.file, self.file_id);
     662           92 :         let tree_reader =
     663           92 :             DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
     664           92 :         ImageLayerIterator {
     665           92 :             image_layer: self,
     666           92 :             ctx,
     667           92 :             index_iter: tree_reader.iter(&[0; KEY_SIZE], ctx),
     668           92 :             key_values_batch: VecDeque::new(),
     669           92 :             is_end: false,
     670           92 :             planner: StreamingVectoredReadPlanner::new(
     671           92 :                 1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
     672           92 :                 1024,        // The default value. Unit tests might use a different value
     673           92 :             ),
     674           92 :         }
     675           92 :     }
     676              : }
     677              : 
     678              : /// A builder object for constructing a new image layer.
     679              : ///
     680              : /// Usage:
     681              : ///
     682              : /// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
     683              : ///
     684              : /// 2. Write the contents by calling `put_image` for every key-value
     685              : ///    pair in the key range.
     686              : ///
     687              : /// 3. Call `finish`.
     688              : ///
     689              : struct ImageLayerWriterInner {
     690              :     conf: &'static PageServerConf,
     691              :     path: Utf8PathBuf,
     692              :     timeline_id: TimelineId,
     693              :     tenant_shard_id: TenantShardId,
     694              :     key_range: Range<Key>,
     695              :     lsn: Lsn,
     696              : 
     697              :     // Total uncompressed bytes passed into put_image
     698              :     uncompressed_bytes: u64,
     699              : 
     700              :     // Like `uncompressed_bytes`,
     701              :     // but only of images we might consider for compression
     702              :     uncompressed_bytes_eligible: u64,
     703              : 
     704              :     // Like `uncompressed_bytes`, but only of images
     705              :     // where we have chosen their compressed form
     706              :     uncompressed_bytes_chosen: u64,
     707              : 
     708              :     // Number of keys in the layer.
     709              :     num_keys: usize,
     710              : 
     711              :     blob_writer: BlobWriter<false>,
     712              :     tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,
     713              : 
     714              :     #[cfg(feature = "testing")]
     715              :     last_written_key: Key,
     716              : }
     717              : 
     718              : impl ImageLayerWriterInner {
     719              :     ///
     720              :     /// Start building a new image layer.
     721              :     ///
     722          490 :     async fn new(
     723          490 :         conf: &'static PageServerConf,
     724          490 :         timeline_id: TimelineId,
     725          490 :         tenant_shard_id: TenantShardId,
     726          490 :         key_range: &Range<Key>,
     727          490 :         lsn: Lsn,
     728          490 :         ctx: &RequestContext,
     729          490 :     ) -> anyhow::Result<Self> {
     730          490 :         // Create the file initially with a temporary filename.
     731          490 :         // We'll atomically rename it to the final name when we're done.
     732          490 :         let path = ImageLayer::temp_path_for(
     733          490 :             conf,
     734          490 :             timeline_id,
     735          490 :             tenant_shard_id,
     736          490 :             &ImageLayerName {
     737          490 :                 key_range: key_range.clone(),
     738          490 :                 lsn,
     739          490 :             },
     740          490 :         );
     741          490 :         trace!("creating image layer {}", path);
     742          490 :         let mut file = {
     743          490 :             VirtualFile::open_with_options(
     744          490 :                 &path,
     745          490 :                 virtual_file::OpenOptions::new()
     746          490 :                     .write(true)
     747          490 :                     .create_new(true),
     748          490 :                 ctx,
     749          490 :             )
     750          328 :             .await?
     751              :         };
     752              :         // make room for the header block
     753          490 :         file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
     754          490 :         let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
     755          490 : 
     756          490 :         // Initialize the b-tree index builder
     757          490 :         let block_buf = BlockBuf::new();
     758          490 :         let tree_builder = DiskBtreeBuilder::new(block_buf);
     759          490 : 
     760          490 :         let writer = Self {
     761          490 :             conf,
     762          490 :             path,
     763          490 :             timeline_id,
     764          490 :             tenant_shard_id,
     765          490 :             key_range: key_range.clone(),
     766          490 :             lsn,
     767          490 :             tree: tree_builder,
     768          490 :             blob_writer,
     769          490 :             uncompressed_bytes: 0,
     770          490 :             uncompressed_bytes_eligible: 0,
     771          490 :             uncompressed_bytes_chosen: 0,
     772          490 :             num_keys: 0,
     773          490 :             #[cfg(feature = "testing")]
     774          490 :             last_written_key: Key::MIN,
     775          490 :         };
     776          490 : 
     777          490 :         Ok(writer)
     778          490 :     }
     779              : 
     780              :     ///
     781              :     /// Write next value to the file.
     782              :     ///
     783              :     /// The page versions must be appended in blknum order.
     784              :     ///
     785       546152 :     async fn put_image(
     786       546152 :         &mut self,
     787       546152 :         key: Key,
     788       546152 :         img: Bytes,
     789       546152 :         ctx: &RequestContext,
     790       546152 :     ) -> anyhow::Result<()> {
     791       546152 :         ensure!(self.key_range.contains(&key));
     792       546152 :         let compression = self.conf.image_compression;
     793       546152 :         let uncompressed_len = img.len() as u64;
     794       546152 :         self.uncompressed_bytes += uncompressed_len;
     795       546152 :         self.num_keys += 1;
     796       546152 :         let (_img, res) = self
     797       546152 :             .blob_writer
     798       546152 :             .write_blob_maybe_compressed(img.slice_len(), ctx, compression)
     799       554810 :             .await;
     800              :         // TODO: re-use the buffer for `img` further upstack
     801       546152 :         let (off, compression_info) = res?;
     802       546152 :         if compression_info.compressed_size.is_some() {
     803         8002 :             // The image has been considered for compression at least
     804         8002 :             self.uncompressed_bytes_eligible += uncompressed_len;
     805       538150 :         }
     806       546152 :         if compression_info.written_compressed {
     807            0 :             // The image has been compressed
     808            0 :             self.uncompressed_bytes_chosen += uncompressed_len;
     809       546152 :         }
     810              : 
     811       546152 :         let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
     812       546152 :         key.write_to_byte_slice(&mut keybuf);
     813       546152 :         self.tree.append(&keybuf, off)?;
     814              : 
     815              :         #[cfg(feature = "testing")]
     816       546152 :         {
     817       546152 :             self.last_written_key = key;
     818       546152 :         }
     819       546152 : 
     820       546152 :         Ok(())
     821       546152 :     }
     822              : 
     823              :     ///
     824              :     /// Finish writing the image layer.
     825              :     ///
     826          294 :     async fn finish(
     827          294 :         self,
     828          294 :         ctx: &RequestContext,
     829          294 :         end_key: Option<Key>,
     830          294 :     ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
     831          294 :         let temp_path = self.path.clone();
     832          829 :         let result = self.finish0(ctx, end_key).await;
     833          294 :         if let Err(ref e) = result {
     834            0 :             tracing::info!(%temp_path, "cleaning up temporary file after error during writing: {e}");
     835            0 :             if let Err(e) = std::fs::remove_file(&temp_path) {
     836            0 :                 tracing::warn!(error=%e, %temp_path, "error cleaning up temporary layer file after error during writing");
     837            0 :             }
     838          294 :         }
     839          294 :         result
     840          294 :     }
     841              : 
     842              :     ///
     843              :     /// Finish writing the image layer.
     844              :     ///
     845          294 :     async fn finish0(
     846          294 :         self,
     847          294 :         ctx: &RequestContext,
     848          294 :         end_key: Option<Key>,
     849          294 :     ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
     850          294 :         let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
     851          294 : 
     852          294 :         // Calculate compression ratio
     853          294 :         let compressed_size = self.blob_writer.size() - PAGE_SZ as u64; // Subtract PAGE_SZ for header
     854          294 :         crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES.inc_by(self.uncompressed_bytes);
     855          294 :         crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CONSIDERED
     856          294 :             .inc_by(self.uncompressed_bytes_eligible);
     857          294 :         crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CHOSEN.inc_by(self.uncompressed_bytes_chosen);
     858          294 :         crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size);
     859          294 : 
     860          294 :         let mut file = self.blob_writer.into_inner();
     861          294 : 
     862          294 :         // Write out the index
     863          294 :         file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
     864            0 :             .await?;
     865          294 :         let (index_root_blk, block_buf) = self.tree.finish()?;
     866         1044 :         for buf in block_buf.blocks {
     867          750 :             let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
     868          750 :             res?;
     869              :         }
     870              : 
     871          294 :         let final_key_range = if let Some(end_key) = end_key {
     872           26 :             self.key_range.start..end_key
     873              :         } else {
     874          268 :             self.key_range.clone()
     875              :         };
     876              : 
     877              :         // Fill in the summary on blk 0
     878          294 :         let summary = Summary {
     879          294 :             magic: IMAGE_FILE_MAGIC,
     880          294 :             format_version: STORAGE_FORMAT_VERSION,
     881          294 :             tenant_id: self.tenant_shard_id.tenant_id,
     882          294 :             timeline_id: self.timeline_id,
     883          294 :             key_range: final_key_range.clone(),
     884          294 :             lsn: self.lsn,
     885          294 :             index_start_blk,
     886          294 :             index_root_blk,
     887          294 :         };
     888          294 : 
     889          294 :         let mut buf = Vec::with_capacity(PAGE_SZ);
     890          294 :         // TODO: could use smallvec here but it's a pain with Slice<T>
     891          294 :         Summary::ser_into(&summary, &mut buf)?;
     892          294 :         file.seek(SeekFrom::Start(0)).await?;
     893          294 :         let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
     894          294 :         res?;
     895              : 
     896          294 :         let metadata = file
     897          294 :             .metadata()
     898          148 :             .await
     899          294 :             .context("get metadata to determine file size")?;
     900              : 
     901          294 :         let desc = PersistentLayerDesc::new_img(
     902          294 :             self.tenant_shard_id,
     903          294 :             self.timeline_id,
     904          294 :             final_key_range,
     905          294 :             self.lsn,
     906          294 :             metadata.len(),
     907          294 :         );
     908              : 
     909              :         #[cfg(feature = "testing")]
     910          294 :         if let Some(end_key) = end_key {
     911           26 :             assert!(
     912           26 :                 self.last_written_key < end_key,
     913            0 :                 "written key violates end_key range"
     914              :             );
     915          268 :         }
     916              : 
     917              :         // Note: Because we open the file in write-only mode, we cannot
     918              :         // reuse the same VirtualFile for reading later. That's why we don't
     919              :         // set inner.file here. The first read will have to re-open it.
     920              : 
     921              :         // fsync the file
     922          294 :         file.sync_all()
     923          149 :             .await
     924          294 :             .maybe_fatal_err("image_layer sync_all")?;
     925              : 
     926          294 :         trace!("created image layer {}", self.path);
     927              : 
     928          294 :         Ok((desc, self.path))
     929          294 :     }
     930              : }
     931              : 
     932              : /// A builder object for constructing a new image layer.
     933              : ///
     934              : /// Usage:
     935              : ///
     936              : /// 1. Create the ImageLayerWriter by calling ImageLayerWriter::new(...)
     937              : ///
     938              : /// 2. Write the contents by calling `put_image` for every key-value
     939              : ///    pair in the key range.
     940              : ///
     941              : /// 3. Call `finish`.
     942              : ///
     943              : /// # Note
     944              : ///
     945              : /// As described in <https://github.com/neondatabase/neon/issues/2650>, it's
     946              : /// possible for the writer to drop before `finish` is actually called. So this
     947              : /// could lead to odd temporary files in the directory, exhausting file system.
     948              : /// This structure wraps `ImageLayerWriterInner` and also contains `Drop`
     949              : /// implementation that cleans up the temporary file in failure. It's not
     950              : /// possible to do this directly in `ImageLayerWriterInner` since `finish` moves
     951              : /// out some fields, making it impossible to implement `Drop`.
     952              : ///
     953              : #[must_use]
     954              : pub struct ImageLayerWriter {
     955              :     inner: Option<ImageLayerWriterInner>,
     956              : }
     957              : 
     958              : impl ImageLayerWriter {
     959              :     ///
     960              :     /// Start building a new image layer.
     961              :     ///
     962          490 :     pub async fn new(
     963          490 :         conf: &'static PageServerConf,
     964          490 :         timeline_id: TimelineId,
     965          490 :         tenant_shard_id: TenantShardId,
     966          490 :         key_range: &Range<Key>,
     967          490 :         lsn: Lsn,
     968          490 :         ctx: &RequestContext,
     969          490 :     ) -> anyhow::Result<ImageLayerWriter> {
     970          490 :         Ok(Self {
     971          490 :             inner: Some(
     972          490 :                 ImageLayerWriterInner::new(conf, timeline_id, tenant_shard_id, key_range, lsn, ctx)
     973          328 :                     .await?,
     974              :             ),
     975              :         })
     976          490 :     }
     977              : 
     978              :     ///
     979              :     /// Write next value to the file.
     980              :     ///
     981              :     /// The page versions must be appended in blknum order.
     982              :     ///
     983       546152 :     pub async fn put_image(
     984       546152 :         &mut self,
     985       546152 :         key: Key,
     986       546152 :         img: Bytes,
     987       546152 :         ctx: &RequestContext,
     988       546152 :     ) -> anyhow::Result<()> {
     989       554810 :         self.inner.as_mut().unwrap().put_image(key, img, ctx).await
     990       546152 :     }
     991              : 
     992              :     /// Estimated size of the image layer.
     993         8382 :     pub(crate) fn estimated_size(&self) -> u64 {
     994         8382 :         let inner = self.inner.as_ref().unwrap();
     995         8382 :         inner.blob_writer.size() + inner.tree.borrow_writer().size() + PAGE_SZ as u64
     996         8382 :     }
     997              : 
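                       :     /// Number of keys that have been written to this layer so far.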
     998         8442 :     pub(crate) fn num_keys(&self) -> usize {
     999         8442 :         self.inner.as_ref().unwrap().num_keys
    1000         8442 :     }
    1001              : 
    1002              :     ///
    1003              :     /// Finish writing the image layer.
    1004              :     ///
    1005          268 :     pub(crate) async fn finish(
    1006          268 :         mut self,
    1007          268 :         ctx: &RequestContext,
    1008          268 :     ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
    1009          776 :         self.inner.take().unwrap().finish(ctx, None).await
    1010          268 :     }
    1011              : 
    1012              :     /// Finish writing the image layer with an end key, used in [`super::split_writer::SplitImageLayerWriter`]. The end key determines the end of the image layer's covered range and is exclusive.
    1013           26 :     pub(super) async fn finish_with_end_key(
    1014           26 :         mut self,
    1015           26 :         end_key: Key,
    1016           26 :         ctx: &RequestContext,
    1017           26 :     ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
    1018           53 :         self.inner.take().unwrap().finish(ctx, Some(end_key)).await
    1019           26 :     }
    1020              : }
    1021              : 
    1022              : impl Drop for ImageLayerWriter {
    1023          490 :     fn drop(&mut self) {
    1024          490 :         if let Some(inner) = self.inner.take() {
    1025          196 :             inner.blob_writer.into_inner().remove();
    1026          294 :         }
    1027          490 :     }
    1028              : }
    1029              : 
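                       : /// Streaming iterator over all key-value pairs stored in an image layer.
                       : /// Values are read in batches: index entries are planned into vectored reads
                       : /// and the decoded images are buffered in `key_values_batch`, from which
                       : /// `next` pops them one at a time.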
    1030              : pub struct ImageLayerIterator<'a> {
    1031              :     image_layer: &'a ImageLayerInner,
    1032              :     ctx: &'a RequestContext,
    1033              :     planner: StreamingVectoredReadPlanner,
    1034              :     index_iter: DiskBtreeIterator<'a>,
    1035              :     key_values_batch: VecDeque<(Key, Lsn, Value)>,
    1036              :     is_end: bool,
    1037              : }
    1038              : 
    1039              : impl<'a> ImageLayerIterator<'a> {
    1040            0 :     pub(crate) fn layer_dbg_info(&self) -> String {
    1041            0 :         self.image_layer.layer_dbg_info()
    1042            0 :     }
    1043              : 
    1044              :     /// Retrieve a batch of key-value pairs into the iterator buffer.
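                       :     ///
                       :     /// Index entries are fed to the `StreamingVectoredReadPlanner` until it
                       :     /// produces a read plan (or the index is exhausted); the planned blobs are
                       :     /// then fetched with a single vectored read and pushed into `key_values_batch`.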
    1045        19096 :     async fn next_batch(&mut self) -> anyhow::Result<()> {
    1046        19096 :         assert!(self.key_values_batch.is_empty());
    1047        19096 :         assert!(!self.is_end);
    1048              : 
    1049        19096 :         let plan = loop {
    1050        28770 :             if let Some(res) = self.index_iter.next().await {
    1051        28706 :                 let (raw_key, offset) = res?;
    1052        28706 :                 if let Some(batch_plan) = self.planner.handle(
    1053        28706 :                     Key::from_slice(&raw_key[..KEY_SIZE]),
    1054        28706 :                     self.image_layer.lsn,
    1055        28706 :                     offset,
    1056        28706 :                 ) {
    1057        19032 :                     break batch_plan;
    1058         9674 :                 }
    1059              :             } else {
    1060           64 :                 self.is_end = true;
    1061           64 :                 let payload_end = self.image_layer.index_start_blk as u64 * PAGE_SZ as u64;
    1062           64 :                 if let Some(item) = self.planner.handle_range_end(payload_end) {
    1063           64 :                     break item;
    1064              :                 } else {
    1065            0 :                     return Ok(()); // TODO: a test case on empty iterator
    1066              :                 }
    1067              :             }
    1068              :         };
    1069        19096 :         let vectored_blob_reader = VectoredBlobReader::new(&self.image_layer.file);
    1070        19096 :         let mut next_batch = std::collections::VecDeque::new();
    1071        19096 :         let buf_size = plan.size();
    1072        19096 :         let buf = IoBufferMut::with_capacity(buf_size);
    1073        19096 :         let blobs_buf = vectored_blob_reader
    1074        19096 :             .read_blobs(&plan, buf, self.ctx)
    1075         9696 :             .await?;
    1076        19096 :         let view = BufView::new_slice(&blobs_buf.buf);
    1077        28678 :         for meta in blobs_buf.blobs.iter() {
    1078        28678 :             let img_buf = meta.read(&view).await?;
    1079        28678 :             next_batch.push_back((
    1080        28678 :                 meta.meta.key,
    1081        28678 :                 self.image_layer.lsn,
    1082        28678 :                 Value::Image(img_buf.into_bytes()),
    1083        28678 :             ));
    1084              :         }
    1085        19096 :         self.key_values_batch = next_batch;
    1086        19096 :         Ok(())
    1087        19096 :     }
    1088              : 
    1089        28508 :     pub async fn next(&mut self) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
    1090        28508 :         if self.key_values_batch.is_empty() {
    1091        19112 :             if self.is_end {
    1092          100 :                 return Ok(None);
    1093        19012 :             }
    1094        19012 :             self.next_batch().await?;
    1095         9396 :         }
    1096        28408 :         Ok(Some(
    1097        28408 :             self.key_values_batch
    1098        28408 :                 .pop_front()
    1099        28408 :                 .expect("should not be empty"),
    1100        28408 :         ))
    1101        28508 :     }
    1102              : }
    1103              : 
    1104              : #[cfg(test)]
    1105              : mod test {
    1106              :     use std::{sync::Arc, time::Duration};
    1107              : 
    1108              :     use bytes::Bytes;
    1109              :     use itertools::Itertools;
    1110              :     use pageserver_api::{
    1111              :         key::Key,
    1112              :         shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize},
    1113              :     };
    1114              :     use utils::{
    1115              :         generation::Generation,
    1116              :         id::{TenantId, TimelineId},
    1117              :         lsn::Lsn,
    1118              :     };
    1119              : 
    1120              :     use crate::{
    1121              :         context::RequestContext,
    1122              :         repository::Value,
    1123              :         tenant::{
    1124              :             config::TenantConf,
    1125              :             harness::{TenantHarness, TIMELINE_ID},
    1126              :             storage_layer::{Layer, ResidentLayer},
    1127              :             vectored_blob_io::StreamingVectoredReadPlanner,
    1128              :             Tenant, Timeline,
    1129              :         },
    1130              :         DEFAULT_PG_VERSION,
    1131              :     };
    1132              : 
    1133              :     use super::{ImageLayerIterator, ImageLayerWriter};
    1134              : 
    1135              :     #[tokio::test]
    1136            2 :     async fn image_layer_rewrite() {
    1137            2 :         let tenant_conf = TenantConf {
    1138            2 :             gc_period: Duration::ZERO,
    1139            2 :             compaction_period: Duration::ZERO,
    1140            2 :             ..TenantConf::default()
    1141            2 :         };
    1142            2 :         let tenant_id = TenantId::generate();
    1143            2 :         let mut gen = Generation::new(0xdead0001);
    1144           10 :         let mut get_next_gen = || {
    1145           10 :             let ret = gen;
    1146           10 :             gen = gen.next();
    1147           10 :             ret
    1148           10 :         };
    1149            2 :         // The LSN at which we will create an image layer to filter
    1150            2 :         let lsn = Lsn(0xdeadbeef0000);
    1151            2 :         let timeline_id = TimelineId::generate();
    1152            2 : 
    1153            2 :         //
    1154            2 :         // Create an unsharded parent with a layer.
    1155            2 :         //
    1156            2 : 
    1157            2 :         let harness = TenantHarness::create_custom(
    1158            2 :             "test_image_layer_rewrite--parent",
    1159            2 :             tenant_conf.clone(),
    1160            2 :             tenant_id,
    1161            2 :             ShardIdentity::unsharded(),
    1162            2 :             get_next_gen(),
    1163            2 :         )
    1164            2 :         .await
    1165            2 :         .unwrap();
    1166            8 :         let (tenant, ctx) = harness.load().await;
    1167            2 :         let timeline = tenant
    1168            2 :             .create_test_timeline(timeline_id, lsn, DEFAULT_PG_VERSION, &ctx)
    1169            4 :             .await
    1170            2 :             .unwrap();
    1171            2 : 
     1172            2 :         // This key range contains several 0x8000-page stripes, only one of which belongs to shard zero
    1173            2 :         let input_start = Key::from_hex("000000067f00000001000000ae0000000000").unwrap();
    1174            2 :         let input_end = Key::from_hex("000000067f00000001000000ae0000020000").unwrap();
    1175            2 :         let range = input_start..input_end;
    1176            2 : 
    1177            2 :         // Build an image layer to filter
    1178            2 :         let resident = {
    1179            2 :             let mut writer = ImageLayerWriter::new(
    1180            2 :                 harness.conf,
    1181            2 :                 timeline_id,
    1182            2 :                 harness.tenant_shard_id,
    1183            2 :                 &range,
    1184            2 :                 lsn,
    1185            2 :                 &ctx,
    1186            2 :             )
    1187            2 :             .await
    1188            2 :             .unwrap();
    1189            2 : 
    1190            2 :             let foo_img = Bytes::from_static(&[1, 2, 3, 4]);
    1191            2 :             let mut key = range.start;
    1192       262146 :             while key < range.end {
    1193       266239 :                 writer.put_image(key, foo_img.clone(), &ctx).await.unwrap();
    1194       262144 : 
    1195       262144 :                 key = key.next();
    1196            2 :             }
    1197          119 :             let (desc, path) = writer.finish(&ctx).await.unwrap();
    1198            2 :             Layer::finish_creating(tenant.conf, &timeline, desc, &path).unwrap()
    1199            2 :         };
    1200            2 :         let original_size = resident.metadata().file_size;
    1201            2 : 
    1202            2 :         //
    1203            2 :         // Create child shards and do the rewrite, exercising filter().
    1204            2 :         // TODO: abstraction in TenantHarness for splits.
    1205            2 :         //
    1206            2 : 
    1207            2 :         // Filter for various shards: this exercises cases like values at start of key range, end of key
    1208            2 :         // range, middle of key range.
    1209            2 :         let shard_count = ShardCount::new(4);
    1210            8 :         for shard_number in 0..shard_count.count() {
    1211            2 :             //
    1212            2 :             // mimic the shard split
    1213            2 :             //
    1214            8 :             let shard_identity = ShardIdentity::new(
    1215            8 :                 ShardNumber(shard_number),
    1216            8 :                 shard_count,
    1217            8 :                 ShardStripeSize(0x8000),
    1218            8 :             )
    1219            8 :             .unwrap();
    1220            8 :             let harness = TenantHarness::create_custom(
    1221            8 :                 Box::leak(Box::new(format!(
    1222            8 :                     "test_image_layer_rewrite--child{}",
    1223            8 :                     shard_identity.shard_slug()
    1224            8 :                 ))),
    1225            8 :                 tenant_conf.clone(),
    1226            8 :                 tenant_id,
    1227            8 :                 shard_identity,
    1228            8 :                 // NB: in reality, the shards would each fork off their own gen number sequence from the parent.
    1229            8 :                 // But here, all we care about is that the gen number is unique.
    1230            8 :                 get_next_gen(),
    1231            8 :             )
    1232            2 :             .await
    1233            8 :             .unwrap();
    1234           32 :             let (tenant, ctx) = harness.load().await;
    1235            8 :             let timeline = tenant
    1236            8 :                 .create_test_timeline(timeline_id, lsn, DEFAULT_PG_VERSION, &ctx)
    1237           12 :                 .await
    1238            8 :                 .unwrap();
    1239            2 : 
    1240            2 :             //
    1241            2 :             // use filter() and make assertions
    1242            2 :             //
    1243            2 : 
    1244            8 :             let mut filtered_writer = ImageLayerWriter::new(
    1245            8 :                 harness.conf,
    1246            8 :                 timeline_id,
    1247            8 :                 harness.tenant_shard_id,
    1248            8 :                 &range,
    1249            8 :                 lsn,
    1250            8 :                 &ctx,
    1251            8 :             )
    1252            4 :             .await
    1253            8 :             .unwrap();
    1254            2 : 
    1255            8 :             let wrote_keys = resident
    1256            8 :                 .filter(&shard_identity, &mut filtered_writer, &ctx)
    1257       266719 :                 .await
    1258            8 :                 .unwrap();
    1259            8 :             let replacement = if wrote_keys > 0 {
    1260          129 :                 let (desc, path) = filtered_writer.finish(&ctx).await.unwrap();
    1261            6 :                 let resident = Layer::finish_creating(tenant.conf, &timeline, desc, &path).unwrap();
    1262            6 :                 Some(resident)
    1263            2 :             } else {
    1264            2 :                 None
    1265            2 :             };
    1266            2 : 
    1267            2 :             // This exact size and those below will need updating as/when the layer encoding changes, but
     1268            2 :             // should be deterministic for a given version of the format, since no randomness was used to generate the input.
    1269            8 :             assert_eq!(original_size, 1597440);
    1270            2 : 
    1271            8 :             match shard_number {
    1272            2 :                 0 => {
    1273            2 :                     // We should have written out just one stripe for our shard identity
    1274            2 :                     assert_eq!(wrote_keys, 0x8000);
    1275            2 :                     let replacement = replacement.unwrap();
    1276            2 : 
    1277            2 :                     // We should have dropped some of the data
    1278            2 :                     assert!(replacement.metadata().file_size < original_size);
    1279            2 :                     assert!(replacement.metadata().file_size > 0);
    1280            2 : 
    1281            2 :                     // Assert that we dropped ~3/4 of the data.
    1282            2 :                     assert_eq!(replacement.metadata().file_size, 417792);
    1283            2 :                 }
    1284            2 :                 1 => {
    1285            2 :                     // Shard 1 has no keys in our input range
    1286            2 :                     assert_eq!(wrote_keys, 0x0);
    1287            2 :                     assert!(replacement.is_none());
    1288            2 :                 }
    1289            2 :                 2 => {
     1290            2 :                     // Shard 2 has one stripe in the input range
    1291            2 :                     assert_eq!(wrote_keys, 0x8000);
    1292            2 :                     let replacement = replacement.unwrap();
    1293            2 :                     assert!(replacement.metadata().file_size < original_size);
    1294            2 :                     assert!(replacement.metadata().file_size > 0);
    1295            2 :                     assert_eq!(replacement.metadata().file_size, 417792);
    1296            2 :                 }
    1297            2 :                 3 => {
    1298            2 :                     // Shard 3 has two stripes in the input range
    1299            2 :                     assert_eq!(wrote_keys, 0x10000);
    1300            2 :                     let replacement = replacement.unwrap();
    1301            2 :                     assert!(replacement.metadata().file_size < original_size);
    1302            2 :                     assert!(replacement.metadata().file_size > 0);
    1303            2 :                     assert_eq!(replacement.metadata().file_size, 811008);
    1304            2 :                 }
    1305            2 :                 _ => unreachable!(),
    1306            2 :             }
    1307            2 :         }
    1308            2 :     }
    1309              : 
    1310            2 :     async fn produce_image_layer(
    1311            2 :         tenant: &Tenant,
    1312            2 :         tline: &Arc<Timeline>,
    1313            2 :         mut images: Vec<(Key, Bytes)>,
    1314            2 :         lsn: Lsn,
    1315            2 :         ctx: &RequestContext,
    1316            2 :     ) -> anyhow::Result<ResidentLayer> {
    1317            2 :         images.sort();
    1318            2 :         let (key_start, _) = images.first().unwrap();
    1319            2 :         let (key_last, _) = images.last().unwrap();
    1320            2 :         let key_end = key_last.next();
    1321            2 :         let key_range = *key_start..key_end;
    1322            2 :         let mut writer = ImageLayerWriter::new(
    1323            2 :             tenant.conf,
    1324            2 :             tline.timeline_id,
    1325            2 :             tenant.tenant_shard_id,
    1326            2 :             &key_range,
    1327            2 :             lsn,
    1328            2 :             ctx,
    1329            2 :         )
    1330            1 :         .await?;
    1331              : 
    1332         2002 :         for (key, img) in images {
    1333         2031 :             writer.put_image(key, img, ctx).await?;
    1334              :         }
    1335            4 :         let (desc, path) = writer.finish(ctx).await?;
    1336            2 :         let img_layer = Layer::finish_creating(tenant.conf, tline, desc, &path)?;
    1337              : 
    1338            2 :         Ok::<_, anyhow::Error>(img_layer)
    1339            2 :     }
    1340              : 
    1341           28 :     async fn assert_img_iter_equal(
    1342           28 :         img_iter: &mut ImageLayerIterator<'_>,
    1343           28 :         expect: &[(Key, Bytes)],
    1344           28 :         expect_lsn: Lsn,
    1345           28 :     ) {
    1346           28 :         let mut expect_iter = expect.iter();
    1347              :         loop {
    1348        28028 :             let o1 = img_iter.next().await.unwrap();
    1349        28028 :             let o2 = expect_iter.next();
    1350        28028 :             match (o1, o2) {
    1351           28 :                 (None, None) => break,
    1352        28000 :                 (Some((k1, l1, v1)), Some((k2, i2))) => {
    1353        28000 :                     let Value::Image(i1) = v1 else {
    1354            0 :                         panic!("expect Value::Image")
    1355              :                     };
    1356        28000 :                     assert_eq!(&k1, k2);
    1357        28000 :                     assert_eq!(l1, expect_lsn);
    1358        28000 :                     assert_eq!(&i1, i2);
    1359              :                 }
    1360            0 :                 (o1, o2) => panic!("iterators length mismatch: {:?}, {:?}", o1, o2),
    1361              :             }
    1362              :         }
    1363           28 :     }
    1364              : 
    1365              :     #[tokio::test]
    1366            2 :     async fn image_layer_iterator() {
    1367            2 :         let harness = TenantHarness::create("image_layer_iterator").await.unwrap();
    1368            8 :         let (tenant, ctx) = harness.load().await;
    1369            2 : 
    1370            2 :         let tline = tenant
    1371            2 :             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
    1372            4 :             .await
    1373            2 :             .unwrap();
    1374            2 : 
    1375         2000 :         fn get_key(id: u32) -> Key {
    1376         2000 :             let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
    1377         2000 :             key.field6 = id;
    1378         2000 :             key
    1379         2000 :         }
    1380            2 :         const N: usize = 1000;
    1381            2 :         let test_imgs = (0..N)
    1382         2000 :             .map(|idx| (get_key(idx as u32), Bytes::from(format!("img{idx:05}"))))
    1383            2 :             .collect_vec();
    1384            2 :         let resident_layer =
    1385            2 :             produce_image_layer(&tenant, &tline, test_imgs.clone(), Lsn(0x10), &ctx)
    1386         2036 :                 .await
    1387            2 :                 .unwrap();
    1388            2 :         let img_layer = resident_layer.get_as_image(&ctx).await.unwrap();
    1389            6 :         for max_read_size in [1, 1024] {
    1390           32 :             for batch_size in [1, 2, 4, 8, 3, 7, 13] {
    1391           28 :                 println!("running with batch_size={batch_size} max_read_size={max_read_size}");
    1392           28 :                 // Test if the batch size is correctly determined
    1393           28 :                 let mut iter = img_layer.iter(&ctx);
    1394           28 :                 iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
    1395           28 :                 let mut num_items = 0;
    1396          112 :                 for _ in 0..3 {
    1397           84 :                     iter.next_batch().await.unwrap();
    1398           84 :                     num_items += iter.key_values_batch.len();
    1399           84 :                     if max_read_size == 1 {
    1400            2 :                         // every key should be a batch b/c the value is larger than max_read_size
    1401           42 :                         assert_eq!(iter.key_values_batch.len(), 1);
    1402            2 :                     } else {
    1403           42 :                         assert!(iter.key_values_batch.len() <= batch_size);
    1404            2 :                     }
    1405           84 :                     if num_items >= N {
    1406            2 :                         break;
    1407           84 :                     }
    1408           84 :                     iter.key_values_batch.clear();
    1409            2 :                 }
    1410            2 :                 // Test if the result is correct
    1411           28 :                 let mut iter = img_layer.iter(&ctx);
    1412           28 :                 iter.planner = StreamingVectoredReadPlanner::new(max_read_size, batch_size);
    1413         9635 :                 assert_img_iter_equal(&mut iter, &test_imgs, Lsn(0x10)).await;
    1414            2 :             }
    1415            2 :         }
    1416            2 :     }
    1417              : }
        

Generated by: LCOV version 2.1-beta