LCOV - code coverage report
Current view: top level - safekeeper/src - control_file.rs (source / functions) Coverage Total Hit
Test: 20b6afc7b7f34578dcaab2b3acdaecfe91cd8bf1.info Lines: 74.1 % 170 126
Test Date: 2024-11-25 17:48:16 Functions: 60.0 % 20 12

            Line data    Source code
       1              : //! Control file serialization, deserialization and persistence.
       2              : 
       3              : use anyhow::{bail, ensure, Context, Result};
       4              : use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
       5              : use camino::{Utf8Path, Utf8PathBuf};
       6              : use tokio::fs::File;
       7              : use tokio::io::AsyncWriteExt;
       8              : use utils::crashsafe::durable_rename;
       9              : 
      10              : use std::future::Future;
      11              : use std::io::Read;
      12              : use std::ops::Deref;
      13              : use std::path::Path;
      14              : use std::time::Instant;
      15              : 
      16              : use crate::control_file_upgrade::downgrade_v9_to_v8;
      17              : use crate::control_file_upgrade::upgrade_control_file;
      18              : use crate::metrics::PERSIST_CONTROL_FILE_SECONDS;
      19              : use crate::state::{EvictionState, TimelinePersistentState};
      20              : use utils::bin_ser::LeSer;
      21              : 
/// Magic number written as the first little-endian `u32` of the control file
/// and checked when the file is read back.
pub const SK_MAGIC: u32 = 0xcafeceefu32;
/// Current control file format version; older versions found on disk are
/// passed to the upgrade routine instead of being deserialized directly.
pub const SK_FORMAT_VERSION: u32 = 9;

/// Name of the file that contains persistent metadata for safekeeper.
pub const CONTROL_FILE_NAME: &str = "safekeeper.control";
/// Name of the temporary file needed to atomically update the state using `rename`.
const CONTROL_FILE_NAME_PARTIAL: &str = "safekeeper.control.partial";
/// Size in bytes of the CRC32C checksum stored at the very end of the control file.
pub const CHECKSUM_SIZE: usize = size_of::<u32>();
      30              : 
/// Storage should keep actual state inside of it. It should implement Deref
/// trait to access state fields and have persist method for updating that state.
///
/// Implementors expose the most recently persisted [`TimelinePersistentState`]
/// through `Deref`, so callers can read state fields directly from the storage.
pub trait Storage: Deref<Target = TimelinePersistentState> {
    /// Persist safekeeper state on disk and update internal state.
    ///
    /// The returned future is `Send`; it resolves to an error if the state
    /// could not be persisted.
    fn persist(&mut self, s: &TimelinePersistentState) -> impl Future<Output = Result<()>> + Send;

    /// Timestamp of last persist.
    fn last_persist_at(&self) -> Instant;
}
      40              : 
/// File-backed [`Storage`] implementation that keeps the control file inside
/// the timeline directory.
#[derive(Debug)]
pub struct FileStorage {
    // save timeline dir to avoid reconstructing it every time
    timeline_dir: Utf8PathBuf,
    // When true, the rename that publishes a new control file is performed
    // without requesting fsync (the flag is passed negated to `durable_rename`).
    no_sync: bool,

