LCOV - code coverage report
Current view: top level - safekeeper/src - control_file.rs (source / functions) Coverage Total Hit
Test: 5445d246133daeceb0507e6cc0797ab7c1c70cb8.info Lines: 79.0 % 176 139
Test Date: 2025-03-12 18:05:02 Functions: 70.0 % 20 14

            Line data    Source code
       1              : //! Control file serialization, deserialization and persistence.
       2              : 
       3              : use std::future::Future;
       4              : use std::io::Read;
       5              : use std::ops::Deref;
       6              : use std::path::Path;
       7              : use std::time::Instant;
       8              : 
       9              : use anyhow::{Context, Result, bail, ensure};
      10              : use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
      11              : use camino::{Utf8Path, Utf8PathBuf};
      12              : use safekeeper_api::membership::INVALID_GENERATION;
      13              : use tokio::fs::File;
      14              : use tokio::io::AsyncWriteExt;
      15              : use utils::bin_ser::LeSer;
      16              : use utils::crashsafe::durable_rename;
      17              : 
      18              : use crate::control_file_upgrade::{downgrade_v10_to_v9, upgrade_control_file};
      19              : use crate::metrics::PERSIST_CONTROL_FILE_SECONDS;
      20              : use crate::state::{EvictionState, TimelinePersistentState};
      21              : 
      22              : pub const SK_MAGIC: u32 = 0xcafeceefu32;
      23              : pub const SK_FORMAT_VERSION: u32 = 10;
      24              : 
      25              : // contains persistent metadata for safekeeper
      26              : pub const CONTROL_FILE_NAME: &str = "safekeeper.control";
      27              : // needed to atomically update the state using `rename`
      28              : const CONTROL_FILE_NAME_PARTIAL: &str = "safekeeper.control.partial";
      29              : pub const CHECKSUM_SIZE: usize = size_of::<u32>();
      30              : 
      31              : /// Storage should keep actual state inside of it. It should implement Deref
      32              : /// trait to access state fields and have persist method for updating that state.
      33              : pub trait Storage: Deref<Target = TimelinePersistentState> {
      34              :     /// Persist safekeeper state on disk and update internal state.
      35              :     fn persist(&mut self, s: &TimelinePersistentState) -> impl Future<Output = Result<()>> + Send;
      36              : 
      37              :     /// Timestamp of last persist.
      38              :     fn last_persist_at(&self) -> Instant;
      39              : }
      40              : 
/// File-backed `Storage`: keeps a timeline's state in a control file inside
/// the timeline directory.
#[derive(Debug)]
pub struct FileStorage {
    // save timeline dir to avoid reconstructing it every time
    timeline_dir: Utf8PathBuf,
    // Passed negated as the last argument of `durable_rename` in `persist`
    // (presumably its fsync flag — confirm in utils::crashsafe).
    no_sync: bool,

