use anyhow::{Context, Result};

use camino::{Utf8Path, Utf8PathBuf};
use futures::stream::FuturesOrdered;
use futures::StreamExt;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use utils::backoff;
use utils::id::NodeId;

use std::cmp::min;
use std::collections::{HashMap, HashSet};
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use postgres_ffi::v14::xlog_utils::XLogSegNoOffsetToRecPtr;
use postgres_ffi::XLogFileName;
use postgres_ffi::{XLogSegNo, PG_TLI};
use remote_storage::{GenericRemoteStorage, RemotePath};
use tokio::fs::File;

use tokio::select;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::sync::watch;
use tokio::time::sleep;
use tracing::*;

use utils::{id::TenantTimelineId, lsn::Lsn};

use crate::metrics::{BACKED_UP_SEGMENTS, BACKUP_ERRORS};
use crate::timeline::{PeerInfo, Timeline};
use crate::{GlobalTimelines, SafeKeeperConf};

use once_cell::sync::OnceCell;

const UPLOAD_FAILURE_RETRY_MIN_MS: u64 = 10;
const UPLOAD_FAILURE_RETRY_MAX_MS: u64 = 5000;

/// Default buffer size when interfacing with [`tokio::fs::File`].
const BUFFER_SIZE: usize = 32 * 1024;

/// Check whether WAL backup is required for the timeline. If yes, mark that the
/// launcher is aware of the current status and return the timeline.
async fn is_wal_backup_required(ttid: TenantTimelineId) -> Option<Arc<Timeline>> {
    match GlobalTimelines::get(ttid).ok() {
        Some(tli) => {
            tli.wal_backup_attend().await;
            Some(tli)
        }
        None => None,
    }
}

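/// Handle to a running per-timeline WAL backup task: a channel used to request
/// shutdown and the join handle used to await task exit (and observe panics).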
struct WalBackupTaskHandle {
    shutdown_tx: Sender<()>,
    handle: JoinHandle<()>,
}

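/// Launcher bookkeeping for a timeline that needs offloading; `handle` is `Some`
/// only while this safekeeper is the elected offloader and the task is running.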
struct WalBackupTimelineEntry {
    timeline: Arc<Timeline>,
    handle: Option<WalBackupTaskHandle>,
}

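/// Ask the backup task to shut down and wait for it to exit.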
async fn shut_down_task(ttid: TenantTimelineId, entry: &mut WalBackupTimelineEntry) {
    if let Some(wb_handle) = entry.handle.take() {
        // Tell the task to shut down. An error means the task exited earlier; that's ok.
        let _ = wb_handle.shutdown_tx.send(()).await;
        // Await the task itself. TODO: restart panicked tasks earlier.
        if let Err(e) = wb_handle.handle.await {
            warn!("WAL backup task for {} panicked: {}", ttid, e);
        }
    }
}

/// The goal is to ensure that normally only one safekeeper offloads. However,
/// it is fine (and inevitable, as s3 doesn't provide CAS) that for some short
/// time we have several of them PUTting the same files. Also,
/// - frequently changing the offloader would be bad;
/// - electing a seriously lagging safekeeper is undesirable;
///
/// So we deterministically choose among the reasonably caught-up candidates.
/// TODO: take into account failed attempts to deal with the hypothetical
/// situation where s3 is unreachable only for some safekeepers.
fn determine_offloader(
    alive_peers: &[PeerInfo],
    wal_backup_lsn: Lsn,
    ttid: TenantTimelineId,
    conf: &SafeKeeperConf,
) -> (Option<NodeId>, String) {
    // TODO: remove this once we fill newly joined safekeepers since backup_lsn.
    let capable_peers = alive_peers
        .iter()
        .filter(|p| p.local_start_lsn <= wal_backup_lsn);
    match capable_peers.clone().map(|p| p.commit_lsn).max() {
        None => (None, "no connected peers to elect from".to_string()),
        Some(max_commit_lsn) => {
            let threshold = max_commit_lsn
                .checked_sub(conf.max_offloader_lag_bytes)
                .unwrap_or(Lsn(0));
            let mut caughtup_peers = capable_peers
                .clone()
                .filter(|p| p.commit_lsn >= threshold)
                .collect::<Vec<_>>();
            caughtup_peers.sort_by(|p1, p2| p1.sk_id.cmp(&p2.sk_id));

            // To distribute the load, shift by timeline_id.
            let offloader = caughtup_peers
                [(u128::from(ttid.timeline_id) % caughtup_peers.len() as u128) as usize]
                .sk_id;

            let mut capable_peers_dbg = capable_peers
                .map(|p| (p.sk_id, p.commit_lsn))
                .collect::<Vec<_>>();
            capable_peers_dbg.sort_by(|p1, p2| p1.0.cmp(&p2.0));
            (
                Some(offloader),
                format!(
                    "elected {} among {:?} peers, with {} of them being caughtup",
                    offloader,
                    capable_peers_dbg,
                    caughtup_peers.len()
                ),
            )
        }
    }
}

/// Based on peer information, determine which safekeeper should offload. If it
/// is me and the per-timeline task is not running yet, start it; if it is not
/// me and the task is running, kill it.
async fn update_task(
    conf: &SafeKeeperConf,
    ttid: TenantTimelineId,
    entry: &mut WalBackupTimelineEntry,
) {
    let alive_peers = entry.timeline.get_peers(conf).await;
    let wal_backup_lsn = entry.timeline.get_wal_backup_lsn().await;
    let (offloader, election_dbg_str) =
        determine_offloader(&alive_peers, wal_backup_lsn, ttid, conf);
    let elected_me = Some(conf.my_id) == offloader;

    if elected_me != (entry.handle.is_some()) {
        if elected_me {
            info!("elected for backup: {}", election_dbg_str);

            let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
            let timeline_dir = conf.timeline_dir(&ttid);

            let handle = tokio::spawn(
                backup_task_main(
                    ttid,
                    timeline_dir,
                    conf.workdir.clone(),
                    conf.backup_parallel_jobs,
                    shutdown_rx,
                )
                .in_current_span(),
            );

            entry.handle = Some(WalBackupTaskHandle {
                shutdown_tx,
                handle,
            });
        } else {
            info!("stepping down from backup: {}", election_dbg_str);
            shut_down_task(ttid, entry).await;
        }
    }
}

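/// Lazily initialized remote storage client; the inner `Option` is `None` when
/// no remote storage is configured.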
static REMOTE_STORAGE: OnceCell<Option<GenericRemoteStorage>> = OnceCell::new();

// Storage must be configured and initialized when this is called.
fn get_configured_remote_storage() -> &'static GenericRemoteStorage {
    REMOTE_STORAGE
        .get()
        .expect("failed to get remote storage")
        .as_ref()
        .unwrap()
}

const CHECK_TASKS_INTERVAL_MSEC: u64 = 1000;

/// Sits on wal_backup_launcher_rx and starts/stops per-timeline WAL backup
/// tasks. Having this in a separate task simplifies locking, allows reaping
/// panics, and keeps elections separate from the offloading itself.
pub async fn wal_backup_launcher_task_main(
    conf: SafeKeeperConf,
    mut wal_backup_launcher_rx: Receiver<TenantTimelineId>,
) -> anyhow::Result<()> {
    info!(
        "WAL backup launcher started, remote config {:?}",
        conf.remote_storage
    );

    let conf_ = conf.clone();
    REMOTE_STORAGE.get_or_init(|| {
        conf_
            .remote_storage
            .as_ref()
            .map(|c| GenericRemoteStorage::from_config(c).expect("failed to create remote storage"))
    });

    // Presence in this map means the launcher is aware that s3 offloading is
    // needed for the timeline, but the task is started only if it makes sense
    // to offload from this safekeeper.
    let mut tasks: HashMap<TenantTimelineId, WalBackupTimelineEntry> = HashMap::new();

    let mut ticker = tokio::time::interval(Duration::from_millis(CHECK_TASKS_INTERVAL_MSEC));
    loop {
        tokio::select! {
            ttid = wal_backup_launcher_rx.recv() => {
                // channel is never expected to get closed
                let ttid = ttid.unwrap();
                if !conf.is_wal_backup_enabled() {
                    continue; /* just drain the channel and do nothing */
                }
                async {
                    let timeline = is_wal_backup_required(ttid).await;
                    // do we need to do anything at all?
                    if timeline.is_some() != tasks.contains_key(&ttid) {
                        if let Some(timeline) = timeline {
                            // need to start the task
                            let entry = tasks.entry(ttid).or_insert(WalBackupTimelineEntry {
                                timeline,
                                handle: None,
                            });
                            update_task(&conf, ttid, entry).await;
                        } else {
                            // need to stop the task
                            info!("stopping WAL backup task");
                            let mut entry = tasks.remove(&ttid).unwrap();
                            shut_down_task(ttid, &mut entry).await;
                        }
                    }
                }.instrument(info_span!("WAL backup", ttid = %ttid)).await;
            }
            // For each timeline needing offloading, check if this safekeeper
            // should do the job and start/stop the task accordingly.
            _ = ticker.tick() => {
                for (ttid, entry) in tasks.iter_mut() {
                    update_task(&conf, *ttid, entry)
                        .instrument(info_span!("WAL backup", ttid = %ttid))
                        .await;
                }
            }
        }
    }
}

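/// State of the per-timeline offloading loop: the timeline, its local
/// directories, and a watch channel that wakes the loop when commit_lsn advances.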
struct WalBackupTask {
    timeline: Arc<Timeline>,
    timeline_dir: Utf8PathBuf,
    workspace_dir: Utf8PathBuf,
    wal_seg_size: usize,
    parallel_jobs: usize,
    commit_lsn_watch_rx: watch::Receiver<Lsn>,
}

/// Offload a single timeline.
async fn backup_task_main(
    ttid: TenantTimelineId,
    timeline_dir: Utf8PathBuf,
    workspace_dir: Utf8PathBuf,
    parallel_jobs: usize,
    mut shutdown_rx: Receiver<()>,
) {
    info!("started");
    let res = GlobalTimelines::get(ttid);
    if let Err(e) = res {
        error!("backup error: {}", e);
        return;
    }
    let tli = res.unwrap();

    let mut wb = WalBackupTask {
        wal_seg_size: tli.get_wal_seg_size().await,
        commit_lsn_watch_rx: tli.get_commit_lsn_watch_rx(),
        timeline: tli,
        timeline_dir,
        workspace_dir,
        parallel_jobs,
    };

    // the task is spun up only when wal_seg_size is already initialized
    assert!(wb.wal_seg_size > 0);

    let mut canceled = false;
    select! {
        _ = wb.run() => {}
        _ = shutdown_rx.recv() => {
            canceled = true;
        }
    }
    info!("task {}", if canceled { "canceled" } else { "terminated" });
}

impl WalBackupTask {
    async fn run(&mut self) {
        let mut backup_lsn = Lsn(0);

        let mut retry_attempt = 0u32;
        // offload loop
        loop {
            if retry_attempt == 0 {
                // wait for new WAL to arrive
                if let Err(e) = self.commit_lsn_watch_rx.changed().await {
                    // should never happen, as we hold an Arc to the timeline.
                    error!("commit_lsn watch shut down: {:?}", e);
                    return;
                }
            } else {
                // or just sleep if we errored previously: exponential backoff,
                // capped at UPLOAD_FAILURE_RETRY_MAX_MS
                let mut retry_delay = UPLOAD_FAILURE_RETRY_MAX_MS;
                if let Some(backoff_delay) = UPLOAD_FAILURE_RETRY_MIN_MS.checked_shl(retry_attempt)
                {
                    retry_delay = min(retry_delay, backoff_delay);
                }
                sleep(Duration::from_millis(retry_delay)).await;
            }

            let commit_lsn = *self.commit_lsn_watch_rx.borrow();

            // Note that backup_lsn can be higher than commit_lsn if we
            // don't have much local WAL and others already uploaded
            // segments we don't even have.
            if backup_lsn.segment_number(self.wal_seg_size)
                >= commit_lsn.segment_number(self.wal_seg_size)
            {
                retry_attempt = 0;
                continue; /* nothing to do, common case as we wake up on every commit_lsn bump */
            }
            // Perhaps peers advanced the position, check shmem value.
            backup_lsn = self.timeline.get_wal_backup_lsn().await;
            if backup_lsn.segment_number(self.wal_seg_size)
                >= commit_lsn.segment_number(self.wal_seg_size)
            {
                retry_attempt = 0;
                continue;
            }

            match backup_lsn_range(
                &self.timeline,
                &mut backup_lsn,
                commit_lsn,
                self.wal_seg_size,
                &self.timeline_dir,
                &self.workspace_dir,
                self.parallel_jobs,
            )
            .await
            {
                Ok(()) => {
                    retry_attempt = 0;
                }
                Err(e) => {
                    error!(
                        "failed while offloading range {}-{}: {:?}",
                        backup_lsn, commit_lsn, e
                    );

                    retry_attempt = retry_attempt.saturating_add(1);
                }
            }
        }
    }
}

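/// Upload all full segments between `backup_lsn` and `end_lsn` with up to
/// `parallel_jobs` concurrent uploads, advancing `backup_lsn` as consecutive
/// segments finish.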
async fn backup_lsn_range(
    timeline: &Arc<Timeline>,
    backup_lsn: &mut Lsn,
    end_lsn: Lsn,
    wal_seg_size: usize,
    timeline_dir: &Utf8Path,
    workspace_dir: &Utf8Path,
    parallel_jobs: usize,
) -> Result<()> {
    if parallel_jobs < 1 {
        anyhow::bail!("parallel_jobs must be >= 1");
    }

    let start_lsn = *backup_lsn;
    let segments = get_segments(start_lsn, end_lsn, wal_seg_size);

    // Pool of concurrent upload tasks. We use `FuturesOrdered` to
    // preserve order of uploads, and update `backup_lsn` only after
    // all previous uploads are finished.
    let mut uploads = FuturesOrdered::new();
    let mut iter = segments.iter();

    loop {
        let added_task = match iter.next() {
            Some(s) => {
                uploads.push_back(backup_single_segment(s, timeline_dir, workspace_dir));
                true
            }
            None => false,
        };

        // Wait for the next segment to upload if we don't have any more segments,
        // or if we have too many concurrent uploads.
        if !added_task || uploads.len() >= parallel_jobs {
            let next = uploads.next().await;
            if let Some(res) = next {
                // next segment uploaded
                let segment = res?;
                let new_backup_lsn = segment.end_lsn;
                timeline
                    .set_wal_backup_lsn(new_backup_lsn)
                    .await
                    .context("setting wal_backup_lsn")?;
                *backup_lsn = new_backup_lsn;
            } else {
                // no more segments to upload
                break;
            }
        }
    }

    info!(
        "offloaded segnos {:?} up to {}, previous backup_lsn {}",
        segments.iter().map(|&s| s.seg_no).collect::<Vec<_>>(),
        end_lsn,
        start_lsn,
    );
    Ok(())
}

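/// Upload one segment: resolve its remote path relative to the workspace dir,
/// upload the file, and bump the success/error metrics.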
async fn backup_single_segment(
    seg: &Segment,
    timeline_dir: &Utf8Path,
    workspace_dir: &Utf8Path,
) -> Result<Segment> {
    let segment_file_path = seg.file_path(timeline_dir)?;
    let remote_segment_path = segment_file_path
        .strip_prefix(workspace_dir)
        .context("Failed to strip workspace dir prefix")
        .and_then(RemotePath::new)
        .with_context(|| {
            format!(
                "Failed to resolve remote part of path {segment_file_path:?} for base {workspace_dir:?}",
            )
        })?;

    let res = backup_object(&segment_file_path, &remote_segment_path, seg.size()).await;
    if res.is_ok() {
        BACKED_UP_SEGMENTS.inc();
    } else {
        BACKUP_ERRORS.inc();
    }
    res?;
    debug!("Backup of {} done", segment_file_path);

    Ok(*seg)
}

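/// A single WAL segment: its number and the LSN range it covers.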
#[derive(Debug, Copy, Clone)]
pub struct Segment {
    seg_no: XLogSegNo,
    start_lsn: Lsn,
    end_lsn: Lsn,
}

impl Segment {
    pub fn new(seg_no: u64, start_lsn: Lsn, end_lsn: Lsn) -> Self {
        Self {
            seg_no,
            start_lsn,
            end_lsn,
        }
    }

    pub fn object_name(self) -> String {
        XLogFileName(PG_TLI, self.seg_no, self.size())
    }

    pub fn file_path(self, timeline_dir: &Utf8Path) -> Result<Utf8PathBuf> {
        Ok(timeline_dir.join(self.object_name()))
    }

    pub fn size(self) -> usize {
        (u64::from(self.end_lsn) - u64::from(self.start_lsn)) as usize
    }
}

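/// Enumerate the segments from the one containing `start` up to (but not
/// including) the one containing `end`, together with their LSN boundaries.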
fn get_segments(start: Lsn, end: Lsn, seg_size: usize) -> Vec<Segment> {
    let first_seg = start.segment_number(seg_size);
    let last_seg = end.segment_number(seg_size);

    let res: Vec<Segment> = (first_seg..last_seg)
        .map(|s| {
            let start_lsn = XLogSegNoOffsetToRecPtr(s, 0, seg_size);
            let end_lsn = XLogSegNoOffsetToRecPtr(s + 1, 0, seg_size);
            Segment::new(s, Lsn::from(start_lsn), Lsn::from(end_lsn))
        })
        .collect();
    res
}

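/// Upload a single local file of the given size to remote storage as a streamed
/// object.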
async fn backup_object(
    source_file: &Utf8Path,
    target_file: &RemotePath,
    size: usize,
) -> Result<()> {
    let storage = get_configured_remote_storage();

    let file = File::open(&source_file)
        .await
        .with_context(|| format!("Failed to open file {source_file:?} for wal backup"))?;

    let file = tokio_util::io::ReaderStream::with_capacity(file, BUFFER_SIZE);

    storage.upload_storage_object(file, size, target_file).await
}

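/// Open a stream over a remote WAL segment object, starting at the given byte
/// offset.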
pub async fn read_object(
    file_path: &RemotePath,
    offset: u64,
) -> anyhow::Result<Pin<Box<dyn tokio::io::AsyncRead + Send + Sync>>> {
    let storage = REMOTE_STORAGE
        .get()
        .context("Failed to get remote storage")?
        .as_ref()
        .context("No remote storage configured")?;

    info!("segment download about to start from remote path {file_path:?} at offset {offset}");

    let download = storage
        .download_storage_object(Some((offset, None)), file_path)
        .await
        .with_context(|| {
            format!("Failed to open WAL segment download stream for remote path {file_path:?}")
        })?;

    let reader = tokio_util::io::StreamReader::new(download.download_stream);

    let reader = tokio::io::BufReader::with_capacity(BUFFER_SIZE, reader);

    Ok(Box::pin(reader))
}

/// Delete WAL files for the given timeline. Remote storage must be configured
/// when called.
pub async fn delete_timeline(ttid: &TenantTimelineId) -> Result<()> {
    let storage = get_configured_remote_storage();
    let ttid_path = Utf8Path::new(&ttid.tenant_id.to_string()).join(ttid.timeline_id.to_string());
    let remote_path = RemotePath::new(&ttid_path)?;

    // A backoff::retry is used here for two reasons:
    // - To provide a backoff rather than busy-polling the API on errors
    // - To absorb transient 429/503 conditions without hitting our error
    //   logging path for issues deleting objects.
    //
    // Note: listing segments might take a long time if there are many of them.
    // We don't currently have timeout/cancellation for http requests, but
    // if/once we do, listing should get a streaming interface so it can make
    // progress.
    let token = CancellationToken::new(); // not really used
    backoff::retry(
        || async {
            let files = storage.list_files(Some(&remote_path)).await?;
            storage.delete_objects(&files).await
        },
        |_| false,
        3,
        10,
        "executing WAL segments deletion batch",
        &token,
    )
    .await
    .ok_or_else(|| anyhow::anyhow!("canceled"))
    .and_then(|x| x)?;

    Ok(())
}

/// Copy segments from one timeline to another. Used in copy_timeline.
pub async fn copy_s3_segments(
    wal_seg_size: usize,
    src_ttid: &TenantTimelineId,
    dst_ttid: &TenantTimelineId,
    from_segment: XLogSegNo,
    to_segment: XLogSegNo,
) -> Result<()> {
    const SEGMENTS_PROGRESS_REPORT_INTERVAL: u64 = 1024;

    let storage = REMOTE_STORAGE
        .get()
        .expect("failed to get remote storage")
        .as_ref()
        .unwrap();

    let relative_dst_path =
        Utf8Path::new(&dst_ttid.tenant_id.to_string()).join(dst_ttid.timeline_id.to_string());

    let remote_path = RemotePath::new(&relative_dst_path)?;

    let files = storage.list_files(Some(&remote_path)).await?;
    let uploaded_segments = &files
        .iter()
        .filter_map(|file| file.object_name().map(ToOwned::to_owned))
        .collect::<HashSet<_>>();

    debug!(
        "these segments have already been uploaded: {:?}",
        uploaded_segments
    );

    let relative_src_path =
        Utf8Path::new(&src_ttid.tenant_id.to_string()).join(src_ttid.timeline_id.to_string());

    for segno in from_segment..to_segment {
        if segno % SEGMENTS_PROGRESS_REPORT_INTERVAL == 0 {
            info!("copied all segments from {} until {}", from_segment, segno);
        }

        let segment_name = XLogFileName(PG_TLI, segno, wal_seg_size);
        if uploaded_segments.contains(&segment_name) {
            continue;
        }
        debug!("copying segment {}", segment_name);

        let from = RemotePath::new(&relative_src_path.join(&segment_name))?;
        let to = RemotePath::new(&relative_dst_path.join(&segment_name))?;

        storage.copy_object(&from, &to).await?;
    }

    info!(
        "finished copying segments from {} until {}",
        from_segment, to_segment
    );
    Ok(())
}