Line data Source code
1 : use std::{collections::HashSet, str::FromStr, sync::Arc};
2 :
3 : use aws_sdk_s3::Client;
4 : use futures::stream::{StreamExt, TryStreamExt};
5 : use once_cell::sync::OnceCell;
6 : use pageserver_api::shard::TenantShardId;
7 : use postgres_ffi::{XLogFileName, PG_TLI};
8 : use serde::Serialize;
9 : use tokio_postgres::types::PgLsn;
10 : use tracing::{error, info, trace};
11 : use utils::{
12 : id::{TenantId, TenantTimelineId, TimelineId},
13 : lsn::Lsn,
14 : };
15 :
16 : use crate::{
17 : cloud_admin_api::CloudAdminApiClient, init_remote, metadata_stream::stream_listing,
18 : BucketConfig, ConsoleConfig, NodeKind, RootTarget, TenantShardTimelineId,
19 : };
20 :
/// WAL segment size assumed for every timeline.
///
/// Strictly speaking the value should be obtained from the safekeepers, but
/// so far the default of 16MB is used everywhere.
const WAL_SEGSIZE: usize = 16 * 1024 * 1024;
23 :
24 : #[derive(Serialize)]
25 : pub struct MetadataSummary {
26 : timeline_count: usize,
27 : with_errors: HashSet<TenantTimelineId>,
28 : deleted_count: usize,
29 : }
30 :
31 : impl MetadataSummary {
32 0 : fn new() -> Self {
33 0 : Self {
34 0 : timeline_count: 0,
35 0 : with_errors: HashSet::new(),
36 0 : deleted_count: 0,
37 0 : }
38 0 : }
39 :
40 0 : pub fn summary_string(&self) -> String {
41 0 : format!(
42 0 : "timeline_count: {}, with_errors: {}",
43 0 : self.timeline_count,
44 0 : self.with_errors.len()
45 0 : )
46 0 : }
47 :
48 0 : pub fn is_empty(&self) -> bool {
49 0 : self.timeline_count == 0
50 0 : }
51 :
52 0 : pub fn is_fatal(&self) -> bool {
53 0 : !self.with_errors.is_empty()
54 0 : }
55 : }
56 :
57 : /// Scan the safekeeper metadata in an S3 bucket, reporting errors and
58 : /// statistics.
59 : ///
60 : /// It works by listing timelines along with timeline_start_lsn and backup_lsn
61 : /// in debug dump in dump_db_table and verifying its s3 contents. If some WAL
62 : /// segments are missing, before complaining control plane is queried to check if
63 : /// the project wasn't deleted in the meanwhile.
64 0 : pub async fn scan_safekeeper_metadata(
65 0 : bucket_config: BucketConfig,
66 0 : tenant_ids: Vec<TenantId>,
67 0 : dump_db_connstr: String,
68 0 : dump_db_table: String,
69 0 : ) -> anyhow::Result<MetadataSummary> {
70 0 : info!(
71 0 : "checking bucket {}, region {}, dump_db_table {}",
72 : bucket_config.bucket, bucket_config.region, dump_db_table
73 : );
74 : // Use rustls (Neon requires TLS)
75 0 : let root_store = TLS_ROOTS.get_or_try_init(load_certs)?.clone();
76 0 : let client_config = rustls::ClientConfig::builder()
77 0 : .with_root_certificates(root_store)
78 0 : .with_no_client_auth();
79 0 : let tls_connector = tokio_postgres_rustls::MakeRustlsConnect::new(client_config);
80 0 : let (client, connection) = tokio_postgres::connect(&dump_db_connstr, tls_connector).await?;
81 : // The connection object performs the actual communication with the database,
82 : // so spawn it off to run on its own.
83 0 : tokio::spawn(async move {
84 0 : if let Err(e) = connection.await {
85 0 : eprintln!("connection error: {}", e);
86 0 : }
87 0 : });
88 :
89 0 : let tenant_filter_clause = if !tenant_ids.is_empty() {
90 0 : format!(
91 0 : "and tenant_id in ({})",
92 0 : tenant_ids
93 0 : .iter()
94 0 : .map(|t| format!("'{}'", t))
95 0 : .collect::<Vec<_>>()
96 0 : .join(", ")
97 0 : )
98 : } else {
99 0 : "".to_owned()
100 : };
101 0 : let query = format!(
102 0 : "select tenant_id, timeline_id, min(timeline_start_lsn), max(backup_lsn) from \"{}\" where not is_cancelled {} group by tenant_id, timeline_id;",
103 0 : dump_db_table, tenant_filter_clause,
104 0 : );
105 0 : info!("query is {}", query);
106 0 : let timelines = client.query(&query, &[]).await?;
107 0 : info!("loaded {} timelines", timelines.len());
108 :
109 0 : let (s3_client, target) = init_remote(bucket_config, NodeKind::Safekeeper)?;
110 0 : let console_config = ConsoleConfig::from_env()?;
111 0 : let cloud_admin_api_client = CloudAdminApiClient::new(console_config);
112 0 :
113 0 : let checks = futures::stream::iter(timelines.iter().map(Ok)).map_ok(|row| {
114 0 : let tenant_id = TenantId::from_str(row.get(0)).expect("failed to parse tenant_id");
115 0 : let timeline_id = TimelineId::from_str(row.get(1)).expect("failed to parse tenant_id");
116 0 : let timeline_start_lsn_pg: PgLsn = row.get(2);
117 0 : let timeline_start_lsn: Lsn = Lsn(u64::from(timeline_start_lsn_pg));
118 0 : let backup_lsn_pg: PgLsn = row.get(3);
119 0 : let backup_lsn: Lsn = Lsn(u64::from(backup_lsn_pg));
120 0 : let ttid = TenantTimelineId::new(tenant_id, timeline_id);
121 0 : check_timeline(
122 0 : &s3_client,
123 0 : &target,
124 0 : &cloud_admin_api_client,
125 0 : ttid,
126 0 : timeline_start_lsn,
127 0 : backup_lsn,
128 0 : )
129 0 : });
130 0 : // Run multiple check_timeline's concurrently.
131 0 : const CONCURRENCY: usize = 32;
132 0 : let mut timelines = checks.try_buffered(CONCURRENCY);
133 0 :
134 0 : let mut summary = MetadataSummary::new();
135 0 : while let Some(r) = timelines.next().await {
136 0 : let res = r?;
137 0 : summary.timeline_count += 1;
138 0 : if !res.is_ok {
139 0 : summary.with_errors.insert(res.ttid);
140 0 : }
141 0 : if res.is_deleted {
142 0 : summary.deleted_count += 1;
143 0 : }
144 : }
145 :
146 0 : Ok(summary)
147 0 : }
148 :
149 : struct TimelineCheckResult {
150 : ttid: TenantTimelineId,
151 : is_ok: bool,
152 : is_deleted: bool, // timeline is deleted in cplane
153 : }
154 :
155 : /// List s3 and check that is has all expected WAL for the ttid. Consistency
156 : /// errors are logged to stderr; returns Ok(true) if timeline is consistent,
157 : /// Ok(false) if not, Err if failed to check.
158 0 : async fn check_timeline(
159 0 : s3_client: &Client,
160 0 : root: &RootTarget,
161 0 : api_client: &CloudAdminApiClient,
162 0 : ttid: TenantTimelineId,
163 0 : timeline_start_lsn: Lsn,
164 0 : backup_lsn: Lsn,
165 0 : ) -> anyhow::Result<TimelineCheckResult> {
166 0 : trace!(
167 0 : "checking ttid {}, should contain WAL [{}-{}]",
168 : ttid,
169 : timeline_start_lsn,
170 : backup_lsn
171 : );
172 : // calculate expected segfiles
173 0 : let expected_first_segno = timeline_start_lsn.segment_number(WAL_SEGSIZE);
174 0 : let expected_last_segno = backup_lsn.segment_number(WAL_SEGSIZE);
175 0 : let mut expected_segfiles: HashSet<String> = HashSet::from_iter(
176 0 : (expected_first_segno..expected_last_segno)
177 0 : .map(|segno| XLogFileName(PG_TLI, segno, WAL_SEGSIZE)),
178 0 : );
179 0 : let expected_files_num = expected_segfiles.len();
180 0 : trace!("expecting {} files", expected_segfiles.len(),);
181 :
182 : // now list s3 and check if it misses something
183 0 : let ttshid =
184 0 : TenantShardTimelineId::new(TenantShardId::unsharded(ttid.tenant_id), ttid.timeline_id);
185 0 : let mut timeline_dir_target = root.timeline_root(&ttshid);
186 0 : // stream_listing yields only common_prefixes if delimiter is not empty, but
187 0 : // we need files, so unset it.
188 0 : timeline_dir_target.delimiter = String::new();
189 0 :
190 0 : let mut stream = std::pin::pin!(stream_listing(s3_client, &timeline_dir_target));
191 0 : while let Some(obj) = stream.next().await {
192 0 : let obj = obj?;
193 0 : let key = obj.key();
194 0 :
195 0 : let seg_name = key
196 0 : .strip_prefix(&timeline_dir_target.prefix_in_bucket)
197 0 : .expect("failed to extract segment name");
198 0 : expected_segfiles.remove(seg_name);
199 : }
200 0 : if !expected_segfiles.is_empty() {
201 : // Before complaining check cplane, probably timeline is already deleted.
202 0 : let bdata = api_client
203 0 : .find_timeline_branch(ttid.tenant_id, ttid.timeline_id)
204 0 : .await?;
205 0 : let deleted = match bdata {
206 0 : Some(bdata) => bdata.deleted,
207 : None => {
208 : // note: should be careful with selecting proper cplane address
209 0 : info!("ttid {} not found, assuming it is deleted", ttid);
210 0 : true
211 : }
212 : };
213 0 : if deleted {
214 : // ok, branch is deleted
215 0 : return Ok(TimelineCheckResult {
216 0 : ttid,
217 0 : is_ok: true,
218 0 : is_deleted: true,
219 0 : });
220 0 : }
221 0 : error!(
222 0 : "ttid {}: missing {} files out of {}, timeline_start_lsn {}, wal_backup_lsn {}",
223 0 : ttid,
224 0 : expected_segfiles.len(),
225 : expected_files_num,
226 : timeline_start_lsn,
227 : backup_lsn,
228 : );
229 0 : return Ok(TimelineCheckResult {
230 0 : ttid,
231 0 : is_ok: false,
232 0 : is_deleted: false,
233 0 : });
234 0 : }
235 0 : Ok(TimelineCheckResult {
236 0 : ttid,
237 0 : is_ok: true,
238 0 : is_deleted: false,
239 0 : })
240 0 : }
241 :
242 0 : fn load_certs() -> Result<Arc<rustls::RootCertStore>, std::io::Error> {
243 0 : let der_certs = rustls_native_certs::load_native_certs()?;
244 0 : let mut store = rustls::RootCertStore::empty();
245 0 : store.add_parsable_certificates(der_certs);
246 0 : Ok(Arc::new(store))
247 0 : }
248 : static TLS_ROOTS: OnceCell<Arc<rustls::RootCertStore>> = OnceCell::new();
|