Line data Source code
1 : //! Control file serialization, deserialization and persistence.
2 :
3 : use std::future::Future;
4 : use std::io::Read;
5 : use std::ops::Deref;
6 : use std::path::Path;
7 : use std::time::Instant;
8 :
9 : use anyhow::{Context, Result, bail, ensure};
10 : use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
11 : use camino::{Utf8Path, Utf8PathBuf};
12 : use safekeeper_api::membership::INVALID_GENERATION;
13 : use tokio::fs::File;
14 : use tokio::io::AsyncWriteExt;
15 : use utils::bin_ser::LeSer;
16 : use utils::crashsafe::durable_rename;
17 :
18 : use crate::control_file_upgrade::{downgrade_v10_to_v9, upgrade_control_file};
19 : use crate::metrics::PERSIST_CONTROL_FILE_SECONDS;
20 : use crate::metrics::WAL_DISK_IO_ERRORS;
21 : use crate::state::{EvictionState, TimelinePersistentState};
22 :
/// Magic number stored in the first four bytes of a control file.
pub const SK_MAGIC: u32 = 0xcafe_ceef;
/// Current control file format version.
pub const SK_FORMAT_VERSION: u32 = 10;

/// Name of the file that contains the persistent metadata of the safekeeper.
pub const CONTROL_FILE_NAME: &str = "safekeeper.control";
/// Name of the temporary file; it is needed to atomically update the state
/// using `rename`.
const CONTROL_FILE_NAME_PARTIAL: &str = "safekeeper.control.partial";
/// Size in bytes of the checksum stored at the end of a control file.
pub const CHECKSUM_SIZE: usize = size_of::<u32>();
31 :
32 : /// Storage should keep actual state inside of it. It should implement Deref
33 : /// trait to access state fields and have persist method for updating that state.
34 : pub trait Storage: Deref<Target = TimelinePersistentState> {
35 : /// Persist safekeeper state on disk and update internal state.
36 : fn persist(&mut self, s: &TimelinePersistentState) -> impl Future<Output = Result<()>> + Send;
37 :
38 : /// Timestamp of last persist.
39 : fn last_persist_at(&self) -> Instant;
40 : }
41 :
42 : #[derive(Debug)]
43 : pub struct FileStorage {
44 : // save timeline dir to avoid reconstructing it every time
45 : timeline_dir: Utf8PathBuf,
46 : no_sync: bool,
47 :
48 : /// Last state persisted to disk.
49 : state: TimelinePersistentState,
50 : /// Not preserved across restarts.
51 : last_persist_at: Instant,
52 : }
53 :
54 : impl FileStorage {
55 : /// Initialize storage by loading state from disk.
56 0 : pub fn restore_new(timeline_dir: &Utf8Path, no_sync: bool) -> Result<FileStorage> {
57 0 : let state = Self::load_control_file_from_dir(timeline_dir)?;
58 :
59 0 : Ok(FileStorage {
60 0 : timeline_dir: timeline_dir.to_path_buf(),
61 0 : no_sync,
62 0 : state,
63 0 : last_persist_at: Instant::now(),
64 0 : })
65 0 : }
66 :
67 : /// Create and reliably persist new control file at given location.
68 : ///
69 : /// Note: we normally call this in temp directory for atomic init, so
70 : /// interested in FileStorage as a result only in tests.
71 7 : pub async fn create_new(
72 7 : timeline_dir: &Utf8Path,
73 7 : state: TimelinePersistentState,
74 7 : no_sync: bool,
75 7 : ) -> Result<FileStorage> {
76 : // we don't support creating new timelines in offloaded state
77 7 : assert!(matches!(state.eviction_state, EvictionState::Present));
78 :
79 7 : let mut store = FileStorage {
80 7 : timeline_dir: timeline_dir.to_path_buf(),
81 7 : no_sync,
82 7 : state: state.clone(),
83 7 : last_persist_at: Instant::now(),
84 7 : };
85 7 : store.persist(&state).await?;
86 7 : Ok(store)
87 7 : }
88 :
89 : /// Check the magic/version in the on-disk data and deserialize it, if possible.
90 1 : fn deser_sk_state(buf: &mut &[u8]) -> Result<TimelinePersistentState> {
91 : // Read the version independent part
92 1 : let magic = ReadBytesExt::read_u32::<LittleEndian>(buf)?;
93 1 : if magic != SK_MAGIC {
94 0 : bail!(
95 0 : "bad control file magic: {:X}, expected {:X}",
96 : magic,
97 : SK_MAGIC
98 : );
99 1 : }
100 1 : let version = ReadBytesExt::read_u32::<LittleEndian>(buf)?;
101 1 : if version == SK_FORMAT_VERSION {
102 1 : let res = TimelinePersistentState::des(buf)?;
103 1 : return Ok(res);
104 0 : }
105 : // try to upgrade
106 0 : upgrade_control_file(buf, version)
107 1 : }
108 :
109 : /// Load control file from given directory.
110 2 : fn load_control_file_from_dir(timeline_dir: &Utf8Path) -> Result<TimelinePersistentState> {
111 2 : let path = timeline_dir.join(CONTROL_FILE_NAME);
112 2 : Self::load_control_file(path)
113 2 : }
114 :
115 : /// Read in the control file.
116 2 : pub fn load_control_file<P: AsRef<Path>>(
117 2 : control_file_path: P,
118 2 : ) -> Result<TimelinePersistentState> {
119 2 : let mut control_file = std::fs::OpenOptions::new()
120 2 : .read(true)
121 2 : .write(true)
122 2 : .open(&control_file_path)
123 2 : .with_context(|| {
124 0 : format!(
125 0 : "failed to open control file at {}",
126 0 : control_file_path.as_ref().display(),
127 : )
128 0 : })?;
129 :
130 2 : let mut buf = Vec::new();
131 2 : control_file
132 2 : .read_to_end(&mut buf)
133 2 : .context("failed to read control file")?;
134 :
135 2 : let calculated_checksum = crc32c::crc32c(&buf[..buf.len() - CHECKSUM_SIZE]);
136 :
137 2 : let expected_checksum_bytes: &[u8; CHECKSUM_SIZE] =
138 2 : buf[buf.len() - CHECKSUM_SIZE..].try_into()?;
139 2 : let expected_checksum = u32::from_le_bytes(*expected_checksum_bytes);
140 :
141 2 : ensure!(
142 2 : calculated_checksum == expected_checksum,
143 1 : format!(
144 1 : "safekeeper control file checksum mismatch: expected {} got {}",
145 : expected_checksum, calculated_checksum
146 : )
147 : );
148 :
149 1 : let state = FileStorage::deser_sk_state(&mut &buf[..buf.len() - CHECKSUM_SIZE])
150 1 : .with_context(|| {
151 0 : format!(
152 0 : "while reading control file {}",
153 0 : control_file_path.as_ref().display(),
154 : )
155 0 : })?;
156 1 : Ok(state)
157 2 : }
158 : }
159 :
160 : impl Deref for FileStorage {
161 : type Target = TimelinePersistentState;
162 :
163 7980 : fn deref(&self) -> &Self::Target {
164 7980 : &self.state
165 7980 : }
166 : }
167 :
168 : impl TimelinePersistentState {
169 44 : pub(crate) fn write_to_buf(&self) -> Result<Vec<u8>> {
170 44 : let mut buf: Vec<u8> = Vec::new();
171 44 : WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_MAGIC)?;
172 :
173 44 : if self.mconf.generation == INVALID_GENERATION {
174 : // Temp hack for forward compatibility test: in case of none
175 : // configuration save cfile in previous v9 format.
176 : const PREV_FORMAT_VERSION: u32 = 9;
177 42 : let prev = downgrade_v10_to_v9(self);
178 42 : WriteBytesExt::write_u32::<LittleEndian>(&mut buf, PREV_FORMAT_VERSION)?;
179 42 : prev.ser_into(&mut buf)?;
180 : } else {
181 : // otherwise, we write the current format version
182 2 : WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_FORMAT_VERSION)?;
183 2 : self.ser_into(&mut buf)?;
184 : }
185 :
186 : // calculate checksum before resize
187 44 : let checksum = crc32c::crc32c(&buf);
188 44 : buf.extend_from_slice(&checksum.to_le_bytes());
189 44 : Ok(buf)
190 44 : }
191 : }
192 :
193 : impl Storage for FileStorage {
194 : /// Persists state durably to the underlying storage.
195 44 : async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> {
196 : // start timer for metrics
197 44 : let _timer = PERSIST_CONTROL_FILE_SECONDS.start_timer();
198 : // write data to safekeeper.control.partial
199 44 : let control_partial_path = self.timeline_dir.join(CONTROL_FILE_NAME_PARTIAL);
200 44 : let mut control_partial = File::create(&control_partial_path).await.with_context(|| {
201 : /* BEGIN_HADRON */
202 0 : WAL_DISK_IO_ERRORS.inc();
203 : /*END_HADRON */
204 0 : format!(
205 0 : "failed to create partial control file at: {}",
206 0 : &control_partial_path
207 : )
208 0 : })?;
209 :
210 44 : let buf: Vec<u8> = s.write_to_buf()?;
211 :
212 44 : control_partial.write_all(&buf).await.with_context(|| {
213 : /* BEGIN_HADRON */
214 0 : WAL_DISK_IO_ERRORS.inc();
215 : /*END_HADRON */
216 0 : format!("failed to write safekeeper state into control file at: {control_partial_path}")
217 0 : })?;
218 44 : control_partial.flush().await.with_context(|| {
219 : /* BEGIN_HADRON */
220 0 : WAL_DISK_IO_ERRORS.inc();
221 : /*END_HADRON */
222 0 : format!("failed to flush safekeeper state into control file at: {control_partial_path}")
223 0 : })?;
224 :
225 44 : let control_path = self.timeline_dir.join(CONTROL_FILE_NAME);
226 44 : durable_rename(&control_partial_path, &control_path, !self.no_sync)
227 44 : .await
228 : /* BEGIN_HADRON */
229 43 : .inspect_err(|_| WAL_DISK_IO_ERRORS.inc())?;
230 : /* END_HADRON */
231 :
232 : // update internal state
233 43 : self.state = s.clone();
234 43 : Ok(())
235 43 : }
236 :
237 50 : fn last_persist_at(&self) -> Instant {
238 50 : self.last_persist_at
239 50 : }
240 : }
241 :
#[cfg(test)]
mod test {
    use safekeeper_api::membership::{Configuration, MemberSet, SafekeeperGeneration};
    use tokio::fs;
    use utils::lsn::Lsn;

