use anyhow::{Context, Result};

use camino::{Utf8Path, Utf8PathBuf};
use futures::stream::FuturesOrdered;
use futures::StreamExt;
use tokio::task::JoinHandle;
use utils::id::NodeId;

use std::cmp::min;
use std::collections::{HashMap, HashSet};
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use postgres_ffi::v14::xlog_utils::XLogSegNoOffsetToRecPtr;
use postgres_ffi::XLogFileName;
use postgres_ffi::{XLogSegNo, PG_TLI};
use remote_storage::{GenericRemoteStorage, RemotePath};
use tokio::fs::File;

use tokio::select;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::sync::watch;
use tokio::time::sleep;
use tracing::*;

use utils::{id::TenantTimelineId, lsn::Lsn};

use crate::metrics::{BACKED_UP_SEGMENTS, BACKUP_ERRORS};
use crate::timeline::{PeerInfo, Timeline};
use crate::{GlobalTimelines, SafeKeeperConf};

use once_cell::sync::OnceCell;

const UPLOAD_FAILURE_RETRY_MIN_MS: u64 = 10;
const UPLOAD_FAILURE_RETRY_MAX_MS: u64 = 5000;

/// Default buffer size when interfacing with [`tokio::fs::File`].
const BUFFER_SIZE: usize = 32 * 1024;

/// Check whether WAL backup is required for the timeline. If yes, mark that the
/// launcher is aware of the current status and return the timeline.
async fn is_wal_backup_required(ttid: TenantTimelineId) -> Option<Arc<Timeline>> {
    match GlobalTimelines::get(ttid).ok() {
        Some(tli) => {
            tli.wal_backup_attend().await;
            Some(tli)
        }
        None => None,
    }
}

struct WalBackupTaskHandle {
    shutdown_tx: Sender<()>,
    handle: JoinHandle<()>,
}

struct WalBackupTimelineEntry {
    timeline: Arc<Timeline>,
    handle: Option<WalBackupTaskHandle>,
}

async fn shut_down_task(ttid: TenantTimelineId, entry: &mut WalBackupTimelineEntry) {
    if let Some(wb_handle) = entry.handle.take() {
        // Tell the task to shut down. An error means the task exited earlier; that's ok.
        let _ = wb_handle.shutdown_tx.send(()).await;
        // Await the task itself. TODO: restart panicked tasks earlier.
        if let Err(e) = wb_handle.handle.await {
            warn!("WAL backup task for {} panicked: {}", ttid, e);
        }
    }
}

/// The goal is to ensure that normally only one safekeeper offloads. However,
/// it is fine (and inevitable, as s3 doesn't provide CAS) that for some short
/// time several of them PUT the same files. Also,
/// - frequently changing the offloader would be bad;
/// - electing a seriously lagging safekeeper is undesirable;
/// So we deterministically choose among the reasonably caught-up candidates.
/// TODO: take into account failed attempts to deal with the hypothetical situation
/// where s3 is unreachable only for some sks.
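/// For example (hypothetical ids): with caught-up peers sorted as [1, 4, 5] and a
/// timeline id equal to 2 modulo 3, the peer at index 2 (id 5) is elected. Every
/// safekeeper computes the same result from the same peer info, so no extra
/// coordination is needed.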
fn determine_offloader(
    alive_peers: &[PeerInfo],
    wal_backup_lsn: Lsn,
    ttid: TenantTimelineId,
    conf: &SafeKeeperConf,
) -> (Option<NodeId>, String) {
    // TODO: remove this once we fill newly joined safekeepers since backup_lsn.
    let capable_peers = alive_peers
        .iter()
        .filter(|p| p.local_start_lsn <= wal_backup_lsn);
    match capable_peers.clone().map(|p| p.commit_lsn).max() {
        None => (None, "no connected peers to elect from".to_string()),
        Some(max_commit_lsn) => {
            let threshold = max_commit_lsn
                .checked_sub(conf.max_offloader_lag_bytes)
                .unwrap_or(Lsn(0));
            let mut caughtup_peers = capable_peers
                .clone()
                .filter(|p| p.commit_lsn >= threshold)
                .collect::<Vec<_>>();
            caughtup_peers.sort_by(|p1, p2| p1.sk_id.cmp(&p2.sk_id));

            // To distribute the load, shift by timeline_id.
            let offloader = caughtup_peers
                [(u128::from(ttid.timeline_id) % caughtup_peers.len() as u128) as usize]
                .sk_id;

            let mut capable_peers_dbg = capable_peers
                .map(|p| (p.sk_id, p.commit_lsn))
                .collect::<Vec<_>>();
            capable_peers_dbg.sort_by(|p1, p2| p1.0.cmp(&p2.0));
            (
                Some(offloader),
                format!(
                    "elected {} among {:?} peers, with {} of them being caughtup",
                    offloader,
                    capable_peers_dbg,
                    caughtup_peers.len()
                ),
            )
        }
    }
}

/// Based on peer information, determine which safekeeper should offload. If it is
/// us, start the per-timeline task unless it is already running; if it is not us
/// and the task is running, stop it.
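/// The decision is re-evaluated cheaply: the election result is only compared with
/// whether a task handle is currently stored for this timeline, and the launcher
/// calls this both on channel notifications and on a periodic tick.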
async fn update_task(
    conf: &SafeKeeperConf,
    ttid: TenantTimelineId,
    entry: &mut WalBackupTimelineEntry,
) {
    let alive_peers = entry.timeline.get_peers(conf).await;
    let wal_backup_lsn = entry.timeline.get_wal_backup_lsn().await;
    let (offloader, election_dbg_str) =
        determine_offloader(&alive_peers, wal_backup_lsn, ttid, conf);
    let elected_me = Some(conf.my_id) == offloader;

    if elected_me != (entry.handle.is_some()) {
        if elected_me {
            info!("elected for backup: {}", election_dbg_str);

            let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
            let timeline_dir = conf.timeline_dir(&ttid);

            let handle = tokio::spawn(
                backup_task_main(
                    ttid,
                    timeline_dir,
                    conf.workdir.clone(),
                    conf.backup_parallel_jobs,
                    shutdown_rx,
                )
                .in_current_span(),
            );

            entry.handle = Some(WalBackupTaskHandle {
                shutdown_tx,
                handle,
            });
        } else {
            info!("stepping down from backup: {}", election_dbg_str);
            shut_down_task(ttid, entry).await;
        }
    }
}

const CHECK_TASKS_INTERVAL_MSEC: u64 = 1000;

/// Sits on wal_backup_launcher_rx and starts/stops per-timeline WAL backup tasks.
/// Having this in a separate task simplifies locking, allows reaping panics, and
/// separates elections from the offloading itself.
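/// The loop reacts to two events: a timeline id arriving on wal_backup_launcher_rx
/// (a hint that offloading may need to start or stop for that timeline), and a
/// CHECK_TASKS_INTERVAL_MSEC ticker that re-runs the election for every timeline
/// already registered in the task map.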
pub async fn wal_backup_launcher_task_main(
    conf: SafeKeeperConf,
    mut wal_backup_launcher_rx: Receiver<TenantTimelineId>,
) -> anyhow::Result<()> {
    info!(
        "WAL backup launcher started, remote config {:?}",
        conf.remote_storage
    );

    let conf_ = conf.clone();
    REMOTE_STORAGE.get_or_init(|| {
        conf_
            .remote_storage
            .as_ref()
            .map(|c| GenericRemoteStorage::from_config(c).expect("failed to create remote storage"))
    });

    // Presence in this map means the launcher is aware that s3 offloading is
    // needed for the timeline, but the task is started only if it makes sense
    // to offload from this safekeeper.
    let mut tasks: HashMap<TenantTimelineId, WalBackupTimelineEntry> = HashMap::new();

    let mut ticker = tokio::time::interval(Duration::from_millis(CHECK_TASKS_INTERVAL_MSEC));
    loop {
        tokio::select! {
            ttid = wal_backup_launcher_rx.recv() => {
                // channel is never expected to get closed
                let ttid = ttid.unwrap();
                if conf.remote_storage.is_none() || !conf.wal_backup_enabled {
                    continue; /* just drain the channel and do nothing */
                }
                async {
                    let timeline = is_wal_backup_required(ttid).await;
                    // do we need to do anything at all?
                    if timeline.is_some() != tasks.contains_key(&ttid) {
                        if let Some(timeline) = timeline {
                            // need to start the task
                            let entry = tasks.entry(ttid).or_insert(WalBackupTimelineEntry {
                                timeline,
                                handle: None,
                            });
                            update_task(&conf, ttid, entry).await;
                        } else {
                            // need to stop the task
                            info!("stopping WAL backup task");
                            let mut entry = tasks.remove(&ttid).unwrap();
                            shut_down_task(ttid, &mut entry).await;
                        }
                    }
                }.instrument(info_span!("WAL backup", ttid = %ttid)).await;
            }
            // For each timeline needing offloading, check if this safekeeper
            // should do the job and start/stop the task accordingly.
            _ = ticker.tick() => {
                for (ttid, entry) in tasks.iter_mut() {
                    update_task(&conf, *ttid, entry)
                        .instrument(info_span!("WAL backup", ttid = %ttid))
                        .await;
                }
            }
        }
    }
}

struct WalBackupTask {
    timeline: Arc<Timeline>,
    timeline_dir: Utf8PathBuf,
    workspace_dir: Utf8PathBuf,
    wal_seg_size: usize,
    parallel_jobs: usize,
    commit_lsn_watch_rx: watch::Receiver<Lsn>,
}

/// Offload a single timeline.
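/// Runs until the offload loop exits on its own (which should not normally happen)
/// or until a shutdown signal arrives from the launcher, whichever happens first.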
async fn backup_task_main(
    ttid: TenantTimelineId,
    timeline_dir: Utf8PathBuf,
    workspace_dir: Utf8PathBuf,
    parallel_jobs: usize,
    mut shutdown_rx: Receiver<()>,
) {
    info!("started");
    let res = GlobalTimelines::get(ttid);
    if let Err(e) = res {
        error!("backup error: {}", e);
        return;
    }
    let tli = res.unwrap();

    let mut wb = WalBackupTask {
        wal_seg_size: tli.get_wal_seg_size().await,
        commit_lsn_watch_rx: tli.get_commit_lsn_watch_rx(),
        timeline: tli,
        timeline_dir,
        workspace_dir,
        parallel_jobs,
    };

    // The task is spun up only when wal_seg_size is already initialized.
    assert!(wb.wal_seg_size > 0);

    let mut canceled = false;
    select! {
        _ = wb.run() => {}
        _ = shutdown_rx.recv() => {
            canceled = true;
        }
    }
    info!("task {}", if canceled { "canceled" } else { "terminated" });
}

impl WalBackupTask {
    async fn run(&mut self) {
        let mut backup_lsn = Lsn(0);

        let mut retry_attempt = 0u32;
        // offload loop
        loop {
            if retry_attempt == 0 {
                // wait for new WAL to arrive
                if let Err(e) = self.commit_lsn_watch_rx.changed().await {
                    // should never happen, as we hold an Arc to the timeline.
                    error!("commit_lsn watch shut down: {:?}", e);
                    return;
                }
            } else {
                // or just sleep if we errored previously
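                // Exponential backoff: UPLOAD_FAILURE_RETRY_MIN_MS shifted left by
                // the retry attempt number, capped at UPLOAD_FAILURE_RETRY_MAX_MS;
                // if the shift overflows, the cap is used.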
                let mut retry_delay = UPLOAD_FAILURE_RETRY_MAX_MS;
                if let Some(backoff_delay) = UPLOAD_FAILURE_RETRY_MIN_MS.checked_shl(retry_attempt)
                {
                    retry_delay = min(retry_delay, backoff_delay);
                }
                sleep(Duration::from_millis(retry_delay)).await;
            }

            let commit_lsn = *self.commit_lsn_watch_rx.borrow();

            // Note that backup_lsn can be higher than commit_lsn if we
            // don't have much local WAL and others already uploaded
            // segments we don't even have.
            if backup_lsn.segment_number(self.wal_seg_size)
                >= commit_lsn.segment_number(self.wal_seg_size)
            {
                retry_attempt = 0;
                continue; /* nothing to do, common case as we wake up on every commit_lsn bump */
            }
            // Perhaps peers advanced the position, check shmem value.
            backup_lsn = self.timeline.get_wal_backup_lsn().await;
            if backup_lsn.segment_number(self.wal_seg_size)
                >= commit_lsn.segment_number(self.wal_seg_size)
            {
                retry_attempt = 0;
                continue;
            }

            match backup_lsn_range(
                &self.timeline,
                &mut backup_lsn,
                commit_lsn,
                self.wal_seg_size,
                &self.timeline_dir,
                &self.workspace_dir,
                self.parallel_jobs,
            )
            .await
            {
                Ok(()) => {
                    retry_attempt = 0;
                }
                Err(e) => {
                    error!(
                        "failed while offloading range {}-{}: {:?}",
                        backup_lsn, commit_lsn, e
                    );

                    retry_attempt = retry_attempt.saturating_add(1);
                }
            }
        }
    }
}

async fn backup_lsn_range(
    timeline: &Arc<Timeline>,
    backup_lsn: &mut Lsn,
    end_lsn: Lsn,
    wal_seg_size: usize,
    timeline_dir: &Utf8Path,
    workspace_dir: &Utf8Path,
    parallel_jobs: usize,
) -> Result<()> {
    if parallel_jobs < 1 {
        anyhow::bail!("parallel_jobs must be >= 1");
    }

    let start_lsn = *backup_lsn;
    let segments = get_segments(start_lsn, end_lsn, wal_seg_size);

    // Pool of concurrent upload tasks. We use `FuturesOrdered` to
    // preserve order of uploads, and update `backup_lsn` only after
    // all previous uploads are finished.
    let mut uploads = FuturesOrdered::new();
    let mut iter = segments.iter();

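    // Feed segments into the pool while keeping at most `parallel_jobs` uploads in
    // flight; once the pool is full (or the input is exhausted) wait for the oldest
    // upload to finish, so backup_lsn only ever advances contiguously.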
    loop {
        let added_task = match iter.next() {
            Some(s) => {
                uploads.push_back(backup_single_segment(s, timeline_dir, workspace_dir));
                true
            }
            None => false,
        };

        // Wait for the next segment to upload if we don't have any more segments,
        // or if we have too many concurrent uploads.
        if !added_task || uploads.len() >= parallel_jobs {
            let next = uploads.next().await;
            if let Some(res) = next {
                // next segment uploaded
                let segment = res?;
                let new_backup_lsn = segment.end_lsn;
                timeline
                    .set_wal_backup_lsn(new_backup_lsn)
                    .await
                    .context("setting wal_backup_lsn")?;
                *backup_lsn = new_backup_lsn;
            } else {
                // no more segments to upload
                break;
            }
        }
    }

    info!(
        "offloaded segnos {:?} up to {}, previous backup_lsn {}",
        segments.iter().map(|&s| s.seg_no).collect::<Vec<_>>(),
        end_lsn,
        start_lsn,
    );
    Ok(())
}

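/// Upload one segment file to remote storage, deriving the remote path from the
/// segment's location under the workspace directory, and bump the success/error
/// metrics accordingly.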
async fn backup_single_segment(
    seg: &Segment,
    timeline_dir: &Utf8Path,
    workspace_dir: &Utf8Path,
) -> Result<Segment> {
    let segment_file_path = seg.file_path(timeline_dir)?;
    let remote_segment_path = segment_file_path
        .strip_prefix(workspace_dir)
        .context("Failed to strip workspace dir prefix")
        .and_then(RemotePath::new)
        .with_context(|| {
            format!(
                "Failed to resolve remote part of path {segment_file_path:?} for base {workspace_dir:?}",
            )
        })?;

    let res = backup_object(&segment_file_path, &remote_segment_path, seg.size()).await;
    if res.is_ok() {
        BACKED_UP_SEGMENTS.inc();
    } else {
        BACKUP_ERRORS.inc();
    }
    res?;
    debug!("Backup of {} done", segment_file_path);

    Ok(*seg)
}

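/// A single WAL segment: its number plus the [start_lsn, end_lsn) range it spans.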
#[derive(Debug, Copy, Clone)]
pub struct Segment {
    seg_no: XLogSegNo,
    start_lsn: Lsn,
    end_lsn: Lsn,
}

impl Segment {
    pub fn new(seg_no: u64, start_lsn: Lsn, end_lsn: Lsn) -> Self {
        Self {
            seg_no,
            start_lsn,
            end_lsn,
        }
    }

    pub fn object_name(self) -> String {
        XLogFileName(PG_TLI, self.seg_no, self.size())
    }

    pub fn file_path(self, timeline_dir: &Utf8Path) -> Result<Utf8PathBuf> {
        Ok(timeline_dir.join(self.object_name()))
    }

    pub fn size(self) -> usize {
        (u64::from(self.end_lsn) - u64::from(self.start_lsn)) as usize
    }
}

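/// Enumerate the full segments lying between `start` and `end`: from the segment
/// containing `start` up to, but not including, the segment containing `end`.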
fn get_segments(start: Lsn, end: Lsn, seg_size: usize) -> Vec<Segment> {
    let first_seg = start.segment_number(seg_size);
    let last_seg = end.segment_number(seg_size);

    let res: Vec<Segment> = (first_seg..last_seg)
        .map(|s| {
            let start_lsn = XLogSegNoOffsetToRecPtr(s, 0, seg_size);
            let end_lsn = XLogSegNoOffsetToRecPtr(s + 1, 0, seg_size);
            Segment::new(s, Lsn::from(start_lsn), Lsn::from(end_lsn))
        })
        .collect();
    res
}

static REMOTE_STORAGE: OnceCell<Option<GenericRemoteStorage>> = OnceCell::new();

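/// Stream a local segment file to remote storage; the file is read through a
/// BUFFER_SIZE-chunked reader rather than being loaded into memory whole.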
async fn backup_object(
    source_file: &Utf8Path,
    target_file: &RemotePath,
    size: usize,
) -> Result<()> {
    let storage = REMOTE_STORAGE
        .get()
        .expect("failed to get remote storage")
        .as_ref()
        .unwrap();

    let file = File::open(&source_file)
        .await
        .with_context(|| format!("Failed to open file {source_file:?} for wal backup"))?;

    let file = tokio_util::io::ReaderStream::with_capacity(file, BUFFER_SIZE);

    storage.upload_storage_object(file, size, target_file).await
}

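/// Open the remote object at `file_path` for reading starting at `offset`,
/// returning a buffered async reader over the download stream.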
pub async fn read_object(
    file_path: &RemotePath,
    offset: u64,
) -> anyhow::Result<Pin<Box<dyn tokio::io::AsyncRead + Send + Sync>>> {
    let storage = REMOTE_STORAGE
        .get()
        .context("Failed to get remote storage")?
        .as_ref()
        .context("No remote storage configured")?;

    info!("segment download about to start from remote path {file_path:?} at offset {offset}");

    let download = storage
        .download_storage_object(Some((offset, None)), file_path)
        .await
        .with_context(|| {
            format!("Failed to open WAL segment download stream for remote path {file_path:?}")
        })?;

    let reader = tokio_util::io::StreamReader::new(download.download_stream);

    let reader = tokio::io::BufReader::with_capacity(BUFFER_SIZE, reader);

    Ok(Box::pin(reader))
}

/// Copy segments from one timeline to another. Used in copy_timeline.
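/// Segments already present at the destination prefix are skipped, so re-running
/// the copy after a partial failure is safe.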
pub async fn copy_s3_segments(
    wal_seg_size: usize,
    src_ttid: &TenantTimelineId,
    dst_ttid: &TenantTimelineId,
    from_segment: XLogSegNo,
    to_segment: XLogSegNo,
) -> Result<()> {
    const SEGMENTS_PROGRESS_REPORT_INTERVAL: u64 = 1024;

    let storage = REMOTE_STORAGE
        .get()
        .expect("failed to get remote storage")
        .as_ref()
        .unwrap();

    let relative_dst_path =
        Utf8Path::new(&dst_ttid.tenant_id.to_string()).join(dst_ttid.timeline_id.to_string());

    let remote_path = RemotePath::new(&relative_dst_path)?;

    let files = storage.list_files(Some(&remote_path)).await?;
    let uploaded_segments = &files
        .iter()
        .filter_map(|file| file.object_name().map(ToOwned::to_owned))
        .collect::<HashSet<_>>();

    debug!(
        "these segments have already been uploaded: {:?}",
        uploaded_segments
    );

    let relative_src_path =
        Utf8Path::new(&src_ttid.tenant_id.to_string()).join(src_ttid.timeline_id.to_string());

    for segno in from_segment..to_segment {
        if segno % SEGMENTS_PROGRESS_REPORT_INTERVAL == 0 {
            info!("copied all segments from {} until {}", from_segment, segno);
        }

        let segment_name = XLogFileName(PG_TLI, segno, wal_seg_size);
        if uploaded_segments.contains(&segment_name) {
            continue;
        }
        debug!("copying segment {}", segment_name);

        let from = RemotePath::new(&relative_src_path.join(&segment_name))?;
        let to = RemotePath::new(&relative_dst_path.join(&segment_name))?;

        storage.copy_object(&from, &to).await?;
    }

    info!(
        "finished copying segments from {} until {}",
        from_segment, to_segment
    );
    Ok(())
}