Line data Source code
1 : //!
2 : //! Utilities for vectored reading of variable-sized "blobs".
3 : //!
4 : //! The "blob" api is an abstraction on top of the "block" api,
5 : //! with the main difference being that blobs do not have a fixed
6 : //! size (each blob is prefixed with 1 or 4 byte length field)
7 : //!
8 : //! The vectored apis provided in this module allow for planning
9 : //! and executing disk IO which covers multiple blobs.
10 : //!
11 : //! Reads are planned with [`VectoredReadPlanner`] which will coalesce
12 : //! adjacent blocks into a single disk IO request and exectuted by
13 : //! [`VectoredBlobReader`] which does all the required offset juggling
14 : //! and returns a buffer housing all the blobs and a list of offsets.
15 : //!
16 : //! Note that the vectored blob api does *not* go through the page cache.
17 :
18 : use std::collections::BTreeMap;
19 : use std::num::NonZeroUsize;
20 :
21 : use bytes::BytesMut;
22 : use pageserver_api::key::Key;
23 : use tokio::io::AsyncWriteExt;
24 : use tokio_epoll_uring::BoundedBuf;
25 : use utils::lsn::Lsn;
26 : use utils::vec_map::VecMap;
27 :
28 : use crate::context::RequestContext;
29 : use crate::tenant::blob_io::{BYTE_UNCOMPRESSED, BYTE_ZSTD, LEN_COMPRESSION_BIT_MASK};
30 : use crate::virtual_file::{self, VirtualFile};
31 :
32 : #[derive(Copy, Clone, Debug, PartialEq, Eq)]
33 : pub struct MaxVectoredReadBytes(pub NonZeroUsize);
34 :
35 : /// Metadata bundled with the start and end offset of a blob.
36 : #[derive(Copy, Clone, Debug)]
37 : pub struct BlobMeta {
38 : pub key: Key,
39 : pub lsn: Lsn,
40 : }
41 :
42 : /// Blob offsets into [`VectoredBlobsBuf::buf`]
43 : pub struct VectoredBlob {
44 : pub start: usize,
45 : pub end: usize,
46 : pub meta: BlobMeta,
47 : }
48 :
49 : /// Return type of [`VectoredBlobReader::read_blobs`]
50 : pub struct VectoredBlobsBuf {
51 : /// Buffer for all blobs in this read
52 : pub buf: BytesMut,
53 : /// Offsets into the buffer and metadata for all blobs in this read
54 : pub blobs: Vec<VectoredBlob>,
55 : }
56 :
57 : /// Description of one disk read for multiple blobs.
58 : /// Used as the argument form [`VectoredBlobReader::read_blobs`]
59 : #[derive(Debug)]
60 : pub struct VectoredRead {
61 : pub start: u64,
62 : pub end: u64,
63 : /// Start offset and metadata for each blob in this read
64 : pub blobs_at: VecMap<u64, BlobMeta>,
65 : }
66 :
67 : impl VectoredRead {
68 2849955 : pub(crate) fn size(&self) -> usize {
69 2849955 : (self.end - self.start) as usize
70 2849955 : }
71 : }
72 :
73 : #[derive(Eq, PartialEq, Debug)]
74 : pub(crate) enum VectoredReadExtended {
75 : Yes,
76 : No,
77 : }
78 :
79 : #[derive(Copy, Clone, Debug, PartialEq, Eq)]
80 : pub enum VectoredReadCoalesceMode {
81 : /// Only coalesce exactly adjacent reads.
82 : AdjacentOnly,
83 : /// In addition to adjacent reads, also consider reads whose corresponding
84 : /// `end` and `start` offsets reside at the same chunk.
85 : Chunked(usize),
86 : }
87 :
88 : impl VectoredReadCoalesceMode {
89 : /// [`AdjacentVectoredReadBuilder`] is used if alignment requirement is 0,
90 : /// whereas [`ChunkedVectoredReadBuilder`] is used for alignment requirement 1 and higher.
91 641733 : pub(crate) fn get() -> Self {
92 641733 : let align = virtual_file::get_io_buffer_alignment_raw();
93 641733 : if align == 0 {
94 213920 : VectoredReadCoalesceMode::AdjacentOnly
95 : } else {
96 427813 : VectoredReadCoalesceMode::Chunked(align)
97 : }
98 641733 : }
99 : }
100 :
101 : pub(crate) enum VectoredReadBuilder {
102 : Adjacent(AdjacentVectoredReadBuilder),
103 : Chunked(ChunkedVectoredReadBuilder),
104 : }
105 :
106 : impl VectoredReadBuilder {
107 624115 : fn new_impl(
108 624115 : start_offset: u64,
109 624115 : end_offset: u64,
110 624115 : meta: BlobMeta,
111 624115 : max_read_size: Option<usize>,
112 624115 : mode: VectoredReadCoalesceMode,
113 624115 : ) -> Self {
114 624115 : match mode {
115 213135 : VectoredReadCoalesceMode::AdjacentOnly => Self::Adjacent(
116 213135 : AdjacentVectoredReadBuilder::new(start_offset, end_offset, meta, max_read_size),
117 213135 : ),
118 410980 : VectoredReadCoalesceMode::Chunked(chunk_size) => {
119 410980 : Self::Chunked(ChunkedVectoredReadBuilder::new(
120 410980 : start_offset,
121 410980 : end_offset,
122 410980 : meta,
123 410980 : max_read_size,
124 410980 : chunk_size,
125 410980 : ))
126 : }
127 : }
128 624115 : }
129 :
130 503185 : pub(crate) fn new(
131 503185 : start_offset: u64,
132 503185 : end_offset: u64,
133 503185 : meta: BlobMeta,
134 503185 : max_read_size: usize,
135 503185 : mode: VectoredReadCoalesceMode,
136 503185 : ) -> Self {
137 503185 : Self::new_impl(start_offset, end_offset, meta, Some(max_read_size), mode)
138 503185 : }
139 :
140 120930 : pub(crate) fn new_streaming(
141 120930 : start_offset: u64,
142 120930 : end_offset: u64,
143 120930 : meta: BlobMeta,
144 120930 : mode: VectoredReadCoalesceMode,
145 120930 : ) -> Self {
146 120930 : Self::new_impl(start_offset, end_offset, meta, None, mode)
147 120930 : }
148 :
149 7333970 : pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
150 7333970 : match self {
151 2444738 : VectoredReadBuilder::Adjacent(builder) => builder.extend(start, end, meta),
152 4889232 : VectoredReadBuilder::Chunked(builder) => builder.extend(start, end, meta),
153 : }
154 7333970 : }
155 :
156 624115 : pub(crate) fn build(self) -> VectoredRead {
157 624115 : match self {
158 213135 : VectoredReadBuilder::Adjacent(builder) => builder.build(),
159 410980 : VectoredReadBuilder::Chunked(builder) => builder.build(),
160 : }
161 624115 : }
162 :
163 6380026 : pub(crate) fn size(&self) -> usize {
164 6380026 : match self {
165 2126676 : VectoredReadBuilder::Adjacent(builder) => builder.size(),
166 4253350 : VectoredReadBuilder::Chunked(builder) => builder.size(),
167 : }
168 6380026 : }
169 : }
170 :
171 : pub(crate) struct AdjacentVectoredReadBuilder {
172 : /// Start offset of the read.
173 : start: u64,
174 : // End offset of the read.
175 : end: u64,
176 : /// Start offset and metadata for each blob in this read
177 : blobs_at: VecMap<u64, BlobMeta>,
178 : max_read_size: Option<usize>,
179 : }
180 :
181 : impl AdjacentVectoredReadBuilder {
182 : /// Start building a new vectored read.
183 : ///
184 : /// Note that by design, this does not check against reading more than `max_read_size` to
185 : /// support reading larger blobs than the configuration value. The builder will be single use
186 : /// however after that.
187 213135 : pub(crate) fn new(
188 213135 : start_offset: u64,
189 213135 : end_offset: u64,
190 213135 : meta: BlobMeta,
191 213135 : max_read_size: Option<usize>,
192 213135 : ) -> Self {
193 213135 : let mut blobs_at = VecMap::default();
194 213135 : blobs_at
195 213135 : .append(start_offset, meta)
196 213135 : .expect("First insertion always succeeds");
197 213135 :
198 213135 : Self {
199 213135 : start: start_offset,
200 213135 : end: end_offset,
201 213135 : blobs_at,
202 213135 : max_read_size,
203 213135 : }
204 213135 : }
205 : /// Attempt to extend the current read with a new blob if the start
206 : /// offset matches with the current end of the vectored read
207 : /// and the resuting size is below the max read size
208 2444738 : pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
209 2444738 : tracing::trace!(start, end, "trying to extend");
210 2444738 : let size = (end - start) as usize;
211 2444738 : let not_limited_by_max_read_size = {
212 2444738 : if let Some(max_read_size) = self.max_read_size {
213 357710 : self.size() + size <= max_read_size
214 : } else {
215 2087028 : true
216 : }
217 : };
218 :
219 2444738 : if self.end == start && not_limited_by_max_read_size {
220 2406422 : self.end = end;
221 2406422 : self.blobs_at
222 2406422 : .append(start, meta)
223 2406422 : .expect("LSNs are ordered within vectored reads");
224 2406422 :
225 2406422 : return VectoredReadExtended::Yes;
226 38316 : }
227 38316 :
228 38316 : VectoredReadExtended::No
229 2444738 : }
230 :
231 2484386 : pub(crate) fn size(&self) -> usize {
232 2484386 : (self.end - self.start) as usize
233 2484386 : }
234 :
235 213135 : pub(crate) fn build(self) -> VectoredRead {
236 213135 : VectoredRead {
237 213135 : start: self.start,
238 213135 : end: self.end,
239 213135 : blobs_at: self.blobs_at,
240 213135 : }
241 213135 : }
242 : }
243 :
244 : pub(crate) struct ChunkedVectoredReadBuilder {
245 : /// Start block number
246 : start_blk_no: usize,
247 : /// End block number (exclusive).
248 : end_blk_no: usize,
249 : /// Start offset and metadata for each blob in this read
250 : blobs_at: VecMap<u64, BlobMeta>,
251 : max_read_size: Option<usize>,
252 : /// Chunk size reads are coalesced into.
253 : chunk_size: usize,
254 : }
255 :
256 : /// Computes x / d rounded up.
257 5300230 : fn div_round_up(x: usize, d: usize) -> usize {
258 5300230 : (x + (d - 1)) / d
259 5300230 : }
260 :
261 : impl ChunkedVectoredReadBuilder {
262 : /// Start building a new vectored read.
263 : ///
264 : /// Note that by design, this does not check against reading more than `max_read_size` to
265 : /// support reading larger blobs than the configuration value. The builder will be single use
266 : /// however after that.
267 410980 : pub(crate) fn new(
268 410980 : start_offset: u64,
269 410980 : end_offset: u64,
270 410980 : meta: BlobMeta,
271 410980 : max_read_size: Option<usize>,
272 410980 : chunk_size: usize,
273 410980 : ) -> Self {
274 410980 : let mut blobs_at = VecMap::default();
275 410980 : blobs_at
276 410980 : .append(start_offset, meta)
277 410980 : .expect("First insertion always succeeds");
278 410980 :
279 410980 : let start_blk_no = start_offset as usize / chunk_size;
280 410980 : let end_blk_no = div_round_up(end_offset as usize, chunk_size);
281 410980 : Self {
282 410980 : start_blk_no,
283 410980 : end_blk_no,
284 410980 : blobs_at,
285 410980 : max_read_size,
286 410980 : chunk_size,
287 410980 : }
288 410980 : }
289 :
290 : /// Attempts to extend the current read with a new blob if the new blob resides in the same or the immediate next chunk.
291 : ///
292 : /// The resulting size also must be below the max read size.
293 4889232 : pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
294 4889232 : tracing::trace!(start, end, "trying to extend");
295 4889232 : let start_blk_no = start as usize / self.chunk_size;
296 4889232 : let end_blk_no = div_round_up(end as usize, self.chunk_size);
297 :
298 4889232 : let not_limited_by_max_read_size = {
299 4889232 : if let Some(max_read_size) = self.max_read_size {
300 715436 : let coalesced_size = (end_blk_no - self.start_blk_no) * self.chunk_size;
301 715436 : coalesced_size <= max_read_size
302 : } else {
303 4173796 : true
304 : }
305 : };
306 :
307 : // True if the second block starts in the same block or the immediate next block where the first block ended.
308 : //
309 : // Note: This automatically handles the case where two blocks are adjacent to each other,
310 : // whether they starts on chunk size boundary or not.
311 4889232 : let is_adjacent_chunk_read = {
312 : // 1. first.end & second.start are in the same block
313 4889232 : self.end_blk_no == start_blk_no + 1 ||
314 : // 2. first.end ends one block before second.start
315 2457497 : self.end_blk_no == start_blk_no
316 : };
317 :
318 4889232 : if is_adjacent_chunk_read && not_limited_by_max_read_size {
319 4827378 : self.end_blk_no = end_blk_no;
320 4827378 : self.blobs_at
321 4827378 : .append(start, meta)
322 4827378 : .expect("LSNs are ordered within vectored reads");
323 4827378 :
324 4827378 : return VectoredReadExtended::Yes;
325 61854 : }
326 61854 :
327 61854 : VectoredReadExtended::No
328 4889232 : }
329 :
330 4253350 : pub(crate) fn size(&self) -> usize {
331 4253350 : (self.end_blk_no - self.start_blk_no) * self.chunk_size
332 4253350 : }
333 :
334 410980 : pub(crate) fn build(self) -> VectoredRead {
335 410980 : let start = (self.start_blk_no * self.chunk_size) as u64;
336 410980 : let end = (self.end_blk_no * self.chunk_size) as u64;
337 410980 : VectoredRead {
338 410980 : start,
339 410980 : end,
340 410980 : blobs_at: self.blobs_at,
341 410980 : }
342 410980 : }
343 : }
344 :
345 : #[derive(Copy, Clone, Debug)]
346 : pub enum BlobFlag {
347 : None,
348 : Ignore,
349 : ReplaceAll,
350 : }
351 :
352 : /// Planner for vectored blob reads.
353 : ///
354 : /// Blob offsets are received via [`VectoredReadPlanner::handle`]
355 : /// and coalesced into disk reads.
356 : ///
357 : /// The implementation is very simple:
358 : /// * Collect all blob offsets in an ordered structure
359 : /// * Iterate over the collected blobs and coalesce them into reads at the end
360 : pub struct VectoredReadPlanner {
361 : // Track all the blob offsets. Start offsets must be ordered.
362 : blobs: BTreeMap<Key, Vec<(Lsn, u64, u64)>>,
363 : // Arguments for previous blob passed into [`VectoredReadPlanner::handle`]
364 : prev: Option<(Key, Lsn, u64, BlobFlag)>,
365 :
366 : max_read_size: usize,
367 :
368 : mode: VectoredReadCoalesceMode,
369 : }
370 :
371 : impl VectoredReadPlanner {
372 639441 : pub fn new(max_read_size: usize) -> Self {
373 639441 : let mode = VectoredReadCoalesceMode::get();
374 639441 : Self {
375 639441 : blobs: BTreeMap::new(),
376 639441 : prev: None,
377 639441 : max_read_size,
378 639441 : mode,
379 639441 : }
380 639441 : }
381 :
382 : /// Include a new blob in the read plan.
383 : ///
384 : /// This function is called from a B-Tree index visitor (see `DeltaLayerInner::plan_reads`
385 : /// and `ImageLayerInner::plan_reads`). Said visitor wants to collect blob offsets for all
386 : /// keys in a given keyspace. This function must be called for each key in the desired
387 : /// keyspace (monotonically continuous). [`Self::handle_range_end`] must
388 : /// be called after every range in the offset.
389 : ///
390 : /// In the event that keys are skipped, the behaviour is undefined and can lead to an
391 : /// incorrect read plan. We can end up asserting, erroring in wal redo or returning
392 : /// incorrect data to the user.
393 : ///
394 : /// The `flag` argument has two interesting values:
395 : /// * [`BlobFlag::ReplaceAll`]: The blob for this key should replace all existing blobs.
396 : /// This is used for WAL records that `will_init`.
397 : /// * [`BlobFlag::Ignore`]: This blob should not be included in the read. This happens
398 : /// if the blob is cached.
399 4238202 : pub fn handle(&mut self, key: Key, lsn: Lsn, offset: u64, flag: BlobFlag) {
400 : // Implementation note: internally lag behind by one blob such that
401 : // we have a start and end offset when initialising [`VectoredRead`]
402 4238202 : let (prev_key, prev_lsn, prev_offset, prev_flag) = match self.prev {
403 : None => {
404 411411 : self.prev = Some((key, lsn, offset, flag));
405 411411 : return;
406 : }
407 3826791 : Some(prev) => prev,
408 3826791 : };
409 3826791 :
410 3826791 : self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
411 3826791 :
412 3826791 : self.prev = Some((key, lsn, offset, flag));
413 4238202 : }
414 :
415 831088 : pub fn handle_range_end(&mut self, offset: u64) {
416 831088 : if let Some((prev_key, prev_lsn, prev_offset, prev_flag)) = self.prev {
417 411411 : self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
418 419677 : }
419 :
420 831088 : self.prev = None;
421 831088 : }
422 :
423 4238202 : fn add_blob(&mut self, key: Key, lsn: Lsn, start_offset: u64, end_offset: u64, flag: BlobFlag) {
424 4238202 : match flag {
425 1015008 : BlobFlag::None => {
426 1015008 : let blobs_for_key = self.blobs.entry(key).or_default();
427 1015008 : blobs_for_key.push((lsn, start_offset, end_offset));
428 1015008 : }
429 770482 : BlobFlag::ReplaceAll => {
430 770482 : let blobs_for_key = self.blobs.entry(key).or_default();
431 770482 : blobs_for_key.clear();
432 770482 : blobs_for_key.push((lsn, start_offset, end_offset));
433 770482 : }
434 2452712 : BlobFlag::Ignore => {}
435 : }
436 4238202 : }
437 :
438 639441 : pub fn finish(self) -> Vec<VectoredRead> {
439 639441 : let mut current_read_builder: Option<VectoredReadBuilder> = None;
440 639441 : let mut reads = Vec::new();
441 :
442 1949746 : for (key, blobs_for_key) in self.blobs {
443 2774034 : for (lsn, start_offset, end_offset) in blobs_for_key {
444 1463729 : let extended = match &mut current_read_builder {
445 1073080 : Some(read_builder) => {
446 1073080 : read_builder.extend(start_offset, end_offset, BlobMeta { key, lsn })
447 : }
448 390649 : None => VectoredReadExtended::No,
449 : };
450 :
451 1463729 : if extended == VectoredReadExtended::No {
452 490801 : let next_read_builder = VectoredReadBuilder::new(
453 490801 : start_offset,
454 490801 : end_offset,
455 490801 : BlobMeta { key, lsn },
456 490801 : self.max_read_size,
457 490801 : self.mode,
458 490801 : );
459 490801 :
460 490801 : let prev_read_builder = current_read_builder.replace(next_read_builder);
461 :
462 : // `current_read_builder` is None in the first iteration of the outer loop
463 490801 : if let Some(read_builder) = prev_read_builder {
464 100152 : reads.push(read_builder.build());
465 390649 : }
466 972928 : }
467 : }
468 : }
469 :
470 639441 : if let Some(read_builder) = current_read_builder {
471 390649 : reads.push(read_builder.build());
472 390649 : }
473 :
474 639441 : reads
475 639441 : }
476 : }
477 :
478 : /// Disk reader for vectored blob spans (does not go through the page cache)
479 : pub struct VectoredBlobReader<'a> {
480 : file: &'a VirtualFile,
481 : }
482 :
483 : impl<'a> VectoredBlobReader<'a> {
484 760345 : pub fn new(file: &'a VirtualFile) -> Self {
485 760345 : Self { file }
486 760345 : }
487 :
488 : /// Read the requested blobs into the buffer.
489 : ///
490 : /// We have to deal with the fact that blobs are not fixed size.
491 : /// Each blob is prefixed by a size header.
492 : ///
493 : /// The success return value is a struct which contains the buffer
494 : /// filled from disk and a list of offsets at which each blob lies
495 : /// in the buffer.
496 623943 : pub async fn read_blobs(
497 623943 : &self,
498 623943 : read: &VectoredRead,
499 623943 : buf: BytesMut,
500 623943 : ctx: &RequestContext,
501 623943 : ) -> Result<VectoredBlobsBuf, std::io::Error> {
502 623943 : assert!(read.size() > 0);
503 623943 : assert!(
504 623943 : read.size() <= buf.capacity(),
505 0 : "{} > {}",
506 0 : read.size(),
507 0 : buf.capacity()
508 : );
509 :
510 623943 : if cfg!(debug_assertions) {
511 623943 : let align = virtual_file::get_io_buffer_alignment() as u64;
512 623943 : debug_assert_eq!(
513 623943 : read.start % align,
514 : 0,
515 0 : "Read start at {} does not satisfy the required io buffer alignment ({} bytes)",
516 : read.start,
517 : align
518 : );
519 0 : }
520 :
521 623943 : let mut buf = self
522 623943 : .file
523 623943 : .read_exact_at(buf.slice(0..read.size()), read.start, ctx)
524 317305 : .await?
525 623943 : .into_inner();
526 623943 :
527 623943 : let blobs_at = read.blobs_at.as_slice();
528 623943 :
529 623943 : let start_offset = read.start;
530 623943 :
531 623943 : let mut metas = Vec::with_capacity(blobs_at.len());
532 623943 : // Blobs in `read` only provide their starting offset. The end offset
533 623943 : // of a blob is implicit: the start of the next blob if one exists
534 623943 : // or the end of the read.
535 623943 :
536 623943 : // Some scratch space, put here for reusing the allocation
537 623943 : let mut decompressed_vec = Vec::new();
538 :
539 8481558 : for (blob_start, meta) in blobs_at {
540 7857615 : let blob_start_in_buf = blob_start - start_offset;
541 7857615 : let first_len_byte = buf[blob_start_in_buf as usize];
542 :
543 : // Each blob is prefixed by a header containing its size and compression information.
544 : // Extract the size and skip that header to find the start of the data.
545 : // The size can be 1 or 4 bytes. The most significant bit is 0 in the
546 : // 1 byte case and 1 in the 4 byte case.
547 7857615 : let (size_length, blob_size, compression_bits) = if first_len_byte < 0x80 {
548 7744539 : (1, first_len_byte as u64, BYTE_UNCOMPRESSED)
549 : } else {
550 113076 : let mut blob_size_buf = [0u8; 4];
551 113076 : let offset_in_buf = blob_start_in_buf as usize;
552 113076 :
553 113076 : blob_size_buf.copy_from_slice(&buf[offset_in_buf..offset_in_buf + 4]);
554 113076 : blob_size_buf[0] &= !LEN_COMPRESSION_BIT_MASK;
555 113076 :
556 113076 : let compression_bits = first_len_byte & LEN_COMPRESSION_BIT_MASK;
557 113076 : (
558 113076 : 4,
559 113076 : u32::from_be_bytes(blob_size_buf) as u64,
560 113076 : compression_bits,
561 113076 : )
562 : };
563 :
564 7857615 : let start_raw = blob_start_in_buf + size_length;
565 7857615 : let end_raw = start_raw + blob_size;
566 7857615 : let (start, end);
567 7857615 : if compression_bits == BYTE_UNCOMPRESSED {
568 7857609 : start = start_raw as usize;
569 7857609 : end = end_raw as usize;
570 7857609 : } else if compression_bits == BYTE_ZSTD {
571 6 : let mut decoder =
572 6 : async_compression::tokio::write::ZstdDecoder::new(&mut decompressed_vec);
573 6 : decoder
574 6 : .write_all(&buf[start_raw as usize..end_raw as usize])
575 0 : .await?;
576 6 : decoder.flush().await?;
577 6 : start = buf.len();
578 6 : buf.extend_from_slice(&decompressed_vec);
579 6 : end = buf.len();
580 6 : decompressed_vec.clear();
581 : } else {
582 0 : let error = std::io::Error::new(
583 0 : std::io::ErrorKind::InvalidData,
584 0 : format!("invalid compression byte {compression_bits:x}"),
585 0 : );
586 0 : return Err(error);
587 : }
588 :
589 7857615 : metas.push(VectoredBlob {
590 7857615 : start,
591 7857615 : end,
592 7857615 : meta: *meta,
593 7857615 : });
594 : }
595 :
596 623943 : Ok(VectoredBlobsBuf { buf, blobs: metas })
597 623943 : }
598 : }
599 :
600 : /// Read planner used in [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`]. It provides a streaming API for
601 : /// getting read blobs. It returns a batch when `handle` gets called and when the current key would just exceed the read_size and
602 : /// max_cnt constraints.
603 : pub struct StreamingVectoredReadPlanner {
604 : read_builder: Option<VectoredReadBuilder>,
605 : // Arguments for previous blob passed into [`StreamingVectoredReadPlanner::handle`]
606 : prev: Option<(Key, Lsn, u64)>,
607 : /// Max read size per batch. This is not a strict limit. If there are [0, 100) and [100, 200), while the `max_read_size` is 150,
608 : /// we will produce a single batch instead of split them.
609 : max_read_size: u64,
610 : /// Max item count per batch
611 : max_cnt: usize,
612 : /// Size of the current batch
613 : cnt: usize,
614 :
615 : mode: VectoredReadCoalesceMode,
616 : }
617 :
618 : impl StreamingVectoredReadPlanner {
619 2238 : pub fn new(max_read_size: u64, max_cnt: usize) -> Self {
620 2238 : assert!(max_cnt > 0);
621 2238 : assert!(max_read_size > 0);
622 2238 : let mode = VectoredReadCoalesceMode::get();
623 2238 : Self {
624 2238 : read_builder: None,
625 2238 : prev: None,
626 2238 : max_cnt,
627 2238 : max_read_size,
628 2238 : cnt: 0,
629 2238 : mode,
630 2238 : }
631 2238 : }
632 :
633 6381922 : pub fn handle(&mut self, key: Key, lsn: Lsn, offset: u64) -> Option<VectoredRead> {
634 : // Implementation note: internally lag behind by one blob such that
635 : // we have a start and end offset when initialising [`VectoredRead`]
636 6381922 : let (prev_key, prev_lsn, prev_offset) = match self.prev {
637 : None => {
638 1896 : self.prev = Some((key, lsn, offset));
639 1896 : return None;
640 : }
641 6380026 : Some(prev) => prev,
642 6380026 : };
643 6380026 :
644 6380026 : let res = self.add_blob(prev_key, prev_lsn, prev_offset, offset, false);
645 6380026 :
646 6380026 : self.prev = Some((key, lsn, offset));
647 6380026 :
648 6380026 : res
649 6381922 : }
650 :
651 1734 : pub fn handle_range_end(&mut self, offset: u64) -> Option<VectoredRead> {
652 1734 : let res = if let Some((prev_key, prev_lsn, prev_offset)) = self.prev {
653 1728 : self.add_blob(prev_key, prev_lsn, prev_offset, offset, true)
654 : } else {
655 6 : None
656 : };
657 :
658 1734 : self.prev = None;
659 1734 :
660 1734 : res
661 1734 : }
662 :
663 6381754 : fn add_blob(
664 6381754 : &mut self,
665 6381754 : key: Key,
666 6381754 : lsn: Lsn,
667 6381754 : start_offset: u64,
668 6381754 : end_offset: u64,
669 6381754 : is_last_blob_in_read: bool,
670 6381754 : ) -> Option<VectoredRead> {
671 6381754 : match &mut self.read_builder {
672 6260824 : Some(read_builder) => {
673 6260824 : let extended = read_builder.extend(start_offset, end_offset, BlobMeta { key, lsn });
674 6260824 : assert_eq!(extended, VectoredReadExtended::Yes);
675 : }
676 120930 : None => {
677 120930 : self.read_builder = {
678 120930 : Some(VectoredReadBuilder::new_streaming(
679 120930 : start_offset,
680 120930 : end_offset,
681 120930 : BlobMeta { key, lsn },
682 120930 : self.mode,
683 120930 : ))
684 120930 : };
685 120930 : }
686 : }
687 6381754 : let read_builder = self.read_builder.as_mut().unwrap();
688 6381754 : self.cnt += 1;
689 6381754 : if is_last_blob_in_read
690 6380026 : || read_builder.size() >= self.max_read_size as usize
691 6295054 : || self.cnt >= self.max_cnt
692 : {
693 120930 : let prev_read_builder = self.read_builder.take();
694 120930 : self.cnt = 0;
695 :
696 : // `current_read_builder` is None in the first iteration
697 120930 : if let Some(read_builder) = prev_read_builder {
698 120930 : return Some(read_builder.build());
699 0 : }
700 6260824 : }
701 6260824 : None
702 6381754 : }
703 : }
704 :
705 : #[cfg(test)]
706 : mod tests {
707 : use anyhow::Error;
708 :
709 : use crate::context::DownloadBehavior;
710 : use crate::page_cache::PAGE_SZ;
711 : use crate::task_mgr::TaskKind;
712 :
713 : use super::super::blob_io::tests::{random_array, write_maybe_compressed};
714 : use super::*;
715 :
716 132 : fn validate_read(read: &VectoredRead, offset_range: &[(Key, Lsn, u64, BlobFlag)]) {
717 132 : let align = virtual_file::get_io_buffer_alignment() as u64;
718 132 : assert_eq!(read.start % align, 0);
719 132 : assert_eq!(read.start / align, offset_range.first().unwrap().2 / align);
720 :
721 216 : let expected_offsets_in_read: Vec<_> = offset_range.iter().map(|o| o.2).collect();
722 132 :
723 132 : let offsets_in_read: Vec<_> = read
724 132 : .blobs_at
725 132 : .as_slice()
726 132 : .iter()
727 216 : .map(|(offset, _)| *offset)
728 132 : .collect();
729 132 :
730 132 : assert_eq!(expected_offsets_in_read, offsets_in_read);
731 132 : }
732 :
733 : #[test]
734 6 : fn planner_chunked_coalesce_all_test() {
735 6 : use crate::virtual_file;
736 6 :
737 6 : let chunk_size = virtual_file::get_io_buffer_alignment() as u64;
738 6 :
739 6 : // The test explicitly does not check chunk size < 512
740 6 : if chunk_size < 512 {
741 4 : return;
742 2 : }
743 2 :
744 2 : let max_read_size = chunk_size as usize * 8;
745 2 : let key = Key::MIN;
746 2 : let lsn = Lsn(0);
747 2 :
748 2 : let blob_descriptions = [
749 2 : (key, lsn, chunk_size / 8, BlobFlag::None), // Read 1 BEGIN
750 2 : (key, lsn, chunk_size / 4, BlobFlag::Ignore), // Gap
751 2 : (key, lsn, chunk_size / 2, BlobFlag::None),
752 2 : (key, lsn, chunk_size - 2, BlobFlag::Ignore), // Gap
753 2 : (key, lsn, chunk_size, BlobFlag::None),
754 2 : (key, lsn, chunk_size * 2 - 1, BlobFlag::None),
755 2 : (key, lsn, chunk_size * 2 + 1, BlobFlag::Ignore), // Gap
756 2 : (key, lsn, chunk_size * 3 + 1, BlobFlag::None),
757 2 : (key, lsn, chunk_size * 5 + 1, BlobFlag::None),
758 2 : (key, lsn, chunk_size * 6 + 1, BlobFlag::Ignore), // skipped chunk size, but not a chunk: should coalesce.
759 2 : (key, lsn, chunk_size * 7 + 1, BlobFlag::None),
760 2 : (key, lsn, chunk_size * 8, BlobFlag::None), // Read 2 BEGIN (b/c max_read_size)
761 2 : (key, lsn, chunk_size * 9, BlobFlag::Ignore), // ==== skipped a chunk
762 2 : (key, lsn, chunk_size * 10, BlobFlag::None), // Read 3 BEGIN (cannot coalesce)
763 2 : ];
764 2 :
765 2 : let ranges = [
766 2 : &[
767 2 : blob_descriptions[0],
768 2 : blob_descriptions[2],
769 2 : blob_descriptions[4],
770 2 : blob_descriptions[5],
771 2 : blob_descriptions[7],
772 2 : blob_descriptions[8],
773 2 : blob_descriptions[10],
774 2 : ],
775 2 : &blob_descriptions[11..12],
776 2 : &blob_descriptions[13..],
777 2 : ];
778 2 :
779 2 : let mut planner = VectoredReadPlanner::new(max_read_size);
780 30 : for (key, lsn, offset, flag) in blob_descriptions {
781 28 : planner.handle(key, lsn, offset, flag);
782 28 : }
783 :
784 2 : planner.handle_range_end(652 * 1024);
785 2 :
786 2 : let reads = planner.finish();
787 2 :
788 2 : assert_eq!(reads.len(), ranges.len());
789 :
790 6 : for (idx, read) in reads.iter().enumerate() {
791 6 : validate_read(read, ranges[idx]);
792 6 : }
793 6 : }
794 :
795 : #[test]
796 6 : fn planner_max_read_size_test() {
797 6 : let max_read_size = 128 * 1024;
798 6 : let key = Key::MIN;
799 6 : let lsn = Lsn(0);
800 6 :
801 6 : let blob_descriptions = vec![
802 6 : (key, lsn, 0, BlobFlag::None),
803 6 : (key, lsn, 32 * 1024, BlobFlag::None),
804 6 : (key, lsn, 96 * 1024, BlobFlag::None), // Last in read 1
805 6 : (key, lsn, 128 * 1024, BlobFlag::None), // Last in read 2
806 6 : (key, lsn, 198 * 1024, BlobFlag::None), // Last in read 3
807 6 : (key, lsn, 268 * 1024, BlobFlag::None), // Last in read 4
808 6 : (key, lsn, 396 * 1024, BlobFlag::None), // Last in read 5
809 6 : (key, lsn, 652 * 1024, BlobFlag::None), // Last in read 6
810 6 : ];
811 6 :
812 6 : let ranges = [
813 6 : &blob_descriptions[0..3],
814 6 : &blob_descriptions[3..4],
815 6 : &blob_descriptions[4..5],
816 6 : &blob_descriptions[5..6],
817 6 : &blob_descriptions[6..7],
818 6 : &blob_descriptions[7..],
819 6 : ];
820 6 :
821 6 : let mut planner = VectoredReadPlanner::new(max_read_size);
822 48 : for (key, lsn, offset, flag) in blob_descriptions.clone() {
823 48 : planner.handle(key, lsn, offset, flag);
824 48 : }
825 :
826 6 : planner.handle_range_end(652 * 1024);
827 6 :
828 6 : let reads = planner.finish();
829 6 :
830 6 : assert_eq!(reads.len(), 6);
831 :
832 : // TODO: could remove zero reads to produce 5 reads here
833 :
834 36 : for (idx, read) in reads.iter().enumerate() {
835 36 : validate_read(read, ranges[idx]);
836 36 : }
837 6 : }
838 :
839 : #[test]
840 6 : fn planner_replacement_test() {
841 6 : let chunk_size = virtual_file::get_io_buffer_alignment() as u64;
842 6 : let max_read_size = 128 * chunk_size as usize;
843 6 : let first_key = Key::MIN;
844 6 : let second_key = first_key.next();
845 6 : let lsn = Lsn(0);
846 6 :
847 6 : let blob_descriptions = vec![
848 6 : (first_key, lsn, 0, BlobFlag::None), // First in read 1
849 6 : (first_key, lsn, chunk_size, BlobFlag::None), // Last in read 1
850 6 : (second_key, lsn, 2 * chunk_size, BlobFlag::ReplaceAll),
851 6 : (second_key, lsn, 3 * chunk_size, BlobFlag::None),
852 6 : (second_key, lsn, 4 * chunk_size, BlobFlag::ReplaceAll), // First in read 2
853 6 : (second_key, lsn, 5 * chunk_size, BlobFlag::None), // Last in read 2
854 6 : ];
855 6 :
856 6 : let ranges = [&blob_descriptions[0..2], &blob_descriptions[4..]];
857 6 :
858 6 : let mut planner = VectoredReadPlanner::new(max_read_size);
859 36 : for (key, lsn, offset, flag) in blob_descriptions.clone() {
860 36 : planner.handle(key, lsn, offset, flag);
861 36 : }
862 :
863 6 : planner.handle_range_end(6 * chunk_size);
864 6 :
865 6 : let reads = planner.finish();
866 6 : assert_eq!(reads.len(), 2);
867 :
868 12 : for (idx, read) in reads.iter().enumerate() {
869 12 : validate_read(read, ranges[idx]);
870 12 : }
871 6 : }
872 :
873 : #[test]
874 6 : fn streaming_planner_max_read_size_test() {
875 6 : let max_read_size = 128 * 1024;
876 6 : let key = Key::MIN;
877 6 : let lsn = Lsn(0);
878 6 :
879 6 : let blob_descriptions = vec![
880 6 : (key, lsn, 0, BlobFlag::None),
881 6 : (key, lsn, 32 * 1024, BlobFlag::None),
882 6 : (key, lsn, 96 * 1024, BlobFlag::None),
883 6 : (key, lsn, 128 * 1024, BlobFlag::None),
884 6 : (key, lsn, 198 * 1024, BlobFlag::None),
885 6 : (key, lsn, 268 * 1024, BlobFlag::None),
886 6 : (key, lsn, 396 * 1024, BlobFlag::None),
887 6 : (key, lsn, 652 * 1024, BlobFlag::None),
888 6 : ];
889 6 :
890 6 : let ranges = [
891 6 : &blob_descriptions[0..3],
892 6 : &blob_descriptions[3..5],
893 6 : &blob_descriptions[5..6],
894 6 : &blob_descriptions[6..7],
895 6 : &blob_descriptions[7..],
896 6 : ];
897 6 :
898 6 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1000);
899 6 : let mut reads = Vec::new();
900 48 : for (key, lsn, offset, _) in blob_descriptions.clone() {
901 48 : reads.extend(planner.handle(key, lsn, offset));
902 48 : }
903 6 : reads.extend(planner.handle_range_end(652 * 1024));
904 6 :
905 6 : assert_eq!(reads.len(), ranges.len());
906 :
907 30 : for (idx, read) in reads.iter().enumerate() {
908 30 : validate_read(read, ranges[idx]);
909 30 : }
910 6 : }
911 :
912 : #[test]
913 6 : fn streaming_planner_max_cnt_test() {
914 6 : let max_read_size = 1024 * 1024;
915 6 : let key = Key::MIN;
916 6 : let lsn = Lsn(0);
917 6 :
918 6 : let blob_descriptions = vec![
919 6 : (key, lsn, 0, BlobFlag::None),
920 6 : (key, lsn, 32 * 1024, BlobFlag::None),
921 6 : (key, lsn, 96 * 1024, BlobFlag::None),
922 6 : (key, lsn, 128 * 1024, BlobFlag::None),
923 6 : (key, lsn, 198 * 1024, BlobFlag::None),
924 6 : (key, lsn, 268 * 1024, BlobFlag::None),
925 6 : (key, lsn, 396 * 1024, BlobFlag::None),
926 6 : (key, lsn, 652 * 1024, BlobFlag::None),
927 6 : ];
928 6 :
929 6 : let ranges = [
930 6 : &blob_descriptions[0..2],
931 6 : &blob_descriptions[2..4],
932 6 : &blob_descriptions[4..6],
933 6 : &blob_descriptions[6..],
934 6 : ];
935 6 :
936 6 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 2);
937 6 : let mut reads = Vec::new();
938 48 : for (key, lsn, offset, _) in blob_descriptions.clone() {
939 48 : reads.extend(planner.handle(key, lsn, offset));
940 48 : }
941 6 : reads.extend(planner.handle_range_end(652 * 1024));
942 6 :
943 6 : assert_eq!(reads.len(), ranges.len());
944 :
945 24 : for (idx, read) in reads.iter().enumerate() {
946 24 : validate_read(read, ranges[idx]);
947 24 : }
948 6 : }
949 :
950 : #[test]
951 6 : fn streaming_planner_edge_test() {
952 6 : let max_read_size = 1024 * 1024;
953 6 : let key = Key::MIN;
954 6 : let lsn = Lsn(0);
955 6 : {
956 6 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
957 6 : let mut reads = Vec::new();
958 6 : reads.extend(planner.handle_range_end(652 * 1024));
959 6 : assert!(reads.is_empty());
960 : }
961 : {
962 6 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
963 6 : let mut reads = Vec::new();
964 6 : reads.extend(planner.handle(key, lsn, 0));
965 6 : reads.extend(planner.handle_range_end(652 * 1024));
966 6 : assert_eq!(reads.len(), 1);
967 6 : validate_read(&reads[0], &[(key, lsn, 0, BlobFlag::None)]);
968 6 : }
969 6 : {
970 6 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 1);
971 6 : let mut reads = Vec::new();
972 6 : reads.extend(planner.handle(key, lsn, 0));
973 6 : reads.extend(planner.handle(key, lsn, 128 * 1024));
974 6 : reads.extend(planner.handle_range_end(652 * 1024));
975 6 : assert_eq!(reads.len(), 2);
976 6 : validate_read(&reads[0], &[(key, lsn, 0, BlobFlag::None)]);
977 6 : validate_read(&reads[1], &[(key, lsn, 128 * 1024, BlobFlag::None)]);
978 6 : }
979 6 : {
980 6 : let mut planner = StreamingVectoredReadPlanner::new(max_read_size, 2);
981 6 : let mut reads = Vec::new();
982 6 : reads.extend(planner.handle(key, lsn, 0));
983 6 : reads.extend(planner.handle(key, lsn, 128 * 1024));
984 6 : reads.extend(planner.handle_range_end(652 * 1024));
985 6 : assert_eq!(reads.len(), 1);
986 6 : validate_read(
987 6 : &reads[0],
988 6 : &[
989 6 : (key, lsn, 0, BlobFlag::None),
990 6 : (key, lsn, 128 * 1024, BlobFlag::None),
991 6 : ],
992 6 : );
993 6 : }
994 6 : }
995 :
996 24 : async fn round_trip_test_compressed(blobs: &[Vec<u8>], compression: bool) -> Result<(), Error> {
997 24 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
998 24 : let (_temp_dir, pathbuf, offsets) =
999 837 : write_maybe_compressed::<true>(blobs, compression, &ctx).await?;
1000 :
1001 24 : let file = VirtualFile::open(&pathbuf, &ctx).await?;
1002 24 : let file_len = std::fs::metadata(&pathbuf)?.len();
1003 24 :
1004 24 : // Multiply by two (compressed data might need more space), and add a few bytes for the header
1005 12360 : let reserved_bytes = blobs.iter().map(|bl| bl.len()).max().unwrap() * 2 + 16;
1006 24 : let mut buf = BytesMut::with_capacity(reserved_bytes);
1007 24 :
1008 24 : let mode = VectoredReadCoalesceMode::get();
1009 24 : let vectored_blob_reader = VectoredBlobReader::new(&file);
1010 24 : let meta = BlobMeta {
1011 24 : key: Key::MIN,
1012 24 : lsn: Lsn(0),
1013 24 : };
1014 :
1015 12360 : for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
1016 12360 : let end = offsets.get(idx + 1).unwrap_or(&file_len);
1017 12360 : if idx + 1 == offsets.len() {
1018 24 : continue;
1019 12336 : }
1020 12336 : let read_builder = VectoredReadBuilder::new(*offset, *end, meta, 16 * 4096, mode);
1021 12336 : let read = read_builder.build();
1022 12336 : let result = vectored_blob_reader.read_blobs(&read, buf, &ctx).await?;
1023 12336 : assert_eq!(result.blobs.len(), 1);
1024 12336 : let read_blob = &result.blobs[0];
1025 12336 : let read_buf = &result.buf[read_blob.start..read_blob.end];
1026 12336 : assert_eq!(blob, read_buf, "mismatch for idx={idx} at offset={offset}");
1027 12336 : buf = result.buf;
1028 : }
1029 24 : Ok(())
1030 24 : }
1031 :
1032 : #[tokio::test]
1033 6 : async fn test_really_big_array() -> Result<(), Error> {
1034 6 : let blobs = &[
1035 6 : b"test".to_vec(),
1036 6 : random_array(10 * PAGE_SZ),
1037 6 : b"hello".to_vec(),
1038 6 : random_array(66 * PAGE_SZ),
1039 6 : vec![0xf3; 24 * PAGE_SZ],
1040 6 : b"foobar".to_vec(),
1041 6 : ];
1042 42 : round_trip_test_compressed(blobs, false).await?;
1043 33 : round_trip_test_compressed(blobs, true).await?;
1044 6 : Ok(())
1045 6 : }
1046 :
1047 : #[tokio::test]
1048 6 : async fn test_arrays_inc() -> Result<(), Error> {
1049 6 : let blobs = (0..PAGE_SZ / 8)
1050 6144 : .map(|v| random_array(v * 16))
1051 6 : .collect::<Vec<_>>();
1052 3516 : round_trip_test_compressed(&blobs, false).await?;
1053 3516 : round_trip_test_compressed(&blobs, true).await?;
1054 6 : Ok(())
1055 6 : }
1056 :
1057 : #[test]
1058 6 : fn test_div_round_up() {
1059 6 : const CHUNK_SIZE: usize = 512;
1060 6 : assert_eq!(1, div_round_up(200, CHUNK_SIZE));
1061 6 : assert_eq!(1, div_round_up(CHUNK_SIZE, CHUNK_SIZE));
1062 6 : assert_eq!(2, div_round_up(CHUNK_SIZE + 1, CHUNK_SIZE));
1063 6 : }
1064 : }
|