LCOV - code coverage report
Current view: top level - safekeeper/src - wal_backup_partial.rs (source / functions) Coverage Total Hit
Test: 472031e0b71f3195f7f21b1f2b20de09fd07bb56.info Lines: 2.9 % 205 6
Test Date: 2025-05-26 10:37:33 Functions: 2.2 % 46 1

            Line data    Source code
       1              : //! Safekeeper timeline has a background task which is subscribed to `commit_lsn`
       2              : //! and `flush_lsn` updates.
       3              : //!
       4              : //! After the partial segment was updated (`flush_lsn` was changed), the segment
       5              : //! will be uploaded to S3 within the configured `partial_backup_timeout`.
       6              : //!
       7              : //! The filename format for partial segments is
       8              : //! `Segment_Term_Flush_Commit_skNN.partial`, where:
       9              : //! - `Segment` – the segment name, like `000000010000000000000001`
      10              : //! - `Term` – current term
      11              : //! - `Flush` – flush_lsn in hex format `{:016X}`, e.g. `00000000346BC568`
      12              : //! - `Commit` – commit_lsn in the same hex format
      13              : //! - `NN` – safekeeper_id, like `1`
      14              : //!
      15              : //! The full object name example:
      16              : //! `000000010000000000000002_2_0000000002534868_0000000002534410_sk1.partial`
      17              : //!
      18              : //! Each safekeeper will keep info about remote partial segments in its control
      19              : //! file. Code updates state in the control file before doing any S3 operations.
      20              : //! This way control file stores information about all potentially existing
      21              : //! remote partial segments and can clean them up after uploading a newer version.
      22              : use std::sync::Arc;
      23              : 
      24              : use camino::Utf8PathBuf;
      25              : use postgres_ffi::{PG_TLI, XLogFileName, XLogSegNo};
      26              : use remote_storage::{GenericRemoteStorage, RemotePath};
      27              : use safekeeper_api::Term;
      28              : use serde::{Deserialize, Serialize};
      29              : use tokio_util::sync::CancellationToken;
      30              : use tracing::{debug, error, info, instrument, warn};
      31              : use utils::id::NodeId;
      32              : use utils::lsn::Lsn;
      33              : 
      34              : use crate::SafeKeeperConf;
      35              : use crate::metrics::{
      36              :     MISC_OPERATION_SECONDS, PARTIAL_BACKUP_UPLOADED_BYTES, PARTIAL_BACKUP_UPLOADS,
      37              : };
      38              : use crate::rate_limit::{RateLimiter, rand_duration};
      39              : use crate::timeline::WalResidentTimeline;
      40              : use crate::timeline_manager::StateSnapshot;
      41              : use crate::wal_backup::{self};
      42              : 
      43            0 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
                        /// Status of a partial segment object recorded in the partial backup state.
      44              : pub enum UploadStatus {
      45              :     /// Upload is in progress. This status should be used only for garbage collection,
      46              :     /// don't read data from the remote storage with this status.
      47              :     InProgress,
      48              :     /// Upload is finished. There is always at most one segment with this status.
      49              :     /// It means that the segment is up to date and can be used.
      50              :     Uploaded,
      51              :     /// Deletion is in progress. This status should be used only for garbage collection,
      52              :     /// don't read data from the remote storage with this status.
      53              :     Deleting,
      54              : }
      55              : 
      56            0 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
                        /// Record of one partial segment object that may exist in the remote storage.
      57              : pub struct PartialRemoteSegment {
                            /// Upload / garbage-collection status of this object.
      58              :     pub status: UploadStatus,
                            /// Object name; `remote_path()` joins it onto the remote timeline path.
      59              :     pub name: String,
      60              :     pub commit_lsn: Lsn,
      61              :     pub flush_lsn: Lsn,
      62              :     // We should use last_log_term here, otherwise it's possible to have inconsistent data in the
      63              :     // remote storage.
      64              :     //
      65              :     // More info here: https://github.com/neondatabase/neon/pull/8022#discussion_r1654738405
      66              :     pub term: Term,
      67              : }
      68              : 
      69              : impl PartialRemoteSegment {
                            /// Returns true if `name`, `commit_lsn`, `flush_lsn` and `term` are all equal;
                            /// `status` is deliberately not compared.
      70            0 :     fn eq_without_status(&self, other: &Self) -> bool {
      71            0 :         self.name == other.name
      72            0 :             && self.commit_lsn == other.commit_lsn
      73            0 :             && self.flush_lsn == other.flush_lsn
      74            0 :             && self.term == other.term
      75            0 :     }
      76              : 
                            /// Returns the remote path of this segment: `name` joined onto
                            /// `remote_timeline_path`.
      77            0 :     pub(crate) fn remote_path(&self, remote_timeline_path: &RemotePath) -> RemotePath {
      78            0 :         remote_timeline_path.join(&self.name)
      79            0 :     }
      80              : }
      81              : 
      82              : // NB: these structures are part of the control file, you can't change them without
      83              : // changing the control file format version.
      84            0 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
                        /// Partial backup state: the list of partial segments tracked for the remote storage.
      85              : pub struct State {
      86              :     pub segments: Vec<PartialRemoteSegment>,
      87              : }
      88              : 
      89              : #[derive(Debug)]
                        /// Description of a rename done by `State::replace_uploaded_segment`: the Uploaded
                        /// segment as it was before (`previous`) and as it is after (`current`) the change.
      90              : pub(crate) struct ReplaceUploadedSegment {
      91              :     pub(crate) previous: PartialRemoteSegment,
      92              :     pub(crate) current: PartialRemoteSegment,
      93              : }
      94              : 
      95              : impl State {
      96              :     /// Find an Uploaded segment and return a clone of it. There should be only one Uploaded segment at a time.
      97            5 :     pub(crate) fn uploaded_segment(&self) -> Option<PartialRemoteSegment> {
      98            5 :         self.segments
      99            5 :             .iter()
     100            5 :             .find(|seg| seg.status == UploadStatus::Uploaded)
     101            5 :             .cloned()
     102            5 :     }
     103              : 
     104              :     /// Replace the name of the Uploaded segment (if one exists) in order to match
     105              :     /// it with `destination` safekeeper. Returns a description of the change or None
     106              :     /// (when there is no Uploaded segment) wrapped in anyhow::Result.
                            ///
                            /// Returns an error if the Uploaded segment name does not end with
                            /// `sk{source}.partial`. Only the in-memory `State` is modified here; nothing
                            /// in this method writes the control file or touches the remote storage.
     107            0 :     pub(crate) fn replace_uploaded_segment(
     108            0 :         &mut self,
     109            0 :         source: NodeId,
     110            0 :         destination: NodeId,
     111            0 :     ) -> anyhow::Result<Option<ReplaceUploadedSegment>> {
     112            0 :         let current = self
     113            0 :             .segments
     114            0 :             .iter_mut()
     115            0 :             .find(|seg| seg.status == UploadStatus::Uploaded);
     116              : 
     117            0 :         let current = match current {
     118            0 :             Some(some) => some,
     119              :             None => {
     120            0 :                 return anyhow::Ok(None);
     121              :             }
     122              :         };
     123              : 
     124              :         // Sanity check that the partial segment we are replacing belongs
     125              :         // to the `source` SK.
     126            0 :         if !current
     127            0 :             .name
     128            0 :             .ends_with(format!("sk{}.partial", source.0).as_str())
     129              :         {
     130            0 :             anyhow::bail!(
     131            0 :                 "Partial segment name ({}) doesn't match self node id ({})",
     132            0 :                 current.name,
     133            0 :                 source
     134            0 :             );
     135            0 :         }
     136            0 : 
                                // Keep a copy of the segment as it was, to report it as `previous`.
     137            0 :         let previous = current.clone();
     138            0 : 
                                // NOTE(review): `replace` rewrites every `_sk{source}` occurrence, not only the
                                // suffix that was checked above.
     139            0 :         let new_name = current.name.replace(
     140            0 :             format!("_sk{}", source.0).as_str(),
     141            0 :             format!("_sk{}", destination.0).as_str(),
     142            0 :         );
     143            0 : 
     144            0 :         current.name = new_name;
     145            0 : 
     146            0 :         anyhow::Ok(Some(ReplaceUploadedSegment {
     147            0 :             previous,
     148            0 :             current: current.clone(),
     149            0 :         }))
     150            0 :     }
     151              : }
     152              : 
     153              : pub struct PartialBackup {
                            // All fields are filled in by `PartialBackup::new`.
                            // Taken from `tli.get_wal_seg_size()`.
     154              :     wal_seg_size: usize,
     155              :     tli: WalResidentTimeline,
     156              :     conf: SafeKeeperConf,
                            // Taken from `tli.get_timeline_dir()`; local segment paths are joined onto it.
     157              :     local_prefix: Utf8PathBuf,
                            // Clone of `tli.remote_path`; remote object names are joined onto it.
     158              :     remote_timeline_path: RemotePath,
     159              :     storage: Arc<GenericRemoteStorage>,
                            // In-memory copy of the `partial_backup` state from the control file.
     160              :     state: State,
     161              : }
     162              : 
     163              : impl PartialBackup {
                            /// Creates a `PartialBackup` for `tli`, initializing the in-memory state from the
                            /// `partial_backup` field of the timeline's persistent state.
     164            0 :     pub async fn new(
     165            0 :         tli: WalResidentTimeline,
     166            0 :         conf: SafeKeeperConf,
     167            0 :         storage: Arc<GenericRemoteStorage>,
     168            0 :     ) -> PartialBackup {
     169            0 :         let (_, persistent_state) = tli.get_state().await;
     170            0 :         let wal_seg_size = tli.get_wal_seg_size().await;
     171              : 
     172            0 :         let local_prefix = tli.get_timeline_dir();
     173            0 :         let remote_timeline_path = tli.remote_path.clone();
     174            0 : 
     175            0 :         PartialBackup {
     176            0 :             wal_seg_size,
     177            0 :             tli,
     178            0 :             state: persistent_state.partial_backup,
     179            0 :             conf,
     180            0 :             local_prefix,
     181            0 :             remote_timeline_path,
     182            0 :             storage,
     183            0 :         }
     184            0 :     }
     185              : 
     186              :     // Read-only methods for getting segment names
                            /// Segment number that contains `lsn`, for this timeline's WAL segment size.
     187            0 :     fn segno(&self, lsn: Lsn) -> XLogSegNo {
     188            0 :         lsn.segment_number(self.wal_seg_size)
     189            0 :     }
     190              : 
                            /// WAL file name of segment `segno`, built by `XLogFileName` with `PG_TLI`.
     191            0 :     fn segment_name(&self, segno: u64) -> String {
     192            0 :         XLogFileName(PG_TLI, segno, self.wal_seg_size)
     193            0 :     }
     194              : 
                            /// Remote object name in the `Segment_Term_Flush_Commit_skNN.partial` format.
                            /// Note that the parameters take `commit_lsn` before `flush_lsn`, while the
                            /// resulting name has the flush LSN before the commit LSN.
     195            0 :     fn remote_segment_name(
     196            0 :         &self,
     197            0 :         segno: u64,
     198            0 :         term: u64,
     199            0 :         commit_lsn: Lsn,
     200            0 :         flush_lsn: Lsn,
     201            0 :     ) -> String {
     202            0 :         format!(
     203            0 :             "{}_{}_{:016X}_{:016X}_sk{}.partial",
     204            0 :             self.segment_name(segno),
     205            0 :             term,
     206            0 :             flush_lsn.0,
     207            0 :             commit_lsn.0,
     208            0 :             self.conf.my_id.0,
     209            0 :         )
     210            0 :     }
     211              : 
                            /// Name of the local partial segment file: the segment name plus `.partial`.
     212            0 :     fn local_segment_name(&self, segno: u64) -> String {
     213            0 :         format!("{}.partial", self.segment_name(segno))
     214            0 :     }
     215              : }
     216              : 
     217              : impl PartialBackup {
     218              :     /// Takes a lock to read actual safekeeper state and returns a segment that should be uploaded.
                            /// The returned segment has `InProgress` status and uses `last_log_term` as its term.
     219            0 :     async fn prepare_upload(&self) -> PartialRemoteSegment {
     220              :         // this operation takes a lock to get the actual state
     221            0 :         let sk_info = self.tli.get_safekeeper_info(&self.conf).await;
     222            0 :         let flush_lsn = Lsn(sk_info.flush_lsn);
     223            0 :         let commit_lsn = Lsn(sk_info.commit_lsn);
     224            0 :         let last_log_term = sk_info.last_log_term;
                                // the partial segment is the one that contains flush_lsn
     225            0 :         let segno = self.segno(flush_lsn);
     226            0 : 
     227            0 :         let name = self.remote_segment_name(segno, last_log_term, commit_lsn, flush_lsn);
     228            0 : 
     229            0 :         PartialRemoteSegment {
     230            0 :             status: UploadStatus::InProgress,
     231            0 :             name,
     232            0 :             commit_lsn,
     233            0 :             flush_lsn,
     234            0 :             term: last_log_term,
     235            0 :         }
     236            0 :     }
     237              : 
     238              :     /// Reads segment from disk and uploads it to the remote storage.
                            ///
                            /// Returns an error if the upload fails or if `last_log_term` differs from
                            /// `prepared.term` once the upload is done. Panics if the current commit_lsn or
                            /// flush_lsn is lower than the one in `prepared`.
     239            0 :     async fn upload_segment(&mut self, prepared: PartialRemoteSegment) -> anyhow::Result<()> {
     240            0 :         let flush_lsn = prepared.flush_lsn;
     241            0 :         let segno = self.segno(flush_lsn);
     242            0 : 
     243            0 :         // We're going to backup bytes from the start of the segment up to flush_lsn.
     244            0 :         let backup_bytes = flush_lsn.segment_offset(self.wal_seg_size);
     245            0 : 
     246            0 :         let local_path = self.local_prefix.join(self.local_segment_name(segno));
     247            0 :         let remote_path = prepared.remote_path(&self.remote_timeline_path);
     248            0 : 
     249            0 :         // Upload first `backup_bytes` bytes of the segment to the remote storage.
     250            0 :         wal_backup::backup_partial_segment(&self.storage, &local_path, &remote_path, backup_bytes)
     251            0 :             .await?;
     252            0 :         PARTIAL_BACKUP_UPLOADED_BYTES.inc_by(backup_bytes as u64);
     253              : 
     254              :         // We uploaded the segment, now let's verify that the data is still up to date.
     255              :         // If the term changed, we cannot guarantee the validity of the uploaded data.
     256              :         // If the term is the same, we know the data is not corrupted.
     257            0 :         let sk_info = self.tli.get_safekeeper_info(&self.conf).await;
     258            0 :         if sk_info.last_log_term != prepared.term {
     259            0 :             anyhow::bail!("term changed during upload");
     260            0 :         }
     261            0 :         assert!(prepared.commit_lsn <= Lsn(sk_info.commit_lsn));
     262            0 :         assert!(prepared.flush_lsn <= Lsn(sk_info.flush_lsn));
     263              : 
     264            0 :         Ok(())
     265            0 :     }
     266              : 
     267              :     /// Write new state to disk. If in-memory and on-disk states diverged, returns an error.
                            /// In that case the in-memory state is first replaced with the on-disk one and
                            /// `new_state` is not written.
     268            0 :     async fn commit_state(&mut self, new_state: State) -> anyhow::Result<()> {
     269            0 :         self.tli
     270            0 :             .map_control_file(|cf| {
     271            0 :                 if cf.partial_backup != self.state {
     272            0 :                     let memory = self.state.clone();
     273            0 :                     self.state = cf.partial_backup.clone();
     274            0 :                     anyhow::bail!(
     275            0 :                         "partial backup state diverged, memory={:?}, disk={:?}",
     276            0 :                         memory,
     277            0 :                         cf.partial_backup
     278            0 :                     );
     279            0 :                 }
     280            0 : 
     281            0 :                 cf.partial_backup = new_state.clone();
     282            0 :                 Ok(())
     283            0 :             })
     284            0 :             .await?;
     285              :         // update in-memory state
     286            0 :         self.state = new_state;
     287            0 :         Ok(())
     288            0 :     }
     289              : 
     290              :     /// Upload the latest version of the partial segment and garbage collect older versions.
                            ///
                            /// Steps: commit state with `prepared` appended, upload the segment, commit state
                            /// where all previous segments are `Deleting` and `prepared` is `Uploaded`, run gc.
                            /// Any failing step returns its error immediately.
     291              :     #[instrument(name = "upload", skip_all, fields(name = %prepared.name))]
     292              :     async fn do_upload(&mut self, prepared: &PartialRemoteSegment) -> anyhow::Result<()> {
     293              :         let _timer = MISC_OPERATION_SECONDS
     294              :             .with_label_values(&["partial_do_upload"])
     295              :             .start_timer();
     296              :         info!("starting upload {:?}", prepared);
     297              : 
                                // state_0: state before this upload; state_1: state_0 plus the prepared segment
     298              :         let state_0 = self.state.clone();
     299              :         let state_1 = {
     300              :             let mut state = state_0.clone();
     301              :             state.segments.push(prepared.clone());
     302              :             state
     303              :         };
     304              : 
     305              :         // we're going to upload a new segment, let's record it on disk first so it can be GCed later
     306              :         self.commit_state(state_1).await?;
     307              : 
     308              :         self.upload_segment(prepared.clone()).await?;
     309              : 
                                // state_2: every segment from state_0 marked Deleting, plus the new one as Uploaded
     310              :         let state_2 = {
     311              :             let mut state = state_0.clone();
     312              :             for seg in state.segments.iter_mut() {
     313              :                 seg.status = UploadStatus::Deleting;
     314              :             }
     315              :             let mut actual_remote_segment = prepared.clone();
     316              :             actual_remote_segment.status = UploadStatus::Uploaded;
     317              :             state.segments.push(actual_remote_segment);
     318              :             state
     319              :         };
     320              : 
     321              :         // we've uploaded new segment, it's up to date, all other segments should be GCed
     322              :         self.commit_state(state_2).await?;
     323              :         self.gc().await?;
     324              : 
     325              :         Ok(())
     326              :     }
     327              : 
     328              :     // Prepend the remote timeline path to the given segment names and delete them from the
     329              :     // remote storage.
     330            0 :     async fn delete_segments(&self, segments_to_delete: &Vec<String>) -> anyhow::Result<()> {
     331            0 :         info!("deleting objects: {:?}", segments_to_delete);
     332            0 :         let mut objects_to_delete = vec![];
     333            0 :         for seg in segments_to_delete.iter() {
     334            0 :             let remote_path = self.remote_timeline_path.join(seg);
     335            0 :             objects_to_delete.push(remote_path);
     336            0 :         }
     337            0 :         wal_backup::delete_objects(&self.storage, &objects_to_delete).await
     338            0 :     }
     339              : 
     340              :     /// Delete all non-Uploaded segments from the remote storage. There should be only one
     341              :     /// Uploaded segment at a time.
                            ///
                            /// After a successful deletion the state is committed with only the Uploaded
                            /// segment (if any) left. Panics if more than one segment has Uploaded status.
     342              :     #[instrument(name = "gc", skip_all)]
     343              :     async fn gc(&mut self) -> anyhow::Result<()> {
     344              :         let mut segments_to_delete = vec![];
     345              : 
                                // keep Uploaded segments, collect the names of all the others for deletion
     346              :         let new_segments: Vec<PartialRemoteSegment> = self
     347              :             .state
     348              :             .segments
     349              :             .iter()
     350            0 :             .filter_map(|seg| {
     351            0 :                 if seg.status == UploadStatus::Uploaded {
     352            0 :                     Some(seg.clone())
     353              :                 } else {
     354            0 :                     segments_to_delete.push(seg.name.clone());
     355            0 :                     None
     356              :                 }
     357            0 :             })
     358              :             .collect();
     359              : 
     360              :         if new_segments.len() == 1 {
     361              :             // we have an uploaded segment, it must not be deleted from remote storage
                                    // (drops any to-be-deleted entry that has the same object name)
     362            0 :             segments_to_delete.retain(|name| name != &new_segments[0].name);
     363              :         } else {
     364              :             // there should always be zero or one uploaded segment
     365              :             assert!(
     366              :                 new_segments.is_empty(),
     367              :                 "too many uploaded segments: {:?}",
     368              :                 new_segments
     369              :             );
     370              :         }
     371              : 
     372              :         // execute the deletion
     373              :         self.delete_segments(&segments_to_delete).await?;
     374              : 
     375              :         // now we can update the state on disk
     376              :         let new_state = {
     377              :             let mut state = self.state.clone();
     378              :             state.segments = new_segments;
     379              :             state
     380              :         };
     381              :         self.commit_state(new_state).await?;
     382              : 
     383              :         Ok(())
     384              :     }
     385              : 
     386              :     /// Remove uploaded segment(s) from the state and remote storage. Aimed for
     387              :     /// manual intervention, not normally needed.
     388              :     /// Returns list of segments which potentially existed in the remote storage.
     389            0 :     pub async fn reset(&mut self) -> anyhow::Result<Vec<String>> {
     390            0 :         let segments_to_delete = self
     391            0 :             .state
     392            0 :             .segments
     393            0 :             .iter()
     394            0 :             .map(|seg| seg.name.clone())
     395            0 :             .collect();
     396            0 : 
     397            0 :         // First reset cfile state, and only then objects themselves. If the
     398            0 :         // latter fails we might leave some garbage behind; that's ok for this
     399            0 :         // single time usage.
     400            0 :         let new_state = State { segments: vec![] };
     401            0 :         self.commit_state(new_state).await?;
     402              : 
     403            0 :         self.delete_segments(&segments_to_delete).await?;
     404            0 :         Ok(segments_to_delete)
     405            0 :     }
     406              : }
     407              : 
     408              : /// Check if the partial backup task has something to upload. Returns false only when
                        /// `uploaded` is a segment with `Uploaded` status whose flush_lsn, commit_lsn and term
                        /// match `state.flush_lsn`, `state.commit_lsn` and `state.last_log_term`; returns true
                        /// otherwise, including when `uploaded` is `None`.
     409            0 : pub(crate) fn needs_uploading(
     410            0 :     state: &StateSnapshot,
     411            0 :     uploaded: &Option<PartialRemoteSegment>,
     412            0 : ) -> bool {
     413            0 :     match uploaded {
     414            0 :         Some(uploaded) => {
     415            0 :             uploaded.status != UploadStatus::Uploaded
     416            0 :                 || uploaded.flush_lsn != state.flush_lsn
     417            0 :                 || uploaded.commit_lsn != state.commit_lsn
     418            0 :                 || uploaded.term != state.last_log_term
     419              :         }
     420            0 :         None => true,
     421              :     }
     422            0 : }
     423              : 
     424              : /// Main task for partial backup. It waits for the flush_lsn to change and then uploads the
     425              : /// partial segment to the remote storage. It also does garbage collection of old segments.
     426              : ///
     427              : /// When there is nothing more to do and the last segment was successfully uploaded, the task
     428              : /// returns PartialRemoteSegment, to signal readiness for offloading the timeline.
                        ///
                        /// Returns `None` when the timeline's cancellation token or `cancel` fires.
                        /// Upload errors do not stop the task: they are logged and counted, then the loop goes on.
     429              : #[instrument(name = "partial_backup", skip_all, fields(ttid = %tli.ttid))]
     430              : pub async fn main_task(
     431              :     tli: WalResidentTimeline,
     432              :     conf: SafeKeeperConf,
     433              :     limiter: RateLimiter,
     434              :     cancel: CancellationToken,
     435              :     storage: Arc<GenericRemoteStorage>,
     436              : ) -> Option<PartialRemoteSegment> {
     437              :     debug!("started");
     438              :     let await_duration = conf.partial_backup_timeout;
     439              :     let mut first_iteration = true;
     440              : 
     441              :     let mut commit_lsn_rx = tli.get_commit_lsn_watch_rx();
     442              :     let mut flush_lsn_rx = tli.get_term_flush_lsn_watch_rx();
     443              : 
     444              :     let mut backup = PartialBackup::new(tli, conf, storage).await;
     445              : 
     446              :     debug!("state: {:?}", backup.state);
     447              : 
     448              :     // The general idea is that each safekeeper keeps only one partial segment
     449              :     // both in remote storage and in local state. If this is not true, something
     450              :     // went wrong. Above this many segments in the state, gc is run at the start of an iteration.
     451              :     const MAX_SIMULTANEOUS_SEGMENTS: usize = 10;
     452              : 
     453              :     'outer: loop {
     454              :         if backup.state.segments.len() > MAX_SIMULTANEOUS_SEGMENTS {
     455              :             warn!(
     456              :                 "too many segments in control_file state, running gc: {}",
     457              :                 backup.state.segments.len()
     458              :             );
     459              : 
                                    // a gc failure is only logged, the loop continues
     460            0 :             backup.gc().await.unwrap_or_else(|e| {
     461            0 :                 error!("failed to run gc: {:#}", e);
     462            0 :             });
     463              :         }
     464              : 
     465              :         // wait until we have something to upload
     466              :         let uploaded_segment = backup.state.uploaded_segment();
     467              :         if let Some(seg) = &uploaded_segment {
     468              :             // check if uploaded segment matches the current state
     469              :             if flush_lsn_rx.borrow().lsn == seg.flush_lsn
     470              :                 && *commit_lsn_rx.borrow() == seg.commit_lsn
     471              :                 && flush_lsn_rx.borrow().term == seg.term
     472              :             {
     473              :                 // we have nothing to do, the last segment is already uploaded
     474              :                 debug!(
     475              :                     "exiting, uploaded up to term={} flush_lsn={} commit_lsn={}",
     476              :                     seg.term, seg.flush_lsn, seg.commit_lsn
     477              :                 );
     478              :                 return Some(seg.clone());
     479              :             }
     480              :         }
     481              : 
     482              :         // if we don't have any data and zero LSNs, wait for something
     483              :         while flush_lsn_rx.borrow().lsn == Lsn(0) {
     484              :             tokio::select! {
     485              :                 _ = backup.tli.cancel.cancelled() => {
     486              :                     info!("timeline canceled");
     487              :                     return None;
     488              :                 }
     489              :                 _ = cancel.cancelled() => {
     490              :                     info!("task canceled");
     491              :                     return None;
     492              :                 }
     493              :                 _ = flush_lsn_rx.changed() => {}
     494              :             }
     495              :         }
     496              : 
     497              :         // smoothing the load after restart, by sleeping for a random time.
     498              :         // if this is not the first iteration, we will wait for the full await_duration
                                // (this binding shadows the outer `await_duration` for the current iteration only)
     499              :         let await_duration = if first_iteration {
     500              :             first_iteration = false;
     501              :             rand_duration(&await_duration)
     502              :         } else {
     503              :             await_duration
     504              :         };
     505              : 
     506              :         // fixing the segno and waiting some time to prevent reuploading the same segment too often
     507              :         let pending_segno = backup.segno(flush_lsn_rx.borrow().lsn);
     508              :         let timeout = tokio::time::sleep(await_duration);
     509              :         tokio::pin!(timeout);
     510              :         let mut timeout_expired = false;
     511              : 
     512              :         // waiting until timeout expires OR segno changes
     513              :         'inner: loop {
     514              :             tokio::select! {
     515              :                 _ = backup.tli.cancel.cancelled() => {
     516              :                     info!("timeline canceled");
     517              :                     return None;
     518              :                 }
     519              :                 _ = cancel.cancelled() => {
     520              :                     info!("task canceled");
     521              :                     return None;
     522              :                 }
                                        // a commit_lsn change alone does not end the wait
     523              :                 _ = commit_lsn_rx.changed() => {}
     524              :                 _ = flush_lsn_rx.changed() => {
     525              :                     let segno = backup.segno(flush_lsn_rx.borrow().lsn);
     526              :                     if segno != pending_segno {
     527              :                         // previous segment is no longer partial, aborting the wait
     528              :                         break 'inner;
     529              :                     }
     530              :                 }
     531              :                 _ = &mut timeout => {
     532              :                     // timeout expired, now we are ready for upload
     533              :                     timeout_expired = true;
     534              :                     break 'inner;
     535              :                 }
     536              :             }
     537              :         }
     538              : 
     539              :         if !timeout_expired {
     540              :             // likely segno has changed, let's try again in the next iteration
     541              :             continue 'outer;
     542              :         }
     543              : 
     544              :         // limit concurrent uploads; the permit stays bound until the end of this iteration
     545              :         let _upload_permit = tokio::select! {
     546              :             acq = limiter.acquire_partial_backup() => acq,
     547              :             _ = backup.tli.cancel.cancelled() => {
     548              :                 info!("timeline canceled");
     549              :                 return None;
     550              :             }
     551              :             _ = cancel.cancelled() => {
     552              :                 info!("task canceled");
     553              :                 return None;
     554              :             }
     555              :         };
     556              : 
     557              :         let prepared = backup.prepare_upload().await;
     558              :         if let Some(seg) = &uploaded_segment {
     559              :             if seg.eq_without_status(&prepared) {
     560              :                 // we already uploaded this segment, nothing to do
     561              :                 continue 'outer;
     562              :             }
     563              :         }
     564              : 
     565              :         match backup.do_upload(&prepared).await {
     566              :             Ok(()) => {
     567              :                 debug!(
     568              :                     "uploaded {} up to flush_lsn {}",
     569              :                     prepared.name, prepared.flush_lsn
     570              :                 );
     571              :                 PARTIAL_BACKUP_UPLOADS.with_label_values(&["ok"]).inc();
     572              :             }
     573              :             Err(e) => {
                                        // the error is logged and counted; the next iteration will try again
     574              :                 info!("failed to upload {}: {:#}", prepared.name, e);
     575              :                 PARTIAL_BACKUP_UPLOADS.with_label_values(&["error"]).inc();
     576              :             }
     577              :         }
     578              :     }
     579              : }
        

Generated by: LCOV version 2.1-beta