LCOV - code coverage report
Current view: top level - pageserver/src/tenant - vectored_blob_io.rs (source / functions) Coverage Total Hit
Test: 691a4c28fe7169edd60b367c52d448a0a6605f1f.info Lines: 98.8 % 244 241
Test Date: 2024-05-10 13:18:37 Functions: 100.0 % 18 18

            Line data    Source code
       1              : //!
       2              : //! Utilities for vectored reading of variable-sized "blobs".
       3              : //!
       4              : //! The "blob" API is an abstraction on top of the "block" API,
       5              : //! with the main difference being that blobs do not have a fixed
       6              : //! size (each blob is prefixed with a 1- or 4-byte length field).
       7              : //!
       8              : //! The vectored apis provided in this module allow for planning
       9              : //! and executing disk IO which covers multiple blobs.
      10              : //!
      11              : //! Reads are planned with [`VectoredReadPlanner`], which coalesces
      12              : //! adjacent blocks into a single disk IO request, and executed by
      13              : //! [`VectoredBlobReader`], which does all the required offset juggling
      14              : //! and returns a buffer housing all the blobs and a list of offsets.
      15              : //!
      16              : //! Note that the vectored blob api does *not* go through the page cache.
      17              : 
      18              : use std::collections::BTreeMap;
      19              : use std::num::NonZeroUsize;
      20              : 
      21              : use bytes::BytesMut;
      22              : use pageserver_api::key::Key;
      23              : use utils::lsn::Lsn;
      24              : use utils::vec_map::VecMap;
      25              : 
      26              : use crate::virtual_file::VirtualFile;
      27              : 
      28              : #[derive(Copy, Clone, Debug, PartialEq, Eq)]
      29              : pub struct MaxVectoredReadBytes(pub NonZeroUsize);
      30              : 
      31              : /// Metadata bundled with the start and end offset of a blob.
      32              : #[derive(Copy, Clone, Debug)]
      33              : pub struct BlobMeta {
      34              :     pub key: Key,
      35              :     pub lsn: Lsn,
      36              : }
      37              : 
      38              : /// Blob offsets into [`VectoredBlobsBuf::buf`]
      39              : pub struct VectoredBlob {
      40              :     pub start: usize,
      41              :     pub end: usize,
      42              :     pub meta: BlobMeta,
      43              : }
      44              : 
      45              : /// Return type of [`VectoredBlobReader::read_blobs`]
      46              : pub struct VectoredBlobsBuf {
      47              :     /// Buffer for all blobs in this read
      48              :     pub buf: BytesMut,
      49              :     /// Offsets into the buffer and metadata for all blobs in this read
      50              :     pub blobs: Vec<VectoredBlob>,
      51              : }
      52              : 
      53              : /// Description of one disk read for multiple blobs.
      54              : /// Used as the argument form [`VectoredBlobReader::read_blobs`]
      55              : #[derive(Debug)]
      56              : pub struct VectoredRead {
      57              :     pub start: u64,
      58              :     pub end: u64,
      59              :     /// Starting offsets and metadata for each blob in this read
      60              :     pub blobs_at: VecMap<u64, BlobMeta>,
      61              : }
      62              : 
      63              : impl VectoredRead {
      64       121109 :     pub(crate) fn size(&self) -> usize {
      65       121109 :         (self.end - self.start) as usize
      66       121109 :     }
      67              : }
      68              : 
      69              : #[derive(Eq, PartialEq)]
      70              : pub(crate) enum VectoredReadExtended {
      71              :     Yes,
      72              :     No,
      73              : }
      74              : 
      75              : pub(crate) struct VectoredReadBuilder {
      76              :     start: u64,
      77              :     end: u64,
      78              :     blobs_at: VecMap<u64, BlobMeta>,
      79              :     max_read_size: usize,
      80              : }
      81              : 
      82              : impl VectoredReadBuilder {
      83              :     /// Start building a new vectored read.
      84              :     ///
      85              :     /// Note that by design, this does not check against reading more than `max_read_size` to
      86              :     /// support reading larger blobs than the configuration value. The builder will be single use
      87              :     /// however after that.
      88        30218 :     pub(crate) fn new(
      89        30218 :         start_offset: u64,
      90        30218 :         end_offset: u64,
      91        30218 :         meta: BlobMeta,
      92        30218 :         max_read_size: usize,
      93        30218 :     ) -> Self {
      94        30218 :         let mut blobs_at = VecMap::default();
      95        30218 :         blobs_at
      96        30218 :             .append(start_offset, meta)
      97        30218 :             .expect("First insertion always succeeds");
      98        30218 : 
      99        30218 :         Self {
     100        30218 :             start: start_offset,
     101        30218 :             end: end_offset,
     102        30218 :             blobs_at,
     103        30218 :             max_read_size,
     104        30218 :         }
     105        30218 :     }
     106              : 
     107              :     /// Attempt to extend the current read with a new blob if the start
     108              :     /// offset matches with the current end of the vectored read
     109              :     /// and the resuting size is below the max read size
     110        75435 :     pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
     111        75435 :         tracing::trace!(start, end, "trying to extend");
     112        75435 :         let size = (end - start) as usize;
     113        75435 :         if self.end == start && self.size() + size <= self.max_read_size {
     114        45558 :             self.end = end;
     115        45558 :             self.blobs_at
     116        45558 :                 .append(start, meta)
     117        45558 :                 .expect("LSNs are ordered within vectored reads");
     118        45558 : 
     119        45558 :             return VectoredReadExtended::Yes;
     120        29877 :         }
     121        29877 : 
     122        29877 :         VectoredReadExtended::No
     123        75435 :     }
     124              : 
     125        64916 :     pub(crate) fn size(&self) -> usize {
     126        64916 :         (self.end - self.start) as usize
     127        64916 :     }
     128              : 
     129        30218 :     pub(crate) fn build(self) -> VectoredRead {
     130        30218 :         VectoredRead {
     131        30218 :             start: self.start,
     132        30218 :             end: self.end,
     133        30218 :             blobs_at: self.blobs_at,
     134        30218 :         }
     135        30218 :     }
     136              : }
     137              : 
     138              : #[derive(Copy, Clone, Debug)]
     139              : pub enum BlobFlag {
     140              :     None,
     141              :     Ignore,
     142              :     ReplaceAll,
     143              : }
     144              : 
     145              : /// Planner for vectored blob reads.
     146              : ///
     147              : /// Blob offsets are received via [`VectoredReadPlanner::handle`]
     148              : /// and coalesced into disk reads.
     149              : ///
     150              : /// The implementation is very simple:
     151              : /// * Collect all blob offsets in an ordered structure
     152              : /// * Iterate over the collected blobs and coalesce them into reads at the end
     153              : pub struct VectoredReadPlanner {
     154              :     // Track all the blob offsets. Start offsets must be ordered.
     155              :     blobs: BTreeMap<Key, Vec<(Lsn, u64, u64)>>,
     156              :     // Arguments for previous blob passed into [`VectoredReadPlanner::handle`]
     157              :     prev: Option<(Key, Lsn, u64, BlobFlag)>,
     158              : 
     159              :     max_read_size: usize,
     160              : }
     161              : 
     162              : impl VectoredReadPlanner {
     163          356 :     pub fn new(max_read_size: usize) -> Self {
     164          356 :         Self {
     165          356 :             blobs: BTreeMap::new(),
     166          356 :             prev: None,
     167          356 :             max_read_size,
     168          356 :         }
     169          356 :     }
     170              : 
     171              :     /// Include a new blob in the read plan.
     172              :     ///
     173              :     /// This function is called from a B-Tree index visitor (see `DeltaLayerInner::plan_reads`
     174              :     /// and `ImageLayerInner::plan_reads`). Said visitor wants to collect blob offsets for all
     175              :     /// keys in a given keyspace. This function must be called for each key in the desired
     176              :     /// keyspace (monotonically continuous). [`Self::handle_range_end`] must
     177              :     /// be called after every range in the offset.
     178              :     ///
     179              :     /// In the event that keys are skipped, the behaviour is undefined and can lead to an
     180              :     /// incorrect read plan. We can end up asserting, erroring in wal redo or returning
     181              :     /// incorrect data to the user.
     182              :     ///
     183              :     /// The `flag` argument has two interesting values:
     184              :     /// * [`BlobFlag::ReplaceAll`]: The blob for this key should replace all existing blobs.
     185              :     /// This is used for WAL records that `will_init`.
     186              :     /// * [`BlobFlag::Ignore`]: This blob should not be included in the read. This happens
     187              :     /// if the blob is cached.
     188        87586 :     pub fn handle(&mut self, key: Key, lsn: Lsn, offset: u64, flag: BlobFlag) {
     189              :         // Implementation note: internally lag behind by one blob such that
     190              :         // we have a start and end offset when initialising [`VectoredRead`]
     191        87586 :         let (prev_key, prev_lsn, prev_offset, prev_flag) = match self.prev {
     192              :             None => {
     193         5102 :                 self.prev = Some((key, lsn, offset, flag));
     194         5102 :                 return;
     195              :             }
     196        82484 :             Some(prev) => prev,
     197        82484 :         };
     198        82484 : 
     199        82484 :         self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
     200        82484 : 
     201        82484 :         self.prev = Some((key, lsn, offset, flag));
     202        87586 :     }
     203              : 
     204        63932 :     pub fn handle_range_end(&mut self, offset: u64) {
     205        63932 :         if let Some((prev_key, prev_lsn, prev_offset, prev_flag)) = self.prev {
     206         5102 :             self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
     207        58830 :         }
     208              : 
     209        63932 :         self.prev = None;
     210        63932 :     }
     211              : 
     212        87586 :     fn add_blob(&mut self, key: Key, lsn: Lsn, start_offset: u64, end_offset: u64, flag: BlobFlag) {
     213        87586 :         match flag {
     214        57356 :             BlobFlag::None => {
     215        57356 :                 let blobs_for_key = self.blobs.entry(key).or_default();
     216        57356 :                 blobs_for_key.push((lsn, start_offset, end_offset));
     217        57356 :             }
     218        30228 :             BlobFlag::ReplaceAll => {
     219        30228 :                 let blobs_for_key = self.blobs.entry(key).or_default();
     220        30228 :                 blobs_for_key.clear();
     221        30228 :                 blobs_for_key.push((lsn, start_offset, end_offset));
     222        30228 :             }
     223            2 :             BlobFlag::Ignore => {}
     224              :         }
     225        87586 :     }
     226              : 
     227          356 :     pub fn finish(self) -> Vec<VectoredRead> {
     228          356 :         let mut current_read_builder: Option<VectoredReadBuilder> = None;
     229          356 :         let mut reads = Vec::new();
     230              : 
     231        25060 :         for (key, blobs_for_key) in self.blobs {
     232       100448 :             for (lsn, start_offset, end_offset) in blobs_for_key {
     233        75744 :                 let extended = match &mut current_read_builder {
     234        75413 :                     Some(read_builder) => {
     235        75413 :                         read_builder.extend(start_offset, end_offset, BlobMeta { key, lsn })
     236              :                     }
     237          331 :                     None => VectoredReadExtended::No,
     238              :                 };
     239              : 
     240        75744 :                 if extended == VectoredReadExtended::No {
     241        30202 :                     let next_read_builder = VectoredReadBuilder::new(
     242        30202 :                         start_offset,
     243        30202 :                         end_offset,
     244        30202 :                         BlobMeta { key, lsn },
     245        30202 :                         self.max_read_size,
     246        30202 :                     );
     247        30202 : 
     248        30202 :                     let prev_read_builder = current_read_builder.replace(next_read_builder);
     249              : 
     250              :                     // `current_read_builder` is None in the first iteration of the outer loop
     251        30202 :                     if let Some(read_builder) = prev_read_builder {
     252        29871 :                         reads.push(read_builder.build());
     253        29871 :                     }
     254        45542 :                 }
     255              :             }
     256              :         }
     257              : 
     258          356 :         if let Some(read_builder) = current_read_builder {
     259          331 :             reads.push(read_builder.build());
     260          331 :         }
     261              : 
     262          356 :         reads
     263          356 :     }
     264              : }
     265              : 
     266              : /// Disk reader for vectored blob spans (does not go through the page cache)
     267              : pub struct VectoredBlobReader<'a> {
     268              :     file: &'a VirtualFile,
     269              : }
     270              : 
     271              : impl<'a> VectoredBlobReader<'a> {
     272          366 :     pub fn new(file: &'a VirtualFile) -> Self {
     273          366 :         Self { file }
     274          366 :     }
     275              : 
     276              :     /// Read the requested blobs into the buffer.
     277              :     ///
     278              :     /// We have to deal with the fact that blobs are not fixed size.
     279              :     /// Each blob is prefixed by a size header.
     280              :     ///
     281              :     /// The success return value is a struct which contains the buffer
     282              :     /// filled from disk and a list of offsets at which each blob lies
     283              :     /// in the buffer.
     284        30196 :     pub async fn read_blobs(
     285        30196 :         &self,
     286        30196 :         read: &VectoredRead,
     287        30196 :         buf: BytesMut,
     288        30196 :     ) -> Result<VectoredBlobsBuf, std::io::Error> {
     289        30196 :         assert!(read.size() > 0);
     290        30196 :         assert!(
     291        30196 :             read.size() <= buf.capacity(),
     292            0 :             "{} > {}",
     293            0 :             read.size(),
     294            0 :             buf.capacity()
     295              :         );
     296        30196 :         let buf = self
     297        30196 :             .file
     298        30196 :             .read_exact_at_n(buf, read.start, read.size())
     299        15369 :             .await?;
     300              : 
     301        30196 :         let blobs_at = read.blobs_at.as_slice();
     302        30196 :         let start_offset = blobs_at.first().expect("VectoredRead is never empty").0;
     303        30196 : 
     304        30196 :         let mut metas = Vec::with_capacity(blobs_at.len());
     305        30196 : 
     306        30196 :         // Blobs in `read` only provide their starting offset. The end offset
     307        30196 :         // of a blob is implicit: the start of the next blob if one exists
     308        30196 :         // or the end of the read.
     309        30196 :         let pairs = blobs_at.iter().zip(
     310        30196 :             blobs_at
     311        30196 :                 .iter()
     312        30196 :                 .map(Some)
     313        30196 :                 .skip(1)
     314        30196 :                 .chain(std::iter::once(None)),
     315        30196 :         );
     316              : 
     317       105920 :         for ((offset, meta), next) in pairs {
     318        75724 :             let offset_in_buf = offset - start_offset;
     319        75724 :             let first_len_byte = buf[offset_in_buf as usize];
     320              : 
     321              :             // Each blob is prefixed by a header containing it's size.
     322              :             // Extract the size and skip that header to find the start of the data.
     323              :             // The size can be 1 or 4 bytes. The most significant bit is 0 in the
     324              :             // 1 byte case and 1 in the 4 byte case.
     325        75724 :             let (size_length, blob_size) = if first_len_byte < 0x80 {
     326        42106 :                 (1, first_len_byte as u64)
     327              :             } else {
     328        33618 :                 let mut blob_size_buf = [0u8; 4];
     329        33618 :                 let offset_in_buf = offset_in_buf as usize;
     330        33618 : 
     331        33618 :                 blob_size_buf.copy_from_slice(&buf[offset_in_buf..offset_in_buf + 4]);
     332        33618 :                 blob_size_buf[0] &= 0x7f;
     333        33618 :                 (4, u32::from_be_bytes(blob_size_buf) as u64)
     334              :             };
     335              : 
     336        75724 :             let start = offset_in_buf + size_length;
     337        75724 :             let end = match next {
     338        45528 :                 Some((next_blob_start_offset, _)) => next_blob_start_offset - start_offset,
     339        30196 :                 None => start + blob_size,
     340              :             };
     341              : 
     342        75724 :             assert_eq!(end - start, blob_size);
     343              : 
     344        75724 :             metas.push(VectoredBlob {
     345        75724 :                 start: start as usize,
     346        75724 :                 end: end as usize,
     347        75724 :                 meta: *meta,
     348        75724 :             })
     349              :         }
     350              : 
     351        30196 :         Ok(VectoredBlobsBuf { buf, blobs: metas })
     352        30196 :     }
     353              : }
     354              : 
     355              : #[cfg(test)]
     356              : mod tests {
     357              :     use super::*;
     358              : 
     359           16 :     fn validate_read(read: &VectoredRead, offset_range: &[(Key, Lsn, u64, BlobFlag)]) {
     360           16 :         assert_eq!(read.start, offset_range.first().unwrap().2);
     361              : 
     362           24 :         let expected_offsets_in_read: Vec<_> = offset_range.iter().map(|o| o.2).collect();
     363           16 : 
     364           16 :         let offsets_in_read: Vec<_> = read
     365           16 :             .blobs_at
     366           16 :             .as_slice()
     367           16 :             .iter()
     368           24 :             .map(|(offset, _)| *offset)
     369           16 :             .collect();
     370           16 : 
     371           16 :         assert_eq!(expected_offsets_in_read, offsets_in_read);
     372           16 :     }
     373              : 
     374              :     #[test]
     375            2 :     fn planner_max_read_size_test() {
     376            2 :         let max_read_size = 128 * 1024;
     377            2 :         let key = Key::MIN;
     378            2 :         let lsn = Lsn(0);
     379            2 : 
     380            2 :         let blob_descriptions = vec![
     381            2 :             (key, lsn, 0, BlobFlag::None),
     382            2 :             (key, lsn, 32 * 1024, BlobFlag::None),
     383            2 :             (key, lsn, 96 * 1024, BlobFlag::None), // Last in read 1
     384            2 :             (key, lsn, 128 * 1024, BlobFlag::None), // Last in read 2
     385            2 :             (key, lsn, 198 * 1024, BlobFlag::None), // Last in read 3
     386            2 :             (key, lsn, 268 * 1024, BlobFlag::None), // Last in read 4
     387            2 :             (key, lsn, 396 * 1024, BlobFlag::None), // Last in read 5
     388            2 :             (key, lsn, 652 * 1024, BlobFlag::None), // Last in read 6
     389            2 :         ];
     390            2 : 
     391            2 :         let ranges = [
     392            2 :             &blob_descriptions[0..3],
     393            2 :             &blob_descriptions[3..4],
     394            2 :             &blob_descriptions[4..5],
     395            2 :             &blob_descriptions[5..6],
     396            2 :             &blob_descriptions[6..7],
     397            2 :             &blob_descriptions[7..],
     398            2 :         ];
     399            2 : 
     400            2 :         let mut planner = VectoredReadPlanner::new(max_read_size);
     401           16 :         for (key, lsn, offset, flag) in blob_descriptions.clone() {
     402           16 :             planner.handle(key, lsn, offset, flag);
     403           16 :         }
     404              : 
     405            2 :         planner.handle_range_end(652 * 1024);
     406            2 : 
     407            2 :         let reads = planner.finish();
     408            2 :         assert_eq!(reads.len(), 6);
     409              : 
     410           12 :         for (idx, read) in reads.iter().enumerate() {
     411           12 :             validate_read(read, ranges[idx]);
     412           12 :         }
     413            2 :     }
     414              : 
     415              :     #[test]
     416            2 :     fn planner_replacement_test() {
     417            2 :         let max_read_size = 128 * 1024;
     418            2 :         let first_key = Key::MIN;
     419            2 :         let second_key = first_key.next();
     420            2 :         let lsn = Lsn(0);
     421            2 : 
     422            2 :         let blob_descriptions = vec![
     423            2 :             (first_key, lsn, 0, BlobFlag::None),    // First in read 1
     424            2 :             (first_key, lsn, 1024, BlobFlag::None), // Last in read 1
     425            2 :             (second_key, lsn, 2 * 1024, BlobFlag::ReplaceAll),
     426            2 :             (second_key, lsn, 3 * 1024, BlobFlag::None),
     427            2 :             (second_key, lsn, 4 * 1024, BlobFlag::ReplaceAll), // First in read 2
     428            2 :             (second_key, lsn, 5 * 1024, BlobFlag::None),       // Last in read 2
     429            2 :         ];
     430            2 : 
     431            2 :         let ranges = [&blob_descriptions[0..2], &blob_descriptions[4..]];
     432            2 : 
     433            2 :         let mut planner = VectoredReadPlanner::new(max_read_size);
     434           12 :         for (key, lsn, offset, flag) in blob_descriptions.clone() {
     435           12 :             planner.handle(key, lsn, offset, flag);
     436           12 :         }
     437              : 
     438            2 :         planner.handle_range_end(6 * 1024);
     439            2 : 
     440            2 :         let reads = planner.finish();
     441            2 :         assert_eq!(reads.len(), 2);
     442              : 
     443            4 :         for (idx, read) in reads.iter().enumerate() {
     444            4 :             validate_read(read, ranges[idx]);
     445            4 :         }
     446            2 :     }
     447              : }
        

Generated by: LCOV version 2.1-beta