LCOV - code coverage report
Current view: top level - compute_tools/src/bin - compute_ctl.rs (source / functions) Coverage Total Hit
Test: fcf55189004bd3119eed75e2873a97da8078700c.info Lines: 18.1 % 530 96
Test Date: 2024-06-25 12:07:31 Functions: 9.1 % 22 2

            Line data    Source code
       1              : //!
       2              : //! Postgres wrapper (`compute_ctl`) is intended to be run as a Docker entrypoint or as a `systemd`
       3              : //! `ExecStart` option. It will handle all the `Neon` specifics during compute node
       4              : //! initialization:
       5              : //! - `compute_ctl` accepts cluster (compute node) specification as a JSON file.
       6              : //! - Every start is a fresh start, so the data directory is removed and
       7              : //!   initialized again on each run.
       8              : //! - If `remote-ext-config` is provided, it will be used to fetch the extensions list
       9              : //!   and download `shared_preload_libraries` from the remote storage.
      10              : //! - Next it will put configuration files into the `PGDATA` directory.
      11              : //! - Sync safekeepers and get commit LSN.
      12              : //! - Get `basebackup` from pageserver using the LSN returned by the previous step.
      13              : //! - Try to start `postgres` and wait until it is ready to accept connections.
      14              : //! - Check and alter/drop/create roles and databases.
      15              : //! - Hang waiting on the `postmaster` process to exit.
      16              : //!
      17              : //! Also `compute_ctl` spawns two separate service threads:
      18              : //! - `compute-monitor` checks the last Postgres activity timestamp and saves it
      19              : //!   into the shared `ComputeNode`;
      20              : //! - `http-endpoint` runs a Hyper HTTP API server, which serves readiness and the
      21              : //!   last activity requests.
      22              : //!
      23              : //! If `AUTOSCALING` environment variable is set, `compute_ctl` will start the
      24              : //! `vm-monitor` located in [`neon/libs/vm_monitor`]. For VM compute nodes,
      25              : //! `vm-monitor` communicates with the VM autoscaling system. It coordinates
      26              : //! downscaling and requests immediate upscaling under resource pressure.
      27              : //!
      28              : //! Usage example:
      29              : //! ```sh
      30              : //! compute_ctl -D /var/db/postgres/compute \
      31              : //!             -C 'postgresql://cloud_admin@localhost/postgres' \
      32              : //!             -S /var/db/postgres/specs/current.json \
      33              : //!             -b /usr/local/bin/postgres \
      34              : //!             -r http://pg-ext-s3-gateway
      35              : //! ```
      36              : //!
      37              : use std::collections::HashMap;
      38              : use std::fs::File;
      39              : use std::path::Path;
      40              : use std::process::exit;
      41              : use std::sync::atomic::Ordering;
      42              : use std::sync::{mpsc, Arc, Condvar, Mutex, RwLock};
      43              : use std::{thread, time::Duration};
      44              : 
      45              : use anyhow::{Context, Result};
      46              : use chrono::Utc;
      47              : use clap::Arg;
      48              : use signal_hook::consts::{SIGQUIT, SIGTERM};
      49              : use signal_hook::{consts::SIGINT, iterator::Signals};
      50              : use tracing::{error, info, warn};
      51              : use url::Url;
      52              : 
      53              : use compute_api::responses::ComputeStatus;
      54              : use compute_api::spec::ComputeSpec;
      55              : 
      56              : use compute_tools::compute::{
      57              :     forward_termination_signal, ComputeNode, ComputeState, ParsedSpec, PG_PID,
      58              : };
      59              : use compute_tools::configurator::launch_configurator;
      60              : use compute_tools::extension_server::get_pg_version;
      61              : use compute_tools::http::api::launch_http_server;
      62              : use compute_tools::logger::*;
      63              : use compute_tools::monitor::launch_monitor;
      64              : use compute_tools::params::*;
      65              : use compute_tools::spec::*;
      66              : use compute_tools::swap::resize_swap;
      67              : 
      68              : // This is an arbitrary build tag, fine as a default / for testing purposes
      69              : // in case the `BUILD_TAG` environment variable is not set at build time.
      70              : const BUILD_TAG_DEFAULT: &str = "latest";
      71              : 
      72            0 : fn main() -> Result<()> {
      73            0 :     let (build_tag, clap_args) = init()?;
      74              : 
      75            0 :     let (pg_handle, start_pg_result) = {
      76              :         // Enter startup tracing context
      77            0 :         let _startup_context_guard = startup_context_from_env();
      78              : 
      79            0 :         let cli_args = process_cli(&clap_args)?;
      80              : 
      81            0 :         let cli_spec = try_spec_from_cli(&clap_args, &cli_args)?;
      82              : 
      83            0 :         let wait_spec_result = wait_spec(build_tag, cli_args, cli_spec)?;
      84              : 
      85            0 :         start_postgres(&clap_args, wait_spec_result)?
      86              : 
      87              :         // Startup is finished, exit the startup tracing span
      88              :     };
      89              : 
      90              :     // PostgreSQL is now running, if startup was successful. Wait until it exits.
      91            0 :     let wait_pg_result = wait_postgres(pg_handle)?;
      92              : 
      93            0 :     let delay_exit = cleanup_after_postgres_exit(start_pg_result)?;
      94              : 
      95            0 :     maybe_delay_exit(delay_exit);
      96            0 : 
      97            0 :     deinit_and_exit(wait_pg_result);
      98            0 : }
      99              : 
     100            0 : fn init() -> Result<(String, clap::ArgMatches)> {
     101            0 :     init_tracing_and_logging(DEFAULT_LOG_LEVEL)?;
     102              : 
     103            0 :     let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?;
     104            0 :     thread::spawn(move || {
     105            0 :         for sig in signals.forever() {
     106            0 :             handle_exit_signal(sig);
     107            0 :         }
     108            0 :     });
     109            0 : 
     110            0 :     let build_tag = option_env!("BUILD_TAG")
     111            0 :         .unwrap_or(BUILD_TAG_DEFAULT)
     112            0 :         .to_string();
     113            0 :     info!("build_tag: {build_tag}");
     114              : 
     115            0 :     Ok((build_tag, cli().get_matches()))
     116            0 : }
     117              : 
     118            0 : fn process_cli(matches: &clap::ArgMatches) -> Result<ProcessCliResult> {
     119            0 :     let pgbin_default = "postgres";
     120            0 :     let pgbin = matches
     121            0 :         .get_one::<String>("pgbin")
     122            0 :         .map(|s| s.as_str())
     123            0 :         .unwrap_or(pgbin_default);
     124            0 : 
     125            0 :     let ext_remote_storage = matches
     126            0 :         .get_one::<String>("remote-ext-config")
     127            0 :         // Compatibility hack: if the control plane specified a remote-ext-config that
     128            0 :         // does not start with `http`, use the default extension storage proxy gateway.
     129            0 :         // Remove this once the control plane is updated to pass the gateway URL.
     130            0 :         .map(|conf| {
     131            0 :             if conf.starts_with("http") {
     132            0 :                 conf.trim_end_matches('/')
     133              :             } else {
     134            0 :                 "http://pg-ext-s3-gateway"
     135              :             }
     136            0 :         });
     137            0 : 
     138            0 :     let http_port = *matches
     139            0 :         .get_one::<u16>("http-port")
     140            0 :         .expect("http-port is required");
     141            0 :     let pgdata = matches
     142            0 :         .get_one::<String>("pgdata")
     143            0 :         .expect("PGDATA path is required");
     144            0 :     let connstr = matches
     145            0 :         .get_one::<String>("connstr")
     146            0 :         .expect("Postgres connection string is required");
     147            0 :     let spec_json = matches.get_one::<String>("spec");
     148            0 :     let spec_path = matches.get_one::<String>("spec-path");
     149            0 :     let resize_swap_on_bind = matches.get_flag("resize-swap-on-bind");
     150            0 : 
     151            0 :     Ok(ProcessCliResult {
     152            0 :         connstr,
     153            0 :         pgdata,
     154            0 :         pgbin,
     155            0 :         ext_remote_storage,
     156            0 :         http_port,
     157            0 :         spec_json,
     158            0 :         spec_path,
     159            0 :         resize_swap_on_bind,
     160            0 :     })
     161            0 : }
     162              : 
     163              : struct ProcessCliResult<'clap> {
     164              :     connstr: &'clap str,
     165              :     pgdata: &'clap str,
     166              :     pgbin: &'clap str,
     167              :     ext_remote_storage: Option<&'clap str>,
     168              :     http_port: u16,
     169              :     spec_json: Option<&'clap String>,
     170              :     spec_path: Option<&'clap String>,
     171              :     resize_swap_on_bind: bool,
     172              : }
     173              : 
     174            0 : fn startup_context_from_env() -> Option<opentelemetry::ContextGuard> {
     175            0 :     // Extract OpenTelemetry context for the startup actions from the
     176            0 :     // TRACEPARENT and TRACESTATE env variables, and attach it to the current
     177            0 :     // tracing context.
     178            0 :     //
     179            0 :     // This is used to propagate the context for the 'start_compute' operation
     180            0 :     // from the neon control plane. This allows linking together the wider
     181            0 :     // 'start_compute' operation that creates the compute container, with the
     182            0 :     // startup actions here within the container.
     183            0 :     //
     184            0 :     // There is no standard for passing context in env variables, but a lot of
     185            0 :     // tools use TRACEPARENT/TRACESTATE, so we use that convention too. See
     186            0 :     // https://github.com/open-telemetry/opentelemetry-specification/issues/740
     187            0 :     //
     188            0 :     // Switch to the startup context here, and exit it once the startup has
     189            0 :     // completed and Postgres is up and running.
     190            0 :     //
     191            0 :     // If this pod is pre-created without binding it to any particular endpoint
     192            0 :     // yet, this isn't the right place to enter the startup context. In that
     193            0 :     // case, the control plane should pass the tracing context as part of the
     194            0 :     // /configure API call.
     195            0 :     //
     196            0 :     // NOTE: This is supposed to only cover the *startup* actions. Once
     197            0 :     // postgres is configured and up-and-running, we exit this span. Any other
     198            0 :     // actions that are performed on incoming HTTP requests, for example, are
     199            0 :     // performed in separate spans.
     200            0 :     //
     201            0 :     // XXX: If the pod is restarted, we perform the startup actions in the same
     202            0 :     // context as the original startup actions, which probably doesn't make
     203            0 :     // sense.
     204            0 :     let mut startup_tracing_carrier: HashMap<String, String> = HashMap::new();
     205            0 :     if let Ok(val) = std::env::var("TRACEPARENT") {
     206            0 :         startup_tracing_carrier.insert("traceparent".to_string(), val);
     207            0 :     }
     208            0 :     if let Ok(val) = std::env::var("TRACESTATE") {
     209            0 :         startup_tracing_carrier.insert("tracestate".to_string(), val);
     210            0 :     }
     211            0 :     if !startup_tracing_carrier.is_empty() {
     212              :         use opentelemetry::propagation::TextMapPropagator;
     213              :         use opentelemetry::sdk::propagation::TraceContextPropagator;
     214            0 :         let guard = TraceContextPropagator::new()
     215            0 :             .extract(&startup_tracing_carrier)
     216            0 :             .attach();
     217            0 :         info!("startup tracing context attached");
     218            0 :         Some(guard)
     219              :     } else {
     220            0 :         None
     221              :     }
     222            0 : }
     223              : 
     224            0 : fn try_spec_from_cli(
     225            0 :     matches: &clap::ArgMatches,
     226            0 :     ProcessCliResult {
     227            0 :         spec_json,
     228            0 :         spec_path,
     229            0 :         ..
     230            0 :     }: &ProcessCliResult,
     231            0 : ) -> Result<CliSpecParams> {
     232            0 :     let compute_id = matches.get_one::<String>("compute-id");
     233            0 :     let control_plane_uri = matches.get_one::<String>("control-plane-uri");
     234            0 : 
     235            0 :     let spec;
     236            0 :     let mut live_config_allowed = false;
     237            0 :     match spec_json {
     238              :         // First, try to get cluster spec from the cli argument
     239            0 :         Some(json) => {
     240            0 :             info!("got spec from cli argument {}", json);
     241            0 :             spec = Some(serde_json::from_str(json)?);
     242              :         }
     243              :         None => {
     244              :             // Second, try to read it from the file if path is provided
     245            0 :             if let Some(sp) = spec_path {
     246            0 :                 let path = Path::new(sp);
     247            0 :                 let file = File::open(path)?;
     248            0 :                 spec = Some(serde_json::from_reader(file)?);
     249            0 :                 live_config_allowed = true;
     250            0 :             } else if let Some(id) = compute_id {
     251            0 :                 if let Some(cp_base) = control_plane_uri {
     252            0 :                     live_config_allowed = true;
     253            0 :                     spec = match get_spec_from_control_plane(cp_base, id) {
     254            0 :                         Ok(s) => s,
     255            0 :                         Err(e) => {
     256            0 :                             error!("cannot get response from control plane: {}", e);
     257            0 :                             panic!("neither spec nor confirmation that compute is in the Empty state was received");
     258              :                         }
     259              :                     };
     260              :                 } else {
     261            0 :                     panic!("must specify both --control-plane-uri and --compute-id or none");
     262              :                 }
     263              :             } else {
     264            0 :                 panic!(
     265            0 :                     "compute spec should be provided by one of the following ways: \
     266            0 :                     --spec OR --spec-path OR --control-plane-uri and --compute-id"
     267            0 :                 );
     268              :             }
     269              :         }
     270              :     };
     271              : 
     272            0 :     Ok(CliSpecParams {
     273            0 :         spec,
     274            0 :         live_config_allowed,
     275            0 :     })
     276            0 : }
     277              : 
     278              : struct CliSpecParams {
     279              :     /// The [`ComputeSpec`], if one was obtained via CLI, spec file, or control plane.
     280              :     spec: Option<ComputeSpec>,
     281              :     live_config_allowed: bool,
     282              : }
     283              : 
     284            0 : fn wait_spec(
     285            0 :     build_tag: String,
     286            0 :     ProcessCliResult {
     287            0 :         connstr,
     288            0 :         pgdata,
     289            0 :         pgbin,
     290            0 :         ext_remote_storage,
     291            0 :         resize_swap_on_bind,
     292            0 :         http_port,
     293            0 :         ..
     294            0 :     }: ProcessCliResult,
     295            0 :     CliSpecParams {
     296            0 :         spec,
     297            0 :         live_config_allowed,
     298            0 :     }: CliSpecParams,
     299            0 : ) -> Result<WaitSpecResult> {
     300            0 :     let mut new_state = ComputeState::new();
     301              :     let spec_set;
     302              : 
     303            0 :     if let Some(spec) = spec {
     304            0 :         let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?;
     305            0 :         info!("new pspec.spec: {:?}", pspec.spec);
     306            0 :         new_state.pspec = Some(pspec);
     307            0 :         spec_set = true;
     308            0 :     } else {
     309            0 :         spec_set = false;
     310            0 :     }
     311            0 :     let compute_node = ComputeNode {
     312            0 :         connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?,
     313            0 :         pgdata: pgdata.to_string(),
     314            0 :         pgbin: pgbin.to_string(),
     315            0 :         pgversion: get_pg_version(pgbin),
     316            0 :         live_config_allowed,
     317            0 :         state: Mutex::new(new_state),
     318            0 :         state_changed: Condvar::new(),
     319            0 :         ext_remote_storage: ext_remote_storage.map(|s| s.to_string()),
     320            0 :         ext_download_progress: RwLock::new(HashMap::new()),
     321            0 :         build_tag,
     322            0 :     };
     323            0 :     let compute = Arc::new(compute_node);
     324            0 : 
     325            0 :     // If this is a pooled VM, prewarm before starting HTTP server and becoming
     326            0 :     // available for binding. Prewarming helps Postgres start quicker later,
     327            0 :     // because QEMU will already have its memory allocated from the host, and
     328            0 :     // the necessary binaries will already be cached.
     329            0 :     if !spec_set {
     330            0 :         compute.prewarm_postgres()?;
     331            0 :     }
     332              : 
     333              :     // Launch http service first, so that we can serve control-plane requests
     334              :     // while configuration is still in progress.
     335            0 :     let _http_handle =
     336            0 :         launch_http_server(http_port, &compute).expect("cannot launch http endpoint thread");
     337            0 : 
     338            0 :     if !spec_set {
     339              :         // No spec provided, hang waiting for it.
     340            0 :         info!("no compute spec provided, waiting");
     341              : 
     342            0 :         let mut state = compute.state.lock().unwrap();
     343            0 :         while state.status != ComputeStatus::ConfigurationPending {
     344            0 :             state = compute.state_changed.wait(state).unwrap();
     345            0 : 
     346            0 :             if state.status == ComputeStatus::ConfigurationPending {
     347            0 :                 info!("got spec, continue configuration");
     348              :                 // Spec is already set by the http server handler.
     349            0 :                 break;
     350            0 :             }
     351              :         }
     352              : 
     353              :         // Record for how long we slept waiting for the spec.
     354            0 :         let now = Utc::now();
     355            0 :         state.metrics.wait_for_spec_ms = now
     356            0 :             .signed_duration_since(state.start_time)
     357            0 :             .to_std()
     358            0 :             .unwrap()
     359            0 :             .as_millis() as u64;
     360            0 : 
     361            0 :         // Reset start time, so that the total startup time that is calculated later will
     362            0 :         // not include the time that we waited for the spec.
     363            0 :         state.start_time = now;
     364            0 :     }
     365              : 
     366            0 :     Ok(WaitSpecResult {
     367            0 :         compute,
     368            0 :         http_port,
     369            0 :         resize_swap_on_bind,
     370            0 :     })
     371            0 : }
     372              : 
     373              : struct WaitSpecResult {
     374              :     compute: Arc<ComputeNode>,
     375              :     // passed through from ProcessCliResult
     376              :     http_port: u16,
     377              :     resize_swap_on_bind: bool,
     378              : }
     379              : 
     380            0 : fn start_postgres(
     381            0 :     // need to allow unused because `matches` is only used if target_os = "linux"
     382            0 :     #[allow(unused_variables)] matches: &clap::ArgMatches,
     383            0 :     WaitSpecResult {
     384            0 :         compute,
     385            0 :         http_port,
     386            0 :         resize_swap_on_bind,
     387            0 :     }: WaitSpecResult,
     388            0 : ) -> Result<(Option<PostgresHandle>, StartPostgresResult)> {
     389            0 :     // We got all we need, update the state.
     390            0 :     let mut state = compute.state.lock().unwrap();
     391            0 :     state.status = ComputeStatus::Init;
     392            0 :     compute.state_changed.notify_all();
     393            0 : 
     394            0 :     info!(
     395            0 :         "running compute with features: {:?}",
     396            0 :         state.pspec.as_ref().unwrap().spec.features
     397              :     );
     398              :     // before we release the mutex, fetch the swap size (if any) for later.
     399            0 :     let swap_size_bytes = state.pspec.as_ref().unwrap().spec.swap_size_bytes;
     400            0 :     drop(state);
     401            0 : 
     402            0 :     // Launch remaining service threads
     403            0 :     let _monitor_handle = launch_monitor(&compute);
     404            0 :     let _configurator_handle = launch_configurator(&compute);
     405            0 : 
     406            0 :     let mut prestartup_failed = false;
     407            0 :     let mut delay_exit = false;
     408              : 
     409              :     // Resize swap to the desired size if the compute spec says so
     410            0 :     if let (Some(size_bytes), true) = (swap_size_bytes, resize_swap_on_bind) {
     411              :         // To avoid 'swapoff' hitting postgres startup, we need to run resize-swap to completion
     412              :         // *before* starting postgres.
     413              :         //
     414              :         // In theory, we could do this asynchronously if SkipSwapon was enabled for VMs, but this
     415              :         // carries a risk of introducing hard-to-debug issues - e.g. if postgres sometimes gets
     416              :         // OOM-killed during startup because swap wasn't available yet.
     417            0 :         match resize_swap(size_bytes) {
     418              :             Ok(()) => {
     419            0 :                 let size_gib = size_bytes as f32 / (1 << 20) as f32; // just for more coherent display.
     420            0 :                 info!(%size_bytes, %size_gib, "resized swap");
     421              :             }
     422            0 :             Err(err) => {
     423            0 :                 let err = err.context("failed to resize swap");
     424            0 :                 error!("{err:#}");
     425              : 
     426              :                 // Mark compute startup as failed; don't try to start postgres, and report this
     427              :                 // error to the control plane when it next asks.
     428            0 :                 prestartup_failed = true;
     429            0 :                 let mut state = compute.state.lock().unwrap();
     430            0 :                 state.error = Some(format!("{err:?}"));
     431            0 :                 state.status = ComputeStatus::Failed;
     432            0 :                 compute.state_changed.notify_all();
     433            0 :                 delay_exit = true;
     434              :             }
     435              :         }
     436            0 :     }
     437              : 
     438            0 :     let extension_server_port: u16 = http_port;
     439            0 : 
     440            0 :     // Start Postgres
     441            0 :     let mut pg = None;
     442            0 :     if !prestartup_failed {
     443            0 :         pg = match compute.start_compute(extension_server_port) {
     444            0 :             Ok(pg) => Some(pg),
     445            0 :             Err(err) => {
     446            0 :                 error!("could not start the compute node: {:#}", err);
     447            0 :                 let mut state = compute.state.lock().unwrap();
     448            0 :                 state.error = Some(format!("{:?}", err));
     449            0 :                 state.status = ComputeStatus::Failed;
     450            0 :                 // Notify others that Postgres failed to start. In case of configuring the
     451            0 :                 // empty compute, it's likely that API handler is still waiting for compute
     452            0 :                 // state change. With this we will notify it that compute is in Failed state,
     453            0 :                 // so control plane will know about it earlier and record proper error instead
     454            0 :                 // of timeout.
     455            0 :                 compute.state_changed.notify_all();
     456            0 :                 drop(state); // unlock
     457            0 :                 delay_exit = true;
     458            0 :                 None
     459              :             }
     460              :         };
     461              :     } else {
     462            0 :         warn!("skipping postgres startup because pre-startup step failed");
     463              :     }
     464              : 
     465              :     // Start the vm-monitor if directed to. The vm-monitor only runs on linux
     466              :     // because it requires cgroups.
     467              :     cfg_if::cfg_if! {
     468              :         if #[cfg(target_os = "linux")] {
     469              :             use std::env;
     470              :             use tokio_util::sync::CancellationToken;
     471            0 :             let vm_monitor_addr = matches
     472            0 :                 .get_one::<String>("vm-monitor-addr")
     473            0 :                 .expect("--vm-monitor-addr should always be set because it has a default arg");
     474            0 :             let file_cache_connstr = matches.get_one::<String>("filecache-connstr");
     475            0 :             let cgroup = matches.get_one::<String>("cgroup");
     476              : 
     477              :             // Only make a runtime if we need to.
     478              :             // Note: it seems like you can make a runtime in an inner scope and
     479              :             // if you start a task in it it won't be dropped. However, make it
     480              :             // in the outermost scope just to be safe.
     481            0 :             let rt = if env::var_os("AUTOSCALING").is_some() {
     482            0 :                 Some(
     483            0 :                     tokio::runtime::Builder::new_multi_thread()
     484            0 :                         .worker_threads(4)
     485            0 :                         .enable_all()
     486            0 :                         .build()
     487            0 :                         .expect("failed to create tokio runtime for monitor")
     488            0 :                 )
     489              :             } else {
     490            0 :                 None
     491              :             };
     492              : 
     493              :             // This token is used internally by the monitor to clean up all threads
     494            0 :             let token = CancellationToken::new();
     495            0 : 
     496            0 :             let vm_monitor = rt.as_ref().map(|rt| {
     497            0 :                 rt.spawn(vm_monitor::start(
     498            0 :                     Box::leak(Box::new(vm_monitor::Args {
     499            0 :                         cgroup: cgroup.cloned(),
     500            0 :                         pgconnstr: file_cache_connstr.cloned(),
     501            0 :                         addr: vm_monitor_addr.clone(),
     502            0 :                     })),
     503            0 :                     token.clone(),
     504            0 :                 ))
     505            0 :             });
     506            0 :         }
     507            0 :     }
     508            0 : 
     509            0 :     Ok((
     510            0 :         pg,
     511            0 :         StartPostgresResult {
     512            0 :             delay_exit,
     513            0 :             compute,
     514            0 :             #[cfg(target_os = "linux")]
     515            0 :             rt,
     516            0 :             #[cfg(target_os = "linux")]
     517            0 :             token,
     518            0 :             #[cfg(target_os = "linux")]
     519            0 :             vm_monitor,
     520            0 :         },
     521            0 :     ))
     522            0 : }
     523              : 
     524              : type PostgresHandle = (std::process::Child, std::thread::JoinHandle<()>);
     525              : 
     526              : struct StartPostgresResult {
     527              :     delay_exit: bool,
     528              :     // passed through from WaitSpecResult
     529              :     compute: Arc<ComputeNode>,
     530              : 
     531              :     #[cfg(target_os = "linux")]
     532              :     rt: Option<tokio::runtime::Runtime>,
     533              :     #[cfg(target_os = "linux")]
     534              :     token: tokio_util::sync::CancellationToken,
     535              :     #[cfg(target_os = "linux")]
     536              :     vm_monitor: Option<tokio::task::JoinHandle<Result<()>>>,
     537              : }
     538              : 
     539            0 : fn wait_postgres(pg: Option<PostgresHandle>) -> Result<WaitPostgresResult> {
     540            0 :     // Wait for the child Postgres process forever. In this state Ctrl+C will
     541            0 :     // propagate to Postgres and it will be shut down as well.
     542            0 :     let mut exit_code = None;
     543            0 :     if let Some((mut pg, logs_handle)) = pg {
     544            0 :         let ecode = pg
     545            0 :             .wait()
     546            0 :             .expect("failed to start waiting on Postgres process");
     547            0 :         PG_PID.store(0, Ordering::SeqCst);
     548            0 : 
     549            0 :         // Process has exited, so we can join the logs thread.
     550            0 :         let _ = logs_handle
     551            0 :             .join()
     552            0 :             .map_err(|e| tracing::error!("log thread panicked: {:?}", e));
     553            0 : 
     554            0 :         info!("Postgres exited with code {}, shutting down", ecode);
     555            0 :         exit_code = ecode.code()
     556            0 :     }
     557              : 
     558            0 :     Ok(WaitPostgresResult { exit_code })
     559            0 : }
     560              : 
     561              : struct WaitPostgresResult {
     562              :     exit_code: Option<i32>,
     563              : }
     564              : 
     565            0 : fn cleanup_after_postgres_exit(
     566            0 :     StartPostgresResult {
     567            0 :         mut delay_exit,
     568            0 :         compute,
     569            0 :         #[cfg(target_os = "linux")]
     570            0 :         vm_monitor,
     571            0 :         #[cfg(target_os = "linux")]
     572            0 :         token,
     573            0 :         #[cfg(target_os = "linux")]
     574            0 :         rt,
     575            0 :     }: StartPostgresResult,
     576            0 : ) -> Result<bool> {
     577              :     // Terminate the vm_monitor so it releases the file watcher on
     578              :     // /sys/fs/cgroup/neon-postgres.
     579              :     // Note: the vm-monitor only runs on linux because it requires cgroups.
     580              :     cfg_if::cfg_if! {
     581              :         if #[cfg(target_os = "linux")] {
     582            0 :             if let Some(handle) = vm_monitor {
     583            0 :                 // Kills all threads spawned by the monitor
     584            0 :                 token.cancel();
     585            0 :                 // Kills the actual task running the monitor
     586            0 :                 handle.abort();
     587            0 : 
     588            0 :                 // If handle is some, rt must have been used to produce it, and
     589            0 :                 // hence is also some
     590            0 :                 rt.unwrap().shutdown_timeout(Duration::from_secs(2));
     591            0 :             }
     592              :         }
     593              :     }
     594              : 
     595              :     // Maybe sync safekeepers again, to speed up next startup
     596            0 :     let compute_state = compute.state.lock().unwrap().clone();
     597            0 :     let pspec = compute_state.pspec.as_ref().expect("spec must be set");
     598            0 :     if matches!(pspec.spec.mode, compute_api::spec::ComputeMode::Primary) {
     599            0 :         info!("syncing safekeepers on shutdown");
     600            0 :         let storage_auth_token = pspec.storage_auth_token.clone();
     601            0 :         let lsn = compute.sync_safekeepers(storage_auth_token)?;
     602            0 :         info!("synced safekeepers at lsn {lsn}");
     603            0 :     }
     604              : 
     605            0 :     let mut state = compute.state.lock().unwrap();
     606            0 :     if state.status == ComputeStatus::TerminationPending {
     607            0 :         state.status = ComputeStatus::Terminated;
     608            0 :         compute.state_changed.notify_all();
     609            0 :         // we were asked to terminate gracefully, don't exit to avoid restart
     610            0 :         delay_exit = true
     611            0 :     }
     612            0 :     drop(state);
     613              : 
     614            0 :     if let Err(err) = compute.check_for_core_dumps() {
     615            0 :         error!("error while checking for core dumps: {err:?}");
     616            0 :     }
     617              : 
     618            0 :     Ok(delay_exit)
     619            0 : }
     620              : 
     621            0 : fn maybe_delay_exit(delay_exit: bool) {
     622            0 :     // If launch failed, keep serving HTTP requests for a while, so the cloud
     623            0 :     // control plane can get the actual error.
     624            0 :     if delay_exit {
     625            0 :         info!("giving control plane 30s to collect the error before shutdown");
     626            0 :         thread::sleep(Duration::from_secs(30));
     627            0 :     }
     628            0 : }
     629              : 
     630            0 : fn deinit_and_exit(WaitPostgresResult { exit_code }: WaitPostgresResult) -> ! {
     631            0 :     // Shutdown trace pipeline gracefully, so that it has a chance to send any
     632            0 :     // pending traces before we exit. Shutting down OTEL tracing provider may
     633            0 :     // hang for quite some time, see, for example:
     634            0 :     // - https://github.com/open-telemetry/opentelemetry-rust/issues/868
     635            0 :     // - and our problems with staging https://github.com/neondatabase/cloud/issues/3707#issuecomment-1493983636
     636            0 :     //
     637            0 :     // Yet, we want computes to shut down fast enough, as we may need a new one
     638            0 :     // for the same timeline ASAP. So wait no longer than 2s for the shutdown to
     639            0 :     // complete, then just error out and exit the main thread.
     640            0 :     info!("shutting down tracing");
     641            0 :     let (sender, receiver) = mpsc::channel();
     642            0 :     let _ = thread::spawn(move || {
     643            0 :         tracing_utils::shutdown_tracing();
     644            0 :         sender.send(()).ok()
     645            0 :     });
     646            0 :     let shutdown_res = receiver.recv_timeout(Duration::from_millis(2000));
     647            0 :     if shutdown_res.is_err() {
     648            0 :         error!("timed out while shutting down tracing, exiting anyway");
     649            0 :     }
     650              : 
     651            0 :     info!("shutting down");
     652            0 :     exit(exit_code.unwrap_or(1))
     653              : }
     654              : 
     655            2 : fn cli() -> clap::Command {
     656            2 :     // Env variable is set by `cargo`
     657            2 :     let version = option_env!("CARGO_PKG_VERSION").unwrap_or("unknown");
     658            2 :     clap::Command::new("compute_ctl")
     659            2 :         .version(version)
     660            2 :         .arg(
     661            2 :             Arg::new("http-port")
     662            2 :                 .long("http-port")
     663            2 :                 .value_name("HTTP_PORT")
     664            2 :                 .default_value("3080")
     665            2 :                 .value_parser(clap::value_parser!(u16))
     666            2 :                 .required(false),
     667            2 :         )
     668            2 :         .arg(
     669            2 :             Arg::new("connstr")
     670            2 :                 .short('C')
     671            2 :                 .long("connstr")
     672            2 :                 .value_name("DATABASE_URL")
     673            2 :                 .required(true),
     674            2 :         )
     675            2 :         .arg(
     676            2 :             Arg::new("pgdata")
     677            2 :                 .short('D')
     678            2 :                 .long("pgdata")
     679            2 :                 .value_name("DATADIR")
     680            2 :                 .required(true),
     681            2 :         )
     682            2 :         .arg(
     683            2 :             Arg::new("pgbin")
     684            2 :                 .short('b')
     685            2 :                 .long("pgbin")
     686            2 :                 .default_value("postgres")
     687            2 :                 .value_name("POSTGRES_PATH"),
     688            2 :         )
     689            2 :         .arg(
     690            2 :             Arg::new("spec")
     691            2 :                 .short('s')
     692            2 :                 .long("spec")
     693            2 :                 .value_name("SPEC_JSON"),
     694            2 :         )
     695            2 :         .arg(
     696            2 :             Arg::new("spec-path")
     697            2 :                 .short('S')
     698            2 :                 .long("spec-path")
     699            2 :                 .value_name("SPEC_PATH"),
     700            2 :         )
     701            2 :         .arg(
     702            2 :             Arg::new("compute-id")
     703            2 :                 .short('i')
     704            2 :                 .long("compute-id")
     705            2 :                 .value_name("COMPUTE_ID"),
     706            2 :         )
     707            2 :         .arg(
     708            2 :             Arg::new("control-plane-uri")
     709            2 :                 .short('p')
     710            2 :                 .long("control-plane-uri")
     711            2 :                 .value_name("CONTROL_PLANE_API_BASE_URI"),
     712            2 :         )
     713            2 :         .arg(
     714            2 :             Arg::new("remote-ext-config")
     715            2 :                 .short('r')
     716            2 :                 .long("remote-ext-config")
     717            2 :                 .value_name("REMOTE_EXT_CONFIG"),
     718            2 :         )
     719            2 :         // TODO(fprasx): we currently have default arguments because the cloud PR
     720            2 :         // to pass them in hasn't been merged yet. We should get rid of them once
     721            2 :         // the PR is merged.
     722            2 :         .arg(
     723            2 :             Arg::new("vm-monitor-addr")
     724            2 :                 .long("vm-monitor-addr")
     725            2 :                 .default_value("0.0.0.0:10301")
     726            2 :                 .value_name("VM_MONITOR_ADDR"),
     727            2 :         )
     728            2 :         .arg(
     729            2 :             Arg::new("cgroup")
     730            2 :                 .long("cgroup")
     731            2 :                 .default_value("neon-postgres")
     732            2 :                 .value_name("CGROUP"),
     733            2 :         )
     734            2 :         .arg(
     735            2 :             Arg::new("filecache-connstr")
     736            2 :                 .long("filecache-connstr")
     737            2 :                 .default_value(
     738            2 :                     "host=localhost port=5432 dbname=postgres user=cloud_admin sslmode=disable application_name=vm-monitor",
     739            2 :                 )
     740            2 :                 .value_name("FILECACHE_CONNSTR"),
     741            2 :         )
     742            2 :         .arg(
     743            2 :             Arg::new("resize-swap-on-bind")
     744            2 :                 .long("resize-swap-on-bind")
     745            2 :                 .action(clap::ArgAction::SetTrue),
     746            2 :         )
     747            2 : }
     748              : 
     749              : /// When compute_ctl is killed, send also termination signal to sync-safekeepers
     750              : /// to prevent leakage. TODO: it is better to convert compute_ctl to async and
     751              : /// wait for termination which would be easy then.
     752            0 : fn handle_exit_signal(sig: i32) {
     753            0 :     info!("received {sig} termination signal");
     754            0 :     forward_termination_signal();
     755            0 :     exit(1);
     756              : }
     757              : 
     758              : #[test]
     759            2 : fn verify_cli() {
     760            2 :     cli().debug_assert()
     761            2 : }
        

Generated by: LCOV version 2.1-beta