LCOV - code coverage report
Current view: top level - safekeeper/src - control_file.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 82.9 % 222 184
Test Date: 2023-09-06 10:18:01 Functions: 69.2 % 26 18

            Line data    Source code
       1              : //! Control file serialization, deserialization and persistence.
       2              : 
       3              : use anyhow::{bail, ensure, Context, Result};
       4              : use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
       5              : use tokio::fs::{self, File};
       6              : use tokio::io::AsyncWriteExt;
       7              : 
       8              : use std::io::Read;
       9              : use std::ops::Deref;
      10              : use std::path::{Path, PathBuf};
      11              : use std::time::Instant;
      12              : 
      13              : use crate::control_file_upgrade::upgrade_control_file;
      14              : use crate::metrics::PERSIST_CONTROL_FILE_SECONDS;
      15              : use crate::safekeeper::{SafeKeeperState, SK_FORMAT_VERSION, SK_MAGIC};
      16              : use utils::{bin_ser::LeSer, id::TenantTimelineId};
      17              : 
      18              : use crate::SafeKeeperConf;
      19              : 
      20              : use std::convert::TryInto;
      21              : 
      /// Name of the file, inside a timeline directory, that holds the persistent
      /// safekeeper metadata.
      const CONTROL_FILE_NAME: &str = "safekeeper.control";
      /// Name of the scratch file new state is written to first; it is then moved over
      /// the control file with `rename` so the update is atomic.
      const CONTROL_FILE_NAME_PARTIAL: &str = "safekeeper.control.partial";
      /// Size in bytes of the `u32` checksum stored at the very end of the control file.
      pub const CHECKSUM_SIZE: usize = std::mem::size_of::<u32>();
      27              : 
      28              : /// Storage should keep actual state inside of it. It should implement Deref
      29              : /// trait to access state fields and have persist method for updating that state.
      30              : #[async_trait::async_trait]
      31              : pub trait Storage: Deref<Target = SafeKeeperState> {
      32              :     /// Persist safekeeper state on disk and update internal state.
      33              :     async fn persist(&mut self, s: &SafeKeeperState) -> Result<()>;
      34              : 
      35              :     /// Timestamp of last persist.
      36              :     fn last_persist_at(&self) -> Instant;
      37              : }
      38              : 
      39            0 : #[derive(Debug)]
      40              : pub struct FileStorage {
      41              :     // save timeline dir to avoid reconstructing it every time
      42              :     timeline_dir: PathBuf,
      43              :     conf: SafeKeeperConf,
      44              : 
      45              :     /// Last state persisted to disk.
      46              :     state: SafeKeeperState,
      47              :     /// Not preserved across restarts.
      48              :     last_persist_at: Instant,
      49              : }
      50              : 
      51              : impl FileStorage {
      52              :     /// Initialize storage by loading state from disk.
      53           83 :     pub fn restore_new(ttid: &TenantTimelineId, conf: &SafeKeeperConf) -> Result<FileStorage> {
      54           83 :         let timeline_dir = conf.timeline_dir(ttid);
      55              : 
      56           83 :         let state = Self::load_control_file_conf(conf, ttid)?;
      57              : 
      58           82 :         Ok(FileStorage {
      59           82 :             timeline_dir,
      60           82 :             conf: conf.clone(),
      61           82 :             state,
      62           82 :             last_persist_at: Instant::now(),
      63           82 :         })
      64           83 :     }
      65              : 
      66              :     /// Create file storage for a new timeline, but don't persist it yet.
      67          525 :     pub fn create_new(
      68          525 :         ttid: &TenantTimelineId,
      69          525 :         conf: &SafeKeeperConf,
      70          525 :         state: SafeKeeperState,
      71          525 :     ) -> Result<FileStorage> {
      72          525 :         let timeline_dir = conf.timeline_dir(ttid);
      73          525 : 
      74          525 :         let store = FileStorage {
      75          525 :             timeline_dir,
      76          525 :             conf: conf.clone(),
      77          525 :             state,
      78          525 :             last_persist_at: Instant::now(),
      79          525 :         };
      80          525 : 
      81          525 :         Ok(store)
      82          525 :     }
      83              : 
      84              :     /// Check the magic/version in the on-disk data and deserialize it, if possible.
      85           84 :     fn deser_sk_state(buf: &mut &[u8]) -> Result<SafeKeeperState> {
      86              :         // Read the version independent part
      87           84 :         let magic = ReadBytesExt::read_u32::<LittleEndian>(buf)?;
      88           84 :         if magic != SK_MAGIC {
      89            0 :             bail!(
      90            0 :                 "bad control file magic: {:X}, expected {:X}",
      91            0 :                 magic,
      92            0 :                 SK_MAGIC
      93            0 :             );
      94           84 :         }
      95           84 :         let version = ReadBytesExt::read_u32::<LittleEndian>(buf)?;
      96           84 :         if version == SK_FORMAT_VERSION {
      97           84 :             let res = SafeKeeperState::des(buf)?;
      98           84 :             return Ok(res);
      99            0 :         }
     100            0 :         // try to upgrade
     101            0 :         upgrade_control_file(buf, version)
     102           84 :     }
     103              : 
     104              :     /// Load control file for given ttid at path specified by conf.
     105           84 :     pub fn load_control_file_conf(
     106           84 :         conf: &SafeKeeperConf,
     107           84 :         ttid: &TenantTimelineId,
     108           84 :     ) -> Result<SafeKeeperState> {
     109           84 :         let path = conf.timeline_dir(ttid).join(CONTROL_FILE_NAME);
     110           84 :         Self::load_control_file(path)
     111           84 :     }
     112              : 
     113              :     /// Read in the control file.
     114           85 :     pub fn load_control_file<P: AsRef<Path>>(control_file_path: P) -> Result<SafeKeeperState> {
     115           85 :         let mut control_file = std::fs::OpenOptions::new()
     116           85 :             .read(true)
     117           85 :             .write(true)
     118           85 :             .open(&control_file_path)
     119           85 :             .with_context(|| {
     120            0 :                 format!(
     121            0 :                     "failed to open control file at {}",
     122            0 :                     control_file_path.as_ref().display(),
     123            0 :                 )
     124           85 :             })?;
     125              : 
     126           85 :         let mut buf = Vec::new();
     127           85 :         control_file
     128           85 :             .read_to_end(&mut buf)
     129           85 :             .context("failed to read control file")?;
     130              : 
     131           85 :         let calculated_checksum = crc32c::crc32c(&buf[..buf.len() - CHECKSUM_SIZE]);
     132              : 
     133           85 :         let expected_checksum_bytes: &[u8; CHECKSUM_SIZE] =
     134           85 :             buf[buf.len() - CHECKSUM_SIZE..].try_into()?;
     135           85 :         let expected_checksum = u32::from_le_bytes(*expected_checksum_bytes);
     136           85 : 
     137           85 :         ensure!(
     138           85 :             calculated_checksum == expected_checksum,
     139            1 :             format!(
     140            1 :                 "safekeeper control file checksum mismatch: expected {} got {}",
     141            1 :                 expected_checksum, calculated_checksum
     142            1 :             )
     143              :         );
     144              : 
     145           84 :         let state = FileStorage::deser_sk_state(&mut &buf[..buf.len() - CHECKSUM_SIZE])
     146           84 :             .with_context(|| {
     147            0 :                 format!(
     148            0 :                     "while reading control file {}",
     149            0 :                     control_file_path.as_ref().display(),
     150            0 :                 )
     151           84 :             })?;
     152           84 :         Ok(state)
     153           85 :     }
     154              : }
     155              : 
     156              : impl Deref for FileStorage {
     157              :     type Target = SafeKeeperState;
     158              : 
     159     29727724 :     fn deref(&self) -> &Self::Target {
     160     29727724 :         &self.state
     161     29727724 :     }
     162              : }
     163              : 
     164              : #[async_trait::async_trait]
     165              : impl Storage for FileStorage {
     166              :     /// Persists state durably to the underlying storage.
     167              :     ///
     168              :     /// For a description, see <https://lwn.net/Articles/457667/>.
     169         5439 :     async fn persist(&mut self, s: &SafeKeeperState) -> Result<()> {
     170         5439 :         let _timer = PERSIST_CONTROL_FILE_SECONDS.start_timer();
     171         5439 : 
     172         5439 :         // write data to safekeeper.control.partial
     173         5439 :         let control_partial_path = self.timeline_dir.join(CONTROL_FILE_NAME_PARTIAL);
     174         5466 :         let mut control_partial = File::create(&control_partial_path).await.with_context(|| {
     175            0 :             format!(
     176            0 :                 "failed to create partial control file at: {}",
     177            0 :                 &control_partial_path.display()
     178            0 :             )
     179         5439 :         })?;
     180         5439 :         let mut buf: Vec<u8> = Vec::new();
     181         5439 :         WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_MAGIC)?;
     182         5439 :         WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_FORMAT_VERSION)?;
     183         5439 :         s.ser_into(&mut buf)?;
     184              : 
     185              :         // calculate checksum before resize
     186         5439 :         let checksum = crc32c::crc32c(&buf);
     187         5439 :         buf.extend_from_slice(&checksum.to_le_bytes());
     188         5439 : 
     189         5439 :         control_partial.write_all(&buf).await.with_context(|| {
     190            0 :             format!(
     191            0 :                 "failed to write safekeeper state into control file at: {}",
     192            0 :                 control_partial_path.display()
     193            0 :             )
     194         5439 :         })?;
     195         5439 :         control_partial.flush().await.with_context(|| {
     196            0 :             format!(
     197            0 :                 "failed to flush safekeeper state into control file at: {}",
     198            0 :                 control_partial_path.display()
     199            0 :             )
     200         5439 :         })?;
     201              : 
     202              :         // fsync the file
     203         5439 :         if !self.conf.no_sync {
     204            2 :             control_partial.sync_all().await.with_context(|| {
     205            0 :                 format!(
     206            0 :                     "failed to sync partial control file at {}",
     207            0 :                     control_partial_path.display()
     208            0 :                 )
     209            2 :             })?;
     210         5437 :         }
     211              : 
     212         5439 :         let control_path = self.timeline_dir.join(CONTROL_FILE_NAME);
     213         5439 : 
     214         5439 :         // rename should be atomic
     215         5440 :         fs::rename(&control_partial_path, &control_path).await?;
     216              :         // this sync is not required by any standard but postgres does this (see durable_rename)
     217         5439 :         if !self.conf.no_sync {
     218            2 :             let new_f = File::open(&control_path).await?;
     219            2 :             new_f.sync_all().await.with_context(|| {
     220            0 :                 format!(
     221            0 :                     "failed to sync control file at: {}",
     222            0 :                     &control_path.display()
     223            0 :                 )
     224            2 :             })?;
     225              : 
     226              :             // fsync the directory (linux specific)
     227            2 :             let tli_dir = File::open(&self.timeline_dir).await?;
     228            2 :             tli_dir
     229            2 :                 .sync_all()
     230            2 :                 .await
     231            2 :                 .context("failed to sync control file directory")?;
     232         5437 :         }
     233              : 
     234              :         // update internal state
     235         5439 :         self.state = s.clone();
     236         5439 :         Ok(())
     237        10878 :     }
     238              : 
     239         1315 :     fn last_persist_at(&self) -> Instant {
     240         1315 :         self.last_persist_at
     241         1315 :     }
     242              : }
     243              : 
     #[cfg(test)]
     mod test {
         use super::FileStorage;
         use super::*;
         use crate::{safekeeper::SafeKeeperState, SafeKeeperConf};
         use anyhow::Result;
         use utils::{id::TenantTimelineId, lsn::Lsn};

         /// Builds a dummy configuration whose working directory is a fresh temp dir.
         fn stub_conf() -> SafeKeeperConf {
             let workdir = tempfile::tempdir().unwrap().into_path();
             SafeKeeperConf {
                 workdir,
                 ..SafeKeeperConf::dummy()
             }
         }

         /// Loads the control file of `ttid` twice: once through `restore_new` and once
         /// directly, returning both the storage and the raw state.
         async fn load_from_control_file(
             conf: &SafeKeeperConf,
             ttid: &TenantTimelineId,
         ) -> Result<(FileStorage, SafeKeeperState)> {
             fs::create_dir_all(conf.timeline_dir(ttid))
                 .await
                 .expect("failed to create timeline dir");
             Ok((
                 FileStorage::restore_new(ttid, conf)?,
                 FileStorage::load_control_file_conf(conf, ttid)?,
             ))
         }

         /// Creates the timeline directory and a not-yet-persisted storage holding an
         /// empty state; returns the storage together with a copy of that state.
         async fn create(
             conf: &SafeKeeperConf,
             ttid: &TenantTimelineId,
         ) -> Result<(FileStorage, SafeKeeperState)> {
             fs::create_dir_all(conf.timeline_dir(ttid))
                 .await
                 .expect("failed to create timeline dir");
             let state = SafeKeeperState::empty();
             let storage = FileStorage::create_new(ttid, conf, state.clone())?;
             Ok((storage, state))
         }

         /// A persisted state must be read back unchanged.
         #[tokio::test]
         async fn test_read_write_safekeeper_state() {
             let conf = stub_conf();
             let ttid = TenantTimelineId::generate();
             {
                 let (mut storage, mut state) =
                     create(&conf, &ttid).await.expect("failed to create state");
                 // change something
                 state.commit_lsn = Lsn(42);
                 storage
                     .persist(&state)
                     .await
                     .expect("failed to persist state");
             }

             let (_, state) = load_from_control_file(&conf, &ttid)
                 .await
                 .expect("failed to read state");
             assert_eq!(state.commit_lsn, Lsn(42));
         }

         /// Corrupting a byte of the control file must be detected by the checksum.
         #[tokio::test]
         async fn test_safekeeper_state_checksum_mismatch() {
             let conf = stub_conf();
             let ttid = TenantTimelineId::generate();
             {
                 let (mut storage, mut state) =
                     create(&conf, &ttid).await.expect("failed to create state");

                 // change something
                 state.commit_lsn = Lsn(42);
                 storage
                     .persist(&state)
                     .await
                     .expect("failed to persist state");
             }
             let control_path = conf.timeline_dir(&ttid).join(CONTROL_FILE_NAME);
             let mut data = fs::read(&control_path).await.unwrap();
             // Change the first byte of the file to fail checksum validation. Wrapping
             // keeps the test from panicking on overflow should that byte ever be 0xFF.
             data[0] = data[0].wrapping_add(1);
             fs::write(&control_path, &data)
                 .await
                 .expect("failed to write control file");

             match load_from_control_file(&conf, &ttid).await {
                 Err(err) => assert!(err
                     .to_string()
                     .contains("safekeeper control file checksum mismatch")),
                 Ok(_) => panic!("expected error"),
             }
         }
     }
        

Generated by: LCOV version 2.1-beta