Line data Source code
1 : //! Implementation of append-only file data structure
2 : //! used to keep in-memory layers spilled on disk.
3 :
4 : use crate::config::PageServerConf;
5 : use crate::context::RequestContext;
6 : use crate::page_cache::{self, PAGE_SZ};
7 : use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
8 : use crate::virtual_file::{self, VirtualFile};
9 : use bytes::BytesMut;
10 : use camino::Utf8PathBuf;
11 : use pageserver_api::shard::TenantShardId;
12 : use std::cmp::min;
13 :
14 : use std::io::{self, ErrorKind};
15 : use std::ops::DerefMut;
16 : use std::sync::atomic::AtomicU64;
17 : use tracing::*;
18 : use utils::id::TimelineId;
19 :
20 : pub struct EphemeralFile {
21 : page_cache_file_id: page_cache::FileId,
22 :
23 : _tenant_shard_id: TenantShardId,
24 : _timeline_id: TimelineId,
25 : file: VirtualFile,
26 : len: u64,
27 : /// An ephemeral file is append-only.
28 : /// We keep the last page, which can still be modified, in [`Self::mutable_tail`].
29 : /// The other pages, which can no longer be modified, are accessed through the page cache.
30 : ///
31 : /// None <=> IO is ongoing.
32 : /// Size is fixed to PAGE_SZ at creation time and must not be changed.
33 : mutable_tail: Option<BytesMut>,
34 : }
35 :
36 : impl EphemeralFile {
37 806 : pub async fn create(
38 806 : conf: &PageServerConf,
39 806 : tenant_shard_id: TenantShardId,
40 806 : timeline_id: TimelineId,
41 806 : ) -> Result<EphemeralFile, io::Error> {
42 806 : static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
43 806 : let filename_disambiguator =
44 806 : NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
45 806 :
46 806 : let filename = conf
47 806 : .timeline_path(&tenant_shard_id, &timeline_id)
48 806 : .join(Utf8PathBuf::from(format!(
49 806 : "ephemeral-{filename_disambiguator}"
50 806 : )));
51 :
52 806 : let file = VirtualFile::open_with_options(
53 806 : &filename,
54 806 : virtual_file::OpenOptions::new()
55 806 : .read(true)
56 806 : .write(true)
57 806 : .create(true),
58 806 : )
59 451 : .await?;
60 :
61 806 : Ok(EphemeralFile {
62 806 : page_cache_file_id: page_cache::next_file_id(),
63 806 : _tenant_shard_id: tenant_shard_id,
64 806 : _timeline_id: timeline_id,
65 806 : file,
66 806 : len: 0,
67 806 : mutable_tail: Some(BytesMut::zeroed(PAGE_SZ)),
68 806 : })
69 806 : }
70 :
71 7581938 : pub(crate) fn len(&self) -> u64 {
72 7581938 : self.len
73 7581938 : }
74 :
75 804 : pub(crate) fn id(&self) -> page_cache::FileId {
76 804 : self.page_cache_file_id
77 804 : }
78 :
79 3719661 : pub(crate) async fn read_blk(
80 3719661 : &self,
81 3719661 : blknum: u32,
82 3719661 : ctx: &RequestContext,
83 3719661 : ) -> Result<BlockLease, io::Error> {
84 3719661 : let flushed_blknums = 0..self.len / PAGE_SZ as u64;
85 3719661 : if flushed_blknums.contains(&(blknum as u64)) {
86 3392943 : let cache = page_cache::get();
87 3392943 : match cache
88 3392943 : .read_immutable_buf(self.page_cache_file_id, blknum, ctx)
89 42045 : .await
90 3392943 : .map_err(|e| {
91 0 : std::io::Error::new(
92 0 : std::io::ErrorKind::Other,
93 0 : // order path before error because error is anyhow::Error => might have many contexts
94 0 : format!(
95 0 : "ephemeral file: read immutable page #{}: {}: {:#}",
96 0 : blknum, self.file.path, e,
97 0 : ),
98 0 : )
99 3392943 : })? {
100 3346344 : page_cache::ReadBufResult::Found(guard) => {
101 3346344 : return Ok(BlockLease::PageReadGuard(guard))
102 : }
103 46599 : page_cache::ReadBufResult::NotFound(write_guard) => {
104 46599 : let write_guard = self
105 46599 : .file
106 46599 : .read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64)
107 23916 : .await?;
108 46599 : let read_guard = write_guard.mark_valid();
109 46599 : return Ok(BlockLease::PageReadGuard(read_guard));
110 : }
111 : };
112 : } else {
113 326718 : debug_assert_eq!(blknum as u64, self.len / PAGE_SZ as u64);
114 326718 : Ok(BlockLease::EphemeralFileMutableTail(
115 326718 : self.mutable_tail
116 326718 : .as_deref()
117 326718 : .expect("we're not doing IO, it must be Some()")
118 326718 : .try_into()
119 326718 : .expect("we ensure that it's always PAGE_SZ"),
120 326718 : ))
121 : }
122 3719661 : }
123 :
124 3954132 : pub(crate) async fn write_blob(
125 3954132 : &mut self,
126 3954132 : srcbuf: &[u8],
127 3954132 : ctx: &RequestContext,
128 3954132 : ) -> Result<u64, io::Error> {
129 3954132 : struct Writer<'a> {
130 3954132 : ephemeral_file: &'a mut EphemeralFile,
131 3954132 : /// The block to which the next [`push_bytes`] will write.
132 3954132 : blknum: u32,
133 3954132 : /// The offset inside the block identified by [`blknum`] to which [`push_bytes`] will write.
134 3954132 : off: usize,
135 3954132 : }
136 3954132 : impl<'a> Writer<'a> {
137 3954132 : fn new(ephemeral_file: &'a mut EphemeralFile) -> io::Result<Writer<'a>> {
138 3954132 : Ok(Writer {
139 3954132 : blknum: (ephemeral_file.len / PAGE_SZ as u64) as u32,
140 3954132 : off: (ephemeral_file.len % PAGE_SZ as u64) as usize,
141 3954132 : ephemeral_file,
142 3954132 : })
143 3954132 : }
144 3954132 : #[inline(always)]
145 7908264 : async fn push_bytes(
146 7908264 : &mut self,
147 7908264 : src: &[u8],
148 7908264 : ctx: &RequestContext,
149 7908264 : ) -> Result<(), io::Error> {
150 7908264 : let mut src_remaining = src;
151 15860505 : while !src_remaining.is_empty() {
152 7952241 : let dst_remaining = &mut self
153 7952241 : .ephemeral_file
154 7952241 : .mutable_tail
155 7952241 : .as_deref_mut()
156 7952241 : .expect("IO is not yet ongoing")[self.off..];
157 7952241 : let n = min(dst_remaining.len(), src_remaining.len());
158 7952241 : dst_remaining[..n].copy_from_slice(&src_remaining[..n]);
159 7952241 : self.off += n;
160 7952241 : src_remaining = &src_remaining[n..];
161 7952241 : if self.off == PAGE_SZ {
162 3954132 : let mutable_tail = std::mem::take(&mut self.ephemeral_file.mutable_tail)
163 44716 : .expect("IO is not yet ongoing");
164 3954132 : let (mutable_tail, res) = self
165 44716 : .ephemeral_file
166 44716 : .file
167 44716 : .write_all_at(mutable_tail, self.blknum as u64 * PAGE_SZ as u64)
168 3954132 : .await;
169 3954132 : // TODO: If we panic before we can put the mutable_tail back, subsequent calls will fail.
170 3954132 : // I.e., the IO isn't retryable if we panic.
171 3954132 : self.ephemeral_file.mutable_tail = Some(mutable_tail);
172 44716 : match res {
173 3954132 : Ok(_) => {
174 3954132 : // Pre-warm the page cache with what we just wrote.
175 3954132 : // This isn't necessary for coherency/correctness, but it's how we've always done it.
176 3954132 : let cache = page_cache::get();
177 44716 : match cache
178 44716 : .read_immutable_buf(
179 44716 : self.ephemeral_file.page_cache_file_id,
180 44716 : self.blknum,
181 44716 : ctx,
182 44716 : )
183 3954132 : .await
184 3954132 : {
185 3954132 : Ok(page_cache::ReadBufResult::Found(_guard)) => {
186 0 : // This function takes &mut self, so, it shouldn't be possible to reach this point.
187 0 : unreachable!("we just wrote blknum {} and this function takes &mut self, so, no concurrent read_blk is possible", self.blknum);
188 3954132 : }
189 3954132 : Ok(page_cache::ReadBufResult::NotFound(mut write_guard)) => {
190 44716 : let buf: &mut [u8] = write_guard.deref_mut();
191 44716 : debug_assert_eq!(buf.len(), PAGE_SZ);
192 3954132 : buf.copy_from_slice(
193 44716 : self.ephemeral_file
194 44716 : .mutable_tail
195 44716 : .as_deref()
196 44716 : .expect("IO is not ongoing"),
197 44716 : );
198 44716 : let _ = write_guard.mark_valid();
199 3954132 : // pre-warm successful
200 3954132 : }
201 3954132 : Err(e) => {
202 0 : error!("ephemeral_file write_blob failed to get immutable buf to pre-warm page cache: {e:?}");
203 3954132 : // fail gracefully, it's not the end of the world if we can't pre-warm the cache here
204 3954132 : }
205 3954132 : }
206 3954132 : // Zero the buffer for re-use.
207 3954132 : // Zeroing is critical for correcntess because the write_blob code below
208 3954132 : // and similarly read_blk expect zeroed pages.
209 3954132 : self.ephemeral_file
210 44716 : .mutable_tail
211 44716 : .as_deref_mut()
212 44716 : .expect("IO is not ongoing")
213 44716 : .fill(0);
214 44716 : // This block is done, move to next one.
215 44716 : self.blknum += 1;
216 44716 : self.off = 0;
217 3954132 : }
218 3954132 : Err(e) => {
219 0 : return Err(std::io::Error::new(
220 0 : ErrorKind::Other,
221 0 : // order error before path because path is long and error is short
222 0 : format!(
223 0 : "ephemeral_file: write_blob: write-back full tail blk #{}: {:#}: {}",
224 0 : self.blknum,
225 0 : e,
226 0 : self.ephemeral_file.file.path,
227 0 : ),
228 0 : ));
229 3954132 : }
230 3954132 : }
231 7907525 : }
232 3954132 : }
233 7908264 : Ok(())
234 7908264 : }
235 3954132 : }
236 3954132 :
237 3954132 : let pos = self.len;
238 3954132 : let mut writer = Writer::new(self)?;
239 :
240 : // Write the length field
241 3954132 : if srcbuf.len() < 0x80 {
242 : // short one-byte length header
243 3800316 : let len_buf = [srcbuf.len() as u8];
244 3800316 : writer.push_bytes(&len_buf, ctx).await?;
245 : } else {
246 153816 : let mut len_buf = u32::to_be_bytes(srcbuf.len() as u32);
247 153816 : len_buf[0] |= 0x80;
248 153816 : writer.push_bytes(&len_buf, ctx).await?;
249 : }
250 :
251 : // Write the payload
252 3954132 : writer.push_bytes(srcbuf, ctx).await?;
253 :
254 3954132 : if srcbuf.len() < 0x80 {
255 3800316 : self.len += 1;
256 3800316 : } else {
257 153816 : self.len += 4;
258 153816 : }
259 3954132 : self.len += srcbuf.len() as u64;
260 3954132 :
261 3954132 : Ok(pos)
262 3954132 : }
263 : }
264 :
/// Does the given filename look like an ephemeral file?
///
/// Ephemeral files are named `ephemeral-<n>`, where `<n>` is the value of a
/// 64-bit counter, so the suffix is accepted whenever it parses as a `u64`.
/// Returns `false` for any other name, including an empty or non-numeric suffix.
pub fn is_ephemeral_file(filename: &str) -> bool {
    matches!(
        filename.strip_prefix("ephemeral-"),
        Some(rest) if rest.parse::<u64>().is_ok()
    )
}
273 :
274 : impl Drop for EphemeralFile {
275 688 : fn drop(&mut self) {
276 688 : // There might still be pages in the [`crate::page_cache`] for this file.
277 688 : // We leave them there, [`crate::page_cache::PageCache::find_victim`] will evict them when needed.
278 688 :
279 688 : // unlink the file
280 688 : let res = std::fs::remove_file(&self.file.path);
281 688 : if let Err(e) = res {
282 2 : if e.kind() != std::io::ErrorKind::NotFound {
283 : // just never log the not found errors, we cannot do anything for them; on detach
284 : // the tenant directory is already gone.
285 : //
286 : // not found files might also be related to https://github.com/neondatabase/neon/issues/2442
287 0 : error!(
288 0 : "could not remove ephemeral file '{}': {}",
289 0 : self.file.path, e
290 0 : );
291 2 : }
292 686 : }
293 688 : }
294 : }
295 :
296 : impl BlockReader for EphemeralFile {
297 504059 : fn block_cursor(&self) -> super::block_io::BlockCursor<'_> {
298 504059 : BlockCursor::new(super::block_io::BlockReaderRef::EphemeralFile(self))
299 504059 : }
300 : }
301 :
302 : #[cfg(test)]
303 : mod tests {
304 : use super::*;
305 : use crate::context::DownloadBehavior;
306 : use crate::task_mgr::TaskKind;
307 : use crate::tenant::block_io::BlockReaderRef;
308 : use rand::{thread_rng, RngCore};
309 : use std::fs;
310 : use std::str::FromStr;
311 :
312 2 : fn harness(
313 2 : test_name: &str,
314 2 : ) -> Result<
315 2 : (
316 2 : &'static PageServerConf,
317 2 : TenantShardId,
318 2 : TimelineId,
319 2 : RequestContext,
320 2 : ),
321 2 : io::Error,
322 2 : > {
323 2 : let repo_dir = PageServerConf::test_repo_dir(test_name);
324 2 : let _ = fs::remove_dir_all(&repo_dir);
325 2 : let conf = PageServerConf::dummy_conf(repo_dir);
326 2 : // Make a static copy of the config. This can never be free'd, but that's
327 2 : // OK in a test.
328 2 : let conf: &'static PageServerConf = Box::leak(Box::new(conf));
329 2 :
330 2 : let tenant_shard_id = TenantShardId::from_str("11000000000000000000000000000000").unwrap();
331 2 : let timeline_id = TimelineId::from_str("22000000000000000000000000000000").unwrap();
332 2 : fs::create_dir_all(conf.timeline_path(&tenant_shard_id, &timeline_id))?;
333 :
334 2 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
335 2 :
336 2 : Ok((conf, tenant_shard_id, timeline_id, ctx))
337 2 : }
338 :
339 : #[tokio::test]
340 2 : async fn test_ephemeral_blobs() -> Result<(), io::Error> {
341 2 : let (conf, tenant_id, timeline_id, ctx) = harness("ephemeral_blobs")?;
342 2 :
343 2 : let mut file = EphemeralFile::create(conf, tenant_id, timeline_id).await?;
344 2 :
345 2 : let pos_foo = file.write_blob(b"foo", &ctx).await?;
346 2 : assert_eq!(
347 2 : b"foo",
348 2 : file.block_cursor()
349 2 : .read_blob(pos_foo, &ctx)
350 2 : .await?
351 2 : .as_slice()
352 2 : );
353 2 : let pos_bar = file.write_blob(b"bar", &ctx).await?;
354 2 : assert_eq!(
355 2 : b"foo",
356 2 : file.block_cursor()
357 2 : .read_blob(pos_foo, &ctx)
358 2 : .await?
359 2 : .as_slice()
360 2 : );
361 2 : assert_eq!(
362 2 : b"bar",
363 2 : file.block_cursor()
364 2 : .read_blob(pos_bar, &ctx)
365 2 : .await?
366 2 : .as_slice()
367 2 : );
368 2 :
369 2 : let mut blobs = Vec::new();
370 20002 : for i in 0..10000 {
371 20000 : let data = Vec::from(format!("blob{}", i).as_bytes());
372 20000 : let pos = file.write_blob(&data, &ctx).await?;
373 20000 : blobs.push((pos, data));
374 2 : }
375 2 : // also test with a large blobs
376 202 : for i in 0..100 {
377 200 : let data = format!("blob{}", i).as_bytes().repeat(100);
378 200 : let pos = file.write_blob(&data, &ctx).await?;
379 200 : blobs.push((pos, data));
380 2 : }
381 2 :
382 2 : let cursor = BlockCursor::new(BlockReaderRef::EphemeralFile(&file));
383 20202 : for (pos, expected) in blobs {
384 20200 : let actual = cursor.read_blob(pos, &ctx).await?;
385 20200 : assert_eq!(actual, expected);
386 2 : }
387 2 :
388 2 : // Test a large blob that spans multiple pages
389 2 : let mut large_data = vec![0; 20000];
390 2 : thread_rng().fill_bytes(&mut large_data);
391 2 : let pos_large = file.write_blob(&large_data, &ctx).await?;
392 2 : let result = file.block_cursor().read_blob(pos_large, &ctx).await?;
393 2 : assert_eq!(result, large_data);
394 2 :
395 2 : Ok(())
396 2 : }
397 : }
|