//! Wrapper around [`super::zero_padded_read_write::RW`] that uses the
//! [`crate::page_cache`] to serve reads that need to go to the underlying [`VirtualFile`].
//!
//! Subject to removal in <https://github.com/neondatabase/neon/pull/8537>

use crate::context::RequestContext;
use crate::page_cache::{self, PAGE_SZ};
use crate::tenant::block_io::BlockLease;
use crate::virtual_file::owned_buffers_io::util::size_tracking_writer;
use crate::virtual_file::VirtualFile;

use std::io::{self};
use tokio_epoll_uring::BoundedBuf;
use tracing::*;

use super::zero_padded_read_write;
/// See module-level comment.
pub struct RW {
    page_cache_file_id: page_cache::FileId,
    rw: super::zero_padded_read_write::RW<size_tracking_writer::Writer<VirtualFile>>,
    /// The gate guard is held for as long as we need to operate on the path (delete on drop).
    _gate_guard: utils::sync::gate::GateGuard,
}
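
// A minimal construction sketch (illustrative only: `VirtualFile::create`, the
// `gate.enter()` call, and the error handling are assumptions about the
// surrounding crate's API, not verified against it):
//
//     let file = VirtualFile::create(&path).await?;
//     let gate_guard = gate.enter()?;
//     let mut rw = RW::new(file, gate_guard);
//     rw.write_all_borrowed(b"some bytes", ctx).await?;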

impl RW {
    pub fn new(file: VirtualFile, _gate_guard: utils::sync::gate::GateGuard) -> Self {
        let page_cache_file_id = page_cache::next_file_id();
        Self {
            page_cache_file_id,
            rw: super::zero_padded_read_write::RW::new(size_tracking_writer::Writer::new(file)),
            _gate_guard,
        }
    }

    pub fn page_cache_file_id(&self) -> page_cache::FileId {
        self.page_cache_file_id
    }

    pub(crate) async fn write_all_borrowed(
        &mut self,
        srcbuf: &[u8],
        ctx: &RequestContext,
    ) -> Result<usize, io::Error> {
        // It doesn't make sense to proactively fill the page cache on the Pageserver
        // write path because Compute is unlikely to access recently written data.
        self.rw.write_all_borrowed(srcbuf, ctx).await
    }

    pub(crate) fn bytes_written(&self) -> u64 {
        self.rw.bytes_written()
    }

    /// Load all blocks that can be read via [`Self::read_blk`] into a contiguous memory buffer.
    ///
    /// This includes the blocks that aren't yet flushed to disk by the internal buffered writer.
    /// The last block is zero-padded to [`PAGE_SZ`], so the returned buffer is always a multiple of [`PAGE_SZ`] in length.
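    ///
    /// A sketch of the intended use (illustrative only; obtaining `rw: &RW` and
    /// `ctx` is up to the caller):
    ///
    /// ```ignore
    /// let buf = rw.load_to_vec(ctx).await?;
    /// assert_eq!(buf.len() % PAGE_SZ, 0);
    /// for page in buf.chunks_exact(PAGE_SZ) {
    ///     // each `page` is one PAGE_SZ-sized block, zero-padded at the tail
    /// }
    /// ```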
    pub(super) async fn load_to_vec(&self, ctx: &RequestContext) -> Result<Vec<u8>, io::Error> {
        // round up to the next PAGE_SZ multiple, required by blob_io
        let size = {
            let s = usize::try_from(self.bytes_written()).unwrap();
            if s % PAGE_SZ == 0 {
                s
            } else {
                s.checked_add(PAGE_SZ - (s % PAGE_SZ)).unwrap()
            }
        };
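        // Worked example of the rounding above (assuming PAGE_SZ == 8192; see
        // `page_cache::PAGE_SZ` for the authoritative value):
        //   s = 20_000 => 20_000 % 8192 == 3_616 => size = 24_576 (= 3 * 8192)
        //   s = 16_384 => already a multiple     => size = 16_384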
        let vec = Vec::with_capacity(size);

        // read from disk what we've already flushed
        let file_size_tracking_writer = self.rw.as_writer();
        let flushed_range = 0..usize::try_from(file_size_tracking_writer.bytes_written()).unwrap();
        let mut vec = file_size_tracking_writer
            .as_inner()
            .read_exact_at(
                vec.slice(0..(flushed_range.end - flushed_range.start)),
                u64::try_from(flushed_range.start).unwrap(),
                ctx,
            )
            .await?
            .into_inner();

        // copy from the in-memory buffer what we haven't flushed yet but would
        // return when accessed via read_blk
        let buffered = self.rw.get_tail_zero_padded();
        vec.extend_from_slice(buffered);
        assert_eq!(vec.len(), size);
        assert_eq!(vec.len() % PAGE_SZ, 0);
        Ok(vec)
    }

    /// Read back block `blknum`: served from the writer's in-memory tail if it is
    /// not yet flushed, otherwise from the page cache, falling back to a read from
    /// the underlying [`VirtualFile`] on a cache miss.
    pub(crate) async fn read_blk(
        &self,
        blknum: u32,
        ctx: &RequestContext,
    ) -> Result<BlockLease, io::Error> {
        match self.rw.read_blk(blknum).await? {
            zero_padded_read_write::ReadResult::NeedsReadFromWriter { writer } => {
                let cache = page_cache::get();
                match cache
                    .read_immutable_buf(self.page_cache_file_id, blknum, ctx)
                    .await
                    .map_err(|e| {
                        std::io::Error::new(
                            std::io::ErrorKind::Other,
                            // order path before error because error is anyhow::Error => might have many contexts
                            format!(
                                "ephemeral file: read immutable page #{}: {}: {:#}",
                                blknum,
                                self.rw.as_writer().as_inner().path,
                                e,
                            ),
                        )
                    })? {
                    page_cache::ReadBufResult::Found(guard) => {
                        return Ok(BlockLease::PageReadGuard(guard))
                    }
                    page_cache::ReadBufResult::NotFound(write_guard) => {
                        // Cache miss: fill the write guard's page from the file,
                        // then serve the now-valid page from the cache.
                        let write_guard = writer
                            .as_inner()
                            .read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64, ctx)
                            .await?;
                        let read_guard = write_guard.mark_valid();
                        return Ok(BlockLease::PageReadGuard(read_guard));
                    }
                }
            }
            zero_padded_read_write::ReadResult::ServedFromZeroPaddedMutableTail { buffer } => {
                Ok(BlockLease::EphemeralFileMutableTail(buffer))
            }
        }
    }
}
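
// Illustrative read of one block through the API above (hypothetical caller; how
// the page bytes are extracted from the returned `BlockLease` is an assumption
// about `crate::tenant::block_io::BlockLease` and may differ):
//
//     let lease = rw.read_blk(0, ctx).await?;
//     let page: &[u8] = lease.as_ref();
//     assert_eq!(page.len(), PAGE_SZ);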

impl Drop for RW {
    fn drop(&mut self) {
        // There might still be pages in the [`crate::page_cache`] for this file.
        // We leave them there; [`crate::page_cache::PageCache::find_victim`] will
        // evict them when needed.

        // Unlink the file. We are clear to do this because we have entered the gate.
        let path = &self.rw.as_writer().as_inner().path;
        let res = std::fs::remove_file(path);
        if let Err(e) = res {
            if e.kind() != std::io::ErrorKind::NotFound {
                // Never log NotFound errors: we cannot do anything about them; on
                // detach, the tenant directory is already gone.
                //
                // NotFound errors might also be related to
                // https://github.com/neondatabase/neon/issues/2442
                error!("could not remove ephemeral file '{path}': {e}");
            }
        }
    }
}