mod tenant_batch;
mod timeline_batch;

use std::future::Future;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use aws_sdk_s3::Client;
use either::Either;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::Mutex;
use tokio::task::{JoinHandle, JoinSet};
use tracing::{error, info, info_span, Instrument};

use crate::cloud_admin_api::{BranchData, CloudAdminApiClient, ProjectData};
use crate::{list_objects_with_retries, RootTarget, S3Target, TraversingDepth, MAX_RETRIES};
use utils::id::{TenantId, TenantTimelineId};
/// A typical tenant to remove contains one layer blob and one index_part.json blob.
/// Some non-standard tenants to remove have more layers, though.
/// A delete_objects request allows up to 1000 keys, so stay on the safe side and let
/// most batch processing tasks issue a single delete_objects request.
///
/// Every batch item is additionally listed on S3 later, so keep the batch size
/// even lower to let multiple concurrent tasks run those LS requests.
const BATCH_SIZE: usize = 100;
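
// Loose compile-time sanity bound (an illustrative sketch): with the typical
// two blobs per tenant noted above, a full batch should still fit into a
// single 1000-key delete_objects request.
const _: () = assert!(BATCH_SIZE * 2 <= 1000);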

pub struct DeleteBatchProducer {
    delete_tenants_sender_task: JoinHandle<anyhow::Result<ProcessedS3List<TenantId, ProjectData>>>,
    delete_timelines_sender_task:
        JoinHandle<anyhow::Result<ProcessedS3List<TenantTimelineId, BranchData>>>,
    delete_batch_creator_task: JoinHandle<()>,
    delete_batch_receiver: Arc<Mutex<UnboundedReceiver<DeleteBatch>>>,
}

pub struct DeleteProducerStats {
    pub tenant_stats: ProcessedS3List<TenantId, ProjectData>,
    pub timeline_stats: Option<ProcessedS3List<TenantTimelineId, BranchData>>,
}

impl DeleteProducerStats {
    pub fn tenants_checked(&self) -> usize {
        self.tenant_stats.entries_total
    }

    pub fn active_tenants(&self) -> usize {
        self.tenant_stats.active_entries.len()
    }

    pub fn timelines_checked(&self) -> usize {
        self.timeline_stats
            .as_ref()
            .map(|stats| stats.entries_total)
            .unwrap_or(0)
    }
}

#[derive(Debug, Default, Clone)]
pub struct DeleteBatch {
    pub tenants: Vec<TenantId>,
    pub timelines: Vec<TenantTimelineId>,
}

impl DeleteBatch {
    pub fn merge(&mut self, other: Self) {
        self.tenants.extend(other.tenants);
        self.timelines.extend(other.timelines);
    }

    pub fn len(&self) -> usize {
        self.tenants.len() + self.timelines.len()
    }

    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

impl DeleteBatchProducer {
    pub fn start(
        admin_client: Arc<CloudAdminApiClient>,
        s3_client: Arc<Client>,
        s3_root_target: RootTarget,
        traversing_depth: TraversingDepth,
    ) -> Self {
        let (delete_elements_sender, mut delete_elements_receiver) =
            tokio::sync::mpsc::unbounded_channel();
        let delete_elements_sender = Arc::new(delete_elements_sender);

        let (projects_to_check_sender, mut projects_to_check_receiver) =
            tokio::sync::mpsc::unbounded_channel();
        let delete_tenants_root_target = s3_root_target.clone();
        let delete_tenants_client = Arc::clone(&s3_client);
        let delete_tenants_admin_client = Arc::clone(&admin_client);
        let delete_sender = Arc::clone(&delete_elements_sender);
        let delete_tenants_sender_task = tokio::spawn(
            async move {
                tenant_batch::schedule_cleanup_deleted_tenants(
                    &delete_tenants_root_target,
                    &delete_tenants_client,
                    &delete_tenants_admin_client,
                    projects_to_check_sender,
                    delete_sender,
                    traversing_depth,
                )
                .await
            }
            .instrument(info_span!("delete_tenants_sender")),
        );
        let delete_timelines_sender_task = tokio::spawn(async move {
            timeline_batch::schedule_cleanup_deleted_timelines(
                &s3_root_target,
                &s3_client,
                &admin_client,
                &mut projects_to_check_receiver,
                delete_elements_sender,
            )
            .in_current_span()
            .await
        });

        let (delete_batch_sender, delete_batch_receiver) = tokio::sync::mpsc::unbounded_channel();
        let delete_batch_creator_task = tokio::spawn(
            async move {
                'outer: loop {
                    let mut delete_batch = DeleteBatch::default();
                    while delete_batch.len() < BATCH_SIZE {
                        match delete_elements_receiver.recv().await {
                            Some(new_task) => match new_task {
                                Either::Left(tenant_id) => delete_batch.tenants.push(tenant_id),
                                Either::Right(timeline_id) => {
                                    delete_batch.timelines.push(timeline_id)
                                }
                            },
                            None => {
                                info!("Task finished: sender dropped");
                                delete_batch_sender.send(delete_batch).ok();
                                break 'outer;
                            }
                        }
                    }

                    if !delete_batch.is_empty() {
                        delete_batch_sender.send(delete_batch).ok();
                    }
                }
            }
            .instrument(info_span!("delete batch creator")),
        );

        Self {
            delete_tenants_sender_task,
            delete_timelines_sender_task,
            delete_batch_creator_task,
            delete_batch_receiver: Arc::new(Mutex::new(delete_batch_receiver)),
        }
    }

    pub fn subscribe(&self) -> Arc<Mutex<UnboundedReceiver<DeleteBatch>>> {
        self.delete_batch_receiver.clone()
    }

    pub async fn join(self) -> anyhow::Result<DeleteProducerStats> {
        let (delete_tenants_task_result, delete_timelines_task_result, batch_task_result) = tokio::join!(
            self.delete_tenants_sender_task,
            self.delete_timelines_sender_task,
            self.delete_batch_creator_task,
        );

        let tenant_stats = match delete_tenants_task_result {
            Ok(Ok(stats)) => stats,
            Ok(Err(tenant_deletion_error)) => return Err(tenant_deletion_error),
            Err(join_error) => {
                anyhow::bail!("Failed to join the delete tenant producing task: {join_error}")
            }
        };

        let timeline_stats = match delete_timelines_task_result {
            Ok(Ok(stats)) => Some(stats),
            Ok(Err(timeline_deletion_error)) => return Err(timeline_deletion_error),
            Err(join_error) => {
                anyhow::bail!("Failed to join the delete timeline producing task: {join_error}")
            }
        };

        match batch_task_result {
            Ok(()) => (),
            Err(join_error) => anyhow::bail!("Failed to join the batch forming task: {join_error}"),
        };

        Ok(DeleteProducerStats {
            tenant_stats,
            timeline_stats,
        })
    }
}
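
// Illustrative usage sketch (hypothetical caller; `execute_batch` stands in
// for whatever issues the actual delete requests):
//
//     let producer = DeleteBatchProducer::start(admin_client, s3_client, root_target, depth);
//     let batch_receiver = producer.subscribe();
//     while let Some(batch) = batch_receiver.lock().await.recv().await {
//         execute_batch(&batch).await?;
//     }
//     let stats = producer.join().await?;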

pub struct ProcessedS3List<I, A> {
    pub entries_total: usize,
    pub entries_to_delete: Vec<I>,
    pub active_entries: Vec<A>,
}

impl<I, A> Default for ProcessedS3List<I, A> {
    fn default() -> Self {
        Self {
            entries_total: 0,
            entries_to_delete: Vec::new(),
            active_entries: Vec::new(),
        }
    }
}

impl<I, A> ProcessedS3List<I, A> {
    fn merge(&mut self, other: Self) {
        self.entries_total += other.entries_total;
        self.entries_to_delete.extend(other.entries_to_delete);
        self.active_entries.extend(other.active_entries);
    }

    fn change_ids<NewI>(self, transform: impl Fn(I) -> NewI) -> ProcessedS3List<NewI, A> {
        ProcessedS3List {
            entries_total: self.entries_total,
            entries_to_delete: self.entries_to_delete.into_iter().map(transform).collect(),
            active_entries: self.active_entries,
        }
    }
}
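
// Illustrative sketch: `change_ids` re-keys the deletable entries without
// touching the active ones, e.g. widening per-timeline ids into fully
// qualified ones (hypothetical `tenant_id` and `timeline_listing` in scope):
//
//     let widened: ProcessedS3List<TenantTimelineId, BranchData> =
//         timeline_listing.change_ids(|timeline_id| TenantTimelineId::new(tenant_id, timeline_id));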

async fn process_s3_target_recursively<F, Fut, I, E, A>(
    s3_client: &Client,
    target: &S3Target,
    find_active_and_deleted_entries: F,
) -> anyhow::Result<ProcessedS3List<I, A>>
where
    I: FromStr<Err = E> + Send + Sync,
    E: Send + Sync + std::error::Error + 'static,
    F: FnOnce(Vec<I>) -> Fut + Clone,
    Fut: Future<Output = anyhow::Result<ProcessedS3List<I, A>>>,
{
    let mut continuation_token = None;
    let mut total_entries = ProcessedS3List::default();

    loop {
        let fetch_response =
            list_objects_with_retries(s3_client, target, continuation_token.clone()).await?;

        let new_entry_ids = fetch_response
            .common_prefixes()
            .unwrap_or_default()
            .iter()
            .filter_map(|prefix| prefix.prefix())
            .filter_map(|prefix| -> Option<&str> {
                prefix
                    .strip_prefix(&target.prefix_in_bucket)?
                    .strip_suffix('/')
            })
            .map(|entry_id_str| {
                entry_id_str
                    .parse()
                    .with_context(|| format!("Incorrect entry id str: {entry_id_str}"))
            })
            .collect::<anyhow::Result<Vec<I>>>()
            .context("list and parse bucket's entry ids")?;

        total_entries.merge(
            (find_active_and_deleted_entries.clone())(new_entry_ids)
                .await
                .context("filter active and deleted entry ids")?,
        );

        match fetch_response.next_continuation_token {
            Some(new_token) => continuation_token = Some(new_token),
            None => break,
        }
    }

    Ok(total_entries)
}
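
// Illustrative sketch of the common-prefix handling above, with a hypothetical
// layout: for `prefix_in_bucket == "tenants/"`, a listed common prefix such as
// "tenants/1122334455667788aabbccddeeff0011/" is stripped down to the bare
// "1122334455667788aabbccddeeff0011" and then parsed into an `I`.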

enum FetchResult<A> {
    Found(A),
    Deleted,
    Absent,
}

async fn split_to_active_and_deleted_entries<I, A, F, Fut>(
    new_entry_ids: Vec<I>,
    find_active_entry: F,
) -> anyhow::Result<ProcessedS3List<I, A>>
where
    I: std::fmt::Display + Send + Sync + 'static + Copy,
    A: Send + 'static,
    F: FnOnce(I) -> Fut + Send + Sync + 'static + Clone,
    Fut: Future<Output = anyhow::Result<FetchResult<A>>> + Send,
{
    let entries_total = new_entry_ids.len();
    let mut check_tasks = JoinSet::new();
    let mut active_entries = Vec::with_capacity(entries_total);
    let mut entries_to_delete = Vec::with_capacity(entries_total);

    for new_entry_id in new_entry_ids {
        let check_closure = find_active_entry.clone();
        check_tasks.spawn(
            async move {
                (
                    new_entry_id,
                    async {
                        for _ in 0..MAX_RETRIES {
                            let closure_clone = check_closure.clone();
                            match closure_clone(new_entry_id).await {
                                Ok(active_entry) => return Ok(active_entry),
                                Err(e) => {
                                    error!("find active entry admin API call failed: {e}");
                                    tokio::time::sleep(Duration::from_secs(1)).await;
                                }
                            }
                        }

                        anyhow::bail!("Failed to check entry {new_entry_id} {MAX_RETRIES} times")
                    }
                    .await,
                )
            }
            .instrument(info_span!("filter_active_entries")),
        );
    }

    while let Some(task_result) = check_tasks.join_next().await {
        let (entry_id, entry_data_fetch_result) = task_result.context("task join")?;
        match entry_data_fetch_result.context("entry data fetch")? {
            FetchResult::Found(active_entry) => {
                info!("Entry {entry_id} is alive, cannot delete");
                active_entries.push(active_entry);
            }
            FetchResult::Deleted => {
                info!("Entry {entry_id} deleted in the admin data, can safely delete");
                entries_to_delete.push(entry_id);
            }
            FetchResult::Absent => {
                info!("Entry {entry_id} absent in the admin data, can safely delete");
                entries_to_delete.push(entry_id);
            }
        }
    }
    Ok(ProcessedS3List {
        entries_total,
        entries_to_delete,
        active_entries,
    })
}
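
#[cfg(test)]
mod tests {
    use super::*;

    // A minimal sketch of the `DeleteBatch` merge/len/is_empty contract;
    // assumes `TenantId` parses its usual 32-character hex representation.
    #[test]
    fn delete_batch_merge_accumulates_entries() {
        let tenant_id = TenantId::from_str("1122334455667788aabbccddeeff0011")
            .expect("valid 32-character hex tenant id");

        let mut batch = DeleteBatch::default();
        assert!(batch.is_empty());

        batch.merge(DeleteBatch {
            tenants: vec![tenant_id],
            timelines: Vec::new(),
        });

        assert_eq!(batch.len(), 1);
        assert!(!batch.is_empty());
    }
}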