//! Code to manage compute endpoints
//!
//! In the local test environment, the data for each endpoint is stored in
//!
//! ```text
//! .neon/endpoints/<endpoint id>
//! ```
//!
//! Some basic information about the endpoint, like the tenant and timeline IDs,
//! is stored in the `endpoint.json` file. The `endpoint.json` file is created
//! when the endpoint is created, and doesn't change afterwards.
//!
//! The endpoint is managed by the `compute_ctl` binary. When an endpoint is
//! started, we launch `compute_ctl`. It synchronizes the safekeepers, downloads
//! the basebackup from the pageserver to initialize the data directory, and
//! finally launches the PostgreSQL process. It watches the PostgreSQL process
//! until it exits.
//!
//! When an endpoint is created, a `postgresql.conf` file is also created in
//! the endpoint's directory. The file can be modified before starting PostgreSQL.
//! However, the `postgresql.conf` file in the endpoint directory is not used directly
//! by PostgreSQL. It is passed to `compute_ctl`, and `compute_ctl` writes another
//! copy of it in the data directory.
//!
//! Directory contents:
//!
//! ```text
//! .neon/endpoints/main/
//!     compute.log          - log output of `compute_ctl` and `postgres`
//!     endpoint.json        - serialized `EndpointConf` struct
//!     postgresql.conf      - postgresql settings
//!     config.json          - passed to `compute_ctl`
//!     pgdata/
//!         postgresql.conf  - copy of postgresql.conf created by `compute_ctl`
//!         zenith.signal
//!         <other PostgreSQL files>
//! ```
//!
use std::collections::BTreeMap;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream};
use std::path::PathBuf;
use std::process::Command;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

use anyhow::{Context, Result, anyhow, bail};
use compute_api::requests::ConfigurationRequest;
use compute_api::responses::{
    ComputeConfig, ComputeCtlConfig, ComputeStatus, ComputeStatusResponse,
};
use compute_api::spec::{
    Cluster, ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, Database, PgIdent,
    RemoteExtSpec, Role,
};
use nix::sys::signal::{Signal, kill};
use pageserver_api::shard::ShardStripeSize;
use reqwest::header::CONTENT_TYPE;
use safekeeper_api::membership::SafekeeperGeneration;
use serde::{Deserialize, Serialize};
use tracing::debug;
use url::Host;
use utils::id::{NodeId, TenantId, TimelineId};

use crate::local_env::LocalEnv;
use crate::postgresql_conf::PostgresConf;
use crate::storage_controller::StorageController;

// Contents of an endpoint.json file
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
pub struct EndpointConf {
    endpoint_id: String,
    tenant_id: TenantId,
    timeline_id: TimelineId,
    mode: ComputeMode,
    pg_port: u16,
    external_http_port: u16,
    internal_http_port: u16,
    pg_version: u32,
    skip_pg_catalog_updates: bool,
    reconfigure_concurrency: usize,
    drop_subscriptions_before_start: bool,
    features: Vec<ComputeFeature>,
    cluster: Option<Cluster>,
}
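// For illustration, a serialized `EndpointConf` might look roughly like the
// following. The IDs, ports, and exact enum encodings below are hypothetical
// examples, not canonical output:
//
//   {
//     "endpoint_id": "main",
//     "tenant_id": "9ef87a5bf0d92544f6fafeeb3239695c",
//     "timeline_id": "de200bd42b49cc1814412c7e592dd6e9",
//     "mode": "Primary",
//     "pg_port": 55432,
//     "external_http_port": 55433,
//     "internal_http_port": 55434,
//     "pg_version": 16,
//     "skip_pg_catalog_updates": true,
//     "reconfigure_concurrency": 1,
//     "drop_subscriptions_before_start": false,
//     "features": [],
//     "cluster": null
//   }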

//
// ComputeControlPlane
//
pub struct ComputeControlPlane {
    base_port: u16,

    // endpoint ID is the key
    pub endpoints: BTreeMap<String, Arc<Endpoint>>,

    env: LocalEnv,
}

impl ComputeControlPlane {
    // Load current endpoints from the endpoints/ subdirectories
    pub fn load(env: LocalEnv) -> Result<ComputeControlPlane> {
        let mut endpoints = BTreeMap::default();
        for endpoint_dir in std::fs::read_dir(env.endpoints_path())
            .with_context(|| format!("failed to list {}", env.endpoints_path().display()))?
        {
            let ep_res = Endpoint::from_dir_entry(endpoint_dir?, &env);
            let ep = match ep_res {
                Ok(ep) => ep,
                Err(e) => match e.downcast::<std::io::Error>() {
                    Ok(e) => {
                        // A parallel task could have deleted an endpoint right after we scanned the directory
                        if e.kind() == std::io::ErrorKind::NotFound {
                            continue;
                        } else {
                            Err(e)?
                        }
                    }
                    Err(e) => Err(e)?,
                },
            };
            endpoints.insert(ep.endpoint_id.clone(), Arc::new(ep));
        }

        Ok(ComputeControlPlane {
            base_port: 55431,
            endpoints,
            env,
        })
    }

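    // Allocate the next port: one past the highest port any existing endpoint
    // uses for Postgres or external HTTP (one past `base_port` when there are
    // no endpoints yet). E.g. (hypothetical) with endpoints at ports
    // 55431/55432 and 55433/55434, the next call returns 55435.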
    fn get_port(&mut self) -> u16 {
        1 + self
            .endpoints
            .values()
            .map(|ep| std::cmp::max(ep.pg_address.port(), ep.external_http_address.port()))
            .max()
            .unwrap_or(self.base_port)
    }

    #[allow(clippy::too_many_arguments)]
    pub fn new_endpoint(
        &mut self,
        endpoint_id: &str,
        tenant_id: TenantId,
        timeline_id: TimelineId,
        pg_port: Option<u16>,
        external_http_port: Option<u16>,
        internal_http_port: Option<u16>,
        pg_version: u32,
        mode: ComputeMode,
        skip_pg_catalog_updates: bool,
        drop_subscriptions_before_start: bool,
    ) -> Result<Arc<Endpoint>> {
        let pg_port = pg_port.unwrap_or_else(|| self.get_port());
        let external_http_port = external_http_port.unwrap_or_else(|| self.get_port() + 1);
        let internal_http_port = internal_http_port.unwrap_or_else(|| external_http_port + 1);
        let ep = Arc::new(Endpoint {
            endpoint_id: endpoint_id.to_owned(),
            pg_address: SocketAddr::new(IpAddr::from(Ipv4Addr::LOCALHOST), pg_port),
            external_http_address: SocketAddr::new(
                IpAddr::from(Ipv4Addr::UNSPECIFIED),
                external_http_port,
            ),
            internal_http_address: SocketAddr::new(
                IpAddr::from(Ipv4Addr::LOCALHOST),
                internal_http_port,
            ),
            env: self.env.clone(),
            timeline_id,
            mode,
            tenant_id,
            pg_version,
            // We don't set up roles and databases in the spec locally, so we don't
            // need to do catalog updates. Catalog updates also include creation of
            // the availability-check data. Yet, we have tests that check that the
            // size and db dump before and after a start are the same. So, skip
            // catalog updates; with this we basically test the case of waking up
            // an idle compute, where we also skip catalog updates in the cloud.
            skip_pg_catalog_updates,
            drop_subscriptions_before_start,
            reconfigure_concurrency: 1,
            features: vec![],
            cluster: None,
        });

        ep.create_endpoint_dir()?;
        std::fs::write(
            ep.endpoint_path().join("endpoint.json"),
            serde_json::to_string_pretty(&EndpointConf {
                endpoint_id: endpoint_id.to_string(),
                tenant_id,
                timeline_id,
                mode,
                external_http_port,
                internal_http_port,
                pg_port,
                pg_version,
                skip_pg_catalog_updates,
                drop_subscriptions_before_start,
                reconfigure_concurrency: 1,
                features: vec![],
                cluster: None,
            })?,
        )?;
        std::fs::write(
            ep.endpoint_path().join("postgresql.conf"),
            ep.setup_pg_conf()?.to_string(),
        )?;

        self.endpoints
            .insert(ep.endpoint_id.clone(), Arc::clone(&ep));

        Ok(ep)
    }

    pub fn check_conflicting_endpoints(
        &self,
        mode: ComputeMode,
        tenant_id: TenantId,
        timeline_id: TimelineId,
    ) -> Result<()> {
        if matches!(mode, ComputeMode::Primary) {
            // This check is not complete: two concurrent attempts at creating a
            // primary could both read the state before either records itself here.
            // But it's better than nothing.
            let mut duplicates = self.endpoints.iter().filter(|(_k, v)| {
                v.tenant_id == tenant_id
                    && v.timeline_id == timeline_id
                    && v.mode == mode
                    && v.status() != EndpointStatus::Stopped
            });

            if let Some((key, _)) = duplicates.next() {
                bail!(
                    "attempting to create a duplicate primary endpoint on tenant {tenant_id}, timeline {timeline_id}: endpoint {key:?} exists already. please don't do this, it is not supported."
                );
            }
        }
        Ok(())
    }
}

///////////////////////////////////////////////////////////////////////////////

#[derive(Debug)]
pub struct Endpoint {
    /// used as the directory name
    endpoint_id: String,
    pub tenant_id: TenantId,
    pub timeline_id: TimelineId,
    pub mode: ComputeMode,

    // port and address of the Postgres server and `compute_ctl`'s HTTP APIs
    pub pg_address: SocketAddr,
    pub external_http_address: SocketAddr,
    pub internal_http_address: SocketAddr,

    // postgres major version in the format: 14, 15, etc.
    pg_version: u32,

    // These are not part of the endpoint as such, but the environment
    // the endpoint runs in.
    pub env: LocalEnv,

    // Optimizations
    skip_pg_catalog_updates: bool,

    drop_subscriptions_before_start: bool,
    reconfigure_concurrency: usize,
    // Feature flags
    features: Vec<ComputeFeature>,
    // Cluster settings
    cluster: Option<Cluster>,
}

#[derive(PartialEq, Eq)]
pub enum EndpointStatus {
    Running,
    Stopped,
    Crashed,
    RunningNoPidfile,
}

impl std::fmt::Display for EndpointStatus {
    fn fmt(&self, writer: &mut std::fmt::Formatter) -> std::fmt::Result {
        let s = match self {
            Self::Running => "running",
            Self::Stopped => "stopped",
            Self::Crashed => "crashed",
            Self::RunningNoPidfile => "running, no pidfile",
        };
        write!(writer, "{}", s)
    }
}

impl Endpoint {
    fn from_dir_entry(entry: std::fs::DirEntry, env: &LocalEnv) -> Result<Endpoint> {
        if !entry.file_type()?.is_dir() {
            anyhow::bail!(
                "Endpoint::from_dir_entry failed: '{}' is not a directory",
                entry.path().display()
            );
        }

        // parse data directory name
        let fname = entry.file_name();
        let endpoint_id = fname.to_str().unwrap().to_string();

        // Read the endpoint.json file
        let conf: EndpointConf =
            serde_json::from_slice(&std::fs::read(entry.path().join("endpoint.json"))?)?;

        debug!("serialized endpoint conf: {:?}", conf);

        Ok(Endpoint {
            pg_address: SocketAddr::new(IpAddr::from(Ipv4Addr::LOCALHOST), conf.pg_port),
            external_http_address: SocketAddr::new(
                IpAddr::from(Ipv4Addr::UNSPECIFIED),
                conf.external_http_port,
            ),
            internal_http_address: SocketAddr::new(
                IpAddr::from(Ipv4Addr::LOCALHOST),
                conf.internal_http_port,
            ),
            endpoint_id,
            env: env.clone(),
            timeline_id: conf.timeline_id,
            mode: conf.mode,
            tenant_id: conf.tenant_id,
            pg_version: conf.pg_version,
            skip_pg_catalog_updates: conf.skip_pg_catalog_updates,
            reconfigure_concurrency: conf.reconfigure_concurrency,
            drop_subscriptions_before_start: conf.drop_subscriptions_before_start,
            features: conf.features,
            cluster: conf.cluster,
        })
    }

    fn create_endpoint_dir(&self) -> Result<()> {
        std::fs::create_dir_all(self.endpoint_path()).with_context(|| {
            format!(
                "could not create endpoint directory {}",
                self.endpoint_path().display()
            )
        })
    }

    // Generate postgresql.conf with default configuration
    fn setup_pg_conf(&self) -> Result<PostgresConf> {
        let mut conf = PostgresConf::new();
        conf.append("max_wal_senders", "10");
        conf.append("wal_log_hints", "off");
        conf.append("max_replication_slots", "10");
        conf.append("hot_standby", "on");
        // Set to 1MB to both exercise getPage requests/LFC, and still have enough room for
        // Postgres to operate. Everything smaller might not be enough for Postgres under load,
        // and can cause errors like 'no unpinned buffers available', see
        // <https://github.com/neondatabase/neon/issues/9956>
        conf.append("shared_buffers", "1MB");
        // Postgres defaults to effective_io_concurrency=1, which does not exercise the pageserver's
        // batching logic. Set this to 2 so that we exercise the code a bit without letting
        // individual tests do a lot of concurrent work on underpowered test machines
        conf.append("effective_io_concurrency", "2");
        conf.append("fsync", "off");
        conf.append("max_connections", "100");
        conf.append("wal_level", "logical");
        // wal_sender_timeout is the maximum time to wait for WAL replication.
        // It also defines how often the walreceiver will send a feedback message to the wal sender.
        conf.append("wal_sender_timeout", "5s");
        conf.append("listen_addresses", &self.pg_address.ip().to_string());
        conf.append("port", &self.pg_address.port().to_string());
        conf.append("wal_keep_size", "0");
        // walproposer panics when the basebackup is invalid; it is pointless to restart in this case.
        conf.append("restart_after_crash", "off");

        // Load the 'neon' extension
        conf.append("shared_preload_libraries", "neon");

        conf.append_line("");
        // Replication-related configurations, such as WAL sending
        match &self.mode {
            ComputeMode::Primary => {
                // Configure backpressure
                // - Replication write lag depends on how fast the walreceiver can process incoming WAL.
                //   This lag determines the latency of get_page_at_lsn. The speed of applying WAL is
                //   about 10MB/sec, so to avoid expiration of the 1 minute timeout, this lag should not
                //   be larger than 600MB. Actually, latency should be much smaller (better if < 1sec).
                //   But we assume that recently updated pages are not requested from the pageserver.
                // - Replication flush lag depends on the speed of persisting data by the checkpointer
                //   (creation of delta/image layers) and advancing disk_consistent_lsn. Safekeepers are
                //   able to remove/archive WAL only beyond disk_consistent_lsn. Too large a lag can cause
                //   long recovery time (in case of pageserver crash) and disk space overflow at safekeepers.
                // - Replication apply lag depends on the speed of uploading changes to S3 by the uploader
                //   thread. To be able to restore the database in case of pageserver node crash, the
                //   safekeeper should not remove WAL beyond this point. Too large a lag can cause space
                //   exhaustion in safekeepers (if they are not able to upload WAL to S3).
                conf.append("max_replication_write_lag", "15MB");
                conf.append("max_replication_flush_lag", "10GB");

                if !self.env.safekeepers.is_empty() {
                    // Configure Postgres to connect to the safekeepers
                    conf.append("synchronous_standby_names", "walproposer");

                    let safekeepers = self
                        .env
                        .safekeepers
                        .iter()
                        .map(|sk| format!("localhost:{}", sk.get_compute_port()))
                        .collect::<Vec<String>>()
                        .join(",");
                    conf.append("neon.safekeepers", &safekeepers);
                } else {
                    // We only use a setup without safekeepers for tests,
                    // and don't care about data durability on the pageserver,
                    // so set a more relaxed synchronous_commit.
                    conf.append("synchronous_commit", "remote_write");

                    // Configure the node to stream WAL directly to the pageserver.
                    // This isn't really a supported configuration, but can be useful for
                    // testing.
                    conf.append("synchronous_standby_names", "pageserver");
                }
            }
            ComputeMode::Static(lsn) => {
                conf.append("recovery_target_lsn", &lsn.to_string());
            }
            ComputeMode::Replica => {
                assert!(!self.env.safekeepers.is_empty());

                // TODO: use future host field from safekeeper spec
                // Pass the list of safekeepers to the replica so that it can connect to any of them,
                // whichever is available.
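                // E.g. with two safekeepers on compute ports 5454 and 5455
                // (hypothetical ports), this builds a libpq multi-host conninfo
                // like "host=localhost,localhost port=5454,5455 ...", so libpq
                // tries each safekeeper in turn until one accepts the connection.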
                let sk_ports = self
                    .env
                    .safekeepers
                    .iter()
                    .map(|x| x.get_compute_port().to_string())
                    .collect::<Vec<_>>()
                    .join(",");
                let sk_hosts = vec!["localhost"; self.env.safekeepers.len()].join(",");

                let connstr = format!(
                    "host={} port={} options='-c timeline_id={} tenant_id={}' application_name=replica replication=true",
                    sk_hosts,
                    sk_ports,
                    &self.timeline_id.to_string(),
                    &self.tenant_id.to_string(),
                );

                let slot_name = format!("repl_{}_", self.timeline_id);
                conf.append("primary_conninfo", connstr.as_str());
                conf.append("primary_slot_name", slot_name.as_str());
                conf.append("hot_standby", "on");
                // Prefetching of blocks referenced in the WAL doesn't make sense for us:
                // Neon hot standby ignores pages that are not in shared_buffers.
                if self.pg_version >= 15 {
                    conf.append("recovery_prefetch", "off");
                }
            }
        }

        Ok(conf)
    }
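    // For illustration: for a primary with a single safekeeper on compute
    // port 5454 (hypothetical values; the exact formatting is up to
    // `PostgresConf`), the generated file would contain, among others:
    //
    //   shared_buffers = 1MB
    //   wal_level = logical
    //   neon.safekeepers = localhost:5454
    //   synchronous_standby_names = walproposer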

    pub fn endpoint_path(&self) -> PathBuf {
        self.env.endpoints_path().join(&self.endpoint_id)
    }

    pub fn pgdata(&self) -> PathBuf {
        self.endpoint_path().join("pgdata")
    }

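    // The status is derived from two independent signals: whether
    // postmaster.pid exists in the data directory, and whether Postgres
    // accepts a TCP connection. When the two disagree, either Postgres
    // crashed (pidfile left behind, nothing listening) or something we
    // didn't start is listening on the endpoint's port (no pidfile).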
    pub fn status(&self) -> EndpointStatus {
        let timeout = Duration::from_millis(300);
        let has_pidfile = self.pgdata().join("postmaster.pid").exists();
        let can_connect = TcpStream::connect_timeout(&self.pg_address, timeout).is_ok();

        match (has_pidfile, can_connect) {
            (true, true) => EndpointStatus::Running,
            (false, false) => EndpointStatus::Stopped,
            (true, false) => EndpointStatus::Crashed,
            (false, true) => EndpointStatus::RunningNoPidfile,
        }
    }

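    // For illustration: pg_ctl(&["-m", "fast", "stop"], &None) runs
    // `pg_ctl -D <pgdata> -w -m fast stop` with a cleared environment
    // (plus LD_LIBRARY_PATH/DYLD_LIBRARY_PATH pointing at our Postgres libs).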
    fn pg_ctl(&self, args: &[&str], auth_token: &Option<String>) -> Result<()> {
        let pg_ctl_path = self.env.pg_bin_dir(self.pg_version)?.join("pg_ctl");
        let mut cmd = Command::new(&pg_ctl_path);
        cmd.args(
            [
                &[
                    "-D",
                    self.pgdata().to_str().unwrap(),
                    "-w", // wait till pg_ctl actually does what was asked
                ],
                args,
            ]
            .concat(),
        )
        .env_clear()
        .env(
            "LD_LIBRARY_PATH",
            self.env.pg_lib_dir(self.pg_version)?.to_str().unwrap(),
        )
        .env(
            "DYLD_LIBRARY_PATH",
            self.env.pg_lib_dir(self.pg_version)?.to_str().unwrap(),
        );

        // Pass authentication token used for the connections to pageserver and safekeepers
        if let Some(token) = auth_token {
            cmd.env("NEON_AUTH_TOKEN", token);
        }

        let pg_ctl = cmd
            .output()
            .context(format!("{} failed", pg_ctl_path.display()))?;
        if !pg_ctl.status.success() {
            anyhow::bail!(
                "pg_ctl failed, exit code: {}, stdout: {}, stderr: {}",
                pg_ctl.status,
                String::from_utf8_lossy(&pg_ctl.stdout),
                String::from_utf8_lossy(&pg_ctl.stderr),
            );
        }

        Ok(())
    }

    fn wait_for_compute_ctl_to_exit(&self, send_sigterm: bool) -> Result<()> {
        // TODO use background_process::stop_process instead: https://github.com/neondatabase/neon/pull/6482
        let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
        let pid: u32 = std::fs::read_to_string(pidfile_path)?.parse()?;
        let pid = nix::unistd::Pid::from_raw(pid as i32);
        if send_sigterm {
            kill(pid, Signal::SIGTERM).ok();
        }
        crate::background_process::wait_until_stopped("compute_ctl", pid)?;
        Ok(())
    }

    fn read_postgresql_conf(&self) -> Result<String> {
        // Slurp the endpoints/<endpoint id>/postgresql.conf file into
        // memory. We will include it in the spec file that we pass to
        // `compute_ctl`, and `compute_ctl` will write it to the postgresql.conf
        // in the data directory.
        let postgresql_conf_path = self.endpoint_path().join("postgresql.conf");
        match std::fs::read(&postgresql_conf_path) {
            Ok(content) => Ok(String::from_utf8(content)?),
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok("".to_string()),
            Err(e) => Err(anyhow::Error::new(e).context(format!(
                "failed to read config file in {}",
                postgresql_conf_path.to_str().unwrap()
            ))),
        }
    }

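    // For illustration: [("ps1", 6400), ("ps2", 6400)] (hypothetical hosts)
    // becomes "postgresql://no_user@ps1:6400,postgresql://no_user@ps2:6400",
    // one connection string per pageserver, joined by commas.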
    fn build_pageserver_connstr(pageservers: &[(Host, u16)]) -> String {
        pageservers
            .iter()
            .map(|(host, port)| format!("postgresql://no_user@{host}:{port}"))
            .collect::<Vec<_>>()
            .join(",")
    }

    /// Map safekeeper IDs to the actual connection strings.
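    /// The result is only populated for `ComputeMode::Primary`; other modes
    /// get an empty list. E.g. a safekeeper with compute port 5454
    /// (hypothetical) maps to "127.0.0.1:5454".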
    fn build_safekeepers_connstrs(&self, sk_ids: Vec<NodeId>) -> Result<Vec<String>> {
        let mut safekeeper_connstrings = Vec::new();
        if self.mode == ComputeMode::Primary {
            for sk_id in sk_ids {
                let sk = self
                    .env
                    .safekeepers
                    .iter()
                    .find(|node| node.id == sk_id)
                    .ok_or_else(|| anyhow!("safekeeper {sk_id} does not exist"))?;
                safekeeper_connstrings.push(format!("127.0.0.1:{}", sk.get_compute_port()));
            }
        }
        Ok(safekeeper_connstrings)
    }

    #[allow(clippy::too_many_arguments)]
    pub async fn start(
        &self,
        auth_token: &Option<String>,
        safekeepers_generation: Option<SafekeeperGeneration>,
        safekeepers: Vec<NodeId>,
        pageservers: Vec<(Host, u16)>,
        remote_ext_config: Option<&String>,
        shard_stripe_size: usize,
        create_test_user: bool,
        start_timeout: Duration,
    ) -> Result<()> {
        if self.status() == EndpointStatus::Running {
            anyhow::bail!("The endpoint is already running");
        }

        let postgresql_conf = self.read_postgresql_conf()?;

        // We always start the compute node from scratch, so if the Postgres
        // data dir exists from a previous launch, remove it first.
        if self.pgdata().exists() {
            std::fs::remove_dir_all(self.pgdata())?;
        }

        let pageserver_connstring = Self::build_pageserver_connstr(&pageservers);
        assert!(!pageserver_connstring.is_empty());

        let safekeeper_connstrings = self.build_safekeepers_connstrs(safekeepers)?;

        // Check for the remote_extensions_spec.json file;
        // if it is present, read it and pass it to compute_ctl.
        let remote_extensions_spec_path = self.endpoint_path().join("remote_extensions_spec.json");
        let remote_extensions_spec = std::fs::File::open(remote_extensions_spec_path);
        let remote_extensions: Option<RemoteExtSpec>;

        if let Ok(spec_file) = remote_extensions_spec {
            remote_extensions = serde_json::from_reader(spec_file).ok();
        } else {
            remote_extensions = None;
        };

        // Create config file
        let config = {
            let mut spec = ComputeSpec {
                skip_pg_catalog_updates: self.skip_pg_catalog_updates,
                format_version: 1.0,
                operation_uuid: None,
                features: self.features.clone(),
                swap_size_bytes: None,
                disk_quota_bytes: None,
                disable_lfc_resizing: None,
                cluster: Cluster {
                    cluster_id: None, // project ID: not used
                    name: None,       // project name: not used
                    state: None,
                    roles: if create_test_user {
                        vec![Role {
                            name: PgIdent::from_str("test").unwrap(),
                            encrypted_password: None,
                            options: None,
                        }]
                    } else {
                        Vec::new()
                    },
                    databases: if create_test_user {
                        vec![Database {
                            name: PgIdent::from_str("neondb").unwrap(),
                            owner: PgIdent::from_str("test").unwrap(),
                            options: None,
                            restrict_conn: false,
                            invalid: false,
                        }]
                    } else {
                        Vec::new()
                    },
                    settings: None,
                    postgresql_conf: Some(postgresql_conf.clone()),
                },
                delta_operations: None,
                tenant_id: Some(self.tenant_id),
                timeline_id: Some(self.timeline_id),
                project_id: None,
                branch_id: None,
                endpoint_id: Some(self.endpoint_id.clone()),
                mode: self.mode,
                pageserver_connstring: Some(pageserver_connstring),
                safekeepers_generation: safekeepers_generation.map(|g| g.into_inner()),
                safekeeper_connstrings,
                storage_auth_token: auth_token.clone(),
                remote_extensions,
                pgbouncer_settings: None,
                shard_stripe_size: Some(shard_stripe_size),
                local_proxy_config: None,
                reconfigure_concurrency: self.reconfigure_concurrency,
                drop_subscriptions_before_start: self.drop_subscriptions_before_start,
                audit_log_level: ComputeAudit::Disabled,
                logs_export_host: None::<String>,
            };

            // This strange code is needed to support respec() in tests
            if self.cluster.is_some() {
                debug!("Cluster is already set in the endpoint spec, using it");
                spec.cluster = self.cluster.clone().unwrap();

                debug!("spec.cluster {:?}", spec.cluster);

                // fill missing fields again
                if create_test_user {
                    spec.cluster.roles.push(Role {
                        name: PgIdent::from_str("test").unwrap(),
                        encrypted_password: None,
                        options: None,
                    });
                    spec.cluster.databases.push(Database {
                        name: PgIdent::from_str("neondb").unwrap(),
                        owner: PgIdent::from_str("test").unwrap(),
                        options: None,
                        restrict_conn: false,
                        invalid: false,
                    });
                }
                spec.cluster.postgresql_conf = Some(postgresql_conf);
            }

            ComputeConfig {
                spec: Some(spec),
                compute_ctl_config: ComputeCtlConfig::default(),
            }
        };

        // TODO(tristan957): Remove the write to spec.json after compatibility
        // tests work themselves out
        let spec_path = self.endpoint_path().join("spec.json");
        std::fs::write(spec_path, serde_json::to_string_pretty(&config.spec)?)?;
        let config_path = self.endpoint_path().join("config.json");
        std::fs::write(config_path, serde_json::to_string_pretty(&config)?)?;

        // Open log file. We'll redirect the stdout and stderr of `compute_ctl` to it.
        let logfile = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(self.endpoint_path().join("compute.log"))?;

727 : // issue
728 0 : let old_compute_ctl = {
729 0 : let mut cmd = Command::new(self.env.neon_distrib_dir.join("compute_ctl"));
730 0 : let help_output = cmd.arg("--help").output()?;
731 0 : let help_output = String::from_utf8_lossy(&help_output.stdout);
732 0 :
733 0 : !help_output.contains("--config")
734 0 : };
735 0 :
736 0 : // Launch compute_ctl
737 0 : let conn_str = self.connstr("cloud_admin", "postgres");
738 0 : println!("Starting postgres node at '{}'", conn_str);
739 0 : if create_test_user {
740 0 : let conn_str = self.connstr("test", "neondb");
741 0 : println!("Also at '{}'", conn_str);
742 0 : }
743 0 : let mut cmd = Command::new(self.env.neon_distrib_dir.join("compute_ctl"));
744 0 : cmd.args([
745 0 : "--external-http-port",
746 0 : &self.external_http_address.port().to_string(),
747 0 : ])
748 0 : .args([
749 0 : "--internal-http-port",
750 0 : &self.internal_http_address.port().to_string(),
751 0 : ])
752 0 : .args(["--pgdata", self.pgdata().to_str().unwrap()])
753 0 : .args(["--connstr", &conn_str])
754 0 : // TODO(tristan957): Change this to --config when compatibility tests
755 0 : // are no longer an issue
756 0 : .args([
757 0 : "--spec-path",
758 0 : self.endpoint_path()
759 0 : .join(if old_compute_ctl {
760 0 : "spec.json"
761 : } else {
762 0 : "config.json"
763 : })
764 0 : .to_str()
765 0 : .unwrap(),
766 0 : ])
767 0 : .args([
768 0 : "--pgbin",
769 0 : self.env
770 0 : .pg_bin_dir(self.pg_version)?
771 0 : .join("postgres")
772 0 : .to_str()
773 0 : .unwrap(),
774 0 : ])
775 0 : // TODO: It would be nice if we generated compute IDs with the same
776 0 : // algorithm as the real control plane.
777 0 : .args([
778 0 : "--compute-id",
779 0 : &format!(
780 0 : "compute-{}",
781 0 : SystemTime::now()
782 0 : .duration_since(UNIX_EPOCH)
783 0 : .unwrap()
784 0 : .as_secs()
785 0 : ),
786 0 : ])
787 0 : .stdin(std::process::Stdio::null())
788 0 : .stderr(logfile.try_clone()?)
789 0 : .stdout(logfile);
790 :
791 0 : if let Some(remote_ext_config) = remote_ext_config {
792 0 : cmd.args(["--remote-ext-config", remote_ext_config]);
793 0 : }
794 :
795 0 : let child = cmd.spawn()?;
        // Set up a scopeguard to kill & wait for the child in case we panic or bail below
        let child = scopeguard::guard(child, |mut child| {
            println!("SIGKILL & wait the started process");
            (|| {
                // TODO: use another signal that can be caught by the child so it can clean up any children it spawned
                child.kill().context("SIGKILL child")?;
                child.wait().context("wait() for child process")?;
                anyhow::Ok(())
            })()
            .with_context(|| format!("scopeguard kill&wait child {child:?}"))
            .unwrap();
        });

        // Write down the pid so we can wait for it when we want to stop
        // TODO use background_process::start_process instead: https://github.com/neondatabase/neon/pull/6482
        let pid = child.id();
        let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
        std::fs::write(pidfile_path, pid.to_string())?;

        // Wait for it to start
        const ATTEMPT_INTERVAL: Duration = Duration::from_millis(100);
        let start_at = Instant::now();
        loop {
            match self.get_status().await {
                Ok(state) => {
                    match state.status {
                        ComputeStatus::Init => {
                            if Instant::now().duration_since(start_at) > start_timeout {
                                bail!(
                                    "compute startup timed out {:?}; still in Init state",
                                    start_timeout
                                );
                            }
                            // keep retrying
                        }
                        ComputeStatus::Running => {
                            // All good!
                            break;
                        }
                        ComputeStatus::Failed => {
                            bail!(
                                "compute startup failed: {}",
                                state
                                    .error
                                    .as_deref()
                                    .unwrap_or("<no error from compute_ctl>")
                            );
                        }
                        ComputeStatus::Empty
                        | ComputeStatus::ConfigurationPending
                        | ComputeStatus::Configuration
                        | ComputeStatus::TerminationPending
                        | ComputeStatus::Terminated => {
                            bail!("unexpected compute status: {:?}", state.status)
                        }
                    }
                }
                Err(e) => {
                    if Instant::now().duration_since(start_at) > start_timeout {
                        return Err(e).context(format!(
                            "timed out {:?} waiting to connect to compute_ctl HTTP",
                            start_timeout,
                        ));
                    }
                }
            }
            tokio::time::sleep(ATTEMPT_INTERVAL).await;
        }

        // Disarm the scopeguard; let the child outlive this function (and the neon_local invocation)
        drop(scopeguard::ScopeGuard::into_inner(child));

        Ok(())
    }

    // Call the /status HTTP API
    pub async fn get_status(&self) -> Result<ComputeStatusResponse> {
        let client = reqwest::Client::new();

        let response = client
            .request(
                reqwest::Method::GET,
                format!(
                    "http://{}:{}/status",
                    self.external_http_address.ip(),
                    self.external_http_address.port()
                ),
            )
            .send()
            .await?;

        // Interpret the response
        let status = response.status();
        if !(status.is_client_error() || status.is_server_error()) {
            Ok(response.json().await?)
        } else {
            // reqwest does not export its error construction utility functions, so let's craft the message ourselves
            let url = response.url().to_owned();
            let msg = match response.text().await {
                Ok(err_body) => format!("Error: {}", err_body),
                Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
            };
            Err(anyhow::anyhow!(msg))
        }
    }

    pub async fn reconfigure(
        &self,
        mut pageservers: Vec<(Host, u16)>,
        stripe_size: Option<ShardStripeSize>,
        safekeepers: Option<Vec<NodeId>>,
    ) -> Result<()> {
        let (mut spec, compute_ctl_config) = {
            let config_path = self.endpoint_path().join("config.json");
            let file = std::fs::File::open(config_path)?;
            let config: ComputeConfig = serde_json::from_reader(file)?;

            (config.spec.unwrap(), config.compute_ctl_config)
        };

        let postgresql_conf = self.read_postgresql_conf()?;
        spec.cluster.postgresql_conf = Some(postgresql_conf);

        // If we weren't given explicit pageservers, query the storage controller
        if pageservers.is_empty() {
            let storage_controller = StorageController::from_env(&self.env);
            let locate_result = storage_controller.tenant_locate(self.tenant_id).await?;
            pageservers = locate_result
                .shards
                .into_iter()
                .map(|shard| {
                    (
                        Host::parse(&shard.listen_pg_addr)
                            .expect("Storage controller reported bad hostname"),
                        shard.listen_pg_port,
                    )
                })
                .collect::<Vec<_>>();
        }

        let pageserver_connstr = Self::build_pageserver_connstr(&pageservers);
        assert!(!pageserver_connstr.is_empty());
        spec.pageserver_connstring = Some(pageserver_connstr);
        if stripe_size.is_some() {
            spec.shard_stripe_size = stripe_size.map(|s| s.0 as usize);
        }

        // If safekeepers are not specified, don't change them.
        if let Some(safekeepers) = safekeepers {
            let safekeeper_connstrings = self.build_safekeepers_connstrs(safekeepers)?;
            spec.safekeeper_connstrings = safekeeper_connstrings;
        }

        let client = reqwest::Client::builder()
            .timeout(Duration::from_secs(120))
            .build()
            .unwrap();
        let response = client
            .post(format!(
                "http://{}:{}/configure",
                self.external_http_address.ip(),
                self.external_http_address.port()
            ))
            .header(CONTENT_TYPE.as_str(), "application/json")
            .body(
                serde_json::to_string(&ConfigurationRequest {
                    spec,
                    compute_ctl_config,
                })
                .unwrap(),
            )
            .send()
            .await?;

        let status = response.status();
        if !(status.is_client_error() || status.is_server_error()) {
            Ok(())
        } else {
            let url = response.url().to_owned();
            let msg = match response.text().await {
                Ok(err_body) => format!("Error: {}", err_body),
                Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
            };
            Err(anyhow::anyhow!(msg))
        }
    }

    pub fn stop(&self, mode: &str, destroy: bool) -> Result<()> {
        self.pg_ctl(&["-m", mode, "stop"], &None)?;

        // Also wait for the compute_ctl process to die. It might have some
        // cleanup work to do after postgres stops, like syncing safekeepers,
        // etc.
        //
        // If destroying, or if the stop mode is "immediate", send it SIGTERM
        // before waiting. Sometimes we do *not* want this cleanup: tests
        // intentionally stop when the majority of safekeepers is down, so
        // sync-safekeepers would hang otherwise. This could be a separate
        // flag though.
        let send_sigterm = destroy || mode == "immediate";
        self.wait_for_compute_ctl_to_exit(send_sigterm)?;
        if destroy {
            println!(
                "Destroying postgres data directory '{}'",
                self.pgdata().to_str().unwrap()
            );
            std::fs::remove_dir_all(self.endpoint_path())?;
        }
        Ok(())
    }

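    // For illustration: connstr("cloud_admin", "postgres") yields something
    // like "postgresql://cloud_admin@127.0.0.1:55432/postgres" (the port is
    // whatever was allocated for this endpoint; 55432 is hypothetical).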
    pub fn connstr(&self, user: &str, db_name: &str) -> String {
        format!(
            "postgresql://{}@{}:{}/{}",
            user,
            self.pg_address.ip(),
            self.pg_address.port(),
            db_name
        )
    }
}