TLA Line data Source code
1 : //!
2 : //! Postgres wrapper (`compute_ctl`) is intended to be run as a Docker entrypoint or as a `systemd`
3 : //! `ExecStart` option. It will handle all the `Neon` specifics during compute node
4 : //! initialization:
5 : //! - `compute_ctl` accepts cluster (compute node) specification as a JSON file.
6 : //! - Every start is a fresh start, so the data directory is removed and
7 : //! initialized again on each run.
8 : //! - If `--remote-ext-config` is provided, it will be used to fetch the extensions list
9 : //! and download `shared_preload_libraries` from the remote storage.
10 : //! - Next it will put configuration files into the `PGDATA` directory.
11 : //! - Sync safekeepers and get commit LSN.
12 : //! - Get `basebackup` from pageserver using the LSN returned by the previous step.
13 : //! - Try to start `postgres` and wait until it is ready to accept connections.
14 : //! - Check and alter/drop/create roles and databases.
15 : //! - Hang waiting on the `postmaster` process to exit.
16 : //!
17 : //! Also `compute_ctl` spawns separate service threads, including:
18 : //! - `compute-monitor` checks the last Postgres activity timestamp and saves it
19 : //! into the shared `ComputeNode`;
20 : //! - `http-endpoint` runs a Hyper HTTP API server, which serves readiness and the
21 : //! last activity requests.
22 : //!
23 : //! If `AUTOSCALING` environment variable is set, `compute_ctl` will start the
24 : //! `vm-monitor` located in [`neon/libs/vm_monitor`]. For VM compute nodes,
25 : //! `vm-monitor` communicates with the VM autoscaling system. It coordinates
26 : //! downscaling and requests immediate upscaling under resource pressure.
27 : //!
28 : //! Usage example:
29 : //! ```sh
30 : //! compute_ctl -D /var/db/postgres/compute \
31 : //! -C 'postgresql://cloud_admin@localhost/postgres' \
32 : //! -S /var/db/postgres/specs/current.json \
33 : //! -b /usr/local/bin/postgres \
34 : //! -r '{"bucket": "neon-dev-extensions-eu-central-1", "region": "eu-central-1"}'
35 : //! ```
36 : //!
37 : use std::collections::HashMap;
38 : use std::fs::File;
39 : use std::path::Path;
40 : use std::process::exit;
41 : use std::sync::{mpsc, Arc, Condvar, Mutex, RwLock};
42 : use std::{thread, time::Duration};
43 :
44 : use anyhow::{Context, Result};
45 : use chrono::Utc;
46 : use clap::Arg;
47 : use tracing::{error, info};
48 : use url::Url;
49 :
50 : use compute_api::responses::ComputeStatus;
51 :
52 : use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec};
53 : use compute_tools::configurator::launch_configurator;
54 : use compute_tools::extension_server::{get_pg_version, init_remote_storage};
55 : use compute_tools::http::api::launch_http_server;
56 : use compute_tools::logger::*;
57 : use compute_tools::monitor::launch_monitor;
58 : use compute_tools::params::*;
59 : use compute_tools::spec::*;
60 :
61 : // This is an arbitrary build tag. It is fine as a default / for testing purposes
62 : // in case the `BUILD_TAG` environment variable is not set at compile time.
63 : const BUILD_TAG_DEFAULT: &str = "5670669815";
64 :
65 CBC 641 : fn main() -> Result<()> {
66 641 : init_tracing_and_logging(DEFAULT_LOG_LEVEL)?;
67 :
68 641 : let build_tag = option_env!("BUILD_TAG")
69 641 : .unwrap_or(BUILD_TAG_DEFAULT)
70 641 : .to_string();
71 641 : info!("build_tag: {build_tag}");
72 :
73 641 : let matches = cli().get_matches();
74 641 : let pgbin_default = String::from("postgres");
75 641 : let pgbin = matches.get_one::<String>("pgbin").unwrap_or(&pgbin_default);
76 641 :
77 641 : let remote_ext_config = matches.get_one::<String>("remote-ext-config");
78 641 : let ext_remote_storage = remote_ext_config.map(|x| {
79 UBC 0 : init_remote_storage(x).expect("cannot initialize remote extension storage from config")
80 CBC 641 : });
81 641 :
82 641 : let http_port = *matches
83 641 : .get_one::<u16>("http-port")
84 641 : .expect("http-port is required");
85 641 : let pgdata = matches
86 641 : .get_one::<String>("pgdata")
87 641 : .expect("PGDATA path is required");
88 641 : let connstr = matches
89 641 : .get_one::<String>("connstr")
90 641 : .expect("Postgres connection string is required");
91 641 : let spec_json = matches.get_one::<String>("spec");
92 641 : let spec_path = matches.get_one::<String>("spec-path");
93 641 :
94 641 : // Extract OpenTelemetry context for the startup actions from the
95 641 : // TRACEPARENT and TRACESTATE env variables, and attach it to the current
96 641 : // tracing context.
97 641 : //
98 641 : // This is used to propagate the context for the 'start_compute' operation
99 641 : // from the neon control plane. This allows linking together the wider
100 641 : // 'start_compute' operation that creates the compute container, with the
101 641 : // startup actions here within the container.
102 641 : //
103 641 : // There is no standard for passing context in env variables, but a lot of
104 641 : // tools use TRACEPARENT/TRACESTATE, so we use that convention too. See
105 641 : // https://github.com/open-telemetry/opentelemetry-specification/issues/740
106 641 : //
107 641 : // Switch to the startup context here, and exit it once the startup has
108 641 : // completed and Postgres is up and running.
109 641 : //
110 641 : // If this pod is pre-created without binding it to any particular endpoint
111 641 : // yet, this isn't the right place to enter the startup context. In that
112 641 : // case, the control plane should pass the tracing context as part of the
113 641 : // /configure API call.
114 641 : //
115 641 : // NOTE: This is supposed to only cover the *startup* actions. Once
116 641 : // postgres is configured and up-and-running, we exit this span. Any other
117 641 : // actions that are performed on incoming HTTP requests, for example, are
118 641 : // performed in separate spans.
119 641 : //
120 641 : // XXX: If the pod is restarted, we perform the startup actions in the same
121 641 : // context as the original startup actions, which probably doesn't make
122 641 : // sense.
123 641 : let mut startup_tracing_carrier: HashMap<String, String> = HashMap::new();
124 641 : if let Ok(val) = std::env::var("TRACEPARENT") {
125 UBC 0 : startup_tracing_carrier.insert("traceparent".to_string(), val);
126 CBC 641 : }
127 641 : if let Ok(val) = std::env::var("TRACESTATE") {
128 UBC 0 : startup_tracing_carrier.insert("tracestate".to_string(), val);
129 CBC 641 : }
130 641 : let startup_context_guard = if !startup_tracing_carrier.is_empty() {
131 : use opentelemetry::propagation::TextMapPropagator;
132 : use opentelemetry::sdk::propagation::TraceContextPropagator;
133 UBC 0 : let guard = TraceContextPropagator::new()
134 0 : .extract(&startup_tracing_carrier)
135 0 : .attach();
136 0 : info!("startup tracing context attached");
137 0 : Some(guard)
138 : } else {
139 CBC 641 : None
140 : };
141 :
142 641 : let compute_id = matches.get_one::<String>("compute-id");
143 641 : let control_plane_uri = matches.get_one::<String>("control-plane-uri");
144 641 :
145 641 : let spec;
146 641 : let mut live_config_allowed = false;
147 641 : match spec_json {
148 : // First, try to get cluster spec from the cli argument
149 UBC 0 : Some(json) => {
150 0 : info!("got spec from cli argument {}", json);
151 0 : spec = Some(serde_json::from_str(json)?);
152 : }
153 : None => {
154 : // Second, try to read it from the file if path is provided
155 CBC 641 : if let Some(sp) = spec_path {
156 641 : let path = Path::new(sp);
157 641 : let file = File::open(path)?;
158 641 : spec = Some(serde_json::from_reader(file)?);
159 UBC 0 : } else if let Some(id) = compute_id {
160 0 : if let Some(cp_base) = control_plane_uri {
161 0 : live_config_allowed = true;
162 0 : spec = match get_spec_from_control_plane(cp_base, id) {
163 0 : Ok(s) => s,
164 0 : Err(e) => {
165 0 : error!("cannot get response from control plane: {}", e);
166 0 : panic!("neither spec nor confirmation that compute is in the Empty state was received");
167 : }
168 : };
169 : } else {
170 0 : panic!("must specify both --control-plane-uri and --compute-id or none");
171 : }
172 : } else {
173 0 : panic!(
174 0 : "compute spec should be provided by one of the following ways: \
175 0 : --spec OR --spec-path OR --control-plane-uri and --compute-id"
176 0 : );
177 : }
178 : }
179 : };
180 :
181 CBC 641 : let mut new_state = ComputeState::new();
182 : let spec_set;
183 :
184 641 : if let Some(spec) = spec {
185 641 : let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?;
186 641 : info!("new pspec.spec: {:?}", pspec.spec);
187 641 : new_state.pspec = Some(pspec);
188 641 : spec_set = true;
189 UBC 0 : } else {
190 0 : spec_set = false;
191 0 : }
192 CBC 641 : let compute_node = ComputeNode {
193 641 : connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?,
194 641 : pgdata: pgdata.to_string(),
195 641 : pgbin: pgbin.to_string(),
196 641 : pgversion: get_pg_version(pgbin),
197 641 : live_config_allowed,
198 641 : state: Mutex::new(new_state),
199 641 : state_changed: Condvar::new(),
200 641 : ext_remote_storage,
201 641 : ext_download_progress: RwLock::new(HashMap::new()),
202 641 : build_tag,
203 641 : };
204 641 : let compute = Arc::new(compute_node);
205 641 :
206 641 : // If this is a pooled VM, prewarm before starting HTTP server and becoming
207 641 : // available for binding. Prewarming helps postgres start quicker later,
208 641 : // because QEMU will already have its memory allocated from the host, and
209 641 : // the necessary binaries will already be cached.
210 641 : if !spec_set {
211 UBC 0 : compute.prewarm_postgres()?;
212 CBC 641 : }
213 :
214 : // Launch the HTTP service first, so that we are able to serve control-plane
215 : // requests while configuration is still in progress.
216 641 : let _http_handle =
217 641 : launch_http_server(http_port, &compute).expect("cannot launch http endpoint thread");
218 641 :
219 641 : let extension_server_port: u16 = http_port;
220 641 :
221 641 : if !spec_set {
222 : // No spec provided, hang waiting for it.
223 UBC 0 : info!("no compute spec provided, waiting");
224 :
225 0 : let mut state = compute.state.lock().unwrap();
226 0 : while state.status != ComputeStatus::ConfigurationPending {
227 0 : state = compute.state_changed.wait(state).unwrap();
228 0 :
229 0 : if state.status == ComputeStatus::ConfigurationPending {
230 0 : info!("got spec, continue configuration");
231 : // Spec is already set by the http server handler.
232 0 : break;
233 0 : }
234 : }
235 CBC 641 : }
236 :
237 : // We got all we need, update the state.
238 641 : let mut state = compute.state.lock().unwrap();
239 641 :
240 641 : // Record for how long we slept waiting for the spec.
241 641 : state.metrics.wait_for_spec_ms = Utc::now()
242 641 : .signed_duration_since(state.start_time)
243 641 : .to_std()
244 641 : .unwrap()
245 641 : .as_millis() as u64;
246 641 : // Reset start time to the actual start of the configuration, so that
247 641 : // the total startup time is properly measured at the end.
248 641 : state.start_time = Utc::now();
249 641 :
250 641 : state.status = ComputeStatus::Init;
251 641 : compute.state_changed.notify_all();
252 641 : drop(state);
253 641 :
254 641 : // Launch remaining service threads
255 641 : let _monitor_handle = launch_monitor(&compute);
256 641 : let _configurator_handle = launch_configurator(&compute);
257 641 :
258 641 : // Start Postgres
259 641 : let mut delay_exit = false;
260 641 : let mut exit_code = None;
261 641 : let pg = match compute.start_compute(extension_server_port) {
262 632 : Ok(pg) => Some(pg),
263 9 : Err(err) => {
264 9 : error!("could not start the compute node: {:?}", err);
265 9 : let mut state = compute.state.lock().unwrap();
266 9 : state.error = Some(format!("{:?}", err));
267 9 : state.status = ComputeStatus::Failed;
268 9 : drop(state);
269 9 : delay_exit = true;
270 9 : None
271 : }
272 : };
273 :
274 : // Start the vm-monitor if directed to. The vm-monitor only runs on linux
275 : // because it requires cgroups.
276 : cfg_if::cfg_if! {
277 : if #[cfg(target_os = "linux")] {
278 : use std::env;
279 : use tokio_util::sync::CancellationToken;
280 : use tracing::warn;
281 641 : let vm_monitor_addr = matches.get_one::<String>("vm-monitor-addr");
282 641 : let file_cache_connstr = matches.get_one::<String>("filecache-connstr");
283 641 : let cgroup = matches.get_one::<String>("cgroup");
284 641 : let file_cache_on_disk = matches.get_flag("file-cache-on-disk");
285 :
286 : // Only make a runtime if we need to.
287 : // Note: it seems like you can make a runtime in an inner scope and
288 : // if you start a task in it it won't be dropped. However, make it
289 : // in the outermost scope just to be safe.
290 641 : let rt = match (env::var_os("AUTOSCALING"), vm_monitor_addr) {
291 UBC 0 : (None, None) => None,
292 : (None, Some(_)) => {
293 CBC 641 : warn!("--vm-monitor-addr option set but AUTOSCALING env var not present");
294 641 : None
295 : }
296 : (Some(_), None) => {
297 UBC 0 : panic!("AUTOSCALING env var present but --vm-monitor-addr option not set")
298 : }
299 0 : (Some(_), Some(_)) => Some(
300 0 : tokio::runtime::Builder::new_multi_thread()
301 0 : .worker_threads(4)
302 0 : .enable_all()
303 0 : .build()
304 0 : .expect("failed to create tokio runtime for monitor"),
305 0 : ),
306 : };
307 :
308 : // This token is used internally by the monitor to clean up all threads
309 CBC 641 : let token = CancellationToken::new();
310 641 :
311 641 : let vm_monitor = &rt.as_ref().map(|rt| {
312 UBC 0 : rt.spawn(vm_monitor::start(
313 0 : Box::leak(Box::new(vm_monitor::Args {
314 0 : cgroup: cgroup.cloned(),
315 0 : pgconnstr: file_cache_connstr.cloned(),
316 0 : addr: vm_monitor_addr.cloned().unwrap(),
317 0 : file_cache_on_disk,
318 0 : })),
319 0 : token.clone(),
320 0 : ))
321 CBC 641 : });
322 : }
323 : }
324 :
325 : // Wait for the child Postgres process forever. In this state Ctrl+C will
326 : // propagate to Postgres and it will be shut down as well.
327 641 : if let Some(mut pg) = pg {
328 : // Startup is finished, exit the startup tracing span
329 632 : drop(startup_context_guard);
330 632 :
331 632 : let ecode = pg
332 632 : .wait()
333 632 : .expect("failed to start waiting on Postgres process");
334 632 : info!("Postgres exited with code {}, shutting down", ecode);
335 632 : exit_code = ecode.code()
336 9 : }
337 :
338 : // Terminate the vm_monitor so it releases the file watcher on
339 : // /sys/fs/cgroup/neon-postgres.
340 : // Note: the vm-monitor only runs on linux because it requires cgroups.
341 : cfg_if::cfg_if! {
342 : if #[cfg(target_os = "linux")] {
343 641 : if let Some(handle) = vm_monitor {
344 UBC 0 : // Kills all threads spawned by the monitor
345 0 : token.cancel();
346 0 : // Kills the actual task running the monitor
347 0 : handle.abort();
348 0 :
349 0 : // If handle is some, rt must have been used to produce it, and
350 0 : // hence is also some
351 0 : rt.unwrap().shutdown_timeout(Duration::from_secs(2));
352 CBC 641 : }
353 : }
354 : }
355 :
356 : // Maybe sync safekeepers again, to speed up next startup
357 641 : let compute_state = compute.state.lock().unwrap().clone();
358 641 : let pspec = compute_state.pspec.as_ref().expect("spec must be set");
359 641 : if matches!(pspec.spec.mode, compute_api::spec::ComputeMode::Primary) {
360 552 : info!("syncing safekeepers on shutdown");
361 552 : let storage_auth_token = pspec.storage_auth_token.clone();
362 552 : let lsn = compute.sync_safekeepers(storage_auth_token)?;
363 552 : info!("synced safekeepers at lsn {lsn}");
364 89 : }
365 :
366 641 : if let Err(err) = compute.check_for_core_dumps() {
367 UBC 0 : error!("error while checking for core dumps: {err:?}");
368 CBC 641 : }
369 :
370 : // If launch failed, keep serving HTTP requests for a while, so the cloud
371 : // control plane can get the actual error.
372 641 : if delay_exit {
373 9 : info!("giving control plane 30s to collect the error before shutdown");
374 9 : thread::sleep(Duration::from_secs(30));
375 632 : }
376 :
377 : // Shutdown trace pipeline gracefully, so that it has a chance to send any
378 : // pending traces before we exit. Shutting down OTEL tracing provider may
379 : // hang for quite some time, see, for example:
380 : // - https://github.com/open-telemetry/opentelemetry-rust/issues/868
381 : // - and our problems with staging https://github.com/neondatabase/cloud/issues/3707#issuecomment-1493983636
382 : //
383 : // Yet, we want computes to shut down fast enough, as we may need a new one
384 : // for the same timeline ASAP. So wait no longer than 2s for the shutdown to
385 : // complete, then just error out and exit the main thread.
386 641 : info!("shutting down tracing");
387 641 : let (sender, receiver) = mpsc::channel();
388 641 : let _ = thread::spawn(move || {
389 641 : tracing_utils::shutdown_tracing();
390 641 : sender.send(()).ok()
391 641 : });
392 641 : let shutdown_res = receiver.recv_timeout(Duration::from_millis(2000));
393 641 : if shutdown_res.is_err() {
394 UBC 0 : error!("timed out while shutting down tracing, exiting anyway");
395 CBC 641 : }
396 :
397 641 : info!("shutting down");
398 641 : exit(exit_code.unwrap_or(1))
399 UBC 0 : }
400 :
401 CBC 642 : fn cli() -> clap::Command {
402 642 : // Env variable is set by `cargo`
403 642 : let version = option_env!("CARGO_PKG_VERSION").unwrap_or("unknown");
404 642 : clap::Command::new("compute_ctl")
405 642 : .version(version)
406 642 : .arg(
407 642 : Arg::new("http-port")
408 642 : .long("http-port")
409 642 : .value_name("HTTP_PORT")
410 642 : .default_value("3080")
411 642 : .value_parser(clap::value_parser!(u16))
412 642 : .required(false),
413 642 : )
414 642 : .arg(
415 642 : Arg::new("connstr")
416 642 : .short('C')
417 642 : .long("connstr")
418 642 : .value_name("DATABASE_URL")
419 642 : .required(true),
420 642 : )
421 642 : .arg(
422 642 : Arg::new("pgdata")
423 642 : .short('D')
424 642 : .long("pgdata")
425 642 : .value_name("DATADIR")
426 642 : .required(true),
427 642 : )
428 642 : .arg(
429 642 : Arg::new("pgbin")
430 642 : .short('b')
431 642 : .long("pgbin")
432 642 : .default_value("postgres")
433 642 : .value_name("POSTGRES_PATH"),
434 642 : )
435 642 : .arg(
436 642 : Arg::new("spec")
437 642 : .short('s')
438 642 : .long("spec")
439 642 : .value_name("SPEC_JSON"),
440 642 : )
441 642 : .arg(
442 642 : Arg::new("spec-path")
443 642 : .short('S')
444 642 : .long("spec-path")
445 642 : .value_name("SPEC_PATH"),
446 642 : )
447 642 : .arg(
448 642 : Arg::new("compute-id")
449 642 : .short('i')
450 642 : .long("compute-id")
451 642 : .value_name("COMPUTE_ID"),
452 642 : )
453 642 : .arg(
454 642 : Arg::new("control-plane-uri")
455 642 : .short('p')
456 642 : .long("control-plane-uri")
457 642 : .value_name("CONTROL_PLANE_API_BASE_URI"),
458 642 : )
459 642 : .arg(
460 642 : Arg::new("remote-ext-config")
461 642 : .short('r')
462 642 : .long("remote-ext-config")
463 642 : .value_name("REMOTE_EXT_CONFIG"),
464 642 : )
465 642 : // TODO(fprasx): we currently have default arguments because the cloud PR
466 642 : // to pass them in hasn't been merged yet. We should get rid of them once
467 642 : // the PR is merged.
468 642 : .arg(
469 642 : Arg::new("vm-monitor-addr")
470 642 : .long("vm-monitor-addr")
471 642 : .default_value("0.0.0.0:10301")
472 642 : .value_name("VM_MONITOR_ADDR"),
473 642 : )
474 642 : .arg(
475 642 : Arg::new("cgroup")
476 642 : .long("cgroup")
477 642 : .default_value("neon-postgres")
478 642 : .value_name("CGROUP"),
479 642 : )
480 642 : .arg(
481 642 : Arg::new("filecache-connstr")
482 642 : .long("filecache-connstr")
483 642 : .default_value(
484 642 : "host=localhost port=5432 dbname=postgres user=cloud_admin sslmode=disable",
485 642 : )
486 642 : .value_name("FILECACHE_CONNSTR"),
487 642 : )
488 642 : .arg(
489 642 : Arg::new("file-cache-on-disk")
490 642 : .long("file-cache-on-disk")
491 642 : .action(clap::ArgAction::SetTrue),
492 642 : )
493 642 : }
494 :
495 1 : #[test]
496 1 : fn verify_cli() {
497 1 : cli().debug_assert()
498 1 : }
|