TLA Line data Source code
1 : //! Implementation of append-only file data structure
2 : //! used to keep in-memory layers spilled on disk.
3 :
4 : use crate::config::PageServerConf;
5 : use crate::context::RequestContext;
6 : use crate::page_cache::{self, PAGE_SZ};
7 : use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
8 : use crate::virtual_file::VirtualFile;
9 : use camino::Utf8PathBuf;
10 : use std::cmp::min;
11 : use std::fs::OpenOptions;
12 : use std::io::{self, ErrorKind};
13 : use std::ops::DerefMut;
14 : use std::sync::atomic::AtomicU64;
15 : use tracing::*;
16 : use utils::id::{TenantId, TimelineId};
17 :
18 : pub struct EphemeralFile {
19 : page_cache_file_id: page_cache::FileId,
20 :
21 : _tenant_id: TenantId,
22 : _timeline_id: TimelineId,
23 : file: VirtualFile,
24 : len: u64,
25 : /// An ephemeral file is append-only.
26 : /// We keep the last page, which can still be modified, in [`Self::mutable_tail`].
27 : /// The other pages, which can no longer be modified, are accessed through the page cache.
28 : mutable_tail: [u8; PAGE_SZ],
29 : }
30 :
31 : impl EphemeralFile {
32 CBC 5680 : pub async fn create(
33 5680 : conf: &PageServerConf,
34 5680 : tenant_id: TenantId,
35 5680 : timeline_id: TimelineId,
36 5680 : ) -> Result<EphemeralFile, io::Error> {
37 5680 : static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
38 5680 : let filename_disambiguator =
39 5680 : NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
40 5680 :
41 5680 : let filename = conf
42 5680 : .timeline_path(&tenant_id, &timeline_id)
43 5680 : .join(Utf8PathBuf::from(format!(
44 5680 : "ephemeral-{filename_disambiguator}"
45 5680 : )));
46 :
47 5680 : let file = VirtualFile::open_with_options(
48 5680 : &filename,
49 5680 : OpenOptions::new().read(true).write(true).create(true),
50 5680 : )
51 UBC 0 : .await?;
52 :
53 CBC 5680 : Ok(EphemeralFile {
54 5680 : page_cache_file_id: page_cache::next_file_id(),
55 5680 : _tenant_id: tenant_id,
56 5680 : _timeline_id: timeline_id,
57 5680 : file,
58 5680 : len: 0,
59 5680 : mutable_tail: [0u8; PAGE_SZ],
60 5680 : })
61 5680 : }
62 :
63 776346 : pub(crate) fn len(&self) -> u64 {
64 776346 : self.len
65 776346 : }
66 :
67 88574108 : pub(crate) async fn read_blk(
68 88574108 : &self,
69 88574108 : blknum: u32,
70 88574108 : ctx: &RequestContext,
71 88574108 : ) -> Result<BlockLease, io::Error> {
72 88573971 : let flushed_blknums = 0..self.len / PAGE_SZ as u64;
73 88573971 : if flushed_blknums.contains(&(blknum as u64)) {
74 86368168 : let cache = page_cache::get();
75 86368168 : match cache
76 86368168 : .read_immutable_buf(self.page_cache_file_id, blknum, ctx)
77 1156028 : .await
78 86368168 : .map_err(|e| {
79 UBC 0 : std::io::Error::new(
80 0 : std::io::ErrorKind::Other,
81 0 : // order path before error because error is anyhow::Error => might have many contexts
82 0 : format!(
83 0 : "ephemeral file: read immutable page #{}: {}: {:#}",
84 0 : blknum, self.file.path, e,
85 0 : ),
86 0 : )
87 CBC 86368168 : })? {
88 84624793 : page_cache::ReadBufResult::Found(guard) => {
89 84624793 : return Ok(BlockLease::PageReadGuard(guard))
90 : }
91 1743375 : page_cache::ReadBufResult::NotFound(mut write_guard) => {
92 1743375 : let buf: &mut [u8] = write_guard.deref_mut();
93 1743375 : debug_assert_eq!(buf.len(), PAGE_SZ);
94 1743375 : self.file
95 1743375 : .read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)
96 UBC 0 : .await?;
97 CBC 1743375 : let read_guard = write_guard.mark_valid();
98 1743375 : return Ok(BlockLease::PageReadGuard(read_guard));
99 : }
100 : };
101 : } else {
102 2205803 : debug_assert_eq!(blknum as u64, self.len / PAGE_SZ as u64);
103 2205803 : Ok(BlockLease::EphemeralFileMutableTail(&self.mutable_tail))
104 : }
105 88573971 : }
106 :
107 76632322 : pub(crate) async fn write_blob(
108 76632322 : &mut self,
109 76632322 : srcbuf: &[u8],
110 76632322 : ctx: &RequestContext,
111 76632322 : ) -> Result<u64, io::Error> {
112 76632167 : struct Writer<'a> {
113 76632167 : ephemeral_file: &'a mut EphemeralFile,
114 76632167 : /// The block to which the next [`push_bytes`] will write.
115 76632167 : blknum: u32,
116 76632167 : /// The offset inside the block identified by [`blknum`] to which [`push_bytes`] will write.
117 76632167 : off: usize,
118 76632167 : }
119 76632167 : impl<'a> Writer<'a> {
120 76632322 : fn new(ephemeral_file: &'a mut EphemeralFile) -> io::Result<Writer<'a>> {
121 76632322 : Ok(Writer {
122 76632322 : blknum: (ephemeral_file.len / PAGE_SZ as u64) as u32,
123 76632322 : off: (ephemeral_file.len % PAGE_SZ as u64) as usize,
124 76632322 : ephemeral_file,
125 76632322 : })
126 76632322 : }
127 76632167 : #[inline(always)]
128 153264334 : async fn push_bytes(
129 153264334 : &mut self,
130 153264334 : src: &[u8],
131 153264334 : ctx: &RequestContext,
132 153264334 : ) -> Result<(), io::Error> {
133 153264334 : let mut src_remaining = src;
134 310168063 : while !src_remaining.is_empty() {
135 156903730 : let dst_remaining = &mut self.ephemeral_file.mutable_tail[self.off..];
136 156903730 : let n = min(dst_remaining.len(), src_remaining.len());
137 156903730 : dst_remaining[..n].copy_from_slice(&src_remaining[..n]);
138 156903730 : self.off += n;
139 156903730 : src_remaining = &src_remaining[n..];
140 156903730 : if self.off == PAGE_SZ {
141 76632167 : match self
142 3660153 : .ephemeral_file
143 3660153 : .file
144 3660153 : .write_all_at(
145 3660153 : &self.ephemeral_file.mutable_tail,
146 3660153 : self.blknum as u64 * PAGE_SZ as u64,
147 3660153 : )
148 76632167 : .await
149 76632167 : {
150 76632167 : Ok(_) => {
151 76632167 : // Pre-warm the page cache with what we just wrote.
152 76632167 : // This isn't necessary for coherency/correctness, but it's how we've always done it.
153 76632167 : let cache = page_cache::get();
154 3660152 : match cache
155 3660152 : .read_immutable_buf(
156 3660152 : self.ephemeral_file.page_cache_file_id,
157 3660152 : self.blknum,
158 3660152 : ctx,
159 3660152 : )
160 76632167 : .await
161 76632167 : {
162 76632167 : Ok(page_cache::ReadBufResult::Found(_guard)) => {
163 UBC 0 : // This function takes &mut self, so, it shouldn't be possible to reach this point.
164 0 : unreachable!("we just wrote blknum {} and this function takes &mut self, so, no concurrent read_blk is possible", self.blknum);
165 CBC 76632167 : }
166 76632167 : Ok(page_cache::ReadBufResult::NotFound(mut write_guard)) => {
167 3660152 : let buf: &mut [u8] = write_guard.deref_mut();
168 76632167 : debug_assert_eq!(buf.len(), PAGE_SZ);
169 76632167 : buf.copy_from_slice(&self.ephemeral_file.mutable_tail);
170 3660152 : let _ = write_guard.mark_valid();
171 76632167 : // pre-warm successful
172 76632167 : }
173 76632167 : Err(e) => {
174 76632167 : error!("ephemeral_file write_blob failed to get immutable buf to pre-warm page cache: {e:?}");
175 76632167 : // fail gracefully, it's not the end of the world if we can't pre-warm the cache here
176 76632167 : }
177 76632167 : }
178 76632167 : // Zero the buffer for re-use.
179 76632167 : // Zeroing is critical for correcntess because the write_blob code below
180 76632167 : // and similarly read_blk expect zeroed pages.
181 76632167 : self.ephemeral_file.mutable_tail.fill(0);
182 3660152 : // This block is done, move to next one.
183 3660152 : self.blknum += 1;
184 3660152 : self.off = 0;
185 76632167 : }
186 76632167 : Err(e) => {
187 UBC 0 : return Err(std::io::Error::new(
188 0 : ErrorKind::Other,
189 0 : // order error before path because path is long and error is short
190 0 : format!(
191 0 : "ephemeral_file: write_blob: write-back full tail blk #{}: {:#}: {}",
192 0 : self.blknum,
193 0 : e,
194 0 : self.ephemeral_file.file.path,
195 0 : ),
196 0 : ));
197 CBC 76632167 : }
198 76632167 : }
199 153243577 : }
200 76632167 : }
201 153264333 : Ok(())
202 153264333 : }
203 76632167 : }
204 76632167 :
205 76632167 : let pos = self.len;
206 76632167 : let mut writer = Writer::new(self)?;
207 :
208 : // Write the length field
209 76632167 : if srcbuf.len() < 0x80 {
210 : // short one-byte length header
211 58942613 : let len_buf = [srcbuf.len() as u8];
212 58942613 : writer.push_bytes(&len_buf, ctx).await?;
213 : } else {
214 17689554 : let mut len_buf = u32::to_be_bytes(srcbuf.len() as u32);
215 17689554 : len_buf[0] |= 0x80;
216 17689554 : writer.push_bytes(&len_buf, ctx).await?;
217 : }
218 :
219 : // Write the payload
220 76632167 : writer.push_bytes(srcbuf, ctx).await?;
221 :
222 76632166 : if srcbuf.len() < 0x80 {
223 58942613 : self.len += 1;
224 58942613 : } else {
225 17689553 : self.len += 4;
226 17689553 : }
227 76632166 : self.len += srcbuf.len() as u64;
228 76632166 :
229 76632166 : Ok(pos)
230 76632166 : }
231 : }
232 :
/// Does the given filename look like an ephemeral file?
///
/// Ephemeral files are named `ephemeral-<N>`, where `<N>` is a decimal
/// number taken from the 64-bit counter in [`EphemeralFile::create`]. The
/// suffix is therefore parsed as `u64`, so that names generated after the
/// counter exceeds `u32::MAX` are still recognized.
///
/// Returns `false` if the prefix is missing or the suffix is not a valid
/// `u64`.
pub fn is_ephemeral_file(filename: &str) -> bool {
    filename
        .strip_prefix("ephemeral-")
        .map_or(false, |rest| rest.parse::<u64>().is_ok())
}
241 :
242 : impl Drop for EphemeralFile {
243 5387 : fn drop(&mut self) {
244 5387 : // There might still be pages in the [`crate::page_cache`] for this file.
245 5387 : // We leave them there, [`crate::page_cache::PageCache::find_victim`] will evict them when needed.
246 5387 :
247 5387 : // unlink the file
248 5387 : let res = std::fs::remove_file(&self.file.path);
249 5387 : if let Err(e) = res {
250 41 : if e.kind() != std::io::ErrorKind::NotFound {
251 : // just never log the not found errors, we cannot do anything for them; on detach
252 : // the tenant directory is already gone.
253 : //
254 : // not found files might also be related to https://github.com/neondatabase/neon/issues/2442
255 UBC 0 : error!(
256 0 : "could not remove ephemeral file '{}': {}",
257 0 : self.file.path, e
258 0 : );
259 CBC 41 : }
260 5346 : }
261 5387 : }
262 : }
263 :
264 : impl BlockReader for EphemeralFile {
265 5468649 : fn block_cursor(&self) -> super::block_io::BlockCursor<'_> {
266 5468649 : BlockCursor::new(super::block_io::BlockReaderRef::EphemeralFile(self))
267 5468649 : }
268 : }
269 :
#[cfg(test)]
mod tests {
    use super::*;
    use crate::context::DownloadBehavior;
    use crate::task_mgr::TaskKind;
    use crate::tenant::block_io::{BlockCursor, BlockReaderRef};
    use rand::{thread_rng, RngCore};
    use std::fs;
    use std::str::FromStr;

