LCOV - code coverage report
Current view: top level - safekeeper/src - wal_backup_partial.rs (source / functions) Coverage Total Hit
Test: 12c2fc96834f59604b8ade5b9add28f1dce41ec6.info Lines: 0.7 % 140 1
Test Date: 2024-07-03 15:33:13 Functions: 1.9 % 54 1

            Line data    Source code
       1              : //! A safekeeper timeline has a background task that is subscribed to `commit_lsn`
       2              : //! and `flush_lsn` updates. After the partial segment is updated (`flush_lsn`
       3              : //! has changed), the segment is uploaded to S3 after a delay of about 15 minutes.
       4              : //!
       5              : //! The filename format for partial segments is
       6              : //! `Segment_Term_Flush_Commit_skNN.partial`, where:
       7              : //! - `Segment` – the segment name, like `000000010000000000000001`
       8              : //! - `Term` – current term
       9              : //! - `Flush` – flush_lsn in hex format `{:016X}`, e.g. `00000000346BC568`
      10              : //! - `Commit` – commit_lsn in the same hex format
      11              : //! - `NN` – safekeeper_id, like `1`
      12              : //!
      13              : //! The full object name example:
      14              : //! `000000010000000000000002_2_0000000002534868_0000000002534410_sk1.partial`
      15              : //!
      16              : //! Each safekeeper will keep info about remote partial segments in its control
      17              : //! file. Code updates state in the control file before doing any S3 operations.
      18              : //! This way control file stores information about all potentially existing
      19              : //! remote partial segments and can clean them up after uploading a newer version.
      20              : 
      21              : use std::sync::Arc;
      22              : 
      23              : use camino::Utf8PathBuf;
      24              : use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI};
      25              : use remote_storage::RemotePath;
      26              : use serde::{Deserialize, Serialize};
      27              : 
      28              : use tracing::{debug, error, info, instrument, warn};
      29              : use utils::lsn::Lsn;
      30              : 
      31              : use crate::{
      32              :     metrics::{MISC_OPERATION_SECONDS, PARTIAL_BACKUP_UPLOADED_BYTES, PARTIAL_BACKUP_UPLOADS},
      33              :     safekeeper::Term,
      34              :     timeline::WalResidentTimeline,
      35              :     timeline_manager::StateSnapshot,
      36              :     wal_backup::{self, remote_timeline_path},
      37              :     SafeKeeperConf,
      38              : };
      39              : 
      40              : #[derive(Clone)]
      41              : pub struct RateLimiter {
      42              :     semaphore: Arc<tokio::sync::Semaphore>,
      43              : }
      44              : 
      45              : impl RateLimiter {
      46            0 :     pub fn new(permits: usize) -> Self {
      47            0 :         Self {
      48            0 :             semaphore: Arc::new(tokio::sync::Semaphore::new(permits)),
      49            0 :         }
      50            0 :     }
      51              : 
      52            0 :     async fn acquire_owned(&self) -> tokio::sync::OwnedSemaphorePermit {
      53            0 :         let _timer = MISC_OPERATION_SECONDS
      54            0 :             .with_label_values(&["partial_permit_acquire"])
      55            0 :             .start_timer();
      56            0 :         self.semaphore
      57            0 :             .clone()
      58            0 :             .acquire_owned()
      59            0 :             .await
      60            0 :             .expect("semaphore is closed")
      61            0 :     }
      62              : }
      63              : 
      64            0 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
      65              : pub enum UploadStatus {
      66              :     /// Upload is in progress. This status should be used only for garbage collection,
      67              :     /// don't read data from the remote storage with this status.
      68              :     InProgress,
      69              :     /// Upload is finished. There is always at most one segment with this status.
      70              :     /// It means that the segment is actual and can be used.
      71              :     Uploaded,
      72              :     /// Deletion is in progress. This status should be used only for garbage collection,
      73              :     /// don't read data from the remote storage with this status.
      74              :     Deleting,
      75              : }
      76              : 
      77            0 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
      78              : pub struct PartialRemoteSegment {
      79              :     pub status: UploadStatus,
      80              :     pub name: String,
      81              :     pub commit_lsn: Lsn,
      82              :     pub flush_lsn: Lsn,
      83              :     // We should use last_log_term here, otherwise it's possible to have inconsistent data in the
      84              :     // remote storage.
      85              :     //
      86              :     // More info here: https://github.com/neondatabase/neon/pull/8022#discussion_r1654738405
      87              :     pub term: Term,
      88              : }
      89              : 
      90              : impl PartialRemoteSegment {
      91            0 :     fn eq_without_status(&self, other: &Self) -> bool {
      92            0 :         self.name == other.name
      93            0 :             && self.commit_lsn == other.commit_lsn
      94            0 :             && self.flush_lsn == other.flush_lsn
      95            0 :             && self.term == other.term
      96            0 :     }
      97              : 
      98            0 :     pub(crate) fn remote_path(&self, remote_timeline_path: &RemotePath) -> RemotePath {
      99            0 :         remote_timeline_path.join(&self.name)
     100            0 :     }
     101              : }
     102              : 
     103              : // NB: these structures are a part of a control_file, you can't change them without
     104              : // changing the control file format version.
     105            6 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
     106              : pub struct State {
     107              :     pub segments: Vec<PartialRemoteSegment>,
     108              : }
     109              : 
     110              : impl State {
     111              :     /// Find an Uploaded segment. There should be only one Uploaded segment at a time.
     112            0 :     pub(crate) fn uploaded_segment(&self) -> Option<PartialRemoteSegment> {
     113            0 :         self.segments
     114            0 :             .iter()
     115            0 :             .find(|seg| seg.status == UploadStatus::Uploaded)
     116            0 :             .cloned()
     117            0 :     }
     118              : }
     119              : 
     120              : struct PartialBackup {
     121              :     wal_seg_size: usize,
     122              :     tli: WalResidentTimeline,
     123              :     conf: SafeKeeperConf,
     124              :     local_prefix: Utf8PathBuf,
     125              :     remote_timeline_path: RemotePath,
     126              : 
     127              :     state: State,
     128              : }
     129              : 
     130              : // Read-only methods for getting segment names
     131              : impl PartialBackup {
     132            0 :     fn segno(&self, lsn: Lsn) -> XLogSegNo {
     133            0 :         lsn.segment_number(self.wal_seg_size)
     134            0 :     }
     135              : 
     136            0 :     fn segment_name(&self, segno: u64) -> String {
     137            0 :         XLogFileName(PG_TLI, segno, self.wal_seg_size)
     138            0 :     }
     139              : 
     140            0 :     fn remote_segment_name(
     141            0 :         &self,
     142            0 :         segno: u64,
     143            0 :         term: u64,
     144            0 :         commit_lsn: Lsn,
     145            0 :         flush_lsn: Lsn,
     146            0 :     ) -> String {
     147            0 :         format!(
     148            0 :             "{}_{}_{:016X}_{:016X}_sk{}.partial",
     149            0 :             self.segment_name(segno),
     150            0 :             term,
     151            0 :             flush_lsn.0,
     152            0 :             commit_lsn.0,
     153            0 :             self.conf.my_id.0,
     154            0 :         )
     155            0 :     }
     156              : 
     157            0 :     fn local_segment_name(&self, segno: u64) -> String {
     158            0 :         format!("{}.partial", self.segment_name(segno))
     159            0 :     }
     160              : }
     161              : 
     162              : impl PartialBackup {
     163              :     /// Takes a lock to read actual safekeeper state and returns a segment that should be uploaded.
     164            0 :     async fn prepare_upload(&self) -> PartialRemoteSegment {
     165              :         // this operation takes a lock to get the actual state
     166            0 :         let sk_info = self.tli.get_safekeeper_info(&self.conf).await;
     167            0 :         let flush_lsn = Lsn(sk_info.flush_lsn);
     168            0 :         let commit_lsn = Lsn(sk_info.commit_lsn);
     169            0 :         let last_log_term = sk_info.last_log_term;
     170            0 :         let segno = self.segno(flush_lsn);
     171            0 : 
     172            0 :         let name = self.remote_segment_name(segno, last_log_term, commit_lsn, flush_lsn);
     173            0 : 
     174            0 :         PartialRemoteSegment {
     175            0 :             status: UploadStatus::InProgress,
     176            0 :             name,
     177            0 :             commit_lsn,
     178            0 :             flush_lsn,
     179            0 :             term: last_log_term,
     180            0 :         }
     181            0 :     }
     182              : 
     183              :     /// Reads segment from disk and uploads it to the remote storage.
     184            0 :     async fn upload_segment(&mut self, prepared: PartialRemoteSegment) -> anyhow::Result<()> {
     185            0 :         let flush_lsn = prepared.flush_lsn;
     186            0 :         let segno = self.segno(flush_lsn);
     187            0 : 
     188            0 :         // We're going to backup bytes from the start of the segment up to flush_lsn.
     189            0 :         let backup_bytes = flush_lsn.segment_offset(self.wal_seg_size);
     190            0 : 
     191            0 :         let local_path = self.local_prefix.join(self.local_segment_name(segno));
     192            0 :         let remote_path = prepared.remote_path(&self.remote_timeline_path);
     193            0 : 
     194            0 :         // Upload first `backup_bytes` bytes of the segment to the remote storage.
     195            0 :         wal_backup::backup_partial_segment(&local_path, &remote_path, backup_bytes).await?;
     196            0 :         PARTIAL_BACKUP_UPLOADED_BYTES.inc_by(backup_bytes as u64);
     197              : 
     198              :         // We uploaded the segment, now let's verify that the data is still actual.
     199              :         // If the term changed, we cannot guarantee the validity of the uploaded data.
     200              :         // If the term is the same, we know the data is not corrupted.
     201            0 :         let sk_info = self.tli.get_safekeeper_info(&self.conf).await;
     202            0 :         if sk_info.last_log_term != prepared.term {
     203            0 :             anyhow::bail!("term changed during upload");
     204            0 :         }
     205            0 :         assert!(prepared.commit_lsn <= Lsn(sk_info.commit_lsn));
     206            0 :         assert!(prepared.flush_lsn <= Lsn(sk_info.flush_lsn));
     207              : 
     208            0 :         Ok(())
     209            0 :     }
     210              : 
     211              :     /// Write new state to disk. If in-memory and on-disk states diverged, returns an error.
     212            0 :     async fn commit_state(&mut self, new_state: State) -> anyhow::Result<()> {
     213            0 :         self.tli
     214            0 :             .map_control_file(|cf| {
     215            0 :                 if cf.partial_backup != self.state {
     216            0 :                     let memory = self.state.clone();
     217            0 :                     self.state = cf.partial_backup.clone();
     218            0 :                     anyhow::bail!(
     219            0 :                         "partial backup state diverged, memory={:?}, disk={:?}",
     220            0 :                         memory,
     221            0 :                         cf.partial_backup
     222            0 :                     );
     223            0 :                 }
     224            0 : 
     225            0 :                 cf.partial_backup = new_state.clone();
     226            0 :                 Ok(())
     227            0 :             })
     228            0 :             .await?;
     229              :         // update in-memory state
     230            0 :         self.state = new_state;
     231            0 :         Ok(())
     232            0 :     }
     233              : 
     234              :     /// Upload the latest version of the partial segment and garbage collect older versions.
     235            0 :     #[instrument(name = "upload", skip_all, fields(name = %prepared.name))]
     236              :     async fn do_upload(&mut self, prepared: &PartialRemoteSegment) -> anyhow::Result<()> {
     237              :         let _timer = MISC_OPERATION_SECONDS
     238              :             .with_label_values(&["partial_do_upload"])
     239              :             .start_timer();
     240              :         info!("starting upload {:?}", prepared);
     241              : 
     242              :         let state_0 = self.state.clone();
     243              :         let state_1 = {
     244              :             let mut state = state_0.clone();
     245              :             state.segments.push(prepared.clone());
     246              :             state
     247              :         };
     248              : 
     249              :         // we're going to upload a new segment, let's write it to disk to make GC later
     250              :         self.commit_state(state_1).await?;
     251              : 
     252              :         self.upload_segment(prepared.clone()).await?;
     253              : 
     254              :         let state_2 = {
     255              :             let mut state = state_0.clone();
     256              :             for seg in state.segments.iter_mut() {
     257              :                 seg.status = UploadStatus::Deleting;
     258              :             }
     259              :             let mut actual_remote_segment = prepared.clone();
     260              :             actual_remote_segment.status = UploadStatus::Uploaded;
     261              :             state.segments.push(actual_remote_segment);
     262              :             state
     263              :         };
     264              : 
     265              :         // we've uploaded new segment, it's actual, all other segments should be GCed
     266              :         self.commit_state(state_2).await?;
     267              :         self.gc().await?;
     268              : 
     269              :         Ok(())
     270              :     }
     271              : 
     272              :     /// Delete all non-Uploaded segments from the remote storage. There should be only one
     273              :     /// Uploaded segment at a time.
     274            0 :     #[instrument(name = "gc", skip_all)]
     275              :     async fn gc(&mut self) -> anyhow::Result<()> {
     276              :         let mut segments_to_delete = vec![];
     277              : 
     278              :         let new_segments: Vec<PartialRemoteSegment> = self
     279              :             .state
     280              :             .segments
     281              :             .iter()
     282            0 :             .filter_map(|seg| {
     283            0 :                 if seg.status == UploadStatus::Uploaded {
     284            0 :                     Some(seg.clone())
     285              :                 } else {
     286            0 :                     segments_to_delete.push(seg.name.clone());
     287            0 :                     None
     288              :                 }
     289            0 :             })
     290              :             .collect();
     291              : 
     292              :         info!("deleting objects: {:?}", segments_to_delete);
     293              :         let mut objects_to_delete = vec![];
     294              :         for seg in segments_to_delete.iter() {
     295              :             let remote_path = self.remote_timeline_path.join(seg);
     296              :             objects_to_delete.push(remote_path);
     297              :         }
     298              : 
     299              :         // removing segments from remote storage
     300              :         wal_backup::delete_objects(&objects_to_delete).await?;
     301              : 
     302              :         // now we can update the state on disk
     303              :         let new_state = {
     304              :             let mut state = self.state.clone();
     305              :             state.segments = new_segments;
     306              :             state
     307              :         };
     308              :         self.commit_state(new_state).await?;
     309              : 
     310              :         Ok(())
     311              :     }
     312              : }
     313              : 
     314              : /// Check if everything is uploaded and partial backup task doesn't need to run.
     315            0 : pub(crate) fn needs_uploading(
     316            0 :     state: &StateSnapshot,
     317            0 :     uploaded: &Option<PartialRemoteSegment>,
     318            0 : ) -> bool {
     319            0 :     match uploaded {
     320            0 :         Some(uploaded) => {
     321            0 :             uploaded.status != UploadStatus::Uploaded
     322            0 :                 || uploaded.flush_lsn != state.flush_lsn
     323            0 :                 || uploaded.commit_lsn != state.commit_lsn
     324            0 :                 || uploaded.term != state.last_log_term
     325              :         }
     326            0 :         None => true,
     327              :     }
     328            0 : }
     329              : 
     330              : /// Main task for partial backup. It waits for the flush_lsn to change and then uploads the
     331              : /// partial segment to the remote storage. It also does garbage collection of old segments.
     332              : ///
     333              : /// When there is nothing more to do and the last segment was successfully uploaded, the task
     334              : /// returns PartialRemoteSegment, to signal readiness for offloading the timeline.
     335            0 : #[instrument(name = "Partial backup", skip_all, fields(ttid = %tli.ttid))]
     336              : pub async fn main_task(
     337              :     tli: WalResidentTimeline,
     338              :     conf: SafeKeeperConf,
     339              :     limiter: RateLimiter,
     340              : ) -> Option<PartialRemoteSegment> {
     341              :     debug!("started");
     342              :     let await_duration = conf.partial_backup_timeout;
     343              : 
     344              :     let (_, persistent_state) = tli.get_state().await;
     345              :     let mut commit_lsn_rx = tli.get_commit_lsn_watch_rx();
     346              :     let mut flush_lsn_rx = tli.get_term_flush_lsn_watch_rx();
     347              :     let wal_seg_size = tli.get_wal_seg_size().await;
     348              : 
     349              :     let local_prefix = tli.get_timeline_dir();
     350              :     let remote_timeline_path = match remote_timeline_path(&tli.ttid) {
     351              :         Ok(path) => path,
     352              :         Err(e) => {
     353              :             error!("failed to create remote path: {:?}", e);
     354              :             return None;
     355              :         }
     356              :     };
     357              : 
     358              :     let mut backup = PartialBackup {
     359              :         wal_seg_size,
     360              :         tli,
     361              :         state: persistent_state.partial_backup,
     362              :         conf,
     363              :         local_prefix,
     364              :         remote_timeline_path,
     365              :     };
     366              : 
     367              :     debug!("state: {:?}", backup.state);
     368              : 
     369              :     // The general idea is that each safekeeper keeps only one partial segment
     370              :     // both in remote storage and in local state. If this is not true, something
     371              :     // went wrong.
     372              :     const MAX_SIMULTANEOUS_SEGMENTS: usize = 10;
     373              : 
     374              :     'outer: loop {
     375              :         if backup.state.segments.len() > MAX_SIMULTANEOUS_SEGMENTS {
     376              :             warn!(
     377              :                 "too many segments in control_file state, running gc: {}",
     378              :                 backup.state.segments.len()
     379              :             );
     380              : 
     381            0 :             backup.gc().await.unwrap_or_else(|e| {
     382            0 :                 error!("failed to run gc: {:#}", e);
     383            0 :             });
     384              :         }
     385              : 
     386              :         // wait until we have something to upload
     387              :         let uploaded_segment = backup.state.uploaded_segment();
     388              :         if let Some(seg) = &uploaded_segment {
     389              :             // check if uploaded segment matches the current state
     390              :             if flush_lsn_rx.borrow().lsn == seg.flush_lsn
     391              :                 && *commit_lsn_rx.borrow() == seg.commit_lsn
     392              :                 && flush_lsn_rx.borrow().term == seg.term
     393              :             {
     394              :                 // we have nothing to do, the last segment is already uploaded
     395              :                 return Some(seg.clone());
     396              :             }
     397              :         }
     398              : 
     399              :         // if we don't have any data and zero LSNs, wait for something
     400              :         while flush_lsn_rx.borrow().lsn == Lsn(0) {
     401              :             tokio::select! {
     402              :                 _ = backup.tli.cancel.cancelled() => {
     403              :                     info!("timeline canceled");
     404              :                     return None;
     405              :                 }
     406              :                 _ = flush_lsn_rx.changed() => {}
     407              :             }
     408              :         }
     409              : 
     410              :         // fixing the segno and waiting some time to prevent reuploading the same segment too often
     411              :         let pending_segno = backup.segno(flush_lsn_rx.borrow().lsn);
     412              :         let timeout = tokio::time::sleep(await_duration);
     413              :         tokio::pin!(timeout);
     414              :         let mut timeout_expired = false;
     415              : 
     416              :         // waiting until timeout expires OR segno changes
     417              :         'inner: loop {
     418              :             tokio::select! {
     419              :                 _ = backup.tli.cancel.cancelled() => {
     420              :                     info!("timeline canceled");
     421              :                     return None;
     422              :                 }
     423              :                 _ = commit_lsn_rx.changed() => {}
     424              :                 _ = flush_lsn_rx.changed() => {
     425              :                     let segno = backup.segno(flush_lsn_rx.borrow().lsn);
     426              :                     if segno != pending_segno {
     427              :                         // previous segment is no longer partial, aborting the wait
     428              :                         break 'inner;
     429              :                     }
     430              :                 }
     431              :                 _ = &mut timeout => {
     432              :                     // timeout expired, now we are ready for upload
     433              :                     timeout_expired = true;
     434              :                     break 'inner;
     435              :                 }
     436              :             }
     437              :         }
     438              : 
     439              :         if !timeout_expired {
     440              :             // likely segno has changed, let's try again in the next iteration
     441              :             continue 'outer;
     442              :         }
     443              : 
     444              :         // limit concurrent uploads
     445              :         let _upload_permit = limiter.acquire_owned().await;
     446              : 
     447              :         let prepared = backup.prepare_upload().await;
     448              :         if let Some(seg) = &uploaded_segment {
     449              :             if seg.eq_without_status(&prepared) {
     450              :                 // we already uploaded this segment, nothing to do
     451              :                 continue 'outer;
     452              :             }
     453              :         }
     454              : 
     455              :         match backup.do_upload(&prepared).await {
     456              :             Ok(()) => {
     457              :                 debug!(
     458              :                     "uploaded {} up to flush_lsn {}",
     459              :                     prepared.name, prepared.flush_lsn
     460              :                 );
     461              :                 PARTIAL_BACKUP_UPLOADS.with_label_values(&["ok"]).inc();
     462              :             }
     463              :             Err(e) => {
     464              :                 info!("failed to upload {}: {:#}", prepared.name, e);
     465              :                 PARTIAL_BACKUP_UPLOADS.with_label_values(&["error"]).inc();
     466              :             }
     467              :         }
     468              :     }
     469              : }
        

Generated by: LCOV version 2.1-beta