//!
//! Functions for reading and writing variable-sized "blobs".
//!
//! Each blob begins with a 1- or 4-byte length field, followed by the
//! actual data. If the length is smaller than 128 bytes, it is written
//! as a single byte. If it is 128 bytes or larger, the length is written
//! as a four-byte integer, in big-endian, with the high bit set. This
//! way, we can detect whether the header is 1 or 4 bytes long by peeking
//! at the first byte.
//!
//! len < 128: 0XXXXXXX
//! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
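//!
//! For example (an illustration, not part of the API): a 5-byte blob is
//! prefixed with the single length byte 0x05, while a 0x1234-byte blob
//! is prefixed with the four length bytes 0x80 0x00 0x12 0x34.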
//!
use bytes::{BufMut, BytesMut};
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};

use crate::context::RequestContext;
use crate::page_cache::PAGE_SZ;
use crate::tenant::block_io::BlockCursor;
use crate::virtual_file::VirtualFile;
use std::cmp::min;
use std::io::{Error, ErrorKind};

impl<'a> BlockCursor<'a> {
    /// Read a blob into a new buffer.
    pub async fn read_blob(
        &self,
        offset: u64,
        ctx: &RequestContext,
    ) -> Result<Vec<u8>, std::io::Error> {
        let mut buf = Vec::new();
        self.read_blob_into_buf(offset, &mut buf, ctx).await?;
        Ok(buf)
    }

    /// Read blob into the given buffer. Any previous contents in the buffer
    /// are overwritten.
    pub async fn read_blob_into_buf(
        &self,
        offset: u64,
        dstbuf: &mut Vec<u8>,
        ctx: &RequestContext,
    ) -> Result<(), std::io::Error> {
        let mut blknum = (offset / PAGE_SZ as u64) as u32;
        let mut off = (offset % PAGE_SZ as u64) as usize;

        let mut buf = self.read_blk(blknum, ctx).await?;

        // peek at the first byte, to determine if it's a 1- or 4-byte length
        let first_len_byte = buf[off];
        let len: usize = if first_len_byte < 0x80 {
            // 1-byte length header
            off += 1;
            first_len_byte as usize
        } else {
            // 4-byte length header
            let mut len_buf = [0u8; 4];
            let thislen = PAGE_SZ - off;
            if thislen < 4 {
                // it is split across two pages
                len_buf[..thislen].copy_from_slice(&buf[off..PAGE_SZ]);
                blknum += 1;
                buf = self.read_blk(blknum, ctx).await?;
                len_buf[thislen..].copy_from_slice(&buf[0..4 - thislen]);
                off = 4 - thislen;
            } else {
                len_buf.copy_from_slice(&buf[off..off + 4]);
                off += 4;
            }
            len_buf[0] &= 0x7f;
            u32::from_be_bytes(len_buf) as usize
        };

        dstbuf.clear();
        dstbuf.reserve(len);

        // Read the payload
        let mut remain = len;
        while remain > 0 {
            let mut page_remain = PAGE_SZ - off;
            if page_remain == 0 {
                // continue on next page
                blknum += 1;
                buf = self.read_blk(blknum, ctx).await?;
                off = 0;
                page_remain = PAGE_SZ;
            }
            let this_blk_len = min(remain, page_remain);
            dstbuf.extend_from_slice(&buf[off..off + this_blk_len]);
            remain -= this_blk_len;
            off += this_blk_len;
        }
        Ok(())
    }
}

/// A wrapper of `VirtualFile` that allows users to write blobs.
///
/// If a `BlobWriter` is dropped, the internal buffer will be
/// discarded. You need to call [`flush_buffer`](Self::flush_buffer)
/// manually before dropping.
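///
/// # Usage sketch
///
/// Not a doctest; `file` and `ctx` are assumed to be an already-open
/// `VirtualFile` and a `RequestContext`:
///
/// ```ignore
/// let mut wtr = BlobWriter::<true>::new(file, 0);
/// let (_buf, res) = wtr.write_blob(b"hello".to_vec(), &ctx).await;
/// let offset = res?; // offset where the blob can be read back later
/// let file = wtr.into_inner(&ctx).await?; // flushes the internal buffer
/// ```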
pub struct BlobWriter<const BUFFERED: bool> {
    inner: VirtualFile,
    offset: u64,
    /// A buffer to save on write calls; only used if BUFFERED=true.
    buf: Vec<u8>,
    /// We do tiny writes for the length headers; they need to be in an owned buffer.
    io_buf: Option<BytesMut>,
}

impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
    pub fn new(inner: VirtualFile, start_offset: u64) -> Self {
        Self {
            inner,
            offset: start_offset,
            buf: Vec::with_capacity(Self::CAPACITY),
            io_buf: Some(BytesMut::new()),
        }
    }

    pub fn size(&self) -> u64 {
        self.offset
    }

    const CAPACITY: usize = if BUFFERED { 64 * 1024 } else { 0 };

    /// Writes the given buffer directly to the underlying `VirtualFile`.
    /// You need to make sure that the internal buffer is empty, otherwise
    /// data will be written in the wrong order.
    #[inline(always)]
    async fn write_all_unbuffered<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
        &mut self,
        src_buf: B,
        ctx: &RequestContext,
    ) -> (B::Buf, Result<(), Error>) {
        let (src_buf, res) = self.inner.write_all(src_buf, ctx).await;
        let nbytes = match res {
            Ok(nbytes) => nbytes,
            Err(e) => return (src_buf, Err(e)),
        };
        self.offset += nbytes as u64;
        (src_buf, Ok(()))
    }

    /// Flushes the internal buffer to the underlying `VirtualFile`.
    #[inline(always)]
    pub async fn flush_buffer(&mut self, ctx: &RequestContext) -> Result<(), Error> {
        let buf = std::mem::take(&mut self.buf);
        let (mut buf, res) = self.inner.write_all(buf, ctx).await;
        res?;
        buf.clear();
        self.buf = buf;
        Ok(())
    }

    /// Writes as much of `src_buf` into the internal buffer as fits.
    #[inline(always)]
    fn write_into_buffer(&mut self, src_buf: &[u8]) -> usize {
        let remaining = Self::CAPACITY - self.buf.len();
        let to_copy = src_buf.len().min(remaining);
        self.buf.extend_from_slice(&src_buf[..to_copy]);
        self.offset += to_copy as u64;
        to_copy
    }

    /// Internal, possibly buffered, write function
    async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
        &mut self,
        src_buf: B,
        ctx: &RequestContext,
    ) -> (B::Buf, Result<(), Error>) {
        if !BUFFERED {
            assert!(self.buf.is_empty());
            return self.write_all_unbuffered(src_buf, ctx).await;
        }
        let remaining = Self::CAPACITY - self.buf.len();
        let src_buf_len = src_buf.bytes_init();
        if src_buf_len == 0 {
            return (Slice::into_inner(src_buf.slice_full()), Ok(()));
        }
        let mut src_buf = src_buf.slice(0..src_buf_len);
        // First try to copy as much as we can into the buffer
        if remaining > 0 {
            let copied = self.write_into_buffer(&src_buf);
            src_buf = src_buf.slice(copied..);
        }
        // Then, if the buffer is full, flush it out
        if self.buf.len() == Self::CAPACITY {
            if let Err(e) = self.flush_buffer(ctx).await {
                return (Slice::into_inner(src_buf), Err(e));
            }
        }
        // Finally, write the tail of src_buf:
        // If it wholly fits into the buffer without
        // completely filling it, then put it there.
        // If not, write it out directly.
        let src_buf = if !src_buf.is_empty() {
            assert_eq!(self.buf.len(), 0);
            if src_buf.len() < Self::CAPACITY {
                let copied = self.write_into_buffer(&src_buf);
                // We just verified above that src_buf fits into our internal buffer.
                assert_eq!(copied, src_buf.len());
                Slice::into_inner(src_buf)
            } else {
                let (src_buf, res) = self.write_all_unbuffered(src_buf, ctx).await;
                if let Err(e) = res {
                    return (src_buf, Err(e));
                }
                src_buf
            }
        } else {
            Slice::into_inner(src_buf)
        };
        (src_buf, Ok(()))
    }

    /// Write a blob of data. Returns the offset that it was written to,
    /// which can be used to retrieve the data later.
    pub async fn write_blob<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
        &mut self,
        srcbuf: B,
        ctx: &RequestContext,
    ) -> (B::Buf, Result<u64, Error>) {
        let offset = self.offset;

        let len = srcbuf.bytes_init();

        let mut io_buf = self.io_buf.take().expect("we always put it back below");
        io_buf.clear();
        let (io_buf, hdr_res) = async {
            if len < 128 {
                // Short blob. Write a 1-byte length header
                io_buf.put_u8(len as u8);
                self.write_all(io_buf, ctx).await
            } else {
                // Write a 4-byte length header
                if len > 0x7fff_ffff {
                    return (
                        io_buf,
                        Err(Error::new(
                            ErrorKind::Other,
                            format!("blob too large ({len} bytes)"),
                        )),
                    );
                }
                if len > 0x0fff_ffff {
                    tracing::warn!("writing blob above future limit ({len} bytes)");
                }
                let mut len_buf = (len as u32).to_be_bytes();
                len_buf[0] |= 0x80;
                io_buf.extend_from_slice(&len_buf[..]);
                self.write_all(io_buf, ctx).await
            }
        }
        .await;
        self.io_buf = Some(io_buf);
        match hdr_res {
            Ok(_) => (),
            Err(e) => return (Slice::into_inner(srcbuf.slice(..)), Err(e)),
        }
        let (srcbuf, res) = self.write_all(srcbuf, ctx).await;
        (srcbuf, res.map(|_| offset))
    }
}

impl BlobWriter<true> {
    /// Access the underlying `VirtualFile`.
    ///
    /// This function flushes the internal buffer before giving access
    /// to the underlying `VirtualFile`.
    pub async fn into_inner(mut self, ctx: &RequestContext) -> Result<VirtualFile, Error> {
        self.flush_buffer(ctx).await?;
        Ok(self.inner)
    }

    /// Access the underlying `VirtualFile`.
    ///
    /// Unlike [`into_inner`](Self::into_inner), this doesn't flush
    /// the internal buffer before giving access.
    pub fn into_inner_no_flush(self) -> VirtualFile {
        self.inner
    }
}

impl BlobWriter<false> {
    /// Access the underlying `VirtualFile`.
    pub fn into_inner(self) -> VirtualFile {
        self.inner
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{context::DownloadBehavior, task_mgr::TaskKind, tenant::block_io::BlockReaderRef};
    use rand::{Rng, SeedableRng};

    async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
        let temp_dir = camino_tempfile::tempdir()?;
        let pathbuf = temp_dir.path().join("file");
        let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);

        // Write part (in a block, to drop the file)
        let mut offsets = Vec::new();
        {
            let file = VirtualFile::create(pathbuf.as_path(), &ctx).await?;
            let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
            for blob in blobs.iter() {
                let (_, res) = wtr.write_blob(blob.clone(), &ctx).await;
                let offs = res?;
                offsets.push(offs);
            }
            // Write out one page worth of zeros so that we can
            // read again with read_blk
            let (_, res) = wtr.write_blob(vec![0; PAGE_SZ], &ctx).await;
            let offs = res?;
            println!("Writing final blob at offs={offs}");
            wtr.flush_buffer(&ctx).await?;
        }

        let file = VirtualFile::open(pathbuf.as_path(), &ctx).await?;
        let rdr = BlockReaderRef::VirtualFile(&file);
        let rdr = BlockCursor::new(rdr);
        for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
            let blob_read = rdr.read_blob(*offset, &ctx).await?;
            assert_eq!(
                blob, &blob_read,
                "mismatch for idx={idx} at offset={offset}"
            );
        }
        Ok(())
    }

    fn random_array(len: usize) -> Vec<u8> {
        let mut rng = rand::thread_rng();
        (0..len).map(|_| rng.gen()).collect::<_>()
    }

    #[tokio::test]
    async fn test_one() -> Result<(), Error> {
        let blobs = &[vec![12, 21, 22]];
        round_trip_test::<false>(blobs).await?;
        round_trip_test::<true>(blobs).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_hello_simple() -> Result<(), Error> {
        let blobs = &[
            vec![0, 1, 2, 3],
            b"Hello, World!".to_vec(),
            Vec::new(),
            b"foobar".to_vec(),
        ];
        round_trip_test::<false>(blobs).await?;
        round_trip_test::<true>(blobs).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_really_big_array() -> Result<(), Error> {
        let blobs = &[
            b"test".to_vec(),
            random_array(10 * PAGE_SZ),
            b"foobar".to_vec(),
        ];
        round_trip_test::<false>(blobs).await?;
        round_trip_test::<true>(blobs).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_arrays_inc() -> Result<(), Error> {
        let blobs = (0..PAGE_SZ / 8)
            .map(|v| random_array(v * 16))
            .collect::<Vec<_>>();
        round_trip_test::<false>(&blobs).await?;
        round_trip_test::<true>(&blobs).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_arrays_random_size() -> Result<(), Error> {
        let mut rng = rand::rngs::StdRng::seed_from_u64(42);
        let blobs = (0..1024)
            .map(|_| {
                let mut sz: u16 = rng.gen();
                // Make 50% of the arrays small
                if rng.gen() {
                    sz &= 63;
                }
                random_array(sz.into())
            })
            .collect::<Vec<_>>();
        round_trip_test::<false>(&blobs).await?;
        round_trip_test::<true>(&blobs).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_arrays_page_boundary() -> Result<(), Error> {
        let blobs = &[
            random_array(PAGE_SZ - 4),
            random_array(PAGE_SZ - 4),
            random_array(PAGE_SZ - 4),
        ];
        round_trip_test::<false>(blobs).await?;
        round_trip_test::<true>(blobs).await?;
        Ok(())
    }
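
    // An additional sketch, not in the original test suite: exercise the
    // code path where a 4-byte length header straddles a page boundary.
    // The first blob is sized so that the second blob's header starts at
    // offset 4 + (PAGE_SZ - 7) = PAGE_SZ - 3, forcing the reader to
    // reassemble the length from two pages.
    #[tokio::test]
    async fn test_header_split_across_pages() -> Result<(), Error> {
        let blobs = &[
            // >= 128 bytes, so it gets a 4-byte header and ends at PAGE_SZ - 3
            random_array(PAGE_SZ - 7),
            // also >= 128 bytes; its 4-byte header spans the page boundary
            random_array(PAGE_SZ + 1),
        ];
        round_trip_test::<false>(blobs).await?;
        round_trip_test::<true>(blobs).await?;
        Ok(())
    }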
}