Line data Source code
//!
//! Functions for reading and writing variable-sized "blobs".
//!
//! Each blob begins with a 1- or 4-byte length field, followed by the
//! actual data. If the length is smaller than 128 bytes, the length
//! is written as a single byte. If it's larger than that, the length
//! is written as a four-byte integer, in big-endian, with the high
//! bit set. This way, we can detect whether it's a 1- or 4-byte header
//! by peeking at the first byte.
//!
//! ```text
//! len <  128: 0XXXXXXX
//! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
//! ```
//!
14 : use crate::context::RequestContext;
15 : use crate::page_cache::PAGE_SZ;
16 : use crate::tenant::block_io::BlockCursor;
17 : use crate::virtual_file::VirtualFile;
18 : use std::cmp::min;
19 : use std::io::{Error, ErrorKind};
20 :
21 : impl<'a> BlockCursor<'a> {
22 : /// Read a blob into a new buffer.
23 53535319 : pub async fn read_blob(
24 53535319 : &self,
25 53535319 : offset: u64,
26 53535319 : ctx: &RequestContext,
27 53535319 : ) -> Result<Vec<u8>, std::io::Error> {
28 53535319 : let mut buf = Vec::new();
29 53535319 : self.read_blob_into_buf(offset, &mut buf, ctx).await?;
30 53535319 : Ok(buf)
31 53535319 : }
32 : /// Read blob into the given buffer. Any previous contents in the buffer
33 : /// are overwritten.
34 139016425 : pub async fn read_blob_into_buf(
35 139016425 : &self,
36 139016425 : offset: u64,
37 139016425 : dstbuf: &mut Vec<u8>,
38 139016425 : ctx: &RequestContext,
39 139016520 : ) -> Result<(), std::io::Error> {
40 139016520 : let mut blknum = (offset / PAGE_SZ as u64) as u32;
41 139016520 : let mut off = (offset % PAGE_SZ as u64) as usize;
42 :
43 139016520 : let mut buf = self.read_blk(blknum, ctx).await?;
44 :
45 : // peek at the first byte, to determine if it's a 1- or 4-byte length
46 139016519 : let first_len_byte = buf[off];
47 139016519 : let len: usize = if first_len_byte < 0x80 {
48 : // 1-byte length header
49 102715024 : off += 1;
50 102715024 : first_len_byte as usize
51 : } else {
52 : // 4-byte length header
53 36301495 : let mut len_buf = [0u8; 4];
54 36301495 : let thislen = PAGE_SZ - off;
55 36301495 : if thislen < 4 {
56 : // it is split across two pages
57 11768 : len_buf[..thislen].copy_from_slice(&buf[off..PAGE_SZ]);
58 11768 : blknum += 1;
59 11768 : buf = self.read_blk(blknum, ctx).await?;
60 11768 : len_buf[thislen..].copy_from_slice(&buf[0..4 - thislen]);
61 11768 : off = 4 - thislen;
62 36289727 : } else {
63 36289727 : len_buf.copy_from_slice(&buf[off..off + 4]);
64 36289727 : off += 4;
65 36289727 : }
66 36301495 : len_buf[0] &= 0x7f;
67 36301495 : u32::from_be_bytes(len_buf) as usize
68 : };
69 :
70 139016519 : dstbuf.clear();
71 139016519 : dstbuf.reserve(len);
72 139016519 :
73 139016519 : // Read the payload
74 139016519 : let mut remain = len;
75 286731462 : while remain > 0 {
76 147714943 : let mut page_remain = PAGE_SZ - off;
77 147714943 : if page_remain == 0 {
78 : // continue on next page
79 8713660 : blknum += 1;
80 8713660 : buf = self.read_blk(blknum, ctx).await?;
81 8713660 : off = 0;
82 8713660 : page_remain = PAGE_SZ;
83 139001283 : }
84 147714943 : let this_blk_len = min(remain, page_remain);
85 147714943 : dstbuf.extend_from_slice(&buf[off..off + this_blk_len]);
86 147714943 : remain -= this_blk_len;
87 147714943 : off += this_blk_len;
88 : }
89 139016519 : Ok(())
90 139016519 : }
91 : }
92 :
93 : /// A wrapper of `VirtualFile` that allows users to write blobs.
94 : ///
95 : /// If a `BlobWriter` is dropped, the internal buffer will be
96 : /// discarded. You need to call [`flush_buffer`](Self::flush_buffer)
97 : /// manually before dropping.
98 : pub struct BlobWriter<const BUFFERED: bool> {
99 : inner: VirtualFile,
100 : offset: u64,
101 : /// A buffer to save on write calls, only used if BUFFERED=true
102 : buf: Vec<u8>,
103 : }
104 :
105 : impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
106 21971 : pub fn new(inner: VirtualFile, start_offset: u64) -> Self {
107 21971 : Self {
108 21971 : inner,
109 21971 : offset: start_offset,
110 21971 : buf: Vec::with_capacity(Self::CAPACITY),
111 21971 : }
112 21971 : }
113 :
114 2395574 : pub fn size(&self) -> u64 {
115 2395574 : self.offset
116 2395574 : }
117 :
118 : const CAPACITY: usize = if BUFFERED { PAGE_SZ } else { 0 };
119 :
120 : #[inline(always)]
121 : /// Writes the given buffer directly to the underlying `VirtualFile`.
122 : /// You need to make sure that the internal buffer is empty, otherwise
123 : /// data will be written in wrong order.
124 470923 : async fn write_all_unbuffered(&mut self, src_buf: &[u8]) -> Result<(), Error> {
125 470923 : self.inner.write_all(src_buf).await?;
126 470923 : self.offset += src_buf.len() as u64;
127 470923 : Ok(())
128 470923 : }
129 :
130 : #[inline(always)]
131 : /// Flushes the internal buffer to the underlying `VirtualFile`.
132 4585098 : pub async fn flush_buffer(&mut self) -> Result<(), Error> {
133 4585098 : self.inner.write_all(&self.buf).await?;
134 4585097 : self.buf.clear();
135 4585097 : Ok(())
136 4585097 : }
137 :
138 : #[inline(always)]
139 : /// Writes as much of `src_buf` into the internal buffer as it fits
140 109640927 : fn write_into_buffer(&mut self, src_buf: &[u8]) -> usize {
141 109640927 : let remaining = Self::CAPACITY - self.buf.len();
142 109640927 : let to_copy = src_buf.len().min(remaining);
143 109640927 : self.buf.extend_from_slice(&src_buf[..to_copy]);
144 109640927 : self.offset += to_copy as u64;
145 109640927 : to_copy
146 109640927 : }
147 :
148 : /// Internal, possibly buffered, write function
149 105553324 : async fn write_all(&mut self, mut src_buf: &[u8]) -> Result<(), Error> {
150 105553324 : if !BUFFERED {
151 466972 : assert!(self.buf.is_empty());
152 466972 : self.write_all_unbuffered(src_buf).await?;
153 466972 : return Ok(());
154 105086352 : }
155 105086352 : let remaining = Self::CAPACITY - self.buf.len();
156 105086352 : // First try to copy as much as we can into the buffer
157 105086352 : if remaining > 0 {
158 105086352 : let copied = self.write_into_buffer(src_buf);
159 105086352 : src_buf = &src_buf[copied..];
160 105086352 : }
161 : // Then, if the buffer is full, flush it out
162 105086352 : if self.buf.len() == Self::CAPACITY {
163 4569159 : self.flush_buffer().await?;
164 100517193 : }
165 : // Finally, write the tail of src_buf:
166 : // If it wholly fits into the buffer without
167 : // completely filling it, then put it there.
168 : // If not, write it out directly.
169 105086351 : if !src_buf.is_empty() {
170 4558526 : assert_eq!(self.buf.len(), 0);
171 4558526 : if src_buf.len() < Self::CAPACITY {
172 4554575 : let copied = self.write_into_buffer(src_buf);
173 4554575 : // We just verified above that src_buf fits into our internal buffer.
174 4554575 : assert_eq!(copied, src_buf.len());
175 : } else {
176 3951 : self.write_all_unbuffered(src_buf).await?;
177 : }
178 100527825 : }
179 105086351 : Ok(())
180 105553323 : }
181 :
182 : /// Write a blob of data. Returns the offset that it was written to,
183 : /// which can be used to retrieve the data later.
184 52776662 : pub async fn write_blob(&mut self, srcbuf: &[u8]) -> Result<u64, Error> {
185 52776662 : let offset = self.offset;
186 52776662 :
187 52776662 : if srcbuf.len() < 128 {
188 : // Short blob. Write a 1-byte length header
189 32095843 : let len_buf = srcbuf.len() as u8;
190 32095843 : self.write_all(&[len_buf]).await?;
191 : } else {
192 : // Write a 4-byte length header
193 20680819 : if srcbuf.len() > 0x7fff_ffff {
194 0 : return Err(Error::new(
195 0 : ErrorKind::Other,
196 0 : format!("blob too large ({} bytes)", srcbuf.len()),
197 0 : ));
198 20680819 : }
199 20680819 : let mut len_buf = ((srcbuf.len()) as u32).to_be_bytes();
200 20680819 : len_buf[0] |= 0x80;
201 20680819 : self.write_all(&len_buf).await?;
202 : }
203 52776662 : self.write_all(srcbuf).await?;
204 52776661 : Ok(offset)
205 52776661 : }
206 : }
207 :
208 : impl BlobWriter<true> {
209 : /// Access the underlying `VirtualFile`.
210 : ///
211 : /// This function flushes the internal buffer before giving access
212 : /// to the underlying `VirtualFile`.
213 15915 : pub async fn into_inner(mut self) -> Result<VirtualFile, Error> {
214 15915 : self.flush_buffer().await?;
215 15915 : Ok(self.inner)
216 15915 : }
217 :
218 : /// Access the underlying `VirtualFile`.
219 : ///
220 : /// Unlike [`into_inner`](Self::into_inner), this doesn't flush
221 : /// the internal buffer before giving access.
222 0 : pub fn into_inner_no_flush(self) -> VirtualFile {
223 0 : self.inner
224 0 : }
225 : }
226 :
227 : impl BlobWriter<false> {
228 : /// Access the underlying `VirtualFile`.
229 6029 : pub fn into_inner(self) -> VirtualFile {
230 6029 : self.inner
231 6029 : }
232 : }
233 :
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{context::DownloadBehavior, task_mgr::TaskKind, tenant::block_io::BlockReaderRef};
    use rand::{Rng, SeedableRng};

    /// Writes every blob to a fresh temp file with a `BlobWriter<BUFFERED>`,
    /// then reads each one back through a `BlockCursor` and checks that the
    /// bytes match.
    async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
        let temp_dir = camino_tempfile::tempdir()?;
        let pathbuf = temp_dir.path().join("file");
        let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);

        // Write phase; scoped so the file is dropped before it is reopened.
        let mut offsets = Vec::with_capacity(blobs.len());
        {
            let file = VirtualFile::create(pathbuf.as_path()).await?;
            let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
            for blob in blobs {
                offsets.push(wtr.write_blob(blob).await?);
            }
            // Write out one page worth of zeros so that we can
            // read again with read_blk
            let padding = vec![0u8; PAGE_SZ];
            let offs = wtr.write_blob(&padding).await?;
            println!("Writing final blob at offs={offs}");
            wtr.flush_buffer().await?;
        }

        // Read phase: every blob must come back unchanged from its offset.
        let file = VirtualFile::open(pathbuf.as_path()).await?;
        let rdr = BlockCursor::new(BlockReaderRef::VirtualFile(&file));
        for (idx, (expected, &offset)) in blobs.iter().zip(&offsets).enumerate() {
            let actual = rdr.read_blob(offset, &ctx).await?;
            assert_eq!(
                expected, &actual,
                "mismatch for idx={idx} at offset={offset}"
            );
        }
        Ok(())
    }

    /// Returns `len` random bytes drawn from the thread-local RNG.
    fn random_array(len: usize) -> Vec<u8> {
        let mut rng = rand::thread_rng();
        std::iter::repeat_with(|| rng.gen()).take(len).collect()
    }

    #[tokio::test]
    async fn test_one() -> Result<(), Error> {
        let blobs = [vec![12, 21, 22]];
        round_trip_test::<false>(&blobs).await?;
        round_trip_test::<true>(&blobs).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_hello_simple() -> Result<(), Error> {
        let blobs = [
            vec![0, 1, 2, 3],
            b"Hello, World!".to_vec(),
            Vec::new(),
            b"foobar".to_vec(),
        ];
        round_trip_test::<false>(&blobs).await?;
        round_trip_test::<true>(&blobs).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_really_big_array() -> Result<(), Error> {
        let blobs = [
            b"test".to_vec(),
            random_array(10 * PAGE_SZ),
            b"foobar".to_vec(),
        ];
        round_trip_test::<false>(&blobs).await?;
        round_trip_test::<true>(&blobs).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_arrays_inc() -> Result<(), Error> {
        // Blob sizes grow in steps of 16 bytes, starting from empty.
        let blobs: Vec<Vec<u8>> = (0..PAGE_SZ / 8).map(|v| random_array(v * 16)).collect();
        round_trip_test::<false>(&blobs).await?;
        round_trip_test::<true>(&blobs).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_arrays_random_size() -> Result<(), Error> {
        // Sizes come from a seeded RNG; contents still use `random_array`.
        let mut rng = rand::rngs::StdRng::seed_from_u64(42);
        let blobs: Vec<Vec<u8>> = (0..1024)
            .map(|_| {
                let drawn: u16 = rng.gen();
                // Make 50% of the arrays small
                let shrink: bool = rng.gen();
                let sz = if shrink { drawn & 63 } else { drawn };
                random_array(usize::from(sz))
            })
            .collect();
        round_trip_test::<false>(&blobs).await?;
        round_trip_test::<true>(&blobs).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_arrays_page_boundary() -> Result<(), Error> {
        let blobs = [
            random_array(PAGE_SZ - 4),
            random_array(PAGE_SZ - 4),
            random_array(PAGE_SZ - 4),
        ];
        round_trip_test::<false>(&blobs).await?;
        round_trip_test::<true>(&blobs).await?;
        Ok(())
    }
}
|