Line data Source code
1 : //! Implementation of append-only file data structure
2 : //! used to keep in-memory layers spilled on disk.
3 :
4 : use crate::config::PageServerConf;
5 : use crate::context::RequestContext;
6 : use crate::page_cache::{self, PAGE_SZ};
7 : use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
8 : use crate::virtual_file::{self, VirtualFile};
9 : use camino::Utf8PathBuf;
10 : use pageserver_api::shard::TenantShardId;
11 : use std::cmp::min;
12 :
13 : use std::io::{self, ErrorKind};
14 : use std::ops::DerefMut;
15 : use std::sync::atomic::AtomicU64;
16 : use tracing::*;
17 : use utils::id::TimelineId;
18 :
19 : pub struct EphemeralFile {
20 : page_cache_file_id: page_cache::FileId,
21 :
22 : _tenant_shard_id: TenantShardId,
23 : _timeline_id: TimelineId,
24 : file: VirtualFile,
25 : len: u64,
26 : /// An ephemeral file is append-only.
27 : /// We keep the last page, which can still be modified, in [`Self::mutable_tail`].
28 : /// The other pages, which can no longer be modified, are accessed through the page cache.
29 : mutable_tail: [u8; PAGE_SZ],
30 : }
31 :
32 : impl EphemeralFile {
33 5698 : pub async fn create(
34 5698 : conf: &PageServerConf,
35 5698 : tenant_shard_id: TenantShardId,
36 5698 : timeline_id: TimelineId,
37 5698 : ) -> Result<EphemeralFile, io::Error> {
38 5698 : static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
39 5698 : let filename_disambiguator =
40 5698 : NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
41 5698 :
42 5698 : let filename = conf
43 5698 : .timeline_path(&tenant_shard_id, &timeline_id)
44 5698 : .join(Utf8PathBuf::from(format!(
45 5698 : "ephemeral-{filename_disambiguator}"
46 5698 : )));
47 :
48 5698 : let file = VirtualFile::open_with_options(
49 5698 : &filename,
50 5698 : virtual_file::OpenOptions::new()
51 5698 : .read(true)
52 5698 : .write(true)
53 5698 : .create(true),
54 5698 : )
55 271 : .await?;
56 :
57 5698 : Ok(EphemeralFile {
58 5698 : page_cache_file_id: page_cache::next_file_id(),
59 5698 : _tenant_shard_id: tenant_shard_id,
60 5698 : _timeline_id: timeline_id,
61 5698 : file,
62 5698 : len: 0,
63 5698 : mutable_tail: [0u8; PAGE_SZ],
64 5698 : })
65 5698 : }
66 :
67 1396420 : pub(crate) fn len(&self) -> u64 {
68 1396420 : self.len
69 1396420 : }
70 :
71 74566564 : pub(crate) async fn read_blk(
72 74566564 : &self,
73 74566564 : blknum: u32,
74 74566564 : ctx: &RequestContext,
75 74566565 : ) -> Result<BlockLease, io::Error> {
76 74566565 : let flushed_blknums = 0..self.len / PAGE_SZ as u64;
77 74566565 : if flushed_blknums.contains(&(blknum as u64)) {
78 73723080 : let cache = page_cache::get();
79 73723080 : match cache
80 73723080 : .read_immutable_buf(self.page_cache_file_id, blknum, ctx)
81 946859 : .await
82 73723079 : .map_err(|e| {
83 0 : std::io::Error::new(
84 0 : std::io::ErrorKind::Other,
85 0 : // order path before error because error is anyhow::Error => might have many contexts
86 0 : format!(
87 0 : "ephemeral file: read immutable page #{}: {}: {:#}",
88 0 : blknum, self.file.path, e,
89 0 : ),
90 0 : )
91 73723079 : })? {
92 72183475 : page_cache::ReadBufResult::Found(guard) => {
93 72183475 : return Ok(BlockLease::PageReadGuard(guard))
94 : }
95 1539604 : page_cache::ReadBufResult::NotFound(write_guard) => {
96 1539604 : let write_guard = self
97 1539604 : .file
98 1539604 : .read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64)
99 31615 : .await?;
100 1539604 : let read_guard = write_guard.mark_valid();
101 1539604 : return Ok(BlockLease::PageReadGuard(read_guard));
102 : }
103 : };
104 : } else {
105 843485 : debug_assert_eq!(blknum as u64, self.len / PAGE_SZ as u64);
106 843485 : Ok(BlockLease::EphemeralFileMutableTail(&self.mutable_tail))
107 : }
108 74566564 : }
109 :
110 65049495 : pub(crate) async fn write_blob(
111 65049495 : &mut self,
112 65049495 : srcbuf: &[u8],
113 65049495 : ctx: &RequestContext,
114 65049495 : ) -> Result<u64, io::Error> {
115 65049495 : struct Writer<'a> {
116 65049495 : ephemeral_file: &'a mut EphemeralFile,
117 65049495 : /// The block to which the next [`push_bytes`] will write.
118 65049495 : blknum: u32,
119 65049495 : /// The offset inside the block identified by [`blknum`] to which [`push_bytes`] will write.
120 65049495 : off: usize,
121 65049495 : }
122 65049495 : impl<'a> Writer<'a> {
123 65049495 : fn new(ephemeral_file: &'a mut EphemeralFile) -> io::Result<Writer<'a>> {
124 65049495 : Ok(Writer {
125 65049495 : blknum: (ephemeral_file.len / PAGE_SZ as u64) as u32,
126 65049495 : off: (ephemeral_file.len % PAGE_SZ as u64) as usize,
127 65049495 : ephemeral_file,
128 65049495 : })
129 65049495 : }
130 65049495 : #[inline(always)]
131 130098990 : async fn push_bytes(
132 130098990 : &mut self,
133 130098990 : src: &[u8],
134 130098990 : ctx: &RequestContext,
135 130098990 : ) -> Result<(), io::Error> {
136 130098990 : let mut src_remaining = src;
137 263871004 : while !src_remaining.is_empty() {
138 133772014 : let dst_remaining = &mut self.ephemeral_file.mutable_tail[self.off..];
139 133772014 : let n = min(dst_remaining.len(), src_remaining.len());
140 133772014 : dst_remaining[..n].copy_from_slice(&src_remaining[..n]);
141 133772014 : self.off += n;
142 133772014 : src_remaining = &src_remaining[n..];
143 133772014 : if self.off == PAGE_SZ {
144 65049495 : match self
145 3688650 : .ephemeral_file
146 3688650 : .file
147 3688650 : .write_all_at(
148 3688650 : &self.ephemeral_file.mutable_tail,
149 3688650 : self.blknum as u64 * PAGE_SZ as u64,
150 3688650 : )
151 65049495 : .await
152 65049495 : {
153 65049495 : Ok(_) => {
154 65049495 : // Pre-warm the page cache with what we just wrote.
155 65049495 : // This isn't necessary for coherency/correctness, but it's how we've always done it.
156 65049495 : let cache = page_cache::get();
157 3688650 : match cache
158 3688650 : .read_immutable_buf(
159 3688650 : self.ephemeral_file.page_cache_file_id,
160 3688650 : self.blknum,
161 3688650 : ctx,
162 3688650 : )
163 65049495 : .await
164 65049495 : {
165 65049495 : Ok(page_cache::ReadBufResult::Found(_guard)) => {
166 0 : // This function takes &mut self, so, it shouldn't be possible to reach this point.
167 0 : unreachable!("we just wrote blknum {} and this function takes &mut self, so, no concurrent read_blk is possible", self.blknum);
168 65049495 : }
169 65049495 : Ok(page_cache::ReadBufResult::NotFound(mut write_guard)) => {
170 3688650 : let buf: &mut [u8] = write_guard.deref_mut();
171 65049495 : debug_assert_eq!(buf.len(), PAGE_SZ);
172 65049495 : buf.copy_from_slice(&self.ephemeral_file.mutable_tail);
173 3688650 : let _ = write_guard.mark_valid();
174 65049495 : // pre-warm successful
175 65049495 : }
176 65049495 : Err(e) => {
177 65049495 : error!("ephemeral_file write_blob failed to get immutable buf to pre-warm page cache: {e:?}");
178 65049495 : // fail gracefully, it's not the end of the world if we can't pre-warm the cache here
179 65049495 : }
180 65049495 : }
181 65049495 : // Zero the buffer for re-use.
182 65049495 : // Zeroing is critical for correcntess because the write_blob code below
183 65049495 : // and similarly read_blk expect zeroed pages.
184 65049495 : self.ephemeral_file.mutable_tail.fill(0);
185 3688650 : // This block is done, move to next one.
186 3688650 : self.blknum += 1;
187 3688650 : self.off = 0;
188 65049495 : }
189 65049495 : Err(e) => {
190 0 : return Err(std::io::Error::new(
191 0 : ErrorKind::Other,
192 0 : // order error before path because path is long and error is short
193 0 : format!(
194 0 : "ephemeral_file: write_blob: write-back full tail blk #{}: {:#}: {}",
195 0 : self.blknum,
196 0 : e,
197 0 : self.ephemeral_file.file.path,
198 0 : ),
199 0 : ));
200 65049495 : }
201 65049495 : }
202 130083364 : }
203 65049495 : }
204 130098990 : Ok(())
205 130098990 : }
206 65049495 : }
207 65049495 :
208 65049495 : let pos = self.len;
209 65049495 : let mut writer = Writer::new(self)?;
210 :
211 : // Write the length field
212 65049495 : if srcbuf.len() < 0x80 {
213 : // short one-byte length header
214 51888205 : let len_buf = [srcbuf.len() as u8];
215 51888205 : writer.push_bytes(&len_buf, ctx).await?;
216 : } else {
217 13161290 : let mut len_buf = u32::to_be_bytes(srcbuf.len() as u32);
218 13161290 : len_buf[0] |= 0x80;
219 13161290 : writer.push_bytes(&len_buf, ctx).await?;
220 : }
221 :
222 : // Write the payload
223 65049495 : writer.push_bytes(srcbuf, ctx).await?;
224 :
225 65049495 : if srcbuf.len() < 0x80 {
226 51888205 : self.len += 1;
227 51888205 : } else {
228 13161290 : self.len += 4;
229 13161290 : }
230 65049495 : self.len += srcbuf.len() as u64;
231 65049495 :
232 65049495 : Ok(pos)
233 65049495 : }
234 : }
235 :
/// Does the given filename look like an ephemeral file?
///
/// Ephemeral files are named `ephemeral-<n>`, where `<n>` is the decimal form
/// of the `u64` counter used by [`EphemeralFile::create`]. The suffix is
/// therefore validated as a `u64`, so that names produced after the counter
/// passes `u32::MAX` are still recognised.
pub fn is_ephemeral_file(filename: &str) -> bool {
    matches!(
        filename.strip_prefix("ephemeral-"),
        Some(rest) if rest.parse::<u64>().is_ok()
    )
}
244 :
245 : impl Drop for EphemeralFile {
246 5328 : fn drop(&mut self) {
247 5328 : // There might still be pages in the [`crate::page_cache`] for this file.
248 5328 : // We leave them there, [`crate::page_cache::PageCache::find_victim`] will evict them when needed.
249 5328 :
250 5328 : // unlink the file
251 5328 : let res = std::fs::remove_file(&self.file.path);
252 5328 : if let Err(e) = res {
253 57 : if e.kind() != std::io::ErrorKind::NotFound {
254 : // just never log the not found errors, we cannot do anything for them; on detach
255 : // the tenant directory is already gone.
256 : //
257 : // not found files might also be related to https://github.com/neondatabase/neon/issues/2442
258 0 : error!(
259 0 : "could not remove ephemeral file '{}': {}",
260 0 : self.file.path, e
261 0 : );
262 57 : }
263 5271 : }
264 5328 : }
265 : }
266 :
267 : impl BlockReader for EphemeralFile {
268 6017837 : fn block_cursor(&self) -> super::block_io::BlockCursor<'_> {
269 6017837 : BlockCursor::new(super::block_io::BlockReaderRef::EphemeralFile(self))
270 6017837 : }
271 : }
272 :
#[cfg(test)]
mod tests {
    use super::*;
    use crate::context::DownloadBehavior;
    use crate::task_mgr::TaskKind;
    use crate::tenant::block_io::{BlockCursor, BlockReaderRef};
    use rand::{thread_rng, RngCore};
    use std::fs;
    use std::str::FromStr;

