LCOV - differential code coverage report
Current view: top level - safekeeper/src - wal_backup.rs (source / functions) Coverage Total Hit LBC UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 95.0 % 339 322 1 16 322
Current Date: 2023-10-19 02:04:12 Functions: 78.9 % 57 45 12 45
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : use anyhow::{Context, Result};
       2                 : 
       3                 : use camino::{Utf8Path, Utf8PathBuf};
       4                 : use futures::stream::FuturesOrdered;
       5                 : use futures::StreamExt;
       6                 : use tokio::task::JoinHandle;
       7                 : use utils::id::NodeId;
       8                 : 
       9                 : use std::cmp::min;
      10                 : use std::collections::HashMap;
      11                 : use std::pin::Pin;
      12                 : use std::sync::Arc;
      13                 : use std::time::Duration;
      14                 : 
      15                 : use postgres_ffi::v14::xlog_utils::XLogSegNoOffsetToRecPtr;
      16                 : use postgres_ffi::XLogFileName;
      17                 : use postgres_ffi::{XLogSegNo, PG_TLI};
      18                 : use remote_storage::{GenericRemoteStorage, RemotePath};
      19                 : use tokio::fs::File;
      20                 : 
      21                 : use tokio::select;
      22                 : use tokio::sync::mpsc::{self, Receiver, Sender};
      23                 : use tokio::sync::watch;
      24                 : use tokio::time::sleep;
      25                 : use tracing::*;
      26                 : 
      27                 : use utils::{id::TenantTimelineId, lsn::Lsn};
      28                 : 
      29                 : use crate::metrics::{BACKED_UP_SEGMENTS, BACKUP_ERRORS};
      30                 : use crate::timeline::{PeerInfo, Timeline};
      31                 : use crate::{GlobalTimelines, SafeKeeperConf};
      32                 : 
      33                 : use once_cell::sync::OnceCell;
      34                 : 
      35                 : const UPLOAD_FAILURE_RETRY_MIN_MS: u64 = 10;
      36                 : const UPLOAD_FAILURE_RETRY_MAX_MS: u64 = 5000;
      37                 : 
      38                 : /// Check whether wal backup is required for timeline. If yes, mark that launcher is
      39                 : /// aware of current status and return the timeline.
      40 CBC         192 : async fn is_wal_backup_required(ttid: TenantTimelineId) -> Option<Arc<Timeline>> {
      41             192 :     match GlobalTimelines::get(ttid).ok() {
      42             192 :         Some(tli) => {
      43             192 :             tli.wal_backup_attend().await;
      44             192 :             Some(tli)
      45                 :         }
      46 UBC           0 :         None => None,
      47                 :     }
      48 CBC         192 : }
      49                 : 
      50                 : struct WalBackupTaskHandle {
      51                 :     shutdown_tx: Sender<()>,
      52                 :     handle: JoinHandle<()>,
      53                 : }
      54                 : 
      55                 : struct WalBackupTimelineEntry {
      56                 :     timeline: Arc<Timeline>,
      57                 :     handle: Option<WalBackupTaskHandle>,
      58                 : }
      59                 : 
      60               9 : async fn shut_down_task(ttid: TenantTimelineId, entry: &mut WalBackupTimelineEntry) {
      61               9 :     if let Some(wb_handle) = entry.handle.take() {
      62                 :         // Tell the task to shutdown. Error means task exited earlier, that's ok.
      63               9 :         let _ = wb_handle.shutdown_tx.send(()).await;
      64                 :         // Await the task itself. TODO: restart panicked tasks earlier.
      65              10 :         if let Err(e) = wb_handle.handle.await {
      66 UBC           0 :             warn!("WAL backup task for {} panicked: {}", ttid, e);
      67 CBC           9 :         }
      68 UBC           0 :     }
      69 CBC           9 : }
      70                 : 
      71                 : /// The goal is to ensure that normally only one safekeeper offloads. However,
      72                 : /// it is fine (and inevitable, as s3 doesn't provide CAS) that for some short
      73                 : /// time we have several ones as they PUT the same files. Also,
      74                 : /// - frequently changing the offloader would be bad;
      75                 : /// - electing a seriously lagging safekeeper is undesirable;
      76                 : /// So we deterministically choose among the reasonably caught up candidates.
      77                 : /// TODO: take into account failed attempts to deal with hypothetical situation
      78                 : /// where s3 is unreachable only for some sks.
      79             260 : fn determine_offloader(
      80             260 :     alive_peers: &[PeerInfo],
      81             260 :     wal_backup_lsn: Lsn,
      82             260 :     ttid: TenantTimelineId,
      83             260 :     conf: &SafeKeeperConf,
      84             260 : ) -> (Option<NodeId>, String) {
      85             260 :     // TODO: remove this once we fill newly joined safekeepers since backup_lsn.
      86             260 :     let capable_peers = alive_peers
      87             260 :         .iter()
      88            1836 :         .filter(|p| p.local_start_lsn <= wal_backup_lsn);
      89             612 :     match capable_peers.clone().map(|p| p.commit_lsn).max() {
      90              37 :         None => (None, "no connected peers to elect from".to_string()),
      91             223 :         Some(max_commit_lsn) => {
      92             223 :             let threshold = max_commit_lsn
      93             223 :                 .checked_sub(conf.max_offloader_lag_bytes)
      94             223 :                 .unwrap_or(Lsn(0));
      95             223 :             let mut caughtup_peers = capable_peers
      96             223 :                 .clone()
      97             612 :                 .filter(|p| p.commit_lsn >= threshold)
      98             223 :                 .collect::<Vec<_>>();
      99             455 :             caughtup_peers.sort_by(|p1, p2| p1.sk_id.cmp(&p2.sk_id));
     100             223 : 
     101             223 :             // To distribute the load, shift by timeline_id.
     102             223 :             let offloader = caughtup_peers
     103             223 :                 [(u128::from(ttid.timeline_id) % caughtup_peers.len() as u128) as usize]
     104             223 :                 .sk_id;
     105             223 : 
     106             223 :             let mut capable_peers_dbg = capable_peers
     107             612 :                 .map(|p| (p.sk_id, p.commit_lsn))
     108             223 :                 .collect::<Vec<_>>();
     109             455 :             capable_peers_dbg.sort_by(|p1, p2| p1.0.cmp(&p2.0));
     110             223 :             (
     111             223 :                 Some(offloader),
     112             223 :                 format!(
     113             223 :                     "elected {} among {:?} peers, with {} of them being caughtup",
     114             223 :                     offloader,
     115             223 :                     capable_peers_dbg,
     116             223 :                     caughtup_peers.len()
     117             223 :                 ),
     118             223 :             )
     119                 :         }
     120                 :     }
     121             260 : }
     122                 : 
     123                 : /// Based on peer information determine which safekeeper should offload; if it
     124                 : /// is me, run (per timeline) task, if not yet. OTOH, if it is not me and task
     125                 : /// is running, kill it.
     126             260 : async fn update_task(
     127             260 :     conf: &SafeKeeperConf,
     128             260 :     ttid: TenantTimelineId,
     129             260 :     entry: &mut WalBackupTimelineEntry,
     130             260 : ) {
     131             260 :     let alive_peers = entry.timeline.get_peers(conf).await;
     132             260 :     let wal_backup_lsn = entry.timeline.get_wal_backup_lsn().await;
     133             260 :     let (offloader, election_dbg_str) =
     134             260 :         determine_offloader(&alive_peers, wal_backup_lsn, ttid, conf);
     135             260 :     let elected_me = Some(conf.my_id) == offloader;
     136             260 : 
     137             260 :     if elected_me != (entry.handle.is_some()) {
     138              34 :         if elected_me {
     139              25 :             info!("elected for backup {}: {}", ttid, election_dbg_str);
     140                 : 
     141              25 :             let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
     142              25 :             let timeline_dir = conf.timeline_dir(&ttid);
     143                 : 
     144              25 :             let handle = tokio::spawn(
     145              25 :                 backup_task_main(
     146              25 :                     ttid,
     147              25 :                     timeline_dir,
     148              25 :                     conf.workdir.clone(),
     149              25 :                     conf.backup_parallel_jobs,
     150              25 :                     shutdown_rx,
     151              25 :                 )
     152              25 :                 .instrument(info_span!("WAL backup task", ttid = %ttid)),
     153                 :             );
     154                 : 
     155              25 :             entry.handle = Some(WalBackupTaskHandle {
     156              25 :                 shutdown_tx,
     157              25 :                 handle,
     158              25 :             });
     159                 :         } else {
     160               9 :             info!("stepping down from backup {}: {}", ttid, election_dbg_str);
     161              10 :             shut_down_task(ttid, entry).await;
     162                 :         }
     163             226 :     }
     164             260 : }
     165                 : 
     166                 : const CHECK_TASKS_INTERVAL_MSEC: u64 = 1000;
     167                 : 
     168                 : /// Sits on wal_backup_launcher_rx and starts/stops per timeline wal backup
     169                 : /// tasks. Having this in a separate task simplifies locking, allows reaping
     170                 : /// panics, and separates elections from offloading itself.
     171             500 : pub async fn wal_backup_launcher_task_main(
     172             500 :     conf: SafeKeeperConf,
     173             500 :     mut wal_backup_launcher_rx: Receiver<TenantTimelineId>,
     174             500 : ) -> anyhow::Result<()> {
     175             500 :     info!(
     176             500 :         "WAL backup launcher started, remote config {:?}",
     177             500 :         conf.remote_storage
     178             500 :     );
     179                 : 
     180             500 :     let conf_ = conf.clone();
     181             500 :     REMOTE_STORAGE.get_or_init(|| {
     182             500 :         conf_
     183             500 :             .remote_storage
     184             500 :             .as_ref()
     185             500 :             .map(|c| GenericRemoteStorage::from_config(c).expect("failed to create remote storage"))
     186             500 :     });
     187             500 : 
     188             500 :     // Presence in this map means launcher is aware s3 offloading is needed for
     189             500 :     // the timeline, but task is started only if it makes sense to offload
     190             500 :     // from this safekeeper.
     191             500 :     let mut tasks: HashMap<TenantTimelineId, WalBackupTimelineEntry> = HashMap::new();
     192             500 : 
     193             500 :     let mut ticker = tokio::time::interval(Duration::from_millis(CHECK_TASKS_INTERVAL_MSEC));
     194                 :     loop {
     195           19305 :         tokio::select! {
     196           11857 :             ttid = wal_backup_launcher_rx.recv() => {
     197                 :                 // channel is never expected to get closed
     198                 :                 let ttid = ttid.unwrap();
     199                 :                 if conf.remote_storage.is_none() || !conf.wal_backup_enabled {
     200                 :                     continue; /* just drain the channel and do nothing */
     201                 :                 }
     202                 :                 let timeline = is_wal_backup_required(ttid).await;
     203                 :                 // do we need to do anything at all?
     204                 :                 if timeline.is_some() != tasks.contains_key(&ttid) {
     205                 :                     if let Some(timeline) = timeline {
     206                 :                         // need to start the task
     207                 :                         let entry = tasks.entry(ttid).or_insert(WalBackupTimelineEntry {
     208                 :                             timeline,
     209                 :                             handle: None,
     210                 :                         });
     211                 :                         update_task(&conf, ttid, entry).await;
     212                 :                     } else {
     213                 :                         // need to stop the task
     214 UBC           0 :                         info!("stopping WAL backup task for {}", ttid);
     215                 :                         let mut entry = tasks.remove(&ttid).unwrap();
     216                 :                         shut_down_task(ttid, &mut entry).await;
     217                 :                     }
     218                 :                 }
     219                 :             }
     220                 :             // For each timeline needing offloading, check if this safekeeper
     221                 :             // should do the job and start/stop the task accordingly.
     222                 :             _ = ticker.tick() => {
     223                 :                 for (ttid, entry) in tasks.iter_mut() {
     224                 :                     update_task(&conf, *ttid, entry).await;
     225                 :                 }
     226                 :             }
     227                 :         }
     228                 :     }
     229                 : }
     230                 : 
     231                 : struct WalBackupTask {
     232                 :     timeline: Arc<Timeline>,
     233                 :     timeline_dir: Utf8PathBuf,
     234                 :     workspace_dir: Utf8PathBuf,
     235                 :     wal_seg_size: usize,
     236                 :     parallel_jobs: usize,
     237                 :     commit_lsn_watch_rx: watch::Receiver<Lsn>,
     238                 : }
     239                 : 
     240                 : /// Offload single timeline.
     241 CBC          25 : async fn backup_task_main(
     242              25 :     ttid: TenantTimelineId,
     243              25 :     timeline_dir: Utf8PathBuf,
     244              25 :     workspace_dir: Utf8PathBuf,
     245              25 :     parallel_jobs: usize,
     246              25 :     mut shutdown_rx: Receiver<()>,
     247              25 : ) {
     248              25 :     info!("started");
     249              25 :     let res = GlobalTimelines::get(ttid);
     250              25 :     if let Err(e) = res {
     251 UBC           0 :         error!("backup error for timeline {}: {}", ttid, e);
     252               0 :         return;
     253 CBC          25 :     }
     254              25 :     let tli = res.unwrap();
     255                 : 
     256              25 :     let mut wb = WalBackupTask {
     257              25 :         wal_seg_size: tli.get_wal_seg_size().await,
     258              25 :         commit_lsn_watch_rx: tli.get_commit_lsn_watch_rx(),
     259              25 :         timeline: tli,
     260              25 :         timeline_dir,
     261              25 :         workspace_dir,
     262              25 :         parallel_jobs,
     263              25 :     };
     264              25 : 
     265              25 :     // task is spun up only when wal_seg_size is already initialized
     266              25 :     assert!(wb.wal_seg_size > 0);
     267                 : 
     268              25 :     let mut canceled = false;
     269           21386 :     select! {
     270           21386 :         _ = wb.run() => {}
     271           21386 :         _ = shutdown_rx.recv() => {
     272           21386 :             canceled = true;
     273           21386 :         }
     274           21386 :     }
     275               9 :     info!("task {}", if canceled { "canceled" } else { "terminated" });
     276               9 : }
     277                 : 
     278                 : impl WalBackupTask {
     279              25 :     async fn run(&mut self) {
     280              25 :         let mut backup_lsn = Lsn(0);
     281              25 : 
     282              25 :         let mut retry_attempt = 0u32;
     283                 :         // offload loop
     284            2329 :         loop {
     285            2329 :             if retry_attempt == 0 {
     286                 :                 // wait for new WAL to arrive
     287            2328 :                 if let Err(e) = self.commit_lsn_watch_rx.changed().await {
     288                 :                     // should never happen, as we hold Arc to timeline.
     289 UBC           0 :                     error!("commit_lsn watch shut down: {:?}", e);
     290               0 :                     return;
     291 CBC        2304 :                 }
     292                 :             } else {
     293                 :                 // or just sleep if we errored previously
     294               1 :                 let mut retry_delay = UPLOAD_FAILURE_RETRY_MAX_MS;
     295               1 :                 if let Some(backoff_delay) = UPLOAD_FAILURE_RETRY_MIN_MS.checked_shl(retry_attempt)
     296               1 :                 {
     297               1 :                     retry_delay = min(retry_delay, backoff_delay);
     298               1 :                 }
     299               1 :                 sleep(Duration::from_millis(retry_delay)).await;
     300                 :             }
     301                 : 
     302            2304 :             let commit_lsn = *self.commit_lsn_watch_rx.borrow();
     303            2304 : 
     304            2304 :             // Note that backup_lsn can be higher than commit_lsn if we
     305            2304 :             // don't have much local WAL and others already uploaded
     306            2304 :             // segments we don't even have.
     307            2304 :             if backup_lsn.segment_number(self.wal_seg_size)
     308            2304 :                 >= commit_lsn.segment_number(self.wal_seg_size)
     309                 :             {
     310            2267 :                 retry_attempt = 0;
     311            2267 :                 continue; /* nothing to do, common case as we wake up on every commit_lsn bump */
     312              37 :             }
     313              37 :             // Perhaps peers advanced the position, check shmem value.
     314              37 :             backup_lsn = self.timeline.get_wal_backup_lsn().await;
     315              37 :             if backup_lsn.segment_number(self.wal_seg_size)
     316              37 :                 >= commit_lsn.segment_number(self.wal_seg_size)
     317                 :             {
     318              16 :                 retry_attempt = 0;
     319              16 :                 continue;
     320              21 :             }
     321              21 : 
     322              21 :             match backup_lsn_range(
     323              21 :                 &self.timeline,
     324              21 :                 &mut backup_lsn,
     325              21 :                 commit_lsn,
     326              21 :                 self.wal_seg_size,
     327              21 :                 &self.timeline_dir,
     328              21 :                 &self.workspace_dir,
     329              21 :                 self.parallel_jobs,
     330              21 :             )
     331           19058 :             .await
     332                 :             {
     333              20 :                 Ok(()) => {
     334              20 :                     retry_attempt = 0;
     335              20 :                 }
     336               1 :                 Err(e) => {
     337               1 :                     error!(
     338               1 :                         "failed while offloading range {}-{}: {:?}",
     339               1 :                         backup_lsn, commit_lsn, e
     340               1 :                     );
     341                 : 
     342               1 :                     retry_attempt = retry_attempt.saturating_add(1);
     343                 :                 }
     344                 :             }
     345                 :         }
     346 UBC           0 :     }
     347                 : }
     348                 : 
     349 CBC          21 : pub async fn backup_lsn_range(
     350              21 :     timeline: &Arc<Timeline>,
     351              21 :     backup_lsn: &mut Lsn,
     352              21 :     end_lsn: Lsn,
     353              21 :     wal_seg_size: usize,
     354              21 :     timeline_dir: &Utf8Path,
     355              21 :     workspace_dir: &Utf8Path,
     356              21 :     parallel_jobs: usize,
     357              21 : ) -> Result<()> {
     358              21 :     if parallel_jobs < 1 {
     359 UBC           0 :         anyhow::bail!("parallel_jobs must be >= 1");
     360 CBC          21 :     }
     361              21 : 
     362              21 :     let start_lsn = *backup_lsn;
     363              21 :     let segments = get_segments(start_lsn, end_lsn, wal_seg_size);
     364              21 : 
     365              21 :     // Pool of concurrent upload tasks. We use `FuturesOrdered` to
     366              21 :     // preserve order of uploads, and update `backup_lsn` only after
     367              21 :     // all previous uploads are finished.
     368              21 :     let mut uploads = FuturesOrdered::new();
     369              21 :     let mut iter = segments.iter();
     370                 : 
     371                 :     loop {
     372              62 :         let added_task = match iter.next() {
     373              21 :             Some(s) => {
     374              21 :                 uploads.push_back(backup_single_segment(s, timeline_dir, workspace_dir));
     375              21 :                 true
     376                 :             }
     377              41 :             None => false,
     378                 :         };
     379                 : 
     380                 :         // Wait for the next segment to upload if we don't have any more segments,
     381                 :         // or if we have too many concurrent uploads.
     382              62 :         if !added_task || uploads.len() >= parallel_jobs {
     383           19058 :             let next = uploads.next().await;
     384              41 :             if let Some(res) = next {
     385                 :                 // next segment uploaded
     386              21 :                 let segment = res?;
     387              20 :                 let new_backup_lsn = segment.end_lsn;
     388              20 :                 timeline
     389              20 :                     .set_wal_backup_lsn(new_backup_lsn)
     390 LBC         (6) :                     .await
     391 CBC          20 :                     .context("setting wal_backup_lsn")?;
     392              20 :                 *backup_lsn = new_backup_lsn;
     393                 :             } else {
     394                 :                 // no more segments to upload
     395              20 :                 break;
     396                 :             }
     397              21 :         }
     398                 :     }
     399                 : 
     400              20 :     info!(
     401              20 :         "offloaded segnos {:?} up to {}, previous backup_lsn {}",
     402              20 :         segments.iter().map(|&s| s.seg_no).collect::<Vec<_>>(),
     403              20 :         end_lsn,
     404              20 :         start_lsn,
     405              20 :     );
     406              20 :     Ok(())
     407              21 : }
     408                 : 
     409              21 : async fn backup_single_segment(
     410              21 :     seg: &Segment,
     411              21 :     timeline_dir: &Utf8Path,
     412              21 :     workspace_dir: &Utf8Path,
     413              21 : ) -> Result<Segment> {
     414              21 :     let segment_file_path = seg.file_path(timeline_dir)?;
     415              21 :     let remote_segment_path = segment_file_path
     416              21 :         .strip_prefix(workspace_dir)
     417              21 :         .context("Failed to strip workspace dir prefix")
     418              21 :         .and_then(RemotePath::new)
     419              21 :         .with_context(|| {
     420 UBC           0 :             format!(
     421               0 :                 "Failed to resolve remote part of path {segment_file_path:?} for base {workspace_dir:?}",
     422               0 :             )
     423 CBC          21 :         })?;
     424                 : 
     425           16287 :     let res = backup_object(&segment_file_path, &remote_segment_path, seg.size()).await;
     426              21 :     if res.is_ok() {
     427              20 :         BACKED_UP_SEGMENTS.inc();
     428              20 :     } else {
     429               1 :         BACKUP_ERRORS.inc();
     430               1 :     }
     431              21 :     res?;
     432 UBC           0 :     debug!("Backup of {} done", segment_file_path);
     433                 : 
     434 CBC          20 :     Ok(*seg)
     435              21 : }
     436                 : 
     437 UBC           0 : #[derive(Debug, Copy, Clone)]
     438                 : pub struct Segment {
     439                 :     seg_no: XLogSegNo,
     440                 :     start_lsn: Lsn,
     441                 :     end_lsn: Lsn,
     442                 : }
     443                 : 
     444                 : impl Segment {
     445 CBC          21 :     pub fn new(seg_no: u64, start_lsn: Lsn, end_lsn: Lsn) -> Self {
     446              21 :         Self {
     447              21 :             seg_no,
     448              21 :             start_lsn,
     449              21 :             end_lsn,
     450              21 :         }
     451              21 :     }
     452                 : 
     453              21 :     pub fn object_name(self) -> String {
     454              21 :         XLogFileName(PG_TLI, self.seg_no, self.size())
     455              21 :     }
     456                 : 
     457              21 :     pub fn file_path(self, timeline_dir: &Utf8Path) -> Result<Utf8PathBuf> {
     458              21 :         Ok(timeline_dir.join(self.object_name()))
     459              21 :     }
     460                 : 
     461              42 :     pub fn size(self) -> usize {
     462              42 :         (u64::from(self.end_lsn) - u64::from(self.start_lsn)) as usize
     463              42 :     }
     464                 : }
     465                 : 
     466              21 : fn get_segments(start: Lsn, end: Lsn, seg_size: usize) -> Vec<Segment> {
     467              21 :     let first_seg = start.segment_number(seg_size);
     468              21 :     let last_seg = end.segment_number(seg_size);
     469              21 : 
     470              21 :     let res: Vec<Segment> = (first_seg..last_seg)
     471              21 :         .map(|s| {
     472              21 :             let start_lsn = XLogSegNoOffsetToRecPtr(s, 0, seg_size);
     473              21 :             let end_lsn = XLogSegNoOffsetToRecPtr(s + 1, 0, seg_size);
     474              21 :             Segment::new(s, Lsn::from(start_lsn), Lsn::from(end_lsn))
     475              21 :         })
     476              21 :         .collect();
     477              21 :     res
     478              21 : }
     479                 : 
     480                 : static REMOTE_STORAGE: OnceCell<Option<GenericRemoteStorage>> = OnceCell::new();
     481                 : 
     482              21 : async fn backup_object(
     483              21 :     source_file: &Utf8Path,
     484              21 :     target_file: &RemotePath,
     485              21 :     size: usize,
     486              21 : ) -> Result<()> {
     487              21 :     let storage = REMOTE_STORAGE
     488              21 :         .get()
     489              21 :         .expect("failed to get remote storage")
     490              21 :         .as_ref()
     491              21 :         .unwrap();
     492                 : 
     493              21 :     let file = tokio::io::BufReader::new(
     494              21 :         File::open(&source_file)
     495              21 :             .await
     496              21 :             .with_context(|| format!("Failed to open file {} for wal backup", source_file))?,
     497                 :     );
     498                 : 
     499              21 :     storage
     500              21 :         .upload_storage_object(Box::new(file), size, target_file)
     501           16266 :         .await
     502              21 : }
     503                 : 
     504               6 : pub async fn read_object(
     505               6 :     file_path: &RemotePath,
     506               6 :     offset: u64,
     507               6 : ) -> anyhow::Result<Pin<Box<dyn tokio::io::AsyncRead + Send + Sync>>> {
     508               6 :     let storage = REMOTE_STORAGE
     509               6 :         .get()
     510               6 :         .context("Failed to get remote storage")?
     511               6 :         .as_ref()
     512               6 :         .context("No remote storage configured")?;
     513                 : 
     514               6 :     info!("segment download about to start from remote path {file_path:?} at offset {offset}");
     515                 : 
     516               6 :     let download = storage
     517               6 :         .download_storage_object(Some((offset, None)), file_path)
     518              23 :         .await
     519               6 :         .with_context(|| {
     520 UBC           0 :             format!("Failed to open WAL segment download stream for remote path {file_path:?}")
     521 CBC           6 :         })?;
     522                 : 
     523               6 :     Ok(download.download_stream)
     524               6 : }
        

Generated by: LCOV version 2.1-beta