Line data Source code
1 : use std::collections::{BTreeMap, BTreeSet, HashMap};
2 : use std::sync::Arc;
3 : use std::time::Duration;
4 :
5 : use async_stream::try_stream;
6 : use futures::future::Either;
7 : use futures_util::{StreamExt, TryStreamExt};
8 : use pageserver::tenant::IndexPart;
9 : use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
10 : use pageserver::tenant::remote_timeline_client::manifest::OffloadedTimelineManifest;
11 : use pageserver::tenant::remote_timeline_client::{
12 : parse_remote_index_path, parse_remote_tenant_manifest_path, remote_layer_path,
13 : };
14 : use pageserver::tenant::storage_layer::LayerName;
15 : use pageserver_api::controller_api::TenantDescribeResponse;
16 : use pageserver_api::shard::{ShardIndex, TenantShardId};
17 : use remote_storage::{GenericRemoteStorage, ListingObject, RemotePath};
18 : use reqwest::Method;
19 : use serde::Serialize;
20 : use storage_controller_client::control_api;
21 : use tokio_util::sync::CancellationToken;
22 : use tracing::{Instrument, info_span};
23 : use utils::backoff;
24 : use utils::generation::Generation;
25 : use utils::id::{TenantId, TenantTimelineId};
26 :
27 : use crate::checks::{
28 : BlobDataParseResult, ListTenantManifestResult, RemoteTenantManifestInfo, list_tenant_manifests,
29 : list_timeline_blobs,
30 : };
31 : use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
32 : use crate::{BucketConfig, MAX_RETRIES, NodeKind, RootTarget, TenantShardTimelineId, init_remote};
33 :
34 : #[derive(Serialize, Default)]
35 : pub struct GcSummary {
36 : indices_deleted: usize,
37 : tenant_manifests_deleted: usize,
38 : remote_storage_errors: usize,
39 : controller_api_errors: usize,
40 : ancestor_layers_deleted: usize,
41 : }
42 :
43 : impl GcSummary {
44 0 : fn merge(&mut self, other: Self) {
45 0 : let Self {
46 0 : indices_deleted,
47 0 : tenant_manifests_deleted,
48 0 : remote_storage_errors,
49 0 : ancestor_layers_deleted,
50 0 : controller_api_errors,
51 0 : } = other;
52 0 :
53 0 : self.indices_deleted += indices_deleted;
54 0 : self.tenant_manifests_deleted += tenant_manifests_deleted;
55 0 : self.remote_storage_errors += remote_storage_errors;
56 0 : self.ancestor_layers_deleted += ancestor_layers_deleted;
57 0 : self.controller_api_errors += controller_api_errors;
58 0 : }
59 : }
60 :
// Selects which deletions physical GC is allowed to perform.
// NOTE: these are plain `//` comments; converting them to `///` doc comments would change
// the per-value help text that clap's `ValueEnum` derive generates.
#[derive(clap::ValueEnum, Debug, Clone, Copy)]
pub enum GcMode {
    // Delete nothing
    // (candidates are still evaluated and logged as "would delete").
    DryRun,

    // Enable only removing old-generation indices
    // (old-generation tenant manifests are also removed in this mode; ancestor shard layers are not).
    IndicesOnly,

