#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
pub mod checks;
pub mod cloud_admin_api;
pub mod find_large_objects;
pub mod garbage;
pub mod metadata_stream;
pub mod pageserver_physical_gc;
pub mod scan_pageserver_metadata;
pub mod scan_safekeeper_metadata;
pub mod tenant_snapshot;

use std::env;
use std::fmt::Display;
use std::sync::Arc;
use std::time::Duration;

use anyhow::{anyhow, Context};
use aws_sdk_s3::config::Region;
use aws_sdk_s3::error::DisplayErrorContext;
use aws_sdk_s3::Client;

use camino::{Utf8Path, Utf8PathBuf};
use clap::ValueEnum;
use pageserver::tenant::TENANTS_SEGMENT_NAME;
use pageserver_api::shard::TenantShardId;
use remote_storage::RemotePath;
use reqwest::Url;
use serde::{Deserialize, Serialize};
use tokio::io::AsyncReadExt;
use tracing::error;
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
use utils::fs_ext;
use utils::id::{TenantId, TenantTimelineId, TimelineId};

const MAX_RETRIES: usize = 20;
const CLOUD_ADMIN_API_TOKEN_ENV_VAR: &str = "CLOUD_ADMIN_API_TOKEN";

#[derive(Debug, Clone)]
pub struct S3Target {
    pub bucket_name: String,
    /// This `prefix_in_bucket` is only equal to the pageserver/safekeeper config setting
    /// of the same name for the RootTarget: other S3Target instances carry a
    /// `prefix_in_bucket` with extra path segments appended.
    pub prefix_in_bucket: String,
    pub delimiter: String,
}

/// Convenience for referring to timelines within a particular shard: more ergonomic
/// than using a 2-tuple.
///
/// This is the shard-aware equivalent of TenantTimelineId. It's defined here rather
/// than somewhere more broadly exposed, because this kind of thing is rarely needed
/// in the pageserver, as all timeline objects exist in the scope of a particular
/// tenant; the scrubber is different in that it handles collections of data referring
/// to many TenantShardTimelineIds in one place.
#[derive(Serialize, Deserialize, Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct TenantShardTimelineId {
    tenant_shard_id: TenantShardId,
    timeline_id: TimelineId,
}

impl TenantShardTimelineId {
    fn new(tenant_shard_id: TenantShardId, timeline_id: TimelineId) -> Self {
        Self {
            tenant_shard_id,
            timeline_id,
        }
    }

    fn as_tenant_timeline_id(&self) -> TenantTimelineId {
        TenantTimelineId::new(self.tenant_shard_id.tenant_id, self.timeline_id)
    }
}

impl Display for TenantShardTimelineId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}/{}", self.tenant_shard_id, self.timeline_id)
    }
}
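
// A minimal usage sketch (hypothetical test, not part of the original source),
// assuming `TenantShardId::unsharded` and the `generate()` constructors on these
// id types exist with the signatures used below.
#[cfg(test)]
mod tenant_shard_timeline_id_tests {
    use super::*;

    #[test]
    fn converts_back_to_tenant_timeline_id() {
        let tenant_id = TenantId::generate();
        let timeline_id = TimelineId::generate();
        let ttid = TenantShardTimelineId::new(TenantShardId::unsharded(tenant_id), timeline_id);
        // Dropping shard awareness recovers the plain TenantTimelineId.
        assert_eq!(
            ttid.as_tenant_timeline_id(),
            TenantTimelineId::new(tenant_id, timeline_id)
        );
    }
}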

#[derive(clap::ValueEnum, Debug, Clone, Copy, PartialEq, Eq)]
pub enum TraversingDepth {
    Tenant,
    Timeline,
}

impl Display for TraversingDepth {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            Self::Tenant => "tenant",
            Self::Timeline => "timeline",
        })
    }
}

#[derive(ValueEnum, Clone, Copy, Eq, PartialEq, Debug, Serialize, Deserialize)]
pub enum NodeKind {
    Safekeeper,
    Pageserver,
}

impl NodeKind {
    fn as_str(&self) -> &'static str {
        match self {
            Self::Safekeeper => "safekeeper",
            Self::Pageserver => "pageserver",
        }
    }
}

impl Display for NodeKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}

impl S3Target {
    pub fn with_sub_segment(&self, new_segment: &str) -> Self {
        let mut new_self = self.clone();
        if new_self.prefix_in_bucket.is_empty() {
            new_self.prefix_in_bucket = format!("/{}/", new_segment);
        } else {
            if new_self.prefix_in_bucket.ends_with('/') {
                new_self.prefix_in_bucket.pop();
            }
            new_self.prefix_in_bucket =
                [&new_self.prefix_in_bucket, new_segment, ""].join(&new_self.delimiter);
        }
        new_self
    }
}
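
// A minimal sketch (hypothetical test, not part of the original source) of how
// `with_sub_segment` composes prefixes: any trailing delimiter is trimmed before
// the new segment and a fresh trailing delimiter are joined on.
#[cfg(test)]
mod s3_target_tests {
    use super::*;

    #[test]
    fn with_sub_segment_composes_prefixes() {
        let target = S3Target {
            bucket_name: "my-bucket".to_string(), // hypothetical bucket
            prefix_in_bucket: "pageserver/v1/".to_string(),
            delimiter: "/".to_string(),
        };
        assert_eq!(
            target.with_sub_segment("tenants").prefix_in_bucket,
            "pageserver/v1/tenants/"
        );
    }
}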

#[derive(Clone)]
pub enum RootTarget {
    Pageserver(S3Target),
    Safekeeper(S3Target),
}

impl RootTarget {
    pub fn tenants_root(&self) -> S3Target {
        match self {
            Self::Pageserver(root) => root.with_sub_segment(TENANTS_SEGMENT_NAME),
            Self::Safekeeper(root) => root.clone(),
        }
    }

    pub fn tenant_root(&self, tenant_id: &TenantShardId) -> S3Target {
        match self {
            Self::Pageserver(_) => self.tenants_root().with_sub_segment(&tenant_id.to_string()),
            Self::Safekeeper(_) => self
                .tenants_root()
                .with_sub_segment(&tenant_id.tenant_id.to_string()),
        }
    }

    pub(crate) fn tenant_shards_prefix(&self, tenant_id: &TenantId) -> S3Target {
        // Only pageserver remote storage contains tenant-shards
        assert!(matches!(self, Self::Pageserver(_)));
        let Self::Pageserver(root) = self else {
            panic!();
        };

        S3Target {
            bucket_name: root.bucket_name.clone(),
            prefix_in_bucket: format!(
                "{}/{TENANTS_SEGMENT_NAME}/{tenant_id}",
                root.prefix_in_bucket
            ),
            delimiter: root.delimiter.clone(),
        }
    }

    pub fn timelines_root(&self, tenant_id: &TenantShardId) -> S3Target {
        match self {
            Self::Pageserver(_) => self.tenant_root(tenant_id).with_sub_segment("timelines"),
            Self::Safekeeper(_) => self.tenant_root(tenant_id),
        }
    }

    pub fn timeline_root(&self, id: &TenantShardTimelineId) -> S3Target {
        self.timelines_root(&id.tenant_shard_id)
            .with_sub_segment(&id.timeline_id.to_string())
    }

    /// Given RemotePath "tenants/foo/timelines/bar/layerxyz", prefix it to a literal
    /// key in the S3 bucket.
    pub fn absolute_key(&self, key: &RemotePath) -> String {
        let root = match self {
            Self::Pageserver(root) => root,
            Self::Safekeeper(root) => root,
        };

        let prefix = &root.prefix_in_bucket;
        if prefix.ends_with('/') {
            format!("{prefix}{key}")
        } else {
            format!("{prefix}/{key}")
        }
    }

    pub fn bucket_name(&self) -> &str {
        match self {
            Self::Pageserver(root) => &root.bucket_name,
            Self::Safekeeper(root) => &root.bucket_name,
        }
    }

    pub fn delimiter(&self) -> &str {
        match self {
            Self::Pageserver(root) => &root.delimiter,
            Self::Safekeeper(root) => &root.delimiter,
        }
    }
}
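
// A sketch (hypothetical test, not part of the original source; assumes
// `TenantShardId::unsharded` and the id `generate()` constructors) showing how a
// pageserver RootTarget expands into the nested per-tenant/per-timeline prefix.
#[cfg(test)]
mod root_target_tests {
    use super::*;

    #[test]
    fn pageserver_timeline_prefix() {
        let root = RootTarget::Pageserver(S3Target {
            bucket_name: "my-bucket".to_string(), // hypothetical bucket
            prefix_in_bucket: "pageserver/v1".to_string(),
            delimiter: "/".to_string(),
        });
        let tenant_shard_id = TenantShardId::unsharded(TenantId::generate());
        let timeline_id = TimelineId::generate();
        let ttid = TenantShardTimelineId::new(tenant_shard_id, timeline_id);
        // tenants_root() appends "tenants"; tenant_root() and timeline_root()
        // then append the respective ids, each followed by the delimiter.
        assert_eq!(
            root.timeline_root(&ttid).prefix_in_bucket,
            format!("pageserver/v1/tenants/{tenant_shard_id}/timelines/{timeline_id}/")
        );
    }
}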

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct BucketConfig {
    pub region: String,
    pub bucket: String,
    pub prefix_in_bucket: Option<String>,
}

impl BucketConfig {
    pub fn from_env() -> anyhow::Result<Self> {
        let region = env::var("REGION").context("'REGION' param retrieval")?;
        let bucket = env::var("BUCKET").context("'BUCKET' param retrieval")?;
        let prefix_in_bucket = env::var("BUCKET_PREFIX").ok();

        Ok(Self {
            region,
            bucket,
            prefix_in_bucket,
        })
    }
}
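
// Example environment for `BucketConfig::from_env` (hypothetical values, not part
// of the original source). REGION and BUCKET are required; BUCKET_PREFIX is
// optional and falls back to the per-node-kind default in `init_remote`:
//
//   REGION=us-east-2
//   BUCKET=my-neon-bucket
//   BUCKET_PREFIX=pageserver/v1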

pub struct ControllerClientConfig {
    /// URL to storage controller. e.g. http://127.0.0.1:1234 when using `neon_local`
    pub controller_api: Url,

    /// JWT token for authenticating with storage controller. Requires scope 'scrubber' or 'admin'.
    pub controller_jwt: String,
}

pub struct ConsoleConfig {
    pub token: String,
    pub base_url: Url,
}

impl ConsoleConfig {
    pub fn from_env() -> anyhow::Result<Self> {
        let base_url: Url = env::var("CLOUD_ADMIN_API_URL")
            .context("'CLOUD_ADMIN_API_URL' param retrieval")?
            .parse()
            .context("'CLOUD_ADMIN_API_URL' param parsing")?;

        let token = env::var(CLOUD_ADMIN_API_TOKEN_ENV_VAR)
            .context("'CLOUD_ADMIN_API_TOKEN' environment variable fetch")?;

        Ok(Self { base_url, token })
    }
}

pub fn init_logging(file_name: &str) -> Option<WorkerGuard> {
    let stderr_logs = fmt::Layer::new()
        .with_target(false)
        .with_writer(std::io::stderr);

    let disable_file_logging = match std::env::var("PAGESERVER_DISABLE_FILE_LOGGING") {
        Ok(s) => s == "1" || s.to_lowercase() == "true",
        Err(_) => false,
    };

    if disable_file_logging {
        tracing_subscriber::registry()
            .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
            .with(stderr_logs)
            .init();
        None
    } else {
        let (file_writer, guard) =
            tracing_appender::non_blocking(tracing_appender::rolling::never("./logs/", file_name));
        let file_logs = fmt::Layer::new()
            .with_target(false)
            .with_ansi(false)
            .with_writer(file_writer);
        tracing_subscriber::registry()
            .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
            .with(stderr_logs)
            .with(file_logs)
            .init();
        Some(guard)
    }
}
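
// A minimal usage sketch (not part of the original source): the returned
// WorkerGuard must stay alive for the lifetime of the program, because dropping
// it shuts down the non-blocking background writer and file logging stops.
#[allow(dead_code)]
fn example_logging_setup() {
    let _guard = init_logging("scrubber.log"); // hypothetical log file name
    tracing::info!("reaches stderr, and ./logs/scrubber.log while _guard lives");
}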

pub async fn init_s3_client(bucket_region: Region) -> Client {
    let config = aws_config::defaults(aws_config::BehaviorVersion::v2024_03_28())
        .region(bucket_region)
        .load()
        .await;
    Client::new(&config)
}

async fn init_remote(
    bucket_config: BucketConfig,
    node_kind: NodeKind,
) -> anyhow::Result<(Arc<Client>, RootTarget)> {
    let bucket_region = Region::new(bucket_config.region);
    let delimiter = "/".to_string();
    let s3_client = Arc::new(init_s3_client(bucket_region).await);

    let s3_root = match node_kind {
        NodeKind::Pageserver => RootTarget::Pageserver(S3Target {
            bucket_name: bucket_config.bucket,
            prefix_in_bucket: bucket_config
                .prefix_in_bucket
                .unwrap_or("pageserver/v1".to_string()),
            delimiter,
        }),
        NodeKind::Safekeeper => RootTarget::Safekeeper(S3Target {
            bucket_name: bucket_config.bucket,
            prefix_in_bucket: bucket_config.prefix_in_bucket.unwrap_or("wal/".to_string()),
            delimiter,
        }),
    };

    Ok((s3_client, s3_root))
}
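
// A sketch of the typical call chain (not part of the original source), assuming
// the accessor style of recent aws-sdk-s3 releases, where `common_prefixes()`
// returns a slice: environment -> client -> root target -> first listing.
#[allow(dead_code)]
async fn example_init_and_list() -> anyhow::Result<()> {
    let bucket_config = BucketConfig::from_env()?;
    let (s3_client, root_target) = init_remote(bucket_config, NodeKind::Pageserver).await?;
    let listing = list_objects_with_retries(&s3_client, &root_target.tenants_root(), None).await?;
    // With the "/" delimiter, each tenant shows up as a common prefix.
    tracing::info!("found {} tenant prefixes", listing.common_prefixes().len());
    Ok(())
}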

async fn list_objects_with_retries(
    s3_client: &Client,
    s3_target: &S3Target,
    continuation_token: Option<String>,
) -> anyhow::Result<aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Output> {
    for trial in 0..MAX_RETRIES {
        match s3_client
            .list_objects_v2()
            .bucket(&s3_target.bucket_name)
            .prefix(&s3_target.prefix_in_bucket)
            .delimiter(&s3_target.delimiter)
            .set_continuation_token(continuation_token.clone())
            .send()
            .await
        {
            Ok(response) => return Ok(response),
            Err(e) => {
                if trial == MAX_RETRIES - 1 {
                    return Err(e)
                        .with_context(|| format!("Failed to list objects {MAX_RETRIES} times"));
                }
                error!(
                    "list_objects_v2 query failed: bucket_name={}, prefix={}, delimiter={}, error={}",
                    s3_target.bucket_name,
                    s3_target.prefix_in_bucket,
                    s3_target.delimiter,
                    DisplayErrorContext(e),
                );
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
        }
    }
    Err(anyhow!("unreachable unless MAX_RETRIES==0"))
}
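
// A sketch (not part of the original source) of driving a full listing through
// the retrying helper above: feed each response's continuation token back in
// until none is returned. Assumes the accessor style of recent aws-sdk-s3
// releases, where `contents()` returns a slice and `next_continuation_token()`
// an Option.
#[allow(dead_code)]
async fn list_all_keys_example(
    s3_client: &Client,
    s3_target: &S3Target,
) -> anyhow::Result<Vec<String>> {
    let mut keys = Vec::new();
    let mut continuation_token: Option<String> = None;
    loop {
        let response =
            list_objects_with_retries(s3_client, s3_target, continuation_token.clone()).await?;
        keys.extend(
            response
                .contents()
                .iter()
                .filter_map(|obj| obj.key().map(str::to_owned)),
        );
        match response.next_continuation_token() {
            Some(token) => continuation_token = Some(token.to_owned()),
            None => break,
        }
    }
    Ok(keys)
}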

async fn download_object_with_retries(
    s3_client: &Client,
    bucket_name: &str,
    key: &str,
) -> anyhow::Result<Vec<u8>> {
    for _ in 0..MAX_RETRIES {
        let mut body_buf = Vec::new();
        let response_stream = match s3_client
            .get_object()
            .bucket(bucket_name)
            .key(key)
            .send()
            .await
        {
            Ok(response) => response,
            Err(e) => {
                error!("Failed to download object for key {key}: {e}");
                tokio::time::sleep(Duration::from_secs(1)).await;
                continue;
            }
        };

        match response_stream
            .body
            .into_async_read()
            .read_to_end(&mut body_buf)
            .await
        {
            Ok(bytes_read) => {
                tracing::debug!("Downloaded {bytes_read} bytes for object {key}");
                return Ok(body_buf);
            }
            Err(e) => {
                error!("Failed to stream object body for key {key}: {e}");
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
        }
    }

    anyhow::bail!("Failed to download object with key {key} after {MAX_RETRIES} attempts")
}

async fn download_object_to_file(
    s3_client: &Client,
    bucket_name: &str,
    key: &str,
    version_id: Option<&str>,
    local_path: &Utf8Path,
) -> anyhow::Result<()> {
    let tmp_path = Utf8PathBuf::from(format!("{local_path}.tmp"));
    for _ in 0..MAX_RETRIES {
        tokio::fs::remove_file(&tmp_path)
            .await
            .or_else(fs_ext::ignore_not_found)?;

        let mut file = tokio::fs::File::create(&tmp_path)
            .await
            .context("Opening output file")?;

        let request = s3_client.get_object().bucket(bucket_name).key(key);

        let request = match version_id {
            Some(version_id) => request.version_id(version_id),
            None => request,
        };

        let response_stream = match request.send().await {
            Ok(response) => response,
            Err(e) => {
                error!(
                    "Failed to download object for key {key} version {}: {e:#}",
                    version_id.unwrap_or("")
                );
                tokio::time::sleep(Duration::from_secs(1)).await;
                continue;
            }
        };

        let mut read_stream = response_stream.body.into_async_read();

        tokio::io::copy(&mut read_stream, &mut file).await?;

        tokio::fs::rename(&tmp_path, local_path).await?;
        return Ok(());
    }

    anyhow::bail!("Failed to download object with key {key} after {MAX_RETRIES} attempts")
}
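
// A usage sketch (hypothetical bucket/key/path, not part of the original source).
// The write-to-"{local_path}.tmp"-then-rename dance above means the destination
// path only ever appears fully written, even if a retry interrupts a download.
#[allow(dead_code)]
async fn example_fetch_to_disk(s3_client: &Client) -> anyhow::Result<()> {
    download_object_to_file(
        s3_client,
        "my-neon-bucket",                                     // hypothetical bucket
        "pageserver/v1/tenants/some-tenant/index_part.json",  // hypothetical key
        None, // no version id: fetch the latest version
        Utf8Path::new("/tmp/index_part.json"),
    )
    .await
}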