Line data Source code
1 : //!
2 : //! Functions for reading and writing variable-sized "blobs".
3 : //!
4 : //! Each blob begins with a 1- or 4-byte length field, followed by the
5 : //! actual data. If the length is smaller than 128 bytes, the length
6 : //! is written as a one byte. If it's larger than that, the length
7 : //! is written as a four-byte integer, in big-endian, with the high
8 : //! bit set. This way, we can detect whether it's 1- or 4-byte header
9 : //! by peeking at the first byte.
10 : //!
11 : //! len < 128: 0XXXXXXX
12 : //! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
13 : //!
14 : use bytes::{BufMut, BytesMut};
15 : use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
16 :
17 : use crate::context::RequestContext;
18 : use crate::page_cache::PAGE_SZ;
19 : use crate::tenant::block_io::BlockCursor;
20 : use crate::virtual_file::VirtualFile;
21 : use std::cmp::min;
22 : use std::io::{Error, ErrorKind};
23 :
24 : impl<'a> BlockCursor<'a> {
25 : /// Read a blob into a new buffer.
26 3581013 : pub async fn read_blob(
27 3581013 : &self,
28 3581013 : offset: u64,
29 3581013 : ctx: &RequestContext,
30 3581013 : ) -> Result<Vec<u8>, std::io::Error> {
31 3581013 : let mut buf = Vec::new();
32 3581013 : self.read_blob_into_buf(offset, &mut buf, ctx).await?;
33 3581013 : Ok(buf)
34 3581013 : }
35 : /// Read blob into the given buffer. Any previous contents in the buffer
36 : /// are overwritten.
37 6882593 : pub async fn read_blob_into_buf(
38 6882593 : &self,
39 6882593 : offset: u64,
40 6882593 : dstbuf: &mut Vec<u8>,
41 6882593 : ctx: &RequestContext,
42 6882593 : ) -> Result<(), std::io::Error> {
43 6882593 : let mut blknum = (offset / PAGE_SZ as u64) as u32;
44 6882593 : let mut off = (offset % PAGE_SZ as u64) as usize;
45 :
46 6882593 : let mut buf = self.read_blk(blknum, ctx).await?;
47 :
48 : // peek at the first byte, to determine if it's a 1- or 4-byte length
49 6882593 : let first_len_byte = buf[off];
50 6882593 : let len: usize = if first_len_byte < 0x80 {
51 : // 1-byte length header
52 6867801 : off += 1;
53 6867801 : first_len_byte as usize
54 : } else {
55 : // 4-byte length header
56 14792 : let mut len_buf = [0u8; 4];
57 14792 : let thislen = PAGE_SZ - off;
58 14792 : if thislen < 4 {
59 : // it is split across two pages
60 1 : len_buf[..thislen].copy_from_slice(&buf[off..PAGE_SZ]);
61 1 : blknum += 1;
62 1 : buf = self.read_blk(blknum, ctx).await?;
63 1 : len_buf[thislen..].copy_from_slice(&buf[0..4 - thislen]);
64 1 : off = 4 - thislen;
65 14791 : } else {
66 14791 : len_buf.copy_from_slice(&buf[off..off + 4]);
67 14791 : off += 4;
68 14791 : }
69 14792 : len_buf[0] &= 0x7f;
70 14792 : u32::from_be_bytes(len_buf) as usize
71 : };
72 :
73 6882593 : dstbuf.clear();
74 6882593 : dstbuf.reserve(len);
75 6882593 :
76 6882593 : // Read the payload
77 6882593 : let mut remain = len;
78 13845755 : while remain > 0 {
79 6963162 : let mut page_remain = PAGE_SZ - off;
80 6963162 : if page_remain == 0 {
81 : // continue on next page
82 81242 : blknum += 1;
83 81242 : buf = self.read_blk(blknum, ctx).await?;
84 81242 : off = 0;
85 81242 : page_remain = PAGE_SZ;
86 6881920 : }
87 6963162 : let this_blk_len = min(remain, page_remain);
88 6963162 : dstbuf.extend_from_slice(&buf[off..off + this_blk_len]);
89 6963162 : remain -= this_blk_len;
90 6963162 : off += this_blk_len;
91 : }
92 6882593 : Ok(())
93 6882593 : }
94 : }
95 :
96 : /// A wrapper of `VirtualFile` that allows users to write blobs.
97 : ///
98 : /// If a `BlobWriter` is dropped, the internal buffer will be
99 : /// discarded. You need to call [`flush_buffer`](Self::flush_buffer)
100 : /// manually before dropping.
101 : pub struct BlobWriter<const BUFFERED: bool> {
102 : inner: VirtualFile,
103 : offset: u64,
104 : /// A buffer to save on write calls, only used if BUFFERED=true
105 : buf: Vec<u8>,
106 : /// We do tiny writes for the length headers; they need to be in an owned buffer;
107 : io_buf: Option<BytesMut>,
108 : }
109 :
110 : impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
111 966 : pub fn new(inner: VirtualFile, start_offset: u64) -> Self {
112 966 : Self {
113 966 : inner,
114 966 : offset: start_offset,
115 966 : buf: Vec::with_capacity(Self::CAPACITY),
116 966 : io_buf: Some(BytesMut::new()),
117 966 : }
118 966 : }
119 :
120 3030898 : pub fn size(&self) -> u64 {
121 3030898 : self.offset
122 3030898 : }
123 :
124 : const CAPACITY: usize = if BUFFERED { PAGE_SZ } else { 0 };
125 :
126 : /// Writes the given buffer directly to the underlying `VirtualFile`.
127 : /// You need to make sure that the internal buffer is empty, otherwise
128 : /// data will be written in wrong order.
129 : #[inline(always)]
130 11034 : async fn write_all_unbuffered<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
131 11034 : &mut self,
132 11034 : src_buf: B,
133 11034 : ) -> (B::Buf, Result<(), Error>) {
134 11034 : let (src_buf, res) = self.inner.write_all(src_buf).await;
135 11034 : let nbytes = match res {
136 11034 : Ok(nbytes) => nbytes,
137 0 : Err(e) => return (src_buf, Err(e)),
138 : };
139 11034 : self.offset += nbytes as u64;
140 11034 : (src_buf, Ok(()))
141 11034 : }
142 :
143 : #[inline(always)]
144 : /// Flushes the internal buffer to the underlying `VirtualFile`.
145 67990 : pub async fn flush_buffer(&mut self) -> Result<(), Error> {
146 67990 : let buf = std::mem::take(&mut self.buf);
147 67990 : let (mut buf, res) = self.inner.write_all(buf).await;
148 67990 : res?;
149 67990 : buf.clear();
150 67990 : self.buf = buf;
151 67990 : Ok(())
152 67990 : }
153 :
154 : #[inline(always)]
155 : /// Writes as much of `src_buf` into the internal buffer as it fits
156 12777838 : fn write_into_buffer(&mut self, src_buf: &[u8]) -> usize {
157 12777838 : let remaining = Self::CAPACITY - self.buf.len();
158 12777838 : let to_copy = src_buf.len().min(remaining);
159 12777838 : self.buf.extend_from_slice(&src_buf[..to_copy]);
160 12777838 : self.offset += to_copy as u64;
161 12777838 : to_copy
162 12777838 : }
163 :
164 : /// Internal, possibly buffered, write function
165 12723112 : async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
166 12723112 : &mut self,
167 12723112 : src_buf: B,
168 12723112 : ) -> (B::Buf, Result<(), Error>) {
169 12723112 : if !BUFFERED {
170 9700 : assert!(self.buf.is_empty());
171 9700 : return self.write_all_unbuffered(src_buf).await;
172 12713412 : }
173 12713412 : let remaining = Self::CAPACITY - self.buf.len();
174 12713412 : let src_buf_len = src_buf.bytes_init();
175 12713412 : if src_buf_len == 0 {
176 18 : return (Slice::into_inner(src_buf.slice_full()), Ok(()));
177 12713394 : }
178 12713394 : let mut src_buf = src_buf.slice(0..src_buf_len);
179 12713394 : // First try to copy as much as we can into the buffer
180 12713394 : if remaining > 0 {
181 12713394 : let copied = self.write_into_buffer(&src_buf);
182 12713394 : src_buf = src_buf.slice(copied..);
183 12713394 : }
184 : // Then, if the buffer is full, flush it out
185 12713394 : if self.buf.len() == Self::CAPACITY {
186 67128 : if let Err(e) = self.flush_buffer().await {
187 0 : return (Slice::into_inner(src_buf), Err(e));
188 67128 : }
189 12646266 : }
190 : // Finally, write the tail of src_buf:
191 : // If it wholly fits into the buffer without
192 : // completely filling it, then put it there.
193 : // If not, write it out directly.
194 12713394 : let src_buf = if !src_buf.is_empty() {
195 65778 : assert_eq!(self.buf.len(), 0);
196 65778 : if src_buf.len() < Self::CAPACITY {
197 64444 : let copied = self.write_into_buffer(&src_buf);
198 64444 : // We just verified above that src_buf fits into our internal buffer.
199 64444 : assert_eq!(copied, src_buf.len());
200 64444 : Slice::into_inner(src_buf)
201 : } else {
202 1334 : let (src_buf, res) = self.write_all_unbuffered(src_buf).await;
203 1334 : if let Err(e) = res {
204 0 : return (src_buf, Err(e));
205 1334 : }
206 1334 : src_buf
207 : }
208 : } else {
209 12647616 : Slice::into_inner(src_buf)
210 : };
211 12713394 : (src_buf, Ok(()))
212 12723112 : }
213 :
214 : /// Write a blob of data. Returns the offset that it was written to,
215 : /// which can be used to retrieve the data later.
216 6361556 : pub async fn write_blob<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
217 6361556 : &mut self,
218 6361556 : srcbuf: B,
219 6361556 : ) -> (B::Buf, Result<u64, Error>) {
220 6361556 : let offset = self.offset;
221 6361556 :
222 6361556 : let len = srcbuf.bytes_init();
223 6361556 :
224 6361556 : let mut io_buf = self.io_buf.take().expect("we always put it back below");
225 6361556 : io_buf.clear();
226 6361556 : let (io_buf, hdr_res) = async {
227 6361556 : if len < 128 {
228 : // Short blob. Write a 1-byte length header
229 6349310 : io_buf.put_u8(len as u8);
230 6349310 : self.write_all(io_buf).await
231 : } else {
232 : // Write a 4-byte length header
233 12246 : if len > 0x7fff_ffff {
234 0 : return (
235 0 : io_buf,
236 0 : Err(Error::new(
237 0 : ErrorKind::Other,
238 0 : format!("blob too large ({} bytes)", len),
239 0 : )),
240 0 : );
241 12246 : }
242 12246 : let mut len_buf = (len as u32).to_be_bytes();
243 12246 : len_buf[0] |= 0x80;
244 12246 : io_buf.extend_from_slice(&len_buf[..]);
245 12246 : self.write_all(io_buf).await
246 : }
247 6361556 : }
248 2824 : .await;
249 6361556 : self.io_buf = Some(io_buf);
250 6361556 : match hdr_res {
251 6361556 : Ok(_) => (),
252 0 : Err(e) => return (Slice::into_inner(srcbuf.slice(..)), Err(e)),
253 : }
254 6361556 : let (srcbuf, res) = self.write_all(srcbuf).await;
255 6361556 : (srcbuf, res.map(|_| offset))
256 6361556 : }
257 : }
258 :
259 : impl BlobWriter<true> {
260 : /// Access the underlying `VirtualFile`.
261 : ///
262 : /// This function flushes the internal buffer before giving access
263 : /// to the underlying `VirtualFile`.
264 838 : pub async fn into_inner(mut self) -> Result<VirtualFile, Error> {
265 838 : self.flush_buffer().await?;
266 838 : Ok(self.inner)
267 838 : }
268 :
269 : /// Access the underlying `VirtualFile`.
270 : ///
271 : /// Unlike [`into_inner`](Self::into_inner), this doesn't flush
272 : /// the internal buffer before giving access.
273 0 : pub fn into_inner_no_flush(self) -> VirtualFile {
274 0 : self.inner
275 0 : }
276 : }
277 :
278 : impl BlobWriter<false> {
279 : /// Access the underlying `VirtualFile`.
280 104 : pub fn into_inner(self) -> VirtualFile {
281 104 : self.inner
282 104 : }
283 : }
284 :
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{context::DownloadBehavior, task_mgr::TaskKind, tenant::block_io::BlockReaderRef};
    use rand::{Rng, SeedableRng};

    /// Writes `blobs` into a temporary file with a `BlobWriter::<BUFFERED>`.
    /// It then reads each blob back through a `BlockCursor` at the offset
    /// `write_blob` returned and asserts that it matches.
    async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
        let temp_dir = camino_tempfile::tempdir()?;
        let pathbuf = temp_dir.path().join("file");
        let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);

        // Write part (in block to drop the file)
        let mut offsets = Vec::new();
        {
            let file = VirtualFile::create(pathbuf.as_path()).await?;
            let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
            for blob in blobs.iter() {
                let (_, res) = wtr.write_blob(blob.clone()).await;
                let offs = res?;
                offsets.push(offs);
            }
            // Write out one page worth of zeros so that we can
            // read again with read_blk
            let (_, res) = wtr.write_blob(vec![0; PAGE_SZ]).await;
            let offs = res?;
            println!("Writing final blob at offs={offs}");
            wtr.flush_buffer().await?;
        }

        let file = VirtualFile::open(pathbuf.as_path()).await?;
        let rdr = BlockReaderRef::VirtualFile(&file);
        let rdr = BlockCursor::new(rdr);
        for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
            let blob_read = rdr.read_blob(*offset, &ctx).await?;
            assert_eq!(
                blob, &blob_read,
                "mismatch for idx={idx} at offset={offset}"
            );
        }
        Ok(())
    }

    /// Returns `len` random bytes.
    fn random_array(len: usize) -> Vec<u8> {
        let mut rng = rand::thread_rng();
        (0..len).map(|_| rng.gen()).collect::<_>()
    }

    #[tokio::test]
    async fn test_one() -> Result<(), Error> {
        let blobs = &[vec![12, 21, 22]];
        round_trip_test::<false>(blobs).await?;
        round_trip_test::<true>(blobs).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_hello_simple() -> Result<(), Error> {
        let blobs = &[
            vec![0, 1, 2, 3],
            b"Hello, World!".to_vec(),
            Vec::new(),
            b"foobar".to_vec(),
        ];
        round_trip_test::<false>(blobs).await?;
        round_trip_test::<true>(blobs).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_really_big_array() -> Result<(), Error> {
        let blobs = &[
            b"test".to_vec(),
            random_array(10 * PAGE_SZ),
            b"foobar".to_vec(),
        ];
        round_trip_test::<false>(blobs).await?;
        round_trip_test::<true>(blobs).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_arrays_inc() -> Result<(), Error> {
        let blobs = (0..PAGE_SZ / 8)
            .map(|v| random_array(v * 16))
            .collect::<Vec<_>>();
        round_trip_test::<false>(&blobs).await?;
        round_trip_test::<true>(&blobs).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_arrays_random_size() -> Result<(), Error> {
        let mut rng = rand::rngs::StdRng::seed_from_u64(42);
        let blobs = (0..1024)
            .map(|_| {
                let mut sz: u16 = rng.gen();
                // Make 50% of the arrays small
                if rng.gen() {
                    sz &= 63;
                }
                random_array(sz.into())
            })
            .collect::<Vec<_>>();
        round_trip_test::<false>(&blobs).await?;
        round_trip_test::<true>(&blobs).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_arrays_page_boundary() -> Result<(), Error> {
        let blobs = &[
            random_array(PAGE_SZ - 4),
            random_array(PAGE_SZ - 4),
            random_array(PAGE_SZ - 4),
        ];
        round_trip_test::<false>(blobs).await?;
        round_trip_test::<true>(blobs).await?;
        Ok(())
    }

    /// Pins the switch between the 1-byte (len < 128) and 4-byte (len >= 128)
    /// length headers on both sides of the boundary.
    #[tokio::test]
    async fn test_len_header_width_boundary() -> Result<(), Error> {
        let blobs = &[
            random_array(127),
            random_array(128),
            random_array(127),
            random_array(129),
        ];
        round_trip_test::<false>(blobs).await?;
        round_trip_test::<true>(blobs).await?;
        Ok(())
    }

    /// Deterministically exercises a 4-byte length header that is split
    /// across two pages. The first blob occupies exactly `PAGE_SZ - 2` bytes
    /// (4-byte header plus `PAGE_SZ - 6` bytes of payload), so the second
    /// blob's 4-byte header starts two bytes before the page boundary.
    /// This assumes `PAGE_SZ - 6 >= 128`, so that the first blob itself uses
    /// a 4-byte header.
    #[tokio::test]
    async fn test_len_header_split_across_pages() -> Result<(), Error> {
        let blobs = &[
            random_array(PAGE_SZ - 6),
            random_array(PAGE_SZ),
            b"tail".to_vec(),
        ];
        round_trip_test::<false>(blobs).await?;
        round_trip_test::<true>(blobs).await?;
        Ok(())
    }
}
|