#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
pub mod checks;
pub mod cloud_admin_api;
pub mod find_large_objects;
pub mod garbage;
pub mod metadata_stream;
pub mod pageserver_physical_gc;
pub mod scan_pageserver_metadata;
pub mod scan_safekeeper_metadata;
pub mod tenant_snapshot;

use std::env;
use std::fmt::Display;
use std::time::{Duration, SystemTime};

use anyhow::Context;
use camino::{Utf8Path, Utf8PathBuf};
use clap::ValueEnum;
use futures::{Stream, StreamExt};
use pageserver::tenant::TENANTS_SEGMENT_NAME;
use pageserver::tenant::remote_timeline_client::{remote_tenant_path, remote_timeline_path};
use pageserver_api::shard::TenantShardId;
use remote_storage::{
    DownloadOpts, GenericRemoteStorage, Listing, ListingMode, RemotePath, RemoteStorageConfig,
    RemoteStorageKind, VersionId,
};
use reqwest::Url;
use serde::{Deserialize, Serialize};
use storage_controller_client::control_api;
use tokio::io::AsyncReadExt;
use tokio_util::sync::CancellationToken;
use tracing::{error, warn};
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::prelude::*;
use tracing_subscriber::{EnvFilter, fmt};
use utils::fs_ext;
use utils::id::{TenantId, TenantTimelineId, TimelineId};

const MAX_RETRIES: usize = 20;
const CLOUD_ADMIN_API_TOKEN_ENV_VAR: &str = "CLOUD_ADMIN_API_TOKEN";

#[derive(Debug, Clone)]
pub struct S3Target {
    pub desc_str: String,
    /// This `prefix_in_bucket` is only equal to the PS/SK config setting of the same
    /// name for the RootTarget; other instances of S3Target carry a prefix_in_bucket
    /// with extra path segments appended.
    pub prefix_in_bucket: String,
    pub delimiter: String,
}

/// Convenience for referring to timelines within a particular shard: more ergonomic
/// than using a 2-tuple.
///
/// This is the shard-aware equivalent of TenantTimelineId. It's defined here rather
/// than somewhere more broadly exposed, because this kind of thing is rarely needed
/// in the pageserver, as all timeline objects exist in the scope of a particular
/// tenant: the scrubber is different in that it handles collections of data referring
/// to many TenantShardTimelineIds in one place.
#[derive(Serialize, Deserialize, Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct TenantShardTimelineId {
    tenant_shard_id: TenantShardId,
    timeline_id: TimelineId,
}

impl TenantShardTimelineId {
    fn new(tenant_shard_id: TenantShardId, timeline_id: TimelineId) -> Self {
        Self {
            tenant_shard_id,
            timeline_id,
        }
    }

    fn as_tenant_timeline_id(&self) -> TenantTimelineId {
        TenantTimelineId::new(self.tenant_shard_id.tenant_id, self.timeline_id)
    }
}

impl Display for TenantShardTimelineId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}/{}", self.tenant_shard_id, self.timeline_id)
    }
}

#[derive(clap::ValueEnum, Debug, Clone, Copy, PartialEq, Eq)]
pub enum TraversingDepth {
    Tenant,
    Timeline,
}

impl Display for TraversingDepth {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            Self::Tenant => "tenant",
            Self::Timeline => "timeline",
        })
    }
}

#[derive(ValueEnum, Clone, Copy, Eq, PartialEq, Debug, Serialize, Deserialize)]
pub enum NodeKind {
    Safekeeper,
    Pageserver,
}

impl NodeKind {
    fn as_str(&self) -> &'static str {
        match self {
            Self::Safekeeper => "safekeeper",
            Self::Pageserver => "pageserver",
        }
    }
}

impl Display for NodeKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}

impl S3Target {
    pub fn with_sub_segment(&self, new_segment: &str) -> Self {
        let mut new_self = self.clone();
        if new_self.prefix_in_bucket.is_empty() {
            new_self.prefix_in_bucket = format!("/{}/", new_segment);
        } else {
            if new_self.prefix_in_bucket.ends_with('/') {
                new_self.prefix_in_bucket.pop();
            }
            new_self.prefix_in_bucket =
                [&new_self.prefix_in_bucket, new_segment, ""].join(&new_self.delimiter);
        }
        new_self
    }
}
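
// A minimal illustrative sketch of how `with_sub_segment` extends a prefix,
// derived from the implementation above; the literal values are made up.
#[cfg(test)]
mod with_sub_segment_example {
    use super::S3Target;

    #[test]
    fn appends_segment_and_trailing_delimiter() {
        let target = S3Target {
            desc_str: "example".to_string(),
            prefix_in_bucket: "pageserver/v1/".to_string(),
            delimiter: "/".to_string(),
        };
        // Any trailing delimiter is dropped before joining, then re-added at the end.
        assert_eq!(
            target.with_sub_segment("tenants").prefix_in_bucket,
            "pageserver/v1/tenants/"
        );

        // An empty prefix produces an absolute-looking "/<segment>/" prefix.
        let empty = S3Target {
            prefix_in_bucket: String::new(),
            ..target
        };
        assert_eq!(empty.with_sub_segment("tenants").prefix_in_bucket, "/tenants/");
    }
}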

#[derive(Clone)]
pub enum RootTarget {
    Pageserver(S3Target),
    Safekeeper(S3Target),
}

impl RootTarget {
    pub fn tenants_root(&self) -> S3Target {
        match self {
            Self::Pageserver(root) => root.with_sub_segment(TENANTS_SEGMENT_NAME),
            Self::Safekeeper(root) => root.clone(),
        }
    }

    pub fn tenant_root(&self, tenant_id: &TenantShardId) -> S3Target {
        match self {
            Self::Pageserver(_) => self.tenants_root().with_sub_segment(&tenant_id.to_string()),
            Self::Safekeeper(_) => self
                .tenants_root()
                .with_sub_segment(&tenant_id.tenant_id.to_string()),
        }
    }

    pub(crate) fn tenant_shards_prefix(&self, tenant_id: &TenantId) -> S3Target {
        // Only pageserver remote storage contains tenant-shards
        let Self::Pageserver(root) = self else {
            panic!("tenant_shards_prefix is only valid for pageserver targets");
        };

        S3Target {
            desc_str: root.desc_str.clone(),
            prefix_in_bucket: format!(
                "{}/{TENANTS_SEGMENT_NAME}/{tenant_id}",
                root.prefix_in_bucket
            ),
            delimiter: root.delimiter.clone(),
        }
    }

    pub fn timelines_root(&self, tenant_id: &TenantShardId) -> S3Target {
        match self {
            Self::Pageserver(_) => self.tenant_root(tenant_id).with_sub_segment("timelines"),
            Self::Safekeeper(_) => self.tenant_root(tenant_id),
        }
    }

    pub fn timeline_root(&self, id: &TenantShardTimelineId) -> S3Target {
        self.timelines_root(&id.tenant_shard_id)
            .with_sub_segment(&id.timeline_id.to_string())
    }

    /// Given a RemotePath like "tenants/foo/timelines/bar/layerxyz", prefix it to a literal
    /// key in the S3 bucket.
    pub fn absolute_key(&self, key: &RemotePath) -> String {
        let root = match self {
            Self::Pageserver(root) => root,
            Self::Safekeeper(root) => root,
        };

        let prefix = &root.prefix_in_bucket;
        if prefix.ends_with('/') {
            format!("{prefix}{key}")
        } else {
            format!("{prefix}/{key}")
        }
    }

    pub fn desc_str(&self) -> &str {
        match self {
            Self::Pageserver(root) => &root.desc_str,
            Self::Safekeeper(root) => &root.desc_str,
        }
    }

    pub fn delimiter(&self) -> &str {
        match self {
            Self::Pageserver(root) => &root.delimiter,
            Self::Safekeeper(root) => &root.delimiter,
        }
    }
}
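
// A hedged, illustrative sketch of how a root target maps a RemotePath to a
// literal bucket key; `make_root_target` is the private helper defined later in
// this file, and the prefix value here is made up. Assumes `RemotePath`'s Display
// prints the relative path it was built from.
#[cfg(test)]
mod absolute_key_example {
    use remote_storage::RemotePath;

    use super::{NodeKind, make_root_target};

    #[test]
    fn absolute_key_joins_prefix_and_path() {
        let root = make_root_target(
            "example".to_string(),
            "pageserver/v1".to_string(),
            NodeKind::Pageserver,
        );
        let key = RemotePath::from_string("tenants/foo/timelines/bar/layerxyz").unwrap();
        // The prefix has no trailing '/', so one is inserted between prefix and key.
        assert_eq!(
            root.absolute_key(&key),
            "pageserver/v1/tenants/foo/timelines/bar/layerxyz"
        );
    }
}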

pub fn remote_timeline_path_id(id: &TenantShardTimelineId) -> RemotePath {
    remote_timeline_path(&id.tenant_shard_id, &id.timeline_id)
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct BucketConfig(RemoteStorageConfig);

impl BucketConfig {
    pub fn from_env() -> anyhow::Result<Self> {
        if let Ok(legacy) = Self::from_env_legacy() {
            return Ok(legacy);
        }
        let config_toml =
            env::var("REMOTE_STORAGE_CONFIG").context("'REMOTE_STORAGE_CONFIG' retrieval")?;
        let remote_config = RemoteStorageConfig::from_toml_str(&config_toml)?;
        Ok(BucketConfig(remote_config))
    }

    fn from_env_legacy() -> anyhow::Result<Self> {
        let bucket_region = env::var("REGION").context("'REGION' param retrieval")?;
        let bucket_name = env::var("BUCKET").context("'BUCKET' param retrieval")?;
        let prefix_in_bucket = env::var("BUCKET_PREFIX").ok();
        let endpoint = env::var("AWS_ENDPOINT_URL").ok();
        // Create a JSON object which we then deserialize so that we don't
        // have to repeat all of the S3Config fields.
        let s3_config_json = serde_json::json!({
            "bucket_name": bucket_name,
            "bucket_region": bucket_region,
            "prefix_in_bucket": prefix_in_bucket,
            "endpoint": endpoint,
        });
        let config: RemoteStorageConfig = serde_json::from_value(s3_config_json)?;
        Ok(BucketConfig(config))
    }

    pub fn desc_str(&self) -> String {
        match &self.0.storage {
            RemoteStorageKind::LocalFs { local_path } => {
                format!("local path {local_path}")
            }
            RemoteStorageKind::AwsS3(config) => format!(
                "bucket {}, region {}",
                config.bucket_name, config.bucket_region
            ),
            RemoteStorageKind::AzureContainer(config) => format!(
                "container {}, storage account {:?}, region {}",
                config.container_name, config.storage_account, config.container_region
            ),
        }
    }

    pub fn bucket_name(&self) -> Option<&str> {
        self.0.storage.bucket_name()
    }
}
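
// Hedged usage sketch: `from_env` first tries the legacy variables read by
// `from_env_legacy` above, e.g. (values are made up)
//
//   REGION=eu-west-1 BUCKET=some-bucket BUCKET_PREFIX=pageserver/v1/
//
// and otherwise falls back to a single REMOTE_STORAGE_CONFIG TOML document. The
// exact schema is owned by `RemoteStorageConfig::from_toml_str`; by analogy with
// the JSON constructed above, it carries fields such as `bucket_name`,
// `bucket_region`, `prefix_in_bucket`, and `endpoint`.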

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct BucketConfigLegacy {
    pub region: String,
    pub bucket: String,
    pub prefix_in_bucket: Option<String>,
}

pub struct ControllerClientConfig {
    /// URL of the storage controller, e.g. http://127.0.0.1:1234 when using `neon_local`
    pub controller_api: Url,

    /// JWT token for authenticating with the storage controller. Requires scope 'scrubber' or 'admin'.
    pub controller_jwt: String,
}

impl ControllerClientConfig {
    pub fn build_client(self, http_client: reqwest::Client) -> control_api::Client {
        control_api::Client::new(http_client, self.controller_api, Some(self.controller_jwt))
    }
}

pub struct ConsoleConfig {
    pub token: String,
    pub base_url: Url,
}

impl ConsoleConfig {
    pub fn from_env() -> anyhow::Result<Self> {
        let base_url: Url = env::var("CLOUD_ADMIN_API_URL")
            .context("'CLOUD_ADMIN_API_URL' param retrieval")?
            .parse()
            .context("'CLOUD_ADMIN_API_URL' param parsing")?;

        let token = env::var(CLOUD_ADMIN_API_TOKEN_ENV_VAR)
            .context("'CLOUD_ADMIN_API_TOKEN' environment variable fetch")?;

        Ok(Self { base_url, token })
    }
}

pub fn init_logging(file_name: &str) -> Option<WorkerGuard> {
    let stderr_logs = fmt::Layer::new()
        .with_target(false)
        .with_writer(std::io::stderr);

    let disable_file_logging = match std::env::var("PAGESERVER_DISABLE_FILE_LOGGING") {
        Ok(s) => s == "1" || s.to_lowercase() == "true",
        Err(_) => false,
    };

    if disable_file_logging {
        tracing_subscriber::registry()
            .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
            .with(stderr_logs)
            .init();
        None
    } else {
        let (file_writer, guard) =
            tracing_appender::non_blocking(tracing_appender::rolling::never("./logs/", file_name));
        let file_logs = fmt::Layer::new()
            .with_target(false)
            .with_ansi(false)
            .with_writer(file_writer);
        tracing_subscriber::registry()
            .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
            .with(stderr_logs)
            .with(file_logs)
            .init();
        Some(guard)
    }
}
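
// Hedged usage note: the returned `WorkerGuard` should be held for the lifetime
// of the program; dropping it flushes and stops the background log writer, so
// binding the result to `_` (which drops immediately) would lose buffered file
// output. For example:
//
//   let _log_guard = init_logging("scrubber.log");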

fn default_prefix_in_bucket(node_kind: NodeKind) -> &'static str {
    match node_kind {
        NodeKind::Pageserver => "pageserver/v1/",
        NodeKind::Safekeeper => "wal/",
    }
}

fn make_root_target(desc_str: String, prefix_in_bucket: String, node_kind: NodeKind) -> RootTarget {
    let s3_target = S3Target {
        desc_str,
        prefix_in_bucket,
        delimiter: "/".to_string(),
    };
    match node_kind {
        NodeKind::Pageserver => RootTarget::Pageserver(s3_target),
        NodeKind::Safekeeper => RootTarget::Safekeeper(s3_target),
    }
}

async fn init_remote(
    mut storage_config: BucketConfig,
    node_kind: NodeKind,
) -> anyhow::Result<(GenericRemoteStorage, RootTarget)> {
    let desc_str = storage_config.desc_str();

    let default_prefix = default_prefix_in_bucket(node_kind).to_string();

    match &mut storage_config.0.storage {
        RemoteStorageKind::AwsS3(config) => {
            config.prefix_in_bucket.get_or_insert(default_prefix);
        }
        RemoteStorageKind::AzureContainer(config) => {
            config.prefix_in_container.get_or_insert(default_prefix);
        }
        RemoteStorageKind::LocalFs { .. } => (),
    }

    // The prefix is already passed to the remote client via the config above, so
    // the root target itself carries an empty prefix.
    let prefix_in_root_target = String::new();
    let root_target = make_root_target(desc_str, prefix_in_root_target, node_kind);

    let client = GenericRemoteStorage::from_config(&storage_config.0).await?;
    Ok((client, root_target))
}
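
// Hedged usage sketch (names as defined in this file; the listing mode is an
// assumption about the `remote_storage::ListingMode` variants):
//
//   let (client, root_target) = init_remote(bucket_config, NodeKind::Pageserver).await?;
//   let listing = list_objects_with_retries(
//       &client,
//       ListingMode::WithDelimiter,
//       &root_target.tenants_root(),
//   )
//   .await?;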

/// Lists possibly large numbers of keys in a streaming fashion, retrying transient
/// errors with exponential backoff.
fn stream_objects_with_retries<'a>(
    storage_client: &'a GenericRemoteStorage,
    listing_mode: ListingMode,
    s3_target: &'a S3Target,
) -> impl Stream<Item = Result<Listing, anyhow::Error>> + 'a {
    async_stream::stream! {
        let mut trial = 0;
        let cancel = CancellationToken::new();
        let prefix_str = &s3_target
            .prefix_in_bucket
            .strip_prefix("/")
            .unwrap_or(&s3_target.prefix_in_bucket);
        let prefix = RemotePath::from_string(prefix_str)?;
        let mut list_stream =
            storage_client.list_streaming(Some(&prefix), listing_mode, None, &cancel);
        while let Some(res) = list_stream.next().await {
            match res {
                Err(err) => {
                    // Permanent errors are yielded immediately; transient ones are
                    // retried with backoff until the retry budget runs out.
                    let yield_err = if err.is_permanent() {
                        true
                    } else {
                        let backoff_time = 1 << trial.min(5);
                        tokio::time::sleep(Duration::from_secs(backoff_time)).await;
                        trial += 1;
                        trial == MAX_RETRIES - 1
                    };
                    if yield_err {
                        yield Err(err)
                            .with_context(|| format!("Failed to list objects {MAX_RETRIES} times"));
                        break;
                    }
                }
                Ok(res) => {
                    trial = 0;
                    yield Ok(res);
                }
            }
        }
    }
}
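
// Worked example of the shared backoff schedule (`1 << trial.min(5)` seconds) used
// by the retry loops in this file: 1s, 2s, 4s, 8s, 16s, then capped at 32s for
// every subsequent attempt.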

/// Lists a bounded number of prefixes or keys in a single call. For larger numbers
/// of keys/prefixes, use [`stream_objects_with_retries`] instead.
async fn list_objects_with_retries(
    remote_client: &GenericRemoteStorage,
    listing_mode: ListingMode,
    s3_target: &S3Target,
) -> anyhow::Result<Listing> {
    let cancel = CancellationToken::new();
    let prefix_str = &s3_target
        .prefix_in_bucket
        .strip_prefix("/")
        .unwrap_or(&s3_target.prefix_in_bucket);
    let prefix = RemotePath::from_string(prefix_str)?;
    for trial in 0..MAX_RETRIES {
        match remote_client
            .list(Some(&prefix), listing_mode, None, &cancel)
            .await
        {
            Ok(response) => return Ok(response),
            Err(e) => {
                if trial == MAX_RETRIES - 1 {
                    return Err(e)
                        .with_context(|| format!("Failed to list objects {MAX_RETRIES} times"));
                }
                warn!(
                    "list_objects_v2 query failed: bucket_name={}, prefix={}, delimiter={}, error={}",
                    remote_client.bucket_name().unwrap_or_default(),
                    s3_target.prefix_in_bucket,
                    s3_target.delimiter,
                    e,
                );
                let backoff_time = 1 << trial.min(5);
                tokio::time::sleep(Duration::from_secs(backoff_time)).await;
            }
        }
    }
    panic!("MAX_RETRIES is not allowed to be 0");
}

/// Returns the object content together with its last modified time.
async fn download_object_with_retries(
    remote_client: &GenericRemoteStorage,
    key: &RemotePath,
) -> anyhow::Result<(Vec<u8>, SystemTime)> {
    let cancel = CancellationToken::new();
    for trial in 0..MAX_RETRIES {
        let mut buf = Vec::new();
        let download = match remote_client
            .download(key, &DownloadOpts::default(), &cancel)
            .await
        {
            Ok(response) => response,
            Err(e) => {
                error!("Failed to download object for key {key}: {e}");
                let backoff_time = 1 << trial.min(5);
                tokio::time::sleep(Duration::from_secs(backoff_time)).await;
                continue;
            }
        };

        match tokio_util::io::StreamReader::new(download.download_stream)
            .read_to_end(&mut buf)
            .await
        {
            Ok(bytes_read) => {
                tracing::debug!("Downloaded {bytes_read} bytes for object {key}");
                return Ok((buf, download.last_modified));
            }
            Err(e) => {
                error!("Failed to stream object body for key {key}: {e}");
                let backoff_time = 1 << trial.min(5);
                tokio::time::sleep(Duration::from_secs(backoff_time)).await;
            }
        }
    }

    anyhow::bail!("Failed to download object with key {key} {MAX_RETRIES} times")
}
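
// Hedged usage sketch, e.g. fetching a small metadata object into memory, where
// `index_path` is a hypothetical RemotePath:
//
//   let (bytes, last_modified) = download_object_with_retries(&client, &index_path).await?;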

async fn download_object_to_file(
    remote_storage: &GenericRemoteStorage,
    key: &RemotePath,
    version_id: Option<VersionId>,
    local_path: &Utf8Path,
) -> anyhow::Result<()> {
    let opts = DownloadOpts {
        version_id: version_id.clone(),
        ..Default::default()
    };
    let tmp_path = Utf8PathBuf::from(format!("{local_path}.tmp"));
    let cancel = CancellationToken::new();
    for _ in 0..MAX_RETRIES {
        tokio::fs::remove_file(&tmp_path)
            .await
            .or_else(fs_ext::ignore_not_found)?;

        let mut file = tokio::fs::File::create(&tmp_path)
            .await
            .context("Opening output file")?;

        let res = remote_storage.download(key, &opts, &cancel).await;

        let download = match res {
            Ok(response) => response,
            Err(e) => {
                error!(
                    "Failed to download object for key {key} version {:?}: {e:#}",
                    &version_id.as_ref().unwrap_or(&VersionId(String::new()))
                );
                tokio::time::sleep(Duration::from_secs(1)).await;
                continue;
            }
        };

        let mut body = tokio_util::io::StreamReader::new(download.download_stream);
        tokio::io::copy(&mut body, &mut file).await?;

        // Move the completed download into place, replacing any previous file.
        tokio::fs::rename(&tmp_path, local_path).await?;
        return Ok(());
    }

    anyhow::bail!("Failed to download object with key {key} {MAX_RETRIES} times")
}