Line data Source code
1 : //! Functionality for finding and purging garbage, as in "garbage collection". Garbage means
2 : //! S3 objects which are either not referenced by any metadata, or are referenced by a
3 : //! control plane tenant/timeline in a deleted state.
4 :
5 : use std::{
6 : collections::{HashMap, HashSet},
7 : sync::Arc,
8 : };
9 :
10 : use anyhow::Context;
11 : use aws_sdk_s3::{
12 : types::{Delete, ObjectIdentifier},
13 : Client,
14 : };
15 : use futures_util::TryStreamExt;
16 : use pageserver_api::shard::TenantShardId;
17 : use serde::{Deserialize, Serialize};
18 : use tokio_stream::StreamExt;
19 : use utils::id::TenantId;
20 :
21 : use crate::{
22 : cloud_admin_api::{CloudAdminApiClient, MaybeDeleted, ProjectData},
23 : init_remote,
24 : metadata_stream::{stream_listing, stream_tenant_timelines, stream_tenants},
25 : BucketConfig, ConsoleConfig, NodeKind, RootTarget, TenantShardTimelineId, TraversingDepth,
26 : };
27 :
28 0 : #[derive(Serialize, Deserialize, Debug)]
29 : enum GarbageReason {
30 : DeletedInConsole,
31 : MissingInConsole,
32 : }
33 :
34 0 : #[derive(Serialize, Deserialize, Debug)]
35 : enum GarbageEntity {
36 : Tenant(TenantShardId),
37 : Timeline(TenantShardTimelineId),
38 : }
39 :
40 0 : #[derive(Serialize, Deserialize, Debug)]
41 : struct GarbageItem {
42 : entity: GarbageEntity,
43 : reason: GarbageReason,
44 : }
45 :
46 0 : #[derive(Serialize, Deserialize, Debug)]
47 : pub struct GarbageList {
48 : /// Remember what NodeKind we were finding garbage for, so that we can
49 : /// purge the list without re-stating it.
50 : node_kind: NodeKind,
51 :
52 : /// Embed the identity of the bucket, so that we do not risk executing
53 : /// the wrong list against the wrong bucket, and so that the user does not have
54 : /// to re-state the bucket details when purging.
55 : bucket_config: BucketConfig,
56 :
57 : items: Vec<GarbageItem>,
58 :
59 : /// Advisory information to enable consumers to do a validation that if we
60 : /// see garbage, we saw some active tenants too. This protects against classes of bugs
61 : /// in the scrubber that might otherwise generate a "deleted all" result.
62 : active_tenant_count: usize,
63 : active_timeline_count: usize,
64 : }
65 :
66 : impl GarbageList {
67 0 : fn new(node_kind: NodeKind, bucket_config: BucketConfig) -> Self {
68 0 : Self {
69 0 : items: Vec::new(),
70 0 : active_tenant_count: 0,
71 0 : active_timeline_count: 0,
72 0 : node_kind,
73 0 : bucket_config,
74 0 : }
75 0 : }
76 :
77 : /// Return true if appended, false if not. False means the result was not garbage.
78 0 : fn maybe_append<T>(&mut self, entity: GarbageEntity, result: Option<T>) -> bool
79 0 : where
80 0 : T: MaybeDeleted,
81 0 : {
82 0 : match result {
83 0 : Some(result_item) if result_item.is_deleted() => {
84 0 : self.items.push(GarbageItem {
85 0 : entity,
86 0 : reason: GarbageReason::DeletedInConsole,
87 0 : });
88 0 : true
89 : }
90 0 : Some(_) => false,
91 : None => {
92 0 : self.items.push(GarbageItem {
93 0 : entity,
94 0 : reason: GarbageReason::MissingInConsole,
95 0 : });
96 0 : true
97 : }
98 : }
99 0 : }
100 : }
101 :
102 0 : pub async fn find_garbage(
103 0 : bucket_config: BucketConfig,
104 0 : console_config: ConsoleConfig,
105 0 : depth: TraversingDepth,
106 0 : node_kind: NodeKind,
107 0 : output_path: String,
108 0 : ) -> anyhow::Result<()> {
109 0 : let garbage = find_garbage_inner(bucket_config, console_config, depth, node_kind).await?;
110 0 : let serialized = serde_json::to_vec_pretty(&garbage)?;
111 :
112 0 : tokio::fs::write(&output_path, &serialized).await?;
113 :
114 0 : tracing::info!("Wrote garbage report to {output_path}");
115 :
116 0 : Ok(())
117 0 : }
118 :
// How many concurrent S3 operations to issue (approximately): this is the concurrency
// for things like listing the timelines within tenant prefixes, and for listing the
// objects of garbage entities when purging.
const S3_CONCURRENCY: usize = 32;