    /// Prepares a fresh test repository directory for `test_name` and returns
    /// the leaked static config, fixed tenant/timeline ids and a unit-test
    /// request context.
    fn harness(
        test_name: &str,
    ) -> Result<
        (
            &'static PageServerConf,
            TenantShardId,
            TimelineId,
            RequestContext,
        ),
        io::Error,
    > {
        let repo_dir = PageServerConf::test_repo_dir(test_name);
        // Start from a clean slate; it is fine if the directory does not exist yet.
        let _ = fs::remove_dir_all(&repo_dir);
        // Make a static copy of the config. This can never be free'd, but that's
        // OK in a test.
        let conf: &'static PageServerConf =
            Box::leak(Box::new(PageServerConf::dummy_conf(repo_dir)));

        let tenant_shard_id = TenantShardId::from_str("11000000000000000000000000000000").unwrap();
        let timeline_id = TimelineId::from_str("22000000000000000000000000000000").unwrap();
        let timeline_dir = conf.timeline_path(&tenant_shard_id, &timeline_id);
        fs::create_dir_all(timeline_dir)?;

        let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);

        Ok((conf, tenant_shard_id, timeline_id, ctx))
    }

    /// Writes blobs of various sizes and checks that each one reads back
    /// unchanged from the offset returned by `write_blob`.
    #[tokio::test]
    async fn test_ephemeral_blobs() -> Result<(), io::Error> {
        let (conf, tenant_id, timeline_id, ctx) = harness("ephemeral_blobs")?;

        let mut file = EphemeralFile::create(conf, tenant_id, timeline_id).await?;

        // A single small blob reads back as written.
        let pos_foo = file.write_blob(b"foo", &ctx).await?;
        let read_foo = file.block_cursor().read_blob(pos_foo, &ctx).await?;
        assert_eq!(b"foo", read_foo.as_slice());

        // Appending a second blob must not disturb the first one.
        let pos_bar = file.write_blob(b"bar", &ctx).await?;
        let read_foo_again = file.block_cursor().read_blob(pos_foo, &ctx).await?;
        assert_eq!(b"foo", read_foo_again.as_slice());
        let read_bar = file.block_cursor().read_blob(pos_bar, &ctx).await?;
        assert_eq!(b"bar", read_bar.as_slice());

        let mut written = Vec::new();
        for i in 0..10000 {
            let payload = format!("blob{i}").into_bytes();
            let offset = file.write_blob(&payload, &ctx).await?;
            written.push((offset, payload));
        }
        // also test with large blobs
        for i in 0..100 {
            let payload = format!("blob{i}").as_bytes().repeat(100);
            let offset = file.write_blob(&payload, &ctx).await?;
            written.push((offset, payload));
        }

        let cursor = BlockCursor::new(BlockReaderRef::EphemeralFile(&file));
        for (offset, expected) in written {
            let actual = cursor.read_blob(offset, &ctx).await?;
            assert_eq!(actual, expected);
        }

        // Test a large blob that spans multiple pages
        let mut large_data = vec![0; 20000];
        thread_rng().fill_bytes(&mut large_data);
        let pos_large = file.write_blob(&large_data, &ctx).await?;
        let result = file.block_cursor().read_blob(pos_large, &ctx).await?;
        assert_eq!(result, large_data);

        Ok(())
    }
}
|