Line data Source code
1 : //! Code to manage compute endpoints
2 : //!
3 : //! In the local test environment, the data for each endpoint is stored in
4 : //!
5 : //! ```text
6 : //! .neon/endpoints/<endpoint id>
7 : //! ```
8 : //!
//! Some basic information about the endpoint, like the tenant and timeline IDs,
//! is stored in the `endpoint.json` file. The `endpoint.json` file is created
//! when the endpoint is created, and doesn't change afterwards.
12 : //!
//! The endpoint is managed by the `compute_ctl` binary. When an endpoint is
//! started, we launch `compute_ctl`. It synchronizes the safekeepers, downloads
//! the basebackup from the pageserver to initialize the data directory, and
//! finally launches the PostgreSQL process, which it then watches until that
//! process exits.
18 : //!
19 : //! When an endpoint is created, a `postgresql.conf` file is also created in
20 : //! the endpoint's directory. The file can be modified before starting PostgreSQL.
21 : //! However, the `postgresql.conf` file in the endpoint directory is not used directly
22 : //! by PostgreSQL. It is passed to `compute_ctl`, and `compute_ctl` writes another
23 : //! copy of it in the data directory.
24 : //!
25 : //! Directory contents:
26 : //!
//! ```text
//! .neon/endpoints/main/
//!     compute.log               - log output of `compute_ctl` and `postgres`
//!     compute_ctl.pid           - pid of the running `compute_ctl` process
//!     endpoint.json             - serialized `EndpointConf` struct
//!     postgresql.conf           - postgresql settings
//!     remote_extensions_spec.json - optional; passed to `compute_ctl` if present
//!     spec.json                 - passed to `compute_ctl`
//!     pgdata/
//!         postgresql.conf       - copy of postgresql.conf created by `compute_ctl`
//!         zenith.signal
//!         <other PostgreSQL files>
//! ```
37 : //! ```
38 : //!
39 : use std::collections::BTreeMap;
40 : use std::net::SocketAddr;
41 : use std::net::TcpStream;
42 : use std::path::PathBuf;
43 : use std::process::Command;
44 : use std::sync::Arc;
45 : use std::time::Duration;
46 :
47 : use anyhow::{anyhow, bail, Context, Result};
48 : use compute_api::spec::RemoteExtSpec;
49 : use nix::sys::signal::kill;
50 : use nix::sys::signal::Signal;
51 : use serde::{Deserialize, Serialize};
52 : use url::Host;
53 : use utils::id::{NodeId, TenantId, TimelineId};
54 :
55 : use crate::attachment_service::AttachmentService;
56 : use crate::local_env::LocalEnv;
57 : use crate::postgresql_conf::PostgresConf;
58 :
59 : use compute_api::responses::{ComputeState, ComputeStatus};
60 : use compute_api::spec::{Cluster, ComputeFeature, ComputeMode, ComputeSpec};
61 :
/// Contents of an `endpoint.json` file.
///
/// Written once by `ComputeControlPlane::new_endpoint` when the endpoint is
/// created and read back by `Endpoint::from_dir_entry` when the control plane
/// is loaded. The field order here is the order of keys in the serialized file.
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
pub struct EndpointConf {
    /// Endpoint ID; also the name of the endpoint's directory.
    endpoint_id: String,
    tenant_id: TenantId,
    timeline_id: TimelineId,
    mode: ComputeMode,
    /// Port the Postgres server listens on (the address is always 127.0.0.1).
    pg_port: u16,
    /// Port of `compute_ctl`'s HTTP API (the address is always 127.0.0.1).
    http_port: u16,
    /// Postgres major version, e.g. 14 or 15.
    pg_version: u32,
    /// Copied into the compute spec passed to `compute_ctl` on start.
    skip_pg_catalog_updates: bool,
    /// Feature flags copied into the compute spec passed to `compute_ctl` on start.
    features: Vec<ComputeFeature>,
}
75 :
//
// ComputeControlPlane
//
/// The set of compute endpoints known to a local environment, loaded from
/// (and created under) its endpoints directory.
pub struct ComputeControlPlane {
    /// Base for automatic port allocation: when no endpoints exist yet, the
    /// first automatically allocated port is `base_port + 1`.
    base_port: u16,

    // endpoint ID is the key
    pub endpoints: BTreeMap<String, Arc<Endpoint>>,

