//!
//! Postgres wrapper (`compute_ctl`) is intended to be run as a Docker entrypoint or as a
//! `systemd` `ExecStart` option. It handles all the `Neon` specifics during compute node
//! initialization:
//! - `compute_ctl` accepts a cluster (compute node) specification as a JSON file
//!   (see the sketch below).
//! - Every start is a fresh start, so the data directory is removed and
//!   initialized again on each run.
//! - If `remote_extension_config` is provided, it is used to fetch the list of extensions
//!   and download `shared_preload_libraries` from remote storage.
//! - Next it puts configuration files into the `PGDATA` directory.
//! - Sync safekeepers and get the commit LSN.
//! - Get a `basebackup` from the pageserver, using the LSN returned in the previous step.
//! - Try to start `postgres` and wait until it is ready to accept connections.
//! - Check and alter/drop/create roles and databases.
//! - Hang waiting on the `postmaster` process to exit.
//!
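//! An illustrative sketch of such a spec file. This is *not* the authoritative
//! schema; that is [`compute_api::spec::ComputeSpec`], and the field names and
//! values below are abbreviated and partly hypothetical:
//! ```json
//! {
//!   "format_version": 1.0,
//!   "tenant_id": "9ef87a5bf0d92544f6fafeeb3239695c",
//!   "timeline_id": "de200bd42b49cc1814412c7e592dd6e9",
//!   "pageserver_connstring": "postgresql://no_user@pageserver:6400",
//!   "safekeeper_connstrings": ["safekeeper-0:5454"],
//!   "cluster": { "roles": [], "databases": [], "settings": [] }
//! }
//! ```
//!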
//! Also `compute_ctl` spawns two separate service threads:
//! - `compute-monitor` checks the last Postgres activity timestamp and saves it
//!   into the shared `ComputeNode`;
//! - `http-endpoint` runs a Hyper HTTP API server, which serves readiness and
//!   last-activity requests.
//!
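//! For example, once the HTTP server is up, readiness can be probed like this
//! (the `/status` route and default port 3080 are assumptions here; see
//! `compute_tools::http` for the actual routes):
//! ```sh
//! curl http://localhost:3080/status
//! ```
//!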
//! If the `AUTOSCALING` environment variable is set, `compute_ctl` will start the
//! `vm-monitor` located in [`neon/libs/vm_monitor`]. For VM compute nodes,
//! `vm-monitor` communicates with the VM autoscaling system. It coordinates
//! downscaling and requests immediate upscaling under resource pressure.
//!
//! Usage example:
//! ```sh
//! compute_ctl -D /var/db/postgres/compute \
//!             -C 'postgresql://cloud_admin@localhost/postgres' \
//!             -S /var/db/postgres/specs/current.json \
//!             -b /usr/local/bin/postgres \
//!             -r http://pg-ext-s3-gateway
//! ```
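//!
//! Alternatively, the spec can be fetched from the control plane by compute id;
//! the `--compute-id` and `--control-plane-uri` values here are illustrative:
//! ```sh
//! compute_ctl -D /var/db/postgres/compute \
//!             -C 'postgresql://cloud_admin@localhost/postgres' \
//!             -b /usr/local/bin/postgres \
//!             -i compute-quiet-breeze-12345678 \
//!             -p http://control-plane.local:9096
//! ```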
use std::collections::HashMap;
use std::fs::File;
use std::path::Path;
use std::process::exit;
use std::str::FromStr;
use std::sync::atomic::Ordering;
use std::sync::{mpsc, Arc, Condvar, Mutex, RwLock};
use std::{thread, time::Duration};

use anyhow::{Context, Result};
use chrono::Utc;
use clap::Arg;
use compute_tools::disk_quota::set_disk_quota;
use compute_tools::lsn_lease::launch_lsn_lease_bg_task_for_static;
use signal_hook::consts::{SIGQUIT, SIGTERM};
use signal_hook::{consts::SIGINT, iterator::Signals};
use tracing::{error, info, warn};
use url::Url;

use compute_api::responses::ComputeStatus;
use compute_api::spec::ComputeSpec;

use compute_tools::compute::{
    forward_termination_signal, ComputeNode, ComputeState, ParsedSpec, PG_PID,
};
use compute_tools::configurator::launch_configurator;
use compute_tools::extension_server::get_pg_version_string;
use compute_tools::http::launch_http_server;
use compute_tools::logger::*;
use compute_tools::monitor::launch_monitor;
use compute_tools::params::*;
use compute_tools::spec::*;
use compute_tools::swap::resize_swap;
use rlimit::{setrlimit, Resource};
use utils::failpoint_support;

// This is an arbitrary build tag. Fine as a default / for testing purposes,
// in case the environment variable is not set.
const BUILD_TAG_DEFAULT: &str = "latest";
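//
// `BUILD_TAG` is read at *compile* time via `option_env!` in `init()`, so it must
// be provided when building the binary; e.g. (the tag value and exact cargo
// invocation here are illustrative):
//
//   BUILD_TAG=release-1234 cargo build --bin compute_ctl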

fn main() -> Result<()> {
    let scenario = failpoint_support::init();

    let (build_tag, clap_args) = init()?;

    // enable core dumping for all child processes
    setrlimit(Resource::CORE, rlimit::INFINITY, rlimit::INFINITY)?;

    let (pg_handle, start_pg_result) = {
        // Enter startup tracing context
        let _startup_context_guard = startup_context_from_env();

        let cli_args = process_cli(&clap_args)?;

        let cli_spec = try_spec_from_cli(&clap_args, &cli_args)?;

        let wait_spec_result = wait_spec(build_tag, cli_args, cli_spec)?;

        start_postgres(&clap_args, wait_spec_result)?

        // Startup is finished, exit the startup tracing span
    };

    // PostgreSQL is now running, if startup was successful. Wait until it exits.
    let wait_pg_result = wait_postgres(pg_handle)?;

    let delay_exit = cleanup_after_postgres_exit(start_pg_result)?;

    maybe_delay_exit(delay_exit);

    scenario.teardown();

    deinit_and_exit(wait_pg_result);
}

fn init() -> Result<(String, clap::ArgMatches)> {
    init_tracing_and_logging(DEFAULT_LOG_LEVEL)?;

    let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?;
    thread::spawn(move || {
        for sig in signals.forever() {
            handle_exit_signal(sig);
        }
    });

    let build_tag = option_env!("BUILD_TAG")
        .unwrap_or(BUILD_TAG_DEFAULT)
        .to_string();
    info!("build_tag: {build_tag}");

    Ok((build_tag, cli().get_matches()))
}

fn process_cli(matches: &clap::ArgMatches) -> Result<ProcessCliResult> {
    let pgbin_default = "postgres";
    let pgbin = matches
        .get_one::<String>("pgbin")
        .map(|s| s.as_str())
        .unwrap_or(pgbin_default);

    let ext_remote_storage = matches
        .get_one::<String>("remote-ext-config")
        // Compatibility hack: if the control plane specified any remote-ext-config,
        // use the default value for the extension storage proxy gateway.
        // Remove this once the control plane is updated to pass the gateway URL.
        .map(|conf| {
            if conf.starts_with("http") {
                conf.trim_end_matches('/')
            } else {
                "http://pg-ext-s3-gateway"
            }
        });

    let http_port = *matches
        .get_one::<u16>("http-port")
        .expect("http-port is required");
    let pgdata = matches
        .get_one::<String>("pgdata")
        .expect("PGDATA path is required");
    let connstr = matches
        .get_one::<String>("connstr")
        .expect("Postgres connection string is required");
    let spec_json = matches.get_one::<String>("spec");
    let spec_path = matches.get_one::<String>("spec-path");
    let resize_swap_on_bind = matches.get_flag("resize-swap-on-bind");
    let set_disk_quota_for_fs = matches.get_one::<String>("set-disk-quota-for-fs");

    Ok(ProcessCliResult {
        connstr,
        pgdata,
        pgbin,
        ext_remote_storage,
        http_port,
        spec_json,
        spec_path,
        resize_swap_on_bind,
        set_disk_quota_for_fs,
    })
}

struct ProcessCliResult<'clap> {
    connstr: &'clap str,
    pgdata: &'clap str,
    pgbin: &'clap str,
    ext_remote_storage: Option<&'clap str>,
    http_port: u16,
    spec_json: Option<&'clap String>,
    spec_path: Option<&'clap String>,
    resize_swap_on_bind: bool,
    set_disk_quota_for_fs: Option<&'clap String>,
}

fn startup_context_from_env() -> Option<opentelemetry::ContextGuard> {
    // Extract OpenTelemetry context for the startup actions from the
    // TRACEPARENT and TRACESTATE env variables, and attach it to the current
    // tracing context.
    //
    // This is used to propagate the context for the 'start_compute' operation
    // from the neon control plane. This allows linking together the wider
    // 'start_compute' operation that creates the compute container, with the
    // startup actions here within the container.
    //
    // There is no standard for passing context in env variables, but a lot of
    // tools use TRACEPARENT/TRACESTATE, so we use that convention too. See
    // https://github.com/open-telemetry/opentelemetry-specification/issues/740
    //
    // Switch to the startup context here, and exit it once the startup has
    // completed and Postgres is up and running.
    //
    // If this pod is pre-created without binding it to any particular endpoint
    // yet, this isn't the right place to enter the startup context. In that
    // case, the control plane should pass the tracing context as part of the
    // /configure API call.
    //
    // NOTE: This is supposed to only cover the *startup* actions. Once
    // postgres is configured and up-and-running, we exit this span. Any other
    // actions that are performed on incoming HTTP requests, for example, are
    // performed in separate spans.
    //
    // XXX: If the pod is restarted, we perform the startup actions in the same
    // context as the original startup actions, which probably doesn't make
    // sense.
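    //
    // For example, the control plane might set something like the following
    // (W3C Trace Context `version-traceid-parentid-flags` format; the values
    // are the illustrative ones from the W3C spec):
    //
    //   TRACEPARENT=00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01
    //   TRACESTATE=congo=t61rcWkgMzE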
    let mut startup_tracing_carrier: HashMap<String, String> = HashMap::new();
    if let Ok(val) = std::env::var("TRACEPARENT") {
        startup_tracing_carrier.insert("traceparent".to_string(), val);
    }
    if let Ok(val) = std::env::var("TRACESTATE") {
        startup_tracing_carrier.insert("tracestate".to_string(), val);
    }
    if !startup_tracing_carrier.is_empty() {
        use opentelemetry::propagation::TextMapPropagator;
        use opentelemetry_sdk::propagation::TraceContextPropagator;
        let guard = TraceContextPropagator::new()
            .extract(&startup_tracing_carrier)
            .attach();
        info!("startup tracing context attached");
        Some(guard)
    } else {
        None
    }
}

fn try_spec_from_cli(
    matches: &clap::ArgMatches,
    ProcessCliResult {
        spec_json,
        spec_path,
        ..
    }: &ProcessCliResult,
) -> Result<CliSpecParams> {
    let compute_id = matches.get_one::<String>("compute-id");
    let control_plane_uri = matches.get_one::<String>("control-plane-uri");

    // First, try to get the cluster spec from the CLI argument
    if let Some(spec_json) = spec_json {
        info!("got spec from cli argument {}", spec_json);
        return Ok(CliSpecParams {
            spec: Some(serde_json::from_str(spec_json)?),
            live_config_allowed: false,
        });
    }

    // Second, try to read it from the file if a path is provided
    if let Some(spec_path) = spec_path {
        let file = File::open(Path::new(spec_path))?;
        return Ok(CliSpecParams {
            spec: Some(serde_json::from_reader(file)?),
            live_config_allowed: true,
        });
    }

    let Some(compute_id) = compute_id else {
        panic!(
            "compute spec should be provided in one of the following ways: \
            --spec OR --spec-path OR --control-plane-uri and --compute-id"
        );
    };
    let Some(control_plane_uri) = control_plane_uri else {
        panic!("must specify both --control-plane-uri and --compute-id or none");
    };

    match get_spec_from_control_plane(control_plane_uri, compute_id) {
        Ok(spec) => Ok(CliSpecParams {
            spec,
            live_config_allowed: true,
        }),
        Err(e) => {
            error!(
                "cannot get response from control plane: {}\n\
                neither spec nor confirmation that compute is in the Empty state was received",
                e
            );
            Err(e)
        }
    }
}

struct CliSpecParams {
    /// The [`ComputeSpec`], if one was provided via the CLI argument or a file.
    spec: Option<ComputeSpec>,
    /// True when the spec source can change at runtime (a file or the control
    /// plane), false for a one-shot `--spec` argument.
    live_config_allowed: bool,
}

fn wait_spec(
    build_tag: String,
    ProcessCliResult {
        connstr,
        pgdata,
        pgbin,
        ext_remote_storage,
        resize_swap_on_bind,
        set_disk_quota_for_fs,
        http_port,
        ..
    }: ProcessCliResult,
    CliSpecParams {
        spec,
        live_config_allowed,
    }: CliSpecParams,
) -> Result<WaitSpecResult> {
    let mut new_state = ComputeState::new();
    let spec_set;

    if let Some(spec) = spec {
        let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?;
        info!("new pspec.spec: {:?}", pspec.spec);
        new_state.pspec = Some(pspec);
        spec_set = true;
    } else {
        spec_set = false;
    }
    let connstr = Url::parse(connstr).context("cannot parse connstr as a URL")?;
    let conn_conf = postgres::config::Config::from_str(connstr.as_str())
        .context("cannot build postgres config from connstr")?;
    let tokio_conn_conf = tokio_postgres::config::Config::from_str(connstr.as_str())
        .context("cannot build tokio postgres config from connstr")?;
    let compute_node = ComputeNode {
        connstr,
        conn_conf,
        tokio_conn_conf,
        pgdata: pgdata.to_string(),
        pgbin: pgbin.to_string(),
        pgversion: get_pg_version_string(pgbin),
        http_port,
        live_config_allowed,
        state: Mutex::new(new_state),
        state_changed: Condvar::new(),
        ext_remote_storage: ext_remote_storage.map(|s| s.to_string()),
        ext_download_progress: RwLock::new(HashMap::new()),
        build_tag,
    };
    let compute = Arc::new(compute_node);

    // If this is a pooled VM, prewarm before starting HTTP server and becoming
    // available for binding. Prewarming helps Postgres start quicker later,
    // because QEMU will already have its memory allocated from the host, and
    // the necessary binaries will already be cached.
    if !spec_set {
        compute.prewarm_postgres()?;
    }

    // Launch http service first, so that we can serve control-plane requests
    // while configuration is still in progress.
    let _http_handle =
        launch_http_server(http_port, &compute).expect("cannot launch http endpoint thread");

    if !spec_set {
        // No spec provided, hang waiting for it.
        info!("no compute spec provided, waiting");

        let mut state = compute.state.lock().unwrap();
        while state.status != ComputeStatus::ConfigurationPending {
            state = compute.state_changed.wait(state).unwrap();

            if state.status == ComputeStatus::ConfigurationPending {
                info!("got spec, continue configuration");
                // Spec is already set by the http server handler.
                break;
            }
        }

        // Record for how long we slept waiting for the spec.
        let now = Utc::now();
        state.metrics.wait_for_spec_ms = now
            .signed_duration_since(state.start_time)
            .to_std()
            .unwrap()
            .as_millis() as u64;

        // Reset start time, so that the total startup time that is calculated later will
        // not include the time that we waited for the spec.
        state.start_time = now;
    }

    launch_lsn_lease_bg_task_for_static(&compute);

    Ok(WaitSpecResult {
        compute,
        resize_swap_on_bind,
        set_disk_quota_for_fs: set_disk_quota_for_fs.cloned(),
    })
}

struct WaitSpecResult {
    compute: Arc<ComputeNode>,
    resize_swap_on_bind: bool,
    set_disk_quota_for_fs: Option<String>,
}

fn start_postgres(
    // need to allow unused because `matches` is only used if target_os = "linux"
    #[allow(unused_variables)] matches: &clap::ArgMatches,
    WaitSpecResult {
        compute,
        resize_swap_on_bind,
        set_disk_quota_for_fs,
    }: WaitSpecResult,
) -> Result<(Option<PostgresHandle>, StartPostgresResult)> {
    // We got all we need, update the state.
    let mut state = compute.state.lock().unwrap();
    state.set_status(ComputeStatus::Init, &compute.state_changed);

    info!(
        "running compute with features: {:?}",
        state.pspec.as_ref().unwrap().spec.features
    );
    // before we release the mutex, fetch some parameters for later.
    let &ComputeSpec {
        swap_size_bytes,
        disk_quota_bytes,
        #[cfg(target_os = "linux")]
        disable_lfc_resizing,
        ..
    } = &state.pspec.as_ref().unwrap().spec;
    drop(state);

    // Launch remaining service threads
    let _monitor_handle = launch_monitor(&compute);
    let _configurator_handle = launch_configurator(&compute);

    let mut prestartup_failed = false;
    let mut delay_exit = false;

    // Resize swap to the desired size if the compute spec says so
    if let (Some(size_bytes), true) = (swap_size_bytes, resize_swap_on_bind) {
        // To avoid 'swapoff' hitting postgres startup, we need to run resize-swap to completion
        // *before* starting postgres.
        //
        // In theory, we could do this asynchronously if SkipSwapon was enabled for VMs, but this
        // carries a risk of introducing hard-to-debug issues - e.g. if postgres sometimes gets
        // OOM-killed during startup because swap wasn't available yet.
        match resize_swap(size_bytes) {
            Ok(()) => {
                let size_mib = size_bytes as f32 / (1 << 20) as f32; // just for more coherent display.
                info!(%size_bytes, %size_mib, "resized swap");
            }
            Err(err) => {
                let err = err.context("failed to resize swap");
                error!("{err:#}");

                // Mark compute startup as failed; don't try to start postgres, and report this
                // error to the control plane when it next asks.
                prestartup_failed = true;
                compute.set_failed_status(err);
                delay_exit = true;
            }
        }
    }

    // Set disk quota if the compute spec says so
    if let (Some(disk_quota_bytes), Some(disk_quota_fs_mountpoint)) =
        (disk_quota_bytes, set_disk_quota_for_fs)
    {
        match set_disk_quota(disk_quota_bytes, &disk_quota_fs_mountpoint) {
            Ok(()) => {
                let size_mib = disk_quota_bytes as f32 / (1 << 20) as f32; // just for more coherent display.
                info!(%disk_quota_bytes, %size_mib, "set disk quota");
            }
            Err(err) => {
                let err = err.context("failed to set disk quota");
                error!("{err:#}");

                // Mark compute startup as failed; don't try to start postgres, and report this
                // error to the control plane when it next asks.
                prestartup_failed = true;
                compute.set_failed_status(err);
                delay_exit = true;
            }
        }
    }

    // Start Postgres
    let mut pg = None;
    if !prestartup_failed {
        pg = match compute.start_compute() {
            Ok(pg) => {
                info!(postmaster_pid = %pg.0.id(), "Postgres was started");
                Some(pg)
            }
            Err(err) => {
                error!("could not start the compute node: {:#}", err);
                compute.set_failed_status(err);
                delay_exit = true;
                None
            }
        };
    } else {
        warn!("skipping postgres startup because pre-startup step failed");
    }

    // Start the vm-monitor if directed to. The vm-monitor only runs on linux
    // because it requires cgroups.
    cfg_if::cfg_if! {
        if #[cfg(target_os = "linux")] {
            use std::env;
            use tokio_util::sync::CancellationToken;
            let vm_monitor_addr = matches
                .get_one::<String>("vm-monitor-addr")
                .expect("--vm-monitor-addr should always be set because it has a default arg");
            let file_cache_connstr = matches.get_one::<String>("filecache-connstr");
            let cgroup = matches.get_one::<String>("cgroup");

            // Only make a runtime if we need to.
            // Note: it seems like you can make a runtime in an inner scope and
            // if you start a task in it it won't be dropped. However, make it
            // in the outermost scope just to be safe.
            let rt = if env::var_os("AUTOSCALING").is_some() {
                Some(
                    tokio::runtime::Builder::new_multi_thread()
                        .worker_threads(4)
                        .enable_all()
                        .build()
                        .expect("failed to create tokio runtime for monitor")
                )
            } else {
                None
            };

            // This token is used internally by the monitor to clean up all threads
            let token = CancellationToken::new();

            // don't pass postgres connection string to vm-monitor if we don't want it to resize LFC
            let pgconnstr = if disable_lfc_resizing.unwrap_or(false) {
                None
            } else {
                file_cache_connstr.cloned()
            };

            let vm_monitor = rt.as_ref().map(|rt| {
                rt.spawn(vm_monitor::start(
                    Box::leak(Box::new(vm_monitor::Args {
                        cgroup: cgroup.cloned(),
                        pgconnstr,
                        addr: vm_monitor_addr.clone(),
                    })),
                    token.clone(),
                ))
            });
        }
    }

    Ok((
        pg,
        StartPostgresResult {
            delay_exit,
            compute,
            #[cfg(target_os = "linux")]
            rt,
            #[cfg(target_os = "linux")]
            token,
            #[cfg(target_os = "linux")]
            vm_monitor,
        },
    ))
}

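/// Handle on a running Postgres instance: the `postmaster` child process itself,
/// plus the thread that handles its log output (joined after the process exits).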
type PostgresHandle = (std::process::Child, std::thread::JoinHandle<()>);

struct StartPostgresResult {
    delay_exit: bool,
    // passed through from WaitSpecResult
    compute: Arc<ComputeNode>,

    #[cfg(target_os = "linux")]
    rt: Option<tokio::runtime::Runtime>,
    #[cfg(target_os = "linux")]
    token: tokio_util::sync::CancellationToken,
    #[cfg(target_os = "linux")]
    vm_monitor: Option<tokio::task::JoinHandle<Result<()>>>,
}

fn wait_postgres(pg: Option<PostgresHandle>) -> Result<WaitPostgresResult> {
    // Wait for the child Postgres process forever. In this state Ctrl+C will
    // propagate to Postgres and it will be shut down as well.
    let mut exit_code = None;
    if let Some((mut pg, logs_handle)) = pg {
        info!(postmaster_pid = %pg.id(), "Waiting for Postgres to exit");

        let ecode = pg
            .wait()
            .expect("failed to start waiting on Postgres process");
        PG_PID.store(0, Ordering::SeqCst);

        // Process has exited, so we can join the logs thread.
        let _ = logs_handle
            .join()
            .map_err(|e| tracing::error!("log thread panicked: {:?}", e));

        info!("Postgres exited with code {}, shutting down", ecode);
        exit_code = ecode.code()
    }

    Ok(WaitPostgresResult { exit_code })
}

struct WaitPostgresResult {
    exit_code: Option<i32>,
}

fn cleanup_after_postgres_exit(
    StartPostgresResult {
        mut delay_exit,
        compute,
        #[cfg(target_os = "linux")]
        vm_monitor,
        #[cfg(target_os = "linux")]
        token,
        #[cfg(target_os = "linux")]
        rt,
    }: StartPostgresResult,
) -> Result<bool> {
    // Terminate the vm_monitor so it releases the file watcher on
    // /sys/fs/cgroup/neon-postgres.
    // Note: the vm-monitor only runs on linux because it requires cgroups.
    cfg_if::cfg_if! {
        if #[cfg(target_os = "linux")] {
            if let Some(handle) = vm_monitor {
                // Kills all threads spawned by the monitor
                token.cancel();
                // Kills the actual task running the monitor
                handle.abort();

                // If handle is some, rt must have been used to produce it, and
                // hence is also some
                rt.unwrap().shutdown_timeout(Duration::from_secs(2));
            }
        }
    }

    // Maybe sync safekeepers again, to speed up next startup
    let compute_state = compute.state.lock().unwrap().clone();
    let pspec = compute_state.pspec.as_ref().expect("spec must be set");
    if matches!(pspec.spec.mode, compute_api::spec::ComputeMode::Primary) {
        info!("syncing safekeepers on shutdown");
        let storage_auth_token = pspec.storage_auth_token.clone();
        let lsn = compute.sync_safekeepers(storage_auth_token)?;
        info!("synced safekeepers at lsn {lsn}");
    }

    let mut state = compute.state.lock().unwrap();
    if state.status == ComputeStatus::TerminationPending {
        state.status = ComputeStatus::Terminated;
        compute.state_changed.notify_all();
        // we were asked to terminate gracefully, don't exit to avoid restart
        delay_exit = true
    }
    drop(state);

    if let Err(err) = compute.check_for_core_dumps() {
        error!("error while checking for core dumps: {err:?}");
    }

    Ok(delay_exit)
}

fn maybe_delay_exit(delay_exit: bool) {
    // If launch failed, keep serving HTTP requests for a while, so the cloud
    // control plane can get the actual error.
    if delay_exit {
        info!("giving control plane 30s to collect the error before shutdown");
        thread::sleep(Duration::from_secs(30));
    }
}

fn deinit_and_exit(WaitPostgresResult { exit_code }: WaitPostgresResult) -> ! {
    // Shut down the trace pipeline gracefully, so that it has a chance to send any
    // pending traces before we exit. Shutting down the OTEL tracing provider may
    // hang for quite some time, see, for example:
    // - https://github.com/open-telemetry/opentelemetry-rust/issues/868
    // - and our problems with staging https://github.com/neondatabase/cloud/issues/3707#issuecomment-1493983636
    //
    // Yet, we want computes to shut down fast enough, as we may need a new one
    // for the same timeline ASAP. So wait no longer than 2s for the shutdown to
    // complete, then just error out and exit the main thread.
    info!("shutting down tracing");
    let (sender, receiver) = mpsc::channel();
    let _ = thread::spawn(move || {
        tracing_utils::shutdown_tracing();
        sender.send(()).ok()
    });
    let shutdown_res = receiver.recv_timeout(Duration::from_millis(2000));
    if shutdown_res.is_err() {
        error!("timed out while shutting down tracing, exiting anyway");
    }

    info!("shutting down");
    exit(exit_code.unwrap_or(1))
}

fn cli() -> clap::Command {
    // Env variable is set by `cargo`
    let version = option_env!("CARGO_PKG_VERSION").unwrap_or("unknown");
    clap::Command::new("compute_ctl")
        .version(version)
        .arg(
            Arg::new("http-port")
                .long("http-port")
                .value_name("HTTP_PORT")
                .default_value("3080")
                .value_parser(clap::value_parser!(u16))
                .required(false),
        )
        .arg(
            Arg::new("connstr")
                .short('C')
                .long("connstr")
                .value_name("DATABASE_URL")
                .required(true),
        )
        .arg(
            Arg::new("pgdata")
                .short('D')
                .long("pgdata")
                .value_name("DATADIR")
                .required(true),
        )
        .arg(
            Arg::new("pgbin")
                .short('b')
                .long("pgbin")
                .default_value("postgres")
                .value_name("POSTGRES_PATH"),
        )
        .arg(
            Arg::new("spec")
                .short('s')
                .long("spec")
                .value_name("SPEC_JSON"),
        )
        .arg(
            Arg::new("spec-path")
                .short('S')
                .long("spec-path")
                .value_name("SPEC_PATH"),
        )
        .arg(
            Arg::new("compute-id")
                .short('i')
                .long("compute-id")
                .value_name("COMPUTE_ID"),
        )
        .arg(
            Arg::new("control-plane-uri")
                .short('p')
                .long("control-plane-uri")
                .value_name("CONTROL_PLANE_API_BASE_URI"),
        )
        .arg(
            Arg::new("remote-ext-config")
                .short('r')
                .long("remote-ext-config")
                .value_name("REMOTE_EXT_CONFIG"),
        )
        // TODO(fprasx): we currently have default arguments because the cloud PR
        // to pass them in hasn't been merged yet. We should get rid of them once
        // the PR is merged.
        .arg(
            Arg::new("vm-monitor-addr")
                .long("vm-monitor-addr")
                .default_value("0.0.0.0:10301")
                .value_name("VM_MONITOR_ADDR"),
        )
        .arg(
            Arg::new("cgroup")
                .long("cgroup")
                .default_value("neon-postgres")
                .value_name("CGROUP"),
        )
        .arg(
            Arg::new("filecache-connstr")
                .long("filecache-connstr")
                .default_value(
                    "host=localhost port=5432 dbname=postgres user=cloud_admin sslmode=disable application_name=vm-monitor",
                )
                .value_name("FILECACHE_CONNSTR"),
        )
        .arg(
            Arg::new("resize-swap-on-bind")
                .long("resize-swap-on-bind")
                .action(clap::ArgAction::SetTrue),
        )
        .arg(
            Arg::new("set-disk-quota-for-fs")
                .long("set-disk-quota-for-fs")
                .value_name("SET_DISK_QUOTA_FOR_FS")
        )
}

/// When compute_ctl is killed, also send a termination signal to sync-safekeepers
/// to prevent leakage. TODO: it would be better to convert compute_ctl to async and
/// await termination, which would make this straightforward.
fn handle_exit_signal(sig: i32) {
    info!("received {sig} termination signal");
    forward_termination_signal();
    exit(1);
}

#[test]
fn verify_cli() {
    cli().debug_assert()
}