use anyhow::{Context, Result};

use camino::{Utf8Path, Utf8PathBuf};
use futures::stream::FuturesOrdered;
use futures::StreamExt;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use utils::backoff;
use utils::id::NodeId;

use std::cmp::min;
use std::collections::{HashMap, HashSet};
use std::num::NonZeroU32;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use postgres_ffi::v14::xlog_utils::XLogSegNoOffsetToRecPtr;
use postgres_ffi::XLogFileName;
use postgres_ffi::{XLogSegNo, PG_TLI};
use remote_storage::{GenericRemoteStorage, ListingMode, RemotePath, StorageMetadata};
use tokio::fs::File;

use tokio::select;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::sync::watch;
use tokio::time::sleep;
use tracing::*;

use utils::{id::TenantTimelineId, lsn::Lsn};

use crate::metrics::{BACKED_UP_SEGMENTS, BACKUP_ERRORS};
use crate::timeline::{PeerInfo, Timeline};
use crate::{GlobalTimelines, SafeKeeperConf};

use once_cell::sync::OnceCell;

const UPLOAD_FAILURE_RETRY_MIN_MS: u64 = 10;
const UPLOAD_FAILURE_RETRY_MAX_MS: u64 = 5000;

/// Default buffer size when interfacing with [`tokio::fs::File`].
const BUFFER_SIZE: usize = 32 * 1024;

/// Check whether WAL backup is required for the timeline. If yes, mark that the
/// launcher is aware of the current status and return the timeline.
async fn is_wal_backup_required(ttid: TenantTimelineId) -> Option<Arc<Timeline>> {
    match GlobalTimelines::get(ttid).ok() {
        Some(tli) => {
            tli.wal_backup_attend().await;
            Some(tli)
        }
        None => None,
    }
}

struct WalBackupTaskHandle {
    shutdown_tx: Sender<()>,
    handle: JoinHandle<()>,
}

struct WalBackupTimelineEntry {
    timeline: Arc<Timeline>,
    handle: Option<WalBackupTaskHandle>,
}

async fn shut_down_task(ttid: TenantTimelineId, entry: &mut WalBackupTimelineEntry) {
    if let Some(wb_handle) = entry.handle.take() {
        // Tell the task to shut down. An error means the task exited earlier, which is fine.
        let _ = wb_handle.shutdown_tx.send(()).await;
        // Await the task itself. TODO: restart panicked tasks earlier.
        if let Err(e) = wb_handle.handle.await {
            warn!("WAL backup task for {} panicked: {}", ttid, e);
        }
    }
}
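
// Illustrative sketch of the shutdown handshake used by `WalBackupTaskHandle`
// above: the launcher sends `()` on the mpsc channel, then awaits the
// `JoinHandle`. This is a minimal standalone example, assuming tokio's
// `macros`/test features are enabled; all names in it are hypothetical.
#[cfg(test)]
mod shutdown_handshake_sketch {
    use std::time::Duration;

    #[tokio::test]
    async fn shutdown_then_join() {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::channel::<()>(1);
        // The worker runs until it receives the shutdown signal, mirroring
        // `backup_task_main`'s `select!` on its main loop vs. `shutdown_rx`.
        let handle = tokio::spawn(async move {
            tokio::select! {
                _ = tokio::time::sleep(Duration::from_secs(3600)) => {}
                _ = shutdown_rx.recv() => {}
            }
        });
        // An error here would only mean the task already exited, which is fine.
        let _ = shutdown_tx.send(()).await;
        handle.await.expect("worker should not panic");
    }
}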

/// The goal is to ensure that normally only one safekeeper offloads. However,
/// it is fine (and inevitable, as s3 doesn't provide CAS) that for some short
/// time we have several of them as they PUT the same files. Also,
/// - frequently changing the offloader would be bad;
/// - electing a seriously lagging safekeeper is undesirable;
/// So we deterministically choose among the reasonably caught-up candidates.
/// TODO: take into account failed attempts to deal with the hypothetical situation
/// where s3 is unreachable only for some sks.
fn determine_offloader(
    alive_peers: &[PeerInfo],
    wal_backup_lsn: Lsn,
    ttid: TenantTimelineId,
    conf: &SafeKeeperConf,
) -> (Option<NodeId>, String) {
    // TODO: remove this once we fill newly joined safekeepers since backup_lsn.
    let capable_peers = alive_peers
        .iter()
        .filter(|p| p.local_start_lsn <= wal_backup_lsn);
    match capable_peers.clone().map(|p| p.commit_lsn).max() {
        None => (None, "no connected peers to elect from".to_string()),
        Some(max_commit_lsn) => {
            let threshold = max_commit_lsn
                .checked_sub(conf.max_offloader_lag_bytes)
                .unwrap_or(Lsn(0));
            let mut caughtup_peers = capable_peers
                .clone()
                .filter(|p| p.commit_lsn >= threshold)
                .collect::<Vec<_>>();
            caughtup_peers.sort_by(|p1, p2| p1.sk_id.cmp(&p2.sk_id));

            // To distribute the load, shift by timeline_id.
            let offloader = caughtup_peers
                [(u128::from(ttid.timeline_id) % caughtup_peers.len() as u128) as usize]
                .sk_id;

            let mut capable_peers_dbg = capable_peers
                .map(|p| (p.sk_id, p.commit_lsn))
                .collect::<Vec<_>>();
            capable_peers_dbg.sort_by(|p1, p2| p1.0.cmp(&p2.0));
            (
                Some(offloader),
                format!(
                    "elected {} among {:?} peers, with {} of them being caughtup",
                    offloader,
                    capable_peers_dbg,
                    caughtup_peers.len()
                ),
            )
        }
    }
}
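
// Illustrative sketch of the election arithmetic in `determine_offloader`,
// using plain integers instead of `PeerInfo`/`Lsn`. All names and values here
// are hypothetical and only mirror the filtering, threshold, and per-timeline
// shift described in the doc comment above.
#[cfg(test)]
mod offloader_election_sketch {
    /// `(node_id, local_start_lsn, commit_lsn)` triples stand in for `PeerInfo`.
    fn elect(peers: &[(u64, u64, u64)], backup_lsn: u64, max_lag: u64, timeline_shift: u128) -> Option<u64> {
        // Only peers that already hold WAL since `backup_lsn` are capable of offloading.
        let capable: Vec<_> = peers.iter().filter(|p| p.1 <= backup_lsn).collect();
        let max_commit = capable.iter().map(|p| p.2).max()?;
        let threshold = max_commit.saturating_sub(max_lag);
        // Among capable peers, keep only the reasonably caught-up ones, sorted by id.
        let mut caughtup: Vec<_> = capable.into_iter().filter(|p| p.2 >= threshold).collect();
        caughtup.sort_by_key(|p| p.0);
        // Deterministic choice, shifted per timeline to spread the load.
        Some(caughtup[(timeline_shift % caughtup.len() as u128) as usize].0)
    }

    #[test]
    fn picks_caught_up_peer_deterministically() {
        let peers = [(1, 0, 1_000), (2, 0, 990), (3, 0, 10)];
        // Peer 3 lags by more than `max_lag` and is excluded; the per-timeline
        // shift then selects between peers 1 and 2.
        assert_eq!(elect(&peers, 500, 100, 0), Some(1));
        assert_eq!(elect(&peers, 500, 100, 1), Some(2));
    }
}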

/// Based on peer information, determine which safekeeper should offload. If it
/// is us and the (per-timeline) task is not running yet, start it; conversely,
/// if it is not us and the task is running, stop it.
async fn update_task(
    conf: &SafeKeeperConf,
    ttid: TenantTimelineId,
    entry: &mut WalBackupTimelineEntry,
) {
    let alive_peers = entry.timeline.get_peers(conf).await;
    let wal_backup_lsn = entry.timeline.get_wal_backup_lsn().await;
    let (offloader, election_dbg_str) =
        determine_offloader(&alive_peers, wal_backup_lsn, ttid, conf);
    let elected_me = Some(conf.my_id) == offloader;

    if elected_me != (entry.handle.is_some()) {
        if elected_me {
            info!("elected for backup: {}", election_dbg_str);

            let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
            let timeline_dir = conf.timeline_dir(&ttid);

            let handle = tokio::spawn(
                backup_task_main(
                    ttid,
                    timeline_dir,
                    conf.workdir.clone(),
                    conf.backup_parallel_jobs,
                    shutdown_rx,
                )
                .in_current_span(),
            );

            entry.handle = Some(WalBackupTaskHandle {
                shutdown_tx,
                handle,
            });
        } else {
            info!("stepping down from backup: {}", election_dbg_str);
            shut_down_task(ttid, entry).await;
        }
    }
}

static REMOTE_STORAGE: OnceCell<Option<GenericRemoteStorage>> = OnceCell::new();

// Storage must be configured and initialized when this is called.
fn get_configured_remote_storage() -> &'static GenericRemoteStorage {
    REMOTE_STORAGE
        .get()
        .expect("failed to get remote storage")
        .as_ref()
        .unwrap()
}

pub fn init_remote_storage(conf: &SafeKeeperConf) {
    // TODO: refactor REMOTE_STORAGE to avoid using global variables, and provide
    // dependencies to all tasks instead.
    REMOTE_STORAGE.get_or_init(|| {
        conf.remote_storage
            .as_ref()
            .map(|c| GenericRemoteStorage::from_config(c).expect("failed to create remote storage"))
    });
}

const CHECK_TASKS_INTERVAL_MSEC: u64 = 1000;

/// Sits on wal_backup_launcher_rx and starts/stops per-timeline WAL backup
/// tasks. Having this in a separate task simplifies locking, allows reaping
/// panics and separates elections from the offloading itself.
pub async fn wal_backup_launcher_task_main(
    conf: SafeKeeperConf,
    mut wal_backup_launcher_rx: Receiver<TenantTimelineId>,
) -> anyhow::Result<()> {
    info!(
        "WAL backup launcher started, remote config {:?}",
        conf.remote_storage
    );

    // Presence in this map means the launcher is aware that s3 offloading is
    // needed for the timeline, but the task is started only if it makes sense
    // to offload from this safekeeper.
    let mut tasks: HashMap<TenantTimelineId, WalBackupTimelineEntry> = HashMap::new();

    let mut ticker = tokio::time::interval(Duration::from_millis(CHECK_TASKS_INTERVAL_MSEC));
    loop {
        tokio::select! {
            ttid = wal_backup_launcher_rx.recv() => {
                // the channel is never expected to get closed
                let ttid = ttid.unwrap();
                if !conf.is_wal_backup_enabled() {
                    continue; /* just drain the channel and do nothing */
                }
                async {
                    let timeline = is_wal_backup_required(ttid).await;
                    // do we need to do anything at all?
                    if timeline.is_some() != tasks.contains_key(&ttid) {
                        if let Some(timeline) = timeline {
                            // need to start the task
                            let entry = tasks.entry(ttid).or_insert(WalBackupTimelineEntry {
                                timeline,
                                handle: None,
                            });
                            update_task(&conf, ttid, entry).await;
                        } else {
                            // need to stop the task
                            info!("stopping WAL backup task");
                            let mut entry = tasks.remove(&ttid).unwrap();
                            shut_down_task(ttid, &mut entry).await;
                        }
                    }
                }.instrument(info_span!("WAL backup", ttid = %ttid)).await;
            }
            // For each timeline needing offloading, check if this safekeeper
            // should do the job and start/stop the task accordingly.
            _ = ticker.tick() => {
                for (ttid, entry) in tasks.iter_mut() {
                    update_task(&conf, *ttid, entry)
                        .instrument(info_span!("WAL backup", ttid = %ttid))
                        .await;
                }
            }
        }
    }
}

struct WalBackupTask {
    timeline: Arc<Timeline>,
    timeline_dir: Utf8PathBuf,
    workspace_dir: Utf8PathBuf,
    wal_seg_size: usize,
    parallel_jobs: usize,
    commit_lsn_watch_rx: watch::Receiver<Lsn>,
}

/// Offload a single timeline.
async fn backup_task_main(
    ttid: TenantTimelineId,
    timeline_dir: Utf8PathBuf,
    workspace_dir: Utf8PathBuf,
    parallel_jobs: usize,
    mut shutdown_rx: Receiver<()>,
) {
    info!("started");
    let res = GlobalTimelines::get(ttid);
    if let Err(e) = res {
        error!("backup error: {}", e);
        return;
    }
    let tli = res.unwrap();

    let mut wb = WalBackupTask {
        wal_seg_size: tli.get_wal_seg_size().await,
        commit_lsn_watch_rx: tli.get_commit_lsn_watch_rx(),
        timeline: tli,
        timeline_dir,
        workspace_dir,
        parallel_jobs,
    };

    // the task is spun up only when wal_seg_size is already initialized
    assert!(wb.wal_seg_size > 0);

    let mut canceled = false;
    select! {
        _ = wb.run() => {}
        _ = shutdown_rx.recv() => {
            canceled = true;
        }
    }
    info!("task {}", if canceled { "canceled" } else { "terminated" });
}

impl WalBackupTask {
    async fn run(&mut self) {
        let mut backup_lsn = Lsn(0);

        let mut retry_attempt = 0u32;
        // offload loop
        loop {
            if retry_attempt == 0 {
                // wait for new WAL to arrive
                if let Err(e) = self.commit_lsn_watch_rx.changed().await {
                    // should never happen, as we hold an Arc to the timeline.
                    error!("commit_lsn watch shut down: {:?}", e);
                    return;
                }
            } else {
                // or just sleep if we errored previously
                let mut retry_delay = UPLOAD_FAILURE_RETRY_MAX_MS;
                if let Some(backoff_delay) = UPLOAD_FAILURE_RETRY_MIN_MS.checked_shl(retry_attempt)
                {
                    retry_delay = min(retry_delay, backoff_delay);
                }
                sleep(Duration::from_millis(retry_delay)).await;
            }

            let commit_lsn = *self.commit_lsn_watch_rx.borrow();

            // Note that backup_lsn can be higher than commit_lsn if we
            // don't have much local WAL and others already uploaded
            // segments we don't even have.
            if backup_lsn.segment_number(self.wal_seg_size)
                >= commit_lsn.segment_number(self.wal_seg_size)
            {
                retry_attempt = 0;
                continue; /* nothing to do, common case as we wake up on every commit_lsn bump */
            }
            // Perhaps peers advanced the position, check the shmem value.
            backup_lsn = self.timeline.get_wal_backup_lsn().await;
            if backup_lsn.segment_number(self.wal_seg_size)
                >= commit_lsn.segment_number(self.wal_seg_size)
            {
                retry_attempt = 0;
                continue;
            }

            match backup_lsn_range(
                &self.timeline,
                &mut backup_lsn,
                commit_lsn,
                self.wal_seg_size,
                &self.timeline_dir,
                &self.workspace_dir,
                self.parallel_jobs,
            )
            .await
            {
                Ok(()) => {
                    retry_attempt = 0;
                }
                Err(e) => {
                    error!(
                        "failed while offloading range {}-{}: {:?}",
                        backup_lsn, commit_lsn, e
                    );

                    retry_attempt = retry_attempt.saturating_add(1);
                }
            }
        }
    }
}
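
// Illustrative sketch of the retry schedule used in `WalBackupTask::run` above:
// the delay is min(UPLOAD_FAILURE_RETRY_MAX_MS, UPLOAD_FAILURE_RETRY_MIN_MS << attempt),
// with `checked_shl` guarding against shift overflow. The helper name and test
// are hypothetical additions for illustration only.
#[cfg(test)]
mod retry_delay_sketch {
    use super::{UPLOAD_FAILURE_RETRY_MAX_MS, UPLOAD_FAILURE_RETRY_MIN_MS};
    use std::cmp::min;

    fn retry_delay_ms(retry_attempt: u32) -> u64 {
        match UPLOAD_FAILURE_RETRY_MIN_MS.checked_shl(retry_attempt) {
            Some(backoff_delay) => min(UPLOAD_FAILURE_RETRY_MAX_MS, backoff_delay),
            // The shift overflowed; stay at the cap.
            None => UPLOAD_FAILURE_RETRY_MAX_MS,
        }
    }

    #[test]
    fn delay_doubles_until_capped() {
        assert_eq!(retry_delay_ms(1), 20); // 10 ms << 1
        assert_eq!(retry_delay_ms(5), 320); // 10 ms << 5
        assert_eq!(retry_delay_ms(9), 5000); // 10 ms << 9 = 5120, capped at 5 s
        assert_eq!(retry_delay_ms(200), 5000); // shift overflows, cap applies
    }
}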

async fn backup_lsn_range(
    timeline: &Arc<Timeline>,
    backup_lsn: &mut Lsn,
    end_lsn: Lsn,
    wal_seg_size: usize,
    timeline_dir: &Utf8Path,
    workspace_dir: &Utf8Path,
    parallel_jobs: usize,
) -> Result<()> {
    if parallel_jobs < 1 {
        anyhow::bail!("parallel_jobs must be >= 1");
    }

    let start_lsn = *backup_lsn;
    let segments = get_segments(start_lsn, end_lsn, wal_seg_size);

    // Pool of concurrent upload tasks. We use `FuturesOrdered` to
    // preserve the order of uploads, and update `backup_lsn` only after
    // all previous uploads are finished.
    let mut uploads = FuturesOrdered::new();
    let mut iter = segments.iter();

    loop {
        let added_task = match iter.next() {
            Some(s) => {
                uploads.push_back(backup_single_segment(s, timeline_dir, workspace_dir));
                true
            }
            None => false,
        };

        // Wait for the next upload to finish if there are no more segments to
        // enqueue, or if we already have too many concurrent uploads.
        if !added_task || uploads.len() >= parallel_jobs {
            let next = uploads.next().await;
            if let Some(res) = next {
                // next segment uploaded
                let segment = res?;
                let new_backup_lsn = segment.end_lsn;
                timeline
                    .set_wal_backup_lsn(new_backup_lsn)
                    .await
                    .context("setting wal_backup_lsn")?;
                *backup_lsn = new_backup_lsn;
            } else {
                // no more segments to upload
                break;
            }
        }
    }

    info!(
        "offloaded segnos {:?} up to {}, previous backup_lsn {}",
        segments.iter().map(|&s| s.seg_no).collect::<Vec<_>>(),
        end_lsn,
        start_lsn,
    );
    Ok(())
}
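
// Illustrative sketch of the upload-pool pattern in `backup_lsn_range` above:
// `FuturesOrdered` yields results in submission order even when later futures
// finish first, so a cursor (here `highest_done`, playing the role of
// `backup_lsn`) can be advanced safely after each completed future while the
// pool is kept at `parallel_jobs` entries. It assumes tokio's `macros`/test
// features are enabled; all names are hypothetical.
#[cfg(test)]
mod ordered_upload_pool_sketch {
    use futures::stream::FuturesOrdered;
    use futures::StreamExt;
    use std::time::Duration;

    #[tokio::test]
    async fn completes_in_submission_order() {
        let parallel_jobs = 3;
        let mut uploads = FuturesOrdered::new();
        let mut iter = 0u64..10;
        let mut highest_done = 0u64; // plays the role of backup_lsn

        loop {
            let added_task = match iter.next() {
                Some(n) => {
                    // Stand-in for `backup_single_segment`: later items finish
                    // sooner, yet FuturesOrdered still yields them in push order.
                    uploads.push_back(async move {
                        tokio::time::sleep(Duration::from_millis(10 - n)).await;
                        n + 1
                    });
                    true
                }
                None => false,
            };

            if !added_task || uploads.len() >= parallel_jobs {
                match uploads.next().await {
                    Some(done) => {
                        assert_eq!(done, highest_done + 1);
                        highest_done = done;
                    }
                    None => break,
                }
            }
        }
        assert_eq!(highest_done, 10);
    }
}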

async fn backup_single_segment(
    seg: &Segment,
    timeline_dir: &Utf8Path,
    workspace_dir: &Utf8Path,
) -> Result<Segment> {
    let segment_file_path = seg.file_path(timeline_dir)?;
    let remote_segment_path = segment_file_path
        .strip_prefix(workspace_dir)
        .context("Failed to strip workspace dir prefix")
        .and_then(RemotePath::new)
        .with_context(|| {
            format!(
                "Failed to resolve remote part of path {segment_file_path:?} for base {workspace_dir:?}",
            )
        })?;

    let res = backup_object(&segment_file_path, &remote_segment_path, seg.size()).await;
    if res.is_ok() {
        BACKED_UP_SEGMENTS.inc();
    } else {
        BACKUP_ERRORS.inc();
    }
    res?;
    debug!("Backup of {} done", segment_file_path);

    Ok(*seg)
}

#[derive(Debug, Copy, Clone)]
pub struct Segment {
    seg_no: XLogSegNo,
    start_lsn: Lsn,
    end_lsn: Lsn,
}

impl Segment {
    pub fn new(seg_no: u64, start_lsn: Lsn, end_lsn: Lsn) -> Self {
        Self {
            seg_no,
            start_lsn,
            end_lsn,
        }
    }

    pub fn object_name(self) -> String {
        XLogFileName(PG_TLI, self.seg_no, self.size())
    }

    pub fn file_path(self, timeline_dir: &Utf8Path) -> Result<Utf8PathBuf> {
        Ok(timeline_dir.join(self.object_name()))
    }

    pub fn size(self) -> usize {
        (u64::from(self.end_lsn) - u64::from(self.start_lsn)) as usize
    }
}

fn get_segments(start: Lsn, end: Lsn, seg_size: usize) -> Vec<Segment> {
    let first_seg = start.segment_number(seg_size);
    let last_seg = end.segment_number(seg_size);

    let res: Vec<Segment> = (first_seg..last_seg)
        .map(|s| {
            let start_lsn = XLogSegNoOffsetToRecPtr(s, 0, seg_size);
            let end_lsn = XLogSegNoOffsetToRecPtr(s + 1, 0, seg_size);
            Segment::new(s, Lsn::from(start_lsn), Lsn::from(end_lsn))
        })
        .collect();
    res
}

async fn backup_object(
    source_file: &Utf8Path,
    target_file: &RemotePath,
    size: usize,
) -> Result<()> {
    let storage = get_configured_remote_storage();

    let file = File::open(&source_file)
        .await
        .with_context(|| format!("Failed to open file {source_file:?} for wal backup"))?;

    let file = tokio_util::io::ReaderStream::with_capacity(file, BUFFER_SIZE);

    let cancel = CancellationToken::new();

    storage
        .upload_storage_object(file, size, target_file, &cancel)
        .await
}

pub(crate) async fn backup_partial_segment(
    source_file: &Utf8Path,
    target_file: &RemotePath,
    size: usize,
) -> Result<()> {
    let storage = get_configured_remote_storage();

    let file = File::open(&source_file)
        .await
        .with_context(|| format!("Failed to open file {source_file:?} for wal backup"))?;

    // Limit the read to the first `size` bytes of the file.
    let limited_file = tokio::io::AsyncReadExt::take(file, size as u64);

    let file = tokio_util::io::ReaderStream::with_capacity(limited_file, BUFFER_SIZE);

    let cancel = CancellationToken::new();

    storage
        .upload(
            file,
            size,
            target_file,
            Some(StorageMetadata::from([("sk_type", "partial_segment")])),
            &cancel,
        )
        .await
}
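
// Illustrative sketch of the partial-read technique in `backup_partial_segment`
// above: `AsyncReadExt::take` caps the reader at `size` bytes, so the
// `ReaderStream` handed to the uploader never yields more than that, even if
// the file on disk is longer. It assumes tokio's `macros`/test features; the
// in-memory reader and all names are hypothetical.
#[cfg(test)]
mod partial_read_sketch {
    use futures::StreamExt;

    #[tokio::test]
    async fn take_limits_bytes_streamed() {
        // Stand-in for a partially filled segment file: 1000 bytes on disk,
        // but only the first 128 should be uploaded.
        let data = vec![0xABu8; 1000];
        let size = 128usize;

        let reader: &[u8] = &data;
        let limited = tokio::io::AsyncReadExt::take(reader, size as u64);
        let mut stream = tokio_util::io::ReaderStream::with_capacity(limited, 32);

        let mut total = 0usize;
        while let Some(chunk) = stream.next().await {
            total += chunk.expect("in-memory read cannot fail").len();
        }
        assert_eq!(total, size);
    }
}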

pub async fn read_object(
    file_path: &RemotePath,
    offset: u64,
) -> anyhow::Result<Pin<Box<dyn tokio::io::AsyncRead + Send + Sync>>> {
    let storage = REMOTE_STORAGE
        .get()
        .context("Failed to get remote storage")?
        .as_ref()
        .context("No remote storage configured")?;

    info!("segment download about to start from remote path {file_path:?} at offset {offset}");

    let cancel = CancellationToken::new();

    let download = storage
        .download_storage_object(Some((offset, None)), file_path, &cancel)
        .await
        .with_context(|| {
            format!("Failed to open WAL segment download stream for remote path {file_path:?}")
        })?;

    let reader = tokio_util::io::StreamReader::new(download.download_stream);

    let reader = tokio::io::BufReader::with_capacity(BUFFER_SIZE, reader);

    Ok(Box::pin(reader))
}

/// Delete WAL files for the given timeline. Remote storage must be configured
/// when called.
pub async fn delete_timeline(ttid: &TenantTimelineId) -> Result<()> {
    let storage = get_configured_remote_storage();
    let ttid_path = Utf8Path::new(&ttid.tenant_id.to_string()).join(ttid.timeline_id.to_string());
    let remote_path = RemotePath::new(&ttid_path)?;

    // see DEFAULT_MAX_KEYS_PER_LIST_RESPONSE
    // const Option unwrap is not stable, otherwise it would be const.
    let batch_size: NonZeroU32 = NonZeroU32::new(1000).unwrap();

    // A backoff::retry is used here for two reasons:
    // - To provide a backoff rather than busy-polling the API on errors
    // - To absorb transient 429/503 conditions without hitting our error
    //   logging path for issues deleting objects.
    //
    // Note: listing segments might take a long time if there are many of them.
    // We don't currently have timeouts/cancellation for HTTP requests, but
    // if/once we do, listing should get a streaming interface to make progress.

    let cancel = CancellationToken::new(); // not really used
    backoff::retry(
        || async {
            // Do list-delete in batch_size batches to make progress even if there are a lot of files.
            // Alternatively we could make the remote storage list return an iterator, but it is more
            // complicated and I'm not sure deleting while iterating is expected to work in s3.
            loop {
                let files = storage
                    .list(
                        Some(&remote_path),
                        ListingMode::NoDelimiter,
                        Some(batch_size),
                        &cancel,
                    )
                    .await?
                    .keys;
                if files.is_empty() {
                    return Ok(()); // done
                }
                // (at least) s3 results are sorted, so we can log min/max:
                // "List results are always returned in UTF-8 binary order."
                info!(
                    "deleting batch of {} WAL segments [{}-{}]",
                    files.len(),
                    files.first().unwrap().object_name().unwrap_or(""),
                    files.last().unwrap().object_name().unwrap_or("")
                );
                storage.delete_objects(&files, &cancel).await?;
            }
        },
        // consider TimeoutOrCancel::caused_by_cancel when using cancellation
        |_| false,
        3,
        10,
        "executing WAL segments deletion batch",
        &cancel,
    )
    .await
    .ok_or_else(|| anyhow::anyhow!("canceled"))
    .and_then(|x| x)?;

    Ok(())
}

/// Used by wal_backup_partial.
pub async fn delete_objects(paths: &[RemotePath]) -> Result<()> {
    let cancel = CancellationToken::new(); // not really used
    let storage = get_configured_remote_storage();
    storage.delete_objects(paths, &cancel).await
}

/// Copy segments from one timeline to another. Used in copy_timeline.
pub async fn copy_s3_segments(
    wal_seg_size: usize,
    src_ttid: &TenantTimelineId,
    dst_ttid: &TenantTimelineId,
    from_segment: XLogSegNo,
    to_segment: XLogSegNo,
) -> Result<()> {
    const SEGMENTS_PROGRESS_REPORT_INTERVAL: u64 = 1024;

    let storage = REMOTE_STORAGE
        .get()
        .expect("failed to get remote storage")
        .as_ref()
        .unwrap();

    let relative_dst_path =
        Utf8Path::new(&dst_ttid.tenant_id.to_string()).join(dst_ttid.timeline_id.to_string());

    let remote_path = RemotePath::new(&relative_dst_path)?;

    let cancel = CancellationToken::new();

    let files = storage
        .list(Some(&remote_path), ListingMode::NoDelimiter, None, &cancel)
        .await?
        .keys;

    let uploaded_segments = &files
        .iter()
        .filter_map(|file| file.object_name().map(ToOwned::to_owned))
        .collect::<HashSet<_>>();

    debug!(
        "these segments have already been uploaded: {:?}",
        uploaded_segments
    );

    let relative_src_path =
        Utf8Path::new(&src_ttid.tenant_id.to_string()).join(src_ttid.timeline_id.to_string());

    for segno in from_segment..to_segment {
        if segno % SEGMENTS_PROGRESS_REPORT_INTERVAL == 0 {
            info!("copied all segments from {} until {}", from_segment, segno);
        }

        let segment_name = XLogFileName(PG_TLI, segno, wal_seg_size);
        if uploaded_segments.contains(&segment_name) {
            continue;
        }
        debug!("copying segment {}", segment_name);

        let from = RemotePath::new(&relative_src_path.join(&segment_name))?;
        let to = RemotePath::new(&relative_dst_path.join(&segment_name))?;

        storage.copy_object(&from, &to, &cancel).await?;
    }

    info!(
        "finished copying segments from {} until {}",
        from_segment, to_segment
    );
    Ok(())
}
        
