Line data Source code
1 : //! VirtualFile is like a normal File, but it's not bound directly to
2 : //! a file descriptor.
3 : //!
4 : //! Instead, the file is opened when it's read from,
5 : //! and if too many files are open globally in the system, least-recently
6 : //! used ones are closed.
7 : //!
8 : //! To track which files have been recently used, we use the clock algorithm
9 : //! with a 'recently_used' flag on each slot.
10 : //!
11 : //! This is similar to PostgreSQL's virtual file descriptor facility in
12 : //! src/backend/storage/file/fd.c
13 : //!
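As a minimal illustration of the clock sweep described above (a self-contained sketch with hypothetical types, not the actual `OpenFiles` implementation further down):

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

// Hypothetical slot type: only the `recently_used` flag matters for the sweep.
struct Slot {
    recently_used: AtomicBool,
}

/// One clock sweep: clear `recently_used` flags as the arm passes over them;
/// the first slot whose flag was already false becomes the eviction victim.
fn find_victim(slots: &[Slot], clock_arm: &AtomicUsize) -> usize {
    loop {
        let i = clock_arm.fetch_add(1, Ordering::AcqRel) % slots.len();
        // `swap(false)` returns the previous value, so a slot that was not
        // used since the last sweep is selected.
        if !slots[i].recently_used.swap(false, Ordering::AcqRel) {
            return i;
        }
    }
}
```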
14 : use std::fs::File;
15 : use std::io::{Error, ErrorKind};
16 : use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
17 : #[cfg(target_os = "linux")]
18 : use std::os::unix::fs::OpenOptionsExt;
19 : use std::sync::LazyLock;
20 : use std::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering};
21 :
22 : use camino::{Utf8Path, Utf8PathBuf};
23 : use once_cell::sync::OnceCell;
24 : use owned_buffers_io::aligned_buffer::buffer::AlignedBuffer;
25 : use owned_buffers_io::aligned_buffer::{AlignedBufferMut, AlignedSlice, ConstAlign};
26 : use owned_buffers_io::io_buf_aligned::{IoBufAligned, IoBufAlignedMut};
27 : use owned_buffers_io::io_buf_ext::FullSlice;
28 : use pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
29 : use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
30 : use tokio::time::Instant;
31 : use tokio_epoll_uring::{BoundedBuf, IoBuf, IoBufMut, Slice};
32 :
33 : use self::owned_buffers_io::write::OwnedAsyncWriter;
34 : use crate::assert_u64_eq_usize::UsizeIsU64;
35 : use crate::context::RequestContext;
36 : use crate::metrics::{STORAGE_IO_TIME_METRIC, StorageIoOperation};
37 : use crate::page_cache::{PAGE_SZ, PageWriteGuard};
38 :
39 : pub(crate) use api::IoMode;
40 : pub(crate) use io_engine::IoEngineKind;
41 : pub use io_engine::{
42 : FeatureTestResult as IoEngineFeatureTestResult, feature_test as io_engine_feature_test,
43 : io_engine_for_bench,
44 : };
45 : pub(crate) use metadata::Metadata;
46 : pub(crate) use open_options::*;
47 : pub use pageserver_api::models::virtual_file as api;
48 : pub use temporary::TempVirtualFile;
49 :
50 : pub(crate) mod io_engine;
51 : mod metadata;
52 : mod open_options;
53 : mod temporary;
54 : pub(crate) mod owned_buffers_io {
55 : //! Abstractions for IO with owned buffers.
56 : //!
57 : //! Not actually tied to [`crate::virtual_file`] specifically, but it's the primary
58 : //! reason we need this abstraction.
59 : //!
60 : //! Over time, this could move into the `tokio-epoll-uring` crate, maybe `uring-common`,
61 : //! but for the time being we're proving out the primitives in the neon.git repo
62 : //! for faster iteration.
63 :
64 : pub(crate) mod aligned_buffer;
65 : pub(crate) mod io_buf_aligned;
66 : pub(crate) mod io_buf_ext;
67 : pub(crate) mod slice;
68 : pub(crate) mod write;
69 : }
70 :
71 : #[derive(Debug)]
72 : pub struct VirtualFile {
73 : inner: VirtualFileInner,
74 : _mode: IoMode,
75 : }
76 :
77 : impl VirtualFile {
78 : /// Open a file in read-only mode. Like File::open.
79 6096 : pub async fn open<P: AsRef<Utf8Path>>(
80 6096 : path: P,
81 6096 : ctx: &RequestContext,
82 6096 : ) -> Result<Self, std::io::Error> {
83 6096 : let inner = VirtualFileInner::open(path, ctx).await?;
84 6096 : Ok(VirtualFile {
85 6096 : inner,
86 6096 : _mode: IoMode::Buffered,
87 6096 : })
88 6096 : }
89 :
90 : /// Open a file in read-only mode. Like File::open.
91 : ///
92 :     /// `O_DIRECT` will be enabled based on `virtual_file_io_mode`.
93 7656 : pub async fn open_v2<P: AsRef<Utf8Path>>(
94 7656 : path: P,
95 7656 : ctx: &RequestContext,
96 7656 : ) -> Result<Self, std::io::Error> {
97 7656 : Self::open_with_options_v2(path.as_ref(), OpenOptions::new().read(true), ctx).await
98 7656 : }
99 :
100 29676 : pub async fn open_with_options_v2<P: AsRef<Utf8Path>>(
101 29676 : path: P,
102 29676 : open_options: &OpenOptions,
103 29676 : ctx: &RequestContext,
104 29676 : ) -> Result<Self, std::io::Error> {
105 29676 : let mode = get_io_mode();
106 29676 : let set_o_direct = match (mode, open_options.is_write()) {
107 29676 : (IoMode::Buffered, _) => false,
108 : #[cfg(target_os = "linux")]
109 0 : (IoMode::Direct, false) => true,
110 : #[cfg(target_os = "linux")]
111 0 : (IoMode::Direct, true) => false,
112 : #[cfg(target_os = "linux")]
113 0 : (IoMode::DirectRw, _) => true,
114 : };
115 29676 : let open_options = open_options.clone();
116 29676 : let open_options = if set_o_direct {
117 : #[cfg(target_os = "linux")]
118 : {
119 0 : let mut open_options = open_options;
120 0 : open_options.custom_flags(nix::libc::O_DIRECT);
121 0 : open_options
122 : }
123 : #[cfg(not(target_os = "linux"))]
124 : unreachable!(
125 :                 "O_DIRECT is not supported on this platform; IoMode variants that result in set_o_direct=true shouldn't even be defined"
126 : );
127 : } else {
128 29676 : open_options
129 : };
130 29676 : let inner = VirtualFileInner::open_with_options(path, open_options, ctx).await?;
131 29676 : Ok(VirtualFile { inner, _mode: mode })
132 29676 : }
133 :
134 8988 : pub fn path(&self) -> &Utf8Path {
135 8988 : self.inner.path.as_path()
136 8988 : }
137 :
138 132 : pub async fn crashsafe_overwrite<B: BoundedBuf<Buf = Buf> + Send, Buf: IoBuf + Send>(
139 132 : final_path: Utf8PathBuf,
140 132 : tmp_path: Utf8PathBuf,
141 132 : content: B,
142 132 : ) -> std::io::Result<()> {
143 132 : VirtualFileInner::crashsafe_overwrite(final_path, tmp_path, content).await
144 132 : }
145 :
146 17124 : pub async fn sync_all(&self) -> Result<(), Error> {
147 17124 : if SYNC_MODE.load(std::sync::atomic::Ordering::Relaxed) == SyncMode::UnsafeNoSync as u8 {
148 0 : return Ok(());
149 17124 : }
150 17124 : self.inner.sync_all().await
151 17124 : }
152 :
153 0 : pub async fn sync_data(&self) -> Result<(), Error> {
154 0 : if SYNC_MODE.load(std::sync::atomic::Ordering::Relaxed) == SyncMode::UnsafeNoSync as u8 {
155 0 : return Ok(());
156 0 : }
157 0 : self.inner.sync_data().await
158 0 : }
159 :
160 84 : pub async fn set_len(&self, len: u64, ctx: &RequestContext) -> Result<(), Error> {
161 84 : self.inner.set_len(len, ctx).await
162 84 : }
163 :
164 10992 : pub async fn metadata(&self) -> Result<Metadata, Error> {
165 10992 : self.inner.metadata().await
166 10992 : }
167 :
168 1841698 : pub async fn read_exact_at<Buf>(
169 1841698 : &self,
170 1841698 : slice: Slice<Buf>,
171 1841698 : offset: u64,
172 1841698 : ctx: &RequestContext,
173 1841698 : ) -> Result<Slice<Buf>, Error>
174 1841698 : where
175 1841698 : Buf: IoBufAlignedMut + Send,
176 1841698 : {
177 1841698 : self.inner.read_exact_at(slice, offset, ctx).await
178 1841698 : }
179 :
180 185181 : pub async fn read_exact_at_page(
181 185181 : &self,
182 185181 : page: PageWriteGuard<'static>,
183 185181 : offset: u64,
184 185181 : ctx: &RequestContext,
185 185181 : ) -> Result<PageWriteGuard<'static>, Error> {
186 185181 : self.inner.read_exact_at_page(page, offset, ctx).await
187 185181 : }
188 :
189 237203 : pub async fn write_all_at<Buf: IoBufAligned + Send>(
190 237203 : &self,
191 237203 : buf: FullSlice<Buf>,
192 237203 : offset: u64,
193 237203 : ctx: &RequestContext,
194 237203 : ) -> (FullSlice<Buf>, Result<(), Error>) {
195 237203 : self.inner.write_all_at(buf, offset, ctx).await
196 237203 : }
197 :
198 0 : pub(crate) async fn read_to_string<P: AsRef<Utf8Path>>(
199 0 : path: P,
200 0 : ctx: &RequestContext,
201 0 : ) -> std::io::Result<String> {
202 0 : let file = VirtualFile::open(path, ctx).await?; // TODO: open_v2
203 0 : let mut buf = Vec::new();
204 0 : let mut tmp = vec![0; 128];
205 0 : let mut pos: u64 = 0;
206 : loop {
207 0 : let slice = tmp.slice(..128);
208 0 : let (slice, res) = file.inner.read_at(slice, pos, ctx).await;
209 0 : match res {
210 0 : Ok(0) => break,
211 0 : Ok(n) => {
212 0 : pos += n as u64;
213 0 : buf.extend_from_slice(&slice[..n]);
214 0 : }
215 0 : Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
216 0 : Err(e) => return Err(e),
217 : }
218 0 : tmp = slice.into_inner();
219 : }
220 0 : String::from_utf8(buf).map_err(|_| {
221 0 : std::io::Error::new(ErrorKind::InvalidData, "file contents are not valid UTF-8")
222 0 : })
223 0 : }
224 : }
225 :
226 : /// Indicates whether to enable fsync, fdatasync, or O_SYNC/O_DSYNC when writing
227 : /// files. Switching this off is unsafe and only used for testing on machines
228 : /// with slow drives.
229 : #[repr(u8)]
230 : pub enum SyncMode {
231 : Sync,
232 : UnsafeNoSync,
233 : }
234 :
235 : impl TryFrom<u8> for SyncMode {
236 : type Error = u8;
237 :
238 0 : fn try_from(value: u8) -> Result<Self, Self::Error> {
239 0 : Ok(match value {
240 0 : v if v == (SyncMode::Sync as u8) => SyncMode::Sync,
241 0 : v if v == (SyncMode::UnsafeNoSync as u8) => SyncMode::UnsafeNoSync,
242 0 : x => return Err(x),
243 : })
244 0 : }
245 : }
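Because the mode is stored in an `AtomicU8` (see `SYNC_MODE` near the bottom of this file), the `repr(u8)` cast and this `TryFrom` form a lossless round trip. A small sketch of the pattern, using a hypothetical static:

```rust
use std::sync::atomic::{AtomicU8, Ordering};

// Hypothetical static, mirroring SYNC_MODE below.
static MODE: AtomicU8 = AtomicU8::new(SyncMode::Sync as u8);

fn current_mode() -> SyncMode {
    // try_from returns Err(raw) for bytes that match no variant.
    SyncMode::try_from(MODE.load(Ordering::Relaxed)).expect("valid SyncMode byte")
}
```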
246 :
247 : ///
248 : /// A virtual file descriptor. You can use this just like std::fs::File, but internally
249 : /// the underlying file is closed if the system is low on file descriptors,
250 : /// and re-opened when it's accessed again.
251 : ///
252 : /// Like with std::fs::File, multiple threads can read/write the file concurrently,
253 : /// holding just a shared reference to the same VirtualFile, using the read_at() / write_at()
254 : /// functions from the FileExt trait. But the functions from the Read/Write/Seek traits
255 : /// require a mutable reference, because they modify the "current position".
256 : ///
257 : /// Each VirtualFile has a physical file descriptor in the global OPEN_FILES array, at the
258 : /// slot that 'handle' points to, if the underlying file is currently open. If it's not
259 : /// currently open, the 'handle' can still point to the slot where it was last kept. The
260 : /// 'tag' field is used to detect whether the handle is still valid.
261 : ///
262 : #[derive(Debug)]
263 : pub struct VirtualFileInner {
264 : /// Lazy handle to the global file descriptor cache. The slot that this points to
265 : /// might contain our File, or it may be empty, or it may contain a File that
266 : /// belongs to a different VirtualFile.
267 : handle: RwLock<SlotHandle>,
268 :
269 : /// File path and options to use to open it.
270 : ///
271 : /// Note: this only contains the options needed to re-open it. For example,
272 : /// if a new file is created, we only pass the create flag when it's initially
273 : /// opened, in the VirtualFile::create() function, and strip the flag before
274 : /// storing it here.
275 : pub path: Utf8PathBuf,
276 : open_options: OpenOptions,
277 : }
278 :
279 : #[derive(Debug, PartialEq, Clone, Copy)]
280 : struct SlotHandle {
281 : /// Index into OPEN_FILES.slots
282 : index: usize,
283 :
284 : /// Value of 'tag' in the slot. If slot's tag doesn't match, then the slot has
285 : /// been recycled and no longer contains the FD for this virtual file.
286 : tag: u64,
287 : }
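The tag acts as a validity check: a cached handle is trusted only while the slot's current tag matches it. Roughly (hypothetical helper; the real check lives in `VirtualFileInner::lock_file` below):

```rust
// Hypothetical helper illustrating the tag check.
fn handle_is_valid(handle: SlotHandle, slot_inner: &SlotInner) -> bool {
    // Every time a slot is recycled for a different file, its tag is bumped,
    // so a stale handle fails this comparison and the file must be re-opened.
    slot_inner.tag == handle.tag && slot_inner.file.is_some()
}
```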
288 :
289 : /// OPEN_FILES is the global array that holds the physical file descriptors that
290 : /// are currently open. Each slot in the array is protected by a separate lock,
291 : /// so that different files can be accessed independently. The lock must be held
292 : /// in write mode to replace the slot with a different file, but a read mode
293 : /// is enough to operate on the file, whether you're reading or writing to it.
294 : ///
295 : /// OPEN_FILES starts in uninitialized state, and it's initialized by
296 : /// the virtual_file::init() function. It must be called exactly once at page
297 : /// server startup.
298 : static OPEN_FILES: OnceCell<OpenFiles> = OnceCell::new();
299 :
300 : struct OpenFiles {
301 : slots: &'static [Slot],
302 :
303 : /// clock arm for the clock algorithm
304 : next: AtomicUsize,
305 : }
306 :
307 : struct Slot {
308 : inner: RwLock<SlotInner>,
309 :
310 : /// has this file been used since last clock sweep?
311 : recently_used: AtomicBool,
312 : }
313 :
314 : struct SlotInner {
315 : /// Counter that's incremented every time a different file is stored here.
316 : /// To avoid the ABA problem.
317 : tag: u64,
318 :
319 : /// the underlying file
320 : file: Option<OwnedFd>,
321 : }
322 :
323 : /// Impl of [`tokio_epoll_uring::IoBuf`] and [`tokio_epoll_uring::IoBufMut`] for [`PageWriteGuard`].
324 : struct PageWriteGuardBuf {
325 : page: PageWriteGuard<'static>,
326 : }
327 : // Safety: the [`PageWriteGuard`] gives us exclusive ownership of the page cache slot,
328 : // and the location remains stable even if [`Self`] or the [`PageWriteGuard`] is moved.
329 : // Page cache pages are zero-initialized, so we're good with respect to uninitialized memory.
330 : // (Page cache tracks separately whether the contents are valid, see `PageWriteGuard::mark_valid`.)
331 : unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf {
332 740523 : fn stable_ptr(&self) -> *const u8 {
333 740523 : self.page.as_ptr()
334 740523 : }
335 1388556 : fn bytes_init(&self) -> usize {
336 1388556 : self.page.len()
337 1388556 : }
338 555543 : fn bytes_total(&self) -> usize {
339 555543 : self.page.len()
340 555543 : }
341 : }
342 : // Safety: see above, plus: the ownership of [`PageWriteGuard`] means exclusive access,
343 : // hence it's safe to hand out the `stable_mut_ptr()`.
344 : unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf {
345 277671 : fn stable_mut_ptr(&mut self) -> *mut u8 {
346 277671 : self.page.as_mut_ptr()
347 277671 : }
348 :
349 185181 : unsafe fn set_init(&mut self, pos: usize) {
350 185181 : // There shouldn't really be any reason to call this API since bytes_init() == bytes_total().
351 185181 : assert!(pos <= self.page.len());
352 185181 : }
353 : }
354 :
355 : impl OpenFiles {
356 : /// Find a slot to use, evicting an existing file descriptor if needed.
357 : ///
358 :     /// On return, we hold a lock on the slot, its 'tag' has been updated, and
359 :     /// 'recently_used' has been set. It's all ready for reuse.
360 1160219 : async fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard<SlotInner>) {
361 1160219 : //
362 1160219 : // Run the clock algorithm to find a slot to replace.
363 1160219 : //
364 1160219 : let num_slots = self.slots.len();
365 1160219 : let mut retries = 0;
366 : let mut slot;
367 : let mut slot_guard;
368 : let index;
369 : loop {
370 14129610 : let next = self.next.fetch_add(1, Ordering::AcqRel) % num_slots;
371 14129610 : slot = &self.slots[next];
372 14129610 :
373 14129610 : // If the recently_used flag on this slot is set, continue the clock
374 14129610 : // sweep. Otherwise try to use this slot. If we cannot acquire the
375 14129610 : // lock, also continue the clock sweep.
376 14129610 : //
377 14129610 : // We only continue in this manner for a while, though. If we loop
378 14129610 : // through the array twice without finding a victim, just pick the
379 14129610 : // next slot and wait until we can reuse it. This way, we avoid
380 14129610 : // spinning in the extreme case that all the slots are busy with an
381 14129610 : // I/O operation.
382 14129610 : if retries < num_slots * 2 {
383 13587570 : if !slot.recently_used.swap(false, Ordering::Release) {
384 12344676 : if let Ok(guard) = slot.inner.try_write() {
385 618179 : slot_guard = guard;
386 618179 : index = next;
387 618179 : break;
388 11726497 : }
389 1242894 : }
390 12969391 : retries += 1;
391 : } else {
392 542040 : slot_guard = slot.inner.write().await;
393 542040 : index = next;
394 542040 : break;
395 : }
396 : }
397 :
398 : //
399 : // We now have the victim slot locked. If it was in use previously, close the
400 : // old file.
401 : //
402 1160219 : if let Some(old_file) = slot_guard.file.take() {
403 1129920 :             // The normal path of dropping VirtualFile uses "close"; we use "close-by-replace" here to
404 1129920 : // distinguish the two.
405 1129920 : STORAGE_IO_TIME_METRIC
406 1129920 : .get(StorageIoOperation::CloseByReplace)
407 1129920 : .observe_closure_duration(|| drop(old_file));
408 1129920 : }
409 :
410 : // Prepare the slot for reuse and return it
411 1160219 : slot_guard.tag += 1;
412 1160219 : slot.recently_used.store(true, Ordering::Relaxed);
413 1160219 : (
414 1160219 : SlotHandle {
415 1160219 : index,
416 1160219 : tag: slot_guard.tag,
417 1160219 : },
418 1160219 : slot_guard,
419 1160219 : )
420 1160219 : }
421 : }
422 :
423 : /// Identify error types that should always terminate the process. Other
424 : /// error types may be eligible for retry.
425 36 : pub(crate) fn is_fatal_io_error(e: &std::io::Error) -> bool {
426 : use nix::errno::Errno::*;
427 36 : match e.raw_os_error().map(nix::errno::from_i32) {
428 : Some(EIO) => {
429 : // Terminate on EIO because we no longer trust the device to store
430 : // data safely, or to uphold persistence guarantees on fsync.
431 0 : true
432 : }
433 : Some(EROFS) => {
434 : // Terminate on EROFS because a filesystem is usually remounted
435 : // readonly when it has experienced some critical issue, so the same
436 : // logic as EIO applies.
437 0 : true
438 : }
439 : Some(EACCES) => {
440 :             // Terminate on EACCES because we should always have permissions
441 : // for our own data dir: if we don't, then we can't do our job and
442 : // need administrative intervention to fix permissions. Terminating
443 : // is the best way to make sure we stop cleanly rather than going
444 : // into infinite retry loops, and will make it clear to the outside
445 : // world that we need help.
446 0 : true
447 : }
448 : _ => {
449 :             // Treat all other local file I/O errors as retryable. This includes:
450 : // - ENOSPC: we stay up and wait for eviction to free some space
451 : // - EINVAL, EBADF, EBADFD: this is a code bug, not a filesystem/hardware issue
452 :             // - WriteZero, Interrupted: these are used internally by VirtualFile
453 36 : false
454 : }
455 : }
456 36 : }
457 :
458 : /// Call this when the local filesystem gives us an error with an external
459 : /// cause: this includes EIO, EROFS, and EACCES: all these indicate either
460 : /// bad storage or bad configuration, and we can't fix that from inside
461 : /// a running process.
462 0 : pub(crate) fn on_fatal_io_error(e: &std::io::Error, context: &str) -> ! {
463 0 : let backtrace = std::backtrace::Backtrace::force_capture();
464 0 : tracing::error!("Fatal I/O error: {e}: {context})\n{backtrace}");
465 0 : std::process::abort();
466 : }
467 :
468 : pub(crate) trait MaybeFatalIo<T> {
469 : fn maybe_fatal_err(self, context: &str) -> std::io::Result<T>;
470 : fn fatal_err(self, context: &str) -> T;
471 : }
472 :
473 : impl<T> MaybeFatalIo<T> for std::io::Result<T> {
474 : /// Terminate the process if the result is an error of a fatal type, else pass it through
475 : ///
476 :     /// This is appropriate for writes, where we typically want to die on EIO/EACCES etc., but
477 : /// not on ENOSPC.
478 6993169 : fn maybe_fatal_err(self, context: &str) -> std::io::Result<T> {
479 6993169 : if let Err(e) = &self {
480 36 : if is_fatal_io_error(e) {
481 0 : on_fatal_io_error(e, context);
482 36 : }
483 6993133 : }
484 6993169 : self
485 6993169 : }
486 :
487 : /// Terminate the process on any I/O error.
488 : ///
489 : /// This is appropriate for reads on files that we know exist: they should always work.
490 12348 : fn fatal_err(self, context: &str) -> T {
491 12348 : match self {
492 12348 : Ok(v) => v,
493 0 : Err(e) => {
494 0 : on_fatal_io_error(&e, context);
495 : }
496 : }
497 12348 : }
498 : }
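A hedged usage sketch of this trait at a write call site (hypothetical function; the buffer and context types are the ones used throughout this module):

```rust
// Illustrative only: classify an I/O result at a write call site.
async fn example_write(
    file: &VirtualFile,
    buf: FullSlice<IoBuffer>,
    ctx: &RequestContext,
) -> std::io::Result<()> {
    let (_buf, res) = file.write_all_at(buf, 0, ctx).await;
    // Aborts the process on EIO/EROFS/EACCES; ENOSPC etc. pass through.
    res.maybe_fatal_err("example write")
}
```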
499 :
500 : /// Observe duration for the given storage I/O operation
501 : ///
502 : /// Unlike `observe_closure_duration`, this supports async,
503 : /// where "support" means that we measure wall clock time.
504 : macro_rules! observe_duration {
505 : ($op:expr, $($body:tt)*) => {{
506 : let instant = Instant::now();
507 : let result = $($body)*;
508 : let elapsed = instant.elapsed().as_secs_f64();
509 : STORAGE_IO_TIME_METRIC
510 : .get($op)
511 : .observe(elapsed);
512 : result
513 : }}
514 : }
515 :
516 : macro_rules! with_file {
517 : ($this:expr, $op:expr, | $ident:ident | $($body:tt)*) => {{
518 : let $ident = $this.lock_file().await?;
519 : observe_duration!($op, $($body)*)
520 : }};
521 : ($this:expr, $op:expr, | mut $ident:ident | $($body:tt)*) => {{
522 : let mut $ident = $this.lock_file().await?;
523 : observe_duration!($op, $($body)*)
524 : }};
525 : }
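To make the control flow concrete, `VirtualFileInner::sync_all` below is roughly equivalent to this macro-free sketch (assuming the same items in scope as in this module):

```rust
// Macro-free sketch of what with_file! + observe_duration! expand to
// in VirtualFileInner::sync_all below.
async fn sync_all_expanded(this: &VirtualFileInner) -> Result<(), Error> {
    let file_guard = this.lock_file().await?; // with_file!
    let instant = Instant::now(); // observe_duration! starts the timer
    let result = {
        let (_file_guard, res) = io_engine::get().sync_all(file_guard).await;
        res.maybe_fatal_err("sync_all")
    };
    STORAGE_IO_TIME_METRIC
        .get(StorageIoOperation::Fsync)
        .observe(instant.elapsed().as_secs_f64());
    result
}
```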
526 :
527 : impl VirtualFileInner {
528 : /// Open a file in read-only mode. Like File::open.
529 6096 : pub async fn open<P: AsRef<Utf8Path>>(
530 6096 : path: P,
531 6096 : ctx: &RequestContext,
532 6096 : ) -> Result<VirtualFileInner, std::io::Error> {
533 6096 : Self::open_with_options(path.as_ref(), OpenOptions::new().read(true).clone(), ctx).await
534 6096 : }
535 :
536 : /// Open a file with given options.
537 : ///
538 : /// Note: If any custom flags were set in 'open_options' through OpenOptionsExt,
539 : /// they will be applied also when the file is subsequently re-opened, not only
540 : /// on the first time. Make sure that's sane!
541 36972 : pub async fn open_with_options<P: AsRef<Utf8Path>>(
542 36972 : path: P,
543 36972 : open_options: OpenOptions,
544 36972 : _ctx: &RequestContext,
545 36972 : ) -> Result<VirtualFileInner, std::io::Error> {
546 36972 : let path = path.as_ref();
547 36972 : let (handle, mut slot_guard) = get_open_files().find_victim_slot().await;
548 :
549 : // NB: there is also StorageIoOperation::OpenAfterReplace which is for the case
550 : // where our caller doesn't get to use the returned VirtualFile before its
551 : // slot gets re-used by someone else.
552 36972 : let file = observe_duration!(StorageIoOperation::Open, {
553 36972 : open_options.open(path.as_std_path()).await?
554 : });
555 :
556 : // Strip all options other than read and write.
557 : //
558 : // It would perhaps be nicer to check just for the read and write flags
559 : // explicitly, but OpenOptions doesn't contain any functions to read flags,
560 : // only to set them.
561 36972 : let mut reopen_options = open_options.clone();
562 36972 : reopen_options.create(false);
563 36972 : reopen_options.create_new(false);
564 36972 : reopen_options.truncate(false);
565 36972 :
566 36972 : let vfile = VirtualFileInner {
567 36972 : handle: RwLock::new(handle),
568 36972 : path: path.to_owned(),
569 36972 : open_options: reopen_options,
570 36972 : };
571 36972 :
572 36972 : // TODO: Under pressure, it's likely the slot will get re-used and
573 36972 : // the underlying file closed before they get around to using it.
574 36972 : // => https://github.com/neondatabase/neon/issues/6065
575 36972 : slot_guard.file.replace(file);
576 36972 :
577 36972 : Ok(vfile)
578 36972 : }
579 :
580 : /// Async version of [`::utils::crashsafe::overwrite`].
581 : ///
582 : /// # NB:
583 : ///
584 :     /// Doesn't actually use the [`VirtualFile`] file descriptor cache, but
585 : /// it did at an earlier time.
586 : /// And it will use this module's [`io_engine`] in the near future, so, leaving it here.
587 168 : pub async fn crashsafe_overwrite<B: BoundedBuf<Buf = Buf> + Send, Buf: IoBuf + Send>(
588 168 : final_path: Utf8PathBuf,
589 168 : tmp_path: Utf8PathBuf,
590 168 : content: B,
591 168 : ) -> std::io::Result<()> {
592 168 : // TODO: use tokio_epoll_uring if configured as `io_engine`.
593 168 : // See https://github.com/neondatabase/neon/issues/6663
594 168 :
595 168 : tokio::task::spawn_blocking(move || {
596 168 : let slice_storage;
597 168 : let content_len = content.bytes_init();
598 168 : let content = if content.bytes_init() > 0 {
599 168 : slice_storage = Some(content.slice(0..content_len));
600 168 : slice_storage.as_deref().expect("just set it to Some()")
601 : } else {
602 0 : &[]
603 : };
604 168 : utils::crashsafe::overwrite(&final_path, &tmp_path, content)
605 168 : .maybe_fatal_err("crashsafe_overwrite")
606 168 : })
607 168 : .await
608 168 : .expect("blocking task is never aborted")
609 168 : }
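Typical use, mirroring the tests at the bottom of this file: pick a temporary path next to the final one and let the helper handle the write-then-rename dance (hypothetical wrapper function):

```rust
// Grounded in the tests below: crash-safely replace `myfile` via a tmp file.
async fn overwrite_example(dir: &Utf8Path) -> std::io::Result<()> {
    let final_path = dir.join("myfile");
    let tmp_path = dir.join("myfile.tmp");
    VirtualFileInner::crashsafe_overwrite(final_path, tmp_path, b"new contents".to_vec()).await
}
```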
610 :
611 : /// Call File::sync_all() on the underlying File.
612 17124 : pub async fn sync_all(&self) -> Result<(), Error> {
613 17124 : with_file!(self, StorageIoOperation::Fsync, |file_guard| {
614 17124 : let (_file_guard, res) = io_engine::get().sync_all(file_guard).await;
615 17124 : res.maybe_fatal_err("sync_all")
616 : })
617 17124 : }
618 :
619 : /// Call File::sync_data() on the underlying File.
620 0 : pub async fn sync_data(&self) -> Result<(), Error> {
621 0 : with_file!(self, StorageIoOperation::Fsync, |file_guard| {
622 0 : let (_file_guard, res) = io_engine::get().sync_data(file_guard).await;
623 0 : res.maybe_fatal_err("sync_data")
624 : })
625 0 : }
626 :
627 10992 : pub async fn metadata(&self) -> Result<Metadata, Error> {
628 10992 : with_file!(self, StorageIoOperation::Metadata, |file_guard| {
629 10992 : let (_file_guard, res) = io_engine::get().metadata(file_guard).await;
630 10992 : res
631 : })
632 10992 : }
633 :
634 84 : pub async fn set_len(&self, len: u64, _ctx: &RequestContext) -> Result<(), Error> {
635 84 : with_file!(self, StorageIoOperation::SetLen, |file_guard| {
636 84 : let (_file_guard, res) = io_engine::get().set_len(file_guard, len).await;
637 84 : res.maybe_fatal_err("set_len")
638 : })
639 84 : }
640 :
641 : /// Helper function internal to `VirtualFile` that looks up the underlying File,
642 : /// opens it and evicts some other File if necessary. The passed parameter is
643 : /// assumed to be a function available for the physical `File`.
644 : ///
645 : /// We are doing it via a macro as Rust doesn't support async closures that
646 :     /// take parameters with lifetimes.
647 3553830 : async fn lock_file(&self) -> Result<FileGuard, Error> {
648 3553830 : let open_files = get_open_files();
649 :
650 1123247 : let mut handle_guard = {
651 : // Read the cached slot handle, and see if the slot that it points to still
652 : // contains our File.
653 : //
654 : // We only need to hold the handle lock while we read the current handle. If
655 : // another thread closes the file and recycles the slot for a different file,
656 : // we will notice that the handle we read is no longer valid and retry.
657 3553830 : let mut handle = *self.handle.read().await;
658 : loop {
659 : // Check if the slot contains our File
660 : {
661 4115354 : let slot = &open_files.slots[handle.index];
662 4115354 : let slot_guard = slot.inner.read().await;
663 4115354 : if slot_guard.tag == handle.tag && slot_guard.file.is_some() {
664 : // Found a cached file descriptor.
665 2430583 : slot.recently_used.store(true, Ordering::Relaxed);
666 2430583 : return Ok(FileGuard { slot_guard });
667 1684771 : }
668 : }
669 :
670 : // The slot didn't contain our File. We will have to open it ourselves,
671 : // but before that, grab a write lock on handle in the VirtualFile, so
672 : // that no other thread will try to concurrently open the same file.
673 1684771 : let handle_guard = self.handle.write().await;
674 :
675 : // If another thread changed the handle while we were not holding the lock,
676 : // then the handle might now be valid again. Loop back to retry.
677 1684771 : if *handle_guard != handle {
678 561524 : handle = *handle_guard;
679 561524 : continue;
680 1123247 : }
681 1123247 : break handle_guard;
682 : }
683 : };
684 :
685 : // We need to open the file ourselves. The handle in the VirtualFile is
686 : // now locked in write-mode. Find a free slot to put it in.
687 1123247 : let (handle, mut slot_guard) = open_files.find_victim_slot().await;
688 :
689 : // Re-open the physical file.
690 :         // NB: we use StorageIoOperation::OpenAfterReplace here to distinguish this
691 : // case from StorageIoOperation::Open. This helps with identifying thrashing
692 : // of the virtual file descriptor cache.
693 1123247 : let file = observe_duration!(StorageIoOperation::OpenAfterReplace, {
694 1123247 : self.open_options.open(self.path.as_std_path()).await?
695 : });
696 :
697 : // Store the File in the slot and update the handle in the VirtualFile
698 : // to point to it.
699 1123247 : slot_guard.file.replace(file);
700 1123247 :
701 1123247 : *handle_guard = handle;
702 1123247 :
703 1123247 : Ok(FileGuard {
704 1123247 : slot_guard: slot_guard.downgrade(),
705 1123247 : })
706 3553830 : }
707 :
708 : /// Read the file contents in range `offset..(offset + slice.bytes_total())` into `slice[0..slice.bytes_total()]`.
709 : ///
710 : /// The returned `Slice<Buf>` is equivalent to the input `slice`, i.e., it's the same view into the same buffer.
711 3288427 : pub async fn read_exact_at<Buf>(
712 3288427 : &self,
713 3288427 : slice: Slice<Buf>,
714 3288427 : offset: u64,
715 3288427 : ctx: &RequestContext,
716 3288427 : ) -> Result<Slice<Buf>, Error>
717 3288427 : where
718 3288427 : Buf: IoBufAlignedMut + Send,
719 3288427 : {
720 3288427 : let assert_we_return_original_bounds = if cfg!(debug_assertions) {
721 3288427 : Some((slice.stable_ptr() as usize, slice.bytes_total()))
722 : } else {
723 0 : None
724 : };
725 :
726 3288427 : let original_bounds = slice.bounds();
727 3288427 : let (buf, res) =
728 3288427 : read_exact_at_impl(slice, offset, |buf, offset| self.read_at(buf, offset, ctx)).await;
729 3288427 : let res = res.map(|_| buf.slice(original_bounds));
730 :
731 3288427 : if let Some(original_bounds) = assert_we_return_original_bounds {
732 3288427 : if let Ok(slice) = &res {
733 3288415 : let returned_bounds = (slice.stable_ptr() as usize, slice.bytes_total());
734 3288415 : assert_eq!(original_bounds, returned_bounds);
735 12 : }
736 0 : }
737 :
738 3288427 : res
739 3288427 : }
740 :
741 : /// Like [`Self::read_exact_at`] but for [`PageWriteGuard`].
742 185181 : pub async fn read_exact_at_page(
743 185181 : &self,
744 185181 : page: PageWriteGuard<'static>,
745 185181 : offset: u64,
746 185181 : ctx: &RequestContext,
747 185181 : ) -> Result<PageWriteGuard<'static>, Error> {
748 185181 : let buf = PageWriteGuardBuf { page }.slice_full();
749 185181 : debug_assert_eq!(buf.bytes_total(), PAGE_SZ);
750 185181 : self.read_exact_at(buf, offset, ctx)
751 185181 : .await
752 185181 : .map(|slice| slice.into_inner().page)
753 185181 : }
754 :
755 : // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235
756 237203 : pub async fn write_all_at<Buf: IoBuf + Send>(
757 237203 : &self,
758 237203 : buf: FullSlice<Buf>,
759 237203 : mut offset: u64,
760 237203 : ctx: &RequestContext,
761 237203 : ) -> (FullSlice<Buf>, Result<(), Error>) {
762 237203 : let buf = buf.into_raw_slice();
763 237203 : let bounds = buf.bounds();
764 237203 : let restore =
765 237203 : |buf: Slice<_>| FullSlice::must_new(Slice::from_buf_bounds(buf.into_inner(), bounds));
766 237203 : let mut buf = buf;
767 474394 : while !buf.is_empty() {
768 237203 : let (tmp, res) = self.write_at(FullSlice::must_new(buf), offset, ctx).await;
769 237203 : buf = tmp.into_raw_slice();
770 12 : match res {
771 : Ok(0) => {
772 0 : return (
773 0 : restore(buf),
774 0 : Err(Error::new(
775 0 : std::io::ErrorKind::WriteZero,
776 0 : "failed to write whole buffer",
777 0 : )),
778 0 : );
779 : }
780 237191 : Ok(n) => {
781 237191 : buf = buf.slice(n..);
782 237191 : offset += n as u64;
783 237191 : }
784 12 : Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
785 12 : Err(e) => return (restore(buf), Err(e)),
786 : }
787 : }
788 237191 : (restore(buf), Ok(()))
789 237203 : }
790 :
791 3288427 : pub(super) async fn read_at<Buf>(
792 3288427 : &self,
793 3288427 : buf: tokio_epoll_uring::Slice<Buf>,
794 3288427 : offset: u64,
795 3288427 : ctx: &RequestContext,
796 3288427 : ) -> (tokio_epoll_uring::Slice<Buf>, Result<usize, Error>)
797 3288427 : where
798 3288427 : Buf: tokio_epoll_uring::IoBufMut + Send,
799 3288427 : {
800 3288427 : let file_guard = match self
801 3288427 : .lock_file()
802 3288427 : .await
803 3288427 : .maybe_fatal_err("lock_file inside VirtualFileInner::read_at")
804 : {
805 3288427 : Ok(file_guard) => file_guard,
806 0 : Err(e) => return (buf, Err(e)),
807 : };
808 :
809 3288427 : observe_duration!(StorageIoOperation::Read, {
810 3288427 : let ((_file_guard, buf), res) = io_engine::get().read_at(file_guard, offset, buf).await;
811 3288427 : let res = res.maybe_fatal_err("io_engine read_at inside VirtualFileInner::read_at");
812 3288427 : if let Ok(size) = res {
813 3288415 : ctx.io_size_metrics().read.add(size.into_u64());
814 3288415 : }
815 3288427 : (buf, res)
816 : })
817 3288427 : }
818 :
819 237203 : async fn write_at<B: IoBuf + Send>(
820 237203 : &self,
821 237203 : buf: FullSlice<B>,
822 237203 : offset: u64,
823 237203 : ctx: &RequestContext,
824 237203 : ) -> (FullSlice<B>, Result<usize, Error>) {
825 237203 : let file_guard = match self.lock_file().await {
826 237203 : Ok(file_guard) => file_guard,
827 0 : Err(e) => return (buf, Err(e)),
828 : };
829 237203 : observe_duration!(StorageIoOperation::Write, {
830 237203 : let ((_file_guard, buf), result) =
831 237203 : io_engine::get().write_at(file_guard, offset, buf).await;
832 237203 : let result = result.maybe_fatal_err("write_at");
833 237203 : if let Ok(size) = result {
834 237191 : ctx.io_size_metrics().write.add(size.into_u64());
835 237191 : }
836 237203 : (buf, result)
837 : })
838 237203 : }
839 : }
840 :
841 : // Adapted from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
842 3288475 : pub async fn read_exact_at_impl<Buf, F, Fut>(
843 3288475 : mut buf: tokio_epoll_uring::Slice<Buf>,
844 3288475 : mut offset: u64,
845 3288475 : mut read_at: F,
846 3288475 : ) -> (Buf, std::io::Result<()>)
847 3288475 : where
848 3288475 : Buf: IoBufMut + Send,
849 3288475 : F: FnMut(tokio_epoll_uring::Slice<Buf>, u64) -> Fut,
850 3288475 : Fut: std::future::Future<Output = (tokio_epoll_uring::Slice<Buf>, std::io::Result<usize>)>,
851 3288475 : {
852 6576950 : while buf.bytes_total() != 0 {
853 : let res;
854 3288499 : (buf, res) = read_at(buf, offset).await;
855 12 : match res {
856 12 : Ok(0) => break,
857 3288475 : Ok(n) => {
858 3288475 : buf = buf.slice(n..);
859 3288475 : offset += n as u64;
860 3288475 : }
861 12 : Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
862 12 : Err(e) => return (buf.into_inner(), Err(e)),
863 : }
864 : }
865 : // NB: don't use `buf.is_empty()` here; it is from the
866 : // `impl Deref for Slice { Target = [u8] }`; the &[u8]
867 : // returned by it only covers the initialized portion of `buf`.
868 : // Whereas we're interested in ensuring that we filled the entire
869 : // buffer that the user passed in.
870 3288463 : if buf.bytes_total() != 0 {
871 12 : (
872 12 : buf.into_inner(),
873 12 : Err(std::io::Error::new(
874 12 : std::io::ErrorKind::UnexpectedEof,
875 12 : "failed to fill whole buffer",
876 12 : )),
877 12 : )
878 : } else {
879 3288451 : assert_eq!(buf.len(), buf.bytes_total());
880 3288451 : (buf.into_inner(), Ok(()))
881 : }
882 3288475 : }
883 :
884 : #[cfg(test)]
885 : mod test_read_exact_at_impl {
886 :
887 : use std::collections::VecDeque;
888 : use std::sync::Arc;
889 :
890 : use tokio_epoll_uring::{BoundedBuf, BoundedBufMut};
891 :
892 : use super::read_exact_at_impl;
893 :
894 : struct Expectation {
895 : offset: u64,
896 : bytes_total: usize,
897 : result: std::io::Result<Vec<u8>>,
898 : }
899 : struct MockReadAt {
900 : expectations: VecDeque<Expectation>,
901 : }
902 :
903 : impl MockReadAt {
904 72 : async fn read_at(
905 72 : &mut self,
906 72 : mut buf: tokio_epoll_uring::Slice<Vec<u8>>,
907 72 : offset: u64,
908 72 : ) -> (tokio_epoll_uring::Slice<Vec<u8>>, std::io::Result<usize>) {
909 72 : let exp = self
910 72 : .expectations
911 72 : .pop_front()
912 72 : .expect("read_at called but we have no expectations left");
913 72 : assert_eq!(exp.offset, offset);
914 72 : assert_eq!(exp.bytes_total, buf.bytes_total());
915 72 : match exp.result {
916 72 : Ok(bytes) => {
917 72 : assert!(bytes.len() <= buf.bytes_total());
918 72 : buf.put_slice(&bytes);
919 72 : (buf, Ok(bytes.len()))
920 : }
921 0 : Err(e) => (buf, Err(e)),
922 : }
923 72 : }
924 : }
925 :
926 : impl Drop for MockReadAt {
927 48 : fn drop(&mut self) {
928 48 : assert_eq!(self.expectations.len(), 0);
929 48 : }
930 : }
931 :
932 : #[tokio::test]
933 12 : async fn test_basic() {
934 12 : let buf = Vec::with_capacity(5).slice_full();
935 12 : let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
936 12 : expectations: VecDeque::from(vec![Expectation {
937 12 : offset: 0,
938 12 : bytes_total: 5,
939 12 : result: Ok(vec![b'a', b'b', b'c', b'd', b'e']),
940 12 : }]),
941 12 : }));
942 12 : let (buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
943 12 : let mock_read_at = Arc::clone(&mock_read_at);
944 12 : async move { mock_read_at.lock().await.read_at(buf, offset).await }
945 12 : })
946 12 : .await;
947 12 : assert!(res.is_ok());
948 12 : assert_eq!(buf, vec![b'a', b'b', b'c', b'd', b'e']);
949 12 : }
950 :
951 : #[tokio::test]
952 12 : async fn test_empty_buf_issues_no_syscall() {
953 12 : let buf = Vec::new().slice_full();
954 12 : let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
955 12 : expectations: VecDeque::new(),
956 12 : }));
957 12 : let (_buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
958 0 : let mock_read_at = Arc::clone(&mock_read_at);
959 12 : async move { mock_read_at.lock().await.read_at(buf, offset).await }
960 12 : })
961 12 : .await;
962 12 : assert!(res.is_ok());
963 12 : }
964 :
965 : #[tokio::test]
966 12 : async fn test_two_read_at_calls_needed_until_buf_filled() {
967 12 : let buf = Vec::with_capacity(4).slice_full();
968 12 : let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
969 12 : expectations: VecDeque::from(vec![
970 12 : Expectation {
971 12 : offset: 0,
972 12 : bytes_total: 4,
973 12 : result: Ok(vec![b'a', b'b']),
974 12 : },
975 12 : Expectation {
976 12 : offset: 2,
977 12 : bytes_total: 2,
978 12 : result: Ok(vec![b'c', b'd']),
979 12 : },
980 12 : ]),
981 12 : }));
982 24 : let (buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
983 24 : let mock_read_at = Arc::clone(&mock_read_at);
984 24 : async move { mock_read_at.lock().await.read_at(buf, offset).await }
985 24 : })
986 12 : .await;
987 12 : assert!(res.is_ok());
988 12 : assert_eq!(buf, vec![b'a', b'b', b'c', b'd']);
989 12 : }
990 :
991 : #[tokio::test]
992 12 : async fn test_eof_before_buffer_full() {
993 12 : let buf = Vec::with_capacity(3).slice_full();
994 12 : let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
995 12 : expectations: VecDeque::from(vec![
996 12 : Expectation {
997 12 : offset: 0,
998 12 : bytes_total: 3,
999 12 : result: Ok(vec![b'a']),
1000 12 : },
1001 12 : Expectation {
1002 12 : offset: 1,
1003 12 : bytes_total: 2,
1004 12 : result: Ok(vec![b'b']),
1005 12 : },
1006 12 : Expectation {
1007 12 : offset: 2,
1008 12 : bytes_total: 1,
1009 12 : result: Ok(vec![]),
1010 12 : },
1011 12 : ]),
1012 12 : }));
1013 36 : let (_buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
1014 36 : let mock_read_at = Arc::clone(&mock_read_at);
1015 36 : async move { mock_read_at.lock().await.read_at(buf, offset).await }
1016 36 : })
1017 12 : .await;
1018 12 : let Err(err) = res else {
1019 12 : panic!("should return an error");
1020 12 : };
1021 12 : assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);
1022 12 : assert_eq!(format!("{err}"), "failed to fill whole buffer");
1023 12 : // buffer contents on error are unspecified
1024 12 : }
1025 : }
1026 :
1027 : struct FileGuard {
1028 : slot_guard: RwLockReadGuard<'static, SlotInner>,
1029 : }
1030 :
1031 : impl AsRef<OwnedFd> for FileGuard {
1032 3553830 : fn as_ref(&self) -> &OwnedFd {
1033 3553830 : // This unwrap is safe because we only create `FileGuard`s
1034 3553830 : // if we know that the file is Some.
1035 3553830 : self.slot_guard.file.as_ref().unwrap()
1036 3553830 : }
1037 : }
1038 :
1039 : impl FileGuard {
1040 : /// Soft deprecation: we'll move VirtualFile to async APIs and remove this function eventually.
1041 1776832 : fn with_std_file<F, R>(&self, with: F) -> R
1042 1776832 : where
1043 1776832 : F: FnOnce(&File) -> R,
1044 1776832 : {
1045 1776832 : // SAFETY:
1046 1776832 : // - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`.
1047 1776832 :         // - `&` usage below: `self` is `&`, hence Rust typesystem guarantees there is no `&mut`
1048 1776832 : let file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) };
1049 1776832 : let res = with(&file);
1050 1776832 : let _ = file.into_raw_fd();
1051 1776832 : res
1052 1776832 : }
1053 : }
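Usage pattern for the guard (a sketch; hypothetical call site): borrow the cached fd as a `std::fs::File` for a synchronous call, without taking ownership of it:

```rust
// Hypothetical call site: query file length via the borrowed std File.
fn file_len(guard: &FileGuard) -> std::io::Result<u64> {
    guard.with_std_file(|f| f.metadata().map(|m| m.len()))
}
```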
1054 :
1055 : impl tokio_epoll_uring::IoFd for FileGuard {
1056 1776998 : unsafe fn as_fd(&self) -> RawFd {
1057 1776998 : let owned_fd: &OwnedFd = self.as_ref();
1058 1776998 : owned_fd.as_raw_fd()
1059 1776998 : }
1060 : }
1061 :
1062 : #[cfg(test)]
1063 : impl VirtualFile {
1064 62748 : pub(crate) async fn read_blk(
1065 62748 : &self,
1066 62748 : blknum: u32,
1067 62748 : ctx: &RequestContext,
1068 62748 : ) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
1069 62748 : self.inner.read_blk(blknum, ctx).await
1070 62748 : }
1071 : }
1072 :
1073 : #[cfg(test)]
1074 : impl VirtualFileInner {
1075 62748 : pub(crate) async fn read_blk(
1076 62748 : &self,
1077 62748 : blknum: u32,
1078 62748 : ctx: &RequestContext,
1079 62748 : ) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
1080 : use crate::page_cache::PAGE_SZ;
1081 62748 : let slice = IoBufferMut::with_capacity(PAGE_SZ).slice_full();
1082 62748 : assert_eq!(slice.bytes_total(), PAGE_SZ);
1083 62748 : let slice = self
1084 62748 : .read_exact_at(slice, blknum as u64 * (PAGE_SZ as u64), ctx)
1085 62748 : .await?;
1086 62748 : Ok(crate::tenant::block_io::BlockLease::IoBufferMut(
1087 62748 : slice.into_inner(),
1088 62748 : ))
1089 62748 : }
1090 : }
1091 :
1092 : impl Drop for VirtualFileInner {
1093 : /// If a VirtualFile is dropped, close the underlying file if it was open.
1094 31952 : fn drop(&mut self) {
1095 31952 : let handle = self.handle.get_mut();
1096 :
1097 31952 : fn clean_slot(slot: &Slot, mut slot_guard: RwLockWriteGuard<'_, SlotInner>, tag: u64) {
1098 31952 : if slot_guard.tag == tag {
1099 28486 : slot.recently_used.store(false, Ordering::Relaxed);
1100 : // there is also operation "close-by-replace" for closes done on eviction for
1101 : // comparison.
1102 28486 : if let Some(fd) = slot_guard.file.take() {
1103 28486 : STORAGE_IO_TIME_METRIC
1104 28486 : .get(StorageIoOperation::Close)
1105 28486 : .observe_closure_duration(|| drop(fd));
1106 28486 : }
1107 3466 : }
1108 31952 : }
1109 :
1110 : // We don't have async drop so we cannot directly await the lock here.
1111 : // Instead, first do a best-effort attempt at closing the underlying
1112 : // file descriptor by using `try_write`, and if that fails, spawn
1113 : // a tokio task to do it asynchronously: we just want it to be
1114 : // cleaned up eventually.
1115 :         // Most of the time, the `try_write` should succeed though,
1116 : // as we have `&mut self` access. In other words, if the slot
1117 : // is still occupied by our file, there should be no access from
1118 : // other I/O operations; the only other possible place to lock
1119 :         // the slot is the clock algorithm looking for victim slots.
1120 31952 : let slot = &get_open_files().slots[handle.index];
1121 31952 : if let Ok(slot_guard) = slot.inner.try_write() {
1122 31948 : clean_slot(slot, slot_guard, handle.tag);
1123 31948 : } else {
1124 4 : let tag = handle.tag;
1125 4 : tokio::spawn(async move {
1126 4 : let slot_guard = slot.inner.write().await;
1127 4 : clean_slot(slot, slot_guard, tag);
1128 4 : });
1129 4 : };
1130 31952 : }
1131 : }
1132 :
1133 : impl OwnedAsyncWriter for VirtualFile {
1134 0 : async fn write_all_at<Buf: IoBufAligned + Send>(
1135 0 : &self,
1136 0 : buf: FullSlice<Buf>,
1137 0 : offset: u64,
1138 0 : ctx: &RequestContext,
1139 0 : ) -> (FullSlice<Buf>, std::io::Result<()>) {
1140 0 : VirtualFile::write_all_at(self, buf, offset, ctx).await
1141 0 : }
1142 0 : async fn set_len(&self, len: u64, ctx: &RequestContext) -> std::io::Result<()> {
1143 0 : VirtualFile::set_len(self, len, ctx).await
1144 0 : }
1145 : }
1146 :
1147 : impl OpenFiles {
1148 1452 : fn new(num_slots: usize) -> OpenFiles {
1149 1452 : let mut slots = Box::new(Vec::with_capacity(num_slots));
1150 14520 : for _ in 0..num_slots {
1151 14520 : let slot = Slot {
1152 14520 : recently_used: AtomicBool::new(false),
1153 14520 : inner: RwLock::new(SlotInner { tag: 0, file: None }),
1154 14520 : };
1155 14520 : slots.push(slot);
1156 14520 : }
1157 :
1158 1452 : OpenFiles {
1159 1452 : next: AtomicUsize::new(0),
1160 1452 : slots: Box::leak(slots),
1161 1452 : }
1162 1452 : }
1163 : }
1164 :
1165 : ///
1166 : /// Initialize the virtual file module. This must be called once at page
1167 : /// server startup.
1168 : ///
1169 : #[cfg(not(test))]
1170 0 : pub fn init(num_slots: usize, engine: IoEngineKind, mode: IoMode, sync_mode: SyncMode) {
1171 0 : if OPEN_FILES.set(OpenFiles::new(num_slots)).is_err() {
1172 0 : panic!("virtual_file::init called twice");
1173 0 : }
1174 0 : set_io_mode(mode);
1175 0 : io_engine::init(engine);
1176 0 : SYNC_MODE.store(sync_mode as u8, std::sync::atomic::Ordering::Relaxed);
1177 0 : crate::metrics::virtual_file_descriptor_cache::SIZE_MAX.set(num_slots as u64);
1178 0 : }
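A hedged sketch of the startup wiring (the real call site lives elsewhere in the pageserver; the slot count and the `StdFs` engine variant are illustrative assumptions):

```rust
// Hypothetical startup wiring; values are illustrative.
fn startup_example() {
    crate::virtual_file::init(
        100,                 // fd cache slots (assumed value)
        IoEngineKind::StdFs, // assumed engine variant
        IoMode::Buffered,
        SyncMode::Sync,
    );
}
```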
1179 :
1180 : const TEST_MAX_FILE_DESCRIPTORS: usize = 10;
1181 :
1182 : // Get a handle to the global slots array.
1183 3622754 : fn get_open_files() -> &'static OpenFiles {
1184 3622754 : //
1185 3622754 : // In unit tests, page server startup doesn't happen and no one calls
1186 3622754 : // virtual_file::init(). Initialize it here, with a small array.
1187 3622754 : //
1188 3622754 : // This applies to the virtual file tests below, but all other unit
1189 3622754 : // tests too, so the virtual file facility is always usable in
1190 3622754 : // unit tests.
1191 3622754 : //
1192 3622754 : if cfg!(test) {
1193 3622754 : OPEN_FILES.get_or_init(|| OpenFiles::new(TEST_MAX_FILE_DESCRIPTORS))
1194 : } else {
1195 0 : OPEN_FILES.get().expect("virtual_file::init not called yet")
1196 : }
1197 3622754 : }
1198 :
1199 : /// Gets the io buffer alignment.
1200 0 : pub(crate) const fn get_io_buffer_alignment() -> usize {
1201 0 : DEFAULT_IO_BUFFER_ALIGNMENT
1202 0 : }
1203 :
1204 : pub(crate) type IoBufferMut = AlignedBufferMut<ConstAlign<{ get_io_buffer_alignment() }>>;
1205 : pub(crate) type IoBuffer = AlignedBuffer<ConstAlign<{ get_io_buffer_alignment() }>>;
1206 : pub(crate) type IoPageSlice<'a> =
1207 : AlignedSlice<'a, PAGE_SZ, ConstAlign<{ get_io_buffer_alignment() }>>;
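These aliases fix the buffer alignment at compile time; allocating an O_DIRECT-compatible, page-sized read buffer then looks like this (pattern taken from the tests below):

```rust
// Pattern from the tests below: aligned, page-sized read buffer.
fn page_buffer() -> tokio_epoll_uring::Slice<IoBufferMut> {
    let slice = IoBufferMut::with_capacity(PAGE_SZ).slice_full();
    assert_eq!(slice.bytes_total(), PAGE_SZ);
    slice
}
```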
1208 :
1209 1416 : static IO_MODE: LazyLock<AtomicU8> = LazyLock::new(|| AtomicU8::new(IoMode::preferred() as u8));
1210 :
1211 0 : pub fn set_io_mode(mode: IoMode) {
1212 0 : IO_MODE.store(mode as u8, std::sync::atomic::Ordering::Relaxed);
1213 0 : }
1214 :
1215 29676 : pub(crate) fn get_io_mode() -> IoMode {
1216 29676 : IoMode::try_from(IO_MODE.load(Ordering::Relaxed)).unwrap()
1217 29676 : }
1218 :
1219 : static SYNC_MODE: AtomicU8 = AtomicU8::new(SyncMode::Sync as u8);
1220 :
1221 : #[cfg(test)]
1222 : mod tests {
1223 : use std::os::unix::fs::FileExt;
1224 : use std::sync::Arc;
1225 :
1226 : use owned_buffers_io::io_buf_ext::IoBufExt;
1227 : use owned_buffers_io::slice::SliceMutExt;
1228 : use rand::seq::SliceRandom;
1229 : use rand::{Rng, thread_rng};
1230 :
1231 : use super::*;
1232 : use crate::context::DownloadBehavior;
1233 : use crate::task_mgr::TaskKind;
1234 :
1235 : enum MaybeVirtualFile {
1236 : VirtualFile(VirtualFile),
1237 : File(File),
1238 : }
1239 :
1240 : impl From<VirtualFile> for MaybeVirtualFile {
1241 36 : fn from(vf: VirtualFile) -> Self {
1242 36 : MaybeVirtualFile::VirtualFile(vf)
1243 36 : }
1244 : }
1245 :
1246 : impl MaybeVirtualFile {
1247 4932 : async fn read_exact_at(
1248 4932 : &self,
1249 4932 : mut slice: tokio_epoll_uring::Slice<IoBufferMut>,
1250 4932 : offset: u64,
1251 4932 : ctx: &RequestContext,
1252 4932 : ) -> Result<tokio_epoll_uring::Slice<IoBufferMut>, Error> {
1253 4932 : match self {
1254 2484 : MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(slice, offset, ctx).await,
1255 2448 : MaybeVirtualFile::File(file) => {
1256 2448 : let rust_slice: &mut [u8] = slice.as_mut_rust_slice_full_zeroed();
1257 2448 : file.read_exact_at(rust_slice, offset).map(|()| slice)
1258 : }
1259 : }
1260 4932 : }
1261 96 : async fn write_all_at<Buf: IoBufAligned + Send>(
1262 96 : &self,
1263 96 : buf: FullSlice<Buf>,
1264 96 : offset: u64,
1265 96 : ctx: &RequestContext,
1266 96 : ) -> Result<(), Error> {
1267 96 : match self {
1268 48 : MaybeVirtualFile::VirtualFile(file) => {
1269 48 : let (_buf, res) = file.write_all_at(buf, offset, ctx).await;
1270 48 : res
1271 : }
1272 48 : MaybeVirtualFile::File(file) => file.write_all_at(&buf[..], offset),
1273 : }
1274 96 : }
1275 :
1276 : // Helper function to slurp a portion of a file into a string
1277 4932 : async fn read_string_at(
1278 4932 : &mut self,
1279 4932 : pos: u64,
1280 4932 : len: usize,
1281 4932 : ctx: &RequestContext,
1282 4932 : ) -> Result<String, Error> {
1283 4932 : let slice = IoBufferMut::with_capacity(len).slice_full();
1284 4932 : assert_eq!(slice.bytes_total(), len);
1285 4932 : let slice = self.read_exact_at(slice, pos, ctx).await?;
1286 4908 : let buf = slice.into_inner();
1287 4908 : assert_eq!(buf.len(), len);
1288 :
1289 4908 : Ok(String::from_utf8(buf.to_vec()).unwrap())
1290 4932 : }
1291 : }
1292 :
1293 : #[tokio::test]
1294 12 : async fn test_virtual_files() -> anyhow::Result<()> {
1295 12 : // The real work is done in the test_files() helper function. This
1296 12 : // allows us to run the same set of tests against a native File, and
1297 12 : // VirtualFile. We trust the native Files and wouldn't need to test them,
1298 12 : // but this allows us to verify that the operations return the same
1299 12 : // results with VirtualFiles as with native Files. (Except that with
1300 12 : // native files, you will run out of file descriptors if the ulimit
1301 12 : // is low enough.)
1302 12 : struct A;
1303 12 :
1304 12 : impl Adapter for A {
1305 1236 : async fn open(
1306 1236 : path: Utf8PathBuf,
1307 1236 : opts: OpenOptions,
1308 1236 : ctx: &RequestContext,
1309 1236 : ) -> Result<MaybeVirtualFile, anyhow::Error> {
1310 1236 : let vf = VirtualFile::open_with_options_v2(&path, &opts, ctx).await?;
1311 1236 : Ok(MaybeVirtualFile::VirtualFile(vf))
1312 1236 : }
1313 12 : }
1314 12 : test_files::<A>("virtual_files").await
1315 12 : }
1316 :
1317 : #[tokio::test]
1318 12 : async fn test_physical_files() -> anyhow::Result<()> {
1319 12 : struct B;
1320 12 :
1321 12 : impl Adapter for B {
1322 1236 : async fn open(
1323 1236 : path: Utf8PathBuf,
1324 1236 : opts: OpenOptions,
1325 1236 : _ctx: &RequestContext,
1326 1236 : ) -> Result<MaybeVirtualFile, anyhow::Error> {
1327 12 : Ok(MaybeVirtualFile::File({
1328 1236 : let owned_fd = opts.open(path.as_std_path()).await?;
1329 1236 : File::from(owned_fd)
1330 12 : }))
1331 1236 : }
1332 12 : }
1333 12 :
1334 12 : test_files::<B>("physical_files").await
1335 12 : }
1336 :
1337 :     /// This is essentially a closure which returns a MaybeVirtualFile, but because Rust edition
1338 :     /// 2024 is not yet out with new lifetime capture or outlives rules, this is an async function
1339 : /// in trait which benefits from the new lifetime capture rules already.
1340 : trait Adapter {
1341 : async fn open(
1342 : path: Utf8PathBuf,
1343 : opts: OpenOptions,
1344 : ctx: &RequestContext,
1345 : ) -> Result<MaybeVirtualFile, anyhow::Error>;
1346 : }
1347 :
1348 24 : async fn test_files<A>(testname: &str) -> anyhow::Result<()>
1349 24 : where
1350 24 : A: Adapter,
1351 24 : {
1352 24 : let ctx =
1353 24 : RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
1354 24 : let testdir = crate::config::PageServerConf::test_repo_dir(testname);
1355 24 : std::fs::create_dir_all(&testdir)?;
1356 :
1357 24 : let path_a = testdir.join("file_a");
1358 24 : let mut file_a = A::open(
1359 24 : path_a.clone(),
1360 24 : OpenOptions::new()
1361 24 : .write(true)
1362 24 : .create(true)
1363 24 : .truncate(true)
1364 24 : .to_owned(),
1365 24 : &ctx,
1366 24 : )
1367 24 : .await?;
1368 :
1369 24 : file_a
1370 24 : .write_all_at(IoBuffer::from(b"foobar").slice_len(), 0, &ctx)
1371 24 : .await?;
1372 :
1373 : // cannot read from a file opened in write-only mode
1374 24 : let _ = file_a.read_string_at(0, 1, &ctx).await.unwrap_err();
1375 :
1376 : // Close the file and re-open for reading
1377 24 : let mut file_a = A::open(path_a, OpenOptions::new().read(true).to_owned(), &ctx).await?;
1378 :
1379 : // cannot write to a file opened in read-only mode
1380 24 : let _ = file_a
1381 24 : .write_all_at(IoBuffer::from(b"bar").slice_len(), 0, &ctx)
1382 24 : .await
1383 24 : .unwrap_err();
1384 24 :
1385 24 : // Try simple read
1386 24 : assert_eq!("foobar", file_a.read_string_at(0, 6, &ctx).await?);
1387 :
1388 : // Create another test file, and try FileExt functions on it.
1389 24 : let path_b = testdir.join("file_b");
1390 24 : let mut file_b = A::open(
1391 24 : path_b.clone(),
1392 24 : OpenOptions::new()
1393 24 : .read(true)
1394 24 : .write(true)
1395 24 : .create(true)
1396 24 : .truncate(true)
1397 24 : .to_owned(),
1398 24 : &ctx,
1399 24 : )
1400 24 : .await?;
1401 24 : file_b
1402 24 : .write_all_at(IoBuffer::from(b"BAR").slice_len(), 3, &ctx)
1403 24 : .await?;
1404 24 : file_b
1405 24 : .write_all_at(IoBuffer::from(b"FOO").slice_len(), 0, &ctx)
1406 24 : .await?;
1407 :
1408 24 : assert_eq!(file_b.read_string_at(2, 3, &ctx).await?, "OBA");
1409 :
1410 : // Open a lot of files, enough to cause some evictions. (Or to be precise,
1411 : // open the same file many times. The effect is the same.)
1412 :
1413 24 : let mut vfiles = Vec::new();
1414 2424 : for _ in 0..100 {
1415 2400 : let mut vfile = A::open(
1416 2400 : path_b.clone(),
1417 2400 : OpenOptions::new().read(true).to_owned(),
1418 2400 : &ctx,
1419 2400 : )
1420 2400 : .await?;
1421 2400 : assert_eq!("FOOBAR", vfile.read_string_at(0, 6, &ctx).await?);
1422 2400 : vfiles.push(vfile);
1423 : }
1424 :
1425 : // make sure we opened enough files to definitely cause evictions.
1426 24 : assert!(vfiles.len() > TEST_MAX_FILE_DESCRIPTORS * 2);
1427 :
1428 : // The underlying file descriptor for 'file_a' should be closed now. Try to read
1429 : // from it again.
1430 24 : assert_eq!("foobar", file_a.read_string_at(0, 6, &ctx).await?);
1431 :
1432 : // Check that all the other FDs still work too. Use them in random order for
1433 : // good measure.
1434 24 : vfiles.as_mut_slice().shuffle(&mut thread_rng());
1435 2400 : for vfile in vfiles.iter_mut() {
1436 2400 : assert_eq!("OOBAR", vfile.read_string_at(1, 5, &ctx).await?);
1437 : }
1438 :
1439 24 : Ok(())
1440 24 : }
1441 :
1442 : /// Test using VirtualFiles from many threads concurrently. This tests both using
1443 : /// a lot of VirtualFiles concurrently, causing evictions, and also using the same
1444 : /// VirtualFile from multiple threads concurrently.
1445 : #[tokio::test]
1446 12 : async fn test_vfile_concurrency() -> Result<(), Error> {
1447 12 : const SIZE: usize = 8 * 1024;
1448 12 : const VIRTUAL_FILES: usize = 100;
1449 12 : const THREADS: usize = 100;
1450 12 : const SAMPLE: [u8; SIZE] = [0xADu8; SIZE];
1451 12 :
1452 12 : let ctx =
1453 12 : RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
1454 12 : let testdir = crate::config::PageServerConf::test_repo_dir("vfile_concurrency");
1455 12 : std::fs::create_dir_all(&testdir)?;
1456 12 :
1457 12 : // Create a test file.
1458 12 : let test_file_path = testdir.join("concurrency_test_file");
1459 12 : {
1460 12 : let file = File::create(&test_file_path)?;
1461 12 : file.write_all_at(&SAMPLE, 0)?;
1462 12 : }
1463 12 :
1464 12 : // Open the file many times.
1465 12 : let mut files = Vec::new();
1466 1212 : for _ in 0..VIRTUAL_FILES {
1467 1200 : let f = VirtualFileInner::open_with_options(
1468 1200 : &test_file_path,
1469 1200 : OpenOptions::new().read(true).clone(),
1470 1200 : &ctx,
1471 1200 : )
1472 1200 : .await?;
1473 1200 : files.push(f);
1474 12 : }
1475 12 : let files = Arc::new(files);
1476 12 :
1477 12 : // Launch many threads, and use the virtual files concurrently in random order.
1478 12 : let rt = tokio::runtime::Builder::new_multi_thread()
1479 12 : .worker_threads(THREADS)
1480 12 : .thread_name("test_vfile_concurrency thread")
1481 12 : .build()
1482 12 : .unwrap();
1483 12 : let mut hdls = Vec::new();
1484 1212 : for _threadno in 0..THREADS {
1485 1200 : let files = files.clone();
1486 1200 : let ctx = ctx.detached_child(TaskKind::UnitTest, DownloadBehavior::Error);
1487 1200 : let hdl = rt.spawn(async move {
1488 1200 : let mut buf = IoBufferMut::with_capacity_zeroed(SIZE);
1489 1200 : let mut rng = rand::rngs::OsRng;
1490 1200000 : for _ in 1..1000 {
1491 1198800 : let f = &files[rng.gen_range(0..files.len())];
1492 1198800 : buf = f
1493 1198800 : .read_exact_at(buf.slice_full(), 0, &ctx)
1494 1198800 : .await
1495 1198800 : .unwrap()
1496 1198800 : .into_inner();
1497 1198800 : assert!(buf[..] == SAMPLE);
1498 12 : }
1499 1200 : });
1500 1200 : hdls.push(hdl);
1501 1200 : }
1502 1212 : for hdl in hdls {
1503 1200 : hdl.await?;
1504 12 : }
1505 12 : std::mem::forget(rt);
1506 12 :
1507 12 : Ok(())
1508 12 : }
1509 :
1510 : #[tokio::test]
1511 12 : async fn test_atomic_overwrite_basic() {
1512 12 : let ctx =
1513 12 : RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
1514 12 : let testdir = crate::config::PageServerConf::test_repo_dir("test_atomic_overwrite_basic");
1515 12 : std::fs::create_dir_all(&testdir).unwrap();
1516 12 :
1517 12 : let path = testdir.join("myfile");
1518 12 : let tmp_path = testdir.join("myfile.tmp");
1519 12 :
1520 12 : VirtualFileInner::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
1521 12 : .await
1522 12 : .unwrap();
1523 12 : let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
1524 12 : let post = file.read_string_at(0, 3, &ctx).await.unwrap();
1525 12 : assert_eq!(post, "foo");
1526 12 : assert!(!tmp_path.exists());
1527 12 : drop(file);
1528 12 :
1529 12 : VirtualFileInner::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"bar".to_vec())
1530 12 : .await
1531 12 : .unwrap();
1532 12 : let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
1533 12 : let post = file.read_string_at(0, 3, &ctx).await.unwrap();
1534 12 : assert_eq!(post, "bar");
1535 12 : assert!(!tmp_path.exists());
1536 12 : drop(file);
1537 12 : }
1538 :
1539 : #[tokio::test]
1540 12 : async fn test_atomic_overwrite_preexisting_tmp() {
1541 12 : let ctx =
1542 12 : RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
1543 12 : let testdir =
1544 12 : crate::config::PageServerConf::test_repo_dir("test_atomic_overwrite_preexisting_tmp");
1545 12 : std::fs::create_dir_all(&testdir).unwrap();
1546 12 :
1547 12 : let path = testdir.join("myfile");
1548 12 : let tmp_path = testdir.join("myfile.tmp");
1549 12 :
1550 12 : std::fs::write(&tmp_path, "some preexisting junk that should be removed").unwrap();
1551 12 : assert!(tmp_path.exists());
1552 12 :
1553 12 : VirtualFileInner::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
1554 12 : .await
1555 12 : .unwrap();
1556 12 :
1557 12 : let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
1558 12 : let post = file.read_string_at(0, 3, &ctx).await.unwrap();
1559 12 : assert_eq!(post, "foo");
1560 12 : assert!(!tmp_path.exists());
1561 12 : drop(file);
1562 12 : }
1563 : }