Line data Source code
1 : //!
2 : //! Utilities for vectored reading of variable-sized "blobs".
3 : //!
4 : //! The "blob" api is an abstraction on top of the "block" api,
5 : //! with the main difference being that blobs do not have a fixed
6 : //! size (each blob is prefixed with a 1- or 4-byte length field).
7 : //!
8 : //! The vectored apis provided in this module allow for planning
9 : //! and executing disk IO which covers multiple blobs.
10 : //!
11 : //! Reads are planned with [`VectoredReadPlanner`], which coalesces
12 : //! adjacent blobs into a single disk IO request, and executed by
13 : //! [`VectoredBlobReader`], which does all the required offset juggling
14 : //! and returns a buffer housing all the blobs and a list of offsets.
15 : //!
16 : //! Note that the vectored blob api does *not* go through the page cache.
17 :
18 : use std::collections::BTreeMap;
19 : use std::ops::Deref;
20 :
21 : use bytes::Bytes;
22 : use pageserver_api::key::Key;
23 : use tokio::io::AsyncWriteExt;
24 : use tokio_epoll_uring::BoundedBuf;
25 : use utils::lsn::Lsn;
26 : use utils::vec_map::VecMap;
27 :
28 : use crate::context::RequestContext;
29 : use crate::tenant::blob_io::{BYTE_UNCOMPRESSED, BYTE_ZSTD, LEN_COMPRESSION_BIT_MASK};
30 : use crate::virtual_file::IoBufferMut;
31 : use crate::virtual_file::{self, VirtualFile};
32 :
/// Metadata identifying a blob (its key and LSN).
///
/// The blob's offsets are tracked separately by the structures that hold a
/// `BlobMeta` (e.g. `VectoredBlob` keeps start/end offsets next to it).
#[derive(Copy, Clone, Debug)]
pub struct BlobMeta {
    /// Key the blob belongs to.
    pub key: Key,
    /// LSN of the blob for `key`.
    pub lsn: Lsn,
}
39 :
/// A view into the vectored blobs read buffer.
///
/// Either borrows a slice of an existing buffer, or owns a [`bytes::Bytes`]
/// (for example the output of decompressing a blob).
#[derive(Clone, Debug)]
pub(crate) enum BufView<'a> {
    /// Borrowed view into an existing buffer.
    Slice(&'a [u8]),
    /// Owned, reference-counted bytes.
    Bytes(bytes::Bytes),
}
46 :
47 : impl<'a> BufView<'a> {
48 : /// Creates a new slice-based view on the blob.
49 211956 : pub fn new_slice(slice: &'a [u8]) -> Self {
50 211956 : Self::Slice(slice)
51 211956 : }
52 :
53 : /// Creates a new [`bytes::Bytes`]-based view on the blob.
54 2 : pub fn new_bytes(bytes: bytes::Bytes) -> Self {
55 2 : Self::Bytes(bytes)
56 2 : }
57 :
58 : /// Convert the view into `Bytes`.
59 : ///
60 : /// If using slice as the underlying storage, the copy will be an O(n) operation.
61 344838 : pub fn into_bytes(self) -> Bytes {
62 344838 : match self {
63 344838 : BufView::Slice(slice) => Bytes::copy_from_slice(slice),
64 0 : BufView::Bytes(bytes) => bytes,
65 : }
66 344838 : }
67 :
68 : /// Creates a sub-view of the blob based on the range.
69 2655501 : fn view(&self, range: std::ops::Range<usize>) -> Self {
70 2655501 : match self {
71 2655501 : BufView::Slice(slice) => BufView::Slice(&slice[range]),
72 0 : BufView::Bytes(bytes) => BufView::Bytes(bytes.slice(range)),
73 : }
74 2655501 : }
75 : }
76 :
77 : impl Deref for BufView<'_> {
78 : type Target = [u8];
79 :
80 2310729 : fn deref(&self) -> &Self::Target {
81 2310729 : match self {
82 2310727 : BufView::Slice(slice) => slice,
83 2 : BufView::Bytes(bytes) => bytes,
84 : }
85 2310729 : }
86 : }
87 :
88 : impl AsRef<[u8]> for BufView<'_> {
89 0 : fn as_ref(&self) -> &[u8] {
90 0 : match self {
91 0 : BufView::Slice(slice) => slice,
92 0 : BufView::Bytes(bytes) => bytes.as_ref(),
93 : }
94 0 : }
95 : }
96 :
97 : impl<'a> From<&'a [u8]> for BufView<'a> {
98 0 : fn from(value: &'a [u8]) -> Self {
99 0 : Self::new_slice(value)
100 0 : }
101 : }
102 :
103 : impl From<Bytes> for BufView<'_> {
104 0 : fn from(value: Bytes) -> Self {
105 0 : Self::new_bytes(value)
106 0 : }
107 : }
108 :
/// Blob offsets into [`VectoredBlobsBuf::buf`]. The byte range is potentially compressed,
/// subject to [`VectoredBlob::compression_bits`].
pub struct VectoredBlob {
    /// Blob metadata.
    pub meta: BlobMeta,
    /// Start offset of the blob payload in the buffer (just past the length header).
    start: usize,
    /// End offset (exclusive) of the blob payload in the buffer.
    end: usize,
    /// Compression used on the blob, as encoded in its length header.
    compression_bits: u8,
}
121 :
122 : impl VectoredBlob {
123 : /// Reads a decompressed view of the blob.
124 2655501 : pub(crate) async fn read<'a>(&self, buf: &BufView<'a>) -> Result<BufView<'a>, std::io::Error> {
125 2655501 : let view = buf.view(self.start..self.end);
126 2655501 :
127 2655501 : match self.compression_bits {
128 2655499 : BYTE_UNCOMPRESSED => Ok(view),
129 : BYTE_ZSTD => {
130 2 : let mut decompressed_vec = Vec::new();
131 2 : let mut decoder =
132 2 : async_compression::tokio::write::ZstdDecoder::new(&mut decompressed_vec);
133 2 : decoder.write_all(&view).await?;
134 2 : decoder.flush().await?;
135 : // Zero-copy conversion from `Vec` to `Bytes`
136 2 : Ok(BufView::new_bytes(Bytes::from(decompressed_vec)))
137 : }
138 0 : bits => {
139 0 : let error = std::io::Error::new(
140 0 : std::io::ErrorKind::InvalidData,
141 0 : format!("Failed to decompress blob for {}@{}, {}..{}: invalid compression byte {bits:x}", self.meta.key, self.meta.lsn, self.start, self.end),
142 0 : );
143 0 : Err(error)
144 : }
145 : }
146 2655501 : }
147 : }
148 :
149 : impl std::fmt::Display for VectoredBlob {
150 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
151 0 : write!(
152 0 : f,
153 0 : "{}@{}, {}..{}",
154 0 : self.meta.key, self.meta.lsn, self.start, self.end
155 0 : )
156 0 : }
157 : }
158 :
/// Return type of [`VectoredBlobReader::read_blobs`]
///
/// The offsets in `blobs` are relative to the start of `buf`, whose first byte
/// corresponds to file offset `VectoredRead::start` of the read that produced it.
pub struct VectoredBlobsBuf {
    /// Buffer for all blobs in this read
    pub buf: IoBufferMut,
    /// Offsets into the buffer and metadata for all blobs in this read
    pub blobs: Vec<VectoredBlob>,
}
166 :
/// Description of one disk read for multiple blobs.
/// Used as the argument for [`VectoredBlobReader::read_blobs`]
#[derive(Debug)]
pub struct VectoredRead {
    /// File offset at which the read starts.
    pub start: u64,
    /// File offset at which the read ends (exclusive).
    pub end: u64,
    /// Start offset and metadata for each blob in this read
    pub blobs_at: VecMap<u64, BlobMeta>,
}
176 :
177 : impl VectoredRead {
178 966061 : pub(crate) fn size(&self) -> usize {
179 966061 : (self.end - self.start) as usize
180 966061 : }
181 : }
182 :
/// Outcome of [`ChunkedVectoredReadBuilder::extend`].
#[derive(Eq, PartialEq, Debug)]
pub(crate) enum VectoredReadExtended {
    /// The blob was added to the read being built.
    Yes,
    /// The blob was not added to the read being built.
    No,
}
188 :
/// A vectored read builder that tries to coalesce all reads that fit in a chunk.
///
/// Offsets are tracked at chunk granularity (a chunk is
/// `virtual_file::get_io_buffer_alignment()` bytes), so the read produced by
/// `build` always starts and ends on a chunk boundary.
pub(crate) struct ChunkedVectoredReadBuilder {
    /// Start chunk number (inclusive).
    start_blk_no: usize,
    /// End chunk number (exclusive).
    end_blk_no: usize,
    /// Start offset and metadata for each blob in this read
    blobs_at: VecMap<u64, BlobMeta>,
    /// Upper bound in bytes on the coalesced read, or `None` for no limit
    /// (used by the streaming planner).
    max_read_size: Option<usize>,
}
199 :
200 : impl ChunkedVectoredReadBuilder {
201 : const CHUNK_SIZE: usize = virtual_file::get_io_buffer_alignment();
202 : /// Start building a new vectored read.
203 : ///
204 : /// Note that by design, this does not check against reading more than `max_read_size` to
205 : /// support reading larger blobs than the configuration value. The builder will be single use
206 : /// however after that.
207 212032 : fn new_impl(
208 212032 : start_offset: u64,
209 212032 : end_offset: u64,
210 212032 : meta: BlobMeta,
211 212032 : max_read_size: Option<usize>,
212 212032 : ) -> Self {
213 212032 : let mut blobs_at = VecMap::default();
214 212032 : blobs_at
215 212032 : .append(start_offset, meta)
216 212032 : .expect("First insertion always succeeds");
217 212032 :
218 212032 : let start_blk_no = start_offset as usize / Self::CHUNK_SIZE;
219 212032 : let end_blk_no = (end_offset as usize).div_ceil(Self::CHUNK_SIZE);
220 212032 : Self {
221 212032 : start_blk_no,
222 212032 : end_blk_no,
223 212032 : blobs_at,
224 212032 : max_read_size,
225 212032 : }
226 212032 : }
227 :
228 171508 : pub(crate) fn new(
229 171508 : start_offset: u64,
230 171508 : end_offset: u64,
231 171508 : meta: BlobMeta,
232 171508 : max_read_size: usize,
233 171508 : ) -> Self {
234 171508 : Self::new_impl(start_offset, end_offset, meta, Some(max_read_size))
235 171508 : }
236 :
237 40524 : pub(crate) fn new_streaming(start_offset: u64, end_offset: u64, meta: BlobMeta) -> Self {
238 40524 : Self::new_impl(start_offset, end_offset, meta, None)
239 40524 : }
240 :
241 : /// Attempts to extend the current read with a new blob if the new blob resides in the same or the immediate next chunk.
242 : ///
243 : /// The resulting size also must be below the max read size.
244 2466808 : pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
245 2466808 : tracing::trace!(start, end, "trying to extend");
246 2466808 : let start_blk_no = start as usize / Self::CHUNK_SIZE;
247 2466808 : let end_blk_no = (end as usize).div_ceil(Self::CHUNK_SIZE);
248 :
249 2466808 : let not_limited_by_max_read_size = {
250 2466808 : if let Some(max_read_size) = self.max_read_size {
251 379460 : let coalesced_size = (end_blk_no - self.start_blk_no) * Self::CHUNK_SIZE;
252 379460 : coalesced_size <= max_read_size
253 : } else {
254 2087348 : true
255 : }
256 : };
257 :
258 : // True if the second block starts in the same block or the immediate next block where the first block ended.
259 : //
260 : // Note: This automatically handles the case where two blocks are adjacent to each other,
261 : // whether they starts on chunk size boundary or not.
262 2466808 : let is_adjacent_chunk_read = {
263 : // 1. first.end & second.start are in the same block
264 2466808 : self.end_blk_no == start_blk_no + 1 ||
265 : // 2. first.end ends one block before second.start
266 12062 : self.end_blk_no == start_blk_no
267 : };
268 :
269 2466808 : if is_adjacent_chunk_read && not_limited_by_max_read_size {
270 2443581 : self.end_blk_no = end_blk_no;
271 2443581 : self.blobs_at
272 2443581 : .append(start, meta)
273 2443581 : .expect("LSNs are ordered within vectored reads");
274 2443581 :
275 2443581 : return VectoredReadExtended::Yes;
276 23227 : }
277 23227 :
278 23227 : VectoredReadExtended::No
279 2466808 : }
280 :
281 2127254 : pub(crate) fn size(&self) -> usize {
282 2127254 : (self.end_blk_no - self.start_blk_no) * Self::CHUNK_SIZE
283 2127254 : }
284 :
285 212032 : pub(crate) fn build(self) -> VectoredRead {
286 212032 : let start = (self.start_blk_no * Self::CHUNK_SIZE) as u64;
287 212032 : let end = (self.end_blk_no * Self::CHUNK_SIZE) as u64;
288 212032 : VectoredRead {
289 212032 : start,
290 212032 : end,
291 212032 : blobs_at: self.blobs_at,
292 212032 : }
293 212032 : }
294 : }
295 :
/// How [`VectoredReadPlanner::handle`] should treat a blob.
#[derive(Copy, Clone, Debug)]
pub enum BlobFlag {
    /// Add the blob to the plan.
    None,
    /// Leave the blob out of the plan (e.g. because it is cached).
    Ignore,
    /// Add the blob and drop every blob previously planned for the same key.
    ReplaceAll,
}
302 :
/// Planner for vectored blob reads.
///
/// Blob offsets are received via [`VectoredReadPlanner::handle`]
/// and coalesced into disk reads.
///
/// The implementation is very simple:
/// * Collect all blob offsets in an ordered structure
/// * Iterate over the collected blobs and coalesce them into reads at the end
pub struct VectoredReadPlanner {
    // Track all the blob offsets. Start offsets must be ordered.
    // Per key: `(lsn, start_offset, end_offset)` of each planned blob, in insertion order.
    blobs: BTreeMap<Key, Vec<(Lsn, u64, u64)>>,
    // Arguments for previous blob passed into [`VectoredReadPlanner::handle`]
    prev: Option<(Key, Lsn, u64, BlobFlag)>,

