Line data Source code
1 : use anyhow::{Context, Result};
2 :
3 : use camino::{Utf8Path, Utf8PathBuf};
4 : use futures::stream::FuturesOrdered;
5 : use futures::StreamExt;
6 : use tokio::task::JoinHandle;
7 : use tokio_util::sync::CancellationToken;
8 : use utils::backoff;
9 : use utils::id::NodeId;
10 :
11 : use std::cmp::min;
12 : use std::collections::{HashMap, HashSet};
13 : use std::num::NonZeroU32;
14 : use std::pin::Pin;
15 : use std::sync::Arc;
16 : use std::time::Duration;
17 :
18 : use postgres_ffi::v14::xlog_utils::XLogSegNoOffsetToRecPtr;
19 : use postgres_ffi::XLogFileName;
20 : use postgres_ffi::{XLogSegNo, PG_TLI};
21 : use remote_storage::{GenericRemoteStorage, ListingMode, RemotePath, StorageMetadata};
22 : use tokio::fs::File;
23 :
24 : use tokio::select;
25 : use tokio::sync::mpsc::{self, Receiver, Sender};
26 : use tokio::sync::watch;
27 : use tokio::time::sleep;
28 : use tracing::*;
29 :
30 : use utils::{id::TenantTimelineId, lsn::Lsn};
31 :
32 : use crate::metrics::{BACKED_UP_SEGMENTS, BACKUP_ERRORS};
33 : use crate::timeline::{PeerInfo, Timeline};
34 : use crate::{GlobalTimelines, SafeKeeperConf};
35 :
36 : use once_cell::sync::OnceCell;
37 :
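 : // Bounds for the retry delay after a failed upload: the delay grows roughly
 : // exponentially per consecutive failure and is capped at the maximum
 : // (see `WalBackupTask::run`).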
38 : const UPLOAD_FAILURE_RETRY_MIN_MS: u64 = 10;
39 : const UPLOAD_FAILURE_RETRY_MAX_MS: u64 = 5000;
40 :
41 : /// Default buffer size when interfacing with [`tokio::fs::File`].
42 : const BUFFER_SIZE: usize = 32 * 1024;
43 :
44 : /// Check whether WAL backup is required for the timeline. If yes, mark that the
45 : /// launcher is aware of the current status and return the timeline.
46 0 : async fn is_wal_backup_required(ttid: TenantTimelineId) -> Option<Arc<Timeline>> {
47 0 : match GlobalTimelines::get(ttid).ok() {
48 0 : Some(tli) => {
49 0 : tli.wal_backup_attend().await;
50 0 : Some(tli)
51 : }
52 0 : None => None,
53 : }
54 0 : }
55 :
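 : /// Handle for a running per-timeline backup task: a channel to ask it to shut
 : /// down plus the join handle used to await (and reap panics of) the task.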
56 : struct WalBackupTaskHandle {
57 : shutdown_tx: Sender<()>,
58 : handle: JoinHandle<()>,
59 : }
60 :
61 : struct WalBackupTimelineEntry {
62 : timeline: Arc<Timeline>,
63 : handle: Option<WalBackupTaskHandle>,
64 : }
65 :
66 0 : async fn shut_down_task(ttid: TenantTimelineId, entry: &mut WalBackupTimelineEntry) {
67 0 : if let Some(wb_handle) = entry.handle.take() {
68 : // Tell the task to shut down. An error means the task exited earlier, which is fine.
69 0 : let _ = wb_handle.shutdown_tx.send(()).await;
70 : // Await the task itself. TODO: restart panicked tasks earlier.
71 0 : if let Err(e) = wb_handle.handle.await {
72 0 : warn!("WAL backup task for {} panicked: {}", ttid, e);
73 0 : }
74 0 : }
75 0 : }
76 :
77 : /// The goal is to ensure that normally only one safekeeper offloads. However,
78 : /// it is fine (and inevitable, as s3 doesn't provide CAS) that for some short
79 : /// time several of them do, as they PUT the same files. Also,
80 : /// - frequently changing the offloader would be bad;
81 : /// - electing a seriously lagging safekeeper is undesirable;
82 : /// so we deterministically choose among the reasonably caught-up candidates.
83 : /// TODO: take failed attempts into account to handle the hypothetical situation
84 : /// where s3 is unreachable only for some safekeepers.
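 : ///
 : /// Illustrative example: if the caught-up peers, sorted by sk_id, are
 : /// [1, 5, 7] and timeline_id % 3 == 1, then node 5 is elected.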
85 0 : fn determine_offloader(
86 0 : alive_peers: &[PeerInfo],
87 0 : wal_backup_lsn: Lsn,
88 0 : ttid: TenantTimelineId,
89 0 : conf: &SafeKeeperConf,
90 0 : ) -> (Option<NodeId>, String) {
91 0 : // TODO: remove this once newly joined safekeepers are backfilled starting from backup_lsn.
92 0 : let capable_peers = alive_peers
93 0 : .iter()
94 0 : .filter(|p| p.local_start_lsn <= wal_backup_lsn);
95 0 : match capable_peers.clone().map(|p| p.commit_lsn).max() {
96 0 : None => (None, "no connected peers to elect from".to_string()),
97 0 : Some(max_commit_lsn) => {
98 0 : let threshold = max_commit_lsn
99 0 : .checked_sub(conf.max_offloader_lag_bytes)
100 0 : .unwrap_or(Lsn(0));
101 0 : let mut caughtup_peers = capable_peers
102 0 : .clone()
103 0 : .filter(|p| p.commit_lsn >= threshold)
104 0 : .collect::<Vec<_>>();
105 0 : caughtup_peers.sort_by(|p1, p2| p1.sk_id.cmp(&p2.sk_id));
106 0 :
107 0 : // To distribute the load, shift by timeline_id.
108 0 : let offloader = caughtup_peers
109 0 : [(u128::from(ttid.timeline_id) % caughtup_peers.len() as u128) as usize]
110 0 : .sk_id;
111 0 :
112 0 : let mut capable_peers_dbg = capable_peers
113 0 : .map(|p| (p.sk_id, p.commit_lsn))
114 0 : .collect::<Vec<_>>();
115 0 : capable_peers_dbg.sort_by(|p1, p2| p1.0.cmp(&p2.0));
116 0 : (
117 0 : Some(offloader),
118 0 : format!(
119 0 : "elected {} among {:?} peers, with {} of them being caughtup",
120 0 : offloader,
121 0 : capable_peers_dbg,
122 0 : caughtup_peers.len()
123 0 : ),
124 0 : )
125 : }
126 : }
127 0 : }
128 :
129 : /// Based on peer information, determine which safekeeper should offload. If it
130 : /// is us, start the per-timeline task unless it is already running; if it is not
131 : /// us but the task is running, stop it.
132 0 : async fn update_task(
133 0 : conf: &SafeKeeperConf,
134 0 : ttid: TenantTimelineId,
135 0 : entry: &mut WalBackupTimelineEntry,
136 0 : ) {
137 0 : let alive_peers = entry.timeline.get_peers(conf).await;
138 0 : let wal_backup_lsn = entry.timeline.get_wal_backup_lsn().await;
139 0 : let (offloader, election_dbg_str) =
140 0 : determine_offloader(&alive_peers, wal_backup_lsn, ttid, conf);
141 0 : let elected_me = Some(conf.my_id) == offloader;
142 0 :
143 0 : if elected_me != (entry.handle.is_some()) {
144 0 : if elected_me {
145 0 : info!("elected for backup: {}", election_dbg_str);
146 :
147 0 : let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
148 0 : let timeline_dir = conf.timeline_dir(&ttid);
149 0 :
150 0 : let handle = tokio::spawn(
151 0 : backup_task_main(
152 0 : ttid,
153 0 : timeline_dir,
154 0 : conf.workdir.clone(),
155 0 : conf.backup_parallel_jobs,
156 0 : shutdown_rx,
157 0 : )
158 0 : .in_current_span(),
159 0 : );
160 0 :
161 0 : entry.handle = Some(WalBackupTaskHandle {
162 0 : shutdown_tx,
163 0 : handle,
164 0 : });
165 : } else {
166 0 : info!("stepping down from backup: {}", election_dbg_str);
167 0 : shut_down_task(ttid, entry).await;
168 : }
169 0 : }
170 0 : }
171 :
172 : static REMOTE_STORAGE: OnceCell<Option<GenericRemoteStorage>> = OnceCell::new();
173 :
174 : // Storage must be configured and initialized when this is called.
175 0 : fn get_configured_remote_storage() -> &'static GenericRemoteStorage {
176 0 : REMOTE_STORAGE
177 0 : .get()
178 0 : .expect("failed to get remote storage")
179 0 : .as_ref()
180 0 : .unwrap()
181 0 : }
182 :
183 0 : pub fn init_remote_storage(conf: &SafeKeeperConf) {
184 0 : // TODO: refactor REMOTE_STORAGE to avoid using global variables, and provide
185 0 : // dependencies to all tasks instead.
186 0 : REMOTE_STORAGE.get_or_init(|| {
187 0 : conf.remote_storage
188 0 : .as_ref()
189 0 : .map(|c| GenericRemoteStorage::from_config(c).expect("failed to create remote storage"))
190 0 : });
191 0 : }
192 :
193 : const CHECK_TASKS_INTERVAL_MSEC: u64 = 1000;
194 :
195 : /// Sits on wal_backup_launcher_rx and starts/stops per-timeline WAL backup
196 : /// tasks. Having this in a separate task simplifies locking, allows reaping
197 : /// panics, and separates elections from offloading itself.
198 0 : pub async fn wal_backup_launcher_task_main(
199 0 : conf: SafeKeeperConf,
200 0 : mut wal_backup_launcher_rx: Receiver<TenantTimelineId>,
201 0 : ) -> anyhow::Result<()> {
202 0 : info!(
203 0 : "WAL backup launcher started, remote config {:?}",
204 : conf.remote_storage
205 : );
206 :
207 : // Presence in this map means the launcher is aware that s3 offloading is
208 : // needed for the timeline, but the task is started only if it makes sense
209 : // to offload from this safekeeper.
210 0 : let mut tasks: HashMap<TenantTimelineId, WalBackupTimelineEntry> = HashMap::new();
211 0 :
212 0 : let mut ticker = tokio::time::interval(Duration::from_millis(CHECK_TASKS_INTERVAL_MSEC));
213 0 : loop {
214 0 : tokio::select! {
215 : ttid = wal_backup_launcher_rx.recv() => {
216 : // channel is never expected to get closed
217 : let ttid = ttid.unwrap();
218 : if !conf.is_wal_backup_enabled() {
219 : continue; /* just drain the channel and do nothing */
220 : }
221 0 : async {
222 0 : let timeline = is_wal_backup_required(ttid).await;
223 : // do we need to do anything at all?
224 0 : if timeline.is_some() != tasks.contains_key(&ttid) {
225 0 : if let Some(timeline) = timeline {
226 : // need to start the task
227 0 : let entry = tasks.entry(ttid).or_insert(WalBackupTimelineEntry {
228 0 : timeline,
229 0 : handle: None,
230 0 : });
231 0 : update_task(&conf, ttid, entry).await;
232 : } else {
233 : // need to stop the task
234 0 : info!("stopping WAL backup task");
235 0 : let mut entry = tasks.remove(&ttid).unwrap();
236 0 : shut_down_task(ttid, &mut entry).await;
237 : }
238 0 : }
239 0 : }.instrument(info_span!("WAL backup", ttid = %ttid)).await;
240 : }
241 : // For each timeline needing offloading, check if this safekeeper
242 : // should do the job and start/stop the task accordingly.
243 : _ = ticker.tick() => {
244 : for (ttid, entry) in tasks.iter_mut() {
245 : update_task(&conf, *ttid, entry)
246 : .instrument(info_span!("WAL backup", ttid = %ttid))
247 : .await;
248 : }
249 : }
250 0 : }
251 0 : }
252 : }
253 :
254 : struct WalBackupTask {
255 : timeline: Arc<Timeline>,
256 : timeline_dir: Utf8PathBuf,
257 : workspace_dir: Utf8PathBuf,
258 : wal_seg_size: usize,
259 : parallel_jobs: usize,
260 : commit_lsn_watch_rx: watch::Receiver<Lsn>,
261 : }
262 :
263 : /// Offload a single timeline.
264 0 : async fn backup_task_main(
265 0 : ttid: TenantTimelineId,
266 0 : timeline_dir: Utf8PathBuf,
267 0 : workspace_dir: Utf8PathBuf,
268 0 : parallel_jobs: usize,
269 0 : mut shutdown_rx: Receiver<()>,
270 0 : ) {
271 0 : info!("started");
272 0 : let res = GlobalTimelines::get(ttid);
273 0 : if let Err(e) = res {
274 0 : error!("backup error: {}", e);
275 0 : return;
276 0 : }
277 0 : let tli = res.unwrap();
278 :
279 0 : let mut wb = WalBackupTask {
280 0 : wal_seg_size: tli.get_wal_seg_size().await,
281 0 : commit_lsn_watch_rx: tli.get_commit_lsn_watch_rx(),
282 0 : timeline: tli,
283 0 : timeline_dir,
284 0 : workspace_dir,
285 0 : parallel_jobs,
286 0 : };
287 0 :
288 0 : // the task is spun up only when wal_seg_size is already initialized
289 0 : assert!(wb.wal_seg_size > 0);
290 :
291 0 : let mut canceled = false;
292 : select! {
293 : _ = wb.run() => {}
294 : _ = shutdown_rx.recv() => {
295 : canceled = true;
296 : }
297 : }
298 0 : info!("task {}", if canceled { "canceled" } else { "terminated" });
299 0 : }
300 :
301 : impl WalBackupTask {
302 0 : async fn run(&mut self) {
303 0 : let mut backup_lsn = Lsn(0);
304 0 :
305 0 : let mut retry_attempt = 0u32;
306 : // offload loop
307 0 : loop {
308 0 : if retry_attempt == 0 {
309 : // wait for new WAL to arrive
310 0 : if let Err(e) = self.commit_lsn_watch_rx.changed().await {
311 : // should never happen, as we hold an Arc to the timeline.
312 0 : error!("commit_lsn watch shut down: {:?}", e);
313 0 : return;
314 0 : }
315 : } else {
316 : // or just sleep if we errored previously
317 0 : let mut retry_delay = UPLOAD_FAILURE_RETRY_MAX_MS;
318 0 : if let Some(backoff_delay) = UPLOAD_FAILURE_RETRY_MIN_MS.checked_shl(retry_attempt)
319 0 : {
320 0 : retry_delay = min(retry_delay, backoff_delay);
321 0 : }
322 0 : sleep(Duration::from_millis(retry_delay)).await;
323 : }
324 :
325 0 : let commit_lsn = *self.commit_lsn_watch_rx.borrow();
326 0 :
327 0 : // Note that backup_lsn can be higher than commit_lsn if we
328 0 : // don't have much local WAL and others have already uploaded
329 0 : // segments we don't even have.
330 0 : if backup_lsn.segment_number(self.wal_seg_size)
331 0 : >= commit_lsn.segment_number(self.wal_seg_size)
332 : {
333 0 : retry_attempt = 0;
334 0 : continue; /* nothing to do, common case as we wake up on every commit_lsn bump */
335 0 : }
336 0 : // Perhaps peers advanced the position, check shmem value.
337 0 : backup_lsn = self.timeline.get_wal_backup_lsn().await;
338 0 : if backup_lsn.segment_number(self.wal_seg_size)
339 0 : >= commit_lsn.segment_number(self.wal_seg_size)
340 : {
341 0 : retry_attempt = 0;
342 0 : continue;
343 0 : }
344 0 :
345 0 : match backup_lsn_range(
346 0 : &self.timeline,
347 0 : &mut backup_lsn,
348 0 : commit_lsn,
349 0 : self.wal_seg_size,
350 0 : &self.timeline_dir,
351 0 : &self.workspace_dir,
352 0 : self.parallel_jobs,
353 0 : )
354 0 : .await
355 : {
356 0 : Ok(()) => {
357 0 : retry_attempt = 0;
358 0 : }
359 0 : Err(e) => {
360 0 : error!(
361 0 : "failed while offloading range {}-{}: {:?}",
362 : backup_lsn, commit_lsn, e
363 : );
364 :
365 0 : retry_attempt = retry_attempt.saturating_add(1);
366 : }
367 : }
368 : }
369 0 : }
370 : }
371 :
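 : /// Upload the segments between `backup_lsn` and `end_lsn` with up to
 : /// `parallel_jobs` concurrent uploads; `backup_lsn` (and the timeline's
 : /// wal_backup_lsn) is advanced in order as each segment finishes.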
372 0 : async fn backup_lsn_range(
373 0 : timeline: &Arc<Timeline>,
374 0 : backup_lsn: &mut Lsn,
375 0 : end_lsn: Lsn,
376 0 : wal_seg_size: usize,
377 0 : timeline_dir: &Utf8Path,
378 0 : workspace_dir: &Utf8Path,
379 0 : parallel_jobs: usize,
380 0 : ) -> Result<()> {
381 0 : if parallel_jobs < 1 {
382 0 : anyhow::bail!("parallel_jobs must be >= 1");
383 0 : }
384 0 :
385 0 : let start_lsn = *backup_lsn;
386 0 : let segments = get_segments(start_lsn, end_lsn, wal_seg_size);
387 0 :
388 0 : // Pool of concurrent upload tasks. We use `FuturesOrdered` to
389 0 : // preserve order of uploads, and update `backup_lsn` only after
390 0 : // all previous uploads are finished.
391 0 : let mut uploads = FuturesOrdered::new();
392 0 : let mut iter = segments.iter();
393 :
394 : loop {
395 0 : let added_task = match iter.next() {
396 0 : Some(s) => {
397 0 : uploads.push_back(backup_single_segment(s, timeline_dir, workspace_dir));
398 0 : true
399 : }
400 0 : None => false,
401 : };
402 :
403 : // Wait for the oldest in-flight upload to finish if there are no more segments
404 : // to enqueue, or if we already have the maximum number of concurrent uploads.
405 0 : if !added_task || uploads.len() >= parallel_jobs {
406 0 : let next = uploads.next().await;
407 0 : if let Some(res) = next {
408 : // next segment uploaded
409 0 : let segment = res?;
410 0 : let new_backup_lsn = segment.end_lsn;
411 0 : timeline
412 0 : .set_wal_backup_lsn(new_backup_lsn)
413 0 : .await
414 0 : .context("setting wal_backup_lsn")?;
415 0 : *backup_lsn = new_backup_lsn;
416 : } else {
417 : // no more segments to upload
418 0 : break;
419 : }
420 0 : }
421 : }
422 :
423 0 : info!(
424 0 : "offloaded segnos {:?} up to {}, previous backup_lsn {}",
425 0 : segments.iter().map(|&s| s.seg_no).collect::<Vec<_>>(),
426 : end_lsn,
427 : start_lsn,
428 : );
429 0 : Ok(())
430 0 : }
431 :
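 : /// Upload one segment file to remote storage; the remote path is the segment's
 : /// path relative to the workspace directory.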
432 0 : async fn backup_single_segment(
433 0 : seg: &Segment,
434 0 : timeline_dir: &Utf8Path,
435 0 : workspace_dir: &Utf8Path,
436 0 : ) -> Result<Segment> {
437 0 : let segment_file_path = seg.file_path(timeline_dir)?;
438 0 : let remote_segment_path = segment_file_path
439 0 : .strip_prefix(workspace_dir)
440 0 : .context("Failed to strip workspace dir prefix")
441 0 : .and_then(RemotePath::new)
442 0 : .with_context(|| {
443 0 : format!(
444 0 : "Failed to resolve remote part of path {segment_file_path:?} for base {workspace_dir:?}",
445 0 : )
446 0 : })?;
447 :
448 0 : let res = backup_object(&segment_file_path, &remote_segment_path, seg.size()).await;
449 0 : if res.is_ok() {
450 0 : BACKED_UP_SEGMENTS.inc();
451 0 : } else {
452 0 : BACKUP_ERRORS.inc();
453 0 : }
454 0 : res?;
455 0 : debug!("Backup of {} done", segment_file_path);
456 :
457 0 : Ok(*seg)
458 0 : }
459 :
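 : /// A WAL segment scheduled for upload: its segment number and the
 : /// [start_lsn, end_lsn) range it covers.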
460 : #[derive(Debug, Copy, Clone)]
461 : pub struct Segment {
462 : seg_no: XLogSegNo,
463 : start_lsn: Lsn,
464 : end_lsn: Lsn,
465 : }
466 :
467 : impl Segment {
468 0 : pub fn new(seg_no: u64, start_lsn: Lsn, end_lsn: Lsn) -> Self {
469 0 : Self {
470 0 : seg_no,
471 0 : start_lsn,
472 0 : end_lsn,
473 0 : }
474 0 : }
475 :
476 0 : pub fn object_name(self) -> String {
477 0 : XLogFileName(PG_TLI, self.seg_no, self.size())
478 0 : }
479 :
480 0 : pub fn file_path(self, timeline_dir: &Utf8Path) -> Result<Utf8PathBuf> {
481 0 : Ok(timeline_dir.join(self.object_name()))
482 0 : }
483 :
484 0 : pub fn size(self) -> usize {
485 0 : (u64::from(self.end_lsn) - u64::from(self.start_lsn)) as usize
486 0 : }
487 : }
488 :
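 : /// List segments from the one containing `start` up to, but not including, the
 : /// one containing `end`. For example, with 16 MiB segments, start 0/1000000 and
 : /// end 0/3000000 yield segment numbers 1 and 2.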
489 0 : fn get_segments(start: Lsn, end: Lsn, seg_size: usize) -> Vec<Segment> {
490 0 : let first_seg = start.segment_number(seg_size);
491 0 : let last_seg = end.segment_number(seg_size);
492 0 :
493 0 : let res: Vec<Segment> = (first_seg..last_seg)
494 0 : .map(|s| {
495 0 : let start_lsn = XLogSegNoOffsetToRecPtr(s, 0, seg_size);
496 0 : let end_lsn = XLogSegNoOffsetToRecPtr(s + 1, 0, seg_size);
497 0 : Segment::new(s, Lsn::from(start_lsn), Lsn::from(end_lsn))
498 0 : })
499 0 : .collect();
500 0 : res
501 0 : }
502 :
503 0 : async fn backup_object(
504 0 : source_file: &Utf8Path,
505 0 : target_file: &RemotePath,
506 0 : size: usize,
507 0 : ) -> Result<()> {
508 0 : let storage = get_configured_remote_storage();
509 :
510 0 : let file = File::open(&source_file)
511 0 : .await
512 0 : .with_context(|| format!("Failed to open file {source_file:?} for wal backup"))?;
513 :
514 0 : let file = tokio_util::io::ReaderStream::with_capacity(file, BUFFER_SIZE);
515 0 :
516 0 : let cancel = CancellationToken::new();
517 0 :
518 0 : storage
519 0 : .upload_storage_object(file, size, target_file, &cancel)
520 0 : .await
521 0 : }
522 :
523 0 : pub(crate) async fn backup_partial_segment(
524 0 : source_file: &Utf8Path,
525 0 : target_file: &RemotePath,
526 0 : size: usize,
527 0 : ) -> Result<()> {
528 0 : let storage = get_configured_remote_storage();
529 :
530 0 : let file = File::open(&source_file)
531 0 : .await
532 0 : .with_context(|| format!("Failed to open file {source_file:?} for wal backup"))?;
533 :
534 : // limit the reader to the first `size` bytes of the file
535 0 : let limited_file = tokio::io::AsyncReadExt::take(file, size as u64);
536 0 :
537 0 : let file = tokio_util::io::ReaderStream::with_capacity(limited_file, BUFFER_SIZE);
538 0 :
539 0 : let cancel = CancellationToken::new();
540 0 :
541 0 : storage
542 0 : .upload(
543 0 : file,
544 0 : size,
545 0 : target_file,
546 0 : Some(StorageMetadata::from([("sk_type", "partial_segment")])),
547 0 : &cancel,
548 0 : )
549 0 : .await
550 0 : }
551 :
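 : /// Open a buffered streaming reader over a WAL segment object in remote
 : /// storage, starting at `offset`.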
552 0 : pub async fn read_object(
553 0 : file_path: &RemotePath,
554 0 : offset: u64,
555 0 : ) -> anyhow::Result<Pin<Box<dyn tokio::io::AsyncRead + Send + Sync>>> {
556 0 : let storage = REMOTE_STORAGE
557 0 : .get()
558 0 : .context("Failed to get remote storage")?
559 0 : .as_ref()
560 0 : .context("No remote storage configured")?;
561 :
562 0 : info!("segment download about to start from remote path {file_path:?} at offset {offset}");
563 :
564 0 : let cancel = CancellationToken::new();
565 :
566 0 : let download = storage
567 0 : .download_storage_object(Some((offset, None)), file_path, &cancel)
568 0 : .await
569 0 : .with_context(|| {
570 0 : format!("Failed to open WAL segment download stream for remote path {file_path:?}")
571 0 : })?;
572 :
573 0 : let reader = tokio_util::io::StreamReader::new(download.download_stream);
574 0 :
575 0 : let reader = tokio::io::BufReader::with_capacity(BUFFER_SIZE, reader);
576 0 :
577 0 : Ok(Box::pin(reader))
578 0 : }
579 :
580 : /// Delete WAL files for the given timeline. Remote storage must be configured
581 : /// when called.
582 0 : pub async fn delete_timeline(ttid: &TenantTimelineId) -> Result<()> {
583 0 : let storage = get_configured_remote_storage();
584 0 : let ttid_path = Utf8Path::new(&ttid.tenant_id.to_string()).join(ttid.timeline_id.to_string());
585 0 : let remote_path = RemotePath::new(&ttid_path)?;
586 :
587 : // see DEFAULT_MAX_KEYS_PER_LIST_RESPONSE
588 : // Option::unwrap is not const-stable yet, otherwise this would be a const.
589 0 : let batch_size: NonZeroU32 = NonZeroU32::new(1000).unwrap();
590 0 :
591 0 : // A backoff::retry is used here for two reasons:
592 0 : // - To provide a backoff rather than busy-polling the API on errors
593 0 : // - To absorb transient 429/503 conditions without hitting our error
594 0 : // logging path for issues deleting objects.
595 0 : //
596 0 : // Note: listing segments might take a long time if there are many of them.
597 0 : // We don't currently have timeout/cancellation for HTTP requests; if/once we
598 0 : // do, listing should get a streaming interface so it can make progress.
599 0 :
600 0 : let cancel = CancellationToken::new(); // not really used
601 0 : backoff::retry(
602 0 : || async {
603 0 : // Do list-delete in batch_size batches to make progress even if there are a lot of files.
604 0 : // Alternatively we could make the remote storage list return an iterator, but it is more
605 0 : // complicated and I'm not sure deleting while iterating is expected to work in s3.
606 0 : loop {
607 0 : let files = storage
608 0 : .list(
609 0 : Some(&remote_path),
610 0 : ListingMode::NoDelimiter,
611 0 : Some(batch_size),
612 0 : &cancel,
613 0 : )
614 0 : .await?
615 0 : .keys;
616 0 : if files.is_empty() {
617 0 : return Ok(()); // done
618 0 : }
619 0 : // (at least) s3 results are sorted, so we can log min/max:
620 0 : // "List results are always returned in UTF-8 binary order."
621 0 : info!(
622 0 : "deleting batch of {} WAL segments [{}-{}]",
623 0 : files.len(),
624 0 : files.first().unwrap().object_name().unwrap_or(""),
625 0 : files.last().unwrap().object_name().unwrap_or("")
626 0 : );
627 0 : storage.delete_objects(&files, &cancel).await?;
628 0 : }
629 0 : },
630 0 : // consider TimeoutOrCancel::caused_by_cancel when using cancellation
631 0 : |_| false,
632 0 : 3,
633 0 : 10,
634 0 : "executing WAL segments deletion batch",
635 0 : &cancel,
636 0 : )
637 0 : .await
638 0 : .ok_or_else(|| anyhow::anyhow!("canceled"))
639 0 : .and_then(|x| x)?;
640 :
641 0 : Ok(())
642 0 : }
643 :
644 : /// Used by wal_backup_partial.
645 0 : pub async fn delete_objects(paths: &[RemotePath]) -> Result<()> {
646 0 : let cancel = CancellationToken::new(); // not really used
647 0 : let storage = get_configured_remote_storage();
648 0 : storage.delete_objects(paths, &cancel).await
649 0 : }
650 :
651 : /// Copy segments from one timeline to another. Used in copy_timeline.
652 0 : pub async fn copy_s3_segments(
653 0 : wal_seg_size: usize,
654 0 : src_ttid: &TenantTimelineId,
655 0 : dst_ttid: &TenantTimelineId,
656 0 : from_segment: XLogSegNo,
657 0 : to_segment: XLogSegNo,
658 0 : ) -> Result<()> {
659 0 : const SEGMENTS_PROGRESS_REPORT_INTERVAL: u64 = 1024;
660 0 :
661 0 : let storage = REMOTE_STORAGE
662 0 : .get()
663 0 : .expect("failed to get remote storage")
664 0 : .as_ref()
665 0 : .unwrap();
666 0 :
667 0 : let relative_dst_path =
668 0 : Utf8Path::new(&dst_ttid.tenant_id.to_string()).join(dst_ttid.timeline_id.to_string());
669 :
670 0 : let remote_path = RemotePath::new(&relative_dst_path)?;
671 :
672 0 : let cancel = CancellationToken::new();
673 :
674 0 : let files = storage
675 0 : .list(Some(&remote_path), ListingMode::NoDelimiter, None, &cancel)
676 0 : .await?
677 : .keys;
678 :
679 0 : let uploaded_segments = &files
680 0 : .iter()
681 0 : .filter_map(|file| file.object_name().map(ToOwned::to_owned))
682 0 : .collect::<HashSet<_>>();
683 0 :
684 0 : debug!(
685 0 : "these segments have already been uploaded: {:?}",
686 : uploaded_segments
687 : );
688 :
689 0 : let relative_src_path =
690 0 : Utf8Path::new(&src_ttid.tenant_id.to_string()).join(src_ttid.timeline_id.to_string());
691 :
692 0 : for segno in from_segment..to_segment {
693 0 : if segno % SEGMENTS_PROGRESS_REPORT_INTERVAL == 0 {
694 0 : info!("copied all segments from {} until {}", from_segment, segno);
695 0 : }
696 :
697 0 : let segment_name = XLogFileName(PG_TLI, segno, wal_seg_size);
698 0 : if uploaded_segments.contains(&segment_name) {
699 0 : continue;
700 0 : }
701 0 : debug!("copying segment {}", segment_name);
702 :
703 0 : let from = RemotePath::new(&relative_src_path.join(&segment_name))?;
704 0 : let to = RemotePath::new(&relative_dst_path.join(&segment_name))?;
705 :
706 0 : storage.copy_object(&from, &to, &cancel).await?;
707 : }
708 :
709 0 : info!(
710 0 : "finished copying segments from {} until {}",
711 : from_segment, to_segment
712 : );
713 0 : Ok(())
714 0 : }