Line data Source code
1 : //! Control file serialization, deserialization and persistence.
2 :
3 : use anyhow::{bail, ensure, Context, Result};
4 : use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
5 : use camino::{Utf8Path, Utf8PathBuf};
6 : use tokio::fs::File;
7 : use tokio::io::AsyncWriteExt;
8 : use utils::crashsafe::durable_rename;
9 :
10 : use std::future::Future;
11 : use std::io::Read;
12 : use std::ops::Deref;
13 : use std::path::Path;
14 : use std::time::Instant;
15 :
16 : use crate::control_file_upgrade::downgrade_v9_to_v8;
17 : use crate::control_file_upgrade::upgrade_control_file;
18 : use crate::metrics::PERSIST_CONTROL_FILE_SECONDS;
19 : use crate::state::{EvictionState, TimelinePersistentState};
20 : use utils::bin_ser::LeSer;
21 :
22 : pub const SK_MAGIC: u32 = 0xcafeceefu32; // first little-endian u32 of a serialized control file; checked on load
23 : pub const SK_FORMAT_VERSION: u32 = 9; // current format version; files with another version go through `upgrade_control_file`
24 :
25 : // Name of the file holding the persistent safekeeper metadata.
26 : pub const CONTROL_FILE_NAME: &str = "safekeeper.control";
27 : // New state is written to this file first and then renamed over CONTROL_FILE_NAME, so the update is atomic.
28 : const CONTROL_FILE_NAME_PARTIAL: &str = "safekeeper.control.partial";
29 : pub const CHECKSUM_SIZE: usize = size_of::<u32>(); // byte length of the CRC32C checksum appended after the serialized state
30 :
31 : /// Backend that owns the persistent state of a timeline. An implementor keeps the
32 : /// current state inside itself, exposes its fields read-only via `Deref`, and replaces it via `persist`.
33 : pub trait Storage: Deref<Target = TimelinePersistentState> {
34 : /// Persist safekeeper state `s` on disk and update the internal state to match; the returned future must be `Send`.
35 : fn persist(&mut self, s: &TimelinePersistentState) -> impl Future<Output = Result<()>> + Send;
36 :
37 : /// Timestamp of the last persist, as an `Instant`.
38 : fn last_persist_at(&self) -> Instant;
39 : }
40 :
41 : #[derive(Debug)]
42 : pub struct FileStorage { // `Storage` implementation backed by a control file inside `timeline_dir`
43 : // Timeline directory, stored so the control file paths can be built with a plain `join`.
44 : timeline_dir: Utf8PathBuf,
45 : no_sync: bool, // negated and passed to `durable_rename` by `persist`; presumably disables fsync — confirm in utils::crashsafe
46 :
47 : /// Last state persisted to disk.
48 : state: TimelinePersistentState,
49 : /// Not preserved across restarts; the constructors initialize it to `Instant::now()`.
50 : last_persist_at: Instant,
51 : }
52 :
53 : impl FileStorage {
54 : /// Initialize storage by loading state from disk.
55 0 : pub fn restore_new(timeline_dir: &Utf8Path, no_sync: bool) -> Result<FileStorage> {
56 0 : let state = Self::load_control_file_from_dir(timeline_dir)?;
57 :
58 0 : Ok(FileStorage {
59 0 : timeline_dir: timeline_dir.to_path_buf(),
60 0 : no_sync,
61 0 : state,
62 0 : last_persist_at: Instant::now(),
63 0 : })
64 0 : }
65 :
66 : /// Create and reliably persist new control file at given location.
67 : ///
68 : /// Note: we normally call this in temp directory for atomic init, so
69 : /// interested in FileStorage as a result only in tests.
70 2 : pub async fn create_new(
71 2 : timeline_dir: &Utf8Path,
72 2 : state: TimelinePersistentState,
73 2 : no_sync: bool,
74 2 : ) -> Result<FileStorage> {
75 2 : // we don't support creating new timelines in offloaded state
76 2 : assert!(matches!(state.eviction_state, EvictionState::Present));
77 :
78 2 : let mut store = FileStorage {
79 2 : timeline_dir: timeline_dir.to_path_buf(),
80 2 : no_sync,
81 2 : state: state.clone(),
82 2 : last_persist_at: Instant::now(),
83 2 : };
84 5 : store.persist(&state).await?;
85 2 : Ok(store)
86 2 : }
87 :
88 : /// Check the magic/version in the on-disk data and deserialize it, if possible.
89 1 : fn deser_sk_state(buf: &mut &[u8]) -> Result<TimelinePersistentState> {
90 : // Read the version independent part
91 1 : let magic = ReadBytesExt::read_u32::<LittleEndian>(buf)?;
92 1 : if magic != SK_MAGIC {
93 0 : bail!(
94 0 : "bad control file magic: {:X}, expected {:X}",
95 0 : magic,
96 0 : SK_MAGIC
97 0 : );
98 1 : }
99 1 : let version = ReadBytesExt::read_u32::<LittleEndian>(buf)?;
100 1 : if version == SK_FORMAT_VERSION {
101 0 : let res = TimelinePersistentState::des(buf)?;
102 0 : return Ok(res);
103 1 : }
104 1 : // try to upgrade
105 1 : upgrade_control_file(buf, version)
106 1 : }
107 :
108 : /// Load control file from given directory.
109 2 : fn load_control_file_from_dir(timeline_dir: &Utf8Path) -> Result<TimelinePersistentState> {
110 2 : let path = timeline_dir.join(CONTROL_FILE_NAME);
111 2 : Self::load_control_file(path)
112 2 : }
113 :
114 : /// Read in the control file.
115 2 : pub fn load_control_file<P: AsRef<Path>>(
116 2 : control_file_path: P,
117 2 : ) -> Result<TimelinePersistentState> {
118 2 : let mut control_file = std::fs::OpenOptions::new()
119 2 : .read(true)
120 2 : .write(true)
121 2 : .open(&control_file_path)
122 2 : .with_context(|| {
123 0 : format!(
124 0 : "failed to open control file at {}",
125 0 : control_file_path.as_ref().display(),
126 0 : )
127 2 : })?;
128 :
129 2 : let mut buf = Vec::new();
130 2 : control_file
131 2 : .read_to_end(&mut buf)
132 2 : .context("failed to read control file")?;
133 :
134 2 : let calculated_checksum = crc32c::crc32c(&buf[..buf.len() - CHECKSUM_SIZE]);
135 :
136 2 : let expected_checksum_bytes: &[u8; CHECKSUM_SIZE] =
137 2 : buf[buf.len() - CHECKSUM_SIZE..].try_into()?;
138 2 : let expected_checksum = u32::from_le_bytes(*expected_checksum_bytes);
139 2 :
140 2 : ensure!(
141 2 : calculated_checksum == expected_checksum,
142 1 : format!(
143 1 : "safekeeper control file checksum mismatch: expected {} got {}",
144 1 : expected_checksum, calculated_checksum
145 1 : )
146 : );
147 :
148 1 : let state = FileStorage::deser_sk_state(&mut &buf[..buf.len() - CHECKSUM_SIZE])
149 1 : .with_context(|| {
150 0 : format!(
151 0 : "while reading control file {}",
152 0 : control_file_path.as_ref().display(),
153 0 : )
154 1 : })?;
155 1 : Ok(state)
156 2 : }
157 : }
158 :
159 : impl Deref for FileStorage {
160 : type Target = TimelinePersistentState;
161 :
162 0 : fn deref(&self) -> &Self::Target {
163 0 : &self.state
164 0 : }
165 : }
166 :
167 : impl TimelinePersistentState {
168 4 : pub(crate) fn write_to_buf(&self) -> Result<Vec<u8>> {
169 4 : let mut buf: Vec<u8> = Vec::new();
170 4 : WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_MAGIC)?;
171 :
172 4 : if self.eviction_state == EvictionState::Present {
173 4 : // temp hack for forward compatibility
174 4 : const PREV_FORMAT_VERSION: u32 = 8;
175 4 : let prev = downgrade_v9_to_v8(self);
176 4 : WriteBytesExt::write_u32::<LittleEndian>(&mut buf, PREV_FORMAT_VERSION)?;
177 4 : prev.ser_into(&mut buf)?;
178 : } else {
179 : // otherwise, we write the current format version
180 0 : WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_FORMAT_VERSION)?;
181 0 : self.ser_into(&mut buf)?;
182 : }
183 :
184 : // calculate checksum before resize
185 4 : let checksum = crc32c::crc32c(&buf);
186 4 : buf.extend_from_slice(&checksum.to_le_bytes());
187 4 : Ok(buf)
188 4 : }
189 : }
190 :
191 : impl Storage for FileStorage {
192 : /// Persists state durably to the underlying storage.
193 4 : async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> {
194 4 : let _timer = PERSIST_CONTROL_FILE_SECONDS.start_timer();
195 4 :
196 4 : // write data to safekeeper.control.partial
197 4 : let control_partial_path = self.timeline_dir.join(CONTROL_FILE_NAME_PARTIAL);
198 4 : let mut control_partial = File::create(&control_partial_path).await.with_context(|| {
199 0 : format!(
200 0 : "failed to create partial control file at: {}",
201 0 : &control_partial_path
202 0 : )
203 4 : })?;
204 :
205 4 : let buf: Vec<u8> = s.write_to_buf()?;
206 :
207 4 : control_partial.write_all(&buf).await.with_context(|| {
208 0 : format!(
209 0 : "failed to write safekeeper state into control file at: {}",
210 0 : control_partial_path
211 0 : )
212 4 : })?;
213 4 : control_partial.flush().await.with_context(|| {
214 0 : format!(
215 0 : "failed to flush safekeeper state into control file at: {}",
216 0 : control_partial_path
217 0 : )
218 4 : })?;
219 :
220 4 : let control_path = self.timeline_dir.join(CONTROL_FILE_NAME);
221 4 : durable_rename(&control_partial_path, &control_path, !self.no_sync).await?;
222 :
223 : // update internal state
224 4 : self.state = s.clone();
225 4 : Ok(())
226 4 : }
227 :
228 0 : fn last_persist_at(&self) -> Instant {
229 0 : self.last_persist_at
230 0 : }
231 : }
232 :
#[cfg(test)]
mod test {
    use super::*;
    use tokio::fs;
    use utils::lsn::Lsn;

