Line data Source code
1 : //!
2 : //! Utilities for vectored reading of variable-sized "blobs".
3 : //!
4 : //! The "blob" api is an abstraction on top of the "block" api,
5 : //! with the main difference being that blobs do not have a fixed
6 : //! size (each blob is prefixed with a 1 or 4 byte length field)
7 : //!
8 : //! The vectored apis provided in this module allow for planning
9 : //! and executing disk IO which covers multiple blobs.
10 : //!
11 : //! Reads are planned with [`VectoredReadPlanner`], which coalesces
12 : //! adjacent blocks into a single disk IO request, and executed by
13 : //! [`VectoredBlobReader`], which does all the required offset juggling
14 : //! and returns a buffer housing all the blobs and a list of offsets.
15 : //!
16 : //! Note that the vectored blob api does *not* go through the page cache.
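: //!
: //! A minimal sketch of the intended flow, assuming a `file: VirtualFile`, a
: //! `ctx: RequestContext` and the blob offsets are already at hand (error
: //! handling elided; not a doctest):
: //!
: //! ```ignore
: //! // Plan: collect blob offsets and coalesce them into disk reads.
: //! let mut planner = VectoredReadPlanner::new(128 * 1024);
: //! planner.handle(key, lsn, blob_start_offset, BlobFlag::None);
: //! planner.handle_range_end(range_end_offset);
: //! let reads = planner.finish();
: //!
: //! // Execute: one IO per coalesced read, then slice each blob out of the buffer.
: //! let reader = VectoredBlobReader::new(&file);
: //! for read in reads {
: //!     let buf = IoBufferMut::with_capacity(read.size());
: //!     let blobs_buf = reader.read_blobs(&read, buf, &ctx).await?;
: //!     let view = BufView::new_slice(&blobs_buf.buf);
: //!     for blob in &blobs_buf.blobs {
: //!         let data = blob.read(&view).await?; // decompresses if needed
: //!     }
: //! }
: //! ```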
17 :
18 : use std::collections::BTreeMap;
19 : use std::ops::Deref;
20 :
21 : use bytes::Bytes;
22 : use pageserver_api::key::Key;
23 : use tokio::io::AsyncWriteExt;
24 : use tokio_epoll_uring::BoundedBuf;
25 : use utils::lsn::Lsn;
26 : use utils::vec_map::VecMap;
27 :
28 : use crate::context::RequestContext;
29 : use crate::tenant::blob_io::{BYTE_UNCOMPRESSED, BYTE_ZSTD, LEN_COMPRESSION_BIT_MASK};
30 : use crate::virtual_file::{self, IoBufferMut, VirtualFile};
31 :
32 : /// Metadata bundled with the start and end offset of a blob.
33 : #[derive(Copy, Clone, Debug)]
34 : pub struct BlobMeta {
35 : pub key: Key,
36 : pub lsn: Lsn,
37 : pub will_init: bool,
38 : }
39 :
40 : /// A view into the vectored blobs read buffer.
41 : #[derive(Clone, Debug)]
42 : pub(crate) enum BufView<'a> {
43 : Slice(&'a [u8]),
44 : Bytes(bytes::Bytes),
45 : }
46 :
47 : impl<'a> BufView<'a> {
48 : /// Creates a new slice-based view on the blob.
49 441050 : pub fn new_slice(slice: &'a [u8]) -> Self {
50 441050 : Self::Slice(slice)
51 441050 : }
52 :
53 : /// Creates a new [`bytes::Bytes`]-based view on the blob.
54 4 : pub fn new_bytes(bytes: bytes::Bytes) -> Self {
55 4 : Self::Bytes(bytes)
56 4 : }
57 :
58 : /// Convert the view into `Bytes`.
59 : ///
60 : /// If using slice as the underlying storage, the copy will be an O(n) operation.
61 1066083 : pub fn into_bytes(self) -> Bytes {
62 1066083 : match self {
63 1066083 : BufView::Slice(slice) => Bytes::copy_from_slice(slice),
64 0 : BufView::Bytes(bytes) => bytes,
65 : }
66 1066083 : }
67 :
68 : /// Creates a sub-view of the blob based on the range.
69 5387075 : fn view(&self, range: std::ops::Range<usize>) -> Self {
70 5387075 : match self {
71 5387075 : BufView::Slice(slice) => BufView::Slice(&slice[range]),
72 0 : BufView::Bytes(bytes) => BufView::Bytes(bytes.slice(range)),
73 : }
74 5387075 : }
75 : }
76 :
77 : impl Deref for BufView<'_> {
78 : type Target = [u8];
79 :
80 4321124 : fn deref(&self) -> &Self::Target {
81 4321124 : match self {
82 4321120 : BufView::Slice(slice) => slice,
83 4 : BufView::Bytes(bytes) => bytes,
84 : }
85 4321124 : }
86 : }
87 :
88 : impl AsRef<[u8]> for BufView<'_> {
89 0 : fn as_ref(&self) -> &[u8] {
90 0 : match self {
91 0 : BufView::Slice(slice) => slice,
92 0 : BufView::Bytes(bytes) => bytes.as_ref(),
93 : }
94 0 : }
95 : }
96 :
97 : impl<'a> From<&'a [u8]> for BufView<'a> {
98 0 : fn from(value: &'a [u8]) -> Self {
99 0 : Self::new_slice(value)
100 0 : }
101 : }
102 :
103 : impl From<Bytes> for BufView<'_> {
104 0 : fn from(value: Bytes) -> Self {
105 0 : Self::new_bytes(value)
106 0 : }
107 : }
108 :
109 : /// Blob offsets into [`VectoredBlobsBuf::buf`]. The byte range is potentially compressed,
110 : /// subject to [`VectoredBlob::compression_bits`].
111 : pub struct VectoredBlob {
112 : /// Blob metadata.
113 : pub meta: BlobMeta,
114 : /// Start offset.
115 : start: usize,
116 : /// End offset.
117 : end: usize,
118 : /// Compression used on the blob.
119 : compression_bits: u8,
120 : }
121 :
122 : impl VectoredBlob {
123 : /// Reads a decompressed view of the blob.
124 5387075 : pub(crate) async fn read<'a>(&self, buf: &BufView<'a>) -> Result<BufView<'a>, std::io::Error> {
125 5387075 : let view = buf.view(self.start..self.end);
126 5387075 :
127 5387075 : match self.compression_bits {
128 5387071 : BYTE_UNCOMPRESSED => Ok(view),
129 : BYTE_ZSTD => {
130 4 : let mut decompressed_vec = Vec::new();
131 4 : let mut decoder =
132 4 : async_compression::tokio::write::ZstdDecoder::new(&mut decompressed_vec);
133 4 : decoder.write_all(&view).await?;
134 4 : decoder.flush().await?;
135 : // Zero-copy conversion from `Vec` to `Bytes`
136 4 : Ok(BufView::new_bytes(Bytes::from(decompressed_vec)))
137 : }
138 0 : bits => {
139 0 : let error = std::io::Error::new(
140 0 : std::io::ErrorKind::InvalidData,
141 0 : format!(
142 0 : "Failed to decompress blob for {}@{}, {}..{}: invalid compression byte {bits:x}",
143 0 : self.meta.key, self.meta.lsn, self.start, self.end
144 0 : ),
145 0 : );
146 0 : Err(error)
147 : }
148 : }
149 5387075 : }
150 : }
151 :
152 : impl std::fmt::Display for VectoredBlob {
153 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
154 0 : write!(
155 0 : f,
156 0 : "{}@{}, {}..{}",
157 0 : self.meta.key, self.meta.lsn, self.start, self.end
158 0 : )
159 0 : }
160 : }
161 :
162 : /// Return type of [`VectoredBlobReader::read_blobs`]
163 : pub struct VectoredBlobsBuf {
164 : /// Buffer for all blobs in this read
165 : pub buf: IoBufferMut,
166 : /// Offsets into the buffer and metadata for all blobs in this read
167 : pub blobs: Vec<VectoredBlob>,
168 : }
169 :
170 : /// Description of one disk read for multiple blobs.
171 : /// Used as the argument for [`VectoredBlobReader::read_blobs`]
172 : #[derive(Debug)]
173 : pub struct VectoredRead {
174 : pub start: u64,
175 : pub end: u64,
176 : /// Start offset and metadata for each blob in this read
177 : pub blobs_at: VecMap<u64, BlobMeta>,
178 : }
179 :
180 : impl VectoredRead {
181 2017576 : pub(crate) fn size(&self) -> usize {
182 2017576 : (self.end - self.start) as usize
183 2017576 : }
184 : }
185 :
186 : #[derive(Eq, PartialEq, Debug)]
187 : pub(crate) enum VectoredReadExtended {
188 : Yes,
189 : No,
190 : }
191 :
192 : /// A vectored read builder that tries to coalesce all reads that fit in a chunk.
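: ///
: /// A minimal sketch of how the planners drive this builder (identifiers such as
: /// `more_blobs`, `start` and `meta` are illustrative placeholders; not a doctest):
: ///
: /// ```ignore
: /// let mut reads = Vec::new();
: /// let mut builder = ChunkedVectoredReadBuilder::new(start, end, meta, max_read_size);
: /// for (next_start, next_end, next_meta) in more_blobs {
: ///     if builder.extend(next_start, next_end, next_meta) == VectoredReadExtended::No {
: ///         // Could not coalesce: emit the current read and start a new one.
: ///         reads.push(builder.build());
: ///         builder =
: ///             ChunkedVectoredReadBuilder::new(next_start, next_end, next_meta, max_read_size);
: ///     }
: /// }
: /// reads.push(builder.build());
: /// ```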
193 : pub(crate) struct ChunkedVectoredReadBuilder {
194 : /// Start block number
195 : start_blk_no: usize,
196 : /// End block number (exclusive).
197 : end_blk_no: usize,
198 : /// Start offset and metadata for each blob in this read
199 : blobs_at: VecMap<u64, BlobMeta>,
200 : max_read_size: Option<usize>,
201 : }
202 :
203 : impl ChunkedVectoredReadBuilder {
204 : const CHUNK_SIZE: usize = virtual_file::get_io_buffer_alignment();
205 : /// Start building a new vectored read.
206 : ///
207 : /// Note that, by design, the initial read is not checked against `max_read_size`, so
208 : /// blobs larger than the configured value can still be read. In that case the builder
209 : /// is effectively single-use, as any further extension would exceed the limit.
210 441202 : fn new_impl(
211 441202 : start_offset: u64,
212 441202 : end_offset: u64,
213 441202 : meta: BlobMeta,
214 441202 : max_read_size: Option<usize>,
215 441202 : ) -> Self {
216 441202 : let mut blobs_at = VecMap::default();
217 441202 : blobs_at
218 441202 : .append(start_offset, meta)
219 441202 : .expect("First insertion always succeeds");
220 441202 :
221 441202 : let start_blk_no = start_offset as usize / Self::CHUNK_SIZE;
222 441202 : let end_blk_no = (end_offset as usize).div_ceil(Self::CHUNK_SIZE);
223 441202 : Self {
224 441202 : start_blk_no,
225 441202 : end_blk_no,
226 441202 : blobs_at,
227 441202 : max_read_size,
228 441202 : }
229 441202 : }
230 :
231 360090 : pub(crate) fn new(
232 360090 : start_offset: u64,
233 360090 : end_offset: u64,
234 360090 : meta: BlobMeta,
235 360090 : max_read_size: usize,
236 360090 : ) -> Self {
237 360090 : Self::new_impl(start_offset, end_offset, meta, Some(max_read_size))
238 360090 : }
239 :
240 81112 : pub(crate) fn new_streaming(start_offset: u64, end_offset: u64, meta: BlobMeta) -> Self {
241 81112 : Self::new_impl(start_offset, end_offset, meta, None)
242 81112 : }
243 :
244 : /// Attempts to extend the current read with a new blob if the new blob resides in the same or the immediate next chunk.
245 : ///
246 : /// The resulting size must also not exceed the max read size.
247 4991267 : pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
248 4991267 : tracing::trace!(start, end, "trying to extend");
249 4991267 : let start_blk_no = start as usize / Self::CHUNK_SIZE;
250 4991267 : let end_blk_no = (end as usize).div_ceil(Self::CHUNK_SIZE);
251 :
252 4991267 : let not_limited_by_max_read_size = {
253 4991267 : if let Some(max_read_size) = self.max_read_size {
254 816371 : let coalesced_size = (end_blk_no - self.start_blk_no) * Self::CHUNK_SIZE;
255 816371 : coalesced_size <= max_read_size
256 : } else {
257 4174896 : true
258 : }
259 : };
260 :
261 : // True if the new blob starts in the same chunk where the current read ends, or in the chunk immediately after it.
262 : //
263 : // Note: This automatically handles the case where two blobs are adjacent to each other,
264 : // whether they start on a chunk size boundary or not.
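: // For example: a read whose last chunk is #3 (`end_blk_no == 4`, exclusive) can
: // absorb a blob starting anywhere in chunk #3 (`start_blk_no == 3`, case 1 below)
: // or in chunk #4 (`start_blk_no == 4`, case 2 below).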
265 4991267 : let is_adjacent_chunk_read = {
266 : // 1. first.end & second.start are in the same block
267 4991267 : self.end_blk_no == start_blk_no + 1 ||
268 : // 2. first.end ends one block before second.start
269 26670 : self.end_blk_no == start_blk_no
270 : };
271 :
272 4991267 : if is_adjacent_chunk_read && not_limited_by_max_read_size {
273 4946097 : self.end_blk_no = end_blk_no;
274 4946097 : self.blobs_at
275 4946097 : .append(start, meta)
276 4946097 : .expect("LSNs are ordered within vectored reads");
277 4946097 :
278 4946097 : return VectoredReadExtended::Yes;
279 45170 : }
280 45170 :
281 45170 : VectoredReadExtended::No
282 4991267 : }
283 :
284 4254708 : pub(crate) fn size(&self) -> usize {
285 4254708 : (self.end_blk_no - self.start_blk_no) * Self::CHUNK_SIZE
286 4254708 : }
287 :
288 441202 : pub(crate) fn build(self) -> VectoredRead {
289 441202 : let start = (self.start_blk_no * Self::CHUNK_SIZE) as u64;
290 441202 : let end = (self.end_blk_no * Self::CHUNK_SIZE) as u64;
291 441202 : VectoredRead {
292 441202 : start,
293 441202 : end,
294 441202 : blobs_at: self.blobs_at,
295 441202 : }
296 441202 : }
297 : }
298 :
299 : #[derive(Copy, Clone, Debug)]
300 : pub enum BlobFlag {
301 : None,
302 : Ignore,
303 : ReplaceAll,
304 : }
305 :
306 : /// Planner for vectored blob reads.
307 : ///
308 : /// Blob offsets are received via [`VectoredReadPlanner::handle`]
309 : /// and coalesced into disk reads.
310 : ///
311 : /// The implementation is very simple:
312 : /// * Collect all blob offsets in an ordered structure
313 : /// * Iterate over the collected blobs and coalesce them into reads at the end
314 : pub struct VectoredReadPlanner {
315 : // Track all the blob offsets. Start offsets must be ordered.
316 : // Values in the value tuples are:
317 : // (
318 : // lsn of the blob,
319 : // start offset of the blob in the underlying file,
320 : // end offset of the blob in the underlying file,
321 : // whether the blob initializes the page image or not
322 : // see [`pageserver_api::record::NeonWalRecord::will_init`]
323 : // )
324 : blobs: BTreeMap<Key, Vec<(Lsn, u64, u64, bool)>>,
325 : // Arguments for previous blob passed into [`VectoredReadPlanner::handle`]
326 : prev: Option<(Key, Lsn, u64, BlobFlag)>,
327 :
328 : max_read_size: usize,
329 : }
330 :
331 : impl VectoredReadPlanner {
332 479904 : pub fn new(max_read_size: usize) -> Self {
333 479904 : Self {
334 479904 : blobs: BTreeMap::new(),
335 479904 : prev: None,
336 479904 : max_read_size,
337 479904 : }
338 479904 : }
339 :
340 : /// Include a new blob in the read plan.
341 : ///
342 : /// This function is called from a B-Tree index visitor (see `DeltaLayerInner::plan_reads`
343 : /// and `ImageLayerInner::plan_reads`). Said visitor wants to collect blob offsets for all
344 : /// keys in a given keyspace. This function must be called for each key in the desired
345 : /// keyspace (monotonically continuous). [`Self::handle_range_end`] must
346 : /// be called at the end of every range, with the range's end offset.
347 : ///
348 : /// In the event that keys are skipped, the behaviour is undefined and can lead to an
349 : /// incorrect read plan. We can end up asserting, erroring in wal redo or returning
350 : /// incorrect data to the user.
351 : ///
352 : /// The `flag` argument has two interesting values:
353 : /// * [`BlobFlag::ReplaceAll`]: The blob for this key should replace all existing blobs.
354 : /// This is used for WAL records that `will_init`.
355 : /// * [`BlobFlag::Ignore`]: This blob should not be included in the read. This happens
356 : /// if the blob is cached.
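: ///
: /// For example, suppose `handle` is called for a single key with offsets
: /// 0x00, 0x10 and 0x20 and flags `None`, `ReplaceAll`, `None`, followed by
: /// `handle_range_end(0x30)`. The planned reads then cover only `0x10..0x30`:
: /// the `ReplaceAll` blob drops everything collected before it for that key.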
357 3041259 : pub fn handle(&mut self, key: Key, lsn: Lsn, offset: u64, flag: BlobFlag) {
358 : // Implementation note: internally lag behind by one blob such that
359 : // we have a start and end offset when initialising [`VectoredRead`]
360 3041259 : let (prev_key, prev_lsn, prev_offset, prev_flag) = match self.prev {
361 : None => {
362 307100 : self.prev = Some((key, lsn, offset, flag));
363 307100 : return;
364 : }
365 2734159 : Some(prev) => prev,
366 2734159 : };
367 2734159 :
368 2734159 : self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
369 2734159 :
370 2734159 : self.prev = Some((key, lsn, offset, flag));
371 3041259 : }
372 :
373 480392 : pub fn handle_range_end(&mut self, offset: u64) {
374 480392 : if let Some((prev_key, prev_lsn, prev_offset, prev_flag)) = self.prev {
375 307100 : self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
376 307100 : }
377 :
378 480392 : self.prev = None;
379 480392 : }
380 :
381 3041259 : fn add_blob(&mut self, key: Key, lsn: Lsn, start_offset: u64, end_offset: u64, flag: BlobFlag) {
382 3041259 : match flag {
383 749548 : BlobFlag::None => {
384 749548 : let blobs_for_key = self.blobs.entry(key).or_default();
385 749548 : blobs_for_key.push((lsn, start_offset, end_offset, false));
386 749548 : }
387 638892 : BlobFlag::ReplaceAll => {
388 638892 : let blobs_for_key = self.blobs.entry(key).or_default();
389 638892 : blobs_for_key.clear();
390 638892 : blobs_for_key.push((lsn, start_offset, end_offset, true));
391 638892 : }
392 1652819 : BlobFlag::Ignore => {}
393 : }
394 3041259 : }
395 :
396 479904 : pub fn finish(self) -> Vec<VectoredRead> {
397 479904 : let mut current_read_builder: Option<ChunkedVectoredReadBuilder> = None;
398 479904 : let mut reads = Vec::new();
399 :
400 1500471 : for (key, blobs_for_key) in self.blobs {
401 2143570 : for (lsn, start_offset, end_offset, will_init) in blobs_for_key {
402 1123003 : let extended = match &mut current_read_builder {
403 816327 : Some(read_builder) => read_builder.extend(
404 816327 : start_offset,
405 816327 : end_offset,
406 816327 : BlobMeta {
407 816327 : key,
408 816327 : lsn,
409 816327 : will_init,
410 816327 : },
411 816327 : ),
412 306676 : None => VectoredReadExtended::No,
413 : };
414 :
415 1123003 : if extended == VectoredReadExtended::No {
416 351834 : let next_read_builder = ChunkedVectoredReadBuilder::new(
417 351834 : start_offset,
418 351834 : end_offset,
419 351834 : BlobMeta {
420 351834 : key,
421 351834 : lsn,
422 351834 : will_init,
423 351834 : },
424 351834 : self.max_read_size,
425 351834 : );
426 351834 :
427 351834 : let prev_read_builder = current_read_builder.replace(next_read_builder);
428 :
429 : // `current_read_builder` is None in the first iteration of the outer loop
430 351834 : if let Some(read_builder) = prev_read_builder {
431 45158 : reads.push(read_builder.build());
432 306676 : }
433 771169 : }
434 : }
435 : }
436 :
437 479904 : if let Some(read_builder) = current_read_builder {
438 306676 : reads.push(read_builder.build());
439 306676 : }
440 :
441 479904 : reads
442 479904 : }
443 : }
444 :
445 : /// Disk reader for vectored blob spans (does not go through the page cache)
446 : pub struct VectoredBlobReader<'a> {
447 : file: &'a VirtualFile,
448 : }
449 :
450 : impl<'a> VectoredBlobReader<'a> {
451 393778 : pub fn new(file: &'a VirtualFile) -> Self {
452 393778 : Self { file }
453 393778 : }
454 :
455 : /// Read the requested blobs into the buffer.
456 : ///
457 : /// We have to deal with the fact that blobs are not fixed size.
458 : /// Each blob is prefixed by a size header.
459 : ///
460 : /// The success return value is a struct which contains the buffer
461 : /// filled from disk and a list of offsets at which each blob lies
462 : /// in the buffer.
463 441050 : pub async fn read_blobs(
464 441050 : &self,
465 441050 : read: &VectoredRead,
466 441050 : buf: IoBufferMut,
467 441050 : ctx: &RequestContext,
468 441050 : ) -> Result<VectoredBlobsBuf, std::io::Error> {
469 441050 : assert!(read.size() > 0);
470 441050 : assert!(
471 441050 : read.size() <= buf.capacity(),
472 0 : "{} > {}",
473 0 : read.size(),
474 0 : buf.capacity()
475 : );
476 :
477 441050 : if cfg!(debug_assertions) {
478 441050 : const ALIGN: u64 = virtual_file::get_io_buffer_alignment() as u64;
479 441050 : debug_assert_eq!(
480 441050 : read.start % ALIGN,
481 : 0,
482 0 : "Read start at {} does not satisfy the required io buffer alignment ({} bytes)",
483 : read.start,
484 : ALIGN
485 : );
486 0 : }
487 :
488 441050 : let buf = self
489 441050 : .file
490 441050 : .read_exact_at(buf.slice(0..read.size()), read.start, ctx)
491 441050 : .await?
492 441050 : .into_inner();
493 441050 :
494 441050 : let blobs_at = read.blobs_at.as_slice();
495 441050 :
496 441050 : let start_offset = read.start;
497 441050 :
498 441050 : let mut metas = Vec::with_capacity(blobs_at.len());
499 : // Blobs in `read` only provide their starting offset. The end offset
500 : // of a blob is implicit: the start of the next blob if one exists
501 : // or the end of the read.
502 :
503 5828125 : for (blob_start, meta) in blobs_at {
504 5387075 : let blob_start_in_buf = blob_start - start_offset;
505 5387075 : let first_len_byte = buf[blob_start_in_buf as usize];
506 :
507 : // Each blob is prefixed by a header containing its size and compression information.
508 : // Extract the size and skip that header to find the start of the data.
509 : // The size can be 1 or 4 bytes. The most significant bit is 0 in the
510 : // 1 byte case and 1 in the 4 byte case.
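: // For example (per the encoding in `blob_io`): a first byte of 0x03 means a
: // 1-byte header for a 3-byte uncompressed blob. A first byte with the high bit
: // set means the length occupies 4 big-endian bytes, with the compression bits
: // (`LEN_COMPRESSION_BIT_MASK`) carried in the top bits of the first byte and
: // masked out of the length.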
511 5387075 : let (size_length, blob_size, compression_bits) = if first_len_byte < 0x80 {
512 5311691 : (1, first_len_byte as u64, BYTE_UNCOMPRESSED)
513 : } else {
514 75384 : let mut blob_size_buf = [0u8; 4];
515 75384 : let offset_in_buf = blob_start_in_buf as usize;
516 75384 :
517 75384 : blob_size_buf.copy_from_slice(&buf[offset_in_buf..offset_in_buf + 4]);
518 75384 : blob_size_buf[0] &= !LEN_COMPRESSION_BIT_MASK;
519 75384 :
520 75384 : let compression_bits = first_len_byte & LEN_COMPRESSION_BIT_MASK;
521 75384 : (
522 75384 : 4,
523 75384 : u32::from_be_bytes(blob_size_buf) as u64,
524 75384 : compression_bits,
525 75384 : )
526 : };
527 :
528 5387075 : let start = (blob_start_in_buf + size_length) as usize;
529 5387075 : let end = start + blob_size as usize;
530 5387075 :
531 5387075 : metas.push(VectoredBlob {
532 5387075 : start,
533 5387075 : end,
534 5387075 : meta: *meta,
535 5387075 : compression_bits,
536 5387075 : });
537 : }
538 :
539 441050 : Ok(VectoredBlobsBuf { buf, blobs: metas })
540 441050 : }
541 : }
542 :
543 : /// Read planner used in [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`].
544 : ///
545 : /// It provides a streaming API for getting read blobs. A batch is returned from
546 : /// `handle` once the accumulated blobs reach the `max_read_size` or `max_cnt`
547 : /// constraints.
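: ///
: /// A minimal usage sketch (`index_entries` and the offsets are illustrative
: /// placeholders; not a doctest):
: ///
: /// ```ignore
: /// let mut planner = StreamingVectoredReadPlanner::new(1024 * 1024, 100);
: /// let mut reads = Vec::new();
: /// for (key, lsn, offset, will_init) in index_entries {
: ///     // Returns a full batch once the previously collected blobs hit the limits.
: ///     reads.extend(planner.handle(key, lsn, offset, will_init));
: /// }
: /// // Flush the final, partially filled batch.
: /// reads.extend(planner.handle_range_end(end_offset));
: /// ```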
548 : pub struct StreamingVectoredReadPlanner {
549 : read_builder: Option<ChunkedVectoredReadBuilder>,
550 : // Arguments for previous blob passed into [`StreamingVectoredReadPlanner::handle`]
551 : prev: Option<(Key, Lsn, u64, bool)>,
552 : /// Max read size per batch. This is not a strict limit: if there are blobs at [0, 100) and [100, 200) while `max_read_size` is 150,
553 : /// we will produce a single batch instead of splitting them.
554 : max_read_size: u64,
555 : /// Max item count per batch
556 : max_cnt: usize,
557 : /// Size of the current batch
558 : cnt: usize,
559 : }
560 :
561 : impl StreamingVectoredReadPlanner {
562 1640 : pub fn new(max_read_size: u64, max_cnt: usize) -> Self {
563 1640 : assert!(max_cnt > 0);
564 1640 : assert!(max_read_size > 0);
565 1640 : Self {
566 1640 : read_builder: None,
567 1640 : prev: None,
568 1640 : max_cnt,
569 1640 : max_read_size,
570 1640 : cnt: 0,
571 1640 : }
572 1640 : }
573 :
574 4256120 : pub fn handle(
575 4256120 : &mut self,
576 4256120 : key: Key,
577 4256120 : lsn: Lsn,
578 4256120 : offset: u64,
579 4256120 : will_init: bool,
580 4256120 : ) -> Option<VectoredRead> {
581 : // Implementation note: internally lag behind by one blob such that
582 : // we have a start and end offset when initialising [`VectoredRead`]
583 4256120 : let (prev_key, prev_lsn, prev_offset, prev_will_init) = match self.prev {
584 : None => {
585 1412 : self.prev = Some((key, lsn, offset, will_init));
586 1412 : return None;
587 : }
588 4254708 : Some(prev) => prev,
589 4254708 : };
590 4254708 :
591 4254708 : let res = self.add_blob(
592 4254708 : prev_key,
593 4254708 : prev_lsn,
594 4254708 : prev_offset,
595 4254708 : offset,
596 4254708 : false,
597 4254708 : prev_will_init,
598 4254708 : );
599 4254708 :
600 4254708 : self.prev = Some((key, lsn, offset, will_init));
601 4254708 :
602 4254708 : res
603 4256120 : }
604 :
605 1304 : pub fn handle_range_end(&mut self, offset: u64) -> Option<VectoredRead> {
606 1304 : let res = if let Some((prev_key, prev_lsn, prev_offset, prev_will_init)) = self.prev {
607 1300 : self.add_blob(
608 1300 : prev_key,
609 1300 : prev_lsn,
610 1300 : prev_offset,
611 1300 : offset,
612 1300 : true,
613 1300 : prev_will_init,
614 1300 : )
615 : } else {
616 4 : None
617 : };
618 :
619 1304 : self.prev = None;
620 1304 :
621 1304 : res
622 1304 : }
623 :
624 4256008 : fn add_blob(
625 4256008 : &mut self,
626 4256008 : key: Key,
627 4256008 : lsn: Lsn,
628 4256008 : start_offset: u64,
629 4256008 : end_offset: u64,
630 4256008 : is_last_blob_in_read: bool,
631 4256008 : will_init: bool,
632 4256008 : ) -> Option<VectoredRead> {
633 4256008 : match &mut self.read_builder {
634 4174896 : Some(read_builder) => {
635 4174896 : let extended = read_builder.extend(
636 4174896 : start_offset,
637 4174896 : end_offset,
638 4174896 : BlobMeta {
639 4174896 : key,
640 4174896 : lsn,
641 4174896 : will_init,
642 4174896 : },
643 4174896 : );
644 4174896 : assert_eq!(extended, VectoredReadExtended::Yes);
645 : }
646 81112 : None => {
647 81112 : self.read_builder = {
648 81112 : Some(ChunkedVectoredReadBuilder::new_streaming(
649 81112 : start_offset,
650 81112 : end_offset,
651 81112 : BlobMeta {
652 81112 : key,
653 81112 : lsn,
654 81112 : will_init,
655 81112 : },
656 81112 : ))
657 81112 : };
658 81112 : }
659 : }
660 4256008 : let read_builder = self.read_builder.as_mut().unwrap();
661 4256008 : self.cnt += 1;
662 4256008 : if is_last_blob_in_read
663 4254708 : || read_builder.size() >= self.max_read_size as usize
664 4197020 : || self.cnt >= self.max_cnt
665 : {
666 81112 : let prev_read_builder = self.read_builder.take();
667 81112 : self.cnt = 0;
668 :
669 : // `current_read_builder` is None in the first iteration
670 81112 : if let Some(read_builder) = prev_read_builder {
671 81112 : return Some(read_builder.build());
672 0 : }
673 4174896 : }
674 4174896 : None
675 4256008 : }
676 : }
677 :
678 : #[cfg(test)]
679 : mod tests {
680 : use anyhow::Error;
681 :
682 : use super::super::blob_io::tests::{random_array, write_maybe_compressed};
683 : use super::*;
684 : use crate::context::DownloadBehavior;
685 : use crate::page_cache::PAGE_SZ;
686 : use crate::task_mgr::TaskKind;
687 :
688 96 : fn validate_read(read: &VectoredRead, offset_range: &[(Key, Lsn, u64, BlobFlag)]) {
689 : const ALIGN: u64 = virtual_file::get_io_buffer_alignment() as u64;
690 96 : assert_eq!(read.start % ALIGN, 0);
691 96 : assert_eq!(read.start / ALIGN, offset_range.first().unwrap().2 / ALIGN);
692 :
693 168 : let expected_offsets_in_read: Vec<_> = offset_range.iter().map(|o| o.2).collect();
694 96 :
695 96 : let offsets_in_read: Vec<_> = read
696 96 : .blobs_at
697 96 : .as_slice()
698 96 : .iter()
699 168 : .map(|(offset, _)| *offset)
700 96 : .collect();
701 96 :
702 96 : assert_eq!(expected_offsets_in_read, offsets_in_read);
703 96 : }
704 :
705 : #[test]
706 4 : fn planner_chunked_coalesce_all_test() {
707 : use crate::virtual_file;
708 :
709 : const CHUNK_SIZE: u64 = virtual_file::get_io_buffer_alignment() as u64;
710 :
711 4 : let max_read_size = CHUNK_SIZE as usize * 8;
712 4 : let key = Key::MIN;
713 4 : let lsn = Lsn(0);
714 4 :
715 4 : let blob_descriptions = [
716 4 : (key, lsn, CHUNK_SIZE / 8, BlobFlag::None), // Read 1 BEGIN
717 4 : (key, lsn, CHUNK_SIZE / 4, BlobFlag::Ignore), // Gap
718 4 : (key, lsn, CHUNK_SIZE / 2, BlobFlag::None),
719 4 : (key, lsn, CHUNK_SIZE - 2, BlobFlag::Ignore), // Gap
720 4 : (key, lsn, CHUNK_SIZE, BlobFlag::None),
721 4 : (key, lsn, CHUNK_SIZE * 2 - 1, BlobFlag::None),
722 4 : (key, lsn, CHUNK_SIZE * 2 + 1, BlobFlag::Ignore), // Gap
723 4 : (key, lsn, CHUNK_SIZE * 3 + 1, BlobFlag::None),
724 4 : (key, lsn, CHUNK_SIZE * 5 + 1, BlobFlag::None),
725 4 : (key, lsn, CHUNK_SIZE * 6 + 1, BlobFlag::Ignore), // skipped chunk size, but not a chunk: should coalesce.
726 4 : (key, lsn, CHUNK_SIZE * 7 + 1, BlobFlag::None),
727 4 : (key, lsn, CHUNK_SIZE * 8, BlobFlag::None), // Read 2 BEGIN (b/c max_read_size)
728 4 : (key, lsn, CHUNK_SIZE * 9, BlobFlag::Ignore), // ==== skipped a chunk
729 4 : (key, lsn, CHUNK_SIZE * 10, BlobFlag::None), // Read 3 BEGIN (cannot coalesce)
730 4 : ];
731 4 :
732 4 : let ranges = [
733 4 : &[
734 4 : blob_descriptions[0],
735 4 : blob_descriptions[2],
736 4 : blob_descriptions[4],
737 4 : blob_descriptions[5],
738 4 : blob_descriptions[7],
739 4 : blob_descriptions[8],
740 4 : blob_descriptions[10],
741 4 : ],
742 4 : &blob_descriptions[11..12],
743 4 : &blob_descriptions[13..],
744 4 : ];
745 4 :
746 4 : let mut planner = VectoredReadPlanner::new(max_read_size);
747 60 : for (key, lsn, offset, flag) in blob_descriptions {
748 56 : planner.handle(key, lsn, offset, flag);
749 56 : }
750 :
751 4 : planner.handle_range_end(652 * 1024);
752 4 :
753 4 : let reads = planner.finish();
754 4 :
755 4 : assert_eq!(reads.len(), ranges.len());
756 :
757 12 : for (idx, read) in reads.iter().enumerate() {
758 12 : validate_read(read, ranges[idx]);
759 12 : }
760 4 : }
761 :
762 : #[test]
763 4 : fn planner_max_read_size_test() {
764 4 : let max_read_size = 128 * 1024;
765 4 : let key = Key::MIN;
766 4 : let lsn = Lsn(0);
767 4 :
768 4 : let blob_descriptions = vec![
769 4 : (key, lsn, 0, BlobFlag::None),
770 4 : (key, lsn, 32 * 1024, BlobFlag::None),
771 4 : (key, lsn, 96 * 1024, BlobFlag::None), // Last in read 1
772 4 : (key, lsn, 128 * 1024, BlobFlag::None), // Last in read 2
773 4 : (key, lsn, 198 * 1024, BlobFlag::None), // Last in read 3
774 4 : (key, lsn, 268 * 1024, BlobFlag::None), // Last in read 4
775 4 : (key, lsn, 396 * 1024, BlobFlag::None), // Last in read 5
776 4 : (key, lsn, 652 * 1024, BlobFlag::None), // Last in read 6
777 4 : ];
778 4 :
779 4 : let ranges = [
780 4 : &blob_descriptions[0..3],
781 4 : &blob_descriptions[3..4],
782 4 : &blob_descriptions[4..5],
783 4 : &blob_descriptions[5..6],
784 4 : &blob_descriptions[6..7],
785 4 : &blob_descriptions[7..],
786 4 : ];
787 4 :
788 4 : let mut planner = VectoredReadPlanner::new(max_read_size);
789 32 : for (key, lsn, offset, flag) in blob_descriptions.clone() {
790 32 : planner.handle(key, lsn, offset, flag);
791 32 : }
792 :
793 4 : planner.handle_range_end(652 * 1024);
794 4 :
795 4 : let reads = planner.finish();
796 4 :
797 4 : assert_eq!(reads.len(), 6);
798 :
799 : // TODO: could remove zero reads to produce 5 reads here
800 :
801 24 : for (idx, read) in reads.iter().enumerate() {
802 24 : validate_read(read, ranges[idx]);
803 24 : }
804 4 : }
805 :
806 : #[test]
807 4 : fn planner_replacement_test() {
808 : const CHUNK_SIZE: u64 = virtual_file::get_io_buffer_alignment() as u64;
809 4 : let max_read_size = 128 * CHUNK_SIZE as usize;
810 4 : let first_key = Key::MIN;
811 4 : let second_key = first_key.next();
812 4 : let lsn = Lsn(0);
813 4 :
814 4 : let blob_descriptions = vec![
815 4 : (first_key, lsn, 0, BlobFlag::None), // First in read 1
816 4 : (first_key, lsn, CHUNK_SIZE, BlobFlag::None), // Last in read 1
817 4 : (second_key, lsn, 2 * CHUNK_SIZE, BlobFlag::ReplaceAll),
818 4 : (second_key, lsn, 3 * CHUNK_SIZE, BlobFlag::None),
819 4 : (second_key, lsn, 4 * CHUNK_SIZE, BlobFlag::ReplaceAll), // First in read 2
820 4 : (second_key, lsn, 5 * CHUNK_SIZE, BlobFlag::None), // Last in read 2
821 4 : ];
822 4 :
823 4 : let ranges = [&blob_descriptions[0..2], &blob_descriptions[4..]];
824 4 :
825 4 : let mut planner = VectoredReadPlanner::new(max_read_size);
826 24 : for (key, lsn, offset, flag) in blob_descriptions.clone() {
827 24 : planner.handle(key, lsn, offset, flag);
828 24 : }
829 :
830 4 : planner.handle_range_end(6 * CHUNK_SIZE);
831 4 :
832 4 : let reads = planner.finish();
833 4 : assert_eq!(reads.len(), 2);
834 :
835 8 : for (idx, read) in reads.iter().enumerate() {
836 8 : validate_read(read, ranges[idx]);
837 8 : }
838 4 : }
839 :
840 : #[test]
841 4 : fn streaming_planner_max_read_size_test() {
842 4 : let max_read_size = 128 * 1024;
843 4 : let key = Key::MIN;
844 4 : let lsn = Lsn(0);
845 4 :
846 4 : let blob_descriptions = vec![
847 4 : (key, lsn, 0, BlobFlag::None),
848 4 : (key, lsn, 32 * 1024, BlobFlag::None),
849 4 : (key, lsn, 96 * 1024, BlobFlag::None),
850 4 : (key, lsn, 128 * 1024, BlobFlag::None),
851 4 : (key, lsn, 198 * 1024, BlobFlag::None),
852 4 : (key, lsn, 268 * 1024, BlobFlag::None),
853 4 : (key, lsn, 396 * 1024, BlobFlag::None),
854 4 : (key, lsn, 652 * 1024, BlobFlag::None),
855 4 : ];
856 4 :
857 4 : let ranges = [
858 4 : &blob_descriptions[0..3],
859 4 : &blob_descriptions[3..5],
860 4 : &blob_descriptions[5..6],
861 4 : &blob_descriptions[6..7],
862 4 : &blob_descriptions[7..],
863 4 : ];
864 4 :
865 4 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1000);
866 4 : let mut reads = Vec::new();
867 32 : for (key, lsn, offset, _) in blob_descriptions.clone() {
868 32 : reads.extend(planner.handle(key, lsn, offset, false));
869 32 : }
870 4 : reads.extend(planner.handle_range_end(652 * 1024));
871 4 :
872 4 : assert_eq!(reads.len(), ranges.len());
873 :
874 20 : for (idx, read) in reads.iter().enumerate() {
875 20 : validate_read(read, ranges[idx]);
876 20 : }
877 4 : }
878 :
879 : #[test]
880 4 : fn streaming_planner_max_cnt_test() {
881 4 : let max_read_size = 1024 * 1024;
882 4 : let key = Key::MIN;
883 4 : let lsn = Lsn(0);
884 4 :
885 4 : let blob_descriptions = vec![
886 4 : (key, lsn, 0, BlobFlag::None),
887 4 : (key, lsn, 32 * 1024, BlobFlag::None),
888 4 : (key, lsn, 96 * 1024, BlobFlag::None),
889 4 : (key, lsn, 128 * 1024, BlobFlag::None),
890 4 : (key, lsn, 198 * 1024, BlobFlag::None),
891 4 : (key, lsn, 268 * 1024, BlobFlag::None),
892 4 : (key, lsn, 396 * 1024, BlobFlag::None),
893 4 : (key, lsn, 652 * 1024, BlobFlag::None),
894 4 : ];
895 4 :
896 4 : let ranges = [
897 4 : &blob_descriptions[0..2],
898 4 : &blob_descriptions[2..4],
899 4 : &blob_descriptions[4..6],
900 4 : &blob_descriptions[6..],
901 4 : ];
902 4 :
903 4 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 2);
904 4 : let mut reads = Vec::new();
905 32 : for (key, lsn, offset, _) in blob_descriptions.clone() {
906 32 : reads.extend(planner.handle(key, lsn, offset, false));
907 32 : }
908 4 : reads.extend(planner.handle_range_end(652 * 1024));
909 4 :
910 4 : assert_eq!(reads.len(), ranges.len());
911 :
912 16 : for (idx, read) in reads.iter().enumerate() {
913 16 : validate_read(read, ranges[idx]);
914 16 : }
915 4 : }
916 :
917 : #[test]
918 4 : fn streaming_planner_edge_test() {
919 4 : let max_read_size = 1024 * 1024;
920 4 : let key = Key::MIN;
921 4 : let lsn = Lsn(0);
922 4 : {
923 4 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
924 4 : let mut reads = Vec::new();
925 4 : reads.extend(planner.handle_range_end(652 * 1024));
926 4 : assert!(reads.is_empty());
927 : }
928 : {
929 4 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
930 4 : let mut reads = Vec::new();
931 4 : reads.extend(planner.handle(key, lsn, 0, false));
932 4 : reads.extend(planner.handle_range_end(652 * 1024));
933 4 : assert_eq!(reads.len(), 1);
934 4 : validate_read(&reads[0], &[(key, lsn, 0, BlobFlag::None)]);
935 4 : }
936 4 : {
937 4 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
938 4 : let mut reads = Vec::new();
939 4 : reads.extend(planner.handle(key, lsn, 0, false));
940 4 : reads.extend(planner.handle(key, lsn, 128 * 1024, false));
941 4 : reads.extend(planner.handle_range_end(652 * 1024));
942 4 : assert_eq!(reads.len(), 2);
943 4 : validate_read(&reads[0], &[(key, lsn, 0, BlobFlag::None)]);
944 4 : validate_read(&reads[1], &[(key, lsn, 128 * 1024, BlobFlag::None)]);
945 4 : }
946 4 : {
947 4 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 2);
948 4 : let mut reads = Vec::new();
949 4 : reads.extend(planner.handle(key, lsn, 0, false));
950 4 : reads.extend(planner.handle(key, lsn, 128 * 1024, false));
951 4 : reads.extend(planner.handle_range_end(652 * 1024));
952 4 : assert_eq!(reads.len(), 1);
953 4 : validate_read(
954 4 : &reads[0],
955 4 : &[
956 4 : (key, lsn, 0, BlobFlag::None),
957 4 : (key, lsn, 128 * 1024, BlobFlag::None),
958 4 : ],
959 4 : );
960 4 : }
961 4 : }
962 :
963 16 : async fn round_trip_test_compressed(blobs: &[Vec<u8>], compression: bool) -> Result<(), Error> {
964 16 : let ctx =
965 16 : RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
966 16 : let (_temp_dir, pathbuf, offsets) =
967 16 : write_maybe_compressed::<true>(blobs, compression, &ctx).await?;
968 :
969 16 : let file = VirtualFile::open(&pathbuf, &ctx).await?;
970 16 : let file_len = std::fs::metadata(&pathbuf)?.len();
971 16 :
972 16 : // Multiply by two (compressed data might need more space), and add a few bytes for the header
973 8240 : let reserved_bytes = blobs.iter().map(|bl| bl.len()).max().unwrap() * 2 + 16;
974 16 : let mut buf = IoBufferMut::with_capacity(reserved_bytes);
975 16 :
976 16 : let vectored_blob_reader = VectoredBlobReader::new(&file);
977 16 : let meta = BlobMeta {
978 16 : key: Key::MIN,
979 16 : lsn: Lsn(0),
980 16 : will_init: false,
981 16 : };
982 :
983 8240 : for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
984 8240 : let end = offsets.get(idx + 1).unwrap_or(&file_len);
985 8240 : if idx + 1 == offsets.len() {
986 16 : continue;
987 8224 : }
988 8224 : let read_builder = ChunkedVectoredReadBuilder::new(*offset, *end, meta, 16 * 4096);
989 8224 : let read = read_builder.build();
990 8224 : let result = vectored_blob_reader.read_blobs(&read, buf, &ctx).await?;
991 8224 : assert_eq!(result.blobs.len(), 1);
992 8224 : let read_blob = &result.blobs[0];
993 8224 : let view = BufView::new_slice(&result.buf);
994 8224 : let read_buf = read_blob.read(&view).await?;
995 8224 : assert_eq!(
996 8224 : &blob[..],
997 8224 : &read_buf[..],
998 0 : "mismatch for idx={idx} at offset={offset}"
999 : );
1000 8224 : buf = result.buf;
1001 : }
1002 16 : Ok(())
1003 16 : }
1004 :
1005 : #[tokio::test]
1006 4 : async fn test_really_big_array() -> Result<(), Error> {
1007 4 : let blobs = &[
1008 4 : b"test".to_vec(),
1009 4 : random_array(10 * PAGE_SZ),
1010 4 : b"hello".to_vec(),
1011 4 : random_array(66 * PAGE_SZ),
1012 4 : vec![0xf3; 24 * PAGE_SZ],
1013 4 : b"foobar".to_vec(),
1014 4 : ];
1015 4 : round_trip_test_compressed(blobs, false).await?;
1016 4 : round_trip_test_compressed(blobs, true).await?;
1017 4 : Ok(())
1018 4 : }
1019 :
1020 : #[tokio::test]
1021 4 : async fn test_arrays_inc() -> Result<(), Error> {
1022 4 : let blobs = (0..PAGE_SZ / 8)
1023 4096 : .map(|v| random_array(v * 16))
1024 4 : .collect::<Vec<_>>();
1025 4 : round_trip_test_compressed(&blobs, false).await?;
1026 4 : round_trip_test_compressed(&blobs, true).await?;
1027 4 : Ok(())
1028 4 : }
1029 : }