Line data Source code
1 : //! Control file serialization, deserialization and persistence.
2 :
3 : use anyhow::{bail, ensure, Context, Result};
4 : use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
5 : use camino::{Utf8Path, Utf8PathBuf};
6 : use tokio::fs::File;
7 : use tokio::io::AsyncWriteExt;
8 : use utils::crashsafe::durable_rename;
9 :
10 : use std::io::Read;
11 : use std::ops::Deref;
12 : use std::path::Path;
13 : use std::time::Instant;
14 :
15 : use crate::control_file_upgrade::downgrade_v9_to_v8;
16 : use crate::metrics::PERSIST_CONTROL_FILE_SECONDS;
17 : use crate::state::{EvictionState, TimelinePersistentState};
18 : use crate::{control_file_upgrade::upgrade_control_file, timeline::get_timeline_dir};
19 : use utils::{bin_ser::LeSer, id::TenantTimelineId};
20 :
21 : use crate::SafeKeeperConf;
22 :
/// Magic number written at the very start of the control file and verified on load.
pub const SK_MAGIC: u32 = 0xcafeceefu32;
/// Current on-disk format version. Files with any other version are handed to
/// `upgrade_control_file` when loaded.
pub const SK_FORMAT_VERSION: u32 = 9;

/// Name of the file that contains persistent metadata for the safekeeper timeline.
pub const CONTROL_FILE_NAME: &str = "safekeeper.control";
/// Temporary file that new state is written to first; it is then renamed over
/// `CONTROL_FILE_NAME`, so that the state is updated atomically.
const CONTROL_FILE_NAME_PARTIAL: &str = "safekeeper.control.partial";
/// Size in bytes of the trailing crc32c checksum (a little-endian `u32`)
/// that terminates the control file.
pub const CHECKSUM_SIZE: usize = std::mem::size_of::<u32>();
31 :
/// Persistent storage for a timeline's safekeeper state.
///
/// Implementors keep the actual state inside themselves and expose it through
/// `Deref<Target = TimelinePersistentState>` for field access; the state is
/// replaced only through [`Storage::persist`].
#[async_trait::async_trait]
pub trait Storage: Deref<Target = TimelinePersistentState> {
    /// Persist safekeeper state on disk and, on success, update the internal
    /// (in-memory) copy of the state to match.
    async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()>;

    /// Timestamp of the last persist.
    fn last_persist_at(&self) -> Instant;
}
42 :
/// [`Storage`] implementation backed by a control file (`CONTROL_FILE_NAME`)
/// located in the timeline directory.
#[derive(Debug)]
pub struct FileStorage {
    // save timeline dir to avoid reconstructing it every time
    timeline_dir: Utf8PathBuf,
    // Copied from `SafeKeeperConf::no_sync`; passed negated to `durable_rename`
    // in `persist` (presumably skipping fsync when set — confirm in utils::crashsafe).
    no_sync: bool,

