Line data Source code
1 : //!
2 : //! Low-level Block-oriented I/O functions
3 : //!
4 :
5 : use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
6 : use crate::context::RequestContext;
7 : use crate::page_cache::{self, FileId, PageReadGuard, PageWriteGuard, ReadBufResult, PAGE_SZ};
8 : #[cfg(test)]
9 : use crate::virtual_file::IoBufferMut;
10 : use crate::virtual_file::VirtualFile;
11 : use bytes::Bytes;
12 : use std::ops::Deref;
13 :
14 : /// This is implemented by anything that can read 8 kB (PAGE_SZ)
15 : /// blocks, using the page cache
16 : ///
17 : /// There are currently two implementations: EphemeralFile, and FileBlockReader
18 : /// below.
19 : pub trait BlockReader {
20 : ///
21 : /// Create a new "cursor" for reading from this reader.
22 : ///
23 : /// A cursor caches the last accessed page, allowing for faster
24 : /// access if the same block is accessed repeatedly.
25 : fn block_cursor(&self) -> BlockCursor<'_>;
26 : }
27 :
28 : impl<B> BlockReader for &B
29 : where
30 : B: BlockReader,
31 : {
32 20 : fn block_cursor(&self) -> BlockCursor<'_> {
33 20 : (*self).block_cursor()
34 20 : }
35 : }
36 :
37 : /// Reference to an in-memory copy of an immutable on-disk block.
38 : pub enum BlockLease<'a> {
39 : PageReadGuard(PageReadGuard<'static>),
40 : EphemeralFileMutableTail(&'a [u8; PAGE_SZ]),
41 : Slice(&'a [u8; PAGE_SZ]),
42 : #[cfg(test)]
43 : Arc(std::sync::Arc<[u8; PAGE_SZ]>),
44 : #[cfg(test)]
45 : IoBufferMut(IoBufferMut),
46 : }
47 :
48 : impl From<PageReadGuard<'static>> for BlockLease<'static> {
49 485269 : fn from(value: PageReadGuard<'static>) -> BlockLease<'static> {
50 485269 : BlockLease::PageReadGuard(value)
51 485269 : }
52 : }
53 :
54 : #[cfg(test)]
55 : impl From<std::sync::Arc<[u8; PAGE_SZ]>> for BlockLease<'_> {
56 1016776 : fn from(value: std::sync::Arc<[u8; PAGE_SZ]>) -> Self {
57 1016776 : BlockLease::Arc(value)
58 1016776 : }
59 : }
60 :
61 : impl Deref for BlockLease<'_> {
62 : type Target = [u8; PAGE_SZ];
63 :
64 1537417 : fn deref(&self) -> &Self::Target {
65 1537417 : match self {
66 485349 : BlockLease::PageReadGuard(v) => v.deref(),
67 0 : BlockLease::EphemeralFileMutableTail(v) => v,
68 0 : BlockLease::Slice(v) => v,
69 : #[cfg(test)]
70 1016776 : BlockLease::Arc(v) => v.deref(),
71 : #[cfg(test)]
72 35292 : BlockLease::IoBufferMut(v) => {
73 35292 : TryFrom::try_from(&v[..]).expect("caller must ensure that v has PAGE_SZ")
74 : }
75 : }
76 1537417 : }
77 : }
78 :
79 : /// Provides the ability to read blocks from different sources,
80 : /// similar to using traits for this purpose.
81 : ///
82 : /// Unlike traits, we also support the read function to be async though.
83 : pub(crate) enum BlockReaderRef<'a> {
84 : FileBlockReader(&'a FileBlockReader<'a>),
85 : Adapter(Adapter<&'a DeltaLayerInner>),
86 : #[cfg(test)]
87 : TestDisk(&'a super::disk_btree::tests::TestDisk),
88 : #[cfg(test)]
89 : VirtualFile(&'a VirtualFile),
90 : }
91 :
92 : impl<'a> BlockReaderRef<'a> {
93 : #[inline(always)]
94 1521771 : async fn read_blk(
95 1521771 : &self,
96 1521771 : blknum: u32,
97 1521771 : ctx: &RequestContext,
98 1521771 : ) -> Result<BlockLease, std::io::Error> {
99 : use BlockReaderRef::*;
100 1521771 : match self {
101 484071 : FileBlockReader(r) => r.read_blk(blknum, ctx).await,
102 8 : Adapter(r) => r.read_blk(blknum, ctx).await,
103 : #[cfg(test)]
104 1016776 : TestDisk(r) => r.read_blk(blknum),
105 : #[cfg(test)]
106 20916 : VirtualFile(r) => r.read_blk(blknum, ctx).await,
107 : }
108 1521771 : }
109 : }
110 :
111 : ///
112 : /// A "cursor" for efficiently reading multiple pages from a BlockReader
113 : ///
114 : /// You can access the last page with `*cursor`. 'read_blk' returns 'self', so
115 : /// that in many cases you can use a BlockCursor as a drop-in replacement for
116 : /// the underlying BlockReader. For example:
117 : ///
118 : /// ```no_run
119 : /// # use pageserver::tenant::block_io::{BlockReader, FileBlockReader};
120 : /// # use pageserver::context::RequestContext;
121 : /// # let reader: FileBlockReader = unimplemented!("stub");
122 : /// # let ctx: RequestContext = unimplemented!("stub");
123 : /// let cursor = reader.block_cursor();
124 : /// let buf = cursor.read_blk(1, &ctx);
125 : /// // do stuff with 'buf'
126 : /// let buf = cursor.read_blk(2, &ctx);
127 : /// // do stuff with 'buf'
128 : /// ```
129 : ///
130 : pub struct BlockCursor<'a> {
131 : pub(super) read_compressed: bool,
132 : reader: BlockReaderRef<'a>,
133 : }
134 :
135 : impl<'a> BlockCursor<'a> {
136 411232 : pub(crate) fn new(reader: BlockReaderRef<'a>) -> Self {
137 411232 : Self::new_with_compression(reader, false)
138 411232 : }
139 651984 : pub(crate) fn new_with_compression(reader: BlockReaderRef<'a>, read_compressed: bool) -> Self {
140 651984 : BlockCursor {
141 651984 : read_compressed,
142 651984 : reader,
143 651984 : }
144 651984 : }
145 : // Needed by cli
146 0 : pub fn new_fileblockreader(reader: &'a FileBlockReader) -> Self {
147 0 : BlockCursor {
148 0 : read_compressed: false,
149 0 : reader: BlockReaderRef::FileBlockReader(reader),
150 0 : }
151 0 : }
152 :
153 : /// Read a block.
154 : ///
155 : /// Returns a "lease" object that can be used to
156 : /// access to the contents of the page. (For the page cache, the
157 : /// lease object represents a lock on the buffer.)
158 : #[inline(always)]
159 1521771 : pub async fn read_blk(
160 1521771 : &self,
161 1521771 : blknum: u32,
162 1521771 : ctx: &RequestContext,
163 1521771 : ) -> Result<BlockLease, std::io::Error> {
164 1521771 : self.reader.read_blk(blknum, ctx).await
165 1521771 : }
166 : }
167 :
168 : /// An adapter for reading a (virtual) file using the page cache.
169 : ///
170 : /// The file is assumed to be immutable. This doesn't provide any functions
171 : /// for modifying the file, nor for invalidating the cache if it is modified.
172 : #[derive(Clone)]
173 : pub struct FileBlockReader<'a> {
174 : pub file: &'a VirtualFile,
175 :
176 : /// Unique ID of this file, used as key in the page cache.
177 : file_id: page_cache::FileId,
178 :
179 : compressed_reads: bool,
180 : }
181 :
182 : impl<'a> FileBlockReader<'a> {
183 241646 : pub fn new(file: &'a VirtualFile, file_id: FileId) -> Self {
184 241646 : FileBlockReader {
185 241646 : file_id,
186 241646 : file,
187 241646 : compressed_reads: true,
188 241646 : }
189 241646 : }
190 :
191 : /// Read a page from the underlying file into given buffer.
192 32246 : async fn fill_buffer(
193 32246 : &self,
194 32246 : buf: PageWriteGuard<'static>,
195 32246 : blkno: u32,
196 32246 : ctx: &RequestContext,
197 32246 : ) -> Result<PageWriteGuard<'static>, std::io::Error> {
198 32246 : assert!(buf.len() == PAGE_SZ);
199 32246 : self.file
200 32246 : .read_exact_at_page(buf, blkno as u64 * PAGE_SZ as u64, ctx)
201 21999 : .await
202 32246 : }
203 : /// Read a block.
204 : ///
205 : /// Returns a "lease" object that can be used to
206 : /// access to the contents of the page. (For the page cache, the
207 : /// lease object represents a lock on the buffer.)
208 485269 : pub async fn read_blk<'b>(
209 485269 : &self,
210 485269 : blknum: u32,
211 485269 : ctx: &RequestContext,
212 485269 : ) -> Result<BlockLease<'b>, std::io::Error> {
213 485269 : let cache = page_cache::get();
214 485269 : match cache
215 485269 : .read_immutable_buf(self.file_id, blknum, ctx)
216 3717 : .await
217 485269 : .map_err(|e| {
218 0 : std::io::Error::new(
219 0 : std::io::ErrorKind::Other,
220 0 : format!("Failed to read immutable buf: {e:#}"),
221 0 : )
222 485269 : })? {
223 453023 : ReadBufResult::Found(guard) => Ok(guard.into()),
224 32246 : ReadBufResult::NotFound(write_guard) => {
225 : // Read the page from disk into the buffer
226 32246 : let write_guard = self.fill_buffer(write_guard, blknum, ctx).await?;
227 32246 : Ok(write_guard.mark_valid().into())
228 : }
229 : }
230 485269 : }
231 : }
232 :
233 : impl BlockReader for FileBlockReader<'_> {
234 240720 : fn block_cursor(&self) -> BlockCursor<'_> {
235 240720 : BlockCursor::new_with_compression(
236 240720 : BlockReaderRef::FileBlockReader(self),
237 240720 : self.compressed_reads,
238 240720 : )
239 240720 : }
240 : }
241 :
242 : ///
243 : /// Trait for block-oriented output
244 : ///
245 : pub trait BlockWriter {
246 : ///
247 : /// Write a page to the underlying storage.
248 : ///
249 : /// 'buf' must be of size PAGE_SZ. Returns the block number the page was
250 : /// written to.
251 : ///
252 : fn write_blk(&mut self, buf: Bytes) -> Result<u32, std::io::Error>;
253 : }
254 :
255 : ///
256 : /// A simple in-memory buffer of blocks.
257 : ///
258 : pub struct BlockBuf {
259 : pub blocks: Vec<Bytes>,
260 : }
261 : impl BlockWriter for BlockBuf {
262 14408 : fn write_blk(&mut self, buf: Bytes) -> Result<u32, std::io::Error> {
263 14408 : assert!(buf.len() == PAGE_SZ);
264 14408 : let blknum = self.blocks.len();
265 14408 : self.blocks.push(buf);
266 14408 : Ok(blknum as u32)
267 14408 : }
268 : }
269 :
270 : impl BlockBuf {
271 1956 : pub fn new() -> Self {
272 1956 : BlockBuf { blocks: Vec::new() }
273 1956 : }
274 :
275 2047536 : pub fn size(&self) -> u64 {
276 2047536 : (self.blocks.len() * PAGE_SZ) as u64
277 2047536 : }
278 : }
279 : impl Default for BlockBuf {
280 0 : fn default() -> Self {
281 0 : Self::new()
282 0 : }
283 : }
|