LCOV - code coverage report
Current view: top level - pageserver/src/tenant - vectored_blob_io.rs (source / functions) Coverage Total Hit
Test: 42f947419473a288706e86ecdf7c2863d760d5d7.info Lines: 98.1 % 516 506
Test Date: 2024-08-02 21:34:27 Functions: 100.0 % 33 33

            Line data    Source code
       1              : //!
       2              : //! Utilities for vectored reading of variable-sized "blobs".
       3              : //!
       4              : //! The "blob" api is an abstraction on top of the "block" api,
       5              : //! with the main difference being that blobs do not have a fixed
       6              : //! size (each blob is prefixed with 1 or 4 byte length field)
       7              : //!
       8              : //! The vectored apis provided in this module allow for planning
       9              : //! and executing disk IO which covers multiple blobs.
      10              : //!
      11              : //! Reads are planned with [`VectoredReadPlanner`] which will coalesce
      12              : //! adjacent blocks into a single disk IO request and exectuted by
      13              : //! [`VectoredBlobReader`] which does all the required offset juggling
      14              : //! and returns a buffer housing all the blobs and a list of offsets.
      15              : //!
      16              : //! Note that the vectored blob api does *not* go through the page cache.
      17              : 
      18              : use std::collections::BTreeMap;
      19              : use std::num::NonZeroUsize;
      20              : 
      21              : use bytes::BytesMut;
      22              : use pageserver_api::key::Key;
      23              : use tokio::io::AsyncWriteExt;
      24              : use tokio_epoll_uring::BoundedBuf;
      25              : use utils::lsn::Lsn;
      26              : use utils::vec_map::VecMap;
      27              : 
      28              : use crate::context::RequestContext;
      29              : use crate::tenant::blob_io::{BYTE_UNCOMPRESSED, BYTE_ZSTD, LEN_COMPRESSION_BIT_MASK};
      30              : use crate::virtual_file::VirtualFile;
      31              : 
      32              : #[derive(Copy, Clone, Debug, PartialEq, Eq)]
      33              : pub struct MaxVectoredReadBytes(pub NonZeroUsize);
      34              : 
      35              : /// Metadata bundled with the start and end offset of a blob.
      36              : #[derive(Copy, Clone, Debug)]
      37              : pub struct BlobMeta {
      38              :     pub key: Key,
      39              :     pub lsn: Lsn,
      40              : }
      41              : 
      42              : /// Blob offsets into [`VectoredBlobsBuf::buf`]
      43              : pub struct VectoredBlob {
      44              :     pub start: usize,
      45              :     pub end: usize,
      46              :     pub meta: BlobMeta,
      47              : }
      48              : 
      49              : /// Return type of [`VectoredBlobReader::read_blobs`]
      50              : pub struct VectoredBlobsBuf {
      51              :     /// Buffer for all blobs in this read
      52              :     pub buf: BytesMut,
      53              :     /// Offsets into the buffer and metadata for all blobs in this read
      54              :     pub blobs: Vec<VectoredBlob>,
      55              : }
      56              : 
      57              : /// Description of one disk read for multiple blobs.
      58              : /// Used as the argument form [`VectoredBlobReader::read_blobs`]
      59              : #[derive(Debug)]
      60              : pub struct VectoredRead {
      61              :     pub start: u64,
      62              :     pub end: u64,
      63              :     /// Starting offsets and metadata for each blob in this read
      64              :     pub blobs_at: VecMap<u64, BlobMeta>,
      65              : }
      66              : 
      67              : impl VectoredRead {
      68       966831 :     pub(crate) fn size(&self) -> usize {
      69       966831 :         (self.end - self.start) as usize
      70       966831 :     }
      71              : }
      72              : 
      73              : #[derive(Eq, PartialEq, Debug)]
      74              : pub(crate) enum VectoredReadExtended {
      75              :     Yes,
      76              :     No,
      77              : }
      78              : 
      79              : pub(crate) struct VectoredReadBuilder {
      80              :     start: u64,
      81              :     end: u64,
      82              :     blobs_at: VecMap<u64, BlobMeta>,
      83              :     max_read_size: Option<usize>,
      84              : }
      85              : 
      86              : impl VectoredReadBuilder {
      87              :     /// Start building a new vectored read.
      88              :     ///
      89              :     /// Note that by design, this does not check against reading more than `max_read_size` to
      90              :     /// support reading larger blobs than the configuration value. The builder will be single use
      91              :     /// however after that.
      92       172056 :     pub(crate) fn new(
      93       172056 :         start_offset: u64,
      94       172056 :         end_offset: u64,
      95       172056 :         meta: BlobMeta,
      96       172056 :         max_read_size: usize,
      97       172056 :     ) -> Self {
      98       172056 :         let mut blobs_at = VecMap::default();
      99       172056 :         blobs_at
     100       172056 :             .append(start_offset, meta)
     101       172056 :             .expect("First insertion always succeeds");
     102       172056 : 
     103       172056 :         Self {
     104       172056 :             start: start_offset,
     105       172056 :             end: end_offset,
     106       172056 :             blobs_at,
     107       172056 :             max_read_size: Some(max_read_size),
     108       172056 :         }
     109       172056 :     }
     110              :     /// Attempt to extend the current read with a new blob if the start
     111              :     /// offset matches with the current end of the vectored read
     112              :     /// and the resuting size is below the max read size
     113      2444560 :     pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
     114      2444560 :         tracing::trace!(start, end, "trying to extend");
     115      2444560 :         let size = (end - start) as usize;
     116      2444560 :         if self.end == start && {
     117      2425641 :             if let Some(max_read_size) = self.max_read_size {
     118       338773 :                 self.size() + size <= max_read_size
     119              :             } else {
     120      2086868 :                 true
     121              :             }
     122              :         } {
     123      2406275 :             self.end = end;
     124      2406275 :             self.blobs_at
     125      2406275 :                 .append(start, meta)
     126      2406275 :                 .expect("LSNs are ordered within vectored reads");
     127      2406275 : 
     128      2406275 :             return VectoredReadExtended::Yes;
     129        38285 :         }
     130        38285 : 
     131        38285 :         VectoredReadExtended::No
     132      2444560 :     }
     133              : 
     134      2465289 :     pub(crate) fn size(&self) -> usize {
     135      2465289 :         (self.end - self.start) as usize
     136      2465289 :     }
     137              : 
     138       212270 :     pub(crate) fn build(self) -> VectoredRead {
     139       212270 :         VectoredRead {
     140       212270 :             start: self.start,
     141       212270 :             end: self.end,
     142       212270 :             blobs_at: self.blobs_at,
     143       212270 :         }
     144       212270 :     }
     145              : }
     146              : 
     147              : #[derive(Copy, Clone, Debug)]
     148              : pub enum BlobFlag {
     149              :     None,
     150              :     Ignore,
     151              :     ReplaceAll,
     152              : }
     153              : 
     154              : /// Planner for vectored blob reads.
     155              : ///
     156              : /// Blob offsets are received via [`VectoredReadPlanner::handle`]
     157              : /// and coalesced into disk reads.
     158              : ///
     159              : /// The implementation is very simple:
     160              : /// * Collect all blob offsets in an ordered structure
     161              : /// * Iterate over the collected blobs and coalesce them into reads at the end
     162              : pub struct VectoredReadPlanner {
     163              :     // Track all the blob offsets. Start offsets must be ordered.
     164              :     blobs: BTreeMap<Key, Vec<(Lsn, u64, u64)>>,
     165              :     // Arguments for previous blob passed into [`VectoredReadPlanner::handle`]
     166              :     prev: Option<(Key, Lsn, u64, BlobFlag)>,
     167              : 
     168              :     max_read_size: usize,
     169              : }
     170              : 
     171              : impl VectoredReadPlanner {
     172       212119 :     pub fn new(max_read_size: usize) -> Self {
     173       212119 :         Self {
     174       212119 :             blobs: BTreeMap::new(),
     175       212119 :             prev: None,
     176       212119 :             max_read_size,
     177       212119 :         }
     178       212119 :     }
     179              : 
     180              :     /// Include a new blob in the read plan.
     181              :     ///
     182              :     /// This function is called from a B-Tree index visitor (see `DeltaLayerInner::plan_reads`
     183              :     /// and `ImageLayerInner::plan_reads`). Said visitor wants to collect blob offsets for all
     184              :     /// keys in a given keyspace. This function must be called for each key in the desired
     185              :     /// keyspace (monotonically continuous). [`Self::handle_range_end`] must
     186              :     /// be called after every range in the offset.
     187              :     ///
     188              :     /// In the event that keys are skipped, the behaviour is undefined and can lead to an
     189              :     /// incorrect read plan. We can end up asserting, erroring in wal redo or returning
     190              :     /// incorrect data to the user.
     191              :     ///
     192              :     /// The `flag` argument has two interesting values:
     193              :     /// * [`BlobFlag::ReplaceAll`]: The blob for this key should replace all existing blobs.
     194              :     ///   This is used for WAL records that `will_init`.
     195              :     /// * [`BlobFlag::Ignore`]: This blob should not be included in the read. This happens
     196              :     ///   if the blob is cached.
     197      1411758 :     pub fn handle(&mut self, key: Key, lsn: Lsn, offset: u64, flag: BlobFlag) {
     198              :         // Implementation note: internally lag behind by one blob such that
     199              :         // we have a start and end offset when initialising [`VectoredRead`]
     200      1411758 :         let (prev_key, prev_lsn, prev_offset, prev_flag) = match self.prev {
     201              :             None => {
     202       136502 :                 self.prev = Some((key, lsn, offset, flag));
     203       136502 :                 return;
     204              :             }
     205      1275256 :             Some(prev) => prev,
     206      1275256 :         };
     207      1275256 : 
     208      1275256 :         self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
     209      1275256 : 
     210      1275256 :         self.prev = Some((key, lsn, offset, flag));
     211      1411758 :     }
     212              : 
     213       276129 :     pub fn handle_range_end(&mut self, offset: u64) {
     214       276129 :         if let Some((prev_key, prev_lsn, prev_offset, prev_flag)) = self.prev {
     215       136502 :             self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
     216       139627 :         }
     217              : 
     218       276129 :         self.prev = None;
     219       276129 :     }
     220              : 
     221      1411758 :     fn add_blob(&mut self, key: Key, lsn: Lsn, start_offset: u64, end_offset: u64, flag: BlobFlag) {
     222      1411758 :         match flag {
     223       337709 :             BlobFlag::None => {
     224       337709 :                 let blobs_for_key = self.blobs.entry(key).or_default();
     225       337709 :                 blobs_for_key.push((lsn, start_offset, end_offset));
     226       337709 :             }
     227       256520 :             BlobFlag::ReplaceAll => {
     228       256520 :                 let blobs_for_key = self.blobs.entry(key).or_default();
     229       256520 :                 blobs_for_key.clear();
     230       256520 :                 blobs_for_key.push((lsn, start_offset, end_offset));
     231       256520 :             }
     232       817529 :             BlobFlag::Ignore => {}
     233              :         }
     234      1411758 :     }
     235              : 
     236       212119 :     pub fn finish(self) -> Vec<VectoredRead> {
     237       212119 :         let mut current_read_builder: Option<VectoredReadBuilder> = None;
     238       212119 :         let mut reads = Vec::new();
     239              : 
     240       648340 :         for (key, blobs_for_key) in self.blobs {
     241       923540 :             for (lsn, start_offset, end_offset) in blobs_for_key {
     242       487319 :                 let extended = match &mut current_read_builder {
     243       357670 :                     Some(read_builder) => {
     244       357670 :                         read_builder.extend(start_offset, end_offset, BlobMeta { key, lsn })
     245              :                     }
     246       129649 :                     None => VectoredReadExtended::No,
     247              :                 };
     248              : 
     249       487319 :                 if extended == VectoredReadExtended::No {
     250       167928 :                     let next_read_builder = VectoredReadBuilder::new(
     251       167928 :                         start_offset,
     252       167928 :                         end_offset,
     253       167928 :                         BlobMeta { key, lsn },
     254       167928 :                         self.max_read_size,
     255       167928 :                     );
     256       167928 : 
     257       167928 :                     let prev_read_builder = current_read_builder.replace(next_read_builder);
     258              : 
     259              :                     // `current_read_builder` is None in the first iteration of the outer loop
     260       167928 :                     if let Some(read_builder) = prev_read_builder {
     261        38279 :                         reads.push(read_builder.build());
     262       129649 :                     }
     263       319391 :                 }
     264              :             }
     265              :         }
     266              : 
     267       212119 :         if let Some(read_builder) = current_read_builder {
     268       129649 :             reads.push(read_builder.build());
     269       129649 :         }
     270              : 
     271       212119 :         reads
     272       212119 :     }
     273              : }
     274              : 
     275              : /// Disk reader for vectored blob spans (does not go through the page cache)
     276              : pub struct VectoredBlobReader<'a> {
     277              :     file: &'a VirtualFile,
     278              : }
     279              : 
     280              : impl<'a> VectoredBlobReader<'a> {
     281       252325 :     pub fn new(file: &'a VirtualFile) -> Self {
     282       252325 :         Self { file }
     283       252325 :     }
     284              : 
     285              :     /// Read the requested blobs into the buffer.
     286              :     ///
     287              :     /// We have to deal with the fact that blobs are not fixed size.
     288              :     /// Each blob is prefixed by a size header.
     289              :     ///
     290              :     /// The success return value is a struct which contains the buffer
     291              :     /// filled from disk and a list of offsets at which each blob lies
     292              :     /// in the buffer.
     293       212222 :     pub async fn read_blobs(
     294       212222 :         &self,
     295       212222 :         read: &VectoredRead,
     296       212222 :         buf: BytesMut,
     297       212222 :         ctx: &RequestContext,
     298       212222 :     ) -> Result<VectoredBlobsBuf, std::io::Error> {
     299       212222 :         assert!(read.size() > 0);
     300       212222 :         assert!(
     301       212222 :             read.size() <= buf.capacity(),
     302            0 :             "{} > {}",
     303            0 :             read.size(),
     304            0 :             buf.capacity()
     305              :         );
     306       212222 :         let mut buf = self
     307       212222 :             .file
     308       212222 :             .read_exact_at(buf.slice(0..read.size()), read.start, ctx)
     309       107886 :             .await?
     310       212222 :             .into_inner();
     311       212222 : 
     312       212222 :         let blobs_at = read.blobs_at.as_slice();
     313       212222 :         let start_offset = blobs_at.first().expect("VectoredRead is never empty").0;
     314       212222 : 
     315       212222 :         let mut metas = Vec::with_capacity(blobs_at.len());
     316       212222 : 
     317       212222 :         // Blobs in `read` only provide their starting offset. The end offset
     318       212222 :         // of a blob is implicit: the start of the next blob if one exists
     319       212222 :         // or the end of the read.
     320       212222 :         let pairs = blobs_at.iter().zip(
     321       212222 :             blobs_at
     322       212222 :                 .iter()
     323       212222 :                 .map(Some)
     324       212222 :                 .skip(1)
     325       212222 :                 .chain(std::iter::once(None)),
     326       212222 :         );
     327       212222 : 
     328       212222 :         // Some scratch space, put here for reusing the allocation
     329       212222 :         let mut decompressed_vec = Vec::new();
     330              : 
     331      2830673 :         for ((offset, meta), next) in pairs {
     332      2618451 :             let offset_in_buf = offset - start_offset;
     333      2618451 :             let first_len_byte = buf[offset_in_buf as usize];
     334              : 
     335              :             // Each blob is prefixed by a header containing its size and compression information.
     336              :             // Extract the size and skip that header to find the start of the data.
     337              :             // The size can be 1 or 4 bytes. The most significant bit is 0 in the
     338              :             // 1 byte case and 1 in the 4 byte case.
     339      2618451 :             let (size_length, blob_size, compression_bits) = if first_len_byte < 0x80 {
     340      2580759 :                 (1, first_len_byte as u64, BYTE_UNCOMPRESSED)
     341              :             } else {
     342        37692 :                 let mut blob_size_buf = [0u8; 4];
     343        37692 :                 let offset_in_buf = offset_in_buf as usize;
     344        37692 : 
     345        37692 :                 blob_size_buf.copy_from_slice(&buf[offset_in_buf..offset_in_buf + 4]);
     346        37692 :                 blob_size_buf[0] &= !LEN_COMPRESSION_BIT_MASK;
     347        37692 : 
     348        37692 :                 let compression_bits = first_len_byte & LEN_COMPRESSION_BIT_MASK;
     349        37692 :                 (
     350        37692 :                     4,
     351        37692 :                     u32::from_be_bytes(blob_size_buf) as u64,
     352        37692 :                     compression_bits,
     353        37692 :                 )
     354              :             };
     355              : 
     356      2618451 :             let start_raw = offset_in_buf + size_length;
     357      2618451 :             let end_raw = match next {
     358      2406229 :                 Some((next_blob_start_offset, _)) => next_blob_start_offset - start_offset,
     359       212222 :                 None => start_raw + blob_size,
     360              :             };
     361      2618451 :             assert_eq!(end_raw - start_raw, blob_size);
     362              :             let (start, end);
     363      2618451 :             if compression_bits == BYTE_UNCOMPRESSED {
     364      2618449 :                 start = start_raw as usize;
     365      2618449 :                 end = end_raw as usize;
     366      2618449 :             } else if compression_bits == BYTE_ZSTD {
     367            2 :                 let mut decoder =
     368            2 :                     async_compression::tokio::write::ZstdDecoder::new(&mut decompressed_vec);
     369            2 :                 decoder
     370            2 :                     .write_all(&buf[start_raw as usize..end_raw as usize])
     371            0 :                     .await?;
     372            2 :                 decoder.flush().await?;
     373            2 :                 start = buf.len();
     374            2 :                 buf.extend_from_slice(&decompressed_vec);
     375            2 :                 end = buf.len();
     376            2 :                 decompressed_vec.clear();
     377              :             } else {
     378            0 :                 let error = std::io::Error::new(
     379            0 :                     std::io::ErrorKind::InvalidData,
     380            0 :                     format!("invalid compression byte {compression_bits:x}"),
     381            0 :                 );
     382            0 :                 return Err(error);
     383              :             }
     384              : 
     385      2618451 :             metas.push(VectoredBlob {
     386      2618451 :                 start,
     387      2618451 :                 end,
     388      2618451 :                 meta: *meta,
     389      2618451 :             });
     390              :         }
     391              : 
     392       212222 :         Ok(VectoredBlobsBuf { buf, blobs: metas })
     393       212222 :     }
     394              : }
     395              : 
     396              : /// Read planner used in [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`]. It provides a streaming API for
     397              : /// getting read blobs. It returns a batch when `handle` gets called and when the current key would just exceed the read_size and
     398              : /// max_cnt constraints.
     399              : pub struct StreamingVectoredReadPlanner {
     400              :     read_builder: Option<VectoredReadBuilder>,
     401              :     // Arguments for previous blob passed into [`StreamingVectoredReadPlanner::handle`]
     402              :     prev: Option<(Key, Lsn, u64)>,
     403              :     /// Max read size per batch. This is not a strict limit. If there are [0, 100) and [100, 200), while the `max_read_size` is 150,
     404              :     /// we will produce a single batch instead of split them.
     405              :     max_read_size: u64,
     406              :     /// Max item count per batch
     407              :     max_cnt: usize,
     408              :     /// Size of the current batch
     409              :     cnt: usize,
     410              : }
     411              : 
     412              : impl StreamingVectoredReadPlanner {
     413          736 :     pub fn new(max_read_size: u64, max_cnt: usize) -> Self {
     414          736 :         assert!(max_cnt > 0);
     415          736 :         assert!(max_read_size > 0);
     416          736 :         Self {
     417          736 :             read_builder: None,
     418          736 :             prev: None,
     419          736 :             max_cnt,
     420          736 :             max_read_size,
     421          736 :             cnt: 0,
     422          736 :         }
     423          736 :     }
     424              : 
     425      2127138 :     pub fn handle(&mut self, key: Key, lsn: Lsn, offset: u64) -> Option<VectoredRead> {
     426              :         // Implementation note: internally lag behind by one blob such that
     427              :         // we have a start and end offset when initialising [`VectoredRead`]
     428      2127138 :         let (prev_key, prev_lsn, prev_offset) = match self.prev {
     429              :             None => {
     430          622 :                 self.prev = Some((key, lsn, offset));
     431          622 :                 return None;
     432              :             }
     433      2126516 :             Some(prev) => prev,
     434      2126516 :         };
     435      2126516 : 
     436      2126516 :         let res = self.add_blob(prev_key, prev_lsn, prev_offset, offset, false);
     437      2126516 : 
     438      2126516 :         self.prev = Some((key, lsn, offset));
     439      2126516 : 
     440      2126516 :         res
     441      2127138 :     }
     442              : 
     443          568 :     pub fn handle_range_end(&mut self, offset: u64) -> Option<VectoredRead> {
     444          568 :         let res = if let Some((prev_key, prev_lsn, prev_offset)) = self.prev {
     445          566 :             self.add_blob(prev_key, prev_lsn, prev_offset, offset, true)
     446              :         } else {
     447            2 :             None
     448              :         };
     449              : 
     450          568 :         self.prev = None;
     451          568 : 
     452          568 :         res
     453          568 :     }
     454              : 
     455      2127082 :     fn add_blob(
     456      2127082 :         &mut self,
     457      2127082 :         key: Key,
     458      2127082 :         lsn: Lsn,
     459      2127082 :         start_offset: u64,
     460      2127082 :         end_offset: u64,
     461      2127082 :         is_last_blob_in_read: bool,
     462      2127082 :     ) -> Option<VectoredRead> {
     463      2127082 :         match &mut self.read_builder {
     464      2086868 :             Some(read_builder) => {
     465      2086868 :                 let extended = read_builder.extend(start_offset, end_offset, BlobMeta { key, lsn });
     466      2086868 :                 assert_eq!(extended, VectoredReadExtended::Yes);
     467              :             }
     468        40214 :             None => {
     469        40214 :                 self.read_builder = {
     470        40214 :                     let mut blobs_at = VecMap::default();
     471        40214 :                     blobs_at
     472        40214 :                         .append(start_offset, BlobMeta { key, lsn })
     473        40214 :                         .expect("First insertion always succeeds");
     474        40214 : 
     475        40214 :                     Some(VectoredReadBuilder {
     476        40214 :                         start: start_offset,
     477        40214 :                         end: end_offset,
     478        40214 :                         blobs_at,
     479        40214 :                         max_read_size: None,
     480        40214 :                     })
     481        40214 :                 };
     482        40214 :             }
     483              :         }
     484      2127082 :         let read_builder = self.read_builder.as_mut().unwrap();
     485      2127082 :         self.cnt += 1;
     486      2127082 :         if is_last_blob_in_read
     487      2126516 :             || read_builder.size() >= self.max_read_size as usize
     488      2098452 :             || self.cnt >= self.max_cnt
     489              :         {
     490        40214 :             let prev_read_builder = self.read_builder.take();
     491        40214 :             self.cnt = 0;
     492              : 
     493              :             // `current_read_builder` is None in the first iteration
     494        40214 :             if let Some(read_builder) = prev_read_builder {
     495        40214 :                 return Some(read_builder.build());
     496            0 :             }
     497      2086868 :         }
     498      2086868 :         None
     499      2127082 :     }
     500              : }
     501              : 
     502              : #[cfg(test)]
     503              : mod tests {
     504              :     use anyhow::Error;
     505              : 
     506              :     use crate::context::DownloadBehavior;
     507              :     use crate::page_cache::PAGE_SZ;
     508              :     use crate::task_mgr::TaskKind;
     509              : 
     510              :     use super::super::blob_io::tests::{random_array, write_maybe_compressed};
     511              :     use super::*;
     512              : 
     513           42 :     fn validate_read(read: &VectoredRead, offset_range: &[(Key, Lsn, u64, BlobFlag)]) {
     514           42 :         assert_eq!(read.start, offset_range.first().unwrap().2);
     515              : 
     516           66 :         let expected_offsets_in_read: Vec<_> = offset_range.iter().map(|o| o.2).collect();
     517           42 : 
     518           42 :         let offsets_in_read: Vec<_> = read
     519           42 :             .blobs_at
     520           42 :             .as_slice()
     521           42 :             .iter()
     522           66 :             .map(|(offset, _)| *offset)
     523           42 :             .collect();
     524           42 : 
     525           42 :         assert_eq!(expected_offsets_in_read, offsets_in_read);
     526           42 :     }
     527              : 
     528              :     #[test]
     529            2 :     fn planner_max_read_size_test() {
     530            2 :         let max_read_size = 128 * 1024;
     531            2 :         let key = Key::MIN;
     532            2 :         let lsn = Lsn(0);
     533            2 : 
     534            2 :         let blob_descriptions = vec![
     535            2 :             (key, lsn, 0, BlobFlag::None),
     536            2 :             (key, lsn, 32 * 1024, BlobFlag::None),
     537            2 :             (key, lsn, 96 * 1024, BlobFlag::None), // Last in read 1
     538            2 :             (key, lsn, 128 * 1024, BlobFlag::None), // Last in read 2
     539            2 :             (key, lsn, 198 * 1024, BlobFlag::None), // Last in read 3
     540            2 :             (key, lsn, 268 * 1024, BlobFlag::None), // Last in read 4
     541            2 :             (key, lsn, 396 * 1024, BlobFlag::None), // Last in read 5
     542            2 :             (key, lsn, 652 * 1024, BlobFlag::None), // Last in read 6
     543            2 :         ];
     544            2 : 
     545            2 :         let ranges = [
     546            2 :             &blob_descriptions[0..3],
     547            2 :             &blob_descriptions[3..4],
     548            2 :             &blob_descriptions[4..5],
     549            2 :             &blob_descriptions[5..6],
     550            2 :             &blob_descriptions[6..7],
     551            2 :             &blob_descriptions[7..],
     552            2 :         ];
     553            2 : 
     554            2 :         let mut planner = VectoredReadPlanner::new(max_read_size);
     555           16 :         for (key, lsn, offset, flag) in blob_descriptions.clone() {
     556           16 :             planner.handle(key, lsn, offset, flag);
     557           16 :         }
     558              : 
     559            2 :         planner.handle_range_end(652 * 1024);
     560            2 : 
     561            2 :         let reads = planner.finish();
     562            2 : 
     563            2 :         assert_eq!(reads.len(), 6);
     564              : 
     565              :         // TODO: could remove zero reads to produce 5 reads here
     566              : 
     567           12 :         for (idx, read) in reads.iter().enumerate() {
     568           12 :             validate_read(read, ranges[idx]);
     569           12 :         }
     570            2 :     }
     571              : 
     572              :     #[test]
     573            2 :     fn planner_replacement_test() {
     574            2 :         let max_read_size = 128 * 1024;
     575            2 :         let first_key = Key::MIN;
     576            2 :         let second_key = first_key.next();
     577            2 :         let lsn = Lsn(0);
     578            2 : 
     579            2 :         let blob_descriptions = vec![
     580            2 :             (first_key, lsn, 0, BlobFlag::None),    // First in read 1
     581            2 :             (first_key, lsn, 1024, BlobFlag::None), // Last in read 1
     582            2 :             (second_key, lsn, 2 * 1024, BlobFlag::ReplaceAll),
     583            2 :             (second_key, lsn, 3 * 1024, BlobFlag::None),
     584            2 :             (second_key, lsn, 4 * 1024, BlobFlag::ReplaceAll), // First in read 2
     585            2 :             (second_key, lsn, 5 * 1024, BlobFlag::None),       // Last in read 2
     586            2 :         ];
     587            2 : 
     588            2 :         let ranges = [&blob_descriptions[0..2], &blob_descriptions[4..]];
     589            2 : 
     590            2 :         let mut planner = VectoredReadPlanner::new(max_read_size);
     591           12 :         for (key, lsn, offset, flag) in blob_descriptions.clone() {
     592           12 :             planner.handle(key, lsn, offset, flag);
     593           12 :         }
     594              : 
     595            2 :         planner.handle_range_end(6 * 1024);
     596            2 : 
     597            2 :         let reads = planner.finish();
     598            2 :         assert_eq!(reads.len(), 2);
     599              : 
     600            4 :         for (idx, read) in reads.iter().enumerate() {
     601            4 :             validate_read(read, ranges[idx]);
     602            4 :         }
     603            2 :     }
     604              : 
     605              :     #[test]
     606            2 :     fn streaming_planner_max_read_size_test() {
     607            2 :         let max_read_size = 128 * 1024;
     608            2 :         let key = Key::MIN;
     609            2 :         let lsn = Lsn(0);
     610            2 : 
     611            2 :         let blob_descriptions = vec![
     612            2 :             (key, lsn, 0, BlobFlag::None),
     613            2 :             (key, lsn, 32 * 1024, BlobFlag::None),
     614            2 :             (key, lsn, 96 * 1024, BlobFlag::None),
     615            2 :             (key, lsn, 128 * 1024, BlobFlag::None),
     616            2 :             (key, lsn, 198 * 1024, BlobFlag::None),
     617            2 :             (key, lsn, 268 * 1024, BlobFlag::None),
     618            2 :             (key, lsn, 396 * 1024, BlobFlag::None),
     619            2 :             (key, lsn, 652 * 1024, BlobFlag::None),
     620            2 :         ];
     621            2 : 
     622            2 :         let ranges = [
     623            2 :             &blob_descriptions[0..3],
     624            2 :             &blob_descriptions[3..5],
     625            2 :             &blob_descriptions[5..6],
     626            2 :             &blob_descriptions[6..7],
     627            2 :             &blob_descriptions[7..],
     628            2 :         ];
     629            2 : 
     630            2 :         let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1000);
     631            2 :         let mut reads = Vec::new();
     632           16 :         for (key, lsn, offset, _) in blob_descriptions.clone() {
     633           16 :             reads.extend(planner.handle(key, lsn, offset));
     634           16 :         }
     635            2 :         reads.extend(planner.handle_range_end(652 * 1024));
     636            2 : 
     637            2 :         assert_eq!(reads.len(), ranges.len());
     638              : 
     639           10 :         for (idx, read) in reads.iter().enumerate() {
     640           10 :             validate_read(read, ranges[idx]);
     641           10 :         }
     642            2 :     }
     643              : 
     644              :     #[test]
     645            2 :     fn streaming_planner_max_cnt_test() {
     646            2 :         let max_read_size = 1024 * 1024;
     647            2 :         let key = Key::MIN;
     648            2 :         let lsn = Lsn(0);
     649            2 : 
     650            2 :         let blob_descriptions = vec![
     651            2 :             (key, lsn, 0, BlobFlag::None),
     652            2 :             (key, lsn, 32 * 1024, BlobFlag::None),
     653            2 :             (key, lsn, 96 * 1024, BlobFlag::None),
     654            2 :             (key, lsn, 128 * 1024, BlobFlag::None),
     655            2 :             (key, lsn, 198 * 1024, BlobFlag::None),
     656            2 :             (key, lsn, 268 * 1024, BlobFlag::None),
     657            2 :             (key, lsn, 396 * 1024, BlobFlag::None),
     658            2 :             (key, lsn, 652 * 1024, BlobFlag::None),
     659            2 :         ];
     660            2 : 
     661            2 :         let ranges = [
     662            2 :             &blob_descriptions[0..2],
     663            2 :             &blob_descriptions[2..4],
     664            2 :             &blob_descriptions[4..6],
     665            2 :             &blob_descriptions[6..],
     666            2 :         ];
     667            2 : 
     668            2 :         let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 2);
     669            2 :         let mut reads = Vec::new();
     670           16 :         for (key, lsn, offset, _) in blob_descriptions.clone() {
     671           16 :             reads.extend(planner.handle(key, lsn, offset));
     672           16 :         }
     673            2 :         reads.extend(planner.handle_range_end(652 * 1024));
     674            2 : 
     675            2 :         assert_eq!(reads.len(), ranges.len());
     676              : 
     677            8 :         for (idx, read) in reads.iter().enumerate() {
     678            8 :             validate_read(read, ranges[idx]);
     679            8 :         }
     680            2 :     }
     681              : 
     682              :     #[test]
     683            2 :     fn streaming_planner_edge_test() {
     684            2 :         let max_read_size = 1024 * 1024;
     685            2 :         let key = Key::MIN;
     686            2 :         let lsn = Lsn(0);
     687            2 :         {
     688            2 :             let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
     689            2 :             let mut reads = Vec::new();
     690            2 :             reads.extend(planner.handle_range_end(652 * 1024));
     691            2 :             assert!(reads.is_empty());
     692              :         }
     693              :         {
     694            2 :             let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
     695            2 :             let mut reads = Vec::new();
     696            2 :             reads.extend(planner.handle(key, lsn, 0));
     697            2 :             reads.extend(planner.handle_range_end(652 * 1024));
     698            2 :             assert_eq!(reads.len(), 1);
     699            2 :             validate_read(&reads[0], &[(key, lsn, 0, BlobFlag::None)]);
     700            2 :         }
     701            2 :         {
     702            2 :             let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
     703            2 :             let mut reads = Vec::new();
     704            2 :             reads.extend(planner.handle(key, lsn, 0));
     705            2 :             reads.extend(planner.handle(key, lsn, 128 * 1024));
     706            2 :             reads.extend(planner.handle_range_end(652 * 1024));
     707            2 :             assert_eq!(reads.len(), 2);
     708            2 :             validate_read(&reads[0], &[(key, lsn, 0, BlobFlag::None)]);
     709            2 :             validate_read(&reads[1], &[(key, lsn, 128 * 1024, BlobFlag::None)]);
     710            2 :         }
     711            2 :         {
     712            2 :             let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 2);
     713            2 :             let mut reads = Vec::new();
     714            2 :             reads.extend(planner.handle(key, lsn, 0));
     715            2 :             reads.extend(planner.handle(key, lsn, 128 * 1024));
     716            2 :             reads.extend(planner.handle_range_end(652 * 1024));
     717            2 :             assert_eq!(reads.len(), 1);
     718            2 :             validate_read(
     719            2 :                 &reads[0],
     720            2 :                 &[
     721            2 :                     (key, lsn, 0, BlobFlag::None),
     722            2 :                     (key, lsn, 128 * 1024, BlobFlag::None),
     723            2 :                 ],
     724            2 :             );
     725            2 :         }
     726            2 :     }
     727              : 
     728            8 :     async fn round_trip_test_compressed(blobs: &[Vec<u8>], compression: bool) -> Result<(), Error> {
     729            8 :         let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
     730            8 :         let (_temp_dir, pathbuf, offsets) =
     731          279 :             write_maybe_compressed::<true>(blobs, compression, &ctx).await?;
     732              : 
     733            8 :         let file = VirtualFile::open(&pathbuf, &ctx).await?;
     734            8 :         let file_len = std::fs::metadata(&pathbuf)?.len();
     735            8 : 
     736            8 :         // Multiply by two (compressed data might need more space), and add a few bytes for the header
     737         4120 :         let reserved_bytes = blobs.iter().map(|bl| bl.len()).max().unwrap() * 2 + 16;
     738            8 :         let mut buf = BytesMut::with_capacity(reserved_bytes);
     739            8 : 
     740            8 :         let vectored_blob_reader = VectoredBlobReader::new(&file);
     741            8 :         let meta = BlobMeta {
     742            8 :             key: Key::MIN,
     743            8 :             lsn: Lsn(0),
     744            8 :         };
     745              : 
     746         4120 :         for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
     747         4120 :             let end = offsets.get(idx + 1).unwrap_or(&file_len);
     748         4120 :             if idx + 1 == offsets.len() {
     749            8 :                 continue;
     750         4112 :             }
     751         4112 :             let read_builder = VectoredReadBuilder::new(*offset, *end, meta, 16 * 4096);
     752         4112 :             let read = read_builder.build();
     753         4112 :             let result = vectored_blob_reader.read_blobs(&read, buf, &ctx).await?;
     754         4112 :             assert_eq!(result.blobs.len(), 1);
     755         4112 :             let read_blob = &result.blobs[0];
     756         4112 :             let read_buf = &result.buf[read_blob.start..read_blob.end];
     757         4112 :             assert_eq!(blob, read_buf, "mismatch for idx={idx} at offset={offset}");
     758         4112 :             buf = result.buf;
     759              :         }
     760            8 :         Ok(())
     761            8 :     }
     762              : 
     763              :     #[tokio::test]
     764            2 :     async fn test_really_big_array() -> Result<(), Error> {
     765            2 :         let blobs = &[
     766            2 :             b"test".to_vec(),
     767            2 :             random_array(10 * PAGE_SZ),
     768            2 :             b"hello".to_vec(),
     769            2 :             random_array(66 * PAGE_SZ),
     770            2 :             vec![0xf3; 24 * PAGE_SZ],
     771            2 :             b"foobar".to_vec(),
     772            2 :         ];
     773           14 :         round_trip_test_compressed(blobs, false).await?;
     774           11 :         round_trip_test_compressed(blobs, true).await?;
     775            2 :         Ok(())
     776            2 :     }
     777              : 
     778              :     #[tokio::test]
     779            2 :     async fn test_arrays_inc() -> Result<(), Error> {
     780            2 :         let blobs = (0..PAGE_SZ / 8)
     781         2048 :             .map(|v| random_array(v * 16))
     782            2 :             .collect::<Vec<_>>();
     783         1172 :         round_trip_test_compressed(&blobs, false).await?;
     784         1172 :         round_trip_test_compressed(&blobs, true).await?;
     785            2 :         Ok(())
     786            2 :     }
     787              : }
        

Generated by: LCOV version 2.1-beta