LCOV - code coverage report
Current view: top level - safekeeper/src - timeline.rs (source / functions) Coverage Total Hit
Test: 050dd70dd490b28fffe527eae9fb8a1222b5c59c.info Lines: 1.3 % 477 6
Test Date: 2024-06-25 21:28:46 Functions: 2.3 % 87 2

            Line data    Source code
       1              : //! This module implements Timeline lifecycle management and has all necessary code
       2              : //! to glue together SafeKeeper and all other background services.
       3              : 
       4              : use anyhow::{anyhow, bail, Result};
       5              : use camino::Utf8PathBuf;
       6              : use serde::{Deserialize, Serialize};
       7              : use tokio::fs::{self};
       8              : use tokio_util::sync::CancellationToken;
       9              : use utils::id::TenantId;
      10              : 
      11              : use std::cmp::max;
      12              : use std::ops::{Deref, DerefMut};
      13              : use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
      14              : use std::sync::Arc;
      15              : use std::time::Duration;
      16              : use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
      17              : use tokio::{sync::watch, time::Instant};
      18              : use tracing::*;
      19              : use utils::http::error::ApiError;
      20              : use utils::{
      21              :     id::{NodeId, TenantTimelineId},
      22              :     lsn::Lsn,
      23              : };
      24              : 
      25              : use storage_broker::proto::SafekeeperTimelineInfo;
      26              : use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
      27              : 
      28              : use crate::receive_wal::WalReceivers;
      29              : use crate::safekeeper::{
      30              :     AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, ServerInfo, Term, TermLsn,
      31              :     INVALID_TERM,
      32              : };
      33              : use crate::send_wal::WalSenders;
      34              : use crate::state::{TimelineMemState, TimelinePersistentState};
      35              : use crate::timelines_set::TimelinesSet;
      36              : use crate::wal_backup::{self};
      37              : use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};
      38              : 
      39              : use crate::metrics::FullTimelineInfo;
      40              : use crate::wal_storage::{Storage as wal_storage_iface, WalReader};
      41              : use crate::{debug_dump, timeline_manager, wal_storage};
      42              : use crate::{GlobalTimelines, SafeKeeperConf};
      43              : 
      44              : /// Things safekeeper should know about timeline state on peers.
      45            0 : #[derive(Debug, Clone, Serialize, Deserialize)]
      46              : pub struct PeerInfo {
      47              :     pub sk_id: NodeId,
      48              :     pub term: Term,
      49              :     /// Term of the last entry.
      50              :     pub last_log_term: Term,
      51              :     /// LSN of the last record.
      52              :     pub flush_lsn: Lsn,
      53              :     pub commit_lsn: Lsn,
      54              :     /// Since which LSN safekeeper has WAL.
      55              :     pub local_start_lsn: Lsn,
      56              :     /// When info was received. Serde annotations are not very useful but make
      57              :     /// the code compile -- we don't rely on this field externally.
      58              :     #[serde(skip)]
      59              :     #[serde(default = "Instant::now")]
      60              :     ts: Instant,
      61              :     pub pg_connstr: String,
      62              :     pub http_connstr: String,
      63              : }
      64              : 
      65              : impl PeerInfo {
      66            0 :     fn from_sk_info(sk_info: &SafekeeperTimelineInfo, ts: Instant) -> PeerInfo {
      67            0 :         PeerInfo {
      68            0 :             sk_id: NodeId(sk_info.safekeeper_id),
      69            0 :             term: sk_info.term,
      70            0 :             last_log_term: sk_info.last_log_term,
      71            0 :             flush_lsn: Lsn(sk_info.flush_lsn),
      72            0 :             commit_lsn: Lsn(sk_info.commit_lsn),
      73            0 :             local_start_lsn: Lsn(sk_info.local_start_lsn),
      74            0 :             pg_connstr: sk_info.safekeeper_connstr.clone(),
      75            0 :             http_connstr: sk_info.http_connstr.clone(),
      76            0 :             ts,
      77            0 :         }
      78            0 :     }
      79              : }
      80              : 
      81              : // vector-based node id -> peer state map with very limited functionality we
      82              : // need.
      83              : #[derive(Debug, Clone, Default)]
      84              : pub struct PeersInfo(pub Vec<PeerInfo>);
      85              : 
      86              : impl PeersInfo {
      87            0 :     fn get(&mut self, id: NodeId) -> Option<&mut PeerInfo> {
      88            0 :         self.0.iter_mut().find(|p| p.sk_id == id)
      89            0 :     }
      90              : 
      91            0 :     fn upsert(&mut self, p: &PeerInfo) {
      92            0 :         match self.get(p.sk_id) {
      93            0 :             Some(rp) => *rp = p.clone(),
      94            0 :             None => self.0.push(p.clone()),
      95              :         }
      96            0 :     }
      97              : }
      98              : 
      99              : pub type ReadGuardSharedState<'a> = RwLockReadGuard<'a, SharedState>;
     100              : 
     101              : /// WriteGuardSharedState is a wrapper around `RwLockWriteGuard<SharedState>` that
     102              : /// automatically updates `watch::Sender` channels with state on drop.
     103              : pub struct WriteGuardSharedState<'a> {
     104              :     tli: Arc<Timeline>,
     105              :     guard: RwLockWriteGuard<'a, SharedState>,
     106              :     skip_update: bool,
     107              : }
     108              : 
     109              : impl<'a> WriteGuardSharedState<'a> {
     110            0 :     fn new(tli: Arc<Timeline>, guard: RwLockWriteGuard<'a, SharedState>) -> Self {
     111            0 :         WriteGuardSharedState {
     112            0 :             tli,
     113            0 :             guard,
     114            0 :             skip_update: false,
     115            0 :         }
     116            0 :     }
     117              : }
     118              : 
     119              : impl<'a> Deref for WriteGuardSharedState<'a> {
     120              :     type Target = SharedState;
     121              : 
     122            0 :     fn deref(&self) -> &Self::Target {
     123            0 :         &self.guard
     124            0 :     }
     125              : }
     126              : 
     127              : impl<'a> DerefMut for WriteGuardSharedState<'a> {
     128            0 :     fn deref_mut(&mut self) -> &mut Self::Target {
     129            0 :         &mut self.guard
     130            0 :     }
     131              : }
     132              : 
     133              : impl<'a> Drop for WriteGuardSharedState<'a> {
     134            0 :     fn drop(&mut self) {
     135            0 :         let term_flush_lsn = TermLsn::from((self.guard.sk.get_term(), self.guard.sk.flush_lsn()));
     136            0 :         let commit_lsn = self.guard.sk.state.inmem.commit_lsn;
     137            0 : 
     138            0 :         let _ = self.tli.term_flush_lsn_watch_tx.send_if_modified(|old| {
     139            0 :             if *old != term_flush_lsn {
     140            0 :                 *old = term_flush_lsn;
     141            0 :                 true
     142              :             } else {
     143            0 :                 false
     144              :             }
     145            0 :         });
     146            0 : 
     147            0 :         let _ = self.tli.commit_lsn_watch_tx.send_if_modified(|old| {
     148            0 :             if *old != commit_lsn {
     149            0 :                 *old = commit_lsn;
     150            0 :                 true
     151              :             } else {
     152            0 :                 false
     153              :             }
     154            0 :         });
     155            0 : 
     156            0 :         if !self.skip_update {
     157            0 :             // send notification about shared state update
     158            0 :             self.tli.shared_state_version_tx.send_modify(|old| {
     159            0 :                 *old += 1;
     160            0 :             });
     161            0 :         }
     162            0 :     }
     163              : }
     164              : 
     165              : /// Shared state associated with database instance
     166              : pub struct SharedState {
     167              :     /// Safekeeper object
     168              :     pub(crate) sk: SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>,
     169              :     /// In memory list containing state of peers sent in latest messages from them.
     170              :     pub(crate) peers_info: PeersInfo,
     171              :     // True value hinders old WAL removal; this is used by snapshotting. We
     172              :     // could make it a counter, but there is no need to.
     173              :     pub(crate) wal_removal_on_hold: bool,
     174              : }
     175              : 
     176              : impl SharedState {
     177              :     /// Initialize fresh timeline state without persisting anything to disk.
     178            0 :     fn create_new(
     179            0 :         conf: &SafeKeeperConf,
     180            0 :         ttid: &TenantTimelineId,
     181            0 :         state: TimelinePersistentState,
     182            0 :     ) -> Result<Self> {
     183            0 :         if state.server.wal_seg_size == 0 {
     184            0 :             bail!(TimelineError::UninitializedWalSegSize(*ttid));
     185            0 :         }
     186            0 : 
     187            0 :         if state.server.pg_version == UNKNOWN_SERVER_VERSION {
     188            0 :             bail!(TimelineError::UninitialinzedPgVersion(*ttid));
     189            0 :         }
     190            0 : 
     191            0 :         if state.commit_lsn < state.local_start_lsn {
     192            0 :             bail!(
     193            0 :                 "commit_lsn {} is higher than local_start_lsn {}",
     194            0 :                 state.commit_lsn,
     195            0 :                 state.local_start_lsn
     196            0 :             );
     197            0 :         }
     198            0 : 
     199            0 :         // We don't want to write anything to disk, because we may have existing timeline there.
     200            0 :         // These functions should not change anything on disk.
     201            0 :         let timeline_dir = get_timeline_dir(conf, ttid);
     202            0 :         let control_store =
     203            0 :             control_file::FileStorage::create_new(timeline_dir.clone(), conf, state)?;
     204            0 :         let wal_store =
     205            0 :             wal_storage::PhysicalStorage::new(ttid, timeline_dir, conf, &control_store)?;
     206            0 :         let sk = SafeKeeper::new(control_store, wal_store, conf.my_id)?;
     207              : 
     208            0 :         Ok(Self {
     209            0 :             sk,
     210            0 :             peers_info: PeersInfo(vec![]),
     211            0 :             wal_removal_on_hold: false,
     212            0 :         })
     213            0 :     }
     214              : 
     215              :     /// Restore SharedState from control file. If file doesn't exist, bails out.
     216            0 :     fn restore(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Result<Self> {
     217            0 :         let timeline_dir = get_timeline_dir(conf, ttid);
     218            0 :         let control_store = control_file::FileStorage::restore_new(ttid, conf)?;
     219            0 :         if control_store.server.wal_seg_size == 0 {
     220            0 :             bail!(TimelineError::UninitializedWalSegSize(*ttid));
     221            0 :         }
     222              : 
     223            0 :         let wal_store =
     224            0 :             wal_storage::PhysicalStorage::new(ttid, timeline_dir, conf, &control_store)?;
     225              : 
     226              :         Ok(Self {
     227            0 :             sk: SafeKeeper::new(control_store, wal_store, conf.my_id)?,
     228            0 :             peers_info: PeersInfo(vec![]),
     229              :             wal_removal_on_hold: false,
     230              :         })
     231            0 :     }
     232              : 
     233            0 :     pub(crate) fn get_wal_seg_size(&self) -> usize {
     234            0 :         self.sk.state.server.wal_seg_size as usize
     235            0 :     }
     236              : 
     237            0 :     fn get_safekeeper_info(
     238            0 :         &self,
     239            0 :         ttid: &TenantTimelineId,
     240            0 :         conf: &SafeKeeperConf,
     241            0 :         standby_apply_lsn: Lsn,
     242            0 :     ) -> SafekeeperTimelineInfo {
     243            0 :         SafekeeperTimelineInfo {
     244            0 :             safekeeper_id: conf.my_id.0,
     245            0 :             tenant_timeline_id: Some(ProtoTenantTimelineId {
     246            0 :                 tenant_id: ttid.tenant_id.as_ref().to_owned(),
     247            0 :                 timeline_id: ttid.timeline_id.as_ref().to_owned(),
     248            0 :             }),
     249            0 :             term: self.sk.state.acceptor_state.term,
     250            0 :             last_log_term: self.sk.get_last_log_term(),
     251            0 :             flush_lsn: self.sk.flush_lsn().0,
     252            0 :             // note: this value is not flushed to control file yet and can be lost
     253            0 :             commit_lsn: self.sk.state.inmem.commit_lsn.0,
     254            0 :             remote_consistent_lsn: self.sk.state.inmem.remote_consistent_lsn.0,
     255            0 :             peer_horizon_lsn: self.sk.state.inmem.peer_horizon_lsn.0,
     256            0 :             safekeeper_connstr: conf
     257            0 :                 .advertise_pg_addr
     258            0 :                 .to_owned()
     259            0 :                 .unwrap_or(conf.listen_pg_addr.clone()),
     260            0 :             http_connstr: conf.listen_http_addr.to_owned(),
     261            0 :             backup_lsn: self.sk.state.inmem.backup_lsn.0,
     262            0 :             local_start_lsn: self.sk.state.local_start_lsn.0,
     263            0 :             availability_zone: conf.availability_zone.clone(),
     264            0 :             standby_horizon: standby_apply_lsn.0,
     265            0 :         }
     266            0 :     }
     267              : 
     268              :     /// Get our latest view of alive peers status on the timeline.
     269              :     /// We pass our own info through the broker as well, so when we don't have a connection
     270              :     /// to the broker, the returned vec is empty.
     271            0 :     pub(crate) fn get_peers(&self, heartbeat_timeout: Duration) -> Vec<PeerInfo> {
     272            0 :         let now = Instant::now();
     273            0 :         self.peers_info
     274            0 :             .0
     275            0 :             .iter()
     276            0 :             // Regard peer as absent if we haven't heard from it within heartbeat_timeout.
     277            0 :             .filter(|p| now.duration_since(p.ts) <= heartbeat_timeout)
     278            0 :             .cloned()
     279            0 :             .collect()
     280            0 :     }
     281              : }
     282              : 
     283            0 : #[derive(Debug, thiserror::Error)]
     284              : pub enum TimelineError {
     285              :     #[error("Timeline {0} was cancelled and cannot be used anymore")]
     286              :     Cancelled(TenantTimelineId),
     287              :     #[error("Timeline {0} was not found in global map")]
     288              :     NotFound(TenantTimelineId),
     289              :     #[error("Timeline {0} exists on disk, but wasn't loaded on startup")]
     290              :     Invalid(TenantTimelineId),
     291              :     #[error("Timeline {0} is already exists")]
     292              :     AlreadyExists(TenantTimelineId),
     293              :     #[error("Timeline {0} is not initialized, wal_seg_size is zero")]
     294              :     UninitializedWalSegSize(TenantTimelineId),
     295              :     #[error("Timeline {0} is not initialized, pg_version is unknown")]
     296              :     UninitialinzedPgVersion(TenantTimelineId),
     297              : }
     298              : 
     299              : // Convert to HTTP API error.
     300              : impl From<TimelineError> for ApiError {
     301            0 :     fn from(te: TimelineError) -> ApiError {
     302            0 :         match te {
     303            0 :             TimelineError::NotFound(ttid) => {
     304            0 :                 ApiError::NotFound(anyhow!("timeline {} not found", ttid).into())
     305              :             }
     306            0 :             _ => ApiError::InternalServerError(anyhow!("{}", te)),
     307              :         }
     308            0 :     }
     309              : }
     310              : 
     311              : /// Timeline struct manages lifecycle (creation, deletion, restore) of a safekeeper timeline.
     312              : /// It also holds SharedState and provides mutually exclusive access to it.
     313              : pub struct Timeline {
     314              :     pub ttid: TenantTimelineId,
     315              : 
     316              :     /// Used to broadcast commit_lsn updates to all background jobs.
     317              :     commit_lsn_watch_tx: watch::Sender<Lsn>,
     318              :     commit_lsn_watch_rx: watch::Receiver<Lsn>,
     319              : 
     320              :     /// Broadcasts (current term, flush_lsn) updates, walsender is interested in
     321              :     /// them when sending in recovery mode (to walproposer or peers). Note: this
     322              :     /// is just a notification, WAL reading should always be done with lock held as
     323              :     /// term can change otherwise.
     324              :     term_flush_lsn_watch_tx: watch::Sender<TermLsn>,
     325              :     term_flush_lsn_watch_rx: watch::Receiver<TermLsn>,
     326              : 
     327              :     /// Broadcasts shared state updates.
     328              :     shared_state_version_tx: watch::Sender<usize>,
     329              :     shared_state_version_rx: watch::Receiver<usize>,
     330              : 
     331              :     /// Safekeeper and other state, that should remain consistent and
     332              :     /// synchronized with the disk. This is a tokio RwLock as we write WAL to disk
     333              :     /// while holding it, ensuring that consensus checks are in order.
     334              :     mutex: RwLock<SharedState>,
     335              :     walsenders: Arc<WalSenders>,
     336              :     walreceivers: Arc<WalReceivers>,
     337              :     timeline_dir: Utf8PathBuf,
     338              : 
     339              :     /// Delete/cancel will trigger this, background tasks should drop out as soon as it fires
     340              :     pub(crate) cancel: CancellationToken,
     341              : 
     342              :     // timeline_manager controlled state
     343              :     pub(crate) broker_active: AtomicBool,
     344              :     pub(crate) wal_backup_active: AtomicBool,
     345              :     pub(crate) last_removed_segno: AtomicU64,
     346              : }
     347              : 
     348              : impl Timeline {
     349              :     /// Load existing timeline from disk.
     350            0 :     pub fn load_timeline(conf: &SafeKeeperConf, ttid: TenantTimelineId) -> Result<Timeline> {
     351            0 :         let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
     352              : 
     353            0 :         let shared_state = SharedState::restore(conf, &ttid)?;
     354            0 :         let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
     355            0 :             watch::channel(shared_state.sk.state.commit_lsn);
     356            0 :         let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) = watch::channel(TermLsn::from((
     357            0 :             shared_state.sk.get_term(),
     358            0 :             shared_state.sk.flush_lsn(),
     359            0 :         )));
     360            0 :         let (shared_state_version_tx, shared_state_version_rx) = watch::channel(0);
     361            0 : 
     362            0 :         let walreceivers = WalReceivers::new();
     363            0 :         Ok(Timeline {
     364            0 :             ttid,
     365            0 :             commit_lsn_watch_tx,
     366            0 :             commit_lsn_watch_rx,
     367            0 :             term_flush_lsn_watch_tx,
     368            0 :             term_flush_lsn_watch_rx,
     369            0 :             shared_state_version_tx,
     370            0 :             shared_state_version_rx,
     371            0 :             mutex: RwLock::new(shared_state),
     372            0 :             walsenders: WalSenders::new(walreceivers.clone()),
     373            0 :             walreceivers,
     374            0 :             cancel: CancellationToken::default(),
     375            0 :             timeline_dir: get_timeline_dir(conf, &ttid),
     376            0 :             broker_active: AtomicBool::new(false),
     377            0 :             wal_backup_active: AtomicBool::new(false),
     378            0 :             last_removed_segno: AtomicU64::new(0),
     379            0 :         })
     380            0 :     }
     381              : 
     382              :     /// Create a new timeline, which is not yet persisted to disk.
     383            0 :     pub fn create_empty(
     384            0 :         conf: &SafeKeeperConf,
     385            0 :         ttid: TenantTimelineId,
     386            0 :         server_info: ServerInfo,
     387            0 :         commit_lsn: Lsn,
     388            0 :         local_start_lsn: Lsn,
     389            0 :     ) -> Result<Timeline> {
     390            0 :         let (commit_lsn_watch_tx, commit_lsn_watch_rx) = watch::channel(Lsn::INVALID);
     391            0 :         let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) =
     392            0 :             watch::channel(TermLsn::from((INVALID_TERM, Lsn::INVALID)));
     393            0 :         let (shared_state_version_tx, shared_state_version_rx) = watch::channel(0);
     394            0 : 
     395            0 :         let state =
     396            0 :             TimelinePersistentState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn);
     397            0 : 
     398            0 :         let walreceivers = WalReceivers::new();
     399            0 :         Ok(Timeline {
     400            0 :             ttid,
     401            0 :             commit_lsn_watch_tx,
     402            0 :             commit_lsn_watch_rx,
     403            0 :             term_flush_lsn_watch_tx,
     404            0 :             term_flush_lsn_watch_rx,
     405            0 :             shared_state_version_tx,
     406            0 :             shared_state_version_rx,
     407            0 :             mutex: RwLock::new(SharedState::create_new(conf, &ttid, state)?),
     408            0 :             walsenders: WalSenders::new(walreceivers.clone()),
     409            0 :             walreceivers,
     410            0 :             cancel: CancellationToken::default(),
     411            0 :             timeline_dir: get_timeline_dir(conf, &ttid),
     412            0 :             broker_active: AtomicBool::new(false),
     413            0 :             wal_backup_active: AtomicBool::new(false),
     414            0 :             last_removed_segno: AtomicU64::new(0),
     415              :         })
     416            0 :     }
     417              : 
     418              :     /// Initialize fresh timeline on disk and start background tasks. If init
     419              :     /// fails, timeline is cancelled and cannot be used anymore.
     420              :     ///
     421              :     /// Init is transactional, so if it fails, created files will be deleted,
     422              :     /// and state on disk should remain unchanged.
     423            0 :     pub async fn init_new(
     424            0 :         self: &Arc<Timeline>,
     425            0 :         shared_state: &mut WriteGuardSharedState<'_>,
     426            0 :         conf: &SafeKeeperConf,
     427            0 :         broker_active_set: Arc<TimelinesSet>,
     428            0 :     ) -> Result<()> {
     429            0 :         match fs::metadata(&self.timeline_dir).await {
     430              :             Ok(_) => {
     431              :                 // Timeline directory exists on disk, we should leave state unchanged
     432              :                 // and return error.
     433            0 :                 bail!(TimelineError::Invalid(self.ttid));
     434              :             }
     435            0 :             Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
     436            0 :             Err(e) => {
     437            0 :                 return Err(e.into());
     438              :             }
     439              :         }
     440              : 
     441              :         // Create timeline directory.
     442            0 :         fs::create_dir_all(&self.timeline_dir).await?;
     443              : 
     444              :         // Write timeline to disk and start background tasks.
     445            0 :         if let Err(e) = shared_state.sk.state.flush().await {
     446              :             // Bootstrap failed, cancel timeline and remove timeline directory.
     447            0 :             self.cancel(shared_state);
     448              : 
     449            0 :             if let Err(fs_err) = fs::remove_dir_all(&self.timeline_dir).await {
     450            0 :                 warn!(
     451            0 :                     "failed to remove timeline {} directory after bootstrap failure: {}",
     452            0 :                     self.ttid, fs_err
     453              :                 );
     454            0 :             }
     455              : 
     456            0 :             return Err(e);
     457            0 :         }
     458            0 :         self.bootstrap(conf, broker_active_set);
     459            0 :         Ok(())
     460            0 :     }
     461              : 
     462              :     /// Bootstrap new or existing timeline starting background tasks.
     463            0 :     pub fn bootstrap(
     464            0 :         self: &Arc<Timeline>,
     465            0 :         conf: &SafeKeeperConf,
     466            0 :         broker_active_set: Arc<TimelinesSet>,
     467            0 :     ) {
     468            0 :         // Start manager task which will monitor timeline state and update
     469            0 :         // background tasks.
     470            0 :         tokio::spawn(timeline_manager::main_task(
     471            0 :             self.clone(),
     472            0 :             conf.clone(),
     473            0 :             broker_active_set,
     474            0 :         ));
     475            0 :     }
     476              : 
     477              :     /// Delete timeline from disk completely, by removing timeline directory.
     478              :     /// Background timeline activities will stop eventually.
     479              :     ///
     480              :     /// Also deletes WAL in s3. Might fail if e.g. s3 is unavailable, but
     481              :     /// deletion API endpoint is retriable.
     482            0 :     pub async fn delete(
     483            0 :         &self,
     484            0 :         shared_state: &mut WriteGuardSharedState<'_>,
     485            0 :         only_local: bool,
     486            0 :     ) -> Result<bool> {
     487            0 :         self.cancel(shared_state);
     488            0 : 
     489            0 :         // TODO: It's better to wait for s3 offloader termination before
     490            0 :         // removing data from s3. Though since s3 doesn't have transactions it
     491            0 :         // still wouldn't guarantee absence of data after removal.
     492            0 :         let conf = GlobalTimelines::get_global_config();
     493            0 :         if !only_local && conf.is_wal_backup_enabled() {
     494              :             // Note: we concurrently delete remote storage data from multiple
     495              :             // safekeepers. That's ok, s3 replies 200 if object doesn't exist and we
     496              :             // do some retries anyway.
     497            0 :             wal_backup::delete_timeline(&self.ttid).await?;
     498            0 :         }
     499            0 :         let dir_existed = delete_dir(&self.timeline_dir).await?;
     500            0 :         Ok(dir_existed)
     501            0 :     }
     502              : 
     503              :     /// Cancel timeline to prevent further usage. Background tasks will stop
     504              :     /// eventually after receiving cancellation signal.
     505            0 :     fn cancel(&self, shared_state: &mut WriteGuardSharedState<'_>) {
     506            0 :         info!("timeline {} is cancelled", self.ttid);
     507            0 :         self.cancel.cancel();
     508            0 :         // Close associated FDs. Nobody will be able to touch timeline data once
     509            0 :         // it is cancelled, so WAL storage won't be opened again.
     510            0 :         shared_state.sk.wal_store.close();
     511            0 :     }
     512              : 
     513              :     /// Returns true if the timeline is cancelled.
     514            0 :     pub fn is_cancelled(&self) -> bool {
     515            0 :         self.cancel.is_cancelled()
     516            0 :     }
     517              : 
     518              :     /// Take a mutually exclusive write lock on timeline shared_state.
     519            0 :     pub async fn write_shared_state<'a>(self: &'a Arc<Self>) -> WriteGuardSharedState<'a> {
     520            0 :         WriteGuardSharedState::new(self.clone(), self.mutex.write().await)
     521            0 :     }
     522              : 
     523            0 :     pub async fn read_shared_state(&self) -> ReadGuardSharedState {
     524            0 :         self.mutex.read().await
     525            0 :     }
     526              : 
     527              :     /// Returns commit_lsn watch channel.
     528            0 :     pub fn get_commit_lsn_watch_rx(&self) -> watch::Receiver<Lsn> {
     529            0 :         self.commit_lsn_watch_rx.clone()
     530            0 :     }
     531              : 
     532              :     /// Returns term_flush_lsn watch channel.
     533            0 :     pub fn get_term_flush_lsn_watch_rx(&self) -> watch::Receiver<TermLsn> {
     534            0 :         self.term_flush_lsn_watch_rx.clone()
     535            0 :     }
     536              : 
     537              :     /// Returns watch channel for SharedState update version.
     538            0 :     pub fn get_state_version_rx(&self) -> watch::Receiver<usize> {
     539            0 :         self.shared_state_version_rx.clone()
     540            0 :     }
     541              : 
     542              :     /// Returns wal_seg_size.
     543            0 :     pub async fn get_wal_seg_size(&self) -> usize {
     544            0 :         self.read_shared_state().await.get_wal_seg_size()
     545            0 :     }
     546              : 
     547              :     /// Returns state of the timeline.
     548            0 :     pub async fn get_state(&self) -> (TimelineMemState, TimelinePersistentState) {
     549            0 :         let state = self.read_shared_state().await;
     550            0 :         (state.sk.state.inmem.clone(), state.sk.state.clone())
     551            0 :     }
     552              : 
     553              :     /// Returns latest backup_lsn.
     554            0 :     pub async fn get_wal_backup_lsn(&self) -> Lsn {
     555            0 :         self.read_shared_state().await.sk.state.inmem.backup_lsn
     556            0 :     }
     557              : 
     558              :     /// Sets backup_lsn to the given value.
     559            0 :     pub async fn set_wal_backup_lsn(self: &Arc<Self>, backup_lsn: Lsn) -> Result<()> {
     560            0 :         if self.is_cancelled() {
     561            0 :             bail!(TimelineError::Cancelled(self.ttid));
     562            0 :         }
     563              : 
     564            0 :         let mut state = self.write_shared_state().await;
     565            0 :         state.sk.state.inmem.backup_lsn = max(state.sk.state.inmem.backup_lsn, backup_lsn);
     566            0 :         // we should check whether to shut down offloader, but this will be done
     567            0 :         // soon by peer communication anyway.
     568            0 :         Ok(())
     569            0 :     }
     570              : 
     571              :     /// Get safekeeper info for broadcasting to broker and other peers.
     572            0 :     pub async fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SafekeeperTimelineInfo {
     573            0 :         let standby_apply_lsn = self.walsenders.get_hotstandby().reply.apply_lsn;
     574            0 :         let shared_state = self.read_shared_state().await;
     575            0 :         shared_state.get_safekeeper_info(&self.ttid, conf, standby_apply_lsn)
     576            0 :     }
     577              : 
     578              :     /// Update timeline state with peer safekeeper data.
     579            0 :     pub async fn record_safekeeper_info(
     580            0 :         self: &Arc<Self>,
     581            0 :         sk_info: SafekeeperTimelineInfo,
     582            0 :     ) -> Result<()> {
     583              :         {
     584            0 :             let mut shared_state = self.write_shared_state().await;
     585            0 :             shared_state.sk.record_safekeeper_info(&sk_info).await?;
     586            0 :             let peer_info = PeerInfo::from_sk_info(&sk_info, Instant::now());
     587            0 :             shared_state.peers_info.upsert(&peer_info);
     588            0 :         }
     589            0 :         Ok(())
     590            0 :     }
     591              : 
     592            0 :     pub async fn get_peers(&self, conf: &SafeKeeperConf) -> Vec<PeerInfo> {
     593            0 :         let shared_state = self.read_shared_state().await;
     594            0 :         shared_state.get_peers(conf.heartbeat_timeout)
     595            0 :     }
     596              : 
     597            0 :     pub fn get_walsenders(&self) -> &Arc<WalSenders> {
     598            0 :         &self.walsenders
     599            0 :     }
     600              : 
     601            0 :     pub fn get_walreceivers(&self) -> &Arc<WalReceivers> {
     602            0 :         &self.walreceivers
     603            0 :     }
     604              : 
     605              :     /// Returns flush_lsn.
     606            0 :     pub async fn get_flush_lsn(&self) -> Lsn {
     607            0 :         self.read_shared_state().await.sk.wal_store.flush_lsn()
     608            0 :     }
     609              : 
     610              :     /// Gather timeline data for metrics.
     611            0 :     pub async fn info_for_metrics(&self) -> Option<FullTimelineInfo> {
     612            0 :         if self.is_cancelled() {
     613            0 :             return None;
     614            0 :         }
     615            0 : 
     616            0 :         let (ps_feedback_count, last_ps_feedback) = self.walsenders.get_ps_feedback_stats();
     617            0 :         let state = self.read_shared_state().await;
     618            0 :         Some(FullTimelineInfo {
     619            0 :             ttid: self.ttid,
     620            0 :             ps_feedback_count,
     621            0 :             last_ps_feedback,
     622            0 :             wal_backup_active: self.wal_backup_active.load(Ordering::Relaxed),
     623            0 :             timeline_is_active: self.broker_active.load(Ordering::Relaxed),
     624            0 :             num_computes: self.walreceivers.get_num() as u32,
     625            0 :             last_removed_segno: self.last_removed_segno.load(Ordering::Relaxed),
     626            0 :             epoch_start_lsn: state.sk.term_start_lsn,
     627            0 :             mem_state: state.sk.state.inmem.clone(),
     628            0 :             persisted_state: state.sk.state.clone(),
     629            0 :             flush_lsn: state.sk.wal_store.flush_lsn(),
     630            0 :             wal_storage: state.sk.wal_store.get_metrics(),
     631            0 :         })
     632            0 :     }
     633              : 
     634              :     /// Returns in-memory timeline state to build a full debug dump.
     635            0 :     pub async fn memory_dump(&self) -> debug_dump::Memory {
     636            0 :         let state = self.read_shared_state().await;
     637              : 
     638            0 :         let (write_lsn, write_record_lsn, flush_lsn, file_open) =
     639            0 :             state.sk.wal_store.internal_state();
     640            0 : 
     641            0 :         debug_dump::Memory {
     642            0 :             is_cancelled: self.is_cancelled(),
     643            0 :             peers_info_len: state.peers_info.0.len(),
     644            0 :             walsenders: self.walsenders.get_all(),
     645            0 :             wal_backup_active: self.wal_backup_active.load(Ordering::Relaxed),
     646            0 :             active: self.broker_active.load(Ordering::Relaxed),
     647            0 :             num_computes: self.walreceivers.get_num() as u32,
     648            0 :             last_removed_segno: self.last_removed_segno.load(Ordering::Relaxed),
     649            0 :             epoch_start_lsn: state.sk.term_start_lsn,
     650            0 :             mem_state: state.sk.state.inmem.clone(),
     651            0 :             write_lsn,
     652            0 :             write_record_lsn,
     653            0 :             flush_lsn,
     654            0 :             file_open,
     655            0 :         }
     656            0 :     }
     657              : 
     658              :     /// Apply a function to the control file state and persist it.
     659            0 :     pub async fn map_control_file<T>(
     660            0 :         self: &Arc<Self>,
     661            0 :         f: impl FnOnce(&mut TimelinePersistentState) -> Result<T>,
     662            0 :     ) -> Result<T> {
     663            0 :         let mut state = self.write_shared_state().await;
     664            0 :         let mut persistent_state = state.sk.state.start_change();
     665              :         // If f returns error, we abort the change and don't persist anything.
     666            0 :         let res = f(&mut persistent_state)?;
     667              :         // If persisting fails, we abort the change and return error.
     668            0 :         state.sk.state.finish_change(&persistent_state).await?;
     669            0 :         Ok(res)
     670            0 :     }
     671              : 
     672              :     /// Get the timeline guard for reading/writing WAL files.
     673              :     /// TODO: if WAL files are not present on disk (evicted), they will be
     674              :     /// downloaded from S3. Also there will be logic for preventing eviction
     675              :     /// while someone is holding FullAccessTimeline guard.
     676            0 :     pub async fn full_access_guard(self: &Arc<Self>) -> Result<FullAccessTimeline> {
     677            0 :         if self.is_cancelled() {
     678            0 :             bail!(TimelineError::Cancelled(self.ttid));
     679            0 :         }
     680            0 :         Ok(FullAccessTimeline { tli: self.clone() })
     681            0 :     }
     682              : }
     683              : 
     684              : /// This is a guard that allows reading/writing disk timeline state.
     685              : /// All tasks that are using the disk should use this guard.
     686              : #[derive(Clone)]
     687              : pub struct FullAccessTimeline {
     688              :     pub tli: Arc<Timeline>,
     689              : }
     690              : 
     691              : impl Deref for FullAccessTimeline {
     692              :     type Target = Arc<Timeline>;
     693              : 
     694            0 :     fn deref(&self) -> &Self::Target {
     695            0 :         &self.tli
     696            0 :     }
     697              : }
     698              : 
     699              : impl FullAccessTimeline {
     700              :     /// Returns true if walsender should stop sending WAL to pageserver. We
     701              :     /// terminate it if remote_consistent_lsn reached commit_lsn and there are no
     702              :     /// computes. While there might be nothing to stream already, we learn about
     703              :     /// remote_consistent_lsn update through replication feedback, and we want
     704              :     /// to stop pushing to the broker if pageserver is fully caught up.
     705            0 :     pub async fn should_walsender_stop(&self, reported_remote_consistent_lsn: Lsn) -> bool {
     706            0 :         if self.is_cancelled() {
     707            0 :             return true;
     708            0 :         }
     709            0 :         let shared_state = self.read_shared_state().await;
     710            0 :         if self.walreceivers.get_num() == 0 {
     711            0 :             return shared_state.sk.state.inmem.commit_lsn == Lsn(0) || // no data at all yet
     712            0 :             reported_remote_consistent_lsn >= shared_state.sk.state.inmem.commit_lsn;
     713            0 :         }
     714            0 :         false
     715            0 :     }
     716              : 
     717              :     /// Ensure that current term is t, erroring otherwise, and lock the state.
     718            0 :     pub async fn acquire_term(&self, t: Term) -> Result<ReadGuardSharedState> {
     719            0 :         let ss = self.read_shared_state().await;
     720            0 :         if ss.sk.state.acceptor_state.term != t {
     721            0 :             bail!(
     722            0 :                 "failed to acquire term {}, current term {}",
     723            0 :                 t,
     724            0 :                 ss.sk.state.acceptor_state.term
     725            0 :             );
     726            0 :         }
     727            0 :         Ok(ss)
     728            0 :     }
     729              : 
     730              :     /// Pass arrived message to the safekeeper.
     731            0 :     pub async fn process_msg(
     732            0 :         &self,
     733            0 :         msg: &ProposerAcceptorMessage,
     734            0 :     ) -> Result<Option<AcceptorProposerMessage>> {
     735            0 :         if self.is_cancelled() {
     736            0 :             bail!(TimelineError::Cancelled(self.ttid));
     737            0 :         }
     738              : 
     739              :         let mut rmsg: Option<AcceptorProposerMessage>;
     740              :         {
     741            0 :             let mut shared_state = self.write_shared_state().await;
     742            0 :             rmsg = shared_state.sk.process_msg(msg).await?;
     743              : 
     744              :             // if this is AppendResponse, fill in proper hot standby feedback.
     745            0 :             if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg {
     746            0 :                 resp.hs_feedback = self.walsenders.get_hotstandby().hs_feedback;
     747            0 :             }
     748              :         }
     749            0 :         Ok(rmsg)
     750            0 :     }
     751              : 
     752            0 :     pub async fn get_walreader(&self, start_lsn: Lsn) -> Result<WalReader> {
     753            0 :         let (_, persisted_state) = self.get_state().await;
     754            0 :         let enable_remote_read = GlobalTimelines::get_global_config().is_wal_backup_enabled();
     755            0 : 
     756            0 :         WalReader::new(
     757            0 :             &self.ttid,
     758            0 :             self.timeline_dir.clone(),
     759            0 :             &persisted_state,
     760            0 :             start_lsn,
     761            0 :             enable_remote_read,
     762            0 :         )
     763            0 :     }
     764              : 
     765            0 :     pub fn get_timeline_dir(&self) -> Utf8PathBuf {
     766            0 :         self.timeline_dir.clone()
     767            0 :     }
     768              : 
     769              :     /// Update in memory remote consistent lsn.
     770            0 :     pub async fn update_remote_consistent_lsn(&self, candidate: Lsn) {
     771            0 :         let mut shared_state = self.write_shared_state().await;
     772            0 :         shared_state.sk.state.inmem.remote_consistent_lsn =
     773            0 :             max(shared_state.sk.state.inmem.remote_consistent_lsn, candidate);
     774            0 :     }
     775              : }
     776              : 
     777              : /// Deletes directory and its contents. Returns false if directory does not exist.
     778            0 : async fn delete_dir(path: &Utf8PathBuf) -> Result<bool> {
     779            0 :     match fs::remove_dir_all(path).await {
     780            0 :         Ok(_) => Ok(true),
     781            0 :         Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
     782            0 :         Err(e) => Err(e.into()),
     783              :     }
     784            0 : }
     785              : 
     786              : /// Get a path to the tenant directory. If you just need to get a timeline directory,
     787              : /// use FullAccessTimeline::get_timeline_dir instead.
     788           14 : pub(crate) fn get_tenant_dir(conf: &SafeKeeperConf, tenant_id: &TenantId) -> Utf8PathBuf {
     789           14 :     conf.workdir.join(tenant_id.to_string())
     790           14 : }
     791              : 
     792              : /// Get a path to the timeline directory. If you need to read WAL files from disk,
     793              : /// use FullAccessTimeline::get_timeline_dir instead. This function does not check
     794              : /// timeline eviction status and WAL files might not be present on disk.
     795           14 : pub(crate) fn get_timeline_dir(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Utf8PathBuf {
     796           14 :     get_tenant_dir(conf, &ttid.tenant_id).join(ttid.timeline_id.to_string())
     797           14 : }
        

Generated by: LCOV version 2.1-beta