LCOV - code coverage report
Current view: top level - safekeeper/src - wal_backup.rs (source / functions)
Test:      1e20c4f2b28aa592527961bb32170ebbd2c9172f.info
Test Date: 2025-07-16 12:29:03
Coverage:  Lines: 4.6 % (20 of 437 hit)    Functions: 9.3 % (5 of 54 hit)

            Line data    Source code
       1              : use std::cmp::min;
       2              : use std::collections::HashSet;
       3              : use std::num::NonZeroU32;
       4              : use std::pin::Pin;
       5              : use std::sync::Arc;
       6              : use std::time::Duration;
       7              : 
       8              : use anyhow::{Context, Result};
       9              : use camino::{Utf8Path, Utf8PathBuf};
      10              : use futures::StreamExt;
      11              : use futures::stream::FuturesOrdered;
      12              : use postgres_ffi::v14::xlog_utils::XLogSegNoOffsetToRecPtr;
      13              : use postgres_ffi::{PG_TLI, XLogFileName, XLogSegNo};
      14              : use remote_storage::{
      15              :     DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath, StorageMetadata,
      16              : };
      17              : use safekeeper_api::models::PeerInfo;
      18              : use tokio::fs::File;
      19              : use tokio::select;
      20              : use tokio::sync::mpsc::{self, Receiver, Sender};
      21              : use tokio::sync::watch;
      22              : use tokio::task::JoinHandle;
      23              : use tokio_util::sync::CancellationToken;
      24              : use tracing::*;
      25              : use utils::id::{NodeId, TenantTimelineId};
      26              : use utils::lsn::Lsn;
      27              : use utils::{backoff, pausable_failpoint};
      28              : 
      29              : use crate::metrics::{
      30              :     BACKED_UP_SEGMENTS, BACKUP_ERRORS, BACKUP_REELECT_LEADER_COUNT, WAL_BACKUP_TASKS,
      31              : };
      32              : use crate::timeline::WalResidentTimeline;
      33              : use crate::timeline_manager::{Manager, StateSnapshot};
      34              : use crate::{SafeKeeperConf, WAL_BACKUP_RUNTIME};
      35              : 
      36              : const UPLOAD_FAILURE_RETRY_MIN_MS: u64 = 10;
      37              : const UPLOAD_FAILURE_RETRY_MAX_MS: u64 = 5000;
      38              : 
      39              : /// Default buffer size when interfacing with [`tokio::fs::File`].
      40              : const BUFFER_SIZE: usize = 32 * 1024;
      41              : 
      42              : pub struct WalBackupTaskHandle {
      43              :     shutdown_tx: Sender<()>,
      44              :     handle: JoinHandle<()>,
      45              : }
      46              : 
      47              : impl WalBackupTaskHandle {
      48            0 :     pub(crate) async fn join(self) {
      49            0 :         if let Err(e) = self.handle.await {
      50            0 :             error!("WAL backup task panicked: {}", e);
      51            0 :         }
      52            0 :     }
      53              : }
      54              : 
      55              : /// Do we have anything to upload to S3, i.e. should safekeepers run backup activity?
      56           50 : pub(crate) fn is_wal_backup_required(
      57           50 :     wal_seg_size: usize,
      58           50 :     num_computes: usize,
      59           50 :     state: &StateSnapshot,
      60           50 : ) -> bool {
      61           50 :     num_computes > 0 ||
      62              :     // Currently only the whole segment is offloaded, so compare segment numbers.
      63           24 :     (state.commit_lsn.segment_number(wal_seg_size) > state.backup_lsn.segment_number(wal_seg_size))
      64           50 : }
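
// A minimal sketch of the predicate above, assuming a 16 MiB wal_seg_size and using plain
// `Lsn` values instead of a full `StateSnapshot` (constructing one requires internal state).
// The helper name `backup_needed` is hypothetical and only mirrors the check for illustration.
fn backup_needed(wal_seg_size: usize, num_computes: usize, commit_lsn: Lsn, backup_lsn: Lsn) -> bool {
    // Only whole segments are offloaded, so compare segment numbers rather than raw LSNs.
    num_computes > 0
        || commit_lsn.segment_number(wal_seg_size) > backup_lsn.segment_number(wal_seg_size)
}
// Example: commit_lsn in segment 3 and backup_lsn in segment 2 require backup even with
// zero attached computes:
//   backup_needed(16 * 1024 * 1024, 0, Lsn(0x0300_0042), Lsn(0x0200_0000)) == true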
      65              : 
       66              : /// Based on peer information, determine which safekeeper should offload; if it
       67              : /// is me, start the per-timeline task if it is not running yet. OTOH, if it is
       68              : /// not me and the task is running, kill it.
      69            0 : pub(crate) async fn update_task(
      70            0 :     mgr: &mut Manager,
      71            0 :     storage: Arc<GenericRemoteStorage>,
      72            0 :     need_backup: bool,
      73            0 :     state: &StateSnapshot,
      74            0 : ) {
      75              :     /* BEGIN_HADRON */
      76            0 :     let (offloader, election_dbg_str) = hadron_determine_offloader(mgr, state);
      77              :     /* END_HADRON */
      78            0 :     let elected_me = Some(mgr.conf.my_id) == offloader;
      79              : 
      80            0 :     let should_task_run = need_backup && elected_me;
      81              : 
      82              :     // start or stop the task
      83            0 :     if should_task_run != (mgr.backup_task.is_some()) {
      84            0 :         if should_task_run {
      85            0 :             info!("elected for backup: {}", election_dbg_str);
      86              : 
      87            0 :             let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
      88              : 
      89            0 :             let Ok(resident) = mgr.wal_resident_timeline() else {
      90            0 :                 info!("Timeline shut down");
      91            0 :                 return;
      92              :             };
      93              : 
      94            0 :             let async_task = backup_task_main(
      95            0 :                 resident,
      96            0 :                 storage,
      97            0 :                 mgr.conf.backup_parallel_jobs,
      98            0 :                 shutdown_rx,
      99              :             );
     100              : 
     101            0 :             let handle = if mgr.conf.current_thread_runtime {
     102            0 :                 tokio::spawn(async_task)
     103              :             } else {
     104            0 :                 WAL_BACKUP_RUNTIME.spawn(async_task)
     105              :             };
     106              : 
     107            0 :             mgr.backup_task = Some(WalBackupTaskHandle {
     108            0 :                 shutdown_tx,
     109            0 :                 handle,
     110            0 :             });
     111              :         } else {
     112            0 :             if !need_backup {
     113              :                 // don't need backup at all
     114            0 :                 info!("stepping down from backup, need_backup={}", need_backup);
     115              :             } else {
     116              :                 // someone else has been elected
     117            0 :                 info!("stepping down from backup: {}", election_dbg_str);
     118              :             }
     119            0 :             shut_down_task(&mut mgr.backup_task).await;
     120              :         }
     121            0 :     }
     122            0 : }
     123              : 
     124            0 : async fn shut_down_task(entry: &mut Option<WalBackupTaskHandle>) {
     125            0 :     if let Some(wb_handle) = entry.take() {
      126              :         // Tell the task to shut down. An error means the task exited earlier; that's ok.
     127            0 :         let _ = wb_handle.shutdown_tx.send(()).await;
     128              :         // Await the task itself. TODO: restart panicked tasks earlier.
     129            0 :         wb_handle.join().await;
     130            0 :     }
     131            0 : }
     132              : 
     133              : /* BEGIN_HADRON */
      134              : // On top of the Neon determine_offloader, we also check whether the current offloader is lagging behind too much.
      135              : // If it is, we re-elect a new offloader. This mitigates the issue described below and also helps distribute the load across SKs.
      136              : //
      137              : // We have observed the offloader fail to upload a segment due to race conditions between XLOG SWITCH and PG starting to stream WALs:
      138              : // the wal_backup task continuously fails to upload a full segment while the segment remains partial on disk.
      139              : // The consequence is that commit_lsn for all SKs moves forward but backup_lsn stays the same. Then, all SKs run out of disk space.
     140              : // See go/sk-ood-xlog-switch for more details.
     141              : //
     142              : // To mitigate this issue, we will re-elect a new offloader if the current offloader is lagging behind too much.
     143              : // Each SK makes the decision locally but they are aware of each other's commit and backup lsns.
     144              : //
     145              : // determine_offloader will pick a SK. say SK-1.
      146              : // determine_offloader will pick an SK, say SK-1.
      147              : // Each SK checks:
      148              : // -- if commit_lsn - backup_lsn > threshold,
      149              : // -- -- remove SK-1 from the candidates and call determine_offloader again.
     150              : // After the backup is caught up, the leader will become SK-1 again.
     151            0 : fn hadron_determine_offloader(mgr: &Manager, state: &StateSnapshot) -> (Option<NodeId>, String) {
     152              :     let mut offloader: Option<NodeId>;
     153              :     let mut election_dbg_str: String;
     154              :     let caughtup_peers_count: usize;
     155            0 :     (offloader, election_dbg_str, caughtup_peers_count) =
     156            0 :         determine_offloader(&state.peers, state.backup_lsn, mgr.tli.ttid, &mgr.conf);
     157              : 
     158            0 :     if offloader.is_none()
     159            0 :         || caughtup_peers_count <= 1
     160            0 :         || mgr.conf.max_reelect_offloader_lag_bytes == 0
     161              :     {
     162            0 :         return (offloader, election_dbg_str);
     163            0 :     }
     164              : 
     165            0 :     let offloader_sk_id = offloader.unwrap();
     166              : 
     167            0 :     let backup_lag = state.commit_lsn.checked_sub(state.backup_lsn);
     168            0 :     if backup_lag.is_none() {
     169            0 :         debug!("Backup lag is None. Skipping re-election.");
     170            0 :         return (offloader, election_dbg_str);
     171            0 :     }
     172              : 
     173            0 :     let backup_lag = backup_lag.unwrap().0;
     174              : 
     175            0 :     if backup_lag < mgr.conf.max_reelect_offloader_lag_bytes {
     176            0 :         return (offloader, election_dbg_str);
     177            0 :     }
     178              : 
     179            0 :     info!(
      180            0 :         "Electing a new leader: backup lag {} exceeds threshold {}: {}",
     181              :         backup_lag, mgr.conf.max_reelect_offloader_lag_bytes, election_dbg_str
     182              :     );
     183            0 :     BACKUP_REELECT_LEADER_COUNT.inc();
     184              :     // Remove the current offloader if lag is too high.
     185            0 :     let new_peers: Vec<_> = state
     186            0 :         .peers
     187            0 :         .iter()
     188            0 :         .filter(|p| p.sk_id != offloader_sk_id)
     189            0 :         .cloned()
     190            0 :         .collect();
     191            0 :     (offloader, election_dbg_str, _) =
     192            0 :         determine_offloader(&new_peers, state.backup_lsn, mgr.tli.ttid, &mgr.conf);
     193            0 :     (offloader, election_dbg_str)
     194            0 : }
     195              : /* END_HADRON */
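
// A minimal sketch of the lag check above, with illustrative numbers; it uses the same
// `Lsn::checked_sub` as hadron_determine_offloader and assumes the imports at the top of
// this file.
fn lag_exceeds_threshold(commit_lsn: Lsn, backup_lsn: Lsn, threshold_bytes: u64) -> bool {
    match commit_lsn.checked_sub(backup_lsn) {
        Some(lag) => lag.0 >= threshold_bytes,
        // backup_lsn being ahead of commit_lsn means there is nothing to re-elect over.
        None => false,
    }
}
// Example: a 1 GiB lag against a 512 MiB threshold triggers re-election:
//   lag_exceeds_threshold(Lsn(0x5000_0000), Lsn(0x1000_0000), 512 * 1024 * 1024) == true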
     196              : 
      197              : /// The goal is to ensure that normally only one safekeeper offloads. However,
      198              : /// it is fine (and inevitable, as s3 doesn't provide CAS) that for some short
      199              : /// time several of them PUT the same files. Also,
      200              : /// - frequently changing the offloader would be bad;
      201              : /// - electing a seriously lagging safekeeper is undesirable.
     202              : ///
     203              : /// So we deterministically choose among the reasonably caught up candidates.
     204              : /// TODO: take into account failed attempts to deal with hypothetical situation
     205              : /// where s3 is unreachable only for some sks.
     206            0 : fn determine_offloader(
     207            0 :     alive_peers: &[PeerInfo],
     208            0 :     wal_backup_lsn: Lsn,
     209            0 :     ttid: TenantTimelineId,
     210            0 :     conf: &SafeKeeperConf,
     211            0 : ) -> (Option<NodeId>, String, usize) {
      212              :     // TODO: remove this once we backfill newly joined safekeepers from backup_lsn.
     213            0 :     let capable_peers = alive_peers
     214            0 :         .iter()
     215            0 :         .filter(|p| p.local_start_lsn <= wal_backup_lsn);
     216            0 :     match capable_peers.clone().map(|p| p.commit_lsn).max() {
     217            0 :         None => (None, "no connected peers to elect from".to_string(), 0),
     218            0 :         Some(max_commit_lsn) => {
     219            0 :             let threshold = max_commit_lsn
     220            0 :                 .checked_sub(conf.max_offloader_lag_bytes)
     221            0 :                 .unwrap_or(Lsn(0));
     222            0 :             let mut caughtup_peers = capable_peers
     223            0 :                 .clone()
     224            0 :                 .filter(|p| p.commit_lsn >= threshold)
     225            0 :                 .collect::<Vec<_>>();
     226            0 :             caughtup_peers.sort_by(|p1, p2| p1.sk_id.cmp(&p2.sk_id));
     227              : 
     228              :             // To distribute the load, shift by timeline_id.
     229            0 :             let offloader = caughtup_peers
     230            0 :                 [(u128::from(ttid.timeline_id) % caughtup_peers.len() as u128) as usize]
     231            0 :                 .sk_id;
     232              : 
     233            0 :             let mut capable_peers_dbg = capable_peers
     234            0 :                 .map(|p| (p.sk_id, p.commit_lsn))
     235            0 :                 .collect::<Vec<_>>();
     236            0 :             capable_peers_dbg.sort_by(|p1, p2| p1.0.cmp(&p2.0));
     237            0 :             (
     238            0 :                 Some(offloader),
     239            0 :                 format!(
     240            0 :                     "elected {} among {:?} peers, with {} of them being caughtup",
     241            0 :                     offloader,
     242            0 :                     capable_peers_dbg,
     243            0 :                     caughtup_peers.len()
     244            0 :                 ),
     245            0 :                 caughtup_peers.len(),
     246            0 :             )
     247              :         }
     248              :     }
     249            0 : }
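
// A minimal sketch of the load-distribution step above: among the caught-up peers (sorted
// by sk_id), the elected index is timeline_id % number_of_caught_up_peers, so different
// timelines tend to elect different safekeepers. The peer labels below are illustrative.
fn pick_offloader_index(timeline_id: u128, caughtup_peers: usize) -> usize {
    (timeline_id % caughtup_peers as u128) as usize
}
// Example: with 3 caught-up peers [sk-1, sk-2, sk-3] and a timeline_id value of 7,
// pick_offloader_index(7, 3) == 1, i.e. sk-2 is elected for this timeline.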
     250              : 
     251              : pub struct WalBackup {
     252              :     storage: Option<Arc<GenericRemoteStorage>>,
     253              : }
     254              : 
     255              : impl WalBackup {
     256              :     /// Create a new WalBackup instance.
     257            5 :     pub async fn new(conf: &SafeKeeperConf) -> Result<Self> {
     258            5 :         if !conf.wal_backup_enabled {
     259            0 :             return Ok(Self { storage: None });
     260            5 :         }
     261              : 
     262            5 :         match conf.remote_storage.as_ref() {
     263            0 :             Some(config) => {
     264            0 :                 let storage = GenericRemoteStorage::from_config(config).await?;
     265            0 :                 Ok(Self {
     266            0 :                     storage: Some(Arc::new(storage)),
     267            0 :                 })
     268              :             }
     269            5 :             None => Ok(Self { storage: None }),
     270              :         }
     271            5 :     }
     272              : 
     273           99 :     pub fn get_storage(&self) -> Option<Arc<GenericRemoteStorage>> {
     274           99 :         self.storage.clone()
     275           99 :     }
     276              : }
     277              : 
     278              : struct WalBackupTask {
     279              :     timeline: WalResidentTimeline,
     280              :     timeline_dir: Utf8PathBuf,
     281              :     wal_seg_size: usize,
     282              :     parallel_jobs: usize,
     283              :     commit_lsn_watch_rx: watch::Receiver<Lsn>,
     284              :     storage: Arc<GenericRemoteStorage>,
     285              : }
     286              : 
     287              : /// Offload single timeline.
     288              : #[instrument(name = "wal_backup", skip_all, fields(ttid = %tli.ttid))]
     289              : async fn backup_task_main(
     290              :     tli: WalResidentTimeline,
     291              :     storage: Arc<GenericRemoteStorage>,
     292              :     parallel_jobs: usize,
     293              :     mut shutdown_rx: Receiver<()>,
     294              : ) {
     295              :     let _guard = WAL_BACKUP_TASKS.guard();
     296              :     info!("started");
     297              : 
     298              :     let cancel = tli.tli.cancel.clone();
     299              :     let mut wb = WalBackupTask {
     300              :         wal_seg_size: tli.get_wal_seg_size().await,
     301              :         commit_lsn_watch_rx: tli.get_commit_lsn_watch_rx(),
     302              :         timeline_dir: tli.get_timeline_dir(),
     303              :         timeline: tli,
     304              :         parallel_jobs,
     305              :         storage,
     306              :     };
     307              : 
      308              :     // task is spun up only when wal_seg_size is already initialized
     309              :     assert!(wb.wal_seg_size > 0);
     310              : 
     311              :     let mut canceled = false;
     312              :     select! {
     313              :         _ = wb.run() => {}
     314              :         _ = shutdown_rx.recv() => {
     315              :             canceled = true;
     316              :         },
     317              :         _ = cancel.cancelled() => {
     318              :             canceled = true;
     319              :         }
     320              :     }
     321              :     info!("task {}", if canceled { "canceled" } else { "terminated" });
     322              : }
     323              : 
     324              : impl WalBackupTask {
     325              :     /// This function must be called from a select! that also respects self.timeline's
     326              :     /// cancellation token.  This is done in [`backup_task_main`].
     327              :     ///
     328              :     /// The future returned by this function is safe to drop at any time because it
     329              :     /// does not write to local disk.
     330            0 :     async fn run(&mut self) {
     331            0 :         let mut backup_lsn = Lsn(0);
     332              : 
     333            0 :         let mut retry_attempt = 0u32;
     334              :         // offload loop
     335            0 :         while !self.timeline.cancel.is_cancelled() {
     336            0 :             if retry_attempt == 0 {
     337              :                 // wait for new WAL to arrive
     338            0 :                 if let Err(e) = self.commit_lsn_watch_rx.changed().await {
     339              :                     // should never happen, as we hold Arc to timeline and transmitter's lifetime
     340              :                     // is within Timeline's
     341            0 :                     error!("commit_lsn watch shut down: {:?}", e);
     342            0 :                     return;
     343            0 :                 };
     344              :             } else {
     345              :                 // or just sleep if we errored previously
     346            0 :                 let mut retry_delay = UPLOAD_FAILURE_RETRY_MAX_MS;
     347            0 :                 if let Some(backoff_delay) = UPLOAD_FAILURE_RETRY_MIN_MS.checked_shl(retry_attempt)
     348            0 :                 {
     349            0 :                     retry_delay = min(retry_delay, backoff_delay);
     350            0 :                 }
     351            0 :                 tokio::time::sleep(Duration::from_millis(retry_delay)).await;
     352              :             }
     353              : 
     354            0 :             let commit_lsn = *self.commit_lsn_watch_rx.borrow();
     355              : 
     356              :             // Note that backup_lsn can be higher than commit_lsn if we
     357              :             // don't have much local WAL and others already uploaded
     358              :             // segments we don't even have.
     359            0 :             if backup_lsn.segment_number(self.wal_seg_size)
     360            0 :                 >= commit_lsn.segment_number(self.wal_seg_size)
     361              :             {
     362            0 :                 retry_attempt = 0;
     363            0 :                 continue; /* nothing to do, common case as we wake up on every commit_lsn bump */
     364            0 :             }
     365              :             // Perhaps peers advanced the position, check shmem value.
     366            0 :             backup_lsn = self.timeline.get_wal_backup_lsn().await;
     367            0 :             if backup_lsn.segment_number(self.wal_seg_size)
     368            0 :                 >= commit_lsn.segment_number(self.wal_seg_size)
     369              :             {
     370            0 :                 retry_attempt = 0;
     371            0 :                 continue;
     372            0 :             }
     373              : 
     374            0 :             match backup_lsn_range(
     375            0 :                 &self.timeline,
     376            0 :                 self.storage.clone(),
     377            0 :                 &mut backup_lsn,
     378            0 :                 commit_lsn,
     379            0 :                 self.wal_seg_size,
     380            0 :                 &self.timeline_dir,
     381            0 :                 self.parallel_jobs,
     382              :             )
     383            0 :             .await
     384              :             {
     385            0 :                 Ok(()) => {
     386            0 :                     retry_attempt = 0;
     387            0 :                 }
     388            0 :                 Err(e) => {
     389              :                     // We might have managed to upload some segment even though
     390              :                     // some later in the range failed, so log backup_lsn
     391              :                     // separately.
     392            0 :                     error!(
     393            0 :                         "failed while offloading range {}-{}, backup_lsn {}: {:?}",
     394              :                         backup_lsn, commit_lsn, backup_lsn, e
     395              :                     );
     396              : 
     397            0 :                     retry_attempt = retry_attempt.saturating_add(1);
     398              :                 }
     399              :             }
     400              :         }
     401            0 :     }
     402              : }
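
// A minimal sketch of the retry delay computed in `run` above: exponential backoff starting
// from UPLOAD_FAILURE_RETRY_MIN_MS (10 ms), doubling per attempt via `checked_shl`, capped at
// UPLOAD_FAILURE_RETRY_MAX_MS (5000 ms).
fn retry_delay_ms(retry_attempt: u32) -> u64 {
    match UPLOAD_FAILURE_RETRY_MIN_MS.checked_shl(retry_attempt) {
        Some(delay) => delay.min(UPLOAD_FAILURE_RETRY_MAX_MS),
        // Shift overflow on very large attempt counts: fall back to the cap.
        None => UPLOAD_FAILURE_RETRY_MAX_MS,
    }
}
// Example: the first retry (attempt 1) sleeps 20 ms, attempt 5 sleeps 320 ms, and attempt 9
// hits the cap (10 << 9 = 5120, clamped to 5000 ms).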
     403              : 
     404            0 : async fn backup_lsn_range(
     405            0 :     timeline: &WalResidentTimeline,
     406            0 :     storage: Arc<GenericRemoteStorage>,
     407            0 :     backup_lsn: &mut Lsn,
     408            0 :     end_lsn: Lsn,
     409            0 :     wal_seg_size: usize,
     410            0 :     timeline_dir: &Utf8Path,
     411            0 :     parallel_jobs: usize,
     412            0 : ) -> Result<()> {
     413            0 :     if parallel_jobs < 1 {
     414            0 :         anyhow::bail!("parallel_jobs must be >= 1");
     415            0 :     }
     416              : 
     417            0 :     pausable_failpoint!("backup-lsn-range-pausable");
     418              : 
     419            0 :     let remote_timeline_path = &timeline.remote_path;
     420            0 :     let start_lsn = *backup_lsn;
     421            0 :     let segments = get_segments(start_lsn, end_lsn, wal_seg_size);
     422              : 
     423            0 :     info!(
     424            0 :         "offloading segnos {:?} of range [{}-{})",
     425            0 :         segments.iter().map(|&s| s.seg_no).collect::<Vec<_>>(),
     426              :         start_lsn,
     427              :         end_lsn,
     428              :     );
     429              : 
     430              :     // Pool of concurrent upload tasks. We use `FuturesOrdered` to
     431              :     // preserve order of uploads, and update `backup_lsn` only after
     432              :     // all previous uploads are finished.
     433            0 :     let mut uploads = FuturesOrdered::new();
     434            0 :     let mut iter = segments.iter();
     435              : 
     436              :     loop {
     437            0 :         let added_task = match iter.next() {
     438            0 :             Some(s) => {
     439            0 :                 uploads.push_back(backup_single_segment(
     440            0 :                     &storage,
     441            0 :                     s,
     442            0 :                     timeline_dir,
     443            0 :                     remote_timeline_path,
     444              :                 ));
     445            0 :                 true
     446              :             }
     447            0 :             None => false,
     448              :         };
     449              : 
     450              :         // Wait for the next segment to upload if we don't have any more segments,
     451              :         // or if we have too many concurrent uploads.
     452            0 :         if !added_task || uploads.len() >= parallel_jobs {
     453            0 :             let next = uploads.next().await;
     454            0 :             if let Some(res) = next {
     455              :                 // next segment uploaded
     456            0 :                 let segment = res?;
     457            0 :                 let new_backup_lsn = segment.end_lsn;
     458            0 :                 timeline
     459            0 :                     .set_wal_backup_lsn(new_backup_lsn)
     460            0 :                     .await
     461            0 :                     .context("setting wal_backup_lsn")?;
     462            0 :                 *backup_lsn = new_backup_lsn;
     463              :             } else {
     464              :                 // no more segments to upload
     465            0 :                 break;
     466              :             }
     467            0 :         }
     468              :     }
     469              : 
     470            0 :     info!(
     471            0 :         "offloaded segnos {:?} of range [{}-{})",
     472            0 :         segments.iter().map(|&s| s.seg_no).collect::<Vec<_>>(),
     473              :         start_lsn,
     474              :         end_lsn,
     475              :     );
     476            0 :     Ok(())
     477            0 : }
     478              : 
     479            0 : async fn backup_single_segment(
     480            0 :     storage: &GenericRemoteStorage,
     481            0 :     seg: &Segment,
     482            0 :     timeline_dir: &Utf8Path,
     483            0 :     remote_timeline_path: &RemotePath,
     484            0 : ) -> Result<Segment> {
     485            0 :     let segment_file_path = seg.file_path(timeline_dir)?;
     486            0 :     let remote_segment_path = seg.remote_path(remote_timeline_path);
     487              : 
     488            0 :     let res = backup_object(
     489            0 :         storage,
     490            0 :         &segment_file_path,
     491            0 :         &remote_segment_path,
     492            0 :         seg.size(),
     493            0 :     )
     494            0 :     .await;
     495            0 :     if res.is_ok() {
     496            0 :         BACKED_UP_SEGMENTS.inc();
     497            0 :     } else {
     498            0 :         BACKUP_ERRORS.inc();
     499            0 :     }
     500            0 :     res?;
     501            0 :     debug!("Backup of {} done", segment_file_path);
     502              : 
     503            0 :     Ok(*seg)
     504            0 : }
     505              : 
     506              : #[derive(Debug, Copy, Clone)]
     507              : pub struct Segment {
     508              :     seg_no: XLogSegNo,
     509              :     start_lsn: Lsn,
     510              :     end_lsn: Lsn,
     511              : }
     512              : 
     513              : impl Segment {
     514            0 :     pub fn new(seg_no: u64, start_lsn: Lsn, end_lsn: Lsn) -> Self {
     515            0 :         Self {
     516            0 :             seg_no,
     517            0 :             start_lsn,
     518            0 :             end_lsn,
     519            0 :         }
     520            0 :     }
     521              : 
     522            0 :     pub fn object_name(self) -> String {
     523            0 :         XLogFileName(PG_TLI, self.seg_no, self.size())
     524            0 :     }
     525              : 
     526            0 :     pub fn file_path(self, timeline_dir: &Utf8Path) -> Result<Utf8PathBuf> {
     527            0 :         Ok(timeline_dir.join(self.object_name()))
     528            0 :     }
     529              : 
     530            0 :     pub fn remote_path(self, remote_timeline_path: &RemotePath) -> RemotePath {
     531            0 :         remote_timeline_path.join(self.object_name())
     532            0 :     }
     533              : 
     534            0 :     pub fn size(self) -> usize {
     535            0 :         (u64::from(self.end_lsn) - u64::from(self.start_lsn)) as usize
     536            0 :     }
     537              : }
     538              : 
     539            0 : fn get_segments(start: Lsn, end: Lsn, seg_size: usize) -> Vec<Segment> {
     540            0 :     let first_seg = start.segment_number(seg_size);
     541            0 :     let last_seg = end.segment_number(seg_size);
     542              : 
     543            0 :     let res: Vec<Segment> = (first_seg..last_seg)
     544            0 :         .map(|s| {
     545            0 :             let start_lsn = XLogSegNoOffsetToRecPtr(s, 0, seg_size);
     546            0 :             let end_lsn = XLogSegNoOffsetToRecPtr(s + 1, 0, seg_size);
     547            0 :             Segment::new(s, Lsn::from(start_lsn), Lsn::from(end_lsn))
     548            0 :         })
     549            0 :         .collect();
     550            0 :     res
     551            0 : }
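
// A minimal test-style sketch of the segment split above, assuming a 16 MiB wal_seg_size and
// Neon's "X/Y" hexadecimal LSN notation: backing up from 0/1000060 (inside segment 1) up to
// 0/3000000 (the start of segment 3) yields whole segments 1 and 2; note that `start` is
// rounded down to its segment boundary.
#[cfg(test)]
mod get_segments_example {
    use super::*;

    #[test]
    fn splits_range_into_whole_segments() {
        let seg_size = 16 * 1024 * 1024;
        let segs = get_segments(Lsn(0x0100_0060), Lsn(0x0300_0000), seg_size);
        assert_eq!(segs.len(), 2);
        assert_eq!(segs[0].seg_no, 1);
        assert_eq!(segs[0].start_lsn, Lsn(0x0100_0000));
        assert_eq!(segs[0].end_lsn, Lsn(0x0200_0000));
        assert_eq!(segs[1].seg_no, 2);
    }
}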
     552              : 
     553            0 : async fn backup_object(
     554            0 :     storage: &GenericRemoteStorage,
     555            0 :     source_file: &Utf8Path,
     556            0 :     target_file: &RemotePath,
     557            0 :     size: usize,
     558            0 : ) -> Result<()> {
     559            0 :     let file = File::open(&source_file)
     560            0 :         .await
     561            0 :         .with_context(|| format!("Failed to open file {source_file:?} for wal backup"))?;
     562              : 
     563            0 :     let file = tokio_util::io::ReaderStream::with_capacity(file, BUFFER_SIZE);
     564              : 
     565            0 :     let cancel = CancellationToken::new();
     566              : 
     567            0 :     storage
     568            0 :         .upload_storage_object(file, size, target_file, &cancel)
     569            0 :         .await
     570            0 : }
     571              : 
     572            0 : pub(crate) async fn backup_partial_segment(
     573            0 :     storage: &GenericRemoteStorage,
     574            0 :     source_file: &Utf8Path,
     575            0 :     target_file: &RemotePath,
     576            0 :     size: usize,
     577            0 : ) -> Result<()> {
     578            0 :     let file = File::open(&source_file)
     579            0 :         .await
     580            0 :         .with_context(|| format!("Failed to open file {source_file:?} for wal backup"))?;
     581              : 
     582              :     // limiting the file to read only the first `size` bytes
     583            0 :     let limited_file = tokio::io::AsyncReadExt::take(file, size as u64);
     584              : 
     585            0 :     let file = tokio_util::io::ReaderStream::with_capacity(limited_file, BUFFER_SIZE);
     586              : 
     587            0 :     let cancel = CancellationToken::new();
     588              : 
     589            0 :     storage
     590            0 :         .upload(
     591            0 :             file,
     592            0 :             size,
     593            0 :             target_file,
     594            0 :             Some(StorageMetadata::from([("sk_type", "partial_segment")])),
     595            0 :             &cancel,
     596            0 :         )
     597            0 :         .await
     598            0 : }
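
// A minimal sketch of the `take(size)` trick above: wrapping the file in
// `AsyncReadExt::take` yields a reader that stops after `size` bytes, so only the valid
// prefix of a partial segment is streamed to remote storage. The helper below is
// hypothetical and just demonstrates the limiting behaviour.
async fn read_first_n_bytes(path: &Utf8Path, n: u64) -> anyhow::Result<Vec<u8>> {
    use tokio::io::AsyncReadExt;

    let file = File::open(path).await?;
    let mut limited = file.take(n);
    let mut buf = Vec::new();
    // read_to_end stops at EOF or after `n` bytes, whichever comes first.
    limited.read_to_end(&mut buf).await?;
    Ok(buf)
}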
     599              : 
     600            0 : pub(crate) async fn copy_partial_segment(
     601            0 :     storage: &GenericRemoteStorage,
     602            0 :     source: &RemotePath,
     603            0 :     destination: &RemotePath,
     604            0 : ) -> Result<()> {
     605            0 :     let cancel = CancellationToken::new();
     606              : 
     607            0 :     storage.copy_object(source, destination, &cancel).await
     608            0 : }
     609              : 
     610            0 : pub async fn read_object(
     611            0 :     storage: &GenericRemoteStorage,
     612            0 :     file_path: &RemotePath,
     613            0 :     offset: u64,
     614            0 : ) -> anyhow::Result<Pin<Box<dyn tokio::io::AsyncRead + Send + Sync>>> {
     615            0 :     info!("segment download about to start from remote path {file_path:?} at offset {offset}");
     616              : 
     617            0 :     let cancel = CancellationToken::new();
     618              : 
     619            0 :     let opts = DownloadOpts {
     620            0 :         byte_start: std::ops::Bound::Included(offset),
     621            0 :         ..Default::default()
     622            0 :     };
     623            0 :     let download = storage
     624            0 :         .download(file_path, &opts, &cancel)
     625            0 :         .await
     626            0 :         .with_context(|| {
     627            0 :             format!("Failed to open WAL segment download stream for remote path {file_path:?}")
     628            0 :         })?;
     629              : 
     630            0 :     let reader = tokio_util::io::StreamReader::new(download.download_stream);
     631              : 
     632            0 :     let reader = tokio::io::BufReader::with_capacity(BUFFER_SIZE, reader);
     633              : 
     634            0 :     Ok(Box::pin(reader))
     635            0 : }
     636              : 
     637              : /// Delete WAL files for the given timeline. Remote storage must be configured
     638              : /// when called.
     639            0 : pub async fn delete_timeline(
     640            0 :     storage: &GenericRemoteStorage,
     641            0 :     ttid: &TenantTimelineId,
     642            0 : ) -> Result<()> {
     643            0 :     let remote_path = remote_timeline_path(ttid)?;
     644              : 
     645              :     // see DEFAULT_MAX_KEYS_PER_LIST_RESPONSE
     646              :     // const Option unwrap is not stable, otherwise it would be const.
     647            0 :     let batch_size: NonZeroU32 = NonZeroU32::new(1000).unwrap();
     648              : 
     649              :     // A backoff::retry is used here for two reasons:
     650              :     // - To provide a backoff rather than busy-polling the API on errors
     651              :     // - To absorb transient 429/503 conditions without hitting our error
     652              :     //   logging path for issues deleting objects.
     653              :     //
     654              :     // Note: listing segments might take a long time if there are many of them.
      655              :     // We don't currently have HTTP request timeout/cancellation, but if/once
      656              :     // we do, listing should get a streaming interface to make progress.
     657              : 
     658            0 :     pausable_failpoint!("sk-delete-timeline-remote-pause");
     659              : 
     660            0 :     fail::fail_point!("sk-delete-timeline-remote", |_| {
     661            0 :         Err(anyhow::anyhow!("failpoint: sk-delete-timeline-remote"))
     662            0 :     });
     663              : 
     664            0 :     let cancel = CancellationToken::new(); // not really used
     665            0 :     backoff::retry(
     666            0 :         || async {
      667              :             // Do list-delete in batch_size batches to make progress even if there are a lot of files.
      668              :             // Alternatively we could make the remote storage list return an iterator, but that is more complicated
      669              :             // and I'm not sure deleting while iterating is expected to work with s3.
     670              :             loop {
     671            0 :                 let files = storage
     672            0 :                     .list(
     673            0 :                         Some(&remote_path),
     674            0 :                         ListingMode::NoDelimiter,
     675            0 :                         Some(batch_size),
     676            0 :                         &cancel,
     677            0 :                     )
     678            0 :                     .await?
     679              :                     .keys
     680            0 :                     .into_iter()
     681            0 :                     .map(|o| o.key)
     682            0 :                     .collect::<Vec<_>>();
     683            0 :                 if files.is_empty() {
     684            0 :                     return Ok(()); // done
     685            0 :                 }
     686              :                 // (at least) s3 results are sorted, so can log min/max:
     687              :                 // "List results are always returned in UTF-8 binary order."
     688            0 :                 info!(
     689            0 :                     "deleting batch of {} WAL segments [{}-{}]",
     690            0 :                     files.len(),
     691            0 :                     files.first().unwrap().object_name().unwrap_or(""),
     692            0 :                     files.last().unwrap().object_name().unwrap_or("")
     693              :                 );
     694            0 :                 storage.delete_objects(&files, &cancel).await?;
     695              :             }
     696            0 :         },
     697              :         // consider TimeoutOrCancel::caused_by_cancel when using cancellation
     698              :         |_| false,
     699              :         3,
     700              :         10,
     701            0 :         "executing WAL segments deletion batch",
     702            0 :         &cancel,
     703              :     )
     704            0 :     .await
     705            0 :     .ok_or_else(|| anyhow::anyhow!("canceled"))
     706            0 :     .and_then(|x| x)?;
     707              : 
     708            0 :     Ok(())
     709            0 : }
     710              : 
     711              : /// Used by wal_backup_partial.
     712            0 : pub async fn delete_objects(storage: &GenericRemoteStorage, paths: &[RemotePath]) -> Result<()> {
     713            0 :     let cancel = CancellationToken::new(); // not really used
     714            0 :     storage.delete_objects(paths, &cancel).await
     715            0 : }
     716              : 
     717              : /// Copy segments from one timeline to another. Used in copy_timeline.
     718            0 : pub async fn copy_s3_segments(
     719            0 :     storage: &GenericRemoteStorage,
     720            0 :     wal_seg_size: usize,
     721            0 :     src_ttid: &TenantTimelineId,
     722            0 :     dst_ttid: &TenantTimelineId,
     723            0 :     from_segment: XLogSegNo,
     724            0 :     to_segment: XLogSegNo,
     725            0 : ) -> Result<()> {
     726              :     const SEGMENTS_PROGRESS_REPORT_INTERVAL: u64 = 1024;
     727              : 
     728            0 :     let remote_dst_path = remote_timeline_path(dst_ttid)?;
     729              : 
     730            0 :     let cancel = CancellationToken::new();
     731              : 
     732            0 :     let files = storage
     733            0 :         .list(
     734            0 :             Some(&remote_dst_path),
     735            0 :             ListingMode::NoDelimiter,
     736            0 :             None,
     737            0 :             &cancel,
     738            0 :         )
     739            0 :         .await?
     740              :         .keys;
     741              : 
     742            0 :     let uploaded_segments = &files
     743            0 :         .iter()
     744            0 :         .filter_map(|o| o.key.object_name().map(ToOwned::to_owned))
     745            0 :         .collect::<HashSet<_>>();
     746              : 
     747            0 :     debug!(
     748            0 :         "these segments have already been uploaded: {:?}",
     749              :         uploaded_segments
     750              :     );
     751              : 
     752            0 :     for segno in from_segment..to_segment {
     753            0 :         if segno % SEGMENTS_PROGRESS_REPORT_INTERVAL == 0 {
     754            0 :             info!("copied all segments from {} until {}", from_segment, segno);
     755            0 :         }
     756              : 
     757            0 :         let segment_name = XLogFileName(PG_TLI, segno, wal_seg_size);
     758            0 :         if uploaded_segments.contains(&segment_name) {
     759            0 :             continue;
     760            0 :         }
     761            0 :         debug!("copying segment {}", segment_name);
     762              : 
     763            0 :         let from = remote_timeline_path(src_ttid)?.join(&segment_name);
     764            0 :         let to = remote_dst_path.join(&segment_name);
     765              : 
     766            0 :         storage.copy_object(&from, &to, &cancel).await?;
     767              :     }
     768              : 
     769            0 :     info!(
     770            0 :         "finished copying segments from {} until {}",
     771              :         from_segment, to_segment
     772              :     );
     773            0 :     Ok(())
     774            0 : }
     775              : 
     776              : /// Get S3 (remote_storage) prefix path used for timeline files.
     777           14 : pub fn remote_timeline_path(ttid: &TenantTimelineId) -> Result<RemotePath> {
     778           14 :     RemotePath::new(&Utf8Path::new(&ttid.tenant_id.to_string()).join(ttid.timeline_id.to_string()))
     779           14 : }
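
// A minimal test-style sketch of how this prefix composes with `Segment::remote_path` above;
// the tenant and timeline hex IDs are illustrative placeholders, and the file name follows
// the usual 16 MiB-segment WAL naming.
#[cfg(test)]
mod remote_path_example {
    use super::*;

    #[test]
    fn segment_key_lives_under_tenant_timeline_prefix() {
        let ttid = TenantTimelineId::new(
            "11223344556677881122334455667788".parse().unwrap(),
            "aabbccddeeff0011aabbccddeeff0011".parse().unwrap(),
        );
        let prefix = remote_timeline_path(&ttid).unwrap();
        // Segment 3 of a 16 MiB WAL: "<tenant_id>/<timeline_id>/000000010000000000000003".
        let seg = Segment::new(3, Lsn(0x0300_0000), Lsn(0x0400_0000));
        let key = seg.remote_path(&prefix);
        assert_eq!(key.object_name(), Some("000000010000000000000003"));
    }
}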
        

Generated by: LCOV version 2.1-beta