    // Enable all forms of GC
    // (indices, tenant manifests, and unreferenced ancestor shard layers).
    Full,
}
72 :
73 : impl std::fmt::Display for GcMode {
74 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75 0 : match self {
76 0 : GcMode::DryRun => write!(f, "dry-run"),
77 0 : GcMode::IndicesOnly => write!(f, "indices-only"),
78 0 : GcMode::Full => write!(f, "full"),
79 : }
80 0 : }
81 : }
82 :
83 : mod refs {
84 : use super::*;
85 : // Map of cross-shard layer references, giving a refcount for each layer in each shard that is referenced by some other
86 : // shard in the same tenant. This is sparse! The vast majority of timelines will have no cross-shard refs, and those that
87 : // do have cross shard refs should eventually drop most of them via compaction.
88 : //
89 : // In our inner map type, the TTID in the key is shard-agnostic, and the ShardIndex in the value refers to the _ancestor
90 : // which is is referenced_.
91 : #[derive(Default)]
92 : pub(super) struct AncestorRefs(
93 : BTreeMap<TenantTimelineId, HashMap<(ShardIndex, LayerName), usize>>,
94 : );
95 :
96 : impl AncestorRefs {
97 : /// Insert references for layers discovered in a particular shard-timeline that refer to an ancestral shard-timeline.
98 0 : pub(super) fn update(
99 0 : &mut self,
100 0 : ttid: TenantShardTimelineId,
101 0 : layers: Vec<(LayerName, LayerFileMetadata)>,
102 0 : ) {
103 0 : let ttid_refs = self.0.entry(ttid.as_tenant_timeline_id()).or_default();
104 0 : for (layer_name, layer_metadata) in layers {
105 0 : // Increment refcount of this layer in the ancestor shard
106 0 : *(ttid_refs
107 0 : .entry((layer_metadata.shard, layer_name))
108 0 : .or_default()) += 1;
109 0 : }
110 0 : }
111 :
112 : /// For a particular TTID, return the map of all ancestor layers referenced by a descendent to their refcount
113 : ///
114 : /// The `ShardIndex` in the result's key is the index of the _ancestor_, not the descendent.
115 0 : pub(super) fn get_ttid_refcounts(
116 0 : &self,
117 0 : ttid: &TenantTimelineId,
118 0 : ) -> Option<&HashMap<(ShardIndex, LayerName), usize>> {
119 0 : self.0.get(ttid)
120 0 : }
121 : }
122 : }
123 :
124 : use refs::AncestorRefs;
125 :
126 : // As we see shards for a tenant, acccumulate knowledge needed for cross-shard GC:
127 : // - Are there any ancestor shards?
128 : // - Are there any refs to ancestor shards' layers?
129 : #[derive(Default)]
130 : struct TenantRefAccumulator {
131 : shards_seen: HashMap<TenantId, BTreeSet<ShardIndex>>,
132 :
133 : // For each shard that has refs to an ancestor's layers, the set of ancestor layers referred to
134 : ancestor_ref_shards: AncestorRefs,
135 : }
136 :
impl TenantRefAccumulator {
    /// Record one shard-timeline's index.
    ///
    /// Notes that `ttid`'s shard exists for its tenant. Also collects every layer in `index_part`
    /// whose metadata names a shard other than `ttid`'s own (a layer owned by an ancestor shard),
    /// so that ancestor GC will not delete it.
    fn update(&mut self, ttid: TenantShardTimelineId, index_part: &IndexPart) {
        let this_shard_idx = ttid.tenant_shard_id.to_index();
        (*self
            .shards_seen
            .entry(ttid.tenant_shard_id.tenant_id)
            .or_default())
        .insert(this_shard_idx);

        let mut ancestor_refs = Vec::new();
        for (layer_name, layer_metadata) in &index_part.layer_metadata {
            if layer_metadata.shard != this_shard_idx {
                // This is a reference from this shard to a layer in an ancestor shard: we must track this
                // as a marker to not GC this layer from the parent.
                ancestor_refs.push((layer_name.clone(), layer_metadata.clone()));
            }
        }

        // Only touch the (sparse) refs map for timelines that actually reference ancestors.
        if !ancestor_refs.is_empty() {
            tracing::info!(%ttid, "Found {} ancestor refs", ancestor_refs.len());
            self.ancestor_ref_shards.update(ttid, ancestor_refs);
        }
    }

