Line data Source code
1 : use std::{collections::HashSet, str::FromStr, sync::Arc};
2 :
3 : use anyhow::{bail, Context};
4 : use futures::stream::{StreamExt, TryStreamExt};
5 : use once_cell::sync::OnceCell;
6 : use pageserver_api::shard::TenantShardId;
7 : use postgres_ffi::{XLogFileName, PG_TLI};
8 : use remote_storage::GenericRemoteStorage;
9 : use rustls::crypto::ring;
10 : use serde::Serialize;
11 : use tokio_postgres::types::PgLsn;
12 : use tracing::{debug, error, info};
13 : use utils::{
14 : id::{TenantId, TenantTimelineId, TimelineId},
15 : lsn::Lsn,
16 : };
17 :
18 : use crate::{
19 : cloud_admin_api::CloudAdminApiClient, init_remote, metadata_stream::stream_listing,
20 : BucketConfig, ConsoleConfig, NodeKind, RootTarget, TenantShardTimelineId,
21 : };
22 :
/// WAL segment size used to compute the expected segment file names.
///
/// Generally we should ask the safekeepers for it, but so far the default
/// 16MB is used everywhere.
const WAL_SEGSIZE: usize = 16 * 1024 * 1024;
25 :
/// Aggregated result of a safekeeper metadata scan, serializable for reporting.
#[derive(Serialize)]
pub struct MetadataSummary {
    /// Number of timelines that were checked.
    timeline_count: usize,
    /// Timelines whose check reported missing WAL segments and whose branch
    /// was not found to be deleted in the control plane.
    with_errors: HashSet<TenantTimelineId>,
    /// Number of checked timelines that were treated as deleted in the
    /// control plane.
    deleted_count: usize,
}
32 :
33 : impl MetadataSummary {
34 0 : fn new() -> Self {
35 0 : Self {
36 0 : timeline_count: 0,
37 0 : with_errors: HashSet::new(),
38 0 : deleted_count: 0,
39 0 : }
40 0 : }
41 :
42 0 : pub fn summary_string(&self) -> String {
43 0 : format!(
44 0 : "timeline_count: {}, with_errors: {}",
45 0 : self.timeline_count,
46 0 : self.with_errors.len()
47 0 : )
48 0 : }
49 :
50 0 : pub fn is_empty(&self) -> bool {
51 0 : self.timeline_count == 0
52 0 : }
53 :
54 0 : pub fn is_fatal(&self) -> bool {
55 0 : !self.with_errors.is_empty()
56 0 : }
57 : }
58 :
/// A timeline to check, together with the WAL range it is expected to have
/// in remote storage.
///
/// Ids are kept as strings here and parsed into `TenantId` / `TimelineId`
/// when the timeline is checked. NOTE(review): presumably deserialized from a
/// user-supplied list — confirm the expected input format with the caller.
#[derive(serde::Deserialize)]
pub struct TimelineLsnData {
    tenant_id: String,
    timeline_id: String,
    /// First LSN of the timeline; its segment is the first expected one.
    timeline_start_lsn: Lsn,
    /// LSN up to which WAL is expected to be backed up; the segment holding
    /// it is not itself required.
    backup_lsn: Lsn,
}
66 :
/// Source of the timelines to check.
pub enum DatabaseOrList {
    /// Load timelines from a safekeeper debug-dump table in a Postgres database.
    Database {
        /// Restrict loading to these tenants; an empty list loads all tenants.
        tenant_ids: Vec<TenantId>,
        /// Connection string of the dump database.
        connstr: String,
        /// Name of the dump table.
        table: String,
    },
    /// Check exactly the given timelines.
    List(Vec<TimelineLsnData>),
}
75 :
76 : /// Scan the safekeeper metadata in an S3 bucket, reporting errors and
77 : /// statistics.
78 : ///
79 : /// It works by listing timelines along with timeline_start_lsn and backup_lsn
80 : /// in debug dump in dump_db_table and verifying its s3 contents. If some WAL
81 : /// segments are missing, before complaining control plane is queried to check if
82 : /// the project wasn't deleted in the meanwhile.
83 0 : pub async fn scan_safekeeper_metadata(
84 0 : bucket_config: BucketConfig,
85 0 : db_or_list: DatabaseOrList,
86 0 : ) -> anyhow::Result<MetadataSummary> {
87 0 : info!("checking {}", bucket_config.desc_str());
88 :
89 0 : let (remote_client, target) = init_remote(bucket_config, NodeKind::Safekeeper).await?;
90 0 : let console_config = ConsoleConfig::from_env()?;
91 0 : let cloud_admin_api_client = CloudAdminApiClient::new(console_config);
92 :
93 0 : let timelines = match db_or_list {
94 : DatabaseOrList::Database {
95 0 : tenant_ids,
96 0 : connstr,
97 0 : table,
98 0 : } => load_timelines_from_db(tenant_ids, connstr, table).await?,
99 0 : DatabaseOrList::List(list) => list,
100 : };
101 0 : info!("loaded {} timelines", timelines.len());
102 :
103 0 : let checks = futures::stream::iter(timelines.into_iter().map(Ok)).map_ok(|timeline| {
104 0 : let tenant_id = TenantId::from_str(&timeline.tenant_id).expect("failed to parse tenant_id");
105 0 : let timeline_id =
106 0 : TimelineId::from_str(&timeline.timeline_id).expect("failed to parse tenant_id");
107 0 : let ttid = TenantTimelineId::new(tenant_id, timeline_id);
108 0 : check_timeline(
109 0 : &remote_client,
110 0 : &target,
111 0 : &cloud_admin_api_client,
112 0 : ttid,
113 0 : timeline.timeline_start_lsn,
114 0 : timeline.backup_lsn,
115 0 : )
116 0 : });
117 : // Run multiple check_timeline's concurrently.
118 : const CONCURRENCY: usize = 32;
119 0 : let mut timelines = checks.try_buffered(CONCURRENCY);
120 0 :
121 0 : let mut summary = MetadataSummary::new();
122 0 : while let Some(r) = timelines.next().await {
123 0 : let res = r?;
124 0 : summary.timeline_count += 1;
125 0 : if !res.is_ok {
126 0 : summary.with_errors.insert(res.ttid);
127 0 : }
128 0 : if res.is_deleted {
129 0 : summary.deleted_count += 1;
130 0 : }
131 : }
132 :
133 0 : Ok(summary)
134 0 : }
135 :
/// Outcome of checking a single timeline's WAL in remote storage.
struct TimelineCheckResult {
    /// The timeline that was checked.
    ttid: TenantTimelineId,
    /// False only when segments are missing and the branch is not deleted.
    is_ok: bool,
    is_deleted: bool, // timeline is deleted in cplane
}
141 :
142 : /// List s3 and check that is has all expected WAL for the ttid. Consistency
143 : /// errors are logged to stderr; returns Ok(true) if timeline is consistent,
144 : /// Ok(false) if not, Err if failed to check.
145 0 : async fn check_timeline(
146 0 : remote_client: &GenericRemoteStorage,
147 0 : root: &RootTarget,
148 0 : api_client: &CloudAdminApiClient,
149 0 : ttid: TenantTimelineId,
150 0 : timeline_start_lsn: Lsn,
151 0 : backup_lsn: Lsn,
152 0 : ) -> anyhow::Result<TimelineCheckResult> {
153 0 : debug!(
154 0 : "checking ttid {}, should contain WAL [{}-{}]",
155 : ttid, timeline_start_lsn, backup_lsn
156 : );
157 : // calculate expected segfiles
158 0 : let expected_first_segno = timeline_start_lsn.segment_number(WAL_SEGSIZE);
159 0 : let expected_last_segno = backup_lsn.segment_number(WAL_SEGSIZE);
160 0 : let mut expected_segfiles: HashSet<String> = HashSet::from_iter(
161 0 : (expected_first_segno..expected_last_segno)
162 0 : .map(|segno| XLogFileName(PG_TLI, segno, WAL_SEGSIZE)),
163 0 : );
164 0 : let expected_files_num = expected_segfiles.len();
165 0 : debug!("expecting {} files", expected_segfiles.len(),);
166 :
167 : // now list s3 and check if it misses something
168 0 : let ttshid =
169 0 : TenantShardTimelineId::new(TenantShardId::unsharded(ttid.tenant_id), ttid.timeline_id);
170 0 : let mut timeline_dir_target = root.timeline_root(&ttshid);
171 0 : // stream_listing yields only common_prefixes if delimiter is not empty, but
172 0 : // we need files, so unset it.
173 0 : timeline_dir_target.delimiter = String::new();
174 0 :
175 0 : let prefix_str = &timeline_dir_target
176 0 : .prefix_in_bucket
177 0 : .strip_prefix("/")
178 0 : .unwrap_or(&timeline_dir_target.prefix_in_bucket);
179 0 :
180 0 : let mut stream = std::pin::pin!(stream_listing(remote_client, &timeline_dir_target));
181 0 : while let Some(obj) = stream.next().await {
182 0 : let (key, _obj) = obj?;
183 :
184 0 : let seg_name = key
185 0 : .get_path()
186 0 : .as_str()
187 0 : .strip_prefix(prefix_str)
188 0 : .expect("failed to extract segment name");
189 0 : expected_segfiles.remove(seg_name);
190 : }
191 0 : if !expected_segfiles.is_empty() {
192 : // Before complaining check cplane, probably timeline is already deleted.
193 0 : let bdata = api_client
194 0 : .find_timeline_branch(ttid.tenant_id, ttid.timeline_id)
195 0 : .await?;
196 0 : let deleted = match bdata {
197 0 : Some(bdata) => bdata.deleted,
198 : None => {
199 : // note: should be careful with selecting proper cplane address
200 0 : info!("ttid {} not found, assuming it is deleted", ttid);
201 0 : true
202 : }
203 : };
204 0 : if deleted {
205 : // ok, branch is deleted
206 0 : return Ok(TimelineCheckResult {
207 0 : ttid,
208 0 : is_ok: true,
209 0 : is_deleted: true,
210 0 : });
211 0 : }
212 0 : error!(
213 0 : "ttid {}: missing {} files out of {}, timeline_start_lsn {}, wal_backup_lsn {}",
214 0 : ttid,
215 0 : expected_segfiles.len(),
216 : expected_files_num,
217 : timeline_start_lsn,
218 : backup_lsn,
219 : );
220 0 : return Ok(TimelineCheckResult {
221 0 : ttid,
222 0 : is_ok: false,
223 0 : is_deleted: false,
224 0 : });
225 0 : }
226 0 : Ok(TimelineCheckResult {
227 0 : ttid,
228 0 : is_ok: true,
229 0 : is_deleted: false,
230 0 : })
231 0 : }
232 :
233 0 : fn load_certs() -> anyhow::Result<Arc<rustls::RootCertStore>> {
234 0 : let der_certs = rustls_native_certs::load_native_certs();
235 0 :
236 0 : if !der_certs.errors.is_empty() {
237 0 : bail!("could not load native tls certs: {:?}", der_certs.errors);
238 0 : }
239 0 :
240 0 : let mut store = rustls::RootCertStore::empty();
241 0 : store.add_parsable_certificates(der_certs.certs);
242 0 : Ok(Arc::new(store))
243 0 : }
/// Root certificate store for TLS connections to the dump database. It is
/// initialized by `load_certs` on first successful use and reused afterwards.
static TLS_ROOTS: OnceCell<Arc<rustls::RootCertStore>> = OnceCell::new();
245 :
246 0 : async fn load_timelines_from_db(
247 0 : tenant_ids: Vec<TenantId>,
248 0 : dump_db_connstr: String,
249 0 : dump_db_table: String,
250 0 : ) -> anyhow::Result<Vec<TimelineLsnData>> {
251 0 : info!("loading from table {dump_db_table}");
252 :
253 : // Use rustls (Neon requires TLS)
254 0 : let root_store = TLS_ROOTS.get_or_try_init(load_certs)?.clone();
255 0 : let client_config =
256 0 : rustls::ClientConfig::builder_with_provider(Arc::new(ring::default_provider()))
257 0 : .with_safe_default_protocol_versions()
258 0 : .context("ring should support the default protocol versions")?
259 0 : .with_root_certificates(root_store)
260 0 : .with_no_client_auth();
261 0 : let tls_connector = tokio_postgres_rustls::MakeRustlsConnect::new(client_config);
262 0 : let (client, connection) = tokio_postgres::connect(&dump_db_connstr, tls_connector).await?;
263 : // The connection object performs the actual communication with the database,
264 : // so spawn it off to run on its own.
265 0 : tokio::spawn(async move {
266 0 : if let Err(e) = connection.await {
267 0 : eprintln!("connection error: {}", e);
268 0 : }
269 0 : });
270 :
271 0 : let tenant_filter_clause = if !tenant_ids.is_empty() {
272 0 : format!(
273 0 : "and tenant_id in ({})",
274 0 : tenant_ids
275 0 : .iter()
276 0 : .map(|t| format!("'{}'", t))
277 0 : .collect::<Vec<_>>()
278 0 : .join(", ")
279 0 : )
280 : } else {
281 0 : "".to_owned()
282 : };
283 0 : let query = format!(
284 0 : "select tenant_id, timeline_id, min(timeline_start_lsn), max(backup_lsn) \
285 0 : from \"{dump_db_table}\" \
286 0 : where not is_cancelled {tenant_filter_clause} \
287 0 : group by tenant_id, timeline_id;"
288 0 : );
289 0 : info!("query is {}", query);
290 0 : let timelines = client.query(&query, &[]).await?;
291 :
292 0 : let timelines = timelines
293 0 : .into_iter()
294 0 : .map(|row| {
295 0 : let tenant_id = row.get(0);
296 0 : let timeline_id = row.get(1);
297 0 : let timeline_start_lsn_pg: PgLsn = row.get(2);
298 0 : let backup_lsn_pg: PgLsn = row.get(3);
299 0 :
300 0 : TimelineLsnData {
301 0 : tenant_id,
302 0 : timeline_id,
303 0 : timeline_start_lsn: Lsn(u64::from(timeline_start_lsn_pg)),
304 0 : backup_lsn: Lsn(u64::from(backup_lsn_pg)),
305 0 : }
306 0 : })
307 0 : .collect::<Vec<TimelineLsnData>>();
308 0 : Ok(timelines)
309 0 : }
|