use std::time::{Duration, UNIX_EPOCH};

use crate::checks::{list_timeline_blobs, BlobDataParseResult};
use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId};
use aws_sdk_s3::Client;
use futures_util::{StreamExt, TryStreamExt};
use pageserver::tenant::remote_timeline_client::parse_remote_index_path;
use pageserver::tenant::IndexPart;
use pageserver_api::shard::TenantShardId;
use remote_storage::RemotePath;
use serde::Serialize;
use tracing::{info_span, Instrument};
use utils::generation::Generation;

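/// Counts of what a GC pass deleted and how many remote storage operations failed along the
/// way. Derives `Serialize` so callers can emit it as a report.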
#[derive(Serialize, Default)]
pub struct GcSummary {
    indices_deleted: usize,
    remote_storage_errors: usize,
}

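/// How much a GC pass is allowed to delete: `DryRun` only logs what would be deleted, while
/// `IndicesOnly` restricts deletion to old-generation index objects.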
#[derive(clap::ValueEnum, Debug, Clone, Copy)]
pub enum GcMode {
    // Delete nothing
    DryRun,

    // Enable only removing old-generation indices
    IndicesOnly,
    // Enable all forms of GC
    // TODO: this will be used when shard split ancestor layer deletion is added
    // All,
}
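
// These display strings match the kebab-case value names that `clap::ValueEnum` derives for
// the CLI.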
33 :
34 : impl std::fmt::Display for GcMode {
35 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36 0 : match self {
37 0 : GcMode::DryRun => write!(f, "dry-run"),
38 0 : GcMode::IndicesOnly => write!(f, "indices-only"),
39 : }
40 0 : }
41 : }
42 :
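/// Evaluate a single deletion candidate: `key` is an index object from a generation older than
/// `latest_gen`. It is deleted only if it parses cleanly, is more than one generation behind
/// `latest_gen`, and is older than `min_age`; `mode` decides whether we really delete, and the
/// outcome is tallied in `summary`.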
async fn maybe_delete_index(
    s3_client: &Client,
    bucket_config: &BucketConfig,
    min_age: &Duration,
    latest_gen: Generation,
    key: &str,
    mode: GcMode,
    summary: &mut GcSummary,
) {
    // Validation: we will only delete things that parse cleanly
    let basename = key.rsplit_once('/').unwrap().1;
    let candidate_generation =
        match parse_remote_index_path(RemotePath::from_string(basename).unwrap()) {
            Some(g) => g,
            None => {
                if basename == IndexPart::FILE_NAME {
                    // A legacy pre-generation index
                    Generation::none()
                } else {
                    // A strange key: we will not delete this because we don't understand it.
                    tracing::warn!("Bad index key");
                    return;
                }
            }
        };

    // Validation: we will only delete indices more than one generation old, to avoid interfering
    // with typical migrations, even if they are very long running.
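    // (For example, if latest_gen is 5, the generation-4 index is retained and only indices
    // from generations 3 and older are deletion candidates.)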
    if candidate_generation >= latest_gen {
        // This shouldn't happen: when we loaded metadata, it should have selected the latest
        // generation already, and only populated [`S3TimelineBlobData::unused_index_keys`]
        // with older generations.
        tracing::warn!("Deletion candidate is >= latest generation, this is a bug!");
        return;
    } else if candidate_generation.next() == latest_gen {
        // Skip deleting the index belonging to the generation immediately before the latest.
        return;
    }

    // Validation: we will only delete indices older than `min_age`, so that during incidents we
    // will have easy access to recent indices.
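    // (For example, with a two-week `min_age`, a superseded index uploaded three days ago is
    // left alone for now and will be picked up by a later pass.)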
    let age: Duration = match s3_client
        .head_object()
        .bucket(&bucket_config.bucket)
        .key(key)
        .send()
        .await
    {
        Ok(response) => match response.last_modified {
            None => {
                tracing::warn!("Missing last_modified");
                summary.remote_storage_errors += 1;
                return;
            }
            Some(last_modified) => {
                let last_modified =
                    UNIX_EPOCH + Duration::from_secs_f64(last_modified.as_secs_f64());
                match last_modified.elapsed() {
                    Ok(e) => e,
                    Err(_) => {
                        tracing::warn!("Bad last_modified time: {last_modified:?}");
                        return;
                    }
                }
            }
        },
        Err(e) => {
            tracing::warn!("Failed to HEAD {key}: {e}");
            summary.remote_storage_errors += 1;
            return;
        }
    };
    if &age < min_age {
        tracing::info!(
            "Skipping young object {} < {}",
            age.as_secs_f64(),
            min_age.as_secs_f64()
        );
        return;
    }

    if matches!(mode, GcMode::DryRun) {
        tracing::info!("Dry run: would delete this key");
        return;
    }

    // All validations passed: erase the object
    match s3_client
        .delete_object()
        .bucket(&bucket_config.bucket)
        .key(key)
        .send()
        .await
    {
        Ok(_) => {
            tracing::info!("Successfully deleted index");
            summary.indices_deleted += 1;
        }
        Err(e) => {
            tracing::warn!("Failed to delete index: {e}");
            summary.remote_storage_errors += 1;
        }
    }
}

/// Physical garbage collection: removing unused S3 objects.
///
/// This is distinct from the garbage collection done inside the pageserver, which operates at a
/// higher level (keys, layers). This type of garbage collection is about removing:
/// - Objects that were uploaded but never referenced in the remote index (e.g. because of a
///   shutdown between uploading a layer and uploading an index)
/// - Index objects from historic generations
///
/// This type of GC is not necessary for correctness: rather, it serves to reduce wasted storage
/// capacity, and to make sure that object listings don't get slowed down by large numbers of
/// garbage objects.
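///
/// A minimal usage sketch (hypothetical driver code: the two-week `min_age` and the JSON
/// reporting are illustrative assumptions, not part of this module):
/// ```ignore
/// // Dry-run GC over every tenant in the bucket, ignoring objects newer than two weeks:
/// let summary = pageserver_physical_gc(
///     bucket_config,
///     Vec::new(), // an empty tenant list means "scan all tenants"
///     Duration::from_secs(14 * 24 * 3600),
///     GcMode::DryRun,
/// )
/// .await?;
/// println!("{}", serde_json::to_string(&summary)?);
/// ```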
pub async fn pageserver_physical_gc(
    bucket_config: BucketConfig,
    tenant_ids: Vec<TenantShardId>,
    min_age: Duration,
    mode: GcMode,
) -> anyhow::Result<GcSummary> {
    let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver)?;

    let tenants = if tenant_ids.is_empty() {
        futures::future::Either::Left(stream_tenants(&s3_client, &target))
    } else {
        futures::future::Either::Right(futures::stream::iter(tenant_ids.into_iter().map(Ok)))
    };

    // How many tenants to process in parallel. We need to be mindful of pageservers
    // accessing the same per-tenant prefixes, so we use a lower concurrency than the
    // pageservers themselves do.
    const CONCURRENCY: usize = 32;

    // Generate a stream of TenantShardTimelineId
    let timelines = tenants.map_ok(|t| stream_tenant_timelines(&s3_client, &target, t));
    let timelines = timelines.try_buffered(CONCURRENCY);
    let timelines = timelines.try_flatten();
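    // (`try_buffered` drives up to CONCURRENCY of the per-tenant listing futures at once, and
    // `try_flatten` chains the resulting per-tenant timeline streams into one.)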

    // GC a single timeline: list its blobs and pass any unused index keys to
    // [`maybe_delete_index`].
    async fn gc_timeline(
        s3_client: &Client,
        bucket_config: &BucketConfig,
        min_age: &Duration,
        target: &RootTarget,
        mode: GcMode,
        ttid: TenantShardTimelineId,
    ) -> anyhow::Result<GcSummary> {
        let mut summary = GcSummary::default();
        let data = list_timeline_blobs(s3_client, ttid, target).await?;

        let (latest_gen, candidates) = match &data.blob_data {
            BlobDataParseResult::Parsed {
                index_part: _index_part,
                index_part_generation,
                s3_layers: _s3_layers,
            } => (*index_part_generation, data.unused_index_keys),
            BlobDataParseResult::Relic => {
                // Post-deletion tenant location: don't try to GC it.
                return Ok(summary);
            }
            BlobDataParseResult::Incorrect(reasons) => {
                // Our primary purpose isn't to report on bad data, but we log this rather than
                // skipping it silently.
                tracing::warn!("Skipping timeline {ttid}, bad metadata: {reasons:?}");
                return Ok(summary);
            }
        };

210 0 : maybe_delete_index(
211 0 : s3_client,
212 0 : bucket_config,
213 0 : min_age,
214 0 : latest_gen,
215 0 : &key,
216 0 : mode,
217 0 : &mut summary,
218 0 : )
219 0 : .instrument(info_span!("maybe_delete_index", %ttid, ?latest_gen, key))
220 0 : .await;
221 0 : }
222 0 :
223 0 : Ok(summary)
224 0 : }

    let timelines = timelines
        .map_ok(|ttid| gc_timeline(&s3_client, &bucket_config, &min_age, &target, mode, ttid));
    let mut timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY));

    let mut summary = GcSummary::default();

    while let Some(i) = timelines.next().await {
        let tl_summary = i?;

        summary.indices_deleted += tl_summary.indices_deleted;
        summary.remote_storage_errors += tl_summary.remote_storage_errors;
    }

    Ok(summary)
}