//! Implementation of an append-only file data structure,
//! used to keep in-memory layers spilled to disk.
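//!
//! A minimal usage sketch (illustration only, not compiled as a doctest;
//! `conf`, `gate`, and `ctx` are assumed to exist in the caller's scope):
//!
//! ```ignore
//! let mut file = EphemeralFile::create(conf, tenant_shard_id, timeline_id, &gate, &ctx).await?;
//! // Writes are append-only; the returned offset can be used to build an index over the file.
//! let off = file.write_raw(b"serialized value", &ctx).await?;
//! // Reads go through the `vectored_dio_read::File` impl, or load everything at once:
//! let all_bytes = file.load_to_io_buf(&ctx).await?;
//! ```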

use std::io;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;

use camino::Utf8PathBuf;
use num_traits::Num;
use pageserver_api::shard::TenantShardId;
use tokio_epoll_uring::{BoundedBuf, Slice};
use tracing::error;
use utils::id::TimelineId;

use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64};
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::page_cache;
use crate::tenant::storage_layer::inmemory_layer::vectored_dio_read::File;
use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAlignedMut;
use crate::virtual_file::owned_buffers_io::slice::SliceMutExt;
use crate::virtual_file::owned_buffers_io::write::Buffer;
use crate::virtual_file::{self, IoBufferMut, VirtualFile, owned_buffers_io};

pub struct EphemeralFile {
    _tenant_shard_id: TenantShardId,
    _timeline_id: TimelineId,
    page_cache_file_id: page_cache::FileId,
    bytes_written: u64,
    buffered_writer: owned_buffers_io::write::BufferedWriter<IoBufferMut, VirtualFile>,
    /// The gate guard is held for as long as we need to do operations on the path (delete on drop).
    _gate_guard: utils::sync::gate::GateGuard,
}

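/// Capacity of each of the buffered writer's buffers. The writer keeps up to
/// two such buffers (the `mutable` one and the `maybe_flushed` one, cf. the
/// tests below), so up to `2 * TAIL_SZ` bytes of the file tail may be in memory.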
const TAIL_SZ: usize = 64 * 1024;

impl EphemeralFile {
    pub async fn create(
        conf: &PageServerConf,
        tenant_shard_id: TenantShardId,
        timeline_id: TimelineId,
        gate: &utils::sync::gate::Gate,
        ctx: &RequestContext,
    ) -> anyhow::Result<EphemeralFile> {
        static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
        let filename_disambiguator =
            NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

        let filename = conf
            .timeline_path(&tenant_shard_id, &timeline_id)
            .join(Utf8PathBuf::from(format!(
                "ephemeral-{filename_disambiguator}"
            )));

        let file = Arc::new(
            VirtualFile::open_with_options_v2(
                &filename,
                virtual_file::OpenOptions::new()
                    .read(true)
                    .write(true)
                    .create(true),
                ctx,
            )
            .await?,
        );

        let page_cache_file_id = page_cache::next_file_id(); // XXX get rid, we're not page-caching anymore

        Ok(EphemeralFile {
            _tenant_shard_id: tenant_shard_id,
            _timeline_id: timeline_id,
            page_cache_file_id,
            bytes_written: 0,
            buffered_writer: owned_buffers_io::write::BufferedWriter::new(
                file,
                || IoBufferMut::with_capacity(TAIL_SZ),
                gate.enter()?,
                ctx,
            ),
            _gate_guard: gate.enter()?,
        })
    }
}

impl Drop for EphemeralFile {
    fn drop(&mut self) {
        // Unlink the file.
        // We are clear to do this because we have entered the gate.
        let path = self.buffered_writer.as_inner().path();
        let res = std::fs::remove_file(path);
        if let Err(e) = res {
            if e.kind() != std::io::ErrorKind::NotFound {
                // Never log the NotFound errors; we cannot do anything about them: on detach,
                // the tenant directory is already gone.
                //
                // NotFound might also be related to https://github.com/neondatabase/neon/issues/2442
                error!("could not remove ephemeral file '{path}': {e}");
            }
        }
    }
}

impl EphemeralFile {
    pub(crate) fn len(&self) -> u64 {
        self.bytes_written
    }

    pub(crate) fn page_cache_file_id(&self) -> page_cache::FileId {
        self.page_cache_file_id
    }

    pub(crate) async fn load_to_io_buf(
        &self,
        ctx: &RequestContext,
    ) -> Result<IoBufferMut, io::Error> {
        let size = self.len().into_usize();
        let buf = IoBufferMut::with_capacity(size);
        let (slice, nread) = self.read_exact_at_eof_ok(0, buf.slice_full(), ctx).await?;
        assert_eq!(nread, size);
        let buf = slice.into_inner();
        assert_eq!(buf.len(), nread);
        assert_eq!(buf.capacity(), size, "we shouldn't be reallocating");
        Ok(buf)
    }

    /// Returns the offset at which the first byte of the input was written, for use
    /// in constructing indices over the written value.
    ///
    /// Panics if the write is short, because there's no way we can recover from that.
    /// TODO: make upstack handle this as an error.
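    ///
    /// A sketch of the intended call pattern (illustration only; `file` and `ctx`
    /// are assumed to be in scope, and `index` is a hypothetical structure mapping
    /// keys to file offsets):
    /// ```ignore
    /// let off = file.write_raw(&value_bytes, &ctx).await?;
    /// index.insert(key, off); // hypothetical index over the written values
    /// ```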
    pub(crate) async fn write_raw(
        &mut self,
        srcbuf: &[u8],
        ctx: &RequestContext,
    ) -> std::io::Result<u64> {
        let (pos, control) = self.write_raw_controlled(srcbuf, ctx).await?;
        if let Some(control) = control {
            control.release().await;
        }
        Ok(pos)
    }

    async fn write_raw_controlled(
        &mut self,
        srcbuf: &[u8],
        ctx: &RequestContext,
    ) -> std::io::Result<(u64, Option<owned_buffers_io::write::FlushControl>)> {
        let pos = self.bytes_written;

        let new_bytes_written = pos.checked_add(srcbuf.len().into_u64()).ok_or_else(|| {
            std::io::Error::new(
                std::io::ErrorKind::Other,
                format!(
                    "write would grow EphemeralFile beyond u64::MAX: len={pos} written={srcbuf_len}",
                    srcbuf_len = srcbuf.len(),
                ),
            )
        })?;

        // Write the payload
        let (nwritten, control) = self
            .buffered_writer
            .write_buffered_borrowed_controlled(srcbuf, ctx)
            .await?;
        assert_eq!(
            nwritten,
            srcbuf.len(),
            "buffered writer has no short writes"
        );

        self.bytes_written = new_bytes_written;

        Ok((pos, control))
    }
}

impl super::storage_layer::inmemory_layer::vectored_dio_read::File for EphemeralFile {
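    /// Reads up to `dst.bytes_total()` bytes starting at byte offset `start`,
    /// assembling the result from up to three sources: the flushed contents of
    /// the file on disk, the writer's maybe-flushed buffer, and the writer's
    /// mutable buffer. The read is short (without an error) iff the requested
    /// range extends past the end of the file.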
    async fn read_exact_at_eof_ok<B: IoBufAlignedMut + Send>(
        &self,
        start: u64,
        dst: tokio_epoll_uring::Slice<B>,
        ctx: &RequestContext,
    ) -> std::io::Result<(tokio_epoll_uring::Slice<B>, usize)> {
        let submitted_offset = self.buffered_writer.bytes_submitted();

        let mutable = self.buffered_writer.inspect_mutable();
        let mutable = &mutable[0..mutable.pending()];

        let maybe_flushed = self.buffered_writer.inspect_maybe_flushed();

        let dst_cap = dst.bytes_total().into_u64();
        let end = {
            // saturating_add is correct here because the max file size is u64::MAX;
            // if start + dst.len() > u64::MAX, we know it will be a short read
            let mut end: u64 = start.saturating_add(dst_cap);
            if end > self.bytes_written {
                end = self.bytes_written;
            }
            end
        };

        // Half-open range: inclusive start, exclusive end.
        #[derive(Debug)]
        struct Range<N>(N, N);
        impl<N: Num + Clone + Copy + PartialOrd + Ord> Range<N> {
            fn len(&self) -> N {
                if self.0 > self.1 {
                    N::zero()
                } else {
                    self.1 - self.0
                }
            }
        }

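        // Decompose the logical read range [start, end) into up to three disjoint
        // sub-ranges, one per data source: bytes already written to the file,
        // bytes in the maybe-flushed buffer, and bytes in the mutable buffer.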
        let (written_range, maybe_flushed_range) = {
            if maybe_flushed.is_some() {
                // [       written       ][ maybe_flushed ][    mutable    ]
                //                        <-   TAIL_SZ   -><-   TAIL_SZ   ->
                //                                         ^
                //                                         `submitted_offset`
                // <++++++ on disk +++++++?????????????????>
                (
                    Range(
                        start,
                        std::cmp::min(end, submitted_offset.saturating_sub(TAIL_SZ as u64)),
                    ),
                    Range(
                        std::cmp::max(start, submitted_offset.saturating_sub(TAIL_SZ as u64)),
                        std::cmp::min(end, submitted_offset),
                    ),
                )
            } else {
                // [       written       ][    mutable    ]
                //                        <-   TAIL_SZ   ->
                //                        ^
                //                        `submitted_offset`
                // <++++++ on disk ++++++>
                (
                    Range(start, std::cmp::min(end, submitted_offset)),
                    // zero len
                    Range(submitted_offset, u64::MIN),
                )
            }
        };

        let mutable_range = Range(std::cmp::max(start, submitted_offset), end);

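        // Serve each non-empty sub-range from its source, in file order; the
        // pieces land in consecutive sub-slices of `dst`.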
        let dst = if written_range.len() > 0 {
            let file: &VirtualFile = self.buffered_writer.as_inner();
            let bounds = dst.bounds();
            let slice = file
                .read_exact_at(dst.slice(0..written_range.len().into_usize()), start, ctx)
                .await?;
            Slice::from_buf_bounds(Slice::into_inner(slice), bounds)
        } else {
            dst
        };

        let dst = if maybe_flushed_range.len() > 0 {
            let offset_in_buffer = maybe_flushed_range
                .0
                .checked_sub(submitted_offset.saturating_sub(TAIL_SZ as u64))
                .unwrap()
                .into_usize();
            // Checked previously that the buffer is Some.
            let maybe_flushed = maybe_flushed.unwrap();
            let to_copy = &maybe_flushed
                [offset_in_buffer..(offset_in_buffer + maybe_flushed_range.len().into_usize())];
            let bounds = dst.bounds();
            let mut view = dst.slice({
                let start = written_range.len().into_usize();
                let end = start
                    .checked_add(maybe_flushed_range.len().into_usize())
                    .unwrap();
                start..end
            });
            view.as_mut_rust_slice_full_zeroed()
                .copy_from_slice(to_copy);
            Slice::from_buf_bounds(Slice::into_inner(view), bounds)
        } else {
            dst
        };

        let dst = if mutable_range.len() > 0 {
            let offset_in_buffer = mutable_range
                .0
                .checked_sub(submitted_offset)
                .unwrap()
                .into_usize();
            let to_copy =
                &mutable[offset_in_buffer..(offset_in_buffer + mutable_range.len().into_usize())];
            let bounds = dst.bounds();
            let mut view = dst.slice({
                let start =
                    written_range.len().into_usize() + maybe_flushed_range.len().into_usize();
                let end = start.checked_add(mutable_range.len().into_usize()).unwrap();
                start..end
            });
            view.as_mut_rust_slice_full_zeroed()
                .copy_from_slice(to_copy);
            Slice::from_buf_bounds(Slice::into_inner(view), bounds)
        } else {
            dst
        };

        // TODO: in debug mode, randomize the remaining bytes in `dst` to catch bugs

        Ok((dst, (end - start).into_usize()))
    }
}

/// Does the given filename look like an ephemeral file?
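///
/// For illustration (not compiled as a doctest):
/// ```ignore
/// assert!(is_ephemeral_file("ephemeral-42"));
/// assert!(!is_ephemeral_file("ephemeral-"));     // missing disambiguator
/// assert!(!is_ephemeral_file("image-layer-42")); // wrong prefix
/// ```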
pub fn is_ephemeral_file(filename: &str) -> bool {
    if let Some(rest) = filename.strip_prefix("ephemeral-") {
        // The disambiguator is generated from a u64 counter (`NEXT_FILENAME`),
        // so parse it as u64.
        rest.parse::<u64>().is_ok()
    } else {
        false
    }
}

#[cfg(test)]
mod tests {
    use std::fs;
    use std::str::FromStr;

    use rand::Rng;

    use super::*;
    use crate::context::DownloadBehavior;
    use crate::task_mgr::TaskKind;

    fn harness(
        test_name: &str,
    ) -> Result<
        (
            &'static PageServerConf,
            TenantShardId,
            TimelineId,
            RequestContext,
        ),
        io::Error,
    > {
        let repo_dir = PageServerConf::test_repo_dir(test_name);
        let _ = fs::remove_dir_all(&repo_dir);
        let conf = PageServerConf::dummy_conf(repo_dir);
        // Make a static copy of the config. This can never be freed, but that's
        // OK in a test.
        let conf: &'static PageServerConf = Box::leak(Box::new(conf));

        let tenant_shard_id = TenantShardId::from_str("11000000000000000000000000000000").unwrap();
        let timeline_id = TimelineId::from_str("22000000000000000000000000000000").unwrap();
        fs::create_dir_all(conf.timeline_path(&tenant_shard_id, &timeline_id))?;

        let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);

        Ok((conf, tenant_shard_id, timeline_id, ctx))
    }

    #[tokio::test]
    async fn ephemeral_file_holds_gate_open() {
        const FOREVER: std::time::Duration = std::time::Duration::from_secs(5);

        let (conf, tenant_id, timeline_id, ctx) =
            harness("ephemeral_file_holds_gate_open").unwrap();

        let gate = utils::sync::gate::Gate::default();

        let file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &ctx)
            .await
            .unwrap();

        let mut closing = tokio::task::spawn(async move {
            gate.close().await;
        });

        // The gate stays entered until the ephemeral file is dropped.
        // Do not start paused: tokio-epoll-uring has a sleep loop.
        tokio::time::pause();
        tokio::time::timeout(FOREVER, &mut closing)
            .await
            .expect_err("closing cannot complete before dropping");

        // This is a requirement of the reset_tenant functionality: we have to be able to restart a
        // tenant fast, and for that, we need all tenant_dir operations to be guarded by entering a gate.
        drop(file);

        tokio::time::timeout(FOREVER, &mut closing)
            .await
            .expect("closing completes right away")
            .expect("closing does not panic");
    }

    #[tokio::test]
    async fn test_ephemeral_file_basics() {
        let (conf, tenant_id, timeline_id, ctx) = harness("test_ephemeral_file_basics").unwrap();

        let gate = utils::sync::gate::Gate::default();

        let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &ctx)
            .await
            .unwrap();

        let mutable = file.buffered_writer.inspect_mutable();
        let cap = mutable.capacity();
        let align = mutable.align();

        let write_nbytes = cap * 2 + cap / 2;

        let content: Vec<u8> = rand::thread_rng()
            .sample_iter(rand::distributions::Standard)
            .take(write_nbytes)
            .collect();

        let mut value_offsets = Vec::new();
        for range in (0..write_nbytes)
            .step_by(align)
            .map(|start| start..(start + align).min(write_nbytes))
        {
            let off = file.write_raw(&content[range], &ctx).await.unwrap();
            value_offsets.push(off);
        }

        assert_eq!(file.len() as usize, write_nbytes);
        for (i, range) in (0..write_nbytes)
            .step_by(align)
            .map(|start| start..(start + align).min(write_nbytes))
            .enumerate()
        {
            assert_eq!(value_offsets[i], range.start.into_u64());
            let buf = IoBufferMut::with_capacity(range.len());
            let (buf_slice, nread) = file
                .read_exact_at_eof_ok(range.start.into_u64(), buf.slice_full(), &ctx)
                .await
                .unwrap();
            let buf = buf_slice.into_inner();
            assert_eq!(nread, range.len());
            assert_eq!(&buf, &content[range]);
        }

        let file_contents = std::fs::read(file.buffered_writer.as_inner().path()).unwrap();
        assert!(file_contents == content[0..cap * 2]);

        let maybe_flushed_buffer_contents = file.buffered_writer.inspect_maybe_flushed().unwrap();
        assert_eq!(&maybe_flushed_buffer_contents[..], &content[cap..cap * 2]);

        let mutable_buffer_contents = file.buffered_writer.inspect_mutable();
        assert_eq!(mutable_buffer_contents, &content[cap * 2..write_nbytes]);
    }

    #[tokio::test]
    async fn test_flushes_do_happen() {
        let (conf, tenant_id, timeline_id, ctx) = harness("test_flushes_do_happen").unwrap();

        let gate = utils::sync::gate::Gate::default();

        let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &ctx)
            .await
            .unwrap();

        // The mutable buffer and the maybe_flushed buffer each hold `cap` bytes.
        let cap = file.buffered_writer.inspect_mutable().capacity();

        let content: Vec<u8> = rand::thread_rng()
            .sample_iter(rand::distributions::Standard)
            .take(cap * 2 + cap / 2)
            .collect();

        file.write_raw(&content, &ctx).await.unwrap();

        // assert the state is as this test expects it to be
        assert_eq!(
            &file.load_to_io_buf(&ctx).await.unwrap(),
            &content[0..cap * 2 + cap / 2]
        );
        let md = file.buffered_writer.as_inner().path().metadata().unwrap();
        assert_eq!(
            md.len(),
            2 * cap.into_u64(),
            "buffered writer requires one write to be flushed if we write 2.5x buffer capacity"
        );
        assert_eq!(
            &file.buffered_writer.inspect_maybe_flushed().unwrap()[0..cap],
            &content[cap..cap * 2]
        );
        assert_eq!(
            &file.buffered_writer.inspect_mutable()[0..cap / 2],
            &content[cap * 2..cap * 2 + cap / 2]
        );
    }

    #[tokio::test]
    async fn test_read_split_across_file_and_buffer() {
        // This test exercises the logic on the read path that splits the logical read
        // into a read from the flushed part (= the file) and a copy from the buffered writer's buffer.
        //
        // This test builds on the assertions in test_flushes_do_happen.

        let (conf, tenant_id, timeline_id, ctx) =
            harness("test_read_split_across_file_and_buffer").unwrap();

        let gate = utils::sync::gate::Gate::default();

        let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &ctx)
            .await
            .unwrap();

        let mutable = file.buffered_writer.inspect_mutable();
        let cap = mutable.capacity();
        let align = mutable.align();
        let content: Vec<u8> = rand::thread_rng()
            .sample_iter(rand::distributions::Standard)
            .take(cap * 2 + cap / 2)
            .collect();

        let (_, control) = file.write_raw_controlled(&content, &ctx).await.unwrap();

        let test_read = |start: usize, len: usize| {
            let file = &file;
            let ctx = &ctx;
            let content = &content;
            async move {
                let (buf, nread) = file
                    .read_exact_at_eof_ok(
                        start.into_u64(),
                        IoBufferMut::with_capacity(len).slice_full(),
                        ctx,
                    )
                    .await
                    .unwrap();
                assert_eq!(nread, len);
                assert_eq!(&buf.into_inner(), &content[start..(start + len)]);
            }
        };

        let test_read_all_offset_combinations = || {
            async move {
                // completely within the file range
                test_read(align, align).await;
                // border onto edge of file
                test_read(cap - align, align).await;
                // read across file and buffer
                test_read(cap - align, 2 * align).await;
                // start exactly at the maybe_flushed buffer
                test_read(cap, align).await;
                // completely within maybe_flushed buffer
                test_read(cap + align, align).await;
                // border onto edge of maybe_flushed buffer
                test_read(cap * 2 - align, align).await;
                // read across maybe_flushed and mutable buffer
                test_read(cap * 2 - align, 2 * align).await;
                // read across three segments
                test_read(cap - align, cap + 2 * align).await;
                // completely within mutable buffer
                test_read(cap * 2 + align, align).await;
            }
        };

        assert!(align < cap, "test assumption");
        assert!(cap % align == 0);

        // test reads at different flush stages
        let not_started = control.unwrap().into_not_started();
        test_read_all_offset_combinations().await;
        let in_progress = not_started.ready_to_flush();
        test_read_all_offset_combinations().await;
        in_progress.wait_until_flush_is_done().await;
        test_read_all_offset_combinations().await;
    }
}