Line data Source code
1 : //!
2 : //! Functions for reading and writing variable-sized "blobs".
3 : //!
4 : //! Each blob begins with a 1- or 4-byte length field, followed by the
5 : //! actual data. If the length is smaller than 128 bytes, the length
6 : //! is written as a one byte. If it's larger than that, the length
7 : //! is written as a four-byte integer, in big-endian, with the high
8 : //! bit set. This way, we can detect whether it's 1- or 4-byte header
9 : //! by peeking at the first byte.
10 : //!
11 : //! len < 128: 0XXXXXXX
12 : //! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
13 : //!
14 : use crate::page_cache::PAGE_SZ;
15 : use crate::tenant::block_io::BlockCursor;
16 : use std::cmp::min;
17 : use std::io::{Error, ErrorKind};
18 :
19 : impl<'a> BlockCursor<'a> {
20 : /// Read a blob into a new buffer.
21 66457620 : pub async fn read_blob(&self, offset: u64) -> Result<Vec<u8>, std::io::Error> {
22 66457384 : let mut buf = Vec::new();
23 66457384 : self.read_blob_into_buf(offset, &mut buf).await?;
24 66457382 : Ok(buf)
25 66457382 : }
26 : /// Read blob into the given buffer. Any previous contents in the buffer
27 : /// are overwritten.
28 302763375 : pub async fn read_blob_into_buf(
29 302763375 : &self,
30 302763375 : offset: u64,
31 302763375 : dstbuf: &mut Vec<u8>,
32 302763375 : ) -> Result<(), std::io::Error> {
33 302763071 : let mut blknum = (offset / PAGE_SZ as u64) as u32;
34 302763071 : let mut off = (offset % PAGE_SZ as u64) as usize;
35 :
36 302763071 : let mut buf = self.read_blk(blknum).await?;
37 :
38 : // peek at the first byte, to determine if it's a 1- or 4-byte length
39 302763071 : let first_len_byte = buf[off];
40 302763071 : let len: usize = if first_len_byte < 0x80 {
41 : // 1-byte length header
42 202114663 : off += 1;
43 202114663 : first_len_byte as usize
44 : } else {
45 : // 4-byte length header
46 100648408 : let mut len_buf = [0u8; 4];
47 100648408 : let thislen = PAGE_SZ - off;
48 100648408 : if thislen < 4 {
49 : // it is split across two pages
50 34245 : len_buf[..thislen].copy_from_slice(&buf[off..PAGE_SZ]);
51 34245 : blknum += 1;
52 34245 : buf = self.read_blk(blknum).await?;
53 34244 : len_buf[thislen..].copy_from_slice(&buf[0..4 - thislen]);
54 34244 : off = 4 - thislen;
55 100614163 : } else {
56 100614163 : len_buf.copy_from_slice(&buf[off..off + 4]);
57 100614163 : off += 4;
58 100614163 : }
59 100648407 : len_buf[0] &= 0x7f;
60 100648407 : u32::from_be_bytes(len_buf) as usize
61 : };
62 :
63 302763070 : dstbuf.clear();
64 302763070 : dstbuf.reserve(len);
65 302763070 :
66 302763070 : // Read the payload
67 302763070 : let mut remain = len;
68 615065283 : while remain > 0 {
69 312302214 : let mut page_remain = PAGE_SZ - off;
70 312302214 : if page_remain == 0 {
71 : // continue on next page
72 9577769 : blknum += 1;
73 9577769 : buf = self.read_blk(blknum).await?;
74 9577768 : off = 0;
75 9577768 : page_remain = PAGE_SZ;
76 302724445 : }
77 312302213 : let this_blk_len = min(remain, page_remain);
78 312302213 : dstbuf.extend_from_slice(&buf[off..off + this_blk_len]);
79 312302213 : remain -= this_blk_len;
80 312302213 : off += this_blk_len;
81 : }
82 302763069 : Ok(())
83 302763069 : }
84 : }
85 :
86 : ///
87 : /// An implementation of BlobWriter to write blobs to anything that
88 : /// implements std::io::Write.
89 : ///
90 : pub struct WriteBlobWriter<W> {
91 : inner: W,
92 : offset: u64,
93 : }
94 :
95 : impl<W> WriteBlobWriter<W> {
96 16627 : pub fn new(inner: W, start_offset: u64) -> Self {
97 16627 : WriteBlobWriter {
98 16627 : inner,
99 16627 : offset: start_offset,
100 16627 : }
101 16627 : }
102 :
103 1768535 : pub fn size(&self) -> u64 {
104 1768535 : self.offset
105 1768535 : }
106 :
107 : /// Access the underlying Write object.
108 : ///
109 : /// NOTE: WriteBlobWriter keeps track of the current write offset. If
110 : /// you write something directly to the inner Write object, it makes the
111 : /// internally tracked 'offset' to go out of sync. So don't do that.
112 16620 : pub fn into_inner(self) -> W {
113 16620 : self.inner
114 16620 : }
115 : }
116 :
117 : impl<W> WriteBlobWriter<W>
118 : where
119 : W: std::io::Write,
120 : {
121 : /// Write a blob of data. Returns the offset that it was written to,
122 : /// which can be used to retrieve the data later.
123 91720901 : pub async fn write_blob(&mut self, srcbuf: &[u8]) -> Result<u64, Error> {
124 91720901 : let offset = self.offset;
125 91720901 :
126 91720901 : if srcbuf.len() < 128 {
127 : // Short blob. Write a 1-byte length header
128 54391508 : let len_buf = srcbuf.len() as u8;
129 54391508 : self.inner.write_all(&[len_buf])?;
130 54391508 : self.offset += 1;
131 : } else {
132 : // Write a 4-byte length header
133 37329393 : if srcbuf.len() > 0x7fff_ffff {
134 0 : return Err(Error::new(
135 0 : ErrorKind::Other,
136 0 : format!("blob too large ({} bytes)", srcbuf.len()),
137 0 : ));
138 37329393 : }
139 37329393 : let mut len_buf = ((srcbuf.len()) as u32).to_be_bytes();
140 37329393 : len_buf[0] |= 0x80;
141 37329393 : self.inner.write_all(&len_buf)?;
142 37329393 : self.offset += 4;
143 : }
144 91720901 : self.inner.write_all(srcbuf)?;
145 91720901 : self.offset += srcbuf.len() as u64;
146 91720901 : Ok(offset)
147 91720901 : }
148 : }
|