LCOV - code coverage report
Current view: top level - safekeeper/src - timeline.rs (source / functions) Coverage Total Hit
Test: 1d5975439f3c9882b18414799141ebf9a3922c58.info Lines: 33.9 % 755 256
Test Date: 2025-07-31 15:59:03 Functions: 40.0 % 125 50

            Line data    Source code
       1              : //! This module implements Timeline lifecycle management and has all necessary code
       2              : //! to glue together SafeKeeper and all other background services.
       3              : 
       4              : use std::cmp::max;
       5              : use std::ops::{Deref, DerefMut};
       6              : use std::sync::Arc;
       7              : use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
       8              : use std::time::Duration;
       9              : 
      10              : use anyhow::{Result, anyhow, bail};
      11              : use camino::{Utf8Path, Utf8PathBuf};
      12              : use http_utils::error::ApiError;
      13              : use remote_storage::RemotePath;
      14              : use safekeeper_api::Term;
      15              : use safekeeper_api::membership::Configuration;
      16              : use safekeeper_api::models::{
      17              :     PeerInfo, TimelineMembershipSwitchResponse, TimelineTermBumpResponse,
      18              : };
      19              : use storage_broker::proto::{SafekeeperTimelineInfo, TenantTimelineId as ProtoTenantTimelineId};
      20              : use tokio::fs::{self};
      21              : use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard, watch};
      22              : use tokio::time::Instant;
      23              : use tokio_util::sync::CancellationToken;
      24              : use tracing::*;
      25              : use utils::id::{NodeId, TenantId, TenantTimelineId};
      26              : use utils::lsn::Lsn;
      27              : use utils::sync::gate::Gate;
      28              : 
      29              : use crate::metrics::{
      30              :     FullTimelineInfo, MISC_OPERATION_SECONDS, WAL_STORAGE_LIMIT_ERRORS, WalStorageMetrics,
      31              : };
      32              : 
      33              : use crate::hadron::GLOBAL_DISK_LIMIT_EXCEEDED;
      34              : use crate::rate_limit::RateLimiter;
      35              : use crate::receive_wal::WalReceivers;
      36              : use crate::safekeeper::{AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, TermLsn};
      37              : use crate::send_wal::{WalSenders, WalSendersTimelineMetricValues};
      38              : use crate::state::{EvictionState, TimelineMemState, TimelinePersistentState, TimelineState};
      39              : use crate::timeline_guard::ResidenceGuard;
      40              : use crate::timeline_manager::{AtomicStatus, ManagerCtl};
      41              : use crate::timelines_set::TimelinesSet;
      42              : use crate::wal_backup;
      43              : use crate::wal_backup::{WalBackup, remote_timeline_path};
      44              : use crate::wal_backup_partial::PartialRemoteSegment;
      45              : use crate::wal_storage::{Storage as wal_storage_iface, WalReader};
      46              : use crate::{SafeKeeperConf, control_file, debug_dump, timeline_manager, wal_storage};
      47              : 
      48            0 : fn peer_info_from_sk_info(sk_info: &SafekeeperTimelineInfo, ts: Instant) -> PeerInfo {
      49            0 :     PeerInfo {
      50            0 :         sk_id: NodeId(sk_info.safekeeper_id),
      51            0 :         term: sk_info.term,
      52            0 :         last_log_term: sk_info.last_log_term,
      53            0 :         flush_lsn: Lsn(sk_info.flush_lsn),
      54            0 :         commit_lsn: Lsn(sk_info.commit_lsn),
      55            0 :         local_start_lsn: Lsn(sk_info.local_start_lsn),
      56            0 :         pg_connstr: sk_info.safekeeper_connstr.clone(),
      57            0 :         http_connstr: sk_info.http_connstr.clone(),
      58            0 :         https_connstr: sk_info.https_connstr.clone(),
      59            0 :         ts,
      60            0 :     }
      61            0 : }
      62              : 
      63              : // vector-based node id -> peer state map with very limited functionality we
      64              : // need.
      65              : #[derive(Debug, Clone, Default)]
      66              : pub struct PeersInfo(pub Vec<PeerInfo>);
      67              : 
      68              : impl PeersInfo {
      69            0 :     fn get(&mut self, id: NodeId) -> Option<&mut PeerInfo> {
      70            0 :         self.0.iter_mut().find(|p| p.sk_id == id)
      71            0 :     }
      72              : 
      73            0 :     fn upsert(&mut self, p: &PeerInfo) {
      74            0 :         match self.get(p.sk_id) {
      75            0 :             Some(rp) => *rp = p.clone(),
      76            0 :             None => self.0.push(p.clone()),
      77              :         }
      78            0 :     }
      79              : }
      80              : 
      81              : pub type ReadGuardSharedState<'a> = RwLockReadGuard<'a, SharedState>;
      82              : 
      83              : /// WriteGuardSharedState is a wrapper around `RwLockWriteGuard<SharedState>` that
      84              : /// automatically updates `watch::Sender` channels with state on drop.
      85              : pub struct WriteGuardSharedState<'a> {
      86              :     tli: Arc<Timeline>,
      87              :     guard: RwLockWriteGuard<'a, SharedState>,
      88              : }
      89              : 
      90              : impl<'a> WriteGuardSharedState<'a> {
      91         1249 :     fn new(tli: Arc<Timeline>, guard: RwLockWriteGuard<'a, SharedState>) -> Self {
      92         1249 :         WriteGuardSharedState { tli, guard }
      93         1249 :     }
      94              : }
      95              : 
      96              : impl Deref for WriteGuardSharedState<'_> {
      97              :     type Target = SharedState;
      98              : 
      99            0 :     fn deref(&self) -> &Self::Target {
     100            0 :         &self.guard
     101            0 :     }
     102              : }
     103              : 
     104              : impl DerefMut for WriteGuardSharedState<'_> {
     105         1244 :     fn deref_mut(&mut self) -> &mut Self::Target {
     106         1244 :         &mut self.guard
     107         1244 :     }
     108              : }
     109              : 
     110              : impl Drop for WriteGuardSharedState<'_> {
     111         1249 :     fn drop(&mut self) {
     112         1249 :         let term_flush_lsn =
     113         1249 :             TermLsn::from((self.guard.sk.last_log_term(), self.guard.sk.flush_lsn()));
     114         1249 :         let commit_lsn = self.guard.sk.state().inmem.commit_lsn;
     115              : 
     116         1249 :         let _ = self.tli.term_flush_lsn_watch_tx.send_if_modified(|old| {
     117         1249 :             if *old != term_flush_lsn {
     118          620 :                 *old = term_flush_lsn;
     119          620 :                 true
     120              :             } else {
     121          629 :                 false
     122              :             }
     123         1249 :         });
     124              : 
     125         1249 :         let _ = self.tli.commit_lsn_watch_tx.send_if_modified(|old| {
     126         1249 :             if *old != commit_lsn {
     127          615 :                 *old = commit_lsn;
     128          615 :                 true
     129              :             } else {
     130          634 :                 false
     131              :             }
     132         1249 :         });
     133              : 
     134              :         // send notification about shared state update
     135         1249 :         self.tli.shared_state_version_tx.send_modify(|old| {
     136         1249 :             *old += 1;
     137         1249 :         });
     138         1249 :     }
     139              : }
     140              : 
     141              : /// This structure is stored in shared state and represents the state of the timeline.
     142              : ///
     143              : /// Usually it holds SafeKeeper, but it also supports offloaded timeline state. In this
     144              : /// case, SafeKeeper is not available (because WAL is not present on disk) and all
     145              : /// operations can be done only with control file.
     146              : #[allow(clippy::large_enum_variant, reason = "TODO")]
     147              : pub enum StateSK {
     148              :     Loaded(SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>),
     149              :     Offloaded(Box<TimelineState<control_file::FileStorage>>),
     150              :     // Not used, required for moving between states.
     151              :     Empty,
     152              : }
     153              : 
     154              : impl StateSK {
     155         2580 :     pub fn flush_lsn(&self) -> Lsn {
     156         2580 :         match self {
     157         2580 :             StateSK::Loaded(sk) => sk.wal_store.flush_lsn(),
     158            0 :             StateSK::Offloaded(state) => match state.eviction_state {
     159            0 :                 EvictionState::Offloaded(flush_lsn) => flush_lsn,
     160            0 :                 _ => panic!("StateSK::Offloaded mismatches with eviction_state from control_file"),
     161              :             },
     162            0 :             StateSK::Empty => unreachable!(),
     163              :         }
     164         2580 :     }
     165              : 
     166              :     /// Get a reference to the control file's timeline state.
     167         2613 :     pub fn state(&self) -> &TimelineState<control_file::FileStorage> {
     168         2613 :         match self {
     169         2613 :             StateSK::Loaded(sk) => &sk.state,
     170            0 :             StateSK::Offloaded(s) => s,
     171            0 :             StateSK::Empty => unreachable!(),
     172              :         }
     173         2613 :     }
     174              : 
     175            4 :     pub fn state_mut(&mut self) -> &mut TimelineState<control_file::FileStorage> {
     176            4 :         match self {
     177            4 :             StateSK::Loaded(sk) => &mut sk.state,
     178            0 :             StateSK::Offloaded(s) => s,
     179            0 :             StateSK::Empty => unreachable!(),
     180              :         }
     181            4 :     }
     182              : 
     183         1290 :     pub fn last_log_term(&self) -> Term {
     184         1290 :         self.state()
     185         1290 :             .acceptor_state
     186         1290 :             .get_last_log_term(self.flush_lsn())
     187         1290 :     }
     188              : 
     189            0 :     pub async fn term_bump(&mut self, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
     190            0 :         self.state_mut().term_bump(to).await
     191            0 :     }
     192              : 
     193            0 :     pub async fn membership_switch(
     194            0 :         &mut self,
     195            0 :         to: Configuration,
     196            0 :     ) -> Result<TimelineMembershipSwitchResponse> {
     197            0 :         let result = self.state_mut().membership_switch(to).await?;
     198            0 :         let flush_lsn = self.flush_lsn();
     199            0 :         let last_log_term = self.state().acceptor_state.get_last_log_term(flush_lsn);
     200              : 
     201            0 :         Ok(TimelineMembershipSwitchResponse {
     202            0 :             previous_conf: result.previous_conf,
     203            0 :             current_conf: result.current_conf,
     204            0 :             last_log_term,
     205            0 :             flush_lsn,
     206            0 :         })
     207            0 :     }
     208              : 
     209              :     /// Close open WAL files to release FDs.
     210            0 :     fn close_wal_store(&mut self) {
     211            0 :         if let StateSK::Loaded(sk) = self {
     212            0 :             sk.wal_store.close();
     213            0 :         }
     214            0 :     }
     215              : 
     216              :     /// Update timeline state with peer safekeeper data.
     217            0 :     pub async fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> {
     218              :         // update commit_lsn if safekeeper is loaded
     219            0 :         match self {
     220            0 :             StateSK::Loaded(sk) => sk.record_safekeeper_info(sk_info).await?,
     221            0 :             StateSK::Offloaded(_) => {}
     222            0 :             StateSK::Empty => unreachable!(),
     223              :         }
     224              : 
     225              :         // update everything else, including remote_consistent_lsn and backup_lsn
     226            0 :         let mut sync_control_file = false;
     227            0 :         let state = self.state_mut();
     228            0 :         let wal_seg_size = state.server.wal_seg_size as u64;
     229              : 
     230            0 :         state.inmem.backup_lsn = max(Lsn(sk_info.backup_lsn), state.inmem.backup_lsn);
     231            0 :         sync_control_file |= state.backup_lsn + wal_seg_size < state.inmem.backup_lsn;
     232              : 
     233            0 :         state.inmem.remote_consistent_lsn = max(
     234            0 :             Lsn(sk_info.remote_consistent_lsn),
     235            0 :             state.inmem.remote_consistent_lsn,
     236            0 :         );
     237            0 :         sync_control_file |=
     238            0 :             state.remote_consistent_lsn + wal_seg_size < state.inmem.remote_consistent_lsn;
     239              : 
     240            0 :         state.inmem.peer_horizon_lsn =
     241            0 :             max(Lsn(sk_info.peer_horizon_lsn), state.inmem.peer_horizon_lsn);
     242            0 :         sync_control_file |= state.peer_horizon_lsn + wal_seg_size < state.inmem.peer_horizon_lsn;
     243              : 
     244            0 :         if sync_control_file {
     245            0 :             state.flush().await?;
     246            0 :         }
     247            0 :         Ok(())
     248            0 :     }
     249              : 
     250              :     /// Previously known as epoch_start_lsn. Needed only for reference in some APIs.
     251            0 :     pub fn term_start_lsn(&self) -> Lsn {
     252            0 :         match self {
     253            0 :             StateSK::Loaded(sk) => sk.term_start_lsn,
     254            0 :             StateSK::Offloaded(_) => Lsn(0),
     255            0 :             StateSK::Empty => unreachable!(),
     256              :         }
     257            0 :     }
     258              : 
     259              :     /// Used for metrics only.
     260            0 :     pub fn wal_storage_metrics(&self) -> WalStorageMetrics {
     261            0 :         match self {
     262            0 :             StateSK::Loaded(sk) => sk.wal_store.get_metrics(),
     263            0 :             StateSK::Offloaded(_) => WalStorageMetrics::default(),
     264            0 :             StateSK::Empty => unreachable!(),
     265              :         }
     266            0 :     }
     267              : 
     268              :     /// Returns WAL storage internal LSNs for debug dump.
     269            0 :     pub fn wal_storage_internal_state(&self) -> (Lsn, Lsn, Lsn, bool) {
     270            0 :         match self {
     271            0 :             StateSK::Loaded(sk) => sk.wal_store.internal_state(),
     272              :             StateSK::Offloaded(_) => {
     273            0 :                 let flush_lsn = self.flush_lsn();
     274            0 :                 (flush_lsn, flush_lsn, flush_lsn, false)
     275              :             }
     276            0 :             StateSK::Empty => unreachable!(),
     277              :         }
     278            0 :     }
     279              : 
     280              :     /// Access to SafeKeeper object. Panics if offloaded, should be good to use from WalResidentTimeline.
     281         1240 :     pub fn safekeeper(
     282         1240 :         &mut self,
     283         1240 :     ) -> &mut SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage> {
     284         1240 :         match self {
     285         1240 :             StateSK::Loaded(sk) => sk,
     286              :             StateSK::Offloaded(_) => {
     287            0 :                 panic!("safekeeper is offloaded, cannot be used")
     288              :             }
     289            0 :             StateSK::Empty => unreachable!(),
     290              :         }
     291         1240 :     }
     292              : 
     293              :     /// Moves control file's state structure out of the enum. Used to switch states.
     294            0 :     fn take_state(self) -> TimelineState<control_file::FileStorage> {
     295            0 :         match self {
     296            0 :             StateSK::Loaded(sk) => sk.state,
     297            0 :             StateSK::Offloaded(state) => *state,
     298            0 :             StateSK::Empty => unreachable!(),
     299              :         }
     300            0 :     }
     301              : }
     302              : 
     303              : /// Shared state associated with database instance
     304              : pub struct SharedState {
     305              :     /// Safekeeper object
     306              :     pub(crate) sk: StateSK,
     307              :     /// In memory list containing state of peers sent in latest messages from them.
     308              :     pub(crate) peers_info: PeersInfo,
     309              :     // True value hinders old WAL removal; this is used by snapshotting. We
     310              :     // could make it a counter, but there is no need to.
     311              :     pub(crate) wal_removal_on_hold: bool,
     312              : }
     313              : 
     314              : impl SharedState {
     315              :     /// Creates a new SharedState.
     316            5 :     pub fn new(sk: StateSK) -> Self {
     317            5 :         Self {
     318            5 :             sk,
     319            5 :             peers_info: PeersInfo(vec![]),
     320            5 :             wal_removal_on_hold: false,
     321            5 :         }
     322            5 :     }
     323              : 
     324              :     /// Restore SharedState from control file. If file doesn't exist, bails out.
     325            0 :     pub fn restore(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Result<Self> {
     326            0 :         let timeline_dir = get_timeline_dir(conf, ttid);
     327            0 :         let control_store = control_file::FileStorage::restore_new(&timeline_dir, conf.no_sync)?;
     328            0 :         if control_store.server.wal_seg_size == 0 {
     329            0 :             bail!(TimelineError::UninitializedWalSegSize(*ttid));
     330            0 :         }
     331              : 
     332            0 :         let sk = match control_store.eviction_state {
     333              :             EvictionState::Present => {
     334            0 :                 let wal_store = wal_storage::PhysicalStorage::new(
     335            0 :                     ttid,
     336            0 :                     &timeline_dir,
     337            0 :                     &control_store,
     338            0 :                     conf.no_sync,
     339            0 :                 )?;
     340            0 :                 StateSK::Loaded(SafeKeeper::new(
     341            0 :                     TimelineState::new(control_store),
     342            0 :                     wal_store,
     343            0 :                     conf.my_id,
     344            0 :                 )?)
     345              :             }
     346              :             EvictionState::Offloaded(_) => {
     347            0 :                 StateSK::Offloaded(Box::new(TimelineState::new(control_store)))
     348              :             }
     349              :         };
     350              : 
     351            0 :         Ok(Self::new(sk))
     352            0 :     }
     353              : 
     354            5 :     pub(crate) fn get_wal_seg_size(&self) -> usize {
     355            5 :         self.sk.state().server.wal_seg_size as usize
     356            5 :     }
     357              : 
     358            0 :     fn get_safekeeper_info(
     359            0 :         &self,
     360            0 :         ttid: &TenantTimelineId,
     361            0 :         conf: &SafeKeeperConf,
     362            0 :         standby_apply_lsn: Lsn,
     363            0 :     ) -> SafekeeperTimelineInfo {
     364            0 :         SafekeeperTimelineInfo {
     365            0 :             safekeeper_id: conf.my_id.0,
     366            0 :             tenant_timeline_id: Some(ProtoTenantTimelineId {
     367            0 :                 tenant_id: ttid.tenant_id.as_ref().to_owned(),
     368            0 :                 timeline_id: ttid.timeline_id.as_ref().to_owned(),
     369            0 :             }),
     370            0 :             term: self.sk.state().acceptor_state.term,
     371            0 :             last_log_term: self.sk.last_log_term(),
     372            0 :             flush_lsn: self.sk.flush_lsn().0,
     373            0 :             // note: this value is not flushed to control file yet and can be lost
     374            0 :             commit_lsn: self.sk.state().inmem.commit_lsn.0,
     375            0 :             remote_consistent_lsn: self.sk.state().inmem.remote_consistent_lsn.0,
     376            0 :             peer_horizon_lsn: self.sk.state().inmem.peer_horizon_lsn.0,
     377            0 :             safekeeper_connstr: conf
     378            0 :                 .advertise_pg_addr
     379            0 :                 .to_owned()
     380            0 :                 .unwrap_or(conf.listen_pg_addr.clone()),
     381            0 :             http_connstr: conf.listen_http_addr.to_owned(),
     382            0 :             https_connstr: conf.listen_https_addr.to_owned(),
     383            0 :             backup_lsn: self.sk.state().inmem.backup_lsn.0,
     384            0 :             local_start_lsn: self.sk.state().local_start_lsn.0,
     385            0 :             availability_zone: conf.availability_zone.clone(),
     386            0 :             standby_horizon: standby_apply_lsn.0,
     387            0 :         }
     388            0 :     }
     389              : 
     390              :     /// Get our latest view of alive peers status on the timeline.
     391              :     /// We pass our own info through the broker as well, so when we don't have connection
     392              :     /// to the broker returned vec is empty.
     393           36 :     pub(crate) fn get_peers(&self, heartbeat_timeout: Duration) -> Vec<PeerInfo> {
     394           36 :         let now = Instant::now();
     395           36 :         self.peers_info
     396           36 :             .0
     397           36 :             .iter()
     398              :             // Regard peer as absent if we haven't heard from it within heartbeat_timeout.
     399           36 :             .filter(|p| now.duration_since(p.ts) <= heartbeat_timeout)
     400           36 :             .cloned()
     401           36 :             .collect()
     402           36 :     }
     403              : }
     404              : 
     405              : #[derive(Debug, thiserror::Error)]
     406              : pub enum TimelineError {
     407              :     #[error("Timeline {0} was cancelled and cannot be used anymore")]
     408              :     Cancelled(TenantTimelineId),
     409              :     #[error("Timeline {0} was not found in global map")]
     410              :     NotFound(TenantTimelineId),
     411              :     #[error("Timeline {0} has been deleted")]
     412              :     Deleted(TenantTimelineId),
     413              :     #[error("Timeline {0} creation is in progress")]
     414              :     CreationInProgress(TenantTimelineId),
     415              :     #[error("Timeline {0} exists on disk, but wasn't loaded on startup")]
     416              :     Invalid(TenantTimelineId),
     417              :     #[error("Timeline {0} is already exists")]
     418              :     AlreadyExists(TenantTimelineId),
     419              :     #[error("Timeline {0} is not initialized, wal_seg_size is zero")]
     420              :     UninitializedWalSegSize(TenantTimelineId),
     421              :     #[error("Timeline {0} is not initialized, pg_version is unknown")]
     422              :     UninitialinzedPgVersion(TenantTimelineId),
     423              : }
     424              : 
     425              : // Convert to HTTP API error.
     426              : impl From<TimelineError> for ApiError {
     427            0 :     fn from(te: TimelineError) -> ApiError {
     428            0 :         match te {
     429            0 :             TimelineError::NotFound(ttid) => {
     430            0 :                 ApiError::NotFound(anyhow!("timeline {} not found", ttid).into())
     431              :             }
     432            0 :             TimelineError::Deleted(ttid) => {
     433            0 :                 ApiError::NotFound(anyhow!("timeline {} deleted", ttid).into())
     434              :             }
     435            0 :             _ => ApiError::InternalServerError(anyhow!("{}", te)),
     436              :         }
     437            0 :     }
     438              : }
     439              : 
/// We run remote deletion in a background task, this is how it sends its results back.
/// The value stays `None` until a result has been sent.
type RemoteDeletionReceiver = tokio::sync::watch::Receiver<Option<anyhow::Result<()>>>;

/// Timeline struct manages lifecycle (creation, deletion, restore) of a safekeeper timeline.
/// It also holds SharedState and guards access to it with a lock.
pub struct Timeline {
    pub ttid: TenantTimelineId,
    pub remote_path: RemotePath,

    /// Used to broadcast commit_lsn updates to all background jobs.
    commit_lsn_watch_tx: watch::Sender<Lsn>,
    commit_lsn_watch_rx: watch::Receiver<Lsn>,

    /// Broadcasts (current term, flush_lsn) updates, walsender is interested in
    /// them when sending in recovery mode (to walproposer or peers). Note: this
    /// is just a notification, WAL reading should always be done with the lock
    /// held, as the term can change otherwise.
    term_flush_lsn_watch_tx: watch::Sender<TermLsn>,
    term_flush_lsn_watch_rx: watch::Receiver<TermLsn>,

    /// Broadcasts shared state updates as a version counter.
    shared_state_version_tx: watch::Sender<usize>,
    shared_state_version_rx: watch::Receiver<usize>,

    /// Safekeeper and other state, that should remain consistent and
    /// synchronized with the disk. This is a tokio lock (an `RwLock`, despite
    /// the field name) as we write WAL to disk while holding it, ensuring that
    /// consensus checks are in order.
    mutex: RwLock<SharedState>,
    walsenders: Arc<WalSenders>,
    walreceivers: Arc<WalReceivers>,
    timeline_dir: Utf8PathBuf,
    manager_ctl: ManagerCtl,
    conf: Arc<SafeKeeperConf>,

    pub(crate) wal_backup: Arc<WalBackup>,

    /// Receiver for the result of the background remote deletion task;
    /// `None` until one is stored here.
    remote_deletion: std::sync::Mutex<Option<RemoteDeletionReceiver>>,

    /// Hold this gate from code that depends on the Timeline's non-shut-down state.  While holding
    /// this gate, you must respect [`Timeline::cancel`]
    pub(crate) gate: Gate,

    /// Delete/cancel will trigger this, background tasks should drop out as soon as it fires
    pub(crate) cancel: CancellationToken,

    // timeline_manager controlled state
    pub(crate) broker_active: AtomicBool,
    pub(crate) wal_backup_active: AtomicBool,
    pub(crate) last_removed_segno: AtomicU64,
    pub(crate) mgr_status: AtomicStatus,
}
     491              : 
     492              : impl Timeline {
     493              :     /// Constructs a new timeline.
                            ///
                            /// Creates three watch channels: one for `commit_lsn`, one for the
                            /// `(last_log_term, flush_lsn)` pair and one for the shared state version
                            /// (starting at 0). The first two are seeded from `shared_state.sk`.
                            /// `shared_state` is then moved into the timeline's `RwLock`.
                            ///
                            /// Nothing is spawned here; background activity is started separately by
                            /// [`Self::bootstrap`].
     494            5 :     pub fn new(
     495            5 :         ttid: TenantTimelineId,
     496            5 :         timeline_dir: &Utf8Path,
     497            5 :         remote_path: &RemotePath,
     498            5 :         shared_state: SharedState,
     499            5 :         conf: Arc<SafeKeeperConf>,
     500            5 :         wal_backup: Arc<WalBackup>,
     501            5 :     ) -> Arc<Self> {
     502            5 :         let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
     503            5 :             watch::channel(shared_state.sk.state().commit_lsn);
     504            5 :         let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) = watch::channel(TermLsn::from((
     505            5 :             shared_state.sk.last_log_term(),
     506            5 :             shared_state.sk.flush_lsn(),
     507            5 :         )));
     508            5 :         let (shared_state_version_tx, shared_state_version_rx) = watch::channel(0);
     509              : 
                                // Created up front because `walsenders` below is handed a clone of it.
     510            5 :         let walreceivers = WalReceivers::new();
     511              : 
     512            5 :         Arc::new(Self {
     513            5 :             ttid,
     514            5 :             remote_path: remote_path.to_owned(),
     515            5 :             timeline_dir: timeline_dir.to_owned(),
     516            5 :             commit_lsn_watch_tx,
     517            5 :             commit_lsn_watch_rx,
     518            5 :             term_flush_lsn_watch_tx,
     519            5 :             term_flush_lsn_watch_rx,
     520            5 :             shared_state_version_tx,
     521            5 :             shared_state_version_rx,
     522            5 :             mutex: RwLock::new(shared_state),
     523            5 :             walsenders: WalSenders::new(walreceivers.clone()),
     524            5 :             walreceivers,
     525            5 :             gate: Default::default(),
     526            5 :             cancel: CancellationToken::default(),
     527            5 :             remote_deletion: std::sync::Mutex::new(None),
     528            5 :             manager_ctl: ManagerCtl::new(),
     529            5 :             conf,
     530            5 :             broker_active: AtomicBool::new(false),
     531            5 :             wal_backup_active: AtomicBool::new(false),
     532            5 :             last_removed_segno: AtomicU64::new(0),
     533            5 :             mgr_status: AtomicStatus::new(),
     534            5 :             wal_backup,
     535            5 :         })
     536            5 :     }
     537              : 
     538              :     /// Load existing timeline from disk.
                            ///
                            /// Restores the shared state for `ttid` with `SharedState::restore`, resolves the
                            /// local timeline directory and the remote path, and hands everything to
                            /// [`Timeline::new`]. The work runs inside a `load_timeline` tracing span.
                            ///
                            /// # Errors
                            ///
                            /// Propagates errors from `SharedState::restore` and `remote_timeline_path`.
     539            0 :     pub fn load_timeline(
     540            0 :         conf: Arc<SafeKeeperConf>,
     541            0 :         ttid: TenantTimelineId,
     542            0 :         wal_backup: Arc<WalBackup>,
     543            0 :     ) -> Result<Arc<Timeline>> {
     544            0 :         let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
     545              : 
     546            0 :         let shared_state = SharedState::restore(conf.as_ref(), &ttid)?;
     547            0 :         let timeline_dir = get_timeline_dir(conf.as_ref(), &ttid);
     548            0 :         let remote_path = remote_timeline_path(&ttid)?;
     549              : 
     550            0 :         Ok(Timeline::new(
     551            0 :             ttid,
     552            0 :             &timeline_dir,
     553            0 :             &remote_path,
     554            0 :             shared_state,
     555            0 :             conf,
     556            0 :             wal_backup,
     557            0 :         ))
     558            0 :     }
     559              : 
     560              :     /// Bootstrap new or existing timeline starting background tasks.
                            ///
                            /// Spawns `timeline_manager::main_task` as a tokio task. The task owns a clone
                            /// of the `Arc<Timeline>`, a clone of `conf`, and a guard on `self.gate` that it
                            /// keeps until it finishes.
                            ///
                            /// If the gate cannot be entered, nothing is spawned and the function returns
                            /// without reporting an error. The manager channel pair is created before that
                            /// check, so on the early-return path it is simply dropped.
                            ///
                            /// `_shared_state` is not used in the body.
                            /// NOTE(review): presumably it is required so that callers prove they hold the
                            /// shared-state write lock while bootstrapping; confirm against callers.
     561            5 :     pub fn bootstrap(
     562            5 :         self: &Arc<Timeline>,
     563            5 :         _shared_state: &mut WriteGuardSharedState<'_>,
     564            5 :         conf: &SafeKeeperConf,
     565            5 :         broker_active_set: Arc<TimelinesSet>,
     566            5 :         partial_backup_rate_limiter: RateLimiter,
     567            5 :         wal_backup: Arc<WalBackup>,
     568            5 :     ) {
     569            5 :         let (tx, rx) = self.manager_ctl.bootstrap_manager();
     570              : 
     571            5 :         let Ok(gate_guard) = self.gate.enter() else {
     572              :             // Init raced with shutdown
     573            0 :             return;
     574              :         };
     575              : 
     576              :         // Start manager task which will monitor timeline state and update
     577              :         // background tasks.
     578            5 :         tokio::spawn({
     579            5 :             let this = self.clone();
     580            5 :             let conf = conf.clone();
     581            5 :             async move {
                                        // Moved into the task so the gate stays entered while the manager runs.
     582            5 :                 let _gate_guard = gate_guard;
     583            5 :                 timeline_manager::main_task(
     584            5 :                     ManagerTimeline { tli: this },
     585            5 :                     conf,
     586            5 :                     broker_active_set,
     587            5 :                     tx,
     588            5 :                     rx,
     589            5 :                     partial_backup_rate_limiter,
     590            5 :                     wal_backup,
     591            5 :                 )
     592            5 :                 .await
     593            0 :             }
     594              :         });
     595            5 :     }
     596              : 
     597              :     /// Cancel the timeline, requesting background activity to stop. Closing
     598              :     /// the `self.gate` waits for that.
                            ///
                            /// This only logs at info level and fires the cancellation token; it does not
                            /// wait for anything itself. Use [`Self::close`] afterwards to wait.
     599            0 :     pub fn cancel(&self) {
     600            0 :         info!("timeline {} shutting down", self.ttid);
     601            0 :         self.cancel.cancel();
     602            0 :     }
     603              : 
     604              :     /// Background timeline activities (which hold Timeline::gate) will no
     605              :     /// longer run once this function completes. `Self::cancel` must have been
     606              :     /// already called.
                            ///
                            /// # Panics
                            ///
                            /// Panics if the cancellation token has not been cancelled yet.
     607            0 :     pub async fn close(&self) {
     608            0 :         assert!(self.cancel.is_cancelled());
     609              : 
     610              :         // Wait for any concurrent tasks to stop using this timeline, to avoid e.g. attempts
     611              :         // to read deleted files.
     612            0 :         self.gate.close().await;
     613            0 :     }
     614              : 
     615              :     /// Delete timeline from disk completely, by removing timeline directory.
     616              :     ///
     617              :     /// Also deletes WAL in s3. Might fail if e.g. s3 is unavailable, but
     618              :     /// deletion API endpoint is retriable.
     619              :     ///
     620              :     /// Timeline must be in shut-down state (i.e. call [`Self::close`] first)
                            ///
                            /// When `only_local` is true the remote deletion step is skipped. Otherwise
                            /// remote deletion runs first, and if it fails the error is returned before
                            /// the local directory is touched.
                            ///
                            /// On success returns the `bool` produced by `delete_dir` (bound to
                            /// `dir_existed` below).
                            ///
                            /// # Panics
                            ///
                            /// Panics if the timeline is not cancelled or if closing the gate has not
                            /// completed.
     621            0 :     pub async fn delete(
     622            0 :         &self,
     623            0 :         shared_state: &mut WriteGuardSharedState<'_>,
     624            0 :         only_local: bool,
     625            0 :     ) -> Result<bool> {
     626              :         // Assert that [`Self::close`] was already called
     627            0 :         assert!(self.cancel.is_cancelled());
     628            0 :         assert!(self.gate.close_complete());
     629              : 
     630            0 :         info!("deleting timeline {} from disk", self.ttid);
     631              : 
     632              :         // Close associated FDs. Nobody will be able to touch timeline data once
     633              :         // it is cancelled, so WAL storage won't be opened again.
     634            0 :         shared_state.sk.close_wal_store();
     635              : 
     636            0 :         if !only_local {
     637            0 :             self.remote_delete().await?;
     638            0 :         }
     639              : 
     640            0 :         let dir_existed = delete_dir(&self.timeline_dir).await?;
     641            0 :         Ok(dir_existed)
     642            0 :     }
     643              : 
     644              :     /// Delete timeline content from remote storage.  If the returned future is dropped,
     645              :     /// deletion will continue in the background.
     646              :     ///
     647              :     /// This function ordinarily spawns a task and stashes a result receiver into [`Self::remote_deletion`].  If
     648              :     /// deletion is already happening, it may simply wait for an existing task's result.
     649              :     ///
     650              :     /// Note: we concurrently delete remote storage data from multiple
     651              :     /// safekeepers. That's ok, s3 replies 200 if object doesn't exist and we
     652              :     /// do some retries anyway.
                            ///
                            /// Outcomes, depending on what is stored in `self.remote_deletion`:
                            /// - a stored successful result: returns `Ok(())` immediately;
                            /// - a stored failed result: logs it and starts a new deletion task;
                            /// - a stored receiver with no result yet: waits on that task;
                            /// - nothing stored: starts a new deletion task.
                            ///
                            /// # Errors
                            ///
                            /// Returns an error if the awaited deletion reported a failure, or if the
                            /// result channel was closed before any result was sent.
     653            0 :     async fn remote_delete(&self) -> Result<()> {
     654              :         // We will start a background task to do the deletion, so that it proceeds even if our
     655              :         // API request is dropped.  Future requests will see the existing deletion task and wait
     656              :         // for it to complete.
                                // The std mutex guard is confined to this block, so it is dropped before the
                                // `.await` further down.
     657            0 :         let mut result_rx = {
     658            0 :             let mut remote_deletion_state = self.remote_deletion.lock().unwrap();
     659            0 :             let result_rx = if let Some(result_rx) = remote_deletion_state.as_ref() {
     660            0 :                 if let Some(result) = result_rx.borrow().as_ref() {
     661            0 :                     if let Err(e) = result {
     662              :                         // A previous remote deletion failed: we will start a new one
     663            0 :                         tracing::error!("remote deletion failed, will retry ({e})");
     664            0 :                         None
     665              :                     } else {
     666              :                         // A previous remote deletion call already succeeded
     667            0 :                         return Ok(());
     668              :                     }
     669              :                 } else {
     670              :                     // Remote deletion is still in flight
     671            0 :                     Some(result_rx.clone())
     672              :                 }
     673              :             } else {
     674              :                 // Remote deletion was not attempted yet, start it now.
     675            0 :                 None
     676              :             };
     677              : 
     678            0 :             match result_rx {
     679            0 :                 Some(result_rx) => result_rx,
     680            0 :                 None => self.start_remote_delete(&mut remote_deletion_state),
     681              :             }
     682              :         };
     683              : 
     684              :         // Wait for a result
     685            0 :         let Ok(result) = result_rx.wait_for(|v| v.is_some()).await else {
     686              :             // Unexpected: sender should always send a result before dropping the channel, even if it has an error
     687            0 :             return Err(anyhow::anyhow!(
     688            0 :                 "remote deletion task future was dropped without sending a result"
     689            0 :             ));
     690              :         };
     691              : 
                                // The stored error is only borrowed here, so it is re-wrapped by message
                                // into a fresh `anyhow` error rather than moved out.
     692            0 :         result
     693            0 :             .as_ref()
     694            0 :             .expect("We did a wait_for on this being Some above")
     695            0 :             .as_ref()
     696            0 :             .map(|_| ())
     697            0 :             .map_err(|e| anyhow::anyhow!("remote deletion failed: {e}"))
     698            0 :     }
     699              : 
     700              :     /// Spawn background task to do remote deletion, return a receiver for its outcome
                            ///
                            /// `guard` is the locked `remote_deletion` slot. A clone of the new receiver is
                            /// stored into it, replacing whatever was there before.
                            ///
                            /// The watch channel starts at `None`; the task sends `Some(result)` exactly
                            /// once when it is done. If no remote storage is configured, the task does no
                            /// deletion and reports `Ok(())`.
     701            0 :     fn start_remote_delete(
     702            0 :         &self,
     703            0 :         guard: &mut std::sync::MutexGuard<Option<RemoteDeletionReceiver>>,
     704            0 :     ) -> RemoteDeletionReceiver {
     705            0 :         tracing::info!("starting remote deletion");
     706            0 :         let storage = self.wal_backup.get_storage().clone();
     707            0 :         let (result_tx, result_rx) = tokio::sync::watch::channel(None);
                                // Copied out so that the spawned future does not borrow `self`.
     708            0 :         let ttid = self.ttid;
     709            0 :         tokio::task::spawn(
     710            0 :             async move {
     711            0 :                 let r = if let Some(storage) = storage {
     712            0 :                     wal_backup::delete_timeline(&storage, &ttid).await
     713              :                 } else {
     714            0 :                     tracing::info!(
     715            0 :                         "skipping remote deletion because no remote storage is configured; this effectively leaks the objects in remote storage"
     716              :                     );
     717            0 :                     Ok(())
     718              :                 };
     719              : 
     720            0 :                 if let Err(e) = &r {
     721              :                     // Log error here in case nobody ever listens for our result (e.g. dropped API request)
     722            0 :                     tracing::error!("remote deletion failed: {e}");
     723            0 :                 }
     724              : 
     725              :                 // Ignore send results: it's legal for the Timeline to give up waiting for us.
     726            0 :                 let _ = result_tx.send(Some(r));
     727            0 :             }
     728            0 :             .instrument(info_span!("remote_delete", timeline = %self.ttid)),
     729              :         );
     730              : 
     731            0 :         **guard = Some(result_rx.clone());
     732              : 
     733            0 :         result_rx
     734            0 :     }
     735              : 
     736              :     /// Returns whether the timeline's cancellation token has been cancelled.
     737         2505 :     pub fn is_cancelled(&self) -> bool {
     738         2505 :         self.cancel.is_cancelled()
     739         2505 :     }
     740              : 
     741              :     /// Takes the exclusive (write) lock on the timeline's shared state, waiting
                            /// until it is available.
                            ///
                            /// The returned guard is built from a clone of the `Arc<Timeline>` together
                            /// with the lock guard, which is why this takes `self: &Arc<Self>`.
     742         1249 :     pub async fn write_shared_state(self: &Arc<Self>) -> WriteGuardSharedState<'_> {
     743         1249 :         WriteGuardSharedState::new(self.clone(), self.mutex.write().await)
     744         1249 :     }
     745              : 
                            /// Takes a shared (read) lock on the timeline's shared state, waiting until
                            /// it is available.
     746           55 :     pub async fn read_shared_state(&self) -> ReadGuardSharedState {
     747           55 :         self.mutex.read().await
     748           55 :     }
     749              : 
     750              :     /// Returns a clone of the receiver for the commit_lsn watch channel.
     751            5 :     pub fn get_commit_lsn_watch_rx(&self) -> watch::Receiver<Lsn> {
     752            5 :         self.commit_lsn_watch_rx.clone()
     753            5 :     }
     754              : 
     755              :     /// Returns a clone of the receiver for the term_flush_lsn watch channel.
     756            0 :     pub fn get_term_flush_lsn_watch_rx(&self) -> watch::Receiver<TermLsn> {
     757            0 :         self.term_flush_lsn_watch_rx.clone()
     758            0 :     }
     759              : 
     760              :     /// Returns a clone of the receiver for the SharedState update version
                            /// watch channel.
     761            5 :     pub fn get_state_version_rx(&self) -> watch::Receiver<usize> {
     762            5 :         self.shared_state_version_rx.clone()
     763            5 :     }
     764              : 
     765              :     /// Returns wal_seg_size.
                            ///
                            /// The value is read under the shared-state read lock.
     766            5 :     pub async fn get_wal_seg_size(&self) -> usize {
     767            5 :         self.read_shared_state().await.get_wal_seg_size()
     768            5 :     }
     769              : 
     770              :     /// Returns state of the timeline.
                            ///
                            /// The result is `(in-memory state, persistent state)`. Both are clones taken
                            /// under a single shared-state read lock.
     771            9 :     pub async fn get_state(&self) -> (TimelineMemState, TimelinePersistentState) {
     772            9 :         let state = self.read_shared_state().await;
     773            9 :         (
     774            9 :             state.sk.state().inmem.clone(),
     775            9 :             TimelinePersistentState::clone(state.sk.state()),
     776            9 :         )
     777            9 :     }
     778              : 
     779              :     /// Returns latest backup_lsn.
                            ///
                            /// The value is read from the in-memory state under the shared-state read lock.
     780            0 :     pub async fn get_wal_backup_lsn(&self) -> Lsn {
     781            0 :         self.read_shared_state().await.sk.state().inmem.backup_lsn
     782            0 :     }
     783              : 
     784              :     /// Advances the in-memory backup_lsn to the given value.
                            ///
                            /// The stored value never moves backwards: it becomes the maximum of the
                            /// current value and `backup_lsn`.
                            ///
                            /// # Errors
                            ///
                            /// Fails with `TimelineError::Cancelled` if the timeline is already cancelled.
                            /// That check happens before the write lock is taken.
     785            0 :     pub async fn set_wal_backup_lsn(self: &Arc<Self>, backup_lsn: Lsn) -> Result<()> {
     786            0 :         if self.is_cancelled() {
     787            0 :             bail!(TimelineError::Cancelled(self.ttid));
     788            0 :         }
     789              : 
     790            0 :         let mut state = self.write_shared_state().await;
     791            0 :         state.sk.state_mut().inmem.backup_lsn = max(state.sk.state().inmem.backup_lsn, backup_lsn);
     792              :         // we should check whether to shut down offloader, but this will be done
     793              :         // soon by peer communication anyway.
     794            0 :         Ok(())
     795            0 :     }
     796              : 
     797              :     /// Get safekeeper info for broadcasting to broker and other peers.
                            ///
                            /// The hot standby `apply_lsn` is read from `walsenders` before the
                            /// shared-state read lock is taken. The rest is assembled by the shared
                            /// state's own `get_safekeeper_info` under that lock.
     798            0 :     pub async fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SafekeeperTimelineInfo {
     799            0 :         let standby_apply_lsn = self.walsenders.get_hotstandby().reply.apply_lsn;
     800            0 :         let shared_state = self.read_shared_state().await;
     801            0 :         shared_state.get_safekeeper_info(&self.ttid, conf, standby_apply_lsn)
     802            0 :     }
     803              : 
     804              :     /// Update timeline state with peer safekeeper data.
                            ///
                            /// Under a single write lock, `sk_info` is first passed to the safekeeper's
                            /// `record_safekeeper_info`. A `PeerInfo` built from it and stamped with
                            /// `Instant::now()` is then upserted into `peers_info`.
                            ///
                            /// # Errors
                            ///
                            /// Propagates the error from the safekeeper's `record_safekeeper_info`; in
                            /// that case `peers_info` is not updated.
     805            0 :     pub async fn record_safekeeper_info(
     806            0 :         self: &Arc<Self>,
     807            0 :         sk_info: SafekeeperTimelineInfo,
     808            0 :     ) -> Result<()> {
                                // The inner block bounds the lifetime of the write guard.
     809              :         {
     810            0 :             let mut shared_state = self.write_shared_state().await;
     811            0 :             shared_state.sk.record_safekeeper_info(&sk_info).await?;
     812            0 :             let peer_info = peer_info_from_sk_info(&sk_info, Instant::now());
     813            0 :             shared_state.peers_info.upsert(&peer_info);
     814              :         }
     815            0 :         Ok(())
     816            0 :     }
     817              : 
                            /// Returns the peers reported by the shared state's `get_peers`, called with
                            /// `conf.heartbeat_timeout` under the shared-state read lock.
                            ///
                            /// NOTE(review): presumably the timeout is used to filter out peers that
                            /// have not been heard from recently; confirm in `SharedState::get_peers`.
     818            0 :     pub async fn get_peers(&self, conf: &SafeKeeperConf) -> Vec<PeerInfo> {
     819            0 :         let shared_state = self.read_shared_state().await;
     820            0 :         shared_state.get_peers(conf.heartbeat_timeout)
     821            0 :     }
     822              : 
                            /// Returns a reference to this timeline's `WalSenders`.
     823            5 :     pub fn get_walsenders(&self) -> &Arc<WalSenders> {
     824            5 :         &self.walsenders
     825            5 :     }
     826              : 
                            /// Returns a reference to this timeline's `WalReceivers`.
     827           15 :     pub fn get_walreceivers(&self) -> &Arc<WalReceivers> {
     828           15 :         &self.walreceivers
     829           15 :     }
     830              : 
     831              :     /// Returns flush_lsn.
                            ///
                            /// The value is read from the safekeeper under the shared-state read lock.
     832            0 :     pub async fn get_flush_lsn(&self) -> Lsn {
     833            0 :         self.read_shared_state().await.sk.flush_lsn()
     834            0 :     }
     835              : 
     836              :     /// Gather timeline data for metrics.
                            ///
                            /// Returns `None` if the timeline is cancelled.
                            ///
                            /// Walsender metric values are collected before the shared-state read lock
                            /// is taken. The atomic flags and counters are read with `Ordering::Relaxed`.
                            ///
                            /// NOTE(review): `get_num() as u32` is a plain `as` cast; if `get_num`
                            /// returns a wider integer it would truncate silently. Confirm its type.
     837            0 :     pub async fn info_for_metrics(&self) -> Option<FullTimelineInfo> {
     838            0 :         if self.is_cancelled() {
     839            0 :             return None;
     840            0 :         }
     841              : 
     842              :         let WalSendersTimelineMetricValues {
     843            0 :             ps_feedback_counter,
     844            0 :             ps_corruption_detected,
     845            0 :             last_ps_feedback,
     846            0 :             interpreted_wal_reader_tasks,
     847            0 :         } = self.walsenders.info_for_metrics();
     848              : 
     849            0 :         let state = self.read_shared_state().await;
     850            0 :         Some(FullTimelineInfo {
     851            0 :             ttid: self.ttid,
     852            0 :             ps_feedback_count: ps_feedback_counter,
     853            0 :             ps_corruption_detected,
     854            0 :             last_ps_feedback,
     855            0 :             wal_backup_active: self.wal_backup_active.load(Ordering::Relaxed),
     856            0 :             timeline_is_active: self.broker_active.load(Ordering::Relaxed),
     857            0 :             num_computes: self.walreceivers.get_num() as u32,
     858            0 :             last_removed_segno: self.last_removed_segno.load(Ordering::Relaxed),
     859            0 :             interpreted_wal_reader_tasks,
     860            0 :             epoch_start_lsn: state.sk.term_start_lsn(),
     861            0 :             mem_state: state.sk.state().inmem.clone(),
     862            0 :             persisted_state: TimelinePersistentState::clone(state.sk.state()),
     863            0 :             flush_lsn: state.sk.flush_lsn(),
     864            0 :             wal_storage: state.sk.wal_storage_metrics(),
     865            0 :         })
     866            0 :     }
     867              : 
     868              :     /// Returns in-memory timeline state to build a full debug dump.
                            ///
                            /// Unlike [`Self::info_for_metrics`], this does not bail out on a cancelled
                            /// timeline; the cancellation flag is reported in the dump instead. The
                            /// shared-state read lock is held while the dump is assembled.
     869            0 :     pub async fn memory_dump(&self) -> debug_dump::Memory {
     870            0 :         let state = self.read_shared_state().await;
     871              : 
     872            0 :         let (write_lsn, write_record_lsn, flush_lsn, file_open) =
     873            0 :             state.sk.wal_storage_internal_state();
     874              : 
     875            0 :         debug_dump::Memory {
     876            0 :             is_cancelled: self.is_cancelled(),
     877            0 :             peers_info_len: state.peers_info.0.len(),
     878            0 :             walsenders: self.walsenders.get_all_public(),
     879            0 :             wal_backup_active: self.wal_backup_active.load(Ordering::Relaxed),
     880            0 :             active: self.broker_active.load(Ordering::Relaxed),
     881            0 :             num_computes: self.walreceivers.get_num() as u32,
     882            0 :             last_removed_segno: self.last_removed_segno.load(Ordering::Relaxed),
     883            0 :             epoch_start_lsn: state.sk.term_start_lsn(),
     884            0 :             mem_state: state.sk.state().inmem.clone(),
     885            0 :             mgr_status: self.mgr_status.get(),
     886            0 :             write_lsn,
     887            0 :             write_record_lsn,
     888            0 :             flush_lsn,
     889            0 :             file_open,
     890            0 :         }
     891            0 :     }
     892              : 
     893              :     /// Apply a function to the control file state and persist it.
                            ///
                            /// Runs under the shared-state write lock. `f` receives the state returned by
                            /// `start_change()` and may mutate it; on success the result is handed to
                            /// `finish_change` and the value returned by `f` is passed back to the caller.
                            ///
                            /// # Errors
                            ///
                            /// Returns the error from `f` (in which case `finish_change` is never
                            /// called), or the error from `finish_change`.
     894            0 :     pub async fn map_control_file<T>(
     895            0 :         self: &Arc<Self>,
     896            0 :         f: impl FnOnce(&mut TimelinePersistentState) -> Result<T>,
     897            0 :     ) -> Result<T> {
     898            0 :         let mut state = self.write_shared_state().await;
     899            0 :         let mut persistent_state = state.sk.state_mut().start_change();
     900              :         // If f returns error, we abort the change and don't persist anything.
     901            0 :         let res = f(&mut persistent_state)?;
     902              :         // If persisting fails, we abort the change and return error.
     903            0 :         state
     904            0 :             .sk
     905            0 :             .state_mut()
     906            0 :             .finish_change(&persistent_state)
     907            0 :             .await?;
     908            0 :         Ok(res)
     909            0 :     }
     910              : 
                            /// Forwards a term bump request to the safekeeper under the shared-state
                            /// write lock and returns its response.
                            ///
                            /// `to` is passed through unchanged.
                            /// NOTE(review): the meaning of `None` is decided by the safekeeper's
                            /// `term_bump`, which is not visible here.
     911            0 :     pub async fn term_bump(self: &Arc<Self>, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
     912            0 :         let mut state = self.write_shared_state().await;
     913            0 :         state.sk.term_bump(to).await
     914            0 :     }
     915              : 
                            /// Forwards a membership switch to configuration `to` to the safekeeper,
                            /// under the shared-state write lock, and returns its response.
                            ///
                            /// # Errors
                            ///
                            /// Fails with `TimelineError::Cancelled` if the timeline is cancelled (checked
                            /// after the lock is taken), or with the error from the safekeeper's
                            /// `membership_switch`.
     916            0 :     pub async fn membership_switch(
     917            0 :         self: &Arc<Self>,
     918            0 :         to: Configuration,
     919            0 :     ) -> Result<TimelineMembershipSwitchResponse> {
     920            0 :         let mut state = self.write_shared_state().await;
     921              :         // Ensure we don't race with exclude/delete requests by checking the cancellation
     922              :         // token under the write_shared_state lock.
     923              :         // Exclude/delete cancel the timeline under the shared state lock,
     924              :         // so the timeline cannot be deleted in the middle of the membership switch.
     925            0 :         if self.is_cancelled() {
     926            0 :             bail!(TimelineError::Cancelled(self.ttid));
     927            0 :         }
     928            0 :         state.sk.membership_switch(to).await
     929            0 :     }
     930              : 
     931              :     /// Guts of [`Self::wal_residence_guard`] and [`Self::try_wal_residence_guard`]
                            ///
                            /// With `block == true` the guard is requested through
                            /// `manager_ctl.wal_residence_guard()` and the result is always `Some`. With
                            /// `block == false` it goes through `manager_ctl.try_wal_residence_guard()`,
                            /// which may yield `None`.
                            ///
                            /// Either request is bounded by a 30 second deadline measured from the start
                            /// of the call. On success the elapsed time is recorded in
                            /// `MISC_OPERATION_SECONDS` under a label naming the calling mode.
                            ///
                            /// # Errors
                            ///
                            /// - `TimelineError::Cancelled` if the timeline is already cancelled;
                            /// - the error returned by the manager control call;
                            /// - a timeout error if the deadline passes first.
                            ///
                            /// The last two cases also log a warning with the manager status sampled
                            /// before and after the attempt.
     932           10 :     async fn do_wal_residence_guard(
     933           10 :         self: &Arc<Self>,
     934           10 :         block: bool,
     935           10 :     ) -> Result<Option<WalResidentTimeline>> {
                                // Used both as the metric label and in the warning messages below.
     936           10 :         let op_label = if block {
     937           10 :             "wal_residence_guard"
     938              :         } else {
     939            0 :             "try_wal_residence_guard"
     940              :         };
     941              : 
     942           10 :         if self.is_cancelled() {
     943            0 :             bail!(TimelineError::Cancelled(self.ttid));
     944           10 :         }
     945              : 
     946           10 :         debug!("requesting WalResidentTimeline guard");
     947           10 :         let started_at = Instant::now();
                                // Sampled now so a failure can report how the manager status changed.
     948           10 :         let status_before = self.mgr_status.get();
     949              : 
     950              :         // Wait 30 seconds for the guard to be acquired. It can time out if someone is
     951              :         // holding the lock (e.g. during `SafeKeeper::process_msg()`) or manager task
     952              :         // is stuck.
     953           10 :         let res = tokio::time::timeout_at(started_at + Duration::from_secs(30), async {
     954           10 :             if block {
     955           10 :                 self.manager_ctl.wal_residence_guard().await.map(Some)
     956              :             } else {
     957            0 :                 self.manager_ctl.try_wal_residence_guard().await
     958              :             }
     959           10 :         })
     960           10 :         .await;
     961              : 
                                // Outer `Result` is the timeout outcome, inner one is the manager's answer.
     962           10 :         let guard = match res {
     963           10 :             Ok(Ok(guard)) => {
     964           10 :                 let finished_at = Instant::now();
     965           10 :                 let elapsed = finished_at - started_at;
     966           10 :                 MISC_OPERATION_SECONDS
     967           10 :                     .with_label_values(&[op_label])
     968           10 :                     .observe(elapsed.as_secs_f64());
     969              : 
     970           10 :                 guard
     971              :             }
     972            0 :             Ok(Err(e)) => {
     973            0 :                 warn!(
     974            0 :                     "error acquiring in {op_label}, statuses {:?} => {:?}",
     975              :                     status_before,
     976            0 :                     self.mgr_status.get()
     977              :                 );
     978            0 :                 return Err(e);
     979              :             }
     980              :             Err(_) => {
     981            0 :                 warn!(
     982            0 :                     "timeout acquiring in {op_label} guard, statuses {:?} => {:?}",
     983              :                     status_before,
     984            0 :                     self.mgr_status.get()
     985              :                 );
     986            0 :                 anyhow::bail!("timeout while acquiring WalResidentTimeline guard");
     987              :             }
     988              :         };
     989              : 
     990           10 :         Ok(guard.map(|g| WalResidentTimeline::new(self.clone(), g)))
     991           10 :     }
     992              : 
     993              :     /// Get the timeline guard for reading/writing WAL files.
     994              :     /// If WAL files are not present on disk (evicted), they will be automatically
     995              :     /// downloaded from remote storage. This is done in the manager task, which is
     996              :     /// responsible for issuing all guards.
     997              :     ///
     998              :     /// NB: don't use this function from timeline_manager, it will deadlock.
     999              :     /// NB: don't use this function while holding shared_state lock.
                            ///
                            /// # Errors
                            ///
                            /// Returns whatever [`Self::do_wal_residence_guard`] returns in blocking mode.
                            ///
                            /// # Panics
                            ///
                            /// Panics if the blocking call succeeds without a guard, which the blocking
                            /// mode is expected never to do.
    1000           10 :     pub async fn wal_residence_guard(self: &Arc<Self>) -> Result<WalResidentTimeline> {
    1001           10 :         self.do_wal_residence_guard(true)
    1002           10 :             .await
    1003           10 :             .map(|m| m.expect("Always get Some in block=true mode"))
    1004           10 :     }
    1005              : 
    1006              :     /// Get the timeline guard for reading/writing WAL files if the timeline is resident,
    1007              :     /// else return None
                            ///
                            /// Calls [`Self::do_wal_residence_guard`] with `block = false`, so the same
                            /// cancellation check, deadline and error cases apply.
    1008            0 :     pub(crate) async fn try_wal_residence_guard(
    1009            0 :         self: &Arc<Self>,
    1010            0 :     ) -> Result<Option<WalResidentTimeline>> {
    1011            0 :         self.do_wal_residence_guard(false).await
    1012            0 :     }
    1013              : 
                            /// Forwards to `manager_ctl.backup_partial_reset()` and returns its result.
                            ///
                            /// NOTE(review): what the returned strings denote is defined by the manager
                            /// and is not visible here.
    1014            0 :     pub async fn backup_partial_reset(self: &Arc<Self>) -> Result<Vec<String>> {
    1015            0 :         self.manager_ctl.backup_partial_reset().await
    1016            0 :     }
    1017              : }
    1018              : 
    1019              : /// This guard allows reading and writing the timeline's on-disk state.
    1020              : /// All tasks that are trying to read/write WAL from disk should use this guard.
    1021              : pub struct WalResidentTimeline {
                            // The timeline this guard belongs to.
    1022              :     pub tli: Arc<Timeline>,
                            // Kept only so that it lives as long as this struct; it is private and
                            // underscore-prefixed, and nothing in this part of the file reads it.
    1023              :     _guard: ResidenceGuard,
    1024              : }
    1025              : 
    1026              : impl WalResidentTimeline {
                            /// Pairs a timeline with an already acquired residence guard.
    1027           15 :     pub fn new(tli: Arc<Timeline>, _guard: ResidenceGuard) -> Self {
    1028           15 :         WalResidentTimeline { tli, _guard }
    1029           15 :     }
    1030              : }
    1031              : 
                        // Dereferences to the wrapped `Arc<Timeline>`, so `Timeline` methods can be
                        // called directly on a `WalResidentTimeline`.
    1032              : impl Deref for WalResidentTimeline {
    1033              :     type Target = Arc<Timeline>;
    1034              : 
    1035         6911 :     fn deref(&self) -> &Self::Target {
    1036         6911 :         &self.tli
    1037         6911 :     }
    1038              : }
    1039              : 
    1040              : impl WalResidentTimeline {
    1041              :     /// Returns true if walsender should stop sending WAL to pageserver. We
    1042              :     /// terminate it if remote_consistent_lsn reached commit_lsn and there is no
    1043              :     /// computes. While there might be nothing to stream already, we learn about
    1044              :     /// remote_consistent_lsn update through replication feedback, and we want
    1045              :     /// to stop pushing to the broker if pageserver is fully caughtup.
                            ///
                            /// In detail:
                            /// - a cancelled timeline always yields `true`;
                            /// - with zero walreceivers, yields `true` if the in-memory `commit_lsn` is
                            ///   `Lsn(0)` or `reported_remote_consistent_lsn` has reached it;
                            /// - otherwise yields `false`.
    1046            0 :     pub async fn should_walsender_stop(&self, reported_remote_consistent_lsn: Lsn) -> bool {
    1047            0 :         if self.is_cancelled() {
    1048            0 :             return true;
    1049            0 :         }
    1050            0 :         let shared_state = self.read_shared_state().await;
    1051            0 :         if self.walreceivers.get_num() == 0 {
    1052            0 :             return shared_state.sk.state().inmem.commit_lsn == Lsn(0) || // no data at all yet
    1053            0 :             reported_remote_consistent_lsn >= shared_state.sk.state().inmem.commit_lsn;
    1054            0 :         }
    1055            0 :         false
    1056            0 :     }
    1057              : 
    1058              :     /// Ensure that the current term is `t`, erroring otherwise, and return the read-locked state.
    1059            0 :     pub async fn acquire_term(&self, t: Term) -> Result<ReadGuardSharedState> {
    1060            0 :         let ss = self.read_shared_state().await;
    1061            0 :         if ss.sk.state().acceptor_state.term != t {
    1062            0 :             bail!(
    1063            0 :                 "failed to acquire term {}, current term {}",
    1064              :                 t,
    1065            0 :                 ss.sk.state().acceptor_state.term
    1066              :             );
    1067            0 :         }
    1068            0 :         Ok(ss)
    1069            0 :     }
    1070              : 
    1071              :     // BEGIN HADRON
    1072              :     // Fail if disk usage by WAL segment files for this timeline exceeds the configured limit, or if the global disk limit flag is set.
    1073         1240 :     fn hadron_check_disk_usage(
    1074         1240 :         &self,
    1075         1240 :         shared_state_locked: &mut WriteGuardSharedState<'_>,
    1076         1240 :     ) -> Result<()> {
    1077              :         // The disk usage is calculated based on the number of segments between `last_removed_segno`
    1078              :         // and the current flush LSN segment number. `last_removed_segno` is advanced after
    1079              :         // unneeded WAL files are physically removed from disk (see `update_wal_removal_end()`
    1080              :         // in `timeline_manager.rs`).
    1081         1240 :         let max_timeline_disk_usage_bytes = self.conf.max_timeline_disk_usage_bytes;
    1082         1240 :         if max_timeline_disk_usage_bytes > 0 {
    1083            0 :             let last_removed_segno = self.last_removed_segno.load(Ordering::Relaxed);
    1084            0 :             let flush_lsn = shared_state_locked.sk.flush_lsn();
    1085            0 :             let wal_seg_size = shared_state_locked.sk.state().server.wal_seg_size as u64;
    1086            0 :             let current_segno = flush_lsn.segment_number(wal_seg_size as usize);
    1087              : 
    1088            0 :             let segno_count = current_segno - last_removed_segno;
    1089            0 :             let disk_usage_bytes = segno_count * wal_seg_size;
    1090              : 
    1091            0 :             if disk_usage_bytes > max_timeline_disk_usage_bytes {
    1092            0 :                 WAL_STORAGE_LIMIT_ERRORS.inc();
    1093            0 :                 bail!(
    1094            0 :                     "WAL storage utilization exceeds configured limit of {} bytes: current disk usage: {} bytes",
    1095              :                     max_timeline_disk_usage_bytes,
    1096              :                     disk_usage_bytes
    1097              :                 );
    1098            0 :             }
    1099         1240 :         }
    1100              : 
    1101         1240 :         if GLOBAL_DISK_LIMIT_EXCEEDED.load(Ordering::Relaxed) {
    1102            0 :             bail!("Global disk usage exceeded limit");
    1103         1240 :         }
    1104              : 
    1105         1240 :         Ok(())
    1106         1240 :     }
    1107              :     // END HADRON
    1108              : 
    1109              :     /// Pass arrived message to the safekeeper.
    1110         1240 :     pub async fn process_msg(
    1111         1240 :         &self,
    1112         1240 :         msg: &ProposerAcceptorMessage,
    1113         1240 :     ) -> Result<Option<AcceptorProposerMessage>> {
    1114         1240 :         if self.is_cancelled() {
    1115            0 :             bail!(TimelineError::Cancelled(self.ttid));
    1116         1240 :         }
    1117              : 
    1118              :         let mut rmsg: Option<AcceptorProposerMessage>;
    1119              :         {
    1120         1240 :             let mut shared_state = self.write_shared_state().await;
    1121              :             // BEGIN HADRON
    1122              :             // An error from `hadron_check_disk_usage()` fails `process_msg()`; the error is
    1123              :             // propagated upward and terminates the entire WalAcceptor. This will cause postgres to
    1124              :             // disconnect from the safekeeper and establish a new connection. Postgres will keep retrying
    1125              :             // safekeeper connections every second until it can successfully propose WAL to the SK again.
    1126         1240 :             self.hadron_check_disk_usage(&mut shared_state)?;
    1127              :             // END HADRON
    1128         1240 :             rmsg = shared_state.sk.safekeeper().process_msg(msg).await?;
    1129              : 
    1130              :             // if this is AppendResponse, fill in proper hot standby feedback.
    1131          620 :             if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg {
    1132          620 :                 resp.hs_feedback = self.walsenders.get_hotstandby().hs_feedback;
    1133          620 :             }
    1134              :         }
    1135         1240 :         Ok(rmsg)
    1136         1240 :     }
    1137              : 
    1138            9 :     pub async fn get_walreader(&self, start_lsn: Lsn) -> Result<WalReader> {
    1139            9 :         let (_, persisted_state) = self.get_state().await;
    1140              : 
    1141            9 :         WalReader::new(
    1142            9 :             &self.ttid,
    1143            9 :             self.timeline_dir.clone(),
    1144            9 :             &persisted_state,
    1145            9 :             start_lsn,
    1146            9 :             self.wal_backup.clone(),
    1147              :         )
    1148            9 :     }
    1149              : 
    1150            0 :     pub fn get_timeline_dir(&self) -> Utf8PathBuf {
    1151            0 :         self.timeline_dir.clone()
    1152            0 :     }
    1153              : 
    1154              :     /// Update the in-memory remote consistent LSN; the stored value never moves backwards.
    1155            0 :     pub async fn update_remote_consistent_lsn(&self, candidate: Lsn) {
    1156            0 :         let mut shared_state = self.write_shared_state().await;
    1157            0 :         shared_state.sk.state_mut().inmem.remote_consistent_lsn = max(
    1158            0 :             shared_state.sk.state().inmem.remote_consistent_lsn,
    1159            0 :             candidate,
    1160            0 :         );
    1161            0 :     }
    1162              : }
    1163              : 
    1164              : /// This struct contains methods that are used by the timeline manager task.
    1165              : pub(crate) struct ManagerTimeline {
    1166              :     pub(crate) tli: Arc<Timeline>,
    1167              : }
    1168              : 
    1169              : impl Deref for ManagerTimeline {
    1170              :     type Target = Arc<Timeline>;
    1171              : 
    1172          391 :     fn deref(&self) -> &Self::Target {
    1173          391 :         &self.tli
    1174          391 :     }
    1175              : }
    1176              : 
    1177              : impl ManagerTimeline {
    1178            0 :     pub(crate) fn timeline_dir(&self) -> &Utf8PathBuf {
    1179            0 :         &self.tli.timeline_dir
    1180            0 :     }
    1181              : 
    1182              :     /// Manager requests this state on startup.
    1183            5 :     pub(crate) async fn bootstrap_mgr(&self) -> (bool, Option<PartialRemoteSegment>) {
    1184            5 :         let shared_state = self.read_shared_state().await;
    1185            5 :         let is_offloaded = matches!(
    1186            5 :             shared_state.sk.state().eviction_state,
    1187              :             EvictionState::Offloaded(_)
    1188              :         );
    1189            5 :         let partial_backup_uploaded = shared_state.sk.state().partial_backup.uploaded_segment();
    1190              : 
    1191            5 :         (is_offloaded, partial_backup_uploaded)
    1192            5 :     }
    1193              : 
    1194              :     /// Try to switch state Present->Offloaded.
    1195            0 :     pub(crate) async fn switch_to_offloaded(
    1196            0 :         &self,
    1197            0 :         partial: &PartialRemoteSegment,
    1198            0 :     ) -> anyhow::Result<()> {
    1199            0 :         let mut shared = self.write_shared_state().await;
    1200              : 
    1201              :         // updating control file
    1202            0 :         let mut pstate = shared.sk.state_mut().start_change();
    1203              : 
    1204            0 :         if !matches!(pstate.eviction_state, EvictionState::Present) {
    1205            0 :             bail!(
    1206            0 :                 "cannot switch to offloaded state, current state is {:?}",
    1207              :                 pstate.eviction_state
    1208              :             );
    1209            0 :         }
    1210              : 
    1211            0 :         if partial.flush_lsn != shared.sk.flush_lsn() {
    1212            0 :             bail!(
    1213            0 :                 "flush_lsn mismatch in partial backup, expected {}, got {}",
    1214            0 :                 shared.sk.flush_lsn(),
    1215              :                 partial.flush_lsn
    1216              :             );
    1217            0 :         }
    1218              : 
    1219            0 :         if partial.commit_lsn != pstate.commit_lsn {
    1220            0 :             bail!(
    1221            0 :                 "commit_lsn mismatch in partial backup, expected {}, got {}",
    1222              :                 pstate.commit_lsn,
    1223              :                 partial.commit_lsn
    1224              :             );
    1225            0 :         }
    1226              : 
    1227            0 :         if partial.term != shared.sk.last_log_term() {
    1228            0 :             bail!(
    1229            0 :                 "term mismatch in partial backup, expected {}, got {}",
    1230            0 :                 shared.sk.last_log_term(),
    1231              :                 partial.term
    1232              :             );
    1233            0 :         }
    1234              : 
    1235            0 :         pstate.eviction_state = EvictionState::Offloaded(shared.sk.flush_lsn());
    1236            0 :         shared.sk.state_mut().finish_change(&pstate).await?;
    1237              :         // control file is now switched to Offloaded state
    1238              : 
    1239              :         // now we can switch shared.sk to Offloaded, shouldn't fail
    1240            0 :         let prev_sk = std::mem::replace(&mut shared.sk, StateSK::Empty);
    1241            0 :         let cfile_state = prev_sk.take_state();
    1242            0 :         shared.sk = StateSK::Offloaded(Box::new(cfile_state));
    1243              : 
    1244            0 :         Ok(())
    1245            0 :     }
    1246              : 
    1247              :     /// Try to switch state Offloaded->Present.
    1248            0 :     pub(crate) async fn switch_to_present(&self) -> anyhow::Result<()> {
    1249            0 :         let mut shared = self.write_shared_state().await;
    1250              : 
    1251              :         // trying to restore WAL storage
    1252            0 :         let wal_store = wal_storage::PhysicalStorage::new(
    1253            0 :             &self.ttid,
    1254            0 :             &self.timeline_dir,
    1255            0 :             shared.sk.state(),
    1256            0 :             self.conf.no_sync,
    1257            0 :         )?;
    1258              : 
    1259              :         // updating control file
    1260            0 :         let mut pstate = shared.sk.state_mut().start_change();
    1261              : 
    1262            0 :         if !matches!(pstate.eviction_state, EvictionState::Offloaded(_)) {
    1263            0 :             bail!(
    1264            0 :                 "cannot switch to present state, current state is {:?}",
    1265              :                 pstate.eviction_state
    1266              :             );
    1267            0 :         }
    1268              : 
    1269            0 :         if wal_store.flush_lsn() != shared.sk.flush_lsn() {
    1270            0 :             bail!(
    1271            0 :                 "flush_lsn mismatch in restored WAL, expected {}, got {}",
    1272            0 :                 shared.sk.flush_lsn(),
    1273            0 :                 wal_store.flush_lsn()
    1274              :             );
    1275            0 :         }
    1276              : 
    1277            0 :         pstate.eviction_state = EvictionState::Present;
    1278            0 :         shared.sk.state_mut().finish_change(&pstate).await?;
    1279              : 
    1280              :         // now we can switch shared.sk to Present, shouldn't fail
    1281            0 :         let prev_sk = std::mem::replace(&mut shared.sk, StateSK::Empty);
    1282            0 :         let cfile_state = prev_sk.take_state();
    1283            0 :         shared.sk = StateSK::Loaded(SafeKeeper::new(cfile_state, wal_store, self.conf.my_id)?);
    1284              : 
    1285            0 :         Ok(())
    1286            0 :     }
    1287              : 
    1288              :     /// Update current manager state, useful for debugging manager deadlocks.
    1289          208 :     pub(crate) fn set_status(&self, status: timeline_manager::Status) {
    1290          208 :         self.mgr_status.store(status, Ordering::Relaxed);
    1291          208 :     }
    1292              : }
    1293              : 
    1294              : /// Deletes a directory and its contents. Returns false if the directory does not exist.
    1295            0 : pub async fn delete_dir(path: &Utf8PathBuf) -> Result<bool> {
    1296            0 :     match fs::remove_dir_all(path).await {
    1297            0 :         Ok(_) => Ok(true),
    1298            0 :         Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
    1299            0 :         Err(e) => Err(e.into()),
    1300              :     }
    1301            0 : }
    1302              : 
    1303              : /// Get a path to the tenant directory. If you just need to get a timeline directory,
    1304              : /// use WalResidentTimeline::get_timeline_dir instead.
    1305           10 : pub fn get_tenant_dir(conf: &SafeKeeperConf, tenant_id: &TenantId) -> Utf8PathBuf {
    1306           10 :     conf.workdir.join(tenant_id.to_string())
    1307           10 : }
    1308              : 
    1309              : /// Get a path to the timeline directory. If you need to read WAL files from disk,
    1310              : /// use WalResidentTimeline::get_timeline_dir instead. This function does not check
    1311              : /// timeline eviction status and WAL files might not be present on disk.
    1312           10 : pub fn get_timeline_dir(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Utf8PathBuf {
    1313           10 :     get_tenant_dir(conf, &ttid.tenant_id).join(ttid.timeline_id.to_string())
    1314           10 : }
        

Generated by: LCOV version 2.1-beta