Line data Source code
1 : //! Main entry point for the Page Server executable.
2 :
3 : use std::env::{var, VarError};
4 : use std::sync::Arc;
5 : use std::{env, ops::ControlFlow, path::Path, str::FromStr};
6 :
7 : use anyhow::{anyhow, Context};
8 : use clap::{Arg, ArgAction, Command};
9 :
10 : use metrics::launch_timestamp::{set_launch_timestamp_metric, LaunchTimestamp};
11 : use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task};
12 : use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING};
13 : use pageserver::task_mgr::WALRECEIVER_RUNTIME;
14 : use pageserver::tenant::TenantSharedResources;
15 : use remote_storage::GenericRemoteStorage;
16 : use tokio::time::Instant;
17 : use tracing::*;
18 :
19 : use metrics::set_build_info_metric;
20 : use pageserver::{
21 : config::{defaults::*, PageServerConf},
22 : context::{DownloadBehavior, RequestContext},
23 : http, page_cache, page_service, task_mgr,
24 : task_mgr::TaskKind,
25 : task_mgr::{BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME},
26 : tenant::mgr,
27 : virtual_file,
28 : };
29 : use postgres_backend::AuthType;
30 : use utils::logging::TracingErrorLayerEnablement;
31 : use utils::signals::ShutdownSignals;
32 : use utils::{
33 : auth::JwtAuth, logging, project_git_version, sentry_init::init_sentry, signals::Signal,
34 : tcp_listener,
35 : };
36 :
37 : project_git_version!(GIT_VERSION);
38 :
39 : const PID_FILE_NAME: &str = "pageserver.pid";
40 :
41 : const FEATURES: &[&str] = &[
42 : #[cfg(feature = "testing")]
43 : "testing",
44 : ];
45 :
46 1893 : fn version() -> String {
47 1893 : format!(
48 1893 : "{GIT_VERSION} failpoints: {}, features: {:?}",
49 1893 : fail::has_failpoints(),
50 1893 : FEATURES,
51 1893 : )
52 1893 : }
53 :
54 1317 : fn main() -> anyhow::Result<()> {
55 1317 : let launch_ts = Box::leak(Box::new(LaunchTimestamp::generate()));
56 1317 :
57 1317 : let arg_matches = cli().get_matches();
58 1317 :
59 1317 : if arg_matches.get_flag("enabled-features") {
60 0 : println!("{{\"features\": {FEATURES:?} }}");
61 0 : return Ok(());
62 1317 : }
63 1317 :
64 1317 : let workdir = arg_matches
65 1317 : .get_one::<String>("workdir")
66 1317 : .map(Path::new)
67 1317 : .unwrap_or_else(|| Path::new(".neon"));
68 1317 : let workdir = workdir
69 1317 : .canonicalize()
70 1317 : .with_context(|| format!("Error opening workdir '{}'", workdir.display()))?;
71 :
72 1317 : let cfg_file_path = workdir.join("pageserver.toml");
73 1317 :
74 1317 : // Set CWD to workdir for non-daemon modes
75 1317 : env::set_current_dir(&workdir).with_context(|| {
76 0 : format!(
77 0 : "Failed to set application's current dir to '{}'",
78 0 : workdir.display()
79 0 : )
80 1317 : })?;
81 :
82 1317 : let conf = match initialize_config(&cfg_file_path, arg_matches, &workdir)? {
83 575 : ControlFlow::Continue(conf) => conf,
84 : ControlFlow::Break(()) => {
85 370 : info!("Pageserver config init successful");
86 370 : return Ok(());
87 : }
88 : };
89 :
90 : // Initialize logging.
91 : //
92 : // It must be initialized before the custom panic hook is installed below.
93 : //
94 : // Regarding tracing_error enablement: at this time, we only use the
95 : // tracing_error crate to debug_assert that log spans contain tenant and timeline ids.
96 : // See `debug_assert_current_span_has_tenant_and_timeline_id` in the timeline module
97 575 : let tracing_error_layer_enablement = if cfg!(debug_assertions) {
98 575 : TracingErrorLayerEnablement::EnableWithRustLogFilter
99 : } else {
100 0 : TracingErrorLayerEnablement::Disabled
101 : };
102 575 : logging::init(conf.log_format, tracing_error_layer_enablement)?;
103 :
104 : // mind the order required here: 1. logging, 2. panic_hook, 3. sentry.
105 : // disarming this hook on pageserver, because we never tear down tracing.
106 575 : logging::replace_panic_hook_with_tracing_panic_hook().forget();
107 575 :
108 575 : // initialize sentry if SENTRY_DSN is provided
109 575 : let _sentry_guard = init_sentry(
110 575 : Some(GIT_VERSION.into()),
111 575 : &[("node_id", &conf.id.to_string())],
112 575 : );
113 575 :
114 575 : let tenants_path = conf.tenants_path();
115 575 : if !tenants_path.exists() {
116 369 : utils::crashsafe::create_dir_all(conf.tenants_path()).with_context(|| {
117 0 : format!(
118 0 : "Failed to create tenants root dir at '{}'",
119 0 : tenants_path.display()
120 0 : )
121 369 : })?;
122 206 : }
123 :
124 : // Initialize up failpoints support
125 575 : let scenario = pageserver::failpoint_support::init();
126 575 :
127 575 : // Basic initialization of things that don't change after startup
128 575 : virtual_file::init(conf.max_file_descriptors);
129 575 : page_cache::init(conf.page_cache_size);
130 575 :
131 575 : start_pageserver(launch_ts, conf).context("Failed to start pageserver")?;
132 :
133 0 : scenario.teardown();
134 0 : Ok(())
135 373 : }
136 :
137 948 : fn initialize_config(
138 948 : cfg_file_path: &Path,
139 948 : arg_matches: clap::ArgMatches,
140 948 : workdir: &Path,
141 948 : ) -> anyhow::Result<ControlFlow<(), &'static PageServerConf>> {
142 948 : let init = arg_matches.get_flag("init");
143 948 : let update_config = init || arg_matches.get_flag("update-config");
144 :
145 948 : let (mut toml, config_file_exists) = if cfg_file_path.is_file() {
146 577 : if init {
147 1 : anyhow::bail!(
148 1 : "Config file '{}' already exists, cannot init it, use --update-config to update it",
149 1 : cfg_file_path.display()
150 1 : );
151 576 : }
152 : // Supplement the CLI arguments with the config file
153 576 : let cfg_file_contents = std::fs::read_to_string(cfg_file_path).with_context(|| {
154 0 : format!(
155 0 : "Failed to read pageserver config at '{}'",
156 0 : cfg_file_path.display()
157 0 : )
158 576 : })?;
159 : (
160 576 : cfg_file_contents
161 576 : .parse::<toml_edit::Document>()
162 576 : .with_context(|| {
163 0 : format!(
164 0 : "Failed to parse '{}' as pageserver config",
165 0 : cfg_file_path.display()
166 0 : )
167 576 : })?,
168 : true,
169 : )
170 371 : } else if cfg_file_path.exists() {
171 0 : anyhow::bail!(
172 0 : "Config file '{}' exists but is not a regular file",
173 0 : cfg_file_path.display()
174 0 : );
175 : } else {
176 : // We're initializing the tenant, so there's no config file yet
177 : (
178 371 : DEFAULT_CONFIG_FILE
179 371 : .parse::<toml_edit::Document>()
180 371 : .context("could not parse built-in config file")?,
181 : false,
182 : )
183 : };
184 :
185 947 : if let Some(values) = arg_matches.get_many::<String>("config-override") {
186 8226 : for option_line in values {
187 7280 : let doc = toml_edit::Document::from_str(option_line).with_context(|| {
188 0 : format!("Option '{option_line}' could not be parsed as a toml document")
189 7280 : })?;
190 :
191 7296 : for (key, item) in doc.iter() {
192 7296 : if config_file_exists && update_config && key == "id" && toml.contains_key(key) {
193 1 : anyhow::bail!("Pageserver config file exists at '{}' and has node id already, it cannot be overridden", cfg_file_path.display());
194 7295 : }
195 7295 : toml.insert(key, item.clone());
196 : }
197 : }
198 0 : }
199 :
200 946 : debug!("Resulting toml: {toml}");
201 946 : let conf = PageServerConf::parse_and_validate(&toml, workdir)
202 946 : .context("Failed to parse pageserver configuration")?;
203 :
204 945 : if update_config {
205 370 : info!("Writing pageserver config to '{}'", cfg_file_path.display());
206 :
207 370 : std::fs::write(cfg_file_path, toml.to_string()).with_context(|| {
208 0 : format!(
209 0 : "Failed to write pageserver config to '{}'",
210 0 : cfg_file_path.display()
211 0 : )
212 370 : })?;
213 370 : info!(
214 0 : "Config successfully written to '{}'",
215 0 : cfg_file_path.display()
216 0 : )
217 575 : }
218 :
219 945 : Ok(if init {
220 370 : ControlFlow::Break(())
221 : } else {
222 575 : ControlFlow::Continue(Box::leak(Box::new(conf)))
223 : })
224 948 : }
225 :
226 575 : fn start_pageserver(
227 575 : launch_ts: &'static LaunchTimestamp,
228 575 : conf: &'static PageServerConf,
229 575 : ) -> anyhow::Result<()> {
230 575 : // Monotonic time for later calculating startup duration
231 575 : let started_startup_at = Instant::now();
232 575 :
233 2604 : let startup_checkpoint = move |phase: &str, human_phase: &str| {
234 2604 : let elapsed = started_startup_at.elapsed();
235 2604 : let secs = elapsed.as_secs_f64();
236 2604 : STARTUP_DURATION.with_label_values(&[phase]).set(secs);
237 2604 : info!(
238 2604 : elapsed_ms = elapsed.as_millis(),
239 2604 : "{human_phase} ({secs:.3}s since start)"
240 2604 : )
241 2604 : };
242 :
243 : // Print version and launch timestamp to the log,
244 : // and expose them as prometheus metrics.
245 : // A changed version string indicates changed software.
246 : // A changed launch timestamp indicates a pageserver restart.
247 575 : info!(
248 575 : "version: {} launch_timestamp: {}",
249 575 : version(),
250 575 : launch_ts.to_string()
251 575 : );
252 575 : set_build_info_metric(GIT_VERSION);
253 575 : set_launch_timestamp_metric(launch_ts);
254 575 : pageserver::preinitialize_metrics();
255 575 :
256 575 : // If any failpoints were set from FAILPOINTS environment variable,
257 575 : // print them to the log for debugging purposes
258 575 : let failpoints = fail::list();
259 575 : if !failpoints.is_empty() {
260 10 : info!(
261 10 : "started with failpoints: {}",
262 10 : failpoints
263 10 : .iter()
264 10 : .map(|(name, actions)| format!("{name}={actions}"))
265 10 : .collect::<Vec<String>>()
266 10 : .join(";")
267 10 : )
268 565 : }
269 :
270 : // Create and lock PID file. This ensures that there cannot be more than one
271 : // pageserver process running at the same time.
272 575 : let lock_file_path = conf.workdir.join(PID_FILE_NAME);
273 575 : let lock_file =
274 575 : utils::pid_file::claim_for_current_process(&lock_file_path).context("claim pid file")?;
275 575 : info!("Claimed pid file at {lock_file_path:?}");
276 :
277 : // Ensure that the lock file is held even if the main thread of the process panics.
278 : // We need to release the lock file only when the process exits.
279 575 : std::mem::forget(lock_file);
280 575 :
281 575 : // Bind the HTTP and libpq ports early, so that if they are in use by some other
282 575 : // process, we error out early.
283 575 : let http_addr = &conf.listen_http_addr;
284 575 : info!("Starting pageserver http handler on {http_addr}");
285 575 : let http_listener = tcp_listener::bind(http_addr)?;
286 :
287 575 : let pg_addr = &conf.listen_pg_addr;
288 575 : info!("Starting pageserver pg protocol handler on {pg_addr}");
289 575 : let pageserver_listener = tcp_listener::bind(pg_addr)?;
290 :
291 : // Launch broker client
292 : // The storage_broker::connect call needs to happen inside a tokio runtime thread.
293 575 : let broker_client = WALRECEIVER_RUNTIME
294 575 : .block_on(async {
295 575 : // Note: we do not attempt connecting here (but validate endpoints sanity).
296 575 : storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)
297 575 : })
298 575 : .with_context(|| {
299 0 : format!(
300 0 : "create broker client for uri={:?} keepalive_interval={:?}",
301 0 : &conf.broker_endpoint, conf.broker_keepalive_interval,
302 0 : )
303 575 : })?;
304 :
305 : // Initialize authentication for incoming connections
306 : let http_auth;
307 : let pg_auth;
308 575 : if conf.http_auth_type == AuthType::NeonJWT || conf.pg_auth_type == AuthType::NeonJWT {
309 : // unwrap is ok because check is performed when creating config, so path is set and file exists
310 9 : let key_path = conf.auth_validation_public_key_path.as_ref().unwrap();
311 9 : info!(
312 9 : "Loading public key for verifying JWT tokens from {:#?}",
313 9 : key_path
314 9 : );
315 9 : let auth: Arc<JwtAuth> = Arc::new(JwtAuth::from_key_path(key_path)?);
316 :
317 9 : http_auth = match &conf.http_auth_type {
318 0 : AuthType::Trust => None,
319 9 : AuthType::NeonJWT => Some(auth.clone()),
320 : };
321 9 : pg_auth = match &conf.pg_auth_type {
322 0 : AuthType::Trust => None,
323 9 : AuthType::NeonJWT => Some(auth),
324 : };
325 566 : } else {
326 566 : http_auth = None;
327 566 : pg_auth = None;
328 566 : }
329 575 : info!("Using auth for http API: {:#?}", conf.http_auth_type);
330 575 : info!("Using auth for pg connections: {:#?}", conf.pg_auth_type);
331 :
332 566 : match var("NEON_AUTH_TOKEN") {
333 9 : Ok(v) => {
334 9 : info!("Loaded JWT token for authentication with Safekeeper");
335 9 : pageserver::config::SAFEKEEPER_AUTH_TOKEN
336 9 : .set(Arc::new(v))
337 9 : .map_err(|_| anyhow!("Could not initialize SAFEKEEPER_AUTH_TOKEN"))?;
338 : }
339 : Err(VarError::NotPresent) => {
340 566 : info!("No JWT token for authentication with Safekeeper detected");
341 : }
342 0 : Err(e) => {
343 0 : return Err(e).with_context(|| {
344 0 : "Failed to either load to detect non-present NEON_AUTH_TOKEN environment variable"
345 0 : })
346 : }
347 : };
348 :
349 : // Set up remote storage client
350 575 : let remote_storage = create_remote_storage_client(conf)?;
351 :
352 : // Up to this point no significant I/O has been done: this should have been fast. Record
353 : // duration prior to starting I/O intensive phase of startup.
354 575 : startup_checkpoint("initial", "Starting loading tenants");
355 575 : STARTUP_IS_LOADING.set(1);
356 575 :
357 575 : // Startup staging or optimizing:
358 575 : //
359 575 : // We want to minimize downtime for `page_service` connections, and trying not to overload
360 575 : // BACKGROUND_RUNTIME by doing initial compactions and initial logical sizes at the same time.
361 575 : //
362 575 : // init_done_rx will notify when all initial load operations have completed.
363 575 : //
364 575 : // background_jobs_can_start (same name used to hold off background jobs from starting at
365 575 : // consumer side) will be dropped once we can start the background jobs. Currently it is behind
366 575 : // completing all initial logical size calculations (init_logical_size_done_rx) and a timeout
367 575 : // (background_task_maximum_delay).
368 575 : let (init_done_tx, init_done_rx) = utils::completion::channel();
369 575 :
370 575 : let (init_logical_size_done_tx, init_logical_size_done_rx) = utils::completion::channel();
371 575 :
372 575 : let (background_jobs_can_start, background_jobs_barrier) = utils::completion::channel();
373 575 :
374 575 : let order = pageserver::InitializationOrder {
375 575 : initial_tenant_load: Some(init_done_tx),
376 575 : initial_logical_size_can_start: init_done_rx.clone(),
377 575 : initial_logical_size_attempt: Some(init_logical_size_done_tx),
378 575 : background_jobs_can_start: background_jobs_barrier.clone(),
379 575 : };
380 575 :
381 575 : // Scan the local 'tenants/' directory and start loading the tenants
382 575 : let shutdown_pageserver = tokio_util::sync::CancellationToken::new();
383 575 :
384 575 : BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
385 575 : conf,
386 575 : TenantSharedResources {
387 575 : broker_client: broker_client.clone(),
388 575 : remote_storage: remote_storage.clone(),
389 575 : },
390 575 : order,
391 575 : ))?;
392 :
393 575 : BACKGROUND_RUNTIME.spawn({
394 575 : let init_done_rx = init_done_rx;
395 575 : let shutdown_pageserver = shutdown_pageserver.clone();
396 575 : let drive_init = async move {
397 575 : // NOTE: unlike many futures in pageserver, this one is cancellation-safe
398 575 : let guard = scopeguard::guard_on_success((), |_| tracing::info!("Cancelled before initial load completed"));
399 575 :
400 575 : init_done_rx.wait().await;
401 575 : startup_checkpoint("initial_tenant_load", "Initial load completed");
402 575 : STARTUP_IS_LOADING.set(0);
403 575 :
404 575 : // initial logical sizes can now start, as they were waiting on init_done_rx.
405 575 :
406 575 : scopeguard::ScopeGuard::into_inner(guard);
407 575 :
408 575 : let mut init_sizes_done = std::pin::pin!(init_logical_size_done_rx.wait());
409 575 :
410 575 : let timeout = conf.background_task_maximum_delay;
411 575 :
412 575 : let guard = scopeguard::guard_on_success((), |_| tracing::info!("Cancelled before initial logical sizes completed"));
413 :
414 575 : let init_sizes_done = match tokio::time::timeout(timeout, &mut init_sizes_done).await {
415 : Ok(_) => {
416 478 : startup_checkpoint("initial_logical_sizes", "Initial logical sizes completed");
417 478 : None
418 : }
419 : Err(_) => {
420 20 : tracing::info!(
421 20 : timeout_millis = timeout.as_millis(),
422 20 : "Initial logical size timeout elapsed; starting background jobs"
423 20 : );
424 20 : Some(init_sizes_done)
425 : }
426 : };
427 :
428 498 : scopeguard::ScopeGuard::into_inner(guard);
429 498 :
430 498 : // allow background jobs to start
431 498 : drop(background_jobs_can_start);
432 498 : startup_checkpoint("background_jobs_can_start", "Starting background jobs");
433 :
434 498 : if let Some(init_sizes_done) = init_sizes_done {
435 : // ending up here is not a bug; at the latest logical sizes will be queried by
436 : // consumption metrics.
437 20 : let guard = scopeguard::guard_on_success((), |_| tracing::info!("Cancelled before initial logical sizes completed"));
438 20 : init_sizes_done.await;
439 :
440 0 : scopeguard::ScopeGuard::into_inner(guard);
441 0 :
442 0 : startup_checkpoint("initial_logical_sizes", "Initial logical sizes completed after timeout (background jobs already started)");
443 :
444 478 : }
445 :
446 478 : startup_checkpoint("complete", "Startup complete");
447 575 : };
448 575 :
449 575 : async move {
450 575 : let mut drive_init = std::pin::pin!(drive_init);
451 575 : // just race these tasks
452 830 : tokio::select! {
453 830 : _ = shutdown_pageserver.cancelled() => {},
454 830 : _ = &mut drive_init => {},
455 830 : }
456 575 : }
457 575 : });
458 575 :
459 575 : // shared state between the disk-usage backed eviction background task and the http endpoint
460 575 : // that allows triggering disk-usage based eviction manually. note that the http endpoint
461 575 : // is still accessible even if background task is not configured as long as remote storage has
462 575 : // been configured.
463 575 : let disk_usage_eviction_state: Arc<disk_usage_eviction_task::State> = Arc::default();
464 :
465 575 : if let Some(remote_storage) = &remote_storage {
466 303 : launch_disk_usage_global_eviction_task(
467 303 : conf,
468 303 : remote_storage.clone(),
469 303 : disk_usage_eviction_state.clone(),
470 303 : background_jobs_barrier.clone(),
471 303 : )?;
472 272 : }
473 :
474 : // Start up the service to handle HTTP mgmt API request. We created the
475 : // listener earlier already.
476 : {
477 575 : let _rt_guard = MGMT_REQUEST_RUNTIME.enter();
478 :
479 575 : let router = http::make_router(
480 575 : conf,
481 575 : launch_ts,
482 575 : http_auth,
483 575 : broker_client.clone(),
484 575 : remote_storage,
485 575 : disk_usage_eviction_state,
486 575 : )?
487 575 : .build()
488 575 : .map_err(|err| anyhow!(err))?;
489 575 : let service = utils::http::RouterService::new(router).unwrap();
490 575 : let server = hyper::Server::from_tcp(http_listener)?
491 575 : .serve(service)
492 575 : .with_graceful_shutdown(task_mgr::shutdown_watcher());
493 575 :
494 575 : task_mgr::spawn(
495 575 : MGMT_REQUEST_RUNTIME.handle(),
496 575 : TaskKind::HttpEndpointListener,
497 575 : None,
498 575 : None,
499 575 : "http endpoint listener",
500 575 : true,
501 575 : async {
502 2736 : server.await?;
503 148 : Ok(())
504 575 : },
505 575 : );
506 : }
507 :
508 575 : if let Some(metric_collection_endpoint) = &conf.metric_collection_endpoint {
509 4 : let background_jobs_barrier = background_jobs_barrier;
510 4 : let metrics_ctx = RequestContext::todo_child(
511 4 : TaskKind::MetricsCollection,
512 4 : // This task itself shouldn't download anything.
513 4 : // The actual size calculation does need downloads, and
514 4 : // creates a child context with the right DownloadBehavior.
515 4 : DownloadBehavior::Error,
516 4 : );
517 4 : task_mgr::spawn(
518 4 : crate::BACKGROUND_RUNTIME.handle(),
519 4 : TaskKind::MetricsCollection,
520 4 : None,
521 4 : None,
522 4 : "consumption metrics collection",
523 4 : true,
524 4 : async move {
525 4 : // first wait until background jobs are cleared to launch.
526 4 : //
527 4 : // this is because we only process active tenants and timelines, and the
528 4 : // Timeline::get_current_logical_size will spawn the logical size calculation,
529 4 : // which will not be rate-limited.
530 4 : let cancel = task_mgr::shutdown_token();
531 4 :
532 5 : tokio::select! {
533 5 : _ = cancel.cancelled() => { return Ok(()); },
534 5 : _ = background_jobs_barrier.wait() => {}
535 5 : };
536 :
537 4 : pageserver::consumption_metrics::collect_metrics(
538 4 : metric_collection_endpoint,
539 4 : conf.metric_collection_interval,
540 4 : conf.cached_metric_collection_interval,
541 4 : conf.synthetic_size_calculation_interval,
542 4 : conf.id,
543 4 : metrics_ctx,
544 4 : )
545 4 : .instrument(info_span!("metrics_collection"))
546 204 : .await?;
547 1 : Ok(())
548 4 : },
549 4 : );
550 571 : }
551 :
552 : // Spawn a task to listen for libpq connections. It will spawn further tasks
553 : // for each connection. We created the listener earlier already.
554 575 : {
555 575 : let libpq_ctx = RequestContext::todo_child(
556 575 : TaskKind::LibpqEndpointListener,
557 575 : // listener task shouldn't need to download anything. (We will
558 575 : // create a separate sub-contexts for each connection, with their
559 575 : // own download behavior. This context is used only to listen and
560 575 : // accept connections.)
561 575 : DownloadBehavior::Error,
562 575 : );
563 575 : task_mgr::spawn(
564 575 : COMPUTE_REQUEST_RUNTIME.handle(),
565 575 : TaskKind::LibpqEndpointListener,
566 575 : None,
567 575 : None,
568 575 : "libpq endpoint listener",
569 575 : true,
570 575 : async move {
571 575 : page_service::libpq_listener_main(
572 575 : conf,
573 575 : broker_client,
574 575 : pg_auth,
575 575 : pageserver_listener,
576 575 : conf.pg_auth_type,
577 575 : libpq_ctx,
578 575 : )
579 5400 : .await
580 575 : },
581 575 : );
582 575 : }
583 575 :
584 575 : let mut shutdown_pageserver = Some(shutdown_pageserver.drop_guard());
585 575 :
586 575 : // All started up! Now just sit and wait for shutdown signal.
587 575 : ShutdownSignals::handle(|signal| match signal {
588 : Signal::Quit => {
589 421 : info!(
590 421 : "Got {}. Terminating in immediate shutdown mode",
591 421 : signal.name()
592 421 : );
593 421 : std::process::exit(111);
594 : }
595 :
596 : Signal::Interrupt | Signal::Terminate => {
597 148 : info!(
598 148 : "Got {}. Terminating gracefully in fast shutdown mode",
599 148 : signal.name()
600 148 : );
601 :
602 : // This cancels the `shutdown_pageserver` cancellation tree.
603 : // Right now that tree doesn't reach very far, and `task_mgr` is used instead.
604 : // The plan is to change that over time.
605 148 : shutdown_pageserver.take();
606 148 : BACKGROUND_RUNTIME.block_on(pageserver::shutdown_pageserver(0));
607 148 : unreachable!()
608 : }
609 575 : })
610 575 : }
611 :
612 575 : fn create_remote_storage_client(
613 575 : conf: &'static PageServerConf,
614 575 : ) -> anyhow::Result<Option<GenericRemoteStorage>> {
615 575 : let config = if let Some(config) = &conf.remote_storage_config {
616 303 : config
617 : } else {
618 : // No remote storage configured.
619 272 : return Ok(None);
620 : };
621 :
622 : // Create the client
623 303 : let mut remote_storage = GenericRemoteStorage::from_config(config)?;
624 :
625 : // If `test_remote_failures` is non-zero, wrap the client with a
626 : // wrapper that simulates failures.
627 303 : if conf.test_remote_failures > 0 {
628 56 : if !cfg!(feature = "testing") {
629 0 : anyhow::bail!("test_remote_failures option is not available because pageserver was compiled without the 'testing' feature");
630 56 : }
631 56 : info!(
632 56 : "Simulating remote failures for first {} attempts of each op",
633 56 : conf.test_remote_failures
634 56 : );
635 56 : remote_storage =
636 56 : GenericRemoteStorage::unreliable_wrapper(remote_storage, conf.test_remote_failures);
637 247 : }
638 :
639 303 : Ok(Some(remote_storage))
640 575 : }
641 :
642 1318 : fn cli() -> Command {
643 1318 : Command::new("Neon page server")
644 1318 : .about("Materializes WAL stream to pages and serves them to the postgres")
645 1318 : .version(version())
646 1318 : .arg(
647 1318 : Arg::new("init")
648 1318 : .long("init")
649 1318 : .action(ArgAction::SetTrue)
650 1318 : .help("Initialize pageserver with all given config overrides"),
651 1318 : )
652 1318 : .arg(
653 1318 : Arg::new("workdir")
654 1318 : .short('D')
655 1318 : .long("workdir")
656 1318 : .help("Working directory for the pageserver"),
657 1318 : )
658 1318 : // See `settings.md` for more details on the extra configuration patameters pageserver can process
659 1318 : .arg(
660 1318 : Arg::new("config-override")
661 1318 : .short('c')
662 1318 : .num_args(1)
663 1318 : .action(ArgAction::Append)
664 1318 : .help("Additional configuration overrides of the ones from the toml config file (or new ones to add there). \
665 1318 : Any option has to be a valid toml document, example: `-c=\"foo='hey'\"` `-c=\"foo={value=1}\"`"),
666 1318 : )
667 1318 : .arg(
668 1318 : Arg::new("update-config")
669 1318 : .long("update-config")
670 1318 : .action(ArgAction::SetTrue)
671 1318 : .help("Update the config file when started"),
672 1318 : )
673 1318 : .arg(
674 1318 : Arg::new("enabled-features")
675 1318 : .long("enabled-features")
676 1318 : .action(ArgAction::SetTrue)
677 1318 : .help("Show enabled compile time features"),
678 1318 : )
679 1318 : }
680 :
/// Runs clap's built-in consistency assertions on the command definition.
#[test]
fn verify_cli() {
    let command = cli();
    command.debug_assert();
}
|