TLA Line data Source code
1 : //! Control file serialization, deserialization and persistence.
2 :
3 : use anyhow::{bail, ensure, Context, Result};
4 : use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
5 : use camino::Utf8PathBuf;
6 : use tokio::fs::{self, File};
7 : use tokio::io::AsyncWriteExt;
8 :
9 : use std::io::Read;
10 : use std::ops::Deref;
11 : use std::path::Path;
12 : use std::time::Instant;
13 :
14 : use crate::control_file_upgrade::upgrade_control_file;
15 : use crate::metrics::PERSIST_CONTROL_FILE_SECONDS;
16 : use crate::safekeeper::{SafeKeeperState, SK_FORMAT_VERSION, SK_MAGIC};
17 : use utils::{bin_ser::LeSer, id::TenantTimelineId};
18 :
19 : use crate::SafeKeeperConf;
20 :
21 : use std::convert::TryInto;
22 :
/// Name of the file, inside the timeline directory, that holds the persistent
/// safekeeper state (see [`FileStorage`]).
const CONTROL_FILE_NAME: &str = "safekeeper.control";
/// Name the new state is first written under. It is then renamed over
/// [`CONTROL_FILE_NAME`]; the rename is what makes the update atomic.
const CONTROL_FILE_NAME_PARTIAL: &str = "safekeeper.control.partial";
/// Size in bytes of the little-endian crc32c checksum stored at the very end of
/// the control file. It covers all bytes that precede it.
pub const CHECKSUM_SIZE: usize = std::mem::size_of::<u32>();
28 :
29 : /// Storage should keep actual state inside of it. It should implement Deref
30 : /// trait to access state fields and have persist method for updating that state.
31 : #[async_trait::async_trait]
32 : pub trait Storage: Deref<Target = SafeKeeperState> {
33 : /// Persist safekeeper state on disk and update internal state.
34 : async fn persist(&mut self, s: &SafeKeeperState) -> Result<()>;
35 :
36 : /// Timestamp of last persist.
37 : fn last_persist_at(&self) -> Instant;
38 : }
39 :
/// [`Storage`] implementation backed by a checksummed control file
/// (`safekeeper.control`) inside the timeline directory.
#[derive(Debug)]
pub struct FileStorage {
    // Timeline directory, saved to avoid reconstructing it every time
    // (`persist` joins control file names onto it).
    timeline_dir: Utf8PathBuf,
    // Safekeeper config; `persist` reads `no_sync` from it to decide whether to fsync.
    conf: SafeKeeperConf,

