Line data Source code
1 : //!
2 : //! Utilities for vectored reading of variable-sized "blobs".
3 : //!
//! The "blob" api is an abstraction on top of the "block" api,
//! with the main difference being that blobs do not have a fixed
//! size (each blob is prefixed with a 1- or 4-byte length field).
7 : //!
8 : //! The vectored apis provided in this module allow for planning
9 : //! and executing disk IO which covers multiple blobs.
10 : //!
//! Reads are planned with [`VectoredReadPlanner`], which coalesces
//! adjacent blobs into a single disk IO request, and are executed by
//! [`VectoredBlobReader`], which does all the required offset juggling
//! and returns a buffer housing all the blobs and a list of offsets.
15 : //!
16 : //! Note that the vectored blob api does *not* go through the page cache.
17 :
18 : use std::collections::BTreeMap;
19 : use std::num::NonZeroUsize;
20 :
21 : use bytes::BytesMut;
22 : use pageserver_api::key::Key;
23 : use utils::lsn::Lsn;
24 : use utils::vec_map::VecMap;
25 :
26 : use crate::virtual_file::VirtualFile;
27 :
/// Newtype around a non-zero byte count; judging by its name, the configured
/// maximum size of a vectored read (NOTE(review): confirm against call sites).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct MaxVectoredReadBytes(pub NonZeroUsize);
30 :
31 : /// Metadata bundled with the start and end offset of a blob.
32 : #[derive(Copy, Clone, Debug)]
33 : pub struct BlobMeta {
34 : pub key: Key,
35 : pub lsn: Lsn,
36 : }
37 :
38 : /// Blob offsets into [`VectoredBlobsBuf::buf`]
39 : pub struct VectoredBlob {
40 : pub start: usize,
41 : pub end: usize,
42 : pub meta: BlobMeta,
43 : }
44 :
45 : /// Return type of [`VectoredBlobReader::read_blobs`]
46 : pub struct VectoredBlobsBuf {
47 : /// Buffer for all blobs in this read
48 : pub buf: BytesMut,
49 : /// Offsets into the buffer and metadata for all blobs in this read
50 : pub blobs: Vec<VectoredBlob>,
51 : }
52 :
53 : /// Description of one disk read for multiple blobs.
54 : /// Used as the argument form [`VectoredBlobReader::read_blobs`]
55 : #[derive(Debug)]
56 : pub struct VectoredRead {
57 : pub start: u64,
58 : pub end: u64,
59 : /// Starting offsets and metadata for each blob in this read
60 : pub blobs_at: VecMap<u64, BlobMeta>,
61 : }
62 :
63 : impl VectoredRead {
64 121109 : pub(crate) fn size(&self) -> usize {
65 121109 : (self.end - self.start) as usize
66 121109 : }
67 : }
68 :
/// Outcome of [`VectoredReadBuilder::extend`]: whether the blob was appended
/// to the read under construction.
#[derive(PartialEq, Eq)]
pub(crate) enum VectoredReadExtended {
    Yes,
    No,
}
74 :
75 : pub(crate) struct VectoredReadBuilder {
76 : start: u64,
77 : end: u64,
78 : blobs_at: VecMap<u64, BlobMeta>,
79 : max_read_size: usize,
80 : }
81 :
82 : impl VectoredReadBuilder {
83 : /// Start building a new vectored read.
84 : ///
85 : /// Note that by design, this does not check against reading more than `max_read_size` to
86 : /// support reading larger blobs than the configuration value. The builder will be single use
87 : /// however after that.
88 30218 : pub(crate) fn new(
89 30218 : start_offset: u64,
90 30218 : end_offset: u64,
91 30218 : meta: BlobMeta,
92 30218 : max_read_size: usize,
93 30218 : ) -> Self {
94 30218 : let mut blobs_at = VecMap::default();
95 30218 : blobs_at
96 30218 : .append(start_offset, meta)
97 30218 : .expect("First insertion always succeeds");
98 30218 :
99 30218 : Self {
100 30218 : start: start_offset,
101 30218 : end: end_offset,
102 30218 : blobs_at,
103 30218 : max_read_size,
104 30218 : }
105 30218 : }
106 :
107 : /// Attempt to extend the current read with a new blob if the start
108 : /// offset matches with the current end of the vectored read
109 : /// and the resuting size is below the max read size
110 75435 : pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
111 75435 : tracing::trace!(start, end, "trying to extend");
112 75435 : let size = (end - start) as usize;
113 75435 : if self.end == start && self.size() + size <= self.max_read_size {
114 45558 : self.end = end;
115 45558 : self.blobs_at
116 45558 : .append(start, meta)
117 45558 : .expect("LSNs are ordered within vectored reads");
118 45558 :
119 45558 : return VectoredReadExtended::Yes;
120 29877 : }
121 29877 :
122 29877 : VectoredReadExtended::No
123 75435 : }
124 :
125 64916 : pub(crate) fn size(&self) -> usize {
126 64916 : (self.end - self.start) as usize
127 64916 : }
128 :
129 30218 : pub(crate) fn build(self) -> VectoredRead {
130 30218 : VectoredRead {
131 30218 : start: self.start,
132 30218 : end: self.end,
133 30218 : blobs_at: self.blobs_at,
134 30218 : }
135 30218 : }
136 : }
137 :
/// How the read planner should treat a blob it is handed.
#[derive(Clone, Copy, Debug)]
pub enum BlobFlag {
    /// Add the blob to the plan for its key.
    None,
    /// Leave the blob out of the plan entirely.
    Ignore,
    /// Discard every blob planned so far for the same key, then add this one.
    ReplaceAll,
}
144 :
145 : /// Planner for vectored blob reads.
146 : ///
147 : /// Blob offsets are received via [`VectoredReadPlanner::handle`]
148 : /// and coalesced into disk reads.
149 : ///
150 : /// The implementation is very simple:
151 : /// * Collect all blob offsets in an ordered structure
152 : /// * Iterate over the collected blobs and coalesce them into reads at the end
153 : pub struct VectoredReadPlanner {
154 : // Track all the blob offsets. Start offsets must be ordered.
155 : blobs: BTreeMap<Key, Vec<(Lsn, u64, u64)>>,
156 : // Arguments for previous blob passed into [`VectoredReadPlanner::handle`]
157 : prev: Option<(Key, Lsn, u64, BlobFlag)>,
158 :
159 : max_read_size: usize,
160 : }
161 :
162 : impl VectoredReadPlanner {
163 356 : pub fn new(max_read_size: usize) -> Self {
164 356 : Self {
165 356 : blobs: BTreeMap::new(),
166 356 : prev: None,
167 356 : max_read_size,
168 356 : }
169 356 : }
170 :
171 : /// Include a new blob in the read plan.
172 : ///
173 : /// This function is called from a B-Tree index visitor (see `DeltaLayerInner::plan_reads`
174 : /// and `ImageLayerInner::plan_reads`). Said visitor wants to collect blob offsets for all
175 : /// keys in a given keyspace. This function must be called for each key in the desired
176 : /// keyspace (monotonically continuous). [`Self::handle_range_end`] must
177 : /// be called after every range in the offset.
178 : ///
179 : /// In the event that keys are skipped, the behaviour is undefined and can lead to an
180 : /// incorrect read plan. We can end up asserting, erroring in wal redo or returning
181 : /// incorrect data to the user.
182 : ///
183 : /// The `flag` argument has two interesting values:
184 : /// * [`BlobFlag::ReplaceAll`]: The blob for this key should replace all existing blobs.
185 : /// This is used for WAL records that `will_init`.
186 : /// * [`BlobFlag::Ignore`]: This blob should not be included in the read. This happens
187 : /// if the blob is cached.
188 87586 : pub fn handle(&mut self, key: Key, lsn: Lsn, offset: u64, flag: BlobFlag) {
189 : // Implementation note: internally lag behind by one blob such that
190 : // we have a start and end offset when initialising [`VectoredRead`]
191 87586 : let (prev_key, prev_lsn, prev_offset, prev_flag) = match self.prev {
192 : None => {
193 5102 : self.prev = Some((key, lsn, offset, flag));
194 5102 : return;
195 : }
196 82484 : Some(prev) => prev,
197 82484 : };
198 82484 :
199 82484 : self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
200 82484 :
201 82484 : self.prev = Some((key, lsn, offset, flag));
202 87586 : }
203 :
204 63932 : pub fn handle_range_end(&mut self, offset: u64) {
205 63932 : if let Some((prev_key, prev_lsn, prev_offset, prev_flag)) = self.prev {
206 5102 : self.add_blob(prev_key, prev_lsn, prev_offset, offset, prev_flag);
207 58830 : }
208 :
209 63932 : self.prev = None;
210 63932 : }
211 :
212 87586 : fn add_blob(&mut self, key: Key, lsn: Lsn, start_offset: u64, end_offset: u64, flag: BlobFlag) {
213 87586 : match flag {
214 57356 : BlobFlag::None => {
215 57356 : let blobs_for_key = self.blobs.entry(key).or_default();
216 57356 : blobs_for_key.push((lsn, start_offset, end_offset));
217 57356 : }
218 30228 : BlobFlag::ReplaceAll => {
219 30228 : let blobs_for_key = self.blobs.entry(key).or_default();
220 30228 : blobs_for_key.clear();
221 30228 : blobs_for_key.push((lsn, start_offset, end_offset));
222 30228 : }
223 2 : BlobFlag::Ignore => {}
224 : }
225 87586 : }
226 :
227 356 : pub fn finish(self) -> Vec<VectoredRead> {
228 356 : let mut current_read_builder: Option<VectoredReadBuilder> = None;
229 356 : let mut reads = Vec::new();
230 :
231 25060 : for (key, blobs_for_key) in self.blobs {
232 100448 : for (lsn, start_offset, end_offset) in blobs_for_key {
233 75744 : let extended = match &mut current_read_builder {
234 75413 : Some(read_builder) => {
235 75413 : read_builder.extend(start_offset, end_offset, BlobMeta { key, lsn })
236 : }
237 331 : None => VectoredReadExtended::No,
238 : };
239 :
240 75744 : if extended == VectoredReadExtended::No {
241 30202 : let next_read_builder = VectoredReadBuilder::new(
242 30202 : start_offset,
243 30202 : end_offset,
244 30202 : BlobMeta { key, lsn },
245 30202 : self.max_read_size,
246 30202 : );
247 30202 :
248 30202 : let prev_read_builder = current_read_builder.replace(next_read_builder);
249 :
250 : // `current_read_builder` is None in the first iteration of the outer loop
251 30202 : if let Some(read_builder) = prev_read_builder {
252 29871 : reads.push(read_builder.build());
253 29871 : }
254 45542 : }
255 : }
256 : }
257 :
258 356 : if let Some(read_builder) = current_read_builder {
259 331 : reads.push(read_builder.build());
260 331 : }
261 :
262 356 : reads
263 356 : }
264 : }
265 :
266 : /// Disk reader for vectored blob spans (does not go through the page cache)
267 : pub struct VectoredBlobReader<'a> {
268 : file: &'a VirtualFile,
269 : }
270 :
271 : impl<'a> VectoredBlobReader<'a> {
272 366 : pub fn new(file: &'a VirtualFile) -> Self {
273 366 : Self { file }
274 366 : }
275 :
276 : /// Read the requested blobs into the buffer.
277 : ///
278 : /// We have to deal with the fact that blobs are not fixed size.
279 : /// Each blob is prefixed by a size header.
280 : ///
281 : /// The success return value is a struct which contains the buffer
282 : /// filled from disk and a list of offsets at which each blob lies
283 : /// in the buffer.
284 30196 : pub async fn read_blobs(
285 30196 : &self,
286 30196 : read: &VectoredRead,
287 30196 : buf: BytesMut,
288 30196 : ) -> Result<VectoredBlobsBuf, std::io::Error> {
289 30196 : assert!(read.size() > 0);
290 30196 : assert!(
291 30196 : read.size() <= buf.capacity(),
292 0 : "{} > {}",
293 0 : read.size(),
294 0 : buf.capacity()
295 : );
296 30196 : let buf = self
297 30196 : .file
298 30196 : .read_exact_at_n(buf, read.start, read.size())
299 15369 : .await?;
300 :
301 30196 : let blobs_at = read.blobs_at.as_slice();
302 30196 : let start_offset = blobs_at.first().expect("VectoredRead is never empty").0;
303 30196 :
304 30196 : let mut metas = Vec::with_capacity(blobs_at.len());
305 30196 :
306 30196 : // Blobs in `read` only provide their starting offset. The end offset
307 30196 : // of a blob is implicit: the start of the next blob if one exists
308 30196 : // or the end of the read.
309 30196 : let pairs = blobs_at.iter().zip(
310 30196 : blobs_at
311 30196 : .iter()
312 30196 : .map(Some)
313 30196 : .skip(1)
314 30196 : .chain(std::iter::once(None)),
315 30196 : );
316 :
317 105920 : for ((offset, meta), next) in pairs {
318 75724 : let offset_in_buf = offset - start_offset;
319 75724 : let first_len_byte = buf[offset_in_buf as usize];
320 :
321 : // Each blob is prefixed by a header containing it's size.
322 : // Extract the size and skip that header to find the start of the data.
323 : // The size can be 1 or 4 bytes. The most significant bit is 0 in the
324 : // 1 byte case and 1 in the 4 byte case.
325 75724 : let (size_length, blob_size) = if first_len_byte < 0x80 {
326 42106 : (1, first_len_byte as u64)
327 : } else {
328 33618 : let mut blob_size_buf = [0u8; 4];
329 33618 : let offset_in_buf = offset_in_buf as usize;
330 33618 :
331 33618 : blob_size_buf.copy_from_slice(&buf[offset_in_buf..offset_in_buf + 4]);
332 33618 : blob_size_buf[0] &= 0x7f;
333 33618 : (4, u32::from_be_bytes(blob_size_buf) as u64)
334 : };
335 :
336 75724 : let start = offset_in_buf + size_length;
337 75724 : let end = match next {
338 45528 : Some((next_blob_start_offset, _)) => next_blob_start_offset - start_offset,
339 30196 : None => start + blob_size,
340 : };
341 :
342 75724 : assert_eq!(end - start, blob_size);
343 :
344 75724 : metas.push(VectoredBlob {
345 75724 : start: start as usize,
346 75724 : end: end as usize,
347 75724 : meta: *meta,
348 75724 : })
349 : }
350 :
351 30196 : Ok(VectoredBlobsBuf { buf, blobs: metas })
352 30196 : }
353 : }
354 :
#[cfg(test)]
mod tests {
    use super::*;

