Line data Source code
1 : //! Code related to evicting WAL files to remote storage. The actual upload is done by the
2 : //! partial WAL backup code. This file has code to delete and re-download WAL files,
3 : //! cross-validate with partial WAL backup if local file is still present.
4 :
5 : use anyhow::Context;
6 : use camino::Utf8PathBuf;
7 : use remote_storage::RemotePath;
8 : use tokio::{
9 : fs::File,
10 : io::{AsyncRead, AsyncWriteExt},
11 : };
12 : use tracing::{debug, info, instrument, warn};
13 : use utils::crashsafe::durable_rename;
14 :
15 : use crate::{
16 : timeline_manager::{Manager, StateSnapshot},
17 : wal_backup,
18 : wal_backup_partial::{self, PartialRemoteSegment},
19 : wal_storage::wal_file_paths,
20 : };
21 :
22 : impl Manager {
23 : /// Returns true if the timeline is ready for eviction.
24 : /// Current criteria:
25 : /// - no active tasks
26 : /// - control file is flushed (no next event scheduled)
27 : /// - no WAL residence guards
28 : /// - no pushes to the broker
29 : /// - partial WAL backup is uploaded
30 0 : pub(crate) fn ready_for_eviction(
31 0 : &self,
32 0 : next_event: &Option<tokio::time::Instant>,
33 0 : state: &StateSnapshot,
34 0 : ) -> bool {
35 0 : self.backup_task.is_none()
36 0 : && self.recovery_task.is_none()
37 0 : && self.wal_removal_task.is_none()
38 0 : && self.partial_backup_task.is_none()
39 0 : && self.partial_backup_uploaded.is_some()
40 0 : && next_event.is_none()
41 0 : && self.access_service.is_empty()
42 0 : && !self.tli_broker_active.get()
43 0 : && !wal_backup_partial::needs_uploading(state, &self.partial_backup_uploaded)
44 0 : && self
45 0 : .partial_backup_uploaded
46 0 : .as_ref()
47 0 : .unwrap()
48 0 : .flush_lsn
49 0 : .segment_number(self.wal_seg_size)
50 0 : == self.last_removed_segno + 1
51 0 : }
52 :
53 : /// Evict the timeline to remote storage.
54 0 : #[instrument(name = "evict_timeline", skip_all)]
55 : pub(crate) async fn evict_timeline(&mut self) {
56 : assert!(!self.is_offloaded);
57 : let partial_backup_uploaded = match &self.partial_backup_uploaded {
58 : Some(p) => p.clone(),
59 : None => {
60 : warn!("no partial backup uploaded, skipping eviction");
61 : return;
62 : }
63 : };
64 :
65 : info!("starting eviction, using {:?}", partial_backup_uploaded);
66 :
67 : if let Err(e) = do_eviction(self, &partial_backup_uploaded).await {
68 : warn!("failed to evict timeline: {:?}", e);
69 : return;
70 : }
71 :
72 : info!("successfully evicted timeline");
73 : }
74 :
75 : /// Restore evicted timeline from remote storage.
76 0 : #[instrument(name = "unevict_timeline", skip_all)]
77 : pub(crate) async fn unevict_timeline(&mut self) {
78 : assert!(self.is_offloaded);
79 : let partial_backup_uploaded = match &self.partial_backup_uploaded {
80 : Some(p) => p.clone(),
81 : None => {
82 : warn!("no partial backup uploaded, cannot unevict");
83 : return;
84 : }
85 : };
86 :
87 : info!("starting uneviction, using {:?}", partial_backup_uploaded);
88 :
89 : if let Err(e) = do_uneviction(self, &partial_backup_uploaded).await {
90 : warn!("failed to unevict timeline: {:?}", e);
91 : return;
92 : }
93 :
94 : info!("successfully restored evicted timeline");
95 : }
96 : }
97 :
98 : /// Ensure that content matches the remote partial backup, if local segment exists.
99 : /// Then change state in control file and in-memory. If `delete_offloaded_wal` is set,
100 : /// delete the local segment.
101 0 : async fn do_eviction(mgr: &mut Manager, partial: &PartialRemoteSegment) -> anyhow::Result<()> {
102 0 : compare_local_segment_with_remote(mgr, partial).await?;
103 :
104 0 : mgr.tli.switch_to_offloaded(partial).await?;
105 : // switch manager state as soon as possible
106 0 : mgr.is_offloaded = true;
107 0 :
108 0 : if mgr.conf.delete_offloaded_wal {
109 0 : delete_local_segment(mgr, partial).await?;
110 0 : }
111 :
112 0 : Ok(())
113 0 : }
114 :
115 : /// Ensure that content matches the remote partial backup, if local segment exists.
116 : /// Then download segment to local disk and change state in control file and in-memory.
117 0 : async fn do_uneviction(mgr: &mut Manager, partial: &PartialRemoteSegment) -> anyhow::Result<()> {
118 0 : // if the local segment is present, validate it
119 0 : compare_local_segment_with_remote(mgr, partial).await?;
120 :
121 : // atomically download the partial segment
122 0 : redownload_partial_segment(mgr, partial).await?;
123 :
124 0 : mgr.tli.switch_to_present().await?;
125 : // switch manager state as soon as possible
126 0 : mgr.is_offloaded = false;
127 0 :
128 0 : Ok(())
129 0 : }
130 :
131 : /// Delete local WAL segment.
132 0 : async fn delete_local_segment(mgr: &Manager, partial: &PartialRemoteSegment) -> anyhow::Result<()> {
133 0 : let local_path = local_segment_path(mgr, partial);
134 0 :
135 0 : info!("deleting WAL file to evict: {}", local_path);
136 0 : tokio::fs::remove_file(&local_path).await?;
137 0 : Ok(())
138 0 : }
139 :
140 : /// Redownload partial segment from remote storage.
141 : /// The segment is downloaded to a temporary file and then renamed to the final path.
142 0 : async fn redownload_partial_segment(
143 0 : mgr: &Manager,
144 0 : partial: &PartialRemoteSegment,
145 0 : ) -> anyhow::Result<()> {
146 0 : let tmp_file = mgr.tli.timeline_dir().join("remote_partial.tmp");
147 0 : let remote_segfile = remote_segment_path(mgr, partial)?;
148 :
149 0 : debug!(
150 0 : "redownloading partial segment: {} -> {}",
151 : remote_segfile, tmp_file
152 : );
153 :
154 0 : let mut reader = wal_backup::read_object(&remote_segfile, 0).await?;
155 0 : let mut file = File::create(&tmp_file).await?;
156 :
157 0 : let actual_len = tokio::io::copy(&mut reader, &mut file).await?;
158 0 : let expected_len = partial.flush_lsn.segment_offset(mgr.wal_seg_size);
159 0 :
160 0 : if actual_len != expected_len as u64 {
161 0 : anyhow::bail!(
162 0 : "partial downloaded {} bytes, expected {}",
163 0 : actual_len,
164 0 : expected_len
165 0 : );
166 0 : }
167 0 :
168 0 : if actual_len > mgr.wal_seg_size as u64 {
169 0 : anyhow::bail!(
170 0 : "remote segment is too long: {} bytes, expected {}",
171 0 : actual_len,
172 0 : mgr.wal_seg_size
173 0 : );
174 0 : }
175 0 : file.set_len(mgr.wal_seg_size as u64).await?;
176 0 : file.flush().await?;
177 :
178 0 : let final_path = local_segment_path(mgr, partial);
179 0 : info!(
180 0 : "downloaded {} bytes, renaming to {}",
181 : final_path, final_path,
182 : );
183 0 : if let Err(e) = durable_rename(&tmp_file, &final_path, !mgr.conf.no_sync).await {
184 : // Probably rename succeeded, but fsync of it failed. Remove
185 : // the file then to avoid using it.
186 0 : tokio::fs::remove_file(tmp_file)
187 0 : .await
188 0 : .or_else(utils::fs_ext::ignore_not_found)?;
189 0 : return Err(e.into());
190 0 : }
191 0 :
192 0 : Ok(())
193 0 : }
194 :
195 : /// Compare local WAL segment with partial WAL backup in remote storage.
196 : /// If the local segment is not present, the function does nothing.
197 : /// If the local segment is present, it compares the local segment with the remote one.
198 0 : async fn compare_local_segment_with_remote(
199 0 : mgr: &Manager,
200 0 : partial: &PartialRemoteSegment,
201 0 : ) -> anyhow::Result<()> {
202 0 : let local_path = local_segment_path(mgr, partial);
203 0 :
204 0 : match File::open(&local_path).await {
205 0 : Ok(mut local_file) => do_validation(mgr, &mut local_file, mgr.wal_seg_size, partial)
206 0 : .await
207 0 : .context("validation failed"),
208 : Err(_) => {
209 0 : info!(
210 0 : "local WAL file {} is not present, skipping validation",
211 : local_path
212 : );
213 0 : Ok(())
214 : }
215 : }
216 0 : }
217 :
218 : /// Compare opened local WAL segment with partial WAL backup in remote storage.
219 : /// Validate full content of both files.
220 0 : async fn do_validation(
221 0 : mgr: &Manager,
222 0 : file: &mut File,
223 0 : wal_seg_size: usize,
224 0 : partial: &PartialRemoteSegment,
225 0 : ) -> anyhow::Result<()> {
226 0 : let local_size = file.metadata().await?.len() as usize;
227 0 : if local_size != wal_seg_size {
228 0 : anyhow::bail!(
229 0 : "local segment size is invalid: found {}, expected {}",
230 0 : local_size,
231 0 : wal_seg_size
232 0 : );
233 0 : }
234 :
235 0 : let remote_segfile = remote_segment_path(mgr, partial)?;
236 0 : let mut remote_reader: std::pin::Pin<Box<dyn AsyncRead + Send + Sync>> =
237 0 : wal_backup::read_object(&remote_segfile, 0).await?;
238 :
239 : // remote segment should have bytes excatly up to `flush_lsn`
240 0 : let expected_remote_size = partial.flush_lsn.segment_offset(mgr.wal_seg_size);
241 0 : // let's compare the first `expected_remote_size` bytes
242 0 : compare_n_bytes(&mut remote_reader, file, expected_remote_size).await?;
243 : // and check that the remote segment ends here
244 0 : check_end(&mut remote_reader).await?;
245 :
246 : // if local segment is longer, the rest should be zeroes
247 0 : read_n_zeroes(file, mgr.wal_seg_size - expected_remote_size).await?;
248 : // and check that the local segment ends here
249 0 : check_end(file).await?;
250 :
251 0 : Ok(())
252 0 : }
253 :
254 0 : fn local_segment_path(mgr: &Manager, partial: &PartialRemoteSegment) -> Utf8PathBuf {
255 0 : let flush_lsn = partial.flush_lsn;
256 0 : let segno = flush_lsn.segment_number(mgr.wal_seg_size);
257 0 : let (_, local_partial_segfile) =
258 0 : wal_file_paths(mgr.tli.timeline_dir(), segno, mgr.wal_seg_size);
259 0 : local_partial_segfile
260 0 : }
261 :
262 0 : fn remote_segment_path(
263 0 : mgr: &Manager,
264 0 : partial: &PartialRemoteSegment,
265 0 : ) -> anyhow::Result<RemotePath> {
266 0 : let remote_timeline_path = wal_backup::remote_timeline_path(&mgr.tli.ttid)?;
267 0 : Ok(partial.remote_path(&remote_timeline_path))
268 0 : }
269 :
270 : /// Compare first `n` bytes of two readers. If the bytes differ, return an error.
271 : /// If the readers are shorter than `n`, return an error.
272 0 : async fn compare_n_bytes<R1, R2>(reader1: &mut R1, reader2: &mut R2, n: usize) -> anyhow::Result<()>
273 0 : where
274 0 : R1: AsyncRead + Unpin,
275 0 : R2: AsyncRead + Unpin,
276 0 : {
277 0 : use tokio::io::AsyncReadExt;
278 0 :
279 0 : const BUF_SIZE: usize = 32 * 1024;
280 0 :
281 0 : let mut buffer1 = vec![0u8; BUF_SIZE];
282 0 : let mut buffer2 = vec![0u8; BUF_SIZE];
283 0 :
284 0 : let mut offset = 0;
285 :
286 0 : while offset < n {
287 0 : let bytes_to_read = std::cmp::min(BUF_SIZE, n - offset);
288 :
289 0 : let bytes_read1 = reader1
290 0 : .read(&mut buffer1[..bytes_to_read])
291 0 : .await
292 0 : .with_context(|| format!("failed to read from reader1 at offset {}", offset))?;
293 0 : if bytes_read1 == 0 {
294 0 : anyhow::bail!("unexpected EOF from reader1 at offset {}", offset);
295 0 : }
296 :
297 0 : let bytes_read2 = reader2
298 0 : .read_exact(&mut buffer2[..bytes_read1])
299 0 : .await
300 0 : .with_context(|| {
301 0 : format!(
302 0 : "failed to read {} bytes from reader2 at offset {}",
303 0 : bytes_read1, offset
304 0 : )
305 0 : })?;
306 0 : assert!(bytes_read2 == bytes_read1);
307 :
308 0 : if buffer1[..bytes_read1] != buffer2[..bytes_read2] {
309 0 : let diff_offset = buffer1[..bytes_read1]
310 0 : .iter()
311 0 : .zip(buffer2[..bytes_read2].iter())
312 0 : .position(|(a, b)| a != b)
313 0 : .expect("mismatched buffers, but no difference found");
314 0 : anyhow::bail!("mismatch at offset {}", offset + diff_offset);
315 0 : }
316 0 :
317 0 : offset += bytes_read1;
318 : }
319 :
320 0 : Ok(())
321 0 : }
322 :
323 0 : async fn check_end<R>(mut reader: R) -> anyhow::Result<()>
324 0 : where
325 0 : R: AsyncRead + Unpin,
326 0 : {
327 0 : use tokio::io::AsyncReadExt;
328 0 :
329 0 : let mut buffer = [0u8; 1];
330 0 : let bytes_read = reader.read(&mut buffer).await?;
331 0 : if bytes_read != 0 {
332 0 : anyhow::bail!("expected EOF, found bytes");
333 0 : }
334 0 : Ok(())
335 0 : }
336 :
337 0 : async fn read_n_zeroes<R>(reader: &mut R, n: usize) -> anyhow::Result<()>
338 0 : where
339 0 : R: AsyncRead + Unpin,
340 0 : {
341 0 : use tokio::io::AsyncReadExt;
342 0 :
343 0 : const BUF_SIZE: usize = 32 * 1024;
344 0 : let mut buffer = vec![0u8; BUF_SIZE];
345 0 : let mut offset = 0;
346 :
347 0 : while offset < n {
348 0 : let bytes_to_read = std::cmp::min(BUF_SIZE, n - offset);
349 :
350 0 : let bytes_read = reader
351 0 : .read(&mut buffer[..bytes_to_read])
352 0 : .await
353 0 : .context("expected zeroes, got read error")?;
354 0 : if bytes_read == 0 {
355 0 : anyhow::bail!("expected zeroes, got EOF");
356 0 : }
357 0 :
358 0 : if buffer[..bytes_read].iter().all(|&b| b == 0) {
359 0 : offset += bytes_read;
360 0 : } else {
361 0 : anyhow::bail!("non-zero byte found");
362 : }
363 : }
364 :
365 0 : Ok(())
366 0 : }
|