Line data Source code
1 : //! Code related to evicting WAL files to remote storage.
2 : //!
//! The actual upload is done by the partial WAL backup code. This file has
//! code to delete and re-download WAL files, and to cross-validate them with
//! the partial WAL backup if the local file is still present.
6 :
7 : use anyhow::Context;
8 : use camino::Utf8PathBuf;
9 : use remote_storage::{GenericRemoteStorage, RemotePath};
10 : use tokio::fs::File;
11 : use tokio::io::{AsyncRead, AsyncWriteExt};
12 : use tracing::{debug, info, instrument, warn};
13 : use utils::crashsafe::durable_rename;
14 :
15 : use crate::metrics::{
16 : EVICTION_EVENTS_COMPLETED, EVICTION_EVENTS_STARTED, EvictionEvent, NUM_EVICTED_TIMELINES,
17 : };
18 : use crate::rate_limit::rand_duration;
19 : use crate::timeline_manager::{Manager, StateSnapshot};
20 : use crate::wal_backup;
21 : use crate::wal_backup_partial::{self, PartialRemoteSegment};
22 : use crate::wal_storage::wal_file_paths;
23 :
24 : impl Manager {
25 : /// Returns true if the timeline is ready for eviction.
26 : /// Current criteria:
27 : /// - no active tasks
28 : /// - control file is flushed (no next event scheduled)
29 : /// - no WAL residence guards
30 : /// - no pushes to the broker
31 : /// - last partial WAL segment is uploaded
32 : /// - all local segments before the uploaded partial are committed and uploaded
33 0 : pub(crate) fn ready_for_eviction(
34 0 : &self,
35 0 : next_event: &Option<tokio::time::Instant>,
36 0 : state: &StateSnapshot,
37 0 : ) -> bool {
38 0 : self.backup_task.is_none()
39 0 : && self.recovery_task.is_none()
40 0 : && self.wal_removal_task.is_none()
41 0 : && self.partial_backup_task.is_none()
42 0 : && next_event.is_none()
43 0 : && self.access_service.is_empty()
44 0 : && !self.tli_broker_active.get()
45 : // Partial segment of current flush_lsn is uploaded up to this flush_lsn.
46 0 : && !wal_backup_partial::needs_uploading(state, &self.partial_backup_uploaded)
47 : // And it is the next one after the last removed. Given that local
48 : // WAL is removed only after it is uploaded to s3 (and pageserver
49 : // advancing remote_consistent_lsn) which happens only after WAL is
50 : // committed, true means all this is done.
51 : //
52 : // This also works for the first segment despite last_removed_segno
53 : // being 0 on init because this 0 triggers run of wal_removal_task
54 : // on success of which manager updates the horizon.
55 : //
56 : // **Note** pull_timeline functionality assumes that evicted timelines always have
57 : // a partial segment: if we ever change this condition, must also update that code.
58 0 : && self
59 0 : .partial_backup_uploaded
60 0 : .as_ref()
61 0 : .unwrap()
62 0 : .flush_lsn
63 0 : .segment_number(self.wal_seg_size)
64 0 : == self.last_removed_segno + 1
65 0 : }
66 :
67 : /// Evict the timeline to remote storage. Returns whether the eviction was successful.
68 : #[instrument(name = "evict_timeline", skip_all)]
69 : pub(crate) async fn evict_timeline(&mut self) -> bool {
70 : assert!(!self.is_offloaded);
71 : let Some(storage) = self.wal_backup.get_storage() else {
72 : warn!("no remote storage configured, skipping uneviction");
73 : return false;
74 : };
75 : let partial_backup_uploaded = match &self.partial_backup_uploaded {
76 : Some(p) => p.clone(),
77 : None => {
78 : warn!("no partial backup uploaded, skipping eviction");
79 : return false;
80 : }
81 : };
82 :
83 : info!("starting eviction, using {:?}", partial_backup_uploaded);
84 :
85 : EVICTION_EVENTS_STARTED
86 : .with_label_values(&[EvictionEvent::Evict.into()])
87 : .inc();
88 0 : let _guard = scopeguard::guard((), |_| {
89 0 : EVICTION_EVENTS_COMPLETED
90 0 : .with_label_values(&[EvictionEvent::Evict.into()])
91 0 : .inc();
92 0 : });
93 :
94 : if let Err(e) = do_eviction(self, &partial_backup_uploaded, &storage).await {
95 : warn!("failed to evict timeline: {:?}", e);
96 : return false;
97 : }
98 :
99 : info!("successfully evicted timeline");
100 : NUM_EVICTED_TIMELINES.inc();
101 : true
102 : }
103 :
104 : /// Attempt to restore evicted timeline from remote storage; it must be
105 : /// offloaded.
106 : #[instrument(name = "unevict_timeline", skip_all)]
107 : pub(crate) async fn unevict_timeline(&mut self) {
108 : assert!(self.is_offloaded);
109 : let Some(storage) = self.wal_backup.get_storage() else {
110 : warn!("no remote storage configured, skipping uneviction");
111 : return;
112 : };
113 : let partial_backup_uploaded = match &self.partial_backup_uploaded {
114 : Some(p) => p.clone(),
115 : None => {
116 : warn!("no partial backup uploaded, cannot unevict");
117 : return;
118 : }
119 : };
120 :
121 : info!("starting uneviction, using {:?}", partial_backup_uploaded);
122 :
123 : EVICTION_EVENTS_STARTED
124 : .with_label_values(&[EvictionEvent::Restore.into()])
125 : .inc();
126 0 : let _guard = scopeguard::guard((), |_| {
127 0 : EVICTION_EVENTS_COMPLETED
128 0 : .with_label_values(&[EvictionEvent::Restore.into()])
129 0 : .inc();
130 0 : });
131 :
132 : if let Err(e) = do_uneviction(self, &partial_backup_uploaded, &storage).await {
133 : warn!("failed to unevict timeline: {:?}", e);
134 : return;
135 : }
136 :
137 : self.evict_not_before =
138 : tokio::time::Instant::now() + rand_duration(&self.conf.eviction_min_resident);
139 :
140 : info!("successfully restored evicted timeline");
141 : NUM_EVICTED_TIMELINES.dec();
142 : }
143 : }
144 :
145 : /// Ensure that content matches the remote partial backup, if local segment exists.
146 : /// Then change state in control file and in-memory. If `delete_offloaded_wal` is set,
147 : /// delete the local segment.
148 0 : async fn do_eviction(
149 0 : mgr: &mut Manager,
150 0 : partial: &PartialRemoteSegment,
151 0 : storage: &GenericRemoteStorage,
152 0 : ) -> anyhow::Result<()> {
153 0 : compare_local_segment_with_remote(mgr, partial, storage).await?;
154 :
155 0 : mgr.tli.switch_to_offloaded(partial).await?;
156 : // switch manager state as soon as possible
157 0 : mgr.is_offloaded = true;
158 :
159 0 : if mgr.conf.delete_offloaded_wal {
160 0 : delete_local_segment(mgr, partial).await?;
161 0 : }
162 :
163 0 : Ok(())
164 0 : }
165 :
/// Ensure that content matches the remote partial backup, if local segment exists.
/// Then download segment to local disk and change state in control file and in-memory.
///
/// The steps run strictly in this order, and any failure aborts the rest:
/// 1. validate an existing local segment against the remote partial backup;
/// 2. download the remote partial segment over the local segment path;
/// 3. switch the timeline to present, then clear `mgr.is_offloaded`.
async fn do_uneviction(
    mgr: &mut Manager,
    partial: &PartialRemoteSegment,
    storage: &GenericRemoteStorage,
) -> anyhow::Result<()> {
    // if the local segment is present, validate it; this happens before the
    // download below replaces the file at that path
    compare_local_segment_with_remote(mgr, partial, storage).await?;

    // atomically download the partial segment
    redownload_partial_segment(mgr, partial, storage).await?;

    mgr.tli.switch_to_present().await?;
    // switch manager state as soon as possible
    mgr.is_offloaded = false;

    Ok(())
}
185 :
186 : /// Delete local WAL segment.
187 0 : async fn delete_local_segment(mgr: &Manager, partial: &PartialRemoteSegment) -> anyhow::Result<()> {
188 0 : let local_path = local_segment_path(mgr, partial);
189 :
190 0 : info!("deleting WAL file to evict: {}", local_path);
191 0 : tokio::fs::remove_file(&local_path).await?;
192 0 : Ok(())
193 0 : }
194 :
195 : /// Redownload partial segment from remote storage.
196 : /// The segment is downloaded to a temporary file and then renamed to the final path.
197 0 : async fn redownload_partial_segment(
198 0 : mgr: &Manager,
199 0 : partial: &PartialRemoteSegment,
200 0 : storage: &GenericRemoteStorage,
201 0 : ) -> anyhow::Result<()> {
202 0 : let tmp_file = mgr.tli.timeline_dir().join("remote_partial.tmp");
203 0 : let remote_segfile = remote_segment_path(mgr, partial);
204 :
205 0 : debug!(
206 0 : "redownloading partial segment: {} -> {}",
207 : remote_segfile, tmp_file
208 : );
209 :
210 0 : let mut reader = wal_backup::read_object(storage, &remote_segfile, 0).await?;
211 0 : let mut file = File::create(&tmp_file).await?;
212 :
213 0 : let actual_len = tokio::io::copy(&mut reader, &mut file).await?;
214 0 : let expected_len = partial.flush_lsn.segment_offset(mgr.wal_seg_size);
215 :
216 0 : if actual_len != expected_len as u64 {
217 0 : anyhow::bail!(
218 0 : "partial downloaded {} bytes, expected {}",
219 : actual_len,
220 : expected_len
221 : );
222 0 : }
223 :
224 0 : if actual_len > mgr.wal_seg_size as u64 {
225 0 : anyhow::bail!(
226 0 : "remote segment is too long: {} bytes, expected {}",
227 : actual_len,
228 : mgr.wal_seg_size
229 : );
230 0 : }
231 0 : file.set_len(mgr.wal_seg_size as u64).await?;
232 0 : file.flush().await?;
233 :
234 0 : let final_path = local_segment_path(mgr, partial);
235 0 : info!("downloaded {actual_len} bytes, renaming to {final_path}");
236 0 : if let Err(e) = durable_rename(&tmp_file, &final_path, !mgr.conf.no_sync).await {
237 : // Probably rename succeeded, but fsync of it failed. Remove
238 : // the file then to avoid using it.
239 0 : tokio::fs::remove_file(tmp_file)
240 0 : .await
241 0 : .or_else(utils::fs_ext::ignore_not_found)?;
242 0 : return Err(e.into());
243 0 : }
244 :
245 0 : Ok(())
246 0 : }
247 :
248 : /// Compare local WAL segment with partial WAL backup in remote storage.
249 : /// If the local segment is not present, the function does nothing.
250 : /// If the local segment is present, it compares the local segment with the remote one.
251 0 : async fn compare_local_segment_with_remote(
252 0 : mgr: &Manager,
253 0 : partial: &PartialRemoteSegment,
254 0 : storage: &GenericRemoteStorage,
255 0 : ) -> anyhow::Result<()> {
256 0 : let local_path = local_segment_path(mgr, partial);
257 :
258 0 : match File::open(&local_path).await {
259 0 : Ok(mut local_file) => {
260 0 : do_validation(mgr, &mut local_file, mgr.wal_seg_size, partial, storage)
261 0 : .await
262 0 : .context("validation failed")
263 : }
264 : Err(_) => {
265 0 : info!(
266 0 : "local WAL file {} is not present, skipping validation",
267 : local_path
268 : );
269 0 : Ok(())
270 : }
271 : }
272 0 : }
273 :
274 : /// Compare opened local WAL segment with partial WAL backup in remote storage.
275 : /// Validate full content of both files.
276 0 : async fn do_validation(
277 0 : mgr: &Manager,
278 0 : file: &mut File,
279 0 : wal_seg_size: usize,
280 0 : partial: &PartialRemoteSegment,
281 0 : storage: &GenericRemoteStorage,
282 0 : ) -> anyhow::Result<()> {
283 0 : let local_size = file.metadata().await?.len() as usize;
284 0 : if local_size != wal_seg_size {
285 0 : anyhow::bail!(
286 0 : "local segment size is invalid: found {}, expected {}",
287 : local_size,
288 : wal_seg_size
289 : );
290 0 : }
291 :
292 0 : let remote_segfile = remote_segment_path(mgr, partial);
293 0 : let mut remote_reader: std::pin::Pin<Box<dyn AsyncRead + Send + Sync>> =
294 0 : wal_backup::read_object(storage, &remote_segfile, 0).await?;
295 :
296 : // remote segment should have bytes excatly up to `flush_lsn`
297 0 : let expected_remote_size = partial.flush_lsn.segment_offset(mgr.wal_seg_size);
298 : // let's compare the first `expected_remote_size` bytes
299 0 : compare_n_bytes(&mut remote_reader, file, expected_remote_size).await?;
300 : // and check that the remote segment ends here
301 0 : check_end(&mut remote_reader).await?;
302 :
303 : // if local segment is longer, the rest should be zeroes
304 0 : read_n_zeroes(file, mgr.wal_seg_size - expected_remote_size).await?;
305 : // and check that the local segment ends here
306 0 : check_end(file).await?;
307 :
308 0 : Ok(())
309 0 : }
310 :
311 0 : fn local_segment_path(mgr: &Manager, partial: &PartialRemoteSegment) -> Utf8PathBuf {
312 0 : let flush_lsn = partial.flush_lsn;
313 0 : let segno = flush_lsn.segment_number(mgr.wal_seg_size);
314 0 : let (_, local_partial_segfile) =
315 0 : wal_file_paths(mgr.tli.timeline_dir(), segno, mgr.wal_seg_size);
316 0 : local_partial_segfile
317 0 : }
318 :
319 0 : fn remote_segment_path(mgr: &Manager, partial: &PartialRemoteSegment) -> RemotePath {
320 0 : partial.remote_path(&mgr.tli.remote_path)
321 0 : }
322 :
323 : /// Compare first `n` bytes of two readers. If the bytes differ, return an error.
324 : /// If the readers are shorter than `n`, return an error.
325 0 : async fn compare_n_bytes<R1, R2>(reader1: &mut R1, reader2: &mut R2, n: usize) -> anyhow::Result<()>
326 0 : where
327 0 : R1: AsyncRead + Unpin,
328 0 : R2: AsyncRead + Unpin,
329 0 : {
330 : use tokio::io::AsyncReadExt;
331 :
332 : const BUF_SIZE: usize = 32 * 1024;
333 :
334 0 : let mut buffer1 = vec![0u8; BUF_SIZE];
335 0 : let mut buffer2 = vec![0u8; BUF_SIZE];
336 :
337 0 : let mut offset = 0;
338 :
339 0 : while offset < n {
340 0 : let bytes_to_read = std::cmp::min(BUF_SIZE, n - offset);
341 :
342 0 : let bytes_read1 = reader1
343 0 : .read(&mut buffer1[..bytes_to_read])
344 0 : .await
345 0 : .with_context(|| format!("failed to read from reader1 at offset {offset}"))?;
346 0 : if bytes_read1 == 0 {
347 0 : anyhow::bail!("unexpected EOF from reader1 at offset {}", offset);
348 0 : }
349 :
350 0 : let bytes_read2 = reader2
351 0 : .read_exact(&mut buffer2[..bytes_read1])
352 0 : .await
353 0 : .with_context(|| {
354 0 : format!("failed to read {bytes_read1} bytes from reader2 at offset {offset}")
355 0 : })?;
356 0 : assert!(bytes_read2 == bytes_read1);
357 :
358 0 : if buffer1[..bytes_read1] != buffer2[..bytes_read2] {
359 0 : let diff_offset = buffer1[..bytes_read1]
360 0 : .iter()
361 0 : .zip(buffer2[..bytes_read2].iter())
362 0 : .position(|(a, b)| a != b)
363 0 : .expect("mismatched buffers, but no difference found");
364 0 : anyhow::bail!("mismatch at offset {}", offset + diff_offset);
365 0 : }
366 :
367 0 : offset += bytes_read1;
368 : }
369 :
370 0 : Ok(())
371 0 : }
372 :
373 0 : async fn check_end<R>(mut reader: R) -> anyhow::Result<()>
374 0 : where
375 0 : R: AsyncRead + Unpin,
376 0 : {
377 : use tokio::io::AsyncReadExt;
378 :
379 0 : let mut buffer = [0u8; 1];
380 0 : let bytes_read = reader.read(&mut buffer).await?;
381 0 : if bytes_read != 0 {
382 0 : anyhow::bail!("expected EOF, found bytes");
383 0 : }
384 0 : Ok(())
385 0 : }
386 :
387 0 : async fn read_n_zeroes<R>(reader: &mut R, n: usize) -> anyhow::Result<()>
388 0 : where
389 0 : R: AsyncRead + Unpin,
390 0 : {
391 : use tokio::io::AsyncReadExt;
392 :
393 : const BUF_SIZE: usize = 32 * 1024;
394 0 : let mut buffer = vec![0u8; BUF_SIZE];
395 0 : let mut offset = 0;
396 :
397 0 : while offset < n {
398 0 : let bytes_to_read = std::cmp::min(BUF_SIZE, n - offset);
399 :
400 0 : let bytes_read = reader
401 0 : .read(&mut buffer[..bytes_to_read])
402 0 : .await
403 0 : .context("expected zeroes, got read error")?;
404 0 : if bytes_read == 0 {
405 0 : anyhow::bail!("expected zeroes, got EOF");
406 0 : }
407 :
408 0 : if buffer[..bytes_read].iter().all(|&b| b == 0) {
409 0 : offset += bytes_read;
410 0 : } else {
411 0 : anyhow::bail!("non-zero byte found");
412 : }
413 : }
414 :
415 0 : Ok(())
416 0 : }
|