    /// Check that `read` starts at the first blob of `offset_range`, ends at
    /// `expected_end`, and covers exactly the blobs of `offset_range` in order,
    /// with matching keys and LSNs.
    fn validate_read(
        read: &VectoredRead,
        offset_range: &[(Key, Lsn, u64, BlobFlag)],
        expected_end: u64,
    ) {
        assert_eq!(read.start, offset_range.first().unwrap().2);
        assert_eq!(read.end, expected_end);

        let expected_blobs: Vec<_> = offset_range.iter().map(|o| (o.2, o.0, o.1)).collect();

        let blobs_in_read: Vec<_> = read
            .blobs_at
            .as_slice()
            .iter()
            .map(|(offset, meta)| (*offset, meta.key, meta.lsn))
            .collect();

        assert_eq!(expected_blobs, blobs_in_read);
    }

    #[test]
    fn planner_max_read_size_test() {
        let max_read_size = 128 * 1024;
        let key = Key::MIN;
        let lsn = Lsn(0);

        let blob_descriptions = vec![
            (key, lsn, 0, BlobFlag::None),
            (key, lsn, 32 * 1024, BlobFlag::None),
            (key, lsn, 96 * 1024, BlobFlag::None), // Last in read 1
            (key, lsn, 128 * 1024, BlobFlag::None), // Last in read 2
            (key, lsn, 198 * 1024, BlobFlag::None), // Last in read 3
            (key, lsn, 268 * 1024, BlobFlag::None), // Last in read 4
            (key, lsn, 396 * 1024, BlobFlag::None), // Last in read 5
            (key, lsn, 652 * 1024, BlobFlag::None), // Last in read 6
        ];

        let ranges = [
            &blob_descriptions[0..3],
            &blob_descriptions[3..4],
            &blob_descriptions[4..5],
            &blob_descriptions[5..6],
            &blob_descriptions[6..7],
            &blob_descriptions[7..],
        ];

        // Each read ends where the next read's first blob starts; the last one
        // ends at the offset passed to `handle_range_end`.
        let expected_ends: [u64; 6] = [
            128 * 1024,
            198 * 1024,
            268 * 1024,
            396 * 1024,
            652 * 1024,
            652 * 1024,
        ];

        let mut planner = VectoredReadPlanner::new(max_read_size);
        for (key, lsn, offset, flag) in blob_descriptions.clone() {
            planner.handle(key, lsn, offset, flag);
        }

        planner.handle_range_end(652 * 1024);

        let reads = planner.finish();
        assert_eq!(reads.len(), 6);

        for (idx, read) in reads.iter().enumerate() {
            validate_read(read, ranges[idx], expected_ends[idx]);
        }
    }

