LCOV - code coverage report
Current view: top level - safekeeper/src - control_file.rs (source / functions) Coverage Total Hit
Test: c639aa5f7ab62b43d647b10f40d15a15686ce8a9.info Lines: 85.9 % 206 177
Test Date: 2024-02-12 20:26:03 Functions: 75.0 % 24 18

            Line data    Source code
       1              : //! Control file serialization, deserialization and persistence.
       2              : 
       3              : use anyhow::{bail, ensure, Context, Result};
       4              : use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
       5              : use camino::Utf8PathBuf;
       6              : use tokio::fs::File;
       7              : use tokio::io::AsyncWriteExt;
       8              : use utils::crashsafe::durable_rename;
       9              : 
      10              : use std::io::Read;
      11              : use std::ops::Deref;
      12              : use std::path::Path;
      13              : use std::time::Instant;
      14              : 
      15              : use crate::control_file_upgrade::upgrade_control_file;
      16              : use crate::metrics::PERSIST_CONTROL_FILE_SECONDS;
      17              : use crate::state::TimelinePersistentState;
      18              : use utils::{bin_ser::LeSer, id::TenantTimelineId};
      19              : 
      20              : use crate::SafeKeeperConf;
      21              : 
      22              : use std::convert::TryInto;
      23              : 
      24              : pub const SK_MAGIC: u32 = 0xcafeceefu32;
      25              : pub const SK_FORMAT_VERSION: u32 = 7;
      26              : 
      27              : // contains persistent metadata for safekeeper
      28              : const CONTROL_FILE_NAME: &str = "safekeeper.control";
      29              : // needed to atomically update the state using `rename`
      30              : const CONTROL_FILE_NAME_PARTIAL: &str = "safekeeper.control.partial";
      31              : pub const CHECKSUM_SIZE: usize = std::mem::size_of::<u32>();
      32              : 
      33              : /// Storage should keep actual state inside of it. It should implement Deref
      34              : /// trait to access state fields and have persist method for updating that state.
      35              : #[async_trait::async_trait]
      36              : pub trait Storage: Deref<Target = TimelinePersistentState> {
      37              :     /// Persist safekeeper state on disk and update internal state.
      38              :     async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()>;
      39              : 
      40              :     /// Timestamp of last persist.
      41              :     fn last_persist_at(&self) -> Instant;
      42              : }
      43              : 
      44            0 : #[derive(Debug)]
      45              : pub struct FileStorage {
      46              :     // save timeline dir to avoid reconstructing it every time
      47              :     timeline_dir: Utf8PathBuf,
      48              :     conf: SafeKeeperConf,
      49              : 
      50              :     /// Last state persisted to disk.
      51              :     state: TimelinePersistentState,
      52              :     /// Not preserved across restarts.
      53              :     last_persist_at: Instant,
      54              : }
      55              : 
      56              : impl FileStorage {
      57              :     /// Initialize storage by loading state from disk.
      58          137 :     pub fn restore_new(ttid: &TenantTimelineId, conf: &SafeKeeperConf) -> Result<FileStorage> {
      59          137 :         let timeline_dir = conf.timeline_dir(ttid);
      60              : 
      61          137 :         let state = Self::load_control_file_conf(conf, ttid)?;
      62              : 
      63          135 :         Ok(FileStorage {
      64          135 :             timeline_dir,
      65          135 :             conf: conf.clone(),
      66          135 :             state,
      67          135 :             last_persist_at: Instant::now(),
      68          135 :         })
      69          137 :     }
      70              : 
      71              :     /// Create file storage for a new timeline, but don't persist it yet.
      72          531 :     pub fn create_new(
      73          531 :         timeline_dir: Utf8PathBuf,
      74          531 :         conf: &SafeKeeperConf,
      75          531 :         state: TimelinePersistentState,
      76          531 :     ) -> Result<FileStorage> {
      77          531 :         let store = FileStorage {
      78          531 :             timeline_dir,
      79          531 :             conf: conf.clone(),
      80          531 :             state,
      81          531 :             last_persist_at: Instant::now(),
      82          531 :         };
      83          531 : 
      84          531 :         Ok(store)
      85          531 :     }
      86              : 
      87              :     /// Check the magic/version in the on-disk data and deserialize it, if possible.
      88          186 :     fn deser_sk_state(buf: &mut &[u8]) -> Result<TimelinePersistentState> {
      89              :         // Read the version independent part
      90          186 :         let magic = ReadBytesExt::read_u32::<LittleEndian>(buf)?;
      91          186 :         if magic != SK_MAGIC {
      92            0 :             bail!(
      93            0 :                 "bad control file magic: {:X}, expected {:X}",
      94            0 :                 magic,
      95            0 :                 SK_MAGIC
      96            0 :             );
      97          186 :         }
      98          186 :         let version = ReadBytesExt::read_u32::<LittleEndian>(buf)?;
      99          186 :         if version == SK_FORMAT_VERSION {
     100          186 :             let res = TimelinePersistentState::des(buf)?;
     101          186 :             return Ok(res);
     102            0 :         }
     103            0 :         // try to upgrade
     104            0 :         upgrade_control_file(buf, version)
     105          186 :     }
     106              : 
     107              :     /// Load control file for given ttid at path specified by conf.
     108          139 :     pub fn load_control_file_conf(
     109          139 :         conf: &SafeKeeperConf,
     110          139 :         ttid: &TenantTimelineId,
     111          139 :     ) -> Result<TimelinePersistentState> {
     112          139 :         let path = conf.timeline_dir(ttid).join(CONTROL_FILE_NAME);
     113          139 :         Self::load_control_file(path)
     114          139 :     }
     115              : 
     116              :     /// Read in the control file.
     117          188 :     pub fn load_control_file<P: AsRef<Path>>(
     118          188 :         control_file_path: P,
     119          188 :     ) -> Result<TimelinePersistentState> {
     120          188 :         let mut control_file = std::fs::OpenOptions::new()
     121          188 :             .read(true)
     122          188 :             .write(true)
     123          188 :             .open(&control_file_path)
     124          188 :             .with_context(|| {
     125            0 :                 format!(
     126            0 :                     "failed to open control file at {}",
     127            0 :                     control_file_path.as_ref().display(),
     128            0 :                 )
     129          188 :             })?;
     130              : 
     131          188 :         let mut buf = Vec::new();
     132          188 :         control_file
     133          188 :             .read_to_end(&mut buf)
     134          188 :             .context("failed to read control file")?;
     135              : 
     136          188 :         let calculated_checksum = crc32c::crc32c(&buf[..buf.len() - CHECKSUM_SIZE]);
     137              : 
     138          188 :         let expected_checksum_bytes: &[u8; CHECKSUM_SIZE] =
     139          188 :             buf[buf.len() - CHECKSUM_SIZE..].try_into()?;
     140          188 :         let expected_checksum = u32::from_le_bytes(*expected_checksum_bytes);
     141          188 : 
     142          188 :         ensure!(
     143          188 :             calculated_checksum == expected_checksum,
     144            2 :             format!(
     145            2 :                 "safekeeper control file checksum mismatch: expected {} got {}",
     146            2 :                 expected_checksum, calculated_checksum
     147            2 :             )
     148              :         );
     149              : 
     150          186 :         let state = FileStorage::deser_sk_state(&mut &buf[..buf.len() - CHECKSUM_SIZE])
     151          186 :             .with_context(|| {
     152            0 :                 format!(
     153            0 :                     "while reading control file {}",
     154            0 :                     control_file_path.as_ref().display(),
     155            0 :                 )
     156          186 :             })?;
     157          186 :         Ok(state)
     158          188 :     }
     159              : }
     160              : 
     161              : impl Deref for FileStorage {
     162              :     type Target = TimelinePersistentState;
     163              : 
     164     29927347 :     fn deref(&self) -> &Self::Target {
     165     29927347 :         &self.state
     166     29927347 :     }
     167              : }
     168              : 
     169              : #[async_trait::async_trait]
     170              : impl Storage for FileStorage {
     171              :     /// Persists state durably to the underlying storage.
     172              :     ///
     173              :     /// For a description, see <https://lwn.net/Articles/457667/>.
     174         4997 :     async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> {
     175         4997 :         let _timer = PERSIST_CONTROL_FILE_SECONDS.start_timer();
     176         4997 : 
     177         4997 :         // write data to safekeeper.control.partial
     178         4997 :         let control_partial_path = self.timeline_dir.join(CONTROL_FILE_NAME_PARTIAL);
     179         4997 :         let mut control_partial = File::create(&control_partial_path).await.with_context(|| {
     180            0 :             format!(
     181            0 :                 "failed to create partial control file at: {}",
     182            0 :                 &control_partial_path
     183            0 :             )
     184         4997 :         })?;
     185         4997 :         let mut buf: Vec<u8> = Vec::new();
     186         4997 :         WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_MAGIC)?;
     187         4997 :         WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_FORMAT_VERSION)?;
     188         4997 :         s.ser_into(&mut buf)?;
     189              : 
     190              :         // calculate checksum before resize
     191         4997 :         let checksum = crc32c::crc32c(&buf);
     192         4997 :         buf.extend_from_slice(&checksum.to_le_bytes());
     193         4997 : 
     194         4997 :         control_partial.write_all(&buf).await.with_context(|| {
     195            0 :             format!(
     196            0 :                 "failed to write safekeeper state into control file at: {}",
     197            0 :                 control_partial_path
     198            0 :             )
     199         4997 :         })?;
     200         4997 :         control_partial.flush().await.with_context(|| {
     201            0 :             format!(
     202            0 :                 "failed to flush safekeeper state into control file at: {}",
     203            0 :                 control_partial_path
     204            0 :             )
     205         4997 :         })?;
     206              : 
     207         4997 :         let control_path = self.timeline_dir.join(CONTROL_FILE_NAME);
     208         5060 :         durable_rename(&control_partial_path, &control_path, !self.conf.no_sync).await?;
     209              : 
     210              :         // update internal state
     211         4997 :         self.state = s.clone();
     212         4997 :         Ok(())
     213        14991 :     }
     214              : 
     215         1737 :     fn last_persist_at(&self) -> Instant {
     216         1737 :         self.last_persist_at
     217         1737 :     }
     218              : }
     219              : 
     220              : #[cfg(test)]
     221              : mod test {
     222              :     use super::FileStorage;
     223              :     use super::*;
     224              :     use crate::SafeKeeperConf;
     225              :     use anyhow::Result;
     226              :     use tokio::fs;
     227              :     use utils::{id::TenantTimelineId, lsn::Lsn};
     228              : 
     229            4 :     fn stub_conf() -> SafeKeeperConf {
     230            4 :         let workdir = camino_tempfile::tempdir().unwrap().into_path();
     231            4 :         SafeKeeperConf {
     232            4 :             workdir,
     233            4 :             ..SafeKeeperConf::dummy()
     234            4 :         }
     235            4 :     }
     236              : 
     237            4 :     async fn load_from_control_file(
     238            4 :         conf: &SafeKeeperConf,
     239            4 :         ttid: &TenantTimelineId,
     240            4 :     ) -> Result<(FileStorage, TimelinePersistentState)> {
     241            4 :         fs::create_dir_all(conf.timeline_dir(ttid))
     242            4 :             .await
     243            4 :             .expect("failed to create timeline dir");
     244            4 :         Ok((
     245            4 :             FileStorage::restore_new(ttid, conf)?,
     246            2 :             FileStorage::load_control_file_conf(conf, ttid)?,
     247              :         ))
     248            4 :     }
     249              : 
     250            4 :     async fn create(
     251            4 :         conf: &SafeKeeperConf,
     252            4 :         ttid: &TenantTimelineId,
     253            4 :     ) -> Result<(FileStorage, TimelinePersistentState)> {
     254            4 :         fs::create_dir_all(conf.timeline_dir(ttid))
     255            4 :             .await
     256            4 :             .expect("failed to create timeline dir");
     257            4 :         let state = TimelinePersistentState::empty();
     258            4 :         let timeline_dir = conf.timeline_dir(ttid);
     259            4 :         let storage = FileStorage::create_new(timeline_dir, conf, state.clone())?;
     260            4 :         Ok((storage, state))
     261            4 :     }
     262              : 
     263            2 :     #[tokio::test]
     264            2 :     async fn test_read_write_safekeeper_state() {
     265            2 :         let conf = stub_conf();
     266            2 :         let ttid = TenantTimelineId::generate();
     267            2 :         {
     268            2 :             let (mut storage, mut state) =
     269            2 :                 create(&conf, &ttid).await.expect("failed to create state");
     270            2 :             // change something
     271            2 :             state.commit_lsn = Lsn(42);
     272            2 :             storage
     273            2 :                 .persist(&state)
     274           12 :                 .await
     275            2 :                 .expect("failed to persist state");
     276            2 :         }
     277            2 : 
     278            2 :         let (_, state) = load_from_control_file(&conf, &ttid)
     279            2 :             .await
     280            2 :             .expect("failed to read state");
     281            2 :         assert_eq!(state.commit_lsn, Lsn(42));
     282            2 :     }
     283              : 
     284            2 :     #[tokio::test]
     285            2 :     async fn test_safekeeper_state_checksum_mismatch() {
     286            2 :         let conf = stub_conf();
     287            2 :         let ttid = TenantTimelineId::generate();
     288            2 :         {
     289            2 :             let (mut storage, mut state) =
     290            2 :                 create(&conf, &ttid).await.expect("failed to read state");
     291            2 : 
     292            2 :             // change something
     293            2 :             state.commit_lsn = Lsn(42);
     294            2 :             storage
     295            2 :                 .persist(&state)
     296           18 :                 .await
     297            2 :                 .expect("failed to persist state");
     298            2 :         }
     299            2 :         let control_path = conf.timeline_dir(&ttid).join(CONTROL_FILE_NAME);
     300            2 :         let mut data = fs::read(&control_path).await.unwrap();
     301            2 :         data[0] += 1; // change the first byte of the file to fail checksum validation
     302            2 :         fs::write(&control_path, &data)
     303            2 :             .await
     304            2 :             .expect("failed to write control file");
     305            2 : 
     306            2 :         match load_from_control_file(&conf, &ttid).await {
     307            2 :             Err(err) => assert!(err
     308            2 :                 .to_string()
     309            2 :                 .contains("safekeeper control file checksum mismatch")),
     310            2 :             Ok(_) => panic!("expected error"),
     311            2 :         }
     312            2 :     }
     313              : }
        

Generated by: LCOV version 2.1-beta