Line data Source code
1 : //!
2 : //! Low-level Block-oriented I/O functions
3 : //!
4 :
5 : use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
6 : use crate::context::RequestContext;
7 : use crate::page_cache::{self, FileId, PageReadGuard, PageWriteGuard, ReadBufResult, PAGE_SZ};
8 : #[cfg(test)]
9 : use crate::virtual_file::IoBufferMut;
10 : use crate::virtual_file::VirtualFile;
11 : use bytes::Bytes;
12 : use std::ops::Deref;
13 :
14 : /// This is implemented by anything that can read 8 kB (PAGE_SZ)
15 : /// blocks, using the page cache
16 : ///
17 : /// There are currently two implementations: EphemeralFile, and FileBlockReader
18 : /// below.
19 : pub trait BlockReader {
20 : ///
21 : /// Create a new "cursor" for reading from this reader.
22 : ///
23 : /// A cursor caches the last accessed page, allowing for faster
24 : /// access if the same block is accessed repeatedly.
25 : fn block_cursor(&self) -> BlockCursor<'_>;
26 : }
27 :
28 : impl<B> BlockReader for &B
29 : where
30 : B: BlockReader,
31 : {
32 40 : fn block_cursor(&self) -> BlockCursor<'_> {
33 40 : (*self).block_cursor()
34 40 : }
35 : }
36 :
37 : /// Reference to an in-memory copy of an immutable on-disk block.
38 : pub enum BlockLease<'a> {
39 : PageReadGuard(PageReadGuard<'static>),
40 : EphemeralFileMutableTail(&'a [u8; PAGE_SZ]),
41 : Slice(&'a [u8; PAGE_SZ]),
42 : #[cfg(test)]
43 : Arc(std::sync::Arc<[u8; PAGE_SZ]>),
44 : #[cfg(test)]
45 : IoBufferMut(IoBufferMut),
46 : }
47 :
48 : impl From<PageReadGuard<'static>> for BlockLease<'static> {
49 972296 : fn from(value: PageReadGuard<'static>) -> BlockLease<'static> {
50 972296 : BlockLease::PageReadGuard(value)
51 972296 : }
52 : }
53 :
54 : #[cfg(test)]
55 : impl From<std::sync::Arc<[u8; PAGE_SZ]>> for BlockLease<'_> {
56 2033740 : fn from(value: std::sync::Arc<[u8; PAGE_SZ]>) -> Self {
57 2033740 : BlockLease::Arc(value)
58 2033740 : }
59 : }
60 :
61 : impl Deref for BlockLease<'_> {
62 : type Target = [u8; PAGE_SZ];
63 :
64 3076780 : fn deref(&self) -> &Self::Target {
65 3076780 : match self {
66 972456 : BlockLease::PageReadGuard(v) => v.deref(),
67 0 : BlockLease::EphemeralFileMutableTail(v) => v,
68 0 : BlockLease::Slice(v) => v,
69 : #[cfg(test)]
70 2033740 : BlockLease::Arc(v) => v.deref(),
71 : #[cfg(test)]
72 70584 : BlockLease::IoBufferMut(v) => {
73 70584 : TryFrom::try_from(&v[..]).expect("caller must ensure that v has PAGE_SZ")
74 : }
75 : }
76 3076780 : }
77 : }
78 :
79 : /// Provides the ability to read blocks from different sources,
80 : /// similar to using traits for this purpose.
81 : ///
82 : /// Unlike traits, we also support the read function to be async though.
83 : pub(crate) enum BlockReaderRef<'a> {
84 : FileBlockReader(&'a FileBlockReader<'a>),
85 : Adapter(Adapter<&'a DeltaLayerInner>),
86 : #[cfg(test)]
87 : TestDisk(&'a super::disk_btree::tests::TestDisk),
88 : #[cfg(test)]
89 : VirtualFile(&'a VirtualFile),
90 : }
91 :
92 : impl BlockReaderRef<'_> {
93 : #[inline(always)]
94 3045412 : async fn read_blk(
95 3045412 : &self,
96 3045412 : blknum: u32,
97 3045412 : ctx: &RequestContext,
98 3045412 : ) -> Result<BlockLease, std::io::Error> {
99 : use BlockReaderRef::*;
100 3045412 : match self {
101 969824 : FileBlockReader(r) => r.read_blk(blknum, ctx).await,
102 16 : Adapter(r) => r.read_blk(blknum, ctx).await,
103 : #[cfg(test)]
104 2033740 : TestDisk(r) => r.read_blk(blknum),
105 : #[cfg(test)]
106 41832 : VirtualFile(r) => r.read_blk(blknum, ctx).await,
107 : }
108 3045412 : }
109 : }
110 :
111 : ///
112 : /// A "cursor" for efficiently reading multiple pages from a BlockReader
113 : ///
114 : /// You can access the last page with `*cursor`. 'read_blk' returns 'self', so
115 : /// that in many cases you can use a BlockCursor as a drop-in replacement for
116 : /// the underlying BlockReader. For example:
117 : ///
118 : /// ```no_run
119 : /// # use pageserver::tenant::block_io::{BlockReader, FileBlockReader};
120 : /// # use pageserver::context::RequestContext;
121 : /// # let reader: FileBlockReader = unimplemented!("stub");
122 : /// # let ctx: RequestContext = unimplemented!("stub");
123 : /// let cursor = reader.block_cursor();
124 : /// let buf = cursor.read_blk(1, &ctx);
125 : /// // do stuff with 'buf'
126 : /// let buf = cursor.read_blk(2, &ctx);
127 : /// // do stuff with 'buf'
128 : /// ```
129 : ///
130 : pub struct BlockCursor<'a> {
131 : pub(super) read_compressed: bool,
132 : reader: BlockReaderRef<'a>,
133 : }
134 :
135 : impl<'a> BlockCursor<'a> {
136 822563 : pub(crate) fn new(reader: BlockReaderRef<'a>) -> Self {
137 822563 : Self::new_with_compression(reader, false)
138 822563 : }
139 1305359 : pub(crate) fn new_with_compression(reader: BlockReaderRef<'a>, read_compressed: bool) -> Self {
140 1305359 : BlockCursor {
141 1305359 : read_compressed,
142 1305359 : reader,
143 1305359 : }
144 1305359 : }
145 : // Needed by cli
146 0 : pub fn new_fileblockreader(reader: &'a FileBlockReader) -> Self {
147 0 : BlockCursor {
148 0 : read_compressed: false,
149 0 : reader: BlockReaderRef::FileBlockReader(reader),
150 0 : }
151 0 : }
152 :
153 : /// Read a block.
154 : ///
155 : /// Returns a "lease" object that can be used to
156 : /// access to the contents of the page. (For the page cache, the
157 : /// lease object represents a lock on the buffer.)
158 : #[inline(always)]
159 3045412 : pub async fn read_blk(
160 3045412 : &self,
161 3045412 : blknum: u32,
162 3045412 : ctx: &RequestContext,
163 3045412 : ) -> Result<BlockLease, std::io::Error> {
164 3045412 : self.reader.read_blk(blknum, ctx).await
165 3045412 : }
166 : }
167 :
168 : /// An adapter for reading a (virtual) file using the page cache.
169 : ///
170 : /// The file is assumed to be immutable. This doesn't provide any functions
171 : /// for modifying the file, nor for invalidating the cache if it is modified.
172 : #[derive(Clone)]
173 : pub struct FileBlockReader<'a> {
174 : pub file: &'a VirtualFile,
175 :
176 : /// Unique ID of this file, used as key in the page cache.
177 : file_id: page_cache::FileId,
178 :
179 : compressed_reads: bool,
180 : }
181 :
182 : impl<'a> FileBlockReader<'a> {
183 484600 : pub fn new(file: &'a VirtualFile, file_id: FileId) -> Self {
184 484600 : FileBlockReader {
185 484600 : file_id,
186 484600 : file,
187 484600 : compressed_reads: true,
188 484600 : }
189 484600 : }
190 :
191 : /// Read a page from the underlying file into given buffer.
192 63357 : async fn fill_buffer(
193 63357 : &self,
194 63357 : buf: PageWriteGuard<'static>,
195 63357 : blkno: u32,
196 63357 : ctx: &RequestContext,
197 63357 : ) -> Result<PageWriteGuard<'static>, std::io::Error> {
198 63357 : assert!(buf.len() == PAGE_SZ);
199 63357 : self.file
200 63357 : .read_exact_at_page(buf, blkno as u64 * PAGE_SZ as u64, ctx)
201 63357 : .await
202 63357 : }
203 : /// Read a block.
204 : ///
205 : /// Returns a "lease" object that can be used to
206 : /// access to the contents of the page. (For the page cache, the
207 : /// lease object represents a lock on the buffer.)
208 972296 : pub async fn read_blk<'b>(
209 972296 : &self,
210 972296 : blknum: u32,
211 972296 : ctx: &RequestContext,
212 972296 : ) -> Result<BlockLease<'b>, std::io::Error> {
213 972296 : let cache = page_cache::get();
214 972296 : match cache
215 972296 : .read_immutable_buf(self.file_id, blknum, ctx)
216 972296 : .await
217 972296 : .map_err(|e| {
218 0 : std::io::Error::new(
219 0 : std::io::ErrorKind::Other,
220 0 : format!("Failed to read immutable buf: {e:#}"),
221 0 : )
222 972296 : })? {
223 908939 : ReadBufResult::Found(guard) => Ok(guard.into()),
224 63357 : ReadBufResult::NotFound(write_guard) => {
225 : // Read the page from disk into the buffer
226 63357 : let write_guard = self.fill_buffer(write_guard, blknum, ctx).await?;
227 63357 : Ok(write_guard.mark_valid().into())
228 : }
229 : }
230 972296 : }
231 : }
232 :
233 : impl BlockReader for FileBlockReader<'_> {
234 482732 : fn block_cursor(&self) -> BlockCursor<'_> {
235 482732 : BlockCursor::new_with_compression(
236 482732 : BlockReaderRef::FileBlockReader(self),
237 482732 : self.compressed_reads,
238 482732 : )
239 482732 : }
240 : }
241 :
242 : ///
243 : /// Trait for block-oriented output
244 : ///
245 : pub trait BlockWriter {
246 : ///
247 : /// Write a page to the underlying storage.
248 : ///
249 : /// 'buf' must be of size PAGE_SZ. Returns the block number the page was
250 : /// written to.
251 : ///
252 : fn write_blk(&mut self, buf: Bytes) -> Result<u32, std::io::Error>;
253 : }
254 :
255 : ///
256 : /// A simple in-memory buffer of blocks.
257 : ///
258 : pub struct BlockBuf {
259 : pub blocks: Vec<Bytes>,
260 : }
261 : impl BlockWriter for BlockBuf {
262 28938 : fn write_blk(&mut self, buf: Bytes) -> Result<u32, std::io::Error> {
263 28938 : assert!(buf.len() == PAGE_SZ);
264 28938 : let blknum = self.blocks.len();
265 28938 : self.blocks.push(buf);
266 28938 : Ok(blknum as u32)
267 28938 : }
268 : }
269 :
270 : impl BlockBuf {
271 4116 : pub fn new() -> Self {
272 4116 : BlockBuf { blocks: Vec::new() }
273 4116 : }
274 :
275 4095304 : pub fn size(&self) -> u64 {
276 4095304 : (self.blocks.len() * PAGE_SZ) as u64
277 4095304 : }
278 : }
279 : impl Default for BlockBuf {
280 0 : fn default() -> Self {
281 0 : Self::new()
282 0 : }
283 : }
|