    #[test]
    fn planner_replacement_test() {
        let max_read_size = 128 * 1024;
        let first_key = Key::MIN;
        let second_key = first_key.next();
        let lsn = Lsn(0);

        let blob_descriptions = vec![
            (first_key, lsn, 0, BlobFlag::None),    // First in read 1
            (first_key, lsn, 1024, BlobFlag::None), // Last in read 1
            (second_key, lsn, 2 * 1024, BlobFlag::ReplaceAll),
            (second_key, lsn, 3 * 1024, BlobFlag::None),
            (second_key, lsn, 4 * 1024, BlobFlag::ReplaceAll), // First in read 2
            (second_key, lsn, 5 * 1024, BlobFlag::None),       // Last in read 2
        ];

        let ranges = [&blob_descriptions[0..2], &blob_descriptions[4..]];
        // Read 1 ends where the (later replaced) first blob of `second_key` starts;
        // read 2 ends at the offset passed to `handle_range_end`.
        let expected_ends: [u64; 2] = [2 * 1024, 6 * 1024];

        let mut planner = VectoredReadPlanner::new(max_read_size);
        for (key, lsn, offset, flag) in blob_descriptions.clone() {
            planner.handle(key, lsn, offset, flag);
        }

        planner.handle_range_end(6 * 1024);

        let reads = planner.finish();
        assert_eq!(reads.len(), 2);

        for (idx, read) in reads.iter().enumerate() {
            validate_read(read, ranges[idx], expected_ends[idx]);
        }
    }
}
|