TLA Line data Source code
1 : //! Code to manage compute endpoints
2 : //!
3 : //! In the local test environment, the data for each endpoint is stored in
4 : //!
5 : //! ```text
6 : //! .neon/endpoints/<endpoint id>
7 : //! ```
8 : //!
9 : //! Some basic information about the endpoint, like the tenant and timeline IDs,
10 : //! is stored in the `endpoint.json` file. The `endpoint.json` file is created
11 : //! when the endpoint is created, and doesn't change afterwards.
12 : //!
13 : //! The endpoint is managed by the `compute_ctl` binary. When an endpoint is
14 : //! started, we launch `compute_ctl`. It synchronizes the safekeepers, downloads
15 : //! the basebackup from the pageserver to initialize the data directory, and
16 : //! finally launches the PostgreSQL process. It watches the PostgreSQL process
17 : //! until it exits.
18 : //!
19 : //! When an endpoint is created, a `postgresql.conf` file is also created in
20 : //! the endpoint's directory. The file can be modified before starting PostgreSQL.
21 : //! However, the `postgresql.conf` file in the endpoint directory is not used directly
22 : //! by PostgreSQL. It is passed to `compute_ctl`, and `compute_ctl` writes another
23 : //! copy of it in the data directory.
24 : //!
25 : //! Directory contents:
26 : //!
27 : //! ```text
28 : //! .neon/endpoints/main/
29 : //! compute.log - log output of `compute_ctl` and `postgres`
30 : //! endpoint.json - serialized `EndpointConf` struct
31 : //! postgresql.conf - postgresql settings
32 : //! spec.json - passed to `compute_ctl`
33 : //! pgdata/
34 : //! postgresql.conf - copy of postgresql.conf created by `compute_ctl`
35 : //! zenith.signal
36 : //! <other PostgreSQL files>
37 : //! ```
38 : //!
39 : use std::collections::BTreeMap;
40 : use std::net::SocketAddr;
41 : use std::net::TcpStream;
42 : use std::path::PathBuf;
43 : use std::process::Command;
44 : use std::sync::Arc;
45 : use std::time::Duration;
46 :
47 : use anyhow::{anyhow, bail, Context, Result};
48 : use serde::{Deserialize, Serialize};
49 : use serde_with::{serde_as, DisplayFromStr};
50 : use utils::id::{NodeId, TenantId, TimelineId};
51 :
52 : use crate::local_env::LocalEnv;
53 : use crate::pageserver::PageServerNode;
54 : use crate::postgresql_conf::PostgresConf;
55 :
56 : use compute_api::responses::{ComputeState, ComputeStatus};
57 : use compute_api::spec::{Cluster, ComputeMode, ComputeSpec};
58 :
59 : // contents of a endpoint.json file
60 : #[serde_as]
61 CBC 120981 : #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
62 : pub struct EndpointConf {
63 : endpoint_id: String,
64 : #[serde_as(as = "DisplayFromStr")]
65 : tenant_id: TenantId,
66 : #[serde_as(as = "DisplayFromStr")]
67 : timeline_id: TimelineId,
68 : mode: ComputeMode,
69 : pg_port: u16,
70 : http_port: u16,
71 : pg_version: u32,
72 : skip_pg_catalog_updates: bool,
73 : pageserver_id: NodeId,
74 : }
75 :
76 : //
77 : // ComputeControlPlane
78 : //
79 : pub struct ComputeControlPlane {
80 : base_port: u16,
81 :
82 : // endpoint ID is the key
83 : pub endpoints: BTreeMap<String, Arc<Endpoint>>,
84 :
85 : env: LocalEnv,
86 : }
87 :
88 : impl ComputeControlPlane {
89 : // Load current endpoints from the endpoints/ subdirectories
90 1842 : pub fn load(env: LocalEnv) -> Result<ComputeControlPlane> {
91 1842 : let mut endpoints = BTreeMap::default();
92 5761 : for endpoint_dir in std::fs::read_dir(env.endpoints_path())
93 1842 : .with_context(|| format!("failed to list {}", env.endpoints_path().display()))?
94 5761 : {
95 5761 : let ep = Endpoint::from_dir_entry(endpoint_dir?, &env)?;
96 5761 : endpoints.insert(ep.endpoint_id.clone(), Arc::new(ep));
97 : }
98 :
99 1842 : Ok(ComputeControlPlane {
100 1842 : base_port: 55431,
101 1842 : endpoints,
102 1842 : env,
103 1842 : })
104 1842 : }
105 :
106 6 : fn get_port(&mut self) -> u16 {
107 6 : 1 + self
108 6 : .endpoints
109 6 : .values()
110 6 : .map(|ep| std::cmp::max(ep.pg_address.port(), ep.http_address.port()))
111 6 : .max()
112 6 : .unwrap_or(self.base_port)
113 6 : }
114 :
115 : #[allow(clippy::too_many_arguments)]
116 568 : pub fn new_endpoint(
117 568 : &mut self,
118 568 : endpoint_id: &str,
119 568 : tenant_id: TenantId,
120 568 : timeline_id: TimelineId,
121 568 : pg_port: Option<u16>,
122 568 : http_port: Option<u16>,
123 568 : pg_version: u32,
124 568 : mode: ComputeMode,
125 568 : pageserver_id: NodeId,
126 568 : ) -> Result<Arc<Endpoint>> {
127 568 : let pg_port = pg_port.unwrap_or_else(|| self.get_port());
128 568 : let http_port = http_port.unwrap_or_else(|| self.get_port() + 1);
129 568 : let pageserver =
130 568 : PageServerNode::from_env(&self.env, self.env.get_pageserver_conf(pageserver_id)?);
131 568 : let ep = Arc::new(Endpoint {
132 568 : endpoint_id: endpoint_id.to_owned(),
133 568 : pg_address: SocketAddr::new("127.0.0.1".parse().unwrap(), pg_port),
134 568 : http_address: SocketAddr::new("127.0.0.1".parse().unwrap(), http_port),
135 568 : env: self.env.clone(),
136 568 : pageserver,
137 568 : timeline_id,
138 568 : mode,
139 568 : tenant_id,
140 568 : pg_version,
141 568 : // We don't setup roles and databases in the spec locally, so we don't need to
142 568 : // do catalog updates. Catalog updates also include check availability
143 568 : // data creation. Yet, we have tests that check that size and db dump
144 568 : // before and after start are the same. So, skip catalog updates,
145 568 : // with this we basically test a case of waking up an idle compute, where
146 568 : // we also skip catalog updates in the cloud.
147 568 : skip_pg_catalog_updates: true,
148 568 : });
149 568 :
150 568 : ep.create_endpoint_dir()?;
151 : std::fs::write(
152 568 : ep.endpoint_path().join("endpoint.json"),
153 568 : serde_json::to_string_pretty(&EndpointConf {
154 568 : endpoint_id: endpoint_id.to_string(),
155 568 : tenant_id,
156 568 : timeline_id,
157 568 : mode,
158 568 : http_port,
159 568 : pg_port,
160 568 : pg_version,
161 568 : skip_pg_catalog_updates: true,
162 568 : pageserver_id,
163 568 : })?,
164 UBC 0 : )?;
165 : std::fs::write(
166 CBC 568 : ep.endpoint_path().join("postgresql.conf"),
167 568 : ep.setup_pg_conf()?.to_string(),
168 UBC 0 : )?;
169 :
170 CBC 568 : self.endpoints
171 568 : .insert(ep.endpoint_id.clone(), Arc::clone(&ep));
172 568 :
173 568 : Ok(ep)
174 568 : }
175 : }
176 :
177 : ///////////////////////////////////////////////////////////////////////////////
178 :
179 UBC 0 : #[derive(Debug)]
180 : pub struct Endpoint {
181 : /// used as the directory name
182 : endpoint_id: String,
183 : pub tenant_id: TenantId,
184 : pub timeline_id: TimelineId,
185 : pub mode: ComputeMode,
186 :
187 : // port and address of the Postgres server and `compute_ctl`'s HTTP API
188 : pub pg_address: SocketAddr,
189 : pub http_address: SocketAddr,
190 :
191 : // postgres major version in the format: 14, 15, etc.
192 : pg_version: u32,
193 :
194 : // These are not part of the endpoint as such, but the environment
195 : // the endpoint runs in.
196 : pub env: LocalEnv,
197 : pageserver: PageServerNode,
198 :
199 : // Optimizations
200 : skip_pg_catalog_updates: bool,
201 : }
202 :
203 : impl Endpoint {
204 CBC 5761 : fn from_dir_entry(entry: std::fs::DirEntry, env: &LocalEnv) -> Result<Endpoint> {
205 5761 : if !entry.file_type()?.is_dir() {
206 UBC 0 : anyhow::bail!(
207 0 : "Endpoint::from_dir_entry failed: '{}' is not a directory",
208 0 : entry.path().display()
209 0 : );
210 CBC 5761 : }
211 5761 :
212 5761 : // parse data directory name
213 5761 : let fname = entry.file_name();
214 5761 : let endpoint_id = fname.to_str().unwrap().to_string();
215 :
216 : // Read the endpoint.json file
217 5761 : let conf: EndpointConf =
218 5761 : serde_json::from_slice(&std::fs::read(entry.path().join("endpoint.json"))?)?;
219 :
220 5761 : let pageserver =
221 5761 : PageServerNode::from_env(env, env.get_pageserver_conf(conf.pageserver_id)?);
222 :
223 5761 : Ok(Endpoint {
224 5761 : pg_address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.pg_port),
225 5761 : http_address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.http_port),
226 5761 : endpoint_id,
227 5761 : env: env.clone(),
228 5761 : pageserver,
229 5761 : timeline_id: conf.timeline_id,
230 5761 : mode: conf.mode,
231 5761 : tenant_id: conf.tenant_id,
232 5761 : pg_version: conf.pg_version,
233 5761 : skip_pg_catalog_updates: conf.skip_pg_catalog_updates,
234 5761 : })
235 5761 : }
236 :
237 568 : fn create_endpoint_dir(&self) -> Result<()> {
238 568 : std::fs::create_dir_all(self.endpoint_path()).with_context(|| {
239 UBC 0 : format!(
240 0 : "could not create endpoint directory {}",
241 0 : self.endpoint_path().display()
242 0 : )
243 CBC 568 : })
244 568 : }
245 :
246 : // Generate postgresql.conf with default configuration
247 568 : fn setup_pg_conf(&self) -> Result<PostgresConf> {
248 568 : let mut conf = PostgresConf::new();
249 568 : conf.append("max_wal_senders", "10");
250 568 : conf.append("wal_log_hints", "off");
251 568 : conf.append("max_replication_slots", "10");
252 568 : conf.append("hot_standby", "on");
253 568 : conf.append("shared_buffers", "1MB");
254 568 : conf.append("fsync", "off");
255 568 : conf.append("max_connections", "100");
256 568 : conf.append("wal_level", "logical");
257 568 : // wal_sender_timeout is the maximum time to wait for WAL replication.
258 568 : // It also defines how often the walreciever will send a feedback message to the wal sender.
259 568 : conf.append("wal_sender_timeout", "5s");
260 568 : conf.append("listen_addresses", &self.pg_address.ip().to_string());
261 568 : conf.append("port", &self.pg_address.port().to_string());
262 568 : conf.append("wal_keep_size", "0");
263 568 : // walproposer panics when basebackup is invalid, it is pointless to restart in this case.
264 568 : conf.append("restart_after_crash", "off");
265 568 :
266 568 : // Load the 'neon' extension
267 568 : conf.append("shared_preload_libraries", "neon");
268 568 :
269 568 : conf.append_line("");
270 568 : // Replication-related configurations, such as WAL sending
271 568 : match &self.mode {
272 : ComputeMode::Primary => {
273 : // Configure backpressure
274 : // - Replication write lag depends on how fast the walreceiver can process incoming WAL.
275 : // This lag determines latency of get_page_at_lsn. Speed of applying WAL is about 10MB/sec,
276 : // so to avoid expiration of 1 minute timeout, this lag should not be larger than 600MB.
277 : // Actually latency should be much smaller (better if < 1sec). But we assume that recently
278 : // updates pages are not requested from pageserver.
279 : // - Replication flush lag depends on speed of persisting data by checkpointer (creation of
280 : // delta/image layers) and advancing disk_consistent_lsn. Safekeepers are able to
281 : // remove/archive WAL only beyond disk_consistent_lsn. Too large a lag can cause long
282 : // recovery time (in case of pageserver crash) and disk space overflow at safekeepers.
283 : // - Replication apply lag depends on speed of uploading changes to S3 by uploader thread.
284 : // To be able to restore database in case of pageserver node crash, safekeeper should not
285 : // remove WAL beyond this point. Too large lag can cause space exhaustion in safekeepers
286 : // (if they are not able to upload WAL to S3).
287 479 : conf.append("max_replication_write_lag", "15MB");
288 479 : conf.append("max_replication_flush_lag", "10GB");
289 479 :
290 479 : if !self.env.safekeepers.is_empty() {
291 479 : // Configure Postgres to connect to the safekeepers
292 479 : conf.append("synchronous_standby_names", "walproposer");
293 479 :
294 479 : let safekeepers = self
295 479 : .env
296 479 : .safekeepers
297 479 : .iter()
298 650 : .map(|sk| format!("localhost:{}", sk.get_compute_port()))
299 479 : .collect::<Vec<String>>()
300 479 : .join(",");
301 479 : conf.append("neon.safekeepers", &safekeepers);
302 479 : } else {
303 UBC 0 : // We only use setup without safekeepers for tests,
304 0 : // and don't care about data durability on pageserver,
305 0 : // so set more relaxed synchronous_commit.
306 0 : conf.append("synchronous_commit", "remote_write");
307 0 :
308 0 : // Configure the node to stream WAL directly to the pageserver
309 0 : // This isn't really a supported configuration, but can be useful for
310 0 : // testing.
311 0 : conf.append("synchronous_standby_names", "pageserver");
312 0 : }
313 : }
314 CBC 88 : ComputeMode::Static(lsn) => {
315 88 : conf.append("recovery_target_lsn", &lsn.to_string());
316 88 : }
317 : ComputeMode::Replica => {
318 1 : assert!(!self.env.safekeepers.is_empty());
319 :
320 : // TODO: use future host field from safekeeper spec
321 : // Pass the list of safekeepers to the replica so that it can connect to any of them,
322 : // whichever is available.
323 1 : let sk_ports = self
324 1 : .env
325 1 : .safekeepers
326 1 : .iter()
327 1 : .map(|x| x.get_compute_port().to_string())
328 1 : .collect::<Vec<_>>()
329 1 : .join(",");
330 1 : let sk_hosts = vec!["localhost"; self.env.safekeepers.len()].join(",");
331 1 :
332 1 : let connstr = format!(
333 1 : "host={} port={} options='-c timeline_id={} tenant_id={}' application_name=replica replication=true",
334 1 : sk_hosts,
335 1 : sk_ports,
336 1 : &self.timeline_id.to_string(),
337 1 : &self.tenant_id.to_string(),
338 1 : );
339 1 :
340 1 : let slot_name = format!("repl_{}_", self.timeline_id);
341 1 : conf.append("primary_conninfo", connstr.as_str());
342 1 : conf.append("primary_slot_name", slot_name.as_str());
343 1 : conf.append("hot_standby", "on");
344 1 : // prefetching of blocks referenced in WAL doesn't make sense for us
345 1 : // Neon hot standby ignores pages that are not in the shared_buffers
346 1 : if self.pg_version >= 15 {
347 UBC 0 : conf.append("recovery_prefetch", "off");
348 CBC 1 : }
349 : }
350 : }
351 :
352 568 : Ok(conf)
353 568 : }
354 :
355 8217 : pub fn endpoint_path(&self) -> PathBuf {
356 8217 : self.env.endpoints_path().join(&self.endpoint_id)
357 8217 : }
358 :
359 2664 : pub fn pgdata(&self) -> PathBuf {
360 2664 : self.endpoint_path().join("pgdata")
361 2664 : }
362 :
363 641 : pub fn status(&self) -> &str {
364 641 : let timeout = Duration::from_millis(300);
365 641 : let has_pidfile = self.pgdata().join("postmaster.pid").exists();
366 641 : let can_connect = TcpStream::connect_timeout(&self.pg_address, timeout).is_ok();
367 641 :
368 641 : match (has_pidfile, can_connect) {
369 UBC 0 : (true, true) => "running",
370 CBC 641 : (false, false) => "stopped",
371 UBC 0 : (true, false) => "crashed",
372 0 : (false, true) => "running, no pidfile",
373 : }
374 CBC 641 : }
375 :
376 634 : fn pg_ctl(&self, args: &[&str], auth_token: &Option<String>) -> Result<()> {
377 634 : let pg_ctl_path = self.env.pg_bin_dir(self.pg_version)?.join("pg_ctl");
378 634 : let mut cmd = Command::new(&pg_ctl_path);
379 634 : cmd.args(
380 634 : [
381 634 : &[
382 634 : "-D",
383 634 : self.pgdata().to_str().unwrap(),
384 634 : "-w", //wait till pg_ctl actually does what was asked
385 634 : ],
386 634 : args,
387 634 : ]
388 634 : .concat(),
389 634 : )
390 634 : .env_clear()
391 634 : .env(
392 634 : "LD_LIBRARY_PATH",
393 634 : self.env.pg_lib_dir(self.pg_version)?.to_str().unwrap(),
394 634 : )
395 634 : .env(
396 634 : "DYLD_LIBRARY_PATH",
397 634 : self.env.pg_lib_dir(self.pg_version)?.to_str().unwrap(),
398 : );
399 :
400 : // Pass authentication token used for the connections to pageserver and safekeepers
401 634 : if let Some(token) = auth_token {
402 UBC 0 : cmd.env("NEON_AUTH_TOKEN", token);
403 CBC 634 : }
404 :
405 634 : let pg_ctl = cmd
406 634 : .output()
407 634 : .context(format!("{} failed", pg_ctl_path.display()))?;
408 634 : if !pg_ctl.status.success() {
409 21 : anyhow::bail!(
410 21 : "pg_ctl failed, exit code: {}, stdout: {}, stderr: {}",
411 21 : pg_ctl.status,
412 21 : String::from_utf8_lossy(&pg_ctl.stdout),
413 21 : String::from_utf8_lossy(&pg_ctl.stderr),
414 21 : );
415 613 : }
416 613 :
417 613 : // Also wait for the compute_ctl process to die. It might have some cleanup
418 613 : // work to do after postgres stops, like syncing safekeepers, etc.
419 613 : //
420 613 : // TODO use background_process::stop_process instead
421 613 : let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
422 613 : let pid: u32 = std::fs::read_to_string(pidfile_path)?.parse()?;
423 613 : let pid = nix::unistd::Pid::from_raw(pid as i32);
424 613 : crate::background_process::wait_until_stopped("compute_ctl", pid)?;
425 :
426 613 : Ok(())
427 634 : }
428 :
429 641 : pub fn start(
430 641 : &self,
431 641 : auth_token: &Option<String>,
432 641 : safekeepers: Vec<NodeId>,
433 641 : remote_ext_config: Option<&String>,
434 641 : ) -> Result<()> {
435 641 : if self.status() == "running" {
436 UBC 0 : anyhow::bail!("The endpoint is already running");
437 CBC 641 : }
438 641 :
439 641 : // Slurp the endpoints/<endpoint id>/postgresql.conf file into
440 641 : // memory. We will include it in the spec file that we pass to
441 641 : // `compute_ctl`, and `compute_ctl` will write it to the postgresql.conf
442 641 : // in the data directory.
443 641 : let postgresql_conf_path = self.endpoint_path().join("postgresql.conf");
444 641 : let postgresql_conf = match std::fs::read(&postgresql_conf_path) {
445 641 : Ok(content) => String::from_utf8(content)?,
446 UBC 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => "".to_string(),
447 0 : Err(e) => {
448 0 : return Err(anyhow::Error::new(e).context(format!(
449 0 : "failed to read config file in {}",
450 0 : postgresql_conf_path.to_str().unwrap()
451 0 : )))
452 : }
453 : };
454 :
455 : // We always start the compute node from scratch, so if the Postgres
456 : // data dir exists from a previous launch, remove it first.
457 CBC 641 : if self.pgdata().exists() {
458 76 : std::fs::remove_dir_all(self.pgdata())?;
459 565 : }
460 :
461 641 : let pageserver_connstring = {
462 641 : let config = &self.pageserver.pg_connection_config;
463 641 : let (host, port) = (config.host(), config.port());
464 641 :
465 641 : // NOTE: avoid spaces in connection string, because it is less error prone if we forward it somewhere.
466 641 : format!("postgresql://no_user@{host}:{port}")
467 641 : };
468 641 : let mut safekeeper_connstrings = Vec::new();
469 641 : if self.mode == ComputeMode::Primary {
470 1301 : for sk_id in safekeepers {
471 749 : let sk = self
472 749 : .env
473 749 : .safekeepers
474 749 : .iter()
475 1048 : .find(|node| node.id == sk_id)
476 749 : .ok_or_else(|| anyhow!("safekeeper {sk_id} does not exist"))?;
477 749 : safekeeper_connstrings.push(format!("127.0.0.1:{}", sk.get_compute_port()));
478 : }
479 89 : }
480 :
481 : // Create spec file
482 641 : let spec = ComputeSpec {
483 641 : skip_pg_catalog_updates: self.skip_pg_catalog_updates,
484 641 : format_version: 1.0,
485 641 : operation_uuid: None,
486 641 : cluster: Cluster {
487 641 : cluster_id: None, // project ID: not used
488 641 : name: None, // project name: not used
489 641 : state: None,
490 641 : roles: vec![],
491 641 : databases: vec![],
492 641 : settings: None,
493 641 : postgresql_conf: Some(postgresql_conf),
494 641 : },
495 641 : delta_operations: None,
496 641 : tenant_id: Some(self.tenant_id),
497 641 : timeline_id: Some(self.timeline_id),
498 641 : mode: self.mode,
499 641 : pageserver_connstring: Some(pageserver_connstring),
500 641 : safekeeper_connstrings,
501 641 : storage_auth_token: auth_token.clone(),
502 641 : remote_extensions: None,
503 641 : };
504 641 : let spec_path = self.endpoint_path().join("spec.json");
505 641 : std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;
506 :
507 : // Open log file. We'll redirect the stdout and stderr of `compute_ctl` to it.
508 641 : let logfile = std::fs::OpenOptions::new()
509 641 : .create(true)
510 641 : .append(true)
511 641 : .open(self.endpoint_path().join("compute.log"))?;
512 :
513 : // Launch compute_ctl
514 641 : println!("Starting postgres node at '{}'", self.connstr());
515 641 : let mut cmd = Command::new(self.env.neon_distrib_dir.join("compute_ctl"));
516 641 : cmd.args(["--http-port", &self.http_address.port().to_string()])
517 641 : .args(["--pgdata", self.pgdata().to_str().unwrap()])
518 641 : .args(["--connstr", &self.connstr()])
519 641 : .args([
520 641 : "--spec-path",
521 641 : self.endpoint_path().join("spec.json").to_str().unwrap(),
522 641 : ])
523 641 : .args([
524 641 : "--pgbin",
525 641 : self.env
526 641 : .pg_bin_dir(self.pg_version)?
527 641 : .join("postgres")
528 641 : .to_str()
529 641 : .unwrap(),
530 641 : ])
531 641 : .stdin(std::process::Stdio::null())
532 641 : .stderr(logfile.try_clone()?)
533 641 : .stdout(logfile);
534 :
535 641 : if let Some(remote_ext_config) = remote_ext_config {
536 UBC 0 : cmd.args(["--remote-ext-config", remote_ext_config]);
537 CBC 641 : }
538 :
539 641 : let child = cmd.spawn()?;
540 :
541 : // Write down the pid so we can wait for it when we want to stop
542 : // TODO use background_process::start_process instead
543 641 : let pid = child.id();
544 641 : let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
545 641 : std::fs::write(pidfile_path, pid.to_string())?;
546 :
547 : // Wait for it to start
548 641 : let mut attempt = 0;
549 : const ATTEMPT_INTERVAL: Duration = Duration::from_millis(100);
550 : const MAX_ATTEMPTS: u32 = 10 * 30; // Wait up to 30 s
551 : loop {
552 2069 : attempt += 1;
553 2069 : match self.get_status() {
554 1436 : Ok(state) => {
555 1436 : match state.status {
556 : ComputeStatus::Init => {
557 795 : if attempt == MAX_ATTEMPTS {
558 UBC 0 : bail!("compute startup timed out; still in Init state");
559 CBC 795 : }
560 : // keep retrying
561 : }
562 : ComputeStatus::Running => {
563 : // All good!
564 632 : break;
565 : }
566 : ComputeStatus::Failed => {
567 9 : bail!(
568 9 : "compute startup failed: {}",
569 9 : state
570 9 : .error
571 9 : .as_deref()
572 9 : .unwrap_or("<no error from compute_ctl>")
573 9 : );
574 : }
575 : ComputeStatus::Empty
576 : | ComputeStatus::ConfigurationPending
577 : | ComputeStatus::Configuration => {
578 UBC 0 : bail!("unexpected compute status: {:?}", state.status)
579 : }
580 : }
581 : }
582 CBC 633 : Err(e) => {
583 633 : if attempt == MAX_ATTEMPTS {
584 UBC 0 : return Err(e).context("timed out waiting to connect to compute_ctl HTTP");
585 CBC 633 : }
586 : }
587 : }
588 1428 : std::thread::sleep(ATTEMPT_INTERVAL);
589 : }
590 :
591 632 : Ok(())
592 641 : }
593 :
594 : // Call the /status HTTP API
595 2069 : pub fn get_status(&self) -> Result<ComputeState> {
596 2069 : let client = reqwest::blocking::Client::new();
597 :
598 2069 : let response = client
599 2069 : .request(
600 2069 : reqwest::Method::GET,
601 2069 : format!(
602 2069 : "http://{}:{}/status",
603 2069 : self.http_address.ip(),
604 2069 : self.http_address.port()
605 2069 : ),
606 2069 : )
607 2069 : .send()?;
608 :
609 : // Interpret the response
610 1436 : let status = response.status();
611 1436 : if !(status.is_client_error() || status.is_server_error()) {
612 1436 : Ok(response.json()?)
613 : } else {
614 : // reqwest does not export its error construction utility functions, so let's craft the message ourselves
615 UBC 0 : let url = response.url().to_owned();
616 0 : let msg = match response.text() {
617 0 : Ok(err_body) => format!("Error: {}", err_body),
618 0 : Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
619 : };
620 0 : Err(anyhow::anyhow!(msg))
621 : }
622 CBC 2069 : }
623 :
624 634 : pub fn stop(&self, destroy: bool) -> Result<()> {
625 634 : // If we are going to destroy data directory,
626 634 : // use immediate shutdown mode, otherwise,
627 634 : // shutdown gracefully to leave the data directory sane.
628 634 : //
629 634 : // Postgres is always started from scratch, so stop
630 634 : // without destroy only used for testing and debugging.
631 634 : //
632 634 : if destroy {
633 31 : self.pg_ctl(&["-m", "immediate", "stop"], &None)?;
634 31 : println!(
635 31 : "Destroying postgres data directory '{}'",
636 31 : self.pgdata().to_str().unwrap()
637 31 : );
638 31 : std::fs::remove_dir_all(self.endpoint_path())?;
639 : } else {
640 603 : self.pg_ctl(&["stop"], &None)?;
641 : }
642 613 : Ok(())
643 634 : }
644 :
645 1282 : pub fn connstr(&self) -> String {
646 1282 : format!(
647 1282 : "postgresql://{}@{}:{}/{}",
648 1282 : "cloud_admin",
649 1282 : self.pg_address.ip(),
650 1282 : self.pg_address.port(),
651 1282 : "postgres"
652 1282 : )
653 1282 : }
654 : }
|