//!
//! Postgres wrapper (`compute_ctl`) is intended to be run as a Docker entrypoint or as a `systemd`
//! `ExecStart` option. It handles all the `Neon` specifics during compute node
//! initialization:
//! - `compute_ctl` accepts the cluster (compute node) specification as a JSON file.
//! - Every start is a fresh start, so the data directory is removed and
//!   initialized again on each run.
//! - If `remote_extension_config` is provided, it is used to fetch the extension list
//!   and download `shared_preload_libraries` from remote storage.
//! - Next, it puts configuration files into the `PGDATA` directory.
//! - Sync safekeepers and get the commit LSN.
//! - Get a `basebackup` from the pageserver using the LSN returned in the previous step.
//! - Try to start `postgres` and wait until it is ready to accept connections.
//! - Check and alter/drop/create roles and databases.
//! - Wait for the `postmaster` process to exit.
//!
//! `compute_ctl` also spawns two separate service threads:
//! - `compute-monitor` checks the last Postgres activity timestamp and saves it
//!   into the shared `ComputeNode`;
//! - `http-endpoint` runs a Hyper HTTP API server, which serves readiness and
//!   last-activity requests.
//!
//! If the `AUTOSCALING` environment variable is set, `compute_ctl` starts the
//! `vm-monitor` located in [`neon/libs/vm_monitor`]. For VM compute nodes,
//! `vm-monitor` communicates with the VM autoscaling system. It coordinates
//! downscaling and requests immediate upscaling under resource pressure.
//!
//! Usage example:
//! ```sh
//! compute_ctl -D /var/db/postgres/compute \
//!             -C 'postgresql://cloud_admin@localhost/postgres' \
//!             -c /var/db/postgres/configs/config.json \
//!             -b /usr/local/bin/postgres \
//!             -r http://pg-ext-s3-gateway
//! ```
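//!
//! Alternatively, the config can be fetched from the control plane (`-p`) instead of a
//! local file (`-c`). This is an illustrative sketch only: the compute ID and control
//! plane URL below are placeholders, not real values.
//! ```sh
//! compute_ctl -D /var/db/postgres/compute \
//!             -C 'postgresql://cloud_admin@localhost/postgres' \
//!             -b /usr/local/bin/postgres \
//!             -i compute-example-1234 \
//!             -p http://control-plane.example.com
//! ```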
use std::ffi::OsString;
use std::fs::File;
use std::process::exit;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

use anyhow::{Context, Result, bail};
use clap::Parser;
use compute_api::responses::ComputeConfig;
use compute_tools::compute::{
    BUILD_TAG, ComputeNode, ComputeNodeParams, forward_termination_signal,
};
use compute_tools::extension_server::get_pg_version_string;
use compute_tools::logger::*;
use compute_tools::params::*;
use compute_tools::spec::*;
use rlimit::{Resource, setrlimit};
use signal_hook::consts::{SIGINT, SIGQUIT, SIGTERM};
use signal_hook::iterator::Signals;
use tracing::{error, info};
use url::Url;
use utils::failpoint_support;

#[derive(Debug, Parser)]
#[command(rename_all = "kebab-case")]
struct Cli {
    #[arg(short = 'b', long, default_value = "postgres", env = "POSTGRES_PATH")]
    pub pgbin: String,

    /// The base URL for the remote extension storage proxy gateway.
    #[arg(short = 'r', long, value_parser = Self::parse_remote_ext_base_url)]
    pub remote_ext_base_url: Option<Url>,

    /// The port to bind the external listening HTTP server to. Clients running
    /// outside the compute will talk to the compute through this port. Keep
    /// the previous name for this argument around for a smoother release
    /// with the control plane.
    #[arg(long, default_value_t = 3080)]
    pub external_http_port: u16,

    /// The port to bind the internal listening HTTP server to. Clients include
    /// the neon extension (for installing remote extensions) and local_proxy.
    #[arg(long, default_value_t = 3081)]
    pub internal_http_port: u16,

    #[arg(short = 'D', long, value_name = "DATADIR")]
    pub pgdata: String,

    #[arg(short = 'C', long, value_name = "DATABASE_URL")]
    pub connstr: String,

    #[cfg(target_os = "linux")]
    #[arg(long, default_value = "neon-postgres")]
    pub cgroup: String,

    #[cfg(target_os = "linux")]
    #[arg(
        long,
        default_value = "host=localhost port=5432 dbname=postgres user=cloud_admin sslmode=disable application_name=vm-monitor"
    )]
    pub filecache_connstr: String,

    #[cfg(target_os = "linux")]
    #[arg(long, default_value = "0.0.0.0:10301")]
    pub vm_monitor_addr: String,

    #[arg(long, action = clap::ArgAction::SetTrue)]
    pub resize_swap_on_bind: bool,

    #[arg(long)]
    pub set_disk_quota_for_fs: Option<String>,

    #[arg(short = 'c', long)]
    pub config: Option<OsString>,

    #[arg(short = 'i', long, group = "compute-id")]
    pub compute_id: String,

    #[arg(
        short = 'p',
        long,
        conflicts_with = "config",
        value_name = "CONTROL_PLANE_API_BASE_URL",
        requires = "compute-id"
    )]
    pub control_plane_uri: Option<String>,

    /// Interval in seconds for collecting installed extensions statistics
    #[arg(long, default_value = "3600")]
    pub installed_extensions_collection_interval: u64,
}

impl Cli {
    /// Parse a URL from an argument. By default, this isn't necessary, but we
    /// want to do some sanity checking.
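    ///
    /// Rough sketch of the intended normalization (illustrative values only, not
    /// checked at compile time):
    ///
    /// ```text
    /// http://example.com/pg-ext-s3-gateway   -> http://example.com/pg-ext-s3-gateway/
    /// http://example.com/pg-ext-s3-gateway// -> http://example.com/pg-ext-s3-gateway/
    /// http://example.com/gw?foo=bar          -> error: query parameters are rejected
    /// ```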
    fn parse_remote_ext_base_url(value: &str) -> Result<Url> {
        // Remove extra trailing slashes, and add one. We use Url::join() later
        // when downloading remote extensions. If the base URL is something like
        // http://example.com/pg-ext-s3-gateway, and join() is called with
        // something like "xyz", the resulting URL is http://example.com/xyz.
        let value = value.trim_end_matches('/').to_owned() + "/";
        let url = Url::parse(&value)?;

        if url.query_pairs().count() != 0 {
            bail!("parameters detected in remote extensions base URL")
        }

        Ok(url)
    }
}

fn main() -> Result<()> {
    let cli = Cli::parse();

    let scenario = failpoint_support::init();

    // For historical reasons, the main thread that processes the config and launches postgres
    // is synchronous, but we always have this tokio runtime available and we "enter" it so
    // that you can use tokio::spawn() and tokio::runtime::Handle::current().block_on(...)
    // from all parts of compute_ctl.
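    //
    // For example (illustrative only, not executed here), synchronous code can
    // bridge into async like this once the runtime has been entered:
    //
    //     let handle = tokio::runtime::Handle::current();
    //     handle.block_on(async { /* await things here */ });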
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()?;
    let _rt_guard = runtime.enter();

    runtime.block_on(init())?;

    // enable core dumping for all child processes
    setrlimit(Resource::CORE, rlimit::INFINITY, rlimit::INFINITY)?;

    let connstr = Url::parse(&cli.connstr).context("cannot parse connstr as a URL")?;

    let config = get_config(&cli)?;

    let compute_node = ComputeNode::new(
        ComputeNodeParams {
            compute_id: cli.compute_id,
            connstr,
            pgdata: cli.pgdata.clone(),
            pgbin: cli.pgbin.clone(),
            pgversion: get_pg_version_string(&cli.pgbin),
            external_http_port: cli.external_http_port,
            internal_http_port: cli.internal_http_port,
            remote_ext_base_url: cli.remote_ext_base_url.clone(),
            resize_swap_on_bind: cli.resize_swap_on_bind,
            set_disk_quota_for_fs: cli.set_disk_quota_for_fs,
            #[cfg(target_os = "linux")]
            filecache_connstr: cli.filecache_connstr,
            #[cfg(target_os = "linux")]
            cgroup: cli.cgroup,
            #[cfg(target_os = "linux")]
            vm_monitor_addr: cli.vm_monitor_addr,
            installed_extensions_collection_interval: cli.installed_extensions_collection_interval,
        },
        config,
    )?;

    let exit_code = compute_node.run()?;

    scenario.teardown();

    deinit_and_exit(exit_code);
}

async fn init() -> Result<()> {
    init_tracing_and_logging(DEFAULT_LOG_LEVEL).await?;

    let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?;
    thread::spawn(move || {
        for sig in signals.forever() {
            handle_exit_signal(sig);
        }
    });

    info!("compute build_tag: {}", &BUILD_TAG.to_string());

    Ok(())
}

fn get_config(cli: &Cli) -> Result<ComputeConfig> {
    // First, read the config from the path if provided
    if let Some(ref config) = cli.config {
        let file = File::open(config)?;
        return Ok(serde_json::from_reader(&file)?);
    }

    // If the config wasn't provided in the CLI arguments, then retrieve it from
    // the control plane
    match get_config_from_control_plane(cli.control_plane_uri.as_ref().unwrap(), &cli.compute_id) {
        Ok(config) => Ok(config),
        Err(e) => {
            error!(
                "cannot get response from control plane: {}\n\
                 neither spec nor confirmation that compute is in the Empty state was received",
                e
            );
            Err(e)
        }
    }
}

fn deinit_and_exit(exit_code: Option<i32>) -> ! {
    // Shut down the trace pipeline gracefully, so that it has a chance to send any
    // pending traces before we exit. Shutting down the OTEL tracing provider may
    // hang for quite some time, see, for example:
    // - https://github.com/open-telemetry/opentelemetry-rust/issues/868
    // - and our problems with staging https://github.com/neondatabase/cloud/issues/3707#issuecomment-1493983636
    //
    // Yet, we want computes to shut down fast enough, as we may need a new one
    // for the same timeline ASAP. So wait no longer than 2s for the shutdown to
    // complete, then just error out and exit the main thread.
    info!("shutting down tracing");
    let (sender, receiver) = mpsc::channel();
    let _ = thread::spawn(move || {
        tracing_utils::shutdown_tracing();
        sender.send(()).ok()
    });
    let shutdown_res = receiver.recv_timeout(Duration::from_millis(2000));
    if shutdown_res.is_err() {
        error!("timed out while shutting down tracing, exiting anyway");
    }

    info!("shutting down");
    exit(exit_code.unwrap_or(1))
}

/// When compute_ctl is killed, also send a termination signal to sync-safekeepers
/// to prevent leakage. TODO: it would be better to convert compute_ctl to async and
/// simply wait for termination.
fn handle_exit_signal(sig: i32) {
    info!("received {sig} termination signal");
    forward_termination_signal();
    exit(1);
}

#[cfg(test)]
mod test {
    use clap::{CommandFactory, Parser};
    use url::Url;

    use super::Cli;

    #[test]
    fn verify_cli() {
        Cli::command().debug_assert()
    }

    #[test]
    fn verify_remote_ext_base_url() {
        let cli = Cli::parse_from([
            "compute_ctl",
            "--pgdata=test",
            "--connstr=test",
            "--compute-id=test",
            "--remote-ext-base-url",
            "https://example.com/subpath",
        ]);
        assert_eq!(
            cli.remote_ext_base_url.unwrap(),
            Url::parse("https://example.com/subpath/").unwrap()
        );

        let cli = Cli::parse_from([
            "compute_ctl",
            "--pgdata=test",
            "--connstr=test",
            "--compute-id=test",
            "--remote-ext-base-url",
            "https://example.com//",
        ]);
        assert_eq!(
            cli.remote_ext_base_url.unwrap(),
            Url::parse("https://example.com").unwrap()
        );

        Cli::try_parse_from([
            "compute_ctl",
            "--pgdata=test",
            "--connstr=test",
            "--compute-id=test",
            "--remote-ext-base-url",
            "https://example.com?hello=world",
        ])
        .expect_err("URL parameters are not allowed");
    }
}