Line data Source code
1 : //! Implementation of append-only file data structure
2 : //! used to keep in-memory layers spilled on disk.
3 :
4 : use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64};
5 : use crate::config::PageServerConf;
6 : use crate::context::RequestContext;
7 : use crate::page_cache;
8 : use crate::tenant::storage_layer::inmemory_layer::vectored_dio_read::File;
9 : use crate::virtual_file::owned_buffers_io::slice::SliceMutExt;
10 : use crate::virtual_file::owned_buffers_io::util::size_tracking_writer;
11 : use crate::virtual_file::owned_buffers_io::write::Buffer;
12 : use crate::virtual_file::{self, owned_buffers_io, VirtualFile};
13 : use bytes::BytesMut;
14 : use camino::Utf8PathBuf;
15 : use num_traits::Num;
16 : use pageserver_api::shard::TenantShardId;
17 : use tokio_epoll_uring::{BoundedBuf, Slice};
18 : use tracing::error;
19 :
20 : use std::io;
21 : use std::sync::atomic::AtomicU64;
22 : use utils::id::TimelineId;
23 :
/// An append-only file used to keep in-memory layers spilled on disk.
///
/// Writes go through a buffered writer; bytes that have not been flushed yet
/// live in the writer's buffer and are served from there by the read path.
/// The backing file is unlinked when the value is dropped.
pub struct EphemeralFile {
    _tenant_shard_id: TenantShardId,
    _timeline_id: TimelineId,
    /// Id obtained from `page_cache::next_file_id()` at creation time.
    page_cache_file_id: page_cache::FileId,
    /// Logical length of the file: bytes already flushed plus bytes still pending
    /// in the write buffer.
    bytes_written: u64,
    buffered_writer: owned_buffers_io::write::BufferedWriter<
        BytesMut,
        size_tracking_writer::Writer<VirtualFile>,
    >,
    /// Gate guard is held on as long as we need to do operations in the path (delete on drop)
    // NOTE: fields are dropped in declaration order (after `Drop::drop` has run), so
    // keeping this field last means the gate is released only after `buffered_writer`
    // (and the file it owns) has been dropped. Do not reorder.
    _gate_guard: utils::sync::gate::GateGuard,
}

