Line data Source code
1 : //! Control file serialization, deserialization and persistence.
2 :
3 : use anyhow::{bail, ensure, Context, Result};
4 : use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
5 : use camino::{Utf8Path, Utf8PathBuf};
6 : use tokio::fs::File;
7 : use tokio::io::AsyncWriteExt;
8 : use utils::crashsafe::durable_rename;
9 :
10 : use std::future::Future;
11 : use std::io::Read;
12 : use std::ops::Deref;
13 : use std::path::Path;
14 : use std::time::Instant;
15 :
16 : use crate::control_file_upgrade::downgrade_v9_to_v8;
17 : use crate::metrics::PERSIST_CONTROL_FILE_SECONDS;
18 : use crate::state::{EvictionState, TimelinePersistentState};
19 : use crate::{control_file_upgrade::upgrade_control_file, timeline::get_timeline_dir};
20 : use utils::{bin_ser::LeSer, id::TenantTimelineId};
21 :
22 : use crate::SafeKeeperConf;
23 :
/// Magic number stored in the first four bytes (little-endian) of every control file.
pub const SK_MAGIC: u32 = 0xCAFE_CEEF;
/// Current on-disk format version of the control file.
pub const SK_FORMAT_VERSION: u32 = 9;

/// File holding the persistent per-timeline metadata of the safekeeper.
pub const CONTROL_FILE_NAME: &str = "safekeeper.control";
/// Staging file: new state is written here first and then renamed over
/// `CONTROL_FILE_NAME`, so the control file is replaced atomically.
const CONTROL_FILE_NAME_PARTIAL: &str = "safekeeper.control.partial";
/// Length in bytes of the crc32c checksum trailing the control file contents.
pub const CHECKSUM_SIZE: usize = std::mem::size_of::<u32>();
32 :
33 : /// Storage should keep actual state inside of it. It should implement Deref
34 : /// trait to access state fields and have persist method for updating that state.
35 : pub trait Storage: Deref<Target = TimelinePersistentState> {
36 : /// Persist safekeeper state on disk and update internal state.
37 : fn persist(&mut self, s: &TimelinePersistentState) -> impl Future<Output = Result<()>> + Send;
38 :
39 : /// Timestamp of last persist.
40 : fn last_persist_at(&self) -> Instant;
41 : }
42 :
43 : #[derive(Debug)]
44 : pub struct FileStorage {
45 : // save timeline dir to avoid reconstructing it every time
46 : timeline_dir: Utf8PathBuf,
47 : no_sync: bool,
48 :
49 : /// Last state persisted to disk.
50 : state: TimelinePersistentState,
51 : /// Not preserved across restarts.
52 : last_persist_at: Instant,
53 : }
54 :
55 : impl FileStorage {
56 : /// Initialize storage by loading state from disk.
57 2 : pub fn restore_new(ttid: &TenantTimelineId, conf: &SafeKeeperConf) -> Result<FileStorage> {
58 2 : let timeline_dir = get_timeline_dir(conf, ttid);
59 2 : let state = Self::load_control_file_from_dir(&timeline_dir)?;
60 :
61 1 : Ok(FileStorage {
62 1 : timeline_dir,
63 1 : no_sync: conf.no_sync,
64 1 : state,
65 1 : last_persist_at: Instant::now(),
66 1 : })
67 2 : }
68 :
69 : /// Create and reliably persist new control file at given location.
70 : ///
71 : /// Note: we normally call this in temp directory for atomic init, so
72 : /// interested in FileStorage as a result only in tests.
73 2 : pub async fn create_new(
74 2 : dir: Utf8PathBuf,
75 2 : conf: &SafeKeeperConf,
76 2 : state: TimelinePersistentState,
77 2 : ) -> Result<FileStorage> {
78 2 : // we don't support creating new timelines in offloaded state
79 2 : assert!(matches!(state.eviction_state, EvictionState::Present));
80 :
81 2 : let mut store = FileStorage {
82 2 : timeline_dir: dir,
83 2 : no_sync: conf.no_sync,
84 2 : state: state.clone(),
85 2 : last_persist_at: Instant::now(),
86 2 : };
87 18 : store.persist(&state).await?;
88 2 : Ok(store)
89 2 : }
90 :
91 : /// Check the magic/version in the on-disk data and deserialize it, if possible.
92 2 : fn deser_sk_state(buf: &mut &[u8]) -> Result<TimelinePersistentState> {
93 : // Read the version independent part
94 2 : let magic = ReadBytesExt::read_u32::<LittleEndian>(buf)?;
95 2 : if magic != SK_MAGIC {
96 0 : bail!(
97 0 : "bad control file magic: {:X}, expected {:X}",
98 0 : magic,
99 0 : SK_MAGIC
100 0 : );
101 2 : }
102 2 : let version = ReadBytesExt::read_u32::<LittleEndian>(buf)?;
103 2 : if version == SK_FORMAT_VERSION {
104 0 : let res = TimelinePersistentState::des(buf)?;
105 0 : return Ok(res);
106 2 : }
107 2 : // try to upgrade
108 2 : upgrade_control_file(buf, version)
109 2 : }
110 :
111 : /// Load control file from given directory.
112 3 : fn load_control_file_from_dir(timeline_dir: &Utf8Path) -> Result<TimelinePersistentState> {
113 3 : let path = timeline_dir.join(CONTROL_FILE_NAME);
114 3 : Self::load_control_file(path)
115 3 : }
116 :
117 : /// Read in the control file.
118 3 : pub fn load_control_file<P: AsRef<Path>>(
119 3 : control_file_path: P,
120 3 : ) -> Result<TimelinePersistentState> {
121 3 : let mut control_file = std::fs::OpenOptions::new()
122 3 : .read(true)
123 3 : .write(true)
124 3 : .open(&control_file_path)
125 3 : .with_context(|| {
126 0 : format!(
127 0 : "failed to open control file at {}",
128 0 : control_file_path.as_ref().display(),
129 0 : )
130 3 : })?;
131 :
132 3 : let mut buf = Vec::new();
133 3 : control_file
134 3 : .read_to_end(&mut buf)
135 3 : .context("failed to read control file")?;
136 :
137 3 : let calculated_checksum = crc32c::crc32c(&buf[..buf.len() - CHECKSUM_SIZE]);
138 :
139 3 : let expected_checksum_bytes: &[u8; CHECKSUM_SIZE] =
140 3 : buf[buf.len() - CHECKSUM_SIZE..].try_into()?;
141 3 : let expected_checksum = u32::from_le_bytes(*expected_checksum_bytes);
142 3 :
143 3 : ensure!(
144 3 : calculated_checksum == expected_checksum,
145 1 : format!(
146 1 : "safekeeper control file checksum mismatch: expected {} got {}",
147 1 : expected_checksum, calculated_checksum
148 1 : )
149 : );
150 :
151 2 : let state = FileStorage::deser_sk_state(&mut &buf[..buf.len() - CHECKSUM_SIZE])
152 2 : .with_context(|| {
153 0 : format!(
154 0 : "while reading control file {}",
155 0 : control_file_path.as_ref().display(),
156 0 : )
157 2 : })?;
158 2 : Ok(state)
159 3 : }
160 : }
161 :
162 : impl Deref for FileStorage {
163 : type Target = TimelinePersistentState;
164 :
165 0 : fn deref(&self) -> &Self::Target {
166 0 : &self.state
167 0 : }
168 : }
169 :
170 : impl TimelinePersistentState {
171 4 : pub(crate) fn write_to_buf(&self) -> Result<Vec<u8>> {
172 4 : let mut buf: Vec<u8> = Vec::new();
173 4 : WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_MAGIC)?;
174 :
175 4 : if self.eviction_state == EvictionState::Present {
176 4 : // temp hack for forward compatibility
177 4 : const PREV_FORMAT_VERSION: u32 = 8;
178 4 : let prev = downgrade_v9_to_v8(self);
179 4 : WriteBytesExt::write_u32::<LittleEndian>(&mut buf, PREV_FORMAT_VERSION)?;
180 4 : prev.ser_into(&mut buf)?;
181 : } else {
182 : // otherwise, we write the current format version
183 0 : WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_FORMAT_VERSION)?;
184 0 : self.ser_into(&mut buf)?;
185 : }
186 :
187 : // calculate checksum before resize
188 4 : let checksum = crc32c::crc32c(&buf);
189 4 : buf.extend_from_slice(&checksum.to_le_bytes());
190 4 : Ok(buf)
191 4 : }
192 : }
193 :
194 : impl Storage for FileStorage {
195 : /// Persists state durably to the underlying storage.
196 4 : async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> {
197 4 : let _timer = PERSIST_CONTROL_FILE_SECONDS.start_timer();
198 4 :
199 4 : // write data to safekeeper.control.partial
200 4 : let control_partial_path = self.timeline_dir.join(CONTROL_FILE_NAME_PARTIAL);
201 4 : let mut control_partial = File::create(&control_partial_path).await.with_context(|| {
202 0 : format!(
203 0 : "failed to create partial control file at: {}",
204 0 : &control_partial_path
205 0 : )
206 4 : })?;
207 :
208 4 : let buf: Vec<u8> = s.write_to_buf()?;
209 :
210 4 : control_partial.write_all(&buf).await.with_context(|| {
211 0 : format!(
212 0 : "failed to write safekeeper state into control file at: {}",
213 0 : control_partial_path
214 0 : )
215 4 : })?;
216 4 : control_partial.flush().await.with_context(|| {
217 0 : format!(
218 0 : "failed to flush safekeeper state into control file at: {}",
219 0 : control_partial_path
220 0 : )
221 4 : })?;
222 :
223 4 : let control_path = self.timeline_dir.join(CONTROL_FILE_NAME);
224 28 : durable_rename(&control_partial_path, &control_path, !self.no_sync).await?;
225 :
226 : // update internal state
227 4 : self.state = s.clone();
228 4 : Ok(())
229 4 : }
230 :
231 0 : fn last_persist_at(&self) -> Instant {
232 0 : self.last_persist_at
233 0 : }
234 : }
235 :
#[cfg(test)]
mod test {
    use super::*;
    use tokio::fs;
    use utils::lsn::Lsn;

