Line data Source code
1 : //!
2 : //! Utilities for vectored reading of variable-sized "blobs".
3 : //!
4 : //! The "blob" api is an abstraction on top of the "block" api,
5 : //! with the main difference being that blobs do not have a fixed
6 : //! size (each blob is prefixed with a 1 or 4 byte length field).
7 : //!
8 : //! The vectored apis provided in this module allow for planning
9 : //! and executing disk IO which covers multiple blobs.
10 : //!
11 : //! Reads are planned with [`VectoredReadPlanner`], which coalesces
12 : //! adjacent blocks into a single disk IO request, and executed by
13 : //! [`VectoredBlobReader`], which does all the required offset juggling
14 : //! and returns a buffer housing all the blobs and a list of offsets.
15 : //!
16 : //! Note that the vectored blob api does *not* go through the page cache.
17 :
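To make the length-prefix format concrete, here is a minimal, self-contained sketch of decoding a blob header. It mirrors the logic in `read_blobs` below; the function name and the example values are illustrative only:

```rust
/// Decode the 1 or 4 byte blob length header. If the most significant bit
/// of the first byte is 0, the header is that single byte; otherwise the
/// header is four big-endian bytes with the tag bit masked off.
/// Returns (header_len, blob_size).
fn decode_blob_header(buf: &[u8]) -> (usize, u64) {
    let first = buf[0];
    if first < 0x80 {
        // 1-byte header: the byte itself is the length (< 128).
        (1, first as u64)
    } else {
        // 4-byte header: mask out the tag bit, read as big-endian u32.
        let mut len_buf = [0u8; 4];
        len_buf.copy_from_slice(&buf[0..4]);
        len_buf[0] &= 0x7f;
        (4, u32::from_be_bytes(len_buf) as u64)
    }
}

fn main() {
    // A 5-byte blob fits in a 1-byte header.
    assert_eq!(decode_blob_header(&[5, b'h', b'e', b'l', b'l', b'o']), (1, 5));
    // A 300-byte blob needs the 4-byte form: 0x8000_012C big-endian.
    assert_eq!(decode_blob_header(&[0x80, 0x00, 0x01, 0x2C]), (4, 300));
}
```

Keeping the tag in the most significant bit means blobs shorter than 128 bytes pay only a single byte of header overhead.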
18 : use std::collections::BTreeMap;
19 : use std::num::NonZeroUsize;
20 :
21 : use bytes::BytesMut;
22 : use pageserver_api::key::Key;
23 : use tokio_epoll_uring::BoundedBuf;
24 : use utils::lsn::Lsn;
25 : use utils::vec_map::VecMap;
26 :
27 : use crate::context::RequestContext;
28 : use crate::virtual_file::VirtualFile;
29 :
30 : #[derive(Copy, Clone, Debug, PartialEq, Eq)]
31 : pub struct MaxVectoredReadBytes(pub NonZeroUsize);
32 :
33 : /// Metadata bundled with the start and end offset of a blob.
34 : #[derive(Copy, Clone, Debug)]
35 : pub struct BlobMeta {
36 : pub key: Key,
37 : pub lsn: Lsn,
38 : }
39 :
40 : /// Blob offsets into [`VectoredBlobsBuf::buf`]
41 : pub struct VectoredBlob {
42 : pub start: usize,
43 : pub end: usize,
44 : pub meta: BlobMeta,
45 : }
46 :
47 : /// Return type of [`VectoredBlobReader::read_blobs`]
48 : pub struct VectoredBlobsBuf {
49 : /// Buffer for all blobs in this read
50 : pub buf: BytesMut,
51 : /// Offsets into the buffer and metadata for all blobs in this read
52 : pub blobs: Vec<VectoredBlob>,
53 : }
54 :
55 : /// Description of one disk read for multiple blobs.
56 : /// Used as the argument for [`VectoredBlobReader::read_blobs`]
57 : #[derive(Debug)]
58 : pub struct VectoredRead {
59 : pub start: u64,
60 : pub end: u64,
61 : /// Starting offsets and metadata for each blob in this read
62 : pub blobs_at: VecMap<u64, BlobMeta>,
63 : }
64 :
65 : impl VectoredRead {
66 306976 : pub(crate) fn size(&self) -> usize {
67 306976 : (self.end - self.start) as usize
68 306976 : }
69 : }
70 :
71 : #[derive(Eq, PartialEq)]
72 : pub(crate) enum VectoredReadExtended {
73 : Yes,
74 : No,
75 : }
76 :
77 : pub(crate) struct VectoredReadBuilder {
78 : start: u64,
79 : end: u64,
80 : blobs_at: VecMap<u64, BlobMeta>,
81 : max_read_size: Option<usize>,
82 : }
83 :
84 : impl VectoredReadBuilder {
85 : /// Start building a new vectored read.
86 : ///
87 : /// Note that by design, this does not check the first blob against `max_read_size`,
88 : /// so that blobs larger than the configured value can still be read. In that case,
89 : /// however, the builder is single use: no further blobs can be coalesced into it.
90 76681 : pub(crate) fn new(
91 76681 : start_offset: u64,
92 76681 : end_offset: u64,
93 76681 : meta: BlobMeta,
94 76681 : max_read_size: Option<usize>,
95 76681 : ) -> Self {
96 76681 : let mut blobs_at = VecMap::default();
97 76681 : blobs_at
98 76681 : .append(start_offset, meta)
99 76681 : .expect("First insertion always succeeds");
100 76681 :
101 76681 : Self {
102 76681 : start: start_offset,
103 76681 : end: end_offset,
104 76681 : blobs_at,
105 76681 : max_read_size,
106 76681 : }
107 76681 : }
108 :
109 : /// Attempt to extend the current read with a new blob if the start
110 : /// offset matches the current end of the vectored read
111 : /// and the resulting size is below the max read size.
112 376262 : pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
113 376262 : tracing::trace!(start, end, "trying to extend");
114 376262 : let size = (end - start) as usize;
115 376262 : if self.end == start && {
116 357247 : if let Some(max_read_size) = self.max_read_size {
117 338591 : self.size() + size <= max_read_size
118 : } else {
119 18656 : true
120 : }
121 : } {
122 337881 : self.end = end;
123 337881 : self.blobs_at
124 337881 : .append(start, meta)
125 337881 : .expect("LSNs are ordered within vectored reads");
126 337881 :
127 337881 : return VectoredReadExtended::Yes;
128 38381 : }
129 38381 :
130 38381 : VectoredReadExtended::No
131 376262 : }
132 :
133 338591 : pub(crate) fn size(&self) -> usize {
134 338591 : (self.end - self.start) as usize
135 338591 : }
136 :
137 76681 : pub(crate) fn build(self) -> VectoredRead {
138 76681 : VectoredRead {
139 76681 : start: self.start,
140 76681 : end: self.end,
141 76681 : blobs_at: self.blobs_at,
142 76681 : }
143 76681 : }
144 : }
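The builder is driven as follows: blobs that start exactly at the current end of the read (and fit under the size cap) are coalesced; anything else forces the caller to `build` and open a new builder. A crate-internal sketch using only the types above, with made-up offsets:

```rust
use pageserver_api::key::Key;
use utils::lsn::Lsn;

fn build_one_read() {
    let meta = BlobMeta { key: Key::MIN, lsn: Lsn(0) };
    // First blob spans bytes [0, 1024); coalesce up to 128 KiB per read.
    let mut builder = VectoredReadBuilder::new(0, 1024, meta, Some(128 * 1024));
    // Adjacent blob at [1024, 2048): absorbed into the same read.
    assert!(builder.extend(1024, 2048, meta) == VectoredReadExtended::Yes);
    // Non-adjacent blob at [4096, 5120): rejected, so the caller would
    // build this read and start a new builder at offset 4096.
    assert!(builder.extend(4096, 5120, meta) == VectoredReadExtended::No);
    let read = builder.build();
    assert_eq!((read.start, read.end), (0, 2048));
}
```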
145 :
146 : #[derive(Copy, Clone, Debug)]
147 : pub enum BlobFlag {
148 : None,
149 : Ignore,
150 : ReplaceAll,
151 : }
152 :
153 : /// Planner for vectored blob reads.
154 : ///
155 : /// Blob offsets are received via [`VectoredReadPlanner::handle`]
156 : /// and coalesced into disk reads.
157 : ///
158 : /// The implementation is very simple:
159 : /// * Collect all blob offsets in an ordered structure
160 : /// * Iterate over the collected blobs and coalesce them into reads at the end
161 : pub struct VectoredReadPlanner {
162 : // Track all the blob offsets. Start offsets must be ordered.
163 : blobs: BTreeMap<Key, Vec<(Lsn, u64, u64)>>,
164 : // Arguments for previous blob passed into [`VectoredReadPlanner::handle`]
165 : prev: Option<(Key, Lsn, u64, BlobFlag)>,
166 :
167 : max_read_size: Option<usize>,
168 : }
169 :
170 : impl VectoredReadPlanner {
171 442 : pub fn new(max_read_size: usize) -> Self {
172 442 : Self {
173 442 : blobs: BTreeMap::new(),
174 442 : prev: None,
175 442 : max_read_size: Some(max_read_size),
176 442 : }
177 442 : }
178 :
179 : /// This function should *only* be used if the caller has a way to control the limit. For example, in [`StreamingVectoredReadPlanner`],
180 : /// it uses the vectored read planner to avoid duplicating the logic for handling blob start/end offsets, while expecting the vectored
181 : /// read planner to produce a single read for a contiguous range of bytes in the image layer. Therefore, it does not need the
182 : /// code path that splits reads into chunks of `max_read_size`, and controls the read size itself.
183 : #[cfg(test)]
184 38108 : pub(crate) fn new_caller_controlled_max_limit() -> Self {
185 38108 : Self {
186 38108 : blobs: BTreeMap::new(),
187 38108 : prev: None,
188 38108 : max_read_size: None,
189 38108 : }
190 38108 : }
191 :
192 : /// Include a new blob in the read plan.
193 : ///
194 : /// This function is called from a B-Tree index visitor (see `DeltaLayerInner::plan_reads`
195 : /// and `ImageLayerInner::plan_reads`). Said visitor wants to collect blob offsets for all
196 : /// keys in a given keyspace. This function must be called for each key in the desired
197 : /// keyspace (monotonically continuous). [`Self::handle_range_end`] must
198 : /// be called after every range in the offset.
199 : ///
200 : /// In the event that keys are skipped, the behaviour is undefined and can lead to an
201 : /// incorrect read plan. We can end up asserting, erroring in WAL redo, or returning
202 : /// incorrect data to the user.
203 : ///
204 : /// The `flag` argument has two interesting values:
205 : /// * [`BlobFlag::ReplaceAll`]: The blob for this key should replace all existing blobs.
206 : /// This is used for WAL records that `will_init`.
207 : /// * [`BlobFlag::Ignore`]: This blob should not be included in the read. This happens
208 : /// if the blob is cached.
209 1263040 : pub fn handle(&mut self, key: Key, lsn: Lsn, offset: u64, flag: BlobFlag) {
210 : // Implementation note: internally we lag behind by one blob so that
211 : // we have a start and end offset when initialising [`VectoredRead`]
212 1263040 : let (prev_key, prev_lsn, prev_offset, prev_flag) = match self.prev {
213 : None => {
214 45262 : self.prev = Some((key, lsn, offset, flag));
215 45262 : return;
216 : }
217 1217778 : Some(prev) => prev,
218 1217778 : };
219 1217778 :
220 1217778 : self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
221 1217778 :
222 1217778 : self.prev = Some((key, lsn, offset, flag));
223 1263040 : }
224 :
225 102173 : pub fn handle_range_end(&mut self, offset: u64) {
226 102173 : if let Some((prev_key, prev_lsn, prev_offset, prev_flag)) = self.prev {
227 45206 : self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
228 56967 : }
229 :
230 102173 : self.prev = None;
231 102173 : }
232 :
233 1262984 : fn add_blob(&mut self, key: Key, lsn: Lsn, start_offset: u64, end_offset: u64, flag: BlobFlag) {
234 1262984 : match flag {
235 386446 : BlobFlag::None => {
236 386446 : let blobs_for_key = self.blobs.entry(key).or_default();
237 386446 : blobs_for_key.push((lsn, start_offset, end_offset));
238 386446 : }
239 58864 : BlobFlag::ReplaceAll => {
240 58864 : let blobs_for_key = self.blobs.entry(key).or_default();
241 58864 : blobs_for_key.clear();
242 58864 : blobs_for_key.push((lsn, start_offset, end_offset));
243 58864 : }
244 817674 : BlobFlag::Ignore => {}
245 : }
246 1262984 : }
247 :
248 38326 : pub fn finish(self) -> Vec<VectoredRead> {
249 38326 : let mut current_read_builder: Option<VectoredReadBuilder> = None;
250 38326 : let mut reads = Vec::new();
251 :
252 393240 : for (key, blobs_for_key) in self.blobs {
253 769444 : for (lsn, start_offset, end_offset) in blobs_for_key {
254 414530 : let extended = match &mut current_read_builder {
255 376240 : Some(read_builder) => {
256 376240 : read_builder.extend(start_offset, end_offset, BlobMeta { key, lsn })
257 : }
258 38290 : None => VectoredReadExtended::No,
259 : };
260 :
261 414530 : if extended == VectoredReadExtended::No {
262 76665 : let next_read_builder = VectoredReadBuilder::new(
263 76665 : start_offset,
264 76665 : end_offset,
265 76665 : BlobMeta { key, lsn },
266 76665 : self.max_read_size,
267 76665 : );
268 76665 :
269 76665 : let prev_read_builder = current_read_builder.replace(next_read_builder);
270 :
271 : // `current_read_builder` is None in the first iteration of the outer loop
272 76665 : if let Some(read_builder) = prev_read_builder {
273 38375 : reads.push(read_builder.build());
274 38375 : }
275 337865 : }
276 : }
277 : }
278 :
279 38326 : if let Some(read_builder) = current_read_builder {
280 38290 : reads.push(read_builder.build());
281 38290 : }
282 :
283 38326 : reads
284 38326 : }
285 : }
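Putting the planner together, the expected calling convention is one `handle` per blob, one `handle_range_end` per range, then a single `finish`. A crate-internal sketch with illustrative offsets (the tests at the bottom of this file exercise the same flow in more depth):

```rust
use pageserver_api::key::Key;
use utils::lsn::Lsn;

fn plan_reads() {
    let mut planner = VectoredReadPlanner::new(128 * 1024);
    let key = Key::MIN;
    let lsn = Lsn(0);
    // Three adjacent blobs starting at offsets 0, 1024 and 2048.
    planner.handle(key, lsn, 0, BlobFlag::None);
    planner.handle(key, lsn, 1024, BlobFlag::None);
    planner.handle(key, lsn, 2048, BlobFlag::None);
    // Close the range with the end offset of the last blob.
    planner.handle_range_end(3072);
    // All three blobs fit under max_read_size: one coalesced disk read.
    let reads = planner.finish();
    assert_eq!(reads.len(), 1);
    assert_eq!((reads[0].start, reads[0].end), (0, 3072));
}
```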
286 :
287 : /// Disk reader for vectored blob spans (does not go through the page cache)
288 : pub struct VectoredBlobReader<'a> {
289 : file: &'a VirtualFile,
290 : }
291 :
292 : impl<'a> VectoredBlobReader<'a> {
293 38336 : pub fn new(file: &'a VirtualFile) -> Self {
294 38336 : Self { file }
295 38336 : }
296 :
297 : /// Read the requested blobs into the buffer.
298 : ///
299 : /// We have to deal with the fact that blobs are not fixed size.
300 : /// Each blob is prefixed by a size header.
301 : ///
302 : /// The success return value is a struct which contains the buffer
303 : /// filled from disk and a list of offsets at which each blob lies
304 : /// in the buffer.
305 76659 : pub async fn read_blobs(
306 76659 : &self,
307 76659 : read: &VectoredRead,
308 76659 : buf: BytesMut,
309 76659 : ctx: &RequestContext,
310 76659 : ) -> Result<VectoredBlobsBuf, std::io::Error> {
311 76659 : assert!(read.size() > 0);
312 76659 : assert!(
313 76659 : read.size() <= buf.capacity(),
314 0 : "{} > {}",
315 0 : read.size(),
316 0 : buf.capacity()
317 : );
318 76659 : let buf = self
319 76659 : .file
320 76659 : .read_exact_at(buf.slice(0..read.size()), read.start, ctx)
321 38930 : .await?
322 76659 : .into_inner();
323 76659 :
324 76659 : let blobs_at = read.blobs_at.as_slice();
325 76659 : let start_offset = blobs_at.first().expect("VectoredRead is never empty").0;
326 76659 :
327 76659 : let mut metas = Vec::with_capacity(blobs_at.len());
328 76659 :
329 76659 : // Blobs in `read` only provide their starting offset. The end offset
330 76659 : // of a blob is implicit: the start of the next blob if one exists
331 76659 : // or the end of the read.
332 76659 : let pairs = blobs_at.iter().zip(
333 76659 : blobs_at
334 76659 : .iter()
335 76659 : .map(Some)
336 76659 : .skip(1)
337 76659 : .chain(std::iter::once(None)),
338 76659 : );
339 :
340 491169 : for ((offset, meta), next) in pairs {
341 414510 : let offset_in_buf = offset - start_offset;
342 414510 : let first_len_byte = buf[offset_in_buf as usize];
343 :
344 : // Each blob is prefixed by a header containing its size.
345 : // Extract the size and skip that header to find the start of the data.
346 : // The size can be 1 or 4 bytes. The most significant bit is 0 in the
347 : // 1 byte case and 1 in the 4 byte case.
348 414510 : let (size_length, blob_size) = if first_len_byte < 0x80 {
349 380892 : (1, first_len_byte as u64)
350 : } else {
351 33618 : let mut blob_size_buf = [0u8; 4];
352 33618 : let offset_in_buf = offset_in_buf as usize;
353 33618 :
354 33618 : blob_size_buf.copy_from_slice(&buf[offset_in_buf..offset_in_buf + 4]);
355 33618 : blob_size_buf[0] &= 0x7f;
356 33618 : (4, u32::from_be_bytes(blob_size_buf) as u64)
357 : };
358 :
359 414510 : let start = offset_in_buf + size_length;
360 414510 : let end = match next {
361 337851 : Some((next_blob_start_offset, _)) => next_blob_start_offset - start_offset,
362 76659 : None => start + blob_size,
363 : };
364 :
365 414510 : assert_eq!(end - start, blob_size);
366 :
367 414510 : metas.push(VectoredBlob {
368 414510 : start: start as usize,
369 414510 : end: end as usize,
370 414510 : meta: *meta,
371 414510 : })
372 : }
373 :
374 76659 : Ok(VectoredBlobsBuf { buf, blobs: metas })
375 76659 : }
376 : }
377 :
378 : /// Read planner used in [`crate::tenant::storage_layer::image_layer::ImageLayerIterator`]. It provides a streaming API for
379 : /// planning blob reads: `handle` returns a batch whenever adding the current key would exceed the `max_read_size` or
380 : /// `max_cnt` constraint. It uses [`VectoredReadPlanner`] underneath.
381 : #[cfg(test)]
382 : pub struct StreamingVectoredReadPlanner {
383 : planner: VectoredReadPlanner,
384 : /// Max read size per batch
385 : max_read_size: u64,
386 : /// Max item count per batch
387 : max_cnt: usize,
388 : /// The first offset of this batch
389 : this_batch_first_offset: Option<u64>,
390 : /// Size of the current batch
391 : cnt: usize,
392 : }
393 :
394 : #[cfg(test)]
395 : impl StreamingVectoredReadPlanner {
396 224 : pub fn new(max_read_size: u64, max_cnt: usize) -> Self {
397 224 : assert!(max_cnt > 0);
398 224 : assert!(max_read_size > 0);
399 224 : Self {
400 224 : // We want to have exactly one read syscall (plus several others for index lookup) for each `next_batch` call.
401 224 : // Therefore, we enforce `self.max_read_size` by ourselves instead of using the VectoredReadPlanner's capability,
402 224 : // to avoid splitting into two I/Os.
403 224 : planner: VectoredReadPlanner::new_caller_controlled_max_limit(),
404 224 : max_cnt,
405 224 : max_read_size,
406 224 : this_batch_first_offset: None,
407 224 : cnt: 0,
408 224 : }
409 224 : }
410 :
411 37884 : fn emit(&mut self, this_batch_first_offset: u64) -> VectoredRead {
412 37884 : let planner = std::mem::replace(
413 37884 : &mut self.planner,
414 37884 : VectoredReadPlanner::new_caller_controlled_max_limit(),
415 37884 : );
416 37884 : self.this_batch_first_offset = Some(this_batch_first_offset);
417 37884 : self.cnt = 1;
418 37884 : let mut batch = planner.finish();
419 37884 : assert_eq!(batch.len(), 1, "should have exactly one read batch");
420 37884 : batch.pop().unwrap()
421 37884 : }
422 :
423 56596 : pub fn handle(
424 56596 : &mut self,
425 56596 : key: Key,
426 56596 : lsn: Lsn,
427 56596 : offset: u64,
428 56596 : flag: BlobFlag,
429 56596 : ) -> Option<VectoredRead> {
430 56596 : if let Some(begin_offset) = self.this_batch_first_offset {
431 : // Each batch will have at least one item because `self.this_batch_first_offset` is set
432 : // after one item gets processed
433 56484 : if offset - begin_offset > self.max_read_size {
434 28056 : self.planner.handle_range_end(offset); // End the current batch with the offset
435 28056 : let batch = self.emit(offset); // Produce a batch
436 28056 : self.planner.handle(key, lsn, offset, flag); // Add this key to the next batch
437 28056 : return Some(batch);
438 28428 : }
439 : } else {
440 112 : self.this_batch_first_offset = Some(offset)
441 : }
442 28540 : if self.cnt >= self.max_cnt {
443 9772 : self.planner.handle_range_end(offset); // End the current batch with the offset
444 9772 : let batch = self.emit(offset); // Produce a batch
445 9772 : self.planner.handle(key, lsn, offset, flag); // Add this key to the next batch
446 9772 : return Some(batch);
447 18768 : }
448 18768 : self.planner.handle(key, lsn, offset, flag); // Add this key to the current batch
449 18768 : self.cnt += 1;
450 18768 : None
451 56596 : }
452 :
453 56 : pub fn handle_range_end(&mut self, offset: u64) -> VectoredRead {
454 56 : self.planner.handle_range_end(offset);
455 56 : self.emit(offset)
456 56 : }
457 : }
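A sketch of how this test-only planner batches blobs, assuming a `max_read_size` of 64 KiB and a `max_cnt` of 2; the offsets are illustrative:

```rust
use pageserver_api::key::Key;
use utils::lsn::Lsn;

fn stream_batches() {
    let mut planner = StreamingVectoredReadPlanner::new(64 * 1024, 2);
    let (key, lsn) = (Key::MIN, Lsn(0));

    // The first two blobs accumulate without emitting anything.
    assert!(planner.handle(key, lsn, 0, BlobFlag::None).is_none());
    assert!(planner.handle(key, lsn, 1024, BlobFlag::None).is_none());
    // The third blob hits max_cnt: the first two are emitted as one read
    // and the blob starting at 2048 opens the next batch.
    let batch = planner.handle(key, lsn, 2048, BlobFlag::None).unwrap();
    assert_eq!((batch.start, batch.end), (0, 2048));
    // At the end of iteration, flush whatever remains.
    let last = planner.handle_range_end(3072);
    assert_eq!((last.start, last.end), (2048, 3072));
}
```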
458 :
459 : #[cfg(test)]
460 : mod tests {
461 : use super::*;
462 :
463 16 : fn validate_read(read: &VectoredRead, offset_range: &[(Key, Lsn, u64, BlobFlag)]) {
464 16 : assert_eq!(read.start, offset_range.first().unwrap().2);
465 :
466 24 : let expected_offsets_in_read: Vec<_> = offset_range.iter().map(|o| o.2).collect();
467 16 :
468 16 : let offsets_in_read: Vec<_> = read
469 16 : .blobs_at
470 16 : .as_slice()
471 16 : .iter()
472 24 : .map(|(offset, _)| *offset)
473 16 : .collect();
474 16 :
475 16 : assert_eq!(expected_offsets_in_read, offsets_in_read);
476 16 : }
477 :
478 : #[test]
479 2 : fn planner_max_read_size_test() {
480 2 : let max_read_size = 128 * 1024;
481 2 : let key = Key::MIN;
482 2 : let lsn = Lsn(0);
483 2 :
484 2 : let blob_descriptions = vec![
485 2 : (key, lsn, 0, BlobFlag::None),
486 2 : (key, lsn, 32 * 1024, BlobFlag::None),
487 2 : (key, lsn, 96 * 1024, BlobFlag::None), // Last in read 1
488 2 : (key, lsn, 128 * 1024, BlobFlag::None), // Last in read 2
489 2 : (key, lsn, 198 * 1024, BlobFlag::None), // Last in read 3
490 2 : (key, lsn, 268 * 1024, BlobFlag::None), // Last in read 4
491 2 : (key, lsn, 396 * 1024, BlobFlag::None), // Last in read 5
492 2 : (key, lsn, 652 * 1024, BlobFlag::None), // Last in read 6
493 2 : ];
494 2 :
495 2 : let ranges = [
496 2 : &blob_descriptions[0..3],
497 2 : &blob_descriptions[3..4],
498 2 : &blob_descriptions[4..5],
499 2 : &blob_descriptions[5..6],
500 2 : &blob_descriptions[6..7],
501 2 : &blob_descriptions[7..],
502 2 : ];
503 2 :
504 2 : let mut planner = VectoredReadPlanner::new(max_read_size);
505 16 : for (key, lsn, offset, flag) in blob_descriptions.clone() {
506 16 : planner.handle(key, lsn, offset, flag);
507 16 : }
508 :
509 2 : planner.handle_range_end(652 * 1024);
510 2 :
511 2 : let reads = planner.finish();
512 2 : assert_eq!(reads.len(), 6);
513 :
514 12 : for (idx, read) in reads.iter().enumerate() {
515 12 : validate_read(read, ranges[idx]);
516 12 : }
517 2 : }
518 :
519 : #[test]
520 2 : fn planner_replacement_test() {
521 2 : let max_read_size = 128 * 1024;
522 2 : let first_key = Key::MIN;
523 2 : let second_key = first_key.next();
524 2 : let lsn = Lsn(0);
525 2 :
526 2 : let blob_descriptions = vec![
527 2 : (first_key, lsn, 0, BlobFlag::None), // First in read 1
528 2 : (first_key, lsn, 1024, BlobFlag::None), // Last in read 1
529 2 : (second_key, lsn, 2 * 1024, BlobFlag::ReplaceAll),
530 2 : (second_key, lsn, 3 * 1024, BlobFlag::None),
531 2 : (second_key, lsn, 4 * 1024, BlobFlag::ReplaceAll), // First in read 2
532 2 : (second_key, lsn, 5 * 1024, BlobFlag::None), // Last in read 2
533 2 : ];
534 2 :
535 2 : let ranges = [&blob_descriptions[0..2], &blob_descriptions[4..]];
536 2 :
537 2 : let mut planner = VectoredReadPlanner::new(max_read_size);
538 12 : for (key, lsn, offset, flag) in blob_descriptions.clone() {
539 12 : planner.handle(key, lsn, offset, flag);
540 12 : }
541 :
542 2 : planner.handle_range_end(6 * 1024);
543 2 :
544 2 : let reads = planner.finish();
545 2 : assert_eq!(reads.len(), 2);
546 :
547 4 : for (idx, read) in reads.iter().enumerate() {
548 4 : validate_read(read, ranges[idx]);
549 4 : }
550 2 : }
551 : }