Line data Source code
1 : //! Implementation of append-only file data structure
2 : //! used to keep in-memory layers spilled on disk.
3 :
4 : use crate::config::PageServerConf;
5 : use crate::context::RequestContext;
6 : use crate::page_cache::{self, PAGE_SZ};
7 : use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
8 : use crate::virtual_file::{self, VirtualFile};
9 : use bytes::BytesMut;
10 : use camino::Utf8PathBuf;
11 : use pageserver_api::shard::TenantShardId;
12 : use std::cmp::min;
13 :
14 : use std::io::{self, ErrorKind};
15 : use std::ops::DerefMut;
16 : use std::sync::atomic::AtomicU64;
17 : use tracing::*;
18 : use utils::id::TimelineId;
19 :
20 : pub struct EphemeralFile {
21 : page_cache_file_id: page_cache::FileId,
22 :
23 : _tenant_shard_id: TenantShardId,
24 : _timeline_id: TimelineId,
25 : file: VirtualFile,
26 : len: u64,
27 : /// An ephemeral file is append-only.
28 : /// We keep the last page, which can still be modified, in [`Self::mutable_tail`].
29 : /// The other pages, which can no longer be modified, are accessed through the page cache.
30 : ///
31 : /// None <=> IO is ongoing.
32 : /// Size is fixed to PAGE_SZ at creation time and must not be changed.
33 : mutable_tail: Option<BytesMut>,
34 : }
35 :
36 : impl EphemeralFile {
37 646 : pub async fn create(
38 646 : conf: &PageServerConf,
39 646 : tenant_shard_id: TenantShardId,
40 646 : timeline_id: TimelineId,
41 646 : ) -> Result<EphemeralFile, io::Error> {
42 646 : static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
43 646 : let filename_disambiguator =
44 646 : NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
45 646 :
46 646 : let filename = conf
47 646 : .timeline_path(&tenant_shard_id, &timeline_id)
48 646 : .join(Utf8PathBuf::from(format!(
49 646 : "ephemeral-{filename_disambiguator}"
50 646 : )));
51 :
52 646 : let file = VirtualFile::open_with_options(
53 646 : &filename,
54 646 : virtual_file::OpenOptions::new()
55 646 : .read(true)
56 646 : .write(true)
57 646 : .create(true),
58 646 : )
59 364 : .await?;
60 :
61 646 : Ok(EphemeralFile {
62 646 : page_cache_file_id: page_cache::next_file_id(),
63 646 : _tenant_shard_id: tenant_shard_id,
64 646 : _timeline_id: timeline_id,
65 646 : file,
66 646 : len: 0,
67 646 : mutable_tail: Some(BytesMut::zeroed(PAGE_SZ)),
68 646 : })
69 646 : }
70 :
71 0 : pub(crate) fn len(&self) -> u64 {
72 0 : self.len
73 0 : }
74 :
75 2690103 : pub(crate) async fn read_blk(
76 2690103 : &self,
77 2690103 : blknum: u32,
78 2690103 : ctx: &RequestContext,
79 2690103 : ) -> Result<BlockLease, io::Error> {
80 2690103 : let flushed_blknums = 0..self.len / PAGE_SZ as u64;
81 2690103 : if flushed_blknums.contains(&(blknum as u64)) {
82 2377243 : let cache = page_cache::get();
83 2377243 : match cache
84 2377243 : .read_immutable_buf(self.page_cache_file_id, blknum, ctx)
85 30076 : .await
86 2377243 : .map_err(|e| {
87 0 : std::io::Error::new(
88 0 : std::io::ErrorKind::Other,
89 0 : // order path before error because error is anyhow::Error => might have many contexts
90 0 : format!(
91 0 : "ephemeral file: read immutable page #{}: {}: {:#}",
92 0 : blknum, self.file.path, e,
93 0 : ),
94 0 : )
95 2377243 : })? {
96 2339912 : page_cache::ReadBufResult::Found(guard) => {
97 2339912 : return Ok(BlockLease::PageReadGuard(guard))
98 : }
99 37331 : page_cache::ReadBufResult::NotFound(write_guard) => {
100 37331 : let write_guard = self
101 37331 : .file
102 37331 : .read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64)
103 19123 : .await?;
104 37331 : let read_guard = write_guard.mark_valid();
105 37331 : return Ok(BlockLease::PageReadGuard(read_guard));
106 : }
107 : };
108 : } else {
109 312860 : debug_assert_eq!(blknum as u64, self.len / PAGE_SZ as u64);
110 312860 : Ok(BlockLease::EphemeralFileMutableTail(
111 312860 : self.mutable_tail
112 312860 : .as_deref()
113 312860 : .expect("we're not doing IO, it must be Some()")
114 312860 : .try_into()
115 312860 : .expect("we ensure that it's always PAGE_SZ"),
116 312860 : ))
117 : }
118 2690103 : }
119 :
120 2934004 : pub(crate) async fn write_blob(
121 2934004 : &mut self,
122 2934004 : srcbuf: &[u8],
123 2934004 : ctx: &RequestContext,
124 2934004 : ) -> Result<u64, io::Error> {
125 2934004 : struct Writer<'a> {
126 2934004 : ephemeral_file: &'a mut EphemeralFile,
127 2934004 : /// The block to which the next [`push_bytes`] will write.
128 2934004 : blknum: u32,
129 2934004 : /// The offset inside the block identified by [`blknum`] to which [`push_bytes`] will write.
130 2934004 : off: usize,
131 2934004 : }
132 2934004 : impl<'a> Writer<'a> {
133 2934004 : fn new(ephemeral_file: &'a mut EphemeralFile) -> io::Result<Writer<'a>> {
134 2934004 : Ok(Writer {
135 2934004 : blknum: (ephemeral_file.len / PAGE_SZ as u64) as u32,
136 2934004 : off: (ephemeral_file.len % PAGE_SZ as u64) as usize,
137 2934004 : ephemeral_file,
138 2934004 : })
139 2934004 : }
140 2934004 : #[inline(always)]
141 5868008 : async fn push_bytes(
142 5868008 : &mut self,
143 5868008 : src: &[u8],
144 5868008 : ctx: &RequestContext,
145 5868008 : ) -> Result<(), io::Error> {
146 5868008 : let mut src_remaining = src;
147 11770731 : while !src_remaining.is_empty() {
148 5902723 : let dst_remaining = &mut self
149 5902723 : .ephemeral_file
150 5902723 : .mutable_tail
151 5902723 : .as_deref_mut()
152 5902723 : .expect("IO is not yet ongoing")[self.off..];
153 5902723 : let n = min(dst_remaining.len(), src_remaining.len());
154 5902723 : dst_remaining[..n].copy_from_slice(&src_remaining[..n]);
155 5902723 : self.off += n;
156 5902723 : src_remaining = &src_remaining[n..];
157 5902723 : if self.off == PAGE_SZ {
158 2934004 : let mutable_tail = std::mem::take(&mut self.ephemeral_file.mutable_tail)
159 35256 : .expect("IO is not yet ongoing");
160 2934004 : let (mutable_tail, res) = self
161 35256 : .ephemeral_file
162 35256 : .file
163 35256 : .write_all_at(mutable_tail, self.blknum as u64 * PAGE_SZ as u64)
164 2934004 : .await;
165 2934004 : // TODO: If we panic before we can put the mutable_tail back, subsequent calls will fail.
166 2934004 : // I.e., the IO isn't retryable if we panic.
167 2934004 : self.ephemeral_file.mutable_tail = Some(mutable_tail);
168 35256 : match res {
169 2934004 : Ok(_) => {
170 2934004 : // Pre-warm the page cache with what we just wrote.
171 2934004 : // This isn't necessary for coherency/correctness, but it's how we've always done it.
172 2934004 : let cache = page_cache::get();
173 35256 : match cache
174 35256 : .read_immutable_buf(
175 35256 : self.ephemeral_file.page_cache_file_id,
176 35256 : self.blknum,
177 35256 : ctx,
178 35256 : )
179 2934004 : .await
180 2934004 : {
181 2934004 : Ok(page_cache::ReadBufResult::Found(_guard)) => {
182 0 : // This function takes &mut self, so, it shouldn't be possible to reach this point.
183 0 : unreachable!("we just wrote blknum {} and this function takes &mut self, so, no concurrent read_blk is possible", self.blknum);
184 2934004 : }
185 2934004 : Ok(page_cache::ReadBufResult::NotFound(mut write_guard)) => {
186 35256 : let buf: &mut [u8] = write_guard.deref_mut();
187 2934004 : debug_assert_eq!(buf.len(), PAGE_SZ);
188 2934004 : buf.copy_from_slice(
189 35256 : self.ephemeral_file
190 35256 : .mutable_tail
191 35256 : .as_deref()
192 35256 : .expect("IO is not ongoing"),
193 35256 : );
194 35256 : let _ = write_guard.mark_valid();
195 2934004 : // pre-warm successful
196 2934004 : }
197 2934004 : Err(e) => {
198 2934004 : error!("ephemeral_file write_blob failed to get immutable buf to pre-warm page cache: {e:?}");
199 2934004 : // fail gracefully, it's not the end of the world if we can't pre-warm the cache here
200 2934004 : }
201 2934004 : }
202 2934004 : // Zero the buffer for re-use.
203 2934004 : // Zeroing is critical for correcntess because the write_blob code below
204 2934004 : // and similarly read_blk expect zeroed pages.
205 2934004 : self.ephemeral_file
206 35256 : .mutable_tail
207 35256 : .as_deref_mut()
208 35256 : .expect("IO is not ongoing")
209 35256 : .fill(0);
210 35256 : // This block is done, move to next one.
211 35256 : self.blknum += 1;
212 35256 : self.off = 0;
213 2934004 : }
214 2934004 : Err(e) => {
215 0 : return Err(std::io::Error::new(
216 0 : ErrorKind::Other,
217 0 : // order error before path because path is long and error is short
218 0 : format!(
219 0 : "ephemeral_file: write_blob: write-back full tail blk #{}: {:#}: {}",
220 0 : self.blknum,
221 0 : e,
222 0 : self.ephemeral_file.file.path,
223 0 : ),
224 0 : ));
225 2934004 : }
226 2934004 : }
227 5867467 : }
228 2934004 : }
229 5868008 : Ok(())
230 5868008 : }
231 2934004 : }
232 2934004 :
233 2934004 : let pos = self.len;
234 2934004 : let mut writer = Writer::new(self)?;
235 :
236 : // Write the length field
237 2934004 : if srcbuf.len() < 0x80 {
238 : // short one-byte length header
239 2780188 : let len_buf = [srcbuf.len() as u8];
240 2780188 : writer.push_bytes(&len_buf, ctx).await?;
241 : } else {
242 153816 : let mut len_buf = u32::to_be_bytes(srcbuf.len() as u32);
243 153816 : len_buf[0] |= 0x80;
244 153816 : writer.push_bytes(&len_buf, ctx).await?;
245 : }
246 :
247 : // Write the payload
248 2934004 : writer.push_bytes(srcbuf, ctx).await?;
249 :
250 2934004 : if srcbuf.len() < 0x80 {
251 2780188 : self.len += 1;
252 2780188 : } else {
253 153816 : self.len += 4;
254 153816 : }
255 2934004 : self.len += srcbuf.len() as u64;
256 2934004 :
257 2934004 : Ok(pos)
258 2934004 : }
259 : }
260 :
/// Does the given filename look like an ephemeral file?
///
/// Ephemeral files are named `ephemeral-{N}`, where `N` is drawn from a process-wide
/// `AtomicU64` counter in [`EphemeralFile::create`]. The suffix is therefore parsed as a
/// `u64`; parsing it as `u32` would stop recognizing files once the counter exceeds
/// `u32::MAX`.
pub fn is_ephemeral_file(filename: &str) -> bool {
    matches!(
        filename.strip_prefix("ephemeral-"),
        Some(rest) if rest.parse::<u64>().is_ok()
    )
}
269 :
270 : impl Drop for EphemeralFile {
271 528 : fn drop(&mut self) {
272 528 : // There might still be pages in the [`crate::page_cache`] for this file.
273 528 : // We leave them there, [`crate::page_cache::PageCache::find_victim`] will evict them when needed.
274 528 :
275 528 : // unlink the file
276 528 : let res = std::fs::remove_file(&self.file.path);
277 528 : if let Err(e) = res {
278 2 : if e.kind() != std::io::ErrorKind::NotFound {
279 : // just never log the not found errors, we cannot do anything for them; on detach
280 : // the tenant directory is already gone.
281 : //
282 : // not found files might also be related to https://github.com/neondatabase/neon/issues/2442
283 0 : error!(
284 0 : "could not remove ephemeral file '{}': {}",
285 0 : self.file.path, e
286 0 : );
287 2 : }
288 526 : }
289 528 : }
290 : }
291 :
292 : impl BlockReader for EphemeralFile {
293 503703 : fn block_cursor(&self) -> super::block_io::BlockCursor<'_> {
294 503703 : BlockCursor::new(super::block_io::BlockReaderRef::EphemeralFile(self))
295 503703 : }
296 : }
297 :
#[cfg(test)]
mod tests {
    use super::*;
    use crate::context::DownloadBehavior;
    use crate::task_mgr::TaskKind;
    use crate::tenant::block_io::{BlockCursor, BlockReaderRef};
    use rand::{thread_rng, RngCore};
    use std::fs;
    use std::str::FromStr;