    const NO_SYNC: bool = true;

    /// Creates a control file for an empty state in a fresh temp directory,
    /// then persists a modified state (`commit_lsn` = 42) on top of it.
    /// Returns the directory guard together with the state last persisted.
    async fn persist_modified_state(
    ) -> anyhow::Result<(camino_tempfile::Utf8TempDir, TimelinePersistentState)> {
        let dir = camino_tempfile::tempdir()?;
        let initial = TimelinePersistentState::empty();
        let mut storage = FileStorage::create_new(dir.path(), initial.clone(), NO_SYNC).await?;

        // Make a change.
        let mut modified = initial;
        modified.commit_lsn = Lsn(42);
        storage.persist(&modified).await?;

        Ok((dir, modified))
    }

    /// A persisted state must read back unchanged.
    #[tokio::test]
    async fn test_read_write_safekeeper_state() -> anyhow::Result<()> {
        let (dir, persisted) = persist_modified_state().await?;

        // Reload the state. It should match the previously persisted state.
        let reloaded = FileStorage::load_control_file_from_dir(dir.path())?;
        assert_eq!(reloaded, persisted);
        Ok(())
    }

    /// Corrupting the file contents must be reported as a checksum mismatch.
    #[tokio::test]
    async fn test_safekeeper_state_checksum_mismatch() -> anyhow::Result<()> {
        let (dir, _persisted) = persist_modified_state().await?;

        // Change the first byte to fail checksum validation.
        let ctrl_path = dir.path().join(CONTROL_FILE_NAME);
        let mut bytes = fs::read(&ctrl_path).await?;
        bytes[0] += 1;
        fs::write(&ctrl_path, &bytes).await?;

        // Loading the file should fail checksum validation.
        match FileStorage::load_control_file_from_dir(dir.path()) {
            Err(err) => assert!(err.to_string().contains("control file checksum mismatch")),
            Ok(_) => panic!("expected checksum error"),
        }
        Ok(())
    }
}
|