LCOV - code coverage report
Current view: top level - libs/utils/src - crashsafe.rs (source / functions) Coverage Total Hit
Test: aca8877be6ceba750c1be359ed71bc1799d52b30.info Lines: 91.5 % 188 172
Test Date: 2024-02-14 18:05:35 Functions: 69.0 % 42 29

            Line data    Source code
       1              : use std::{
       2              :     borrow::Cow,
       3              :     fs::{self, File},
       4              :     io::{self, Write},
       5              : };
       6              : 
       7              : use camino::{Utf8Path, Utf8PathBuf};
       8              : 
       9              : /// Similar to [`std::fs::create_dir`], except we fsync the
      10              : /// created directory and its parent.
      11         1155 : pub fn create_dir(path: impl AsRef<Utf8Path>) -> io::Result<()> {
      12         1155 :     let path = path.as_ref();
      13         1155 : 
      14         1155 :     fs::create_dir(path)?;
      15         1151 :     fsync_file_and_parent(path)?;
      16         1151 :     Ok(())
      17         1155 : }
      18              : 
      19              : /// Similar to [`std::fs::create_dir_all`], except we fsync all
      20              : /// newly created directories and the pre-existing parent.
      21          409 : pub fn create_dir_all(path: impl AsRef<Utf8Path>) -> io::Result<()> {
      22          409 :     let mut path = path.as_ref();
      23          409 : 
      24          409 :     let mut dirs_to_create = Vec::new();
      25              : 
      26              :     // Figure out which directories we need to create.
      27              :     loop {
      28          814 :         match path.metadata() {
      29          407 :             Ok(metadata) if metadata.is_dir() => break,
      30              :             Ok(_) => {
      31            2 :                 return Err(io::Error::new(
      32            2 :                     io::ErrorKind::AlreadyExists,
      33            2 :                     format!("non-directory found in path: {path}"),
      34            2 :                 ));
      35              :             }
      36          407 :             Err(ref e) if e.kind() == io::ErrorKind::NotFound => {}
      37            2 :             Err(e) => return Err(e),
      38              :         }
      39              : 
      40          405 :         dirs_to_create.push(path);
      41          405 : 
      42          405 :         match path.parent() {
      43          405 :             Some(parent) => path = parent,
      44              :             None => {
      45            0 :                 return Err(io::Error::new(
      46            0 :                     io::ErrorKind::InvalidInput,
      47            0 :                     format!("can't find parent of path '{path}'"),
      48            0 :                 ));
      49              :             }
      50              :         }
      51              :     }
      52              : 
      53              :     // Create directories from parent to child.
      54          405 :     for &path in dirs_to_create.iter().rev() {
      55          405 :         fs::create_dir(path)?;
      56              :     }
      57              : 
      58              :     // Fsync the created directories from child to parent.
      59          405 :     for &path in dirs_to_create.iter() {
      60          405 :         fsync(path)?;
      61              :     }
      62              : 
      63              :     // If we created any new directories, fsync the parent.
      64          405 :     if !dirs_to_create.is_empty() {
      65          403 :         fsync(path)?;
      66            2 :     }
      67              : 
      68          405 :     Ok(())
      69          409 : }
      70              : 
      71              : /// Adds a suffix to the file(directory) name, either appending the suffix to the end of its extension,
      72              : /// or if there's no extension, creates one and puts a suffix there.
      73        35140 : pub fn path_with_suffix_extension(
      74        35140 :     original_path: impl AsRef<Utf8Path>,
      75        35140 :     suffix: &str,
      76        35140 : ) -> Utf8PathBuf {
      77        35140 :     let new_extension = match original_path.as_ref().extension() {
      78         4615 :         Some(extension) => Cow::Owned(format!("{extension}.{suffix}")),
      79        30525 :         None => Cow::Borrowed(suffix),
      80              :     };
      81        35140 :     original_path.as_ref().with_extension(new_extension)
      82        35140 : }
      83              : 
      84         3552 : pub fn fsync_file_and_parent(file_path: &Utf8Path) -> io::Result<()> {
      85         3552 :     let parent = file_path.parent().ok_or_else(|| {
      86            0 :         io::Error::new(
      87            0 :             io::ErrorKind::Other,
      88            0 :             format!("File {file_path:?} has no parent"),
      89            0 :         )
      90         3552 :     })?;
      91              : 
      92         3552 :     fsync(file_path)?;
      93         3552 :     fsync(parent)?;
      94         3552 :     Ok(())
      95         3552 : }
      96              : 
      97         9078 : pub fn fsync(path: &Utf8Path) -> io::Result<()> {
      98         9078 :     File::open(path)
      99         9078 :         .map_err(|e| io::Error::new(e.kind(), format!("Failed to open the file {path:?}: {e}")))
     100         9078 :         .and_then(|file| {
     101         9078 :             file.sync_all().map_err(|e| {
     102            0 :                 io::Error::new(
     103            0 :                     e.kind(),
     104            0 :                     format!("Failed to sync file {path:?} data and metadata: {e}"),
     105            0 :                 )
     106         9078 :             })
     107         9078 :         })
     108         9078 :         .map_err(|e| io::Error::new(e.kind(), format!("Failed to fsync file {path:?}: {e}")))
     109         9078 : }
     110              : 
     111        10978 : pub async fn fsync_async(path: impl AsRef<Utf8Path>) -> Result<(), std::io::Error> {
     112        10978 :     tokio::fs::File::open(path.as_ref()).await?.sync_all().await
     113        10977 : }
     114              : 
     115        18747 : pub async fn fsync_async_opt(
     116        18747 :     path: impl AsRef<Utf8Path>,
     117        18747 :     do_fsync: bool,
     118        18747 : ) -> Result<(), std::io::Error> {
     119        18747 :     if do_fsync {
     120           24 :         fsync_async(path.as_ref()).await?;
     121        18735 :     }
     122        18747 :     Ok(())
     123        18747 : }
     124              : 
     125              : /// Like postgres' durable_rename, renames file issuing fsyncs to make it
     126              : /// durable. After return, file and rename are guaranteed to be persisted.
     127              : ///
     128              : /// Unlike postgres, it only does fsyncs to 1) file to be renamed to make
     129              : /// contents durable; 2) its directory entry to make rename durable 3) again to
     130              : /// already renamed file, which is not required by standards but postgres does
     131              : /// it, let's stick to that. Postgres additionally fsyncs newpath *before*
     132              : /// rename if it exists to ensure that at least one of the files survives, but
     133              : /// current callers don't need that.
     134              : ///
     135              : /// virtual_file.rs has similar code, but it doesn't use vfs.
     136              : ///
     137              : /// Useful links: <https://lwn.net/Articles/457667/>
     138              : /// <https://www.postgresql.org/message-id/flat/56583BDD.9060302%402ndquadrant.com>
     139              : /// <https://thunk.org/tytso/blog/2009/03/15/dont-fear-the-fsync/>
     140         6249 : pub async fn durable_rename(
     141         6249 :     old_path: impl AsRef<Utf8Path>,
     142         6249 :     new_path: impl AsRef<Utf8Path>,
     143         6249 :     do_fsync: bool,
     144         6249 : ) -> io::Result<()> {
     145         6249 :     // first fsync the file
     146         6249 :     fsync_async_opt(old_path.as_ref(), do_fsync).await?;
     147              : 
     148              :     // Time to do the real deal.
     149         6249 :     tokio::fs::rename(old_path.as_ref(), new_path.as_ref()).await?;
     150              : 
     151              :     // Postgres'ish fsync of renamed file.
     152         6249 :     fsync_async_opt(new_path.as_ref(), do_fsync).await?;
     153              : 
     154              :     // Now fsync the parent
     155         6249 :     let parent = match new_path.as_ref().parent() {
     156         6249 :         Some(p) => p,
     157            0 :         None => Utf8Path::new("./"), // assume current dir if there is no parent
     158              :     };
     159         6249 :     fsync_async_opt(parent, do_fsync).await?;
     160              : 
     161         6249 :     Ok(())
     162         6249 : }
     163              : 
     164              : /// Writes a file to the specified `final_path` in a crash-safe fashion, using [`std::fs`].
     165              : ///
     166              : /// The file is first written to the specified `tmp_path`, and in a second
     167              : /// step, the `tmp_path` is renamed to the `final_path`. Intermediary fsync
     168              : /// and atomic rename guarantee that, if we crash at any point, there will never
     169              : /// be a partially written file at `final_path` (but maybe at `tmp_path`).
     170              : ///
     171              : /// Callers are responsible for serializing calls of this function for a given `final_path`.
     172              : /// If they don't, there may be an error due to conflicting `tmp_path`, or there will
     173              : /// be no error and the content of `final_path` will be the "winner" caller's `content`.
     174              : /// I.e., the atomicity guarantees still hold.
     175         8859 : pub fn overwrite(
     176         8859 :     final_path: &Utf8Path,
     177         8859 :     tmp_path: &Utf8Path,
     178         8859 :     content: &[u8],
     179         8859 : ) -> std::io::Result<()> {
     180         8859 :     let Some(final_path_parent) = final_path.parent() else {
     181            0 :         return Err(std::io::Error::from_raw_os_error(
     182            0 :             nix::errno::Errno::EINVAL as i32,
     183            0 :         ));
     184              :     };
     185         8859 :     std::fs::remove_file(tmp_path).or_else(crate::fs_ext::ignore_not_found)?;
     186         8859 :     let mut file = std::fs::OpenOptions::new()
     187         8859 :         .write(true)
     188         8859 :         // Use `create_new` so that, if we race with ourselves or something else,
     189         8859 :         // we bail out instead of causing damage.
     190         8859 :         .create_new(true)
     191         8859 :         .open(tmp_path)?;
     192         8859 :     file.write_all(content)?;
     193         8859 :     file.sync_all()?;
     194         8859 :     drop(file); // don't keep the fd open for longer than we have to
     195         8859 : 
     196         8859 :     std::fs::rename(tmp_path, final_path)?;
     197              : 
     198         8859 :     let final_parent_dirfd = std::fs::OpenOptions::new()
     199         8859 :         .read(true)
     200         8859 :         .open(final_path_parent)?;
     201              : 
     202         8859 :     final_parent_dirfd.sync_all()?;
     203         8859 :     Ok(())
     204         8859 : }
     205              : 
     206              : #[cfg(test)]
     207              : mod tests {
     208              : 
     209              :     use super::*;
     210              : 
     211            2 :     #[test]
     212            2 :     fn test_create_dir_fsyncd() {
     213            2 :         let dir = camino_tempfile::tempdir().unwrap();
     214            2 : 
     215            2 :         let existing_dir_path = dir.path();
     216            2 :         let err = create_dir(existing_dir_path).unwrap_err();
     217            2 :         assert_eq!(err.kind(), io::ErrorKind::AlreadyExists);
     218              : 
     219            2 :         let child_dir = existing_dir_path.join("child");
     220            2 :         create_dir(child_dir).unwrap();
     221            2 : 
     222            2 :         let nested_child_dir = existing_dir_path.join("child1").join("child2");
     223            2 :         let err = create_dir(nested_child_dir).unwrap_err();
     224            2 :         assert_eq!(err.kind(), io::ErrorKind::NotFound);
     225            2 :     }
     226              : 
     227            2 :     #[test]
     228            2 :     fn test_create_dir_all_fsyncd() {
     229            2 :         let dir = camino_tempfile::tempdir().unwrap();
     230            2 : 
     231            2 :         let existing_dir_path = dir.path();
     232            2 :         create_dir_all(existing_dir_path).unwrap();
     233            2 : 
     234            2 :         let child_dir = existing_dir_path.join("child");
     235            2 :         assert!(!child_dir.exists());
     236            2 :         create_dir_all(&child_dir).unwrap();
     237            2 :         assert!(child_dir.exists());
     238              : 
     239            2 :         let nested_child_dir = existing_dir_path.join("child1").join("child2");
     240            2 :         assert!(!nested_child_dir.exists());
     241            2 :         create_dir_all(&nested_child_dir).unwrap();
     242            2 :         assert!(nested_child_dir.exists());
     243              : 
     244            2 :         let file_path = existing_dir_path.join("file");
     245            2 :         std::fs::write(&file_path, b"").unwrap();
     246            2 : 
     247            2 :         let err = create_dir_all(&file_path).unwrap_err();
     248            2 :         assert_eq!(err.kind(), io::ErrorKind::AlreadyExists);
     249              : 
     250            2 :         let invalid_dir_path = file_path.join("folder");
     251            2 :         create_dir_all(invalid_dir_path).unwrap_err();
     252            2 :     }
     253              : 
     254            2 :     #[test]
     255            2 :     fn test_path_with_suffix_extension() {
     256            2 :         let p = Utf8PathBuf::from("/foo/bar");
     257            2 :         assert_eq!(
     258            2 :             &path_with_suffix_extension(p, "temp").to_string(),
     259            2 :             "/foo/bar.temp"
     260            2 :         );
     261            2 :         let p = Utf8PathBuf::from("/foo/bar");
     262            2 :         assert_eq!(
     263            2 :             &path_with_suffix_extension(p, "temp.temp").to_string(),
     264            2 :             "/foo/bar.temp.temp"
     265            2 :         );
     266            2 :         let p = Utf8PathBuf::from("/foo/bar.baz");
     267            2 :         assert_eq!(
     268            2 :             &path_with_suffix_extension(p, "temp.temp").to_string(),
     269            2 :             "/foo/bar.baz.temp.temp"
     270            2 :         );
     271            2 :         let p = Utf8PathBuf::from("/foo/bar.baz");
     272            2 :         assert_eq!(
     273            2 :             &path_with_suffix_extension(p, ".temp").to_string(),
     274            2 :             "/foo/bar.baz..temp"
     275            2 :         );
     276            2 :         let p = Utf8PathBuf::from("/foo/bar/dir/");
     277            2 :         assert_eq!(
     278            2 :             &path_with_suffix_extension(p, ".temp").to_string(),
     279            2 :             "/foo/bar/dir..temp"
     280            2 :         );
     281            2 :     }
     282              : }
        

Generated by: LCOV version 2.1-beta