Line data Source code
1 : //! Thread removing old WAL.
2 :
3 : use std::time::Duration;
4 :
5 : use tokio::time::sleep;
6 : use tracing::*;
7 :
8 : use crate::{GlobalTimelines, SafeKeeperConf};
9 :
/// Controls whether the WAL removal loop also processes timelines that report
/// themselves as not active. When `false`, `task_main` skips such timelines
/// (they are still included in the total count it logs); when `true`, every
/// timeline is processed regardless of activity.
10 : const ALLOW_INACTIVE_TIMELINES: bool = true;
11 :
12 0 : pub async fn task_main(conf: SafeKeeperConf) -> anyhow::Result<()> {
13 0 : let wal_removal_interval = Duration::from_millis(5000);
14 : loop {
15 0 : let now = tokio::time::Instant::now();
16 0 : let mut active_timelines = 0;
17 0 :
18 0 : let tlis = GlobalTimelines::get_all();
19 0 : for tli in &tlis {
20 0 : let is_active = tli.is_active().await;
21 0 : if is_active {
22 0 : active_timelines += 1;
23 0 : }
24 0 : if !ALLOW_INACTIVE_TIMELINES && !is_active {
25 0 : continue;
26 0 : }
27 0 : let ttid = tli.ttid;
28 0 : async {
29 0 : if let Err(e) = tli.maybe_persist_control_file().await {
30 0 : warn!("failed to persist control file: {e}");
31 0 : }
32 0 : if let Err(e) = tli.remove_old_wal(conf.wal_backup_enabled).await {
33 0 : error!("failed to remove WAL: {}", e);
34 0 : }
35 0 : }
36 0 : .instrument(info_span!("WAL removal", ttid = %ttid))
37 0 : .await;
38 : }
39 :
40 0 : let elapsed = now.elapsed();
41 0 : let total_timelines = tlis.len();
42 0 :
43 0 : if elapsed > wal_removal_interval {
44 0 : info!(
45 0 : "WAL removal is too long, processed {} active timelines ({} total) in {:?}",
46 0 : active_timelines, total_timelines, elapsed
47 0 : );
48 0 : }
49 :
50 0 : sleep(wal_removal_interval).await;
51 : }
52 : }
|