    /// Dummy config rooted in a fresh temporary directory.
    ///
    /// `into_path()` detaches the directory from its drop guard, so it stays on disk
    /// for as long as the test needs it.
    fn stub_conf() -> SafeKeeperConf {
        let workdir = camino_tempfile::tempdir().unwrap().into_path();
        SafeKeeperConf {
            workdir,
            ..SafeKeeperConf::dummy()
        }
    }

    /// Restore a `FileStorage` for `ttid` and independently re-read its control file.
    async fn load_from_control_file(
        conf: &SafeKeeperConf,
        ttid: &TenantTimelineId,
    ) -> Result<(FileStorage, TimelinePersistentState)> {
        let timeline_dir = get_timeline_dir(conf, ttid);
        fs::create_dir_all(&timeline_dir)
            .await
            .expect("failed to create timeline dir");
        Ok((
            FileStorage::restore_new(ttid, conf)?,
            FileStorage::load_control_file_from_dir(&timeline_dir)?,
        ))
    }

    /// Create a timeline dir for `ttid` and persist an empty state into it.
    async fn create(
        conf: &SafeKeeperConf,
        ttid: &TenantTimelineId,
    ) -> Result<(FileStorage, TimelinePersistentState)> {
        let timeline_dir = get_timeline_dir(conf, ttid);
        fs::create_dir_all(&timeline_dir)
            .await
            .expect("failed to create timeline dir");
        let state = TimelinePersistentState::empty();
        let storage = FileStorage::create_new(timeline_dir, conf, state.clone()).await?;
        Ok((storage, state))
    }