    /// Last state persisted to disk.
    state: TimelinePersistentState,
    /// Not preserved across restarts.
    last_persist_at: Instant,
}
      52              : 
      53              : impl FileStorage {
      54              :     /// Initialize storage by loading state from disk.
      55            0 :     pub fn restore_new(timeline_dir: &Utf8Path, no_sync: bool) -> Result<FileStorage> {
      56            0 :         let state = Self::load_control_file_from_dir(timeline_dir)?;
      57              : 
      58            0 :         Ok(FileStorage {
      59            0 :             timeline_dir: timeline_dir.to_path_buf(),
      60            0 :             no_sync,
      61            0 :             state,
      62            0 :             last_persist_at: Instant::now(),
      63            0 :         })
      64            0 :     }
      65              : 
      66              :     /// Create and reliably persist new control file at given location.
      67              :     ///
      68              :     /// Note: we normally call this in temp directory for atomic init, so
      69              :     /// interested in FileStorage as a result only in tests.
      70            2 :     pub async fn create_new(
      71            2 :         timeline_dir: &Utf8Path,
      72            2 :         state: TimelinePersistentState,
      73            2 :         no_sync: bool,
      74            2 :     ) -> Result<FileStorage> {
      75            2 :         // we don't support creating new timelines in offloaded state
      76            2 :         assert!(matches!(state.eviction_state, EvictionState::Present));
      77              : 
      78            2 :         let mut store = FileStorage {
      79            2 :             timeline_dir: timeline_dir.to_path_buf(),
      80            2 :             no_sync,
      81            2 :             state: state.clone(),
      82            2 :             last_persist_at: Instant::now(),
      83            2 :         };
      84            6 :         store.persist(&state).await?;
      85            2 :         Ok(store)
      86            2 :     }
      87              : 
      88              :     /// Check the magic/version in the on-disk data and deserialize it, if possible.
      89            1 :     fn deser_sk_state(buf: &mut &[u8]) -> Result<TimelinePersistentState> {
      90              :         // Read the version independent part
      91            1 :         let magic = ReadBytesExt::read_u32::<LittleEndian>(buf)?;
      92            1 :         if magic != SK_MAGIC {
      93            0 :             bail!(
      94            0 :                 "bad control file magic: {:X}, expected {:X}",
      95            0 :                 magic,
      96            0 :                 SK_MAGIC
      97            0 :             );
      98            1 :         }
      99            1 :         let version = ReadBytesExt::read_u32::<LittleEndian>(buf)?;
     100            1 :         if version == SK_FORMAT_VERSION {
     101            0 :             let res = TimelinePersistentState::des(buf)?;
     102            0 :             return Ok(res);
     103            1 :         }
     104            1 :         // try to upgrade
     105            1 :         upgrade_control_file(buf, version)
     106            1 :     }
     107              : 
     108              :     /// Load control file from given directory.
     109            2 :     fn load_control_file_from_dir(timeline_dir: &Utf8Path) -> Result<TimelinePersistentState> {
     110            2 :         let path = timeline_dir.join(CONTROL_FILE_NAME);
     111            2 :         Self::load_control_file(path)
     112            2 :     }
     113              : 
     114              :     /// Read in the control file.
     115            2 :     pub fn load_control_file<P: AsRef<Path>>(
     116            2 :         control_file_path: P,
     117            2 :     ) -> Result<TimelinePersistentState> {
     118            2 :         let mut control_file = std::fs::OpenOptions::new()
     119            2 :             .read(true)
     120            2 :             .write(true)
     121            2 :             .open(&control_file_path)
     122            2 :             .with_context(|| {
     123            0 :                 format!(
     124            0 :                     "failed to open control file at {}",
     125            0 :                     control_file_path.as_ref().display(),
     126            0 :                 )
     127            2 :             })?;
     128              : 
     129            2 :         let mut buf = Vec::new();
     130            2 :         control_file
     131            2 :             .read_to_end(&mut buf)
     132            2 :             .context("failed to read control file")?;
     133              : 
     134            2 :         let calculated_checksum = crc32c::crc32c(&buf[..buf.len() - CHECKSUM_SIZE]);
     135              : 
     136            2 :         let expected_checksum_bytes: &[u8; CHECKSUM_SIZE] =
     137            2 :             buf[buf.len() - CHECKSUM_SIZE..].try_into()?;
     138            2 :         let expected_checksum = u32::from_le_bytes(*expected_checksum_bytes);
     139            2 : 
     140            2 :         ensure!(
     141            2 :             calculated_checksum == expected_checksum,
     142            1 :             format!(
     143            1 :                 "safekeeper control file checksum mismatch: expected {} got {}",
     144            1 :                 expected_checksum, calculated_checksum
     145            1 :             )
     146              :         );
     147              : 
     148            1 :         let state = FileStorage::deser_sk_state(&mut &buf[..buf.len() - CHECKSUM_SIZE])
     149            1 :             .with_context(|| {
     150            0 :                 format!(
     151            0 :                     "while reading control file {}",
     152            0 :                     control_file_path.as_ref().display(),
     153            0 :                 )
     154            1 :             })?;
     155            1 :         Ok(state)
     156            2 :     }
     157              : }
     158              : 
     159              : impl Deref for FileStorage {
     160              :     type Target = TimelinePersistentState;
     161              : 
     162            0 :     fn deref(&self) -> &Self::Target {
     163            0 :         &self.state
     164            0 :     }
     165              : }
     166              : 
     167              : impl TimelinePersistentState {
     168            4 :     pub(crate) fn write_to_buf(&self) -> Result<Vec<u8>> {
     169            4 :         let mut buf: Vec<u8> = Vec::new();
     170            4 :         WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_MAGIC)?;
     171              : 
     172            4 :         if self.eviction_state == EvictionState::Present {
     173            4 :             // temp hack for forward compatibility
     174            4 :             const PREV_FORMAT_VERSION: u32 = 8;
     175            4 :             let prev = downgrade_v9_to_v8(self);
     176            4 :             WriteBytesExt::write_u32::<LittleEndian>(&mut buf, PREV_FORMAT_VERSION)?;
     177            4 :             prev.ser_into(&mut buf)?;
     178              :         } else {
     179              :             // otherwise, we write the current format version
     180            0 :             WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_FORMAT_VERSION)?;
     181            0 :             self.ser_into(&mut buf)?;
     182              :         }
     183              : 
     184              :         // calculate checksum before resize
     185            4 :         let checksum = crc32c::crc32c(&buf);
     186            4 :         buf.extend_from_slice(&checksum.to_le_bytes());
     187            4 :         Ok(buf)
     188            4 :     }
     189              : }
     190              : 
     191              : impl Storage for FileStorage {
     192              :     /// Persists state durably to the underlying storage.
     193            4 :     async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> {
     194            4 :         let _timer = PERSIST_CONTROL_FILE_SECONDS.start_timer();
     195            4 : 
     196            4 :         // write data to safekeeper.control.partial
     197            4 :         let control_partial_path = self.timeline_dir.join(CONTROL_FILE_NAME_PARTIAL);
     198            4 :         let mut control_partial = File::create(&control_partial_path).await.with_context(|| {
     199            0 :             format!(
     200            0 :                 "failed to create partial control file at: {}",
     201            0 :                 &control_partial_path
     202            0 :             )
     203            4 :         })?;
     204              : 
     205            4 :         let buf: Vec<u8> = s.write_to_buf()?;
     206              : 
     207            4 :         control_partial.write_all(&buf).await.with_context(|| {
     208            0 :             format!(
     209            0 :                 "failed to write safekeeper state into control file at: {}",
     210            0 :                 control_partial_path
     211            0 :             )
     212            4 :         })?;
     213            4 :         control_partial.flush().await.with_context(|| {
     214            0 :             format!(
     215            0 :                 "failed to flush safekeeper state into control file at: {}",
     216            0 :                 control_partial_path
     217            0 :             )
     218            4 :         })?;
     219              : 
     220            4 :         let control_path = self.timeline_dir.join(CONTROL_FILE_NAME);
     221            4 :         durable_rename(&control_partial_path, &control_path, !self.no_sync).await?;
     222              : 
     223              :         // update internal state
     224            4 :         self.state = s.clone();
     225            4 :         Ok(())
     226            4 :     }
     227              : 
     228            0 :     fn last_persist_at(&self) -> Instant {
     229            0 :         self.last_persist_at
     230            0 :     }
     231              : }
     232              : 
     233              : #[cfg(test)]
     234              : mod test {
     235              :     use super::*;
     236              :     use tokio::fs;
     237              :     use utils::lsn::Lsn;
     238              : 
     239              :     const NO_SYNC: bool = true;
     240              : 
     241              :     #[tokio::test]
     242            1 :     async fn test_read_write_safekeeper_state() -> anyhow::Result<()> {
     243            1 :         let tempdir = camino_tempfile::tempdir()?;
     244            1 :         let mut state = TimelinePersistentState::empty();
     245            3 :         let mut storage = FileStorage::create_new(tempdir.path(), state.clone(), NO_SYNC).await?;
     246            1 : 
     247            1 :         // Make a change.
     248            1 :         state.commit_lsn = Lsn(42);
     249            3 :         storage.persist(&state).await?;
     250            1 : 
     251            1 :         // Reload the state. It should match the previously persisted state.
     252            1 :         let loaded_state = FileStorage::load_control_file_from_dir(tempdir.path())?;
     253            1 :         assert_eq!(loaded_state, state);
     254            1 :         Ok(())
     255            1 :     }
     256              : 
     257              :     #[tokio::test]
     258            1 :     async fn test_safekeeper_state_checksum_mismatch() -> anyhow::Result<()> {
     259            1 :         let tempdir = camino_tempfile::tempdir()?;
     260            1 :         let mut state = TimelinePersistentState::empty();
     261            3 :         let mut storage = FileStorage::create_new(tempdir.path(), state.clone(), NO_SYNC).await?;
     262            1 : 
     263            1 :         // Make a change.
     264            1 :         state.commit_lsn = Lsn(42);
     265            3 :         storage.persist(&state).await?;
     266            1 : 
     267            1 :         // Change the first byte to fail checksum validation.
     268            1 :         let ctrl_path = tempdir.path().join(CONTROL_FILE_NAME);
     269            1 :         let mut data = fs::read(&ctrl_path).await?;
     270            1 :         data[0] += 1;
     271            1 :         fs::write(&ctrl_path, &data).await?;
     272            1 : 
     273            1 :         // Loading the file should fail checksum validation.
     274            1 :         if let Err(err) = FileStorage::load_control_file_from_dir(tempdir.path()) {
     275            1 :             assert!(err.to_string().contains("control file checksum mismatch"))
     276            1 :         } else {
     277            1 :             panic!("expected checksum error")
     278            1 :         }
     279            1 :         Ok(())
     280            1 :     }
     281              : }
        

Generated by: LCOV version 2.1-beta