/// Capacity, in bytes, of the write buffer allocated by `EphemeralFile::create`.
const TAIL_SZ: usize = 64 * 1024;
38 :
39 : impl EphemeralFile {
40 3840 : pub async fn create(
41 3840 : conf: &PageServerConf,
42 3840 : tenant_shard_id: TenantShardId,
43 3840 : timeline_id: TimelineId,
44 3840 : gate_guard: utils::sync::gate::GateGuard,
45 3840 : ctx: &RequestContext,
46 3840 : ) -> Result<EphemeralFile, io::Error> {
47 : static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
48 3840 : let filename_disambiguator =
49 3840 : NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
50 3840 :
51 3840 : let filename = conf
52 3840 : .timeline_path(&tenant_shard_id, &timeline_id)
53 3840 : .join(Utf8PathBuf::from(format!(
54 3840 : "ephemeral-{filename_disambiguator}"
55 3840 : )));
56 :
57 3840 : let file = VirtualFile::open_with_options(
58 3840 : &filename,
59 3840 : virtual_file::OpenOptions::new()
60 3840 : .read(true)
61 3840 : .write(true)
62 3840 : .create(true),
63 3840 : ctx,
64 3840 : )
65 2184 : .await?;
66 :
67 3840 : let page_cache_file_id = page_cache::next_file_id(); // XXX get rid, we're not page-caching anymore
68 3840 :
69 3840 : Ok(EphemeralFile {
70 3840 : _tenant_shard_id: tenant_shard_id,
71 3840 : _timeline_id: timeline_id,
72 3840 : page_cache_file_id,
73 3840 : bytes_written: 0,
74 3840 : buffered_writer: owned_buffers_io::write::BufferedWriter::new(
75 3840 : size_tracking_writer::Writer::new(file),
76 3840 : BytesMut::with_capacity(TAIL_SZ),
77 3840 : ),
78 3840 : _gate_guard: gate_guard,
79 3840 : })
80 3840 : }
81 : }
82 :
83 : impl Drop for EphemeralFile {
84 3456 : fn drop(&mut self) {
85 3456 : // unlink the file
86 3456 : // we are clear to do this, because we have entered a gate
87 3456 : let path = &self.buffered_writer.as_inner().as_inner().path;
88 3456 : let res = std::fs::remove_file(path);
89 3456 : if let Err(e) = res {
90 6 : if e.kind() != std::io::ErrorKind::NotFound {
91 : // just never log the not found errors, we cannot do anything for them; on detach
92 : // the tenant directory is already gone.
93 : //
94 : // not found files might also be related to https://github.com/neondatabase/neon/issues/2442
95 0 : error!("could not remove ephemeral file '{path}': {e}");
96 6 : }
97 3450 : }
98 3456 : }
99 : }
100 :
101 : impl EphemeralFile {
102 28832028 : pub(crate) fn len(&self) -> u64 {
103 28832028 : self.bytes_written
104 28832028 : }
105 :
106 3816 : pub(crate) fn page_cache_file_id(&self) -> page_cache::FileId {
107 3816 : self.page_cache_file_id
108 3816 : }
109 :
110 2910 : pub(crate) async fn load_to_vec(&self, ctx: &RequestContext) -> Result<Vec<u8>, io::Error> {
111 2910 : let size = self.len().into_usize();
112 2910 : let vec = Vec::with_capacity(size);
113 2910 : let (slice, nread) = self.read_exact_at_eof_ok(0, vec.slice_full(), ctx).await?;
114 2910 : assert_eq!(nread, size);
115 2910 : let vec = slice.into_inner();
116 2910 : assert_eq!(vec.len(), nread);
117 2910 : assert_eq!(vec.capacity(), size, "we shouldn't be reallocating");
118 2910 : Ok(vec)
119 2910 : }
120 :
121 : /// Returns the offset at which the first byte of the input was written, for use
122 : /// in constructing indices over the written value.
123 : ///
124 : /// Panics if the write is short because there's no way we can recover from that.
125 : /// TODO: make upstack handle this as an error.
126 15002484 : pub(crate) async fn write_raw(
127 15002484 : &mut self,
128 15002484 : srcbuf: &[u8],
129 15002484 : ctx: &RequestContext,
130 15002484 : ) -> std::io::Result<u64> {
131 15002484 : let pos = self.bytes_written;
132 :
133 15002484 : let new_bytes_written = pos.checked_add(srcbuf.len().into_u64()).ok_or_else(|| {
134 0 : std::io::Error::new(
135 0 : std::io::ErrorKind::Other,
136 0 : format!(
137 0 : "write would grow EphemeralFile beyond u64::MAX: len={pos} writen={srcbuf_len}",
138 0 : srcbuf_len = srcbuf.len(),
139 0 : ),
140 0 : )
141 15002484 : })?;
142 :
143 : // Write the payload
144 15002484 : let nwritten = self
145 15002484 : .buffered_writer
146 15002484 : .write_buffered_borrowed(srcbuf, ctx)
147 10544 : .await?;
148 15002484 : assert_eq!(
149 15002484 : nwritten,
150 15002484 : srcbuf.len(),
151 0 : "buffered writer has no short writes"
152 : );
153 :
154 15002484 : self.bytes_written = new_bytes_written;
155 15002484 :
156 15002484 : Ok(pos)
157 15002484 : }
158 : }
159 :
160 : impl super::storage_layer::inmemory_layer::vectored_dio_read::File for EphemeralFile {
161 2082949 : async fn read_exact_at_eof_ok<'a, 'b, B: tokio_epoll_uring::IoBufMut + Send>(
162 2082949 : &'b self,
163 2082949 : start: u64,
164 2082949 : dst: tokio_epoll_uring::Slice<B>,
165 2082949 : ctx: &'a RequestContext,
166 2082949 : ) -> std::io::Result<(tokio_epoll_uring::Slice<B>, usize)> {
167 2082949 : let file_size_tracking_writer = self.buffered_writer.as_inner();
168 2082949 : let flushed_offset = file_size_tracking_writer.bytes_written();
169 2082949 :
170 2082949 : let buffer = self.buffered_writer.inspect_buffer();
171 2082949 : let buffered = &buffer[0..buffer.pending()];
172 2082949 :
173 2082949 : let dst_cap = dst.bytes_total().into_u64();
174 2082949 : let end = {
175 : // saturating_add is correct here because the max file size is u64::MAX, so,
176 : // if start + dst.len() > u64::MAX, then we know it will be a short read
177 2082949 : let mut end: u64 = start.saturating_add(dst_cap);
178 2082949 : if end > self.bytes_written {
179 830523 : end = self.bytes_written;
180 1252426 : }
181 2082949 : end
182 : };
183 :
184 : // inclusive, exclusive
185 : #[derive(Debug)]
186 : struct Range<N>(N, N);
187 : impl<N: Num + Clone + Copy + PartialOrd + Ord> Range<N> {
188 8620919 : fn len(&self) -> N {
189 8620919 : if self.0 > self.1 {
190 3215775 : N::zero()
191 : } else {
192 5405144 : self.1 - self.0
193 : }
194 8620919 : }
195 : }
196 2082949 : let written_range = Range(start, std::cmp::min(end, flushed_offset));
197 2082949 : let buffered_range = Range(std::cmp::max(start, flushed_offset), end);
198 :
199 2082949 : let dst = if written_range.len() > 0 {
200 901836 : let file: &VirtualFile = file_size_tracking_writer.as_inner();
201 901836 : let bounds = dst.bounds();
202 901836 : let slice = file
203 901836 : .read_exact_at(dst.slice(0..written_range.len().into_usize()), start, ctx)
204 457651 : .await?;
205 901836 : Slice::from_buf_bounds(Slice::into_inner(slice), bounds)
206 : } else {
207 1181113 : dst
208 : };
209 :
210 2082949 : let dst = if buffered_range.len() > 0 {
211 1184395 : let offset_in_buffer = buffered_range
212 1184395 : .0
213 1184395 : .checked_sub(flushed_offset)
214 1184395 : .unwrap()
215 1184395 : .into_usize();
216 1184395 : let to_copy =
217 1184395 : &buffered[offset_in_buffer..(offset_in_buffer + buffered_range.len().into_usize())];
218 1184395 : let bounds = dst.bounds();
219 1184395 : let mut view = dst.slice({
220 1184395 : let start = written_range.len().into_usize();
221 1184395 : let end = start
222 1184395 : .checked_add(buffered_range.len().into_usize())
223 1184395 : .unwrap();
224 1184395 : start..end
225 1184395 : });
226 1184395 : view.as_mut_rust_slice_full_zeroed()
227 1184395 : .copy_from_slice(to_copy);
228 1184395 : Slice::from_buf_bounds(Slice::into_inner(view), bounds)
229 : } else {
230 898554 : dst
231 : };
232 :
233 : // TODO: in debug mode, randomize the remaining bytes in `dst` to catch bugs
234 :
235 2082949 : Ok((dst, (end - start).into_usize()))
236 2082949 : }
237 : }
238 :
/// Does the given filename look like an ephemeral file?
///
/// Matches names of the form `ephemeral-<N>` as produced by `EphemeralFile::create`,
/// where `<N>` is a decimal number. The counter that generates `<N>` is an
/// `AtomicU64`, so the full `u64` range must be accepted, not just `u32`.
pub fn is_ephemeral_file(filename: &str) -> bool {
    match filename.strip_prefix("ephemeral-") {
        Some(rest) => rest.parse::<u64>().is_ok(),
        None => false,
    }
}
247 :
#[cfg(test)]
mod tests {
    use rand::Rng;

