LCOV - code coverage report
Current view: top level - safekeeper/src - timeline.rs (source / functions) Coverage Total Hit
Test: feead26e04cdef6e988ff1765b1cb7075eb48d3d.info Lines: 35.9 % 708 254
Test Date: 2025-02-28 12:11:00 Functions: 41.2 % 119 49

            Line data    Source code
       1              : //! This module implements Timeline lifecycle management and has all necessary code
       2              : //! to glue together SafeKeeper and all other background services.
       3              : 
       4              : use std::cmp::max;
       5              : use std::ops::{Deref, DerefMut};
       6              : use std::sync::Arc;
       7              : use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
       8              : use std::time::Duration;
       9              : 
      10              : use anyhow::{Result, anyhow, bail};
      11              : use camino::{Utf8Path, Utf8PathBuf};
      12              : use http_utils::error::ApiError;
      13              : use remote_storage::RemotePath;
      14              : use safekeeper_api::Term;
      15              : use safekeeper_api::membership::Configuration;
      16              : use safekeeper_api::models::{
      17              :     PeerInfo, TimelineMembershipSwitchResponse, TimelineTermBumpResponse,
      18              : };
      19              : use storage_broker::proto::{SafekeeperTimelineInfo, TenantTimelineId as ProtoTenantTimelineId};
      20              : use tokio::fs::{self};
      21              : use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard, watch};
      22              : use tokio::time::Instant;
      23              : use tokio_util::sync::CancellationToken;
      24              : use tracing::*;
      25              : use utils::id::{NodeId, TenantId, TenantTimelineId};
      26              : use utils::lsn::Lsn;
      27              : use utils::sync::gate::Gate;
      28              : 
      29              : use crate::metrics::{FullTimelineInfo, MISC_OPERATION_SECONDS, WalStorageMetrics};
      30              : use crate::rate_limit::RateLimiter;
      31              : use crate::receive_wal::WalReceivers;
      32              : use crate::safekeeper::{AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, TermLsn};
      33              : use crate::send_wal::{WalSenders, WalSendersTimelineMetricValues};
      34              : use crate::state::{EvictionState, TimelineMemState, TimelinePersistentState, TimelineState};
      35              : use crate::timeline_guard::ResidenceGuard;
      36              : use crate::timeline_manager::{AtomicStatus, ManagerCtl};
      37              : use crate::timelines_set::TimelinesSet;
      38              : use crate::wal_backup::{self, remote_timeline_path};
      39              : use crate::wal_backup_partial::PartialRemoteSegment;
      40              : use crate::wal_storage::{Storage as wal_storage_iface, WalReader};
      41              : use crate::{SafeKeeperConf, control_file, debug_dump, timeline_manager, wal_storage};
      42              : 
      43            0 : fn peer_info_from_sk_info(sk_info: &SafekeeperTimelineInfo, ts: Instant) -> PeerInfo {
      44            0 :     PeerInfo {
      45            0 :         sk_id: NodeId(sk_info.safekeeper_id),
      46            0 :         term: sk_info.term,
      47            0 :         last_log_term: sk_info.last_log_term,
      48            0 :         flush_lsn: Lsn(sk_info.flush_lsn),
      49            0 :         commit_lsn: Lsn(sk_info.commit_lsn),
      50            0 :         local_start_lsn: Lsn(sk_info.local_start_lsn),
      51            0 :         pg_connstr: sk_info.safekeeper_connstr.clone(),
      52            0 :         http_connstr: sk_info.http_connstr.clone(),
      53            0 :         ts,
      54            0 :     }
      55            0 : }
      56              : 
      57              : // vector-based node id -> peer state map with very limited functionality we
      58              : // need.
      59              : #[derive(Debug, Clone, Default)]
      60              : pub struct PeersInfo(pub Vec<PeerInfo>);
      61              : 
      62              : impl PeersInfo {
      63            0 :     fn get(&mut self, id: NodeId) -> Option<&mut PeerInfo> {
      64            0 :         self.0.iter_mut().find(|p| p.sk_id == id)
      65            0 :     }
      66              : 
      67            0 :     fn upsert(&mut self, p: &PeerInfo) {
      68            0 :         match self.get(p.sk_id) {
      69            0 :             Some(rp) => *rp = p.clone(),
      70            0 :             None => self.0.push(p.clone()),
      71              :         }
      72            0 :     }
      73              : }
      74              : 
      75              : pub type ReadGuardSharedState<'a> = RwLockReadGuard<'a, SharedState>;
      76              : 
      77              : /// WriteGuardSharedState is a wrapper around `RwLockWriteGuard<SharedState>` that
      78              : /// automatically updates `watch::Sender` channels with state on drop.
      79              : pub struct WriteGuardSharedState<'a> {
      80              :     tli: Arc<Timeline>,
      81              :     guard: RwLockWriteGuard<'a, SharedState>,
      82              : }
      83              : 
      84              : impl<'a> WriteGuardSharedState<'a> {
      85         1238 :     fn new(tli: Arc<Timeline>, guard: RwLockWriteGuard<'a, SharedState>) -> Self {
      86         1238 :         WriteGuardSharedState { tli, guard }
      87         1238 :     }
      88              : }
      89              : 
      90              : impl Deref for WriteGuardSharedState<'_> {
      91              :     type Target = SharedState;
      92              : 
      93            0 :     fn deref(&self) -> &Self::Target {
      94            0 :         &self.guard
      95            0 :     }
      96              : }
      97              : 
      98              : impl DerefMut for WriteGuardSharedState<'_> {
      99         1234 :     fn deref_mut(&mut self) -> &mut Self::Target {
     100         1234 :         &mut self.guard
     101         1234 :     }
     102              : }
     103              : 
     104              : impl Drop for WriteGuardSharedState<'_> {
     105         1238 :     fn drop(&mut self) {
     106         1238 :         let term_flush_lsn =
     107         1238 :             TermLsn::from((self.guard.sk.last_log_term(), self.guard.sk.flush_lsn()));
     108         1238 :         let commit_lsn = self.guard.sk.state().inmem.commit_lsn;
     109         1238 : 
     110         1238 :         let _ = self.tli.term_flush_lsn_watch_tx.send_if_modified(|old| {
     111         1238 :             if *old != term_flush_lsn {
     112          610 :                 *old = term_flush_lsn;
     113          610 :                 true
     114              :             } else {
     115          628 :                 false
     116              :             }
     117         1238 :         });
     118         1238 : 
     119         1238 :         let _ = self.tli.commit_lsn_watch_tx.send_if_modified(|old| {
     120         1238 :             if *old != commit_lsn {
     121          606 :                 *old = commit_lsn;
     122          606 :                 true
     123              :             } else {
     124          632 :                 false
     125              :             }
     126         1238 :         });
     127         1238 : 
     128         1238 :         // send notification about shared state update
     129         1238 :         self.tli.shared_state_version_tx.send_modify(|old| {
     130         1238 :             *old += 1;
     131         1238 :         });
     132         1238 :     }
     133              : }
     134              : 
     135              : /// This structure is stored in shared state and represents the state of the timeline.
     136              : ///
     137              : /// Usually it holds SafeKeeper, but it also supports offloaded timeline state. In this
     138              : /// case, SafeKeeper is not available (because WAL is not present on disk) and all
     139              : /// operations can be done only with control file.
     140              : pub enum StateSK {
     141              :     Loaded(SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>),
     142              :     Offloaded(Box<TimelineState<control_file::FileStorage>>),
     143              :     // Not used, required for moving between states.
     144              :     Empty,
     145              : }
     146              : 
     147              : impl StateSK {
     148         2570 :     pub fn flush_lsn(&self) -> Lsn {
     149         2570 :         match self {
     150         2570 :             StateSK::Loaded(sk) => sk.wal_store.flush_lsn(),
     151            0 :             StateSK::Offloaded(state) => match state.eviction_state {
     152            0 :                 EvictionState::Offloaded(flush_lsn) => flush_lsn,
     153            0 :                 _ => panic!("StateSK::Offloaded mismatches with eviction_state from control_file"),
     154              :             },
     155            0 :             StateSK::Empty => unreachable!(),
     156              :         }
     157         2570 :     }
     158              : 
     159              :     /// Get a reference to the control file's timeline state.
     160         2598 :     pub fn state(&self) -> &TimelineState<control_file::FileStorage> {
     161         2598 :         match self {
     162         2598 :             StateSK::Loaded(sk) => &sk.state,
     163            0 :             StateSK::Offloaded(s) => s,
     164            0 :             StateSK::Empty => unreachable!(),
     165              :         }
     166         2598 :     }
     167              : 
     168           14 :     pub fn state_mut(&mut self) -> &mut TimelineState<control_file::FileStorage> {
     169           14 :         match self {
     170           14 :             StateSK::Loaded(sk) => &mut sk.state,
     171            0 :             StateSK::Offloaded(s) => s,
     172            0 :             StateSK::Empty => unreachable!(),
     173              :         }
     174           14 :     }
     175              : 
     176         1285 :     pub fn last_log_term(&self) -> Term {
     177         1285 :         self.state()
     178         1285 :             .acceptor_state
     179         1285 :             .get_last_log_term(self.flush_lsn())
     180         1285 :     }
     181              : 
     182            0 :     pub async fn term_bump(&mut self, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
     183            0 :         self.state_mut().term_bump(to).await
     184            0 :     }
     185              : 
     186            0 :     pub async fn membership_switch(
     187            0 :         &mut self,
     188            0 :         to: Configuration,
     189            0 :     ) -> Result<TimelineMembershipSwitchResponse> {
     190            0 :         self.state_mut().membership_switch(to).await
     191            0 :     }
     192              : 
     193              :     /// Close open WAL files to release FDs.
     194            0 :     fn close_wal_store(&mut self) {
     195            0 :         if let StateSK::Loaded(sk) = self {
     196            0 :             sk.wal_store.close();
     197            0 :         }
     198            0 :     }
     199              : 
     200              :     /// Update timeline state with peer safekeeper data.
     201            0 :     pub async fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> {
     202            0 :         // update commit_lsn if safekeeper is loaded
     203            0 :         match self {
     204            0 :             StateSK::Loaded(sk) => sk.record_safekeeper_info(sk_info).await?,
     205            0 :             StateSK::Offloaded(_) => {}
     206            0 :             StateSK::Empty => unreachable!(),
     207              :         }
     208              : 
     209              :         // update everything else, including remote_consistent_lsn and backup_lsn
     210            0 :         let mut sync_control_file = false;
     211            0 :         let state = self.state_mut();
     212            0 :         let wal_seg_size = state.server.wal_seg_size as u64;
     213            0 : 
     214            0 :         state.inmem.backup_lsn = max(Lsn(sk_info.backup_lsn), state.inmem.backup_lsn);
     215            0 :         sync_control_file |= state.backup_lsn + wal_seg_size < state.inmem.backup_lsn;
     216            0 : 
     217            0 :         state.inmem.remote_consistent_lsn = max(
     218            0 :             Lsn(sk_info.remote_consistent_lsn),
     219            0 :             state.inmem.remote_consistent_lsn,
     220            0 :         );
     221            0 :         sync_control_file |=
     222            0 :             state.remote_consistent_lsn + wal_seg_size < state.inmem.remote_consistent_lsn;
     223            0 : 
     224            0 :         state.inmem.peer_horizon_lsn =
     225            0 :             max(Lsn(sk_info.peer_horizon_lsn), state.inmem.peer_horizon_lsn);
     226            0 :         sync_control_file |= state.peer_horizon_lsn + wal_seg_size < state.inmem.peer_horizon_lsn;
     227            0 : 
     228            0 :         if sync_control_file {
     229            0 :             state.flush().await?;
     230            0 :         }
     231            0 :         Ok(())
     232            0 :     }
     233              : 
     234              :     /// Previously known as epoch_start_lsn. Needed only for reference in some APIs.
     235            0 :     pub fn term_start_lsn(&self) -> Lsn {
     236            0 :         match self {
     237            0 :             StateSK::Loaded(sk) => sk.term_start_lsn,
     238            0 :             StateSK::Offloaded(_) => Lsn(0),
     239            0 :             StateSK::Empty => unreachable!(),
     240              :         }
     241            0 :     }
     242              : 
     243              :     /// Used for metrics only.
     244            0 :     pub fn wal_storage_metrics(&self) -> WalStorageMetrics {
     245            0 :         match self {
     246            0 :             StateSK::Loaded(sk) => sk.wal_store.get_metrics(),
     247            0 :             StateSK::Offloaded(_) => WalStorageMetrics::default(),
     248            0 :             StateSK::Empty => unreachable!(),
     249              :         }
     250            0 :     }
     251              : 
     252              :     /// Returns WAL storage internal LSNs for debug dump.
     253            0 :     pub fn wal_storage_internal_state(&self) -> (Lsn, Lsn, Lsn, bool) {
     254            0 :         match self {
     255            0 :             StateSK::Loaded(sk) => sk.wal_store.internal_state(),
     256              :             StateSK::Offloaded(_) => {
     257            0 :                 let flush_lsn = self.flush_lsn();
     258            0 :                 (flush_lsn, flush_lsn, flush_lsn, false)
     259              :             }
     260            0 :             StateSK::Empty => unreachable!(),
     261              :         }
     262            0 :     }
     263              : 
     264              :     /// Access to SafeKeeper object. Panics if offloaded, should be good to use from WalResidentTimeline.
     265         1220 :     pub fn safekeeper(
     266         1220 :         &mut self,
     267         1220 :     ) -> &mut SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage> {
     268         1220 :         match self {
     269         1220 :             StateSK::Loaded(sk) => sk,
     270              :             StateSK::Offloaded(_) => {
     271            0 :                 panic!("safekeeper is offloaded, cannot be used")
     272              :             }
     273            0 :             StateSK::Empty => unreachable!(),
     274              :         }
     275         1220 :     }
     276              : 
     277              :     /// Moves control file's state structure out of the enum. Used to switch states.
     278            0 :     fn take_state(self) -> TimelineState<control_file::FileStorage> {
     279            0 :         match self {
     280            0 :             StateSK::Loaded(sk) => sk.state,
     281            0 :             StateSK::Offloaded(state) => *state,
     282            0 :             StateSK::Empty => unreachable!(),
     283              :         }
     284            0 :     }
     285              : }
     286              : 
     287              : /// Shared state associated with database instance
     288              : pub struct SharedState {
     289              :     /// Safekeeper object
     290              :     pub(crate) sk: StateSK,
     291              :     /// In memory list containing state of peers sent in latest messages from them.
     292              :     pub(crate) peers_info: PeersInfo,
     293              :     // True value hinders old WAL removal; this is used by snapshotting. We
     294              :     // could make it a counter, but there is no need to.
     295              :     pub(crate) wal_removal_on_hold: bool,
     296              : }
     297              : 
     298              : impl SharedState {
     299              :     /// Creates a new SharedState.
     300            4 :     pub fn new(sk: StateSK) -> Self {
     301            4 :         Self {
     302            4 :             sk,
     303            4 :             peers_info: PeersInfo(vec![]),
     304            4 :             wal_removal_on_hold: false,
     305            4 :         }
     306            4 :     }
     307              : 
     308              :     /// Restore SharedState from control file. If file doesn't exist, bails out.
     309            0 :     pub fn restore(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Result<Self> {
     310            0 :         let timeline_dir = get_timeline_dir(conf, ttid);
     311            0 :         let control_store = control_file::FileStorage::restore_new(&timeline_dir, conf.no_sync)?;
     312            0 :         if control_store.server.wal_seg_size == 0 {
     313            0 :             bail!(TimelineError::UninitializedWalSegSize(*ttid));
     314            0 :         }
     315              : 
     316            0 :         let sk = match control_store.eviction_state {
     317              :             EvictionState::Present => {
     318            0 :                 let wal_store = wal_storage::PhysicalStorage::new(
     319            0 :                     ttid,
     320            0 :                     &timeline_dir,
     321            0 :                     &control_store,
     322            0 :                     conf.no_sync,
     323            0 :                 )?;
     324            0 :                 StateSK::Loaded(SafeKeeper::new(
     325            0 :                     TimelineState::new(control_store),
     326            0 :                     wal_store,
     327            0 :                     conf.my_id,
     328            0 :                 )?)
     329              :             }
     330              :             EvictionState::Offloaded(_) => {
     331            0 :                 StateSK::Offloaded(Box::new(TimelineState::new(control_store)))
     332              :             }
     333              :         };
     334              : 
     335            0 :         Ok(Self::new(sk))
     336            0 :     }
     337              : 
     338            4 :     pub(crate) fn get_wal_seg_size(&self) -> usize {
     339            4 :         self.sk.state().server.wal_seg_size as usize
     340            4 :     }
     341              : 
     342            0 :     fn get_safekeeper_info(
     343            0 :         &self,
     344            0 :         ttid: &TenantTimelineId,
     345            0 :         conf: &SafeKeeperConf,
     346            0 :         standby_apply_lsn: Lsn,
     347            0 :     ) -> SafekeeperTimelineInfo {
     348            0 :         SafekeeperTimelineInfo {
     349            0 :             safekeeper_id: conf.my_id.0,
     350            0 :             tenant_timeline_id: Some(ProtoTenantTimelineId {
     351            0 :                 tenant_id: ttid.tenant_id.as_ref().to_owned(),
     352            0 :                 timeline_id: ttid.timeline_id.as_ref().to_owned(),
     353            0 :             }),
     354            0 :             term: self.sk.state().acceptor_state.term,
     355            0 :             last_log_term: self.sk.last_log_term(),
     356            0 :             flush_lsn: self.sk.flush_lsn().0,
     357            0 :             // note: this value is not flushed to control file yet and can be lost
     358            0 :             commit_lsn: self.sk.state().inmem.commit_lsn.0,
     359            0 :             remote_consistent_lsn: self.sk.state().inmem.remote_consistent_lsn.0,
     360            0 :             peer_horizon_lsn: self.sk.state().inmem.peer_horizon_lsn.0,
     361            0 :             safekeeper_connstr: conf
     362            0 :                 .advertise_pg_addr
     363            0 :                 .to_owned()
     364            0 :                 .unwrap_or(conf.listen_pg_addr.clone()),
     365            0 :             http_connstr: conf.listen_http_addr.to_owned(),
     366            0 :             backup_lsn: self.sk.state().inmem.backup_lsn.0,
     367            0 :             local_start_lsn: self.sk.state().local_start_lsn.0,
     368            0 :             availability_zone: conf.availability_zone.clone(),
     369            0 :             standby_horizon: standby_apply_lsn.0,
     370            0 :         }
     371            0 :     }
     372              : 
     373              :     /// Get our latest view of alive peers status on the timeline.
     374              :     /// We pass our own info through the broker as well, so when we don't have connection
     375              :     /// to the broker returned vec is empty.
     376           43 :     pub(crate) fn get_peers(&self, heartbeat_timeout: Duration) -> Vec<PeerInfo> {
     377           43 :         let now = Instant::now();
     378           43 :         self.peers_info
     379           43 :             .0
     380           43 :             .iter()
     381           43 :             // Regard peer as absent if we haven't heard from it within heartbeat_timeout.
     382           43 :             .filter(|p| now.duration_since(p.ts) <= heartbeat_timeout)
     383           43 :             .cloned()
     384           43 :             .collect()
     385           43 :     }
     386              : }
     387              : 
     388              : #[derive(Debug, thiserror::Error)]
     389              : pub enum TimelineError {
     390              :     #[error("Timeline {0} was cancelled and cannot be used anymore")]
     391              :     Cancelled(TenantTimelineId),
     392              :     #[error("Timeline {0} was not found in global map")]
     393              :     NotFound(TenantTimelineId),
     394              :     #[error("Timeline {0} creation is in progress")]
     395              :     CreationInProgress(TenantTimelineId),
     396              :     #[error("Timeline {0} exists on disk, but wasn't loaded on startup")]
     397              :     Invalid(TenantTimelineId),
     398              :     #[error("Timeline {0} is already exists")]
     399              :     AlreadyExists(TenantTimelineId),
     400              :     #[error("Timeline {0} is not initialized, wal_seg_size is zero")]
     401              :     UninitializedWalSegSize(TenantTimelineId),
     402              :     #[error("Timeline {0} is not initialized, pg_version is unknown")]
     403              :     UninitialinzedPgVersion(TenantTimelineId),
     404              : }
     405              : 
     406              : // Convert to HTTP API error.
     407              : impl From<TimelineError> for ApiError {
     408            0 :     fn from(te: TimelineError) -> ApiError {
     409            0 :         match te {
     410            0 :             TimelineError::NotFound(ttid) => {
     411            0 :                 ApiError::NotFound(anyhow!("timeline {} not found", ttid).into())
     412              :             }
     413            0 :             _ => ApiError::InternalServerError(anyhow!("{}", te)),
     414              :         }
     415            0 :     }
     416              : }
     417              : 
     418              : /// Timeline struct manages lifecycle (creation, deletion, restore) of a safekeeper timeline.
     419              : /// It also holds SharedState and provides mutually exclusive access to it.
     420              : pub struct Timeline {
     421              :     pub ttid: TenantTimelineId,
     422              :     pub remote_path: RemotePath,
     423              : 
     424              :     /// Used to broadcast commit_lsn updates to all background jobs.
     425              :     commit_lsn_watch_tx: watch::Sender<Lsn>,
     426              :     commit_lsn_watch_rx: watch::Receiver<Lsn>,
     427              : 
     428              :     /// Broadcasts (current term, flush_lsn) updates, walsender is interested in
     429              :     /// them when sending in recovery mode (to walproposer or peers). Note: this
     430              :     /// is just a notification, WAL reading should always done with lock held as
     431              :     /// term can change otherwise.
     432              :     term_flush_lsn_watch_tx: watch::Sender<TermLsn>,
     433              :     term_flush_lsn_watch_rx: watch::Receiver<TermLsn>,
     434              : 
     435              :     /// Broadcasts shared state updates.
     436              :     shared_state_version_tx: watch::Sender<usize>,
     437              :     shared_state_version_rx: watch::Receiver<usize>,
     438              : 
     439              :     /// Safekeeper and other state, that should remain consistent and
     440              :     /// synchronized with the disk. This is tokio mutex as we write WAL to disk
     441              :     /// while holding it, ensuring that consensus checks are in order.
     442              :     mutex: RwLock<SharedState>,
     443              :     walsenders: Arc<WalSenders>,
     444              :     walreceivers: Arc<WalReceivers>,
     445              :     timeline_dir: Utf8PathBuf,
     446              :     manager_ctl: ManagerCtl,
     447              :     conf: Arc<SafeKeeperConf>,
     448              : 
     449              :     /// Hold this gate from code that depends on the Timeline's non-shut-down state.  While holding
     450              :     /// this gate, you must respect [`Timeline::cancel`]
     451              :     pub(crate) gate: Gate,
     452              : 
     453              :     /// Delete/cancel will trigger this, background tasks should drop out as soon as it fires
     454              :     pub(crate) cancel: CancellationToken,
     455              : 
     456              :     // timeline_manager controlled state
     457              :     pub(crate) broker_active: AtomicBool,
     458              :     pub(crate) wal_backup_active: AtomicBool,
     459              :     pub(crate) last_removed_segno: AtomicU64,
     460              :     pub(crate) mgr_status: AtomicStatus,
     461              : }
     462              : 
     463              : impl Timeline {
     464              :     /// Constructs a new timeline.
     465            4 :     pub fn new(
     466            4 :         ttid: TenantTimelineId,
     467            4 :         timeline_dir: &Utf8Path,
     468            4 :         remote_path: &RemotePath,
     469            4 :         shared_state: SharedState,
     470            4 :         conf: Arc<SafeKeeperConf>,
     471            4 :     ) -> Arc<Self> {
     472            4 :         let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
     473            4 :             watch::channel(shared_state.sk.state().commit_lsn);
     474            4 :         let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) = watch::channel(TermLsn::from((
     475            4 :             shared_state.sk.last_log_term(),
     476            4 :             shared_state.sk.flush_lsn(),
     477            4 :         )));
     478            4 :         let (shared_state_version_tx, shared_state_version_rx) = watch::channel(0);
     479            4 : 
     480            4 :         let walreceivers = WalReceivers::new();
     481            4 : 
     482            4 :         Arc::new(Self {
     483            4 :             ttid,
     484            4 :             remote_path: remote_path.to_owned(),
     485            4 :             timeline_dir: timeline_dir.to_owned(),
     486            4 :             commit_lsn_watch_tx,
     487            4 :             commit_lsn_watch_rx,
     488            4 :             term_flush_lsn_watch_tx,
     489            4 :             term_flush_lsn_watch_rx,
     490            4 :             shared_state_version_tx,
     491            4 :             shared_state_version_rx,
     492            4 :             mutex: RwLock::new(shared_state),
     493            4 :             walsenders: WalSenders::new(walreceivers.clone()),
     494            4 :             walreceivers,
     495            4 :             gate: Default::default(),
     496            4 :             cancel: CancellationToken::default(),
     497            4 :             manager_ctl: ManagerCtl::new(),
     498            4 :             conf,
     499            4 :             broker_active: AtomicBool::new(false),
     500            4 :             wal_backup_active: AtomicBool::new(false),
     501            4 :             last_removed_segno: AtomicU64::new(0),
     502            4 :             mgr_status: AtomicStatus::new(),
     503            4 :         })
     504            4 :     }
     505              : 
     506              :     /// Load existing timeline from disk.
     507            0 :     pub fn load_timeline(
     508            0 :         conf: Arc<SafeKeeperConf>,
     509            0 :         ttid: TenantTimelineId,
     510            0 :     ) -> Result<Arc<Timeline>> {
     511            0 :         let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
     512              : 
     513            0 :         let shared_state = SharedState::restore(conf.as_ref(), &ttid)?;
     514            0 :         let timeline_dir = get_timeline_dir(conf.as_ref(), &ttid);
     515            0 :         let remote_path = remote_timeline_path(&ttid)?;
     516              : 
     517            0 :         Ok(Timeline::new(
     518            0 :             ttid,
     519            0 :             &timeline_dir,
     520            0 :             &remote_path,
     521            0 :             shared_state,
     522            0 :             conf,
     523            0 :         ))
     524            0 :     }
     525              : 
     526              :     /// Bootstrap new or existing timeline starting background tasks.
                            ///
                            /// Spawns the timeline manager task (`timeline_manager::main_task`) on the
                            /// tokio runtime. The task owns a gate guard for its whole lifetime. If the
                            /// gate can no longer be entered, nothing is spawned and the function
                            /// returns silently.
                            ///
                            /// `_shared_state` is not read here.
                            /// NOTE(review): presumably it is taken only to force the caller to hold the
                            /// write lock while bootstrapping -- confirm against callers.
     527            4 :     pub fn bootstrap(
     528            4 :         self: &Arc<Timeline>,
     529            4 :         _shared_state: &mut WriteGuardSharedState<'_>,
     530            4 :         conf: &SafeKeeperConf,
     531            4 :         broker_active_set: Arc<TimelinesSet>,
     532            4 :         partial_backup_rate_limiter: RateLimiter,
     533            4 :     ) {
                                // Both values are handed to the manager task below; on the early return
                                // they are simply dropped.
     534            4 :         let (tx, rx) = self.manager_ctl.bootstrap_manager();
     535              : 
     536            4 :         let Ok(gate_guard) = self.gate.enter() else {
     537              :             // Init raced with shutdown
     538            0 :             return;
     539              :         };
     540              : 
     541              :         // Start manager task which will monitor timeline state and update
     542              :         // background tasks.
     543            4 :         tokio::spawn({
                                    // The task needs owned values: clone the Arc and the config.
     544            4 :             let this = self.clone();
     545            4 :             let conf = conf.clone();
     546            4 :             async move {
                                        // Hold the gate guard until the manager task finishes.
     547            4 :                 let _gate_guard = gate_guard;
     548            4 :                 timeline_manager::main_task(
     549            4 :                     ManagerTimeline { tli: this },
     550            4 :                     conf,
     551            4 :                     broker_active_set,
     552            4 :                     tx,
     553            4 :                     rx,
     554            4 :                     partial_backup_rate_limiter,
     555            4 :                 )
     556            4 :                 .await
     557            4 :             }
     558            4 :         });
     559            4 :     }
     560              : 
     561              :     /// Cancel the timeline, requesting background activity to stop. Closing
     562              :     /// the `self.gate` waits for that.
                            ///
                            /// This only logs and triggers the cancellation token; it does not wait for
                            /// anything (the body contains no `.await`). Use [`Self::close`] to wait.
     563            0 :     pub async fn cancel(&self) {
     564            0 :         info!("timeline {} shutting down", self.ttid);
     565            0 :         self.cancel.cancel();
     566            0 :     }
     567              : 
     568              :     /// Background timeline activities (which hold Timeline::gate) will no
     569              :     /// longer run once this function completes. `Self::cancel` must have been
     570              :     /// already called.
                            ///
                            /// # Panics
                            ///
                            /// Panics if the cancellation token has not been cancelled yet.
     571            0 :     pub async fn close(&self) {
     572            0 :         assert!(self.cancel.is_cancelled());
     573              : 
     574              :         // Wait for any concurrent tasks to stop using this timeline, to avoid e.g. attempts
     575              :         // to read deleted files.
     576            0 :         self.gate.close().await;
     577            0 :     }
     578              : 
     579              :     /// Delete timeline from disk completely, by removing timeline directory.
     580              :     ///
     581              :     /// Also deletes WAL in s3. Might fail if e.g. s3 is unavailable, but
     582              :     /// deletion API endpoint is retriable.
     583              :     ///
     584              :     /// Timeline must be in shut-down state (i.e. call [`Self::close`] first)
                            ///
                            /// Remote data is deleted only when `only_local` is false and WAL backup is
                            /// enabled in the config. Remote deletion runs before the local directory is
                            /// removed, so a remote failure returns early and leaves the local directory
                            /// in place.
                            ///
                            /// Returns the value produced by `delete_dir` (bound as `dir_existed`).
                            ///
                            /// # Panics
                            ///
                            /// Panics if the timeline is not cancelled or its gate is not fully closed.
     585            0 :     pub async fn delete(
     586            0 :         &self,
     587            0 :         shared_state: &mut WriteGuardSharedState<'_>,
     588            0 :         only_local: bool,
     589            0 :     ) -> Result<bool> {
     590            0 :         // Assert that [`Self::close`] was already called
     591            0 :         assert!(self.cancel.is_cancelled());
     592            0 :         assert!(self.gate.close_complete());
     593              : 
     594            0 :         info!("deleting timeline {} from disk", self.ttid);
     595              : 
     596              :         // Close associated FDs. Nobody will be able to touch timeline data once
     597              :         // it is cancelled, so WAL storage won't be opened again.
     598            0 :         shared_state.sk.close_wal_store();
     599            0 : 
     600            0 :         if !only_local && self.conf.is_wal_backup_enabled() {
     601              :             // Note: we concurrently delete remote storage data from multiple
     602              :             // safekeepers. That's ok, s3 replies 200 if object doesn't exist and we
     603              :             // do some retries anyway.
     604            0 :             wal_backup::delete_timeline(&self.ttid).await?;
     605            0 :         }
     606            0 :         let dir_existed = delete_dir(&self.timeline_dir).await?;
     607            0 :         Ok(dir_existed)
     608            0 :     }
     609              : 
     610              :     /// Returns true if the timeline's cancellation token has been cancelled.
     611         2460 :     pub fn is_cancelled(&self) -> bool {
     612         2460 :         self.cancel.is_cancelled()
     613         2460 :     }
     614              : 
     615              :     /// Take the exclusive (write) lock on timeline shared_state.
                            ///
                            /// Waits until the write half of `self.mutex` is available. The returned
                            /// guard is built from the lock guard plus a clone of this `Arc<Timeline>`.
     616         1238 :     pub async fn write_shared_state<'a>(self: &'a Arc<Self>) -> WriteGuardSharedState<'a> {
     617         1238 :         WriteGuardSharedState::new(self.clone(), self.mutex.write().await)
     618         1238 :     }
     619              : 
                            /// Take the shared (read) lock on timeline shared_state, waiting until it
                            /// is available.
     620           59 :     pub async fn read_shared_state(&self) -> ReadGuardSharedState {
     621           59 :         self.mutex.read().await
     622           59 :     }
     623              : 
     624              :     /// Returns a clone of the commit_lsn watch channel receiver.
     625            4 :     pub fn get_commit_lsn_watch_rx(&self) -> watch::Receiver<Lsn> {
     626            4 :         self.commit_lsn_watch_rx.clone()
     627            4 :     }
     628              : 
     629              :     /// Returns a clone of the term_flush_lsn watch channel receiver.
     630            0 :     pub fn get_term_flush_lsn_watch_rx(&self) -> watch::Receiver<TermLsn> {
     631            0 :         self.term_flush_lsn_watch_rx.clone()
     632            0 :     }
     633              : 
     634              :     /// Returns a clone of the watch channel receiver for the SharedState update version.
     635            4 :     pub fn get_state_version_rx(&self) -> watch::Receiver<usize> {
     636            4 :         self.shared_state_version_rx.clone()
     637            4 :     }
     638              : 
     639              :     /// Returns wal_seg_size, read from the shared state under the read lock.
     640            4 :     pub async fn get_wal_seg_size(&self) -> usize {
     641            4 :         self.read_shared_state().await.get_wal_seg_size()
     642            4 :     }
     643              : 
     644              :     /// Returns state of the timeline.
                            ///
                            /// Both parts are cloned while the read lock is held: first the in-memory
                            /// state (`inmem`), then the persistent state.
     645            8 :     pub async fn get_state(&self) -> (TimelineMemState, TimelinePersistentState) {
     646            8 :         let state = self.read_shared_state().await;
     647            8 :         (
     648            8 :             state.sk.state().inmem.clone(),
     649            8 :             TimelinePersistentState::clone(state.sk.state()),
     650            8 :         )
     651            8 :     }
     652              : 
     653              :     /// Returns the in-memory backup_lsn, read under the shared-state read lock.
     654            0 :     pub async fn get_wal_backup_lsn(&self) -> Lsn {
     655            0 :         self.read_shared_state().await.sk.state().inmem.backup_lsn
     656            0 :     }
     657              : 
     658              :     /// Advances the in-memory backup_lsn to `backup_lsn` if that is larger than
                            /// the current value; a smaller value leaves it unchanged.
                            ///
                            /// # Errors
                            ///
                            /// Fails with `TimelineError::Cancelled` if the timeline is cancelled.
     659            0 :     pub async fn set_wal_backup_lsn(self: &Arc<Self>, backup_lsn: Lsn) -> Result<()> {
     660            0 :         if self.is_cancelled() {
     661            0 :             bail!(TimelineError::Cancelled(self.ttid));
     662            0 :         }
     663              : 
     664            0 :         let mut state = self.write_shared_state().await;
                                // `max` keeps the value monotonic: it never moves backwards.
     665            0 :         state.sk.state_mut().inmem.backup_lsn = max(state.sk.state().inmem.backup_lsn, backup_lsn);
     666            0 :         // we should check whether to shut down offloader, but this will be done
     667            0 :         // soon by peer communication anyway.
     668            0 :         Ok(())
     669            0 :     }
     670              : 
     671              :     /// Get safekeeper info for broadcasting to broker and other peers.
                            ///
                            /// The hot standby apply LSN is read from the walsenders before the
                            /// shared-state read lock is taken; the rest is built by
                            /// `get_safekeeper_info` on the shared state under that lock.
     672            0 :     pub async fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SafekeeperTimelineInfo {
     673            0 :         let standby_apply_lsn = self.walsenders.get_hotstandby().reply.apply_lsn;
     674            0 :         let shared_state = self.read_shared_state().await;
     675            0 :         shared_state.get_safekeeper_info(&self.ttid, conf, standby_apply_lsn)
     676            0 :     }
     677              : 
     678              :     /// Update timeline state with peer safekeeper data.
                            ///
                            /// Under the write lock, forwards `sk_info` to the safekeeper state and then
                            /// upserts the corresponding peer entry, timestamped with `Instant::now()`.
                            ///
                            /// # Errors
                            ///
                            /// Propagates the error of `sk.record_safekeeper_info`; in that case the
                            /// peer entry is not upserted.
     679            0 :     pub async fn record_safekeeper_info(
     680            0 :         self: &Arc<Self>,
     681            0 :         sk_info: SafekeeperTimelineInfo,
     682            0 :     ) -> Result<()> {
                                // The inner block limits how long the write guard is held.
     683              :         {
     684            0 :             let mut shared_state = self.write_shared_state().await;
     685            0 :             shared_state.sk.record_safekeeper_info(&sk_info).await?;
     686            0 :             let peer_info = peer_info_from_sk_info(&sk_info, Instant::now());
     687            0 :             shared_state.peers_info.upsert(&peer_info);
     688            0 :         }
     689            0 :         Ok(())
     690            0 :     }
     691              : 
                            /// Returns the peers reported by the shared state's `get_peers`, called
                            /// with `conf.heartbeat_timeout` under the read lock.
     692            0 :     pub async fn get_peers(&self, conf: &SafeKeeperConf) -> Vec<PeerInfo> {
     693            0 :         let shared_state = self.read_shared_state().await;
     694            0 :         shared_state.get_peers(conf.heartbeat_timeout)
     695            0 :     }
     696              : 
                            /// Returns a reference to this timeline's `WalSenders`.
     697            4 :     pub fn get_walsenders(&self) -> &Arc<WalSenders> {
     698            4 :         &self.walsenders
     699            4 :     }
     700              : 
                            /// Returns a reference to this timeline's `WalReceivers`.
     701           14 :     pub fn get_walreceivers(&self) -> &Arc<WalReceivers> {
     702           14 :         &self.walreceivers
     703           14 :     }
     704              : 
     705              :     /// Returns flush_lsn as reported by the safekeeper state, read under the read lock.
     706            0 :     pub async fn get_flush_lsn(&self) -> Lsn {
     707            0 :         self.read_shared_state().await.sk.flush_lsn()
     708            0 :     }
     709              : 
     710              :     /// Gather timeline data for metrics.
                            ///
                            /// Returns `None` if the timeline is cancelled. Otherwise combines walsender
                            /// metrics, the atomic status flags (loaded with `Ordering::Relaxed`) and a
                            /// snapshot of the safekeeper state taken under the read lock.
     711            0 :     pub async fn info_for_metrics(&self) -> Option<FullTimelineInfo> {
     712            0 :         if self.is_cancelled() {
     713            0 :             return None;
     714            0 :         }
     715            0 : 
                                // Walsender metrics are collected before the shared-state lock is taken.
     716            0 :         let WalSendersTimelineMetricValues {
     717            0 :             ps_feedback_counter,
     718            0 :             last_ps_feedback,
     719            0 :             interpreted_wal_reader_tasks,
     720            0 :         } = self.walsenders.info_for_metrics();
     721              : 
     722            0 :         let state = self.read_shared_state().await;
     723            0 :         Some(FullTimelineInfo {
     724            0 :             ttid: self.ttid,
     725            0 :             ps_feedback_count: ps_feedback_counter,
     726            0 :             last_ps_feedback,
     727            0 :             wal_backup_active: self.wal_backup_active.load(Ordering::Relaxed),
     728            0 :             timeline_is_active: self.broker_active.load(Ordering::Relaxed),
                                    // NOTE(review): `as u32` truncates silently if the count ever exceeds
                                    // u32::MAX -- presumably impossible in practice, confirm.
     729            0 :             num_computes: self.walreceivers.get_num() as u32,
     730            0 :             last_removed_segno: self.last_removed_segno.load(Ordering::Relaxed),
     731            0 :             interpreted_wal_reader_tasks,
     732            0 :             epoch_start_lsn: state.sk.term_start_lsn(),
     733            0 :             mem_state: state.sk.state().inmem.clone(),
     734            0 :             persisted_state: TimelinePersistentState::clone(state.sk.state()),
     735            0 :             flush_lsn: state.sk.flush_lsn(),
     736            0 :             wal_storage: state.sk.wal_storage_metrics(),
     737            0 :         })
     738            0 :     }
     739              : 
     740              :     /// Returns in-memory timeline state to build a full debug dump.
                            ///
                            /// Unlike [`Self::info_for_metrics`], this does not bail out on a cancelled
                            /// timeline; the cancellation flag is reported in the dump instead. The read
                            /// lock is held while the dump is assembled.
     741            0 :     pub async fn memory_dump(&self) -> debug_dump::Memory {
     742            0 :         let state = self.read_shared_state().await;
     743              : 
     744            0 :         let (write_lsn, write_record_lsn, flush_lsn, file_open) =
     745            0 :             state.sk.wal_storage_internal_state();
     746            0 : 
     747            0 :         debug_dump::Memory {
     748            0 :             is_cancelled: self.is_cancelled(),
     749            0 :             peers_info_len: state.peers_info.0.len(),
     750            0 :             walsenders: self.walsenders.get_all_public(),
     751            0 :             wal_backup_active: self.wal_backup_active.load(Ordering::Relaxed),
     752            0 :             active: self.broker_active.load(Ordering::Relaxed),
     753            0 :             num_computes: self.walreceivers.get_num() as u32,
     754            0 :             last_removed_segno: self.last_removed_segno.load(Ordering::Relaxed),
     755            0 :             epoch_start_lsn: state.sk.term_start_lsn(),
     756            0 :             mem_state: state.sk.state().inmem.clone(),
     757            0 :             mgr_status: self.mgr_status.get(),
     758            0 :             write_lsn,
     759            0 :             write_record_lsn,
     760            0 :             flush_lsn,
     761            0 :             file_open,
     762            0 :         }
     763            0 :     }
     764              : 
     765              :     /// Apply a function to the control file state and persist it.
                            ///
                            /// `f` receives a mutable working copy obtained from `start_change`. If `f`
                            /// succeeds, the copy is handed to `finish_change` and the value returned by
                            /// `f` is passed back to the caller. The write lock is held for the whole
                            /// call, including the execution of `f`.
                            ///
                            /// # Errors
                            ///
                            /// Returns the error of `f` (nothing is persisted then) or the error of
                            /// `finish_change`.
     766            0 :     pub async fn map_control_file<T>(
     767            0 :         self: &Arc<Self>,
     768            0 :         f: impl FnOnce(&mut TimelinePersistentState) -> Result<T>,
     769            0 :     ) -> Result<T> {
     770            0 :         let mut state = self.write_shared_state().await;
     771            0 :         let mut persistent_state = state.sk.state_mut().start_change();
     772              :         // If f returns error, we abort the change and don't persist anything.
     773            0 :         let res = f(&mut persistent_state)?;
     774              :         // If persisting fails, we abort the change and return error.
     775            0 :         state
     776            0 :             .sk
     777            0 :             .state_mut()
     778            0 :             .finish_change(&persistent_state)
     779            0 :             .await?;
     780            0 :         Ok(res)
     781            0 :     }
     782              : 
                            /// Takes the write lock and delegates to the safekeeper state's
                            /// `term_bump(to)`, returning its result unchanged.
     783            0 :     pub async fn term_bump(self: &Arc<Self>, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
     784            0 :         let mut state = self.write_shared_state().await;
     785            0 :         state.sk.term_bump(to).await
     786            0 :     }
     787              : 
                            /// Takes the write lock and delegates to the safekeeper state's
                            /// `membership_switch(to)`, returning its result unchanged.
     788            0 :     pub async fn membership_switch(
     789            0 :         self: &Arc<Self>,
     790            0 :         to: Configuration,
     791            0 :     ) -> Result<TimelineMembershipSwitchResponse> {
     792            0 :         let mut state = self.write_shared_state().await;
     793            0 :         state.sk.membership_switch(to).await
     794            0 :     }
     795              : 
     796              :     /// Guts of [`Self::wal_residence_guard`] and [`Self::try_wal_residence_guard`]
                            ///
                            /// `block` selects which manager request is issued: `true` calls
                            /// `manager_ctl.wal_residence_guard()` and always yields `Some` on success;
                            /// `false` calls `manager_ctl.try_wal_residence_guard()`, whose `Option` is
                            /// passed through. The request is bounded by a 30 second deadline.
                            ///
                            /// # Errors
                            ///
                            /// Fails if the timeline is cancelled, if the manager request fails, or if
                            /// the deadline expires first.
     797            8 :     async fn do_wal_residence_guard(
     798            8 :         self: &Arc<Self>,
     799            8 :         block: bool,
     800            8 :     ) -> Result<Option<WalResidentTimeline>> {
                                // Used as the metric label and in the log messages below.
     801            8 :         let op_label = if block {
     802            8 :             "wal_residence_guard"
     803              :         } else {
     804            0 :             "try_wal_residence_guard"
     805              :         };
     806              : 
     807            8 :         if self.is_cancelled() {
     808            0 :             bail!(TimelineError::Cancelled(self.ttid));
     809            8 :         }
     810            8 : 
     811            8 :         debug!("requesting WalResidentTimeline guard");
     812            8 :         let started_at = Instant::now();
                                // Remember the manager status now so failures can log before => after.
     813            8 :         let status_before = self.mgr_status.get();
     814              : 
     815              :         // Wait 30 seconds for the guard to be acquired. It can time out if someone is
     816              :         // holding the lock (e.g. during `SafeKeeper::process_msg()`) or manager task
     817              :         // is stuck.
     818            8 :         let res = tokio::time::timeout_at(started_at + Duration::from_secs(30), async {
     819            8 :             if block {
     820            8 :                 self.manager_ctl.wal_residence_guard().await.map(Some)
     821              :             } else {
     822            0 :                 self.manager_ctl.try_wal_residence_guard().await
     823              :             }
     824            8 :         })
     825            8 :         .await;
     826              : 
                                // Outer `Result` is the timeout, inner `Result` is the manager's reply.
     827            8 :         let guard = match res {
     828            8 :             Ok(Ok(guard)) => {
                                        // Only successful acquisitions are recorded in the histogram.
     829            8 :                 let finished_at = Instant::now();
     830            8 :                 let elapsed = finished_at - started_at;
     831            8 :                 MISC_OPERATION_SECONDS
     832            8 :                     .with_label_values(&[op_label])
     833            8 :                     .observe(elapsed.as_secs_f64());
     834            8 : 
     835            8 :                 guard
     836              :             }
     837            0 :             Ok(Err(e)) => {
                                        // The error itself is not logged here; it is returned to the caller.
     838            0 :                 warn!(
     839            0 :                     "error acquiring in {op_label}, statuses {:?} => {:?}",
     840            0 :                     status_before,
     841            0 :                     self.mgr_status.get()
     842              :                 );
     843            0 :                 return Err(e);
     844              :             }
     845              :             Err(_) => {
     846            0 :                 warn!(
     847            0 :                     "timeout acquiring in {op_label} guard, statuses {:?} => {:?}",
     848            0 :                     status_before,
     849            0 :                     self.mgr_status.get()
     850              :                 );
     851            0 :                 anyhow::bail!("timeout while acquiring WalResidentTimeline guard");
     852              :             }
     853              :         };
     854              : 
                                // Wrap the raw guard (if any) together with a clone of this timeline.
     855            8 :         Ok(guard.map(|g| WalResidentTimeline::new(self.clone(), g)))
     856            8 :     }
     857              : 
     858              :     /// Get the timeline guard for reading/writing WAL files.
     859              :     /// If WAL files are not present on disk (evicted), they will be automatically
     860              :     /// downloaded from remote storage. This is done in the manager task, which is
     861              :     /// responsible for issuing all guards.
     862              :     ///
     863              :     /// NB: don't use this function from timeline_manager, it will deadlock.
     864              :     /// NB: don't use this function while holding shared_state lock.
                            ///
                            /// Thin wrapper over `do_wal_residence_guard(true)`; see that function for
                            /// the error cases.
     865            8 :     pub async fn wal_residence_guard(self: &Arc<Self>) -> Result<WalResidentTimeline> {
     866            8 :         self.do_wal_residence_guard(true)
     867            8 :             .await
                                    // With block=true the successful result is always wrapped in `Some`.
     868            8 :             .map(|m| m.expect("Always get Some in block=true mode"))
     869            8 :     }
     870              : 
     871              :     /// Get the timeline guard for reading/writing WAL files if the timeline is resident,
     872              :     /// else return None.
                            ///
                            /// Thin wrapper over `do_wal_residence_guard(false)`; see that function for
                            /// the error cases.
     873            0 :     pub(crate) async fn try_wal_residence_guard(
     874            0 :         self: &Arc<Self>,
     875            0 :     ) -> Result<Option<WalResidentTimeline>> {
     876            0 :         self.do_wal_residence_guard(false).await
     877            0 :     }
     878              : 
                            /// Forwards to `manager_ctl.backup_partial_reset()` and returns its result.
                            /// NOTE(review): the meaning of the returned strings is not visible from
                            /// here -- confirm against the `ManagerCtl` implementation.
     879            0 :     pub async fn backup_partial_reset(self: &Arc<Self>) -> Result<Vec<String>> {
     880            0 :         self.manager_ctl.backup_partial_reset().await
     881            0 :     }
     882              : }
     883              : 
     884              : /// A guard that allows reading/writing the on-disk timeline state.
     885              : /// All tasks that are trying to read/write WAL from disk should use this guard.
     886              : pub struct WalResidentTimeline {
                            // The timeline this guard belongs to; also the `Deref` target.
     887              :     pub tli: Arc<Timeline>,
                            // Never read; stored so it lives exactly as long as this struct.
     888              :     _guard: ResidenceGuard,
     889              : }
     890              : 
     891              : impl WalResidentTimeline {
                            /// Bundles a timeline with the residence guard that was issued for it.
     892           12 :     pub fn new(tli: Arc<Timeline>, _guard: ResidenceGuard) -> Self {
     893           12 :         WalResidentTimeline { tli, _guard }
     894           12 :     }
     895              : }
     896              : 
                        // Lets `Timeline` methods (including those taking `self: &Arc<Self>`) be
                        // called directly on a `WalResidentTimeline`.
     897              : impl Deref for WalResidentTimeline {
     898              :     type Target = Arc<Timeline>;
     899              : 
     900         5566 :     fn deref(&self) -> &Self::Target {
     901         5566 :         &self.tli
     902         5566 :     }
     903              : }
     904              : 
     905              : impl WalResidentTimeline {
     906              :     /// Returns true if walsender should stop sending WAL to pageserver. We
     907              :     /// terminate it if remote_consistent_lsn reached commit_lsn and there are no
     908              :     /// computes. While there might be nothing to stream already, we learn about
     909              :     /// remote_consistent_lsn update through replication feedback, and we want
     910              :     /// to stop pushing to the broker if pageserver is fully caught up.
                            ///
                            /// Also returns true when the timeline is cancelled.
     911            0 :     pub async fn should_walsender_stop(&self, reported_remote_consistent_lsn: Lsn) -> bool {
     912            0 :         if self.is_cancelled() {
     913            0 :             return true;
     914            0 :         }
     915            0 :         let shared_state = self.read_shared_state().await;
     916            0 :         if self.walreceivers.get_num() == 0 {
     917            0 :             return shared_state.sk.state().inmem.commit_lsn == Lsn(0) || // no data at all yet
     918            0 :             reported_remote_consistent_lsn >= shared_state.sk.state().inmem.commit_lsn;
     919            0 :         }
     920            0 :         false
     921            0 :     }
     922              : 
     923              :     /// Ensure that current term is t, erroring otherwise, and lock the state.
                            ///
                            /// On success the returned read guard is the one the term was checked
                            /// under.
     924            0 :     pub async fn acquire_term(&self, t: Term) -> Result<ReadGuardSharedState> {
     925            0 :         let ss = self.read_shared_state().await;
     926            0 :         if ss.sk.state().acceptor_state.term != t {
     927            0 :             bail!(
     928            0 :                 "failed to acquire term {}, current term {}",
     929            0 :                 t,
     930            0 :                 ss.sk.state().acceptor_state.term
     931            0 :             );
     932            0 :         }
     933            0 :         Ok(ss)
     934            0 :     }
     935              : 
     936              :     /// Pass arrived message to the safekeeper.
                            ///
                            /// The message is processed under the write lock. If the reply is an
                            /// `AppendResponse`, its `hs_feedback` is overwritten with the current hot
                            /// standby feedback from the walsenders.
                            ///
                            /// # Errors
                            ///
                            /// Fails with `TimelineError::Cancelled` if the timeline is cancelled, or
                            /// with the error of the safekeeper's `process_msg`.
     937         1220 :     pub async fn process_msg(
     938         1220 :         &self,
     939         1220 :         msg: &ProposerAcceptorMessage,
     940         1220 :     ) -> Result<Option<AcceptorProposerMessage>> {
     941         1220 :         if self.is_cancelled() {
     942            0 :             bail!(TimelineError::Cancelled(self.ttid));
     943         1220 :         }
     944              : 
     945              :         let mut rmsg: Option<AcceptorProposerMessage>;
                                // The block scopes the write guard; it is dropped before returning.
     946              :         {
     947         1220 :             let mut shared_state = self.write_shared_state().await;
     948         1220 :             rmsg = shared_state.sk.safekeeper().process_msg(msg).await?;
     949              : 
     950              :             // if this is AppendResponse, fill in proper hot standby feedback.
     951          610 :             if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg {
     952          610 :                 resp.hs_feedback = self.walsenders.get_hotstandby().hs_feedback;
     953          610 :             }
     954              :         }
     955         1220 :         Ok(rmsg)
     956         1220 :     }
     957              : 
                            /// Creates a `WalReader` starting at `start_lsn`, built from a snapshot of
                            /// the persistent state. Remote reads are enabled iff WAL backup is enabled
                            /// in the config.
     958            8 :     pub async fn get_walreader(&self, start_lsn: Lsn) -> Result<WalReader> {
                                // Only the persistent half of the state snapshot is needed here.
     959            8 :         let (_, persisted_state) = self.get_state().await;
     960            8 :         let enable_remote_read = self.conf.is_wal_backup_enabled();
     961            8 : 
     962            8 :         WalReader::new(
     963            8 :             &self.ttid,
     964            8 :             self.timeline_dir.clone(),
     965            8 :             &persisted_state,
     966            8 :             start_lsn,
     967            8 :             enable_remote_read,
     968            8 :         )
     969            8 :     }
     970              : 
                            /// Returns an owned copy of the timeline directory path.
     971            0 :     pub fn get_timeline_dir(&self) -> Utf8PathBuf {
     972            0 :         self.timeline_dir.clone()
     973            0 :     }
     974              : 
     975              :     /// Update in memory remote consistent lsn.
                            ///
                            /// The stored value only moves forward: it becomes the max of the current
                            /// value and `candidate`.
     976            0 :     pub async fn update_remote_consistent_lsn(&self, candidate: Lsn) {
     977            0 :         let mut shared_state = self.write_shared_state().await;
     978            0 :         shared_state.sk.state_mut().inmem.remote_consistent_lsn = max(
     979            0 :             shared_state.sk.state().inmem.remote_consistent_lsn,
     980            0 :             candidate,
     981            0 :         );
     982            0 :     }
     983              : }
     984              : 
     985              : /// This struct contains methods that are used by the timeline manager task.
                        /// It wraps the timeline and derefs to it.
     986              : pub(crate) struct ManagerTimeline {
     987              :     pub(crate) tli: Arc<Timeline>,
     988              : }
     989              : 
                        // Lets `Timeline` methods (including those taking `self: &Arc<Self>`) be
                        // called directly on a `ManagerTimeline`.
     990              : impl Deref for ManagerTimeline {
     991              :     type Target = Arc<Timeline>;
     992              : 
     993          445 :     fn deref(&self) -> &Self::Target {
     994          445 :         &self.tli
     995          445 :     }
     996              : }
     997              : 
     998              : impl ManagerTimeline {
     999            0 :     pub(crate) fn timeline_dir(&self) -> &Utf8PathBuf {
                                // Borrow the wrapped timeline's directory path; nothing is cloned.
    1000            0 :         &self.tli.timeline_dir
    1001            0 :     }
    1002              : 
    1003              :     /// Manager requests this state on startup.
                            ///
                            /// Returns, read under the shared-state read lock:
                            /// - whether the persisted eviction state is `EvictionState::Offloaded`;
                            /// - the result of `partial_backup.uploaded_segment()`.
    1004            4 :     pub(crate) async fn bootstrap_mgr(&self) -> (bool, Option<PartialRemoteSegment>) {
    1005            4 :         let shared_state = self.read_shared_state().await;
    1006            4 :         let is_offloaded = matches!(
    1007            4 :             shared_state.sk.state().eviction_state,
    1008              :             EvictionState::Offloaded(_)
    1009              :         );
    1010            4 :         let partial_backup_uploaded = shared_state.sk.state().partial_backup.uploaded_segment();
    1011            4 : 
    1012            4 :         (is_offloaded, partial_backup_uploaded)
    1013            4 :     }
    1014              : 
    1015              :     /// Try to switch state Present->Offloaded.
                            ///
                            /// Under the write lock, verifies that the current eviction state is
                            /// `Present` and that `partial` matches the safekeeper's flush_lsn, the
                            /// control file's commit_lsn and the last log term. Only then is the
                            /// control file updated to `Offloaded(flush_lsn)` and the in-memory
                            /// `shared.sk` replaced with `StateSK::Offloaded`.
                            ///
                            /// # Errors
                            ///
                            /// Fails on any of the mismatches above (before `finish_change` is called)
                            /// or if `finish_change` itself fails.
    1016            0 :     pub(crate) async fn switch_to_offloaded(
    1017            0 :         &self,
    1018            0 :         partial: &PartialRemoteSegment,
    1019            0 :     ) -> anyhow::Result<()> {
    1020            0 :         let mut shared = self.write_shared_state().await;
    1021              : 
    1022              :         // updating control file
    1023            0 :         let mut pstate = shared.sk.state_mut().start_change();
    1024              : 
    1025            0 :         if !matches!(pstate.eviction_state, EvictionState::Present) {
    1026            0 :             bail!(
    1027            0 :                 "cannot switch to offloaded state, current state is {:?}",
    1028            0 :                 pstate.eviction_state
    1029            0 :             );
    1030            0 :         }
    1031            0 : 
    1032            0 :         if partial.flush_lsn != shared.sk.flush_lsn() {
    1033            0 :             bail!(
    1034            0 :                 "flush_lsn mismatch in partial backup, expected {}, got {}",
    1035            0 :                 shared.sk.flush_lsn(),
    1036            0 :                 partial.flush_lsn
    1037            0 :             );
    1038            0 :         }
    1039            0 : 
    1040            0 :         if partial.commit_lsn != pstate.commit_lsn {
    1041            0 :             bail!(
    1042            0 :                 "commit_lsn mismatch in partial backup, expected {}, got {}",
    1043            0 :                 pstate.commit_lsn,
    1044            0 :                 partial.commit_lsn
    1045            0 :             );
    1046            0 :         }
    1047            0 : 
    1048            0 :         if partial.term != shared.sk.last_log_term() {
    1049            0 :             bail!(
    1050            0 :                 "term mismatch in partial backup, expected {}, got {}",
    1051            0 :                 shared.sk.last_log_term(),
    1052            0 :                 partial.term
    1053            0 :             );
    1054            0 :         }
    1055            0 : 
    1056            0 :         pstate.eviction_state = EvictionState::Offloaded(shared.sk.flush_lsn());
    1057            0 :         shared.sk.state_mut().finish_change(&pstate).await?;
    1058              :         // control file is now switched to Offloaded state
    1059              : 
    1060              :         // now we can switch shared.sk to Offloaded, shouldn't fail
                                // `StateSK::Empty` is a temporary placeholder so the old value can be
                                // moved out of `shared.sk` and consumed by `take_state`.
    1061            0 :         let prev_sk = std::mem::replace(&mut shared.sk, StateSK::Empty);
    1062            0 :         let cfile_state = prev_sk.take_state();
    1063            0 :         shared.sk = StateSK::Offloaded(Box::new(cfile_state));
    1064            0 : 
    1065            0 :         Ok(())
    1066            0 :     }
    1067              : 
    1068              :     /// Try to switch state Offloaded->Present.
    1069            0 :     pub(crate) async fn switch_to_present(&self) -> anyhow::Result<()> {
    1070            0 :         let mut shared = self.write_shared_state().await;
    1071              : 
    1072              :         // trying to restore WAL storage
    1073            0 :         let wal_store = wal_storage::PhysicalStorage::new(
    1074            0 :             &self.ttid,
    1075            0 :             &self.timeline_dir,
    1076            0 :             shared.sk.state(),
    1077            0 :             self.conf.no_sync,
    1078            0 :         )?;
    1079              : 
    1080              :         // updating control file
    1081            0 :         let mut pstate = shared.sk.state_mut().start_change();
    1082              : 
    1083            0 :         if !matches!(pstate.eviction_state, EvictionState::Offloaded(_)) {
    1084            0 :             bail!(
    1085            0 :                 "cannot switch to present state, current state is {:?}",
    1086            0 :                 pstate.eviction_state
    1087            0 :             );
    1088            0 :         }
    1089            0 : 
    1090            0 :         if wal_store.flush_lsn() != shared.sk.flush_lsn() {
    1091            0 :             bail!(
    1092            0 :                 "flush_lsn mismatch in restored WAL, expected {}, got {}",
    1093            0 :                 shared.sk.flush_lsn(),
    1094            0 :                 wal_store.flush_lsn()
    1095            0 :             );
    1096            0 :         }
    1097            0 : 
    1098            0 :         pstate.eviction_state = EvictionState::Present;
    1099            0 :         shared.sk.state_mut().finish_change(&pstate).await?;
    1100              : 
    1101              :         // now we can switch shared.sk to Present, shouldn't fail
    1102            0 :         let prev_sk = std::mem::replace(&mut shared.sk, StateSK::Empty);
    1103            0 :         let cfile_state = prev_sk.take_state();
    1104            0 :         shared.sk = StateSK::Loaded(SafeKeeper::new(cfile_state, wal_store, self.conf.my_id)?);
    1105              : 
    1106            0 :         Ok(())
    1107            0 :     }
    1108              : 
    1109              :     /// Update current manager state, useful for debugging manager deadlocks.
    1110          239 :     pub(crate) fn set_status(&self, status: timeline_manager::Status) {
    1111          239 :         self.mgr_status.store(status, Ordering::Relaxed);
    1112          239 :     }
    1113              : }
    1114              : 
    1115              : /// Deletes directory and it's contents. Returns false if directory does not exist.
    1116            0 : pub async fn delete_dir(path: &Utf8PathBuf) -> Result<bool> {
    1117            0 :     match fs::remove_dir_all(path).await {
    1118            0 :         Ok(_) => Ok(true),
    1119            0 :         Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
    1120            0 :         Err(e) => Err(e.into()),
    1121              :     }
    1122            0 : }
    1123              : 
    1124              : /// Get a path to the tenant directory. If you just need to get a timeline directory,
    1125              : /// use WalResidentTimeline::get_timeline_dir instead.
    1126            8 : pub fn get_tenant_dir(conf: &SafeKeeperConf, tenant_id: &TenantId) -> Utf8PathBuf {
    1127            8 :     conf.workdir.join(tenant_id.to_string())
    1128            8 : }
    1129              : 
    1130              : /// Get a path to the timeline directory. If you need to read WAL files from disk,
    1131              : /// use WalResidentTimeline::get_timeline_dir instead. This function does not check
    1132              : /// timeline eviction status and WAL files might not be present on disk.
    1133            8 : pub fn get_timeline_dir(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Utf8PathBuf {
    1134            8 :     get_tenant_dir(conf, &ttid.tenant_id).join(ttid.timeline_id.to_string())
    1135            8 : }
        

Generated by: LCOV version 2.1-beta