Line data Source code
1 : //!
2 : //! Utilities for vectored reading of variable-sized "blobs".
3 : //!
4 : //! The "blob" api is an abstraction on top of the "block" api,
5 : //! with the main difference being that blobs do not have a fixed
6 : //! size (each blob is prefixed with a 1- or 4-byte length field).
7 : //!
8 : //! The vectored apis provided in this module allow for planning
9 : //! and executing disk IO which covers multiple blobs.
10 : //!
11 : //! Reads are planned with [`VectoredReadPlanner`], which coalesces
12 : //! adjacent blobs into a single disk IO request, and are executed by
13 : //! [`VectoredBlobReader`], which does all the required offset juggling
14 : //! and returns a buffer housing all the blobs and a list of offsets.
15 : //!
16 : //! Note that the vectored blob api does *not* go through the page cache.
17 :
18 : use std::collections::BTreeMap;
19 : use std::num::NonZeroUsize;
20 :
21 : use bytes::BytesMut;
22 : use pageserver_api::key::Key;
23 : use utils::lsn::Lsn;
24 : use utils::vec_map::VecMap;
25 :
26 : use crate::context::RequestContext;
27 : use crate::virtual_file::VirtualFile;
28 :
29 : #[derive(Copy, Clone, Debug, PartialEq, Eq)]
30 : pub struct MaxVectoredReadBytes(pub NonZeroUsize);
31 :
32 : /// Metadata bundled with the start and end offset of a blob.
33 : #[derive(Copy, Clone, Debug)]
34 : pub struct BlobMeta {
35 : pub key: Key,
36 : pub lsn: Lsn,
37 : }
38 :
39 : /// Blob offsets into [`VectoredBlobsBuf::buf`]
40 : pub struct VectoredBlob {
41 : pub start: usize,
42 : pub end: usize,
43 : pub meta: BlobMeta,
44 : }
45 :
46 : /// Return type of [`VectoredBlobReader::read_blobs`]
47 : pub struct VectoredBlobsBuf {
48 : /// Buffer for all blobs in this read
49 : pub buf: BytesMut,
50 : /// Offsets into the buffer and metadata for all blobs in this read
51 : pub blobs: Vec<VectoredBlob>,
52 : }
53 :
54 : /// Description of one disk read for multiple blobs.
55 : /// Used as the argument form [`VectoredBlobReader::read_blobs`]
56 : #[derive(Debug)]
57 : pub struct VectoredRead {
58 : pub start: u64,
59 : pub end: u64,
60 : /// Starting offsets and metadata for each blob in this read
61 : pub blobs_at: VecMap<u64, BlobMeta>,
62 : }
63 :
64 : impl VectoredRead {
65 154952 : pub(crate) fn size(&self) -> usize {
66 154952 : (self.end - self.start) as usize
67 154952 : }
68 : }
69 :
70 : #[derive(Eq, PartialEq)]
71 : pub(crate) enum VectoredReadExtended {
72 : Yes,
73 : No,
74 : }
75 :
76 : pub(crate) struct VectoredReadBuilder {
77 : start: u64,
78 : end: u64,
79 : blobs_at: VecMap<u64, BlobMeta>,
80 : max_read_size: usize,
81 : }
82 :
83 : impl VectoredReadBuilder {
84 : /// Start building a new vectored read.
85 : ///
86 : /// Note that by design, this does not check against reading more than `max_read_size` to
87 : /// support reading larger blobs than the configuration value. The builder will be single use
88 : /// however after that.
89 38675 : pub(crate) fn new(
90 38675 : start_offset: u64,
91 38675 : end_offset: u64,
92 38675 : meta: BlobMeta,
93 38675 : max_read_size: usize,
94 38675 : ) -> Self {
95 38675 : let mut blobs_at = VecMap::default();
96 38675 : blobs_at
97 38675 : .append(start_offset, meta)
98 38675 : .expect("First insertion always succeeds");
99 38675 :
100 38675 : Self {
101 38675 : start: start_offset,
102 38675 : end: end_offset,
103 38675 : blobs_at,
104 38675 : max_read_size,
105 38675 : }
106 38675 : }
107 :
108 : /// Attempt to extend the current read with a new blob if the start
109 : /// offset matches with the current end of the vectored read
110 : /// and the resuting size is below the max read size
111 357606 : pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
112 357606 : tracing::trace!(start, end, "trying to extend");
113 357606 : let size = (end - start) as usize;
114 357606 : if self.end == start && self.size() + size <= self.max_read_size {
115 319347 : self.end = end;
116 319347 : self.blobs_at
117 319347 : .append(start, meta)
118 319347 : .expect("LSNs are ordered within vectored reads");
119 319347 :
120 319347 : return VectoredReadExtended::Yes;
121 38259 : }
122 38259 :
123 38259 : VectoredReadExtended::No
124 357606 : }
125 :
126 338713 : pub(crate) fn size(&self) -> usize {
127 338713 : (self.end - self.start) as usize
128 338713 : }
129 :
130 38675 : pub(crate) fn build(self) -> VectoredRead {
131 38675 : VectoredRead {
132 38675 : start: self.start,
133 38675 : end: self.end,
134 38675 : blobs_at: self.blobs_at,
135 38675 : }
136 38675 : }
137 : }
138 :
139 : #[derive(Copy, Clone, Debug)]
140 : pub enum BlobFlag {
141 : None,
142 : Ignore,
143 : ReplaceAll,
144 : }
145 :
146 : /// Planner for vectored blob reads.
147 : ///
148 : /// Blob offsets are received via [`VectoredReadPlanner::handle`]
149 : /// and coalesced into disk reads.
150 : ///
151 : /// The implementation is very simple:
152 : /// * Collect all blob offsets in an ordered structure
153 : /// * Iterate over the collected blobs and coalesce them into reads at the end
154 : pub struct VectoredReadPlanner {
155 : // Track all the blob offsets. Start offsets must be ordered.
156 : blobs: BTreeMap<Key, Vec<(Lsn, u64, u64)>>,
157 : // Arguments for previous blob passed into [`VectoredReadPlanner::handle`]
158 : prev: Option<(Key, Lsn, u64, BlobFlag)>,
159 :
160 : max_read_size: usize,
161 : }
162 :
163 : impl VectoredReadPlanner {
164 442 : pub fn new(max_read_size: usize) -> Self {
165 442 : Self {
166 442 : blobs: BTreeMap::new(),
167 442 : prev: None,
168 442 : max_read_size,
169 442 : }
170 442 : }
171 :
172 : /// Include a new blob in the read plan.
173 : ///
174 : /// This function is called from a B-Tree index visitor (see `DeltaLayerInner::plan_reads`
175 : /// and `ImageLayerInner::plan_reads`). Said visitor wants to collect blob offsets for all
176 : /// keys in a given keyspace. This function must be called for each key in the desired
177 : /// keyspace (monotonically continuous). [`Self::handle_range_end`] must
178 : /// be called after every range in the offset.
179 : ///
180 : /// In the event that keys are skipped, the behaviour is undefined and can lead to an
181 : /// incorrect read plan. We can end up asserting, erroring in wal redo or returning
182 : /// incorrect data to the user.
183 : ///
184 : /// The `flag` argument has two interesting values:
185 : /// * [`BlobFlag::ReplaceAll`]: The blob for this key should replace all existing blobs.
186 : /// This is used for WAL records that `will_init`.
187 : /// * [`BlobFlag::Ignore`]: This blob should not be included in the read. This happens
188 : /// if the blob is cached.
189 1206187 : pub fn handle(&mut self, key: Key, lsn: Lsn, offset: u64, flag: BlobFlag) {
190 : // Implementation note: internally lag behind by one blob such that
191 : // we have a start and end offset when initialising [`VectoredRead`]
192 1206187 : let (prev_key, prev_lsn, prev_offset, prev_flag) = match self.prev {
193 : None => {
194 7259 : self.prev = Some((key, lsn, offset, flag));
195 7259 : return;
196 : }
197 1198928 : Some(prev) => prev,
198 1198928 : };
199 1198928 :
200 1198928 : self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
201 1198928 :
202 1198928 : self.prev = Some((key, lsn, offset, flag));
203 1206187 : }
204 :
205 64299 : pub fn handle_range_end(&mut self, offset: u64) {
206 64299 : if let Some((prev_key, prev_lsn, prev_offset, prev_flag)) = self.prev {
207 7259 : self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
208 57040 : }
209 :
210 64299 : self.prev = None;
211 64299 : }
212 :
213 1206187 : fn add_blob(&mut self, key: Key, lsn: Lsn, start_offset: u64, end_offset: u64, flag: BlobFlag) {
214 1206187 : match flag {
215 329995 : BlobFlag::None => {
216 329995 : let blobs_for_key = self.blobs.entry(key).or_default();
217 329995 : blobs_for_key.push((lsn, start_offset, end_offset));
218 329995 : }
219 58632 : BlobFlag::ReplaceAll => {
220 58632 : let blobs_for_key = self.blobs.entry(key).or_default();
221 58632 : blobs_for_key.clear();
222 58632 : blobs_for_key.push((lsn, start_offset, end_offset));
223 58632 : }
224 817560 : BlobFlag::Ignore => {}
225 : }
226 1206187 : }
227 :
228 442 : pub fn finish(self) -> Vec<VectoredRead> {
229 442 : let mut current_read_builder: Option<VectoredReadBuilder> = None;
230 442 : let mut reads = Vec::new();
231 :
232 307392 : for (key, blobs_for_key) in self.blobs {
233 664940 : for (lsn, start_offset, end_offset) in blobs_for_key {
234 357990 : let extended = match &mut current_read_builder {
235 357584 : Some(read_builder) => {
236 357584 : read_builder.extend(start_offset, end_offset, BlobMeta { key, lsn })
237 : }
238 406 : None => VectoredReadExtended::No,
239 : };
240 :
241 357990 : if extended == VectoredReadExtended::No {
242 38659 : let next_read_builder = VectoredReadBuilder::new(
243 38659 : start_offset,
244 38659 : end_offset,
245 38659 : BlobMeta { key, lsn },
246 38659 : self.max_read_size,
247 38659 : );
248 38659 :
249 38659 : let prev_read_builder = current_read_builder.replace(next_read_builder);
250 :
251 : // `current_read_builder` is None in the first iteration of the outer loop
252 38659 : if let Some(read_builder) = prev_read_builder {
253 38253 : reads.push(read_builder.build());
254 38253 : }
255 319331 : }
256 : }
257 : }
258 :
259 442 : if let Some(read_builder) = current_read_builder {
260 406 : reads.push(read_builder.build());
261 406 : }
262 :
263 442 : reads
264 442 : }
265 : }
266 :
267 : /// Disk reader for vectored blob spans (does not go through the page cache)
268 : pub struct VectoredBlobReader<'a> {
269 : file: &'a VirtualFile,
270 : }
271 :
272 : impl<'a> VectoredBlobReader<'a> {
273 452 : pub fn new(file: &'a VirtualFile) -> Self {
274 452 : Self { file }
275 452 : }
276 :
277 : /// Read the requested blobs into the buffer.
278 : ///
279 : /// We have to deal with the fact that blobs are not fixed size.
280 : /// Each blob is prefixed by a size header.
281 : ///
282 : /// The success return value is a struct which contains the buffer
283 : /// filled from disk and a list of offsets at which each blob lies
284 : /// in the buffer.
285 38653 : pub async fn read_blobs(
286 38653 : &self,
287 38653 : read: &VectoredRead,
288 38653 : buf: BytesMut,
289 38653 : ctx: &RequestContext,
290 38653 : ) -> Result<VectoredBlobsBuf, std::io::Error> {
291 38653 : assert!(read.size() > 0);
292 38653 : assert!(
293 38653 : read.size() <= buf.capacity(),
294 0 : "{} > {}",
295 0 : read.size(),
296 0 : buf.capacity()
297 : );
298 38653 : let buf = self
299 38653 : .file
300 38653 : .read_exact_at_n(buf, read.start, read.size(), ctx)
301 19621 : .await?;
302 :
303 38653 : let blobs_at = read.blobs_at.as_slice();
304 38653 : let start_offset = blobs_at.first().expect("VectoredRead is never empty").0;
305 38653 :
306 38653 : let mut metas = Vec::with_capacity(blobs_at.len());
307 38653 :
308 38653 : // Blobs in `read` only provide their starting offset. The end offset
309 38653 : // of a blob is implicit: the start of the next blob if one exists
310 38653 : // or the end of the read.
311 38653 : let pairs = blobs_at.iter().zip(
312 38653 : blobs_at
313 38653 : .iter()
314 38653 : .map(Some)
315 38653 : .skip(1)
316 38653 : .chain(std::iter::once(None)),
317 38653 : );
318 :
319 396623 : for ((offset, meta), next) in pairs {
320 357970 : let offset_in_buf = offset - start_offset;
321 357970 : let first_len_byte = buf[offset_in_buf as usize];
322 :
323 : // Each blob is prefixed by a header containing it's size.
324 : // Extract the size and skip that header to find the start of the data.
325 : // The size can be 1 or 4 bytes. The most significant bit is 0 in the
326 : // 1 byte case and 1 in the 4 byte case.
327 357970 : let (size_length, blob_size) = if first_len_byte < 0x80 {
328 324352 : (1, first_len_byte as u64)
329 : } else {
330 33618 : let mut blob_size_buf = [0u8; 4];
331 33618 : let offset_in_buf = offset_in_buf as usize;
332 33618 :
333 33618 : blob_size_buf.copy_from_slice(&buf[offset_in_buf..offset_in_buf + 4]);
334 33618 : blob_size_buf[0] &= 0x7f;
335 33618 : (4, u32::from_be_bytes(blob_size_buf) as u64)
336 : };
337 :
338 357970 : let start = offset_in_buf + size_length;
339 357970 : let end = match next {
340 319317 : Some((next_blob_start_offset, _)) => next_blob_start_offset - start_offset,
341 38653 : None => start + blob_size,
342 : };
343 :
344 357970 : assert_eq!(end - start, blob_size);
345 :
346 357970 : metas.push(VectoredBlob {
347 357970 : start: start as usize,
348 357970 : end: end as usize,
349 357970 : meta: *meta,
350 357970 : })
351 : }
352 :
353 38653 : Ok(VectoredBlobsBuf { buf, blobs: metas })
354 38653 : }
355 : }
356 :
357 : #[cfg(test)]
358 : mod tests {
359 : use super::*;
360 :
361 16 : fn validate_read(read: &VectoredRead, offset_range: &[(Key, Lsn, u64, BlobFlag)]) {
362 16 : assert_eq!(read.start, offset_range.first().unwrap().2);
363 :
364 24 : let expected_offsets_in_read: Vec<_> = offset_range.iter().map(|o| o.2).collect();
365 16 :
366 16 : let offsets_in_read: Vec<_> = read
367 16 : .blobs_at
368 16 : .as_slice()
369 16 : .iter()
370 24 : .map(|(offset, _)| *offset)
371 16 : .collect();
372 16 :
373 16 : assert_eq!(expected_offsets_in_read, offsets_in_read);
374 16 : }
375 :
376 : #[test]
377 2 : fn planner_max_read_size_test() {
378 2 : let max_read_size = 128 * 1024;
379 2 : let key = Key::MIN;
380 2 : let lsn = Lsn(0);
381 2 :
382 2 : let blob_descriptions = vec![
383 2 : (key, lsn, 0, BlobFlag::None),
384 2 : (key, lsn, 32 * 1024, BlobFlag::None),
385 2 : (key, lsn, 96 * 1024, BlobFlag::None), // Last in read 1
386 2 : (key, lsn, 128 * 1024, BlobFlag::None), // Last in read 2
387 2 : (key, lsn, 198 * 1024, BlobFlag::None), // Last in read 3
388 2 : (key, lsn, 268 * 1024, BlobFlag::None), // Last in read 4
389 2 : (key, lsn, 396 * 1024, BlobFlag::None), // Last in read 5
390 2 : (key, lsn, 652 * 1024, BlobFlag::None), // Last in read 6
391 2 : ];
392 2 :
393 2 : let ranges = [
394 2 : &blob_descriptions[0..3],
395 2 : &blob_descriptions[3..4],
396 2 : &blob_descriptions[4..5],
397 2 : &blob_descriptions[5..6],
398 2 : &blob_descriptions[6..7],
399 2 : &blob_descriptions[7..],
400 2 : ];
401 2 :
402 2 : let mut planner = VectoredReadPlanner::new(max_read_size);
403 16 : for (key, lsn, offset, flag) in blob_descriptions.clone() {
404 16 : planner.handle(key, lsn, offset, flag);
405 16 : }
406 :
407 2 : planner.handle_range_end(652 * 1024);
408 2 :
409 2 : let reads = planner.finish();
410 2 : assert_eq!(reads.len(), 6);
411 :
412 12 : for (idx, read) in reads.iter().enumerate() {
413 12 : validate_read(read, ranges[idx]);
414 12 : }
415 2 : }
416 :
417 : #[test]
418 2 : fn planner_replacement_test() {
419 2 : let max_read_size = 128 * 1024;
420 2 : let first_key = Key::MIN;
421 2 : let second_key = first_key.next();
422 2 : let lsn = Lsn(0);
423 2 :
424 2 : let blob_descriptions = vec![
425 2 : (first_key, lsn, 0, BlobFlag::None), // First in read 1
426 2 : (first_key, lsn, 1024, BlobFlag::None), // Last in read 1
427 2 : (second_key, lsn, 2 * 1024, BlobFlag::ReplaceAll),
428 2 : (second_key, lsn, 3 * 1024, BlobFlag::None),
429 2 : (second_key, lsn, 4 * 1024, BlobFlag::ReplaceAll), // First in read 2
430 2 : (second_key, lsn, 5 * 1024, BlobFlag::None), // Last in read 2
431 2 : ];
432 2 :
433 2 : let ranges = [&blob_descriptions[0..2], &blob_descriptions[4..]];
434 2 :
435 2 : let mut planner = VectoredReadPlanner::new(max_read_size);
436 12 : for (key, lsn, offset, flag) in blob_descriptions.clone() {
437 12 : planner.handle(key, lsn, offset, flag);
438 12 : }
439 :
440 2 : planner.handle_range_end(6 * 1024);
441 2 :
442 2 : let reads = planner.finish();
443 2 : assert_eq!(reads.len(), 2);
444 :
445 4 : for (idx, read) in reads.iter().enumerate() {
446 4 : validate_read(read, ranges[idx]);
447 4 : }
448 2 : }
449 : }
|