|             Line data    Source code 
//! The timeline manager task is responsible for managing the timeline's background tasks.
//! It is spawned alongside each timeline and exits when the timeline is deleted.
//! It watches for changes in the timeline state and decides when to spawn or kill background tasks.
//! It can also manage some reactive state, such as whether the timeline should be active for
//! broker pushes.
       5              : 
       6              : use std::{
       7              :     sync::Arc,
       8              :     time::{Duration, Instant},
       9              : };
      10              : 
      11              : use postgres_ffi::XLogSegNo;
      12              : use tokio::task::{JoinError, JoinHandle};
      13              : use tracing::{info, info_span, instrument, warn, Instrument};
      14              : use utils::lsn::Lsn;
      15              : 
      16              : use crate::{
      17              :     control_file::Storage,
      18              :     metrics::{MANAGER_ACTIVE_CHANGES, MANAGER_ITERATIONS_TOTAL},
      19              :     recovery::recovery_main,
      20              :     remove_wal::calc_horizon_lsn,
      21              :     send_wal::WalSenders,
      22              :     timeline::{PeerInfo, ReadGuardSharedState, Timeline},
      23              :     timelines_set::{TimelineSetGuard, TimelinesSet},
      24              :     wal_backup::{self, WalBackupTaskHandle},
      25              :     wal_backup_partial, SafeKeeperConf,
      26              : };
      27              : 
      28              : pub struct StateSnapshot {
      29              :     // inmem values
      30              :     pub commit_lsn: Lsn,
      31              :     pub backup_lsn: Lsn,
      32              :     pub remote_consistent_lsn: Lsn,
      33              : 
      34              :     // persistent control file values
      35              :     pub cfile_peer_horizon_lsn: Lsn,
      36              :     pub cfile_remote_consistent_lsn: Lsn,
      37              :     pub cfile_backup_lsn: Lsn,
      38              : 
      39              :     // misc
      40              :     pub cfile_last_persist_at: Instant,
      41              :     pub inmem_flush_pending: bool,
      42              :     pub wal_removal_on_hold: bool,
      43              :     pub peers: Vec<PeerInfo>,
      44              : }
      45              : 
      46              : impl StateSnapshot {
      47              :     /// Create a new snapshot of the timeline state.
      48            0 :     fn new(read_guard: ReadGuardSharedState, heartbeat_timeout: Duration) -> Self {
      49            0 :         Self {
      50            0 :             commit_lsn: read_guard.sk.state.inmem.commit_lsn,
      51            0 :             backup_lsn: read_guard.sk.state.inmem.backup_lsn,
      52            0 :             remote_consistent_lsn: read_guard.sk.state.inmem.remote_consistent_lsn,
      53            0 :             cfile_peer_horizon_lsn: read_guard.sk.state.peer_horizon_lsn,
      54            0 :             cfile_remote_consistent_lsn: read_guard.sk.state.remote_consistent_lsn,
      55            0 :             cfile_backup_lsn: read_guard.sk.state.backup_lsn,
      56            0 :             cfile_last_persist_at: read_guard.sk.state.pers.last_persist_at(),
      57            0 :             inmem_flush_pending: Self::has_unflushed_inmem_state(&read_guard),
      58            0 :             wal_removal_on_hold: read_guard.wal_removal_on_hold,
      59            0 :             peers: read_guard.get_peers(heartbeat_timeout),
      60            0 :         }
      61            0 :     }
      62              : 
      63            0 :     fn has_unflushed_inmem_state(read_guard: &ReadGuardSharedState) -> bool {
      64            0 :         let state = &read_guard.sk.state;
      65            0 :         state.inmem.commit_lsn > state.commit_lsn
      66            0 :             || state.inmem.backup_lsn > state.backup_lsn
      67            0 :             || state.inmem.peer_horizon_lsn > state.peer_horizon_lsn
      68            0 :             || state.inmem.remote_consistent_lsn > state.remote_consistent_lsn
      69            0 :     }
      70              : }
      71              : 
      72              : /// Control how often the manager task should wake up to check updates.
      73              : /// There is no need to check for updates more often than this.
      74              : const REFRESH_INTERVAL: Duration = Duration::from_millis(300);
      75              : 
      76              : /// How often to save the control file if the is no other activity.
      77              : const CF_SAVE_INTERVAL: Duration = Duration::from_secs(300);
      78              : 
      79              : /// This task gets spawned alongside each timeline and is responsible for managing the timeline's
      80              : /// background tasks.
      81              : /// Be careful, this task is not respawned on panic, so it should not panic.
      82            0 : #[instrument(name = "manager", skip_all, fields(ttid = %tli.ttid))]
      83              : pub async fn main_task(
      84              :     tli: Arc<Timeline>,
      85              :     conf: SafeKeeperConf,
      86              :     broker_active_set: Arc<TimelinesSet>,
      87              : ) {
      88              :     scopeguard::defer! {
      89              :         if tli.is_cancelled() {
      90              :             info!("manager task finished");
      91              :         } else {
      92              :             warn!("manager task finished prematurely");
      93              :         }
      94              :     };
      95              : 
      96              :     // configuration & dependencies
      97              :     let wal_seg_size = tli.get_wal_seg_size().await;
      98              :     let heartbeat_timeout = conf.heartbeat_timeout;
      99              :     let walsenders = tli.get_walsenders();
     100              :     let walreceivers = tli.get_walreceivers();
     101              : 
     102              :     // current state
     103              :     let mut state_version_rx = tli.get_state_version_rx();
     104              :     let mut num_computes_rx = walreceivers.get_num_rx();
     105              :     let mut tli_broker_active = broker_active_set.guard(tli.clone());
     106              :     let mut last_removed_segno = 0 as XLogSegNo;
     107              : 
     108              :     // list of background tasks
     109              :     let mut backup_task: Option<WalBackupTaskHandle> = None;
     110              :     let mut recovery_task: Option<JoinHandle<()>> = None;
     111              :     let mut partial_backup_task: Option<JoinHandle<()>> = None;
     112              :     let mut wal_removal_task: Option<JoinHandle<anyhow::Result<u64>>> = None;
     113              : 
     114              :     // Start recovery task which always runs on the timeline.
     115              :     if conf.peer_recovery_enabled {
     116              :         match tli.full_access_guard().await {
     117              :             Ok(tli) => {
     118              :                 recovery_task = Some(tokio::spawn(recovery_main(tli, conf.clone())));
     119              :             }
     120              :             Err(e) => {
     121              :                 warn!("failed to start recovery task: {:?}", e);
     122              :             }
     123              :         }
     124              :     }
     125              : 
     126              :     // Start partial backup task which always runs on the timeline.
     127              :     if conf.is_wal_backup_enabled() && conf.partial_backup_enabled {
     128              :         match tli.full_access_guard().await {
     129              :             Ok(tli) => {
     130              :                 partial_backup_task = Some(tokio::spawn(wal_backup_partial::main_task(
     131              :                     tli,
     132              :                     conf.clone(),
     133              :                 )));
     134              :             }
     135              :             Err(e) => {
     136              :                 warn!("failed to start partial backup task: {:?}", e);
     137              :             }
     138              :         }
     139              :     }
     140              : 
     141              :     let last_state = 'outer: loop {
     142              :         MANAGER_ITERATIONS_TOTAL.inc();
     143              : 
     144              :         let state_snapshot = StateSnapshot::new(tli.read_shared_state().await, heartbeat_timeout);
     145              :         let num_computes = *num_computes_rx.borrow();
     146              : 
     147              :         let is_wal_backup_required = update_backup(
     148              :             &conf,
     149              :             &tli,
     150              :             wal_seg_size,
     151              :             num_computes,
     152              :             &state_snapshot,
     153              :             &mut backup_task,
     154              :         )
     155              :         .await;
     156              : 
     157              :         let _is_active = update_is_active(
     158              :             is_wal_backup_required,
     159              :             num_computes,
     160              :             &state_snapshot,
     161              :             &mut tli_broker_active,
     162              :             &tli,
     163              :         );
     164              : 
     165              :         let next_cfile_save = update_control_file_save(&state_snapshot, &tli).await;
     166              : 
     167              :         update_wal_removal(
     168              :             &conf,
     169              :             walsenders,
     170              :             &tli,
     171              :             wal_seg_size,
     172              :             &state_snapshot,
     173              :             last_removed_segno,
     174              :             &mut wal_removal_task,
     175              :         )
     176              :         .await;
     177              : 
     178              :         // wait until something changes. tx channels are stored under Arc, so they will not be
     179              :         // dropped until the manager task is finished.
     180              :         tokio::select! {
     181              :             _ = tli.cancel.cancelled() => {
     182              :                 // timeline was deleted
     183              :                 break 'outer state_snapshot;
     184              :             }
     185            0 :             _ = async {
     186            0 :                 // don't wake up on every state change, but at most every REFRESH_INTERVAL
     187            0 :                 tokio::time::sleep(REFRESH_INTERVAL).await;
     188            0 :                 let _ = state_version_rx.changed().await;
     189            0 :             } => {
     190              :                 // state was updated
     191              :             }
     192              :             _ = num_computes_rx.changed() => {
     193              :                 // number of connected computes was updated
     194              :             }
     195            0 :             _ = async {
     196            0 :                 if let Some(timeout) = next_cfile_save {
     197            0 :                     tokio::time::sleep_until(timeout).await
     198              :                 } else {
     199            0 :                     futures::future::pending().await
     200              :                 }
     201            0 :             } => {
     202              :                 // it's time to save the control file
     203              :             }
     204            0 :             res = async {
     205            0 :                 if let Some(task) = &mut wal_removal_task {
     206            0 :                     task.await
     207              :                 } else {
     208            0 :                     futures::future::pending().await
     209              :                 }
     210            0 :             } => {
     211              :                 // WAL removal task finished
     212              :                 wal_removal_task = None;
     213              :                 update_wal_removal_end(res, &tli, &mut last_removed_segno);
     214              :             }
     215              :         }
     216              :     };
     217              : 
     218              :     // remove timeline from the broker active set sooner, before waiting for background tasks
     219              :     tli_broker_active.set(false);
     220              : 
     221              :     // shutdown background tasks
     222              :     if conf.is_wal_backup_enabled() {
     223              :         wal_backup::update_task(&conf, &tli, false, &last_state, &mut backup_task).await;
     224              :     }
     225              : 
     226              :     if let Some(recovery_task) = recovery_task {
     227              :         if let Err(e) = recovery_task.await {
     228              :             warn!("recovery task failed: {:?}", e);
     229              :         }
     230              :     }
     231              : 
     232              :     if let Some(partial_backup_task) = partial_backup_task {
     233              :         if let Err(e) = partial_backup_task.await {
     234              :             warn!("partial backup task failed: {:?}", e);
     235              :         }
     236              :     }
     237              : 
     238              :     if let Some(wal_removal_task) = wal_removal_task {
     239              :         let res = wal_removal_task.await;
     240              :         update_wal_removal_end(res, &tli, &mut last_removed_segno);
     241              :     }
     242              : }
     243              : 
     244              : /// Spawns/kills backup task and returns true if backup is required.
     245            0 : async fn update_backup(
     246            0 :     conf: &SafeKeeperConf,
     247            0 :     tli: &Arc<Timeline>,
     248            0 :     wal_seg_size: usize,
     249            0 :     num_computes: usize,
     250            0 :     state: &StateSnapshot,
     251            0 :     backup_task: &mut Option<WalBackupTaskHandle>,
     252            0 : ) -> bool {
     253            0 :     let is_wal_backup_required =
     254            0 :         wal_backup::is_wal_backup_required(wal_seg_size, num_computes, state);
     255            0 : 
     256            0 :     if conf.is_wal_backup_enabled() {
     257            0 :         wal_backup::update_task(conf, tli, is_wal_backup_required, state, backup_task).await;
     258            0 :     }
     259              : 
     260              :     // update the state in Arc<Timeline>
     261            0 :     tli.wal_backup_active
     262            0 :         .store(backup_task.is_some(), std::sync::atomic::Ordering::Relaxed);
     263            0 :     is_wal_backup_required
     264            0 : }
     265              : 
     266              : /// Update is_active flag and returns its value.
     267            0 : fn update_is_active(
     268            0 :     is_wal_backup_required: bool,
     269            0 :     num_computes: usize,
     270            0 :     state: &StateSnapshot,
     271            0 :     tli_broker_active: &mut TimelineSetGuard,
     272            0 :     tli: &Arc<Timeline>,
     273            0 : ) -> bool {
     274            0 :     let is_active = is_wal_backup_required
     275            0 :         || num_computes > 0
     276            0 :         || state.remote_consistent_lsn < state.commit_lsn;
     277              : 
     278              :     // update the broker timeline set
     279            0 :     if tli_broker_active.set(is_active) {
     280              :         // write log if state has changed
     281            0 :         info!(
     282            0 :             "timeline active={} now, remote_consistent_lsn={}, commit_lsn={}",
     283              :             is_active, state.remote_consistent_lsn, state.commit_lsn,
     284              :         );
     285              : 
     286            0 :         MANAGER_ACTIVE_CHANGES.inc();
     287            0 :     }
     288              : 
     289              :     // update the state in Arc<Timeline>
     290            0 :     tli.broker_active
     291            0 :         .store(is_active, std::sync::atomic::Ordering::Relaxed);
     292            0 :     is_active
     293            0 : }
     294              : 
     295              : /// Save control file if needed. Returns Instant if we should persist the control file in the future.
     296            0 : async fn update_control_file_save(
     297            0 :     state: &StateSnapshot,
     298            0 :     tli: &Arc<Timeline>,
     299            0 : ) -> Option<tokio::time::Instant> {
     300            0 :     if !state.inmem_flush_pending {
     301            0 :         return None;
     302            0 :     }
     303            0 : 
     304            0 :     if state.cfile_last_persist_at.elapsed() > CF_SAVE_INTERVAL {
     305            0 :         let mut write_guard = tli.write_shared_state().await;
     306              :         // this can be done in the background because it blocks manager task, but flush() should
     307              :         // be fast enough not to be a problem now
     308            0 :         if let Err(e) = write_guard.sk.state.flush().await {
     309            0 :             warn!("failed to save control file: {:?}", e);
     310            0 :         }
     311              : 
     312            0 :         None
     313              :     } else {
     314              :         // we should wait until next CF_SAVE_INTERVAL
     315            0 :         Some((state.cfile_last_persist_at + CF_SAVE_INTERVAL).into())
     316              :     }
     317            0 : }
     318              : 
     319              : /// Spawns WAL removal task if needed.
     320            0 : async fn update_wal_removal(
     321            0 :     conf: &SafeKeeperConf,
     322            0 :     walsenders: &Arc<WalSenders>,
     323            0 :     tli: &Arc<Timeline>,
     324            0 :     wal_seg_size: usize,
     325            0 :     state: &StateSnapshot,
     326            0 :     last_removed_segno: u64,
     327            0 :     wal_removal_task: &mut Option<JoinHandle<anyhow::Result<u64>>>,
     328            0 : ) {
     329            0 :     if wal_removal_task.is_some() || state.wal_removal_on_hold {
     330              :         // WAL removal is already in progress or hold off
     331            0 :         return;
     332            0 :     }
     333              : 
     334              :     // If enabled, we use LSN of the most lagging walsender as a WAL removal horizon.
     335              :     // This allows to get better read speed for pageservers that are lagging behind,
     336              :     // at the cost of keeping more WAL on disk.
     337            0 :     let replication_horizon_lsn = if conf.walsenders_keep_horizon {
     338            0 :         walsenders.laggard_lsn()
     339              :     } else {
     340            0 :         None
     341              :     };
     342              : 
     343            0 :     let removal_horizon_lsn = calc_horizon_lsn(state, replication_horizon_lsn);
     344            0 :     let removal_horizon_segno = removal_horizon_lsn
     345            0 :         .segment_number(wal_seg_size)
     346            0 :         .saturating_sub(1);
     347            0 : 
     348            0 :     if removal_horizon_segno > last_removed_segno {
     349              :         // we need to remove WAL
     350            0 :         let remover = crate::wal_storage::Storage::remove_up_to(
     351            0 :             &tli.read_shared_state().await.sk.wal_store,
     352            0 :             removal_horizon_segno,
     353              :         );
     354            0 :         *wal_removal_task = Some(tokio::spawn(
     355            0 :             async move {
     356            0 :                 remover.await?;
     357            0 :                 Ok(removal_horizon_segno)
     358            0 :             }
     359            0 :             .instrument(info_span!("WAL removal", ttid=%tli.ttid)),
     360              :         ));
     361            0 :     }
     362            0 : }
     363              : 
     364              : /// Update the state after WAL removal task finished.
     365            0 : fn update_wal_removal_end(
     366            0 :     res: Result<anyhow::Result<u64>, JoinError>,
     367            0 :     tli: &Arc<Timeline>,
     368            0 :     last_removed_segno: &mut u64,
     369            0 : ) {
     370            0 :     let new_last_removed_segno = match res {
     371            0 :         Ok(Ok(segno)) => segno,
     372            0 :         Err(e) => {
     373            0 :             warn!("WAL removal task failed: {:?}", e);
     374            0 :             return;
     375              :         }
     376            0 :         Ok(Err(e)) => {
     377            0 :             warn!("WAL removal task failed: {:?}", e);
     378            0 :             return;
     379              :         }
     380              :     };
     381              : 
     382            0 :     *last_removed_segno = new_last_removed_segno;
     383            0 :     // update the state in Arc<Timeline>
     384            0 :     tli.last_removed_segno
     385            0 :         .store(new_last_removed_segno, std::sync::atomic::Ordering::Relaxed);
     386            0 : }
         |