//! Code to manage compute endpoints
//!
//! In the local test environment, the data for each endpoint is stored in
//!
//! ```text
//! .neon/endpoints/<endpoint id>
//! ```
//!
//! Some basic information about the endpoint, like the tenant and timeline IDs,
//! is stored in the `endpoint.json` file. The `endpoint.json` file is created
//! when the endpoint is created, and doesn't change afterwards.
//!
//! The endpoint is managed by the `compute_ctl` binary. When an endpoint is
//! started, we launch `compute_ctl`. It synchronizes the safekeepers, downloads
//! the basebackup from the pageserver to initialize the data directory, and
//! finally launches the PostgreSQL process. It watches the PostgreSQL process
//! until it exits.
//!
//! When an endpoint is created, a `postgresql.conf` file is also created in
//! the endpoint's directory. The file can be modified before starting PostgreSQL.
//! However, the `postgresql.conf` file in the endpoint directory is not used directly
//! by PostgreSQL. It is passed to `compute_ctl`, and `compute_ctl` writes another
//! copy of it in the data directory.
//!
//! Directory contents:
//!
//! ```text
//! .neon/endpoints/main/
//!     compute.log       - log output of `compute_ctl` and `postgres`
//!     endpoint.json     - serialized `EndpointConf` struct
//!     postgresql.conf   - postgresql settings
//!     spec.json         - passed to `compute_ctl`
//!     pgdata/
//!         postgresql.conf  - copy of postgresql.conf created by `compute_ctl`
//!         zenith.signal
//!         <other PostgreSQL files>
//! ```
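//!
//! For illustration, an `endpoint.json` (a serialized `EndpointConf`, see
//! below) might look roughly like the following; the field values, and the
//! exact serialized shape of enum fields like `mode`, are hypothetical:
//!
//! ```text
//! {
//!     "endpoint_id": "main",
//!     "tenant_id": "1f9d52e2d6c5e4a0a8f2e9b3b6f0c7d1",
//!     "timeline_id": "4e2f8c1b9a3d5e6f7a8b9c0d1e2f3a4b",
//!     "mode": "Primary",
//!     "pg_port": 55432,
//!     "external_http_port": 55433,
//!     "internal_http_port": 55434,
//!     "pg_version": 16,
//!     ...
//! }
//! ```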

use std::collections::BTreeMap;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream};
use std::path::PathBuf;
use std::process::Command;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

use anyhow::{Context, Result, anyhow, bail};
use compute_api::requests::ConfigurationRequest;
use compute_api::responses::{ComputeCtlConfig, ComputeStatus, ComputeStatusResponse};
use compute_api::spec::{
    Cluster, ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, Database, PgIdent,
    RemoteExtSpec, Role,
};
use nix::sys::signal::{Signal, kill};
use pageserver_api::shard::ShardStripeSize;
use reqwest::header::CONTENT_TYPE;
use safekeeper_api::membership::SafekeeperGeneration;
use serde::{Deserialize, Serialize};
use tracing::debug;
use url::Host;
use utils::id::{NodeId, TenantId, TimelineId};

use crate::local_env::LocalEnv;
use crate::postgresql_conf::PostgresConf;
use crate::storage_controller::StorageController;

// Contents of an `endpoint.json` file
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
pub struct EndpointConf {
    endpoint_id: String,
    tenant_id: TenantId,
    timeline_id: TimelineId,
    mode: ComputeMode,
    pg_port: u16,
    external_http_port: u16,
    internal_http_port: u16,
    pg_version: u32,
    skip_pg_catalog_updates: bool,
    reconfigure_concurrency: usize,
    drop_subscriptions_before_start: bool,
    features: Vec<ComputeFeature>,
    cluster: Option<Cluster>,
}

//
// ComputeControlPlane
//
pub struct ComputeControlPlane {
    base_port: u16,

    // endpoint ID is the key
    pub endpoints: BTreeMap<String, Arc<Endpoint>>,

    env: LocalEnv,
}
96 :
97 : impl ComputeControlPlane {
98 : // Load current endpoints from the endpoints/ subdirectories
99 0 : pub fn load(env: LocalEnv) -> Result<ComputeControlPlane> {
100 0 : let mut endpoints = BTreeMap::default();
101 0 : for endpoint_dir in std::fs::read_dir(env.endpoints_path())
102 0 : .with_context(|| format!("failed to list {}", env.endpoints_path().display()))?
103 : {
104 0 : let ep_res = Endpoint::from_dir_entry(endpoint_dir?, &env);
105 0 : let ep = match ep_res {
106 0 : Ok(ep) => ep,
107 0 : Err(e) => match e.downcast::<std::io::Error>() {
108 0 : Ok(e) => {
109 0 : // A parallel task could delete an endpoint while we have just scanned the directory
110 0 : if e.kind() == std::io::ErrorKind::NotFound {
111 0 : continue;
112 : } else {
113 0 : Err(e)?
114 : }
115 : }
116 0 : Err(e) => Err(e)?,
117 : },
118 : };
119 0 : endpoints.insert(ep.endpoint_id.clone(), Arc::new(ep));
120 : }
121 :
122 0 : Ok(ComputeControlPlane {
123 0 : base_port: 55431,
124 0 : endpoints,
125 0 : env,
126 0 : })
127 0 : }
128 :
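    /// Pick a port for a new endpoint: one above the highest port already in
    /// use by any existing endpoint (considering both the Postgres port and
    /// the external HTTP port), or `base_port + 1` if there are none yet.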
    fn get_port(&mut self) -> u16 {
        1 + self
            .endpoints
            .values()
            .map(|ep| std::cmp::max(ep.pg_address.port(), ep.external_http_address.port()))
            .max()
            .unwrap_or(self.base_port)
    }

    #[allow(clippy::too_many_arguments)]
    pub fn new_endpoint(
        &mut self,
        endpoint_id: &str,
        tenant_id: TenantId,
        timeline_id: TimelineId,
        pg_port: Option<u16>,
        external_http_port: Option<u16>,
        internal_http_port: Option<u16>,
        pg_version: u32,
        mode: ComputeMode,
        skip_pg_catalog_updates: bool,
        drop_subscriptions_before_start: bool,
    ) -> Result<Arc<Endpoint>> {
        let pg_port = pg_port.unwrap_or_else(|| self.get_port());
        let external_http_port = external_http_port.unwrap_or_else(|| self.get_port() + 1);
        let internal_http_port = internal_http_port.unwrap_or_else(|| external_http_port + 1);
        let ep = Arc::new(Endpoint {
            endpoint_id: endpoint_id.to_owned(),
            pg_address: SocketAddr::new(IpAddr::from(Ipv4Addr::LOCALHOST), pg_port),
            external_http_address: SocketAddr::new(
                IpAddr::from(Ipv4Addr::UNSPECIFIED),
                external_http_port,
            ),
            internal_http_address: SocketAddr::new(
                IpAddr::from(Ipv4Addr::LOCALHOST),
                internal_http_port,
            ),
            env: self.env.clone(),
            timeline_id,
            mode,
            tenant_id,
            pg_version,
            // We don't set up roles and databases in the spec locally, so we don't
            // need to do catalog updates. Catalog updates also include creating the
            // availability-check data. Yet, we have tests that check that the size
            // and db dump before and after start are the same. So, skip catalog
            // updates; with this we basically test the case of waking up an idle
            // compute, where we also skip catalog updates in the cloud.
            skip_pg_catalog_updates,
            drop_subscriptions_before_start,
            reconfigure_concurrency: 1,
            features: vec![],
            cluster: None,
        });

        ep.create_endpoint_dir()?;
        std::fs::write(
            ep.endpoint_path().join("endpoint.json"),
            serde_json::to_string_pretty(&EndpointConf {
                endpoint_id: endpoint_id.to_string(),
                tenant_id,
                timeline_id,
                mode,
                external_http_port,
                internal_http_port,
                pg_port,
                pg_version,
                skip_pg_catalog_updates,
                drop_subscriptions_before_start,
                reconfigure_concurrency: 1,
                features: vec![],
                cluster: None,
            })?,
        )?;
        std::fs::write(
            ep.endpoint_path().join("postgresql.conf"),
            ep.setup_pg_conf()?.to_string(),
        )?;

        self.endpoints
            .insert(ep.endpoint_id.clone(), Arc::clone(&ep));

        Ok(ep)
    }

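    /// Best-effort check that no other primary endpoint is already running on
    /// the same tenant and timeline. See the comment inside on why this check
    /// is inherently racy.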
    pub fn check_conflicting_endpoints(
        &self,
        mode: ComputeMode,
        tenant_id: TenantId,
        timeline_id: TimelineId,
    ) -> Result<()> {
        if matches!(mode, ComputeMode::Primary) {
            // This check is not complete, as you could have a concurrent attempt at
            // creating another primary, both reading the state before checking it here,
            // but it's better than nothing.
            let mut duplicates = self.endpoints.iter().filter(|(_k, v)| {
                v.tenant_id == tenant_id
                    && v.timeline_id == timeline_id
                    && v.mode == mode
                    && v.status() != EndpointStatus::Stopped
            });

            if let Some((key, _)) = duplicates.next() {
                bail!(
                    "attempting to create a duplicate primary endpoint on tenant {tenant_id}, timeline {timeline_id}: endpoint {key:?} exists already. please don't do this, it is not supported."
                );
            }
        }
        Ok(())
    }
}

///////////////////////////////////////////////////////////////////////////////

#[derive(Debug)]
pub struct Endpoint {
    /// used as the directory name
    endpoint_id: String,
    pub tenant_id: TenantId,
    pub timeline_id: TimelineId,
    pub mode: ComputeMode,

    // port and address of the Postgres server and `compute_ctl`'s HTTP APIs
    pub pg_address: SocketAddr,
    pub external_http_address: SocketAddr,
    pub internal_http_address: SocketAddr,

    // postgres major version in the format: 14, 15, etc.
    pg_version: u32,

    // These are not part of the endpoint as such, but the environment
    // the endpoint runs in.
    pub env: LocalEnv,

    // Optimizations
    skip_pg_catalog_updates: bool,

    drop_subscriptions_before_start: bool,
    reconfigure_concurrency: usize,
    // Feature flags
    features: Vec<ComputeFeature>,
    // Cluster settings
    cluster: Option<Cluster>,
}

#[derive(PartialEq, Eq)]
pub enum EndpointStatus {
    Running,
    Stopped,
    Crashed,
    RunningNoPidfile,
}

impl std::fmt::Display for EndpointStatus {
    fn fmt(&self, writer: &mut std::fmt::Formatter) -> std::fmt::Result {
        let s = match self {
            Self::Running => "running",
            Self::Stopped => "stopped",
            Self::Crashed => "crashed",
            Self::RunningNoPidfile => "running, no pidfile",
        };
        write!(writer, "{}", s)
    }
}

impl Endpoint {
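    /// Load an endpoint from its directory under `.neon/endpoints/`, using the
    /// directory name as the endpoint ID and reading the rest of the
    /// configuration from the `endpoint.json` file inside it.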
    fn from_dir_entry(entry: std::fs::DirEntry, env: &LocalEnv) -> Result<Endpoint> {
        if !entry.file_type()?.is_dir() {
            anyhow::bail!(
                "Endpoint::from_dir_entry failed: '{}' is not a directory",
                entry.path().display()
            );
        }

        // parse data directory name
        let fname = entry.file_name();
        let endpoint_id = fname.to_str().unwrap().to_string();

        // Read the endpoint.json file
        let conf: EndpointConf =
            serde_json::from_slice(&std::fs::read(entry.path().join("endpoint.json"))?)?;

        debug!("serialized endpoint conf: {:?}", conf);

        Ok(Endpoint {
            pg_address: SocketAddr::new(IpAddr::from(Ipv4Addr::LOCALHOST), conf.pg_port),
            external_http_address: SocketAddr::new(
                IpAddr::from(Ipv4Addr::UNSPECIFIED),
                conf.external_http_port,
            ),
            internal_http_address: SocketAddr::new(
                IpAddr::from(Ipv4Addr::LOCALHOST),
                conf.internal_http_port,
            ),
            endpoint_id,
            env: env.clone(),
            timeline_id: conf.timeline_id,
            mode: conf.mode,
            tenant_id: conf.tenant_id,
            pg_version: conf.pg_version,
            skip_pg_catalog_updates: conf.skip_pg_catalog_updates,
            reconfigure_concurrency: conf.reconfigure_concurrency,
            drop_subscriptions_before_start: conf.drop_subscriptions_before_start,
            features: conf.features,
            cluster: conf.cluster,
        })
    }

    fn create_endpoint_dir(&self) -> Result<()> {
        std::fs::create_dir_all(self.endpoint_path()).with_context(|| {
            format!(
                "could not create endpoint directory {}",
                self.endpoint_path().display()
            )
        })
    }

    // Generate postgresql.conf with default configuration
    fn setup_pg_conf(&self) -> Result<PostgresConf> {
        let mut conf = PostgresConf::new();
        conf.append("max_wal_senders", "10");
        conf.append("wal_log_hints", "off");
        conf.append("max_replication_slots", "10");
        conf.append("hot_standby", "on");
        // Set to 1MB to both exercise getPage requests/LFC, and still have enough room for
        // Postgres to operate. Anything smaller might not be enough for Postgres under load,
        // and can cause errors like 'no unpinned buffers available', see
        // <https://github.com/neondatabase/neon/issues/9956>
        conf.append("shared_buffers", "1MB");
        // Postgres defaults to effective_io_concurrency=1, which does not exercise the pageserver's
        // batching logic. Set this to 2 so that we exercise the code a bit without letting
        // individual tests do a lot of concurrent work on underpowered test machines.
        conf.append("effective_io_concurrency", "2");
        conf.append("fsync", "off");
        conf.append("max_connections", "100");
        conf.append("wal_level", "logical");
        // wal_sender_timeout is the maximum time to wait for WAL replication.
        // It also defines how often the walreceiver will send a feedback message to the wal sender.
        conf.append("wal_sender_timeout", "5s");
        conf.append("listen_addresses", &self.pg_address.ip().to_string());
        conf.append("port", &self.pg_address.port().to_string());
        conf.append("wal_keep_size", "0");
        // walproposer panics when the basebackup is invalid; it is pointless to restart in this case.
        conf.append("restart_after_crash", "off");

        // Load the 'neon' extension
        conf.append("shared_preload_libraries", "neon");

        conf.append_line("");
        // Replication-related configurations, such as WAL sending
        match &self.mode {
            ComputeMode::Primary => {
                // Configure backpressure
                // - Replication write lag depends on how fast the walreceiver can process incoming WAL.
                //   This lag determines latency of get_page_at_lsn. Speed of applying WAL is about 10MB/sec,
                //   so to avoid expiration of the 1 minute timeout, this lag should not be larger than 600MB.
                //   Actually latency should be much smaller (better if < 1sec). But we assume that recently
                //   updated pages are not requested from the pageserver.
                // - Replication flush lag depends on speed of persisting data by checkpointer (creation of
                //   delta/image layers) and advancing disk_consistent_lsn. Safekeepers are able to
                //   remove/archive WAL only beyond disk_consistent_lsn. Too large a lag can cause long
                //   recovery time (in case of pageserver crash) and disk space overflow at safekeepers.
                // - Replication apply lag depends on speed of uploading changes to S3 by the uploader thread.
                //   To be able to restore the database in case of pageserver node crash, safekeepers should not
                //   remove WAL beyond this point. Too large a lag can cause space exhaustion in safekeepers
                //   (if they are not able to upload WAL to S3).
                conf.append("max_replication_write_lag", "15MB");
                conf.append("max_replication_flush_lag", "10GB");

                if !self.env.safekeepers.is_empty() {
                    // Configure Postgres to connect to the safekeepers
                    conf.append("synchronous_standby_names", "walproposer");

                    let safekeepers = self
                        .env
                        .safekeepers
                        .iter()
                        .map(|sk| format!("localhost:{}", sk.get_compute_port()))
                        .collect::<Vec<String>>()
                        .join(",");
                    conf.append("neon.safekeepers", &safekeepers);
                } else {
                    // We only use a setup without safekeepers for tests,
                    // and don't care about data durability on the pageserver,
                    // so set a more relaxed synchronous_commit.
                    conf.append("synchronous_commit", "remote_write");

                    // Configure the node to stream WAL directly to the pageserver.
                    // This isn't really a supported configuration, but can be useful for
                    // testing.
                    conf.append("synchronous_standby_names", "pageserver");
                }
            }
            ComputeMode::Static(lsn) => {
                conf.append("recovery_target_lsn", &lsn.to_string());
            }
            ComputeMode::Replica => {
                assert!(!self.env.safekeepers.is_empty());

                // TODO: use future host field from safekeeper spec
                // Pass the list of safekeepers to the replica so that it can connect to any of them,
                // whichever is available.
                let sk_ports = self
                    .env
                    .safekeepers
                    .iter()
                    .map(|x| x.get_compute_port().to_string())
                    .collect::<Vec<_>>()
                    .join(",");
                let sk_hosts = vec!["localhost"; self.env.safekeepers.len()].join(",");

                let connstr = format!(
                    "host={} port={} options='-c timeline_id={} tenant_id={}' application_name=replica replication=true",
                    sk_hosts,
                    sk_ports,
                    &self.timeline_id.to_string(),
                    &self.tenant_id.to_string(),
                );

                let slot_name = format!("repl_{}_", self.timeline_id);
                conf.append("primary_conninfo", connstr.as_str());
                conf.append("primary_slot_name", slot_name.as_str());
                conf.append("hot_standby", "on");
                // Prefetching of blocks referenced in WAL doesn't make sense for us:
                // Neon hot standby ignores pages that are not in shared_buffers.
                if self.pg_version >= 15 {
                    conf.append("recovery_prefetch", "off");
                }
            }
        }

        Ok(conf)
    }

    pub fn endpoint_path(&self) -> PathBuf {
        self.env.endpoints_path().join(&self.endpoint_id)
    }

    pub fn pgdata(&self) -> PathBuf {
        self.endpoint_path().join("pgdata")
    }

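    /// Determine the endpoint's status from two observations: whether a
    /// `postmaster.pid` file exists in the data directory, and whether we can
    /// open a TCP connection to the Postgres port.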
    pub fn status(&self) -> EndpointStatus {
        let timeout = Duration::from_millis(300);
        let has_pidfile = self.pgdata().join("postmaster.pid").exists();
        let can_connect = TcpStream::connect_timeout(&self.pg_address, timeout).is_ok();

        match (has_pidfile, can_connect) {
            (true, true) => EndpointStatus::Running,
            (false, false) => EndpointStatus::Stopped,
            (true, false) => EndpointStatus::Crashed,
            (false, true) => EndpointStatus::RunningNoPidfile,
        }
    }

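    /// Run `pg_ctl` against this endpoint's data directory with the given
    /// arguments, passing the auth token (if any) through the
    /// `NEON_AUTH_TOKEN` environment variable.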
    fn pg_ctl(&self, args: &[&str], auth_token: &Option<String>) -> Result<()> {
        let pg_ctl_path = self.env.pg_bin_dir(self.pg_version)?.join("pg_ctl");
        let mut cmd = Command::new(&pg_ctl_path);
        cmd.args(
            [
                &[
                    "-D",
                    self.pgdata().to_str().unwrap(),
                    "-w", // wait till pg_ctl actually does what was asked
                ],
                args,
            ]
            .concat(),
        )
        .env_clear()
        .env(
            "LD_LIBRARY_PATH",
            self.env.pg_lib_dir(self.pg_version)?.to_str().unwrap(),
        )
        .env(
            "DYLD_LIBRARY_PATH",
            self.env.pg_lib_dir(self.pg_version)?.to_str().unwrap(),
        );

        // Pass authentication token used for the connections to pageserver and safekeepers
        if let Some(token) = auth_token {
            cmd.env("NEON_AUTH_TOKEN", token);
        }

        let pg_ctl = cmd
            .output()
            .context(format!("{} failed", pg_ctl_path.display()))?;
        if !pg_ctl.status.success() {
            anyhow::bail!(
                "pg_ctl failed, exit code: {}, stdout: {}, stderr: {}",
                pg_ctl.status,
                String::from_utf8_lossy(&pg_ctl.stdout),
                String::from_utf8_lossy(&pg_ctl.stderr),
            );
        }

        Ok(())
    }

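    /// Wait for the `compute_ctl` process recorded in `compute_ctl.pid` to
    /// exit, optionally sending it SIGTERM first.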
    fn wait_for_compute_ctl_to_exit(&self, send_sigterm: bool) -> Result<()> {
        // TODO use background_process::stop_process instead: https://github.com/neondatabase/neon/pull/6482
        let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
        let pid: u32 = std::fs::read_to_string(pidfile_path)?.parse()?;
        let pid = nix::unistd::Pid::from_raw(pid as i32);
        if send_sigterm {
            kill(pid, Signal::SIGTERM).ok();
        }
        crate::background_process::wait_until_stopped("compute_ctl", pid)?;
        Ok(())
    }

    fn read_postgresql_conf(&self) -> Result<String> {
        // Slurp the endpoints/<endpoint id>/postgresql.conf file into
        // memory. We will include it in the spec file that we pass to
        // `compute_ctl`, and `compute_ctl` will write it to the postgresql.conf
        // in the data directory.
        let postgresql_conf_path = self.endpoint_path().join("postgresql.conf");
        match std::fs::read(&postgresql_conf_path) {
            Ok(content) => Ok(String::from_utf8(content)?),
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok("".to_string()),
            Err(e) => Err(anyhow::Error::new(e).context(format!(
                "failed to read config file in {}",
                postgresql_conf_path.to_str().unwrap()
            ))),
        }
    }

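    /// Build the pageserver connection string from a list of (host, port)
    /// pairs. For a sharded tenant this yields a comma-separated list, e.g.
    /// (ports illustrative):
    /// `postgresql://no_user@localhost:64000,postgresql://no_user@localhost:64001`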
    fn build_pageserver_connstr(pageservers: &[(Host, u16)]) -> String {
        pageservers
            .iter()
            .map(|(host, port)| format!("postgresql://no_user@{host}:{port}"))
            .collect::<Vec<_>>()
            .join(",")
    }

    /// Map safekeeper IDs to the actual connection strings.
    fn build_safekeepers_connstrs(&self, sk_ids: Vec<NodeId>) -> Result<Vec<String>> {
        let mut safekeeper_connstrings = Vec::new();
        if self.mode == ComputeMode::Primary {
            for sk_id in sk_ids {
                let sk = self
                    .env
                    .safekeepers
                    .iter()
                    .find(|node| node.id == sk_id)
                    .ok_or_else(|| anyhow!("safekeeper {sk_id} does not exist"))?;
                safekeeper_connstrings.push(format!("127.0.0.1:{}", sk.get_compute_port()));
            }
        }
        Ok(safekeeper_connstrings)
    }

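    /// Start the endpoint: write out `spec.json` and launch `compute_ctl`,
    /// which syncs the safekeepers, pulls a basebackup from the pageserver to
    /// initialize the data directory, and starts PostgreSQL. Returns once
    /// `compute_ctl` reports that the compute is running, or fails after
    /// `start_timeout`.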
    #[allow(clippy::too_many_arguments)]
    pub async fn start(
        &self,
        auth_token: &Option<String>,
        safekeepers_generation: Option<SafekeeperGeneration>,
        safekeepers: Vec<NodeId>,
        pageservers: Vec<(Host, u16)>,
        remote_ext_config: Option<&String>,
        shard_stripe_size: usize,
        create_test_user: bool,
        start_timeout: Duration,
    ) -> Result<()> {
        if self.status() == EndpointStatus::Running {
            anyhow::bail!("The endpoint is already running");
        }

        let postgresql_conf = self.read_postgresql_conf()?;

        // We always start the compute node from scratch, so if the Postgres
        // data dir exists from a previous launch, remove it first.
        if self.pgdata().exists() {
            std::fs::remove_dir_all(self.pgdata())?;
        }

        let pageserver_connstring = Self::build_pageserver_connstr(&pageservers);
        assert!(!pageserver_connstring.is_empty());

        let safekeeper_connstrings = self.build_safekeepers_connstrs(safekeepers)?;

        // Check for the file remote_extensions_spec.json;
        // if it is present, read it and pass it to compute_ctl.
        let remote_extensions_spec_path = self.endpoint_path().join("remote_extensions_spec.json");
        let remote_extensions_spec = std::fs::File::open(remote_extensions_spec_path);
        let remote_extensions: Option<RemoteExtSpec> = match remote_extensions_spec {
            Ok(spec_file) => serde_json::from_reader(spec_file).ok(),
            Err(_) => None,
        };

        // Create spec file
        let mut spec = ComputeSpec {
            skip_pg_catalog_updates: self.skip_pg_catalog_updates,
            format_version: 1.0,
            operation_uuid: None,
            features: self.features.clone(),
            swap_size_bytes: None,
            disk_quota_bytes: None,
            disable_lfc_resizing: None,
            cluster: Cluster {
                cluster_id: None, // project ID: not used
                name: None,       // project name: not used
                state: None,
                roles: if create_test_user {
                    vec![Role {
                        name: PgIdent::from_str("test").unwrap(),
                        encrypted_password: None,
                        options: None,
                    }]
                } else {
                    Vec::new()
                },
                databases: if create_test_user {
                    vec![Database {
                        name: PgIdent::from_str("neondb").unwrap(),
                        owner: PgIdent::from_str("test").unwrap(),
                        options: None,
                        restrict_conn: false,
                        invalid: false,
                    }]
                } else {
                    Vec::new()
                },
                settings: None,
                postgresql_conf: Some(postgresql_conf.clone()),
            },
            delta_operations: None,
            tenant_id: Some(self.tenant_id),
            timeline_id: Some(self.timeline_id),
            mode: self.mode,
            pageserver_connstring: Some(pageserver_connstring),
            safekeepers_generation: safekeepers_generation.map(|g| g.into_inner()),
            safekeeper_connstrings,
            storage_auth_token: auth_token.clone(),
            remote_extensions,
            pgbouncer_settings: None,
            shard_stripe_size: Some(shard_stripe_size),
            local_proxy_config: None,
            reconfigure_concurrency: self.reconfigure_concurrency,
            drop_subscriptions_before_start: self.drop_subscriptions_before_start,
            audit_log_level: ComputeAudit::Disabled,
        };

        // This strange code is needed to support respec() in tests
        if self.cluster.is_some() {
            debug!("Cluster is already set in the endpoint spec, using it");
            spec.cluster = self.cluster.clone().unwrap();

            debug!("spec.cluster {:?}", spec.cluster);

            // fill missing fields again
            if create_test_user {
                spec.cluster.roles.push(Role {
                    name: PgIdent::from_str("test").unwrap(),
                    encrypted_password: None,
                    options: None,
                });
                spec.cluster.databases.push(Database {
                    name: PgIdent::from_str("neondb").unwrap(),
                    owner: PgIdent::from_str("test").unwrap(),
                    options: None,
                    restrict_conn: false,
                    invalid: false,
                });
            }
            spec.cluster.postgresql_conf = Some(postgresql_conf);
        }

        let spec_path = self.endpoint_path().join("spec.json");
        std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;

        // Open log file. We'll redirect the stdout and stderr of `compute_ctl` to it.
        let logfile = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(self.endpoint_path().join("compute.log"))?;

        // Launch compute_ctl
        let conn_str = self.connstr("cloud_admin", "postgres");
        println!("Starting postgres node at '{}'", conn_str);
        if create_test_user {
            let conn_str = self.connstr("test", "neondb");
            println!("Also at '{}'", conn_str);
        }
        let mut cmd = Command::new(self.env.neon_distrib_dir.join("compute_ctl"));
        cmd.args([
            "--external-http-port",
            &self.external_http_address.port().to_string(),
        ])
        .args([
            "--internal-http-port",
            &self.internal_http_address.port().to_string(),
        ])
        .args(["--pgdata", self.pgdata().to_str().unwrap()])
        .args(["--connstr", &conn_str])
        .args([
            "--spec-path",
            self.endpoint_path().join("spec.json").to_str().unwrap(),
        ])
        .args([
            "--pgbin",
            self.env
                .pg_bin_dir(self.pg_version)?
                .join("postgres")
                .to_str()
                .unwrap(),
        ])
        // TODO: It would be nice if we generated compute IDs with the same
        // algorithm as the real control plane.
        .args([
            "--compute-id",
            &format!(
                "compute-{}",
                SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .unwrap()
                    .as_secs()
            ),
        ])
        .stdin(std::process::Stdio::null())
        .stderr(logfile.try_clone()?)
        .stdout(logfile);

        if let Some(remote_ext_config) = remote_ext_config {
            cmd.args(["--remote-ext-config", remote_ext_config]);
        }

        let child = cmd.spawn()?;
        // Set up a scopeguard to kill & wait for the child in case we panic or bail below
        let child = scopeguard::guard(child, |mut child| {
            println!("SIGKILL & wait the started process");
            (|| {
                // TODO: use another signal that can be caught by the child so it can clean up any children it spawned
                child.kill().context("SIGKILL child")?;
                child.wait().context("wait() for child process")?;
                anyhow::Ok(())
            })()
            .with_context(|| format!("scopeguard kill&wait child {child:?}"))
            .unwrap();
        });

        // Write down the pid so we can wait for it when we want to stop
        // TODO use background_process::start_process instead: https://github.com/neondatabase/neon/pull/6482
        let pid = child.id();
        let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
        std::fs::write(pidfile_path, pid.to_string())?;

        // Wait for it to start
        const ATTEMPT_INTERVAL: Duration = Duration::from_millis(100);
        let start_at = Instant::now();
        loop {
            match self.get_status().await {
                Ok(state) => {
                    match state.status {
                        ComputeStatus::Init => {
                            if Instant::now().duration_since(start_at) > start_timeout {
                                bail!(
                                    "compute startup timed out {:?}; still in Init state",
                                    start_timeout
                                );
                            }
                            // keep retrying
                        }
                        ComputeStatus::Running => {
                            // All good!
                            break;
                        }
                        ComputeStatus::Failed => {
                            bail!(
                                "compute startup failed: {}",
                                state
                                    .error
                                    .as_deref()
                                    .unwrap_or("<no error from compute_ctl>")
                            );
                        }
                        ComputeStatus::Empty
                        | ComputeStatus::ConfigurationPending
                        | ComputeStatus::Configuration
                        | ComputeStatus::TerminationPending
                        | ComputeStatus::Terminated => {
                            bail!("unexpected compute status: {:?}", state.status)
                        }
                    }
                }
                Err(e) => {
                    if Instant::now().duration_since(start_at) > start_timeout {
                        return Err(e).context(format!(
                            "timed out {:?} waiting to connect to compute_ctl HTTP",
                            start_timeout,
                        ));
                    }
                }
            }
            tokio::time::sleep(ATTEMPT_INTERVAL).await;
        }

        // Disarm the scopeguard, let the child outlive this function (and the neon_local invocation)
        drop(scopeguard::ScopeGuard::into_inner(child));

        Ok(())
    }

    // Call the /status HTTP API
    pub async fn get_status(&self) -> Result<ComputeStatusResponse> {
        let client = reqwest::Client::new();

        let response = client
            .request(
                reqwest::Method::GET,
                format!(
                    "http://{}:{}/status",
                    self.external_http_address.ip(),
                    self.external_http_address.port()
                ),
            )
            .send()
            .await?;

        // Interpret the response
        let status = response.status();
        if !(status.is_client_error() || status.is_server_error()) {
            Ok(response.json().await?)
        } else {
            // reqwest does not export its error construction utility functions, so let's craft the message ourselves
            let url = response.url().to_owned();
            let msg = match response.text().await {
                Ok(err_body) => format!("Error: {}", err_body),
                Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
            };
            Err(anyhow::anyhow!(msg))
        }
    }

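    /// Reconfigure a running endpoint by updating `spec.json` and POSTing the
    /// new spec to `compute_ctl`'s `/configure` API. If `pageservers` is empty,
    /// the current shard locations are queried from the storage controller; if
    /// `safekeepers` is `None`, the safekeeper list is left unchanged.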
    pub async fn reconfigure(
        &self,
        mut pageservers: Vec<(Host, u16)>,
        stripe_size: Option<ShardStripeSize>,
        safekeepers: Option<Vec<NodeId>>,
    ) -> Result<()> {
        let mut spec: ComputeSpec = {
            let spec_path = self.endpoint_path().join("spec.json");
            let file = std::fs::File::open(spec_path)?;
            serde_json::from_reader(file)?
        };

        let postgresql_conf = self.read_postgresql_conf()?;
        spec.cluster.postgresql_conf = Some(postgresql_conf);

        // If we weren't given explicit pageservers, query the storage controller
        if pageservers.is_empty() {
            let storage_controller = StorageController::from_env(&self.env);
            let locate_result = storage_controller.tenant_locate(self.tenant_id).await?;
            pageservers = locate_result
                .shards
                .into_iter()
                .map(|shard| {
                    (
                        Host::parse(&shard.listen_pg_addr)
                            .expect("Storage controller reported bad hostname"),
                        shard.listen_pg_port,
                    )
                })
                .collect::<Vec<_>>();
        }

        let pageserver_connstr = Self::build_pageserver_connstr(&pageservers);
        assert!(!pageserver_connstr.is_empty());
        spec.pageserver_connstring = Some(pageserver_connstr);
        if stripe_size.is_some() {
            spec.shard_stripe_size = stripe_size.map(|s| s.0 as usize);
        }

        // If safekeepers are not specified, don't change them.
        if let Some(safekeepers) = safekeepers {
            let safekeeper_connstrings = self.build_safekeepers_connstrs(safekeepers)?;
            spec.safekeeper_connstrings = safekeeper_connstrings;
        }

        let client = reqwest::Client::builder()
            .timeout(Duration::from_secs(120))
            .build()
            .unwrap();
        let response = client
            .post(format!(
                "http://{}:{}/configure",
                self.external_http_address.ip(),
                self.external_http_address.port()
            ))
            .header(CONTENT_TYPE.as_str(), "application/json")
            .body(
                serde_json::to_string(&ConfigurationRequest {
                    spec,
                    compute_ctl_config: ComputeCtlConfig::default(),
                })
                .unwrap(),
            )
            .send()
            .await?;

        let status = response.status();
        if !(status.is_client_error() || status.is_server_error()) {
            Ok(())
        } else {
            let url = response.url().to_owned();
            let msg = match response.text().await {
                Ok(err_body) => format!("Error: {}", err_body),
                Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
            };
            Err(anyhow::anyhow!(msg))
        }
    }

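    /// Stop the endpoint, passing `mode` (e.g. "fast" or "immediate") through
    /// to `pg_ctl stop -m`. If `destroy` is true, the whole endpoint directory
    /// is removed afterwards.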
    pub fn stop(&self, mode: &str, destroy: bool) -> Result<()> {
        self.pg_ctl(&["-m", mode, "stop"], &None)?;

        // Also wait for the compute_ctl process to die. It might have some
        // cleanup work to do after postgres stops, like syncing safekeepers,
        // etc.
        //
        // If destroying, or if the stop mode is "immediate", send it SIGTERM before
        // waiting. Sometimes we do *not* want this cleanup: tests intentionally
        // do stop when the majority of safekeepers is down, so sync-safekeepers
        // would hang otherwise. This could be a separate flag though.
        let send_sigterm = destroy || mode == "immediate";
        self.wait_for_compute_ctl_to_exit(send_sigterm)?;
        if destroy {
            println!(
                "Destroying postgres data directory '{}'",
                self.pgdata().to_str().unwrap()
            );
            std::fs::remove_dir_all(self.endpoint_path())?;
        }
        Ok(())
    }

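    /// Build a libpq connection URL for this endpoint, e.g. (port illustrative)
    /// `postgresql://cloud_admin@127.0.0.1:55432/postgres`.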
    pub fn connstr(&self, user: &str, db_name: &str) -> String {
        format!(
            "postgresql://{}@{}:{}/{}",
            user,
            self.pg_address.ip(),
            self.pg_address.port(),
            db_name
        )
    }
}