Line data Source code
1 : //! Control file serialization, deserialization and persistence.
2 :
3 : use anyhow::{bail, ensure, Context, Result};
4 : use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
5 : use tokio::fs::{self, File};
6 : use tokio::io::AsyncWriteExt;
7 :
8 : use std::io::Read;
9 : use std::ops::Deref;
10 : use std::path::{Path, PathBuf};
11 : use std::time::Instant;
12 :
13 : use crate::control_file_upgrade::upgrade_control_file;
14 : use crate::metrics::PERSIST_CONTROL_FILE_SECONDS;
15 : use crate::safekeeper::{SafeKeeperState, SK_FORMAT_VERSION, SK_MAGIC};
16 : use utils::{bin_ser::LeSer, id::TenantTimelineId};
17 :
18 : use crate::SafeKeeperConf;
19 :
20 : use std::convert::TryInto;
21 :
/// Name of the file, inside the timeline directory, that contains the
/// persistent metadata for the safekeeper.
const CONTROL_FILE_NAME: &str = "safekeeper.control";
/// Name of the temporary file the new state is written to first; it is needed
/// to atomically update the state using `rename`.
const CONTROL_FILE_NAME_PARTIAL: &str = "safekeeper.control.partial";
/// Size in bytes of the checksum (a little-endian `u32`) stored at the very end
/// of the control file.
pub const CHECKSUM_SIZE: usize = std::mem::size_of::<u32>();
27 :
/// Storage should keep actual state inside of it. It should implement Deref
/// trait to access state fields and have persist method for updating that state.
///
/// The `Deref<Target = SafeKeeperState>` supertrait means that fields of the
/// stored state can be read directly through any `Storage` value.
#[async_trait::async_trait]
pub trait Storage: Deref<Target = SafeKeeperState> {
    /// Persist safekeeper state on disk and update internal state.
    ///
    /// `s` is the new state to store; an error is returned if it could not be
    /// persisted.
    async fn persist(&mut self, s: &SafeKeeperState) -> Result<()>;

    /// Timestamp of last persist.
    fn last_persist_at(&self) -> Instant;
}
38 :
/// [`Storage`] implementation that keeps the safekeeper state in a control
/// file located in the timeline directory.
#[derive(Debug)]
pub struct FileStorage {
    // save timeline dir to avoid reconstructing it every time
    timeline_dir: PathBuf,
    /// Safekeeper configuration; `persist` consults its `no_sync` flag to
    /// decide whether the fsync calls are performed.
    conf: SafeKeeperConf,

