       1              : //!
       2              : //! Postgres wrapper (`compute_ctl`) is intended to be run as a Docker entrypoint or as a `systemd`
       3              : //! `ExecStart` option. It will handle all the `Neon` specifics during compute node
       4              : //! initialization:
       5              : //! - `compute_ctl` accepts cluster (compute node) specification as a JSON file.
       6              : //! - Every start is a fresh start, so the data directory is removed and
       7              : //!   initialized again on each run.
//! - If `--remote-ext-config` is provided, it will be used to fetch the list of
//!   extensions and to download `shared_preload_libraries` from the remote storage.
      10              : //! - Next it will put configuration files into the `PGDATA` directory.
      11              : //! - Sync safekeepers and get commit LSN.
//! - Get `basebackup` from pageserver using the LSN returned by the previous step.
      13              : //! - Try to start `postgres` and wait until it is ready to accept connections.
      14              : //! - Check and alter/drop/create roles and databases.
      15              : //! - Hang waiting on the `postmaster` process to exit.
      16              : //!
      17              : //! Also `compute_ctl` spawns two separate service threads:
      18              : //! - `compute-monitor` checks the last Postgres activity timestamp and saves it
      19              : //!   into the shared `ComputeNode`;
      20              : //! - `http-endpoint` runs a Hyper HTTP API server, which serves readiness and the
      21              : //!   last activity requests.
      22              : //!
      23              : //! If `AUTOSCALING` environment variable is set, `compute_ctl` will start the
      24              : //! `vm-monitor` located in [`neon/libs/vm_monitor`]. For VM compute nodes,
      25              : //! `vm-monitor` communicates with the VM autoscaling system. It coordinates
      26              : //! downscaling and requests immediate upscaling under resource pressure.
      27              : //!
      28              : //! Usage example:
      29              : //! ```sh
      30              : //! compute_ctl -D /var/db/postgres/compute \
      31              : //!             -C 'postgresql://cloud_admin@localhost/postgres' \
      32              : //!             -S /var/db/postgres/specs/current.json \
      33              : //!             -b /usr/local/bin/postgres \
//!             -r http://pg-ext-s3-gateway \
//!             -i <compute-id>
      35              : //! ```
      36              : use std::ffi::OsString;
      37              : use std::fs::File;
      38              : use std::path::Path;
      39              : use std::process::exit;
      40              : use std::sync::mpsc;
      41              : use std::thread;
      42              : use std::time::Duration;
      43              : 
      44              : use anyhow::{Context, Result};
      45              : use clap::Parser;
      46              : use compute_api::responses::ComputeCtlConfig;
      47              : use compute_api::spec::ComputeSpec;
      48              : use compute_tools::compute::{ComputeNode, ComputeNodeParams, forward_termination_signal};
      49              : use compute_tools::extension_server::get_pg_version_string;
      50              : use compute_tools::logger::*;
      51              : use compute_tools::params::*;
      52              : use compute_tools::spec::*;
      53              : use rlimit::{Resource, setrlimit};
      54              : use signal_hook::consts::{SIGINT, SIGQUIT, SIGTERM};
      55              : use signal_hook::iterator::Signals;
      56              : use tracing::{error, info};
      57              : use url::Url;
      58              : use utils::failpoint_support;
      59              : 