    use super::*;

    const NO_SYNC: bool = true;

    /// Empty state carrying a membership configuration with generation 42.
    fn state_with_generation_42() -> TimelinePersistentState {
        let mut state = TimelinePersistentState::empty();
        state.mconf = Configuration {
            generation: SafekeeperGeneration::new(42),
            members: MemberSet::empty(),
            new_members: None,
        };
        state
    }

    /// A persisted state must be read back from disk unchanged.
    #[tokio::test]
    async fn test_read_write_safekeeper_state() -> anyhow::Result<()> {
        let tempdir = camino_tempfile::tempdir()?;
        let dir = tempdir.path();
        let mut state = state_with_generation_42();
        let mut storage = FileStorage::create_new(dir, state.clone(), NO_SYNC).await?;

        // Make a change.
        state.commit_lsn = Lsn(42);
        storage.persist(&state).await?;

        // Reload the state. It should match the previously persisted state.
        let loaded_state = FileStorage::load_control_file_from_dir(dir)?;
        assert_eq!(loaded_state, state);
        Ok(())
    }

    /// A control file with a corrupted byte must be rejected by the checksum
    /// validation.
    #[tokio::test]
    async fn test_safekeeper_state_checksum_mismatch() -> anyhow::Result<()> {
        let tempdir = camino_tempfile::tempdir()?;
        let dir = tempdir.path();
        let mut state = TimelinePersistentState::empty();
        let mut storage = FileStorage::create_new(dir, state.clone(), NO_SYNC).await?;

        // Make a change.
        state.commit_lsn = Lsn(42);
        storage.persist(&state).await?;

        // Change the first byte to fail checksum validation.
        let ctrl_path = dir.join(CONTROL_FILE_NAME);
        let mut data = fs::read(&ctrl_path).await?;
        data[0] += 1;
        fs::write(&ctrl_path, &data).await?;

        // Loading the file should fail checksum validation.
        let Err(err) = FileStorage::load_control_file_from_dir(dir) else {
            panic!("expected checksum error")
        };
        assert!(err.to_string().contains("control file checksum mismatch"));
        Ok(())
    }
}
|