LCOV - differential code coverage report
Current view:  top level - safekeeper/src - wal_backup.rs (source / functions)
Current:       cd44433dd675caa99df17a61b18949c8387e2242.info (2024-01-09 02:06:09)
Baseline:      66c52a629a0f4a503e193045e0df4c77139e344b.info (2024-01-08 15:34:46)

               Coverage    Total    Hit    LBC    UBC    CBC
Lines:           90.7 %      408    370     14     24    370
Functions:       76.9 %       65     50      1     14     50

           TLA  Line data    Source code
       1                 : use anyhow::{Context, Result};
       2                 : 
       3                 : use camino::{Utf8Path, Utf8PathBuf};
       4                 : use futures::stream::FuturesOrdered;
       5                 : use futures::StreamExt;
       6                 : use tokio::task::JoinHandle;
       7                 : use utils::id::NodeId;
       8                 : 
       9                 : use std::cmp::min;
      10                 : use std::collections::{HashMap, HashSet};
      11                 : use std::pin::Pin;
      12                 : use std::sync::Arc;
      13                 : use std::time::Duration;
      14                 : 
      15                 : use postgres_ffi::v14::xlog_utils::XLogSegNoOffsetToRecPtr;
      16                 : use postgres_ffi::XLogFileName;
      17                 : use postgres_ffi::{XLogSegNo, PG_TLI};
      18                 : use remote_storage::{GenericRemoteStorage, RemotePath};
      19                 : use tokio::fs::File;
      20                 : 
      21                 : use tokio::select;
      22                 : use tokio::sync::mpsc::{self, Receiver, Sender};
      23                 : use tokio::sync::watch;
      24                 : use tokio::time::sleep;
      25                 : use tracing::*;
      26                 : 
      27                 : use utils::{id::TenantTimelineId, lsn::Lsn};
      28                 : 
      29                 : use crate::metrics::{BACKED_UP_SEGMENTS, BACKUP_ERRORS};
      30                 : use crate::timeline::{PeerInfo, Timeline};
      31                 : use crate::{GlobalTimelines, SafeKeeperConf};
      32                 : 
      33                 : use once_cell::sync::OnceCell;
      34                 : 
      35                 : const UPLOAD_FAILURE_RETRY_MIN_MS: u64 = 10;
      36                 : const UPLOAD_FAILURE_RETRY_MAX_MS: u64 = 5000;
      37                 : 
      38                 : /// Default buffer size when interfacing with [`tokio::fs::File`].
      39                 : const BUFFER_SIZE: usize = 32 * 1024;
      40                 : 
      41                 : /// Check whether WAL backup is required for the timeline. If yes, mark that the
      42                 : /// launcher is aware of the current status and return the timeline.
      43 CBC         146 : async fn is_wal_backup_required(ttid: TenantTimelineId) -> Option<Arc<Timeline>> {
      44             146 :     match GlobalTimelines::get(ttid).ok() {
      45             146 :         Some(tli) => {
      46             146 :             tli.wal_backup_attend().await;
      47             146 :             Some(tli)
      48                 :         }
      49 UBC           0 :         None => None,
      50                 :     }
      51 CBC         146 : }
      52                 : 
      53                 : struct WalBackupTaskHandle {
      54                 :     shutdown_tx: Sender<()>,
      55                 :     handle: JoinHandle<()>,
      56                 : }
      57                 : 
      58                 : struct WalBackupTimelineEntry {
      59                 :     timeline: Arc<Timeline>,
      60                 :     handle: Option<WalBackupTaskHandle>,
      61                 : }
      62                 : 
      63               3 : async fn shut_down_task(ttid: TenantTimelineId, entry: &mut WalBackupTimelineEntry) {
      64               3 :     if let Some(wb_handle) = entry.handle.take() {
      65                 :         // Tell the task to shut down. An error means the task exited earlier; that's ok.
      66               3 :         let _ = wb_handle.shutdown_tx.send(()).await;
      67                 :         // Await the task itself. TODO: restart panicked tasks earlier.
      68               3 :         if let Err(e) = wb_handle.handle.await {
      69 UBC           0 :             warn!("WAL backup task for {} panicked: {}", ttid, e);
      70 CBC           3 :         }
      71 UBC           0 :     }
      72 CBC           3 : }
      73                 : 
      74                 : /// The goal is to ensure that normally only one safekeeper offloads. However,
      75                 : /// it is fine (and inevitable, as s3 doesn't provide CAS) that for some short
      76                 : /// time we have several of them as they PUT the same files. Also,
      77                 : /// - frequently changing the offloader would be bad;
      78                 : /// - electing a seriously lagging safekeeper is undesirable;
      79                 : /// So we deterministically choose among the reasonably caught-up candidates.
      80                 : /// TODO: take into account failed attempts to deal with the hypothetical
      81                 : /// situation where s3 is unreachable only for some sks.
      82             291 : fn determine_offloader(
      83             291 :     alive_peers: &[PeerInfo],
      84             291 :     wal_backup_lsn: Lsn,
      85             291 :     ttid: TenantTimelineId,
      86             291 :     conf: &SafeKeeperConf,
      87             291 : ) -> (Option<NodeId>, String) {
      88             291 :     // TODO: remove this once we fill newly joined safekeepers since backup_lsn.
      89             291 :     let capable_peers = alive_peers
      90             291 :         .iter()
      91            2256 :         .filter(|p| p.local_start_lsn <= wal_backup_lsn);
      92             752 :     match capable_peers.clone().map(|p| p.commit_lsn).max() {
      93              28 :         None => (None, "no connected peers to elect from".to_string()),
      94             263 :         Some(max_commit_lsn) => {
      95             263 :             let threshold = max_commit_lsn
      96             263 :                 .checked_sub(conf.max_offloader_lag_bytes)
      97             263 :                 .unwrap_or(Lsn(0));
      98             263 :             let mut caughtup_peers = capable_peers
      99             263 :                 .clone()
     100             752 :                 .filter(|p| p.commit_lsn >= threshold)
     101             263 :                 .collect::<Vec<_>>();
     102             498 :             caughtup_peers.sort_by(|p1, p2| p1.sk_id.cmp(&p2.sk_id));
     103             263 : 
     104             263 :             // To distribute the load, shift by timeline_id.
     105             263 :             let offloader = caughtup_peers
     106             263 :                 [(u128::from(ttid.timeline_id) % caughtup_peers.len() as u128) as usize]
     107             263 :                 .sk_id;
     108             263 : 
     109             263 :             let mut capable_peers_dbg = capable_peers
     110             752 :                 .map(|p| (p.sk_id, p.commit_lsn))
     111             263 :                 .collect::<Vec<_>>();
     112             498 :             capable_peers_dbg.sort_by(|p1, p2| p1.0.cmp(&p2.0));
     113             263 :             (
     114             263 :                 Some(offloader),
     115             263 :                 format!(
     116             263 :                     "elected {} among {:?} peers, with {} of them being caughtup",
     117             263 :                     offloader,
     118             263 :                     capable_peers_dbg,
     119             263 :                     caughtup_peers.len()
     120             263 :                 ),
     121             263 :             )
     122                 :         }
     123                 :     }
     124             291 : }
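
The rule above is deterministic: capable peers (local_start_lsn <= wal_backup_lsn) are narrowed to those within max_offloader_lag_bytes of the highest commit_lsn, sorted by safekeeper id, and indexed by timeline_id modulo the candidate count, so different timelines elect different offloaders. A minimal standalone sketch of the same selection, using bare u64 ids and LSNs in place of NodeId and Lsn (all names here are illustrative):

    // Deterministic offloader election among caught-up peers, shifted by
    // timeline_id so offloader duty is spread across timelines.
    fn elect_offloader(peers: &[(u64, u64)], timeline_id: u128, max_lag: u64) -> Option<u64> {
        // peers: (sk_id, commit_lsn) pairs that are already "capable"
        let max_commit = peers.iter().map(|&(_, commit)| commit).max()?;
        let threshold = max_commit.saturating_sub(max_lag);
        let mut caughtup: Vec<_> = peers.iter().filter(|&&(_, c)| c >= threshold).collect();
        caughtup.sort_by_key(|&&(id, _)| id);
        // caughtup is non-empty: the peer holding max_commit always passes.
        Some(caughtup[(timeline_id % caughtup.len() as u128) as usize].0)
    }
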
     125                 : 
     126                 : /// Based on peer information, determine which safekeeper should offload; if it
     127                 : /// is me, start the per-timeline task if it is not running yet. OTOH, if it is
     128                 : /// not me and the task is running, kill it.
     129             291 : async fn update_task(
     130             291 :     conf: &SafeKeeperConf,
     131             291 :     ttid: TenantTimelineId,
     132             291 :     entry: &mut WalBackupTimelineEntry,
     133             291 : ) {
     134             291 :     let alive_peers = entry.timeline.get_peers(conf).await;
     135             291 :     let wal_backup_lsn = entry.timeline.get_wal_backup_lsn().await;
     136             291 :     let (offloader, election_dbg_str) =
     137             291 :         determine_offloader(&alive_peers, wal_backup_lsn, ttid, conf);
     138             291 :     let elected_me = Some(conf.my_id) == offloader;
     139             291 : 
     140             291 :     if elected_me != (entry.handle.is_some()) {
     141              15 :         if elected_me {
     142              12 :             info!("elected for backup: {}", election_dbg_str);
     143                 : 
     144              12 :             let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
     145              12 :             let timeline_dir = conf.timeline_dir(&ttid);
     146              12 : 
     147              12 :             let handle = tokio::spawn(
     148              12 :                 backup_task_main(
     149              12 :                     ttid,
     150              12 :                     timeline_dir,
     151              12 :                     conf.workdir.clone(),
     152              12 :                     conf.backup_parallel_jobs,
     153              12 :                     shutdown_rx,
     154              12 :                 )
     155              12 :                 .in_current_span(),
     156              12 :             );
     157              12 : 
     158              12 :             entry.handle = Some(WalBackupTaskHandle {
     159              12 :                 shutdown_tx,
     160              12 :                 handle,
     161              12 :             });
     162                 :         } else {
     163               3 :             info!("stepping down from backup: {}", election_dbg_str);
     164               3 :             shut_down_task(ttid, entry).await;
     165                 :         }
     166             276 :     }
     167             291 : }
     168                 : 
     169                 : const CHECK_TASKS_INTERVAL_MSEC: u64 = 1000;
     170                 : 
     171                 : /// Sits on wal_backup_launcher_rx and starts/stops per-timeline WAL backup
     172                 : /// tasks. Having this in a separate task simplifies locking, allows reaping
     173                 : /// panics, and separates elections from offloading itself.
     174             485 : pub async fn wal_backup_launcher_task_main(
     175             485 :     conf: SafeKeeperConf,
     176             485 :     mut wal_backup_launcher_rx: Receiver<TenantTimelineId>,
     177             485 : ) -> anyhow::Result<()> {
     178             485 :     info!(
     179             485 :         "WAL backup launcher started, remote config {:?}",
     180             485 :         conf.remote_storage
     181             485 :     );
     182                 : 
     183             485 :     let conf_ = conf.clone();
     184             485 :     REMOTE_STORAGE.get_or_init(|| {
     185             485 :         conf_
     186             485 :             .remote_storage
     187             485 :             .as_ref()
     188             485 :             .map(|c| GenericRemoteStorage::from_config(c).expect("failed to create remote storage"))
     189             485 :     });
     190             485 : 
     191             485 :     // Presence in this map means the launcher is aware that s3 offloading is
     192             485 :     // needed for the timeline, but the task is started only if it makes sense
     193             485 :     // to offload from this safekeeper.
     194             485 :     let mut tasks: HashMap<TenantTimelineId, WalBackupTimelineEntry> = HashMap::new();
     195             485 : 
     196             485 :     let mut ticker = tokio::time::interval(Duration::from_millis(CHECK_TASKS_INTERVAL_MSEC));
     197           23160 :     loop {
     198           44779 :         tokio::select! {
     199           13408 :             ttid = wal_backup_launcher_rx.recv() => {
     200                 :                 // channel is never expected to get closed
     201                 :                 let ttid = ttid.unwrap();
     202                 :                 if conf.remote_storage.is_none() || !conf.wal_backup_enabled {
     203                 :                     continue; /* just drain the channel and do nothing */
     204                 :                 }
     205             146 :                 async {
     206             146 :                     let timeline = is_wal_backup_required(ttid).await;
     207                 :                     // do we need to do anything at all?
     208             146 :                     if timeline.is_some() != tasks.contains_key(&ttid) {
     209              24 :                         if let Some(timeline) = timeline {
     210                 :                             // need to start the task
     211              24 :                             let entry = tasks.entry(ttid).or_insert(WalBackupTimelineEntry {
     212              24 :                                 timeline,
     213              24 :                                 handle: None,
     214              24 :                             });
     215              24 :                             update_task(&conf, ttid, entry).await;
     216                 :                         } else {
     217                 :                             // need to stop the task
     218 UBC           0 :                             info!("stopping WAL backup task");
     219               0 :                             let mut entry = tasks.remove(&ttid).unwrap();
     220               0 :                             shut_down_task(ttid, &mut entry).await;
     221                 :                         }
     222 CBC         122 :                     }
     223             146 :                 }.instrument(info_span!("WAL backup", ttid = %ttid)).await;
     224                 :             }
     225                 :             // For each timeline needing offloading, check if this safekeeper
     226                 :             // should do the job and start/stop the task accordingly.
     227                 :             _ = ticker.tick() => {
     228                 :                 for (ttid, entry) in tasks.iter_mut() {
     229                 :                     update_task(&conf, *ttid, entry)
     230                 :                         .instrument(info_span!("WAL backup", ttid = %ttid))
     231                 :                         .await;
     232                 :                 }
     233                 :             }
     234           23160 :         }
     235           23160 :     }
     236                 : }
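
The launcher body above is one task multiplexing two event sources: per-timeline notifications on wal_backup_launcher_rx and a once-a-second re-election tick. A self-contained sketch of that loop shape, with a bare u64 standing in for TenantTimelineId (illustrative only):

    use std::time::Duration;
    use tokio::sync::mpsc;

    // One task, two event sources: notifications and a periodic tick.
    async fn launcher_loop(mut rx: mpsc::Receiver<u64>) {
        let mut ticker = tokio::time::interval(Duration::from_millis(1000));
        loop {
            tokio::select! {
                ttid = rx.recv() => {
                    // channel is never expected to get closed
                    let ttid = ttid.expect("launcher channel closed");
                    // (re)evaluate whether a backup task should run for `ttid`
                    println!("notified for timeline {ttid}");
                }
                _ = ticker.tick() => {
                    // periodically re-run elections for all known timelines
                }
            }
        }
    }
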
     237                 : 
     238                 : struct WalBackupTask {
     239                 :     timeline: Arc<Timeline>,
     240                 :     timeline_dir: Utf8PathBuf,
     241                 :     workspace_dir: Utf8PathBuf,
     242                 :     wal_seg_size: usize,
     243                 :     parallel_jobs: usize,
     244                 :     commit_lsn_watch_rx: watch::Receiver<Lsn>,
     245                 : }
     246                 : 
     247                 : /// Offload a single timeline.
     248              12 : async fn backup_task_main(
     249              12 :     ttid: TenantTimelineId,
     250              12 :     timeline_dir: Utf8PathBuf,
     251              12 :     workspace_dir: Utf8PathBuf,
     252              12 :     parallel_jobs: usize,
     253              12 :     mut shutdown_rx: Receiver<()>,
     254              12 : ) {
     255              12 :     info!("started");
     256              12 :     let res = GlobalTimelines::get(ttid);
     257              12 :     if let Err(e) = res {
     258 UBC           0 :         error!("backup error: {}", e);
     259               0 :         return;
     260 CBC          12 :     }
     261              12 :     let tli = res.unwrap();
     262                 : 
     263              12 :     let mut wb = WalBackupTask {
     264              12 :         wal_seg_size: tli.get_wal_seg_size().await,
     265              12 :         commit_lsn_watch_rx: tli.get_commit_lsn_watch_rx(),
     266              12 :         timeline: tli,
     267              12 :         timeline_dir,
     268              12 :         workspace_dir,
     269              12 :         parallel_jobs,
     270              12 :     };
     271              12 : 
     272              12 :     // the task is spun up only when wal_seg_size is already initialized
     273              12 :     assert!(wb.wal_seg_size > 0);
     274                 : 
     275              12 :     let mut canceled = false;
     276            6880 :     select! {
     277            6880 :         _ = wb.run() => {}
     278            6880 :         _ = shutdown_rx.recv() => {
     279            6880 :             canceled = true;
     280            6880 :         }
     281            6880 :     }
     282               3 :     info!("task {}", if canceled { "canceled" } else { "terminated" });
     283               3 : }
     284                 : 
     285                 : impl WalBackupTask {
     286              12 :     async fn run(&mut self) {
     287              12 :         let mut backup_lsn = Lsn(0);
     288              12 : 
     289              12 :         let mut retry_attempt = 0u32;
     290                 :         // offload loop
     291            1475 :         loop {
     292            1475 :             if retry_attempt == 0 {
     293                 :                 // wait for new WAL to arrive
     294            1533 :                 if let Err(e) = self.commit_lsn_watch_rx.changed().await {
     295                 :                     // should never happen, as we hold Arc to timeline.
     296 UBC           0 :                     error!("commit_lsn watch shut down: {:?}", e);
     297               0 :                     return;
     298 CBC        1464 :                 }
     299                 :             } else {
     300                 :                 // or just sleep if we errored previously
     301 LBC         (2) :                 let mut retry_delay = UPLOAD_FAILURE_RETRY_MAX_MS;
     302             (2) :                 if let Some(backoff_delay) = UPLOAD_FAILURE_RETRY_MIN_MS.checked_shl(retry_attempt)
     303             (2) :                 {
     304             (2) :                     retry_delay = min(retry_delay, backoff_delay);
     305             (2) :                 }
     306             (2) :                 sleep(Duration::from_millis(retry_delay)).await;
     307                 :             }
     308                 : 
     309 CBC        1464 :             let commit_lsn = *self.commit_lsn_watch_rx.borrow();
     310            1464 : 
     311            1464 :             // Note that backup_lsn can be higher than commit_lsn if we
     312            1464 :             // don't have much local WAL and others already uploaded
     313            1464 :             // segments we don't even have.
     314            1464 :             if backup_lsn.segment_number(self.wal_seg_size)
     315            1464 :                 >= commit_lsn.segment_number(self.wal_seg_size)
     316                 :             {
     317            1443 :                 retry_attempt = 0;
     318            1443 :                 continue; /* nothing to do, common case as we wake up on every commit_lsn bump */
     319              21 :             }
     320              21 :             // Perhaps peers advanced the position, check shmem value.
     321              21 :             backup_lsn = self.timeline.get_wal_backup_lsn().await;
     322              21 :             if backup_lsn.segment_number(self.wal_seg_size)
     323              21 :                 >= commit_lsn.segment_number(self.wal_seg_size)
     324                 :             {
     325               8 :                 retry_attempt = 0;
     326               8 :                 continue;
     327              13 :             }
     328              13 : 
     329              13 :             match backup_lsn_range(
     330              13 :                 &self.timeline,
     331              13 :                 &mut backup_lsn,
     332              13 :                 commit_lsn,
     333              13 :                 self.wal_seg_size,
     334              13 :                 &self.timeline_dir,
     335              13 :                 &self.workspace_dir,
     336              13 :                 self.parallel_jobs,
     337              13 :             )
     338            5331 :             .await
     339                 :             {
     340              12 :                 Ok(()) => {
     341              12 :                     retry_attempt = 0;
     342              12 :                 }
     343 LBC         (2) :                 Err(e) => {
     344             (2) :                     error!(
     345             (2) :                         "failed while offloading range {}-{}: {:?}",
     346             (2) :                         backup_lsn, commit_lsn, e
     347             (2) :                     );
     348                 : 
     349             (2) :                     retry_attempt = retry_attempt.saturating_add(1);
     350                 :                 }
     351                 :             }
     352                 :         }
     353 UBC           0 :     }
     354                 : }
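
The retry arm in run() implements a capped exponential backoff: the delay is UPLOAD_FAILURE_RETRY_MIN_MS shifted left by the attempt count, clamped to UPLOAD_FAILURE_RETRY_MAX_MS, with checked_shl falling back to the cap on shift overflow. A small sketch of the same computation as a pure function:

    // delay_ms(n) = min(5000, 10 << n); on shift overflow, stay at the cap.
    fn retry_delay_ms(retry_attempt: u32) -> u64 {
        const MIN_MS: u64 = 10; // UPLOAD_FAILURE_RETRY_MIN_MS
        const MAX_MS: u64 = 5000; // UPLOAD_FAILURE_RETRY_MAX_MS
        MIN_MS.checked_shl(retry_attempt).map_or(MAX_MS, |d| d.min(MAX_MS))
    }

Since retry_attempt is incremented before the next sleep, the first retry waits 20 ms, then 40, 80, and so on until the 5000 ms cap holds from the ninth attempt onward.
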
     355                 : 
     356 CBC          13 : async fn backup_lsn_range(
     357              13 :     timeline: &Arc<Timeline>,
     358              13 :     backup_lsn: &mut Lsn,
     359              13 :     end_lsn: Lsn,
     360              13 :     wal_seg_size: usize,
     361              13 :     timeline_dir: &Utf8Path,
     362              13 :     workspace_dir: &Utf8Path,
     363              13 :     parallel_jobs: usize,
     364              13 : ) -> Result<()> {
     365              13 :     if parallel_jobs < 1 {
     366 UBC           0 :         anyhow::bail!("parallel_jobs must be >= 1");
     367 CBC          13 :     }
     368              13 : 
     369              13 :     let start_lsn = *backup_lsn;
     370              13 :     let segments = get_segments(start_lsn, end_lsn, wal_seg_size);
     371              13 : 
     372              13 :     // Pool of concurrent upload tasks. We use `FuturesOrdered` to
     373              13 :     // preserve order of uploads, and update `backup_lsn` only after
     374              13 :     // all previous uploads are finished.
     375              13 :     let mut uploads = FuturesOrdered::new();
     376              13 :     let mut iter = segments.iter();
     377                 : 
     378                 :     loop {
     379              40 :         let added_task = match iter.next() {
     380              14 :             Some(s) => {
     381              14 :                 uploads.push_back(backup_single_segment(s, timeline_dir, workspace_dir));
     382              14 :                 true
     383                 :             }
     384              26 :             None => false,
     385                 :         };
     386                 : 
     387                 :         // Wait for the next segment to upload if we don't have any more segments,
     388                 :         // or if we have too many concurrent uploads.
     389              40 :         if !added_task || uploads.len() >= parallel_jobs {
     390            5327 :             let next = uploads.next().await;
     391              25 :             if let Some(res) = next {
     392                 :                 // next segment uploaded
     393              13 :                 let segment = res?;
     394              13 :                 let new_backup_lsn = segment.end_lsn;
     395              13 :                 timeline
     396              13 :                     .set_wal_backup_lsn(new_backup_lsn)
     397               4 :                     .await
     398              13 :                     .context("setting wal_backup_lsn")?;
     399              13 :                 *backup_lsn = new_backup_lsn;
     400                 :             } else {
     401                 :                 // no more segments to upload
     402              12 :                 break;
     403                 :             }
     404              14 :         }
     405                 :     }
     406                 : 
     407              12 :     info!(
     408              12 :         "offloaded segnos {:?} up to {}, previous backup_lsn {}",
     409              13 :         segments.iter().map(|&s| s.seg_no).collect::<Vec<_>>(),
     410              12 :         end_lsn,
     411              12 :         start_lsn,
     412              12 :     );
     413              12 :     Ok(())
     414              12 : }
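
backup_lsn_range drives its uploads through a FuturesOrdered pool: at most parallel_jobs futures are in flight, and because completions are yielded in submission order, the backup watermark can be advanced after each finished segment without risking gaps. A self-contained sketch of that pattern, with a trivial async job standing in for a real segment upload:

    use futures::stream::{FuturesOrdered, StreamExt};

    // Bounded, order-preserving pipeline: results arrive in submission
    // order, so a progress watermark can be bumped monotonically.
    async fn run_in_order(jobs: Vec<u64>, parallel_jobs: usize) {
        let mut pool = FuturesOrdered::new();
        let mut iter = jobs.into_iter();
        loop {
            let added = match iter.next() {
                Some(n) => {
                    pool.push_back(async move { n }); // stand-in for an upload
                    true
                }
                None => false,
            };
            // Drain one completion when out of work or at the parallelism cap.
            if !added || pool.len() >= parallel_jobs {
                match pool.next().await {
                    Some(done) => println!("job {done} finished; advance watermark"),
                    None => break, // all jobs drained
                }
            }
        }
    }
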
     415                 : 
     416              14 : async fn backup_single_segment(
     417              14 :     seg: &Segment,
     418              14 :     timeline_dir: &Utf8Path,
     419              14 :     workspace_dir: &Utf8Path,
     420              14 : ) -> Result<Segment> {
     421              14 :     let segment_file_path = seg.file_path(timeline_dir)?;
     422              14 :     let remote_segment_path = segment_file_path
     423              14 :         .strip_prefix(workspace_dir)
     424              14 :         .context("Failed to strip workspace dir prefix")
     425              14 :         .and_then(RemotePath::new)
     426              14 :         .with_context(|| {
     427 UBC           0 :             format!(
     428               0 :                 "Failed to resolve remote part of path {segment_file_path:?} for base {workspace_dir:?}",
     429               0 :             )
     430 CBC          14 :         })?;
     431                 : 
     432            4796 :     let res = backup_object(&segment_file_path, &remote_segment_path, seg.size()).await;
     433              13 :     if res.is_ok() {
     434              13 :         BACKED_UP_SEGMENTS.inc();
     435              13 :     } else {
     436 LBC         (2) :         BACKUP_ERRORS.inc();
     437             (2) :     }
     438 CBC          13 :     res?;
     439 UBC           0 :     debug!("Backup of {} done", segment_file_path);
     440                 : 
     441 CBC          13 :     Ok(*seg)
     442              13 : }
     443                 : 
     444 UBC           0 : #[derive(Debug, Copy, Clone)]
     445                 : pub struct Segment {
     446                 :     seg_no: XLogSegNo,
     447                 :     start_lsn: Lsn,
     448                 :     end_lsn: Lsn,
     449                 : }
     450                 : 
     451                 : impl Segment {
     452 CBC          14 :     pub fn new(seg_no: u64, start_lsn: Lsn, end_lsn: Lsn) -> Self {
     453              14 :         Self {
     454              14 :             seg_no,
     455              14 :             start_lsn,
     456              14 :             end_lsn,
     457              14 :         }
     458              14 :     }
     459                 : 
     460              14 :     pub fn object_name(self) -> String {
     461              14 :         XLogFileName(PG_TLI, self.seg_no, self.size())
     462              14 :     }
     463                 : 
     464              14 :     pub fn file_path(self, timeline_dir: &Utf8Path) -> Result<Utf8PathBuf> {
     465              14 :         Ok(timeline_dir.join(self.object_name()))
     466              14 :     }
     467                 : 
     468              28 :     pub fn size(self) -> usize {
     469              28 :         (u64::from(self.end_lsn) - u64::from(self.start_lsn)) as usize
     470              28 :     }
     471                 : }
     472                 : 
     473              13 : fn get_segments(start: Lsn, end: Lsn, seg_size: usize) -> Vec<Segment> {
     474              13 :     let first_seg = start.segment_number(seg_size);
     475              13 :     let last_seg = end.segment_number(seg_size);
     476              13 : 
     477              13 :     let res: Vec<Segment> = (first_seg..last_seg)
     478              14 :         .map(|s| {
     479              14 :             let start_lsn = XLogSegNoOffsetToRecPtr(s, 0, seg_size);
     480              14 :             let end_lsn = XLogSegNoOffsetToRecPtr(s + 1, 0, seg_size);
     481              14 :             Segment::new(s, Lsn::from(start_lsn), Lsn::from(end_lsn))
     482              14 :         })
     483              13 :         .collect();
     484              13 :     res
     485              13 : }
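
Note that first_seg..last_seg is half-open, so a partially filled final segment is never included. A worked example, assuming Lsn wraps a u64 and the standard 16 MiB segment size:

    // start = Lsn(0x100_0000) lies in segment 1; end = Lsn(0x250_0000) lies in
    // segment 2, so first_seg = 1, last_seg = 2 and exactly one Segment comes
    // back: seg_no 1 spanning [0x100_0000, 0x200_0000). The still-growing
    // segment 2 is picked up by a later pass once it is complete.
    let segs = get_segments(Lsn(0x100_0000), Lsn(0x250_0000), 16 * 1024 * 1024);
    assert_eq!(segs.len(), 1);
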
     486                 : 
     487                 : static REMOTE_STORAGE: OnceCell<Option<GenericRemoteStorage>> = OnceCell::new();
     488                 : 
     489              14 : async fn backup_object(
     490              14 :     source_file: &Utf8Path,
     491              14 :     target_file: &RemotePath,
     492              14 :     size: usize,
     493              14 : ) -> Result<()> {
     494              14 :     let storage = REMOTE_STORAGE
     495              14 :         .get()
     496              14 :         .expect("failed to get remote storage")
     497              14 :         .as_ref()
     498              14 :         .unwrap();
     499                 : 
     500              14 :     let file = File::open(&source_file)
     501              14 :         .await
     502              14 :         .with_context(|| format!("Failed to open file {source_file:?} for wal backup"))?;
     503                 : 
     504              14 :     let file = tokio_util::io::ReaderStream::with_capacity(file, BUFFER_SIZE);
     505              14 : 
     506            4782 :     storage.upload_storage_object(file, size, target_file).await
     507              13 : }
     508                 : 
     509              38 : pub async fn read_object(
     510              38 :     file_path: &RemotePath,
     511              38 :     offset: u64,
     512              38 : ) -> anyhow::Result<Pin<Box<dyn tokio::io::AsyncRead + Send + Sync>>> {
     513              38 :     let storage = REMOTE_STORAGE
     514              38 :         .get()
     515              38 :         .context("Failed to get remote storage")?
     516              38 :         .as_ref()
     517              38 :         .context("No remote storage configured")?;
     518                 : 
     519              38 :     info!("segment download about to start from remote path {file_path:?} at offset {offset}");
     520                 : 
     521              38 :     let download = storage
     522              38 :         .download_storage_object(Some((offset, None)), file_path)
     523             110 :         .await
     524              38 :         .with_context(|| {
     525 UBC           0 :             format!("Failed to open WAL segment download stream for remote path {file_path:?}")
     526 CBC          38 :         })?;
     527                 : 
     528              38 :     let reader = tokio_util::io::StreamReader::new(download.download_stream);
     529              38 : 
     530              38 :     let reader = tokio::io::BufReader::with_capacity(BUFFER_SIZE, reader);
     531              38 : 
     532              38 :     Ok(Box::pin(reader))
     533              38 : }
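
read_object adapts the remote download, a stream of byte chunks, into a buffered AsyncRead in two steps: StreamReader turns the Stream into an AsyncRead, and BufReader adds the 32 KiB buffer on top. A minimal sketch of that adapter chain (the bytes::Bytes item type is an assumption about download_stream):

    use std::pin::Pin;
    use tokio::io::AsyncRead;

    // Stream-of-bytes -> AsyncRead -> buffered AsyncRead, as in read_object.
    fn stream_to_buffered_reader<S>(download_stream: S) -> Pin<Box<dyn AsyncRead + Send + Sync>>
    where
        S: futures::Stream<Item = std::io::Result<bytes::Bytes>> + Send + Sync + 'static,
    {
        let reader = tokio_util::io::StreamReader::new(download_stream);
        Box::pin(tokio::io::BufReader::with_capacity(32 * 1024, reader))
    }
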
     534                 : 
     535                 : /// Copy segments from one timeline to another. Used in copy_timeline.
     536              48 : pub async fn copy_s3_segments(
     537              48 :     wal_seg_size: usize,
     538              48 :     src_ttid: &TenantTimelineId,
     539              48 :     dst_ttid: &TenantTimelineId,
     540              48 :     from_segment: XLogSegNo,
     541              48 :     to_segment: XLogSegNo,
     542              48 : ) -> Result<()> {
     543              48 :     const SEGMENTS_PROGRESS_REPORT_INTERVAL: u64 = 1024;
     544              48 : 
     545              48 :     let storage = REMOTE_STORAGE
     546              48 :         .get()
     547              48 :         .expect("failed to get remote storage")
     548              48 :         .as_ref()
     549              48 :         .unwrap();
     550              48 : 
     551              48 :     let relative_dst_path =
     552              48 :         Utf8Path::new(&dst_ttid.tenant_id.to_string()).join(dst_ttid.timeline_id.to_string());
     553                 : 
     554              48 :     let remote_path = RemotePath::new(&relative_dst_path)?;
     555                 : 
     556             190 :     let files = storage.list_files(Some(&remote_path)).await?;
     557              48 :     let uploaded_segments = &files
     558              48 :         .iter()
     559              48 :         .filter_map(|file| file.object_name().map(ToOwned::to_owned))
     560              48 :         .collect::<HashSet<_>>();
     561                 : 
     562 UBC           0 :     debug!(
     563               0 :         "these segments have already been uploaded: {:?}",
     564               0 :         uploaded_segments
     565               0 :     );
     566                 : 
     567 CBC          48 :     let relative_src_path =
     568              48 :         Utf8Path::new(&src_ttid.tenant_id.to_string()).join(src_ttid.timeline_id.to_string());
     569                 : 
     570              48 :     for segno in from_segment..to_segment {
     571              36 :         if segno % SEGMENTS_PROGRESS_REPORT_INTERVAL == 0 {
     572 UBC           0 :             info!("copied all segments from {} until {}", from_segment, segno);
     573 CBC          36 :         }
     574                 : 
     575              36 :         let segment_name = XLogFileName(PG_TLI, segno, wal_seg_size);
     576              36 :         if uploaded_segments.contains(&segment_name) {
     577              24 :             continue;
     578              12 :         }
     579 UBC           0 :         debug!("copying segment {}", segment_name);
     580                 : 
     581 CBC          12 :         let from = RemotePath::new(&relative_src_path.join(&segment_name))?;
     582              12 :         let to = RemotePath::new(&relative_dst_path.join(&segment_name))?;
     583                 : 
     584              48 :         storage.copy_object(&from, &to).await?;
     585                 :     }
     586                 : 
     587              48 :     info!(
     588              48 :         "finished copying segments from {} until {}",
     589              48 :         from_segment, to_segment
     590              48 :     );
     591              48 :     Ok(())
     592              48 : }
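
copy_s3_segments is idempotent: it lists what already exists under the destination prefix, skips those names, and copies only the rest, so a restarted copy_timeline does no duplicate work. A sketch of that list-then-skip pattern over a hypothetical minimal storage trait (Storage, list, copy are illustrative, not the remote_storage API):

    use std::collections::HashSet;

    // Hypothetical minimal storage interface, just enough for the sketch.
    trait Storage {
        async fn list(&self, prefix: &str) -> anyhow::Result<Vec<String>>;
        async fn copy(&self, from: &str, to: &str) -> anyhow::Result<()>;
    }

    // Idempotent bulk copy: list what the destination already has,
    // then copy only the missing objects.
    async fn copy_missing(
        storage: &impl Storage,
        names: &[String],
        src: &str,
        dst: &str,
    ) -> anyhow::Result<()> {
        let existing: HashSet<String> = storage.list(dst).await?.into_iter().collect();
        for name in names {
            if existing.contains(name) {
                continue; // already uploaded, skip
            }
            storage.copy(&format!("{src}/{name}"), &format!("{dst}/{name}")).await?;
        }
        Ok(())
    }
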
        

Generated by: LCOV version 2.1-beta