Line data Source code
1 : //
2 : // Main entry point for the safekeeper executable
3 : //
4 : use std::env::{VarError, var};
5 : use std::fs::{self, File};
6 : use std::io::{ErrorKind, Write};
7 : use std::str::FromStr;
8 : use std::sync::Arc;
9 : use std::time::{Duration, Instant};
10 :
11 : use anyhow::{Context, Result, bail};
12 : use camino::{Utf8Path, Utf8PathBuf};
13 : use clap::{ArgAction, Parser};
14 : use futures::future::BoxFuture;
15 : use futures::stream::FuturesUnordered;
16 : use futures::{FutureExt, StreamExt};
17 : use http_utils::tls_certs::ReloadingCertificateResolver;
18 : use metrics::set_build_info_metric;
19 : use remote_storage::RemoteStorageConfig;
20 : use reqwest::Certificate;
21 : use safekeeper::defaults::{
22 : DEFAULT_CONTROL_FILE_SAVE_INTERVAL, DEFAULT_EVICTION_MIN_RESIDENT, DEFAULT_HEARTBEAT_TIMEOUT,
23 : DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES, DEFAULT_PARTIAL_BACKUP_CONCURRENCY,
24 : DEFAULT_PARTIAL_BACKUP_TIMEOUT, DEFAULT_PG_LISTEN_ADDR, DEFAULT_SSL_CERT_FILE,
25 : DEFAULT_SSL_CERT_RELOAD_PERIOD, DEFAULT_SSL_KEY_FILE,
26 : };
27 : use safekeeper::{
28 : BACKGROUND_RUNTIME, BROKER_RUNTIME, GlobalTimelines, HTTP_RUNTIME, SafeKeeperConf,
29 : WAL_SERVICE_RUNTIME, broker, control_file, http, wal_backup, wal_service,
30 : };
31 : use sd_notify::NotifyState;
32 : use storage_broker::{DEFAULT_ENDPOINT, Uri};
33 : use tokio::runtime::Handle;
34 : use tokio::signal::unix::{SignalKind, signal};
35 : use tokio::task::JoinError;
36 : use tracing::*;
37 : use utils::auth::{JwtAuth, Scope, SwappableJwtAuth};
38 : use utils::id::NodeId;
39 : use utils::logging::{self, LogFormat, SecretString};
40 : use utils::sentry_init::init_sentry;
41 : use utils::{pid_file, project_build_tag, project_git_version, tcp_listener};
42 :
// Route every heap allocation of this binary through jemalloc.
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;

/// Configure jemalloc to profile heap allocations by sampling stack traces every 2 MB (1 << 21).
/// This adds roughly 3% overhead for allocations on average, which is acceptable considering
/// performance-sensitive code will avoid allocations as far as possible anyway.
// The static is exported under the exact symbol name `malloc_conf` (see `export_name` below);
// the Rust identifier is kept lowercase to match it, hence the lint allowance.
// The byte string is explicitly NUL-terminated.
#[allow(non_upper_case_globals)]
#[unsafe(export_name = "malloc_conf")]
pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:21\0";
52 :
// Name of the pid/lock file created inside the data directory; `main` claims it to
// prevent running multiple safekeepers on the same directory.
const PID_FILE_NAME: &str = "safekeeper.pid";
// Name of the file inside the data directory where `set_id` persists the node id.
const ID_FILE_NAME: &str = "safekeeper.id";

// These macros presumably define the `GIT_VERSION` and `BUILD_TAG` constants that are
// referenced throughout this file (version string, startup log, build info metric).
project_git_version!(GIT_VERSION);
project_build_tag!(BUILD_TAG);

