Line data Source code
1 : //!
2 : //! Utilities for vectored reading of variable-sized "blobs".
3 : //!
4 : //! The "blob" api is an abstraction on top of the "block" api,
5 : //! with the main difference being that blobs do not have a fixed
6 : //! size (each blob is prefixed with a 1 or 4 byte length field).
7 : //!
8 : //! The vectored apis provided in this module allow for planning
9 : //! and executing disk IO which covers multiple blobs.
10 : //!
11 : //! Reads are planned with [`VectoredReadPlanner`], which coalesces
12 : //! adjacent blocks into a single disk IO request, and executed by
13 : //! [`VectoredBlobReader`], which does all the required offset juggling
14 : //! and returns a buffer housing all the blobs and a list of offsets.
15 : //!
16 : //! Note that the vectored blob api does *not* go through the page cache.
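//!
//! As a rough sketch of the intended call pattern (not the authoritative read
//! path; the real call sites live in the delta and image layer readers, and
//! `max_read_size`, `file`, `ctx` and the key/lsn/offset values here are assumed
//! to be provided by the caller):
//!
//! ```ignore
//! // Planning phase: collect blob offsets and coalesce them into disk reads.
//! let mut planner = VectoredReadPlanner::new(max_read_size);
//! planner.handle(key, lsn, blob_start_offset, BlobFlag::None);
//! planner.handle_range_end(range_end_offset);
//! let reads = planner.finish();
//!
//! // Execution phase: one IO per coalesced read, then per-blob views into the buffer.
//! let reader = VectoredBlobReader::new(&file);
//! for read in reads {
//!     let buf = IoBufferMut::with_capacity(read.size());
//!     let result = reader.read_blobs(&read, buf, &ctx).await?;
//!     let view = BufView::new_slice(&result.buf);
//!     for blob in &result.blobs {
//!         let data = blob.read(&view).await?; // decompresses if needed
//!         // ... use blob.meta.key, blob.meta.lsn and data ...
//!     }
//! }
//! ```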
17 :
18 : use std::collections::BTreeMap;
19 : use std::ops::Deref;
20 :
21 : use bytes::Bytes;
22 : use pageserver_api::key::Key;
23 : use tokio::io::AsyncWriteExt;
24 : use tokio_epoll_uring::BoundedBuf;
25 : use utils::lsn::Lsn;
26 : use utils::vec_map::VecMap;
27 :
28 : use crate::context::RequestContext;
29 : use crate::tenant::blob_io::{BYTE_UNCOMPRESSED, BYTE_ZSTD, Header};
30 : use crate::virtual_file::{self, IoBufferMut, VirtualFile};
31 :
32 : /// Metadata bundled with the start and end offset of a blob.
33 : #[derive(Copy, Clone, Debug)]
34 : pub struct BlobMeta {
35 : pub key: Key,
36 : pub lsn: Lsn,
37 : pub will_init: bool,
38 : }
39 :
40 : /// A view into the vectored blobs read buffer.
41 : #[derive(Clone, Debug)]
42 : pub(crate) enum BufView<'a> {
43 : Slice(&'a [u8]),
44 : Bytes(bytes::Bytes),
45 : }
46 :
47 : impl<'a> BufView<'a> {
48 : /// Creates a new slice-based view on the blob.
49 1615391 : pub fn new_slice(slice: &'a [u8]) -> Self {
50 1615391 : Self::Slice(slice)
51 1615391 : }
52 :
53 : /// Creates a new [`bytes::Bytes`]-based view on the blob.
54 12 : pub fn new_bytes(bytes: bytes::Bytes) -> Self {
55 12 : Self::Bytes(bytes)
56 12 : }
57 :
58 : /// Convert the view into `Bytes`.
59 : ///
60 : /// If using slice as the underlying storage, the copy will be an O(n) operation.
61 10261454 : pub fn into_bytes(self) -> Bytes {
62 10261454 : match self {
63 10261454 : BufView::Slice(slice) => Bytes::copy_from_slice(slice),
64 0 : BufView::Bytes(bytes) => bytes,
65 : }
66 10261454 : }
67 :
68 : /// Creates a sub-view of the blob based on the range.
69 23249138 : fn view(&self, range: std::ops::Range<usize>) -> Self {
70 23249138 : match self {
71 23249138 : BufView::Slice(slice) => BufView::Slice(&slice[range]),
72 0 : BufView::Bytes(bytes) => BufView::Bytes(bytes.slice(range)),
73 : }
74 23249138 : }
75 : }
76 :
77 : impl Deref for BufView<'_> {
78 : type Target = [u8];
79 :
80 13012752 : fn deref(&self) -> &Self::Target {
81 13012752 : match self {
82 13012740 : BufView::Slice(slice) => slice,
83 12 : BufView::Bytes(bytes) => bytes,
84 : }
85 13012752 : }
86 : }
87 :
88 : impl AsRef<[u8]> for BufView<'_> {
89 0 : fn as_ref(&self) -> &[u8] {
90 0 : match self {
91 0 : BufView::Slice(slice) => slice,
92 0 : BufView::Bytes(bytes) => bytes.as_ref(),
93 : }
94 0 : }
95 : }
96 :
97 : impl<'a> From<&'a [u8]> for BufView<'a> {
98 0 : fn from(value: &'a [u8]) -> Self {
99 0 : Self::new_slice(value)
100 0 : }
101 : }
102 :
103 : impl From<Bytes> for BufView<'_> {
104 0 : fn from(value: Bytes) -> Self {
105 0 : Self::new_bytes(value)
106 0 : }
107 : }
108 :
109 : /// Blob offsets into [`VectoredBlobsBuf::buf`]. The byte range is potentially compressed,
110 : /// subject to [`VectoredBlob::compression_bits`].
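///
/// Offsets are relative to the start of [`VectoredBlobsBuf::buf`]: the blob header
/// occupies `header_start..data_start` and the (possibly compressed) payload occupies
/// `data_start..end`.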
111 : pub struct VectoredBlob {
112 : /// Blob metadata.
113 : pub meta: BlobMeta,
114 : /// Header start offset.
115 : header_start: usize,
116 : /// Data start offset.
117 : data_start: usize,
118 : /// End offset.
119 : end: usize,
120 : /// Compression used on the data, extracted from the header.
121 : compression_bits: u8,
122 : }
123 :
124 : impl VectoredBlob {
125 : /// Reads a decompressed view of the blob.
126 23126162 : pub(crate) async fn read<'a>(&self, buf: &BufView<'a>) -> Result<BufView<'a>, std::io::Error> {
127 23126162 : let view = buf.view(self.data_start..self.end);
128 23126162 :
129 23126162 : match self.compression_bits {
130 23126150 : BYTE_UNCOMPRESSED => Ok(view),
131 : BYTE_ZSTD => {
132 12 : let mut decompressed_vec = Vec::new();
133 12 : let mut decoder =
134 12 : async_compression::tokio::write::ZstdDecoder::new(&mut decompressed_vec);
135 12 : decoder.write_all(&view).await?;
136 12 : decoder.flush().await?;
137 : // Zero-copy conversion from `Vec` to `Bytes`
138 12 : Ok(BufView::new_bytes(Bytes::from(decompressed_vec)))
139 : }
140 0 : bits => {
141 0 : let error = std::io::Error::new(
142 0 : std::io::ErrorKind::InvalidData,
143 0 : format!(
144 0 : "Failed to decompress blob for {}@{}, {}..{}: invalid compression byte {bits:x}",
145 0 : self.meta.key, self.meta.lsn, self.data_start, self.end
146 0 : ),
147 0 : );
148 0 : Err(error)
149 : }
150 : }
151 23126162 : }
152 :
153 : /// Returns the raw blob including header.
154 122976 : pub(crate) fn raw_with_header<'a>(&self, buf: &BufView<'a>) -> BufView<'a> {
155 122976 : buf.view(self.header_start..self.end)
156 122976 : }
157 : }
158 :
159 : impl std::fmt::Display for VectoredBlob {
160 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
161 0 : write!(
162 0 : f,
163 0 : "{}@{}, {}..{}",
164 0 : self.meta.key, self.meta.lsn, self.data_start, self.end
165 0 : )
166 0 : }
167 : }
168 :
169 : /// Return type of [`VectoredBlobReader::read_blobs`]
170 : pub struct VectoredBlobsBuf {
171 : /// Buffer for all blobs in this read
172 : pub buf: IoBufferMut,
173 : /// Offsets into the buffer and metadata for all blobs in this read
174 : pub blobs: Vec<VectoredBlob>,
175 : }
176 :
177 : /// Description of one disk read for multiple blobs.
178 : /// Used as the argument for [`VectoredBlobReader::read_blobs`].
179 : #[derive(Debug)]
180 : pub struct VectoredRead {
181 : pub start: u64,
182 : pub end: u64,
183 : /// Start offset and metadata for each blob in this read
184 : pub blobs_at: VecMap<u64, BlobMeta>,
185 : }
186 :
187 : impl VectoredRead {
188 7334427 : pub(crate) fn size(&self) -> usize {
189 7334427 : (self.end - self.start) as usize
190 7334427 : }
191 : }
192 :
193 : #[derive(Eq, PartialEq, Debug)]
194 : pub(crate) enum VectoredReadExtended {
195 : Yes,
196 : No,
197 : }
198 :
199 : /// A vectored read builder that tries to coalesce all reads that fit in a chunk.
200 : pub(crate) struct ChunkedVectoredReadBuilder {
201 : /// Start block number
202 : start_blk_no: usize,
203 : /// End block number (exclusive).
204 : end_blk_no: usize,
205 : /// Start offset and metadata for each blob in this read
206 : blobs_at: VecMap<u64, BlobMeta>,
207 : max_read_size: Option<usize>,
208 : }
209 :
210 : impl ChunkedVectoredReadBuilder {
211 : const CHUNK_SIZE: usize = virtual_file::get_io_buffer_alignment();
212 : /// Start building a new vectored read.
213 : ///
214 : /// Note that, by design, this does not check against reading more than `max_read_size`,
215 : /// in order to support reading blobs larger than the configured value. However, the
216 : /// builder is single-use after that.
217 1615847 : fn new_impl(
218 1615847 : start_offset: u64,
219 1615847 : end_offset: u64,
220 1615847 : meta: BlobMeta,
221 1615847 : max_read_size: Option<usize>,
222 1615847 : ) -> Self {
223 1615847 : let mut blobs_at = VecMap::default();
224 1615847 : blobs_at
225 1615847 : .append(start_offset, meta)
226 1615847 : .expect("First insertion always succeeds");
227 1615847 :
228 1615847 : let start_blk_no = start_offset as usize / Self::CHUNK_SIZE;
229 1615847 : let end_blk_no = (end_offset as usize).div_ceil(Self::CHUNK_SIZE);
230 1615847 : Self {
231 1615847 : start_blk_no,
232 1615847 : end_blk_no,
233 1615847 : blobs_at,
234 1615847 : max_read_size,
235 1615847 : }
236 1615847 : }
237 :
238 1372475 : pub(crate) fn new(
239 1372475 : start_offset: u64,
240 1372475 : end_offset: u64,
241 1372475 : meta: BlobMeta,
242 1372475 : max_read_size: usize,
243 1372475 : ) -> Self {
244 1372475 : Self::new_impl(start_offset, end_offset, meta, Some(max_read_size))
245 1372475 : }
246 :
247 243372 : pub(crate) fn new_streaming(start_offset: u64, end_offset: u64, meta: BlobMeta) -> Self {
248 243372 : Self::new_impl(start_offset, end_offset, meta, None)
249 243372 : }
250 :
251 : /// Attempts to extend the current read with a new blob if the new blob starts in the same chunk where the current read ends, or in the chunk immediately after it.
252 : ///
253 : /// The resulting coalesced size must also not exceed the max read size.
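///
/// A worked example (assuming a chunk size of 512 bytes, for illustration only): if the
/// current read covers chunks `[0, 2)` (bytes `0..1024`), a blob at `900..1100` can be
/// coalesced because it starts in chunk 1 and the coalesced span of three chunks
/// (1536 bytes) stays within a sufficiently large `max_read_size`; a blob starting at
/// byte 1600 (chunk 3) cannot, because it would skip chunk 2.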
254 21878131 : pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
255 21878131 : tracing::trace!(start, end, "trying to extend");
256 21878131 : let start_blk_no = start as usize / Self::CHUNK_SIZE;
257 21878131 : let end_blk_no = (end as usize).div_ceil(Self::CHUNK_SIZE);
258 :
259 21878131 : let not_limited_by_max_read_size = {
260 21878131 : if let Some(max_read_size) = self.max_read_size {
261 9353239 : let coalesced_size = (end_blk_no - self.start_blk_no) * Self::CHUNK_SIZE;
262 9353239 : coalesced_size <= max_read_size
263 : } else {
264 12524892 : true
265 : }
266 : };
267 :
268 : // True if the new blob starts in the same chunk where the current read ends, or in the chunk immediately after it.
269 : //
270 : // Note: This automatically handles the case where two blobs are adjacent to each other,
271 : // whether they start on a chunk size boundary or not.
272 21878131 : let is_adjacent_chunk_read = {
273 : // 1. first.end & second.start are in the same block
274 21878131 : self.end_blk_no == start_blk_no + 1 ||
275 : // 2. first.end ends one block before second.start
276 262224 : self.end_blk_no == start_blk_no
277 : };
278 :
279 21878131 : if is_adjacent_chunk_read && not_limited_by_max_read_size {
280 21609291 : self.end_blk_no = end_blk_no;
281 21609291 : self.blobs_at
282 21609291 : .append(start, meta)
283 21609291 : .expect("LSNs are ordered within vectored reads");
284 21609291 :
285 21609291 : return VectoredReadExtended::Yes;
286 268840 : }
287 268840 :
288 268840 : VectoredReadExtended::No
289 21878131 : }
290 :
291 12764328 : pub(crate) fn size(&self) -> usize {
292 12764328 : (self.end_blk_no - self.start_blk_no) * Self::CHUNK_SIZE
293 12764328 : }
294 :
295 1615847 : pub(crate) fn build(self) -> VectoredRead {
296 1615847 : let start = (self.start_blk_no * Self::CHUNK_SIZE) as u64;
297 1615847 : let end = (self.end_blk_no * Self::CHUNK_SIZE) as u64;
298 1615847 : VectoredRead {
299 1615847 : start,
300 1615847 : end,
301 1615847 : blobs_at: self.blobs_at,
302 1615847 : }
303 1615847 : }
304 : }
305 :
306 : #[derive(Copy, Clone, Debug)]
307 : pub enum BlobFlag {
308 : None,
309 : Ignore,
310 : ReplaceAll,
311 : }
312 :
313 : /// Planner for vectored blob reads.
314 : ///
315 : /// Blob offsets are received via [`VectoredReadPlanner::handle`]
316 : /// and coalesced into disk reads.
317 : ///
318 : /// The implementation is very simple:
319 : /// * Collect all blob offsets in an ordered structure
320 : /// * Iterate over the collected blobs and coalesce them into reads at the end
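///
/// A minimal usage sketch (the key, LSN and offset values are purely illustrative):
///
/// ```ignore
/// let mut planner = VectoredReadPlanner::new(max_read_size);
/// // One call per blob, in key/LSN order; the offset is the blob's start offset.
/// planner.handle(key, lsn, 0, BlobFlag::None);
/// planner.handle(key, lsn, 4096, BlobFlag::ReplaceAll); // drops earlier blobs for `key`
/// planner.handle(key2, lsn, 6144, BlobFlag::Ignore); // excluded from the read (e.g. cached)
/// // Close the range with the end offset of its last blob.
/// planner.handle_range_end(8192);
/// let reads: Vec<VectoredRead> = planner.finish();
/// ```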
321 : pub struct VectoredReadPlanner {
322 : // Track all the blob offsets. Start offsets must be ordered.
323 : // Values in the value tuples are:
324 : // (
325 : // lsn of the blob,
326 : // start offset of the blob in the underlying file,
327 : // end offset of the blob in the underlying file,
328 : // whether the blob initializes the page image or not
329 : // see [`pageserver_api::record::NeonWalRecord::will_init`]
330 : // )
331 : blobs: BTreeMap<Key, Vec<(Lsn, u64, u64, bool)>>,
332 : // Arguments for previous blob passed into [`VectoredReadPlanner::handle`]
333 : prev: Option<(Key, Lsn, u64, BlobFlag)>,
334 :
335 : max_read_size: usize,
336 : }
337 :
338 : impl VectoredReadPlanner {
339 1600783 : pub fn new(max_read_size: usize) -> Self {
340 1600783 : Self {
341 1600783 : blobs: BTreeMap::new(),
342 1600783 : prev: None,
343 1600783 : max_read_size,
344 1600783 : }
345 1600783 : }
346 :
347 : /// Include a new blob in the read plan.
348 : ///
349 : /// This function is called from a B-Tree index visitor (see `DeltaLayerInner::plan_reads`
350 : /// and `ImageLayerInner::plan_reads`). Said visitor wants to collect blob offsets for all
351 : /// keys in a given keyspace. This function must be called for each key in the desired
352 : /// keyspace (in monotonically increasing order, without gaps). [`Self::handle_range_end`] must
353 : /// be called after every range, with the end offset of that range.
354 : ///
355 : /// In the event that keys are skipped, the behaviour is undefined and can lead to an
356 : /// incorrect read plan. We can end up asserting, erroring in wal redo or returning
357 : /// incorrect data to the user.
358 : ///
359 : /// The `flag` argument has two interesting values:
360 : /// * [`BlobFlag::ReplaceAll`]: The blob for this key should replace all existing blobs.
361 : /// This is used for WAL records that `will_init`.
362 : /// * [`BlobFlag::Ignore`]: This blob should not be included in the read. This happens
363 : /// if the blob is cached.
364 20999994 : pub fn handle(&mut self, key: Key, lsn: Lsn, offset: u64, flag: BlobFlag) {
365 : // Implementation note: we internally lag behind by one blob so that
366 : // we have both a start and an end offset when initialising [`VectoredRead`].
367 20999994 : let (prev_key, prev_lsn, prev_offset, prev_flag) = match self.prev {
368 : None => {
369 1197019 : self.prev = Some((key, lsn, offset, flag));
370 1197019 : return;
371 : }
372 19802975 : Some(prev) => prev,
373 19802975 : };
374 19802975 :
375 19802975 : self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
376 19802975 :
377 19802975 : self.prev = Some((key, lsn, offset, flag));
378 20999994 : }
379 :
380 1719091 : pub fn handle_range_end(&mut self, offset: u64) {
381 1719091 : if let Some((prev_key, prev_lsn, prev_offset, prev_flag)) = self.prev {
382 1197019 : self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
383 1197019 : }
384 :
385 1719091 : self.prev = None;
386 1719091 : }
387 :
388 20999994 : fn add_blob(&mut self, key: Key, lsn: Lsn, start_offset: u64, end_offset: u64, flag: BlobFlag) {
389 20999994 : match flag {
390 13551168 : BlobFlag::None => {
391 13551168 : let blobs_for_key = self.blobs.entry(key).or_default();
392 13551168 : blobs_for_key.push((lsn, start_offset, end_offset, false));
393 13551168 : }
394 2053118 : BlobFlag::ReplaceAll => {
395 2053118 : let blobs_for_key = self.blobs.entry(key).or_default();
396 2053118 : blobs_for_key.clear();
397 2053118 : blobs_for_key.push((lsn, start_offset, end_offset, true));
398 2053118 : }
399 5395708 : BlobFlag::Ignore => {}
400 : }
401 20999994 : }
402 :
403 1600783 : pub fn finish(self) -> Vec<VectoredRead> {
404 1600783 : let mut current_read_builder: Option<ChunkedVectoredReadBuilder> = None;
405 1600783 : let mut reads = Vec::new();
406 :
407 3639801 : for (key, blobs_for_key) in self.blobs {
408 12471028 : for (lsn, start_offset, end_offset, will_init) in blobs_for_key {
409 10432010 : let extended = match &mut current_read_builder {
410 9353107 : Some(read_builder) => read_builder.extend(
411 9353107 : start_offset,
412 9353107 : end_offset,
413 9353107 : BlobMeta {
414 9353107 : key,
415 9353107 : lsn,
416 9353107 : will_init,
417 9353107 : },
418 9353107 : ),
419 1078903 : None => VectoredReadExtended::No,
420 : };
421 :
422 10432010 : if extended == VectoredReadExtended::No {
423 1347707 : let next_read_builder = ChunkedVectoredReadBuilder::new(
424 1347707 : start_offset,
425 1347707 : end_offset,
426 1347707 : BlobMeta {
427 1347707 : key,
428 1347707 : lsn,
429 1347707 : will_init,
430 1347707 : },
431 1347707 : self.max_read_size,
432 1347707 : );
433 1347707 :
434 1347707 : let prev_read_builder = current_read_builder.replace(next_read_builder);
435 :
436 : // `current_read_builder` is None in the first iteration of the outer loop
437 1347707 : if let Some(read_builder) = prev_read_builder {
438 268804 : reads.push(read_builder.build());
439 1078903 : }
440 9084303 : }
441 : }
442 : }
443 :
444 1600783 : if let Some(read_builder) = current_read_builder {
445 1078903 : reads.push(read_builder.build());
446 1078903 : }
447 :
448 1600783 : reads
449 1600783 : }
450 : }
451 :
452 : /// Disk reader for vectored blob spans (does not go through the page cache)
453 : pub struct VectoredBlobReader<'a> {
454 : file: &'a VirtualFile,
455 : }
456 :
457 : impl<'a> VectoredBlobReader<'a> {
458 1473623 : pub fn new(file: &'a VirtualFile) -> Self {
459 1473623 : Self { file }
460 1473623 : }
461 :
462 : /// Read the requested blobs into the buffer.
463 : ///
464 : /// We have to deal with the fact that blobs are not fixed size.
465 : /// Each blob is prefixed by a size header.
466 : ///
467 : /// The success return value is a struct which contains the buffer
468 : /// filled from disk and a list of offsets at which each blob lies
469 : /// in the buffer.
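///
/// The supplied buffer must have `capacity >= read.size()`; in debug builds
/// `read.start` is additionally checked against the io buffer alignment.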
470 1615391 : pub async fn read_blobs(
471 1615391 : &self,
472 1615391 : read: &VectoredRead,
473 1615391 : buf: IoBufferMut,
474 1615391 : ctx: &RequestContext,
475 1615391 : ) -> Result<VectoredBlobsBuf, std::io::Error> {
476 1615391 : assert!(read.size() > 0);
477 1615391 : assert!(
478 1615391 : read.size() <= buf.capacity(),
479 0 : "{} > {}",
480 0 : read.size(),
481 0 : buf.capacity()
482 : );
483 :
484 1615391 : if cfg!(debug_assertions) {
485 1615391 : const ALIGN: u64 = virtual_file::get_io_buffer_alignment() as u64;
486 1615391 : debug_assert_eq!(
487 1615391 : read.start % ALIGN,
488 : 0,
489 0 : "Read start at {} does not satisfy the required io buffer alignment ({} bytes)",
490 : read.start,
491 : ALIGN
492 : );
493 0 : }
494 :
495 1615391 : let buf = self
496 1615391 : .file
497 1615391 : .read_exact_at(buf.slice(0..read.size()), read.start, ctx)
498 1615391 : .await?
499 1615391 : .into_inner();
500 1615391 :
501 1615391 : let blobs_at = read.blobs_at.as_slice();
502 1615391 :
503 1615391 : let mut blobs = Vec::with_capacity(blobs_at.len());
504 : // Blobs in `read` only provide their starting offset. The end offset
505 : // of each blob is derived from the length stored in its header,
506 : // decoded below.
507 :
508 23224466 : for (blob_start, meta) in blobs_at.iter().copied() {
509 23224466 : let header_start = (blob_start - read.start) as usize;
510 23224466 : let header = Header::decode(&buf[header_start..]).map_err(|anyhow_err| {
511 0 : std::io::Error::new(std::io::ErrorKind::InvalidData, anyhow_err)
512 23224466 : })?;
513 23224466 : let data_start = header_start + header.header_len;
514 23224466 : let end = data_start + header.data_len;
515 23224466 : let compression_bits = header.compression_bits;
516 23224466 :
517 23224466 : blobs.push(VectoredBlob {
518 23224466 : header_start,
519 23224466 : data_start,
520 23224466 : end,
521 23224466 : meta,
522 23224466 : compression_bits,
523 23224466 : });
524 : }
525 :
526 1615391 : Ok(VectoredBlobsBuf { buf, blobs })
527 1615391 : }
528 : }
529 :
530 : /// Read planner used in [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`].
531 : ///
532 : /// It provides a streaming API for getting read blobs. `handle` returns a batch
533 : /// once the blobs accumulated so far reach the `max_read_size` or `max_cnt`
534 : /// constraints.
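///
/// A usage sketch mirroring the iterator call sites (the values and the
/// `index_entries` iterator are illustrative only):
///
/// ```ignore
/// let mut planner = StreamingVectoredReadPlanner::new(max_read_size, max_cnt);
/// let mut reads = Vec::new();
/// for (key, lsn, offset, will_init) in index_entries {
///     // Emits a batch once the accumulated blobs hit `max_read_size` or `max_cnt`.
///     reads.extend(planner.handle(key, lsn, offset, will_init));
/// }
/// // Flush whatever is buffered; the argument is the end offset of the last blob.
/// reads.extend(planner.handle_range_end(end_offset));
/// ```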
535 : pub struct StreamingVectoredReadPlanner {
536 : read_builder: Option<ChunkedVectoredReadBuilder>,
537 : // Arguments for previous blob passed into [`StreamingVectoredReadPlanner::handle`]
538 : prev: Option<(Key, Lsn, u64, bool)>,
539 : /// Max read size per batch. This is not a strict limit. If there are [0, 100) and [100, 200), while the `max_read_size` is 150,
540 : /// we will produce a single batch instead of splitting them.
541 : max_read_size: u64,
542 : /// Max item count per batch
543 : max_cnt: usize,
544 : /// Size of the current batch
545 : cnt: usize,
546 : }
547 :
548 : impl StreamingVectoredReadPlanner {
549 4956 : pub fn new(max_read_size: u64, max_cnt: usize) -> Self {
550 4956 : assert!(max_cnt > 0);
551 4956 : assert!(max_read_size > 0);
552 4956 : Self {
553 4956 : read_builder: None,
554 4956 : prev: None,
555 4956 : max_cnt,
556 4956 : max_read_size,
557 4956 : cnt: 0,
558 4956 : }
559 4956 : }
560 :
561 12768600 : pub fn handle(
562 12768600 : &mut self,
563 12768600 : key: Key,
564 12768600 : lsn: Lsn,
565 12768600 : offset: u64,
566 12768600 : will_init: bool,
567 12768600 : ) -> Option<VectoredRead> {
568 : // Implementation note: we internally lag behind by one blob so that
569 : // we have both a start and an end offset when initialising [`VectoredRead`].
570 12768600 : let (prev_key, prev_lsn, prev_offset, prev_will_init) = match self.prev {
571 : None => {
572 4272 : self.prev = Some((key, lsn, offset, will_init));
573 4272 : return None;
574 : }
575 12764328 : Some(prev) => prev,
576 12764328 : };
577 12764328 :
578 12764328 : let res = self.add_blob(
579 12764328 : prev_key,
580 12764328 : prev_lsn,
581 12764328 : prev_offset,
582 12764328 : offset,
583 12764328 : false,
584 12764328 : prev_will_init,
585 12764328 : );
586 12764328 :
587 12764328 : self.prev = Some((key, lsn, offset, will_init));
588 12764328 :
589 12764328 : res
590 12768600 : }
591 :
592 3948 : pub fn handle_range_end(&mut self, offset: u64) -> Option<VectoredRead> {
593 3948 : let res = if let Some((prev_key, prev_lsn, prev_offset, prev_will_init)) = self.prev {
594 3936 : self.add_blob(
595 3936 : prev_key,
596 3936 : prev_lsn,
597 3936 : prev_offset,
598 3936 : offset,
599 3936 : true,
600 3936 : prev_will_init,
601 3936 : )
602 : } else {
603 12 : None
604 : };
605 :
606 3948 : self.prev = None;
607 3948 :
608 3948 : res
609 3948 : }
610 :
611 12768264 : fn add_blob(
612 12768264 : &mut self,
613 12768264 : key: Key,
614 12768264 : lsn: Lsn,
615 12768264 : start_offset: u64,
616 12768264 : end_offset: u64,
617 12768264 : is_last_blob_in_read: bool,
618 12768264 : will_init: bool,
619 12768264 : ) -> Option<VectoredRead> {
620 12768264 : match &mut self.read_builder {
621 12524892 : Some(read_builder) => {
622 12524892 : let extended = read_builder.extend(
623 12524892 : start_offset,
624 12524892 : end_offset,
625 12524892 : BlobMeta {
626 12524892 : key,
627 12524892 : lsn,
628 12524892 : will_init,
629 12524892 : },
630 12524892 : );
631 12524892 : assert_eq!(extended, VectoredReadExtended::Yes);
632 : }
633 243372 : None => {
634 243372 : self.read_builder = {
635 243372 : Some(ChunkedVectoredReadBuilder::new_streaming(
636 243372 : start_offset,
637 243372 : end_offset,
638 243372 : BlobMeta {
639 243372 : key,
640 243372 : lsn,
641 243372 : will_init,
642 243372 : },
643 243372 : ))
644 243372 : };
645 243372 : }
646 : }
647 12768264 : let read_builder = self.read_builder.as_mut().unwrap();
648 12768264 : self.cnt += 1;
649 12768264 : if is_last_blob_in_read
650 12764328 : || read_builder.size() >= self.max_read_size as usize
651 12591264 : || self.cnt >= self.max_cnt
652 : {
653 243372 : let prev_read_builder = self.read_builder.take();
654 243372 : self.cnt = 0;
655 :
656 : // `prev_read_builder` is always `Some` here, since `self.read_builder` was set above
657 243372 : if let Some(read_builder) = prev_read_builder {
658 243372 : return Some(read_builder.build());
659 0 : }
660 12524892 : }
661 12524892 : None
662 12768264 : }
663 : }
664 :
665 : #[cfg(test)]
666 : mod tests {
667 :
668 : use super::super::blob_io::tests::{random_array, write_maybe_compressed};
669 : use super::*;
670 : use crate::context::DownloadBehavior;
671 : use crate::page_cache::PAGE_SZ;
672 : use crate::task_mgr::TaskKind;
673 :
674 288 : fn validate_read(read: &VectoredRead, offset_range: &[(Key, Lsn, u64, BlobFlag)]) {
675 : const ALIGN: u64 = virtual_file::get_io_buffer_alignment() as u64;
676 288 : assert_eq!(read.start % ALIGN, 0);
677 288 : assert_eq!(read.start / ALIGN, offset_range.first().unwrap().2 / ALIGN);
678 :
679 504 : let expected_offsets_in_read: Vec<_> = offset_range.iter().map(|o| o.2).collect();
680 288 :
681 288 : let offsets_in_read: Vec<_> = read
682 288 : .blobs_at
683 288 : .as_slice()
684 288 : .iter()
685 504 : .map(|(offset, _)| *offset)
686 288 : .collect();
687 288 :
688 288 : assert_eq!(expected_offsets_in_read, offsets_in_read);
689 288 : }
690 :
691 : #[test]
692 12 : fn planner_chunked_coalesce_all_test() {
693 : use crate::virtual_file;
694 :
695 : const CHUNK_SIZE: u64 = virtual_file::get_io_buffer_alignment() as u64;
696 :
697 12 : let max_read_size = CHUNK_SIZE as usize * 8;
698 12 : let key = Key::MIN;
699 12 : let lsn = Lsn(0);
700 12 :
701 12 : let blob_descriptions = [
702 12 : (key, lsn, CHUNK_SIZE / 8, BlobFlag::None), // Read 1 BEGIN
703 12 : (key, lsn, CHUNK_SIZE / 4, BlobFlag::Ignore), // Gap
704 12 : (key, lsn, CHUNK_SIZE / 2, BlobFlag::None),
705 12 : (key, lsn, CHUNK_SIZE - 2, BlobFlag::Ignore), // Gap
706 12 : (key, lsn, CHUNK_SIZE, BlobFlag::None),
707 12 : (key, lsn, CHUNK_SIZE * 2 - 1, BlobFlag::None),
708 12 : (key, lsn, CHUNK_SIZE * 2 + 1, BlobFlag::Ignore), // Gap
709 12 : (key, lsn, CHUNK_SIZE * 3 + 1, BlobFlag::None),
710 12 : (key, lsn, CHUNK_SIZE * 5 + 1, BlobFlag::None),
711 12 : (key, lsn, CHUNK_SIZE * 6 + 1, BlobFlag::Ignore), // skipped chunk size, but not a chunk: should coalesce.
712 12 : (key, lsn, CHUNK_SIZE * 7 + 1, BlobFlag::None),
713 12 : (key, lsn, CHUNK_SIZE * 8, BlobFlag::None), // Read 2 BEGIN (b/c max_read_size)
714 12 : (key, lsn, CHUNK_SIZE * 9, BlobFlag::Ignore), // ==== skipped a chunk
715 12 : (key, lsn, CHUNK_SIZE * 10, BlobFlag::None), // Read 3 BEGIN (cannot coalesce)
716 12 : ];
717 12 :
718 12 : let ranges = [
719 12 : &[
720 12 : blob_descriptions[0],
721 12 : blob_descriptions[2],
722 12 : blob_descriptions[4],
723 12 : blob_descriptions[5],
724 12 : blob_descriptions[7],
725 12 : blob_descriptions[8],
726 12 : blob_descriptions[10],
727 12 : ],
728 12 : &blob_descriptions[11..12],
729 12 : &blob_descriptions[13..],
730 12 : ];
731 12 :
732 12 : let mut planner = VectoredReadPlanner::new(max_read_size);
733 180 : for (key, lsn, offset, flag) in blob_descriptions {
734 168 : planner.handle(key, lsn, offset, flag);
735 168 : }
736 :
737 12 : planner.handle_range_end(652 * 1024);
738 12 :
739 12 : let reads = planner.finish();
740 12 :
741 12 : assert_eq!(reads.len(), ranges.len());
742 :
743 36 : for (idx, read) in reads.iter().enumerate() {
744 36 : validate_read(read, ranges[idx]);
745 36 : }
746 12 : }
747 :
748 : #[test]
749 12 : fn planner_max_read_size_test() {
750 12 : let max_read_size = 128 * 1024;
751 12 : let key = Key::MIN;
752 12 : let lsn = Lsn(0);
753 12 :
754 12 : let blob_descriptions = vec![
755 12 : (key, lsn, 0, BlobFlag::None),
756 12 : (key, lsn, 32 * 1024, BlobFlag::None),
757 12 : (key, lsn, 96 * 1024, BlobFlag::None), // Last in read 1
758 12 : (key, lsn, 128 * 1024, BlobFlag::None), // Last in read 2
759 12 : (key, lsn, 198 * 1024, BlobFlag::None), // Last in read 3
760 12 : (key, lsn, 268 * 1024, BlobFlag::None), // Last in read 4
761 12 : (key, lsn, 396 * 1024, BlobFlag::None), // Last in read 5
762 12 : (key, lsn, 652 * 1024, BlobFlag::None), // Last in read 6
763 12 : ];
764 12 :
765 12 : let ranges = [
766 12 : &blob_descriptions[0..3],
767 12 : &blob_descriptions[3..4],
768 12 : &blob_descriptions[4..5],
769 12 : &blob_descriptions[5..6],
770 12 : &blob_descriptions[6..7],
771 12 : &blob_descriptions[7..],
772 12 : ];
773 12 :
774 12 : let mut planner = VectoredReadPlanner::new(max_read_size);
775 96 : for (key, lsn, offset, flag) in blob_descriptions.clone() {
776 96 : planner.handle(key, lsn, offset, flag);
777 96 : }
778 :
779 12 : planner.handle_range_end(652 * 1024);
780 12 :
781 12 : let reads = planner.finish();
782 12 :
783 12 : assert_eq!(reads.len(), 6);
784 :
785 : // TODO: could remove zero reads to produce 5 reads here
786 :
787 72 : for (idx, read) in reads.iter().enumerate() {
788 72 : validate_read(read, ranges[idx]);
789 72 : }
790 12 : }
791 :
792 : #[test]
793 12 : fn planner_replacement_test() {
794 : const CHUNK_SIZE: u64 = virtual_file::get_io_buffer_alignment() as u64;
795 12 : let max_read_size = 128 * CHUNK_SIZE as usize;
796 12 : let first_key = Key::MIN;
797 12 : let second_key = first_key.next();
798 12 : let lsn = Lsn(0);
799 12 :
800 12 : let blob_descriptions = vec![
801 12 : (first_key, lsn, 0, BlobFlag::None), // First in read 1
802 12 : (first_key, lsn, CHUNK_SIZE, BlobFlag::None), // Last in read 1
803 12 : (second_key, lsn, 2 * CHUNK_SIZE, BlobFlag::ReplaceAll),
804 12 : (second_key, lsn, 3 * CHUNK_SIZE, BlobFlag::None),
805 12 : (second_key, lsn, 4 * CHUNK_SIZE, BlobFlag::ReplaceAll), // First in read 2
806 12 : (second_key, lsn, 5 * CHUNK_SIZE, BlobFlag::None), // Last in read 2
807 12 : ];
808 12 :
809 12 : let ranges = [&blob_descriptions[0..2], &blob_descriptions[4..]];
810 12 :
811 12 : let mut planner = VectoredReadPlanner::new(max_read_size);
812 72 : for (key, lsn, offset, flag) in blob_descriptions.clone() {
813 72 : planner.handle(key, lsn, offset, flag);
814 72 : }
815 :
816 12 : planner.handle_range_end(6 * CHUNK_SIZE);
817 12 :
818 12 : let reads = planner.finish();
819 12 : assert_eq!(reads.len(), 2);
820 :
821 24 : for (idx, read) in reads.iter().enumerate() {
822 24 : validate_read(read, ranges[idx]);
823 24 : }
824 12 : }
825 :
826 : #[test]
827 12 : fn streaming_planner_max_read_size_test() {
828 12 : let max_read_size = 128 * 1024;
829 12 : let key = Key::MIN;
830 12 : let lsn = Lsn(0);
831 12 :
832 12 : let blob_descriptions = vec![
833 12 : (key, lsn, 0, BlobFlag::None),
834 12 : (key, lsn, 32 * 1024, BlobFlag::None),
835 12 : (key, lsn, 96 * 1024, BlobFlag::None),
836 12 : (key, lsn, 128 * 1024, BlobFlag::None),
837 12 : (key, lsn, 198 * 1024, BlobFlag::None),
838 12 : (key, lsn, 268 * 1024, BlobFlag::None),
839 12 : (key, lsn, 396 * 1024, BlobFlag::None),
840 12 : (key, lsn, 652 * 1024, BlobFlag::None),
841 12 : ];
842 12 :
843 12 : let ranges = [
844 12 : &blob_descriptions[0..3],
845 12 : &blob_descriptions[3..5],
846 12 : &blob_descriptions[5..6],
847 12 : &blob_descriptions[6..7],
848 12 : &blob_descriptions[7..],
849 12 : ];
850 12 :
851 12 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1000);
852 12 : let mut reads = Vec::new();
853 96 : for (key, lsn, offset, _) in blob_descriptions.clone() {
854 96 : reads.extend(planner.handle(key, lsn, offset, false));
855 96 : }
856 12 : reads.extend(planner.handle_range_end(652 * 1024));
857 12 :
858 12 : assert_eq!(reads.len(), ranges.len());
859 :
860 60 : for (idx, read) in reads.iter().enumerate() {
861 60 : validate_read(read, ranges[idx]);
862 60 : }
863 12 : }
864 :
865 : #[test]
866 12 : fn streaming_planner_max_cnt_test() {
867 12 : let max_read_size = 1024 * 1024;
868 12 : let key = Key::MIN;
869 12 : let lsn = Lsn(0);
870 12 :
871 12 : let blob_descriptions = vec![
872 12 : (key, lsn, 0, BlobFlag::None),
873 12 : (key, lsn, 32 * 1024, BlobFlag::None),
874 12 : (key, lsn, 96 * 1024, BlobFlag::None),
875 12 : (key, lsn, 128 * 1024, BlobFlag::None),
876 12 : (key, lsn, 198 * 1024, BlobFlag::None),
877 12 : (key, lsn, 268 * 1024, BlobFlag::None),
878 12 : (key, lsn, 396 * 1024, BlobFlag::None),
879 12 : (key, lsn, 652 * 1024, BlobFlag::None),
880 12 : ];
881 12 :
882 12 : let ranges = [
883 12 : &blob_descriptions[0..2],
884 12 : &blob_descriptions[2..4],
885 12 : &blob_descriptions[4..6],
886 12 : &blob_descriptions[6..],
887 12 : ];
888 12 :
889 12 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 2);
890 12 : let mut reads = Vec::new();
891 96 : for (key, lsn, offset, _) in blob_descriptions.clone() {
892 96 : reads.extend(planner.handle(key, lsn, offset, false));
893 96 : }
894 12 : reads.extend(planner.handle_range_end(652 * 1024));
895 12 :
896 12 : assert_eq!(reads.len(), ranges.len());
897 :
898 48 : for (idx, read) in reads.iter().enumerate() {
899 48 : validate_read(read, ranges[idx]);
900 48 : }
901 12 : }
902 :
903 : #[test]
904 12 : fn streaming_planner_edge_test() {
905 12 : let max_read_size = 1024 * 1024;
906 12 : let key = Key::MIN;
907 12 : let lsn = Lsn(0);
908 12 : {
909 12 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
910 12 : let mut reads = Vec::new();
911 12 : reads.extend(planner.handle_range_end(652 * 1024));
912 12 : assert!(reads.is_empty());
913 : }
914 : {
915 12 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
916 12 : let mut reads = Vec::new();
917 12 : reads.extend(planner.handle(key, lsn, 0, false));
918 12 : reads.extend(planner.handle_range_end(652 * 1024));
919 12 : assert_eq!(reads.len(), 1);
920 12 : validate_read(&reads[0], &[(key, lsn, 0, BlobFlag::None)]);
921 12 : }
922 12 : {
923 12 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
924 12 : let mut reads = Vec::new();
925 12 : reads.extend(planner.handle(key, lsn, 0, false));
926 12 : reads.extend(planner.handle(key, lsn, 128 * 1024, false));
927 12 : reads.extend(planner.handle_range_end(652 * 1024));
928 12 : assert_eq!(reads.len(), 2);
929 12 : validate_read(&reads[0], &[(key, lsn, 0, BlobFlag::None)]);
930 12 : validate_read(&reads[1], &[(key, lsn, 128 * 1024, BlobFlag::None)]);
931 12 : }
932 12 : {
933 12 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 2);
934 12 : let mut reads = Vec::new();
935 12 : reads.extend(planner.handle(key, lsn, 0, false));
936 12 : reads.extend(planner.handle(key, lsn, 128 * 1024, false));
937 12 : reads.extend(planner.handle_range_end(652 * 1024));
938 12 : assert_eq!(reads.len(), 1);
939 12 : validate_read(
940 12 : &reads[0],
941 12 : &[
942 12 : (key, lsn, 0, BlobFlag::None),
943 12 : (key, lsn, 128 * 1024, BlobFlag::None),
944 12 : ],
945 12 : );
946 12 : }
947 12 : }
948 :
949 48 : async fn round_trip_test_compressed(
950 48 : blobs: &[Vec<u8>],
951 48 : compression: bool,
952 48 : ) -> anyhow::Result<()> {
953 48 : let ctx =
954 48 : RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
955 48 : let (_temp_dir, pathbuf, offsets) =
956 48 : write_maybe_compressed(blobs, compression, &ctx).await?;
957 :
958 48 : let file = VirtualFile::open_v2(&pathbuf, &ctx).await?;
959 48 : let file_len = std::fs::metadata(&pathbuf)?.len();
960 48 :
961 48 : // Multiply by two (compressed data might need more space), and add a few bytes for the header
962 24720 : let reserved_bytes = blobs.iter().map(|bl| bl.len()).max().unwrap() * 2 + 16;
963 48 : let mut buf = IoBufferMut::with_capacity(reserved_bytes);
964 48 :
965 48 : let vectored_blob_reader = VectoredBlobReader::new(&file);
966 48 : let meta = BlobMeta {
967 48 : key: Key::MIN,
968 48 : lsn: Lsn(0),
969 48 : will_init: false,
970 48 : };
971 :
972 24720 : for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
973 24720 : let end = offsets.get(idx + 1).unwrap_or(&file_len);
974 24720 : if idx + 1 == offsets.len() {
975 48 : continue;
976 24672 : }
977 24672 : let read_builder = ChunkedVectoredReadBuilder::new(*offset, *end, meta, 16 * 4096);
978 24672 : let read = read_builder.build();
979 24672 : let result = vectored_blob_reader.read_blobs(&read, buf, &ctx).await?;
980 24672 : assert_eq!(result.blobs.len(), 1);
981 24672 : let read_blob = &result.blobs[0];
982 24672 : let view = BufView::new_slice(&result.buf);
983 24672 : let read_buf = read_blob.read(&view).await?;
984 24672 : assert_eq!(
985 24672 : &blob[..],
986 24672 : &read_buf[..],
987 0 : "mismatch for idx={idx} at offset={offset}"
988 : );
989 :
990 : // Check that raw_with_header returns a valid header.
991 24672 : let raw = read_blob.raw_with_header(&view);
992 24672 : let header = Header::decode(&raw)?;
993 24672 : if !compression || header.header_len == 1 {
994 12456 : assert_eq!(header.compression_bits, BYTE_UNCOMPRESSED);
995 12216 : }
996 24672 : assert_eq!(raw.len(), header.total_len());
997 :
998 24672 : buf = result.buf;
999 : }
1000 48 : Ok(())
1001 48 : }
1002 :
1003 : #[tokio::test]
1004 12 : async fn test_really_big_array() -> anyhow::Result<()> {
1005 12 : let blobs = &[
1006 12 : b"test".to_vec(),
1007 12 : random_array(10 * PAGE_SZ),
1008 12 : b"hello".to_vec(),
1009 12 : random_array(66 * PAGE_SZ),
1010 12 : vec![0xf3; 24 * PAGE_SZ],
1011 12 : b"foobar".to_vec(),
1012 12 : ];
1013 12 : round_trip_test_compressed(blobs, false).await?;
1014 12 : round_trip_test_compressed(blobs, true).await?;
1015 12 : Ok(())
1016 12 : }
1017 :
1018 : #[tokio::test]
1019 12 : async fn test_arrays_inc() -> anyhow::Result<()> {
1020 12 : let blobs = (0..PAGE_SZ / 8)
1021 12288 : .map(|v| random_array(v * 16))
1022 12 : .collect::<Vec<_>>();
1023 12 : round_trip_test_compressed(&blobs, false).await?;
1024 12 : round_trip_test_compressed(&blobs, true).await?;
1025 12 : Ok(())
1026 12 : }
1027 : }