// How many concurrent API requests to make to the console API.
//
// Be careful increasing this; roughly we shouldn't have more than ~100 rps. It
// would be better to implement a real rps limiter.
const CONSOLE_CONCURRENCY: usize = 16;
128 :
/// Cache of console lookups keyed by tenant ID, shared between the concurrent
/// per-tenant-shard checks in `find_garbage_inner` so that several shards of the
/// same tenant do not each trigger an API call.
struct ConsoleCache {
    /// Set of tenants found in the control plane API
    projects: HashMap<TenantId, ProjectData>,
    /// Set of tenants for which the control plane API returned 404
    not_found: HashSet<TenantId>,
}
135 :
136 0 : async fn find_garbage_inner(
137 0 : bucket_config: BucketConfig,
138 0 : console_config: ConsoleConfig,
139 0 : depth: TraversingDepth,
140 0 : node_kind: NodeKind,
141 0 : ) -> anyhow::Result<GarbageList> {
142 : // Construct clients for S3 and for Console API
143 0 : let (s3_client, target) = init_remote(bucket_config.clone(), node_kind)?;
144 0 : let cloud_admin_api_client = Arc::new(CloudAdminApiClient::new(console_config));
145 :
146 : // Build a set of console-known tenants, for quickly eliminating known-active tenants without having
147 : // to issue O(N) console API requests.
148 0 : let console_projects: HashMap<TenantId, ProjectData> = cloud_admin_api_client
149 0 : // FIXME: we can't just assume that all console's region ids are aws-<something>. This hack
150 0 : // will go away when we are talking to Control Plane APIs, which are per-region.
151 0 : .list_projects(format!("aws-{}", bucket_config.region))
152 0 : .await?
153 0 : .into_iter()
154 0 : .map(|t| (t.tenant, t))
155 0 : .collect();
156 0 : tracing::info!(
157 0 : "Loaded {} console projects tenant IDs",
158 0 : console_projects.len()
159 : );
160 :
161 : // Because many tenant shards may look up the same TenantId, we maintain a cache.
162 0 : let console_cache = Arc::new(std::sync::Mutex::new(ConsoleCache {
163 0 : projects: console_projects,
164 0 : not_found: HashSet::new(),
165 0 : }));
166 0 :
167 0 : // Enumerate Tenants in S3, and check if each one exists in Console
168 0 : tracing::info!("Finding all tenants in bucket {}...", bucket_config.bucket);
169 0 : let tenants = stream_tenants(&s3_client, &target);
170 0 : let tenants_checked = tenants.map_ok(|t| {
171 0 : let api_client = cloud_admin_api_client.clone();
172 0 : let console_cache = console_cache.clone();
173 0 : async move {
174 : // Check cache before issuing API call
175 0 : let project_data = {
176 0 : let cache = console_cache.lock().unwrap();
177 0 : let result = cache.projects.get(&t.tenant_id).cloned();
178 0 : if result.is_none() && cache.not_found.contains(&t.tenant_id) {
179 0 : return Ok((t, None));
180 0 : }
181 0 : result
182 0 : };
183 0 :
184 0 : match project_data {
185 0 : Some(project_data) => Ok((t, Some(project_data.clone()))),
186 : None => {
187 0 : let project_data = api_client
188 0 : .find_tenant_project(t.tenant_id)
189 0 : .await
190 0 : .map_err(|e| anyhow::anyhow!(e));
191 0 :
192 0 : // Populate cache with result of API call
193 0 : {
194 0 : let mut cache = console_cache.lock().unwrap();
195 0 : if let Ok(Some(project_data)) = &project_data {
196 0 : cache.projects.insert(t.tenant_id, project_data.clone());
197 0 : } else if let Ok(None) = &project_data {
198 0 : cache.not_found.insert(t.tenant_id);
199 0 : }
200 : }
201 :
202 0 : project_data.map(|r| (t, r))
203 : }
204 : }
205 0 : }
206 0 : });
207 0 : let mut tenants_checked =
208 0 : std::pin::pin!(tenants_checked.try_buffer_unordered(CONSOLE_CONCURRENCY));
209 0 :
210 0 : // Process the results of Tenant checks. If a Tenant is garbage, it goes into
211 0 : // the `GarbageList`. Else it goes into `active_tenants` for more detailed timeline
212 0 : // checks if they are enabled by the `depth` parameter.
213 0 : let mut garbage = GarbageList::new(node_kind, bucket_config);
214 0 : let mut active_tenants: Vec<TenantShardId> = vec![];
215 0 : let mut counter = 0;
216 0 : while let Some(result) = tenants_checked.next().await {
217 0 : let (tenant_shard_id, console_result) = result?;
218 :
219 : // Paranoia check
220 0 : if let Some(project) = &console_result {
221 0 : assert!(project.tenant == tenant_shard_id.tenant_id);
222 0 : }
223 :
224 0 : if garbage.maybe_append(GarbageEntity::Tenant(tenant_shard_id), console_result) {
225 0 : tracing::debug!("Tenant {tenant_shard_id} is garbage");
226 : } else {
227 0 : tracing::debug!("Tenant {tenant_shard_id} is active");
228 0 : active_tenants.push(tenant_shard_id);
229 0 : garbage.active_tenant_count = active_tenants.len();
230 : }
231 :
232 0 : counter += 1;
233 0 : if counter % 1000 == 0 {
234 0 : tracing::info!(
235 0 : "Progress: {counter} tenants checked, {} active, {} garbage",
236 0 : active_tenants.len(),
237 0 : garbage.items.len()
238 : );
239 0 : }
240 : }
241 :
242 0 : tracing::info!(
243 0 : "Found {}/{} garbage tenants",
244 0 : garbage.items.len(),
245 0 : garbage.items.len() + active_tenants.len()
246 : );
247 :
248 : // If we are only checking tenant-deep, we are done. Otherwise we must
249 : // proceed to check the individual timelines of the active tenants.
250 0 : if depth == TraversingDepth::Tenant {
251 0 : return Ok(garbage);
252 0 : }
253 0 :
254 0 : tracing::info!(
255 0 : "Checking timelines for {} active tenants",
256 0 : active_tenants.len(),
257 : );
258 :
259 : // Construct a stream of all timelines within active tenants
260 0 : let active_tenants = tokio_stream::iter(active_tenants.iter().map(Ok));
261 0 : let timelines = active_tenants.map_ok(|t| stream_tenant_timelines(&s3_client, &target, *t));
262 0 : let timelines = timelines.try_buffer_unordered(S3_CONCURRENCY);
263 0 : let timelines = timelines.try_flatten();
264 0 :
265 0 : // For all timelines within active tenants, call into console API to check their existence
266 0 : let timelines_checked = timelines.map_ok(|ttid| {
267 0 : let api_client = cloud_admin_api_client.clone();
268 0 : async move {
269 0 : api_client
270 0 : .find_timeline_branch(ttid.tenant_shard_id.tenant_id, ttid.timeline_id)
271 0 : .await
272 0 : .map_err(|e| anyhow::anyhow!(e))
273 0 : .map(|r| (ttid, r))
274 0 : }
275 0 : });
276 0 : let mut timelines_checked =
277 0 : std::pin::pin!(timelines_checked.try_buffer_unordered(CONSOLE_CONCURRENCY));
278 0 :
279 0 : // Update the GarbageList with any timelines which appear not to exist.
280 0 : let mut active_timelines: Vec<TenantShardTimelineId> = vec![];
281 0 : while let Some(result) = timelines_checked.next().await {
282 0 : let (ttid, console_result) = result?;
283 0 : if garbage.maybe_append(GarbageEntity::Timeline(ttid), console_result) {
284 0 : tracing::debug!("Timeline {ttid} is garbage");
285 : } else {
286 0 : tracing::debug!("Timeline {ttid} is active");
287 0 : active_timelines.push(ttid);
288 0 : garbage.active_timeline_count = active_timelines.len();
289 : }
290 : }
291 :
292 0 : let num_garbage_timelines = garbage
293 0 : .items
294 0 : .iter()
295 0 : .filter(|g| matches!(g.entity, GarbageEntity::Timeline(_)))
296 0 : .count();
297 0 : tracing::info!(
298 0 : "Found {}/{} garbage timelines in active tenants",
299 0 : num_garbage_timelines,
300 0 : active_timelines.len(),
301 : );
302 :
303 0 : Ok(garbage)
304 0 : }
305 :
// Selects which `GarbageReason`s `purge_garbage` will act on.
//
// NOTE(review): this derives `clap::ValueEnum`, and clap presumably uses the variant doc
// comments below as CLI help text — treat them as user-facing strings when editing.
#[derive(clap::ValueEnum, Debug, Clone)]
pub enum PurgeMode {
    /// The safest mode: only delete tenants that were explicitly reported as deleted
    /// by Console API.
    DeletedOnly,