    /// Consume Self and return a vector of ancestor tenant shards that should be GC'd, and map of referenced ancestor layers to preserve
    ///
    /// A tenant's ancestor shards are returned only when all of the following hold:
    /// - every shard at the highest shard count seen in storage was observed;
    /// - at least one shard with a different (lower) shard count was observed;
    /// - the storage controller reports the same latest shards (compared element-by-element, in order);
    /// - the controller reports no shard as currently splitting.
    ///
    /// A failed controller query is counted in `summary.controller_api_errors` and that tenant is skipped.
    async fn into_gc_ancestors(
        self,
        controller_client: &control_api::Client,
        summary: &mut GcSummary,
    ) -> (Vec<TenantShardId>, AncestorRefs) {
        let mut ancestors_to_gc = Vec::new();
        for (tenant_id, shard_indices) in self.shards_seen {
            // Find the highest shard count
            let latest_count = shard_indices
                .iter()
                .map(|i| i.shard_count)
                .max()
                .expect("Always at least one shard");

            // Move shards at the latest count to the front and everything else (ancestors) to the
            // back; `at` is the split point between the two groups.
            let mut shard_indices = shard_indices.iter().collect::<Vec<_>>();
            let (mut latest_shards, ancestor_shards) = {
                let at =
                    itertools::partition(&mut shard_indices, |i| i.shard_count == latest_count);
                (shard_indices[0..at].to_owned(), &shard_indices[at..])
            };
            // Sort shards, as we will later compare them with a sorted list from the controller
            // NOTE(review): relies on the controller returning `shards` in sorted order — confirm.
            latest_shards.sort();

            // Check that we have a complete view of the latest shard count: this should always be the case unless we happened
            // to scan the S3 bucket halfway through a shard split.
            if latest_shards.len() != latest_count.count() as usize {
                // This should be extremely rare, so we warn on it.
                tracing::warn!(%tenant_id, "Missed some shards at count {:?}: {latest_shards:?}", latest_count);
                continue;
            }

            // Check if we have any non-latest-count shards
            if ancestor_shards.is_empty() {
                tracing::debug!(%tenant_id, "No ancestor shards to clean up");
                continue;
            }

            // Based on S3 view, this tenant looks like it might have some ancestor shard work to do. We
            // must only do this work if the tenant is not currently being split: otherwise, it is not safe
            // to GC ancestors, because if the split fails then the controller will try to attach ancestor
            // shards again.
            match controller_client
                .dispatch::<(), TenantDescribeResponse>(
                    Method::GET,
                    format!("control/v1/tenant/{tenant_id}"),
                    None,
                )
                .await
            {
                Err(e) => {
                    // We were not able to learn the latest shard split state from the controller, so we will not
                    // do ancestor GC on this tenant.
                    tracing::warn!(%tenant_id, "Failed to query storage controller, will not do ancestor GC: {e}");
                    summary.controller_api_errors += 1;
                    continue;
                }
                Ok(desc) => {
                    // We expect to see that the latest shard count matches the one we saw in S3, and that none
                    // of the shards indicate splitting in progress.

                    let controller_indices: Vec<ShardIndex> = desc
                        .shards
                        .iter()
                        .map(|s| s.tenant_shard_id.to_index())
                        .collect();
                    if !controller_indices.iter().eq(latest_shards.iter().copied()) {
                        tracing::info!(%tenant_id, "Latest shards seen in S3 ({latest_shards:?}) don't match controller state ({controller_indices:?})");
                        continue;
                    }

                    if desc.shards.iter().any(|s| s.is_splitting) {
                        tracing::info!(%tenant_id, "One or more shards is currently splitting");
                        continue;
                    }

                    // This shouldn't be too noisy, because we only log this for tenants that have some ancestral refs.
                    tracing::info!(%tenant_id, "Validated state with controller: {desc:?}");
                }
            }

            // GC ancestor shards
            for ancestor_shard in ancestor_shards.iter().map(|idx| TenantShardId {
                tenant_id,
                shard_count: idx.shard_count,
                shard_number: idx.shard_number,
            }) {
                ancestors_to_gc.push(ancestor_shard);
            }
        }

        // The refs map is returned for every tenant, including ones skipped above.
        (ancestors_to_gc, self.ancestor_ref_shards)
    }
}
255 :
256 0 : fn is_old_enough(min_age: &Duration, key: &ListingObject, summary: &mut GcSummary) -> bool {
257 : // Validation: we will only GC indices & layers after a time threshold (e.g. one week) so that during an incident
258 : // it is easier to read old data for analysis, and easier to roll back shard splits without having to un-delete any objects.
259 0 : let age = match key.last_modified.elapsed() {
260 0 : Ok(e) => e,
261 : Err(_) => {
262 0 : tracing::warn!("Bad last_modified time: {:?}", key.last_modified);
263 0 : summary.remote_storage_errors += 1;
264 0 : return false;
265 : }
266 : };
267 0 : let old_enough = &age > min_age;
268 0 :
269 0 : if !old_enough {
270 0 : tracing::info!(
271 0 : "Skipping young object {} < {}",
272 0 : humantime::format_duration(age),
273 0 : humantime::format_duration(*min_age)
274 : );
275 0 : }
276 :
277 0 : old_enough
278 0 : }
279 :
280 : /// Same as [`is_old_enough`], but doesn't require a [`ListingObject`] passed to it.
281 0 : async fn check_is_old_enough(
282 0 : remote_client: &GenericRemoteStorage,
283 0 : key: &RemotePath,
284 0 : min_age: &Duration,
285 0 : summary: &mut GcSummary,
286 0 : ) -> Option<bool> {
287 0 : let listing_object = remote_client
288 0 : .head_object(key, &CancellationToken::new())
289 0 : .await
290 0 : .ok()?;
291 0 : Some(is_old_enough(min_age, &listing_object, summary))
292 0 : }
293 :
294 0 : async fn maybe_delete_index(
295 0 : remote_client: &GenericRemoteStorage,
296 0 : min_age: &Duration,
297 0 : latest_gen: Generation,
298 0 : obj: &ListingObject,
299 0 : mode: GcMode,
300 0 : summary: &mut GcSummary,
301 0 : ) {
302 0 : // Validation: we will only delete things that parse cleanly
303 0 : let basename = obj.key.get_path().file_name().unwrap();
304 0 : let candidate_generation =
305 0 : match parse_remote_index_path(RemotePath::from_string(basename).unwrap()) {
306 0 : Some(g) => g,
307 : None => {
308 0 : if basename == IndexPart::FILE_NAME {
309 : // A legacy pre-generation index
310 0 : Generation::none()
311 : } else {
312 : // A strange key: we will not delete this because we don't understand it.
313 0 : tracing::warn!("Bad index key");
314 0 : return;
315 : }
316 : }
317 : };
318 :
319 : // Validation: we will only delete indices more than one generation old, to avoid interfering
320 : // in typical migrations, even if they are very long running.
321 0 : if candidate_generation >= latest_gen {
322 : // This shouldn't happen: when we loaded metadata, it should have selected the latest
323 : // generation already, and only populated [`S3TimelineBlobData::unused_index_keys`]
324 : // with older generations.
325 0 : tracing::warn!("Deletion candidate is >= latest generation, this is a bug!");
326 0 : return;
327 0 : } else if candidate_generation.next() == latest_gen {
328 : // Skip deleting the latest-1th generation's index.
329 0 : return;
330 0 : }
331 0 :
332 0 : if !is_old_enough(min_age, obj, summary) {
333 0 : return;
334 0 : }
335 :
336 0 : if matches!(mode, GcMode::DryRun) {
337 0 : tracing::info!("Dry run: would delete this key");
338 0 : return;
339 0 : }
340 0 :
341 0 : // All validations passed: erase the object
342 0 : let cancel = CancellationToken::new();
343 0 : match backoff::retry(
344 0 : || remote_client.delete(&obj.key, &cancel),
345 0 : |_| false,
346 0 : 3,
347 0 : MAX_RETRIES as u32,
348 0 : "maybe_delete_index",
349 0 : &cancel,
350 0 : )
351 0 : .await
352 : {
353 : None => {
354 0 : unreachable!("Using a dummy cancellation token");
355 : }
356 : Some(Ok(_)) => {
357 0 : tracing::info!("Successfully deleted index");
358 0 : summary.indices_deleted += 1;
359 : }
360 0 : Some(Err(e)) => {
361 0 : tracing::warn!("Failed to delete index: {e}");
362 0 : summary.remote_storage_errors += 1;
363 : }
364 : }
365 0 : }
366 :
367 0 : async fn maybe_delete_tenant_manifest(
368 0 : remote_client: &GenericRemoteStorage,
369 0 : min_age: &Duration,
370 0 : latest_gen: Generation,
371 0 : obj: &ListingObject,
372 0 : mode: GcMode,
373 0 : summary: &mut GcSummary,
374 0 : ) {
375 0 : // Validation: we will only delete things that parse cleanly
376 0 : let basename = obj.key.get_path().file_name().unwrap();
377 0 : let Some(candidate_generation) =
378 0 : parse_remote_tenant_manifest_path(RemotePath::from_string(basename).unwrap())
379 : else {
380 : // A strange key: we will not delete this because we don't understand it.
381 0 : tracing::warn!("Bad index key");
382 0 : return;
383 : };
384 :
385 : // Validation: we will only delete manifests more than one generation old, and in fact we
386 : // should never be called with such recent generations.
387 0 : if candidate_generation >= latest_gen {
388 0 : tracing::warn!("Deletion candidate is >= latest generation, this is a bug!");
389 0 : return;
390 0 : } else if candidate_generation.next() == latest_gen {
391 0 : tracing::warn!("Deletion candidate is >= latest generation - 1, this is a bug!");
392 0 : return;
393 0 : }
394 0 :
395 0 : if !is_old_enough(min_age, obj, summary) {
396 0 : return;
397 0 : }
398 :
399 0 : if matches!(mode, GcMode::DryRun) {
400 0 : tracing::info!("Dry run: would delete this key");
401 0 : return;
402 0 : }
403 0 :
404 0 : // All validations passed: erase the object
405 0 : let cancel = CancellationToken::new();
406 0 : match backoff::retry(
407 0 : || remote_client.delete(&obj.key, &cancel),
408 0 : |_| false,
409 0 : 3,
410 0 : MAX_RETRIES as u32,
411 0 : "maybe_delete_tenant_manifest",
412 0 : &cancel,
413 0 : )
414 0 : .await
415 : {
416 : None => {
417 0 : unreachable!("Using a dummy cancellation token");
418 : }
419 : Some(Ok(_)) => {
420 0 : tracing::info!("Successfully deleted tenant manifest");
421 0 : summary.tenant_manifests_deleted += 1;
422 : }
423 0 : Some(Err(e)) => {
424 0 : tracing::warn!("Failed to delete tenant manifest: {e}");
425 0 : summary.remote_storage_errors += 1;
426 : }
427 : }
428 0 : }
429 :
430 : #[allow(clippy::too_many_arguments)]
431 0 : async fn gc_ancestor(
432 0 : remote_client: &GenericRemoteStorage,
433 0 : root_target: &RootTarget,
434 0 : min_age: &Duration,
435 0 : ancestor: TenantShardId,
436 0 : refs: &AncestorRefs,
437 0 : mode: GcMode,
438 0 : summary: &mut GcSummary,
439 0 : ) -> anyhow::Result<()> {
440 : // Scan timelines in the ancestor
441 0 : let timelines = stream_tenant_timelines(remote_client, root_target, ancestor).await?;
442 0 : let mut timelines = std::pin::pin!(timelines);
443 :
444 : // Build a list of keys to retain
445 :
446 0 : while let Some(ttid) = timelines.next().await {
447 0 : let ttid = ttid?;
448 :
449 0 : let data = list_timeline_blobs(remote_client, ttid, root_target).await?;
450 :
451 0 : let s3_layers = match data.blob_data {
452 : BlobDataParseResult::Parsed {
453 : index_part: _,
454 : index_part_generation: _,
455 0 : s3_layers,
456 0 : index_part_last_modified_time: _,
457 0 : index_part_snapshot_time: _,
458 0 : } => s3_layers,
459 : BlobDataParseResult::Relic => {
460 : // Post-deletion tenant location: don't try and GC it.
461 0 : continue;
462 : }
463 : BlobDataParseResult::Incorrect {
464 0 : errors,
465 0 : s3_layers: _, // TODO(yuchen): could still check references to these s3 layers?
466 0 : } => {
467 0 : // Our primary purpose isn't to report on bad data, but log this rather than skipping silently
468 0 : tracing::warn!(
469 0 : "Skipping ancestor GC for timeline {ttid}, bad metadata: {errors:?}"
470 : );
471 0 : continue;
472 : }
473 : };
474 :
475 0 : let ttid_refs = refs.get_ttid_refcounts(&ttid.as_tenant_timeline_id());
476 0 : let ancestor_shard_index = ttid.tenant_shard_id.to_index();
477 :
478 0 : for (layer_name, layer_gen) in s3_layers {
479 0 : let ref_count = ttid_refs
480 0 : .and_then(|m| m.get(&(ancestor_shard_index, layer_name.clone())))
481 0 : .copied()
482 0 : .unwrap_or(0);
483 0 :
484 0 : if ref_count > 0 {
485 0 : tracing::debug!(%ttid, "Ancestor layer {layer_name} has {ref_count} refs");
486 0 : continue;
487 0 : }
488 0 :
489 0 : tracing::info!(%ttid, "Ancestor layer {layer_name} is not referenced");
490 :
491 : // Build the key for the layer we are considering deleting
492 0 : let key = root_target.absolute_key(&remote_layer_path(
493 0 : &ttid.tenant_shard_id.tenant_id,
494 0 : &ttid.timeline_id,
495 0 : ancestor_shard_index,
496 0 : &layer_name,
497 0 : layer_gen,
498 0 : ));
499 0 :
500 0 : // We apply a time threshold to GCing objects that are un-referenced: this preserves our ability
501 0 : // to roll back a shard split if we have to, by avoiding deleting ancestor layers right away
502 0 : let path = RemotePath::from_string(key.strip_prefix("/").unwrap_or(&key)).unwrap();
503 0 : if check_is_old_enough(remote_client, &path, min_age, summary).await != Some(true) {
504 0 : continue;
505 0 : }
506 :
507 0 : if !matches!(mode, GcMode::Full) {
508 0 : tracing::info!("Dry run: would delete key {key}");
509 0 : continue;
510 0 : }
511 0 :
512 0 : // All validations passed: erase the object
513 0 : match remote_client.delete(&path, &CancellationToken::new()).await {
514 : Ok(_) => {
515 0 : tracing::info!("Successfully deleted unreferenced ancestor layer {key}");
516 0 : summary.ancestor_layers_deleted += 1;
517 : }
518 0 : Err(e) => {
519 0 : tracing::warn!("Failed to delete layer {key}: {e}");
520 0 : summary.remote_storage_errors += 1;
521 : }
522 : }
523 : }
524 :
525 : // TODO: if all the layers are gone, clean up the whole timeline dir (remove index)
526 : }
527 :
528 0 : Ok(())
529 0 : }
530 :
531 0 : async fn gc_tenant_manifests(
532 0 : remote_client: &GenericRemoteStorage,
533 0 : min_age: Duration,
534 0 : target: &RootTarget,
535 0 : mode: GcMode,
536 0 : tenant_shard_id: TenantShardId,
537 0 : ) -> anyhow::Result<(GcSummary, Option<RemoteTenantManifestInfo>)> {
538 0 : let mut gc_summary = GcSummary::default();
539 0 : match list_tenant_manifests(remote_client, tenant_shard_id, target).await? {
540 : ListTenantManifestResult::WithErrors {
541 0 : errors,
542 : unknown_keys: _,
543 : } => {
544 0 : for (_key, error) in errors {
545 0 : tracing::warn!(%tenant_shard_id, "list_tenant_manifests: {error}");
546 : }
547 0 : Ok((gc_summary, None))
548 : }
549 : ListTenantManifestResult::NoErrors {
550 0 : latest_generation,
551 0 : mut manifests,
552 : } => {
553 0 : let Some(latest_generation) = latest_generation else {
554 0 : return Ok((gc_summary, None));
555 : };
556 0 : manifests.sort_by_key(|(generation, _obj)| *generation);
557 0 : // skip the two latest generations (they don't neccessarily have to be 1 apart from each other)
558 0 : let candidates = manifests.iter().rev().skip(2);
559 0 : for (_generation, key) in candidates {
560 0 : maybe_delete_tenant_manifest(
561 0 : remote_client,
562 0 : &min_age,
563 0 : latest_generation.generation,
564 0 : key,
565 0 : mode,
566 0 : &mut gc_summary,
567 0 : )
568 0 : .instrument(
569 0 : info_span!("maybe_delete_tenant_manifest", %tenant_shard_id, ?latest_generation.generation, %key.key),
570 : )
571 0 : .await;
572 : }
573 0 : Ok((gc_summary, Some(latest_generation)))
574 : }
575 : }
576 0 : }
577 :
578 0 : async fn gc_timeline(
579 0 : remote_client: &GenericRemoteStorage,
580 0 : min_age: &Duration,
581 0 : target: &RootTarget,
582 0 : mode: GcMode,
583 0 : ttid: TenantShardTimelineId,
584 0 : accumulator: &std::sync::Mutex<TenantRefAccumulator>,
585 0 : tenant_manifest_info: Arc<Option<RemoteTenantManifestInfo>>,
586 0 : ) -> anyhow::Result<GcSummary> {
587 0 : let mut summary = GcSummary::default();
588 0 : let data = list_timeline_blobs(remote_client, ttid, target).await?;
589 :
590 0 : let (index_part, latest_gen, candidates) = match &data.blob_data {
591 : BlobDataParseResult::Parsed {
592 0 : index_part,
593 0 : index_part_generation,
594 0 : s3_layers: _,
595 0 : index_part_last_modified_time: _,
596 0 : index_part_snapshot_time: _,
597 0 : } => (index_part, *index_part_generation, data.unused_index_keys),
598 : BlobDataParseResult::Relic => {
599 : // Post-deletion tenant location: don't try and GC it.
600 0 : return Ok(summary);
601 : }
602 : BlobDataParseResult::Incorrect {
603 0 : errors,
604 0 : s3_layers: _,
605 0 : } => {
606 0 : // Our primary purpose isn't to report on bad data, but log this rather than skipping silently
607 0 : tracing::warn!("Skipping timeline {ttid}, bad metadata: {errors:?}");
608 0 : return Ok(summary);
609 : }
610 : };
611 :
612 0 : if let Some(tenant_manifest_info) = &*tenant_manifest_info {
613 : // TODO: this is O(n^2) in the number of offloaded timelines. Do a hashmap lookup instead.
614 0 : let maybe_offloaded = tenant_manifest_info
615 0 : .manifest
616 0 : .offloaded_timelines
617 0 : .iter()
618 0 : .find(|offloaded_timeline| offloaded_timeline.timeline_id == ttid.timeline_id);
619 0 : if let Some(offloaded) = maybe_offloaded {
620 0 : let warnings = validate_index_part_with_offloaded(index_part, offloaded);
621 0 : let warn = if warnings.is_empty() {
622 0 : false
623 : } else {
624 : // Verify that the manifest hasn't changed. If it has, a potential racing change could have been cause for our troubles.
625 0 : match list_tenant_manifests(remote_client, ttid.tenant_shard_id, target).await? {
626 : ListTenantManifestResult::WithErrors {
627 0 : errors,
628 : unknown_keys: _,
629 : } => {
630 0 : for (_key, error) in errors {
631 0 : tracing::warn!(%ttid, "list_tenant_manifests in gc_timeline: {error}");
632 : }
633 0 : true
634 : }
635 : ListTenantManifestResult::NoErrors {
636 0 : latest_generation,
637 : manifests: _,
638 : } => {
639 0 : if let Some(new_latest_gen) = latest_generation {
640 0 : let manifest_changed = (
641 0 : new_latest_gen.generation,
642 0 : new_latest_gen.listing_object.last_modified,
643 0 : ) == (
644 0 : tenant_manifest_info.generation,
645 0 : tenant_manifest_info.listing_object.last_modified,
646 0 : );
647 0 : if manifest_changed {
648 0 : tracing::debug!(%ttid, "tenant manifest changed since it was loaded, suppressing {} warnings", warnings.len());
649 0 : }
650 0 : manifest_changed
651 : } else {
652 : // The latest generation is gone. This timeline is in the progress of being deleted?
653 0 : false
654 : }
655 : }
656 : }
657 : };
658 0 : if warn {
659 0 : for warning in warnings {
660 0 : tracing::warn!(%ttid, "{}", warning);
661 : }
662 0 : }
663 0 : }
664 0 : }
665 :
666 0 : accumulator.lock().unwrap().update(ttid, index_part);
667 :
668 0 : for key in candidates {
669 0 : maybe_delete_index(remote_client, min_age, latest_gen, &key, mode, &mut summary)
670 0 : .instrument(info_span!("maybe_delete_index", %ttid, ?latest_gen, %key.key))
671 0 : .await;
672 : }
673 :
674 0 : Ok(summary)
675 0 : }
676 :
677 0 : fn validate_index_part_with_offloaded(
678 0 : index_part: &IndexPart,
679 0 : offloaded: &OffloadedTimelineManifest,
680 0 : ) -> Vec<String> {
681 0 : let mut warnings = Vec::new();
682 0 : if let Some(archived_at_index_part) = index_part.archived_at {
683 0 : if archived_at_index_part
684 0 : .signed_duration_since(offloaded.archived_at)
685 0 : .num_seconds()
686 0 : != 0
687 0 : {
688 0 : warnings.push(format!(
689 0 : "index-part archived_at={} differs from manifest archived_at={}",
690 0 : archived_at_index_part, offloaded.archived_at
691 0 : ));
692 0 : }
693 0 : } else {
694 0 : warnings.push("Timeline offloaded in manifest but not archived in index-part".to_string());
695 0 : }
696 0 : if index_part.metadata.ancestor_timeline() != offloaded.ancestor_timeline_id {
697 0 : warnings.push(format!(
698 0 : "index-part anestor={:?} differs from manifest ancestor={:?}",
699 0 : index_part.metadata.ancestor_timeline(),
700 0 : offloaded.ancestor_timeline_id
701 0 : ));
702 0 : }
703 0 : warnings
704 0 : }
705 :
706 : /// Physical garbage collection: removing unused S3 objects.
707 : ///
708 : /// This is distinct from the garbage collection done inside the pageserver, which operates at a higher level
709 : /// (keys, layers). This type of garbage collection is about removing:
710 : /// - Objects that were uploaded but never referenced in the remote index (e.g. because of a shutdown between
711 : /// uploading a layer and uploading an index)
712 : /// - Index objects and tenant manifests from historic generations
713 : ///
714 : /// This type of GC is not necessary for correctness: rather it serves to reduce wasted storage capacity, and
715 : /// make sure that object listings don't get slowed down by large numbers of garbage objects.
716 0 : pub async fn pageserver_physical_gc(
717 0 : bucket_config: &BucketConfig,
718 0 : controller_client: Option<&control_api::Client>,
719 0 : tenant_shard_ids: Vec<TenantShardId>,
720 0 : min_age: Duration,
721 0 : mode: GcMode,
722 0 : ) -> anyhow::Result<GcSummary> {
723 0 : let (remote_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
724 :
725 0 : let remote_client = Arc::new(remote_client);
726 0 : let tenants = if tenant_shard_ids.is_empty() {
727 0 : Either::Left(stream_tenants(&remote_client, &target))
728 : } else {
729 0 : Either::Right(futures::stream::iter(tenant_shard_ids.into_iter().map(Ok)))
730 : };
731 :
732 : // How many tenants to process in parallel. We need to be mindful of pageservers
733 : // accessing the same per tenant prefixes, so use a lower setting than pageservers.
734 : const CONCURRENCY: usize = 32;
735 :
736 : // Accumulate information about each tenant for cross-shard GC step we'll do at the end
737 0 : let accumulator = std::sync::Mutex::new(TenantRefAccumulator::default());
738 0 :
739 0 : // Accumulate information about how many manifests we have GCd
740 0 : let manifest_gc_summary = std::sync::Mutex::new(GcSummary::default());
741 0 :
742 0 : // Generate a stream of TenantTimelineId
743 0 : let timelines = tenants.map_ok(|tenant_shard_id| {
744 0 : let target_ref = ⌖
745 0 : let remote_client_ref = &remote_client;
746 0 : let manifest_gc_summary_ref = &manifest_gc_summary;
747 0 : async move {
748 0 : let gc_manifest_result = gc_tenant_manifests(
749 0 : remote_client_ref,
750 0 : min_age,
751 0 : target_ref,
752 0 : mode,
753 0 : tenant_shard_id,
754 0 : )
755 0 : .await;
756 0 : let (summary_from_manifest, tenant_manifest_opt) = match gc_manifest_result {
757 0 : Ok((gc_summary, tenant_manifest)) => (gc_summary, tenant_manifest),
758 0 : Err(e) => {
759 0 : tracing::warn!(%tenant_shard_id, "Error in gc_tenant_manifests: {e}");
760 0 : (GcSummary::default(), None)
761 : }
762 : };
763 0 : manifest_gc_summary_ref
764 0 : .lock()
765 0 : .unwrap()
766 0 : .merge(summary_from_manifest);
767 0 : let tenant_manifest_arc = Arc::new(tenant_manifest_opt);
768 0 : let mut timelines = Box::pin(
769 0 : stream_tenant_timelines(remote_client_ref, target_ref, tenant_shard_id).await?,
770 : );
771 0 : Ok(try_stream! {
772 0 : while let Some(ttid_res) = timelines.next().await {
773 0 : let ttid = ttid_res?;
774 0 : yield (ttid, tenant_manifest_arc.clone());
775 0 : }
776 0 : })
777 0 : }
778 0 : });
779 0 :
780 0 : let mut summary = GcSummary::default();
781 0 : {
782 0 : let timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY));
783 0 : let timelines = timelines.try_flatten();
784 0 :
785 0 : let timelines = timelines.map_ok(|(ttid, tenant_manifest_arc)| {
786 0 : gc_timeline(
787 0 : &remote_client,
788 0 : &min_age,
789 0 : &target,
790 0 : mode,
791 0 : ttid,
792 0 : &accumulator,
793 0 : tenant_manifest_arc,
794 0 : )
795 0 : });
796 0 : let mut timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY));
797 :
798 : // Drain futures for per-shard GC, populating accumulator as a side effect
799 0 : while let Some(i) = timelines.next().await {
800 0 : summary.merge(i?);
801 : }
802 : }
803 : // Streams are lazily evaluated, so only now do we have access to the inner object
804 0 : summary.merge(manifest_gc_summary.into_inner().unwrap());
805 :
806 : // Execute cross-shard GC, using the accumulator's full view of all the shards built in the per-shard GC
807 0 : let Some(client) = controller_client else {
808 0 : tracing::info!("Skipping ancestor layer GC, because no `--controller-api` was specified");
809 0 : return Ok(summary);
810 : };
811 :
812 0 : let (ancestor_shards, ancestor_refs) = accumulator
813 0 : .into_inner()
814 0 : .unwrap()
815 0 : .into_gc_ancestors(client, &mut summary)
816 0 : .await;
817 :
818 0 : for ancestor_shard in ancestor_shards {
819 0 : gc_ancestor(
820 0 : &remote_client,
821 0 : &target,
822 0 : &min_age,
823 0 : ancestor_shard,
824 0 : &ancestor_refs,
825 0 : mode,
826 0 : &mut summary,
827 0 : )
828 0 : .instrument(info_span!("gc_ancestor", %ancestor_shard))
829 0 : .await?;
830 : }
831 :
832 0 : Ok(summary)
833 0 : }
|