use std::collections::{hash_map, HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use aws_sdk_s3::Client;
use tokio::task::JoinSet;
use tracing::{error, info, info_span, warn, Instrument};

use crate::cloud_admin_api::{BranchData, CloudAdminApiClient, ProjectId};
use crate::delete_batch_producer::DeleteProducerStats;
use crate::{download_object_with_retries, list_objects_with_retries, RootTarget, MAX_RETRIES};
use pageserver::tenant::storage_layer::LayerFileName;
use pageserver::tenant::IndexPart;
use utils::id::TenantTimelineId;

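/// Cross-checks the tenants and timelines that the delete batch producer reported as active:
/// lists each timeline's blobs in S3, fetches the project's branches from the admin API, and
/// runs per-timeline checks, aggregating the results into [`BranchCheckStats`].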
pub async fn validate_pageserver_active_tenant_and_timelines(
    s3_client: Arc<Client>,
    s3_root: RootTarget,
    admin_client: Arc<CloudAdminApiClient>,
    batch_producer_stats: DeleteProducerStats,
) -> anyhow::Result<BranchCheckStats> {
    let Some(timeline_stats) = batch_producer_stats.timeline_stats else {
        info!("No tenant-only checks, exiting");
        return Ok(BranchCheckStats::default());
    };

    let s3_active_projects = batch_producer_stats
        .tenant_stats
        .active_entries
        .into_iter()
        .map(|project| (project.id.clone(), project))
        .collect::<HashMap<_, _>>();
    info!("Validating {} active tenants", s3_active_projects.len());

    let mut s3_active_branches_per_project = HashMap::<ProjectId, Vec<BranchData>>::new();
    let mut s3_blob_data = HashMap::<TenantTimelineId, S3TimelineBlobData>::new();
    for active_branch in timeline_stats.active_entries {
        let active_project_id = active_branch.project_id.clone();
        let active_branch_id = active_branch.id.clone();
        let active_timeline_id = active_branch.timeline_id;

        s3_active_branches_per_project
            .entry(active_project_id.clone())
            .or_default()
            .push(active_branch);

        let Some(active_project) = s3_active_projects.get(&active_project_id) else {
            error!(
                "Branch {:?} references project {:?}, which is not among the active projects",
                active_branch_id, active_project_id
            );
            continue;
        };

        let id = TenantTimelineId::new(active_project.tenant, active_timeline_id);
        s3_blob_data.insert(
            id,
            list_timeline_blobs(&s3_client, id, &s3_root)
                .await
                .with_context(|| format!("List timeline {id} blobs"))?,
        );
    }

    let mut branch_checks = JoinSet::new();
    for (_, s3_active_project) in s3_active_projects {
        let project_id = &s3_active_project.id;
        let tenant_id = s3_active_project.tenant;

        let mut console_active_branches =
            branches_for_project_with_retries(&admin_client, project_id)
                .await
                .with_context(|| {
                    format!("Client API branches for project {project_id:?} retrieval")
                })?
                .into_iter()
                .map(|branch| (branch.id.clone(), branch))
                .collect::<HashMap<_, _>>();

        let active_branches = s3_active_branches_per_project
            .remove(project_id)
            .unwrap_or_default();
        info!(
            "Spawning tasks for {} active timelines of tenant {}",
            active_branches.len(),
            tenant_id
        );
        for s3_active_branch in active_branches {
            let console_branch = console_active_branches.remove(&s3_active_branch.id);
            let timeline_id = s3_active_branch.timeline_id;
            let id = TenantTimelineId::new(tenant_id, timeline_id);
            let s3_data = s3_blob_data.remove(&id);
            let s3_root = s3_root.clone();
            branch_checks.spawn(
                async move {
                    let check_errors = branch_cleanup_and_check_errors(
                        &id,
                        &s3_root,
                        Some(&s3_active_branch),
                        console_branch,
                        s3_data,
                    )
                    .await;
                    (id, check_errors)
                }
                .instrument(info_span!("check_timeline", id = %id)),
            );
        }
    }

    let mut total_stats = BranchCheckStats::default();
    while let Some((id, analysis)) = branch_checks
        .join_next()
        .await
        .transpose()
        .context("branch check task join")?
    {
        total_stats.add(id, analysis.errors);
    }
    Ok(total_stats)
}

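/// Fetches the branch list for a project from the admin API, retrying up to
/// [`MAX_RETRIES`] times with a one-second pause between failed attempts.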
async fn branches_for_project_with_retries(
    admin_client: &CloudAdminApiClient,
    project_id: &ProjectId,
) -> anyhow::Result<Vec<BranchData>> {
    for _ in 0..MAX_RETRIES {
        match admin_client.branches_for_project(project_id, false).await {
            Ok(branches) => return Ok(branches),
            Err(e) => {
                error!("admin list branches for project {project_id:?} query failed: {e}");
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
        }
    }

    anyhow::bail!("Failed to list branches for project {project_id:?} {MAX_RETRIES} times")
}

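/// Aggregated outcome of all per-timeline checks: timelines that produced errors
/// and timelines that passed cleanly.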
#[derive(Debug, Default)]
pub struct BranchCheckStats {
    pub timelines_with_errors: HashMap<TenantTimelineId, Vec<String>>,
    pub normal_timelines: HashSet<TenantTimelineId>,
}

impl BranchCheckStats {
    pub fn add(&mut self, id: TenantTimelineId, check_errors: Vec<String>) {
        if check_errors.is_empty() {
            if !self.normal_timelines.insert(id) {
                panic!("Checking branch with timeline {id} more than once")
            }
        } else {
            match self.timelines_with_errors.entry(id) {
                hash_map::Entry::Occupied(_) => {
                    panic!("Checking branch with timeline {id} more than once")
                }
                hash_map::Entry::Vacant(v) => {
                    v.insert(check_errors);
                }
            }
        }
    }
}

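/// Result of checking a single timeline.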
pub struct TimelineAnalysis {
    /// Anomalies detected
    pub errors: Vec<String>,

    /// Healthy-but-noteworthy, like old-versioned structures that are readable but
    /// worth reporting for awareness that we must not remove that old version decoding
    /// yet.
    pub warnings: Vec<String>,

    /// Keys not referenced in metadata: candidates for removal
    pub garbage_keys: Vec<String>,
}

impl TimelineAnalysis {
    fn new() -> Self {
        Self {
            errors: Vec::new(),
            warnings: Vec::new(),
            garbage_keys: Vec::new(),
        }
    }
}

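/// Checks one timeline: reports console branch-data mismatches and validates the parsed
/// `index_part.json` against the layer files found in S3, collecting errors, warnings,
/// and garbage keys into a [`TimelineAnalysis`].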
pub async fn branch_cleanup_and_check_errors(
    id: &TenantTimelineId,
    s3_root: &RootTarget,
    s3_active_branch: Option<&BranchData>,
    console_branch: Option<BranchData>,
    s3_data: Option<S3TimelineBlobData>,
) -> TimelineAnalysis {
    let mut result = TimelineAnalysis::new();

    info!("Checking timeline {id}");

    if let Some(s3_active_branch) = s3_active_branch {
        info!(
            "Checking console status for timeline for branch {:?}/{:?}",
            s3_active_branch.project_id, s3_active_branch.id
        );
        match console_branch {
            Some(_) => {result.errors.push(format!("Timeline has deleted branch data in the console (id = {:?}, project_id = {:?}), recheck whether it got removed during the check",
                s3_active_branch.id, s3_active_branch.project_id))
            },
            None => {
                result.errors.push(format!("Timeline has no branch data in the console (id = {:?}, project_id = {:?}), recheck whether it got removed during the check",
                    s3_active_branch.id, s3_active_branch.project_id))
            }
        };
    }

    match s3_data {
        Some(s3_data) => {
            result.garbage_keys.extend(s3_data.keys_to_remove);

            match s3_data.blob_data {
                BlobDataParseResult::Parsed {
                    index_part,
                    mut s3_layers,
                } => {
                    if !IndexPart::KNOWN_VERSIONS.contains(&index_part.get_version()) {
                        result.errors.push(format!(
                            "index_part.json version: {}",
                            index_part.get_version()
                        ))
                    }

                    if &index_part.get_version() != IndexPart::KNOWN_VERSIONS.last().unwrap() {
                        result.warnings.push(format!(
                            "index_part.json version is not latest: {}",
                            index_part.get_version()
                        ))
                    }

                    if index_part.metadata.disk_consistent_lsn()
                        != index_part.get_disk_consistent_lsn()
                    {
                        result.errors.push(format!(
                            "Mismatching disk_consistent_lsn in TimelineMetadata ({}) and in the index_part ({})",
                            index_part.metadata.disk_consistent_lsn(),
                            index_part.get_disk_consistent_lsn(),
                        ))
                    }

                    if index_part.layer_metadata.is_empty() {
                        // Not an error: can happen for branches with zero writes, but log it for visibility.
                        info!("index_part.json has no layers");
                    }

                    for (layer, metadata) in index_part.layer_metadata {
                        if metadata.file_size == 0 {
                            result.errors.push(format!(
                                "index_part.json contains a layer {} that has 0 size in its layer metadata", layer.file_name(),
                            ))
                        }

                        if !s3_layers.remove(&layer) {
                            result.errors.push(format!(
                                "index_part.json contains a layer {} that is not present in S3",
                                layer.file_name(),
                            ))
                        }
                    }

                    if !s3_layers.is_empty() {
                        result.errors.push(format!(
                            "index_part.json does not contain layers from S3: {:?}",
                            s3_layers
                                .iter()
                                .map(|layer_name| layer_name.file_name())
                                .collect::<Vec<_>>(),
                        ));
                        result
                            .garbage_keys
                            .extend(s3_layers.iter().map(|layer_name| {
                                let mut key = s3_root.timeline_root(id).prefix_in_bucket;
                                let delimiter = s3_root.delimiter();
                                if !key.ends_with(delimiter) {
                                    key.push_str(delimiter);
                                }
                                key.push_str(&layer_name.file_name());
                                key
                            }));
                    }
                }
                BlobDataParseResult::Incorrect(parse_errors) => result.errors.extend(
                    parse_errors
                        .into_iter()
                        .map(|error| format!("parse error: {error}")),
                ),
            }
        }
        None => result
            .errors
            .push("Timeline has no data on S3 at all".to_string()),
    }

    if result.errors.is_empty() {
        info!("No check errors found");
    } else {
        warn!("Timeline metadata errors: {0:?}", result.errors);
    }

    if !result.warnings.is_empty() {
        warn!("Timeline metadata warnings: {0:?}", result.warnings);
    }

    if !result.garbage_keys.is_empty() {
        error!(
            "The following keys should be removed from S3: {0:?}",
            result.garbage_keys
        )
    }

    result
}

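/// Everything found under a timeline's S3 prefix: the parse result for its
/// `index_part.json` and layer set, plus keys that look like candidates for removal.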
#[derive(Debug)]
pub struct S3TimelineBlobData {
    pub blob_data: BlobDataParseResult,
    pub keys_to_remove: Vec<String>,
}

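/// Whether the timeline's S3 contents could be parsed into an index and a set of layer names.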
#[derive(Debug)]
pub enum BlobDataParseResult {
    Parsed {
        index_part: IndexPart,
        s3_layers: HashSet<LayerFileName>,
    },
    Incorrect(Vec<String>),
}

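/// Lists all objects under the timeline's S3 prefix, splitting them into layer files and
/// `index_part.json`, then downloads and parses the index. Keys that cannot be recognized
/// are reported as errors and collected as removal candidates.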
pub async fn list_timeline_blobs(
    s3_client: &Client,
    id: TenantTimelineId,
    s3_root: &RootTarget,
) -> anyhow::Result<S3TimelineBlobData> {
    let mut s3_layers = HashSet::new();
    let mut index_part_object = None;

    let timeline_dir_target = s3_root.timeline_root(&id);
    let mut continuation_token = None;

    let mut errors = Vec::new();
    let mut keys_to_remove = Vec::new();

    loop {
        let fetch_response =
            list_objects_with_retries(s3_client, &timeline_dir_target, continuation_token.clone())
                .await?;

        let subdirectories = fetch_response.common_prefixes().unwrap_or_default();
        if !subdirectories.is_empty() {
            errors.push(format!(
                "S3 list response should not contain any subdirectories, but got {subdirectories:?}"
            ));
        }

        for (object, key) in fetch_response
            .contents()
            .unwrap_or_default()
            .iter()
            .filter_map(|object| Some((object, object.key()?)))
        {
            let blob_name = key.strip_prefix(&timeline_dir_target.prefix_in_bucket);
            match blob_name {
                Some("index_part.json") => index_part_object = Some(object.clone()),
                Some(maybe_layer_name) => match maybe_layer_name.parse::<LayerFileName>() {
                    Ok(new_layer) => {
                        s3_layers.insert(new_layer);
                    }
                    Err(e) => {
                        errors.push(
                            format!("S3 list response got an object with key {key} that is not a layer name: {e}"),
                        );
                        keys_to_remove.push(key.to_string());
                    }
                },
                None => {
                    errors.push(format!("S3 list response got an object with odd key {key}"));
                    keys_to_remove.push(key.to_string());
                }
            }
        }

        match fetch_response.next_continuation_token {
            Some(new_token) => continuation_token = Some(new_token),
            None => break,
        }
    }

    if index_part_object.is_none() {
        errors.push("S3 list response got no index_part.json file".to_string());
    }

    if let Some(index_part_object_key) = index_part_object.as_ref().and_then(|object| object.key())
    {
        let index_part_bytes = download_object_with_retries(
            s3_client,
            &timeline_dir_target.bucket_name,
            index_part_object_key,
        )
        .await
        .context("index_part.json download")?;

        match serde_json::from_slice(&index_part_bytes) {
            Ok(index_part) => {
                return Ok(S3TimelineBlobData {
                    blob_data: BlobDataParseResult::Parsed {
                        index_part,
                        s3_layers,
                    },
                    keys_to_remove,
                })
            }
            Err(index_parse_error) => errors.push(format!(
                "index_part.json body parsing error: {index_parse_error}"
            )),
        }
    } else {
        errors.push(format!(
            "Index part object {index_part_object:?} has no key"
        ));
    }

    if errors.is_empty() {
        errors.push(
            "Unexpected: no errors were recorded, yet the blob data was not successfully parsed"
                .to_string(),
        );
    }

    Ok(S3TimelineBlobData {
        blob_data: BlobDataParseResult::Incorrect(errors),
        keys_to_remove,
    })
}