Line data Source code
1 : //! Implementation of append-only file data structure
2 : //! used to keep in-memory layers spilled on disk.
3 :
4 : use crate::config::PageServerConf;
5 : use crate::context::RequestContext;
6 : use crate::page_cache::{self, PAGE_SZ};
7 : use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
8 : use crate::virtual_file::{self, VirtualFile};
9 : use camino::Utf8PathBuf;
10 : use pageserver_api::shard::TenantShardId;
11 : use std::cmp::min;
12 :
13 : use std::io::{self, ErrorKind};
14 : use std::ops::DerefMut;
15 : use std::sync::atomic::AtomicU64;
16 : use tracing::*;
17 : use utils::id::TimelineId;
18 :
19 : pub struct EphemeralFile {
20 : page_cache_file_id: page_cache::FileId,
21 :
22 : _tenant_shard_id: TenantShardId,
23 : _timeline_id: TimelineId,
24 : file: VirtualFile,
25 : len: u64,
26 : /// An ephemeral file is append-only.
27 : /// We keep the last page, which can still be modified, in [`Self::mutable_tail`].
28 : /// The other pages, which can no longer be modified, are accessed through the page cache.
29 : mutable_tail: [u8; PAGE_SZ],
30 : }
31 :
32 : impl EphemeralFile {
33 5715 : pub async fn create(
34 5715 : conf: &PageServerConf,
35 5715 : tenant_shard_id: TenantShardId,
36 5715 : timeline_id: TimelineId,
37 5715 : ) -> Result<EphemeralFile, io::Error> {
38 5715 : static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
39 5715 : let filename_disambiguator =
40 5715 : NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
41 5715 :
42 5715 : let filename = conf
43 5715 : .timeline_path(&tenant_shard_id, &timeline_id)
44 5715 : .join(Utf8PathBuf::from(format!(
45 5715 : "ephemeral-{filename_disambiguator}"
46 5715 : )));
47 :
48 5715 : let file = VirtualFile::open_with_options(
49 5715 : &filename,
50 5715 : virtual_file::OpenOptions::new()
51 5715 : .read(true)
52 5715 : .write(true)
53 5715 : .create(true),
54 5715 : )
55 271 : .await?;
56 :
57 5715 : Ok(EphemeralFile {
58 5715 : page_cache_file_id: page_cache::next_file_id(),
59 5715 : _tenant_shard_id: tenant_shard_id,
60 5715 : _timeline_id: timeline_id,
61 5715 : file,
62 5715 : len: 0,
63 5715 : mutable_tail: [0u8; PAGE_SZ],
64 5715 : })
65 5715 : }
66 :
67 1400622 : pub(crate) fn len(&self) -> u64 {
68 1400622 : self.len
69 1400622 : }
70 :
71 74145201 : pub(crate) async fn read_blk(
72 74145201 : &self,
73 74145201 : blknum: u32,
74 74145201 : ctx: &RequestContext,
75 74145201 : ) -> Result<BlockLease, io::Error> {
76 74145199 : let flushed_blknums = 0..self.len / PAGE_SZ as u64;
77 74145199 : if flushed_blknums.contains(&(blknum as u64)) {
78 73271138 : let cache = page_cache::get();
79 73271138 : match cache
80 73271138 : .read_immutable_buf(self.page_cache_file_id, blknum, ctx)
81 943218 : .await
82 73271137 : .map_err(|e| {
83 0 : std::io::Error::new(
84 0 : std::io::ErrorKind::Other,
85 0 : // order path before error because error is anyhow::Error => might have many contexts
86 0 : format!(
87 0 : "ephemeral file: read immutable page #{}: {}: {:#}",
88 0 : blknum, self.file.path, e,
89 0 : ),
90 0 : )
91 73271137 : })? {
92 71740078 : page_cache::ReadBufResult::Found(guard) => {
93 71740078 : return Ok(BlockLease::PageReadGuard(guard))
94 : }
95 1531059 : page_cache::ReadBufResult::NotFound(write_guard) => {
96 1531059 : let write_guard = self
97 1531059 : .file
98 1531059 : .read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64)
99 31604 : .await?;
100 1531059 : let read_guard = write_guard.mark_valid();
101 1531059 : return Ok(BlockLease::PageReadGuard(read_guard));
102 : }
103 : };
104 : } else {
105 874061 : debug_assert_eq!(blknum as u64, self.len / PAGE_SZ as u64);
106 874061 : Ok(BlockLease::EphemeralFileMutableTail(&self.mutable_tail))
107 : }
108 74145198 : }
109 :
110 65328307 : pub(crate) async fn write_blob(
111 65328307 : &mut self,
112 65328307 : srcbuf: &[u8],
113 65328307 : ctx: &RequestContext,
114 65328307 : ) -> Result<u64, io::Error> {
115 65328307 : struct Writer<'a> {
116 65328307 : ephemeral_file: &'a mut EphemeralFile,
117 65328307 : /// The block to which the next [`push_bytes`] will write.
118 65328307 : blknum: u32,
119 65328307 : /// The offset inside the block identified by [`blknum`] to which [`push_bytes`] will write.
120 65328307 : off: usize,
121 65328307 : }
122 65328307 : impl<'a> Writer<'a> {
123 65328307 : fn new(ephemeral_file: &'a mut EphemeralFile) -> io::Result<Writer<'a>> {
124 65328307 : Ok(Writer {
125 65328307 : blknum: (ephemeral_file.len / PAGE_SZ as u64) as u32,
126 65328307 : off: (ephemeral_file.len % PAGE_SZ as u64) as usize,
127 65328307 : ephemeral_file,
128 65328307 : })
129 65328307 : }
130 65328307 : #[inline(always)]
131 130656614 : async fn push_bytes(
132 130656614 : &mut self,
133 130656614 : src: &[u8],
134 130656614 : ctx: &RequestContext,
135 130656614 : ) -> Result<(), io::Error> {
136 130656614 : let mut src_remaining = src;
137 265015021 : while !src_remaining.is_empty() {
138 134358408 : let dst_remaining = &mut self.ephemeral_file.mutable_tail[self.off..];
139 134358408 : let n = min(dst_remaining.len(), src_remaining.len());
140 134358408 : dst_remaining[..n].copy_from_slice(&src_remaining[..n]);
141 134358408 : self.off += n;
142 134358408 : src_remaining = &src_remaining[n..];
143 134358408 : if self.off == PAGE_SZ {
144 65328307 : match self
145 3717705 : .ephemeral_file
146 3717705 : .file
147 3717705 : .write_all_at(
148 3717705 : &self.ephemeral_file.mutable_tail,
149 3717705 : self.blknum as u64 * PAGE_SZ as u64,
150 3717705 : )
151 65328307 : .await
152 65328307 : {
153 65328307 : Ok(_) => {
154 65328307 : // Pre-warm the page cache with what we just wrote.
155 65328307 : // This isn't necessary for coherency/correctness, but it's how we've always done it.
156 65328307 : let cache = page_cache::get();
157 3717704 : match cache
158 3717704 : .read_immutable_buf(
159 3717704 : self.ephemeral_file.page_cache_file_id,
160 3717704 : self.blknum,
161 3717704 : ctx,
162 3717704 : )
163 65328307 : .await
164 65328307 : {
165 65328307 : Ok(page_cache::ReadBufResult::Found(_guard)) => {
166 0 : // This function takes &mut self, so, it shouldn't be possible to reach this point.
167 0 : unreachable!("we just wrote blknum {} and this function takes &mut self, so, no concurrent read_blk is possible", self.blknum);
168 65328307 : }
169 65328307 : Ok(page_cache::ReadBufResult::NotFound(mut write_guard)) => {
170 3717704 : let buf: &mut [u8] = write_guard.deref_mut();
171 65328307 : debug_assert_eq!(buf.len(), PAGE_SZ);
172 65328307 : buf.copy_from_slice(&self.ephemeral_file.mutable_tail);
173 3717704 : let _ = write_guard.mark_valid();
174 65328307 : // pre-warm successful
175 65328307 : }
176 65328307 : Err(e) => {
177 65328307 : error!("ephemeral_file write_blob failed to get immutable buf to pre-warm page cache: {e:?}");
178 65328307 : // fail gracefully, it's not the end of the world if we can't pre-warm the cache here
179 65328307 : }
180 65328307 : }
181 65328307 : // Zero the buffer for re-use.
182 65328307 : // Zeroing is critical for correcntess because the write_blob code below
183 65328307 : // and similarly read_blk expect zeroed pages.
184 65328307 : self.ephemeral_file.mutable_tail.fill(0);
185 3717704 : // This block is done, move to next one.
186 3717704 : self.blknum += 1;
187 3717704 : self.off = 0;
188 65328307 : }
189 65328307 : Err(e) => {
190 0 : return Err(std::io::Error::new(
191 0 : ErrorKind::Other,
192 0 : // order error before path because path is long and error is short
193 0 : format!(
194 0 : "ephemeral_file: write_blob: write-back full tail blk #{}: {:#}: {}",
195 0 : self.blknum,
196 0 : e,
197 0 : self.ephemeral_file.file.path,
198 0 : ),
199 0 : ));
200 65328307 : }
201 65328307 : }
202 130640703 : }
203 65328307 : }
204 130656613 : Ok(())
205 130656613 : }
206 65328307 : }
207 65328307 :
208 65328307 : let pos = self.len;
209 65328307 : let mut writer = Writer::new(self)?;
210 :
211 : // Write the length field
212 65328307 : if srcbuf.len() < 0x80 {
213 : // short one-byte length header
214 52117855 : let len_buf = [srcbuf.len() as u8];
215 52117855 : writer.push_bytes(&len_buf, ctx).await?;
216 : } else {
217 13210452 : let mut len_buf = u32::to_be_bytes(srcbuf.len() as u32);
218 13210452 : len_buf[0] |= 0x80;
219 13210452 : writer.push_bytes(&len_buf, ctx).await?;
220 : }
221 :
222 : // Write the payload
223 65328307 : writer.push_bytes(srcbuf, ctx).await?;
224 :
225 65328306 : if srcbuf.len() < 0x80 {
226 52117854 : self.len += 1;
227 52117854 : } else {
228 13210452 : self.len += 4;
229 13210452 : }
230 65328306 : self.len += srcbuf.len() as u64;
231 65328306 :
232 65328306 : Ok(pos)
233 65328306 : }
234 : }
235 :
/// Does the given filename look like an ephemeral file?
///
/// A name matches when it consists of the prefix `ephemeral-` followed by
/// something that parses as a `u64`. The suffix is parsed as `u64` because
/// `EphemeralFile::create` takes it from a 64-bit counter; parsing it as a
/// narrower integer would fail to recognise files whose suffix exceeds
/// `u32::MAX`.
pub fn is_ephemeral_file(filename: &str) -> bool {
    match filename.strip_prefix("ephemeral-") {
        Some(suffix) => suffix.parse::<u64>().is_ok(),
        None => false,
    }
}
244 :
245 : impl Drop for EphemeralFile {
246 5358 : fn drop(&mut self) {
247 5358 : // There might still be pages in the [`crate::page_cache`] for this file.
248 5358 : // We leave them there, [`crate::page_cache::PageCache::find_victim`] will evict them when needed.
249 5358 :
250 5358 : // unlink the file
251 5358 : let res = std::fs::remove_file(&self.file.path);
252 5358 : if let Err(e) = res {
253 61 : if e.kind() != std::io::ErrorKind::NotFound {
254 : // just never log the not found errors, we cannot do anything for them; on detach
255 : // the tenant directory is already gone.
256 : //
257 : // not found files might also be related to https://github.com/neondatabase/neon/issues/2442
258 0 : error!(
259 0 : "could not remove ephemeral file '{}': {}",
260 0 : self.file.path, e
261 0 : );
262 61 : }
263 5297 : }
264 5358 : }
265 : }
266 :
267 : impl BlockReader for EphemeralFile {
268 6967533 : fn block_cursor(&self) -> super::block_io::BlockCursor<'_> {
269 6967533 : BlockCursor::new(super::block_io::BlockReaderRef::EphemeralFile(self))
270 6967533 : }
271 : }
272 :
273 : #[cfg(test)]
274 : mod tests {
275 : use super::*;
276 : use crate::context::DownloadBehavior;
277 : use crate::task_mgr::TaskKind;
278 : use crate::tenant::block_io::{BlockCursor, BlockReaderRef};
279 : use rand::{thread_rng, RngCore};
280 : use std::fs;
281 : use std::str::FromStr;
282 :
283 2 : fn harness(
284 2 : test_name: &str,
285 2 : ) -> Result<
286 2 : (
287 2 : &'static PageServerConf,
288 2 : TenantShardId,
289 2 : TimelineId,
290 2 : RequestContext,
291 2 : ),
292 2 : io::Error,
293 2 : > {
294 2 : let repo_dir = PageServerConf::test_repo_dir(test_name);
295 2 : let _ = fs::remove_dir_all(&repo_dir);
296 2 : let conf = PageServerConf::dummy_conf(repo_dir);
297 2 : // Make a static copy of the config. This can never be free'd, but that's
298 2 : // OK in a test.
299 2 : let conf: &'static PageServerConf = Box::leak(Box::new(conf));
300 2 :
301 2 : let tenant_shard_id = TenantShardId::from_str("11000000000000000000000000000000").unwrap();
302 2 : let timeline_id = TimelineId::from_str("22000000000000000000000000000000").unwrap();
303 2 : fs::create_dir_all(conf.timeline_path(&tenant_shard_id, &timeline_id))?;
304 :
305 2 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
306 2 :
307 2 : Ok((conf, tenant_shard_id, timeline_id, ctx))
308 2 : }
309 :
310 2 : #[tokio::test]
311 2 : async fn test_ephemeral_blobs() -> Result<(), io::Error> {
312 2 : let (conf, tenant_id, timeline_id, ctx) = harness("ephemeral_blobs")?;
313 :
314 2 : let mut file = EphemeralFile::create(conf, tenant_id, timeline_id).await?;
315 :
316 2 : let pos_foo = file.write_blob(b"foo", &ctx).await?;
317 2 : assert_eq!(
318 2 : b"foo",
319 2 : file.block_cursor()
320 2 : .read_blob(pos_foo, &ctx)
321 0 : .await?
322 2 : .as_slice()
323 : );
324 2 : let pos_bar = file.write_blob(b"bar", &ctx).await?;
325 2 : assert_eq!(
326 2 : b"foo",
327 2 : file.block_cursor()
328 2 : .read_blob(pos_foo, &ctx)
329 0 : .await?
330 2 : .as_slice()
331 : );
332 2 : assert_eq!(
333 2 : b"bar",
334 2 : file.block_cursor()
335 2 : .read_blob(pos_bar, &ctx)
336 0 : .await?
337 2 : .as_slice()
338 : );
339 :
340 2 : let mut blobs = Vec::new();
341 20002 : for i in 0..10000 {
342 20000 : let data = Vec::from(format!("blob{}", i).as_bytes());
343 20000 : let pos = file.write_blob(&data, &ctx).await?;
344 20000 : blobs.push((pos, data));
345 : }
346 : // also test with a large blobs
347 202 : for i in 0..100 {
348 200 : let data = format!("blob{}", i).as_bytes().repeat(100);
349 200 : let pos = file.write_blob(&data, &ctx).await?;
350 200 : blobs.push((pos, data));
351 : }
352 :
353 2 : let cursor = BlockCursor::new(BlockReaderRef::EphemeralFile(&file));
354 20202 : for (pos, expected) in blobs {
355 20200 : let actual = cursor.read_blob(pos, &ctx).await?;
356 20200 : assert_eq!(actual, expected);
357 : }
358 :
359 : // Test a large blob that spans multiple pages
360 2 : let mut large_data = vec![0; 20000];
361 2 : thread_rng().fill_bytes(&mut large_data);
362 2 : let pos_large = file.write_blob(&large_data, &ctx).await?;
363 2 : let result = file.block_cursor().read_blob(pos_large, &ctx).await?;
364 2 : assert_eq!(result, large_data);
365 :
366 2 : Ok(())
367 : }
368 : }
|