Line data Source code
1 : //!
2 : //! Functions for reading and writing variable-sized "blobs".
3 : //!
//! Each blob begins with a 1- or 4-byte length field, followed by the
//! actual data. If the length is smaller than 128 bytes, the length
//! is written as a single byte. If it is 128 bytes or larger, the length
//! is written as a four-byte big-endian integer with the high bit set.
//! This way, we can tell whether a blob has a 1- or 4-byte header
//! by peeking at the first byte.
10 : //!
11 : //! len < 128: 0XXXXXXX
12 : //! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
13 : //!
14 : use bytes::{BufMut, BytesMut};
15 : use tokio_epoll_uring::{BoundedBuf, Slice};
16 :
17 : use crate::context::RequestContext;
18 : use crate::page_cache::PAGE_SZ;
19 : use crate::tenant::block_io::BlockCursor;
20 : use crate::virtual_file::VirtualFile;
21 : use std::cmp::min;
22 : use std::io::{Error, ErrorKind};
23 :
24 : impl<'a> BlockCursor<'a> {
25 : /// Read a blob into a new buffer.
26 2560807 : pub async fn read_blob(
27 2560807 : &self,
28 2560807 : offset: u64,
29 2560807 : ctx: &RequestContext,
30 2560807 : ) -> Result<Vec<u8>, std::io::Error> {
31 2560807 : let mut buf = Vec::new();
32 2560807 : self.read_blob_into_buf(offset, &mut buf, ctx).await?;
33 2560807 : Ok(buf)
34 2560807 : }
35 : /// Read blob into the given buffer. Any previous contents in the buffer
36 : /// are overwritten.
37 4842344 : pub async fn read_blob_into_buf(
38 4842344 : &self,
39 4842344 : offset: u64,
40 4842344 : dstbuf: &mut Vec<u8>,
41 4842344 : ctx: &RequestContext,
42 4842344 : ) -> Result<(), std::io::Error> {
43 4842344 : let mut blknum = (offset / PAGE_SZ as u64) as u32;
44 4842344 : let mut off = (offset % PAGE_SZ as u64) as usize;
45 :
46 4842344 : let mut buf = self.read_blk(blknum, ctx).await?;
47 :
48 : // peek at the first byte, to determine if it's a 1- or 4-byte length
49 4842344 : let first_len_byte = buf[off];
50 4842344 : let len: usize = if first_len_byte < 0x80 {
51 : // 1-byte length header
52 4827552 : off += 1;
53 4827552 : first_len_byte as usize
54 : } else {
55 : // 4-byte length header
56 14792 : let mut len_buf = [0u8; 4];
57 14792 : let thislen = PAGE_SZ - off;
58 14792 : if thislen < 4 {
59 : // it is split across two pages
60 1 : len_buf[..thislen].copy_from_slice(&buf[off..PAGE_SZ]);
61 1 : blknum += 1;
62 1 : buf = self.read_blk(blknum, ctx).await?;
63 1 : len_buf[thislen..].copy_from_slice(&buf[0..4 - thislen]);
64 1 : off = 4 - thislen;
65 14791 : } else {
66 14791 : len_buf.copy_from_slice(&buf[off..off + 4]);
67 14791 : off += 4;
68 14791 : }
69 14792 : len_buf[0] &= 0x7f;
70 14792 : u32::from_be_bytes(len_buf) as usize
71 : };
72 :
73 4842344 : dstbuf.clear();
74 4842344 : dstbuf.reserve(len);
75 4842344 :
76 4842344 : // Read the payload
77 4842344 : let mut remain = len;
78 9746715 : while remain > 0 {
79 4904371 : let mut page_remain = PAGE_SZ - off;
80 4904371 : if page_remain == 0 {
81 : // continue on next page
82 62495 : blknum += 1;
83 62495 : buf = self.read_blk(blknum, ctx).await?;
84 62495 : off = 0;
85 62495 : page_remain = PAGE_SZ;
86 4841876 : }
87 4904371 : let this_blk_len = min(remain, page_remain);
88 4904371 : dstbuf.extend_from_slice(&buf[off..off + this_blk_len]);
89 4904371 : remain -= this_blk_len;
90 4904371 : off += this_blk_len;
91 : }
92 4842344 : Ok(())
93 4842344 : }
94 : }
95 :
/// A wrapper of `VirtualFile` that allows users to write blobs.
///
/// Blobs are framed with the 1- or 4-byte length header described in the
/// module documentation. With `BUFFERED = true`, writes are staged in an
/// in-memory buffer of one page (`PAGE_SZ` bytes) that is written out when it
/// fills up. With `BUFFERED = false`, every write goes straight to the file.
///
/// If a `BlobWriter` is dropped, the internal buffer will be
/// discarded. You need to call [`flush_buffer`](Self::flush_buffer)
/// manually before dropping.
pub struct BlobWriter<const BUFFERED: bool> {
    /// The file the blobs are written to.
    inner: VirtualFile,
    /// Logical offset of the next byte to be written. This already counts
    /// bytes that are still held in `buf` and have not reached `inner` yet.
    offset: u64,
    /// A buffer to save on write calls, only used if BUFFERED=true
    buf: Vec<u8>,
    /// We do tiny writes for the length headers; they need to be in an owned buffer.
    /// Always `Some` between calls: `write_blob` takes it out and puts it back.
    io_buf: Option<BytesMut>,
}
109 :
110 : impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
111 578 : pub fn new(inner: VirtualFile, start_offset: u64) -> Self {
112 578 : Self {
113 578 : inner,
114 578 : offset: start_offset,
115 578 : buf: Vec::with_capacity(Self::CAPACITY),
116 578 : io_buf: Some(BytesMut::new()),
117 578 : }
118 578 : }
119 :
120 2010524 : pub fn size(&self) -> u64 {
121 2010524 : self.offset
122 2010524 : }
123 :
124 : const CAPACITY: usize = if BUFFERED { PAGE_SZ } else { 0 };
125 :
126 : /// Writes the given buffer directly to the underlying `VirtualFile`.
127 : /// You need to make sure that the internal buffer is empty, otherwise
128 : /// data will be written in wrong order.
129 : #[inline(always)]
130 10566 : async fn write_all_unbuffered<B: BoundedBuf>(
131 10566 : &mut self,
132 10566 : src_buf: B,
133 10566 : ) -> (B::Buf, Result<(), Error>) {
134 10566 : let (src_buf, res) = self.inner.write_all(src_buf).await;
135 10566 : let nbytes = match res {
136 10566 : Ok(nbytes) => nbytes,
137 0 : Err(e) => return (src_buf, Err(e)),
138 : };
139 10566 : self.offset += nbytes as u64;
140 10566 : (src_buf, Ok(()))
141 10566 : }
142 :
143 : #[inline(always)]
144 : /// Flushes the internal buffer to the underlying `VirtualFile`.
145 48560 : pub async fn flush_buffer(&mut self) -> Result<(), Error> {
146 48560 : let buf = std::mem::take(&mut self.buf);
147 48560 : let (mut buf, res) = self.inner.write_all(buf).await;
148 48560 : res?;
149 48560 : buf.clear();
150 48560 : self.buf = buf;
151 48560 : Ok(())
152 48560 : }
153 :
154 : #[inline(always)]
155 : /// Writes as much of `src_buf` into the internal buffer as it fits
156 8677198 : fn write_into_buffer(&mut self, src_buf: &[u8]) -> usize {
157 8677198 : let remaining = Self::CAPACITY - self.buf.len();
158 8677198 : let to_copy = src_buf.len().min(remaining);
159 8677198 : self.buf.extend_from_slice(&src_buf[..to_copy]);
160 8677198 : self.offset += to_copy as u64;
161 8677198 : to_copy
162 8677198 : }
163 :
164 : /// Internal, possibly buffered, write function
165 8640636 : async fn write_all<B: BoundedBuf>(&mut self, src_buf: B) -> (B::Buf, Result<(), Error>) {
166 8640636 : if !BUFFERED {
167 9444 : assert!(self.buf.is_empty());
168 9444 : return self.write_all_unbuffered(src_buf).await;
169 8631192 : }
170 8631192 : let remaining = Self::CAPACITY - self.buf.len();
171 8631192 : let src_buf_len = src_buf.bytes_init();
172 8631192 : if src_buf_len == 0 {
173 18 : return (Slice::into_inner(src_buf.slice_full()), Ok(()));
174 8631174 : }
175 8631174 : let mut src_buf = src_buf.slice(0..src_buf_len);
176 8631174 : // First try to copy as much as we can into the buffer
177 8631174 : if remaining > 0 {
178 8631174 : let copied = self.write_into_buffer(&src_buf);
179 8631174 : src_buf = src_buf.slice(copied..);
180 8631174 : }
181 : // Then, if the buffer is full, flush it out
182 8631174 : if self.buf.len() == Self::CAPACITY {
183 48056 : if let Err(e) = self.flush_buffer().await {
184 0 : return (Slice::into_inner(src_buf), Err(e));
185 48056 : }
186 8583118 : }
187 : // Finally, write the tail of src_buf:
188 : // If it wholly fits into the buffer without
189 : // completely filling it, then put it there.
190 : // If not, write it out directly.
191 8631174 : let src_buf = if !src_buf.is_empty() {
192 47146 : assert_eq!(self.buf.len(), 0);
193 47146 : if src_buf.len() < Self::CAPACITY {
194 46024 : let copied = self.write_into_buffer(&src_buf);
195 46024 : // We just verified above that src_buf fits into our internal buffer.
196 46024 : assert_eq!(copied, src_buf.len());
197 46024 : Slice::into_inner(src_buf)
198 : } else {
199 1122 : let (src_buf, res) = self.write_all_unbuffered(src_buf).await;
200 1122 : if let Err(e) = res {
201 0 : return (src_buf, Err(e));
202 1122 : }
203 1122 : src_buf
204 : }
205 : } else {
206 8584028 : Slice::into_inner(src_buf)
207 : };
208 8631174 : (src_buf, Ok(()))
209 8640636 : }
210 :
211 : /// Write a blob of data. Returns the offset that it was written to,
212 : /// which can be used to retrieve the data later.
213 4320318 : pub async fn write_blob<B: BoundedBuf>(&mut self, srcbuf: B) -> (B::Buf, Result<u64, Error>) {
214 4320318 : let offset = self.offset;
215 4320318 :
216 4320318 : let len = srcbuf.bytes_init();
217 4320318 :
218 4320318 : let mut io_buf = self.io_buf.take().expect("we always put it back below");
219 4320318 : io_buf.clear();
220 4320318 : let (io_buf, hdr_res) = async {
221 4320318 : if len < 128 {
222 : // Short blob. Write a 1-byte length header
223 4308722 : io_buf.put_u8(len as u8);
224 4308722 : self.write_all(io_buf).await
225 : } else {
226 : // Write a 4-byte length header
227 11596 : if len > 0x7fff_ffff {
228 0 : return (
229 0 : io_buf,
230 0 : Err(Error::new(
231 0 : ErrorKind::Other,
232 0 : format!("blob too large ({} bytes)", len),
233 0 : )),
234 0 : );
235 11596 : }
236 11596 : let mut len_buf = (len as u32).to_be_bytes();
237 11596 : len_buf[0] |= 0x80;
238 11596 : io_buf.extend_from_slice(&len_buf[..]);
239 11596 : self.write_all(io_buf).await
240 : }
241 4320318 : }
242 26 : .await;
243 4320318 : self.io_buf = Some(io_buf);
244 4320318 : match hdr_res {
245 4320318 : Ok(_) => (),
246 0 : Err(e) => return (Slice::into_inner(srcbuf.slice(..)), Err(e)),
247 : }
248 4320318 : let (srcbuf, res) = self.write_all(srcbuf).await;
249 4320318 : (srcbuf, res.map(|_| offset))
250 4320318 : }
251 : }
252 :
253 : impl BlobWriter<true> {
254 : /// Access the underlying `VirtualFile`.
255 : ///
256 : /// This function flushes the internal buffer before giving access
257 : /// to the underlying `VirtualFile`.
258 480 : pub async fn into_inner(mut self) -> Result<VirtualFile, Error> {
259 480 : self.flush_buffer().await?;
260 480 : Ok(self.inner)
261 480 : }
262 :
263 : /// Access the underlying `VirtualFile`.
264 : ///
265 : /// Unlike [`into_inner`](Self::into_inner), this doesn't flush
266 : /// the internal buffer before giving access.
267 0 : pub fn into_inner_no_flush(self) -> VirtualFile {
268 0 : self.inner
269 0 : }
270 : }
271 :
272 : impl BlobWriter<false> {
273 : /// Access the underlying `VirtualFile`.
274 74 : pub fn into_inner(self) -> VirtualFile {
275 74 : self.inner
276 74 : }
277 : }
278 :
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{context::DownloadBehavior, task_mgr::TaskKind, tenant::block_io::BlockReaderRef};
    use rand::{Rng, SeedableRng};

    /// Writes every blob to a fresh temporary file through a
    /// `BlobWriter::<BUFFERED>`, then reads each one back at the offset
    /// `write_blob` returned for it and checks that the bytes match.
    async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
        let temp_dir = camino_tempfile::tempdir()?;
        let path = temp_dir.path().join("file");
        let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);

        // Writing happens in its own scope so the writer, and with it the
        // file, is dropped before the file is reopened for reading.
        let offsets: Vec<u64> = {
            let file = VirtualFile::create(path.as_path()).await?;
            let mut writer = BlobWriter::<BUFFERED>::new(file, 0);
            let mut written_at = Vec::with_capacity(blobs.len());
            for blob in blobs {
                let (_, res) = writer.write_blob(blob.clone()).await;
                written_at.push(res?);
            }
            // Append one page worth of zeros so that the pages holding the
            // blobs above can be read back with read_blk.
            let (_, res) = writer.write_blob(vec![0; PAGE_SZ]).await;
            let offs = res?;
            println!("Writing final blob at offs={offs}");
            writer.flush_buffer().await?;
            written_at
        };

        let file = VirtualFile::open(path.as_path()).await?;
        let cursor = BlockCursor::new(BlockReaderRef::VirtualFile(&file));
        for (idx, (expected, offset)) in blobs.iter().zip(&offsets).enumerate() {
            let actual = cursor.read_blob(*offset, &ctx).await?;
            assert_eq!(
                expected, &actual,
                "mismatch for idx={idx} at offset={offset}"
            );
        }
        Ok(())
    }

    /// Runs [`round_trip_test`] with the unbuffered writer, then with the buffered one.
    async fn round_trip_both(blobs: &[Vec<u8>]) -> Result<(), Error> {
        round_trip_test::<false>(blobs).await?;
        round_trip_test::<true>(blobs).await
    }

    /// Returns `len` random bytes.
    fn random_array(len: usize) -> Vec<u8> {
        let mut rng = rand::thread_rng();
        std::iter::repeat_with(|| rng.gen()).take(len).collect()
    }

    #[tokio::test]
    async fn test_one() -> Result<(), Error> {
        let blobs = [vec![12u8, 21, 22]];
        round_trip_both(&blobs).await
    }

    #[tokio::test]
    async fn test_hello_simple() -> Result<(), Error> {
        let blobs = [
            vec![0, 1, 2, 3],
            b"Hello, World!".to_vec(),
            Vec::new(),
            b"foobar".to_vec(),
        ];
        round_trip_both(&blobs).await
    }

    #[tokio::test]
    async fn test_really_big_array() -> Result<(), Error> {
        let blobs = [
            b"test".to_vec(),
            random_array(10 * PAGE_SZ),
            b"foobar".to_vec(),
        ];
        round_trip_both(&blobs).await
    }

    #[tokio::test]
    async fn test_arrays_inc() -> Result<(), Error> {
        let blobs: Vec<Vec<u8>> = (0..PAGE_SZ / 8).map(|v| random_array(v * 16)).collect();
        round_trip_both(&blobs).await
    }

    #[tokio::test]
    async fn test_arrays_random_size() -> Result<(), Error> {
        let mut rng = rand::rngs::StdRng::seed_from_u64(42);
        let blobs: Vec<Vec<u8>> = (0..1024)
            .map(|_| {
                let full: u16 = rng.gen();
                // Make 50% of the arrays small
                let small: bool = rng.gen();
                let sz = if small { full & 63 } else { full };
                random_array(usize::from(sz))
            })
            .collect();
        round_trip_both(&blobs).await
    }

    #[tokio::test]
    async fn test_arrays_page_boundary() -> Result<(), Error> {
        let blobs: Vec<Vec<u8>> = std::iter::repeat_with(|| random_array(PAGE_SZ - 4))
            .take(3)
            .collect();
        round_trip_both(&blobs).await
    }
}
|