Line data Source code
1 : //! Wrapper around [`super::zero_padded_read_write::RW`] that uses the
2 : //! [`crate::page_cache`] to serve reads that need to go to the underlying [`VirtualFile`].
3 :
4 : use crate::context::RequestContext;
5 : use crate::page_cache::{self, PAGE_SZ};
6 : use crate::tenant::block_io::BlockLease;
7 : use crate::virtual_file::VirtualFile;
8 :
9 : use once_cell::sync::Lazy;
10 : use std::io::{self, ErrorKind};
11 : use tokio_epoll_uring::BoundedBuf;
12 : use tracing::*;
13 :
14 : use super::zero_padded_read_write;
15 :
16 : /// See module-level comment.
17 : pub struct RW {
18 : page_cache_file_id: page_cache::FileId,
19 : rw: super::zero_padded_read_write::RW<PreWarmingWriter>,
20 : }
21 :
22 : impl RW {
23 1240 : pub fn new(file: VirtualFile) -> Self {
24 1240 : let page_cache_file_id = page_cache::next_file_id();
25 1240 : Self {
26 1240 : page_cache_file_id,
27 1240 : rw: super::zero_padded_read_write::RW::new(PreWarmingWriter::new(
28 1240 : page_cache_file_id,
29 1240 : file,
30 1240 : )),
31 1240 : }
32 1240 : }
33 :
34 1238 : pub fn page_cache_file_id(&self) -> page_cache::FileId {
35 1238 : self.page_cache_file_id
36 1238 : }
37 :
38 10221216 : pub(crate) async fn write_all_borrowed(
39 10221216 : &mut self,
40 10221216 : srcbuf: &[u8],
41 10221216 : ctx: &RequestContext,
42 10221216 : ) -> Result<usize, io::Error> {
43 10221216 : // It doesn't make sense to proactively fill the page cache on the Pageserver write path
44 10221216 : // because Compute is unlikely to access recently written data.
45 10221216 : self.rw.write_all_borrowed(srcbuf, ctx).await
46 10221216 : }
47 :
48 10202248 : pub(crate) fn bytes_written(&self) -> u64 {
49 10202248 : self.rw.bytes_written()
50 10202248 : }
51 :
52 4955748 : pub(crate) async fn read_blk(
53 4955748 : &self,
54 4955748 : blknum: u32,
55 4955748 : ctx: &RequestContext,
56 4955748 : ) -> Result<BlockLease, io::Error> {
57 4955748 : match self.rw.read_blk(blknum).await? {
58 4302384 : zero_padded_read_write::ReadResult::NeedsReadFromWriter { writer } => {
59 4302384 : let cache = page_cache::get();
60 4302384 : match cache
61 4302384 : .read_immutable_buf(self.page_cache_file_id, blknum, ctx)
62 53412 : .await
63 4302384 : .map_err(|e| {
64 0 : std::io::Error::new(
65 0 : std::io::ErrorKind::Other,
66 0 : // order path before error because error is anyhow::Error => might have many contexts
67 0 : format!(
68 0 : "ephemeral file: read immutable page #{}: {}: {:#}",
69 0 : blknum,
70 0 : self.rw.as_writer().file.path,
71 0 : e,
72 0 : ),
73 0 : )
74 4302384 : })? {
75 4248436 : page_cache::ReadBufResult::Found(guard) => {
76 4248436 : return Ok(BlockLease::PageReadGuard(guard))
77 : }
78 53948 : page_cache::ReadBufResult::NotFound(write_guard) => {
79 53948 : let write_guard = writer
80 53948 : .file
81 53948 : .read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64, ctx)
82 28162 : .await?;
83 53948 : let read_guard = write_guard.mark_valid();
84 53948 : return Ok(BlockLease::PageReadGuard(read_guard));
85 : }
86 : }
87 : }
88 653364 : zero_padded_read_write::ReadResult::ServedFromZeroPaddedMutableTail { buffer } => {
89 653364 : Ok(BlockLease::EphemeralFileMutableTail(buffer))
90 : }
91 : }
92 4955748 : }
93 : }
94 :
95 : impl Drop for RW {
96 1112 : fn drop(&mut self) {
97 1112 : // There might still be pages in the [`crate::page_cache`] for this file.
98 1112 : // We leave them there, [`crate::page_cache::PageCache::find_victim`] will evict them when needed.
99 1112 :
100 1112 : // unlink the file
101 1112 : let res = std::fs::remove_file(&self.rw.as_writer().file.path);
102 1112 : if let Err(e) = res {
103 2 : if e.kind() != std::io::ErrorKind::NotFound {
104 : // just never log the not found errors, we cannot do anything for them; on detach
105 : // the tenant directory is already gone.
106 : //
107 : // not found files might also be related to https://github.com/neondatabase/neon/issues/2442
108 0 : error!(
109 0 : "could not remove ephemeral file '{}': {}",
110 0 : self.rw.as_writer().file.path,
111 : e
112 : );
113 2 : }
114 1110 : }
115 1112 : }
116 : }
117 :
118 : struct PreWarmingWriter {
119 : nwritten_blocks: u32,
120 : page_cache_file_id: page_cache::FileId,
121 : file: VirtualFile,
122 : }
123 :
124 : impl PreWarmingWriter {
125 1240 : fn new(page_cache_file_id: page_cache::FileId, file: VirtualFile) -> Self {
126 1240 : Self {
127 1240 : nwritten_blocks: 0,
128 1240 : page_cache_file_id,
129 1240 : file,
130 1240 : }
131 1240 : }
132 : }
133 :
134 : impl crate::virtual_file::owned_buffers_io::write::OwnedAsyncWriter for PreWarmingWriter {
135 6610 : async fn write_all<
136 6610 : B: tokio_epoll_uring::BoundedBuf<Buf = Buf>,
137 6610 : Buf: tokio_epoll_uring::IoBuf + Send,
138 6610 : >(
139 6610 : &mut self,
140 6610 : buf: B,
141 6610 : ctx: &RequestContext,
142 6610 : ) -> std::io::Result<(usize, B::Buf)> {
143 6610 : let buf = buf.slice(..);
144 6610 : let saved_bounds = buf.bounds(); // save for reconstructing the Slice from iobuf after the IO is done
145 6610 : let check_bounds_stuff_works = if cfg!(test) && cfg!(debug_assertions) {
146 6610 : Some(buf.to_vec())
147 : } else {
148 0 : None
149 : };
150 6610 : let buflen = buf.len();
151 6610 : assert_eq!(
152 6610 : buflen % PAGE_SZ,
153 : 0,
154 0 : "{buflen} ; we know TAIL_SZ is a PAGE_SZ multiple, and write_buffered_borrowed is used"
155 : );
156 :
157 : // Do the IO.
158 6610 : let iobuf = match self.file.write_all(buf, ctx).await {
159 6610 : (iobuf, Ok(nwritten)) => {
160 6610 : assert_eq!(nwritten, buflen);
161 6610 : iobuf
162 : }
163 0 : (_, Err(e)) => {
164 0 : return Err(std::io::Error::new(
165 0 : ErrorKind::Other,
166 0 : // order error before path because path is long and error is short
167 0 : format!(
168 0 : "ephemeral_file: write_blob: write-back tail self.nwritten_blocks={}, buflen={}, {:#}: {}",
169 0 : self.nwritten_blocks, buflen, e, self.file.path,
170 0 : ),
171 0 : ));
172 : }
173 : };
174 :
175 : // Reconstruct the Slice (the write path consumed the Slice and returned us the underlying IoBuf)
176 6610 : let buf = tokio_epoll_uring::Slice::from_buf_bounds(iobuf, saved_bounds);
177 6610 : if let Some(check_bounds_stuff_works) = check_bounds_stuff_works {
178 6610 : assert_eq!(&check_bounds_stuff_works, &*buf);
179 0 : }
180 :
181 : // Pre-warm page cache with the contents.
182 : // At least in isolated bulk ingest benchmarks (test_bulk_insert.py), the pre-warming
183 : // benefits the code that writes InMemoryLayer=>L0 layers.
184 6610 : let nblocks = buflen / PAGE_SZ;
185 6610 : let nblocks32 = u32::try_from(nblocks).unwrap();
186 6610 : let cache = page_cache::get();
187 28 : static CTX: Lazy<RequestContext> = Lazy::new(|| {
188 28 : RequestContext::new(
189 28 : crate::task_mgr::TaskKind::EphemeralFilePreWarmPageCache,
190 28 : crate::context::DownloadBehavior::Error,
191 28 : )
192 28 : });
193 52880 : for blknum_in_buffer in 0..nblocks {
194 52880 : let blk_in_buffer = &buf[blknum_in_buffer * PAGE_SZ..(blknum_in_buffer + 1) * PAGE_SZ];
195 52880 : let blknum = self
196 52880 : .nwritten_blocks
197 52880 : .checked_add(blknum_in_buffer as u32)
198 52880 : .unwrap();
199 52880 : match cache
200 52880 : .read_immutable_buf(self.page_cache_file_id, blknum, &CTX)
201 256 : .await
202 : {
203 0 : Err(e) => {
204 0 : error!("ephemeral_file write_blob failed to get immutable buf to pre-warm page cache: {e:?}");
205 : // fail gracefully, it's not the end of the world if we can't pre-warm the cache here
206 : }
207 52880 : Ok(v) => match v {
208 0 : page_cache::ReadBufResult::Found(_guard) => {
209 0 : // This function takes &mut self, so, it shouldn't be possible to reach this point.
210 0 : unreachable!("we just wrote block {blknum} to the VirtualFile, which is owned by Self, \
211 0 : and this function takes &mut self, so, no concurrent read_blk is possible");
212 : }
213 52880 : page_cache::ReadBufResult::NotFound(mut write_guard) => {
214 52880 : write_guard.copy_from_slice(blk_in_buffer);
215 52880 : let _ = write_guard.mark_valid();
216 52880 : }
217 : },
218 : }
219 : }
220 6610 : self.nwritten_blocks = self.nwritten_blocks.checked_add(nblocks32).unwrap();
221 6610 : Ok((buflen, buf.into_inner()))
222 6610 : }
223 : }
|