//!
//! Postgres wrapper (`compute_ctl`) is intended to be run as a Docker entrypoint or as a `systemd`
//! `ExecStart` option. It will handle all the `Neon` specifics during compute node
//! initialization:
//! - `compute_ctl` accepts cluster (compute node) specification as a JSON file.
//! - Every start is a fresh start, so the data directory is removed and
//!   initialized again on each run.
//! - If `remote-ext-config` is provided, it will be used to fetch the extensions list
//!   and download `shared_preload_libraries` from the remote storage.
//! - Next it will put configuration files into the `PGDATA` directory.
//! - Sync safekeepers and get the commit LSN.
//! - Get `basebackup` from the pageserver, using the LSN returned in the previous step.
//! - Try to start `postgres` and wait until it is ready to accept connections.
//! - Check and alter/drop/create roles and databases.
//! - Hang waiting on the `postmaster` process to exit.
//!
//! Also `compute_ctl` spawns two separate service threads:
//! - `compute-monitor` checks the last Postgres activity timestamp and saves it
//!   into the shared `ComputeNode`;
//! - `http-endpoint` runs a Hyper HTTP API server, which serves readiness and the
//!   last activity requests.
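//!
//! For example, a readiness probe could poll the external HTTP server (default
//! port 3080). A sketch, assuming a hypothetical `/status` route:
//! ```sh
//! curl -s http://localhost:3080/status
//! ```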
//!
//! If the `AUTOSCALING` environment variable is set, `compute_ctl` will start the
//! `vm-monitor` located in [`neon/libs/vm_monitor`]. For VM compute nodes,
//! `vm-monitor` communicates with the VM autoscaling system. It coordinates
//! downscaling and requests immediate upscaling under resource pressure.
//!
//! Usage example:
//! ```sh
//! compute_ctl -D /var/db/postgres/compute \
//!             -C 'postgresql://cloud_admin@localhost/postgres' \
//!             -S /var/db/postgres/specs/current.json \
//!             -i <compute_id> \
//!             -b /usr/local/bin/postgres \
//!             -r http://pg-ext-s3-gateway
//! ```
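//!
//! Alternatively, the spec can be fetched from the control plane; a sketch using
//! a hypothetical control plane URL:
//! ```sh
//! compute_ctl -D /var/db/postgres/compute \
//!             -C 'postgresql://cloud_admin@localhost/postgres' \
//!             -i <compute_id> \
//!             -p http://control-plane.example.com \
//!             -b /usr/local/bin/postgres
//! ```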
use std::ffi::OsString;
use std::fs::File;
use std::path::Path;
use std::process::exit;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

use anyhow::{Context, Result};
use clap::Parser;
use compute_api::responses::ComputeCtlConfig;
use compute_api::spec::ComputeSpec;
use compute_tools::compute::{ComputeNode, ComputeNodeParams, forward_termination_signal};
use compute_tools::extension_server::get_pg_version_string;
use compute_tools::logger::*;
use compute_tools::params::*;
use compute_tools::spec::*;
use rlimit::{Resource, setrlimit};
use signal_hook::consts::{SIGINT, SIGQUIT, SIGTERM};
use signal_hook::iterator::Signals;
use tracing::{error, info};
use url::Url;
use utils::failpoint_support;

// This is an arbitrary build tag. It is fine as a default / for testing
// purposes when the BUILD_TAG environment variable is not set.
const BUILD_TAG_DEFAULT: &str = "latest";

// Compatibility hack: if the control plane specified any remote-ext-config,
// use the default value for the extension storage proxy gateway.
// Remove this once the control plane is updated to pass the gateway URL.
fn parse_remote_ext_config(arg: &str) -> Result<String> {
    if arg.starts_with("http") {
        Ok(arg.trim_end_matches('/').to_string())
    } else {
        Ok("http://pg-ext-s3-gateway".to_string())
    }
}

#[derive(Parser)]
#[command(rename_all = "kebab-case")]
struct Cli {
    #[arg(short = 'b', long, default_value = "postgres", env = "POSTGRES_PATH")]
    pub pgbin: String,

    #[arg(short = 'r', long, value_parser = parse_remote_ext_config)]
    pub remote_ext_config: Option<String>,

    /// The port to bind the external listening HTTP server to. Clients running
    /// outside the compute will talk to the compute through this port. Keep
    /// the previous name for this argument around for a smoother release
    /// with the control plane.
    #[arg(long, default_value_t = 3080)]
    pub external_http_port: u16,

    /// The port to bind the internal listening HTTP server to. Clients include
    /// the neon extension (for installing remote extensions) and local_proxy.
    #[arg(long, default_value_t = 3081)]
    pub internal_http_port: u16,

    #[arg(short = 'D', long, value_name = "DATADIR")]
    pub pgdata: String,

    #[arg(short = 'C', long, value_name = "DATABASE_URL")]
    pub connstr: String,

    #[cfg(target_os = "linux")]
    #[arg(long, default_value = "neon-postgres")]
    pub cgroup: String,

    #[cfg(target_os = "linux")]
    #[arg(
        long,
        default_value = "host=localhost port=5432 dbname=postgres user=cloud_admin sslmode=disable application_name=vm-monitor"
    )]
    pub filecache_connstr: String,

    #[cfg(target_os = "linux")]
    #[arg(long, default_value = "0.0.0.0:10301")]
    pub vm_monitor_addr: String,

    #[arg(long, action = clap::ArgAction::SetTrue)]
    pub resize_swap_on_bind: bool,

    #[arg(long)]
    pub set_disk_quota_for_fs: Option<String>,

    #[arg(short = 's', long = "spec", group = "spec")]
    pub spec_json: Option<String>,

    #[arg(short = 'S', long, group = "spec-path")]
    pub spec_path: Option<OsString>,

    #[arg(short = 'i', long, group = "compute-id")]
    pub compute_id: String,

    #[arg(short = 'p', long, conflicts_with_all = ["spec", "spec-path"], value_name = "CONTROL_PLANE_API_BASE_URL")]
    pub control_plane_uri: Option<String>,
}

fn main() -> Result<()> {
    let cli = Cli::parse();

    let scenario = failpoint_support::init();

    // For historical reasons, the main thread that processes the spec and launches postgres
    // is synchronous, but we always have this tokio runtime available, and we "enter" it so
    // that you can use tokio::spawn() and tokio::runtime::Handle::current().block_on(...)
    // from all parts of compute_ctl.
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()?;
    let _rt_guard = runtime.enter();

    let build_tag = runtime.block_on(init())?;

    // Enable core dumping for all child processes.
    setrlimit(Resource::CORE, rlimit::INFINITY, rlimit::INFINITY)?;

    let connstr = Url::parse(&cli.connstr).context("cannot parse connstr as a URL")?;

    let cli_spec = try_spec_from_cli(&cli)?;

    let compute_node = ComputeNode::new(
        ComputeNodeParams {
            compute_id: cli.compute_id,
            connstr,
            pgdata: cli.pgdata.clone(),
            pgbin: cli.pgbin.clone(),
            pgversion: get_pg_version_string(&cli.pgbin),
            external_http_port: cli.external_http_port,
            internal_http_port: cli.internal_http_port,
            ext_remote_storage: cli.remote_ext_config.clone(),
            resize_swap_on_bind: cli.resize_swap_on_bind,
            set_disk_quota_for_fs: cli.set_disk_quota_for_fs,
            #[cfg(target_os = "linux")]
            filecache_connstr: cli.filecache_connstr,
            #[cfg(target_os = "linux")]
            cgroup: cli.cgroup,
            #[cfg(target_os = "linux")]
            vm_monitor_addr: cli.vm_monitor_addr,
            build_tag,

            live_config_allowed: cli_spec.live_config_allowed,
        },
        cli_spec.spec,
        cli_spec.compute_ctl_config,
    )?;

    let exit_code = compute_node.run()?;

    scenario.teardown();

    deinit_and_exit(exit_code);
}

async fn init() -> Result<String> {
    init_tracing_and_logging(DEFAULT_LOG_LEVEL).await?;

    let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?;
    thread::spawn(move || {
        for sig in signals.forever() {
            handle_exit_signal(sig);
        }
    });

    let build_tag = option_env!("BUILD_TAG")
        .unwrap_or(BUILD_TAG_DEFAULT)
        .to_string();
    info!("build_tag: {build_tag}");

    Ok(build_tag)
}

fn try_spec_from_cli(cli: &Cli) -> Result<CliSpecParams> {
    // First, try to get the cluster spec from the CLI argument.
    if let Some(ref spec_json) = cli.spec_json {
        info!("got spec from cli argument {}", spec_json);
        return Ok(CliSpecParams {
            spec: Some(serde_json::from_str(spec_json)?),
            compute_ctl_config: ComputeCtlConfig::default(),
            live_config_allowed: false,
        });
    }

    // Second, try to read it from the file if a path is provided.
    if let Some(ref spec_path) = cli.spec_path {
        let file = File::open(Path::new(spec_path))?;
        return Ok(CliSpecParams {
            spec: Some(serde_json::from_reader(file)?),
            compute_ctl_config: ComputeCtlConfig::default(),
            live_config_allowed: true,
        });
    }

    if cli.control_plane_uri.is_none() {
        panic!("must specify --control-plane-uri");
    };

    match get_spec_from_control_plane(cli.control_plane_uri.as_ref().unwrap(), &cli.compute_id) {
        Ok(resp) => Ok(CliSpecParams {
            spec: resp.0,
            compute_ctl_config: resp.1,
            live_config_allowed: true,
        }),
        Err(e) => {
            error!(
                "cannot get response from control plane: {}\n\
                 neither spec nor confirmation that compute is in the Empty state was received",
                e
            );
            Err(e)
        }
    }
}

struct CliSpecParams {
    /// The [`ComputeSpec`], if a spec was provided via the CLI or a file.
    spec: Option<ComputeSpec>,
    #[allow(dead_code)]
    compute_ctl_config: ComputeCtlConfig,
    live_config_allowed: bool,
}

fn deinit_and_exit(exit_code: Option<i32>) -> ! {
    // Shut down the trace pipeline gracefully, so that it has a chance to send any
    // pending traces before we exit. Shutting down the OTEL tracing provider may
    // hang for quite some time, see, for example:
    // - https://github.com/open-telemetry/opentelemetry-rust/issues/868
    // - and our problems with staging https://github.com/neondatabase/cloud/issues/3707#issuecomment-1493983636
    //
    // Yet, we want computes to shut down fast enough, as we may need a new one
    // for the same timeline ASAP. So wait no longer than 2s for the shutdown to
    // complete, then just error out and exit the main thread.
    info!("shutting down tracing");
    let (sender, receiver) = mpsc::channel();
    let _ = thread::spawn(move || {
        tracing_utils::shutdown_tracing();
        sender.send(()).ok()
    });
    let shutdown_res = receiver.recv_timeout(Duration::from_millis(2000));
    if shutdown_res.is_err() {
        error!("timed out while shutting down tracing, exiting anyway");
    }

    info!("shutting down");
    exit(exit_code.unwrap_or(1))
}

/// When compute_ctl is killed, also send a termination signal to sync-safekeepers
/// to prevent leakage. TODO: it would be better to convert compute_ctl to async
/// and simply wait for termination.
fn handle_exit_signal(sig: i32) {
    info!("received {sig} termination signal");
    forward_termination_signal();
    exit(1);
}

#[cfg(test)]
mod test {
    use clap::CommandFactory;

    use super::Cli;

    #[test]
    fn verify_cli() {
        Cli::command().debug_assert()
    }
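
    // A sketch exercising the parse_remote_ext_config compatibility hack above:
    // URLs pass through with any trailing slash trimmed, and anything else falls
    // back to the default gateway. The test name and inputs are illustrative.
    #[test]
    fn parse_remote_ext_config_compat() {
        assert_eq!(
            super::parse_remote_ext_config("http://pg-ext-s3-gateway/").unwrap(),
            "http://pg-ext-s3-gateway"
        );
        assert_eq!(
            super::parse_remote_ext_config("legacy-flag-value").unwrap(),
            "http://pg-ext-s3-gateway"
        );
    }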
}