//! Implementation of an append-only file data structure,
//! used to keep in-memory layers spilled on disk.
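//!
//! Writes are buffered through a double-buffered `owned_buffers_io::write::BufferedWriter`;
//! reads reassemble the logical file contents from the flushed file on disk and the
//! writer's two in-memory buffers (see `read_exact_at_eof_ok` below).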

use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64};
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::page_cache;
use crate::tenant::storage_layer::inmemory_layer::vectored_dio_read::File;
use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAlignedMut;
use crate::virtual_file::owned_buffers_io::slice::SliceMutExt;
use crate::virtual_file::owned_buffers_io::write::Buffer;
use crate::virtual_file::{self, owned_buffers_io, IoBufferMut, VirtualFile};
use camino::Utf8PathBuf;
use num_traits::Num;
use pageserver_api::shard::TenantShardId;
use tokio_epoll_uring::{BoundedBuf, Slice};
use tracing::error;

use std::io;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use utils::id::TimelineId;

pub struct EphemeralFile {
    _tenant_shard_id: TenantShardId,
    _timeline_id: TimelineId,
    page_cache_file_id: page_cache::FileId,
    bytes_written: u64,
    buffered_writer: owned_buffers_io::write::BufferedWriter<IoBufferMut, VirtualFile>,
    /// The gate guard is held for as long as we need to do operations on the path (delete on drop).
    _gate_guard: utils::sync::gate::GateGuard,
}

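/// Capacity of each of the buffered writer's two buffers: the `mutable` buffer that
/// accepts new writes and the `maybe_flushed` buffer that holds the most recently
/// submitted contents (see the layout diagrams in `read_exact_at_eof_ok` below).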
const TAIL_SZ: usize = 64 * 1024;

impl EphemeralFile {
    pub async fn create(
        conf: &PageServerConf,
        tenant_shard_id: TenantShardId,
        timeline_id: TimelineId,
        gate: &utils::sync::gate::Gate,
        ctx: &RequestContext,
    ) -> anyhow::Result<EphemeralFile> {
        static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
        let filename_disambiguator =
            NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

        let filename = conf
            .timeline_path(&tenant_shard_id, &timeline_id)
            .join(Utf8PathBuf::from(format!(
                "ephemeral-{filename_disambiguator}"
            )));

        let file = Arc::new(
            VirtualFile::open_with_options_v2(
                &filename,
                virtual_file::OpenOptions::new()
                    .read(true)
                    .write(true)
                    .create(true),
                ctx,
            )
            .await?,
        );

        let page_cache_file_id = page_cache::next_file_id(); // XXX get rid, we're not page-caching anymore

        Ok(EphemeralFile {
            _tenant_shard_id: tenant_shard_id,
            _timeline_id: timeline_id,
            page_cache_file_id,
            bytes_written: 0,
            buffered_writer: owned_buffers_io::write::BufferedWriter::new(
                file,
                || IoBufferMut::with_capacity(TAIL_SZ),
                gate.enter()?,
                ctx,
            ),
            _gate_guard: gate.enter()?,
        })
    }
}

impl Drop for EphemeralFile {
    fn drop(&mut self) {
        // Unlink the file. We are clear to do this because we have entered the gate.
        let path = self.buffered_writer.as_inner().path();
        let res = std::fs::remove_file(path);
        if let Err(e) = res {
            if e.kind() != std::io::ErrorKind::NotFound {
                // Never log NotFound errors: we cannot do anything about them; on detach,
                // the tenant directory is already gone.
                //
                // NotFound errors might also be related to https://github.com/neondatabase/neon/issues/2442
                error!("could not remove ephemeral file '{path}': {e}");
            }
        }
    }
}

impl EphemeralFile {
    pub(crate) fn len(&self) -> u64 {
        self.bytes_written
    }

    pub(crate) fn page_cache_file_id(&self) -> page_cache::FileId {
        self.page_cache_file_id
    }

    pub(crate) async fn load_to_io_buf(
        &self,
        ctx: &RequestContext,
    ) -> Result<IoBufferMut, io::Error> {
        let size = self.len().into_usize();
        let buf = IoBufferMut::with_capacity(size);
        let (slice, nread) = self.read_exact_at_eof_ok(0, buf.slice_full(), ctx).await?;
        assert_eq!(nread, size);
        let buf = slice.into_inner();
        assert_eq!(buf.len(), nread);
        assert_eq!(buf.capacity(), size, "we shouldn't be reallocating");
        Ok(buf)
    }

    /// Returns the offset at which the first byte of the input was written, for use
    /// in constructing indices over the written value.
    ///
    /// Panics if the write is short because there's no way we can recover from that.
    /// TODO: make upstack handle this as an error.
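    ///
    /// ### Example (sketch)
    ///
    /// The returned offsets are what callers record in their in-memory index over the
    /// file's contents. Hypothetical values, assuming a freshly created file:
    ///
    /// ```ignore
    /// let off_a = file.write_raw(b"value-a", ctx).await?; // returns 0
    /// let off_b = file.write_raw(b"value-b", ctx).await?; // returns 7
    /// assert_eq!(off_b, off_a + b"value-a".len() as u64);
    /// ```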
    pub(crate) async fn write_raw(
        &mut self,
        srcbuf: &[u8],
        ctx: &RequestContext,
    ) -> std::io::Result<u64> {
        let (pos, control) = self.write_raw_controlled(srcbuf, ctx).await?;
        if let Some(control) = control {
            control.release().await;
        }
        Ok(pos)
    }

    async fn write_raw_controlled(
        &mut self,
        srcbuf: &[u8],
        ctx: &RequestContext,
    ) -> std::io::Result<(u64, Option<owned_buffers_io::write::FlushControl>)> {
        let pos = self.bytes_written;

        let new_bytes_written = pos.checked_add(srcbuf.len().into_u64()).ok_or_else(|| {
            std::io::Error::new(
                std::io::ErrorKind::Other,
                format!(
                    "write would grow EphemeralFile beyond u64::MAX: len={pos} written={srcbuf_len}",
                    srcbuf_len = srcbuf.len(),
                ),
            )
        })?;
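        // (Illustrative: an 8-byte write at pos = u64::MAX - 3 errors out here instead
        // of wrapping the logical file size around to 4.)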

        // Write the payload
        let (nwritten, control) = self
            .buffered_writer
            .write_buffered_borrowed_controlled(srcbuf, ctx)
            .await?;
        assert_eq!(
            nwritten,
            srcbuf.len(),
            "buffered writer has no short writes"
        );

        self.bytes_written = new_bytes_written;

        Ok((pos, control))
    }
}

impl super::storage_layer::inmemory_layer::vectored_dio_read::File for EphemeralFile {
    async fn read_exact_at_eof_ok<B: IoBufAlignedMut + Send>(
        &self,
        start: u64,
        dst: tokio_epoll_uring::Slice<B>,
        ctx: &RequestContext,
    ) -> std::io::Result<(tokio_epoll_uring::Slice<B>, usize)> {
        let submitted_offset = self.buffered_writer.bytes_submitted();

        let mutable = self.buffered_writer.inspect_mutable();
        let mutable = &mutable[0..mutable.pending()];

        let maybe_flushed = self.buffered_writer.inspect_maybe_flushed();

        let dst_cap = dst.bytes_total().into_u64();
        let end = {
            // saturating_add is correct here because the max file size is u64::MAX, so,
            // if start + dst.len() > u64::MAX, then we know it will be a short read
            let mut end: u64 = start.saturating_add(dst_cap);
            if end > self.bytes_written {
                end = self.bytes_written;
            }
            end
        };

        // inclusive, exclusive
        #[derive(Debug)]
        struct Range<N>(N, N);
        impl<N: Num + Clone + Copy + PartialOrd + Ord> Range<N> {
            fn len(&self) -> N {
                if self.0 > self.1 {
                    N::zero()
                } else {
                    self.1 - self.0
                }
            }
        }

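        // The logical read [start, end) is assembled from up to three physical sources:
        // the flushed part of the file on disk, the writer's `maybe_flushed` buffer, and
        // the writer's `mutable` buffer. Illustrative numbers, assuming TAIL_SZ = 64 KiB
        // and submitted_offset = 128 KiB: bytes [0, 64 KiB) live on disk, [64 KiB, 128 KiB)
        // in `maybe_flushed`, and [128 KiB, bytes_written) in `mutable`.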
        let (written_range, maybe_flushed_range) = {
            if maybe_flushed.is_some() {
                // [ written ][ maybe_flushed ][ mutable ]
                //            <--- TAIL_SZ ---><-TAIL_SZ->
                //                             ^
                //                             `submitted_offset`
                // <++++++ on disk +++++++????????????????>
                (
                    Range(
                        start,
                        std::cmp::min(end, submitted_offset.saturating_sub(TAIL_SZ as u64)),
                    ),
                    Range(
                        std::cmp::max(start, submitted_offset.saturating_sub(TAIL_SZ as u64)),
                        std::cmp::min(end, submitted_offset),
                    ),
                )
            } else {
                // [ written ][ mutable ]
                //            <-TAIL_SZ->
                //            ^
                //            `submitted_offset`
                // <++++++ on disk +++++++++++++++++++++++>
                (
                    Range(start, std::cmp::min(end, submitted_offset)),
                    // zero len
                    Range(submitted_offset, u64::MIN),
                )
            }
        };

        let mutable_range = Range(std::cmp::max(start, submitted_offset), end);

        let dst = if written_range.len() > 0 {
            let file: &VirtualFile = self.buffered_writer.as_inner();
            let bounds = dst.bounds();
            let slice = file
                .read_exact_at(dst.slice(0..written_range.len().into_usize()), start, ctx)
                .await?;
            Slice::from_buf_bounds(Slice::into_inner(slice), bounds)
        } else {
            dst
        };

        let dst = if maybe_flushed_range.len() > 0 {
            let offset_in_buffer = maybe_flushed_range
                .0
                .checked_sub(submitted_offset.saturating_sub(TAIL_SZ as u64))
                .unwrap()
                .into_usize();
            // Checked previously the buffer is Some.
            let maybe_flushed = maybe_flushed.unwrap();
            let to_copy = &maybe_flushed
                [offset_in_buffer..(offset_in_buffer + maybe_flushed_range.len().into_usize())];
            let bounds = dst.bounds();
            let mut view = dst.slice({
                let start = written_range.len().into_usize();
                let end = start
                    .checked_add(maybe_flushed_range.len().into_usize())
                    .unwrap();
                start..end
            });
            view.as_mut_rust_slice_full_zeroed()
                .copy_from_slice(to_copy);
            Slice::from_buf_bounds(Slice::into_inner(view), bounds)
        } else {
            dst
        };

        let dst = if mutable_range.len() > 0 {
            let offset_in_buffer = mutable_range
                .0
                .checked_sub(submitted_offset)
                .unwrap()
                .into_usize();
            let to_copy =
                &mutable[offset_in_buffer..(offset_in_buffer + mutable_range.len().into_usize())];
            let bounds = dst.bounds();
            let mut view = dst.slice({
                let start =
                    written_range.len().into_usize() + maybe_flushed_range.len().into_usize();
                let end = start.checked_add(mutable_range.len().into_usize()).unwrap();
                start..end
            });
            view.as_mut_rust_slice_full_zeroed()
                .copy_from_slice(to_copy);
            Slice::from_buf_bounds(Slice::into_inner(view), bounds)
        } else {
            dst
        };

        // TODO: in debug mode, randomize the remaining bytes in `dst` to catch bugs

        Ok((dst, (end - start).into_usize()))
    }
}

/// Does the given filename look like an ephemeral file?
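///
/// A sketch of the expected behavior (filenames follow the `ephemeral-{n}` scheme
/// used by `EphemeralFile::create` above):
///
/// ```ignore
/// assert!(is_ephemeral_file("ephemeral-42"));
/// assert!(!is_ephemeral_file("ephemeral-")); // missing disambiguator
/// assert!(!is_ephemeral_file("image-layer-42")); // wrong prefix
/// ```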
pub fn is_ephemeral_file(filename: &str) -> bool {
    if let Some(rest) = filename.strip_prefix("ephemeral-") {
        rest.parse::<u32>().is_ok()
    } else {
        false
    }
}

#[cfg(test)]
mod tests {
    use rand::Rng;

    use super::*;
    use crate::context::DownloadBehavior;
    use crate::task_mgr::TaskKind;
    use std::fs;
    use std::str::FromStr;

    fn harness(
        test_name: &str,
    ) -> Result<
        (
            &'static PageServerConf,
            TenantShardId,
            TimelineId,
            RequestContext,
        ),
        io::Error,
    > {
        let repo_dir = PageServerConf::test_repo_dir(test_name);
        let _ = fs::remove_dir_all(&repo_dir);
        let conf = PageServerConf::dummy_conf(repo_dir);
        // Make a static copy of the config. This can never be freed, but that's
        // OK in a test.
        let conf: &'static PageServerConf = Box::leak(Box::new(conf));

        let tenant_shard_id = TenantShardId::from_str("11000000000000000000000000000000").unwrap();
        let timeline_id = TimelineId::from_str("22000000000000000000000000000000").unwrap();
        fs::create_dir_all(conf.timeline_path(&tenant_shard_id, &timeline_id))?;

        let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);

        Ok((conf, tenant_shard_id, timeline_id, ctx))
    }

    #[tokio::test]
    async fn ephemeral_file_holds_gate_open() {
        const FOREVER: std::time::Duration = std::time::Duration::from_secs(5);

        let (conf, tenant_id, timeline_id, ctx) =
            harness("ephemeral_file_holds_gate_open").unwrap();

        let gate = utils::sync::gate::Gate::default();

        let file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &ctx)
            .await
            .unwrap();

        let mut closing = tokio::task::spawn(async move {
            gate.close().await;
        });

        // The gate is entered until the ephemeral file is dropped.
        // Do not start paused; tokio-epoll-uring has a sleep loop.
        tokio::time::pause();
        tokio::time::timeout(FOREVER, &mut closing)
            .await
            .expect_err("closing cannot complete before dropping");

        // This is a requirement of the reset_tenant functionality: we have to be able to restart a
        // tenant fast, and for that, we need all tenant_dir operations to be guarded by entering a gate.
        drop(file);

        tokio::time::timeout(FOREVER, &mut closing)
            .await
            .expect("closing completes right away")
            .expect("closing does not panic");
    }

    #[tokio::test]
    async fn test_ephemeral_file_basics() {
        let (conf, tenant_id, timeline_id, ctx) = harness("test_ephemeral_file_basics").unwrap();

        let gate = utils::sync::gate::Gate::default();

        let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &ctx)
            .await
            .unwrap();

        let mutable = file.buffered_writer.inspect_mutable();
        let cap = mutable.capacity();
        let align = mutable.align();

        let write_nbytes = cap * 2 + cap / 2;

        let content: Vec<u8> = rand::thread_rng()
            .sample_iter(rand::distributions::Standard)
            .take(write_nbytes)
            .collect();

        let mut value_offsets = Vec::new();
        for range in (0..write_nbytes)
            .step_by(align)
            .map(|start| start..(start + align).min(write_nbytes))
        {
            let off = file.write_raw(&content[range], &ctx).await.unwrap();
            value_offsets.push(off);
        }

        assert_eq!(file.len() as usize, write_nbytes);
        for (i, range) in (0..write_nbytes)
            .step_by(align)
            .map(|start| start..(start + align).min(write_nbytes))
            .enumerate()
        {
            assert_eq!(value_offsets[i], range.start.into_u64());
            let buf = IoBufferMut::with_capacity(range.len());
            let (buf_slice, nread) = file
                .read_exact_at_eof_ok(range.start.into_u64(), buf.slice_full(), &ctx)
                .await
                .unwrap();
            let buf = buf_slice.into_inner();
            assert_eq!(nread, range.len());
            assert_eq!(&buf, &content[range]);
        }

        let file_contents = std::fs::read(file.buffered_writer.as_inner().path()).unwrap();
        assert!(file_contents == content[0..cap * 2]);

        let maybe_flushed_buffer_contents = file.buffered_writer.inspect_maybe_flushed().unwrap();
        assert_eq!(&maybe_flushed_buffer_contents[..], &content[cap..cap * 2]);

        let mutable_buffer_contents = file.buffered_writer.inspect_mutable();
        assert_eq!(mutable_buffer_contents, &content[cap * 2..write_nbytes]);
    }

    #[tokio::test]
    async fn test_flushes_do_happen() {
        let (conf, tenant_id, timeline_id, ctx) = harness("test_flushes_do_happen").unwrap();

        let gate = utils::sync::gate::Gate::default();

        let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &ctx)
            .await
            .unwrap();

        // mutable buffer and maybe_flushed buffer each has `cap` bytes.
        let cap = file.buffered_writer.inspect_mutable().capacity();

        let content: Vec<u8> = rand::thread_rng()
            .sample_iter(rand::distributions::Standard)
            .take(cap * 2 + cap / 2)
            .collect();

        file.write_raw(&content, &ctx).await.unwrap();

        // assert the state is as this test expects it to be
        assert_eq!(
            &file.load_to_io_buf(&ctx).await.unwrap(),
            &content[0..cap * 2 + cap / 2]
        );
        let md = file.buffered_writer.as_inner().path().metadata().unwrap();
        assert_eq!(
            md.len(),
            2 * cap.into_u64(),
            "buffered writer requires one write to be flushed if we write 2.5x buffer capacity"
        );
        assert_eq!(
            &file.buffered_writer.inspect_maybe_flushed().unwrap()[0..cap],
            &content[cap..cap * 2]
        );
        assert_eq!(
            &file.buffered_writer.inspect_mutable()[0..cap / 2],
            &content[cap * 2..cap * 2 + cap / 2]
        );
    }

    #[tokio::test]
    async fn test_read_split_across_file_and_buffer() {
        // This test exercises the logic on the read path that splits the logical read
        // into a read from the flushed part (= the file) and a copy from the buffered writer's buffers.
        //
        // This test builds on the assertions in test_flushes_do_happen.

        let (conf, tenant_id, timeline_id, ctx) =
            harness("test_read_split_across_file_and_buffer").unwrap();

        let gate = utils::sync::gate::Gate::default();

        let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &ctx)
            .await
            .unwrap();

        let mutable = file.buffered_writer.inspect_mutable();
        let cap = mutable.capacity();
        let align = mutable.align();
        let content: Vec<u8> = rand::thread_rng()
            .sample_iter(rand::distributions::Standard)
            .take(cap * 2 + cap / 2)
            .collect();

        let (_, control) = file.write_raw_controlled(&content, &ctx).await.unwrap();

        let test_read = |start: usize, len: usize| {
            let file = &file;
            let ctx = &ctx;
            let content = &content;
            async move {
                let (buf, nread) = file
                    .read_exact_at_eof_ok(
                        start.into_u64(),
                        IoBufferMut::with_capacity(len).slice_full(),
                        ctx,
                    )
                    .await
                    .unwrap();
                assert_eq!(nread, len);
                assert_eq!(&buf.into_inner(), &content[start..(start + len)]);
            }
        };

        let test_read_all_offset_combinations = || {
            async move {
                // completely within the file range
                test_read(align, align).await;
                // border onto edge of file
                test_read(cap - align, align).await;
                // read across file and buffer
                test_read(cap - align, 2 * align).await;
                // start at the beginning of the maybe_flushed buffer
                test_read(cap, align).await;
                // completely within maybe_flushed buffer
                test_read(cap + align, align).await;
                // border onto edge of maybe_flushed buffer
                test_read(cap * 2 - align, align).await;
                // read across maybe_flushed and mutable buffer
                test_read(cap * 2 - align, 2 * align).await;
                // read across three segments
                test_read(cap - align, cap + 2 * align).await;
                // completely within mutable buffer
                test_read(cap * 2 + align, align).await;
            }
        };

        assert!(align < cap, "test assumption");
        assert!(cap % align == 0);

        // test reads at different flush stages.
        let not_started = control.unwrap().into_not_started();
        test_read_all_offset_combinations().await;
        let in_progress = not_started.ready_to_flush();
        test_read_all_offset_combinations().await;
        in_progress.wait_until_flush_is_done().await;
        test_read_all_offset_combinations().await;
    }
}