Line data Source code
1 : //! Control file serialization, deserialization and persistence.
2 :
3 : use anyhow::{bail, ensure, Context, Result};
4 : use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
5 : use camino::Utf8PathBuf;
6 : use tokio::fs::File;
7 : use tokio::io::AsyncWriteExt;
8 : use utils::crashsafe::durable_rename;
9 :
10 : use std::io::Read;
11 : use std::ops::Deref;
12 : use std::path::Path;
13 : use std::time::Instant;
14 :
15 : use crate::control_file_upgrade::upgrade_control_file;
16 : use crate::metrics::PERSIST_CONTROL_FILE_SECONDS;
17 : use crate::state::TimelinePersistentState;
18 : use utils::{bin_ser::LeSer, id::TenantTimelineId};
19 :
20 : use crate::SafeKeeperConf;
21 :
22 : use std::convert::TryInto;
23 :
/// Magic number stored in the first four bytes (little-endian) of every control file.
pub const SK_MAGIC: u32 = 0xCAFE_CEEF;
/// On-disk format version written by this code; control files carrying any
/// other version are handed to `upgrade_control_file` when loaded.
pub const SK_FORMAT_VERSION: u32 = 7;

/// Name of the file holding a timeline's persistent safekeeper metadata.
const CONTROL_FILE_NAME: &str = "safekeeper.control";
/// Scratch file the new state is written to first; it is then renamed over
/// `CONTROL_FILE_NAME` so that the update happens atomically.
const CONTROL_FILE_NAME_PARTIAL: &str = "safekeeper.control.partial";
/// Length in bytes of the CRC32C checksum stored at the end of the control file.
pub const CHECKSUM_SIZE: usize = std::mem::size_of::<u32>();
32 :
33 : /// Storage should keep actual state inside of it. It should implement Deref
34 : /// trait to access state fields and have persist method for updating that state.
35 : #[async_trait::async_trait]
36 : pub trait Storage: Deref<Target = TimelinePersistentState> {
37 : /// Persist safekeeper state on disk and update internal state.
38 : async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()>;
39 :
40 : /// Timestamp of last persist.
41 : fn last_persist_at(&self) -> Instant;
42 : }
43 :
44 0 : #[derive(Debug)]
45 : pub struct FileStorage {
46 : // save timeline dir to avoid reconstructing it every time
47 : timeline_dir: Utf8PathBuf,
48 : conf: SafeKeeperConf,
49 :
50 : /// Last state persisted to disk.
51 : state: TimelinePersistentState,
52 : /// Not preserved across restarts.
53 : last_persist_at: Instant,
54 : }
55 :
56 : impl FileStorage {
57 : /// Initialize storage by loading state from disk.
58 137 : pub fn restore_new(ttid: &TenantTimelineId, conf: &SafeKeeperConf) -> Result<FileStorage> {
59 137 : let timeline_dir = conf.timeline_dir(ttid);
60 :
61 137 : let state = Self::load_control_file_conf(conf, ttid)?;
62 :
63 135 : Ok(FileStorage {
64 135 : timeline_dir,
65 135 : conf: conf.clone(),
66 135 : state,
67 135 : last_persist_at: Instant::now(),
68 135 : })
69 137 : }
70 :
71 : /// Create file storage for a new timeline, but don't persist it yet.
72 531 : pub fn create_new(
73 531 : timeline_dir: Utf8PathBuf,
74 531 : conf: &SafeKeeperConf,
75 531 : state: TimelinePersistentState,
76 531 : ) -> Result<FileStorage> {
77 531 : let store = FileStorage {
78 531 : timeline_dir,
79 531 : conf: conf.clone(),
80 531 : state,
81 531 : last_persist_at: Instant::now(),
82 531 : };
83 531 :
84 531 : Ok(store)
85 531 : }
86 :
87 : /// Check the magic/version in the on-disk data and deserialize it, if possible.
88 186 : fn deser_sk_state(buf: &mut &[u8]) -> Result<TimelinePersistentState> {
89 : // Read the version independent part
90 186 : let magic = ReadBytesExt::read_u32::<LittleEndian>(buf)?;
91 186 : if magic != SK_MAGIC {
92 0 : bail!(
93 0 : "bad control file magic: {:X}, expected {:X}",
94 0 : magic,
95 0 : SK_MAGIC
96 0 : );
97 186 : }
98 186 : let version = ReadBytesExt::read_u32::<LittleEndian>(buf)?;
99 186 : if version == SK_FORMAT_VERSION {
100 186 : let res = TimelinePersistentState::des(buf)?;
101 186 : return Ok(res);
102 0 : }
103 0 : // try to upgrade
104 0 : upgrade_control_file(buf, version)
105 186 : }
106 :
107 : /// Load control file for given ttid at path specified by conf.
108 139 : pub fn load_control_file_conf(
109 139 : conf: &SafeKeeperConf,
110 139 : ttid: &TenantTimelineId,
111 139 : ) -> Result<TimelinePersistentState> {
112 139 : let path = conf.timeline_dir(ttid).join(CONTROL_FILE_NAME);
113 139 : Self::load_control_file(path)
114 139 : }
115 :
116 : /// Read in the control file.
117 188 : pub fn load_control_file<P: AsRef<Path>>(
118 188 : control_file_path: P,
119 188 : ) -> Result<TimelinePersistentState> {
120 188 : let mut control_file = std::fs::OpenOptions::new()
121 188 : .read(true)
122 188 : .write(true)
123 188 : .open(&control_file_path)
124 188 : .with_context(|| {
125 0 : format!(
126 0 : "failed to open control file at {}",
127 0 : control_file_path.as_ref().display(),
128 0 : )
129 188 : })?;
130 :
131 188 : let mut buf = Vec::new();
132 188 : control_file
133 188 : .read_to_end(&mut buf)
134 188 : .context("failed to read control file")?;
135 :
136 188 : let calculated_checksum = crc32c::crc32c(&buf[..buf.len() - CHECKSUM_SIZE]);
137 :
138 188 : let expected_checksum_bytes: &[u8; CHECKSUM_SIZE] =
139 188 : buf[buf.len() - CHECKSUM_SIZE..].try_into()?;
140 188 : let expected_checksum = u32::from_le_bytes(*expected_checksum_bytes);
141 188 :
142 188 : ensure!(
143 188 : calculated_checksum == expected_checksum,
144 2 : format!(
145 2 : "safekeeper control file checksum mismatch: expected {} got {}",
146 2 : expected_checksum, calculated_checksum
147 2 : )
148 : );
149 :
150 186 : let state = FileStorage::deser_sk_state(&mut &buf[..buf.len() - CHECKSUM_SIZE])
151 186 : .with_context(|| {
152 0 : format!(
153 0 : "while reading control file {}",
154 0 : control_file_path.as_ref().display(),
155 0 : )
156 186 : })?;
157 186 : Ok(state)
158 188 : }
159 : }
160 :
161 : impl Deref for FileStorage {
162 : type Target = TimelinePersistentState;
163 :
164 29927347 : fn deref(&self) -> &Self::Target {
165 29927347 : &self.state
166 29927347 : }
167 : }
168 :
169 : #[async_trait::async_trait]
170 : impl Storage for FileStorage {
171 : /// Persists state durably to the underlying storage.
172 : ///
173 : /// For a description, see <https://lwn.net/Articles/457667/>.
174 4997 : async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> {
175 4997 : let _timer = PERSIST_CONTROL_FILE_SECONDS.start_timer();
176 4997 :
177 4997 : // write data to safekeeper.control.partial
178 4997 : let control_partial_path = self.timeline_dir.join(CONTROL_FILE_NAME_PARTIAL);
179 4997 : let mut control_partial = File::create(&control_partial_path).await.with_context(|| {
180 0 : format!(
181 0 : "failed to create partial control file at: {}",
182 0 : &control_partial_path
183 0 : )
184 4997 : })?;
185 4997 : let mut buf: Vec<u8> = Vec::new();
186 4997 : WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_MAGIC)?;
187 4997 : WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_FORMAT_VERSION)?;
188 4997 : s.ser_into(&mut buf)?;
189 :
190 : // calculate checksum before resize
191 4997 : let checksum = crc32c::crc32c(&buf);
192 4997 : buf.extend_from_slice(&checksum.to_le_bytes());
193 4997 :
194 4997 : control_partial.write_all(&buf).await.with_context(|| {
195 0 : format!(
196 0 : "failed to write safekeeper state into control file at: {}",
197 0 : control_partial_path
198 0 : )
199 4997 : })?;
200 4997 : control_partial.flush().await.with_context(|| {
201 0 : format!(
202 0 : "failed to flush safekeeper state into control file at: {}",
203 0 : control_partial_path
204 0 : )
205 4997 : })?;
206 :
207 4997 : let control_path = self.timeline_dir.join(CONTROL_FILE_NAME);
208 5060 : durable_rename(&control_partial_path, &control_path, !self.conf.no_sync).await?;
209 :
210 : // update internal state
211 4997 : self.state = s.clone();
212 4997 : Ok(())
213 14991 : }
214 :
215 1737 : fn last_persist_at(&self) -> Instant {
216 1737 : self.last_persist_at
217 1737 : }
218 : }
219 :
#[cfg(test)]
mod test {
    use super::FileStorage;
    use super::*;
    use crate::SafeKeeperConf;
    use anyhow::Result;
    use tokio::fs;
    use utils::{id::TenantTimelineId, lsn::Lsn};

