TLA Line data Source code
//!
//! Functions for reading and writing variable-sized "blobs".
//!
//! Each blob begins with a 1- or 4-byte length field, followed by the
//! actual data. If the length is smaller than 128 bytes, the length
//! is written as a single byte. If it is 128 bytes or more, the length
//! is written as a four-byte big-endian integer with the high bit set.
//! This way, we can tell whether the header is 1 or 4 bytes long
//! by peeking at its first byte. Because the high bit is used as a marker,
//! the largest blob that can be encoded is 0x7fff_ffff bytes.
//!
//! len < 128: 0XXXXXXX
//! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
//!
14 : use crate::context::RequestContext;
15 : use crate::page_cache::PAGE_SZ;
16 : use crate::tenant::block_io::BlockCursor;
17 : use crate::virtual_file::VirtualFile;
18 : use std::cmp::min;
19 : use std::io::{Error, ErrorKind};
20 :
21 : impl<'a> BlockCursor<'a> {
22 : /// Read a blob into a new buffer.
23 CBC 61295721 : pub async fn read_blob(
24 61295721 : &self,
25 61295721 : offset: u64,
26 61295721 : ctx: &RequestContext,
27 61295721 : ) -> Result<Vec<u8>, std::io::Error> {
28 61295719 : let mut buf = Vec::new();
29 61295719 : self.read_blob_into_buf(offset, &mut buf, ctx).await?;
30 61295716 : Ok(buf)
31 61295716 : }
32 : /// Read blob into the given buffer. Any previous contents in the buffer
33 : /// are overwritten.
34 245880840 : pub async fn read_blob_into_buf(
35 245880840 : &self,
36 245880840 : offset: u64,
37 245880840 : dstbuf: &mut Vec<u8>,
38 245880840 : ctx: &RequestContext,
39 245880840 : ) -> Result<(), std::io::Error> {
40 245880828 : let mut blknum = (offset / PAGE_SZ as u64) as u32;
41 245880828 : let mut off = (offset % PAGE_SZ as u64) as usize;
42 :
43 245880828 : let mut buf = self.read_blk(blknum, ctx).await?;
44 :
45 : // peek at the first byte, to determine if it's a 1- or 4-byte length
46 245880822 : let first_len_byte = buf[off];
47 245880822 : let len: usize = if first_len_byte < 0x80 {
48 : // 1-byte length header
49 155608083 : off += 1;
50 155608083 : first_len_byte as usize
51 : } else {
52 : // 4-byte length header
53 90272739 : let mut len_buf = [0u8; 4];
54 90272739 : let thislen = PAGE_SZ - off;
55 90272739 : if thislen < 4 {
56 : // it is split across two pages
57 29488 : len_buf[..thislen].copy_from_slice(&buf[off..PAGE_SZ]);
58 29488 : blknum += 1;
59 29488 : buf = self.read_blk(blknum, ctx).await?;
60 29488 : len_buf[thislen..].copy_from_slice(&buf[0..4 - thislen]);
61 29488 : off = 4 - thislen;
62 90243251 : } else {
63 90243251 : len_buf.copy_from_slice(&buf[off..off + 4]);
64 90243251 : off += 4;
65 90243251 : }
66 90272739 : len_buf[0] &= 0x7f;
67 90272739 : u32::from_be_bytes(len_buf) as usize
68 : };
69 :
70 245880822 : dstbuf.clear();
71 245880822 : dstbuf.reserve(len);
72 245880822 :
73 245880822 : // Read the payload
74 245880822 : let mut remain = len;
75 500376991 : while remain > 0 {
76 254496171 : let mut page_remain = PAGE_SZ - off;
77 254496171 : if page_remain == 0 {
78 : // continue on next page
79 8645372 : blknum += 1;
80 8645372 : buf = self.read_blk(blknum, ctx).await?;
81 8645370 : off = 0;
82 8645370 : page_remain = PAGE_SZ;
83 245850799 : }
84 254496169 : let this_blk_len = min(remain, page_remain);
85 254496169 : dstbuf.extend_from_slice(&buf[off..off + this_blk_len]);
86 254496169 : remain -= this_blk_len;
87 254496169 : off += this_blk_len;
88 : }
89 245880820 : Ok(())
90 245880820 : }
91 : }
92 :
93 : /// A wrapper of `VirtualFile` that allows users to write blobs.
94 : ///
95 : /// If a `BlobWriter` is dropped, the internal buffer will be
96 : /// discarded. You need to call [`flush_buffer`](Self::flush_buffer)
97 : /// manually before dropping.
98 : pub struct BlobWriter<const BUFFERED: bool> {
99 : inner: VirtualFile,
100 : offset: u64,
101 : /// A buffer to save on write calls, only used if BUFFERED=true
102 : buf: Vec<u8>,
103 : }
104 :
105 : impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
106 19146 : pub fn new(inner: VirtualFile, start_offset: u64) -> Self {
107 19146 : Self {
108 19146 : inner,
109 19146 : offset: start_offset,
110 19146 : buf: Vec::with_capacity(Self::CAPACITY),
111 19146 : }
112 19146 : }
113 :
114 1889824 : pub fn size(&self) -> u64 {
115 1889824 : self.offset
116 1889824 : }
117 :
118 : const CAPACITY: usize = if BUFFERED { PAGE_SZ } else { 0 };
119 :
120 : #[inline(always)]
121 : /// Writes the given buffer directly to the underlying `VirtualFile`.
122 : /// You need to make sure that the internal buffer is empty, otherwise
123 : /// data will be written in wrong order.
124 457790 : async fn write_all_unbuffered(&mut self, src_buf: &[u8]) -> Result<(), Error> {
125 457790 : self.inner.write_all(src_buf).await?;
126 457790 : self.offset += src_buf.len() as u64;
127 457790 : Ok(())
128 457790 : }
129 :
130 : #[inline(always)]
131 : /// Flushes the internal buffer to the underlying `VirtualFile`.
132 4792104 : pub async fn flush_buffer(&mut self) -> Result<(), Error> {
133 4792104 : self.inner.write_all(&self.buf).await?;
134 4792103 : self.buf.clear();
135 4792103 : Ok(())
136 4792103 : }
137 :
138 : #[inline(always)]
139 : /// Writes as much of `src_buf` into the internal buffer as it fits
140 170217609 : fn write_into_buffer(&mut self, src_buf: &[u8]) -> usize {
141 170217609 : let remaining = Self::CAPACITY - self.buf.len();
142 170217609 : let to_copy = src_buf.len().min(remaining);
143 170217609 : self.buf.extend_from_slice(&src_buf[..to_copy]);
144 170217609 : self.offset += to_copy as u64;
145 170217609 : to_copy
146 170217609 : }
147 :
148 : /// Internal, possibly buffered, write function
149 165916608 : async fn write_all(&mut self, mut src_buf: &[u8]) -> Result<(), Error> {
150 165916608 : if !BUFFERED {
151 452374 : assert!(self.buf.is_empty());
152 452374 : self.write_all_unbuffered(src_buf).await?;
153 452374 : return Ok(());
154 165464234 : }
155 165464234 : let remaining = Self::CAPACITY - self.buf.len();
156 165464234 : // First try to copy as much as we can into the buffer
157 165464234 : if remaining > 0 {
158 165464234 : let copied = self.write_into_buffer(src_buf);
159 165464234 : src_buf = &src_buf[copied..];
160 165464234 : }
161 : // Then, if the buffer is full, flush it out
162 165464234 : if self.buf.len() == Self::CAPACITY {
163 4776371 : self.flush_buffer().await?;
164 160687863 : }
165 : // Finally, write the tail of src_buf:
166 : // If it wholly fits into the buffer without
167 : // completely filling it, then put it there.
168 : // If not, write it out directly.
169 165464233 : if !src_buf.is_empty() {
170 4758791 : assert_eq!(self.buf.len(), 0);
171 4758791 : if src_buf.len() < Self::CAPACITY {
172 4753375 : let copied = self.write_into_buffer(src_buf);
173 4753375 : // We just verified above that src_buf fits into our internal buffer.
174 4753375 : assert_eq!(copied, src_buf.len());
175 : } else {
176 5416 : self.write_all_unbuffered(src_buf).await?;
177 : }
178 160705442 : }
179 165464233 : Ok(())
180 165916607 : }
181 :
182 : /// Write a blob of data. Returns the offset that it was written to,
183 : /// which can be used to retrieve the data later.
184 82958304 : pub async fn write_blob(&mut self, srcbuf: &[u8]) -> Result<u64, Error> {
185 82958304 : let offset = self.offset;
186 82958304 :
187 82958304 : if srcbuf.len() < 128 {
188 : // Short blob. Write a 1-byte length header
189 51721507 : let len_buf = srcbuf.len() as u8;
190 51721507 : self.write_all(&[len_buf]).await?;
191 : } else {
192 : // Write a 4-byte length header
193 31236797 : if srcbuf.len() > 0x7fff_ffff {
194 UBC 0 : return Err(Error::new(
195 0 : ErrorKind::Other,
196 0 : format!("blob too large ({} bytes)", srcbuf.len()),
197 0 : ));
198 CBC 31236797 : }
199 31236797 : let mut len_buf = ((srcbuf.len()) as u32).to_be_bytes();
200 31236797 : len_buf[0] |= 0x80;
201 31236797 : self.write_all(&len_buf).await?;
202 : }
203 82958304 : self.write_all(srcbuf).await?;
204 82958303 : Ok(offset)
205 82958303 : }
206 : }
207 :
208 : impl BlobWriter<true> {
209 : /// Access the underlying `VirtualFile`.
210 : ///
211 : /// This function flushes the internal buffer before giving access
212 : /// to the underlying `VirtualFile`.
213 15721 : pub async fn into_inner(mut self) -> Result<VirtualFile, Error> {
214 15721 : self.flush_buffer().await?;
215 15721 : Ok(self.inner)
216 15721 : }
217 :
218 : /// Access the underlying `VirtualFile`.
219 : ///
220 : /// Unlike [`into_inner`](Self::into_inner), this doesn't flush
221 : /// the internal buffer before giving access.
222 UBC 0 : pub fn into_inner_no_flush(self) -> VirtualFile {
223 0 : self.inner
224 0 : }
225 : }
226 :
227 : impl BlobWriter<false> {
228 : /// Access the underlying `VirtualFile`.
229 CBC 3402 : pub fn into_inner(self) -> VirtualFile {
230 3402 : self.inner
231 3402 : }
232 : }
233 :
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{context::DownloadBehavior, task_mgr::TaskKind, tenant::block_io::BlockReaderRef};
    use rand::{Rng, SeedableRng};

    /// Writes `blobs` into a fresh temporary file with a `BlobWriter::<BUFFERED>`
    /// starting at offset 0, then reads every blob back through `BlockCursor`
    /// and checks it matches what was written.
    async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
        let temp_dir = camino_tempfile::tempdir()?;
        let pathbuf = temp_dir.path().join("file");
        let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);

        // Write part (in block to drop the file)
        let mut offsets = Vec::with_capacity(blobs.len());
        {
            let file = VirtualFile::create(pathbuf.as_path()).await?;
            let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
            for blob in blobs.iter() {
                let offs = wtr.write_blob(blob).await?;
                offsets.push(offs);
            }
            // Write out one page worth of zeros so that we can
            // read again with read_blk
            let offs = wtr.write_blob(&vec![0; PAGE_SZ]).await?;
            println!("Writing final blob at offs={offs}");
            wtr.flush_buffer().await?;
        }

        let file = VirtualFile::open(pathbuf.as_path()).await?;
        let rdr = BlockReaderRef::VirtualFile(&file);
        let rdr = BlockCursor::new(rdr);
        for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
            let blob_read = rdr.read_blob(*offset, &ctx).await?;
            assert_eq!(
                blob, &blob_read,
                "mismatch for idx={idx} at offset={offset}"
            );
        }
        Ok(())
    }

    /// Returns `len` random bytes.
    fn random_array(len: usize) -> Vec<u8> {
        let mut rng = rand::thread_rng();
        (0..len).map(|_| rng.gen()).collect::<_>()
    }

    #[tokio::test]
    async fn test_one() -> Result<(), Error> {
        let blobs = &[vec![12, 21, 22]];
        round_trip_test::<false>(blobs).await?;
        round_trip_test::<true>(blobs).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_hello_simple() -> Result<(), Error> {
        let blobs = &[
            vec![0, 1, 2, 3],
            b"Hello, World!".to_vec(),
            Vec::new(),
            b"foobar".to_vec(),
        ];
        round_trip_test::<false>(blobs).await?;
        round_trip_test::<true>(blobs).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_really_big_array() -> Result<(), Error> {
        let blobs = &[
            b"test".to_vec(),
            random_array(10 * PAGE_SZ),
            b"foobar".to_vec(),
        ];
        round_trip_test::<false>(blobs).await?;
        round_trip_test::<true>(blobs).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_arrays_inc() -> Result<(), Error> {
        let blobs = (0..PAGE_SZ / 8)
            .map(|v| random_array(v * 16))
            .collect::<Vec<_>>();
        round_trip_test::<false>(&blobs).await?;
        round_trip_test::<true>(&blobs).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_arrays_random_size() -> Result<(), Error> {
        let mut rng = rand::rngs::StdRng::seed_from_u64(42);
        let blobs = (0..1024)
            .map(|_| {
                let mut sz: u16 = rng.gen();
                // Make 50% of the arrays small
                if rng.gen() {
                    sz |= 63;
                }
                random_array(sz.into())
            })
            .collect::<Vec<_>>();
        round_trip_test::<false>(&blobs).await?;
        round_trip_test::<true>(&blobs).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_arrays_page_boundary() -> Result<(), Error> {
        let blobs = &[
            random_array(PAGE_SZ - 4),
            random_array(PAGE_SZ - 4),
            random_array(PAGE_SZ - 4),
        ];
        round_trip_test::<false>(blobs).await?;
        round_trip_test::<true>(blobs).await?;
        Ok(())
    }

    /// Places length headers exactly on page boundaries so both tricky reader
    /// paths are hit deterministically:
    /// - a 4-byte header split 2/2 across two pages, and
    /// - a 1-byte header in the last byte of a page, with its payload on the
    ///   next page.
    ///
    /// Offsets assume the writer starts at 0, as `round_trip_test` does.
    #[tokio::test]
    async fn test_headers_across_page_boundary() -> Result<(), Error> {
        let blobs = &[
            // 4-byte header + payload = PAGE_SZ - 2 bytes: ends 2 bytes
            // before the end of page 0.
            random_array(PAGE_SZ - 6),
            // Its 4-byte header spans [PAGE_SZ - 2, PAGE_SZ + 2); the blob
            // ends at PAGE_SZ + 202.
            random_array(200),
            // 4-byte header + payload ends at 2 * PAGE_SZ - 1, i.e. one byte
            // before the end of page 1.
            random_array(PAGE_SZ - 207),
            // Its 1-byte header is the last byte of page 1; the payload
            // starts on page 2.
            random_array(10),
        ];
        round_trip_test::<false>(blobs).await?;
        round_trip_test::<true>(blobs).await?;
        Ok(())
    }
}
|