TLA Line data Source code
1 : //! Control file serialization, deserialization and persistence.
2 :
3 : use anyhow::{bail, ensure, Context, Result};
4 : use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
5 : use camino::Utf8PathBuf;
6 : use tokio::fs::{self, File};
7 : use tokio::io::AsyncWriteExt;
8 :
9 : use std::io::Read;
10 : use std::ops::Deref;
11 : use std::path::Path;
12 : use std::time::Instant;
13 :
14 : use crate::control_file_upgrade::upgrade_control_file;
15 : use crate::metrics::PERSIST_CONTROL_FILE_SECONDS;
16 : use crate::safekeeper::{SafeKeeperState, SK_FORMAT_VERSION, SK_MAGIC};
17 : use utils::{bin_ser::LeSer, id::TenantTimelineId};
18 :
19 : use crate::SafeKeeperConf;
20 :
21 : use std::convert::TryInto;
22 :
/// Name of the file that holds a timeline's persistent safekeeper metadata.
const CONTROL_FILE_NAME: &str = "safekeeper.control";
/// Temporary name the control file is written under before being renamed
/// over `CONTROL_FILE_NAME`; the rename makes the update atomic.
const CONTROL_FILE_NAME_PARTIAL: &str = "safekeeper.control.partial";
/// Size in bytes of the crc32c checksum appended at the end of the control file.
pub const CHECKSUM_SIZE: usize = core::mem::size_of::<u32>();
28 :
29 : /// Storage should keep actual state inside of it. It should implement Deref
30 : /// trait to access state fields and have persist method for updating that state.
31 : #[async_trait::async_trait]
32 : pub trait Storage: Deref<Target = SafeKeeperState> {
33 : /// Persist safekeeper state on disk and update internal state.
34 : async fn persist(&mut self, s: &SafeKeeperState) -> Result<()>;
35 :
36 : /// Timestamp of last persist.
37 : fn last_persist_at(&self) -> Instant;
38 : }
39 :
40 UBC 0 : #[derive(Debug)]
41 : pub struct FileStorage {
42 : // save timeline dir to avoid reconstructing it every time
43 : timeline_dir: Utf8PathBuf,
44 : conf: SafeKeeperConf,
45 :
46 : /// Last state persisted to disk.
47 : state: SafeKeeperState,
48 : /// Not preserved across restarts.
49 : last_persist_at: Instant,
50 : }
51 :
52 : impl FileStorage {
53 : /// Initialize storage by loading state from disk.
54 CBC 135 : pub fn restore_new(ttid: &TenantTimelineId, conf: &SafeKeeperConf) -> Result<FileStorage> {
55 135 : let timeline_dir = conf.timeline_dir(ttid);
56 :
57 135 : let state = Self::load_control_file_conf(conf, ttid)?;
58 :
59 134 : Ok(FileStorage {
60 134 : timeline_dir,
61 134 : conf: conf.clone(),
62 134 : state,
63 134 : last_persist_at: Instant::now(),
64 134 : })
65 135 : }
66 :
67 : /// Create file storage for a new timeline, but don't persist it yet.
68 499 : pub fn create_new(
69 499 : timeline_dir: Utf8PathBuf,
70 499 : conf: &SafeKeeperConf,
71 499 : state: SafeKeeperState,
72 499 : ) -> Result<FileStorage> {
73 499 : let store = FileStorage {
74 499 : timeline_dir,
75 499 : conf: conf.clone(),
76 499 : state,
77 499 : last_persist_at: Instant::now(),
78 499 : };
79 499 :
80 499 : Ok(store)
81 499 : }
82 :
83 : /// Check the magic/version in the on-disk data and deserialize it, if possible.
84 184 : fn deser_sk_state(buf: &mut &[u8]) -> Result<SafeKeeperState> {
85 : // Read the version independent part
86 184 : let magic = ReadBytesExt::read_u32::<LittleEndian>(buf)?;
87 184 : if magic != SK_MAGIC {
88 UBC 0 : bail!(
89 0 : "bad control file magic: {:X}, expected {:X}",
90 0 : magic,
91 0 : SK_MAGIC
92 0 : );
93 CBC 184 : }
94 184 : let version = ReadBytesExt::read_u32::<LittleEndian>(buf)?;
95 184 : if version == SK_FORMAT_VERSION {
96 184 : let res = SafeKeeperState::des(buf)?;
97 184 : return Ok(res);
98 UBC 0 : }
99 0 : // try to upgrade
100 0 : upgrade_control_file(buf, version)
101 CBC 184 : }
102 :
103 : /// Load control file for given ttid at path specified by conf.
104 136 : pub fn load_control_file_conf(
105 136 : conf: &SafeKeeperConf,
106 136 : ttid: &TenantTimelineId,
107 136 : ) -> Result<SafeKeeperState> {
108 136 : let path = conf.timeline_dir(ttid).join(CONTROL_FILE_NAME);
109 136 : Self::load_control_file(path)
110 136 : }
111 :
112 : /// Read in the control file.
113 185 : pub fn load_control_file<P: AsRef<Path>>(control_file_path: P) -> Result<SafeKeeperState> {
114 185 : let mut control_file = std::fs::OpenOptions::new()
115 185 : .read(true)
116 185 : .write(true)
117 185 : .open(&control_file_path)
118 185 : .with_context(|| {
119 UBC 0 : format!(
120 0 : "failed to open control file at {}",
121 0 : control_file_path.as_ref().display(),
122 0 : )
123 CBC 185 : })?;
124 :
125 185 : let mut buf = Vec::new();
126 185 : control_file
127 185 : .read_to_end(&mut buf)
128 185 : .context("failed to read control file")?;
129 :
130 185 : let calculated_checksum = crc32c::crc32c(&buf[..buf.len() - CHECKSUM_SIZE]);
131 :
132 185 : let expected_checksum_bytes: &[u8; CHECKSUM_SIZE] =
133 185 : buf[buf.len() - CHECKSUM_SIZE..].try_into()?;
134 185 : let expected_checksum = u32::from_le_bytes(*expected_checksum_bytes);
135 185 :
136 185 : ensure!(
137 185 : calculated_checksum == expected_checksum,
138 1 : format!(
139 1 : "safekeeper control file checksum mismatch: expected {} got {}",
140 1 : expected_checksum, calculated_checksum
141 1 : )
142 : );
143 :
144 184 : let state = FileStorage::deser_sk_state(&mut &buf[..buf.len() - CHECKSUM_SIZE])
145 184 : .with_context(|| {
146 UBC 0 : format!(
147 0 : "while reading control file {}",
148 0 : control_file_path.as_ref().display(),
149 0 : )
150 CBC 184 : })?;
151 184 : Ok(state)
152 185 : }
153 : }
154 :
155 : impl Deref for FileStorage {
156 : type Target = SafeKeeperState;
157 :
158 22549672 : fn deref(&self) -> &Self::Target {
159 22549672 : &self.state
160 22549672 : }
161 : }
162 :
163 : #[async_trait::async_trait]
164 : impl Storage for FileStorage {
165 : /// Persists state durably to the underlying storage.
166 : ///
167 : /// For a description, see <https://lwn.net/Articles/457667/>.
168 5149 : async fn persist(&mut self, s: &SafeKeeperState) -> Result<()> {
169 5149 : let _timer = PERSIST_CONTROL_FILE_SECONDS.start_timer();
170 5149 :
171 5149 : // write data to safekeeper.control.partial
172 5149 : let control_partial_path = self.timeline_dir.join(CONTROL_FILE_NAME_PARTIAL);
173 5149 : let mut control_partial = File::create(&control_partial_path).await.with_context(|| {
174 UBC 0 : format!(
175 0 : "failed to create partial control file at: {}",
176 0 : &control_partial_path
177 0 : )
178 CBC 5149 : })?;
179 5149 : let mut buf: Vec<u8> = Vec::new();
180 5149 : WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_MAGIC)?;
181 5149 : WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_FORMAT_VERSION)?;
182 5149 : s.ser_into(&mut buf)?;
183 :
184 : // calculate checksum before resize
185 5149 : let checksum = crc32c::crc32c(&buf);
186 5149 : buf.extend_from_slice(&checksum.to_le_bytes());
187 5149 :
188 5149 : control_partial.write_all(&buf).await.with_context(|| {
189 UBC 0 : format!(
190 0 : "failed to write safekeeper state into control file at: {}",
191 0 : control_partial_path
192 0 : )
193 CBC 5149 : })?;
194 5149 : control_partial.flush().await.with_context(|| {
195 UBC 0 : format!(
196 0 : "failed to flush safekeeper state into control file at: {}",
197 0 : control_partial_path
198 0 : )
199 CBC 5149 : })?;
200 :
201 : // fsync the file
202 5149 : if !self.conf.no_sync {
203 2 : control_partial.sync_all().await.with_context(|| {
204 UBC 0 : format!(
205 0 : "failed to sync partial control file at {}",
206 0 : control_partial_path
207 0 : )
208 CBC 2 : })?;
209 5147 : }
210 :
211 5149 : let control_path = self.timeline_dir.join(CONTROL_FILE_NAME);
212 5149 :
213 5149 : // rename should be atomic
214 5150 : fs::rename(&control_partial_path, &control_path).await?;
215 : // this sync is not required by any standard but postgres does this (see durable_rename)
216 5149 : if !self.conf.no_sync {
217 2 : let new_f = File::open(&control_path).await?;
218 2 : new_f
219 2 : .sync_all()
220 2 : .await
221 2 : .with_context(|| format!("failed to sync control file at: {}", &control_path))?;
222 :
223 : // fsync the directory (linux specific)
224 2 : let tli_dir = File::open(&self.timeline_dir).await?;
225 2 : tli_dir
226 2 : .sync_all()
227 2 : .await
228 2 : .context("failed to sync control file directory")?;
229 5147 : }
230 :
231 : // update internal state
232 5149 : self.state = s.clone();
233 5149 : Ok(())
234 10298 : }
235 :
236 1846 : fn last_persist_at(&self) -> Instant {
237 1846 : self.last_persist_at
238 1846 : }
239 : }
240 :
#[cfg(test)]
mod test {
    use super::FileStorage;
    use super::*;
    use crate::{safekeeper::SafeKeeperState, SafeKeeperConf};
    use anyhow::Result;
    use utils::{id::TenantTimelineId, lsn::Lsn};