    /// Delete all garbage tenants, including those which are only presumed to be deleted,
    /// because the Console API could not find them.
    DeletedAndMissing,
}
316 :
317 : impl std::fmt::Display for PurgeMode {
318 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
319 0 : match self {
320 0 : PurgeMode::DeletedOnly => write!(f, "deleted-only"),
321 0 : PurgeMode::DeletedAndMissing => write!(f, "deleted-and-missing"),
322 : }
323 0 : }
324 : }
325 :
326 0 : pub async fn get_tenant_objects(
327 0 : s3_client: &Arc<Client>,
328 0 : target: RootTarget,
329 0 : tenant_shard_id: TenantShardId,
330 0 : ) -> anyhow::Result<Vec<ObjectIdentifier>> {
331 0 : tracing::debug!("Listing objects in tenant {tenant_shard_id}");
332 : // TODO: apply extra validation based on object modification time. Don't purge
333 : // tenants where any timeline's index_part.json has been touched recently.
334 :
335 0 : let mut tenant_root = target.tenant_root(&tenant_shard_id);
336 0 :
337 0 : // Remove delimiter, so that object listing lists all keys in the prefix and not just
338 0 : // common prefixes.
339 0 : tenant_root.delimiter = String::new();
340 0 :
341 0 : let key_stream = stream_listing(s3_client, &tenant_root);
342 0 : key_stream.try_collect().await
343 0 : }
344 :
345 0 : pub async fn get_timeline_objects(
346 0 : s3_client: &Arc<Client>,
347 0 : target: RootTarget,
348 0 : ttid: TenantShardTimelineId,
349 0 : ) -> anyhow::Result<Vec<ObjectIdentifier>> {
350 0 : tracing::debug!("Listing objects in timeline {ttid}");
351 0 : let mut timeline_root = target.timeline_root(&ttid);
352 0 :
353 0 : // TODO: apply extra validation based on object modification time. Don't purge
354 0 : // timelines whose index_part.json has been touched recently.
355 0 :
356 0 : // Remove delimiter, so that object listing lists all keys in the prefix and not just
357 0 : // common prefixes.
358 0 : timeline_root.delimiter = String::new();
359 0 : let key_stream = stream_listing(s3_client, &timeline_root);
360 0 :
361 0 : key_stream.try_collect().await
362 0 : }
363 :
// Maximum number of keys that `do_delete` puts into a single DeleteObjects request.
// NOTE(review): presumably chosen to match S3's per-request key limit for
// DeleteObjects — confirm against the S3 API reference before changing.
const MAX_KEYS_PER_DELETE: usize = 1000;
365 :
366 : /// Drain a buffer of keys into DeleteObjects requests
367 : ///
368 : /// If `drain` is true, drains keys completely; otherwise stops when <
369 : /// MAX_KEYS_PER_DELETE keys are left.
370 : /// `num_deleted` returns number of deleted keys.
371 0 : async fn do_delete(
372 0 : s3_client: &Arc<Client>,
373 0 : bucket_name: &str,
374 0 : keys: &mut Vec<ObjectIdentifier>,
375 0 : dry_run: bool,
376 0 : drain: bool,
377 0 : progress_tracker: &mut DeletionProgressTracker,
378 0 : ) -> anyhow::Result<()> {
379 0 : while (!keys.is_empty() && drain) || (keys.len() >= MAX_KEYS_PER_DELETE) {
380 0 : let request_keys =
381 0 : keys.split_off(keys.len() - (std::cmp::min(MAX_KEYS_PER_DELETE, keys.len())));
382 0 : let num_deleted = request_keys.len();
383 0 : if dry_run {
384 0 : tracing::info!("Dry-run deletion of objects: ");
385 0 : for k in request_keys {
386 0 : tracing::info!(" {k:?}");
387 : }
388 : } else {
389 0 : let delete_request = s3_client
390 0 : .delete_objects()
391 0 : .bucket(bucket_name)
392 0 : .delete(Delete::builder().set_objects(Some(request_keys)).build()?);
393 0 : delete_request
394 0 : .send()
395 0 : .await
396 0 : .context("DeleteObjects request")?;
397 0 : progress_tracker.register(num_deleted);
398 : }
399 : }
400 :
401 0 : Ok(())
402 0 : }
403 :
404 : /// Simple tracker reporting each 10k deleted keys.
405 : #[derive(Default)]
406 : struct DeletionProgressTracker {
407 : num_deleted: usize,
408 : last_reported_num_deleted: usize,
409 : }
410 :
411 : impl DeletionProgressTracker {
412 0 : fn register(&mut self, n: usize) {
413 0 : self.num_deleted += n;
414 0 : if self.num_deleted - self.last_reported_num_deleted > 10000 {
415 0 : tracing::info!("progress: deleted {} keys", self.num_deleted);
416 0 : self.last_reported_num_deleted = self.num_deleted;
417 0 : }
418 0 : }
419 : }
420 :
421 0 : pub async fn purge_garbage(
422 0 : input_path: String,
423 0 : mode: PurgeMode,
424 0 : dry_run: bool,
425 0 : ) -> anyhow::Result<()> {
426 0 : let list_bytes = tokio::fs::read(&input_path).await?;
427 0 : let garbage_list = serde_json::from_slice::<GarbageList>(&list_bytes)?;
428 0 : tracing::info!(
429 0 : "Loaded {} items in garbage list from {}",
430 0 : garbage_list.items.len(),
431 : input_path
432 : );
433 :
434 0 : let (s3_client, target) =
435 0 : init_remote(garbage_list.bucket_config.clone(), garbage_list.node_kind)?;
436 :
437 : // Sanity checks on the incoming list
438 0 : if garbage_list.active_tenant_count == 0 {
439 0 : anyhow::bail!("Refusing to purge a garbage list that reports 0 active tenants");
440 0 : }
441 0 : if garbage_list
442 0 : .items
443 0 : .iter()
444 0 : .any(|g| matches!(g.entity, GarbageEntity::Timeline(_)))
445 0 : && garbage_list.active_timeline_count == 0
446 : {
447 0 : anyhow::bail!("Refusing to purge a garbage list containing garbage timelines that reports 0 active timelines");
448 0 : }
449 0 :
450 0 : let filtered_items = garbage_list
451 0 : .items
452 0 : .iter()
453 0 : .filter(|i| match (&mode, &i.reason) {
454 0 : (PurgeMode::DeletedAndMissing, _) => true,
455 0 : (PurgeMode::DeletedOnly, GarbageReason::DeletedInConsole) => true,
456 0 : (PurgeMode::DeletedOnly, GarbageReason::MissingInConsole) => false,
457 0 : });
458 0 :
459 0 : tracing::info!(
460 0 : "Filtered down to {} garbage items based on mode {}",
461 0 : garbage_list.items.len(),
462 : mode
463 : );
464 :
465 0 : let items = tokio_stream::iter(filtered_items.map(Ok));
466 0 : let get_objects_results = items.map_ok(|i| {
467 0 : let s3_client = s3_client.clone();
468 0 : let target = target.clone();
469 0 : async move {
470 0 : match i.entity {
471 0 : GarbageEntity::Tenant(tenant_id) => {
472 0 : get_tenant_objects(&s3_client, target, tenant_id).await
473 : }
474 0 : GarbageEntity::Timeline(ttid) => {
475 0 : get_timeline_objects(&s3_client, target, ttid).await
476 : }
477 : }
478 0 : }
479 0 : });
480 0 : let mut get_objects_results =
481 0 : std::pin::pin!(get_objects_results.try_buffer_unordered(S3_CONCURRENCY));
482 0 :
483 0 : let mut objects_to_delete = Vec::new();
484 0 : let mut progress_tracker = DeletionProgressTracker::default();
485 0 : while let Some(result) = get_objects_results.next().await {
486 0 : let mut object_list = result?;
487 0 : objects_to_delete.append(&mut object_list);
488 0 : if objects_to_delete.len() >= MAX_KEYS_PER_DELETE {
489 0 : do_delete(
490 0 : &s3_client,
491 0 : &garbage_list.bucket_config.bucket,
492 0 : &mut objects_to_delete,
493 0 : dry_run,
494 0 : false,
495 0 : &mut progress_tracker,
496 0 : )
497 0 : .await?;
498 0 : }
499 : }
500 :
501 0 : do_delete(
502 0 : &s3_client,
503 0 : &garbage_list.bucket_config.bucket,
504 0 : &mut objects_to_delete,
505 0 : dry_run,
506 0 : true,
507 0 : &mut progress_tracker,
508 0 : )
509 0 : .await?;
510 :
511 0 : tracing::info!("{} keys deleted in total", progress_tracker.num_deleted);
512 :
513 0 : Ok(())
514 0 : }
|