Line data Source code
1 : //!
2 : //! Low-level Block-oriented I/O functions
3 : //!
4 :
5 : use std::ops::Deref;
6 :
7 : use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
8 : use crate::context::RequestContext;
9 : use crate::page_cache::{self, FileId, PAGE_SZ, PageReadGuard, PageWriteGuard, ReadBufResult};
10 : #[cfg(test)]
11 : use crate::virtual_file::IoBufferMut;
12 : use crate::virtual_file::{IoBuffer, VirtualFile};
13 :
14 : /// This is implemented by anything that can read 8 kB (PAGE_SZ)
15 : /// blocks, using the page cache
16 : ///
17 : /// There are currently two implementations: EphemeralFile, and FileBlockReader
18 : /// below.
19 : pub trait BlockReader {
20 : ///
21 : /// Create a new "cursor" for reading from this reader.
22 : ///
23 : /// A cursor caches the last accessed page, allowing for faster
24 : /// access if the same block is accessed repeatedly.
25 : fn block_cursor(&self) -> BlockCursor<'_>;
26 : }
27 :
28 : impl<B> BlockReader for &B
29 : where
30 : B: BlockReader,
31 : {
32 120 : fn block_cursor(&self) -> BlockCursor<'_> {
33 120 : (*self).block_cursor()
34 120 : }
35 : }
36 :
37 : /// Reference to an in-memory copy of an immutable on-disk block.
38 : pub enum BlockLease<'a> {
39 : PageReadGuard(PageReadGuard<'static>),
40 : EphemeralFileMutableTail(&'a [u8; PAGE_SZ]),
41 : Slice(&'a [u8; PAGE_SZ]),
42 : #[cfg(test)]
43 : Arc(std::sync::Arc<[u8; PAGE_SZ]>),
44 : #[cfg(test)]
45 : IoBufferMut(IoBufferMut),
46 : }
47 :
48 : impl From<PageReadGuard<'static>> for BlockLease<'static> {
49 3384956 : fn from(value: PageReadGuard<'static>) -> BlockLease<'static> {
50 3384956 : BlockLease::PageReadGuard(value)
51 3384956 : }
52 : }
53 :
54 : #[cfg(test)]
55 : impl From<std::sync::Arc<[u8; PAGE_SZ]>> for BlockLease<'_> {
56 6100260 : fn from(value: std::sync::Arc<[u8; PAGE_SZ]>) -> Self {
57 6100260 : BlockLease::Arc(value)
58 6100260 : }
59 : }
60 :
61 : impl Deref for BlockLease<'_> {
62 : type Target = [u8; PAGE_SZ];
63 :
64 9591572 : fn deref(&self) -> &Self::Target {
65 9591572 : match self {
66 3385436 : BlockLease::PageReadGuard(v) => v.deref(),
67 0 : BlockLease::EphemeralFileMutableTail(v) => v,
68 0 : BlockLease::Slice(v) => v,
69 : #[cfg(test)]
70 6100260 : BlockLease::Arc(v) => v.deref(),
71 : #[cfg(test)]
72 105876 : BlockLease::IoBufferMut(v) => {
73 105876 : TryFrom::try_from(&v[..]).expect("caller must ensure that v has PAGE_SZ")
74 : }
75 : }
76 9591572 : }
77 : }
78 :
79 : /// Provides the ability to read blocks from different sources,
80 : /// similar to using traits for this purpose.
81 : ///
82 : /// Unlike traits, we also support the read function to be async though.
83 : pub(crate) enum BlockReaderRef<'a> {
84 : FileBlockReader(&'a FileBlockReader<'a>),
85 : Adapter(Adapter<&'a DeltaLayerInner>),
86 : #[cfg(test)]
87 : TestDisk(&'a super::disk_btree::tests::TestDisk),
88 : #[cfg(test)]
89 : VirtualFile(&'a VirtualFile),
90 : }
91 :
92 : impl BlockReaderRef<'_> {
93 : #[inline(always)]
94 9540452 : async fn read_blk(
95 9540452 : &self,
96 9540452 : blknum: u32,
97 9540452 : ctx: &RequestContext,
98 9540452 : ) -> Result<BlockLease, std::io::Error> {
99 : use BlockReaderRef::*;
100 9540452 : match self {
101 3377396 : FileBlockReader(r) => r.read_blk(blknum, ctx).await,
102 48 : Adapter(r) => r.read_blk(blknum, ctx).await,
103 : #[cfg(test)]
104 6100260 : TestDisk(r) => r.read_blk(blknum),
105 : #[cfg(test)]
106 62748 : VirtualFile(r) => r.read_blk(blknum, ctx).await,
107 : }
108 9540452 : }
109 : }
110 :
111 : ///
112 : /// A "cursor" for efficiently reading multiple pages from a BlockReader
113 : ///
114 : /// You can access the last page with `*cursor`. 'read_blk' returns 'self', so
115 : /// that in many cases you can use a BlockCursor as a drop-in replacement for
116 : /// the underlying BlockReader. For example:
117 : ///
118 : /// ```no_run
119 : /// # use pageserver::tenant::block_io::{BlockReader, FileBlockReader};
120 : /// # use pageserver::context::RequestContext;
121 : /// # let reader: FileBlockReader = unimplemented!("stub");
122 : /// # let ctx: RequestContext = unimplemented!("stub");
123 : /// let cursor = reader.block_cursor();
124 : /// let buf = cursor.read_blk(1, &ctx);
125 : /// // do stuff with 'buf'
126 : /// let buf = cursor.read_blk(2, &ctx);
127 : /// // do stuff with 'buf'
128 : /// ```
129 : ///
130 : pub struct BlockCursor<'a> {
131 : pub(super) read_compressed: bool,
132 : reader: BlockReaderRef<'a>,
133 : }
134 :
135 : impl<'a> BlockCursor<'a> {
136 2467209 : pub(crate) fn new(reader: BlockReaderRef<'a>) -> Self {
137 2467209 : Self::new_with_compression(reader, false)
138 2467209 : }
139 4193572 : pub(crate) fn new_with_compression(reader: BlockReaderRef<'a>, read_compressed: bool) -> Self {
140 4193572 : BlockCursor {
141 4193572 : read_compressed,
142 4193572 : reader,
143 4193572 : }
144 4193572 : }
145 : // Needed by cli
146 0 : pub fn new_fileblockreader(reader: &'a FileBlockReader) -> Self {
147 0 : BlockCursor {
148 0 : read_compressed: false,
149 0 : reader: BlockReaderRef::FileBlockReader(reader),
150 0 : }
151 0 : }
152 :
153 : /// Read a block.
154 : ///
155 : /// Returns a "lease" object that can be used to
156 : /// access to the contents of the page. (For the page cache, the
157 : /// lease object represents a lock on the buffer.)
158 : #[inline(always)]
159 9540452 : pub async fn read_blk(
160 9540452 : &self,
161 9540452 : blknum: u32,
162 9540452 : ctx: &RequestContext,
163 9540452 : ) -> Result<BlockLease, std::io::Error> {
164 9540452 : self.reader.read_blk(blknum, ctx).await
165 9540452 : }
166 : }
167 :
168 : /// An adapter for reading a (virtual) file using the page cache.
169 : ///
170 : /// The file is assumed to be immutable. This doesn't provide any functions
171 : /// for modifying the file, nor for invalidating the cache if it is modified.
172 : #[derive(Clone)]
173 : pub struct FileBlockReader<'a> {
174 : pub file: &'a VirtualFile,
175 :
176 : /// Unique ID of this file, used as key in the page cache.
177 : file_id: page_cache::FileId,
178 :
179 : compressed_reads: bool,
180 : }
181 :
182 : impl<'a> FileBlockReader<'a> {
183 1615147 : pub fn new(file: &'a VirtualFile, file_id: FileId) -> Self {
184 1615147 : FileBlockReader {
185 1615147 : file_id,
186 1615147 : file,
187 1615147 : compressed_reads: true,
188 1615147 : }
189 1615147 : }
190 :
191 : /// Read a page from the underlying file into given buffer.
192 185181 : async fn fill_buffer(
193 185181 : &self,
194 185181 : buf: PageWriteGuard<'static>,
195 185181 : blkno: u32,
196 185181 : ctx: &RequestContext,
197 185181 : ) -> Result<PageWriteGuard<'static>, std::io::Error> {
198 185181 : assert!(buf.len() == PAGE_SZ);
199 185181 : self.file
200 185181 : .read_exact_at_page(buf, blkno as u64 * PAGE_SZ as u64, ctx)
201 185181 : .await
202 185181 : }
203 : /// Read a block.
204 : ///
205 : /// Returns a "lease" object that can be used to
206 : /// access to the contents of the page. (For the page cache, the
207 : /// lease object represents a lock on the buffer.)
208 3384956 : pub async fn read_blk<'b>(
209 3384956 : &self,
210 3384956 : blknum: u32,
211 3384956 : ctx: &RequestContext,
212 3384956 : ) -> Result<BlockLease<'b>, std::io::Error> {
213 3384956 : let cache = page_cache::get();
214 3384956 : match cache
215 3384956 : .read_immutable_buf(self.file_id, blknum, ctx)
216 3384956 : .await
217 3384956 : .map_err(|e| std::io::Error::other(format!("Failed to read immutable buf: {e:#}")))?
218 : {
219 3199775 : ReadBufResult::Found(guard) => Ok(guard.into()),
220 185181 : ReadBufResult::NotFound(write_guard) => {
221 : // Read the page from disk into the buffer
222 185181 : let write_guard = self.fill_buffer(write_guard, blknum, ctx).await?;
223 185181 : Ok(write_guard.mark_valid().into())
224 : }
225 : }
226 3384956 : }
227 : }
228 :
229 : impl BlockReader for FileBlockReader<'_> {
230 1726267 : fn block_cursor(&self) -> BlockCursor<'_> {
231 1726267 : BlockCursor::new_with_compression(
232 1726267 : BlockReaderRef::FileBlockReader(self),
233 1726267 : self.compressed_reads,
234 1726267 : )
235 1726267 : }
236 : }
237 :
238 : ///
239 : /// Trait for block-oriented output
240 : ///
241 : pub trait BlockWriter {
242 : ///
243 : /// Write a page to the underlying storage.
244 : ///
245 : /// 'buf' must be of size PAGE_SZ. Returns the block number the page was
246 : /// written to.
247 : ///
248 : fn write_blk(&mut self, buf: IoBuffer) -> Result<u32, std::io::Error>;
249 : }
250 :
251 : ///
252 : /// A simple in-memory buffer of blocks.
253 : ///
254 : pub struct BlockBuf {
255 : pub blocks: Vec<IoBuffer>,
256 : }
257 : impl BlockWriter for BlockBuf {
258 84683 : fn write_blk(&mut self, buf: IoBuffer) -> Result<u32, std::io::Error> {
259 84683 : assert!(buf.len() == PAGE_SZ);
260 84683 : let blknum = self.blocks.len();
261 84683 : self.blocks.push(buf);
262 84683 : Ok(blknum as u32)
263 84683 : }
264 : }
265 :
266 : impl BlockBuf {
267 12600 : pub fn new() -> Self {
268 12600 : BlockBuf { blocks: Vec::new() }
269 12600 : }
270 :
271 12285996 : pub fn size(&self) -> u64 {
272 12285996 : (self.blocks.len() * PAGE_SZ) as u64
273 12285996 : }
274 : }
275 : impl Default for BlockBuf {
276 0 : fn default() -> Self {
277 0 : Self::new()
278 0 : }
279 : }
|