    /// Last state persisted to disk.
    state: SafeKeeperState,
    /// Value reported by `last_persist_at()`; initialized to `Instant::now()`
    /// when the storage is constructed.
    /// Not preserved across restarts.
    last_persist_at: Instant,
}
50 :
51 : impl FileStorage {
52 : /// Initialize storage by loading state from disk.
53 83 : pub fn restore_new(ttid: &TenantTimelineId, conf: &SafeKeeperConf) -> Result<FileStorage> {
54 83 : let timeline_dir = conf.timeline_dir(ttid);
55 :
56 83 : let state = Self::load_control_file_conf(conf, ttid)?;
57 :
58 82 : Ok(FileStorage {
59 82 : timeline_dir,
60 82 : conf: conf.clone(),
61 82 : state,
62 82 : last_persist_at: Instant::now(),
63 82 : })
64 83 : }
65 :
66 : /// Create file storage for a new timeline, but don't persist it yet.
67 525 : pub fn create_new(
68 525 : ttid: &TenantTimelineId,
69 525 : conf: &SafeKeeperConf,
70 525 : state: SafeKeeperState,
71 525 : ) -> Result<FileStorage> {
72 525 : let timeline_dir = conf.timeline_dir(ttid);
73 525 :
74 525 : let store = FileStorage {
75 525 : timeline_dir,
76 525 : conf: conf.clone(),
77 525 : state,
78 525 : last_persist_at: Instant::now(),
79 525 : };
80 525 :
81 525 : Ok(store)
82 525 : }
83 :
84 : /// Check the magic/version in the on-disk data and deserialize it, if possible.
85 84 : fn deser_sk_state(buf: &mut &[u8]) -> Result<SafeKeeperState> {
86 : // Read the version independent part
87 84 : let magic = ReadBytesExt::read_u32::<LittleEndian>(buf)?;
88 84 : if magic != SK_MAGIC {
89 0 : bail!(
90 0 : "bad control file magic: {:X}, expected {:X}",
91 0 : magic,
92 0 : SK_MAGIC
93 0 : );
94 84 : }
95 84 : let version = ReadBytesExt::read_u32::<LittleEndian>(buf)?;
96 84 : if version == SK_FORMAT_VERSION {
97 84 : let res = SafeKeeperState::des(buf)?;
98 84 : return Ok(res);
99 0 : }
100 0 : // try to upgrade
101 0 : upgrade_control_file(buf, version)
102 84 : }
103 :
104 : /// Load control file for given ttid at path specified by conf.
105 84 : pub fn load_control_file_conf(
106 84 : conf: &SafeKeeperConf,
107 84 : ttid: &TenantTimelineId,
108 84 : ) -> Result<SafeKeeperState> {
109 84 : let path = conf.timeline_dir(ttid).join(CONTROL_FILE_NAME);
110 84 : Self::load_control_file(path)
111 84 : }
112 :
113 : /// Read in the control file.
114 85 : pub fn load_control_file<P: AsRef<Path>>(control_file_path: P) -> Result<SafeKeeperState> {
115 85 : let mut control_file = std::fs::OpenOptions::new()
116 85 : .read(true)
117 85 : .write(true)
118 85 : .open(&control_file_path)
119 85 : .with_context(|| {
120 0 : format!(
121 0 : "failed to open control file at {}",
122 0 : control_file_path.as_ref().display(),
123 0 : )
124 85 : })?;
125 :
126 85 : let mut buf = Vec::new();
127 85 : control_file
128 85 : .read_to_end(&mut buf)
129 85 : .context("failed to read control file")?;
130 :
131 85 : let calculated_checksum = crc32c::crc32c(&buf[..buf.len() - CHECKSUM_SIZE]);
132 :
133 85 : let expected_checksum_bytes: &[u8; CHECKSUM_SIZE] =
134 85 : buf[buf.len() - CHECKSUM_SIZE..].try_into()?;
135 85 : let expected_checksum = u32::from_le_bytes(*expected_checksum_bytes);
136 85 :
137 85 : ensure!(
138 85 : calculated_checksum == expected_checksum,
139 1 : format!(
140 1 : "safekeeper control file checksum mismatch: expected {} got {}",
141 1 : expected_checksum, calculated_checksum
142 1 : )
143 : );
144 :
145 84 : let state = FileStorage::deser_sk_state(&mut &buf[..buf.len() - CHECKSUM_SIZE])
146 84 : .with_context(|| {
147 0 : format!(
148 0 : "while reading control file {}",
149 0 : control_file_path.as_ref().display(),
150 0 : )
151 84 : })?;
152 84 : Ok(state)
153 85 : }
154 : }
155 :
156 : impl Deref for FileStorage {
157 : type Target = SafeKeeperState;
158 :
159 29727724 : fn deref(&self) -> &Self::Target {
160 29727724 : &self.state
161 29727724 : }
162 : }
163 :
164 : #[async_trait::async_trait]
165 : impl Storage for FileStorage {
166 : /// Persists state durably to the underlying storage.
167 : ///
168 : /// For a description, see <https://lwn.net/Articles/457667/>.
169 5439 : async fn persist(&mut self, s: &SafeKeeperState) -> Result<()> {
170 5439 : let _timer = PERSIST_CONTROL_FILE_SECONDS.start_timer();
171 5439 :
172 5439 : // write data to safekeeper.control.partial
173 5439 : let control_partial_path = self.timeline_dir.join(CONTROL_FILE_NAME_PARTIAL);
174 5466 : let mut control_partial = File::create(&control_partial_path).await.with_context(|| {
175 0 : format!(
176 0 : "failed to create partial control file at: {}",
177 0 : &control_partial_path.display()
178 0 : )
179 5439 : })?;
180 5439 : let mut buf: Vec<u8> = Vec::new();
181 5439 : WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_MAGIC)?;
182 5439 : WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_FORMAT_VERSION)?;
183 5439 : s.ser_into(&mut buf)?;
184 :
185 : // calculate checksum before resize
186 5439 : let checksum = crc32c::crc32c(&buf);
187 5439 : buf.extend_from_slice(&checksum.to_le_bytes());
188 5439 :
189 5439 : control_partial.write_all(&buf).await.with_context(|| {
190 0 : format!(
191 0 : "failed to write safekeeper state into control file at: {}",
192 0 : control_partial_path.display()
193 0 : )
194 5439 : })?;
195 5439 : control_partial.flush().await.with_context(|| {
196 0 : format!(
197 0 : "failed to flush safekeeper state into control file at: {}",
198 0 : control_partial_path.display()
199 0 : )
200 5439 : })?;
201 :
202 : // fsync the file
203 5439 : if !self.conf.no_sync {
204 2 : control_partial.sync_all().await.with_context(|| {
205 0 : format!(
206 0 : "failed to sync partial control file at {}",
207 0 : control_partial_path.display()
208 0 : )
209 2 : })?;
210 5437 : }
211 :
212 5439 : let control_path = self.timeline_dir.join(CONTROL_FILE_NAME);
213 5439 :
214 5439 : // rename should be atomic
215 5440 : fs::rename(&control_partial_path, &control_path).await?;
216 : // this sync is not required by any standard but postgres does this (see durable_rename)
217 5439 : if !self.conf.no_sync {
218 2 : let new_f = File::open(&control_path).await?;
219 2 : new_f.sync_all().await.with_context(|| {
220 0 : format!(
221 0 : "failed to sync control file at: {}",
222 0 : &control_path.display()
223 0 : )
224 2 : })?;
225 :
226 : // fsync the directory (linux specific)
227 2 : let tli_dir = File::open(&self.timeline_dir).await?;
228 2 : tli_dir
229 2 : .sync_all()
230 2 : .await
231 2 : .context("failed to sync control file directory")?;
232 5437 : }
233 :
234 : // update internal state
235 5439 : self.state = s.clone();
236 5439 : Ok(())
237 10878 : }
238 :
239 1315 : fn last_persist_at(&self) -> Instant {
240 1315 : self.last_persist_at
241 1315 : }
242 : }
243 :
#[cfg(test)]
mod test {
    use super::FileStorage;
    use super::*;
    use crate::{safekeeper::SafeKeeperState, SafeKeeperConf};
    use anyhow::Result;
    use utils::{id::TenantTimelineId, lsn::Lsn};

