//! Main entry point for the Page Server executable.

use std::env::{var, VarError};
use std::sync::Arc;
use std::{env, ops::ControlFlow, str::FromStr};

use anyhow::{anyhow, Context};
use camino::Utf8Path;
use clap::{Arg, ArgAction, Command};

use metrics::launch_timestamp::{set_launch_timestamp_metric, LaunchTimestamp};
use pageserver::control_plane_client::ControlPlaneClient;
use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task};
use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING};
use pageserver::task_mgr::WALRECEIVER_RUNTIME;
use pageserver::tenant::TenantSharedResources;
use remote_storage::GenericRemoteStorage;
use tokio::time::Instant;
use tracing::*;

use metrics::set_build_info_metric;
use pageserver::{
    config::{defaults::*, PageServerConf},
    context::{DownloadBehavior, RequestContext},
    deletion_queue::DeletionQueue,
    http, page_cache, page_service, task_mgr,
    task_mgr::TaskKind,
    task_mgr::{BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME},
    tenant::mgr,
    virtual_file,
};
use postgres_backend::AuthType;
use utils::logging::TracingErrorLayerEnablement;
use utils::signals::ShutdownSignals;
use utils::{
    auth::JwtAuth, logging, project_git_version, sentry_init::init_sentry, signals::Signal,
    tcp_listener,
};

project_git_version!(GIT_VERSION);

const PID_FILE_NAME: &str = "pageserver.pid";

const FEATURES: &[&str] = &[
    #[cfg(feature = "testing")]
    "testing",
];
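// Illustration (assumes a build with `--features testing`, which is not stated in this file):
// `pageserver --enabled-features` would then print `{"features": ["testing"] }`, produced by the
// `enabled-features` branch at the top of `main()` below.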

fn version() -> String {
    format!(
        "{GIT_VERSION} failpoints: {}, features: {:?}",
        fail::has_failpoints(),
        FEATURES,
    )
}
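// For illustration only (the hash and flag values here are made up, not taken from a real build):
// this renders as something like `1f2e3d4 failpoints: true, features: ["testing"]`.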

fn main() -> anyhow::Result<()> {
    let launch_ts = Box::leak(Box::new(LaunchTimestamp::generate()));

    let arg_matches = cli().get_matches();

    if arg_matches.get_flag("enabled-features") {
        println!("{{\"features\": {FEATURES:?} }}");
        return Ok(());
    }

    let workdir = arg_matches
        .get_one::<String>("workdir")
        .map(Utf8Path::new)
        .unwrap_or_else(|| Utf8Path::new(".neon"));
    let workdir = workdir
        .canonicalize_utf8()
        .with_context(|| format!("Error opening workdir '{workdir}'"))?;

    let cfg_file_path = workdir.join("pageserver.toml");

    // Set CWD to workdir for non-daemon modes
    env::set_current_dir(&workdir)
        .with_context(|| format!("Failed to set application's current dir to '{workdir}'"))?;

    let conf = match initialize_config(&cfg_file_path, arg_matches, &workdir)? {
        ControlFlow::Continue(conf) => conf,
        ControlFlow::Break(()) => {
            info!("Pageserver config init successful");
            return Ok(());
        }
    };

    // Initialize logging.
    //
    // It must be initialized before the custom panic hook is installed below.
    //
    // Regarding tracing_error enablement: at this time, we only use the
    // tracing_error crate to debug_assert that log spans contain tenant and timeline ids.
    // See `debug_assert_current_span_has_tenant_and_timeline_id` in the timeline module.
    let tracing_error_layer_enablement = if cfg!(debug_assertions) {
        TracingErrorLayerEnablement::EnableWithRustLogFilter
    } else {
        TracingErrorLayerEnablement::Disabled
    };
    logging::init(conf.log_format, tracing_error_layer_enablement)?;

    // Mind the order required here: 1. logging, 2. panic_hook, 3. sentry.
    // The returned guard is disarmed on the pageserver because we never tear down tracing.
    logging::replace_panic_hook_with_tracing_panic_hook().forget();

    // initialize sentry if SENTRY_DSN is provided
    let _sentry_guard = init_sentry(
        Some(GIT_VERSION.into()),
        &[("node_id", &conf.id.to_string())],
    );

    let tenants_path = conf.tenants_path();
    if !tenants_path.exists() {
        utils::crashsafe::create_dir_all(conf.tenants_path())
            .with_context(|| format!("Failed to create tenants root dir at '{tenants_path}'"))?;
    }

    // Initialize failpoints support
    let scenario = pageserver::failpoint_support::init();

    // Basic initialization of things that don't change after startup
    virtual_file::init(conf.max_file_descriptors);
    page_cache::init(conf.page_cache_size);

    start_pageserver(launch_ts, conf).context("Failed to start pageserver")?;

    scenario.teardown();
    Ok(())
}

fn initialize_config(
    cfg_file_path: &Utf8Path,
    arg_matches: clap::ArgMatches,
    workdir: &Utf8Path,
) -> anyhow::Result<ControlFlow<(), &'static PageServerConf>> {
    let init = arg_matches.get_flag("init");
    let update_config = init || arg_matches.get_flag("update-config");

    let (mut toml, config_file_exists) = if cfg_file_path.is_file() {
        if init {
            anyhow::bail!(
                "Config file '{cfg_file_path}' already exists, cannot init it, use --update-config to update it",
            );
        }
        // Supplement the CLI arguments with the config file
        let cfg_file_contents = std::fs::read_to_string(cfg_file_path)
            .with_context(|| format!("Failed to read pageserver config at '{cfg_file_path}'"))?;
        (
            cfg_file_contents
                .parse::<toml_edit::Document>()
                .with_context(|| {
                    format!("Failed to parse '{cfg_file_path}' as pageserver config")
                })?,
            true,
        )
    } else if cfg_file_path.exists() {
        anyhow::bail!("Config file '{cfg_file_path}' exists but is not a regular file");
    } else {
        // We're initializing the pageserver, so there's no config file yet
        (
            DEFAULT_CONFIG_FILE
                .parse::<toml_edit::Document>()
                .context("could not parse built-in config file")?,
            false,
        )
    };

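    // Illustration, reusing the example from the CLI help in `cli()` below: a flag like
    // `-c "foo='hey'"` parses into a one-entry toml document, and each of its top-level keys
    // is inserted into (or replaces the matching entry of) `toml`, subject to the node-id
    // guard below.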
    if let Some(values) = arg_matches.get_many::<String>("config-override") {
        for option_line in values {
            let doc = toml_edit::Document::from_str(option_line).with_context(|| {
                format!("Option '{option_line}' could not be parsed as a toml document")
            })?;

            for (key, item) in doc.iter() {
                if config_file_exists && update_config && key == "id" && toml.contains_key(key) {
                    anyhow::bail!("Pageserver config file exists at '{cfg_file_path}' and has node id already, it cannot be overridden");
                }
                toml.insert(key, item.clone());
            }
        }
    }

    debug!("Resulting toml: {toml}");
    let conf = PageServerConf::parse_and_validate(&toml, workdir)
        .context("Failed to parse pageserver configuration")?;

    if update_config {
        info!("Writing pageserver config to '{cfg_file_path}'");

        std::fs::write(cfg_file_path, toml.to_string())
            .with_context(|| format!("Failed to write pageserver config to '{cfg_file_path}'"))?;
        info!("Config successfully written to '{cfg_file_path}'")
    }

    Ok(if init {
        ControlFlow::Break(())
    } else {
        ControlFlow::Continue(Box::leak(Box::new(conf)))
    })
}

fn start_pageserver(
    launch_ts: &'static LaunchTimestamp,
    conf: &'static PageServerConf,
) -> anyhow::Result<()> {
    // Monotonic time for later calculating startup duration
    let started_startup_at = Instant::now();

    let startup_checkpoint = move |phase: &str, human_phase: &str| {
        let elapsed = started_startup_at.elapsed();
        let secs = elapsed.as_secs_f64();
        STARTUP_DURATION.with_label_values(&[phase]).set(secs);
        info!(
            elapsed_ms = elapsed.as_millis(),
            "{human_phase} ({secs:.3}s since start)"
        )
    };
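    // Phases recorded via this closure further down: "initial", "initial_tenant_load_remote",
    // "initial_tenant_load", "initial_logical_sizes", "background_jobs_can_start" and
    // "complete"; each phase name becomes a label value on the STARTUP_DURATION gauge.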

    // Print version and launch timestamp to the log,
    // and expose them as prometheus metrics.
    // A changed version string indicates changed software.
    // A changed launch timestamp indicates a pageserver restart.
    info!(
        "version: {} launch_timestamp: {}",
        version(),
        launch_ts.to_string()
    );
    set_build_info_metric(GIT_VERSION);
    set_launch_timestamp_metric(launch_ts);
    pageserver::preinitialize_metrics();

    // If any failpoints were set from FAILPOINTS environment variable,
    // print them to the log for debugging purposes
    let failpoints = fail::list();
    if !failpoints.is_empty() {
        info!(
            "started with failpoints: {}",
            failpoints
                .iter()
                .map(|(name, actions)| format!("{name}={actions}"))
                .collect::<Vec<String>>()
                .join(";")
        )
    }

    // Create and lock PID file. This ensures that there cannot be more than one
    // pageserver process running at the same time.
    let lock_file_path = conf.workdir.join(PID_FILE_NAME);
    let lock_file =
        utils::pid_file::claim_for_current_process(&lock_file_path).context("claim pid file")?;
    info!("Claimed pid file at {lock_file_path:?}");

    // Ensure that the lock file is held even if the main thread of the process panics.
    // We need to release the lock file only when the process exits.
    std::mem::forget(lock_file);

    // Bind the HTTP and libpq ports early, so that if they are in use by some other
    // process, we error out early.
    let http_addr = &conf.listen_http_addr;
    info!("Starting pageserver http handler on {http_addr}");
    let http_listener = tcp_listener::bind(http_addr)?;

    let pg_addr = &conf.listen_pg_addr;
    info!("Starting pageserver pg protocol handler on {pg_addr}");
    let pageserver_listener = tcp_listener::bind(pg_addr)?;

    // Launch broker client
    // The storage_broker::connect call needs to happen inside a tokio runtime thread.
    let broker_client = WALRECEIVER_RUNTIME
        .block_on(async {
            // Note: we do not attempt connecting here (but validate endpoints sanity).
            storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)
        })
        .with_context(|| {
            format!(
                "create broker client for uri={:?} keepalive_interval={:?}",
                &conf.broker_endpoint, conf.broker_keepalive_interval,
            )
        })?;

    // Initialize authentication for incoming connections
    let http_auth;
    let pg_auth;
    if conf.http_auth_type == AuthType::NeonJWT || conf.pg_auth_type == AuthType::NeonJWT {
        // unwrap is ok because check is performed when creating config, so path is set and file exists
        let key_path = conf.auth_validation_public_key_path.as_ref().unwrap();
        info!(
            "Loading public key for verifying JWT tokens from {:#?}",
            key_path
        );
        let auth: Arc<JwtAuth> = Arc::new(JwtAuth::from_key_path(key_path)?);

        http_auth = match &conf.http_auth_type {
            AuthType::Trust => None,
            AuthType::NeonJWT => Some(auth.clone()),
        };
        pg_auth = match &conf.pg_auth_type {
            AuthType::Trust => None,
            AuthType::NeonJWT => Some(auth),
        };
    } else {
        http_auth = None;
        pg_auth = None;
    }
    info!("Using auth for http API: {:#?}", conf.http_auth_type);
    info!("Using auth for pg connections: {:#?}", conf.pg_auth_type);

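    // Sketch of the expected setup (an operational assumption, not shown in this file): the
    // operator exports NEON_AUTH_TOKEN with a JWT before starting the process. When present,
    // the token is stored in SAFEKEEPER_AUTH_TOKEN and used for authentication with safekeepers,
    // per the log messages below.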
    match var("NEON_AUTH_TOKEN") {
        Ok(v) => {
            info!("Loaded JWT token for authentication with Safekeeper");
            pageserver::config::SAFEKEEPER_AUTH_TOKEN
                .set(Arc::new(v))
                .map_err(|_| anyhow!("Could not initialize SAFEKEEPER_AUTH_TOKEN"))?;
        }
        Err(VarError::NotPresent) => {
            info!("No JWT token for authentication with Safekeeper detected");
        }
        Err(e) => {
            return Err(e).with_context(|| {
                "Failed to either load or detect non-present NEON_AUTH_TOKEN environment variable"
            })
        }
    };

    // Top-level cancellation token for the process
    let shutdown_pageserver = tokio_util::sync::CancellationToken::new();

    // Set up remote storage client
    let remote_storage = create_remote_storage_client(conf)?;

    // Set up deletion queue
    let (deletion_queue, deletion_workers) = DeletionQueue::new(
        remote_storage.clone(),
        ControlPlaneClient::new(conf, &shutdown_pageserver),
        conf,
    );
    if let Some(deletion_workers) = deletion_workers {
        deletion_workers.spawn_with(BACKGROUND_RUNTIME.handle());
    }

    // Up to this point no significant I/O has been done: this should have been fast. Record
    // duration prior to starting I/O intensive phase of startup.
    startup_checkpoint("initial", "Starting loading tenants");
    STARTUP_IS_LOADING.set(1);

    // Startup staging and ordering:
    //
    // We want to minimize downtime for `page_service` connections, while trying not to overload
    // BACKGROUND_RUNTIME by doing initial compactions and initial logical size calculations at the same time.
    //
    // init_done_rx will notify when all initial load operations have completed.
    //
    // background_jobs_can_start (same name used to hold off background jobs from starting at
    // consumer side) will be dropped once we can start the background jobs. Currently it is behind
    // completing all initial logical size calculations (init_logical_size_done_rx) and a timeout
    // (background_task_maximum_delay).
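    // Rough gating sequence implemented below (a summary of the steps that follow): remote part
    // of initial tenant load -> local initial load -> initial logical size calculations, bounded
    // by `background_task_maximum_delay` -> background jobs may start.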
    let (init_remote_done_tx, init_remote_done_rx) = utils::completion::channel();
    let (init_done_tx, init_done_rx) = utils::completion::channel();

    let (init_logical_size_done_tx, init_logical_size_done_rx) = utils::completion::channel();

    let (background_jobs_can_start, background_jobs_barrier) = utils::completion::channel();

    let order = pageserver::InitializationOrder {
        initial_tenant_load_remote: Some(init_done_tx),
        initial_tenant_load: Some(init_remote_done_tx),
        initial_logical_size_can_start: init_done_rx.clone(),
        initial_logical_size_attempt: Some(init_logical_size_done_tx),
        background_jobs_can_start: background_jobs_barrier.clone(),
    };

    // Scan the local 'tenants/' directory and start loading the tenants
    let deletion_queue_client = deletion_queue.new_client();
    BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
        conf,
        TenantSharedResources {
            broker_client: broker_client.clone(),
            remote_storage: remote_storage.clone(),
            deletion_queue_client,
        },
        order,
        shutdown_pageserver.clone(),
    ))?;

    BACKGROUND_RUNTIME.spawn({
        let init_done_rx = init_done_rx;
        let shutdown_pageserver = shutdown_pageserver.clone();
        let drive_init = async move {
            // NOTE: unlike many futures in pageserver, this one is cancellation-safe
            let guard = scopeguard::guard_on_success((), |_| tracing::info!("Cancelled before initial load completed"));

            init_remote_done_rx.wait().await;
            startup_checkpoint("initial_tenant_load_remote", "Remote part of initial load completed");

            init_done_rx.wait().await;
            startup_checkpoint("initial_tenant_load", "Initial load completed");
            STARTUP_IS_LOADING.set(0);

            // initial logical sizes can now start, as they were waiting on init_done_rx.

            scopeguard::ScopeGuard::into_inner(guard);

            let mut init_sizes_done = std::pin::pin!(init_logical_size_done_rx.wait());

            let timeout = conf.background_task_maximum_delay;

            let guard = scopeguard::guard_on_success((), |_| tracing::info!("Cancelled before initial logical sizes completed"));

            let init_sizes_done = match tokio::time::timeout(timeout, &mut init_sizes_done).await {
                Ok(_) => {
                    startup_checkpoint("initial_logical_sizes", "Initial logical sizes completed");
                    None
                }
                Err(_) => {
                    tracing::info!(
                        timeout_millis = timeout.as_millis(),
                        "Initial logical size timeout elapsed; starting background jobs"
                    );
                    Some(init_sizes_done)
                }
            };

            scopeguard::ScopeGuard::into_inner(guard);

            // allow background jobs to start
            drop(background_jobs_can_start);
            startup_checkpoint("background_jobs_can_start", "Starting background jobs");

            if let Some(init_sizes_done) = init_sizes_done {
                // ending up here is not a bug; at the latest, logical sizes will be queried by
                // consumption metrics.
                let guard = scopeguard::guard_on_success((), |_| tracing::info!("Cancelled before initial logical sizes completed"));
                init_sizes_done.await;

                scopeguard::ScopeGuard::into_inner(guard);

                startup_checkpoint("initial_logical_sizes", "Initial logical sizes completed after timeout (background jobs already started)");
            }

            startup_checkpoint("complete", "Startup complete");
        };

        async move {
            let mut drive_init = std::pin::pin!(drive_init);
            // just race these tasks
            tokio::select! {
                _ = shutdown_pageserver.cancelled() => {},
                _ = &mut drive_init => {},
            }
        }
    });

    // Shared state between the disk-usage backed eviction background task and the http endpoint
    // that allows triggering disk-usage based eviction manually. Note that the http endpoint
    // is still accessible even if the background task is not configured, as long as remote storage
    // has been configured.
    let disk_usage_eviction_state: Arc<disk_usage_eviction_task::State> = Arc::default();

    if let Some(remote_storage) = &remote_storage {
        launch_disk_usage_global_eviction_task(
            conf,
            remote_storage.clone(),
            disk_usage_eviction_state.clone(),
            background_jobs_barrier.clone(),
        )?;
    }

    // Start up the service to handle HTTP mgmt API requests. We created the
    // listener earlier already.
    {
        let _rt_guard = MGMT_REQUEST_RUNTIME.enter();

        let router_state = Arc::new(
            http::routes::State::new(
                conf,
                http_auth.clone(),
                remote_storage.clone(),
                broker_client.clone(),
                disk_usage_eviction_state,
                deletion_queue.new_client(),
            )
            .context("Failed to initialize router state")?,
        );
        let router = http::make_router(router_state, launch_ts, http_auth.clone())?
            .build()
            .map_err(|err| anyhow!(err))?;
        let service = utils::http::RouterService::new(router).unwrap();
        let server = hyper::Server::from_tcp(http_listener)?
            .serve(service)
            .with_graceful_shutdown(task_mgr::shutdown_watcher());

        task_mgr::spawn(
            MGMT_REQUEST_RUNTIME.handle(),
            TaskKind::HttpEndpointListener,
            None,
            None,
            "http endpoint listener",
            true,
            async {
                server.await?;
                Ok(())
            },
        );
    }

    if let Some(metric_collection_endpoint) = &conf.metric_collection_endpoint {
        let background_jobs_barrier = background_jobs_barrier;
        let metrics_ctx = RequestContext::todo_child(
            TaskKind::MetricsCollection,
            // This task itself shouldn't download anything.
            // The actual size calculation does need downloads, and
            // creates a child context with the right DownloadBehavior.
            DownloadBehavior::Error,
        );

        let local_disk_storage = conf.workdir.join("last_consumption_metrics.json");

        task_mgr::spawn(
            crate::BACKGROUND_RUNTIME.handle(),
            TaskKind::MetricsCollection,
            None,
            None,
            "consumption metrics collection",
            true,
            async move {
                // first wait until background jobs are cleared to launch.
                //
                // this is because we only process active tenants and timelines, and the
                // Timeline::get_current_logical_size will spawn the logical size calculation,
                // which will not be rate-limited.
                let cancel = task_mgr::shutdown_token();

                tokio::select! {
                    _ = cancel.cancelled() => { return Ok(()); },
                    _ = background_jobs_barrier.wait() => {}
                };

                pageserver::consumption_metrics::collect_metrics(
                    metric_collection_endpoint,
                    conf.metric_collection_interval,
                    conf.cached_metric_collection_interval,
                    conf.synthetic_size_calculation_interval,
                    conf.id,
                    local_disk_storage,
                    metrics_ctx,
                )
                .instrument(info_span!("metrics_collection"))
                .await?;
                Ok(())
            },
        );
    }

    // Spawn a task to listen for libpq connections. It will spawn further tasks
    // for each connection. We created the listener earlier already.
    {
        let libpq_ctx = RequestContext::todo_child(
            TaskKind::LibpqEndpointListener,
            // listener task shouldn't need to download anything. (We will
            // create separate sub-contexts for each connection, with their
            // own download behavior. This context is used only to listen and
            // accept connections.)
            DownloadBehavior::Error,
        );
        task_mgr::spawn(
            COMPUTE_REQUEST_RUNTIME.handle(),
            TaskKind::LibpqEndpointListener,
            None,
            None,
            "libpq endpoint listener",
            true,
            async move {
                page_service::libpq_listener_main(
                    conf,
                    broker_client,
                    pg_auth,
                    pageserver_listener,
                    conf.pg_auth_type,
                    libpq_ctx,
                    task_mgr::shutdown_token(),
                )
                .await
            },
        );
    }

    let mut shutdown_pageserver = Some(shutdown_pageserver.drop_guard());

    // All started up! Now just sit and wait for shutdown signal.
    ShutdownSignals::handle(|signal| match signal {
        Signal::Quit => {
            info!(
                "Got {}. Terminating in immediate shutdown mode",
                signal.name()
            );
            std::process::exit(111);
        }

        Signal::Interrupt | Signal::Terminate => {
            info!(
                "Got {}. Terminating gracefully in fast shutdown mode",
                signal.name()
            );

            // This cancels the `shutdown_pageserver` cancellation tree.
            // Right now that tree doesn't reach very far, and `task_mgr` is used instead.
            // The plan is to change that over time.
            shutdown_pageserver.take();
            let bg_remote_storage = remote_storage.clone();
            let bg_deletion_queue = deletion_queue.clone();
            BACKGROUND_RUNTIME.block_on(pageserver::shutdown_pageserver(
                bg_remote_storage.map(|_| bg_deletion_queue),
                0,
            ));
            unreachable!()
        }
    })
}

fn create_remote_storage_client(
    conf: &'static PageServerConf,
) -> anyhow::Result<Option<GenericRemoteStorage>> {
    let config = if let Some(config) = &conf.remote_storage_config {
        config
    } else {
        tracing::warn!("no remote storage configured, this is a deprecated configuration");
        return Ok(None);
    };

    // Create the client
    let mut remote_storage = GenericRemoteStorage::from_config(config)?;

    // If `test_remote_failures` is non-zero, wrap the client with a
    // wrapper that simulates failures.
    if conf.test_remote_failures > 0 {
        if !cfg!(feature = "testing") {
            anyhow::bail!("test_remote_failures option is not available because pageserver was compiled without the 'testing' feature");
        }
        info!(
            "Simulating remote failures for first {} attempts of each op",
            conf.test_remote_failures
        );
        remote_storage =
            GenericRemoteStorage::unreliable_wrapper(remote_storage, conf.test_remote_failures);
    }

    Ok(Some(remote_storage))
}

fn cli() -> Command {
    Command::new("Neon page server")
        .about("Materializes WAL stream to pages and serves them to Postgres")
        .version(version())
        .arg(
            Arg::new("init")
                .long("init")
                .action(ArgAction::SetTrue)
                .help("Initialize pageserver with all given config overrides"),
        )
        .arg(
            Arg::new("workdir")
                .short('D')
                .long("workdir")
                .help("Working directory for the pageserver"),
        )
        // See `settings.md` for more details on the extra configuration parameters pageserver can process
        .arg(
            Arg::new("config-override")
                .short('c')
                .num_args(1)
                .action(ArgAction::Append)
                .help("Additional configuration overrides of the ones from the toml config file (or new ones to add there). \
                Any option has to be a valid toml document, example: `-c=\"foo='hey'\"` `-c=\"foo={value=1}\"`"),
        )
        .arg(
            Arg::new("update-config")
                .long("update-config")
                .action(ArgAction::SetTrue)
                .help("Update the config file when started"),
        )
        .arg(
            Arg::new("enabled-features")
                .long("enabled-features")
                .action(ArgAction::SetTrue)
                .help("Show enabled compile time features"),
        )
}
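// Hypothetical invocation for illustration only (the workdir path is made up, and the `-c` value
// reuses the placeholder key from the help text above; syntax only, not a known-good config):
//
//   pageserver --workdir ./.neon --update-config -c "foo='hey'"
//
// `-c` may be repeated, and each value must be a valid toml document as described in its help text.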

#[test]
fn verify_cli() {
    cli().debug_assert();
}