Line data Source code
1 : //!
2 : //! Low-level Block-oriented I/O functions
3 : //!
4 :
5 : use super::ephemeral_file::EphemeralFile;
6 : use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
7 : use crate::context::RequestContext;
8 : use crate::page_cache::{self, FileId, PageReadGuard, PageWriteGuard, ReadBufResult, PAGE_SZ};
9 : use crate::virtual_file::VirtualFile;
10 : use bytes::Bytes;
11 : use std::ops::Deref;
12 :
13 : /// This is implemented by anything that can read 8 kB (PAGE_SZ)
14 : /// blocks, using the page cache
15 : ///
16 : /// There are currently two implementations: EphemeralFile, and FileBlockReader
17 : /// below.
18 : pub trait BlockReader {
19 : ///
20 : /// Create a new "cursor" for reading from this reader.
21 : ///
22 : /// A cursor caches the last accessed page, allowing for faster
23 : /// access if the same block is accessed repeatedly.
24 : fn block_cursor(&self) -> BlockCursor<'_>;
25 : }
26 :
27 : impl<B> BlockReader for &B
28 : where
29 : B: BlockReader,
30 : {
31 210798 : fn block_cursor(&self) -> BlockCursor<'_> {
32 210798 : (*self).block_cursor()
33 210798 : }
34 : }
35 :
36 : /// Reference to an in-memory copy of an immutable on-disk block.
37 : pub enum BlockLease<'a> {
38 : PageReadGuard(PageReadGuard<'static>),
39 : EphemeralFileMutableTail(&'a [u8; PAGE_SZ]),
40 : Slice(&'a [u8; PAGE_SZ]),
41 : #[cfg(test)]
42 : Arc(std::sync::Arc<[u8; PAGE_SZ]>),
43 : #[cfg(test)]
44 : Vec(Vec<u8>),
45 : }
46 :
47 : impl From<PageReadGuard<'static>> for BlockLease<'static> {
48 2739717 : fn from(value: PageReadGuard<'static>) -> BlockLease<'static> {
49 2739717 : BlockLease::PageReadGuard(value)
50 2739717 : }
51 : }
52 :
/// Wraps a reference-counted page in a lease (test builds only).
#[cfg(test)]
impl<'a> From<std::sync::Arc<[u8; PAGE_SZ]>> for BlockLease<'a> {
    fn from(page: std::sync::Arc<[u8; PAGE_SZ]>) -> Self {
        Self::Arc(page)
    }
}
59 :
60 : impl<'a> Deref for BlockLease<'a> {
61 : type Target = [u8; PAGE_SZ];
62 :
63 15853472 : fn deref(&self) -> &Self::Target {
64 15853472 : match self {
65 13498801 : BlockLease::PageReadGuard(v) => v.deref(),
66 1302613 : BlockLease::EphemeralFileMutableTail(v) => v,
67 0 : BlockLease::Slice(v) => v,
68 : #[cfg(test)]
69 1016766 : BlockLease::Arc(v) => v.deref(),
70 : #[cfg(test)]
71 35292 : BlockLease::Vec(v) => {
72 35292 : TryFrom::try_from(&v[..]).expect("caller must ensure that v has PAGE_SZ")
73 : }
74 : }
75 15853472 : }
76 : }
77 :
78 : /// Provides the ability to read blocks from different sources,
79 : /// similar to using traits for this purpose.
80 : ///
81 : /// Unlike traits, we also support the read function to be async though.
82 : pub(crate) enum BlockReaderRef<'a> {
83 : FileBlockReader(&'a FileBlockReader<'a>),
84 : EphemeralFile(&'a EphemeralFile),
85 : Adapter(Adapter<&'a DeltaLayerInner>),
86 : Slice(&'a [u8]),
87 : #[cfg(test)]
88 : TestDisk(&'a super::disk_btree::tests::TestDisk),
89 : #[cfg(test)]
90 : VirtualFile(&'a VirtualFile),
91 : }
92 :
93 : impl<'a> BlockReaderRef<'a> {
94 : #[inline(always)]
95 8732158 : async fn read_blk(
96 8732158 : &self,
97 8732158 : blknum: u32,
98 8732158 : ctx: &RequestContext,
99 8732158 : ) -> Result<BlockLease, std::io::Error> {
100 8732158 : use BlockReaderRef::*;
101 8732158 : match self {
102 655625 : FileBlockReader(r) => r.read_blk(blknum, ctx).await,
103 4955849 : EphemeralFile(r) => r.read_blk(blknum, ctx).await,
104 2083002 : Adapter(r) => r.read_blk(blknum, ctx).await,
105 0 : Slice(s) => Self::read_blk_slice(s, blknum),
106 : #[cfg(test)]
107 1016766 : TestDisk(r) => r.read_blk(blknum),
108 : #[cfg(test)]
109 20916 : VirtualFile(r) => r.read_blk(blknum, ctx).await,
110 : }
111 8732158 : }
112 : }
113 :
114 : impl<'a> BlockReaderRef<'a> {
115 0 : fn read_blk_slice(slice: &[u8], blknum: u32) -> std::io::Result<BlockLease> {
116 0 : let start = (blknum as usize).checked_mul(PAGE_SZ).unwrap();
117 0 : let end = start.checked_add(PAGE_SZ).unwrap();
118 0 : if end > slice.len() {
119 0 : return Err(std::io::Error::new(
120 0 : std::io::ErrorKind::UnexpectedEof,
121 0 : format!("slice too short, len={} end={}", slice.len(), end),
122 0 : ));
123 0 : }
124 0 : let slice = &slice[start..end];
125 0 : let page_sized: &[u8; PAGE_SZ] = slice
126 0 : .try_into()
127 0 : .expect("we add PAGE_SZ to start, so the slice must have PAGE_SZ");
128 0 : Ok(BlockLease::Slice(page_sized))
129 0 : }
130 : }
131 :
132 : ///
133 : /// A "cursor" for efficiently reading multiple pages from a BlockReader
134 : ///
135 : /// You can access the last page with `*cursor`. 'read_blk' returns 'self', so
136 : /// that in many cases you can use a BlockCursor as a drop-in replacement for
137 : /// the underlying BlockReader. For example:
138 : ///
139 : /// ```no_run
140 : /// # use pageserver::tenant::block_io::{BlockReader, FileBlockReader};
141 : /// # use pageserver::context::RequestContext;
142 : /// # let reader: FileBlockReader = unimplemented!("stub");
143 : /// # let ctx: RequestContext = unimplemented!("stub");
144 : /// let cursor = reader.block_cursor();
145 : /// let buf = cursor.read_blk(1, &ctx);
146 : /// // do stuff with 'buf'
147 : /// let buf = cursor.read_blk(2, &ctx);
148 : /// // do stuff with 'buf'
149 : /// ```
150 : ///
151 : pub struct BlockCursor<'a> {
152 : pub(super) read_compressed: bool,
153 : reader: BlockReaderRef<'a>,
154 : }
155 :
156 : impl<'a> BlockCursor<'a> {
157 3082456 : pub(crate) fn new(reader: BlockReaderRef<'a>) -> Self {
158 3082456 : Self::new_with_compression(reader, false)
159 3082456 : }
160 3569064 : pub(crate) fn new_with_compression(reader: BlockReaderRef<'a>, read_compressed: bool) -> Self {
161 3569064 : BlockCursor {
162 3569064 : read_compressed,
163 3569064 : reader,
164 3569064 : }
165 3569064 : }
166 : // Needed by cli
167 0 : pub fn new_fileblockreader(reader: &'a FileBlockReader) -> Self {
168 0 : BlockCursor {
169 0 : read_compressed: false,
170 0 : reader: BlockReaderRef::FileBlockReader(reader),
171 0 : }
172 0 : }
173 :
174 : /// Read a block.
175 : ///
176 : /// Returns a "lease" object that can be used to
177 : /// access to the contents of the page. (For the page cache, the
178 : /// lease object represents a lock on the buffer.)
179 : #[inline(always)]
180 8732158 : pub async fn read_blk(
181 8732158 : &self,
182 8732158 : blknum: u32,
183 8732158 : ctx: &RequestContext,
184 8732158 : ) -> Result<BlockLease, std::io::Error> {
185 8732158 : self.reader.read_blk(blknum, ctx).await
186 8732158 : }
187 : }
188 :
189 : /// An adapter for reading a (virtual) file using the page cache.
190 : ///
191 : /// The file is assumed to be immutable. This doesn't provide any functions
192 : /// for modifying the file, nor for invalidating the cache if it is modified.
193 : #[derive(Clone)]
194 : pub struct FileBlockReader<'a> {
195 : pub file: &'a VirtualFile,
196 :
197 : /// Unique ID of this file, used as key in the page cache.
198 : file_id: page_cache::FileId,
199 :
200 : compressed_reads: bool,
201 : }
202 :
203 : impl<'a> FileBlockReader<'a> {
204 2288625 : pub fn new(file: &'a VirtualFile, file_id: FileId) -> Self {
205 2288625 : Self::new_with_compression(file, file_id, false)
206 2288625 : }
207 :
208 2295880 : pub fn new_with_compression(
209 2295880 : file: &'a VirtualFile,
210 2295880 : file_id: FileId,
211 2295880 : compressed_reads: bool,
212 2295880 : ) -> Self {
213 2295880 : FileBlockReader {
214 2295880 : file_id,
215 2295880 : file,
216 2295880 : compressed_reads,
217 2295880 : }
218 2295880 : }
219 :
220 : /// Read a page from the underlying file into given buffer.
221 60136 : async fn fill_buffer(
222 60136 : &self,
223 60136 : buf: PageWriteGuard<'static>,
224 60136 : blkno: u32,
225 60136 : ctx: &RequestContext,
226 60136 : ) -> Result<PageWriteGuard<'static>, std::io::Error> {
227 60136 : assert!(buf.len() == PAGE_SZ);
228 60136 : self.file
229 60136 : .read_exact_at_page(buf, blkno as u64 * PAGE_SZ as u64, ctx)
230 36353 : .await
231 60136 : }
232 : /// Read a block.
233 : ///
234 : /// Returns a "lease" object that can be used to
235 : /// access to the contents of the page. (For the page cache, the
236 : /// lease object represents a lock on the buffer.)
237 2739717 : pub async fn read_blk<'b>(
238 2739717 : &self,
239 2739717 : blknum: u32,
240 2739717 : ctx: &RequestContext,
241 2739717 : ) -> Result<BlockLease<'b>, std::io::Error> {
242 2739717 : let cache = page_cache::get();
243 2739717 : match cache
244 2739717 : .read_immutable_buf(self.file_id, blknum, ctx)
245 33971 : .await
246 2739717 : .map_err(|e| {
247 0 : std::io::Error::new(
248 0 : std::io::ErrorKind::Other,
249 0 : format!("Failed to read immutable buf: {e:#}"),
250 0 : )
251 2739717 : })? {
252 2679581 : ReadBufResult::Found(guard) => Ok(guard.into()),
253 60136 : ReadBufResult::NotFound(write_guard) => {
254 : // Read the page from disk into the buffer
255 60136 : let write_guard = self.fill_buffer(write_guard, blknum, ctx).await?;
256 60136 : Ok(write_guard.mark_valid().into())
257 : }
258 : }
259 2739717 : }
260 : }
261 :
262 : impl BlockReader for FileBlockReader<'_> {
263 486576 : fn block_cursor(&self) -> BlockCursor<'_> {
264 486576 : BlockCursor::new_with_compression(
265 486576 : BlockReaderRef::FileBlockReader(self),
266 486576 : self.compressed_reads,
267 486576 : )
268 486576 : }
269 : }
270 :
271 : ///
272 : /// Trait for block-oriented output
273 : ///
274 : pub trait BlockWriter {
275 : ///
276 : /// Write a page to the underlying storage.
277 : ///
278 : /// 'buf' must be of size PAGE_SZ. Returns the block number the page was
279 : /// written to.
280 : ///
281 : fn write_blk(&mut self, buf: Bytes) -> Result<u32, std::io::Error>;
282 : }
283 :
284 : ///
285 : /// A simple in-memory buffer of blocks.
286 : ///
287 : pub struct BlockBuf {
288 : pub blocks: Vec<Bytes>,
289 : }
290 : impl BlockWriter for BlockBuf {
291 14223 : fn write_blk(&mut self, buf: Bytes) -> Result<u32, std::io::Error> {
292 14223 : assert!(buf.len() == PAGE_SZ);
293 14223 : let blknum = self.blocks.len();
294 14223 : self.blocks.push(buf);
295 14223 : Ok(blknum as u32)
296 14223 : }
297 : }
298 :
299 : impl BlockBuf {
300 1572 : pub fn new() -> Self {
301 1572 : BlockBuf { blocks: Vec::new() }
302 1572 : }
303 :
304 2023972 : pub fn size(&self) -> u64 {
305 2023972 : (self.blocks.len() * PAGE_SZ) as u64
306 2023972 : }
307 : }
308 : impl Default for BlockBuf {
309 0 : fn default() -> Self {
310 0 : Self::new()
311 0 : }
312 : }
|