Line data Source code
1 : use anyhow::{Context, Result};
2 :
3 : use camino::{Utf8Path, Utf8PathBuf};
4 : use futures::stream::FuturesOrdered;
5 : use futures::StreamExt;
6 : use tokio::task::JoinHandle;
7 : use tokio_util::sync::CancellationToken;
8 : use utils::backoff;
9 : use utils::id::NodeId;
10 :
11 : use std::cmp::min;
12 : use std::collections::{HashMap, HashSet};
13 : use std::num::NonZeroU32;
14 : use std::pin::Pin;
15 : use std::sync::Arc;
16 : use std::time::Duration;
17 :
18 : use postgres_ffi::v14::xlog_utils::XLogSegNoOffsetToRecPtr;
19 : use postgres_ffi::XLogFileName;
20 : use postgres_ffi::{XLogSegNo, PG_TLI};
21 : use remote_storage::{GenericRemoteStorage, RemotePath};
22 : use tokio::fs::File;
23 :
24 : use tokio::select;
25 : use tokio::sync::mpsc::{self, Receiver, Sender};
26 : use tokio::sync::watch;
27 : use tokio::time::sleep;
28 : use tracing::*;
29 :
30 : use utils::{id::TenantTimelineId, lsn::Lsn};
31 :
32 : use crate::metrics::{BACKED_UP_SEGMENTS, BACKUP_ERRORS};
33 : use crate::timeline::{PeerInfo, Timeline};
34 : use crate::{GlobalTimelines, SafeKeeperConf};
35 :
36 : use once_cell::sync::OnceCell;
37 :
38 : const UPLOAD_FAILURE_RETRY_MIN_MS: u64 = 10;
39 : const UPLOAD_FAILURE_RETRY_MAX_MS: u64 = 5000;
40 :
41 : /// Default buffer size when interfacing with [`tokio::fs::File`].
42 : const BUFFER_SIZE: usize = 32 * 1024;
43 :
44 : /// Check whether WAL backup is required for the timeline. If yes, mark that the
45 : /// launcher is aware of the current status and return the timeline.
46 151 : async fn is_wal_backup_required(ttid: TenantTimelineId) -> Option<Arc<Timeline>> {
47 151 : match GlobalTimelines::get(ttid).ok() {
48 151 : Some(tli) => {
49 151 : tli.wal_backup_attend().await;
50 151 : Some(tli)
51 : }
52 0 : None => None,
53 : }
54 151 : }
55 :
56 : struct WalBackupTaskHandle {
57 : shutdown_tx: Sender<()>,
58 : handle: JoinHandle<()>,
59 : }
60 :
61 : struct WalBackupTimelineEntry {
62 : timeline: Arc<Timeline>,
63 : handle: Option<WalBackupTaskHandle>,
64 : }
65 :
66 2 : async fn shut_down_task(ttid: TenantTimelineId, entry: &mut WalBackupTimelineEntry) {
67 2 : if let Some(wb_handle) = entry.handle.take() {
68 : // Tell the task to shut down. An error means the task exited earlier; that's ok.
69 2 : let _ = wb_handle.shutdown_tx.send(()).await;
70 : // Await the task itself. TODO: restart panicked tasks earlier.
71 2 : if let Err(e) = wb_handle.handle.await {
72 0 : warn!("WAL backup task for {} panicked: {}", ttid, e);
73 2 : }
74 0 : }
75 2 : }
76 :
77 : /// The goal is to ensure that normally only one safekeeper offloads. However,
78 : /// it is fine (and inevitable, as s3 doesn't provide CAS) that for some short
79 : /// time several of them PUT the same files. Also,
80 : /// - frequently changing the offloader would be bad;
81 : /// - electing a seriously lagging safekeeper is undesirable;
82 : /// so we deterministically choose among the reasonably caught-up candidates.
83 : /// TODO: take into account failed attempts to deal with hypothetical situation
84 : /// where s3 is unreachable only for some sks.
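///
/// For illustration (made-up values, not taken from this code): if the caught-up
/// peers, sorted by id, are [1, 4, 7] and `timeline_id % 3 == 2`, then peer 7 is
/// elected. Every safekeeper runs the same computation, so given the same peer
/// info they agree on the offloader.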
85 253 : fn determine_offloader(
86 253 : alive_peers: &[PeerInfo],
87 253 : wal_backup_lsn: Lsn,
88 253 : ttid: TenantTimelineId,
89 253 : conf: &SafeKeeperConf,
90 253 : ) -> (Option<NodeId>, String) {
91 253 : // TODO: remove this once newly joined safekeepers are backfilled from backup_lsn.
92 253 : let capable_peers = alive_peers
93 253 : .iter()
94 1854 : .filter(|p| p.local_start_lsn <= wal_backup_lsn);
95 618 : match capable_peers.clone().map(|p| p.commit_lsn).max() {
96 41 : None => (None, "no connected peers to elect from".to_string()),
97 212 : Some(max_commit_lsn) => {
98 212 : let threshold = max_commit_lsn
99 212 : .checked_sub(conf.max_offloader_lag_bytes)
100 212 : .unwrap_or(Lsn(0));
101 212 : let mut caughtup_peers = capable_peers
102 212 : .clone()
103 618 : .filter(|p| p.commit_lsn >= threshold)
104 212 : .collect::<Vec<_>>();
105 478 : caughtup_peers.sort_by(|p1, p2| p1.sk_id.cmp(&p2.sk_id));
106 212 :
107 212 : // To distribute the load, shift by timeline_id.
108 212 : let offloader = caughtup_peers
109 212 : [(u128::from(ttid.timeline_id) % caughtup_peers.len() as u128) as usize]
110 212 : .sk_id;
111 212 :
112 212 : let mut capable_peers_dbg = capable_peers
113 618 : .map(|p| (p.sk_id, p.commit_lsn))
114 212 : .collect::<Vec<_>>();
115 478 : capable_peers_dbg.sort_by(|p1, p2| p1.0.cmp(&p2.0));
116 212 : (
117 212 : Some(offloader),
118 212 : format!(
119 212 : "elected {} among {:?} peers, with {} of them being caughtup",
120 212 : offloader,
121 212 : capable_peers_dbg,
122 212 : caughtup_peers.len()
123 212 : ),
124 212 : )
125 : }
126 : }
127 253 : }
128 :
129 : /// Based on peer information, determine which safekeeper should offload; if it
130 : /// is me, start the per-timeline task if it is not running yet. OTOH, if it is
131 : /// not me and the task is running, kill it.
132 253 : async fn update_task(
133 253 : conf: &SafeKeeperConf,
134 253 : ttid: TenantTimelineId,
135 253 : entry: &mut WalBackupTimelineEntry,
136 253 : ) {
137 253 : let alive_peers = entry.timeline.get_peers(conf).await;
138 253 : let wal_backup_lsn = entry.timeline.get_wal_backup_lsn().await;
139 253 : let (offloader, election_dbg_str) =
140 253 : determine_offloader(&alive_peers, wal_backup_lsn, ttid, conf);
141 253 : let elected_me = Some(conf.my_id) == offloader;
142 253 :
143 253 : if elected_me != (entry.handle.is_some()) {
144 12 : if elected_me {
145 10 : info!("elected for backup: {}", election_dbg_str);
146 :
147 10 : let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
148 10 : let timeline_dir = conf.timeline_dir(&ttid);
149 10 :
150 10 : let handle = tokio::spawn(
151 10 : backup_task_main(
152 10 : ttid,
153 10 : timeline_dir,
154 10 : conf.workdir.clone(),
155 10 : conf.backup_parallel_jobs,
156 10 : shutdown_rx,
157 10 : )
158 10 : .in_current_span(),
159 10 : );
160 10 :
161 10 : entry.handle = Some(WalBackupTaskHandle {
162 10 : shutdown_tx,
163 10 : handle,
164 10 : });
165 : } else {
166 2 : info!("stepping down from backup: {}", election_dbg_str);
167 2 : shut_down_task(ttid, entry).await;
168 : }
169 241 : }
170 253 : }
171 :
172 : static REMOTE_STORAGE: OnceCell<Option<GenericRemoteStorage>> = OnceCell::new();
173 :
174 : // Storage must be configured and initialized when this is called.
175 18 : fn get_configured_remote_storage() -> &'static GenericRemoteStorage {
176 18 : REMOTE_STORAGE
177 18 : .get()
178 18 : .expect("failed to get remote storage")
179 18 : .as_ref()
180 18 : .unwrap()
181 18 : }
182 :
183 : const CHECK_TASKS_INTERVAL_MSEC: u64 = 1000;
184 :
185 : /// Sits on wal_backup_launcher_rx and starts/stops per-timeline WAL backup
186 : /// tasks. Having this in a separate task simplifies locking, allows reaping
187 : /// panics, and separates elections from offloading itself.
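/// It reacts to two kinds of events: timeline ids arriving on the channel, and a
/// periodic ticker that re-runs the offloader election for every tracked timeline.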
188 508 : pub async fn wal_backup_launcher_task_main(
189 508 : conf: SafeKeeperConf,
190 508 : mut wal_backup_launcher_rx: Receiver<TenantTimelineId>,
191 508 : ) -> anyhow::Result<()> {
192 508 : info!(
193 508 : "WAL backup launcher started, remote config {:?}",
194 508 : conf.remote_storage
195 508 : );
196 :
197 508 : let conf_ = conf.clone();
198 508 : REMOTE_STORAGE.get_or_init(|| {
199 508 : conf_
200 508 : .remote_storage
201 508 : .as_ref()
202 508 : .map(|c| GenericRemoteStorage::from_config(c).expect("failed to create remote storage"))
203 508 : });
204 508 :
205 508 : // Presence in this map means the launcher is aware that s3 offloading is needed
206 508 : // for the timeline, but the task is started only if it makes sense to offload
207 508 : // from this safekeeper.
208 508 : let mut tasks: HashMap<TenantTimelineId, WalBackupTimelineEntry> = HashMap::new();
209 508 :
210 508 : let mut ticker = tokio::time::interval(Duration::from_millis(CHECK_TASKS_INTERVAL_MSEC));
211 21795 : loop {
212 41619 : tokio::select! {
213 12945 : ttid = wal_backup_launcher_rx.recv() => {
214 : // channel is never expected to get closed
215 : let ttid = ttid.unwrap();
216 : if !conf.is_wal_backup_enabled() {
217 : continue; /* just drain the channel and do nothing */
218 : }
219 151 : async {
220 151 : let timeline = is_wal_backup_required(ttid).await;
221 : // do we need to do anything at all?
222 151 : if timeline.is_some() != tasks.contains_key(&ttid) {
223 25 : if let Some(timeline) = timeline {
224 : // need to start the task
225 25 : let entry = tasks.entry(ttid).or_insert(WalBackupTimelineEntry {
226 25 : timeline,
227 25 : handle: None,
228 25 : });
229 25 : update_task(&conf, ttid, entry).await;
230 : } else {
231 : // need to stop the task
232 0 : info!("stopping WAL backup task");
233 0 : let mut entry = tasks.remove(&ttid).unwrap();
234 0 : shut_down_task(ttid, &mut entry).await;
235 : }
236 126 : }
237 151 : }.instrument(info_span!("WAL backup", ttid = %ttid)).await;
238 : }
239 : // For each timeline needing offloading, check if this safekeeper
240 : // should do the job and start/stop the task accordingly.
241 : _ = ticker.tick() => {
242 : for (ttid, entry) in tasks.iter_mut() {
243 : update_task(&conf, *ttid, entry)
244 : .instrument(info_span!("WAL backup", ttid = %ttid))
245 : .await;
246 : }
247 : }
248 21795 : }
249 21795 : }
250 : }
251 :
252 : struct WalBackupTask {
253 : timeline: Arc<Timeline>,
254 : timeline_dir: Utf8PathBuf,
255 : workspace_dir: Utf8PathBuf,
256 : wal_seg_size: usize,
257 : parallel_jobs: usize,
258 : commit_lsn_watch_rx: watch::Receiver<Lsn>,
259 : }
260 :
261 : /// Offload a single timeline.
262 10 : async fn backup_task_main(
263 10 : ttid: TenantTimelineId,
264 10 : timeline_dir: Utf8PathBuf,
265 10 : workspace_dir: Utf8PathBuf,
266 10 : parallel_jobs: usize,
267 10 : mut shutdown_rx: Receiver<()>,
268 10 : ) {
269 10 : info!("started");
270 10 : let res = GlobalTimelines::get(ttid);
271 10 : if let Err(e) = res {
272 0 : error!("backup error: {}", e);
273 0 : return;
274 10 : }
275 10 : let tli = res.unwrap();
276 :
277 10 : let mut wb = WalBackupTask {
278 10 : wal_seg_size: tli.get_wal_seg_size().await,
279 10 : commit_lsn_watch_rx: tli.get_commit_lsn_watch_rx(),
280 10 : timeline: tli,
281 10 : timeline_dir,
282 10 : workspace_dir,
283 10 : parallel_jobs,
284 10 : };
285 10 :
286 10 : // the task is spun up only when wal_seg_size is already initialized
287 10 : assert!(wb.wal_seg_size > 0);
288 :
289 10 : let mut canceled = false;
290 3711 : select! {
291 3711 : _ = wb.run() => {}
292 3711 : _ = shutdown_rx.recv() => {
293 3711 : canceled = true;
294 3711 : }
295 3711 : }
296 2 : info!("task {}", if canceled { "canceled" } else { "terminated" });
297 2 : }
298 :
299 : impl WalBackupTask {
300 10 : async fn run(&mut self) {
301 10 : let mut backup_lsn = Lsn(0);
302 10 :
303 10 : let mut retry_attempt = 0u32;
304 : // offload loop
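// Wake up on every commit_lsn change; once the backup position lags behind
// commit_lsn by at least one full segment, upload the missing segments.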
305 1669 : loop {
306 1669 : if retry_attempt == 0 {
307 : // wait for new WAL to arrive
308 1668 : if let Err(e) = self.commit_lsn_watch_rx.changed().await {
309 : // should never happen, as we hold an Arc to the timeline.
310 0 : error!("commit_lsn watch shut down: {:?}", e);
311 0 : return;
312 1659 : }
313 : } else {
314 : // or just sleep if we errored previously
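// The delay grows as UPLOAD_FAILURE_RETRY_MIN_MS << retry_attempt, capped at
// UPLOAD_FAILURE_RETRY_MAX_MS.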
315 1 : let mut retry_delay = UPLOAD_FAILURE_RETRY_MAX_MS;
316 1 : if let Some(backoff_delay) = UPLOAD_FAILURE_RETRY_MIN_MS.checked_shl(retry_attempt)
317 1 : {
318 1 : retry_delay = min(retry_delay, backoff_delay);
319 1 : }
320 1 : sleep(Duration::from_millis(retry_delay)).await;
321 : }
322 :
323 1660 : let commit_lsn = *self.commit_lsn_watch_rx.borrow();
324 1660 :
325 1660 : // Note that backup_lsn can be higher than commit_lsn if we
326 1660 : // don't have much local WAL and others already uploaded
327 1660 : // segments we don't even have.
328 1660 : if backup_lsn.segment_number(self.wal_seg_size)
329 1660 : >= commit_lsn.segment_number(self.wal_seg_size)
330 : {
331 1640 : retry_attempt = 0;
332 1640 : continue; /* nothing to do, common case as we wake up on every commit_lsn bump */
333 20 : }
334 20 : // Perhaps peers advanced the position, check shmem value.
335 20 : backup_lsn = self.timeline.get_wal_backup_lsn().await;
336 20 : if backup_lsn.segment_number(self.wal_seg_size)
337 20 : >= commit_lsn.segment_number(self.wal_seg_size)
338 : {
339 7 : retry_attempt = 0;
340 7 : continue;
341 13 : }
342 13 :
343 13 : match backup_lsn_range(
344 13 : &self.timeline,
345 13 : &mut backup_lsn,
346 13 : commit_lsn,
347 13 : self.wal_seg_size,
348 13 : &self.timeline_dir,
349 13 : &self.workspace_dir,
350 13 : self.parallel_jobs,
351 13 : )
352 2044 : .await
353 : {
354 11 : Ok(()) => {
355 11 : retry_attempt = 0;
356 11 : }
357 1 : Err(e) => {
358 1 : error!(
359 1 : "failed while offloading range {}-{}: {:?}",
360 1 : backup_lsn, commit_lsn, e
361 1 : );
362 :
363 1 : retry_attempt = retry_attempt.saturating_add(1);
364 : }
365 : }
366 : }
367 0 : }
368 : }
369 :
370 13 : async fn backup_lsn_range(
371 13 : timeline: &Arc<Timeline>,
372 13 : backup_lsn: &mut Lsn,
373 13 : end_lsn: Lsn,
374 13 : wal_seg_size: usize,
375 13 : timeline_dir: &Utf8Path,
376 13 : workspace_dir: &Utf8Path,
377 13 : parallel_jobs: usize,
378 13 : ) -> Result<()> {
379 13 : if parallel_jobs < 1 {
380 0 : anyhow::bail!("parallel_jobs must be >= 1");
381 13 : }
382 13 :
383 13 : let start_lsn = *backup_lsn;
384 13 : let segments = get_segments(start_lsn, end_lsn, wal_seg_size);
385 13 :
386 13 : // Pool of concurrent upload tasks. We use `FuturesOrdered` to
387 13 : // preserve order of uploads, and update `backup_lsn` only after
388 13 : // all previous uploads are finished.
389 13 : let mut uploads = FuturesOrdered::new();
390 13 : let mut iter = segments.iter();
391 :
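// Each iteration either enqueues the next segment upload or awaits the oldest
// in-flight one, so at most `parallel_jobs` uploads run concurrently and
// `backup_lsn` only advances once all earlier segments are uploaded.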
392 : loop {
393 40 : let added_task = match iter.next() {
394 15 : Some(s) => {
395 15 : uploads.push_back(backup_single_segment(s, timeline_dir, workspace_dir));
396 15 : true
397 : }
398 25 : None => false,
399 : };
400 :
401 : // Wait for the next segment to upload if we don't have any more segments,
402 : // or if we have too many concurrent uploads.
403 40 : if !added_task || uploads.len() >= parallel_jobs {
404 2042 : let next = uploads.next().await;
405 24 : if let Some(res) = next {
406 : // next segment uploaded
407 13 : let segment = res?;
408 12 : let new_backup_lsn = segment.end_lsn;
409 12 : timeline
410 12 : .set_wal_backup_lsn(new_backup_lsn)
411 2 : .await
412 12 : .context("setting wal_backup_lsn")?;
413 12 : *backup_lsn = new_backup_lsn;
414 : } else {
415 : // no more segments to upload
416 11 : break;
417 : }
418 15 : }
419 : }
420 :
421 11 : info!(
422 11 : "offloaded segnos {:?} up to {}, previous backup_lsn {}",
423 12 : segments.iter().map(|&s| s.seg_no).collect::<Vec<_>>(),
424 11 : end_lsn,
425 11 : start_lsn,
426 11 : );
427 11 : Ok(())
428 12 : }
429 :
430 15 : async fn backup_single_segment(
431 15 : seg: &Segment,
432 15 : timeline_dir: &Utf8Path,
433 15 : workspace_dir: &Utf8Path,
434 15 : ) -> Result<Segment> {
435 15 : let segment_file_path = seg.file_path(timeline_dir)?;
436 15 : let remote_segment_path = segment_file_path
437 15 : .strip_prefix(workspace_dir)
438 15 : .context("Failed to strip workspace dir prefix")
439 15 : .and_then(RemotePath::new)
440 15 : .with_context(|| {
441 0 : format!(
442 0 : "Failed to resolve remote part of path {segment_file_path:?} for base {workspace_dir:?}",
443 0 : )
444 15 : })?;
445 :
446 1583 : let res = backup_object(&segment_file_path, &remote_segment_path, seg.size()).await;
447 13 : if res.is_ok() {
448 12 : BACKED_UP_SEGMENTS.inc();
449 12 : } else {
450 1 : BACKUP_ERRORS.inc();
451 1 : }
452 13 : res?;
453 0 : debug!("Backup of {} done", segment_file_path);
454 :
455 12 : Ok(*seg)
456 13 : }
457 :
458 0 : #[derive(Debug, Copy, Clone)]
459 : pub struct Segment {
460 : seg_no: XLogSegNo,
461 : start_lsn: Lsn,
462 : end_lsn: Lsn,
463 : }
464 :
465 : impl Segment {
466 15 : pub fn new(seg_no: u64, start_lsn: Lsn, end_lsn: Lsn) -> Self {
467 15 : Self {
468 15 : seg_no,
469 15 : start_lsn,
470 15 : end_lsn,
471 15 : }
472 15 : }
473 :
474 15 : pub fn object_name(self) -> String {
475 15 : XLogFileName(PG_TLI, self.seg_no, self.size())
476 15 : }
477 :
478 15 : pub fn file_path(self, timeline_dir: &Utf8Path) -> Result<Utf8PathBuf> {
479 15 : Ok(timeline_dir.join(self.object_name()))
480 15 : }
481 :
482 30 : pub fn size(self) -> usize {
483 30 : (u64::from(self.end_lsn) - u64::from(self.start_lsn)) as usize
484 30 : }
485 : }
486 :
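/// Enumerate the whole WAL segments covering the range: segment numbers from the
/// one containing `start` up to, but not including, the one containing `end`.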
487 13 : fn get_segments(start: Lsn, end: Lsn, seg_size: usize) -> Vec<Segment> {
488 13 : let first_seg = start.segment_number(seg_size);
489 13 : let last_seg = end.segment_number(seg_size);
490 13 :
491 13 : let res: Vec<Segment> = (first_seg..last_seg)
492 15 : .map(|s| {
493 15 : let start_lsn = XLogSegNoOffsetToRecPtr(s, 0, seg_size);
494 15 : let end_lsn = XLogSegNoOffsetToRecPtr(s + 1, 0, seg_size);
495 15 : Segment::new(s, Lsn::from(start_lsn), Lsn::from(end_lsn))
496 15 : })
497 13 : .collect();
498 13 : res
499 13 : }
500 :
501 15 : async fn backup_object(
502 15 : source_file: &Utf8Path,
503 15 : target_file: &RemotePath,
504 15 : size: usize,
505 15 : ) -> Result<()> {
506 15 : let storage = get_configured_remote_storage();
507 :
508 15 : let file = File::open(&source_file)
509 15 : .await
510 15 : .with_context(|| format!("Failed to open file {source_file:?} for wal backup"))?;
511 :
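// Wrap the file in a stream that yields chunks of up to BUFFER_SIZE bytes, so the
// segment is uploaded without being read into memory all at once.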
512 15 : let file = tokio_util::io::ReaderStream::with_capacity(file, BUFFER_SIZE);
513 15 :
514 1568 : storage.upload_storage_object(file, size, target_file).await
515 13 : }
516 :
517 38 : pub async fn read_object(
518 38 : file_path: &RemotePath,
519 38 : offset: u64,
520 38 : ) -> anyhow::Result<Pin<Box<dyn tokio::io::AsyncRead + Send + Sync>>> {
521 38 : let storage = REMOTE_STORAGE
522 38 : .get()
523 38 : .context("Failed to get remote storage")?
524 38 : .as_ref()
525 38 : .context("No remote storage configured")?;
526 :
527 38 : info!("segment download about to start from remote path {file_path:?} at offset {offset}");
528 :
529 38 : let download = storage
530 38 : .download_storage_object(Some((offset, None)), file_path)
531 113 : .await
532 38 : .with_context(|| {
533 0 : format!("Failed to open WAL segment download stream for remote path {file_path:?}")
534 38 : })?;
535 :
536 38 : let reader = tokio_util::io::StreamReader::new(download.download_stream);
537 38 :
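// Buffer reads so that many small reads by the caller don't each hit the
// underlying download stream.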
538 38 : let reader = tokio::io::BufReader::with_capacity(BUFFER_SIZE, reader);
539 38 :
540 38 : Ok(Box::pin(reader))
541 38 : }
542 :
543 : /// Delete WAL files for the given timeline. Remote storage must be configured
544 : /// when called.
545 3 : pub async fn delete_timeline(ttid: &TenantTimelineId) -> Result<()> {
546 3 : let storage = get_configured_remote_storage();
547 3 : let ttid_path = Utf8Path::new(&ttid.tenant_id.to_string()).join(ttid.timeline_id.to_string());
548 3 : let remote_path = RemotePath::new(&ttid_path)?;
549 :
550 : // see DEFAULT_MAX_KEYS_PER_LIST_RESPONSE
551 : // const Option unwrap is not stable, otherwise it would be const.
552 3 : let batch_size: NonZeroU32 = NonZeroU32::new(1000).unwrap();
553 3 :
554 3 : // A backoff::retry is used here for two reasons:
555 3 : // - To provide a backoff rather than busy-polling the API on errors
556 3 : // - To absorb transient 429/503 conditions without hitting our error
557 3 : // logging path for issues deleting objects.
558 3 : //
559 3 : // Note: listing segments might take a long time if there are many of them.
560 3 : // We don't currently have HTTP request timeout cancellation, but if/once
561 3 : // we do, listing should get a streaming interface to make progress.
562 3 : let token = CancellationToken::new(); // not really used
563 3 : backoff::retry(
564 3 : || async {
565 : // Do list-delete in batch_size batches to make progress even if there are a lot of files.
566 : // Alternatively we could make list_files return an iterator, but that is more complicated,
567 : // and I'm not sure deleting while iterating is well-defined in s3.
568 : loop {
569 4 : let files = storage
570 4 : .list_files(Some(&remote_path), Some(batch_size))
571 28 : .await?;
572 4 : if files.is_empty() {
573 3 : return Ok(()); // done
574 1 : }
575 1 : // (at least) s3 results are sorted, so we can log min/max:
576 1 : // "List results are always returned in UTF-8 binary order."
577 1 : info!(
578 1 : "deleting batch of {} WAL segments [{}-{}]",
579 1 : files.len(),
580 1 : files.first().unwrap().object_name().unwrap_or(""),
581 1 : files.last().unwrap().object_name().unwrap_or("")
582 1 : );
583 3 : storage.delete_objects(&files).await?;
584 : }
585 6 : },
586 3 : |_| false,
587 3 : 3,
588 3 : 10,
589 3 : "executing WAL segments deletion batch",
590 3 : &token,
591 3 : )
592 31 : .await
593 3 : .ok_or_else(|| anyhow::anyhow!("canceled"))
594 3 : .and_then(|x| x)?;
595 :
596 3 : Ok(())
597 3 : }
598 :
599 : /// Copy segments from one timeline to another. Used in copy_timeline.
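/// Segments that already exist at the destination are skipped, so the copy can be
/// safely retried.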
600 48 : pub async fn copy_s3_segments(
601 48 : wal_seg_size: usize,
602 48 : src_ttid: &TenantTimelineId,
603 48 : dst_ttid: &TenantTimelineId,
604 48 : from_segment: XLogSegNo,
605 48 : to_segment: XLogSegNo,
606 48 : ) -> Result<()> {
607 48 : const SEGMENTS_PROGRESS_REPORT_INTERVAL: u64 = 1024;
608 48 :
609 48 : let storage = REMOTE_STORAGE
610 48 : .get()
611 48 : .expect("failed to get remote storage")
612 48 : .as_ref()
613 48 : .unwrap();
614 48 :
615 48 : let relative_dst_path =
616 48 : Utf8Path::new(&dst_ttid.tenant_id.to_string()).join(dst_ttid.timeline_id.to_string());
617 :
618 48 : let remote_path = RemotePath::new(&relative_dst_path)?;
619 :
620 191 : let files = storage.list_files(Some(&remote_path), None).await?;
621 48 : let uploaded_segments = &files
622 48 : .iter()
623 48 : .filter_map(|file| file.object_name().map(ToOwned::to_owned))
624 48 : .collect::<HashSet<_>>();
625 :
626 0 : debug!(
627 0 : "these segments have already been uploaded: {:?}",
628 0 : uploaded_segments
629 0 : );
630 :
631 48 : let relative_src_path =
632 48 : Utf8Path::new(&src_ttid.tenant_id.to_string()).join(src_ttid.timeline_id.to_string());
633 :
634 48 : for segno in from_segment..to_segment {
635 36 : if segno % SEGMENTS_PROGRESS_REPORT_INTERVAL == 0 {
636 0 : info!("copied all segments from {} until {}", from_segment, segno);
637 36 : }
638 :
639 36 : let segment_name = XLogFileName(PG_TLI, segno, wal_seg_size);
640 36 : if uploaded_segments.contains(&segment_name) {
641 24 : continue;
642 12 : }
643 0 : debug!("copying segment {}", segment_name);
644 :
645 12 : let from = RemotePath::new(&relative_src_path.join(&segment_name))?;
646 12 : let to = RemotePath::new(&relative_dst_path.join(&segment_name))?;
647 :
648 48 : storage.copy_object(&from, &to).await?;
649 : }
650 :
651 48 : info!(
652 48 : "finished copying segments from {} until {}",
653 48 : from_segment, to_segment
654 48 : );
655 48 : Ok(())
656 48 : }