use std::{collections::HashSet, str::FromStr, sync::Arc};

use anyhow::{bail, Context};
use futures::stream::{StreamExt, TryStreamExt};
use once_cell::sync::OnceCell;
use pageserver_api::shard::TenantShardId;
use postgres_ffi::{XLogFileName, PG_TLI};
use remote_storage::GenericRemoteStorage;
use rustls::crypto::aws_lc_rs;
use serde::Serialize;
use tokio_postgres::types::PgLsn;
use tracing::{debug, error, info};
use utils::{
    id::{TenantId, TenantTimelineId, TimelineId},
    lsn::Lsn,
};

use crate::{
    cloud_admin_api::CloudAdminApiClient, init_remote, metadata_stream::stream_listing,
    BucketConfig, ConsoleConfig, NodeKind, RootTarget, TenantShardTimelineId,
};

/// Generally we should ask the safekeepers, but so far the default of 16 MB is used everywhere.
const WAL_SEGSIZE: usize = 16 * 1024 * 1024;
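// A minimal illustration (hypothetical values, not used by the scanner) of how
// an LSN maps to a WAL segment file name via the same `segment_number` and
// `XLogFileName` helpers that `check_timeline` below uses to build its
// expected-segment set.
#[cfg(test)]
mod wal_segsize_example {
    use super::*;

    #[test]
    fn lsn_maps_to_segment_file_name() {
        // LSN 0/2000000 falls into segment number 2 with 16 MB segments.
        let lsn = Lsn(0x0200_0000);
        let segno = lsn.segment_number(WAL_SEGSIZE);
        assert_eq!(segno, 2);
        // Physical timeline 1, log id 0, seg 2, assuming standard postgres naming.
        assert_eq!(XLogFileName(PG_TLI, segno, WAL_SEGSIZE), "000000010000000000000002");
    }
}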
#[derive(Serialize)]
pub struct MetadataSummary {
    timeline_count: usize,
    with_errors: HashSet<TenantTimelineId>,
    deleted_count: usize,
}

impl MetadataSummary {
    fn new() -> Self {
        Self {
            timeline_count: 0,
            with_errors: HashSet::new(),
            deleted_count: 0,
        }
    }

    pub fn summary_string(&self) -> String {
        format!(
            "timeline_count: {}, with_errors: {}",
            self.timeline_count,
            self.with_errors.len()
        )
    }

    pub fn is_empty(&self) -> bool {
        self.timeline_count == 0
    }

    pub fn is_fatal(&self) -> bool {
        !self.with_errors.is_empty()
    }
}

#[derive(serde::Deserialize)]
pub struct TimelineLsnData {
    tenant_id: String,
    timeline_id: String,
    timeline_start_lsn: Lsn,
    backup_lsn: Lsn,
}

pub enum DatabaseOrList {
    Database {
        tenant_ids: Vec<TenantId>,
        connstr: String,
        table: String,
    },
    List(Vec<TimelineLsnData>),
}
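// A sketch of one way to build the `List` variant, deserializing
// `TimelineLsnData` records from JSON. The `serde_json` dependency and the
// input shape are illustrative assumptions; callers may source the list
// however they like.
#[allow(dead_code)]
fn example_list_input(json: &str) -> anyhow::Result<DatabaseOrList> {
    // Expects a JSON array of objects with tenant_id, timeline_id,
    // timeline_start_lsn and backup_lsn fields.
    let timelines: Vec<TimelineLsnData> = serde_json::from_str(json)?;
    Ok(DatabaseOrList::List(timelines))
}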
/// Scan the safekeeper metadata in an S3 bucket, reporting errors and
/// statistics.
///
/// It works by listing timelines, together with their timeline_start_lsn and
/// backup_lsn taken from the debug dump in dump_db_table, and verifying the
/// corresponding S3 contents. If some WAL segments are missing, the control
/// plane is queried before complaining, to check whether the project was
/// deleted in the meantime.
pub async fn scan_safekeeper_metadata(
    bucket_config: BucketConfig,
    db_or_list: DatabaseOrList,
) -> anyhow::Result<MetadataSummary> {
    info!(
        "checking bucket {}, region {}",
        bucket_config.bucket, bucket_config.region
    );

    let (remote_client, target) = init_remote(bucket_config, NodeKind::Safekeeper).await?;
    let console_config = ConsoleConfig::from_env()?;
    let cloud_admin_api_client = CloudAdminApiClient::new(console_config);

    let timelines = match db_or_list {
        DatabaseOrList::Database {
            tenant_ids,
            connstr,
            table,
        } => load_timelines_from_db(tenant_ids, connstr, table).await?,
        DatabaseOrList::List(list) => list,
    };
    info!("loaded {} timelines", timelines.len());

    let checks = futures::stream::iter(timelines.into_iter().map(Ok)).map_ok(|timeline| {
        let tenant_id = TenantId::from_str(&timeline.tenant_id).expect("failed to parse tenant_id");
        let timeline_id =
            TimelineId::from_str(&timeline.timeline_id).expect("failed to parse timeline_id");
        let ttid = TenantTimelineId::new(tenant_id, timeline_id);
        check_timeline(
            &remote_client,
            &target,
            &cloud_admin_api_client,
            ttid,
            timeline.timeline_start_lsn,
            timeline.backup_lsn,
        )
    });
    // Run multiple check_timeline's concurrently.
    const CONCURRENCY: usize = 32;
    let mut timelines = checks.try_buffered(CONCURRENCY);

    let mut summary = MetadataSummary::new();
    while let Some(r) = timelines.next().await {
        let res = r?;
        summary.timeline_count += 1;
        if !res.is_ok {
            summary.with_errors.insert(res.ttid);
        }
        if res.is_deleted {
            summary.deleted_count += 1;
        }
    }

    Ok(summary)
}
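// A hypothetical driver showing how the scan can be wired up. Constructing
// `bucket_config` is left to the caller (its fields live in
// `crate::BucketConfig`), and the connstr/table values below are placeholders,
// not real endpoints.
#[allow(dead_code)]
async fn example_scan(bucket_config: BucketConfig) -> anyhow::Result<()> {
    let summary = scan_safekeeper_metadata(
        bucket_config,
        DatabaseOrList::Database {
            // An empty list disables the tenant filter in the generated SQL.
            tenant_ids: Vec::new(),
            connstr: "postgresql://user@dump-host/neon".to_owned(), // placeholder
            table: "safekeeper_timelines_dump".to_owned(),          // placeholder
        },
    )
    .await?;
    println!("{}", summary.summary_string());
    if summary.is_fatal() {
        bail!("metadata consistency errors found");
    }
    Ok(())
}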
struct TimelineCheckResult {
    ttid: TenantTimelineId,
    is_ok: bool,
    is_deleted: bool, // timeline is deleted in cplane
}

/// List S3 and check that it has all the expected WAL for the ttid. Consistency
/// errors are logged with `error!`; returns a `TimelineCheckResult` recording
/// whether the timeline is consistent and whether it is deleted in the control
/// plane, or an `Err` if the check itself failed.
async fn check_timeline(
    remote_client: &GenericRemoteStorage,
    root: &RootTarget,
    api_client: &CloudAdminApiClient,
    ttid: TenantTimelineId,
    timeline_start_lsn: Lsn,
    backup_lsn: Lsn,
) -> anyhow::Result<TimelineCheckResult> {
    debug!(
        "checking ttid {}, should contain WAL [{}-{}]",
        ttid, timeline_start_lsn, backup_lsn
    );
    // calculate expected segfiles
    let expected_first_segno = timeline_start_lsn.segment_number(WAL_SEGSIZE);
    let expected_last_segno = backup_lsn.segment_number(WAL_SEGSIZE);
    let mut expected_segfiles: HashSet<String> = HashSet::from_iter(
        (expected_first_segno..expected_last_segno)
            .map(|segno| XLogFileName(PG_TLI, segno, WAL_SEGSIZE)),
    );
    let expected_files_num = expected_segfiles.len();
    debug!("expecting {} files", expected_files_num);

    // now list s3 and check if it misses something
    let ttshid =
        TenantShardTimelineId::new(TenantShardId::unsharded(ttid.tenant_id), ttid.timeline_id);
    let mut timeline_dir_target = root.timeline_root(&ttshid);
    // stream_listing yields only common_prefixes if delimiter is not empty, but
    // we need files, so unset it.
    timeline_dir_target.delimiter = String::new();

    let prefix_str = &timeline_dir_target
        .prefix_in_bucket
        .strip_prefix("/")
        .unwrap_or(&timeline_dir_target.prefix_in_bucket);

    let mut stream = std::pin::pin!(stream_listing(remote_client, &timeline_dir_target));
    while let Some(obj) = stream.next().await {
        let (key, _obj) = obj?;

        let seg_name = key
            .get_path()
            .as_str()
            .strip_prefix(prefix_str)
            .expect("failed to extract segment name");
        expected_segfiles.remove(seg_name);
    }
    if !expected_segfiles.is_empty() {
        // Before complaining, check cplane: probably the timeline is already deleted.
        let bdata = api_client
            .find_timeline_branch(ttid.tenant_id, ttid.timeline_id)
            .await?;
        let deleted = match bdata {
            Some(bdata) => bdata.deleted,
            None => {
                // note: should be careful with selecting proper cplane address
                info!("ttid {} not found, assuming it is deleted", ttid);
                true
            }
        };
        if deleted {
            // ok, branch is deleted
            return Ok(TimelineCheckResult {
                ttid,
                is_ok: true,
                is_deleted: true,
            });
        }
        error!(
            "ttid {}: missing {} files out of {}, timeline_start_lsn {}, wal_backup_lsn {}",
            ttid,
            expected_segfiles.len(),
            expected_files_num,
            timeline_start_lsn,
            backup_lsn,
        );
        return Ok(TimelineCheckResult {
            ttid,
            is_ok: false,
            is_deleted: false,
        });
    }
    Ok(TimelineCheckResult {
        ttid,
        is_ok: true,
        is_deleted: false,
    })
}
fn load_certs() -> anyhow::Result<Arc<rustls::RootCertStore>> {
    let der_certs = rustls_native_certs::load_native_certs();

    if !der_certs.errors.is_empty() {
        bail!("could not load native tls certs: {:?}", der_certs.errors);
    }

    let mut store = rustls::RootCertStore::empty();
    store.add_parsable_certificates(der_certs.certs);
    Ok(Arc::new(store))
}

static TLS_ROOTS: OnceCell<Arc<rustls::RootCertStore>> = OnceCell::new();
async fn load_timelines_from_db(
    tenant_ids: Vec<TenantId>,
    dump_db_connstr: String,
    dump_db_table: String,
) -> anyhow::Result<Vec<TimelineLsnData>> {
    info!("loading from table {dump_db_table}");

    // Use rustls (Neon requires TLS)
    let root_store = TLS_ROOTS.get_or_try_init(load_certs)?.clone();
    let client_config =
        rustls::ClientConfig::builder_with_provider(Arc::new(aws_lc_rs::default_provider()))
            .with_safe_default_protocol_versions()
            .context("aws_lc_rs should support the default protocol versions")?
            .with_root_certificates(root_store)
            .with_no_client_auth();
    let tls_connector = tokio_postgres_rustls::MakeRustlsConnect::new(client_config);
    let (client, connection) = tokio_postgres::connect(&dump_db_connstr, tls_connector).await?;
    // The connection object performs the actual communication with the database,
    // so spawn it off to run on its own.
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("connection error: {}", e);
        }
    });

    let tenant_filter_clause = if !tenant_ids.is_empty() {
        format!(
            "and tenant_id in ({})",
            tenant_ids
                .iter()
                .map(|t| format!("'{}'", t))
                .collect::<Vec<_>>()
                .join(", ")
        )
    } else {
        "".to_owned()
    };
    let query = format!(
        "select tenant_id, timeline_id, min(timeline_start_lsn), max(backup_lsn) \
         from \"{dump_db_table}\" \
         where not is_cancelled {tenant_filter_clause} \
         group by tenant_id, timeline_id;"
    );
    info!("query is {}", query);
    let timelines = client.query(&query, &[]).await?;

    let timelines = timelines
        .into_iter()
        .map(|row| {
            let tenant_id = row.get(0);
            let timeline_id = row.get(1);
            let timeline_start_lsn_pg: PgLsn = row.get(2);
            let backup_lsn_pg: PgLsn = row.get(3);

            TimelineLsnData {
                tenant_id,
                timeline_id,
                timeline_start_lsn: Lsn(u64::from(timeline_start_lsn_pg)),
                backup_lsn: Lsn(u64::from(backup_lsn_pg)),
            }
        })
        .collect::<Vec<TimelineLsnData>>();
    Ok(timelines)
}
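// For reference, with a tenant filter present the generated statement looks
// like this (table name and tenant id are illustrative):
//
//   select tenant_id, timeline_id, min(timeline_start_lsn), max(backup_lsn)
//   from "safekeeper_timelines_dump"
//   where not is_cancelled and tenant_id in ('<tenant hex id>')
//   group by tenant_id, timeline_id;
//
// The min/max aggregation (presumably over one dump row per safekeeper) picks
// the earliest known timeline start and the furthest advertised backup_lsn,
// i.e. the widest WAL range the check can expect to find in S3.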