Line data Source code
//! Implementation of an append-only file data structure,
//! used to spill in-memory layers to disk.
3 :
4 : use crate::config::PageServerConf;
5 : use crate::context::RequestContext;
6 : use crate::page_cache::{self, PAGE_SZ};
7 : use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
8 : use crate::virtual_file::{self, VirtualFile};
9 : use bytes::BytesMut;
10 : use camino::Utf8PathBuf;
11 : use pageserver_api::shard::TenantShardId;
12 : use std::cmp::min;
13 :
14 : use std::io::{self, ErrorKind};
15 : use std::ops::DerefMut;
16 : use std::sync::atomic::AtomicU64;
17 : use tracing::*;
18 : use utils::id::TimelineId;
19 :
20 : pub struct EphemeralFile {
21 : page_cache_file_id: page_cache::FileId,
22 :
23 : _tenant_shard_id: TenantShardId,
24 : _timeline_id: TimelineId,
25 : file: VirtualFile,
26 : len: u64,
27 : /// An ephemeral file is append-only.
28 : /// We keep the last page, which can still be modified, in [`Self::mutable_tail`].
29 : /// The other pages, which can no longer be modified, are accessed through the page cache.
30 : ///
31 : /// None <=> IO is ongoing.
32 : /// Size is fixed to PAGE_SZ at creation time and must not be changed.
33 : mutable_tail: Option<BytesMut>,
34 : }
35 :
36 : impl EphemeralFile {
37 814 : pub async fn create(
38 814 : conf: &PageServerConf,
39 814 : tenant_shard_id: TenantShardId,
40 814 : timeline_id: TimelineId,
41 814 : ) -> Result<EphemeralFile, io::Error> {
42 814 : static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
43 814 : let filename_disambiguator =
44 814 : NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
45 814 :
46 814 : let filename = conf
47 814 : .timeline_path(&tenant_shard_id, &timeline_id)
48 814 : .join(Utf8PathBuf::from(format!(
49 814 : "ephemeral-{filename_disambiguator}"
50 814 : )));
51 :
52 814 : let file = VirtualFile::open_with_options(
53 814 : &filename,
54 814 : virtual_file::OpenOptions::new()
55 814 : .read(true)
56 814 : .write(true)
57 814 : .create(true),
58 814 : )
59 457 : .await?;
60 :
61 814 : Ok(EphemeralFile {
62 814 : page_cache_file_id: page_cache::next_file_id(),
63 814 : _tenant_shard_id: tenant_shard_id,
64 814 : _timeline_id: timeline_id,
65 814 : file,
66 814 : len: 0,
67 814 : mutable_tail: Some(BytesMut::zeroed(PAGE_SZ)),
68 814 : })
69 814 : }
70 :
71 7581992 : pub(crate) fn len(&self) -> u64 {
72 7581992 : self.len
73 7581992 : }
74 :
75 812 : pub(crate) fn id(&self) -> page_cache::FileId {
76 812 : self.page_cache_file_id
77 812 : }
78 :
79 3719850 : pub(crate) async fn read_blk(
80 3719850 : &self,
81 3719850 : blknum: u32,
82 3719850 : ctx: &RequestContext,
83 3719850 : ) -> Result<BlockLease, io::Error> {
84 3719850 : let flushed_blknums = 0..self.len / PAGE_SZ as u64;
85 3719850 : if flushed_blknums.contains(&(blknum as u64)) {
86 3393049 : let cache = page_cache::get();
87 3393049 : match cache
88 3393049 : .read_immutable_buf(self.page_cache_file_id, blknum, ctx)
89 41977 : .await
90 3393049 : .map_err(|e| {
91 0 : std::io::Error::new(
92 0 : std::io::ErrorKind::Other,
93 0 : // order path before error because error is anyhow::Error => might have many contexts
94 0 : format!(
95 0 : "ephemeral file: read immutable page #{}: {}: {:#}",
96 0 : blknum, self.file.path, e,
97 0 : ),
98 0 : )
99 3393049 : })? {
100 3346426 : page_cache::ReadBufResult::Found(guard) => {
101 3346426 : return Ok(BlockLease::PageReadGuard(guard))
102 : }
103 46623 : page_cache::ReadBufResult::NotFound(write_guard) => {
104 46623 : let write_guard = self
105 46623 : .file
106 46623 : .read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64)
107 23930 : .await?;
108 46623 : let read_guard = write_guard.mark_valid();
109 46623 : return Ok(BlockLease::PageReadGuard(read_guard));
110 : }
111 : };
112 : } else {
113 326801 : debug_assert_eq!(blknum as u64, self.len / PAGE_SZ as u64);
114 326801 : Ok(BlockLease::EphemeralFileMutableTail(
115 326801 : self.mutable_tail
116 326801 : .as_deref()
117 326801 : .expect("we're not doing IO, it must be Some()")
118 326801 : .try_into()
119 326801 : .expect("we ensure that it's always PAGE_SZ"),
120 326801 : ))
121 : }
122 3719850 : }
123 :
124 3954178 : pub(crate) async fn write_blob(
125 3954178 : &mut self,
126 3954178 : srcbuf: &[u8],
127 3954178 : ctx: &RequestContext,
128 3954178 : ) -> Result<u64, io::Error> {
129 3954178 : struct Writer<'a> {
130 3954178 : ephemeral_file: &'a mut EphemeralFile,
131 3954178 : /// The block to which the next [`push_bytes`] will write.
132 3954178 : blknum: u32,
133 3954178 : /// The offset inside the block identified by [`blknum`] to which [`push_bytes`] will write.
134 3954178 : off: usize,
135 3954178 : }
136 3954178 : impl<'a> Writer<'a> {
137 3954178 : fn new(ephemeral_file: &'a mut EphemeralFile) -> io::Result<Writer<'a>> {
138 3954178 : Ok(Writer {
139 3954178 : blknum: (ephemeral_file.len / PAGE_SZ as u64) as u32,
140 3954178 : off: (ephemeral_file.len % PAGE_SZ as u64) as usize,
141 3954178 : ephemeral_file,
142 3954178 : })
143 3954178 : }
144 3954178 : #[inline(always)]
145 7908356 : async fn push_bytes(
146 7908356 : &mut self,
147 7908356 : src: &[u8],
148 7908356 : ctx: &RequestContext,
149 7908356 : ) -> Result<(), io::Error> {
150 7908356 : let mut src_remaining = src;
151 15860707 : while !src_remaining.is_empty() {
152 7952351 : let dst_remaining = &mut self
153 7952351 : .ephemeral_file
154 7952351 : .mutable_tail
155 7952351 : .as_deref_mut()
156 7952351 : .expect("IO is not yet ongoing")[self.off..];
157 7952351 : let n = min(dst_remaining.len(), src_remaining.len());
158 7952351 : dst_remaining[..n].copy_from_slice(&src_remaining[..n]);
159 7952351 : self.off += n;
160 7952351 : src_remaining = &src_remaining[n..];
161 7952351 : if self.off == PAGE_SZ {
162 3954178 : let mutable_tail = std::mem::take(&mut self.ephemeral_file.mutable_tail)
163 44748 : .expect("IO is not yet ongoing");
164 3954178 : let (mutable_tail, res) = self
165 44748 : .ephemeral_file
166 44748 : .file
167 44748 : .write_all_at(mutable_tail, self.blknum as u64 * PAGE_SZ as u64)
168 3954178 : .await;
169 3954178 : // TODO: If we panic before we can put the mutable_tail back, subsequent calls will fail.
170 3954178 : // I.e., the IO isn't retryable if we panic.
171 3954178 : self.ephemeral_file.mutable_tail = Some(mutable_tail);
172 44748 : match res {
173 3954178 : Ok(_) => {
174 3954178 : // Pre-warm the page cache with what we just wrote.
175 3954178 : // This isn't necessary for coherency/correctness, but it's how we've always done it.
176 3954178 : let cache = page_cache::get();
177 44748 : match cache
178 44748 : .read_immutable_buf(
179 44748 : self.ephemeral_file.page_cache_file_id,
180 44748 : self.blknum,
181 44748 : ctx,
182 44748 : )
183 3954178 : .await
184 3954178 : {
185 3954178 : Ok(page_cache::ReadBufResult::Found(_guard)) => {
186 0 : // This function takes &mut self, so, it shouldn't be possible to reach this point.
187 0 : unreachable!("we just wrote blknum {} and this function takes &mut self, so, no concurrent read_blk is possible", self.blknum);
188 3954178 : }
189 3954178 : Ok(page_cache::ReadBufResult::NotFound(mut write_guard)) => {
190 44748 : let buf: &mut [u8] = write_guard.deref_mut();
191 44748 : debug_assert_eq!(buf.len(), PAGE_SZ);
192 3954178 : buf.copy_from_slice(
193 44748 : self.ephemeral_file
194 44748 : .mutable_tail
195 44748 : .as_deref()
196 44748 : .expect("IO is not ongoing"),
197 44748 : );
198 44748 : let _ = write_guard.mark_valid();
199 3954178 : // pre-warm successful
200 3954178 : }
201 3954178 : Err(e) => {
202 0 : error!("ephemeral_file write_blob failed to get immutable buf to pre-warm page cache: {e:?}");
203 3954178 : // fail gracefully, it's not the end of the world if we can't pre-warm the cache here
204 3954178 : }
205 3954178 : }
206 3954178 : // Zero the buffer for re-use.
207 3954178 : // Zeroing is critical for correcntess because the write_blob code below
208 3954178 : // and similarly read_blk expect zeroed pages.
209 3954178 : self.ephemeral_file
210 44748 : .mutable_tail
211 44748 : .as_deref_mut()
212 44748 : .expect("IO is not ongoing")
213 44748 : .fill(0);
214 44748 : // This block is done, move to next one.
215 44748 : self.blknum += 1;
216 44748 : self.off = 0;
217 3954178 : }
218 3954178 : Err(e) => {
219 0 : return Err(std::io::Error::new(
220 0 : ErrorKind::Other,
221 0 : // order error before path because path is long and error is short
222 0 : format!(
223 0 : "ephemeral_file: write_blob: write-back full tail blk #{}: {:#}: {}",
224 0 : self.blknum,
225 0 : e,
226 0 : self.ephemeral_file.file.path,
227 0 : ),
228 0 : ));
229 3954178 : }
230 3954178 : }
231 7907603 : }
232 3954178 : }
233 7908356 : Ok(())
234 7908356 : }
235 3954178 : }
236 3954178 :
237 3954178 : let pos = self.len;
238 3954178 : let mut writer = Writer::new(self)?;
239 :
240 : // Write the length field
241 3954178 : if srcbuf.len() < 0x80 {
242 : // short one-byte length header
243 3800360 : let len_buf = [srcbuf.len() as u8];
244 3800360 : writer.push_bytes(&len_buf, ctx).await?;
245 : } else {
246 153818 : let mut len_buf = u32::to_be_bytes(srcbuf.len() as u32);
247 153818 : len_buf[0] |= 0x80;
248 153818 : writer.push_bytes(&len_buf, ctx).await?;
249 : }
250 :
251 : // Write the payload
252 3954178 : writer.push_bytes(srcbuf, ctx).await?;
253 :
254 3954178 : if srcbuf.len() < 0x80 {
255 3800360 : self.len += 1;
256 3800360 : } else {
257 153818 : self.len += 4;
258 153818 : }
259 3954178 : self.len += srcbuf.len() as u64;
260 3954178 :
261 3954178 : Ok(pos)
262 3954178 : }
263 : }
264 :
/// Does the given filename look like an ephemeral file?
///
/// It matches the names produced by `EphemeralFile::create`: `ephemeral-<N>`, where `<N>`
/// parses as a `u64`.
pub fn is_ephemeral_file(filename: &str) -> bool {
    // The counter behind these names is an `AtomicU64`, so accept the full `u64`
    // range. Parsing as `u32` would reject names past `u32::MAX`.
    filename
        .strip_prefix("ephemeral-")
        .is_some_and(|rest| rest.parse::<u64>().is_ok())
}
273 :
274 : impl Drop for EphemeralFile {
275 696 : fn drop(&mut self) {
276 696 : // There might still be pages in the [`crate::page_cache`] for this file.
277 696 : // We leave them there, [`crate::page_cache::PageCache::find_victim`] will evict them when needed.
278 696 :
279 696 : // unlink the file
280 696 : let res = std::fs::remove_file(&self.file.path);
281 696 : if let Err(e) = res {
282 2 : if e.kind() != std::io::ErrorKind::NotFound {
283 : // just never log the not found errors, we cannot do anything for them; on detach
284 : // the tenant directory is already gone.
285 : //
286 : // not found files might also be related to https://github.com/neondatabase/neon/issues/2442
287 0 : error!(
288 0 : "could not remove ephemeral file '{}': {}",
289 0 : self.file.path, e
290 0 : );
291 2 : }
292 694 : }
293 696 : }
294 : }
295 :
296 : impl BlockReader for EphemeralFile {
297 504061 : fn block_cursor(&self) -> super::block_io::BlockCursor<'_> {
298 504061 : BlockCursor::new(super::block_io::BlockReaderRef::EphemeralFile(self))
299 504061 : }
300 : }
301 :
#[cfg(test)]
mod tests {
    use super::*;
    use crate::context::DownloadBehavior;
    use crate::task_mgr::TaskKind;
    use crate::tenant::block_io::BlockReaderRef;
    use rand::{thread_rng, RngCore};
    use std::fs;
    use std::str::FromStr;