    // Byte cap for each coalesced read (passed to `ChunkedVectoredReadBuilder::new`).
    max_read_size: usize,
}
319 :
320 : impl VectoredReadPlanner {
321 240148 : pub fn new(max_read_size: usize) -> Self {
322 240148 : Self {
323 240148 : blobs: BTreeMap::new(),
324 240148 : prev: None,
325 240148 : max_read_size,
326 240148 : }
327 240148 : }
328 :
329 : /// Include a new blob in the read plan.
330 : ///
331 : /// This function is called from a B-Tree index visitor (see `DeltaLayerInner::plan_reads`
332 : /// and `ImageLayerInner::plan_reads`). Said visitor wants to collect blob offsets for all
333 : /// keys in a given keyspace. This function must be called for each key in the desired
334 : /// keyspace (monotonically continuous). [`Self::handle_range_end`] must
335 : /// be called after every range in the offset.
336 : ///
337 : /// In the event that keys are skipped, the behaviour is undefined and can lead to an
338 : /// incorrect read plan. We can end up asserting, erroring in wal redo or returning
339 : /// incorrect data to the user.
340 : ///
341 : /// The `flag` argument has two interesting values:
342 : /// * [`BlobFlag::ReplaceAll`]: The blob for this key should replace all existing blobs.
343 : /// This is used for WAL records that `will_init`.
344 : /// * [`BlobFlag::Ignore`]: This blob should not be included in the read. This happens
345 : /// if the blob is cached.
346 1520177 : pub fn handle(&mut self, key: Key, lsn: Lsn, offset: u64, flag: BlobFlag) {
347 : // Implementation note: internally lag behind by one blob such that
348 : // we have a start and end offset when initialising [`VectoredRead`]
349 1520177 : let (prev_key, prev_lsn, prev_offset, prev_flag) = match self.prev {
350 : None => {
351 152801 : self.prev = Some((key, lsn, offset, flag));
352 152801 : return;
353 : }
354 1367376 : Some(prev) => prev,
355 1367376 : };
356 1367376 :
357 1367376 : self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
358 1367376 :
359 1367376 : self.prev = Some((key, lsn, offset, flag));
360 1520177 : }
361 :
362 240358 : pub fn handle_range_end(&mut self, offset: u64) {
363 240358 : if let Some((prev_key, prev_lsn, prev_offset, prev_flag)) = self.prev {
364 152801 : self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
365 152801 : }
366 :
367 240358 : self.prev = None;
368 240358 : }
369 :
370 1520177 : fn add_blob(&mut self, key: Key, lsn: Lsn, start_offset: u64, end_offset: u64, flag: BlobFlag) {
371 1520177 : match flag {
372 373822 : BlobFlag::None => {
373 373822 : let blobs_for_key = self.blobs.entry(key).or_default();
374 373822 : blobs_for_key.push((lsn, start_offset, end_offset));
375 373822 : }
376 257133 : BlobFlag::ReplaceAll => {
377 257133 : let blobs_for_key = self.blobs.entry(key).or_default();
378 257133 : blobs_for_key.clear();
379 257133 : blobs_for_key.push((lsn, start_offset, end_offset));
380 257133 : }
381 889222 : BlobFlag::Ignore => {}
382 : }
383 1520177 : }
384 :
385 240148 : pub fn finish(self) -> Vec<VectoredRead> {
386 240148 : let mut current_read_builder: Option<ChunkedVectoredReadBuilder> = None;
387 240148 : let mut reads = Vec::new();
388 :
389 712587 : for (key, blobs_for_key) in self.blobs {
390 996036 : for (lsn, start_offset, end_offset) in blobs_for_key {
391 523597 : let extended = match &mut current_read_builder {
392 379438 : Some(read_builder) => {
393 379438 : read_builder.extend(start_offset, end_offset, BlobMeta { key, lsn })
394 : }
395 144159 : None => VectoredReadExtended::No,
396 : };
397 :
398 523597 : if extended == VectoredReadExtended::No {
399 167380 : let next_read_builder = ChunkedVectoredReadBuilder::new(
400 167380 : start_offset,
401 167380 : end_offset,
402 167380 : BlobMeta { key, lsn },
403 167380 : self.max_read_size,
404 167380 : );
405 167380 :
406 167380 : let prev_read_builder = current_read_builder.replace(next_read_builder);
407 :
408 : // `current_read_builder` is None in the first iteration of the outer loop
409 167380 : if let Some(read_builder) = prev_read_builder {
410 23221 : reads.push(read_builder.build());
411 144159 : }
412 356217 : }
413 : }
414 : }
415 :
416 240148 : if let Some(read_builder) = current_read_builder {
417 144159 : reads.push(read_builder.build());
418 144159 : }
419 :
420 240148 : reads
421 240148 : }
422 : }
423 :
/// Disk reader for vectored blob spans (does not go through the page cache)
pub struct VectoredBlobReader<'a> {
    /// File the blobs are read from.
    file: &'a VirtualFile,
}
428 :
429 : impl<'a> VectoredBlobReader<'a> {
430 280662 : pub fn new(file: &'a VirtualFile) -> Self {
431 280662 : Self { file }
432 280662 : }
433 :
434 : /// Read the requested blobs into the buffer.
435 : ///
436 : /// We have to deal with the fact that blobs are not fixed size.
437 : /// Each blob is prefixed by a size header.
438 : ///
439 : /// The success return value is a struct which contains the buffer
440 : /// filled from disk and a list of offsets at which each blob lies
441 : /// in the buffer.
442 211956 : pub async fn read_blobs(
443 211956 : &self,
444 211956 : read: &VectoredRead,
445 211956 : buf: IoBufferMut,
446 211956 : ctx: &RequestContext,
447 211956 : ) -> Result<VectoredBlobsBuf, std::io::Error> {
448 211956 : assert!(read.size() > 0);
449 211956 : assert!(
450 211956 : read.size() <= buf.capacity(),
451 0 : "{} > {}",
452 0 : read.size(),
453 0 : buf.capacity()
454 : );
455 :
456 211956 : if cfg!(debug_assertions) {
457 211956 : const ALIGN: u64 = virtual_file::get_io_buffer_alignment() as u64;
458 211956 : debug_assert_eq!(
459 211956 : read.start % ALIGN,
460 : 0,
461 0 : "Read start at {} does not satisfy the required io buffer alignment ({} bytes)",
462 : read.start,
463 : ALIGN
464 : );
465 0 : }
466 :
467 211956 : let buf = self
468 211956 : .file
469 211956 : .read_exact_at(buf.slice(0..read.size()), read.start, ctx)
470 107788 : .await?
471 211956 : .into_inner();
472 211956 :
473 211956 : let blobs_at = read.blobs_at.as_slice();
474 211956 :
475 211956 : let start_offset = read.start;
476 211956 :
477 211956 : let mut metas = Vec::with_capacity(blobs_at.len());
478 : // Blobs in `read` only provide their starting offset. The end offset
479 : // of a blob is implicit: the start of the next blob if one exists
480 : // or the end of the read.
481 :
482 2867457 : for (blob_start, meta) in blobs_at {
483 2655501 : let blob_start_in_buf = blob_start - start_offset;
484 2655501 : let first_len_byte = buf[blob_start_in_buf as usize];
485 :
486 : // Each blob is prefixed by a header containing its size and compression information.
487 : // Extract the size and skip that header to find the start of the data.
488 : // The size can be 1 or 4 bytes. The most significant bit is 0 in the
489 : // 1 byte case and 1 in the 4 byte case.
490 2655501 : let (size_length, blob_size, compression_bits) = if first_len_byte < 0x80 {
491 2617809 : (1, first_len_byte as u64, BYTE_UNCOMPRESSED)
492 : } else {
493 37692 : let mut blob_size_buf = [0u8; 4];
494 37692 : let offset_in_buf = blob_start_in_buf as usize;
495 37692 :
496 37692 : blob_size_buf.copy_from_slice(&buf[offset_in_buf..offset_in_buf + 4]);
497 37692 : blob_size_buf[0] &= !LEN_COMPRESSION_BIT_MASK;
498 37692 :
499 37692 : let compression_bits = first_len_byte & LEN_COMPRESSION_BIT_MASK;
500 37692 : (
501 37692 : 4,
502 37692 : u32::from_be_bytes(blob_size_buf) as u64,
503 37692 : compression_bits,
504 37692 : )
505 : };
506 :
507 2655501 : let start = (blob_start_in_buf + size_length) as usize;
508 2655501 : let end = start + blob_size as usize;
509 2655501 :
510 2655501 : metas.push(VectoredBlob {
511 2655501 : start,
512 2655501 : end,
513 2655501 : meta: *meta,
514 2655501 : compression_bits,
515 2655501 : });
516 : }
517 :
518 211956 : Ok(VectoredBlobsBuf { buf, blobs: metas })
519 211956 : }
520 : }
521 :
/// Read planner used in [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`].
///
/// It provides a streaming API for getting read blobs. `handle` returns a batch
/// once the current batch reaches the `max_read_size` or `max_cnt` limit, and
/// `handle_range_end` returns whatever batch is still pending.
pub struct StreamingVectoredReadPlanner {
    /// Read currently being accumulated, if any.
    read_builder: Option<ChunkedVectoredReadBuilder>,
    // Arguments for previous blob passed into [`StreamingVectoredReadPlanner::handle`]
    prev: Option<(Key, Lsn, u64)>,
    /// Max read size per batch. This is not a strict limit. If there are [0, 100) and [100, 200), while the `max_read_size` is 150,
    /// we will produce a single batch instead of splitting them.
    max_read_size: u64,
    /// Max item count per batch
    max_cnt: usize,
    /// Number of blobs in the current batch
    cnt: usize,
}
539 :
540 : impl StreamingVectoredReadPlanner {
541 788 : pub fn new(max_read_size: u64, max_cnt: usize) -> Self {
542 788 : assert!(max_cnt > 0);
543 788 : assert!(max_read_size > 0);
544 788 : Self {
545 788 : read_builder: None,
546 788 : prev: None,
547 788 : max_cnt,
548 788 : max_read_size,
549 788 : cnt: 0,
550 788 : }
551 788 : }
552 :
553 2127928 : pub fn handle(&mut self, key: Key, lsn: Lsn, offset: u64) -> Option<VectoredRead> {
554 : // Implementation note: internally lag behind by one blob such that
555 : // we have a start and end offset when initialising [`VectoredRead`]
556 2127928 : let (prev_key, prev_lsn, prev_offset) = match self.prev {
557 : None => {
558 674 : self.prev = Some((key, lsn, offset));
559 674 : return None;
560 : }
561 2127254 : Some(prev) => prev,
562 2127254 : };
563 2127254 :
564 2127254 : let res = self.add_blob(prev_key, prev_lsn, prev_offset, offset, false);
565 2127254 :
566 2127254 : self.prev = Some((key, lsn, offset));
567 2127254 :
568 2127254 : res
569 2127928 : }
570 :
571 620 : pub fn handle_range_end(&mut self, offset: u64) -> Option<VectoredRead> {
572 620 : let res = if let Some((prev_key, prev_lsn, prev_offset)) = self.prev {
573 618 : self.add_blob(prev_key, prev_lsn, prev_offset, offset, true)
574 : } else {
575 2 : None
576 : };
577 :
578 620 : self.prev = None;
579 620 :
580 620 : res
581 620 : }
582 :
583 2127872 : fn add_blob(
584 2127872 : &mut self,
585 2127872 : key: Key,
586 2127872 : lsn: Lsn,
587 2127872 : start_offset: u64,
588 2127872 : end_offset: u64,
589 2127872 : is_last_blob_in_read: bool,
590 2127872 : ) -> Option<VectoredRead> {
591 2127872 : match &mut self.read_builder {
592 2087348 : Some(read_builder) => {
593 2087348 : let extended = read_builder.extend(start_offset, end_offset, BlobMeta { key, lsn });
594 2087348 : assert_eq!(extended, VectoredReadExtended::Yes);
595 : }
596 40524 : None => {
597 40524 : self.read_builder = {
598 40524 : Some(ChunkedVectoredReadBuilder::new_streaming(
599 40524 : start_offset,
600 40524 : end_offset,
601 40524 : BlobMeta { key, lsn },
602 40524 : ))
603 40524 : };
604 40524 : }
605 : }
606 2127872 : let read_builder = self.read_builder.as_mut().unwrap();
607 2127872 : self.cnt += 1;
608 2127872 : if is_last_blob_in_read
609 2127254 : || read_builder.size() >= self.max_read_size as usize
610 2098410 : || self.cnt >= self.max_cnt
611 : {
612 40524 : let prev_read_builder = self.read_builder.take();
613 40524 : self.cnt = 0;
614 :
615 : // `current_read_builder` is None in the first iteration
616 40524 : if let Some(read_builder) = prev_read_builder {
617 40524 : return Some(read_builder.build());
618 0 : }
619 2087348 : }
620 2087348 : None
621 2127872 : }
622 : }
623 :
#[cfg(test)]
mod tests {
    use anyhow::Error;

