Line data Source code
1 : //! Thread removing old WAL.
2 :
3 : use std::time::Duration;
4 :
5 : use tokio::time::sleep;
6 : use tracing::*;
7 :
8 : use crate::{GlobalTimelines, SafeKeeperConf};
9 :
// Compile-time switch read by `task_main`: while this is `true`, timelines whose
// `is_active()` check returned false are still processed; with `false` they
// would be skipped via `continue`.
10 : const ALLOW_INACTIVE_TIMELINES: bool = true;
11 :
12 510 : pub async fn task_main(conf: SafeKeeperConf) -> anyhow::Result<()> {
13 510 : let wal_removal_interval = Duration::from_millis(5000);
14 : loop {
15 1962 : let now = tokio::time::Instant::now();
16 1962 : let mut active_timelines = 0;
17 1962 :
18 1962 : let tlis = GlobalTimelines::get_all();
19 3795 : for tli in &tlis {
20 1833 : let is_active = tli.is_active().await;
21 1833 : if is_active {
22 1746 : active_timelines += 1;
23 1746 : }
24 1833 : if !ALLOW_INACTIVE_TIMELINES && !is_active {
25 0 : continue;
26 1833 : }
27 1833 : let ttid = tli.ttid;
28 1833 : async {
29 1833 : if let Err(e) = tli.maybe_persist_control_file().await {
30 0 : warn!("failed to persist control file: {e}");
31 1833 : }
32 1833 : if let Err(e) = tli.remove_old_wal(conf.wal_backup_enabled).await {
33 0 : error!("failed to remove WAL: {}", e);
34 1833 : }
35 1833 : }
36 1833 : .instrument(info_span!("WAL removal", ttid = %ttid))
37 191 : .await;
38 : }
39 :
40 1962 : let elapsed = now.elapsed();
41 1962 : let total_timelines = tlis.len();
42 1962 :
43 1962 : if elapsed > wal_removal_interval {
44 0 : info!(
45 0 : "WAL removal is too long, processed {} active timelines ({} total) in {:?}",
46 0 : active_timelines, total_timelines, elapsed
47 0 : );
48 1962 : }
49 :
50 1962 : sleep(wal_removal_interval).await;
51 : }
52 : }
|