    /// Current state: the last one loaded from or persisted to disk. For storage
    /// built with `create_new` it is the initial state, not yet written.
    state: SafeKeeperState,
    /// Value reported by `Storage::last_persist_at`. Not preserved across restarts.
    last_persist_at: Instant,
}
51 :
52 : impl FileStorage {
53 : /// Initialize storage by loading state from disk.
54 CBC 86 : pub fn restore_new(ttid: &TenantTimelineId, conf: &SafeKeeperConf) -> Result<FileStorage> {
55 86 : let timeline_dir = conf.timeline_dir(ttid);
56 :
57 86 : let state = Self::load_control_file_conf(conf, ttid)?;
58 :
59 85 : Ok(FileStorage {
60 85 : timeline_dir,
61 85 : conf: conf.clone(),
62 85 : state,
63 85 : last_persist_at: Instant::now(),
64 85 : })
65 86 : }
66 :
67 : /// Create file storage for a new timeline, but don't persist it yet.
68 483 : pub fn create_new(
69 483 : ttid: &TenantTimelineId,
70 483 : conf: &SafeKeeperConf,
71 483 : state: SafeKeeperState,
72 483 : ) -> Result<FileStorage> {
73 483 : let timeline_dir = conf.timeline_dir(ttid);
74 483 :
75 483 : let store = FileStorage {
76 483 : timeline_dir,
77 483 : conf: conf.clone(),
78 483 : state,
79 483 : last_persist_at: Instant::now(),
80 483 : };
81 483 :
82 483 : Ok(store)
83 483 : }
84 :
85 : /// Check the magic/version in the on-disk data and deserialize it, if possible.
86 87 : fn deser_sk_state(buf: &mut &[u8]) -> Result<SafeKeeperState> {
87 : // Read the version independent part
88 87 : let magic = ReadBytesExt::read_u32::<LittleEndian>(buf)?;
89 87 : if magic != SK_MAGIC {
90 UBC 0 : bail!(
91 0 : "bad control file magic: {:X}, expected {:X}",
92 0 : magic,
93 0 : SK_MAGIC
94 0 : );
95 CBC 87 : }
96 87 : let version = ReadBytesExt::read_u32::<LittleEndian>(buf)?;
97 87 : if version == SK_FORMAT_VERSION {
98 87 : let res = SafeKeeperState::des(buf)?;
99 87 : return Ok(res);
100 UBC 0 : }
101 0 : // try to upgrade
102 0 : upgrade_control_file(buf, version)
103 CBC 87 : }
104 :
105 : /// Load control file for given ttid at path specified by conf.
106 87 : pub fn load_control_file_conf(
107 87 : conf: &SafeKeeperConf,
108 87 : ttid: &TenantTimelineId,
109 87 : ) -> Result<SafeKeeperState> {
110 87 : let path = conf.timeline_dir(ttid).join(CONTROL_FILE_NAME);
111 87 : Self::load_control_file(path)
112 87 : }
113 :
114 : /// Read in the control file.
115 88 : pub fn load_control_file<P: AsRef<Path>>(control_file_path: P) -> Result<SafeKeeperState> {
116 88 : let mut control_file = std::fs::OpenOptions::new()
117 88 : .read(true)
118 88 : .write(true)
119 88 : .open(&control_file_path)
120 88 : .with_context(|| {
121 UBC 0 : format!(
122 0 : "failed to open control file at {}",
123 0 : control_file_path.as_ref().display(),
124 0 : )
125 CBC 88 : })?;
126 :
127 88 : let mut buf = Vec::new();
128 88 : control_file
129 88 : .read_to_end(&mut buf)
130 88 : .context("failed to read control file")?;
131 :
132 88 : let calculated_checksum = crc32c::crc32c(&buf[..buf.len() - CHECKSUM_SIZE]);
133 :
134 88 : let expected_checksum_bytes: &[u8; CHECKSUM_SIZE] =
135 88 : buf[buf.len() - CHECKSUM_SIZE..].try_into()?;
136 88 : let expected_checksum = u32::from_le_bytes(*expected_checksum_bytes);
137 88 :
138 88 : ensure!(
139 88 : calculated_checksum == expected_checksum,
140 1 : format!(
141 1 : "safekeeper control file checksum mismatch: expected {} got {}",
142 1 : expected_checksum, calculated_checksum
143 1 : )
144 : );
145 :
146 87 : let state = FileStorage::deser_sk_state(&mut &buf[..buf.len() - CHECKSUM_SIZE])
147 87 : .with_context(|| {
148 UBC 0 : format!(
149 0 : "while reading control file {}",
150 0 : control_file_path.as_ref().display(),
151 0 : )
152 CBC 87 : })?;
153 87 : Ok(state)
154 88 : }
155 : }
156 :
157 : impl Deref for FileStorage {
158 : type Target = SafeKeeperState;
159 :
160 32917201 : fn deref(&self) -> &Self::Target {
161 32917201 : &self.state
162 32917201 : }
163 : }
164 :
165 : #[async_trait::async_trait]
166 : impl Storage for FileStorage {
167 : /// Persists state durably to the underlying storage.
168 : ///
169 : /// For a description, see <https://lwn.net/Articles/457667/>.
170 5418 : async fn persist(&mut self, s: &SafeKeeperState) -> Result<()> {
171 5418 : let _timer = PERSIST_CONTROL_FILE_SECONDS.start_timer();
172 5418 :
173 5418 : // write data to safekeeper.control.partial
174 5418 : let control_partial_path = self.timeline_dir.join(CONTROL_FILE_NAME_PARTIAL);
175 5463 : let mut control_partial = File::create(&control_partial_path).await.with_context(|| {
176 UBC 0 : format!(
177 0 : "failed to create partial control file at: {}",
178 0 : &control_partial_path
179 0 : )
180 CBC 5418 : })?;
181 5418 : let mut buf: Vec<u8> = Vec::new();
182 5418 : WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_MAGIC)?;
183 5418 : WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_FORMAT_VERSION)?;
184 5418 : s.ser_into(&mut buf)?;
185 :
186 : // calculate checksum before resize
187 5418 : let checksum = crc32c::crc32c(&buf);
188 5418 : buf.extend_from_slice(&checksum.to_le_bytes());
189 5418 :
190 5418 : control_partial.write_all(&buf).await.with_context(|| {
191 UBC 0 : format!(
192 0 : "failed to write safekeeper state into control file at: {}",
193 0 : control_partial_path
194 0 : )
195 CBC 5418 : })?;
196 5418 : control_partial.flush().await.with_context(|| {
197 UBC 0 : format!(
198 0 : "failed to flush safekeeper state into control file at: {}",
199 0 : control_partial_path
200 0 : )
201 CBC 5418 : })?;
202 :
203 : // fsync the file
204 5418 : if !self.conf.no_sync {
205 2 : control_partial.sync_all().await.with_context(|| {
206 UBC 0 : format!(
207 0 : "failed to sync partial control file at {}",
208 0 : control_partial_path
209 0 : )
210 CBC 2 : })?;
211 5416 : }
212 :
213 5418 : let control_path = self.timeline_dir.join(CONTROL_FILE_NAME);
214 5418 :
215 5418 : // rename should be atomic
216 5418 : fs::rename(&control_partial_path, &control_path).await?;
217 : // this sync is not required by any standard but postgres does this (see durable_rename)
218 5418 : if !self.conf.no_sync {
219 2 : let new_f = File::open(&control_path).await?;
220 2 : new_f
221 2 : .sync_all()
222 2 : .await
223 2 : .with_context(|| format!("failed to sync control file at: {}", &control_path))?;
224 :
225 : // fsync the directory (linux specific)
226 2 : let tli_dir = File::open(&self.timeline_dir).await?;
227 2 : tli_dir
228 2 : .sync_all()
229 2 : .await
230 2 : .context("failed to sync control file directory")?;
231 5416 : }
232 :
233 : // update internal state
234 5418 : self.state = s.clone();
235 5418 : Ok(())
236 10836 : }
237 :
238 1363 : fn last_persist_at(&self) -> Instant {
239 1363 : self.last_persist_at
240 1363 : }
241 : }
242 :
#[cfg(test)]
mod test {
    use super::FileStorage;
    use super::*;
    use crate::{safekeeper::SafeKeeperState, SafeKeeperConf};
    use anyhow::Result;
    use utils::{id::TenantTimelineId, lsn::Lsn};