    use crate::context::DownloadBehavior;
    use crate::page_cache::PAGE_SZ;
    use crate::task_mgr::TaskKind;

    use super::super::blob_io::tests::{random_array, write_maybe_compressed};
    use super::*;

    /// Asserts that `read` is chunk-aligned, starts in the chunk of the first blob in
    /// `offset_range`, and contains exactly the blob start offsets listed there, in order.
    fn validate_read(read: &VectoredRead, offset_range: &[(Key, Lsn, u64, BlobFlag)]) {
        const ALIGN: u64 = virtual_file::get_io_buffer_alignment() as u64;
        assert_eq!(read.start % ALIGN, 0);
        assert_eq!(read.start / ALIGN, offset_range.first().unwrap().2 / ALIGN);

        let expected_offsets_in_read: Vec<_> = offset_range.iter().map(|o| o.2).collect();

        let offsets_in_read: Vec<_> = read
            .blobs_at
            .as_slice()
            .iter()
            .map(|(offset, _)| *offset)
            .collect();

        assert_eq!(expected_offsets_in_read, offsets_in_read);
    }

    /// Blobs in the same or adjacent chunks are coalesced until the size cap is hit;
    /// a fully skipped chunk forces a new read.
    #[test]
    fn planner_chunked_coalesce_all_test() {
        use crate::virtual_file;

        const CHUNK_SIZE: u64 = virtual_file::get_io_buffer_alignment() as u64;

        let max_read_size = CHUNK_SIZE as usize * 8;
        let key = Key::MIN;
        let lsn = Lsn(0);

        let blob_descriptions = [
            (key, lsn, CHUNK_SIZE / 8, BlobFlag::None), // Read 1 BEGIN
            (key, lsn, CHUNK_SIZE / 4, BlobFlag::Ignore), // Gap
            (key, lsn, CHUNK_SIZE / 2, BlobFlag::None),
            (key, lsn, CHUNK_SIZE - 2, BlobFlag::Ignore), // Gap
            (key, lsn, CHUNK_SIZE, BlobFlag::None),
            (key, lsn, CHUNK_SIZE * 2 - 1, BlobFlag::None),
            (key, lsn, CHUNK_SIZE * 2 + 1, BlobFlag::Ignore), // Gap
            (key, lsn, CHUNK_SIZE * 3 + 1, BlobFlag::None),
            (key, lsn, CHUNK_SIZE * 5 + 1, BlobFlag::None),
            (key, lsn, CHUNK_SIZE * 6 + 1, BlobFlag::Ignore), // skipped chunk size, but not a chunk: should coalesce.
            (key, lsn, CHUNK_SIZE * 7 + 1, BlobFlag::None),
            (key, lsn, CHUNK_SIZE * 8, BlobFlag::None), // Read 2 BEGIN (b/c max_read_size)
            (key, lsn, CHUNK_SIZE * 9, BlobFlag::Ignore), // ==== skipped a chunk
            (key, lsn, CHUNK_SIZE * 10, BlobFlag::None), // Read 3 BEGIN (cannot coalesce)
        ];

        let ranges = [
            &[
                blob_descriptions[0],
                blob_descriptions[2],
                blob_descriptions[4],
                blob_descriptions[5],
                blob_descriptions[7],
                blob_descriptions[8],
                blob_descriptions[10],
            ],
            &blob_descriptions[11..12],
            &blob_descriptions[13..],
        ];

        let mut planner = VectoredReadPlanner::new(max_read_size);
        for (key, lsn, offset, flag) in blob_descriptions {
            planner.handle(key, lsn, offset, flag);
        }

        planner.handle_range_end(652 * 1024);

        let reads = planner.finish();

        assert_eq!(reads.len(), ranges.len());

        for (idx, read) in reads.iter().enumerate() {
            validate_read(read, ranges[idx]);
        }
    }

