LCOV - code coverage report
Current view: top level - compute_tools/src/bin - compute_ctl.rs (source / functions)
Test:         07bee600374ccd486c69370d0972d9035964fe68.info
Test Date:    2025-02-20 13:11:02
Coverage:     Lines: 3.5 % (14 of 399 hit)    Functions: 7.7 % (3 of 39 hit)

            Line data    Source code
       1              : //!
       2              : //! Postgres wrapper (`compute_ctl`) is intended to be run as a Docker entrypoint or as a `systemd`
       3              : //! `ExecStart` option. It will handle all the `Neon` specifics during compute node
       4              : //! initialization:
       5              : //! - `compute_ctl` accepts cluster (compute node) specification as a JSON file.
       6              : //! - Every start is a fresh start, so the data directory is removed and
       7              : //!   initialized again on each run.
       8              : //! - If `remote_extension_config` is provided, it is used to fetch the list of
       9              : //!   extensions and download `shared_preload_libraries` from remote storage.
      10              : //! - Next, it will put configuration files into the `PGDATA` directory.
      11              : //! - Sync safekeepers and get the commit LSN.
      12              : //! - Get a `basebackup` from the pageserver using the LSN returned in the previous step.
      13              : //! - Try to start `postgres` and wait until it is ready to accept connections.
      14              : //! - Check and alter/drop/create roles and databases.
      15              : //! - Wait for the `postmaster` process to exit.
      16              : //!
      17              : //! Also `compute_ctl` spawns two separate service threads:
      18              : //! - `compute-monitor` checks the last Postgres activity timestamp and saves it
      19              : //!   into the shared `ComputeNode`;
      20              : //! - `http-endpoint` runs a Hyper HTTP API server, which serves readiness and
      21              : //!   last-activity requests.
      22              : //!
      23              : //! If the `AUTOSCALING` environment variable is set, `compute_ctl` will start the
      24              : //! `vm-monitor` located in [`neon/libs/vm_monitor`]. For VM compute nodes,
      25              : //! `vm-monitor` communicates with the VM autoscaling system. It coordinates
      26              : //! downscaling and requests immediate upscaling under resource pressure.
      27              : //!
      28              : //! Usage example:
      29              : //! ```sh
      30              : //! compute_ctl -D /var/db/postgres/compute \
      31              : //!             -C 'postgresql://cloud_admin@localhost/postgres' \
      32              : //!             -S /var/db/postgres/specs/current.json \
      33              : //!             -b /usr/local/bin/postgres \
      34              : //!             -r http://pg-ext-s3-gateway
      35              : //! ```
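                      : //!
                      : //! Environment variables consulted along the way (all referenced in the code
                      : //! below): `POSTGRES_PATH` (fallback for `-b`), `TRACEPARENT`/`TRACESTATE`
                      : //! (startup tracing context), `AUTOSCALING` (enables the `vm-monitor`), and
                      : //! `BUILD_TAG` (embedded at compile time via `option_env!`).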
      36              : use std::collections::HashMap;
      37              : use std::ffi::OsString;
      38              : use std::fs::File;
      39              : use std::path::Path;
      40              : use std::process::exit;
      41              : use std::str::FromStr;
      42              : use std::sync::atomic::Ordering;
      43              : use std::sync::{mpsc, Arc, Condvar, Mutex, RwLock};
      44              : use std::time::SystemTime;
      45              : use std::{thread, time::Duration};
      46              : 
      47              : use anyhow::{Context, Result};
      48              : use chrono::Utc;
      49              : use clap::Parser;
      50              : use compute_tools::disk_quota::set_disk_quota;
      51              : use compute_tools::http::server::Server;
      52              : use compute_tools::lsn_lease::launch_lsn_lease_bg_task_for_static;
      53              : use signal_hook::consts::{SIGQUIT, SIGTERM};
      54              : use signal_hook::{consts::SIGINT, iterator::Signals};
      55              : use tracing::{error, info, warn};
      56              : use url::Url;
      57              : 
      58              : use compute_api::responses::{ComputeCtlConfig, ComputeStatus};
      59              : use compute_api::spec::ComputeSpec;
      60              : 
      61              : use compute_tools::compute::{
      62              :     forward_termination_signal, ComputeNode, ComputeState, ParsedSpec, PG_PID,
      63              : };
      64              : use compute_tools::configurator::launch_configurator;
      65              : use compute_tools::extension_server::get_pg_version_string;
      66              : use compute_tools::logger::*;
      67              : use compute_tools::monitor::launch_monitor;
      68              : use compute_tools::params::*;
      69              : use compute_tools::spec::*;
      70              : use compute_tools::swap::resize_swap;
      71              : use rlimit::{setrlimit, Resource};
      72              : use utils::failpoint_support;
      73              : 
      74              : // This is an arbitrary build tag. It is fine as a default / for testing
      75              : // purposes in case the BUILD_TAG environment variable is not set.
      76              : const BUILD_TAG_DEFAULT: &str = "latest";
      77              : 
      78              : // Compatibility hack: if the control plane specified any remote-ext-config,
      79              : // use the default value for the extension storage proxy gateway.
      80              : // Remove this once the control plane is updated to pass the gateway URL.
      81            0 : fn parse_remote_ext_config(arg: &str) -> Result<String> {
      82            0 :     if arg.starts_with("http") {
      83            0 :         Ok(arg.trim_end_matches('/').to_string())
      84              :     } else {
      85            0 :         Ok("http://pg-ext-s3-gateway".to_string())
      86              :     }
      87            0 : }
      88              : 
      89              : /// Generate a compute ID if one is not supplied. This exists to keep forward
      90              : /// compatibility tests working, but will be removed in a future iteration.
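                      : /// The result looks like `compute-1740000000` (Unix-epoch seconds at startup).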
      91            1 : fn generate_compute_id() -> String {
      92            1 :     let now = SystemTime::now();
      93            1 : 
      94            1 :     format!(
      95            1 :         "compute-{}",
      96            1 :         now.duration_since(SystemTime::UNIX_EPOCH)
      97            1 :             .unwrap()
      98            1 :             .as_secs()
      99            1 :     )
     100            1 : }
     101              : 
     102              : #[derive(Parser)]
     103              : #[command(rename_all = "kebab-case")]
     104              : struct Cli {
     105              :     #[arg(short = 'b', long, default_value = "postgres", env = "POSTGRES_PATH")]
     106            0 :     pub pgbin: String,
     107              : 
     108              :     #[arg(short = 'r', long, value_parser = parse_remote_ext_config)]
     109              :     pub remote_ext_config: Option<String>,
     110              : 
     111              :     /// The port to bind the external listening HTTP server to. Clients running
     112              :     /// outside the compute will talk to the compute through this port. Keep
     113              :     /// the previous name for this argument around for a smoother release
     114              :     /// with the control plane.
     115              :     ///
     116              :     /// TODO: Remove the alias after the control plane release which teaches the
     117              :     /// control plane about the renamed argument.
     118            1 :     #[arg(long, alias = "http-port", default_value_t = 3080)]
     119            0 :     pub external_http_port: u16,
     120              : 
     121              :     /// The port to bind the internal listening HTTP server to. Used by clients
     122              :     /// like the neon extension (for installing remote extensions) and
                      :     /// local_proxy. Defaults to the external HTTP port + 1 when not set.
     123              :     #[arg(long)]
     124              :     pub internal_http_port: Option<u16>,
     125              : 
     126              :     #[arg(short = 'D', long, value_name = "DATADIR")]
     127            0 :     pub pgdata: String,
     128              : 
     129              :     #[arg(short = 'C', long, value_name = "DATABASE_URL")]
     130            0 :     pub connstr: String,
     131              : 
     132              :     #[cfg(target_os = "linux")]
     133              :     #[arg(long, default_value = "neon-postgres")]
     134            0 :     pub cgroup: String,
     135              : 
     136              :     #[cfg(target_os = "linux")]
     137              :     #[arg(
     138              :         long,
     139              :         default_value = "host=localhost port=5432 dbname=postgres user=cloud_admin sslmode=disable application_name=vm-monitor"
     140              :     )]
     141            0 :     pub filecache_connstr: String,
     142              : 
     143              :     #[cfg(target_os = "linux")]
     144              :     #[arg(long, default_value = "0.0.0.0:10301")]
     145            0 :     pub vm_monitor_addr: String,
     146              : 
     147              :     #[arg(long, action = clap::ArgAction::SetTrue)]
     148            0 :     pub resize_swap_on_bind: bool,
     149              : 
     150              :     #[arg(long)]
     151              :     pub set_disk_quota_for_fs: Option<String>,
     152              : 
     153              :     #[arg(short = 's', long = "spec", group = "spec")]
     154              :     pub spec_json: Option<String>,
     155              : 
     156              :     #[arg(short = 'S', long, group = "spec-path")]
     157              :     pub spec_path: Option<OsString>,
     158              : 
     159              :     #[arg(short = 'i', long, group = "compute-id", default_value = generate_compute_id())]
     160            0 :     pub compute_id: String,
     161              : 
     162              :     #[arg(short = 'p', long, conflicts_with_all = ["spec", "spec-path"], value_name = "CONTROL_PLANE_API_BASE_URL")]
     163              :     pub control_plane_uri: Option<String>,
     164              : }
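                      : 
                      : // An illustrative control-plane-driven invocation (hypothetical host name and
                      : // compute ID), complementing the spec-file example in the module docs:
                      : //
                      : //   compute_ctl -D /var/db/postgres/compute \
                      : //               -C 'postgresql://cloud_admin@localhost/postgres' \
                      : //               -i compute-1740000000 \
                      : //               -p http://control-plane.example.com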
     165              : 
     166            0 : fn main() -> Result<()> {
     167            0 :     let cli = Cli::parse();
     168              : 
     169              :     // For historical reasons, the main thread that processes the spec and launches postgres
     170              :     // is synchronous, but we always have this tokio runtime available and we "enter" it so
     171              :     // that you can use tokio::spawn() and tokio::runtime::Handle::current().block_on(...)
     172              :     // from all parts of compute_ctl.
     173            0 :     let runtime = tokio::runtime::Builder::new_multi_thread()
     174            0 :         .enable_all()
     175            0 :         .build()?;
     176            0 :     let _rt_guard = runtime.enter();
     177              : 
     178            0 :     let build_tag = runtime.block_on(init())?;
     179              : 
     180            0 :     let scenario = failpoint_support::init();
     181            0 : 
     182            0 :     // enable core dumping for all child processes
     183            0 :     setrlimit(Resource::CORE, rlimit::INFINITY, rlimit::INFINITY)?;
     184              : 
     185            0 :     let (pg_handle, start_pg_result) = {
     186              :         // Enter startup tracing context
     187            0 :         let _startup_context_guard = startup_context_from_env();
     188              : 
     189            0 :         let cli_spec = try_spec_from_cli(&cli)?;
     190              : 
     191            0 :         let compute = wait_spec(build_tag, &cli, cli_spec)?;
     192              : 
     193            0 :         start_postgres(&cli, compute)?
     194              : 
     195              :         // Startup is finished, exit the startup tracing span
     196              :     };
     197              : 
     198              :     // PostgreSQL is now running, if startup was successful. Wait until it exits.
     199            0 :     let wait_pg_result = wait_postgres(pg_handle)?;
     200              : 
     201            0 :     let delay_exit = cleanup_after_postgres_exit(start_pg_result)?;
     202              : 
     203            0 :     maybe_delay_exit(delay_exit);
     204            0 : 
     205            0 :     scenario.teardown();
     206            0 : 
     207            0 :     deinit_and_exit(wait_pg_result);
     208            0 : }
     209              : 
     210            0 : async fn init() -> Result<String> {
     211            0 :     init_tracing_and_logging(DEFAULT_LOG_LEVEL).await?;
     212              : 
     213            0 :     let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?;
     214            0 :     thread::spawn(move || {
     215            0 :         for sig in signals.forever() {
     216            0 :             handle_exit_signal(sig);
     217            0 :         }
     218            0 :     });
     219            0 : 
     220            0 :     let build_tag = option_env!("BUILD_TAG")
     221            0 :         .unwrap_or(BUILD_TAG_DEFAULT)
     222            0 :         .to_string();
     223            0 :     info!("build_tag: {build_tag}");
     224              : 
     225            0 :     Ok(build_tag)
     226            0 : }
     227              : 
     228            0 : fn startup_context_from_env() -> Option<opentelemetry::ContextGuard> {
     229            0 :     // Extract OpenTelemetry context for the startup actions from the
     230            0 :     // TRACEPARENT and TRACESTATE env variables, and attach it to the current
     231            0 :     // tracing context.
     232            0 :     //
     233            0 :     // This is used to propagate the context for the 'start_compute' operation
     234            0 :     // from the neon control plane. This allows linking together the wider
     235            0 :     // 'start_compute' operation that creates the compute container, with the
     236            0 :     // startup actions here within the container.
     237            0 :     //
     238            0 :     // There is no standard for passing context in env variables, but a lot of
     239            0 :     // tools use TRACEPARENT/TRACESTATE, so we use that convention too. See
     240            0 :     // https://github.com/open-telemetry/opentelemetry-specification/issues/740
     241            0 :     //
     242            0 :     // Switch to the startup context here, and exit it once the startup has
     243            0 :     // completed and Postgres is up and running.
     244            0 :     //
     245            0 :     // If this pod is pre-created without binding it to any particular endpoint
     246            0 :     // yet, this isn't the right place to enter the startup context. In that
     247            0 :     // case, the control plane should pass the tracing context as part of the
     248            0 :     // /configure API call.
     249            0 :     //
     250            0 :     // NOTE: This is supposed to only cover the *startup* actions. Once
     251            0 :     // postgres is configured and up-and-running, we exit this span. Any other
     252            0 :     // actions that are performed on incoming HTTP requests, for example, are
     253            0 :     // performed in separate spans.
     254            0 :     //
     255            0 :     // XXX: If the pod is restarted, we perform the startup actions in the same
     256            0 :     // context as the original startup actions, which probably doesn't make
     257            0 :     // sense.
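                      :     //
                      :     // For reference, a TRACEPARENT value uses the W3C Trace Context
                      :     // `traceparent` header format; the example value from the W3C spec:
                      :     //
                      :     //   00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01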
     258            0 :     let mut startup_tracing_carrier: HashMap<String, String> = HashMap::new();
     259            0 :     if let Ok(val) = std::env::var("TRACEPARENT") {
     260            0 :         startup_tracing_carrier.insert("traceparent".to_string(), val);
     261            0 :     }
     262            0 :     if let Ok(val) = std::env::var("TRACESTATE") {
     263            0 :         startup_tracing_carrier.insert("tracestate".to_string(), val);
     264            0 :     }
     265            0 :     if !startup_tracing_carrier.is_empty() {
     266              :         use opentelemetry::propagation::TextMapPropagator;
     267              :         use opentelemetry_sdk::propagation::TraceContextPropagator;
     268            0 :         let guard = TraceContextPropagator::new()
     269            0 :             .extract(&startup_tracing_carrier)
     270            0 :             .attach();
     271            0 :         info!("startup tracing context attached");
     272            0 :         Some(guard)
     273              :     } else {
     274            0 :         None
     275              :     }
     276            0 : }
     277              : 
     278            0 : fn try_spec_from_cli(cli: &Cli) -> Result<CliSpecParams> {
     279              :     // First, try to get cluster spec from the cli argument
     280            0 :     if let Some(ref spec_json) = cli.spec_json {
     281            0 :         info!("got spec from cli argument {}", spec_json);
     282              :         return Ok(CliSpecParams {
     283            0 :             spec: Some(serde_json::from_str(spec_json)?),
     284            0 :             compute_ctl_config: ComputeCtlConfig::default(),
     285              :             live_config_allowed: false,
     286              :         });
     287            0 :     }
     288              : 
     289              :     // Second, try to read it from the file if path is provided
     290            0 :     if let Some(ref spec_path) = cli.spec_path {
     291            0 :         let file = File::open(Path::new(spec_path))?;
     292              :         return Ok(CliSpecParams {
     293            0 :             spec: Some(serde_json::from_reader(file)?),
     294            0 :             compute_ctl_config: ComputeCtlConfig::default(),
     295              :             live_config_allowed: true,
     296              :         });
     297            0 :     }
     298            0 : 
     299            0 :     if cli.control_plane_uri.is_none() {
     300            0 :         panic!("must specify --control-plane-uri");
     301            0 :     };
     302            0 : 
     303            0 :     match get_spec_from_control_plane(cli.control_plane_uri.as_ref().unwrap(), &cli.compute_id) {
     304            0 :         Ok(resp) => Ok(CliSpecParams {
     305            0 :             spec: resp.0,
     306            0 :             compute_ctl_config: resp.1,
     307            0 :             live_config_allowed: true,
     308            0 :         }),
     309            0 :         Err(e) => {
     310            0 :             error!(
     311            0 :                 "cannot get response from control plane: {}\n\
     312            0 :                 neither spec nor confirmation that compute is in the Empty state was received",
     313              :                 e
     314              :             );
     315            0 :             Err(e)
     316              :         }
     317              :     }
     318            0 : }
     319              : 
     320              : struct CliSpecParams {
     321              :     /// The [`ComputeSpec`], if one was provided via the CLI or a spec file.
     322              :     spec: Option<ComputeSpec>,
     323              :     #[allow(dead_code)]
     324              :     compute_ctl_config: ComputeCtlConfig,
     325              :     live_config_allowed: bool,
     326              : }
     327              : 
     328            0 : fn wait_spec(
     329            0 :     build_tag: String,
     330            0 :     cli: &Cli,
     331            0 :     CliSpecParams {
     332            0 :         spec,
     333            0 :         live_config_allowed,
     334            0 :         compute_ctl_config: _,
     335            0 :     }: CliSpecParams,
     336            0 : ) -> Result<Arc<ComputeNode>> {
     337            0 :     let mut new_state = ComputeState::new();
     338              :     let spec_set;
     339              : 
     340            0 :     if let Some(spec) = spec {
     341            0 :         let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?;
     342            0 :         info!("new pspec.spec: {:?}", pspec.spec);
     343            0 :         new_state.pspec = Some(pspec);
     344            0 :         spec_set = true;
     345            0 :     } else {
     346            0 :         spec_set = false;
     347            0 :     }
     348            0 :     let connstr = Url::parse(&cli.connstr).context("cannot parse connstr as a URL")?;
     349            0 :     let conn_conf = postgres::config::Config::from_str(connstr.as_str())
     350            0 :         .context("cannot build postgres config from connstr")?;
     351            0 :     let tokio_conn_conf = tokio_postgres::config::Config::from_str(connstr.as_str())
     352            0 :         .context("cannot build tokio postgres config from connstr")?;
     353            0 :     let compute_node = ComputeNode {
     354            0 :         compute_id: cli.compute_id.clone(),
     355            0 :         connstr,
     356            0 :         conn_conf,
     357            0 :         tokio_conn_conf,
     358            0 :         pgdata: cli.pgdata.clone(),
     359            0 :         pgbin: cli.pgbin.clone(),
     360            0 :         pgversion: get_pg_version_string(&cli.pgbin),
     361            0 :         external_http_port: cli.external_http_port,
     362            0 :         internal_http_port: cli.internal_http_port.unwrap_or(cli.external_http_port + 1),
     363            0 :         live_config_allowed,
     364            0 :         state: Mutex::new(new_state),
     365            0 :         state_changed: Condvar::new(),
     366            0 :         ext_remote_storage: cli.remote_ext_config.clone(),
     367            0 :         ext_download_progress: RwLock::new(HashMap::new()),
     368            0 :         build_tag,
     369            0 :     };
     370            0 :     let compute = Arc::new(compute_node);
     371            0 : 
     372            0 :     // If this is a pooled VM, prewarm before starting HTTP server and becoming
     373            0 :     // available for binding. Prewarming helps Postgres start quicker later,
     374            0 :     // because QEMU will already have its memory allocated from the host, and
     375            0 :     // the necessary binaries will already be cached.
     376            0 :     if !spec_set {
     377            0 :         compute.prewarm_postgres()?;
     378            0 :     }
     379              : 
     380              :     // Launch the external HTTP server first, so that we can serve control plane
     381              :     // requests while configuration is still in progress.
     382            0 :     Server::External(cli.external_http_port).launch(&compute);
     383            0 : 
     384            0 :     // The internal HTTP server could be launched later, but there isn't much
     385            0 :     // sense in waiting.
     386            0 :     Server::Internal(cli.internal_http_port.unwrap_or(cli.external_http_port + 1)).launch(&compute);
     387            0 : 
     388            0 :     if !spec_set {
     389              :         // No spec provided, hang waiting for it.
     390            0 :         info!("no compute spec provided, waiting");
     391              : 
     392            0 :         let mut state = compute.state.lock().unwrap();
     393            0 :         while state.status != ComputeStatus::ConfigurationPending {
     394            0 :             state = compute.state_changed.wait(state).unwrap();
     395            0 : 
     396            0 :             if state.status == ComputeStatus::ConfigurationPending {
     397            0 :                 info!("got spec, continue configuration");
     398              :                 // Spec is already set by the http server handler.
     399            0 :                 break;
     400            0 :             }
     401              :         }
     402              : 
     403              :         // Record for how long we slept waiting for the spec.
     404            0 :         let now = Utc::now();
     405            0 :         state.metrics.wait_for_spec_ms = now
     406            0 :             .signed_duration_since(state.start_time)
     407            0 :             .to_std()
     408            0 :             .unwrap()
     409            0 :             .as_millis() as u64;
     410            0 : 
     411            0 :         // Reset start time, so that the total startup time that is calculated later will
     412            0 :         // not include the time that we waited for the spec.
     413            0 :         state.start_time = now;
     414            0 :     }
     415              : 
     416            0 :     launch_lsn_lease_bg_task_for_static(&compute);
     417            0 : 
     418            0 :     Ok(compute)
     419            0 : }
     420              : 
     421            0 : fn start_postgres(
     422            0 :     cli: &Cli,
     423            0 :     compute: Arc<ComputeNode>,
     424            0 : ) -> Result<(Option<PostgresHandle>, StartPostgresResult)> {
     425            0 :     // We got all we need, update the state.
     426            0 :     let mut state = compute.state.lock().unwrap();
     427            0 :     state.set_status(ComputeStatus::Init, &compute.state_changed);
     428            0 : 
     429            0 :     info!(
     430            0 :         "running compute with features: {:?}",
     431            0 :         state.pspec.as_ref().unwrap().spec.features
     432              :     );
     433              :     // before we release the mutex, fetch some parameters for later.
     434              :     let &ComputeSpec {
     435            0 :         swap_size_bytes,
     436            0 :         disk_quota_bytes,
     437            0 :         #[cfg(target_os = "linux")]
     438            0 :         disable_lfc_resizing,
     439            0 :         ..
     440            0 :     } = &state.pspec.as_ref().unwrap().spec;
     441            0 :     drop(state);
     442            0 : 
     443            0 :     // Launch remaining service threads
     444            0 :     let _monitor_handle = launch_monitor(&compute);
     445            0 :     let _configurator_handle = launch_configurator(&compute);
     446            0 : 
     447            0 :     let mut prestartup_failed = false;
     448            0 :     let mut delay_exit = false;
     449              : 
     450              :     // Resize swap to the desired size if the compute spec says so
     451            0 :     if let (Some(size_bytes), true) = (swap_size_bytes, cli.resize_swap_on_bind) {
     452              :         // To avoid 'swapoff' hitting postgres startup, we need to run resize-swap to completion
     453              :         // *before* starting postgres.
     454              :         //
     455              :         // In theory, we could do this asynchronously if SkipSwapon was enabled for VMs, but this
     456              :         // carries a risk of introducing hard-to-debug issues - e.g. if postgres sometimes gets
     457              :         // OOM-killed during startup because swap wasn't available yet.
     458            0 :         match resize_swap(size_bytes) {
     459              :             Ok(()) => {
     460            0 :                 let size_mib = size_bytes as f32 / (1 << 20) as f32; // just for more coherent display.
     461            0 :                 info!(%size_bytes, %size_mib, "resized swap");
     462              :             }
     463            0 :             Err(err) => {
     464            0 :                 let err = err.context("failed to resize swap");
     465            0 :                 error!("{err:#}");
     466              : 
     467              :                 // Mark compute startup as failed; don't try to start postgres, and report this
     468              :                 // error to the control plane when it next asks.
     469            0 :                 prestartup_failed = true;
     470            0 :                 compute.set_failed_status(err);
     471            0 :                 delay_exit = true;
     472              :             }
     473              :         }
     474            0 :     }
     475              : 
     476              :     // Set disk quota if the compute spec says so
     477            0 :     if let (Some(disk_quota_bytes), Some(disk_quota_fs_mountpoint)) =
     478            0 :         (disk_quota_bytes, cli.set_disk_quota_for_fs.as_ref())
     479              :     {
     480            0 :         match set_disk_quota(disk_quota_bytes, disk_quota_fs_mountpoint) {
     481              :             Ok(()) => {
     482            0 :                 let size_mib = disk_quota_bytes as f32 / (1 << 20) as f32; // just for more coherent display.
     483            0 :                 info!(%disk_quota_bytes, %size_mib, "set disk quota");
     484              :             }
     485            0 :             Err(err) => {
     486            0 :                 let err = err.context("failed to set disk quota");
     487            0 :                 error!("{err:#}");
     488              : 
     489              :                 // Mark compute startup as failed; don't try to start postgres, and report this
     490              :                 // error to the control plane when it next asks.
     491            0 :                 prestartup_failed = true;
     492            0 :                 compute.set_failed_status(err);
     493            0 :                 delay_exit = true;
     494              :             }
     495              :         }
     496            0 :     }
     497              : 
     498              :     // Start Postgres
     499            0 :     let mut pg = None;
     500            0 :     if !prestartup_failed {
     501            0 :         pg = match compute.start_compute() {
     502            0 :             Ok(pg) => {
     503            0 :                 info!(postmaster_pid = %pg.0.id(), "Postgres was started");
     504            0 :                 Some(pg)
     505              :             }
     506            0 :             Err(err) => {
     507            0 :                 error!("could not start the compute node: {:#}", err);
     508            0 :                 compute.set_failed_status(err);
     509            0 :                 delay_exit = true;
     510            0 :                 None
     511              :             }
     512              :         };
     513              :     } else {
     514            0 :         warn!("skipping postgres startup because pre-startup step failed");
     515              :     }
     516              : 
     517              :     // Start the vm-monitor if directed to. The vm-monitor only runs on linux
     518              :     // because it requires cgroups.
     519              :     cfg_if::cfg_if! {
     520              :         if #[cfg(target_os = "linux")] {
     521              :             use std::env;
     522              :             use tokio_util::sync::CancellationToken;
     523              : 
     524              :             // This token is used internally by the monitor to clean up all threads
     525            0 :             let token = CancellationToken::new();
     526              : 
     527              :             // don't pass postgres connection string to vm-monitor if we don't want it to resize LFC
     528            0 :             let pgconnstr = if disable_lfc_resizing.unwrap_or(false) {
     529            0 :                 None
     530              :             } else {
     531            0 :                 Some(cli.filecache_connstr.clone())
     532              :             };
     533              : 
     534            0 :             let vm_monitor = if env::var_os("AUTOSCALING").is_some() {
     535            0 :                 let vm_monitor = tokio::spawn(vm_monitor::start(
     536            0 :                     Box::leak(Box::new(vm_monitor::Args {
     537            0 :                         cgroup: Some(cli.cgroup.clone()),
     538            0 :                         pgconnstr,
     539            0 :                         addr: cli.vm_monitor_addr.clone(),
     540            0 :                     })),
     541            0 :                     token.clone(),
     542            0 :                 ));
     543            0 :                 Some(vm_monitor)
     544              :             } else {
     545            0 :                 None
     546              :             };
     547              :         }
     548              :     }
     549              : 
     550            0 :     Ok((
     551            0 :         pg,
     552            0 :         StartPostgresResult {
     553            0 :             delay_exit,
     554            0 :             compute,
     555            0 :             #[cfg(target_os = "linux")]
     556            0 :             token,
     557            0 :             #[cfg(target_os = "linux")]
     558            0 :             vm_monitor,
     559            0 :         },
     560            0 :     ))
     561            0 : }
     562              : 
     563              : type PostgresHandle = (std::process::Child, tokio::task::JoinHandle<Result<()>>);
     564              : 
     565              : struct StartPostgresResult {
     566              :     delay_exit: bool,
     567              :     // passed through from WaitSpecResult
     568              :     compute: Arc<ComputeNode>,
     569              : 
     570              :     #[cfg(target_os = "linux")]
     571              :     token: tokio_util::sync::CancellationToken,
     572              :     #[cfg(target_os = "linux")]
     573              :     vm_monitor: Option<tokio::task::JoinHandle<Result<()>>>,
     574              : }
     575              : 
     576            0 : fn wait_postgres(pg: Option<PostgresHandle>) -> Result<WaitPostgresResult> {
     577            0 :     // Wait for the child Postgres process forever. In this state Ctrl+C will
     578            0 :     // propagate to Postgres and it will be shut down as well.
     579            0 :     let mut exit_code = None;
     580            0 :     if let Some((mut pg, logs_handle)) = pg {
     581            0 :         info!(postmaster_pid = %pg.id(), "Waiting for Postgres to exit");
     582              : 
     583            0 :         let ecode = pg
     584            0 :             .wait()
     585            0 :             .expect("failed to start waiting on Postgres process");
     586            0 :         PG_PID.store(0, Ordering::SeqCst);
     587            0 : 
     588            0 :         // Process has exited. Wait for the log collecting task to finish.
     589            0 :         let _ = tokio::runtime::Handle::current()
     590            0 :             .block_on(logs_handle)
     591            0 :             .map_err(|e| tracing::error!("log task panicked: {:?}", e));
     592            0 : 
     593            0 :         info!("Postgres exited with code {}, shutting down", ecode);
     594            0 :         exit_code = ecode.code()
     595            0 :     }
     596              : 
     597            0 :     Ok(WaitPostgresResult { exit_code })
     598            0 : }
     599              : 
     600              : struct WaitPostgresResult {
     601              :     exit_code: Option<i32>,
     602              : }
     603              : 
     604            0 : fn cleanup_after_postgres_exit(
     605            0 :     StartPostgresResult {
     606            0 :         mut delay_exit,
     607            0 :         compute,
     608            0 :         #[cfg(target_os = "linux")]
     609            0 :         vm_monitor,
     610            0 :         #[cfg(target_os = "linux")]
     611            0 :         token,
     612            0 :     }: StartPostgresResult,
     613            0 : ) -> Result<bool> {
     614              :     // Terminate the vm_monitor so it releases the file watcher on
     615              :     // /sys/fs/cgroup/neon-postgres.
     616              :     // Note: the vm-monitor only runs on linux because it requires cgroups.
     617              :     cfg_if::cfg_if! {
     618              :         if #[cfg(target_os = "linux")] {
     619            0 :             if let Some(handle) = vm_monitor {
     620            0 :                 // Kills all threads spawned by the monitor
     621            0 :                 token.cancel();
     622            0 :                 // Kills the actual task running the monitor
     623            0 :                 handle.abort();
     624            0 :             }
     625              :         }
     626              :     }
     627              : 
     628              :     // Maybe sync safekeepers again, to speed up next startup
     629            0 :     let compute_state = compute.state.lock().unwrap().clone();
     630            0 :     let pspec = compute_state.pspec.as_ref().expect("spec must be set");
     631            0 :     if matches!(pspec.spec.mode, compute_api::spec::ComputeMode::Primary) {
     632            0 :         info!("syncing safekeepers on shutdown");
     633            0 :         let storage_auth_token = pspec.storage_auth_token.clone();
     634            0 :         let lsn = compute.sync_safekeepers(storage_auth_token)?;
     635            0 :         info!("synced safekeepers at lsn {lsn}");
     636            0 :     }
     637              : 
     638            0 :     let mut state = compute.state.lock().unwrap();
     639            0 :     if state.status == ComputeStatus::TerminationPending {
     640            0 :         state.status = ComputeStatus::Terminated;
     641            0 :         compute.state_changed.notify_all();
     642            0 :         // we were asked to terminate gracefully, don't exit to avoid restart
     643            0 :         delay_exit = true
     644            0 :     }
     645            0 :     drop(state);
     646              : 
     647            0 :     if let Err(err) = compute.check_for_core_dumps() {
     648            0 :         error!("error while checking for core dumps: {err:?}");
     649            0 :     }
     650              : 
     651            0 :     Ok(delay_exit)
     652            0 : }
     653              : 
     654            0 : fn maybe_delay_exit(delay_exit: bool) {
     655            0 :     // If launch failed, keep serving HTTP requests for a while, so the cloud
     656            0 :     // control plane can get the actual error.
     657            0 :     if delay_exit {
     658            0 :         info!("giving control plane 30s to collect the error before shutdown");
     659            0 :         thread::sleep(Duration::from_secs(30));
     660            0 :     }
     661            0 : }
     662              : 
     663            0 : fn deinit_and_exit(WaitPostgresResult { exit_code }: WaitPostgresResult) -> ! {
     664            0 :     // Shutdown trace pipeline gracefully, so that it has a chance to send any
     665            0 :     // pending traces before we exit. Shutting down OTEL tracing provider may
     666            0 :     // hang for quite some time, see, for example:
     667            0 :     // - https://github.com/open-telemetry/opentelemetry-rust/issues/868
     668            0 :     // - and our problems with staging https://github.com/neondatabase/cloud/issues/3707#issuecomment-1493983636
     669            0 :     //
     670            0 :     // Yet, we want computes to shut down fast enough, as we may need a new one
     671            0 :     // for the same timeline ASAP. So wait no longer than 2s for the shutdown to
     672            0 :     // complete, then just error out and exit the main thread.
     673            0 :     info!("shutting down tracing");
     674            0 :     let (sender, receiver) = mpsc::channel();
     675            0 :     let _ = thread::spawn(move || {
     676            0 :         tracing_utils::shutdown_tracing();
     677            0 :         sender.send(()).ok()
     678            0 :     });
     679            0 :     let shutdown_res = receiver.recv_timeout(Duration::from_millis(2000));
     680            0 :     if shutdown_res.is_err() {
     681            0 :         error!("timed out while shutting down tracing, exiting anyway");
     682            0 :     }
     683              : 
     684            0 :     info!("shutting down");
     685            0 :     exit(exit_code.unwrap_or(1))
     686              : }
     687              : 
     688              : /// When compute_ctl is killed, also send a termination signal to sync-safekeepers
     689              : /// to prevent leaking them. TODO: it would be better to convert compute_ctl to
     690              : /// async and wait for termination, which would then be easy.
     691            0 : fn handle_exit_signal(sig: i32) {
     692            0 :     info!("received {sig} termination signal");
     693            0 :     forward_termination_signal();
     694            0 :     exit(1);
     695              : }
     696              : 
     697              : #[cfg(test)]
     698              : mod test {
     699              :     use clap::CommandFactory;
     700              : 
     701              :     use super::Cli;
     702              : 
     703              :     #[test]
     704            1 :     fn verify_cli() {
     705            1 :         Cli::command().debug_assert()
     706            1 :     }
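                      : 
                      :     // A sketch of a test for the remote-ext-config compatibility fallback; the
                      :     // expected values follow directly from `parse_remote_ext_config` above.
                      :     #[test]
                      :     fn verify_remote_ext_config_fallback() {
                      :         assert_eq!(
                      :             super::parse_remote_ext_config("http://pg-ext-s3-gateway/").unwrap(),
                      :             "http://pg-ext-s3-gateway"
                      :         );
                      :         // Anything that does not start with "http" falls back to the default.
                      :         assert_eq!(
                      :             super::parse_remote_ext_config("legacy-value").unwrap(),
                      :             "http://pg-ext-s3-gateway"
                      :         );
                      :     }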
     707              : }
        

Generated by: LCOV version 2.1-beta