LCOV - code coverage report
Current view: top level - safekeeper/src - timeline.rs (source / functions) Coverage Total Hit
Test: b4ae4c4857f9ef3e144e982a35ee23bc84c71983.info Lines: 0.8 % 746 6
Test Date: 2024-10-22 22:13:45 Functions: 1.7 % 115 2

            Line data    Source code
       1              : //! This module implements Timeline lifecycle management and has all necessary code
       2              : //! to glue together SafeKeeper and all other background services.
       3              : 
       4              : use anyhow::{anyhow, bail, Result};
       5              : use camino::Utf8PathBuf;
       6              : use remote_storage::RemotePath;
       7              : use safekeeper_api::models::TimelineTermBumpResponse;
       8              : use serde::{Deserialize, Serialize};
       9              : use tokio::fs::{self};
      10              : use tokio_util::sync::CancellationToken;
      11              : use utils::id::TenantId;
      12              : 
      13              : use std::cmp::max;
      14              : use std::ops::{Deref, DerefMut};
      15              : use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
      16              : use std::sync::Arc;
      17              : use std::time::Duration;
      18              : use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
      19              : use tokio::{sync::watch, time::Instant};
      20              : use tracing::*;
      21              : use utils::http::error::ApiError;
      22              : use utils::{
      23              :     id::{NodeId, TenantTimelineId},
      24              :     lsn::Lsn,
      25              : };
      26              : 
      27              : use storage_broker::proto::SafekeeperTimelineInfo;
      28              : use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
      29              : 
      30              : use crate::rate_limit::RateLimiter;
      31              : use crate::receive_wal::WalReceivers;
      32              : use crate::safekeeper::{
      33              :     AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, ServerInfo, Term, TermLsn,
      34              :     INVALID_TERM,
      35              : };
      36              : use crate::send_wal::WalSenders;
      37              : use crate::state::{EvictionState, TimelineMemState, TimelinePersistentState, TimelineState};
      38              : use crate::timeline_guard::ResidenceGuard;
      39              : use crate::timeline_manager::{AtomicStatus, ManagerCtl};
      40              : use crate::timelines_set::TimelinesSet;
      41              : use crate::wal_backup::{self, remote_timeline_path};
      42              : use crate::wal_backup_partial::PartialRemoteSegment;
      43              : use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};
      44              : 
      45              : use crate::metrics::{FullTimelineInfo, WalStorageMetrics, MISC_OPERATION_SECONDS};
      46              : use crate::wal_storage::{Storage as wal_storage_iface, WalReader};
      47              : use crate::{debug_dump, timeline_manager, wal_storage};
      48              : use crate::{GlobalTimelines, SafeKeeperConf};
      49              : 
      50              : /// Things safekeeper should know about timeline state on peers.
      51            0 : #[derive(Debug, Clone, Serialize, Deserialize)]
      52              : pub struct PeerInfo {
      53              :     pub sk_id: NodeId,
      54              :     pub term: Term,
      55              :     /// Term of the last entry.
      56              :     pub last_log_term: Term,
      57              :     /// LSN of the last record.
      58              :     pub flush_lsn: Lsn,
      59              :     pub commit_lsn: Lsn,
      60              :     /// Since which LSN safekeeper has WAL.
      61              :     pub local_start_lsn: Lsn,
      62              :     /// When info was received. Serde annotations are not very useful but make
      63              :     /// the code compile -- we don't rely on this field externally.
      64              :     #[serde(skip)]
      65              :     #[serde(default = "Instant::now")]
      66              :     ts: Instant,
      67              :     pub pg_connstr: String,
      68              :     pub http_connstr: String,
      69              : }
      70              : 
      71              : impl PeerInfo {
      72            0 :     fn from_sk_info(sk_info: &SafekeeperTimelineInfo, ts: Instant) -> PeerInfo {
      73            0 :         PeerInfo {
      74            0 :             sk_id: NodeId(sk_info.safekeeper_id),
      75            0 :             term: sk_info.term,
      76            0 :             last_log_term: sk_info.last_log_term,
      77            0 :             flush_lsn: Lsn(sk_info.flush_lsn),
      78            0 :             commit_lsn: Lsn(sk_info.commit_lsn),
      79            0 :             local_start_lsn: Lsn(sk_info.local_start_lsn),
      80            0 :             pg_connstr: sk_info.safekeeper_connstr.clone(),
      81            0 :             http_connstr: sk_info.http_connstr.clone(),
      82            0 :             ts,
      83            0 :         }
      84            0 :     }
      85              : }
      86              : 
      87              : // vector-based node id -> peer state map with very limited functionality we
      88              : // need.
      89              : #[derive(Debug, Clone, Default)]
      90              : pub struct PeersInfo(pub Vec<PeerInfo>);
      91              : 
      92              : impl PeersInfo {
      93            0 :     fn get(&mut self, id: NodeId) -> Option<&mut PeerInfo> {
      94            0 :         self.0.iter_mut().find(|p| p.sk_id == id)
      95            0 :     }
      96              : 
      97            0 :     fn upsert(&mut self, p: &PeerInfo) {
      98            0 :         match self.get(p.sk_id) {
      99            0 :             Some(rp) => *rp = p.clone(),
     100            0 :             None => self.0.push(p.clone()),
     101              :         }
     102            0 :     }
     103              : }
     104              : 
     105              : pub type ReadGuardSharedState<'a> = RwLockReadGuard<'a, SharedState>;
     106              : 
     107              : /// WriteGuardSharedState is a wrapper around `RwLockWriteGuard<SharedState>` that
     108              : /// automatically updates `watch::Sender` channels with state on drop.
     109              : pub struct WriteGuardSharedState<'a> {
     110              :     tli: Arc<Timeline>,
     111              :     guard: RwLockWriteGuard<'a, SharedState>,
     112              :     skip_update: bool,
     113              : }
     114              : 
     115              : impl<'a> WriteGuardSharedState<'a> {
     116            0 :     fn new(tli: Arc<Timeline>, guard: RwLockWriteGuard<'a, SharedState>) -> Self {
     117            0 :         WriteGuardSharedState {
     118            0 :             tli,
     119            0 :             guard,
     120            0 :             skip_update: false,
     121            0 :         }
     122            0 :     }
     123              : }
     124              : 
     125              : impl Deref for WriteGuardSharedState<'_> {
     126              :     type Target = SharedState;
     127              : 
     128            0 :     fn deref(&self) -> &Self::Target {
     129            0 :         &self.guard
     130            0 :     }
     131              : }
     132              : 
     133              : impl DerefMut for WriteGuardSharedState<'_> {
     134            0 :     fn deref_mut(&mut self) -> &mut Self::Target {
     135            0 :         &mut self.guard
     136            0 :     }
     137              : }
     138              : 
     139              : impl Drop for WriteGuardSharedState<'_> {
     140            0 :     fn drop(&mut self) {
     141            0 :         let term_flush_lsn =
     142            0 :             TermLsn::from((self.guard.sk.last_log_term(), self.guard.sk.flush_lsn()));
     143            0 :         let commit_lsn = self.guard.sk.state().inmem.commit_lsn;
     144            0 : 
     145            0 :         let _ = self.tli.term_flush_lsn_watch_tx.send_if_modified(|old| {
     146            0 :             if *old != term_flush_lsn {
     147            0 :                 *old = term_flush_lsn;
     148            0 :                 true
     149              :             } else {
     150            0 :                 false
     151              :             }
     152            0 :         });
     153            0 : 
     154            0 :         let _ = self.tli.commit_lsn_watch_tx.send_if_modified(|old| {
     155            0 :             if *old != commit_lsn {
     156            0 :                 *old = commit_lsn;
     157            0 :                 true
     158              :             } else {
     159            0 :                 false
     160              :             }
     161            0 :         });
     162            0 : 
     163            0 :         if !self.skip_update {
     164            0 :             // send notification about shared state update
     165            0 :             self.tli.shared_state_version_tx.send_modify(|old| {
     166            0 :                 *old += 1;
     167            0 :             });
     168            0 :         }
     169            0 :     }
     170              : }
     171              : 
     172              : /// This structure is stored in shared state and represents the state of the timeline.
     173              : ///
     174              : /// Usually it holds SafeKeeper, but it also supports offloaded timeline state. In this
     175              : /// case, SafeKeeper is not available (because WAL is not present on disk) and all
     176              : /// operations can be done only with control file.
     177              : pub enum StateSK {
     178              :     Loaded(SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>),
     179              :     Offloaded(Box<TimelineState<control_file::FileStorage>>),
     180              :     // Not used, required for moving between states.
     181              :     Empty,
     182              : }
     183              : 
     184              : impl StateSK {
     185            0 :     pub fn flush_lsn(&self) -> Lsn {
     186            0 :         match self {
     187            0 :             StateSK::Loaded(sk) => sk.wal_store.flush_lsn(),
     188            0 :             StateSK::Offloaded(state) => match state.eviction_state {
     189            0 :                 EvictionState::Offloaded(flush_lsn) => flush_lsn,
     190            0 :                 _ => panic!("StateSK::Offloaded mismatches with eviction_state from control_file"),
     191              :             },
     192            0 :             StateSK::Empty => unreachable!(),
     193              :         }
     194            0 :     }
     195              : 
     196              :     /// Get a reference to the control file's timeline state.
     197            0 :     pub fn state(&self) -> &TimelineState<control_file::FileStorage> {
     198            0 :         match self {
     199            0 :             StateSK::Loaded(sk) => &sk.state,
     200            0 :             StateSK::Offloaded(ref s) => s,
     201            0 :             StateSK::Empty => unreachable!(),
     202              :         }
     203            0 :     }
     204              : 
     205            0 :     pub fn state_mut(&mut self) -> &mut TimelineState<control_file::FileStorage> {
     206            0 :         match self {
     207            0 :             StateSK::Loaded(sk) => &mut sk.state,
     208            0 :             StateSK::Offloaded(ref mut s) => s,
     209            0 :             StateSK::Empty => unreachable!(),
     210              :         }
     211            0 :     }
     212              : 
     213            0 :     pub fn last_log_term(&self) -> Term {
     214            0 :         self.state()
     215            0 :             .acceptor_state
     216            0 :             .get_last_log_term(self.flush_lsn())
     217            0 :     }
     218              : 
     219            0 :     pub async fn term_bump(&mut self, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
     220            0 :         self.state_mut().term_bump(to).await
     221            0 :     }
     222              : 
     223              :     /// Close open WAL files to release FDs.
     224            0 :     fn close_wal_store(&mut self) {
     225            0 :         if let StateSK::Loaded(sk) = self {
     226            0 :             sk.wal_store.close();
     227            0 :         }
     228            0 :     }
     229              : 
     230              :     /// Update timeline state with peer safekeeper data.
     231            0 :     pub async fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> {
     232            0 :         // update commit_lsn if safekeeper is loaded
     233            0 :         match self {
     234            0 :             StateSK::Loaded(sk) => sk.record_safekeeper_info(sk_info).await?,
     235            0 :             StateSK::Offloaded(_) => {}
     236            0 :             StateSK::Empty => unreachable!(),
     237              :         }
     238              : 
     239              :         // update everything else, including remote_consistent_lsn and backup_lsn
     240            0 :         let mut sync_control_file = false;
     241            0 :         let state = self.state_mut();
     242            0 :         let wal_seg_size = state.server.wal_seg_size as u64;
     243            0 : 
     244            0 :         state.inmem.backup_lsn = max(Lsn(sk_info.backup_lsn), state.inmem.backup_lsn);
     245            0 :         sync_control_file |= state.backup_lsn + wal_seg_size < state.inmem.backup_lsn;
     246            0 : 
     247            0 :         state.inmem.remote_consistent_lsn = max(
     248            0 :             Lsn(sk_info.remote_consistent_lsn),
     249            0 :             state.inmem.remote_consistent_lsn,
     250            0 :         );
     251            0 :         sync_control_file |=
     252            0 :             state.remote_consistent_lsn + wal_seg_size < state.inmem.remote_consistent_lsn;
     253            0 : 
     254            0 :         state.inmem.peer_horizon_lsn =
     255            0 :             max(Lsn(sk_info.peer_horizon_lsn), state.inmem.peer_horizon_lsn);
     256            0 :         sync_control_file |= state.peer_horizon_lsn + wal_seg_size < state.inmem.peer_horizon_lsn;
     257            0 : 
     258            0 :         if sync_control_file {
     259            0 :             state.flush().await?;
     260            0 :         }
     261            0 :         Ok(())
     262            0 :     }
     263              : 
     264              :     /// Previously known as epoch_start_lsn. Needed only for reference in some APIs.
     265            0 :     pub fn term_start_lsn(&self) -> Lsn {
     266            0 :         match self {
     267            0 :             StateSK::Loaded(sk) => sk.term_start_lsn,
     268            0 :             StateSK::Offloaded(_) => Lsn(0),
     269            0 :             StateSK::Empty => unreachable!(),
     270              :         }
     271            0 :     }
     272              : 
     273              :     /// Used for metrics only.
     274            0 :     pub fn wal_storage_metrics(&self) -> WalStorageMetrics {
     275            0 :         match self {
     276            0 :             StateSK::Loaded(sk) => sk.wal_store.get_metrics(),
     277            0 :             StateSK::Offloaded(_) => WalStorageMetrics::default(),
     278            0 :             StateSK::Empty => unreachable!(),
     279              :         }
     280            0 :     }
     281              : 
     282              :     /// Returns WAL storage internal LSNs for debug dump.
     283            0 :     pub fn wal_storage_internal_state(&self) -> (Lsn, Lsn, Lsn, bool) {
     284            0 :         match self {
     285            0 :             StateSK::Loaded(sk) => sk.wal_store.internal_state(),
     286              :             StateSK::Offloaded(_) => {
     287            0 :                 let flush_lsn = self.flush_lsn();
     288            0 :                 (flush_lsn, flush_lsn, flush_lsn, false)
     289              :             }
     290            0 :             StateSK::Empty => unreachable!(),
     291              :         }
     292            0 :     }
     293              : 
     294              :     /// Access to SafeKeeper object. Panics if offloaded, should be good to use from WalResidentTimeline.
     295            0 :     pub fn safekeeper(
     296            0 :         &mut self,
     297            0 :     ) -> &mut SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage> {
     298            0 :         match self {
     299            0 :             StateSK::Loaded(sk) => sk,
     300              :             StateSK::Offloaded(_) => {
     301            0 :                 panic!("safekeeper is offloaded, cannot be used")
     302              :             }
     303            0 :             StateSK::Empty => unreachable!(),
     304              :         }
     305            0 :     }
     306              : 
     307              :     /// Moves control file's state structure out of the enum. Used to switch states.
     308            0 :     fn take_state(self) -> TimelineState<control_file::FileStorage> {
     309            0 :         match self {
     310            0 :             StateSK::Loaded(sk) => sk.state,
     311            0 :             StateSK::Offloaded(state) => *state,
     312            0 :             StateSK::Empty => unreachable!(),
     313              :         }
     314            0 :     }
     315              : }
     316              : 
     317              : /// Shared state associated with database instance
     318              : pub struct SharedState {
     319              :     /// Safekeeper object
     320              :     pub(crate) sk: StateSK,
     321              :     /// In memory list containing state of peers sent in latest messages from them.
     322              :     pub(crate) peers_info: PeersInfo,
     323              :     // True value hinders old WAL removal; this is used by snapshotting. We
     324              :     // could make it a counter, but there is no need to.
     325              :     pub(crate) wal_removal_on_hold: bool,
     326              : }
     327              : 
     328              : impl SharedState {
     329              :     /// Initialize fresh timeline state without persisting anything to disk.
     330            0 :     fn create_new(
     331            0 :         conf: &SafeKeeperConf,
     332            0 :         ttid: &TenantTimelineId,
     333            0 :         state: TimelinePersistentState,
     334            0 :     ) -> Result<Self> {
     335            0 :         if state.server.wal_seg_size == 0 {
     336            0 :             bail!(TimelineError::UninitializedWalSegSize(*ttid));
     337            0 :         }
     338            0 : 
     339            0 :         if state.server.pg_version == UNKNOWN_SERVER_VERSION {
     340            0 :             bail!(TimelineError::UninitialinzedPgVersion(*ttid));
     341            0 :         }
     342            0 : 
     343            0 :         if state.commit_lsn < state.local_start_lsn {
     344            0 :             bail!(
     345            0 :                 "commit_lsn {} is higher than local_start_lsn {}",
     346            0 :                 state.commit_lsn,
     347            0 :                 state.local_start_lsn
     348            0 :             );
     349            0 :         }
     350            0 : 
     351            0 :         // We don't want to write anything to disk, because we may have existing timeline there.
     352            0 :         // These functions should not change anything on disk.
     353            0 :         let timeline_dir = get_timeline_dir(conf, ttid);
     354            0 :         let control_store =
     355            0 :             control_file::FileStorage::create_new(timeline_dir.clone(), conf, state)?;
     356            0 :         let wal_store =
     357            0 :             wal_storage::PhysicalStorage::new(ttid, timeline_dir, conf, &control_store)?;
     358            0 :         let sk = SafeKeeper::new(TimelineState::new(control_store), wal_store, conf.my_id)?;
     359              : 
     360            0 :         Ok(Self {
     361            0 :             sk: StateSK::Loaded(sk),
     362            0 :             peers_info: PeersInfo(vec![]),
     363            0 :             wal_removal_on_hold: false,
     364            0 :         })
     365            0 :     }
     366              : 
     367              :     /// Restore SharedState from control file. If file doesn't exist, bails out.
     368            0 :     fn restore(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Result<Self> {
     369            0 :         let timeline_dir = get_timeline_dir(conf, ttid);
     370            0 :         let control_store = control_file::FileStorage::restore_new(ttid, conf)?;
     371            0 :         if control_store.server.wal_seg_size == 0 {
     372            0 :             bail!(TimelineError::UninitializedWalSegSize(*ttid));
     373            0 :         }
     374              : 
     375            0 :         let sk = match control_store.eviction_state {
     376              :             EvictionState::Present => {
     377            0 :                 let wal_store =
     378            0 :                     wal_storage::PhysicalStorage::new(ttid, timeline_dir, conf, &control_store)?;
     379            0 :                 StateSK::Loaded(SafeKeeper::new(
     380            0 :                     TimelineState::new(control_store),
     381            0 :                     wal_store,
     382            0 :                     conf.my_id,
     383            0 :                 )?)
     384              :             }
     385              :             EvictionState::Offloaded(_) => {
     386            0 :                 StateSK::Offloaded(Box::new(TimelineState::new(control_store)))
     387              :             }
     388              :         };
     389              : 
     390            0 :         Ok(Self {
     391            0 :             sk,
     392            0 :             peers_info: PeersInfo(vec![]),
     393            0 :             wal_removal_on_hold: false,
     394            0 :         })
     395            0 :     }
     396              : 
     397            0 :     pub(crate) fn get_wal_seg_size(&self) -> usize {
     398            0 :         self.sk.state().server.wal_seg_size as usize
     399            0 :     }
     400              : 
     401            0 :     fn get_safekeeper_info(
     402            0 :         &self,
     403            0 :         ttid: &TenantTimelineId,
     404            0 :         conf: &SafeKeeperConf,
     405            0 :         standby_apply_lsn: Lsn,
     406            0 :     ) -> SafekeeperTimelineInfo {
     407            0 :         SafekeeperTimelineInfo {
     408            0 :             safekeeper_id: conf.my_id.0,
     409            0 :             tenant_timeline_id: Some(ProtoTenantTimelineId {
     410            0 :                 tenant_id: ttid.tenant_id.as_ref().to_owned(),
     411            0 :                 timeline_id: ttid.timeline_id.as_ref().to_owned(),
     412            0 :             }),
     413            0 :             term: self.sk.state().acceptor_state.term,
     414            0 :             last_log_term: self.sk.last_log_term(),
     415            0 :             flush_lsn: self.sk.flush_lsn().0,
     416            0 :             // note: this value is not flushed to control file yet and can be lost
     417            0 :             commit_lsn: self.sk.state().inmem.commit_lsn.0,
     418            0 :             remote_consistent_lsn: self.sk.state().inmem.remote_consistent_lsn.0,
     419            0 :             peer_horizon_lsn: self.sk.state().inmem.peer_horizon_lsn.0,
     420            0 :             safekeeper_connstr: conf
     421            0 :                 .advertise_pg_addr
     422            0 :                 .to_owned()
     423            0 :                 .unwrap_or(conf.listen_pg_addr.clone()),
     424            0 :             http_connstr: conf.listen_http_addr.to_owned(),
     425            0 :             backup_lsn: self.sk.state().inmem.backup_lsn.0,
     426            0 :             local_start_lsn: self.sk.state().local_start_lsn.0,
     427            0 :             availability_zone: conf.availability_zone.clone(),
     428            0 :             standby_horizon: standby_apply_lsn.0,
     429            0 :         }
     430            0 :     }
     431              : 
     432              :     /// Get our latest view of alive peers status on the timeline.
     433              :     /// We pass our own info through the broker as well, so when we don't have connection
     434              :     /// to the broker returned vec is empty.
     435            0 :     pub(crate) fn get_peers(&self, heartbeat_timeout: Duration) -> Vec<PeerInfo> {
     436            0 :         let now = Instant::now();
     437            0 :         self.peers_info
     438            0 :             .0
     439            0 :             .iter()
     440            0 :             // Regard peer as absent if we haven't heard from it within heartbeat_timeout.
     441            0 :             .filter(|p| now.duration_since(p.ts) <= heartbeat_timeout)
     442            0 :             .cloned()
     443            0 :             .collect()
     444            0 :     }
     445              : }
     446              : 
     447            0 : #[derive(Debug, thiserror::Error)]
     448              : pub enum TimelineError {
     449              :     #[error("Timeline {0} was cancelled and cannot be used anymore")]
     450              :     Cancelled(TenantTimelineId),
     451              :     #[error("Timeline {0} was not found in global map")]
     452              :     NotFound(TenantTimelineId),
     453              :     #[error("Timeline {0} exists on disk, but wasn't loaded on startup")]
     454              :     Invalid(TenantTimelineId),
     455              :     #[error("Timeline {0} is already exists")]
     456              :     AlreadyExists(TenantTimelineId),
     457              :     #[error("Timeline {0} is not initialized, wal_seg_size is zero")]
     458              :     UninitializedWalSegSize(TenantTimelineId),
     459              :     #[error("Timeline {0} is not initialized, pg_version is unknown")]
     460              :     UninitialinzedPgVersion(TenantTimelineId),
     461              : }
     462              : 
     463              : // Convert to HTTP API error.
     464              : impl From<TimelineError> for ApiError {
     465            0 :     fn from(te: TimelineError) -> ApiError {
     466            0 :         match te {
     467            0 :             TimelineError::NotFound(ttid) => {
     468            0 :                 ApiError::NotFound(anyhow!("timeline {} not found", ttid).into())
     469              :             }
     470            0 :             _ => ApiError::InternalServerError(anyhow!("{}", te)),
     471              :         }
     472            0 :     }
     473              : }
     474              : 
     475              : /// Timeline struct manages lifecycle (creation, deletion, restore) of a safekeeper timeline.
     476              : /// It also holds SharedState and provides mutually exclusive access to it.
     477              : pub struct Timeline {
     478              :     pub ttid: TenantTimelineId,
     479              :     pub remote_path: RemotePath,
     480              : 
     481              :     /// Used to broadcast commit_lsn updates to all background jobs.
     482              :     commit_lsn_watch_tx: watch::Sender<Lsn>,
     483              :     commit_lsn_watch_rx: watch::Receiver<Lsn>,
     484              : 
     485              :     /// Broadcasts (current term, flush_lsn) updates, walsender is interested in
     486              :     /// them when sending in recovery mode (to walproposer or peers). Note: this
     487              :     /// is just a notification, WAL reading should always done with lock held as
     488              :     /// term can change otherwise.
     489              :     term_flush_lsn_watch_tx: watch::Sender<TermLsn>,
     490              :     term_flush_lsn_watch_rx: watch::Receiver<TermLsn>,
     491              : 
     492              :     /// Broadcasts shared state updates.
     493              :     shared_state_version_tx: watch::Sender<usize>,
     494              :     shared_state_version_rx: watch::Receiver<usize>,
     495              : 
     496              :     /// Safekeeper and other state, that should remain consistent and
     497              :     /// synchronized with the disk. This is tokio mutex as we write WAL to disk
     498              :     /// while holding it, ensuring that consensus checks are in order.
     499              :     mutex: RwLock<SharedState>,
     500              :     walsenders: Arc<WalSenders>,
     501              :     walreceivers: Arc<WalReceivers>,
     502              :     timeline_dir: Utf8PathBuf,
     503              :     manager_ctl: ManagerCtl,
     504              : 
     505              :     /// Delete/cancel will trigger this, background tasks should drop out as soon as it fires
     506              :     pub(crate) cancel: CancellationToken,
     507              : 
     508              :     // timeline_manager controlled state
     509              :     pub(crate) broker_active: AtomicBool,
     510              :     pub(crate) wal_backup_active: AtomicBool,
     511              :     pub(crate) last_removed_segno: AtomicU64,
     512              :     pub(crate) mgr_status: AtomicStatus,
     513              : }
     514              : 
     515              : impl Timeline {
     516              :     /// Load existing timeline from disk.
     517            0 :     pub fn load_timeline(conf: &SafeKeeperConf, ttid: TenantTimelineId) -> Result<Timeline> {
     518            0 :         let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
     519              : 
     520            0 :         let shared_state = SharedState::restore(conf, &ttid)?;
     521            0 :         let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
     522            0 :             watch::channel(shared_state.sk.state().commit_lsn);
     523            0 :         let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) = watch::channel(TermLsn::from((
     524            0 :             shared_state.sk.last_log_term(),
     525            0 :             shared_state.sk.flush_lsn(),
     526            0 :         )));
     527            0 :         let (shared_state_version_tx, shared_state_version_rx) = watch::channel(0);
     528            0 : 
     529            0 :         let walreceivers = WalReceivers::new();
     530            0 :         let remote_path = remote_timeline_path(&ttid)?;
     531            0 :         Ok(Timeline {
     532            0 :             ttid,
     533            0 :             remote_path,
     534            0 :             commit_lsn_watch_tx,
     535            0 :             commit_lsn_watch_rx,
     536            0 :             term_flush_lsn_watch_tx,
     537            0 :             term_flush_lsn_watch_rx,
     538            0 :             shared_state_version_tx,
     539            0 :             shared_state_version_rx,
     540            0 :             mutex: RwLock::new(shared_state),
     541            0 :             walsenders: WalSenders::new(walreceivers.clone()),
     542            0 :             walreceivers,
     543            0 :             cancel: CancellationToken::default(),
     544            0 :             timeline_dir: get_timeline_dir(conf, &ttid),
     545            0 :             manager_ctl: ManagerCtl::new(),
     546            0 :             broker_active: AtomicBool::new(false),
     547            0 :             wal_backup_active: AtomicBool::new(false),
     548            0 :             last_removed_segno: AtomicU64::new(0),
     549            0 :             mgr_status: AtomicStatus::new(),
     550            0 :         })
     551            0 :     }
     552              : 
     553              :     /// Create a new timeline, which is not yet persisted to disk.
     554            0 :     pub fn create_empty(
     555            0 :         conf: &SafeKeeperConf,
     556            0 :         ttid: TenantTimelineId,
     557            0 :         server_info: ServerInfo,
     558            0 :         commit_lsn: Lsn,
     559            0 :         local_start_lsn: Lsn,
     560            0 :     ) -> Result<Timeline> {
     561            0 :         let (commit_lsn_watch_tx, commit_lsn_watch_rx) = watch::channel(Lsn::INVALID);
     562            0 :         let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) =
     563            0 :             watch::channel(TermLsn::from((INVALID_TERM, Lsn::INVALID)));
     564            0 :         let (shared_state_version_tx, shared_state_version_rx) = watch::channel(0);
     565            0 : 
     566            0 :         let state =
     567            0 :             TimelinePersistentState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn);
     568            0 : 
     569            0 :         let walreceivers = WalReceivers::new();
     570            0 :         let remote_path = remote_timeline_path(&ttid)?;
     571              :         Ok(Timeline {
     572            0 :             ttid,
     573            0 :             remote_path,
     574            0 :             commit_lsn_watch_tx,
     575            0 :             commit_lsn_watch_rx,
     576            0 :             term_flush_lsn_watch_tx,
     577            0 :             term_flush_lsn_watch_rx,
     578            0 :             shared_state_version_tx,
     579            0 :             shared_state_version_rx,
     580            0 :             mutex: RwLock::new(SharedState::create_new(conf, &ttid, state)?),
     581            0 :             walsenders: WalSenders::new(walreceivers.clone()),
     582            0 :             walreceivers,
     583            0 :             cancel: CancellationToken::default(),
     584            0 :             timeline_dir: get_timeline_dir(conf, &ttid),
     585            0 :             manager_ctl: ManagerCtl::new(),
     586            0 :             broker_active: AtomicBool::new(false),
     587            0 :             wal_backup_active: AtomicBool::new(false),
     588            0 :             last_removed_segno: AtomicU64::new(0),
     589            0 :             mgr_status: AtomicStatus::new(),
     590              :         })
     591            0 :     }
     592              : 
     593              :     /// Initialize fresh timeline on disk and start background tasks. If init
     594              :     /// fails, timeline is cancelled and cannot be used anymore.
     595              :     ///
     596              :     /// Init is transactional, so if it fails, created files will be deleted,
     597              :     /// and state on disk should remain unchanged.
     598            0 :     pub async fn init_new(
     599            0 :         self: &Arc<Timeline>,
     600            0 :         shared_state: &mut WriteGuardSharedState<'_>,
     601            0 :         conf: &SafeKeeperConf,
     602            0 :         broker_active_set: Arc<TimelinesSet>,
     603            0 :         partial_backup_rate_limiter: RateLimiter,
     604            0 :     ) -> Result<()> {
     605            0 :         match fs::metadata(&self.timeline_dir).await {
     606              :             Ok(_) => {
     607              :                 // Timeline directory exists on disk, we should leave state unchanged
     608              :                 // and return error.
     609            0 :                 bail!(TimelineError::Invalid(self.ttid));
     610              :             }
     611            0 :             Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
     612            0 :             Err(e) => {
     613            0 :                 return Err(e.into());
     614              :             }
     615              :         }
     616              : 
     617              :         // Create timeline directory.
     618            0 :         fs::create_dir_all(&self.timeline_dir).await?;
     619              : 
     620              :         // Write timeline to disk and start background tasks.
     621            0 :         if let Err(e) = shared_state.sk.state_mut().flush().await {
     622              :             // Bootstrap failed, cancel timeline and remove timeline directory.
     623            0 :             self.cancel(shared_state);
     624              : 
     625            0 :             if let Err(fs_err) = fs::remove_dir_all(&self.timeline_dir).await {
     626            0 :                 warn!(
     627            0 :                     "failed to remove timeline {} directory after bootstrap failure: {}",
     628            0 :                     self.ttid, fs_err
     629              :                 );
     630            0 :             }
     631              : 
     632            0 :             return Err(e);
     633            0 :         }
     634            0 :         self.bootstrap(
     635            0 :             shared_state,
     636            0 :             conf,
     637            0 :             broker_active_set,
     638            0 :             partial_backup_rate_limiter,
     639            0 :         );
     640            0 :         Ok(())
     641            0 :     }
     642              : 
     643              :     /// Bootstrap new or existing timeline starting background tasks.
     644            0 :     pub fn bootstrap(
     645            0 :         self: &Arc<Timeline>,
     646            0 :         _shared_state: &mut WriteGuardSharedState<'_>,
     647            0 :         conf: &SafeKeeperConf,
     648            0 :         broker_active_set: Arc<TimelinesSet>,
     649            0 :         partial_backup_rate_limiter: RateLimiter,
     650            0 :     ) {
     651            0 :         let (tx, rx) = self.manager_ctl.bootstrap_manager();
     652            0 : 
     653            0 :         // Start manager task which will monitor timeline state and update
     654            0 :         // background tasks.
     655            0 :         tokio::spawn(timeline_manager::main_task(
     656            0 :             ManagerTimeline { tli: self.clone() },
     657            0 :             conf.clone(),
     658            0 :             broker_active_set,
     659            0 :             tx,
     660            0 :             rx,
     661            0 :             partial_backup_rate_limiter,
     662            0 :         ));
     663            0 :     }
     664              : 
     665              :     /// Delete timeline from disk completely, by removing timeline directory.
     666              :     /// Background timeline activities will stop eventually.
     667              :     ///
     668              :     /// Also deletes WAL in s3. Might fail if e.g. s3 is unavailable, but
     669              :     /// deletion API endpoint is retriable.
     670            0 :     pub async fn delete(
     671            0 :         &self,
     672            0 :         shared_state: &mut WriteGuardSharedState<'_>,
     673            0 :         only_local: bool,
     674            0 :     ) -> Result<bool> {
     675            0 :         self.cancel(shared_state);
     676            0 : 
     677            0 :         // TODO: It's better to wait for s3 offloader termination before
     678            0 :         // removing data from s3. Though since s3 doesn't have transactions it
     679            0 :         // still wouldn't guarantee absense of data after removal.
     680            0 :         let conf = GlobalTimelines::get_global_config();
     681            0 :         if !only_local && conf.is_wal_backup_enabled() {
     682              :             // Note: we concurrently delete remote storage data from multiple
     683              :             // safekeepers. That's ok, s3 replies 200 if object doesn't exist and we
     684              :             // do some retries anyway.
     685            0 :             wal_backup::delete_timeline(&self.ttid).await?;
     686            0 :         }
     687            0 :         let dir_existed = delete_dir(&self.timeline_dir).await?;
     688            0 :         Ok(dir_existed)
     689            0 :     }
     690              : 
     691              :     /// Cancel timeline to prevent further usage. Background tasks will stop
     692              :     /// eventually after receiving cancellation signal.
     693            0 :     fn cancel(&self, shared_state: &mut WriteGuardSharedState<'_>) {
     694            0 :         info!("timeline {} is cancelled", self.ttid);
     695            0 :         self.cancel.cancel();
     696            0 :         // Close associated FDs. Nobody will be able to touch timeline data once
     697            0 :         // it is cancelled, so WAL storage won't be opened again.
     698            0 :         shared_state.sk.close_wal_store();
     699            0 :     }
     700              : 
     701              :     /// Returns if timeline is cancelled.
     702            0 :     pub fn is_cancelled(&self) -> bool {
     703            0 :         self.cancel.is_cancelled()
     704            0 :     }
     705              : 
     706              :     /// Take a writing mutual exclusive lock on timeline shared_state.
     707            0 :     pub async fn write_shared_state<'a>(self: &'a Arc<Self>) -> WriteGuardSharedState<'a> {
     708            0 :         WriteGuardSharedState::new(self.clone(), self.mutex.write().await)
     709            0 :     }
     710              : 
     711            0 :     pub async fn read_shared_state(&self) -> ReadGuardSharedState {
     712            0 :         self.mutex.read().await
     713            0 :     }
     714              : 
     715              :     /// Returns commit_lsn watch channel.
     716            0 :     pub fn get_commit_lsn_watch_rx(&self) -> watch::Receiver<Lsn> {
     717            0 :         self.commit_lsn_watch_rx.clone()
     718            0 :     }
     719              : 
     720              :     /// Returns term_flush_lsn watch channel.
     721            0 :     pub fn get_term_flush_lsn_watch_rx(&self) -> watch::Receiver<TermLsn> {
     722            0 :         self.term_flush_lsn_watch_rx.clone()
     723            0 :     }
     724              : 
     725              :     /// Returns watch channel for SharedState update version.
     726            0 :     pub fn get_state_version_rx(&self) -> watch::Receiver<usize> {
     727            0 :         self.shared_state_version_rx.clone()
     728            0 :     }
     729              : 
     730              :     /// Returns wal_seg_size.
     731            0 :     pub async fn get_wal_seg_size(&self) -> usize {
     732            0 :         self.read_shared_state().await.get_wal_seg_size()
     733            0 :     }
     734              : 
     735              :     /// Returns state of the timeline.
     736            0 :     pub async fn get_state(&self) -> (TimelineMemState, TimelinePersistentState) {
     737            0 :         let state = self.read_shared_state().await;
     738            0 :         (
     739            0 :             state.sk.state().inmem.clone(),
     740            0 :             TimelinePersistentState::clone(state.sk.state()),
     741            0 :         )
     742            0 :     }
     743              : 
     744              :     /// Returns latest backup_lsn.
     745            0 :     pub async fn get_wal_backup_lsn(&self) -> Lsn {
     746            0 :         self.read_shared_state().await.sk.state().inmem.backup_lsn
     747            0 :     }
     748              : 
     749              :     /// Sets backup_lsn to the given value.
     750            0 :     pub async fn set_wal_backup_lsn(self: &Arc<Self>, backup_lsn: Lsn) -> Result<()> {
     751            0 :         if self.is_cancelled() {
     752            0 :             bail!(TimelineError::Cancelled(self.ttid));
     753            0 :         }
     754              : 
     755            0 :         let mut state = self.write_shared_state().await;
     756            0 :         state.sk.state_mut().inmem.backup_lsn = max(state.sk.state().inmem.backup_lsn, backup_lsn);
     757            0 :         // we should check whether to shut down offloader, but this will be done
     758            0 :         // soon by peer communication anyway.
     759            0 :         Ok(())
     760            0 :     }
     761              : 
     762              :     /// Get safekeeper info for broadcasting to broker and other peers.
     763            0 :     pub async fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SafekeeperTimelineInfo {
     764            0 :         let standby_apply_lsn = self.walsenders.get_hotstandby().reply.apply_lsn;
     765            0 :         let shared_state = self.read_shared_state().await;
     766            0 :         shared_state.get_safekeeper_info(&self.ttid, conf, standby_apply_lsn)
     767            0 :     }
     768              : 
     769              :     /// Update timeline state with peer safekeeper data.
     770            0 :     pub async fn record_safekeeper_info(
     771            0 :         self: &Arc<Self>,
     772            0 :         sk_info: SafekeeperTimelineInfo,
     773            0 :     ) -> Result<()> {
     774              :         {
     775            0 :             let mut shared_state = self.write_shared_state().await;
     776            0 :             shared_state.sk.record_safekeeper_info(&sk_info).await?;
     777            0 :             let peer_info = PeerInfo::from_sk_info(&sk_info, Instant::now());
     778            0 :             shared_state.peers_info.upsert(&peer_info);
     779            0 :         }
     780            0 :         Ok(())
     781            0 :     }
     782              : 
     783            0 :     pub async fn get_peers(&self, conf: &SafeKeeperConf) -> Vec<PeerInfo> {
     784            0 :         let shared_state = self.read_shared_state().await;
     785            0 :         shared_state.get_peers(conf.heartbeat_timeout)
     786            0 :     }
     787              : 
     788            0 :     pub fn get_walsenders(&self) -> &Arc<WalSenders> {
     789            0 :         &self.walsenders
     790            0 :     }
     791              : 
     792            0 :     pub fn get_walreceivers(&self) -> &Arc<WalReceivers> {
     793            0 :         &self.walreceivers
     794            0 :     }
     795              : 
     796              :     /// Returns flush_lsn.
     797            0 :     pub async fn get_flush_lsn(&self) -> Lsn {
     798            0 :         self.read_shared_state().await.sk.flush_lsn()
     799            0 :     }
     800              : 
     801              :     /// Gather timeline data for metrics.
     802            0 :     pub async fn info_for_metrics(&self) -> Option<FullTimelineInfo> {
     803            0 :         if self.is_cancelled() {
     804            0 :             return None;
     805            0 :         }
     806            0 : 
     807            0 :         let (ps_feedback_count, last_ps_feedback) = self.walsenders.get_ps_feedback_stats();
     808            0 :         let state = self.read_shared_state().await;
     809            0 :         Some(FullTimelineInfo {
     810            0 :             ttid: self.ttid,
     811            0 :             ps_feedback_count,
     812            0 :             last_ps_feedback,
     813            0 :             wal_backup_active: self.wal_backup_active.load(Ordering::Relaxed),
     814            0 :             timeline_is_active: self.broker_active.load(Ordering::Relaxed),
     815            0 :             num_computes: self.walreceivers.get_num() as u32,
     816            0 :             last_removed_segno: self.last_removed_segno.load(Ordering::Relaxed),
     817            0 :             epoch_start_lsn: state.sk.term_start_lsn(),
     818            0 :             mem_state: state.sk.state().inmem.clone(),
     819            0 :             persisted_state: TimelinePersistentState::clone(state.sk.state()),
     820            0 :             flush_lsn: state.sk.flush_lsn(),
     821            0 :             wal_storage: state.sk.wal_storage_metrics(),
     822            0 :         })
     823            0 :     }
     824              : 
     825              :     /// Returns in-memory timeline state to build a full debug dump.
     826            0 :     pub async fn memory_dump(&self) -> debug_dump::Memory {
     827            0 :         let state = self.read_shared_state().await;
     828              : 
     829            0 :         let (write_lsn, write_record_lsn, flush_lsn, file_open) =
     830            0 :             state.sk.wal_storage_internal_state();
     831            0 : 
     832            0 :         debug_dump::Memory {
     833            0 :             is_cancelled: self.is_cancelled(),
     834            0 :             peers_info_len: state.peers_info.0.len(),
     835            0 :             walsenders: self.walsenders.get_all(),
     836            0 :             wal_backup_active: self.wal_backup_active.load(Ordering::Relaxed),
     837            0 :             active: self.broker_active.load(Ordering::Relaxed),
     838            0 :             num_computes: self.walreceivers.get_num() as u32,
     839            0 :             last_removed_segno: self.last_removed_segno.load(Ordering::Relaxed),
     840            0 :             epoch_start_lsn: state.sk.term_start_lsn(),
     841            0 :             mem_state: state.sk.state().inmem.clone(),
     842            0 :             mgr_status: self.mgr_status.get(),
     843            0 :             write_lsn,
     844            0 :             write_record_lsn,
     845            0 :             flush_lsn,
     846            0 :             file_open,
     847            0 :         }
     848            0 :     }
     849              : 
     850              :     /// Apply a function to the control file state and persist it.
     851            0 :     pub async fn map_control_file<T>(
     852            0 :         self: &Arc<Self>,
     853            0 :         f: impl FnOnce(&mut TimelinePersistentState) -> Result<T>,
     854            0 :     ) -> Result<T> {
     855            0 :         let mut state = self.write_shared_state().await;
     856            0 :         let mut persistent_state = state.sk.state_mut().start_change();
     857              :         // If f returns error, we abort the change and don't persist anything.
     858            0 :         let res = f(&mut persistent_state)?;
     859              :         // If persisting fails, we abort the change and return error.
     860            0 :         state
     861            0 :             .sk
     862            0 :             .state_mut()
     863            0 :             .finish_change(&persistent_state)
     864            0 :             .await?;
     865            0 :         Ok(res)
     866            0 :     }
     867              : 
     868            0 :     pub async fn term_bump(self: &Arc<Self>, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
     869            0 :         let mut state = self.write_shared_state().await;
     870            0 :         state.sk.term_bump(to).await
     871            0 :     }
     872              : 
     873              :     /// Get the timeline guard for reading/writing WAL files.
     874              :     /// If WAL files are not present on disk (evicted), they will be automatically
     875              :     /// downloaded from remote storage. This is done in the manager task, which is
     876              :     /// responsible for issuing all guards.
     877              :     ///
     878              :     /// NB: don't use this function from timeline_manager, it will deadlock.
     879              :     /// NB: don't use this function while holding shared_state lock.
     880            0 :     pub async fn wal_residence_guard(self: &Arc<Self>) -> Result<WalResidentTimeline> {
     881            0 :         if self.is_cancelled() {
     882            0 :             bail!(TimelineError::Cancelled(self.ttid));
     883            0 :         }
     884            0 : 
     885            0 :         debug!("requesting WalResidentTimeline guard");
     886            0 :         let started_at = Instant::now();
     887            0 :         let status_before = self.mgr_status.get();
     888              : 
     889              :         // Wait 30 seconds for the guard to be acquired. It can time out if someone is
     890              :         // holding the lock (e.g. during `SafeKeeper::process_msg()`) or manager task
     891              :         // is stuck.
     892            0 :         let res = tokio::time::timeout_at(
     893            0 :             started_at + Duration::from_secs(30),
     894            0 :             self.manager_ctl.wal_residence_guard(),
     895            0 :         )
     896            0 :         .await;
     897              : 
     898            0 :         let guard = match res {
     899            0 :             Ok(Ok(guard)) => {
     900            0 :                 let finished_at = Instant::now();
     901            0 :                 let elapsed = finished_at - started_at;
     902            0 :                 MISC_OPERATION_SECONDS
     903            0 :                     .with_label_values(&["wal_residence_guard"])
     904            0 :                     .observe(elapsed.as_secs_f64());
     905            0 : 
     906            0 :                 guard
     907              :             }
     908            0 :             Ok(Err(e)) => {
     909            0 :                 warn!(
     910            0 :                     "error while acquiring WalResidentTimeline guard, statuses {:?} => {:?}",
     911            0 :                     status_before,
     912            0 :                     self.mgr_status.get()
     913              :                 );
     914            0 :                 return Err(e);
     915              :             }
     916              :             Err(_) => {
     917            0 :                 warn!(
     918            0 :                     "timeout while acquiring WalResidentTimeline guard, statuses {:?} => {:?}",
     919            0 :                     status_before,
     920            0 :                     self.mgr_status.get()
     921              :                 );
     922            0 :                 anyhow::bail!("timeout while acquiring WalResidentTimeline guard");
     923              :             }
     924              :         };
     925              : 
     926            0 :         Ok(WalResidentTimeline::new(self.clone(), guard))
     927            0 :     }
     928              : 
     929            0 :     pub async fn backup_partial_reset(self: &Arc<Self>) -> Result<Vec<String>> {
     930            0 :         self.manager_ctl.backup_partial_reset().await
     931            0 :     }
     932              : }
     933              : 
     934              : /// This is a guard that allows to read/write disk timeline state.
     935              : /// All tasks that are trying to read/write WAL from disk should use this guard.
     936              : pub struct WalResidentTimeline {
     937              :     pub tli: Arc<Timeline>,
     938              :     _guard: ResidenceGuard,
     939              : }
     940              : 
     941              : impl WalResidentTimeline {
     942            0 :     pub fn new(tli: Arc<Timeline>, _guard: ResidenceGuard) -> Self {
     943            0 :         WalResidentTimeline { tli, _guard }
     944            0 :     }
     945              : }
     946              : 
     947              : impl Deref for WalResidentTimeline {
     948              :     type Target = Arc<Timeline>;
     949              : 
     950            0 :     fn deref(&self) -> &Self::Target {
     951            0 :         &self.tli
     952            0 :     }
     953              : }
     954              : 
     955              : impl WalResidentTimeline {
     956              :     /// Returns true if walsender should stop sending WAL to pageserver. We
     957              :     /// terminate it if remote_consistent_lsn reached commit_lsn and there is no
     958              :     /// computes. While there might be nothing to stream already, we learn about
     959              :     /// remote_consistent_lsn update through replication feedback, and we want
     960              :     /// to stop pushing to the broker if pageserver is fully caughtup.
     961            0 :     pub async fn should_walsender_stop(&self, reported_remote_consistent_lsn: Lsn) -> bool {
     962            0 :         if self.is_cancelled() {
     963            0 :             return true;
     964            0 :         }
     965            0 :         let shared_state = self.read_shared_state().await;
     966            0 :         if self.walreceivers.get_num() == 0 {
     967            0 :             return shared_state.sk.state().inmem.commit_lsn == Lsn(0) || // no data at all yet
     968            0 :             reported_remote_consistent_lsn >= shared_state.sk.state().inmem.commit_lsn;
     969            0 :         }
     970            0 :         false
     971            0 :     }
     972              : 
     973              :     /// Ensure that current term is t, erroring otherwise, and lock the state.
     974            0 :     pub async fn acquire_term(&self, t: Term) -> Result<ReadGuardSharedState> {
     975            0 :         let ss = self.read_shared_state().await;
     976            0 :         if ss.sk.state().acceptor_state.term != t {
     977            0 :             bail!(
     978            0 :                 "failed to acquire term {}, current term {}",
     979            0 :                 t,
     980            0 :                 ss.sk.state().acceptor_state.term
     981            0 :             );
     982            0 :         }
     983            0 :         Ok(ss)
     984            0 :     }
     985              : 
     986              :     /// Pass arrived message to the safekeeper.
     987            0 :     pub async fn process_msg(
     988            0 :         &self,
     989            0 :         msg: &ProposerAcceptorMessage,
     990            0 :     ) -> Result<Option<AcceptorProposerMessage>> {
     991            0 :         if self.is_cancelled() {
     992            0 :             bail!(TimelineError::Cancelled(self.ttid));
     993            0 :         }
     994              : 
     995              :         let mut rmsg: Option<AcceptorProposerMessage>;
     996              :         {
     997            0 :             let mut shared_state = self.write_shared_state().await;
     998            0 :             rmsg = shared_state.sk.safekeeper().process_msg(msg).await?;
     999              : 
    1000              :             // if this is AppendResponse, fill in proper hot standby feedback.
    1001            0 :             if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg {
    1002            0 :                 resp.hs_feedback = self.walsenders.get_hotstandby().hs_feedback;
    1003            0 :             }
    1004              :         }
    1005            0 :         Ok(rmsg)
    1006            0 :     }
    1007              : 
    1008            0 :     pub async fn get_walreader(&self, start_lsn: Lsn) -> Result<WalReader> {
    1009            0 :         let (_, persisted_state) = self.get_state().await;
    1010            0 :         let enable_remote_read = GlobalTimelines::get_global_config().is_wal_backup_enabled();
    1011            0 : 
    1012            0 :         WalReader::new(
    1013            0 :             &self.ttid,
    1014            0 :             self.timeline_dir.clone(),
    1015            0 :             &persisted_state,
    1016            0 :             start_lsn,
    1017            0 :             enable_remote_read,
    1018            0 :         )
    1019            0 :     }
    1020              : 
    1021            0 :     pub fn get_timeline_dir(&self) -> Utf8PathBuf {
    1022            0 :         self.timeline_dir.clone()
    1023            0 :     }
    1024              : 
    1025              :     /// Update in memory remote consistent lsn.
    1026            0 :     pub async fn update_remote_consistent_lsn(&self, candidate: Lsn) {
    1027            0 :         let mut shared_state = self.write_shared_state().await;
    1028            0 :         shared_state.sk.state_mut().inmem.remote_consistent_lsn = max(
    1029            0 :             shared_state.sk.state().inmem.remote_consistent_lsn,
    1030            0 :             candidate,
    1031            0 :         );
    1032            0 :     }
    1033              : }
    1034              : 
    1035              : /// This struct contains methods that are used by timeline manager task.
    1036              : pub(crate) struct ManagerTimeline {
    1037              :     pub(crate) tli: Arc<Timeline>,
    1038              : }
    1039              : 
    1040              : impl Deref for ManagerTimeline {
    1041              :     type Target = Arc<Timeline>;
    1042              : 
    1043            0 :     fn deref(&self) -> &Self::Target {
    1044            0 :         &self.tli
    1045            0 :     }
    1046              : }
    1047              : 
    1048              : impl ManagerTimeline {
    1049            0 :     pub(crate) fn timeline_dir(&self) -> &Utf8PathBuf {
    1050            0 :         &self.tli.timeline_dir
    1051            0 :     }
    1052              : 
    1053              :     /// Manager requests this state on startup.
    1054            0 :     pub(crate) async fn bootstrap_mgr(&self) -> (bool, Option<PartialRemoteSegment>) {
    1055            0 :         let shared_state = self.read_shared_state().await;
    1056            0 :         let is_offloaded = matches!(
    1057            0 :             shared_state.sk.state().eviction_state,
    1058              :             EvictionState::Offloaded(_)
    1059              :         );
    1060            0 :         let partial_backup_uploaded = shared_state.sk.state().partial_backup.uploaded_segment();
    1061            0 : 
    1062            0 :         (is_offloaded, partial_backup_uploaded)
    1063            0 :     }
    1064              : 
    1065              :     /// Try to switch state Present->Offloaded.
    1066            0 :     pub(crate) async fn switch_to_offloaded(
    1067            0 :         &self,
    1068            0 :         partial: &PartialRemoteSegment,
    1069            0 :     ) -> anyhow::Result<()> {
    1070            0 :         let mut shared = self.write_shared_state().await;
    1071              : 
    1072              :         // updating control file
    1073            0 :         let mut pstate = shared.sk.state_mut().start_change();
    1074              : 
    1075            0 :         if !matches!(pstate.eviction_state, EvictionState::Present) {
    1076            0 :             bail!(
    1077            0 :                 "cannot switch to offloaded state, current state is {:?}",
    1078            0 :                 pstate.eviction_state
    1079            0 :             );
    1080            0 :         }
    1081            0 : 
    1082            0 :         if partial.flush_lsn != shared.sk.flush_lsn() {
    1083            0 :             bail!(
    1084            0 :                 "flush_lsn mismatch in partial backup, expected {}, got {}",
    1085            0 :                 shared.sk.flush_lsn(),
    1086            0 :                 partial.flush_lsn
    1087            0 :             );
    1088            0 :         }
    1089            0 : 
    1090            0 :         if partial.commit_lsn != pstate.commit_lsn {
    1091            0 :             bail!(
    1092            0 :                 "commit_lsn mismatch in partial backup, expected {}, got {}",
    1093            0 :                 pstate.commit_lsn,
    1094            0 :                 partial.commit_lsn
    1095            0 :             );
    1096            0 :         }
    1097            0 : 
    1098            0 :         if partial.term != shared.sk.last_log_term() {
    1099            0 :             bail!(
    1100            0 :                 "term mismatch in partial backup, expected {}, got {}",
    1101            0 :                 shared.sk.last_log_term(),
    1102            0 :                 partial.term
    1103            0 :             );
    1104            0 :         }
    1105            0 : 
    1106            0 :         pstate.eviction_state = EvictionState::Offloaded(shared.sk.flush_lsn());
    1107            0 :         shared.sk.state_mut().finish_change(&pstate).await?;
    1108              :         // control file is now switched to Offloaded state
    1109              : 
    1110              :         // now we can switch shared.sk to Offloaded, shouldn't fail
    1111            0 :         let prev_sk = std::mem::replace(&mut shared.sk, StateSK::Empty);
    1112            0 :         let cfile_state = prev_sk.take_state();
    1113            0 :         shared.sk = StateSK::Offloaded(Box::new(cfile_state));
    1114            0 : 
    1115            0 :         Ok(())
    1116            0 :     }
    1117              : 
    1118              :     /// Try to switch state Offloaded->Present.
    1119            0 :     pub(crate) async fn switch_to_present(&self) -> anyhow::Result<()> {
    1120            0 :         let conf = GlobalTimelines::get_global_config();
    1121            0 :         let mut shared = self.write_shared_state().await;
    1122              : 
    1123              :         // trying to restore WAL storage
    1124            0 :         let wal_store = wal_storage::PhysicalStorage::new(
    1125            0 :             &self.ttid,
    1126            0 :             self.timeline_dir.clone(),
    1127            0 :             &conf,
    1128            0 :             shared.sk.state(),
    1129            0 :         )?;
    1130              : 
    1131              :         // updating control file
    1132            0 :         let mut pstate = shared.sk.state_mut().start_change();
    1133              : 
    1134            0 :         if !matches!(pstate.eviction_state, EvictionState::Offloaded(_)) {
    1135            0 :             bail!(
    1136            0 :                 "cannot switch to present state, current state is {:?}",
    1137            0 :                 pstate.eviction_state
    1138            0 :             );
    1139            0 :         }
    1140            0 : 
    1141            0 :         if wal_store.flush_lsn() != shared.sk.flush_lsn() {
    1142            0 :             bail!(
    1143            0 :                 "flush_lsn mismatch in restored WAL, expected {}, got {}",
    1144            0 :                 shared.sk.flush_lsn(),
    1145            0 :                 wal_store.flush_lsn()
    1146            0 :             );
    1147            0 :         }
    1148            0 : 
    1149            0 :         pstate.eviction_state = EvictionState::Present;
    1150            0 :         shared.sk.state_mut().finish_change(&pstate).await?;
    1151              : 
    1152              :         // now we can switch shared.sk to Present, shouldn't fail
    1153            0 :         let prev_sk = std::mem::replace(&mut shared.sk, StateSK::Empty);
    1154            0 :         let cfile_state = prev_sk.take_state();
    1155            0 :         shared.sk = StateSK::Loaded(SafeKeeper::new(cfile_state, wal_store, conf.my_id)?);
    1156              : 
    1157            0 :         Ok(())
    1158            0 :     }
    1159              : 
    1160              :     /// Update current manager state, useful for debugging manager deadlocks.
    1161            0 :     pub(crate) fn set_status(&self, status: timeline_manager::Status) {
    1162            0 :         self.mgr_status.store(status, Ordering::Relaxed);
    1163            0 :     }
    1164              : }
    1165              : 
    1166              : /// Deletes directory and it's contents. Returns false if directory does not exist.
    1167            0 : async fn delete_dir(path: &Utf8PathBuf) -> Result<bool> {
    1168            0 :     match fs::remove_dir_all(path).await {
    1169            0 :         Ok(_) => Ok(true),
    1170            0 :         Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
    1171            0 :         Err(e) => Err(e.into()),
    1172              :     }
    1173            0 : }
    1174              : 
    1175              : /// Get a path to the tenant directory. If you just need to get a timeline directory,
    1176              : /// use WalResidentTimeline::get_timeline_dir instead.
    1177            7 : pub(crate) fn get_tenant_dir(conf: &SafeKeeperConf, tenant_id: &TenantId) -> Utf8PathBuf {
    1178            7 :     conf.workdir.join(tenant_id.to_string())
    1179            7 : }
    1180              : 
    1181              : /// Get a path to the timeline directory. If you need to read WAL files from disk,
    1182              : /// use WalResidentTimeline::get_timeline_dir instead. This function does not check
    1183              : /// timeline eviction status and WAL files might not be present on disk.
    1184            7 : pub(crate) fn get_timeline_dir(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Utf8PathBuf {
    1185            7 :     get_tenant_dir(conf, &ttid.tenant_id).join(ttid.timeline_id.to_string())
    1186            7 : }
        

Generated by: LCOV version 2.1-beta