//! Wrapper around [`super::zero_padded_read_write::RW`] that uses the
//! [`crate::page_cache`] to serve reads that need to go to the underlying [`VirtualFile`].

use crate::context::RequestContext;
use crate::page_cache::{self, PAGE_SZ};
use crate::tenant::block_io::BlockLease;
use crate::virtual_file::VirtualFile;

use once_cell::sync::Lazy;
use std::io::{self, ErrorKind};
use std::ops::{Deref, Range};
use tokio_epoll_uring::BoundedBuf;
use tracing::*;

use super::zero_padded_read_write;

/// See module-level comment.
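///
/// A minimal usage sketch (hypothetical caller code; how the [`VirtualFile`] is
/// opened and where the [`RequestContext`] comes from are assumptions, not
/// prescribed by this module):
///
/// ```ignore
/// let file: VirtualFile = open_ephemeral_file().await?; // assumed helper
/// let mut rw = RW::new(file, PrewarmOnWrite::Yes);
/// rw.write_all_borrowed(b"some bytes", &ctx).await?;
/// // Served from the mutable tail, the page cache, or the underlying file.
/// let _lease = rw.read_blk(0, &ctx).await?;
/// ```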
pub struct RW {
    page_cache_file_id: page_cache::FileId,
    rw: super::zero_padded_read_write::RW<PreWarmingWriter>,
}

/// When we flush a block to the underlying [`crate::virtual_file::VirtualFile`],
/// should we pre-warm the [`crate::page_cache`] with the contents?
#[derive(Clone, Copy)]
pub enum PrewarmOnWrite {
    Yes,
    No,
}

impl RW {
    pub fn new(file: VirtualFile, prewarm_on_write: PrewarmOnWrite) -> Self {
        let page_cache_file_id = page_cache::next_file_id();
        Self {
            page_cache_file_id,
            rw: super::zero_padded_read_write::RW::new(PreWarmingWriter::new(
                page_cache_file_id,
                file,
                prewarm_on_write,
            )),
        }
    }

    pub fn page_cache_file_id(&self) -> page_cache::FileId {
        self.page_cache_file_id
    }

    pub(crate) async fn write_all_borrowed(
        &mut self,
        srcbuf: &[u8],
        ctx: &RequestContext,
    ) -> Result<usize, io::Error> {
        // It doesn't make sense to proactively fill the page cache on the Pageserver write path
        // because Compute is unlikely to access recently written data.
        self.rw.write_all_borrowed(srcbuf, ctx).await
    }

    pub(crate) fn bytes_written(&self) -> u64 {
        self.rw.bytes_written()
    }

    /// Load all blocks that can be read via [`Self::read_blk`] into a contiguous memory buffer.
    ///
    /// This includes the blocks that aren't yet flushed to disk by the internal buffered writer.
    /// The last block is zero-padded to [`PAGE_SZ`], so the returned buffer is always a multiple of [`PAGE_SZ`].
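    ///
    /// For example (illustrative arithmetic, not taken from this file): with
    /// `PAGE_SZ = 8192`, 10000 written bytes round up to a 16384-byte buffer.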
    pub(super) async fn load_to_vec(&self, ctx: &RequestContext) -> Result<Vec<u8>, io::Error> {
        // Round up to the next PAGE_SZ multiple, required by blob_io.
        let size = {
            let s = usize::try_from(self.bytes_written()).unwrap();
            if s % PAGE_SZ == 0 {
                s
            } else {
                s.checked_add(PAGE_SZ - (s % PAGE_SZ)).unwrap()
            }
        };
        let vec = Vec::with_capacity(size);

        // Read from disk what we've already flushed.
        let writer = self.rw.as_writer();
        let flushed_range = writer.written_range();
        let mut vec = writer
            .file
            .read_exact_at(
                vec.slice(0..(flushed_range.end - flushed_range.start)),
                u64::try_from(flushed_range.start).unwrap(),
                ctx,
            )
            .await?
            .into_inner();

        // Copy from the in-memory buffer what we haven't flushed yet but would
        // return when accessed via read_blk.
        let buffered = self.rw.get_tail_zero_padded();
        vec.extend_from_slice(buffered);
        assert_eq!(vec.len(), size);
        assert_eq!(vec.len() % PAGE_SZ, 0);
        Ok(vec)
    }

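    /// Read one [`PAGE_SZ`]-sized block at `blknum`.
    ///
    /// The read is served from the first of these that applies:
    /// 1. the zero-padded mutable tail that hasn't been flushed to disk yet,
    /// 2. the [`crate::page_cache`], keyed by (`page_cache_file_id`, `blknum`),
    /// 3. the underlying [`VirtualFile`], filling the page cache slot on the way.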
    pub(crate) async fn read_blk(
        &self,
        blknum: u32,
        ctx: &RequestContext,
    ) -> Result<BlockLease, io::Error> {
        match self.rw.read_blk(blknum).await? {
            zero_padded_read_write::ReadResult::NeedsReadFromWriter { writer } => {
                let cache = page_cache::get();
                match cache
                    .read_immutable_buf(self.page_cache_file_id, blknum, ctx)
                    .await
                    .map_err(|e| {
                        std::io::Error::new(
                            std::io::ErrorKind::Other,
                            // Order the path before the error: the error is an anyhow::Error
                            // and might carry many contexts.
                            format!(
                                "ephemeral file: read immutable page #{}: {}: {:#}",
                                blknum,
                                self.rw.as_writer().file.path,
                                e,
                            ),
                        )
                    })? {
                    page_cache::ReadBufResult::Found(guard) => {
                        return Ok(BlockLease::PageReadGuard(guard))
                    }
                    page_cache::ReadBufResult::NotFound(write_guard) => {
                        let write_guard = writer
                            .file
                            .read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64, ctx)
                            .await?;
                        let read_guard = write_guard.mark_valid();
                        return Ok(BlockLease::PageReadGuard(read_guard));
                    }
                }
            }
            zero_padded_read_write::ReadResult::ServedFromZeroPaddedMutableTail { buffer } => {
                Ok(BlockLease::EphemeralFileMutableTail(buffer))
            }
        }
    }
}

impl Drop for RW {
    fn drop(&mut self) {
        // There might still be pages in the [`crate::page_cache`] for this file.
        // We leave them there; [`crate::page_cache::PageCache::find_victim`] will evict them when needed.

        // Unlink the file.
        let res = std::fs::remove_file(&self.rw.as_writer().file.path);
        if let Err(e) = res {
            if e.kind() != std::io::ErrorKind::NotFound {
                // Never log not-found errors: we cannot do anything about them; on detach,
                // the tenant directory is already gone.
                //
                // Not-found errors might also be related to https://github.com/neondatabase/neon/issues/2442
                error!(
                    "could not remove ephemeral file '{}': {}",
                    self.rw.as_writer().file.path,
                    e
                );
            }
        }
    }
}

/// The writer that [`RW`] flushes to: it writes blocks to the underlying
/// [`VirtualFile`] and, if requested, pre-warms the [`crate::page_cache`].
struct PreWarmingWriter {
    prewarm_on_write: PrewarmOnWrite,
    nwritten_blocks: u32,
    page_cache_file_id: page_cache::FileId,
    file: VirtualFile,
}

impl PreWarmingWriter {
    fn new(
        page_cache_file_id: page_cache::FileId,
        file: VirtualFile,
        prewarm_on_write: PrewarmOnWrite,
    ) -> Self {
        Self {
            prewarm_on_write,
            nwritten_blocks: 0,
            page_cache_file_id,
            file,
        }
    }

    /// Return the byte range within `file` that has been written through `write_all`.
    ///
    /// The returned range would be invalidated by another `write_all`. To prevent that,
    /// the return type captures the borrow of `self` (`+ '_`), so no `&mut self` method
    /// can be called while the range is still alive.
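    ///
    /// Illustrative sketch of the borrow (simplified, not taken from a caller in this file):
    ///
    /// ```ignore
    /// let range = writer.written_range(); // borrows `writer` via `+ '_`
    /// // writer.write_all(buf, ctx).await; // rejected: `range` still borrows `writer`
    /// let flushed_len = range.end - range.start;
    /// ```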
    fn written_range(&self) -> (impl Deref<Target = Range<usize>> + '_) {
        let nwritten_blocks = usize::try_from(self.nwritten_blocks).unwrap();
        struct Wrapper(Range<usize>);
        impl Deref for Wrapper {
            type Target = Range<usize>;
            fn deref(&self) -> &Range<usize> {
                &self.0
            }
        }
        Wrapper(0..nwritten_blocks * PAGE_SZ)
    }
}

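// Flush path: `write_all` writes PAGE_SZ-aligned buffers to the `VirtualFile` and,
// for `PrewarmOnWrite::Yes`, copies each freshly written block into the page cache,
// so that subsequent `read_blk` calls hit the cache instead of going to disk.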
impl crate::virtual_file::owned_buffers_io::write::OwnedAsyncWriter for PreWarmingWriter {
    async fn write_all<
        B: tokio_epoll_uring::BoundedBuf<Buf = Buf>,
        Buf: tokio_epoll_uring::IoBuf + Send,
    >(
        &mut self,
        buf: B,
        ctx: &RequestContext,
    ) -> std::io::Result<(usize, B::Buf)> {
        let buf = buf.slice(..);
        let saved_bounds = buf.bounds(); // save for reconstructing the Slice from the iobuf after the IO is done
        // Only in debug-assertion test builds: keep a copy so we can assert below
        // that the Slice reconstruction round-trips.
        let check_bounds_stuff_works = if cfg!(test) && cfg!(debug_assertions) {
            Some(buf.to_vec())
        } else {
            None
        };
        let buflen = buf.len();
        assert_eq!(
            buflen % PAGE_SZ,
            0,
            "{buflen} ; we know TAIL_SZ is a PAGE_SZ multiple, and write_buffered_borrowed is used"
        );

        // Do the IO.
        let iobuf = match self.file.write_all(buf, ctx).await {
            (iobuf, Ok(nwritten)) => {
                assert_eq!(nwritten, buflen);
                iobuf
            }
            (_, Err(e)) => {
                return Err(std::io::Error::new(
                    ErrorKind::Other,
                    // Order the error before the path: the path is long and the error is short.
                    format!(
                        "ephemeral_file: write_blob: write-back tail self.nwritten_blocks={}, buflen={}, {:#}: {}",
                        self.nwritten_blocks, buflen, e, self.file.path,
                    ),
                ));
            }
        };

        // Reconstruct the Slice (the write path consumed the Slice and returned us the underlying IoBuf).
        let buf = tokio_epoll_uring::Slice::from_buf_bounds(iobuf, saved_bounds);
        if let Some(check_bounds_stuff_works) = check_bounds_stuff_works {
            assert_eq!(&check_bounds_stuff_works, &*buf);
        }

        let nblocks = buflen / PAGE_SZ;
        let nblocks32 = u32::try_from(nblocks).unwrap();

        if matches!(self.prewarm_on_write, PrewarmOnWrite::Yes) {
            // Pre-warm the page cache with the contents.
            // At least in isolated bulk ingest benchmarks (test_bulk_insert.py), the pre-warming
            // benefits the code that writes InMemoryLayer=>L0 layers.

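            // A dedicated static RequestContext for the pre-warm reads, presumably so
            // that these page cache accesses are attributed to the pre-warm task kind
            // rather than to whichever caller triggered the flush.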
            let cache = page_cache::get();
            static CTX: Lazy<RequestContext> = Lazy::new(|| {
                RequestContext::new(
                    crate::task_mgr::TaskKind::EphemeralFilePreWarmPageCache,
                    crate::context::DownloadBehavior::Error,
                )
            });
            for blknum_in_buffer in 0..nblocks {
                let blk_in_buffer =
                    &buf[blknum_in_buffer * PAGE_SZ..(blknum_in_buffer + 1) * PAGE_SZ];
                let blknum = self
                    .nwritten_blocks
                    .checked_add(blknum_in_buffer as u32)
                    .unwrap();
                match cache
                    .read_immutable_buf(self.page_cache_file_id, blknum, &CTX)
                    .await
                {
                    Err(e) => {
                        error!("ephemeral_file write_blob failed to get immutable buf to pre-warm page cache: {e:?}");
                        // Fail gracefully; it's not the end of the world if we can't pre-warm the cache here.
                    }
                    Ok(v) => match v {
                        page_cache::ReadBufResult::Found(_guard) => {
                            // This function takes &mut self, so it shouldn't be possible to reach this point.
                            unreachable!("we just wrote block {blknum} to the VirtualFile, which is owned by Self, \
                                and this function takes &mut self, so no concurrent read_blk is possible");
                        }
                        page_cache::ReadBufResult::NotFound(mut write_guard) => {
                            write_guard.copy_from_slice(blk_in_buffer);
                            let _ = write_guard.mark_valid();
                        }
                    },
                }
            }
        }

        self.nwritten_blocks = self.nwritten_blocks.checked_add(nblocks32).unwrap();
        Ok((buflen, buf.into_inner()))
    }
}