LCOV - code coverage report
Current view: top level - compute_tools/src/bin - compute_ctl.rs (source / functions) Coverage Total Hit
Test: 553e39c2773e5840c720c90d86e56f89a4330d43.info Lines: 33.5 % 158 53
Test Date: 2025-06-13 20:01:21 Functions: 13.5 % 37 5

            Line data    Source code
       1              : //!
       2              : //! Postgres wrapper (`compute_ctl`) is intended to be run as a Docker entrypoint or as a `systemd`
       3              : //! `ExecStart` option. It will handle all the `Neon` specifics during compute node
       4              : //! initialization:
       5              : //! - `compute_ctl` accepts cluster (compute node) specification as a JSON file.
       6              : //! - Every start is a fresh start, so the data directory is removed and
       7              : //!   initialized again on each run.
       8              : //! - If remote_extension_config is provided, it will be used to fetch extensions list
       9              : //!   and download `shared_preload_libraries` from the remote storage.
      10              : //! - Next it will put configuration files into the `PGDATA` directory.
      11              : //! - Sync safekeepers and get commit LSN.
      12              : //! - Get `basebackup` from pageserver using the LSN returned by the previous step.
      13              : //! - Try to start `postgres` and wait until it is ready to accept connections.
      14              : //! - Check and alter/drop/create roles and databases.
      15              : //! - Hang waiting on the `postmaster` process to exit.
      16              : //!
      17              : //! `compute_ctl` also spawns two separate service threads:
      18              : //! - `compute-monitor` checks the last Postgres activity timestamp and saves it
      19              : //!   into the shared `ComputeNode`;
      20              : //! - `http-endpoint` runs a Hyper HTTP API server, which serves readiness and the
      21              : //!   last activity requests.
      22              : //!
      23              : //! If the `AUTOSCALING` environment variable is set, `compute_ctl` will start the
      24              : //! `vm-monitor` located in [`neon/libs/vm_monitor`]. For VM compute nodes,
      25              : //! `vm-monitor` communicates with the VM autoscaling system. It coordinates
      26              : //! downscaling and requests immediate upscaling under resource pressure.
      27              : //!
      28              : //! Usage example:
      29              : //! ```sh
      30              : //! compute_ctl -D /var/db/postgres/compute \
      31              : //!             -C 'postgresql://cloud_admin@localhost/postgres' \
      32              : //!             -c /var/db/postgres/configs/config.json \
      33              : //!             -b /usr/local/bin/postgres \
      34              : //!             -r http://pg-ext-s3-gateway
      35              : //! ```
      36              : use std::ffi::OsString;
      37              : use std::fs::File;
      38              : use std::process::exit;
      39              : use std::sync::mpsc;
      40              : use std::thread;
      41              : use std::time::Duration;
      42              : 
      43              : use anyhow::{Context, Result, bail};
      44              : use clap::Parser;
      45              : use compute_api::responses::ComputeConfig;
      46              : use compute_tools::compute::{
      47              :     BUILD_TAG, ComputeNode, ComputeNodeParams, forward_termination_signal,
      48              : };
      49              : use compute_tools::extension_server::get_pg_version_string;
      50              : use compute_tools::logger::*;
      51              : use compute_tools::params::*;
      52              : use compute_tools::spec::*;
      53              : use rlimit::{Resource, setrlimit};
      54              : use signal_hook::consts::{SIGINT, SIGQUIT, SIGTERM};
      55              : use signal_hook::iterator::Signals;
      56              : use tracing::{error, info};
      57              : use url::Url;
      58              : use utils::failpoint_support;
      59              : 
      60              : #[derive(Debug, Parser)]
      61              : #[command(rename_all = "kebab-case")]
      62              : struct Cli {
      63              :     #[arg(short = 'b', long, default_value = "postgres", env = "POSTGRES_PATH")]
      64            0 :     pub pgbin: String,
      65              : 
      66              :     /// The base URL for the remote extension storage proxy gateway.
      67              :     #[arg(short = 'r', long, value_parser = Self::parse_remote_ext_base_url)]
      68              :     pub remote_ext_base_url: Option<Url>,
      69              : 
      70              :     /// The port to bind the external listening HTTP server to. Clients running
      71              :     /// outside the compute will talk to the compute through this port. Keep
      72              :     /// the previous name for this argument around for a smoother release
      73              :     /// with the control plane.
      74            2 :     #[arg(long, default_value_t = 3080)]
      75            0 :     pub external_http_port: u16,
      76              : 
      77              :     /// The port to bind the internal listening HTTP server to. Clients include
      78              :     /// the neon extension (for installing remote extensions) and local_proxy.
      79            2 :     #[arg(long, default_value_t = 3081)]
      80            0 :     pub internal_http_port: u16,
      81              : 
      82              :     #[arg(short = 'D', long, value_name = "DATADIR")]
      83            0 :     pub pgdata: String,
      84              : 
      85              :     #[arg(short = 'C', long, value_name = "DATABASE_URL")]
      86            0 :     pub connstr: String,
      87              : 
      88              :     #[cfg(target_os = "linux")]
      89              :     #[arg(long, default_value = "neon-postgres")]
      90            0 :     pub cgroup: String,
      91              : 
      92              :     #[cfg(target_os = "linux")]
      93              :     #[arg(
      94              :         long,
      95              :         default_value = "host=localhost port=5432 dbname=postgres user=cloud_admin sslmode=disable application_name=vm-monitor"
      96              :     )]
      97            0 :     pub filecache_connstr: String,
      98              : 
      99              :     #[cfg(target_os = "linux")]
     100              :     #[arg(long, default_value = "0.0.0.0:10301")]
     101            0 :     pub vm_monitor_addr: String,
     102              : 
     103              :     #[arg(long, action = clap::ArgAction::SetTrue)]
     104            0 :     pub resize_swap_on_bind: bool,
     105              : 
     106              :     #[arg(long)]
     107              :     pub set_disk_quota_for_fs: Option<String>,
     108              : 
     109              :     #[arg(short = 'c', long)]
     110              :     pub config: Option<OsString>,
     111              : 
     112              :     #[arg(short = 'i', long, group = "compute-id")]
     113            0 :     pub compute_id: String,
     114              : 
     115              :     #[arg(
     116              :         short = 'p',
     117              :         long,
     118              :         conflicts_with = "config",
     119              :         value_name = "CONTROL_PLANE_API_BASE_URL",
     120              :         requires = "compute-id"
     121              :     )]
     122              :     pub control_plane_uri: Option<String>,
     123              : 
     124              :     /// Interval in seconds for collecting installed extensions statistics
     125              :     #[arg(long, default_value = "3600")]
     126            0 :     pub installed_extensions_collection_interval: u64,
     127              : }
     128              : 
     129              : impl Cli {
     130              :     /// Parse a URL from an argument. By default, this isn't necessary, but we
     131              :     /// want to do some sanity checking.
     132            3 :     fn parse_remote_ext_base_url(value: &str) -> Result<Url> {
     133            3 :         // Normalize to exactly one trailing slash. We use Url::join() later when
     134            3 :         // downloading remote extensions, and without the slash join() replaces the
     135            3 :         // last path segment: http://example.com/pg-ext-s3-gateway joined with
     136            3 :         // something like "xyz" would yield http://example.com/xyz.
     137            3 :         let value = value.trim_end_matches('/').to_owned() + "/";
     138            3 :         let url = Url::parse(&value)?;
     139              : 
     140            3 :         if url.query_pairs().count() != 0 {
     141            1 :             bail!("parameters detected in remote extensions base URL")
     142            2 :         }
     143            2 : 
     144            2 :         Ok(url)
     145            3 :     }
     146              : }
     147              : 
     148            0 : fn main() -> Result<()> {
     149            0 :     let cli = Cli::parse();
     150            0 : 
     151            0 :     let scenario = failpoint_support::init();
     152              : 
     153              :     // For historical reasons, the main thread that processes the config and launches postgres
     154              :     // is synchronous, but we always have this tokio runtime available and we "enter" it so
     155              :     // that you can use tokio::spawn() and tokio::runtime::Handle::current().block_on(...)
     156              :     // from all parts of compute_ctl.
     157            0 :     let runtime = tokio::runtime::Builder::new_multi_thread()
     158            0 :         .enable_all()
     159            0 :         .build()?;
     160            0 :     let _rt_guard = runtime.enter();
     161            0 : 
     162            0 :     runtime.block_on(init())?;
     163              : 
     164              :     // enable core dumping for all child processes
     165            0 :     setrlimit(Resource::CORE, rlimit::INFINITY, rlimit::INFINITY)?;
     166              : 
     167            0 :     let connstr = Url::parse(&cli.connstr).context("cannot parse connstr as a URL")?;
     168              : 
     169            0 :     let config = get_config(&cli)?;
     170              : 
     171            0 :     let compute_node = ComputeNode::new(
     172            0 :         ComputeNodeParams {
     173            0 :             compute_id: cli.compute_id,
     174            0 :             connstr,
     175            0 :             pgdata: cli.pgdata.clone(),
     176            0 :             pgbin: cli.pgbin.clone(),
     177            0 :             pgversion: get_pg_version_string(&cli.pgbin),
     178            0 :             external_http_port: cli.external_http_port,
     179            0 :             internal_http_port: cli.internal_http_port,
     180            0 :             remote_ext_base_url: cli.remote_ext_base_url.clone(),
     181            0 :             resize_swap_on_bind: cli.resize_swap_on_bind,
     182            0 :             set_disk_quota_for_fs: cli.set_disk_quota_for_fs,
     183            0 :             #[cfg(target_os = "linux")]
     184            0 :             filecache_connstr: cli.filecache_connstr,
     185            0 :             #[cfg(target_os = "linux")]
     186            0 :             cgroup: cli.cgroup,
     187            0 :             #[cfg(target_os = "linux")]
     188            0 :             vm_monitor_addr: cli.vm_monitor_addr,
     189            0 :             installed_extensions_collection_interval: cli.installed_extensions_collection_interval,
     190            0 :         },
     191            0 :         config,
     192            0 :     )?;
     193              : 
     194            0 :     let exit_code = compute_node.run()?;
     195              : 
     196            0 :     scenario.teardown();
     197            0 : 
     198            0 :     deinit_and_exit(exit_code);
     199            0 : }
     200              : 
     201            0 : async fn init() -> Result<()> {
     202            0 :     init_tracing_and_logging(DEFAULT_LOG_LEVEL).await?;
     203              : 
     204            0 :     let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?;
     205            0 :     thread::spawn(move || {
     206            0 :         for sig in signals.forever() {
     207            0 :             handle_exit_signal(sig);
     208            0 :         }
     209            0 :     });
     210            0 : 
     211            0 :     info!("compute build_tag: {}", &BUILD_TAG.to_string());
     212              : 
     213            0 :     Ok(())
     214            0 : }
     215              : 
     216            0 : fn get_config(cli: &Cli) -> Result<ComputeConfig> {
     217              :     // First, read the config from the path if provided
     218            0 :     if let Some(ref config) = cli.config {
     219            0 :         let file = File::open(config)?;
     220            0 :         return Ok(serde_json::from_reader(&file)?);
     221            0 :     }
     222            0 : 
     223            0 :     // If the config wasn't provided in the CLI arguments, then retrieve it from
     224            0 :     // the control plane
     225            0 :     match get_config_from_control_plane(cli.control_plane_uri.as_ref().unwrap(), &cli.compute_id) {
     226            0 :         Ok(config) => Ok(config),
     227            0 :         Err(e) => {
     228            0 :             error!(
     229            0 :                 "cannot get response from control plane: {}\n\
     230            0 :                 neither spec nor confirmation that compute is in the Empty state was received",
     231              :                 e
     232              :             );
     233            0 :             Err(e)
     234              :         }
     235              :     }
     236            0 : }
     237              : 
     238            0 : fn deinit_and_exit(exit_code: Option<i32>) -> ! {
     239            0 :     // Shutdown trace pipeline gracefully, so that it has a chance to send any
     240            0 :     // pending traces before we exit. Shutting down OTEL tracing provider may
     241            0 :     // hang for quite some time, see, for example:
     242            0 :     // - https://github.com/open-telemetry/opentelemetry-rust/issues/868
     243            0 :     // - and our problems with staging https://github.com/neondatabase/cloud/issues/3707#issuecomment-1493983636
     244            0 :     //
     245            0 :     // Yet, we want computes to shut down fast enough, as we may need a new one
     246            0 :     // for the same timeline ASAP. So wait no longer than 2s for the shutdown to
     247            0 :     // complete, then just error out and exit the main thread.
     248            0 :     info!("shutting down tracing");
     249            0 :     let (sender, receiver) = mpsc::channel();
     250            0 :     let _ = thread::spawn(move || {
     251            0 :         tracing_utils::shutdown_tracing();
     252            0 :         sender.send(()).ok()
     253            0 :     });
     254            0 :     let shutdown_res = receiver.recv_timeout(Duration::from_millis(2000));
     255            0 :     if shutdown_res.is_err() {
     256            0 :         error!("timed out while shutting down tracing, exiting anyway");
     257            0 :     }
     258              : 
     259            0 :     info!("shutting down");
     260            0 :     exit(exit_code.unwrap_or(1))
     261              : }
     262              : 
     263              : /// When compute_ctl is killed, also forward the termination signal to
     264              : /// sync-safekeepers to prevent leakage. TODO: it would be better to convert
     265              : /// compute_ctl to async and wait for termination, which would then be easy.
     266            0 : fn handle_exit_signal(sig: i32) {
     267            0 :     info!("received {sig} termination signal");
     268            0 :     forward_termination_signal();
     269            0 :     exit(1);
     270              : }
     271              : 
     272              : #[cfg(test)]
     273              : mod test {
     274              :     use clap::{CommandFactory, Parser};
     275              :     use url::Url;
     276              : 
     277              :     use super::Cli;
     278              : 
     279              :     #[test]
     280            1 :     fn verify_cli() {
     281            1 :         Cli::command().debug_assert()
     282            1 :     }
     283              : 
     284              :     #[test]
     285            1 :     fn verify_remote_ext_base_url() {
     286            1 :         let cli = Cli::parse_from([
     287            1 :             "compute_ctl",
     288            1 :             "--pgdata=test",
     289            1 :             "--connstr=test",
     290            1 :             "--compute-id=test",
     291            1 :             "--remote-ext-base-url",
     292            1 :             "https://example.com/subpath",
     293            1 :         ]);
     294            1 :         assert_eq!(
     295            1 :             cli.remote_ext_base_url.unwrap(),
     296            1 :             Url::parse("https://example.com/subpath/").unwrap()
     297            1 :         );
     298              : 
     299            1 :         let cli = Cli::parse_from([
     300            1 :             "compute_ctl",
     301            1 :             "--pgdata=test",
     302            1 :             "--connstr=test",
     303            1 :             "--compute-id=test",
     304            1 :             "--remote-ext-base-url",
     305            1 :             "https://example.com//",
     306            1 :         ]);
     307            1 :         assert_eq!(
     308            1 :             cli.remote_ext_base_url.unwrap(),
     309            1 :             Url::parse("https://example.com").unwrap()
     310            1 :         );
     311              : 
     312            1 :         Cli::try_parse_from([
     313            1 :             "compute_ctl",
     314            1 :             "--pgdata=test",
     315            1 :             "--connstr=test",
     316            1 :             "--compute-id=test",
     317            1 :             "--remote-ext-base-url",
     318            1 :             "https://example.com?hello=world",
     319            1 :         ])
     320            1 :         .expect_err("URL parameters are not allowed");
     321            1 :     }
     322              : }
        

Generated by: LCOV version 2.1-beta