    use super::*;
    use crate::context::DownloadBehavior;
    use crate::task_mgr::TaskKind;
    use std::fs;
    use std::str::FromStr;

    /// Prepares a per-test repo directory and returns a leaked config, fixed
    /// tenant/timeline ids (whose timeline directory has been created), and a
    /// unit-test `RequestContext`.
    fn harness(
        test_name: &str,
    ) -> Result<
        (
            &'static PageServerConf,
            TenantShardId,
            TimelineId,
            RequestContext,
        ),
        io::Error,
    > {
        let repo_dir = PageServerConf::test_repo_dir(test_name);
        // Start from a clean directory; errors (e.g. it does not exist yet) are ignored.
        let _ = fs::remove_dir_all(&repo_dir);
        let conf = PageServerConf::dummy_conf(repo_dir);
        // Make a static copy of the config. This can never be free'd, but that's
        // OK in a test.
        let conf: &'static PageServerConf = Box::leak(Box::new(conf));

        let tenant_shard_id = TenantShardId::from_str("11000000000000000000000000000000").unwrap();
        let timeline_id = TimelineId::from_str("22000000000000000000000000000000").unwrap();
        fs::create_dir_all(conf.timeline_path(&tenant_shard_id, &timeline_id))?;

        let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);

        Ok((conf, tenant_shard_id, timeline_id, ctx))
    }

    /// The gate cannot finish closing while an `EphemeralFile` holding one of its
    /// guards is alive, and finishes once the file is dropped.
    #[tokio::test]
    async fn ephemeral_file_holds_gate_open() {
        const FOREVER: std::time::Duration = std::time::Duration::from_secs(5);

        let (conf, tenant_id, timeline_id, ctx) =
            harness("ephemeral_file_holds_gate_open").unwrap();

        let gate = utils::sync::gate::Gate::default();

        let file = EphemeralFile::create(conf, tenant_id, timeline_id, gate.enter().unwrap(), &ctx)
            .await
            .unwrap();

        let mut closing = tokio::task::spawn(async move {
            gate.close().await;
        });

        // gate is entered until the ephemeral file is dropped
        // do not start paused tokio-epoll-uring has a sleep loop
        // (i.e. the clock is paused only here, after the file has been created)
        tokio::time::pause();
        tokio::time::timeout(FOREVER, &mut closing)
            .await
            .expect_err("closing cannot complete before dropping");

        // this is a requirement of the reset_tenant functionality: we have to be able to restart a
        // tenant fast, and for that, we need all tenant_dir operations be guarded by entering a gate
        drop(file);

        tokio::time::timeout(FOREVER, &mut closing)
            .await
            .expect("closing completes right away")
            .expect("closing does not panic");
    }

    /// Writes 1.5x the buffer capacity one byte at a time, checks the returned
    /// offsets and reads every byte back, then checks that exactly the first
    /// `cap` bytes are on disk and the rest is still in the buffer.
    #[tokio::test]
    async fn test_ephemeral_file_basics() {
        let (conf, tenant_id, timeline_id, ctx) = harness("test_ephemeral_file_basics").unwrap();

        let gate = utils::sync::gate::Gate::default();

        let mut file =
            EphemeralFile::create(conf, tenant_id, timeline_id, gate.enter().unwrap(), &ctx)
                .await
                .unwrap();

        let cap = file.buffered_writer.inspect_buffer().capacity();

        let write_nbytes = cap + cap / 2;

        let content: Vec<u8> = rand::thread_rng()
            .sample_iter(rand::distributions::Standard)
            .take(write_nbytes)
            .collect();

        let mut value_offsets = Vec::new();
        for i in 0..write_nbytes {
            let off = file.write_raw(&content[i..i + 1], &ctx).await.unwrap();
            value_offsets.push(off);
        }

        assert!(file.len() as usize == write_nbytes);
        for i in 0..write_nbytes {
            assert_eq!(value_offsets[i], i.into_u64());
            let buf = Vec::with_capacity(1);
            let (buf_slice, nread) = file
                .read_exact_at_eof_ok(i.into_u64(), buf.slice_full(), &ctx)
                .await
                .unwrap();
            let buf = buf_slice.into_inner();
            assert_eq!(nread, 1);
            assert_eq!(&buf, &content[i..i + 1]);
        }

        let file_contents =
            std::fs::read(&file.buffered_writer.as_inner().as_inner().path).unwrap();
        assert_eq!(file_contents, &content[0..cap]);

        let buffer_contents = file.buffered_writer.inspect_buffer();
        assert_eq!(buffer_contents, &content[cap..write_nbytes]);
    }

    /// A single write of 1.5x the buffer capacity results in exactly one flush of
    /// `cap` bytes to disk, with the remaining half-buffer still pending.
    #[tokio::test]
    async fn test_flushes_do_happen() {
        let (conf, tenant_id, timeline_id, ctx) = harness("test_flushes_do_happen").unwrap();

        let gate = utils::sync::gate::Gate::default();

        let mut file =
            EphemeralFile::create(conf, tenant_id, timeline_id, gate.enter().unwrap(), &ctx)
                .await
                .unwrap();

        let cap = file.buffered_writer.inspect_buffer().capacity();

        let content: Vec<u8> = rand::thread_rng()
            .sample_iter(rand::distributions::Standard)
            .take(cap + cap / 2)
            .collect();

        file.write_raw(&content, &ctx).await.unwrap();

        // assert the state is as this test expects it to be
        assert_eq!(
            &file.load_to_vec(&ctx).await.unwrap(),
            &content[0..cap + cap / 2]
        );
        let md = file
            .buffered_writer
            .as_inner()
            .as_inner()
            .path
            .metadata()
            .unwrap();
        assert_eq!(
            md.len(),
            cap.into_u64(),
            "buffered writer does one write if we write 1.5x buffer capacity"
        );
        assert_eq!(
            &file.buffered_writer.inspect_buffer()[0..cap / 2],
            &content[cap..cap + cap / 2]
        );
    }

    #[tokio::test]
    async fn test_read_split_across_file_and_buffer() {
        // This test exercises the logic on the read path that splits the logical read
        // into a read from the flushed part (= the file) and a copy from the buffered writer's buffer.
        //
        // This test builds on the assertions in test_flushes_do_happen

        let (conf, tenant_id, timeline_id, ctx) =
            harness("test_read_split_across_file_and_buffer").unwrap();

        let gate = utils::sync::gate::Gate::default();

        let mut file =
            EphemeralFile::create(conf, tenant_id, timeline_id, gate.enter().unwrap(), &ctx)
                .await
                .unwrap();

        let cap = file.buffered_writer.inspect_buffer().capacity();

        let content: Vec<u8> = rand::thread_rng()
            .sample_iter(rand::distributions::Standard)
            .take(cap + cap / 2)
            .collect();

        file.write_raw(&content, &ctx).await.unwrap();

        // Reads `len` bytes at `start` and checks them against `content`.
        let test_read = |start: usize, len: usize| {
            let file = &file;
            let ctx = &ctx;
            let content = &content;
            async move {
                let (buf, nread) = file
                    .read_exact_at_eof_ok(
                        start.into_u64(),
                        Vec::with_capacity(len).slice_full(),
                        ctx,
                    )
                    .await
                    .unwrap();
                assert_eq!(nread, len);
                assert_eq!(&buf.into_inner(), &content[start..(start + len)]);
            }
        };

        // completely within the file range
        assert!(20 < cap, "test assumption");
        test_read(10, 10).await;
        // border onto edge of file
        test_read(cap - 10, 10).await;
        // read across file and buffer
        test_read(cap - 10, 20).await;
        // start exactly at the beginning of the buffer
        test_read(cap, 10).await;
        // completely within buffer
        test_read(cap + 10, 10).await;
    }
}
|