|             Line data    Source code 
       1              : //!
       2              : //! Postgres wrapper (`compute_ctl`) is intended to be run as a Docker entrypoint or as a `systemd`
       3              : //! `ExecStart` option. It will handle all the `Neon` specifics during compute node
       4              : //! initialization:
       5              : //! - `compute_ctl` accepts cluster (compute node) specification as a JSON file.
       6              : //! - Every start is a fresh start, so the data directory is removed and
       7              : //!   initialized again on each run.
       8              : //! - If remote_extension_config is provided, it will be used to fetch extensions list
       9              : //!   and download `shared_preload_libraries` from the remote storage.
      10              : //! - Next it will put configuration files into the `PGDATA` directory.
      11              : //! - Sync safekeepers and get commit LSN.
      12              : //! - Get `basebackup` from pageserver using the LSN returned by the previous step.
      13              : //! - Try to start `postgres` and wait until it is ready to accept connections.
      14              : //! - Check and alter/drop/create roles and databases.
      15              : //! - Hang waiting on the `postmaster` process to exit.
      16              : //!
      17              : //! Also `compute_ctl` spawns two separate service threads:
      18              : //! - `compute-monitor` checks the last Postgres activity timestamp and saves it
      19              : //!   into the shared `ComputeNode`;
      20              : //! - `http-endpoint` runs a Hyper HTTP API server, which serves readiness and the
      21              : //!   last activity requests.
      22              : //!
      23              : //! If `AUTOSCALING` environment variable is set, `compute_ctl` will start the
      24              : //! `vm-monitor` located in [`neon/libs/vm_monitor`]. For VM compute nodes,
      25              : //! `vm-monitor` communicates with the VM autoscaling system. It coordinates
      26              : //! downscaling and requests immediate upscaling under resource pressure.
      27              : //!
      28              : //! Usage example:
      29              : //! ```sh
      30              : //! compute_ctl -D /var/db/postgres/compute \
      31              : //!             -C 'postgresql://cloud_admin@localhost/postgres' \
      32              : //!             -c /var/db/postgres/configs/config.json \
      33              : //!             -b /usr/local/bin/postgres \
      34              : //!             -r http://pg-ext-s3-gateway
      35              : //! ```
      36              : use std::ffi::OsString;
      37              : use std::fs::File;
      38              : use std::process::exit;
      39              : use std::sync::Arc;
      40              : use std::sync::atomic::AtomicU64;
      41              : use std::sync::mpsc;
      42              : use std::thread;
      43              : use std::time::Duration;
      44              : 
      45              : use anyhow::{Context, Result, bail};
      46              : use clap::Parser;
      47              : use compute_api::responses::ComputeConfig;
      48              : use compute_tools::compute::{
      49              :     BUILD_TAG, ComputeNode, ComputeNodeParams, forward_termination_signal,
      50              : };
      51              : use compute_tools::extension_server::get_pg_version_string;
      52              : use compute_tools::params::*;
      53              : use compute_tools::pg_isready::get_pg_isready_bin;
      54              : use compute_tools::spec::*;
      55              : use compute_tools::{hadron_metrics, installed_extensions, logger::*};
      56              : use rlimit::{Resource, setrlimit};
      57              : use signal_hook::consts::{SIGINT, SIGQUIT, SIGTERM};
      58              : use signal_hook::iterator::Signals;
      59              : use tracing::{error, info};
      60              : use url::Url;
      61              : use utils::failpoint_support;
      62              : 
      63              : #[derive(Debug, Parser)]
      64              : #[command(rename_all = "kebab-case")]
                        // Command-line interface of `compute_ctl`, parsed with clap's derive API.
                        // The comments added below are plain `//` comments on purpose: clap turns
                        // `///` doc comments on fields into `--help` text.
      65              : struct Cli {
                            // Postgres executable (`-b`). When the flag is absent, clap falls back to
                            // the `POSTGRES_PATH` environment variable and then to "postgres".
      66              :     #[arg(short = 'b', long, default_value = "postgres", env = "POSTGRES_PATH")]
      67              :     pub pgbin: String,
      68              : 
                            // Validated by `Cli::parse_remote_ext_base_url`, which normalizes the
                            // value to end in exactly one `/` and rejects query parameters.
      69              :     /// The base URL for the remote extension storage proxy gateway.
      70              :     #[arg(short = 'r', long, value_parser = Self::parse_remote_ext_base_url)]
      71              :     pub remote_ext_base_url: Option<Url>,
      72              : 
                            // NOTE(review): no alias for a "previous name" is declared on this
                            // argument, so the last sentence below may be stale - confirm.
      73              :     /// The port to bind the external listening HTTP server to. Clients running
      74              :     /// outside the compute will talk to the compute through this port. Keep
      75              :     /// the previous name for this argument around for a smoother release
      76              :     /// with the control plane.
      77              :     #[arg(long, default_value_t = 3080)]
      78              :     pub external_http_port: u16,
      79              : 
      80              :     /// The port to bind the internal listening HTTP server to. Clients include
      81              :     /// the neon extension (for installing remote extensions) and local_proxy.
      82              :     #[arg(long, default_value_t = 3081)]
      83              :     pub internal_http_port: u16,
      84              : 
                            // Only consulted when `--lakebase-mode` is true: `get_external_http_port`
                            // returns it as is and `get_internal_http_port` returns it plus one.
      85              :     /// Backwards-compatible --http-port for Hadron deployments. Functionally the
      86              :     /// same as --external-http-port.
      87              :     #[arg(
      88              :         long,
      89              :         conflicts_with = "external_http_port",
      90              :         conflicts_with = "internal_http_port"
      91              :     )]
      92              :     pub http_port: Option<u16>,
      93              : 
                            // Postgres data directory (`-D`), required.
      94              :     #[arg(short = 'D', long, value_name = "DATADIR")]
      95              :     pub pgdata: String,
      96              : 
                            // Connection string (`-C`), required. `main` fails unless it parses as a URL.
      97              :     #[arg(short = 'C', long, value_name = "DATABASE_URL")]
      98              :     pub connstr: String,
      99              : 
                            // Validated by `Cli::parse_privileged_role_name`: only lowercase ASCII
                            // letters and underscores are accepted, and the name must not be empty.
     100              :     #[arg(
     101              :         long,
     102              :         default_value = "neon_superuser",
     103              :         value_name = "PRIVILEGED_ROLE_NAME",
     104              :         value_parser = Self::parse_privileged_role_name
     105              :     )]
     106              :     pub privileged_role_name: String,
     107              : 
                            // The next three arguments exist on Linux builds only and are forwarded
                            // unchanged to `ComputeNodeParams` in `main`. Presumably they configure
                            // the vm-monitor described in the module docs - TODO confirm.
     108              :     #[cfg(target_os = "linux")]
     109              :     #[arg(long, default_value = "neon-postgres")]
     110              :     pub cgroup: String,
     111              : 
     112              :     #[cfg(target_os = "linux")]
     113              :     #[arg(
     114              :         long,
     115              :         default_value = "host=localhost port=5432 dbname=postgres user=cloud_admin sslmode=disable application_name=vm-monitor"
     116              :     )]
     117              :     pub filecache_connstr: String,
     118              : 
     119              :     #[cfg(target_os = "linux")]
     120              :     #[arg(long, default_value = "0.0.0.0:10301")]
     121              :     pub vm_monitor_addr: String,
     122              : 
                            // Boolean switch (present = true); forwarded unchanged to `ComputeNodeParams`.
     123              :     #[arg(long, action = clap::ArgAction::SetTrue)]
     124              :     pub resize_swap_on_bind: bool,
     125              : 
                            // Optional; forwarded unchanged to `ComputeNodeParams`.
     126              :     #[arg(long)]
     127              :     pub set_disk_quota_for_fs: Option<String>,
     128              : 
                            // Path to a JSON compute config file (`-c`). When given, `get_config`
                            // deserializes it and the control plane is not contacted.
     129              :     #[arg(short = 'c', long)]
     130              :     pub config: Option<OsString>,
     131              : 
                            // Required compute identifier (`-i`). `get_config` passes it to
                            // `get_config_from_control_plane` when no `--config` file is used.
     132              :     #[arg(short = 'i', long, group = "compute-id")]
     133              :     pub compute_id: String,
     134              : 
                            // Control plane base URL (`-p`); mutually exclusive with `--config`.
     135              :     #[arg(
     136              :         short = 'p',
     137              :         long,
     138              :         conflicts_with = "config",
     139              :         value_name = "CONTROL_PLANE_API_BASE_URL",
     140              :         requires = "compute-id"
     141              :     )]
     142              :     pub control_plane_uri: Option<String>,
     143              : 
     144              :     /// Interval in seconds for collecting installed extensions statistics
     145              :     #[arg(long, default_value = "3600")]
     146              :     pub installed_extensions_collection_interval: u64,
     147              : 
     148              :     /// Run in development mode, skipping VM-specific operations like process termination
     149              :     #[arg(long, action = clap::ArgAction::SetTrue)]
     150              :     pub dev: bool,
     151              : 
                            // Optional timeout in seconds; `main` converts it with `Duration::from_secs`.
     152              :     #[arg(long)]
     153              :     pub pg_init_timeout: Option<u64>,
     154              : 
                            // Unlike the `SetTrue` switches above, this takes an explicit value
                            // (`ArgAction::Set`) and defaults to false. In `main` it enables reading
                            // the log directory from `COMPUTE_CTL_LOG_DIRECTORY`, the extra metrics
                            // initialization, and the `--http-port` compatibility handling.
     155              :     #[arg(long, default_value_t = false, action = clap::ArgAction::Set)]
     156              :     pub lakebase_mode: bool,
     157              : }
     158              : 
     159              : impl Cli {
     160              :     /// Parse a URL from an argument. By default, this isn't necessary, but we
     161              :     /// want to do some sanity checking.
     162            3 :     fn parse_remote_ext_base_url(value: &str) -> Result<Url> {
     163              :         // Remove extra trailing slashes, and add one. We use Url::join() later
     164              :         // when downloading remote extensions. If the base URL is something like
     165              :         // http://example.com/pg-ext-s3-gateway, and join() is called with
     166              :         // something like "xyz", the resulting URL is http://example.com/xyz.
     167            3 :         let value = value.trim_end_matches('/').to_owned() + "/";
     168            3 :         let url = Url::parse(&value)?;
     169              : 
     170            3 :         if url.query_pairs().count() != 0 {
     171            1 :             bail!("parameters detected in remote extensions base URL")
     172            2 :         }
     173              : 
     174            2 :         Ok(url)
     175            3 :     }
     176              : 
     177              :     /// For simplicity, we do not escape `privileged_role_name` anywhere in the code.
     178              :     /// Since it's a system role, which we fully control, that's fine. Still, let's
     179              :     /// validate it to avoid any surprises.
     180            6 :     fn parse_privileged_role_name(value: &str) -> Result<String> {
     181              :         use regex::Regex;
     182              : 
     183            6 :         let pattern = Regex::new(r"^[a-z_]+$").unwrap();
     184              : 
     185            6 :         if !pattern.is_match(value) {
     186            3 :             bail!("--privileged-role-name can only contain lowercase letters and underscores")
     187            3 :         }
     188              : 
     189            3 :         Ok(value.to_string())
     190            6 :     }
     191              : }
     192              : 
     193              : // Hadron helpers to get compatible compute_ctl http ports from Cli. The old `--http-port`
     194              : // arg is used and acts the same as `--external-http-port`. The internal http port is defined
     195              : // to be http_port + 1. Hadron runs in the dblet environment which uses the host network, so
     196              : // we need to be careful with the ports to choose.
     197            0 : fn get_external_http_port(cli: &Cli) -> u16 {
     198            0 :     if cli.lakebase_mode {
     199            0 :         return cli.http_port.unwrap_or(cli.external_http_port);
     200            0 :     }
     201            0 :     cli.external_http_port
     202            0 : }
     203            0 : fn get_internal_http_port(cli: &Cli) -> u16 {
     204            0 :     if cli.lakebase_mode {
     205            0 :         return cli
     206            0 :             .http_port
     207            0 :             .map(|p| p + 1)
     208            0 :             .unwrap_or(cli.internal_http_port);
     209            0 :     }
     210            0 :     cli.internal_http_port
     211            0 : }
     212              : 
     213            0 : fn main() -> Result<()> {
     214            0 :     let cli = Cli::parse();
     215              : 
     216            0 :     let scenario = failpoint_support::init();
     217              : 
     218              :     // For historical reasons, the main thread that processes the config and launches postgres
     219              :     // is synchronous, but we always have this tokio runtime available and we "enter" it so
     220              :     // that you can use tokio::spawn() and tokio::runtime::Handle::current().block_on(...)
     221              :     // from all parts of compute_ctl.
     222            0 :     let runtime = tokio::runtime::Builder::new_multi_thread()
     223            0 :         .enable_all()
     224            0 :         .build()?;
     225            0 :     let _rt_guard = runtime.enter();
     226              : 
     227            0 :     let mut log_dir = None;
     228            0 :     if cli.lakebase_mode {
     229            0 :         log_dir = std::env::var("COMPUTE_CTL_LOG_DIRECTORY").ok();
     230            0 :     }
     231              : 
     232            0 :     let (tracing_provider, _file_logs_guard) = init(cli.dev, log_dir)?;
     233              : 
     234              :     // enable core dumping for all child processes
     235            0 :     setrlimit(Resource::CORE, rlimit::INFINITY, rlimit::INFINITY)?;
     236              : 
     237            0 :     if cli.lakebase_mode {
     238            0 :         installed_extensions::initialize_metrics();
     239            0 :         hadron_metrics::initialize_metrics();
     240            0 :     }
     241              : 
     242            0 :     let connstr = Url::parse(&cli.connstr).context("cannot parse connstr as a URL")?;
     243              : 
     244            0 :     let config = get_config(&cli)?;
     245              : 
     246            0 :     let external_http_port = get_external_http_port(&cli);
     247            0 :     let internal_http_port = get_internal_http_port(&cli);
     248              : 
     249            0 :     let compute_node = ComputeNode::new(
     250            0 :         ComputeNodeParams {
     251            0 :             compute_id: cli.compute_id,
     252            0 :             connstr,
     253            0 :             privileged_role_name: cli.privileged_role_name.clone(),
     254            0 :             pgdata: cli.pgdata.clone(),
     255            0 :             pgbin: cli.pgbin.clone(),
     256            0 :             pgversion: get_pg_version_string(&cli.pgbin),
     257            0 :             external_http_port,
     258            0 :             internal_http_port,
     259            0 :             remote_ext_base_url: cli.remote_ext_base_url.clone(),
     260            0 :             resize_swap_on_bind: cli.resize_swap_on_bind,
     261            0 :             set_disk_quota_for_fs: cli.set_disk_quota_for_fs,
     262            0 :             #[cfg(target_os = "linux")]
     263            0 :             filecache_connstr: cli.filecache_connstr,
     264            0 :             #[cfg(target_os = "linux")]
     265            0 :             cgroup: cli.cgroup,
     266            0 :             #[cfg(target_os = "linux")]
     267            0 :             vm_monitor_addr: cli.vm_monitor_addr,
     268            0 :             installed_extensions_collection_interval: Arc::new(AtomicU64::new(
     269            0 :                 cli.installed_extensions_collection_interval,
     270            0 :             )),
     271            0 :             pg_init_timeout: cli.pg_init_timeout.map(Duration::from_secs),
     272            0 :             pg_isready_bin: get_pg_isready_bin(&cli.pgbin),
     273            0 :             instance_id: std::env::var("INSTANCE_ID").ok(),
     274            0 :             lakebase_mode: cli.lakebase_mode,
     275            0 :             build_tag: BUILD_TAG.to_string(),
     276            0 :             control_plane_uri: cli.control_plane_uri,
     277            0 :             config_path_test_only: cli.config,
     278            0 :         },
     279            0 :         config,
     280            0 :     )?;
     281              : 
     282            0 :     let exit_code = compute_node.run().context("running compute node")?;
     283              : 
     284            0 :     scenario.teardown();
     285              : 
     286            0 :     deinit_and_exit(tracing_provider, exit_code);
     287            0 : }
     288              : 
     289            0 : fn init(
     290            0 :     dev_mode: bool,
     291            0 :     log_dir: Option<String>,
     292            0 : ) -> Result<(
     293            0 :     Option<tracing_utils::Provider>,
     294            0 :     Option<tracing_appender::non_blocking::WorkerGuard>,
     295            0 : )> {
     296            0 :     let (provider, file_logs_guard) = init_tracing_and_logging(DEFAULT_LOG_LEVEL, &log_dir)?;
     297              : 
     298            0 :     let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?;
     299            0 :     thread::spawn(move || {
     300            0 :         for sig in signals.forever() {
     301            0 :             handle_exit_signal(sig, dev_mode);
     302            0 :         }
     303            0 :     });
     304              : 
     305            0 :     info!("compute build_tag: {}", &BUILD_TAG.to_string());
     306              : 
     307            0 :     Ok((provider, file_logs_guard))
     308            0 : }
     309              : 
     310            0 : fn get_config(cli: &Cli) -> Result<ComputeConfig> {
     311              :     // First, read the config from the path if provided
     312            0 :     if let Some(ref config) = cli.config {
     313            0 :         let file = File::open(config)?;
     314            0 :         return Ok(serde_json::from_reader(&file)?);
     315            0 :     }
     316              : 
     317              :     // If the config wasn't provided in the CLI arguments, then retrieve it from
     318              :     // the control plane
     319            0 :     match get_config_from_control_plane(cli.control_plane_uri.as_ref().unwrap(), &cli.compute_id) {
     320            0 :         Ok(config) => Ok(config),
     321            0 :         Err(e) => {
     322            0 :             error!(
     323            0 :                 "cannot get response from control plane: {}\n\
     324            0 :                 neither spec nor confirmation that compute is in the Empty state was received",
     325              :                 e
     326              :             );
     327            0 :             Err(e)
     328              :         }
     329              :     }
     330            0 : }
     331              : 
     332            0 : fn deinit_and_exit(tracing_provider: Option<tracing_utils::Provider>, exit_code: Option<i32>) -> ! {
     333            0 :     if let Some(p) = tracing_provider {
     334              :         // Shutdown trace pipeline gracefully, so that it has a chance to send any
     335              :         // pending traces before we exit. Shutting down OTEL tracing provider may
     336              :         // hang for quite some time, see, for example:
     337              :         // - https://github.com/open-telemetry/opentelemetry-rust/issues/868
     338              :         // - and our problems with staging https://github.com/neondatabase/cloud/issues/3707#issuecomment-1493983636
     339              :         //
     340              :         // Yet, we want computes to shut down fast enough, as we may need a new one
     341              :         // for the same timeline ASAP. So wait no longer than 2s for the shutdown to
     342              :         // complete, then just error out and exit the main thread.
     343            0 :         info!("shutting down tracing");
     344            0 :         let (sender, receiver) = mpsc::channel();
     345            0 :         let _ = thread::spawn(move || {
     346            0 :             _ = p.shutdown();
     347            0 :             sender.send(()).ok()
     348            0 :         });
     349            0 :         let shutdown_res = receiver.recv_timeout(Duration::from_millis(2000));
     350            0 :         if shutdown_res.is_err() {
     351            0 :             error!("timed out while shutting down tracing, exiting anyway");
     352            0 :         }
     353            0 :     }
     354              : 
     355            0 :     info!("shutting down");
     356            0 :     exit(exit_code.unwrap_or(1))
     357              : }
     358              : 
     359              : /// When compute_ctl is killed, send also termination signal to sync-safekeepers
     360              : /// to prevent leakage. TODO: it is better to convert compute_ctl to async and
     361              : /// wait for termination which would be easy then.
     362            0 : fn handle_exit_signal(sig: i32, dev_mode: bool) {
     363            0 :     info!("received {sig} termination signal");
     364            0 :     forward_termination_signal(dev_mode);
     365            0 :     exit(1);
     366              : }
     367              : 
     #[cfg(test)]
     mod test {
         use clap::{CommandFactory, Parser};
         use url::Url;

         use super::Cli;

         /// Builds a command line made of the mandatory arguments followed by
         /// `extra`.
         fn argv<'a>(extra: &[&'a str]) -> Vec<&'a str> {
             let mut args = vec![
                 "compute_ctl",
                 "--pgdata=test",
                 "--connstr=test",
                 "--compute-id=test",
             ];
             args.extend_from_slice(extra);
             args
         }

         /// Runs clap's own consistency checks on the `Cli` definition.
         #[test]
         fn verify_cli() {
             Cli::command().debug_assert();
         }

         /// Trailing slashes are normalized to exactly one and query parameters
         /// are rejected.
         #[test]
         fn verify_remote_ext_base_url() {
             let accepted = [
                 ("https://example.com/subpath", "https://example.com/subpath/"),
                 ("https://example.com//", "https://example.com"),
             ];
             for (given, expected) in accepted {
                 let cli = Cli::parse_from(argv(&["--remote-ext-base-url", given]));
                 assert_eq!(
                     cli.remote_ext_base_url.unwrap(),
                     Url::parse(expected).unwrap()
                 );
             }

             Cli::try_parse_from(argv(&[
                 "--remote-ext-base-url",
                 "https://example.com?hello=world",
             ]))
             .expect_err("URL parameters are not allowed");
         }

         /// Only lowercase letters and underscores are accepted as a role name.
         #[test]
         fn verify_privileged_role_name() {
             // Valid name
             let cli = Cli::parse_from(argv(&["--privileged-role-name", "my_superuser"]));
             assert_eq!(cli.privileged_role_name, "my_superuser");

             // Invalid names, each paired with the reason it must be rejected
             let rejected = [
                 ("NeonSuperuser", "uppercase letters are not allowed"),
                 ("$'neon_superuser", "special characters are not allowed"),
                 ("", "empty name is not allowed"),
             ];
             for (name, reason) in rejected {
                 Cli::try_parse_from(argv(&["--privileged-role-name", name])).expect_err(reason);
             }
         }
     }
         |