    /// Reads are split whenever extending would exceed `max_read_size`.
    #[test]
    fn planner_max_read_size_test() {
        let max_read_size = 128 * 1024;
        let key = Key::MIN;
        let lsn = Lsn(0);

        let blob_descriptions = vec![
            (key, lsn, 0, BlobFlag::None),
            (key, lsn, 32 * 1024, BlobFlag::None),
            (key, lsn, 96 * 1024, BlobFlag::None), // Last in read 1
            (key, lsn, 128 * 1024, BlobFlag::None), // Last in read 2
            (key, lsn, 198 * 1024, BlobFlag::None), // Last in read 3
            (key, lsn, 268 * 1024, BlobFlag::None), // Last in read 4
            (key, lsn, 396 * 1024, BlobFlag::None), // Last in read 5
            (key, lsn, 652 * 1024, BlobFlag::None), // Last in read 6
        ];

        let ranges = [
            &blob_descriptions[0..3],
            &blob_descriptions[3..4],
            &blob_descriptions[4..5],
            &blob_descriptions[5..6],
            &blob_descriptions[6..7],
            &blob_descriptions[7..],
        ];

        let mut planner = VectoredReadPlanner::new(max_read_size);
        for (key, lsn, offset, flag) in blob_descriptions.clone() {
            planner.handle(key, lsn, offset, flag);
        }

        planner.handle_range_end(652 * 1024);

        let reads = planner.finish();

        assert_eq!(reads.len(), 6);

        // TODO: could remove zero reads to produce 5 reads here

        for (idx, read) in reads.iter().enumerate() {
            validate_read(read, ranges[idx]);
        }
    }

