//! Code to manage compute endpoints
//!
//! In the local test environment, the data for each endpoint is stored in
//!
//! ```text
//! .neon/endpoints/<endpoint id>
//! ```
//!
//! Some basic information about the endpoint, like the tenant and timeline IDs,
//! is stored in the `endpoint.json` file. The `endpoint.json` file is created
//! when the endpoint is created, and doesn't change afterwards.
//!
//! The endpoint is managed by the `compute_ctl` binary. When an endpoint is
//! started, we launch `compute_ctl`. It synchronizes the safekeepers, downloads
//! the basebackup from the pageserver to initialize the data directory, and
//! finally launches the PostgreSQL process. It watches the PostgreSQL process
//! until it exits.
//!
//! When an endpoint is created, a `postgresql.conf` file is also created in
//! the endpoint's directory. The file can be modified before starting PostgreSQL.
//! However, the `postgresql.conf` file in the endpoint directory is not used directly
//! by PostgreSQL. It is passed to `compute_ctl`, and `compute_ctl` writes another
//! copy of it in the data directory.
//!
//! Directory contents:
//!
//! ```text
//! .neon/endpoints/main/
//!     compute.log         - log output of `compute_ctl` and `postgres`
//!     endpoint.json       - serialized `EndpointConf` struct
//!     postgresql.conf     - postgresql settings
//!     spec.json           - passed to `compute_ctl`
//!     pgdata/
//!         postgresql.conf - copy of postgresql.conf created by `compute_ctl`
//!         zenith.signal
//!         <other PostgreSQL files>
//! ```
//!
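//! For illustration, the `endpoint.json` of an endpoint named `main` might look
//! roughly like this (a sketch derived from the `EndpointConf` struct below;
//! the IDs, ports, and the exact encoding of `mode` are made-up examples):
//!
//! ```text
//! {
//!     "endpoint_id": "main",
//!     "tenant_id": "<tenant id>",
//!     "timeline_id": "<timeline id>",
//!     "mode": "Primary",
//!     "pg_port": 55432,
//!     "http_port": 55433,
//!     "pg_version": 15,
//!     "skip_pg_catalog_updates": true,
//!     "features": []
//! }
//! ```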
use std::collections::BTreeMap;
use std::net::SocketAddr;
use std::net::TcpStream;
use std::path::PathBuf;
use std::process::Command;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use anyhow::{anyhow, bail, Context, Result};
use compute_api::spec::Database;
use compute_api::spec::PgIdent;
use compute_api::spec::RemoteExtSpec;
use compute_api::spec::Role;
use nix::sys::signal::kill;
use nix::sys::signal::Signal;
use serde::{Deserialize, Serialize};
use url::Host;
use utils::id::{NodeId, TenantId, TimelineId};

use crate::attachment_service::AttachmentService;
use crate::local_env::LocalEnv;
use crate::postgresql_conf::PostgresConf;

use compute_api::responses::{ComputeState, ComputeStatus};
use compute_api::spec::{Cluster, ComputeFeature, ComputeMode, ComputeSpec};

// Contents of an `endpoint.json` file.
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
pub struct EndpointConf {
    endpoint_id: String,
    tenant_id: TenantId,
    timeline_id: TimelineId,
    mode: ComputeMode,
    pg_port: u16,
    http_port: u16,
    pg_version: u32,
    skip_pg_catalog_updates: bool,
    features: Vec<ComputeFeature>,
}

//
// ComputeControlPlane
//
pub struct ComputeControlPlane {
    base_port: u16,

    // endpoint ID is the key
    pub endpoints: BTreeMap<String, Arc<Endpoint>>,

    env: LocalEnv,
}

impl ComputeControlPlane {
    // Load current endpoints from the endpoints/ subdirectories
    pub fn load(env: LocalEnv) -> Result<ComputeControlPlane> {
        let mut endpoints = BTreeMap::default();
        for endpoint_dir in std::fs::read_dir(env.endpoints_path())
            .with_context(|| format!("failed to list {}", env.endpoints_path().display()))?
        {
            let ep = Endpoint::from_dir_entry(endpoint_dir?, &env)?;
            endpoints.insert(ep.endpoint_id.clone(), Arc::new(ep));
        }

        Ok(ComputeControlPlane {
            base_port: 55431,
            endpoints,
            env,
        })
    }

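    /// Pick a port for a new endpoint: one above the highest Postgres or HTTP
    /// port already assigned to any endpoint, starting from `base_port` when
    /// there are no endpoints yet.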
    fn get_port(&mut self) -> u16 {
        1 + self
            .endpoints
            .values()
            .map(|ep| std::cmp::max(ep.pg_address.port(), ep.http_address.port()))
            .max()
            .unwrap_or(self.base_port)
    }

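    /// Register a new endpoint: create its directory under `.neon/endpoints/`
    /// and write the initial `endpoint.json` and `postgresql.conf`. This only
    /// creates the on-disk state; the endpoint is started separately with
    /// [`Endpoint::start`].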
    #[allow(clippy::too_many_arguments)]
    pub fn new_endpoint(
        &mut self,
        endpoint_id: &str,
        tenant_id: TenantId,
        timeline_id: TimelineId,
        pg_port: Option<u16>,
        http_port: Option<u16>,
        pg_version: u32,
        mode: ComputeMode,
        skip_pg_catalog_updates: bool,
    ) -> Result<Arc<Endpoint>> {
        let pg_port = pg_port.unwrap_or_else(|| self.get_port());
        let http_port = http_port.unwrap_or_else(|| self.get_port() + 1);
        let ep = Arc::new(Endpoint {
            endpoint_id: endpoint_id.to_owned(),
            pg_address: SocketAddr::new("127.0.0.1".parse().unwrap(), pg_port),
            http_address: SocketAddr::new("127.0.0.1".parse().unwrap(), http_port),
            env: self.env.clone(),
            timeline_id,
            mode,
            tenant_id,
            pg_version,
            // We don't set up roles and databases in the spec locally, so we don't
            // need to do catalog updates. Catalog updates also include the creation
            // of availability-check data. However, we have tests that check that the
            // size and db dump are the same before and after a start. So, skip
            // catalog updates; with this we essentially test the case of waking up
            // an idle compute, where we also skip catalog updates in the cloud.
            skip_pg_catalog_updates,
            features: vec![],
        });

        ep.create_endpoint_dir()?;
        std::fs::write(
            ep.endpoint_path().join("endpoint.json"),
            serde_json::to_string_pretty(&EndpointConf {
                endpoint_id: endpoint_id.to_string(),
                tenant_id,
                timeline_id,
                mode,
                http_port,
                pg_port,
                pg_version,
                skip_pg_catalog_updates,
                features: vec![],
            })?,
        )?;
        std::fs::write(
            ep.endpoint_path().join("postgresql.conf"),
            ep.setup_pg_conf()?.to_string(),
        )?;

        self.endpoints
            .insert(ep.endpoint_id.clone(), Arc::clone(&ep));

        Ok(ep)
    }

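    /// Best-effort check that no other non-stopped primary endpoint exists for
    /// the same tenant and timeline. Running two primaries against one timeline
    /// is not supported.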
    pub fn check_conflicting_endpoints(
        &self,
        mode: ComputeMode,
        tenant_id: TenantId,
        timeline_id: TimelineId,
    ) -> Result<()> {
        if matches!(mode, ComputeMode::Primary) {
            // This check is not complete: two concurrent attempts at creating a
            // primary could both read the state before checking it here. But it's
            // better than nothing.
            let mut duplicates = self.endpoints.iter().filter(|(_k, v)| {
                v.tenant_id == tenant_id
                    && v.timeline_id == timeline_id
                    && v.mode == mode
                    && v.status() != EndpointStatus::Stopped
            });

            if let Some((key, _)) = duplicates.next() {
                bail!("attempting to create a duplicate primary endpoint on tenant {tenant_id}, timeline {timeline_id}: endpoint {key:?} exists already. please don't do this, it is not supported.");
            }
        }
        Ok(())
    }
}

///////////////////////////////////////////////////////////////////////////////

#[derive(Debug)]
pub struct Endpoint {
    /// used as the directory name
    endpoint_id: String,
    pub tenant_id: TenantId,
    pub timeline_id: TimelineId,
    pub mode: ComputeMode,

    // port and address of the Postgres server and `compute_ctl`'s HTTP API
    pub pg_address: SocketAddr,
    pub http_address: SocketAddr,

    // postgres major version in the format: 14, 15, etc.
    pg_version: u32,

    // These are not part of the endpoint as such, but the environment
    // the endpoint runs in.
    pub env: LocalEnv,

    // Optimizations
    skip_pg_catalog_updates: bool,

    // Feature flags
    features: Vec<ComputeFeature>,
}

#[derive(PartialEq, Eq)]
pub enum EndpointStatus {
    Running,
    Stopped,
    Crashed,
    RunningNoPidfile,
}

impl std::fmt::Display for EndpointStatus {
    fn fmt(&self, writer: &mut std::fmt::Formatter) -> std::fmt::Result {
        let s = match self {
            Self::Running => "running",
            Self::Stopped => "stopped",
            Self::Crashed => "crashed",
            Self::RunningNoPidfile => "running, no pidfile",
        };
        write!(writer, "{}", s)
    }
}

impl Endpoint {
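    /// Reconstruct an `Endpoint` from an existing `.neon/endpoints/<endpoint id>`
    /// directory, reading its configuration back from `endpoint.json`.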
    fn from_dir_entry(entry: std::fs::DirEntry, env: &LocalEnv) -> Result<Endpoint> {
        if !entry.file_type()?.is_dir() {
            anyhow::bail!(
                "Endpoint::from_dir_entry failed: '{}' is not a directory",
                entry.path().display()
            );
        }

        // parse data directory name
        let fname = entry.file_name();
        let endpoint_id = fname.to_str().unwrap().to_string();

        // Read the endpoint.json file
        let conf: EndpointConf =
            serde_json::from_slice(&std::fs::read(entry.path().join("endpoint.json"))?)?;

        Ok(Endpoint {
            pg_address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.pg_port),
            http_address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.http_port),
            endpoint_id,
            env: env.clone(),
            timeline_id: conf.timeline_id,
            mode: conf.mode,
            tenant_id: conf.tenant_id,
            pg_version: conf.pg_version,
            skip_pg_catalog_updates: conf.skip_pg_catalog_updates,
            features: conf.features,
        })
    }

    fn create_endpoint_dir(&self) -> Result<()> {
        std::fs::create_dir_all(self.endpoint_path()).with_context(|| {
            format!(
                "could not create endpoint directory {}",
                self.endpoint_path().display()
            )
        })
    }

    // Generate postgresql.conf with default configuration
    fn setup_pg_conf(&self) -> Result<PostgresConf> {
        let mut conf = PostgresConf::new();
        conf.append("max_wal_senders", "10");
        conf.append("wal_log_hints", "off");
        conf.append("max_replication_slots", "10");
        conf.append("hot_standby", "on");
        conf.append("shared_buffers", "1MB");
        conf.append("fsync", "off");
        conf.append("max_connections", "100");
        conf.append("wal_level", "logical");
        // wal_sender_timeout is the maximum time to wait for WAL replication.
        // It also defines how often the walreceiver will send a feedback message to the wal sender.
        conf.append("wal_sender_timeout", "5s");
        conf.append("listen_addresses", &self.pg_address.ip().to_string());
        conf.append("port", &self.pg_address.port().to_string());
        conf.append("wal_keep_size", "0");
        // walproposer panics when basebackup is invalid, it is pointless to restart in this case.
        conf.append("restart_after_crash", "off");

        // Load the 'neon' extension
        conf.append("shared_preload_libraries", "neon");

        conf.append_line("");
        // Replication-related configurations, such as WAL sending
        match &self.mode {
            ComputeMode::Primary => {
                // Configure backpressure
                // - Replication write lag depends on how fast the walreceiver can process incoming WAL.
                //   This lag determines latency of get_page_at_lsn. Speed of applying WAL is about 10MB/sec,
                //   so to avoid expiration of 1 minute timeout, this lag should not be larger than 600MB.
                //   Actually latency should be much smaller (better if < 1sec). But we assume that recently
                //   updated pages are not requested from pageserver.
                // - Replication flush lag depends on speed of persisting data by checkpointer (creation of
                //   delta/image layers) and advancing disk_consistent_lsn. Safekeepers are able to
                //   remove/archive WAL only beyond disk_consistent_lsn. Too large a lag can cause long
                //   recovery time (in case of pageserver crash) and disk space overflow at safekeepers.
                // - Replication apply lag depends on speed of uploading changes to S3 by the uploader thread.
                //   To be able to restore the database in case of pageserver node crash, safekeepers should not
                //   remove WAL beyond this point. Too large a lag can cause space exhaustion in safekeepers
                //   (if they are not able to upload WAL to S3).
                conf.append("max_replication_write_lag", "15MB");
                conf.append("max_replication_flush_lag", "10GB");

                if !self.env.safekeepers.is_empty() {
                    // Configure Postgres to connect to the safekeepers
                    conf.append("synchronous_standby_names", "walproposer");

                    let safekeepers = self
                        .env
                        .safekeepers
                        .iter()
                        .map(|sk| format!("localhost:{}", sk.get_compute_port()))
                        .collect::<Vec<String>>()
                        .join(",");
                    conf.append("neon.safekeepers", &safekeepers);
                } else {
                    // We only use a setup without safekeepers for tests,
                    // and don't care about data durability on pageserver,
                    // so set a more relaxed synchronous_commit.
                    conf.append("synchronous_commit", "remote_write");

                    // Configure the node to stream WAL directly to the pageserver.
                    // This isn't really a supported configuration, but can be useful for
                    // testing.
                    conf.append("synchronous_standby_names", "pageserver");
                }
            }
            ComputeMode::Static(lsn) => {
                conf.append("recovery_target_lsn", &lsn.to_string());
            }
            ComputeMode::Replica => {
                assert!(!self.env.safekeepers.is_empty());

                // TODO: use future host field from safekeeper spec
                // Pass the list of safekeepers to the replica so that it can connect to any of them,
                // whichever is available.
                let sk_ports = self
                    .env
                    .safekeepers
                    .iter()
                    .map(|x| x.get_compute_port().to_string())
                    .collect::<Vec<_>>()
                    .join(",");
                let sk_hosts = vec!["localhost"; self.env.safekeepers.len()].join(",");

                let connstr = format!(
                    "host={} port={} options='-c timeline_id={} tenant_id={}' application_name=replica replication=true",
                    sk_hosts,
                    sk_ports,
                    &self.timeline_id.to_string(),
                    &self.tenant_id.to_string(),
                );

                let slot_name = format!("repl_{}_", self.timeline_id);
                conf.append("primary_conninfo", connstr.as_str());
                conf.append("primary_slot_name", slot_name.as_str());
                conf.append("hot_standby", "on");
                // Prefetching of blocks referenced in WAL doesn't make sense for us:
                // Neon hot standby ignores pages that are not in shared_buffers.
                if self.pg_version >= 15 {
                    conf.append("recovery_prefetch", "off");
                }
            }
        }

        Ok(conf)
    }

    pub fn endpoint_path(&self) -> PathBuf {
        self.env.endpoints_path().join(&self.endpoint_id)
    }

    pub fn pgdata(&self) -> PathBuf {
        self.endpoint_path().join("pgdata")
    }

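    /// Determine the endpoint's status from two observations: whether a
    /// `postmaster.pid` file exists in the data directory, and whether a TCP
    /// connection to the Postgres port succeeds within a short timeout.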
    pub fn status(&self) -> EndpointStatus {
        let timeout = Duration::from_millis(300);
        let has_pidfile = self.pgdata().join("postmaster.pid").exists();
        let can_connect = TcpStream::connect_timeout(&self.pg_address, timeout).is_ok();

        match (has_pidfile, can_connect) {
            (true, true) => EndpointStatus::Running,
            (false, false) => EndpointStatus::Stopped,
            (true, false) => EndpointStatus::Crashed,
            (false, true) => EndpointStatus::RunningNoPidfile,
        }
    }

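    /// Run `pg_ctl` from this endpoint's Postgres distribution against its data
    /// directory, waiting (`-w`) until the requested operation completes.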
    fn pg_ctl(&self, args: &[&str], auth_token: &Option<String>) -> Result<()> {
        let pg_ctl_path = self.env.pg_bin_dir(self.pg_version)?.join("pg_ctl");
        let mut cmd = Command::new(&pg_ctl_path);
        cmd.args(
            [
                &[
                    "-D",
                    self.pgdata().to_str().unwrap(),
                    "-w", // wait till pg_ctl actually does what was asked
                ],
                args,
            ]
            .concat(),
        )
        .env_clear()
        .env(
            "LD_LIBRARY_PATH",
            self.env.pg_lib_dir(self.pg_version)?.to_str().unwrap(),
        )
        .env(
            "DYLD_LIBRARY_PATH",
            self.env.pg_lib_dir(self.pg_version)?.to_str().unwrap(),
        );

        // Pass authentication token used for the connections to pageserver and safekeepers
        if let Some(token) = auth_token {
            cmd.env("NEON_AUTH_TOKEN", token);
        }

        let pg_ctl = cmd
            .output()
            .context(format!("{} failed", pg_ctl_path.display()))?;
        if !pg_ctl.status.success() {
            anyhow::bail!(
                "pg_ctl failed, exit code: {}, stdout: {}, stderr: {}",
                pg_ctl.status,
                String::from_utf8_lossy(&pg_ctl.stdout),
                String::from_utf8_lossy(&pg_ctl.stderr),
            );
        }

        Ok(())
    }

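    /// Wait for the `compute_ctl` process recorded in `compute_ctl.pid` to
    /// exit, optionally nudging it with SIGTERM first.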
    fn wait_for_compute_ctl_to_exit(&self, send_sigterm: bool) -> Result<()> {
        // TODO use background_process::stop_process instead: https://github.com/neondatabase/neon/pull/6482
        let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
        let pid: u32 = std::fs::read_to_string(pidfile_path)?.parse()?;
        let pid = nix::unistd::Pid::from_raw(pid as i32);
        if send_sigterm {
            kill(pid, Signal::SIGTERM).ok();
        }
        crate::background_process::wait_until_stopped("compute_ctl", pid)?;
        Ok(())
    }

    fn read_postgresql_conf(&self) -> Result<String> {
        // Slurp the endpoints/<endpoint id>/postgresql.conf file into
        // memory. We will include it in the spec file that we pass to
        // `compute_ctl`, and `compute_ctl` will write it to the postgresql.conf
        // in the data directory.
        let postgresql_conf_path = self.endpoint_path().join("postgresql.conf");
        match std::fs::read(&postgresql_conf_path) {
            Ok(content) => Ok(String::from_utf8(content)?),
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok("".to_string()),
            Err(e) => Err(anyhow::Error::new(e).context(format!(
                "failed to read config file in {}",
                postgresql_conf_path.to_str().unwrap()
            ))),
        }
    }

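    /// Build the pageserver connection string passed to `compute_ctl`: one
    /// `postgresql://` URL per pageserver, joined with commas.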
    fn build_pageserver_connstr(pageservers: &[(Host, u16)]) -> String {
        pageservers
            .iter()
            .map(|(host, port)| format!("postgresql://no_user@{host}:{port}"))
            .collect::<Vec<_>>()
            .join(",")
    }

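    /// Start the endpoint: write `spec.json`, launch `compute_ctl` (which syncs
    /// the safekeepers, fetches the basebackup, and starts Postgres), record its
    /// pid in `compute_ctl.pid`, and poll the `/status` API until the compute
    /// reports `Running` (or fails, or a ~30 s timeout expires).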
    pub async fn start(
        &self,
        auth_token: &Option<String>,
        safekeepers: Vec<NodeId>,
        pageservers: Vec<(Host, u16)>,
        remote_ext_config: Option<&String>,
        shard_stripe_size: usize,
        create_test_user: bool,
    ) -> Result<()> {
        if self.status() == EndpointStatus::Running {
            anyhow::bail!("The endpoint is already running");
        }

        let postgresql_conf = self.read_postgresql_conf()?;

        // We always start the compute node from scratch, so if the Postgres
        // data dir exists from a previous launch, remove it first.
        if self.pgdata().exists() {
            std::fs::remove_dir_all(self.pgdata())?;
        }

        let pageserver_connstring = Self::build_pageserver_connstr(&pageservers);
        assert!(!pageserver_connstring.is_empty());

        let mut safekeeper_connstrings = Vec::new();
        if self.mode == ComputeMode::Primary {
            for sk_id in safekeepers {
                let sk = self
                    .env
                    .safekeepers
                    .iter()
                    .find(|node| node.id == sk_id)
                    .ok_or_else(|| anyhow!("safekeeper {sk_id} does not exist"))?;
                safekeeper_connstrings.push(format!("127.0.0.1:{}", sk.get_compute_port()));
            }
        }

        // Check for the remote_extensions_spec.json file; if it is present,
        // read it and pass it to compute_ctl.
        let remote_extensions_spec_path = self.endpoint_path().join("remote_extensions_spec.json");
        let remote_extensions: Option<RemoteExtSpec> =
            match std::fs::File::open(remote_extensions_spec_path) {
                Ok(spec_file) => serde_json::from_reader(spec_file).ok(),
                Err(_) => None,
            };

        // Create spec file
        let spec = ComputeSpec {
            skip_pg_catalog_updates: self.skip_pg_catalog_updates,
            format_version: 1.0,
            operation_uuid: None,
            features: self.features.clone(),
            cluster: Cluster {
                cluster_id: None, // project ID: not used
                name: None,       // project name: not used
                state: None,
                roles: if create_test_user {
                    vec![Role {
                        name: PgIdent::from_str("test").unwrap(),
                        encrypted_password: None,
                        options: None,
                    }]
                } else {
                    Vec::new()
                },
                databases: if create_test_user {
                    vec![Database {
                        name: PgIdent::from_str("neondb").unwrap(),
                        owner: PgIdent::from_str("test").unwrap(),
                        options: None,
                        restrict_conn: false,
                        invalid: false,
                    }]
                } else {
                    Vec::new()
                },
                settings: None,
                postgresql_conf: Some(postgresql_conf),
            },
            delta_operations: None,
            tenant_id: Some(self.tenant_id),
            timeline_id: Some(self.timeline_id),
            mode: self.mode,
            pageserver_connstring: Some(pageserver_connstring),
            safekeeper_connstrings,
            storage_auth_token: auth_token.clone(),
            remote_extensions,
            pgbouncer_settings: None,
            shard_stripe_size: Some(shard_stripe_size),
            primary_is_running: None,
        };
        let spec_path = self.endpoint_path().join("spec.json");
        std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;

        // Open log file. We'll redirect the stdout and stderr of `compute_ctl` to it.
        let logfile = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(self.endpoint_path().join("compute.log"))?;

        // Launch compute_ctl
        let conn_str = self.connstr("cloud_admin", "postgres");
        println!("Starting postgres node at '{}'", conn_str);
        if create_test_user {
            let conn_str = self.connstr("user", "neondb");
            println!("Also at '{}'", conn_str);
        }
        let mut cmd = Command::new(self.env.neon_distrib_dir.join("compute_ctl"));
        cmd.args(["--http-port", &self.http_address.port().to_string()])
            .args(["--pgdata", self.pgdata().to_str().unwrap()])
            .args(["--connstr", &conn_str])
            .args([
                "--spec-path",
                self.endpoint_path().join("spec.json").to_str().unwrap(),
            ])
            .args([
                "--pgbin",
                self.env
                    .pg_bin_dir(self.pg_version)?
                    .join("postgres")
                    .to_str()
                    .unwrap(),
            ])
            .stdin(std::process::Stdio::null())
            .stderr(logfile.try_clone()?)
            .stdout(logfile);

        if let Some(remote_ext_config) = remote_ext_config {
            cmd.args(["--remote-ext-config", remote_ext_config]);
        }

        let child = cmd.spawn()?;
        // Set up a scopeguard to kill & wait for the child in case we panic or bail below
        let child = scopeguard::guard(child, |mut child| {
            println!("SIGKILL & wait the started process");
            (|| {
                // TODO: use another signal that can be caught by the child so it can clean up any children it spawned
                child.kill().context("SIGKILL child")?;
                child.wait().context("wait() for child process")?;
                anyhow::Ok(())
            })()
            .with_context(|| format!("scopeguard kill&wait child {child:?}"))
            .unwrap();
        });

        // Write down the pid so we can wait for it when we want to stop
        // TODO use background_process::start_process instead: https://github.com/neondatabase/neon/pull/6482
        let pid = child.id();
        let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
        std::fs::write(pidfile_path, pid.to_string())?;

        // Wait for it to start
        let mut attempt = 0;
        const ATTEMPT_INTERVAL: Duration = Duration::from_millis(100);
        const MAX_ATTEMPTS: u32 = 10 * 30; // Wait up to 30 s
        loop {
            attempt += 1;
            match self.get_status().await {
                Ok(state) => {
                    match state.status {
                        ComputeStatus::Init => {
                            if attempt == MAX_ATTEMPTS {
                                bail!("compute startup timed out; still in Init state");
                            }
                            // keep retrying
                        }
                        ComputeStatus::Running => {
                            // All good!
                            break;
                        }
                        ComputeStatus::Failed => {
                            bail!(
                                "compute startup failed: {}",
                                state
                                    .error
                                    .as_deref()
                                    .unwrap_or("<no error from compute_ctl>")
                            );
                        }
                        ComputeStatus::Empty
                        | ComputeStatus::ConfigurationPending
                        | ComputeStatus::Configuration
                        | ComputeStatus::TerminationPending
                        | ComputeStatus::Terminated => {
                            bail!("unexpected compute status: {:?}", state.status)
                        }
                    }
                }
                Err(e) => {
                    if attempt == MAX_ATTEMPTS {
                        return Err(e).context("timed out waiting to connect to compute_ctl HTTP");
                    }
                }
            }
            std::thread::sleep(ATTEMPT_INTERVAL);
        }

        // Disarm the scopeguard, let the child outlive this function (and the neon_local invocation)
        drop(scopeguard::ScopeGuard::into_inner(child));

        Ok(())
    }

    // Call the /status HTTP API
    pub async fn get_status(&self) -> Result<ComputeState> {
        let client = reqwest::Client::new();

        let response = client
            .request(
                reqwest::Method::GET,
                format!(
                    "http://{}:{}/status",
                    self.http_address.ip(),
                    self.http_address.port()
                ),
            )
            .send()
            .await?;

        // Interpret the response
        let status = response.status();
        if !(status.is_client_error() || status.is_server_error()) {
            Ok(response.json().await?)
        } else {
            // reqwest does not export its error construction utility functions, so let's craft the message ourselves
            let url = response.url().to_owned();
            let msg = match response.text().await {
                Ok(err_body) => format!("Error: {}", err_body),
                Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
            };
            Err(anyhow::anyhow!(msg))
        }
    }

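    /// Reconfigure a running endpoint: refresh the spec with the current
    /// `postgresql.conf` and pageserver addresses (querying the attachment
    /// service if none were given) and POST it to `compute_ctl`'s `/configure`
    /// API.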
    pub async fn reconfigure(&self, mut pageservers: Vec<(Host, u16)>) -> Result<()> {
        let mut spec: ComputeSpec = {
            let spec_path = self.endpoint_path().join("spec.json");
            let file = std::fs::File::open(spec_path)?;
            serde_json::from_reader(file)?
        };

        let postgresql_conf = self.read_postgresql_conf()?;
        spec.cluster.postgresql_conf = Some(postgresql_conf);

        // If we weren't given explicit pageservers, query the attachment service
        if pageservers.is_empty() {
            let attachment_service = AttachmentService::from_env(&self.env);
            let locate_result = attachment_service.tenant_locate(self.tenant_id).await?;
            pageservers = locate_result
                .shards
                .into_iter()
                .map(|shard| {
                    (
                        Host::parse(&shard.listen_pg_addr)
                            .expect("Attachment service reported bad hostname"),
                        shard.listen_pg_port,
                    )
                })
                .collect::<Vec<_>>();
        }

        let pageserver_connstr = Self::build_pageserver_connstr(&pageservers);
        assert!(!pageserver_connstr.is_empty());
        spec.pageserver_connstring = Some(pageserver_connstr);

        let client = reqwest::Client::new();
        let response = client
            .post(format!(
                "http://{}:{}/configure",
                self.http_address.ip(),
                self.http_address.port()
            ))
            .body(format!(
                "{{\"spec\":{}}}",
                serde_json::to_string_pretty(&spec)?
            ))
            .send()
            .await?;

        let status = response.status();
        if !(status.is_client_error() || status.is_server_error()) {
            Ok(())
        } else {
            let url = response.url().to_owned();
            let msg = match response.text().await {
                Ok(err_body) => format!("Error: {}", err_body),
                Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
            };
            Err(anyhow::anyhow!(msg))
        }
    }

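    /// Stop Postgres with the given `pg_ctl` shutdown mode and wait for
    /// `compute_ctl` to exit. With `destroy`, also SIGTERM `compute_ctl` and
    /// remove the whole endpoint directory.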
    pub fn stop(&self, mode: &str, destroy: bool) -> Result<()> {
        self.pg_ctl(&["-m", mode, "stop"], &None)?;

        // Also wait for the compute_ctl process to die. It might have some
        // cleanup work to do after postgres stops, like syncing safekeepers,
        // etc.
        //
        // If destroying, send it SIGTERM before waiting. Sometimes we do *not*
        // want this cleanup: tests intentionally stop when a majority of
        // safekeepers is down, so sync-safekeepers would hang otherwise. This
        // could be a separate flag though.
        self.wait_for_compute_ctl_to_exit(destroy)?;
        if destroy {
            println!(
                "Destroying postgres data directory '{}'",
                self.pgdata().to_str().unwrap()
            );
            std::fs::remove_dir_all(self.endpoint_path())?;
        }
        Ok(())
    }

    pub fn connstr(&self, user: &str, db_name: &str) -> String {
        format!(
            "postgresql://{}@{}:{}/{}",
            user,
            self.pg_address.ip(),
            self.pg_address.port(),
            db_name
        )
    }
}