Line data Source code
1 : //!
2 : //! Utilities for vectored reading of variable-sized "blobs".
3 : //!
4 : //! The "blob" api is an abstraction on top of the "block" api,
5 : //! with the main difference being that blobs do not have a fixed
6 : //! size (each blob is prefixed with a 1 or 4 byte length field).
7 : //!
8 : //! The vectored apis provided in this module allow for planning
9 : //! and executing disk IO which covers multiple blobs.
10 : //!
11 : //! Reads are planned with [`VectoredReadPlanner`], which coalesces
12 : //! adjacent blocks into a single disk IO request, and executed by
13 : //! [`VectoredBlobReader`], which does all the required offset juggling
14 : //! and returns a buffer housing all the blobs and a list of offsets.
15 : //!
16 : //! Note that the vectored blob api does *not* go through the page cache.
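//!
//! A rough sketch of the intended flow, using the types defined in this module
//! (the file handle, request context, and blob offsets are assumed to already be
//! in hand; offsets and sizes are illustrative only):
//!
//! ```text
//! // Plan: feed blob offsets in key order, then build the coalesced reads.
//! let mut planner = VectoredReadPlanner::new(max_read_size);
//! planner.handle(key, lsn, start_offset, BlobFlag::None);
//! planner.handle_range_end(end_offset);
//! let reads: Vec<VectoredRead> = planner.finish();
//!
//! // Execute: one disk IO per coalesced read, then slice out each blob.
//! let reader = VectoredBlobReader::new(&virtual_file);
//! for read in reads {
//!     let buf = IoBufferMut::with_capacity(read.size());
//!     let blobs_buf = reader.read_blobs(&read, buf, &ctx).await?;
//!     let view = BufView::new_slice(&blobs_buf.buf);
//!     for blob in &blobs_buf.blobs {
//!         let data = blob.read(&view).await?; // decompresses if needed
//!     }
//! }
//! ```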
17 :
18 : use std::collections::BTreeMap;
19 : use std::ops::Deref;
20 :
21 : use bytes::Bytes;
22 : use pageserver_api::key::Key;
23 : use tokio::io::AsyncWriteExt;
24 : use tokio_epoll_uring::BoundedBuf;
25 : use utils::lsn::Lsn;
26 : use utils::vec_map::VecMap;
27 :
28 : use crate::context::RequestContext;
29 : use crate::tenant::blob_io::{BYTE_UNCOMPRESSED, BYTE_ZSTD, LEN_COMPRESSION_BIT_MASK};
30 : use crate::virtual_file::IoBufferMut;
31 : use crate::virtual_file::{self, VirtualFile};
32 :
33 : /// Metadata bundled with the start and end offset of a blob.
34 : #[derive(Copy, Clone, Debug)]
35 : pub struct BlobMeta {
36 : pub key: Key,
37 : pub lsn: Lsn,
38 : pub will_init: bool,
39 : }
40 :
41 : /// A view into the vectored blobs read buffer.
42 : #[derive(Clone, Debug)]
43 : pub(crate) enum BufView<'a> {
44 : Slice(&'a [u8]),
45 : Bytes(bytes::Bytes),
46 : }
47 :
48 : impl<'a> BufView<'a> {
49 : /// Creates a new slice-based view on the blob.
50 441261 : pub fn new_slice(slice: &'a [u8]) -> Self {
51 441261 : Self::Slice(slice)
52 441261 : }
53 :
54 : /// Creates a new [`bytes::Bytes`]-based view on the blob.
55 4 : pub fn new_bytes(bytes: bytes::Bytes) -> Self {
56 4 : Self::Bytes(bytes)
57 4 : }
58 :
59 : /// Convert the view into `Bytes`.
60 : ///
61 : /// If the view is backed by a slice, the conversion incurs an O(n) copy.
62 1066050 : pub fn into_bytes(self) -> Bytes {
63 1066050 : match self {
64 1066050 : BufView::Slice(slice) => Bytes::copy_from_slice(slice),
65 0 : BufView::Bytes(bytes) => bytes,
66 : }
67 1066050 : }
68 :
69 : /// Creates a sub-view of the blob based on the range.
70 5387042 : fn view(&self, range: std::ops::Range<usize>) -> Self {
71 5387042 : match self {
72 5387042 : BufView::Slice(slice) => BufView::Slice(&slice[range]),
73 0 : BufView::Bytes(bytes) => BufView::Bytes(bytes.slice(range)),
74 : }
75 5387042 : }
76 : }
77 :
78 : impl Deref for BufView<'_> {
79 : type Target = [u8];
80 :
81 4321124 : fn deref(&self) -> &Self::Target {
82 4321124 : match self {
83 4321120 : BufView::Slice(slice) => slice,
84 4 : BufView::Bytes(bytes) => bytes,
85 : }
86 4321124 : }
87 : }
88 :
89 : impl AsRef<[u8]> for BufView<'_> {
90 0 : fn as_ref(&self) -> &[u8] {
91 0 : match self {
92 0 : BufView::Slice(slice) => slice,
93 0 : BufView::Bytes(bytes) => bytes.as_ref(),
94 : }
95 0 : }
96 : }
97 :
98 : impl<'a> From<&'a [u8]> for BufView<'a> {
99 0 : fn from(value: &'a [u8]) -> Self {
100 0 : Self::new_slice(value)
101 0 : }
102 : }
103 :
104 : impl From<Bytes> for BufView<'_> {
105 0 : fn from(value: Bytes) -> Self {
106 0 : Self::new_bytes(value)
107 0 : }
108 : }
109 :
110 : /// Blob offsets into [`VectoredBlobsBuf::buf`]. The byte range is potentially compressed,
111 : /// subject to [`VectoredBlob::compression_bits`].
112 : pub struct VectoredBlob {
113 : /// Blob metadata.
114 : pub meta: BlobMeta,
115 : /// Start offset.
116 : start: usize,
117 : /// End offset.
118 : end: usize,
119 : /// Compression used on the blob.
120 : compression_bits: u8,
121 : }
122 :
123 : impl VectoredBlob {
124 : /// Reads a decompressed view of the blob.
125 5387042 : pub(crate) async fn read<'a>(&self, buf: &BufView<'a>) -> Result<BufView<'a>, std::io::Error> {
126 5387042 : let view = buf.view(self.start..self.end);
127 5387042 :
128 5387042 : match self.compression_bits {
129 5387038 : BYTE_UNCOMPRESSED => Ok(view),
130 : BYTE_ZSTD => {
131 4 : let mut decompressed_vec = Vec::new();
132 4 : let mut decoder =
133 4 : async_compression::tokio::write::ZstdDecoder::new(&mut decompressed_vec);
134 4 : decoder.write_all(&view).await?;
135 4 : decoder.flush().await?;
136 : // Zero-copy conversion from `Vec` to `Bytes`
137 4 : Ok(BufView::new_bytes(Bytes::from(decompressed_vec)))
138 : }
139 0 : bits => {
140 0 : let error = std::io::Error::new(
141 0 : std::io::ErrorKind::InvalidData,
142 0 : format!("Failed to decompress blob for {}@{}, {}..{}: invalid compression byte {bits:x}", self.meta.key, self.meta.lsn, self.start, self.end),
143 0 : );
144 0 : Err(error)
145 : }
146 : }
147 5387042 : }
148 : }
149 :
150 : impl std::fmt::Display for VectoredBlob {
151 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
152 0 : write!(
153 0 : f,
154 0 : "{}@{}, {}..{}",
155 0 : self.meta.key, self.meta.lsn, self.start, self.end
156 0 : )
157 0 : }
158 : }
159 :
160 : /// Return type of [`VectoredBlobReader::read_blobs`]
161 : pub struct VectoredBlobsBuf {
162 : /// Buffer for all blobs in this read
163 : pub buf: IoBufferMut,
164 : /// Offsets into the buffer and metadata for all blobs in this read
165 : pub blobs: Vec<VectoredBlob>,
166 : }
167 :
168 : /// Description of one disk read for multiple blobs.
169 : /// Used as the argument for [`VectoredBlobReader::read_blobs`].
170 : #[derive(Debug)]
171 : pub struct VectoredRead {
172 : pub start: u64,
173 : pub end: u64,
174 : /// Start offset and metadata for each blob in this read
175 : pub blobs_at: VecMap<u64, BlobMeta>,
176 : }
177 :
178 : impl VectoredRead {
179 2018615 : pub(crate) fn size(&self) -> usize {
180 2018615 : (self.end - self.start) as usize
181 2018615 : }
182 : }
183 :
184 : #[derive(Eq, PartialEq, Debug)]
185 : pub(crate) enum VectoredReadExtended {
186 : Yes,
187 : No,
188 : }
189 :
190 : /// A vectored read builder that tries to coalesce all reads that fit in a chunk.
191 : pub(crate) struct ChunkedVectoredReadBuilder {
192 : /// Start block number
193 : start_blk_no: usize,
194 : /// End block number (exclusive).
195 : end_blk_no: usize,
196 : /// Start offset and metadata for each blob in this read
197 : blobs_at: VecMap<u64, BlobMeta>,
198 : max_read_size: Option<usize>,
199 : }
200 :
201 : impl ChunkedVectoredReadBuilder {
202 : const CHUNK_SIZE: usize = virtual_file::get_io_buffer_alignment();
203 : /// Start building a new vectored read.
204 : ///
205 : /// Note that by design, this does not check against reading more than `max_read_size`, in
206 : /// order to support reading blobs larger than the configured value. In that case, however,
207 : /// the builder is effectively single use.
208 441413 : fn new_impl(
209 441413 : start_offset: u64,
210 441413 : end_offset: u64,
211 441413 : meta: BlobMeta,
212 441413 : max_read_size: Option<usize>,
213 441413 : ) -> Self {
214 441413 : let mut blobs_at = VecMap::default();
215 441413 : blobs_at
216 441413 : .append(start_offset, meta)
217 441413 : .expect("First insertion always succeeds");
218 441413 :
219 441413 : let start_blk_no = start_offset as usize / Self::CHUNK_SIZE;
220 441413 : let end_blk_no = (end_offset as usize).div_ceil(Self::CHUNK_SIZE);
221 441413 : Self {
222 441413 : start_blk_no,
223 441413 : end_blk_no,
224 441413 : blobs_at,
225 441413 : max_read_size,
226 441413 : }
227 441413 : }
228 :
229 360301 : pub(crate) fn new(
230 360301 : start_offset: u64,
231 360301 : end_offset: u64,
232 360301 : meta: BlobMeta,
233 360301 : max_read_size: usize,
234 360301 : ) -> Self {
235 360301 : Self::new_impl(start_offset, end_offset, meta, Some(max_read_size))
236 360301 : }
237 :
238 81112 : pub(crate) fn new_streaming(start_offset: u64, end_offset: u64, meta: BlobMeta) -> Self {
239 81112 : Self::new_impl(start_offset, end_offset, meta, None)
240 81112 : }
241 :
242 : /// Attempts to extend the current read with a new blob if the new blob resides in the same or the immediate next chunk.
243 : ///
244 : /// The resulting size must also stay below the max read size.
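///
/// For example (illustrative numbers, assuming a chunk size of 512 bytes and no
/// `max_read_size` limit): a read currently covering bytes `0..700` occupies chunks
/// 0 and 1. From that state, a new blob starting at offset 900 begins in chunk 1 and
/// the read is extended; a blob starting at offset 1100 begins in chunk 2, which is
/// adjacent, so it is also accepted; a blob starting at offset 2000 begins in chunk 3,
/// leaving a gap, so [`VectoredReadExtended::No`] is returned.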
245 4991047 : pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
246 4991047 : tracing::trace!(start, end, "trying to extend");
247 4991047 : let start_blk_no = start as usize / Self::CHUNK_SIZE;
248 4991047 : let end_blk_no = (end as usize).div_ceil(Self::CHUNK_SIZE);
249 :
250 4991047 : let not_limited_by_max_read_size = {
251 4991047 : if let Some(max_read_size) = self.max_read_size {
252 816151 : let coalesced_size = (end_blk_no - self.start_blk_no) * Self::CHUNK_SIZE;
253 816151 : coalesced_size <= max_read_size
254 : } else {
255 4174896 : true
256 : }
257 : };
258 :
259 : // True if the new blob starts in the same chunk as the one where the current read ends, or in the chunk immediately after it.
260 : //
261 : // Note: This automatically handles the case where two blobs are adjacent to each other,
262 : // whether they start on a chunk size boundary or not.
263 4991047 : let is_adjacent_chunk_read = {
264 : // 1. first.end & second.start are in the same block
265 4991047 : self.end_blk_no == start_blk_no + 1 ||
266 : // 2. first.end ends one block before second.start
267 26641 : self.end_blk_no == start_blk_no
268 : };
269 :
270 4991047 : if is_adjacent_chunk_read && not_limited_by_max_read_size {
271 4945853 : self.end_blk_no = end_blk_no;
272 4945853 : self.blobs_at
273 4945853 : .append(start, meta)
274 4945853 : .expect("LSNs are ordered within vectored reads");
275 4945853 :
276 4945853 : return VectoredReadExtended::Yes;
277 45194 : }
278 45194 :
279 45194 : VectoredReadExtended::No
280 4991047 : }
281 :
282 4254708 : pub(crate) fn size(&self) -> usize {
283 4254708 : (self.end_blk_no - self.start_blk_no) * Self::CHUNK_SIZE
284 4254708 : }
285 :
286 441413 : pub(crate) fn build(self) -> VectoredRead {
287 441413 : let start = (self.start_blk_no * Self::CHUNK_SIZE) as u64;
288 441413 : let end = (self.end_blk_no * Self::CHUNK_SIZE) as u64;
289 441413 : VectoredRead {
290 441413 : start,
291 441413 : end,
292 441413 : blobs_at: self.blobs_at,
293 441413 : }
294 441413 : }
295 : }
296 :
297 : #[derive(Copy, Clone, Debug)]
298 : pub enum BlobFlag {
299 : None,
300 : Ignore,
301 : ReplaceAll,
302 : }
303 :
304 : /// Planner for vectored blob reads.
305 : ///
306 : /// Blob offsets are received via [`VectoredReadPlanner::handle`]
307 : /// and coalesced into disk reads.
308 : ///
309 : /// The implementation is very simple:
310 : /// * Collect all blob offsets in an ordered structure
311 : /// * Iterate over the collected blobs and coalesce them into reads at the end
312 : pub struct VectoredReadPlanner {
313 : // Track all the blob offsets. Start offsets must be ordered.
314 : // Values in the value tuples are:
315 : // (
316 : // lsn of the blob,
317 : // start offset of the blob in the underlying file,
318 : // end offset of the blob in the underlying file,
319 : // whether the blob initializes the page image or not
320 : // see [`pageserver_api::record::NeonWalRecord::will_init`]
321 : // )
322 : blobs: BTreeMap<Key, Vec<(Lsn, u64, u64, bool)>>,
323 : // Arguments for previous blob passed into [`VectoredReadPlanner::handle`]
324 : prev: Option<(Key, Lsn, u64, BlobFlag)>,
325 :
326 : max_read_size: usize,
327 : }
328 :
329 : impl VectoredReadPlanner {
330 479872 : pub fn new(max_read_size: usize) -> Self {
331 479872 : Self {
332 479872 : blobs: BTreeMap::new(),
333 479872 : prev: None,
334 479872 : max_read_size,
335 479872 : }
336 479872 : }
337 :
338 : /// Include a new blob in the read plan.
339 : ///
340 : /// This function is called from a B-Tree index visitor (see `DeltaLayerInner::plan_reads`
341 : /// and `ImageLayerInner::plan_reads`). Said visitor wants to collect blob offsets for all
342 : /// keys in a given keyspace. This function must be called for each key in the desired
343 : /// keyspace (monotonically continuous). [`Self::handle_range_end`] must
344 : /// be called after every range in the offset.
345 : ///
346 : /// In the event that keys are skipped, the behaviour is undefined and can lead to an
347 : /// incorrect read plan. We can end up asserting, erroring in wal redo or returning
348 : /// incorrect data to the user.
349 : ///
350 : /// The `flag` argument has two interesting values:
351 : /// * [`BlobFlag::ReplaceAll`]: The blob for this key should replace all existing blobs.
352 : /// This is used for WAL records that `will_init`.
353 : /// * [`BlobFlag::Ignore`]: This blob should not be included in the read. This happens
354 : /// if the blob is cached.
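///
/// A minimal illustration of the flag semantics (offsets are hypothetical):
///
/// ```text
/// let mut planner = VectoredReadPlanner::new(max_read_size);
/// planner.handle(key, lsn, 0, BlobFlag::None);
/// planner.handle(key, lsn, 100, BlobFlag::ReplaceAll); // earlier blobs for `key` are dropped
/// planner.handle(key, lsn, 200, BlobFlag::None);
/// planner.handle_range_end(300);
/// // The plan now covers only the blobs starting at offsets 100 and 200 for `key`.
/// ```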
355 3043423 : pub fn handle(&mut self, key: Key, lsn: Lsn, offset: u64, flag: BlobFlag) {
356 : // Implementation note: internally lag behind by one blob such that
357 : // we have a start and end offset when initialising [`VectoredRead`]
358 3043423 : let (prev_key, prev_lsn, prev_offset, prev_flag) = match self.prev {
359 : None => {
360 307279 : self.prev = Some((key, lsn, offset, flag));
361 307279 : return;
362 : }
363 2736144 : Some(prev) => prev,
364 2736144 : };
365 2736144 :
366 2736144 : self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
367 2736144 :
368 2736144 : self.prev = Some((key, lsn, offset, flag));
369 3043423 : }
370 :
371 480352 : pub fn handle_range_end(&mut self, offset: u64) {
372 480352 : if let Some((prev_key, prev_lsn, prev_offset, prev_flag)) = self.prev {
373 307279 : self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
374 307279 : }
375 :
376 480352 : self.prev = None;
377 480352 : }
378 :
379 3043423 : fn add_blob(&mut self, key: Key, lsn: Lsn, start_offset: u64, end_offset: u64, flag: BlobFlag) {
380 3043423 : match flag {
381 749516 : BlobFlag::None => {
382 749516 : let blobs_for_key = self.blobs.entry(key).or_default();
383 749516 : blobs_for_key.push((lsn, start_offset, end_offset, false));
384 749516 : }
385 641087 : BlobFlag::ReplaceAll => {
386 641087 : let blobs_for_key = self.blobs.entry(key).or_default();
387 641087 : blobs_for_key.clear();
388 641087 : blobs_for_key.push((lsn, start_offset, end_offset, true));
389 641087 : }
390 1652820 : BlobFlag::Ignore => {}
391 : }
392 3043423 : }
393 :
394 479872 : pub fn finish(self) -> Vec<VectoredRead> {
395 479872 : let mut current_read_builder: Option<ChunkedVectoredReadBuilder> = None;
396 479872 : let mut reads = Vec::new();
397 :
398 1500406 : for (key, blobs_for_key) in self.blobs {
399 2143504 : for (lsn, start_offset, end_offset, will_init) in blobs_for_key {
400 1122970 : let extended = match &mut current_read_builder {
401 816107 : Some(read_builder) => read_builder.extend(
402 816107 : start_offset,
403 816107 : end_offset,
404 816107 : BlobMeta {
405 816107 : key,
406 816107 : lsn,
407 816107 : will_init,
408 816107 : },
409 816107 : ),
410 306863 : None => VectoredReadExtended::No,
411 : };
412 :
413 1122970 : if extended == VectoredReadExtended::No {
414 352045 : let next_read_builder = ChunkedVectoredReadBuilder::new(
415 352045 : start_offset,
416 352045 : end_offset,
417 352045 : BlobMeta {
418 352045 : key,
419 352045 : lsn,
420 352045 : will_init,
421 352045 : },
422 352045 : self.max_read_size,
423 352045 : );
424 352045 :
425 352045 : let prev_read_builder = current_read_builder.replace(next_read_builder);
426 :
427 : // `current_read_builder` is None in the first iteration of the outer loop
428 352045 : if let Some(read_builder) = prev_read_builder {
429 45182 : reads.push(read_builder.build());
430 306863 : }
431 770925 : }
432 : }
433 : }
434 :
435 479872 : if let Some(read_builder) = current_read_builder {
436 306863 : reads.push(read_builder.build());
437 306863 : }
438 :
439 479872 : reads
440 479872 : }
441 : }
442 :
443 : /// Disk reader for vectored blob spans (does not go through the page cache)
444 : pub struct VectoredBlobReader<'a> {
445 : file: &'a VirtualFile,
446 : }
447 :
448 : impl<'a> VectoredBlobReader<'a> {
449 393989 : pub fn new(file: &'a VirtualFile) -> Self {
450 393989 : Self { file }
451 393989 : }
452 :
453 : /// Read the requested blobs into the buffer.
454 : ///
455 : /// We have to deal with the fact that blobs are not fixed size.
456 : /// Each blob is prefixed by a size header.
457 : ///
458 : /// The success return value is a struct which contains the buffer
459 : /// filled from disk and a list of offsets at which each blob lies
460 : /// in the buffer.
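///
/// A sketch of repeated use which recycles the buffer across reads, as the tests in
/// this module do (`reads`, `ctx`, and the buffer capacity are illustrative):
///
/// ```text
/// let mut buf = IoBufferMut::with_capacity(max_expected_read_size);
/// for read in &reads {
///     let blobs_buf = reader.read_blobs(read, buf, ctx).await?;
///     // ... resolve each blob via `VectoredBlob::read` against a `BufView` of `blobs_buf.buf` ...
///     buf = blobs_buf.buf; // take the buffer back for the next read
/// }
/// ```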
461 441261 : pub async fn read_blobs(
462 441261 : &self,
463 441261 : read: &VectoredRead,
464 441261 : buf: IoBufferMut,
465 441261 : ctx: &RequestContext,
466 441261 : ) -> Result<VectoredBlobsBuf, std::io::Error> {
467 441261 : assert!(read.size() > 0);
468 441261 : assert!(
469 441261 : read.size() <= buf.capacity(),
470 0 : "{} > {}",
471 0 : read.size(),
472 0 : buf.capacity()
473 : );
474 :
475 441261 : if cfg!(debug_assertions) {
476 441261 : const ALIGN: u64 = virtual_file::get_io_buffer_alignment() as u64;
477 441261 : debug_assert_eq!(
478 441261 : read.start % ALIGN,
479 : 0,
480 0 : "Read start at {} does not satisfy the required io buffer alignment ({} bytes)",
481 : read.start,
482 : ALIGN
483 : );
484 0 : }
485 :
486 441261 : let buf = self
487 441261 : .file
488 441261 : .read_exact_at(buf.slice(0..read.size()), read.start, ctx)
489 441261 : .await?
490 441261 : .into_inner();
491 441261 :
492 441261 : let blobs_at = read.blobs_at.as_slice();
493 441261 :
494 441261 : let start_offset = read.start;
495 441261 :
496 441261 : let mut metas = Vec::with_capacity(blobs_at.len());
497 : // Blobs in `read` only provide their starting offset. The end offset
498 : // of a blob is implicit: the start of the next blob if one exists
499 : // or the end of the read.
500 :
501 5828303 : for (blob_start, meta) in blobs_at {
502 5387042 : let blob_start_in_buf = blob_start - start_offset;
503 5387042 : let first_len_byte = buf[blob_start_in_buf as usize];
504 :
505 : // Each blob is prefixed by a header containing its size and compression information.
506 : // Extract the size and skip that header to find the start of the data.
507 : // The size can be 1 or 4 bytes. The most significant bit is 0 in the
508 : // 1 byte case and 1 in the 4 byte case.
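// For example (values are illustrative): a first byte of 0x05 denotes a 1-byte
// header for a 5-byte uncompressed blob. A first byte with the most significant
// bit set means the length occupies 4 big-endian bytes; the compression mode is
// carried in the high bits of that first byte and is recovered by masking with
// LEN_COMPRESSION_BIT_MASK (then matched against BYTE_UNCOMPRESSED / BYTE_ZSTD
// when the blob is later decoded in [`VectoredBlob::read`]).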
509 5387042 : let (size_length, blob_size, compression_bits) = if first_len_byte < 0x80 {
510 5311658 : (1, first_len_byte as u64, BYTE_UNCOMPRESSED)
511 : } else {
512 75384 : let mut blob_size_buf = [0u8; 4];
513 75384 : let offset_in_buf = blob_start_in_buf as usize;
514 75384 :
515 75384 : blob_size_buf.copy_from_slice(&buf[offset_in_buf..offset_in_buf + 4]);
516 75384 : blob_size_buf[0] &= !LEN_COMPRESSION_BIT_MASK;
517 75384 :
518 75384 : let compression_bits = first_len_byte & LEN_COMPRESSION_BIT_MASK;
519 75384 : (
520 75384 : 4,
521 75384 : u32::from_be_bytes(blob_size_buf) as u64,
522 75384 : compression_bits,
523 75384 : )
524 : };
525 :
526 5387042 : let start = (blob_start_in_buf + size_length) as usize;
527 5387042 : let end = start + blob_size as usize;
528 5387042 :
529 5387042 : metas.push(VectoredBlob {
530 5387042 : start,
531 5387042 : end,
532 5387042 : meta: *meta,
533 5387042 : compression_bits,
534 5387042 : });
535 : }
536 :
537 441261 : Ok(VectoredBlobsBuf { buf, blobs: metas })
538 441261 : }
539 : }
540 :
541 : /// Read planner used in [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`].
542 : ///
543 : /// It provides a streaming API for getting read blobs. It returns a batch when
544 : /// `handle` gets called and the current batch has reached the `max_read_size` or
545 : /// `max_cnt` constraints.
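///
/// A rough usage sketch, driven by a layer iterator (the iterator, `end_offset`, and
/// the constraint values are illustrative):
///
/// ```text
/// let mut planner = StreamingVectoredReadPlanner::new(max_read_size, max_cnt);
/// for (key, lsn, offset, will_init) in index_entries {
///     if let Some(read) = planner.handle(key, lsn, offset, will_init) {
///         // a batch is ready: execute it with `VectoredBlobReader::read_blobs`
///     }
/// }
/// if let Some(read) = planner.handle_range_end(end_offset) {
///     // flush the final batch
/// }
/// ```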
546 : pub struct StreamingVectoredReadPlanner {
547 : read_builder: Option<ChunkedVectoredReadBuilder>,
548 : // Arguments for previous blob passed into [`StreamingVectoredReadPlanner::handle`]
549 : prev: Option<(Key, Lsn, u64, bool)>,
550 : /// Max read size per batch. This is not a strict limit. If there are blobs at [0, 100) and [100, 200) while the `max_read_size` is 150,
551 : /// we will produce a single batch instead of splitting them.
552 : max_read_size: u64,
553 : /// Max item count per batch
554 : max_cnt: usize,
555 : /// Size of the current batch
556 : cnt: usize,
557 : }
558 :
559 : impl StreamingVectoredReadPlanner {
560 1640 : pub fn new(max_read_size: u64, max_cnt: usize) -> Self {
561 1640 : assert!(max_cnt > 0);
562 1640 : assert!(max_read_size > 0);
563 1640 : Self {
564 1640 : read_builder: None,
565 1640 : prev: None,
566 1640 : max_cnt,
567 1640 : max_read_size,
568 1640 : cnt: 0,
569 1640 : }
570 1640 : }
571 :
572 4256120 : pub fn handle(
573 4256120 : &mut self,
574 4256120 : key: Key,
575 4256120 : lsn: Lsn,
576 4256120 : offset: u64,
577 4256120 : will_init: bool,
578 4256120 : ) -> Option<VectoredRead> {
579 : // Implementation note: internally lag behind by one blob such that
580 : // we have a start and end offset when initialising [`VectoredRead`]
581 4256120 : let (prev_key, prev_lsn, prev_offset, prev_will_init) = match self.prev {
582 : None => {
583 1412 : self.prev = Some((key, lsn, offset, will_init));
584 1412 : return None;
585 : }
586 4254708 : Some(prev) => prev,
587 4254708 : };
588 4254708 :
589 4254708 : let res = self.add_blob(
590 4254708 : prev_key,
591 4254708 : prev_lsn,
592 4254708 : prev_offset,
593 4254708 : offset,
594 4254708 : false,
595 4254708 : prev_will_init,
596 4254708 : );
597 4254708 :
598 4254708 : self.prev = Some((key, lsn, offset, will_init));
599 4254708 :
600 4254708 : res
601 4256120 : }
602 :
603 1304 : pub fn handle_range_end(&mut self, offset: u64) -> Option<VectoredRead> {
604 1304 : let res = if let Some((prev_key, prev_lsn, prev_offset, prev_will_init)) = self.prev {
605 1300 : self.add_blob(
606 1300 : prev_key,
607 1300 : prev_lsn,
608 1300 : prev_offset,
609 1300 : offset,
610 1300 : true,
611 1300 : prev_will_init,
612 1300 : )
613 : } else {
614 4 : None
615 : };
616 :
617 1304 : self.prev = None;
618 1304 :
619 1304 : res
620 1304 : }
621 :
622 4256008 : fn add_blob(
623 4256008 : &mut self,
624 4256008 : key: Key,
625 4256008 : lsn: Lsn,
626 4256008 : start_offset: u64,
627 4256008 : end_offset: u64,
628 4256008 : is_last_blob_in_read: bool,
629 4256008 : will_init: bool,
630 4256008 : ) -> Option<VectoredRead> {
631 4256008 : match &mut self.read_builder {
632 4174896 : Some(read_builder) => {
633 4174896 : let extended = read_builder.extend(
634 4174896 : start_offset,
635 4174896 : end_offset,
636 4174896 : BlobMeta {
637 4174896 : key,
638 4174896 : lsn,
639 4174896 : will_init,
640 4174896 : },
641 4174896 : );
642 4174896 : assert_eq!(extended, VectoredReadExtended::Yes);
643 : }
644 81112 : None => {
645 81112 : self.read_builder = {
646 81112 : Some(ChunkedVectoredReadBuilder::new_streaming(
647 81112 : start_offset,
648 81112 : end_offset,
649 81112 : BlobMeta {
650 81112 : key,
651 81112 : lsn,
652 81112 : will_init,
653 81112 : },
654 81112 : ))
655 81112 : };
656 81112 : }
657 : }
658 4256008 : let read_builder = self.read_builder.as_mut().unwrap();
659 4256008 : self.cnt += 1;
660 4256008 : if is_last_blob_in_read
661 4254708 : || read_builder.size() >= self.max_read_size as usize
662 4197020 : || self.cnt >= self.max_cnt
663 : {
664 81112 : let prev_read_builder = self.read_builder.take();
665 81112 : self.cnt = 0;
666 :
667 : // `current_read_builder` is None in the first iteration
668 81112 : if let Some(read_builder) = prev_read_builder {
669 81112 : return Some(read_builder.build());
670 0 : }
671 4174896 : }
672 4174896 : None
673 4256008 : }
674 : }
675 :
676 : #[cfg(test)]
677 : mod tests {
678 : use anyhow::Error;
679 :
680 : use crate::context::DownloadBehavior;
681 : use crate::page_cache::PAGE_SZ;
682 : use crate::task_mgr::TaskKind;
683 :
684 : use super::super::blob_io::tests::{random_array, write_maybe_compressed};
685 : use super::*;
686 :
687 96 : fn validate_read(read: &VectoredRead, offset_range: &[(Key, Lsn, u64, BlobFlag)]) {
688 : const ALIGN: u64 = virtual_file::get_io_buffer_alignment() as u64;
689 96 : assert_eq!(read.start % ALIGN, 0);
690 96 : assert_eq!(read.start / ALIGN, offset_range.first().unwrap().2 / ALIGN);
691 :
692 168 : let expected_offsets_in_read: Vec<_> = offset_range.iter().map(|o| o.2).collect();
693 96 :
694 96 : let offsets_in_read: Vec<_> = read
695 96 : .blobs_at
696 96 : .as_slice()
697 96 : .iter()
698 168 : .map(|(offset, _)| *offset)
699 96 : .collect();
700 96 :
701 96 : assert_eq!(expected_offsets_in_read, offsets_in_read);
702 96 : }
703 :
704 : #[test]
705 4 : fn planner_chunked_coalesce_all_test() {
706 : use crate::virtual_file;
707 :
708 : const CHUNK_SIZE: u64 = virtual_file::get_io_buffer_alignment() as u64;
709 :
710 4 : let max_read_size = CHUNK_SIZE as usize * 8;
711 4 : let key = Key::MIN;
712 4 : let lsn = Lsn(0);
713 4 :
714 4 : let blob_descriptions = [
715 4 : (key, lsn, CHUNK_SIZE / 8, BlobFlag::None), // Read 1 BEGIN
716 4 : (key, lsn, CHUNK_SIZE / 4, BlobFlag::Ignore), // Gap
717 4 : (key, lsn, CHUNK_SIZE / 2, BlobFlag::None),
718 4 : (key, lsn, CHUNK_SIZE - 2, BlobFlag::Ignore), // Gap
719 4 : (key, lsn, CHUNK_SIZE, BlobFlag::None),
720 4 : (key, lsn, CHUNK_SIZE * 2 - 1, BlobFlag::None),
721 4 : (key, lsn, CHUNK_SIZE * 2 + 1, BlobFlag::Ignore), // Gap
722 4 : (key, lsn, CHUNK_SIZE * 3 + 1, BlobFlag::None),
723 4 : (key, lsn, CHUNK_SIZE * 5 + 1, BlobFlag::None),
724 4 : (key, lsn, CHUNK_SIZE * 6 + 1, BlobFlag::Ignore), // skipped chunk size, but not a chunk: should coalesce.
725 4 : (key, lsn, CHUNK_SIZE * 7 + 1, BlobFlag::None),
726 4 : (key, lsn, CHUNK_SIZE * 8, BlobFlag::None), // Read 2 BEGIN (b/c max_read_size)
727 4 : (key, lsn, CHUNK_SIZE * 9, BlobFlag::Ignore), // ==== skipped a chunk
728 4 : (key, lsn, CHUNK_SIZE * 10, BlobFlag::None), // Read 3 BEGIN (cannot coalesce)
729 4 : ];
730 4 :
731 4 : let ranges = [
732 4 : &[
733 4 : blob_descriptions[0],
734 4 : blob_descriptions[2],
735 4 : blob_descriptions[4],
736 4 : blob_descriptions[5],
737 4 : blob_descriptions[7],
738 4 : blob_descriptions[8],
739 4 : blob_descriptions[10],
740 4 : ],
741 4 : &blob_descriptions[11..12],
742 4 : &blob_descriptions[13..],
743 4 : ];
744 4 :
745 4 : let mut planner = VectoredReadPlanner::new(max_read_size);
746 60 : for (key, lsn, offset, flag) in blob_descriptions {
747 56 : planner.handle(key, lsn, offset, flag);
748 56 : }
749 :
750 4 : planner.handle_range_end(652 * 1024);
751 4 :
752 4 : let reads = planner.finish();
753 4 :
754 4 : assert_eq!(reads.len(), ranges.len());
755 :
756 12 : for (idx, read) in reads.iter().enumerate() {
757 12 : validate_read(read, ranges[idx]);
758 12 : }
759 4 : }
760 :
761 : #[test]
762 4 : fn planner_max_read_size_test() {
763 4 : let max_read_size = 128 * 1024;
764 4 : let key = Key::MIN;
765 4 : let lsn = Lsn(0);
766 4 :
767 4 : let blob_descriptions = vec![
768 4 : (key, lsn, 0, BlobFlag::None),
769 4 : (key, lsn, 32 * 1024, BlobFlag::None),
770 4 : (key, lsn, 96 * 1024, BlobFlag::None), // Last in read 1
771 4 : (key, lsn, 128 * 1024, BlobFlag::None), // Last in read 2
772 4 : (key, lsn, 198 * 1024, BlobFlag::None), // Last in read 3
773 4 : (key, lsn, 268 * 1024, BlobFlag::None), // Last in read 4
774 4 : (key, lsn, 396 * 1024, BlobFlag::None), // Last in read 5
775 4 : (key, lsn, 652 * 1024, BlobFlag::None), // Last in read 6
776 4 : ];
777 4 :
778 4 : let ranges = [
779 4 : &blob_descriptions[0..3],
780 4 : &blob_descriptions[3..4],
781 4 : &blob_descriptions[4..5],
782 4 : &blob_descriptions[5..6],
783 4 : &blob_descriptions[6..7],
784 4 : &blob_descriptions[7..],
785 4 : ];
786 4 :
787 4 : let mut planner = VectoredReadPlanner::new(max_read_size);
788 32 : for (key, lsn, offset, flag) in blob_descriptions.clone() {
789 32 : planner.handle(key, lsn, offset, flag);
790 32 : }
791 :
792 4 : planner.handle_range_end(652 * 1024);
793 4 :
794 4 : let reads = planner.finish();
795 4 :
796 4 : assert_eq!(reads.len(), 6);
797 :
798 : // TODO: could remove zero reads to produce 5 reads here
799 :
800 24 : for (idx, read) in reads.iter().enumerate() {
801 24 : validate_read(read, ranges[idx]);
802 24 : }
803 4 : }
804 :
805 : #[test]
806 4 : fn planner_replacement_test() {
807 : const CHUNK_SIZE: u64 = virtual_file::get_io_buffer_alignment() as u64;
808 4 : let max_read_size = 128 * CHUNK_SIZE as usize;
809 4 : let first_key = Key::MIN;
810 4 : let second_key = first_key.next();
811 4 : let lsn = Lsn(0);
812 4 :
813 4 : let blob_descriptions = vec![
814 4 : (first_key, lsn, 0, BlobFlag::None), // First in read 1
815 4 : (first_key, lsn, CHUNK_SIZE, BlobFlag::None), // Last in read 1
816 4 : (second_key, lsn, 2 * CHUNK_SIZE, BlobFlag::ReplaceAll),
817 4 : (second_key, lsn, 3 * CHUNK_SIZE, BlobFlag::None),
818 4 : (second_key, lsn, 4 * CHUNK_SIZE, BlobFlag::ReplaceAll), // First in read 2
819 4 : (second_key, lsn, 5 * CHUNK_SIZE, BlobFlag::None), // Last in read 2
820 4 : ];
821 4 :
822 4 : let ranges = [&blob_descriptions[0..2], &blob_descriptions[4..]];
823 4 :
824 4 : let mut planner = VectoredReadPlanner::new(max_read_size);
825 24 : for (key, lsn, offset, flag) in blob_descriptions.clone() {
826 24 : planner.handle(key, lsn, offset, flag);
827 24 : }
828 :
829 4 : planner.handle_range_end(6 * CHUNK_SIZE);
830 4 :
831 4 : let reads = planner.finish();
832 4 : assert_eq!(reads.len(), 2);
833 :
834 8 : for (idx, read) in reads.iter().enumerate() {
835 8 : validate_read(read, ranges[idx]);
836 8 : }
837 4 : }
838 :
839 : #[test]
840 4 : fn streaming_planner_max_read_size_test() {
841 4 : let max_read_size = 128 * 1024;
842 4 : let key = Key::MIN;
843 4 : let lsn = Lsn(0);
844 4 :
845 4 : let blob_descriptions = vec![
846 4 : (key, lsn, 0, BlobFlag::None),
847 4 : (key, lsn, 32 * 1024, BlobFlag::None),
848 4 : (key, lsn, 96 * 1024, BlobFlag::None),
849 4 : (key, lsn, 128 * 1024, BlobFlag::None),
850 4 : (key, lsn, 198 * 1024, BlobFlag::None),
851 4 : (key, lsn, 268 * 1024, BlobFlag::None),
852 4 : (key, lsn, 396 * 1024, BlobFlag::None),
853 4 : (key, lsn, 652 * 1024, BlobFlag::None),
854 4 : ];
855 4 :
856 4 : let ranges = [
857 4 : &blob_descriptions[0..3],
858 4 : &blob_descriptions[3..5],
859 4 : &blob_descriptions[5..6],
860 4 : &blob_descriptions[6..7],
861 4 : &blob_descriptions[7..],
862 4 : ];
863 4 :
864 4 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1000);
865 4 : let mut reads = Vec::new();
866 32 : for (key, lsn, offset, _) in blob_descriptions.clone() {
867 32 : reads.extend(planner.handle(key, lsn, offset, false));
868 32 : }
869 4 : reads.extend(planner.handle_range_end(652 * 1024));
870 4 :
871 4 : assert_eq!(reads.len(), ranges.len());
872 :
873 20 : for (idx, read) in reads.iter().enumerate() {
874 20 : validate_read(read, ranges[idx]);
875 20 : }
876 4 : }
877 :
878 : #[test]
879 4 : fn streaming_planner_max_cnt_test() {
880 4 : let max_read_size = 1024 * 1024;
881 4 : let key = Key::MIN;
882 4 : let lsn = Lsn(0);
883 4 :
884 4 : let blob_descriptions = vec![
885 4 : (key, lsn, 0, BlobFlag::None),
886 4 : (key, lsn, 32 * 1024, BlobFlag::None),
887 4 : (key, lsn, 96 * 1024, BlobFlag::None),
888 4 : (key, lsn, 128 * 1024, BlobFlag::None),
889 4 : (key, lsn, 198 * 1024, BlobFlag::None),
890 4 : (key, lsn, 268 * 1024, BlobFlag::None),
891 4 : (key, lsn, 396 * 1024, BlobFlag::None),
892 4 : (key, lsn, 652 * 1024, BlobFlag::None),
893 4 : ];
894 4 :
895 4 : let ranges = [
896 4 : &blob_descriptions[0..2],
897 4 : &blob_descriptions[2..4],
898 4 : &blob_descriptions[4..6],
899 4 : &blob_descriptions[6..],
900 4 : ];
901 4 :
902 4 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 2);
903 4 : let mut reads = Vec::new();
904 32 : for (key, lsn, offset, _) in blob_descriptions.clone() {
905 32 : reads.extend(planner.handle(key, lsn, offset, false));
906 32 : }
907 4 : reads.extend(planner.handle_range_end(652 * 1024));
908 4 :
909 4 : assert_eq!(reads.len(), ranges.len());
910 :
911 16 : for (idx, read) in reads.iter().enumerate() {
912 16 : validate_read(read, ranges[idx]);
913 16 : }
914 4 : }
915 :
916 : #[test]
917 4 : fn streaming_planner_edge_test() {
918 4 : let max_read_size = 1024 * 1024;
919 4 : let key = Key::MIN;
920 4 : let lsn = Lsn(0);
921 4 : {
922 4 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
923 4 : let mut reads = Vec::new();
924 4 : reads.extend(planner.handle_range_end(652 * 1024));
925 4 : assert!(reads.is_empty());
926 : }
927 : {
928 4 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
929 4 : let mut reads = Vec::new();
930 4 : reads.extend(planner.handle(key, lsn, 0, false));
931 4 : reads.extend(planner.handle_range_end(652 * 1024));
932 4 : assert_eq!(reads.len(), 1);
933 4 : validate_read(&reads[0], &[(key, lsn, 0, BlobFlag::None)]);
934 4 : }
935 4 : {
936 4 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
937 4 : let mut reads = Vec::new();
938 4 : reads.extend(planner.handle(key, lsn, 0, false));
939 4 : reads.extend(planner.handle(key, lsn, 128 * 1024, false));
940 4 : reads.extend(planner.handle_range_end(652 * 1024));
941 4 : assert_eq!(reads.len(), 2);
942 4 : validate_read(&reads[0], &[(key, lsn, 0, BlobFlag::None)]);
943 4 : validate_read(&reads[1], &[(key, lsn, 128 * 1024, BlobFlag::None)]);
944 4 : }
945 4 : {
946 4 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 2);
947 4 : let mut reads = Vec::new();
948 4 : reads.extend(planner.handle(key, lsn, 0, false));
949 4 : reads.extend(planner.handle(key, lsn, 128 * 1024, false));
950 4 : reads.extend(planner.handle_range_end(652 * 1024));
951 4 : assert_eq!(reads.len(), 1);
952 4 : validate_read(
953 4 : &reads[0],
954 4 : &[
955 4 : (key, lsn, 0, BlobFlag::None),
956 4 : (key, lsn, 128 * 1024, BlobFlag::None),
957 4 : ],
958 4 : );
959 4 : }
960 4 : }
961 :
962 16 : async fn round_trip_test_compressed(blobs: &[Vec<u8>], compression: bool) -> Result<(), Error> {
963 16 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
964 16 : let (_temp_dir, pathbuf, offsets) =
965 16 : write_maybe_compressed::<true>(blobs, compression, &ctx).await?;
966 :
967 16 : let file = VirtualFile::open(&pathbuf, &ctx).await?;
968 16 : let file_len = std::fs::metadata(&pathbuf)?.len();
969 16 :
970 16 : // Multiply by two (compressed data might need more space), and add a few bytes for the header
971 8240 : let reserved_bytes = blobs.iter().map(|bl| bl.len()).max().unwrap() * 2 + 16;
972 16 : let mut buf = IoBufferMut::with_capacity(reserved_bytes);
973 16 :
974 16 : let vectored_blob_reader = VectoredBlobReader::new(&file);
975 16 : let meta = BlobMeta {
976 16 : key: Key::MIN,
977 16 : lsn: Lsn(0),
978 16 : will_init: false,
979 16 : };
980 :
981 8240 : for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
982 8240 : let end = offsets.get(idx + 1).unwrap_or(&file_len);
983 8240 : if idx + 1 == offsets.len() {
984 16 : continue;
985 8224 : }
986 8224 : let read_builder = ChunkedVectoredReadBuilder::new(*offset, *end, meta, 16 * 4096);
987 8224 : let read = read_builder.build();
988 8224 : let result = vectored_blob_reader.read_blobs(&read, buf, &ctx).await?;
989 8224 : assert_eq!(result.blobs.len(), 1);
990 8224 : let read_blob = &result.blobs[0];
991 8224 : let view = BufView::new_slice(&result.buf);
992 8224 : let read_buf = read_blob.read(&view).await?;
993 8224 : assert_eq!(
994 8224 : &blob[..],
995 8224 : &read_buf[..],
996 0 : "mismatch for idx={idx} at offset={offset}"
997 : );
998 8224 : buf = result.buf;
999 : }
1000 16 : Ok(())
1001 16 : }
1002 :
1003 : #[tokio::test]
1004 4 : async fn test_really_big_array() -> Result<(), Error> {
1005 4 : let blobs = &[
1006 4 : b"test".to_vec(),
1007 4 : random_array(10 * PAGE_SZ),
1008 4 : b"hello".to_vec(),
1009 4 : random_array(66 * PAGE_SZ),
1010 4 : vec![0xf3; 24 * PAGE_SZ],
1011 4 : b"foobar".to_vec(),
1012 4 : ];
1013 4 : round_trip_test_compressed(blobs, false).await?;
1014 4 : round_trip_test_compressed(blobs, true).await?;
1015 4 : Ok(())
1016 4 : }
1017 :
1018 : #[tokio::test]
1019 4 : async fn test_arrays_inc() -> Result<(), Error> {
1020 4 : let blobs = (0..PAGE_SZ / 8)
1021 4096 : .map(|v| random_array(v * 16))
1022 4 : .collect::<Vec<_>>();
1023 4 : round_trip_test_compressed(&blobs, false).await?;
1024 4 : round_trip_test_compressed(&blobs, true).await?;
1025 4 : Ok(())
1026 4 : }
1027 : }