LCOV - code coverage report
Current view: top level - safekeeper/src - control_file.rs (source / functions) Coverage Total Hit
Test: 07bee600374ccd486c69370d0972d9035964fe68.info Lines: 79.0 % 176 139
Test Date: 2025-02-20 13:11:02 Functions: 70.0 % 20 14

            Line data    Source code
       1              : //! Control file serialization, deserialization and persistence.
       2              : 
       3              : use anyhow::{bail, ensure, Context, Result};
       4              : use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
       5              : use camino::{Utf8Path, Utf8PathBuf};
       6              : use safekeeper_api::membership::INVALID_GENERATION;
       7              : use tokio::fs::File;
       8              : use tokio::io::AsyncWriteExt;
       9              : use utils::crashsafe::durable_rename;
      10              : 
      11              : use std::future::Future;
      12              : use std::io::Read;
      13              : use std::ops::Deref;
      14              : use std::path::Path;
      15              : use std::time::Instant;
      16              : 
      17              : use crate::control_file_upgrade::downgrade_v10_to_v9;
      18              : use crate::control_file_upgrade::upgrade_control_file;
      19              : use crate::metrics::PERSIST_CONTROL_FILE_SECONDS;
      20              : use crate::state::{EvictionState, TimelinePersistentState};
      21              : use utils::bin_ser::LeSer;
      22              : 
      23              : pub const SK_MAGIC: u32 = 0xcafeceefu32;
      24              : pub const SK_FORMAT_VERSION: u32 = 10;
      25              : 
      26              : // contains persistent metadata for safekeeper
      27              : pub const CONTROL_FILE_NAME: &str = "safekeeper.control";
      28              : // needed to atomically update the state using `rename`
      29              : const CONTROL_FILE_NAME_PARTIAL: &str = "safekeeper.control.partial";
      30              : pub const CHECKSUM_SIZE: usize = size_of::<u32>();
      31              : 
      32              : /// Storage should keep actual state inside of it. It should implement Deref
      33              : /// trait to access state fields and have persist method for updating that state.
      34              : pub trait Storage: Deref<Target = TimelinePersistentState> {
      35              :     /// Persist safekeeper state on disk and update internal state.
      36              :     fn persist(&mut self, s: &TimelinePersistentState) -> impl Future<Output = Result<()>> + Send;
      37              : 
      38              :     /// Timestamp of last persist.
      39              :     fn last_persist_at(&self) -> Instant;
      40              : }
      41              : 
/// [`Storage`] implementation that keeps a timeline's state in a control file
/// (`safekeeper.control`) inside the timeline directory.
#[derive(Debug)]
pub struct FileStorage {
    // Timeline directory; saved to avoid reconstructing it every time the
    // control file paths are built.
    timeline_dir: Utf8PathBuf,
    // Passed negated as the last argument of `durable_rename` when persisting;
    // NOTE(review): presumably `true` skips fsync (the tests set it) — confirm.
    no_sync: bool,

