use std::collections::{hash_map, HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use aws_sdk_s3::Client;
use tokio::task::JoinSet;
use tracing::{error, info, info_span, warn, Instrument};

use crate::cloud_admin_api::{BranchData, CloudAdminApiClient, ProjectId};
use crate::delete_batch_producer::DeleteProducerStats;
use crate::{download_object_with_retries, list_objects_with_retries, RootTarget, MAX_RETRIES};
use pageserver::tenant::storage_layer::LayerFileName;
use pageserver::tenant::IndexPart;
use utils::id::TenantTimelineId;

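/// Cross-check the active tenants and timelines reported by the delete batch producer:
/// list each active timeline's blobs in S3, fetch the matching branches from the cloud
/// admin API, and run [`branch_cleanup_and_check_errors`] for every active timeline in
/// parallel, aggregating the results into [`BranchCheckStats`].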
pub async fn validate_pageserver_active_tenant_and_timelines(
    s3_client: Arc<Client>,
    s3_root: RootTarget,
    admin_client: Arc<CloudAdminApiClient>,
    batch_producer_stats: DeleteProducerStats,
) -> anyhow::Result<BranchCheckStats> {
    let Some(timeline_stats) = batch_producer_stats.timeline_stats else {
        info!("No tenant-only checks, exiting");
        return Ok(BranchCheckStats::default());
    };

    let s3_active_projects = batch_producer_stats
        .tenant_stats
        .active_entries
        .into_iter()
        .map(|project| (project.id.clone(), project))
        .collect::<HashMap<_, _>>();
    info!("Validating {} active tenants", s3_active_projects.len());

    let mut s3_active_branches_per_project = HashMap::<ProjectId, Vec<BranchData>>::new();
    let mut s3_blob_data = HashMap::<TenantTimelineId, S3TimelineBlobData>::new();
    for active_branch in timeline_stats.active_entries {
        let active_project_id = active_branch.project_id.clone();
        let active_branch_id = active_branch.id.clone();
        let active_timeline_id = active_branch.timeline_id;

        s3_active_branches_per_project
            .entry(active_project_id.clone())
            .or_default()
            .push(active_branch);

        let Some(active_project) = s3_active_projects.get(&active_project_id) else {
            error!(
                "Branch {:?} for project {:?} has no such project in the active projects",
                active_branch_id, active_project_id
            );
            continue;
        };

        let id = TenantTimelineId::new(active_project.tenant, active_timeline_id);
        s3_blob_data.insert(
            id,
            list_timeline_blobs(&s3_client, id, &s3_root)
                .await
                .with_context(|| format!("List timeline {id} blobs"))?,
        );
    }

    let mut branch_checks = JoinSet::new();
    for (_, s3_active_project) in s3_active_projects {
        let project_id = &s3_active_project.id;
        let tenant_id = s3_active_project.tenant;

        let mut console_active_branches =
            branches_for_project_with_retries(&admin_client, project_id)
                .await
                .with_context(|| {
                    format!("Client API branches for project {project_id:?} retrieval")
                })?
                .into_iter()
                .map(|branch| (branch.id.clone(), branch))
                .collect::<HashMap<_, _>>();

        let active_branches = s3_active_branches_per_project
            .remove(project_id)
            .unwrap_or_default();
        info!(
            "Spawning tasks for {} tenant {} active timelines",
            active_branches.len(),
            tenant_id
        );
        for s3_active_branch in active_branches {
            let console_branch = console_active_branches.remove(&s3_active_branch.id);
            let timeline_id = s3_active_branch.timeline_id;
            let id = TenantTimelineId::new(tenant_id, timeline_id);
            let s3_data = s3_blob_data.remove(&id);
            let s3_root = s3_root.clone();
            branch_checks.spawn(
                async move {
                    let check_errors = branch_cleanup_and_check_errors(
                        &id,
                        &s3_root,
                        Some(&s3_active_branch),
                        console_branch,
                        s3_data,
                    )
                    .await;
                    (id, check_errors)
                }
                .instrument(info_span!("check_timeline", id = %id)),
            );
        }
    }

    let mut total_stats = BranchCheckStats::default();
    while let Some((id, analysis)) = branch_checks
        .join_next()
        .await
        .transpose()
        .context("branch check task join")?
    {
        total_stats.add(id, analysis.errors);
    }
    Ok(total_stats)
}

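/// Fetch the project's branches from the admin API, retrying up to `MAX_RETRIES` times
/// with a one-second pause between attempts before giving up.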
async fn branches_for_project_with_retries(
    admin_client: &CloudAdminApiClient,
    project_id: &ProjectId,
) -> anyhow::Result<Vec<BranchData>> {
    for _ in 0..MAX_RETRIES {
        match admin_client.branches_for_project(project_id, false).await {
            Ok(branches) => return Ok(branches),
            Err(e) => {
                error!("admin list branches for project {project_id:?} query failed: {e}");
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
        }
    }

    anyhow::bail!("Failed to list branches for project {project_id:?} {MAX_RETRIES} times")
}

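/// Aggregated outcome of the per-timeline checks: timelines whose checks produced
/// errors (with those errors), and timelines that passed cleanly. Adding the same
/// timeline twice is a bug and panics.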
#[derive(Debug, Default)]
pub struct BranchCheckStats {
    pub timelines_with_errors: HashMap<TenantTimelineId, Vec<String>>,
    pub normal_timelines: HashSet<TenantTimelineId>,
}

impl BranchCheckStats {
    pub fn add(&mut self, id: TenantTimelineId, check_errors: Vec<String>) {
        if check_errors.is_empty() {
            if !self.normal_timelines.insert(id) {
                panic!("Checking branch with timeline {id} more than once")
            }
        } else {
            match self.timelines_with_errors.entry(id) {
                hash_map::Entry::Occupied(_) => {
                    panic!("Checking branch with timeline {id} more than once")
                }
                hash_map::Entry::Vacant(v) => {
                    v.insert(check_errors);
                }
            }
        }
    }
}

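/// Result of checking a single timeline's S3 contents against its console metadata.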
pub struct TimelineAnalysis {
    /// Anomalies detected
    pub errors: Vec<String>,

    /// Healthy, but noteworthy: e.g. old-version structures that are still readable,
    /// reported so that we know not to remove the decoding for those old versions yet.
    pub warnings: Vec<String>,

    /// Keys not referenced in metadata: candidates for removal
    pub garbage_keys: Vec<String>,
}

impl TimelineAnalysis {
    fn new() -> Self {
        Self {
            errors: Vec::new(),
            warnings: Vec::new(),
            garbage_keys: Vec::new(),
        }
    }
}

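/// Check a single timeline: compare its console branch data with what is present in S3,
/// validate the `index_part.json` contents against the layer files actually found under
/// the timeline prefix, and collect anomalies, warnings, and orphaned keys into a
/// [`TimelineAnalysis`].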
pub async fn branch_cleanup_and_check_errors(
    id: &TenantTimelineId,
    s3_root: &RootTarget,
    s3_active_branch: Option<&BranchData>,
    console_branch: Option<BranchData>,
    s3_data: Option<S3TimelineBlobData>,
) -> TimelineAnalysis {
    let mut result = TimelineAnalysis::new();

    info!("Checking timeline {id}");

    if let Some(s3_active_branch) = s3_active_branch {
        info!(
            "Checking console status for timeline for branch {:?}/{:?}",
            s3_active_branch.project_id, s3_active_branch.id
        );
        match console_branch {
            Some(_) => {
                result.errors.push(format!("Timeline has deleted branch data in the console (id = {:?}, project_id = {:?}), recheck whether it got removed during the check",
                    s3_active_branch.id, s3_active_branch.project_id))
            }
            None => {
                result.errors.push(format!("Timeline has no branch data in the console (id = {:?}, project_id = {:?}), recheck whether it got removed during the check",
                    s3_active_branch.id, s3_active_branch.project_id))
            }
        };
    }

    match s3_data {
        Some(s3_data) => {
            result.garbage_keys.extend(s3_data.keys_to_remove);

            match s3_data.blob_data {
                BlobDataParseResult::Parsed {
                    index_part,
                    mut s3_layers,
                } => {
                    if !IndexPart::KNOWN_VERSIONS.contains(&index_part.get_version()) {
                        result.errors.push(format!(
                            "Unknown index_part.json version: {}",
                            index_part.get_version()
                        ))
                    }

                    if &index_part.get_version() != IndexPart::KNOWN_VERSIONS.last().unwrap() {
                        result.warnings.push(format!(
                            "index_part.json version is not latest: {}",
                            index_part.get_version()
                        ))
                    }

                    if index_part.metadata.disk_consistent_lsn()
                        != index_part.get_disk_consistent_lsn()
                    {
                        result.errors.push(format!(
                            "Mismatching disk_consistent_lsn in TimelineMetadata ({}) and in the index_part ({})",
                            index_part.metadata.disk_consistent_lsn(),
                            index_part.get_disk_consistent_lsn(),
                        ))
                    }

                    if index_part.layer_metadata.is_empty() {
                        // not an error, can happen for branches with zero writes, but notice that
                        info!("index_part.json has no layers");
                    }

                    for (layer, metadata) in index_part.layer_metadata {
                        if metadata.file_size == 0 {
                            result.errors.push(format!(
                                "index_part.json contains a layer {} that has 0 size in its layer metadata", layer.file_name(),
                            ))
                        }

                        if !s3_layers.remove(&layer) {
                            result.errors.push(format!(
                                "index_part.json contains a layer {} that is not present in S3",
                                layer.file_name(),
                            ))
                        }
                    }

                    if !s3_layers.is_empty() {
                        result.errors.push(format!(
                            "index_part.json does not contain layers from S3: {:?}",
                            s3_layers
                                .iter()
                                .map(|layer_name| layer_name.file_name())
                                .collect::<Vec<_>>(),
                        ));
                        result
                            .garbage_keys
                            .extend(s3_layers.iter().map(|layer_name| {
                                let mut key = s3_root.timeline_root(id).prefix_in_bucket;
                                let delimiter = s3_root.delimiter();
                                if !key.ends_with(delimiter) {
                                    key.push_str(delimiter);
                                }
                                key.push_str(&layer_name.file_name());
                                key
                            }));
                    }
                }
                BlobDataParseResult::Incorrect(parse_errors) => result.errors.extend(
                    parse_errors
                        .into_iter()
                        .map(|error| format!("parse error: {error}")),
                ),
            }
        }
        None => result
            .errors
            .push("Timeline has no data on S3 at all".to_string()),
    }

    if result.errors.is_empty() {
        info!("No check errors found");
    } else {
        warn!("Timeline metadata errors: {0:?}", result.errors);
    }

    if !result.warnings.is_empty() {
        warn!("Timeline metadata warnings: {0:?}", result.warnings);
    }

    if !result.garbage_keys.is_empty() {
        error!(
            "The following keys should be removed from S3: {0:?}",
            result.garbage_keys
        )
    }

    result
}

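/// Everything found under a timeline's S3 prefix: the parsed (or unparseable) blob data
/// and any keys that do not belong there and are candidates for removal.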
#[derive(Debug)]
pub struct S3TimelineBlobData {
    pub blob_data: BlobDataParseResult,
    pub keys_to_remove: Vec<String>,
}

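/// Outcome of parsing a timeline's S3 listing: either a parsed `index_part.json` plus
/// the set of layer files present in S3, or the list of errors encountered.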
#[derive(Debug)]
pub enum BlobDataParseResult {
    Parsed {
        index_part: IndexPart,
        s3_layers: HashSet<LayerFileName>,
    },
    Incorrect(Vec<String>),
}

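/// List all objects under the timeline's S3 prefix, separate `index_part.json` from the
/// layer files, download and parse the index, and record unexpected keys and parse
/// failures along the way.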
pub async fn list_timeline_blobs(
    s3_client: &Client,
    id: TenantTimelineId,
    s3_root: &RootTarget,
) -> anyhow::Result<S3TimelineBlobData> {
    let mut s3_layers = HashSet::new();
    let mut index_part_object = None;

    let timeline_dir_target = s3_root.timeline_root(&id);
    let mut continuation_token = None;

    let mut errors = Vec::new();
    let mut keys_to_remove = Vec::new();

    loop {
        let fetch_response =
            list_objects_with_retries(s3_client, &timeline_dir_target, continuation_token.clone())
                .await?;

        let subdirectories = fetch_response.common_prefixes().unwrap_or_default();
        if !subdirectories.is_empty() {
            errors.push(format!(
                "S3 list response should not contain any subdirectories, but got {subdirectories:?}"
            ));
        }

        for (object, key) in fetch_response
            .contents()
            .unwrap_or_default()
            .iter()
            .filter_map(|object| Some((object, object.key()?)))
        {
            let blob_name = key.strip_prefix(&timeline_dir_target.prefix_in_bucket);
            match blob_name {
                Some("index_part.json") => index_part_object = Some(object.clone()),
                Some(maybe_layer_name) => match maybe_layer_name.parse::<LayerFileName>() {
                    Ok(new_layer) => {
                        s3_layers.insert(new_layer);
                    }
                    Err(e) => {
                        errors.push(
                            format!("S3 list response got an object with key {key} that is not a layer name: {e}"),
                        );
                        keys_to_remove.push(key.to_string());
                    }
                },
                None => {
                    errors.push(format!("S3 list response got an object with odd key {key}"));
                    keys_to_remove.push(key.to_string());
                }
            }
        }

        match fetch_response.next_continuation_token {
            Some(new_token) => continuation_token = Some(new_token),
            None => break,
        }
    }

    if index_part_object.is_none() {
        errors.push("S3 list response got no index_part.json file".to_string());
    }

    if let Some(index_part_object_key) = index_part_object.as_ref().and_then(|object| object.key())
    {
        let index_part_bytes = download_object_with_retries(
            s3_client,
            &timeline_dir_target.bucket_name,
            index_part_object_key,
        )
        .await
        .context("index_part.json download")?;

        match serde_json::from_slice(&index_part_bytes) {
            Ok(index_part) => {
                return Ok(S3TimelineBlobData {
                    blob_data: BlobDataParseResult::Parsed {
                        index_part,
                        s3_layers,
                    },
                    keys_to_remove,
                })
            }
            Err(index_parse_error) => errors.push(format!(
                "index_part.json body parsing error: {index_parse_error}"
            )),
        }
    } else {
        errors.push(format!(
            "Index part object {index_part_object:?} has no key"
        ));
    }

    if errors.is_empty() {
        errors.push(
            "Unexpected: no errors did not lead to a successfully parsed blob return".to_string(),
        );
    }

    Ok(S3TimelineBlobData {
        blob_data: BlobDataParseResult::Incorrect(errors),
        keys_to_remove,
    })
}