    /// `BlobFlag::ReplaceAll` drops earlier blobs planned for the same key.
    #[test]
    fn planner_replacement_test() {
        const CHUNK_SIZE: u64 = virtual_file::get_io_buffer_alignment() as u64;
        let max_read_size = 128 * CHUNK_SIZE as usize;
        let first_key = Key::MIN;
        let second_key = first_key.next();
        let lsn = Lsn(0);

        let blob_descriptions = vec![
            (first_key, lsn, 0, BlobFlag::None),          // First in read 1
            (first_key, lsn, CHUNK_SIZE, BlobFlag::None), // Last in read 1
            (second_key, lsn, 2 * CHUNK_SIZE, BlobFlag::ReplaceAll),
            (second_key, lsn, 3 * CHUNK_SIZE, BlobFlag::None),
            (second_key, lsn, 4 * CHUNK_SIZE, BlobFlag::ReplaceAll), // First in read 2
            (second_key, lsn, 5 * CHUNK_SIZE, BlobFlag::None),       // Last in read 2
        ];

        let ranges = [&blob_descriptions[0..2], &blob_descriptions[4..]];

        let mut planner = VectoredReadPlanner::new(max_read_size);
        for (key, lsn, offset, flag) in blob_descriptions.clone() {
            planner.handle(key, lsn, offset, flag);
        }

        planner.handle_range_end(6 * CHUNK_SIZE);

        let reads = planner.finish();
        assert_eq!(reads.len(), 2);

        for (idx, read) in reads.iter().enumerate() {
            validate_read(read, ranges[idx]);
        }
    }

