Line data Source code
1 : use std::{collections::HashSet, str::FromStr, sync::Arc};
2 :
3 : use futures::stream::{StreamExt, TryStreamExt};
4 : use once_cell::sync::OnceCell;
5 : use pageserver_api::shard::TenantShardId;
6 : use postgres_ffi::{XLogFileName, PG_TLI};
7 : use remote_storage::GenericRemoteStorage;
8 : use serde::Serialize;
9 : use tokio_postgres::types::PgLsn;
10 : use tracing::{debug, error, info};
11 : use utils::{
12 : id::{TenantId, TenantTimelineId, TimelineId},
13 : lsn::Lsn,
14 : };
15 :
16 : use crate::{
17 : cloud_admin_api::CloudAdminApiClient, init_remote, metadata_stream::stream_listing,
18 : BucketConfig, ConsoleConfig, NodeKind, RootTarget, TenantShardTimelineId,
19 : };
20 :
/// WAL segment size assumed when computing expected segment file names.
///
/// Ideally this would be queried from the safekeepers, but so far the default
/// of 16MB is used everywhere.
const WAL_SEGSIZE: usize = 16 * 1024 * 1024;
23 :
24 : #[derive(Serialize)]
25 : pub struct MetadataSummary {
26 : timeline_count: usize,
27 : with_errors: HashSet<TenantTimelineId>,
28 : deleted_count: usize,
29 : }
30 :
31 : impl MetadataSummary {
32 0 : fn new() -> Self {
33 0 : Self {
34 0 : timeline_count: 0,
35 0 : with_errors: HashSet::new(),
36 0 : deleted_count: 0,
37 0 : }
38 0 : }
39 :
40 0 : pub fn summary_string(&self) -> String {
41 0 : format!(
42 0 : "timeline_count: {}, with_errors: {}",
43 0 : self.timeline_count,
44 0 : self.with_errors.len()
45 0 : )
46 0 : }
47 :
48 0 : pub fn is_empty(&self) -> bool {
49 0 : self.timeline_count == 0
50 0 : }
51 :
52 0 : pub fn is_fatal(&self) -> bool {
53 0 : !self.with_errors.is_empty()
54 0 : }
55 : }
56 :
57 0 : #[derive(serde::Deserialize)]
58 : pub struct TimelineLsnData {
59 : tenant_id: String,
60 : timeline_id: String,
61 : timeline_start_lsn: Lsn,
62 : backup_lsn: Lsn,
63 : }
64 :
65 : pub enum DatabaseOrList {
66 : Database {
67 : tenant_ids: Vec<TenantId>,
68 : connstr: String,
69 : table: String,
70 : },
71 : List(Vec<TimelineLsnData>),
72 : }
73 :
74 : /// Scan the safekeeper metadata in an S3 bucket, reporting errors and
75 : /// statistics.
76 : ///
77 : /// It works by listing timelines along with timeline_start_lsn and backup_lsn
78 : /// in debug dump in dump_db_table and verifying its s3 contents. If some WAL
79 : /// segments are missing, before complaining control plane is queried to check if
80 : /// the project wasn't deleted in the meanwhile.
81 0 : pub async fn scan_safekeeper_metadata(
82 0 : bucket_config: BucketConfig,
83 0 : db_or_list: DatabaseOrList,
84 0 : ) -> anyhow::Result<MetadataSummary> {
85 0 : info!(
86 0 : "checking bucket {}, region {}",
87 : bucket_config.bucket, bucket_config.region
88 : );
89 :
90 0 : let (remote_client, target) = init_remote(bucket_config, NodeKind::Safekeeper).await?;
91 0 : let console_config = ConsoleConfig::from_env()?;
92 0 : let cloud_admin_api_client = CloudAdminApiClient::new(console_config);
93 :
94 0 : let timelines = match db_or_list {
95 : DatabaseOrList::Database {
96 0 : tenant_ids,
97 0 : connstr,
98 0 : table,
99 0 : } => load_timelines_from_db(tenant_ids, connstr, table).await?,
100 0 : DatabaseOrList::List(list) => list,
101 : };
102 0 : info!("loaded {} timelines", timelines.len());
103 :
104 0 : let checks = futures::stream::iter(timelines.into_iter().map(Ok)).map_ok(|timeline| {
105 0 : let tenant_id = TenantId::from_str(&timeline.tenant_id).expect("failed to parse tenant_id");
106 0 : let timeline_id =
107 0 : TimelineId::from_str(&timeline.timeline_id).expect("failed to parse tenant_id");
108 0 : let ttid = TenantTimelineId::new(tenant_id, timeline_id);
109 0 : check_timeline(
110 0 : &remote_client,
111 0 : &target,
112 0 : &cloud_admin_api_client,
113 0 : ttid,
114 0 : timeline.timeline_start_lsn,
115 0 : timeline.backup_lsn,
116 0 : )
117 0 : });
118 : // Run multiple check_timeline's concurrently.
119 : const CONCURRENCY: usize = 32;
120 0 : let mut timelines = checks.try_buffered(CONCURRENCY);
121 0 :
122 0 : let mut summary = MetadataSummary::new();
123 0 : while let Some(r) = timelines.next().await {
124 0 : let res = r?;
125 0 : summary.timeline_count += 1;
126 0 : if !res.is_ok {
127 0 : summary.with_errors.insert(res.ttid);
128 0 : }
129 0 : if res.is_deleted {
130 0 : summary.deleted_count += 1;
131 0 : }
132 : }
133 :
134 0 : Ok(summary)
135 0 : }
136 :
137 : struct TimelineCheckResult {
138 : ttid: TenantTimelineId,
139 : is_ok: bool,
140 : is_deleted: bool, // timeline is deleted in cplane
141 : }
142 :
143 : /// List s3 and check that is has all expected WAL for the ttid. Consistency
144 : /// errors are logged to stderr; returns Ok(true) if timeline is consistent,
145 : /// Ok(false) if not, Err if failed to check.
146 0 : async fn check_timeline(
147 0 : remote_client: &GenericRemoteStorage,
148 0 : root: &RootTarget,
149 0 : api_client: &CloudAdminApiClient,
150 0 : ttid: TenantTimelineId,
151 0 : timeline_start_lsn: Lsn,
152 0 : backup_lsn: Lsn,
153 0 : ) -> anyhow::Result<TimelineCheckResult> {
154 0 : debug!(
155 0 : "checking ttid {}, should contain WAL [{}-{}]",
156 : ttid, timeline_start_lsn, backup_lsn
157 : );
158 : // calculate expected segfiles
159 0 : let expected_first_segno = timeline_start_lsn.segment_number(WAL_SEGSIZE);
160 0 : let expected_last_segno = backup_lsn.segment_number(WAL_SEGSIZE);
161 0 : let mut expected_segfiles: HashSet<String> = HashSet::from_iter(
162 0 : (expected_first_segno..expected_last_segno)
163 0 : .map(|segno| XLogFileName(PG_TLI, segno, WAL_SEGSIZE)),
164 0 : );
165 0 : let expected_files_num = expected_segfiles.len();
166 0 : debug!("expecting {} files", expected_segfiles.len(),);
167 :
168 : // now list s3 and check if it misses something
169 0 : let ttshid =
170 0 : TenantShardTimelineId::new(TenantShardId::unsharded(ttid.tenant_id), ttid.timeline_id);
171 0 : let mut timeline_dir_target = root.timeline_root(&ttshid);
172 0 : // stream_listing yields only common_prefixes if delimiter is not empty, but
173 0 : // we need files, so unset it.
174 0 : timeline_dir_target.delimiter = String::new();
175 0 :
176 0 : let prefix_str = &timeline_dir_target
177 0 : .prefix_in_bucket
178 0 : .strip_prefix("/")
179 0 : .unwrap_or(&timeline_dir_target.prefix_in_bucket);
180 0 :
181 0 : let mut stream = std::pin::pin!(stream_listing(remote_client, &timeline_dir_target));
182 0 : while let Some(obj) = stream.next().await {
183 0 : let (key, _obj) = obj?;
184 :
185 0 : let seg_name = key
186 0 : .get_path()
187 0 : .as_str()
188 0 : .strip_prefix(prefix_str)
189 0 : .expect("failed to extract segment name");
190 0 : expected_segfiles.remove(seg_name);
191 : }
192 0 : if !expected_segfiles.is_empty() {
193 : // Before complaining check cplane, probably timeline is already deleted.
194 0 : let bdata = api_client
195 0 : .find_timeline_branch(ttid.tenant_id, ttid.timeline_id)
196 0 : .await?;
197 0 : let deleted = match bdata {
198 0 : Some(bdata) => bdata.deleted,
199 : None => {
200 : // note: should be careful with selecting proper cplane address
201 0 : info!("ttid {} not found, assuming it is deleted", ttid);
202 0 : true
203 : }
204 : };
205 0 : if deleted {
206 : // ok, branch is deleted
207 0 : return Ok(TimelineCheckResult {
208 0 : ttid,
209 0 : is_ok: true,
210 0 : is_deleted: true,
211 0 : });
212 0 : }
213 0 : error!(
214 0 : "ttid {}: missing {} files out of {}, timeline_start_lsn {}, wal_backup_lsn {}",
215 0 : ttid,
216 0 : expected_segfiles.len(),
217 : expected_files_num,
218 : timeline_start_lsn,
219 : backup_lsn,
220 : );
221 0 : return Ok(TimelineCheckResult {
222 0 : ttid,
223 0 : is_ok: false,
224 0 : is_deleted: false,
225 0 : });
226 0 : }
227 0 : Ok(TimelineCheckResult {
228 0 : ttid,
229 0 : is_ok: true,
230 0 : is_deleted: false,
231 0 : })
232 0 : }
233 :
234 0 : fn load_certs() -> Result<Arc<rustls::RootCertStore>, std::io::Error> {
235 0 : let der_certs = rustls_native_certs::load_native_certs()?;
236 0 : let mut store = rustls::RootCertStore::empty();
237 0 : store.add_parsable_certificates(der_certs);
238 0 : Ok(Arc::new(store))
239 0 : }
240 : static TLS_ROOTS: OnceCell<Arc<rustls::RootCertStore>> = OnceCell::new();
241 :
242 0 : async fn load_timelines_from_db(
243 0 : tenant_ids: Vec<TenantId>,
244 0 : dump_db_connstr: String,
245 0 : dump_db_table: String,
246 0 : ) -> anyhow::Result<Vec<TimelineLsnData>> {
247 0 : info!("loading from table {dump_db_table}");
248 :
249 : // Use rustls (Neon requires TLS)
250 0 : let root_store = TLS_ROOTS.get_or_try_init(load_certs)?.clone();
251 0 : let client_config = rustls::ClientConfig::builder()
252 0 : .with_root_certificates(root_store)
253 0 : .with_no_client_auth();
254 0 : let tls_connector = tokio_postgres_rustls::MakeRustlsConnect::new(client_config);
255 0 : let (client, connection) = tokio_postgres::connect(&dump_db_connstr, tls_connector).await?;
256 : // The connection object performs the actual communication with the database,
257 : // so spawn it off to run on its own.
258 0 : tokio::spawn(async move {
259 0 : if let Err(e) = connection.await {
260 0 : eprintln!("connection error: {}", e);
261 0 : }
262 0 : });
263 :
264 0 : let tenant_filter_clause = if !tenant_ids.is_empty() {
265 0 : format!(
266 0 : "and tenant_id in ({})",
267 0 : tenant_ids
268 0 : .iter()
269 0 : .map(|t| format!("'{}'", t))
270 0 : .collect::<Vec<_>>()
271 0 : .join(", ")
272 0 : )
273 : } else {
274 0 : "".to_owned()
275 : };
276 0 : let query = format!(
277 0 : "select tenant_id, timeline_id, min(timeline_start_lsn), max(backup_lsn) \
278 0 : from \"{dump_db_table}\" \
279 0 : where not is_cancelled {tenant_filter_clause} \
280 0 : group by tenant_id, timeline_id;"
281 0 : );
282 0 : info!("query is {}", query);
283 0 : let timelines = client.query(&query, &[]).await?;
284 :
285 0 : let timelines = timelines
286 0 : .into_iter()
287 0 : .map(|row| {
288 0 : let tenant_id = row.get(0);
289 0 : let timeline_id = row.get(1);
290 0 : let timeline_start_lsn_pg: PgLsn = row.get(2);
291 0 : let backup_lsn_pg: PgLsn = row.get(3);
292 0 :
293 0 : TimelineLsnData {
294 0 : tenant_id,
295 0 : timeline_id,
296 0 : timeline_start_lsn: Lsn(u64::from(timeline_start_lsn_pg)),
297 0 : backup_lsn: Lsn(u64::from(backup_lsn_pg)),
298 0 : }
299 0 : })
300 0 : .collect::<Vec<TimelineLsnData>>();
301 0 : Ok(timelines)
302 0 : }
|