//!
//! Postgres wrapper (`compute_ctl`) is intended to be run as a Docker entrypoint or as a
//! `systemd` `ExecStart` option. It handles all the `Neon` specifics during compute node
//! initialization:
//! - `compute_ctl` accepts a cluster (compute node) specification as a JSON file.
//! - Every start is a fresh start, so the data directory is removed and
//!   initialized again on each run.
//! - If `--remote-ext-config` is provided, it is used to fetch the extension list
//!   and download `shared_preload_libraries` from remote storage.
//! - Next, it puts configuration files into the `PGDATA` directory.
//! - Sync safekeepers and get the commit LSN.
//! - Get a `basebackup` from the pageserver, using the LSN returned in the previous step.
//! - Try to start `postgres` and wait until it is ready to accept connections.
//! - Check and alter/drop/create roles and databases.
//! - Hang waiting on the `postmaster` process to exit.
//!
//! Also, `compute_ctl` spawns two separate service threads:
//! - `compute-monitor` checks the last Postgres activity timestamp and saves it
//!   into the shared `ComputeNode`;
//! - `http-endpoint` runs a Hyper HTTP API server, which serves readiness and
//!   last-activity requests.
//!
//! If the `AUTOSCALING` environment variable is set, `compute_ctl` will start the
//! `vm-monitor` located in [`neon/libs/vm_monitor`]. For VM compute nodes,
//! `vm-monitor` communicates with the VM autoscaling system. It coordinates
//! downscaling and requests immediate upscaling under resource pressure.
//!
//! Usage example:
//! ```sh
//! compute_ctl -D /var/db/postgres/compute \
//!             -C 'postgresql://cloud_admin@localhost/postgres' \
//!             -S /var/db/postgres/specs/current.json \
//!             -b /usr/local/bin/postgres \
//!             -r http://pg-ext-s3-gateway
//! ```
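//!
//! Besides CLI arguments, `compute_ctl` also reads a few environment variables
//! (see `main` below): `BUILD_TAG` (a build identifier baked in at compile
//! time), `TRACEPARENT` / `TRACESTATE` (OpenTelemetry context for the startup
//! actions), and `AUTOSCALING` (enables the `vm-monitor`).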
//!
use std::collections::HashMap;
use std::fs::File;
use std::path::Path;
use std::process::exit;
use std::sync::atomic::Ordering;
use std::sync::{mpsc, Arc, Condvar, Mutex, RwLock};
use std::{thread, time::Duration};

use anyhow::{Context, Result};
use chrono::Utc;
use clap::Arg;
use signal_hook::consts::{SIGQUIT, SIGTERM};
use signal_hook::{consts::SIGINT, iterator::Signals};
use tracing::{error, info};
use url::Url;

use compute_api::responses::ComputeStatus;

use compute_tools::compute::{
    forward_termination_signal, ComputeNode, ComputeState, ParsedSpec, PG_PID,
};
use compute_tools::configurator::launch_configurator;
use compute_tools::extension_server::get_pg_version;
use compute_tools::http::api::launch_http_server;
use compute_tools::logger::*;
use compute_tools::monitor::launch_monitor;
use compute_tools::params::*;
use compute_tools::spec::*;

// This is an arbitrary build tag. Fine as a default / for testing purposes
// when the BUILD_TAG environment variable is not set.
const BUILD_TAG_DEFAULT: &str = "latest";

fn main() -> Result<()> {
    init_tracing_and_logging(DEFAULT_LOG_LEVEL)?;

    let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?;
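    // Handle termination signals on a dedicated thread: forward them to
    // Postgres and exit (see `handle_exit_signal` below).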
    thread::spawn(move || {
        for sig in signals.forever() {
            handle_exit_signal(sig);
        }
    });

    let build_tag = option_env!("BUILD_TAG")
        .unwrap_or(BUILD_TAG_DEFAULT)
        .to_string();
    info!("build_tag: {build_tag}");

    let matches = cli().get_matches();
    let pgbin_default = String::from("postgres");
    let pgbin = matches.get_one::<String>("pgbin").unwrap_or(&pgbin_default);

    let ext_remote_storage = matches
        .get_one::<String>("remote-ext-config")
        // Compatibility hack: if the control plane specified any remote-ext-config,
        // use the default value for the extension storage proxy gateway.
        // Remove this once the control plane is updated to pass the gateway URL.
        .map(|conf| {
            if conf.starts_with("http") {
                conf.trim_end_matches('/')
            } else {
                "http://pg-ext-s3-gateway"
            }
        });

    let http_port = *matches
        .get_one::<u16>("http-port")
        .expect("http-port is required");
    let pgdata = matches
        .get_one::<String>("pgdata")
        .expect("PGDATA path is required");
    let connstr = matches
        .get_one::<String>("connstr")
        .expect("Postgres connection string is required");
    let spec_json = matches.get_one::<String>("spec");
    let spec_path = matches.get_one::<String>("spec-path");

    // Extract OpenTelemetry context for the startup actions from the
    // TRACEPARENT and TRACESTATE env variables, and attach it to the current
    // tracing context.
    //
    // This is used to propagate the context for the 'start_compute' operation
    // from the neon control plane. This allows linking together the wider
    // 'start_compute' operation that creates the compute container, with the
    // startup actions here within the container.
    //
    // There is no standard for passing context in env variables, but a lot of
    // tools use TRACEPARENT/TRACESTATE, so we use that convention too. See
    // https://github.com/open-telemetry/opentelemetry-specification/issues/740
    //
    // Switch to the startup context here, and exit it once the startup has
    // completed and Postgres is up and running.
    //
    // If this pod is pre-created without binding it to any particular endpoint
    // yet, this isn't the right place to enter the startup context. In that
    // case, the control plane should pass the tracing context as part of the
    // /configure API call.
    //
    // NOTE: This is supposed to only cover the *startup* actions. Once
    // postgres is configured and up-and-running, we exit this span. Any other
    // actions that are performed on incoming HTTP requests, for example, are
    // performed in separate spans.
    //
    // XXX: If the pod is restarted, we perform the startup actions in the same
    // context as the original startup actions, which probably doesn't make
    // sense.
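    //
    // For illustration, a W3C `traceparent` value has the form
    // `{version}-{trace-id}-{parent-id}-{trace-flags}`, e.g.
    // `00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01`.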
    let mut startup_tracing_carrier: HashMap<String, String> = HashMap::new();
    if let Ok(val) = std::env::var("TRACEPARENT") {
        startup_tracing_carrier.insert("traceparent".to_string(), val);
    }
    if let Ok(val) = std::env::var("TRACESTATE") {
        startup_tracing_carrier.insert("tracestate".to_string(), val);
    }
    let startup_context_guard = if !startup_tracing_carrier.is_empty() {
        use opentelemetry::propagation::TextMapPropagator;
        use opentelemetry::sdk::propagation::TraceContextPropagator;
        let guard = TraceContextPropagator::new()
            .extract(&startup_tracing_carrier)
            .attach();
        info!("startup tracing context attached");
        Some(guard)
    } else {
        None
    };

    let compute_id = matches.get_one::<String>("compute-id");
    let control_plane_uri = matches.get_one::<String>("control-plane-uri");

    let spec;
    let mut live_config_allowed = false;
    match spec_json {
        // First, try to get the cluster spec from the cli argument
        Some(json) => {
            info!("got spec from cli argument {}", json);
            spec = Some(serde_json::from_str(json)?);
        }
        None => {
            // Second, try to read it from the file if a path is provided
            if let Some(sp) = spec_path {
                let path = Path::new(sp);
                let file = File::open(path)?;
                spec = Some(serde_json::from_reader(file)?);
                live_config_allowed = true;
            } else if let Some(id) = compute_id {
                if let Some(cp_base) = control_plane_uri {
                    live_config_allowed = true;
                    spec = match get_spec_from_control_plane(cp_base, id) {
                        Ok(s) => s,
                        Err(e) => {
                            error!("cannot get response from control plane: {}", e);
                            panic!("neither spec nor confirmation that compute is in the Empty state was received");
                        }
                    };
                } else {
                    panic!("must specify both --control-plane-uri and --compute-id or none");
                }
            } else {
                panic!(
                    "compute spec must be provided in one of the following ways: \
                    --spec OR --spec-path OR --control-plane-uri and --compute-id"
                );
            }
        }
    };

    let mut new_state = ComputeState::new();
    let spec_set;

    if let Some(spec) = spec {
        let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?;
        info!("new pspec.spec: {:?}", pspec.spec);
        new_state.pspec = Some(pspec);
        spec_set = true;
    } else {
        spec_set = false;
    }
    let compute_node = ComputeNode {
        connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?,
        pgdata: pgdata.to_string(),
        pgbin: pgbin.to_string(),
        pgversion: get_pg_version(pgbin),
        live_config_allowed,
        state: Mutex::new(new_state),
        state_changed: Condvar::new(),
        ext_remote_storage: ext_remote_storage.map(|s| s.to_string()),
        ext_download_progress: RwLock::new(HashMap::new()),
        build_tag,
    };
    let compute = Arc::new(compute_node);

    // If this is a pooled VM, prewarm before starting the HTTP server and becoming
    // available for binding. Prewarming helps Postgres start quicker later,
    // because QEMU will already have its memory allocated from the host, and
    // the necessary binaries will already be cached.
    if !spec_set {
        compute.prewarm_postgres()?;
    }

    // Launch the http service first, so that we can serve control-plane
    // requests while configuration is still in progress.
    let _http_handle =
        launch_http_server(http_port, &compute).expect("cannot launch http endpoint thread");

    let extension_server_port: u16 = http_port;

    if !spec_set {
        // No spec provided, hang waiting for it.
        info!("no compute spec provided, waiting");

        let mut state = compute.state.lock().unwrap();
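        // Note: `Condvar::wait` can wake up spuriously, so re-check the status
        // after every wakeup instead of assuming it changed.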
        while state.status != ComputeStatus::ConfigurationPending {
            state = compute.state_changed.wait(state).unwrap();

            if state.status == ComputeStatus::ConfigurationPending {
                info!("got spec, continue configuration");
                // Spec is already set by the http server handler.
                break;
            }
        }
    }

    // We got all we need, update the state.
    let mut state = compute.state.lock().unwrap();

    // Record how long we slept waiting for the spec.
    state.metrics.wait_for_spec_ms = Utc::now()
        .signed_duration_since(state.start_time)
        .to_std()
        .unwrap()
        .as_millis() as u64;
    // Reset the start time to the actual start of the configuration, so that
    // the total startup time is properly measured at the end.
    state.start_time = Utc::now();

    state.status = ComputeStatus::Init;
    compute.state_changed.notify_all();

    info!(
        "running compute with features: {:?}",
        state.pspec.as_ref().unwrap().spec.features
    );
    drop(state);

    // Launch the remaining service threads
    let _monitor_handle = launch_monitor(&compute);
    let _configurator_handle = launch_configurator(&compute);

    // Start Postgres
    let mut delay_exit = false;
    let mut exit_code = None;
    let pg = match compute.start_compute(extension_server_port) {
        Ok(pg) => Some(pg),
        Err(err) => {
            error!("could not start the compute node: {:#}", err);
            let mut state = compute.state.lock().unwrap();
            state.error = Some(format!("{:?}", err));
            state.status = ComputeStatus::Failed;
            // Notify others that Postgres failed to start. When configuring an
            // empty compute, the API handler is likely still waiting for a compute
            // state change; notifying it that the compute is in the Failed state
            // lets the control plane learn about the failure earlier and record a
            // proper error instead of a timeout.
            compute.state_changed.notify_all();
            drop(state); // unlock
            delay_exit = true;
            None
        }
    };

    // Start the vm-monitor if directed to. The vm-monitor only runs on linux
    // because it requires cgroups.
    cfg_if::cfg_if! {
        if #[cfg(target_os = "linux")] {
            use std::env;
            use tokio_util::sync::CancellationToken;
            let vm_monitor_addr = matches
                .get_one::<String>("vm-monitor-addr")
                .expect("--vm-monitor-addr should always be set because it has a default arg");
            let file_cache_connstr = matches.get_one::<String>("filecache-connstr");
            let cgroup = matches.get_one::<String>("cgroup");

            // Only create a runtime if we need one. Note: it seems that a task
            // started in a runtime created in an inner scope keeps running even
            // after that scope ends. Still, create the runtime in the outermost
            // scope just to be safe.
            let rt = if env::var_os("AUTOSCALING").is_some() {
                Some(
                    tokio::runtime::Builder::new_multi_thread()
                        .worker_threads(4)
                        .enable_all()
                        .build()
                        .expect("failed to create tokio runtime for monitor")
                )
            } else {
                None
            };

            // This token is used internally by the monitor to clean up all threads
            let token = CancellationToken::new();

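            // `Box::leak` below gives the monitor args the `'static` lifetime
            // the spawned task requires; the allocation intentionally lives for
            // the rest of the process.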
            let vm_monitor = &rt.as_ref().map(|rt| {
                rt.spawn(vm_monitor::start(
                    Box::leak(Box::new(vm_monitor::Args {
                        cgroup: cgroup.cloned(),
                        pgconnstr: file_cache_connstr.cloned(),
                        addr: vm_monitor_addr.clone(),
                    })),
                    token.clone(),
                ))
            });
        }
    }

    // Wait for the child Postgres process forever. In this state Ctrl+C will
    // propagate to Postgres and it will be shut down as well.
    if let Some((mut pg, logs_handle)) = pg {
        // Startup is finished, exit the startup tracing span
        drop(startup_context_guard);

        let ecode = pg
            .wait()
            .expect("failed to start waiting on Postgres process");
        PG_PID.store(0, Ordering::SeqCst);

        // Process has exited, so we can join the logs thread.
        let _ = logs_handle
            .join()
            .map_err(|e| tracing::error!("log thread panicked: {:?}", e));

        info!("Postgres exited with code {}, shutting down", ecode);
        exit_code = ecode.code()
    }

    // Terminate the vm_monitor so it releases the file watcher on
    // /sys/fs/cgroup/neon-postgres.
    // Note: the vm-monitor only runs on linux because it requires cgroups.
    cfg_if::cfg_if! {
        if #[cfg(target_os = "linux")] {
            if let Some(handle) = vm_monitor {
                // Kills all threads spawned by the monitor
                token.cancel();
                // Kills the actual task running the monitor
                handle.abort();

                // If the handle is `Some`, rt must have been used to produce it,
                // and hence is also `Some`.
                rt.unwrap().shutdown_timeout(Duration::from_secs(2));
            }
        }
    }

    // Maybe sync safekeepers again, to speed up the next startup
    let compute_state = compute.state.lock().unwrap().clone();
    let pspec = compute_state.pspec.as_ref().expect("spec must be set");
    if matches!(pspec.spec.mode, compute_api::spec::ComputeMode::Primary) {
        info!("syncing safekeepers on shutdown");
        let storage_auth_token = pspec.storage_auth_token.clone();
        let lsn = compute.sync_safekeepers(storage_auth_token)?;
        info!("synced safekeepers at lsn {lsn}");
    }

    let mut state = compute.state.lock().unwrap();
    if state.status == ComputeStatus::TerminationPending {
        state.status = ComputeStatus::Terminated;
        compute.state_changed.notify_all();
        // we were asked to terminate gracefully, don't exit to avoid restart
        delay_exit = true
    }
    drop(state);

    if let Err(err) = compute.check_for_core_dumps() {
        error!("error while checking for core dumps: {err:?}");
    }

    // If launch failed, keep serving HTTP requests for a while, so the cloud
    // control plane can get the actual error.
    if delay_exit {
        info!("giving control plane 30s to collect the error before shutdown");
        thread::sleep(Duration::from_secs(30));
    }

    // Shutdown trace pipeline gracefully, so that it has a chance to send any
    // pending traces before we exit. Shutting down OTEL tracing provider may
    // hang for quite some time, see, for example:
    // - https://github.com/open-telemetry/opentelemetry-rust/issues/868
    // - and our problems with staging https://github.com/neondatabase/cloud/issues/3707#issuecomment-1493983636
    //
    // Yet, we want computes to shut down fast enough, as we may need a new one
    // for the same timeline ASAP. So wait no longer than 2s for the shutdown to
    // complete, then just error out and exit the main thread.
    info!("shutting down tracing");
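    // Run the blocking `shutdown_tracing` on a helper thread so that the
    // `recv_timeout` below can bound how long we wait for it.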
    let (sender, receiver) = mpsc::channel();
    let _ = thread::spawn(move || {
        tracing_utils::shutdown_tracing();
        sender.send(()).ok()
    });
    let shutdown_res = receiver.recv_timeout(Duration::from_millis(2000));
    if shutdown_res.is_err() {
        error!("timed out while shutting down tracing, exiting anyway");
    }

    info!("shutting down");
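    // Exit with the Postgres exit code when we have one; default to 1, e.g.
    // when Postgres failed to start at all.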
    exit(exit_code.unwrap_or(1))
}

fn cli() -> clap::Command {
    // Env variable is set by `cargo`
    let version = option_env!("CARGO_PKG_VERSION").unwrap_or("unknown");
    clap::Command::new("compute_ctl")
        .version(version)
        .arg(
            Arg::new("http-port")
                .long("http-port")
                .value_name("HTTP_PORT")
                .default_value("3080")
                .value_parser(clap::value_parser!(u16))
                .required(false),
        )
        .arg(
            Arg::new("connstr")
                .short('C')
                .long("connstr")
                .value_name("DATABASE_URL")
                .required(true),
        )
        .arg(
            Arg::new("pgdata")
                .short('D')
                .long("pgdata")
                .value_name("DATADIR")
                .required(true),
        )
        .arg(
            Arg::new("pgbin")
                .short('b')
                .long("pgbin")
                .default_value("postgres")
                .value_name("POSTGRES_PATH"),
        )
        .arg(
            Arg::new("spec")
                .short('s')
                .long("spec")
                .value_name("SPEC_JSON"),
        )
        .arg(
            Arg::new("spec-path")
                .short('S')
                .long("spec-path")
                .value_name("SPEC_PATH"),
        )
        .arg(
            Arg::new("compute-id")
                .short('i')
                .long("compute-id")
                .value_name("COMPUTE_ID"),
        )
        .arg(
            Arg::new("control-plane-uri")
                .short('p')
                .long("control-plane-uri")
                .value_name("CONTROL_PLANE_API_BASE_URI"),
        )
        .arg(
            Arg::new("remote-ext-config")
                .short('r')
                .long("remote-ext-config")
                .value_name("REMOTE_EXT_CONFIG"),
        )
        // TODO(fprasx): we currently have default arguments because the cloud PR
        // to pass them in hasn't been merged yet. We should get rid of them once
        // the PR is merged.
        .arg(
            Arg::new("vm-monitor-addr")
                .long("vm-monitor-addr")
                .default_value("0.0.0.0:10301")
                .value_name("VM_MONITOR_ADDR"),
        )
        .arg(
            Arg::new("cgroup")
                .long("cgroup")
                .default_value("neon-postgres")
                .value_name("CGROUP"),
        )
        .arg(
            Arg::new("filecache-connstr")
                .long("filecache-connstr")
                .default_value(
                    "host=localhost port=5432 dbname=postgres user=cloud_admin sslmode=disable",
                )
                .value_name("FILECACHE_CONNSTR"),
        )
}

/// When compute_ctl is killed, also send a termination signal to sync-safekeepers
/// to prevent leaking them. TODO: it would be better to convert compute_ctl to
/// async and wait for the termination, which would then be easy.
fn handle_exit_signal(sig: i32) {
    info!("received {sig} termination signal");
    forward_termination_signal();
    exit(1);
}

#[test]
fn verify_cli() {
    cli().debug_assert()
}