LCOV - code coverage report
Current view: top level - safekeeper/src - control_file.rs (source / functions) Coverage Total Hit
Test: 903780b8ddc62f532be8f220102da7b91c63a235.info Lines: 83.5 % 212 177
Test Date: 2024-10-25 10:10:57 Functions: 72.0 % 25 18

            Line data    Source code
       1              : //! Control file serialization, deserialization and persistence.
       2              : 
       3              : use anyhow::{bail, ensure, Context, Result};
       4              : use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
       5              : use camino::{Utf8Path, Utf8PathBuf};
       6              : use tokio::fs::File;
       7              : use tokio::io::AsyncWriteExt;
       8              : use utils::crashsafe::durable_rename;
       9              : 
      10              : use std::future::Future;
      11              : use std::io::Read;
      12              : use std::ops::Deref;
      13              : use std::path::Path;
      14              : use std::time::Instant;
      15              : 
      16              : use crate::control_file_upgrade::downgrade_v9_to_v8;
      17              : use crate::metrics::PERSIST_CONTROL_FILE_SECONDS;
      18              : use crate::state::{EvictionState, TimelinePersistentState};
      19              : use crate::{control_file_upgrade::upgrade_control_file, timeline::get_timeline_dir};
      20              : use utils::{bin_ser::LeSer, id::TenantTimelineId};
      21              : 
      22              : use crate::SafeKeeperConf;
      23              : 
      24              : pub const SK_MAGIC: u32 = 0xcafeceefu32;
      25              : pub const SK_FORMAT_VERSION: u32 = 9;
      26              : 
      27              : // contains persistent metadata for safekeeper
      28              : pub const CONTROL_FILE_NAME: &str = "safekeeper.control";
      29              : // needed to atomically update the state using `rename`
      30              : const CONTROL_FILE_NAME_PARTIAL: &str = "safekeeper.control.partial";
      31              : pub const CHECKSUM_SIZE: usize = size_of::<u32>();
      32              : 
      33              : /// Storage should keep actual state inside of it. It should implement Deref
      34              : /// trait to access state fields and have persist method for updating that state.
      35              : pub trait Storage: Deref<Target = TimelinePersistentState> {
      36              :     /// Persist safekeeper state on disk and update internal state.
      37              :     fn persist(&mut self, s: &TimelinePersistentState) -> impl Future<Output = Result<()>> + Send;
      38              : 
      39              :     /// Timestamp of last persist.
      40              :     fn last_persist_at(&self) -> Instant;
      41              : }
      42              : 
/// `Storage` implementation backed by a control file inside the timeline
/// directory.
///
/// Keeps an in-memory copy of the last state written to disk.
#[derive(Debug)]
pub struct FileStorage {
    // save timeline dir to avoid reconstructing it every time
    timeline_dir: Utf8PathBuf,
    // Copied from `SafeKeeperConf::no_sync`; `persist` passes `!no_sync` as the
    // sync flag of `durable_rename`.
    no_sync: bool,

