Line data Source code
1 : //!
2 : //! Functions for reading and writing variable-sized "blobs".
3 : //!
4 : //! Each blob begins with a 1- or 4-byte length field, followed by the
5 : //! actual data. If the length is smaller than 128 bytes, the length
6 : //! is written as a single byte. Otherwise (128 bytes or more), the length
7 : //! is written as a four-byte big-endian integer with the high
8 : //! bit set. This way, we can detect whether it's a 1- or 4-byte header
9 : //! by peeking at the first byte.
10 : //!
11 : //! len < 128: 0XXXXXXX
12 : //! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
13 : //!
14 : use bytes::{BufMut, BytesMut};
15 : use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
16 :
17 : use crate::context::RequestContext;
18 : use crate::page_cache::PAGE_SZ;
19 : use crate::tenant::block_io::BlockCursor;
20 : use crate::virtual_file::VirtualFile;
21 : use std::cmp::min;
22 : use std::io::{Error, ErrorKind};
23 :
24 : impl<'a> BlockCursor<'a> {
25 : /// Read a blob into a new buffer.
26 2569238 : pub async fn read_blob(
27 2569238 : &self,
28 2569238 : offset: u64,
29 2569238 : ctx: &RequestContext,
30 2569238 : ) -> Result<Vec<u8>, std::io::Error> {
31 2569238 : let mut buf = Vec::new();
32 2569238 : self.read_blob_into_buf(offset, &mut buf, ctx).await?;
33 2569238 : Ok(buf)
34 2569238 : }
35 : /// Read blob into the given buffer. Any previous contents in the buffer
36 : /// are overwritten.
37 7056795 : pub async fn read_blob_into_buf(
38 7056795 : &self,
39 7056795 : offset: u64,
40 7056795 : dstbuf: &mut Vec<u8>,
41 7056795 : ctx: &RequestContext,
42 7056795 : ) -> Result<(), std::io::Error> {
43 7056795 : let mut blknum = (offset / PAGE_SZ as u64) as u32;
44 7056795 : let mut off = (offset % PAGE_SZ as u64) as usize;
45 :
46 7056795 : let mut buf = self.read_blk(blknum, ctx).await?;
47 :
48 : // peek at the first byte, to determine if it's a 1- or 4-byte length
49 7056795 : let first_len_byte = buf[off];
50 7056795 : let len: usize = if first_len_byte < 0x80 {
51 : // 1-byte length header
52 7041993 : off += 1;
53 7041993 : first_len_byte as usize
54 : } else {
55 : // 4-byte length header
56 14802 : let mut len_buf = [0u8; 4];
57 14802 : let thislen = PAGE_SZ - off;
58 14802 : if thislen < 4 {
59 : // it is split across two pages
60 0 : len_buf[..thislen].copy_from_slice(&buf[off..PAGE_SZ]);
61 0 : blknum += 1;
62 0 : buf = self.read_blk(blknum, ctx).await?;
63 0 : len_buf[thislen..].copy_from_slice(&buf[0..4 - thislen]);
64 0 : off = 4 - thislen;
65 14802 : } else {
66 14802 : len_buf.copy_from_slice(&buf[off..off + 4]);
67 14802 : off += 4;
68 14802 : }
69 14802 : len_buf[0] &= 0x7f;
70 14802 : u32::from_be_bytes(len_buf) as usize
71 : };
72 :
73 7056795 : dstbuf.clear();
74 7056795 : dstbuf.reserve(len);
75 7056795 :
76 7056795 : // Read the payload
77 7056795 : let mut remain = len;
78 14195868 : while remain > 0 {
79 7139073 : let mut page_remain = PAGE_SZ - off;
80 7139073 : if page_remain == 0 {
81 : // continue on next page
82 82957 : blknum += 1;
83 82957 : buf = self.read_blk(blknum, ctx).await?;
84 82957 : off = 0;
85 82957 : page_remain = PAGE_SZ;
86 7056116 : }
87 7139073 : let this_blk_len = min(remain, page_remain);
88 7139073 : dstbuf.extend_from_slice(&buf[off..off + this_blk_len]);
89 7139073 : remain -= this_blk_len;
90 7139073 : off += this_blk_len;
91 : }
92 7056795 : Ok(())
93 7056795 : }
94 : }
95 :
96 : /// A wrapper of `VirtualFile` that allows users to write blobs.
97 : ///
98 : /// If a `BlobWriter` is dropped, the internal buffer will be
99 : /// discarded. You need to call [`flush_buffer`](Self::flush_buffer)
100 : /// manually before dropping.
101 : pub struct BlobWriter<const BUFFERED: bool> {
102 : inner: VirtualFile,
103 : offset: u64,
104 : /// A buffer to save on write calls, only used if BUFFERED=true
105 : buf: Vec<u8>,
106 : /// We do tiny writes for the length headers; they need to be in an owned buffer;
107 : io_buf: Option<BytesMut>,
108 : }
109 :
110 : impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
111 1202 : pub fn new(inner: VirtualFile, start_offset: u64) -> Self {
112 1202 : Self {
113 1202 : inner,
114 1202 : offset: start_offset,
115 1202 : buf: Vec::with_capacity(Self::CAPACITY),
116 1202 : io_buf: Some(BytesMut::new()),
117 1202 : }
118 1202 : }
119 :
120 2023152 : pub fn size(&self) -> u64 {
121 2023152 : self.offset
122 2023152 : }
123 :
124 : const CAPACITY: usize = if BUFFERED { 64 * 1024 } else { 0 };
125 :
126 : /// Writes the given buffer directly to the underlying `VirtualFile`.
127 : /// You need to make sure that the internal buffer is empty, otherwise
128 : /// data will be written in wrong order.
129 : #[inline(always)]
130 10110 : async fn write_all_unbuffered<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
131 10110 : &mut self,
132 10110 : src_buf: B,
133 10110 : ctx: &RequestContext,
134 10110 : ) -> (B::Buf, Result<(), Error>) {
135 10110 : let (src_buf, res) = self.inner.write_all(src_buf, ctx).await;
136 10110 : let nbytes = match res {
137 10110 : Ok(nbytes) => nbytes,
138 0 : Err(e) => return (src_buf, Err(e)),
139 : };
140 10110 : self.offset += nbytes as u64;
141 10110 : (src_buf, Ok(()))
142 10110 : }
143 :
144 : #[inline(always)]
145 : /// Flushes the internal buffer to the underlying `VirtualFile`.
146 9830 : pub async fn flush_buffer(&mut self, ctx: &RequestContext) -> Result<(), Error> {
147 9830 : let buf = std::mem::take(&mut self.buf);
148 9830 : let (mut buf, res) = self.inner.write_all(buf, ctx).await;
149 9830 : res?;
150 9830 : buf.clear();
151 9830 : self.buf = buf;
152 9830 : Ok(())
153 9830 : }
154 :
155 : #[inline(always)]
156 : /// Writes as much of `src_buf` into the internal buffer as it fits
157 12825968 : fn write_into_buffer(&mut self, src_buf: &[u8]) -> usize {
158 12825968 : let remaining = Self::CAPACITY - self.buf.len();
159 12825968 : let to_copy = src_buf.len().min(remaining);
160 12825968 : self.buf.extend_from_slice(&src_buf[..to_copy]);
161 12825968 : self.offset += to_copy as u64;
162 12825968 : to_copy
163 12825968 : }
164 :
165 : /// Internal, possibly buffered, write function
166 12827392 : async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
167 12827392 : &mut self,
168 12827392 : src_buf: B,
169 12827392 : ctx: &RequestContext,
170 12827392 : ) -> (B::Buf, Result<(), Error>) {
171 12827392 : if !BUFFERED {
172 9892 : assert!(self.buf.is_empty());
173 9892 : return self.write_all_unbuffered(src_buf, ctx).await;
174 12817500 : }
175 12817500 : let remaining = Self::CAPACITY - self.buf.len();
176 12817500 : let src_buf_len = src_buf.bytes_init();
177 12817500 : if src_buf_len == 0 {
178 18 : return (Slice::into_inner(src_buf.slice_full()), Ok(()));
179 12817482 : }
180 12817482 : let mut src_buf = src_buf.slice(0..src_buf_len);
181 12817482 : // First try to copy as much as we can into the buffer
182 12817482 : if remaining > 0 {
183 12817482 : let copied = self.write_into_buffer(&src_buf);
184 12817482 : src_buf = src_buf.slice(copied..);
185 12817482 : }
186 : // Then, if the buffer is full, flush it out
187 12817482 : if self.buf.len() == Self::CAPACITY {
188 8744 : if let Err(e) = self.flush_buffer(ctx).await {
189 0 : return (Slice::into_inner(src_buf), Err(e));
190 8744 : }
191 12808738 : }
192 : // Finally, write the tail of src_buf:
193 : // If it wholly fits into the buffer without
194 : // completely filling it, then put it there.
195 : // If not, write it out directly.
196 12817482 : let src_buf = if !src_buf.is_empty() {
197 8704 : assert_eq!(self.buf.len(), 0);
198 8704 : if src_buf.len() < Self::CAPACITY {
199 8486 : let copied = self.write_into_buffer(&src_buf);
200 8486 : // We just verified above that src_buf fits into our internal buffer.
201 8486 : assert_eq!(copied, src_buf.len());
202 8486 : Slice::into_inner(src_buf)
203 : } else {
204 218 : let (src_buf, res) = self.write_all_unbuffered(src_buf, ctx).await;
205 218 : if let Err(e) = res {
206 0 : return (src_buf, Err(e));
207 218 : }
208 218 : src_buf
209 : }
210 : } else {
211 12808778 : Slice::into_inner(src_buf)
212 : };
213 12817482 : (src_buf, Ok(()))
214 12827392 : }
215 :
216 : /// Write a blob of data. Returns the offset that it was written to,
217 : /// which can be used to retrieve the data later.
218 6413696 : pub async fn write_blob<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
219 6413696 : &mut self,
220 6413696 : srcbuf: B,
221 6413696 : ctx: &RequestContext,
222 6413696 : ) -> (B::Buf, Result<u64, Error>) {
223 6413696 : let offset = self.offset;
224 6413696 :
225 6413696 : let len = srcbuf.bytes_init();
226 6413696 :
227 6413696 : let mut io_buf = self.io_buf.take().expect("we always put it back below");
228 6413696 : io_buf.clear();
229 6413696 : let (io_buf, hdr_res) = async {
230 6413696 : if len < 128 {
231 6413696 : // Short blob. Write a 1-byte length header
232 6413696 : io_buf.put_u8(len as u8);
233 6401444 : self.write_all(io_buf, ctx).await
234 6413696 : } else {
235 6413696 : // Write a 4-byte length header
236 6413696 : if len > 0x7fff_ffff {
237 6413696 : return (
238 0 : io_buf,
239 0 : Err(Error::new(
240 0 : ErrorKind::Other,
241 0 : format!("blob too large ({} bytes)", len),
242 0 : )),
243 0 : );
244 6413696 : }
245 12252 : let mut len_buf = (len as u32).to_be_bytes();
246 12252 : len_buf[0] |= 0x80;
247 12252 : io_buf.extend_from_slice(&len_buf[..]);
248 12252 : self.write_all(io_buf, ctx).await
249 6413696 : }
250 6413696 : }
251 2557 : .await;
252 6413696 : self.io_buf = Some(io_buf);
253 6413696 : match hdr_res {
254 6413696 : Ok(_) => (),
255 0 : Err(e) => return (Slice::into_inner(srcbuf.slice(..)), Err(e)),
256 : }
257 6413696 : let (srcbuf, res) = self.write_all(srcbuf, ctx).await;
258 6413696 : (srcbuf, res.map(|_| offset))
259 6413696 : }
260 : }
261 :
262 : impl BlobWriter<true> {
263 : /// Access the underlying `VirtualFile`.
264 : ///
265 : /// This function flushes the internal buffer before giving access
266 : /// to the underlying `VirtualFile`.
267 1062 : pub async fn into_inner(mut self, ctx: &RequestContext) -> Result<VirtualFile, Error> {
268 1062 : self.flush_buffer(ctx).await?;
269 1062 : Ok(self.inner)
270 1062 : }
271 :
272 : /// Access the underlying `VirtualFile`.
273 : ///
274 : /// Unlike [`into_inner`](Self::into_inner), this doesn't flush
275 : /// the internal buffer before giving access.
276 0 : pub fn into_inner_no_flush(self) -> VirtualFile {
277 0 : self.inner
278 0 : }
279 : }
280 :
281 : impl BlobWriter<false> {
282 : /// Access the underlying `VirtualFile`.
283 116 : pub fn into_inner(self) -> VirtualFile {
284 116 : self.inner
285 116 : }
286 : }
287 :
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{context::DownloadBehavior, task_mgr::TaskKind, tenant::block_io::BlockReaderRef};
    use rand::{Rng, SeedableRng};

    /// Writes every blob with a `BlobWriter<BUFFERED>`, reads each one back
    /// through a `BlockCursor` and checks that the bytes match.
    async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
        let temp_dir = camino_tempfile::tempdir()?;
        let pathbuf = temp_dir.path().join("file");
        let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);

        // Writing happens in its own scope so the file is dropped afterwards.
        let mut offsets = Vec::with_capacity(blobs.len());
        {
            let file = VirtualFile::create(pathbuf.as_path()).await?;
            let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
            for blob in blobs {
                let (_, res) = wtr.write_blob(blob.clone(), &ctx).await;
                offsets.push(res?);
            }
            // Pad with a page of zeros so the data can be read back
            // with read_blk.
            let (_, res) = wtr.write_blob(vec![0; PAGE_SZ], &ctx).await;
            let offs = res?;
            println!("Writing final blob at offs={offs}");
            wtr.flush_buffer(&ctx).await?;
        }

        let file = VirtualFile::open(pathbuf.as_path()).await?;
        let cursor = BlockCursor::new(BlockReaderRef::VirtualFile(&file));
        for (idx, (blob, offset)) in blobs.iter().zip(&offsets).enumerate() {
            let blob_read = cursor.read_blob(*offset, &ctx).await?;
            assert_eq!(
                blob, &blob_read,
                "mismatch for idx={idx} at offset={offset}"
            );
        }
        Ok(())
    }

    /// Runs the round trip first unbuffered, then buffered.
    async fn round_trip_both(blobs: &[Vec<u8>]) -> Result<(), Error> {
        round_trip_test::<false>(blobs).await?;
        round_trip_test::<true>(blobs).await?;
        Ok(())
    }

    /// Produces `len` random bytes from the thread-local RNG.
    fn random_array(len: usize) -> Vec<u8> {
        let mut rng = rand::thread_rng();
        std::iter::repeat_with(|| rng.gen()).take(len).collect()
    }

    #[tokio::test]
    async fn test_one() -> Result<(), Error> {
        let blobs = &[vec![12, 21, 22]];
        round_trip_both(blobs).await
    }

    #[tokio::test]
    async fn test_hello_simple() -> Result<(), Error> {
        let blobs = &[
            vec![0, 1, 2, 3],
            b"Hello, World!".to_vec(),
            Vec::new(),
            b"foobar".to_vec(),
        ];
        round_trip_both(blobs).await
    }

    #[tokio::test]
    async fn test_really_big_array() -> Result<(), Error> {
        let blobs = &[
            b"test".to_vec(),
            random_array(10 * PAGE_SZ),
            b"foobar".to_vec(),
        ];
        round_trip_both(blobs).await
    }

    #[tokio::test]
    async fn test_arrays_inc() -> Result<(), Error> {
        let blobs: Vec<Vec<u8>> = (0..PAGE_SZ / 8).map(|v| random_array(v * 16)).collect();
        round_trip_both(&blobs).await
    }

    #[tokio::test]
    async fn test_arrays_random_size() -> Result<(), Error> {
        let mut rng = rand::rngs::StdRng::seed_from_u64(42);
        let blobs: Vec<Vec<u8>> = (0..1024)
            .map(|_| {
                let raw: u16 = rng.gen();
                // Make 50% of the arrays small
                let shrink: bool = rng.gen();
                let sz = if shrink { raw & 63 } else { raw };
                random_array(usize::from(sz))
            })
            .collect();
        round_trip_both(&blobs).await
    }

    #[tokio::test]
    async fn test_arrays_page_boundary() -> Result<(), Error> {
        let blobs: Vec<Vec<u8>> = (0..3).map(|_| random_array(PAGE_SZ - 4)).collect();
        round_trip_both(&blobs).await
    }
}
|