#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
pub mod checks;
pub mod cloud_admin_api;
pub mod find_large_objects;
pub mod garbage;
pub mod metadata_stream;
pub mod pageserver_physical_gc;
pub mod scan_pageserver_metadata;
pub mod scan_safekeeper_metadata;
pub mod tenant_snapshot;

use std::env;
use std::fmt::Display;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use aws_config::retry::{RetryConfigBuilder, RetryMode};
use aws_sdk_s3::config::Region;
use aws_sdk_s3::error::DisplayErrorContext;
use aws_sdk_s3::Client;

use camino::{Utf8Path, Utf8PathBuf};
use clap::ValueEnum;
use futures::{Stream, StreamExt};
use pageserver::tenant::remote_timeline_client::{remote_tenant_path, remote_timeline_path};
use pageserver::tenant::TENANTS_SEGMENT_NAME;
use pageserver_api::shard::TenantShardId;
use remote_storage::{
    DownloadOpts, GenericRemoteStorage, Listing, ListingMode, RemotePath, RemoteStorageConfig,
    RemoteStorageKind, S3Config, DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
    DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
};
use reqwest::Url;
use serde::{Deserialize, Serialize};
use storage_controller_client::control_api;
use tokio::io::AsyncReadExt;
use tokio_util::sync::CancellationToken;
use tracing::{error, warn};
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
use utils::fs_ext;
use utils::id::{TenantId, TenantTimelineId, TimelineId};

const MAX_RETRIES: usize = 20;
const CLOUD_ADMIN_API_TOKEN_ENV_VAR: &str = "CLOUD_ADMIN_API_TOKEN";

#[derive(Debug, Clone)]
pub struct S3Target {
    pub bucket_name: String,
    /// This `prefix_in_bucket` is only equal to the pageserver/safekeeper config setting of
    /// the same name for the RootTarget; other instances of S3Target will have a
    /// prefix_in_bucket with extra path segments appended.
    pub prefix_in_bucket: String,
    pub delimiter: String,
}

/// Convenience for referring to timelines within a particular shard: more ergonomic
/// than using a 2-tuple.
///
/// This is the shard-aware equivalent of TenantTimelineId. It's defined here rather
/// than somewhere more broadly exposed, because this kind of thing is rarely needed
/// in the pageserver, as all timeline objects exist in the scope of a particular
/// tenant: the scrubber is different in that it handles collections of data referring to many
/// TenantShardTimelineIds in one place.
#[derive(Serialize, Deserialize, Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct TenantShardTimelineId {
    tenant_shard_id: TenantShardId,
    timeline_id: TimelineId,
}

impl TenantShardTimelineId {
    fn new(tenant_shard_id: TenantShardId, timeline_id: TimelineId) -> Self {
        Self {
            tenant_shard_id,
            timeline_id,
        }
    }

    fn as_tenant_timeline_id(&self) -> TenantTimelineId {
        TenantTimelineId::new(self.tenant_shard_id.tenant_id, self.timeline_id)
    }
}

impl Display for TenantShardTimelineId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}/{}", self.tenant_shard_id, self.timeline_id)
    }
}
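
// Illustrative example (hypothetical values, not part of the original source): the
// Display impl above renders the shard-qualified form, slash-separated:
//
// let ttid = TenantShardTimelineId::new(tenant_shard_id, timeline_id);
// println!("{ttid}"); // prints "<tenant_shard_id>/<timeline_id>"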

#[derive(clap::ValueEnum, Debug, Clone, Copy, PartialEq, Eq)]
pub enum TraversingDepth {
    Tenant,
    Timeline,
}

impl Display for TraversingDepth {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            Self::Tenant => "tenant",
            Self::Timeline => "timeline",
        })
    }
}

#[derive(ValueEnum, Clone, Copy, Eq, PartialEq, Debug, Serialize, Deserialize)]
pub enum NodeKind {
    Safekeeper,
    Pageserver,
}

impl NodeKind {
    fn as_str(&self) -> &'static str {
        match self {
            Self::Safekeeper => "safekeeper",
            Self::Pageserver => "pageserver",
        }
    }
}

impl Display for NodeKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}

impl S3Target {
    pub fn with_sub_segment(&self, new_segment: &str) -> Self {
        let mut new_self = self.clone();
        if new_self.prefix_in_bucket.is_empty() {
            new_self.prefix_in_bucket = format!("/{}/", new_segment);
        } else {
            if new_self.prefix_in_bucket.ends_with('/') {
                new_self.prefix_in_bucket.pop();
            }
            new_self.prefix_in_bucket =
                [&new_self.prefix_in_bucket, new_segment, ""].join(&new_self.delimiter);
        }
        new_self
    }
}
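
// Illustrative example (hypothetical values): starting from a root prefix of
// "pageserver/v1/" with delimiter "/", successive with_sub_segment() calls build
// nested listing prefixes, always ending in the delimiter:
//
//   "pageserver/v1/"          -> with_sub_segment("tenants")
//   "pageserver/v1/tenants/"  -> with_sub_segment("<tenant_shard_id>")
//   "pageserver/v1/tenants/<tenant_shard_id>/"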

#[derive(Clone)]
pub enum RootTarget {
    Pageserver(S3Target),
    Safekeeper(S3Target),
}

impl RootTarget {
    pub fn tenants_root(&self) -> S3Target {
        match self {
            Self::Pageserver(root) => root.with_sub_segment(TENANTS_SEGMENT_NAME),
            Self::Safekeeper(root) => root.clone(),
        }
    }

    pub fn tenant_root(&self, tenant_id: &TenantShardId) -> S3Target {
        match self {
            Self::Pageserver(_) => self.tenants_root().with_sub_segment(&tenant_id.to_string()),
            Self::Safekeeper(_) => self
                .tenants_root()
                .with_sub_segment(&tenant_id.tenant_id.to_string()),
        }
    }

    pub(crate) fn tenant_shards_prefix(&self, tenant_id: &TenantId) -> S3Target {
        // Only pageserver remote storage contains tenant shards
        assert!(matches!(self, Self::Pageserver(_)));
        let Self::Pageserver(root) = self else {
            panic!();
        };

        S3Target {
            bucket_name: root.bucket_name.clone(),
            prefix_in_bucket: format!(
                "{}/{TENANTS_SEGMENT_NAME}/{tenant_id}",
                root.prefix_in_bucket
            ),
            delimiter: root.delimiter.clone(),
        }
    }

    pub fn timelines_root(&self, tenant_id: &TenantShardId) -> S3Target {
        match self {
            Self::Pageserver(_) => self.tenant_root(tenant_id).with_sub_segment("timelines"),
            Self::Safekeeper(_) => self.tenant_root(tenant_id),
        }
    }

    pub fn timeline_root(&self, id: &TenantShardTimelineId) -> S3Target {
        self.timelines_root(&id.tenant_shard_id)
            .with_sub_segment(&id.timeline_id.to_string())
    }

    /// Given a RemotePath like "tenants/foo/timelines/bar/layerxyz", prefix it to a literal
    /// key in the S3 bucket.
    pub fn absolute_key(&self, key: &RemotePath) -> String {
        let root = match self {
            Self::Pageserver(root) => root,
            Self::Safekeeper(root) => root,
        };

        let prefix = &root.prefix_in_bucket;
        if prefix.ends_with('/') {
            format!("{prefix}{key}")
        } else {
            format!("{prefix}/{key}")
        }
    }

    pub fn bucket_name(&self) -> &str {
        match self {
            Self::Pageserver(root) => &root.bucket_name,
            Self::Safekeeper(root) => &root.bucket_name,
        }
    }

    pub fn delimiter(&self) -> &str {
        match self {
            Self::Pageserver(root) => &root.delimiter,
            Self::Safekeeper(root) => &root.delimiter,
        }
    }
}
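
// Illustrative example (hypothetical values): with a pageserver root prefix of
// "pageserver/v1", absolute_key() turns a RemotePath such as
// "tenants/<tenant_shard_id>/timelines/<timeline_id>/<layer file name>" into the
// literal bucket key
// "pageserver/v1/tenants/<tenant_shard_id>/timelines/<timeline_id>/<layer file name>",
// inserting a "/" only when the root prefix does not already end with one.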

pub fn remote_timeline_path_id(id: &TenantShardTimelineId) -> RemotePath {
    remote_timeline_path(&id.tenant_shard_id, &id.timeline_id)
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct BucketConfig {
    pub region: String,
    pub bucket: String,
    pub prefix_in_bucket: Option<String>,
}

impl BucketConfig {
    pub fn from_env() -> anyhow::Result<Self> {
        let region = env::var("REGION").context("'REGION' param retrieval")?;
        let bucket = env::var("BUCKET").context("'BUCKET' param retrieval")?;
        let prefix_in_bucket = env::var("BUCKET_PREFIX").ok();

        Ok(Self {
            region,
            bucket,
            prefix_in_bucket,
        })
    }
}
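
// Illustrative usage (hypothetical values): BucketConfig::from_env() expects the
// caller to have set these environment variables, e.g.
//
//   REGION=eu-west-1 BUCKET=my-pageserver-bucket BUCKET_PREFIX=pageserver/v1 <scrubber command>
//
// BUCKET_PREFIX is optional; when unset, a per-node-kind default is substituted
// later (see default_prefix_in_bucket below).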

pub struct ControllerClientConfig {
    /// URL of the storage controller, e.g. http://127.0.0.1:1234 when using `neon_local`
    pub controller_api: Url,

    /// JWT token for authenticating with the storage controller. Requires scope 'scrubber' or 'admin'.
    pub controller_jwt: String,
}

impl ControllerClientConfig {
    pub fn build_client(self) -> control_api::Client {
        control_api::Client::new(self.controller_api, Some(self.controller_jwt))
    }
}

pub struct ConsoleConfig {
    pub token: String,
    pub base_url: Url,
}

impl ConsoleConfig {
    pub fn from_env() -> anyhow::Result<Self> {
        let base_url: Url = env::var("CLOUD_ADMIN_API_URL")
            .context("'CLOUD_ADMIN_API_URL' param retrieval")?
            .parse()
            .context("'CLOUD_ADMIN_API_URL' param parsing")?;

        let token = env::var(CLOUD_ADMIN_API_TOKEN_ENV_VAR)
            .context("'CLOUD_ADMIN_API_TOKEN' environment variable fetch")?;

        Ok(Self { base_url, token })
    }
}

pub fn init_logging(file_name: &str) -> Option<WorkerGuard> {
    let stderr_logs = fmt::Layer::new()
        .with_target(false)
        .with_writer(std::io::stderr);

    let disable_file_logging = match std::env::var("PAGESERVER_DISABLE_FILE_LOGGING") {
        Ok(s) => s == "1" || s.to_lowercase() == "true",
        Err(_) => false,
    };

    if disable_file_logging {
        tracing_subscriber::registry()
            .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
            .with(stderr_logs)
            .init();
        None
    } else {
        let (file_writer, guard) =
            tracing_appender::non_blocking(tracing_appender::rolling::never("./logs/", file_name));
        let file_logs = fmt::Layer::new()
            .with_target(false)
            .with_ansi(false)
            .with_writer(file_writer);
        tracing_subscriber::registry()
            .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
            .with(stderr_logs)
            .with(file_logs)
            .init();
        Some(guard)
    }
}
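
// Illustrative note (hypothetical invocation): file logging can be disabled via the
// environment, e.g.
//
//   PAGESERVER_DISABLE_FILE_LOGGING=1 <scrubber command>
//
// in which case init_logging returns None and only the stderr layer is installed.
// Otherwise logs also go to ./logs/<file_name>, and the returned WorkerGuard must be
// kept alive for the life of the process so buffered log lines get flushed.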

async fn init_s3_client(bucket_region: Region) -> Client {
    let mut retry_config_builder = RetryConfigBuilder::new();

    retry_config_builder
        .set_max_attempts(Some(3))
        .set_mode(Some(RetryMode::Adaptive));

    let config = aws_config::defaults(aws_config::BehaviorVersion::v2024_03_28())
        .region(bucket_region)
        .retry_config(retry_config_builder.build())
        .load()
        .await;
    Client::new(&config)
}

fn default_prefix_in_bucket(node_kind: NodeKind) -> &'static str {
    match node_kind {
        NodeKind::Pageserver => "pageserver/v1/",
        NodeKind::Safekeeper => "wal/",
    }
}

fn make_root_target(
    bucket_name: String,
    prefix_in_bucket: String,
    node_kind: NodeKind,
) -> RootTarget {
    let s3_target = S3Target {
        bucket_name,
        prefix_in_bucket,
        delimiter: "/".to_string(),
    };
    match node_kind {
        NodeKind::Pageserver => RootTarget::Pageserver(s3_target),
        NodeKind::Safekeeper => RootTarget::Safekeeper(s3_target),
    }
}

async fn init_remote_s3(
    bucket_config: BucketConfig,
    node_kind: NodeKind,
) -> anyhow::Result<(Arc<Client>, RootTarget)> {
    let bucket_region = Region::new(bucket_config.region);
    let s3_client = Arc::new(init_s3_client(bucket_region).await);
    let default_prefix = default_prefix_in_bucket(node_kind).to_string();

    let s3_root = make_root_target(
        bucket_config.bucket,
        bucket_config.prefix_in_bucket.unwrap_or(default_prefix),
        node_kind,
    );

    Ok((s3_client, s3_root))
}

async fn init_remote(
    bucket_config: BucketConfig,
    node_kind: NodeKind,
) -> anyhow::Result<(GenericRemoteStorage, RootTarget)> {
    let endpoint = env::var("AWS_ENDPOINT_URL").ok();
    let default_prefix = default_prefix_in_bucket(node_kind).to_string();
    let prefix_in_bucket = Some(bucket_config.prefix_in_bucket.unwrap_or(default_prefix));
    let storage = S3Config {
        bucket_name: bucket_config.bucket.clone(),
        bucket_region: bucket_config.region,
        prefix_in_bucket,
        endpoint,
        concurrency_limit: DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT
            .try_into()
            .unwrap(),
        max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
        upload_storage_class: None,
    };
    let storage_config = RemoteStorageConfig {
        storage: RemoteStorageKind::AwsS3(storage),
        timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
    };

    // We already pass the prefix to the remote client above
    let prefix_in_root_target = String::new();
    let root_target = make_root_target(bucket_config.bucket, prefix_in_root_target, node_kind);

    let client = GenericRemoteStorage::from_config(&storage_config).await?;
    Ok((client, root_target))
}

/// Lists possibly large numbers of keys in a streaming fashion.
fn stream_objects_with_retries<'a>(
    storage_client: &'a GenericRemoteStorage,
    listing_mode: ListingMode,
    s3_target: &'a S3Target,
) -> impl Stream<Item = Result<Listing, anyhow::Error>> + 'a {
    async_stream::stream! {
        let mut trial = 0;
        let cancel = CancellationToken::new();
        let prefix_str = &s3_target
            .prefix_in_bucket
            .strip_prefix("/")
            .unwrap_or(&s3_target.prefix_in_bucket);
        let prefix = RemotePath::from_string(prefix_str)?;
        let mut list_stream =
            storage_client.list_streaming(Some(&prefix), listing_mode, None, &cancel);
        while let Some(res) = list_stream.next().await {
            match res {
                Err(err) => {
                    let yield_err = if err.is_permanent() {
                        true
                    } else {
                        let backoff_time = 1 << trial.min(5);
                        tokio::time::sleep(Duration::from_secs(backoff_time)).await;
                        trial += 1;
                        trial == MAX_RETRIES - 1
                    };
                    if yield_err {
                        yield Err(err)
                            .with_context(|| format!("Failed to list objects {MAX_RETRIES} times"));
                        break;
                    }
                }
                Ok(res) => {
                    trial = 0;
                    yield Ok(res);
                }
            }
        }
    }
}
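
// Illustrative usage (hypothetical caller, not part of the original source): consume
// the stream page by page instead of buffering the whole listing in memory.
//
// let (remote_client, root) = init_remote(bucket_config, NodeKind::Pageserver).await?;
// let tenants_root = root.tenants_root();
// let stream = stream_objects_with_retries(
//     &remote_client,
//     ListingMode::WithDelimiter,
//     &tenants_root,
// );
// futures::pin_mut!(stream);
// while let Some(listing) = stream.next().await {
//     for prefix in listing?.prefixes {
//         tracing::info!("tenant prefix: {prefix}");
//     }
// }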

/// Use this to list a bounded number of prefixes or keys. For larger numbers of keys/prefixes,
/// use [`stream_objects_with_retries`] instead.
async fn list_objects_with_retries(
    remote_client: &GenericRemoteStorage,
    listing_mode: ListingMode,
    s3_target: &S3Target,
) -> anyhow::Result<Listing> {
    let cancel = CancellationToken::new();
    let prefix_str = &s3_target
        .prefix_in_bucket
        .strip_prefix("/")
        .unwrap_or(&s3_target.prefix_in_bucket);
    let prefix = RemotePath::from_string(prefix_str)?;
    for trial in 0..MAX_RETRIES {
        match remote_client
            .list(Some(&prefix), listing_mode, None, &cancel)
            .await
        {
            Ok(response) => return Ok(response),
            Err(e) => {
                if trial == MAX_RETRIES - 1 {
                    return Err(e)
                        .with_context(|| format!("Failed to list objects {MAX_RETRIES} times"));
                }
                warn!(
                    "list_objects_v2 query failed: bucket_name={}, prefix={}, delimiter={}, error={}",
                    s3_target.bucket_name,
                    s3_target.prefix_in_bucket,
                    s3_target.delimiter,
                    DisplayErrorContext(e),
                );
                let backoff_time = 1 << trial.min(5);
                tokio::time::sleep(Duration::from_secs(backoff_time)).await;
            }
        }
    }
    panic!("MAX_RETRIES is not allowed to be 0");
}

async fn download_object_with_retries(
    remote_client: &GenericRemoteStorage,
    key: &RemotePath,
) -> anyhow::Result<Vec<u8>> {
    let cancel = CancellationToken::new();
    for trial in 0..MAX_RETRIES {
        let mut buf = Vec::new();
        let download = match remote_client
            .download(key, &DownloadOpts::default(), &cancel)
            .await
        {
            Ok(response) => response,
            Err(e) => {
                error!("Failed to download object for key {key}: {e}");
                let backoff_time = 1 << trial.min(5);
                tokio::time::sleep(Duration::from_secs(backoff_time)).await;
                continue;
            }
        };

        match tokio_util::io::StreamReader::new(download.download_stream)
            .read_to_end(&mut buf)
            .await
        {
            Ok(bytes_read) => {
                tracing::debug!("Downloaded {bytes_read} bytes for object {key}");
                return Ok(buf);
            }
            Err(e) => {
                error!("Failed to stream object body for key {key}: {e}");
                let backoff_time = 1 << trial.min(5);
                tokio::time::sleep(Duration::from_secs(backoff_time)).await;
            }
        }
    }

    anyhow::bail!("Failed to download objects with key {key} {MAX_RETRIES} times")
}
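
// Illustrative usage (hypothetical, not part of the original source): fetch a small
// control file and deserialize it from the returned bytes. The path and the use of
// serde_json here are assumptions for the example only.
//
// let key = RemotePath::from_string(
//     "tenants/<tenant_shard_id>/timelines/<timeline_id>/index_part.json",
// )?;
// let bytes = download_object_with_retries(&remote_client, &key).await?;
// let index: serde_json::Value = serde_json::from_slice(&bytes)?;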

async fn download_object_to_file_s3(
    s3_client: &Client,
    bucket_name: &str,
    key: &str,
    version_id: Option<&str>,
    local_path: &Utf8Path,
) -> anyhow::Result<()> {
    let tmp_path = Utf8PathBuf::from(format!("{local_path}.tmp"));
    for _ in 0..MAX_RETRIES {
        tokio::fs::remove_file(&tmp_path)
            .await
            .or_else(fs_ext::ignore_not_found)?;

        let mut file = tokio::fs::File::create(&tmp_path)
            .await
            .context("Opening output file")?;

        let request = s3_client.get_object().bucket(bucket_name).key(key);

        let request = match version_id {
            Some(version_id) => request.version_id(version_id),
            None => request,
        };

        let response_stream = match request.send().await {
            Ok(response) => response,
            Err(e) => {
                error!(
                    "Failed to download object for key {key} version {}: {e:#}",
                    version_id.unwrap_or("")
                );
                tokio::time::sleep(Duration::from_secs(1)).await;
                continue;
            }
        };

        let mut read_stream = response_stream.body.into_async_read();

        tokio::io::copy(&mut read_stream, &mut file).await?;

        tokio::fs::rename(&tmp_path, local_path).await?;
        return Ok(());
    }

    anyhow::bail!("Failed to download objects with key {key} {MAX_RETRIES} times")
}