Line data Source code
1 : //!
2 : //! Functions for reading and writing variable-sized "blobs".
3 : //!
4 : //! Each blob begins with a 1- or 4-byte length field, followed by the
5 : //! actual data. If the length is smaller than 128 bytes, the length
6 : //! is written as a single byte. Otherwise (128 bytes or more), the length
7 : //! is written as a four-byte integer, in big-endian, with the high
8 : //! bit set. This way, we can detect whether it's 1- or 4-byte header
9 : //! by peeking at the first byte.
10 : //!
11 : //! len < 128: 0XXXXXXX
12 : //! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
13 : //!
14 : use bytes::{BufMut, BytesMut};
15 : use tokio_epoll_uring::{BoundedBuf, Slice};
16 :
17 : use crate::context::RequestContext;
18 : use crate::page_cache::PAGE_SZ;
19 : use crate::tenant::block_io::BlockCursor;
20 : use crate::virtual_file::VirtualFile;
21 : use std::cmp::min;
22 : use std::io::{Error, ErrorKind};
23 :
24 : impl<'a> BlockCursor<'a> {
25 : /// Read a blob into a new buffer.
26 53279163 : pub async fn read_blob(
27 53279163 : &self,
28 53279163 : offset: u64,
29 53279163 : ctx: &RequestContext,
30 53279163 : ) -> Result<Vec<u8>, std::io::Error> {
31 53279163 : let mut buf = Vec::new();
32 53279163 : self.read_blob_into_buf(offset, &mut buf, ctx).await?;
33 53279163 : Ok(buf)
34 53279163 : }
35 : /// Read blob into the given buffer. Any previous contents in the buffer
36 : /// are overwritten.
37 137206922 : pub async fn read_blob_into_buf(
38 137206922 : &self,
39 137206922 : offset: u64,
40 137206922 : dstbuf: &mut Vec<u8>,
41 137206922 : ctx: &RequestContext,
42 137206922 : ) -> Result<(), std::io::Error> {
43 137206907 : let mut blknum = (offset / PAGE_SZ as u64) as u32;
44 137206907 : let mut off = (offset % PAGE_SZ as u64) as usize;
45 :
46 137206907 : let mut buf = self.read_blk(blknum, ctx).await?;
47 :
48 : // peek at the first byte, to determine if it's a 1- or 4-byte length
49 137206904 : let first_len_byte = buf[off];
50 137206904 : let len: usize = if first_len_byte < 0x80 {
51 : // 1-byte length header
52 102416583 : off += 1;
53 102416583 : first_len_byte as usize
54 : } else {
55 : // 4-byte length header
56 34790321 : let mut len_buf = [0u8; 4];
57 34790321 : let thislen = PAGE_SZ - off;
58 34790321 : if thislen < 4 {
59 : // it is split across two pages
60 11697 : len_buf[..thislen].copy_from_slice(&buf[off..PAGE_SZ]);
61 11697 : blknum += 1;
62 11697 : buf = self.read_blk(blknum, ctx).await?;
63 11697 : len_buf[thislen..].copy_from_slice(&buf[0..4 - thislen]);
64 11697 : off = 4 - thislen;
65 34778624 : } else {
66 34778624 : len_buf.copy_from_slice(&buf[off..off + 4]);
67 34778624 : off += 4;
68 34778624 : }
69 34790321 : len_buf[0] &= 0x7f;
70 34790321 : u32::from_be_bytes(len_buf) as usize
71 : };
72 :
73 137206904 : dstbuf.clear();
74 137206904 : dstbuf.reserve(len);
75 137206904 :
76 137206904 : // Read the payload
77 137206904 : let mut remain = len;
78 281947648 : while remain > 0 {
79 144740744 : let mut page_remain = PAGE_SZ - off;
80 144740744 : if page_remain == 0 {
81 : // continue on next page
82 7549510 : blknum += 1;
83 7549510 : buf = self.read_blk(blknum, ctx).await?;
84 7549510 : off = 0;
85 7549510 : page_remain = PAGE_SZ;
86 137191234 : }
87 144740744 : let this_blk_len = min(remain, page_remain);
88 144740744 : dstbuf.extend_from_slice(&buf[off..off + this_blk_len]);
89 144740744 : remain -= this_blk_len;
90 144740744 : off += this_blk_len;
91 : }
92 137206904 : Ok(())
93 137206904 : }
94 : }
95 :
96 : /// A wrapper of `VirtualFile` that allows users to write blobs.
97 : ///
98 : /// If a `BlobWriter` is dropped, the internal buffer will be
99 : /// discarded. You need to call [`flush_buffer`](Self::flush_buffer)
100 : /// manually before dropping.
101 : pub struct BlobWriter<const BUFFERED: bool> {
102 : inner: VirtualFile,
103 : offset: u64,
104 : /// A buffer to save on write calls, only used if BUFFERED=true
105 : buf: Vec<u8>,
106 : /// We do tiny writes for the length headers; they need to be in an owned buffer;
107 : io_buf: Option<BytesMut>,
108 : }
109 :
110 : impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
111 22391 : pub fn new(inner: VirtualFile, start_offset: u64) -> Self {
112 22391 : Self {
113 22391 : inner,
114 22391 : offset: start_offset,
115 22391 : buf: Vec::with_capacity(Self::CAPACITY),
116 22391 : io_buf: Some(BytesMut::new()),
117 22391 : }
118 22391 : }
119 :
120 2302594 : pub fn size(&self) -> u64 {
121 2302594 : self.offset
122 2302594 : }
123 :
124 : const CAPACITY: usize = if BUFFERED { PAGE_SZ } else { 0 };
125 :
126 : /// Writes the given buffer directly to the underlying `VirtualFile`.
127 : /// You need to make sure that the internal buffer is empty, otherwise
128 : /// data will be written in wrong order.
129 : #[inline(always)]
130 470731 : async fn write_all_unbuffered<B: BoundedBuf>(
131 470731 : &mut self,
132 470731 : src_buf: B,
133 470731 : ) -> (B::Buf, Result<(), Error>) {
134 470731 : let src_buf_len = src_buf.bytes_init();
135 470731 : let (src_buf, res) = if src_buf_len > 0 {
136 470713 : let src_buf = src_buf.slice(0..src_buf_len);
137 470713 : let res = self.inner.write_all(&src_buf).await;
138 470713 : let src_buf = Slice::into_inner(src_buf);
139 470713 : (src_buf, res)
140 : } else {
141 18 : let res = self.inner.write_all(&[]).await;
142 18 : (Slice::into_inner(src_buf.slice_full()), res)
143 : };
144 470731 : if let Ok(()) = &res {
145 470731 : self.offset += src_buf_len as u64;
146 470731 : }
147 470731 : (src_buf, res)
148 470731 : }
149 :
150 : #[inline(always)]
151 : /// Flushes the internal buffer to the underlying `VirtualFile`.
152 4488957 : pub async fn flush_buffer(&mut self) -> Result<(), Error> {
153 4488957 : self.inner.write_all(&self.buf).await?;
154 4488957 : self.buf.clear();
155 4488957 : Ok(())
156 4488957 : }
157 :
158 : #[inline(always)]
159 : /// Writes as much of `src_buf` into the internal buffer as it fits
160 108664758 : fn write_into_buffer(&mut self, src_buf: &[u8]) -> usize {
161 108664758 : let remaining = Self::CAPACITY - self.buf.len();
162 108664758 : let to_copy = src_buf.len().min(remaining);
163 108664758 : self.buf.extend_from_slice(&src_buf[..to_copy]);
164 108664758 : self.offset += to_copy as u64;
165 108664758 : to_copy
166 108664758 : }
167 :
168 : /// Internal, possibly buffered, write function
169 104672834 : async fn write_all<B: BoundedBuf>(&mut self, src_buf: B) -> (B::Buf, Result<(), Error>) {
170 104672834 : if !BUFFERED {
171 466768 : assert!(self.buf.is_empty());
172 466768 : return self.write_all_unbuffered(src_buf).await;
173 104206066 : }
174 104206066 : let remaining = Self::CAPACITY - self.buf.len();
175 104206066 : let src_buf_len = src_buf.bytes_init();
176 104206066 : if src_buf_len == 0 {
177 18 : return (Slice::into_inner(src_buf.slice_full()), Ok(()));
178 104206048 : }
179 104206048 : let mut src_buf = src_buf.slice(0..src_buf_len);
180 104206048 : // First try to copy as much as we can into the buffer
181 104206048 : if remaining > 0 {
182 104206048 : let copied = self.write_into_buffer(&src_buf);
183 104206048 : src_buf = src_buf.slice(copied..);
184 104206048 : }
185 : // Then, if the buffer is full, flush it out
186 104206048 : if self.buf.len() == Self::CAPACITY {
187 4473040 : if let Err(e) = self.flush_buffer().await {
188 0 : return (Slice::into_inner(src_buf), Err(e));
189 4473040 : }
190 99733008 : }
191 : // Finally, write the tail of src_buf:
192 : // If it wholly fits into the buffer without
193 : // completely filling it, then put it there.
194 : // If not, write it out directly.
195 104206048 : let src_buf = if !src_buf.is_empty() {
196 4462673 : assert_eq!(self.buf.len(), 0);
197 4462673 : if src_buf.len() < Self::CAPACITY {
198 4458710 : let copied = self.write_into_buffer(&src_buf);
199 4458710 : // We just verified above that src_buf fits into our internal buffer.
200 4458710 : assert_eq!(copied, src_buf.len());
201 4458710 : Slice::into_inner(src_buf)
202 : } else {
203 3963 : let (src_buf, res) = self.write_all_unbuffered(src_buf).await;
204 3963 : if let Err(e) = res {
205 0 : return (src_buf, Err(e));
206 3963 : }
207 3963 : src_buf
208 : }
209 : } else {
210 99743375 : Slice::into_inner(src_buf)
211 : };
212 104206048 : (src_buf, Ok(()))
213 104672834 : }
214 :
215 : /// Write a blob of data. Returns the offset that it was written to,
216 : /// which can be used to retrieve the data later.
217 52336417 : pub async fn write_blob<B: BoundedBuf>(&mut self, srcbuf: B) -> (B::Buf, Result<u64, Error>) {
218 52336417 : let offset = self.offset;
219 52336417 :
220 52336417 : let len = srcbuf.bytes_init();
221 52336417 :
222 52336417 : let mut io_buf = self.io_buf.take().expect("we always put it back below");
223 52336417 : io_buf.clear();
224 52336417 : let (io_buf, hdr_res) = async {
225 52336417 : if len < 128 {
226 : // Short blob. Write a 1-byte length header
227 31810061 : io_buf.put_u8(len as u8);
228 31810061 : self.write_all(io_buf).await
229 : } else {
230 : // Write a 4-byte length header
231 20526356 : if len > 0x7fff_ffff {
232 0 : return (
233 0 : io_buf,
234 0 : Err(Error::new(
235 0 : ErrorKind::Other,
236 0 : format!("blob too large ({} bytes)", len),
237 0 : )),
238 0 : );
239 20526356 : }
240 20526356 : let mut len_buf = (len as u32).to_be_bytes();
241 20526356 : len_buf[0] |= 0x80;
242 20526356 : io_buf.extend_from_slice(&len_buf[..]);
243 20526356 : self.write_all(io_buf).await
244 : }
245 52336417 : }
246 3588 : .await;
247 52336417 : self.io_buf = Some(io_buf);
248 52336417 : match hdr_res {
249 52336417 : Ok(_) => (),
250 0 : Err(e) => return (Slice::into_inner(srcbuf.slice(..)), Err(e)),
251 : }
252 52336417 : let (srcbuf, res) = self.write_all(srcbuf).await;
253 52336417 : (srcbuf, res.map(|_| offset))
254 52336417 : }
255 : }
256 :
257 : impl BlobWriter<true> {
258 : /// Access the underlying `VirtualFile`.
259 : ///
260 : /// This function flushes the internal buffer before giving access
261 : /// to the underlying `VirtualFile`.
262 15893 : pub async fn into_inner(mut self) -> Result<VirtualFile, Error> {
263 15893 : self.flush_buffer().await?;
264 15893 : Ok(self.inner)
265 15893 : }
266 :
267 : /// Access the underlying `VirtualFile`.
268 : ///
269 : /// Unlike [`into_inner`](Self::into_inner), this doesn't flush
270 : /// the internal buffer before giving access.
271 0 : pub fn into_inner_no_flush(self) -> VirtualFile {
272 0 : self.inner
273 0 : }
274 : }
275 :
276 : impl BlobWriter<false> {
277 : /// Access the underlying `VirtualFile`.
278 6472 : pub fn into_inner(self) -> VirtualFile {
279 6472 : self.inner
280 6472 : }
281 : }
282 :
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{context::DownloadBehavior, task_mgr::TaskKind, tenant::block_io::BlockReaderRef};
    use rand::{Rng, SeedableRng};

    /// Writes every blob in `blobs` to a fresh temporary file, reads each one
    /// back by the offset the writer reported, and checks that the contents
    /// are unchanged.
    async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
        let temp_dir = camino_tempfile::tempdir()?;
        let pathbuf = temp_dir.path().join("file");
        let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);

        // Write phase, scoped so the writer and its file are dropped before reading.
        let mut offsets = Vec::new();
        {
            let file = VirtualFile::create(pathbuf.as_path()).await?;
            let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
            for blob in blobs {
                let (_, res) = wtr.write_blob(blob.clone()).await;
                offsets.push(res?);
            }
            // Pad with one page worth of zeros so that the last real blob
            // can be read back with read_blk.
            let (_, res) = wtr.write_blob(vec![0; PAGE_SZ]).await;
            let offs = res?;
            println!("Writing final blob at offs={offs}");
            wtr.flush_buffer().await?;
        }

        // Read phase.
        let file = VirtualFile::open(pathbuf.as_path()).await?;
        let rdr = BlockCursor::new(BlockReaderRef::VirtualFile(&file));
        for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
            let blob_read = rdr.read_blob(*offset, &ctx).await?;
            assert_eq!(
                blob, &blob_read,
                "mismatch for idx={idx} at offset={offset}"
            );
        }
        Ok(())
    }

    /// Returns `len` random bytes.
    fn random_array(len: usize) -> Vec<u8> {
        let mut rng = rand::thread_rng();
        let mut bytes = Vec::with_capacity(len);
        bytes.extend((0..len).map(|_| rng.gen::<u8>()));
        bytes
    }

    /// A single tiny blob.
    #[tokio::test]
    async fn test_one() -> Result<(), Error> {
        let blobs = &[vec![12, 21, 22]];
        round_trip_test::<false>(blobs).await?;
        round_trip_test::<true>(blobs).await?;
        Ok(())
    }

    /// A few short blobs, including an empty one.
    #[tokio::test]
    async fn test_hello_simple() -> Result<(), Error> {
        let blobs = &[
            vec![0, 1, 2, 3],
            b"Hello, World!".to_vec(),
            Vec::new(),
            b"foobar".to_vec(),
        ];
        round_trip_test::<false>(blobs).await?;
        round_trip_test::<true>(blobs).await?;
        Ok(())
    }

    /// A blob spanning many pages, sandwiched between two small ones.
    #[tokio::test]
    async fn test_really_big_array() -> Result<(), Error> {
        let blobs = &[
            b"test".to_vec(),
            random_array(10 * PAGE_SZ),
            b"foobar".to_vec(),
        ];
        round_trip_test::<false>(blobs).await?;
        round_trip_test::<true>(blobs).await?;
        Ok(())
    }

    /// Blobs of steadily increasing size, in steps of 16 bytes.
    #[tokio::test]
    async fn test_arrays_inc() -> Result<(), Error> {
        let mut blobs = Vec::new();
        for v in 0..PAGE_SZ / 8 {
            blobs.push(random_array(v * 16));
        }
        round_trip_test::<false>(&blobs).await?;
        round_trip_test::<true>(&blobs).await?;
        Ok(())
    }

    /// Blobs of pseudo-random size drawn from a fixed seed.
    #[tokio::test]
    async fn test_arrays_random_size() -> Result<(), Error> {
        let mut rng = rand::rngs::StdRng::seed_from_u64(42);
        let blobs = (0..1024)
            .map(|_| {
                let drawn: u16 = rng.gen();
                // Make 50% of the arrays small
                let small: bool = rng.gen();
                let sz = if small { drawn & 63 } else { drawn };
                random_array(usize::from(sz))
            })
            .collect::<Vec<_>>();
        round_trip_test::<false>(&blobs).await?;
        round_trip_test::<true>(&blobs).await?;
        Ok(())
    }

    /// Blobs sized so that, together with their 4-byte header, each one
    /// fills exactly one page.
    #[tokio::test]
    async fn test_arrays_page_boundary() -> Result<(), Error> {
        let blobs = &[
            random_array(PAGE_SZ - 4),
            random_array(PAGE_SZ - 4),
            random_array(PAGE_SZ - 4),
        ];
        round_trip_test::<false>(blobs).await?;
        round_trip_test::<true>(blobs).await?;
        Ok(())
    }
}
|