LCOV - code coverage report
Current view: top level - safekeeper/src - timeline.rs (source / functions)
Test:      8ac049b474321fdc72ddcb56d7165153a1a900e8.info
Test Date: 2023-09-06 10:18:01
Coverage:  Lines: 90.9 % (447 of 492)    Functions: 80.6 % (79 of 98)

            Line data    Source code
       1              : //! This module implements Timeline lifecycle management and has all necessary code
       2              : //! to glue together SafeKeeper and all other background services.
       3              : 
       4              : use anyhow::{anyhow, bail, Result};
       5              : use postgres_ffi::XLogSegNo;
       6              : use serde::{Deserialize, Serialize};
       7              : use serde_with::serde_as;
       8              : use tokio::fs;
       9              : 
      10              : use serde_with::DisplayFromStr;
      11              : use std::cmp::max;
      12              : use std::path::PathBuf;
      13              : use std::sync::Arc;
      14              : use tokio::sync::{Mutex, MutexGuard};
      15              : use tokio::{
      16              :     sync::{mpsc::Sender, watch},
      17              :     time::Instant,
      18              : };
      19              : use tracing::*;
      20              : use utils::http::error::ApiError;
      21              : use utils::{
      22              :     id::{NodeId, TenantTimelineId},
      23              :     lsn::Lsn,
      24              : };
      25              : 
      26              : use storage_broker::proto::SafekeeperTimelineInfo;
      27              : use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
      28              : 
      29              : use crate::receive_wal::WalReceivers;
      30              : use crate::recovery::recovery_main;
      31              : use crate::safekeeper::{
      32              :     AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState,
      33              :     SafekeeperMemState, ServerInfo, Term, TermLsn, INVALID_TERM,
      34              : };
      35              : use crate::send_wal::WalSenders;
      36              : use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};
      37              : 
      38              : use crate::metrics::FullTimelineInfo;
      39              : use crate::wal_storage::Storage as wal_storage_iface;
      40              : use crate::SafeKeeperConf;
      41              : use crate::{debug_dump, wal_storage};
      42              : 
      43              : /// Things safekeeper should know about timeline state on peers.
      44              : #[serde_as]
      45        10150 : #[derive(Debug, Clone, Serialize, Deserialize)]
      46              : pub struct PeerInfo {
      47              :     pub sk_id: NodeId,
      48              :     /// Term of the last entry.
      49              :     _last_log_term: Term,
      50              :     /// LSN of the last record.
      51              :     #[serde_as(as = "DisplayFromStr")]
      52              :     _flush_lsn: Lsn,
      53              :     #[serde_as(as = "DisplayFromStr")]
      54              :     pub commit_lsn: Lsn,
      55              :     /// LSN since which the safekeeper has WAL. TODO: remove this once we fill a
      56              :     /// new sk starting from backup_lsn.
      57              :     #[serde_as(as = "DisplayFromStr")]
      58              :     pub local_start_lsn: Lsn,
      59              :     /// When info was received. Serde annotations are not very useful but make
      60              :     /// the code compile -- we don't rely on this field externally.
      61              :     #[serde(skip)]
      62              :     #[serde(default = "Instant::now")]
      63              :     ts: Instant,
      64              : }
      65              : 
      66              : impl PeerInfo {
      67         9077 :     fn from_sk_info(sk_info: &SafekeeperTimelineInfo, ts: Instant) -> PeerInfo {
      68         9077 :         PeerInfo {
      69         9077 :             sk_id: NodeId(sk_info.safekeeper_id),
      70         9077 :             _last_log_term: sk_info.last_log_term,
      71         9077 :             _flush_lsn: Lsn(sk_info.flush_lsn),
      72         9077 :             commit_lsn: Lsn(sk_info.commit_lsn),
      73         9077 :             local_start_lsn: Lsn(sk_info.local_start_lsn),
      74         9077 :             ts,
      75         9077 :         }
      76         9077 :     }
      77              : }
      78              : 
      79              : // Vector-based node id -> peer state map with the very limited functionality
      80              : // we need.
      81            0 : #[derive(Debug, Clone, Default)]
      82              : pub struct PeersInfo(pub Vec<PeerInfo>);
      83              : 
      84              : impl PeersInfo {
      85         9077 :     fn get(&mut self, id: NodeId) -> Option<&mut PeerInfo> {
      86        12312 :         self.0.iter_mut().find(|p| p.sk_id == id)
      87         9077 :     }
      88              : 
      89         9077 :     fn upsert(&mut self, p: &PeerInfo) {
      90         9077 :         match self.get(p.sk_id) {
      91         8189 :             Some(rp) => *rp = p.clone(),
      92          888 :             None => self.0.push(p.clone()),
      93              :         }
      94         9077 :     }
      95              : }
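
A minimal standalone sketch of the upsert-into-Vec pattern used by PeersInfo above, with a simplified stand-in Peer type rather than the crate's PeerInfo:

```rust
#[derive(Clone, Debug)]
struct Peer {
    id: u64,
    commit_lsn: u64,
}

#[derive(Default)]
struct Peers(Vec<Peer>);

impl Peers {
    fn upsert(&mut self, p: &Peer) {
        match self.0.iter_mut().find(|x| x.id == p.id) {
            Some(existing) => *existing = p.clone(), // later message from a known peer: replace
            None => self.0.push(p.clone()),          // first message from this peer: append
        }
    }
}

fn main() {
    let mut peers = Peers::default();
    peers.upsert(&Peer { id: 1, commit_lsn: 10 });
    peers.upsert(&Peer { id: 1, commit_lsn: 20 }); // replaces the previous entry
    assert_eq!(peers.0.len(), 1);
    assert_eq!(peers.0[0].commit_lsn, 20);
}
```

Linear search is fine here because upsert keeps at most one entry per node id.
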
      96              : 
      97              : /// Shared state associated with a database instance
      98              : pub struct SharedState {
      99              :     /// Safekeeper object
     100              :     sk: SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>,
     101              :     /// In memory list containing state of peers sent in latest messages from them.
     102              :     peers_info: PeersInfo,
     103              :     /// True when the WAL backup launcher oversees the timeline, making sure WAL
     104              :     /// is offloaded; this lets us bother the launcher less.
     105              :     wal_backup_active: bool,
     106              :     /// True whenever there is at least some pending activity on the timeline:
     107              :     /// a live compute connection, a pageserver that is not caught up (it must
     108              :     /// have the latest WAL for a new compute to start), or unfinished WAL
     109              :     /// backup. In practice it means safekeepers broadcast info about the
     110              :     /// timeline to peers and old WAL is trimmed.
     111              :     ///
     112              :     /// TODO: it might be better to remove tli completely from GlobalTimelines
     113              :     /// when tli is inactive instead of having this flag.
     114              :     active: bool,
     115              :     num_computes: u32,
     116              :     last_removed_segno: XLogSegNo,
     117              : }
     118              : 
     119              : impl SharedState {
     120              :     /// Initialize fresh timeline state without persisting anything to disk.
     121          523 :     fn create_new(
     122          523 :         conf: &SafeKeeperConf,
     123          523 :         ttid: &TenantTimelineId,
     124          523 :         state: SafeKeeperState,
     125          523 :     ) -> Result<Self> {
     126          523 :         if state.server.wal_seg_size == 0 {
     127            0 :             bail!(TimelineError::UninitializedWalSegSize(*ttid));
     128          523 :         }
     129          523 : 
     130          523 :         if state.server.pg_version == UNKNOWN_SERVER_VERSION {
     131            0 :             bail!(TimelineError::UninitializedPgVersion(*ttid));
     132          523 :         }
     133          523 : 
     134          523 :         if state.commit_lsn < state.local_start_lsn {
     135            0 :             bail!(
     136            0 :                 "commit_lsn {} is higher than local_start_lsn {}",
     137            0 :                 state.commit_lsn,
     138            0 :                 state.local_start_lsn
     139            0 :             );
     140          523 :         }
     141              : 
     142              :         // We don't want to write anything to disk, because we may have existing timeline there.
     143              :         // These functions should not change anything on disk.
     144          523 :         let control_store = control_file::FileStorage::create_new(ttid, conf, state)?;
     145          523 :         let wal_store =
     146          523 :             wal_storage::PhysicalStorage::new(ttid, conf.timeline_dir(ttid), conf, &control_store)?;
     147          523 :         let sk = SafeKeeper::new(control_store, wal_store, conf.my_id)?;
     148              : 
     149          523 :         Ok(Self {
     150          523 :             sk,
     151          523 :             peers_info: PeersInfo(vec![]),
     152          523 :             wal_backup_active: false,
     153          523 :             active: false,
     154          523 :             num_computes: 0,
     155          523 :             last_removed_segno: 0,
     156          523 :         })
     157          523 :     }
     158              : 
     159              :     /// Restore SharedState from control file. If file doesn't exist, bails out.
     160           81 :     fn restore(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Result<Self> {
     161           81 :         let control_store = control_file::FileStorage::restore_new(ttid, conf)?;
     162           81 :         if control_store.server.wal_seg_size == 0 {
     163            0 :             bail!(TimelineError::UninitializedWalSegSize(*ttid));
     164           81 :         }
     165              : 
     166           81 :         let wal_store =
     167           81 :             wal_storage::PhysicalStorage::new(ttid, conf.timeline_dir(ttid), conf, &control_store)?;
     168              : 
     169              :         Ok(Self {
     170           81 :             sk: SafeKeeper::new(control_store, wal_store, conf.my_id)?,
     171           81 :             peers_info: PeersInfo(vec![]),
     172              :             wal_backup_active: false,
     173              :             active: false,
     174              :             num_computes: 0,
     175              :             last_removed_segno: 0,
     176              :         })
     177           81 :     }
     178              : 
     179        13817 :     fn is_active(&self, num_computes: usize, remote_consistent_lsn: Lsn) -> bool {
     180        13817 :         self.is_wal_backup_required(num_computes)
     181              :             // FIXME: add tracking of relevant pageservers and check them here individually,
     182              :             // otherwise migration won't work (we suspend too early).
     183         2569 :             || remote_consistent_lsn < self.sk.inmem.commit_lsn
     184        13817 :     }
     185              : 
     186              :     /// Mark timeline active/inactive and return whether s3 offloading requires
     187              :     /// start/stop action.
     188        13817 :     fn update_status(
     189        13817 :         &mut self,
     190        13817 :         num_computes: usize,
     191        13817 :         remote_consistent_lsn: Lsn,
     192        13817 :         ttid: TenantTimelineId,
     193        13817 :     ) -> bool {
     194        13817 :         let is_active = self.is_active(num_computes, remote_consistent_lsn);
     195        13817 :         if self.active != is_active {
     196         1702 :             info!("timeline {} active={} now", ttid, is_active);
     197        12115 :         }
     198        13817 :         self.active = is_active;
     199        13817 :         self.is_wal_backup_action_pending(num_computes)
     200        13817 :     }
     201              : 
     202              :     /// Should we run s3 offloading in current state?
     203        38737 :     fn is_wal_backup_required(&self, num_computes: usize) -> bool {
     204        38737 :         let seg_size = self.get_wal_seg_size();
     205        38737 :         num_computes > 0 ||
     206              :         // Currently only the whole segment is offloaded, so compare segment numbers.
     207         9980 :             (self.sk.inmem.commit_lsn.segment_number(seg_size) >
     208         9980 :              self.sk.inmem.backup_lsn.segment_number(seg_size))
     209        38737 :     }
     210              : 
     211              :     /// Is the current state of s3 offloading not what it ought to be?
     212        13817 :     fn is_wal_backup_action_pending(&self, num_computes: usize) -> bool {
     213        13817 :         let res = self.wal_backup_active != self.is_wal_backup_required(num_computes);
     214        13817 :         if res {
     215        10915 :             let action_pending = if self.is_wal_backup_required(num_computes) {
     216        10841 :                 "start"
     217              :             } else {
     218           74 :                 "stop"
     219              :             };
     220        10915 :             trace!(
     221            0 :                 "timeline {} s3 offloading action {} pending: num_computes={}, commit_lsn={}, backup_lsn={}",
     222            0 :                 self.sk.state.timeline_id, action_pending, self.num_computes, self.sk.inmem.commit_lsn, self.sk.inmem.backup_lsn
     223            0 :             );
     224         2902 :         }
     225        13817 :         res
     226        13817 :     }
     227              : 
     228              :     /// Returns whether s3 offloading is required and sets current status as
     229              :     /// matching.
     230          188 :     fn wal_backup_attend(&mut self, num_computes: usize) -> bool {
     231          188 :         self.wal_backup_active = self.is_wal_backup_required(num_computes);
     232          188 :         self.wal_backup_active
     233          188 :     }
     234              : 
     235        38762 :     fn get_wal_seg_size(&self) -> usize {
     236        38762 :         self.sk.state.server.wal_seg_size as usize
     237        38762 :     }
     238              : 
     239         6300 :     fn get_safekeeper_info(
     240         6300 :         &self,
     241         6300 :         ttid: &TenantTimelineId,
     242         6300 :         conf: &SafeKeeperConf,
     243         6300 :         remote_consistent_lsn: Lsn,
     244         6300 :     ) -> SafekeeperTimelineInfo {
     245         6300 :         SafekeeperTimelineInfo {
     246         6300 :             safekeeper_id: conf.my_id.0,
     247         6300 :             tenant_timeline_id: Some(ProtoTenantTimelineId {
     248         6300 :                 tenant_id: ttid.tenant_id.as_ref().to_owned(),
     249         6300 :                 timeline_id: ttid.timeline_id.as_ref().to_owned(),
     250         6300 :             }),
     251         6300 :             term: self.sk.state.acceptor_state.term,
     252         6300 :             last_log_term: self.sk.get_epoch(),
     253         6300 :             flush_lsn: self.sk.flush_lsn().0,
     254         6300 :             // note: this value is not flushed to control file yet and can be lost
     255         6300 :             commit_lsn: self.sk.inmem.commit_lsn.0,
     256         6300 :             remote_consistent_lsn: remote_consistent_lsn.0,
     257         6300 :             peer_horizon_lsn: self.sk.inmem.peer_horizon_lsn.0,
     258         6300 :             safekeeper_connstr: conf
     259         6300 :                 .advertise_pg_addr
     260         6300 :                 .to_owned()
     261         6300 :                 .unwrap_or(conf.listen_pg_addr.clone()),
     262         6300 :             http_connstr: conf.listen_http_addr.to_owned(),
     263         6300 :             backup_lsn: self.sk.inmem.backup_lsn.0,
     264         6300 :             local_start_lsn: self.sk.state.local_start_lsn.0,
     265         6300 :             availability_zone: conf.availability_zone.clone(),
     266         6300 :         }
     267         6300 :     }
     268              : }
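
For reference, the activity rules implemented by is_active and is_wal_backup_required above can be restated as a small standalone function; LSNs and segment numbers are plain u64s here, not the crate's Lsn/XLogSegNo types:

```rust
fn wal_backup_required(num_computes: usize, commit_segno: u64, backup_segno: u64) -> bool {
    // Offload while a compute is attached, or while whole segments have been
    // committed but not yet backed up.
    num_computes > 0 || commit_segno > backup_segno
}

fn is_active(
    num_computes: usize,
    commit_segno: u64,
    backup_segno: u64,
    remote_consistent_lsn: u64,
    commit_lsn: u64,
) -> bool {
    // Active while backup work remains or the pageserver lags behind commit_lsn.
    wal_backup_required(num_computes, commit_segno, backup_segno)
        || remote_consistent_lsn < commit_lsn
}

fn main() {
    // No computes, backup caught up, pageserver caught up: the timeline can go inactive.
    assert!(!is_active(0, 5, 5, 100, 100));
    // Pageserver behind commit_lsn: stay active so it can catch up.
    assert!(is_active(0, 5, 5, 90, 100));
}
```
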
     269              : 
     270            0 : #[derive(Debug, thiserror::Error)]
     271              : pub enum TimelineError {
     272              :     #[error("Timeline {0} was cancelled and cannot be used anymore")]
     273              :     Cancelled(TenantTimelineId),
     274              :     #[error("Timeline {0} was not found in global map")]
     275              :     NotFound(TenantTimelineId),
     276              :     #[error("Timeline {0} exists on disk, but wasn't loaded on startup")]
     277              :     Invalid(TenantTimelineId),
     278              :     #[error("Timeline {0} is already exists")]
     279              :     AlreadyExists(TenantTimelineId),
     280              :     #[error("Timeline {0} is not initialized, wal_seg_size is zero")]
     281              :     UninitializedWalSegSize(TenantTimelineId),
     282              :     #[error("Timeline {0} is not initialized, pg_version is unknown")]
     283              :     UninitializedPgVersion(TenantTimelineId),
     284              :     #[error("Timeline {0} already exists")]
     285              : 
     286              : // Convert to HTTP API error.
     287              : impl From<TimelineError> for ApiError {
     288            0 :     fn from(te: TimelineError) -> ApiError {
     289            0 :         match te {
     290            0 :             TimelineError::NotFound(ttid) => {
     291            0 :                 ApiError::NotFound(anyhow!("timeline {} not found", ttid).into())
     292              :             }
     293            0 :             _ => ApiError::InternalServerError(anyhow!("{}", te)),
     294              :         }
     295            0 :     }
     296              : }
     297              : 
     298              : /// Timeline struct manages lifecycle (creation, deletion, restore) of a safekeeper timeline.
     299              : /// It also holds SharedState and provides mutually exclusive access to it.
     300              : pub struct Timeline {
     301              :     pub ttid: TenantTimelineId,
     302              : 
     303              :     /// Sending here asks for wal backup launcher attention (start/stop
     304              :     /// offloading). Sending the ttid instead of a concrete command allows
     305              :     /// sending without holding the timeline lock.
     306              :     pub wal_backup_launcher_tx: Sender<TenantTimelineId>,
     307              : 
     308              :     /// Used to broadcast commit_lsn updates to all background jobs.
     309              :     commit_lsn_watch_tx: watch::Sender<Lsn>,
     310              :     commit_lsn_watch_rx: watch::Receiver<Lsn>,
     311              : 
     312              :     /// Broadcasts (current term, flush_lsn) updates; walsender is interested in
     313              :     /// them when sending in recovery mode (to walproposer or peers). Note: this
     314              :     /// is just a notification, WAL reading should always be done with the lock
     315              :     /// held as the term can change otherwise.
     316              :     term_flush_lsn_watch_tx: watch::Sender<TermLsn>,
     317              :     term_flush_lsn_watch_rx: watch::Receiver<TermLsn>,
     318              : 
     319              :     /// Safekeeper and other state that should remain consistent and
     320              :     /// synchronized with the disk. This is a tokio mutex as we write WAL to disk
     321              :     /// while holding it, ensuring that consensus checks are in order.
     322              :     mutex: Mutex<SharedState>,
     323              :     walsenders: Arc<WalSenders>,
     324              :     walreceivers: Arc<WalReceivers>,
     325              : 
     326              :     /// Cancellation channel. Delete/cancel will send `true` here as a cancellation signal.
     327              :     cancellation_tx: watch::Sender<bool>,
     328              : 
     329              :     /// Timeline should not be used after cancellation. Background tasks should
     330              :     /// monitor this channel and stop eventually after receiving `true` from this channel.
     331              :     cancellation_rx: watch::Receiver<bool>,
     332              : 
     333              :     /// Directory where timeline state is stored.
     334              :     pub timeline_dir: PathBuf,
     335              : }
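
The commit_lsn and term_flush_lsn fields above follow the usual tokio watch-channel pattern: one writer publishes the latest value, any number of background jobs wake up on changes. A minimal self-contained sketch of that pattern (u64 standing in for Lsn; the task bodies are illustrative only):

```rust
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel(0u64); // u64 standing in for Lsn

    // A background job (think: a WAL sender) waits for commit_lsn to advance.
    let waiter = tokio::spawn(async move {
        while rx.changed().await.is_ok() {
            let lsn = *rx.borrow();
            println!("commit_lsn advanced to {lsn}");
            if lsn >= 42 {
                break;
            }
        }
    });

    // The timeline side publishes new values after processing messages.
    tx.send(10).unwrap();
    tx.send(42).unwrap();
    waiter.await.unwrap();
}
```

A watch channel keeps only the latest value, which fits this use: subscribers care about the most recent commit_lsn, not every intermediate update.
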
     336              : 
     337              : impl Timeline {
     338              :     /// Load existing timeline from disk.
     339           81 :     pub fn load_timeline(
     340           81 :         conf: &SafeKeeperConf,
     341           81 :         ttid: TenantTimelineId,
     342           81 :         wal_backup_launcher_tx: Sender<TenantTimelineId>,
     343           81 :     ) -> Result<Timeline> {
     344           81 :         let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
     345              : 
     346           81 :         let shared_state = SharedState::restore(conf, &ttid)?;
     347           81 :         let rcl = shared_state.sk.state.remote_consistent_lsn;
     348           81 :         let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
     349           81 :             watch::channel(shared_state.sk.state.commit_lsn);
     350           81 :         let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) = watch::channel(TermLsn::from((
     351           81 :             shared_state.sk.get_term(),
     352           81 :             shared_state.sk.flush_lsn(),
     353           81 :         )));
     354           81 :         let (cancellation_tx, cancellation_rx) = watch::channel(false);
     355           81 : 
     356           81 :         Ok(Timeline {
     357           81 :             ttid,
     358           81 :             wal_backup_launcher_tx,
     359           81 :             commit_lsn_watch_tx,
     360           81 :             commit_lsn_watch_rx,
     361           81 :             term_flush_lsn_watch_tx,
     362           81 :             term_flush_lsn_watch_rx,
     363           81 :             mutex: Mutex::new(shared_state),
     364           81 :             walsenders: WalSenders::new(rcl),
     365           81 :             walreceivers: WalReceivers::new(),
     366           81 :             cancellation_rx,
     367           81 :             cancellation_tx,
     368           81 :             timeline_dir: conf.timeline_dir(&ttid),
     369           81 :         })
     370           81 :     }
     371              : 
     372              :     /// Create a new timeline, which is not yet persisted to disk.
     373          523 :     pub fn create_empty(
     374          523 :         conf: &SafeKeeperConf,
     375          523 :         ttid: TenantTimelineId,
     376          523 :         wal_backup_launcher_tx: Sender<TenantTimelineId>,
     377          523 :         server_info: ServerInfo,
     378          523 :         commit_lsn: Lsn,
     379          523 :         local_start_lsn: Lsn,
     380          523 :     ) -> Result<Timeline> {
     381          523 :         let (commit_lsn_watch_tx, commit_lsn_watch_rx) = watch::channel(Lsn::INVALID);
     382          523 :         let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) =
     383          523 :             watch::channel(TermLsn::from((INVALID_TERM, Lsn::INVALID)));
     384          523 :         let (cancellation_tx, cancellation_rx) = watch::channel(false);
     385          523 :         let state = SafeKeeperState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn);
     386          523 : 
     387          523 :         Ok(Timeline {
     388          523 :             ttid,
     389          523 :             wal_backup_launcher_tx,
     390          523 :             commit_lsn_watch_tx,
     391          523 :             commit_lsn_watch_rx,
     392          523 :             term_flush_lsn_watch_tx,
     393          523 :             term_flush_lsn_watch_rx,
     394          523 :             mutex: Mutex::new(SharedState::create_new(conf, &ttid, state)?),
     395          523 :             walsenders: WalSenders::new(Lsn(0)),
     396          523 :             walreceivers: WalReceivers::new(),
     397          523 :             cancellation_rx,
     398          523 :             cancellation_tx,
     399          523 :             timeline_dir: conf.timeline_dir(&ttid),
     400              :         })
     401          523 :     }
     402              : 
     403              :     /// Initialize fresh timeline on disk and start background tasks. If init
     404              :     /// fails, timeline is cancelled and cannot be used anymore.
     405              :     ///
     406              :     /// Init is transactional, so if it fails, created files will be deleted,
     407              :     /// and state on disk should remain unchanged.
     408          523 :     pub async fn init_new(
     409          523 :         self: &Arc<Timeline>,
     410          523 :         shared_state: &mut MutexGuard<'_, SharedState>,
     411          523 :         conf: &SafeKeeperConf,
     412          523 :     ) -> Result<()> {
     413          595 :         match fs::metadata(&self.timeline_dir).await {
     414              :             Ok(_) => {
     415              :                 // Timeline directory exists on disk, we should leave state unchanged
     416              :                 // and return error.
     417            0 :                 bail!(TimelineError::Invalid(self.ttid));
     418              :             }
     419          523 :             Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
     420            0 :             Err(e) => {
     421            0 :                 return Err(e.into());
     422              :             }
     423              :         }
     424              : 
     425              :         // Create timeline directory.
     426          536 :         fs::create_dir_all(&self.timeline_dir).await?;
     427              : 
     428              :         // Write timeline to disk and start background tasks.
     429         1687 :         if let Err(e) = shared_state.sk.persist().await {
     430              :             // Bootstrap failed, cancel timeline and remove timeline directory.
     431            0 :             self.cancel(shared_state);
     432              : 
     433            0 :             if let Err(fs_err) = fs::remove_dir_all(&self.timeline_dir).await {
     434            0 :                 warn!(
     435            0 :                     "failed to remove timeline {} directory after bootstrap failure: {}",
     436            0 :                     self.ttid, fs_err
     437            0 :                 );
     438            0 :             }
     439              : 
     440            0 :             return Err(e);
     441          523 :         }
     442          523 :         self.bootstrap(conf);
     443          523 :         Ok(())
     444          523 :     }
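
A hedged standalone sketch of the transactional init described above: check that the directory does not exist, create it, attempt to persist initial state, and remove the directory again if that fails. The path and the persist body are placeholders, not the crate's actual layout:

```rust
use std::io::ErrorKind;
use std::path::Path;
use tokio::fs;

async fn persist_initial_state(dir: &Path) -> std::io::Result<()> {
    // Placeholder for writing the real control file.
    fs::write(dir.join("control"), b"initial state").await
}

async fn init_new(dir: &Path) -> std::io::Result<()> {
    match fs::metadata(dir).await {
        Ok(_) => return Err(ErrorKind::AlreadyExists.into()), // leave existing state alone
        Err(e) if e.kind() == ErrorKind::NotFound => {}
        Err(e) => return Err(e),
    }

    fs::create_dir_all(dir).await?;

    if let Err(e) = persist_initial_state(dir).await {
        // Roll back: remove what we created so on-disk state stays unchanged.
        let _ = fs::remove_dir_all(dir).await;
        return Err(e);
    }
    Ok(())
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    init_new(Path::new("/tmp/example-timeline-dir")).await
}
```
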
     445              : 
     446              :     /// Bootstrap a new or existing timeline, starting background tasks.
     447          604 :     pub fn bootstrap(self: &Arc<Timeline>, conf: &SafeKeeperConf) {
     448          604 :         // Start recovery task which always runs on the timeline.
     449          604 :         tokio::spawn(recovery_main(self.clone(), conf.clone()));
     450          604 :     }
     451              : 
     452              :     /// Delete timeline from disk completely, by removing timeline directory. Background
     453              :     /// timeline activities will stop eventually.
     454           31 :     pub async fn delete_from_disk(
     455           31 :         &self,
     456           31 :         shared_state: &mut MutexGuard<'_, SharedState>,
     457           31 :     ) -> Result<(bool, bool)> {
     458           31 :         let was_active = shared_state.active;
     459           31 :         self.cancel(shared_state);
     460           31 :         let dir_existed = delete_dir(&self.timeline_dir).await?;
     461           31 :         Ok((dir_existed, was_active))
     462           31 :     }
     463              : 
     464              :     /// Cancel timeline to prevent further usage. Background tasks will stop
     465              :     /// eventually after receiving cancellation signal.
     466              :     ///
     467              :     /// Note that we can't notify the backup launcher here while holding the
     468              :     /// shared_state lock, as this is a potential deadlock: the caller is
     469              :     /// responsible for that. Generally we should probably make WAL backup tasks
     470              :     /// shut down on their own, checking once in a while whether it is time to
     471              :     /// stop.
     472           31 :     fn cancel(&self, shared_state: &mut MutexGuard<'_, SharedState>) {
     473           31 :         info!("timeline {} is cancelled", self.ttid);
     474           31 :         let _ = self.cancellation_tx.send(true);
     475           31 :         // Close associated FDs. Nobody will be able to touch timeline data once
     476           31 :         // it is cancelled, so WAL storage won't be opened again.
     477           31 :         shared_state.sk.wal_store.close();
     478           31 :     }
     479              : 
     480              :     /// Returns whether the timeline is cancelled.
     481      4883812 :     pub fn is_cancelled(&self) -> bool {
     482      4883812 :         *self.cancellation_rx.borrow()
     483      4883812 :     }
     484              : 
     485              :     /// Returns a watch channel which receives a value when the timeline is cancelled.
     486              :     /// It is guaranteed that a not-cancelled value has been observed (errors otherwise).
     487          604 :     pub fn get_cancellation_rx(&self) -> Result<watch::Receiver<bool>> {
     488          604 :         let rx = self.cancellation_rx.clone();
     489          604 :         if *rx.borrow() {
     490            0 :             bail!(TimelineError::Cancelled(self.ttid));
     491          604 :         }
     492          604 :         Ok(rx)
     493          604 :     }
     494              : 
     495              :     /// Take a mutually exclusive write lock on the timeline's shared_state.
     496      4885949 :     pub async fn write_shared_state(&self) -> MutexGuard<SharedState> {
     497      4885949 :         self.mutex.lock().await
     498      4885949 :     }
     499              : 
     500        13817 :     fn update_status(&self, shared_state: &mut SharedState) -> bool {
     501        13817 :         shared_state.update_status(
     502        13817 :             self.walreceivers.get_num(),
     503        13817 :             self.get_walsenders().get_remote_consistent_lsn(),
     504        13817 :             self.ttid,
     505        13817 :         )
     506        13817 :     }
     507              : 
     508              :     /// Update timeline status and kick wal backup launcher to stop/start offloading if needed.
     509         4740 :     pub async fn update_status_notify(&self) -> Result<()> {
     510         4740 :         if self.is_cancelled() {
     511            0 :             bail!(TimelineError::Cancelled(self.ttid));
     512         4740 :         }
     513         4740 :         let is_wal_backup_action_pending: bool = {
     514         4740 :             let mut shared_state = self.write_shared_state().await;
     515         4740 :             self.update_status(&mut shared_state)
     516         4740 :         };
     517         4740 :         if is_wal_backup_action_pending {
     518              :             // Can fail only if channel to a static thread got closed, which is not normal at all.
     519         3049 :             self.wal_backup_launcher_tx.send(self.ttid).await?;
     520         1691 :         }
     521         4740 :         Ok(())
     522         4740 :     }
     523              : 
     524              :     /// Returns true if the walsender should stop sending WAL to the pageserver.
     525              :     /// We terminate it if remote_consistent_lsn has reached commit_lsn and there
     526              :     /// are no computes. While there might already be nothing to stream, we learn
     527              :     /// about remote_consistent_lsn updates through replication feedback, and we
     528              :     /// want to stop pushing to the broker once the pageserver is fully caught up.
     529         1941 :     pub async fn should_walsender_stop(&self, reported_remote_consistent_lsn: Lsn) -> bool {
     530         1941 :         if self.is_cancelled() {
     531            0 :             return true;
     532         1941 :         }
     533         1941 :         let shared_state = self.write_shared_state().await;
     534         1941 :         if shared_state.num_computes == 0 {
     535         1941 :             return shared_state.sk.inmem.commit_lsn == Lsn(0) || // no data at all yet
     536         1941 :             reported_remote_consistent_lsn >= shared_state.sk.inmem.commit_lsn;
     537            0 :         }
     538            0 :         false
     539         1941 :     }
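
The stop condition above, restated in isolation (u64 standing in for Lsn, with 0 playing the role of Lsn(0)):

```rust
fn should_walsender_stop(num_computes: u32, commit_lsn: u64, reported_remote_consistent_lsn: u64) -> bool {
    // Stop only when no compute is attached and the pageserver has either
    // nothing to consume yet or has consumed everything committed.
    num_computes == 0 && (commit_lsn == 0 || reported_remote_consistent_lsn >= commit_lsn)
}

fn main() {
    assert!(should_walsender_stop(0, 100, 100));  // caught up, no computes
    assert!(!should_walsender_stop(0, 100, 90));  // pageserver still behind
    assert!(!should_walsender_stop(1, 100, 100)); // compute attached: keep streaming
}
```
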
     540              : 
     541              :     /// Ensure that the current term is t, erroring otherwise, and lock the state.
     542            1 :     pub async fn acquire_term(&self, t: Term) -> Result<MutexGuard<SharedState>> {
     543            1 :         let ss = self.write_shared_state().await;
     544            1 :         if ss.sk.state.acceptor_state.term != t {
     545            1 :             bail!(
     546            1 :                 "failed to acquire term {}, current term {}",
     547            1 :                 t,
     548            1 :                 ss.sk.state.acceptor_state.term
     549            1 :             );
     550            0 :         }
     551            0 :         Ok(ss)
     552            1 :     }
     553              : 
     554              :     /// Returns whether s3 offloading is required and sets current status as
     555              :     /// matching it.
     556          188 :     pub async fn wal_backup_attend(&self) -> bool {
     557          188 :         if self.is_cancelled() {
     558            0 :             return false;
     559          188 :         }
     560          188 : 
     561          188 :         self.write_shared_state()
     562           42 :             .await
     563          188 :             .wal_backup_attend(self.walreceivers.get_num())
     564          188 :     }
     565              : 
     566              :     /// Returns commit_lsn watch channel.
     567          791 :     pub fn get_commit_lsn_watch_rx(&self) -> watch::Receiver<Lsn> {
     568          791 :         self.commit_lsn_watch_rx.clone()
     569          791 :     }
     570              : 
     571              :     /// Returns term_flush_lsn watch channel.
     572           64 :     pub fn get_term_flush_lsn_watch_rx(&self) -> watch::Receiver<TermLsn> {
     573           64 :         self.term_flush_lsn_watch_rx.clone()
     574           64 :     }
     575              : 
     576              :     /// Pass arrived message to the safekeeper.
     577      4848632 :     pub async fn process_msg(
     578      4848632 :         &self,
     579      4848632 :         msg: &ProposerAcceptorMessage,
     580      4848632 :     ) -> Result<Option<AcceptorProposerMessage>> {
     581      4848632 :         if self.is_cancelled() {
     582            0 :             bail!(TimelineError::Cancelled(self.ttid));
     583      4848632 :         }
     584              : 
     585              :         let mut rmsg: Option<AcceptorProposerMessage>;
     586              :         let commit_lsn: Lsn;
     587              :         let term_flush_lsn: TermLsn;
     588              :         {
     589      4848632 :             let mut shared_state = self.write_shared_state().await;
     590      4848632 :             rmsg = shared_state.sk.process_msg(msg).await?;
     591              : 
     592              :             // if this is AppendResponse, fill in proper pageserver and hot
     593              :             // standby feedback.
     594      4848632 :             if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg {
     595      2200283 :                 let (ps_feedback, hs_feedback) = self.walsenders.get_feedbacks();
     596      2200283 :                 resp.hs_feedback = hs_feedback;
     597      2200283 :                 resp.pageserver_feedback = ps_feedback;
     598      2648349 :             }
     599              : 
     600      4848632 :             commit_lsn = shared_state.sk.inmem.commit_lsn;
     601      4848632 :             term_flush_lsn =
     602      4848632 :                 TermLsn::from((shared_state.sk.get_term(), shared_state.sk.flush_lsn()));
     603      4848632 :         }
     604      4848632 :         self.commit_lsn_watch_tx.send(commit_lsn)?;
     605      4848632 :         self.term_flush_lsn_watch_tx.send(term_flush_lsn)?;
     606      4848632 :         Ok(rmsg)
     607      4848632 :     }
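
process_msg illustrates a pattern used throughout this file: mutate state under the async mutex, copy out what subscribers need, drop the guard, and only then publish to the watch channels. A simplified self-contained sketch with stand-in types, not the crate's SafeKeeper:

```rust
use std::sync::Arc;
use tokio::sync::{watch, Mutex};

struct Shared {
    commit_lsn: u64,
}

async fn apply_message(
    shared: &Arc<Mutex<Shared>>,
    commit_lsn_tx: &watch::Sender<u64>,
    new_commit_lsn: u64,
) {
    let commit_lsn = {
        let mut guard = shared.lock().await;
        guard.commit_lsn = guard.commit_lsn.max(new_commit_lsn);
        guard.commit_lsn
    }; // guard dropped here; the critical section stays short

    // Publish outside the lock so woken subscribers don't have to wait on it.
    let _ = commit_lsn_tx.send(commit_lsn);
}

#[tokio::main]
async fn main() {
    let shared = Arc::new(Mutex::new(Shared { commit_lsn: 0 }));
    let (tx, rx) = watch::channel(0u64);
    apply_message(&shared, &tx, 10).await;
    assert_eq!(*rx.borrow(), 10);
}
```
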
     608              : 
     609              :     /// Returns wal_seg_size.
     610           25 :     pub async fn get_wal_seg_size(&self) -> usize {
     611           25 :         self.write_shared_state().await.get_wal_seg_size()
     612           25 :     }
     613              : 
     614              :     /// Returns true only if the timeline is loaded and active.
     615         7727 :     pub async fn is_active(&self) -> bool {
     616         7727 :         if self.is_cancelled() {
     617            0 :             return false;
     618         7727 :         }
     619         7727 : 
     620         7727 :         self.write_shared_state().await.active
     621         7727 :     }
     622              : 
     623              :     /// Returns state of the timeline.
     624         2851 :     pub async fn get_state(&self) -> (SafekeeperMemState, SafeKeeperState) {
     625         2851 :         let state = self.write_shared_state().await;
     626         2851 :         (state.sk.inmem.clone(), state.sk.state.clone())
     627         2851 :     }
     628              : 
     629              :     /// Returns latest backup_lsn.
     630          287 :     pub async fn get_wal_backup_lsn(&self) -> Lsn {
     631          287 :         self.write_shared_state().await.sk.inmem.backup_lsn
     632          287 :     }
     633              : 
     634              :     /// Sets backup_lsn to the given value.
     635           23 :     pub async fn set_wal_backup_lsn(&self, backup_lsn: Lsn) -> Result<()> {
     636           23 :         if self.is_cancelled() {
     637            0 :             bail!(TimelineError::Cancelled(self.ttid));
     638           23 :         }
     639              : 
     640           23 :         let mut state = self.write_shared_state().await;
     641           23 :         state.sk.inmem.backup_lsn = max(state.sk.inmem.backup_lsn, backup_lsn);
     642           23 :         // we should check whether to shut down offloader, but this will be done
     643           23 :         // soon by peer communication anyway.
     644           23 :         Ok(())
     645           23 :     }
     646              : 
     647              :     /// Get safekeeper info for broadcasting to broker and other peers.
     648         6300 :     pub async fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SafekeeperTimelineInfo {
     649         6300 :         let shared_state = self.write_shared_state().await;
     650         6300 :         shared_state.get_safekeeper_info(
     651         6300 :             &self.ttid,
     652         6300 :             conf,
     653         6300 :             self.walsenders.get_remote_consistent_lsn(),
     654         6300 :         )
     655         6300 :     }
     656              : 
     657              :     /// Update timeline state with peer safekeeper data.
     658         9077 :     pub async fn record_safekeeper_info(&self, mut sk_info: SafekeeperTimelineInfo) -> Result<()> {
     659         9077 :         // Update local remote_consistent_lsn in memory (in .walsenders) and in
     660         9077 :         // sk_info to pass it down to control file.
     661         9077 :         sk_info.remote_consistent_lsn = self
     662         9077 :             .walsenders
     663         9077 :             .update_remote_consistent_lsn(Lsn(sk_info.remote_consistent_lsn))
     664         9077 :             .0;
     665              :         let is_wal_backup_action_pending: bool;
     666              :         let commit_lsn: Lsn;
     667              :         {
     668         9077 :             let mut shared_state = self.write_shared_state().await;
     669         9077 :             shared_state.sk.record_safekeeper_info(&sk_info).await?;
     670         9077 :             let peer_info = PeerInfo::from_sk_info(&sk_info, Instant::now());
     671         9077 :             shared_state.peers_info.upsert(&peer_info);
     672         9077 :             is_wal_backup_action_pending = self.update_status(&mut shared_state);
     673         9077 :             commit_lsn = shared_state.sk.inmem.commit_lsn;
     674         9077 :         }
     675         9077 :         self.commit_lsn_watch_tx.send(commit_lsn)?;
     676              :         // Wake up wal backup launcher, if it is time to stop the offloading.
     677         9077 :         if is_wal_backup_action_pending {
     678         7866 :             self.wal_backup_launcher_tx.send(self.ttid).await?;
     679         1211 :         }
     680         9077 :         Ok(())
     681         9077 :     }
     682              : 
     683              :     /// Get our latest view of alive peers' status on the timeline.
     684              :     /// We pass our own info through the broker as well, so when we don't have a
     685              :     /// connection to the broker the returned vec is empty.
     686          443 :     pub async fn get_peers(&self, conf: &SafeKeeperConf) -> Vec<PeerInfo> {
     687          443 :         let shared_state = self.write_shared_state().await;
     688          443 :         let now = Instant::now();
     689          443 :         shared_state
     690          443 :             .peers_info
     691          443 :             .0
     692          443 :             .iter()
     693          443 :             // Regard peer as absent if we haven't heard from it within heartbeat_timeout.
     694         1082 :             .filter(|p| now.duration_since(p.ts) <= conf.heartbeat_timeout)
     695          443 :             .cloned()
     696          443 :             .collect()
     697          443 :     }
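
The heartbeat filter above, in isolation: keep only peers heard from within heartbeat_timeout. Types are simplified stand-ins:

```rust
use std::time::{Duration, Instant};

struct Peer {
    id: u64,
    ts: Instant, // when we last heard from this peer
}

fn alive_peers(peers: &[Peer], heartbeat_timeout: Duration) -> Vec<u64> {
    let now = Instant::now();
    peers
        .iter()
        .filter(|p| now.duration_since(p.ts) <= heartbeat_timeout)
        .map(|p| p.id)
        .collect()
}

fn main() {
    let peers = vec![Peer { id: 1, ts: Instant::now() }];
    println!("{:?}", alive_peers(&peers, Duration::from_secs(5)));
}
```
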
     698              : 
     699        15096 :     pub fn get_walsenders(&self) -> &Arc<WalSenders> {
     700        15096 :         &self.walsenders
     701        15096 :     }
     702              : 
     703         2304 :     pub fn get_walreceivers(&self) -> &Arc<WalReceivers> {
     704         2304 :         &self.walreceivers
     705         2304 :     }
     706              : 
     707              :     /// Returns flush_lsn.
     708          460 :     pub async fn get_flush_lsn(&self) -> Lsn {
     709          460 :         self.write_shared_state().await.sk.wal_store.flush_lsn()
     710          460 :     }
     711              : 
     712              :     /// Delete WAL segments from disk that are no longer needed. This is determined
     713              :     /// based on pageserver's remote_consistent_lsn and local backup_lsn/peer_lsn.
     714         1315 :     pub async fn remove_old_wal(&self, wal_backup_enabled: bool) -> Result<()> {
     715         1315 :         if self.is_cancelled() {
     716            0 :             bail!(TimelineError::Cancelled(self.ttid));
     717         1315 :         }
     718              : 
     719              :         let horizon_segno: XLogSegNo;
     720           14 :         let remover = {
     721         1315 :             let shared_state = self.write_shared_state().await;
     722         1315 :             horizon_segno = shared_state.sk.get_horizon_segno(wal_backup_enabled);
     723         1315 :             if horizon_segno <= 1 || horizon_segno <= shared_state.last_removed_segno {
     724         1301 :                 return Ok(()); // nothing to do
     725           14 :             }
     726           14 :             let remover = shared_state.sk.wal_store.remove_up_to(horizon_segno - 1);
     727           14 :             // release the lock before removing
     728           14 :             remover
     729           14 :         };
     730           14 : 
     731           14 :         // delete old WAL files
     732           54 :         remover.await?;
     733              : 
     734              :         // update last_removed_segno
     735           14 :         let mut shared_state = self.write_shared_state().await;
     736           14 :         shared_state.last_removed_segno = horizon_segno;
     737           14 :         Ok(())
     738         1315 :     }
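
remove_old_wal builds the removal future while holding the lock but awaits it only after the guard is dropped, so slow file deletion does not block other users of the shared state. A minimal sketch of that shape with stand-in types:

```rust
use std::sync::Arc;
use tokio::sync::Mutex;

struct State {
    last_removed_segno: u64,
}

async fn remove_old(state: Arc<Mutex<State>>, horizon_segno: u64) {
    let remover = {
        let guard = state.lock().await;
        if horizon_segno <= guard.last_removed_segno {
            return; // nothing to do
        }
        // Build the future under the lock, run it after the guard is dropped.
        async move {
            println!("removing segments up to {}", horizon_segno - 1);
        }
    }; // guard dropped here

    remover.await;

    // Re-take the lock only to record the new horizon.
    state.lock().await.last_removed_segno = horizon_segno;
}

#[tokio::main]
async fn main() {
    let state = Arc::new(Mutex::new(State { last_removed_segno: 0 }));
    remove_old(state, 5).await;
}
```
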
     739              : 
     740              :     /// Persist control file if there is something to save and enough time
     741              :     /// passed after the last save. This helps to keep remote_consistent_lsn up
     742              :     /// to date so that storage nodes restart doesn't cause many pageserver ->
     743              :     /// safekeeper reconnections.
     744         1315 :     pub async fn maybe_persist_control_file(&self) -> Result<()> {
     745         1315 :         let remote_consistent_lsn = self.walsenders.get_remote_consistent_lsn();
     746              :     /// Persist the control file if there is something to save and enough time
     747              :     /// has passed since the last save. This helps to keep remote_consistent_lsn
     748              :     /// up to date so that a storage node restart doesn't cause many pageserver ->
     749              :     /// safekeeper reconnections.
     750            3 :             .await
     751         1315 :     }
     752              : 
     753              :     /// Gather timeline data for metrics. If the timeline is not active, returns
     754              :     /// None; we do not collect these for inactive timelines.
     755           51 :     pub async fn info_for_metrics(&self) -> Option<FullTimelineInfo> {
     756           51 :         if self.is_cancelled() {
     757            0 :             return None;
     758           51 :         }
     759           51 : 
     760           51 :         let ps_feedback = self.walsenders.get_ps_feedback();
     761           51 :         let state = self.write_shared_state().await;
     762           51 :         if state.active {
     763           51 :             Some(FullTimelineInfo {
     764           51 :                 ttid: self.ttid,
     765           51 :                 ps_feedback,
     766           51 :                 wal_backup_active: state.wal_backup_active,
     767           51 :                 timeline_is_active: state.active,
     768           51 :                 num_computes: state.num_computes,
     769           51 :                 last_removed_segno: state.last_removed_segno,
     770           51 :                 epoch_start_lsn: state.sk.epoch_start_lsn,
     771           51 :                 mem_state: state.sk.inmem.clone(),
     772           51 :                 persisted_state: state.sk.state.clone(),
     773           51 :                 flush_lsn: state.sk.wal_store.flush_lsn(),
     774           51 :                 remote_consistent_lsn: self.get_walsenders().get_remote_consistent_lsn(),
     775           51 :                 wal_storage: state.sk.wal_store.get_metrics(),
     776           51 :             })
     777              :         } else {
     778            0 :             None
     779              :         }
     780           51 :     }
     781              : 
     782              :     /// Returns in-memory timeline state to build a full debug dump.
     783            5 :     pub async fn memory_dump(&self) -> debug_dump::Memory {
     784            5 :         let state = self.write_shared_state().await;
     785              : 
     786            5 :         let (write_lsn, write_record_lsn, flush_lsn, file_open) =
     787            5 :             state.sk.wal_store.internal_state();
     788            5 : 
     789            5 :         debug_dump::Memory {
     790            5 :             is_cancelled: self.is_cancelled(),
     791            5 :             peers_info_len: state.peers_info.0.len(),
     792            5 :             walsenders: self.walsenders.get_all(),
     793            5 :             wal_backup_active: state.wal_backup_active,
     794            5 :             active: state.active,
     795            5 :             num_computes: state.num_computes,
     796            5 :             last_removed_segno: state.last_removed_segno,
     797            5 :             epoch_start_lsn: state.sk.epoch_start_lsn,
     798            5 :             mem_state: state.sk.inmem.clone(),
     799            5 :             write_lsn,
     800            5 :             write_record_lsn,
     801            5 :             flush_lsn,
     802            5 :             file_open,
     803            5 :         }
     804            5 :     }
     805              : }
     806              : 
     807              : /// Deletes the directory and its contents. Returns false if the directory does not exist.
     808           31 : async fn delete_dir(path: &PathBuf) -> Result<bool> {
     809           31 :     match fs::remove_dir_all(path).await {
     810           17 :         Ok(_) => Ok(true),
     811           14 :         Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
     812            0 :         Err(e) => Err(e.into()),
     813              :     }
     814           31 : }
        

Generated by: LCOV version 2.1-beta