    /// Prepares an isolated repo directory for `test_name`. Returns a leaked `'static`
    /// config, fixed tenant/timeline ids (whose timeline directory exists on disk), and a
    /// unit-test request context.
    fn harness(
        test_name: &str,
    ) -> Result<
        (
            &'static PageServerConf,
            TenantShardId,
            TimelineId,
            RequestContext,
        ),
        io::Error,
    > {
        let repo_dir = PageServerConf::test_repo_dir(test_name);
        // Start from scratch; it's fine if the directory didn't exist.
        let _ = fs::remove_dir_all(&repo_dir);
        // Leaking gives us a `'static` reference; the memory is never freed, which is fine in a test.
        let conf: &'static PageServerConf =
            Box::leak(Box::new(PageServerConf::dummy_conf(repo_dir)));

        let tenant_shard_id = TenantShardId::from_str("11000000000000000000000000000000").unwrap();
        let timeline_id = TimelineId::from_str("22000000000000000000000000000000").unwrap();
        fs::create_dir_all(conf.timeline_path(&tenant_shard_id, &timeline_id))?;

        let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
        Ok((conf, tenant_shard_id, timeline_id, ctx))
    }

    #[tokio::test]
    async fn test_ephemeral_blobs() -> Result<(), io::Error> {
        let (conf, tenant_id, timeline_id, ctx) = harness("ephemeral_blobs")?;
        let mut file = EphemeralFile::create(conf, tenant_id, timeline_id).await?;

        // Write two tiny blobs; each earlier blob must stay readable after later writes.
        let pos_foo = file.write_blob(b"foo", &ctx).await?;
        assert_eq!(
            b"foo",
            file.block_cursor()
                .read_blob(pos_foo, &ctx)
                .await?
                .as_slice()
        );
        let pos_bar = file.write_blob(b"bar", &ctx).await?;
        for (pos, want) in [(pos_foo, b"foo"), (pos_bar, b"bar")] {
            assert_eq!(
                want,
                file.block_cursor().read_blob(pos, &ctx).await?.as_slice()
            );
        }

        // Many short blobs (1-byte header), then larger ones (4-byte header).
        let mut blobs: Vec<(u64, Vec<u8>)> = Vec::new();
        for i in 0..10000 {
            let data = format!("blob{i}").into_bytes();
            let pos = file.write_blob(&data, &ctx).await?;
            blobs.push((pos, data));
        }
        for i in 0..100 {
            let data = format!("blob{i}").as_bytes().repeat(100);
            let pos = file.write_blob(&data, &ctx).await?;
            blobs.push((pos, data));
        }

        let cursor = BlockCursor::new(BlockReaderRef::EphemeralFile(&file));
        for (pos, want) in blobs {
            let got = cursor.read_blob(pos, &ctx).await?;
            assert_eq!(got, want);
        }

        // A random blob large enough to span several pages.
        let mut large_data = vec![0u8; 20000];
        thread_rng().fill_bytes(&mut large_data);
        let pos_large = file.write_blob(&large_data, &ctx).await?;
        let got = file.block_cursor().read_blob(pos_large, &ctx).await?;
        assert_eq!(got, large_data);

        Ok(())
    }
}
|