Line data Source code
1 : //!
2 : //! Low-level Block-oriented I/O functions
3 : //!
4 :
5 : use std::ops::Deref;
6 :
7 : use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
8 : use crate::context::RequestContext;
9 : use crate::page_cache::{self, FileId, PAGE_SZ, PageReadGuard, PageWriteGuard, ReadBufResult};
10 : #[cfg(test)]
11 : use crate::virtual_file::IoBufferMut;
12 : use crate::virtual_file::{IoBuffer, VirtualFile};
13 :
14 : /// This is implemented by anything that can read 8 kB (PAGE_SZ)
15 : /// blocks, using the page cache
16 : ///
17 : /// There are currently two implementations: EphemeralFile, and FileBlockReader
18 : /// below.
19 : pub trait BlockReader {
20 : ///
21 : /// Create a new "cursor" for reading from this reader.
22 : ///
23 : /// A cursor caches the last accessed page, allowing for faster
24 : /// access if the same block is accessed repeatedly.
25 : fn block_cursor(&self) -> BlockCursor<'_>;
26 : }
27 :
28 : impl<B> BlockReader for &B
29 : where
30 : B: BlockReader,
31 : {
32 10 : fn block_cursor(&self) -> BlockCursor<'_> {
33 10 : (*self).block_cursor()
34 10 : }
35 : }
36 :
37 : /// Reference to an in-memory copy of an immutable on-disk block.
38 : pub enum BlockLease<'a> {
39 : PageReadGuard(PageReadGuard<'static>),
40 : EphemeralFileMutableTail(&'a [u8; PAGE_SZ]),
41 : Slice(&'a [u8; PAGE_SZ]),
42 : #[cfg(test)]
43 : Arc(std::sync::Arc<[u8; PAGE_SZ]>),
44 : #[cfg(test)]
45 : IoBufferMut(IoBufferMut),
46 : }
47 :
48 : impl From<PageReadGuard<'static>> for BlockLease<'static> {
49 295610 : fn from(value: PageReadGuard<'static>) -> BlockLease<'static> {
50 295610 : BlockLease::PageReadGuard(value)
51 295610 : }
52 : }
53 :
/// Test-only: wraps a reference-counted page in a lease.
#[cfg(test)]
impl From<std::sync::Arc<[u8; PAGE_SZ]>> for BlockLease<'_> {
    fn from(page: std::sync::Arc<[u8; PAGE_SZ]>) -> Self {
        Self::Arc(page)
    }
}
60 :
61 : impl Deref for BlockLease<'_> {
62 : type Target = [u8; PAGE_SZ];
63 :
64 812819 : fn deref(&self) -> &Self::Target {
65 812819 : match self {
66 295650 : BlockLease::PageReadGuard(v) => v.deref(),
67 0 : BlockLease::EphemeralFileMutableTail(v) => v,
68 0 : BlockLease::Slice(v) => v,
69 : #[cfg(test)]
70 508346 : BlockLease::Arc(v) => v.deref(),
71 : #[cfg(test)]
72 8823 : BlockLease::IoBufferMut(v) => {
73 8823 : TryFrom::try_from(&v[..]).expect("caller must ensure that v has PAGE_SZ")
74 : }
75 : }
76 812819 : }
77 : }
78 :
79 : /// Provides the ability to read blocks from different sources,
80 : /// similar to using traits for this purpose.
81 : ///
82 : /// Unlike traits, we also support the read function to be async though.
83 : pub(crate) enum BlockReaderRef<'a> {
84 : FileBlockReader(&'a FileBlockReader<'a>),
85 : Adapter(Adapter<&'a DeltaLayerInner>),
86 : #[cfg(test)]
87 : TestDisk(&'a super::disk_btree::tests::TestDisk),
88 : #[cfg(test)]
89 : VirtualFile(&'a VirtualFile),
90 : }
91 :
92 : impl BlockReaderRef<'_> {
93 : #[inline(always)]
94 808550 : async fn read_blk(
95 808550 : &self,
96 808550 : blknum: u32,
97 808550 : ctx: &RequestContext,
98 808550 : ) -> Result<BlockLease, std::io::Error> {
99 : use BlockReaderRef::*;
100 808550 : match self {
101 294971 : FileBlockReader(r) => r.read_blk(blknum, ctx).await,
102 4 : Adapter(r) => r.read_blk(blknum, ctx).await,
103 : #[cfg(test)]
104 508346 : TestDisk(r) => r.read_blk(blknum),
105 : #[cfg(test)]
106 5229 : VirtualFile(r) => r.read_blk(blknum, ctx).await,
107 : }
108 808550 : }
109 : }
110 :
111 : ///
112 : /// A "cursor" for efficiently reading multiple pages from a BlockReader
113 : ///
114 : /// You can access the last page with `*cursor`. 'read_blk' returns 'self', so
115 : /// that in many cases you can use a BlockCursor as a drop-in replacement for
116 : /// the underlying BlockReader. For example:
117 : ///
118 : /// ```no_run
119 : /// # use pageserver::tenant::block_io::{BlockReader, FileBlockReader};
120 : /// # use pageserver::context::RequestContext;
121 : /// # let reader: FileBlockReader = unimplemented!("stub");
122 : /// # let ctx: RequestContext = unimplemented!("stub");
123 : /// let cursor = reader.block_cursor();
124 : /// let buf = cursor.read_blk(1, &ctx);
125 : /// // do stuff with 'buf'
126 : /// let buf = cursor.read_blk(2, &ctx);
127 : /// // do stuff with 'buf'
128 : /// ```
129 : ///
130 : pub struct BlockCursor<'a> {
131 : pub(super) read_compressed: bool,
132 : reader: BlockReaderRef<'a>,
133 : }
134 :
135 : impl<'a> BlockCursor<'a> {
136 205597 : pub(crate) fn new(reader: BlockReaderRef<'a>) -> Self {
137 205597 : Self::new_with_compression(reader, false)
138 205597 : }
139 355066 : pub(crate) fn new_with_compression(reader: BlockReaderRef<'a>, read_compressed: bool) -> Self {
140 355066 : BlockCursor {
141 355066 : read_compressed,
142 355066 : reader,
143 355066 : }
144 355066 : }
145 : // Needed by cli
146 0 : pub fn new_fileblockreader(reader: &'a FileBlockReader) -> Self {
147 0 : BlockCursor {
148 0 : read_compressed: false,
149 0 : reader: BlockReaderRef::FileBlockReader(reader),
150 0 : }
151 0 : }
152 :
153 : /// Read a block.
154 : ///
155 : /// Returns a "lease" object that can be used to
156 : /// access to the contents of the page. (For the page cache, the
157 : /// lease object represents a lock on the buffer.)
158 : #[inline(always)]
159 808550 : pub async fn read_blk(
160 808550 : &self,
161 808550 : blknum: u32,
162 808550 : ctx: &RequestContext,
163 808550 : ) -> Result<BlockLease, std::io::Error> {
164 808550 : self.reader.read_blk(blknum, ctx).await
165 808550 : }
166 : }
167 :
168 : /// An adapter for reading a (virtual) file using the page cache.
169 : ///
170 : /// The file is assumed to be immutable. This doesn't provide any functions
171 : /// for modifying the file, nor for invalidating the cache if it is modified.
172 : #[derive(Clone)]
173 : pub struct FileBlockReader<'a> {
174 : pub file: &'a VirtualFile,
175 :
176 : /// Unique ID of this file, used as key in the page cache.
177 : file_id: page_cache::FileId,
178 :
179 : compressed_reads: bool,
180 : }
181 :
182 : impl<'a> FileBlockReader<'a> {
183 140165 : pub fn new(file: &'a VirtualFile, file_id: FileId) -> Self {
184 140165 : FileBlockReader {
185 140165 : file_id,
186 140165 : file,
187 140165 : compressed_reads: true,
188 140165 : }
189 140165 : }
190 :
191 : /// Read a page from the underlying file into given buffer.
192 17319 : async fn fill_buffer(
193 17319 : &self,
194 17319 : buf: PageWriteGuard<'static>,
195 17319 : blkno: u32,
196 17319 : ctx: &RequestContext,
197 17319 : ) -> Result<PageWriteGuard<'static>, std::io::Error> {
198 17319 : assert!(buf.len() == PAGE_SZ);
199 17319 : self.file
200 17319 : .read_exact_at_page(buf, blkno as u64 * PAGE_SZ as u64, ctx)
201 17319 : .await
202 17319 : }
203 : /// Read a block.
204 : ///
205 : /// Returns a "lease" object that can be used to
206 : /// access to the contents of the page. (For the page cache, the
207 : /// lease object represents a lock on the buffer.)
208 295610 : pub async fn read_blk<'b>(
209 295610 : &self,
210 295610 : blknum: u32,
211 295610 : ctx: &RequestContext,
212 295610 : ) -> Result<BlockLease<'b>, std::io::Error> {
213 295610 : let cache = page_cache::get();
214 295610 : match cache
215 295610 : .read_immutable_buf(self.file_id, blknum, ctx)
216 295610 : .await
217 295610 : .map_err(|e| std::io::Error::other(format!("Failed to read immutable buf: {e:#}")))?
218 : {
219 278291 : ReadBufResult::Found(guard) => Ok(guard.into()),
220 17319 : ReadBufResult::NotFound(write_guard) => {
221 : // Read the page from disk into the buffer
222 17319 : let write_guard = self.fill_buffer(write_guard, blknum, ctx).await?;
223 17319 : Ok(write_guard.mark_valid().into())
224 : }
225 : }
226 295610 : }
227 : }
228 :
229 : impl BlockReader for FileBlockReader<'_> {
230 149461 : fn block_cursor(&self) -> BlockCursor<'_> {
231 149461 : BlockCursor::new_with_compression(
232 149461 : BlockReaderRef::FileBlockReader(self),
233 149461 : self.compressed_reads,
234 : )
235 149461 : }
236 : }
237 :
238 : ///
239 : /// Trait for block-oriented output
240 : ///
241 : pub trait BlockWriter {
242 : ///
243 : /// Write a page to the underlying storage.
244 : ///
245 : /// 'buf' must be of size PAGE_SZ. Returns the block number the page was
246 : /// written to.
247 : ///
248 : fn write_blk(&mut self, buf: IoBuffer) -> Result<u32, std::io::Error>;
249 : }
250 :
251 : ///
252 : /// A simple in-memory buffer of blocks.
253 : ///
254 : pub struct BlockBuf {
255 : pub blocks: Vec<IoBuffer>,
256 : }
257 : impl BlockWriter for BlockBuf {
258 7073 : fn write_blk(&mut self, buf: IoBuffer) -> Result<u32, std::io::Error> {
259 7073 : assert!(buf.len() == PAGE_SZ);
260 7073 : let blknum = self.blocks.len();
261 7073 : self.blocks.push(buf);
262 7073 : Ok(blknum as u32)
263 7073 : }
264 : }
265 :
266 : impl BlockBuf {
267 1117 : pub fn new() -> Self {
268 1117 : BlockBuf { blocks: Vec::new() }
269 1117 : }
270 :
271 1029527 : pub fn size(&self) -> u64 {
272 1029527 : (self.blocks.len() * PAGE_SZ) as u64
273 1029527 : }
274 : }
275 : impl Default for BlockBuf {
276 0 : fn default() -> Self {
277 0 : Self::new()
278 0 : }
279 : }
|