    /// The local environment the endpoints belong to; cloned into every
    /// endpoint created through this control plane.
    env: LocalEnv,
}
87 :
88 : impl ComputeControlPlane {
89 : // Load current endpoints from the endpoints/ subdirectories
90 2400 : pub fn load(env: LocalEnv) -> Result<ComputeControlPlane> {
91 2400 : let mut endpoints = BTreeMap::default();
92 3896 : for endpoint_dir in std::fs::read_dir(env.endpoints_path())
93 2400 : .with_context(|| format!("failed to list {}", env.endpoints_path().display()))?
94 3896 : {
95 3896 : let ep = Endpoint::from_dir_entry(endpoint_dir?, &env)?;
96 3896 : endpoints.insert(ep.endpoint_id.clone(), Arc::new(ep));
97 : }
98 :
99 2400 : Ok(ComputeControlPlane {
100 2400 : base_port: 55431,
101 2400 : endpoints,
102 2400 : env,
103 2400 : })
104 2400 : }
105 :
106 8 : fn get_port(&mut self) -> u16 {
107 8 : 1 + self
108 8 : .endpoints
109 8 : .values()
110 8 : .map(|ep| std::cmp::max(ep.pg_address.port(), ep.http_address.port()))
111 8 : .max()
112 8 : .unwrap_or(self.base_port)
113 8 : }
114 :
115 : #[allow(clippy::too_many_arguments)]
116 522 : pub fn new_endpoint(
117 522 : &mut self,
118 522 : endpoint_id: &str,
119 522 : tenant_id: TenantId,
120 522 : timeline_id: TimelineId,
121 522 : pg_port: Option<u16>,
122 522 : http_port: Option<u16>,
123 522 : pg_version: u32,
124 522 : mode: ComputeMode,
125 522 : ) -> Result<Arc<Endpoint>> {
126 522 : let pg_port = pg_port.unwrap_or_else(|| self.get_port());
127 522 : let http_port = http_port.unwrap_or_else(|| self.get_port() + 1);
128 522 : let ep = Arc::new(Endpoint {
129 522 : endpoint_id: endpoint_id.to_owned(),
130 522 : pg_address: SocketAddr::new("127.0.0.1".parse().unwrap(), pg_port),
131 522 : http_address: SocketAddr::new("127.0.0.1".parse().unwrap(), http_port),
132 522 : env: self.env.clone(),
133 522 : timeline_id,
134 522 : mode,
135 522 : tenant_id,
136 522 : pg_version,
137 522 : // We don't setup roles and databases in the spec locally, so we don't need to
138 522 : // do catalog updates. Catalog updates also include check availability
139 522 : // data creation. Yet, we have tests that check that size and db dump
140 522 : // before and after start are the same. So, skip catalog updates,
141 522 : // with this we basically test a case of waking up an idle compute, where
142 522 : // we also skip catalog updates in the cloud.
143 522 : skip_pg_catalog_updates: true,
144 522 : features: vec![],
145 522 : });
146 522 :
147 522 : ep.create_endpoint_dir()?;
148 : std::fs::write(
149 522 : ep.endpoint_path().join("endpoint.json"),
150 522 : serde_json::to_string_pretty(&EndpointConf {
151 522 : endpoint_id: endpoint_id.to_string(),
152 522 : tenant_id,
153 522 : timeline_id,
154 522 : mode,
155 522 : http_port,
156 522 : pg_port,
157 522 : pg_version,
158 522 : skip_pg_catalog_updates: true,
159 522 : features: vec![],
160 522 : })?,
161 0 : )?;
162 : std::fs::write(
163 522 : ep.endpoint_path().join("postgresql.conf"),
164 522 : ep.setup_pg_conf()?.to_string(),
165 0 : )?;
166 :
167 522 : self.endpoints
168 522 : .insert(ep.endpoint_id.clone(), Arc::clone(&ep));
169 522 :
170 522 : Ok(ep)
171 522 : }
172 :
173 1131 : pub fn check_conflicting_endpoints(
174 1131 : &self,
175 1131 : mode: ComputeMode,
176 1131 : tenant_id: TenantId,
177 1131 : timeline_id: TimelineId,
178 1131 : ) -> Result<()> {
179 1131 : if matches!(mode, ComputeMode::Primary) {
180 : // this check is not complete, as you could have a concurrent attempt at
181 : // creating another primary, both reading the state before checking it here,
182 : // but it's better than nothing.
183 1514 : let mut duplicates = self.endpoints.iter().filter(|(_k, v)| {
184 1514 : v.tenant_id == tenant_id
185 1393 : && v.timeline_id == timeline_id
186 957 : && v.mode == mode
187 953 : && v.status() != EndpointStatus::Stopped
188 1514 : });
189 :
190 1023 : if let Some((key, _)) = duplicates.next() {
191 29 : bail!("attempting to create a duplicate primary endpoint on tenant {tenant_id}, timeline {timeline_id}: endpoint {key:?} exists already. please don't do this, it is not supported.");
192 994 : }
193 108 : }
194 1102 : Ok(())
195 1131 : }
196 : }
197 :
198 : ///////////////////////////////////////////////////////////////////////////////
199 :
/// A compute endpoint: a Postgres server plus the `compute_ctl` process that
/// launches and manages it. Its files live in the endpoint's own directory
/// under the environment's endpoints directory (see `Endpoint::endpoint_path`).
#[derive(Debug)]
pub struct Endpoint {
    /// Endpoint ID; used as the directory name
    endpoint_id: String,
    pub tenant_id: TenantId,
    pub timeline_id: TimelineId,
    /// Primary, static at a fixed LSN, or replica; selects the replication
    /// settings written to `postgresql.conf`.
    pub mode: ComputeMode,

