TLA Line data Source code
1 : //! This module implements pulling WAL from peer safekeepers if compute can't
2 : //! provide it, i.e. safekeeper lags too much.
3 :
4 : use std::sync::Arc;
5 :
6 : use tokio::{select, time::sleep, time::Duration};
7 : use tracing::{info, instrument};
8 :
9 : use crate::{timeline::Timeline, SafeKeeperConf};
10 :
11 : /// Entrypoint for per timeline task which always runs, checking whether
12 : /// recovery for this safekeeper is needed and starting it if so.
13 CBC 1130 : #[instrument(name = "recovery task", skip_all, fields(ttid = %tli.ttid))]
14 : pub async fn recovery_main(tli: Arc<Timeline>, _conf: SafeKeeperConf) {
15 565 : info!("started");
16 : let mut cancellation_rx = match tli.get_cancellation_rx() {
17 : Ok(rx) => rx,
18 : Err(_) => {
19 UBC 0 : info!("timeline canceled during task start");
20 : return;
21 : }
22 : };
23 :
24 CBC 3683 : select! {
25 3683 : _ = recovery_main_loop(tli) => { unreachable!() }
26 3683 : _ = cancellation_rx.changed() => {
27 3683 : info!("stopped");
28 3683 : }
29 3683 : }
30 : }
31 :
32 : const CHECK_INTERVAL_MS: u64 = 2000;
33 :
34 : /// Check regularly whether we need to start recovery.
35 565 : async fn recovery_main_loop(_tli: Arc<Timeline>) {
36 565 : let check_duration = Duration::from_millis(CHECK_INTERVAL_MS);
37 : loop {
38 3666 : sleep(check_duration).await;
39 : }
40 : }
|