    /// Last state persisted to disk.
    state: TimelinePersistentState,
    /// Not preserved across restarts; initialized to `Instant::now()` when the
    /// storage is constructed and reported by `Storage::last_persist_at`.
    last_persist_at: Instant,
}
      53              : 
      54              : impl FileStorage {
      55              :     /// Initialize storage by loading state from disk.
      56            0 :     pub fn restore_new(timeline_dir: &Utf8Path, no_sync: bool) -> Result<FileStorage> {
      57            0 :         let state = Self::load_control_file_from_dir(timeline_dir)?;
      58              : 
      59            0 :         Ok(FileStorage {
      60            0 :             timeline_dir: timeline_dir.to_path_buf(),
      61            0 :             no_sync,
      62            0 :             state,
      63            0 :             last_persist_at: Instant::now(),
      64            0 :         })
      65            0 :     }
      66              : 
      67              :     /// Create and reliably persist new control file at given location.
      68              :     ///
      69              :     /// Note: we normally call this in temp directory for atomic init, so
      70              :     /// interested in FileStorage as a result only in tests.
      71            5 :     pub async fn create_new(
      72            5 :         timeline_dir: &Utf8Path,
      73            5 :         state: TimelinePersistentState,
      74            5 :         no_sync: bool,
      75            5 :     ) -> Result<FileStorage> {
      76            5 :         // we don't support creating new timelines in offloaded state
      77            5 :         assert!(matches!(state.eviction_state, EvictionState::Present));
      78              : 
      79            5 :         let mut store = FileStorage {
      80            5 :             timeline_dir: timeline_dir.to_path_buf(),
      81            5 :             no_sync,
      82            5 :             state: state.clone(),
      83            5 :             last_persist_at: Instant::now(),
      84            5 :         };
      85            5 :         store.persist(&state).await?;
      86            5 :         Ok(store)
      87            5 :     }
      88              : 
      89              :     /// Check the magic/version in the on-disk data and deserialize it, if possible.
      90            1 :     fn deser_sk_state(buf: &mut &[u8]) -> Result<TimelinePersistentState> {
      91              :         // Read the version independent part
      92            1 :         let magic = ReadBytesExt::read_u32::<LittleEndian>(buf)?;
      93            1 :         if magic != SK_MAGIC {
      94            0 :             bail!(
      95            0 :                 "bad control file magic: {:X}, expected {:X}",
      96            0 :                 magic,
      97            0 :                 SK_MAGIC
      98            0 :             );
      99            1 :         }
     100            1 :         let version = ReadBytesExt::read_u32::<LittleEndian>(buf)?;
     101            1 :         if version == SK_FORMAT_VERSION {
     102            1 :             let res = TimelinePersistentState::des(buf)?;
     103            1 :             return Ok(res);
     104            0 :         }
     105            0 :         // try to upgrade
     106            0 :         upgrade_control_file(buf, version)
     107            1 :     }
     108              : 
     109              :     /// Load control file from given directory.
     110            2 :     fn load_control_file_from_dir(timeline_dir: &Utf8Path) -> Result<TimelinePersistentState> {
     111            2 :         let path = timeline_dir.join(CONTROL_FILE_NAME);
     112            2 :         Self::load_control_file(path)
     113            2 :     }
     114              : 
     115              :     /// Read in the control file.
     116            2 :     pub fn load_control_file<P: AsRef<Path>>(
     117            2 :         control_file_path: P,
     118            2 :     ) -> Result<TimelinePersistentState> {
     119            2 :         let mut control_file = std::fs::OpenOptions::new()
     120            2 :             .read(true)
     121            2 :             .write(true)
     122            2 :             .open(&control_file_path)
     123            2 :             .with_context(|| {
     124            0 :                 format!(
     125            0 :                     "failed to open control file at {}",
     126            0 :                     control_file_path.as_ref().display(),
     127            0 :                 )
     128            2 :             })?;
     129              : 
     130            2 :         let mut buf = Vec::new();
     131            2 :         control_file
     132            2 :             .read_to_end(&mut buf)
     133            2 :             .context("failed to read control file")?;
     134              : 
     135            2 :         let calculated_checksum = crc32c::crc32c(&buf[..buf.len() - CHECKSUM_SIZE]);
     136              : 
     137            2 :         let expected_checksum_bytes: &[u8; CHECKSUM_SIZE] =
     138            2 :             buf[buf.len() - CHECKSUM_SIZE..].try_into()?;
     139            2 :         let expected_checksum = u32::from_le_bytes(*expected_checksum_bytes);
     140            2 : 
     141            2 :         ensure!(
     142            2 :             calculated_checksum == expected_checksum,
     143            1 :             format!(
     144            1 :                 "safekeeper control file checksum mismatch: expected {} got {}",
     145            1 :                 expected_checksum, calculated_checksum
     146            1 :             )
     147              :         );
     148              : 
     149            1 :         let state = FileStorage::deser_sk_state(&mut &buf[..buf.len() - CHECKSUM_SIZE])
     150            1 :             .with_context(|| {
     151            0 :                 format!(
     152            0 :                     "while reading control file {}",
     153            0 :                     control_file_path.as_ref().display(),
     154            0 :                 )
     155            1 :             })?;
     156            1 :         Ok(state)
     157            2 :     }
     158              : }
     159              : 
     160              : impl Deref for FileStorage {
     161              :     type Target = TimelinePersistentState;
     162              : 
     163         6285 :     fn deref(&self) -> &Self::Target {
     164         6285 :         &self.state
     165         6285 :     }
     166              : }
     167              : 
     168              : impl TimelinePersistentState {
     169           22 :     pub(crate) fn write_to_buf(&self) -> Result<Vec<u8>> {
     170           22 :         let mut buf: Vec<u8> = Vec::new();
     171           22 :         WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_MAGIC)?;
     172              : 
     173           22 :         if self.mconf.generation == INVALID_GENERATION {
     174           20 :             // Temp hack for forward compatibility test: in case of none
     175           20 :             // configuration save cfile in previous v9 format.
     176           20 :             const PREV_FORMAT_VERSION: u32 = 9;
     177           20 :             let prev = downgrade_v10_to_v9(self);
     178           20 :             WriteBytesExt::write_u32::<LittleEndian>(&mut buf, PREV_FORMAT_VERSION)?;
     179           20 :             prev.ser_into(&mut buf)?;
     180              :         } else {
     181              :             // otherwise, we write the current format version
     182            2 :             WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_FORMAT_VERSION)?;
     183            2 :             self.ser_into(&mut buf)?;
     184              :         }
     185              : 
     186              :         // calculate checksum before resize
     187           22 :         let checksum = crc32c::crc32c(&buf);
     188           22 :         buf.extend_from_slice(&checksum.to_le_bytes());
     189           22 :         Ok(buf)
     190           22 :     }
     191              : }
     192              : 
     193              : impl Storage for FileStorage {
     194              :     /// Persists state durably to the underlying storage.
     195           22 :     async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> {
     196           22 :         let _timer = PERSIST_CONTROL_FILE_SECONDS.start_timer();
     197           22 : 
     198           22 :         // write data to safekeeper.control.partial
     199           22 :         let control_partial_path = self.timeline_dir.join(CONTROL_FILE_NAME_PARTIAL);
     200           22 :         let mut control_partial = File::create(&control_partial_path).await.with_context(|| {
     201            0 :             format!(
     202            0 :                 "failed to create partial control file at: {}",
     203            0 :                 &control_partial_path
     204            0 :             )
     205           22 :         })?;
     206              : 
     207           22 :         let buf: Vec<u8> = s.write_to_buf()?;
     208              : 
     209           22 :         control_partial.write_all(&buf).await.with_context(|| {
     210            0 :             format!(
     211            0 :                 "failed to write safekeeper state into control file at: {}",
     212            0 :                 control_partial_path
     213            0 :             )
     214           22 :         })?;
     215           22 :         control_partial.flush().await.with_context(|| {
     216            0 :             format!(
     217            0 :                 "failed to flush safekeeper state into control file at: {}",
     218            0 :                 control_partial_path
     219            0 :             )
     220           22 :         })?;
     221              : 
     222           22 :         let control_path = self.timeline_dir.join(CONTROL_FILE_NAME);
     223           22 :         durable_rename(&control_partial_path, &control_path, !self.no_sync).await?;
     224              : 
     225              :         // update internal state
     226           22 :         self.state = s.clone();
     227           22 :         Ok(())
     228           22 :     }
     229              : 
     230           26 :     fn last_persist_at(&self) -> Instant {
     231           26 :         self.last_persist_at
     232           26 :     }
     233              : }
     234              : 
     235              : #[cfg(test)]
     236              : mod test {
     237              :     use super::*;
     238              :     use safekeeper_api::membership::{Configuration, MemberSet, SafekeeperGeneration};
     239              :     use tokio::fs;
     240              :     use utils::lsn::Lsn;
     241              : 
     242              :     const NO_SYNC: bool = true;
     243              : 
     244              :     #[tokio::test]
     245            1 :     async fn test_read_write_safekeeper_state() -> anyhow::Result<()> {
     246            1 :         let tempdir = camino_tempfile::tempdir()?;
     247            1 :         let mut state = TimelinePersistentState::empty();
     248            1 :         state.mconf = Configuration {
     249            1 :             generation: SafekeeperGeneration::new(42),
     250            1 :             members: MemberSet::empty(),
     251            1 :             new_members: None,
     252            1 :         };
     253            1 :         let mut storage = FileStorage::create_new(tempdir.path(), state.clone(), NO_SYNC).await?;
     254            1 : 
     255            1 :         // Make a change.
     256            1 :         state.commit_lsn = Lsn(42);
     257            1 :         storage.persist(&state).await?;
     258            1 : 
     259            1 :         // Reload the state. It should match the previously persisted state.
     260            1 :         let loaded_state = FileStorage::load_control_file_from_dir(tempdir.path())?;
     261            1 :         assert_eq!(loaded_state, state);
     262            1 :         Ok(())
     263            1 :     }
     264              : 
     265              :     #[tokio::test]
     266            1 :     async fn test_safekeeper_state_checksum_mismatch() -> anyhow::Result<()> {
     267            1 :         let tempdir = camino_tempfile::tempdir()?;
     268            1 :         let mut state = TimelinePersistentState::empty();
     269            1 :         let mut storage = FileStorage::create_new(tempdir.path(), state.clone(), NO_SYNC).await?;
     270            1 : 
     271            1 :         // Make a change.
     272            1 :         state.commit_lsn = Lsn(42);
     273            1 :         storage.persist(&state).await?;
     274            1 : 
     275            1 :         // Change the first byte to fail checksum validation.
     276            1 :         let ctrl_path = tempdir.path().join(CONTROL_FILE_NAME);
     277            1 :         let mut data = fs::read(&ctrl_path).await?;
     278            1 :         data[0] += 1;
     279            1 :         fs::write(&ctrl_path, &data).await?;
     280            1 : 
     281            1 :         // Loading the file should fail checksum validation.
     282            1 :         if let Err(err) = FileStorage::load_control_file_from_dir(tempdir.path()) {
     283            1 :             assert!(err.to_string().contains("control file checksum mismatch"))
     284            1 :         } else {
     285            1 :             panic!("expected checksum error")
     286            1 :         }
     287            1 :         Ok(())
     288            1 :     }
     289              : }
        

Generated by: LCOV version 2.1-beta