    /// Address the Postgres server listens on.
    pub pg_address: SocketAddr,
    /// Address of `compute_ctl`'s HTTP API (`/status`, `/configure`).
    pub http_address: SocketAddr,

    /// Postgres major version in the format: 14, 15, etc. Selects which
    /// Postgres binaries and libraries are used.
    pg_version: u32,

    /// Not part of the endpoint as such, but the environment the endpoint
    /// runs in.
    pub env: LocalEnv,

    /// Optimization flag copied into the compute spec on start.
    skip_pg_catalog_updates: bool,

    /// Feature flags copied into the compute spec on start.
    features: Vec<ComputeFeature>,
}
225 :
/// Liveness of an endpoint, as observed by `Endpoint::status`.
///
/// The status is derived from two signals: whether Postgres' `postmaster.pid`
/// file exists, and whether the Postgres port accepts TCP connections.
///
/// `Debug`, `Clone` and `Copy` are derived so the status can be logged and
/// passed around freely; it is a plain field-less enum.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EndpointStatus {
    /// `postmaster.pid` exists and the Postgres port accepts connections.
    Running,
    /// No `postmaster.pid`, and the Postgres port does not accept connections.
    Stopped,
    /// `postmaster.pid` exists, but the Postgres port does not accept connections.
    Crashed,
    /// The Postgres port accepts connections, but there is no `postmaster.pid`.
    RunningNoPidfile,
}
233 :
234 : impl std::fmt::Display for EndpointStatus {
235 0 : fn fmt(&self, writer: &mut std::fmt::Formatter) -> std::fmt::Result {
236 0 : let s = match self {
237 0 : Self::Running => "running",
238 0 : Self::Stopped => "stopped",
239 0 : Self::Crashed => "crashed",
240 0 : Self::RunningNoPidfile => "running, no pidfile",
241 : };
242 0 : write!(writer, "{}", s)
243 0 : }
244 : }
245 :
246 : impl Endpoint {
247 3896 : fn from_dir_entry(entry: std::fs::DirEntry, env: &LocalEnv) -> Result<Endpoint> {
248 3896 : if !entry.file_type()?.is_dir() {
249 0 : anyhow::bail!(
250 0 : "Endpoint::from_dir_entry failed: '{}' is not a directory",
251 0 : entry.path().display()
252 0 : );
253 3896 : }
254 3896 :
255 3896 : // parse data directory name
256 3896 : let fname = entry.file_name();
257 3896 : let endpoint_id = fname.to_str().unwrap().to_string();
258 :
259 : // Read the endpoint.json file
260 3896 : let conf: EndpointConf =
261 3896 : serde_json::from_slice(&std::fs::read(entry.path().join("endpoint.json"))?)?;
262 :
263 3896 : Ok(Endpoint {
264 3896 : pg_address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.pg_port),
265 3896 : http_address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.http_port),
266 3896 : endpoint_id,
267 3896 : env: env.clone(),
268 3896 : timeline_id: conf.timeline_id,
269 3896 : mode: conf.mode,
270 3896 : tenant_id: conf.tenant_id,
271 3896 : pg_version: conf.pg_version,
272 3896 : skip_pg_catalog_updates: conf.skip_pg_catalog_updates,
273 3896 : features: conf.features,
274 3896 : })
275 3896 : }
276 :
277 522 : fn create_endpoint_dir(&self) -> Result<()> {
278 522 : std::fs::create_dir_all(self.endpoint_path()).with_context(|| {
279 0 : format!(
280 0 : "could not create endpoint directory {}",
281 0 : self.endpoint_path().display()
282 0 : )
283 522 : })
284 522 : }
285 :
286 : // Generate postgresql.conf with default configuration
287 522 : fn setup_pg_conf(&self) -> Result<PostgresConf> {
288 522 : let mut conf = PostgresConf::new();
289 522 : conf.append("max_wal_senders", "10");
290 522 : conf.append("wal_log_hints", "off");
291 522 : conf.append("max_replication_slots", "10");
292 522 : conf.append("hot_standby", "on");
293 522 : conf.append("hot_standby_feedback", "on");
294 522 : conf.append("shared_buffers", "1MB");
295 522 : conf.append("fsync", "off");
296 522 : conf.append("max_connections", "100");
297 522 : conf.append("wal_level", "logical");
298 522 : // wal_sender_timeout is the maximum time to wait for WAL replication.
299 522 : // It also defines how often the walreciever will send a feedback message to the wal sender.
300 522 : conf.append("wal_sender_timeout", "5s");
301 522 : conf.append("listen_addresses", &self.pg_address.ip().to_string());
302 522 : conf.append("port", &self.pg_address.port().to_string());
303 522 : conf.append("wal_keep_size", "0");
304 522 : // walproposer panics when basebackup is invalid, it is pointless to restart in this case.
305 522 : conf.append("restart_after_crash", "off");
306 522 :
307 522 : // Load the 'neon' extension
308 522 : conf.append("shared_preload_libraries", "neon");
309 522 :
310 522 : conf.append_line("");
311 522 : // Replication-related configurations, such as WAL sending
312 522 : match &self.mode {
313 : ComputeMode::Primary => {
314 : // Configure backpressure
315 : // - Replication write lag depends on how fast the walreceiver can process incoming WAL.
316 : // This lag determines latency of get_page_at_lsn. Speed of applying WAL is about 10MB/sec,
317 : // so to avoid expiration of 1 minute timeout, this lag should not be larger than 600MB.
318 : // Actually latency should be much smaller (better if < 1sec). But we assume that recently
319 : // updates pages are not requested from pageserver.
320 : // - Replication flush lag depends on speed of persisting data by checkpointer (creation of
321 : // delta/image layers) and advancing disk_consistent_lsn. Safekeepers are able to
322 : // remove/archive WAL only beyond disk_consistent_lsn. Too large a lag can cause long
323 : // recovery time (in case of pageserver crash) and disk space overflow at safekeepers.
324 : // - Replication apply lag depends on speed of uploading changes to S3 by uploader thread.
325 : // To be able to restore database in case of pageserver node crash, safekeeper should not
326 : // remove WAL beyond this point. Too large lag can cause space exhaustion in safekeepers
327 : // (if they are not able to upload WAL to S3).
328 468 : conf.append("max_replication_write_lag", "15MB");
329 468 : conf.append("max_replication_flush_lag", "10GB");
330 468 :
331 468 : if !self.env.safekeepers.is_empty() {
332 468 : // Configure Postgres to connect to the safekeepers
333 468 : conf.append("synchronous_standby_names", "walproposer");
334 468 :
335 468 : let safekeepers = self
336 468 : .env
337 468 : .safekeepers
338 468 : .iter()
339 609 : .map(|sk| format!("localhost:{}", sk.get_compute_port()))
340 468 : .collect::<Vec<String>>()
341 468 : .join(",");
342 468 : conf.append("neon.safekeepers", &safekeepers);
343 468 : } else {
344 0 : // We only use setup without safekeepers for tests,
345 0 : // and don't care about data durability on pageserver,
346 0 : // so set more relaxed synchronous_commit.
347 0 : conf.append("synchronous_commit", "remote_write");
348 0 :
349 0 : // Configure the node to stream WAL directly to the pageserver
350 0 : // This isn't really a supported configuration, but can be useful for
351 0 : // testing.
352 0 : conf.append("synchronous_standby_names", "pageserver");
353 0 : }
354 : }
355 49 : ComputeMode::Static(lsn) => {
356 49 : conf.append("recovery_target_lsn", &lsn.to_string());
357 49 : }
358 : ComputeMode::Replica => {
359 5 : assert!(!self.env.safekeepers.is_empty());
360 :
361 : // TODO: use future host field from safekeeper spec
362 : // Pass the list of safekeepers to the replica so that it can connect to any of them,
363 : // whichever is available.
364 5 : let sk_ports = self
365 5 : .env
366 5 : .safekeepers
367 5 : .iter()
368 5 : .map(|x| x.get_compute_port().to_string())
369 5 : .collect::<Vec<_>>()
370 5 : .join(",");
371 5 : let sk_hosts = vec!["localhost"; self.env.safekeepers.len()].join(",");
372 5 :
373 5 : let connstr = format!(
374 5 : "host={} port={} options='-c timeline_id={} tenant_id={}' application_name=replica replication=true",
375 5 : sk_hosts,
376 5 : sk_ports,
377 5 : &self.timeline_id.to_string(),
378 5 : &self.tenant_id.to_string(),
379 5 : );
380 5 :
381 5 : let slot_name = format!("repl_{}_", self.timeline_id);
382 5 : conf.append("primary_conninfo", connstr.as_str());
383 5 : conf.append("primary_slot_name", slot_name.as_str());
384 5 : conf.append("hot_standby", "on");
385 5 : // prefetching of blocks referenced in WAL doesn't make sense for us
386 5 : // Neon hot standby ignores pages that are not in the shared_buffers
387 5 : if self.pg_version >= 15 {
388 0 : conf.append("recovery_prefetch", "off");
389 5 : }
390 : }
391 : }
392 :
393 522 : Ok(conf)
394 522 : }
395 :
396 9490 : pub fn endpoint_path(&self) -> PathBuf {
397 9490 : self.env.endpoints_path().join(&self.endpoint_id)
398 9490 : }
399 :
400 3376 : pub fn pgdata(&self) -> PathBuf {
401 3376 : self.endpoint_path().join("pgdata")
402 3376 : }
403 :
404 1540 : pub fn status(&self) -> EndpointStatus {
405 1540 : let timeout = Duration::from_millis(300);
406 1540 : let has_pidfile = self.pgdata().join("postmaster.pid").exists();
407 1540 : let can_connect = TcpStream::connect_timeout(&self.pg_address, timeout).is_ok();
408 1540 :
409 1540 : match (has_pidfile, can_connect) {
410 29 : (true, true) => EndpointStatus::Running,
411 1511 : (false, false) => EndpointStatus::Stopped,
412 0 : (true, false) => EndpointStatus::Crashed,
413 0 : (false, true) => EndpointStatus::RunningNoPidfile,
414 : }
415 1540 : }
416 :
417 576 : fn pg_ctl(&self, args: &[&str], auth_token: &Option<String>) -> Result<()> {
418 576 : let pg_ctl_path = self.env.pg_bin_dir(self.pg_version)?.join("pg_ctl");
419 576 : let mut cmd = Command::new(&pg_ctl_path);
420 576 : cmd.args(
421 576 : [
422 576 : &[
423 576 : "-D",
424 576 : self.pgdata().to_str().unwrap(),
425 576 : "-w", //wait till pg_ctl actually does what was asked
426 576 : ],
427 576 : args,
428 576 : ]
429 576 : .concat(),
430 576 : )
431 576 : .env_clear()
432 576 : .env(
433 576 : "LD_LIBRARY_PATH",
434 576 : self.env.pg_lib_dir(self.pg_version)?.to_str().unwrap(),
435 576 : )
436 576 : .env(
437 576 : "DYLD_LIBRARY_PATH",
438 576 : self.env.pg_lib_dir(self.pg_version)?.to_str().unwrap(),
439 : );
440 :
441 : // Pass authentication token used for the connections to pageserver and safekeepers
442 576 : if let Some(token) = auth_token {
443 0 : cmd.env("NEON_AUTH_TOKEN", token);
444 576 : }
445 :
446 576 : let pg_ctl = cmd
447 576 : .output()
448 576 : .context(format!("{} failed", pg_ctl_path.display()))?;
449 576 : if !pg_ctl.status.success() {
450 0 : anyhow::bail!(
451 0 : "pg_ctl failed, exit code: {}, stdout: {}, stderr: {}",
452 0 : pg_ctl.status,
453 0 : String::from_utf8_lossy(&pg_ctl.stdout),
454 0 : String::from_utf8_lossy(&pg_ctl.stderr),
455 0 : );
456 576 : }
457 576 :
458 576 : Ok(())
459 576 : }
460 :
461 576 : fn wait_for_compute_ctl_to_exit(&self, send_sigterm: bool) -> Result<()> {
462 576 : // TODO use background_process::stop_process instead: https://github.com/neondatabase/neon/pull/6482
463 576 : let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
464 576 : let pid: u32 = std::fs::read_to_string(pidfile_path)?.parse()?;
465 576 : let pid = nix::unistd::Pid::from_raw(pid as i32);
466 576 : if send_sigterm {
467 26 : kill(pid, Signal::SIGTERM).ok();
468 550 : }
469 576 : crate::background_process::wait_until_stopped("compute_ctl", pid)?;
470 576 : Ok(())
471 576 : }
472 :
473 805 : fn read_postgresql_conf(&self) -> Result<String> {
474 805 : // Slurp the endpoints/<endpoint id>/postgresql.conf file into
475 805 : // memory. We will include it in the spec file that we pass to
476 805 : // `compute_ctl`, and `compute_ctl` will write it to the postgresql.conf
477 805 : // in the data directory.
478 805 : let postgresql_conf_path = self.endpoint_path().join("postgresql.conf");
479 805 : match std::fs::read(&postgresql_conf_path) {
480 805 : Ok(content) => Ok(String::from_utf8(content)?),
481 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok("".to_string()),
482 0 : Err(e) => Err(anyhow::Error::new(e).context(format!(
483 0 : "failed to read config file in {}",
484 0 : postgresql_conf_path.to_str().unwrap()
485 0 : ))),
486 : }
487 805 : }
488 :
489 805 : fn build_pageserver_connstr(pageservers: &[(Host, u16)]) -> String {
490 805 : pageservers
491 805 : .iter()
492 823 : .map(|(host, port)| format!("postgresql://no_user@{host}:{port}"))
493 805 : .collect::<Vec<_>>()
494 805 : .join(",")
495 805 : }
496 :
497 584 : pub async fn start(
498 584 : &self,
499 584 : auth_token: &Option<String>,
500 584 : safekeepers: Vec<NodeId>,
501 584 : pageservers: Vec<(Host, u16)>,
502 584 : remote_ext_config: Option<&String>,
503 584 : shard_stripe_size: usize,
504 584 : ) -> Result<()> {
505 584 : if self.status() == EndpointStatus::Running {
506 0 : anyhow::bail!("The endpoint is already running");
507 584 : }
508 :
509 584 : let postgresql_conf = self.read_postgresql_conf()?;
510 :
511 : // We always start the compute node from scratch, so if the Postgres
512 : // data dir exists from a previous launch, remove it first.
513 584 : if self.pgdata().exists() {
514 66 : std::fs::remove_dir_all(self.pgdata())?;
515 518 : }
516 :
517 584 : let pageserver_connstring = Self::build_pageserver_connstr(&pageservers);
518 584 : assert!(!pageserver_connstring.is_empty());
519 :
520 584 : let mut safekeeper_connstrings = Vec::new();
521 584 : if self.mode == ComputeMode::Primary {
522 1203 : for sk_id in safekeepers {
523 673 : let sk = self
524 673 : .env
525 673 : .safekeepers
526 673 : .iter()
527 891 : .find(|node| node.id == sk_id)
528 673 : .ok_or_else(|| anyhow!("safekeeper {sk_id} does not exist"))?;
529 673 : safekeeper_connstrings.push(format!("127.0.0.1:{}", sk.get_compute_port()));
530 : }
531 54 : }
532 :
533 : // check for file remote_extensions_spec.json
534 : // if it is present, read it and pass to compute_ctl
535 584 : let remote_extensions_spec_path = self.endpoint_path().join("remote_extensions_spec.json");
536 584 : let remote_extensions_spec = std::fs::File::open(remote_extensions_spec_path);
537 : let remote_extensions: Option<RemoteExtSpec>;
538 :
539 584 : if let Ok(spec_file) = remote_extensions_spec {
540 1 : remote_extensions = serde_json::from_reader(spec_file).ok();
541 583 : } else {
542 583 : remote_extensions = None;
543 583 : };
544 :
545 : // Create spec file
546 584 : let spec = ComputeSpec {
547 584 : skip_pg_catalog_updates: self.skip_pg_catalog_updates,
548 584 : format_version: 1.0,
549 584 : operation_uuid: None,
550 584 : features: self.features.clone(),
551 584 : cluster: Cluster {
552 584 : cluster_id: None, // project ID: not used
553 584 : name: None, // project name: not used
554 584 : state: None,
555 584 : roles: vec![],
556 584 : databases: vec![],
557 584 : settings: None,
558 584 : postgresql_conf: Some(postgresql_conf),
559 584 : },
560 584 : delta_operations: None,
561 584 : tenant_id: Some(self.tenant_id),
562 584 : timeline_id: Some(self.timeline_id),
563 584 : mode: self.mode,
564 584 : pageserver_connstring: Some(pageserver_connstring),
565 584 : safekeeper_connstrings,
566 584 : storage_auth_token: auth_token.clone(),
567 584 : remote_extensions,
568 584 : pgbouncer_settings: None,
569 584 : shard_stripe_size: Some(shard_stripe_size),
570 584 : };
571 584 : let spec_path = self.endpoint_path().join("spec.json");
572 584 : std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;
573 :
574 : // Open log file. We'll redirect the stdout and stderr of `compute_ctl` to it.
575 584 : let logfile = std::fs::OpenOptions::new()
576 584 : .create(true)
577 584 : .append(true)
578 584 : .open(self.endpoint_path().join("compute.log"))?;
579 :
580 : // Launch compute_ctl
581 584 : println!("Starting postgres node at '{}'", self.connstr());
582 584 : let mut cmd = Command::new(self.env.neon_distrib_dir.join("compute_ctl"));
583 584 : cmd.args(["--http-port", &self.http_address.port().to_string()])
584 584 : .args(["--pgdata", self.pgdata().to_str().unwrap()])
585 584 : .args(["--connstr", &self.connstr()])
586 584 : .args([
587 584 : "--spec-path",
588 584 : self.endpoint_path().join("spec.json").to_str().unwrap(),
589 584 : ])
590 584 : .args([
591 584 : "--pgbin",
592 584 : self.env
593 584 : .pg_bin_dir(self.pg_version)?
594 584 : .join("postgres")
595 584 : .to_str()
596 584 : .unwrap(),
597 584 : ])
598 584 : .stdin(std::process::Stdio::null())
599 584 : .stderr(logfile.try_clone()?)
600 584 : .stdout(logfile);
601 :
602 584 : if let Some(remote_ext_config) = remote_ext_config {
603 1 : cmd.args(["--remote-ext-config", remote_ext_config]);
604 583 : }
605 :
606 584 : let child = cmd.spawn()?;
607 : // set up a scopeguard to kill & wait for the child in case we panic or bail below
608 584 : let child = scopeguard::guard(child, |mut child| {
609 8 : println!("SIGKILL & wait the started process");
610 8 : (|| {
611 8 : // TODO: use another signal that can be caught by the child so it can clean up any children it spawned
612 8 : child.kill().context("SIGKILL child")?;
613 8 : child.wait().context("wait() for child process")?;
614 8 : anyhow::Ok(())
615 8 : })()
616 8 : .with_context(|| format!("scopeguard kill&wait child {child:?}"))
617 8 : .unwrap();
618 584 : });
619 584 :
620 584 : // Write down the pid so we can wait for it when we want to stop
621 584 : // TODO use background_process::start_process instead: https://github.com/neondatabase/neon/pull/6482
622 584 : let pid = child.id();
623 584 : let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
624 584 : std::fs::write(pidfile_path, pid.to_string())?;
625 :
626 : // Wait for it to start
627 584 : let mut attempt = 0;
628 : const ATTEMPT_INTERVAL: Duration = Duration::from_millis(100);
629 : const MAX_ATTEMPTS: u32 = 10 * 30; // Wait up to 30 s
630 : loop {
631 4055 : attempt += 1;
632 10997 : match self.get_status().await {
633 3471 : Ok(state) => {
634 3471 : match state.status {
635 : ComputeStatus::Init => {
636 2887 : if attempt == MAX_ATTEMPTS {
637 0 : bail!("compute startup timed out; still in Init state");
638 2887 : }
639 : // keep retrying
640 : }
641 : ComputeStatus::Running => {
642 : // All good!
643 576 : break;
644 : }
645 : ComputeStatus::Failed => {
646 8 : bail!(
647 8 : "compute startup failed: {}",
648 8 : state
649 8 : .error
650 8 : .as_deref()
651 8 : .unwrap_or("<no error from compute_ctl>")
652 8 : );
653 : }
654 : ComputeStatus::Empty
655 : | ComputeStatus::ConfigurationPending
656 : | ComputeStatus::Configuration => {
657 0 : bail!("unexpected compute status: {:?}", state.status)
658 : }
659 : }
660 : }
661 584 : Err(e) => {
662 584 : if attempt == MAX_ATTEMPTS {
663 0 : return Err(e).context("timed out waiting to connect to compute_ctl HTTP");
664 584 : }
665 : }
666 : }
667 3471 : std::thread::sleep(ATTEMPT_INTERVAL);
668 : }
669 :
670 : // disarm the scopeguard, let the child outlive this function (and neon_local invoction)
671 576 : drop(scopeguard::ScopeGuard::into_inner(child));
672 576 :
673 576 : Ok(())
674 584 : }
675 :
676 : // Call the /status HTTP API
677 4055 : pub async fn get_status(&self) -> Result<ComputeState> {
678 4055 : let client = reqwest::Client::new();
679 :
680 4055 : let response = client
681 4055 : .request(
682 4055 : reqwest::Method::GET,
683 4055 : format!(
684 4055 : "http://{}:{}/status",
685 4055 : self.http_address.ip(),
686 4055 : self.http_address.port()
687 4055 : ),
688 4055 : )
689 4055 : .send()
690 10997 : .await?;
691 :
692 : // Interpret the response
693 3471 : let status = response.status();
694 3471 : if !(status.is_client_error() || status.is_server_error()) {
695 3471 : Ok(response.json().await?)
696 : } else {
697 : // reqwest does not export its error construction utility functions, so let's craft the message ourselves
698 0 : let url = response.url().to_owned();
699 0 : let msg = match response.text().await {
700 0 : Ok(err_body) => format!("Error: {}", err_body),
701 0 : Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
702 : };
703 0 : Err(anyhow::anyhow!(msg))
704 : }
705 4055 : }
706 :
707 221 : pub async fn reconfigure(&self, mut pageservers: Vec<(Host, u16)>) -> Result<()> {
708 221 : let mut spec: ComputeSpec = {
709 221 : let spec_path = self.endpoint_path().join("spec.json");
710 221 : let file = std::fs::File::open(spec_path)?;
711 221 : serde_json::from_reader(file)?
712 : };
713 :
714 221 : let postgresql_conf = self.read_postgresql_conf()?;
715 221 : spec.cluster.postgresql_conf = Some(postgresql_conf);
716 221 :
717 221 : // If we weren't given explicit pageservers, query the attachment service
718 221 : if pageservers.is_empty() {
719 0 : let attachment_service = AttachmentService::from_env(&self.env);
720 0 : let locate_result = attachment_service.tenant_locate(self.tenant_id).await?;
721 0 : pageservers = locate_result
722 0 : .shards
723 0 : .into_iter()
724 0 : .map(|shard| {
725 0 : (
726 0 : Host::parse(&shard.listen_pg_addr)
727 0 : .expect("Attachment service reported bad hostname"),
728 0 : shard.listen_pg_port,
729 0 : )
730 0 : })
731 0 : .collect::<Vec<_>>();
732 221 : }
733 :
734 221 : let pageserver_connstr = Self::build_pageserver_connstr(&pageservers);
735 221 : assert!(!pageserver_connstr.is_empty());
736 221 : spec.pageserver_connstring = Some(pageserver_connstr);
737 221 :
738 221 : let client = reqwest::Client::new();
739 221 : let response = client
740 221 : .post(format!(
741 221 : "http://{}:{}/configure",
742 221 : self.http_address.ip(),
743 221 : self.http_address.port()
744 221 : ))
745 221 : .body(format!(
746 221 : "{{\"spec\":{}}}",
747 221 : serde_json::to_string_pretty(&spec)?
748 : ))
749 221 : .send()
750 663 : .await?;
751 :
752 221 : let status = response.status();
753 221 : if !(status.is_client_error() || status.is_server_error()) {
754 221 : Ok(())
755 : } else {
756 0 : let url = response.url().to_owned();
757 0 : let msg = match response.text().await {
758 0 : Ok(err_body) => format!("Error: {}", err_body),
759 0 : Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
760 : };
761 0 : Err(anyhow::anyhow!(msg))
762 : }
763 221 : }
764 :
765 576 : pub fn stop(&self, destroy: bool) -> Result<()> {
766 576 : // If we are going to destroy data directory,
767 576 : // use immediate shutdown mode, otherwise,
768 576 : // shutdown gracefully to leave the data directory sane.
769 576 : //
770 576 : // Postgres is always started from scratch, so stop
771 576 : // without destroy only used for testing and debugging.
772 576 : //
773 576 : self.pg_ctl(
774 576 : if destroy {
775 26 : &["-m", "immediate", "stop"]
776 : } else {
777 550 : &["stop"]
778 : },
779 576 : &None,
780 0 : )?;
781 :
782 : // Also wait for the compute_ctl process to die. It might have some
783 : // cleanup work to do after postgres stops, like syncing safekeepers,
784 : // etc.
785 : //
786 : // If destroying, send it SIGTERM before waiting. Sometimes we do *not*
787 : // want this cleanup: tests intentionally do stop when majority of
788 : // safekeepers is down, so sync-safekeepers would hang otherwise. This
789 : // could be a separate flag though.
790 576 : self.wait_for_compute_ctl_to_exit(destroy)?;
791 576 : if destroy {
792 26 : println!(
793 26 : "Destroying postgres data directory '{}'",
794 26 : self.pgdata().to_str().unwrap()
795 26 : );
796 26 : std::fs::remove_dir_all(self.endpoint_path())?;
797 550 : }
798 576 : Ok(())
799 576 : }
800 :
801 1168 : pub fn connstr(&self) -> String {
802 1168 : format!(
803 1168 : "postgresql://{}@{}:{}/{}",
804 1168 : "cloud_admin",
805 1168 : self.pg_address.ip(),
806 1168 : self.pg_address.port(),
807 1168 : "postgres"
808 1168 : )
809 1168 : }
810 : }
|