Line data Source code
1 : //!
2 : //! Postgres wrapper (`compute_ctl`) is intended to be run as a Docker entrypoint or as a `systemd`
3 : //! `ExecStart` option. It will handle all the `Neon` specifics during compute node
4 : //! initialization:
5 : //! - `compute_ctl` accepts cluster (compute node) specification as a JSON file.
6 : //! - Every start is a fresh start, so the data directory is removed and
7 : //! initialized again on each run.
//! - If `--remote-ext-base-url` is provided, it will be used to fetch the extensions list
//!   and download `shared_preload_libraries` from the remote extension storage.
10 : //! - Next it will put configuration files into the `PGDATA` directory.
11 : //! - Sync safekeepers and get commit LSN.
//! - Get `basebackup` from pageserver using the LSN returned in the previous step.
13 : //! - Try to start `postgres` and wait until it is ready to accept connections.
14 : //! - Check and alter/drop/create roles and databases.
15 : //! - Hang waiting on the `postmaster` process to exit.
16 : //!
17 : //! Also `compute_ctl` spawns two separate service threads:
18 : //! - `compute-monitor` checks the last Postgres activity timestamp and saves it
19 : //! into the shared `ComputeNode`;
20 : //! - `http-endpoint` runs a Hyper HTTP API server, which serves readiness and the
21 : //! last activity requests.
22 : //!
23 : //! If `AUTOSCALING` environment variable is set, `compute_ctl` will start the
24 : //! `vm-monitor` located in [`neon/libs/vm_monitor`]. For VM compute nodes,
25 : //! `vm-monitor` communicates with the VM autoscaling system. It coordinates
26 : //! downscaling and requests immediate upscaling under resource pressure.
27 : //!
28 : //! Usage example:
29 : //! ```sh
//! compute_ctl -D /var/db/postgres/compute \
//!   -C 'postgresql://cloud_admin@localhost/postgres' \
//!   -c /var/db/postgres/configs/config.json \
//!   -b /usr/local/bin/postgres \
//!   -r http://pg-ext-s3-gateway \
//!   -i example-compute-id
35 : //! ```
36 : use std::ffi::OsString;
37 : use std::fs::File;
38 : use std::process::exit;
39 : use std::sync::Arc;
40 : use std::sync::atomic::AtomicU64;
41 : use std::sync::mpsc;
42 : use std::thread;
43 : use std::time::Duration;
44 :
45 : use anyhow::{Context, Result, bail};
46 : use clap::Parser;
47 : use compute_api::responses::ComputeConfig;
48 : use compute_tools::compute::{
49 : BUILD_TAG, ComputeNode, ComputeNodeParams, forward_termination_signal,
50 : };
51 : use compute_tools::extension_server::get_pg_version_string;
52 : use compute_tools::params::*;
53 : use compute_tools::pg_isready::get_pg_isready_bin;
54 : use compute_tools::spec::*;
55 : use compute_tools::{hadron_metrics, installed_extensions, logger::*};
56 : use rlimit::{Resource, setrlimit};
57 : use signal_hook::consts::{SIGINT, SIGQUIT, SIGTERM};
58 : use signal_hook::iterator::Signals;
59 : use tracing::{error, info};
60 : use url::Url;
61 : use utils::failpoint_support;
62 :
63 : #[derive(Debug, Parser)]
64 : #[command(rename_all = "kebab-case")]
65 : struct Cli {
66 : #[arg(short = 'b', long, default_value = "postgres", env = "POSTGRES_PATH")]
67 : pub pgbin: String,
68 :
69 : /// The base URL for the remote extension storage proxy gateway.
70 : #[arg(short = 'r', long, value_parser = Self::parse_remote_ext_base_url)]
71 : pub remote_ext_base_url: Option<Url>,
72 :
73 : /// The port to bind the external listening HTTP server to. Clients running
74 : /// outside the compute will talk to the compute through this port. Keep
75 : /// the previous name for this argument around for a smoother release
76 : /// with the control plane.
77 : #[arg(long, default_value_t = 3080)]
78 : pub external_http_port: u16,
79 :
80 : /// The port to bind the internal listening HTTP server to. Clients include
81 : /// the neon extension (for installing remote extensions) and local_proxy.
82 : #[arg(long, default_value_t = 3081)]
83 : pub internal_http_port: u16,
84 :
85 : /// Backwards-compatible --http-port for Hadron deployments. Functionally the
86 : /// same as --external-http-port.
87 : #[arg(
88 : long,
89 : conflicts_with = "external_http_port",
90 : conflicts_with = "internal_http_port"
91 : )]
92 : pub http_port: Option<u16>,
93 :
94 : #[arg(short = 'D', long, value_name = "DATADIR")]
95 : pub pgdata: String,
96 :
97 : #[arg(short = 'C', long, value_name = "DATABASE_URL")]
98 : pub connstr: String,
99 :
100 : #[arg(
101 : long,
102 : default_value = "neon_superuser",
103 : value_name = "PRIVILEGED_ROLE_NAME",
104 : value_parser = Self::parse_privileged_role_name
105 : )]
106 : pub privileged_role_name: String,
107 :
108 : #[cfg(target_os = "linux")]
109 : #[arg(long, default_value = "neon-postgres")]
110 : pub cgroup: String,
111 :
112 : #[cfg(target_os = "linux")]
113 : #[arg(
114 : long,
115 : default_value = "host=localhost port=5432 dbname=postgres user=cloud_admin sslmode=disable application_name=vm-monitor"
116 : )]
117 : pub filecache_connstr: String,
118 :
119 : #[cfg(target_os = "linux")]
120 : #[arg(long, default_value = "0.0.0.0:10301")]
121 : pub vm_monitor_addr: String,
122 :
123 : #[arg(long, action = clap::ArgAction::SetTrue)]
124 : pub resize_swap_on_bind: bool,
125 :
126 : #[arg(long)]
127 : pub set_disk_quota_for_fs: Option<String>,
128 :
129 : #[arg(short = 'c', long)]
130 : pub config: Option<OsString>,
131 :
132 : #[arg(short = 'i', long, group = "compute-id")]
133 : pub compute_id: String,
134 :
135 : #[arg(
136 : short = 'p',
137 : long,
138 : conflicts_with = "config",
139 : value_name = "CONTROL_PLANE_API_BASE_URL",
140 : requires = "compute-id"
141 : )]
142 : pub control_plane_uri: Option<String>,
143 :
144 : /// Interval in seconds for collecting installed extensions statistics
145 : #[arg(long, default_value = "3600")]
146 : pub installed_extensions_collection_interval: u64,
147 :
148 : /// Run in development mode, skipping VM-specific operations like process termination
149 : #[arg(long, action = clap::ArgAction::SetTrue)]
150 : pub dev: bool,
151 :
152 : #[arg(long)]
153 : pub pg_init_timeout: Option<u64>,
154 :
155 : #[arg(long, default_value_t = false, action = clap::ArgAction::Set)]
156 : pub lakebase_mode: bool,
157 : }
158 :
159 : impl Cli {
160 : /// Parse a URL from an argument. By default, this isn't necessary, but we
161 : /// want to do some sanity checking.
162 3 : fn parse_remote_ext_base_url(value: &str) -> Result<Url> {
163 : // Remove extra trailing slashes, and add one. We use Url::join() later
164 : // when downloading remote extensions. If the base URL is something like
165 : // http://example.com/pg-ext-s3-gateway, and join() is called with
166 : // something like "xyz", the resulting URL is http://example.com/xyz.
167 3 : let value = value.trim_end_matches('/').to_owned() + "/";
168 3 : let url = Url::parse(&value)?;
169 :
170 3 : if url.query_pairs().count() != 0 {
171 1 : bail!("parameters detected in remote extensions base URL")
172 2 : }
173 :
174 2 : Ok(url)
175 3 : }
176 :
177 : /// For simplicity, we do not escape `privileged_role_name` anywhere in the code.
178 : /// Since it's a system role, which we fully control, that's fine. Still, let's
179 : /// validate it to avoid any surprises.
180 6 : fn parse_privileged_role_name(value: &str) -> Result<String> {
181 : use regex::Regex;
182 :
183 6 : let pattern = Regex::new(r"^[a-z_]+$").unwrap();
184 :
185 6 : if !pattern.is_match(value) {
186 3 : bail!("--privileged-role-name can only contain lowercase letters and underscores")
187 3 : }
188 :
189 3 : Ok(value.to_string())
190 6 : }
191 : }
192 :
193 : // Hadron helpers to get compatible compute_ctl http ports from Cli. The old `--http-port`
194 : // arg is used and acts the same as `--external-http-port`. The internal http port is defined
195 : // to be http_port + 1. Hadron runs in the dblet environment which uses the host network, so
196 : // we need to be careful with the ports to choose.
197 0 : fn get_external_http_port(cli: &Cli) -> u16 {
198 0 : if cli.lakebase_mode {
199 0 : return cli.http_port.unwrap_or(cli.external_http_port);
200 0 : }
201 0 : cli.external_http_port
202 0 : }
203 0 : fn get_internal_http_port(cli: &Cli) -> u16 {
204 0 : if cli.lakebase_mode {
205 0 : return cli
206 0 : .http_port
207 0 : .map(|p| p + 1)
208 0 : .unwrap_or(cli.internal_http_port);
209 0 : }
210 0 : cli.internal_http_port
211 0 : }
212 :
213 0 : fn main() -> Result<()> {
214 0 : let cli = Cli::parse();
215 :
216 0 : let scenario = failpoint_support::init();
217 :
218 : // For historical reasons, the main thread that processes the config and launches postgres
219 : // is synchronous, but we always have this tokio runtime available and we "enter" it so
220 : // that you can use tokio::spawn() and tokio::runtime::Handle::current().block_on(...)
221 : // from all parts of compute_ctl.
222 0 : let runtime = tokio::runtime::Builder::new_multi_thread()
223 0 : .enable_all()
224 0 : .build()?;
225 0 : let _rt_guard = runtime.enter();
226 :
227 0 : let mut log_dir = None;
228 0 : if cli.lakebase_mode {
229 0 : log_dir = std::env::var("COMPUTE_CTL_LOG_DIRECTORY").ok();
230 0 : }
231 :
232 0 : let (tracing_provider, _file_logs_guard) = init(cli.dev, log_dir)?;
233 :
234 : // enable core dumping for all child processes
235 0 : setrlimit(Resource::CORE, rlimit::INFINITY, rlimit::INFINITY)?;
236 :
237 0 : if cli.lakebase_mode {
238 0 : installed_extensions::initialize_metrics();
239 0 : hadron_metrics::initialize_metrics();
240 0 : }
241 :
242 0 : let connstr = Url::parse(&cli.connstr).context("cannot parse connstr as a URL")?;
243 :
244 0 : let config = get_config(&cli)?;
245 :
246 0 : let external_http_port = get_external_http_port(&cli);
247 0 : let internal_http_port = get_internal_http_port(&cli);
248 :
249 0 : let compute_node = ComputeNode::new(
250 0 : ComputeNodeParams {
251 0 : compute_id: cli.compute_id,
252 0 : connstr,
253 0 : privileged_role_name: cli.privileged_role_name.clone(),
254 0 : pgdata: cli.pgdata.clone(),
255 0 : pgbin: cli.pgbin.clone(),
256 0 : pgversion: get_pg_version_string(&cli.pgbin),
257 0 : external_http_port,
258 0 : internal_http_port,
259 0 : remote_ext_base_url: cli.remote_ext_base_url.clone(),
260 0 : resize_swap_on_bind: cli.resize_swap_on_bind,
261 0 : set_disk_quota_for_fs: cli.set_disk_quota_for_fs,
262 0 : #[cfg(target_os = "linux")]
263 0 : filecache_connstr: cli.filecache_connstr,
264 0 : #[cfg(target_os = "linux")]
265 0 : cgroup: cli.cgroup,
266 0 : #[cfg(target_os = "linux")]
267 0 : vm_monitor_addr: cli.vm_monitor_addr,
268 0 : installed_extensions_collection_interval: Arc::new(AtomicU64::new(
269 0 : cli.installed_extensions_collection_interval,
270 0 : )),
271 0 : pg_init_timeout: cli.pg_init_timeout.map(Duration::from_secs),
272 0 : pg_isready_bin: get_pg_isready_bin(&cli.pgbin),
273 0 : instance_id: std::env::var("INSTANCE_ID").ok(),
274 0 : lakebase_mode: cli.lakebase_mode,
275 0 : build_tag: BUILD_TAG.to_string(),
276 0 : control_plane_uri: cli.control_plane_uri,
277 0 : config_path_test_only: cli.config,
278 0 : },
279 0 : config,
280 0 : )?;
281 :
282 0 : let exit_code = compute_node.run()?;
283 :
284 0 : scenario.teardown();
285 :
286 0 : deinit_and_exit(tracing_provider, exit_code);
287 0 : }
288 :
289 0 : fn init(
290 0 : dev_mode: bool,
291 0 : log_dir: Option<String>,
292 0 : ) -> Result<(
293 0 : Option<tracing_utils::Provider>,
294 0 : Option<tracing_appender::non_blocking::WorkerGuard>,
295 0 : )> {
296 0 : let (provider, file_logs_guard) = init_tracing_and_logging(DEFAULT_LOG_LEVEL, &log_dir)?;
297 :
298 0 : let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?;
299 0 : thread::spawn(move || {
300 0 : for sig in signals.forever() {
301 0 : handle_exit_signal(sig, dev_mode);
302 0 : }
303 0 : });
304 :
305 0 : info!("compute build_tag: {}", &BUILD_TAG.to_string());
306 :
307 0 : Ok((provider, file_logs_guard))
308 0 : }
309 :
310 0 : fn get_config(cli: &Cli) -> Result<ComputeConfig> {
311 : // First, read the config from the path if provided
312 0 : if let Some(ref config) = cli.config {
313 0 : let file = File::open(config)?;
314 0 : return Ok(serde_json::from_reader(&file)?);
315 0 : }
316 :
317 : // If the config wasn't provided in the CLI arguments, then retrieve it from
318 : // the control plane
319 0 : match get_config_from_control_plane(cli.control_plane_uri.as_ref().unwrap(), &cli.compute_id) {
320 0 : Ok(config) => Ok(config),
321 0 : Err(e) => {
322 0 : error!(
323 0 : "cannot get response from control plane: {}\n\
324 0 : neither spec nor confirmation that compute is in the Empty state was received",
325 : e
326 : );
327 0 : Err(e)
328 : }
329 : }
330 0 : }
331 :
332 0 : fn deinit_and_exit(tracing_provider: Option<tracing_utils::Provider>, exit_code: Option<i32>) -> ! {
333 0 : if let Some(p) = tracing_provider {
334 : // Shutdown trace pipeline gracefully, so that it has a chance to send any
335 : // pending traces before we exit. Shutting down OTEL tracing provider may
336 : // hang for quite some time, see, for example:
337 : // - https://github.com/open-telemetry/opentelemetry-rust/issues/868
338 : // - and our problems with staging https://github.com/neondatabase/cloud/issues/3707#issuecomment-1493983636
339 : //
340 : // Yet, we want computes to shut down fast enough, as we may need a new one
341 : // for the same timeline ASAP. So wait no longer than 2s for the shutdown to
342 : // complete, then just error out and exit the main thread.
343 0 : info!("shutting down tracing");
344 0 : let (sender, receiver) = mpsc::channel();
345 0 : let _ = thread::spawn(move || {
346 0 : _ = p.shutdown();
347 0 : sender.send(()).ok()
348 0 : });
349 0 : let shutdown_res = receiver.recv_timeout(Duration::from_millis(2000));
350 0 : if shutdown_res.is_err() {
351 0 : error!("timed out while shutting down tracing, exiting anyway");
352 0 : }
353 0 : }
354 :
355 0 : info!("shutting down");
356 0 : exit(exit_code.unwrap_or(1))
357 : }
358 :
359 : /// When compute_ctl is killed, send also termination signal to sync-safekeepers
360 : /// to prevent leakage. TODO: it is better to convert compute_ctl to async and
361 : /// wait for termination which would be easy then.
362 0 : fn handle_exit_signal(sig: i32, dev_mode: bool) {
363 0 : info!("received {sig} termination signal");
364 0 : forward_termination_signal(dev_mode);
365 0 : exit(1);
366 : }
367 :
368 : #[cfg(test)]
369 : mod test {
370 : use clap::{CommandFactory, Parser};
371 : use url::Url;
372 :
373 : use super::Cli;
374 :
375 : #[test]
376 1 : fn verify_cli() {
377 1 : Cli::command().debug_assert()
378 1 : }
379 :
380 : #[test]
381 1 : fn verify_remote_ext_base_url() {
382 1 : let cli = Cli::parse_from([
383 1 : "compute_ctl",
384 1 : "--pgdata=test",
385 1 : "--connstr=test",
386 1 : "--compute-id=test",
387 1 : "--remote-ext-base-url",
388 1 : "https://example.com/subpath",
389 1 : ]);
390 1 : assert_eq!(
391 1 : cli.remote_ext_base_url.unwrap(),
392 1 : Url::parse("https://example.com/subpath/").unwrap()
393 : );
394 :
395 1 : let cli = Cli::parse_from([
396 1 : "compute_ctl",
397 1 : "--pgdata=test",
398 1 : "--connstr=test",
399 1 : "--compute-id=test",
400 1 : "--remote-ext-base-url",
401 1 : "https://example.com//",
402 1 : ]);
403 1 : assert_eq!(
404 1 : cli.remote_ext_base_url.unwrap(),
405 1 : Url::parse("https://example.com").unwrap()
406 : );
407 :
408 1 : Cli::try_parse_from([
409 1 : "compute_ctl",
410 1 : "--pgdata=test",
411 1 : "--connstr=test",
412 1 : "--compute-id=test",
413 1 : "--remote-ext-base-url",
414 1 : "https://example.com?hello=world",
415 1 : ])
416 1 : .expect_err("URL parameters are not allowed");
417 1 : }
418 :
419 : #[test]
420 1 : fn verify_privileged_role_name() {
421 : // Valid name
422 1 : let cli = Cli::parse_from([
423 1 : "compute_ctl",
424 1 : "--pgdata=test",
425 1 : "--connstr=test",
426 1 : "--compute-id=test",
427 1 : "--privileged-role-name",
428 1 : "my_superuser",
429 1 : ]);
430 1 : assert_eq!(cli.privileged_role_name, "my_superuser");
431 :
432 : // Invalid names
433 1 : Cli::try_parse_from([
434 1 : "compute_ctl",
435 1 : "--pgdata=test",
436 1 : "--connstr=test",
437 1 : "--compute-id=test",
438 1 : "--privileged-role-name",
439 1 : "NeonSuperuser",
440 1 : ])
441 1 : .expect_err("uppercase letters are not allowed");
442 :
443 1 : Cli::try_parse_from([
444 1 : "compute_ctl",
445 1 : "--pgdata=test",
446 1 : "--connstr=test",
447 1 : "--compute-id=test",
448 1 : "--privileged-role-name",
449 1 : "$'neon_superuser",
450 1 : ])
451 1 : .expect_err("special characters are not allowed");
452 :
453 1 : Cli::try_parse_from([
454 1 : "compute_ctl",
455 1 : "--pgdata=test",
456 1 : "--connstr=test",
457 1 : "--compute-id=test",
458 1 : "--privileged-role-name",
459 1 : "",
460 1 : ])
461 1 : .expect_err("empty name is not allowed");
462 1 : }
463 : }
|