    /// Dummy safekeeper config whose workdir is a fresh temporary directory.
    fn stub_conf() -> SafeKeeperConf {
        SafeKeeperConf {
            workdir: camino_tempfile::tempdir().unwrap().into_path(),
            ..SafeKeeperConf::dummy()
        }
    }

    /// Make sure the timeline directory exists, then load the control file twice:
    /// through `restore_new` and directly through `load_control_file_conf`.
    async fn load_from_control_file(
        conf: &SafeKeeperConf,
        ttid: &TenantTimelineId,
    ) -> Result<(FileStorage, TimelinePersistentState)> {
        fs::create_dir_all(conf.timeline_dir(ttid))
            .await
            .expect("failed to create timeline dir");
        let restored_storage = FileStorage::restore_new(ttid, conf)?;
        let loaded_state = FileStorage::load_control_file_conf(conf, ttid)?;
        Ok((restored_storage, loaded_state))
    }

    /// Create the timeline directory and an in-memory storage holding an empty state.
    async fn create(
        conf: &SafeKeeperConf,
        ttid: &TenantTimelineId,
    ) -> Result<(FileStorage, TimelinePersistentState)> {
        let timeline_dir = conf.timeline_dir(ttid);
        fs::create_dir_all(&timeline_dir)
            .await
            .expect("failed to create timeline dir");
        let empty_state = TimelinePersistentState::empty();
        let storage = FileStorage::create_new(timeline_dir, conf, empty_state.clone())?;
        Ok((storage, empty_state))
    }