    /// The streaming planner emits a batch once its size reaches `max_read_size`.
    #[test]
    fn streaming_planner_max_read_size_test() {
        let max_read_size = 128 * 1024;
        let key = Key::MIN;
        let lsn = Lsn(0);

        let blob_descriptions = vec![
            (key, lsn, 0, BlobFlag::None),
            (key, lsn, 32 * 1024, BlobFlag::None),
            (key, lsn, 96 * 1024, BlobFlag::None),
            (key, lsn, 128 * 1024, BlobFlag::None),
            (key, lsn, 198 * 1024, BlobFlag::None),
            (key, lsn, 268 * 1024, BlobFlag::None),
            (key, lsn, 396 * 1024, BlobFlag::None),
            (key, lsn, 652 * 1024, BlobFlag::None),
        ];

        let ranges = [
            &blob_descriptions[0..3],
            &blob_descriptions[3..5],
            &blob_descriptions[5..6],
            &blob_descriptions[6..7],
            &blob_descriptions[7..],
        ];

        let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1000);
        let mut reads = Vec::new();
        for (key, lsn, offset, _) in blob_descriptions.clone() {
            reads.extend(planner.handle(key, lsn, offset));
        }
        reads.extend(planner.handle_range_end(652 * 1024));

        assert_eq!(reads.len(), ranges.len());

