LCOV - code coverage report
Current view:  top level - safekeeper/src - wal_backup.rs (source / functions)
Test:          8ac049b474321fdc72ddcb56d7165153a1a900e8.info
Test Date:     2023-09-06 10:18:01
Coverage:      Lines: 94.1 % (317 of 337 hit)    Functions: 78.9 % (45 of 57 hit)

            Line data    Source code
       1              : use anyhow::{Context, Result};
       2              : 
       3              : use futures::stream::FuturesOrdered;
       4              : use futures::StreamExt;
       5              : use tokio::task::JoinHandle;
       6              : use utils::id::NodeId;
       7              : 
       8              : use std::cmp::min;
       9              : use std::collections::HashMap;
      10              : use std::path::{Path, PathBuf};
      11              : use std::pin::Pin;
      12              : use std::sync::Arc;
      13              : use std::time::Duration;
      14              : 
      15              : use postgres_ffi::v14::xlog_utils::XLogSegNoOffsetToRecPtr;
      16              : use postgres_ffi::XLogFileName;
      17              : use postgres_ffi::{XLogSegNo, PG_TLI};
      18              : use remote_storage::{GenericRemoteStorage, RemotePath};
      19              : use tokio::fs::File;
      20              : 
      21              : use tokio::select;
      22              : use tokio::sync::mpsc::{self, Receiver, Sender};
      23              : use tokio::sync::watch;
      24              : use tokio::time::sleep;
      25              : use tracing::*;
      26              : 
      27              : use utils::{id::TenantTimelineId, lsn::Lsn};
      28              : 
      29              : use crate::metrics::{BACKED_UP_SEGMENTS, BACKUP_ERRORS};
      30              : use crate::timeline::{PeerInfo, Timeline};
      31              : use crate::{GlobalTimelines, SafeKeeperConf};
      32              : 
      33              : use once_cell::sync::OnceCell;
      34              : 
      35              : const UPLOAD_FAILURE_RETRY_MIN_MS: u64 = 10;
      36              : const UPLOAD_FAILURE_RETRY_MAX_MS: u64 = 5000;
      37              : 
      38              : /// Check whether wal backup is required for the timeline. If yes, mark that the
      39              : /// launcher is aware of the current status and return the timeline.
      40          188 : async fn is_wal_backup_required(ttid: TenantTimelineId) -> Option<Arc<Timeline>> {
      41          188 :     match GlobalTimelines::get(ttid).ok() {
      42          188 :         Some(tli) => {
      43          188 :             tli.wal_backup_attend().await;
      44          188 :             Some(tli)
      45              :         }
      46            0 :         None => None,
      47              :     }
      48          188 : }
      49              : 
      50              : struct WalBackupTaskHandle {
      51              :     shutdown_tx: Sender<()>,
      52              :     handle: JoinHandle<()>,
      53              : }
      54              : 
      55              : struct WalBackupTimelineEntry {
      56              :     timeline: Arc<Timeline>,
      57              :     handle: Option<WalBackupTaskHandle>,
      58              : }
      59              : 
      60           10 : async fn shut_down_task(ttid: TenantTimelineId, entry: &mut WalBackupTimelineEntry) {
      61           10 :     if let Some(wb_handle) = entry.handle.take() {
      62              :         // Tell the task to shut down. An error means the task exited earlier; that's ok.
      63           10 :         let _ = wb_handle.shutdown_tx.send(()).await;
      64              :         // Await the task itself. TODO: restart panicked tasks earlier.
      65           10 :         if let Err(e) = wb_handle.handle.await {
      66            0 :             warn!("WAL backup task for {} panicked: {}", ttid, e);
      67           10 :         }
      68            0 :     }
      69           10 : }
      70              : 
      71              : /// The goal is to ensure that normally only one safekeeper offloads. However,
      72              : /// it is fine (and inevitable, as s3 doesn't provide CAS) that for some short
      73              : /// time several of them PUT the same files. Also,
      74              : /// - frequently changing the offloader would be bad;
      75              : /// - electing a seriously lagging safekeeper is undesirable.
      76              : /// So we deterministically choose among the reasonably caught-up candidates
      77              : /// (see the sketch after this function). TODO: take into account failed attempts to
      78              : /// deal with the hypothetical situation where s3 is unreachable only for some safekeepers.
      79          244 : fn determine_offloader(
      80          244 :     alive_peers: &[PeerInfo],
      81          244 :     wal_backup_lsn: Lsn,
      82          244 :     ttid: TenantTimelineId,
      83          244 :     conf: &SafeKeeperConf,
      84          244 : ) -> (Option<NodeId>, String) {
      85          244 :     // TODO: remove this once newly joined safekeepers are backfilled starting from backup_lsn.
      86          244 :     let capable_peers = alive_peers
      87          244 :         .iter()
      88         1680 :         .filter(|p| p.local_start_lsn <= wal_backup_lsn);
      89          560 :     match capable_peers.clone().map(|p| p.commit_lsn).max() {
      90           35 :         None => (None, "no connected peers to elect from".to_string()),
      91          209 :         Some(max_commit_lsn) => {
      92          209 :             let threshold = max_commit_lsn
      93          209 :                 .checked_sub(conf.max_offloader_lag_bytes)
      94          209 :                 .unwrap_or(Lsn(0));
      95          209 :             let mut caughtup_peers = capable_peers
      96          209 :                 .clone()
      97          560 :                 .filter(|p| p.commit_lsn >= threshold)
      98          209 :                 .collect::<Vec<_>>();
      99          411 :             caughtup_peers.sort_by(|p1, p2| p1.sk_id.cmp(&p2.sk_id));
     100          209 : 
     101          209 :             // To distribute the load, shift by timeline_id.
     102          209 :             let offloader = caughtup_peers
     103          209 :                 [(u128::from(ttid.timeline_id) % caughtup_peers.len() as u128) as usize]
     104          209 :                 .sk_id;
     105          209 : 
     106          209 :             let mut capable_peers_dbg = capable_peers
     107          560 :                 .map(|p| (p.sk_id, p.commit_lsn))
     108          209 :                 .collect::<Vec<_>>();
     109          411 :             capable_peers_dbg.sort_by(|p1, p2| p1.0.cmp(&p2.0));
     110          209 :             (
     111          209 :                 Some(offloader),
     112          209 :                 format!(
     113          209 :                     "elected {} among {:?} peers, with {} of them being caughtup",
     114          209 :                     offloader,
     115          209 :                     capable_peers_dbg,
     116          209 :                     caughtup_peers.len()
     117          209 :                 ),
     118          209 :             )
     119              :         }
     120              :     }
     121          244 : }
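The election above is deterministic: filter peers whose local_start_lsn does not exceed backup_lsn, keep those within max_offloader_lag_bytes of the highest commit_lsn, sort them by safekeeper id, and pick the entry at timeline_id modulo the list length. A minimal standalone sketch of that rule, using plain u64 values and a hypothetical Peer struct in place of the crate's PeerInfo/Lsn/NodeId types:

    // Minimal sketch of the election rule; plain u64 stands in for Lsn and NodeId.
    #[derive(Clone, Copy)]
    struct Peer {
        sk_id: u64,
        local_start_lsn: u64,
        commit_lsn: u64,
    }

    fn elect_offloader(peers: &[Peer], backup_lsn: u64, timeline_id: u128, max_lag: u64) -> Option<u64> {
        // Only peers whose local WAL starts at or before backup_lsn can continue the upload.
        let capable: Vec<&Peer> = peers.iter().filter(|p| p.local_start_lsn <= backup_lsn).collect();
        // With no capable peers there is nobody to elect.
        let max_commit = capable.iter().map(|p| p.commit_lsn).max()?;
        let threshold = max_commit.saturating_sub(max_lag);
        // Keep reasonably caught-up peers and sort them so every node agrees on the order.
        let mut caughtup: Vec<&Peer> = capable.into_iter().filter(|p| p.commit_lsn >= threshold).collect();
        caughtup.sort_by_key(|p| p.sk_id);
        // Shift by timeline_id so different timelines land on different offloaders.
        Some(caughtup[(timeline_id % caughtup.len() as u128) as usize].sk_id)
    }

Since every safekeeper evaluates roughly the same peer information, they converge on the same offloader, while the modulo shift spreads different timelines across nodes.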
     122              : 
     123              : /// Based on peer information, determine which safekeeper should offload. If it
     124              : /// is me and the (per timeline) task is not running yet, start it; if it is not
     125              : /// me and the task is running, stop it.
     126          244 : async fn update_task(
     127          244 :     conf: &SafeKeeperConf,
     128          244 :     ttid: TenantTimelineId,
     129          244 :     entry: &mut WalBackupTimelineEntry,
     130          244 : ) {
     131          244 :     let alive_peers = entry.timeline.get_peers(conf).await;
     132          244 :     let wal_backup_lsn = entry.timeline.get_wal_backup_lsn().await;
     133          244 :     let (offloader, election_dbg_str) =
     134          244 :         determine_offloader(&alive_peers, wal_backup_lsn, ttid, conf);
     135          244 :     let elected_me = Some(conf.my_id) == offloader;
     136          244 : 
     137          244 :     if elected_me != (entry.handle.is_some()) {
     138           35 :         if elected_me {
     139           25 :             info!("elected for backup {}: {}", ttid, election_dbg_str);
     140              : 
     141           25 :             let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
     142           25 :             let timeline_dir = conf.timeline_dir(&ttid);
     143              : 
     144           25 :             let handle = tokio::spawn(
     145           25 :                 backup_task_main(
     146           25 :                     ttid,
     147           25 :                     timeline_dir,
     148           25 :                     conf.workdir.clone(),
     149           25 :                     conf.backup_parallel_jobs,
     150           25 :                     shutdown_rx,
     151           25 :                 )
     152           25 :                 .instrument(info_span!("WAL backup task", ttid = %ttid)),
     153              :             );
     154              : 
     155           25 :             entry.handle = Some(WalBackupTaskHandle {
     156           25 :                 shutdown_tx,
     157           25 :                 handle,
     158           25 :             });
     159              :         } else {
     160           10 :             info!("stepping down from backup {}: {}", ttid, election_dbg_str);
     161           10 :             shut_down_task(ttid, entry).await;
     162              :         }
     163          209 :     }
     164          244 : }
     165              : 
     166              : const CHECK_TASKS_INTERVAL_MSEC: u64 = 1000;
     167              : 
     168              : /// Sits on wal_backup_launcher_rx and starts/stops per-timeline WAL backup
     169              : /// tasks. Having this in a separate task simplifies locking, allows reaping
     170              : /// panics, and separates elections from offloading itself.
     171          517 : pub async fn wal_backup_launcher_task_main(
     172          517 :     conf: SafeKeeperConf,
     173          517 :     mut wal_backup_launcher_rx: Receiver<TenantTimelineId>,
     174          517 : ) -> anyhow::Result<()> {
     175          517 :     info!(
     176          517 :         "WAL backup launcher started, remote config {:?}",
     177          517 :         conf.remote_storage
     178          517 :     );
     179              : 
     180          517 :     let conf_ = conf.clone();
     181          517 :     REMOTE_STORAGE.get_or_init(|| {
     182          517 :         conf_
     183          517 :             .remote_storage
     184          517 :             .as_ref()
     185          517 :             .map(|c| GenericRemoteStorage::from_config(c).expect("failed to create remote storage"))
     186          517 :     });
     187          517 : 
     188          517 :     // Presence in this map means the launcher is aware s3 offloading is needed for
     189          517 :     // the timeline, but the task is started only if it makes sense to offload
     190          517 :     // from this safekeeper.
     191          517 :     let mut tasks: HashMap<TenantTimelineId, WalBackupTimelineEntry> = HashMap::new();
     192          517 : 
     193          517 :     let mut ticker = tokio::time::interval(Duration::from_millis(CHECK_TASKS_INTERVAL_MSEC));
     194              :     loop {
     195        18519 :         tokio::select! {
     196        11438 :             ttid = wal_backup_launcher_rx.recv() => {
     197              :                 // channel is never expected to get closed
     198              :                 let ttid = ttid.unwrap();
     199              :                 if conf.remote_storage.is_none() || !conf.wal_backup_enabled {
     200              :                     continue; /* just drain the channel and do nothing */
     201              :                 }
     202              :                 let timeline = is_wal_backup_required(ttid).await;
     203              :                 // do we need to do anything at all?
     204              :                 if timeline.is_some() != tasks.contains_key(&ttid) {
     205              :                     if let Some(timeline) = timeline {
     206              :                         // need to start the task
     207              :                         let entry = tasks.entry(ttid).or_insert(WalBackupTimelineEntry {
     208              :                             timeline,
     209              :                             handle: None,
     210              :                         });
     211              :                         update_task(&conf, ttid, entry).await;
     212              :                     } else {
     213              :                         // need to stop the task
     214            0 :                         info!("stopping WAL backup task for {}", ttid);
     215              :                         let mut entry = tasks.remove(&ttid).unwrap();
     216              :                         shut_down_task(ttid, &mut entry).await;
     217              :                     }
     218              :                 }
     219              :             }
     220              :             // For each timeline needing offloading, check if this safekeeper
     221              :             // should do the job and start/stop the task accordingly.
     222              :             _ = ticker.tick() => {
     223              :                 for (ttid, entry) in tasks.iter_mut() {
     224              :                     update_task(&conf, *ttid, entry).await;
     225              :                 }
     226              :             }
     227              :         }
     228              :     }
     229              : }
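The launcher loop above multiplexes two event sources: per-timeline notifications arriving on the mpsc channel and a periodic tick that re-runs elections for every tracked timeline. A minimal sketch of that select pattern, assuming tokio and using a plain u64 in place of TenantTimelineId:

    use std::time::Duration;
    use tokio::sync::mpsc::Receiver;

    // Sketch of the launcher loop: react to channel notifications and re-check on a timer.
    async fn launcher_loop(mut rx: Receiver<u64>) {
        let mut ticker = tokio::time::interval(Duration::from_millis(1000));
        loop {
            tokio::select! {
                msg = rx.recv() => {
                    // The sender side is expected to live as long as the process.
                    let timeline_id = msg.expect("launcher channel closed");
                    println!("notification for timeline {timeline_id}");
                }
                _ = ticker.tick() => {
                    println!("periodic re-check of all known timelines");
                }
            }
        }
    }

Note that tokio::time::interval completes its first tick immediately, so the periodic branch also runs shortly after startup.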
     230              : 
     231              : struct WalBackupTask {
     232              :     timeline: Arc<Timeline>,
     233              :     timeline_dir: PathBuf,
     234              :     workspace_dir: PathBuf,
     235              :     wal_seg_size: usize,
     236              :     parallel_jobs: usize,
     237              :     commit_lsn_watch_rx: watch::Receiver<Lsn>,
     238              : }
     239              : 
     240              : /// Offload a single timeline.
     241           25 : async fn backup_task_main(
     242           25 :     ttid: TenantTimelineId,
     243           25 :     timeline_dir: PathBuf,
     244           25 :     workspace_dir: PathBuf,
     245           25 :     parallel_jobs: usize,
     246           25 :     mut shutdown_rx: Receiver<()>,
     247           25 : ) {
     248           25 :     info!("started");
     249           25 :     let res = GlobalTimelines::get(ttid);
     250           25 :     if let Err(e) = res {
     251            0 :         error!("backup error for timeline {}: {}", ttid, e);
     252            0 :         return;
     253           25 :     }
     254           25 :     let tli = res.unwrap();
     255              : 
     256           25 :     let mut wb = WalBackupTask {
     257           25 :         wal_seg_size: tli.get_wal_seg_size().await,
     258           25 :         commit_lsn_watch_rx: tli.get_commit_lsn_watch_rx(),
     259           25 :         timeline: tli,
     260           25 :         timeline_dir,
     261           25 :         workspace_dir,
     262           25 :         parallel_jobs,
     263           25 :     };
     264           25 : 
     265           25 :     // the task is spun up only when wal_seg_size is already initialized
     266           25 :     assert!(wb.wal_seg_size > 0);
     267              : 
     268           25 :     let mut canceled = false;
     269        32354 :     select! {
     270        32354 :         _ = wb.run() => {}
     271        32354 :         _ = shutdown_rx.recv() => {
     272        32354 :             canceled = true;
     273        32354 :         }
     274        32354 :     }
     275           10 :     info!("task {}", if canceled { "canceled" } else { "terminated" });
     276           10 : }
     277              : 
     278              : impl WalBackupTask {
     279           25 :     async fn run(&mut self) {
     280           25 :         let mut backup_lsn = Lsn(0);
     281           25 : 
     282           25 :         let mut retry_attempt = 0u32;
     283              :         // offload loop
     284         2566 :         loop {
     285         2566 :             if retry_attempt == 0 {
     286              :                 // wait for new WAL to arrive
     287         2568 :                 if let Err(e) = self.commit_lsn_watch_rx.changed().await {
     288              :                     // should never happen, as we hold Arc to timeline.
     289            0 :                     error!("commit_lsn watch shut down: {:?}", e);
     290            0 :                     return;
     291         2543 :                 }
     292              :             } else {
     293              :                 // or just sleep if we errored previously
     294            2 :                 let mut retry_delay = UPLOAD_FAILURE_RETRY_MAX_MS;
     295            2 :                 if let Some(backoff_delay) = UPLOAD_FAILURE_RETRY_MIN_MS.checked_shl(retry_attempt)
     296            2 :                 {
     297            2 :                     retry_delay = min(retry_delay, backoff_delay);
     298            2 :                 }
     299            2 :                 sleep(Duration::from_millis(retry_delay)).await;
     300              :             }
     301              : 
     302         2543 :             let commit_lsn = *self.commit_lsn_watch_rx.borrow();
     303         2543 : 
     304         2543 :             // Note that backup_lsn can be higher than commit_lsn if we
     305         2543 :             // don't have much local WAL and others already uploaded
     306         2543 :             // segments we don't even have.
     307         2543 :             if backup_lsn.segment_number(self.wal_seg_size)
     308         2543 :                 >= commit_lsn.segment_number(self.wal_seg_size)
     309              :             {
     310         2500 :                 retry_attempt = 0;
     311         2500 :                 continue; /* nothing to do, common case as we wake up on every commit_lsn bump */
     312           43 :             }
     313           43 :             // Perhaps peers advanced the position, check shmem value.
     314           43 :             backup_lsn = self.timeline.get_wal_backup_lsn().await;
     315           43 :             if backup_lsn.segment_number(self.wal_seg_size)
     316           43 :                 >= commit_lsn.segment_number(self.wal_seg_size)
     317              :             {
     318           17 :                 retry_attempt = 0;
     319           17 :                 continue;
     320           26 :             }
     321           26 : 
     322           26 :             match backup_lsn_range(
     323           26 :                 &self.timeline,
     324           26 :                 &mut backup_lsn,
     325           26 :                 commit_lsn,
     326           26 :                 self.wal_seg_size,
     327           26 :                 &self.timeline_dir,
     328           26 :                 &self.workspace_dir,
     329           26 :                 self.parallel_jobs,
     330           26 :             )
     331        29750 :             .await
     332              :             {
     333           22 :                 Ok(()) => {
     334           22 :                     retry_attempt = 0;
     335           22 :                 }
     336            2 :                 Err(e) => {
     337            2 :                     error!(
     338            2 :                         "failed while offloading range {}-{}: {:?}",
     339            2 :                         backup_lsn, commit_lsn, e
     340            2 :                     );
     341              : 
     342            2 :                     retry_attempt = retry_attempt.saturating_add(1);
     343              :                 }
     344              :             }
     345              :         }
     346            0 :     }
     347              : }
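The retry path in run() backs off exponentially: the delay is UPLOAD_FAILURE_RETRY_MIN_MS shifted left by the attempt count and capped at UPLOAD_FAILURE_RETRY_MAX_MS, with checked_shl guarding against shift overflow for very large attempt counts. A small sketch of that calculation as a hypothetical free function, reusing the same constants:

    const UPLOAD_FAILURE_RETRY_MIN_MS: u64 = 10;
    const UPLOAD_FAILURE_RETRY_MAX_MS: u64 = 5000;

    // Exponential backoff, capped at 5000 ms. checked_shl returns None once the shift
    // would overflow the u64, in which case we stay at the cap.
    fn retry_delay_ms(retry_attempt: u32) -> u64 {
        match UPLOAD_FAILURE_RETRY_MIN_MS.checked_shl(retry_attempt) {
            Some(backoff) => backoff.min(UPLOAD_FAILURE_RETRY_MAX_MS),
            None => UPLOAD_FAILURE_RETRY_MAX_MS,
        }
    }

    // retry_delay_ms(1) == 20, retry_delay_ms(3) == 80; from attempt 9 on the 5000 ms cap applies.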
     348              : 
     349           26 : pub async fn backup_lsn_range(
     350           26 :     timeline: &Arc<Timeline>,
     351           26 :     backup_lsn: &mut Lsn,
     352           26 :     end_lsn: Lsn,
     353           26 :     wal_seg_size: usize,
     354           26 :     timeline_dir: &Path,
     355           26 :     workspace_dir: &Path,
     356           26 :     parallel_jobs: usize,
     357           26 : ) -> Result<()> {
     358           26 :     if parallel_jobs < 1 {
     359            0 :         anyhow::bail!("parallel_jobs must be >= 1");
     360           26 :     }
     361           26 : 
     362           26 :     let start_lsn = *backup_lsn;
     363           26 :     let segments = get_segments(start_lsn, end_lsn, wal_seg_size);
     364           26 : 
     365           26 :     // Pool of concurrent upload tasks. We use `FuturesOrdered` to
     366           26 :     // preserve order of uploads, and update `backup_lsn` only after
     367           26 :     // all previous uploads are finished.
     368           26 :     let mut uploads = FuturesOrdered::new();
     369           26 :     let mut iter = segments.iter();
     370              : 
     371              :     loop {
     372           77 :         let added_task = match iter.next() {
     373           28 :             Some(s) => {
     374           28 :                 uploads.push_back(backup_single_segment(s, timeline_dir, workspace_dir));
     375           28 :                 true
     376              :             }
     377           49 :             None => false,
     378              :         };
     379              : 
     380              :         // Wait for the next in-flight upload to finish if there are no more
     381              :         // segments to add, or if too many uploads are already in progress.
     382           77 :         if !added_task || uploads.len() >= parallel_jobs {
     383        29745 :             let next = uploads.next().await;
     384           47 :             if let Some(res) = next {
     385              :                 // next segment uploaded
     386           25 :                 let segment = res?;
     387           23 :                 let new_backup_lsn = segment.end_lsn;
     388           23 :                 timeline
     389           23 :                     .set_wal_backup_lsn(new_backup_lsn)
     390            5 :                     .await
     391           23 :                     .context("setting wal_backup_lsn")?;
     392           23 :                 *backup_lsn = new_backup_lsn;
     393              :             } else {
     394              :                 // no more segments to upload
     395           22 :                 break;
     396              :             }
     397           28 :         }
     398              :     }
     399              : 
     400           22 :     info!(
     401           22 :         "offloaded segnos {:?} up to {}, previous backup_lsn {}",
     402           23 :         segments.iter().map(|&s| s.seg_no).collect::<Vec<_>>(),
     403           22 :         end_lsn,
     404           22 :         start_lsn,
     405           22 :     );
     406           22 :     Ok(())
     407           24 : }
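backup_lsn_range keeps at most parallel_jobs uploads in flight and, because FuturesOrdered yields results in submission order, it can safely treat each completed upload as a new contiguous watermark for backup_lsn. A standalone sketch of the same bounded, order-preserving pattern over dummy async jobs (hypothetical run_bounded_ordered function, not the crate's uploader):

    use futures::stream::{FuturesOrdered, StreamExt};

    // Run `jobs` with at most `parallel` in flight, consuming results in submission order.
    async fn run_bounded_ordered(jobs: Vec<u64>, parallel: usize) -> Vec<u64> {
        assert!(parallel >= 1);
        let mut pool = FuturesOrdered::new();
        let mut iter = jobs.into_iter();
        let mut finished = Vec::new();
        loop {
            let added = match iter.next() {
                Some(job) => {
                    // A dummy async job standing in for a segment upload.
                    pool.push_back(async move { job * 2 });
                    true
                }
                None => false,
            };
            // Drain when the pool is full or there is nothing left to add.
            if !added || pool.len() >= parallel {
                match pool.next().await {
                    Some(result) => finished.push(result), // results arrive in push order
                    None => break,                         // pool is empty and no jobs remain
                }
            }
        }
        finished
    }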
     408              : 
     409           28 : async fn backup_single_segment(
     410           28 :     seg: &Segment,
     411           28 :     timeline_dir: &Path,
     412           28 :     workspace_dir: &Path,
     413           28 : ) -> Result<Segment> {
     414           28 :     let segment_file_path = seg.file_path(timeline_dir)?;
     415           28 :     let remote_segment_path = segment_file_path
     416           28 :         .strip_prefix(workspace_dir)
     417           28 :         .context("Failed to strip workspace dir prefix")
     418           28 :         .and_then(RemotePath::new)
     419           28 :         .with_context(|| {
     420            0 :             format!(
     421            0 :                 "Failed to resolve remote part of path {segment_file_path:?} for base {workspace_dir:?}",
     422            0 :             )
     423           28 :         })?;
     424              : 
     425        26226 :     let res = backup_object(&segment_file_path, &remote_segment_path, seg.size()).await;
     426           25 :     if res.is_ok() {
     427           23 :         BACKED_UP_SEGMENTS.inc();
     428           23 :     } else {
     429            2 :         BACKUP_ERRORS.inc();
     430            2 :     }
     431           25 :     res?;
     432            0 :     debug!("Backup of {} done", segment_file_path.display());
     433              : 
     434           23 :     Ok(*seg)
     435           25 : }
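backup_single_segment derives the remote key by stripping the workspace directory prefix from the local segment path; whatever remains becomes the object path inside the bucket. A tiny std-only sketch of that path manipulation (hypothetical paths and helper):

    use std::path::Path;

    // Turn an absolute local path into a bucket-relative key by removing the workspace prefix.
    fn remote_key(local: &Path, workspace: &Path) -> Option<String> {
        local
            .strip_prefix(workspace) // Err if `local` is not under `workspace`
            .ok()
            .map(|rel| rel.to_string_lossy().into_owned())
    }

    // remote_key(Path::new("/data/sk/tenant/timeline/000000010000000000000001"), Path::new("/data/sk"))
    //   == Some("tenant/timeline/000000010000000000000001".to_string())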
     436              : 
     437            0 : #[derive(Debug, Copy, Clone)]
     438              : pub struct Segment {
     439              :     seg_no: XLogSegNo,
     440              :     start_lsn: Lsn,
     441              :     end_lsn: Lsn,
     442              : }
     443              : 
     444              : impl Segment {
     445           28 :     pub fn new(seg_no: u64, start_lsn: Lsn, end_lsn: Lsn) -> Self {
     446           28 :         Self {
     447           28 :             seg_no,
     448           28 :             start_lsn,
     449           28 :             end_lsn,
     450           28 :         }
     451           28 :     }
     452              : 
     453           28 :     pub fn object_name(self) -> String {
     454           28 :         XLogFileName(PG_TLI, self.seg_no, self.size())
     455           28 :     }
     456              : 
     457           28 :     pub fn file_path(self, timeline_dir: &Path) -> Result<PathBuf> {
     458           28 :         Ok(timeline_dir.join(self.object_name()))
     459           28 :     }
     460              : 
     461           56 :     pub fn size(self) -> usize {
     462           56 :         (u64::from(self.end_lsn) - u64::from(self.start_lsn)) as usize
     463           56 :     }
     464              : }
     465              : 
     466           26 : fn get_segments(start: Lsn, end: Lsn, seg_size: usize) -> Vec<Segment> {
     467           26 :     let first_seg = start.segment_number(seg_size);
     468           26 :     let last_seg = end.segment_number(seg_size);
     469           26 : 
     470           26 :     let res: Vec<Segment> = (first_seg..last_seg)
     471           28 :         .map(|s| {
     472           28 :             let start_lsn = XLogSegNoOffsetToRecPtr(s, 0, seg_size);
     473           28 :             let end_lsn = XLogSegNoOffsetToRecPtr(s + 1, 0, seg_size);
     474           28 :             Segment::new(s, Lsn::from(start_lsn), Lsn::from(end_lsn))
     475           28 :         })
     476           26 :         .collect();
     477           26 :     res
     478           26 : }
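get_segments covers the half-open LSN range [start, end) with whole WAL segments: a segment number is the LSN divided by the segment size, and the segment containing end is excluded because it is not complete yet. A small arithmetic sketch with plain u64 LSNs (hypothetical helper, not using postgres_ffi):

    // (seg_no, start_lsn, end_lsn) for every full segment between `start` and `end`.
    fn segment_bounds(start: u64, end: u64, seg_size: u64) -> Vec<(u64, u64, u64)> {
        let first_seg = start / seg_size;
        let last_seg = end / seg_size;
        (first_seg..last_seg)
            .map(|s| (s, s * seg_size, (s + 1) * seg_size))
            .collect()
    }

    // With a 16 MiB segment size, start = 0 and end = 48 MiB + 1 byte yields segments
    // 0, 1 and 2; the segment containing `end` is still partial and is skipped.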
     479              : 
     480              : static REMOTE_STORAGE: OnceCell<Option<GenericRemoteStorage>> = OnceCell::new();
     481              : 
     482           28 : async fn backup_object(source_file: &Path, target_file: &RemotePath, size: usize) -> Result<()> {
     483           28 :     let storage = REMOTE_STORAGE
     484           28 :         .get()
     485           28 :         .expect("failed to get remote storage")
     486           28 :         .as_ref()
     487           28 :         .unwrap();
     488              : 
     489           28 :     let file = tokio::io::BufReader::new(File::open(&source_file).await.with_context(|| {
     490            0 :         format!(
     491            0 :             "Failed to open file {} for wal backup",
     492            0 :             source_file.display()
     493            0 :         )
     494           28 :     })?);
     495              : 
     496           28 :     storage
     497           28 :         .upload_storage_object(Box::new(file), size, target_file)
     498        26198 :         .await
     499           25 : }
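REMOTE_STORAGE above is a process-wide OnceCell<Option<...>>: the launcher initializes it exactly once from the configuration, and the upload/download paths only read it afterwards. A minimal sketch of that init-once, read-many pattern with a hypothetical Storage stand-in type:

    use once_cell::sync::OnceCell;

    // Stand-in for GenericRemoteStorage; the Option models "no remote storage configured".
    struct Storage {
        bucket: String,
    }

    static STORAGE: OnceCell<Option<Storage>> = OnceCell::new();

    fn init_storage(config: Option<String>) {
        // get_or_init runs the closure at most once, even if called from several places.
        STORAGE.get_or_init(|| config.map(|bucket| Storage { bucket }));
    }

    fn storage() -> Option<&'static Storage> {
        // Panics if read before init_storage(), mirroring the expect() in backup_object.
        STORAGE.get().expect("storage not initialized").as_ref()
    }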
     500              : 
     501            6 : pub async fn read_object(
     502            6 :     file_path: &RemotePath,
     503            6 :     offset: u64,
     504            6 : ) -> anyhow::Result<Pin<Box<dyn tokio::io::AsyncRead + Send + Sync>>> {
     505            6 :     let storage = REMOTE_STORAGE
     506            6 :         .get()
     507            6 :         .context("Failed to get remote storage")?
     508            6 :         .as_ref()
     509            6 :         .context("No remote storage configured")?;
     510              : 
     511            6 :     info!("segment download about to start from remote path {file_path:?} at offset {offset}");
     512              : 
     513            6 :     let download = storage
     514            6 :         .download_storage_object(Some((offset, None)), file_path)
     515           21 :         .await
     516            6 :         .with_context(|| {
     517            0 :             format!("Failed to open WAL segment download stream for remote path {file_path:?}")
     518            6 :         })?;
     519              : 
     520            6 :     Ok(download.download_stream)
     521            6 : }
        

Generated by: LCOV version 2.1-beta