Line data Source code
1 : //! Implementation of append-only file data structure
2 : //! used to keep in-memory layers spilled on disk.
3 :
4 : use crate::config::PageServerConf;
5 : use crate::context::RequestContext;
6 : use crate::page_cache::{self, PAGE_SZ};
7 : use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
8 : use crate::virtual_file::{self, VirtualFile};
9 : use bytes::BytesMut;
10 : use camino::Utf8PathBuf;
11 : use pageserver_api::shard::TenantShardId;
12 : use std::cmp::min;
13 :
14 : use std::io::{self, ErrorKind};
15 : use std::ops::DerefMut;
16 : use std::sync::atomic::AtomicU64;
17 : use tracing::*;
18 : use utils::id::TimelineId;
19 :
/// An append-only file of length-prefixed blobs, used to keep in-memory layers spilled on disk.
///
/// Blocks that have been completely filled are written to disk and afterwards read through
/// the [`crate::page_cache`]; the block currently being appended to is kept in memory in
/// [`Self::mutable_tail`]. The file is unlinked when the value is dropped.
pub struct EphemeralFile {
    /// Key under which this file's blocks are stored in the page cache.
    page_cache_file_id: page_cache::FileId,

    _tenant_shard_id: TenantShardId,
    _timeline_id: TimelineId,
    /// The on-disk file that receives every completely filled block.
    file: VirtualFile,
    /// Logical end of the file: the offset at which the next blob will be written.
    /// Block `len / PAGE_SZ` is the one held in [`Self::mutable_tail`].
    len: u64,
    /// An ephemeral file is append-only.
    /// We keep the last page, which can still be modified, in [`Self::mutable_tail`].
    /// The other pages, which can no longer be modified, are accessed through the page cache.
    ///
    /// None <=> IO is ongoing: the buffer is moved into the disk write of a full tail block
    /// and put back once that write returns.
    /// Size is fixed to PAGE_SZ at creation time and must not be changed.
    mutable_tail: Option<BytesMut>,
}
35 :
36 : impl EphemeralFile {
37 642 : pub async fn create(
38 642 : conf: &PageServerConf,
39 642 : tenant_shard_id: TenantShardId,
40 642 : timeline_id: TimelineId,
41 642 : ) -> Result<EphemeralFile, io::Error> {
42 642 : static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
43 642 : let filename_disambiguator =
44 642 : NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
45 642 :
46 642 : let filename = conf
47 642 : .timeline_path(&tenant_shard_id, &timeline_id)
48 642 : .join(Utf8PathBuf::from(format!(
49 642 : "ephemeral-{filename_disambiguator}"
50 642 : )));
51 :
52 642 : let file = VirtualFile::open_with_options(
53 642 : &filename,
54 642 : virtual_file::OpenOptions::new()
55 642 : .read(true)
56 642 : .write(true)
57 642 : .create(true),
58 642 : )
59 322 : .await?;
60 :
61 642 : Ok(EphemeralFile {
62 642 : page_cache_file_id: page_cache::next_file_id(),
63 642 : _tenant_shard_id: tenant_shard_id,
64 642 : _timeline_id: timeline_id,
65 642 : file,
66 642 : len: 0,
67 642 : mutable_tail: Some(BytesMut::zeroed(PAGE_SZ)),
68 642 : })
69 642 : }
70 :
71 2627992 : pub(crate) fn len(&self) -> u64 {
72 2627992 : self.len
73 2627992 : }
74 :
75 2690039 : pub(crate) async fn read_blk(
76 2690039 : &self,
77 2690039 : blknum: u32,
78 2690039 : ctx: &RequestContext,
79 2690039 : ) -> Result<BlockLease, io::Error> {
80 2690039 : let flushed_blknums = 0..self.len / PAGE_SZ as u64;
81 2690039 : if flushed_blknums.contains(&(blknum as u64)) {
82 2377251 : let cache = page_cache::get();
83 2377251 : match cache
84 2377251 : .read_immutable_buf(self.page_cache_file_id, blknum, ctx)
85 30048 : .await
86 2377251 : .map_err(|e| {
87 0 : std::io::Error::new(
88 0 : std::io::ErrorKind::Other,
89 0 : // order path before error because error is anyhow::Error => might have many contexts
90 0 : format!(
91 0 : "ephemeral file: read immutable page #{}: {}: {:#}",
92 0 : blknum, self.file.path, e,
93 0 : ),
94 0 : )
95 2377251 : })? {
96 2339930 : page_cache::ReadBufResult::Found(guard) => {
97 2339930 : return Ok(BlockLease::PageReadGuard(guard))
98 : }
99 37321 : page_cache::ReadBufResult::NotFound(write_guard) => {
100 37321 : let write_guard = self
101 37321 : .file
102 37321 : .read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64)
103 19084 : .await?;
104 37321 : let read_guard = write_guard.mark_valid();
105 37321 : return Ok(BlockLease::PageReadGuard(read_guard));
106 : }
107 : };
108 : } else {
109 312788 : debug_assert_eq!(blknum as u64, self.len / PAGE_SZ as u64);
110 312788 : Ok(BlockLease::EphemeralFileMutableTail(
111 312788 : self.mutable_tail
112 312788 : .as_deref()
113 312788 : .expect("we're not doing IO, it must be Some()")
114 312788 : .try_into()
115 312788 : .expect("we ensure that it's always PAGE_SZ"),
116 312788 : ))
117 : }
118 2690039 : }
119 :
120 2933972 : pub(crate) async fn write_blob(
121 2933972 : &mut self,
122 2933972 : srcbuf: &[u8],
123 2933972 : ctx: &RequestContext,
124 2933972 : ) -> Result<u64, io::Error> {
125 2933972 : struct Writer<'a> {
126 2933972 : ephemeral_file: &'a mut EphemeralFile,
127 2933972 : /// The block to which the next [`push_bytes`] will write.
128 2933972 : blknum: u32,
129 2933972 : /// The offset inside the block identified by [`blknum`] to which [`push_bytes`] will write.
130 2933972 : off: usize,
131 2933972 : }
132 2933972 : impl<'a> Writer<'a> {
133 2933972 : fn new(ephemeral_file: &'a mut EphemeralFile) -> io::Result<Writer<'a>> {
134 2933972 : Ok(Writer {
135 2933972 : blknum: (ephemeral_file.len / PAGE_SZ as u64) as u32,
136 2933972 : off: (ephemeral_file.len % PAGE_SZ as u64) as usize,
137 2933972 : ephemeral_file,
138 2933972 : })
139 2933972 : }
140 2933972 : #[inline(always)]
141 5867944 : async fn push_bytes(
142 5867944 : &mut self,
143 5867944 : src: &[u8],
144 5867944 : ctx: &RequestContext,
145 5867944 : ) -> Result<(), io::Error> {
146 5867944 : let mut src_remaining = src;
147 11770597 : while !src_remaining.is_empty() {
148 5902653 : let dst_remaining = &mut self
149 5902653 : .ephemeral_file
150 5902653 : .mutable_tail
151 5902653 : .as_deref_mut()
152 5902653 : .expect("IO is not yet ongoing")[self.off..];
153 5902653 : let n = min(dst_remaining.len(), src_remaining.len());
154 5902653 : dst_remaining[..n].copy_from_slice(&src_remaining[..n]);
155 5902653 : self.off += n;
156 5902653 : src_remaining = &src_remaining[n..];
157 5902653 : if self.off == PAGE_SZ {
158 2933972 : let mutable_tail = std::mem::take(&mut self.ephemeral_file.mutable_tail)
159 35256 : .expect("IO is not yet ongoing");
160 2933972 : let (mutable_tail, res) = self
161 35256 : .ephemeral_file
162 35256 : .file
163 35256 : .write_all_at(mutable_tail, self.blknum as u64 * PAGE_SZ as u64)
164 2933972 : .await;
165 2933972 : // TODO: If we panic before we can put the mutable_tail back, subsequent calls will fail.
166 2933972 : // I.e., the IO isn't retryable if we panic.
167 2933972 : self.ephemeral_file.mutable_tail = Some(mutable_tail);
168 35256 : match res {
169 2933972 : Ok(_) => {
170 2933972 : // Pre-warm the page cache with what we just wrote.
171 2933972 : // This isn't necessary for coherency/correctness, but it's how we've always done it.
172 2933972 : let cache = page_cache::get();
173 35256 : match cache
174 35256 : .read_immutable_buf(
175 35256 : self.ephemeral_file.page_cache_file_id,
176 35256 : self.blknum,
177 35256 : ctx,
178 35256 : )
179 2933972 : .await
180 2933972 : {
181 2933972 : Ok(page_cache::ReadBufResult::Found(_guard)) => {
182 0 : // This function takes &mut self, so, it shouldn't be possible to reach this point.
183 0 : unreachable!("we just wrote blknum {} and this function takes &mut self, so, no concurrent read_blk is possible", self.blknum);
184 2933972 : }
185 2933972 : Ok(page_cache::ReadBufResult::NotFound(mut write_guard)) => {
186 35256 : let buf: &mut [u8] = write_guard.deref_mut();
187 2933972 : debug_assert_eq!(buf.len(), PAGE_SZ);
188 2933972 : buf.copy_from_slice(
189 35256 : self.ephemeral_file
190 35256 : .mutable_tail
191 35256 : .as_deref()
192 35256 : .expect("IO is not ongoing"),
193 35256 : );
194 35256 : let _ = write_guard.mark_valid();
195 2933972 : // pre-warm successful
196 2933972 : }
197 2933972 : Err(e) => {
198 2933972 : error!("ephemeral_file write_blob failed to get immutable buf to pre-warm page cache: {e:?}");
199 2933972 : // fail gracefully, it's not the end of the world if we can't pre-warm the cache here
200 2933972 : }
201 2933972 : }
202 2933972 : // Zero the buffer for re-use.
203 2933972 : // Zeroing is critical for correcntess because the write_blob code below
204 2933972 : // and similarly read_blk expect zeroed pages.
205 2933972 : self.ephemeral_file
206 35256 : .mutable_tail
207 35256 : .as_deref_mut()
208 35256 : .expect("IO is not ongoing")
209 35256 : .fill(0);
210 35256 : // This block is done, move to next one.
211 35256 : self.blknum += 1;
212 35256 : self.off = 0;
213 2933972 : }
214 2933972 : Err(e) => {
215 0 : return Err(std::io::Error::new(
216 0 : ErrorKind::Other,
217 0 : // order error before path because path is long and error is short
218 0 : format!(
219 0 : "ephemeral_file: write_blob: write-back full tail blk #{}: {:#}: {}",
220 0 : self.blknum,
221 0 : e,
222 0 : self.ephemeral_file.file.path,
223 0 : ),
224 0 : ));
225 2933972 : }
226 2933972 : }
227 5867397 : }
228 2933972 : }
229 5867944 : Ok(())
230 5867944 : }
231 2933972 : }
232 2933972 :
233 2933972 : let pos = self.len;
234 2933972 : let mut writer = Writer::new(self)?;
235 :
236 : // Write the length field
237 2933972 : if srcbuf.len() < 0x80 {
238 : // short one-byte length header
239 2780156 : let len_buf = [srcbuf.len() as u8];
240 2780156 : writer.push_bytes(&len_buf, ctx).await?;
241 : } else {
242 153816 : let mut len_buf = u32::to_be_bytes(srcbuf.len() as u32);
243 153816 : len_buf[0] |= 0x80;
244 153816 : writer.push_bytes(&len_buf, ctx).await?;
245 : }
246 :
247 : // Write the payload
248 2933972 : writer.push_bytes(srcbuf, ctx).await?;
249 :
250 2933972 : if srcbuf.len() < 0x80 {
251 2780156 : self.len += 1;
252 2780156 : } else {
253 153816 : self.len += 4;
254 153816 : }
255 2933972 : self.len += srcbuf.len() as u64;
256 2933972 :
257 2933972 : Ok(pos)
258 2933972 : }
259 : }
260 :
/// Does the given filename look like an ephemeral file?
///
/// Ephemeral files are named `ephemeral-<N>`, where `N` comes from a `u64` counter in
/// `EphemeralFile::create`. The suffix is therefore parsed as a `u64`, so that every name
/// `create` can produce is recognized, including those past `u32::MAX`.
pub fn is_ephemeral_file(filename: &str) -> bool {
    match filename.strip_prefix("ephemeral-") {
        Some(rest) => rest.parse::<u64>().is_ok(),
        None => false,
    }
}
269 :
270 : impl Drop for EphemeralFile {
271 524 : fn drop(&mut self) {
272 524 : // There might still be pages in the [`crate::page_cache`] for this file.
273 524 : // We leave them there, [`crate::page_cache::PageCache::find_victim`] will evict them when needed.
274 524 :
275 524 : // unlink the file
276 524 : let res = std::fs::remove_file(&self.file.path);
277 524 : if let Err(e) = res {
278 2 : if e.kind() != std::io::ErrorKind::NotFound {
279 : // just never log the not found errors, we cannot do anything for them; on detach
280 : // the tenant directory is already gone.
281 : //
282 : // not found files might also be related to https://github.com/neondatabase/neon/issues/2442
283 0 : error!(
284 0 : "could not remove ephemeral file '{}': {}",
285 0 : self.file.path, e
286 0 : );
287 2 : }
288 522 : }
289 524 : }
290 : }
291 :
292 : impl BlockReader for EphemeralFile {
293 503670 : fn block_cursor(&self) -> super::block_io::BlockCursor<'_> {
294 503670 : BlockCursor::new(super::block_io::BlockReaderRef::EphemeralFile(self))
295 503670 : }
296 : }
297 :
#[cfg(test)]
mod tests {
    use super::*;
    use crate::context::DownloadBehavior;
    use crate::task_mgr::TaskKind;
    use crate::tenant::block_io::{BlockCursor, BlockReaderRef};
    use rand::{thread_rng, RngCore};
    use std::fs;
    use std::str::FromStr;

