Line data Source code
1 : //!
2 : //! Utilities for vectored reading of variable-sized "blobs".
3 : //!
4 : //! The "blob" api is an abstraction on top of the "block" api,
5 : //! with the main difference being that blobs do not have a fixed
6 : //! size (each blob is prefixed with a 1 or 4 byte length field).
7 : //!
8 : //! The vectored apis provided in this module allow for planning
9 : //! and executing disk IO which covers multiple blobs.
10 : //!
11 : //! Reads are planned with [`VectoredReadPlanner`], which coalesces
12 : //! adjacent blocks into a single disk IO request, and executed by
13 : //! [`VectoredBlobReader`], which does all the required offset juggling
14 : //! and returns a buffer housing all the blobs and a list of offsets.
15 : //!
16 : //! Note that the vectored blob api does *not* go through the page cache.
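//!
//! A rough usage sketch (hypothetical offsets and variables; the real callers are the
//! delta and image layer readers):
//!
//! ```ignore
//! let mut planner = VectoredReadPlanner::new(max_read_size);
//! planner.handle(key, lsn, 0, BlobFlag::None);
//! planner.handle(key, lsn, 1024, BlobFlag::None);
//! planner.handle_range_end(2048);
//!
//! let reader = VectoredBlobReader::new(&file);
//! for read in planner.finish() {
//!     let buf = BytesMut::with_capacity(read.size());
//!     let blobs_buf = reader.read_blobs(&read, buf, &ctx).await?;
//!     // blobs_buf.blobs holds the offsets of each blob within blobs_buf.buf
//! }
//! ```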
17 :
18 : use std::collections::BTreeMap;
19 : use std::num::NonZeroUsize;
20 :
21 : use bytes::BytesMut;
22 : use pageserver_api::key::Key;
23 : use tokio::io::AsyncWriteExt;
24 : use tokio_epoll_uring::BoundedBuf;
25 : use utils::lsn::Lsn;
26 : use utils::vec_map::VecMap;
27 :
28 : use crate::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
29 : use crate::context::RequestContext;
30 : use crate::tenant::blob_io::{BYTE_UNCOMPRESSED, BYTE_ZSTD, LEN_COMPRESSION_BIT_MASK};
31 : use crate::virtual_file::{self, VirtualFile};
32 :
33 : #[derive(Copy, Clone, Debug, PartialEq, Eq)]
34 : pub struct MaxVectoredReadBytes(pub NonZeroUsize);
35 :
36 : /// Metadata bundled with the start and end offset of a blob.
37 : #[derive(Copy, Clone, Debug)]
38 : pub struct BlobMeta {
39 : pub key: Key,
40 : pub lsn: Lsn,
41 : }
42 :
43 : /// Blob offsets into [`VectoredBlobsBuf::buf`]
44 : pub struct VectoredBlob {
45 : pub start: usize,
46 : pub end: usize,
47 : pub meta: BlobMeta,
48 : }
49 :
50 : /// Return type of [`VectoredBlobReader::read_blobs`]
51 : pub struct VectoredBlobsBuf {
52 : /// Buffer for all blobs in this read
53 : pub buf: BytesMut,
54 : /// Offsets into the buffer and metadata for all blobs in this read
55 : pub blobs: Vec<VectoredBlob>,
56 : }
57 :
58 : /// Description of one disk read for multiple blobs.
59 : /// Used as the argument for [`VectoredBlobReader::read_blobs`].
60 : #[derive(Debug)]
61 : pub struct VectoredRead {
62 : pub start: u64,
63 : pub end: u64,
64 : /// Start offset and metadata for each blob in this read
65 : pub blobs_at: VecMap<u64, BlobMeta>,
66 : }
67 :
68 : impl VectoredRead {
69 2848262 : pub(crate) fn size(&self) -> usize {
70 2848262 : (self.end - self.start) as usize
71 2848262 : }
72 : }
73 :
74 : #[derive(Eq, PartialEq, Debug)]
75 : pub(crate) enum VectoredReadExtended {
76 : Yes,
77 : No,
78 : }
79 :
80 : #[derive(Copy, Clone, Debug, PartialEq, Eq)]
81 : pub enum VectoredReadCoalesceMode {
82 : /// Only coalesce exactly adjacent reads.
83 : AdjacentOnly,
84 : /// In addition to adjacent reads, also consider reads whose corresponding
85 : /// `end` and `start` offsets reside at the same chunk.
86 : Chunked(usize),
87 : }
88 :
89 : impl VectoredReadCoalesceMode {
90 : /// [`AdjacentVectoredReadBuilder`] is used if the alignment requirement is 0,
91 : /// whereas [`ChunkedVectoredReadBuilder`] is used for alignment requirements of 1 and higher.
92 641199 : pub(crate) fn get() -> Self {
93 641199 : let align = virtual_file::get_io_buffer_alignment_raw();
94 641199 : if align == DEFAULT_IO_BUFFER_ALIGNMENT {
95 213187 : VectoredReadCoalesceMode::AdjacentOnly
96 : } else {
97 428012 : VectoredReadCoalesceMode::Chunked(align)
98 : }
99 641199 : }
100 : }
101 :
102 : pub(crate) enum VectoredReadBuilder {
103 : Adjacent(AdjacentVectoredReadBuilder),
104 : Chunked(ChunkedVectoredReadBuilder),
105 : }
106 :
107 : impl VectoredReadBuilder {
108 623674 : fn new_impl(
109 623674 : start_offset: u64,
110 623674 : end_offset: u64,
111 623674 : meta: BlobMeta,
112 623674 : max_read_size: Option<usize>,
113 623674 : mode: VectoredReadCoalesceMode,
114 623674 : ) -> Self {
115 623674 : match mode {
116 212646 : VectoredReadCoalesceMode::AdjacentOnly => Self::Adjacent(
117 212646 : AdjacentVectoredReadBuilder::new(start_offset, end_offset, meta, max_read_size),
118 212646 : ),
119 411028 : VectoredReadCoalesceMode::Chunked(chunk_size) => {
120 411028 : Self::Chunked(ChunkedVectoredReadBuilder::new(
121 411028 : start_offset,
122 411028 : end_offset,
123 411028 : meta,
124 411028 : max_read_size,
125 411028 : chunk_size,
126 411028 : ))
127 : }
128 : }
129 623674 : }
130 :
131 503002 : pub(crate) fn new(
132 503002 : start_offset: u64,
133 503002 : end_offset: u64,
134 503002 : meta: BlobMeta,
135 503002 : max_read_size: usize,
136 503002 : mode: VectoredReadCoalesceMode,
137 503002 : ) -> Self {
138 503002 : Self::new_impl(start_offset, end_offset, meta, Some(max_read_size), mode)
139 503002 : }
140 :
141 120672 : pub(crate) fn new_streaming(
142 120672 : start_offset: u64,
143 120672 : end_offset: u64,
144 120672 : meta: BlobMeta,
145 120672 : mode: VectoredReadCoalesceMode,
146 120672 : ) -> Self {
147 120672 : Self::new_impl(start_offset, end_offset, meta, None, mode)
148 120672 : }
149 :
150 7334262 : pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
151 7334262 : match self {
152 2444738 : VectoredReadBuilder::Adjacent(builder) => builder.extend(start, end, meta),
153 4889524 : VectoredReadBuilder::Chunked(builder) => builder.extend(start, end, meta),
154 : }
155 7334262 : }
156 :
157 623674 : pub(crate) fn build(self) -> VectoredRead {
158 623674 : match self {
159 212646 : VectoredReadBuilder::Adjacent(builder) => builder.build(),
160 411028 : VectoredReadBuilder::Chunked(builder) => builder.build(),
161 : }
162 623674 : }
163 :
164 6380028 : pub(crate) fn size(&self) -> usize {
165 6380028 : match self {
166 2126676 : VectoredReadBuilder::Adjacent(builder) => builder.size(),
167 4253352 : VectoredReadBuilder::Chunked(builder) => builder.size(),
168 : }
169 6380028 : }
170 : }
171 :
172 : pub(crate) struct AdjacentVectoredReadBuilder {
173 : /// Start offset of the read.
174 : start: u64,
175 : /// End offset of the read.
176 : end: u64,
177 : /// Start offset and metadata for each blob in this read
178 : blobs_at: VecMap<u64, BlobMeta>,
179 : max_read_size: Option<usize>,
180 : }
181 :
182 : impl AdjacentVectoredReadBuilder {
183 : /// Start building a new vectored read.
184 : ///
185 : /// Note that by design, this does not check against reading more than `max_read_size` in
186 : /// order to support reading blobs larger than the configured value. The builder will,
187 : /// however, be single use after that.
188 212646 : pub(crate) fn new(
189 212646 : start_offset: u64,
190 212646 : end_offset: u64,
191 212646 : meta: BlobMeta,
192 212646 : max_read_size: Option<usize>,
193 212646 : ) -> Self {
194 212646 : let mut blobs_at = VecMap::default();
195 212646 : blobs_at
196 212646 : .append(start_offset, meta)
197 212646 : .expect("First insertion always succeeds");
198 212646 :
199 212646 : Self {
200 212646 : start: start_offset,
201 212646 : end: end_offset,
202 212646 : blobs_at,
203 212646 : max_read_size,
204 212646 : }
205 212646 : }
206 : /// Attempt to extend the current read with a new blob if the start
207 : /// offset matches the current end of the vectored read
208 : /// and the resulting size is below the max read size.
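/// For example (illustrative offsets): a read currently covering [0, 100) can be extended
/// by a blob at [100, 150), but not by one starting at offset 200.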
209 2444738 : pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
210 2444738 : tracing::trace!(start, end, "trying to extend");
211 2444738 : let size = (end - start) as usize;
212 2444738 : let not_limited_by_max_read_size = {
213 2444738 : if let Some(max_read_size) = self.max_read_size {
214 357710 : self.size() + size <= max_read_size
215 : } else {
216 2087028 : true
217 : }
218 : };
219 :
220 2444738 : if self.end == start && not_limited_by_max_read_size {
221 2406589 : self.end = end;
222 2406589 : self.blobs_at
223 2406589 : .append(start, meta)
224 2406589 : .expect("LSNs are ordered within vectored reads");
225 2406589 :
226 2406589 : return VectoredReadExtended::Yes;
227 38149 : }
228 38149 :
229 38149 : VectoredReadExtended::No
230 2444738 : }
231 :
232 2484386 : pub(crate) fn size(&self) -> usize {
233 2484386 : (self.end - self.start) as usize
234 2484386 : }
235 :
236 212646 : pub(crate) fn build(self) -> VectoredRead {
237 212646 : VectoredRead {
238 212646 : start: self.start,
239 212646 : end: self.end,
240 212646 : blobs_at: self.blobs_at,
241 212646 : }
242 212646 : }
243 : }
244 :
245 : pub(crate) struct ChunkedVectoredReadBuilder {
246 : /// Start block number
247 : start_blk_no: usize,
248 : /// End block number (exclusive).
249 : end_blk_no: usize,
250 : /// Start offset and metadata for each blob in this read
251 : blobs_at: VecMap<u64, BlobMeta>,
252 : max_read_size: Option<usize>,
253 : /// Chunk size reads are coalesced into.
254 : chunk_size: usize,
255 : }
256 :
257 : /// Computes x / d rounded up.
258 5300570 : fn div_round_up(x: usize, d: usize) -> usize {
259 5300570 : (x + (d - 1)) / d
260 5300570 : }
261 :
262 : impl ChunkedVectoredReadBuilder {
263 : /// Start building a new vectored read.
264 : ///
265 : /// Note that by design, this does not check against reading more than `max_read_size` in
266 : /// order to support reading blobs larger than the configured value. The builder will,
267 : /// however, be single use after that.
268 411028 : pub(crate) fn new(
269 411028 : start_offset: u64,
270 411028 : end_offset: u64,
271 411028 : meta: BlobMeta,
272 411028 : max_read_size: Option<usize>,
273 411028 : chunk_size: usize,
274 411028 : ) -> Self {
275 411028 : let mut blobs_at = VecMap::default();
276 411028 : blobs_at
277 411028 : .append(start_offset, meta)
278 411028 : .expect("First insertion always succeeds");
279 411028 :
280 411028 : let start_blk_no = start_offset as usize / chunk_size;
281 411028 : let end_blk_no = div_round_up(end_offset as usize, chunk_size);
282 411028 : Self {
283 411028 : start_blk_no,
284 411028 : end_blk_no,
285 411028 : blobs_at,
286 411028 : max_read_size,
287 411028 : chunk_size,
288 411028 : }
289 411028 : }
290 :
291 : /// Attempts to extend the current read with a new blob if the new blob starts in the same chunk where the current read ends, or in the immediately following chunk.
292 : ///
293 : /// The resulting size must also be below the max read size.
294 4889524 : pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
295 4889524 : tracing::trace!(start, end, "trying to extend");
296 4889524 : let start_blk_no = start as usize / self.chunk_size;
297 4889524 : let end_blk_no = div_round_up(end as usize, self.chunk_size);
298 :
299 4889524 : let not_limited_by_max_read_size = {
300 4889524 : if let Some(max_read_size) = self.max_read_size {
301 715468 : let coalesced_size = (end_blk_no - self.start_blk_no) * self.chunk_size;
302 715468 : coalesced_size <= max_read_size
303 : } else {
304 4174056 : true
305 : }
306 : };
307 :
308 : // True if the new blob starts in the same chunk where the current read ends, or in the immediately following chunk.
309 : //
310 : // Note: this automatically handles the case where two blobs are adjacent to each other,
311 : // whether or not they start on a chunk size boundary.
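// For example (illustrative numbers): with a chunk size of 512 and a current read
// ending at offset 700 (end_blk_no == 2), a blob starting at offset 600 (chunk 1)
// or at offset 1100 (chunk 2) can be coalesced, while one starting at offset 1600
// (chunk 3) cannot.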
312 4889524 : let is_adjacent_chunk_read = {
313 : // 1. first.end & second.start are in the same block
314 4889524 : self.end_blk_no == start_blk_no + 1 ||
315 : // 2. first.end ends one block before second.start
316 2457611 : self.end_blk_no == start_blk_no
317 : };
318 :
319 4889524 : if is_adjacent_chunk_read && not_limited_by_max_read_size {
320 4827765 : self.end_blk_no = end_blk_no;
321 4827765 : self.blobs_at
322 4827765 : .append(start, meta)
323 4827765 : .expect("LSNs are ordered within vectored reads");
324 4827765 :
325 4827765 : return VectoredReadExtended::Yes;
326 61759 : }
327 61759 :
328 61759 : VectoredReadExtended::No
329 4889524 : }
330 :
331 4253352 : pub(crate) fn size(&self) -> usize {
332 4253352 : (self.end_blk_no - self.start_blk_no) * self.chunk_size
333 4253352 : }
334 :
335 411028 : pub(crate) fn build(self) -> VectoredRead {
336 411028 : let start = (self.start_blk_no * self.chunk_size) as u64;
337 411028 : let end = (self.end_blk_no * self.chunk_size) as u64;
338 411028 : VectoredRead {
339 411028 : start,
340 411028 : end,
341 411028 : blobs_at: self.blobs_at,
342 411028 : }
343 411028 : }
344 : }
345 :
346 : #[derive(Copy, Clone, Debug)]
347 : pub enum BlobFlag {
348 : None,
349 : Ignore,
350 : ReplaceAll,
351 : }
352 :
353 : /// Planner for vectored blob reads.
354 : ///
355 : /// Blob offsets are received via [`VectoredReadPlanner::handle`]
356 : /// and coalesced into disk reads.
357 : ///
358 : /// The implementation is very simple:
359 : /// * Collect all blob offsets in an ordered structure
360 : /// * Iterate over the collected blobs and coalesce them into reads at the end
361 : pub struct VectoredReadPlanner {
362 : // Track all the blob offsets. Start offsets must be ordered.
363 : blobs: BTreeMap<Key, Vec<(Lsn, u64, u64)>>,
364 : // Arguments for previous blob passed into [`VectoredReadPlanner::handle`]
365 : prev: Option<(Key, Lsn, u64, BlobFlag)>,
366 :
367 : max_read_size: usize,
368 :
369 : mode: VectoredReadCoalesceMode,
370 : }
371 :
372 : impl VectoredReadPlanner {
373 638907 : pub fn new(max_read_size: usize) -> Self {
374 638907 : let mode = VectoredReadCoalesceMode::get();
375 638907 : Self {
376 638907 : blobs: BTreeMap::new(),
377 638907 : prev: None,
378 638907 : max_read_size,
379 638907 : mode,
380 638907 : }
381 638907 : }
382 :
383 : /// Include a new blob in the read plan.
384 : ///
385 : /// This function is called from a B-Tree index visitor (see `DeltaLayerInner::plan_reads`
386 : /// and `ImageLayerInner::plan_reads`). Said visitor wants to collect blob offsets for all
387 : /// keys in a given keyspace. This function must be called for each key in the desired
388 : /// keyspace (monotonically continuous). [`Self::handle_range_end`] must
389 : /// be called after each range, passing the end offset of that range.
390 : ///
391 : /// In the event that keys are skipped, the behaviour is undefined and can lead to an
392 : /// incorrect read plan. We can end up asserting, erroring in WAL redo, or returning
393 : /// incorrect data to the user.
394 : ///
395 : /// The `flag` argument has two interesting values:
396 : /// * [`BlobFlag::ReplaceAll`]: The blob for this key should replace all existing blobs.
397 : /// This is used for WAL records that `will_init`.
398 : /// * [`BlobFlag::Ignore`]: This blob should not be included in the read. This happens
399 : /// if the blob is cached.
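/// For example (illustrative sequence): if the same key is handled with `BlobFlag::None`,
/// then `BlobFlag::ReplaceAll`, then `BlobFlag::None`, only the offsets from the
/// `ReplaceAll` blob onwards end up in the plan; the earlier ones are discarded.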
400 4238286 : pub fn handle(&mut self, key: Key, lsn: Lsn, offset: u64, flag: BlobFlag) {
401 : // Implementation note: internally lag behind by one blob such that
402 : // we have a start and end offset when initialising [`VectoredRead`]
403 4238286 : let (prev_key, prev_lsn, prev_offset, prev_flag) = match self.prev {
404 : None => {
405 411238 : self.prev = Some((key, lsn, offset, flag));
406 411238 : return;
407 : }
408 3827048 : Some(prev) => prev,
409 3827048 : };
410 3827048 :
411 3827048 : self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
412 3827048 :
413 3827048 : self.prev = Some((key, lsn, offset, flag));
414 4238286 : }
415 :
416 831098 : pub fn handle_range_end(&mut self, offset: u64) {
417 831098 : if let Some((prev_key, prev_lsn, prev_offset, prev_flag)) = self.prev {
418 411238 : self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
419 419860 : }
420 :
421 831098 : self.prev = None;
422 831098 : }
423 :
424 4238286 : fn add_blob(&mut self, key: Key, lsn: Lsn, start_offset: u64, end_offset: u64, flag: BlobFlag) {
425 4238286 : match flag {
426 1014908 : BlobFlag::None => {
427 1014908 : let blobs_for_key = self.blobs.entry(key).or_default();
428 1014908 : blobs_for_key.push((lsn, start_offset, end_offset));
429 1014908 : }
430 770620 : BlobFlag::ReplaceAll => {
431 770620 : let blobs_for_key = self.blobs.entry(key).or_default();
432 770620 : blobs_for_key.clear();
433 770620 : blobs_for_key.push((lsn, start_offset, end_offset));
434 770620 : }
435 2452758 : BlobFlag::Ignore => {}
436 : }
437 4238286 : }
438 :
439 638907 : pub fn finish(self) -> Vec<VectoredRead> {
440 638907 : let mut current_read_builder: Option<VectoredReadBuilder> = None;
441 638907 : let mut reads = Vec::new();
442 :
443 1949291 : for (key, blobs_for_key) in self.blobs {
444 2774224 : for (lsn, start_offset, end_offset) in blobs_for_key {
445 1463840 : let extended = match &mut current_read_builder {
446 1073112 : Some(read_builder) => {
447 1073112 : read_builder.extend(start_offset, end_offset, BlobMeta { key, lsn })
448 : }
449 390728 : None => VectoredReadExtended::No,
450 : };
451 :
452 1463840 : if extended == VectoredReadExtended::No {
453 490618 : let next_read_builder = VectoredReadBuilder::new(
454 490618 : start_offset,
455 490618 : end_offset,
456 490618 : BlobMeta { key, lsn },
457 490618 : self.max_read_size,
458 490618 : self.mode,
459 490618 : );
460 490618 :
461 490618 : let prev_read_builder = current_read_builder.replace(next_read_builder);
462 :
463 : // `current_read_builder` is None in the first iteration of the outer loop
464 490618 : if let Some(read_builder) = prev_read_builder {
465 99890 : reads.push(read_builder.build());
466 390728 : }
467 973222 : }
468 : }
469 : }
470 :
471 638907 : if let Some(read_builder) = current_read_builder {
472 390728 : reads.push(read_builder.build());
473 390728 : }
474 :
475 638907 : reads
476 638907 : }
477 : }
478 :
479 : /// Disk reader for vectored blob spans (does not go through the page cache)
480 : pub struct VectoredBlobReader<'a> {
481 : file: &'a VirtualFile,
482 : }
483 :
484 : impl<'a> VectoredBlobReader<'a> {
485 759549 : pub fn new(file: &'a VirtualFile) -> Self {
486 759549 : Self { file }
487 759549 : }
488 :
489 : /// Read the requested blobs into the buffer.
490 : ///
491 : /// We have to deal with the fact that blobs are not fixed size.
492 : /// Each blob is prefixed by a size header.
493 : ///
494 : /// The success return value is a struct which contains the buffer
495 : /// filled from disk and a list of offsets at which each blob lies
496 : /// in the buffer.
497 623490 : pub async fn read_blobs(
498 623490 : &self,
499 623490 : read: &VectoredRead,
500 623490 : buf: BytesMut,
501 623490 : ctx: &RequestContext,
502 623490 : ) -> Result<VectoredBlobsBuf, std::io::Error> {
503 623490 : assert!(read.size() > 0);
504 623490 : assert!(
505 623490 : read.size() <= buf.capacity(),
506 0 : "{} > {}",
507 0 : read.size(),
508 0 : buf.capacity()
509 : );
510 :
511 623490 : if cfg!(debug_assertions) {
512 623490 : let align = virtual_file::get_io_buffer_alignment() as u64;
513 623490 : debug_assert_eq!(
514 623490 : read.start % align,
515 : 0,
516 0 : "Read start at {} does not satisfy the required io buffer alignment ({} bytes)",
517 : read.start,
518 : align
519 : );
520 0 : }
521 :
522 623490 : let mut buf = self
523 623490 : .file
524 623490 : .read_exact_at(buf.slice(0..read.size()), read.start, ctx)
525 317170 : .await?
526 623490 : .into_inner();
527 623490 :
528 623490 : let blobs_at = read.blobs_at.as_slice();
529 623490 :
530 623490 : let start_offset = read.start;
531 623490 :
532 623490 : let mut metas = Vec::with_capacity(blobs_at.len());
533 623490 : // Blobs in `read` only provide their starting offset. The end offset
534 623490 : // of a blob is implicit: the start of the next blob if one exists
535 623490 : // or the end of the read.
536 623490 :
537 623490 : // Some scratch space, put here for reusing the allocation
538 623490 : let mut decompressed_vec = Vec::new();
539 :
540 8481182 : for (blob_start, meta) in blobs_at {
541 7857692 : let blob_start_in_buf = blob_start - start_offset;
542 7857692 : let first_len_byte = buf[blob_start_in_buf as usize];
543 :
544 : // Each blob is prefixed by a header containing its size and compression information.
545 : // Extract the size and skip that header to find the start of the data.
546 : // The size can be 1 or 4 bytes. The most significant bit is 0 in the
547 : // 1 byte case and 1 in the 4 byte case.
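// For example (illustrative values): a first byte of 0x05 denotes an uncompressed
// 5-byte blob whose data starts right after the 1-byte header. When the most
// significant bit is set, the length is instead read as a big-endian u32 with the
// `LEN_COMPRESSION_BIT_MASK` bits cleared, and those masked bits carry the
// compression information.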
548 7857692 : let (size_length, blob_size, compression_bits) = if first_len_byte < 0x80 {
549 7744616 : (1, first_len_byte as u64, BYTE_UNCOMPRESSED)
550 : } else {
551 113076 : let mut blob_size_buf = [0u8; 4];
552 113076 : let offset_in_buf = blob_start_in_buf as usize;
553 113076 :
554 113076 : blob_size_buf.copy_from_slice(&buf[offset_in_buf..offset_in_buf + 4]);
555 113076 : blob_size_buf[0] &= !LEN_COMPRESSION_BIT_MASK;
556 113076 :
557 113076 : let compression_bits = first_len_byte & LEN_COMPRESSION_BIT_MASK;
558 113076 : (
559 113076 : 4,
560 113076 : u32::from_be_bytes(blob_size_buf) as u64,
561 113076 : compression_bits,
562 113076 : )
563 : };
564 :
565 7857692 : let start_raw = blob_start_in_buf + size_length;
566 7857692 : let end_raw = start_raw + blob_size;
567 7857692 : let (start, end);
568 7857692 : if compression_bits == BYTE_UNCOMPRESSED {
569 7857686 : start = start_raw as usize;
570 7857686 : end = end_raw as usize;
571 7857686 : } else if compression_bits == BYTE_ZSTD {
572 6 : let mut decoder =
573 6 : async_compression::tokio::write::ZstdDecoder::new(&mut decompressed_vec);
574 6 : decoder
575 6 : .write_all(&buf[start_raw as usize..end_raw as usize])
576 0 : .await?;
577 6 : decoder.flush().await?;
578 6 : start = buf.len();
579 6 : buf.extend_from_slice(&decompressed_vec);
580 6 : end = buf.len();
581 6 : decompressed_vec.clear();
582 : } else {
583 0 : let error = std::io::Error::new(
584 0 : std::io::ErrorKind::InvalidData,
585 0 : format!("invalid compression byte {compression_bits:x}"),
586 0 : );
587 0 : return Err(error);
588 : }
589 :
590 7857692 : metas.push(VectoredBlob {
591 7857692 : start,
592 7857692 : end,
593 7857692 : meta: *meta,
594 7857692 : });
595 : }
596 :
597 623490 : Ok(VectoredBlobsBuf { buf, blobs: metas })
598 623490 : }
599 : }
600 :
601 : /// Read planner used in [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`]. It provides a streaming API for
602 : /// getting read blobs. It returns a batch from `handle` once the batch being built reaches the `max_read_size` or
603 : /// `max_cnt` constraint.
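///
/// A minimal sketch of the intended call pattern (offsets and variables are hypothetical):
///
/// ```ignore
/// let mut planner = StreamingVectoredReadPlanner::new(max_read_size, max_cnt);
/// let mut reads = Vec::new();
/// for (key, lsn, offset) in blob_offsets {
///     reads.extend(planner.handle(key, lsn, offset));
/// }
/// reads.extend(planner.handle_range_end(end_offset));
/// ```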
604 : pub struct StreamingVectoredReadPlanner {
605 : read_builder: Option<VectoredReadBuilder>,
606 : // Arguments for previous blob passed into [`StreamingVectoredReadPlanner::handle`]
607 : prev: Option<(Key, Lsn, u64)>,
608 : /// Max read size per batch. This is not a strict limit. If there are blobs at [0, 100) and [100, 200) while the `max_read_size` is 150,
609 : /// we will produce a single batch instead of splitting them.
610 : max_read_size: u64,
611 : /// Max item count per batch
612 : max_cnt: usize,
613 : /// Size of the current batch
614 : cnt: usize,
615 :
616 : mode: VectoredReadCoalesceMode,
617 : }
618 :
619 : impl StreamingVectoredReadPlanner {
620 2238 : pub fn new(max_read_size: u64, max_cnt: usize) -> Self {
621 2238 : assert!(max_cnt > 0);
622 2238 : assert!(max_read_size > 0);
623 2238 : let mode = VectoredReadCoalesceMode::get();
624 2238 : Self {
625 2238 : read_builder: None,
626 2238 : prev: None,
627 2238 : max_cnt,
628 2238 : max_read_size,
629 2238 : cnt: 0,
630 2238 : mode,
631 2238 : }
632 2238 : }
633 :
634 6381924 : pub fn handle(&mut self, key: Key, lsn: Lsn, offset: u64) -> Option<VectoredRead> {
635 : // Implementation note: internally lag behind by one blob such that
636 : // we have a start and end offset when initialising [`VectoredRead`]
637 6381924 : let (prev_key, prev_lsn, prev_offset) = match self.prev {
638 : None => {
639 1896 : self.prev = Some((key, lsn, offset));
640 1896 : return None;
641 : }
642 6380028 : Some(prev) => prev,
643 6380028 : };
644 6380028 :
645 6380028 : let res = self.add_blob(prev_key, prev_lsn, prev_offset, offset, false);
646 6380028 :
647 6380028 : self.prev = Some((key, lsn, offset));
648 6380028 :
649 6380028 : res
650 6381924 : }
651 :
652 1734 : pub fn handle_range_end(&mut self, offset: u64) -> Option<VectoredRead> {
653 1734 : let res = if let Some((prev_key, prev_lsn, prev_offset)) = self.prev {
654 1728 : self.add_blob(prev_key, prev_lsn, prev_offset, offset, true)
655 : } else {
656 6 : None
657 : };
658 :
659 1734 : self.prev = None;
660 1734 :
661 1734 : res
662 1734 : }
663 :
664 6381756 : fn add_blob(
665 6381756 : &mut self,
666 6381756 : key: Key,
667 6381756 : lsn: Lsn,
668 6381756 : start_offset: u64,
669 6381756 : end_offset: u64,
670 6381756 : is_last_blob_in_read: bool,
671 6381756 : ) -> Option<VectoredRead> {
672 6381756 : match &mut self.read_builder {
673 6261084 : Some(read_builder) => {
674 6261084 : let extended = read_builder.extend(start_offset, end_offset, BlobMeta { key, lsn });
675 6261084 : assert_eq!(extended, VectoredReadExtended::Yes);
676 : }
677 120672 : None => {
678 120672 : self.read_builder = {
679 120672 : Some(VectoredReadBuilder::new_streaming(
680 120672 : start_offset,
681 120672 : end_offset,
682 120672 : BlobMeta { key, lsn },
683 120672 : self.mode,
684 120672 : ))
685 120672 : };
686 120672 : }
687 : }
688 6381756 : let read_builder = self.read_builder.as_mut().unwrap();
689 6381756 : self.cnt += 1;
690 6381756 : if is_last_blob_in_read
691 6380028 : || read_builder.size() >= self.max_read_size as usize
692 6295836 : || self.cnt >= self.max_cnt
693 : {
694 120672 : let prev_read_builder = self.read_builder.take();
695 120672 : self.cnt = 0;
696 :
697 : // `current_read_builder` is None in the first iteration
698 120672 : if let Some(read_builder) = prev_read_builder {
699 120672 : return Some(read_builder.build());
700 0 : }
701 6261084 : }
702 6261084 : None
703 6381756 : }
704 : }
705 :
706 : #[cfg(test)]
707 : mod tests {
708 : use anyhow::Error;
709 :
710 : use crate::context::DownloadBehavior;
711 : use crate::page_cache::PAGE_SZ;
712 : use crate::task_mgr::TaskKind;
713 :
714 : use super::super::blob_io::tests::{random_array, write_maybe_compressed};
715 : use super::*;
716 :
717 144 : fn validate_read(read: &VectoredRead, offset_range: &[(Key, Lsn, u64, BlobFlag)]) {
718 144 : let align = virtual_file::get_io_buffer_alignment() as u64;
719 144 : assert_eq!(read.start % align, 0);
720 144 : assert_eq!(read.start / align, offset_range.first().unwrap().2 / align);
721 :
722 252 : let expected_offsets_in_read: Vec<_> = offset_range.iter().map(|o| o.2).collect();
723 144 :
724 144 : let offsets_in_read: Vec<_> = read
725 144 : .blobs_at
726 144 : .as_slice()
727 144 : .iter()
728 252 : .map(|(offset, _)| *offset)
729 144 : .collect();
730 144 :
731 144 : assert_eq!(expected_offsets_in_read, offsets_in_read);
732 144 : }
733 :
734 : #[test]
735 6 : fn planner_chunked_coalesce_all_test() {
736 6 : use crate::virtual_file;
737 6 :
738 6 : const CHUNK_SIZE: u64 = 512;
739 6 : virtual_file::set_io_buffer_alignment(CHUNK_SIZE as usize).unwrap();
740 6 : let max_read_size = CHUNK_SIZE as usize * 8;
741 6 : let key = Key::MIN;
742 6 : let lsn = Lsn(0);
743 6 :
744 6 : let blob_descriptions = [
745 6 : (key, lsn, CHUNK_SIZE / 8, BlobFlag::None), // Read 1 BEGIN
746 6 : (key, lsn, CHUNK_SIZE / 4, BlobFlag::Ignore), // Gap
747 6 : (key, lsn, CHUNK_SIZE / 2, BlobFlag::None),
748 6 : (key, lsn, CHUNK_SIZE - 2, BlobFlag::Ignore), // Gap
749 6 : (key, lsn, CHUNK_SIZE, BlobFlag::None),
750 6 : (key, lsn, CHUNK_SIZE * 2 - 1, BlobFlag::None),
751 6 : (key, lsn, CHUNK_SIZE * 2 + 1, BlobFlag::Ignore), // Gap
752 6 : (key, lsn, CHUNK_SIZE * 3 + 1, BlobFlag::None),
753 6 : (key, lsn, CHUNK_SIZE * 5 + 1, BlobFlag::None),
754 6 : (key, lsn, CHUNK_SIZE * 6 + 1, BlobFlag::Ignore), // skipped chunk size, but not a chunk: should coalesce.
755 6 : (key, lsn, CHUNK_SIZE * 7 + 1, BlobFlag::None),
756 6 : (key, lsn, CHUNK_SIZE * 8, BlobFlag::None), // Read 2 BEGIN (b/c max_read_size)
757 6 : (key, lsn, CHUNK_SIZE * 9, BlobFlag::Ignore), // ==== skipped a chunk
758 6 : (key, lsn, CHUNK_SIZE * 10, BlobFlag::None), // Read 3 BEGIN (cannot coalesce)
759 6 : ];
760 6 :
761 6 : let ranges = [
762 6 : &[
763 6 : blob_descriptions[0],
764 6 : blob_descriptions[2],
765 6 : blob_descriptions[4],
766 6 : blob_descriptions[5],
767 6 : blob_descriptions[7],
768 6 : blob_descriptions[8],
769 6 : blob_descriptions[10],
770 6 : ],
771 6 : &blob_descriptions[11..12],
772 6 : &blob_descriptions[13..],
773 6 : ];
774 6 :
775 6 : let mut planner = VectoredReadPlanner::new(max_read_size);
776 90 : for (key, lsn, offset, flag) in blob_descriptions {
777 84 : planner.handle(key, lsn, offset, flag);
778 84 : }
779 :
780 6 : planner.handle_range_end(652 * 1024);
781 6 :
782 6 : let reads = planner.finish();
783 6 :
784 6 : assert_eq!(reads.len(), ranges.len());
785 :
786 18 : for (idx, read) in reads.iter().enumerate() {
787 18 : validate_read(read, ranges[idx]);
788 18 : }
789 6 : }
790 :
791 : #[test]
792 6 : fn planner_max_read_size_test() {
793 6 : let max_read_size = 128 * 1024;
794 6 : let key = Key::MIN;
795 6 : let lsn = Lsn(0);
796 6 :
797 6 : let blob_descriptions = vec![
798 6 : (key, lsn, 0, BlobFlag::None),
799 6 : (key, lsn, 32 * 1024, BlobFlag::None),
800 6 : (key, lsn, 96 * 1024, BlobFlag::None), // Last in read 1
801 6 : (key, lsn, 128 * 1024, BlobFlag::None), // Last in read 2
802 6 : (key, lsn, 198 * 1024, BlobFlag::None), // Last in read 3
803 6 : (key, lsn, 268 * 1024, BlobFlag::None), // Last in read 4
804 6 : (key, lsn, 396 * 1024, BlobFlag::None), // Last in read 5
805 6 : (key, lsn, 652 * 1024, BlobFlag::None), // Last in read 6
806 6 : ];
807 6 :
808 6 : let ranges = [
809 6 : &blob_descriptions[0..3],
810 6 : &blob_descriptions[3..4],
811 6 : &blob_descriptions[4..5],
812 6 : &blob_descriptions[5..6],
813 6 : &blob_descriptions[6..7],
814 6 : &blob_descriptions[7..],
815 6 : ];
816 6 :
817 6 : let mut planner = VectoredReadPlanner::new(max_read_size);
818 48 : for (key, lsn, offset, flag) in blob_descriptions.clone() {
819 48 : planner.handle(key, lsn, offset, flag);
820 48 : }
821 :
822 6 : planner.handle_range_end(652 * 1024);
823 6 :
824 6 : let reads = planner.finish();
825 6 :
826 6 : assert_eq!(reads.len(), 6);
827 :
828 : // TODO: could remove zero reads to produce 5 reads here
829 :
830 36 : for (idx, read) in reads.iter().enumerate() {
831 36 : validate_read(read, ranges[idx]);
832 36 : }
833 6 : }
834 :
835 : #[test]
836 6 : fn planner_replacement_test() {
837 6 : let max_read_size = 128 * 1024;
838 6 : let first_key = Key::MIN;
839 6 : let second_key = first_key.next();
840 6 : let lsn = Lsn(0);
841 6 :
842 6 : let blob_descriptions = vec![
843 6 : (first_key, lsn, 0, BlobFlag::None), // First in read 1
844 6 : (first_key, lsn, 1024, BlobFlag::None), // Last in read 1
845 6 : (second_key, lsn, 2 * 1024, BlobFlag::ReplaceAll),
846 6 : (second_key, lsn, 3 * 1024, BlobFlag::None),
847 6 : (second_key, lsn, 4 * 1024, BlobFlag::ReplaceAll), // First in read 2
848 6 : (second_key, lsn, 5 * 1024, BlobFlag::None), // Last in read 2
849 6 : ];
850 6 :
851 6 : let ranges = [&blob_descriptions[0..2], &blob_descriptions[4..]];
852 6 :
853 6 : let mut planner = VectoredReadPlanner::new(max_read_size);
854 36 : for (key, lsn, offset, flag) in blob_descriptions.clone() {
855 36 : planner.handle(key, lsn, offset, flag);
856 36 : }
857 :
858 6 : planner.handle_range_end(6 * 1024);
859 6 :
860 6 : let reads = planner.finish();
861 6 : assert_eq!(reads.len(), 2);
862 :
863 12 : for (idx, read) in reads.iter().enumerate() {
864 12 : validate_read(read, ranges[idx]);
865 12 : }
866 6 : }
867 :
868 : #[test]
869 6 : fn streaming_planner_max_read_size_test() {
870 6 : let max_read_size = 128 * 1024;
871 6 : let key = Key::MIN;
872 6 : let lsn = Lsn(0);
873 6 :
874 6 : let blob_descriptions = vec![
875 6 : (key, lsn, 0, BlobFlag::None),
876 6 : (key, lsn, 32 * 1024, BlobFlag::None),
877 6 : (key, lsn, 96 * 1024, BlobFlag::None),
878 6 : (key, lsn, 128 * 1024, BlobFlag::None),
879 6 : (key, lsn, 198 * 1024, BlobFlag::None),
880 6 : (key, lsn, 268 * 1024, BlobFlag::None),
881 6 : (key, lsn, 396 * 1024, BlobFlag::None),
882 6 : (key, lsn, 652 * 1024, BlobFlag::None),
883 6 : ];
884 6 :
885 6 : let ranges = [
886 6 : &blob_descriptions[0..3],
887 6 : &blob_descriptions[3..5],
888 6 : &blob_descriptions[5..6],
889 6 : &blob_descriptions[6..7],
890 6 : &blob_descriptions[7..],
891 6 : ];
892 6 :
893 6 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1000);
894 6 : let mut reads = Vec::new();
895 48 : for (key, lsn, offset, _) in blob_descriptions.clone() {
896 48 : reads.extend(planner.handle(key, lsn, offset));
897 48 : }
898 6 : reads.extend(planner.handle_range_end(652 * 1024));
899 6 :
900 6 : assert_eq!(reads.len(), ranges.len());
901 :
902 30 : for (idx, read) in reads.iter().enumerate() {
903 30 : validate_read(read, ranges[idx]);
904 30 : }
905 6 : }
906 :
907 : #[test]
908 6 : fn streaming_planner_max_cnt_test() {
909 6 : let max_read_size = 1024 * 1024;
910 6 : let key = Key::MIN;
911 6 : let lsn = Lsn(0);
912 6 :
913 6 : let blob_descriptions = vec![
914 6 : (key, lsn, 0, BlobFlag::None),
915 6 : (key, lsn, 32 * 1024, BlobFlag::None),
916 6 : (key, lsn, 96 * 1024, BlobFlag::None),
917 6 : (key, lsn, 128 * 1024, BlobFlag::None),
918 6 : (key, lsn, 198 * 1024, BlobFlag::None),
919 6 : (key, lsn, 268 * 1024, BlobFlag::None),
920 6 : (key, lsn, 396 * 1024, BlobFlag::None),
921 6 : (key, lsn, 652 * 1024, BlobFlag::None),
922 6 : ];
923 6 :
924 6 : let ranges = [
925 6 : &blob_descriptions[0..2],
926 6 : &blob_descriptions[2..4],
927 6 : &blob_descriptions[4..6],
928 6 : &blob_descriptions[6..],
929 6 : ];
930 6 :
931 6 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 2);
932 6 : let mut reads = Vec::new();
933 48 : for (key, lsn, offset, _) in blob_descriptions.clone() {
934 48 : reads.extend(planner.handle(key, lsn, offset));
935 48 : }
936 6 : reads.extend(planner.handle_range_end(652 * 1024));
937 6 :
938 6 : assert_eq!(reads.len(), ranges.len());
939 :
940 24 : for (idx, read) in reads.iter().enumerate() {
941 24 : validate_read(read, ranges[idx]);
942 24 : }
943 6 : }
944 :
945 : #[test]
946 6 : fn streaming_planner_edge_test() {
947 6 : let max_read_size = 1024 * 1024;
948 6 : let key = Key::MIN;
949 6 : let lsn = Lsn(0);
950 6 : {
951 6 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
952 6 : let mut reads = Vec::new();
953 6 : reads.extend(planner.handle_range_end(652 * 1024));
954 6 : assert!(reads.is_empty());
955 : }
956 : {
957 6 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
958 6 : let mut reads = Vec::new();
959 6 : reads.extend(planner.handle(key, lsn, 0));
960 6 : reads.extend(planner.handle_range_end(652 * 1024));
961 6 : assert_eq!(reads.len(), 1);
962 6 : validate_read(&reads[0], &[(key, lsn, 0, BlobFlag::None)]);
963 6 : }
964 6 : {
965 6 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
966 6 : let mut reads = Vec::new();
967 6 : reads.extend(planner.handle(key, lsn, 0));
968 6 : reads.extend(planner.handle(key, lsn, 128 * 1024));
969 6 : reads.extend(planner.handle_range_end(652 * 1024));
970 6 : assert_eq!(reads.len(), 2);
971 6 : validate_read(&reads[0], &[(key, lsn, 0, BlobFlag::None)]);
972 6 : validate_read(&reads[1], &[(key, lsn, 128 * 1024, BlobFlag::None)]);
973 6 : }
974 6 : {
975 6 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 2);
976 6 : let mut reads = Vec::new();
977 6 : reads.extend(planner.handle(key, lsn, 0));
978 6 : reads.extend(planner.handle(key, lsn, 128 * 1024));
979 6 : reads.extend(planner.handle_range_end(652 * 1024));
980 6 : assert_eq!(reads.len(), 1);
981 6 : validate_read(
982 6 : &reads[0],
983 6 : &[
984 6 : (key, lsn, 0, BlobFlag::None),
985 6 : (key, lsn, 128 * 1024, BlobFlag::None),
986 6 : ],
987 6 : );
988 6 : }
989 6 : }
990 :
991 24 : async fn round_trip_test_compressed(blobs: &[Vec<u8>], compression: bool) -> Result<(), Error> {
992 24 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
993 24 : let (_temp_dir, pathbuf, offsets) =
994 837 : write_maybe_compressed::<true>(blobs, compression, &ctx).await?;
995 :
996 24 : let file = VirtualFile::open(&pathbuf, &ctx).await?;
997 24 : let file_len = std::fs::metadata(&pathbuf)?.len();
998 24 :
999 24 : // Multiply by two (compressed data might need more space), and add a few bytes for the header
1000 12360 : let reserved_bytes = blobs.iter().map(|bl| bl.len()).max().unwrap() * 2 + 16;
1001 24 : let mut buf = BytesMut::with_capacity(reserved_bytes);
1002 24 :
1003 24 : let mode = VectoredReadCoalesceMode::get();
1004 24 : let vectored_blob_reader = VectoredBlobReader::new(&file);
1005 24 : let meta = BlobMeta {
1006 24 : key: Key::MIN,
1007 24 : lsn: Lsn(0),
1008 24 : };
1009 :
1010 12360 : for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
1011 12360 : let end = offsets.get(idx + 1).unwrap_or(&file_len);
1012 12360 : if idx + 1 == offsets.len() {
1013 24 : continue;
1014 12336 : }
1015 12336 : let read_builder = VectoredReadBuilder::new(*offset, *end, meta, 16 * 4096, mode);
1016 12336 : let read = read_builder.build();
1017 12336 : let result = vectored_blob_reader.read_blobs(&read, buf, &ctx).await?;
1018 12336 : assert_eq!(result.blobs.len(), 1);
1019 12336 : let read_blob = &result.blobs[0];
1020 12336 : let read_buf = &result.buf[read_blob.start..read_blob.end];
1021 12336 : assert_eq!(blob, read_buf, "mismatch for idx={idx} at offset={offset}");
1022 12336 : buf = result.buf;
1023 : }
1024 24 : Ok(())
1025 24 : }
1026 :
1027 : #[tokio::test]
1028 6 : async fn test_really_big_array() -> Result<(), Error> {
1029 6 : let blobs = &[
1030 6 : b"test".to_vec(),
1031 6 : random_array(10 * PAGE_SZ),
1032 6 : b"hello".to_vec(),
1033 6 : random_array(66 * PAGE_SZ),
1034 6 : vec![0xf3; 24 * PAGE_SZ],
1035 6 : b"foobar".to_vec(),
1036 6 : ];
1037 42 : round_trip_test_compressed(blobs, false).await?;
1038 33 : round_trip_test_compressed(blobs, true).await?;
1039 6 : Ok(())
1040 6 : }
1041 :
1042 : #[tokio::test]
1043 6 : async fn test_arrays_inc() -> Result<(), Error> {
1044 6 : let blobs = (0..PAGE_SZ / 8)
1045 6144 : .map(|v| random_array(v * 16))
1046 6 : .collect::<Vec<_>>();
1047 3516 : round_trip_test_compressed(&blobs, false).await?;
1048 3516 : round_trip_test_compressed(&blobs, true).await?;
1049 6 : Ok(())
1050 6 : }
1051 :
1052 : #[test]
1053 6 : fn test_div_round_up() {
1054 6 : const CHUNK_SIZE: usize = 512;
1055 6 : assert_eq!(1, div_round_up(200, CHUNK_SIZE));
1056 6 : assert_eq!(1, div_round_up(CHUNK_SIZE, CHUNK_SIZE));
1057 6 : assert_eq!(2, div_round_up(CHUNK_SIZE + 1, CHUNK_SIZE));
1058 6 : }
1059 : }