    /// Last state persisted to disk.
    state: TimelinePersistentState,
    /// Not preserved across restarts.
    last_persist_at: Instant,
}
54 :
55 : impl FileStorage {
56 : /// Initialize storage by loading state from disk.
57 4 : pub fn restore_new(ttid: &TenantTimelineId, conf: &SafeKeeperConf) -> Result<FileStorage> {
58 4 : let timeline_dir = get_timeline_dir(conf, ttid);
59 4 : let state = Self::load_control_file_from_dir(&timeline_dir)?;
60 :
61 2 : Ok(FileStorage {
62 2 : timeline_dir,
63 2 : no_sync: conf.no_sync,
64 2 : state,
65 2 : last_persist_at: Instant::now(),
66 2 : })
67 4 : }
68 :
69 : /// Create file storage for a new timeline, but don't persist it yet.
70 4 : pub fn create_new(
71 4 : timeline_dir: Utf8PathBuf,
72 4 : conf: &SafeKeeperConf,
73 4 : state: TimelinePersistentState,
74 4 : ) -> Result<FileStorage> {
75 4 : let store = FileStorage {
76 4 : timeline_dir,
77 4 : no_sync: conf.no_sync,
78 4 : state,
79 4 : last_persist_at: Instant::now(),
80 4 : };
81 4 :
82 4 : Ok(store)
83 4 : }
84 :
85 : /// Check the magic/version in the on-disk data and deserialize it, if possible.
86 4 : fn deser_sk_state(buf: &mut &[u8]) -> Result<TimelinePersistentState> {
87 : // Read the version independent part
88 4 : let magic = ReadBytesExt::read_u32::<LittleEndian>(buf)?;
89 4 : if magic != SK_MAGIC {
90 0 : bail!(
91 0 : "bad control file magic: {:X}, expected {:X}",
92 0 : magic,
93 0 : SK_MAGIC
94 0 : );
95 4 : }
96 4 : let version = ReadBytesExt::read_u32::<LittleEndian>(buf)?;
97 4 : if version == SK_FORMAT_VERSION {
98 0 : let res = TimelinePersistentState::des(buf)?;
99 0 : return Ok(res);
100 4 : }
101 4 : // try to upgrade
102 4 : upgrade_control_file(buf, version)
103 4 : }
104 :
105 : /// Load control file from given directory.
106 6 : pub fn load_control_file_from_dir(timeline_dir: &Utf8Path) -> Result<TimelinePersistentState> {
107 6 : let path = timeline_dir.join(CONTROL_FILE_NAME);
108 6 : Self::load_control_file(path)
109 6 : }
110 :
111 : /// Read in the control file.
112 6 : pub fn load_control_file<P: AsRef<Path>>(
113 6 : control_file_path: P,
114 6 : ) -> Result<TimelinePersistentState> {
115 6 : let mut control_file = std::fs::OpenOptions::new()
116 6 : .read(true)
117 6 : .write(true)
118 6 : .open(&control_file_path)
119 6 : .with_context(|| {
120 0 : format!(
121 0 : "failed to open control file at {}",
122 0 : control_file_path.as_ref().display(),
123 0 : )
124 6 : })?;
125 :
126 6 : let mut buf = Vec::new();
127 6 : control_file
128 6 : .read_to_end(&mut buf)
129 6 : .context("failed to read control file")?;
130 :
131 6 : let calculated_checksum = crc32c::crc32c(&buf[..buf.len() - CHECKSUM_SIZE]);
132 :
133 6 : let expected_checksum_bytes: &[u8; CHECKSUM_SIZE] =
134 6 : buf[buf.len() - CHECKSUM_SIZE..].try_into()?;
135 6 : let expected_checksum = u32::from_le_bytes(*expected_checksum_bytes);
136 6 :
137 6 : ensure!(
138 6 : calculated_checksum == expected_checksum,
139 2 : format!(
140 2 : "safekeeper control file checksum mismatch: expected {} got {}",
141 2 : expected_checksum, calculated_checksum
142 2 : )
143 : );
144 :
145 4 : let state = FileStorage::deser_sk_state(&mut &buf[..buf.len() - CHECKSUM_SIZE])
146 4 : .with_context(|| {
147 0 : format!(
148 0 : "while reading control file {}",
149 0 : control_file_path.as_ref().display(),
150 0 : )
151 4 : })?;
152 4 : Ok(state)
153 6 : }
154 : }
155 :
156 : impl Deref for FileStorage {
157 : type Target = TimelinePersistentState;
158 :
159 0 : fn deref(&self) -> &Self::Target {
160 0 : &self.state
161 0 : }
162 : }
163 :
164 : #[async_trait::async_trait]
165 : impl Storage for FileStorage {
166 : /// Persists state durably to the underlying storage.
167 : ///
168 : /// For a description, see <https://lwn.net/Articles/457667/>.
169 4 : async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> {
170 4 : let _timer = PERSIST_CONTROL_FILE_SECONDS.start_timer();
171 4 :
172 4 : // write data to safekeeper.control.partial
173 4 : let control_partial_path = self.timeline_dir.join(CONTROL_FILE_NAME_PARTIAL);
174 4 : let mut control_partial = File::create(&control_partial_path).await.with_context(|| {
175 0 : format!(
176 0 : "failed to create partial control file at: {}",
177 0 : &control_partial_path
178 0 : )
179 4 : })?;
180 4 : let mut buf: Vec<u8> = Vec::new();
181 4 : WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_MAGIC)?;
182 4 :
183 4 : if s.eviction_state == EvictionState::Present {
184 4 : // temp hack for forward compatibility
185 4 : const PREV_FORMAT_VERSION: u32 = 8;
186 4 : let prev = downgrade_v9_to_v8(s);
187 4 : WriteBytesExt::write_u32::<LittleEndian>(&mut buf, PREV_FORMAT_VERSION)?;
188 4 : prev.ser_into(&mut buf)?;
189 4 : } else {
190 4 : // otherwise, we write the current format version
191 4 : WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_FORMAT_VERSION)?;
192 4 : s.ser_into(&mut buf)?;
193 4 : }
194 4 :
195 4 : // calculate checksum before resize
196 4 : let checksum = crc32c::crc32c(&buf);
197 4 : buf.extend_from_slice(&checksum.to_le_bytes());
198 4 :
199 4 : control_partial.write_all(&buf).await.with_context(|| {
200 0 : format!(
201 0 : "failed to write safekeeper state into control file at: {}",
202 0 : control_partial_path
203 0 : )
204 4 : })?;
205 4 : control_partial.flush().await.with_context(|| {
206 0 : format!(
207 0 : "failed to flush safekeeper state into control file at: {}",
208 0 : control_partial_path
209 0 : )
210 4 : })?;
211 4 :
212 4 : let control_path = self.timeline_dir.join(CONTROL_FILE_NAME);
213 28 : durable_rename(&control_partial_path, &control_path, !self.no_sync).await?;
214 4 :
215 4 : // update internal state
216 4 : self.state = s.clone();
217 4 : Ok(())
218 4 : }
219 :
220 0 : fn last_persist_at(&self) -> Instant {
221 0 : self.last_persist_at
222 0 : }
223 : }
224 :
#[cfg(test)]
mod test {
    use super::*;
    use tokio::fs;
    use utils::lsn::Lsn;

