Line data Source code
1 : use std::{
2 : borrow::Cow,
3 : fs::{self, File},
4 : io::{self, Write},
5 : };
6 :
7 : use camino::{Utf8Path, Utf8PathBuf};
8 :
9 : /// Similar to [`std::fs::create_dir`], except we fsync the
10 : /// created directory and its parent.
11 1155 : pub fn create_dir(path: impl AsRef<Utf8Path>) -> io::Result<()> {
12 1155 : let path = path.as_ref();
13 1155 :
14 1155 : fs::create_dir(path)?;
15 1151 : fsync_file_and_parent(path)?;
16 1151 : Ok(())
17 1155 : }
18 :
19 : /// Similar to [`std::fs::create_dir_all`], except we fsync all
20 : /// newly created directories and the pre-existing parent.
21 409 : pub fn create_dir_all(path: impl AsRef<Utf8Path>) -> io::Result<()> {
22 409 : let mut path = path.as_ref();
23 409 :
24 409 : let mut dirs_to_create = Vec::new();
25 :
26 : // Figure out which directories we need to create.
27 : loop {
28 814 : match path.metadata() {
29 407 : Ok(metadata) if metadata.is_dir() => break,
30 : Ok(_) => {
31 2 : return Err(io::Error::new(
32 2 : io::ErrorKind::AlreadyExists,
33 2 : format!("non-directory found in path: {path}"),
34 2 : ));
35 : }
36 407 : Err(ref e) if e.kind() == io::ErrorKind::NotFound => {}
37 2 : Err(e) => return Err(e),
38 : }
39 :
40 405 : dirs_to_create.push(path);
41 405 :
42 405 : match path.parent() {
43 405 : Some(parent) => path = parent,
44 : None => {
45 0 : return Err(io::Error::new(
46 0 : io::ErrorKind::InvalidInput,
47 0 : format!("can't find parent of path '{path}'"),
48 0 : ));
49 : }
50 : }
51 : }
52 :
53 : // Create directories from parent to child.
54 405 : for &path in dirs_to_create.iter().rev() {
55 405 : fs::create_dir(path)?;
56 : }
57 :
58 : // Fsync the created directories from child to parent.
59 405 : for &path in dirs_to_create.iter() {
60 405 : fsync(path)?;
61 : }
62 :
63 : // If we created any new directories, fsync the parent.
64 405 : if !dirs_to_create.is_empty() {
65 403 : fsync(path)?;
66 2 : }
67 :
68 405 : Ok(())
69 409 : }
70 :
71 : /// Adds a suffix to the file(directory) name, either appending the suffix to the end of its extension,
72 : /// or if there's no extension, creates one and puts a suffix there.
73 35140 : pub fn path_with_suffix_extension(
74 35140 : original_path: impl AsRef<Utf8Path>,
75 35140 : suffix: &str,
76 35140 : ) -> Utf8PathBuf {
77 35140 : let new_extension = match original_path.as_ref().extension() {
78 4615 : Some(extension) => Cow::Owned(format!("{extension}.{suffix}")),
79 30525 : None => Cow::Borrowed(suffix),
80 : };
81 35140 : original_path.as_ref().with_extension(new_extension)
82 35140 : }
83 :
84 3552 : pub fn fsync_file_and_parent(file_path: &Utf8Path) -> io::Result<()> {
85 3552 : let parent = file_path.parent().ok_or_else(|| {
86 0 : io::Error::new(
87 0 : io::ErrorKind::Other,
88 0 : format!("File {file_path:?} has no parent"),
89 0 : )
90 3552 : })?;
91 :
92 3552 : fsync(file_path)?;
93 3552 : fsync(parent)?;
94 3552 : Ok(())
95 3552 : }
96 :
97 9078 : pub fn fsync(path: &Utf8Path) -> io::Result<()> {
98 9078 : File::open(path)
99 9078 : .map_err(|e| io::Error::new(e.kind(), format!("Failed to open the file {path:?}: {e}")))
100 9078 : .and_then(|file| {
101 9078 : file.sync_all().map_err(|e| {
102 0 : io::Error::new(
103 0 : e.kind(),
104 0 : format!("Failed to sync file {path:?} data and metadata: {e}"),
105 0 : )
106 9078 : })
107 9078 : })
108 9078 : .map_err(|e| io::Error::new(e.kind(), format!("Failed to fsync file {path:?}: {e}")))
109 9078 : }
110 :
111 10978 : pub async fn fsync_async(path: impl AsRef<Utf8Path>) -> Result<(), std::io::Error> {
112 10978 : tokio::fs::File::open(path.as_ref()).await?.sync_all().await
113 10977 : }
114 :
115 18747 : pub async fn fsync_async_opt(
116 18747 : path: impl AsRef<Utf8Path>,
117 18747 : do_fsync: bool,
118 18747 : ) -> Result<(), std::io::Error> {
119 18747 : if do_fsync {
120 24 : fsync_async(path.as_ref()).await?;
121 18735 : }
122 18747 : Ok(())
123 18747 : }
124 :
125 : /// Like postgres' durable_rename, renames file issuing fsyncs do make it
126 : /// durable. After return, file and rename are guaranteed to be persisted.
127 : ///
128 : /// Unlike postgres, it only does fsyncs to 1) file to be renamed to make
129 : /// contents durable; 2) its directory entry to make rename durable 3) again to
130 : /// already renamed file, which is not required by standards but postgres does
131 : /// it, let's stick to that. Postgres additionally fsyncs newpath *before*
132 : /// rename if it exists to ensure that at least one of the files survives, but
133 : /// current callers don't need that.
134 : ///
135 : /// virtual_file.rs has similar code, but it doesn't use vfs.
136 : ///
137 : /// Useful links: <https://lwn.net/Articles/457667/>
138 : /// <https://www.postgresql.org/message-id/flat/56583BDD.9060302%402ndquadrant.com>
139 : /// <https://thunk.org/tytso/blog/2009/03/15/dont-fear-the-fsync/>
140 6249 : pub async fn durable_rename(
141 6249 : old_path: impl AsRef<Utf8Path>,
142 6249 : new_path: impl AsRef<Utf8Path>,
143 6249 : do_fsync: bool,
144 6249 : ) -> io::Result<()> {
145 6249 : // first fsync the file
146 6249 : fsync_async_opt(old_path.as_ref(), do_fsync).await?;
147 :
148 : // Time to do the real deal.
149 6249 : tokio::fs::rename(old_path.as_ref(), new_path.as_ref()).await?;
150 :
151 : // Postgres'ish fsync of renamed file.
152 6249 : fsync_async_opt(new_path.as_ref(), do_fsync).await?;
153 :
154 : // Now fsync the parent
155 6249 : let parent = match new_path.as_ref().parent() {
156 6249 : Some(p) => p,
157 0 : None => Utf8Path::new("./"), // assume current dir if there is no parent
158 : };
159 6249 : fsync_async_opt(parent, do_fsync).await?;
160 :
161 6249 : Ok(())
162 6249 : }
163 :
164 : /// Writes a file to the specified `final_path` in a crash safe fasion, using [`std::fs`].
165 : ///
166 : /// The file is first written to the specified `tmp_path`, and in a second
167 : /// step, the `tmp_path` is renamed to the `final_path`. Intermediary fsync
168 : /// and atomic rename guarantee that, if we crash at any point, there will never
169 : /// be a partially written file at `final_path` (but maybe at `tmp_path`).
170 : ///
171 : /// Callers are responsible for serializing calls of this function for a given `final_path`.
172 : /// If they don't, there may be an error due to conflicting `tmp_path`, or there will
173 : /// be no error and the content of `final_path` will be the "winner" caller's `content`.
174 : /// I.e., the atomticity guarantees still hold.
175 8859 : pub fn overwrite(
176 8859 : final_path: &Utf8Path,
177 8859 : tmp_path: &Utf8Path,
178 8859 : content: &[u8],
179 8859 : ) -> std::io::Result<()> {
180 8859 : let Some(final_path_parent) = final_path.parent() else {
181 0 : return Err(std::io::Error::from_raw_os_error(
182 0 : nix::errno::Errno::EINVAL as i32,
183 0 : ));
184 : };
185 8859 : std::fs::remove_file(tmp_path).or_else(crate::fs_ext::ignore_not_found)?;
186 8859 : let mut file = std::fs::OpenOptions::new()
187 8859 : .write(true)
188 8859 : // Use `create_new` so that, if we race with ourselves or something else,
189 8859 : // we bail out instead of causing damage.
190 8859 : .create_new(true)
191 8859 : .open(tmp_path)?;
192 8859 : file.write_all(content)?;
193 8859 : file.sync_all()?;
194 8859 : drop(file); // don't keep the fd open for longer than we have to
195 8859 :
196 8859 : std::fs::rename(tmp_path, final_path)?;
197 :
198 8859 : let final_parent_dirfd = std::fs::OpenOptions::new()
199 8859 : .read(true)
200 8859 : .open(final_path_parent)?;
201 :
202 8859 : final_parent_dirfd.sync_all()?;
203 8859 : Ok(())
204 8859 : }
205 :
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_create_dir_fsyncd() {
        let tmp = camino_tempfile::tempdir().unwrap();
        let root = tmp.path();

        // The temp dir itself already exists.
        let already_exists = create_dir(root).unwrap_err();
        assert_eq!(already_exists.kind(), io::ErrorKind::AlreadyExists);

        // A direct child of an existing directory can be created.
        create_dir(root.join("child")).unwrap();

        // `create_dir` does not create missing intermediate directories.
        let missing_parent = create_dir(root.join("child1").join("child2")).unwrap_err();
        assert_eq!(missing_parent.kind(), io::ErrorKind::NotFound);
    }

    #[test]
    fn test_create_dir_all_fsyncd() {
        let tmp = camino_tempfile::tempdir().unwrap();
        let root = tmp.path();

        // An already existing directory is accepted.
        create_dir_all(root).unwrap();

        // Both a single missing level and several missing levels get created.
        for target in [root.join("child"), root.join("child1").join("child2")] {
            assert!(!target.exists());
            create_dir_all(&target).unwrap();
            assert!(target.exists());
        }

        // A regular file at the target is reported as AlreadyExists.
        let file_path = root.join("file");
        std::fs::write(&file_path, b"").unwrap();
        let blocked = create_dir_all(&file_path).unwrap_err();
        assert_eq!(blocked.kind(), io::ErrorKind::AlreadyExists);

        // A path that goes through a regular file cannot be created.
        create_dir_all(file_path.join("folder")).unwrap_err();
    }

    #[test]
    fn test_path_with_suffix_extension() {
        // (input path, suffix, expected output)
        let cases = [
            ("/foo/bar", "temp", "/foo/bar.temp"),
            ("/foo/bar", "temp.temp", "/foo/bar.temp.temp"),
            ("/foo/bar.baz", "temp.temp", "/foo/bar.baz.temp.temp"),
            ("/foo/bar.baz", ".temp", "/foo/bar.baz..temp"),
            ("/foo/bar/dir/", ".temp", "/foo/bar/dir..temp"),
        ];
        for (input, suffix, expected) in cases {
            let actual = path_with_suffix_extension(Utf8PathBuf::from(input), suffix);
            assert_eq!(actual.as_str(), expected, "input={input:?} suffix={suffix:?}");
        }
    }
}
|