TLA Line data Source code
1 : use std::sync::Arc;
2 :
3 : use anyhow::Context;
4 : use aws_sdk_s3::Client;
5 : use either::Either;
6 : use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
7 : use tracing::{info, info_span, Instrument};
8 :
9 : use crate::cloud_admin_api::{BranchData, CloudAdminApiClient, ProjectData};
10 : use crate::delete_batch_producer::{FetchResult, ProcessedS3List};
11 : use crate::RootTarget;
12 : use utils::id::{TenantId, TenantTimelineId};
13 :
14 UBC 0 : pub async fn schedule_cleanup_deleted_timelines(
15 0 : s3_root_target: &RootTarget,
16 0 : s3_client: &Arc<Client>,
17 0 : admin_client: &Arc<CloudAdminApiClient>,
18 0 : projects_to_check_receiver: &mut UnboundedReceiver<ProjectData>,
19 0 : delete_elements_sender: Arc<UnboundedSender<Either<TenantId, TenantTimelineId>>>,
20 0 : ) -> anyhow::Result<ProcessedS3List<TenantTimelineId, BranchData>> {
21 0 : info!(
22 0 : "Starting to list the bucket from root {}",
23 0 : s3_root_target.bucket_name()
24 0 : );
25 0 : s3_client
26 0 : .head_bucket()
27 0 : .bucket(s3_root_target.bucket_name())
28 0 : .send()
29 0 : .await
30 0 : .with_context(|| format!("bucket {} was not found", s3_root_target.bucket_name()))?;
31 :
32 0 : let mut timeline_stats = ProcessedS3List::default();
33 0 : while let Some(project_to_check) = projects_to_check_receiver.recv().await {
34 0 : let check_client = Arc::clone(admin_client);
35 0 :
36 0 : let check_s3_client = Arc::clone(s3_client);
37 0 :
38 0 : let check_delete_sender = Arc::clone(&delete_elements_sender);
39 0 :
40 0 : let check_root = s3_root_target.clone();
41 :
42 0 : let new_stats = async move {
43 0 : let tenant_id_to_check = project_to_check.tenant;
44 0 : let check_target = check_root.timelines_root(&tenant_id_to_check);
45 0 : let stats = super::process_s3_target_recursively(
46 0 : &check_s3_client,
47 0 : &check_target,
48 0 : |s3_timelines| async move {
49 0 : let another_client = check_client.clone();
50 0 : super::split_to_active_and_deleted_entries(
51 0 : s3_timelines,
52 0 : move |timeline_id| async move {
53 0 : let console_branch = another_client
54 0 : .find_timeline_branch(timeline_id)
55 0 : .await
56 0 : .map_err(|e| {
57 0 : anyhow::anyhow!(
58 0 : "Timeline {timeline_id} branch admin check: {e}"
59 0 : )
60 0 : })?;
61 :
62 0 : let id = TenantTimelineId::new(tenant_id_to_check, timeline_id);
63 0 : Ok(match console_branch {
64 0 : Some(console_branch) => {
65 0 : if console_branch.deleted {
66 0 : check_delete_sender.send(Either::Right(id)).ok();
67 0 : FetchResult::Deleted
68 : } else {
69 0 : FetchResult::Found(console_branch)
70 : }
71 : }
72 : None => {
73 0 : check_delete_sender.send(Either::Right(id)).ok();
74 0 : FetchResult::Absent
75 : }
76 : })
77 0 : },
78 0 : )
79 0 : .await
80 0 : },
81 0 : )
82 0 : .await
83 0 : .with_context(|| format!("tenant {tenant_id_to_check} timeline batch processing"))?
84 0 : .change_ids(|timeline_id| TenantTimelineId::new(tenant_id_to_check, timeline_id));
85 0 :
86 0 : Ok::<_, anyhow::Error>(stats)
87 0 : }
88 0 : .instrument(info_span!("delete_timelines_sender", tenant = %project_to_check.tenant))
89 0 : .await?;
90 :
91 0 : timeline_stats.merge(new_stats);
92 : }
93 :
94 0 : info!(
95 0 : "Among {} timelines, found {} timelines to delete and {} active ones",
96 0 : timeline_stats.entries_total,
97 0 : timeline_stats.entries_to_delete.len(),
98 0 : timeline_stats.active_entries.len(),
99 0 : );
100 :
101 0 : Ok(timeline_stats)
102 0 : }
|