LCOV - differential code coverage report
Current view: top level - safekeeper/src - timeline.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 92.5 % 561 519 42 519
Current Date: 2024-01-09 02:06:09 Functions: 84.8 % 99 84 15 84
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : //! This module implements Timeline lifecycle management and has all necessary code
       2                 : //! to glue together SafeKeeper and all other background services.
       3                 : 
       4                 : use anyhow::{anyhow, bail, Result};
       5                 : use camino::Utf8PathBuf;
       6                 : use postgres_ffi::XLogSegNo;
       7                 : use serde::{Deserialize, Serialize};
       8                 : use tokio::fs;
       9                 : 
      10                 : use std::cmp::max;
      11                 : use std::sync::Arc;
      12                 : use std::time::Duration;
      13                 : use tokio::sync::{Mutex, MutexGuard};
      14                 : use tokio::{
      15                 :     sync::{mpsc::Sender, watch},
      16                 :     time::Instant,
      17                 : };
      18                 : use tracing::*;
      19                 : use utils::http::error::ApiError;
      20                 : use utils::{
      21                 :     id::{NodeId, TenantTimelineId},
      22                 :     lsn::Lsn,
      23                 : };
      24                 : 
      25                 : use storage_broker::proto::SafekeeperTimelineInfo;
      26                 : use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
      27                 : 
      28                 : use crate::receive_wal::WalReceivers;
      29                 : use crate::recovery::{recovery_main, Donor, RecoveryNeededInfo};
      30                 : use crate::safekeeper::{
      31                 :     AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState,
      32                 :     SafekeeperMemState, ServerInfo, Term, TermLsn, INVALID_TERM,
      33                 : };
      34                 : use crate::send_wal::WalSenders;
      35                 : use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};
      36                 : 
      37                 : use crate::metrics::FullTimelineInfo;
      38                 : use crate::wal_storage::Storage as wal_storage_iface;
      39                 : use crate::SafeKeeperConf;
      40                 : use crate::{debug_dump, wal_storage};
      41                 : 
      42                 : /// Things safekeeper should know about timeline state on peers.
      43 CBC       13295 : #[derive(Debug, Clone, Serialize, Deserialize)]
      44                 : pub struct PeerInfo {
      45                 :     pub sk_id: NodeId,
      46                 :     pub term: Term,
      47                 :     /// Term of the last entry.
      48                 :     pub last_log_term: Term,
      49                 :     /// LSN of the last record.
      50                 :     pub flush_lsn: Lsn,
      51                 :     pub commit_lsn: Lsn,
      52                 :     /// Since which LSN safekeeper has WAL. TODO: remove this once we fill new
      53                 :     /// sk since backup_lsn.
      54                 :     pub local_start_lsn: Lsn,
      55                 :     /// When info was received. Serde annotations are not very useful but make
      56                 :     /// the code compile -- we don't rely on this field externally.
      57                 :     #[serde(skip)]
      58                 :     #[serde(default = "Instant::now")]
      59                 :     ts: Instant,
      60                 :     pub pg_connstr: String,
      61                 :     pub http_connstr: String,
      62                 : }
      63                 : 
      64                 : impl PeerInfo {
      65           11960 :     fn from_sk_info(sk_info: &SafekeeperTimelineInfo, ts: Instant) -> PeerInfo {
      66           11960 :         PeerInfo {
      67           11960 :             sk_id: NodeId(sk_info.safekeeper_id),
      68           11960 :             term: sk_info.term,
      69           11960 :             last_log_term: sk_info.last_log_term,
      70           11960 :             flush_lsn: Lsn(sk_info.flush_lsn),
      71           11960 :             commit_lsn: Lsn(sk_info.commit_lsn),
      72           11960 :             local_start_lsn: Lsn(sk_info.local_start_lsn),
      73           11960 :             pg_connstr: sk_info.safekeeper_connstr.clone(),
      74           11960 :             http_connstr: sk_info.http_connstr.clone(),
      75           11960 :             ts,
      76           11960 :         }
      77           11960 :     }
      78                 : }
      79                 : 
      80                 : // vector-based node id -> peer state map with very limited functionality we
      81                 : // need.
      82 UBC           0 : #[derive(Debug, Clone, Default)]
      83                 : pub struct PeersInfo(pub Vec<PeerInfo>);
      84                 : 
      85                 : impl PeersInfo {
      86 CBC       11960 :     fn get(&mut self, id: NodeId) -> Option<&mut PeerInfo> {
      87           16033 :         self.0.iter_mut().find(|p| p.sk_id == id)
      88           11960 :     }
      89                 : 
      90           11960 :     fn upsert(&mut self, p: &PeerInfo) {
      91           11960 :         match self.get(p.sk_id) {
      92           11148 :             Some(rp) => *rp = p.clone(),
      93             812 :             None => self.0.push(p.clone()),
      94                 :         }
      95           11960 :     }
      96                 : }
      97                 : 
      98                 : /// Shared state associated with database instance
      99                 : pub struct SharedState {
     100                 :     /// Safekeeper object
     101                 :     sk: SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>,
     102                 :     /// In memory list containing state of peers sent in latest messages from them.
     103                 :     peers_info: PeersInfo,
     104                 :     /// True when WAL backup launcher oversees the timeline, making sure WAL is
     105                 :     /// offloaded; this allows us to bother the launcher less.
     106                 :     wal_backup_active: bool,
     107                 :     /// True whenever there is at least some pending activity on timeline: live
     108                 :     /// compute connection, pageserver is not caught up (it must have latest WAL
     109                 :     /// for new compute start) or WAL backup is not finished. Practically it
     110                 :     /// means safekeepers broadcast info to peers about the timeline, old WAL is
     111                 :     /// trimmed.
     112                 :     ///
     113                 :     /// TODO: it might be better to remove tli completely from GlobalTimelines
     114                 :     /// when tli is inactive instead of having this flag.
     115                 :     active: bool,
     116                 :     last_removed_segno: XLogSegNo,
     117                 : }
     118                 : 
     119                 : impl SharedState {
     120                 :     /// Initialize fresh timeline state without persisting anything to disk.
     121             449 :     fn create_new(
     122             449 :         conf: &SafeKeeperConf,
     123             449 :         ttid: &TenantTimelineId,
     124             449 :         state: SafeKeeperState,
     125             449 :     ) -> Result<Self> {
     126             449 :         if state.server.wal_seg_size == 0 {
     127 UBC           0 :             bail!(TimelineError::UninitializedWalSegSize(*ttid));
     128 CBC         449 :         }
     129             449 : 
     130             449 :         if state.server.pg_version == UNKNOWN_SERVER_VERSION {
     131 UBC           0 :             bail!(TimelineError::UninitialinzedPgVersion(*ttid));
     132 CBC         449 :         }
     133             449 : 
     134             449 :         if state.commit_lsn < state.local_start_lsn {
     135 UBC           0 :             bail!(
     136               0 :                 "commit_lsn {} is higher than local_start_lsn {}",
     137               0 :                 state.commit_lsn,
     138               0 :                 state.local_start_lsn
     139               0 :             );
     140 CBC         449 :         }
     141             449 : 
     142             449 :         // We don't want to write anything to disk, because we may have existing timeline there.
     143             449 :         // These functions should not change anything on disk.
     144             449 :         let timeline_dir = conf.timeline_dir(ttid);
     145             449 :         let control_store = control_file::FileStorage::create_new(timeline_dir, conf, state)?;
     146             449 :         let wal_store =
     147             449 :             wal_storage::PhysicalStorage::new(ttid, conf.timeline_dir(ttid), conf, &control_store)?;
     148             449 :         let sk = SafeKeeper::new(control_store, wal_store, conf.my_id)?;
     149                 : 
     150             449 :         Ok(Self {
     151             449 :             sk,
     152             449 :             peers_info: PeersInfo(vec![]),
     153             449 :             wal_backup_active: false,
     154             449 :             active: false,
     155             449 :             last_removed_segno: 0,
     156             449 :         })
     157             449 :     }
     158                 : 
     159                 :     /// Restore SharedState from control file. If file doesn't exist, bails out.
     160             133 :     fn restore(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Result<Self> {
     161             133 :         let control_store = control_file::FileStorage::restore_new(ttid, conf)?;
     162             133 :         if control_store.server.wal_seg_size == 0 {
     163 UBC           0 :             bail!(TimelineError::UninitializedWalSegSize(*ttid));
     164 CBC         133 :         }
     165                 : 
     166             133 :         let wal_store =
     167             133 :             wal_storage::PhysicalStorage::new(ttid, conf.timeline_dir(ttid), conf, &control_store)?;
     168                 : 
     169                 :         Ok(Self {
     170             133 :             sk: SafeKeeper::new(control_store, wal_store, conf.my_id)?,
     171             133 :             peers_info: PeersInfo(vec![]),
     172                 :             wal_backup_active: false,
     173                 :             active: false,
     174                 :             last_removed_segno: 0,
     175                 :         })
     176             133 :     }
     177                 : 
     178           15911 :     fn is_active(&self, num_computes: usize, remote_consistent_lsn: Lsn) -> bool {
     179           15911 :         self.is_wal_backup_required(num_computes)
     180                 :             // FIXME: add tracking of relevant pageservers and check them here individually,
     181                 :             // otherwise migration won't work (we suspend too early).
     182            2365 :             || remote_consistent_lsn < self.sk.inmem.commit_lsn
     183           15911 :     }
     184                 : 
     185                 :     /// Mark timeline active/inactive and return whether s3 offloading requires
     186                 :     /// start/stop action. If timeline is deactivated, control file is persisted
     187                 :     /// as maintenance task does that only for active timelines.
     188           15911 :     async fn update_status(
     189           15911 :         &mut self,
     190           15911 :         num_computes: usize,
     191           15911 :         remote_consistent_lsn: Lsn,
     192           15911 :         ttid: TenantTimelineId,
     193           15911 :     ) -> bool {
     194           15911 :         let is_active = self.is_active(num_computes, remote_consistent_lsn);
     195           15911 :         if self.active != is_active {
     196            1460 :             info!(
     197            1460 :                 "timeline {} active={} now, remote_consistent_lsn={}, commit_lsn={}",
     198            1460 :                 ttid, is_active, remote_consistent_lsn, self.sk.inmem.commit_lsn
     199            1460 :             );
     200            1460 :             if !is_active {
     201            1403 :                 if let Err(e) = self.sk.persist_inmem(remote_consistent_lsn).await {
     202 UBC           0 :                     warn!("control file save in update_status failed: {:?}", e);
     203 CBC         469 :                 }
     204             991 :             }
     205           14451 :         }
     206           15911 :         self.active = is_active;
     207           15911 :         self.is_wal_backup_action_pending(num_computes)
     208           15911 :     }
     209                 : 
     210                 :     /// Should we run s3 offloading in current state?
     211           44927 :     fn is_wal_backup_required(&self, num_computes: usize) -> bool {
     212           44927 :         let seg_size = self.get_wal_seg_size();
     213           44927 :         num_computes > 0 ||
     214                 :         // Currently only the whole segment is offloaded, so compare segment numbers.
     215           10047 :             (self.sk.inmem.commit_lsn.segment_number(seg_size) >
     216           10047 :              self.sk.inmem.backup_lsn.segment_number(seg_size))
     217           44927 :     }
     218                 : 
     219                 :     /// Is the current state of s3 offloading not what it ought to be?
     220           15911 :     fn is_wal_backup_action_pending(&self, num_computes: usize) -> bool {
     221           15911 :         let res = self.wal_backup_active != self.is_wal_backup_required(num_computes);
     222           15911 :         if res {
     223           12959 :             let action_pending = if self.is_wal_backup_required(num_computes) {
     224           12899 :                 "start"
     225                 :             } else {
     226              60 :                 "stop"
     227                 :             };
     228           12959 :             trace!(
     229 UBC           0 :                 "timeline {} s3 offloading action {} pending: num_computes={}, commit_lsn={}, backup_lsn={}",
     230               0 :                 self.sk.state.timeline_id, action_pending, num_computes, self.sk.inmem.commit_lsn, self.sk.inmem.backup_lsn
     231               0 :             );
     232 CBC        2952 :         }
     233           15911 :         res
     234           15911 :     }
     235                 : 
     236                 :     /// Returns whether s3 offloading is required and sets current status as
     237                 :     /// matching.
     238             146 :     fn wal_backup_attend(&mut self, num_computes: usize) -> bool {
     239             146 :         self.wal_backup_active = self.is_wal_backup_required(num_computes);
     240             146 :         self.wal_backup_active
     241             146 :     }
     242                 : 
     243           44939 :     fn get_wal_seg_size(&self) -> usize {
     244           44939 :         self.sk.state.server.wal_seg_size as usize
     245           44939 :     }
     246                 : 
     247            8661 :     fn get_safekeeper_info(
     248            8661 :         &self,
     249            8661 :         ttid: &TenantTimelineId,
     250            8661 :         conf: &SafeKeeperConf,
     251            8661 :         remote_consistent_lsn: Lsn,
     252            8661 :     ) -> SafekeeperTimelineInfo {
     253            8661 :         SafekeeperTimelineInfo {
     254            8661 :             safekeeper_id: conf.my_id.0,
     255            8661 :             tenant_timeline_id: Some(ProtoTenantTimelineId {
     256            8661 :                 tenant_id: ttid.tenant_id.as_ref().to_owned(),
     257            8661 :                 timeline_id: ttid.timeline_id.as_ref().to_owned(),
     258            8661 :             }),
     259            8661 :             term: self.sk.state.acceptor_state.term,
     260            8661 :             last_log_term: self.sk.get_epoch(),
     261            8661 :             flush_lsn: self.sk.flush_lsn().0,
     262            8661 :             // note: this value is not flushed to control file yet and can be lost
     263            8661 :             commit_lsn: self.sk.inmem.commit_lsn.0,
     264            8661 :             remote_consistent_lsn: remote_consistent_lsn.0,
     265            8661 :             peer_horizon_lsn: self.sk.inmem.peer_horizon_lsn.0,
     266            8661 :             safekeeper_connstr: conf
     267            8661 :                 .advertise_pg_addr
     268            8661 :                 .to_owned()
     269            8661 :                 .unwrap_or(conf.listen_pg_addr.clone()),
     270            8661 :             http_connstr: conf.listen_http_addr.to_owned(),
     271            8661 :             backup_lsn: self.sk.inmem.backup_lsn.0,
     272            8661 :             local_start_lsn: self.sk.state.local_start_lsn.0,
     273            8661 :             availability_zone: conf.availability_zone.clone(),
     274            8661 :         }
     275            8661 :     }
     276                 : 
     277                 :     /// Get our latest view of alive peers status on the timeline.
     278                 :     /// We pass our own info through the broker as well, so when we don't have a connection
     279                 :     /// to the broker, the returned vec is empty.
     280             541 :     fn get_peers(&self, heartbeat_timeout: Duration) -> Vec<PeerInfo> {
     281             541 :         let now = Instant::now();
     282             541 :         self.peers_info
     283             541 :             .0
     284             541 :             .iter()
     285             541 :             // Regard peer as absent if we haven't heard from it within heartbeat_timeout.
     286            1373 :             .filter(|p| now.duration_since(p.ts) <= heartbeat_timeout)
     287             541 :             .cloned()
     288             541 :             .collect()
     289             541 :     }
     290                 : }
     291                 : 
     292               1 : #[derive(Debug, thiserror::Error)]
     293                 : pub enum TimelineError {
     294                 :     #[error("Timeline {0} was cancelled and cannot be used anymore")]
     295                 :     Cancelled(TenantTimelineId),
     296                 :     #[error("Timeline {0} was not found in global map")]
     297                 :     NotFound(TenantTimelineId),
     298                 :     #[error("Timeline {0} exists on disk, but wasn't loaded on startup")]
     299                 :     Invalid(TenantTimelineId),
     300                 :     #[error("Timeline {0} is already exists")]
     301                 :     AlreadyExists(TenantTimelineId),
     302                 :     #[error("Timeline {0} is not initialized, wal_seg_size is zero")]
     303                 :     UninitializedWalSegSize(TenantTimelineId),
     304                 :     #[error("Timeline {0} is not initialized, pg_version is unknown")]
     305                 :     UninitialinzedPgVersion(TenantTimelineId),
     306                 : }
     307                 : 
     308                 : // Convert to HTTP API error.
     309                 : impl From<TimelineError> for ApiError {
     310 UBC           0 :     fn from(te: TimelineError) -> ApiError {
     311               0 :         match te {
     312               0 :             TimelineError::NotFound(ttid) => {
     313               0 :                 ApiError::NotFound(anyhow!("timeline {} not found", ttid).into())
     314                 :             }
     315               0 :             _ => ApiError::InternalServerError(anyhow!("{}", te)),
     316                 :         }
     317               0 :     }
     318                 : }
     319                 : 
     320                 : /// Timeline struct manages lifecycle (creation, deletion, restore) of a safekeeper timeline.
     321                 : /// It also holds SharedState and provides mutually exclusive access to it.
     322                 : pub struct Timeline {
     323                 :     pub ttid: TenantTimelineId,
     324                 : 
     325                 :     /// Sending here asks for wal backup launcher attention (start/stop
     326                 :     /// offloading). Sending ttid instead of a concrete command allows
     327                 :     /// sending without the timeline lock.
     328                 :     pub wal_backup_launcher_tx: Sender<TenantTimelineId>,
     329                 : 
     330                 :     /// Used to broadcast commit_lsn updates to all background jobs.
     331                 :     commit_lsn_watch_tx: watch::Sender<Lsn>,
     332                 :     commit_lsn_watch_rx: watch::Receiver<Lsn>,
     333                 : 
     334                 :     /// Broadcasts (current term, flush_lsn) updates, walsender is interested in
     335                 :     /// them when sending in recovery mode (to walproposer or peers). Note: this
     336                 :     /// is just a notification, WAL reading should always be done with lock held as
     337                 :     /// term can change otherwise.
     338                 :     term_flush_lsn_watch_tx: watch::Sender<TermLsn>,
     339                 :     term_flush_lsn_watch_rx: watch::Receiver<TermLsn>,
     340                 : 
     341                 :     /// Safekeeper and other state, that should remain consistent and
     342                 :     /// synchronized with the disk. This is tokio mutex as we write WAL to disk
     343                 :     /// while holding it, ensuring that consensus checks are in order.
     344                 :     mutex: Mutex<SharedState>,
     345                 :     walsenders: Arc<WalSenders>,
     346                 :     walreceivers: Arc<WalReceivers>,
     347                 : 
     348                 :     /// Cancellation channel. Delete/cancel will send `true` here as a cancellation signal.
     349                 :     cancellation_tx: watch::Sender<bool>,
     350                 : 
     351                 :     /// Timeline should not be used after cancellation. Background tasks should
     352                 :     /// monitor this channel and stop eventually after receiving `true` from this channel.
     353                 :     cancellation_rx: watch::Receiver<bool>,
     354                 : 
     355                 :     /// Directory where timeline state is stored.
     356                 :     pub timeline_dir: Utf8PathBuf,
     357                 : }
     358                 : 
     359                 : impl Timeline {
     360                 :     /// Load existing timeline from disk.
     361 CBC         133 :     pub fn load_timeline(
     362             133 :         conf: &SafeKeeperConf,
     363             133 :         ttid: TenantTimelineId,
     364             133 :         wal_backup_launcher_tx: Sender<TenantTimelineId>,
     365             133 :     ) -> Result<Timeline> {
     366             133 :         let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
     367                 : 
     368             133 :         let shared_state = SharedState::restore(conf, &ttid)?;
     369             133 :         let rcl = shared_state.sk.state.remote_consistent_lsn;
     370             133 :         let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
     371             133 :             watch::channel(shared_state.sk.state.commit_lsn);
     372             133 :         let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) = watch::channel(TermLsn::from((
     373             133 :             shared_state.sk.get_term(),
     374             133 :             shared_state.sk.flush_lsn(),
     375             133 :         )));
     376             133 :         let (cancellation_tx, cancellation_rx) = watch::channel(false);
     377             133 : 
     378             133 :         Ok(Timeline {
     379             133 :             ttid,
     380             133 :             wal_backup_launcher_tx,
     381             133 :             commit_lsn_watch_tx,
     382             133 :             commit_lsn_watch_rx,
     383             133 :             term_flush_lsn_watch_tx,
     384             133 :             term_flush_lsn_watch_rx,
     385             133 :             mutex: Mutex::new(shared_state),
     386             133 :             walsenders: WalSenders::new(rcl),
     387             133 :             walreceivers: WalReceivers::new(),
     388             133 :             cancellation_rx,
     389             133 :             cancellation_tx,
     390             133 :             timeline_dir: conf.timeline_dir(&ttid),
     391             133 :         })
     392             133 :     }
     393                 : 
     394                 :     /// Create a new timeline, which is not yet persisted to disk.
     395             449 :     pub fn create_empty(
     396             449 :         conf: &SafeKeeperConf,
     397             449 :         ttid: TenantTimelineId,
     398             449 :         wal_backup_launcher_tx: Sender<TenantTimelineId>,
     399             449 :         server_info: ServerInfo,
     400             449 :         commit_lsn: Lsn,
     401             449 :         local_start_lsn: Lsn,
     402             449 :     ) -> Result<Timeline> {
     403             449 :         let (commit_lsn_watch_tx, commit_lsn_watch_rx) = watch::channel(Lsn::INVALID);
     404             449 :         let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) =
     405             449 :             watch::channel(TermLsn::from((INVALID_TERM, Lsn::INVALID)));
     406             449 :         let (cancellation_tx, cancellation_rx) = watch::channel(false);
     407             449 :         let state = SafeKeeperState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn);
     408             449 : 
     409             449 :         Ok(Timeline {
     410             449 :             ttid,
     411             449 :             wal_backup_launcher_tx,
     412             449 :             commit_lsn_watch_tx,
     413             449 :             commit_lsn_watch_rx,
     414             449 :             term_flush_lsn_watch_tx,
     415             449 :             term_flush_lsn_watch_rx,
     416             449 :             mutex: Mutex::new(SharedState::create_new(conf, &ttid, state)?),
     417             449 :             walsenders: WalSenders::new(Lsn(0)),
     418             449 :             walreceivers: WalReceivers::new(),
     419             449 :             cancellation_rx,
     420             449 :             cancellation_tx,
     421             449 :             timeline_dir: conf.timeline_dir(&ttid),
     422                 :         })
     423             449 :     }
     424                 : 
     425                 :     /// Initialize fresh timeline on disk and start background tasks. If init
     426                 :     /// fails, timeline is cancelled and cannot be used anymore.
     427                 :     ///
     428                 :     /// Init is transactional, so if it fails, created files will be deleted,
     429                 :     /// and state on disk should remain unchanged.
     430             449 :     pub async fn init_new(
     431             449 :         self: &Arc<Timeline>,
     432             449 :         shared_state: &mut MutexGuard<'_, SharedState>,
     433             449 :         conf: &SafeKeeperConf,
     434             449 :     ) -> Result<()> {
     435             526 :         match fs::metadata(&self.timeline_dir).await {
     436                 :             Ok(_) => {
     437                 :                 // Timeline directory exists on disk, we should leave state unchanged
     438                 :                 // and return error.
     439 UBC           0 :                 bail!(TimelineError::Invalid(self.ttid));
     440                 :             }
     441 CBC         449 :             Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
     442 UBC           0 :             Err(e) => {
     443               0 :                 return Err(e.into());
     444                 :             }
     445                 :         }
     446                 : 
     447                 :         // Create timeline directory.
     448 CBC         528 :         fs::create_dir_all(&self.timeline_dir).await?;
     449                 : 
     450                 :         // Write timeline to disk and start background tasks.
     451            1442 :         if let Err(e) = shared_state.sk.persist_inmem(Lsn::INVALID).await {
     452                 :             // Bootstrap failed, cancel timeline and remove timeline directory.
     453 UBC           0 :             self.cancel(shared_state);
     454                 : 
     455               0 :             if let Err(fs_err) = fs::remove_dir_all(&self.timeline_dir).await {
     456               0 :                 warn!(
     457               0 :                     "failed to remove timeline {} directory after bootstrap failure: {}",
     458               0 :                     self.ttid, fs_err
     459               0 :                 );
     460               0 :             }
     461                 : 
     462               0 :             return Err(e);
     463 CBC         449 :         }
     464             449 :         self.bootstrap(conf);
     465             449 :         Ok(())
     466             449 :     }
     467                 : 
     468                 :     /// Bootstrap new or existing timeline, starting background tasks.
     469             582 :     pub fn bootstrap(self: &Arc<Timeline>, conf: &SafeKeeperConf) {
     470             582 :         // Start recovery task which always runs on the timeline.
     471             582 :         if conf.peer_recovery_enabled {
     472               1 :             tokio::spawn(recovery_main(self.clone(), conf.clone()));
     473             581 :         }
     474             582 :     }
     475                 : 
     476                 :     /// Delete timeline from disk completely, by removing timeline directory. Background
     477                 :     /// timeline activities will stop eventually.
     478              25 :     pub async fn delete_from_disk(
     479              25 :         &self,
     480              25 :         shared_state: &mut MutexGuard<'_, SharedState>,
     481              25 :     ) -> Result<(bool, bool)> {
     482              25 :         let was_active = shared_state.active;
     483              25 :         self.cancel(shared_state);
     484              25 :         let dir_existed = delete_dir(&self.timeline_dir).await?;
     485              25 :         Ok((dir_existed, was_active))
     486              25 :     }
     487                 : 
     488                 :     /// Cancel timeline to prevent further usage. Background tasks will stop
     489                 :     /// eventually after receiving cancellation signal.
     490                 :     ///
     491                 :     /// Note that we can't notify backup launcher here while holding
     492                 :     /// shared_state lock, as this is a potential deadlock: caller is
     493                 :     /// responsible for that. Generally we should probably make WAL backup tasks
     494                 :     /// shut down on their own, checking once in a while whether it is
     495                 :     /// time.
     496              25 :     fn cancel(&self, shared_state: &mut MutexGuard<'_, SharedState>) {
     497              25 :         info!("timeline {} is cancelled", self.ttid);
     498              25 :         let _ = self.cancellation_tx.send(true);
     499              25 :         // Close associated FDs. Nobody will be able to touch timeline data once
     500              25 :         // it is cancelled, so WAL storage won't be opened again.
     501              25 :         shared_state.sk.wal_store.close();
     502              25 :     }
     503                 : 
     504                 :     /// Returns if timeline is cancelled.
     505         3320336 :     pub fn is_cancelled(&self) -> bool {
     506         3320336 :         *self.cancellation_rx.borrow()
     507         3320336 :     }
     508                 : 
     509                 :     /// Returns watch channel which gets a value when timeline is cancelled. The
     510                 :     /// returned receiver is guaranteed not to hold the cancelled value yet (errors otherwise).
     511               1 :     pub fn get_cancellation_rx(&self) -> Result<watch::Receiver<bool>> {
     512               1 :         let rx = self.cancellation_rx.clone();
     513               1 :         if *rx.borrow() {
     514 UBC           0 :             bail!(TimelineError::Cancelled(self.ttid));
     515 CBC           1 :         }
     516               1 :         Ok(rx)
     517               1 :     }
     518                 : 
     519                 :     /// Take a writing mutual exclusive lock on timeline shared_state.
     520         3323988 :     pub async fn write_shared_state(&self) -> MutexGuard<SharedState> {
     521         3323988 :         self.mutex.lock().await
     522         3323988 :     }
     523                 : 
     524           15911 :     async fn update_status(&self, shared_state: &mut SharedState) -> bool {
     525           15911 :         shared_state
     526           15911 :             .update_status(
     527           15911 :                 self.walreceivers.get_num(),
     528           15911 :                 self.get_walsenders().get_remote_consistent_lsn(),
     529           15911 :                 self.ttid,
     530           15911 :             )
     531            1403 :             .await
     532           15911 :     }
     533                 : 
     534                 :     /// Update timeline status and kick wal backup launcher to stop/start offloading if needed.
     535            3951 :     pub async fn update_status_notify(&self) -> Result<()> {
     536            3951 :         if self.is_cancelled() {
     537 UBC           0 :             bail!(TimelineError::Cancelled(self.ttid));
     538 CBC        3951 :         }
     539            3951 :         let is_wal_backup_action_pending: bool = {
     540            3951 :             let mut shared_state = self.write_shared_state().await;
     541            3951 :             self.update_status(&mut shared_state).await
     542                 :         };
     543            3951 :         if is_wal_backup_action_pending {
     544                 :             // Can fail only if channel to a static thread got closed, which is not normal at all.
     545            2404 :             self.wal_backup_launcher_tx.send(self.ttid).await?;
     546            1547 :         }
     547            3951 :         Ok(())
     548            3951 :     }
     549                 : 
     550                 :     /// Returns true if walsender should stop sending WAL to pageserver. We
     551                 :     /// terminate it if remote_consistent_lsn reached commit_lsn and there are no
     552                 :     /// computes. While there might be nothing to stream already, we learn about
     553                 :     /// remote_consistent_lsn update through replication feedback, and we want
     554                 :     /// to stop pushing to the broker if pageserver is fully caught up.
     555            3461 :     pub async fn should_walsender_stop(&self, reported_remote_consistent_lsn: Lsn) -> bool {
     556            3461 :         if self.is_cancelled() {
     557               1 :             return true;
     558            3460 :         }
     559            3460 :         let shared_state = self.write_shared_state().await;
     560            3460 :         if self.walreceivers.get_num() == 0 {
     561             712 :             return shared_state.sk.inmem.commit_lsn == Lsn(0) || // no data at all yet
     562             712 :             reported_remote_consistent_lsn >= shared_state.sk.inmem.commit_lsn;
     563            2748 :         }
     564            2748 :         false
     565            3461 :     }
     566                 : 
     567                 :     /// Ensure that current term is t, erroring otherwise, and lock the state.
     568            1905 :     pub async fn acquire_term(&self, t: Term) -> Result<MutexGuard<SharedState>> {
     569            1905 :         let ss = self.write_shared_state().await;
     570            1905 :         if ss.sk.state.acceptor_state.term != t {
     571               1 :             bail!(
     572               1 :                 "failed to acquire term {}, current term {}",
     573               1 :                 t,
     574               1 :                 ss.sk.state.acceptor_state.term
     575               1 :             );
     576            1904 :         }
     577            1904 :         Ok(ss)
     578            1905 :     }
     579                 : 
     580                 :     /// Returns whether s3 offloading is required and sets current status as
     581                 :     /// matching it.
     582             146 :     pub async fn wal_backup_attend(&self) -> bool {
     583             146 :         if self.is_cancelled() {
     584 UBC           0 :             return false;
     585 CBC         146 :         }
     586             146 : 
     587             146 :         self.write_shared_state()
     588              38 :             .await
     589             146 :             .wal_backup_attend(self.walreceivers.get_num())
     590             146 :     }
     591                 : 
     592                 :     /// Returns commit_lsn watch channel.
     593             730 :     pub fn get_commit_lsn_watch_rx(&self) -> watch::Receiver<Lsn> {
     594             730 :         self.commit_lsn_watch_rx.clone()
     595             730 :     }
     596                 : 
     597                 :     /// Returns term_flush_lsn watch channel.
     598              16 :     pub fn get_term_flush_lsn_watch_rx(&self) -> watch::Receiver<TermLsn> {
     599              16 :         self.term_flush_lsn_watch_rx.clone()
     600              16 :     }
     601                 : 
     602                 :     /// Pass arrived message to the safekeeper.
     603         3274394 :     pub async fn process_msg(
     604         3274394 :         &self,
     605         3274394 :         msg: &ProposerAcceptorMessage,
     606         3274394 :     ) -> Result<Option<AcceptorProposerMessage>> {
     607         3274394 :         if self.is_cancelled() {
     608 UBC           0 :             bail!(TimelineError::Cancelled(self.ttid));
     609 CBC     3274394 :         }
     610                 : 
     611                 :         let mut rmsg: Option<AcceptorProposerMessage>;
     612                 :         let commit_lsn: Lsn;
     613                 :         let term_flush_lsn: TermLsn;
     614                 :         {
     615         3274394 :             let mut shared_state = self.write_shared_state().await;
     616         3985426 :             rmsg = shared_state.sk.process_msg(msg).await?;
     617                 : 
     618                 :             // if this is AppendResponse, fill in proper pageserver and hot
     619                 :             // standby feedback.
     620         3274393 :             if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg {
     621         1294232 :                 let (ps_feedback, hs_feedback) = self.walsenders.get_feedbacks();
     622         1294232 :                 resp.hs_feedback = hs_feedback;
     623         1294232 :                 resp.pageserver_feedback = ps_feedback;
     624         1980161 :             }
     625                 : 
     626         3274393 :             commit_lsn = shared_state.sk.inmem.commit_lsn;
     627         3274393 :             term_flush_lsn =
     628         3274393 :                 TermLsn::from((shared_state.sk.get_term(), shared_state.sk.flush_lsn()));
     629         3274393 :         }
     630         3274393 :         self.commit_lsn_watch_tx.send(commit_lsn)?;
     631         3274393 :         self.term_flush_lsn_watch_tx.send(term_flush_lsn)?;
     632         3274393 :         Ok(rmsg)
     633         3274393 :     }
     634                 : 
     635                 :     /// Returns wal_seg_size.
     636              12 :     pub async fn get_wal_seg_size(&self) -> usize {
     637              12 :         self.write_shared_state().await.get_wal_seg_size()
     638              12 :     }
     639                 : 
     640                 :     /// Returns true only if the timeline is loaded and active.
     641           11121 :     pub async fn is_active(&self) -> bool {
     642           11121 :         if self.is_cancelled() {
     643 UBC           0 :             return false;
     644 CBC       11121 :         }
     645           11121 : 
     646           11121 :         self.write_shared_state().await.active
     647           11121 :     }
     648                 : 
     649                 :     /// Returns state of the timeline.
     650            2736 :     pub async fn get_state(&self) -> (SafekeeperMemState, SafeKeeperState) {
     651            2736 :         let state = self.write_shared_state().await;
     652            2736 :         (state.sk.inmem.clone(), state.sk.state.clone())
     653            2736 :     }
     654                 : 
     655                 :     /// Returns latest backup_lsn.
     656             312 :     pub async fn get_wal_backup_lsn(&self) -> Lsn {
     657             312 :         self.write_shared_state().await.sk.inmem.backup_lsn
     658             312 :     }
     659                 : 
     660                 :     /// Sets backup_lsn to the given value.
     661              13 :     pub async fn set_wal_backup_lsn(&self, backup_lsn: Lsn) -> Result<()> {
     662              13 :         if self.is_cancelled() {
     663 UBC           0 :             bail!(TimelineError::Cancelled(self.ttid));
     664 CBC          13 :         }
     665                 : 
     666              13 :         let mut state = self.write_shared_state().await;
     667              13 :         state.sk.inmem.backup_lsn = max(state.sk.inmem.backup_lsn, backup_lsn);
     668              13 :         // we should check whether to shut down offloader, but this will be done
     669              13 :         // soon by peer communication anyway.
     670              13 :         Ok(())
     671              13 :     }
     672                 : 
     673                 :     /// Get safekeeper info for broadcasting to broker and other peers.
     674            8661 :     pub async fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SafekeeperTimelineInfo {
     675            8661 :         let shared_state = self.write_shared_state().await;
     676            8661 :         shared_state.get_safekeeper_info(
     677            8661 :             &self.ttid,
     678            8661 :             conf,
     679            8661 :             self.walsenders.get_remote_consistent_lsn(),
     680            8661 :         )
     681            8661 :     }
     682                 : 
     683                 :     /// Update timeline state with peer safekeeper data.
     684           11960 :     pub async fn record_safekeeper_info(&self, mut sk_info: SafekeeperTimelineInfo) -> Result<()> {
     685           11960 :         // Update local remote_consistent_lsn in memory (in .walsenders) and in
     686           11960 :         // sk_info to pass it down to control file.
     687           11960 :         sk_info.remote_consistent_lsn = self
     688           11960 :             .walsenders
     689           11960 :             .update_remote_consistent_lsn(Lsn(sk_info.remote_consistent_lsn))
     690           11960 :             .0;
     691                 :         let is_wal_backup_action_pending: bool;
     692                 :         let commit_lsn: Lsn;
     693                 :         {
     694           11960 :             let mut shared_state = self.write_shared_state().await;
     695           11960 :             shared_state.sk.record_safekeeper_info(&sk_info).await?;
     696           11960 :             let peer_info = PeerInfo::from_sk_info(&sk_info, Instant::now());
     697           11960 :             shared_state.peers_info.upsert(&peer_info);
     698           11960 :             is_wal_backup_action_pending = self.update_status(&mut shared_state).await;
     699           11960 :             commit_lsn = shared_state.sk.inmem.commit_lsn;
     700           11960 :         }
     701           11960 :         self.commit_lsn_watch_tx.send(commit_lsn)?;
     702                 :         // Wake up wal backup launcher, if it is time to stop the offloading.
     703           11960 :         if is_wal_backup_action_pending {
     704           10555 :             self.wal_backup_launcher_tx.send(self.ttid).await?;
     705            1405 :         }
     706           11960 :         Ok(())
     707           11960 :     }
     708                 : 
     709             538 :     pub async fn get_peers(&self, conf: &SafeKeeperConf) -> Vec<PeerInfo> {
     710             538 :         let shared_state = self.write_shared_state().await;
     711             538 :         shared_state.get_peers(conf.heartbeat_timeout)
     712             538 :     }
     713                 : 
     714                 :     /// Should we start fetching WAL from a peer safekeeper, and if yes, from
     715                 :     /// which? Answer is yes, i.e. .donors is not empty if 1) there is something
     716                 :     /// to fetch, and we can do that without running elections; 2) there is no
     717                 :     /// actively streaming compute, as we don't want to compete with it.
     718                 :     ///
     719                 :     /// If donor(s) are chosen, their term is guaranteed to be equal
     720                 :     /// to their last_log_term so we are sure such a leader had actually been elected.
     721                 :     ///
     722                 :     /// All possible donors are returned so that we could keep connection to the
     723                 :     /// current one if it is good even if it slightly lags behind.
     724                 :     ///
     725                 :     /// Note that term conditions above might be not met, but safekeepers are
     726                 :     /// still not aligned on last flush_lsn. Generally in this case until
     727                 :     /// elections are run it is not possible to say which safekeeper should
     728                 :     /// recover from which one -- history which would be committed is different
     729                 :     /// depending on assembled quorum (e.g. classic picture 8 from Raft paper).
     730                 :     /// Thus we don't try to predict it here.
     731               3 :     pub async fn recovery_needed(&self, heartbeat_timeout: Duration) -> RecoveryNeededInfo {
     732               3 :         let ss = self.write_shared_state().await;
     733               3 :         let term = ss.sk.state.acceptor_state.term;
     734               3 :         let last_log_term = ss.sk.get_epoch();
     735               3 :         let flush_lsn = ss.sk.flush_lsn();
     736               3 :         // note that peers contain myself, but that's ok -- we are interested only in peers which are strictly ahead of us.
     737               3 :         let mut peers = ss.get_peers(heartbeat_timeout);
     738               3 :         // Sort by <last log term, lsn> pairs.
     739               5 :         peers.sort_by(|p1, p2| {
     740               5 :             let tl1 = TermLsn {
     741               5 :                 term: p1.last_log_term,
     742               5 :                 lsn: p1.flush_lsn,
     743               5 :             };
     744               5 :             let tl2 = TermLsn {
     745               5 :                 term: p2.last_log_term,
     746               5 :                 lsn: p2.flush_lsn,
     747               5 :             };
     748               5 :             tl2.cmp(&tl1) // desc
     749               5 :         });
     750               3 :         let num_streaming_computes = self.walreceivers.get_num_streaming();
     751               3 :         let donors = if num_streaming_computes > 0 {
     752 UBC           0 :             vec![] // If there is a streaming compute, don't try to recover to not intervene.
     753                 :         } else {
     754 CBC           3 :             peers
     755               3 :                 .iter()
     756               6 :                 .filter_map(|candidate| {
     757               6 :                     // Are we interested in this candidate?
     758               6 :                     let candidate_tl = TermLsn {
     759               6 :                         term: candidate.last_log_term,
     760               6 :                         lsn: candidate.flush_lsn,
     761               6 :                     };
     762               6 :                     let my_tl = TermLsn {
     763               6 :                         term: last_log_term,
     764               6 :                         lsn: flush_lsn,
     765               6 :                     };
     766               6 :                     if my_tl < candidate_tl {
     767                 :                         // Yes, we are interested. Can we pull from it without
     768                 :                         // (re)running elections? It is possible if 1) his term
     769                 :                         // is equal to his last_log_term so we could act on
     770                 :                         // behalf of leader of this term (we must be sure he was
     771                 :                         // ever elected) and 2) our term is not higher, or we'll refuse data.
     772               2 :                         if candidate.term == candidate.last_log_term && candidate.term >= term {
     773               2 :                             Some(Donor::from(candidate))
     774                 :                         } else {
     775 UBC           0 :                             None
     776                 :                         }
     777                 :                     } else {
     778 CBC           4 :                         None
     779                 :                     }
     780               6 :                 })
     781               3 :                 .collect()
     782                 :         };
     783               3 :         RecoveryNeededInfo {
     784               3 :             term,
     785               3 :             last_log_term,
     786               3 :             flush_lsn,
     787               3 :             peers,
     788               3 :             num_streaming_computes,
     789               3 :             donors,
     790               3 :         }
     791               3 :     }
     792                 : 
     793           17190 :     pub fn get_walsenders(&self) -> &Arc<WalSenders> {
     794           17190 :         &self.walsenders
     795           17190 :     }
     796                 : 
     797            1993 :     pub fn get_walreceivers(&self) -> &Arc<WalReceivers> {
     798            1993 :         &self.walreceivers
     799            1993 :     }
     800                 : 
     801                 :     /// Returns flush_lsn.
     802             525 :     pub async fn get_flush_lsn(&self) -> Lsn {
     803             525 :         self.write_shared_state().await.sk.wal_store.flush_lsn()
     804             525 :     }
     805                 : 
     806                 :     /// Delete WAL segments from disk that are no longer needed. This is determined
     807                 :     /// based on pageserver's remote_consistent_lsn and local backup_lsn/peer_lsn.
     808            1846 :     pub async fn remove_old_wal(&self, wal_backup_enabled: bool) -> Result<()> {
     809            1846 :         if self.is_cancelled() {
     810 UBC           0 :             bail!(TimelineError::Cancelled(self.ttid));
     811 CBC        1846 :         }
     812                 : 
     813                 :         let horizon_segno: XLogSegNo;
     814              29 :         let remover = {
     815            1846 :             let shared_state = self.write_shared_state().await;
     816            1846 :             horizon_segno = shared_state.sk.get_horizon_segno(wal_backup_enabled);
     817            1846 :             if horizon_segno <= 1 || horizon_segno <= shared_state.last_removed_segno {
     818            1817 :                 return Ok(()); // nothing to do
     819              29 :             }
     820              29 : 
     821              29 :             // release the lock before removing
     822              29 :             shared_state.sk.wal_store.remove_up_to(horizon_segno - 1)
     823              29 :         };
     824              29 : 
     825              29 :         // delete old WAL files
     826              43 :         remover.await?;
     827                 : 
     828                 :         // update last_removed_segno
     829              29 :         let mut shared_state = self.write_shared_state().await;
     830              29 :         shared_state.last_removed_segno = horizon_segno;
     831              29 :         Ok(())
     832            1846 :     }
     833                 : 
     834                 :     /// Persist control file if there is something to save and enough time
     835                 :     /// passed after the last save. This helps to keep remote_consistent_lsn up
     836                 :     /// to date so that storage nodes restart doesn't cause many pageserver ->
     837                 :     /// safekeeper reconnections.
     838            1846 :     pub async fn maybe_persist_control_file(&self) -> Result<()> {
     839            1846 :         let remote_consistent_lsn = self.walsenders.get_remote_consistent_lsn();
     840            1846 :         self.write_shared_state()
     841             106 :             .await
     842                 :             .sk
     843            1846 :             .maybe_persist_inmem_control_file(remote_consistent_lsn)
     844               6 :             .await
     845            1846 :     }
     846                 : 
     847                 :     /// Gather timeline data for metrics. If the timeline is not active, returns
     848                 :     /// None, we do not collect these.
     849              51 :     pub async fn info_for_metrics(&self) -> Option<FullTimelineInfo> {
     850              51 :         if self.is_cancelled() {
     851 UBC           0 :             return None;
     852 CBC          51 :         }
     853              51 : 
     854              51 :         let ps_feedback = self.walsenders.get_ps_feedback();
     855              51 :         let state = self.write_shared_state().await;
     856              51 :         if state.active {
     857              51 :             Some(FullTimelineInfo {
     858              51 :                 ttid: self.ttid,
     859              51 :                 ps_feedback,
     860              51 :                 wal_backup_active: state.wal_backup_active,
     861              51 :                 timeline_is_active: state.active,
     862              51 :                 num_computes: self.walreceivers.get_num() as u32,
     863              51 :                 last_removed_segno: state.last_removed_segno,
     864              51 :                 epoch_start_lsn: state.sk.epoch_start_lsn,
     865              51 :                 mem_state: state.sk.inmem.clone(),
     866              51 :                 persisted_state: state.sk.state.clone(),
     867              51 :                 flush_lsn: state.sk.wal_store.flush_lsn(),
     868              51 :                 remote_consistent_lsn: self.get_walsenders().get_remote_consistent_lsn(),
     869              51 :                 wal_storage: state.sk.wal_store.get_metrics(),
     870              51 :             })
     871                 :         } else {
     872 UBC           0 :             None
     873                 :         }
     874 CBC          51 :     }
     875                 : 
     876                 :     /// Returns in-memory timeline state to build a full debug dump.
     877               5 :     pub async fn memory_dump(&self) -> debug_dump::Memory {
     878               5 :         let state = self.write_shared_state().await;
     879                 : 
     880               5 :         let (write_lsn, write_record_lsn, flush_lsn, file_open) =
     881               5 :             state.sk.wal_store.internal_state();
     882               5 : 
     883               5 :         debug_dump::Memory {
     884               5 :             is_cancelled: self.is_cancelled(),
     885               5 :             peers_info_len: state.peers_info.0.len(),
     886               5 :             walsenders: self.walsenders.get_all(),
     887               5 :             wal_backup_active: state.wal_backup_active,
     888               5 :             active: state.active,
     889               5 :             num_computes: self.walreceivers.get_num() as u32,
     890               5 :             last_removed_segno: state.last_removed_segno,
     891               5 :             epoch_start_lsn: state.sk.epoch_start_lsn,
     892               5 :             mem_state: state.sk.inmem.clone(),
     893               5 :             write_lsn,
     894               5 :             write_record_lsn,
     895               5 :             flush_lsn,
     896               5 :             file_open,
     897               5 :         }
     898               5 :     }
     899                 : }
     900                 : 
     901                 : /// Deletes directory and its contents. Returns false if directory does not exist.
     902              25 : async fn delete_dir(path: &Utf8PathBuf) -> Result<bool> {
     903              25 :     match fs::remove_dir_all(path).await {
     904              11 :         Ok(_) => Ok(true),
     905              14 :         Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
     906 UBC           0 :         Err(e) => Err(e.into()),
     907                 :     }
     908 CBC          25 : }
        

Generated by: LCOV version 2.1-beta