//! Functionality for finding and purging garbage, as in "garbage collection". Garbage means
//! S3 objects which are either not referenced by any metadata, or are referenced by a
//! control plane tenant/timeline in a deleted state.
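//!
//! The intended workflow is two-phase: `find_garbage` scans S3 and the control plane and
//! writes a JSON report, which an operator reviews and then feeds to `purge_garbage`.
//! A minimal sketch of driving both phases programmatically (the `bucket_config` and
//! `console_config` values, and the `TraversingDepth::Timeline` / `NodeKind::Pageserver`
//! variant names, are illustrative assumptions, not confirmed by this module):
//!
//! ```ignore
//! // Phase 1: write a report of suspected garbage to disk.
//! find_garbage(
//!     bucket_config,
//!     console_config,
//!     TraversingDepth::Timeline,
//!     NodeKind::Pageserver,
//!     "garbage.json".to_string(),
//! )
//! .await?;
//!
//! // Phase 2, after human review: dry-run purge of entries the console
//! // explicitly reported as deleted.
//! purge_garbage("garbage.json".to_string(), PurgeMode::DeletedOnly, true).await?;
//! ```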

use std::{
    collections::{HashMap, HashSet},
    sync::Arc,
};

use anyhow::Context;
use aws_sdk_s3::{
    types::{Delete, ObjectIdentifier},
    Client,
};
use futures_util::{pin_mut, TryStreamExt};
use pageserver_api::shard::TenantShardId;
use serde::{Deserialize, Serialize};
use tokio_stream::StreamExt;
use utils::id::TenantId;

use crate::{
    cloud_admin_api::{CloudAdminApiClient, MaybeDeleted, ProjectData},
    init_remote,
    metadata_stream::{stream_listing, stream_tenant_timelines, stream_tenants},
    BucketConfig, ConsoleConfig, NodeKind, RootTarget, TenantShardTimelineId, TraversingDepth,
};

#[derive(Serialize, Deserialize, Debug)]
enum GarbageReason {
    DeletedInConsole,
    MissingInConsole,
}

#[derive(Serialize, Deserialize, Debug)]
enum GarbageEntity {
    Tenant(TenantShardId),
    Timeline(TenantShardTimelineId),
}

#[derive(Serialize, Deserialize, Debug)]
struct GarbageItem {
    entity: GarbageEntity,
    reason: GarbageReason,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct GarbageList {
    /// Remember what NodeKind we were finding garbage for, so that we can
    /// purge the list without re-stating it.
    node_kind: NodeKind,

    /// Embed the identity of the bucket, so that we do not risk executing
    /// the wrong list against the wrong bucket, and so that the user does not have
    /// to re-state the bucket details when purging.
    bucket_config: BucketConfig,

    items: Vec<GarbageItem>,

    /// Advisory information to enable consumers to do a validation that if we
    /// see garbage, we saw some active tenants too. This protects against classes of bugs
    /// in the scrubber that might otherwise generate a "deleted all" result.
    active_tenant_count: usize,
}

impl GarbageList {
    fn new(node_kind: NodeKind, bucket_config: BucketConfig) -> Self {
        Self {
            items: Vec::new(),
            active_tenant_count: 0,
            node_kind,
            bucket_config,
        }
    }

    /// Returns true if the entity was appended to the list as garbage, false if the
    /// console result shows it is not garbage.
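    ///
    /// A sketch of the three cases, using a hypothetical `ConsoleRecord` type that
    /// implements [`MaybeDeleted`] (the real console types live in `cloud_admin_api`):
    ///
    /// ```ignore
    /// // Console has no record of the tenant: garbage (MissingInConsole).
    /// assert!(list.maybe_append::<ConsoleRecord>(GarbageEntity::Tenant(id), None));
    /// // Console reports it deleted: garbage (DeletedInConsole).
    /// assert!(list.maybe_append(GarbageEntity::Tenant(id), Some(deleted_record)));
    /// // Console reports it live: not garbage, nothing is appended.
    /// assert!(!list.maybe_append(GarbageEntity::Tenant(id), Some(live_record)));
    /// ```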
    fn maybe_append<T>(&mut self, entity: GarbageEntity, result: Option<T>) -> bool
    where
        T: MaybeDeleted,
    {
        match result {
            Some(result_item) if result_item.is_deleted() => {
                self.items.push(GarbageItem {
                    entity,
                    reason: GarbageReason::DeletedInConsole,
                });
                true
            }
            Some(_) => false,
            None => {
                self.items.push(GarbageItem {
                    entity,
                    reason: GarbageReason::MissingInConsole,
                });
                true
            }
        }
    }
}

pub async fn find_garbage(
    bucket_config: BucketConfig,
    console_config: ConsoleConfig,
    depth: TraversingDepth,
    node_kind: NodeKind,
    output_path: String,
) -> anyhow::Result<()> {
    let garbage = find_garbage_inner(bucket_config, console_config, depth, node_kind).await?;
    let serialized = serde_json::to_vec_pretty(&garbage)?;

    tokio::fs::write(&output_path, &serialized).await?;

    tracing::info!("Wrote garbage report to {output_path}");

    Ok(())
}

// How many concurrent S3 operations to issue (approximately): this is the concurrency
// for things like listing the timelines within tenant prefixes.
const S3_CONCURRENCY: usize = 32;

// How many concurrent API requests to make to the console API.
const CONSOLE_CONCURRENCY: usize = 128;

struct ConsoleCache {
    /// Set of tenants found in the control plane API
    projects: HashMap<TenantId, ProjectData>,
    /// Set of tenants for which the control plane API returned 404
    not_found: HashSet<TenantId>,
}

async fn find_garbage_inner(
    bucket_config: BucketConfig,
    console_config: ConsoleConfig,
    depth: TraversingDepth,
    node_kind: NodeKind,
) -> anyhow::Result<GarbageList> {
    // Construct clients for S3 and for Console API
    let (s3_client, target) = init_remote(bucket_config.clone(), node_kind)?;
    let cloud_admin_api_client = Arc::new(CloudAdminApiClient::new(console_config));

    // Build a set of console-known tenants, for quickly eliminating known-active tenants without having
    // to issue O(N) console API requests.
    let console_projects: HashMap<TenantId, ProjectData> = cloud_admin_api_client
        // FIXME: we can't just assume that all console's region ids are aws-<something>. This hack
        // will go away when we are talking to Control Plane APIs, which are per-region.
        .list_projects(format!("aws-{}", bucket_config.region))
        .await?
        .into_iter()
        .map(|t| (t.tenant, t))
        .collect();
    tracing::info!(
        "Loaded tenant IDs for {} console projects",
        console_projects.len()
    );

    // Because many tenant shards may look up the same TenantId, we maintain a cache.
    let console_cache = Arc::new(std::sync::Mutex::new(ConsoleCache {
        projects: console_projects,
        not_found: HashSet::new(),
    }));

    // Enumerate Tenants in S3, and check if each one exists in Console
    tracing::info!("Finding all tenants in bucket {}...", bucket_config.bucket);
    let tenants = stream_tenants(&s3_client, &target);
    let tenants_checked = tenants.map_ok(|t| {
        let api_client = cloud_admin_api_client.clone();
        let console_cache = console_cache.clone();
        async move {
            // Check cache before issuing API call
            let project_data = {
                let cache = console_cache.lock().unwrap();
                let result = cache.projects.get(&t.tenant_id).cloned();
                if result.is_none() && cache.not_found.contains(&t.tenant_id) {
                    return Ok((t, None));
                }
                result
            };

            match project_data {
                Some(project_data) => Ok((t, Some(project_data.clone()))),
                None => {
                    let project_data = api_client
                        .find_tenant_project(t.tenant_id)
                        .await
                        .map_err(|e| anyhow::anyhow!(e));

                    // Populate cache with result of API call
                    {
                        let mut cache = console_cache.lock().unwrap();
                        if let Ok(Some(project_data)) = &project_data {
                            cache.projects.insert(t.tenant_id, project_data.clone());
                        } else if let Ok(None) = &project_data {
                            cache.not_found.insert(t.tenant_id);
                        }
                    }

                    project_data.map(|r| (t, r))
                }
            }
        }
    });
    let tenants_checked = tenants_checked.try_buffer_unordered(CONSOLE_CONCURRENCY);

    // Process the results of Tenant checks. If a Tenant is garbage, it goes into
    // the `GarbageList`. Else it goes into `active_tenants` for more detailed timeline
    // checks if they are enabled by the `depth` parameter.
    pin_mut!(tenants_checked);
    let mut garbage = GarbageList::new(node_kind, bucket_config);
    let mut active_tenants: Vec<TenantShardId> = vec![];
    let mut counter = 0;
    while let Some(result) = tenants_checked.next().await {
        let (tenant_shard_id, console_result) = result?;

        // Paranoia check
        if let Some(project) = &console_result {
            assert!(project.tenant == tenant_shard_id.tenant_id);
        }

        if garbage.maybe_append(GarbageEntity::Tenant(tenant_shard_id), console_result) {
            tracing::debug!("Tenant {tenant_shard_id} is garbage");
        } else {
            tracing::debug!("Tenant {tenant_shard_id} is active");
            active_tenants.push(tenant_shard_id);
        }

        counter += 1;
        if counter % 1000 == 0 {
            tracing::info!(
                "Progress: {counter} tenants checked, {} active, {} garbage",
                active_tenants.len(),
                garbage.items.len()
            );
        }
    }

    // Record the advisory active tenant count: purge_garbage refuses to run on a
    // report that claims zero active tenants.
    garbage.active_tenant_count = active_tenants.len();

    tracing::info!(
        "Found {}/{} garbage tenants",
        garbage.items.len(),
        garbage.items.len() + active_tenants.len()
    );

    // If we are only checking tenant-deep, we are done. Otherwise we must
    // proceed to check the individual timelines of the active tenants.
    if depth == TraversingDepth::Tenant {
        return Ok(garbage);
    }

    tracing::info!(
        "Checking timelines for {} active tenants",
        active_tenants.len(),
    );

    // Construct a stream of all timelines within active tenants
    let active_tenants = tokio_stream::iter(active_tenants.iter().map(Ok));
    let timelines = active_tenants.map_ok(|t| stream_tenant_timelines(&s3_client, &target, *t));
    let timelines = timelines.try_buffer_unordered(S3_CONCURRENCY);
    let timelines = timelines.try_flatten();

    // For all timelines within active tenants, call into console API to check their existence
    let timelines_checked = timelines.map_ok(|ttid| {
        let api_client = cloud_admin_api_client.clone();
        async move {
            api_client
                .find_timeline_branch(ttid.timeline_id)
                .await
                .map_err(|e| anyhow::anyhow!(e))
                .map(|r| (ttid, r))
        }
    });
    let timelines_checked = timelines_checked.try_buffer_unordered(CONSOLE_CONCURRENCY);

    // Update the GarbageList with any timelines which appear not to exist.
    pin_mut!(timelines_checked);
    while let Some(result) = timelines_checked.next().await {
        let (ttid, console_result) = result?;
        if garbage.maybe_append(GarbageEntity::Timeline(ttid), console_result) {
            tracing::debug!("Timeline {ttid} is garbage");
        } else {
            tracing::debug!("Timeline {ttid} is active");
        }
    }

    Ok(garbage)
}

#[derive(clap::ValueEnum, Debug, Clone)]
pub enum PurgeMode {
    /// The safest mode: only delete tenants that were explicitly reported as deleted
    /// by the Console API.
    DeletedOnly,

    /// Delete all garbage tenants, including those which are only presumed to be deleted
    /// because the Console API could not find them.
    DeletedAndMissing,
}
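
// Note: as a `clap::ValueEnum`, `PurgeMode` is expected on the CLI in kebab-case
// ("deleted-only", "deleted-and-missing"), matching the `Display` impl below.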

impl std::fmt::Display for PurgeMode {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            PurgeMode::DeletedOnly => write!(f, "deleted-only"),
            PurgeMode::DeletedAndMissing => write!(f, "deleted-and-missing"),
        }
    }
}

pub async fn get_tenant_objects(
    s3_client: &Arc<Client>,
    target: RootTarget,
    tenant_shard_id: TenantShardId,
) -> anyhow::Result<Vec<ObjectIdentifier>> {
    tracing::debug!("Listing objects in tenant {tenant_shard_id}");
    // TODO: apply extra validation based on object modification time. Don't purge
    // tenants where any timeline's index_part.json has been touched recently.

    let mut tenant_root = target.tenant_root(&tenant_shard_id);

    // Remove delimiter, so that object listing lists all keys in the prefix and not just
    // common prefixes.
    tenant_root.delimiter = String::new();

    let key_stream = stream_listing(s3_client, &tenant_root);
    key_stream.try_collect().await
}

pub async fn get_timeline_objects(
    s3_client: &Arc<Client>,
    target: RootTarget,
    ttid: TenantShardTimelineId,
) -> anyhow::Result<Vec<ObjectIdentifier>> {
    tracing::debug!("Listing objects in timeline {ttid}");
    let mut timeline_root = target.timeline_root(&ttid);

    // TODO: apply extra validation based on object modification time. Don't purge
    // timelines whose index_part.json has been touched recently.

    // Remove delimiter, so that object listing lists all keys in the prefix and not just
    // common prefixes.
    timeline_root.delimiter = String::new();
    let key_stream = stream_listing(s3_client, &timeline_root);

    key_stream.try_collect().await
}
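
// S3's DeleteObjects API accepts at most 1000 keys in a single request, so deletions
// are issued in batches of at most this size.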
const MAX_KEYS_PER_DELETE: usize = 1000;

/// Drain a buffer of keys into DeleteObjects requests
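///
/// If `drain` is false, only full batches of `MAX_KEYS_PER_DELETE` keys are sent and any
/// remainder stays in `keys` for a later call; if `drain` is true, the buffer is emptied
/// completely, sending a final partial batch if needed.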
async fn do_delete(
    s3_client: &Arc<Client>,
    bucket_name: &str,
    keys: &mut Vec<ObjectIdentifier>,
    dry_run: bool,
    drain: bool,
) -> anyhow::Result<()> {
    while (!keys.is_empty() && drain) || (keys.len() >= MAX_KEYS_PER_DELETE) {
        let request_keys =
            keys.split_off(keys.len() - (std::cmp::min(MAX_KEYS_PER_DELETE, keys.len())));
        if dry_run {
            tracing::info!("Dry-run deletion of objects:");
            for k in request_keys {
                tracing::info!(" {k:?}");
            }
        } else {
            let delete_request = s3_client
                .delete_objects()
                .bucket(bucket_name)
                .delete(Delete::builder().set_objects(Some(request_keys)).build()?);
            delete_request
                .send()
                .await
                .context("DeleteObjects request")?;
        }
    }

    Ok(())
}

pub async fn purge_garbage(
    input_path: String,
    mode: PurgeMode,
    dry_run: bool,
) -> anyhow::Result<()> {
    let list_bytes = tokio::fs::read(&input_path).await?;
    let garbage_list = serde_json::from_slice::<GarbageList>(&list_bytes)?;
    tracing::info!(
        "Loaded {} items in garbage list from {}",
        garbage_list.items.len(),
        input_path
    );

    let (s3_client, target) =
        init_remote(garbage_list.bucket_config.clone(), garbage_list.node_kind)?;

    // Sanity checks on the incoming list
    if garbage_list.active_tenant_count == 0 {
        anyhow::bail!("Refusing to purge a garbage list that reports 0 active tenants");
    }

    let filtered_items: Vec<_> = garbage_list
        .items
        .iter()
        .filter(|i| match (&mode, &i.reason) {
            (PurgeMode::DeletedAndMissing, _) => true,
            (PurgeMode::DeletedOnly, GarbageReason::DeletedInConsole) => true,
            (PurgeMode::DeletedOnly, GarbageReason::MissingInConsole) => false,
        })
        .collect();

    tracing::info!(
        "Filtered down to {} garbage items based on mode {}",
        filtered_items.len(),
        mode
    );

    let items = tokio_stream::iter(filtered_items.into_iter().map(Ok));
    let get_objects_results = items.map_ok(|i| {
        let s3_client = s3_client.clone();
        let target = target.clone();
        async move {
            match i.entity {
                GarbageEntity::Tenant(tenant_id) => {
                    get_tenant_objects(&s3_client, target, tenant_id).await
                }
                GarbageEntity::Timeline(ttid) => {
                    get_timeline_objects(&s3_client, target, ttid).await
                }
            }
        }
    });
    let get_objects_results = get_objects_results.try_buffer_unordered(S3_CONCURRENCY);

    pin_mut!(get_objects_results);
    let mut objects_to_delete = Vec::new();
    while let Some(result) = get_objects_results.next().await {
        let mut object_list = result?;
        objects_to_delete.append(&mut object_list);
        if objects_to_delete.len() >= MAX_KEYS_PER_DELETE {
            do_delete(
                &s3_client,
                &garbage_list.bucket_config.bucket,
                &mut objects_to_delete,
                dry_run,
                false,
            )
            .await?;
        }
    }

    do_delete(
        &s3_client,
        &garbage_list.bucket_config.bucket,
        &mut objects_to_delete,
        dry_run,
        true,
    )
    .await?;
    tracing::info!("Purge complete");

    Ok(())
}