// Arbitrary build tag used when the `BUILD_TAG` environment variable was not
// set at compile time. Fine as a default and for testing purposes.
      62              : const BUILD_TAG_DEFAULT: &str = "latest";
      63              : 
      64              : // Compatibility hack: if the control plane specified any remote-ext-config
      65              : // use the default value for extension storage proxy gateway.
      66              : // Remove this once the control plane is updated to pass the gateway URL
      67            0 : fn parse_remote_ext_config(arg: &str) -> Result<String> {
      68            0 :     if arg.starts_with("http") {
      69            0 :         Ok(arg.trim_end_matches('/').to_string())
      70              :     } else {
      71            0 :         Ok("http://pg-ext-s3-gateway".to_string())
      72              :     }
      73            0 : }
      74              : 
      75              : #[derive(Parser)]
      76              : #[command(rename_all = "kebab-case")]
      77              : struct Cli {
      78              :     #[arg(short = 'b', long, default_value = "postgres", env = "POSTGRES_PATH")]
      79            0 :     pub pgbin: String,
      80              : 
      81              :     #[arg(short = 'r', long, value_parser = parse_remote_ext_config)]
      82              :     pub remote_ext_config: Option<String>,
      83              : 
      84              :     /// The port to bind the external listening HTTP server to. Clients running
      85              :     /// outside the compute will talk to the compute through this port. Keep
      86              :     /// the previous name for this argument around for a smoother release
      87              :     /// with the control plane.
      88            1 :     #[arg(long, default_value_t = 3080)]
      89            0 :     pub external_http_port: u16,
      90              : 
      91              :     /// The port to bind the internal listening HTTP server to. Clients include
      92              :     /// the neon extension (for installing remote extensions) and local_proxy.
      93            1 :     #[arg(long, default_value_t = 3081)]
      94            0 :     pub internal_http_port: u16,
      95              : 
      96              :     #[arg(short = 'D', long, value_name = "DATADIR")]
      97            0 :     pub pgdata: String,
      98              : 
      99              :     #[arg(short = 'C', long, value_name = "DATABASE_URL")]
     100            0 :     pub connstr: String,
     101              : 
     102              :     #[cfg(target_os = "linux")]
     103              :     #[arg(long, default_value = "neon-postgres")]
     104            0 :     pub cgroup: String,
     105              : 
     106              :     #[cfg(target_os = "linux")]
     107              :     #[arg(
     108              :         long,
     109              :         default_value = "host=localhost port=5432 dbname=postgres user=cloud_admin sslmode=disable application_name=vm-monitor"
     110              :     )]
     111            0 :     pub filecache_connstr: String,
     112              : 
     113              :     #[cfg(target_os = "linux")]
     114              :     #[arg(long, default_value = "")]
     115            0 :     pub vm_monitor_addr: String,
     116              : 
     117              :     #[arg(long, action = clap::ArgAction::SetTrue)]
     118            0 :     pub resize_swap_on_bind: bool,
     119              : 
     120              :     #[arg(long)]
     121              :     pub set_disk_quota_for_fs: Option<String>,
     122              : 
     123              :     #[arg(short = 's', long = "spec", group = "spec")]
     124              :     pub spec_json: Option<String>,
     125              : 
     126              :     #[arg(short = 'S', long, group = "spec-path")]
     127              :     pub spec_path: Option<OsString>,
     128              : 
     129              :     #[arg(short = 'i', long, group = "compute-id")]
     130            0 :     pub compute_id: String,
     131              : 
     132              :     #[arg(short = 'p', long, conflicts_with_all = ["spec", "spec-path"], value_name = "CONTROL_PLANE_API_BASE_URL")]
     133              :     pub control_plane_uri: Option<String>,
     134              : }
     135              : 
     136            0 : fn main() -> Result<()> {
     137            0 :     let cli = Cli::parse();
     138            0 : 
     139            0 :     let scenario = failpoint_support::init();
     140              : 
     141              :     // For historical reasons, the main thread that processes the spec and launches postgres
     142              :     // is synchronous, but we always have this tokio runtime available and we "enter" it so
     143              :     // that you can use tokio::spawn() and tokio::runtime::Handle::current().block_on(...)
     144              :     // from all parts of compute_ctl.
     145            0 :     let runtime = tokio::runtime::Builder::new_multi_thread()
     146            0 :         .enable_all()
     147            0 :         .build()?;
     148            0 :     let _rt_guard = runtime.enter();
     149              : 
     150            0 :     let build_tag = runtime.block_on(init())?;
     151              : 
     152              :     // enable core dumping for all child processes
     153            0 :     setrlimit(Resource::CORE, rlimit::INFINITY, rlimit::INFINITY)?;
     154              : 
     155            0 :     let connstr = Url::parse(&cli.connstr).context("cannot parse connstr as a URL")?;
     156              : 
     157            0 :     let cli_spec = try_spec_from_cli(&cli)?;
     158              : 
     159            0 :     let compute_node = ComputeNode::new(
     160            0 :         ComputeNodeParams {
     161            0 :             compute_id: cli.compute_id,
     162            0 :             connstr,
     163            0 :             pgdata: cli.pgdata.clone(),
     164            0 :             pgbin: cli.pgbin.clone(),
     165            0 :             pgversion: get_pg_version_string(&cli.pgbin),
     166            0 :             external_http_port: cli.external_http_port,
     167            0 :             internal_http_port: cli.internal_http_port,
     168            0 :             ext_remote_storage: cli.remote_ext_config.clone(),
     169            0 :             resize_swap_on_bind: cli.resize_swap_on_bind,
     170            0 :             set_disk_quota_for_fs: cli.set_disk_quota_for_fs,
     171            0 :             #[cfg(target_os = "linux")]
     172            0 :             filecache_connstr: cli.filecache_connstr,
     173            0 :             #[cfg(target_os = "linux")]
     174            0 :             cgroup: cli.cgroup,
     175            0 :             #[cfg(target_os = "linux")]
     176            0 :             vm_monitor_addr: cli.vm_monitor_addr,
     177            0 :             build_tag,
     178            0 : 
     179            0 :             live_config_allowed: cli_spec.live_config_allowed,
     180            0 :         },
     181            0 :         cli_spec.spec,
     182            0 :         cli_spec.compute_ctl_config,
     183            0 :     )?;
     184              : 
     185            0 :     let exit_code =;
     186              : 
     187            0 :     scenario.teardown();
     188            0 : 
     189            0 :     deinit_and_exit(exit_code);
     190            0 : }
     191              : 
     192            0 : async fn init() -> Result<String> {
     193            0 :     init_tracing_and_logging(DEFAULT_LOG_LEVEL).await?;
     194              : 
     195            0 :     let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?;
     196            0 :     thread::spawn(move || {
     197            0 :         for sig in signals.forever() {
     198            0 :             handle_exit_signal(sig);
     199            0 :         }
     200            0 :     });
     201            0 : 
     202            0 :     let build_tag = option_env!("BUILD_TAG")
     203            0 :         .unwrap_or(BUILD_TAG_DEFAULT)
     204            0 :         .to_string();
     205            0 :     info!("build_tag: {build_tag}");
     206              : 
     207            0 :     Ok(build_tag)
     208            0 : }
     209              : 
     210            0 : fn try_spec_from_cli(cli: &Cli) -> Result<CliSpecParams> {
     211              :     // First, try to get cluster spec from the cli argument
     212            0 :     if let Some(ref spec_json) = cli.spec_json {
     213            0 :         info!("got spec from cli argument {}", spec_json);
     214              :         return Ok(CliSpecParams {
     215            0 :             spec: Some(serde_json::from_str(spec_json)?),
     216            0 :             compute_ctl_config: ComputeCtlConfig::default(),
     217              :             live_config_allowed: false,
     218              :         });
     219            0 :     }
     220              : 
     221              :     // Second, try to read it from the file if path is provided
     222            0 :     if let Some(ref spec_path) = cli.spec_path {
     223            0 :         let file = File::open(Path::new(spec_path))?;
     224              :         return Ok(CliSpecParams {
     225            0 :             spec: Some(serde_json::from_reader(file)?),
     226            0 :             compute_ctl_config: ComputeCtlConfig::default(),
     227              :             live_config_allowed: true,
     228              :         });
     229            0 :     }
     230            0 : 
     231            0 :     if cli.control_plane_uri.is_none() {
     232            0 :         panic!("must specify --control-plane-uri");
     233            0 :     };
     234            0 : 
     235            0 :     match get_spec_from_control_plane(cli.control_plane_uri.as_ref().unwrap(), &cli.compute_id) {
     236            0 :         Ok(resp) => Ok(CliSpecParams {
     237            0 :             spec: resp.0,
     238            0 :             compute_ctl_config: resp.1,
     239            0 :             live_config_allowed: true,
     240            0 :         }),
     241            0 :         Err(e) => {
     242            0 :             error!(
     243            0 :                 "cannot get response from control plane: {}\n\
     244            0 :                 neither spec nor confirmation that compute is in the Empty state was received",
     245              :                 e
     246              :             );
     247            0 :             Err(e)
     248              :         }
     249              :     }
     250            0 : }
     251              : 
     252              : struct CliSpecParams {
     253              :     /// If a spec was provided via CLI or file, the [`ComputeSpec`]
     254              :     spec: Option<ComputeSpec>,
     255              :     #[allow(dead_code)]
     256              :     compute_ctl_config: ComputeCtlConfig,
     257              :     live_config_allowed: bool,
     258              : }
     259              : 
     260            0 : fn deinit_and_exit(exit_code: Option<i32>) -> ! {
     261            0 :     // Shutdown trace pipeline gracefully, so that it has a chance to send any
     262            0 :     // pending traces before we exit. Shutting down OTEL tracing provider may
     263            0 :     // hang for quite some time, see, for example:
     264            0 :     // -
     265            0 :     // - and our problems with staging
     266            0 :     //
     267            0 :     // Yet, we want computes to shut down fast enough, as we may need a new one
     268            0 :     // for the same timeline ASAP. So wait no longer than 2s for the shutdown to
     269            0 :     // complete, then just error out and exit the main thread.
     270            0 :     info!("shutting down tracing");
     271            0 :     let (sender, receiver) = mpsc::channel();
     272            0 :     let _ = thread::spawn(move || {
     273            0 :         tracing_utils::shutdown_tracing();
     274            0 :         sender.send(()).ok()
     275            0 :     });
     276            0 :     let shutdown_res = receiver.recv_timeout(Duration::from_millis(2000));
     277            0 :     if shutdown_res.is_err() {
     278            0 :         error!("timed out while shutting down tracing, exiting anyway");
     279            0 :     }
     280              : 
     281            0 :     info!("shutting down");
     282            0 :     exit(exit_code.unwrap_or(1))
     283              : }
     284              : 
     285              : /// When compute_ctl is killed, send also termination signal to sync-safekeepers
     286              : /// to prevent leakage. TODO: it is better to convert compute_ctl to async and
     287              : /// wait for termination which would be easy then.
     288            0 : fn handle_exit_signal(sig: i32) {
     289            0 :     info!("received {sig} termination signal");
     290            0 :     forward_termination_signal();
     291            0 :     exit(1);
     292              : }
     293              : 
     294              : #[cfg(test)]
     295              : mod test {
     296              :     use clap::CommandFactory;
     297              : 
     298              :     use super::Cli;
     299              : 
     300              :     #[test]
     301            1 :     fn verify_cli() {
     302            1 :         Cli::command().debug_assert()
     303            1 :     }
     304              : }