    /// Everything a test needs in order to create an [`EphemeralFile`].
    type Harness = (
        &'static PageServerConf,
        TenantId,
        TimelineId,
        RequestContext,
    );

    /// Sets up a fresh repo directory for `test_name`, including the timeline
    /// directory, and returns the pieces required by `EphemeralFile::create`.
    fn harness(test_name: &str) -> Result<Harness, io::Error> {
        let repo_dir = PageServerConf::test_repo_dir(test_name);
        // Start from a clean slate; the directory may not exist yet.
        let _ = fs::remove_dir_all(&repo_dir);
        // Leak the config to obtain a `'static` reference. It is never
        // freed, which is acceptable in a test.
        let conf: &'static PageServerConf =
            Box::leak(Box::new(PageServerConf::dummy_conf(repo_dir)));

        let tenant_id = TenantId::from_str("11000000000000000000000000000000").unwrap();
        let timeline_id = TimelineId::from_str("22000000000000000000000000000000").unwrap();
        fs::create_dir_all(conf.timeline_path(&tenant_id, &timeline_id))?;

        let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);

        Ok((conf, tenant_id, timeline_id, ctx))
    }

    /// Writes blobs of various sizes and checks that each one reads back
    /// unchanged from the offset returned by `write_blob`.
    #[tokio::test]
    async fn test_ephemeral_blobs() -> Result<(), io::Error> {
        let (conf, tenant_id, timeline_id, ctx) = harness("ephemeral_blobs")?;

        let mut file = EphemeralFile::create(conf, tenant_id, timeline_id).await?;

        // A single blob is readable right after it was written ...
        let pos_foo = file.write_blob(b"foo", &ctx).await?;
        assert_eq!(
            b"foo",
            file.block_cursor()
                .read_blob(pos_foo, &ctx)
                .await?
                .as_slice()
        );

        // ... and stays readable after another blob was appended.
        let pos_bar = file.write_blob(b"bar", &ctx).await?;
        assert_eq!(
            b"foo",
            file.block_cursor()
                .read_blob(pos_foo, &ctx)
                .await?
                .as_slice()
        );
        assert_eq!(
            b"bar",
            file.block_cursor()
                .read_blob(pos_bar, &ctx)
                .await?
                .as_slice()
        );

        // Many small blobs, followed by some larger ones.
        let small_blobs = (0..10000).map(|i| Vec::from(format!("blob{}", i).as_bytes()));
        let large_blobs = (0..100).map(|i| format!("blob{}", i).as_bytes().repeat(100));

        let mut written = Vec::new();
        for data in small_blobs.chain(large_blobs) {
            let pos = file.write_blob(&data, &ctx).await?;
            written.push((pos, data));
        }

        let cursor = BlockCursor::new(BlockReaderRef::EphemeralFile(&file));
        for (pos, expected) in written {
            let actual = cursor.read_blob(pos, &ctx).await?;
            assert_eq!(actual, expected);
        }

        // Test a large blob that spans multiple pages
        let mut large_data = vec![0; 20000];
        thread_rng().fill_bytes(&mut large_data);
        let pos_large = file.write_blob(&large_data, &ctx).await?;
        let result = file.block_cursor().read_blob(pos_large, &ctx).await?;
        assert_eq!(result, large_data);

        Ok(())
    }
}
|