    /// Sets up a fresh repo directory for `test_name`.
    ///
    /// Returns a leaked `'static` config and fixed tenant/timeline ids (whose timeline
    /// directory exists on disk). It also returns a unit-test request context.
    fn harness(
        test_name: &str,
    ) -> Result<
        (
            &'static PageServerConf,
            TenantShardId,
            TimelineId,
            RequestContext,
        ),
        io::Error,
    > {
        let repo_dir = PageServerConf::test_repo_dir(test_name);
        // Best effort: the directory may not exist yet.
        let _ = fs::remove_dir_all(&repo_dir);
        // Leaked on purpose so the config is `'static`; acceptable in a test.
        let conf: &'static PageServerConf =
            Box::leak(Box::new(PageServerConf::dummy_conf(repo_dir)));

        let tenant_shard_id =
            TenantShardId::from_str("11000000000000000000000000000000").unwrap();
        let timeline_id = TimelineId::from_str("22000000000000000000000000000000").unwrap();
        fs::create_dir_all(conf.timeline_path(&tenant_shard_id, &timeline_id))?;

        Ok((
            conf,
            tenant_shard_id,
            timeline_id,
            RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error),
        ))
    }

    /// Round-trips blobs of various sizes through an ephemeral file. Each blob is read
    /// back at the offset `write_blob` returned for it.
    #[tokio::test]
    async fn test_ephemeral_blobs() -> Result<(), io::Error> {
        let (conf, tenant_id, timeline_id, ctx) = harness("ephemeral_blobs")?;
        let mut file = EphemeralFile::create(conf, tenant_id, timeline_id).await?;

        // A blob is readable right after it is written, and stays readable after
        // further appends.
        let foo_pos = file.write_blob(b"foo", &ctx).await?;
        assert_eq!(
            b"foo",
            file.block_cursor().read_blob(foo_pos, &ctx).await?.as_slice()
        );
        let bar_pos = file.write_blob(b"bar", &ctx).await?;
        assert_eq!(
            b"foo",
            file.block_cursor().read_blob(foo_pos, &ctx).await?.as_slice()
        );
        assert_eq!(
            b"bar",
            file.block_cursor().read_blob(bar_pos, &ctx).await?.as_slice()
        );

        // Many short blobs (1-byte length header), then some larger ones
        // (4-byte length header).
        let short_blobs = (0..10000).map(|i| format!("blob{i}").into_bytes());
        let long_blobs = (0..100).map(|i| format!("blob{i}").repeat(100).into_bytes());
        let mut written = Vec::new();
        for payload in short_blobs.chain(long_blobs) {
            let offset = file.write_blob(&payload, &ctx).await?;
            written.push((offset, payload));
        }

        let cursor = BlockCursor::new(BlockReaderRef::EphemeralFile(&file));
        for (offset, expected) in written {
            assert_eq!(cursor.read_blob(offset, &ctx).await?, expected);
        }

        // A random blob large enough to span multiple pages.
        let mut large_data = vec![0; 20000];
        thread_rng().fill_bytes(&mut large_data);
        let large_pos = file.write_blob(&large_data, &ctx).await?;
        assert_eq!(
            file.block_cursor().read_blob(large_pos, &ctx).await?,
            large_data
        );

        Ok(())
    }
}
|