LCOV - code coverage report
Current view: top level - safekeeper/src - timeline_manager.rs (source / functions)
Test: 12c2fc96834f59604b8ade5b9add28f1dce41ec6.info
Test Date: 2024-07-03 15:33:13

            Coverage   Total   Hit
Lines:         0.0 %     294     0
Functions:     0.0 %      51     0

            Line data    Source code
       1              : //! The timeline manager task is responsible for managing the timeline's background tasks.
       2              : //! It is spawned alongside each timeline and exits when the timeline is deleted.
       3              : //! It watches for changes in the timeline state and decides when to spawn or kill background tasks.
       4              : //! It can also manage some reactive state, such as whether the timeline should be active for broker pushes.
       5              : //!
       6              : //! Be aware that you need to be extra careful with manager code, because it is not respawned on panic.
       7              : //! Also, if it gets stuck in some branch, it will block any further progress in the timeline.
       8              : 
       9              : use std::{
      10              :     sync::{atomic::AtomicUsize, Arc},
      11              :     time::Duration,
      12              : };
      13              : 
      14              : use postgres_ffi::XLogSegNo;
      15              : use serde::{Deserialize, Serialize};
      16              : use tokio::{
      17              :     task::{JoinError, JoinHandle},
      18              :     time::Instant,
      19              : };
      20              : use tracing::{debug, info, info_span, instrument, warn, Instrument};
      21              : use utils::lsn::Lsn;
      22              : 
      23              : use crate::{
      24              :     control_file::{FileStorage, Storage},
      25              :     metrics::{MANAGER_ACTIVE_CHANGES, MANAGER_ITERATIONS_TOTAL, MISC_OPERATION_SECONDS},
      26              :     recovery::recovery_main,
      27              :     remove_wal::calc_horizon_lsn,
      28              :     safekeeper::Term,
      29              :     send_wal::WalSenders,
      30              :     state::TimelineState,
      31              :     timeline::{ManagerTimeline, PeerInfo, ReadGuardSharedState, StateSK, WalResidentTimeline},
      32              :     timeline_guard::{AccessService, GuardId, ResidenceGuard},
      33              :     timelines_set::{TimelineSetGuard, TimelinesSet},
      34              :     wal_backup::{self, WalBackupTaskHandle},
      35              :     wal_backup_partial::{self, PartialRemoteSegment, RateLimiter},
      36              :     SafeKeeperConf,
      37              : };
      38              : 
      39              : pub(crate) struct StateSnapshot {
      40              :     // inmem values
      41              :     pub(crate) commit_lsn: Lsn,
      42              :     pub(crate) backup_lsn: Lsn,
      43              :     pub(crate) remote_consistent_lsn: Lsn,
      44              : 
      45              :     // persistent control file values
      46              :     pub(crate) cfile_peer_horizon_lsn: Lsn,
      47              :     pub(crate) cfile_remote_consistent_lsn: Lsn,
      48              :     pub(crate) cfile_backup_lsn: Lsn,
      49              : 
      50              :     // latest state
      51              :     pub(crate) flush_lsn: Lsn,
      52              :     pub(crate) last_log_term: Term,
      53              : 
      54              :     // misc
      55              :     pub(crate) cfile_last_persist_at: std::time::Instant,
      56              :     pub(crate) inmem_flush_pending: bool,
      57              :     pub(crate) wal_removal_on_hold: bool,
      58              :     pub(crate) peers: Vec<PeerInfo>,
      59              : }
      60              : 
      61              : impl StateSnapshot {
      62              :     /// Create a new snapshot of the timeline state.
      63            0 :     fn new(read_guard: ReadGuardSharedState, heartbeat_timeout: Duration) -> Self {
      64            0 :         let state = read_guard.sk.state();
      65            0 :         Self {
      66            0 :             commit_lsn: state.inmem.commit_lsn,
      67            0 :             backup_lsn: state.inmem.backup_lsn,
      68            0 :             remote_consistent_lsn: state.inmem.remote_consistent_lsn,
      69            0 :             cfile_peer_horizon_lsn: state.peer_horizon_lsn,
      70            0 :             cfile_remote_consistent_lsn: state.remote_consistent_lsn,
      71            0 :             cfile_backup_lsn: state.backup_lsn,
      72            0 :             flush_lsn: read_guard.sk.flush_lsn(),
      73            0 :             last_log_term: read_guard.sk.last_log_term(),
      74            0 :             cfile_last_persist_at: state.pers.last_persist_at(),
      75            0 :             inmem_flush_pending: Self::has_unflushed_inmem_state(state),
      76            0 :             wal_removal_on_hold: read_guard.wal_removal_on_hold,
      77            0 :             peers: read_guard.get_peers(heartbeat_timeout),
      78            0 :         }
      79            0 :     }
      80              : 
      81            0 :     fn has_unflushed_inmem_state(state: &TimelineState<FileStorage>) -> bool {
      82            0 :         state.inmem.commit_lsn > state.commit_lsn
      83            0 :             || state.inmem.backup_lsn > state.backup_lsn
      84            0 :             || state.inmem.peer_horizon_lsn > state.peer_horizon_lsn
      85            0 :             || state.inmem.remote_consistent_lsn > state.remote_consistent_lsn
      86            0 :     }
      87              : }
      88              : 
      89              : /// Control how often the manager task should wake up to check for updates.
      90              : /// There is no need to check for updates more often than this.
      91              : const REFRESH_INTERVAL: Duration = Duration::from_millis(300);
      92              : 
      93              : pub enum ManagerCtlMessage {
      94              :     /// Request to get a guard for WalResidentTimeline, with WAL files available locally.
      95              :     GuardRequest(tokio::sync::oneshot::Sender<anyhow::Result<ResidenceGuard>>),
      96              :     /// Request to drop the guard.
      97              :     GuardDrop(GuardId),
      98              : }
      99              : 
     100              : impl std::fmt::Debug for ManagerCtlMessage {
     101            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     102            0 :         match self {
     103            0 :             ManagerCtlMessage::GuardRequest(_) => write!(f, "GuardRequest"),
     104            0 :             ManagerCtlMessage::GuardDrop(id) => write!(f, "GuardDrop({:?})", id),
     105              :         }
     106            0 :     }
     107              : }
     108              : 
     109              : pub struct ManagerCtl {
     110              :     manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
     111              : 
     112              :     // This receiver is used to initialize the manager; it is moved out in bootstrap_manager().
     113              :     init_manager_rx:
     114              :         std::sync::Mutex<Option<tokio::sync::mpsc::UnboundedReceiver<ManagerCtlMessage>>>,
     115              : }
     116              : 
     117              : impl Default for ManagerCtl {
     118            0 :     fn default() -> Self {
     119            0 :         Self::new()
     120            0 :     }
     121              : }
     122              : 
     123              : impl ManagerCtl {
     124            0 :     pub fn new() -> Self {
     125            0 :         let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
     126            0 :         Self {
     127            0 :             manager_tx: tx,
     128            0 :             init_manager_rx: std::sync::Mutex::new(Some(rx)),
     129            0 :         }
     130            0 :     }
     131              : 
     132              :     /// Issue a new guard and wait for the manager to prepare the timeline.
     133              :     /// Sends a message to the manager and waits for the response.
     134              :     /// May block indefinitely if the manager is stuck.
     135            0 :     pub async fn wal_residence_guard(&self) -> anyhow::Result<ResidenceGuard> {
     136            0 :         let (tx, rx) = tokio::sync::oneshot::channel();
     137            0 :         self.manager_tx.send(ManagerCtlMessage::GuardRequest(tx))?;
     138              : 
     139              :         // wait for the manager to respond with the guard
     140            0 :         rx.await
     141            0 :             .map_err(|e| anyhow::anyhow!("response read fail: {:?}", e))
     142            0 :             .and_then(std::convert::identity)
     143            0 :     }
     144              : 
     145              :     /// Must be called exactly once to bootstrap the manager.
     146            0 :     pub fn bootstrap_manager(
     147            0 :         &self,
     148            0 :     ) -> (
     149            0 :         tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
     150            0 :         tokio::sync::mpsc::UnboundedReceiver<ManagerCtlMessage>,
     151            0 :     ) {
     152            0 :         let rx = self
     153            0 :             .init_manager_rx
     154            0 :             .lock()
     155            0 :             .expect("mutex init_manager_rx poisoned")
     156            0 :             .take()
     157            0 :             .expect("manager already bootstrapped");
     158            0 : 
     159            0 :         (self.manager_tx.clone(), rx)
     160            0 :     }
     161              : }
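
// A minimal sketch of the intended ManagerCtl handshake, assuming a caller that
// owns the timeline. The `spawn_timeline` helper and its argument list are
// hypothetical; `ManagerCtl::new`, `bootstrap_manager`, `wal_residence_guard`
// and `main_task` are the items defined in this module.
async fn spawn_timeline(
    tli: ManagerTimeline,
    conf: SafeKeeperConf,
    broker_active_set: Arc<TimelinesSet>,
    limiter: RateLimiter,
) -> ManagerCtl {
    let ctl = ManagerCtl::new();
    // Must be called exactly once: it takes the receiver out of the
    // Mutex<Option<...>> and panics on a second call.
    let (manager_tx, manager_rx) = ctl.bootstrap_manager();
    tokio::spawn(main_task(
        tli,
        conf,
        broker_active_set,
        manager_tx,
        manager_rx,
        limiter,
    ));
    ctl
}

// Any other task can then request a residence guard:
//     let _guard = ctl.wal_residence_guard().await?;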
     162              : 
     163              : pub(crate) struct Manager {
     164              :     // configuration & dependencies
     165              :     pub(crate) tli: ManagerTimeline,
     166              :     pub(crate) conf: SafeKeeperConf,
     167              :     pub(crate) wal_seg_size: usize,
     168              :     pub(crate) walsenders: Arc<WalSenders>,
     169              : 
     170              :     // current state
     171              :     pub(crate) state_version_rx: tokio::sync::watch::Receiver<usize>,
     172              :     pub(crate) num_computes_rx: tokio::sync::watch::Receiver<usize>,
     173              :     pub(crate) tli_broker_active: TimelineSetGuard,
     174              :     pub(crate) last_removed_segno: XLogSegNo,
     175              :     pub(crate) is_offloaded: bool,
     176              : 
     177              :     // background tasks
     178              :     pub(crate) backup_task: Option<WalBackupTaskHandle>,
     179              :     pub(crate) recovery_task: Option<JoinHandle<()>>,
     180              :     pub(crate) wal_removal_task: Option<JoinHandle<anyhow::Result<u64>>>,
     181              : 
     182              :     // partial backup
     183              :     pub(crate) partial_backup_task: Option<JoinHandle<Option<PartialRemoteSegment>>>,
     184              :     pub(crate) partial_backup_uploaded: Option<PartialRemoteSegment>,
     185              : 
     186              :     // misc
     187              :     pub(crate) access_service: AccessService,
     188              :     pub(crate) partial_backup_rate_limiter: RateLimiter,
     189              : }
     190              : 
     191              : /// This task gets spawned alongside each timeline and is responsible for managing the timeline's
     192              : /// background tasks.
     193              : /// Be careful, this task is not respawned on panic, so it should not panic.
     194            0 : #[instrument(name = "manager", skip_all, fields(ttid = %tli.ttid))]
     195              : pub async fn main_task(
     196              :     tli: ManagerTimeline,
     197              :     conf: SafeKeeperConf,
     198              :     broker_active_set: Arc<TimelinesSet>,
     199              :     manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
     200              :     mut manager_rx: tokio::sync::mpsc::UnboundedReceiver<ManagerCtlMessage>,
     201              :     partial_backup_rate_limiter: RateLimiter,
     202              : ) {
     203              :     tli.set_status(Status::Started);
     204              : 
     205              :     let defer_tli = tli.tli.clone();
     206              :     scopeguard::defer! {
     207              :         if defer_tli.is_cancelled() {
     208              :             info!("manager task finished");
     209              :         } else {
     210              :             warn!("manager task finished prematurely");
     211              :         }
     212              :     };
     213              : 
     214              :     let mut mgr = Manager::new(
     215              :         tli,
     216              :         conf,
     217              :         broker_active_set,
     218              :         manager_tx,
     219              :         partial_backup_rate_limiter,
     220              :     )
     221              :     .await;
     222              : 
     223              :     // Start recovery task which always runs on the timeline.
     224              :     if !mgr.is_offloaded && mgr.conf.peer_recovery_enabled {
     225              :         let tli = mgr.wal_resident_timeline();
     226              :         mgr.recovery_task = Some(tokio::spawn(recovery_main(tli, mgr.conf.clone())));
     227              :     }
     228              : 
     229              :     let last_state = 'outer: loop {
     230              :         MANAGER_ITERATIONS_TOTAL.inc();
     231              : 
     232              :         mgr.set_status(Status::StateSnapshot);
     233              :         let state_snapshot = mgr.state_snapshot().await;
     234              : 
     235              :         let mut next_event: Option<Instant> = None;
     236              :         if !mgr.is_offloaded {
     237              :             let num_computes = *mgr.num_computes_rx.borrow();
     238              : 
     239              :             mgr.set_status(Status::UpdateBackup);
     240              :             let is_wal_backup_required = mgr.update_backup(num_computes, &state_snapshot).await;
     241              :             mgr.update_is_active(is_wal_backup_required, num_computes, &state_snapshot);
     242              : 
     243              :             mgr.set_status(Status::UpdateControlFile);
     244              :             mgr.update_control_file_save(&state_snapshot, &mut next_event)
     245              :                 .await;
     246              : 
     247              :             mgr.set_status(Status::UpdateWalRemoval);
     248              :             mgr.update_wal_removal(&state_snapshot).await;
     249              : 
     250              :             mgr.set_status(Status::UpdatePartialBackup);
     251              :             mgr.update_partial_backup(&state_snapshot).await;
     252              : 
     253              :             if mgr.conf.enable_offload && mgr.ready_for_eviction(&next_event, &state_snapshot) {
     254              :                 mgr.set_status(Status::EvictTimeline);
     255              :                 mgr.evict_timeline().await;
     256              :             }
     257              :         }
     258              : 
     259              :         mgr.set_status(Status::Wait);
     260              :         // wait until something changes. tx channels are stored under Arc, so they will not be
     261              :         // dropped until the manager task is finished.
     262              :         tokio::select! {
     263              :             _ = mgr.tli.cancel.cancelled() => {
     264              :                 // timeline was deleted
     265              :                 break 'outer state_snapshot;
     266              :             }
     267            0 :             _ = async {
     268            0 :                 // don't wake up on every state change, but at most every REFRESH_INTERVAL
     269            0 :                 tokio::time::sleep(REFRESH_INTERVAL).await;
     270            0 :                 let _ = mgr.state_version_rx.changed().await;
     271            0 :             } => {
     272              :                 // state was updated
     273              :             }
     274              :             _ = mgr.num_computes_rx.changed() => {
     275              :                 // number of connected computes was updated
     276              :             }
     277              :             _ = sleep_until(&next_event) => {
     278              :                 // we were waiting for some event (e.g. cfile save)
     279              :             }
     280              :             res = await_task_finish(&mut mgr.wal_removal_task) => {
     281              :                 // WAL removal task finished
     282              :                 mgr.wal_removal_task = None;
     283              :                 mgr.update_wal_removal_end(res);
     284              :             }
     285              :             res = await_task_finish(&mut mgr.partial_backup_task) => {
     286              :                 // partial backup task finished
     287              :                 mgr.partial_backup_task = None;
     288              :                 mgr.update_partial_backup_end(res);
     289              :             }
     290              : 
     291              :             msg = manager_rx.recv() => {
     292              :                 mgr.set_status(Status::HandleMessage);
     293              :                 mgr.handle_message(msg).await;
     294              :             }
     295              :         }
     296              :     };
     297              :     mgr.set_status(Status::Exiting);
     298              : 
     299              :     // remove timeline from the broker active set sooner, before waiting for background tasks
     300              :     mgr.tli_broker_active.set(false);
     301              : 
     302              :     // shutdown background tasks
     303              :     if mgr.conf.is_wal_backup_enabled() {
     304              :         wal_backup::update_task(&mut mgr, false, &last_state).await;
     305              :     }
     306              : 
     307              :     if let Some(recovery_task) = &mut mgr.recovery_task {
     308              :         if let Err(e) = recovery_task.await {
     309              :             warn!("recovery task failed: {:?}", e);
     310              :         }
     311              :     }
     312              : 
     313              :     if let Some(partial_backup_task) = &mut mgr.partial_backup_task {
     314              :         if let Err(e) = partial_backup_task.await {
     315              :             warn!("partial backup task failed: {:?}", e);
     316              :         }
     317              :     }
     318              : 
     319              :     if let Some(wal_removal_task) = &mut mgr.wal_removal_task {
     320              :         let res = wal_removal_task.await;
     321              :         mgr.update_wal_removal_end(res);
     322              :     }
     323              : 
     324              :     mgr.set_status(Status::Finished);
     325              : }
     326              : 
     327              : impl Manager {
     328            0 :     async fn new(
     329            0 :         tli: ManagerTimeline,
     330            0 :         conf: SafeKeeperConf,
     331            0 :         broker_active_set: Arc<TimelinesSet>,
     332            0 :         manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
     333            0 :         partial_backup_rate_limiter: RateLimiter,
     334            0 :     ) -> Manager {
     335            0 :         let (is_offloaded, partial_backup_uploaded) = tli.bootstrap_mgr().await;
     336              :         Manager {
     337            0 :             conf,
     338            0 :             wal_seg_size: tli.get_wal_seg_size().await,
     339            0 :             walsenders: tli.get_walsenders().clone(),
     340            0 :             state_version_rx: tli.get_state_version_rx(),
     341            0 :             num_computes_rx: tli.get_walreceivers().get_num_rx(),
     342            0 :             tli_broker_active: broker_active_set.guard(tli.clone()),
     343            0 :             last_removed_segno: 0,
     344            0 :             is_offloaded,
     345            0 :             backup_task: None,
     346            0 :             recovery_task: None,
     347            0 :             wal_removal_task: None,
     348            0 :             partial_backup_task: None,
     349            0 :             partial_backup_uploaded,
     350            0 :             access_service: AccessService::new(manager_tx),
     351            0 :             tli,
     352            0 :             partial_backup_rate_limiter,
     353            0 :         }
     354            0 :     }
     355              : 
     356            0 :     fn set_status(&self, status: Status) {
     357            0 :         self.tli.set_status(status);
     358            0 :     }
     359              : 
     360              :     /// Get a WalResidentTimeline.
     361              :     /// Manager code must use this function instead of the one on `Timeline`,
     362              :     /// because calling the `Timeline` version from the manager would deadlock.
     363            0 :     pub(crate) fn wal_resident_timeline(&mut self) -> WalResidentTimeline {
     364            0 :         assert!(!self.is_offloaded);
     365            0 :         let guard = self.access_service.create_guard();
     366            0 :         WalResidentTimeline::new(self.tli.clone(), guard)
     367            0 :     }
     368              : 
     369              :     /// Get a snapshot of the timeline state.
     370            0 :     async fn state_snapshot(&self) -> StateSnapshot {
     371            0 :         let _timer = MISC_OPERATION_SECONDS
     372            0 :             .with_label_values(&["state_snapshot"])
     373            0 :             .start_timer();
     374            0 : 
     375            0 :         StateSnapshot::new(
     376            0 :             self.tli.read_shared_state().await,
     377            0 :             self.conf.heartbeat_timeout,
     378            0 :         )
     379            0 :     }
     380              : 
     381              :     /// Spawns/kills backup task and returns true if backup is required.
     382            0 :     async fn update_backup(&mut self, num_computes: usize, state: &StateSnapshot) -> bool {
     383            0 :         let is_wal_backup_required =
     384            0 :             wal_backup::is_wal_backup_required(self.wal_seg_size, num_computes, state);
     385            0 : 
     386            0 :         if self.conf.is_wal_backup_enabled() {
     387            0 :             wal_backup::update_task(self, is_wal_backup_required, state).await;
     388            0 :         }
     389              : 
     390              :         // update the state in Arc<Timeline>
     391            0 :         self.tli.wal_backup_active.store(
     392            0 :             self.backup_task.is_some(),
     393            0 :             std::sync::atomic::Ordering::Relaxed,
     394            0 :         );
     395            0 :         is_wal_backup_required
     396            0 :     }
     397              : 
     398              :     /// Update the is_active flag and propagate it to the broker timeline set and Arc<Timeline>.
     399            0 :     fn update_is_active(
     400            0 :         &mut self,
     401            0 :         is_wal_backup_required: bool,
     402            0 :         num_computes: usize,
     403            0 :         state: &StateSnapshot,
     404            0 :     ) {
     405            0 :         let is_active = is_wal_backup_required
     406            0 :             || num_computes > 0
     407            0 :             || state.remote_consistent_lsn < state.commit_lsn;
     408              : 
     409              :         // update the broker timeline set
     410            0 :         if self.tli_broker_active.set(is_active) {
     411              :             // write log if state has changed
     412            0 :             info!(
     413            0 :                 "timeline active={} now, remote_consistent_lsn={}, commit_lsn={}",
     414              :                 is_active, state.remote_consistent_lsn, state.commit_lsn,
     415              :             );
     416              : 
     417            0 :             MANAGER_ACTIVE_CHANGES.inc();
     418            0 :         }
     419              : 
     420              :         // update the state in Arc<Timeline>
     421            0 :         self.tli
     422            0 :             .broker_active
     423            0 :             .store(is_active, std::sync::atomic::Ordering::Relaxed);
     424            0 :     }
     425              : 
     426              :     /// Save the control file if needed. If the save is postponed, updates `next_event` with the time when it should happen.
     427            0 :     async fn update_control_file_save(
     428            0 :         &self,
     429            0 :         state: &StateSnapshot,
     430            0 :         next_event: &mut Option<Instant>,
     431            0 :     ) {
     432            0 :         if !state.inmem_flush_pending {
     433            0 :             return;
     434            0 :         }
     435            0 : 
     436            0 :         if state.cfile_last_persist_at.elapsed() > self.conf.control_file_save_interval {
     437            0 :             let mut write_guard = self.tli.write_shared_state().await;
     438              :             // Ideally this would be done in the background, because it blocks the manager
     439              :             // task; but flush() should be fast enough not to be a problem for now.
     440            0 :             if let Err(e) = write_guard.sk.state_mut().flush().await {
     441            0 :                 warn!("failed to save control file: {:?}", e);
     442            0 :             }
     443            0 :         } else {
     444            0 :             // we should wait until enough time has passed before the next save
     445            0 :             update_next_event(
     446            0 :                 next_event,
     447            0 :                 (state.cfile_last_persist_at + self.conf.control_file_save_interval).into(),
     448            0 :             );
     449            0 :         }
     450            0 :     }
     451              : 
     452              :     /// Spawns WAL removal task if needed.
     453            0 :     async fn update_wal_removal(&mut self, state: &StateSnapshot) {
     454            0 :         if self.wal_removal_task.is_some() || state.wal_removal_on_hold {
     455              :             // WAL removal is already in progress or on hold
     456            0 :             return;
     457            0 :         }
     458              : 
     459              :         // If enabled, we use the LSN of the most lagging walsender as the WAL removal horizon.
     460              :         // This gives better read speed for pageservers that are lagging behind,
     461              :         // at the cost of keeping more WAL on disk.
     462            0 :         let replication_horizon_lsn = if self.conf.walsenders_keep_horizon {
     463            0 :             self.walsenders.laggard_lsn()
     464              :         } else {
     465            0 :             None
     466              :         };
     467              : 
     468            0 :         let removal_horizon_lsn = calc_horizon_lsn(state, replication_horizon_lsn);
     469            0 :         let removal_horizon_segno = removal_horizon_lsn
     470            0 :             .segment_number(self.wal_seg_size)
     471            0 :             .saturating_sub(1);
     472            0 : 
     473            0 :         if removal_horizon_segno > self.last_removed_segno {
     474              :             // we need to remove WAL
     475            0 :             let remover = match self.tli.read_shared_state().await.sk {
     476            0 :                 StateSK::Loaded(ref sk) => {
     477            0 :                     crate::wal_storage::Storage::remove_up_to(&sk.wal_store, removal_horizon_segno)
     478              :                 }
     479              :                 StateSK::Offloaded(_) => {
     480              :                     // we can't remove WAL if it's not loaded
     481            0 :                     warn!("unexpectedly trying to run WAL removal on offloaded timeline");
     482            0 :                     return;
     483              :                 }
     484            0 :                 StateSK::Empty => unreachable!(),
     485              :             };
     486              : 
     487            0 :             self.wal_removal_task = Some(tokio::spawn(
     488            0 :                 async move {
     489            0 :                     remover.await?;
     490            0 :                     Ok(removal_horizon_segno)
     491            0 :                 }
     492            0 :                 .instrument(info_span!("WAL removal", ttid=%self.tli.ttid)),
     493              :             ));
     494            0 :         }
     495            0 :     }
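
    // A worked example of the horizon arithmetic above, assuming the standard
    // 16 MiB WAL segment size; the LSN value is illustrative. `saturating_sub(1)`
    // keeps the segment containing the horizon LSN itself, and the resulting
    // segment number is what gets passed to `remove_up_to`.
    fn example_removal_horizon() {
        let wal_seg_size: usize = 16 * 1024 * 1024; // 0x0100_0000
        let removal_horizon_lsn = Lsn(0x0540_0000); // lies inside segment 5
        let removal_horizon_segno = removal_horizon_lsn
            .segment_number(wal_seg_size)
            .saturating_sub(1);
        assert_eq!(removal_horizon_segno, 4);
    }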
     496              : 
     497              :     /// Update the state after WAL removal task finished.
     498            0 :     fn update_wal_removal_end(&mut self, res: Result<anyhow::Result<u64>, JoinError>) {
     499            0 :         let new_last_removed_segno = match res {
     500            0 :             Ok(Ok(segno)) => segno,
     501            0 :             Err(e) => {
     502            0 :                 warn!("WAL removal task failed: {:?}", e);
     503            0 :                 return;
     504              :             }
     505            0 :             Ok(Err(e)) => {
     506            0 :                 warn!("WAL removal task failed: {:?}", e);
     507            0 :                 return;
     508              :             }
     509              :         };
     510              : 
     511            0 :         self.last_removed_segno = new_last_removed_segno;
     512            0 :         // update the state in Arc<Timeline>
     513            0 :         self.tli
     514            0 :             .last_removed_segno
     515            0 :             .store(new_last_removed_segno, std::sync::atomic::Ordering::Relaxed);
     516            0 :     }
     517              : 
     518              :     /// Spawns partial WAL backup task if needed.
     519            0 :     async fn update_partial_backup(&mut self, state: &StateSnapshot) {
     520            0 :         // check if partial backup is enabled and should be started
     521            0 :         if !self.conf.is_wal_backup_enabled() || !self.conf.partial_backup_enabled {
     522            0 :             return;
     523            0 :         }
     524            0 : 
     525            0 :         if self.partial_backup_task.is_some() {
     526              :             // partial backup is already running
     527            0 :             return;
     528            0 :         }
     529            0 : 
     530            0 :         if !wal_backup_partial::needs_uploading(state, &self.partial_backup_uploaded) {
     531              :             // nothing to upload
     532            0 :             return;
     533            0 :         }
     534            0 : 
     535            0 :         // Get WalResidentTimeline and start partial backup task.
     536            0 :         self.partial_backup_task = Some(tokio::spawn(wal_backup_partial::main_task(
     537            0 :             self.wal_resident_timeline(),
     538            0 :             self.conf.clone(),
     539            0 :             self.partial_backup_rate_limiter.clone(),
     540            0 :         )));
     541            0 :     }
     542              : 
     543              :     /// Update the state after partial WAL backup task finished.
     544            0 :     fn update_partial_backup_end(&mut self, res: Result<Option<PartialRemoteSegment>, JoinError>) {
     545            0 :         match res {
     546            0 :             Ok(new_upload_state) => {
     547            0 :                 self.partial_backup_uploaded = new_upload_state;
     548            0 :             }
     549            0 :             Err(e) => {
     550            0 :                 warn!("partial backup task panicked: {:?}", e);
     551              :             }
     552              :         }
     553            0 :     }
     554              : 
     555              :     /// Handle a message that arrived from ManagerCtl.
     556            0 :     async fn handle_message(&mut self, msg: Option<ManagerCtlMessage>) {
     557            0 :         debug!("received manager message: {:?}", msg);
     558            0 :         match msg {
     559            0 :             Some(ManagerCtlMessage::GuardRequest(tx)) => {
     560            0 :                 if self.is_offloaded {
     561              :                     // trying to unevict the timeline, but without a guarantee that it will be successful
     562            0 :                     self.unevict_timeline().await;
     563            0 :                 }
     564              : 
     565            0 :                 let guard = if self.is_offloaded {
     566            0 :                     Err(anyhow::anyhow!("timeline is offloaded, can't get a guard"))
     567              :                 } else {
     568            0 :                     Ok(self.access_service.create_guard())
     569              :                 };
     570              : 
     571            0 :                 if tx.send(guard).is_err() {
     572            0 :                     warn!("failed to reply with a guard, receiver dropped");
     573            0 :                 }
     574              :             }
     575            0 :             Some(ManagerCtlMessage::GuardDrop(guard_id)) => {
     576            0 :                 self.access_service.drop_guard(guard_id);
     577            0 :             }
     578              :             None => {
     579              :                 // can't happen, we're holding the sender
     580            0 :                 unreachable!();
     581              :             }
     582              :         }
     583            0 :     }
     584              : }
     585              : 
     586              : // utility functions
     587            0 : async fn sleep_until(option: &Option<tokio::time::Instant>) {
     588            0 :     if let Some(timeout) = option {
     589            0 :         tokio::time::sleep_until(*timeout).await;
     590              :     } else {
     591            0 :         futures::future::pending::<()>().await;
     592              :     }
     593            0 : }
     594              : 
     595            0 : async fn await_task_finish<T>(option: &mut Option<JoinHandle<T>>) -> Result<T, JoinError> {
     596            0 :     if let Some(task) = option {
     597            0 :         task.await
     598              :     } else {
     599            0 :         futures::future::pending().await
     600              :     }
     601            0 : }
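
// A small sketch showing why both helpers above fall back to
// `futures::future::pending()`: with no deadline or no running task, the
// corresponding `select!` arm in `main_task` never fires instead of completing
// immediately in a busy loop. The function itself is illustrative.
async fn example_inert_select_arm() {
    tokio::select! {
        _ = sleep_until(&None) => unreachable!("a pending future never completes"),
        _ = tokio::time::sleep(Duration::from_millis(10)) => {
            // this arm always wins when there is no deadline
        }
    }
}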
     602              : 
     603              : /// Update next_event if candidate is earlier.
     604            0 : fn update_next_event(next_event: &mut Option<Instant>, candidate: Instant) {
     605            0 :     if let Some(next) = next_event {
     606            0 :         if candidate < *next {
     607            0 :             *next = candidate;
     608            0 :         }
     609            0 :     } else {
     610            0 :         *next_event = Some(candidate);
     611            0 :     }
     612            0 : }
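
// A minimal sketch of how the event loop uses `update_next_event` to coalesce
// several proposed deadlines into the single earliest wake-up time that feeds
// `sleep_until`; the durations are illustrative.
fn example_update_next_event() {
    let now = Instant::now();
    let mut next_event: Option<Instant> = None;
    update_next_event(&mut next_event, now + Duration::from_secs(300)); // e.g. control file save
    update_next_event(&mut next_event, now + Duration::from_secs(60)); // earlier deadline wins
    assert_eq!(next_event, Some(now + Duration::from_secs(60)));
}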
     613              : 
     614              : #[repr(usize)]
     615            0 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
     616              : pub enum Status {
     617              :     NotStarted,
     618              :     Started,
     619              :     StateSnapshot,
     620              :     UpdateBackup,
     621              :     UpdateControlFile,
     622              :     UpdateWalRemoval,
     623              :     UpdatePartialBackup,
     624              :     EvictTimeline,
     625              :     Wait,
     626              :     HandleMessage,
     627              :     Exiting,
     628              :     Finished,
     629              : }
     630              : 
     631              : /// AtomicStatus is a wrapper around AtomicUsize adapted for the Status enum.
     632              : pub struct AtomicStatus {
     633              :     inner: AtomicUsize,
     634              : }
     635              : 
     636              : impl Default for AtomicStatus {
     637            0 :     fn default() -> Self {
     638            0 :         Self::new()
     639            0 :     }
     640              : }
     641              : 
     642              : impl AtomicStatus {
     643            0 :     pub fn new() -> Self {
     644            0 :         AtomicStatus {
     645            0 :             inner: AtomicUsize::new(Status::NotStarted as usize),
     646            0 :         }
     647            0 :     }
     648              : 
     649            0 :     pub fn load(&self, order: std::sync::atomic::Ordering) -> Status {
     650            0 :         // Safety: This line of code uses `std::mem::transmute` to reinterpret the loaded value as `Status`.
     651            0 :         // It is safe to use `transmute` in this context because `Status` is a repr(usize) enum,
     652            0 :         // which means it has the same memory layout as usize.
     653            0 :         // However, it is important to ensure that the loaded value is a valid variant of `Status`,
     654            0 :         // otherwise, the behavior will be undefined.
     655            0 :         unsafe { std::mem::transmute(self.inner.load(order)) }
     656            0 :     }
     657              : 
     658            0 :     pub fn get(&self) -> Status {
     659            0 :         self.load(std::sync::atomic::Ordering::Relaxed)
     660            0 :     }
     661              : 
     662            0 :     pub fn store(&self, val: Status, order: std::sync::atomic::Ordering) {
     663            0 :         self.inner.store(val as usize, order);
     664            0 :     }
     665              : }
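
// A minimal usage sketch. The soundness of the `transmute` in `load` rests on
// the invariant that `store` is the only writer and it only ever writes
// `Status as usize` discriminants.
fn example_atomic_status() {
    let status = AtomicStatus::new();
    assert_eq!(status.get(), Status::NotStarted);
    status.store(Status::Wait, std::sync::atomic::Ordering::Relaxed);
    assert_eq!(status.get(), Status::Wait);
}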
        

Generated by: LCOV version 2.1-beta