use std::collections::{HashMap, HashSet};

use anyhow::Context;
use aws_sdk_s3::{types::ObjectIdentifier, Client};
use pageserver::tenant::remote_timeline_client::index::IndexLayerMetadata;
use pageserver_api::shard::ShardIndex;
use tracing::{error, info, warn};
use utils::generation::Generation;
use utils::id::TimelineId;

use crate::cloud_admin_api::BranchData;
use crate::metadata_stream::stream_listing;
use crate::{download_object_with_retries, RootTarget, TenantShardTimelineId};
use futures_util::{pin_mut, StreamExt};
use pageserver::tenant::remote_timeline_client::parse_remote_index_path;
use pageserver::tenant::storage_layer::LayerFileName;
use pageserver::tenant::IndexPart;
use remote_storage::RemotePath;

pub(crate) struct TimelineAnalysis {
    /// Anomalies detected
    pub(crate) errors: Vec<String>,

    /// Healthy-but-noteworthy, e.g. old-version structures that are still readable, but
    /// worth reporting so that we know we must not remove support for decoding that old
    /// version yet.
    pub(crate) warnings: Vec<String>,

    /// Keys not referenced in metadata: candidates for removal, but NOT NECESSARILY; beware
    /// of races between reading the metadata and reading the objects.
    pub(crate) garbage_keys: Vec<String>,
}

impl TimelineAnalysis {
    fn new() -> Self {
        Self {
            errors: Vec::new(),
            warnings: Vec::new(),
            garbage_keys: Vec::new(),
        }
    }
}

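/// Run the per-timeline consistency checks: compare the timeline against console branch
/// data when it is available, validate the `index_part.json` contents (known version,
/// matching `disk_consistent_lsn`, non-zero layer sizes), and verify via
/// [`TenantObjectListing::check_ref`] that every layer referenced by the index is present
/// in remote storage. All anomalies are accumulated into the returned [`TimelineAnalysis`].
///
/// A minimal call-site sketch (illustrative only; `s3_client`, `s3_root`, `ttid` and
/// `tenant_objects` are assumed to exist already):
///
/// ```ignore
/// let s3_data = list_timeline_blobs(&s3_client, ttid, &s3_root).await?;
/// let analysis =
///     branch_cleanup_and_check_errors(&ttid, &mut tenant_objects, None, None, Some(s3_data));
/// if !analysis.errors.is_empty() {
///     println!("timeline {ttid} has errors: {:?}", analysis.errors);
/// }
/// ```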
pub(crate) fn branch_cleanup_and_check_errors(
    id: &TenantShardTimelineId,
    tenant_objects: &mut TenantObjectListing,
    s3_active_branch: Option<&BranchData>,
    console_branch: Option<BranchData>,
    s3_data: Option<S3TimelineBlobData>,
) -> TimelineAnalysis {
    let mut result = TimelineAnalysis::new();

    info!("Checking timeline {id}");

    if let Some(s3_active_branch) = s3_active_branch {
        info!(
            "Checking console status for timeline for branch {:?}/{:?}",
            s3_active_branch.project_id, s3_active_branch.id
        );
        match console_branch {
            Some(_) => result.errors.push(format!(
                "Timeline has deleted branch data in the console (id = {:?}, project_id = {:?}), recheck whether it got removed during the check",
                s3_active_branch.id, s3_active_branch.project_id
            )),
            None => result.errors.push(format!(
                "Timeline has no branch data in the console (id = {:?}, project_id = {:?}), recheck whether it got removed during the check",
                s3_active_branch.id, s3_active_branch.project_id
            )),
        };
    }

    match s3_data {
        Some(s3_data) => {
            result.garbage_keys.extend(s3_data.keys_to_remove);

            match s3_data.blob_data {
                BlobDataParseResult::Parsed {
                    index_part,
                    index_part_generation: _index_part_generation,
                    s3_layers: _s3_layers,
                } => {
                    if !IndexPart::KNOWN_VERSIONS.contains(&index_part.get_version()) {
                        result.errors.push(format!(
                            "Unknown index_part.json version: {}",
                            index_part.get_version()
                        ))
                    }

                    if &index_part.get_version() != IndexPart::KNOWN_VERSIONS.last().unwrap() {
                        result.warnings.push(format!(
                            "index_part.json version is not latest: {}",
                            index_part.get_version()
                        ))
                    }

                    if index_part.metadata.disk_consistent_lsn()
                        != index_part.get_disk_consistent_lsn()
                    {
                        result.errors.push(format!(
                            "Mismatching disk_consistent_lsn in TimelineMetadata ({}) and in the index_part ({})",
                            index_part.metadata.disk_consistent_lsn(),
                            index_part.get_disk_consistent_lsn(),
                        ))
                    }

                    if index_part.layer_metadata.is_empty() {
                        // not an error, can happen for branches with zero writes, but notice that
                        info!("index_part.json has no layers");
                    }

                    for (layer, metadata) in index_part.layer_metadata {
                        if metadata.file_size == 0 {
                            result.errors.push(format!(
                                "index_part.json contains a layer {} that has 0 size in its layer metadata", layer.file_name(),
                            ))
                        }

                        if !tenant_objects.check_ref(id.timeline_id, &layer, &metadata) {
                            // FIXME: this will emit false positives if an index was
                            // uploaded concurrently with our scan. To make this check
                            // correct, we need to try sending a HEAD request for the
                            // layer we think is missing.
                            result.errors.push(format!(
                                "index_part.json contains a layer {}{} (shard {}) that is not present in remote storage",
                                layer.file_name(),
                                metadata.generation.get_suffix(),
                                metadata.shard
                            ))
                        }
                    }
                }
                BlobDataParseResult::Relic => {}
                BlobDataParseResult::Incorrect(parse_errors) => result.errors.extend(
                    parse_errors
                        .into_iter()
                        .map(|error| format!("parse error: {error}")),
                ),
            }
        }
        None => result
            .errors
            .push("Timeline has no data on S3 at all".to_string()),
    }

    if result.errors.is_empty() {
        info!("No check errors found");
    } else {
        warn!("Timeline metadata errors: {0:?}", result.errors);
    }

    if !result.warnings.is_empty() {
        warn!("Timeline metadata warnings: {0:?}", result.warnings);
    }

    if !result.garbage_keys.is_empty() {
        error!(
            "The following keys should be removed from S3: {0:?}",
            result.garbage_keys
        )
    }

    result
}

#[derive(Default)]
pub(crate) struct LayerRef {
    ref_count: usize,
}

/// Top-level index of objects in a tenant. This may be used by any shard-timeline within
/// the tenant to query whether an object exists.
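///
/// A minimal usage sketch (illustrative, not from the original module); `ttid`,
/// `listed_layers` and `index_part` are assumed to come from an S3 listing and a
/// downloaded index:
///
/// ```ignore
/// let mut listing = TenantObjectListing::default();
/// // 1. Record every layer object found under a shard-timeline prefix.
/// listing.push(ttid, listed_layers);
/// // 2. Mark each layer that a timeline's index_part references.
/// for (layer, metadata) in &index_part.layer_metadata {
///     listing.check_ref(ttid.timeline_id, layer, metadata);
/// }
/// // 3. Layers that were never referenced are orphan candidates.
/// let orphans = listing.get_orphans();
/// ```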
#[derive(Default)]
pub(crate) struct TenantObjectListing {
    shard_timelines:
        HashMap<(ShardIndex, TimelineId), HashMap<(LayerFileName, Generation), LayerRef>>,
}

impl TenantObjectListing {
    /// Having done an S3 listing of the keys within a timeline prefix, merge them into the overall
    /// list of layer keys for the Tenant.
    pub(crate) fn push(
        &mut self,
        ttid: TenantShardTimelineId,
        layers: HashSet<(LayerFileName, Generation)>,
    ) {
        let shard_index = ShardIndex::new(
            ttid.tenant_shard_id.shard_number,
            ttid.tenant_shard_id.shard_count,
        );
        let replaced = self.shard_timelines.insert(
            (shard_index, ttid.timeline_id),
            layers
                .into_iter()
                .map(|l| (l, LayerRef::default()))
                .collect(),
        );

        assert!(
            replaced.is_none(),
            "Built from an S3 object listing, which should never repeat a key"
        );
    }

    /// Having loaded a timeline index, check if a layer referenced by the index exists. If it does,
    /// the layer's refcount will be incremented. Later, after calling this for all references in all indices
    /// in a tenant, orphan layers may be detected by their zero refcounts.
    ///
    /// Returns true if the layer exists
    pub(crate) fn check_ref(
        &mut self,
        timeline_id: TimelineId,
        layer_file: &LayerFileName,
        metadata: &IndexLayerMetadata,
    ) -> bool {
        let Some(shard_tl) = self.shard_timelines.get_mut(&(metadata.shard, timeline_id)) else {
            return false;
        };

        let Some(layer_ref) = shard_tl.get_mut(&(layer_file.clone(), metadata.generation)) else {
            return false;
        };

        layer_ref.ref_count += 1;

        true
    }

    pub(crate) fn get_orphans(&self) -> Vec<(ShardIndex, TimelineId, LayerFileName, Generation)> {
        let mut result = Vec::new();
        for ((shard_index, timeline_id), layers) in &self.shard_timelines {
            for ((layer_file, generation), layer_ref) in layers {
                if layer_ref.ref_count == 0 {
                    result.push((*shard_index, *timeline_id, layer_file.clone(), *generation))
                }
            }
        }

        result
    }
}

#[derive(Debug)]
pub(crate) struct S3TimelineBlobData {
    pub(crate) blob_data: BlobDataParseResult,
    pub(crate) keys_to_remove: Vec<String>,
}

#[derive(Debug)]
pub(crate) enum BlobDataParseResult {
    Parsed {
        index_part: IndexPart,
        index_part_generation: Generation,
        s3_layers: HashSet<(LayerFileName, Generation)>,
    },
    /// The remains of a deleted Timeline (i.e. an initdb archive only)
    Relic,
    Incorrect(Vec<String>),
}

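/// Split an S3 object name into a [`LayerFileName`] and the [`Generation`] it was written
/// in: if the name ends with `-` followed by an 8-character generation suffix, the suffix
/// is parsed as the generation; otherwise the whole name is parsed as a layer file name
/// and the generation is [`Generation::none`].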
fn parse_layer_object_name(name: &str) -> Result<(LayerFileName, Generation), String> {
    match name.rsplit_once('-') {
        // FIXME: this is gross, just use a regex?
        Some((layer_filename, gen)) if gen.len() == 8 => {
            let layer = layer_filename.parse::<LayerFileName>()?;
            let gen =
                Generation::parse_suffix(gen).ok_or("Malformed generation suffix".to_string())?;
            Ok((layer, gen))
        }
        _ => Ok((name.parse::<LayerFileName>()?, Generation::none())),
    }
}

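/// List every object under the timeline's S3 prefix and classify it: layer files (with
/// their parsed generations), `index_part.json` candidates (the one with the highest
/// generation is downloaded and parsed), the `initdb.tar.zst` archive, and unrecognized
/// keys, which are reported as errors and as candidates for removal.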
pub(crate) async fn list_timeline_blobs(
    s3_client: &Client,
    id: TenantShardTimelineId,
    s3_root: &RootTarget,
) -> anyhow::Result<S3TimelineBlobData> {
    let mut s3_layers = HashSet::new();

    let mut errors = Vec::new();
    let mut keys_to_remove = Vec::new();

    let mut timeline_dir_target = s3_root.timeline_root(&id);
    timeline_dir_target.delimiter = String::new();

    let mut index_parts: Vec<ObjectIdentifier> = Vec::new();
    let mut initdb_archive: bool = false;

    let stream = stream_listing(s3_client, &timeline_dir_target);
    pin_mut!(stream);
    while let Some(obj) = stream.next().await {
        let obj = obj?;
        let key = obj.key();

        let blob_name = key.strip_prefix(&timeline_dir_target.prefix_in_bucket);
        match blob_name {
            Some(name) if name.starts_with("index_part.json") => {
                tracing::info!("Index key {key}");
                index_parts.push(obj)
            }
            Some("initdb.tar.zst") => {
                tracing::info!("initdb archive {key}");
                initdb_archive = true;
            }
            Some(maybe_layer_name) => match parse_layer_object_name(maybe_layer_name) {
                Ok((new_layer, gen)) => {
                    tracing::info!("Parsed layer key: {} {:?}", new_layer, gen);
                    s3_layers.insert((new_layer, gen));
                }
                Err(e) => {
                    tracing::info!("Error parsing key {maybe_layer_name}");
                    errors.push(
                        format!("S3 list response got an object with key {key} that is not a layer name: {e}"),
                    );
                    keys_to_remove.push(key.to_string());
                }
            },
            None => {
                tracing::info!("Peculiar key {}", key);
                errors.push(format!("S3 list response got an object with odd key {key}"));
                keys_to_remove.push(key.to_string());
            }
        }
    }

    if index_parts.is_empty() && s3_layers.is_empty() && initdb_archive {
        tracing::info!(
            "Timeline is empty apart from initdb archive: expected post-deletion state."
        );
        return Ok(S3TimelineBlobData {
            blob_data: BlobDataParseResult::Relic,
            keys_to_remove: Vec::new(),
        });
    }

    // Choose the index_part with the highest generation
    let (index_part_object, index_part_generation) = match index_parts
        .iter()
        .filter_map(|k| {
            let key = k.key();
            // Stripping the index key to the last part, because RemotePath doesn't
            // like absolute paths, and depending on prefix_in_bucket it's possible
            // for the keys we read back to start with a slash.
            let basename = key.rsplit_once('/').unwrap().1;
            parse_remote_index_path(RemotePath::from_string(basename).unwrap()).map(|g| (k, g))
        })
        .max_by_key(|i| i.1)
        .map(|(k, g)| (k.clone(), g))
    {
        Some((key, gen)) => (Some(key), gen),
        None => {
            // Legacy/missing case: one or zero index parts, which did not have a generation
            (index_parts.pop(), Generation::none())
        }
    };

    if index_part_object.is_none() {
        errors.push("S3 list response got no index_part.json file".to_string());
    }

    if let Some(index_part_object_key) = index_part_object.as_ref().map(|object| object.key()) {
        let index_part_bytes = download_object_with_retries(
            s3_client,
            &timeline_dir_target.bucket_name,
            index_part_object_key,
        )
        .await
        .context("index_part.json download")?;

        match serde_json::from_slice(&index_part_bytes) {
            Ok(index_part) => {
                return Ok(S3TimelineBlobData {
                    blob_data: BlobDataParseResult::Parsed {
                        index_part,
                        index_part_generation,
                        s3_layers,
                    },
                    keys_to_remove,
                })
            }
            Err(index_parse_error) => errors.push(format!(
                "index_part.json body parsing error: {index_parse_error}"
            )),
        }
    } else {
        errors.push(format!(
            "Index part object {index_part_object:?} has no key"
        ));
    }

    if errors.is_empty() {
        errors.push(
            "Unexpected: no errors were reported, but the index_part.json was not successfully parsed".to_string(),
        );
    }

    Ok(S3TimelineBlobData {
        blob_data: BlobDataParseResult::Incorrect(errors),
        keys_to_remove,
    })
}