Line data Source code
1 : //!
2 : //! Functions for reading and writing variable-sized "blobs".
3 : //!
//! Each blob begins with a 1- or 4-byte length field, followed by the
//! actual data. If the length is smaller than 128 bytes, the length
//! is written as a single byte. If it is 128 bytes or larger, the length
//! is written as a four-byte big-endian integer with the high bit set
//! (so the largest representable blob is 2^31 - 1 bytes). This way, we can
//! detect whether it's a 1- or 4-byte header by peeking at the first byte.
10 : //!
11 : //! len < 128: 0XXXXXXX
12 : //! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
13 : //!
14 : use bytes::{BufMut, BytesMut};
15 : use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
16 :
17 : use crate::context::RequestContext;
18 : use crate::page_cache::PAGE_SZ;
19 : use crate::tenant::block_io::BlockCursor;
20 : use crate::virtual_file::VirtualFile;
21 : use std::cmp::min;
22 : use std::io::{Error, ErrorKind};
23 :
24 : impl<'a> BlockCursor<'a> {
25 : /// Read a blob into a new buffer.
26 3581206 : pub async fn read_blob(
27 3581206 : &self,
28 3581206 : offset: u64,
29 3581206 : ctx: &RequestContext,
30 3581206 : ) -> Result<Vec<u8>, std::io::Error> {
31 3581206 : let mut buf = Vec::new();
32 3581206 : self.read_blob_into_buf(offset, &mut buf, ctx).await?;
33 3581206 : Ok(buf)
34 3581206 : }
35 : /// Read blob into the given buffer. Any previous contents in the buffer
36 : /// are overwritten.
37 6882731 : pub async fn read_blob_into_buf(
38 6882731 : &self,
39 6882731 : offset: u64,
40 6882731 : dstbuf: &mut Vec<u8>,
41 6882731 : ctx: &RequestContext,
42 6882731 : ) -> Result<(), std::io::Error> {
43 6882731 : let mut blknum = (offset / PAGE_SZ as u64) as u32;
44 6882731 : let mut off = (offset % PAGE_SZ as u64) as usize;
45 :
46 6882731 : let mut buf = self.read_blk(blknum, ctx).await?;
47 :
48 : // peek at the first byte, to determine if it's a 1- or 4-byte length
49 6882731 : let first_len_byte = buf[off];
50 6882731 : let len: usize = if first_len_byte < 0x80 {
51 : // 1-byte length header
52 6867929 : off += 1;
53 6867929 : first_len_byte as usize
54 : } else {
55 : // 4-byte length header
56 14802 : let mut len_buf = [0u8; 4];
57 14802 : let thislen = PAGE_SZ - off;
58 14802 : if thislen < 4 {
59 : // it is split across two pages
60 3 : len_buf[..thislen].copy_from_slice(&buf[off..PAGE_SZ]);
61 3 : blknum += 1;
62 3 : buf = self.read_blk(blknum, ctx).await?;
63 3 : len_buf[thislen..].copy_from_slice(&buf[0..4 - thislen]);
64 3 : off = 4 - thislen;
65 14799 : } else {
66 14799 : len_buf.copy_from_slice(&buf[off..off + 4]);
67 14799 : off += 4;
68 14799 : }
69 14802 : len_buf[0] &= 0x7f;
70 14802 : u32::from_be_bytes(len_buf) as usize
71 : };
72 :
73 6882731 : dstbuf.clear();
74 6882731 : dstbuf.reserve(len);
75 6882731 :
76 6882731 : // Read the payload
77 6882731 : let mut remain = len;
78 13846060 : while remain > 0 {
79 6963329 : let mut page_remain = PAGE_SZ - off;
80 6963329 : if page_remain == 0 {
81 : // continue on next page
82 81278 : blknum += 1;
83 81278 : buf = self.read_blk(blknum, ctx).await?;
84 81278 : off = 0;
85 81278 : page_remain = PAGE_SZ;
86 6882051 : }
87 6963329 : let this_blk_len = min(remain, page_remain);
88 6963329 : dstbuf.extend_from_slice(&buf[off..off + this_blk_len]);
89 6963329 : remain -= this_blk_len;
90 6963329 : off += this_blk_len;
91 : }
92 6882731 : Ok(())
93 6882731 : }
94 : }
95 :
96 : /// A wrapper of `VirtualFile` that allows users to write blobs.
97 : ///
98 : /// If a `BlobWriter` is dropped, the internal buffer will be
99 : /// discarded. You need to call [`flush_buffer`](Self::flush_buffer)
100 : /// manually before dropping.
101 : pub struct BlobWriter<const BUFFERED: bool> {
102 : inner: VirtualFile,
103 : offset: u64,
104 : /// A buffer to save on write calls, only used if BUFFERED=true
105 : buf: Vec<u8>,
106 : /// We do tiny writes for the length headers; they need to be in an owned buffer;
107 : io_buf: Option<BytesMut>,
108 : }
109 :
110 : impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
111 984 : pub fn new(inner: VirtualFile, start_offset: u64) -> Self {
112 984 : Self {
113 984 : inner,
114 984 : offset: start_offset,
115 984 : buf: Vec::with_capacity(Self::CAPACITY),
116 984 : io_buf: Some(BytesMut::new()),
117 984 : }
118 984 : }
119 :
120 3030916 : pub fn size(&self) -> u64 {
121 3030916 : self.offset
122 3030916 : }
123 :
124 : const CAPACITY: usize = if BUFFERED { PAGE_SZ } else { 0 };
125 :
126 : /// Writes the given buffer directly to the underlying `VirtualFile`.
127 : /// You need to make sure that the internal buffer is empty, otherwise
128 : /// data will be written in wrong order.
129 : #[inline(always)]
130 11104 : async fn write_all_unbuffered<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
131 11104 : &mut self,
132 11104 : src_buf: B,
133 11104 : ) -> (B::Buf, Result<(), Error>) {
134 11104 : let (src_buf, res) = self.inner.write_all(src_buf).await;
135 11104 : let nbytes = match res {
136 11104 : Ok(nbytes) => nbytes,
137 0 : Err(e) => return (src_buf, Err(e)),
138 : };
139 11104 : self.offset += nbytes as u64;
140 11104 : (src_buf, Ok(()))
141 11104 : }
142 :
143 : #[inline(always)]
144 : /// Flushes the internal buffer to the underlying `VirtualFile`.
145 68010 : pub async fn flush_buffer(&mut self) -> Result<(), Error> {
146 68010 : let buf = std::mem::take(&mut self.buf);
147 68010 : let (mut buf, res) = self.inner.write_all(buf).await;
148 68010 : res?;
149 68010 : buf.clear();
150 68010 : self.buf = buf;
151 68010 : Ok(())
152 68010 : }
153 :
154 : #[inline(always)]
155 : /// Writes as much of `src_buf` into the internal buffer as it fits
156 12777930 : fn write_into_buffer(&mut self, src_buf: &[u8]) -> usize {
157 12777930 : let remaining = Self::CAPACITY - self.buf.len();
158 12777930 : let to_copy = src_buf.len().min(remaining);
159 12777930 : self.buf.extend_from_slice(&src_buf[..to_copy]);
160 12777930 : self.offset += to_copy as u64;
161 12777930 : to_copy
162 12777930 : }
163 :
164 : /// Internal, possibly buffered, write function
165 12723268 : async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
166 12723268 : &mut self,
167 12723268 : src_buf: B,
168 12723268 : ) -> (B::Buf, Result<(), Error>) {
169 12723268 : if !BUFFERED {
170 9764 : assert!(self.buf.is_empty());
171 9764 : return self.write_all_unbuffered(src_buf).await;
172 12713504 : }
173 12713504 : let remaining = Self::CAPACITY - self.buf.len();
174 12713504 : let src_buf_len = src_buf.bytes_init();
175 12713504 : if src_buf_len == 0 {
176 18 : return (Slice::into_inner(src_buf.slice_full()), Ok(()));
177 12713486 : }
178 12713486 : let mut src_buf = src_buf.slice(0..src_buf_len);
179 12713486 : // First try to copy as much as we can into the buffer
180 12713486 : if remaining > 0 {
181 12713486 : let copied = self.write_into_buffer(&src_buf);
182 12713486 : src_buf = src_buf.slice(copied..);
183 12713486 : }
184 : // Then, if the buffer is full, flush it out
185 12713486 : if self.buf.len() == Self::CAPACITY {
186 67134 : if let Err(e) = self.flush_buffer().await {
187 0 : return (Slice::into_inner(src_buf), Err(e));
188 67134 : }
189 12646352 : }
190 : // Finally, write the tail of src_buf:
191 : // If it wholly fits into the buffer without
192 : // completely filling it, then put it there.
193 : // If not, write it out directly.
194 12713486 : let src_buf = if !src_buf.is_empty() {
195 65784 : assert_eq!(self.buf.len(), 0);
196 65784 : if src_buf.len() < Self::CAPACITY {
197 64444 : let copied = self.write_into_buffer(&src_buf);
198 64444 : // We just verified above that src_buf fits into our internal buffer.
199 64444 : assert_eq!(copied, src_buf.len());
200 64444 : Slice::into_inner(src_buf)
201 : } else {
202 1340 : let (src_buf, res) = self.write_all_unbuffered(src_buf).await;
203 1340 : if let Err(e) = res {
204 0 : return (src_buf, Err(e));
205 1340 : }
206 1340 : src_buf
207 : }
208 : } else {
209 12647702 : Slice::into_inner(src_buf)
210 : };
211 12713486 : (src_buf, Ok(()))
212 12723268 : }
213 :
214 : /// Write a blob of data. Returns the offset that it was written to,
215 : /// which can be used to retrieve the data later.
216 6361634 : pub async fn write_blob<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
217 6361634 : &mut self,
218 6361634 : srcbuf: B,
219 6361634 : ) -> (B::Buf, Result<u64, Error>) {
220 6361634 : let offset = self.offset;
221 6361634 :
222 6361634 : let len = srcbuf.bytes_init();
223 6361634 :
224 6361634 : let mut io_buf = self.io_buf.take().expect("we always put it back below");
225 6361634 : io_buf.clear();
226 6361634 : let (io_buf, hdr_res) = async {
227 6361634 : if len < 128 {
228 : // Short blob. Write a 1-byte length header
229 6349382 : io_buf.put_u8(len as u8);
230 6349382 : self.write_all(io_buf).await
231 : } else {
232 : // Write a 4-byte length header
233 12252 : if len > 0x7fff_ffff {
234 0 : return (
235 0 : io_buf,
236 0 : Err(Error::new(
237 0 : ErrorKind::Other,
238 0 : format!("blob too large ({} bytes)", len),
239 0 : )),
240 0 : );
241 12252 : }
242 12252 : let mut len_buf = (len as u32).to_be_bytes();
243 12252 : len_buf[0] |= 0x80;
244 12252 : io_buf.extend_from_slice(&len_buf[..]);
245 12252 : self.write_all(io_buf).await
246 : }
247 6361634 : }
248 2841 : .await;
249 6361634 : self.io_buf = Some(io_buf);
250 6361634 : match hdr_res {
251 6361634 : Ok(_) => (),
252 0 : Err(e) => return (Slice::into_inner(srcbuf.slice(..)), Err(e)),
253 : }
254 6361634 : let (srcbuf, res) = self.write_all(srcbuf).await;
255 6361634 : (srcbuf, res.map(|_| offset))
256 6361634 : }
257 : }
258 :
259 : impl BlobWriter<true> {
260 : /// Access the underlying `VirtualFile`.
261 : ///
262 : /// This function flushes the internal buffer before giving access
263 : /// to the underlying `VirtualFile`.
264 852 : pub async fn into_inner(mut self) -> Result<VirtualFile, Error> {
265 852 : self.flush_buffer().await?;
266 852 : Ok(self.inner)
267 852 : }
268 :
269 : /// Access the underlying `VirtualFile`.
270 : ///
271 : /// Unlike [`into_inner`](Self::into_inner), this doesn't flush
272 : /// the internal buffer before giving access.
273 0 : pub fn into_inner_no_flush(self) -> VirtualFile {
274 0 : self.inner
275 0 : }
276 : }
277 :
278 : impl BlobWriter<false> {
279 : /// Access the underlying `VirtualFile`.
280 108 : pub fn into_inner(self) -> VirtualFile {
281 108 : self.inner
282 108 : }
283 : }
284 :
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{context::DownloadBehavior, task_mgr::TaskKind, tenant::block_io::BlockReaderRef};
    use rand::{Rng, SeedableRng};

    /// Writes `blobs` with a `BlobWriter<BUFFERED>` into a temporary file, then
    /// reads each one back at its returned offset and checks it matches.
    async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
        let temp_dir = camino_tempfile::tempdir()?;
        let pathbuf = temp_dir.path().join("file");
        let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);

        // Write part (in block to drop the file)
        let mut offsets = Vec::new();
        {
            let file = VirtualFile::create(pathbuf.as_path()).await?;
            let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
            for blob in blobs.iter() {
                let (_, res) = wtr.write_blob(blob.clone()).await;
                let offs = res?;
                offsets.push(offs);
            }
            // Write out one page worth of zeros so that we can
            // read again with read_blk
            let (_, res) = wtr.write_blob(vec![0; PAGE_SZ]).await;
            let offs = res?;
            println!("Writing final blob at offs={offs}");
            wtr.flush_buffer().await?;
        }

        let file = VirtualFile::open(pathbuf.as_path()).await?;
        let rdr = BlockReaderRef::VirtualFile(&file);
        let rdr = BlockCursor::new(rdr);
        for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
            let blob_read = rdr.read_blob(*offset, &ctx).await?;
            assert_eq!(
                blob, &blob_read,
                "mismatch for idx={idx} at offset={offset}"
            );
        }
        Ok(())
    }

    /// Returns `len` random bytes.
    fn random_array(len: usize) -> Vec<u8> {
        let mut rng = rand::thread_rng();
        (0..len).map(|_| rng.gen()).collect::<_>()
    }

    #[tokio::test]
    async fn test_one() -> Result<(), Error> {
        let blobs = &[vec![12, 21, 22]];
        round_trip_test::<false>(blobs).await?;
        round_trip_test::<true>(blobs).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_hello_simple() -> Result<(), Error> {
        let blobs = &[
            vec![0, 1, 2, 3],
            b"Hello, World!".to_vec(),
            Vec::new(),
            b"foobar".to_vec(),
        ];
        round_trip_test::<false>(blobs).await?;
        round_trip_test::<true>(blobs).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_really_big_array() -> Result<(), Error> {
        let blobs = &[
            b"test".to_vec(),
            random_array(10 * PAGE_SZ),
            b"foobar".to_vec(),
        ];
        round_trip_test::<false>(blobs).await?;
        round_trip_test::<true>(blobs).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_arrays_inc() -> Result<(), Error> {
        let blobs = (0..PAGE_SZ / 8)
            .map(|v| random_array(v * 16))
            .collect::<Vec<_>>();
        round_trip_test::<false>(&blobs).await?;
        round_trip_test::<true>(&blobs).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_arrays_random_size() -> Result<(), Error> {
        let mut rng = rand::rngs::StdRng::seed_from_u64(42);
        let blobs = (0..1024)
            .map(|_| {
                let mut sz: u16 = rng.gen();
                // Make 50% of the arrays small
                if rng.gen() {
                    sz &= 63;
                }
                random_array(sz.into())
            })
            .collect::<Vec<_>>();
        round_trip_test::<false>(&blobs).await?;
        round_trip_test::<true>(&blobs).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_arrays_page_boundary() -> Result<(), Error> {
        let blobs = &[
            random_array(PAGE_SZ - 4),
            random_array(PAGE_SZ - 4),
            random_array(PAGE_SZ - 4),
        ];
        round_trip_test::<false>(blobs).await?;
        round_trip_test::<true>(blobs).await?;
        Ok(())
    }

    /// Places a 4-byte length header so that it straddles the first page
    /// boundary, with 1, 2 and 3 of its bytes on the first page. This
    /// deterministically exercises the split-header path of
    /// `read_blob_into_buf`.
    #[tokio::test]
    async fn test_arrays_header_split() -> Result<(), Error> {
        for bytes_on_first_page in 1..4 {
            // The first blob (4-byte header + payload) ends exactly
            // `bytes_on_first_page` bytes before the end of page 0. The second
            // blob is >= 128 bytes, so its header is 4 bytes and gets split.
            let blobs = &[
                random_array(PAGE_SZ - 4 - bytes_on_first_page),
                random_array(200),
            ];
            round_trip_test::<false>(blobs).await?;
            round_trip_test::<true>(blobs).await?;
        }
        Ok(())
    }
}
|