    /// Dummy config whose workdir is a fresh temporary directory; `into_path`
    /// detaches it from automatic cleanup so it outlives the temp handle.
    fn stub_conf() -> SafeKeeperConf {
        let workdir = camino_tempfile::tempdir().unwrap().into_path();
        SafeKeeperConf {
            workdir,
            ..SafeKeeperConf::dummy()
        }
    }

    /// Ensure the timeline dir exists, then load the control file twice:
    /// once through `restore_new` and once directly.
    async fn load_from_control_file(
        conf: &SafeKeeperConf,
        ttid: &TenantTimelineId,
    ) -> Result<(FileStorage, SafeKeeperState)> {
        fs::create_dir_all(conf.timeline_dir(ttid))
            .await
            .expect("failed to create timeline dir");
        Ok((
            FileStorage::restore_new(ttid, conf)?,
            FileStorage::load_control_file_conf(conf, ttid)?,
        ))
    }

    /// Ensure the timeline dir exists and create (unpersisted) storage
    /// holding an empty state; the state is also returned for modification.
    async fn create(
        conf: &SafeKeeperConf,
        ttid: &TenantTimelineId,
    ) -> Result<(FileStorage, SafeKeeperState)> {
        fs::create_dir_all(conf.timeline_dir(ttid))
            .await
            .expect("failed to create timeline dir");
        let state = SafeKeeperState::empty();
        let timeline_dir = conf.timeline_dir(ttid);
        let storage = FileStorage::create_new(timeline_dir, conf, state.clone())?;
        Ok((storage, state))
    }

