LCOV - differential code coverage report
Current view: top level - safekeeper/src - timeline.rs (source / functions) Coverage Total Hit LBC UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 91.2 % 491 448 1 42 448
Current Date: 2023-10-19 02:04:12 Functions: 80.6 % 98 79 19 79
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : //! This module implements Timeline lifecycle management and has all necessary code
       2                 : //! to glue together SafeKeeper and all other background services.
       3                 : 
       4                 : use anyhow::{anyhow, bail, Result};
       5                 : use camino::Utf8PathBuf;
       6                 : use postgres_ffi::XLogSegNo;
       7                 : use serde::{Deserialize, Serialize};
       8                 : use serde_with::serde_as;
       9                 : use tokio::fs;
      10                 : 
      11                 : use serde_with::DisplayFromStr;
      12                 : use std::cmp::max;
      13                 : use std::sync::Arc;
      14                 : use tokio::sync::{Mutex, MutexGuard};
      15                 : use tokio::{
      16                 :     sync::{mpsc::Sender, watch},
      17                 :     time::Instant,
      18                 : };
      19                 : use tracing::*;
      20                 : use utils::http::error::ApiError;
      21                 : use utils::{
      22                 :     id::{NodeId, TenantTimelineId},
      23                 :     lsn::Lsn,
      24                 : };
      25                 : 
      26                 : use storage_broker::proto::SafekeeperTimelineInfo;
      27                 : use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
      28                 : 
      29                 : use crate::receive_wal::WalReceivers;
      30                 : use crate::recovery::recovery_main;
      31                 : use crate::safekeeper::{
      32                 :     AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState,
      33                 :     SafekeeperMemState, ServerInfo, Term, TermLsn, INVALID_TERM,
      34                 : };
      35                 : use crate::send_wal::WalSenders;
      36                 : use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};
      37                 : 
      38                 : use crate::metrics::FullTimelineInfo;
      39                 : use crate::wal_storage::Storage as wal_storage_iface;
      40                 : use crate::SafeKeeperConf;
      41                 : use crate::{debug_dump, wal_storage};
      42                 : 
      43                 : /// Things safekeeper should know about timeline state on peers.
      44                 : #[serde_as]
      45 CBC       10902 : #[derive(Debug, Clone, Serialize, Deserialize)]
      46                 : pub struct PeerInfo {
      47                 :     pub sk_id: NodeId,
      48                 :     /// Term of the last entry.
      49                 :     _last_log_term: Term,
      50                 :     /// LSN of the last record.
      51                 :     #[serde_as(as = "DisplayFromStr")]
      52                 :     _flush_lsn: Lsn,
      53                 :     #[serde_as(as = "DisplayFromStr")]
      54                 :     pub commit_lsn: Lsn,
      55                 :     /// Since which LSN safekeeper has WAL. TODO: remove this once we fill new
      56                 :     /// sk since backup_lsn.
      57                 :     #[serde_as(as = "DisplayFromStr")]
      58                 :     pub local_start_lsn: Lsn,
      59                 :     /// When info was received. Serde annotations are not very useful but make
      60                 :     /// the code compile -- we don't rely on this field externally.
      61                 :     #[serde(skip)]
      62                 :     #[serde(default = "Instant::now")]
      63                 :     ts: Instant,
      64                 : }
      65                 : 
      66                 : impl PeerInfo {
      67            9742 :     fn from_sk_info(sk_info: &SafekeeperTimelineInfo, ts: Instant) -> PeerInfo {
      68            9742 :         PeerInfo {
      69            9742 :             sk_id: NodeId(sk_info.safekeeper_id),
      70            9742 :             _last_log_term: sk_info.last_log_term,
      71            9742 :             _flush_lsn: Lsn(sk_info.flush_lsn),
      72            9742 :             commit_lsn: Lsn(sk_info.commit_lsn),
      73            9742 :             local_start_lsn: Lsn(sk_info.local_start_lsn),
      74            9742 :             ts,
      75            9742 :         }
      76            9742 :     }
      77                 : }
      78                 : 
      79                 : // vector-based node id -> peer state map with very limited functionality we
      80                 : // need.
      81 UBC           0 : #[derive(Debug, Clone, Default)]
      82                 : pub struct PeersInfo(pub Vec<PeerInfo>);
      83                 : 
      84                 : impl PeersInfo {
      85 CBC        9742 :     fn get(&mut self, id: NodeId) -> Option<&mut PeerInfo> {
      86           13581 :         self.0.iter_mut().find(|p| p.sk_id == id)
      87            9742 :     }
      88                 : 
      89            9742 :     fn upsert(&mut self, p: &PeerInfo) {
      90            9742 :         match self.get(p.sk_id) {
      91            8898 :             Some(rp) => *rp = p.clone(),
      92             844 :             None => self.0.push(p.clone()),
      93                 :         }
      94            9742 :     }
      95                 : }
      96                 : 
      97                 : /// Shared state associated with database instance
      98                 : pub struct SharedState {
      99                 :     /// Safekeeper object
     100                 :     sk: SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>,
     101                 :     /// In memory list containing state of peers sent in latest messages from them.
     102                 :     peers_info: PeersInfo,
     103                 :     /// True when WAL backup launcher oversees the timeline, making sure WAL is
     104                 :     /// offloaded, allows to bother launcher less.
     105                 :     wal_backup_active: bool,
     106                 :     /// True whenever there is at least some pending activity on timeline: live
     107                 :     /// compute connection, pageserver is not caughtup (it must have latest WAL
     108                 :     /// for new compute start) or WAL backuping is not finished. Practically it
     109                 :     /// means safekeepers broadcast info to peers about the timeline, old WAL is
     110                 :     /// trimmed.
     111                 :     ///
     112                 :     /// TODO: it might be better to remove tli completely from GlobalTimelines
     113                 :     /// when tli is inactive instead of having this flag.
     114                 :     active: bool,
     115                 :     last_removed_segno: XLogSegNo,
     116                 : }
     117                 : 
     118                 : impl SharedState {
     119                 :     /// Initialize fresh timeline state without persisting anything to disk.
     120             481 :     fn create_new(
     121             481 :         conf: &SafeKeeperConf,
     122             481 :         ttid: &TenantTimelineId,
     123             481 :         state: SafeKeeperState,
     124             481 :     ) -> Result<Self> {
     125             481 :         if state.server.wal_seg_size == 0 {
     126 UBC           0 :             bail!(TimelineError::UninitializedWalSegSize(*ttid));
     127 CBC         481 :         }
     128             481 : 
     129             481 :         if state.server.pg_version == UNKNOWN_SERVER_VERSION {
     130 UBC           0 :             bail!(TimelineError::UninitialinzedPgVersion(*ttid));
     131 CBC         481 :         }
     132             481 : 
     133             481 :         if state.commit_lsn < state.local_start_lsn {
     134 UBC           0 :             bail!(
     135               0 :                 "commit_lsn {} is higher than local_start_lsn {}",
     136               0 :                 state.commit_lsn,
     137               0 :                 state.local_start_lsn
     138               0 :             );
     139 CBC         481 :         }
     140                 : 
     141                 :         // We don't want to write anything to disk, because we may have existing timeline there.
     142                 :         // These functions should not change anything on disk.
     143             481 :         let control_store = control_file::FileStorage::create_new(ttid, conf, state)?;
     144             481 :         let wal_store =
     145             481 :             wal_storage::PhysicalStorage::new(ttid, conf.timeline_dir(ttid), conf, &control_store)?;
     146             481 :         let sk = SafeKeeper::new(control_store, wal_store, conf.my_id)?;
     147                 : 
     148             481 :         Ok(Self {
     149             481 :             sk,
     150             481 :             peers_info: PeersInfo(vec![]),
     151             481 :             wal_backup_active: false,
     152             481 :             active: false,
     153             481 :             last_removed_segno: 0,
     154             481 :         })
     155             481 :     }
     156                 : 
     157                 :     /// Restore SharedState from control file. If file doesn't exist, bails out.
     158              84 :     fn restore(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Result<Self> {
     159              84 :         let control_store = control_file::FileStorage::restore_new(ttid, conf)?;
     160              84 :         if control_store.server.wal_seg_size == 0 {
     161 UBC           0 :             bail!(TimelineError::UninitializedWalSegSize(*ttid));
     162 CBC          84 :         }
     163                 : 
     164              84 :         let wal_store =
     165              84 :             wal_storage::PhysicalStorage::new(ttid, conf.timeline_dir(ttid), conf, &control_store)?;
     166                 : 
     167                 :         Ok(Self {
     168              84 :             sk: SafeKeeper::new(control_store, wal_store, conf.my_id)?,
     169              84 :             peers_info: PeersInfo(vec![]),
     170                 :             wal_backup_active: false,
     171                 :             active: false,
     172                 :             last_removed_segno: 0,
     173                 :         })
     174              84 :     }
     175                 : 
     176           14253 :     fn is_active(&self, num_computes: usize, remote_consistent_lsn: Lsn) -> bool {
     177           14253 :         self.is_wal_backup_required(num_computes)
     178                 :             // FIXME: add tracking of relevant pageservers and check them here individually,
     179                 :             // otherwise migration won't work (we suspend too early).
     180            2514 :             || remote_consistent_lsn < self.sk.inmem.commit_lsn
     181           14253 :     }
     182                 : 
     183                 :     /// Mark timeline active/inactive and return whether s3 offloading requires
     184                 :     /// start/stop action.
     185           14253 :     fn update_status(
     186           14253 :         &mut self,
     187           14253 :         num_computes: usize,
     188           14253 :         remote_consistent_lsn: Lsn,
     189           14253 :         ttid: TenantTimelineId,
     190           14253 :     ) -> bool {
     191           14253 :         let is_active = self.is_active(num_computes, remote_consistent_lsn);
     192           14253 :         if self.active != is_active {
     193            1563 :             info!("timeline {} active={} now", ttid, is_active);
     194           12690 :         }
     195           14253 :         self.active = is_active;
     196           14253 :         self.is_wal_backup_action_pending(num_computes)
     197           14253 :     }
     198                 : 
     199                 :     /// Should we run s3 offloading in current state?
     200           40074 :     fn is_wal_backup_required(&self, num_computes: usize) -> bool {
     201           40074 :         let seg_size = self.get_wal_seg_size();
     202           40074 :         num_computes > 0 ||
     203                 :         // Currently only the whole segment is offloaded, so compare segment numbers.
     204            9697 :             (self.sk.inmem.commit_lsn.segment_number(seg_size) >
     205            9697 :              self.sk.inmem.backup_lsn.segment_number(seg_size))
     206           40074 :     }
     207                 : 
     208                 :     /// Is current state of s3 offloading is not what it ought to be?
     209           14253 :     fn is_wal_backup_action_pending(&self, num_computes: usize) -> bool {
     210           14253 :         let res = self.wal_backup_active != self.is_wal_backup_required(num_computes);
     211           14253 :         if res {
     212           11376 :             let action_pending = if self.is_wal_backup_required(num_computes) {
     213           11298 :                 "start"
     214                 :             } else {
     215              78 :                 "stop"
     216                 :             };
     217           11376 :             trace!(
     218 UBC           0 :                 "timeline {} s3 offloading action {} pending: num_computes={}, commit_lsn={}, backup_lsn={}",
     219               0 :                 self.sk.state.timeline_id, action_pending, num_computes, self.sk.inmem.commit_lsn, self.sk.inmem.backup_lsn
     220               0 :             );
     221 CBC        2877 :         }
     222           14253 :         res
     223           14253 :     }
     224                 : 
     225                 :     /// Returns whether s3 offloading is required and sets current status as
     226                 :     /// matching.
     227             192 :     fn wal_backup_attend(&mut self, num_computes: usize) -> bool {
     228             192 :         self.wal_backup_active = self.is_wal_backup_required(num_computes);
     229             192 :         self.wal_backup_active
     230             192 :     }
     231                 : 
     232           40099 :     fn get_wal_seg_size(&self) -> usize {
     233           40099 :         self.sk.state.server.wal_seg_size as usize
     234           40099 :     }
     235                 : 
     236            6665 :     fn get_safekeeper_info(
     237            6665 :         &self,
     238            6665 :         ttid: &TenantTimelineId,
     239            6665 :         conf: &SafeKeeperConf,
     240            6665 :         remote_consistent_lsn: Lsn,
     241            6665 :     ) -> SafekeeperTimelineInfo {
     242            6665 :         SafekeeperTimelineInfo {
     243            6665 :             safekeeper_id: conf.my_id.0,
     244            6665 :             tenant_timeline_id: Some(ProtoTenantTimelineId {
     245            6665 :                 tenant_id: ttid.tenant_id.as_ref().to_owned(),
     246            6665 :                 timeline_id: ttid.timeline_id.as_ref().to_owned(),
     247            6665 :             }),
     248            6665 :             term: self.sk.state.acceptor_state.term,
     249            6665 :             last_log_term: self.sk.get_epoch(),
     250            6665 :             flush_lsn: self.sk.flush_lsn().0,
     251            6665 :             // note: this value is not flushed to control file yet and can be lost
     252            6665 :             commit_lsn: self.sk.inmem.commit_lsn.0,
     253            6665 :             remote_consistent_lsn: remote_consistent_lsn.0,
     254            6665 :             peer_horizon_lsn: self.sk.inmem.peer_horizon_lsn.0,
     255            6665 :             safekeeper_connstr: conf
     256            6665 :                 .advertise_pg_addr
     257            6665 :                 .to_owned()
     258            6665 :                 .unwrap_or(conf.listen_pg_addr.clone()),
     259            6665 :             http_connstr: conf.listen_http_addr.to_owned(),
     260            6665 :             backup_lsn: self.sk.inmem.backup_lsn.0,
     261            6665 :             local_start_lsn: self.sk.state.local_start_lsn.0,
     262            6665 :             availability_zone: conf.availability_zone.clone(),
     263            6665 :         }
     264            6665 :     }
     265                 : }
     266                 : 
     267 UBC           0 : #[derive(Debug, thiserror::Error)]
     268                 : pub enum TimelineError {
     269                 :     #[error("Timeline {0} was cancelled and cannot be used anymore")]
     270                 :     Cancelled(TenantTimelineId),
     271                 :     #[error("Timeline {0} was not found in global map")]
     272                 :     NotFound(TenantTimelineId),
     273                 :     #[error("Timeline {0} exists on disk, but wasn't loaded on startup")]
     274                 :     Invalid(TenantTimelineId),
     275                 :     #[error("Timeline {0} is already exists")]
     276                 :     AlreadyExists(TenantTimelineId),
     277                 :     #[error("Timeline {0} is not initialized, wal_seg_size is zero")]
     278                 :     UninitializedWalSegSize(TenantTimelineId),
     279                 :     #[error("Timeline {0} is not initialized, pg_version is unknown")]
     280                 :     UninitialinzedPgVersion(TenantTimelineId),
     281                 : }
     282                 : 
     283                 : // Convert to HTTP API error.
     284                 : impl From<TimelineError> for ApiError {
     285               0 :     fn from(te: TimelineError) -> ApiError {
     286               0 :         match te {
     287               0 :             TimelineError::NotFound(ttid) => {
     288               0 :                 ApiError::NotFound(anyhow!("timeline {} not found", ttid).into())
     289                 :             }
     290               0 :             _ => ApiError::InternalServerError(anyhow!("{}", te)),
     291                 :         }
     292               0 :     }
     293                 : }
     294                 : 
     295                 : /// Timeline struct manages lifecycle (creation, deletion, restore) of a safekeeper timeline.
     296                 : /// It also holds SharedState and provides mutually exclusive access to it.
     297                 : pub struct Timeline {
     298                 :     pub ttid: TenantTimelineId,
     299                 : 
     300                 :     /// Sending here asks for wal backup launcher attention (start/stop
     301                 :     /// offloading). Sending ttid instead of concrete command allows to do
     302                 :     /// sending without timeline lock.
     303                 :     pub wal_backup_launcher_tx: Sender<TenantTimelineId>,
     304                 : 
     305                 :     /// Used to broadcast commit_lsn updates to all background jobs.
     306                 :     commit_lsn_watch_tx: watch::Sender<Lsn>,
     307                 :     commit_lsn_watch_rx: watch::Receiver<Lsn>,
     308                 : 
     309                 :     /// Broadcasts (current term, flush_lsn) updates, walsender is interested in
     310                 :     /// them when sending in recovery mode (to walproposer or peers). Note: this
     311                 :     /// is just a notification, WAL reading should always done with lock held as
     312                 :     /// term can change otherwise.
     313                 :     term_flush_lsn_watch_tx: watch::Sender<TermLsn>,
     314                 :     term_flush_lsn_watch_rx: watch::Receiver<TermLsn>,
     315                 : 
     316                 :     /// Safekeeper and other state, that should remain consistent and
     317                 :     /// synchronized with the disk. This is tokio mutex as we write WAL to disk
     318                 :     /// while holding it, ensuring that consensus checks are in order.
     319                 :     mutex: Mutex<SharedState>,
     320                 :     walsenders: Arc<WalSenders>,
     321                 :     walreceivers: Arc<WalReceivers>,
     322                 : 
     323                 :     /// Cancellation channel. Delete/cancel will send `true` here as a cancellation signal.
     324                 :     cancellation_tx: watch::Sender<bool>,
     325                 : 
     326                 :     /// Timeline should not be used after cancellation. Background tasks should
     327                 :     /// monitor this channel and stop eventually after receiving `true` from this channel.
     328                 :     cancellation_rx: watch::Receiver<bool>,
     329                 : 
     330                 :     /// Directory where timeline state is stored.
     331                 :     pub timeline_dir: Utf8PathBuf,
     332                 : }
     333                 : 
     334                 : impl Timeline {
     335                 :     /// Load existing timeline from disk.
     336 CBC          84 :     pub fn load_timeline(
     337              84 :         conf: &SafeKeeperConf,
     338              84 :         ttid: TenantTimelineId,
     339              84 :         wal_backup_launcher_tx: Sender<TenantTimelineId>,
     340              84 :     ) -> Result<Timeline> {
     341              84 :         let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
     342                 : 
     343              84 :         let shared_state = SharedState::restore(conf, &ttid)?;
     344              84 :         let rcl = shared_state.sk.state.remote_consistent_lsn;
     345              84 :         let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
     346              84 :             watch::channel(shared_state.sk.state.commit_lsn);
     347              84 :         let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) = watch::channel(TermLsn::from((
     348              84 :             shared_state.sk.get_term(),
     349              84 :             shared_state.sk.flush_lsn(),
     350              84 :         )));
     351              84 :         let (cancellation_tx, cancellation_rx) = watch::channel(false);
     352              84 : 
     353              84 :         Ok(Timeline {
     354              84 :             ttid,
     355              84 :             wal_backup_launcher_tx,
     356              84 :             commit_lsn_watch_tx,
     357              84 :             commit_lsn_watch_rx,
     358              84 :             term_flush_lsn_watch_tx,
     359              84 :             term_flush_lsn_watch_rx,
     360              84 :             mutex: Mutex::new(shared_state),
     361              84 :             walsenders: WalSenders::new(rcl),
     362              84 :             walreceivers: WalReceivers::new(),
     363              84 :             cancellation_rx,
     364              84 :             cancellation_tx,
     365              84 :             timeline_dir: conf.timeline_dir(&ttid),
     366              84 :         })
     367              84 :     }
     368                 : 
     369                 :     /// Create a new timeline, which is not yet persisted to disk.
     370             481 :     pub fn create_empty(
     371             481 :         conf: &SafeKeeperConf,
     372             481 :         ttid: TenantTimelineId,
     373             481 :         wal_backup_launcher_tx: Sender<TenantTimelineId>,
     374             481 :         server_info: ServerInfo,
     375             481 :         commit_lsn: Lsn,
     376             481 :         local_start_lsn: Lsn,
     377             481 :     ) -> Result<Timeline> {
     378             481 :         let (commit_lsn_watch_tx, commit_lsn_watch_rx) = watch::channel(Lsn::INVALID);
     379             481 :         let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) =
     380             481 :             watch::channel(TermLsn::from((INVALID_TERM, Lsn::INVALID)));
     381             481 :         let (cancellation_tx, cancellation_rx) = watch::channel(false);
     382             481 :         let state = SafeKeeperState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn);
     383             481 : 
     384             481 :         Ok(Timeline {
     385             481 :             ttid,
     386             481 :             wal_backup_launcher_tx,
     387             481 :             commit_lsn_watch_tx,
     388             481 :             commit_lsn_watch_rx,
     389             481 :             term_flush_lsn_watch_tx,
     390             481 :             term_flush_lsn_watch_rx,
     391             481 :             mutex: Mutex::new(SharedState::create_new(conf, &ttid, state)?),
     392             481 :             walsenders: WalSenders::new(Lsn(0)),
     393             481 :             walreceivers: WalReceivers::new(),
     394             481 :             cancellation_rx,
     395             481 :             cancellation_tx,
     396             481 :             timeline_dir: conf.timeline_dir(&ttid),
     397                 :         })
     398             481 :     }
     399                 : 
     400                 :     /// Initialize fresh timeline on disk and start background tasks. If init
     401                 :     /// fails, timeline is cancelled and cannot be used anymore.
     402                 :     ///
     403                 :     /// Init is transactional, so if it fails, created files will be deleted,
     404                 :     /// and state on disk should remain unchanged.
     405             481 :     pub async fn init_new(
     406             481 :         self: &Arc<Timeline>,
     407             481 :         shared_state: &mut MutexGuard<'_, SharedState>,
     408             481 :         conf: &SafeKeeperConf,
     409             481 :     ) -> Result<()> {
     410             548 :         match fs::metadata(&self.timeline_dir).await {
     411                 :             Ok(_) => {
     412                 :                 // Timeline directory exists on disk, we should leave state unchanged
     413                 :                 // and return error.
     414 UBC           0 :                 bail!(TimelineError::Invalid(self.ttid));
     415                 :             }
     416 CBC         481 :             Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
     417 UBC           0 :             Err(e) => {
     418               0 :                 return Err(e.into());
     419                 :             }
     420                 :         }
     421                 : 
     422                 :         // Create timeline directory.
     423 CBC         495 :         fs::create_dir_all(&self.timeline_dir).await?;
     424                 : 
     425                 :         // Write timeline to disk and start background tasks.
     426            1559 :         if let Err(e) = shared_state.sk.persist().await {
     427                 :             // Bootstrap failed, cancel timeline and remove timeline directory.
     428 UBC           0 :             self.cancel(shared_state);
     429                 : 
     430               0 :             if let Err(fs_err) = fs::remove_dir_all(&self.timeline_dir).await {
     431               0 :                 warn!(
     432               0 :                     "failed to remove timeline {} directory after bootstrap failure: {}",
     433               0 :                     self.ttid, fs_err
     434               0 :                 );
     435               0 :             }
     436                 : 
     437               0 :             return Err(e);
     438 CBC         481 :         }
     439             481 :         self.bootstrap(conf);
     440             481 :         Ok(())
     441             481 :     }
     442                 : 
     443                 :     /// Bootstrap new or existing timeline starting background stasks.
     444             565 :     pub fn bootstrap(self: &Arc<Timeline>, conf: &SafeKeeperConf) {
     445             565 :         // Start recovery task which always runs on the timeline.
     446             565 :         tokio::spawn(recovery_main(self.clone(), conf.clone()));
     447             565 :     }
     448                 : 
     449                 :     /// Delete timeline from disk completely, by removing timeline directory. Background
     450                 :     /// timeline activities will stop eventually.
     451              31 :     pub async fn delete_from_disk(
     452              31 :         &self,
     453              31 :         shared_state: &mut MutexGuard<'_, SharedState>,
     454              31 :     ) -> Result<(bool, bool)> {
     455              31 :         let was_active = shared_state.active;
     456              31 :         self.cancel(shared_state);
     457              31 :         let dir_existed = delete_dir(&self.timeline_dir).await?;
     458              31 :         Ok((dir_existed, was_active))
     459              31 :     }
     460                 : 
     461                 :     /// Cancel timeline to prevent further usage. Background tasks will stop
     462                 :     /// eventually after receiving cancellation signal.
     463                 :     ///
     464                 :     /// Note that we can't notify backup launcher here while holding
     465                 :     /// shared_state lock, as this is a potential deadlock: caller is
     466                 :     /// responsible for that. Generally we should probably make WAL backup tasks
     467                 :     /// to shut down on their own, checking once in a while whether it is the
     468                 :     /// time.
     469              31 :     fn cancel(&self, shared_state: &mut MutexGuard<'_, SharedState>) {
     470              31 :         info!("timeline {} is cancelled", self.ttid);
     471              31 :         let _ = self.cancellation_tx.send(true);
     472              31 :         // Close associated FDs. Nobody will be able to touch timeline data once
     473              31 :         // it is cancelled, so WAL storage won't be opened again.
     474              31 :         shared_state.sk.wal_store.close();
     475              31 :     }
     476                 : 
     477                 :     /// Returns if timeline is cancelled.
     478         5413907 :     pub fn is_cancelled(&self) -> bool {
     479         5413907 :         *self.cancellation_rx.borrow()
     480         5413907 :     }
     481                 : 
     482                 :     /// Returns watch channel which gets value when timeline is cancelled. It is
     483                 :     /// guaranteed to have not cancelled value observed (errors otherwise).
     484             565 :     pub fn get_cancellation_rx(&self) -> Result<watch::Receiver<bool>> {
     485             565 :         let rx = self.cancellation_rx.clone();
     486             565 :         if *rx.borrow() {
     487 UBC           0 :             bail!(TimelineError::Cancelled(self.ttid));
     488 CBC         565 :         }
     489             565 :         Ok(rx)
     490             565 :     }
     491                 : 
     492                 :     /// Take a writing mutual exclusive lock on timeline shared_state.
     493         5416017 :     pub async fn write_shared_state(&self) -> MutexGuard<SharedState> {
     494         5416017 :         self.mutex.lock().await
     495         5416017 :     }
     496                 : 
     497           14253 :     fn update_status(&self, shared_state: &mut SharedState) -> bool {
     498           14253 :         shared_state.update_status(
     499           14253 :             self.walreceivers.get_num(),
     500           14253 :             self.get_walsenders().get_remote_consistent_lsn(),
     501           14253 :             self.ttid,
     502           14253 :         )
     503           14253 :     }
     504                 : 
     505                 :     /// Update timeline status and kick wal backup launcher to stop/start offloading if needed.
     506            4511 :     pub async fn update_status_notify(&self) -> Result<()> {
     507            4511 :         if self.is_cancelled() {
     508 UBC           0 :             bail!(TimelineError::Cancelled(self.ttid));
     509 CBC        4511 :         }
     510            4511 :         let is_wal_backup_action_pending: bool = {
     511            4511 :             let mut shared_state = self.write_shared_state().await;
     512            4511 :             self.update_status(&mut shared_state)
     513            4511 :         };
     514            4511 :         if is_wal_backup_action_pending {
     515                 :             // Can fail only if channel to a static thread got closed, which is not normal at all.
     516            2866 :             self.wal_backup_launcher_tx.send(self.ttid).await?;
     517            1645 :         }
     518            4511 :         Ok(())
     519            4511 :     }
     520                 : 
     521                 :     /// Returns true if walsender should stop sending WAL to pageserver. We
     522                 :     /// terminate it if remote_consistent_lsn reached commit_lsn and there is no
     523                 :     /// computes. While there might be nothing to stream already, we learn about
     524                 :     /// remote_consistent_lsn update through replication feedback, and we want
     525                 :     /// to stop pushing to the broker if pageserver is fully caughtup.
     526            1999 :     pub async fn should_walsender_stop(&self, reported_remote_consistent_lsn: Lsn) -> bool {
     527            1999 :         if self.is_cancelled() {
     528 LBC         (1) :             return true;
     529 CBC        1999 :         }
     530            1999 :         let shared_state = self.write_shared_state().await;
     531            1999 :         if self.walreceivers.get_num() == 0 {
     532             463 :             return shared_state.sk.inmem.commit_lsn == Lsn(0) || // no data at all yet
     533             463 :             reported_remote_consistent_lsn >= shared_state.sk.inmem.commit_lsn;
     534            1536 :         }
     535            1536 :         false
     536            1999 :     }
     537                 : 
     538                 :     /// Ensure taht current term is t, erroring otherwise, and lock the state.
     539               1 :     pub async fn acquire_term(&self, t: Term) -> Result<MutexGuard<SharedState>> {
     540               1 :         let ss = self.write_shared_state().await;
     541               1 :         if ss.sk.state.acceptor_state.term != t {
     542               1 :             bail!(
     543               1 :                 "failed to acquire term {}, current term {}",
     544               1 :                 t,
     545               1 :                 ss.sk.state.acceptor_state.term
     546               1 :             );
     547 UBC           0 :         }
     548               0 :         Ok(ss)
     549 CBC           1 :     }
     550                 : 
     551                 :     /// Returns whether s3 offloading is required and sets current status as
     552                 :     /// matching it.
     553             192 :     pub async fn wal_backup_attend(&self) -> bool {
     554             192 :         if self.is_cancelled() {
     555 UBC           0 :             return false;
     556 CBC         192 :         }
     557             192 : 
     558             192 :         self.write_shared_state()
     559              39 :             .await
     560             192 :             .wal_backup_attend(self.walreceivers.get_num())
     561             192 :     }
     562                 : 
     563                 :     /// Returns commit_lsn watch channel.
     564             714 :     pub fn get_commit_lsn_watch_rx(&self) -> watch::Receiver<Lsn> {
     565             714 :         self.commit_lsn_watch_rx.clone()
     566             714 :     }
     567                 : 
     568                 :     /// Returns term_flush_lsn watch channel.
     569              94 :     pub fn get_term_flush_lsn_watch_rx(&self) -> watch::Receiver<TermLsn> {
     570              94 :         self.term_flush_lsn_watch_rx.clone()
     571              94 :     }
     572                 : 
     573                 :     /// Pass arrived message to the safekeeper.
     574         5377466 :     pub async fn process_msg(
     575         5377466 :         &self,
     576         5377466 :         msg: &ProposerAcceptorMessage,
     577         5377466 :     ) -> Result<Option<AcceptorProposerMessage>> {
     578         5377466 :         if self.is_cancelled() {
     579 UBC           0 :             bail!(TimelineError::Cancelled(self.ttid));
     580 CBC     5377466 :         }
     581                 : 
     582                 :         let mut rmsg: Option<AcceptorProposerMessage>;
     583                 :         let commit_lsn: Lsn;
     584                 :         let term_flush_lsn: TermLsn;
     585                 :         {
     586         5377466 :             let mut shared_state = self.write_shared_state().await;
     587         5377466 :             rmsg = shared_state.sk.process_msg(msg).await?;
     588                 : 
     589                 :             // if this is AppendResponse, fill in proper pageserver and hot
     590                 :             // standby feedback.
     591         5377466 :             if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg {
     592         2459760 :                 let (ps_feedback, hs_feedback) = self.walsenders.get_feedbacks();
     593         2459760 :                 resp.hs_feedback = hs_feedback;
     594         2459760 :                 resp.pageserver_feedback = ps_feedback;
     595         2917706 :             }
     596                 : 
     597         5377466 :             commit_lsn = shared_state.sk.inmem.commit_lsn;
     598         5377466 :             term_flush_lsn =
     599         5377466 :                 TermLsn::from((shared_state.sk.get_term(), shared_state.sk.flush_lsn()));
     600         5377466 :         }
     601         5377466 :         self.commit_lsn_watch_tx.send(commit_lsn)?;
     602         5377466 :         self.term_flush_lsn_watch_tx.send(term_flush_lsn)?;
     603         5377466 :         Ok(rmsg)
     604         5377466 :     }
     605                 : 
     606                 :     /// Returns wal_seg_size.
     607              25 :     pub async fn get_wal_seg_size(&self) -> usize {
     608              25 :         self.write_shared_state().await.get_wal_seg_size()
     609              25 :     }
     610                 : 
     611                 :     /// Returns true only if the timeline is loaded and active.
     612            8132 :     pub async fn is_active(&self) -> bool {
     613            8132 :         if self.is_cancelled() {
     614 UBC           0 :             return false;
     615 CBC        8132 :         }
     616            8132 : 
     617            8132 :         self.write_shared_state().await.active
     618            8132 :     }
     619                 : 
     620                 :     /// Returns state of the timeline.
     621            2678 :     pub async fn get_state(&self) -> (SafekeeperMemState, SafeKeeperState) {
     622            2678 :         let state = self.write_shared_state().await;
     623            2678 :         (state.sk.inmem.clone(), state.sk.state.clone())
     624            2678 :     }
     625                 : 
     626                 :     /// Returns latest backup_lsn.
     627             297 :     pub async fn get_wal_backup_lsn(&self) -> Lsn {
     628             297 :         self.write_shared_state().await.sk.inmem.backup_lsn
     629             297 :     }
     630                 : 
     631                 :     /// Sets backup_lsn to the given value.
     632              20 :     pub async fn set_wal_backup_lsn(&self, backup_lsn: Lsn) -> Result<()> {
     633              20 :         if self.is_cancelled() {
     634 UBC           0 :             bail!(TimelineError::Cancelled(self.ttid));
     635 CBC          20 :         }
     636                 : 
     637              20 :         let mut state = self.write_shared_state().await;
     638              20 :         state.sk.inmem.backup_lsn = max(state.sk.inmem.backup_lsn, backup_lsn);
     639              20 :         // we should check whether to shut down offloader, but this will be done
     640              20 :         // soon by peer communication anyway.
     641              20 :         Ok(())
     642              20 :     }
     643                 : 
     644                 :     /// Get safekeeper info for broadcasting to broker and other peers.
     645            6665 :     pub async fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SafekeeperTimelineInfo {
     646            6665 :         let shared_state = self.write_shared_state().await;
     647            6665 :         shared_state.get_safekeeper_info(
     648            6665 :             &self.ttid,
     649            6665 :             conf,
     650            6665 :             self.walsenders.get_remote_consistent_lsn(),
     651            6665 :         )
     652            6665 :     }
     653                 : 
     654                 :     /// Update timeline state with peer safekeeper data.
     655            9742 :     pub async fn record_safekeeper_info(&self, mut sk_info: SafekeeperTimelineInfo) -> Result<()> {
     656            9742 :         // Update local remote_consistent_lsn in memory (in .walsenders) and in
     657            9742 :         // sk_info to pass it down to control file.
     658            9742 :         sk_info.remote_consistent_lsn = self
     659            9742 :             .walsenders
     660            9742 :             .update_remote_consistent_lsn(Lsn(sk_info.remote_consistent_lsn))
     661            9742 :             .0;
     662                 :         let is_wal_backup_action_pending: bool;
     663                 :         let commit_lsn: Lsn;
     664                 :         {
     665            9742 :             let mut shared_state = self.write_shared_state().await;
     666            9742 :             shared_state.sk.record_safekeeper_info(&sk_info).await?;
     667            9742 :             let peer_info = PeerInfo::from_sk_info(&sk_info, Instant::now());
     668            9742 :             shared_state.peers_info.upsert(&peer_info);
     669            9742 :             is_wal_backup_action_pending = self.update_status(&mut shared_state);
     670            9742 :             commit_lsn = shared_state.sk.inmem.commit_lsn;
     671            9742 :         }
     672            9742 :         self.commit_lsn_watch_tx.send(commit_lsn)?;
     673                 :         // Wake up wal backup launcher, if it is time to stop the offloading.
     674            9742 :         if is_wal_backup_action_pending {
     675            8510 :             self.wal_backup_launcher_tx.send(self.ttid).await?;
     676            1232 :         }
     677            9742 :         Ok(())
     678            9742 :     }
     679                 : 
     680                 :     /// Get our latest view of alive peers status on the timeline.
     681                 :     /// We pass our own info through the broker as well, so when we don't have connection
     682                 :     /// to the broker returned vec is empty.
     683             472 :     pub async fn get_peers(&self, conf: &SafeKeeperConf) -> Vec<PeerInfo> {
     684             472 :         let shared_state = self.write_shared_state().await;
     685             472 :         let now = Instant::now();
     686             472 :         shared_state
     687             472 :             .peers_info
     688             472 :             .0
     689             472 :             .iter()
     690             472 :             // Regard peer as absent if we haven't heard from it within heartbeat_timeout.
     691            1183 :             .filter(|p| now.duration_since(p.ts) <= conf.heartbeat_timeout)
     692             472 :             .cloned()
     693             472 :             .collect()
     694             472 :     }
     695                 : 
     696           15511 :     pub fn get_walsenders(&self) -> &Arc<WalSenders> {
     697           15511 :         &self.walsenders
     698           15511 :     }
     699                 : 
     700            2224 :     pub fn get_walreceivers(&self) -> &Arc<WalReceivers> {
     701            2224 :         &self.walreceivers
     702            2224 :     }
     703                 : 
     704                 :     /// Returns flush_lsn.
     705             488 :     pub async fn get_flush_lsn(&self) -> Lsn {
     706             488 :         self.write_shared_state().await.sk.wal_store.flush_lsn()
     707             488 :     }
     708                 : 
     709                 :     /// Delete WAL segments from disk that are no longer needed. This is determined
     710                 :     /// based on pageserver's remote_consistent_lsn and local backup_lsn/peer_lsn.
     711            1363 :     pub async fn remove_old_wal(&self, wal_backup_enabled: bool) -> Result<()> {
     712            1363 :         if self.is_cancelled() {
     713 UBC           0 :             bail!(TimelineError::Cancelled(self.ttid));
     714 CBC        1363 :         }
     715                 : 
     716                 :         let horizon_segno: XLogSegNo;
     717              35 :         let remover = {
     718            1363 :             let shared_state = self.write_shared_state().await;
     719            1363 :             horizon_segno = shared_state.sk.get_horizon_segno(wal_backup_enabled);
     720            1363 :             if horizon_segno <= 1 || horizon_segno <= shared_state.last_removed_segno {
     721            1328 :                 return Ok(()); // nothing to do
     722              35 :             }
     723              35 : 
     724              35 :             // release the lock before removing
     725              35 :             shared_state.sk.wal_store.remove_up_to(horizon_segno - 1)
     726              35 :         };
     727              35 : 
     728              35 :         // delete old WAL files
     729              96 :         remover.await?;
     730                 : 
     731                 :         // update last_removed_segno
     732              35 :         let mut shared_state = self.write_shared_state().await;
     733              35 :         shared_state.last_removed_segno = horizon_segno;
     734              35 :         Ok(())
     735            1363 :     }
     736                 : 
     737                 :     /// Persist control file if there is something to save and enough time
     738                 :     /// passed after the last save. This helps to keep remote_consistent_lsn up
     739                 :     /// to date so that storage nodes restart doesn't cause many pageserver ->
     740                 :     /// safekeeper reconnections.
     741            1363 :     pub async fn maybe_persist_control_file(&self) -> Result<()> {
     742            1363 :         let remote_consistent_lsn = self.walsenders.get_remote_consistent_lsn();
     743            1363 :         self.write_shared_state()
     744              35 :             .await
     745                 :             .sk
     746            1363 :             .maybe_persist_control_file(remote_consistent_lsn)
     747              81 :             .await
     748            1363 :     }
     749                 : 
     750                 :     /// Gather timeline data for metrics. If the timeline is not active, returns
     751                 :     /// None, we do not collect these.
     752              51 :     pub async fn info_for_metrics(&self) -> Option<FullTimelineInfo> {
     753              51 :         if self.is_cancelled() {
     754 UBC           0 :             return None;
     755 CBC          51 :         }
     756              51 : 
     757              51 :         let ps_feedback = self.walsenders.get_ps_feedback();
     758              51 :         let state = self.write_shared_state().await;
     759              51 :         if state.active {
     760              51 :             Some(FullTimelineInfo {
     761              51 :                 ttid: self.ttid,
     762              51 :                 ps_feedback,
     763              51 :                 wal_backup_active: state.wal_backup_active,
     764              51 :                 timeline_is_active: state.active,
     765              51 :                 num_computes: self.walreceivers.get_num() as u32,
     766              51 :                 last_removed_segno: state.last_removed_segno,
     767              51 :                 epoch_start_lsn: state.sk.epoch_start_lsn,
     768              51 :                 mem_state: state.sk.inmem.clone(),
     769              51 :                 persisted_state: state.sk.state.clone(),
     770              51 :                 flush_lsn: state.sk.wal_store.flush_lsn(),
     771              51 :                 remote_consistent_lsn: self.get_walsenders().get_remote_consistent_lsn(),
     772              51 :                 wal_storage: state.sk.wal_store.get_metrics(),
     773              51 :             })
     774                 :         } else {
     775 UBC           0 :             None
     776                 :         }
     777 CBC          51 :     }
     778                 : 
     779                 :     /// Returns in-memory timeline state to build a full debug dump.
     780               5 :     pub async fn memory_dump(&self) -> debug_dump::Memory {
     781               5 :         let state = self.write_shared_state().await;
     782                 : 
     783               5 :         let (write_lsn, write_record_lsn, flush_lsn, file_open) =
     784               5 :             state.sk.wal_store.internal_state();
     785               5 : 
     786               5 :         debug_dump::Memory {
     787               5 :             is_cancelled: self.is_cancelled(),
     788               5 :             peers_info_len: state.peers_info.0.len(),
     789               5 :             walsenders: self.walsenders.get_all(),
     790               5 :             wal_backup_active: state.wal_backup_active,
     791               5 :             active: state.active,
     792               5 :             num_computes: self.walreceivers.get_num() as u32,
     793               5 :             last_removed_segno: state.last_removed_segno,
     794               5 :             epoch_start_lsn: state.sk.epoch_start_lsn,
     795               5 :             mem_state: state.sk.inmem.clone(),
     796               5 :             write_lsn,
     797               5 :             write_record_lsn,
     798               5 :             flush_lsn,
     799               5 :             file_open,
     800               5 :         }
     801               5 :     }
     802                 : }
     803                 : 
     804                 : /// Deletes directory and it's contents. Returns false if directory does not exist.
     805              31 : async fn delete_dir(path: &Utf8PathBuf) -> Result<bool> {
     806              31 :     match fs::remove_dir_all(path).await {
     807              17 :         Ok(_) => Ok(true),
     808              14 :         Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
     809 UBC           0 :         Err(e) => Err(e.into()),
     810                 :     }
     811 CBC          31 : }
        

Generated by: LCOV version 2.1-beta