    /// Prepares a clean repository directory for `test_name` and returns a leaked config,
    /// fixed tenant/timeline ids (whose timeline directory exists), and a unit-test context.
    fn harness(
        test_name: &str,
    ) -> Result<
        (
            &'static PageServerConf,
            TenantShardId,
            TimelineId,
            RequestContext,
        ),
        io::Error,
    > {
        let repo_dir = PageServerConf::test_repo_dir(test_name);
        // Leftovers from a previous run are removed; a missing directory is fine.
        let _ = fs::remove_dir_all(&repo_dir);
        // Leaked on purpose so the config lives for 'static; acceptable in a test.
        let conf: &'static PageServerConf =
            Box::leak(Box::new(PageServerConf::dummy_conf(repo_dir)));

        let tenant_shard_id = TenantShardId::from_str("11000000000000000000000000000000").unwrap();
        let timeline_id = TimelineId::from_str("22000000000000000000000000000000").unwrap();
        fs::create_dir_all(conf.timeline_path(&tenant_shard_id, &timeline_id))?;

        Ok((
            conf,
            tenant_shard_id,
            timeline_id,
            RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error),
        ))
    }

    #[tokio::test]
    async fn test_ephemeral_blobs() -> Result<(), io::Error> {
        let (conf, tenant_id, timeline_id, ctx) = harness("ephemeral_blobs")?;

        let mut file = EphemeralFile::create(conf, tenant_id, timeline_id).await?;

        // Small blobs: the first one must remain readable after the second is appended.
        let pos_foo = file.write_blob(b"foo", &ctx).await?;
        let first_read = file.block_cursor().read_blob(pos_foo, &ctx).await?;
        assert_eq!(b"foo", first_read.as_slice());

        let pos_bar = file.write_blob(b"bar", &ctx).await?;
        let second_read = file.block_cursor().read_blob(pos_foo, &ctx).await?;
        assert_eq!(b"foo", second_read.as_slice());
        let bar_read = file.block_cursor().read_blob(pos_bar, &ctx).await?;
        assert_eq!(b"bar", bar_read.as_slice());

        let mut written = Vec::new();
        for idx in 0..10000 {
            let payload = format!("blob{}", idx).into_bytes();
            let offset = file.write_blob(&payload, &ctx).await?;
            written.push((offset, payload));
        }
        // Repeated payloads are long enough (>= 0x80 bytes) to use the 4-byte length header.
        for idx in 0..100 {
            let payload = format!("blob{}", idx).repeat(100).into_bytes();
            let offset = file.write_blob(&payload, &ctx).await?;
            written.push((offset, payload));
        }

        let cursor = BlockCursor::new(BlockReaderRef::EphemeralFile(&file));
        for (offset, payload) in written {
            let read_back = cursor.read_blob(offset, &ctx).await?;
            assert_eq!(read_back, payload);
        }

        // Test a large blob that spans multiple pages
        let mut large_data = vec![0; 20000];
        thread_rng().fill_bytes(&mut large_data);
        let pos_large = file.write_blob(&large_data, &ctx).await?;
        let large_read = file.block_cursor().read_blob(pos_large, &ctx).await?;
        assert_eq!(large_read, large_data);

        Ok(())
    }
}
|