    #[tokio::test]
    async fn test_read_write_safekeeper_state() {
        let conf = stub_conf();
        let ttid = TenantTimelineId::generate();
        {
            let (mut storage, mut state) =
                create(&conf, &ttid).await.expect("failed to create state");
            // change something
            state.commit_lsn = Lsn(42);
            storage
                .persist(&state)
                .await
                .expect("failed to persist state");
        }

        let (_, state) = load_from_control_file(&conf, &ttid)
            .await
            .expect("failed to read state");
        assert_eq!(state.commit_lsn, Lsn(42));
    }

    #[tokio::test]
    async fn test_safekeeper_state_checksum_mismatch() {
        let conf = stub_conf();
        let ttid = TenantTimelineId::generate();
        {
            let (mut storage, mut state) =
                create(&conf, &ttid).await.expect("failed to create state");

            // change something
            state.commit_lsn = Lsn(42);
            storage
                .persist(&state)
                .await
                .expect("failed to persist state");
        }
        let control_path = conf.timeline_dir(&ttid).join(CONTROL_FILE_NAME);
        let mut data = fs::read(&control_path).await.unwrap();
        // Flip every bit of the first byte: always changes it (unlike `+= 1`,
        // which would overflow-panic on 0xFF) and crc32c detects any
        // single-byte corruption.
        data[0] ^= 0xff;
        fs::write(&control_path, &data)
            .await
            .expect("failed to write control file");

        let err = load_from_control_file(&conf, &ttid)
            .await
            .expect_err("expected error");
        assert!(
            err.to_string()
                .contains("safekeeper control file checksum mismatch"),
            "unexpected error: {:#}",
            err
        );
    }
}
|