Line data Source code
1 : //! Thread removing old WAL.
2 :
3 : use std::time::Duration;
4 :
5 : use tokio::time::sleep;
6 : use tracing::*;
7 :
8 : use crate::{GlobalTimelines, SafeKeeperConf};
9 :
/// Whether the WAL removal loop also processes timelines that report themselves as inactive.
/// When `false`, `task_main` skips any timeline whose `is_active()` returns `false`.
const ALLOW_INACTIVE_TIMELINES: bool = true;
11 :
12 508 : pub async fn task_main(conf: SafeKeeperConf) -> anyhow::Result<()> {
13 508 : let wal_removal_interval = Duration::from_millis(5000);
14 : loop {
15 1882 : let now = tokio::time::Instant::now();
16 1882 : let mut active_timelines = 0;
17 1882 :
18 1882 : let tlis = GlobalTimelines::get_all();
19 3619 : for tli in &tlis {
20 1737 : let is_active = tli.is_active().await;
21 1737 : if is_active {
22 1646 : active_timelines += 1;
23 1646 : }
24 1737 : if !ALLOW_INACTIVE_TIMELINES && !is_active {
25 0 : continue;
26 1737 : }
27 1737 : let ttid = tli.ttid;
28 1737 : async {
29 1737 : if let Err(e) = tli.maybe_persist_control_file().await {
30 0 : warn!("failed to persist control file: {e}");
31 1737 : }
32 1737 : if let Err(e) = tli.remove_old_wal(conf.wal_backup_enabled).await {
33 0 : error!("failed to remove WAL: {}", e);
34 1737 : }
35 1737 : }
36 1737 : .instrument(info_span!("WAL removal", ttid = %ttid))
37 181 : .await;
38 : }
39 :
40 1882 : let elapsed = now.elapsed();
41 1882 : let total_timelines = tlis.len();
42 1882 :
43 1882 : if elapsed > wal_removal_interval {
44 0 : info!(
45 0 : "WAL removal is too long, processed {} active timelines ({} total) in {:?}",
46 0 : active_timelines, total_timelines, elapsed
47 0 : );
48 1882 : }
49 :
50 1882 : sleep(wal_removal_interval).await;
51 : }
52 : }
|