use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::sync::Arc;
use std::time::Duration;

use crate::checks::{list_timeline_blobs, BlobDataParseResult};
use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId, MAX_RETRIES};
use futures_util::{StreamExt, TryStreamExt};
use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
use pageserver::tenant::remote_timeline_client::{parse_remote_index_path, remote_layer_path};
use pageserver::tenant::storage_layer::LayerName;
use pageserver::tenant::IndexPart;
use pageserver_api::controller_api::TenantDescribeResponse;
use pageserver_api::shard::{ShardIndex, TenantShardId};
use remote_storage::{GenericRemoteStorage, ListingObject, RemotePath};
use reqwest::Method;
use serde::Serialize;
use storage_controller_client::control_api;
use tokio_util::sync::CancellationToken;
use tracing::{info_span, Instrument};
use utils::backoff;
use utils::generation::Generation;
use utils::id::{TenantId, TenantTimelineId};

#[derive(Serialize, Default)]
pub struct GcSummary {
    indices_deleted: usize,
    remote_storage_errors: usize,
    controller_api_errors: usize,
    ancestor_layers_deleted: usize,
}

impl GcSummary {
    fn merge(&mut self, other: Self) {
        let Self {
            indices_deleted,
            remote_storage_errors,
            ancestor_layers_deleted,
            controller_api_errors,
        } = other;

        self.indices_deleted += indices_deleted;
        self.remote_storage_errors += remote_storage_errors;
        self.ancestor_layers_deleted += ancestor_layers_deleted;
        self.controller_api_errors += controller_api_errors;
    }
}

#[derive(clap::ValueEnum, Debug, Clone, Copy)]
pub enum GcMode {
    // Delete nothing
    DryRun,

    // Enable only removing old-generation indices
    IndicesOnly,

    // Enable all forms of GC
    Full,
}

impl std::fmt::Display for GcMode {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            GcMode::DryRun => write!(f, "dry-run"),
            GcMode::IndicesOnly => write!(f, "indices-only"),
            GcMode::Full => write!(f, "full"),
        }
    }
}
mod refs {
    use super::*;
    // Map of cross-shard layer references, giving a refcount for each layer in each shard that is referenced by some other
    // shard in the same tenant. This is sparse! The vast majority of timelines will have no cross-shard refs, and those that
    // do have cross-shard refs should eventually drop most of them via compaction.
    //
    // In our inner map type, the TTID in the key is shard-agnostic, and the ShardIndex in the value refers to the _ancestor
    // which is referenced_.
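    //
    // For example, if two descendant shards of timeline `T` both still reference layer `L`
    // in ancestor shard 0 of a two-shard split, the map holds a single entry:
    //   T -> { (ShardIndex { shard_number: 0, shard_count: 2 }, L) -> 2 }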
    #[derive(Default)]
    pub(super) struct AncestorRefs(
        BTreeMap<TenantTimelineId, HashMap<(ShardIndex, LayerName), usize>>,
    );

    impl AncestorRefs {
        /// Insert references for layers discovered in a particular shard-timeline that refer to an ancestral shard-timeline.
        pub(super) fn update(
            &mut self,
            ttid: TenantShardTimelineId,
            layers: Vec<(LayerName, LayerFileMetadata)>,
        ) {
            let ttid_refs = self.0.entry(ttid.as_tenant_timeline_id()).or_default();
            for (layer_name, layer_metadata) in layers {
                // Increment refcount of this layer in the ancestor shard
                *(ttid_refs
                    .entry((layer_metadata.shard, layer_name))
                    .or_default()) += 1;
            }
        }

        /// For a particular TTID, return the map of all ancestor layers referenced by a descendant to their refcount
        ///
        /// The `ShardIndex` in the result's key is the index of the _ancestor_, not the descendant.
        pub(super) fn get_ttid_refcounts(
            &self,
            ttid: &TenantTimelineId,
        ) -> Option<&HashMap<(ShardIndex, LayerName), usize>> {
            self.0.get(ttid)
        }
    }
}

use refs::AncestorRefs;

// As we see shards for a tenant, accumulate knowledge needed for cross-shard GC:
// - Are there any ancestor shards?
// - Are there any refs to ancestor shards' layers?
#[derive(Default)]
struct TenantRefAccumulator {
    shards_seen: HashMap<TenantId, BTreeSet<ShardIndex>>,

    // For each shard that has refs to an ancestor's layers, the set of ancestor layers referred to
    ancestor_ref_shards: AncestorRefs,
}

impl TenantRefAccumulator {
    fn update(&mut self, ttid: TenantShardTimelineId, index_part: &IndexPart) {
        let this_shard_idx = ttid.tenant_shard_id.to_index();
        (*self
            .shards_seen
            .entry(ttid.tenant_shard_id.tenant_id)
            .or_default())
        .insert(this_shard_idx);

        let mut ancestor_refs = Vec::new();
        for (layer_name, layer_metadata) in &index_part.layer_metadata {
            if layer_metadata.shard != this_shard_idx {
                // This is a reference from this shard to a layer in an ancestor shard: we must track this
                // as a marker to not GC this layer from the parent.
                ancestor_refs.push((layer_name.clone(), layer_metadata.clone()));
            }
        }

        if !ancestor_refs.is_empty() {
            tracing::info!(%ttid, "Found {} ancestor refs", ancestor_refs.len());
            self.ancestor_ref_shards.update(ttid, ancestor_refs);
        }
    }

    /// Consume Self and return a vector of ancestor tenant shards that should be GC'd, and a map of referenced ancestor layers to preserve
    async fn into_gc_ancestors(
        self,
        controller_client: &control_api::Client,
        summary: &mut GcSummary,
    ) -> (Vec<TenantShardId>, AncestorRefs) {
        let mut ancestors_to_gc = Vec::new();
        for (tenant_id, shard_indices) in self.shards_seen {
            // Find the highest shard count
            let latest_count = shard_indices
                .iter()
                .map(|i| i.shard_count)
                .max()
                .expect("Always at least one shard");

            let mut shard_indices = shard_indices.iter().collect::<Vec<_>>();
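            // `itertools::partition` reorders the slice in place so that entries matching the
            // predicate (shards at the latest count) come first, and returns the split point;
            // everything after it belongs to ancestor (pre-split) shard counts.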
            let (mut latest_shards, ancestor_shards) = {
                let at =
                    itertools::partition(&mut shard_indices, |i| i.shard_count == latest_count);
                (shard_indices[0..at].to_owned(), &shard_indices[at..])
            };
            // Sort shards, as we will later compare them with a sorted list from the controller
            latest_shards.sort();

            // Check that we have a complete view of the latest shard count: this should always be the case unless we happened
            // to scan the S3 bucket halfway through a shard split.
            if latest_shards.len() != latest_count.count() as usize {
                // This should be extremely rare, so we warn on it.
                tracing::warn!(%tenant_id, "Missed some shards at count {:?}: {latest_shards:?}", latest_count);
                continue;
            }

            // Check if we have any non-latest-count shards
            if ancestor_shards.is_empty() {
                tracing::debug!(%tenant_id, "No ancestor shards to clean up");
                continue;
            }

            // Based on S3 view, this tenant looks like it might have some ancestor shard work to do. We
            // must only do this work if the tenant is not currently being split: otherwise, it is not safe
            // to GC ancestors, because if the split fails then the controller will try to attach ancestor
            // shards again.
            match controller_client
                .dispatch::<(), TenantDescribeResponse>(
                    Method::GET,
                    format!("control/v1/tenant/{tenant_id}"),
                    None,
                )
                .await
            {
                Err(e) => {
                    // We were not able to learn the latest shard split state from the controller, so we will not
                    // do ancestor GC on this tenant.
                    tracing::warn!(%tenant_id, "Failed to query storage controller, will not do ancestor GC: {e}");
                    summary.controller_api_errors += 1;
                    continue;
                }
                Ok(desc) => {
                    // We expect to see that the latest shard count matches the one we saw in S3, and that none
                    // of the shards indicate splitting in progress.

                    let controller_indices: Vec<ShardIndex> = desc
                        .shards
                        .iter()
                        .map(|s| s.tenant_shard_id.to_index())
                        .collect();
                    if !controller_indices.iter().eq(latest_shards.iter().copied()) {
                        tracing::info!(%tenant_id, "Latest shards seen in S3 ({latest_shards:?}) don't match controller state ({controller_indices:?})");
                        continue;
                    }

                    if desc.shards.iter().any(|s| s.is_splitting) {
                        tracing::info!(%tenant_id, "One or more shards is currently splitting");
                        continue;
                    }

                    // This shouldn't be too noisy, because we only log this for tenants that have some ancestral refs.
                    tracing::info!(%tenant_id, "Validated state with controller: {desc:?}");
                }
            }

            // GC ancestor shards
            for ancestor_shard in ancestor_shards.iter().map(|idx| TenantShardId {
                tenant_id,
                shard_count: idx.shard_count,
                shard_number: idx.shard_number,
            }) {
                ancestors_to_gc.push(ancestor_shard);
            }
        }

        (ancestors_to_gc, self.ancestor_ref_shards)
    }
}

fn is_old_enough(min_age: &Duration, key: &ListingObject, summary: &mut GcSummary) -> bool {
    // Validation: we will only GC indices & layers after a time threshold (e.g. one week) so that during an incident
    // it is easier to read old data for analysis, and easier to roll back shard splits without having to un-delete any objects.
    let age = match key.last_modified.elapsed() {
        Ok(e) => e,
        Err(_) => {
            tracing::warn!("Bad last_modified time: {:?}", key.last_modified);
            summary.remote_storage_errors += 1;
            return false;
        }
    };
    let old_enough = &age > min_age;

    if !old_enough {
        tracing::info!(
            "Skipping young object {} < {}",
            humantime::format_duration(age),
            humantime::format_duration(*min_age)
        );
    }

    old_enough
}

/// Same as [`is_old_enough`], but doesn't require a [`ListingObject`] to be passed to it.
async fn check_is_old_enough(
    remote_client: &GenericRemoteStorage,
    key: &RemotePath,
    min_age: &Duration,
    summary: &mut GcSummary,
) -> Option<bool> {
    let listing_object = remote_client
        .head_object(key, &CancellationToken::new())
        .await
        .ok()?;
    Some(is_old_enough(min_age, &listing_object, summary))
}

async fn maybe_delete_index(
    remote_client: &GenericRemoteStorage,
    min_age: &Duration,
    latest_gen: Generation,
    obj: &ListingObject,
    mode: GcMode,
    summary: &mut GcSummary,
) {
    // Validation: we will only delete things that parse cleanly
    let basename = obj.key.get_path().file_name().unwrap();
    let candidate_generation =
        match parse_remote_index_path(RemotePath::from_string(basename).unwrap()) {
            Some(g) => g,
            None => {
                if basename == IndexPart::FILE_NAME {
                    // A legacy pre-generation index
                    Generation::none()
                } else {
                    // A strange key: we will not delete this because we don't understand it.
                    tracing::warn!("Bad index key");
                    return;
                }
            }
        };

    // Validation: we will only delete indices more than one generation old, to avoid interfering
    // in typical migrations, even if they are very long running.
    if candidate_generation >= latest_gen {
        // This shouldn't happen: when we loaded metadata, it should have selected the latest
        // generation already, and only populated [`S3TimelineBlobData::unused_index_keys`]
        // with older generations.
        tracing::warn!("Deletion candidate is >= latest generation, this is a bug!");
        return;
    } else if candidate_generation.next() == latest_gen {
        // Skip deleting the index from the generation immediately before the latest.
        return;
    }

    if !is_old_enough(min_age, obj, summary) {
        return;
    }

    if matches!(mode, GcMode::DryRun) {
        tracing::info!("Dry run: would delete this key");
        return;
    }

    // All validations passed: erase the object
    let cancel = CancellationToken::new();
332 0 : || remote_client.delete(&obj.key, &cancel),
333 0 : |_| false,
334 0 : 3,
335 0 : MAX_RETRIES as u32,
336 0 : "maybe_delete_index",
337 0 : &cancel,
338 0 : )
339 0 : .await
340 : {
341 : None => {
342 0 : unreachable!("Using a dummy cancellation token");
343 : }
344 : Some(Ok(_)) => {
345 0 : tracing::info!("Successfully deleted index");
346 0 : summary.indices_deleted += 1;
347 : }
348 0 : Some(Err(e)) => {
349 0 : tracing::warn!("Failed to delete index: {e}");
350 0 : summary.remote_storage_errors += 1;
351 : }
352 : }
353 0 : }
354 :
355 : #[allow(clippy::too_many_arguments)]
356 0 : async fn gc_ancestor(
357 0 : remote_client: &GenericRemoteStorage,
358 0 : root_target: &RootTarget,
359 0 : min_age: &Duration,
360 0 : ancestor: TenantShardId,
361 0 : refs: &AncestorRefs,
362 0 : mode: GcMode,
363 0 : summary: &mut GcSummary,
364 0 : ) -> anyhow::Result<()> {
365 : // Scan timelines in the ancestor
366 0 : let timelines = stream_tenant_timelines(remote_client, root_target, ancestor).await?;
367 0 : let mut timelines = std::pin::pin!(timelines);
368 :
369 : // Build a list of keys to retain
370 :
371 0 : while let Some(ttid) = timelines.next().await {
372 0 : let ttid = ttid?;
373 :
374 0 : let data = list_timeline_blobs(remote_client, ttid, root_target).await?;
375 :
376 0 : let s3_layers = match data.blob_data {
377 : BlobDataParseResult::Parsed {
378 : index_part: _,
379 : index_part_generation: _,
380 0 : s3_layers,
381 0 : } => s3_layers,
382 : BlobDataParseResult::Relic => {
383 : // Post-deletion tenant location: don't try and GC it.
384 0 : continue;
385 : }
386 : BlobDataParseResult::Incorrect {
387 0 : errors,
388 0 : s3_layers: _, // TODO(yuchen): could still check references to these s3 layers?
389 0 : } => {
390 0 : // Our primary purpose isn't to report on bad data, but log this rather than skipping silently
391 0 : tracing::warn!(
392 0 : "Skipping ancestor GC for timeline {ttid}, bad metadata: {errors:?}"
393 : );
394 0 : continue;
395 : }
396 : };
397 :
398 0 : let ttid_refs = refs.get_ttid_refcounts(&ttid.as_tenant_timeline_id());
399 0 : let ancestor_shard_index = ttid.tenant_shard_id.to_index();
400 :
401 0 : for (layer_name, layer_gen) in s3_layers {
402 0 : let ref_count = ttid_refs
403 0 : .and_then(|m| m.get(&(ancestor_shard_index, layer_name.clone())))
404 0 : .copied()
405 0 : .unwrap_or(0);
406 0 :
407 0 : if ref_count > 0 {
408 0 : tracing::debug!(%ttid, "Ancestor layer {layer_name} has {ref_count} refs");
409 0 : continue;
410 0 : }
411 0 :
412 0 : tracing::info!(%ttid, "Ancestor layer {layer_name} is not referenced");
413 :
414 : // Build the key for the layer we are considering deleting
415 0 : let key = root_target.absolute_key(&remote_layer_path(
416 0 : &ttid.tenant_shard_id.tenant_id,
417 0 : &ttid.timeline_id,
418 0 : ancestor_shard_index,
419 0 : &layer_name,
420 0 : layer_gen,
421 0 : ));
422 0 :
423 0 : // We apply a time threshold to GCing objects that are un-referenced: this preserves our ability
424 0 : // to roll back a shard split if we have to, by avoiding deleting ancestor layers right away
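            // `absolute_key` may yield a key with a leading '/', which `RemotePath` does not
            // accept, so strip it before constructing the path.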
            let path = RemotePath::from_string(key.strip_prefix("/").unwrap_or(&key)).unwrap();
            if check_is_old_enough(remote_client, &path, min_age, summary).await != Some(true) {
                continue;
            }

            if !matches!(mode, GcMode::Full) {
                tracing::info!("Dry run: would delete key {key}");
                continue;
            }

            // All validations passed: erase the object
            match remote_client.delete(&path, &CancellationToken::new()).await {
                Ok(_) => {
                    tracing::info!("Successfully deleted unreferenced ancestor layer {key}");
                    summary.ancestor_layers_deleted += 1;
                }
                Err(e) => {
                    tracing::warn!("Failed to delete layer {key}: {e}");
                    summary.remote_storage_errors += 1;
                }
            }
        }

        // TODO: if all the layers are gone, clean up the whole timeline dir (remove index)
    }

    Ok(())
}

/// Physical garbage collection: removing unused S3 objects.
///
/// This is distinct from the garbage collection done inside the pageserver, which operates at a higher level
/// (keys, layers). This type of garbage collection is about removing:
/// - Objects that were uploaded but never referenced in the remote index (e.g. because of a shutdown between
///   uploading a layer and uploading an index)
/// - Index objects from historic generations
///
/// This type of GC is not necessary for correctness: rather it serves to reduce wasted storage capacity, and
/// make sure that object listings don't get slowed down by large numbers of garbage objects.
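///
/// A minimal invocation sketch (not taken from this crate's tests; the surrounding binary setup
/// and the use of `serde_json` for printing the summary are assumptions):
///
/// ```ignore
/// async fn run_gc(
///     bucket_config: &BucketConfig,
///     controller: Option<&control_api::Client>,
/// ) -> anyhow::Result<()> {
///     let summary = pageserver_physical_gc(
///         bucket_config,
///         controller,                          // None skips cross-shard ancestor GC
///         Vec::new(),                          // empty: scan every tenant in the bucket
///         Duration::from_secs(7 * 24 * 3600),  // only touch objects older than a week
///         GcMode::IndicesOnly,
///     )
///     .await?;
///     println!("{}", serde_json::to_string(&summary)?);
///     Ok(())
/// }
/// ```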
pub async fn pageserver_physical_gc(
    bucket_config: &BucketConfig,
    controller_client: Option<&control_api::Client>,
    tenant_shard_ids: Vec<TenantShardId>,
    min_age: Duration,
    mode: GcMode,
) -> anyhow::Result<GcSummary> {
    let (remote_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;

    let tenants = if tenant_shard_ids.is_empty() {
        futures::future::Either::Left(stream_tenants(&remote_client, &target))
    } else {
        futures::future::Either::Right(futures::stream::iter(tenant_shard_ids.into_iter().map(Ok)))
    };

    // How many tenants to process in parallel. We need to be mindful of pageservers
    // accessing the same per-tenant prefixes, so use a lower setting than the pageservers themselves.
    const CONCURRENCY: usize = 32;

    // Accumulate information about each tenant for the cross-shard GC step we'll do at the end
    let accumulator = Arc::new(std::sync::Mutex::new(TenantRefAccumulator::default()));

    // Generate a stream of TenantTimelineId
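    // `try_buffered` runs up to CONCURRENCY per-tenant timeline listings concurrently, and
    // `try_flatten` merges the resulting per-tenant streams into a single stream of TTIDs.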
    let timelines = tenants.map_ok(|t| stream_tenant_timelines(&remote_client, &target, t));
    let timelines = timelines.try_buffered(CONCURRENCY);
    let timelines = timelines.try_flatten();

    // Generate a stream of S3TimelineBlobData
    async fn gc_timeline(
        remote_client: &GenericRemoteStorage,
        min_age: &Duration,
        target: &RootTarget,
        mode: GcMode,
        ttid: TenantShardTimelineId,
        accumulator: &Arc<std::sync::Mutex<TenantRefAccumulator>>,
    ) -> anyhow::Result<GcSummary> {
        let mut summary = GcSummary::default();
        let data = list_timeline_blobs(remote_client, ttid, target).await?;

        let (index_part, latest_gen, candidates) = match &data.blob_data {
            BlobDataParseResult::Parsed {
                index_part,
                index_part_generation,
                s3_layers: _s3_layers,
            } => (index_part, *index_part_generation, data.unused_index_keys),
            BlobDataParseResult::Relic => {
                // Post-deletion tenant location: don't try and GC it.
                return Ok(summary);
            }
            BlobDataParseResult::Incorrect {
                errors,
                s3_layers: _,
            } => {
                // Our primary purpose isn't to report on bad data, but log this rather than skipping silently
                tracing::warn!("Skipping timeline {ttid}, bad metadata: {errors:?}");
                return Ok(summary);
            }
        };

        accumulator.lock().unwrap().update(ttid, index_part);

        for key in candidates {
            maybe_delete_index(remote_client, min_age, latest_gen, &key, mode, &mut summary)
                .instrument(info_span!("maybe_delete_index", %ttid, ?latest_gen, %key.key))
                .await;
        }

        Ok(summary)
    }

    let mut summary = GcSummary::default();

    // Drain futures for per-shard GC, populating accumulator as a side effect
    {
        let timelines = timelines.map_ok(|ttid| {
            gc_timeline(&remote_client, &min_age, &target, mode, ttid, &accumulator)
        });
        let mut timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY));

        while let Some(i) = timelines.next().await {
            summary.merge(i?);
        }
    }

    // Execute cross-shard GC, using the accumulator's full view of all the shards built in the per-shard GC
    let Some(client) = controller_client else {
        tracing::info!("Skipping ancestor layer GC, because no `--controller-api` was specified");
        return Ok(summary);
    };

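    // The per-shard GC futures above have all completed, so no other clones of the Arc remain:
    // unwrap both the Arc and the Mutex to take ownership of the accumulated state.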
    let (ancestor_shards, ancestor_refs) = Arc::into_inner(accumulator)
        .unwrap()
        .into_inner()
        .unwrap()
        .into_gc_ancestors(client, &mut summary)
        .await;

    for ancestor_shard in ancestor_shards {
        gc_ancestor(
            &remote_client,
            &target,
            &min_age,
            ancestor_shard,
            &ancestor_refs,
            mode,
            &mut summary,
        )
        .instrument(info_span!("gc_ancestor", %ancestor_shard))
        .await?;
    }

    Ok(summary)
}