    /// Last state persisted to disk.
    state: TimelinePersistentState,
    /// Not preserved across restarts.
    last_persist_at: Instant,
}
      52              : 
      53              : impl FileStorage {
      54              :     /// Initialize storage by loading state from disk.
      55            0 :     pub fn restore_new(timeline_dir: &Utf8Path, no_sync: bool) -> Result<FileStorage> {
      56            0 :         let state = Self::load_control_file_from_dir(timeline_dir)?;
      57              : 
      58            0 :         Ok(FileStorage {
      59            0 :             timeline_dir: timeline_dir.to_path_buf(),
      60            0 :             no_sync,
      61            0 :             state,
      62            0 :             last_persist_at: Instant::now(),
      63            0 :         })
      64            0 :     }
      65              : 
      66              :     /// Create and reliably persist new control file at given location.
      67              :     ///
      68              :     /// Note: we normally call this in temp directory for atomic init, so
      69              :     /// interested in FileStorage as a result only in tests.
      70            7 :     pub async fn create_new(
      71            7 :         timeline_dir: &Utf8Path,
      72            7 :         state: TimelinePersistentState,
      73            7 :         no_sync: bool,
      74            7 :     ) -> Result<FileStorage> {
      75            7 :         // we don't support creating new timelines in offloaded state
      76            7 :         assert!(matches!(state.eviction_state, EvictionState::Present));
      77              : 
      78            7 :         let mut store = FileStorage {
      79            7 :             timeline_dir: timeline_dir.to_path_buf(),
      80            7 :             no_sync,
      81            7 :             state: state.clone(),
      82            7 :             last_persist_at: Instant::now(),
      83            7 :         };
      84            7 :         store.persist(&state).await?;
      85            7 :         Ok(store)
      86            7 :     }
      87              : 
      88              :     /// Check the magic/version in the on-disk data and deserialize it, if possible.
      89            1 :     fn deser_sk_state(buf: &mut &[u8]) -> Result<TimelinePersistentState> {
      90              :         // Read the version independent part
      91            1 :         let magic = ReadBytesExt::read_u32::<LittleEndian>(buf)?;
      92            1 :         if magic != SK_MAGIC {
      93            0 :             bail!(
      94            0 :                 "bad control file magic: {:X}, expected {:X}",
      95            0 :                 magic,
      96            0 :                 SK_MAGIC
      97            0 :             );
      98            1 :         }
      99            1 :         let version = ReadBytesExt::read_u32::<LittleEndian>(buf)?;
     100            1 :         if version == SK_FORMAT_VERSION {
     101            1 :             let res = TimelinePersistentState::des(buf)?;
     102            1 :             return Ok(res);
     103            0 :         }
     104            0 :         // try to upgrade
     105            0 :         upgrade_control_file(buf, version)
     106            1 :     }
     107              : 
     108              :     /// Load control file from given directory.
     109            2 :     fn load_control_file_from_dir(timeline_dir: &Utf8Path) -> Result<TimelinePersistentState> {
     110            2 :         let path = timeline_dir.join(CONTROL_FILE_NAME);
     111            2 :         Self::load_control_file(path)
     112            2 :     }
     113              : 
     114              :     /// Read in the control file.
     115            2 :     pub fn load_control_file<P: AsRef<Path>>(
     116            2 :         control_file_path: P,
     117            2 :     ) -> Result<TimelinePersistentState> {
     118            2 :         let mut control_file = std::fs::OpenOptions::new()
     119            2 :             .read(true)
     120            2 :             .write(true)
     121            2 :             .open(&control_file_path)
     122            2 :             .with_context(|| {
     123            0 :                 format!(
     124            0 :                     "failed to open control file at {}",
     125            0 :                     control_file_path.as_ref().display(),
     126            0 :                 )
     127            2 :             })?;
     128              : 
     129            2 :         let mut buf = Vec::new();
     130            2 :         control_file
     131            2 :             .read_to_end(&mut buf)
     132            2 :             .context("failed to read control file")?;
     133              : 
     134            2 :         let calculated_checksum = crc32c::crc32c(&buf[..buf.len() - CHECKSUM_SIZE]);
     135              : 
     136            2 :         let expected_checksum_bytes: &[u8; CHECKSUM_SIZE] =
     137            2 :             buf[buf.len() - CHECKSUM_SIZE..].try_into()?;
     138            2 :         let expected_checksum = u32::from_le_bytes(*expected_checksum_bytes);
     139            2 : 
     140            2 :         ensure!(
     141            2 :             calculated_checksum == expected_checksum,
     142            1 :             format!(
     143            1 :                 "safekeeper control file checksum mismatch: expected {} got {}",
     144            1 :                 expected_checksum, calculated_checksum
     145            1 :             )
     146              :         );
     147              : 
     148            1 :         let state = FileStorage::deser_sk_state(&mut &buf[..buf.len() - CHECKSUM_SIZE])
     149            1 :             .with_context(|| {
     150            0 :                 format!(
     151            0 :                     "while reading control file {}",
     152            0 :                     control_file_path.as_ref().display(),
     153            0 :                 )
     154            1 :             })?;
     155            1 :         Ok(state)
     156            2 :     }
     157              : }
     158              : 
     159              : impl Deref for FileStorage {
     160              :     type Target = TimelinePersistentState;
     161              : 
     162         7853 :     fn deref(&self) -> &Self::Target {
     163         7853 :         &self.state
     164         7853 :     }
     165              : }
     166              : 
     167              : impl TimelinePersistentState {
     168           29 :     pub(crate) fn write_to_buf(&self) -> Result<Vec<u8>> {
     169           29 :         let mut buf: Vec<u8> = Vec::new();
     170           29 :         WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_MAGIC)?;
     171              : 
     172           29 :         if self.mconf.generation == INVALID_GENERATION {
     173           27 :             // Temp hack for forward compatibility test: in case of none
     174           27 :             // configuration save cfile in previous v9 format.
     175           27 :             const PREV_FORMAT_VERSION: u32 = 9;
     176           27 :             let prev = downgrade_v10_to_v9(self);
     177           27 :             WriteBytesExt::write_u32::<LittleEndian>(&mut buf, PREV_FORMAT_VERSION)?;
     178           27 :             prev.ser_into(&mut buf)?;
     179              :         } else {
     180              :             // otherwise, we write the current format version
     181            2 :             WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_FORMAT_VERSION)?;
     182            2 :             self.ser_into(&mut buf)?;
     183              :         }
     184              : 
     185              :         // calculate checksum before resize
     186           29 :         let checksum = crc32c::crc32c(&buf);
     187           29 :         buf.extend_from_slice(&checksum.to_le_bytes());
     188           29 :         Ok(buf)
     189           29 :     }
     190              : }
     191              : 
     192              : impl Storage for FileStorage {
     193              :     /// Persists state durably to the underlying storage.
     194           29 :     async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> {
     195           29 :         let _timer = PERSIST_CONTROL_FILE_SECONDS.start_timer();
     196           29 : 
     197           29 :         // write data to safekeeper.control.partial
     198           29 :         let control_partial_path = self.timeline_dir.join(CONTROL_FILE_NAME_PARTIAL);
     199           29 :         let mut control_partial = File::create(&control_partial_path).await.with_context(|| {
     200            0 :             format!(
     201            0 :                 "failed to create partial control file at: {}",
     202            0 :                 &control_partial_path
     203            0 :             )
     204           29 :         })?;
     205              : 
     206           29 :         let buf: Vec<u8> = s.write_to_buf()?;
     207              : 
     208           29 :         control_partial.write_all(&buf).await.with_context(|| {
     209            0 :             format!(
     210            0 :                 "failed to write safekeeper state into control file at: {}",
     211            0 :                 control_partial_path
     212            0 :             )
     213           29 :         })?;
     214           29 :         control_partial.flush().await.with_context(|| {
     215            0 :             format!(
     216            0 :                 "failed to flush safekeeper state into control file at: {}",
     217            0 :                 control_partial_path
     218            0 :             )
     219           29 :         })?;
     220              : 
     221           29 :         let control_path = self.timeline_dir.join(CONTROL_FILE_NAME);
     222           29 :         durable_rename(&control_partial_path, &control_path, !self.no_sync).await?;
     223              : 
     224              :         // update internal state
     225           28 :         self.state = s.clone();
     226           28 :         Ok(())
     227           28 :     }
     228              : 
     229           35 :     fn last_persist_at(&self) -> Instant {
     230           35 :         self.last_persist_at
     231           35 :     }
     232              : }
     233              : 
     234              : #[cfg(test)]
     235              : mod test {
     236              :     use safekeeper_api::membership::{Configuration, MemberSet, SafekeeperGeneration};
     237              :     use tokio::fs;
     238              :     use utils::lsn::Lsn;
     239              : 
     240              :     use super::*;
     241              : 
     242              :     const NO_SYNC: bool = true;
     243              : 
     244              :     #[tokio::test]
     245            1 :     async fn test_read_write_safekeeper_state() -> anyhow::Result<()> {
     246            1 :         let tempdir = camino_tempfile::tempdir()?;
     247            1 :         let mut state = TimelinePersistentState::empty();
     248            1 :         state.mconf = Configuration {
     249            1 :             generation: SafekeeperGeneration::new(42),
     250            1 :             members: MemberSet::empty(),
     251            1 :             new_members: None,
     252            1 :         };
     253            1 :         let mut storage = FileStorage::create_new(tempdir.path(), state.clone(), NO_SYNC).await?;
     254            1 : 
     255            1 :         // Make a change.
     256            1 :         state.commit_lsn = Lsn(42);
     257            1 :         storage.persist(&state).await?;
     258            1 : 
     259            1 :         // Reload the state. It should match the previously persisted state.
     260            1 :         let loaded_state = FileStorage::load_control_file_from_dir(tempdir.path())?;
     261            1 :         assert_eq!(loaded_state, state);
     262            1 :         Ok(())
     263            1 :     }
     264              : 
     265              :     #[tokio::test]
     266            1 :     async fn test_safekeeper_state_checksum_mismatch() -> anyhow::Result<()> {
     267            1 :         let tempdir = camino_tempfile::tempdir()?;
     268            1 :         let mut state = TimelinePersistentState::empty();
     269            1 :         let mut storage = FileStorage::create_new(tempdir.path(), state.clone(), NO_SYNC).await?;
     270            1 : 
     271            1 :         // Make a change.
     272            1 :         state.commit_lsn = Lsn(42);
     273            1 :         storage.persist(&state).await?;
     274            1 : 
     275            1 :         // Change the first byte to fail checksum validation.
     276            1 :         let ctrl_path = tempdir.path().join(CONTROL_FILE_NAME);
     277            1 :         let mut data = fs::read(&ctrl_path).await?;
     278            1 :         data[0] += 1;
     279            1 :         fs::write(&ctrl_path, &data).await?;
     280            1 : 
     281            1 :         // Loading the file should fail checksum validation.
     282            1 :         if let Err(err) = FileStorage::load_control_file_from_dir(tempdir.path()) {
     283            1 :             assert!(err.to_string().contains("control file checksum mismatch"))
     284            1 :         } else {
     285            1 :             panic!("expected checksum error")
     286            1 :         }
     287            1 :         Ok(())
     288            1 :     }
     289              : }
        

Generated by: LCOV version 2.1-beta