Line data Source code
1 : //
2 : // Main entry point for the safekeeper executable
3 : //
4 : use std::fs::{self, File};
5 : use std::io::{ErrorKind, Write};
6 : use std::str::FromStr;
7 : use std::sync::Arc;
8 : use std::time::{Duration, Instant};
9 :
10 : use anyhow::{Context, Result, bail};
11 : use camino::{Utf8Path, Utf8PathBuf};
12 : use clap::{ArgAction, Parser};
13 : use futures::future::BoxFuture;
14 : use futures::stream::FuturesUnordered;
15 : use futures::{FutureExt, StreamExt};
16 : use http_utils::tls_certs::ReloadingCertificateResolver;
17 : use metrics::set_build_info_metric;
18 : use postgres_backend::AuthType;
19 : use remote_storage::RemoteStorageConfig;
20 : use safekeeper::defaults::{
21 : DEFAULT_CONTROL_FILE_SAVE_INTERVAL, DEFAULT_EVICTION_MIN_RESIDENT,
22 : DEFAULT_GLOBAL_DISK_CHECK_INTERVAL, DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_HTTP_LISTEN_ADDR,
23 : DEFAULT_MAX_GLOBAL_DISK_USAGE_RATIO, DEFAULT_MAX_OFFLOADER_LAG_BYTES,
24 : DEFAULT_MAX_REELECT_OFFLOADER_LAG_BYTES, DEFAULT_MAX_TIMELINE_DISK_USAGE_BYTES,
25 : DEFAULT_PARTIAL_BACKUP_CONCURRENCY, DEFAULT_PARTIAL_BACKUP_TIMEOUT, DEFAULT_PG_LISTEN_ADDR,
26 : DEFAULT_SSL_CERT_FILE, DEFAULT_SSL_CERT_RELOAD_PERIOD, DEFAULT_SSL_KEY_FILE,
27 : };
28 : use safekeeper::hadron;
29 : use safekeeper::wal_backup::WalBackup;
30 : use safekeeper::{
31 : BACKGROUND_RUNTIME, BROKER_RUNTIME, GlobalTimelines, HTTP_RUNTIME, SafeKeeperConf,
32 : WAL_SERVICE_RUNTIME, broker, control_file, http, wal_service,
33 : };
34 : use sd_notify::NotifyState;
35 : use storage_broker::{DEFAULT_ENDPOINT, Uri};
36 : use tokio::runtime::Handle;
37 : use tokio::signal::unix::{SignalKind, signal};
38 : use tokio::task::JoinError;
39 : use tracing::*;
40 : use utils::auth::{JwtAuth, Scope, SwappableJwtAuth};
41 : use utils::id::NodeId;
42 : use utils::logging::{self, LogFormat, SecretString};
43 : use utils::metrics_collector::{METRICS_COLLECTION_INTERVAL, METRICS_COLLECTOR};
44 : use utils::sentry_init::init_sentry;
45 : use utils::{pid_file, project_build_tag, project_git_version, tcp_listener};
46 :
47 : use safekeeper::hadron::{
48 : GLOBAL_DISK_LIMIT_EXCEEDED, get_filesystem_capacity, get_filesystem_usage,
49 : };
50 : use safekeeper::metrics::GLOBAL_DISK_UTIL_CHECK_SECONDS;
51 : use std::sync::atomic::Ordering;
52 :
53 : #[global_allocator]
54 : static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
55 :
56 : /// Configure jemalloc to profile heap allocations by sampling stack traces every 2 MB (1 << 21).
57 : /// This adds roughly 3% overhead for allocations on average, which is acceptable considering
58 : /// performance-sensitive code will avoid allocations as far as possible anyway.
59 : #[allow(non_upper_case_globals)]
60 : #[unsafe(export_name = "malloc_conf")]
61 : pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:21\0";
62 :
63 : const PID_FILE_NAME: &str = "safekeeper.pid";
64 : const ID_FILE_NAME: &str = "safekeeper.id";
65 :
66 : project_git_version!(GIT_VERSION);
67 : project_build_tag!(BUILD_TAG);
68 :
69 : const FEATURES: &[&str] = &[
70 : #[cfg(feature = "testing")]
71 : "testing",
72 : ];
73 :
74 0 : fn version() -> String {
75 0 : format!(
76 0 : "{GIT_VERSION} failpoints: {}, features: {:?}",
77 0 : fail::has_failpoints(),
78 : FEATURES,
79 : )
80 0 : }
81 :
82 : const ABOUT: &str = r#"
83 : A fleet of safekeepers is responsible for reliably storing WAL received from
84 : compute, passing it through consensus (mitigating potential compute
85 : split-brain), and serving the hardened part further downstream to pageserver(s).
86 : "#;
87 :
88 : #[derive(Parser)]
89 : #[command(name = "Neon safekeeper", version = GIT_VERSION, about = ABOUT, long_about = None)]
90 : struct Args {
91 : /// Path to the safekeeper data directory.
92 : #[arg(short = 'D', long, default_value = "./")]
93 : datadir: Utf8PathBuf,
94 : /// Safekeeper node id.
95 : #[arg(long)]
96 : id: Option<u64>,
97 : /// Initialize safekeeper with given id and exit.
98 : #[arg(long)]
99 : init: bool,
100 : /// Listen endpoint for receiving/sending WAL in the form host:port.
101 : #[arg(short, long, default_value = DEFAULT_PG_LISTEN_ADDR)]
102 : listen_pg: String,
103 : /// Listen endpoint for receiving/sending WAL in the form host:port, allowing
104 : /// only tenant-scoped auth tokens. Pointless if auth is disabled.
105 : #[arg(long, default_value = None, verbatim_doc_comment)]
106 : listen_pg_tenant_only: Option<String>,
107 : /// Listen http endpoint for management and metrics in the form host:port.
108 : #[arg(long, default_value = DEFAULT_HTTP_LISTEN_ADDR)]
109 : listen_http: String,
110 : /// Listen https endpoint for management and metrics in the form host:port.
111 : #[arg(long, default_value = None)]
112 : listen_https: Option<String>,
113 : /// Advertised endpoint to PS for receiving/sending WAL in the form host:port. If not
114 : /// specified, listen_pg is used to advertise instead.
115 : #[arg(long, default_value = None)]
116 : advertise_pg: Option<String>,
117 : /// Advertised endpoint to compute for receiving/sending WAL in the form host:port.
118 : /// Required if --hcc-base-url is specified.
119 : // TODO(vlad): pull in hcc-base-url too
120 : #[arg(long, default_value = None)]
121 : advertise_pg_tenant_only: Option<String>,
122 : /// Availability zone of the safekeeper.
123 : #[arg(long)]
124 : availability_zone: Option<String>,
125 : /// Do not wait for changes to be written safely to disk. Unsafe.
126 : #[arg(short, long)]
127 : no_sync: bool,
128 : /// Dump control file at path specified by this argument and exit.
129 : #[arg(long)]
130 : dump_control_file: Option<Utf8PathBuf>,
131 : /// Broker endpoint for storage node coordination in the form
132 : /// http[s]://host:port. With the https scheme a TLS connection is
133 : /// established; plaintext otherwise.
134 : #[arg(long, default_value = DEFAULT_ENDPOINT, verbatim_doc_comment)]
135 : broker_endpoint: Uri,
136 : /// Broker keepalive interval.
137 : #[arg(long, value_parser= humantime::parse_duration, default_value = storage_broker::DEFAULT_KEEPALIVE_INTERVAL)]
138 : broker_keepalive_interval: Duration,
139 : /// Peer safekeeper is considered dead after not receiving heartbeats from
140 : /// it during this period passed as a human readable duration.
141 : #[arg(long, value_parser= humantime::parse_duration, default_value = DEFAULT_HEARTBEAT_TIMEOUT, verbatim_doc_comment)]
142 : heartbeat_timeout: Duration,
143 : /// Enable/disable peer recovery.
144 : #[arg(long, default_value = "false", action=ArgAction::Set)]
145 : peer_recovery: bool,
146 : /// Remote storage configuration for WAL backup (offloading to s3) as TOML
147 : /// inline table, e.g.
148 : /// {max_concurrent_syncs = 17, max_sync_errors = 13, bucket_name = "<BUCKETNAME>", bucket_region = "<REGION>", concurrency_limit = 119}
149 : /// Safekeeper offloads WAL to
150 : /// [prefix_in_bucket/]<tenant_id>/<timeline_id>/<segment_file>, mirroring
151 : /// structure on the file system.
152 : #[arg(long, value_parser = parse_remote_storage, verbatim_doc_comment)]
153 : remote_storage: Option<RemoteStorageConfig>,
154 : /// Safekeeper won't be elected for WAL offloading if it is lagging by more than this value in bytes.
155 : #[arg(long, default_value_t = DEFAULT_MAX_OFFLOADER_LAG_BYTES)]
156 : max_offloader_lag: u64,
157 : /* BEGIN_HADRON */
158 : /// Safekeeper will re-elect a new offloader if the current backup is lagging by more than this value in bytes.
159 : #[arg(long, default_value_t = DEFAULT_MAX_REELECT_OFFLOADER_LAG_BYTES)]
160 : max_reelect_offloader_lag_bytes: u64,
161 : /// Safekeeper will stop accepting new WALs if the timeline disk usage exceeds this value in bytes.
162 : /// Setting this value to 0 disables the limit.
163 : #[arg(long, default_value_t = DEFAULT_MAX_TIMELINE_DISK_USAGE_BYTES)]
164 : max_timeline_disk_usage_bytes: u64,
165 : /* END_HADRON */
166 : /// Maximum number of WAL segments to offload to remote storage in parallel.
167 : #[arg(long, default_value = "5")]
168 : wal_backup_parallel_jobs: usize,
169 : /// Disable WAL backup to s3. When disabled, safekeeper removes WAL ignoring
170 : /// WAL backup horizon.
171 : #[arg(long)]
172 : disable_wal_backup: bool,
173 : /// Token authentication type. Allowed values are "NeonJWT" and "HadronJWT". Any specified value only takes effect if
174 : /// --pg-auth-public-key-path, --pg-tenant-only-auth-public-key-path, or --http-auth-public-key-path is specified.
175 : /// NeonJWT: Decoding keys are loaded from plain public key files in the specified key path.
176 : /// HadronJWT: Decoding keys are loaded from X509 certificates in the specified key path.
177 : #[arg(long, verbatim_doc_comment, default_value = "NeonJWT")]
178 : token_auth_type: AuthType,
179 : /// If given, enables auth on incoming connections to WAL service endpoint
180 : /// (--listen-pg). Value specifies path to a .pem public key used for
181 : /// validations of JWT tokens. Empty string is allowed and means disabling
182 : /// auth.
183 : #[arg(long, verbatim_doc_comment, value_parser = opt_pathbuf_parser)]
184 : pg_auth_public_key_path: Option<Utf8PathBuf>,
185 : /// If given, enables auth on incoming connections to tenant only WAL
186 : /// service endpoint (--listen-pg-tenant-only). Value specifies path to a
187 : /// .pem public key used for validations of JWT tokens. Empty string is
188 : /// allowed and means disabling auth.
189 : #[arg(long, verbatim_doc_comment, value_parser = opt_pathbuf_parser)]
190 : pg_tenant_only_auth_public_key_path: Option<Utf8PathBuf>,
191 : /// If given, enables auth on incoming connections to http management
192 : /// service endpoint (--listen-http). Value specifies path to a .pem public
193 : /// key used for validations of JWT tokens. Empty string is allowed and
194 : /// means disabling auth.
195 : #[arg(long, verbatim_doc_comment, value_parser = opt_pathbuf_parser)]
196 : http_auth_public_key_path: Option<Utf8PathBuf>,
197 : /// Format for logging, either 'plain' or 'json'.
198 : #[arg(long, default_value = "plain")]
199 : log_format: String,
200 : /// Run everything in single threaded current thread runtime, might be
201 : /// useful for debugging.
202 : #[arg(long)]
203 : current_thread_runtime: bool,
204 : /// Keep horizon for walsenders, i.e. don't remove WAL segments that are
205 : /// still needed for existing replication connection.
206 : #[arg(long)]
207 : walsenders_keep_horizon: bool,
208 : /// Controls how long the backup task will wait before uploading the partial segment.
209 : #[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_PARTIAL_BACKUP_TIMEOUT, verbatim_doc_comment)]
210 : partial_backup_timeout: Duration,
211 : /// Disable task to push messages to broker every second. Supposed to
212 : /// be used in tests.
213 : #[arg(long)]
214 : disable_periodic_broker_push: bool,
215 : /// Enable automatic switching to offloaded state.
216 : #[arg(long)]
217 : enable_offload: bool,
218 : /// Delete local WAL files after offloading. When disabled, they will be left on disk.
219 : #[arg(long)]
220 : delete_offloaded_wal: bool,
221 : /// Pending updates to control file will be automatically saved after this interval.
222 : #[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_CONTROL_FILE_SAVE_INTERVAL)]
223 : control_file_save_interval: Duration,
224 : /// Number of allowed concurrent uploads of partial segments to remote storage.
225 : #[arg(long, default_value = DEFAULT_PARTIAL_BACKUP_CONCURRENCY)]
226 : partial_backup_concurrency: usize,
227 : /// How long a timeline must be resident before it is eligible for eviction.
228 : /// Usually, timeline eviction has to wait for `partial_backup_timeout` before being eligible for eviction,
229 : /// but if a timeline is un-evicted and then _not_ written to, it would immediately flap to evicting again,
230 : /// if it weren't for `eviction_min_resident` preventing that.
231 : ///
232 : /// Also defines interval for eviction retries.
233 : #[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_EVICTION_MIN_RESIDENT)]
234 : eviction_min_resident: Duration,
235 : /// Enable fanning out WAL to different shards from the same reader
236 : #[arg(long)]
237 : wal_reader_fanout: bool,
238 : /// Only fan out the WAL reader if the absolute delta between the new requested position
239 : /// and the current position of the reader is smaller than this value.
240 : #[arg(long)]
241 : max_delta_for_fanout: Option<u64>,
242 : /// Path to a file with certificate's private key for https API.
243 : #[arg(long, default_value = DEFAULT_SSL_KEY_FILE)]
244 : ssl_key_file: Utf8PathBuf,
245 : /// Path to a file with a X509 certificate for https API.
246 : #[arg(long, default_value = DEFAULT_SSL_CERT_FILE)]
247 : ssl_cert_file: Utf8PathBuf,
248 : /// Period to reload certificate and private key from files.
249 : #[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_SSL_CERT_RELOAD_PERIOD)]
250 : ssl_cert_reload_period: Duration,
251 : /// Trusted root CA certificates to use in https APIs.
252 : #[arg(long)]
253 : ssl_ca_file: Option<Utf8PathBuf>,
254 : /// Flag to use https for requests to peer's safekeeper API.
255 : #[arg(long)]
256 : use_https_safekeeper_api: bool,
257 : /// Path to the JWT auth token used to authenticate with other safekeepers.
258 : #[arg(long)]
259 : auth_token_path: Option<Utf8PathBuf>,
260 :
261 : /// Enable TLS in WAL service API.
262 : /// Does not force TLS: the client negotiates TLS usage during the handshake.
263 : /// Uses key and certificate from ssl_key_file/ssl_cert_file.
264 : #[arg(long)]
265 : enable_tls_wal_service_api: bool,
266 :
267 : /// Controls whether to collect all metrics on each scrape or to return potentially stale
268 : /// results.
269 : #[arg(long, default_value_t = true)]
270 : force_metric_collection_on_scrape: bool,
271 :
272 : /// Run in development mode (disables security checks)
273 : #[arg(long, help = "Run in development mode (disables security checks)")]
274 : dev: bool,
275 : /* BEGIN_HADRON */
276 : #[arg(long)]
277 : enable_pull_timeline_on_startup: bool,
278 : /// How often to scan the entire data dir for total disk usage.
279 : #[arg(long, value_parser=humantime::parse_duration, default_value = DEFAULT_GLOBAL_DISK_CHECK_INTERVAL)]
280 : global_disk_check_interval: Duration,
281 : /// The portion of the filesystem capacity that can be used by all timelines.
282 : /// A circuit breaker will trip and reject all WAL writes if the total usage
283 : /// exceeds this ratio.
284 : /// Set to 0 to disable the global disk usage limit.
285 : #[arg(long, default_value_t = DEFAULT_MAX_GLOBAL_DISK_USAGE_RATIO)]
286 : max_global_disk_usage_ratio: f64,
287 : /* END_HADRON */
288 : }
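
// A hypothetical example invocation (not part of the original source). Flag names follow
// clap's kebab-case mapping of the fields above; the host, port, and path values below are
// placeholders only:
//
//   safekeeper -D /data/safekeeper --id 1 \
//     --listen-pg 0.0.0.0:5454 --listen-http 0.0.0.0:7676 \
//     --broker-endpoint http://storage-broker:50051 \
//     --availability-zone us-east-1a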
289 :
290 : // Like PathBufValueParser, but allows empty string.
291 0 : fn opt_pathbuf_parser(s: &str) -> Result<Utf8PathBuf, String> {
292 0 : Ok(Utf8PathBuf::from_str(s).unwrap())
293 0 : }
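
// A minimal sketch of a unit test (not part of the original source): unlike clap's default
// PathBufValueParser, opt_pathbuf_parser accepts an empty string, which main() later turns
// into None. The sample path below is a placeholder.
#[test]
fn opt_pathbuf_parser_accepts_empty_string() {
    let empty = opt_pathbuf_parser("").expect("empty string should be accepted");
    assert!(empty.as_os_str().is_empty());

    let path = opt_pathbuf_parser("/tmp/auth_key.pem").expect("regular path should be accepted");
    assert!(path == Utf8PathBuf::from("/tmp/auth_key.pem"));
}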
294 :
295 : #[tokio::main(flavor = "current_thread")]
296 0 : async fn main() -> anyhow::Result<()> {
297 : // We want to allow multiple occurrences of the same arg (taking the last) so
298 : // that neon_local can generate a command with defaults + overrides without
299 : // getting an 'argument cannot be used multiple times' error. This seems to be
300 : // impossible with the pure derive API, so convert the struct to a Command, modify it,
301 : // parse arguments, and then fill the struct back.
302 0 : let cmd = <Args as clap::CommandFactory>::command()
303 0 : .args_override_self(true)
304 0 : .version(version());
305 0 : let mut matches = cmd.get_matches();
306 0 : let mut args = <Args as clap::FromArgMatches>::from_arg_matches_mut(&mut matches)?;
307 :
308 : // Making opt_pathbuf_parser return Option<PathBuf> is not straightforward,
309 : // so turn empty strings into None post factum.
310 0 : if let Some(pb) = &args.pg_auth_public_key_path {
311 0 : if pb.as_os_str().is_empty() {
312 0 : args.pg_auth_public_key_path = None;
313 0 : }
314 0 : }
315 0 : if let Some(pb) = &args.pg_tenant_only_auth_public_key_path {
316 0 : if pb.as_os_str().is_empty() {
317 0 : args.pg_tenant_only_auth_public_key_path = None;
318 0 : }
319 0 : }
320 0 : if let Some(pb) = &args.http_auth_public_key_path {
321 0 : if pb.as_os_str().is_empty() {
322 0 : args.http_auth_public_key_path = None;
323 0 : }
324 0 : }
325 :
326 0 : if let Some(addr) = args.dump_control_file {
327 0 : let state = control_file::FileStorage::load_control_file(addr)?;
328 0 : let json = serde_json::to_string(&state)?;
329 0 : print!("{json}");
330 0 : return Ok(());
331 0 : }
332 :
333 : // important to keep the order of:
334 : // 1. init logging
335 : // 2. tracing panic hook
336 : // 3. sentry
337 0 : logging::init(
338 0 : LogFormat::from_config(&args.log_format)?,
339 0 : logging::TracingErrorLayerEnablement::Disabled,
340 0 : logging::Output::Stdout,
341 0 : )?;
342 0 : logging::replace_panic_hook_with_tracing_panic_hook().forget();
343 0 : info!("version: {GIT_VERSION}");
344 0 : info!("buld_tag: {BUILD_TAG}");
345 :
346 0 : let args_workdir = &args.datadir;
347 0 : let workdir = args_workdir.canonicalize_utf8().with_context(|| {
348 0 : format!("Failed to get the absolute path for input workdir {args_workdir:?}")
349 0 : })?;
350 :
351 : // Change into the data directory.
352 0 : std::env::set_current_dir(&workdir)?;
353 :
354 : // Prevent running multiple safekeepers on the same directory
355 0 : let lock_file_path = workdir.join(PID_FILE_NAME);
356 0 : let lock_file =
357 0 : pid_file::claim_for_current_process(&lock_file_path).context("claim pid file")?;
358 0 : info!("claimed pid file at {lock_file_path:?}");
359 : // Ensure that the lock file is held even if the main thread of the process panics:
360 : // we need to release the lock file only when the current process is gone.
361 0 : std::mem::forget(lock_file);
362 :
363 : // Set or read our ID.
364 0 : let id = set_id(&workdir, args.id.map(NodeId))?;
365 0 : if args.init {
366 0 : return Ok(());
367 0 : }
368 :
369 0 : let pg_auth = match args.pg_auth_public_key_path.as_ref() {
370 : None => {
371 0 : info!("pg auth is disabled");
372 0 : None
373 : }
374 0 : Some(path) => {
375 0 : info!("loading pg auth JWT key from {path}");
376 0 : match args.token_auth_type {
377 0 : AuthType::NeonJWT => Some(Arc::new(
378 0 : JwtAuth::from_key_path(path).context("failed to load the auth key")?,
379 : )),
380 0 : AuthType::HadronJWT => Some(Arc::new(
381 0 : JwtAuth::from_cert_path(path)
382 0 : .context("failed to load auth keys from certificates")?,
383 : )),
384 0 : _ => panic!(
385 0 : "AuthType {auth_type} is not allowed when --pg-auth-public-key-path is specified",
386 : auth_type = args.token_auth_type
387 : ),
388 : }
389 : }
390 : };
391 0 : let pg_tenant_only_auth = match args.pg_tenant_only_auth_public_key_path.as_ref() {
392 : None => {
393 0 : info!("pg tenant only auth is disabled");
394 0 : None
395 : }
396 0 : Some(path) => {
397 0 : info!("loading pg tenant only auth JWT key from {path}");
398 0 : match args.token_auth_type {
399 0 : AuthType::NeonJWT => Some(Arc::new(
400 0 : JwtAuth::from_key_path(path).context("failed to load the auth key")?,
401 : )),
402 0 : AuthType::HadronJWT => Some(Arc::new(
403 0 : JwtAuth::from_cert_path(path)
404 0 : .context("failed to load auth keys from certificates")?,
405 : )),
406 0 : _ => panic!(
407 0 : "AuthType {auth_type} is not allowed when --pg-tenant-only-auth-public-key-path is specified",
408 : auth_type = args.token_auth_type
409 : ),
410 : }
411 : }
412 : };
413 0 : let http_auth = match args.http_auth_public_key_path.as_ref() {
414 : None => {
415 0 : info!("http auth is disabled");
416 0 : None
417 : }
418 0 : Some(path) => {
419 0 : info!("loading http auth JWT key(s) from {path}");
420 0 : let jwt_auth = match args.token_auth_type {
421 : AuthType::NeonJWT => {
422 0 : JwtAuth::from_key_path(path).context("failed to load the auth key")?
423 : }
424 0 : AuthType::HadronJWT => JwtAuth::from_cert_path(path)
425 0 : .context("failed to load auth keys from certificates")?,
426 0 : _ => panic!(
427 0 : "AuthType {auth_type} is not allowed when --http-auth-public-key-path is specified",
428 : auth_type = args.token_auth_type
429 : ),
430 : };
431 0 : Some(Arc::new(SwappableJwtAuth::new(jwt_auth)))
432 : }
433 : };
434 :
435 : // Load JWT auth token to connect to other safekeepers for pull_timeline.
436 0 : let sk_auth_token = if let Some(auth_token_path) = args.auth_token_path.as_ref() {
437 0 : info!("loading JWT token for authentication with safekeepers from {auth_token_path}");
438 0 : let auth_token = tokio::fs::read_to_string(auth_token_path).await?;
439 0 : Some(SecretString::from(auth_token.trim().to_owned()))
440 : } else {
441 0 : info!("no JWT token for authentication with safekeepers detected");
442 0 : None
443 : };
444 :
445 0 : let ssl_ca_certs = match args.ssl_ca_file.as_ref() {
446 0 : Some(ssl_ca_file) => {
447 0 : tracing::info!("Using ssl root CA file: {ssl_ca_file:?}");
448 0 : let buf = tokio::fs::read(ssl_ca_file).await?;
449 0 : pem::parse_many(&buf)?
450 0 : .into_iter()
451 0 : .filter(|pem| pem.tag() == "CERTIFICATE")
452 0 : .collect()
453 : }
454 0 : None => Vec::new(),
455 : };
456 :
457 0 : let conf = Arc::new(SafeKeeperConf {
458 0 : workdir,
459 0 : my_id: id,
460 0 : listen_pg_addr: args.listen_pg,
461 0 : listen_pg_addr_tenant_only: args.listen_pg_tenant_only,
462 0 : listen_http_addr: args.listen_http,
463 0 : listen_https_addr: args.listen_https,
464 0 : advertise_pg_addr: args.advertise_pg,
465 0 : availability_zone: args.availability_zone,
466 0 : no_sync: args.no_sync,
467 0 : broker_endpoint: args.broker_endpoint,
468 0 : broker_keepalive_interval: args.broker_keepalive_interval,
469 0 : heartbeat_timeout: args.heartbeat_timeout,
470 0 : peer_recovery_enabled: args.peer_recovery,
471 0 : remote_storage: args.remote_storage,
472 0 : max_offloader_lag_bytes: args.max_offloader_lag,
473 0 : /* BEGIN_HADRON */
474 0 : max_reelect_offloader_lag_bytes: args.max_reelect_offloader_lag_bytes,
475 0 : max_timeline_disk_usage_bytes: args.max_timeline_disk_usage_bytes,
476 0 : /* END_HADRON */
477 0 : wal_backup_enabled: !args.disable_wal_backup,
478 0 : backup_parallel_jobs: args.wal_backup_parallel_jobs,
479 0 : auth_type: args.token_auth_type,
480 0 : pg_auth,
481 0 : pg_tenant_only_auth,
482 0 : http_auth,
483 0 : sk_auth_token,
484 0 : current_thread_runtime: args.current_thread_runtime,
485 0 : walsenders_keep_horizon: args.walsenders_keep_horizon,
486 0 : partial_backup_timeout: args.partial_backup_timeout,
487 0 : disable_periodic_broker_push: args.disable_periodic_broker_push,
488 0 : enable_offload: args.enable_offload,
489 0 : delete_offloaded_wal: args.delete_offloaded_wal,
490 0 : control_file_save_interval: args.control_file_save_interval,
491 0 : partial_backup_concurrency: args.partial_backup_concurrency,
492 0 : eviction_min_resident: args.eviction_min_resident,
493 0 : wal_reader_fanout: args.wal_reader_fanout,
494 0 : max_delta_for_fanout: args.max_delta_for_fanout,
495 0 : ssl_key_file: args.ssl_key_file,
496 0 : ssl_cert_file: args.ssl_cert_file,
497 0 : ssl_cert_reload_period: args.ssl_cert_reload_period,
498 0 : ssl_ca_certs,
499 0 : use_https_safekeeper_api: args.use_https_safekeeper_api,
500 0 : enable_tls_wal_service_api: args.enable_tls_wal_service_api,
501 0 : force_metric_collection_on_scrape: args.force_metric_collection_on_scrape,
502 0 : /* BEGIN_HADRON */
503 0 : advertise_pg_addr_tenant_only: args.advertise_pg_tenant_only,
504 0 : enable_pull_timeline_on_startup: args.enable_pull_timeline_on_startup,
505 0 : hcc_base_url: None,
506 0 : global_disk_check_interval: args.global_disk_check_interval,
507 0 : max_global_disk_usage_ratio: args.max_global_disk_usage_ratio,
508 0 : /* END_HADRON */
509 0 : });
510 :
511 : // initialize sentry if SENTRY_DSN is provided
512 0 : let _sentry_guard = init_sentry(
513 0 : Some(GIT_VERSION.into()),
514 0 : &[("node_id", &conf.my_id.to_string())],
515 : );
516 0 : start_safekeeper(conf).await
517 0 : }
518 :
519 : /// Result of joining any of the main tasks: the outer error means the task failed to
520 : /// complete (e.g. panicked); the inner is the error produced by the task itself.
521 : type JoinTaskRes = Result<anyhow::Result<()>, JoinError>;
522 :
523 0 : async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
524 : // fsync the datadir to make sure we have a consistent state on disk.
525 0 : if !conf.no_sync {
526 0 : let dfd = File::open(&conf.workdir).context("open datadir for syncfs")?;
527 0 : let started = Instant::now();
528 0 : utils::crashsafe::syncfs(dfd)?;
529 0 : let elapsed = started.elapsed();
530 0 : info!(
531 0 : elapsed_ms = elapsed.as_millis(),
532 0 : "syncfs data directory done"
533 : );
534 0 : }
535 :
536 0 : info!("starting safekeeper WAL service on {}", conf.listen_pg_addr);
537 0 : let pg_listener = tcp_listener::bind(conf.listen_pg_addr.clone()).map_err(|e| {
538 0 : error!("failed to bind to address {}: {}", conf.listen_pg_addr, e);
539 0 : e
540 0 : })?;
541 :
542 0 : let pg_listener_tenant_only =
543 0 : if let Some(listen_pg_addr_tenant_only) = &conf.listen_pg_addr_tenant_only {
544 0 : info!(
545 0 : "starting safekeeper tenant scoped WAL service on {}",
546 : listen_pg_addr_tenant_only
547 : );
548 0 : let listener = tcp_listener::bind(listen_pg_addr_tenant_only.clone()).map_err(|e| {
549 0 : error!(
550 0 : "failed to bind to address {}: {}",
551 : listen_pg_addr_tenant_only, e
552 : );
553 0 : e
554 0 : })?;
555 0 : Some(listener)
556 : } else {
557 0 : None
558 : };
559 :
560 0 : info!(
561 0 : "starting safekeeper HTTP service on {}",
562 0 : conf.listen_http_addr
563 : );
564 0 : let http_listener = tcp_listener::bind(conf.listen_http_addr.clone()).map_err(|e| {
565 0 : error!("failed to bind to address {}: {}", conf.listen_http_addr, e);
566 0 : e
567 0 : })?;
568 :
569 0 : let https_listener = match conf.listen_https_addr.as_ref() {
570 0 : Some(listen_https_addr) => {
571 0 : info!("starting safekeeper HTTPS service on {}", listen_https_addr);
572 0 : Some(tcp_listener::bind(listen_https_addr).map_err(|e| {
573 0 : error!("failed to bind to address {}: {}", listen_https_addr, e);
574 0 : e
575 0 : })?)
576 : }
577 0 : None => None,
578 : };
579 :
580 0 : let wal_backup = Arc::new(WalBackup::new(&conf).await?);
581 :
582 0 : let global_timelines = Arc::new(GlobalTimelines::new(conf.clone(), wal_backup.clone()));
583 :
584 : // Register the metrics collector for active timelines. It's important to do this
585 : // after daemonizing; otherwise the process collector will be upset.
586 0 : let timeline_collector = safekeeper::metrics::TimelineCollector::new(global_timelines.clone());
587 0 : metrics::register_internal(Box::new(timeline_collector))?;
588 :
589 : // Keep handles to the main tasks so we can die if any of them disappears.
590 0 : let mut tasks_handles: FuturesUnordered<BoxFuture<(String, JoinTaskRes)>> =
591 0 : FuturesUnordered::new();
592 :
593 : // Start the WAL backup launcher before loading timelines, as we'll notify it
594 : // through the channel about timelines which need offloading; not draining
595 : // the channel would cause a deadlock.
596 0 : let current_thread_rt = conf
597 0 : .current_thread_runtime
598 0 : .then(|| Handle::try_current().expect("no runtime in main"));
599 :
600 : // Load all timelines from disk to memory.
601 0 : global_timelines.init().await?;
602 :
603 : /* BEGIN_HADRON */
604 0 : if conf.enable_pull_timeline_on_startup && global_timelines.timelines_count() == 0 {
605 0 : match hadron::hcc_pull_timelines(&conf, global_timelines.clone()).await {
606 : Ok(_) => {
607 0 : info!("Successfully pulled all timelines from peer safekeepers");
608 : }
609 0 : Err(e) => {
610 0 : error!("Failed to pull timelines from peer safekeepers: {:?}", e);
611 0 : return Err(e);
612 : }
613 : }
614 0 : }
615 : /* END_HADRON */
616 :
617 : // Run everything in current thread rt, if asked.
618 0 : if conf.current_thread_runtime {
619 0 : info!("running in current thread runtime");
620 0 : }
621 :
622 0 : let tls_server_config = if conf.listen_https_addr.is_some() || conf.enable_tls_wal_service_api {
623 0 : let ssl_key_file = conf.ssl_key_file.clone();
624 0 : let ssl_cert_file = conf.ssl_cert_file.clone();
625 0 : let ssl_cert_reload_period = conf.ssl_cert_reload_period;
626 :
627 : // Create resolver in BACKGROUND_RUNTIME, so the background certificate reloading
628 : // task is run in this runtime.
629 0 : let cert_resolver = current_thread_rt
630 0 : .as_ref()
631 0 : .unwrap_or_else(|| BACKGROUND_RUNTIME.handle())
632 0 : .spawn(async move {
633 0 : ReloadingCertificateResolver::new(
634 0 : "main",
635 0 : &ssl_key_file,
636 0 : &ssl_cert_file,
637 0 : ssl_cert_reload_period,
638 0 : )
639 0 : .await
640 0 : })
641 0 : .await??;
642 :
643 0 : let config = rustls::ServerConfig::builder()
644 0 : .with_no_client_auth()
645 0 : .with_cert_resolver(cert_resolver);
646 :
647 0 : Some(Arc::new(config))
648 : } else {
649 0 : None
650 : };
651 :
652 0 : let wal_service_handle = current_thread_rt
653 0 : .as_ref()
654 0 : .unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle())
655 0 : .spawn(wal_service::task_main(
656 0 : conf.clone(),
657 0 : pg_listener,
658 0 : Scope::SafekeeperData,
659 0 : conf.enable_tls_wal_service_api
660 0 : .then(|| tls_server_config.clone())
661 0 : .flatten(),
662 0 : global_timelines.clone(),
663 : ))
664 : // wrap with task name for error reporting
665 0 : .map(|res| ("WAL service main".to_owned(), res));
666 0 : tasks_handles.push(Box::pin(wal_service_handle));
667 :
668 0 : let global_timelines_ = global_timelines.clone();
669 0 : let timeline_housekeeping_handle = current_thread_rt
670 0 : .as_ref()
671 0 : .unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle())
672 0 : .spawn(async move {
673 : const TOMBSTONE_TTL: Duration = Duration::from_secs(3600 * 24);
674 : loop {
675 0 : tokio::time::sleep(TOMBSTONE_TTL).await;
676 0 : global_timelines_.housekeeping(&TOMBSTONE_TTL);
677 : }
678 : })
679 0 : .map(|res| ("Timeline map housekeeping".to_owned(), res));
680 0 : tasks_handles.push(Box::pin(timeline_housekeeping_handle));
681 :
682 : /* BEGIN_HADRON */
683 : // Spawn global disk usage watcher task, if a global disk usage limit is specified.
684 0 : let interval = conf.global_disk_check_interval;
685 0 : let data_dir = conf.workdir.clone();
686 : // Use the safekeeper data directory to compute filesystem capacity. This only runs once on startup, so
687 : // there is little point in continuing if we can't put the proper protections in place.
688 0 : let fs_capacity_bytes = get_filesystem_capacity(data_dir.as_std_path())
689 0 : .expect("Failed to get filesystem capacity for data directory");
690 0 : let limit: u64 = (conf.max_global_disk_usage_ratio * fs_capacity_bytes as f64) as u64;
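// Worked example (hypothetical numbers, not from the original source): with a 1 TiB
// filesystem and a ratio of 0.9, limit = 0.9 * 1099511627776 ≈ 989560464998 bytes, so WAL
// writes are rejected once total usage under the data dir exceeds ~0.9 TiB; a ratio of 0
// keeps limit at 0 and disables the check below.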
691 0 : if limit > 0 {
692 0 : let disk_usage_watch_handle = BACKGROUND_RUNTIME
693 0 : .handle()
694 0 : .spawn(async move {
695 : // Use a Tokio interval to preserve a fixed cadence between filesystem utilization checks
696 0 : let mut ticker = tokio::time::interval(interval);
697 0 : ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
698 :
699 : loop {
700 0 : ticker.tick().await;
701 0 : let data_dir_clone = data_dir.clone();
702 0 : let check_start = Instant::now();
703 :
704 0 : let usage = tokio::task::spawn_blocking(move || {
705 0 : get_filesystem_usage(data_dir_clone.as_std_path())
706 0 : })
707 0 : .await
708 0 : .unwrap_or(0);
709 :
710 0 : let elapsed = check_start.elapsed().as_secs_f64();
711 0 : GLOBAL_DISK_UTIL_CHECK_SECONDS.observe(elapsed);
712 0 : if usage > limit {
713 0 : warn!(
714 0 : "Global disk usage exceeded limit. Usage: {} bytes, limit: {} bytes",
715 : usage, limit
716 : );
717 0 : }
718 0 : GLOBAL_DISK_LIMIT_EXCEEDED.store(usage > limit, Ordering::Relaxed);
719 : }
720 : })
721 0 : .map(|res| ("Global disk usage watcher".to_string(), res));
722 0 : tasks_handles.push(Box::pin(disk_usage_watch_handle));
723 0 : }
724 : /* END_HADRON */
725 0 : if let Some(pg_listener_tenant_only) = pg_listener_tenant_only {
726 0 : let wal_service_handle = current_thread_rt
727 0 : .as_ref()
728 0 : .unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle())
729 0 : .spawn(wal_service::task_main(
730 0 : conf.clone(),
731 0 : pg_listener_tenant_only,
732 0 : Scope::Tenant,
733 0 : conf.enable_tls_wal_service_api
734 0 : .then(|| tls_server_config.clone())
735 0 : .flatten(),
736 0 : global_timelines.clone(),
737 : ))
738 : // wrap with task name for error reporting
739 0 : .map(|res| ("WAL service tenant only main".to_owned(), res));
740 0 : tasks_handles.push(Box::pin(wal_service_handle));
741 0 : }
742 :
743 0 : let http_handle = current_thread_rt
744 0 : .as_ref()
745 0 : .unwrap_or_else(|| HTTP_RUNTIME.handle())
746 0 : .spawn(http::task_main_http(
747 0 : conf.clone(),
748 0 : http_listener,
749 0 : global_timelines.clone(),
750 : ))
751 0 : .map(|res| ("HTTP service main".to_owned(), res));
752 0 : tasks_handles.push(Box::pin(http_handle));
753 :
754 0 : if let Some(https_listener) = https_listener {
755 0 : let https_handle = current_thread_rt
756 0 : .as_ref()
757 0 : .unwrap_or_else(|| HTTP_RUNTIME.handle())
758 0 : .spawn(http::task_main_https(
759 0 : conf.clone(),
760 0 : https_listener,
761 0 : tls_server_config.expect("tls_server_config is set earlier if https is enabled"),
762 0 : global_timelines.clone(),
763 : ))
764 0 : .map(|res| ("HTTPS service main".to_owned(), res));
765 0 : tasks_handles.push(Box::pin(https_handle));
766 0 : }
767 :
768 0 : let broker_task_handle = current_thread_rt
769 0 : .as_ref()
770 0 : .unwrap_or_else(|| BROKER_RUNTIME.handle())
771 0 : .spawn(
772 0 : broker::task_main(conf.clone(), global_timelines.clone())
773 0 : .instrument(info_span!("broker")),
774 : )
775 0 : .map(|res| ("broker main".to_owned(), res));
776 0 : tasks_handles.push(Box::pin(broker_task_handle));
777 :
778 : /* BEGIN_HADRON */
779 0 : if conf.force_metric_collection_on_scrape {
780 0 : let metrics_handle = current_thread_rt
781 0 : .as_ref()
782 0 : .unwrap_or_else(|| BACKGROUND_RUNTIME.handle())
783 0 : .spawn(async move {
784 0 : let mut interval: tokio::time::Interval =
785 0 : tokio::time::interval(METRICS_COLLECTION_INTERVAL);
786 : loop {
787 0 : interval.tick().await;
788 0 : tokio::task::spawn_blocking(|| {
789 0 : METRICS_COLLECTOR.run_once(true);
790 0 : });
791 : }
792 : })
793 0 : .map(|res| ("broker main".to_owned(), res));
794 0 : tasks_handles.push(Box::pin(metrics_handle));
795 0 : }
796 : /* END_HADRON */
797 :
798 0 : set_build_info_metric(GIT_VERSION, BUILD_TAG);
799 :
800 : // TODO: update tokio-stream, convert to real async Stream with
801 : // SignalStream, map it to obtain the missing signal name, and combine streams into
802 : // a single stream we can easily wait on.
803 0 : let mut sigquit_stream = signal(SignalKind::quit())?;
804 0 : let mut sigint_stream = signal(SignalKind::interrupt())?;
805 0 : let mut sigterm_stream = signal(SignalKind::terminate())?;
806 :
807 : // Notify systemd that we are ready. This is important as currently loading
808 : // timelines takes significant time (~30s in busy regions).
809 0 : if let Err(e) = sd_notify::notify(true, &[NotifyState::Ready]) {
810 0 : warn!("systemd notify failed: {:?}", e);
811 0 : }
812 :
813 0 : tokio::select! {
814 0 : Some((task_name, res)) = tasks_handles.next()=> {
815 0 : error!("{} task failed: {:?}, exiting", task_name, res);
816 0 : std::process::exit(1);
817 : }
818 : // On any shutdown signal, log its receipt and exit. Additionally, handling
819 : // SIGQUIT prevents a coredump.
820 0 : _ = sigquit_stream.recv() => info!("received SIGQUIT, terminating"),
821 0 : _ = sigint_stream.recv() => info!("received SIGINT, terminating"),
822 0 : _ = sigterm_stream.recv() => info!("received SIGTERM, terminating")
823 :
824 : };
825 0 : std::process::exit(0);
826 0 : }
827 :
828 : /// Determine safekeeper id.
829 0 : fn set_id(workdir: &Utf8Path, given_id: Option<NodeId>) -> Result<NodeId> {
830 0 : let id_file_path = workdir.join(ID_FILE_NAME);
831 :
832 : let my_id: NodeId;
833 : // If file with ID exists, read it in; otherwise set one passed.
834 0 : match fs::read(&id_file_path) {
835 0 : Ok(id_serialized) => {
836 : my_id = NodeId(
837 0 : std::str::from_utf8(&id_serialized)
838 0 : .context("failed to parse safekeeper id")?
839 0 : .parse()
840 0 : .context("failed to parse safekeeper id")?,
841 : );
842 0 : if let Some(given_id) = given_id {
843 0 : if given_id != my_id {
844 0 : bail!(
845 0 : "safekeeper already initialized with id {}, can't set {}",
846 : my_id,
847 : given_id
848 : );
849 0 : }
850 0 : }
851 0 : info!("safekeeper ID {}", my_id);
852 : }
853 0 : Err(error) => match error.kind() {
854 : ErrorKind::NotFound => {
855 0 : my_id = if let Some(given_id) = given_id {
856 0 : given_id
857 : } else {
858 0 : bail!("safekeeper id is not specified");
859 : };
860 0 : let mut f = File::create(&id_file_path)
861 0 : .with_context(|| format!("Failed to create id file at {id_file_path:?}"))?;
862 0 : f.write_all(my_id.to_string().as_bytes())?;
863 0 : f.sync_all()?;
864 0 : info!("initialized safekeeper id {}", my_id);
865 : }
866 : _ => {
867 0 : return Err(error.into());
868 : }
869 : },
870 : }
871 0 : Ok(my_id)
872 0 : }
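
// A minimal sketch of a unit test for set_id (not part of the original source). It assumes a
// writable std::env::temp_dir() that is valid UTF-8; the directory name is a placeholder.
#[test]
fn set_id_roundtrip() {
    let tmp = Utf8PathBuf::from_path_buf(std::env::temp_dir())
        .expect("temp dir should be valid UTF-8")
        .join(format!("safekeeper_set_id_test_{}", std::process::id()));
    let _ = fs::remove_dir_all(&tmp);
    fs::create_dir_all(&tmp).unwrap();

    // The first call persists the given id into safekeeper.id.
    assert!(set_id(&tmp, Some(NodeId(7))).unwrap() == NodeId(7));
    // Later calls read it back, with or without an explicit id.
    assert!(set_id(&tmp, None).unwrap() == NodeId(7));
    // A conflicting id is rejected.
    assert!(set_id(&tmp, Some(NodeId(8))).is_err());

    fs::remove_dir_all(&tmp).unwrap();
}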
873 :
874 0 : fn parse_remote_storage(storage_conf: &str) -> anyhow::Result<RemoteStorageConfig> {
875 0 : RemoteStorageConfig::from_toml(&storage_conf.parse()?)
876 0 : }
877 :
878 : #[test]
879 1 : fn verify_cli() {
880 : use clap::CommandFactory;
881 1 : Args::command().debug_assert()
882 1 : }
|