Line data Source code
1 : use anyhow::{Context, Result};
2 :
3 : use camino::{Utf8Path, Utf8PathBuf};
4 : use futures::stream::FuturesOrdered;
5 : use futures::StreamExt;
6 : use tokio::task::JoinHandle;
7 : use tokio_util::sync::CancellationToken;
8 : use utils::backoff;
9 : use utils::id::NodeId;
10 :
11 : use std::cmp::min;
12 : use std::collections::HashSet;
13 : use std::num::NonZeroU32;
14 : use std::pin::Pin;
15 : use std::time::Duration;
16 :
17 : use postgres_ffi::v14::xlog_utils::XLogSegNoOffsetToRecPtr;
18 : use postgres_ffi::XLogFileName;
19 : use postgres_ffi::{XLogSegNo, PG_TLI};
20 : use remote_storage::{
21 : DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath, StorageMetadata,
22 : };
23 : use tokio::fs::File;
24 :
25 : use tokio::select;
26 : use tokio::sync::mpsc::{self, Receiver, Sender};
27 : use tokio::sync::{watch, OnceCell};
28 : use tokio::time::sleep;
29 : use tracing::*;
30 :
31 : use utils::{id::TenantTimelineId, lsn::Lsn};
32 :
33 : use crate::metrics::{BACKED_UP_SEGMENTS, BACKUP_ERRORS, WAL_BACKUP_TASKS};
34 : use crate::timeline::{PeerInfo, WalResidentTimeline};
35 : use crate::timeline_manager::{Manager, StateSnapshot};
36 : use crate::{SafeKeeperConf, WAL_BACKUP_RUNTIME};
37 :
38 : const UPLOAD_FAILURE_RETRY_MIN_MS: u64 = 10;
39 : const UPLOAD_FAILURE_RETRY_MAX_MS: u64 = 5000;
40 :
41 : /// Default buffer size when interfacing with [`tokio::fs::File`].
42 : const BUFFER_SIZE: usize = 32 * 1024;
43 :
44 : pub struct WalBackupTaskHandle {
45 : shutdown_tx: Sender<()>,
46 : handle: JoinHandle<()>,
47 : }
48 :
49 : /// Do we have anything to upload to S3, i.e. should safekeepers run backup activity?
50 0 : pub(crate) fn is_wal_backup_required(
51 0 : wal_seg_size: usize,
52 0 : num_computes: usize,
53 0 : state: &StateSnapshot,
54 0 : ) -> bool {
55 0 : num_computes > 0 ||
56 : // Currently only whole segments are offloaded, so compare segment numbers.
57 0 : (state.commit_lsn.segment_number(wal_seg_size) > state.backup_lsn.segment_number(wal_seg_size))
58 0 : }
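
// A minimal sketch of the segment-number arithmetic behind the check above:
// only whole segments are offloaded, so backup is required exactly when
// commit_lsn has crossed into a later segment than backup_lsn. The 16 MiB
// segment size is an assumption for the example only.
#[cfg(test)]
mod wal_backup_required_math {
    use utils::lsn::Lsn;

    #[test]
    fn segment_number_comparison() {
        let wal_seg_size = 16 * 1024 * 1024;
        // Both LSNs inside segment 0: nothing to offload yet.
        assert_eq!(
            Lsn(0x100).segment_number(wal_seg_size),
            Lsn(0x2000).segment_number(wal_seg_size)
        );
        // commit_lsn crossed a segment boundary: one full segment can be offloaded.
        assert!(
            Lsn(16 * 1024 * 1024 + 1).segment_number(wal_seg_size)
                > Lsn(0x2000).segment_number(wal_seg_size)
        );
    }
}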
59 :
60 : /// Based on peer information, determine which safekeeper should offload. If it
61 : /// is us, start the per-timeline task if it is not running yet. Conversely, if
62 : /// it is not us and the task is running, stop it.
63 0 : pub(crate) async fn update_task(mgr: &mut Manager, need_backup: bool, state: &StateSnapshot) {
64 0 : let (offloader, election_dbg_str) =
65 0 : determine_offloader(&state.peers, state.backup_lsn, mgr.tli.ttid, &mgr.conf);
66 0 : let elected_me = Some(mgr.conf.my_id) == offloader;
67 :
68 0 : let should_task_run = need_backup && elected_me;
69 :
70 : // start or stop the task
71 0 : if should_task_run != (mgr.backup_task.is_some()) {
72 0 : if should_task_run {
73 0 : info!("elected for backup: {}", election_dbg_str);
74 :
75 0 : let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
76 0 :
77 0 : let async_task = backup_task_main(
78 0 : mgr.wal_resident_timeline(),
79 0 : mgr.conf.backup_parallel_jobs,
80 0 : shutdown_rx,
81 0 : );
82 :
83 0 : let handle = if mgr.conf.current_thread_runtime {
84 0 : tokio::spawn(async_task)
85 : } else {
86 0 : WAL_BACKUP_RUNTIME.spawn(async_task)
87 : };
88 :
89 0 : mgr.backup_task = Some(WalBackupTaskHandle {
90 0 : shutdown_tx,
91 0 : handle,
92 0 : });
93 : } else {
94 0 : if !need_backup {
95 : // don't need backup at all
96 0 : info!("stepping down from backup, need_backup={}", need_backup);
97 : } else {
98 : // someone else has been elected
99 0 : info!("stepping down from backup: {}", election_dbg_str);
100 : }
101 0 : shut_down_task(&mut mgr.backup_task).await;
102 : }
103 0 : }
104 0 : }
105 :
106 0 : async fn shut_down_task(entry: &mut Option<WalBackupTaskHandle>) {
107 0 : if let Some(wb_handle) = entry.take() {
108 : // Tell the task to shut down. An error means the task exited earlier, which is fine.
109 0 : let _ = wb_handle.shutdown_tx.send(()).await;
110 : // Await the task itself. TODO: restart panicked tasks earlier.
111 0 : if let Err(e) = wb_handle.handle.await {
112 0 : warn!("WAL backup task panicked: {}", e);
113 0 : }
114 0 : }
115 0 : }
116 :
117 : /// The goal is to ensure that normally only one safekeeper offloads. However,
118 : /// it is fine (and inevitable, as S3 doesn't provide CAS) that for some short
119 : /// time several of them PUT the same files. Also,
120 : /// - frequently changing the offloader would be bad;
121 : /// - electing a seriously lagging safekeeper is undesirable.
122 : ///
123 : /// So we deterministically choose among the reasonably caught-up candidates.
124 : /// TODO: take failed attempts into account to handle the hypothetical situation
125 : /// where S3 is unreachable only for some safekeepers.
126 0 : fn determine_offloader(
127 0 : alive_peers: &[PeerInfo],
128 0 : wal_backup_lsn: Lsn,
129 0 : ttid: TenantTimelineId,
130 0 : conf: &SafeKeeperConf,
131 0 : ) -> (Option<NodeId>, String) {
132 0 : // TODO: remove this once we backfill newly joined safekeepers from backup_lsn.
133 0 : let capable_peers = alive_peers
134 0 : .iter()
135 0 : .filter(|p| p.local_start_lsn <= wal_backup_lsn);
136 0 : match capable_peers.clone().map(|p| p.commit_lsn).max() {
137 0 : None => (None, "no connected peers to elect from".to_string()),
138 0 : Some(max_commit_lsn) => {
139 0 : let threshold = max_commit_lsn
140 0 : .checked_sub(conf.max_offloader_lag_bytes)
141 0 : .unwrap_or(Lsn(0));
142 0 : let mut caughtup_peers = capable_peers
143 0 : .clone()
144 0 : .filter(|p| p.commit_lsn >= threshold)
145 0 : .collect::<Vec<_>>();
146 0 : caughtup_peers.sort_by(|p1, p2| p1.sk_id.cmp(&p2.sk_id));
147 0 :
148 0 : // To distribute the load, pick the offloader by timeline_id modulo the number of caught-up peers.
149 0 : let offloader = caughtup_peers
150 0 : [(u128::from(ttid.timeline_id) % caughtup_peers.len() as u128) as usize]
151 0 : .sk_id;
152 0 :
153 0 : let mut capable_peers_dbg = capable_peers
154 0 : .map(|p| (p.sk_id, p.commit_lsn))
155 0 : .collect::<Vec<_>>();
156 0 : capable_peers_dbg.sort_by(|p1, p2| p1.0.cmp(&p2.0));
157 0 : (
158 0 : Some(offloader),
159 0 : format!(
160 0 : "elected {} among {:?} peers, with {} of them being caughtup",
161 0 : offloader,
162 0 : capable_peers_dbg,
163 0 : caughtup_peers.len()
164 0 : ),
165 0 : )
166 : }
167 : }
168 0 : }
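
// A minimal sketch of the caught-up threshold computation used in
// `determine_offloader` above; the LSN values and lag are illustrative
// assumptions, not real configuration defaults.
#[cfg(test)]
mod offloader_threshold_math {
    use utils::lsn::Lsn;

    #[test]
    fn lag_threshold_saturates_at_zero() {
        let max_offloader_lag_bytes: u64 = 1024;
        // Normal case: a peer must be within max_offloader_lag_bytes of the
        // most advanced commit_lsn to count as caught up.
        assert_eq!(
            Lsn(10_000)
                .checked_sub(max_offloader_lag_bytes)
                .unwrap_or(Lsn(0)),
            Lsn(10_000 - 1024)
        );
        // Early in a timeline the subtraction would underflow, so the
        // threshold saturates at Lsn(0) and every capable peer qualifies.
        assert_eq!(
            Lsn(100)
                .checked_sub(max_offloader_lag_bytes)
                .unwrap_or(Lsn(0)),
            Lsn(0)
        );
    }
}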
169 :
170 : static REMOTE_STORAGE: OnceCell<Option<GenericRemoteStorage>> = OnceCell::const_new();
171 :
172 : // Storage must be configured and initialized when this is called.
173 0 : fn get_configured_remote_storage() -> &'static GenericRemoteStorage {
174 0 : REMOTE_STORAGE
175 0 : .get()
176 0 : .expect("failed to get remote storage")
177 0 : .as_ref()
178 0 : .unwrap()
179 0 : }
180 :
181 0 : pub async fn init_remote_storage(conf: &SafeKeeperConf) {
182 0 : // TODO: refactor REMOTE_STORAGE to avoid using global variables, and provide
183 0 : // dependencies to all tasks instead.
184 0 : REMOTE_STORAGE
185 0 : .get_or_init(|| async {
186 0 : if let Some(conf) = conf.remote_storage.as_ref() {
187 : Some(
188 0 : GenericRemoteStorage::from_config(conf)
189 0 : .await
190 0 : .expect("failed to create remote storage"),
191 : )
192 : } else {
193 0 : None
194 : }
195 0 : })
196 0 : .await;
197 0 : }
198 :
199 : struct WalBackupTask {
200 : timeline: WalResidentTimeline,
201 : timeline_dir: Utf8PathBuf,
202 : wal_seg_size: usize,
203 : parallel_jobs: usize,
204 : commit_lsn_watch_rx: watch::Receiver<Lsn>,
205 : }
206 :
207 : /// Offload single timeline.
208 0 : #[instrument(name = "wal_backup", skip_all, fields(ttid = %tli.ttid))]
209 : async fn backup_task_main(
210 : tli: WalResidentTimeline,
211 : parallel_jobs: usize,
212 : mut shutdown_rx: Receiver<()>,
213 : ) {
214 : let _guard = WAL_BACKUP_TASKS.guard();
215 : info!("started");
216 :
217 : let mut wb = WalBackupTask {
218 : wal_seg_size: tli.get_wal_seg_size().await,
219 : commit_lsn_watch_rx: tli.get_commit_lsn_watch_rx(),
220 : timeline_dir: tli.get_timeline_dir(),
221 : timeline: tli,
222 : parallel_jobs,
223 : };
224 :
225 : // the task is spun up only after wal_seg_size has been initialized
226 : assert!(wb.wal_seg_size > 0);
227 :
228 : let mut canceled = false;
229 : select! {
230 : _ = wb.run() => {}
231 : _ = shutdown_rx.recv() => {
232 : canceled = true;
233 : }
234 : }
235 : info!("task {}", if canceled { "canceled" } else { "terminated" });
236 : }
237 :
238 : impl WalBackupTask {
239 0 : async fn run(&mut self) {
240 0 : let mut backup_lsn = Lsn(0);
241 0 :
242 0 : let mut retry_attempt = 0u32;
243 : // offload loop
244 : loop {
245 0 : if retry_attempt == 0 {
246 : // wait for new WAL to arrive
247 0 : if let Err(e) = self.commit_lsn_watch_rx.changed().await {
248 : // should never happen, as we hold an Arc to the timeline.
249 0 : error!("commit_lsn watch shut down: {:?}", e);
250 0 : return;
251 0 : }
252 : } else {
253 : // or just sleep if we errored previously
254 0 : let mut retry_delay = UPLOAD_FAILURE_RETRY_MAX_MS;
255 0 : if let Some(backoff_delay) = UPLOAD_FAILURE_RETRY_MIN_MS.checked_shl(retry_attempt)
256 0 : {
257 0 : retry_delay = min(retry_delay, backoff_delay);
258 0 : }
259 0 : sleep(Duration::from_millis(retry_delay)).await;
260 : }
261 :
262 0 : let commit_lsn = *self.commit_lsn_watch_rx.borrow();
263 0 :
264 0 : // Note that backup_lsn can be higher than commit_lsn if we
265 0 : // don't have much local WAL and other safekeepers have already
266 0 : // uploaded segments we don't even have.
267 0 : if backup_lsn.segment_number(self.wal_seg_size)
268 0 : >= commit_lsn.segment_number(self.wal_seg_size)
269 : {
270 0 : retry_attempt = 0;
271 0 : continue; /* nothing to do, common case as we wake up on every commit_lsn bump */
272 0 : }
273 0 : // Perhaps peers have advanced the position; check the shmem value.
274 0 : backup_lsn = self.timeline.get_wal_backup_lsn().await;
275 0 : if backup_lsn.segment_number(self.wal_seg_size)
276 0 : >= commit_lsn.segment_number(self.wal_seg_size)
277 : {
278 0 : retry_attempt = 0;
279 0 : continue;
280 0 : }
281 0 :
282 0 : match backup_lsn_range(
283 0 : &self.timeline,
284 0 : &mut backup_lsn,
285 0 : commit_lsn,
286 0 : self.wal_seg_size,
287 0 : &self.timeline_dir,
288 0 : self.parallel_jobs,
289 0 : )
290 0 : .await
291 : {
292 0 : Ok(()) => {
293 0 : retry_attempt = 0;
294 0 : }
295 0 : Err(e) => {
296 0 : error!(
297 0 : "failed while offloading range {}-{}: {:?}",
298 : backup_lsn, commit_lsn, e
299 : );
300 :
301 0 : retry_attempt = retry_attempt.saturating_add(1);
302 : }
303 : }
304 : }
305 0 : }
306 : }
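
// A minimal sketch of the retry backoff schedule used by `WalBackupTask::run`
// above: the delay doubles from UPLOAD_FAILURE_RETRY_MIN_MS per failed
// attempt and is capped at UPLOAD_FAILURE_RETRY_MAX_MS (a checked_shl
// overflow for very large attempt counts also falls back to the cap).
#[cfg(test)]
mod upload_retry_backoff {
    use super::{UPLOAD_FAILURE_RETRY_MAX_MS, UPLOAD_FAILURE_RETRY_MIN_MS};
    use std::cmp::min;

    // Mirrors the delay computation in WalBackupTask::run.
    fn retry_delay_ms(retry_attempt: u32) -> u64 {
        let mut retry_delay = UPLOAD_FAILURE_RETRY_MAX_MS;
        if let Some(backoff_delay) = UPLOAD_FAILURE_RETRY_MIN_MS.checked_shl(retry_attempt) {
            retry_delay = min(retry_delay, backoff_delay);
        }
        retry_delay
    }

    #[test]
    fn delay_grows_and_saturates() {
        assert_eq!(retry_delay_ms(0), 10);
        assert_eq!(retry_delay_ms(3), 80);
        // 10 << 10 = 10240 ms is already above the 5000 ms cap.
        assert_eq!(retry_delay_ms(10), UPLOAD_FAILURE_RETRY_MAX_MS);
        // Shifting by >= 64 bits makes checked_shl return None; same cap.
        assert_eq!(retry_delay_ms(200), UPLOAD_FAILURE_RETRY_MAX_MS);
    }
}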
307 :
308 0 : async fn backup_lsn_range(
309 0 : timeline: &WalResidentTimeline,
310 0 : backup_lsn: &mut Lsn,
311 0 : end_lsn: Lsn,
312 0 : wal_seg_size: usize,
313 0 : timeline_dir: &Utf8Path,
314 0 : parallel_jobs: usize,
315 0 : ) -> Result<()> {
316 0 : if parallel_jobs < 1 {
317 0 : anyhow::bail!("parallel_jobs must be >= 1");
318 0 : }
319 0 :
320 0 : let remote_timeline_path = &timeline.remote_path;
321 0 : let start_lsn = *backup_lsn;
322 0 : let segments = get_segments(start_lsn, end_lsn, wal_seg_size);
323 0 :
324 0 : // Pool of concurrent upload tasks. We use `FuturesOrdered` to
325 0 : // preserve order of uploads, and update `backup_lsn` only after
326 0 : // all previous uploads are finished.
327 0 : let mut uploads = FuturesOrdered::new();
328 0 : let mut iter = segments.iter();
329 :
330 : loop {
331 0 : let added_task = match iter.next() {
332 0 : Some(s) => {
333 0 : uploads.push_back(backup_single_segment(s, timeline_dir, remote_timeline_path));
334 0 : true
335 : }
336 0 : None => false,
337 : };
338 :
339 : // Wait for the oldest in-flight upload to finish if there are no more
340 : // segments to add, or if we already have too many concurrent uploads.
341 0 : if !added_task || uploads.len() >= parallel_jobs {
342 0 : let next = uploads.next().await;
343 0 : if let Some(res) = next {
344 : // next segment uploaded
345 0 : let segment = res?;
346 0 : let new_backup_lsn = segment.end_lsn;
347 0 : timeline
348 0 : .set_wal_backup_lsn(new_backup_lsn)
349 0 : .await
350 0 : .context("setting wal_backup_lsn")?;
351 0 : *backup_lsn = new_backup_lsn;
352 : } else {
353 : // no more segments to upload
354 0 : break;
355 : }
356 0 : }
357 : }
358 :
359 0 : info!(
360 0 : "offloaded segnos {:?} up to {}, previous backup_lsn {}",
361 0 : segments.iter().map(|&s| s.seg_no).collect::<Vec<_>>(),
362 : end_lsn,
363 : start_lsn,
364 : );
365 0 : Ok(())
366 0 : }
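
// A minimal sketch of the bounded-parallelism pattern used by
// `backup_lsn_range` above: at most `parallel_jobs` futures live in the
// `FuturesOrdered` at once, and completions are observed in submission
// order, which is what lets the caller advance backup_lsn monotonically.
// Plain async blocks stand in for real segment uploads here.
#[cfg(test)]
mod ordered_upload_pool {
    use futures::stream::FuturesOrdered;
    use futures::StreamExt;

    #[tokio::test]
    async fn completions_arrive_in_submission_order() {
        let parallel_jobs = 2;
        let mut uploads = FuturesOrdered::new();
        let mut completed = Vec::new();

        for seg_no in 0u64..5 {
            uploads.push_back(async move { seg_no });
            // Once the pool is full, wait for the oldest "upload" to finish.
            if uploads.len() >= parallel_jobs {
                completed.push(uploads.next().await.unwrap());
            }
        }
        // Drain the remaining in-flight uploads.
        while let Some(seg_no) = uploads.next().await {
            completed.push(seg_no);
        }

        assert_eq!(completed, vec![0, 1, 2, 3, 4]);
    }
}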
367 :
368 0 : async fn backup_single_segment(
369 0 : seg: &Segment,
370 0 : timeline_dir: &Utf8Path,
371 0 : remote_timeline_path: &RemotePath,
372 0 : ) -> Result<Segment> {
373 0 : let segment_file_path = seg.file_path(timeline_dir)?;
374 0 : let remote_segment_path = seg.remote_path(remote_timeline_path);
375 :
376 0 : let res = backup_object(&segment_file_path, &remote_segment_path, seg.size()).await;
377 0 : if res.is_ok() {
378 0 : BACKED_UP_SEGMENTS.inc();
379 0 : } else {
380 0 : BACKUP_ERRORS.inc();
381 0 : }
382 0 : res?;
383 0 : debug!("Backup of {} done", segment_file_path);
384 :
385 0 : Ok(*seg)
386 0 : }
387 :
388 : #[derive(Debug, Copy, Clone)]
389 : pub struct Segment {
390 : seg_no: XLogSegNo,
391 : start_lsn: Lsn,
392 : end_lsn: Lsn,
393 : }
394 :
395 : impl Segment {
396 0 : pub fn new(seg_no: u64, start_lsn: Lsn, end_lsn: Lsn) -> Self {
397 0 : Self {
398 0 : seg_no,
399 0 : start_lsn,
400 0 : end_lsn,
401 0 : }
402 0 : }
403 :
404 0 : pub fn object_name(self) -> String {
405 0 : XLogFileName(PG_TLI, self.seg_no, self.size())
406 0 : }
407 :
408 0 : pub fn file_path(self, timeline_dir: &Utf8Path) -> Result<Utf8PathBuf> {
409 0 : Ok(timeline_dir.join(self.object_name()))
410 0 : }
411 :
412 0 : pub fn remote_path(self, remote_timeline_path: &RemotePath) -> RemotePath {
413 0 : remote_timeline_path.join(self.object_name())
414 0 : }
415 :
416 0 : pub fn size(self) -> usize {
417 0 : (u64::from(self.end_lsn) - u64::from(self.start_lsn)) as usize
418 0 : }
419 : }
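
// A minimal sketch of a unit test for `Segment`, assuming the standard
// 16 MiB WAL segment size; the expected name follows the usual PostgreSQL
// segment file naming for timeline 1 (PG_TLI).
#[cfg(test)]
mod segment_tests {
    use super::*;

    #[test]
    fn segment_size_and_object_name() {
        let seg_size: u64 = 16 * 1024 * 1024;
        let seg = Segment::new(1, Lsn(seg_size), Lsn(2 * seg_size));
        assert_eq!(seg.size(), seg_size as usize);
        // Segment number 1 on timeline 1 with 16 MiB segments.
        assert_eq!(seg.object_name(), "000000010000000000000001");
    }
}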
420 :
421 0 : fn get_segments(start: Lsn, end: Lsn, seg_size: usize) -> Vec<Segment> {
422 0 : let first_seg = start.segment_number(seg_size);
423 0 : let last_seg = end.segment_number(seg_size);
424 0 :
425 0 : let res: Vec<Segment> = (first_seg..last_seg)
426 0 : .map(|s| {
427 0 : let start_lsn = XLogSegNoOffsetToRecPtr(s, 0, seg_size);
428 0 : let end_lsn = XLogSegNoOffsetToRecPtr(s + 1, 0, seg_size);
429 0 : Segment::new(s, Lsn::from(start_lsn), Lsn::from(end_lsn))
430 0 : })
431 0 : .collect();
432 0 : res
433 0 : }
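
// A minimal sketch of a unit test for `get_segments`, again assuming a
// 16 MiB segment size for the example.
#[cfg(test)]
mod get_segments_tests {
    use super::*;

    #[test]
    fn full_segments_between_lsns() {
        let seg_size: usize = 16 * 1024 * 1024;
        let start = Lsn(seg_size as u64); // beginning of segment 1
        let end = Lsn(3 * seg_size as u64); // beginning of segment 3
        let segs = get_segments(start, end, seg_size);

        // Segments 1 and 2 are complete and eligible for offload.
        assert_eq!(segs.len(), 2);
        assert_eq!(segs[0].start_lsn, start);
        assert_eq!(segs[1].end_lsn, end);
        assert!(segs.iter().all(|s| s.size() == seg_size));
    }
}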
434 :
435 0 : async fn backup_object(
436 0 : source_file: &Utf8Path,
437 0 : target_file: &RemotePath,
438 0 : size: usize,
439 0 : ) -> Result<()> {
440 0 : let storage = get_configured_remote_storage();
441 :
442 0 : let file = File::open(&source_file)
443 0 : .await
444 0 : .with_context(|| format!("Failed to open file {source_file:?} for wal backup"))?;
445 :
446 0 : let file = tokio_util::io::ReaderStream::with_capacity(file, BUFFER_SIZE);
447 0 :
448 0 : let cancel = CancellationToken::new();
449 0 :
450 0 : storage
451 0 : .upload_storage_object(file, size, target_file, &cancel)
452 0 : .await
453 0 : }
454 :
455 0 : pub(crate) async fn backup_partial_segment(
456 0 : source_file: &Utf8Path,
457 0 : target_file: &RemotePath,
458 0 : size: usize,
459 0 : ) -> Result<()> {
460 0 : let storage = get_configured_remote_storage();
461 :
462 0 : let file = File::open(&source_file)
463 0 : .await
464 0 : .with_context(|| format!("Failed to open file {source_file:?} for wal backup"))?;
465 :
466 : // limit the reader to the first `size` bytes of the file
467 0 : let limited_file = tokio::io::AsyncReadExt::take(file, size as u64);
468 0 :
469 0 : let file = tokio_util::io::ReaderStream::with_capacity(limited_file, BUFFER_SIZE);
470 0 :
471 0 : let cancel = CancellationToken::new();
472 0 :
473 0 : storage
474 0 : .upload(
475 0 : file,
476 0 : size,
477 0 : target_file,
478 0 : Some(StorageMetadata::from([("sk_type", "partial_segment")])),
479 0 : &cancel,
480 0 : )
481 0 : .await
482 0 : }
483 :
484 0 : pub(crate) async fn copy_partial_segment(
485 0 : source: &RemotePath,
486 0 : destination: &RemotePath,
487 0 : ) -> Result<()> {
488 0 : let storage = get_configured_remote_storage();
489 0 : let cancel = CancellationToken::new();
490 0 :
491 0 : storage.copy_object(source, destination, &cancel).await
492 0 : }
493 :
494 0 : pub async fn read_object(
495 0 : file_path: &RemotePath,
496 0 : offset: u64,
497 0 : ) -> anyhow::Result<Pin<Box<dyn tokio::io::AsyncRead + Send + Sync>>> {
498 0 : let storage = REMOTE_STORAGE
499 0 : .get()
500 0 : .context("Failed to get remote storage")?
501 0 : .as_ref()
502 0 : .context("No remote storage configured")?;
503 :
504 0 : info!("segment download about to start from remote path {file_path:?} at offset {offset}");
505 :
506 0 : let cancel = CancellationToken::new();
507 0 :
508 0 : let opts = DownloadOpts {
509 0 : byte_start: std::ops::Bound::Included(offset),
510 0 : ..Default::default()
511 0 : };
512 0 : let download = storage
513 0 : .download(file_path, &opts, &cancel)
514 0 : .await
515 0 : .with_context(|| {
516 0 : format!("Failed to open WAL segment download stream for remote path {file_path:?}")
517 0 : })?;
518 :
519 0 : let reader = tokio_util::io::StreamReader::new(download.download_stream);
520 0 :
521 0 : let reader = tokio::io::BufReader::with_capacity(BUFFER_SIZE, reader);
522 0 :
523 0 : Ok(Box::pin(reader))
524 0 : }
525 :
526 : /// Delete WAL files for the given timeline. Remote storage must be configured
527 : /// when called.
528 0 : pub async fn delete_timeline(ttid: &TenantTimelineId) -> Result<()> {
529 0 : let storage = get_configured_remote_storage();
530 0 : let remote_path = remote_timeline_path(ttid)?;
531 :
532 : // see DEFAULT_MAX_KEYS_PER_LIST_RESPONSE
533 : // Option::unwrap is not stable in const contexts yet, otherwise this would be a const.
534 0 : let batch_size: NonZeroU32 = NonZeroU32::new(1000).unwrap();
535 0 :
536 0 : // A backoff::retry is used here for two reasons:
537 0 : // - To provide a backoff rather than busy-polling the API on errors
538 0 : // - To absorb transient 429/503 conditions without hitting our error
539 0 : // logging path for issues deleting objects.
540 0 : //
541 0 : // Note: listing segments might take a long time if there are many of them.
542 0 : // We don't currently have HTTP request timeouts or cancellation, but if/once
543 0 : // we do, listing should get a streaming interface so it can make progress.
544 0 :
545 0 : let cancel = CancellationToken::new(); // not really used
546 0 : backoff::retry(
547 0 : || async {
548 : // List and delete in batches of batch_size to make progress even if there are a lot of files.
549 : // Alternatively we could make the remote storage list call return an iterator, but that is more
550 : // complicated, and it is not clear that deleting while iterating is well-defined in S3.
551 : loop {
552 0 : let files = storage
553 0 : .list(
554 0 : Some(&remote_path),
555 0 : ListingMode::NoDelimiter,
556 0 : Some(batch_size),
557 0 : &cancel,
558 0 : )
559 0 : .await?
560 : .keys
561 0 : .into_iter()
562 0 : .map(|o| o.key)
563 0 : .collect::<Vec<_>>();
564 0 : if files.is_empty() {
565 0 : return Ok(()); // done
566 0 : }
567 0 : // (at least) S3 results are sorted, so we can log the min/max:
568 0 : // "List results are always returned in UTF-8 binary order."
569 0 : info!(
570 0 : "deleting batch of {} WAL segments [{}-{}]",
571 0 : files.len(),
572 0 : files.first().unwrap().object_name().unwrap_or(""),
573 0 : files.last().unwrap().object_name().unwrap_or("")
574 : );
575 0 : storage.delete_objects(&files, &cancel).await?;
576 : }
577 0 : },
578 0 : // consider TimeoutOrCancel::caused_by_cancel when using cancellation
579 0 : |_| false,
580 0 : 3,
581 0 : 10,
582 0 : "executing WAL segments deletion batch",
583 0 : &cancel,
584 0 : )
585 0 : .await
586 0 : .ok_or_else(|| anyhow::anyhow!("canceled"))
587 0 : .and_then(|x| x)?;
588 :
589 0 : Ok(())
590 0 : }
591 :
592 : /// Used by wal_backup_partial.
593 0 : pub async fn delete_objects(paths: &[RemotePath]) -> Result<()> {
594 0 : let cancel = CancellationToken::new(); // not really used
595 0 : let storage = get_configured_remote_storage();
596 0 : storage.delete_objects(paths, &cancel).await
597 0 : }
598 :
599 : /// Copy segments from one timeline to another. Used in copy_timeline.
600 0 : pub async fn copy_s3_segments(
601 0 : wal_seg_size: usize,
602 0 : src_ttid: &TenantTimelineId,
603 0 : dst_ttid: &TenantTimelineId,
604 0 : from_segment: XLogSegNo,
605 0 : to_segment: XLogSegNo,
606 0 : ) -> Result<()> {
607 : const SEGMENTS_PROGRESS_REPORT_INTERVAL: u64 = 1024;
608 :
609 0 : let storage = REMOTE_STORAGE
610 0 : .get()
611 0 : .expect("failed to get remote storage")
612 0 : .as_ref()
613 0 : .unwrap();
614 :
615 0 : let remote_dst_path = remote_timeline_path(dst_ttid)?;
616 :
617 0 : let cancel = CancellationToken::new();
618 :
619 0 : let files = storage
620 0 : .list(
621 0 : Some(&remote_dst_path),
622 0 : ListingMode::NoDelimiter,
623 0 : None,
624 0 : &cancel,
625 0 : )
626 0 : .await?
627 : .keys;
628 :
629 0 : let uploaded_segments = &files
630 0 : .iter()
631 0 : .filter_map(|o| o.key.object_name().map(ToOwned::to_owned))
632 0 : .collect::<HashSet<_>>();
633 0 :
634 0 : debug!(
635 0 : "these segments have already been uploaded: {:?}",
636 : uploaded_segments
637 : );
638 :
639 0 : for segno in from_segment..to_segment {
640 0 : if segno % SEGMENTS_PROGRESS_REPORT_INTERVAL == 0 {
641 0 : info!("copied all segments from {} until {}", from_segment, segno);
642 0 : }
643 :
644 0 : let segment_name = XLogFileName(PG_TLI, segno, wal_seg_size);
645 0 : if uploaded_segments.contains(&segment_name) {
646 0 : continue;
647 0 : }
648 0 : debug!("copying segment {}", segment_name);
649 :
650 0 : let from = remote_timeline_path(src_ttid)?.join(&segment_name);
651 0 : let to = remote_dst_path.join(&segment_name);
652 0 :
653 0 : storage.copy_object(&from, &to, &cancel).await?;
654 : }
655 :
656 0 : info!(
657 0 : "finished copying segments from {} until {}",
658 : from_segment, to_segment
659 : );
660 0 : Ok(())
661 0 : }
662 :
663 : /// Get S3 (remote_storage) prefix path used for timeline files.
664 0 : pub fn remote_timeline_path(ttid: &TenantTimelineId) -> Result<RemotePath> {
665 0 : RemotePath::new(&Utf8Path::new(&ttid.tenant_id.to_string()).join(ttid.timeline_id.to_string()))
666 0 : }
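
// A minimal sketch of the remote layout produced by `remote_timeline_path`:
// the prefix is "<tenant_id>/<timeline_id>", so the object name (the last
// path component) is the timeline id. `TenantTimelineId::generate()` is
// assumed to be available for producing example ids.
#[cfg(test)]
mod remote_path_layout {
    use super::*;

    #[test]
    fn prefix_ends_with_timeline_id() {
        let ttid = TenantTimelineId::generate();
        let path = remote_timeline_path(&ttid).unwrap();
        let expected = ttid.timeline_id.to_string();
        assert_eq!(path.object_name(), Some(expected.as_str()));
    }
}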