Line data Source code
1 : //!
2 : //! Low-level Block-oriented I/O functions
3 : //!
4 :
5 : use std::ops::Deref;
6 :
7 : use bytes::Bytes;
8 :
9 : use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
10 : use crate::context::RequestContext;
11 : use crate::page_cache::{self, FileId, PAGE_SZ, PageReadGuard, PageWriteGuard, ReadBufResult};
12 : #[cfg(test)]
13 : use crate::virtual_file::IoBufferMut;
14 : use crate::virtual_file::VirtualFile;
15 :
16 : /// This is implemented by anything that can read 8 kB (PAGE_SZ)
17 : /// blocks, using the page cache
18 : ///
19 : /// There are currently two implementations: EphemeralFile, and FileBlockReader
20 : /// below.
21 : pub trait BlockReader {
22 : ///
23 : /// Create a new "cursor" for reading from this reader.
24 : ///
25 : /// A cursor caches the last accessed page, allowing for faster
26 : /// access if the same block is accessed repeatedly.
27 : fn block_cursor(&self) -> BlockCursor<'_>;
28 : }
29 :
30 : impl<B> BlockReader for &B
31 : where
32 : B: BlockReader,
33 : {
34 40 : fn block_cursor(&self) -> BlockCursor<'_> {
35 40 : (*self).block_cursor()
36 40 : }
37 : }
38 :
39 : /// Reference to an in-memory copy of an immutable on-disk block.
40 : pub enum BlockLease<'a> {
41 : PageReadGuard(PageReadGuard<'static>),
42 : EphemeralFileMutableTail(&'a [u8; PAGE_SZ]),
43 : Slice(&'a [u8; PAGE_SZ]),
44 : #[cfg(test)]
45 : Arc(std::sync::Arc<[u8; PAGE_SZ]>),
46 : #[cfg(test)]
47 : IoBufferMut(IoBufferMut),
48 : }
49 :
50 : impl From<PageReadGuard<'static>> for BlockLease<'static> {
51 974273 : fn from(value: PageReadGuard<'static>) -> BlockLease<'static> {
52 974273 : BlockLease::PageReadGuard(value)
53 974273 : }
54 : }
55 :
56 : #[cfg(test)]
57 : impl From<std::sync::Arc<[u8; PAGE_SZ]>> for BlockLease<'_> {
58 2033394 : fn from(value: std::sync::Arc<[u8; PAGE_SZ]>) -> Self {
59 2033394 : BlockLease::Arc(value)
60 2033394 : }
61 : }
62 :
63 : impl Deref for BlockLease<'_> {
64 : type Target = [u8; PAGE_SZ];
65 :
66 3078411 : fn deref(&self) -> &Self::Target {
67 3078411 : match self {
68 974433 : BlockLease::PageReadGuard(v) => v.deref(),
69 0 : BlockLease::EphemeralFileMutableTail(v) => v,
70 0 : BlockLease::Slice(v) => v,
71 : #[cfg(test)]
72 2033394 : BlockLease::Arc(v) => v.deref(),
73 : #[cfg(test)]
74 70584 : BlockLease::IoBufferMut(v) => {
75 70584 : TryFrom::try_from(&v[..]).expect("caller must ensure that v has PAGE_SZ")
76 : }
77 : }
78 3078411 : }
79 : }
80 :
81 : /// Provides the ability to read blocks from different sources,
82 : /// similar to using traits for this purpose.
83 : ///
84 : /// Unlike traits, we also support the read function to be async though.
85 : pub(crate) enum BlockReaderRef<'a> {
86 : FileBlockReader(&'a FileBlockReader<'a>),
87 : Adapter(Adapter<&'a DeltaLayerInner>),
88 : #[cfg(test)]
89 : TestDisk(&'a super::disk_btree::tests::TestDisk),
90 : #[cfg(test)]
91 : VirtualFile(&'a VirtualFile),
92 : }
93 :
94 : impl BlockReaderRef<'_> {
95 : #[inline(always)]
96 3047035 : async fn read_blk(
97 3047035 : &self,
98 3047035 : blknum: u32,
99 3047035 : ctx: &RequestContext,
100 3047035 : ) -> Result<BlockLease, std::io::Error> {
101 : use BlockReaderRef::*;
102 3047035 : match self {
103 971793 : FileBlockReader(r) => r.read_blk(blknum, ctx).await,
104 16 : Adapter(r) => r.read_blk(blknum, ctx).await,
105 : #[cfg(test)]
106 2033394 : TestDisk(r) => r.read_blk(blknum),
107 : #[cfg(test)]
108 41832 : VirtualFile(r) => r.read_blk(blknum, ctx).await,
109 : }
110 3047035 : }
111 : }
112 :
113 : ///
114 : /// A "cursor" for efficiently reading multiple pages from a BlockReader
115 : ///
116 : /// You can access the last page with `*cursor`. 'read_blk' returns 'self', so
117 : /// that in many cases you can use a BlockCursor as a drop-in replacement for
118 : /// the underlying BlockReader. For example:
119 : ///
120 : /// ```no_run
121 : /// # use pageserver::tenant::block_io::{BlockReader, FileBlockReader};
122 : /// # use pageserver::context::RequestContext;
123 : /// # let reader: FileBlockReader = unimplemented!("stub");
124 : /// # let ctx: RequestContext = unimplemented!("stub");
125 : /// let cursor = reader.block_cursor();
126 : /// let buf = cursor.read_blk(1, &ctx);
127 : /// // do stuff with 'buf'
128 : /// let buf = cursor.read_blk(2, &ctx);
129 : /// // do stuff with 'buf'
130 : /// ```
131 : ///
132 : pub struct BlockCursor<'a> {
133 : pub(super) read_compressed: bool,
134 : reader: BlockReaderRef<'a>,
135 : }
136 :
137 : impl<'a> BlockCursor<'a> {
138 822392 : pub(crate) fn new(reader: BlockReaderRef<'a>) -> Self {
139 822392 : Self::new_with_compression(reader, false)
140 822392 : }
141 1306126 : pub(crate) fn new_with_compression(reader: BlockReaderRef<'a>, read_compressed: bool) -> Self {
142 1306126 : BlockCursor {
143 1306126 : read_compressed,
144 1306126 : reader,
145 1306126 : }
146 1306126 : }
147 : // Needed by cli
148 0 : pub fn new_fileblockreader(reader: &'a FileBlockReader) -> Self {
149 0 : BlockCursor {
150 0 : read_compressed: false,
151 0 : reader: BlockReaderRef::FileBlockReader(reader),
152 0 : }
153 0 : }
154 :
155 : /// Read a block.
156 : ///
157 : /// Returns a "lease" object that can be used to
158 : /// access to the contents of the page. (For the page cache, the
159 : /// lease object represents a lock on the buffer.)
160 : #[inline(always)]
161 3047035 : pub async fn read_blk(
162 3047035 : &self,
163 3047035 : blknum: u32,
164 3047035 : ctx: &RequestContext,
165 3047035 : ) -> Result<BlockLease, std::io::Error> {
166 3047035 : self.reader.read_blk(blknum, ctx).await
167 3047035 : }
168 : }
169 :
170 : /// An adapter for reading a (virtual) file using the page cache.
171 : ///
172 : /// The file is assumed to be immutable. This doesn't provide any functions
173 : /// for modifying the file, nor for invalidating the cache if it is modified.
174 : #[derive(Clone)]
175 : pub struct FileBlockReader<'a> {
176 : pub file: &'a VirtualFile,
177 :
178 : /// Unique ID of this file, used as key in the page cache.
179 : file_id: page_cache::FileId,
180 :
181 : compressed_reads: bool,
182 : }
183 :
184 : impl<'a> FileBlockReader<'a> {
185 485538 : pub fn new(file: &'a VirtualFile, file_id: FileId) -> Self {
186 485538 : FileBlockReader {
187 485538 : file_id,
188 485538 : file,
189 485538 : compressed_reads: true,
190 485538 : }
191 485538 : }
192 :
193 : /// Read a page from the underlying file into given buffer.
194 63468 : async fn fill_buffer(
195 63468 : &self,
196 63468 : buf: PageWriteGuard<'static>,
197 63468 : blkno: u32,
198 63468 : ctx: &RequestContext,
199 63468 : ) -> Result<PageWriteGuard<'static>, std::io::Error> {
200 63468 : assert!(buf.len() == PAGE_SZ);
201 63468 : self.file
202 63468 : .read_exact_at_page(buf, blkno as u64 * PAGE_SZ as u64, ctx)
203 63468 : .await
204 63468 : }
205 : /// Read a block.
206 : ///
207 : /// Returns a "lease" object that can be used to
208 : /// access to the contents of the page. (For the page cache, the
209 : /// lease object represents a lock on the buffer.)
210 974273 : pub async fn read_blk<'b>(
211 974273 : &self,
212 974273 : blknum: u32,
213 974273 : ctx: &RequestContext,
214 974273 : ) -> Result<BlockLease<'b>, std::io::Error> {
215 974273 : let cache = page_cache::get();
216 974273 : match cache
217 974273 : .read_immutable_buf(self.file_id, blknum, ctx)
218 974273 : .await
219 974273 : .map_err(|e| {
220 0 : std::io::Error::new(
221 0 : std::io::ErrorKind::Other,
222 0 : format!("Failed to read immutable buf: {e:#}"),
223 0 : )
224 974273 : })? {
225 910805 : ReadBufResult::Found(guard) => Ok(guard.into()),
226 63468 : ReadBufResult::NotFound(write_guard) => {
227 : // Read the page from disk into the buffer
228 63468 : let write_guard = self.fill_buffer(write_guard, blknum, ctx).await?;
229 63468 : Ok(write_guard.mark_valid().into())
230 : }
231 : }
232 974273 : }
233 : }
234 :
235 : impl BlockReader for FileBlockReader<'_> {
236 483670 : fn block_cursor(&self) -> BlockCursor<'_> {
237 483670 : BlockCursor::new_with_compression(
238 483670 : BlockReaderRef::FileBlockReader(self),
239 483670 : self.compressed_reads,
240 483670 : )
241 483670 : }
242 : }
243 :
244 : ///
245 : /// Trait for block-oriented output
246 : ///
247 : pub trait BlockWriter {
248 : ///
249 : /// Write a page to the underlying storage.
250 : ///
251 : /// 'buf' must be of size PAGE_SZ. Returns the block number the page was
252 : /// written to.
253 : ///
254 : fn write_blk(&mut self, buf: Bytes) -> Result<u32, std::io::Error>;
255 : }
256 :
257 : ///
258 : /// A simple in-memory buffer of blocks.
259 : ///
260 : pub struct BlockBuf {
261 : pub blocks: Vec<Bytes>,
262 : }
263 : impl BlockWriter for BlockBuf {
264 28960 : fn write_blk(&mut self, buf: Bytes) -> Result<u32, std::io::Error> {
265 28960 : assert!(buf.len() == PAGE_SZ);
266 28960 : let blknum = self.blocks.len();
267 28960 : self.blocks.push(buf);
268 28960 : Ok(blknum as u32)
269 28960 : }
270 : }
271 :
272 : impl BlockBuf {
273 4140 : pub fn new() -> Self {
274 4140 : BlockBuf { blocks: Vec::new() }
275 4140 : }
276 :
277 4095304 : pub fn size(&self) -> u64 {
278 4095304 : (self.blocks.len() * PAGE_SZ) as u64
279 4095304 : }
280 : }
281 : impl Default for BlockBuf {
282 0 : fn default() -> Self {
283 0 : Self::new()
284 0 : }
285 : }
|