LCOV - code coverage report
Current view: top level - pageserver/src/tenant - vectored_blob_io.rs (source / functions) Coverage Total Hit
Test: 7179b4db0d82ca8088cc95c44c4be4232078509c.info Lines: 94.6 % 615 582
Test Date: 2024-11-21 16:46:58 Functions: 91.5 % 47 43

            Line data    Source code
       1              : //!
       2              : //! Utilities for vectored reading of variable-sized "blobs".
       3              : //!
       4              : //! The "blob" api is an abstraction on top of the "block" api,
       5              : //! with the main difference being that blobs do not have a fixed
       6              : //! size (each blob is prefixed with 1 or 4 byte length field)
       7              : //!
       8              : //! The vectored apis provided in this module allow for planning
       9              : //! and executing disk IO which covers multiple blobs.
      10              : //!
      11              : //! Reads are planned with [`VectoredReadPlanner`] which will coalesce
      12              : //! adjacent blocks into a single disk IO request and executed by
      13              : //! [`VectoredBlobReader`] which does all the required offset juggling
      14              : //! and returns a buffer housing all the blobs and a list of offsets.
      15              : //!
      16              : //! Note that the vectored blob api does *not* go through the page cache.
      17              : 
      18              : use std::collections::BTreeMap;
      19              : use std::ops::Deref;
      20              : 
      21              : use bytes::Bytes;
      22              : use pageserver_api::key::Key;
      23              : use tokio::io::AsyncWriteExt;
      24              : use tokio_epoll_uring::BoundedBuf;
      25              : use utils::lsn::Lsn;
      26              : use utils::vec_map::VecMap;
      27              : 
      28              : use crate::context::RequestContext;
      29              : use crate::tenant::blob_io::{BYTE_UNCOMPRESSED, BYTE_ZSTD, LEN_COMPRESSION_BIT_MASK};
      30              : use crate::virtual_file::IoBufferMut;
      31              : use crate::virtual_file::{self, VirtualFile};
      32              : 
      33              : /// Metadata bundled with the start and end offset of a blob.
      34              : #[derive(Copy, Clone, Debug)]
      35              : pub struct BlobMeta {
      36              :     pub key: Key,
      37              :     pub lsn: Lsn,
      38              : }
      39              : 
      40              : /// A view into the vectored blobs read buffer.
      41              : #[derive(Clone, Debug)]
      42              : pub(crate) enum BufView<'a> {
      43              :     Slice(&'a [u8]),
      44              :     Bytes(bytes::Bytes),
      45              : }
      46              : 
      47              : impl<'a> BufView<'a> {
      48              :     /// Creates a new slice-based view on the blob.
      49       211718 :     pub fn new_slice(slice: &'a [u8]) -> Self {
      50       211718 :         Self::Slice(slice)
      51       211718 :     }
      52              : 
      53              :     /// Creates a new [`bytes::Bytes`]-based view on the blob.
      54            2 :     pub fn new_bytes(bytes: bytes::Bytes) -> Self {
      55            2 :         Self::Bytes(bytes)
      56            2 :     }
      57              : 
      58              :     /// Convert the view into `Bytes`.
      59              :     ///
      60              :     /// If using slice as the underlying storage, the copy will be an O(n) operation.
      61       344838 :     pub fn into_bytes(self) -> Bytes {
      62       344838 :         match self {
      63       344838 :             BufView::Slice(slice) => Bytes::copy_from_slice(slice),
      64            0 :             BufView::Bytes(bytes) => bytes,
      65              :         }
      66       344838 :     }
      67              : 
      68              :     /// Creates a sub-view of the blob based on the range.
      69      2655294 :     fn view(&self, range: std::ops::Range<usize>) -> Self {
      70      2655294 :         match self {
      71      2655294 :             BufView::Slice(slice) => BufView::Slice(&slice[range]),
      72            0 :             BufView::Bytes(bytes) => BufView::Bytes(bytes.slice(range)),
      73              :         }
      74      2655294 :     }
      75              : }
      76              : 
      77              : impl Deref for BufView<'_> {
      78              :     type Target = [u8];
      79              : 
      80      2310522 :     fn deref(&self) -> &Self::Target {
      81      2310522 :         match self {
      82      2310520 :             BufView::Slice(slice) => slice,
      83            2 :             BufView::Bytes(bytes) => bytes,
      84              :         }
      85      2310522 :     }
      86              : }
      87              : 
      88              : impl AsRef<[u8]> for BufView<'_> {
      89            0 :     fn as_ref(&self) -> &[u8] {
      90            0 :         match self {
      91            0 :             BufView::Slice(slice) => slice,
      92            0 :             BufView::Bytes(bytes) => bytes.as_ref(),
      93              :         }
      94            0 :     }
      95              : }
      96              : 
      97              : impl<'a> From<&'a [u8]> for BufView<'a> {
      98            0 :     fn from(value: &'a [u8]) -> Self {
      99            0 :         Self::new_slice(value)
     100            0 :     }
     101              : }
     102              : 
     103              : impl From<Bytes> for BufView<'_> {
     104            0 :     fn from(value: Bytes) -> Self {
     105            0 :         Self::new_bytes(value)
     106            0 :     }
     107              : }
     108              : 
     109              : /// Blob offsets into [`VectoredBlobsBuf::buf`]. The byte range is potentially compressed,
     110              : /// subject to [`VectoredBlob::compression_bits`].
     111              : pub struct VectoredBlob {
     112              :     /// Blob metadata.
     113              :     pub meta: BlobMeta,
     114              :     /// Start offset.
     115              :     start: usize,
     116              :     /// End offset.
     117              :     end: usize,
     118              :     /// Compression used on the blob.
     119              :     compression_bits: u8,
     120              : }
     121              : 
     122              : impl VectoredBlob {
     123              :     /// Reads a decompressed view of the blob.
     124      2655294 :     pub(crate) async fn read<'a>(&self, buf: &BufView<'a>) -> Result<BufView<'a>, std::io::Error> {
     125      2655294 :         let view = buf.view(self.start..self.end);
     126      2655294 : 
     127      2655294 :         match self.compression_bits {
     128      2655292 :             BYTE_UNCOMPRESSED => Ok(view),
     129              :             BYTE_ZSTD => {
     130            2 :                 let mut decompressed_vec = Vec::new();
     131            2 :                 let mut decoder =
     132            2 :                     async_compression::tokio::write::ZstdDecoder::new(&mut decompressed_vec);
     133            2 :                 decoder.write_all(&view).await?;
     134            2 :                 decoder.flush().await?;
     135              :                 // Zero-copy conversion from `Vec` to `Bytes`
     136            2 :                 Ok(BufView::new_bytes(Bytes::from(decompressed_vec)))
     137              :             }
     138            0 :             bits => {
     139            0 :                 let error = std::io::Error::new(
     140            0 :                     std::io::ErrorKind::InvalidData,
     141            0 :                     format!("Failed to decompress blob for {}@{}, {}..{}: invalid compression byte {bits:x}", self.meta.key, self.meta.lsn, self.start, self.end),
     142            0 :                 );
     143            0 :                 Err(error)
     144              :             }
     145              :         }
     146      2655294 :     }
     147              : }
     148              : 
     149              : impl std::fmt::Display for VectoredBlob {
     150            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     151            0 :         write!(
     152            0 :             f,
     153            0 :             "{}@{}, {}..{}",
     154            0 :             self.meta.key, self.meta.lsn, self.start, self.end
     155            0 :         )
     156            0 :     }
     157              : }
     158              : 
     159              : /// Return type of [`VectoredBlobReader::read_blobs`]
     160              : pub struct VectoredBlobsBuf {
     161              :     /// Buffer for all blobs in this read
     162              :     pub buf: IoBufferMut,
     163              :     /// Offsets into the buffer and metadata for all blobs in this read
     164              :     pub blobs: Vec<VectoredBlob>,
     165              : }
     166              : 
     167              : /// Description of one disk read for multiple blobs.
     168              : /// Used as the argument for [`VectoredBlobReader::read_blobs`]
     169              : #[derive(Debug)]
     170              : pub struct VectoredRead {
     171              :     pub start: u64,
     172              :     pub end: u64,
     173              :     /// Start offset and metadata for each blob in this read
     174              :     pub blobs_at: VecMap<u64, BlobMeta>,
     175              : }
     176              : 
     177              : impl VectoredRead {
     178       964880 :     pub(crate) fn size(&self) -> usize {
     179       964880 :         (self.end - self.start) as usize
     180       964880 :     }
     181              : }
     182              : 
     183              : #[derive(Eq, PartialEq, Debug)]
     184              : pub(crate) enum VectoredReadExtended {
     185              :     Yes,
     186              :     No,
     187              : }
     188              : 
     189              : /// A vectored read builder that tries to coalesce all reads that fit in a chunk.
     190              : pub(crate) struct ChunkedVectoredReadBuilder {
     191              :     /// Start block number
     192              :     start_blk_no: usize,
     193              :     /// End block number (exclusive).
     194              :     end_blk_no: usize,
     195              :     /// Start offset and metadata for each blob in this read
     196              :     blobs_at: VecMap<u64, BlobMeta>,
     197              :     max_read_size: Option<usize>,
     198              : }
     199              : 
     200              : impl ChunkedVectoredReadBuilder {
     201              :     const CHUNK_SIZE: usize = virtual_file::get_io_buffer_alignment();
     202              :     /// Start building a new vectored read.
     203              :     ///
     204              :     /// Note that by design, this does not check against reading more than `max_read_size` to
     205              :     /// support reading larger blobs than the configuration value. The builder will be single use
     206              :     /// however after that.
     207       211794 :     fn new_impl(
     208       211794 :         start_offset: u64,
     209       211794 :         end_offset: u64,
     210       211794 :         meta: BlobMeta,
     211       211794 :         max_read_size: Option<usize>,
     212       211794 :     ) -> Self {
     213       211794 :         let mut blobs_at = VecMap::default();
     214       211794 :         blobs_at
     215       211794 :             .append(start_offset, meta)
     216       211794 :             .expect("First insertion always succeeds");
     217       211794 : 
     218       211794 :         let start_blk_no = start_offset as usize / Self::CHUNK_SIZE;
     219       211794 :         let end_blk_no = (end_offset as usize).div_ceil(Self::CHUNK_SIZE);
     220       211794 :         Self {
     221       211794 :             start_blk_no,
     222       211794 :             end_blk_no,
     223       211794 :             blobs_at,
     224       211794 :             max_read_size,
     225       211794 :         }
     226       211794 :     }
     227              : 
     228       171270 :     pub(crate) fn new(
     229       171270 :         start_offset: u64,
     230       171270 :         end_offset: u64,
     231       171270 :         meta: BlobMeta,
     232       171270 :         max_read_size: usize,
     233       171270 :     ) -> Self {
     234       171270 :         Self::new_impl(start_offset, end_offset, meta, Some(max_read_size))
     235       171270 :     }
     236              : 
     237        40524 :     pub(crate) fn new_streaming(start_offset: u64, end_offset: u64, meta: BlobMeta) -> Self {
     238        40524 :         Self::new_impl(start_offset, end_offset, meta, None)
     239        40524 :     }
     240              : 
     241              :     /// Attempts to extend the current read with a new blob if the new blob resides in the same or the immediate next chunk.
     242              :     ///
     243              :     /// The resulting size also must be below the max read size.
     244      2466830 :     pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
     245      2466830 :         tracing::trace!(start, end, "trying to extend");
     246      2466830 :         let start_blk_no = start as usize / Self::CHUNK_SIZE;
     247      2466830 :         let end_blk_no = (end as usize).div_ceil(Self::CHUNK_SIZE);
     248              : 
     249      2466830 :         let not_limited_by_max_read_size = {
     250      2466830 :             if let Some(max_read_size) = self.max_read_size {
     251       379482 :                 let coalesced_size = (end_blk_no - self.start_blk_no) * Self::CHUNK_SIZE;
     252       379482 :                 coalesced_size <= max_read_size
     253              :             } else {
     254      2087348 :                 true
     255              :             }
     256              :         };
     257              : 
     258              :         // True if the second block starts in the same block or the immediate next block where the first block ended.
     259              :         //
     260              :         // Note: This automatically handles the case where two blocks are adjacent to each other,
     261              :         // whether they start on a chunk size boundary or not.
     262      2466830 :         let is_adjacent_chunk_read = {
     263              :             // 1. first.end & second.start are in the same block
     264      2466830 :             self.end_blk_no == start_blk_no + 1 ||
     265              :             // 2. first.end ends one block before second.start
     266        11968 :             self.end_blk_no == start_blk_no
     267              :         };
     268              : 
     269      2466830 :         if is_adjacent_chunk_read && not_limited_by_max_read_size {
     270      2443612 :             self.end_blk_no = end_blk_no;
     271      2443612 :             self.blobs_at
     272      2443612 :                 .append(start, meta)
     273      2443612 :                 .expect("LSNs are ordered within vectored reads");
     274      2443612 : 
     275      2443612 :             return VectoredReadExtended::Yes;
     276        23218 :         }
     277        23218 : 
     278        23218 :         VectoredReadExtended::No
     279      2466830 :     }
     280              : 
     281      2127254 :     pub(crate) fn size(&self) -> usize {
     282      2127254 :         (self.end_blk_no - self.start_blk_no) * Self::CHUNK_SIZE
     283      2127254 :     }
     284              : 
     285       211794 :     pub(crate) fn build(self) -> VectoredRead {
     286       211794 :         let start = (self.start_blk_no * Self::CHUNK_SIZE) as u64;
     287       211794 :         let end = (self.end_blk_no * Self::CHUNK_SIZE) as u64;
     288       211794 :         VectoredRead {
     289       211794 :             start,
     290       211794 :             end,
     291       211794 :             blobs_at: self.blobs_at,
     292       211794 :         }
     293       211794 :     }
     294              : }
     295              : 
     296              : #[derive(Copy, Clone, Debug)]
     297              : pub enum BlobFlag {
     298              :     None,
     299              :     Ignore,
     300              :     ReplaceAll,
     301              : }
     302              : 
     303              : /// Planner for vectored blob reads.
     304              : ///
     305              : /// Blob offsets are received via [`VectoredReadPlanner::handle`]
     306              : /// and coalesced into disk reads.
     307              : ///
     308              : /// The implementation is very simple:
     309              : /// * Collect all blob offsets in an ordered structure
     310              : /// * Iterate over the collected blobs and coalesce them into reads at the end
     311              : pub struct VectoredReadPlanner {
     312              :     // Track all the blob offsets. Start offsets must be ordered.
     313              :     blobs: BTreeMap<Key, Vec<(Lsn, u64, u64)>>,
     314              :     // Arguments for previous blob passed into [`VectoredReadPlanner::handle`]
     315              :     prev: Option<(Key, Lsn, u64, BlobFlag)>,
     316              : 
     317              :     max_read_size: usize,
     318              : }
     319              : 
     320              : impl VectoredReadPlanner {
     321       239320 :     pub fn new(max_read_size: usize) -> Self {
     322       239320 :         Self {
     323       239320 :             blobs: BTreeMap::new(),
     324       239320 :             prev: None,
     325       239320 :             max_read_size,
     326       239320 :         }
     327       239320 :     }
     328              : 
     329              :     /// Include a new blob in the read plan.
     330              :     ///
     331              :     /// This function is called from a B-Tree index visitor (see `DeltaLayerInner::plan_reads`
     332              :     /// and `ImageLayerInner::plan_reads`). Said visitor wants to collect blob offsets for all
     333              :     /// keys in a given keyspace. This function must be called for each key in the desired
     334              :     /// keyspace (monotonically continuous). [`Self::handle_range_end`] must
     335              :     /// be called after every range in the offset.
     336              :     ///
     337              :     /// In the event that keys are skipped, the behaviour is undefined and can lead to an
     338              :     /// incorrect read plan. We can end up asserting, erroring in wal redo or returning
     339              :     /// incorrect data to the user.
     340              :     ///
     341              :     /// The `flag` argument has two interesting values:
     342              :     /// * [`BlobFlag::ReplaceAll`]: The blob for this key should replace all existing blobs.
     343              :     ///   This is used for WAL records that `will_init`.
     344              :     /// * [`BlobFlag::Ignore`]: This blob should not be included in the read. This happens
     345              :     ///   if the blob is cached.
     346      1519934 :     pub fn handle(&mut self, key: Key, lsn: Lsn, offset: u64, flag: BlobFlag) {
     347              :         // Implementation note: internally lag behind by one blob such that
     348              :         // we have a start and end offset when initialising [`VectoredRead`]
     349      1519934 :         let (prev_key, prev_lsn, prev_offset, prev_flag) = match self.prev {
     350              :             None => {
     351       152743 :                 self.prev = Some((key, lsn, offset, flag));
     352       152743 :                 return;
     353              :             }
     354      1367191 :             Some(prev) => prev,
     355      1367191 :         };
     356      1367191 : 
     357      1367191 :         self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
     358      1367191 : 
     359      1367191 :         self.prev = Some((key, lsn, offset, flag));
     360      1519934 :     }
     361              : 
     362       239530 :     pub fn handle_range_end(&mut self, offset: u64) {
     363       239530 :         if let Some((prev_key, prev_lsn, prev_offset, prev_flag)) = self.prev {
     364       152743 :             self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
     365       152743 :         }
     366              : 
     367       239530 :         self.prev = None;
     368       239530 :     }
     369              : 
     370      1519934 :     fn add_blob(&mut self, key: Key, lsn: Lsn, start_offset: u64, end_offset: u64, flag: BlobFlag) {
     371      1519934 :         match flag {
     372       373822 :             BlobFlag::None => {
     373       373822 :                 let blobs_for_key = self.blobs.entry(key).or_default();
     374       373822 :                 blobs_for_key.push((lsn, start_offset, end_offset));
     375       373822 :             }
     376       256321 :             BlobFlag::ReplaceAll => {
     377       256321 :                 let blobs_for_key = self.blobs.entry(key).or_default();
     378       256321 :                 blobs_for_key.clear();
     379       256321 :                 blobs_for_key.push((lsn, start_offset, end_offset));
     380       256321 :             }
     381       889791 :             BlobFlag::Ignore => {}
     382              :         }
     383      1519934 :     }
     384              : 
     385       239320 :     pub fn finish(self) -> Vec<VectoredRead> {
     386       239320 :         let mut current_read_builder: Option<ChunkedVectoredReadBuilder> = None;
     387       239320 :         let mut reads = Vec::new();
     388              : 
     389       711552 :         for (key, blobs_for_key) in self.blobs {
     390       995622 :             for (lsn, start_offset, end_offset) in blobs_for_key {
     391       523390 :                 let extended = match &mut current_read_builder {
     392       379460 :                     Some(read_builder) => {
     393       379460 :                         read_builder.extend(start_offset, end_offset, BlobMeta { key, lsn })
     394              :                     }
     395       143930 :                     None => VectoredReadExtended::No,
     396              :                 };
     397              : 
     398       523390 :                 if extended == VectoredReadExtended::No {
     399       167142 :                     let next_read_builder = ChunkedVectoredReadBuilder::new(
     400       167142 :                         start_offset,
     401       167142 :                         end_offset,
     402       167142 :                         BlobMeta { key, lsn },
     403       167142 :                         self.max_read_size,
     404       167142 :                     );
     405       167142 : 
     406       167142 :                     let prev_read_builder = current_read_builder.replace(next_read_builder);
     407              : 
     408              :                     // `current_read_builder` is None in the first iteration of the outer loop
     409       167142 :                     if let Some(read_builder) = prev_read_builder {
     410        23212 :                         reads.push(read_builder.build());
     411       143930 :                     }
     412       356248 :                 }
     413              :             }
     414              :         }
     415              : 
     416       239320 :         if let Some(read_builder) = current_read_builder {
     417       143930 :             reads.push(read_builder.build());
     418       143930 :         }
     419              : 
     420       239320 :         reads
     421       239320 :     }
     422              : }
     423              : 
     424              : /// Disk reader for vectored blob spans (does not go through the page cache)
     425              : pub struct VectoredBlobReader<'a> {
     426              :     file: &'a VirtualFile,
     427              : }
     428              : 
     429              : impl<'a> VectoredBlobReader<'a> {
     430       279834 :     pub fn new(file: &'a VirtualFile) -> Self {
     431       279834 :         Self { file }
     432       279834 :     }
     433              : 
     434              :     /// Read the requested blobs into the buffer.
     435              :     ///
     436              :     /// We have to deal with the fact that blobs are not fixed size.
     437              :     /// Each blob is prefixed by a size header.
     438              :     ///
     439              :     /// The success return value is a struct which contains the buffer
     440              :     /// filled from disk and a list of offsets at which each blob lies
     441              :     /// in the buffer.
     442       211718 :     pub async fn read_blobs(
     443       211718 :         &self,
     444       211718 :         read: &VectoredRead,
     445       211718 :         buf: IoBufferMut,
     446       211718 :         ctx: &RequestContext,
     447       211718 :     ) -> Result<VectoredBlobsBuf, std::io::Error> {
     448       211718 :         assert!(read.size() > 0);
     449       211718 :         assert!(
     450       211718 :             read.size() <= buf.capacity(),
     451            0 :             "{} > {}",
     452            0 :             read.size(),
     453            0 :             buf.capacity()
     454              :         );
     455              : 
     456       211718 :         if cfg!(debug_assertions) {
     457       211718 :             const ALIGN: u64 = virtual_file::get_io_buffer_alignment() as u64;
     458       211718 :             debug_assert_eq!(
     459       211718 :                 read.start % ALIGN,
     460              :                 0,
     461            0 :                 "Read start at {} does not satisfy the required io buffer alignment ({} bytes)",
     462              :                 read.start,
     463              :                 ALIGN
     464              :             );
     465            0 :         }
     466              : 
     467       211718 :         let buf = self
     468       211718 :             .file
     469       211718 :             .read_exact_at(buf.slice(0..read.size()), read.start, ctx)
     470       107680 :             .await?
     471       211718 :             .into_inner();
     472       211718 : 
     473       211718 :         let blobs_at = read.blobs_at.as_slice();
     474       211718 : 
     475       211718 :         let start_offset = read.start;
     476       211718 : 
     477       211718 :         let mut metas = Vec::with_capacity(blobs_at.len());
     478              :         // Blobs in `read` only provide their starting offset. The end offset
     479              :         // of a blob is implicit: the start of the next blob if one exists
     480              :         // or the end of the read.
     481              : 
     482      2867012 :         for (blob_start, meta) in blobs_at {
     483      2655294 :             let blob_start_in_buf = blob_start - start_offset;
     484      2655294 :             let first_len_byte = buf[blob_start_in_buf as usize];
     485              : 
     486              :             // Each blob is prefixed by a header containing its size and compression information.
     487              :             // Extract the size and skip that header to find the start of the data.
     488              :             // The size can be 1 or 4 bytes. The most significant bit is 0 in the
     489              :             // 1 byte case and 1 in the 4 byte case.
     490      2655294 :             let (size_length, blob_size, compression_bits) = if first_len_byte < 0x80 {
     491      2617602 :                 (1, first_len_byte as u64, BYTE_UNCOMPRESSED)
     492              :             } else {
     493        37692 :                 let mut blob_size_buf = [0u8; 4];
     494        37692 :                 let offset_in_buf = blob_start_in_buf as usize;
     495        37692 : 
     496        37692 :                 blob_size_buf.copy_from_slice(&buf[offset_in_buf..offset_in_buf + 4]);
     497        37692 :                 blob_size_buf[0] &= !LEN_COMPRESSION_BIT_MASK;
     498        37692 : 
     499        37692 :                 let compression_bits = first_len_byte & LEN_COMPRESSION_BIT_MASK;
     500        37692 :                 (
     501        37692 :                     4,
     502        37692 :                     u32::from_be_bytes(blob_size_buf) as u64,
     503        37692 :                     compression_bits,
     504        37692 :                 )
     505              :             };
     506              : 
     507      2655294 :             let start = (blob_start_in_buf + size_length) as usize;
     508      2655294 :             let end = start + blob_size as usize;
     509      2655294 : 
     510      2655294 :             metas.push(VectoredBlob {
     511      2655294 :                 start,
     512      2655294 :                 end,
     513      2655294 :                 meta: *meta,
     514      2655294 :                 compression_bits,
     515      2655294 :             });
     516              :         }
     517              : 
     518       211718 :         Ok(VectoredBlobsBuf { buf, blobs: metas })
     519       211718 :     }
     520              : }
     521              : 
     522              : /// Read planner used in [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`].
     523              : ///
     524              : /// It provides a streaming API for getting read blobs. It returns a batch when
     525              : /// `handle` gets called and when the current key would just exceed the read_size and
     526              : /// max_cnt constraints.
     527              : pub struct StreamingVectoredReadPlanner {
     528              :     read_builder: Option<ChunkedVectoredReadBuilder>,
     529              :     // Arguments for previous blob passed into [`StreamingVectoredReadPlanner::handle`]
     530              :     prev: Option<(Key, Lsn, u64)>,
     531              :     /// Max read size per batch. This is not a strict limit. If there are [0, 100) and [100, 200), while the `max_read_size` is 150,
     532              :     /// we will produce a single batch instead of splitting them.
     533              :     max_read_size: u64,
     534              :     /// Max item count per batch
     535              :     max_cnt: usize,
     536              :     /// Size of the current batch
     537              :     cnt: usize,
     538              : }
     539              : 
     540              : impl StreamingVectoredReadPlanner {
     541          788 :     pub fn new(max_read_size: u64, max_cnt: usize) -> Self {
     542          788 :         assert!(max_cnt > 0);
     543          788 :         assert!(max_read_size > 0);
     544          788 :         Self {
     545          788 :             read_builder: None,
     546          788 :             prev: None,
     547          788 :             max_cnt,
     548          788 :             max_read_size,
     549          788 :             cnt: 0,
     550          788 :         }
     551          788 :     }
     552              : 
     553      2127928 :     pub fn handle(&mut self, key: Key, lsn: Lsn, offset: u64) -> Option<VectoredRead> {
     554              :         // Implementation note: internally lag behind by one blob such that
     555              :         // we have a start and end offset when initialising [`VectoredRead`]
     556      2127928 :         let (prev_key, prev_lsn, prev_offset) = match self.prev {
     557              :             None => {
     558          674 :                 self.prev = Some((key, lsn, offset));
     559          674 :                 return None;
     560              :             }
     561      2127254 :             Some(prev) => prev,
     562      2127254 :         };
     563      2127254 : 
     564      2127254 :         let res = self.add_blob(prev_key, prev_lsn, prev_offset, offset, false);
     565      2127254 : 
     566      2127254 :         self.prev = Some((key, lsn, offset));
     567      2127254 : 
     568      2127254 :         res
     569      2127928 :     }
     570              : 
     571          620 :     pub fn handle_range_end(&mut self, offset: u64) -> Option<VectoredRead> {
     572          620 :         let res = if let Some((prev_key, prev_lsn, prev_offset)) = self.prev {
     573          618 :             self.add_blob(prev_key, prev_lsn, prev_offset, offset, true)
     574              :         } else {
     575            2 :             None
     576              :         };
     577              : 
     578          620 :         self.prev = None;
     579          620 : 
     580          620 :         res
     581          620 :     }
     582              : 
     583      2127872 :     fn add_blob(
     584      2127872 :         &mut self,
     585      2127872 :         key: Key,
     586      2127872 :         lsn: Lsn,
     587      2127872 :         start_offset: u64,
     588      2127872 :         end_offset: u64,
     589      2127872 :         is_last_blob_in_read: bool,
     590      2127872 :     ) -> Option<VectoredRead> {
     591      2127872 :         match &mut self.read_builder {
     592      2087348 :             Some(read_builder) => {
     593      2087348 :                 let extended = read_builder.extend(start_offset, end_offset, BlobMeta { key, lsn });
     594      2087348 :                 assert_eq!(extended, VectoredReadExtended::Yes);
     595              :             }
     596        40524 :             None => {
     597        40524 :                 self.read_builder = {
     598        40524 :                     Some(ChunkedVectoredReadBuilder::new_streaming(
     599        40524 :                         start_offset,
     600        40524 :                         end_offset,
     601        40524 :                         BlobMeta { key, lsn },
     602        40524 :                     ))
     603        40524 :                 };
     604        40524 :             }
     605              :         }
     606      2127872 :         let read_builder = self.read_builder.as_mut().unwrap();
     607      2127872 :         self.cnt += 1;
     608      2127872 :         if is_last_blob_in_read
     609      2127254 :             || read_builder.size() >= self.max_read_size as usize
     610      2098410 :             || self.cnt >= self.max_cnt
     611              :         {
     612        40524 :             let prev_read_builder = self.read_builder.take();
     613        40524 :             self.cnt = 0;
     614              : 
     615              :             // `current_read_builder` is None in the first iteration
     616        40524 :             if let Some(read_builder) = prev_read_builder {
     617        40524 :                 return Some(read_builder.build());
     618            0 :             }
     619      2087348 :         }
     620      2087348 :         None
     621      2127872 :     }
     622              : }
     623              : 
     624              : #[cfg(test)]
     625              : mod tests {
     626              :     use anyhow::Error;
     627              : 
     628              :     use crate::context::DownloadBehavior;
     629              :     use crate::page_cache::PAGE_SZ;
     630              :     use crate::task_mgr::TaskKind;
     631              : 
     632              :     use super::super::blob_io::tests::{random_array, write_maybe_compressed};
     633              :     use super::*;
     634              : 
     635           48 :     fn validate_read(read: &VectoredRead, offset_range: &[(Key, Lsn, u64, BlobFlag)]) {
     636              :         const ALIGN: u64 = virtual_file::get_io_buffer_alignment() as u64;
     637           48 :         assert_eq!(read.start % ALIGN, 0);
     638           48 :         assert_eq!(read.start / ALIGN, offset_range.first().unwrap().2 / ALIGN);
     639              : 
     640           84 :         let expected_offsets_in_read: Vec<_> = offset_range.iter().map(|o| o.2).collect();
     641           48 : 
     642           48 :         let offsets_in_read: Vec<_> = read
     643           48 :             .blobs_at
     644           48 :             .as_slice()
     645           48 :             .iter()
     646           84 :             .map(|(offset, _)| *offset)
     647           48 :             .collect();
     648           48 : 
     649           48 :         assert_eq!(expected_offsets_in_read, offsets_in_read);
     650           48 :     }
     651              : 
     652              :     #[test]
     653            2 :     fn planner_chunked_coalesce_all_test() {
     654              :         use crate::virtual_file;
     655              : 
     656              :         const CHUNK_SIZE: u64 = virtual_file::get_io_buffer_alignment() as u64;
     657              : 
     658            2 :         let max_read_size = CHUNK_SIZE as usize * 8;
     659            2 :         let key = Key::MIN;
     660            2 :         let lsn = Lsn(0);
     661            2 : 
     662            2 :         let blob_descriptions = [
     663            2 :             (key, lsn, CHUNK_SIZE / 8, BlobFlag::None), // Read 1 BEGIN
     664            2 :             (key, lsn, CHUNK_SIZE / 4, BlobFlag::Ignore), // Gap
     665            2 :             (key, lsn, CHUNK_SIZE / 2, BlobFlag::None),
     666            2 :             (key, lsn, CHUNK_SIZE - 2, BlobFlag::Ignore), // Gap
     667            2 :             (key, lsn, CHUNK_SIZE, BlobFlag::None),
     668            2 :             (key, lsn, CHUNK_SIZE * 2 - 1, BlobFlag::None),
     669            2 :             (key, lsn, CHUNK_SIZE * 2 + 1, BlobFlag::Ignore), // Gap
     670            2 :             (key, lsn, CHUNK_SIZE * 3 + 1, BlobFlag::None),
     671            2 :             (key, lsn, CHUNK_SIZE * 5 + 1, BlobFlag::None),
     672            2 :             (key, lsn, CHUNK_SIZE * 6 + 1, BlobFlag::Ignore), // skipped chunk size, but not a chunk: should coalesce.
     673            2 :             (key, lsn, CHUNK_SIZE * 7 + 1, BlobFlag::None),
     674            2 :             (key, lsn, CHUNK_SIZE * 8, BlobFlag::None), // Read 2 BEGIN (b/c max_read_size)
     675            2 :             (key, lsn, CHUNK_SIZE * 9, BlobFlag::Ignore), // ==== skipped a chunk
     676            2 :             (key, lsn, CHUNK_SIZE * 10, BlobFlag::None), // Read 3 BEGIN (cannot coalesce)
     677            2 :         ];
     678            2 : 
     679            2 :         let ranges = [
     680            2 :             &[
     681            2 :                 blob_descriptions[0],
     682            2 :                 blob_descriptions[2],
     683            2 :                 blob_descriptions[4],
     684            2 :                 blob_descriptions[5],
     685            2 :                 blob_descriptions[7],
     686            2 :                 blob_descriptions[8],
     687            2 :                 blob_descriptions[10],
     688            2 :             ],
     689            2 :             &blob_descriptions[11..12],
     690            2 :             &blob_descriptions[13..],
     691            2 :         ];
     692            2 : 
     693            2 :         let mut planner = VectoredReadPlanner::new(max_read_size);
     694           30 :         for (key, lsn, offset, flag) in blob_descriptions {
     695           28 :             planner.handle(key, lsn, offset, flag);
     696           28 :         }
     697              : 
     698            2 :         planner.handle_range_end(652 * 1024);
     699            2 : 
     700            2 :         let reads = planner.finish();
     701            2 : 
     702            2 :         assert_eq!(reads.len(), ranges.len());
     703              : 
     704            6 :         for (idx, read) in reads.iter().enumerate() {
     705            6 :             validate_read(read, ranges[idx]);
     706            6 :         }
     707            2 :     }
     708              : 
     709              :     #[test]
     710            2 :     fn planner_max_read_size_test() {
     711            2 :         let max_read_size = 128 * 1024;
     712            2 :         let key = Key::MIN;
     713            2 :         let lsn = Lsn(0);
     714            2 : 
     715            2 :         let blob_descriptions = vec![
     716            2 :             (key, lsn, 0, BlobFlag::None),
     717            2 :             (key, lsn, 32 * 1024, BlobFlag::None),
     718            2 :             (key, lsn, 96 * 1024, BlobFlag::None), // Last in read 1
     719            2 :             (key, lsn, 128 * 1024, BlobFlag::None), // Last in read 2
     720            2 :             (key, lsn, 198 * 1024, BlobFlag::None), // Last in read 3
     721            2 :             (key, lsn, 268 * 1024, BlobFlag::None), // Last in read 4
     722            2 :             (key, lsn, 396 * 1024, BlobFlag::None), // Last in read 5
     723            2 :             (key, lsn, 652 * 1024, BlobFlag::None), // Last in read 6
     724            2 :         ];
     725            2 : 
     726            2 :         let ranges = [
     727            2 :             &blob_descriptions[0..3],
     728            2 :             &blob_descriptions[3..4],
     729            2 :             &blob_descriptions[4..5],
     730            2 :             &blob_descriptions[5..6],
     731            2 :             &blob_descriptions[6..7],
     732            2 :             &blob_descriptions[7..],
     733            2 :         ];
     734            2 : 
     735            2 :         let mut planner = VectoredReadPlanner::new(max_read_size);
     736           16 :         for (key, lsn, offset, flag) in blob_descriptions.clone() {
     737           16 :             planner.handle(key, lsn, offset, flag);
     738           16 :         }
     739              : 
     740            2 :         planner.handle_range_end(652 * 1024);
     741            2 : 
     742            2 :         let reads = planner.finish();
     743            2 : 
     744            2 :         assert_eq!(reads.len(), 6);
     745              : 
     746              :         // TODO: could remove zero reads to produce 5 reads here
     747              : 
     748           12 :         for (idx, read) in reads.iter().enumerate() {
     749           12 :             validate_read(read, ranges[idx]);
     750           12 :         }
     751            2 :     }
     752              : 
     753              :     #[test]
     754            2 :     fn planner_replacement_test() {
     755              :         const CHUNK_SIZE: u64 = virtual_file::get_io_buffer_alignment() as u64;
     756            2 :         let max_read_size = 128 * CHUNK_SIZE as usize;
     757            2 :         let first_key = Key::MIN;
     758            2 :         let second_key = first_key.next();
     759            2 :         let lsn = Lsn(0);
     760            2 : 
     761            2 :         let blob_descriptions = vec![
     762            2 :             (first_key, lsn, 0, BlobFlag::None),          // First in read 1
     763            2 :             (first_key, lsn, CHUNK_SIZE, BlobFlag::None), // Last in read 1
     764            2 :             (second_key, lsn, 2 * CHUNK_SIZE, BlobFlag::ReplaceAll),
     765            2 :             (second_key, lsn, 3 * CHUNK_SIZE, BlobFlag::None),
     766            2 :             (second_key, lsn, 4 * CHUNK_SIZE, BlobFlag::ReplaceAll), // First in read 2
     767            2 :             (second_key, lsn, 5 * CHUNK_SIZE, BlobFlag::None),       // Last in read 2
     768            2 :         ];
     769            2 : 
     770            2 :         let ranges = [&blob_descriptions[0..2], &blob_descriptions[4..]];
     771            2 : 
     772            2 :         let mut planner = VectoredReadPlanner::new(max_read_size);
     773           12 :         for (key, lsn, offset, flag) in blob_descriptions.clone() {
     774           12 :             planner.handle(key, lsn, offset, flag);
     775           12 :         }
     776              : 
     777            2 :         planner.handle_range_end(6 * CHUNK_SIZE);
     778            2 : 
     779            2 :         let reads = planner.finish();
     780            2 :         assert_eq!(reads.len(), 2);
     781              : 
     782            4 :         for (idx, read) in reads.iter().enumerate() {
     783            4 :             validate_read(read, ranges[idx]);
     784            4 :         }
     785            2 :     }
     786              : 
     787              :     #[test]
     788            2 :     fn streaming_planner_max_read_size_test() {
     789            2 :         let max_read_size = 128 * 1024;
     790            2 :         let key = Key::MIN;
     791            2 :         let lsn = Lsn(0);
     792            2 : 
     793            2 :         let blob_descriptions = vec![
     794            2 :             (key, lsn, 0, BlobFlag::None),
     795            2 :             (key, lsn, 32 * 1024, BlobFlag::None),
     796            2 :             (key, lsn, 96 * 1024, BlobFlag::None),
     797            2 :             (key, lsn, 128 * 1024, BlobFlag::None),
     798            2 :             (key, lsn, 198 * 1024, BlobFlag::None),
     799            2 :             (key, lsn, 268 * 1024, BlobFlag::None),
     800            2 :             (key, lsn, 396 * 1024, BlobFlag::None),
     801            2 :             (key, lsn, 652 * 1024, BlobFlag::None),
     802            2 :         ];
     803            2 : 
     804            2 :         let ranges = [
     805            2 :             &blob_descriptions[0..3],
     806            2 :             &blob_descriptions[3..5],
     807            2 :             &blob_descriptions[5..6],
     808            2 :             &blob_descriptions[6..7],
     809            2 :             &blob_descriptions[7..],
     810            2 :         ];
     811            2 : 
     812            2 :         let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1000);
     813            2 :         let mut reads = Vec::new();
     814           16 :         for (key, lsn, offset, _) in blob_descriptions.clone() {
     815           16 :             reads.extend(planner.handle(key, lsn, offset));
     816           16 :         }
     817            2 :         reads.extend(planner.handle_range_end(652 * 1024));
     818            2 : 
     819            2 :         assert_eq!(reads.len(), ranges.len());
     820              : 
     821           10 :         for (idx, read) in reads.iter().enumerate() {
     822           10 :             validate_read(read, ranges[idx]);
     823           10 :         }
     824            2 :     }
     825              : 
     826              :     #[test]
     827            2 :     fn streaming_planner_max_cnt_test() {
     828            2 :         let max_read_size = 1024 * 1024;
     829            2 :         let key = Key::MIN;
     830            2 :         let lsn = Lsn(0);
     831            2 : 
     832            2 :         let blob_descriptions = vec![
     833            2 :             (key, lsn, 0, BlobFlag::None),
     834            2 :             (key, lsn, 32 * 1024, BlobFlag::None),
     835            2 :             (key, lsn, 96 * 1024, BlobFlag::None),
     836            2 :             (key, lsn, 128 * 1024, BlobFlag::None),
     837            2 :             (key, lsn, 198 * 1024, BlobFlag::None),
     838            2 :             (key, lsn, 268 * 1024, BlobFlag::None),
     839            2 :             (key, lsn, 396 * 1024, BlobFlag::None),
     840            2 :             (key, lsn, 652 * 1024, BlobFlag::None),
     841            2 :         ];
     842            2 : 
     843            2 :         let ranges = [
     844            2 :             &blob_descriptions[0..2],
     845            2 :             &blob_descriptions[2..4],
     846            2 :             &blob_descriptions[4..6],
     847            2 :             &blob_descriptions[6..],
     848            2 :         ];
     849            2 : 
     850            2 :         let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 2);
     851            2 :         let mut reads = Vec::new();
     852           16 :         for (key, lsn, offset, _) in blob_descriptions.clone() {
     853           16 :             reads.extend(planner.handle(key, lsn, offset));
     854           16 :         }
     855            2 :         reads.extend(planner.handle_range_end(652 * 1024));
     856            2 : 
     857            2 :         assert_eq!(reads.len(), ranges.len());
     858              : 
     859            8 :         for (idx, read) in reads.iter().enumerate() {
     860            8 :             validate_read(read, ranges[idx]);
     861            8 :         }
     862            2 :     }
     863              : 
     864              :     #[test]
     865            2 :     fn streaming_planner_edge_test() {
     866            2 :         let max_read_size = 1024 * 1024;
     867            2 :         let key = Key::MIN;
     868            2 :         let lsn = Lsn(0);
     869            2 :         {
     870            2 :             let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
     871            2 :             let mut reads = Vec::new();
     872            2 :             reads.extend(planner.handle_range_end(652 * 1024));
     873            2 :             assert!(reads.is_empty());
     874              :         }
     875              :         {
     876            2 :             let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
     877            2 :             let mut reads = Vec::new();
     878            2 :             reads.extend(planner.handle(key, lsn, 0));
     879            2 :             reads.extend(planner.handle_range_end(652 * 1024));
     880            2 :             assert_eq!(reads.len(), 1);
     881            2 :             validate_read(&reads[0], &[(key, lsn, 0, BlobFlag::None)]);
     882            2 :         }
     883            2 :         {
     884            2 :             let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
     885            2 :             let mut reads = Vec::new();
     886            2 :             reads.extend(planner.handle(key, lsn, 0));
     887            2 :             reads.extend(planner.handle(key, lsn, 128 * 1024));
     888            2 :             reads.extend(planner.handle_range_end(652 * 1024));
     889            2 :             assert_eq!(reads.len(), 2);
     890            2 :             validate_read(&reads[0], &[(key, lsn, 0, BlobFlag::None)]);
     891            2 :             validate_read(&reads[1], &[(key, lsn, 128 * 1024, BlobFlag::None)]);
     892            2 :         }
     893            2 :         {
     894            2 :             let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 2);
     895            2 :             let mut reads = Vec::new();
     896            2 :             reads.extend(planner.handle(key, lsn, 0));
     897            2 :             reads.extend(planner.handle(key, lsn, 128 * 1024));
     898            2 :             reads.extend(planner.handle_range_end(652 * 1024));
     899            2 :             assert_eq!(reads.len(), 1);
     900            2 :             validate_read(
     901            2 :                 &reads[0],
     902            2 :                 &[
     903            2 :                     (key, lsn, 0, BlobFlag::None),
     904            2 :                     (key, lsn, 128 * 1024, BlobFlag::None),
     905            2 :                 ],
     906            2 :             );
     907            2 :         }
     908            2 :     }
     909              : 
     910            8 :     async fn round_trip_test_compressed(blobs: &[Vec<u8>], compression: bool) -> Result<(), Error> {
     911            8 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
     912            8 :         let (_temp_dir, pathbuf, offsets) =
     913          279 :             write_maybe_compressed::<true>(blobs, compression, &ctx).await?;
     914              : 
     915            8 :         let file = VirtualFile::open(&pathbuf, &ctx).await?;
     916            8 :         let file_len = std::fs::metadata(&pathbuf)?.len();
     917            8 : 
     918            8 :         // Multiply by two (compressed data might need more space), and add a few bytes for the header
     919         4120 :         let reserved_bytes = blobs.iter().map(|bl| bl.len()).max().unwrap() * 2 + 16;
     920            8 :         let mut buf = IoBufferMut::with_capacity(reserved_bytes);
     921            8 : 
     922            8 :         let vectored_blob_reader = VectoredBlobReader::new(&file);
     923            8 :         let meta = BlobMeta {
     924            8 :             key: Key::MIN,
     925            8 :             lsn: Lsn(0),
     926            8 :         };
     927              : 
     928         4120 :         for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
     929         4120 :             let end = offsets.get(idx + 1).unwrap_or(&file_len);
     930         4120 :             if idx + 1 == offsets.len() {
     931            8 :                 continue;
     932         4112 :             }
     933         4112 :             let read_builder = ChunkedVectoredReadBuilder::new(*offset, *end, meta, 16 * 4096);
     934         4112 :             let read = read_builder.build();
     935         4112 :             let result = vectored_blob_reader.read_blobs(&read, buf, &ctx).await?;
     936         4112 :             assert_eq!(result.blobs.len(), 1);
     937         4112 :             let read_blob = &result.blobs[0];
     938         4112 :             let view = BufView::new_slice(&result.buf);
     939         4112 :             let read_buf = read_blob.read(&view).await?;
     940         4112 :             assert_eq!(
     941         4112 :                 &blob[..],
     942         4112 :                 &read_buf[..],
     943            0 :                 "mismatch for idx={idx} at offset={offset}"
     944              :             );
     945         4112 :             buf = result.buf;
     946              :         }
     947            8 :         Ok(())
     948            8 :     }
     949              : 
     950              :     #[tokio::test]
     951            2 :     async fn test_really_big_array() -> Result<(), Error> {
     952            2 :         let blobs = &[
     953            2 :             b"test".to_vec(),
     954            2 :             random_array(10 * PAGE_SZ),
     955            2 :             b"hello".to_vec(),
     956            2 :             random_array(66 * PAGE_SZ),
     957            2 :             vec![0xf3; 24 * PAGE_SZ],
     958            2 :             b"foobar".to_vec(),
     959            2 :         ];
     960           14 :         round_trip_test_compressed(blobs, false).await?;
     961           11 :         round_trip_test_compressed(blobs, true).await?;
     962            2 :         Ok(())
     963            2 :     }
     964              : 
     965              :     #[tokio::test]
     966            2 :     async fn test_arrays_inc() -> Result<(), Error> {
     967            2 :         let blobs = (0..PAGE_SZ / 8)
     968         2048 :             .map(|v| random_array(v * 16))
     969            2 :             .collect::<Vec<_>>();
     970         1172 :         round_trip_test_compressed(&blobs, false).await?;
     971         1172 :         round_trip_test_compressed(&blobs, true).await?;
     972            2 :         Ok(())
     973            2 :     }
     974              : }
        

Generated by: LCOV version 2.1-beta