LCOV - code coverage report
Current view: top level - pageserver/src/tenant - vectored_blob_io.rs (source / functions)
Test: 12c2fc96834f59604b8ade5b9add28f1dce41ec6.info
Test Date: 2024-07-03 15:33:13
Coverage:   Lines: 99.0 % (308 of 311 hit)   Functions: 100.0 % (23 of 23 hit)

            Line data    Source code
       1              : //!
       2              : //! Utilities for vectored reading of variable-sized "blobs".
       3              : //!
       4              : //! The "blob" api is an abstraction on top of the "block" api,
       5              : //! with the main difference being that blobs do not have a fixed
       6              : //! size (each blob is prefixed with a 1 or 4 byte length field)
       7              : //!
       8              : //! The vectored apis provided in this module allow for planning
       9              : //! and executing disk IO which covers multiple blobs.
      10              : //!
      11              : //! Reads are planned with [`VectoredReadPlanner`], which coalesces
      12              : //! adjacent blocks into a single disk IO request, and executed by
      13              : //! [`VectoredBlobReader`], which does all the required offset juggling
      14              : //! and returns a buffer housing all the blobs and a list of offsets.
      15              : //!
      16              : //! Note that the vectored blob api does *not* go through the page cache.
      17              : 
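
// A minimal sketch (hypothetical helper, not part of this module) of the
// 1-or-4-byte length prefix described above, mirroring the decoding done in
// `VectoredBlobReader::read_blobs` below: the most significant bit of the
// first byte selects the format, 0 for a 1-byte length and 1 for a 4-byte
// big-endian length with the top bit masked off.
fn decode_len_prefix(buf: &[u8]) -> (usize, u64) {
    let first = buf[0];
    if first < 0x80 {
        // 1-byte header: the byte itself is the blob length (< 128).
        (1, first as u64)
    } else {
        // 4-byte header: mask off the marker bit, read as big-endian u32.
        let mut be = [0u8; 4];
        be.copy_from_slice(&buf[0..4]);
        be[0] &= 0x7f;
        (4, u32::from_be_bytes(be) as u64)
    }
}
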
      18              : use std::collections::BTreeMap;
      19              : use std::num::NonZeroUsize;
      20              : 
      21              : use bytes::BytesMut;
      22              : use pageserver_api::key::Key;
      23              : use tokio_epoll_uring::BoundedBuf;
      24              : use utils::lsn::Lsn;
      25              : use utils::vec_map::VecMap;
      26              : 
      27              : use crate::context::RequestContext;
      28              : use crate::virtual_file::VirtualFile;
      29              : 
      30              : #[derive(Copy, Clone, Debug, PartialEq, Eq)]
      31              : pub struct MaxVectoredReadBytes(pub NonZeroUsize);
      32              : 
      33              : /// Metadata bundled with the start and end offset of a blob.
      34              : #[derive(Copy, Clone, Debug)]
      35              : pub struct BlobMeta {
      36              :     pub key: Key,
      37              :     pub lsn: Lsn,
      38              : }
      39              : 
      40              : /// Blob offsets into [`VectoredBlobsBuf::buf`]
      41              : pub struct VectoredBlob {
      42              :     pub start: usize,
      43              :     pub end: usize,
      44              :     pub meta: BlobMeta,
      45              : }
      46              : 
      47              : /// Return type of [`VectoredBlobReader::read_blobs`]
      48              : pub struct VectoredBlobsBuf {
      49              :     /// Buffer for all blobs in this read
      50              :     pub buf: BytesMut,
      51              :     /// Offsets into the buffer and metadata for all blobs in this read
      52              :     pub blobs: Vec<VectoredBlob>,
      53              : }
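
// A minimal sketch (hypothetical helper, not part of this module) showing how
// a caller recovers one blob from the result: `start`/`end` index directly
// into `buf`.
fn blob_bytes(out: &VectoredBlobsBuf, idx: usize) -> &[u8] {
    let blob = &out.blobs[idx];
    &out.buf[blob.start..blob.end]
}
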
      54              : 
      55              : /// Description of one disk read for multiple blobs.
      56              : /// Used as the argument for [`VectoredBlobReader::read_blobs`]
      57              : #[derive(Debug)]
      58              : pub struct VectoredRead {
      59              :     pub start: u64,
      60              :     pub end: u64,
      61              :     /// Starting offsets and metadata for each blob in this read
      62              :     pub blobs_at: VecMap<u64, BlobMeta>,
      63              : }
      64              : 
      65              : impl VectoredRead {
      66       306976 :     pub(crate) fn size(&self) -> usize {
      67       306976 :         (self.end - self.start) as usize
      68       306976 :     }
      69              : }
      70              : 
      71              : #[derive(Eq, PartialEq)]
      72              : pub(crate) enum VectoredReadExtended {
      73              :     Yes,
      74              :     No,
      75              : }
      76              : 
      77              : pub(crate) struct VectoredReadBuilder {
      78              :     start: u64,
      79              :     end: u64,
      80              :     blobs_at: VecMap<u64, BlobMeta>,
      81              :     max_read_size: Option<usize>,
      82              : }
      83              : 
      84              : impl VectoredReadBuilder {
      85              :     /// Start building a new vectored read.
      86              :     ///
      87              :     /// Note that by design, this does not check the first blob against `max_read_size`,
      88              :     /// so blobs larger than the configured value can still be read. However, such a
      89              :     /// builder is single use: a subsequent `extend` will not fit under the limit.
      90        76681 :     pub(crate) fn new(
      91        76681 :         start_offset: u64,
      92        76681 :         end_offset: u64,
      93        76681 :         meta: BlobMeta,
      94        76681 :         max_read_size: Option<usize>,
      95        76681 :     ) -> Self {
      96        76681 :         let mut blobs_at = VecMap::default();
      97        76681 :         blobs_at
      98        76681 :             .append(start_offset, meta)
      99        76681 :             .expect("First insertion always succeeds");
     100        76681 : 
     101        76681 :         Self {
     102        76681 :             start: start_offset,
     103        76681 :             end: end_offset,
     104        76681 :             blobs_at,
     105        76681 :             max_read_size,
     106        76681 :         }
     107        76681 :     }
     108              : 
     109              :     /// Attempt to extend the current read with a new blob if the start
     110              :     /// offset matches the current end of the vectored read
     111              :     /// and the resulting size does not exceed the max read size
     112       376262 :     pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
     113       376262 :         tracing::trace!(start, end, "trying to extend");
     114       376262 :         let size = (end - start) as usize;
     115       376262 :         if self.end == start && {
     116       357247 :             if let Some(max_read_size) = self.max_read_size {
     117       338591 :                 self.size() + size <= max_read_size
     118              :             } else {
     119        18656 :                 true
     120              :             }
     121              :         } {
     122       337881 :             self.end = end;
     123       337881 :             self.blobs_at
     124       337881 :                 .append(start, meta)
     125       337881 :                 .expect("LSNs are ordered within vectored reads");
     126       337881 : 
     127       337881 :             return VectoredReadExtended::Yes;
     128        38381 :         }
     129        38381 : 
     130        38381 :         VectoredReadExtended::No
     131       376262 :     }
     132              : 
     133       338591 :     pub(crate) fn size(&self) -> usize {
     134       338591 :         (self.end - self.start) as usize
     135       338591 :     }
     136              : 
     137        76681 :     pub(crate) fn build(self) -> VectoredRead {
     138        76681 :         VectoredRead {
     139        76681 :             start: self.start,
     140        76681 :             end: self.end,
     141        76681 :             blobs_at: self.blobs_at,
     142        76681 :         }
     143        76681 :     }
     144              : }
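
// A usage sketch (hypothetical, not part of this module): a blob starting
// exactly at the current end is coalesced, while one leaving a gap is
// rejected and would start a new builder, as `VectoredReadPlanner::finish`
// does below.
fn builder_sketch() {
    let meta = BlobMeta { key: Key::MIN, lsn: Lsn(0) };
    let mut builder = VectoredReadBuilder::new(0, 1024, meta, Some(128 * 1024));
    // Starts at the current end (1024): extended.
    assert!(builder.extend(1024, 2048, meta) == VectoredReadExtended::Yes);
    // Leaves a gap (4096 != 2048): not extended.
    assert!(builder.extend(4096, 8192, meta) == VectoredReadExtended::No);
    assert_eq!(builder.build().size(), 2048);
}
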
     145              : 
     146              : #[derive(Copy, Clone, Debug)]
     147              : pub enum BlobFlag {
     148              :     None,
     149              :     Ignore,
     150              :     ReplaceAll,
     151              : }
     152              : 
     153              : /// Planner for vectored blob reads.
     154              : ///
     155              : /// Blob offsets are received via [`VectoredReadPlanner::handle`]
     156              : /// and coalesced into disk reads.
     157              : ///
     158              : /// The implementation is very simple:
     159              : /// * Collect all blob offsets in an ordered structure
     160              : /// * Iterate over the collected blobs and coalesce them into reads at the end
     161              : pub struct VectoredReadPlanner {
     162              :     // Track all the blob offsets. Start offsets must be ordered.
     163              :     blobs: BTreeMap<Key, Vec<(Lsn, u64, u64)>>,
     164              :     // Arguments for previous blob passed into [`VectoredReadPlanner::handle`]
     165              :     prev: Option<(Key, Lsn, u64, BlobFlag)>,
     166              : 
     167              :     max_read_size: Option<usize>,
     168              : }
     169              : 
     170              : impl VectoredReadPlanner {
     171          442 :     pub fn new(max_read_size: usize) -> Self {
     172          442 :         Self {
     173          442 :             blobs: BTreeMap::new(),
     174          442 :             prev: None,
     175          442 :             max_read_size: Some(max_read_size),
     176          442 :         }
     177          442 :     }
     178              : 
     179              :     /// This function should *only* be used if the caller has a way to control the limit. For example,
     180              :     /// [`StreamingVectoredReadPlanner`] uses the vectored read planner to avoid duplicating the logic for
     181              :     /// handling blob starts and ends, while expecting it to produce a single read for a contiguous range of
     182              :     /// bytes in the image layer. It therefore controls the read size itself and does not need the code path that splits reads into chunks of `max_read_size`.
     183              :     #[cfg(test)]
     184        38108 :     pub(crate) fn new_caller_controlled_max_limit() -> Self {
     185        38108 :         Self {
     186        38108 :             blobs: BTreeMap::new(),
     187        38108 :             prev: None,
     188        38108 :             max_read_size: None,
     189        38108 :         }
     190        38108 :     }
     191              : 
     192              :     /// Include a new blob in the read plan.
     193              :     ///
     194              :     /// This function is called from a B-Tree index visitor (see `DeltaLayerInner::plan_reads`
     195              :     /// and `ImageLayerInner::plan_reads`). Said visitor wants to collect blob offsets for all
     196              :     /// keys in a given keyspace. This function must be called for each key in the desired
     197              :     /// keyspace (visited in monotonically increasing order). [`Self::handle_range_end`] must
     198              :     /// be called after every range, with the end offset of that range.
     199              :     ///
     200              :     /// In the event that keys are skipped, the behaviour is undefined and can lead to an
     201              :     /// incorrect read plan. We can end up asserting, erroring in WAL redo, or returning
     202              :     /// incorrect data to the user.
     203              :     ///
     204              :     /// The `flag` argument has two interesting values:
     205              :     /// * [`BlobFlag::ReplaceAll`]: The blob for this key should replace all existing blobs.
     206              :     /// This is used for WAL records that `will_init`.
     207              :     /// * [`BlobFlag::Ignore`]: This blob should not be included in the read. This happens
     208              :     /// if the blob is cached.
     209      1263040 :     pub fn handle(&mut self, key: Key, lsn: Lsn, offset: u64, flag: BlobFlag) {
     210              :         // Implementation note: internally we lag behind by one blob so that
     211              :         // we have both a start and an end offset when initialising [`VectoredRead`]
     212      1263040 :         let (prev_key, prev_lsn, prev_offset, prev_flag) = match self.prev {
     213              :             None => {
     214        45262 :                 self.prev = Some((key, lsn, offset, flag));
     215        45262 :                 return;
     216              :             }
     217      1217778 :             Some(prev) => prev,
     218      1217778 :         };
     219      1217778 : 
     220      1217778 :         self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
     221      1217778 : 
     222      1217778 :         self.prev = Some((key, lsn, offset, flag));
     223      1263040 :     }
     224              : 
     225       102173 :     pub fn handle_range_end(&mut self, offset: u64) {
     226       102173 :         if let Some((prev_key, prev_lsn, prev_offset, prev_flag)) = self.prev {
     227        45206 :             self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
     228        56967 :         }
     229              : 
     230       102173 :         self.prev = None;
     231       102173 :     }
     232              : 
     233      1262984 :     fn add_blob(&mut self, key: Key, lsn: Lsn, start_offset: u64, end_offset: u64, flag: BlobFlag) {
     234      1262984 :         match flag {
     235       386446 :             BlobFlag::None => {
     236       386446 :                 let blobs_for_key = self.blobs.entry(key).or_default();
     237       386446 :                 blobs_for_key.push((lsn, start_offset, end_offset));
     238       386446 :             }
     239        58864 :             BlobFlag::ReplaceAll => {
     240        58864 :                 let blobs_for_key = self.blobs.entry(key).or_default();
     241        58864 :                 blobs_for_key.clear();
     242        58864 :                 blobs_for_key.push((lsn, start_offset, end_offset));
     243        58864 :             }
     244       817674 :             BlobFlag::Ignore => {}
     245              :         }
     246      1262984 :     }
     247              : 
     248        38326 :     pub fn finish(self) -> Vec<VectoredRead> {
     249        38326 :         let mut current_read_builder: Option<VectoredReadBuilder> = None;
     250        38326 :         let mut reads = Vec::new();
     251              : 
     252       393240 :         for (key, blobs_for_key) in self.blobs {
     253       769444 :             for (lsn, start_offset, end_offset) in blobs_for_key {
     254       414530 :                 let extended = match &mut current_read_builder {
     255       376240 :                     Some(read_builder) => {
     256       376240 :                         read_builder.extend(start_offset, end_offset, BlobMeta { key, lsn })
     257              :                     }
     258        38290 :                     None => VectoredReadExtended::No,
     259              :                 };
     260              : 
     261       414530 :                 if extended == VectoredReadExtended::No {
     262        76665 :                     let next_read_builder = VectoredReadBuilder::new(
     263        76665 :                         start_offset,
     264        76665 :                         end_offset,
     265        76665 :                         BlobMeta { key, lsn },
     266        76665 :                         self.max_read_size,
     267        76665 :                     );
     268        76665 : 
     269        76665 :                     let prev_read_builder = current_read_builder.replace(next_read_builder);
     270              : 
     271              :                     // `current_read_builder` is None in the first iteration of the outer loop
     272        76665 :                     if let Some(read_builder) = prev_read_builder {
     273        38375 :                         reads.push(read_builder.build());
     274        38375 :                     }
     275       337865 :                 }
     276              :             }
     277              :         }
     278              : 
     279        38326 :         if let Some(read_builder) = current_read_builder {
     280        38290 :             reads.push(read_builder.build());
     281        38290 :         }
     282              : 
     283        38326 :         reads
     284        38326 :     }
     285              : }
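
// An end-to-end sketch (hypothetical, offsets made up) of the planner flow:
// feed ordered blob start offsets via `handle`, close the range with
// `handle_range_end`, then let `finish` emit the coalesced reads.
fn planner_sketch() -> Vec<VectoredRead> {
    let mut planner = VectoredReadPlanner::new(128 * 1024);
    let (key, lsn) = (Key::MIN, Lsn(0));
    planner.handle(key, lsn, 0, BlobFlag::None); // blob at [0, 1024)
    planner.handle(key, lsn, 1024, BlobFlag::None); // blob at [1024, 2048)
    planner.handle_range_end(2048); // end offset of the final blob
    planner.finish() // a single VectoredRead covering [0, 2048)
}
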
     286              : 
     287              : /// Disk reader for vectored blob spans (does not go through the page cache)
     288              : pub struct VectoredBlobReader<'a> {
     289              :     file: &'a VirtualFile,
     290              : }
     291              : 
     292              : impl<'a> VectoredBlobReader<'a> {
     293        38336 :     pub fn new(file: &'a VirtualFile) -> Self {
     294        38336 :         Self { file }
     295        38336 :     }
     296              : 
     297              :     /// Read the requested blobs into the buffer.
     298              :     ///
     299              :     /// We have to deal with the fact that blobs are not fixed size.
     300              :     /// Each blob is prefixed by a size header.
     301              :     ///
     302              :     /// The success return value is a struct which contains the buffer
     303              :     /// filled from disk and a list of offsets at which each blob lies
     304              :     /// in the buffer.
     305        76659 :     pub async fn read_blobs(
     306        76659 :         &self,
     307        76659 :         read: &VectoredRead,
     308        76659 :         buf: BytesMut,
     309        76659 :         ctx: &RequestContext,
     310        76659 :     ) -> Result<VectoredBlobsBuf, std::io::Error> {
     311        76659 :         assert!(read.size() > 0);
     312        76659 :         assert!(
     313        76659 :             read.size() <= buf.capacity(),
     314            0 :             "{} > {}",
     315            0 :             read.size(),
     316            0 :             buf.capacity()
     317              :         );
     318        76659 :         let buf = self
     319        76659 :             .file
     320        76659 :             .read_exact_at(buf.slice(0..read.size()), read.start, ctx)
     321        38930 :             .await?
     322        76659 :             .into_inner();
     323        76659 : 
     324        76659 :         let blobs_at = read.blobs_at.as_slice();
     325        76659 :         let start_offset = blobs_at.first().expect("VectoredRead is never empty").0;
     326        76659 : 
     327        76659 :         let mut metas = Vec::with_capacity(blobs_at.len());
     328        76659 : 
     329        76659 :         // Blobs in `read` only provide their starting offset. The end offset
     330        76659 :         // of a blob is implicit: the start of the next blob if one exists
     331        76659 :         // or the end of the read.
     332        76659 :         let pairs = blobs_at.iter().zip(
     333        76659 :             blobs_at
     334        76659 :                 .iter()
     335        76659 :                 .map(Some)
     336        76659 :                 .skip(1)
     337        76659 :                 .chain(std::iter::once(None)),
     338        76659 :         );
     339              : 
     340       491169 :         for ((offset, meta), next) in pairs {
     341       414510 :             let offset_in_buf = offset - start_offset;
     342       414510 :             let first_len_byte = buf[offset_in_buf as usize];
     343              : 
     344              :             // Each blob is prefixed by a header containing its size.
     345              :             // Extract the size and skip that header to find the start of the data.
     346              :             // The header can be 1 or 4 bytes. The most significant bit is 0 in the
     347              :             // 1 byte case and 1 in the 4 byte case.
     348       414510 :             let (size_length, blob_size) = if first_len_byte < 0x80 {
     349       380892 :                 (1, first_len_byte as u64)
     350              :             } else {
     351        33618 :                 let mut blob_size_buf = [0u8; 4];
     352        33618 :                 let offset_in_buf = offset_in_buf as usize;
     353        33618 : 
     354        33618 :                 blob_size_buf.copy_from_slice(&buf[offset_in_buf..offset_in_buf + 4]);
     355        33618 :                 blob_size_buf[0] &= 0x7f;
     356        33618 :                 (4, u32::from_be_bytes(blob_size_buf) as u64)
     357              :             };
     358              : 
     359       414510 :             let start = offset_in_buf + size_length;
     360       414510 :             let end = match next {
     361       337851 :                 Some((next_blob_start_offset, _)) => next_blob_start_offset - start_offset,
     362        76659 :                 None => start + blob_size,
     363              :             };
     364              : 
     365       414510 :             assert_eq!(end - start, blob_size);
     366              : 
     367       414510 :             metas.push(VectoredBlob {
     368       414510 :                 start: start as usize,
     369       414510 :                 end: end as usize,
     370       414510 :                 meta: *meta,
     371       414510 :             })
     372              :         }
     373              : 
     374        76659 :         Ok(VectoredBlobsBuf { buf, blobs: metas })
     375        76659 :     }
     376              : }
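
// A minimal sketch (hypothetical helper) of executing one planned read,
// assuming an open `VirtualFile` and a `RequestContext` are at hand; the
// buffer needs capacity for at least `read.size()` bytes.
async fn execute_read(
    file: &VirtualFile,
    read: &VectoredRead,
    ctx: &RequestContext,
) -> Result<VectoredBlobsBuf, std::io::Error> {
    let reader = VectoredBlobReader::new(file);
    let buf = BytesMut::with_capacity(read.size());
    reader.read_blobs(read, buf, ctx).await
}
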
     377              : 
     378              : /// Read planner used in [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`]. It provides a streaming API for
     379              : /// getting read blobs: `handle` returns a batch whenever adding the current key would exceed the `max_read_size` or
     380              : /// `max_cnt` constraint. Under the hood it uses [`VectoredReadPlanner`].
     381              : #[cfg(test)]
     382              : pub struct StreamingVectoredReadPlanner {
     383              :     planner: VectoredReadPlanner,
     384              :     /// Max read size per batch
     385              :     max_read_size: u64,
     386              :     /// Max item count per batch
     387              :     max_cnt: usize,
     388              :     /// The first offset of this batch
     389              :     this_batch_first_offset: Option<u64>,
     390              :     /// Size of the current batch
     391              :     cnt: usize,
     392              : }
     393              : 
     394              : #[cfg(test)]
     395              : impl StreamingVectoredReadPlanner {
     396          224 :     pub fn new(max_read_size: u64, max_cnt: usize) -> Self {
     397          224 :         assert!(max_cnt > 0);
     398          224 :         assert!(max_read_size > 0);
     399          224 :         Self {
     400          224 :             // We want to have exactly one read syscall (plus several others for index lookup) for each `next_batch` call.
     401          224 :             // Therefore, we enforce `self.max_read_size` by ourselves instead of using the VectoredReadPlanner's capability,
     402          224 :             // to avoid splitting into two I/Os.
     403          224 :             planner: VectoredReadPlanner::new_caller_controlled_max_limit(),
     404          224 :             max_cnt,
     405          224 :             max_read_size,
     406          224 :             this_batch_first_offset: None,
     407          224 :             cnt: 0,
     408          224 :         }
     409          224 :     }
     410              : 
     411        37884 :     fn emit(&mut self, this_batch_first_offset: u64) -> VectoredRead {
     412        37884 :         let planner = std::mem::replace(
     413        37884 :             &mut self.planner,
     414        37884 :             VectoredReadPlanner::new_caller_controlled_max_limit(),
     415        37884 :         );
     416        37884 :         self.this_batch_first_offset = Some(this_batch_first_offset);
     417        37884 :         self.cnt = 1;
     418        37884 :         let mut batch = planner.finish();
     419        37884 :         assert_eq!(batch.len(), 1, "should have exactly one read batch");
     420        37884 :         batch.pop().unwrap()
     421        37884 :     }
     422              : 
     423        56596 :     pub fn handle(
     424        56596 :         &mut self,
     425        56596 :         key: Key,
     426        56596 :         lsn: Lsn,
     427        56596 :         offset: u64,
     428        56596 :         flag: BlobFlag,
     429        56596 :     ) -> Option<VectoredRead> {
     430        56596 :         if let Some(begin_offset) = self.this_batch_first_offset {
     431              :             // Each batch will have at least one item because `self.this_batch_first_offset` is set
     432              :             // only after one item has been processed
     433        56484 :             if offset - begin_offset > self.max_read_size {
     434        28056 :                 self.planner.handle_range_end(offset); // End the current batch with the offset
     435        28056 :                 let batch = self.emit(offset); // Produce a batch
     436        28056 :                 self.planner.handle(key, lsn, offset, flag); // Add this key to the next batch
     437        28056 :                 return Some(batch);
     438        28428 :             }
     439              :         } else {
     440          112 :             self.this_batch_first_offset = Some(offset)
     441              :         }
     442        28540 :         if self.cnt >= self.max_cnt {
     443         9772 :             self.planner.handle_range_end(offset); // End the current batch with the offset
     444         9772 :             let batch = self.emit(offset); // Produce a batch
     445         9772 :             self.planner.handle(key, lsn, offset, flag); // Add this key to the next batch
     446         9772 :             return Some(batch);
     447        18768 :         }
     448        18768 :         self.planner.handle(key, lsn, offset, flag); // Add this key to the current batch
     449        18768 :         self.cnt += 1;
     450        18768 :         None
     451        56596 :     }
     452              : 
     453           56 :     pub fn handle_range_end(&mut self, offset: u64) -> VectoredRead {
     454           56 :         self.planner.handle_range_end(offset);
     455           56 :         self.emit(offset)
     456           56 :     }
     457              : }
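
// A usage sketch (hypothetical, offsets made up) of the streaming planner:
// `handle` hands back a batch once `max_cnt` (or `max_read_size`) would be
// exceeded, and `handle_range_end` flushes the tail batch.
fn streaming_sketch() {
    let mut planner = StreamingVectoredReadPlanner::new(4096, 2);
    let (key, lsn) = (Key::MIN, Lsn(0));
    assert!(planner.handle(key, lsn, 0, BlobFlag::None).is_none());
    assert!(planner.handle(key, lsn, 1024, BlobFlag::None).is_none());
    // Third offset exceeds max_cnt = 2: the first two blobs come back as
    // one coalesced batch covering [0, 2048).
    let batch = planner.handle(key, lsn, 2048, BlobFlag::None).unwrap();
    assert_eq!(batch.size(), 2048);
    // Flush the remaining blob, [2048, 3072), as the final batch.
    assert_eq!(planner.handle_range_end(3072).size(), 1024);
}
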
     458              : 
     459              : #[cfg(test)]
     460              : mod tests {
     461              :     use super::*;
     462              : 
     463           16 :     fn validate_read(read: &VectoredRead, offset_range: &[(Key, Lsn, u64, BlobFlag)]) {
     464           16 :         assert_eq!(read.start, offset_range.first().unwrap().2);
     465              : 
     466           24 :         let expected_offsets_in_read: Vec<_> = offset_range.iter().map(|o| o.2).collect();
     467           16 : 
     468           16 :         let offsets_in_read: Vec<_> = read
     469           16 :             .blobs_at
     470           16 :             .as_slice()
     471           16 :             .iter()
     472           24 :             .map(|(offset, _)| *offset)
     473           16 :             .collect();
     474           16 : 
     475           16 :         assert_eq!(expected_offsets_in_read, offsets_in_read);
     476           16 :     }
     477              : 
     478              :     #[test]
     479            2 :     fn planner_max_read_size_test() {
     480            2 :         let max_read_size = 128 * 1024;
     481            2 :         let key = Key::MIN;
     482            2 :         let lsn = Lsn(0);
     483            2 : 
     484            2 :         let blob_descriptions = vec![
     485            2 :             (key, lsn, 0, BlobFlag::None),
     486            2 :             (key, lsn, 32 * 1024, BlobFlag::None),
     487            2 :             (key, lsn, 96 * 1024, BlobFlag::None), // Last in read 1
     488            2 :             (key, lsn, 128 * 1024, BlobFlag::None), // Last in read 2
     489            2 :             (key, lsn, 198 * 1024, BlobFlag::None), // Last in read 3
     490            2 :             (key, lsn, 268 * 1024, BlobFlag::None), // Last in read 4
     491            2 :             (key, lsn, 396 * 1024, BlobFlag::None), // Last in read 5
     492            2 :             (key, lsn, 652 * 1024, BlobFlag::None), // Last in read 6
     493            2 :         ];
     494            2 : 
     495            2 :         let ranges = [
     496            2 :             &blob_descriptions[0..3],
     497            2 :             &blob_descriptions[3..4],
     498            2 :             &blob_descriptions[4..5],
     499            2 :             &blob_descriptions[5..6],
     500            2 :             &blob_descriptions[6..7],
     501            2 :             &blob_descriptions[7..],
     502            2 :         ];
     503            2 : 
     504            2 :         let mut planner = VectoredReadPlanner::new(max_read_size);
     505           16 :         for (key, lsn, offset, flag) in blob_descriptions.clone() {
     506           16 :             planner.handle(key, lsn, offset, flag);
     507           16 :         }
     508              : 
     509            2 :         planner.handle_range_end(652 * 1024);
     510            2 : 
     511            2 :         let reads = planner.finish();
     512            2 :         assert_eq!(reads.len(), 6);
     513              : 
     514           12 :         for (idx, read) in reads.iter().enumerate() {
     515           12 :             validate_read(read, ranges[idx]);
     516           12 :         }
     517            2 :     }
     518              : 
     519              :     #[test]
     520            2 :     fn planner_replacement_test() {
     521            2 :         let max_read_size = 128 * 1024;
     522            2 :         let first_key = Key::MIN;
     523            2 :         let second_key = first_key.next();
     524            2 :         let lsn = Lsn(0);
     525            2 : 
     526            2 :         let blob_descriptions = vec![
     527            2 :             (first_key, lsn, 0, BlobFlag::None),    // First in read 1
     528            2 :             (first_key, lsn, 1024, BlobFlag::None), // Last in read 1
     529            2 :             (second_key, lsn, 2 * 1024, BlobFlag::ReplaceAll),
     530            2 :             (second_key, lsn, 3 * 1024, BlobFlag::None),
     531            2 :             (second_key, lsn, 4 * 1024, BlobFlag::ReplaceAll), // First in read 2
     532            2 :             (second_key, lsn, 5 * 1024, BlobFlag::None),       // Last in read 2
     533            2 :         ];
     534            2 : 
     535            2 :         let ranges = [&blob_descriptions[0..2], &blob_descriptions[4..]];
     536            2 : 
     537            2 :         let mut planner = VectoredReadPlanner::new(max_read_size);
     538           12 :         for (key, lsn, offset, flag) in blob_descriptions.clone() {
     539           12 :             planner.handle(key, lsn, offset, flag);
     540           12 :         }
     541              : 
     542            2 :         planner.handle_range_end(6 * 1024);
     543            2 : 
     544            2 :         let reads = planner.finish();
     545            2 :         assert_eq!(reads.len(), 2);
     546              : 
     547            4 :         for (idx, read) in reads.iter().enumerate() {
     548            4 :             validate_read(read, ranges[idx]);
     549            4 :         }
     550            2 :     }
     551              : }
        

Generated by: LCOV version 2.1-beta