    #[tokio::test]
    async fn test_read_write_safekeeper_state() {
        let conf = stub_conf();
        let ttid = TenantTimelineId::generate();

        // Persist a modified state; the storage is dropped at the end of the scope.
        {
            let (mut storage, mut modified) =
                create(&conf, &ttid).await.expect("failed to create state");
            modified.commit_lsn = Lsn(42);
            storage
                .persist(&modified)
                .await
                .expect("failed to persist state");
        }

        let (_, reloaded) = load_from_control_file(&conf, &ttid)
            .await
            .expect("failed to read state");
        assert_eq!(reloaded.commit_lsn, Lsn(42));
    }

    #[tokio::test]
    async fn test_safekeeper_state_checksum_mismatch() {
        let conf = stub_conf();
        let ttid = TenantTimelineId::generate();

        {
            let (mut storage, mut modified) =
                create(&conf, &ttid).await.expect("failed to read state");
            modified.commit_lsn = Lsn(42);
            storage
                .persist(&modified)
                .await
                .expect("failed to persist state");
        }

        // Corrupt the first byte so the stored checksum no longer matches.
        let control_path = conf.timeline_dir(&ttid).join(CONTROL_FILE_NAME);
        let mut bytes = fs::read(&control_path).await.unwrap();
        bytes[0] += 1;
        fs::write(&control_path, &bytes)
            .await
            .expect("failed to write control file");

        let err = match load_from_control_file(&conf, &ttid).await {
            Ok(_) => panic!("expected error"),
            Err(err) => err,
        };
        assert!(err
            .to_string()
            .contains("safekeeper control file checksum mismatch"));
    }
}
|