        for (idx, read) in reads.iter().enumerate() {
            validate_read(read, ranges[idx]);
        }
    }

    /// The streaming planner emits a batch once it holds `max_cnt` blobs.
    #[test]
    fn streaming_planner_max_cnt_test() {
        let max_read_size = 1024 * 1024;
        let key = Key::MIN;
        let lsn = Lsn(0);

        let blob_descriptions = vec![
            (key, lsn, 0, BlobFlag::None),
            (key, lsn, 32 * 1024, BlobFlag::None),
            (key, lsn, 96 * 1024, BlobFlag::None),
            (key, lsn, 128 * 1024, BlobFlag::None),
            (key, lsn, 198 * 1024, BlobFlag::None),
            (key, lsn, 268 * 1024, BlobFlag::None),
            (key, lsn, 396 * 1024, BlobFlag::None),
            (key, lsn, 652 * 1024, BlobFlag::None),
        ];

        let ranges = [
            &blob_descriptions[0..2],
            &blob_descriptions[2..4],
            &blob_descriptions[4..6],
            &blob_descriptions[6..],
        ];

        let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 2);
        let mut reads = Vec::new();
        for (key, lsn, offset, _) in blob_descriptions.clone() {
            reads.extend(planner.handle(key, lsn, offset));
        }
        reads.extend(planner.handle_range_end(652 * 1024));

        assert_eq!(reads.len(), ranges.len());

        for (idx, read) in reads.iter().enumerate() {
            validate_read(read, ranges[idx]);
        }
    }

    /// Edge cases: empty range, a single blob, and `max_cnt` of 1 vs 2.
    #[test]
    fn streaming_planner_edge_test() {
        let max_read_size = 1024 * 1024;
        let key = Key::MIN;
        let lsn = Lsn(0);
        {
            let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
            let mut reads = Vec::new();
            reads.extend(planner.handle_range_end(652 * 1024));
            assert!(reads.is_empty());
        }
        {
            let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
            let mut reads = Vec::new();
            reads.extend(planner.handle(key, lsn, 0));
            reads.extend(planner.handle_range_end(652 * 1024));
            assert_eq!(reads.len(), 1);
            validate_read(&reads[0], &[(key, lsn, 0, BlobFlag::None)]);
        }
        {
            let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
            let mut reads = Vec::new();
            reads.extend(planner.handle(key, lsn, 0));
            reads.extend(planner.handle(key, lsn, 128 * 1024));
            reads.extend(planner.handle_range_end(652 * 1024));
            assert_eq!(reads.len(), 2);
            validate_read(&reads[0], &[(key, lsn, 0, BlobFlag::None)]);
            validate_read(&reads[1], &[(key, lsn, 128 * 1024, BlobFlag::None)]);
        }
        {
            let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 2);
            let mut reads = Vec::new();
            reads.extend(planner.handle(key, lsn, 0));
            reads.extend(planner.handle(key, lsn, 128 * 1024));
            reads.extend(planner.handle_range_end(652 * 1024));
            assert_eq!(reads.len(), 1);
            validate_read(
                &reads[0],
                &[
                    (key, lsn, 0, BlobFlag::None),
                    (key, lsn, 128 * 1024, BlobFlag::None),
                ],
            );
        }
    }

    /// Writes `blobs` to a temp file (optionally compressed), then reads each blob
    /// back with a single-blob vectored read and checks it matches.
    /// The last blob is skipped (see the `continue` in the loop), and one IO buffer
    /// is reused across all reads.
    async fn round_trip_test_compressed(blobs: &[Vec<u8>], compression: bool) -> Result<(), Error> {
        let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
        let (_temp_dir, pathbuf, offsets) =
            write_maybe_compressed::<true>(blobs, compression, &ctx).await?;

        let file = VirtualFile::open(&pathbuf, &ctx).await?;
        let file_len = std::fs::metadata(&pathbuf)?.len();

        // Multiply by two (compressed data might need more space), and add a few bytes for the header
        let reserved_bytes = blobs.iter().map(|bl| bl.len()).max().unwrap() * 2 + 16;
        let mut buf = IoBufferMut::with_capacity(reserved_bytes);

        let vectored_blob_reader = VectoredBlobReader::new(&file);
        let meta = BlobMeta {
            key: Key::MIN,
            lsn: Lsn(0),
        };

        for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
            let end = offsets.get(idx + 1).unwrap_or(&file_len);
            if idx + 1 == offsets.len() {
                continue;
            }
            let read_builder = ChunkedVectoredReadBuilder::new(*offset, *end, meta, 16 * 4096);
            let read = read_builder.build();
            let result = vectored_blob_reader.read_blobs(&read, buf, &ctx).await?;
            assert_eq!(result.blobs.len(), 1);
            let read_blob = &result.blobs[0];
            let view = BufView::new_slice(&result.buf);
            let read_buf = read_blob.read(&view).await?;
            assert_eq!(
                &blob[..],
                &read_buf[..],
                "mismatch for idx={idx} at offset={offset}"
            );
            // Hand the buffer back so the next iteration reuses it.
            buf = result.buf;
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_really_big_array() -> Result<(), Error> {
        let blobs = &[
            b"test".to_vec(),
            random_array(10 * PAGE_SZ),
            b"hello".to_vec(),
            random_array(66 * PAGE_SZ),
            vec![0xf3; 24 * PAGE_SZ],
            b"foobar".to_vec(),
        ];
        round_trip_test_compressed(blobs, false).await?;
        round_trip_test_compressed(blobs, true).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_arrays_inc() -> Result<(), Error> {
        let blobs = (0..PAGE_SZ / 8)
            .map(|v| random_array(v * 16))
            .collect::<Vec<_>>();
        round_trip_test_compressed(&blobs, false).await?;
        round_trip_test_compressed(&blobs, true).await?;
        Ok(())
    }
}
|