Line data Source code
1 : //!
2 : //! Low-level Block-oriented I/O functions
3 : //!
4 :
5 : use super::ephemeral_file::EphemeralFile;
6 : use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
7 : use crate::context::RequestContext;
8 : use crate::page_cache::{self, PageReadGuard, PageWriteGuard, ReadBufResult, PAGE_SZ};
9 : use crate::virtual_file::VirtualFile;
10 : use bytes::Bytes;
11 : use std::ops::Deref;
12 :
13 : /// This is implemented by anything that can read 8 kB (PAGE_SZ)
14 : /// blocks, using the page cache
15 : ///
16 : /// There are currently two implementations: EphemeralFile, and FileBlockReader
17 : /// below.
18 : pub trait BlockReader {
19 : ///
20 : /// Create a new "cursor" for reading from this reader.
21 : ///
22 : /// A cursor caches the last accessed page, allowing for faster
23 : /// access if the same block is accessed repeatedly.
24 : fn block_cursor(&self) -> BlockCursor<'_>;
25 : }
26 :
27 : impl<B> BlockReader for &B
28 : where
29 : B: BlockReader,
30 : {
31 16822805 : fn block_cursor(&self) -> BlockCursor<'_> {
32 16822805 : (*self).block_cursor()
33 16822805 : }
34 : }
35 :
36 : /// Reference to an in-memory copy of an immutable on-disk block.
37 : pub enum BlockLease<'a> {
38 : PageReadGuard(PageReadGuard<'static>),
39 : EphemeralFileMutableTail(&'a [u8; PAGE_SZ]),
40 : #[cfg(test)]
41 : Arc(std::sync::Arc<[u8; PAGE_SZ]>),
42 : #[cfg(test)]
43 : Vec(Vec<u8>),
44 : }
45 :
46 : impl From<PageReadGuard<'static>> for BlockLease<'static> {
47 93073305 : fn from(value: PageReadGuard<'static>) -> BlockLease<'static> {
48 93073305 : BlockLease::PageReadGuard(value)
49 93073305 : }
50 : }
51 :
52 : #[cfg(test)]
53 : impl<'a> From<std::sync::Arc<[u8; PAGE_SZ]>> for BlockLease<'a> {
54 1016586 : fn from(value: std::sync::Arc<[u8; PAGE_SZ]>) -> Self {
55 1016586 : BlockLease::Arc(value)
56 1016586 : }
57 : }
58 :
59 : impl<'a> Deref for BlockLease<'a> {
60 : type Target = [u8; PAGE_SZ];
61 :
62 340658224 : fn deref(&self) -> &Self::Target {
63 340658224 : match self {
64 337893020 : BlockLease::PageReadGuard(v) => v.deref(),
65 1714110 : BlockLease::EphemeralFileMutableTail(v) => v,
66 : #[cfg(test)]
67 1016586 : BlockLease::Arc(v) => v.deref(),
68 : #[cfg(test)]
69 34508 : BlockLease::Vec(v) => {
70 34508 : TryFrom::try_from(&v[..]).expect("caller must ensure that v has PAGE_SZ")
71 : }
72 : }
73 340658224 : }
74 : }
75 :
76 : /// Provides the ability to read blocks from different sources,
77 : /// similar to using traits for this purpose.
78 : ///
79 : /// Unlike traits, we also support the read function to be async though.
80 : pub(crate) enum BlockReaderRef<'a> {
81 : FileBlockReader(&'a FileBlockReader),
82 : EphemeralFile(&'a EphemeralFile),
83 : Adapter(Adapter<&'a DeltaLayerInner>),
84 : #[cfg(test)]
85 : TestDisk(&'a super::disk_btree::tests::TestDisk),
86 : #[cfg(test)]
87 : VirtualFile(&'a VirtualFile),
88 : }
89 :
90 : impl<'a> BlockReaderRef<'a> {
91 : #[inline(always)]
92 168643249 : async fn read_blk(
93 168643249 : &self,
94 168643249 : blknum: u32,
95 168643249 : ctx: &RequestContext,
96 168643249 : ) -> Result<BlockLease, std::io::Error> {
97 168643249 : use BlockReaderRef::*;
98 168643249 : match self {
99 74423319 : FileBlockReader(r) => r.read_blk(blknum, ctx).await,
100 74566561 : EphemeralFile(r) => r.read_blk(blknum, ctx).await,
101 18616583 : Adapter(r) => r.read_blk(blknum, ctx).await,
102 : #[cfg(test)]
103 1016586 : TestDisk(r) => r.read_blk(blknum),
104 : #[cfg(test)]
105 20200 : VirtualFile(r) => r.read_blk(blknum).await,
106 : }
107 168643246 : }
108 : }
109 :
110 : ///
111 : /// A "cursor" for efficiently reading multiple pages from a BlockReader
112 : ///
113 : /// You can access the last page with `*cursor`. 'read_blk' returns 'self', so
114 : /// that in many cases you can use a BlockCursor as a drop-in replacement for
115 : /// the underlying BlockReader. For example:
116 : ///
117 : /// ```no_run
118 : /// # use pageserver::tenant::block_io::{BlockReader, FileBlockReader};
119 : /// # use pageserver::context::RequestContext;
120 : /// # let reader: FileBlockReader = unimplemented!("stub");
121 : /// # let ctx: RequestContext = unimplemented!("stub");
122 : /// let cursor = reader.block_cursor();
123 : /// let buf = cursor.read_blk(1, &ctx);
124 : /// // do stuff with 'buf'
125 : /// let buf = cursor.read_blk(2, &ctx);
126 : /// // do stuff with 'buf'
127 : /// ```
128 : ///
129 : pub struct BlockCursor<'a> {
130 : reader: BlockReaderRef<'a>,
131 : }
132 :
133 : impl<'a> BlockCursor<'a> {
134 57374785 : pub(crate) fn new(reader: BlockReaderRef<'a>) -> Self {
135 57374785 : BlockCursor { reader }
136 57374785 : }
137 : // Needed by cli
138 0 : pub fn new_fileblockreader(reader: &'a FileBlockReader) -> Self {
139 0 : BlockCursor {
140 0 : reader: BlockReaderRef::FileBlockReader(reader),
141 0 : }
142 0 : }
143 :
144 : /// Read a block.
145 : ///
146 : /// Returns a "lease" object that can be used to
147 : /// access to the contents of the page. (For the page cache, the
148 : /// lease object represents a lock on the buffer.)
149 : #[inline(always)]
150 168643316 : pub async fn read_blk(
151 168643316 : &self,
152 168643316 : blknum: u32,
153 168643316 : ctx: &RequestContext,
154 168643316 : ) -> Result<BlockLease, std::io::Error> {
155 168643249 : self.reader.read_blk(blknum, ctx).await
156 168643246 : }
157 : }
158 :
159 : /// An adapter for reading a (virtual) file using the page cache.
160 : ///
161 : /// The file is assumed to be immutable. This doesn't provide any functions
162 : /// for modifying the file, nor for invalidating the cache if it is modified.
163 : pub struct FileBlockReader {
164 : pub file: VirtualFile,
165 :
166 : /// Unique ID of this file, used as key in the page cache.
167 : file_id: page_cache::FileId,
168 : }
169 :
170 : impl FileBlockReader {
171 33391 : pub fn new(file: VirtualFile) -> Self {
172 33391 : let file_id = page_cache::next_file_id();
173 33391 :
174 33391 : FileBlockReader { file_id, file }
175 33391 : }
176 :
177 : /// Read a page from the underlying file into given buffer.
178 3575537 : async fn fill_buffer(
179 3575537 : &self,
180 3575537 : buf: PageWriteGuard<'static>,
181 3575537 : blkno: u32,
182 3575537 : ) -> Result<PageWriteGuard<'static>, std::io::Error> {
183 3575536 : assert!(buf.len() == PAGE_SZ);
184 3575536 : self.file
185 3575536 : .read_exact_at_page(buf, blkno as u64 * PAGE_SZ as u64)
186 60942 : .await
187 3575536 : }
188 : /// Read a block.
189 : ///
190 : /// Returns a "lease" object that can be used to
191 : /// access to the contents of the page. (For the page cache, the
192 : /// lease object represents a lock on the buffer.)
193 93073307 : pub async fn read_blk(
194 93073307 : &self,
195 93073307 : blknum: u32,
196 93073307 : ctx: &RequestContext,
197 93073307 : ) -> Result<BlockLease, std::io::Error> {
198 93073293 : let cache = page_cache::get();
199 93073293 : match cache
200 93073293 : .read_immutable_buf(self.file_id, blknum, ctx)
201 1213687 : .await
202 93073290 : .map_err(|e| {
203 0 : std::io::Error::new(
204 0 : std::io::ErrorKind::Other,
205 0 : format!("Failed to read immutable buf: {e:#}"),
206 0 : )
207 93073290 : })? {
208 89497754 : ReadBufResult::Found(guard) => Ok(guard.into()),
209 3575536 : ReadBufResult::NotFound(write_guard) => {
210 : // Read the page from disk into the buffer
211 3575536 : let write_guard = self.fill_buffer(write_guard, blknum).await?;
212 3575536 : Ok(write_guard.mark_valid().into())
213 : }
214 : }
215 93073290 : }
216 : }
217 :
218 : impl BlockReader for FileBlockReader {
219 33641449 : fn block_cursor(&self) -> BlockCursor<'_> {
220 33641449 : BlockCursor::new(BlockReaderRef::FileBlockReader(self))
221 33641449 : }
222 : }
223 :
224 : ///
225 : /// Trait for block-oriented output
226 : ///
227 : pub trait BlockWriter {
228 : ///
229 : /// Write a page to the underlying storage.
230 : ///
231 : /// 'buf' must be of size PAGE_SZ. Returns the block number the page was
232 : /// written to.
233 : ///
234 : fn write_blk(&mut self, buf: Bytes) -> Result<u32, std::io::Error>;
235 : }
236 :
237 : ///
238 : /// A simple in-memory buffer of blocks.
239 : ///
240 : pub struct BlockBuf {
241 : pub blocks: Vec<Bytes>,
242 : }
243 : impl BlockWriter for BlockBuf {
244 124282 : fn write_blk(&mut self, buf: Bytes) -> Result<u32, std::io::Error> {
245 124282 : assert!(buf.len() == PAGE_SZ);
246 124282 : let blknum = self.blocks.len();
247 124282 : self.blocks.push(buf);
248 124282 : Ok(blknum as u32)
249 124282 : }
250 : }
251 :
252 : impl BlockBuf {
253 22367 : pub fn new() -> Self {
254 22367 : BlockBuf { blocks: Vec::new() }
255 22367 : }
256 :
257 2280229 : pub fn size(&self) -> u64 {
258 2280229 : (self.blocks.len() * PAGE_SZ) as u64
259 2280229 : }
260 : }
261 : impl Default for BlockBuf {
262 0 : fn default() -> Self {
263 0 : Self::new()
264 0 : }
265 : }
|