TLA Line data Source code
1 : //!
2 : //! Low-level Block-oriented I/O functions
3 : //!
4 :
5 : use super::ephemeral_file::EphemeralFile;
6 : use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
7 : use crate::context::RequestContext;
8 : use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ};
9 : use crate::virtual_file::VirtualFile;
10 : use bytes::Bytes;
11 : use std::ops::{Deref, DerefMut};
12 :
13 : /// This is implemented by anything that can read 8 kB (PAGE_SZ)
14 : /// blocks, using the page cache
15 : ///
16 : /// There are currently two implementations: EphemeralFile, and FileBlockReader
17 : /// below.
18 : pub trait BlockReader {
19 : ///
20 : /// Create a new "cursor" for reading from this reader.
21 : ///
22 : /// A cursor caches the last accessed page, allowing for faster
23 : /// access if the same block is accessed repeatedly.
24 : fn block_cursor(&self) -> BlockCursor<'_>;
25 : }
26 :
27 : impl<B> BlockReader for &B
28 : where
29 : B: BlockReader,
30 : {
31 CBC 23854577 : fn block_cursor(&self) -> BlockCursor<'_> {
32 23854577 : (*self).block_cursor()
33 23854577 : }
34 : }
35 :
36 : /// Reference to an in-memory copy of an immutable on-disk block.
37 : pub enum BlockLease<'a> {
38 : PageReadGuard(PageReadGuard<'static>),
39 : EphemeralFileMutableTail(&'a [u8; PAGE_SZ]),
40 : #[cfg(test)]
41 : Arc(std::sync::Arc<[u8; PAGE_SZ]>),
42 : }
43 :
44 : impl From<PageReadGuard<'static>> for BlockLease<'static> {
45 202242987 : fn from(value: PageReadGuard<'static>) -> BlockLease<'static> {
46 202242987 : BlockLease::PageReadGuard(value)
47 202242987 : }
48 : }
49 :
50 : #[cfg(test)]
51 : impl<'a> From<std::sync::Arc<[u8; PAGE_SZ]>> for BlockLease<'a> {
52 522247 : fn from(value: std::sync::Arc<[u8; PAGE_SZ]>) -> Self {
53 522247 : BlockLease::Arc(value)
54 522247 : }
55 : }
56 :
57 : impl<'a> Deref for BlockLease<'a> {
58 : type Target = [u8; PAGE_SZ];
59 :
60 627462856 : fn deref(&self) -> &Self::Target {
61 627462856 : match self {
62 621465340 : BlockLease::PageReadGuard(v) => v.deref(),
63 5467071 : BlockLease::EphemeralFileMutableTail(v) => v,
64 : #[cfg(test)]
65 530445 : BlockLease::Arc(v) => v.deref(),
66 : }
67 627462856 : }
68 : }
69 :
70 : /// Provides the ability to read blocks from different sources,
71 : /// similar to using traits for this purpose.
72 : ///
73 : /// Unlike traits, we also support the read function to be async though.
74 : pub(crate) enum BlockReaderRef<'a> {
75 : FileBlockReader(&'a FileBlockReader),
76 : EphemeralFile(&'a EphemeralFile),
77 : Adapter(Adapter<&'a DeltaLayerInner>),
78 : #[cfg(test)]
79 : TestDisk(&'a super::disk_btree::tests::TestDisk),
80 : #[cfg(test)]
81 : VirtualFile(&'a VirtualFile),
82 : }
83 :
84 : impl<'a> BlockReaderRef<'a> {
85 : #[inline(always)]
86 291324726 : async fn read_blk(
87 291324726 : &self,
88 291324726 : blknum: u32,
89 291324726 : ctx: &RequestContext,
90 291324726 : ) -> Result<BlockLease, std::io::Error> {
91 291324726 : use BlockReaderRef::*;
92 291324726 : match self {
93 171161445 : FileBlockReader(r) => r.read_blk(blknum, ctx).await,
94 88574108 : EphemeralFile(r) => r.read_blk(blknum, ctx).await,
95 31066926 : Adapter(r) => r.read_blk(blknum, ctx).await,
96 : #[cfg(test)]
97 508133 : TestDisk(r) => r.read_blk(blknum),
98 : #[cfg(test)]
99 14114 : VirtualFile(r) => r.read_blk(blknum).await,
100 : }
101 291324720 : }
102 : }
103 :
104 : ///
105 : /// A "cursor" for efficiently reading multiple pages from a BlockReader
106 : ///
107 : /// You can access the last page with `*cursor`. 'read_blk' returns 'self', so
108 : /// that in many cases you can use a BlockCursor as a drop-in replacement for
109 : /// the underlying BlockReader. For example:
110 : ///
111 : /// ```no_run
112 : /// # use pageserver::tenant::block_io::{BlockReader, FileBlockReader};
113 : /// # use pageserver::context::RequestContext;
114 : /// # let reader: FileBlockReader = unimplemented!("stub");
115 : /// # let ctx: RequestContext = unimplemented!("stub");
116 : /// let cursor = reader.block_cursor();
117 : /// let buf = cursor.read_blk(1, &ctx);
118 : /// // do stuff with 'buf'
119 : /// let buf = cursor.read_blk(2, &ctx);
120 : /// // do stuff with 'buf'
121 : /// ```
122 : ///
123 : pub struct BlockCursor<'a> {
124 : reader: BlockReaderRef<'a>,
125 : }
126 :
127 : impl<'a> BlockCursor<'a> {
128 83614466 : pub(crate) fn new(reader: BlockReaderRef<'a>) -> Self {
129 83614466 : BlockCursor { reader }
130 83614466 : }
131 : // Needed by cli
132 UBC 0 : pub fn new_fileblockreader(reader: &'a FileBlockReader) -> Self {
133 0 : BlockCursor {
134 0 : reader: BlockReaderRef::FileBlockReader(reader),
135 0 : }
136 0 : }
137 :
138 : /// Read a block.
139 : ///
140 : /// Returns a "lease" object that can be used to
141 : /// access to the contents of the page. (For the page cache, the
142 : /// lease object represents a lock on the buffer.)
143 : #[inline(always)]
144 CBC 291324507 : pub async fn read_blk(
145 291324507 : &self,
146 291324507 : blknum: u32,
147 291324507 : ctx: &RequestContext,
148 291324507 : ) -> Result<BlockLease, std::io::Error> {
149 291324726 : self.reader.read_blk(blknum, ctx).await
150 291324720 : }
151 : }
152 :
153 : /// An adapter for reading a (virtual) file using the page cache.
154 : ///
155 : /// The file is assumed to be immutable. This doesn't provide any functions
156 : /// for modifying the file, nor for invalidating the cache if it is modified.
157 : pub struct FileBlockReader {
158 : pub file: VirtualFile,
159 :
160 : /// Unique ID of this file, used as key in the page cache.
161 : file_id: page_cache::FileId,
162 : }
163 :
164 : impl FileBlockReader {
165 14809 : pub fn new(file: VirtualFile) -> Self {
166 14809 : let file_id = page_cache::next_file_id();
167 14809 :
168 14809 : FileBlockReader { file_id, file }
169 14809 : }
170 :
171 : /// Read a page from the underlying file into given buffer.
172 4869491 : async fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), std::io::Error> {
173 4869491 : assert!(buf.len() == PAGE_SZ);
174 4869491 : self.file
175 4869491 : .read_exact_at(buf, blkno as u64 * PAGE_SZ as u64)
176 UBC 0 : .await
177 CBC 4869491 : }
178 : /// Read a block.
179 : ///
180 : /// Returns a "lease" object that can be used to
181 : /// access to the contents of the page. (For the page cache, the
182 : /// lease object represents a lock on the buffer.)
183 202242988 : pub async fn read_blk(
184 202242988 : &self,
185 202242988 : blknum: u32,
186 202242988 : ctx: &RequestContext,
187 202243180 : ) -> Result<BlockLease, std::io::Error> {
188 202243180 : let cache = page_cache::get();
189 202243180 : match cache
190 202243180 : .read_immutable_buf(self.file_id, blknum, ctx)
191 2603217 : .await
192 202243175 : .map_err(|e| {
193 UBC 0 : std::io::Error::new(
194 0 : std::io::ErrorKind::Other,
195 0 : format!("Failed to read immutable buf: {e:#}"),
196 0 : )
197 CBC 202243175 : })? {
198 197373684 : ReadBufResult::Found(guard) => Ok(guard.into()),
199 4869491 : ReadBufResult::NotFound(mut write_guard) => {
200 4869491 : // Read the page from disk into the buffer
201 4869491 : self.fill_buffer(write_guard.deref_mut(), blknum).await?;
202 4869491 : Ok(write_guard.mark_valid().into())
203 : }
204 : }
205 202243175 : }
206 : }
207 :
208 : impl BlockReader for FileBlockReader {
209 47704777 : fn block_cursor(&self) -> BlockCursor<'_> {
210 47704777 : BlockCursor::new(BlockReaderRef::FileBlockReader(self))
211 47704777 : }
212 : }
213 :
214 : ///
215 : /// Trait for block-oriented output
216 : ///
217 : pub trait BlockWriter {
218 : ///
219 : /// Write a page to the underlying storage.
220 : ///
221 : /// 'buf' must be of size PAGE_SZ. Returns the block number the page was
222 : /// written to.
223 : ///
224 : fn write_blk(&mut self, buf: Bytes) -> Result<u32, std::io::Error>;
225 : }
226 :
227 : ///
228 : /// A simple in-memory buffer of blocks.
229 : ///
230 : pub struct BlockBuf {
231 : pub blocks: Vec<Bytes>,
232 : }
233 : impl BlockWriter for BlockBuf {
234 181007 : fn write_blk(&mut self, buf: Bytes) -> Result<u32, std::io::Error> {
235 181007 : assert!(buf.len() == PAGE_SZ);
236 181007 : let blknum = self.blocks.len();
237 181007 : self.blocks.push(buf);
238 181007 : Ok(blknum as u32)
239 181007 : }
240 : }
241 :
242 : impl BlockBuf {
243 19134 : pub fn new() -> Self {
244 19134 : BlockBuf { blocks: Vec::new() }
245 19134 : }
246 :
247 1870701 : pub fn size(&self) -> u64 {
248 1870701 : (self.blocks.len() * PAGE_SZ) as u64
249 1870701 : }
250 : }
251 : impl Default for BlockBuf {
252 UBC 0 : fn default() -> Self {
253 0 : Self::new()
254 0 : }
255 : }
|