Line data Source code
1 : //!
2 : //! VirtualFile is like a normal File, but it's not bound directly to
3 : //! a file descriptor. Instead, the file is opened when it's read from,
4 : //! and if too many files are open globally in the system, least-recently
5 : //! used ones are closed.
6 : //!
7 : //! To track which files have been recently used, we use the clock algorithm
8 : //! with a 'recently_used' flag on each slot.
9 : //!
10 : //! This is similar to PostgreSQL's virtual file descriptor facility in
11 : //! src/backend/storage/file/fd.c
12 : //!
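//! A minimal usage sketch (hypothetical path; `RequestContext` setup and error
//! handling elided):
//!
//! ```ignore
//! // Opening returns a lazy handle; the underlying fd may be evicted and
//! // transparently re-opened between operations.
//! let file = VirtualFile::open(Utf8Path::new("some/layer/file"), &ctx).await?;
//! let buf = file.read_exact_at(vec![0u8; 8192], 0, &ctx).await?;
//! ```
//!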
13 : use crate::context::RequestContext;
14 : use crate::metrics::{StorageIoOperation, STORAGE_IO_SIZE, STORAGE_IO_TIME_METRIC};
15 :
16 : use crate::page_cache::PageWriteGuard;
17 : use crate::tenant::TENANTS_SEGMENT_NAME;
18 : use camino::{Utf8Path, Utf8PathBuf};
19 : use once_cell::sync::OnceCell;
20 : use pageserver_api::shard::TenantShardId;
21 : use std::fs::File;
22 : use std::io::{Error, ErrorKind, Seek, SeekFrom};
23 : use tokio_epoll_uring::{BoundedBuf, IoBuf, IoBufMut, Slice};
24 :
25 : use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
26 : use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
27 : use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
28 : use tokio::time::Instant;
29 :
30 : pub use pageserver_api::models::virtual_file as api;
31 : pub(crate) mod io_engine;
32 : pub use io_engine::feature_test as io_engine_feature_test;
33 : pub use io_engine::FeatureTestResult as IoEngineFeatureTestResult;
34 : mod metadata;
35 : mod open_options;
36 : use self::owned_buffers_io::write::OwnedAsyncWriter;
37 : pub(crate) use io_engine::IoEngineKind;
38 : pub(crate) use metadata::Metadata;
39 : pub(crate) use open_options::*;
40 :
41 : pub(crate) mod owned_buffers_io {
42 : //! Abstractions for IO with owned buffers.
43 : //!
44 : //! Not actually tied to [`crate::virtual_file`] specifically, but it's the primary
45 : //! reason we need this abstraction.
46 : //!
47 : //! Over time, this could move into the `tokio-epoll-uring` crate, maybe `uring-common`,
48 : //! but for the time being we're proving out the primitives in the neon.git repo
49 : //! for faster iteration.
50 :
51 : pub(crate) mod write;
52 : pub(crate) mod util {
53 : pub(crate) mod size_tracking_writer;
54 : }
55 : }
56 :
57 : ///
58 : /// A virtual file descriptor. You can use this just like std::fs::File, but internally
59 : /// the underlying file is closed if the system is low on file descriptors,
60 : /// and re-opened when it's accessed again.
61 : ///
62 : /// Like with std::fs::File, multiple threads can read/write the file concurrently,
63 : /// holding just a shared reference to the same VirtualFile, using the read_at() / write_at()
64 : /// functions from the FileExt trait. But the functions from the Read/Write/Seek traits
65 : /// require a mutable reference, because they modify the "current position".
66 : ///
67 : /// Each VirtualFile has a physical file descriptor in the global OPEN_FILES array, at the
68 : /// slot that 'handle' points to, if the underlying file is currently open. If it's not
69 : /// currently open, the 'handle' can still point to the slot where it was last kept. The
70 : /// 'tag' field is used to detect whether the handle is still valid.
71 : ///
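/// A hedged sketch of positional I/O through a shared reference (buffer sizes
/// and offsets are illustrative):
///
/// ```ignore
/// // write_all_at()/read_exact_at() take &self, so no &mut is needed:
/// let (_buf, res) = file.write_all_at(b"data".to_vec(), 0, &ctx).await;
/// res?;
/// let buf = file.read_exact_at(vec![0u8; 4], 0, &ctx).await?;
/// ```
///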
72 : #[derive(Debug)]
73 : pub struct VirtualFile {
74 : /// Lazy handle to the global file descriptor cache. The slot that this points to
75 : /// might contain our File, or it may be empty, or it may contain a File that
76 : /// belongs to a different VirtualFile.
77 : handle: RwLock<SlotHandle>,
78 :
79 : /// Current file position
80 : pos: u64,
81 :
82 : /// File path and options to use to open it.
83 : ///
84 : /// Note: this only contains the options needed to re-open it. For example,
85 : /// if a new file is created, we only pass the create flag when it's initially
86 : /// opened, in the VirtualFile::create() function, and strip the flag before
87 : /// storing it here.
88 : pub path: Utf8PathBuf,
89 : open_options: OpenOptions,
90 :
91 : // These are strings because we only use them for metrics, and those expect strings.
92 : // It makes no sense for us to constantly turn the `TimelineId` and `TenantId` into
93 : // strings.
94 : tenant_id: String,
95 : shard_id: String,
96 : timeline_id: String,
97 : }
98 :
99 : #[derive(Debug, PartialEq, Clone, Copy)]
100 : struct SlotHandle {
101 : /// Index into OPEN_FILES.slots
102 : index: usize,
103 :
104 : /// Value of 'tag' in the slot. If slot's tag doesn't match, then the slot has
105 : /// been recycled and no longer contains the FD for this virtual file.
106 : tag: u64,
107 : }
108 :
109 : /// OPEN_FILES is the global array that holds the physical file descriptors that
110 : /// are currently open. Each slot in the array is protected by a separate lock,
111 : /// so that different files can be accessed independently. The lock must be held
112 : /// in write mode to replace the slot with a different file, but holding it in
113 : /// read mode is enough to operate on the file, whether you're reading from or writing to it.
114 : ///
115 : /// OPEN_FILES starts in uninitialized state, and it's initialized by
116 : /// the virtual_file::init() function. It must be called exactly once at page
117 : /// server startup.
118 : static OPEN_FILES: OnceCell<OpenFiles> = OnceCell::new();
119 :
120 : struct OpenFiles {
121 : slots: &'static [Slot],
122 :
123 : /// clock arm for the clock algorithm
124 : next: AtomicUsize,
125 : }
126 :
127 : struct Slot {
128 : inner: RwLock<SlotInner>,
129 :
130 : /// has this file been used since last clock sweep?
131 : recently_used: AtomicBool,
132 : }
133 :
134 : struct SlotInner {
135 : /// Counter that's incremented every time a different file is stored here.
136 : /// To avoid the ABA problem.
137 : tag: u64,
138 :
139 : /// the underlying file
140 : file: Option<OwnedFd>,
141 : }
142 :
143 : /// Impl of [`tokio_epoll_uring::IoBuf`] and [`tokio_epoll_uring::IoBufMut`] for a wrapper around [`PageWriteGuard`].
144 : struct PageWriteGuardBuf {
145 : page: PageWriteGuard<'static>,
146 : init_up_to: usize,
147 : }
148 : // Safety: the [`PageWriteGuard`] gives us exclusive ownership of the page cache slot,
149 : // and the location remains stable even if [`Self`] or the [`PageWriteGuard`] is moved.
150 : unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf {
151 111950 : fn stable_ptr(&self) -> *const u8 {
152 111950 : self.page.as_ptr()
153 111950 : }
154 335850 : fn bytes_init(&self) -> usize {
155 335850 : self.init_up_to
156 335850 : }
157 111950 : fn bytes_total(&self) -> usize {
158 111950 : self.page.len()
159 111950 : }
160 : }
161 : // Safety: see above, plus: the ownership of [`PageWriteGuard`] means exclusive access,
162 : // hence it's safe to hand out the `stable_mut_ptr()`.
163 : unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf {
164 111950 : fn stable_mut_ptr(&mut self) -> *mut u8 {
165 111950 : self.page.as_mut_ptr()
166 111950 : }
167 :
168 111950 : unsafe fn set_init(&mut self, pos: usize) {
169 111950 : assert!(pos <= self.page.len());
170 111950 : self.init_up_to = pos;
171 111950 : }
172 : }
173 :
174 : impl OpenFiles {
175 : /// Find a slot to use, evicting an existing file descriptor if needed.
176 : ///
177 : /// On return, we hold a lock on the slot, its 'tag' has been updated, and
178 : /// recently_used has been set. It's all ready for reuse.
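    ///
    /// A self-contained sketch of the clock sweep, leaving out the
    /// lock-contention handling and retry cap that the real loop below adds:
    ///
    /// ```ignore
    /// fn clock_sweep(recently_used: &mut [bool], next: &mut usize) -> usize {
    ///     loop {
    ///         let i = *next % recently_used.len();
    ///         *next += 1;
    ///         // Second chance: clear the flag on recently used slots and move
    ///         // on; pick the first slot whose flag was already clear.
    ///         if !std::mem::replace(&mut recently_used[i], false) {
    ///             return i;
    ///         }
    ///     }
    /// }
    /// ```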
179 188806 : async fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard<SlotInner>) {
180 188806 : //
181 188806 : // Run the clock algorithm to find a slot to replace.
182 188806 : //
183 188806 : let num_slots = self.slots.len();
184 188806 : let mut retries = 0;
185 : let mut slot;
186 : let mut slot_guard;
187 : let index;
188 2012858 : loop {
189 2012858 : let next = self.next.fetch_add(1, Ordering::AcqRel) % num_slots;
190 2012858 : slot = &self.slots[next];
191 2012858 :
192 2012858 : // If the recently_used flag on this slot is set, continue the clock
193 2012858 : // sweep. Otherwise try to use this slot. If we cannot acquire the
194 2012858 : // lock, also continue the clock sweep.
195 2012858 : //
196 2012858 : // We only continue in this manner for a while, though. If we loop
197 2012858 : // through the array twice without finding a victim, just pick the
198 2012858 : // next slot and wait until we can reuse it. This way, we avoid
199 2012858 : // spinning in the extreme case that all the slots are busy with an
200 2012858 : // I/O operation.
201 2012858 : if retries < num_slots * 2 {
202 1942377 : if !slot.recently_used.swap(false, Ordering::Release) {
203 1738026 : if let Ok(guard) = slot.inner.try_write() {
204 118325 : slot_guard = guard;
205 118325 : index = next;
206 118325 : break;
207 1619701 : }
208 204351 : }
209 1824052 : retries += 1;
210 : } else {
211 70481 : slot_guard = slot.inner.write().await;
212 70481 : index = next;
213 70481 : break;
214 : }
215 : }
216 :
217 : //
218 : // We now have the victim slot locked. If it was in use previously, close the
219 : // old file.
220 : //
221 188806 : if let Some(old_file) = slot_guard.file.take() {
222 184593 : // the normal path of dropping VirtualFile uses "close", use "close-by-replace" here to
223 184593 : // distinguish the two.
224 184593 : STORAGE_IO_TIME_METRIC
225 184593 : .get(StorageIoOperation::CloseByReplace)
226 184593 : .observe_closure_duration(|| drop(old_file));
227 184593 : }
228 :
229 : // Prepare the slot for reuse and return it
230 188806 : slot_guard.tag += 1;
231 188806 : slot.recently_used.store(true, Ordering::Relaxed);
232 188806 : (
233 188806 : SlotHandle {
234 188806 : index,
235 188806 : tag: slot_guard.tag,
236 188806 : },
237 188806 : slot_guard,
238 188806 : )
239 188806 : }
240 : }
241 :
242 : /// Identify error types that should always terminate the process. Other
243 : /// error types may be eligible for retry.
244 0 : pub(crate) fn is_fatal_io_error(e: &std::io::Error) -> bool {
245 0 : use nix::errno::Errno::*;
246 0 : match e.raw_os_error().map(nix::errno::from_i32) {
247 : Some(EIO) => {
248 : // Terminate on EIO because we no longer trust the device to store
249 : // data safely, or to uphold persistence guarantees on fsync.
250 0 : true
251 : }
252 : Some(EROFS) => {
253 : // Terminate on EROFS because a filesystem is usually remounted
254 : // readonly when it has experienced some critical issue, so the same
255 : // logic as EIO applies.
256 0 : true
257 : }
258 : Some(EACCES) => {
259 : // Terminate on EACCES because we should always have permissions
260 : // for our own data dir: if we don't, then we can't do our job and
261 : // need administrative intervention to fix permissions. Terminating
262 : // is the best way to make sure we stop cleanly rather than going
263 : // into infinite retry loops, and will make it clear to the outside
264 : // world that we need help.
265 0 : true
266 : }
267 : _ => {
268 : // Treat all other local file I/O errors as retryable. This includes:
269 : // - ENOSPC: we stay up and wait for eviction to free some space
270 : // - EINVAL, EBADF, EBADFD: this is a code bug, not a filesystem/hardware issue
271 : // - WriteZero, Interrupted: these are used internally by VirtualFile
272 0 : false
273 : }
274 : }
275 0 : }
276 :
277 : /// Call this when the local filesystem gives us an error with an external
278 : /// cause: this includes EIO, EROFS, and EACCES: all these indicate either
279 : /// bad storage or bad configuration, and we can't fix that from inside
280 : /// a running process.
281 0 : pub(crate) fn on_fatal_io_error(e: &std::io::Error, context: &str) -> ! {
282 0 : tracing::error!("Fatal I/O error: {e}: {context}");
283 0 : std::process::abort();
284 : }
285 :
286 : pub(crate) trait MaybeFatalIo<T> {
287 : fn maybe_fatal_err(self, context: &str) -> std::io::Result<T>;
288 : fn fatal_err(self, context: &str) -> T;
289 : }
290 :
291 : impl<T> MaybeFatalIo<T> for std::io::Result<T> {
292 : /// Terminate the process if the result is an error of a fatal type, else pass it through
293 : ///
294 : /// This is appropriate for writes, where we typically want to die on EIO/EACCES etc, but
295 : /// not on ENOSPC.
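    ///
    /// Hypothetical call-site sketch:
    /// ```ignore
    /// std::fs::write(&path, b"data").maybe_fatal_err("write data file")?;
    /// ```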
296 22 : fn maybe_fatal_err(self, context: &str) -> std::io::Result<T> {
297 22 : if let Err(e) = &self {
298 0 : if is_fatal_io_error(e) {
299 0 : on_fatal_io_error(e, context);
300 0 : }
301 22 : }
302 22 : self
303 22 : }
304 :
305 : /// Terminate the process on any I/O error.
306 : ///
307 : /// This is appropriate for reads on files that we know exist: they should always work.
308 2154 : fn fatal_err(self, context: &str) -> T {
309 2154 : match self {
310 2154 : Ok(v) => v,
311 0 : Err(e) => {
312 0 : on_fatal_io_error(&e, context);
313 : }
314 : }
315 2154 : }
316 : }
317 :
318 : /// Observe duration for the given storage I/O operation
319 : ///
320 : /// Unlike `observe_closure_duration`, this supports async,
321 : /// where "support" means that we measure wall clock time.
322 : macro_rules! observe_duration {
323 : ($op:expr, $($body:tt)*) => {{
324 : let instant = Instant::now();
325 : let result = $($body)*;
326 : let elapsed = instant.elapsed().as_secs_f64();
327 : STORAGE_IO_TIME_METRIC
328 : .get($op)
329 : .observe(elapsed);
330 : result
331 : }}
332 : }
333 :
334 : macro_rules! with_file {
335 : ($this:expr, $op:expr, | $ident:ident | $($body:tt)*) => {{
336 : let $ident = $this.lock_file().await?;
337 : observe_duration!($op, $($body)*)
338 : }};
339 : ($this:expr, $op:expr, | mut $ident:ident | $($body:tt)*) => {{
340 : let mut $ident = $this.lock_file().await?;
341 : observe_duration!($op, $($body)*)
342 : }};
343 : }
344 :
345 : impl VirtualFile {
346 : /// Open a file in read-only mode. Like File::open.
347 2046 : pub async fn open(
348 2046 : path: &Utf8Path,
349 2046 : ctx: &RequestContext,
350 2046 : ) -> Result<VirtualFile, std::io::Error> {
351 2046 : Self::open_with_options(path, OpenOptions::new().read(true), ctx).await
352 2046 : }
353 :
354 : /// Create a new file for writing. If the file exists, it will be truncated.
355 : /// Like File::create.
356 1163 : pub async fn create(
357 1163 : path: &Utf8Path,
358 1163 : ctx: &RequestContext,
359 1163 : ) -> Result<VirtualFile, std::io::Error> {
360 1163 : Self::open_with_options(
361 1163 : path,
362 1163 : OpenOptions::new().write(true).create(true).truncate(true),
363 1163 : ctx,
364 1163 : )
365 600 : .await
366 1163 : }
367 :
368 : /// Open a file with given options.
369 : ///
370 : /// Note: If any custom flags were set in 'open_options' through OpenOptionsExt,
371 : /// they will also be applied when the file is subsequently re-opened, not only
372 : /// on the first time. Make sure that's sane!
373 5261 : pub async fn open_with_options(
374 5261 : path: &Utf8Path,
375 5261 : open_options: &OpenOptions,
376 5261 : _ctx: &RequestContext, /* TODO: carry a pointer to the metrics in the RequestContext instead of the parsing https://github.com/neondatabase/neon/issues/6107 */
377 5261 : ) -> Result<VirtualFile, std::io::Error> {
378 5261 : let path_str = path.to_string();
379 5261 : let parts = path_str.split('/').collect::<Vec<&str>>();
380 5261 : let (tenant_id, shard_id, timeline_id) =
381 5261 : if parts.len() > 5 && parts[parts.len() - 5] == TENANTS_SEGMENT_NAME {
382 3741 : let tenant_shard_part = parts[parts.len() - 4];
383 3741 : let (tenant_id, shard_id) = match tenant_shard_part.parse::<TenantShardId>() {
384 3741 : Ok(tenant_shard_id) => (
385 3741 : tenant_shard_id.tenant_id.to_string(),
386 3741 : format!("{}", tenant_shard_id.shard_slug()),
387 3741 : ),
388 : Err(_) => {
389 : // Malformed path: this ID is just for observability, so tolerate it
390 : // and pass through
391 0 : (tenant_shard_part.to_string(), "*".to_string())
392 : }
393 : };
394 3741 : (tenant_id, shard_id, parts[parts.len() - 2].to_string())
395 : } else {
396 1520 : ("*".to_string(), "*".to_string(), "*".to_string())
397 : };
398 5261 : let (handle, mut slot_guard) = get_open_files().find_victim_slot().await;
399 :
400 : // NB: there is also StorageIoOperation::OpenAfterReplace which is for the case
401 : // where our caller doesn't get to use the returned VirtualFile before its
402 : // slot gets re-used by someone else.
403 5261 : let file = observe_duration!(StorageIoOperation::Open, {
404 5261 : open_options.open(path.as_std_path()).await?
405 : });
406 :
407 : // Strip all options other than read and write.
408 : //
409 : // It would perhaps be nicer to check just for the read and write flags
410 : // explicitly, but OpenOptions doesn't contain any functions to read flags,
411 : // only to set them.
412 5261 : let mut reopen_options = open_options.clone();
413 5261 : reopen_options.create(false);
414 5261 : reopen_options.create_new(false);
415 5261 : reopen_options.truncate(false);
416 5261 :
417 5261 : let vfile = VirtualFile {
418 5261 : handle: RwLock::new(handle),
419 5261 : pos: 0,
420 5261 : path: path.to_path_buf(),
421 5261 : open_options: reopen_options,
422 5261 : tenant_id,
423 5261 : shard_id,
424 5261 : timeline_id,
425 5261 : };
426 5261 :
427 5261 : // TODO: Under pressure, it's likely the slot will get re-used and
428 5261 : // the underlying file closed before the caller gets around to using it.
429 5261 : // => https://github.com/neondatabase/neon/issues/6065
430 5261 : slot_guard.file.replace(file);
431 5261 :
432 5261 : Ok(vfile)
433 5261 : }
434 :
435 : /// Async version of [`::utils::crashsafe::overwrite`].
436 : ///
437 : /// # NB:
438 : ///
439 : /// Doesn't actually use the [`VirtualFile`] file descriptor cache, but
440 : /// it did at an earlier time.
441 : /// And it will use this module's [`io_engine`] in the near future, so it's left here.
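    ///
    /// Usage sketch (paths are illustrative; cf. `test_atomic_overwrite_basic` below):
    /// ```ignore
    /// VirtualFile::crashsafe_overwrite(final_path, tmp_path, b"contents".to_vec()).await?;
    /// ```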
442 28 : pub async fn crashsafe_overwrite<B: BoundedBuf<Buf = Buf> + Send, Buf: IoBuf + Send>(
443 28 : final_path: Utf8PathBuf,
444 28 : tmp_path: Utf8PathBuf,
445 28 : content: B,
446 28 : ) -> std::io::Result<()> {
447 28 : // TODO: use tokio_epoll_uring if configured as `io_engine`.
448 28 : // See https://github.com/neondatabase/neon/issues/6663
449 28 :
450 28 : tokio::task::spawn_blocking(move || {
451 28 : let slice_storage;
452 28 : let content_len = content.bytes_init();
453 28 : let content = if content.bytes_init() > 0 {
454 28 : slice_storage = Some(content.slice(0..content_len));
455 28 : slice_storage.as_deref().expect("just set it to Some()")
456 : } else {
457 0 : &[]
458 : };
459 28 : utils::crashsafe::overwrite(&final_path, &tmp_path, content)
460 28 : })
461 28 : .await
462 28 : .expect("blocking task is never aborted")
463 28 : }
464 :
465 : /// Call File::sync_all() on the underlying File.
466 2357 : pub async fn sync_all(&self) -> Result<(), Error> {
467 2357 : with_file!(self, StorageIoOperation::Fsync, |file_guard| {
468 2357 : let (_file_guard, res) = io_engine::get().sync_all(file_guard).await;
469 2357 : res
470 : })
471 2357 : }
472 :
473 : /// Call File::sync_data() on the underlying File.
474 0 : pub async fn sync_data(&self) -> Result<(), Error> {
475 0 : with_file!(self, StorageIoOperation::Fsync, |file_guard| {
476 0 : let (_file_guard, res) = io_engine::get().sync_data(file_guard).await;
477 0 : res
478 : })
479 0 : }
480 :
481 1296 : pub async fn metadata(&self) -> Result<Metadata, Error> {
482 1296 : with_file!(self, StorageIoOperation::Metadata, |file_guard| {
483 1296 : let (_file_guard, res) = io_engine::get().metadata(file_guard).await;
484 1296 : res
485 : })
486 1296 : }
487 :
488 : /// Helper function internal to `VirtualFile` that looks up the underlying File,
489 : /// opens it and evicts some other File if necessary. The caller passes in the
490 : /// operation to perform on the physical `File` via the `with_file!` macro.
491 : ///
492 : /// We do this via a macro because Rust doesn't support async closures that
493 : /// take parameters with lifetimes.
494 436809 : async fn lock_file(&self) -> Result<FileGuard, Error> {
495 436809 : let open_files = get_open_files();
496 :
497 183545 : let mut handle_guard = {
498 : // Read the cached slot handle, and see if the slot that it points to still
499 : // contains our File.
500 : //
501 : // We only need to hold the handle lock while we read the current handle. If
502 : // another thread closes the file and recycles the slot for a different file,
503 : // we will notice that the handle we read is no longer valid and retry.
504 436809 : let mut handle = *self.handle.read().await;
505 528941 : loop {
506 528941 : // Check if the slot contains our File
507 528941 : {
508 528941 : let slot = &open_files.slots[handle.index];
509 528941 : let slot_guard = slot.inner.read().await;
510 528941 : if slot_guard.tag == handle.tag && slot_guard.file.is_some() {
511 : // Found a cached file descriptor.
512 253264 : slot.recently_used.store(true, Ordering::Relaxed);
513 253264 : return Ok(FileGuard { slot_guard });
514 275677 : }
515 : }
516 :
517 : // The slot didn't contain our File. We will have to open it ourselves,
518 : // but before that, grab a write lock on handle in the VirtualFile, so
519 : // that no other thread will try to concurrently open the same file.
520 275677 : let handle_guard = self.handle.write().await;
521 :
522 : // If another thread changed the handle while we were not holding the lock,
523 : // then the handle might now be valid again. Loop back to retry.
524 275677 : if *handle_guard != handle {
525 92132 : handle = *handle_guard;
526 92132 : continue;
527 183545 : }
528 183545 : break handle_guard;
529 : }
530 : };
531 :
532 : // We need to open the file ourselves. The handle in the VirtualFile is
533 : // now locked in write-mode. Find a free slot to put it in.
534 183545 : let (handle, mut slot_guard) = open_files.find_victim_slot().await;
535 :
536 : // Re-open the physical file.
537 : // NB: we use StorageIoOperation::OpenAfterReplace for this to distinguish this
538 : // case from StorageIoOperation::Open. This helps with identifying thrashing
539 : // of the virtual file descriptor cache.
540 183545 : let file = observe_duration!(StorageIoOperation::OpenAfterReplace, {
541 183545 : self.open_options.open(self.path.as_std_path()).await?
542 : });
543 :
544 : // Store the File in the slot and update the handle in the VirtualFile
545 : // to point to it.
546 183545 : slot_guard.file.replace(file);
547 183545 :
548 183545 : *handle_guard = handle;
549 183545 :
550 183545 : return Ok(FileGuard {
551 183545 : slot_guard: slot_guard.downgrade(),
552 183545 : });
553 436809 : }
554 :
555 350 : pub fn remove(self) {
556 350 : let path = self.path.clone();
557 350 : drop(self);
558 350 : std::fs::remove_file(path).expect("failed to remove the virtual file");
559 350 : }
560 :
561 4250 : pub async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
562 4250 : match pos {
563 4240 : SeekFrom::Start(offset) => {
564 4240 : self.pos = offset;
565 4240 : }
566 4 : SeekFrom::End(offset) => {
567 4 : self.pos = with_file!(self, StorageIoOperation::Seek, |mut file_guard| file_guard
568 4 : .with_std_file_mut(|std_file| std_file.seek(SeekFrom::End(offset))))?
569 : }
570 6 : SeekFrom::Current(offset) => {
571 6 : let pos = self.pos as i128 + offset as i128;
572 6 : if pos < 0 {
573 2 : return Err(Error::new(
574 2 : ErrorKind::InvalidInput,
575 2 : "offset would be negative",
576 2 : ));
577 4 : }
578 4 : if pos > u64::MAX as i128 {
579 0 : return Err(Error::new(ErrorKind::InvalidInput, "offset overflow"));
580 4 : }
581 4 : self.pos = pos as u64;
582 : }
583 : }
584 4246 : Ok(self.pos)
585 4250 : }
586 :
587 332152 : pub async fn read_exact_at<B>(
588 332152 : &self,
589 332152 : buf: B,
590 332152 : offset: u64,
591 332152 : ctx: &RequestContext,
592 332152 : ) -> Result<B, Error>
593 332152 : where
594 332152 : B: IoBufMut + Send,
595 332152 : {
596 332152 : let (buf, res) = read_exact_at_impl(buf, offset, None, |buf, offset| {
597 332152 : self.read_at(buf, offset, ctx)
598 332152 : })
599 616031 : .await;
600 332152 : res.map(|()| buf)
601 332152 : }
602 :
603 38674 : pub async fn read_exact_at_n<B>(
604 38674 : &self,
605 38674 : buf: B,
606 38674 : offset: u64,
607 38674 : count: usize,
608 38674 : ctx: &RequestContext,
609 38674 : ) -> Result<B, Error>
610 38674 : where
611 38674 : B: IoBufMut + Send,
612 38674 : {
613 38674 : let (buf, res) = read_exact_at_impl(buf, offset, Some(count), |buf, offset| {
614 38674 : self.read_at(buf, offset, ctx)
615 38674 : })
616 19677 : .await;
617 38674 : res.map(|()| buf)
618 38674 : }
619 :
620 : /// Like [`Self::read_exact_at`] but for [`PageWriteGuard`].
621 111950 : pub async fn read_exact_at_page(
622 111950 : &self,
623 111950 : page: PageWriteGuard<'static>,
624 111950 : offset: u64,
625 111950 : ctx: &RequestContext,
626 111950 : ) -> Result<PageWriteGuard<'static>, Error> {
627 111950 : let buf = PageWriteGuardBuf {
628 111950 : page,
629 111950 : init_up_to: 0,
630 111950 : };
631 111950 : let res = self.read_exact_at(buf, offset, ctx).await;
632 111950 : res.map(|PageWriteGuardBuf { page, .. }| page)
633 111950 : .map_err(|e| Error::new(ErrorKind::Other, e))
634 111950 : }
635 :
636 : // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235
637 4 : pub async fn write_all_at<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
638 4 : &self,
639 4 : buf: B,
640 4 : mut offset: u64,
641 4 : ctx: &RequestContext,
642 4 : ) -> (B::Buf, Result<(), Error>) {
643 4 : let buf_len = buf.bytes_init();
644 4 : if buf_len == 0 {
645 0 : return (Slice::into_inner(buf.slice_full()), Ok(()));
646 4 : }
647 4 : let mut buf = buf.slice(0..buf_len);
648 8 : while !buf.is_empty() {
649 : let res;
650 4 : (buf, res) = self.write_at(buf, offset, ctx).await;
651 0 : match res {
652 : Ok(0) => {
653 0 : return (
654 0 : Slice::into_inner(buf),
655 0 : Err(Error::new(
656 0 : std::io::ErrorKind::WriteZero,
657 0 : "failed to write whole buffer",
658 0 : )),
659 0 : );
660 : }
661 4 : Ok(n) => {
662 4 : buf = buf.slice(n..);
663 4 : offset += n as u64;
664 4 : }
665 0 : Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
666 0 : Err(e) => return (Slice::into_inner(buf), Err(e)),
667 : }
668 : }
669 4 : (Slice::into_inner(buf), Ok(()))
670 4 : }
671 :
672 : /// Writes `buf.slice(0..buf.bytes_init())`.
673 : /// Returns the IoBuf that is underlying the BoundedBuf `buf`.
674 : /// I.e., the returned value's `bytes_init()` may differ from the `bytes_init()` of the `buf` that was passed in.
675 : /// That's quite brittle and easy to misuse, so we return the number of bytes written in the Ok() variant.
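    ///
    /// Usage sketch:
    /// ```ignore
    /// let (_buf, res) = file.write_all(b"foobar".to_vec(), &ctx).await;
    /// let nwritten = res?;
    /// ```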
676 61910 : pub async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
677 61910 : &mut self,
678 61910 : buf: B,
679 61910 : ctx: &RequestContext,
680 61910 : ) -> (B::Buf, Result<usize, Error>) {
681 61910 : let nbytes = buf.bytes_init();
682 61910 : if nbytes == 0 {
683 32 : return (Slice::into_inner(buf.slice_full()), Ok(0));
684 61878 : }
685 61878 : let mut buf = buf.slice(0..nbytes);
686 123754 : while !buf.is_empty() {
687 : let res;
688 61878 : (buf, res) = self.write(buf, ctx).await;
689 2 : match res {
690 : Ok(0) => {
691 0 : return (
692 0 : Slice::into_inner(buf),
693 0 : Err(Error::new(
694 0 : std::io::ErrorKind::WriteZero,
695 0 : "failed to write whole buffer",
696 0 : )),
697 0 : );
698 : }
699 61876 : Ok(n) => {
700 61876 : buf = buf.slice(n..);
701 61876 : }
702 2 : Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
703 2 : Err(e) => return (Slice::into_inner(buf), Err(e)),
704 : }
705 : }
706 61876 : (Slice::into_inner(buf), Ok(nbytes))
707 61910 : }
708 :
709 61878 : async fn write<B: IoBuf + Send>(
710 61878 : &mut self,
711 61878 : buf: Slice<B>,
712 61878 : ctx: &RequestContext,
713 61878 : ) -> (Slice<B>, Result<usize, std::io::Error>) {
714 61878 : let pos = self.pos;
715 61878 : let (buf, res) = self.write_at(buf, pos, ctx).await;
716 61878 : let n = match res {
717 61876 : Ok(n) => n,
718 2 : Err(e) => return (buf, Err(e)),
719 : };
720 61876 : self.pos += n as u64;
721 61876 : (buf, Ok(n))
722 61878 : }
723 :
724 371270 : pub(crate) async fn read_at<B>(
725 371270 : &self,
726 371270 : buf: B,
727 371270 : offset: u64,
728 371270 : _ctx: &RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */
729 371270 : ) -> (B, Result<usize, Error>)
730 371270 : where
731 371270 : B: tokio_epoll_uring::BoundedBufMut + Send,
732 371270 : {
733 449988 : let file_guard = match self.lock_file().await {
734 371270 : Ok(file_guard) => file_guard,
735 0 : Err(e) => return (buf, Err(e)),
736 : };
737 :
738 371270 : observe_duration!(StorageIoOperation::Read, {
739 371270 : let ((_file_guard, buf), res) = io_engine::get().read_at(file_guard, offset, buf).await;
740 371270 : if let Ok(size) = res {
741 371268 : STORAGE_IO_SIZE
742 371268 : .with_label_values(&[
743 371268 : "read",
744 371268 : &self.tenant_id,
745 371268 : &self.shard_id,
746 371268 : &self.timeline_id,
747 371268 : ])
748 371268 : .add(size as i64);
749 371268 : }
750 371270 : (buf, res)
751 : })
752 371270 : }
753 :
754 61882 : async fn write_at<B: IoBuf + Send>(
755 61882 : &self,
756 61882 : buf: Slice<B>,
757 61882 : offset: u64,
758 61882 : _ctx: &RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */
759 61882 : ) -> (Slice<B>, Result<usize, Error>) {
760 61882 : let file_guard = match self.lock_file().await {
761 61882 : Ok(file_guard) => file_guard,
762 0 : Err(e) => return (buf, Err(e)),
763 : };
764 61882 : observe_duration!(StorageIoOperation::Write, {
765 61882 : let ((_file_guard, buf), result) =
766 61882 : io_engine::get().write_at(file_guard, offset, buf).await;
767 61882 : if let Ok(size) = result {
768 61880 : STORAGE_IO_SIZE
769 61880 : .with_label_values(&[
770 61880 : "write",
771 61880 : &self.tenant_id,
772 61880 : &self.shard_id,
773 61880 : &self.timeline_id,
774 61880 : ])
775 61880 : .add(size as i64);
776 61880 : }
777 61882 : (buf, result)
778 : })
779 61882 : }
780 : }
781 :
782 : // Adapted from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
783 370836 : pub async fn read_exact_at_impl<B, F, Fut>(
784 370836 : buf: B,
785 370836 : mut offset: u64,
786 370836 : count: Option<usize>,
787 370836 : mut read_at: F,
788 370836 : ) -> (B, std::io::Result<()>)
789 370836 : where
790 370836 : B: IoBufMut + Send,
791 370836 : F: FnMut(tokio_epoll_uring::Slice<B>, u64) -> Fut,
792 370836 : Fut: std::future::Future<Output = (tokio_epoll_uring::Slice<B>, std::io::Result<usize>)>,
793 370836 : {
794 370836 : let mut buf: tokio_epoll_uring::Slice<B> = match count {
795 38676 : Some(count) => {
796 38676 : assert!(count <= buf.bytes_total());
797 38676 : assert!(count > 0);
798 38676 : buf.slice(..count) // may include uninitialized memory
799 : }
800 332160 : None => buf.slice_full(), // includes all the uninitialized memory
801 : };
802 :
803 741674 : while buf.bytes_total() != 0 {
804 : let res;
805 635708 : (buf, res) = read_at(buf, offset).await;
806 0 : match res {
807 2 : Ok(0) => break,
808 370838 : Ok(n) => {
809 370838 : buf = buf.slice(n..);
810 370838 : offset += n as u64;
811 370838 : }
812 0 : Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
813 0 : Err(e) => return (buf.into_inner(), Err(e)),
814 : }
815 : }
816 : // NB: don't use `buf.is_empty()` here; it is from the
817 : // `impl Deref for Slice { Target = [u8] }`; the &[u8]
818 : // returned by it only covers the initialized portion of `buf`.
819 : // Whereas we're interested in ensuring that we filled the entire
820 : // buffer that the user passed in.
821 370836 : if buf.bytes_total() != 0 {
822 2 : (
823 2 : buf.into_inner(),
824 2 : Err(std::io::Error::new(
825 2 : std::io::ErrorKind::UnexpectedEof,
826 2 : "failed to fill whole buffer",
827 2 : )),
828 2 : )
829 : } else {
830 370834 : assert_eq!(buf.len(), buf.bytes_total());
831 370834 : (buf.into_inner(), Ok(()))
832 : }
833 370836 : }
834 :
835 : #[cfg(test)]
836 : mod test_read_exact_at_impl {
837 :
838 : use std::{collections::VecDeque, sync::Arc};
839 :
840 : use tokio_epoll_uring::{BoundedBuf, BoundedBufMut};
841 :
842 : use super::read_exact_at_impl;
843 :
844 : struct Expectation {
845 : offset: u64,
846 : bytes_total: usize,
847 : result: std::io::Result<Vec<u8>>,
848 : }
849 : struct MockReadAt {
850 : expectations: VecDeque<Expectation>,
851 : }
852 :
853 : impl MockReadAt {
854 14 : async fn read_at(
855 14 : &mut self,
856 14 : mut buf: tokio_epoll_uring::Slice<Vec<u8>>,
857 14 : offset: u64,
858 14 : ) -> (tokio_epoll_uring::Slice<Vec<u8>>, std::io::Result<usize>) {
859 14 : let exp = self
860 14 : .expectations
861 14 : .pop_front()
862 14 : .expect("read_at called but we have no expectations left");
863 14 : assert_eq!(exp.offset, offset);
864 14 : assert_eq!(exp.bytes_total, buf.bytes_total());
865 14 : match exp.result {
866 14 : Ok(bytes) => {
867 14 : assert!(bytes.len() <= buf.bytes_total());
868 14 : buf.put_slice(&bytes);
869 14 : (buf, Ok(bytes.len()))
870 : }
871 0 : Err(e) => (buf, Err(e)),
872 : }
873 14 : }
874 : }
875 :
876 : impl Drop for MockReadAt {
877 10 : fn drop(&mut self) {
878 10 : assert_eq!(self.expectations.len(), 0);
879 10 : }
880 : }
881 :
882 : #[tokio::test]
883 2 : async fn test_basic() {
884 2 : let buf = Vec::with_capacity(5);
885 2 : let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
886 2 : expectations: VecDeque::from(vec![Expectation {
887 2 : offset: 0,
888 2 : bytes_total: 5,
889 2 : result: Ok(vec![b'a', b'b', b'c', b'd', b'e']),
890 2 : }]),
891 2 : }));
892 2 : let (buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| {
893 2 : let mock_read_at = Arc::clone(&mock_read_at);
894 2 : async move { mock_read_at.lock().await.read_at(buf, offset).await }
895 2 : })
896 2 : .await;
897 2 : assert!(res.is_ok());
898 2 : assert_eq!(buf, vec![b'a', b'b', b'c', b'd', b'e']);
899 2 : }
900 :
901 : #[tokio::test]
902 2 : async fn test_with_count() {
903 2 : let buf = Vec::with_capacity(5);
904 2 : let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
905 2 : expectations: VecDeque::from(vec![Expectation {
906 2 : offset: 0,
907 2 : bytes_total: 3,
908 2 : result: Ok(vec![b'a', b'b', b'c']),
909 2 : }]),
910 2 : }));
911 2 :
912 2 : let (buf, res) = read_exact_at_impl(buf, 0, Some(3), |buf, offset| {
913 2 : let mock_read_at = Arc::clone(&mock_read_at);
914 2 : async move { mock_read_at.lock().await.read_at(buf, offset).await }
915 2 : })
916 2 : .await;
917 2 : assert!(res.is_ok());
918 2 : assert_eq!(buf, vec![b'a', b'b', b'c']);
919 2 : }
920 :
921 : #[tokio::test]
922 2 : async fn test_empty_buf_issues_no_syscall() {
923 2 : let buf = Vec::new();
924 2 : let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
925 2 : expectations: VecDeque::new(),
926 2 : }));
927 2 : let (_buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| {
928 0 : let mock_read_at = Arc::clone(&mock_read_at);
929 2 : async move { mock_read_at.lock().await.read_at(buf, offset).await }
930 2 : })
931 2 : .await;
932 2 : assert!(res.is_ok());
933 2 : }
934 :
935 : #[tokio::test]
936 2 : async fn test_two_read_at_calls_needed_until_buf_filled() {
937 2 : let buf = Vec::with_capacity(4);
938 2 : let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
939 2 : expectations: VecDeque::from(vec![
940 2 : Expectation {
941 2 : offset: 0,
942 2 : bytes_total: 4,
943 2 : result: Ok(vec![b'a', b'b']),
944 2 : },
945 2 : Expectation {
946 2 : offset: 2,
947 2 : bytes_total: 2,
948 2 : result: Ok(vec![b'c', b'd']),
949 2 : },
950 2 : ]),
951 2 : }));
952 4 : let (buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| {
953 4 : let mock_read_at = Arc::clone(&mock_read_at);
954 4 : async move { mock_read_at.lock().await.read_at(buf, offset).await }
955 4 : })
956 2 : .await;
957 2 : assert!(res.is_ok());
958 2 : assert_eq!(buf, vec![b'a', b'b', b'c', b'd']);
959 2 : }
960 :
961 : #[tokio::test]
962 2 : async fn test_eof_before_buffer_full() {
963 2 : let buf = Vec::with_capacity(3);
964 2 : let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
965 2 : expectations: VecDeque::from(vec![
966 2 : Expectation {
967 2 : offset: 0,
968 2 : bytes_total: 3,
969 2 : result: Ok(vec![b'a']),
970 2 : },
971 2 : Expectation {
972 2 : offset: 1,
973 2 : bytes_total: 2,
974 2 : result: Ok(vec![b'b']),
975 2 : },
976 2 : Expectation {
977 2 : offset: 2,
978 2 : bytes_total: 1,
979 2 : result: Ok(vec![]),
980 2 : },
981 2 : ]),
982 2 : }));
983 6 : let (_buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| {
984 6 : let mock_read_at = Arc::clone(&mock_read_at);
985 6 : async move { mock_read_at.lock().await.read_at(buf, offset).await }
986 6 : })
987 2 : .await;
988 2 : let Err(err) = res else {
989 2 : panic!("should return an error");
990 2 : };
991 2 : assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);
992 2 : assert_eq!(format!("{err}"), "failed to fill whole buffer");
993 2 : // buffer contents on error are unspecified
994 2 : }
995 : }
996 :
997 : struct FileGuard {
998 : slot_guard: RwLockReadGuard<'static, SlotInner>,
999 : }
1000 :
1001 : impl AsRef<OwnedFd> for FileGuard {
1002 436809 : fn as_ref(&self) -> &OwnedFd {
1003 436809 : // This unwrap is safe because we only create `FileGuard`s
1004 436809 : // if we know that the file is Some.
1005 436809 : self.slot_guard.file.as_ref().unwrap()
1006 436809 : }
1007 : }
1008 :
1009 : impl FileGuard {
1010 : /// Soft deprecation: we'll move VirtualFile to async APIs and remove this function eventually.
1011 218158 : fn with_std_file<F, R>(&self, with: F) -> R
1012 218158 : where
1013 218158 : F: FnOnce(&File) -> R,
1014 218158 : {
1015 218158 : // SAFETY:
1016 218158 : // - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`.
1017 218158 : // - `&` usage below: `self` is `&`, hence the Rust type system guarantees there is no `&mut`
1018 218158 : let file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) };
1019 218158 : let res = with(&file);
1020 218158 : let _ = file.into_raw_fd();
1021 218158 : res
1022 218158 : }
1023 : /// Soft deprecation: we'll move VirtualFile to async APIs and remove this function eventually.
1024 4 : fn with_std_file_mut<F, R>(&mut self, with: F) -> R
1025 4 : where
1026 4 : F: FnOnce(&mut File) -> R,
1027 4 : {
1028 4 : // SAFETY:
1029 4 : // - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`.
1030 4 : // - &mut usage below: `self` is `&mut`, hence this call is the only task/thread that has control over the underlying fd
1031 4 : let mut file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) };
1032 4 : let res = with(&mut file);
1033 4 : let _ = file.into_raw_fd();
1034 4 : res
1035 4 : }
1036 : }
1037 :
1038 : impl tokio_epoll_uring::IoFd for FileGuard {
1039 218647 : unsafe fn as_fd(&self) -> RawFd {
1040 218647 : let owned_fd: &OwnedFd = self.as_ref();
1041 218647 : owned_fd.as_raw_fd()
1042 218647 : }
1043 : }
1044 :
1045 : #[cfg(test)]
1046 : impl VirtualFile {
1047 20200 : pub(crate) async fn read_blk(
1048 20200 : &self,
1049 20200 : blknum: u32,
1050 20200 : ctx: &RequestContext,
1051 20200 : ) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
1052 20200 : use crate::page_cache::PAGE_SZ;
1053 20200 : let buf = vec![0; PAGE_SZ];
1054 20200 : let buf = self
1055 20200 : .read_exact_at(buf, blknum as u64 * (PAGE_SZ as u64), ctx)
1056 10255 : .await?;
1057 20200 : Ok(crate::tenant::block_io::BlockLease::Vec(buf))
1058 20200 : }
1059 :
1060 224 : async fn read_to_end(&mut self, buf: &mut Vec<u8>, ctx: &RequestContext) -> Result<(), Error> {
1061 224 : let mut tmp = vec![0; 128];
1062 : loop {
1063 : let res;
1064 444 : (tmp, res) = self.read_at(tmp, self.pos, ctx).await;
1065 2 : match res {
1066 222 : Ok(0) => return Ok(()),
1067 220 : Ok(n) => {
1068 220 : self.pos += n as u64;
1069 220 : buf.extend_from_slice(&tmp[..n]);
1070 220 : }
1071 2 : Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
1072 2 : Err(e) => return Err(e),
1073 : }
1074 : }
1075 224 : }
1076 : }
1077 :
1078 : impl Drop for VirtualFile {
1079 : /// If a VirtualFile is dropped, close the underlying file if it was open.
1080 4524 : fn drop(&mut self) {
1081 4524 : let handle = self.handle.get_mut();
1082 4524 :
1083 4524 : fn clean_slot(slot: &Slot, mut slot_guard: RwLockWriteGuard<'_, SlotInner>, tag: u64) {
1084 4524 : if slot_guard.tag == tag {
1085 4524 : slot.recently_used.store(false, Ordering::Relaxed);
1086 4524 : // there is also operation "close-by-replace" for closes done on eviction for
1087 4524 : // comparison.
1088 4524 : if let Some(fd) = slot_guard.file.take() {
1089 4014 : STORAGE_IO_TIME_METRIC
1090 4014 : .get(StorageIoOperation::Close)
1091 4014 : .observe_closure_duration(|| drop(fd));
1092 4014 : }
1093 4524 : }
1094 4524 : }
1095 4524 :
1096 4524 : // We don't have async drop so we cannot directly await the lock here.
1097 4524 : // Instead, first do a best-effort attempt at closing the underlying
1098 4524 : // file descriptor by using `try_write`, and if that fails, spawn
1099 4524 : // a tokio task to do it asynchronously: we just want it to be
1100 4524 : // cleaned up eventually.
1101 4524 : // Most of the time, the `try_lock` should succeed though,
1102 4524 : // as we have `&mut self` access. In other words, if the slot
1103 4524 : // is still occupied by our file, there should be no access from
1104 4524 : // other I/O operations; the only other possible place to lock
1105 4524 : // the slot is the clock algorithm looking for a victim slot.
1106 4524 : let slot = &get_open_files().slots[handle.index];
1107 4524 : if let Ok(slot_guard) = slot.inner.try_write() {
1108 4524 : clean_slot(slot, slot_guard, handle.tag);
1109 4524 : } else {
1110 0 : let tag = handle.tag;
1111 0 : tokio::spawn(async move {
1112 0 : let slot_guard = slot.inner.write().await;
1113 0 : clean_slot(slot, slot_guard, tag);
1114 0 : });
1115 0 : };
1116 4524 : }
1117 : }
1118 :
1119 : impl OwnedAsyncWriter for VirtualFile {
1120 : #[inline(always)]
1121 3 : async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
1122 3 : &mut self,
1123 3 : buf: B,
1124 3 : ctx: &RequestContext,
1125 3 : ) -> std::io::Result<(usize, B::Buf)> {
1126 3 : let (buf, res) = VirtualFile::write_all(self, buf, ctx).await;
1127 3 : res.map(move |v| (v, buf))
1128 3 : }
1129 : }
1130 :
1131 : impl OpenFiles {
1132 138 : fn new(num_slots: usize) -> OpenFiles {
1133 138 : let mut slots = Box::new(Vec::with_capacity(num_slots));
1134 1380 : for _ in 0..num_slots {
1135 1380 : let slot = Slot {
1136 1380 : recently_used: AtomicBool::new(false),
1137 1380 : inner: RwLock::new(SlotInner { tag: 0, file: None }),
1138 1380 : };
1139 1380 : slots.push(slot);
1140 1380 : }
1141 :
1142 138 : OpenFiles {
1143 138 : next: AtomicUsize::new(0),
1144 138 : slots: Box::leak(slots),
1145 138 : }
1146 138 : }
1147 : }
1148 :
1149 : ///
1150 : /// Initialize the virtual file module. This must be called once at page
1151 : /// server startup.
1152 : ///
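/// A hypothetical startup call (slot count and engine choice are illustrative):
/// ```ignore
/// virtual_file::init(100, IoEngineKind::StdFs);
/// ```
///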
1153 : #[cfg(not(test))]
1154 0 : pub fn init(num_slots: usize, engine: IoEngineKind) {
1155 0 : if OPEN_FILES.set(OpenFiles::new(num_slots)).is_err() {
1156 0 : panic!("virtual_file::init called twice");
1157 0 : }
1158 0 : io_engine::init(engine);
1159 0 : crate::metrics::virtual_file_descriptor_cache::SIZE_MAX.set(num_slots as u64);
1160 0 : }
1161 :
1162 : const TEST_MAX_FILE_DESCRIPTORS: usize = 10;
1163 :
1164 : // Get a handle to the global slots array.
1165 446594 : fn get_open_files() -> &'static OpenFiles {
1166 446594 : //
1167 446594 : // In unit tests, page server startup doesn't happen and no one calls
1168 446594 : // virtual_file::init(). Initialize it here, with a small array.
1169 446594 : //
1170 446594 : // This applies to the virtual file tests below, but all other unit
1171 446594 : // tests too, so the virtual file facility is always usable in
1172 446594 : // unit tests.
1173 446594 : //
1174 446594 : if cfg!(test) {
1175 446594 : OPEN_FILES.get_or_init(|| OpenFiles::new(TEST_MAX_FILE_DESCRIPTORS))
1176 : } else {
1177 0 : OPEN_FILES.get().expect("virtual_file::init not called yet")
1178 : }
1179 446594 : }
1180 :
1181 : #[cfg(test)]
1182 : mod tests {
1183 : use crate::context::DownloadBehavior;
1184 : use crate::task_mgr::TaskKind;
1185 :
1186 : use super::*;
1187 : use rand::seq::SliceRandom;
1188 : use rand::thread_rng;
1189 : use rand::Rng;
1190 : use std::io::Write;
1191 : use std::os::unix::fs::FileExt;
1192 : use std::sync::Arc;
1193 :
1194 : enum MaybeVirtualFile {
1195 : VirtualFile(VirtualFile),
1196 : File(File),
1197 : }
1198 :
1199 : impl From<VirtualFile> for MaybeVirtualFile {
1200 6 : fn from(vf: VirtualFile) -> Self {
1201 6 : MaybeVirtualFile::VirtualFile(vf)
1202 6 : }
1203 : }
1204 :
1205 : impl MaybeVirtualFile {
1206 404 : async fn read_exact_at(
1207 404 : &self,
1208 404 : mut buf: Vec<u8>,
1209 404 : offset: u64,
1210 404 : ctx: &RequestContext,
1211 404 : ) -> Result<Vec<u8>, Error> {
1212 404 : match self {
1213 203 : MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(buf, offset, ctx).await,
1214 202 : MaybeVirtualFile::File(file) => file.read_exact_at(&mut buf, offset).map(|()| buf),
1215 : }
1216 404 : }
1217 8 : async fn write_all_at<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
1218 8 : &self,
1219 8 : buf: B,
1220 8 : offset: u64,
1221 8 : ctx: &RequestContext,
1222 8 : ) -> Result<(), Error> {
1223 8 : match self {
1224 4 : MaybeVirtualFile::VirtualFile(file) => {
1225 4 : let (_buf, res) = file.write_all_at(buf, offset, ctx).await;
1226 4 : res
1227 : }
1228 4 : MaybeVirtualFile::File(file) => {
1229 4 : let buf_len = buf.bytes_init();
1230 4 : if buf_len == 0 {
1231 0 : return Ok(());
1232 4 : }
1233 4 : file.write_all_at(&buf.slice(0..buf_len), offset)
1234 : }
1235 : }
1236 8 : }
1237 36 : async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
1238 36 : match self {
1239 18 : MaybeVirtualFile::VirtualFile(file) => file.seek(pos).await,
1240 18 : MaybeVirtualFile::File(file) => file.seek(pos),
1241 : }
1242 36 : }
1243 8 : async fn write_all<B: BoundedBuf<Buf = Buf>, Buf: IoBuf + Send>(
1244 8 : &mut self,
1245 8 : buf: B,
1246 8 : ctx: &RequestContext,
1247 8 : ) -> Result<(), Error> {
1248 8 : match self {
1249 4 : MaybeVirtualFile::VirtualFile(file) => {
1250 4 : let (_buf, res) = file.write_all(buf, ctx).await;
1251 4 : res.map(|_| ())
1252 : }
1253 4 : MaybeVirtualFile::File(file) => {
1254 4 : let buf_len = buf.bytes_init();
1255 4 : if buf_len == 0 {
1256 0 : return Ok(());
1257 4 : }
1258 4 : file.write_all(&buf.slice(0..buf_len))
1259 : }
1260 : }
1261 8 : }
1262 :
1263 : // Helper function to slurp contents of a file, starting at the current position,
1264 : // into a string
1265 442 : async fn read_string(&mut self, ctx: &RequestContext) -> Result<String, Error> {
1266 442 : use std::io::Read;
1267 442 : let mut buf = String::new();
1268 442 : match self {
1269 224 : MaybeVirtualFile::VirtualFile(file) => {
1270 224 : let mut buf = Vec::new();
1271 226 : file.read_to_end(&mut buf, ctx).await?;
1272 222 : return Ok(String::from_utf8(buf).unwrap());
1273 : }
1274 218 : MaybeVirtualFile::File(file) => {
1275 218 : file.read_to_string(&mut buf)?;
1276 : }
1277 : }
1278 216 : Ok(buf)
1279 442 : }
1280 :
1281 : // Helper function to slurp a portion of a file into a string
1282 404 : async fn read_string_at(
1283 404 : &mut self,
1284 404 : pos: u64,
1285 404 : len: usize,
1286 404 : ctx: &RequestContext,
1287 404 : ) -> Result<String, Error> {
1288 404 : let buf = vec![0; len];
1289 404 : let buf = self.read_exact_at(buf, pos, ctx).await?;
1290 404 : Ok(String::from_utf8(buf).unwrap())
1291 404 : }
1292 : }
1293 :
1294 : #[tokio::test]
1295 2 : async fn test_virtual_files() -> anyhow::Result<()> {
1296 2 : // The real work is done in the test_files() helper function. This
1297 2 : // allows us to run the same set of tests against a native File, and
1298 2 : // VirtualFile. We trust the native Files and wouldn't need to test them,
1299 2 : // but this allows us to verify that the operations return the same
1300 2 : // results with VirtualFiles as with native Files. (Except that with
1301 2 : // native files, you will run out of file descriptors if the ulimit
1302 2 : // is low enough.)
1303 2 : struct A;
1304 2 :
1305 2 : impl Adapter for A {
1306 206 : async fn open(
1307 206 : path: Utf8PathBuf,
1308 206 : opts: OpenOptions,
1309 206 : ctx: &RequestContext,
1310 206 : ) -> Result<MaybeVirtualFile, anyhow::Error> {
1311 206 : let vf = VirtualFile::open_with_options(&path, &opts, ctx).await?;
1312 206 : Ok(MaybeVirtualFile::VirtualFile(vf))
1313 206 : }
1314 2 : }
1315 531 : test_files::<A>("virtual_files").await
1316 2 : }
1317 :
1318 : #[tokio::test]
1319 2 : async fn test_physical_files() -> anyhow::Result<()> {
1320 2 : struct B;
1321 2 :
1322 2 : impl Adapter for B {
1323 206 : async fn open(
1324 206 : path: Utf8PathBuf,
1325 206 : opts: OpenOptions,
1326 206 : _ctx: &RequestContext,
1327 206 : ) -> Result<MaybeVirtualFile, anyhow::Error> {
1328 206 : Ok(MaybeVirtualFile::File({
1329 206 : let owned_fd = opts.open(path.as_std_path()).await?;
1330 206 : File::from(owned_fd)
1331 2 : }))
1332 206 : }
1333 2 : }
1334 2 :
1335 104 : test_files::<B>("physical_files").await
1336 2 : }
1337 :
1338 : /// This is essentially a closure which returns a MaybeVirtualFile, but because Rust edition
1339 : /// 2024 is not yet out with its new lifetime capture and outlives rules, this is an async function
1340 : /// in a trait, which benefits from the new lifetime capture rules already.
1341 : trait Adapter {
1342 : async fn open(
1343 : path: Utf8PathBuf,
1344 : opts: OpenOptions,
1345 : ctx: &RequestContext,
1346 : ) -> Result<MaybeVirtualFile, anyhow::Error>;
1347 : }
1348 :
1349 4 : async fn test_files<A>(testname: &str) -> anyhow::Result<()>
1350 4 : where
1351 4 : A: Adapter,
1352 4 : {
1353 4 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
1354 4 : let testdir = crate::config::PageServerConf::test_repo_dir(testname);
1355 4 : std::fs::create_dir_all(&testdir)?;
1356 :
1357 4 : let path_a = testdir.join("file_a");
1358 4 : let mut file_a = A::open(
1359 4 : path_a.clone(),
1360 4 : OpenOptions::new()
1361 4 : .write(true)
1362 4 : .create(true)
1363 4 : .truncate(true)
1364 4 : .to_owned(),
1365 4 : &ctx,
1366 4 : )
1367 4 : .await?;
1368 4 : file_a.write_all(b"foobar".to_vec(), &ctx).await?;
1369 :
1370 : // cannot read from a file opened in write-only mode
1371 4 : let _ = file_a.read_string(&ctx).await.unwrap_err();
1372 :
1373 : // Close the file and re-open for reading
1374 4 : let mut file_a = A::open(path_a, OpenOptions::new().read(true).to_owned(), &ctx).await?;
1375 :
1376 : // cannot write to a file opened in read-only mode
1377 4 : let _ = file_a.write_all(b"bar".to_vec(), &ctx).await.unwrap_err();
1378 4 :
1379 4 : // Try simple read
1380 4 : assert_eq!("foobar", file_a.read_string(&ctx).await?);
1381 :
1382 : // It's positioned at the EOF now.
1383 4 : assert_eq!("", file_a.read_string(&ctx).await?);
1384 :
1385 : // Test seeks.
1386 4 : assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
1387 4 : assert_eq!("oobar", file_a.read_string(&ctx).await?);
1388 :
1389 4 : assert_eq!(file_a.seek(SeekFrom::End(-2)).await?, 4);
1390 4 : assert_eq!("ar", file_a.read_string(&ctx).await?);
1391 :
1392 4 : assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
1393 4 : assert_eq!(file_a.seek(SeekFrom::Current(2)).await?, 3);
1394 4 : assert_eq!("bar", file_a.read_string(&ctx).await?);
1395 :
1396 4 : assert_eq!(file_a.seek(SeekFrom::Current(-5)).await?, 1);
1397 4 : assert_eq!("oobar", file_a.read_string(&ctx).await?);
1398 :
1399 : // Test erroneous seeks to before byte 0
1400 4 : file_a.seek(SeekFrom::End(-7)).await.unwrap_err();
1401 4 : assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
1402 4 : file_a.seek(SeekFrom::Current(-2)).await.unwrap_err();
1403 4 :
1404 4 : // the erroneous seek should have left the position unchanged
1405 4 : assert_eq!("oobar", file_a.read_string(&ctx).await?);
1406 :
1407 : // Create another test file, and try FileExt functions on it.
1408 4 : let path_b = testdir.join("file_b");
1409 4 : let mut file_b = A::open(
1410 4 : path_b.clone(),
1411 4 : OpenOptions::new()
1412 4 : .read(true)
1413 4 : .write(true)
1414 4 : .create(true)
1415 4 : .truncate(true)
1416 4 : .to_owned(),
1417 4 : &ctx,
1418 4 : )
1419 2 : .await?;
1420 4 : file_b.write_all_at(b"BAR".to_vec(), 3, &ctx).await?;
1421 4 : file_b.write_all_at(b"FOO".to_vec(), 0, &ctx).await?;
1422 :
1423 4 : assert_eq!(file_b.read_string_at(2, 3, &ctx).await?, "OBA");
1424 :
1425 : // Open a lot of files, enough to cause some evictions. (Or to be precise,
1426 : // open the same file many times. The effect is the same.)
1427 : //
1428 : // leave file_a positioned at offset 1 before we start
1429 4 : assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
1430 :
1431 4 : let mut vfiles = Vec::new();
1432 404 : for _ in 0..100 {
1433 400 : let mut vfile = A::open(
1434 400 : path_b.clone(),
1435 400 : OpenOptions::new().read(true).to_owned(),
1436 400 : &ctx,
1437 400 : )
1438 200 : .await?;
1439 400 : assert_eq!("FOOBAR", vfile.read_string(&ctx).await?);
1440 400 : vfiles.push(vfile);
1441 : }
1442 :
1443 : // make sure we opened enough files to definitely cause evictions.
1444 4 : assert!(vfiles.len() > TEST_MAX_FILE_DESCRIPTORS * 2);
1445 :
1446 : // The underlying file descriptor for 'file_a' should be closed now. Try to read
1447 : // from it again. We left the file positioned at offset 1 above.
1448 4 : assert_eq!("oobar", file_a.read_string(&ctx).await?);
1449 :
1450 : // Check that all the other FDs still work too. Use them in random order for
1451 : // good measure.
1452 4 : vfiles.as_mut_slice().shuffle(&mut thread_rng());
1453 400 : for vfile in vfiles.iter_mut() {
1454 400 : assert_eq!("OOBAR", vfile.read_string_at(1, 5, &ctx).await?);
1455 : }
1456 :
1457 4 : Ok(())
1458 4 : }
1459 :
1460 : /// Test using VirtualFiles from many threads concurrently. This tests both using
1461 : /// a lot of VirtualFiles concurrently, causing evictions, and also using the same
1462 : /// VirtualFile from multiple threads concurrently.
1463 : #[tokio::test]
1464 2 : async fn test_vfile_concurrency() -> Result<(), Error> {
1465 2 : const SIZE: usize = 8 * 1024;
1466 2 : const VIRTUAL_FILES: usize = 100;
1467 2 : const THREADS: usize = 100;
1468 2 : const SAMPLE: [u8; SIZE] = [0xADu8; SIZE];
1469 2 :
1470 2 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
1471 2 : let testdir = crate::config::PageServerConf::test_repo_dir("vfile_concurrency");
1472 2 : std::fs::create_dir_all(&testdir)?;
1473 2 :
1474 2 : // Create a test file.
1475 2 : let test_file_path = testdir.join("concurrency_test_file");
1476 2 : {
1477 2 : let file = File::create(&test_file_path)?;
1478 2 : file.write_all_at(&SAMPLE, 0)?;
1479 2 : }
1480 2 :
1481 2 : // Open the file many times.
1482 2 : let mut files = Vec::new();
1483 202 : for _ in 0..VIRTUAL_FILES {
1484 200 : let f = VirtualFile::open_with_options(
1485 200 : &test_file_path,
1486 200 : OpenOptions::new().read(true),
1487 200 : &ctx,
1488 200 : )
1489 101 : .await?;
1490 200 : files.push(f);
1491 2 : }
1492 2 : let files = Arc::new(files);
1493 2 :
1494 2 : // Launch many threads, and use the virtual files concurrently in random order.
1495 2 : let rt = tokio::runtime::Builder::new_multi_thread()
1496 2 : .worker_threads(THREADS)
1497 2 : .thread_name("test_vfile_concurrency thread")
1498 2 : .build()
1499 2 : .unwrap();
1500 2 : let mut hdls = Vec::new();
1501 202 : for _threadno in 0..THREADS {
1502 200 : let files = files.clone();
1503 200 : let ctx = ctx.detached_child(TaskKind::UnitTest, DownloadBehavior::Error);
1504 200 : let hdl = rt.spawn(async move {
1505 200 : let mut buf = vec![0u8; SIZE];
1506 200 : let mut rng = rand::rngs::OsRng;
1507 200000 : for _ in 1..1000 {
1508 199800 : let f = &files[rng.gen_range(0..files.len())];
1509 542210 : buf = f.read_exact_at(buf, 0, &ctx).await.unwrap();
1510 199800 : assert!(buf == SAMPLE);
1511 2 : }
1512 200 : });
1513 200 : hdls.push(hdl);
1514 200 : }
1515 202 : for hdl in hdls {
1516 200 : hdl.await?;
1517 2 : }
1518 2 : std::mem::forget(rt);
1519 2 :
1520 2 : Ok(())
1521 2 : }
1522 :
1523 : #[tokio::test]
1524 2 : async fn test_atomic_overwrite_basic() {
1525 2 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
1526 2 : let testdir = crate::config::PageServerConf::test_repo_dir("test_atomic_overwrite_basic");
1527 2 : std::fs::create_dir_all(&testdir).unwrap();
1528 2 :
1529 2 : let path = testdir.join("myfile");
1530 2 : let tmp_path = testdir.join("myfile.tmp");
1531 2 :
1532 2 : VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
1533 2 : .await
1534 2 : .unwrap();
1535 2 : let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
1536 2 : let post = file.read_string(&ctx).await.unwrap();
1537 2 : assert_eq!(post, "foo");
1538 2 : assert!(!tmp_path.exists());
1539 2 : drop(file);
1540 2 :
1541 2 : VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"bar".to_vec())
1542 2 : .await
1543 2 : .unwrap();
1544 2 : let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
1545 2 : let post = file.read_string(&ctx).await.unwrap();
1546 2 : assert_eq!(post, "bar");
1547 2 : assert!(!tmp_path.exists());
1548 2 : drop(file);
1549 2 : }
1550 :
1551 : #[tokio::test]
1552 2 : async fn test_atomic_overwrite_preexisting_tmp() {
1553 2 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
1554 2 : let testdir =
1555 2 : crate::config::PageServerConf::test_repo_dir("test_atomic_overwrite_preexisting_tmp");
1556 2 : std::fs::create_dir_all(&testdir).unwrap();
1557 2 :
1558 2 : let path = testdir.join("myfile");
1559 2 : let tmp_path = testdir.join("myfile.tmp");
1560 2 :
1561 2 : std::fs::write(&tmp_path, "some preexisting junk that should be removed").unwrap();
1562 2 : assert!(tmp_path.exists());
1563 2 :
1564 2 : VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
1565 2 : .await
1566 2 : .unwrap();
1567 2 :
1568 2 : let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
1569 2 : let post = file.read_string(&ctx).await.unwrap();
1570 2 : assert_eq!(post, "foo");
1571 2 : assert!(!tmp_path.exists());
1572 2 : drop(file);
1573 2 : }
1574 : }
|