//!
//! Utilities for vectored reading of variable-sized "blobs".
//!
//! The "blob" api is an abstraction on top of the "block" api,
//! with the main difference being that blobs do not have a fixed
//! size (each blob is prefixed with a 1 or 4 byte length field).
//!
//! The vectored apis provided in this module allow for planning
//! and executing disk IO which covers multiple blobs.
//!
//! Reads are planned with [`VectoredReadPlanner`], which coalesces
//! adjacent blocks into a single disk IO request, and executed by
//! [`VectoredBlobReader`], which does all the required offset juggling
//! and returns a buffer housing all the blobs and a list of offsets.
//!
//! Note that the vectored blob api does *not* go through the page cache.
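//!
//! For illustration, a blob's length header could be decoded as sketched
//! below (`decode_len_header` is a hypothetical helper, not part of this
//! module; the real parsing lives in [`VectoredBlobReader::read_blobs`]):
//!
//! ```ignore
//! fn decode_len_header(buf: &[u8]) -> (u64, u64) {
//!     if buf[0] < 0x80 {
//!         // 1-byte header: the length fits in 7 bits
//!         (1, buf[0] as u64)
//!     } else {
//!         // 4-byte header: big-endian u32 with the most significant bit masked off
//!         let len = u32::from_be_bytes([buf[0] & 0x7f, buf[1], buf[2], buf[3]]);
//!         (4, len as u64)
//!     }
//! }
//! ```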

use std::collections::BTreeMap;
use std::num::NonZeroUsize;

use bytes::BytesMut;
use pageserver_api::key::Key;
use utils::lsn::Lsn;
use utils::vec_map::VecMap;

use crate::context::RequestContext;
use crate::virtual_file::VirtualFile;

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct MaxVectoredReadBytes(pub NonZeroUsize);

/// Metadata bundled with the start and end offset of a blob.
#[derive(Copy, Clone, Debug)]
pub struct BlobMeta {
    pub key: Key,
    pub lsn: Lsn,
}

/// Blob offsets into [`VectoredBlobsBuf::buf`]
pub struct VectoredBlob {
    pub start: usize,
    pub end: usize,
    pub meta: BlobMeta,
}

/// Return type of [`VectoredBlobReader::read_blobs`]
pub struct VectoredBlobsBuf {
    /// Buffer for all blobs in this read
    pub buf: BytesMut,
    /// Offsets into the buffer and metadata for all blobs in this read
    pub blobs: Vec<VectoredBlob>,
}

/// Description of one disk read for multiple blobs.
/// Used as the argument to [`VectoredBlobReader::read_blobs`]
#[derive(Debug)]
pub struct VectoredRead {
    pub start: u64,
    pub end: u64,
    /// Starting offsets and metadata for each blob in this read
    pub blobs_at: VecMap<u64, BlobMeta>,
}

impl VectoredRead {
    pub(crate) fn size(&self) -> usize {
        (self.end - self.start) as usize
    }
}

#[derive(Eq, PartialEq)]
pub(crate) enum VectoredReadExtended {
    Yes,
    No,
}

pub(crate) struct VectoredReadBuilder {
    start: u64,
    end: u64,
    blobs_at: VecMap<u64, BlobMeta>,
    max_read_size: Option<usize>,
}

impl VectoredReadBuilder {
    /// Start building a new vectored read.
    ///
    /// Note that by design this does not check against reading more than `max_read_size`,
    /// in order to support reading blobs larger than the configured value. In that case,
    /// however, the builder will be single-use.
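    ///
    /// A sketch of typical usage (offsets and sizes are illustrative):
    ///
    /// ```ignore
    /// let mut builder = VectoredReadBuilder::new(0, 512, meta, Some(128 * 1024));
    /// // A blob starting exactly at the current end extends the read,
    /// // provided the total stays under the max read size.
    /// assert!(builder.extend(512, 1024, meta) == VectoredReadExtended::Yes);
    /// let read = builder.build(); // one disk IO covering [0, 1024)
    /// ```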
    pub(crate) fn new(
        start_offset: u64,
        end_offset: u64,
        meta: BlobMeta,
        max_read_size: Option<usize>,
    ) -> Self {
        let mut blobs_at = VecMap::default();
        blobs_at
            .append(start_offset, meta)
            .expect("First insertion always succeeds");

        Self {
            start: start_offset,
            end: end_offset,
            blobs_at,
            max_read_size,
        }
    }

    /// Attempt to extend the current read with a new blob if the start
    /// offset matches the current end of the vectored read
    /// and the resulting size is below the max read size
    pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
        tracing::trace!(start, end, "trying to extend");
        let size = (end - start) as usize;
        if self.end == start && {
            if let Some(max_read_size) = self.max_read_size {
                self.size() + size <= max_read_size
            } else {
                true
            }
        } {
            self.end = end;
            self.blobs_at
                .append(start, meta)
                .expect("LSNs are ordered within vectored reads");

            return VectoredReadExtended::Yes;
        }

        VectoredReadExtended::No
    }

    pub(crate) fn size(&self) -> usize {
        (self.end - self.start) as usize
    }

    pub(crate) fn build(self) -> VectoredRead {
        VectoredRead {
            start: self.start,
            end: self.end,
            blobs_at: self.blobs_at,
        }
    }
}

#[derive(Copy, Clone, Debug)]
pub enum BlobFlag {
    None,
    Ignore,
    ReplaceAll,
}

/// Planner for vectored blob reads.
///
/// Blob offsets are received via [`VectoredReadPlanner::handle`]
/// and coalesced into disk reads.
///
/// The implementation is very simple:
/// * Collect all blob offsets in an ordered structure
/// * Iterate over the collected blobs and coalesce them into reads at the end
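///
/// A sketch of the expected call pattern (keys, LSNs and offsets below are
/// illustrative):
///
/// ```ignore
/// let mut planner = VectoredReadPlanner::new(128 * 1024);
/// planner.handle(key, lsn, 0, BlobFlag::None); // blob at [0, 1024)
/// planner.handle(key, lsn, 1024, BlobFlag::None); // blob at [1024, 2048)
/// planner.handle_range_end(2048); // closes the last blob
/// let reads = planner.finish(); // coalesced disk reads
/// ```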
pub struct VectoredReadPlanner {
    // Track all the blob offsets. Start offsets must be ordered.
    blobs: BTreeMap<Key, Vec<(Lsn, u64, u64)>>,
    // Arguments for previous blob passed into [`VectoredReadPlanner::handle`]
    prev: Option<(Key, Lsn, u64, BlobFlag)>,

    max_read_size: Option<usize>,
}

impl VectoredReadPlanner {
    pub fn new(max_read_size: usize) -> Self {
        Self {
            blobs: BTreeMap::new(),
            prev: None,
            max_read_size: Some(max_read_size),
        }
    }

    /// This function should *only* be used if the caller has a way to control the limit itself.
    /// For example, [`StreamingVectoredReadPlanner`] uses the vectored read planner to avoid
    /// duplicating the logic for handling blob starts/ends, while expecting the planner to
    /// produce a single read covering a contiguous range of bytes in the image layer. It
    /// therefore does not need the code path that splits reads into chunks of `max_read_size`,
    /// and controls the read size itself.
    #[cfg(test)]
    pub(crate) fn new_caller_controlled_max_limit() -> Self {
        Self {
            blobs: BTreeMap::new(),
            prev: None,
            max_read_size: None,
        }
    }

    /// Include a new blob in the read plan.
    ///
    /// This function is called from a B-Tree index visitor (see `DeltaLayerInner::plan_reads`
    /// and `ImageLayerInner::plan_reads`). Said visitor wants to collect blob offsets for all
    /// keys in a given keyspace. This function must be called for each key in the desired
    /// keyspace, in monotonically increasing order. [`Self::handle_range_end`] must
    /// be called at the end of each offset range.
    ///
    /// In the event that keys are skipped, the behaviour is undefined and can lead to an
    /// incorrect read plan. We can end up asserting, erroring in wal redo or returning
    /// incorrect data to the user.
    ///
    /// The `flag` argument has two interesting values:
    /// * [`BlobFlag::ReplaceAll`]: The blob for this key should replace all existing blobs.
    ///   This is used for WAL records that `will_init`.
    /// * [`BlobFlag::Ignore`]: This blob should not be included in the read. This happens
    ///   if the blob is cached.
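    ///
    /// As a sketch of the flag semantics (offsets and LSNs are illustrative;
    /// see `add_blob` for the actual bookkeeping):
    ///
    /// ```ignore
    /// planner.handle(key, lsn_a, 0, BlobFlag::None);
    /// planner.handle(key, lsn_b, 1024, BlobFlag::ReplaceAll);
    /// planner.handle_range_end(2048);
    /// // Only the `ReplaceAll` blob at [1024, 2048) remains planned for `key`.
    /// ```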
    pub fn handle(&mut self, key: Key, lsn: Lsn, offset: u64, flag: BlobFlag) {
        // Implementation note: internally lag behind by one blob such that
        // we have a start and end offset when initialising [`VectoredRead`]
        let (prev_key, prev_lsn, prev_offset, prev_flag) = match self.prev {
            None => {
                self.prev = Some((key, lsn, offset, flag));
                return;
            }
            Some(prev) => prev,
        };

        self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);

        self.prev = Some((key, lsn, offset, flag));
    }

    pub fn handle_range_end(&mut self, offset: u64) {
        if let Some((prev_key, prev_lsn, prev_offset, prev_flag)) = self.prev {
            self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
        }

        self.prev = None;
    }

    fn add_blob(&mut self, key: Key, lsn: Lsn, start_offset: u64, end_offset: u64, flag: BlobFlag) {
        match flag {
            BlobFlag::None => {
                let blobs_for_key = self.blobs.entry(key).or_default();
                blobs_for_key.push((lsn, start_offset, end_offset));
            }
            BlobFlag::ReplaceAll => {
                let blobs_for_key = self.blobs.entry(key).or_default();
                blobs_for_key.clear();
                blobs_for_key.push((lsn, start_offset, end_offset));
            }
            BlobFlag::Ignore => {}
        }
    }

    pub fn finish(self) -> Vec<VectoredRead> {
        let mut current_read_builder: Option<VectoredReadBuilder> = None;
        let mut reads = Vec::new();

        for (key, blobs_for_key) in self.blobs {
            for (lsn, start_offset, end_offset) in blobs_for_key {
                let extended = match &mut current_read_builder {
                    Some(read_builder) => {
                        read_builder.extend(start_offset, end_offset, BlobMeta { key, lsn })
                    }
                    None => VectoredReadExtended::No,
                };

                if extended == VectoredReadExtended::No {
                    let next_read_builder = VectoredReadBuilder::new(
                        start_offset,
                        end_offset,
                        BlobMeta { key, lsn },
                        self.max_read_size,
                    );

                    let prev_read_builder = current_read_builder.replace(next_read_builder);

                    // `current_read_builder` is None in the first iteration of the outer loop
                    if let Some(read_builder) = prev_read_builder {
                        reads.push(read_builder.build());
                    }
                }
            }
        }

        if let Some(read_builder) = current_read_builder {
            reads.push(read_builder.build());
        }

        reads
    }
}

/// Disk reader for vectored blob spans (does not go through the page cache)
pub struct VectoredBlobReader<'a> {
    file: &'a VirtualFile,
}

impl<'a> VectoredBlobReader<'a> {
    pub fn new(file: &'a VirtualFile) -> Self {
        Self { file }
    }

    /// Read the requested blobs into the buffer.
    ///
    /// We have to deal with the fact that blobs are not fixed size.
    /// Each blob is prefixed by a size header.
    ///
    /// The success return value is a struct which contains the buffer
    /// filled from disk and a list of offsets at which each blob lies
    /// in the buffer.
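    ///
    /// A sketch of the intended usage (the caller is responsible for sizing
    /// the buffer to at least `read.size()`):
    ///
    /// ```ignore
    /// let reader = VectoredBlobReader::new(&virtual_file);
    /// let buf = BytesMut::with_capacity(read.size());
    /// let result = reader.read_blobs(&read, buf, ctx).await?;
    /// for blob in &result.blobs {
    ///     let data = &result.buf[blob.start..blob.end];
    ///     // process `data` for `blob.meta.key` at `blob.meta.lsn`
    /// }
    /// ```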
    pub async fn read_blobs(
        &self,
        read: &VectoredRead,
        buf: BytesMut,
        ctx: &RequestContext,
    ) -> Result<VectoredBlobsBuf, std::io::Error> {
        assert!(read.size() > 0);
        assert!(
            read.size() <= buf.capacity(),
            "{} > {}",
            read.size(),
            buf.capacity()
        );
        let buf = self
            .file
            .read_exact_at_n(buf, read.start, read.size(), ctx)
            .await?;

        let blobs_at = read.blobs_at.as_slice();
        let start_offset = blobs_at.first().expect("VectoredRead is never empty").0;

        let mut metas = Vec::with_capacity(blobs_at.len());

        // Blobs in `read` only provide their starting offset. The end offset
        // of a blob is implicit: the start of the next blob if one exists
        // or the end of the read.
        let pairs = blobs_at.iter().zip(
            blobs_at
                .iter()
                .map(Some)
                .skip(1)
                .chain(std::iter::once(None)),
        );

        for ((offset, meta), next) in pairs {
            let offset_in_buf = offset - start_offset;
            let first_len_byte = buf[offset_in_buf as usize];

            // Each blob is prefixed by a header containing its size.
            // Extract the size and skip that header to find the start of the data.
            // The size can be 1 or 4 bytes. The most significant bit is 0 in the
            // 1 byte case and 1 in the 4 byte case.
347 352622 : (1, first_len_byte as u64)
348 : } else {
349 33618 : let mut blob_size_buf = [0u8; 4];
350 33618 : let offset_in_buf = offset_in_buf as usize;
351 33618 :
352 33618 : blob_size_buf.copy_from_slice(&buf[offset_in_buf..offset_in_buf + 4]);
353 33618 : blob_size_buf[0] &= 0x7f;
354 33618 : (4, u32::from_be_bytes(blob_size_buf) as u64)
355 : };
356 :
357 386240 : let start = offset_in_buf + size_length;
358 386240 : let end = match next {
359 328727 : Some((next_blob_start_offset, _)) => next_blob_start_offset - start_offset,
360 57513 : None => start + blob_size,
361 : };
362 :
363 386240 : assert_eq!(end - start, blob_size);
364 :
365 386240 : metas.push(VectoredBlob {
366 386240 : start: start as usize,
367 386240 : end: end as usize,
368 386240 : meta: *meta,
369 386240 : })
370 : }
371 :
372 57513 : Ok(VectoredBlobsBuf { buf, blobs: metas })
373 57513 : }
374 : }

/// Read planner used in [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`].
/// It provides a streaming API for getting read blobs: a batch is returned from `handle`
/// when adding the current key would exceed the `max_read_size` or `max_cnt` constraints.
/// It uses [`VectoredReadPlanner`] under the hood.
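///
/// A sketch of the streaming call pattern (`index_iter`, the offsets and the
/// limits are illustrative):
///
/// ```ignore
/// let mut planner = StreamingVectoredReadPlanner::new(1024 * 1024, 100);
/// while let Some((key, lsn, offset)) = index_iter.next() {
///     if let Some(batch) = planner.handle(key, lsn, offset, BlobFlag::None) {
///         // a full batch is ready; execute it with a `VectoredBlobReader`
///     }
/// }
/// let last_batch = planner.handle_range_end(end_offset); // flush the tail
/// ```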
#[cfg(test)]
pub struct StreamingVectoredReadPlanner {
    planner: VectoredReadPlanner,
    /// Max read size per batch
    max_read_size: u64,
    /// Max item count per batch
    max_cnt: usize,
    /// The first offset of this batch
    this_batch_first_offset: Option<u64>,
    /// Size of the current batch
    cnt: usize,
}

#[cfg(test)]
impl StreamingVectoredReadPlanner {
    pub fn new(max_read_size: u64, max_cnt: usize) -> Self {
        assert!(max_cnt > 0);
        assert!(max_read_size > 0);
        Self {
            // We want to have exactly one read syscall (plus several others for index lookup) for each `next_batch` call.
            // Therefore, we enforce `self.max_read_size` by ourselves instead of using the VectoredReadPlanner's capability,
            // to avoid splitting into two I/Os.
            planner: VectoredReadPlanner::new_caller_controlled_max_limit(),
            max_cnt,
            max_read_size,
            this_batch_first_offset: None,
            cnt: 0,
        }
    }

    fn emit(&mut self, this_batch_first_offset: u64) -> VectoredRead {
        let planner = std::mem::replace(
            &mut self.planner,
            VectoredReadPlanner::new_caller_controlled_max_limit(),
        );
        self.this_batch_first_offset = Some(this_batch_first_offset);
        self.cnt = 1;
        let mut batch = planner.finish();
        assert_eq!(batch.len(), 1, "should have exactly one read batch");
        batch.pop().unwrap()
    }

    pub fn handle(
        &mut self,
        key: Key,
        lsn: Lsn,
        offset: u64,
        flag: BlobFlag,
    ) -> Option<VectoredRead> {
        if let Some(begin_offset) = self.this_batch_first_offset {
            // Each batch will have at least one item b/c `self.this_batch_first_offset` is set
            // after one item gets processed
            if offset - begin_offset > self.max_read_size {
                self.planner.handle_range_end(offset); // End the current batch with the offset
                let batch = self.emit(offset); // Produce a batch
                self.planner.handle(key, lsn, offset, flag); // Add this key to the next batch
                return Some(batch);
            }
        } else {
            self.this_batch_first_offset = Some(offset)
        }
        if self.cnt >= self.max_cnt {
            self.planner.handle_range_end(offset); // End the current batch with the offset
            let batch = self.emit(offset); // Produce a batch
            self.planner.handle(key, lsn, offset, flag); // Add this key to the next batch
            return Some(batch);
        }
        self.planner.handle(key, lsn, offset, flag); // Add this key to the current batch
        self.cnt += 1;
        None
    }

    pub fn handle_range_end(&mut self, offset: u64) -> VectoredRead {
        self.planner.handle_range_end(offset);
        self.emit(offset)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    fn validate_read(read: &VectoredRead, offset_range: &[(Key, Lsn, u64, BlobFlag)]) {
        assert_eq!(read.start, offset_range.first().unwrap().2);

        let expected_offsets_in_read: Vec<_> = offset_range.iter().map(|o| o.2).collect();

        let offsets_in_read: Vec<_> = read
            .blobs_at
            .as_slice()
            .iter()
            .map(|(offset, _)| *offset)
            .collect();

        assert_eq!(expected_offsets_in_read, offsets_in_read);
    }

    #[test]
    fn planner_max_read_size_test() {
        let max_read_size = 128 * 1024;
        let key = Key::MIN;
        let lsn = Lsn(0);

        let blob_descriptions = vec![
            (key, lsn, 0, BlobFlag::None),
            (key, lsn, 32 * 1024, BlobFlag::None),
            (key, lsn, 96 * 1024, BlobFlag::None),  // Last in read 1
            (key, lsn, 128 * 1024, BlobFlag::None), // Last in read 2
            (key, lsn, 198 * 1024, BlobFlag::None), // Last in read 3
            (key, lsn, 268 * 1024, BlobFlag::None), // Last in read 4
            (key, lsn, 396 * 1024, BlobFlag::None), // Last in read 5
            (key, lsn, 652 * 1024, BlobFlag::None), // Last in read 6
        ];

        let ranges = [
            &blob_descriptions[0..3],
            &blob_descriptions[3..4],
            &blob_descriptions[4..5],
            &blob_descriptions[5..6],
            &blob_descriptions[6..7],
            &blob_descriptions[7..],
        ];

        let mut planner = VectoredReadPlanner::new(max_read_size);
        for (key, lsn, offset, flag) in blob_descriptions.clone() {
            planner.handle(key, lsn, offset, flag);
        }

        planner.handle_range_end(652 * 1024);

        let reads = planner.finish();
        assert_eq!(reads.len(), 6);

        for (idx, read) in reads.iter().enumerate() {
            validate_read(read, ranges[idx]);
        }
    }

    #[test]
    fn planner_replacement_test() {
        let max_read_size = 128 * 1024;
        let first_key = Key::MIN;
        let second_key = first_key.next();
        let lsn = Lsn(0);

        let blob_descriptions = vec![
            (first_key, lsn, 0, BlobFlag::None),    // First in read 1
            (first_key, lsn, 1024, BlobFlag::None), // Last in read 1
            (second_key, lsn, 2 * 1024, BlobFlag::ReplaceAll),
            (second_key, lsn, 3 * 1024, BlobFlag::None),
            (second_key, lsn, 4 * 1024, BlobFlag::ReplaceAll), // First in read 2
            (second_key, lsn, 5 * 1024, BlobFlag::None),       // Last in read 2
        ];

        let ranges = [&blob_descriptions[0..2], &blob_descriptions[4..]];

        let mut planner = VectoredReadPlanner::new(max_read_size);
        for (key, lsn, offset, flag) in blob_descriptions.clone() {
            planner.handle(key, lsn, offset, flag);
        }

        planner.handle_range_end(6 * 1024);

        let reads = planner.finish();
        assert_eq!(reads.len(), 2);

        for (idx, read) in reads.iter().enumerate() {
            validate_read(read, ranges[idx]);
        }
    }
}