Line data Source code
1 : //!
2 : //! Low-level Block-oriented I/O functions
3 : //!
4 :
5 : use super::ephemeral_file::EphemeralFile;
6 : use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
7 : use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ};
8 : use crate::virtual_file::VirtualFile;
9 : use bytes::Bytes;
10 : use std::ops::{Deref, DerefMut};
11 :
12 : /// This is implemented by anything that can read 8 kB (PAGE_SZ)
13 : /// blocks, using the page cache
14 : ///
15 : /// There are currently two implementations: EphemeralFile, and FileBlockReader
16 : /// below.
17 : pub trait BlockReader {
18 : ///
19 : /// Create a new "cursor" for reading from this reader.
20 : ///
21 : /// A cursor caches the last accessed page, allowing for faster
22 : /// access if the same block is accessed repeatedly.
23 : fn block_cursor(&self) -> BlockCursor<'_>;
24 : }
25 :
26 : impl<B> BlockReader for &B
27 : where
28 : B: BlockReader,
29 : {
30 45700309 : fn block_cursor(&self) -> BlockCursor<'_> {
31 45700309 : (*self).block_cursor()
32 45700309 : }
33 : }
34 :
35 : /// Reference to an in-memory copy of an immutable on-disk block.
36 : pub enum BlockLease<'a> {
37 : PageReadGuard(PageReadGuard<'static>),
38 : EphemeralFileMutableTail(&'a [u8; PAGE_SZ]),
39 : #[cfg(test)]
40 : Arc(std::sync::Arc<[u8; PAGE_SZ]>),
41 : }
42 :
43 : impl From<PageReadGuard<'static>> for BlockLease<'static> {
44 291337465 : fn from(value: PageReadGuard<'static>) -> BlockLease<'static> {
45 291337465 : BlockLease::PageReadGuard(value)
46 291337465 : }
47 : }
48 :
49 : #[cfg(test)]
50 : impl<'a> From<std::sync::Arc<[u8; PAGE_SZ]>> for BlockLease<'a> {
51 508211 : fn from(value: std::sync::Arc<[u8; PAGE_SZ]>) -> Self {
52 508211 : BlockLease::Arc(value)
53 508211 : }
54 : }
55 :
56 : impl<'a> Deref for BlockLease<'a> {
57 : type Target = [u8; PAGE_SZ];
58 :
59 791982016 : fn deref(&self) -> &Self::Target {
60 791982016 : match self {
61 784687836 : BlockLease::PageReadGuard(v) => v.deref(),
62 6785969 : BlockLease::EphemeralFileMutableTail(v) => v,
63 : #[cfg(test)]
64 508211 : BlockLease::Arc(v) => v.deref(),
65 : }
66 791982016 : }
67 : }
68 :
69 : /// Provides the ability to read blocks from different sources,
70 : /// similar to using traits for this purpose.
71 : ///
72 : /// Unlike traits, we also support the read function to be async though.
73 : pub(crate) enum BlockReaderRef<'a> {
74 : FileBlockReaderVirtual(&'a FileBlockReader),
75 : EphemeralFile(&'a EphemeralFile),
76 : Adapter(Adapter<&'a DeltaLayerInner>),
77 : #[cfg(test)]
78 : TestDisk(&'a super::disk_btree::tests::TestDisk),
79 : }
80 :
81 : impl<'a> BlockReaderRef<'a> {
82 : #[inline(always)]
83 388596454 : async fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
84 388596454 : use BlockReaderRef::*;
85 388596454 : match self {
86 257511401 : FileBlockReaderVirtual(r) => r.read_blk(blknum).await,
87 96763021 : EphemeralFile(r) => r.read_blk(blknum).await,
88 33813821 : Adapter(r) => r.read_blk(blknum).await,
89 : #[cfg(test)]
90 508211 : TestDisk(r) => r.read_blk(blknum),
91 : }
92 388596452 : }
93 : }
94 :
95 : ///
96 : /// A "cursor" for efficiently reading multiple pages from a BlockReader
97 : ///
98 : /// You can access the last page with `*cursor`. 'read_blk' returns 'self', so
99 : /// that in many cases you can use a BlockCursor as a drop-in replacement for
100 : /// the underlying BlockReader. For example:
101 : ///
102 : /// ```no_run
103 : /// # use pageserver::tenant::block_io::{BlockReader, FileBlockReader};
104 : /// # let reader: FileBlockReader = unimplemented!("stub");
105 : /// let cursor = reader.block_cursor();
106 : /// let buf = cursor.read_blk(1);
107 : /// // do stuff with 'buf'
108 : /// let buf = cursor.read_blk(2);
109 : /// // do stuff with 'buf'
110 : /// ```
111 : ///
112 : pub struct BlockCursor<'a> {
113 : reader: BlockReaderRef<'a>,
114 : }
115 :
116 : impl<'a> BlockCursor<'a> {
117 144311348 : pub(crate) fn new(reader: BlockReaderRef<'a>) -> Self {
118 144311348 : BlockCursor { reader }
119 144311348 : }
120 : // Needed by cli
121 0 : pub fn new_fileblockreader(reader: &'a FileBlockReader) -> Self {
122 0 : BlockCursor {
123 0 : reader: BlockReaderRef::FileBlockReaderVirtual(reader),
124 0 : }
125 0 : }
126 :
127 : /// Read a block.
128 : ///
129 : /// Returns a "lease" object that can be used to
130 : /// access to the contents of the page. (For the page cache, the
131 : /// lease object represents a lock on the buffer.)
132 : #[inline(always)]
133 388596337 : pub async fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
134 388596454 : self.reader.read_blk(blknum).await
135 388596452 : }
136 : }
137 :
138 : /// An adapter for reading a (virtual) file using the page cache.
139 : ///
140 : /// The file is assumed to be immutable. This doesn't provide any functions
141 : /// for modifying the file, nor for invalidating the cache if it is modified.
142 : pub struct FileBlockReader {
143 : pub file: VirtualFile,
144 :
145 : /// Unique ID of this file, used as key in the page cache.
146 : file_id: page_cache::FileId,
147 : }
148 :
149 : impl FileBlockReader {
150 12140 : pub fn new(file: VirtualFile) -> Self {
151 12140 : let file_id = page_cache::next_file_id();
152 12140 :
153 12140 : FileBlockReader { file_id, file }
154 12140 : }
155 :
156 : /// Read a page from the underlying file into given buffer.
157 5359546 : async fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), std::io::Error> {
158 5359540 : assert!(buf.len() == PAGE_SZ);
159 5359540 : self.file
160 5359540 : .read_exact_at(buf, blkno as u64 * PAGE_SZ as u64)
161 0 : .await
162 5359540 : }
163 : /// Read a block.
164 : ///
165 : /// Returns a "lease" object that can be used to
166 : /// access to the contents of the page. (For the page cache, the
167 : /// lease object represents a lock on the buffer.)
168 291337479 : pub async fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
169 291337362 : let cache = page_cache::get();
170 : loop {
171 296696891 : match cache
172 296696891 : .read_immutable_buf(self.file_id, blknum)
173 1640566 : .await
174 296696890 : .map_err(|e| {
175 0 : std::io::Error::new(
176 0 : std::io::ErrorKind::Other,
177 0 : format!("Failed to read immutable buf: {e:#}"),
178 0 : )
179 296696890 : })? {
180 291337350 : ReadBufResult::Found(guard) => break Ok(guard.into()),
181 5359540 : ReadBufResult::NotFound(mut write_guard) => {
182 5359540 : // Read the page from disk into the buffer
183 5359540 : self.fill_buffer(write_guard.deref_mut(), blknum).await?;
184 5359529 : write_guard.mark_valid();
185 5359529 :
186 5359529 : // Swap for read lock
187 5359529 : continue;
188 : }
189 : };
190 : }
191 291337361 : }
192 : }
193 :
194 : impl BlockReader for FileBlockReader {
195 91395711 : fn block_cursor(&self) -> BlockCursor<'_> {
196 91395711 : BlockCursor::new(BlockReaderRef::FileBlockReaderVirtual(self))
197 91395711 : }
198 : }
199 :
200 : ///
201 : /// Trait for block-oriented output
202 : ///
203 : pub trait BlockWriter {
204 : ///
205 : /// Write a page to the underlying storage.
206 : ///
207 : /// 'buf' must be of size PAGE_SZ. Returns the block number the page was
208 : /// written to.
209 : ///
210 : fn write_blk(&mut self, buf: Bytes) -> Result<u32, std::io::Error>;
211 : }
212 :
213 : ///
214 : /// A simple in-memory buffer of blocks.
215 : ///
216 : pub struct BlockBuf {
217 : pub blocks: Vec<Bytes>,
218 : }
219 : impl BlockWriter for BlockBuf {
220 193641 : fn write_blk(&mut self, buf: Bytes) -> Result<u32, std::io::Error> {
221 193641 : assert!(buf.len() == PAGE_SZ);
222 193641 : let blknum = self.blocks.len();
223 193641 : self.blocks.push(buf);
224 193641 : Ok(blknum as u32)
225 193641 : }
226 : }
227 :
228 : impl BlockBuf {
229 16627 : pub fn new() -> Self {
230 16627 : BlockBuf { blocks: Vec::new() }
231 16627 : }
232 :
233 1751919 : pub fn size(&self) -> u64 {
234 1751919 : (self.blocks.len() * PAGE_SZ) as u64
235 1751919 : }
236 : }
237 : impl Default for BlockBuf {
238 0 : fn default() -> Self {
239 0 : Self::new()
240 0 : }
241 : }
|