Line data Source code
1 : //! Thread removing old WAL.
2 :
3 : use std::time::Duration;
4 :
5 : use tokio::time::sleep;
6 : use tracing::*;
7 :
8 : use crate::{GlobalTimelines, SafeKeeperConf};
9 :
10 : const ALLOW_INACTIVE_TIMELINES: bool = true;
11 :
12 510 : pub async fn task_main(conf: SafeKeeperConf) -> anyhow::Result<()> {
13 510 : let wal_removal_interval = Duration::from_millis(5000);
14 : loop {
15 1923 : let now = tokio::time::Instant::now();
16 1923 : let mut active_timelines = 0;
17 1923 :
18 1923 : let tlis = GlobalTimelines::get_all();
19 3696 : for tli in &tlis {
20 1773 : let is_active = tli.is_active().await;
21 1773 : if is_active {
22 1679 : active_timelines += 1;
23 1679 : }
24 1773 : if !ALLOW_INACTIVE_TIMELINES && !is_active {
25 0 : continue;
26 1773 : }
27 1773 : let ttid = tli.ttid;
28 1773 : async {
29 1773 : if let Err(e) = tli.maybe_persist_control_file().await {
30 0 : warn!("failed to persist control file: {e}");
31 1773 : }
32 1773 : if let Err(e) = tli.remove_old_wal(conf.wal_backup_enabled).await {
33 0 : error!("failed to remove WAL: {}", e);
34 1773 : }
35 1773 : }
36 1773 : .instrument(info_span!("WAL removal", ttid = %ttid))
37 131 : .await;
38 : }
39 :
40 1923 : let elapsed = now.elapsed();
41 1923 : let total_timelines = tlis.len();
42 1923 :
43 1923 : if elapsed > wal_removal_interval {
44 0 : info!(
45 0 : "WAL removal is too long, processed {} active timelines ({} total) in {:?}",
46 0 : active_timelines, total_timelines, elapsed
47 0 : );
48 1923 : }
49 :
50 1923 : sleep(wal_removal_interval).await;
51 : }
52 : }
|