    /// Last state persisted to disk.
    state: TimelinePersistentState,
    /// Returned by `Storage::last_persist_at`; set to `Instant::now()` when the
    /// storage is constructed. Not preserved across restarts.
    last_persist_at: Instant,
}
      54              : 
      55              : impl FileStorage {
      56              :     /// Initialize storage by loading state from disk.
      57            2 :     pub fn restore_new(ttid: &TenantTimelineId, conf: &SafeKeeperConf) -> Result<FileStorage> {
      58            2 :         let timeline_dir = get_timeline_dir(conf, ttid);
      59            2 :         let state = Self::load_control_file_from_dir(&timeline_dir)?;
      60              : 
      61            1 :         Ok(FileStorage {
      62            1 :             timeline_dir,
      63            1 :             no_sync: conf.no_sync,
      64            1 :             state,
      65            1 :             last_persist_at: Instant::now(),
      66            1 :         })
      67            2 :     }
      68              : 
      69              :     /// Create and reliably persist new control file at given location.
      70              :     ///
      71              :     /// Note: we normally call this in temp directory for atomic init, so
      72              :     /// interested in FileStorage as a result only in tests.
      73            2 :     pub async fn create_new(
      74            2 :         dir: Utf8PathBuf,
      75            2 :         conf: &SafeKeeperConf,
      76            2 :         state: TimelinePersistentState,
      77            2 :     ) -> Result<FileStorage> {
      78            2 :         // we don't support creating new timelines in offloaded state
      79            2 :         assert!(matches!(state.eviction_state, EvictionState::Present));
      80              : 
      81            2 :         let mut store = FileStorage {
      82            2 :             timeline_dir: dir,
      83            2 :             no_sync: conf.no_sync,
      84            2 :             state: state.clone(),
      85            2 :             last_persist_at: Instant::now(),
      86            2 :         };
      87           18 :         store.persist(&state).await?;
      88            2 :         Ok(store)
      89            2 :     }
      90              : 
      91              :     /// Check the magic/version in the on-disk data and deserialize it, if possible.
      92            2 :     fn deser_sk_state(buf: &mut &[u8]) -> Result<TimelinePersistentState> {
      93              :         // Read the version independent part
      94            2 :         let magic = ReadBytesExt::read_u32::<LittleEndian>(buf)?;
      95            2 :         if magic != SK_MAGIC {
      96            0 :             bail!(
      97            0 :                 "bad control file magic: {:X}, expected {:X}",
      98            0 :                 magic,
      99            0 :                 SK_MAGIC
     100            0 :             );
     101            2 :         }
     102            2 :         let version = ReadBytesExt::read_u32::<LittleEndian>(buf)?;
     103            2 :         if version == SK_FORMAT_VERSION {
     104            0 :             let res = TimelinePersistentState::des(buf)?;
     105            0 :             return Ok(res);
     106            2 :         }
     107            2 :         // try to upgrade
     108            2 :         upgrade_control_file(buf, version)
     109            2 :     }
     110              : 
     111              :     /// Load control file from given directory.
     112            3 :     fn load_control_file_from_dir(timeline_dir: &Utf8Path) -> Result<TimelinePersistentState> {
     113            3 :         let path = timeline_dir.join(CONTROL_FILE_NAME);
     114            3 :         Self::load_control_file(path)
     115            3 :     }
     116              : 
     117              :     /// Read in the control file.
     118            3 :     pub fn load_control_file<P: AsRef<Path>>(
     119            3 :         control_file_path: P,
     120            3 :     ) -> Result<TimelinePersistentState> {
     121            3 :         let mut control_file = std::fs::OpenOptions::new()
     122            3 :             .read(true)
     123            3 :             .write(true)
     124            3 :             .open(&control_file_path)
     125            3 :             .with_context(|| {
     126            0 :                 format!(
     127            0 :                     "failed to open control file at {}",
     128            0 :                     control_file_path.as_ref().display(),
     129            0 :                 )
     130            3 :             })?;
     131              : 
     132            3 :         let mut buf = Vec::new();
     133            3 :         control_file
     134            3 :             .read_to_end(&mut buf)
     135            3 :             .context("failed to read control file")?;
     136              : 
     137            3 :         let calculated_checksum = crc32c::crc32c(&buf[..buf.len() - CHECKSUM_SIZE]);
     138              : 
     139            3 :         let expected_checksum_bytes: &[u8; CHECKSUM_SIZE] =
     140            3 :             buf[buf.len() - CHECKSUM_SIZE..].try_into()?;
     141            3 :         let expected_checksum = u32::from_le_bytes(*expected_checksum_bytes);
     142            3 : 
     143            3 :         ensure!(
     144            3 :             calculated_checksum == expected_checksum,
     145            1 :             format!(
     146            1 :                 "safekeeper control file checksum mismatch: expected {} got {}",
     147            1 :                 expected_checksum, calculated_checksum
     148            1 :             )
     149              :         );
     150              : 
     151            2 :         let state = FileStorage::deser_sk_state(&mut &buf[..buf.len() - CHECKSUM_SIZE])
     152            2 :             .with_context(|| {
     153            0 :                 format!(
     154            0 :                     "while reading control file {}",
     155            0 :                     control_file_path.as_ref().display(),
     156            0 :                 )
     157            2 :             })?;
     158            2 :         Ok(state)
     159            3 :     }
     160              : }
     161              : 
     162              : impl Deref for FileStorage {
     163              :     type Target = TimelinePersistentState;
     164              : 
     165            0 :     fn deref(&self) -> &Self::Target {
     166            0 :         &self.state
     167            0 :     }
     168              : }
     169              : 
     170              : impl TimelinePersistentState {
     171            4 :     pub(crate) fn write_to_buf(&self) -> Result<Vec<u8>> {
     172            4 :         let mut buf: Vec<u8> = Vec::new();
     173            4 :         WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_MAGIC)?;
     174              : 
     175            4 :         if self.eviction_state == EvictionState::Present {
     176            4 :             // temp hack for forward compatibility
     177            4 :             const PREV_FORMAT_VERSION: u32 = 8;
     178            4 :             let prev = downgrade_v9_to_v8(self);
     179            4 :             WriteBytesExt::write_u32::<LittleEndian>(&mut buf, PREV_FORMAT_VERSION)?;
     180            4 :             prev.ser_into(&mut buf)?;
     181              :         } else {
     182              :             // otherwise, we write the current format version
     183            0 :             WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_FORMAT_VERSION)?;
     184            0 :             self.ser_into(&mut buf)?;
     185              :         }
     186              : 
     187              :         // calculate checksum before resize
     188            4 :         let checksum = crc32c::crc32c(&buf);
     189            4 :         buf.extend_from_slice(&checksum.to_le_bytes());
     190            4 :         Ok(buf)
     191            4 :     }
     192              : }
     193              : 
     194              : impl Storage for FileStorage {
     195              :     /// Persists state durably to the underlying storage.
     196            4 :     async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> {
     197            4 :         let _timer = PERSIST_CONTROL_FILE_SECONDS.start_timer();
     198            4 : 
     199            4 :         // write data to safekeeper.control.partial
     200            4 :         let control_partial_path = self.timeline_dir.join(CONTROL_FILE_NAME_PARTIAL);
     201            4 :         let mut control_partial = File::create(&control_partial_path).await.with_context(|| {
     202            0 :             format!(
     203            0 :                 "failed to create partial control file at: {}",
     204            0 :                 &control_partial_path
     205            0 :             )
     206            4 :         })?;
     207              : 
     208            4 :         let buf: Vec<u8> = s.write_to_buf()?;
     209              : 
     210            4 :         control_partial.write_all(&buf).await.with_context(|| {
     211            0 :             format!(
     212            0 :                 "failed to write safekeeper state into control file at: {}",
     213            0 :                 control_partial_path
     214            0 :             )
     215            4 :         })?;
     216            4 :         control_partial.flush().await.with_context(|| {
     217            0 :             format!(
     218            0 :                 "failed to flush safekeeper state into control file at: {}",
     219            0 :                 control_partial_path
     220            0 :             )
     221            4 :         })?;
     222              : 
     223            4 :         let control_path = self.timeline_dir.join(CONTROL_FILE_NAME);
     224           28 :         durable_rename(&control_partial_path, &control_path, !self.no_sync).await?;
     225              : 
     226              :         // update internal state
     227            4 :         self.state = s.clone();
     228            4 :         Ok(())
     229            4 :     }
     230              : 
     231            0 :     fn last_persist_at(&self) -> Instant {
     232            0 :         self.last_persist_at
     233            0 :     }
     234              : }
     235              : 
     236              : #[cfg(test)]
     237              : mod test {
     238              :     use super::*;
     239              :     use tokio::fs;
     240              :     use utils::lsn::Lsn;
     241              : 
     242            2 :     fn stub_conf() -> SafeKeeperConf {
     243            2 :         let workdir = camino_tempfile::tempdir().unwrap().into_path();
     244            2 :         SafeKeeperConf {
     245            2 :             workdir,
     246            2 :             ..SafeKeeperConf::dummy()
     247            2 :         }
     248            2 :     }
     249              : 
     250            2 :     async fn load_from_control_file(
     251            2 :         conf: &SafeKeeperConf,
     252            2 :         ttid: &TenantTimelineId,
     253            2 :     ) -> Result<(FileStorage, TimelinePersistentState)> {
     254            2 :         let timeline_dir = get_timeline_dir(conf, ttid);
     255            2 :         fs::create_dir_all(&timeline_dir)
     256            2 :             .await
     257            2 :             .expect("failed to create timeline dir");
     258            2 :         Ok((
     259            2 :             FileStorage::restore_new(ttid, conf)?,
     260            1 :             FileStorage::load_control_file_from_dir(&timeline_dir)?,
     261              :         ))
     262            2 :     }
     263              : 
     264            2 :     async fn create(
     265            2 :         conf: &SafeKeeperConf,
     266            2 :         ttid: &TenantTimelineId,
     267            2 :     ) -> Result<(FileStorage, TimelinePersistentState)> {
     268            2 :         let timeline_dir = get_timeline_dir(conf, ttid);
     269            2 :         fs::create_dir_all(&timeline_dir)
     270            2 :             .await
     271            2 :             .expect("failed to create timeline dir");
     272            2 :         let state = TimelinePersistentState::empty();
     273           18 :         let storage = FileStorage::create_new(timeline_dir, conf, state.clone()).await?;
     274            2 :         Ok((storage, state))
     275            2 :     }
     276              : 
     277              :     #[tokio::test]
     278            1 :     async fn test_read_write_safekeeper_state() {
     279            1 :         let conf = stub_conf();
     280            1 :         let ttid = TenantTimelineId::generate();
     281            1 :         {
     282            1 :             let (mut storage, mut state) =
     283           10 :                 create(&conf, &ttid).await.expect("failed to create state");
     284            1 :             // change something
     285            1 :             state.commit_lsn = Lsn(42);
     286            1 :             storage
     287            1 :                 .persist(&state)
     288            9 :                 .await
     289            1 :                 .expect("failed to persist state");
     290            1 :         }
     291            1 : 
     292            1 :         let (_, state) = load_from_control_file(&conf, &ttid)
     293            1 :             .await
     294            1 :             .expect("failed to read state");
     295            1 :         assert_eq!(state.commit_lsn, Lsn(42));
     296            1 :     }
     297              : 
     298              :     #[tokio::test]
     299            1 :     async fn test_safekeeper_state_checksum_mismatch() {
     300            1 :         let conf = stub_conf();
     301            1 :         let ttid = TenantTimelineId::generate();
     302            1 :         {
     303            1 :             let (mut storage, mut state) =
     304           10 :                 create(&conf, &ttid).await.expect("failed to read state");
     305            1 : 
     306            1 :             // change something
     307            1 :             state.commit_lsn = Lsn(42);
     308            1 :             storage
     309            1 :                 .persist(&state)
     310            9 :                 .await
     311            1 :                 .expect("failed to persist state");
     312            1 :         }
     313            1 :         let control_path = get_timeline_dir(&conf, &ttid).join(CONTROL_FILE_NAME);
     314            1 :         let mut data = fs::read(&control_path).await.unwrap();
     315            1 :         data[0] += 1; // change the first byte of the file to fail checksum validation
     316            1 :         fs::write(&control_path, &data)
     317            1 :             .await
     318            1 :             .expect("failed to write control file");
     319            1 : 
     320            1 :         match load_from_control_file(&conf, &ttid).await {
     321            1 :             Err(err) => assert!(err
     322            1 :                 .to_string()
     323            1 :                 .contains("safekeeper control file checksum mismatch")),
     324            1 :             Ok(_) => panic!("expected error"),
     325            1 :         }
     326            1 :     }
     327              : }
        

Generated by: LCOV version 2.1-beta