TLA Line data Source code
1 : //! Code to manage compute endpoints
2 : //!
3 : //! In the local test environment, the data for each endpoint is stored in
4 : //!
5 : //! ```text
6 : //! .neon/endpoints/<endpoint id>
7 : //! ```
8 : //!
9 : //! Some basic information about the endpoint, like the tenant and timeline IDs,
10 : //! are stored in the `endpoint.json` file. The `endpoint.json` file is created
11 : //! when the endpoint is created, and doesn't change afterwards.
12 : //!
13 : //! The endpoint is managed by the `compute_ctl` binary. When an endpoint is
14 : //! started, we launch `compute_ctl`. It synchronizes the safekeepers, downloads
15 : //! the basebackup from the pageserver to initialize the data directory, and
16 : //! finally launches the PostgreSQL process. It watches the PostgreSQL process
17 : //! until it exits.
18 : //!
19 : //! When an endpoint is created, a `postgresql.conf` file is also created in
20 : //! the endpoint's directory. The file can be modified before starting PostgreSQL.
21 : //! However, the `postgresql.conf` file in the endpoint directory is not used directly
22 : //! by PostgreSQL. It is passed to `compute_ctl`, and `compute_ctl` writes another
23 : //! copy of it in the data directory.
24 : //!
25 : //! Directory contents:
26 : //!
27 : //! ```text
28 : //! .neon/endpoints/main/
29 : //! compute.log - log output of `compute_ctl` and `postgres`
30 : //! endpoint.json - serialized `EndpointConf` struct
31 : //! postgresql.conf - postgresql settings
32 : //! spec.json - passed to `compute_ctl`
33 : //! pgdata/
34 : //! postgresql.conf - copy of postgresql.conf created by `compute_ctl`
35 : //! zenith.signal
36 : //! <other PostgreSQL files>
37 : //! ```
38 : //!
39 : use std::collections::BTreeMap;
40 : use std::net::SocketAddr;
41 : use std::net::TcpStream;
42 : use std::path::PathBuf;
43 : use std::process::Command;
44 : use std::sync::Arc;
45 : use std::time::Duration;
46 :
47 : use anyhow::{anyhow, bail, Context, Result};
48 : use compute_api::spec::RemoteExtSpec;
49 : use nix::sys::signal::kill;
50 : use nix::sys::signal::Signal;
51 : use serde::{Deserialize, Serialize};
52 : use utils::id::{NodeId, TenantId, TimelineId};
53 :
54 : use crate::local_env::LocalEnv;
55 : use crate::pageserver::PageServerNode;
56 : use crate::postgresql_conf::PostgresConf;
57 :
58 : use compute_api::responses::{ComputeState, ComputeStatus};
59 : use compute_api::spec::{Cluster, ComputeMode, ComputeSpec};
60 :
61 : // contents of a endpoint.json file
62 CBC 73549 : #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
63 : pub struct EndpointConf {
64 : endpoint_id: String,
65 : tenant_id: TenantId,
66 : timeline_id: TimelineId,
67 : mode: ComputeMode,
68 : pg_port: u16,
69 : http_port: u16,
70 : pg_version: u32,
71 : skip_pg_catalog_updates: bool,
72 : pageserver_id: NodeId,
73 : }
74 :
75 : //
76 : // ComputeControlPlane
77 : //
78 : pub struct ComputeControlPlane {
79 : base_port: u16,
80 :
81 : // endpoint ID is the key
82 : pub endpoints: BTreeMap<String, Arc<Endpoint>>,
83 :
84 : env: LocalEnv,
85 : }
86 :
87 : impl ComputeControlPlane {
88 : // Load current endpoints from the endpoints/ subdirectories
89 1810 : pub fn load(env: LocalEnv) -> Result<ComputeControlPlane> {
90 1810 : let mut endpoints = BTreeMap::default();
91 3654 : for endpoint_dir in std::fs::read_dir(env.endpoints_path())
92 1810 : .with_context(|| format!("failed to list {}", env.endpoints_path().display()))?
93 3654 : {
94 3654 : let ep = Endpoint::from_dir_entry(endpoint_dir?, &env)?;
95 3654 : endpoints.insert(ep.endpoint_id.clone(), Arc::new(ep));
96 : }
97 :
98 1810 : Ok(ComputeControlPlane {
99 1810 : base_port: 55431,
100 1810 : endpoints,
101 1810 : env,
102 1810 : })
103 1810 : }
104 :
105 6 : fn get_port(&mut self) -> u16 {
106 6 : 1 + self
107 6 : .endpoints
108 6 : .values()
109 6 : .map(|ep| std::cmp::max(ep.pg_address.port(), ep.http_address.port()))
110 6 : .max()
111 6 : .unwrap_or(self.base_port)
112 6 : }
113 :
114 : #[allow(clippy::too_many_arguments)]
115 484 : pub fn new_endpoint(
116 484 : &mut self,
117 484 : endpoint_id: &str,
118 484 : tenant_id: TenantId,
119 484 : timeline_id: TimelineId,
120 484 : pg_port: Option<u16>,
121 484 : http_port: Option<u16>,
122 484 : pg_version: u32,
123 484 : mode: ComputeMode,
124 484 : pageserver_id: NodeId,
125 484 : ) -> Result<Arc<Endpoint>> {
126 484 : let pg_port = pg_port.unwrap_or_else(|| self.get_port());
127 484 : let http_port = http_port.unwrap_or_else(|| self.get_port() + 1);
128 484 : let pageserver =
129 484 : PageServerNode::from_env(&self.env, self.env.get_pageserver_conf(pageserver_id)?);
130 :
131 484 : let ep = Arc::new(Endpoint {
132 484 : endpoint_id: endpoint_id.to_owned(),
133 484 : pg_address: SocketAddr::new("127.0.0.1".parse().unwrap(), pg_port),
134 484 : http_address: SocketAddr::new("127.0.0.1".parse().unwrap(), http_port),
135 484 : env: self.env.clone(),
136 484 : pageserver,
137 484 : timeline_id,
138 484 : mode,
139 484 : tenant_id,
140 484 : pg_version,
141 484 : // We don't setup roles and databases in the spec locally, so we don't need to
142 484 : // do catalog updates. Catalog updates also include check availability
143 484 : // data creation. Yet, we have tests that check that size and db dump
144 484 : // before and after start are the same. So, skip catalog updates,
145 484 : // with this we basically test a case of waking up an idle compute, where
146 484 : // we also skip catalog updates in the cloud.
147 484 : skip_pg_catalog_updates: true,
148 484 : });
149 484 :
150 484 : ep.create_endpoint_dir()?;
151 : std::fs::write(
152 484 : ep.endpoint_path().join("endpoint.json"),
153 484 : serde_json::to_string_pretty(&EndpointConf {
154 484 : endpoint_id: endpoint_id.to_string(),
155 484 : tenant_id,
156 484 : timeline_id,
157 484 : mode,
158 484 : http_port,
159 484 : pg_port,
160 484 : pg_version,
161 484 : skip_pg_catalog_updates: true,
162 484 : pageserver_id,
163 484 : })?,
164 UBC 0 : )?;
165 : std::fs::write(
166 CBC 484 : ep.endpoint_path().join("postgresql.conf"),
167 484 : ep.setup_pg_conf()?.to_string(),
168 UBC 0 : )?;
169 :
170 CBC 484 : self.endpoints
171 484 : .insert(ep.endpoint_id.clone(), Arc::clone(&ep));
172 484 :
173 484 : Ok(ep)
174 484 : }
175 :
176 1051 : pub fn check_conflicting_endpoints(
177 1051 : &self,
178 1051 : mode: ComputeMode,
179 1051 : tenant_id: TenantId,
180 1051 : timeline_id: TimelineId,
181 1051 : ) -> Result<()> {
182 1051 : if matches!(mode, ComputeMode::Primary) {
183 : // this check is not complete, as you could have a concurrent attempt at
184 : // creating another primary, both reading the state before checking it here,
185 : // but it's better than nothing.
186 1414 : let mut duplicates = self.endpoints.iter().filter(|(_k, v)| {
187 1414 : v.tenant_id == tenant_id
188 1331 : && v.timeline_id == timeline_id
189 906 : && v.mode == mode
190 902 : && v.status() != "stopped"
191 1414 : });
192 :
193 949 : if let Some((key, _)) = duplicates.next() {
194 25 : bail!("attempting to create a duplicate primary endpoint on tenant {tenant_id}, timeline {timeline_id}: endpoint {key:?} exists already. please don't do this, it is not supported.");
195 924 : }
196 102 : }
197 1026 : Ok(())
198 1051 : }
199 : }
200 :
201 : ///////////////////////////////////////////////////////////////////////////////
202 :
203 UBC 0 : #[derive(Debug)]
204 : pub struct Endpoint {
205 : /// used as the directory name
206 : endpoint_id: String,
207 : pub tenant_id: TenantId,
208 : pub timeline_id: TimelineId,
209 : pub mode: ComputeMode,
210 :
211 : // port and address of the Postgres server and `compute_ctl`'s HTTP API
212 : pub pg_address: SocketAddr,
213 : pub http_address: SocketAddr,
214 :
215 : // postgres major version in the format: 14, 15, etc.
216 : pg_version: u32,
217 :
218 : // These are not part of the endpoint as such, but the environment
219 : // the endpoint runs in.
220 : pub env: LocalEnv,
221 : pageserver: PageServerNode,
222 :
223 : // Optimizations
224 : skip_pg_catalog_updates: bool,
225 : }
226 :
227 : impl Endpoint {
228 CBC 3654 : fn from_dir_entry(entry: std::fs::DirEntry, env: &LocalEnv) -> Result<Endpoint> {
229 3654 : if !entry.file_type()?.is_dir() {
230 UBC 0 : anyhow::bail!(
231 0 : "Endpoint::from_dir_entry failed: '{}' is not a directory",
232 0 : entry.path().display()
233 0 : );
234 CBC 3654 : }
235 3654 :
236 3654 : // parse data directory name
237 3654 : let fname = entry.file_name();
238 3654 : let endpoint_id = fname.to_str().unwrap().to_string();
239 :
240 : // Read the endpoint.json file
241 3654 : let conf: EndpointConf =
242 3654 : serde_json::from_slice(&std::fs::read(entry.path().join("endpoint.json"))?)?;
243 :
244 3654 : let pageserver =
245 3654 : PageServerNode::from_env(env, env.get_pageserver_conf(conf.pageserver_id)?);
246 :
247 3654 : Ok(Endpoint {
248 3654 : pg_address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.pg_port),
249 3654 : http_address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.http_port),
250 3654 : endpoint_id,
251 3654 : env: env.clone(),
252 3654 : pageserver,
253 3654 : timeline_id: conf.timeline_id,
254 3654 : mode: conf.mode,
255 3654 : tenant_id: conf.tenant_id,
256 3654 : pg_version: conf.pg_version,
257 3654 : skip_pg_catalog_updates: conf.skip_pg_catalog_updates,
258 3654 : })
259 3654 : }
260 :
261 484 : fn create_endpoint_dir(&self) -> Result<()> {
262 484 : std::fs::create_dir_all(self.endpoint_path()).with_context(|| {
263 UBC 0 : format!(
264 0 : "could not create endpoint directory {}",
265 0 : self.endpoint_path().display()
266 0 : )
267 CBC 484 : })
268 484 : }
269 :
270 : // Generate postgresql.conf with default configuration
271 484 : fn setup_pg_conf(&self) -> Result<PostgresConf> {
272 484 : let mut conf = PostgresConf::new();
273 484 : conf.append("max_wal_senders", "10");
274 484 : conf.append("wal_log_hints", "off");
275 484 : conf.append("max_replication_slots", "10");
276 484 : conf.append("hot_standby", "on");
277 484 : conf.append("shared_buffers", "1MB");
278 484 : conf.append("fsync", "off");
279 484 : conf.append("max_connections", "100");
280 484 : conf.append("wal_level", "logical");
281 484 : // wal_sender_timeout is the maximum time to wait for WAL replication.
282 484 : // It also defines how often the walreciever will send a feedback message to the wal sender.
283 484 : conf.append("wal_sender_timeout", "5s");
284 484 : conf.append("listen_addresses", &self.pg_address.ip().to_string());
285 484 : conf.append("port", &self.pg_address.port().to_string());
286 484 : conf.append("wal_keep_size", "0");
287 484 : // walproposer panics when basebackup is invalid, it is pointless to restart in this case.
288 484 : conf.append("restart_after_crash", "off");
289 484 :
290 484 : // Load the 'neon' extension
291 484 : conf.append("shared_preload_libraries", "neon");
292 484 :
293 484 : conf.append_line("");
294 484 : // Replication-related configurations, such as WAL sending
295 484 : match &self.mode {
296 : ComputeMode::Primary => {
297 : // Configure backpressure
298 : // - Replication write lag depends on how fast the walreceiver can process incoming WAL.
299 : // This lag determines latency of get_page_at_lsn. Speed of applying WAL is about 10MB/sec,
300 : // so to avoid expiration of 1 minute timeout, this lag should not be larger than 600MB.
301 : // Actually latency should be much smaller (better if < 1sec). But we assume that recently
302 : // updates pages are not requested from pageserver.
303 : // - Replication flush lag depends on speed of persisting data by checkpointer (creation of
304 : // delta/image layers) and advancing disk_consistent_lsn. Safekeepers are able to
305 : // remove/archive WAL only beyond disk_consistent_lsn. Too large a lag can cause long
306 : // recovery time (in case of pageserver crash) and disk space overflow at safekeepers.
307 : // - Replication apply lag depends on speed of uploading changes to S3 by uploader thread.
308 : // To be able to restore database in case of pageserver node crash, safekeeper should not
309 : // remove WAL beyond this point. Too large lag can cause space exhaustion in safekeepers
310 : // (if they are not able to upload WAL to S3).
311 433 : conf.append("max_replication_write_lag", "15MB");
312 433 : conf.append("max_replication_flush_lag", "10GB");
313 433 :
314 433 : if !self.env.safekeepers.is_empty() {
315 433 : // Configure Postgres to connect to the safekeepers
316 433 : conf.append("synchronous_standby_names", "walproposer");
317 433 :
318 433 : let safekeepers = self
319 433 : .env
320 433 : .safekeepers
321 433 : .iter()
322 572 : .map(|sk| format!("localhost:{}", sk.get_compute_port()))
323 433 : .collect::<Vec<String>>()
324 433 : .join(",");
325 433 : conf.append("neon.safekeepers", &safekeepers);
326 433 : } else {
327 UBC 0 : // We only use setup without safekeepers for tests,
328 0 : // and don't care about data durability on pageserver,
329 0 : // so set more relaxed synchronous_commit.
330 0 : conf.append("synchronous_commit", "remote_write");
331 0 :
332 0 : // Configure the node to stream WAL directly to the pageserver
333 0 : // This isn't really a supported configuration, but can be useful for
334 0 : // testing.
335 0 : conf.append("synchronous_standby_names", "pageserver");
336 0 : }
337 : }
338 CBC 49 : ComputeMode::Static(lsn) => {
339 49 : conf.append("recovery_target_lsn", &lsn.to_string());
340 49 : }
341 : ComputeMode::Replica => {
342 2 : assert!(!self.env.safekeepers.is_empty());
343 :
344 : // TODO: use future host field from safekeeper spec
345 : // Pass the list of safekeepers to the replica so that it can connect to any of them,
346 : // whichever is available.
347 2 : let sk_ports = self
348 2 : .env
349 2 : .safekeepers
350 2 : .iter()
351 2 : .map(|x| x.get_compute_port().to_string())
352 2 : .collect::<Vec<_>>()
353 2 : .join(",");
354 2 : let sk_hosts = vec!["localhost"; self.env.safekeepers.len()].join(",");
355 2 :
356 2 : let connstr = format!(
357 2 : "host={} port={} options='-c timeline_id={} tenant_id={}' application_name=replica replication=true",
358 2 : sk_hosts,
359 2 : sk_ports,
360 2 : &self.timeline_id.to_string(),
361 2 : &self.tenant_id.to_string(),
362 2 : );
363 2 :
364 2 : let slot_name = format!("repl_{}_", self.timeline_id);
365 2 : conf.append("primary_conninfo", connstr.as_str());
366 2 : conf.append("primary_slot_name", slot_name.as_str());
367 2 : conf.append("hot_standby", "on");
368 2 : // prefetching of blocks referenced in WAL doesn't make sense for us
369 2 : // Neon hot standby ignores pages that are not in the shared_buffers
370 2 : if self.pg_version >= 15 {
371 UBC 0 : conf.append("recovery_prefetch", "off");
372 CBC 2 : }
373 : }
374 : }
375 :
376 484 : Ok(conf)
377 484 : }
378 :
379 9100 : pub fn endpoint_path(&self) -> PathBuf {
380 9100 : self.env.endpoints_path().join(&self.endpoint_id)
381 9100 : }
382 :
383 3164 : pub fn pgdata(&self) -> PathBuf {
384 3164 : self.endpoint_path().join("pgdata")
385 3164 : }
386 :
387 1447 : pub fn status(&self) -> &str {
388 1447 : let timeout = Duration::from_millis(300);
389 1447 : let has_pidfile = self.pgdata().join("postmaster.pid").exists();
390 1447 : let can_connect = TcpStream::connect_timeout(&self.pg_address, timeout).is_ok();
391 1447 :
392 1447 : match (has_pidfile, can_connect) {
393 25 : (true, true) => "running",
394 1422 : (false, false) => "stopped",
395 UBC 0 : (true, false) => "crashed",
396 0 : (false, true) => "running, no pidfile",
397 : }
398 CBC 1447 : }
399 :
400 536 : fn pg_ctl(&self, args: &[&str], auth_token: &Option<String>) -> Result<()> {
401 536 : let pg_ctl_path = self.env.pg_bin_dir(self.pg_version)?.join("pg_ctl");
402 536 : let mut cmd = Command::new(&pg_ctl_path);
403 536 : cmd.args(
404 536 : [
405 536 : &[
406 536 : "-D",
407 536 : self.pgdata().to_str().unwrap(),
408 536 : "-w", //wait till pg_ctl actually does what was asked
409 536 : ],
410 536 : args,
411 536 : ]
412 536 : .concat(),
413 536 : )
414 536 : .env_clear()
415 536 : .env(
416 536 : "LD_LIBRARY_PATH",
417 536 : self.env.pg_lib_dir(self.pg_version)?.to_str().unwrap(),
418 536 : )
419 536 : .env(
420 536 : "DYLD_LIBRARY_PATH",
421 536 : self.env.pg_lib_dir(self.pg_version)?.to_str().unwrap(),
422 : );
423 :
424 : // Pass authentication token used for the connections to pageserver and safekeepers
425 536 : if let Some(token) = auth_token {
426 UBC 0 : cmd.env("NEON_AUTH_TOKEN", token);
427 CBC 536 : }
428 :
429 536 : let pg_ctl = cmd
430 536 : .output()
431 536 : .context(format!("{} failed", pg_ctl_path.display()))?;
432 536 : if !pg_ctl.status.success() {
433 UBC 0 : anyhow::bail!(
434 0 : "pg_ctl failed, exit code: {}, stdout: {}, stderr: {}",
435 0 : pg_ctl.status,
436 0 : String::from_utf8_lossy(&pg_ctl.stdout),
437 0 : String::from_utf8_lossy(&pg_ctl.stderr),
438 0 : );
439 CBC 536 : }
440 536 :
441 536 : Ok(())
442 536 : }
443 :
444 536 : fn wait_for_compute_ctl_to_exit(&self, send_sigterm: bool) -> Result<()> {
445 536 : // TODO use background_process::stop_process instead
446 536 : let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
447 536 : let pid: u32 = std::fs::read_to_string(pidfile_path)?.parse()?;
448 536 : let pid = nix::unistd::Pid::from_raw(pid as i32);
449 536 : if send_sigterm {
450 27 : kill(pid, Signal::SIGTERM).ok();
451 509 : }
452 536 : crate::background_process::wait_until_stopped("compute_ctl", pid)?;
453 536 : Ok(())
454 536 : }
455 :
456 762 : fn read_postgresql_conf(&self) -> Result<String> {
457 762 : // Slurp the endpoints/<endpoint id>/postgresql.conf file into
458 762 : // memory. We will include it in the spec file that we pass to
459 762 : // `compute_ctl`, and `compute_ctl` will write it to the postgresql.conf
460 762 : // in the data directory.
461 762 : let postgresql_conf_path = self.endpoint_path().join("postgresql.conf");
462 762 : match std::fs::read(&postgresql_conf_path) {
463 762 : Ok(content) => Ok(String::from_utf8(content)?),
464 UBC 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok("".to_string()),
465 0 : Err(e) => Err(anyhow::Error::new(e).context(format!(
466 0 : "failed to read config file in {}",
467 0 : postgresql_conf_path.to_str().unwrap()
468 0 : ))),
469 : }
470 CBC 762 : }
471 :
472 545 : pub async fn start(
473 545 : &self,
474 545 : auth_token: &Option<String>,
475 545 : safekeepers: Vec<NodeId>,
476 545 : remote_ext_config: Option<&String>,
477 545 : ) -> Result<()> {
478 545 : if self.status() == "running" {
479 UBC 0 : anyhow::bail!("The endpoint is already running");
480 CBC 545 : }
481 :
482 545 : let postgresql_conf = self.read_postgresql_conf()?;
483 :
484 : // We always start the compute node from scratch, so if the Postgres
485 : // data dir exists from a previous launch, remove it first.
486 545 : if self.pgdata().exists() {
487 64 : std::fs::remove_dir_all(self.pgdata())?;
488 481 : }
489 :
490 545 : let pageserver_connstring = {
491 545 : let config = &self.pageserver.pg_connection_config;
492 545 : let (host, port) = (config.host(), config.port());
493 545 :
494 545 : // NOTE: avoid spaces in connection string, because it is less error prone if we forward it somewhere.
495 545 : format!("postgresql://no_user@{host}:{port}")
496 545 : };
497 545 : let mut safekeeper_connstrings = Vec::new();
498 545 : if self.mode == ComputeMode::Primary {
499 1129 : for sk_id in safekeepers {
500 635 : let sk = self
501 635 : .env
502 635 : .safekeepers
503 635 : .iter()
504 850 : .find(|node| node.id == sk_id)
505 635 : .ok_or_else(|| anyhow!("safekeeper {sk_id} does not exist"))?;
506 635 : safekeeper_connstrings.push(format!("127.0.0.1:{}", sk.get_compute_port()));
507 : }
508 51 : }
509 :
510 : // check for file remote_extensions_spec.json
511 : // if it is present, read it and pass to compute_ctl
512 545 : let remote_extensions_spec_path = self.endpoint_path().join("remote_extensions_spec.json");
513 545 : let remote_extensions_spec = std::fs::File::open(remote_extensions_spec_path);
514 : let remote_extensions: Option<RemoteExtSpec>;
515 :
516 545 : if let Ok(spec_file) = remote_extensions_spec {
517 1 : remote_extensions = serde_json::from_reader(spec_file).ok();
518 544 : } else {
519 544 : remote_extensions = None;
520 544 : };
521 :
522 : // Create spec file
523 545 : let spec = ComputeSpec {
524 545 : skip_pg_catalog_updates: self.skip_pg_catalog_updates,
525 545 : format_version: 1.0,
526 545 : operation_uuid: None,
527 545 : features: vec![],
528 545 : cluster: Cluster {
529 545 : cluster_id: None, // project ID: not used
530 545 : name: None, // project name: not used
531 545 : state: None,
532 545 : roles: vec![],
533 545 : databases: vec![],
534 545 : settings: None,
535 545 : postgresql_conf: Some(postgresql_conf),
536 545 : },
537 545 : delta_operations: None,
538 545 : tenant_id: Some(self.tenant_id),
539 545 : timeline_id: Some(self.timeline_id),
540 545 : mode: self.mode,
541 545 : pageserver_connstring: Some(pageserver_connstring),
542 545 : safekeeper_connstrings,
543 545 : storage_auth_token: auth_token.clone(),
544 545 : remote_extensions,
545 545 : pgbouncer_settings: None,
546 545 : };
547 545 : let spec_path = self.endpoint_path().join("spec.json");
548 545 : std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;
549 :
550 : // Open log file. We'll redirect the stdout and stderr of `compute_ctl` to it.
551 545 : let logfile = std::fs::OpenOptions::new()
552 545 : .create(true)
553 545 : .append(true)
554 545 : .open(self.endpoint_path().join("compute.log"))?;
555 :
556 : // Launch compute_ctl
557 545 : println!("Starting postgres node at '{}'", self.connstr());
558 545 : let mut cmd = Command::new(self.env.neon_distrib_dir.join("compute_ctl"));
559 545 : cmd.args(["--http-port", &self.http_address.port().to_string()])
560 545 : .args(["--pgdata", self.pgdata().to_str().unwrap()])
561 545 : .args(["--connstr", &self.connstr()])
562 545 : .args([
563 545 : "--spec-path",
564 545 : self.endpoint_path().join("spec.json").to_str().unwrap(),
565 545 : ])
566 545 : .args([
567 545 : "--pgbin",
568 545 : self.env
569 545 : .pg_bin_dir(self.pg_version)?
570 545 : .join("postgres")
571 545 : .to_str()
572 545 : .unwrap(),
573 545 : ])
574 545 : .stdin(std::process::Stdio::null())
575 545 : .stderr(logfile.try_clone()?)
576 545 : .stdout(logfile);
577 :
578 545 : if let Some(remote_ext_config) = remote_ext_config {
579 1 : cmd.args(["--remote-ext-config", remote_ext_config]);
580 544 : }
581 :
582 545 : let child = cmd.spawn()?;
583 :
584 : // Write down the pid so we can wait for it when we want to stop
585 : // TODO use background_process::start_process instead
586 545 : let pid = child.id();
587 545 : let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
588 545 : std::fs::write(pidfile_path, pid.to_string())?;
589 :
590 : // Wait for it to start
591 545 : let mut attempt = 0;
592 : const ATTEMPT_INTERVAL: Duration = Duration::from_millis(100);
593 : const MAX_ATTEMPTS: u32 = 10 * 30; // Wait up to 30 s
594 : loop {
595 2420 : attempt += 1;
596 6170 : match self.get_status().await {
597 1875 : Ok(state) => {
598 1875 : match state.status {
599 : ComputeStatus::Init => {
600 1330 : if attempt == MAX_ATTEMPTS {
601 UBC 0 : bail!("compute startup timed out; still in Init state");
602 CBC 1330 : }
603 : // keep retrying
604 : }
605 : ComputeStatus::Running => {
606 : // All good!
607 537 : break;
608 : }
609 : ComputeStatus::Failed => {
610 8 : bail!(
611 8 : "compute startup failed: {}",
612 8 : state
613 8 : .error
614 8 : .as_deref()
615 8 : .unwrap_or("<no error from compute_ctl>")
616 8 : );
617 : }
618 : ComputeStatus::Empty
619 : | ComputeStatus::ConfigurationPending
620 : | ComputeStatus::Configuration => {
621 UBC 0 : bail!("unexpected compute status: {:?}", state.status)
622 : }
623 : }
624 : }
625 CBC 545 : Err(e) => {
626 545 : if attempt == MAX_ATTEMPTS {
627 UBC 0 : return Err(e).context("timed out waiting to connect to compute_ctl HTTP");
628 CBC 545 : }
629 : }
630 : }
631 1875 : std::thread::sleep(ATTEMPT_INTERVAL);
632 : }
633 :
634 537 : Ok(())
635 545 : }
636 :
637 : // Call the /status HTTP API
638 2420 : pub async fn get_status(&self) -> Result<ComputeState> {
639 2420 : let client = reqwest::Client::new();
640 :
641 2420 : let response = client
642 2420 : .request(
643 2420 : reqwest::Method::GET,
644 2420 : format!(
645 2420 : "http://{}:{}/status",
646 2420 : self.http_address.ip(),
647 2420 : self.http_address.port()
648 2420 : ),
649 2420 : )
650 2420 : .send()
651 6170 : .await?;
652 :
653 : // Interpret the response
654 1875 : let status = response.status();
655 1875 : if !(status.is_client_error() || status.is_server_error()) {
656 1875 : Ok(response.json().await?)
657 : } else {
658 : // reqwest does not export its error construction utility functions, so let's craft the message ourselves
659 UBC 0 : let url = response.url().to_owned();
660 0 : let msg = match response.text().await {
661 0 : Ok(err_body) => format!("Error: {}", err_body),
662 0 : Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
663 : };
664 0 : Err(anyhow::anyhow!(msg))
665 : }
666 CBC 2420 : }
667 :
668 217 : pub async fn reconfigure(&self, pageserver_id: Option<NodeId>) -> Result<()> {
669 217 : let mut spec: ComputeSpec = {
670 217 : let spec_path = self.endpoint_path().join("spec.json");
671 217 : let file = std::fs::File::open(spec_path)?;
672 217 : serde_json::from_reader(file)?
673 : };
674 :
675 217 : let postgresql_conf = self.read_postgresql_conf()?;
676 217 : spec.cluster.postgresql_conf = Some(postgresql_conf);
677 :
678 217 : if let Some(pageserver_id) = pageserver_id {
679 217 : let endpoint_config_path = self.endpoint_path().join("endpoint.json");
680 217 : let mut endpoint_conf: EndpointConf = {
681 217 : let file = std::fs::File::open(&endpoint_config_path)?;
682 217 : serde_json::from_reader(file)?
683 : };
684 217 : endpoint_conf.pageserver_id = pageserver_id;
685 217 : std::fs::write(
686 217 : endpoint_config_path,
687 217 : serde_json::to_string_pretty(&endpoint_conf)?,
688 UBC 0 : )?;
689 :
690 CBC 217 : let pageserver =
691 217 : PageServerNode::from_env(&self.env, self.env.get_pageserver_conf(pageserver_id)?);
692 217 : let ps_http_conf = &pageserver.pg_connection_config;
693 217 : let (host, port) = (ps_http_conf.host(), ps_http_conf.port());
694 217 : spec.pageserver_connstring = Some(format!("postgresql://no_user@{host}:{port}"));
695 UBC 0 : }
696 :
697 CBC 217 : let client = reqwest::Client::new();
698 217 : let response = client
699 217 : .post(format!(
700 217 : "http://{}:{}/configure",
701 217 : self.http_address.ip(),
702 217 : self.http_address.port()
703 217 : ))
704 217 : .body(format!(
705 217 : "{{\"spec\":{}}}",
706 217 : serde_json::to_string_pretty(&spec)?
707 : ))
708 217 : .send()
709 651 : .await?;
710 :
711 217 : let status = response.status();
712 217 : if !(status.is_client_error() || status.is_server_error()) {
713 217 : Ok(())
714 : } else {
715 UBC 0 : let url = response.url().to_owned();
716 0 : let msg = match response.text().await {
717 0 : Ok(err_body) => format!("Error: {}", err_body),
718 0 : Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
719 : };
720 0 : Err(anyhow::anyhow!(msg))
721 : }
722 CBC 217 : }
723 :
724 536 : pub fn stop(&self, destroy: bool) -> Result<()> {
725 536 : // If we are going to destroy data directory,
726 536 : // use immediate shutdown mode, otherwise,
727 536 : // shutdown gracefully to leave the data directory sane.
728 536 : //
729 536 : // Postgres is always started from scratch, so stop
730 536 : // without destroy only used for testing and debugging.
731 536 : //
732 536 : self.pg_ctl(
733 536 : if destroy {
734 27 : &["-m", "immediate", "stop"]
735 : } else {
736 509 : &["stop"]
737 : },
738 536 : &None,
739 UBC 0 : )?;
740 :
741 : // Also wait for the compute_ctl process to die. It might have some
742 : // cleanup work to do after postgres stops, like syncing safekeepers,
743 : // etc.
744 : //
745 : // If destroying, send it SIGTERM before waiting. Sometimes we do *not*
746 : // want this cleanup: tests intentionally do stop when majority of
747 : // safekeepers is down, so sync-safekeepers would hang otherwise. This
748 : // could be a separate flag though.
749 CBC 536 : self.wait_for_compute_ctl_to_exit(destroy)?;
750 536 : if destroy {
751 27 : println!(
752 27 : "Destroying postgres data directory '{}'",
753 27 : self.pgdata().to_str().unwrap()
754 27 : );
755 27 : std::fs::remove_dir_all(self.endpoint_path())?;
756 509 : }
757 536 : Ok(())
758 536 : }
759 :
760 1090 : pub fn connstr(&self) -> String {
761 1090 : format!(
762 1090 : "postgresql://{}@{}:{}/{}",
763 1090 : "cloud_admin",
764 1090 : self.pg_address.ip(),
765 1090 : self.pg_address.port(),
766 1090 : "postgres"
767 1090 : )
768 1090 : }
769 : }
|