Line data Source code
1 : //!
2 : //! Utilities for vectored reading of variable-sized "blobs".
3 : //!
4 : //! The "blob" api is an abstraction on top of the "block" api,
5 : //! with the main difference being that blobs do not have a fixed
6 : //! size (each blob is prefixed with 1 or 4 byte length field)
7 : //!
8 : //! The vectored apis provided in this module allow for planning
9 : //! and executing disk IO which covers multiple blobs.
10 : //!
11 : //! Reads are planned with [`VectoredReadPlanner`], which coalesces
12 : //! adjacent blocks into a single disk IO request, and executed by
13 : //! [`VectoredBlobReader`] which does all the required offset juggling
14 : //! and returns a buffer housing all the blobs and a list of offsets.
15 : //!
16 : //! Note that the vectored blob api does *not* go through the page cache.
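//!
//! A minimal sketch of the planning and execution flow (the key, LSN and offsets
//! below are hypothetical, `file` is assumed to be an open [`VirtualFile`], `ctx` a
//! [`RequestContext`], and error handling is elided):
//!
//! ```ignore
//! // Plan: feed blob start offsets in key/LSN order, then close the range.
//! let mut planner = VectoredReadPlanner::new(128 * 1024);
//! planner.handle(key, lsn, 0, BlobFlag::None);
//! planner.handle(key, lsn, 512, BlobFlag::None);
//! planner.handle_range_end(1024);
//!
//! // Execute: each planned read is one disk IO that covers several blobs.
//! let reader = VectoredBlobReader::new(&file);
//! for read in planner.finish() {
//!     let buf = BytesMut::with_capacity(read.size());
//!     let blobs_buf = reader.read_blobs(&read, buf, &ctx).await?;
//!     for blob in &blobs_buf.blobs {
//!         let data = &blobs_buf.buf[blob.start..blob.end];
//!         // `data` is the blob payload for `blob.meta.key` at `blob.meta.lsn`.
//!     }
//! }
//! ```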
17 :
18 : use std::collections::BTreeMap;
19 :
20 : use bytes::BytesMut;
21 : use pageserver_api::key::Key;
22 : use tokio::io::AsyncWriteExt;
23 : use tokio_epoll_uring::BoundedBuf;
24 : use utils::lsn::Lsn;
25 : use utils::vec_map::VecMap;
26 :
27 : use crate::context::RequestContext;
28 : use crate::tenant::blob_io::{BYTE_UNCOMPRESSED, BYTE_ZSTD, LEN_COMPRESSION_BIT_MASK};
29 : use crate::virtual_file::{self, VirtualFile};
30 :
31 : /// Metadata bundled with the start and end offset of a blob.
32 : #[derive(Copy, Clone, Debug)]
33 : pub struct BlobMeta {
34 : pub key: Key,
35 : pub lsn: Lsn,
36 : }
37 :
38 : /// Blob offsets into [`VectoredBlobsBuf::buf`]
39 : pub struct VectoredBlob {
40 : pub start: usize,
41 : pub end: usize,
42 : pub meta: BlobMeta,
43 : }
44 :
45 : /// Return type of [`VectoredBlobReader::read_blobs`]
46 : pub struct VectoredBlobsBuf {
47 : /// Buffer for all blobs in this read
48 : pub buf: BytesMut,
49 : /// Offsets into the buffer and metadata for all blobs in this read
50 : pub blobs: Vec<VectoredBlob>,
51 : }
52 :
53 : /// Description of one disk read for multiple blobs.
54 : /// Used as the argument for [`VectoredBlobReader::read_blobs`]
55 : #[derive(Debug)]
56 : pub struct VectoredRead {
57 : pub start: u64,
58 : pub end: u64,
59 : /// Start offset and metadata for each blob in this read
60 : pub blobs_at: VecMap<u64, BlobMeta>,
61 : }
62 :
63 : impl VectoredRead {
64 2852696 : pub(crate) fn size(&self) -> usize {
65 2852696 : (self.end - self.start) as usize
66 2852696 : }
67 : }
68 :
69 : #[derive(Eq, PartialEq, Debug)]
70 : pub(crate) enum VectoredReadExtended {
71 : Yes,
72 : No,
73 : }
74 :
75 : #[derive(Copy, Clone, Debug, PartialEq, Eq)]
76 : pub enum VectoredReadCoalesceMode {
77 : /// Only coalesce exactly adjacent reads.
78 : AdjacentOnly,
79 : /// In addition to adjacent reads, also consider reads whose corresponding
80 : /// `end` and `start` offsets reside at the same chunk.
81 : Chunked(usize),
82 : }
83 :
84 : impl VectoredReadCoalesceMode {
85 : /// [`AdjacentVectoredReadBuilder`] is used if the alignment requirement is 0,
86 : /// whereas [`ChunkedVectoredReadBuilder`] is used for alignment requirements of 1 and higher.
87 642594 : pub(crate) fn get() -> Self {
88 642594 : let align = virtual_file::get_io_buffer_alignment_raw();
89 642594 : if align == 0 {
90 214116 : VectoredReadCoalesceMode::AdjacentOnly
91 : } else {
92 428478 : VectoredReadCoalesceMode::Chunked(align)
93 : }
94 642594 : }
95 : }
96 :
97 : pub(crate) enum VectoredReadBuilder {
98 : Adjacent(AdjacentVectoredReadBuilder),
99 : Chunked(ChunkedVectoredReadBuilder),
100 : }
101 :
102 : impl VectoredReadBuilder {
103 624645 : fn new_impl(
104 624645 : start_offset: u64,
105 624645 : end_offset: u64,
106 624645 : meta: BlobMeta,
107 624645 : max_read_size: Option<usize>,
108 624645 : mode: VectoredReadCoalesceMode,
109 624645 : ) -> Self {
110 624645 : match mode {
111 213047 : VectoredReadCoalesceMode::AdjacentOnly => Self::Adjacent(
112 213047 : AdjacentVectoredReadBuilder::new(start_offset, end_offset, meta, max_read_size),
113 213047 : ),
114 411598 : VectoredReadCoalesceMode::Chunked(chunk_size) => {
115 411598 : Self::Chunked(ChunkedVectoredReadBuilder::new(
116 411598 : start_offset,
117 411598 : end_offset,
118 411598 : meta,
119 411598 : max_read_size,
120 411598 : chunk_size,
121 411598 : ))
122 : }
123 : }
124 624645 : }
125 :
126 503715 : pub(crate) fn new(
127 503715 : start_offset: u64,
128 503715 : end_offset: u64,
129 503715 : meta: BlobMeta,
130 503715 : max_read_size: usize,
131 503715 : mode: VectoredReadCoalesceMode,
132 503715 : ) -> Self {
133 503715 : Self::new_impl(start_offset, end_offset, meta, Some(max_read_size), mode)
134 503715 : }
135 :
136 120930 : pub(crate) fn new_streaming(
137 120930 : start_offset: u64,
138 120930 : end_offset: u64,
139 120930 : meta: BlobMeta,
140 120930 : mode: VectoredReadCoalesceMode,
141 120930 : ) -> Self {
142 120930 : Self::new_impl(start_offset, end_offset, meta, None, mode)
143 120930 : }
144 :
145 7333970 : pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
146 7333970 : match self {
147 2444738 : VectoredReadBuilder::Adjacent(builder) => builder.extend(start, end, meta),
148 4889232 : VectoredReadBuilder::Chunked(builder) => builder.extend(start, end, meta),
149 : }
150 7333970 : }
151 :
152 624645 : pub(crate) fn build(self) -> VectoredRead {
153 624645 : match self {
154 213047 : VectoredReadBuilder::Adjacent(builder) => builder.build(),
155 411598 : VectoredReadBuilder::Chunked(builder) => builder.build(),
156 : }
157 624645 : }
158 :
159 6380026 : pub(crate) fn size(&self) -> usize {
160 6380026 : match self {
161 2126676 : VectoredReadBuilder::Adjacent(builder) => builder.size(),
162 4253350 : VectoredReadBuilder::Chunked(builder) => builder.size(),
163 : }
164 6380026 : }
165 : }
166 :
167 : pub(crate) struct AdjacentVectoredReadBuilder {
168 : /// Start offset of the read.
169 : start: u64,
170 : /// End offset of the read.
171 : end: u64,
172 : /// Start offset and metadata for each blob in this read
173 : blobs_at: VecMap<u64, BlobMeta>,
174 : max_read_size: Option<usize>,
175 : }
176 :
177 : impl AdjacentVectoredReadBuilder {
178 : /// Start building a new vectored read.
179 : ///
180 : /// Note that by design, this does not check against reading more than `max_read_size` to
181 : /// support reading larger blobs than the configuration value. However, the builder is
182 : /// single use after that.
183 213047 : pub(crate) fn new(
184 213047 : start_offset: u64,
185 213047 : end_offset: u64,
186 213047 : meta: BlobMeta,
187 213047 : max_read_size: Option<usize>,
188 213047 : ) -> Self {
189 213047 : let mut blobs_at = VecMap::default();
190 213047 : blobs_at
191 213047 : .append(start_offset, meta)
192 213047 : .expect("First insertion always succeeds");
193 213047 :
194 213047 : Self {
195 213047 : start: start_offset,
196 213047 : end: end_offset,
197 213047 : blobs_at,
198 213047 : max_read_size,
199 213047 : }
200 213047 : }
201 : /// Attempt to extend the current read with a new blob if the start
202 : /// offset matches the current end of the vectored read
203 : /// and the resulting size is below the max read size.
204 2444738 : pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
205 2444738 : tracing::trace!(start, end, "trying to extend");
206 2444738 : let size = (end - start) as usize;
207 2444738 : let not_limited_by_max_read_size = {
208 2444738 : if let Some(max_read_size) = self.max_read_size {
209 357710 : self.size() + size <= max_read_size
210 : } else {
211 2087028 : true
212 : }
213 : };
214 :
215 2444738 : if self.end == start && not_limited_by_max_read_size {
216 2406418 : self.end = end;
217 2406418 : self.blobs_at
218 2406418 : .append(start, meta)
219 2406418 : .expect("LSNs are ordered within vectored reads");
220 2406418 :
221 2406418 : return VectoredReadExtended::Yes;
222 38320 : }
223 38320 :
224 38320 : VectoredReadExtended::No
225 2444738 : }
226 :
227 2484386 : pub(crate) fn size(&self) -> usize {
228 2484386 : (self.end - self.start) as usize
229 2484386 : }
230 :
231 213047 : pub(crate) fn build(self) -> VectoredRead {
232 213047 : VectoredRead {
233 213047 : start: self.start,
234 213047 : end: self.end,
235 213047 : blobs_at: self.blobs_at,
236 213047 : }
237 213047 : }
238 : }
239 :
240 : pub(crate) struct ChunkedVectoredReadBuilder {
241 : /// Start block number
242 : start_blk_no: usize,
243 : /// End block number (exclusive).
244 : end_blk_no: usize,
245 : /// Start offset and metadata for each blob in this read
246 : blobs_at: VecMap<u64, BlobMeta>,
247 : max_read_size: Option<usize>,
248 : /// Chunk size reads are coalesced into.
249 : chunk_size: usize,
250 : }
251 :
252 : /// Computes x / d rounded up.
253 5300848 : fn div_round_up(x: usize, d: usize) -> usize {
254 5300848 : (x + (d - 1)) / d
255 5300848 : }
256 :
257 : impl ChunkedVectoredReadBuilder {
258 : /// Start building a new vectored read.
259 : ///
260 : /// Note that by design, this does not check against reading more than `max_read_size` to
261 : /// support reading larger blobs than the configuration value. However, the builder is
262 : /// single use after that.
263 411598 : pub(crate) fn new(
264 411598 : start_offset: u64,
265 411598 : end_offset: u64,
266 411598 : meta: BlobMeta,
267 411598 : max_read_size: Option<usize>,
268 411598 : chunk_size: usize,
269 411598 : ) -> Self {
270 411598 : let mut blobs_at = VecMap::default();
271 411598 : blobs_at
272 411598 : .append(start_offset, meta)
273 411598 : .expect("First insertion always succeeds");
274 411598 :
275 411598 : let start_blk_no = start_offset as usize / chunk_size;
276 411598 : let end_blk_no = div_round_up(end_offset as usize, chunk_size);
277 411598 : Self {
278 411598 : start_blk_no,
279 411598 : end_blk_no,
280 411598 : blobs_at,
281 411598 : max_read_size,
282 411598 : chunk_size,
283 411598 : }
284 411598 : }
285 :
286 : /// Attempts to extend the current read with a new blob if the new blob resides in the same or the immediate next chunk.
287 : ///
288 : /// The resulting size also must be below the max read size.
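///
/// A worked example of the chunk arithmetic (the chunk size and offsets here are
/// illustrative only):
///
/// ```text
/// chunk_size = 512
/// current read:   start_blk_no = 0, end_blk_no = 2      covers bytes [0, 1024)
/// candidate blob: start = 1000, end = 1100
///   start_blk_no = 1000 / 512              = 1
///   end_blk_no   = div_round_up(1100, 512) = 3
///   adjacency: read.end_blk_no (2) == blob.start_blk_no (1) + 1, i.e. the blob
///   starts in the last chunk of the read, so the read is extended to
///   start_blk_no = 0, end_blk_no = 3                    covers bytes [0, 1536)
/// ```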
289 4889232 : pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
290 4889232 : tracing::trace!(start, end, "trying to extend");
291 4889232 : let start_blk_no = start as usize / self.chunk_size;
292 4889232 : let end_blk_no = div_round_up(end as usize, self.chunk_size);
293 :
294 4889232 : let not_limited_by_max_read_size = {
295 4889232 : if let Some(max_read_size) = self.max_read_size {
296 715436 : let coalesced_size = (end_blk_no - self.start_blk_no) * self.chunk_size;
297 715436 : coalesced_size <= max_read_size
298 : } else {
299 4173796 : true
300 : }
301 : };
302 :
303 : // True if the new blob starts in the same chunk in which the current read ends, or in the immediately following chunk.
304 : //
305 : // Note: This automatically handles the case where two blobs are adjacent to each other,
306 : // whether they start on a chunk size boundary or not.
307 4889232 : let is_adjacent_chunk_read = {
308 : // 1. first.end and second.start are in the same chunk
309 4889232 : self.end_blk_no == start_blk_no + 1 ||
310 : // 2. first.end falls one chunk before the chunk in which second.start lies
311 2457532 : self.end_blk_no == start_blk_no
312 : };
313 :
314 4889232 : if is_adjacent_chunk_read && not_limited_by_max_read_size {
315 4827352 : self.end_blk_no = end_blk_no;
316 4827352 : self.blobs_at
317 4827352 : .append(start, meta)
318 4827352 : .expect("LSNs are ordered within vectored reads");
319 4827352 :
320 4827352 : return VectoredReadExtended::Yes;
321 61880 : }
322 61880 :
323 61880 : VectoredReadExtended::No
324 4889232 : }
325 :
326 4253350 : pub(crate) fn size(&self) -> usize {
327 4253350 : (self.end_blk_no - self.start_blk_no) * self.chunk_size
328 4253350 : }
329 :
330 411598 : pub(crate) fn build(self) -> VectoredRead {
331 411598 : let start = (self.start_blk_no * self.chunk_size) as u64;
332 411598 : let end = (self.end_blk_no * self.chunk_size) as u64;
333 411598 : VectoredRead {
334 411598 : start,
335 411598 : end,
336 411598 : blobs_at: self.blobs_at,
337 411598 : }
338 411598 : }
339 : }
340 :
341 : #[derive(Copy, Clone, Debug)]
342 : pub enum BlobFlag {
343 : None,
344 : Ignore,
345 : ReplaceAll,
346 : }
347 :
348 : /// Planner for vectored blob reads.
349 : ///
350 : /// Blob offsets are received via [`VectoredReadPlanner::handle`]
351 : /// and coalesced into disk reads.
352 : ///
353 : /// The implementation is very simple:
354 : /// * Collect all blob offsets in an ordered structure
355 : /// * Iterate over the collected blobs and coalesce them into reads at the end
356 : pub struct VectoredReadPlanner {
357 : // Track all the blob offsets. Start offsets must be ordered.
358 : blobs: BTreeMap<Key, Vec<(Lsn, u64, u64)>>,
359 : // Arguments for previous blob passed into [`VectoredReadPlanner::handle`]
360 : prev: Option<(Key, Lsn, u64, BlobFlag)>,
361 :
362 : max_read_size: usize,
363 :
364 : mode: VectoredReadCoalesceMode,
365 : }
366 :
367 : impl VectoredReadPlanner {
368 640302 : pub fn new(max_read_size: usize) -> Self {
369 640302 : let mode = VectoredReadCoalesceMode::get();
370 640302 : Self {
371 640302 : blobs: BTreeMap::new(),
372 640302 : prev: None,
373 640302 : max_read_size,
374 640302 : mode,
375 640302 : }
376 640302 : }
377 :
378 : /// Include a new blob in the read plan.
379 : ///
380 : /// This function is called from a B-Tree index visitor (see `DeltaLayerInner::plan_reads`
381 : /// and `ImageLayerInner::plan_reads`). Said visitor wants to collect blob offsets for all
382 : /// keys in a given keyspace. This function must be called for each key in the desired
383 : /// keyspace (monotonically continuous). [`Self::handle_range_end`] must
384 : /// be called after every range in the offset.
385 : ///
386 : /// In the event that keys are skipped, the behaviour is undefined and can lead to an
387 : /// incorrect read plan. We can end up asserting, erroring in wal redo or returning
388 : /// incorrect data to the user.
389 : ///
390 : /// The `flag` argument has two interesting values:
391 : /// * [`BlobFlag::ReplaceAll`]: The blob for this key should replace all existing blobs.
392 : /// This is used for WAL records that `will_init`.
393 : /// * [`BlobFlag::Ignore`]: This blob should not be included in the read. This happens
394 : /// if the blob is cached.
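///
/// A short illustration of the flag semantics (key, LSNs and offsets are hypothetical;
/// note that the planner lags behind by one blob, so the flag passed with an offset
/// applies to the blob that *starts* at that offset):
///
/// ```ignore
/// planner.handle(k, lsn1, 0,   BlobFlag::None);        // recorded for k ...
/// planner.handle(k, lsn2, 100, BlobFlag::Ignore);      // cached: not added to any read
/// planner.handle(k, lsn3, 200, BlobFlag::ReplaceAll);  // ... and later dropped, because
///                                                      // ReplaceAll clears earlier offsets for k
/// planner.handle_range_end(300);
/// // planned blob offsets for k: [(lsn3, 200, 300)]
/// ```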
395 4238582 : pub fn handle(&mut self, key: Key, lsn: Lsn, offset: u64, flag: BlobFlag) {
396 : // Implementation note: internally lag behind by one blob such that
397 : // we have a start and end offset when initialising [`VectoredRead`]
398 4238582 : let (prev_key, prev_lsn, prev_offset, prev_flag) = match self.prev {
399 : None => {
400 411859 : self.prev = Some((key, lsn, offset, flag));
401 411859 : return;
402 : }
403 3826723 : Some(prev) => prev,
404 3826723 : };
405 3826723 :
406 3826723 : self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
407 3826723 :
408 3826723 : self.prev = Some((key, lsn, offset, flag));
409 4238582 : }
410 :
411 832262 : pub fn handle_range_end(&mut self, offset: u64) {
412 832262 : if let Some((prev_key, prev_lsn, prev_offset, prev_flag)) = self.prev {
413 411859 : self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
414 420403 : }
415 :
416 832262 : self.prev = None;
417 832262 : }
418 :
419 4238582 : fn add_blob(&mut self, key: Key, lsn: Lsn, start_offset: u64, end_offset: u64, flag: BlobFlag) {
420 4238582 : match flag {
421 1014722 : BlobFlag::None => {
422 1014722 : let blobs_for_key = self.blobs.entry(key).or_default();
423 1014722 : blobs_for_key.push((lsn, start_offset, end_offset));
424 1014722 : }
425 771234 : BlobFlag::ReplaceAll => {
426 771234 : let blobs_for_key = self.blobs.entry(key).or_default();
427 771234 : blobs_for_key.clear();
428 771234 : blobs_for_key.push((lsn, start_offset, end_offset));
429 771234 : }
430 2452626 : BlobFlag::Ignore => {}
431 : }
432 4238582 : }
433 :
434 640302 : pub fn finish(self) -> Vec<VectoredRead> {
435 640302 : let mut current_read_builder: Option<VectoredReadBuilder> = None;
436 640302 : let mut reads = Vec::new();
437 :
438 1951107 : for (key, blobs_for_key) in self.blobs {
439 2775034 : for (lsn, start_offset, end_offset) in blobs_for_key {
440 1464229 : let extended = match &mut current_read_builder {
441 1073080 : Some(read_builder) => {
442 1073080 : read_builder.extend(start_offset, end_offset, BlobMeta { key, lsn })
443 : }
444 391149 : None => VectoredReadExtended::No,
445 : };
446 :
447 1464229 : if extended == VectoredReadExtended::No {
448 491331 : let next_read_builder = VectoredReadBuilder::new(
449 491331 : start_offset,
450 491331 : end_offset,
451 491331 : BlobMeta { key, lsn },
452 491331 : self.max_read_size,
453 491331 : self.mode,
454 491331 : );
455 491331 :
456 491331 : let prev_read_builder = current_read_builder.replace(next_read_builder);
457 :
458 : // `current_read_builder` is None in the first iteration of the outer loop
459 491331 : if let Some(read_builder) = prev_read_builder {
460 100182 : reads.push(read_builder.build());
461 391149 : }
462 972898 : }
463 : }
464 : }
465 :
466 640302 : if let Some(read_builder) = current_read_builder {
467 391149 : reads.push(read_builder.build());
468 391149 : }
469 :
470 640302 : reads
471 640302 : }
472 : }
473 :
474 : /// Disk reader for vectored blob spans (does not go through the page cache)
475 : pub struct VectoredBlobReader<'a> {
476 : file: &'a VirtualFile,
477 : }
478 :
479 : impl<'a> VectoredBlobReader<'a> {
480 761206 : pub fn new(file: &'a VirtualFile) -> Self {
481 761206 : Self { file }
482 761206 : }
483 :
484 : /// Read the requested blobs into the buffer.
485 : ///
486 : /// We have to deal with the fact that blobs are not fixed size.
487 : /// Each blob is prefixed by a size header.
488 : ///
489 : /// The success return value is a struct which contains the buffer
490 : /// filled from disk and a list of offsets at which each blob lies
491 : /// in the buffer.
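///
/// A sketch of the size header as parsed below (the exact bit masks live in the
/// blob_io module; this only shows the shape of the two cases):
///
/// ```text
/// first byte < 0x80:  [ len (1 byte) | blob data ... ]
/// first byte >= 0x80: [ len (4 bytes, big-endian; the top bits of the first
///                       byte carry the compression mode) | blob data ... ]
/// ```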
492 624473 : pub async fn read_blobs(
493 624473 : &self,
494 624473 : read: &VectoredRead,
495 624473 : buf: BytesMut,
496 624473 : ctx: &RequestContext,
497 624473 : ) -> Result<VectoredBlobsBuf, std::io::Error> {
498 624473 : assert!(read.size() > 0);
499 624473 : assert!(
500 624473 : read.size() <= buf.capacity(),
501 0 : "{} > {}",
502 0 : read.size(),
503 0 : buf.capacity()
504 : );
505 :
506 624473 : if cfg!(debug_assertions) {
507 624473 : let align = virtual_file::get_io_buffer_alignment() as u64;
508 624473 : debug_assert_eq!(
509 624473 : read.start % align,
510 : 0,
511 0 : "Read start at {} does not satisfy the required io buffer alignment ({} bytes)",
512 : read.start,
513 : align
514 : );
515 0 : }
516 :
517 624473 : let mut buf = self
518 624473 : .file
519 624473 : .read_exact_at(buf.slice(0..read.size()), read.start, ctx)
520 317895 : .await?
521 624473 : .into_inner();
522 624473 :
523 624473 : let blobs_at = read.blobs_at.as_slice();
524 624473 :
525 624473 : let start_offset = read.start;
526 624473 :
527 624473 : let mut metas = Vec::with_capacity(blobs_at.len());
528 624473 : // Blobs in `read` only provide their starting offset. The end offset
529 624473 : // of a blob is implicit: the start of the next blob if one exists
530 624473 : // or the end of the read.
531 624473 :
532 624473 : // Some scratch space, put here for reusing the allocation
533 624473 : let mut decompressed_vec = Vec::new();
534 :
535 8482588 : for (blob_start, meta) in blobs_at {
536 7858115 : let blob_start_in_buf = blob_start - start_offset;
537 7858115 : let first_len_byte = buf[blob_start_in_buf as usize];
538 :
539 : // Each blob is prefixed by a header containing its size and compression information.
540 : // Extract the size and skip that header to find the start of the data.
541 : // The size can be 1 or 4 bytes. The most significant bit is 0 in the
542 : // 1 byte case and 1 in the 4 byte case.
543 7858115 : let (size_length, blob_size, compression_bits) = if first_len_byte < 0x80 {
544 7745039 : (1, first_len_byte as u64, BYTE_UNCOMPRESSED)
545 : } else {
546 113076 : let mut blob_size_buf = [0u8; 4];
547 113076 : let offset_in_buf = blob_start_in_buf as usize;
548 113076 :
549 113076 : blob_size_buf.copy_from_slice(&buf[offset_in_buf..offset_in_buf + 4]);
550 113076 : blob_size_buf[0] &= !LEN_COMPRESSION_BIT_MASK;
551 113076 :
552 113076 : let compression_bits = first_len_byte & LEN_COMPRESSION_BIT_MASK;
553 113076 : (
554 113076 : 4,
555 113076 : u32::from_be_bytes(blob_size_buf) as u64,
556 113076 : compression_bits,
557 113076 : )
558 : };
559 :
560 7858115 : let start_raw = blob_start_in_buf + size_length;
561 7858115 : let end_raw = start_raw + blob_size;
562 7858115 : let (start, end);
563 7858115 : if compression_bits == BYTE_UNCOMPRESSED {
564 7858109 : start = start_raw as usize;
565 7858109 : end = end_raw as usize;
566 7858109 : } else if compression_bits == BYTE_ZSTD {
567 6 : let mut decoder =
568 6 : async_compression::tokio::write::ZstdDecoder::new(&mut decompressed_vec);
569 6 : decoder
570 6 : .write_all(&buf[start_raw as usize..end_raw as usize])
571 0 : .await?;
572 6 : decoder.flush().await?;
573 6 : start = buf.len();
574 6 : buf.extend_from_slice(&decompressed_vec);
575 6 : end = buf.len();
576 6 : decompressed_vec.clear();
577 : } else {
578 0 : let error = std::io::Error::new(
579 0 : std::io::ErrorKind::InvalidData,
580 0 : format!("invalid compression byte {compression_bits:x}"),
581 0 : );
582 0 : return Err(error);
583 : }
584 :
585 7858115 : metas.push(VectoredBlob {
586 7858115 : start,
587 7858115 : end,
588 7858115 : meta: *meta,
589 7858115 : });
590 : }
591 :
592 624473 : Ok(VectoredBlobsBuf { buf, blobs: metas })
593 624473 : }
594 : }
595 :
596 : /// Read planner used in [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`].
597 : ///
598 : /// It provides a streaming API for getting read blobs. `handle` returns a batch once
599 : /// the accumulated read reaches the `max_read_size` or `max_cnt` constraint, and
600 : /// `handle_range_end` returns the final batch.
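///
/// A minimal usage sketch, mirroring the tests below (the key, LSN and offsets are
/// hypothetical):
///
/// ```ignore
/// let mut planner = StreamingVectoredReadPlanner::new(128 * 1024, 1);
/// let mut reads = Vec::new();
/// reads.extend(planner.handle(key, lsn, 0));          // None: planner lags by one blob
/// reads.extend(planner.handle(key, lsn, 8 * 1024));   // emits the read for [0, 8 * 1024)
/// reads.extend(planner.handle_range_end(16 * 1024));  // emits the read for [8 * 1024, 16 * 1024)
/// ```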
601 : pub struct StreamingVectoredReadPlanner {
602 : read_builder: Option<VectoredReadBuilder>,
603 : // Arguments for previous blob passed into [`StreamingVectoredReadPlanner::handle`]
604 : prev: Option<(Key, Lsn, u64)>,
605 : /// Max read size per batch. This is not a strict limit: if there are blobs at [0, 100) and [100, 200) and `max_read_size` is 150,
606 : /// we will produce a single batch instead of splitting them.
607 : max_read_size: u64,
608 : /// Max item count per batch
609 : max_cnt: usize,
610 : /// Size of the current batch
611 : cnt: usize,
612 :
613 : mode: VectoredReadCoalesceMode,
614 : }
615 :
616 : impl StreamingVectoredReadPlanner {
617 2238 : pub fn new(max_read_size: u64, max_cnt: usize) -> Self {
618 2238 : assert!(max_cnt > 0);
619 2238 : assert!(max_read_size > 0);
620 2238 : let mode = VectoredReadCoalesceMode::get();
621 2238 : Self {
622 2238 : read_builder: None,
623 2238 : prev: None,
624 2238 : max_cnt,
625 2238 : max_read_size,
626 2238 : cnt: 0,
627 2238 : mode,
628 2238 : }
629 2238 : }
630 :
631 6381922 : pub fn handle(&mut self, key: Key, lsn: Lsn, offset: u64) -> Option<VectoredRead> {
632 : // Implementation note: internally lag behind by one blob such that
633 : // we have a start and end offset when initialising [`VectoredRead`]
634 6381922 : let (prev_key, prev_lsn, prev_offset) = match self.prev {
635 : None => {
636 1896 : self.prev = Some((key, lsn, offset));
637 1896 : return None;
638 : }
639 6380026 : Some(prev) => prev,
640 6380026 : };
641 6380026 :
642 6380026 : let res = self.add_blob(prev_key, prev_lsn, prev_offset, offset, false);
643 6380026 :
644 6380026 : self.prev = Some((key, lsn, offset));
645 6380026 :
646 6380026 : res
647 6381922 : }
648 :
649 1734 : pub fn handle_range_end(&mut self, offset: u64) -> Option<VectoredRead> {
650 1734 : let res = if let Some((prev_key, prev_lsn, prev_offset)) = self.prev {
651 1728 : self.add_blob(prev_key, prev_lsn, prev_offset, offset, true)
652 : } else {
653 6 : None
654 : };
655 :
656 1734 : self.prev = None;
657 1734 :
658 1734 : res
659 1734 : }
660 :
661 6381754 : fn add_blob(
662 6381754 : &mut self,
663 6381754 : key: Key,
664 6381754 : lsn: Lsn,
665 6381754 : start_offset: u64,
666 6381754 : end_offset: u64,
667 6381754 : is_last_blob_in_read: bool,
668 6381754 : ) -> Option<VectoredRead> {
669 6381754 : match &mut self.read_builder {
670 6260824 : Some(read_builder) => {
671 6260824 : let extended = read_builder.extend(start_offset, end_offset, BlobMeta { key, lsn });
672 6260824 : assert_eq!(extended, VectoredReadExtended::Yes);
673 : }
674 120930 : None => {
675 120930 : self.read_builder = {
676 120930 : Some(VectoredReadBuilder::new_streaming(
677 120930 : start_offset,
678 120930 : end_offset,
679 120930 : BlobMeta { key, lsn },
680 120930 : self.mode,
681 120930 : ))
682 120930 : };
683 120930 : }
684 : }
685 6381754 : let read_builder = self.read_builder.as_mut().unwrap();
686 6381754 : self.cnt += 1;
687 6381754 : if is_last_blob_in_read
688 6380026 : || read_builder.size() >= self.max_read_size as usize
689 6295054 : || self.cnt >= self.max_cnt
690 : {
691 120930 : let prev_read_builder = self.read_builder.take();
692 120930 : self.cnt = 0;
693 :
694 : // `current_read_builder` is None in the first iteration
695 120930 : if let Some(read_builder) = prev_read_builder {
696 120930 : return Some(read_builder.build());
697 0 : }
698 6260824 : }
699 6260824 : None
700 6381754 : }
701 : }
702 :
703 : #[cfg(test)]
704 : mod tests {
705 : use anyhow::Error;
706 :
707 : use crate::context::DownloadBehavior;
708 : use crate::page_cache::PAGE_SZ;
709 : use crate::task_mgr::TaskKind;
710 :
711 : use super::super::blob_io::tests::{random_array, write_maybe_compressed};
712 : use super::*;
713 :
714 132 : fn validate_read(read: &VectoredRead, offset_range: &[(Key, Lsn, u64, BlobFlag)]) {
715 132 : let align = virtual_file::get_io_buffer_alignment() as u64;
716 132 : assert_eq!(read.start % align, 0);
717 132 : assert_eq!(read.start / align, offset_range.first().unwrap().2 / align);
718 :
719 216 : let expected_offsets_in_read: Vec<_> = offset_range.iter().map(|o| o.2).collect();
720 132 :
721 132 : let offsets_in_read: Vec<_> = read
722 132 : .blobs_at
723 132 : .as_slice()
724 132 : .iter()
725 216 : .map(|(offset, _)| *offset)
726 132 : .collect();
727 132 :
728 132 : assert_eq!(expected_offsets_in_read, offsets_in_read);
729 132 : }
730 :
731 : #[test]
732 6 : fn planner_chunked_coalesce_all_test() {
733 : use crate::virtual_file;
734 :
735 6 : let chunk_size = virtual_file::get_io_buffer_alignment() as u64;
736 6 :
737 6 : // The test explicitly does not check chunk size < 512
738 6 : if chunk_size < 512 {
739 4 : return;
740 2 : }
741 2 :
742 2 : let max_read_size = chunk_size as usize * 8;
743 2 : let key = Key::MIN;
744 2 : let lsn = Lsn(0);
745 2 :
746 2 : let blob_descriptions = [
747 2 : (key, lsn, chunk_size / 8, BlobFlag::None), // Read 1 BEGIN
748 2 : (key, lsn, chunk_size / 4, BlobFlag::Ignore), // Gap
749 2 : (key, lsn, chunk_size / 2, BlobFlag::None),
750 2 : (key, lsn, chunk_size - 2, BlobFlag::Ignore), // Gap
751 2 : (key, lsn, chunk_size, BlobFlag::None),
752 2 : (key, lsn, chunk_size * 2 - 1, BlobFlag::None),
753 2 : (key, lsn, chunk_size * 2 + 1, BlobFlag::Ignore), // Gap
754 2 : (key, lsn, chunk_size * 3 + 1, BlobFlag::None),
755 2 : (key, lsn, chunk_size * 5 + 1, BlobFlag::None),
756 2 : (key, lsn, chunk_size * 6 + 1, BlobFlag::Ignore), // skipped chunk size, but not a chunk: should coalesce.
757 2 : (key, lsn, chunk_size * 7 + 1, BlobFlag::None),
758 2 : (key, lsn, chunk_size * 8, BlobFlag::None), // Read 2 BEGIN (b/c max_read_size)
759 2 : (key, lsn, chunk_size * 9, BlobFlag::Ignore), // ==== skipped a chunk
760 2 : (key, lsn, chunk_size * 10, BlobFlag::None), // Read 3 BEGIN (cannot coalesce)
761 2 : ];
762 2 :
763 2 : let ranges = [
764 2 : &[
765 2 : blob_descriptions[0],
766 2 : blob_descriptions[2],
767 2 : blob_descriptions[4],
768 2 : blob_descriptions[5],
769 2 : blob_descriptions[7],
770 2 : blob_descriptions[8],
771 2 : blob_descriptions[10],
772 2 : ],
773 2 : &blob_descriptions[11..12],
774 2 : &blob_descriptions[13..],
775 2 : ];
776 2 :
777 2 : let mut planner = VectoredReadPlanner::new(max_read_size);
778 30 : for (key, lsn, offset, flag) in blob_descriptions {
779 28 : planner.handle(key, lsn, offset, flag);
780 28 : }
781 :
782 2 : planner.handle_range_end(652 * 1024);
783 2 :
784 2 : let reads = planner.finish();
785 2 :
786 2 : assert_eq!(reads.len(), ranges.len());
787 :
788 6 : for (idx, read) in reads.iter().enumerate() {
789 6 : validate_read(read, ranges[idx]);
790 6 : }
791 6 : }
792 :
793 : #[test]
794 6 : fn planner_max_read_size_test() {
795 6 : let max_read_size = 128 * 1024;
796 6 : let key = Key::MIN;
797 6 : let lsn = Lsn(0);
798 6 :
799 6 : let blob_descriptions = vec![
800 6 : (key, lsn, 0, BlobFlag::None),
801 6 : (key, lsn, 32 * 1024, BlobFlag::None),
802 6 : (key, lsn, 96 * 1024, BlobFlag::None), // Last in read 1
803 6 : (key, lsn, 128 * 1024, BlobFlag::None), // Last in read 2
804 6 : (key, lsn, 198 * 1024, BlobFlag::None), // Last in read 3
805 6 : (key, lsn, 268 * 1024, BlobFlag::None), // Last in read 4
806 6 : (key, lsn, 396 * 1024, BlobFlag::None), // Last in read 5
807 6 : (key, lsn, 652 * 1024, BlobFlag::None), // Last in read 6
808 6 : ];
809 6 :
810 6 : let ranges = [
811 6 : &blob_descriptions[0..3],
812 6 : &blob_descriptions[3..4],
813 6 : &blob_descriptions[4..5],
814 6 : &blob_descriptions[5..6],
815 6 : &blob_descriptions[6..7],
816 6 : &blob_descriptions[7..],
817 6 : ];
818 6 :
819 6 : let mut planner = VectoredReadPlanner::new(max_read_size);
820 48 : for (key, lsn, offset, flag) in blob_descriptions.clone() {
821 48 : planner.handle(key, lsn, offset, flag);
822 48 : }
823 :
824 6 : planner.handle_range_end(652 * 1024);
825 6 :
826 6 : let reads = planner.finish();
827 6 :
828 6 : assert_eq!(reads.len(), 6);
829 :
830 : // TODO: could remove zero reads to produce 5 reads here
831 :
832 36 : for (idx, read) in reads.iter().enumerate() {
833 36 : validate_read(read, ranges[idx]);
834 36 : }
835 6 : }
836 :
837 : #[test]
838 6 : fn planner_replacement_test() {
839 6 : let chunk_size = virtual_file::get_io_buffer_alignment() as u64;
840 6 : let max_read_size = 128 * chunk_size as usize;
841 6 : let first_key = Key::MIN;
842 6 : let second_key = first_key.next();
843 6 : let lsn = Lsn(0);
844 6 :
845 6 : let blob_descriptions = vec![
846 6 : (first_key, lsn, 0, BlobFlag::None), // First in read 1
847 6 : (first_key, lsn, chunk_size, BlobFlag::None), // Last in read 1
848 6 : (second_key, lsn, 2 * chunk_size, BlobFlag::ReplaceAll),
849 6 : (second_key, lsn, 3 * chunk_size, BlobFlag::None),
850 6 : (second_key, lsn, 4 * chunk_size, BlobFlag::ReplaceAll), // First in read 2
851 6 : (second_key, lsn, 5 * chunk_size, BlobFlag::None), // Last in read 2
852 6 : ];
853 6 :
854 6 : let ranges = [&blob_descriptions[0..2], &blob_descriptions[4..]];
855 6 :
856 6 : let mut planner = VectoredReadPlanner::new(max_read_size);
857 36 : for (key, lsn, offset, flag) in blob_descriptions.clone() {
858 36 : planner.handle(key, lsn, offset, flag);
859 36 : }
860 :
861 6 : planner.handle_range_end(6 * chunk_size);
862 6 :
863 6 : let reads = planner.finish();
864 6 : assert_eq!(reads.len(), 2);
865 :
866 12 : for (idx, read) in reads.iter().enumerate() {
867 12 : validate_read(read, ranges[idx]);
868 12 : }
869 6 : }
870 :
871 : #[test]
872 6 : fn streaming_planner_max_read_size_test() {
873 6 : let max_read_size = 128 * 1024;
874 6 : let key = Key::MIN;
875 6 : let lsn = Lsn(0);
876 6 :
877 6 : let blob_descriptions = vec![
878 6 : (key, lsn, 0, BlobFlag::None),
879 6 : (key, lsn, 32 * 1024, BlobFlag::None),
880 6 : (key, lsn, 96 * 1024, BlobFlag::None),
881 6 : (key, lsn, 128 * 1024, BlobFlag::None),
882 6 : (key, lsn, 198 * 1024, BlobFlag::None),
883 6 : (key, lsn, 268 * 1024, BlobFlag::None),
884 6 : (key, lsn, 396 * 1024, BlobFlag::None),
885 6 : (key, lsn, 652 * 1024, BlobFlag::None),
886 6 : ];
887 6 :
888 6 : let ranges = [
889 6 : &blob_descriptions[0..3],
890 6 : &blob_descriptions[3..5],
891 6 : &blob_descriptions[5..6],
892 6 : &blob_descriptions[6..7],
893 6 : &blob_descriptions[7..],
894 6 : ];
895 6 :
896 6 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1000);
897 6 : let mut reads = Vec::new();
898 48 : for (key, lsn, offset, _) in blob_descriptions.clone() {
899 48 : reads.extend(planner.handle(key, lsn, offset));
900 48 : }
901 6 : reads.extend(planner.handle_range_end(652 * 1024));
902 6 :
903 6 : assert_eq!(reads.len(), ranges.len());
904 :
905 30 : for (idx, read) in reads.iter().enumerate() {
906 30 : validate_read(read, ranges[idx]);
907 30 : }
908 6 : }
909 :
910 : #[test]
911 6 : fn streaming_planner_max_cnt_test() {
912 6 : let max_read_size = 1024 * 1024;
913 6 : let key = Key::MIN;
914 6 : let lsn = Lsn(0);
915 6 :
916 6 : let blob_descriptions = vec![
917 6 : (key, lsn, 0, BlobFlag::None),
918 6 : (key, lsn, 32 * 1024, BlobFlag::None),
919 6 : (key, lsn, 96 * 1024, BlobFlag::None),
920 6 : (key, lsn, 128 * 1024, BlobFlag::None),
921 6 : (key, lsn, 198 * 1024, BlobFlag::None),
922 6 : (key, lsn, 268 * 1024, BlobFlag::None),
923 6 : (key, lsn, 396 * 1024, BlobFlag::None),
924 6 : (key, lsn, 652 * 1024, BlobFlag::None),
925 6 : ];
926 6 :
927 6 : let ranges = [
928 6 : &blob_descriptions[0..2],
929 6 : &blob_descriptions[2..4],
930 6 : &blob_descriptions[4..6],
931 6 : &blob_descriptions[6..],
932 6 : ];
933 6 :
934 6 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 2);
935 6 : let mut reads = Vec::new();
936 48 : for (key, lsn, offset, _) in blob_descriptions.clone() {
937 48 : reads.extend(planner.handle(key, lsn, offset));
938 48 : }
939 6 : reads.extend(planner.handle_range_end(652 * 1024));
940 6 :
941 6 : assert_eq!(reads.len(), ranges.len());
942 :
943 24 : for (idx, read) in reads.iter().enumerate() {
944 24 : validate_read(read, ranges[idx]);
945 24 : }
946 6 : }
947 :
948 : #[test]
949 6 : fn streaming_planner_edge_test() {
950 6 : let max_read_size = 1024 * 1024;
951 6 : let key = Key::MIN;
952 6 : let lsn = Lsn(0);
953 6 : {
954 6 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
955 6 : let mut reads = Vec::new();
956 6 : reads.extend(planner.handle_range_end(652 * 1024));
957 6 : assert!(reads.is_empty());
958 : }
959 : {
960 6 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
961 6 : let mut reads = Vec::new();
962 6 : reads.extend(planner.handle(key, lsn, 0));
963 6 : reads.extend(planner.handle_range_end(652 * 1024));
964 6 : assert_eq!(reads.len(), 1);
965 6 : validate_read(&reads[0], &[(key, lsn, 0, BlobFlag::None)]);
966 6 : }
967 6 : {
968 6 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
969 6 : let mut reads = Vec::new();
970 6 : reads.extend(planner.handle(key, lsn, 0));
971 6 : reads.extend(planner.handle(key, lsn, 128 * 1024));
972 6 : reads.extend(planner.handle_range_end(652 * 1024));
973 6 : assert_eq!(reads.len(), 2);
974 6 : validate_read(&reads[0], &[(key, lsn, 0, BlobFlag::None)]);
975 6 : validate_read(&reads[1], &[(key, lsn, 128 * 1024, BlobFlag::None)]);
976 6 : }
977 6 : {
978 6 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 2);
979 6 : let mut reads = Vec::new();
980 6 : reads.extend(planner.handle(key, lsn, 0));
981 6 : reads.extend(planner.handle(key, lsn, 128 * 1024));
982 6 : reads.extend(planner.handle_range_end(652 * 1024));
983 6 : assert_eq!(reads.len(), 1);
984 6 : validate_read(
985 6 : &reads[0],
986 6 : &[
987 6 : (key, lsn, 0, BlobFlag::None),
988 6 : (key, lsn, 128 * 1024, BlobFlag::None),
989 6 : ],
990 6 : );
991 6 : }
992 6 : }
993 :
994 24 : async fn round_trip_test_compressed(blobs: &[Vec<u8>], compression: bool) -> Result<(), Error> {
995 24 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
996 24 : let (_temp_dir, pathbuf, offsets) =
997 837 : write_maybe_compressed::<true>(blobs, compression, &ctx).await?;
998 :
999 24 : let file = VirtualFile::open(&pathbuf, &ctx).await?;
1000 24 : let file_len = std::fs::metadata(&pathbuf)?.len();
1001 24 :
1002 24 : // Multiply by two (compressed data might need more space), and add a few bytes for the header
1003 12360 : let reserved_bytes = blobs.iter().map(|bl| bl.len()).max().unwrap() * 2 + 16;
1004 24 : let mut buf = BytesMut::with_capacity(reserved_bytes);
1005 24 :
1006 24 : let mode = VectoredReadCoalesceMode::get();
1007 24 : let vectored_blob_reader = VectoredBlobReader::new(&file);
1008 24 : let meta = BlobMeta {
1009 24 : key: Key::MIN,
1010 24 : lsn: Lsn(0),
1011 24 : };
1012 :
1013 12360 : for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
1014 12360 : let end = offsets.get(idx + 1).unwrap_or(&file_len);
1015 12360 : if idx + 1 == offsets.len() {
1016 24 : continue;
1017 12336 : }
1018 12336 : let read_builder = VectoredReadBuilder::new(*offset, *end, meta, 16 * 4096, mode);
1019 12336 : let read = read_builder.build();
1020 12336 : let result = vectored_blob_reader.read_blobs(&read, buf, &ctx).await?;
1021 12336 : assert_eq!(result.blobs.len(), 1);
1022 12336 : let read_blob = &result.blobs[0];
1023 12336 : let read_buf = &result.buf[read_blob.start..read_blob.end];
1024 12336 : assert_eq!(blob, read_buf, "mismatch for idx={idx} at offset={offset}");
1025 12336 : buf = result.buf;
1026 : }
1027 24 : Ok(())
1028 24 : }
1029 :
1030 : #[tokio::test]
1031 6 : async fn test_really_big_array() -> Result<(), Error> {
1032 6 : let blobs = &[
1033 6 : b"test".to_vec(),
1034 6 : random_array(10 * PAGE_SZ),
1035 6 : b"hello".to_vec(),
1036 6 : random_array(66 * PAGE_SZ),
1037 6 : vec![0xf3; 24 * PAGE_SZ],
1038 6 : b"foobar".to_vec(),
1039 6 : ];
1040 42 : round_trip_test_compressed(blobs, false).await?;
1041 33 : round_trip_test_compressed(blobs, true).await?;
1042 6 : Ok(())
1043 6 : }
1044 :
1045 : #[tokio::test]
1046 6 : async fn test_arrays_inc() -> Result<(), Error> {
1047 6 : let blobs = (0..PAGE_SZ / 8)
1048 6144 : .map(|v| random_array(v * 16))
1049 6 : .collect::<Vec<_>>();
1050 3516 : round_trip_test_compressed(&blobs, false).await?;
1051 3516 : round_trip_test_compressed(&blobs, true).await?;
1052 6 : Ok(())
1053 6 : }
1054 :
1055 : #[test]
1056 6 : fn test_div_round_up() {
1057 : const CHUNK_SIZE: usize = 512;
1058 6 : assert_eq!(1, div_round_up(200, CHUNK_SIZE));
1059 6 : assert_eq!(1, div_round_up(CHUNK_SIZE, CHUNK_SIZE));
1060 6 : assert_eq!(2, div_round_up(CHUNK_SIZE + 1, CHUNK_SIZE));
1061 6 : }
1062 : }