    /// Safekeeper config whose workdir is a fresh temporary directory.
    fn stub_conf() -> SafeKeeperConf {
        let workdir = camino_tempfile::tempdir().unwrap().into_path();
        SafeKeeperConf {
            workdir,
            ..SafeKeeperConf::dummy()
        }
    }

    /// Load the timeline both via `restore_new` and directly from its control file.
    async fn load_from_control_file(
        conf: &SafeKeeperConf,
        ttid: &TenantTimelineId,
    ) -> Result<(FileStorage, TimelinePersistentState)> {
        let timeline_dir = get_timeline_dir(conf, ttid);
        fs::create_dir_all(&timeline_dir)
            .await
            .expect("failed to create timeline dir");
        Ok((
            FileStorage::restore_new(ttid, conf)?,
            FileStorage::load_control_file_from_dir(&timeline_dir)?,
        ))
    }

    /// Create an (unpersisted) storage with an empty state for the timeline.
    async fn create(
        conf: &SafeKeeperConf,
        ttid: &TenantTimelineId,
    ) -> Result<(FileStorage, TimelinePersistentState)> {
        let timeline_dir = get_timeline_dir(conf, ttid);
        fs::create_dir_all(&timeline_dir)
            .await
            .expect("failed to create timeline dir");
        let state = TimelinePersistentState::empty();
        let storage = FileStorage::create_new(timeline_dir, conf, state.clone())?;
        Ok((storage, state))
    }

    #[tokio::test]
    async fn test_read_write_safekeeper_state() {
        let conf = stub_conf();
        let ttid = TenantTimelineId::generate();
        {
            let (mut storage, mut state) =
                create(&conf, &ttid).await.expect("failed to create state");
            // change something
            state.commit_lsn = Lsn(42);
            storage
                .persist(&state)
                .await
                .expect("failed to persist state");
        }

        let (_, state) = load_from_control_file(&conf, &ttid)
            .await
            .expect("failed to read state");
        assert_eq!(state.commit_lsn, Lsn(42));
    }

    #[tokio::test]
    async fn test_safekeeper_state_checksum_mismatch() {
        let conf = stub_conf();
        let ttid = TenantTimelineId::generate();
        {
            let (mut storage, mut state) =
                create(&conf, &ttid).await.expect("failed to create state");

            // change something
            state.commit_lsn = Lsn(42);
            storage
                .persist(&state)
                .await
                .expect("failed to persist state");
        }
        let control_path = get_timeline_dir(&conf, &ttid).join(CONTROL_FILE_NAME);
        let mut data = fs::read(&control_path).await.unwrap();
        // Flip all bits of the first byte so checksum validation fails; unlike
        // `+= 1` this can never overflow.
        data[0] ^= 0xff;
        fs::write(&control_path, &data)
            .await
            .expect("failed to write control file");

        match load_from_control_file(&conf, &ttid).await {
            Err(err) => assert!(
                err.to_string()
                    .contains("safekeeper control file checksum mismatch"),
                "unexpected error: {:#}",
                err
            ),
            Ok(_) => panic!("expected error"),
        }
    }
}
|