//!
//! Postgres wrapper (`compute_ctl`) is intended to be run as a Docker entrypoint or as a `systemd`
//! `ExecStart` option. It handles all the `Neon` specifics during compute node
//! initialization:
//! - `compute_ctl` accepts a cluster (compute node) specification as a JSON file.
//! - Every start is a fresh start, so the data directory is removed and
//!   initialized again on each run.
//! - If `remote_ext_config` is provided, it is used to fetch the list of extensions
//!   and download `shared_preload_libraries` from remote storage.
//! - Next it puts configuration files into the `PGDATA` directory.
//! - Sync safekeepers and get the commit LSN.
//! - Get a `basebackup` from the pageserver, using the LSN returned in the previous step.
//! - Try to start `postgres` and wait until it is ready to accept connections.
//! - Check and alter/drop/create roles and databases.
//! - Hang waiting for the `postmaster` process to exit.
//!
//! `compute_ctl` also spawns two separate service threads:
//! - `compute-monitor` checks the last Postgres activity timestamp and saves it
//!   into the shared `ComputeNode`;
//! - `http-endpoint` runs a Hyper HTTP API server, which serves readiness and
//!   last-activity requests.
//!
//! If the `AUTOSCALING` environment variable is set, `compute_ctl` will start the
//! `vm-monitor` located in [`neon/libs/vm_monitor`]. For VM compute nodes,
//! `vm-monitor` communicates with the VM autoscaling system. It coordinates
//! downscaling and requests immediate upscaling under resource pressure.
//!
//! Usage example:
//! ```sh
//! compute_ctl -D /var/db/postgres/compute \
//!             -C 'postgresql://cloud_admin@localhost/postgres' \
//!             -S /var/db/postgres/specs/current.json \
//!             -b /usr/local/bin/postgres \
//!             -r http://pg-ext-s3-gateway
//! ```
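//!
//! Alternatively, the spec can be fetched from the control plane instead of a
//! local file (see `try_spec_from_cli` below). The base URL and compute ID here
//! are illustrative placeholders, not real endpoints:
//! ```sh
//! compute_ctl -D /var/db/postgres/compute \
//!             -C 'postgresql://cloud_admin@localhost/postgres' \
//!             -i compute-12345 \
//!             -p http://control-plane.example.com
//! ```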
use std::collections::HashMap;
use std::ffi::OsString;
use std::fs::File;
use std::path::Path;
use std::process::exit;
use std::str::FromStr;
use std::sync::atomic::Ordering;
use std::sync::{mpsc, Arc, Condvar, Mutex, RwLock};
use std::time::SystemTime;
use std::{thread, time::Duration};

use anyhow::{Context, Result};
use chrono::Utc;
use clap::Parser;
use compute_tools::disk_quota::set_disk_quota;
use compute_tools::http::server::Server;
use compute_tools::lsn_lease::launch_lsn_lease_bg_task_for_static;
use signal_hook::consts::{SIGQUIT, SIGTERM};
use signal_hook::{consts::SIGINT, iterator::Signals};
use tracing::{error, info, warn};
use url::Url;

use compute_api::responses::{ComputeCtlConfig, ComputeStatus};
use compute_api::spec::ComputeSpec;

use compute_tools::compute::{
    forward_termination_signal, ComputeNode, ComputeState, ParsedSpec, PG_PID,
};
use compute_tools::configurator::launch_configurator;
use compute_tools::extension_server::get_pg_version_string;
use compute_tools::logger::*;
use compute_tools::monitor::launch_monitor;
use compute_tools::params::*;
use compute_tools::spec::*;
use compute_tools::swap::resize_swap;
use rlimit::{setrlimit, Resource};
use utils::failpoint_support;

// This is an arbitrary build tag. It's fine as a default and for testing
// purposes, in case the environment variable is not set.
const BUILD_TAG_DEFAULT: &str = "latest";

// Compatibility hack: if the control plane specified any remote-ext-config,
// use the default value for the extension storage proxy gateway.
// Remove this once the control plane is updated to pass the gateway URL.
fn parse_remote_ext_config(arg: &str) -> Result<String> {
    if arg.starts_with("http") {
        Ok(arg.trim_end_matches('/').to_string())
    } else {
        Ok("http://pg-ext-s3-gateway".to_string())
    }
}

/// Generate a compute ID if one is not supplied. This exists to keep forward
/// compatibility tests working, but will be removed in a future iteration.
fn generate_compute_id() -> String {
    let now = SystemTime::now();

    format!(
        "compute-{}",
        now.duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_secs()
    )
}

#[derive(Parser)]
#[command(rename_all = "kebab-case")]
struct Cli {
    #[arg(short = 'b', long, default_value = "postgres", env = "POSTGRES_PATH")]
    pub pgbin: String,

    #[arg(short = 'r', long, value_parser = parse_remote_ext_config)]
    pub remote_ext_config: Option<String>,

    /// The port to bind the external listening HTTP server to. Clients running
    /// outside the compute will talk to the compute through this port. Keep
    /// the previous name for this argument around for a smoother release
    /// with the control plane.
    ///
    /// TODO: Remove the alias after the control plane release which teaches the
    /// control plane about the renamed argument.
    #[arg(long, alias = "http-port", default_value_t = 3080)]
    pub external_http_port: u16,

    /// The port to bind the internal listening HTTP server to. Clients like
    /// the neon extension (for installing remote extensions) and local_proxy
    /// use this port.
    #[arg(long)]
    pub internal_http_port: Option<u16>,

    #[arg(short = 'D', long, value_name = "DATADIR")]
    pub pgdata: String,

    #[arg(short = 'C', long, value_name = "DATABASE_URL")]
    pub connstr: String,

    #[cfg(target_os = "linux")]
    #[arg(long, default_value = "neon-postgres")]
    pub cgroup: String,

    #[cfg(target_os = "linux")]
    #[arg(
        long,
        default_value = "host=localhost port=5432 dbname=postgres user=cloud_admin sslmode=disable application_name=vm-monitor"
    )]
    pub filecache_connstr: String,

    #[cfg(target_os = "linux")]
    #[arg(long, default_value = "0.0.0.0:10301")]
    pub vm_monitor_addr: String,

    #[arg(long, action = clap::ArgAction::SetTrue)]
    pub resize_swap_on_bind: bool,

    #[arg(long)]
    pub set_disk_quota_for_fs: Option<String>,

    #[arg(short = 's', long = "spec", group = "spec")]
    pub spec_json: Option<String>,

    #[arg(short = 'S', long, group = "spec-path")]
    pub spec_path: Option<OsString>,

    #[arg(short = 'i', long, group = "compute-id", default_value = generate_compute_id())]
    pub compute_id: String,

    #[arg(short = 'p', long, conflicts_with_all = ["spec", "spec-path"], value_name = "CONTROL_PLANE_API_BASE_URL")]
    pub control_plane_uri: Option<String>,
}

fn main() -> Result<()> {
    let cli = Cli::parse();

    // For historical reasons, the main thread that processes the spec and launches postgres
    // is synchronous, but we always have this tokio runtime available and we "enter" it so
    // that you can use tokio::spawn() and tokio::runtime::Handle::current().block_on(...)
    // from all parts of compute_ctl.
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()?;
    let _rt_guard = runtime.enter();

    let build_tag = runtime.block_on(init())?;

    let scenario = failpoint_support::init();

    // enable core dumping for all child processes
    setrlimit(Resource::CORE, rlimit::INFINITY, rlimit::INFINITY)?;

    let (pg_handle, start_pg_result) = {
        // Enter startup tracing context
        let _startup_context_guard = startup_context_from_env();

        let cli_spec = try_spec_from_cli(&cli)?;

        let compute = wait_spec(build_tag, &cli, cli_spec)?;

        start_postgres(&cli, compute)?

        // Startup is finished, exit the startup tracing span
    };

    // PostgreSQL is now running, if startup was successful. Wait until it exits.
    let wait_pg_result = wait_postgres(pg_handle)?;

    let delay_exit = cleanup_after_postgres_exit(start_pg_result)?;

    maybe_delay_exit(delay_exit);

    scenario.teardown();

    deinit_and_exit(wait_pg_result);
}

async fn init() -> Result<String> {
    init_tracing_and_logging(DEFAULT_LOG_LEVEL).await?;

    let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?;
    thread::spawn(move || {
        for sig in signals.forever() {
            handle_exit_signal(sig);
        }
    });

    let build_tag = option_env!("BUILD_TAG")
        .unwrap_or(BUILD_TAG_DEFAULT)
        .to_string();
    info!("build_tag: {build_tag}");

    Ok(build_tag)
}

fn startup_context_from_env() -> Option<opentelemetry::ContextGuard> {
    // Extract OpenTelemetry context for the startup actions from the
    // TRACEPARENT and TRACESTATE env variables, and attach it to the current
    // tracing context.
    //
    // This is used to propagate the context for the 'start_compute' operation
    // from the neon control plane. This allows linking together the wider
    // 'start_compute' operation that creates the compute container, with the
    // startup actions here within the container.
    //
    // There is no standard for passing context in env variables, but a lot of
    // tools use TRACEPARENT/TRACESTATE, so we use that convention too. See
    // https://github.com/open-telemetry/opentelemetry-specification/issues/740
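    //
    // For reference, TRACEPARENT carries a W3C Trace Context `traceparent`
    // value, `{version}-{trace-id}-{parent-id}-{trace-flags}`, e.g.:
    //   TRACEPARENT=00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01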
    //
    // Switch to the startup context here, and exit it once the startup has
    // completed and Postgres is up and running.
    //
    // If this pod is pre-created without binding it to any particular endpoint
    // yet, this isn't the right place to enter the startup context. In that
    // case, the control plane should pass the tracing context as part of the
    // /configure API call.
    //
    // NOTE: This is supposed to only cover the *startup* actions. Once
    // postgres is configured and up-and-running, we exit this span. Any other
    // actions that are performed on incoming HTTP requests, for example, are
    // performed in separate spans.
    //
    // XXX: If the pod is restarted, we perform the startup actions in the same
    // context as the original startup actions, which probably doesn't make
    // sense.
    let mut startup_tracing_carrier: HashMap<String, String> = HashMap::new();
    if let Ok(val) = std::env::var("TRACEPARENT") {
        startup_tracing_carrier.insert("traceparent".to_string(), val);
    }
    if let Ok(val) = std::env::var("TRACESTATE") {
        startup_tracing_carrier.insert("tracestate".to_string(), val);
    }
    if !startup_tracing_carrier.is_empty() {
        use opentelemetry::propagation::TextMapPropagator;
        use opentelemetry_sdk::propagation::TraceContextPropagator;
        let guard = TraceContextPropagator::new()
            .extract(&startup_tracing_carrier)
            .attach();
        info!("startup tracing context attached");
        Some(guard)
    } else {
        None
    }
}

fn try_spec_from_cli(cli: &Cli) -> Result<CliSpecParams> {
    // First, try to get cluster spec from the cli argument
    if let Some(ref spec_json) = cli.spec_json {
        info!("got spec from cli argument {}", spec_json);
        return Ok(CliSpecParams {
            spec: Some(serde_json::from_str(spec_json)?),
            compute_ctl_config: ComputeCtlConfig::default(),
            live_config_allowed: false,
        });
    }

    // Second, try to read it from the file if path is provided
    if let Some(ref spec_path) = cli.spec_path {
        let file = File::open(Path::new(spec_path))?;
        return Ok(CliSpecParams {
            spec: Some(serde_json::from_reader(file)?),
            compute_ctl_config: ComputeCtlConfig::default(),
            live_config_allowed: true,
        });
    }

    if cli.control_plane_uri.is_none() {
        panic!("must specify --control-plane-uri");
    };

    match get_spec_from_control_plane(cli.control_plane_uri.as_ref().unwrap(), &cli.compute_id) {
        Ok(resp) => Ok(CliSpecParams {
            spec: resp.0,
            compute_ctl_config: resp.1,
            live_config_allowed: true,
        }),
        Err(e) => {
            error!(
                "cannot get response from control plane: {}\n\
                 neither spec nor confirmation that compute is in the Empty state was received",
                e
            );
            Err(e)
        }
    }
}

struct CliSpecParams {
    /// If a spec was provided via CLI or file, the [`ComputeSpec`]
    spec: Option<ComputeSpec>,
    #[allow(dead_code)]
    compute_ctl_config: ComputeCtlConfig,
    live_config_allowed: bool,
}

fn wait_spec(
    build_tag: String,
    cli: &Cli,
    CliSpecParams {
        spec,
        live_config_allowed,
        compute_ctl_config: _,
    }: CliSpecParams,
) -> Result<Arc<ComputeNode>> {
    let mut new_state = ComputeState::new();
    let spec_set;

    if let Some(spec) = spec {
        let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?;
        info!("new pspec.spec: {:?}", pspec.spec);
        new_state.pspec = Some(pspec);
        spec_set = true;
    } else {
        spec_set = false;
    }
    let connstr = Url::parse(&cli.connstr).context("cannot parse connstr as a URL")?;
    let conn_conf = postgres::config::Config::from_str(connstr.as_str())
        .context("cannot build postgres config from connstr")?;
    let tokio_conn_conf = tokio_postgres::config::Config::from_str(connstr.as_str())
        .context("cannot build tokio postgres config from connstr")?;
    let compute_node = ComputeNode {
        compute_id: cli.compute_id.clone(),
        connstr,
        conn_conf,
        tokio_conn_conf,
        pgdata: cli.pgdata.clone(),
        pgbin: cli.pgbin.clone(),
        pgversion: get_pg_version_string(&cli.pgbin),
        external_http_port: cli.external_http_port,
        internal_http_port: cli.internal_http_port.unwrap_or(cli.external_http_port + 1),
        live_config_allowed,
        state: Mutex::new(new_state),
        state_changed: Condvar::new(),
        ext_remote_storage: cli.remote_ext_config.clone(),
        ext_download_progress: RwLock::new(HashMap::new()),
        build_tag,
    };
    let compute = Arc::new(compute_node);

    // If this is a pooled VM, prewarm before starting HTTP server and becoming
    // available for binding. Prewarming helps Postgres start quicker later,
    // because QEMU will already have its memory allocated from the host, and
    // the necessary binaries will already be cached.
    if !spec_set {
        compute.prewarm_postgres()?;
    }

    // Launch the external HTTP server first, so that we can serve control plane
    // requests while configuration is still in progress.
    Server::External(cli.external_http_port).launch(&compute);

    // The internal HTTP server could be launched later, but there isn't much
    // sense in waiting.
    Server::Internal(cli.internal_http_port.unwrap_or(cli.external_http_port + 1)).launch(&compute);

    if !spec_set {
        // No spec provided, hang waiting for it.
        info!("no compute spec provided, waiting");

        let mut state = compute.state.lock().unwrap();
        while state.status != ComputeStatus::ConfigurationPending {
            state = compute.state_changed.wait(state).unwrap();

            if state.status == ComputeStatus::ConfigurationPending {
                info!("got spec, continue configuration");
                // Spec is already set by the http server handler.
                break;
            }
        }

        // Record for how long we slept waiting for the spec.
        let now = Utc::now();
        state.metrics.wait_for_spec_ms = now
            .signed_duration_since(state.start_time)
            .to_std()
            .unwrap()
            .as_millis() as u64;

        // Reset start time, so that the total startup time that is calculated later will
        // not include the time that we waited for the spec.
        state.start_time = now;
    }

    launch_lsn_lease_bg_task_for_static(&compute);

    Ok(compute)
}

fn start_postgres(
    cli: &Cli,
    compute: Arc<ComputeNode>,
) -> Result<(Option<PostgresHandle>, StartPostgresResult)> {
    // We got all we need, update the state.
    let mut state = compute.state.lock().unwrap();
    state.set_status(ComputeStatus::Init, &compute.state_changed);

    info!(
        "running compute with features: {:?}",
        state.pspec.as_ref().unwrap().spec.features
    );
    // before we release the mutex, fetch some parameters for later.
    let &ComputeSpec {
        swap_size_bytes,
        disk_quota_bytes,
        #[cfg(target_os = "linux")]
        disable_lfc_resizing,
        ..
    } = &state.pspec.as_ref().unwrap().spec;
    drop(state);

    // Launch remaining service threads
    let _monitor_handle = launch_monitor(&compute);
    let _configurator_handle = launch_configurator(&compute);

    let mut prestartup_failed = false;
    let mut delay_exit = false;

    // Resize swap to the desired size if the compute spec says so
    if let (Some(size_bytes), true) = (swap_size_bytes, cli.resize_swap_on_bind) {
        // To avoid 'swapoff' hitting postgres startup, we need to run resize-swap to completion
        // *before* starting postgres.
        //
        // In theory, we could do this asynchronously if SkipSwapon was enabled for VMs, but this
        // carries a risk of introducing hard-to-debug issues - e.g. if postgres sometimes gets
        // OOM-killed during startup because swap wasn't available yet.
        match resize_swap(size_bytes) {
            Ok(()) => {
                let size_mib = size_bytes as f32 / (1 << 20) as f32; // just for more coherent display.
                info!(%size_bytes, %size_mib, "resized swap");
            }
            Err(err) => {
                let err = err.context("failed to resize swap");
                error!("{err:#}");

                // Mark compute startup as failed; don't try to start postgres, and report this
                // error to the control plane when it next asks.
                prestartup_failed = true;
                compute.set_failed_status(err);
                delay_exit = true;
            }
        }
    }

    // Set disk quota if the compute spec says so
    if let (Some(disk_quota_bytes), Some(disk_quota_fs_mountpoint)) =
        (disk_quota_bytes, cli.set_disk_quota_for_fs.as_ref())
    {
        match set_disk_quota(disk_quota_bytes, disk_quota_fs_mountpoint) {
            Ok(()) => {
                let size_mib = disk_quota_bytes as f32 / (1 << 20) as f32; // just for more coherent display.
                info!(%disk_quota_bytes, %size_mib, "set disk quota");
            }
            Err(err) => {
                let err = err.context("failed to set disk quota");
                error!("{err:#}");

                // Mark compute startup as failed; don't try to start postgres, and report this
                // error to the control plane when it next asks.
                prestartup_failed = true;
                compute.set_failed_status(err);
                delay_exit = true;
            }
        }
    }

    // Start Postgres
    let mut pg = None;
    if !prestartup_failed {
        pg = match compute.start_compute() {
            Ok(pg) => {
                info!(postmaster_pid = %pg.0.id(), "Postgres was started");
                Some(pg)
            }
            Err(err) => {
                error!("could not start the compute node: {:#}", err);
                compute.set_failed_status(err);
                delay_exit = true;
                None
            }
        };
    } else {
        warn!("skipping postgres startup because pre-startup step failed");
    }

    // Start the vm-monitor if directed to. The vm-monitor only runs on linux
    // because it requires cgroups.
    cfg_if::cfg_if! {
        if #[cfg(target_os = "linux")] {
            use std::env;
            use tokio_util::sync::CancellationToken;

            // This token is used internally by the monitor to clean up all threads
            let token = CancellationToken::new();

            // don't pass postgres connection string to vm-monitor if we don't want it to resize LFC
            let pgconnstr = if disable_lfc_resizing.unwrap_or(false) {
                None
            } else {
                Some(cli.filecache_connstr.clone())
            };

            let vm_monitor = if env::var_os("AUTOSCALING").is_some() {
                let vm_monitor = tokio::spawn(vm_monitor::start(
                    Box::leak(Box::new(vm_monitor::Args {
                        cgroup: Some(cli.cgroup.clone()),
                        pgconnstr,
                        addr: cli.vm_monitor_addr.clone(),
                    })),
                    token.clone(),
                ));
                Some(vm_monitor)
            } else {
                None
            };
        }
    }

    Ok((
        pg,
        StartPostgresResult {
            delay_exit,
            compute,
            #[cfg(target_os = "linux")]
            token,
            #[cfg(target_os = "linux")]
            vm_monitor,
        },
    ))
}

type PostgresHandle = (std::process::Child, tokio::task::JoinHandle<Result<()>>);

struct StartPostgresResult {
    delay_exit: bool,
    // passed through from WaitSpecResult
    compute: Arc<ComputeNode>,

    #[cfg(target_os = "linux")]
    token: tokio_util::sync::CancellationToken,
    #[cfg(target_os = "linux")]
    vm_monitor: Option<tokio::task::JoinHandle<Result<()>>>,
}

fn wait_postgres(pg: Option<PostgresHandle>) -> Result<WaitPostgresResult> {
    // Wait for the child Postgres process forever. In this state Ctrl+C will
    // propagate to Postgres and it will be shut down as well.
    let mut exit_code = None;
    if let Some((mut pg, logs_handle)) = pg {
        info!(postmaster_pid = %pg.id(), "Waiting for Postgres to exit");

        let ecode = pg
            .wait()
            .expect("failed to start waiting on Postgres process");
        PG_PID.store(0, Ordering::SeqCst);

        // Process has exited. Wait for the log collecting task to finish.
        let _ = tokio::runtime::Handle::current()
            .block_on(logs_handle)
            .map_err(|e| tracing::error!("log task panicked: {:?}", e));

        info!("Postgres exited with code {}, shutting down", ecode);
        exit_code = ecode.code()
    }

    Ok(WaitPostgresResult { exit_code })
}

struct WaitPostgresResult {
    exit_code: Option<i32>,
}

fn cleanup_after_postgres_exit(
    StartPostgresResult {
        mut delay_exit,
        compute,
        #[cfg(target_os = "linux")]
        vm_monitor,
        #[cfg(target_os = "linux")]
        token,
    }: StartPostgresResult,
) -> Result<bool> {
    // Terminate the vm_monitor so it releases the file watcher on
    // /sys/fs/cgroup/neon-postgres.
    // Note: the vm-monitor only runs on linux because it requires cgroups.
    cfg_if::cfg_if! {
        if #[cfg(target_os = "linux")] {
            if let Some(handle) = vm_monitor {
                // Kills all threads spawned by the monitor
                token.cancel();
                // Kills the actual task running the monitor
                handle.abort();
            }
        }
    }

    // Maybe sync safekeepers again, to speed up next startup
    let compute_state = compute.state.lock().unwrap().clone();
    let pspec = compute_state.pspec.as_ref().expect("spec must be set");
    if matches!(pspec.spec.mode, compute_api::spec::ComputeMode::Primary) {
        info!("syncing safekeepers on shutdown");
        let storage_auth_token = pspec.storage_auth_token.clone();
        let lsn = compute.sync_safekeepers(storage_auth_token)?;
        info!("synced safekeepers at lsn {lsn}");
    }

    let mut state = compute.state.lock().unwrap();
    if state.status == ComputeStatus::TerminationPending {
        state.status = ComputeStatus::Terminated;
        compute.state_changed.notify_all();
        // we were asked to terminate gracefully, don't exit to avoid restart
        delay_exit = true
    }
    drop(state);

    if let Err(err) = compute.check_for_core_dumps() {
        error!("error while checking for core dumps: {err:?}");
    }

    Ok(delay_exit)
}

fn maybe_delay_exit(delay_exit: bool) {
    // If launch failed, keep serving HTTP requests for a while, so the cloud
    // control plane can get the actual error.
    if delay_exit {
        info!("giving control plane 30s to collect the error before shutdown");
        thread::sleep(Duration::from_secs(30));
    }
}

fn deinit_and_exit(WaitPostgresResult { exit_code }: WaitPostgresResult) -> ! {
    // Shutdown trace pipeline gracefully, so that it has a chance to send any
    // pending traces before we exit. Shutting down OTEL tracing provider may
    // hang for quite some time, see, for example:
    // - https://github.com/open-telemetry/opentelemetry-rust/issues/868
    // - and our problems with staging https://github.com/neondatabase/cloud/issues/3707#issuecomment-1493983636
    //
    // Yet, we want computes to shut down fast enough, as we may need a new one
    // for the same timeline ASAP. So wait no longer than 2s for the shutdown to
    // complete, then just error out and exit the main thread.
    info!("shutting down tracing");
    let (sender, receiver) = mpsc::channel();
    let _ = thread::spawn(move || {
        tracing_utils::shutdown_tracing();
        sender.send(()).ok()
    });
    let shutdown_res = receiver.recv_timeout(Duration::from_millis(2000));
    if shutdown_res.is_err() {
        error!("timed out while shutting down tracing, exiting anyway");
    }

    info!("shutting down");
    exit(exit_code.unwrap_or(1))
}

/// When compute_ctl is killed, also send a termination signal to
/// sync-safekeepers to prevent it from leaking. TODO: it would be better to
/// convert compute_ctl to async and wait for termination, which would then be
/// easy.
fn handle_exit_signal(sig: i32) {
    info!("received {sig} termination signal");
    forward_termination_signal();
    exit(1);
}

#[cfg(test)]
mod test {
    use clap::CommandFactory;

    use super::Cli;

    #[test]
    fn verify_cli() {
        Cli::command().debug_assert()
    }
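
    /// `parse_remote_ext_config` accepts an HTTP(S) URL (with any trailing
    /// slash stripped) and falls back to the default gateway for any other
    /// value; see the compatibility hack near the top of this file.
    #[test]
    fn verify_remote_ext_config_parsing() {
        assert_eq!(
            super::parse_remote_ext_config("http://pg-ext-s3-gateway/").unwrap(),
            "http://pg-ext-s3-gateway"
        );
        assert_eq!(
            super::parse_remote_ext_config("legacy-s3-config").unwrap(),
            "http://pg-ext-s3-gateway"
        );
    }

    /// Generated compute IDs follow the `compute-{unix-seconds}` pattern.
    #[test]
    fn verify_generated_compute_id() {
        assert!(super::generate_compute_id().starts_with("compute-"));
    }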
}