LCOV - differential code coverage report
Current view: top level - safekeeper/src - control_file.rs (source / functions) Coverage Total Hit UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 84.5 % 220 186 34 186
Current Date: 2023-10-19 02:04:12 Functions: 69.2 % 26 18 8 18
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : //! Control file serialization, deserialization and persistence.
       2                 : 
       3                 : use anyhow::{bail, ensure, Context, Result};
       4                 : use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
       5                 : use camino::Utf8PathBuf;
       6                 : use tokio::fs::{self, File};
       7                 : use tokio::io::AsyncWriteExt;
       8                 : 
       9                 : use std::io::Read;
      10                 : use std::ops::Deref;
      11                 : use std::path::Path;
      12                 : use std::time::Instant;
      13                 : 
      14                 : use crate::control_file_upgrade::upgrade_control_file;
      15                 : use crate::metrics::PERSIST_CONTROL_FILE_SECONDS;
      16                 : use crate::safekeeper::{SafeKeeperState, SK_FORMAT_VERSION, SK_MAGIC};
      17                 : use utils::{bin_ser::LeSer, id::TenantTimelineId};
      18                 : 
      19                 : use crate::SafeKeeperConf;
      20                 : 
      21                 : use std::convert::TryInto;
      22                 : 
      23                 : // contains persistent metadata for safekeeper
      24                 : const CONTROL_FILE_NAME: &str = "safekeeper.control";
      25                 : // needed to atomically update the state using `rename`
      26                 : const CONTROL_FILE_NAME_PARTIAL: &str = "safekeeper.control.partial";
      27                 : pub const CHECKSUM_SIZE: usize = std::mem::size_of::<u32>();
      28                 : 
      29                 : /// Storage should keep actual state inside of it. It should implement Deref
      30                 : /// trait to access state fields and have persist method for updating that state.
      31                 : #[async_trait::async_trait]
      32                 : pub trait Storage: Deref<Target = SafeKeeperState> {
      33                 :     /// Persist safekeeper state on disk and update internal state.
      34                 :     async fn persist(&mut self, s: &SafeKeeperState) -> Result<()>;
      35                 : 
      36                 :     /// Timestamp of last persist.
      37                 :     fn last_persist_at(&self) -> Instant;
      38                 : }
      39                 : 
      40 UBC           0 : #[derive(Debug)]
      41                 : pub struct FileStorage {
      42                 :     // save timeline dir to avoid reconstructing it every time
      43                 :     timeline_dir: Utf8PathBuf,
      44                 :     conf: SafeKeeperConf,
      45                 : 
      46                 :     /// Last state persisted to disk.
      47                 :     state: SafeKeeperState,
      48                 :     /// Not preserved across restarts.
      49                 :     last_persist_at: Instant,
      50                 : }
      51                 : 
      52                 : impl FileStorage {
      53                 :     /// Initialize storage by loading state from disk.
      54 CBC          86 :     pub fn restore_new(ttid: &TenantTimelineId, conf: &SafeKeeperConf) -> Result<FileStorage> {
      55              86 :         let timeline_dir = conf.timeline_dir(ttid);
      56                 : 
      57              86 :         let state = Self::load_control_file_conf(conf, ttid)?;
      58                 : 
      59              85 :         Ok(FileStorage {
      60              85 :             timeline_dir,
      61              85 :             conf: conf.clone(),
      62              85 :             state,
      63              85 :             last_persist_at: Instant::now(),
      64              85 :         })
      65              86 :     }
      66                 : 
      67                 :     /// Create file storage for a new timeline, but don't persist it yet.
      68             483 :     pub fn create_new(
      69             483 :         ttid: &TenantTimelineId,
      70             483 :         conf: &SafeKeeperConf,
      71             483 :         state: SafeKeeperState,
      72             483 :     ) -> Result<FileStorage> {
      73             483 :         let timeline_dir = conf.timeline_dir(ttid);
      74             483 : 
      75             483 :         let store = FileStorage {
      76             483 :             timeline_dir,
      77             483 :             conf: conf.clone(),
      78             483 :             state,
      79             483 :             last_persist_at: Instant::now(),
      80             483 :         };
      81             483 : 
      82             483 :         Ok(store)
      83             483 :     }
      84                 : 
      85                 :     /// Check the magic/version in the on-disk data and deserialize it, if possible.
      86              87 :     fn deser_sk_state(buf: &mut &[u8]) -> Result<SafeKeeperState> {
      87                 :         // Read the version independent part
      88              87 :         let magic = ReadBytesExt::read_u32::<LittleEndian>(buf)?;
      89              87 :         if magic != SK_MAGIC {
      90 UBC           0 :             bail!(
      91               0 :                 "bad control file magic: {:X}, expected {:X}",
      92               0 :                 magic,
      93               0 :                 SK_MAGIC
      94               0 :             );
      95 CBC          87 :         }
      96              87 :         let version = ReadBytesExt::read_u32::<LittleEndian>(buf)?;
      97              87 :         if version == SK_FORMAT_VERSION {
      98              87 :             let res = SafeKeeperState::des(buf)?;
      99              87 :             return Ok(res);
     100 UBC           0 :         }
     101               0 :         // try to upgrade
     102               0 :         upgrade_control_file(buf, version)
     103 CBC          87 :     }
     104                 : 
     105                 :     /// Load control file for given ttid at path specified by conf.
     106              87 :     pub fn load_control_file_conf(
     107              87 :         conf: &SafeKeeperConf,
     108              87 :         ttid: &TenantTimelineId,
     109              87 :     ) -> Result<SafeKeeperState> {
     110              87 :         let path = conf.timeline_dir(ttid).join(CONTROL_FILE_NAME);
     111              87 :         Self::load_control_file(path)
     112              87 :     }
     113                 : 
     114                 :     /// Read in the control file.
     115              88 :     pub fn load_control_file<P: AsRef<Path>>(control_file_path: P) -> Result<SafeKeeperState> {
     116              88 :         let mut control_file = std::fs::OpenOptions::new()
     117              88 :             .read(true)
     118              88 :             .write(true)
     119              88 :             .open(&control_file_path)
     120              88 :             .with_context(|| {
     121 UBC           0 :                 format!(
     122               0 :                     "failed to open control file at {}",
     123               0 :                     control_file_path.as_ref().display(),
     124               0 :                 )
     125 CBC          88 :             })?;
     126                 : 
     127              88 :         let mut buf = Vec::new();
     128              88 :         control_file
     129              88 :             .read_to_end(&mut buf)
     130              88 :             .context("failed to read control file")?;
     131                 : 
     132              88 :         let calculated_checksum = crc32c::crc32c(&buf[..buf.len() - CHECKSUM_SIZE]);
     133                 : 
     134              88 :         let expected_checksum_bytes: &[u8; CHECKSUM_SIZE] =
     135              88 :             buf[buf.len() - CHECKSUM_SIZE..].try_into()?;
     136              88 :         let expected_checksum = u32::from_le_bytes(*expected_checksum_bytes);
     137              88 : 
     138              88 :         ensure!(
     139              88 :             calculated_checksum == expected_checksum,
     140               1 :             format!(
     141               1 :                 "safekeeper control file checksum mismatch: expected {} got {}",
     142               1 :                 expected_checksum, calculated_checksum
     143               1 :             )
     144                 :         );
     145                 : 
     146              87 :         let state = FileStorage::deser_sk_state(&mut &buf[..buf.len() - CHECKSUM_SIZE])
     147              87 :             .with_context(|| {
     148 UBC           0 :                 format!(
     149               0 :                     "while reading control file {}",
     150               0 :                     control_file_path.as_ref().display(),
     151               0 :                 )
     152 CBC          87 :             })?;
     153              87 :         Ok(state)
     154              88 :     }
     155                 : }
     156                 : 
     157                 : impl Deref for FileStorage {
     158                 :     type Target = SafeKeeperState;
     159                 : 
     160        32917201 :     fn deref(&self) -> &Self::Target {
     161        32917201 :         &self.state
     162        32917201 :     }
     163                 : }
     164                 : 
     165                 : #[async_trait::async_trait]
     166                 : impl Storage for FileStorage {
     167                 :     /// Persists state durably to the underlying storage.
     168                 :     ///
     169                 :     /// For a description, see <https://lwn.net/Articles/457667/>.
     170            5418 :     async fn persist(&mut self, s: &SafeKeeperState) -> Result<()> {
     171            5418 :         let _timer = PERSIST_CONTROL_FILE_SECONDS.start_timer();
     172            5418 : 
     173            5418 :         // write data to safekeeper.control.partial
     174            5418 :         let control_partial_path = self.timeline_dir.join(CONTROL_FILE_NAME_PARTIAL);
     175            5463 :         let mut control_partial = File::create(&control_partial_path).await.with_context(|| {
     176 UBC           0 :             format!(
     177               0 :                 "failed to create partial control file at: {}",
     178               0 :                 &control_partial_path
     179               0 :             )
     180 CBC        5418 :         })?;
     181            5418 :         let mut buf: Vec<u8> = Vec::new();
     182            5418 :         WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_MAGIC)?;
     183            5418 :         WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_FORMAT_VERSION)?;
     184            5418 :         s.ser_into(&mut buf)?;
     185                 : 
     186                 :         // calculate checksum before resize
     187            5418 :         let checksum = crc32c::crc32c(&buf);
     188            5418 :         buf.extend_from_slice(&checksum.to_le_bytes());
     189            5418 : 
     190            5418 :         control_partial.write_all(&buf).await.with_context(|| {
     191 UBC           0 :             format!(
     192               0 :                 "failed to write safekeeper state into control file at: {}",
     193               0 :                 control_partial_path
     194               0 :             )
     195 CBC        5418 :         })?;
     196            5418 :         control_partial.flush().await.with_context(|| {
     197 UBC           0 :             format!(
     198               0 :                 "failed to flush safekeeper state into control file at: {}",
     199               0 :                 control_partial_path
     200               0 :             )
     201 CBC        5418 :         })?;
     202                 : 
     203                 :         // fsync the file
     204            5418 :         if !self.conf.no_sync {
     205               2 :             control_partial.sync_all().await.with_context(|| {
     206 UBC           0 :                 format!(
     207               0 :                     "failed to sync partial control file at {}",
     208               0 :                     control_partial_path
     209               0 :                 )
     210 CBC           2 :             })?;
     211            5416 :         }
     212                 : 
     213            5418 :         let control_path = self.timeline_dir.join(CONTROL_FILE_NAME);
     214            5418 : 
     215            5418 :         // rename should be atomic
     216            5418 :         fs::rename(&control_partial_path, &control_path).await?;
     217                 :         // this sync is not required by any standard but postgres does this (see durable_rename)
     218            5418 :         if !self.conf.no_sync {
     219               2 :             let new_f = File::open(&control_path).await?;
     220               2 :             new_f
     221               2 :                 .sync_all()
     222               2 :                 .await
     223               2 :                 .with_context(|| format!("failed to sync control file at: {}", &control_path))?;
     224                 : 
     225                 :             // fsync the directory (linux specific)
     226               2 :             let tli_dir = File::open(&self.timeline_dir).await?;
     227               2 :             tli_dir
     228               2 :                 .sync_all()
     229               2 :                 .await
     230               2 :                 .context("failed to sync control file directory")?;
     231            5416 :         }
     232                 : 
     233                 :         // update internal state
     234            5418 :         self.state = s.clone();
     235            5418 :         Ok(())
     236           10836 :     }
     237                 : 
     238            1363 :     fn last_persist_at(&self) -> Instant {
     239            1363 :         self.last_persist_at
     240            1363 :     }
     241                 : }
     242                 : 
     243                 : #[cfg(test)]
     244                 : mod test {
     245                 :     use super::FileStorage;
     246                 :     use super::*;
     247                 :     use crate::{safekeeper::SafeKeeperState, SafeKeeperConf};
     248                 :     use anyhow::Result;
     249                 :     use utils::{id::TenantTimelineId, lsn::Lsn};
     250                 : 
     251               2 :     fn stub_conf() -> SafeKeeperConf {
     252               2 :         let workdir = camino_tempfile::tempdir().unwrap().into_path();
     253               2 :         SafeKeeperConf {
     254               2 :             workdir,
     255               2 :             ..SafeKeeperConf::dummy()
     256               2 :         }
     257               2 :     }
     258                 : 
     259               2 :     async fn load_from_control_file(
     260               2 :         conf: &SafeKeeperConf,
     261               2 :         ttid: &TenantTimelineId,
     262               2 :     ) -> Result<(FileStorage, SafeKeeperState)> {
     263               2 :         fs::create_dir_all(conf.timeline_dir(ttid))
     264               2 :             .await
     265               2 :             .expect("failed to create timeline dir");
     266               2 :         Ok((
     267               2 :             FileStorage::restore_new(ttid, conf)?,
     268               1 :             FileStorage::load_control_file_conf(conf, ttid)?,
     269                 :         ))
     270               2 :     }
     271                 : 
     272               2 :     async fn create(
     273               2 :         conf: &SafeKeeperConf,
     274               2 :         ttid: &TenantTimelineId,
     275               2 :     ) -> Result<(FileStorage, SafeKeeperState)> {
     276               2 :         fs::create_dir_all(conf.timeline_dir(ttid))
     277               2 :             .await
     278               2 :             .expect("failed to create timeline dir");
     279               2 :         let state = SafeKeeperState::empty();
     280               2 :         let storage = FileStorage::create_new(ttid, conf, state.clone())?;
     281               2 :         Ok((storage, state))
     282               2 :     }
     283                 : 
     284               1 :     #[tokio::test]
     285               1 :     async fn test_read_write_safekeeper_state() {
     286               1 :         let conf = stub_conf();
     287               1 :         let ttid = TenantTimelineId::generate();
     288                 :         {
     289               1 :             let (mut storage, mut state) =
     290               1 :                 create(&conf, &ttid).await.expect("failed to create state");
     291               1 :             // change something
     292               1 :             state.commit_lsn = Lsn(42);
     293               1 :             storage
     294               1 :                 .persist(&state)
     295               8 :                 .await
     296               1 :                 .expect("failed to persist state");
     297                 :         }
     298                 : 
     299               1 :         let (_, state) = load_from_control_file(&conf, &ttid)
     300               1 :             .await
     301               1 :             .expect("failed to read state");
     302               1 :         assert_eq!(state.commit_lsn, Lsn(42));
     303                 :     }
     304                 : 
     305               1 :     #[tokio::test]
     306               1 :     async fn test_safekeeper_state_checksum_mismatch() {
     307               1 :         let conf = stub_conf();
     308               1 :         let ttid = TenantTimelineId::generate();
     309                 :         {
     310               1 :             let (mut storage, mut state) =
     311               1 :                 create(&conf, &ttid).await.expect("failed to read state");
     312               1 : 
     313               1 :             // change something
     314               1 :             state.commit_lsn = Lsn(42);
     315               1 :             storage
     316               1 :                 .persist(&state)
     317               8 :                 .await
     318               1 :                 .expect("failed to persist state");
     319               1 :         }
     320               1 :         let control_path = conf.timeline_dir(&ttid).join(CONTROL_FILE_NAME);
     321               1 :         let mut data = fs::read(&control_path).await.unwrap();
     322               1 :         data[0] += 1; // change the first byte of the file to fail checksum validation
     323               1 :         fs::write(&control_path, &data)
     324               1 :             .await
     325               1 :             .expect("failed to write control file");
     326               1 : 
     327               1 :         match load_from_control_file(&conf, &ttid).await {
     328               1 :             Err(err) => assert!(err
     329               1 :                 .to_string()
     330               1 :                 .contains("safekeeper control file checksum mismatch")),
     331 UBC           0 :             Ok(_) => panic!("expected error"),
     332                 :         }
     333                 :     }
     334                 : }
        

Generated by: LCOV version 2.1-beta