Line data Source code
1 : use std::cmp::min;
2 : use std::collections::HashSet;
3 : use std::num::NonZeroU32;
4 : use std::pin::Pin;
5 : use std::sync::Arc;
6 : use std::time::Duration;
7 :
8 : use anyhow::{Context, Result};
9 : use camino::{Utf8Path, Utf8PathBuf};
10 : use futures::StreamExt;
11 : use futures::stream::FuturesOrdered;
12 : use postgres_ffi::v14::xlog_utils::XLogSegNoOffsetToRecPtr;
13 : use postgres_ffi::{PG_TLI, XLogFileName, XLogSegNo};
14 : use remote_storage::{
15 : DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath, StorageMetadata,
16 : };
17 : use safekeeper_api::models::PeerInfo;
18 : use tokio::fs::File;
19 : use tokio::select;
20 : use tokio::sync::mpsc::{self, Receiver, Sender};
21 : use tokio::sync::watch;
22 : use tokio::task::JoinHandle;
23 : use tokio_util::sync::CancellationToken;
24 : use tracing::*;
25 : use utils::id::{NodeId, TenantTimelineId};
26 : use utils::lsn::Lsn;
27 : use utils::{backoff, pausable_failpoint};
28 :
29 : use crate::metrics::{
30 : BACKED_UP_SEGMENTS, BACKUP_ERRORS, BACKUP_REELECT_LEADER_COUNT, WAL_BACKUP_TASKS,
31 : };
32 : use crate::timeline::WalResidentTimeline;
33 : use crate::timeline_manager::{Manager, StateSnapshot};
34 : use crate::{SafeKeeperConf, WAL_BACKUP_RUNTIME};
35 :
36 : const UPLOAD_FAILURE_RETRY_MIN_MS: u64 = 10;
37 : const UPLOAD_FAILURE_RETRY_MAX_MS: u64 = 5000;
38 :
39 : /// Default buffer size when interfacing with [`tokio::fs::File`].
40 : const BUFFER_SIZE: usize = 32 * 1024;
41 :
42 : pub struct WalBackupTaskHandle {
43 : shutdown_tx: Sender<()>,
44 : handle: JoinHandle<()>,
45 : }
46 :
47 : impl WalBackupTaskHandle {
48 0 : pub(crate) async fn join(self) {
49 0 : if let Err(e) = self.handle.await {
50 0 : error!("WAL backup task panicked: {}", e);
51 0 : }
52 0 : }
53 : }
54 :
55 : /// Do we have anything to upload to S3, i.e. should safekeepers run backup activity?
56 50 : pub(crate) fn is_wal_backup_required(
57 50 : wal_seg_size: usize,
58 50 : num_computes: usize,
59 50 : state: &StateSnapshot,
60 50 : ) -> bool {
61 50 : num_computes > 0 ||
62 : // Currently only the whole segment is offloaded, so compare segment numbers.
63 24 : (state.commit_lsn.segment_number(wal_seg_size) > state.backup_lsn.segment_number(wal_seg_size))
64 50 : }
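// A minimal sketch of the segment-number comparison used above when no computes
// are attached; it assumes the common 16 MiB WAL segment size, which is only an
// example value here (the real value comes from the timeline state).
#[cfg(test)]
mod wal_backup_required_sketch {
    use super::*;

    #[test]
    fn segment_number_comparison() {
        const SEG_SIZE: usize = 16 * 1024 * 1024; // assumed wal_seg_size

        // commit_lsn and backup_lsn within the same segment: nothing to offload.
        let backup = Lsn(0x0100_0000);
        let commit = Lsn(0x0180_0000);
        assert_eq!(
            commit.segment_number(SEG_SIZE),
            backup.segment_number(SEG_SIZE)
        );

        // commit_lsn one full segment ahead: the predicate would ask for backup.
        let commit = Lsn(0x0200_0000);
        assert!(commit.segment_number(SEG_SIZE) > backup.segment_number(SEG_SIZE));
    }
}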
65 :
66 : /// Based on peer information, determine which safekeeper should offload; if it
67 : /// is me, start the (per-timeline) task if it is not running yet. Conversely, if it
68 : /// is not me and the task is running, kill it.
69 0 : pub(crate) async fn update_task(
70 0 : mgr: &mut Manager,
71 0 : storage: Arc<GenericRemoteStorage>,
72 0 : need_backup: bool,
73 0 : state: &StateSnapshot,
74 0 : ) {
75 : /* BEGIN_HADRON */
76 0 : let (offloader, election_dbg_str) = hadron_determine_offloader(mgr, state);
77 : /* END_HADRON */
78 0 : let elected_me = Some(mgr.conf.my_id) == offloader;
79 :
80 0 : let should_task_run = need_backup && elected_me;
81 :
82 : // start or stop the task
83 0 : if should_task_run != (mgr.backup_task.is_some()) {
84 0 : if should_task_run {
85 0 : info!("elected for backup: {}", election_dbg_str);
86 :
87 0 : let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
88 :
89 0 : let Ok(resident) = mgr.wal_resident_timeline() else {
90 0 : info!("Timeline shut down");
91 0 : return;
92 : };
93 :
94 0 : let async_task = backup_task_main(
95 0 : resident,
96 0 : storage,
97 0 : mgr.conf.backup_parallel_jobs,
98 0 : shutdown_rx,
99 : );
100 :
101 0 : let handle = if mgr.conf.current_thread_runtime {
102 0 : tokio::spawn(async_task)
103 : } else {
104 0 : WAL_BACKUP_RUNTIME.spawn(async_task)
105 : };
106 :
107 0 : mgr.backup_task = Some(WalBackupTaskHandle {
108 0 : shutdown_tx,
109 0 : handle,
110 0 : });
111 : } else {
112 0 : if !need_backup {
113 : // don't need backup at all
114 0 : info!("stepping down from backup, need_backup={}", need_backup);
115 : } else {
116 : // someone else has been elected
117 0 : info!("stepping down from backup: {}", election_dbg_str);
118 : }
119 0 : shut_down_task(&mut mgr.backup_task).await;
120 : }
121 0 : }
122 0 : }
123 :
124 0 : async fn shut_down_task(entry: &mut Option<WalBackupTaskHandle>) {
125 0 : if let Some(wb_handle) = entry.take() {
126 : // Tell the task to shut down. An error means the task exited earlier, which is fine.
127 0 : let _ = wb_handle.shutdown_tx.send(()).await;
128 : // Await the task itself. TODO: restart panicked tasks earlier.
129 0 : wb_handle.join().await;
130 0 : }
131 0 : }
132 :
133 : /* BEGIN_HADRON */
134 : // On top of Neon's determine_offloader, we also check whether the current offloader is lagging behind too much.
135 : // If it is, we re-elect a new offloader. This mitigates the issue below and also helps distribute the load across SKs.
136 : //
137 : // We have observed the offloader failing to upload a segment due to a race between XLOG SWITCH and Postgres starting to stream WAL:
138 : // the wal_backup task keeps failing to upload a full segment while the segment remains partial on disk.
139 : // The consequence is that commit_lsn moves forward on all SKs while backup_lsn stays the same, until all SKs run out of disk space.
140 : // See go/sk-ood-xlog-switch for more details.
141 : //
142 : // To mitigate this, we re-elect a new offloader if the current one is lagging behind too much.
143 : // Each SK makes the decision locally, but they are aware of each other's commit and backup LSNs.
144 : //
145 : // determine_offloader picks an SK, say SK-1.
146 : // Each SK then checks:
147 : // -- if commit_lsn - backup_lsn >= threshold,
148 : // -- -- remove SK-1 from the candidate set and call determine_offloader again.
149 : // SK-1 will step down and all SKs will elect the same new leader.
150 : // After the backup has caught up, the leader will become SK-1 again (a sketch of the lag arithmetic follows this function).
151 0 : fn hadron_determine_offloader(mgr: &Manager, state: &StateSnapshot) -> (Option<NodeId>, String) {
152 : let mut offloader: Option<NodeId>;
153 : let mut election_dbg_str: String;
154 : let caughtup_peers_count: usize;
155 0 : (offloader, election_dbg_str, caughtup_peers_count) =
156 0 : determine_offloader(&state.peers, state.backup_lsn, mgr.tli.ttid, &mgr.conf);
157 :
158 0 : if offloader.is_none()
159 0 : || caughtup_peers_count <= 1
160 0 : || mgr.conf.max_reelect_offloader_lag_bytes == 0
161 : {
162 0 : return (offloader, election_dbg_str);
163 0 : }
164 :
165 0 : let offloader_sk_id = offloader.unwrap();
166 :
167 0 : let backup_lag = state.commit_lsn.checked_sub(state.backup_lsn);
168 0 : if backup_lag.is_none() {
169 0 : debug!("Backup lag is None. Skipping re-election.");
170 0 : return (offloader, election_dbg_str);
171 0 : }
172 :
173 0 : let backup_lag = backup_lag.unwrap().0;
174 :
175 0 : if backup_lag < mgr.conf.max_reelect_offloader_lag_bytes {
176 0 : return (offloader, election_dbg_str);
177 0 : }
178 :
179 0 : info!(
180 0 : "Electing a new leader: Backup lag is too high backup lsn lag {} threshold {}: {}",
181 : backup_lag, mgr.conf.max_reelect_offloader_lag_bytes, election_dbg_str
182 : );
183 0 : BACKUP_REELECT_LEADER_COUNT.inc();
184 : // Remove the current offloader if lag is too high.
185 0 : let new_peers: Vec<_> = state
186 0 : .peers
187 0 : .iter()
188 0 : .filter(|p| p.sk_id != offloader_sk_id)
189 0 : .cloned()
190 0 : .collect();
191 0 : (offloader, election_dbg_str, _) =
192 0 : determine_offloader(&new_peers, state.backup_lsn, mgr.tli.ttid, &mgr.conf);
193 0 : (offloader, election_dbg_str)
194 0 : }
195 : /* END_HADRON */
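// A minimal sketch of the lag arithmetic behind the re-election rule above; the
// threshold below is made up for the example and is not a real configuration default.
#[cfg(test)]
mod hadron_reelect_sketch {
    use super::*;

    #[test]
    fn lag_threshold_arithmetic() {
        let max_reelect_offloader_lag_bytes: u64 = 128 * 1024 * 1024; // assumed threshold

        let backup_lsn = Lsn(0x1000_0000);
        let commit_lsn = Lsn(0x1800_0000);

        // Lsn::checked_sub yields None on underflow, mirroring the early return above.
        let lag = commit_lsn.checked_sub(backup_lsn).map(|l| l.0);

        // 128 MiB of lag, exactly at the threshold, so the current offloader would
        // be removed from the candidate set and the election re-run.
        assert_eq!(lag, Some(0x0800_0000));
        assert!(lag.unwrap() >= max_reelect_offloader_lag_bytes);
    }
}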
196 :
197 : /// The goal is to ensure that normally only one safekeeper offloads. However,
198 : /// it is fine (and inevitable, as S3 doesn't provide CAS) that for some short
199 : /// time we have several of them PUTting the same files. Also,
200 : /// - frequently changing the offloader would be bad;
201 : /// - electing a seriously lagging safekeeper is undesirable.
202 : ///
203 : /// So we deterministically choose among the reasonably caught-up candidates
204 : /// (a sketch of the index selection follows this function). TODO: take into account
205 : /// failed attempts, to handle the hypothetical situation where S3 is unreachable only for some SKs.
206 0 : fn determine_offloader(
207 0 : alive_peers: &[PeerInfo],
208 0 : wal_backup_lsn: Lsn,
209 0 : ttid: TenantTimelineId,
210 0 : conf: &SafeKeeperConf,
211 0 : ) -> (Option<NodeId>, String, usize) {
212 : // TODO: remove this once we backfill newly joined safekeepers starting from backup_lsn.
213 0 : let capable_peers = alive_peers
214 0 : .iter()
215 0 : .filter(|p| p.local_start_lsn <= wal_backup_lsn);
216 0 : match capable_peers.clone().map(|p| p.commit_lsn).max() {
217 0 : None => (None, "no connected peers to elect from".to_string(), 0),
218 0 : Some(max_commit_lsn) => {
219 0 : let threshold = max_commit_lsn
220 0 : .checked_sub(conf.max_offloader_lag_bytes)
221 0 : .unwrap_or(Lsn(0));
222 0 : let mut caughtup_peers = capable_peers
223 0 : .clone()
224 0 : .filter(|p| p.commit_lsn >= threshold)
225 0 : .collect::<Vec<_>>();
226 0 : caughtup_peers.sort_by(|p1, p2| p1.sk_id.cmp(&p2.sk_id));
227 :
228 : // To distribute the load, shift by timeline_id.
229 0 : let offloader = caughtup_peers
230 0 : [(u128::from(ttid.timeline_id) % caughtup_peers.len() as u128) as usize]
231 0 : .sk_id;
232 :
233 0 : let mut capable_peers_dbg = capable_peers
234 0 : .map(|p| (p.sk_id, p.commit_lsn))
235 0 : .collect::<Vec<_>>();
236 0 : capable_peers_dbg.sort_by(|p1, p2| p1.0.cmp(&p2.0));
237 0 : (
238 0 : Some(offloader),
239 0 : format!(
240 0 : "elected {} among {:?} peers, with {} of them being caughtup",
241 0 : offloader,
242 0 : capable_peers_dbg,
243 0 : caughtup_peers.len()
244 0 : ),
245 0 : caughtup_peers.len(),
246 0 : )
247 : }
248 : }
249 0 : }
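// A minimal sketch of the deterministic index selection above: among the
// caught-up peers, sorted by safekeeper id, the offloader is picked with
// `timeline_id % caughtup_peers.len()`. The ids below are made up for the example.
#[cfg(test)]
mod determine_offloader_sketch {
    #[test]
    fn deterministic_index_selection() {
        // Pretend three peers are caught up; their sk_ids after sorting:
        let caughtup_sk_ids: Vec<u64> = vec![1, 5, 7];

        // Pretend the timeline id, viewed as a u128, is 11.
        let timeline_id_as_u128: u128 = 11;

        let idx = (timeline_id_as_u128 % caughtup_sk_ids.len() as u128) as usize;
        assert_eq!(idx, 2);
        assert_eq!(caughtup_sk_ids[idx], 7);

        // Every safekeeper evaluating the same peer set arrives at the same choice,
        // which keeps the election stable without explicit coordination.
    }
}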
250 :
251 : pub struct WalBackup {
252 : storage: Option<Arc<GenericRemoteStorage>>,
253 : }
254 :
255 : impl WalBackup {
256 : /// Create a new WalBackup instance.
257 5 : pub async fn new(conf: &SafeKeeperConf) -> Result<Self> {
258 5 : if !conf.wal_backup_enabled {
259 0 : return Ok(Self { storage: None });
260 5 : }
261 :
262 5 : match conf.remote_storage.as_ref() {
263 0 : Some(config) => {
264 0 : let storage = GenericRemoteStorage::from_config(config).await?;
265 0 : Ok(Self {
266 0 : storage: Some(Arc::new(storage)),
267 0 : })
268 : }
269 5 : None => Ok(Self { storage: None }),
270 : }
271 5 : }
272 :
273 99 : pub fn get_storage(&self) -> Option<Arc<GenericRemoteStorage>> {
274 99 : self.storage.clone()
275 99 : }
276 : }
277 :
278 : struct WalBackupTask {
279 : timeline: WalResidentTimeline,
280 : timeline_dir: Utf8PathBuf,
281 : wal_seg_size: usize,
282 : parallel_jobs: usize,
283 : commit_lsn_watch_rx: watch::Receiver<Lsn>,
284 : storage: Arc<GenericRemoteStorage>,
285 : }
286 :
287 : /// Offload a single timeline.
288 : #[instrument(name = "wal_backup", skip_all, fields(ttid = %tli.ttid))]
289 : async fn backup_task_main(
290 : tli: WalResidentTimeline,
291 : storage: Arc<GenericRemoteStorage>,
292 : parallel_jobs: usize,
293 : mut shutdown_rx: Receiver<()>,
294 : ) {
295 : let _guard = WAL_BACKUP_TASKS.guard();
296 : info!("started");
297 :
298 : let cancel = tli.tli.cancel.clone();
299 : let mut wb = WalBackupTask {
300 : wal_seg_size: tli.get_wal_seg_size().await,
301 : commit_lsn_watch_rx: tli.get_commit_lsn_watch_rx(),
302 : timeline_dir: tli.get_timeline_dir(),
303 : timeline: tli,
304 : parallel_jobs,
305 : storage,
306 : };
307 :
308 : // task is spinned up only when wal_seg_size already initialized
309 : assert!(wb.wal_seg_size > 0);
310 :
311 : let mut canceled = false;
312 : select! {
313 : _ = wb.run() => {}
314 : _ = shutdown_rx.recv() => {
315 : canceled = true;
316 : },
317 : _ = cancel.cancelled() => {
318 : canceled = true;
319 : }
320 : }
321 : info!("task {}", if canceled { "canceled" } else { "terminated" });
322 : }
323 :
324 : impl WalBackupTask {
325 : /// This function must be called from a select! that also respects self.timeline's
326 : /// cancellation token. This is done in [`backup_task_main`].
327 : ///
328 : /// The future returned by this function is safe to drop at any time because it
329 : /// does not write to local disk.
330 0 : async fn run(&mut self) {
331 0 : let mut backup_lsn = Lsn(0);
332 :
333 0 : let mut retry_attempt = 0u32;
334 : // offload loop
335 0 : while !self.timeline.cancel.is_cancelled() {
336 0 : if retry_attempt == 0 {
337 : // wait for new WAL to arrive
338 0 : if let Err(e) = self.commit_lsn_watch_rx.changed().await {
339 : // should never happen, as we hold an Arc to the timeline and the transmitter's
340 : // lifetime is bounded by the Timeline's
341 0 : error!("commit_lsn watch shut down: {:?}", e);
342 0 : return;
343 0 : };
344 : } else {
345 : // or just sleep if we errored previously
346 0 : let mut retry_delay = UPLOAD_FAILURE_RETRY_MAX_MS;
347 0 : if let Some(backoff_delay) = UPLOAD_FAILURE_RETRY_MIN_MS.checked_shl(retry_attempt)
348 0 : {
349 0 : retry_delay = min(retry_delay, backoff_delay);
350 0 : }
351 0 : tokio::time::sleep(Duration::from_millis(retry_delay)).await;
352 : }
353 :
354 0 : let commit_lsn = *self.commit_lsn_watch_rx.borrow();
355 :
356 : // Note that backup_lsn can be higher than commit_lsn if we
357 : // don't have much local WAL and others already uploaded
358 : // segments we don't even have.
359 0 : if backup_lsn.segment_number(self.wal_seg_size)
360 0 : >= commit_lsn.segment_number(self.wal_seg_size)
361 : {
362 0 : retry_attempt = 0;
363 0 : continue; /* nothing to do, common case as we wake up on every commit_lsn bump */
364 0 : }
365 : // Perhaps peers advanced the position, check shmem value.
366 0 : backup_lsn = self.timeline.get_wal_backup_lsn().await;
367 0 : if backup_lsn.segment_number(self.wal_seg_size)
368 0 : >= commit_lsn.segment_number(self.wal_seg_size)
369 : {
370 0 : retry_attempt = 0;
371 0 : continue;
372 0 : }
373 :
374 0 : match backup_lsn_range(
375 0 : &self.timeline,
376 0 : self.storage.clone(),
377 0 : &mut backup_lsn,
378 0 : commit_lsn,
379 0 : self.wal_seg_size,
380 0 : &self.timeline_dir,
381 0 : self.parallel_jobs,
382 : )
383 0 : .await
384 : {
385 0 : Ok(()) => {
386 0 : retry_attempt = 0;
387 0 : }
388 0 : Err(e) => {
389 : // We might have managed to upload some segment even though
390 : // some later in the range failed, so log backup_lsn
391 : // separately.
392 0 : error!(
393 0 : "failed while offloading range {}-{}, backup_lsn {}: {:?}",
394 : backup_lsn, commit_lsn, backup_lsn, e
395 : );
396 :
397 0 : retry_attempt = retry_attempt.saturating_add(1);
398 : }
399 : }
400 : }
401 0 : }
402 : }
403 :
404 0 : async fn backup_lsn_range(
405 0 : timeline: &WalResidentTimeline,
406 0 : storage: Arc<GenericRemoteStorage>,
407 0 : backup_lsn: &mut Lsn,
408 0 : end_lsn: Lsn,
409 0 : wal_seg_size: usize,
410 0 : timeline_dir: &Utf8Path,
411 0 : parallel_jobs: usize,
412 0 : ) -> Result<()> {
413 0 : if parallel_jobs < 1 {
414 0 : anyhow::bail!("parallel_jobs must be >= 1");
415 0 : }
416 :
417 0 : pausable_failpoint!("backup-lsn-range-pausable");
418 :
419 0 : let remote_timeline_path = &timeline.remote_path;
420 0 : let start_lsn = *backup_lsn;
421 0 : let segments = get_segments(start_lsn, end_lsn, wal_seg_size);
422 :
423 0 : info!(
424 0 : "offloading segnos {:?} of range [{}-{})",
425 0 : segments.iter().map(|&s| s.seg_no).collect::<Vec<_>>(),
426 : start_lsn,
427 : end_lsn,
428 : );
429 :
430 : // Pool of concurrent upload tasks. We use `FuturesOrdered` to
431 : // preserve order of uploads, and update `backup_lsn` only after
432 : // all previous uploads are finished.
433 0 : let mut uploads = FuturesOrdered::new();
434 0 : let mut iter = segments.iter();
435 :
436 : loop {
437 0 : let added_task = match iter.next() {
438 0 : Some(s) => {
439 0 : uploads.push_back(backup_single_segment(
440 0 : &storage,
441 0 : s,
442 0 : timeline_dir,
443 0 : remote_timeline_path,
444 : ));
445 0 : true
446 : }
447 0 : None => false,
448 : };
449 :
450 : // Wait for the next segment to upload if we don't have any more segments,
451 : // or if we have too many concurrent uploads.
452 0 : if !added_task || uploads.len() >= parallel_jobs {
453 0 : let next = uploads.next().await;
454 0 : if let Some(res) = next {
455 : // next segment uploaded
456 0 : let segment = res?;
457 0 : let new_backup_lsn = segment.end_lsn;
458 0 : timeline
459 0 : .set_wal_backup_lsn(new_backup_lsn)
460 0 : .await
461 0 : .context("setting wal_backup_lsn")?;
462 0 : *backup_lsn = new_backup_lsn;
463 : } else {
464 : // no more segments to upload
465 0 : break;
466 : }
467 0 : }
468 : }
469 :
470 0 : info!(
471 0 : "offloaded segnos {:?} of range [{}-{})",
472 0 : segments.iter().map(|&s| s.seg_no).collect::<Vec<_>>(),
473 : start_lsn,
474 : end_lsn,
475 : );
476 0 : Ok(())
477 0 : }
478 :
479 0 : async fn backup_single_segment(
480 0 : storage: &GenericRemoteStorage,
481 0 : seg: &Segment,
482 0 : timeline_dir: &Utf8Path,
483 0 : remote_timeline_path: &RemotePath,
484 0 : ) -> Result<Segment> {
485 0 : let segment_file_path = seg.file_path(timeline_dir)?;
486 0 : let remote_segment_path = seg.remote_path(remote_timeline_path);
487 :
488 0 : let res = backup_object(
489 0 : storage,
490 0 : &segment_file_path,
491 0 : &remote_segment_path,
492 0 : seg.size(),
493 0 : )
494 0 : .await;
495 0 : if res.is_ok() {
496 0 : BACKED_UP_SEGMENTS.inc();
497 0 : } else {
498 0 : BACKUP_ERRORS.inc();
499 0 : }
500 0 : res?;
501 0 : debug!("Backup of {} done", segment_file_path);
502 :
503 0 : Ok(*seg)
504 0 : }
505 :
506 : #[derive(Debug, Copy, Clone)]
507 : pub struct Segment {
508 : seg_no: XLogSegNo,
509 : start_lsn: Lsn,
510 : end_lsn: Lsn,
511 : }
512 :
513 : impl Segment {
514 0 : pub fn new(seg_no: u64, start_lsn: Lsn, end_lsn: Lsn) -> Self {
515 0 : Self {
516 0 : seg_no,
517 0 : start_lsn,
518 0 : end_lsn,
519 0 : }
520 0 : }
521 :
522 0 : pub fn object_name(self) -> String {
523 0 : XLogFileName(PG_TLI, self.seg_no, self.size())
524 0 : }
525 :
526 0 : pub fn file_path(self, timeline_dir: &Utf8Path) -> Result<Utf8PathBuf> {
527 0 : Ok(timeline_dir.join(self.object_name()))
528 0 : }
529 :
530 0 : pub fn remote_path(self, remote_timeline_path: &RemotePath) -> RemotePath {
531 0 : remote_timeline_path.join(self.object_name())
532 0 : }
533 :
534 0 : pub fn size(self) -> usize {
535 0 : (u64::from(self.end_lsn) - u64::from(self.start_lsn)) as usize
536 0 : }
537 : }
538 :
539 0 : fn get_segments(start: Lsn, end: Lsn, seg_size: usize) -> Vec<Segment> {
540 0 : let first_seg = start.segment_number(seg_size);
541 0 : let last_seg = end.segment_number(seg_size);
542 :
543 0 : let res: Vec<Segment> = (first_seg..last_seg)
544 0 : .map(|s| {
545 0 : let start_lsn = XLogSegNoOffsetToRecPtr(s, 0, seg_size);
546 0 : let end_lsn = XLogSegNoOffsetToRecPtr(s + 1, 0, seg_size);
547 0 : Segment::new(s, Lsn::from(start_lsn), Lsn::from(end_lsn))
548 0 : })
549 0 : .collect();
550 0 : res
551 0 : }
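// A minimal sketch of how `get_segments` maps an LSN range onto whole segments
// (the range is half-open in segment numbers). The 16 MiB segment size is the
// usual Postgres default and is assumed here only for the example.
#[cfg(test)]
mod get_segments_sketch {
    use super::*;

    #[test]
    fn lsn_range_to_segments() {
        const SEG_SIZE: usize = 16 * 1024 * 1024; // assumed wal_seg_size

        // Backup position in the middle of segment 1, commit position inside segment 3.
        let start = Lsn(SEG_SIZE as u64 + 1234);
        let end = Lsn(3 * SEG_SIZE as u64 + 42);

        let segs = get_segments(start, end, SEG_SIZE);

        // Segments 1 and 2 are complete and would be offloaded; segment 3 is still partial.
        assert_eq!(segs.len(), 2);
        assert_eq!(segs[0].seg_no, 1);
        assert_eq!(segs[1].seg_no, 2);
        assert_eq!(segs[0].start_lsn, Lsn(SEG_SIZE as u64));
        assert_eq!(segs[0].end_lsn, Lsn(2 * SEG_SIZE as u64));
        assert_eq!(segs[0].size(), SEG_SIZE);
    }
}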
552 :
553 0 : async fn backup_object(
554 0 : storage: &GenericRemoteStorage,
555 0 : source_file: &Utf8Path,
556 0 : target_file: &RemotePath,
557 0 : size: usize,
558 0 : ) -> Result<()> {
559 0 : let file = File::open(&source_file)
560 0 : .await
561 0 : .with_context(|| format!("Failed to open file {source_file:?} for wal backup"))?;
562 :
563 0 : let file = tokio_util::io::ReaderStream::with_capacity(file, BUFFER_SIZE);
564 :
565 0 : let cancel = CancellationToken::new();
566 :
567 0 : storage
568 0 : .upload_storage_object(file, size, target_file, &cancel)
569 0 : .await
570 0 : }
571 :
572 0 : pub(crate) async fn backup_partial_segment(
573 0 : storage: &GenericRemoteStorage,
574 0 : source_file: &Utf8Path,
575 0 : target_file: &RemotePath,
576 0 : size: usize,
577 0 : ) -> Result<()> {
578 0 : let file = File::open(&source_file)
579 0 : .await
580 0 : .with_context(|| format!("Failed to open file {source_file:?} for wal backup"))?;
581 :
582 : // limiting the file to read only the first `size` bytes
583 0 : let limited_file = tokio::io::AsyncReadExt::take(file, size as u64);
584 :
585 0 : let file = tokio_util::io::ReaderStream::with_capacity(limited_file, BUFFER_SIZE);
586 :
587 0 : let cancel = CancellationToken::new();
588 :
589 0 : storage
590 0 : .upload(
591 0 : file,
592 0 : size,
593 0 : target_file,
594 0 : Some(StorageMetadata::from([("sk_type", "partial_segment")])),
595 0 : &cancel,
596 0 : )
597 0 : .await
598 0 : }
599 :
600 0 : pub(crate) async fn copy_partial_segment(
601 0 : storage: &GenericRemoteStorage,
602 0 : source: &RemotePath,
603 0 : destination: &RemotePath,
604 0 : ) -> Result<()> {
605 0 : let cancel = CancellationToken::new();
606 :
607 0 : storage.copy_object(source, destination, &cancel).await
608 0 : }
609 :
610 0 : pub async fn read_object(
611 0 : storage: &GenericRemoteStorage,
612 0 : file_path: &RemotePath,
613 0 : offset: u64,
614 0 : ) -> anyhow::Result<Pin<Box<dyn tokio::io::AsyncRead + Send + Sync>>> {
615 0 : info!("segment download about to start from remote path {file_path:?} at offset {offset}");
616 :
617 0 : let cancel = CancellationToken::new();
618 :
619 0 : let opts = DownloadOpts {
620 0 : byte_start: std::ops::Bound::Included(offset),
621 0 : ..Default::default()
622 0 : };
623 0 : let download = storage
624 0 : .download(file_path, &opts, &cancel)
625 0 : .await
626 0 : .with_context(|| {
627 0 : format!("Failed to open WAL segment download stream for remote path {file_path:?}")
628 0 : })?;
629 :
630 0 : let reader = tokio_util::io::StreamReader::new(download.download_stream);
631 :
632 0 : let reader = tokio::io::BufReader::with_capacity(BUFFER_SIZE, reader);
633 :
634 0 : Ok(Box::pin(reader))
635 0 : }
636 :
637 : /// Delete WAL files for the given timeline. Remote storage must be configured
638 : /// when called.
639 0 : pub async fn delete_timeline(
640 0 : storage: &GenericRemoteStorage,
641 0 : ttid: &TenantTimelineId,
642 0 : ) -> Result<()> {
643 0 : let remote_path = remote_timeline_path(ttid)?;
644 :
645 : // see DEFAULT_MAX_KEYS_PER_LIST_RESPONSE
646 : // const Option unwrap is not stable, otherwise it would be const.
647 0 : let batch_size: NonZeroU32 = NonZeroU32::new(1000).unwrap();
648 :
649 : // A backoff::retry is used here for two reasons:
650 : // - To provide a backoff rather than busy-polling the API on errors
651 : // - To absorb transient 429/503 conditions without hitting our error
652 : // logging path for issues deleting objects.
653 : //
654 : // Note: listing segments might take a long time if there are many of them.
655 : // We don't currently have HTTP request timeouts or cancellation, but if/once
656 : // we do, listing should get a streaming interface to make progress.
657 :
658 0 : pausable_failpoint!("sk-delete-timeline-remote-pause");
659 :
660 0 : fail::fail_point!("sk-delete-timeline-remote", |_| {
661 0 : Err(anyhow::anyhow!("failpoint: sk-delete-timeline-remote"))
662 0 : });
663 :
664 0 : let cancel = CancellationToken::new(); // not really used
665 0 : backoff::retry(
666 0 : || async {
667 : // Do list-delete in batch_size batches to make progress even if there are a lot of files.
668 : // Alternatively, we could make the remote storage list call return an iterator, but that is more complicated,
669 : // and I'm not sure deleting while iterating is well-defined in S3.
670 : loop {
671 0 : let files = storage
672 0 : .list(
673 0 : Some(&remote_path),
674 0 : ListingMode::NoDelimiter,
675 0 : Some(batch_size),
676 0 : &cancel,
677 0 : )
678 0 : .await?
679 : .keys
680 0 : .into_iter()
681 0 : .map(|o| o.key)
682 0 : .collect::<Vec<_>>();
683 0 : if files.is_empty() {
684 0 : return Ok(()); // done
685 0 : }
686 : // (at least) S3 results are sorted, so we can log min/max:
687 : // "List results are always returned in UTF-8 binary order."
688 0 : info!(
689 0 : "deleting batch of {} WAL segments [{}-{}]",
690 0 : files.len(),
691 0 : files.first().unwrap().object_name().unwrap_or(""),
692 0 : files.last().unwrap().object_name().unwrap_or("")
693 : );
694 0 : storage.delete_objects(&files, &cancel).await?;
695 : }
696 0 : },
697 : // consider TimeoutOrCancel::caused_by_cancel when using cancellation
698 : |_| false,
699 : 3,
700 : 10,
701 0 : "executing WAL segments deletion batch",
702 0 : &cancel,
703 : )
704 0 : .await
705 0 : .ok_or_else(|| anyhow::anyhow!("canceled"))
706 0 : .and_then(|x| x)?;
707 :
708 0 : Ok(())
709 0 : }
710 :
711 : /// Used by wal_backup_partial.
712 0 : pub async fn delete_objects(storage: &GenericRemoteStorage, paths: &[RemotePath]) -> Result<()> {
713 0 : let cancel = CancellationToken::new(); // not really used
714 0 : storage.delete_objects(paths, &cancel).await
715 0 : }
716 :
717 : /// Copy segments from one timeline to another. Used in copy_timeline.
718 0 : pub async fn copy_s3_segments(
719 0 : storage: &GenericRemoteStorage,
720 0 : wal_seg_size: usize,
721 0 : src_ttid: &TenantTimelineId,
722 0 : dst_ttid: &TenantTimelineId,
723 0 : from_segment: XLogSegNo,
724 0 : to_segment: XLogSegNo,
725 0 : ) -> Result<()> {
726 : const SEGMENTS_PROGRESS_REPORT_INTERVAL: u64 = 1024;
727 :
728 0 : let remote_dst_path = remote_timeline_path(dst_ttid)?;
729 :
730 0 : let cancel = CancellationToken::new();
731 :
732 0 : let files = storage
733 0 : .list(
734 0 : Some(&remote_dst_path),
735 0 : ListingMode::NoDelimiter,
736 0 : None,
737 0 : &cancel,
738 0 : )
739 0 : .await?
740 : .keys;
741 :
742 0 : let uploaded_segments = &files
743 0 : .iter()
744 0 : .filter_map(|o| o.key.object_name().map(ToOwned::to_owned))
745 0 : .collect::<HashSet<_>>();
746 :
747 0 : debug!(
748 0 : "these segments have already been uploaded: {:?}",
749 : uploaded_segments
750 : );
751 :
752 0 : for segno in from_segment..to_segment {
753 0 : if segno % SEGMENTS_PROGRESS_REPORT_INTERVAL == 0 {
754 0 : info!("copied all segments from {} until {}", from_segment, segno);
755 0 : }
756 :
757 0 : let segment_name = XLogFileName(PG_TLI, segno, wal_seg_size);
758 0 : if uploaded_segments.contains(&segment_name) {
759 0 : continue;
760 0 : }
761 0 : debug!("copying segment {}", segment_name);
762 :
763 0 : let from = remote_timeline_path(src_ttid)?.join(&segment_name);
764 0 : let to = remote_dst_path.join(&segment_name);
765 :
766 0 : storage.copy_object(&from, &to, &cancel).await?;
767 : }
768 :
769 0 : info!(
770 0 : "finished copying segments from {} until {}",
771 : from_segment, to_segment
772 : );
773 0 : Ok(())
774 0 : }
775 :
776 : /// Get S3 (remote_storage) prefix path used for timeline files.
777 14 : pub fn remote_timeline_path(ttid: &TenantTimelineId) -> Result<RemotePath> {
778 14 : RemotePath::new(&Utf8Path::new(&ttid.tenant_id.to_string()).join(ttid.timeline_id.to_string()))
779 14 : }