    #[tokio::test]
    async fn test_read_write_safekeeper_state() {
        let conf = stub_conf();
        let ttid = TenantTimelineId::generate();
        {
            let (mut storage, mut state) =
                create(&conf, &ttid).await.expect("failed to create state");
            // change something
            state.commit_lsn = Lsn(42);
            storage
                .persist(&state)
                .await
                .expect("failed to persist state");
        }

        let (_, state) = load_from_control_file(&conf, &ttid)
            .await
            .expect("failed to read state");
        assert_eq!(state.commit_lsn, Lsn(42));
    }

    #[tokio::test]
    async fn test_safekeeper_state_checksum_mismatch() {
        let conf = stub_conf();
        let ttid = TenantTimelineId::generate();
        {
            let (mut storage, mut state) =
                create(&conf, &ttid).await.expect("failed to create state");

            // change something
            state.commit_lsn = Lsn(42);
            storage
                .persist(&state)
                .await
                .expect("failed to persist state");
        }
        let control_path = get_timeline_dir(&conf, &ttid).join(CONTROL_FILE_NAME);
        let mut data = fs::read(&control_path).await.unwrap();
        // Corrupt the first byte so checksum validation fails; wrapping_add avoids an
        // overflow panic should that byte ever be 0xFF.
        data[0] = data[0].wrapping_add(1);
        fs::write(&control_path, &data)
            .await
            .expect("failed to write control file");

        match load_from_control_file(&conf, &ttid).await {
            Err(err) => assert!(err
                .to_string()
                .contains("safekeeper control file checksum mismatch")),
            Ok(_) => panic!("expected error"),
        }
    }
}
|