Line data Source code
1 : //!
2 : //! Utilities for vectored reading of variable-sized "blobs".
3 : //!
4 : //! The "blob" api is an abstraction on top of the "block" api,
5 : //! with the main difference being that blobs do not have a fixed
6 : //! size (each blob is prefixed with a 1- or 4-byte length field)
7 : //!
8 : //! The vectored apis provided in this module allow for planning
9 : //! and executing disk IO which covers multiple blobs.
10 : //!
11 : //! Reads are planned with [`VectoredReadPlanner`] which will coalesce
12 : //! adjacent blocks into a single disk IO request, and executed by
13 : //! [`VectoredBlobReader`] which does all the required offset juggling
14 : //! and returns a buffer housing all the blobs and a list of offsets.
15 : //!
16 : //! Note that the vectored blob api does *not* go through the page cache.
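//!
//! A rough usage sketch of the flow described above (not a doctest; setup of the
//! `VirtualFile`, the `RequestContext` and the various offsets is omitted, and the
//! variable names are illustrative only):
//!
//! ```ignore
//! // Plan: one `handle` call per blob, in key/LSN order, then close the range.
//! let mut planner = VectoredReadPlanner::new(max_read_size);
//! planner.handle(key, lsn, blob_start_offset, BlobFlag::None);
//! planner.handle_range_end(range_end_offset);
//! let reads = planner.finish();
//!
//! // Execute: one disk IO per coalesced read, then slice out each blob.
//! let reader = VectoredBlobReader::new(&virtual_file);
//! for read in reads {
//!     let buf = IoBufferMut::with_capacity(read.size());
//!     let blobs_buf = reader.read_blobs(&read, buf, &ctx).await?;
//!     let view = BufView::new_slice(&blobs_buf.buf);
//!     for blob in &blobs_buf.blobs {
//!         let data = blob.read(&view).await?; // decompresses if needed
//!     }
//! }
//! ```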
17 :
18 : use std::collections::BTreeMap;
19 : use std::ops::Deref;
20 :
21 : use bytes::Bytes;
22 : use pageserver_api::key::Key;
23 : use tokio::io::AsyncWriteExt;
24 : use tokio_epoll_uring::BoundedBuf;
25 : use utils::lsn::Lsn;
26 : use utils::vec_map::VecMap;
27 :
28 : use crate::context::RequestContext;
29 : use crate::tenant::blob_io::{BYTE_UNCOMPRESSED, BYTE_ZSTD, Header};
30 : use crate::virtual_file::{self, IoBufferMut, VirtualFile};
31 :
32 : /// Metadata bundled with the start and end offset of a blob.
33 : #[derive(Copy, Clone, Debug)]
34 : pub struct BlobMeta {
35 : pub key: Key,
36 : pub lsn: Lsn,
37 : pub will_init: bool,
38 : }
39 :
40 : /// A view into the vectored blobs read buffer.
41 : #[derive(Clone, Debug)]
42 : pub(crate) enum BufView<'a> {
43 : Slice(&'a [u8]),
44 : Bytes(bytes::Bytes),
45 : }
46 :
47 : impl<'a> BufView<'a> {
48 : /// Creates a new slice-based view on the blob.
49 538595 : pub fn new_slice(slice: &'a [u8]) -> Self {
50 538595 : Self::Slice(slice)
51 538595 : }
52 :
53 : /// Creates a new [`bytes::Bytes`]-based view on the blob.
54 4 : pub fn new_bytes(bytes: bytes::Bytes) -> Self {
55 4 : Self::Bytes(bytes)
56 4 : }
57 :
58 : /// Convert the view into `Bytes`.
59 : ///
60 : /// If the underlying storage is a slice, the conversion copies the data (an O(n) operation).
61 3420727 : pub fn into_bytes(self) -> Bytes {
62 3420727 : match self {
63 3420727 : BufView::Slice(slice) => Bytes::copy_from_slice(slice),
64 0 : BufView::Bytes(bytes) => bytes,
65 : }
66 3420727 : }
67 :
68 : /// Creates a sub-view of the blob based on the range.
69 7749955 : fn view(&self, range: std::ops::Range<usize>) -> Self {
70 7749955 : match self {
71 7749955 : BufView::Slice(slice) => BufView::Slice(&slice[range]),
72 0 : BufView::Bytes(bytes) => BufView::Bytes(bytes.slice(range)),
73 : }
74 7749955 : }
75 : }
76 :
77 : impl Deref for BufView<'_> {
78 : type Target = [u8];
79 :
80 4337584 : fn deref(&self) -> &Self::Target {
81 4337584 : match self {
82 4337580 : BufView::Slice(slice) => slice,
83 4 : BufView::Bytes(bytes) => bytes,
84 : }
85 4337584 : }
86 : }
87 :
88 : impl AsRef<[u8]> for BufView<'_> {
89 0 : fn as_ref(&self) -> &[u8] {
90 0 : match self {
91 0 : BufView::Slice(slice) => slice,
92 0 : BufView::Bytes(bytes) => bytes.as_ref(),
93 : }
94 0 : }
95 : }
96 :
97 : impl<'a> From<&'a [u8]> for BufView<'a> {
98 0 : fn from(value: &'a [u8]) -> Self {
99 0 : Self::new_slice(value)
100 0 : }
101 : }
102 :
103 : impl From<Bytes> for BufView<'_> {
104 0 : fn from(value: Bytes) -> Self {
105 0 : Self::new_bytes(value)
106 0 : }
107 : }
108 :
109 : /// Blob offsets into [`VectoredBlobsBuf::buf`]. The byte range is potentially compressed,
110 : /// subject to [`VectoredBlob::compression_bits`].
111 : pub struct VectoredBlob {
112 : /// Blob metadata.
113 : pub meta: BlobMeta,
114 : /// Header start offset.
115 : header_start: usize,
116 : /// Data start offset.
117 : data_start: usize,
118 : /// End offset.
119 : end: usize,
120 : /// Compression used on the data, extracted from the header.
121 : compression_bits: u8,
122 : }
123 :
124 : impl VectoredBlob {
125 : /// Reads a decompressed view of the blob.
126 7741731 : pub(crate) async fn read<'a>(&self, buf: &BufView<'a>) -> Result<BufView<'a>, std::io::Error> {
127 7741731 : let view = buf.view(self.data_start..self.end);
128 7741731 :
129 7741731 : match self.compression_bits {
130 7741727 : BYTE_UNCOMPRESSED => Ok(view),
131 : BYTE_ZSTD => {
132 4 : let mut decompressed_vec = Vec::new();
133 4 : let mut decoder =
134 4 : async_compression::tokio::write::ZstdDecoder::new(&mut decompressed_vec);
135 4 : decoder.write_all(&view).await?;
136 4 : decoder.flush().await?;
137 : // Zero-copy conversion from `Vec` to `Bytes`
138 4 : Ok(BufView::new_bytes(Bytes::from(decompressed_vec)))
139 : }
140 0 : bits => {
141 0 : let error = std::io::Error::new(
142 0 : std::io::ErrorKind::InvalidData,
143 0 : format!(
144 0 : "Failed to decompress blob for {}@{}, {}..{}: invalid compression byte {bits:x}",
145 0 : self.meta.key, self.meta.lsn, self.data_start, self.end
146 0 : ),
147 0 : );
148 0 : Err(error)
149 : }
150 : }
151 7741731 : }
152 :
153 : /// Returns the raw blob including header.
154 : #[allow(unused)]
155 8224 : pub(crate) fn raw_with_header<'a>(&self, buf: &BufView<'a>) -> BufView<'a> {
156 8224 : buf.view(self.header_start..self.end)
157 8224 : }
158 : }
159 :
160 : impl std::fmt::Display for VectoredBlob {
161 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
162 0 : write!(
163 0 : f,
164 0 : "{}@{}, {}..{}",
165 0 : self.meta.key, self.meta.lsn, self.data_start, self.end
166 0 : )
167 0 : }
168 : }
169 :
170 : /// Return type of [`VectoredBlobReader::read_blobs`]
171 : pub struct VectoredBlobsBuf {
172 : /// Buffer for all blobs in this read
173 : pub buf: IoBufferMut,
174 : /// Offsets into the buffer and metadata for all blobs in this read
175 : pub blobs: Vec<VectoredBlob>,
176 : }
177 :
178 : /// Description of one disk read for multiple blobs.
179 : /// Used as the argument for [`VectoredBlobReader::read_blobs`]
180 : #[derive(Debug)]
181 : pub struct VectoredRead {
182 : pub start: u64,
183 : pub end: u64,
184 : /// Start offset and metadata for each blob in this read
185 : pub blobs_at: VecMap<u64, BlobMeta>,
186 : }
187 :
188 : impl VectoredRead {
189 2445442 : pub(crate) fn size(&self) -> usize {
190 2445442 : (self.end - self.start) as usize
191 2445442 : }
192 : }
193 :
194 : #[derive(Eq, PartialEq, Debug)]
195 : pub(crate) enum VectoredReadExtended {
196 : Yes,
197 : No,
198 : }
199 :
200 : /// A vectored read builder that tries to coalesce all reads that fit in a chunk.
201 : pub(crate) struct ChunkedVectoredReadBuilder {
202 : /// Start block number
203 : start_blk_no: usize,
204 : /// End block number (exclusive).
205 : end_blk_no: usize,
206 : /// Start offset and metadata for each blob in this read
207 : blobs_at: VecMap<u64, BlobMeta>,
208 : max_read_size: Option<usize>,
209 : }
210 :
211 : impl ChunkedVectoredReadBuilder {
212 : const CHUNK_SIZE: usize = virtual_file::get_io_buffer_alignment();
213 : /// Start building a new vectored read.
214 : ///
215 : /// Note that by design, this does not check against reading more than `max_read_size` to
216 : /// support reading larger blobs than the configuration value. However, the builder will be
217 : /// single-use after that.
218 538747 : fn new_impl(
219 538747 : start_offset: u64,
220 538747 : end_offset: u64,
221 538747 : meta: BlobMeta,
222 538747 : max_read_size: Option<usize>,
223 538747 : ) -> Self {
224 538747 : let mut blobs_at = VecMap::default();
225 538747 : blobs_at
226 538747 : .append(start_offset, meta)
227 538747 : .expect("First insertion always succeeds");
228 538747 :
229 538747 : let start_blk_no = start_offset as usize / Self::CHUNK_SIZE;
230 538747 : let end_blk_no = (end_offset as usize).div_ceil(Self::CHUNK_SIZE);
231 538747 : Self {
232 538747 : start_blk_no,
233 538747 : end_blk_no,
234 538747 : blobs_at,
235 538747 : max_read_size,
236 538747 : }
237 538747 : }
238 :
239 457623 : pub(crate) fn new(
240 457623 : start_offset: u64,
241 457623 : end_offset: u64,
242 457623 : meta: BlobMeta,
243 457623 : max_read_size: usize,
244 457623 : ) -> Self {
245 457623 : Self::new_impl(start_offset, end_offset, meta, Some(max_read_size))
246 457623 : }
247 :
248 81124 : pub(crate) fn new_streaming(start_offset: u64, end_offset: u64, meta: BlobMeta) -> Self {
249 81124 : Self::new_impl(start_offset, end_offset, meta, None)
250 81124 : }
251 :
252 : /// Attempts to extend the current read with a new blob if the new blob starts in the chunk where the read currently ends, or in the immediately following chunk.
253 : ///
254 : /// The resulting size also must be below the max read size.
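///
/// A worked example of the chunk arithmetic, assuming (purely for illustration) a
/// 512-byte chunk, a generous `max_read_size` and some `BlobMeta` value `meta`:
///
/// ```ignore
/// // Builder initially covers chunk 0 (bytes 0..512).
/// let mut b = ChunkedVectoredReadBuilder::new(0, 512, meta, 128 * 1024);
/// assert_eq!(b.extend(500, 600, meta), VectoredReadExtended::Yes);   // starts in the chunk the read ends in
/// assert_eq!(b.extend(1024, 1500, meta), VectoredReadExtended::Yes); // starts in the immediately following chunk
/// assert_eq!(b.extend(2048, 2100, meta), VectoredReadExtended::No);  // skips a whole chunk: not coalesced
/// ```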
255 7292845 : pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
256 7292845 : tracing::trace!(start, end, "trying to extend");
257 7292845 : let start_blk_no = start as usize / Self::CHUNK_SIZE;
258 7292845 : let end_blk_no = (end as usize).div_ceil(Self::CHUNK_SIZE);
259 :
260 7292845 : let not_limited_by_max_read_size = {
261 7292845 : if let Some(max_read_size) = self.max_read_size {
262 3117881 : let coalesced_size = (end_blk_no - self.start_blk_no) * Self::CHUNK_SIZE;
263 3117881 : coalesced_size <= max_read_size
264 : } else {
265 4174964 : true
266 : }
267 : };
268 :
269 : // True if the new blob starts in the chunk where the current read ends, or in the immediately following chunk.
270 : //
271 : // Note: this automatically handles the case where two blobs are adjacent to each other,
272 : // whether or not they start on a chunk size boundary.
273 7292845 : let is_adjacent_chunk_read = {
274 : // 1. first.end & second.start are in the same block
275 7292845 : self.end_blk_no == start_blk_no + 1 ||
276 : // 2. first.end ends one block before second.start
277 87407 : self.end_blk_no == start_blk_no
278 : };
279 :
280 7292845 : if is_adjacent_chunk_read && not_limited_by_max_read_size {
281 7203208 : self.end_blk_no = end_blk_no;
282 7203208 : self.blobs_at
283 7203208 : .append(start, meta)
284 7203208 : .expect("LSNs are ordered within vectored reads");
285 7203208 :
286 7203208 : return VectoredReadExtended::Yes;
287 89637 : }
288 89637 :
289 89637 : VectoredReadExtended::No
290 7292845 : }
291 :
292 4254776 : pub(crate) fn size(&self) -> usize {
293 4254776 : (self.end_blk_no - self.start_blk_no) * Self::CHUNK_SIZE
294 4254776 : }
295 :
296 538747 : pub(crate) fn build(self) -> VectoredRead {
297 538747 : let start = (self.start_blk_no * Self::CHUNK_SIZE) as u64;
298 538747 : let end = (self.end_blk_no * Self::CHUNK_SIZE) as u64;
299 538747 : VectoredRead {
300 538747 : start,
301 538747 : end,
302 538747 : blobs_at: self.blobs_at,
303 538747 : }
304 538747 : }
305 : }
306 :
307 : #[derive(Copy, Clone, Debug)]
308 : pub enum BlobFlag {
309 : None,
310 : Ignore,
311 : ReplaceAll,
312 : }
313 :
314 : /// Planner for vectored blob reads.
315 : ///
316 : /// Blob offsets are received via [`VectoredReadPlanner::handle`]
317 : /// and coalesced into disk reads.
318 : ///
319 : /// The implementation is very simple:
320 : /// * Collect all blob offsets in an ordered structure
321 : /// * Iterate over the collected blobs and coalesce them into reads at the end
322 : pub struct VectoredReadPlanner {
323 : // Track all the blob offsets. Start offsets must be ordered.
324 : // Values in the value tuples are:
325 : // (
326 : // lsn of the blob,
327 : // start offset of the blob in the underlying file,
328 : // end offset of the blob in the underlying file,
329 : // whether the blob initializes the page image or not
330 : // see [`pageserver_api::record::NeonWalRecord::will_init`]
331 : // )
332 : blobs: BTreeMap<Key, Vec<(Lsn, u64, u64, bool)>>,
333 : // Arguments for previous blob passed into [`VectoredReadPlanner::handle`]
334 : prev: Option<(Key, Lsn, u64, BlobFlag)>,
335 :
336 : max_read_size: usize,
337 : }
338 :
339 : impl VectoredReadPlanner {
340 531925 : pub fn new(max_read_size: usize) -> Self {
341 531925 : Self {
342 531925 : blobs: BTreeMap::new(),
343 531925 : prev: None,
344 531925 : max_read_size,
345 531925 : }
346 531925 : }
347 :
348 : /// Include a new blob in the read plan.
349 : ///
350 : /// This function is called from a B-Tree index visitor (see `DeltaLayerInner::plan_reads`
351 : /// and `ImageLayerInner::plan_reads`). Said visitor wants to collect blob offsets for all
352 : /// keys in a given keyspace. This function must be called for each key in the desired
353 : /// keyspace (monotonically continuous). [`Self::handle_range_end`] must
354 : /// be called after every range in the offset.
355 : ///
356 : /// In the event that keys are skipped, the behaviour is undefined and can lead to an
357 : /// incorrect read plan. We can end up asserting, erroring in wal redo or returning
358 : /// incorrect data to the user.
359 : ///
360 : /// The `flag` argument has two interesting values:
361 : /// * [`BlobFlag::ReplaceAll`]: The blob for this key should replace all existing blobs.
362 : /// This is used for WAL records that `will_init`.
363 : /// * [`BlobFlag::Ignore`]: This blob should not be included in the read. This happens
364 : /// if the blob is cached.
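///
/// For example (a sketch; the key, LSNs and offsets are illustrative), given a
/// `VectoredReadPlanner` in `planner` and three records for the same key where the
/// second one is a `will_init` record:
///
/// ```ignore
/// planner.handle(key, Lsn(0x10), 0, BlobFlag::None);
/// planner.handle(key, Lsn(0x20), 100, BlobFlag::ReplaceAll);
/// planner.handle(key, Lsn(0x30), 200, BlobFlag::None);
/// planner.handle_range_end(300);
/// // `finish` now plans reads covering only the Lsn(0x20) and Lsn(0x30) blobs;
/// // the Lsn(0x10) blob was dropped by the `ReplaceAll`.
/// ```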
365 6999698 : pub fn handle(&mut self, key: Key, lsn: Lsn, offset: u64, flag: BlobFlag) {
366 : // Implementation note: internally lag behind by one blob such that
367 : // we have a start and end offset when initialising [`VectoredRead`]
368 6999698 : let (prev_key, prev_lsn, prev_offset, prev_flag) = match self.prev {
369 : None => {
370 399114 : self.prev = Some((key, lsn, offset, flag));
371 399114 : return;
372 : }
373 6600584 : Some(prev) => prev,
374 6600584 : };
375 6600584 :
376 6600584 : self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
377 6600584 :
378 6600584 : self.prev = Some((key, lsn, offset, flag));
379 6999698 : }
380 :
381 571361 : pub fn handle_range_end(&mut self, offset: u64) {
382 571361 : if let Some((prev_key, prev_lsn, prev_offset, prev_flag)) = self.prev {
383 399114 : self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
384 399114 : }
385 :
386 571361 : self.prev = None;
387 571361 : }
388 :
389 6999698 : fn add_blob(&mut self, key: Key, lsn: Lsn, start_offset: u64, end_offset: u64, flag: BlobFlag) {
390 6999698 : match flag {
391 4517056 : BlobFlag::None => {
392 4517056 : let blobs_for_key = self.blobs.entry(key).or_default();
393 4517056 : blobs_for_key.push((lsn, start_offset, end_offset, false));
394 4517056 : }
395 684081 : BlobFlag::ReplaceAll => {
396 684081 : let blobs_for_key = self.blobs.entry(key).or_default();
397 684081 : blobs_for_key.clear();
398 684081 : blobs_for_key.push((lsn, start_offset, end_offset, true));
399 684081 : }
400 1798561 : BlobFlag::Ignore => {}
401 : }
402 6999698 : }
403 :
404 531925 : pub fn finish(self) -> Vec<VectoredRead> {
405 531925 : let mut current_read_builder: Option<ChunkedVectoredReadBuilder> = None;
406 531925 : let mut reads = Vec::new();
407 :
408 1211840 : for (key, blobs_for_key) in self.blobs {
409 4157494 : for (lsn, start_offset, end_offset, will_init) in blobs_for_key {
410 3477579 : let extended = match &mut current_read_builder {
411 3117837 : Some(read_builder) => read_builder.extend(
412 3117837 : start_offset,
413 3117837 : end_offset,
414 3117837 : BlobMeta {
415 3117837 : key,
416 3117837 : lsn,
417 3117837 : will_init,
418 3117837 : },
419 3117837 : ),
420 359742 : None => VectoredReadExtended::No,
421 : };
422 :
423 3477579 : if extended == VectoredReadExtended::No {
424 449367 : let next_read_builder = ChunkedVectoredReadBuilder::new(
425 449367 : start_offset,
426 449367 : end_offset,
427 449367 : BlobMeta {
428 449367 : key,
429 449367 : lsn,
430 449367 : will_init,
431 449367 : },
432 449367 : self.max_read_size,
433 449367 : );
434 449367 :
435 449367 : let prev_read_builder = current_read_builder.replace(next_read_builder);
436 :
437 : // `current_read_builder` is None in the first iteration of the outer loop
438 449367 : if let Some(read_builder) = prev_read_builder {
439 89625 : reads.push(read_builder.build());
440 359742 : }
441 3028212 : }
442 : }
443 : }
444 :
445 531925 : if let Some(read_builder) = current_read_builder {
446 359742 : reads.push(read_builder.build());
447 359742 : }
448 :
449 531925 : reads
450 531925 : }
451 : }
452 :
453 : /// Disk reader for vectored blob spans (does not go through the page cache)
454 : pub struct VectoredBlobReader<'a> {
455 : file: &'a VirtualFile,
456 : }
457 :
458 : impl<'a> VectoredBlobReader<'a> {
459 491339 : pub fn new(file: &'a VirtualFile) -> Self {
460 491339 : Self { file }
461 491339 : }
462 :
463 : /// Read the requested blobs into the buffer.
464 : ///
465 : /// We have to deal with the fact that blobs are not fixed size.
466 : /// Each blob is prefixed by a size header.
467 : ///
468 : /// The success return value is a struct which contains the buffer
469 : /// filled from disk and a list of offsets at which each blob lies
470 : /// in the buffer.
471 538595 : pub async fn read_blobs(
472 538595 : &self,
473 538595 : read: &VectoredRead,
474 538595 : buf: IoBufferMut,
475 538595 : ctx: &RequestContext,
476 538595 : ) -> Result<VectoredBlobsBuf, std::io::Error> {
477 538595 : assert!(read.size() > 0);
478 538595 : assert!(
479 538595 : read.size() <= buf.capacity(),
480 0 : "{} > {}",
481 0 : read.size(),
482 0 : buf.capacity()
483 : );
484 :
485 538595 : if cfg!(debug_assertions) {
486 538595 : const ALIGN: u64 = virtual_file::get_io_buffer_alignment() as u64;
487 538595 : debug_assert_eq!(
488 538595 : read.start % ALIGN,
489 : 0,
490 0 : "Read start at {} does not satisfy the required io buffer alignment ({} bytes)",
491 : read.start,
492 : ALIGN
493 : );
494 0 : }
495 :
496 538595 : let buf = self
497 538595 : .file
498 538595 : .read_exact_at(buf.slice(0..read.size()), read.start, ctx)
499 538595 : .await?
500 538595 : .into_inner();
501 538595 :
502 538595 : let blobs_at = read.blobs_at.as_slice();
503 538595 :
504 538595 : let mut blobs = Vec::with_capacity(blobs_at.len());
505 : // Blobs in `read` only provide their starting offset. The length of
506 : // each blob is recovered by decoding its header from the buffer
507 : // that was just read.
508 :
509 7741731 : for (blob_start, meta) in blobs_at.iter().copied() {
510 7741731 : let header_start = (blob_start - read.start) as usize;
511 7741731 : let header = Header::decode(&buf[header_start..])?;
512 7741731 : let data_start = header_start + header.header_len;
513 7741731 : let end = data_start + header.data_len;
514 7741731 : let compression_bits = header.compression_bits;
515 7741731 :
516 7741731 : blobs.push(VectoredBlob {
517 7741731 : header_start,
518 7741731 : data_start,
519 7741731 : end,
520 7741731 : meta,
521 7741731 : compression_bits,
522 7741731 : });
523 : }
524 :
525 538595 : Ok(VectoredBlobsBuf { buf, blobs })
526 538595 : }
527 : }
528 :
529 : /// Read planner used in [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`].
530 : ///
531 : /// It provides a streaming API for getting read blobs. A batch is returned by
532 : /// `handle` once the blobs accumulated so far reach the `max_read_size` or `max_cnt`
533 : /// constraint.
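///
/// A usage sketch mirroring the iterator use case (names are illustrative; every
/// `Some` returned along the way is a complete batch ready to be executed):
///
/// ```ignore
/// let mut planner = StreamingVectoredReadPlanner::new(max_read_size, max_cnt);
/// let mut reads = Vec::new();
/// for (key, lsn, offset, will_init) in blob_offsets {
///     reads.extend(planner.handle(key, lsn, offset, will_init));
/// }
/// reads.extend(planner.handle_range_end(end_of_file_offset));
/// ```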
534 : pub struct StreamingVectoredReadPlanner {
535 : read_builder: Option<ChunkedVectoredReadBuilder>,
536 : // Arguments for previous blob passed into [`StreamingVectoredReadPlanner::handle`]
537 : prev: Option<(Key, Lsn, u64, bool)>,
538 : /// Max read size per batch. This is not a strict limit: if there are blobs at [0, 100) and [100, 200) while the `max_read_size` is 150,
539 : /// we will produce a single batch instead of splitting them.
540 : max_read_size: u64,
541 : /// Max item count per batch
542 : max_cnt: usize,
543 : /// Number of blobs in the current batch
544 : cnt: usize,
545 : }
546 :
547 : impl StreamingVectoredReadPlanner {
548 1652 : pub fn new(max_read_size: u64, max_cnt: usize) -> Self {
549 1652 : assert!(max_cnt > 0);
550 1652 : assert!(max_read_size > 0);
551 1652 : Self {
552 1652 : read_builder: None,
553 1652 : prev: None,
554 1652 : max_cnt,
555 1652 : max_read_size,
556 1652 : cnt: 0,
557 1652 : }
558 1652 : }
559 :
560 4256200 : pub fn handle(
561 4256200 : &mut self,
562 4256200 : key: Key,
563 4256200 : lsn: Lsn,
564 4256200 : offset: u64,
565 4256200 : will_init: bool,
566 4256200 : ) -> Option<VectoredRead> {
567 : // Implementation note: internally lag behind by one blob such that
568 : // we have a start and end offset when initialising [`VectoredRead`]
569 4256200 : let (prev_key, prev_lsn, prev_offset, prev_will_init) = match self.prev {
570 : None => {
571 1424 : self.prev = Some((key, lsn, offset, will_init));
572 1424 : return None;
573 : }
574 4254776 : Some(prev) => prev,
575 4254776 : };
576 4254776 :
577 4254776 : let res = self.add_blob(
578 4254776 : prev_key,
579 4254776 : prev_lsn,
580 4254776 : prev_offset,
581 4254776 : offset,
582 4254776 : false,
583 4254776 : prev_will_init,
584 4254776 : );
585 4254776 :
586 4254776 : self.prev = Some((key, lsn, offset, will_init));
587 4254776 :
588 4254776 : res
589 4256200 : }
590 :
591 1316 : pub fn handle_range_end(&mut self, offset: u64) -> Option<VectoredRead> {
592 1316 : let res = if let Some((prev_key, prev_lsn, prev_offset, prev_will_init)) = self.prev {
593 1312 : self.add_blob(
594 1312 : prev_key,
595 1312 : prev_lsn,
596 1312 : prev_offset,
597 1312 : offset,
598 1312 : true,
599 1312 : prev_will_init,
600 1312 : )
601 : } else {
602 4 : None
603 : };
604 :
605 1316 : self.prev = None;
606 1316 :
607 1316 : res
608 1316 : }
609 :
610 4256088 : fn add_blob(
611 4256088 : &mut self,
612 4256088 : key: Key,
613 4256088 : lsn: Lsn,
614 4256088 : start_offset: u64,
615 4256088 : end_offset: u64,
616 4256088 : is_last_blob_in_read: bool,
617 4256088 : will_init: bool,
618 4256088 : ) -> Option<VectoredRead> {
619 4256088 : match &mut self.read_builder {
620 4174964 : Some(read_builder) => {
621 4174964 : let extended = read_builder.extend(
622 4174964 : start_offset,
623 4174964 : end_offset,
624 4174964 : BlobMeta {
625 4174964 : key,
626 4174964 : lsn,
627 4174964 : will_init,
628 4174964 : },
629 4174964 : );
630 4174964 : assert_eq!(extended, VectoredReadExtended::Yes);
631 : }
632 81124 : None => {
633 81124 : self.read_builder = {
634 81124 : Some(ChunkedVectoredReadBuilder::new_streaming(
635 81124 : start_offset,
636 81124 : end_offset,
637 81124 : BlobMeta {
638 81124 : key,
639 81124 : lsn,
640 81124 : will_init,
641 81124 : },
642 81124 : ))
643 81124 : };
644 81124 : }
645 : }
646 4256088 : let read_builder = self.read_builder.as_mut().unwrap();
647 4256088 : self.cnt += 1;
648 4256088 : if is_last_blob_in_read
649 4254776 : || read_builder.size() >= self.max_read_size as usize
650 4197088 : || self.cnt >= self.max_cnt
651 : {
652 81124 : let prev_read_builder = self.read_builder.take();
653 81124 : self.cnt = 0;
654 :
655 : // `current_read_builder` is None in the first iteration
656 81124 : if let Some(read_builder) = prev_read_builder {
657 81124 : return Some(read_builder.build());
658 0 : }
659 4174964 : }
660 4174964 : None
661 4256088 : }
662 : }
663 :
664 : #[cfg(test)]
665 : mod tests {
666 : use anyhow::Error;
667 :
668 : use super::super::blob_io::tests::{random_array, write_maybe_compressed};
669 : use super::*;
670 : use crate::context::DownloadBehavior;
671 : use crate::page_cache::PAGE_SZ;
672 : use crate::task_mgr::TaskKind;
673 :
674 96 : fn validate_read(read: &VectoredRead, offset_range: &[(Key, Lsn, u64, BlobFlag)]) {
675 : const ALIGN: u64 = virtual_file::get_io_buffer_alignment() as u64;
676 96 : assert_eq!(read.start % ALIGN, 0);
677 96 : assert_eq!(read.start / ALIGN, offset_range.first().unwrap().2 / ALIGN);
678 :
679 168 : let expected_offsets_in_read: Vec<_> = offset_range.iter().map(|o| o.2).collect();
680 96 :
681 96 : let offsets_in_read: Vec<_> = read
682 96 : .blobs_at
683 96 : .as_slice()
684 96 : .iter()
685 168 : .map(|(offset, _)| *offset)
686 96 : .collect();
687 96 :
688 96 : assert_eq!(expected_offsets_in_read, offsets_in_read);
689 96 : }
690 :
691 : #[test]
692 4 : fn planner_chunked_coalesce_all_test() {
693 : use crate::virtual_file;
694 :
695 : const CHUNK_SIZE: u64 = virtual_file::get_io_buffer_alignment() as u64;
696 :
697 4 : let max_read_size = CHUNK_SIZE as usize * 8;
698 4 : let key = Key::MIN;
699 4 : let lsn = Lsn(0);
700 4 :
701 4 : let blob_descriptions = [
702 4 : (key, lsn, CHUNK_SIZE / 8, BlobFlag::None), // Read 1 BEGIN
703 4 : (key, lsn, CHUNK_SIZE / 4, BlobFlag::Ignore), // Gap
704 4 : (key, lsn, CHUNK_SIZE / 2, BlobFlag::None),
705 4 : (key, lsn, CHUNK_SIZE - 2, BlobFlag::Ignore), // Gap
706 4 : (key, lsn, CHUNK_SIZE, BlobFlag::None),
707 4 : (key, lsn, CHUNK_SIZE * 2 - 1, BlobFlag::None),
708 4 : (key, lsn, CHUNK_SIZE * 2 + 1, BlobFlag::Ignore), // Gap
709 4 : (key, lsn, CHUNK_SIZE * 3 + 1, BlobFlag::None),
710 4 : (key, lsn, CHUNK_SIZE * 5 + 1, BlobFlag::None),
711 4 : (key, lsn, CHUNK_SIZE * 6 + 1, BlobFlag::Ignore), // skipped chunk size, but not a chunk: should coalesce.
712 4 : (key, lsn, CHUNK_SIZE * 7 + 1, BlobFlag::None),
713 4 : (key, lsn, CHUNK_SIZE * 8, BlobFlag::None), // Read 2 BEGIN (b/c max_read_size)
714 4 : (key, lsn, CHUNK_SIZE * 9, BlobFlag::Ignore), // ==== skipped a chunk
715 4 : (key, lsn, CHUNK_SIZE * 10, BlobFlag::None), // Read 3 BEGIN (cannot coalesce)
716 4 : ];
717 4 :
718 4 : let ranges = [
719 4 : &[
720 4 : blob_descriptions[0],
721 4 : blob_descriptions[2],
722 4 : blob_descriptions[4],
723 4 : blob_descriptions[5],
724 4 : blob_descriptions[7],
725 4 : blob_descriptions[8],
726 4 : blob_descriptions[10],
727 4 : ],
728 4 : &blob_descriptions[11..12],
729 4 : &blob_descriptions[13..],
730 4 : ];
731 4 :
732 4 : let mut planner = VectoredReadPlanner::new(max_read_size);
733 60 : for (key, lsn, offset, flag) in blob_descriptions {
734 56 : planner.handle(key, lsn, offset, flag);
735 56 : }
736 :
737 4 : planner.handle_range_end(652 * 1024);
738 4 :
739 4 : let reads = planner.finish();
740 4 :
741 4 : assert_eq!(reads.len(), ranges.len());
742 :
743 12 : for (idx, read) in reads.iter().enumerate() {
744 12 : validate_read(read, ranges[idx]);
745 12 : }
746 4 : }
747 :
748 : #[test]
749 4 : fn planner_max_read_size_test() {
750 4 : let max_read_size = 128 * 1024;
751 4 : let key = Key::MIN;
752 4 : let lsn = Lsn(0);
753 4 :
754 4 : let blob_descriptions = vec![
755 4 : (key, lsn, 0, BlobFlag::None),
756 4 : (key, lsn, 32 * 1024, BlobFlag::None),
757 4 : (key, lsn, 96 * 1024, BlobFlag::None), // Last in read 1
758 4 : (key, lsn, 128 * 1024, BlobFlag::None), // Last in read 2
759 4 : (key, lsn, 198 * 1024, BlobFlag::None), // Last in read 3
760 4 : (key, lsn, 268 * 1024, BlobFlag::None), // Last in read 4
761 4 : (key, lsn, 396 * 1024, BlobFlag::None), // Last in read 5
762 4 : (key, lsn, 652 * 1024, BlobFlag::None), // Last in read 6
763 4 : ];
764 4 :
765 4 : let ranges = [
766 4 : &blob_descriptions[0..3],
767 4 : &blob_descriptions[3..4],
768 4 : &blob_descriptions[4..5],
769 4 : &blob_descriptions[5..6],
770 4 : &blob_descriptions[6..7],
771 4 : &blob_descriptions[7..],
772 4 : ];
773 4 :
774 4 : let mut planner = VectoredReadPlanner::new(max_read_size);
775 32 : for (key, lsn, offset, flag) in blob_descriptions.clone() {
776 32 : planner.handle(key, lsn, offset, flag);
777 32 : }
778 :
779 4 : planner.handle_range_end(652 * 1024);
780 4 :
781 4 : let reads = planner.finish();
782 4 :
783 4 : assert_eq!(reads.len(), 6);
784 :
785 : // TODO: could remove zero reads to produce 5 reads here
786 :
787 24 : for (idx, read) in reads.iter().enumerate() {
788 24 : validate_read(read, ranges[idx]);
789 24 : }
790 4 : }
791 :
792 : #[test]
793 4 : fn planner_replacement_test() {
794 : const CHUNK_SIZE: u64 = virtual_file::get_io_buffer_alignment() as u64;
795 4 : let max_read_size = 128 * CHUNK_SIZE as usize;
796 4 : let first_key = Key::MIN;
797 4 : let second_key = first_key.next();
798 4 : let lsn = Lsn(0);
799 4 :
800 4 : let blob_descriptions = vec![
801 4 : (first_key, lsn, 0, BlobFlag::None), // First in read 1
802 4 : (first_key, lsn, CHUNK_SIZE, BlobFlag::None), // Last in read 1
803 4 : (second_key, lsn, 2 * CHUNK_SIZE, BlobFlag::ReplaceAll),
804 4 : (second_key, lsn, 3 * CHUNK_SIZE, BlobFlag::None),
805 4 : (second_key, lsn, 4 * CHUNK_SIZE, BlobFlag::ReplaceAll), // First in read 2
806 4 : (second_key, lsn, 5 * CHUNK_SIZE, BlobFlag::None), // Last in read 2
807 4 : ];
808 4 :
809 4 : let ranges = [&blob_descriptions[0..2], &blob_descriptions[4..]];
810 4 :
811 4 : let mut planner = VectoredReadPlanner::new(max_read_size);
812 24 : for (key, lsn, offset, flag) in blob_descriptions.clone() {
813 24 : planner.handle(key, lsn, offset, flag);
814 24 : }
815 :
816 4 : planner.handle_range_end(6 * CHUNK_SIZE);
817 4 :
818 4 : let reads = planner.finish();
819 4 : assert_eq!(reads.len(), 2);
820 :
821 8 : for (idx, read) in reads.iter().enumerate() {
822 8 : validate_read(read, ranges[idx]);
823 8 : }
824 4 : }
825 :
826 : #[test]
827 4 : fn streaming_planner_max_read_size_test() {
828 4 : let max_read_size = 128 * 1024;
829 4 : let key = Key::MIN;
830 4 : let lsn = Lsn(0);
831 4 :
832 4 : let blob_descriptions = vec![
833 4 : (key, lsn, 0, BlobFlag::None),
834 4 : (key, lsn, 32 * 1024, BlobFlag::None),
835 4 : (key, lsn, 96 * 1024, BlobFlag::None),
836 4 : (key, lsn, 128 * 1024, BlobFlag::None),
837 4 : (key, lsn, 198 * 1024, BlobFlag::None),
838 4 : (key, lsn, 268 * 1024, BlobFlag::None),
839 4 : (key, lsn, 396 * 1024, BlobFlag::None),
840 4 : (key, lsn, 652 * 1024, BlobFlag::None),
841 4 : ];
842 4 :
843 4 : let ranges = [
844 4 : &blob_descriptions[0..3],
845 4 : &blob_descriptions[3..5],
846 4 : &blob_descriptions[5..6],
847 4 : &blob_descriptions[6..7],
848 4 : &blob_descriptions[7..],
849 4 : ];
850 4 :
851 4 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1000);
852 4 : let mut reads = Vec::new();
853 32 : for (key, lsn, offset, _) in blob_descriptions.clone() {
854 32 : reads.extend(planner.handle(key, lsn, offset, false));
855 32 : }
856 4 : reads.extend(planner.handle_range_end(652 * 1024));
857 4 :
858 4 : assert_eq!(reads.len(), ranges.len());
859 :
860 20 : for (idx, read) in reads.iter().enumerate() {
861 20 : validate_read(read, ranges[idx]);
862 20 : }
863 4 : }
864 :
865 : #[test]
866 4 : fn streaming_planner_max_cnt_test() {
867 4 : let max_read_size = 1024 * 1024;
868 4 : let key = Key::MIN;
869 4 : let lsn = Lsn(0);
870 4 :
871 4 : let blob_descriptions = vec![
872 4 : (key, lsn, 0, BlobFlag::None),
873 4 : (key, lsn, 32 * 1024, BlobFlag::None),
874 4 : (key, lsn, 96 * 1024, BlobFlag::None),
875 4 : (key, lsn, 128 * 1024, BlobFlag::None),
876 4 : (key, lsn, 198 * 1024, BlobFlag::None),
877 4 : (key, lsn, 268 * 1024, BlobFlag::None),
878 4 : (key, lsn, 396 * 1024, BlobFlag::None),
879 4 : (key, lsn, 652 * 1024, BlobFlag::None),
880 4 : ];
881 4 :
882 4 : let ranges = [
883 4 : &blob_descriptions[0..2],
884 4 : &blob_descriptions[2..4],
885 4 : &blob_descriptions[4..6],
886 4 : &blob_descriptions[6..],
887 4 : ];
888 4 :
889 4 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 2);
890 4 : let mut reads = Vec::new();
891 32 : for (key, lsn, offset, _) in blob_descriptions.clone() {
892 32 : reads.extend(planner.handle(key, lsn, offset, false));
893 32 : }
894 4 : reads.extend(planner.handle_range_end(652 * 1024));
895 4 :
896 4 : assert_eq!(reads.len(), ranges.len());
897 :
898 16 : for (idx, read) in reads.iter().enumerate() {
899 16 : validate_read(read, ranges[idx]);
900 16 : }
901 4 : }
902 :
903 : #[test]
904 4 : fn streaming_planner_edge_test() {
905 4 : let max_read_size = 1024 * 1024;
906 4 : let key = Key::MIN;
907 4 : let lsn = Lsn(0);
908 4 : {
909 4 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
910 4 : let mut reads = Vec::new();
911 4 : reads.extend(planner.handle_range_end(652 * 1024));
912 4 : assert!(reads.is_empty());
913 : }
914 : {
915 4 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
916 4 : let mut reads = Vec::new();
917 4 : reads.extend(planner.handle(key, lsn, 0, false));
918 4 : reads.extend(planner.handle_range_end(652 * 1024));
919 4 : assert_eq!(reads.len(), 1);
920 4 : validate_read(&reads[0], &[(key, lsn, 0, BlobFlag::None)]);
921 4 : }
922 4 : {
923 4 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
924 4 : let mut reads = Vec::new();
925 4 : reads.extend(planner.handle(key, lsn, 0, false));
926 4 : reads.extend(planner.handle(key, lsn, 128 * 1024, false));
927 4 : reads.extend(planner.handle_range_end(652 * 1024));
928 4 : assert_eq!(reads.len(), 2);
929 4 : validate_read(&reads[0], &[(key, lsn, 0, BlobFlag::None)]);
930 4 : validate_read(&reads[1], &[(key, lsn, 128 * 1024, BlobFlag::None)]);
931 4 : }
932 4 : {
933 4 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 2);
934 4 : let mut reads = Vec::new();
935 4 : reads.extend(planner.handle(key, lsn, 0, false));
936 4 : reads.extend(planner.handle(key, lsn, 128 * 1024, false));
937 4 : reads.extend(planner.handle_range_end(652 * 1024));
938 4 : assert_eq!(reads.len(), 1);
939 4 : validate_read(
940 4 : &reads[0],
941 4 : &[
942 4 : (key, lsn, 0, BlobFlag::None),
943 4 : (key, lsn, 128 * 1024, BlobFlag::None),
944 4 : ],
945 4 : );
946 4 : }
947 4 : }
948 :
949 16 : async fn round_trip_test_compressed(blobs: &[Vec<u8>], compression: bool) -> Result<(), Error> {
950 16 : let ctx =
951 16 : RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
952 16 : let (_temp_dir, pathbuf, offsets) =
953 16 : write_maybe_compressed::<true>(blobs, compression, &ctx).await?;
954 :
955 16 : let file = VirtualFile::open(&pathbuf, &ctx).await?;
956 16 : let file_len = std::fs::metadata(&pathbuf)?.len();
957 16 :
958 16 : // Multiply by two (compressed data might need more space), and add a few bytes for the header
959 8240 : let reserved_bytes = blobs.iter().map(|bl| bl.len()).max().unwrap() * 2 + 16;
960 16 : let mut buf = IoBufferMut::with_capacity(reserved_bytes);
961 16 :
962 16 : let vectored_blob_reader = VectoredBlobReader::new(&file);
963 16 : let meta = BlobMeta {
964 16 : key: Key::MIN,
965 16 : lsn: Lsn(0),
966 16 : will_init: false,
967 16 : };
968 :
969 8240 : for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
970 8240 : let end = offsets.get(idx + 1).unwrap_or(&file_len);
971 8240 : if idx + 1 == offsets.len() {
972 16 : continue;
973 8224 : }
974 8224 : let read_builder = ChunkedVectoredReadBuilder::new(*offset, *end, meta, 16 * 4096);
975 8224 : let read = read_builder.build();
976 8224 : let result = vectored_blob_reader.read_blobs(&read, buf, &ctx).await?;
977 8224 : assert_eq!(result.blobs.len(), 1);
978 8224 : let read_blob = &result.blobs[0];
979 8224 : let view = BufView::new_slice(&result.buf);
980 8224 : let read_buf = read_blob.read(&view).await?;
981 8224 : assert_eq!(
982 8224 : &blob[..],
983 8224 : &read_buf[..],
984 0 : "mismatch for idx={idx} at offset={offset}"
985 : );
986 :
987 : // Check that raw_with_header returns a valid header.
988 8224 : let raw = read_blob.raw_with_header(&view);
989 8224 : let header = Header::decode(&raw)?;
990 8224 : if !compression || header.header_len == 1 {
991 4152 : assert_eq!(header.compression_bits, BYTE_UNCOMPRESSED);
992 4072 : }
993 8224 : assert_eq!(raw.len(), header.total_len());
994 :
995 8224 : buf = result.buf;
996 : }
997 16 : Ok(())
998 16 : }
999 :
1000 : #[tokio::test]
1001 4 : async fn test_really_big_array() -> Result<(), Error> {
1002 4 : let blobs = &[
1003 4 : b"test".to_vec(),
1004 4 : random_array(10 * PAGE_SZ),
1005 4 : b"hello".to_vec(),
1006 4 : random_array(66 * PAGE_SZ),
1007 4 : vec![0xf3; 24 * PAGE_SZ],
1008 4 : b"foobar".to_vec(),
1009 4 : ];
1010 4 : round_trip_test_compressed(blobs, false).await?;
1011 4 : round_trip_test_compressed(blobs, true).await?;
1012 4 : Ok(())
1013 4 : }
1014 :
1015 : #[tokio::test]
1016 4 : async fn test_arrays_inc() -> Result<(), Error> {
1017 4 : let blobs = (0..PAGE_SZ / 8)
1018 4096 : .map(|v| random_array(v * 16))
1019 4 : .collect::<Vec<_>>();
1020 4 : round_trip_test_compressed(&blobs, false).await?;
1021 4 : round_trip_test_compressed(&blobs, true).await?;
1022 4 : Ok(())
1023 4 : }
1024 : }