// Names of the optional cargo features compiled into this binary; reported by `version()`.
// The list is empty unless the corresponding feature is enabled.
const FEATURES: &[&str] = &[
    #[cfg(feature = "testing")]
    "testing",
];
63 :
64 0 : fn version() -> String {
65 0 : format!(
66 0 : "{GIT_VERSION} failpoints: {}, features: {:?}",
67 0 : fail::has_failpoints(),
68 0 : FEATURES,
69 0 : )
70 0 : }
71 :
// Short program description handed to clap as the `about` text of `Args`
// (shown in the `--help` output).
const ABOUT: &str = r#"
A fleet of safekeepers is responsible for reliably storing WAL received from
compute, passing it through consensus (mitigating potential computes brain
split), and serving the hardened part further downstream to pageserver(s).
"#;
77 :
78 : #[derive(Parser)]
79 : #[command(name = "Neon safekeeper", version = GIT_VERSION, about = ABOUT, long_about = None)]
80 : struct Args {
81 : /// Path to the safekeeper data directory.
82 : #[arg(short = 'D', long, default_value = "./")]
83 0 : datadir: Utf8PathBuf,
84 : /// Safekeeper node id.
85 : #[arg(long)]
86 : id: Option<u64>,
87 : /// Initialize safekeeper with given id and exit.
88 : #[arg(long)]
89 0 : init: bool,
90 : /// Listen endpoint for receiving/sending WAL in the form host:port.
91 : #[arg(short, long, default_value = DEFAULT_PG_LISTEN_ADDR)]
92 0 : listen_pg: String,
93 : /// Listen endpoint for receiving/sending WAL in the form host:port allowing
94 : /// only tenant scoped auth tokens. Pointless if auth is disabled.
95 : #[arg(long, default_value = None, verbatim_doc_comment)]
96 : listen_pg_tenant_only: Option<String>,
97 : /// Listen http endpoint for management and metrics in the form host:port.
98 : #[arg(long, default_value = DEFAULT_HTTP_LISTEN_ADDR)]
99 0 : listen_http: String,
100 : /// Listen https endpoint for management and metrics in the form host:port.
101 : #[arg(long, default_value = None)]
102 : listen_https: Option<String>,
103 : /// Advertised endpoint for receiving/sending WAL in the form host:port. If not
104 : /// specified, listen_pg is used to advertise instead.
105 : #[arg(long, default_value = None)]
106 : advertise_pg: Option<String>,
107 : /// Availability zone of the safekeeper.
108 : #[arg(long)]
109 : availability_zone: Option<String>,
110 : /// Do not wait for changes to be written safely to disk. Unsafe.
111 : #[arg(short, long)]
112 0 : no_sync: bool,
113 : /// Dump control file at path specified by this argument and exit.
114 : #[arg(long)]
115 : dump_control_file: Option<Utf8PathBuf>,
116 : /// Broker endpoint for storage nodes coordination in the form
117 : /// http[s]://host:port. In case of https schema TLS is connection is
118 : /// established; plaintext otherwise.
119 : #[arg(long, default_value = DEFAULT_ENDPOINT, verbatim_doc_comment)]
120 0 : broker_endpoint: Uri,
121 : /// Broker keepalive interval.
122 : #[arg(long, value_parser= humantime::parse_duration, default_value = storage_broker::DEFAULT_KEEPALIVE_INTERVAL)]
123 0 : broker_keepalive_interval: Duration,
124 : /// Peer safekeeper is considered dead after not receiving heartbeats from
125 : /// it during this period passed as a human readable duration.
126 : #[arg(long, value_parser= humantime::parse_duration, default_value = DEFAULT_HEARTBEAT_TIMEOUT, verbatim_doc_comment)]
127 0 : heartbeat_timeout: Duration,
128 : /// Enable/disable peer recovery.
129 : #[arg(long, default_value = "false", action=ArgAction::Set)]
130 0 : peer_recovery: bool,
131 : /// Remote storage configuration for WAL backup (offloading to s3) as TOML
132 : /// inline table, e.g.
133 : /// {max_concurrent_syncs = 17, max_sync_errors = 13, bucket_name = "<BUCKETNAME>", bucket_region = "<REGION>", concurrency_limit = 119}
134 : /// Safekeeper offloads WAL to
135 : /// [prefix_in_bucket/]<tenant_id>/<timeline_id>/<segment_file>, mirroring
136 : /// structure on the file system.
137 : #[arg(long, value_parser = parse_remote_storage, verbatim_doc_comment)]
138 : remote_storage: Option<RemoteStorageConfig>,
139 : /// Safekeeper won't be elected for WAL offloading if it is lagging for more than this value in bytes
140 1 : #[arg(long, default_value_t = DEFAULT_MAX_OFFLOADER_LAG_BYTES)]
141 0 : max_offloader_lag: u64,
142 : /// Number of max parallel WAL segments to be offloaded to remote storage.
143 : #[arg(long, default_value = "5")]
144 0 : wal_backup_parallel_jobs: usize,
145 : /// Disable WAL backup to s3. When disabled, safekeeper removes WAL ignoring
146 : /// WAL backup horizon.
147 : #[arg(long)]
148 0 : disable_wal_backup: bool,
149 : /// If given, enables auth on incoming connections to WAL service endpoint
150 : /// (--listen-pg). Value specifies path to a .pem public key used for
151 : /// validations of JWT tokens. Empty string is allowed and means disabling
152 : /// auth.
153 : #[arg(long, verbatim_doc_comment, value_parser = opt_pathbuf_parser)]
154 : pg_auth_public_key_path: Option<Utf8PathBuf>,
155 : /// If given, enables auth on incoming connections to tenant only WAL
156 : /// service endpoint (--listen-pg-tenant-only). Value specifies path to a
157 : /// .pem public key used for validations of JWT tokens. Empty string is
158 : /// allowed and means disabling auth.
159 : #[arg(long, verbatim_doc_comment, value_parser = opt_pathbuf_parser)]
160 : pg_tenant_only_auth_public_key_path: Option<Utf8PathBuf>,
161 : /// If given, enables auth on incoming connections to http management
162 : /// service endpoint (--listen-http). Value specifies path to a .pem public
163 : /// key used for validations of JWT tokens. Empty string is allowed and
164 : /// means disabling auth.
165 : #[arg(long, verbatim_doc_comment, value_parser = opt_pathbuf_parser)]
166 : http_auth_public_key_path: Option<Utf8PathBuf>,
167 : /// Format for logging, either 'plain' or 'json'.
168 : #[arg(long, default_value = "plain")]
169 0 : log_format: String,
170 : /// Run everything in single threaded current thread runtime, might be
171 : /// useful for debugging.
172 : #[arg(long)]
173 0 : current_thread_runtime: bool,
174 : /// Keep horizon for walsenders, i.e. don't remove WAL segments that are
175 : /// still needed for existing replication connection.
176 : #[arg(long)]
177 0 : walsenders_keep_horizon: bool,
178 : /// Controls how long backup will wait until uploading the partial segment.
179 : #[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_PARTIAL_BACKUP_TIMEOUT, verbatim_doc_comment)]
180 0 : partial_backup_timeout: Duration,
181 : /// Disable task to push messages to broker every second. Supposed to
182 : /// be used in tests.
183 : #[arg(long)]
184 0 : disable_periodic_broker_push: bool,
185 : /// Enable automatic switching to offloaded state.
186 : #[arg(long)]
187 0 : enable_offload: bool,
188 : /// Delete local WAL files after offloading. When disabled, they will be left on disk.
189 : #[arg(long)]
190 0 : delete_offloaded_wal: bool,
191 : /// Pending updates to control file will be automatically saved after this interval.
192 : #[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_CONTROL_FILE_SAVE_INTERVAL)]
193 0 : control_file_save_interval: Duration,
194 : /// Number of allowed concurrent uploads of partial segments to remote storage.
195 : #[arg(long, default_value = DEFAULT_PARTIAL_BACKUP_CONCURRENCY)]
196 0 : partial_backup_concurrency: usize,
197 : /// How long a timeline must be resident before it is eligible for eviction.
198 : /// Usually, timeline eviction has to wait for `partial_backup_timeout` before being eligible for eviction,
199 : /// but if a timeline is un-evicted and then _not_ written to, it would immediately flap to evicting again,
200 : /// if it weren't for `eviction_min_resident` preventing that.
201 : ///
202 : /// Also defines interval for eviction retries.
203 : #[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_EVICTION_MIN_RESIDENT)]
204 0 : eviction_min_resident: Duration,
205 : /// Enable fanning out WAL to different shards from the same reader
206 : #[arg(long)]
207 0 : wal_reader_fanout: bool,
208 : /// Only fan out the WAL reader if the absoulte delta between the new requested position
209 : /// and the current position of the reader is smaller than this value.
210 : #[arg(long)]
211 : max_delta_for_fanout: Option<u64>,
212 : /// Path to a file with certificate's private key for https API.
213 : #[arg(long, default_value = DEFAULT_SSL_KEY_FILE)]
214 0 : ssl_key_file: Utf8PathBuf,
215 : /// Path to a file with a X509 certificate for https API.
216 : #[arg(long, default_value = DEFAULT_SSL_CERT_FILE)]
217 0 : ssl_cert_file: Utf8PathBuf,
218 : /// Period to reload certificate and private key from files.
219 : #[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_SSL_CERT_RELOAD_PERIOD)]
220 0 : ssl_cert_reload_period: Duration,
221 : /// Trusted root CA certificates to use in https APIs.
222 : #[arg(long)]
223 : ssl_ca_file: Option<Utf8PathBuf>,
224 : /// Flag to use https for requests to peer's safekeeper API.
225 : #[arg(long)]
226 0 : use_https_safekeeper_api: bool,
227 : /// Path to the JWT auth token used to authenticate with other safekeepers.
228 : #[arg(long)]
229 : auth_token_path: Option<Utf8PathBuf>,
230 : /// Enable TLS in wall service API.
231 : /// Does not force TLS: the client negotiates TLS usage during the handshake.
232 : /// Uses key and certificate from ssl_key_file/ssl_cert_file.
233 : #[arg(long)]
234 0 : enable_tls_wall_service_api: bool,
235 : }
236 :
237 : // Like PathBufValueParser, but allows empty string.
238 0 : fn opt_pathbuf_parser(s: &str) -> Result<Utf8PathBuf, String> {
239 0 : Ok(Utf8PathBuf::from_str(s).unwrap())
240 0 : }
241 :
242 : #[tokio::main(flavor = "current_thread")]
243 0 : async fn main() -> anyhow::Result<()> {
244 0 : // We want to allow multiple occurences of the same arg (taking the last) so
245 0 : // that neon_local could generate command with defaults + overrides without
246 0 : // getting 'argument cannot be used multiple times' error. This seems to be
247 0 : // impossible with pure Derive API, so convert struct to Command, modify it,
248 0 : // parse arguments, and then fill the struct back.
249 0 : let cmd = <Args as clap::CommandFactory>::command()
250 0 : .args_override_self(true)
251 0 : .version(version());
252 0 : let mut matches = cmd.get_matches();
253 0 : let mut args = <Args as clap::FromArgMatches>::from_arg_matches_mut(&mut matches)?;
254 0 :
255 0 : // I failed to modify opt_pathbuf_parser to return Option<PathBuf> in
256 0 : // reasonable time, so turn empty string into option post factum.
257 0 : if let Some(pb) = &args.pg_auth_public_key_path {
258 0 : if pb.as_os_str().is_empty() {
259 0 : args.pg_auth_public_key_path = None;
260 0 : }
261 0 : }
262 0 : if let Some(pb) = &args.pg_tenant_only_auth_public_key_path {
263 0 : if pb.as_os_str().is_empty() {
264 0 : args.pg_tenant_only_auth_public_key_path = None;
265 0 : }
266 0 : }
267 0 : if let Some(pb) = &args.http_auth_public_key_path {
268 0 : if pb.as_os_str().is_empty() {
269 0 : args.http_auth_public_key_path = None;
270 0 : }
271 0 : }
272 0 :
273 0 : if let Some(addr) = args.dump_control_file {
274 0 : let state = control_file::FileStorage::load_control_file(addr)?;
275 0 : let json = serde_json::to_string(&state)?;
276 0 : print!("{json}");
277 0 : return Ok(());
278 0 : }
279 0 :
280 0 : // important to keep the order of:
281 0 : // 1. init logging
282 0 : // 2. tracing panic hook
283 0 : // 3. sentry
284 0 : logging::init(
285 0 : LogFormat::from_config(&args.log_format)?,
286 0 : logging::TracingErrorLayerEnablement::Disabled,
287 0 : logging::Output::Stdout,
288 0 : )?;
289 0 : logging::replace_panic_hook_with_tracing_panic_hook().forget();
290 0 : info!("version: {GIT_VERSION}");
291 0 : info!("buld_tag: {BUILD_TAG}");
292 0 :
293 0 : let args_workdir = &args.datadir;
294 0 : let workdir = args_workdir.canonicalize_utf8().with_context(|| {
295 0 : format!("Failed to get the absolute path for input workdir {args_workdir:?}")
296 0 : })?;
297 0 :
298 0 : // Change into the data directory.
299 0 : std::env::set_current_dir(&workdir)?;
300 0 :
301 0 : // Prevent running multiple safekeepers on the same directory
302 0 : let lock_file_path = workdir.join(PID_FILE_NAME);
303 0 : let lock_file =
304 0 : pid_file::claim_for_current_process(&lock_file_path).context("claim pid file")?;
305 0 : info!("claimed pid file at {lock_file_path:?}");
306 0 : // ensure that the lock file is held even if the main thread of the process is panics
307 0 : // we need to release the lock file only when the current process is gone
308 0 : std::mem::forget(lock_file);
309 0 :
310 0 : // Set or read our ID.
311 0 : let id = set_id(&workdir, args.id.map(NodeId))?;
312 0 : if args.init {
313 0 : return Ok(());
314 0 : }
315 0 :
316 0 : let pg_auth = match args.pg_auth_public_key_path.as_ref() {
317 0 : None => {
318 0 : info!("pg auth is disabled");
319 0 : None
320 0 : }
321 0 : Some(path) => {
322 0 : info!("loading pg auth JWT key from {path}");
323 0 : Some(Arc::new(
324 0 : JwtAuth::from_key_path(path).context("failed to load the auth key")?,
325 0 : ))
326 0 : }
327 0 : };
328 0 : let pg_tenant_only_auth = match args.pg_tenant_only_auth_public_key_path.as_ref() {
329 0 : None => {
330 0 : info!("pg tenant only auth is disabled");
331 0 : None
332 0 : }
333 0 : Some(path) => {
334 0 : info!("loading pg tenant only auth JWT key from {path}");
335 0 : Some(Arc::new(
336 0 : JwtAuth::from_key_path(path).context("failed to load the auth key")?,
337 0 : ))
338 0 : }
339 0 : };
340 0 : let http_auth = match args.http_auth_public_key_path.as_ref() {
341 0 : None => {
342 0 : info!("http auth is disabled");
343 0 : None
344 0 : }
345 0 : Some(path) => {
346 0 : info!("loading http auth JWT key(s) from {path}");
347 0 : let jwt_auth = JwtAuth::from_key_path(path).context("failed to load the auth key")?;
348 0 : Some(Arc::new(SwappableJwtAuth::new(jwt_auth)))
349 0 : }
350 0 : };
351 0 :
352 0 : // Load JWT auth token to connect to other safekeepers for pull_timeline.
353 0 : // First check if the env var is present, then check the arg with the path.
354 0 : // We want to deprecate and remove the env var method in the future.
355 0 : let sk_auth_token = match var("SAFEKEEPER_AUTH_TOKEN") {
356 0 : Ok(v) => {
357 0 : info!("loaded JWT token for authentication with safekeepers");
358 0 : Some(SecretString::from(v))
359 0 : }
360 0 : Err(VarError::NotPresent) => {
361 0 : if let Some(auth_token_path) = args.auth_token_path.as_ref() {
362 0 : info!(
363 0 : "loading JWT token for authentication with safekeepers from {auth_token_path}"
364 0 : );
365 0 : let auth_token = tokio::fs::read_to_string(auth_token_path).await?;
366 0 : Some(SecretString::from(auth_token.trim().to_owned()))
367 0 : } else {
368 0 : info!("no JWT token for authentication with safekeepers detected");
369 0 : None
370 0 : }
371 0 : }
372 0 : Err(_) => {
373 0 : warn!("JWT token for authentication with safekeepers is not unicode");
374 0 : None
375 0 : }
376 0 : };
377 0 :
378 0 : let ssl_ca_certs = match args.ssl_ca_file.as_ref() {
379 0 : Some(ssl_ca_file) => {
380 0 : tracing::info!("Using ssl root CA file: {ssl_ca_file:?}");
381 0 : let buf = tokio::fs::read(ssl_ca_file).await?;
382 0 : Certificate::from_pem_bundle(&buf)?
383 0 : }
384 0 : None => Vec::new(),
385 0 : };
386 0 :
387 0 : let conf = Arc::new(SafeKeeperConf {
388 0 : workdir,
389 0 : my_id: id,
390 0 : listen_pg_addr: args.listen_pg,
391 0 : listen_pg_addr_tenant_only: args.listen_pg_tenant_only,
392 0 : listen_http_addr: args.listen_http,
393 0 : listen_https_addr: args.listen_https,
394 0 : advertise_pg_addr: args.advertise_pg,
395 0 : availability_zone: args.availability_zone,
396 0 : no_sync: args.no_sync,
397 0 : broker_endpoint: args.broker_endpoint,
398 0 : broker_keepalive_interval: args.broker_keepalive_interval,
399 0 : heartbeat_timeout: args.heartbeat_timeout,
400 0 : peer_recovery_enabled: args.peer_recovery,
401 0 : remote_storage: args.remote_storage,
402 0 : max_offloader_lag_bytes: args.max_offloader_lag,
403 0 : wal_backup_enabled: !args.disable_wal_backup,
404 0 : backup_parallel_jobs: args.wal_backup_parallel_jobs,
405 0 : pg_auth,
406 0 : pg_tenant_only_auth,
407 0 : http_auth,
408 0 : sk_auth_token,
409 0 : current_thread_runtime: args.current_thread_runtime,
410 0 : walsenders_keep_horizon: args.walsenders_keep_horizon,
411 0 : partial_backup_timeout: args.partial_backup_timeout,
412 0 : disable_periodic_broker_push: args.disable_periodic_broker_push,
413 0 : enable_offload: args.enable_offload,
414 0 : delete_offloaded_wal: args.delete_offloaded_wal,
415 0 : control_file_save_interval: args.control_file_save_interval,
416 0 : partial_backup_concurrency: args.partial_backup_concurrency,
417 0 : eviction_min_resident: args.eviction_min_resident,
418 0 : wal_reader_fanout: args.wal_reader_fanout,
419 0 : max_delta_for_fanout: args.max_delta_for_fanout,
420 0 : ssl_key_file: args.ssl_key_file,
421 0 : ssl_cert_file: args.ssl_cert_file,
422 0 : ssl_cert_reload_period: args.ssl_cert_reload_period,
423 0 : ssl_ca_certs,
424 0 : use_https_safekeeper_api: args.use_https_safekeeper_api,
425 0 : enable_tls_wall_service_api: args.enable_tls_wall_service_api,
426 0 : });
427 0 :
428 0 : // initialize sentry if SENTRY_DSN is provided
429 0 : let _sentry_guard = init_sentry(
430 0 : Some(GIT_VERSION.into()),
431 0 : &[("node_id", &conf.my_id.to_string())],
432 0 : );
433 0 : start_safekeeper(conf).await
434 0 : }
435 :
/// Result of joining any of the main tasks. The outer `Result` is an error when
/// the task itself failed to complete (e.g. it panicked); the inner
/// `anyhow::Result` is whatever the task returned.
type JoinTaskRes = Result<anyhow::Result<()>, JoinError>;
439 :
440 0 : async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
441 0 : // fsync the datadir to make sure we have a consistent state on disk.
442 0 : if !conf.no_sync {
443 0 : let dfd = File::open(&conf.workdir).context("open datadir for syncfs")?;
444 0 : let started = Instant::now();
445 0 : utils::crashsafe::syncfs(dfd)?;
446 0 : let elapsed = started.elapsed();
447 0 : info!(
448 0 : elapsed_ms = elapsed.as_millis(),
449 0 : "syncfs data directory done"
450 : );
451 0 : }
452 :
453 0 : info!("starting safekeeper WAL service on {}", conf.listen_pg_addr);
454 0 : let pg_listener = tcp_listener::bind(conf.listen_pg_addr.clone()).map_err(|e| {
455 0 : error!("failed to bind to address {}: {}", conf.listen_pg_addr, e);
456 0 : e
457 0 : })?;
458 :
459 0 : let pg_listener_tenant_only =
460 0 : if let Some(listen_pg_addr_tenant_only) = &conf.listen_pg_addr_tenant_only {
461 0 : info!(
462 0 : "starting safekeeper tenant scoped WAL service on {}",
463 : listen_pg_addr_tenant_only
464 : );
465 0 : let listener = tcp_listener::bind(listen_pg_addr_tenant_only.clone()).map_err(|e| {
466 0 : error!(
467 0 : "failed to bind to address {}: {}",
468 : listen_pg_addr_tenant_only, e
469 : );
470 0 : e
471 0 : })?;
472 0 : Some(listener)
473 : } else {
474 0 : None
475 : };
476 :
477 0 : info!(
478 0 : "starting safekeeper HTTP service on {}",
479 0 : conf.listen_http_addr
480 : );
481 0 : let http_listener = tcp_listener::bind(conf.listen_http_addr.clone()).map_err(|e| {
482 0 : error!("failed to bind to address {}: {}", conf.listen_http_addr, e);
483 0 : e
484 0 : })?;
485 :
486 0 : let https_listener = match conf.listen_https_addr.as_ref() {
487 0 : Some(listen_https_addr) => {
488 0 : info!("starting safekeeper HTTPS service on {}", listen_https_addr);
489 0 : Some(tcp_listener::bind(listen_https_addr).map_err(|e| {
490 0 : error!("failed to bind to address {}: {}", listen_https_addr, e);
491 0 : e
492 0 : })?)
493 : }
494 0 : None => None,
495 : };
496 :
497 0 : let global_timelines = Arc::new(GlobalTimelines::new(conf.clone()));
498 0 :
499 0 : // Register metrics collector for active timelines. It's important to do this
500 0 : // after daemonizing, otherwise process collector will be upset.
501 0 : let timeline_collector = safekeeper::metrics::TimelineCollector::new(global_timelines.clone());
502 0 : metrics::register_internal(Box::new(timeline_collector))?;
503 :
504 0 : wal_backup::init_remote_storage(&conf).await;
505 :
506 : // Keep handles to main tasks to die if any of them disappears.
507 0 : let mut tasks_handles: FuturesUnordered<BoxFuture<(String, JoinTaskRes)>> =
508 0 : FuturesUnordered::new();
509 0 :
510 0 : // Start wal backup launcher before loading timelines as we'll notify it
511 0 : // through the channel about timelines which need offloading, not draining
512 0 : // the channel would cause deadlock.
513 0 : let current_thread_rt = conf
514 0 : .current_thread_runtime
515 0 : .then(|| Handle::try_current().expect("no runtime in main"));
516 0 :
517 0 : // Load all timelines from disk to memory.
518 0 : global_timelines.init().await?;
519 :
520 : // Run everything in current thread rt, if asked.
521 0 : if conf.current_thread_runtime {
522 0 : info!("running in current thread runtime");
523 0 : }
524 :
525 0 : let tls_server_config = if conf.listen_https_addr.is_some() || conf.enable_tls_wall_service_api
526 : {
527 0 : let ssl_key_file = conf.ssl_key_file.clone();
528 0 : let ssl_cert_file = conf.ssl_cert_file.clone();
529 0 : let ssl_cert_reload_period = conf.ssl_cert_reload_period;
530 :
531 : // Create resolver in BACKGROUND_RUNTIME, so the background certificate reloading
532 : // task is run in this runtime.
533 0 : let cert_resolver = current_thread_rt
534 0 : .as_ref()
535 0 : .unwrap_or_else(|| BACKGROUND_RUNTIME.handle())
536 0 : .spawn(async move {
537 0 : ReloadingCertificateResolver::new(
538 0 : "main",
539 0 : &ssl_key_file,
540 0 : &ssl_cert_file,
541 0 : ssl_cert_reload_period,
542 0 : )
543 0 : .await
544 0 : })
545 0 : .await??;
546 :
547 0 : let config = rustls::ServerConfig::builder()
548 0 : .with_no_client_auth()
549 0 : .with_cert_resolver(cert_resolver);
550 0 :
551 0 : Some(Arc::new(config))
552 : } else {
553 0 : None
554 : };
555 :
556 0 : let wal_service_handle = current_thread_rt
557 0 : .as_ref()
558 0 : .unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle())
559 0 : .spawn(wal_service::task_main(
560 0 : conf.clone(),
561 0 : pg_listener,
562 0 : Scope::SafekeeperData,
563 0 : conf.enable_tls_wall_service_api
564 0 : .then(|| tls_server_config.clone())
565 0 : .flatten(),
566 0 : global_timelines.clone(),
567 0 : ))
568 0 : // wrap with task name for error reporting
569 0 : .map(|res| ("WAL service main".to_owned(), res));
570 0 : tasks_handles.push(Box::pin(wal_service_handle));
571 0 :
572 0 : let global_timelines_ = global_timelines.clone();
573 0 : let timeline_housekeeping_handle = current_thread_rt
574 0 : .as_ref()
575 0 : .unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle())
576 0 : .spawn(async move {
577 : const TOMBSTONE_TTL: Duration = Duration::from_secs(3600 * 24);
578 : loop {
579 0 : tokio::time::sleep(TOMBSTONE_TTL).await;
580 0 : global_timelines_.housekeeping(&TOMBSTONE_TTL);
581 : }
582 0 : })
583 0 : .map(|res| ("Timeline map housekeeping".to_owned(), res));
584 0 : tasks_handles.push(Box::pin(timeline_housekeeping_handle));
585 :
586 0 : if let Some(pg_listener_tenant_only) = pg_listener_tenant_only {
587 0 : let wal_service_handle = current_thread_rt
588 0 : .as_ref()
589 0 : .unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle())
590 0 : .spawn(wal_service::task_main(
591 0 : conf.clone(),
592 0 : pg_listener_tenant_only,
593 0 : Scope::Tenant,
594 0 : conf.enable_tls_wall_service_api
595 0 : .then(|| tls_server_config.clone())
596 0 : .flatten(),
597 0 : global_timelines.clone(),
598 0 : ))
599 0 : // wrap with task name for error reporting
600 0 : .map(|res| ("WAL service tenant only main".to_owned(), res));
601 0 : tasks_handles.push(Box::pin(wal_service_handle));
602 0 : }
603 :
604 0 : let http_handle = current_thread_rt
605 0 : .as_ref()
606 0 : .unwrap_or_else(|| HTTP_RUNTIME.handle())
607 0 : .spawn(http::task_main_http(
608 0 : conf.clone(),
609 0 : http_listener,
610 0 : global_timelines.clone(),
611 0 : ))
612 0 : .map(|res| ("HTTP service main".to_owned(), res));
613 0 : tasks_handles.push(Box::pin(http_handle));
614 :
615 0 : if let Some(https_listener) = https_listener {
616 0 : let https_handle = current_thread_rt
617 0 : .as_ref()
618 0 : .unwrap_or_else(|| HTTP_RUNTIME.handle())
619 0 : .spawn(http::task_main_https(
620 0 : conf.clone(),
621 0 : https_listener,
622 0 : tls_server_config.expect("tls_server_config is set earlier if https is enabled"),
623 0 : global_timelines.clone(),
624 0 : ))
625 0 : .map(|res| ("HTTPS service main".to_owned(), res));
626 0 : tasks_handles.push(Box::pin(https_handle));
627 0 : }
628 :
629 0 : let broker_task_handle = current_thread_rt
630 0 : .as_ref()
631 0 : .unwrap_or_else(|| BROKER_RUNTIME.handle())
632 0 : .spawn(
633 0 : broker::task_main(conf.clone(), global_timelines.clone())
634 0 : .instrument(info_span!("broker")),
635 : )
636 0 : .map(|res| ("broker main".to_owned(), res));
637 0 : tasks_handles.push(Box::pin(broker_task_handle));
638 0 :
639 0 : set_build_info_metric(GIT_VERSION, BUILD_TAG);
640 :
641 : // TODO: update tokio-stream, convert to real async Stream with
642 : // SignalStream, map it to obtain missing signal name, combine streams into
643 : // single stream we can easily sit on.
644 0 : let mut sigquit_stream = signal(SignalKind::quit())?;
645 0 : let mut sigint_stream = signal(SignalKind::interrupt())?;
646 0 : let mut sigterm_stream = signal(SignalKind::terminate())?;
647 :
648 : // Notify systemd that we are ready. This is important as currently loading
649 : // timelines takes significant time (~30s in busy regions).
650 0 : if let Err(e) = sd_notify::notify(true, &[NotifyState::Ready]) {
651 0 : warn!("systemd notify failed: {:?}", e);
652 0 : }
653 :
654 0 : tokio::select! {
655 0 : Some((task_name, res)) = tasks_handles.next()=> {
656 0 : error!("{} task failed: {:?}, exiting", task_name, res);
657 0 : std::process::exit(1);
658 : }
659 : // On any shutdown signal, log receival and exit. Additionally, handling
660 : // SIGQUIT prevents coredump.
661 0 : _ = sigquit_stream.recv() => info!("received SIGQUIT, terminating"),
662 0 : _ = sigint_stream.recv() => info!("received SIGINT, terminating"),
663 0 : _ = sigterm_stream.recv() => info!("received SIGTERM, terminating")
664 :
665 : };
666 0 : std::process::exit(0);
667 0 : }
668 :
669 : /// Determine safekeeper id.
670 0 : fn set_id(workdir: &Utf8Path, given_id: Option<NodeId>) -> Result<NodeId> {
671 0 : let id_file_path = workdir.join(ID_FILE_NAME);
672 0 :
673 0 : let my_id: NodeId;
674 0 : // If file with ID exists, read it in; otherwise set one passed.
675 0 : match fs::read(&id_file_path) {
676 0 : Ok(id_serialized) => {
677 0 : my_id = NodeId(
678 0 : std::str::from_utf8(&id_serialized)
679 0 : .context("failed to parse safekeeper id")?
680 0 : .parse()
681 0 : .context("failed to parse safekeeper id")?,
682 : );
683 0 : if let Some(given_id) = given_id {
684 0 : if given_id != my_id {
685 0 : bail!(
686 0 : "safekeeper already initialized with id {}, can't set {}",
687 0 : my_id,
688 0 : given_id
689 0 : );
690 0 : }
691 0 : }
692 0 : info!("safekeeper ID {}", my_id);
693 : }
694 0 : Err(error) => match error.kind() {
695 : ErrorKind::NotFound => {
696 0 : my_id = if let Some(given_id) = given_id {
697 0 : given_id
698 : } else {
699 0 : bail!("safekeeper id is not specified");
700 : };
701 0 : let mut f = File::create(&id_file_path)
702 0 : .with_context(|| format!("Failed to create id file at {id_file_path:?}"))?;
703 0 : f.write_all(my_id.to_string().as_bytes())?;
704 0 : f.sync_all()?;
705 0 : info!("initialized safekeeper id {}", my_id);
706 : }
707 : _ => {
708 0 : return Err(error.into());
709 : }
710 : },
711 : }
712 0 : Ok(my_id)
713 0 : }
714 :
715 0 : fn parse_remote_storage(storage_conf: &str) -> anyhow::Result<RemoteStorageConfig> {
716 0 : RemoteStorageConfig::from_toml(&storage_conf.parse()?)
717 0 : }
718 :
/// Lets clap run its internal consistency checks on the command derived from `Args`.
#[test]
fn verify_cli() {
    let command = <Args as clap::CommandFactory>::command();
    command.debug_assert();
}
|