Line data Source code
1 : //! Implementation of append-only file data structure
2 : //! used to keep in-memory layers spilled on disk.
3 :
4 : use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64};
5 : use crate::config::PageServerConf;
6 : use crate::context::RequestContext;
7 : use crate::page_cache;
8 : use crate::tenant::storage_layer::inmemory_layer::vectored_dio_read::File;
9 : use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAlignedMut;
10 : use crate::virtual_file::owned_buffers_io::slice::SliceMutExt;
11 : use crate::virtual_file::owned_buffers_io::util::size_tracking_writer;
12 : use crate::virtual_file::owned_buffers_io::write::Buffer;
13 : use crate::virtual_file::{self, owned_buffers_io, IoBufferMut, VirtualFile};
14 : use bytes::BytesMut;
15 : use camino::Utf8PathBuf;
16 : use num_traits::Num;
17 : use pageserver_api::shard::TenantShardId;
18 : use tokio_epoll_uring::{BoundedBuf, Slice};
19 : use tracing::error;
20 :
21 : use std::io;
22 : use std::sync::atomic::AtomicU64;
23 : use utils::id::TimelineId;
24 :
25 : pub struct EphemeralFile {
26 : _tenant_shard_id: TenantShardId,
27 : _timeline_id: TimelineId,
28 : page_cache_file_id: page_cache::FileId,
29 : bytes_written: u64,
30 : buffered_writer: owned_buffers_io::write::BufferedWriter<
31 : BytesMut,
32 : size_tracking_writer::Writer<VirtualFile>,
33 : >,
34 : /// Gate guard is held on as long as we need to do operations in the path (delete on drop)
35 : _gate_guard: utils::sync::gate::GateGuard,
36 : }
37 :
38 : const TAIL_SZ: usize = 64 * 1024;
39 :
40 : impl EphemeralFile {
41 1276 : pub async fn create(
42 1276 : conf: &PageServerConf,
43 1276 : tenant_shard_id: TenantShardId,
44 1276 : timeline_id: TimelineId,
45 1276 : gate_guard: utils::sync::gate::GateGuard,
46 1276 : ctx: &RequestContext,
47 1276 : ) -> Result<EphemeralFile, io::Error> {
48 : static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
49 1276 : let filename_disambiguator =
50 1276 : NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
51 1276 :
52 1276 : let filename = conf
53 1276 : .timeline_path(&tenant_shard_id, &timeline_id)
54 1276 : .join(Utf8PathBuf::from(format!(
55 1276 : "ephemeral-{filename_disambiguator}"
56 1276 : )));
57 :
58 1276 : let file = VirtualFile::open_with_options(
59 1276 : &filename,
60 1276 : virtual_file::OpenOptions::new()
61 1276 : .read(true)
62 1276 : .write(true)
63 1276 : .create(true),
64 1276 : ctx,
65 1276 : )
66 727 : .await?;
67 :
68 1276 : let page_cache_file_id = page_cache::next_file_id(); // XXX get rid, we're not page-caching anymore
69 1276 :
70 1276 : Ok(EphemeralFile {
71 1276 : _tenant_shard_id: tenant_shard_id,
72 1276 : _timeline_id: timeline_id,
73 1276 : page_cache_file_id,
74 1276 : bytes_written: 0,
75 1276 : buffered_writer: owned_buffers_io::write::BufferedWriter::new(
76 1276 : size_tracking_writer::Writer::new(file),
77 1276 : BytesMut::with_capacity(TAIL_SZ),
78 1276 : ),
79 1276 : _gate_guard: gate_guard,
80 1276 : })
81 1276 : }
82 : }
83 :
84 : impl Drop for EphemeralFile {
85 1154 : fn drop(&mut self) {
86 1154 : // unlink the file
87 1154 : // we are clear to do this, because we have entered a gate
88 1154 : let path = self.buffered_writer.as_inner().as_inner().path();
89 1154 : let res = std::fs::remove_file(path);
90 1154 : if let Err(e) = res {
91 2 : if e.kind() != std::io::ErrorKind::NotFound {
92 : // just never log the not found errors, we cannot do anything for them; on detach
93 : // the tenant directory is already gone.
94 : //
95 : // not found files might also be related to https://github.com/neondatabase/neon/issues/2442
96 0 : error!("could not remove ephemeral file '{path}': {e}");
97 2 : }
98 1152 : }
99 1154 : }
100 : }
101 :
102 : impl EphemeralFile {
103 9610644 : pub(crate) fn len(&self) -> u64 {
104 9610644 : self.bytes_written
105 9610644 : }
106 :
107 1268 : pub(crate) fn page_cache_file_id(&self) -> page_cache::FileId {
108 1268 : self.page_cache_file_id
109 1268 : }
110 :
111 970 : pub(crate) async fn load_to_io_buf(
112 970 : &self,
113 970 : ctx: &RequestContext,
114 970 : ) -> Result<IoBufferMut, io::Error> {
115 970 : let size = self.len().into_usize();
116 970 : let buf = IoBufferMut::with_capacity(size);
117 970 : let (slice, nread) = self.read_exact_at_eof_ok(0, buf.slice_full(), ctx).await?;
118 970 : assert_eq!(nread, size);
119 970 : let buf = slice.into_inner();
120 970 : assert_eq!(buf.len(), nread);
121 970 : assert_eq!(buf.capacity(), size, "we shouldn't be reallocating");
122 970 : Ok(buf)
123 970 : }
124 :
125 : /// Returns the offset at which the first byte of the input was written, for use
126 : /// in constructing indices over the written value.
127 : ///
128 : /// Panics if the write is short because there's no way we can recover from that.
129 : /// TODO: make upstack handle this as an error.
130 5000814 : pub(crate) async fn write_raw(
131 5000814 : &mut self,
132 5000814 : srcbuf: &[u8],
133 5000814 : ctx: &RequestContext,
134 5000814 : ) -> std::io::Result<u64> {
135 5000814 : let pos = self.bytes_written;
136 :
137 5000814 : let new_bytes_written = pos.checked_add(srcbuf.len().into_u64()).ok_or_else(|| {
138 0 : std::io::Error::new(
139 0 : std::io::ErrorKind::Other,
140 0 : format!(
141 0 : "write would grow EphemeralFile beyond u64::MAX: len={pos} writen={srcbuf_len}",
142 0 : srcbuf_len = srcbuf.len(),
143 0 : ),
144 0 : )
145 5000814 : })?;
146 :
147 : // Write the payload
148 5000814 : let nwritten = self
149 5000814 : .buffered_writer
150 5000814 : .write_buffered_borrowed(srcbuf, ctx)
151 3408 : .await?;
152 5000814 : assert_eq!(
153 5000814 : nwritten,
154 5000814 : srcbuf.len(),
155 0 : "buffered writer has no short writes"
156 : );
157 :
158 5000814 : self.bytes_written = new_bytes_written;
159 5000814 :
160 5000814 : Ok(pos)
161 5000814 : }
162 : }
163 :
164 : impl super::storage_layer::inmemory_layer::vectored_dio_read::File for EphemeralFile {
165 694796 : async fn read_exact_at_eof_ok<'a, 'b, B: IoBufAlignedMut + Send>(
166 694796 : &'b self,
167 694796 : start: u64,
168 694796 : dst: tokio_epoll_uring::Slice<B>,
169 694796 : ctx: &'a RequestContext,
170 694796 : ) -> std::io::Result<(tokio_epoll_uring::Slice<B>, usize)> {
171 694796 : let file_size_tracking_writer = self.buffered_writer.as_inner();
172 694796 : let flushed_offset = file_size_tracking_writer.bytes_written();
173 694796 :
174 694796 : let buffer = self.buffered_writer.inspect_buffer();
175 694796 : let buffered = &buffer[0..buffer.pending()];
176 694796 :
177 694796 : let dst_cap = dst.bytes_total().into_u64();
178 694796 : let end = {
179 : // saturating_add is correct here because the max file size is u64::MAX, so,
180 : // if start + dst.len() > u64::MAX, then we know it will be a short read
181 694796 : let mut end: u64 = start.saturating_add(dst_cap);
182 694796 : if end > self.bytes_written {
183 276850 : end = self.bytes_written;
184 417946 : }
185 694796 : end
186 : };
187 :
188 : // inclusive, exclusive
189 : #[derive(Debug)]
190 : struct Range<N>(N, N);
191 : impl<N: Num + Clone + Copy + PartialOrd + Ord> Range<N> {
192 2875133 : fn len(&self) -> N {
193 2875133 : if self.0 > self.1 {
194 1072424 : N::zero()
195 : } else {
196 1802709 : self.1 - self.0
197 : }
198 2875133 : }
199 : }
200 694796 : let written_range = Range(start, std::cmp::min(end, flushed_offset));
201 694796 : let buffered_range = Range(std::cmp::max(start, flushed_offset), end);
202 :
203 694796 : let dst = if written_range.len() > 0 {
204 301075 : let file: &VirtualFile = file_size_tracking_writer.as_inner();
205 301075 : let bounds = dst.bounds();
206 301075 : let slice = file
207 301075 : .read_exact_at(dst.slice(0..written_range.len().into_usize()), start, ctx)
208 152874 : .await?;
209 301075 : Slice::from_buf_bounds(Slice::into_inner(slice), bounds)
210 : } else {
211 393721 : dst
212 : };
213 :
214 694796 : let dst = if buffered_range.len() > 0 {
215 394822 : let offset_in_buffer = buffered_range
216 394822 : .0
217 394822 : .checked_sub(flushed_offset)
218 394822 : .unwrap()
219 394822 : .into_usize();
220 394822 : let to_copy =
221 394822 : &buffered[offset_in_buffer..(offset_in_buffer + buffered_range.len().into_usize())];
222 394822 : let bounds = dst.bounds();
223 394822 : let mut view = dst.slice({
224 394822 : let start = written_range.len().into_usize();
225 394822 : let end = start
226 394822 : .checked_add(buffered_range.len().into_usize())
227 394822 : .unwrap();
228 394822 : start..end
229 394822 : });
230 394822 : view.as_mut_rust_slice_full_zeroed()
231 394822 : .copy_from_slice(to_copy);
232 394822 : Slice::from_buf_bounds(Slice::into_inner(view), bounds)
233 : } else {
234 299974 : dst
235 : };
236 :
237 : // TODO: in debug mode, randomize the remaining bytes in `dst` to catch bugs
238 :
239 694796 : Ok((dst, (end - start).into_usize()))
240 694796 : }
241 : }
242 :
243 : /// Does the given filename look like an ephemeral file?
244 0 : pub fn is_ephemeral_file(filename: &str) -> bool {
245 0 : if let Some(rest) = filename.strip_prefix("ephemeral-") {
246 0 : rest.parse::<u32>().is_ok()
247 : } else {
248 0 : false
249 : }
250 0 : }
251 :
252 : #[cfg(test)]
253 : mod tests {
254 : use rand::Rng;
255 :
256 : use super::*;
257 : use crate::context::DownloadBehavior;
258 : use crate::task_mgr::TaskKind;
259 : use std::fs;
260 : use std::str::FromStr;
261 :
262 8 : fn harness(
263 8 : test_name: &str,
264 8 : ) -> Result<
265 8 : (
266 8 : &'static PageServerConf,
267 8 : TenantShardId,
268 8 : TimelineId,
269 8 : RequestContext,
270 8 : ),
271 8 : io::Error,
272 8 : > {
273 8 : let repo_dir = PageServerConf::test_repo_dir(test_name);
274 8 : let _ = fs::remove_dir_all(&repo_dir);
275 8 : let conf = PageServerConf::dummy_conf(repo_dir);
276 8 : // Make a static copy of the config. This can never be free'd, but that's
277 8 : // OK in a test.
278 8 : let conf: &'static PageServerConf = Box::leak(Box::new(conf));
279 8 :
280 8 : let tenant_shard_id = TenantShardId::from_str("11000000000000000000000000000000").unwrap();
281 8 : let timeline_id = TimelineId::from_str("22000000000000000000000000000000").unwrap();
282 8 : fs::create_dir_all(conf.timeline_path(&tenant_shard_id, &timeline_id))?;
283 :
284 8 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
285 8 :
286 8 : Ok((conf, tenant_shard_id, timeline_id, ctx))
287 8 : }
288 :
289 : #[tokio::test]
290 2 : async fn ephemeral_file_holds_gate_open() {
291 2 : const FOREVER: std::time::Duration = std::time::Duration::from_secs(5);
292 2 :
293 2 : let (conf, tenant_id, timeline_id, ctx) =
294 2 : harness("ephemeral_file_holds_gate_open").unwrap();
295 2 :
296 2 : let gate = utils::sync::gate::Gate::default();
297 2 :
298 2 : let file = EphemeralFile::create(conf, tenant_id, timeline_id, gate.enter().unwrap(), &ctx)
299 2 : .await
300 2 : .unwrap();
301 2 :
302 2 : let mut closing = tokio::task::spawn(async move {
303 4 : gate.close().await;
304 2 : });
305 2 :
306 2 : // gate is entered until the ephemeral file is dropped
307 2 : // do not start paused tokio-epoll-uring has a sleep loop
308 2 : tokio::time::pause();
309 2 : tokio::time::timeout(FOREVER, &mut closing)
310 2 : .await
311 2 : .expect_err("closing cannot complete before dropping");
312 2 :
313 2 : // this is a requirement of the reset_tenant functionality: we have to be able to restart a
314 2 : // tenant fast, and for that, we need all tenant_dir operations be guarded by entering a gate
315 2 : drop(file);
316 2 :
317 2 : tokio::time::timeout(FOREVER, &mut closing)
318 2 : .await
319 2 : .expect("closing completes right away")
320 2 : .expect("closing does not panic");
321 2 : }
322 :
323 : #[tokio::test]
324 2 : async fn test_ephemeral_file_basics() {
325 2 : let (conf, tenant_id, timeline_id, ctx) = harness("test_ephemeral_file_basics").unwrap();
326 2 :
327 2 : let gate = utils::sync::gate::Gate::default();
328 2 :
329 2 : let mut file =
330 2 : EphemeralFile::create(conf, tenant_id, timeline_id, gate.enter().unwrap(), &ctx)
331 2 : .await
332 2 : .unwrap();
333 2 :
334 2 : let cap = file.buffered_writer.inspect_buffer().capacity();
335 2 :
336 2 : let write_nbytes = cap + cap / 2;
337 2 :
338 2 : let content: Vec<u8> = rand::thread_rng()
339 2 : .sample_iter(rand::distributions::Standard)
340 2 : .take(write_nbytes)
341 2 : .collect();
342 2 :
343 2 : let mut value_offsets = Vec::new();
344 196608 : for i in 0..write_nbytes {
345 196608 : let off = file.write_raw(&content[i..i + 1], &ctx).await.unwrap();
346 196608 : value_offsets.push(off);
347 2 : }
348 2 :
349 2 : assert!(file.len() as usize == write_nbytes);
350 196608 : for i in 0..write_nbytes {
351 196608 : assert_eq!(value_offsets[i], i.into_u64());
352 196608 : let buf = IoBufferMut::with_capacity(1);
353 196608 : let (buf_slice, nread) = file
354 196608 : .read_exact_at_eof_ok(i.into_u64(), buf.slice_full(), &ctx)
355 66560 : .await
356 196608 : .unwrap();
357 196608 : let buf = buf_slice.into_inner();
358 196608 : assert_eq!(nread, 1);
359 196608 : assert_eq!(&buf, &content[i..i + 1]);
360 2 : }
361 2 :
362 2 : let file_contents =
363 2 : std::fs::read(file.buffered_writer.as_inner().as_inner().path()).unwrap();
364 2 : assert_eq!(file_contents, &content[0..cap]);
365 2 :
366 2 : let buffer_contents = file.buffered_writer.inspect_buffer();
367 2 : assert_eq!(buffer_contents, &content[cap..write_nbytes]);
368 2 : }
369 :
370 : #[tokio::test]
371 2 : async fn test_flushes_do_happen() {
372 2 : let (conf, tenant_id, timeline_id, ctx) = harness("test_flushes_do_happen").unwrap();
373 2 :
374 2 : let gate = utils::sync::gate::Gate::default();
375 2 :
376 2 : let mut file =
377 2 : EphemeralFile::create(conf, tenant_id, timeline_id, gate.enter().unwrap(), &ctx)
378 2 : .await
379 2 : .unwrap();
380 2 :
381 2 : let cap = file.buffered_writer.inspect_buffer().capacity();
382 2 :
383 2 : let content: Vec<u8> = rand::thread_rng()
384 2 : .sample_iter(rand::distributions::Standard)
385 2 : .take(cap + cap / 2)
386 2 : .collect();
387 2 :
388 2 : file.write_raw(&content, &ctx).await.unwrap();
389 2 :
390 2 : // assert the state is as this test expects it to be
391 2 : assert_eq!(
392 2 : &file.load_to_io_buf(&ctx).await.unwrap(),
393 2 : &content[0..cap + cap / 2]
394 2 : );
395 2 : let md = file
396 2 : .buffered_writer
397 2 : .as_inner()
398 2 : .as_inner()
399 2 : .path()
400 2 : .metadata()
401 2 : .unwrap();
402 2 : assert_eq!(
403 2 : md.len(),
404 2 : cap.into_u64(),
405 2 : "buffered writer does one write if we write 1.5x buffer capacity"
406 2 : );
407 2 : assert_eq!(
408 2 : &file.buffered_writer.inspect_buffer()[0..cap / 2],
409 2 : &content[cap..cap + cap / 2]
410 2 : );
411 2 : }
412 :
413 : #[tokio::test]
414 2 : async fn test_read_split_across_file_and_buffer() {
415 2 : // This test exercises the logic on the read path that splits the logical read
416 2 : // into a read from the flushed part (= the file) and a copy from the buffered writer's buffer.
417 2 : //
418 2 : // This test build on the assertions in test_flushes_do_happen
419 2 :
420 2 : let (conf, tenant_id, timeline_id, ctx) =
421 2 : harness("test_read_split_across_file_and_buffer").unwrap();
422 2 :
423 2 : let gate = utils::sync::gate::Gate::default();
424 2 :
425 2 : let mut file =
426 2 : EphemeralFile::create(conf, tenant_id, timeline_id, gate.enter().unwrap(), &ctx)
427 2 : .await
428 2 : .unwrap();
429 2 :
430 2 : let cap = file.buffered_writer.inspect_buffer().capacity();
431 2 :
432 2 : let content: Vec<u8> = rand::thread_rng()
433 2 : .sample_iter(rand::distributions::Standard)
434 2 : .take(cap + cap / 2)
435 2 : .collect();
436 2 :
437 2 : file.write_raw(&content, &ctx).await.unwrap();
438 2 :
439 10 : let test_read = |start: usize, len: usize| {
440 10 : let file = &file;
441 10 : let ctx = &ctx;
442 10 : let content = &content;
443 10 : async move {
444 10 : let (buf, nread) = file
445 10 : .read_exact_at_eof_ok(
446 10 : start.into_u64(),
447 10 : IoBufferMut::with_capacity(len).slice_full(),
448 10 : ctx,
449 10 : )
450 3 : .await
451 10 : .unwrap();
452 10 : assert_eq!(nread, len);
453 10 : assert_eq!(&buf.into_inner(), &content[start..(start + len)]);
454 10 : }
455 10 : };
456 2 :
457 2 : // completely within the file range
458 2 : assert!(20 < cap, "test assumption");
459 2 : test_read(10, 10).await;
460 2 : // border onto edge of file
461 2 : test_read(cap - 10, 10).await;
462 2 : // read across file and buffer
463 2 : test_read(cap - 10, 20).await;
464 2 : // stay from start of buffer
465 2 : test_read(cap, 10).await;
466 2 : // completely within buffer
467 2 : test_read(cap + 10, 10).await;
468 2 : }
469 : }
|