TLA Line data Source code
1 : use std::sync::Arc;
2 :
3 : use anyhow::Context;
4 : use aws_sdk_s3::Client;
5 : use either::Either;
6 : use tokio::sync::mpsc::UnboundedSender;
7 : use tracing::info;
8 :
9 : use crate::cloud_admin_api::{CloudAdminApiClient, ProjectData};
10 : use crate::delete_batch_producer::FetchResult;
11 : use crate::{RootTarget, TraversingDepth};
12 : use utils::id::{TenantId, TenantTimelineId};
13 :
14 : use super::ProcessedS3List;
15 :
16 UBC 0 : pub async fn schedule_cleanup_deleted_tenants(
17 0 : s3_root_target: &RootTarget,
18 0 : s3_client: &Arc<Client>,
19 0 : admin_client: &Arc<CloudAdminApiClient>,
20 0 : projects_to_check_sender: UnboundedSender<ProjectData>,
21 0 : delete_sender: Arc<UnboundedSender<Either<TenantId, TenantTimelineId>>>,
22 0 : traversing_depth: TraversingDepth,
23 0 : ) -> anyhow::Result<ProcessedS3List<TenantId, ProjectData>> {
24 0 : info!(
25 0 : "Starting to list the bucket from root {}",
26 0 : s3_root_target.bucket_name()
27 0 : );
28 0 : s3_client
29 0 : .head_bucket()
30 0 : .bucket(s3_root_target.bucket_name())
31 0 : .send()
32 0 : .await
33 0 : .with_context(|| format!("bucket {} was not found", s3_root_target.bucket_name()))?;
34 :
35 0 : let check_client = Arc::clone(admin_client);
36 0 : let tenant_stats = super::process_s3_target_recursively(
37 0 : s3_client,
38 0 : s3_root_target.tenants_root(),
39 0 : |s3_tenants| async move {
40 0 : let another_client = Arc::clone(&check_client);
41 0 : super::split_to_active_and_deleted_entries(s3_tenants, move |tenant_id| async move {
42 0 : let project_data = another_client
43 0 : .find_tenant_project(tenant_id)
44 0 : .await
45 0 : .with_context(|| format!("Tenant {tenant_id} project admin check"))?;
46 :
47 0 : Ok(if let Some(console_project) = project_data {
48 0 : if console_project.deleted {
49 0 : delete_sender.send(Either::Left(tenant_id)).ok();
50 0 : FetchResult::Deleted
51 : } else {
52 0 : if traversing_depth == TraversingDepth::Timeline {
53 0 : projects_to_check_sender.send(console_project.clone()).ok();
54 0 : }
55 0 : FetchResult::Found(console_project)
56 : }
57 : } else {
58 0 : delete_sender.send(Either::Left(tenant_id)).ok();
59 0 : FetchResult::Absent
60 : })
61 0 : })
62 0 : .await
63 0 : },
64 0 : )
65 0 : .await
66 0 : .context("tenant batch processing")?;
67 :
68 0 : info!(
69 0 : "Among {} tenants, found {} tenants to delete and {} active ones",
70 0 : tenant_stats.entries_total,
71 0 : tenant_stats.entries_to_delete.len(),
72 0 : tenant_stats.active_entries.len(),
73 0 : );
74 :
75 0 : let tenant_stats = match traversing_depth {
76 : TraversingDepth::Tenant => {
77 0 : info!("Finished listing the bucket for tenants only");
78 0 : tenant_stats
79 : }
80 : TraversingDepth::Timeline => {
81 0 : info!("Finished listing the bucket for tenants and sent {} active tenants to check for timelines", tenant_stats.active_entries.len());
82 0 : tenant_stats
83 : }
84 : };
85 :
86 0 : Ok(tenant_stats)
87 0 : }
|