LCOV - code coverage report
Current view: top level - safekeeper/src - wal_backup_partial.rs (source / functions) Coverage Total Hit
Test: 691a4c28fe7169edd60b367c52d448a0a6605f1f.info Lines: 1.0 % 105 1
Test Date: 2024-05-10 13:18:37 Functions: 1.8 % 57 1

            Line data    Source code
       1              : //! Safekeeper timeline has a background task which is subscribed to `commit_lsn`
       2              : //! and `flush_lsn` updates. After the partial segment was updated (`flush_lsn`
       3              : //! was changed), the segment will be uploaded to S3 in about 15 minutes.
       4              : //!
       5              : //! The filename format for partial segments is
       6              : //! `Segment_Term_Flush_Commit_skNN.partial`, where:
       7              : //! - `Segment` – the segment name, like `000000010000000000000001`
       8              : //! - `Term` – current term
       9              : //! - `Flush` – flush_lsn in hex format `{:016X}`, e.g. `00000000346BC568`
      10              : //! - `Commit` – commit_lsn in the same hex format
      11              : //! - `NN` – safekeeper_id, like `1`
      12              : //!
      13              : //! The full object name example:
      14              : //! `000000010000000000000002_2_0000000002534868_0000000002534410_sk1.partial`
      15              : //!
      16              : //! Each safekeeper will keep info about remote partial segments in its control
      17              : //! file. Code updates state in the control file before doing any S3 operations.
      18              : //! This way control file stores information about all potentially existing
      19              : //! remote partial segments and can clean them up after uploading a newer version.
      20              : 
      21              : use std::sync::Arc;
      22              : 
      23              : use camino::Utf8PathBuf;
      24              : use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI};
      25              : use rand::Rng;
      26              : use remote_storage::RemotePath;
      27              : use serde::{Deserialize, Serialize};
      28              : 
      29              : use tracing::{debug, error, info, instrument};
      30              : use utils::lsn::Lsn;
      31              : 
      32              : use crate::{
      33              :     metrics::{PARTIAL_BACKUP_UPLOADED_BYTES, PARTIAL_BACKUP_UPLOADS},
      34              :     safekeeper::Term,
      35              :     timeline::Timeline,
      36              :     wal_backup, SafeKeeperConf,
      37              : };
      38              : 
      39            0 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
      40              : pub enum UploadStatus {
      41              :     /// Upload is in progress
      42              :     InProgress,
      43              :     /// Upload is finished
      44              :     Uploaded,
      45              :     /// Deletion is in progress
      46              :     Deleting,
      47              : }
      48              : 
      49            0 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
      50              : pub struct PartialRemoteSegment {
      51              :     pub status: UploadStatus,
      52              :     pub name: String,
      53              :     pub commit_lsn: Lsn,
      54              :     pub flush_lsn: Lsn,
      55              :     pub term: Term,
      56              : }
      57              : 
      58              : impl PartialRemoteSegment {
      59            0 :     fn eq_without_status(&self, other: &Self) -> bool {
      60            0 :         self.name == other.name
      61            0 :             && self.commit_lsn == other.commit_lsn
      62            0 :             && self.flush_lsn == other.flush_lsn
      63            0 :             && self.term == other.term
      64            0 :     }
      65              : }
      66              : 
      67              : // NB: these structures are a part of a control_file, you can't change them without
      68              : // changing the control file format version.
      69            6 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
      70              : pub struct State {
      71              :     pub segments: Vec<PartialRemoteSegment>,
      72              : }
      73              : 
      74              : impl State {
      75              :     /// Find an Uploaded segment. There should be only one Uploaded segment at a time.
      76            0 :     fn uploaded_segment(&self) -> Option<PartialRemoteSegment> {
      77            0 :         self.segments
      78            0 :             .iter()
      79            0 :             .find(|seg| seg.status == UploadStatus::Uploaded)
      80            0 :             .cloned()
      81            0 :     }
      82              : }
      83              : 
      84              : struct PartialBackup {
      85              :     wal_seg_size: usize,
      86              :     tli: Arc<Timeline>,
      87              :     conf: SafeKeeperConf,
      88              :     local_prefix: Utf8PathBuf,
      89              :     remote_prefix: Utf8PathBuf,
      90              : 
      91              :     state: State,
      92              : }
      93              : 
      94              : // Read-only methods for getting segment names
      95              : impl PartialBackup {
      96            0 :     fn segno(&self, lsn: Lsn) -> XLogSegNo {
      97            0 :         lsn.segment_number(self.wal_seg_size)
      98            0 :     }
      99              : 
     100            0 :     fn segment_name(&self, segno: u64) -> String {
     101            0 :         XLogFileName(PG_TLI, segno, self.wal_seg_size)
     102            0 :     }
     103              : 
     104            0 :     fn remote_segment_name(
     105            0 :         &self,
     106            0 :         segno: u64,
     107            0 :         term: u64,
     108            0 :         commit_lsn: Lsn,
     109            0 :         flush_lsn: Lsn,
     110            0 :     ) -> String {
     111            0 :         format!(
     112            0 :             "{}_{}_{:016X}_{:016X}_sk{}.partial",
     113            0 :             self.segment_name(segno),
     114            0 :             term,
     115            0 :             flush_lsn.0,
     116            0 :             commit_lsn.0,
     117            0 :             self.conf.my_id.0,
     118            0 :         )
     119            0 :     }
     120              : 
     121            0 :     fn local_segment_name(&self, segno: u64) -> String {
     122            0 :         format!("{}.partial", self.segment_name(segno))
     123            0 :     }
     124              : }
     125              : 
     126              : impl PartialBackup {
     127              :     /// Takes a lock to read actual safekeeper state and returns a segment that should be uploaded.
     128            0 :     async fn prepare_upload(&self) -> PartialRemoteSegment {
     129              :         // this operation takes a lock to get the actual state
     130            0 :         let sk_info = self.tli.get_safekeeper_info(&self.conf).await;
     131            0 :         let flush_lsn = Lsn(sk_info.flush_lsn);
     132            0 :         let commit_lsn = Lsn(sk_info.commit_lsn);
     133            0 :         let term = sk_info.term;
     134            0 :         let segno = self.segno(flush_lsn);
     135            0 : 
     136            0 :         let name = self.remote_segment_name(segno, term, commit_lsn, flush_lsn);
     137            0 : 
     138            0 :         PartialRemoteSegment {
     139            0 :             status: UploadStatus::InProgress,
     140            0 :             name,
     141            0 :             commit_lsn,
     142            0 :             flush_lsn,
     143            0 :             term,
     144            0 :         }
     145            0 :     }
     146              : 
     147              :     /// Reads segment from disk and uploads it to the remote storage.
     148            0 :     async fn upload_segment(&mut self, prepared: PartialRemoteSegment) -> anyhow::Result<()> {
     149            0 :         let flush_lsn = prepared.flush_lsn;
     150            0 :         let segno = self.segno(flush_lsn);
     151            0 : 
     152            0 :         // We're going to backup bytes from the start of the segment up to flush_lsn.
     153            0 :         let backup_bytes = flush_lsn.segment_offset(self.wal_seg_size);
     154            0 : 
     155            0 :         let local_path = self.local_prefix.join(self.local_segment_name(segno));
     156            0 :         let remote_path = RemotePath::new(self.remote_prefix.join(&prepared.name).as_ref())?;
     157              : 
     158              :         // Upload first `backup_bytes` bytes of the segment to the remote storage.
     159            0 :         wal_backup::backup_partial_segment(&local_path, &remote_path, backup_bytes).await?;
     160            0 :         PARTIAL_BACKUP_UPLOADED_BYTES.inc_by(backup_bytes as u64);
     161              : 
     162              :         // We uploaded the segment, now let's verify that the data is still actual.
     163              :         // If the term changed, we cannot guarantee the validity of the uploaded data.
     164              :         // If the term is the same, we know the data is not corrupted.
     165            0 :         let sk_info = self.tli.get_safekeeper_info(&self.conf).await;
     166            0 :         if sk_info.term != prepared.term {
     167            0 :             anyhow::bail!("term changed during upload");
     168            0 :         }
     169            0 :         assert!(prepared.commit_lsn <= Lsn(sk_info.commit_lsn));
     170            0 :         assert!(prepared.flush_lsn <= Lsn(sk_info.flush_lsn));
     171              : 
     172            0 :         Ok(())
     173            0 :     }
     174              : 
     175              :     /// Write new state to disk. If in-memory and on-disk states diverged, returns an error.
     176            0 :     async fn commit_state(&mut self, new_state: State) -> anyhow::Result<()> {
     177            0 :         self.tli
     178            0 :             .map_control_file(|cf| {
     179            0 :                 if cf.partial_backup != self.state {
     180            0 :                     let memory = self.state.clone();
     181            0 :                     self.state = cf.partial_backup.clone();
     182            0 :                     anyhow::bail!(
     183            0 :                         "partial backup state diverged, memory={:?}, disk={:?}",
     184            0 :                         memory,
     185            0 :                         cf.partial_backup
     186            0 :                     );
     187            0 :                 }
     188            0 : 
     189            0 :                 cf.partial_backup = new_state.clone();
     190            0 :                 Ok(())
     191            0 :             })
     192            0 :             .await?;
     193              :         // update in-memory state
     194            0 :         self.state = new_state;
     195            0 :         Ok(())
     196            0 :     }
     197              : 
     198              :     /// Upload the latest version of the partial segment and garbage collect older versions.
     199            0 :     #[instrument(name = "upload", skip_all, fields(name = %prepared.name))]
     200              :     async fn do_upload(&mut self, prepared: &PartialRemoteSegment) -> anyhow::Result<()> {
     201              :         info!("starting upload {:?}", prepared);
     202              : 
     203              :         let state_0 = self.state.clone();
     204              :         let state_1 = {
     205              :             let mut state = state_0.clone();
     206              :             state.segments.push(prepared.clone());
     207              :             state
     208              :         };
     209              : 
     210              :         // we're going to upload a new segment, let's write it to disk to make GC later
     211              :         self.commit_state(state_1).await?;
     212              : 
     213              :         self.upload_segment(prepared.clone()).await?;
     214              : 
     215              :         let state_2 = {
     216              :             let mut state = state_0.clone();
     217              :             for seg in state.segments.iter_mut() {
     218              :                 seg.status = UploadStatus::Deleting;
     219              :             }
     220              :             let mut actual_remote_segment = prepared.clone();
     221              :             actual_remote_segment.status = UploadStatus::Uploaded;
     222              :             state.segments.push(actual_remote_segment);
     223              :             state
     224              :         };
     225              : 
     226              :         // we've uploaded new segment, it's actual, all other segments should be GCed
     227              :         self.commit_state(state_2).await?;
     228              :         self.gc().await?;
     229              : 
     230              :         Ok(())
     231              :     }
     232              : 
     233              :     /// Delete all non-Uploaded segments from the remote storage. There should be only one
     234              :     /// Uploaded segment at a time.
     235            0 :     #[instrument(name = "gc", skip_all)]
     236              :     async fn gc(&mut self) -> anyhow::Result<()> {
     237              :         let mut segments_to_delete = vec![];
     238              : 
     239              :         let new_segments: Vec<PartialRemoteSegment> = self
     240              :             .state
     241              :             .segments
     242              :             .iter()
     243            0 :             .filter_map(|seg| {
     244            0 :                 if seg.status == UploadStatus::Uploaded {
     245            0 :                     Some(seg.clone())
     246              :                 } else {
     247            0 :                     segments_to_delete.push(seg.name.clone());
     248            0 :                     None
     249              :                 }
     250            0 :             })
     251              :             .collect();
     252              : 
     253              :         info!("deleting objects: {:?}", segments_to_delete);
     254              :         let mut objects_to_delete = vec![];
     255              :         for seg in segments_to_delete.iter() {
     256              :             let remote_path = RemotePath::new(self.remote_prefix.join(seg).as_ref())?;
     257              :             objects_to_delete.push(remote_path);
     258              :         }
     259              : 
     260              :         // removing segments from remote storage
     261              :         wal_backup::delete_objects(&objects_to_delete).await?;
     262              : 
     263              :         // now we can update the state on disk
     264              :         let new_state = {
     265              :             let mut state = self.state.clone();
     266              :             state.segments = new_segments;
     267              :             state
     268              :         };
     269              :         self.commit_state(new_state).await?;
     270              : 
     271              :         Ok(())
     272              :     }
     273              : }
     274              : 
     275            0 : #[instrument(name = "Partial backup", skip_all, fields(ttid = %tli.ttid))]
     276              : pub async fn main_task(tli: Arc<Timeline>, conf: SafeKeeperConf) {
     277              :     debug!("started");
     278              :     let await_duration = conf.partial_backup_timeout;
     279              : 
     280              :     let mut cancellation_rx = match tli.get_cancellation_rx() {
     281              :         Ok(rx) => rx,
     282              :         Err(_) => {
     283              :             info!("timeline canceled during task start");
     284              :             return;
     285              :         }
     286              :     };
     287              : 
     288              :     // sleep for random time to avoid thundering herd
     289              :     {
     290              :         let randf64 = rand::thread_rng().gen_range(0.0..1.0);
     291              :         let sleep_duration = await_duration.mul_f64(randf64);
     292              :         tokio::time::sleep(sleep_duration).await;
     293              :     }
     294              : 
     295              :     let (_, persistent_state) = tli.get_state().await;
     296              :     let mut commit_lsn_rx = tli.get_commit_lsn_watch_rx();
     297              :     let mut flush_lsn_rx = tli.get_term_flush_lsn_watch_rx();
     298              :     let wal_seg_size = tli.get_wal_seg_size().await;
     299              : 
     300              :     let local_prefix = tli.timeline_dir.clone();
     301              :     let remote_prefix = match tli.timeline_dir.strip_prefix(&conf.workdir) {
     302              :         Ok(path) => path.to_owned(),
     303              :         Err(e) => {
     304              :             error!("failed to strip workspace dir prefix: {:?}", e);
     305              :             return;
     306              :         }
     307              :     };
     308              : 
     309              :     let mut backup = PartialBackup {
     310              :         wal_seg_size,
     311              :         tli,
     312              :         state: persistent_state.partial_backup,
     313              :         conf,
     314              :         local_prefix,
     315              :         remote_prefix,
     316              :     };
     317              : 
     318              :     debug!("state: {:?}", backup.state);
     319              : 
     320              :     'outer: loop {
     321              :         // wait until we have something to upload
     322              :         let uploaded_segment = backup.state.uploaded_segment();
     323              :         if let Some(seg) = &uploaded_segment {
     324              :             // if we already uploaded something, wait until we have something new
     325              :             while flush_lsn_rx.borrow().lsn == seg.flush_lsn
     326              :                 && *commit_lsn_rx.borrow() == seg.commit_lsn
     327              :                 && flush_lsn_rx.borrow().term == seg.term
     328              :             {
     329              :                 tokio::select! {
     330              :                     _ = cancellation_rx.changed() => {
     331              :                         info!("timeline canceled");
     332              :                         return;
     333              :                     }
     334              :                     _ = commit_lsn_rx.changed() => {}
     335              :                     _ = flush_lsn_rx.changed() => {}
     336              :                 }
     337              :             }
     338              :         }
     339              : 
     340              :         // if we don't have any data and zero LSNs, wait for something
     341              :         while flush_lsn_rx.borrow().lsn == Lsn(0) {
     342              :             tokio::select! {
     343              :                 _ = cancellation_rx.changed() => {
     344              :                     info!("timeline canceled");
     345              :                     return;
     346              :                 }
     347              :                 _ = flush_lsn_rx.changed() => {}
     348              :             }
     349              :         }
     350              : 
     351              :         // fixing the segno and waiting some time to prevent reuploading the same segment too often
     352              :         let pending_segno = backup.segno(flush_lsn_rx.borrow().lsn);
     353              :         let timeout = tokio::time::sleep(await_duration);
     354              :         tokio::pin!(timeout);
     355              :         let mut timeout_expired = false;
     356              : 
     357              :         // waiting until timeout expires OR segno changes
     358              :         'inner: loop {
     359              :             tokio::select! {
     360              :                 _ = cancellation_rx.changed() => {
     361              :                     info!("timeline canceled");
     362              :                     return;
     363              :                 }
     364              :                 _ = commit_lsn_rx.changed() => {}
     365              :                 _ = flush_lsn_rx.changed() => {
     366              :                     let segno = backup.segno(flush_lsn_rx.borrow().lsn);
     367              :                     if segno != pending_segno {
     368              :                         // previous segment is no longer partial, aborting the wait
     369              :                         break 'inner;
     370              :                     }
     371              :                 }
     372              :                 _ = &mut timeout => {
     373              :                     // timeout expired, now we are ready for upload
     374              :                     timeout_expired = true;
     375              :                     break 'inner;
     376              :                 }
     377              :             }
     378              :         }
     379              : 
     380              :         if !timeout_expired {
     381              :             // likely segno has changed, let's try again in the next iteration
     382              :             continue 'outer;
     383              :         }
     384              : 
     385              :         let prepared = backup.prepare_upload().await;
     386              :         if let Some(seg) = &uploaded_segment {
     387              :             if seg.eq_without_status(&prepared) {
     388              :                 // we already uploaded this segment, nothing to do
     389              :                 continue 'outer;
     390              :             }
     391              :         }
     392              : 
     393              :         match backup.do_upload(&prepared).await {
     394              :             Ok(()) => {
     395              :                 debug!(
     396              :                     "uploaded {} up to flush_lsn {}",
     397              :                     prepared.name, prepared.flush_lsn
     398              :                 );
     399              :                 PARTIAL_BACKUP_UPLOADS.with_label_values(&["ok"]).inc();
     400              :             }
     401              :             Err(e) => {
     402              :                 info!("failed to upload {}: {:#}", prepared.name, e);
     403              :                 PARTIAL_BACKUP_UPLOADS.with_label_values(&["error"]).inc();
     404              :             }
     405              :         }
     406              :     }
     407              : }
        

Generated by: LCOV version 2.1-beta