LCOV - code coverage report
Current view: top level - safekeeper/src - timeline_manager.rs (source / functions)
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info
Test Date: 2025-07-16 12:29:03
Coverage:   Lines: 55.3 % (193 of 349)    Functions: 66.7 % (34 of 51)

            Line data    Source code
       1              : //! The timeline manager task is responsible for managing the timeline's background tasks.
       2              : //!
       3              : //! It is spawned alongside each timeline and exits when the timeline is deleted.
       4              : //! It watches for changes in the timeline state and decides when to spawn or kill background tasks.
       5              : //! It can also manage some reactive state, such as whether the timeline should be active for broker pushes or not.
       6              : //!
       7              : //! Be aware that you need to be extra careful with manager code, because it is not respawned on panic.
       8              : //! Also, if it gets stuck in some branch, it will prevent any further progress in the timeline.
       9              : 
      10              : use std::sync::Arc;
      11              : use std::sync::atomic::AtomicUsize;
      12              : use std::time::Duration;
      13              : 
      14              : use futures::channel::oneshot;
      15              : use postgres_ffi::XLogSegNo;
      16              : use safekeeper_api::Term;
      17              : use safekeeper_api::models::PeerInfo;
      18              : use serde::{Deserialize, Serialize};
      19              : use tokio::task::{JoinError, JoinHandle};
      20              : use tokio::time::Instant;
      21              : use tokio_util::sync::CancellationToken;
      22              : use tracing::{Instrument, debug, info, info_span, instrument, warn};
      23              : use utils::lsn::Lsn;
      24              : 
      25              : use crate::SafeKeeperConf;
      26              : use crate::control_file::{FileStorage, Storage};
      27              : use crate::metrics::{
      28              :     MANAGER_ACTIVE_CHANGES, MANAGER_ITERATIONS_TOTAL, MISC_OPERATION_SECONDS, NUM_EVICTED_TIMELINES,
      29              : };
      30              : use crate::rate_limit::{RateLimiter, rand_duration};
      31              : use crate::recovery::recovery_main;
      32              : use crate::remove_wal::calc_horizon_lsn;
      33              : use crate::send_wal::WalSenders;
      34              : use crate::state::TimelineState;
      35              : use crate::timeline::{ManagerTimeline, ReadGuardSharedState, StateSK, WalResidentTimeline};
      36              : use crate::timeline_guard::{AccessService, GuardId, ResidenceGuard};
      37              : use crate::timelines_set::{TimelineSetGuard, TimelinesSet};
      38              : use crate::wal_backup::{self, WalBackup, WalBackupTaskHandle};
      39              : use crate::wal_backup_partial::{self, PartialBackup, PartialRemoteSegment};
      40              : 
      41              : pub(crate) struct StateSnapshot {
      42              :     // inmem values
      43              :     pub(crate) commit_lsn: Lsn,
      44              :     pub(crate) backup_lsn: Lsn,
      45              :     pub(crate) remote_consistent_lsn: Lsn,
      46              : 
      47              :     // persistent control file values
      48              :     pub(crate) cfile_commit_lsn: Lsn,
      49              :     pub(crate) cfile_remote_consistent_lsn: Lsn,
      50              :     pub(crate) cfile_backup_lsn: Lsn,
      51              : 
      52              :     // latest state
      53              :     pub(crate) flush_lsn: Lsn,
      54              :     pub(crate) last_log_term: Term,
      55              : 
      56              :     // misc
      57              :     pub(crate) cfile_last_persist_at: std::time::Instant,
      58              :     pub(crate) inmem_flush_pending: bool,
      59              :     pub(crate) wal_removal_on_hold: bool,
      60              :     pub(crate) peers: Vec<PeerInfo>,
      61              : }
      62              : 
      63              : impl StateSnapshot {
      64              :     /// Create a new snapshot of the timeline state.
      65           50 :     fn new(read_guard: ReadGuardSharedState, heartbeat_timeout: Duration) -> Self {
      66           50 :         let state = read_guard.sk.state();
      67           50 :         Self {
      68           50 :             commit_lsn: state.inmem.commit_lsn,
      69           50 :             backup_lsn: state.inmem.backup_lsn,
      70           50 :             remote_consistent_lsn: state.inmem.remote_consistent_lsn,
      71           50 :             cfile_commit_lsn: state.commit_lsn,
      72           50 :             cfile_remote_consistent_lsn: state.remote_consistent_lsn,
      73           50 :             cfile_backup_lsn: state.backup_lsn,
      74           50 :             flush_lsn: read_guard.sk.flush_lsn(),
      75           50 :             last_log_term: read_guard.sk.last_log_term(),
      76           50 :             cfile_last_persist_at: state.pers.last_persist_at(),
      77           50 :             inmem_flush_pending: Self::has_unflushed_inmem_state(state),
      78           50 :             wal_removal_on_hold: read_guard.wal_removal_on_hold,
      79           50 :             peers: read_guard.get_peers(heartbeat_timeout),
      80           50 :         }
      81           50 :     }
      82              : 
      83           50 :     fn has_unflushed_inmem_state(state: &TimelineState<FileStorage>) -> bool {
      84           50 :         state.inmem.commit_lsn > state.commit_lsn
      85           25 :             || state.inmem.backup_lsn > state.backup_lsn
      86           25 :             || state.inmem.peer_horizon_lsn > state.peer_horizon_lsn
      87           25 :             || state.inmem.remote_consistent_lsn > state.remote_consistent_lsn
      88           50 :     }
      89              : }
      90              : 
      91              : /// Control how often the manager task should wake up to check updates.
      92              : /// There is no need to check for updates more often than this.
      93              : const REFRESH_INTERVAL: Duration = Duration::from_millis(300);
      94              : 
      95              : pub enum ManagerCtlMessage {
      96              :     /// Request to get a guard for WalResidentTimeline, with WAL files available locally.
      97              :     GuardRequest(tokio::sync::oneshot::Sender<anyhow::Result<ResidenceGuard>>),
      98              :     /// Get a guard for WalResidentTimeline if the timeline is not currently offloaded, else None
      99              :     TryGuardRequest(tokio::sync::oneshot::Sender<Option<ResidenceGuard>>),
     100              :     /// Request to drop the guard.
     101              :     GuardDrop(GuardId),
     102              :     /// Request to reset uploaded partial backup state.
     103              :     BackupPartialReset(oneshot::Sender<anyhow::Result<Vec<String>>>),
     104              : }
     105              : 
     106              : impl std::fmt::Debug for ManagerCtlMessage {
     107            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     108            0 :         match self {
     109            0 :             ManagerCtlMessage::GuardRequest(_) => write!(f, "GuardRequest"),
     110            0 :             ManagerCtlMessage::TryGuardRequest(_) => write!(f, "TryGuardRequest"),
     111            0 :             ManagerCtlMessage::GuardDrop(id) => write!(f, "GuardDrop({id:?})"),
     112            0 :             ManagerCtlMessage::BackupPartialReset(_) => write!(f, "BackupPartialReset"),
     113              :         }
     114            0 :     }
     115              : }
     116              : 
     117              : pub struct ManagerCtl {
     118              :     manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
     119              : 
     120              :     // this is used to initialize the manager; it is moved out in bootstrap_manager().
     121              :     init_manager_rx:
     122              :         std::sync::Mutex<Option<tokio::sync::mpsc::UnboundedReceiver<ManagerCtlMessage>>>,
     123              : }
     124              : 
     125              : impl Default for ManagerCtl {
     126            0 :     fn default() -> Self {
     127            0 :         Self::new()
     128            0 :     }
     129              : }
     130              : 
     131              : impl ManagerCtl {
     132            5 :     pub fn new() -> Self {
     133            5 :         let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
     134            5 :         Self {
     135            5 :             manager_tx: tx,
     136            5 :             init_manager_rx: std::sync::Mutex::new(Some(rx)),
     137            5 :         }
     138            5 :     }
     139              : 
     140              :     /// Issue a new guard and wait for manager to prepare the timeline.
     141              :     /// Sends a message to the manager and waits for the response.
     142              :     /// Can be blocked indefinitely if the manager is stuck.
     143           10 :     pub async fn wal_residence_guard(&self) -> anyhow::Result<ResidenceGuard> {
     144           10 :         let (tx, rx) = tokio::sync::oneshot::channel();
     145           10 :         self.manager_tx.send(ManagerCtlMessage::GuardRequest(tx))?;
     146              : 
     147              :         // wait for the manager to respond with the guard
     148           10 :         rx.await
     149           10 :             .map_err(|e| anyhow::anyhow!("response read fail: {:?}", e))
     150           10 :             .and_then(std::convert::identity)
     151           10 :     }
     152              : 
     153              :     /// Issue a new guard if the timeline is currently not offloaded, else return None
     154              :     /// Sends a message to the manager and waits for the response.
     155              :     /// Can be blocked indefinitely if the manager is stuck.
     156            0 :     pub async fn try_wal_residence_guard(&self) -> anyhow::Result<Option<ResidenceGuard>> {
     157            0 :         let (tx, rx) = tokio::sync::oneshot::channel();
     158            0 :         self.manager_tx
     159            0 :             .send(ManagerCtlMessage::TryGuardRequest(tx))?;
     160              : 
     161              :         // wait for the manager to respond with the guard
     162            0 :         rx.await
     163            0 :             .map_err(|e| anyhow::anyhow!("response read fail: {:?}", e))
     164            0 :     }
     165              : 
     166              :     /// Request the timeline manager to reset the uploaded partial segment state and
     167              :     /// wait for the result.
     168            0 :     pub async fn backup_partial_reset(&self) -> anyhow::Result<Vec<String>> {
     169            0 :         let (tx, rx) = oneshot::channel();
     170            0 :         self.manager_tx
     171            0 :             .send(ManagerCtlMessage::BackupPartialReset(tx))
     172            0 :             .expect("manager task is not running");
     173            0 :         match rx.await {
     174            0 :             Ok(res) => res,
     175            0 :             Err(_) => anyhow::bail!("timeline manager is gone"),
     176              :         }
     177            0 :     }
     178              : 
     179              :     /// Must be called exactly once to bootstrap the manager.
     180            5 :     pub fn bootstrap_manager(
     181            5 :         &self,
     182            5 :     ) -> (
     183            5 :         tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
     184            5 :         tokio::sync::mpsc::UnboundedReceiver<ManagerCtlMessage>,
     185            5 :     ) {
     186            5 :         let rx = self
     187            5 :             .init_manager_rx
     188            5 :             .lock()
     189            5 :             .expect("mutex init_manager_rx poisoned")
     190            5 :             .take()
     191            5 :             .expect("manager already bootstrapped");
     192              : 
     193            5 :         (self.manager_tx.clone(), rx)
     194            5 :     }
     195              : }
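
A minimal lifecycle sketch of ManagerCtl (the wiring around it is illustrative; in
practice the channel ends returned by bootstrap_manager() are the manager_tx /
manager_rx parameters that main_task takes when the timeline's manager is spawned):

    let ctl = ManagerCtl::new();
    // Must be called exactly once; a second call would panic on the already-taken receiver.
    let (manager_tx, manager_rx) = ctl.bootstrap_manager();
    // Other tasks later acquire WAL residence guards through the same ManagerCtl:
    let _guard = ctl.wal_residence_guard().await?;
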
     196              : 
     197              : pub(crate) struct Manager {
     198              :     // configuration & dependencies
     199              :     pub(crate) tli: ManagerTimeline,
     200              :     pub(crate) conf: SafeKeeperConf,
     201              :     pub(crate) wal_seg_size: usize,
     202              :     pub(crate) walsenders: Arc<WalSenders>,
     203              :     pub(crate) wal_backup: Arc<WalBackup>,
     204              : 
     205              :     // current state
     206              :     pub(crate) state_version_rx: tokio::sync::watch::Receiver<usize>,
     207              :     pub(crate) num_computes_rx: tokio::sync::watch::Receiver<usize>,
     208              :     pub(crate) tli_broker_active: TimelineSetGuard,
     209              :     pub(crate) last_removed_segno: XLogSegNo,
     210              :     pub(crate) is_offloaded: bool,
     211              : 
     212              :     // background tasks
     213              :     pub(crate) backup_task: Option<WalBackupTaskHandle>,
     214              :     pub(crate) recovery_task: Option<JoinHandle<()>>,
     215              :     pub(crate) wal_removal_task: Option<JoinHandle<anyhow::Result<u64>>>,
     216              : 
     217              :     // partial backup
     218              :     pub(crate) partial_backup_task:
     219              :         Option<(JoinHandle<Option<PartialRemoteSegment>>, CancellationToken)>,
     220              :     pub(crate) partial_backup_uploaded: Option<PartialRemoteSegment>,
     221              : 
     222              :     // misc
     223              :     pub(crate) access_service: AccessService,
     224              :     pub(crate) global_rate_limiter: RateLimiter,
     225              : 
     226              :     // Anti-flapping state: we evict timelines eagerly if they are inactive, but should not
     227              :     // evict them if they go inactive very soon after being restored.
     228              :     pub(crate) evict_not_before: Instant,
     229              : }
     230              : 
     231              : /// This task gets spawned alongside each timeline and is responsible for managing the timeline's
     232              : /// background tasks.
     233              : /// Be careful, this task is not respawned on panic, so it should not panic.
     234              : #[instrument(name = "manager", skip_all, fields(ttid = %tli.ttid))]
     235              : pub async fn main_task(
     236              :     tli: ManagerTimeline,
     237              :     conf: SafeKeeperConf,
     238              :     broker_active_set: Arc<TimelinesSet>,
     239              :     manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
     240              :     mut manager_rx: tokio::sync::mpsc::UnboundedReceiver<ManagerCtlMessage>,
     241              :     global_rate_limiter: RateLimiter,
     242              :     wal_backup: Arc<WalBackup>,
     243              : ) {
     244              :     tli.set_status(Status::Started);
     245              : 
     246              :     let defer_tli = tli.tli.clone();
     247              :     scopeguard::defer! {
     248              :         if defer_tli.is_cancelled() {
     249              :             info!("manager task finished");
     250              :         } else {
     251              :             warn!("manager task finished prematurely");
     252              :         }
     253              :     };
     254              : 
     255              :     let mut mgr = Manager::new(
     256              :         tli,
     257              :         conf,
     258              :         broker_active_set,
     259              :         manager_tx,
     260              :         global_rate_limiter,
     261              :         wal_backup,
     262              :     )
     263              :     .await;
     264              : 
     265              :     // Start recovery task which always runs on the timeline.
     266              :     if !mgr.is_offloaded && mgr.conf.peer_recovery_enabled {
     267              :         // Recovery task is only spawned if we can get a residence guard (i.e. timeline is not already shutting down)
     268              :         if let Ok(tli) = mgr.wal_resident_timeline() {
     269              :             mgr.recovery_task = Some(tokio::spawn(recovery_main(tli, mgr.conf.clone())));
     270              :         }
     271              :     }
     272              : 
     273              :     // If timeline is evicted, reflect that in the metric.
     274              :     if mgr.is_offloaded {
     275              :         NUM_EVICTED_TIMELINES.inc();
     276              :     }
     277              : 
     278              :     let last_state = 'outer: loop {
     279              :         MANAGER_ITERATIONS_TOTAL.inc();
     280              : 
     281              :         mgr.set_status(Status::StateSnapshot);
     282              :         let state_snapshot = mgr.state_snapshot().await;
     283              : 
     284              :         let mut next_event: Option<Instant> = None;
     285              :         if !mgr.is_offloaded {
     286              :             let num_computes = *mgr.num_computes_rx.borrow();
     287              : 
     288              :             mgr.set_status(Status::UpdateBackup);
     289              :             let is_wal_backup_required = mgr.update_backup(num_computes, &state_snapshot).await;
     290              :             mgr.update_is_active(is_wal_backup_required, num_computes, &state_snapshot);
     291              : 
     292              :             mgr.set_status(Status::UpdateControlFile);
     293              :             mgr.update_control_file_save(&state_snapshot, &mut next_event)
     294              :                 .await;
     295              : 
     296              :             mgr.set_status(Status::UpdateWalRemoval);
     297              :             mgr.update_wal_removal(&state_snapshot).await;
     298              : 
     299              :             mgr.set_status(Status::UpdatePartialBackup);
     300              :             mgr.update_partial_backup(&state_snapshot).await;
     301              : 
     302              :             let now = Instant::now();
     303              :             if mgr.evict_not_before > now {
     304              :                 // we should wait until evict_not_before
     305              :                 update_next_event(&mut next_event, mgr.evict_not_before);
     306              :             }
     307              : 
     308              :             if mgr.conf.enable_offload
     309              :                 && mgr.evict_not_before <= now
     310              :                 && mgr.ready_for_eviction(&next_event, &state_snapshot)
     311              :             {
     312              :                 // check rate limiter and evict timeline if possible
     313              :                 match mgr.global_rate_limiter.try_acquire_eviction() {
     314              :                     Some(_permit) => {
     315              :                         mgr.set_status(Status::EvictTimeline);
     316              :                         if !mgr.evict_timeline().await {
     317              :                             // eviction failed, try again later
     318              :                             mgr.evict_not_before =
     319              :                                 Instant::now() + rand_duration(&mgr.conf.eviction_min_resident);
     320              :                             update_next_event(&mut next_event, mgr.evict_not_before);
     321              :                         }
     322              :                     }
     323              :                     None => {
     324              :                         // we can't evict timeline now, will try again later
     325              :                         mgr.evict_not_before =
     326              :                             Instant::now() + rand_duration(&mgr.conf.eviction_min_resident);
     327              :                         update_next_event(&mut next_event, mgr.evict_not_before);
     328              :                     }
     329              :                 }
     330              :             }
     331              :         }
     332              : 
     333              :         mgr.set_status(Status::Wait);
     334              :         // wait until something changes. tx channels are stored under Arc, so they will not be
     335              :         // dropped until the manager task is finished.
     336              :         tokio::select! {
     337              :             _ = mgr.tli.cancel.cancelled() => {
     338              :                 // timeline was deleted
     339              :                 break 'outer state_snapshot;
     340              :             }
     341           45 :             _ = async {
     342              :                 // don't wake up on every state change; check at most once per REFRESH_INTERVAL
     343           45 :                 tokio::time::sleep(REFRESH_INTERVAL).await;
     344           21 :                 let _ = mgr.state_version_rx.changed().await;
     345           21 :             } => {
     346              :                 // state was updated
     347              :             }
     348              :             _ = mgr.num_computes_rx.changed() => {
     349              :                 // number of connected computes was updated
     350              :             }
     351              :             _ = sleep_until(&next_event) => {
     352              :                 // we were waiting for some event (e.g. cfile save)
     353              :             }
     354              :             res = await_task_finish(mgr.wal_removal_task.as_mut()) => {
     355              :                 // WAL removal task finished
     356              :                 mgr.wal_removal_task = None;
     357              :                 mgr.update_wal_removal_end(res);
     358              :             }
     359              :             res = await_task_finish(mgr.partial_backup_task.as_mut().map(|(handle, _)| handle)) => {
     360              :                 // partial backup task finished
     361              :                 mgr.partial_backup_task = None;
     362              :                 mgr.update_partial_backup_end(res);
     363              :             }
     364              : 
     365              :             msg = manager_rx.recv() => {
     366              :                 mgr.set_status(Status::HandleMessage);
     367              :                 mgr.handle_message(msg).await;
     368              :             }
     369              :         }
     370              :     };
     371              :     mgr.set_status(Status::Exiting);
     372              : 
     373              :     // remove timeline from the broker active set sooner, before waiting for background tasks
     374              :     mgr.tli_broker_active.set(false);
     375              : 
     376              :     // shutdown background tasks
     377              :     if let Some(storage) = mgr.wal_backup.get_storage() {
     378              :         if let Some(backup_task) = mgr.backup_task.take() {
     379              :             // If we fell through here, then the timeline is shutting down. This is important
     380              :             // because otherwise joining on the wal_backup handle might hang.
     381              :             assert!(mgr.tli.cancel.is_cancelled());
     382              : 
     383              :             backup_task.join().await;
     384              :         }
     385              :         wal_backup::update_task(&mut mgr, storage, false, &last_state).await;
     386              :     }
     387              : 
     388              :     if let Some(recovery_task) = &mut mgr.recovery_task {
     389              :         if let Err(e) = recovery_task.await {
     390              :             warn!("recovery task failed: {:?}", e);
     391              :         }
     392              :     }
     393              : 
     394              :     if let Some((handle, cancel)) = &mut mgr.partial_backup_task {
     395              :         cancel.cancel();
     396              :         if let Err(e) = handle.await {
     397              :             warn!("partial backup task failed: {:?}", e);
     398              :         }
     399              :     }
     400              : 
     401              :     if let Some(wal_removal_task) = &mut mgr.wal_removal_task {
     402              :         let res = wal_removal_task.await;
     403              :         mgr.update_wal_removal_end(res);
     404              :     }
     405              : 
     406              :     // If the timeline is deleted while evicted, decrement the gauge.
     407              :     if mgr.tli.is_cancelled() && mgr.is_offloaded {
     408              :         NUM_EVICTED_TIMELINES.dec();
     409              :     }
     410              : 
     411              :     mgr.set_status(Status::Finished);
     412              : }
     413              : 
     414              : impl Manager {
     415            5 :     async fn new(
     416            5 :         tli: ManagerTimeline,
     417            5 :         conf: SafeKeeperConf,
     418            5 :         broker_active_set: Arc<TimelinesSet>,
     419            5 :         manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
     420            5 :         global_rate_limiter: RateLimiter,
     421            5 :         wal_backup: Arc<WalBackup>,
     422            5 :     ) -> Manager {
     423            5 :         let (is_offloaded, partial_backup_uploaded) = tli.bootstrap_mgr().await;
     424              :         Manager {
     425            5 :             wal_seg_size: tli.get_wal_seg_size().await,
     426            5 :             walsenders: tli.get_walsenders().clone(),
     427            5 :             wal_backup,
     428            5 :             state_version_rx: tli.get_state_version_rx(),
     429            5 :             num_computes_rx: tli.get_walreceivers().get_num_rx(),
     430            5 :             tli_broker_active: broker_active_set.guard(tli.clone()),
     431              :             last_removed_segno: 0,
     432            5 :             is_offloaded,
     433            5 :             backup_task: None,
     434            5 :             recovery_task: None,
     435            5 :             wal_removal_task: None,
     436            5 :             partial_backup_task: None,
     437            5 :             partial_backup_uploaded,
     438            5 :             access_service: AccessService::new(manager_tx),
     439            5 :             tli,
     440            5 :             global_rate_limiter,
     441              :             // to smooth out the eviction spike after restart
     442            5 :             evict_not_before: Instant::now() + rand_duration(&conf.eviction_min_resident),
     443            5 :             conf,
     444              :         }
     445            5 :     }
     446              : 
     447          313 :     fn set_status(&self, status: Status) {
     448          313 :         self.tli.set_status(status);
     449          313 :     }
     450              : 
     451              :     /// Get a WalResidentTimeline.
     452              :     /// Manager code must use this function instead of the one on `Timeline`
     453              :     /// directly, because the latter would deadlock.
     454              :     ///
     455              :     /// This function is fallible because the guard may not be created if the timeline is
     456              :     /// shutting down.
     457            5 :     pub(crate) fn wal_resident_timeline(&mut self) -> anyhow::Result<WalResidentTimeline> {
     458            5 :         assert!(!self.is_offloaded);
     459            5 :         let guard = self.access_service.create_guard(
     460            5 :             self.tli
     461            5 :                 .gate
     462            5 :                 .enter()
     463            5 :                 .map_err(|_| anyhow::anyhow!("Timeline shutting down"))?,
     464              :         );
     465            5 :         Ok(WalResidentTimeline::new(self.tli.clone(), guard))
     466            5 :     }
     467              : 
     468              :     /// Get a snapshot of the timeline state.
     469           50 :     async fn state_snapshot(&self) -> StateSnapshot {
     470           50 :         let _timer = MISC_OPERATION_SECONDS
     471           50 :             .with_label_values(&["state_snapshot"])
     472           50 :             .start_timer();
     473              : 
     474           50 :         StateSnapshot::new(
     475           50 :             self.tli.read_shared_state().await,
     476           50 :             self.conf.heartbeat_timeout,
     477              :         )
     478           50 :     }
     479              : 
     480              :     /// Spawns/kills backup task and returns true if backup is required.
     481           50 :     async fn update_backup(&mut self, num_computes: usize, state: &StateSnapshot) -> bool {
     482           50 :         let is_wal_backup_required =
     483           50 :             wal_backup::is_wal_backup_required(self.wal_seg_size, num_computes, state);
     484              : 
     485           50 :         if let Some(storage) = self.wal_backup.get_storage() {
     486            0 :             wal_backup::update_task(self, storage, is_wal_backup_required, state).await;
     487           50 :         }
     488              : 
     489              :         // update the state in Arc<Timeline>
     490           50 :         self.tli.wal_backup_active.store(
     491           50 :             self.backup_task.is_some(),
     492           50 :             std::sync::atomic::Ordering::Relaxed,
     493              :         );
     494           50 :         is_wal_backup_required
     495           50 :     }
     496              : 
     497              :     /// Update is_active flag and returns its value.
     498           50 :     fn update_is_active(
     499           50 :         &mut self,
     500           50 :         is_wal_backup_required: bool,
     501           50 :         num_computes: usize,
     502           50 :         state: &StateSnapshot,
     503           50 :     ) {
     504           50 :         let is_active = is_wal_backup_required
     505           24 :             || num_computes > 0
     506           24 :             || state.remote_consistent_lsn < state.commit_lsn;
     507              : 
     508              :         // update the broker timeline set
     509           50 :         if self.tli_broker_active.set(is_active) {
     510              :             // write log if state has changed
     511            5 :             info!(
     512            0 :                 "timeline active={} now, remote_consistent_lsn={}, commit_lsn={}",
     513              :                 is_active, state.remote_consistent_lsn, state.commit_lsn,
     514              :             );
     515              : 
     516            5 :             MANAGER_ACTIVE_CHANGES.inc();
     517           45 :         }
     518              : 
     519              :         // update the state in Arc<Timeline>
     520           50 :         self.tli
     521           50 :             .broker_active
     522           50 :             .store(is_active, std::sync::atomic::Ordering::Relaxed);
     523           50 :     }
     524              : 
     525              :     /// Save the control file if needed. If the save is deferred, updates next_event with the time when it should be persisted.
     526           50 :     async fn update_control_file_save(
     527           50 :         &self,
     528           50 :         state: &StateSnapshot,
     529           50 :         next_event: &mut Option<Instant>,
     530           50 :     ) {
     531           50 :         if !state.inmem_flush_pending {
     532           25 :             return;
     533           25 :         }
     534              : 
     535           25 :         if state.cfile_last_persist_at.elapsed() > self.conf.control_file_save_interval
     536              :             // If the control file's commit_lsn lags a full segment or more behind the current
     537              :             // commit_lsn, flush immediately to limit recovery time in case of a crash. We don't do
     538              :             // this on the WAL ingest hot path since it incurs fsync latency.
     539            0 :             || state.commit_lsn.saturating_sub(state.cfile_commit_lsn).0 >= self.wal_seg_size as u64
     540              :         {
     541           25 :             let mut write_guard = self.tli.write_shared_state().await;
     542              :             // it should be done in the background because it blocks the manager task, but flush() should
     543              :             // be fast enough not to be a problem now
     544           25 :             if let Err(e) = write_guard.sk.state_mut().flush().await {
     545            0 :                 warn!("failed to save control file: {:?}", e);
     546           24 :             }
     547            0 :         } else {
     548            0 :             // we should wait until some time passed until the next save
     549            0 :             update_next_event(
     550            0 :                 next_event,
     551            0 :                 (state.cfile_last_persist_at + self.conf.control_file_save_interval).into(),
     552            0 :             );
     553            0 :         }
     554           49 :     }
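
A sketch of the immediate-flush condition above, with an assumed 16 MiB segment size
(the real value comes from self.wal_seg_size):

    let wal_seg_size: usize = 16 * 1024 * 1024;
    let commit_lsn = Lsn(0x3000000);       // in-memory commit_lsn, 48 MiB
    let cfile_commit_lsn = Lsn(0x1000000); // commit_lsn last persisted to the control file, 16 MiB
    // The lag is 0x2000000 bytes = 32 MiB, i.e. at least one full segment, so the control
    // file is flushed immediately rather than waiting for control_file_save_interval.
    assert!(commit_lsn.saturating_sub(cfile_commit_lsn).0 >= wal_seg_size as u64);
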
     555              : 
     556              :     /// Spawns WAL removal task if needed.
     557           49 :     async fn update_wal_removal(&mut self, state: &StateSnapshot) {
     558           49 :         if self.wal_removal_task.is_some() || state.wal_removal_on_hold {
     559              :             // WAL removal is already in progress or on hold
     560            0 :             return;
     561           49 :         }
     562              : 
     563              :         // If enabled, we use the LSN of the most lagging walsender as the WAL removal horizon.
     564              :         // This gives better read speed to pageservers that are lagging behind,
     565              :         // at the cost of keeping more WAL on disk.
     566           49 :         let replication_horizon_lsn = if self.conf.walsenders_keep_horizon {
     567            0 :             self.walsenders.laggard_lsn()
     568              :         } else {
     569           49 :             None
     570              :         };
     571              : 
     572           49 :         let removal_horizon_lsn = calc_horizon_lsn(state, replication_horizon_lsn);
     573           49 :         let removal_horizon_segno = removal_horizon_lsn
     574           49 :             .segment_number(self.wal_seg_size)
     575           49 :             .saturating_sub(1);
     576              : 
     577           49 :         if removal_horizon_segno > self.last_removed_segno {
     578              :             // we need to remove WAL
     579            0 :             let Ok(timeline_gate_guard) = self.tli.gate.enter() else {
     580            0 :                 tracing::info!("Timeline shutdown, not spawning WAL removal task");
     581            0 :                 return;
     582              :             };
     583              : 
     584            0 :             let remover = match self.tli.read_shared_state().await.sk {
     585            0 :                 StateSK::Loaded(ref sk) => {
     586            0 :                     crate::wal_storage::Storage::remove_up_to(&sk.wal_store, removal_horizon_segno)
     587              :                 }
     588              :                 StateSK::Offloaded(_) => {
     589              :                     // we can't remove WAL if it's not loaded
     590            0 :                     warn!("unexpectedly trying to run WAL removal on offloaded timeline");
     591            0 :                     return;
     592              :                 }
     593            0 :                 StateSK::Empty => unreachable!(),
     594              :             };
     595              : 
     596            0 :             self.wal_removal_task = Some(tokio::spawn(
     597            0 :                 async move {
     598            0 :                     let _timeline_gate_guard = timeline_gate_guard;
     599              : 
     600            0 :                     remover.await?;
     601            0 :                     Ok(removal_horizon_segno)
     602            0 :                 }
     603            0 :                 .instrument(info_span!("WAL removal", ttid=%self.tli.ttid)),
     604              :             ));
     605           49 :         }
     606           49 :     }
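
A worked example of the removal horizon arithmetic above, again assuming a 16 MiB
segment size:

    let wal_seg_size: usize = 16 * 1024 * 1024;
    let removal_horizon_lsn = Lsn(0x5000000);
    // The segment number is the LSN divided by the segment size: 0x5000000 / 0x1000000 = 5;
    // subtracting one gives removal_horizon_segno = 4, which is then compared against
    // last_removed_segno before a removal task is spawned.
    let removal_horizon_segno = removal_horizon_lsn
        .segment_number(wal_seg_size)
        .saturating_sub(1);
    assert_eq!(removal_horizon_segno, 4);
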
     607              : 
     608              :     /// Update the state after WAL removal task finished.
     609            0 :     fn update_wal_removal_end(&mut self, res: Result<anyhow::Result<u64>, JoinError>) {
     610            0 :         let new_last_removed_segno = match res {
     611            0 :             Ok(Ok(segno)) => segno,
     612            0 :             Err(e) => {
     613            0 :                 warn!("WAL removal task failed: {:?}", e);
     614            0 :                 return;
     615              :             }
     616            0 :             Ok(Err(e)) => {
     617            0 :                 warn!("WAL removal task failed: {:?}", e);
     618            0 :                 return;
     619              :             }
     620              :         };
     621              : 
     622            0 :         self.last_removed_segno = new_last_removed_segno;
     623              :         // update the state in Arc<Timeline>
     624            0 :         self.tli
     625            0 :             .last_removed_segno
     626            0 :             .store(new_last_removed_segno, std::sync::atomic::Ordering::Relaxed);
     627            0 :     }
     628              : 
     629              :     /// Spawns partial WAL backup task if needed.
     630           49 :     async fn update_partial_backup(&mut self, state: &StateSnapshot) {
     631              :         // check if WAL backup is enabled and should be started
     632           49 :         let Some(storage) = self.wal_backup.get_storage() else {
     633           49 :             return;
     634              :         };
     635              : 
     636            0 :         if self.partial_backup_task.is_some() {
     637              :             // partial backup is already running
     638            0 :             return;
     639            0 :         }
     640              : 
     641            0 :         if !wal_backup_partial::needs_uploading(state, &self.partial_backup_uploaded) {
     642              :             // nothing to upload
     643            0 :             return;
     644            0 :         }
     645              : 
     646            0 :         let Ok(resident) = self.wal_resident_timeline() else {
     647              :             // Shutting down
     648            0 :             return;
     649              :         };
     650              : 
     651              :         // Get WalResidentTimeline and start partial backup task.
     652            0 :         let cancel = CancellationToken::new();
     653            0 :         let handle = tokio::spawn(wal_backup_partial::main_task(
     654            0 :             resident,
     655            0 :             self.conf.clone(),
     656            0 :             self.global_rate_limiter.clone(),
     657            0 :             cancel.clone(),
     658            0 :             storage,
     659              :         ));
     660            0 :         self.partial_backup_task = Some((handle, cancel));
     661           49 :     }
     662              : 
     663              :     /// Update the state after partial WAL backup task finished.
     664            0 :     fn update_partial_backup_end(&mut self, res: Result<Option<PartialRemoteSegment>, JoinError>) {
     665            0 :         match res {
     666            0 :             Ok(new_upload_state) => {
     667            0 :                 self.partial_backup_uploaded = new_upload_state;
     668            0 :             }
     669            0 :             Err(e) => {
     670              :     /// might be concurrently uploading something, cancel the task first.
     671              :             }
     672              :         }
     673            0 :     }
     674              : 
     675              :     /// Reset partial backup state and remove its remote storage data. Since it
     676              :     /// might concurrently uploading something, cancel the task first.
     677            0 :     async fn backup_partial_reset(&mut self) -> anyhow::Result<Vec<String>> {
     678            0 :         let Some(storage) = self.wal_backup.get_storage() else {
     679            0 :             anyhow::bail!("remote storage is not enabled");
     680              :         };
     681              : 
     682            0 :         info!("resetting partial backup state");
     683              :         // Force unevict timeline if it is evicted before erasing partial backup
     684              :         // state. The intended use of this function is to drop corrupted remote
     685              :         // state; we haven't enabled local files deletion yet anywhere,
     686              :         // so direct switch is safe.
     687            0 :         if self.is_offloaded {
     688            0 :             self.tli.switch_to_present().await?;
     689              :             // switch manager state as soon as possible
     690            0 :             self.is_offloaded = false;
     691            0 :         }
     692              : 
     693            0 :         if let Some((handle, cancel)) = &mut self.partial_backup_task {
     694            0 :             cancel.cancel();
     695            0 :             info!("cancelled partial backup task, awaiting it");
     696              :             // we're going to reset .partial_backup_uploaded to None anyway, so ignore the result
     697            0 :             handle.await.ok();
     698            0 :             self.partial_backup_task = None;
     699            0 :         }
     700              : 
     701            0 :         let tli = self.wal_resident_timeline()?;
     702            0 :         let mut partial_backup = PartialBackup::new(tli, self.conf.clone(), storage).await;
     703              :         // Reset might fail e.g. when cfile is already reset but s3 removal
     704              :         // failed, so set manager state to None beforehand. In any case caller
     705              :         // is expected to retry until success.
     706            0 :         self.partial_backup_uploaded = None;
     707            0 :         let res = partial_backup.reset().await?;
     708            0 :         info!("reset is done");
     709            0 :         Ok(res)
     710            0 :     }
     711              : 
     712              :     /// Handle message arrived from ManagerCtl.
     713           16 :     async fn handle_message(&mut self, msg: Option<ManagerCtlMessage>) {
     714           16 :         debug!("received manager message: {:?}", msg);
     715           16 :         match msg {
     716           10 :             Some(ManagerCtlMessage::GuardRequest(tx)) => {
     717           10 :                 if self.is_offloaded {
     718              :                     // trying to unevict the timeline, but without guarantee that it will be successful
     719            0 :                     self.unevict_timeline().await;
     720           10 :                 }
     721              : 
     722           10 :                 let guard = if self.is_offloaded {
     723            0 :                     Err(anyhow::anyhow!("timeline is offloaded, can't get a guard"))
     724              :                 } else {
     725           10 :                     match self.tli.gate.enter() {
     726           10 :                         Ok(gate_guard) => Ok(self.access_service.create_guard(gate_guard)),
     727            0 :                         Err(_) => Err(anyhow::anyhow!(
     728            0 :                             "timeline is shutting down, can't get a guard"
     729            0 :                         )),
     730              :                     }
     731              :                 };
     732              : 
     733           10 :                 if tx.send(guard).is_err() {
     734            0 :                     warn!("failed to reply with a guard, receiver dropped");
     735           10 :                 }
     736              :             }
     737            0 :             Some(ManagerCtlMessage::TryGuardRequest(tx)) => {
     738            0 :                 let result = if self.is_offloaded {
     739            0 :                     None
     740              :                 } else {
     741            0 :                     match self.tli.gate.enter() {
     742            0 :                         Ok(gate_guard) => Some(self.access_service.create_guard(gate_guard)),
     743            0 :                         Err(_) => None,
     744              :                     }
     745              :                 };
     746              : 
     747            0 :                 if tx.send(result).is_err() {
     748            0 :                     warn!("failed to reply with a guard, receiver dropped");
     749            0 :                 }
     750              :             }
     751            6 :             Some(ManagerCtlMessage::GuardDrop(guard_id)) => {
     752            6 :                 self.access_service.drop_guard(guard_id);
     753            6 :             }
     754            0 :             Some(ManagerCtlMessage::BackupPartialReset(tx)) => {
     755            0 :                 info!("resetting uploaded partial backup state");
     756            0 :                 let res = self.backup_partial_reset().await;
     757            0 :                 if let Err(ref e) = res {
     758            0 :                     warn!("failed to reset partial backup state: {:?}", e);
     759            0 :                 }
     760            0 :                 if tx.send(res).is_err() {
     761            0 :                     warn!("failed to send partial backup reset result, receiver dropped");
     762            0 :                 }
     763              :             }
     764              :             None => {
     765              :                 // can't happen, we're holding the sender
     766            0 :                 unreachable!();
     767              :             }
     768              :         }
     769           16 :     }
     770              : }
     771              : 
     772              : // utility functions
     773           49 : async fn sleep_until(option: &Option<tokio::time::Instant>) {
     774           45 :     if let Some(timeout) = option {
     775            0 :         tokio::time::sleep_until(*timeout).await;
     776              :     } else {
     777           45 :         futures::future::pending::<()>().await;
     778              :     }
     779            0 : }
     780              : 
     781              : /// Future that resolves when the task is finished or never if the task is None.
     782              : /// Future that resolves when the task is finished, or never if the task is None.
     783              : /// Note: it accepts Option<&mut> instead of &mut Option<> because mapping the
     784              : /// option to get the latter is hard.
     785           98 : async fn await_task_finish<T>(option: Option<&mut JoinHandle<T>>) -> Result<T, JoinError> {
     786           93 :     if let Some(task) = option {
     787            0 :         task.await
     788              :     } else {
     789           93 :         futures::future::pending().await
     790              :     }
     791            0 : }
     792              : 
     793              : /// Update next_event if candidate is earlier.
     794            0 : fn update_next_event(next_event: &mut Option<Instant>, candidate: Instant) {
     795            0 :     if let Some(next) = next_event {
     796            0 :         if candidate < *next {
     797            0 :             *next = candidate;
     798            0 :         }
     799            0 :     } else {
     800            0 :         *next_event = Some(candidate);
     801            0 :     }
     802            0 : }
     803              : 
     804              : #[repr(usize)]
     805            0 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
     806              : pub enum Status {
     807              :     NotStarted,
     808              :     Started,
     809              :     StateSnapshot,
     810              :     UpdateBackup,
     811              :     UpdateControlFile,
     812              :     UpdateWalRemoval,
     813              :     UpdatePartialBackup,
     814              :     EvictTimeline,
     815              :     Wait,
     816              :     HandleMessage,
     817              :     Exiting,
     818              :     Finished,
     819              : }
     820              : 
     821              : /// AtomicStatus is a wrapper around AtomicUsize adapted for the Status enum.
     822              : pub struct AtomicStatus {
     823              :     inner: AtomicUsize,
     824              : }
     825              : 
     826              : impl Default for AtomicStatus {
     827            0 :     fn default() -> Self {
     828            0 :         Self::new()
     829            0 :     }
     830              : }
     831              : 
     832              : impl AtomicStatus {
     833            5 :     pub fn new() -> Self {
     834            5 :         AtomicStatus {
     835            5 :             inner: AtomicUsize::new(Status::NotStarted as usize),
     836            5 :         }
     837            5 :     }
     838              : 
     839           10 :     pub fn load(&self, order: std::sync::atomic::Ordering) -> Status {
     840              :         // Safety: This line of code uses `std::mem::transmute` to reinterpret the loaded value as `Status`.
     841              :         // It is safe to use `transmute` in this context because `Status` is a repr(usize) enum,
     842              :         // which means it has the same memory layout as usize.
     843              :         // However, it is important to ensure that the loaded value is a valid variant of `Status`,
     844              :         // otherwise, the behavior will be undefined.
     845           10 :         unsafe { std::mem::transmute(self.inner.load(order)) }
     846           10 :     }
     847              : 
     848           10 :     pub fn get(&self) -> Status {
     849           10 :         self.load(std::sync::atomic::Ordering::Relaxed)
     850           10 :     }
     851              : 
     852          318 :     pub fn store(&self, val: Status, order: std::sync::atomic::Ordering) {
     853          318 :         self.inner.store(val as usize, order);
     854          318 :     }
     855              : }
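
A minimal usage sketch of AtomicStatus:

    use std::sync::atomic::Ordering;

    let status = AtomicStatus::new();
    assert_eq!(status.get(), Status::NotStarted);
    status.store(Status::Started, Ordering::Relaxed);
    assert_eq!(status.load(Ordering::Relaxed), Status::Started);
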
        

Generated by: LCOV version 2.1-beta