Line data Source code
1 : //! Implementation of append-only file data structure
2 : //! used to keep in-memory layers spilled on disk.
3 :
4 : use crate::config::PageServerConf;
5 : use crate::page_cache::{self, PAGE_SZ};
6 : use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
7 : use crate::virtual_file::VirtualFile;
8 : use std::cmp::min;
9 : use std::fs::OpenOptions;
10 : use std::io::{self, ErrorKind};
11 : use std::ops::DerefMut;
12 : use std::path::PathBuf;
13 : use std::sync::atomic::AtomicU64;
14 : use tracing::*;
15 : use utils::id::{TenantId, TimelineId};
16 :
17 : pub struct EphemeralFile {
18 : page_cache_file_id: page_cache::FileId,
19 :
20 : _tenant_id: TenantId,
21 : _timeline_id: TimelineId,
22 : file: VirtualFile,
23 : len: u64,
24 : /// An ephemeral file is append-only.
25 : /// We keep the last page, which can still be modified, in [`Self::mutable_tail`].
26 : /// The other pages, which can no longer be modified, are accessed through the page cache.
27 : mutable_tail: [u8; PAGE_SZ],
28 : }
29 :
30 : impl EphemeralFile {
31 6869 : pub fn create(
32 6869 : conf: &PageServerConf,
33 6869 : tenant_id: TenantId,
34 6869 : timeline_id: TimelineId,
35 6869 : ) -> Result<EphemeralFile, io::Error> {
36 6869 : static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
37 6869 : let filename_disambiguator =
38 6869 : NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
39 6869 :
40 6869 : let filename = conf
41 6869 : .timeline_path(&tenant_id, &timeline_id)
42 6869 : .join(PathBuf::from(format!("ephemeral-{filename_disambiguator}")));
43 :
44 6869 : let file = VirtualFile::open_with_options(
45 6869 : &filename,
46 6869 : OpenOptions::new().read(true).write(true).create(true),
47 6869 : )?;
48 :
49 6869 : Ok(EphemeralFile {
50 6869 : page_cache_file_id: page_cache::next_file_id(),
51 6869 : _tenant_id: tenant_id,
52 6869 : _timeline_id: timeline_id,
53 6869 : file,
54 6869 : len: 0,
55 6869 : mutable_tail: [0u8; PAGE_SZ],
56 6869 : })
57 6869 : }
58 :
59 733761 : pub(crate) fn len(&self) -> u64 {
60 733761 : self.len
61 733761 : }
62 :
63 96762958 : pub(crate) async fn read_blk(&self, blknum: u32) -> Result<BlockLease, io::Error> {
64 96762870 : let flushed_blknums = 0..self.len / PAGE_SZ as u64;
65 96762870 : if flushed_blknums.contains(&(blknum as u64)) {
66 94136815 : let cache = page_cache::get();
67 : loop {
68 95788974 : match cache
69 95788974 : .read_immutable_buf(self.page_cache_file_id, blknum)
70 592846 : .await
71 95788973 : .map_err(|e| {
72 0 : std::io::Error::new(
73 0 : std::io::ErrorKind::Other,
74 0 : // order path before error because error is anyhow::Error => might have many contexts
75 0 : format!(
76 0 : "ephemeral file: read immutable page #{}: {}: {:#}",
77 0 : blknum,
78 0 : self.file.path.display(),
79 0 : e,
80 0 : ),
81 0 : )
82 95788973 : })? {
83 94136814 : page_cache::ReadBufResult::Found(guard) => {
84 94136814 : return Ok(BlockLease::PageReadGuard(guard))
85 : }
86 1652159 : page_cache::ReadBufResult::NotFound(mut write_guard) => {
87 1652159 : let buf: &mut [u8] = write_guard.deref_mut();
88 1652159 : debug_assert_eq!(buf.len(), PAGE_SZ);
89 1652159 : self.file
90 1652159 : .read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)
91 0 : .await?;
92 1652159 : write_guard.mark_valid();
93 1652159 :
94 1652159 : // Swap for read lock
95 1652159 : continue;
96 : }
97 : };
98 : }
99 : } else {
100 2626055 : debug_assert_eq!(blknum as u64, self.len / PAGE_SZ as u64);
101 2626055 : Ok(BlockLease::EphemeralFileMutableTail(&self.mutable_tail))
102 : }
103 96762869 : }
104 :
105 82556103 : pub(crate) async fn write_blob(&mut self, srcbuf: &[u8]) -> Result<u64, io::Error> {
106 82556073 : struct Writer<'a> {
107 82556073 : ephemeral_file: &'a mut EphemeralFile,
108 82556073 : /// The block to which the next [`push_bytes`] will write.
109 82556073 : blknum: u32,
110 82556073 : /// The offset inside the block identified by [`blknum`] to which [`push_bytes`] will write.
111 82556073 : off: usize,
112 82556073 : }
113 82556073 : impl<'a> Writer<'a> {
114 82556103 : fn new(ephemeral_file: &'a mut EphemeralFile) -> io::Result<Writer<'a>> {
115 82556103 : Ok(Writer {
116 82556103 : blknum: (ephemeral_file.len / PAGE_SZ as u64) as u32,
117 82556103 : off: (ephemeral_file.len % PAGE_SZ as u64) as usize,
118 82556103 : ephemeral_file,
119 82556103 : })
120 82556103 : }
121 82556073 : #[inline(always)]
122 165112146 : async fn push_bytes(&mut self, src: &[u8]) -> Result<(), io::Error> {
123 165112146 : let mut src_remaining = src;
124 334152305 : while !src_remaining.is_empty() {
125 169040160 : let dst_remaining = &mut self.ephemeral_file.mutable_tail[self.off..];
126 169040160 : let n = min(dst_remaining.len(), src_remaining.len());
127 169040160 : dst_remaining[..n].copy_from_slice(&src_remaining[..n]);
128 169040160 : self.off += n;
129 169040160 : src_remaining = &src_remaining[n..];
130 169040160 : if self.off == PAGE_SZ {
131 82556073 : match self
132 3950823 : .ephemeral_file
133 3950823 : .file
134 3950823 : .write_all_at(
135 3950823 : &self.ephemeral_file.mutable_tail,
136 3950823 : self.blknum as u64 * PAGE_SZ as u64,
137 3950823 : )
138 82556073 : .await
139 82556073 : {
140 82556073 : Ok(_) => {
141 82556073 : // Pre-warm the page cache with what we just wrote.
142 82556073 : // This isn't necessary for coherency/correctness, but it's how we've always done it.
143 82556073 : let cache = page_cache::get();
144 3950822 : match cache
145 3950822 : .read_immutable_buf(
146 3950822 : self.ephemeral_file.page_cache_file_id,
147 3950822 : self.blknum,
148 3950822 : )
149 82556073 : .await
150 82556073 : {
151 82556073 : Ok(page_cache::ReadBufResult::Found(_guard)) => {
152 0 : // This function takes &mut self, so, it shouldn't be possible to reach this point.
153 0 : unreachable!("we just wrote blknum {} and this function takes &mut self, so, no concurrent read_blk is possible", self.blknum);
154 82556073 : }
155 82556073 : Ok(page_cache::ReadBufResult::NotFound(mut write_guard)) => {
156 3950822 : let buf: &mut [u8] = write_guard.deref_mut();
157 82556073 : debug_assert_eq!(buf.len(), PAGE_SZ);
158 82556073 : buf.copy_from_slice(&self.ephemeral_file.mutable_tail);
159 3950822 : write_guard.mark_valid();
160 82556073 : // pre-warm successful
161 82556073 : }
162 82556073 : Err(e) => {
163 82556073 : error!("ephemeral_file write_blob failed to get immutable buf to pre-warm page cache: {e:?}");
164 82556073 : // fail gracefully, it's not the end of the world if we can't pre-warm the cache here
165 82556073 : }
166 82556073 : }
167 82556073 : // Zero the buffer for re-use.
168 82556073 : // Zeroing is critical for correcntess because the write_blob code below
169 82556073 : // and similarly read_blk expect zeroed pages.
170 82556073 : self.ephemeral_file.mutable_tail.fill(0);
171 3950822 : // This block is done, move to next one.
172 3950822 : self.blknum += 1;
173 3950822 : self.off = 0;
174 82556073 : }
175 82556073 : Err(e) => {
176 0 : return Err(std::io::Error::new(
177 0 : ErrorKind::Other,
178 0 : // order error before path because path is long and error is short
179 0 : format!(
180 0 : "ephemeral_file: write_blob: write-back full tail blk #{}: {:#}: {}",
181 0 : self.blknum,
182 0 : e,
183 0 : self.ephemeral_file.file.path.display(),
184 0 : ),
185 0 : ));
186 82556073 : }
187 82556073 : }
188 165089337 : }
189 82556073 : }
190 165112145 : Ok(())
191 165112145 : }
192 82556073 : }
193 82556073 :
194 82556073 : let pos = self.len;
195 82556073 : let mut writer = Writer::new(self)?;
196 :
197 : // Write the length field
198 82556073 : if srcbuf.len() < 0x80 {
199 : // short one-byte length header
200 61017636 : let len_buf = [srcbuf.len() as u8];
201 61017636 : writer.push_bytes(&len_buf).await?;
202 : } else {
203 21538437 : let mut len_buf = u32::to_be_bytes(srcbuf.len() as u32);
204 21538437 : len_buf[0] |= 0x80;
205 21538437 : writer.push_bytes(&len_buf).await?;
206 : }
207 :
208 : // Write the payload
209 82556073 : writer.push_bytes(srcbuf).await?;
210 :
211 82556072 : if srcbuf.len() < 0x80 {
212 61017635 : self.len += 1;
213 61017635 : } else {
214 21538437 : self.len += 4;
215 21538437 : }
216 82556072 : self.len += srcbuf.len() as u64;
217 82556072 :
218 82556072 : Ok(pos)
219 82556072 : }
220 : }
221 :
/// Does the given filename look like an ephemeral file?
///
/// Ephemeral files are named `ephemeral-<N>`, where `N` is taken from a
/// process-wide `u64` counter (`EphemeralFile::create`). The suffix is therefore
/// parsed as a `u64`. Parsing it as `u32` would miss files created after the
/// counter passes `u32::MAX`.
pub fn is_ephemeral_file(filename: &str) -> bool {
    match filename.strip_prefix("ephemeral-") {
        Some(rest) => rest.parse::<u64>().is_ok(),
        None => false,
    }
}
230 :
231 : impl Drop for EphemeralFile {
232 6568 : fn drop(&mut self) {
233 6568 : // There might still be pages in the [`crate::page_cache`] for this file.
234 6568 : // We leave them there, [`crate::page_cache::PageCache::find_victim`] will evict them when needed.
235 6568 :
236 6568 : // unlink the file
237 6568 : let res = std::fs::remove_file(&self.file.path);
238 6568 : if let Err(e) = res {
239 57 : if e.kind() != std::io::ErrorKind::NotFound {
240 : // just never log the not found errors, we cannot do anything for them; on detach
241 : // the tenant directory is already gone.
242 : //
243 : // not found files might also be related to https://github.com/neondatabase/neon/issues/2442
244 0 : error!(
245 0 : "could not remove ephemeral file '{}': {}",
246 0 : self.file.path.display(),
247 0 : e
248 0 : );
249 57 : }
250 6511 : }
251 6568 : }
252 : }
253 :
254 : impl BlockReader for EphemeralFile {
255 6254591 : fn block_cursor(&self) -> super::block_io::BlockCursor<'_> {
256 6254591 : BlockCursor::new(super::block_io::BlockReaderRef::EphemeralFile(self))
257 6254591 : }
258 : }
259 :
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tenant::block_io::{BlockCursor, BlockReaderRef};
    use rand::{thread_rng, RngCore};
    use std::fs;
    use std::str::FromStr;

    /// Prepares a fresh test repository named after `test_name`.
    ///
    /// Returns a leaked (hence `'static`) config together with fixed tenant and
    /// timeline ids whose timeline directory has been created.
    fn harness(
        test_name: &str,
    ) -> Result<(&'static PageServerConf, TenantId, TimelineId), io::Error> {
        let repo_dir = PageServerConf::test_repo_dir(test_name);
        // Wipe leftovers from earlier runs; errors (e.g. nothing to remove) are ignored.
        let _ = fs::remove_dir_all(&repo_dir);
        // Leaking the config is acceptable in a test and yields the 'static borrow we hand out.
        let conf: &'static PageServerConf =
            Box::leak(Box::new(PageServerConf::dummy_conf(repo_dir)));

        let tenant_id = TenantId::from_str("11000000000000000000000000000000").unwrap();
        let timeline_id = TimelineId::from_str("22000000000000000000000000000000").unwrap();
        fs::create_dir_all(conf.timeline_path(&tenant_id, &timeline_id))?;

        Ok((conf, tenant_id, timeline_id))
    }

    #[tokio::test]
    async fn test_ephemeral_blobs() -> Result<(), io::Error> {
        let (conf, tenant_id, timeline_id) = harness("ephemeral_blobs")?;
        let mut file = EphemeralFile::create(conf, tenant_id, timeline_id)?;

        // Two tiny blobs, each checked right after it is written.
        let pos_foo = file.write_blob(b"foo").await?;
        assert_eq!(b"foo", file.block_cursor().read_blob(pos_foo).await?.as_slice());
        let pos_bar = file.write_blob(b"bar").await?;
        assert_eq!(b"foo", file.block_cursor().read_blob(pos_foo).await?.as_slice());
        assert_eq!(b"bar", file.block_cursor().read_blob(pos_bar).await?.as_slice());

        // Many short blobs (one-byte headers) followed by longer ones (four-byte headers).
        let short_blobs = (0..10000).map(|i| format!("blob{}", i).into_bytes());
        let long_blobs = (0..100).map(|i| format!("blob{}", i).as_bytes().repeat(100));
        let mut written = Vec::new();
        for payload in short_blobs.chain(long_blobs) {
            let offset = file.write_blob(&payload).await?;
            written.push((offset, payload));
        }

        let cursor = BlockCursor::new(BlockReaderRef::EphemeralFile(&file));
        for (offset, payload) in written {
            assert_eq!(cursor.read_blob(offset).await?, payload);
        }

        // A random blob that spans multiple pages.
        let mut big_payload = vec![0u8; 20000];
        thread_rng().fill_bytes(&mut big_payload);
        let big_offset = file.write_blob(&big_payload).await?;
        assert_eq!(file.block_cursor().read_blob(big_offset).await?, big_payload);

        Ok(())
    }
}
|