LCOV - code coverage report
Current view: top level - safekeeper/src - wal_backup.rs (source / functions)
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info
Test Date: 2024-02-07 07:37:29
Coverage: Lines: 91.2 % (394 of 432 hit)    Functions: 76.4 % (55 of 72 hit)

            Line data    Source code
       1              : use anyhow::{Context, Result};
       2              : 
       3              : use camino::{Utf8Path, Utf8PathBuf};
       4              : use futures::stream::FuturesOrdered;
       5              : use futures::StreamExt;
       6              : use tokio::task::JoinHandle;
       7              : use tokio_util::sync::CancellationToken;
       8              : use utils::backoff;
       9              : use utils::id::NodeId;
      10              : 
      11              : use std::cmp::min;
      12              : use std::collections::{HashMap, HashSet};
      13              : use std::pin::Pin;
      14              : use std::sync::Arc;
      15              : use std::time::Duration;
      16              : 
      17              : use postgres_ffi::v14::xlog_utils::XLogSegNoOffsetToRecPtr;
      18              : use postgres_ffi::XLogFileName;
      19              : use postgres_ffi::{XLogSegNo, PG_TLI};
      20              : use remote_storage::{GenericRemoteStorage, RemotePath};
      21              : use tokio::fs::File;
      22              : 
      23              : use tokio::select;
      24              : use tokio::sync::mpsc::{self, Receiver, Sender};
      25              : use tokio::sync::watch;
      26              : use tokio::time::sleep;
      27              : use tracing::*;
      28              : 
      29              : use utils::{id::TenantTimelineId, lsn::Lsn};
      30              : 
      31              : use crate::metrics::{BACKED_UP_SEGMENTS, BACKUP_ERRORS};
      32              : use crate::timeline::{PeerInfo, Timeline};
      33              : use crate::{GlobalTimelines, SafeKeeperConf};
      34              : 
      35              : use once_cell::sync::OnceCell;
      36              : 
      37              : const UPLOAD_FAILURE_RETRY_MIN_MS: u64 = 10;
      38              : const UPLOAD_FAILURE_RETRY_MAX_MS: u64 = 5000;
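                       : // [Editorial sketch, not part of the covered source.] These constants drive
                       : // the retry backoff in WalBackupTask::run below: after a failed upload the
                       : // task sleeps min(UPLOAD_FAILURE_RETRY_MAX_MS, UPLOAD_FAILURE_RETRY_MIN_MS <<
                       : // retry_attempt) milliseconds, i.e. the delay doubles with each consecutive
                       : // failure and is capped at 5 seconds.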
      39              : 
      40              : /// Default buffer size when interfacing with [`tokio::fs::File`].
      41              : const BUFFER_SIZE: usize = 32 * 1024;
      42              : 
       43              : /// Check whether WAL backup is required for the timeline. If yes, mark that the
       44              : /// launcher is aware of the current status and return the timeline.
      45          152 : async fn is_wal_backup_required(ttid: TenantTimelineId) -> Option<Arc<Timeline>> {
      46          152 :     match GlobalTimelines::get(ttid).ok() {
      47          152 :         Some(tli) => {
      48          152 :             tli.wal_backup_attend().await;
      49          152 :             Some(tli)
      50              :         }
      51            0 :         None => None,
      52              :     }
      53          152 : }
      54              : 
      55              : struct WalBackupTaskHandle {
      56              :     shutdown_tx: Sender<()>,
      57              :     handle: JoinHandle<()>,
      58              : }
      59              : 
      60              : struct WalBackupTimelineEntry {
      61              :     timeline: Arc<Timeline>,
      62              :     handle: Option<WalBackupTaskHandle>,
      63              : }
      64              : 
      65            2 : async fn shut_down_task(ttid: TenantTimelineId, entry: &mut WalBackupTimelineEntry) {
      66            2 :     if let Some(wb_handle) = entry.handle.take() {
       67              :         // Tell the task to shut down. An error means the task exited earlier; that's ok.
      68            2 :         let _ = wb_handle.shutdown_tx.send(()).await;
      69              :         // Await the task itself. TODO: restart panicked tasks earlier.
      70            2 :         if let Err(e) = wb_handle.handle.await {
      71            0 :             warn!("WAL backup task for {} panicked: {}", ttid, e);
      72            2 :         }
      73            0 :     }
      74            2 : }
      75              : 
       76              : /// The goal is to ensure that normally only one safekeeper offloads. However,
       77              : /// it is fine (and inevitable, as s3 doesn't provide CAS) that for some short
       78              : /// time several of them PUT the same files. Also,
       79              : /// - frequently changing the offloader would be bad;
       80              : /// - electing a seriously lagging safekeeper is undesirable;
       81              : /// So we deterministically choose among the reasonably caught-up candidates.
       82              : /// TODO: take into account failed attempts to deal with the hypothetical
       83              : /// situation where s3 is unreachable only for some sks.
      84          253 : fn determine_offloader(
      85          253 :     alive_peers: &[PeerInfo],
      86          253 :     wal_backup_lsn: Lsn,
      87          253 :     ttid: TenantTimelineId,
      88          253 :     conf: &SafeKeeperConf,
      89          253 : ) -> (Option<NodeId>, String) {
      90          253 :     // TODO: remove this once we fill newly joined safekeepers since backup_lsn.
      91          253 :     let capable_peers = alive_peers
      92          253 :         .iter()
      93         1857 :         .filter(|p| p.local_start_lsn <= wal_backup_lsn);
      94          619 :     match capable_peers.clone().map(|p| p.commit_lsn).max() {
      95           41 :         None => (None, "no connected peers to elect from".to_string()),
      96          212 :         Some(max_commit_lsn) => {
      97          212 :             let threshold = max_commit_lsn
      98          212 :                 .checked_sub(conf.max_offloader_lag_bytes)
      99          212 :                 .unwrap_or(Lsn(0));
     100          212 :             let mut caughtup_peers = capable_peers
     101          212 :                 .clone()
     102          619 :                 .filter(|p| p.commit_lsn >= threshold)
     103          212 :                 .collect::<Vec<_>>();
     104          426 :             caughtup_peers.sort_by(|p1, p2| p1.sk_id.cmp(&p2.sk_id));
     105          212 : 
     106          212 :             // To distribute the load, shift by timeline_id.
     107          212 :             let offloader = caughtup_peers
     108          212 :                 [(u128::from(ttid.timeline_id) % caughtup_peers.len() as u128) as usize]
     109          212 :                 .sk_id;
     110          212 : 
     111          212 :             let mut capable_peers_dbg = capable_peers
     112          619 :                 .map(|p| (p.sk_id, p.commit_lsn))
     113          212 :                 .collect::<Vec<_>>();
     114          426 :             capable_peers_dbg.sort_by(|p1, p2| p1.0.cmp(&p2.0));
     115          212 :             (
     116          212 :                 Some(offloader),
     117          212 :                 format!(
     118          212 :                     "elected {} among {:?} peers, with {} of them being caughtup",
     119          212 :                     offloader,
     120          212 :                     capable_peers_dbg,
     121          212 :                     caughtup_peers.len()
     122          212 :                 ),
     123          212 :             )
     124              :         }
     125              :     }
     126          253 : }
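                       : // [Editorial sketch, not part of the covered source; values are made up.]
                       : // Worked example of the election above: with max_offloader_lag_bytes =
                       : // 128 MiB and capable peers sk1 at commit_lsn 0/9000000, sk2 at 0/9000100
                       : // and sk3 at 0/1000000, the threshold is 0/9000100 - 128 MiB = 0/1000100,
                       : // so sk3 is dropped as lagging and caughtup_peers = [sk1, sk2]. The
                       : // offloader is caughtup_peers[timeline_id % 2].sk_id: different timelines
                       : // deterministically land on different safekeepers, spreading upload load,
                       : // while every safekeeper seeing the same peer info elects the same node.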
     127              : 
      128              : /// Based on peer information, determine which safekeeper should offload; if it
      129              : /// is me, start the per-timeline task if it is not running yet. OTOH, if it is
      130              : /// not me and the task is running, kill it.
     131          253 : async fn update_task(
     132          253 :     conf: &SafeKeeperConf,
     133          253 :     ttid: TenantTimelineId,
     134          253 :     entry: &mut WalBackupTimelineEntry,
     135          253 : ) {
     136          253 :     let alive_peers = entry.timeline.get_peers(conf).await;
     137          253 :     let wal_backup_lsn = entry.timeline.get_wal_backup_lsn().await;
     138          253 :     let (offloader, election_dbg_str) =
     139          253 :         determine_offloader(&alive_peers, wal_backup_lsn, ttid, conf);
     140          253 :     let elected_me = Some(conf.my_id) == offloader;
     141          253 : 
     142          253 :     if elected_me != (entry.handle.is_some()) {
     143           11 :         if elected_me {
     144            9 :             info!("elected for backup: {}", election_dbg_str);
     145              : 
     146            9 :             let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
     147            9 :             let timeline_dir = conf.timeline_dir(&ttid);
     148            9 : 
     149            9 :             let handle = tokio::spawn(
     150            9 :                 backup_task_main(
     151            9 :                     ttid,
     152            9 :                     timeline_dir,
     153            9 :                     conf.workdir.clone(),
     154            9 :                     conf.backup_parallel_jobs,
     155            9 :                     shutdown_rx,
     156            9 :                 )
     157            9 :                 .in_current_span(),
     158            9 :             );
     159            9 : 
     160            9 :             entry.handle = Some(WalBackupTaskHandle {
     161            9 :                 shutdown_tx,
     162            9 :                 handle,
     163            9 :             });
     164              :         } else {
     165            2 :             info!("stepping down from backup: {}", election_dbg_str);
     166            2 :             shut_down_task(ttid, entry).await;
     167              :         }
     168          242 :     }
     169          253 : }
     170              : 
     171              : static REMOTE_STORAGE: OnceCell<Option<GenericRemoteStorage>> = OnceCell::new();
     172              : 
     173              : // Storage must be configured and initialized when this is called.
     174           15 : fn get_configured_remote_storage() -> &'static GenericRemoteStorage {
     175           15 :     REMOTE_STORAGE
     176           15 :         .get()
     177           15 :         .expect("failed to get remote storage")
     178           15 :         .as_ref()
     179           15 :         .unwrap()
     180           15 : }
     181              : 
     182              : const CHECK_TASKS_INTERVAL_MSEC: u64 = 1000;
     183              : 
      184              : /// Sits on wal_backup_launcher_rx and starts/stops per-timeline WAL backup
      185              : /// tasks. Having this in a separate task simplifies locking, allows reaping
      186              : /// panics, and separates elections from offloading itself.
     187          510 : pub async fn wal_backup_launcher_task_main(
     188          510 :     conf: SafeKeeperConf,
     189          510 :     mut wal_backup_launcher_rx: Receiver<TenantTimelineId>,
     190          510 : ) -> anyhow::Result<()> {
     191          510 :     info!(
     192          510 :         "WAL backup launcher started, remote config {:?}",
     193          510 :         conf.remote_storage
     194          510 :     );
     195              : 
     196          510 :     let conf_ = conf.clone();
     197          510 :     REMOTE_STORAGE.get_or_init(|| {
     198          510 :         conf_
     199          510 :             .remote_storage
     200          510 :             .as_ref()
     201          510 :             .map(|c| GenericRemoteStorage::from_config(c).expect("failed to create remote storage"))
     202          510 :     });
     203          510 : 
      204          510 :     // Presence in this map means the launcher is aware s3 offloading is needed
      205          510 :     // for the timeline, but the task is started only if it makes sense to
      206          510 :     // offload from this safekeeper.
     207          510 :     let mut tasks: HashMap<TenantTimelineId, WalBackupTimelineEntry> = HashMap::new();
     208          510 : 
     209          510 :     let mut ticker = tokio::time::interval(Duration::from_millis(CHECK_TASKS_INTERVAL_MSEC));
     210        21847 :     loop {
     211        41712 :         tokio::select! {
     212        12800 :             ttid = wal_backup_launcher_rx.recv() => {
     213              :                 // channel is never expected to get closed
     214              :                 let ttid = ttid.unwrap();
     215              :                 if !conf.is_wal_backup_enabled() {
     216              :                     continue; /* just drain the channel and do nothing */
     217              :                 }
     218          152 :                 async {
     219          152 :                     let timeline = is_wal_backup_required(ttid).await;
     220              :                     // do we need to do anything at all?
     221          152 :                     if timeline.is_some() != tasks.contains_key(&ttid) {
     222           25 :                         if let Some(timeline) = timeline {
     223              :                             // need to start the task
     224           25 :                             let entry = tasks.entry(ttid).or_insert(WalBackupTimelineEntry {
     225           25 :                                 timeline,
     226           25 :                                 handle: None,
     227           25 :                             });
     228           25 :                             update_task(&conf, ttid, entry).await;
     229              :                         } else {
     230              :                             // need to stop the task
     231            0 :                             info!("stopping WAL backup task");
     232            0 :                             let mut entry = tasks.remove(&ttid).unwrap();
     233            0 :                             shut_down_task(ttid, &mut entry).await;
     234              :                         }
     235          127 :                     }
     236          152 :                 }.instrument(info_span!("WAL backup", ttid = %ttid)).await;
     237              :             }
     238              :             // For each timeline needing offloading, check if this safekeeper
     239              :             // should do the job and start/stop the task accordingly.
     240              :             _ = ticker.tick() => {
     241              :                 for (ttid, entry) in tasks.iter_mut() {
     242              :                     update_task(&conf, *ttid, entry)
     243              :                         .instrument(info_span!("WAL backup", ttid = %ttid))
     244              :                         .await;
     245              :                 }
     246              :             }
     247        21847 :         }
     248        21847 :     }
     249              : }
     250              : 
     251              : struct WalBackupTask {
     252              :     timeline: Arc<Timeline>,
     253              :     timeline_dir: Utf8PathBuf,
     254              :     workspace_dir: Utf8PathBuf,
     255              :     wal_seg_size: usize,
     256              :     parallel_jobs: usize,
     257              :     commit_lsn_watch_rx: watch::Receiver<Lsn>,
     258              : }
     259              : 
     260              : /// Offload single timeline.
     261            9 : async fn backup_task_main(
     262            9 :     ttid: TenantTimelineId,
     263            9 :     timeline_dir: Utf8PathBuf,
     264            9 :     workspace_dir: Utf8PathBuf,
     265            9 :     parallel_jobs: usize,
     266            9 :     mut shutdown_rx: Receiver<()>,
     267            9 : ) {
     268            9 :     info!("started");
     269            9 :     let res = GlobalTimelines::get(ttid);
     270            9 :     if let Err(e) = res {
     271            0 :         error!("backup error: {}", e);
     272            0 :         return;
     273            9 :     }
     274            9 :     let tli = res.unwrap();
     275              : 
     276            9 :     let mut wb = WalBackupTask {
     277            9 :         wal_seg_size: tli.get_wal_seg_size().await,
     278            9 :         commit_lsn_watch_rx: tli.get_commit_lsn_watch_rx(),
     279            9 :         timeline: tli,
     280            9 :         timeline_dir,
     281            9 :         workspace_dir,
     282            9 :         parallel_jobs,
     283            9 :     };
     284            9 : 
      285            9 :     // task is spun up only when wal_seg_size is already initialized
     286            9 :     assert!(wb.wal_seg_size > 0);
     287              : 
     288            9 :     let mut canceled = false;
     289         3327 :     select! {
     290         3327 :         _ = wb.run() => {}
     291         3327 :         _ = shutdown_rx.recv() => {
     292         3327 :             canceled = true;
     293         3327 :         }
     294         3327 :     }
     295            2 :     info!("task {}", if canceled { "canceled" } else { "terminated" });
     296            2 : }
     297              : 
     298              : impl WalBackupTask {
     299            9 :     async fn run(&mut self) {
     300            9 :         let mut backup_lsn = Lsn(0);
     301            9 : 
     302            9 :         let mut retry_attempt = 0u32;
     303              :         // offload loop
     304         1200 :         loop {
     305         1200 :             if retry_attempt == 0 {
     306              :                 // wait for new WAL to arrive
     307         1227 :                 if let Err(e) = self.commit_lsn_watch_rx.changed().await {
     308              :                     // should never happen, as we hold Arc to timeline.
     309            0 :                     error!("commit_lsn watch shut down: {:?}", e);
     310            0 :                     return;
     311         1191 :                 }
     312              :             } else {
     313              :                 // or just sleep if we errored previously
     314            0 :                 let mut retry_delay = UPLOAD_FAILURE_RETRY_MAX_MS;
     315            0 :                 if let Some(backoff_delay) = UPLOAD_FAILURE_RETRY_MIN_MS.checked_shl(retry_attempt)
     316            0 :                 {
     317            0 :                     retry_delay = min(retry_delay, backoff_delay);
     318            0 :                 }
     319            0 :                 sleep(Duration::from_millis(retry_delay)).await;
     320              :             }
     321              : 
     322         1191 :             let commit_lsn = *self.commit_lsn_watch_rx.borrow();
     323         1191 : 
     324         1191 :             // Note that backup_lsn can be higher than commit_lsn if we
     325         1191 :             // don't have much local WAL and others already uploaded
     326         1191 :             // segments we don't even have.
     327         1191 :             if backup_lsn.segment_number(self.wal_seg_size)
     328         1191 :                 >= commit_lsn.segment_number(self.wal_seg_size)
     329              :             {
     330         1175 :                 retry_attempt = 0;
     331         1175 :                 continue; /* nothing to do, common case as we wake up on every commit_lsn bump */
     332           16 :             }
     333           16 :             // Perhaps peers advanced the position, check shmem value.
     334           16 :             backup_lsn = self.timeline.get_wal_backup_lsn().await;
     335           16 :             if backup_lsn.segment_number(self.wal_seg_size)
     336           16 :                 >= commit_lsn.segment_number(self.wal_seg_size)
     337              :             {
     338            4 :                 retry_attempt = 0;
     339            4 :                 continue;
     340           12 :             }
     341           12 : 
     342           12 :             match backup_lsn_range(
     343           12 :                 &self.timeline,
     344           12 :                 &mut backup_lsn,
     345           12 :                 commit_lsn,
     346           12 :                 self.wal_seg_size,
     347           12 :                 &self.timeline_dir,
     348           12 :                 &self.workspace_dir,
     349           12 :                 self.parallel_jobs,
     350           12 :             )
     351         2087 :             .await
     352              :             {
     353           12 :                 Ok(()) => {
     354           12 :                     retry_attempt = 0;
     355           12 :                 }
     356            0 :                 Err(e) => {
     357            0 :                     error!(
     358            0 :                         "failed while offloading range {}-{}: {:?}",
     359            0 :                         backup_lsn, commit_lsn, e
     360            0 :                     );
     361              : 
     362            0 :                     retry_attempt = retry_attempt.saturating_add(1);
     363              :                 }
     364              :             }
     365              :         }
     366            0 :     }
     367              : }
     368              : 
     369           12 : async fn backup_lsn_range(
     370           12 :     timeline: &Arc<Timeline>,
     371           12 :     backup_lsn: &mut Lsn,
     372           12 :     end_lsn: Lsn,
     373           12 :     wal_seg_size: usize,
     374           12 :     timeline_dir: &Utf8Path,
     375           12 :     workspace_dir: &Utf8Path,
     376           12 :     parallel_jobs: usize,
     377           12 : ) -> Result<()> {
     378           12 :     if parallel_jobs < 1 {
     379            0 :         anyhow::bail!("parallel_jobs must be >= 1");
     380           12 :     }
     381           12 : 
     382           12 :     let start_lsn = *backup_lsn;
     383           12 :     let segments = get_segments(start_lsn, end_lsn, wal_seg_size);
     384           12 : 
     385           12 :     // Pool of concurrent upload tasks. We use `FuturesOrdered` to
     386           12 :     // preserve order of uploads, and update `backup_lsn` only after
     387           12 :     // all previous uploads are finished.
     388           12 :     let mut uploads = FuturesOrdered::new();
     389           12 :     let mut iter = segments.iter();
     390              : 
     391              :     loop {
     392           36 :         let added_task = match iter.next() {
     393           12 :             Some(s) => {
     394           12 :                 uploads.push_back(backup_single_segment(s, timeline_dir, workspace_dir));
     395           12 :                 true
     396              :             }
     397           24 :             None => false,
     398              :         };
     399              : 
     400              :         // Wait for the next segment to upload if we don't have any more segments,
     401              :         // or if we have too many concurrent uploads.
     402           36 :         if !added_task || uploads.len() >= parallel_jobs {
     403         2086 :             let next = uploads.next().await;
     404           24 :             if let Some(res) = next {
     405              :                 // next segment uploaded
     406           12 :                 let segment = res?;
     407           12 :                 let new_backup_lsn = segment.end_lsn;
     408           12 :                 timeline
     409           12 :                     .set_wal_backup_lsn(new_backup_lsn)
     410            1 :                     .await
     411           12 :                     .context("setting wal_backup_lsn")?;
     412           12 :                 *backup_lsn = new_backup_lsn;
     413              :             } else {
     414              :                 // no more segments to upload
     415           12 :                 break;
     416              :             }
     417           12 :         }
     418              :     }
     419              : 
     420           12 :     info!(
     421           12 :         "offloaded segnos {:?} up to {}, previous backup_lsn {}",
     422           12 :         segments.iter().map(|&s| s.seg_no).collect::<Vec<_>>(),
     423           12 :         end_lsn,
     424           12 :         start_lsn,
     425           12 :     );
     426           12 :     Ok(())
     427           12 : }
     428              : 
     429           12 : async fn backup_single_segment(
     430           12 :     seg: &Segment,
     431           12 :     timeline_dir: &Utf8Path,
     432           12 :     workspace_dir: &Utf8Path,
     433           12 : ) -> Result<Segment> {
     434           12 :     let segment_file_path = seg.file_path(timeline_dir)?;
     435           12 :     let remote_segment_path = segment_file_path
     436           12 :         .strip_prefix(workspace_dir)
     437           12 :         .context("Failed to strip workspace dir prefix")
     438           12 :         .and_then(RemotePath::new)
     439           12 :         .with_context(|| {
     440            0 :             format!(
     441            0 :                 "Failed to resolve remote part of path {segment_file_path:?} for base {workspace_dir:?}",
     442            0 :             )
     443           12 :         })?;
     444              : 
     445         1642 :     let res = backup_object(&segment_file_path, &remote_segment_path, seg.size()).await;
     446           12 :     if res.is_ok() {
     447           12 :         BACKED_UP_SEGMENTS.inc();
     448           12 :     } else {
     449            0 :         BACKUP_ERRORS.inc();
     450            0 :     }
     451           12 :     res?;
     452            0 :     debug!("Backup of {} done", segment_file_path);
     453              : 
     454           12 :     Ok(*seg)
     455           12 : }
     456              : 
     457            0 : #[derive(Debug, Copy, Clone)]
     458              : pub struct Segment {
     459              :     seg_no: XLogSegNo,
     460              :     start_lsn: Lsn,
     461              :     end_lsn: Lsn,
     462              : }
     463              : 
     464              : impl Segment {
     465           12 :     pub fn new(seg_no: u64, start_lsn: Lsn, end_lsn: Lsn) -> Self {
     466           12 :         Self {
     467           12 :             seg_no,
     468           12 :             start_lsn,
     469           12 :             end_lsn,
     470           12 :         }
     471           12 :     }
     472              : 
     473           12 :     pub fn object_name(self) -> String {
     474           12 :         XLogFileName(PG_TLI, self.seg_no, self.size())
     475           12 :     }
     476              : 
     477           12 :     pub fn file_path(self, timeline_dir: &Utf8Path) -> Result<Utf8PathBuf> {
     478           12 :         Ok(timeline_dir.join(self.object_name()))
     479           12 :     }
     480              : 
     481           24 :     pub fn size(self) -> usize {
     482           24 :         (u64::from(self.end_lsn) - u64::from(self.start_lsn)) as usize
     483           24 :     }
     484              : }
     485              : 
     486           12 : fn get_segments(start: Lsn, end: Lsn, seg_size: usize) -> Vec<Segment> {
     487           12 :     let first_seg = start.segment_number(seg_size);
     488           12 :     let last_seg = end.segment_number(seg_size);
     489           12 : 
     490           12 :     let res: Vec<Segment> = (first_seg..last_seg)
     491           12 :         .map(|s| {
     492           12 :             let start_lsn = XLogSegNoOffsetToRecPtr(s, 0, seg_size);
     493           12 :             let end_lsn = XLogSegNoOffsetToRecPtr(s + 1, 0, seg_size);
     494           12 :             Segment::new(s, Lsn::from(start_lsn), Lsn::from(end_lsn))
     495           12 :         })
     496           12 :         .collect();
     497           12 :     res
     498           12 : }
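                       : // [Editorial sketch, not part of the covered source; numbers are made up.]
                       : // With a 16 MiB wal_seg_size, start = 0/1000000 and end = 0/3000100 give
                       : // first_seg = 1 and last_seg = 3, so the result holds segnos 1 and 2 with
                       : // boundaries 0/1000000..0/2000000 and 0/2000000..0/3000000; the partially
                       : // filled segment 3 containing `end` is not scheduled for upload yet.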
     499              : 
     500           12 : async fn backup_object(
     501           12 :     source_file: &Utf8Path,
     502           12 :     target_file: &RemotePath,
     503           12 :     size: usize,
     504           12 : ) -> Result<()> {
     505           12 :     let storage = get_configured_remote_storage();
     506              : 
     507           12 :     let file = File::open(&source_file)
     508           12 :         .await
     509           12 :         .with_context(|| format!("Failed to open file {source_file:?} for wal backup"))?;
     510              : 
     511           12 :     let file = tokio_util::io::ReaderStream::with_capacity(file, BUFFER_SIZE);
     512           12 : 
     513         1630 :     storage.upload_storage_object(file, size, target_file).await
     514           12 : }
     515              : 
     516           38 : pub async fn read_object(
     517           38 :     file_path: &RemotePath,
     518           38 :     offset: u64,
     519           38 : ) -> anyhow::Result<Pin<Box<dyn tokio::io::AsyncRead + Send + Sync>>> {
     520           38 :     let storage = REMOTE_STORAGE
     521           38 :         .get()
     522           38 :         .context("Failed to get remote storage")?
     523           38 :         .as_ref()
     524           38 :         .context("No remote storage configured")?;
     525              : 
     526           38 :     info!("segment download about to start from remote path {file_path:?} at offset {offset}");
     527              : 
     528           38 :     let download = storage
     529           38 :         .download_storage_object(Some((offset, None)), file_path)
     530          111 :         .await
     531           38 :         .with_context(|| {
     532            0 :             format!("Failed to open WAL segment download stream for remote path {file_path:?}")
     533           38 :         })?;
     534              : 
     535           38 :     let reader = tokio_util::io::StreamReader::new(download.download_stream);
     536           38 : 
     537           38 :     let reader = tokio::io::BufReader::with_capacity(BUFFER_SIZE, reader);
     538           38 : 
     539           38 :     Ok(Box::pin(reader))
     540           38 : }
     541              : 
     542              : /// Delete WAL files for the given timeline. Remote storage must be configured
     543              : /// when called.
     544            3 : pub async fn delete_timeline(ttid: &TenantTimelineId) -> Result<()> {
     545            3 :     let storage = get_configured_remote_storage();
     546            3 :     let ttid_path = Utf8Path::new(&ttid.tenant_id.to_string()).join(ttid.timeline_id.to_string());
     547            3 :     let remote_path = RemotePath::new(&ttid_path)?;
     548              : 
     549              :     // A backoff::retry is used here for two reasons:
     550              :     // - To provide a backoff rather than busy-polling the API on errors
     551              :     // - To absorb transient 429/503 conditions without hitting our error
     552              :     //   logging path for issues deleting objects.
     553              :     //
     554              :     // Note: listing segments might take a long time if there are many of them.
      555              :     // We don't currently have HTTP request timeout/cancellation, but if/once
      556              :     // we do, listing should get a streaming interface to make progress.
     557            3 :     let token = CancellationToken::new(); // not really used
     558            3 :     backoff::retry(
     559            3 :         || async {
     560           16 :             let files = storage.list_files(Some(&remote_path)).await?;
     561            3 :             storage.delete_objects(&files).await
     562            3 :         },
     563            3 :         |_| false,
     564            3 :         3,
     565            3 :         10,
     566            3 :         "executing WAL segments deletion batch",
     567            3 :         &token,
     568            3 :     )
     569           18 :     .await
     570            3 :     .ok_or_else(|| anyhow::anyhow!("canceled"))
     571            3 :     .and_then(|x| x)?;
     572              : 
     573            3 :     Ok(())
     574            3 : }
     575              : 
     576              : /// Copy segments from one timeline to another. Used in copy_timeline.
     577           48 : pub async fn copy_s3_segments(
     578           48 :     wal_seg_size: usize,
     579           48 :     src_ttid: &TenantTimelineId,
     580           48 :     dst_ttid: &TenantTimelineId,
     581           48 :     from_segment: XLogSegNo,
     582           48 :     to_segment: XLogSegNo,
     583           48 : ) -> Result<()> {
     584           48 :     const SEGMENTS_PROGRESS_REPORT_INTERVAL: u64 = 1024;
     585           48 : 
     586           48 :     let storage = REMOTE_STORAGE
     587           48 :         .get()
     588           48 :         .expect("failed to get remote storage")
     589           48 :         .as_ref()
     590           48 :         .unwrap();
     591           48 : 
     592           48 :     let relative_dst_path =
     593           48 :         Utf8Path::new(&dst_ttid.tenant_id.to_string()).join(dst_ttid.timeline_id.to_string());
     594              : 
     595           48 :     let remote_path = RemotePath::new(&relative_dst_path)?;
     596              : 
     597          191 :     let files = storage.list_files(Some(&remote_path)).await?;
     598           48 :     let uploaded_segments = &files
     599           48 :         .iter()
     600           48 :         .filter_map(|file| file.object_name().map(ToOwned::to_owned))
     601           48 :         .collect::<HashSet<_>>();
     602              : 
     603            0 :     debug!(
     604            0 :         "these segments have already been uploaded: {:?}",
     605            0 :         uploaded_segments
     606            0 :     );
     607              : 
     608           48 :     let relative_src_path =
     609           48 :         Utf8Path::new(&src_ttid.tenant_id.to_string()).join(src_ttid.timeline_id.to_string());
     610              : 
     611           48 :     for segno in from_segment..to_segment {
     612           36 :         if segno % SEGMENTS_PROGRESS_REPORT_INTERVAL == 0 {
     613            0 :             info!("copied all segments from {} until {}", from_segment, segno);
     614           36 :         }
     615              : 
     616           36 :         let segment_name = XLogFileName(PG_TLI, segno, wal_seg_size);
     617           36 :         if uploaded_segments.contains(&segment_name) {
     618           24 :             continue;
     619           12 :         }
     620            0 :         debug!("copying segment {}", segment_name);
     621              : 
     622           12 :         let from = RemotePath::new(&relative_src_path.join(&segment_name))?;
     623           12 :         let to = RemotePath::new(&relative_dst_path.join(&segment_name))?;
     624              : 
     625           48 :         storage.copy_object(&from, &to).await?;
     626              :     }
     627              : 
     628           48 :     info!(
     629           48 :         "finished copying segments from {} until {}",
     630           48 :         from_segment, to_segment
     631           48 :     );
     632           48 :     Ok(())
     633           48 : }
        

Generated by: LCOV version 2.1-beta