    /// Builds a dummy configuration whose working directory is a fresh
    /// temporary directory.
    fn stub_conf() -> SafeKeeperConf {
        let workdir = tempfile::tempdir().unwrap().into_path();
        SafeKeeperConf {
            workdir,
            ..SafeKeeperConf::dummy()
        }
    }

    /// Loads the control file of `ttid` twice: once through
    /// `FileStorage::restore_new` and once directly as a bare state.
    async fn load_from_control_file(
        conf: &SafeKeeperConf,
        ttid: &TenantTimelineId,
    ) -> Result<(FileStorage, SafeKeeperState)> {
        fs::create_dir_all(conf.timeline_dir(ttid))
            .await
            .expect("failed to create timeline dir");
        Ok((
            FileStorage::restore_new(ttid, conf)?,
            FileStorage::load_control_file_conf(conf, ttid)?,
        ))
    }

    /// Creates the timeline directory and a not-yet-persisted storage holding
    /// an empty state; returns the storage together with a copy of that state.
    async fn create(
        conf: &SafeKeeperConf,
        ttid: &TenantTimelineId,
    ) -> Result<(FileStorage, SafeKeeperState)> {
        fs::create_dir_all(conf.timeline_dir(ttid))
            .await
            .expect("failed to create timeline dir");
        let state = SafeKeeperState::empty();
        let storage = FileStorage::create_new(ttid, conf, state.clone())?;
        Ok((storage, state))
    }

    /// A persisted state must be visible both in memory and after reloading
    /// it from disk.
    #[tokio::test]
    async fn test_read_write_safekeeper_state() {
        let conf = stub_conf();
        let ttid = TenantTimelineId::generate();
        {
            let (mut storage, mut state) =
                create(&conf, &ttid).await.expect("failed to create state");
            // change something
            state.commit_lsn = Lsn(42);
            storage
                .persist(&state)
                .await
                .expect("failed to persist state");
            // persist must also update the state kept in memory
            assert_eq!(storage.commit_lsn, Lsn(42));
        }

        let (_, state) = load_from_control_file(&conf, &ttid)
            .await
            .expect("failed to read state");
        assert_eq!(state.commit_lsn, Lsn(42));
    }

    /// Corrupting the control file on disk must be detected by the checksum
    /// verification on load.
    #[tokio::test]
    async fn test_safekeeper_state_checksum_mismatch() {
        let conf = stub_conf();
        let ttid = TenantTimelineId::generate();
        {
            let (mut storage, mut state) =
                create(&conf, &ttid).await.expect("failed to create state");

            // change something
            state.commit_lsn = Lsn(42);
            storage
                .persist(&state)
                .await
                .expect("failed to persist state");
        }
        let control_path = conf.timeline_dir(&ttid).join(CONTROL_FILE_NAME);
        let mut data = fs::read(&control_path).await.unwrap();
        // change the first byte of the file to fail checksum validation;
        // wrapping_add keeps this panic-free whatever the byte value is
        data[0] = data[0].wrapping_add(1);
        fs::write(&control_path, &data)
            .await
            .expect("failed to write control file");

        match load_from_control_file(&conf, &ttid).await {
            Err(err) => assert!(err
                .to_string()
                .contains("safekeeper control file checksum mismatch")),
            Ok(_) => panic!("expected error"),
        }
    }
}
|