Line data Source code
1 : //! [`super::VirtualFile`] supports different IO engines.
2 : //!
3 : //! The [`IoEngineKind`] enum identifies them.
4 : //!
5 : //! The choice of IO engine is global.
6 : //! Initialize using [`init`].
7 : //!
8 : //! Then use [`get`] and [`super::OpenOptions`].
9 :
10 : #[derive(
11 : Copy,
12 0 : Clone,
13 4 : PartialEq,
14 : Eq,
15 0 : Hash,
16 1208 : strum_macros::EnumString,
17 604 : strum_macros::Display,
18 0 : serde_with::DeserializeFromStr,
19 0 : serde_with::SerializeDisplay,
20 0 : Debug,
21 : )]
22 : #[strum(serialize_all = "kebab-case")]
23 : pub enum IoEngineKind {
24 : StdFs,
25 : #[cfg(target_os = "linux")]
26 : TokioEpollUring,
27 : }
28 :
29 : static IO_ENGINE: once_cell::sync::OnceCell<IoEngineKind> = once_cell::sync::OnceCell::new();
30 :
31 : #[cfg(not(test))]
32 604 : pub(super) fn init(engine: IoEngineKind) {
33 604 : if IO_ENGINE.set(engine).is_err() {
34 0 : panic!("called twice");
35 604 : }
36 604 : crate::metrics::virtual_file_io_engine::KIND
37 604 : .with_label_values(&[&format!("{engine}")])
38 604 : .set(1);
39 604 : }
40 :
41 293275 : pub(super) fn get() -> &'static IoEngineKind {
42 293275 : #[cfg(test)]
43 293275 : {
44 293275 : let env_var_name = "NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE";
45 293275 : IO_ENGINE.get_or_init(|| match std::env::var(env_var_name) {
46 6304092 : Ok(v) => match v.parse::<IoEngineKind>() {
47 6304092 : Ok(engine_kind) => engine_kind,
48 6304092 : Err(e) => {
49 0 : panic!("invalid VirtualFile io engine for env var {env_var_name}: {e:#}: {v:?}")
50 6304092 : }
51 6304092 : },
52 6304092 : Err(std::env::VarError::NotPresent) => {
53 6304092 : crate::config::defaults::DEFAULT_VIRTUAL_FILE_IO_ENGINE
54 0 : .parse()
55 0 : .unwrap()
56 6304092 : }
57 6304092 : Err(std::env::VarError::NotUnicode(_)) => {
58 6304092 : panic!("env var {env_var_name} is not unicode");
59 6304092 : }
60 6304092 : })
61 293275 : }
62 293275 : #[cfg(not(test))]
63 293275 : IO_ENGINE.get().unwrap()
64 293275 : }
65 :
66 : use std::os::unix::prelude::FileExt;
67 :
68 : use super::FileGuard;
69 :
70 : impl IoEngineKind {
71 6516922 : pub(super) async fn read_at<B>(
72 6516922 : &self,
73 6516922 : file_guard: FileGuard,
74 6516922 : offset: u64,
75 6516922 : mut buf: B,
76 6516922 : ) -> ((FileGuard, B), std::io::Result<usize>)
77 6516922 : where
78 6516922 : B: tokio_epoll_uring::BoundedBufMut + Send,
79 6516922 : {
80 6516922 : match self {
81 : IoEngineKind::StdFs => {
82 : // SAFETY: `dst` only lives at most as long as this match arm, during which buf remains valid memory.
83 6372097 : let dst = unsafe {
84 6372097 : std::slice::from_raw_parts_mut(buf.stable_mut_ptr(), buf.bytes_total())
85 6372097 : };
86 6372097 : let res = file_guard.with_std_file(|std_file| std_file.read_at(dst, offset));
87 6372097 : if let Ok(nbytes) = &res {
88 6372096 : assert!(*nbytes <= buf.bytes_total());
89 : // SAFETY: see above assertion
90 6372096 : unsafe {
91 6372096 : buf.set_init(*nbytes);
92 6372096 : }
93 1 : }
94 : #[allow(dropping_references)]
95 6372097 : drop(dst);
96 6372097 : ((file_guard, buf), res)
97 : }
98 : #[cfg(target_os = "linux")]
99 : IoEngineKind::TokioEpollUring => {
100 144825 : let system = tokio_epoll_uring::thread_local_system().await;
101 144844 : let (resources, res) = system.read(file_guard, offset, buf).await;
102 144825 : (
103 144825 : resources,
104 144825 : res.map_err(|e| match e {
105 1 : tokio_epoll_uring::Error::Op(e) => e,
106 0 : tokio_epoll_uring::Error::System(system) => {
107 0 : std::io::Error::new(std::io::ErrorKind::Other, system)
108 : }
109 144825 : }),
110 144825 : )
111 : }
112 : }
113 6516922 : }
114 : }
|