Line data Source code
1 : use std::{collections::HashSet, str::FromStr};
2 :
3 : use aws_sdk_s3::Client;
4 : use futures::stream::{StreamExt, TryStreamExt};
5 : use pageserver_api::shard::TenantShardId;
6 : use postgres_ffi::{XLogFileName, PG_TLI};
7 : use serde::Serialize;
8 : use tokio_postgres::types::PgLsn;
9 : use tracing::{error, info, trace};
10 : use utils::{
11 : id::{TenantId, TenantTimelineId, TimelineId},
12 : lsn::Lsn,
13 : };
14 :
15 : use crate::{
16 : cloud_admin_api::CloudAdminApiClient, init_remote, metadata_stream::stream_listing,
17 : BucketConfig, ConsoleConfig, NodeKind, RootTarget, TenantShardTimelineId,
18 : };
19 :
/// Size of a WAL segment file, used to compute expected segment file names.
///
/// Generally we should ask safekeepers for this, but so far the default 16MB
/// is used everywhere.
const WAL_SEGSIZE: usize = 16 << 20;
22 :
23 : #[derive(Serialize)]
24 : pub struct MetadataSummary {
25 : timeline_count: usize,
26 : with_errors: HashSet<TenantTimelineId>,
27 : deleted_count: usize,
28 : }
29 :
30 : impl MetadataSummary {
31 0 : fn new() -> Self {
32 0 : Self {
33 0 : timeline_count: 0,
34 0 : with_errors: HashSet::new(),
35 0 : deleted_count: 0,
36 0 : }
37 0 : }
38 :
39 0 : pub fn summary_string(&self) -> String {
40 0 : format!(
41 0 : "timeline_count: {}, with_errors: {}",
42 0 : self.timeline_count,
43 0 : self.with_errors.len()
44 0 : )
45 0 : }
46 :
47 0 : pub fn is_empty(&self) -> bool {
48 0 : self.timeline_count == 0
49 0 : }
50 :
51 0 : pub fn is_fatal(&self) -> bool {
52 0 : !self.with_errors.is_empty()
53 0 : }
54 : }
55 :
56 : /// Scan the safekeeper metadata in an S3 bucket, reporting errors and
57 : /// statistics.
58 : ///
59 : /// It works by listing timelines along with timeline_start_lsn and backup_lsn
60 : /// in debug dump in dump_db_table and verifying its s3 contents. If some WAL
61 : /// segments are missing, before complaining control plane is queried to check if
62 : /// the project wasn't deleted in the meanwhile.
63 0 : pub async fn scan_safekeeper_metadata(
64 0 : bucket_config: BucketConfig,
65 0 : tenant_ids: Vec<TenantId>,
66 0 : dump_db_connstr: String,
67 0 : dump_db_table: String,
68 0 : ) -> anyhow::Result<MetadataSummary> {
69 0 : info!(
70 0 : "checking bucket {}, region {}, dump_db_table {}",
71 : bucket_config.bucket, bucket_config.region, dump_db_table
72 : );
73 : // Use the native TLS implementation (Neon requires TLS)
74 0 : let tls_connector =
75 0 : postgres_native_tls::MakeTlsConnector::new(native_tls::TlsConnector::new().unwrap());
76 0 : let (client, connection) = tokio_postgres::connect(&dump_db_connstr, tls_connector).await?;
77 : // The connection object performs the actual communication with the database,
78 : // so spawn it off to run on its own.
79 0 : tokio::spawn(async move {
80 0 : if let Err(e) = connection.await {
81 0 : eprintln!("connection error: {}", e);
82 0 : }
83 0 : });
84 :
85 0 : let tenant_filter_clause = if !tenant_ids.is_empty() {
86 0 : format!(
87 0 : "and tenant_id in ({})",
88 0 : tenant_ids
89 0 : .iter()
90 0 : .map(|t| format!("'{}'", t))
91 0 : .collect::<Vec<_>>()
92 0 : .join(", ")
93 0 : )
94 : } else {
95 0 : "".to_owned()
96 : };
97 0 : let query = format!(
98 0 : "select tenant_id, timeline_id, min(timeline_start_lsn), max(backup_lsn) from \"{}\" where not is_cancelled {} group by tenant_id, timeline_id;",
99 0 : dump_db_table, tenant_filter_clause,
100 0 : );
101 0 : info!("query is {}", query);
102 0 : let timelines = client.query(&query, &[]).await?;
103 0 : info!("loaded {} timelines", timelines.len());
104 :
105 0 : let (s3_client, target) = init_remote(bucket_config, NodeKind::Safekeeper)?;
106 0 : let console_config = ConsoleConfig::from_env()?;
107 0 : let cloud_admin_api_client = CloudAdminApiClient::new(console_config);
108 0 :
109 0 : let checks = futures::stream::iter(timelines.iter().map(Ok)).map_ok(|row| {
110 0 : let tenant_id = TenantId::from_str(row.get(0)).expect("failed to parse tenant_id");
111 0 : let timeline_id = TimelineId::from_str(row.get(1)).expect("failed to parse tenant_id");
112 0 : let timeline_start_lsn_pg: PgLsn = row.get(2);
113 0 : let timeline_start_lsn: Lsn = Lsn(u64::from(timeline_start_lsn_pg));
114 0 : let backup_lsn_pg: PgLsn = row.get(3);
115 0 : let backup_lsn: Lsn = Lsn(u64::from(backup_lsn_pg));
116 0 : let ttid = TenantTimelineId::new(tenant_id, timeline_id);
117 0 : check_timeline(
118 0 : &s3_client,
119 0 : &target,
120 0 : &cloud_admin_api_client,
121 0 : ttid,
122 0 : timeline_start_lsn,
123 0 : backup_lsn,
124 0 : )
125 0 : });
126 0 : // Run multiple check_timeline's concurrently.
127 0 : const CONCURRENCY: usize = 32;
128 0 : let mut timelines = checks.try_buffered(CONCURRENCY);
129 0 :
130 0 : let mut summary = MetadataSummary::new();
131 0 : while let Some(r) = timelines.next().await {
132 0 : let res = r?;
133 0 : summary.timeline_count += 1;
134 0 : if !res.is_ok {
135 0 : summary.with_errors.insert(res.ttid);
136 0 : }
137 0 : if res.is_deleted {
138 0 : summary.deleted_count += 1;
139 0 : }
140 : }
141 :
142 0 : Ok(summary)
143 0 : }
144 :
145 : struct TimelineCheckResult {
146 : ttid: TenantTimelineId,
147 : is_ok: bool,
148 : is_deleted: bool, // timeline is deleted in cplane
149 : }
150 :
151 : /// List s3 and check that is has all expected WAL for the ttid. Consistency
152 : /// errors are logged to stderr; returns Ok(true) if timeline is consistent,
153 : /// Ok(false) if not, Err if failed to check.
154 0 : async fn check_timeline(
155 0 : s3_client: &Client,
156 0 : root: &RootTarget,
157 0 : api_client: &CloudAdminApiClient,
158 0 : ttid: TenantTimelineId,
159 0 : timeline_start_lsn: Lsn,
160 0 : backup_lsn: Lsn,
161 0 : ) -> anyhow::Result<TimelineCheckResult> {
162 0 : trace!(
163 0 : "checking ttid {}, should contain WAL [{}-{}]",
164 : ttid,
165 : timeline_start_lsn,
166 : backup_lsn
167 : );
168 : // calculate expected segfiles
169 0 : let expected_first_segno = timeline_start_lsn.segment_number(WAL_SEGSIZE);
170 0 : let expected_last_segno = backup_lsn.segment_number(WAL_SEGSIZE);
171 0 : let mut expected_segfiles: HashSet<String> = HashSet::from_iter(
172 0 : (expected_first_segno..expected_last_segno)
173 0 : .map(|segno| XLogFileName(PG_TLI, segno, WAL_SEGSIZE)),
174 0 : );
175 0 : let expected_files_num = expected_segfiles.len();
176 0 : trace!("expecting {} files", expected_segfiles.len(),);
177 :
178 : // now list s3 and check if it misses something
179 0 : let ttshid =
180 0 : TenantShardTimelineId::new(TenantShardId::unsharded(ttid.tenant_id), ttid.timeline_id);
181 0 : let mut timeline_dir_target = root.timeline_root(&ttshid);
182 0 : // stream_listing yields only common_prefixes if delimiter is not empty, but
183 0 : // we need files, so unset it.
184 0 : timeline_dir_target.delimiter = String::new();
185 0 :
186 0 : let mut stream = std::pin::pin!(stream_listing(s3_client, &timeline_dir_target));
187 0 : while let Some(obj) = stream.next().await {
188 0 : let obj = obj?;
189 0 : let key = obj.key();
190 0 :
191 0 : let seg_name = key
192 0 : .strip_prefix(&timeline_dir_target.prefix_in_bucket)
193 0 : .expect("failed to extract segment name");
194 0 : expected_segfiles.remove(seg_name);
195 : }
196 0 : if !expected_segfiles.is_empty() {
197 : // Before complaining check cplane, probably timeline is already deleted.
198 0 : let bdata = api_client
199 0 : .find_timeline_branch(ttid.tenant_id, ttid.timeline_id)
200 0 : .await?;
201 0 : let deleted = match bdata {
202 0 : Some(bdata) => bdata.deleted,
203 : None => {
204 : // note: should be careful with selecting proper cplane address
205 0 : info!("ttid {} not found, assuming it is deleted", ttid);
206 0 : true
207 : }
208 : };
209 0 : if deleted {
210 : // ok, branch is deleted
211 0 : return Ok(TimelineCheckResult {
212 0 : ttid,
213 0 : is_ok: true,
214 0 : is_deleted: true,
215 0 : });
216 0 : }
217 0 : error!(
218 0 : "ttid {}: missing {} files out of {}, timeline_start_lsn {}, wal_backup_lsn {}",
219 0 : ttid,
220 0 : expected_segfiles.len(),
221 : expected_files_num,
222 : timeline_start_lsn,
223 : backup_lsn,
224 : );
225 0 : return Ok(TimelineCheckResult {
226 0 : ttid,
227 0 : is_ok: false,
228 0 : is_deleted: false,
229 0 : });
230 0 : }
231 0 : Ok(TimelineCheckResult {
232 0 : ttid,
233 0 : is_ok: true,
234 0 : is_deleted: false,
235 0 : })
236 0 : }
|