    /// Dummy config whose workdir is a fresh temporary directory.
    ///
    /// `into_path` keeps the directory on disk after the `TempDir` guard is gone;
    /// these tests do not remove it.
    fn stub_conf() -> SafeKeeperConf {
        let workdir = camino_tempfile::tempdir().unwrap().into_path();
        SafeKeeperConf {
            workdir,
            ..SafeKeeperConf::dummy()
        }
    }

    /// Ensure the timeline dir exists, then load the control file twice: once via
    /// `restore_new` and once via `load_control_file_conf`.
    async fn load_from_control_file(
        conf: &SafeKeeperConf,
        ttid: &TenantTimelineId,
    ) -> Result<(FileStorage, SafeKeeperState)> {
        fs::create_dir_all(conf.timeline_dir(ttid))
            .await
            .expect("failed to create timeline dir");
        Ok((
            FileStorage::restore_new(ttid, conf)?,
            FileStorage::load_control_file_conf(conf, ttid)?,
        ))
    }

    /// Ensure the timeline dir exists and build an unpersisted storage holding an
    /// empty state. Returns the storage and a copy of that state.
    async fn create(
        conf: &SafeKeeperConf,
        ttid: &TenantTimelineId,
    ) -> Result<(FileStorage, SafeKeeperState)> {
        fs::create_dir_all(conf.timeline_dir(ttid))
            .await
            .expect("failed to create timeline dir");
        let state = SafeKeeperState::empty();
        let storage = FileStorage::create_new(ttid, conf, state.clone())?;
        Ok((storage, state))
    }

    #[tokio::test]
    async fn test_read_write_safekeeper_state() {
        let conf = stub_conf();
        let ttid = TenantTimelineId::generate();
        {
            let (mut storage, mut state) =
                create(&conf, &ttid).await.expect("failed to create state");
            // change something
            state.commit_lsn = Lsn(42);
            storage
                .persist(&state)
                .await
                .expect("failed to persist state");
        }

        let (_, state) = load_from_control_file(&conf, &ttid)
            .await
            .expect("failed to read state");
        assert_eq!(state.commit_lsn, Lsn(42));
    }

    #[tokio::test]
    async fn test_safekeeper_state_checksum_mismatch() {
        let conf = stub_conf();
        let ttid = TenantTimelineId::generate();
        {
            let (mut storage, mut state) =
                create(&conf, &ttid).await.expect("failed to create state");

            // change something
            state.commit_lsn = Lsn(42);
            storage
                .persist(&state)
                .await
                .expect("failed to persist state");
        }
        let control_path = conf.timeline_dir(&ttid).join(CONTROL_FILE_NAME);
        let mut data = fs::read(&control_path).await.unwrap();
        // Change the first byte of the file to fail checksum validation; wrapping
        // so the test itself cannot overflow-panic whatever that byte is.
        data[0] = data[0].wrapping_add(1);
        fs::write(&control_path, &data)
            .await
            .expect("failed to write control file");

        let err = load_from_control_file(&conf, &ttid)
            .await
            .expect_err("expected checksum mismatch error");
        assert!(
            err.to_string()
                .contains("safekeeper control file checksum mismatch"),
            "unexpected error: {:#}",
            err
        );
    }
}
|