Line data Source code
1 : //!
2 : //! Low-level Block-oriented I/O functions
3 : //!
4 :
5 : use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
6 : use crate::context::RequestContext;
7 : use crate::page_cache::{self, FileId, PageReadGuard, PageWriteGuard, ReadBufResult, PAGE_SZ};
8 : use crate::virtual_file::VirtualFile;
9 : use bytes::Bytes;
10 : use std::ops::Deref;
11 :
12 : /// This is implemented by anything that can read 8 kB (PAGE_SZ)
13 : /// blocks, using the page cache
14 : ///
15 : /// There are currently two implementations: EphemeralFile, and FileBlockReader
16 : /// below.
17 : pub trait BlockReader {
18 : ///
19 : /// Create a new "cursor" for reading from this reader.
20 : ///
21 : /// A cursor caches the last accessed page, allowing for faster
22 : /// access if the same block is accessed repeatedly.
23 : fn block_cursor(&self) -> BlockCursor<'_>;
24 : }
25 :
26 : impl<B> BlockReader for &B
27 : where
28 : B: BlockReader,
29 : {
30 60 : fn block_cursor(&self) -> BlockCursor<'_> {
31 60 : (*self).block_cursor()
32 60 : }
33 : }
34 :
35 : /// Reference to an in-memory copy of an immutable on-disk block.
36 : pub enum BlockLease<'a> {
37 : PageReadGuard(PageReadGuard<'static>),
38 : EphemeralFileMutableTail(&'a [u8; PAGE_SZ]),
39 : Slice(&'a [u8; PAGE_SZ]),
40 : #[cfg(test)]
41 : Arc(std::sync::Arc<[u8; PAGE_SZ]>),
42 : #[cfg(test)]
43 : Vec(Vec<u8>),
44 : }
45 :
46 : impl From<PageReadGuard<'static>> for BlockLease<'static> {
47 1650306 : fn from(value: PageReadGuard<'static>) -> BlockLease<'static> {
48 1650306 : BlockLease::PageReadGuard(value)
49 1650306 : }
50 : }
51 :
/// Test-only conversion from a reference-counted page into a [`BlockLease`].
#[cfg(test)]
impl From<std::sync::Arc<[u8; PAGE_SZ]>> for BlockLease<'_> {
    fn from(value: std::sync::Arc<[u8; PAGE_SZ]>) -> Self {
        Self::Arc(value)
    }
}
58 :
59 : impl<'a> Deref for BlockLease<'a> {
60 : type Target = [u8; PAGE_SZ];
61 :
62 4806960 : fn deref(&self) -> &Self::Target {
63 4806960 : match self {
64 1650546 : BlockLease::PageReadGuard(v) => v.deref(),
65 0 : BlockLease::EphemeralFileMutableTail(v) => v,
66 0 : BlockLease::Slice(v) => v,
67 : #[cfg(test)]
68 3050538 : BlockLease::Arc(v) => v.deref(),
69 : #[cfg(test)]
70 105876 : BlockLease::Vec(v) => {
71 105876 : TryFrom::try_from(&v[..]).expect("caller must ensure that v has PAGE_SZ")
72 : }
73 : }
74 4806960 : }
75 : }
76 :
77 : /// Provides the ability to read blocks from different sources,
78 : /// similar to using traits for this purpose.
79 : ///
80 : /// Unlike traits, we also support the read function to be async though.
81 : pub(crate) enum BlockReaderRef<'a> {
82 : FileBlockReader(&'a FileBlockReader<'a>),
83 : Adapter(Adapter<&'a DeltaLayerInner>),
84 : #[cfg(test)]
85 : TestDisk(&'a super::disk_btree::tests::TestDisk),
86 : #[cfg(test)]
87 : VirtualFile(&'a VirtualFile),
88 : }
89 :
90 : impl<'a> BlockReaderRef<'a> {
91 : #[inline(always)]
92 4760142 : async fn read_blk(
93 4760142 : &self,
94 4760142 : blknum: u32,
95 4760142 : ctx: &RequestContext,
96 4760142 : ) -> Result<BlockLease, std::io::Error> {
97 : use BlockReaderRef::*;
98 4760142 : match self {
99 1646832 : FileBlockReader(r) => r.read_blk(blknum, ctx).await,
100 24 : Adapter(r) => r.read_blk(blknum, ctx).await,
101 : #[cfg(test)]
102 3050538 : TestDisk(r) => r.read_blk(blknum),
103 : #[cfg(test)]
104 62748 : VirtualFile(r) => r.read_blk(blknum, ctx).await,
105 : }
106 4760142 : }
107 : }
108 :
109 : ///
110 : /// A "cursor" for efficiently reading multiple pages from a BlockReader
111 : ///
112 : /// You can access the last page with `*cursor`. 'read_blk' returns 'self', so
113 : /// that in many cases you can use a BlockCursor as a drop-in replacement for
114 : /// the underlying BlockReader. For example:
115 : ///
116 : /// ```no_run
117 : /// # use pageserver::tenant::block_io::{BlockReader, FileBlockReader};
118 : /// # use pageserver::context::RequestContext;
119 : /// # let reader: FileBlockReader = unimplemented!("stub");
120 : /// # let ctx: RequestContext = unimplemented!("stub");
121 : /// let cursor = reader.block_cursor();
122 : /// let buf = cursor.read_blk(1, &ctx);
123 : /// // do stuff with 'buf'
124 : /// let buf = cursor.read_blk(2, &ctx);
125 : /// // do stuff with 'buf'
126 : /// ```
127 : ///
128 : pub struct BlockCursor<'a> {
129 : pub(super) read_compressed: bool,
130 : reader: BlockReaderRef<'a>,
131 : }
132 :
133 : impl<'a> BlockCursor<'a> {
134 1233807 : pub(crate) fn new(reader: BlockReaderRef<'a>) -> Self {
135 1233807 : Self::new_with_compression(reader, false)
136 1233807 : }
137 2065293 : pub(crate) fn new_with_compression(reader: BlockReaderRef<'a>, read_compressed: bool) -> Self {
138 2065293 : BlockCursor {
139 2065293 : read_compressed,
140 2065293 : reader,
141 2065293 : }
142 2065293 : }
143 : // Needed by cli
144 0 : pub fn new_fileblockreader(reader: &'a FileBlockReader) -> Self {
145 0 : BlockCursor {
146 0 : read_compressed: false,
147 0 : reader: BlockReaderRef::FileBlockReader(reader),
148 0 : }
149 0 : }
150 :
151 : /// Read a block.
152 : ///
153 : /// Returns a "lease" object that can be used to
154 : /// access to the contents of the page. (For the page cache, the
155 : /// lease object represents a lock on the buffer.)
156 : #[inline(always)]
157 4760142 : pub async fn read_blk(
158 4760142 : &self,
159 4760142 : blknum: u32,
160 4760142 : ctx: &RequestContext,
161 4760142 : ) -> Result<BlockLease, std::io::Error> {
162 4760142 : self.reader.read_blk(blknum, ctx).await
163 4760142 : }
164 : }
165 :
166 : /// An adapter for reading a (virtual) file using the page cache.
167 : ///
168 : /// The file is assumed to be immutable. This doesn't provide any functions
169 : /// for modifying the file, nor for invalidating the cache if it is modified.
170 : #[derive(Clone)]
171 : pub struct FileBlockReader<'a> {
172 : pub file: &'a VirtualFile,
173 :
174 : /// Unique ID of this file, used as key in the page cache.
175 : file_id: page_cache::FileId,
176 :
177 : compressed_reads: bool,
178 : }
179 :
180 : impl<'a> FileBlockReader<'a> {
181 642991 : pub fn new(file: &'a VirtualFile, file_id: FileId) -> Self {
182 642991 : FileBlockReader {
183 642991 : file_id,
184 642991 : file,
185 642991 : compressed_reads: true,
186 642991 : }
187 642991 : }
188 :
189 : /// Read a page from the underlying file into given buffer.
190 96350 : async fn fill_buffer(
191 96350 : &self,
192 96350 : buf: PageWriteGuard<'static>,
193 96350 : blkno: u32,
194 96350 : ctx: &RequestContext,
195 96350 : ) -> Result<PageWriteGuard<'static>, std::io::Error> {
196 96350 : assert!(buf.len() == PAGE_SZ);
197 96350 : self.file
198 96350 : .read_exact_at_page(buf, blkno as u64 * PAGE_SZ as u64, ctx)
199 65682 : .await
200 96350 : }
201 : /// Read a block.
202 : ///
203 : /// Returns a "lease" object that can be used to
204 : /// access to the contents of the page. (For the page cache, the
205 : /// lease object represents a lock on the buffer.)
206 1650306 : pub async fn read_blk<'b>(
207 1650306 : &self,
208 1650306 : blknum: u32,
209 1650306 : ctx: &RequestContext,
210 1650306 : ) -> Result<BlockLease<'b>, std::io::Error> {
211 1650306 : let cache = page_cache::get();
212 1650306 : match cache
213 1650306 : .read_immutable_buf(self.file_id, blknum, ctx)
214 14762 : .await
215 1650306 : .map_err(|e| {
216 0 : std::io::Error::new(
217 0 : std::io::ErrorKind::Other,
218 0 : format!("Failed to read immutable buf: {e:#}"),
219 0 : )
220 1650306 : })? {
221 1553956 : ReadBufResult::Found(guard) => Ok(guard.into()),
222 96350 : ReadBufResult::NotFound(write_guard) => {
223 : // Read the page from disk into the buffer
224 96350 : let write_guard = self.fill_buffer(write_guard, blknum, ctx).await?;
225 96350 : Ok(write_guard.mark_valid().into())
226 : }
227 : }
228 1650306 : }
229 : }
230 :
231 : impl BlockReader for FileBlockReader<'_> {
232 831390 : fn block_cursor(&self) -> BlockCursor<'_> {
233 831390 : BlockCursor::new_with_compression(
234 831390 : BlockReaderRef::FileBlockReader(self),
235 831390 : self.compressed_reads,
236 831390 : )
237 831390 : }
238 : }
239 :
240 : ///
241 : /// Trait for block-oriented output
242 : ///
243 : pub trait BlockWriter {
244 : ///
245 : /// Write a page to the underlying storage.
246 : ///
247 : /// 'buf' must be of size PAGE_SZ. Returns the block number the page was
248 : /// written to.
249 : ///
250 : fn write_blk(&mut self, buf: Bytes) -> Result<u32, std::io::Error>;
251 : }
252 :
253 : ///
254 : /// A simple in-memory buffer of blocks.
255 : ///
256 : pub struct BlockBuf {
257 : pub blocks: Vec<Bytes>,
258 : }
259 : impl BlockWriter for BlockBuf {
260 43160 : fn write_blk(&mut self, buf: Bytes) -> Result<u32, std::io::Error> {
261 43160 : assert!(buf.len() == PAGE_SZ);
262 43160 : let blknum = self.blocks.len();
263 43160 : self.blocks.push(buf);
264 43160 : Ok(blknum as u32)
265 43160 : }
266 : }
267 :
268 : impl BlockBuf {
269 5736 : pub fn new() -> Self {
270 5736 : BlockBuf { blocks: Vec::new() }
271 5736 : }
272 :
273 6142314 : pub fn size(&self) -> u64 {
274 6142314 : (self.blocks.len() * PAGE_SZ) as u64
275 6142314 : }
276 : }
277 : impl Default for BlockBuf {
278 0 : fn default() -> Self {
279 0 : Self::new()
280 0 : }
281 : }
|