Line data Source code
1 : //! Code to manage compute endpoints
2 : //!
3 : //! In the local test environment, the data for each endpoint is stored in
4 : //!
5 : //! ```text
6 : //! .neon/endpoints/<endpoint id>
7 : //! ```
8 : //!
9 : //! Some basic information about the endpoint, like the tenant and timeline IDs,
10 : //! is stored in the `endpoint.json` file. The `endpoint.json` file is created
11 : //! when the endpoint is created, and doesn't change afterwards.
12 : //!
13 : //! The endpoint is managed by the `compute_ctl` binary. When an endpoint is
14 : //! started, we launch `compute_ctl`. It synchronizes the safekeepers, downloads
15 : //! the basebackup from the pageserver to initialize the data directory, and
16 : //! finally launches the PostgreSQL process. It watches the PostgreSQL process
17 : //! until it exits.
18 : //!
19 : //! When an endpoint is created, a `postgresql.conf` file is also created in
20 : //! the endpoint's directory. The file can be modified before starting PostgreSQL.
21 : //! However, the `postgresql.conf` file in the endpoint directory is not used directly
22 : //! by PostgreSQL. It is passed to `compute_ctl`, and `compute_ctl` writes another
23 : //! copy of it in the data directory.
24 : //!
25 : //! Directory contents:
26 : //!
27 : //! ```text
28 : //! .neon/endpoints/main/
29 : //! compute.log - log output of `compute_ctl` and `postgres`
30 : //! endpoint.json - serialized `EndpointConf` struct
31 : //! postgresql.conf - postgresql settings
32 : //! config.json - passed to `compute_ctl`
33 : //! pgdata/
34 : //! postgresql.conf - copy of postgresql.conf created by `compute_ctl`
35 : //! neon.signal
36 : //! zenith.signal - copy of neon.signal, for backward compatibility
37 : //! <other PostgreSQL files>
38 : //! ```
39 : //!
40 : use anyhow::{Context, Result, anyhow, bail};
41 : use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD};
42 : use compute_api::requests::{
43 : COMPUTE_AUDIENCE, ComputeClaims, ComputeClaimsScope, ConfigurationRequest,
44 : };
45 : use compute_api::responses::{
46 : ComputeConfig, ComputeCtlConfig, ComputeStatus, ComputeStatusResponse, TerminateResponse,
47 : TlsConfig,
48 : };
49 : use compute_api::spec::{
50 : Cluster, ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, Database, PageserverProtocol,
51 : PageserverShardInfo, PgIdent, RemoteExtSpec, Role,
52 : };
53 :
54 : // re-export these, because they're used in the reconfigure() function
55 : pub use compute_api::spec::{PageserverConnectionInfo, PageserverShardConnectionInfo};
56 :
57 : use jsonwebtoken::jwk::{
58 : AlgorithmParameters, CommonParameters, EllipticCurve, Jwk, JwkSet, KeyAlgorithm, KeyOperations,
59 : OctetKeyPairParameters, OctetKeyPairType, PublicKeyUse, RSAKeyParameters, RSAKeyType,
60 : };
61 : use nix::sys::signal::{Signal, kill};
62 : use pem::Pem;
63 : use reqwest::header::CONTENT_TYPE;
64 : use rsa::{RsaPublicKey, pkcs1::DecodeRsaPublicKey, traits::PublicKeyParts};
65 : use safekeeper_api::PgMajorVersion;
66 : use safekeeper_api::membership::SafekeeperGeneration;
67 : use serde::{Deserialize, Serialize};
68 : use sha2::{Digest, Sha256};
69 : use spki::der::Decode;
70 : use spki::{SubjectPublicKeyInfo, SubjectPublicKeyInfoRef};
71 : use std::collections::{BTreeMap, HashMap};
72 : use std::fmt::Display;
73 : use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream};
74 : use std::path::PathBuf;
75 : use std::process::Command;
76 : use std::str::FromStr;
77 : use std::sync::Arc;
78 : use std::time::{Duration, Instant};
79 : use tracing::debug;
80 : use utils::id::{NodeId, TenantId, TimelineId};
81 : use utils::shard::{ShardCount, ShardIndex, ShardNumber};
82 : use x509_parser::parse_x509_certificate;
83 :
84 : use pageserver_api::config::DEFAULT_GRPC_LISTEN_PORT as DEFAULT_PAGESERVER_GRPC_PORT;
85 : use postgres_connection::parse_host_port;
86 :
87 : use crate::local_env::LocalEnv;
88 : use crate::postgresql_conf::PostgresConf;
89 :
/// Contents of an `endpoint.json` file.
///
/// `ComputeControlPlane::new_endpoint` serializes this struct into the endpoint's
/// directory, and `Endpoint::from_dir_entry` deserializes it again to rebuild the
/// in-memory `Endpoint`.
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
pub struct EndpointConf {
    /// Endpoint identifier as passed to `new_endpoint`.
    endpoint_id: String,
    /// Tenant the endpoint belongs to.
    tenant_id: TenantId,
    /// Timeline the endpoint is attached to.
    timeline_id: TimelineId,
    /// Compute mode (primary, static at an LSN, or replica).
    mode: ComputeMode,
    /// Postgres port; the endpoint's `pg_address` is built from it on localhost.
    pg_port: u16,
    /// Port of the external HTTP address, which is built on the unspecified IPv4 address.
    external_http_port: u16,
    /// Port of the internal HTTP address, which is built on localhost.
    internal_http_port: u16,
    /// Postgres major version of the endpoint.
    pg_version: PgMajorVersion,
    /// If true, the endpoint should use gRPC to communicate with Pageservers.
    grpc: bool,
    /// Copied into `ComputeSpec::skip_pg_catalog_updates` when the endpoint is started.
    skip_pg_catalog_updates: bool,
    /// Copied into `ComputeSpec::reconfigure_concurrency`; `new_endpoint` always writes 1.
    reconfigure_concurrency: usize,
    /// Copied into `ComputeSpec::drop_subscriptions_before_start`.
    drop_subscriptions_before_start: bool,
    /// Feature flags; `new_endpoint` writes an empty list.
    features: Vec<ComputeFeature>,
    /// Optional cluster settings; `new_endpoint` writes `None`.
    cluster: Option<Cluster>,
    /// The `compute_ctl` config (JWKS and optional TLS settings) for the endpoint's compute.
    compute_ctl_config: ComputeCtlConfig,
    /// The name of the privileged role for the endpoint, if any.
    privileged_role_name: Option<String>,
}
110 :
/// In-memory registry of the compute endpoints of a local environment.
///
/// Built by `ComputeControlPlane::load`, which scans the `endpoints/` directory of
/// the given `LocalEnv`.
pub struct ComputeControlPlane {
    /// Fallback used when picking a port and no endpoint exists yet (the first
    /// automatically assigned port is `base_port + 1`).
    base_port: u16,

    /// All known endpoints; the endpoint ID is the key.
    pub endpoints: BTreeMap<String, Arc<Endpoint>>,

    /// The local environment the endpoints live in.
    env: LocalEnv,
}
122 :
123 : impl ComputeControlPlane {
124 : // Load current endpoints from the endpoints/ subdirectories
125 0 : pub fn load(env: LocalEnv) -> Result<ComputeControlPlane> {
126 0 : let mut endpoints = BTreeMap::default();
127 0 : for endpoint_dir in std::fs::read_dir(env.endpoints_path())
128 0 : .with_context(|| format!("failed to list {}", env.endpoints_path().display()))?
129 : {
130 0 : let ep_res = Endpoint::from_dir_entry(endpoint_dir?, &env);
131 0 : let ep = match ep_res {
132 0 : Ok(ep) => ep,
133 0 : Err(e) => match e.downcast::<std::io::Error>() {
134 0 : Ok(e) => {
135 : // A parallel task could delete an endpoint while we have just scanned the directory
136 0 : if e.kind() == std::io::ErrorKind::NotFound {
137 0 : continue;
138 : } else {
139 0 : Err(e)?
140 : }
141 : }
142 0 : Err(e) => Err(e)?,
143 : },
144 : };
145 0 : endpoints.insert(ep.endpoint_id.clone(), Arc::new(ep));
146 : }
147 :
148 0 : Ok(ComputeControlPlane {
149 0 : base_port: 55431,
150 0 : endpoints,
151 0 : env,
152 0 : })
153 0 : }
154 :
155 0 : fn get_port(&mut self) -> u16 {
156 0 : 1 + self
157 0 : .endpoints
158 0 : .values()
159 0 : .map(|ep| std::cmp::max(ep.pg_address.port(), ep.external_http_address.port()))
160 0 : .max()
161 0 : .unwrap_or(self.base_port)
162 0 : }
163 :
164 : // BEGIN HADRON
165 :
166 : /// Extract SubjectPublicKeyInfo from a PEM that can be either a X509 certificate or a public key
167 0 : fn extract_spki_from_pem(pem: &Pem) -> Result<Vec<u8>> {
168 0 : if pem.tag() == "CERTIFICATE" {
169 : // Handle X509 certificate
170 0 : let (_, cert) = parse_x509_certificate(pem.contents())?;
171 0 : let public_key = cert.public_key();
172 0 : Ok(public_key.subject_public_key.data.to_vec())
173 : } else {
174 : // Handle public key directly
175 0 : let spki: SubjectPublicKeyInfoRef = SubjectPublicKeyInfo::from_der(pem.contents())?;
176 0 : Ok(spki.subject_public_key.raw_bytes().to_vec())
177 : }
178 0 : }
179 :
180 : /// Create RSA JWK from certificate PEM
181 0 : fn create_rsa_jwk_from_cert(pem: &Pem, key_hash: &[u8]) -> Result<Jwk> {
182 0 : let public_key = Self::extract_spki_from_pem(pem)?;
183 :
184 : // Extract RSA parameters (n, e) from RSA public key DER data
185 0 : let rsa_key = RsaPublicKey::from_pkcs1_der(&public_key)?;
186 0 : let n = rsa_key.n().to_bytes_be();
187 0 : let e = rsa_key.e().to_bytes_be();
188 :
189 0 : Ok(Jwk {
190 0 : common: CommonParameters {
191 0 : public_key_use: Some(PublicKeyUse::Signature),
192 0 : key_operations: Some(vec![KeyOperations::Verify]),
193 0 : key_algorithm: Some(KeyAlgorithm::RS256),
194 0 : key_id: Some(URL_SAFE_NO_PAD.encode(key_hash)),
195 0 : x509_url: None::<String>,
196 0 : x509_chain: None::<Vec<String>>,
197 0 : x509_sha1_fingerprint: None::<String>,
198 0 : x509_sha256_fingerprint: None::<String>,
199 0 : },
200 0 : algorithm: AlgorithmParameters::RSA(RSAKeyParameters {
201 0 : key_type: RSAKeyType::RSA,
202 0 : n: URL_SAFE_NO_PAD.encode(n),
203 0 : e: URL_SAFE_NO_PAD.encode(e),
204 0 : }),
205 0 : })
206 0 : }
207 :
208 : // END HADRON
209 :
210 : /// Create a JSON Web Key Set. This ideally matches the way we create a JWKS
211 : /// from the production control plane.
212 0 : fn create_jwks_from_pem(pem: &Pem) -> Result<JwkSet> {
213 0 : let public_key = Self::extract_spki_from_pem(pem)?;
214 :
215 0 : let mut hasher = Sha256::new();
216 0 : hasher.update(&public_key);
217 0 : let key_hash = hasher.finalize();
218 :
219 : // BEGIN HADRON
220 0 : if pem.tag() == "CERTIFICATE" {
221 : // Assume RSA if we are parsing keys from a certificate.
222 0 : let jwk = Self::create_rsa_jwk_from_cert(pem, &key_hash)?;
223 0 : return Ok(JwkSet { keys: vec![jwk] });
224 0 : }
225 : // END HADRON
226 :
227 0 : Ok(JwkSet {
228 0 : keys: vec![Jwk {
229 0 : common: CommonParameters {
230 0 : public_key_use: Some(PublicKeyUse::Signature),
231 0 : key_operations: Some(vec![KeyOperations::Verify]),
232 0 : key_algorithm: Some(KeyAlgorithm::EdDSA),
233 0 : key_id: Some(URL_SAFE_NO_PAD.encode(key_hash)),
234 0 : x509_url: None::<String>,
235 0 : x509_chain: None::<Vec<String>>,
236 0 : x509_sha1_fingerprint: None::<String>,
237 0 : x509_sha256_fingerprint: None::<String>,
238 0 : },
239 0 : algorithm: AlgorithmParameters::OctetKeyPair(OctetKeyPairParameters {
240 0 : key_type: OctetKeyPairType::OctetKeyPair,
241 0 : curve: EllipticCurve::Ed25519,
242 0 : x: URL_SAFE_NO_PAD.encode(public_key),
243 0 : }),
244 0 : }],
245 0 : })
246 0 : }
247 :
248 : #[allow(clippy::too_many_arguments)]
249 0 : pub fn new_endpoint(
250 0 : &mut self,
251 0 : endpoint_id: &str,
252 0 : tenant_id: TenantId,
253 0 : timeline_id: TimelineId,
254 0 : pg_port: Option<u16>,
255 0 : external_http_port: Option<u16>,
256 0 : internal_http_port: Option<u16>,
257 0 : pg_version: PgMajorVersion,
258 0 : mode: ComputeMode,
259 0 : grpc: bool,
260 0 : skip_pg_catalog_updates: bool,
261 0 : drop_subscriptions_before_start: bool,
262 0 : privileged_role_name: Option<String>,
263 0 : ) -> Result<Arc<Endpoint>> {
264 0 : let pg_port = pg_port.unwrap_or_else(|| self.get_port());
265 0 : let external_http_port = external_http_port.unwrap_or_else(|| self.get_port() + 1);
266 0 : let internal_http_port = internal_http_port.unwrap_or_else(|| external_http_port + 1);
267 0 : let compute_ctl_config = ComputeCtlConfig {
268 0 : jwks: Self::create_jwks_from_pem(&self.env.read_public_key()?)?,
269 0 : tls: None::<TlsConfig>,
270 : };
271 0 : let ep = Arc::new(Endpoint {
272 0 : endpoint_id: endpoint_id.to_owned(),
273 0 : pg_address: SocketAddr::new(IpAddr::from(Ipv4Addr::LOCALHOST), pg_port),
274 0 : external_http_address: SocketAddr::new(
275 0 : IpAddr::from(Ipv4Addr::UNSPECIFIED),
276 0 : external_http_port,
277 0 : ),
278 0 : internal_http_address: SocketAddr::new(
279 0 : IpAddr::from(Ipv4Addr::LOCALHOST),
280 0 : internal_http_port,
281 0 : ),
282 0 : env: self.env.clone(),
283 0 : timeline_id,
284 0 : mode,
285 0 : tenant_id,
286 0 : pg_version,
287 0 : // We don't setup roles and databases in the spec locally, so we don't need to
288 0 : // do catalog updates. Catalog updates also include check availability
289 0 : // data creation. Yet, we have tests that check that size and db dump
290 0 : // before and after start are the same. So, skip catalog updates,
291 0 : // with this we basically test a case of waking up an idle compute, where
292 0 : // we also skip catalog updates in the cloud.
293 0 : skip_pg_catalog_updates,
294 0 : drop_subscriptions_before_start,
295 0 : grpc,
296 0 : reconfigure_concurrency: 1,
297 0 : features: vec![],
298 0 : cluster: None,
299 0 : compute_ctl_config: compute_ctl_config.clone(),
300 0 : privileged_role_name: privileged_role_name.clone(),
301 0 : });
302 :
303 0 : ep.create_endpoint_dir()?;
304 0 : std::fs::write(
305 0 : ep.endpoint_path().join("endpoint.json"),
306 0 : serde_json::to_string_pretty(&EndpointConf {
307 0 : endpoint_id: endpoint_id.to_string(),
308 0 : tenant_id,
309 0 : timeline_id,
310 0 : mode,
311 0 : external_http_port,
312 0 : internal_http_port,
313 0 : pg_port,
314 0 : pg_version,
315 0 : grpc,
316 0 : skip_pg_catalog_updates,
317 0 : drop_subscriptions_before_start,
318 0 : reconfigure_concurrency: 1,
319 0 : features: vec![],
320 0 : cluster: None,
321 0 : compute_ctl_config,
322 0 : privileged_role_name,
323 0 : })?,
324 0 : )?;
325 0 : std::fs::write(
326 0 : ep.endpoint_path().join("postgresql.conf"),
327 0 : ep.setup_pg_conf()?.to_string(),
328 0 : )?;
329 :
330 0 : self.endpoints
331 0 : .insert(ep.endpoint_id.clone(), Arc::clone(&ep));
332 :
333 0 : Ok(ep)
334 0 : }
335 :
336 0 : pub fn check_conflicting_endpoints(
337 0 : &self,
338 0 : mode: ComputeMode,
339 0 : tenant_id: TenantId,
340 0 : timeline_id: TimelineId,
341 0 : ) -> Result<()> {
342 0 : if matches!(mode, ComputeMode::Primary) {
343 : // this check is not complete, as you could have a concurrent attempt at
344 : // creating another primary, both reading the state before checking it here,
345 : // but it's better than nothing.
346 0 : let mut duplicates = self.endpoints.iter().filter(|(_k, v)| {
347 0 : v.tenant_id == tenant_id
348 0 : && v.timeline_id == timeline_id
349 0 : && v.mode == mode
350 0 : && v.status() != EndpointStatus::Stopped
351 0 : });
352 :
353 0 : if let Some((key, _)) = duplicates.next() {
354 0 : bail!(
355 0 : "attempting to create a duplicate primary endpoint on tenant {tenant_id}, timeline {timeline_id}: endpoint {key:?} exists already. please don't do this, it is not supported."
356 : );
357 0 : }
358 0 : }
359 0 : Ok(())
360 0 : }
361 : }
362 :
363 : ///////////////////////////////////////////////////////////////////////////////
364 :
/// In-memory description of one compute endpoint.
///
/// Instances are built either by `ComputeControlPlane::new_endpoint` or from an
/// existing `endpoint.json` by `Endpoint::from_dir_entry`.
pub struct Endpoint {
    /// used as the directory name
    endpoint_id: String,
    /// Tenant the endpoint belongs to.
    pub tenant_id: TenantId,
    /// Timeline the endpoint is attached to.
    pub timeline_id: TimelineId,
    /// Compute mode (primary, static at an LSN, or replica).
    pub mode: ComputeMode,
    /// If true, the endpoint should use gRPC to communicate with Pageservers.
    pub grpc: bool,

    // port and address of the Postgres server and `compute_ctl`'s HTTP APIs
    /// Postgres address; constructed on localhost.
    pub pg_address: SocketAddr,
    /// External HTTP address; constructed on the unspecified IPv4 address.
    pub external_http_address: SocketAddr,
    /// Internal HTTP address; constructed on localhost.
    pub internal_http_address: SocketAddr,

    // postgres major version in the format: 14, 15, etc.
    pg_version: PgMajorVersion,

    // These are not part of the endpoint as such, but the environment
    // the endpoint runs in.
    pub env: LocalEnv,

    // Optimizations
    skip_pg_catalog_updates: bool,

    drop_subscriptions_before_start: bool,
    reconfigure_concurrency: usize,
    // Feature flags
    features: Vec<ComputeFeature>,
    // Cluster settings
    cluster: Option<Cluster>,

    /// The compute_ctl config for the endpoint's compute.
    compute_ctl_config: ComputeCtlConfig,

    /// The name of the privileged role for the endpoint.
    privileged_role_name: Option<String>,
}
402 :
/// Coarse liveness state of an endpoint, derived from the presence of Postgres'
/// `postmaster.pid` file and from whether a TCP connection to the Postgres port succeeds.
#[derive(PartialEq, Eq)]
pub enum EndpointStatus {
    /// Pidfile present and the port accepts connections.
    Running,
    /// No pidfile and the port does not accept connections.
    Stopped,
    /// Pidfile present but the port does not accept connections.
    Crashed,
    /// The port accepts connections although there is no pidfile.
    RunningNoPidfile,
}

impl Display for EndpointStatus {
    /// Writes the human-readable, lowercase label of the status.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let label = match self {
            EndpointStatus::Running => "running",
            EndpointStatus::Stopped => "stopped",
            EndpointStatus::Crashed => "crashed",
            EndpointStatus::RunningNoPidfile => "running, no pidfile",
        };
        f.write_str(label)
    }
}
421 :
// How an endpoint should be shut down. `Fast` is the default.
//
// NOTE: this is a plain comment on purpose. The `///` comments on the variants may be
// picked up by the `clap::ValueEnum` derive as help text, so they are left untouched.
#[derive(Default, Clone, Copy, clap::ValueEnum)]
pub enum EndpointTerminateMode {
    #[default]
    /// Use pg_ctl stop -m fast
    Fast,
    /// Use pg_ctl stop -m immediate
    Immediate,
    /// Use /terminate?mode=immediate
    ImmediateTerminate,
}
432 :
433 : impl std::fmt::Display for EndpointTerminateMode {
434 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
435 0 : f.write_str(match &self {
436 0 : EndpointTerminateMode::Fast => "fast",
437 0 : EndpointTerminateMode::Immediate => "immediate",
438 0 : EndpointTerminateMode::ImmediateTerminate => "immediate-terminate",
439 : })
440 0 : }
441 : }
442 :
/// Arguments for `Endpoint::start`.
pub struct EndpointStartArgs {
    /// Passed to the compute as `storage_auth_token` in the spec.
    pub auth_token: Option<String>,
    /// Passed to the compute as `endpoint_storage_token` in the spec.
    pub endpoint_storage_token: String,
    /// Passed to the compute as `endpoint_storage_addr` in the spec.
    pub endpoint_storage_addr: String,
    /// Safekeeper membership generation, if any; written to the spec as a plain number.
    pub safekeepers_generation: Option<SafekeeperGeneration>,
    /// IDs of the safekeepers to use; each must exist in the local environment.
    pub safekeepers: Vec<NodeId>,
    /// Pageserver connection info (shard count, per-shard pageservers, stripe size).
    pub pageserver_conninfo: PageserverConnectionInfo,
    // NOTE(review): presumably the base URL for downloading remote extensions; its use is
    // not visible in this part of the file.
    pub remote_ext_base_url: Option<String>,
    /// If true, a `test` role and a `neondb` database owned by it are added to the spec.
    pub create_test_user: bool,
    // NOTE(review): presumably the maximum time to wait for the endpoint to come up;
    // confirm against `Endpoint::start`.
    pub start_timeout: Duration,
    /// Copied into `ComputeSpec::autoprewarm`.
    pub autoprewarm: bool,
    /// Copied into `ComputeSpec::offload_lfc_interval_seconds`.
    pub offload_lfc_interval_seconds: Option<std::num::NonZeroU64>,
    // NOTE(review): meaning not visible in this part of the file; confirm against
    // `Endpoint::start`.
    pub dev: bool,
}
457 :
458 : impl Endpoint {
459 0 : fn from_dir_entry(entry: std::fs::DirEntry, env: &LocalEnv) -> Result<Endpoint> {
460 0 : if !entry.file_type()?.is_dir() {
461 0 : anyhow::bail!(
462 0 : "Endpoint::from_dir_entry failed: '{}' is not a directory",
463 0 : entry.path().display()
464 : );
465 0 : }
466 :
467 : // parse data directory name
468 0 : let fname = entry.file_name();
469 0 : let endpoint_id = fname.to_str().unwrap().to_string();
470 :
471 : // Read the endpoint.json file
472 0 : let conf: EndpointConf =
473 0 : serde_json::from_slice(&std::fs::read(entry.path().join("endpoint.json"))?)?;
474 :
475 0 : debug!("serialized endpoint conf: {:?}", conf);
476 :
477 0 : Ok(Endpoint {
478 0 : pg_address: SocketAddr::new(IpAddr::from(Ipv4Addr::LOCALHOST), conf.pg_port),
479 0 : external_http_address: SocketAddr::new(
480 0 : IpAddr::from(Ipv4Addr::UNSPECIFIED),
481 0 : conf.external_http_port,
482 0 : ),
483 0 : internal_http_address: SocketAddr::new(
484 0 : IpAddr::from(Ipv4Addr::LOCALHOST),
485 0 : conf.internal_http_port,
486 0 : ),
487 0 : endpoint_id,
488 0 : env: env.clone(),
489 0 : timeline_id: conf.timeline_id,
490 0 : mode: conf.mode,
491 0 : tenant_id: conf.tenant_id,
492 0 : pg_version: conf.pg_version,
493 0 : grpc: conf.grpc,
494 0 : skip_pg_catalog_updates: conf.skip_pg_catalog_updates,
495 0 : reconfigure_concurrency: conf.reconfigure_concurrency,
496 0 : drop_subscriptions_before_start: conf.drop_subscriptions_before_start,
497 0 : features: conf.features,
498 0 : cluster: conf.cluster,
499 0 : compute_ctl_config: conf.compute_ctl_config,
500 0 : privileged_role_name: conf.privileged_role_name,
501 0 : })
502 0 : }
503 :
504 0 : fn create_endpoint_dir(&self) -> Result<()> {
505 0 : std::fs::create_dir_all(self.endpoint_path()).with_context(|| {
506 0 : format!(
507 0 : "could not create endpoint directory {}",
508 0 : self.endpoint_path().display()
509 : )
510 0 : })
511 0 : }
512 :
513 : // Generate postgresql.conf with default configuration
514 0 : fn setup_pg_conf(&self) -> Result<PostgresConf> {
515 0 : let mut conf = PostgresConf::new();
516 0 : conf.append("max_wal_senders", "10");
517 0 : conf.append("wal_log_hints", "off");
518 0 : conf.append("max_replication_slots", "10");
519 0 : conf.append("hot_standby", "on");
520 : // Set to 1MB to both exercise getPage requests/LFC, and still have enough room for
521 : // Postgres to operate. Everything smaller might be not enough for Postgres under load,
522 : // and can cause errors like 'no unpinned buffers available', see
523 : // <https://github.com/neondatabase/neon/issues/9956>
524 0 : conf.append("shared_buffers", "1MB");
525 : // Postgres defaults to effective_io_concurrency=1, which does not exercise the pageserver's
526 : // batching logic. Set this to 2 so that we exercise the code a bit without letting
527 : // individual tests do a lot of concurrent work on underpowered test machines
528 0 : conf.append("effective_io_concurrency", "2");
529 0 : conf.append("fsync", "off");
530 0 : conf.append("max_connections", "100");
531 0 : conf.append("wal_level", "logical");
532 : // wal_sender_timeout is the maximum time to wait for WAL replication.
533 : // It also defines how often the walreceiver will send a feedback message to the wal sender.
534 0 : conf.append("wal_sender_timeout", "5s");
535 0 : conf.append("listen_addresses", &self.pg_address.ip().to_string());
536 0 : conf.append("port", &self.pg_address.port().to_string());
537 0 : conf.append("wal_keep_size", "0");
538 : // walproposer panics when basebackup is invalid, it is pointless to restart in this case.
539 0 : conf.append("restart_after_crash", "off");
540 :
541 : // Load the 'neon' extension
542 0 : conf.append("shared_preload_libraries", "neon");
543 :
544 0 : conf.append_line("");
545 : // Replication-related configurations, such as WAL sending
546 0 : match &self.mode {
547 : ComputeMode::Primary => {
548 : // Configure backpressure
549 : // - Replication write lag depends on how fast the walreceiver can process incoming WAL.
550 : // This lag determines latency of get_page_at_lsn. Speed of applying WAL is about 10MB/sec,
551 : // so to avoid expiration of 1 minute timeout, this lag should not be larger than 600MB.
552 : // Actually latency should be much smaller (better if < 1sec). But we assume that recently
553 : // updates pages are not requested from pageserver.
554 : // - Replication flush lag depends on speed of persisting data by checkpointer (creation of
555 : // delta/image layers) and advancing disk_consistent_lsn. Safekeepers are able to
556 : // remove/archive WAL only beyond disk_consistent_lsn. Too large a lag can cause long
557 : // recovery time (in case of pageserver crash) and disk space overflow at safekeepers.
558 : // - Replication apply lag depends on speed of uploading changes to S3 by uploader thread.
559 : // To be able to restore database in case of pageserver node crash, safekeeper should not
560 : // remove WAL beyond this point. Too large lag can cause space exhaustion in safekeepers
561 : // (if they are not able to upload WAL to S3).
562 0 : conf.append("max_replication_write_lag", "15MB");
563 0 : conf.append("max_replication_flush_lag", "10GB");
564 :
565 0 : if !self.env.safekeepers.is_empty() {
566 : // Configure Postgres to connect to the safekeepers
567 0 : conf.append("synchronous_standby_names", "walproposer");
568 :
569 0 : let safekeepers = self
570 0 : .env
571 0 : .safekeepers
572 0 : .iter()
573 0 : .map(|sk| format!("localhost:{}", sk.get_compute_port()))
574 0 : .collect::<Vec<String>>()
575 0 : .join(",");
576 0 : conf.append("neon.safekeepers", &safekeepers);
577 0 : } else {
578 0 : // We only use setup without safekeepers for tests,
579 0 : // and don't care about data durability on pageserver,
580 0 : // so set more relaxed synchronous_commit.
581 0 : conf.append("synchronous_commit", "remote_write");
582 0 :
583 0 : // Configure the node to stream WAL directly to the pageserver
584 0 : // This isn't really a supported configuration, but can be useful for
585 0 : // testing.
586 0 : conf.append("synchronous_standby_names", "pageserver");
587 0 : }
588 : }
589 0 : ComputeMode::Static(lsn) => {
590 0 : conf.append("recovery_target_lsn", &lsn.to_string());
591 0 : }
592 : ComputeMode::Replica => {
593 0 : assert!(!self.env.safekeepers.is_empty());
594 :
595 : // TODO: use future host field from safekeeper spec
596 : // Pass the list of safekeepers to the replica so that it can connect to any of them,
597 : // whichever is available.
598 0 : let sk_ports = self
599 0 : .env
600 0 : .safekeepers
601 0 : .iter()
602 0 : .map(|x| x.get_compute_port().to_string())
603 0 : .collect::<Vec<_>>()
604 0 : .join(",");
605 0 : let sk_hosts = vec!["localhost"; self.env.safekeepers.len()].join(",");
606 :
607 0 : let connstr = format!(
608 0 : "host={} port={} options='-c timeline_id={} tenant_id={}' application_name=replica replication=true",
609 : sk_hosts,
610 : sk_ports,
611 0 : &self.timeline_id.to_string(),
612 0 : &self.tenant_id.to_string(),
613 : );
614 :
615 0 : let slot_name = format!("repl_{}_", self.timeline_id);
616 0 : conf.append("primary_conninfo", connstr.as_str());
617 0 : conf.append("primary_slot_name", slot_name.as_str());
618 0 : conf.append("hot_standby", "on");
619 : // prefetching of blocks referenced in WAL doesn't make sense for us
620 : // Neon hot standby ignores pages that are not in the shared_buffers
621 0 : if self.pg_version >= PgMajorVersion::PG15 {
622 0 : conf.append("recovery_prefetch", "off");
623 0 : }
624 : }
625 : }
626 :
627 0 : Ok(conf)
628 0 : }
629 :
630 0 : pub fn endpoint_path(&self) -> PathBuf {
631 0 : self.env.endpoints_path().join(&self.endpoint_id)
632 0 : }
633 :
634 0 : pub fn pgdata(&self) -> PathBuf {
635 0 : self.endpoint_path().join("pgdata")
636 0 : }
637 :
638 0 : pub fn status(&self) -> EndpointStatus {
639 0 : let timeout = Duration::from_millis(300);
640 0 : let has_pidfile = self.pgdata().join("postmaster.pid").exists();
641 0 : let can_connect = TcpStream::connect_timeout(&self.pg_address, timeout).is_ok();
642 :
643 0 : match (has_pidfile, can_connect) {
644 0 : (true, true) => EndpointStatus::Running,
645 0 : (false, false) => EndpointStatus::Stopped,
646 0 : (true, false) => EndpointStatus::Crashed,
647 0 : (false, true) => EndpointStatus::RunningNoPidfile,
648 : }
649 0 : }
650 :
651 0 : fn pg_ctl(&self, args: &[&str], auth_token: &Option<String>) -> Result<()> {
652 0 : let pg_ctl_path = self.env.pg_bin_dir(self.pg_version)?.join("pg_ctl");
653 0 : let mut cmd = Command::new(&pg_ctl_path);
654 0 : cmd.args(
655 0 : [
656 0 : &[
657 0 : "-D",
658 0 : self.pgdata().to_str().unwrap(),
659 0 : "-w", //wait till pg_ctl actually does what was asked
660 0 : ],
661 0 : args,
662 0 : ]
663 0 : .concat(),
664 0 : )
665 0 : .env_clear()
666 0 : .env(
667 : "LD_LIBRARY_PATH",
668 0 : self.env.pg_lib_dir(self.pg_version)?.to_str().unwrap(),
669 : )
670 0 : .env(
671 : "DYLD_LIBRARY_PATH",
672 0 : self.env.pg_lib_dir(self.pg_version)?.to_str().unwrap(),
673 : );
674 :
675 : // Pass authentication token used for the connections to pageserver and safekeepers
676 0 : if let Some(token) = auth_token {
677 0 : cmd.env("NEON_AUTH_TOKEN", token);
678 0 : }
679 :
680 0 : let pg_ctl = cmd
681 0 : .output()
682 0 : .context(format!("{} failed", pg_ctl_path.display()))?;
683 0 : if !pg_ctl.status.success() {
684 0 : anyhow::bail!(
685 0 : "pg_ctl failed, exit code: {}, stdout: {}, stderr: {}",
686 : pg_ctl.status,
687 0 : String::from_utf8_lossy(&pg_ctl.stdout),
688 0 : String::from_utf8_lossy(&pg_ctl.stderr),
689 : );
690 0 : }
691 :
692 0 : Ok(())
693 0 : }
694 :
695 0 : fn wait_for_compute_ctl_to_exit(&self, send_sigterm: bool) -> Result<()> {
696 : // TODO use background_process::stop_process instead: https://github.com/neondatabase/neon/pull/6482
697 0 : let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
698 0 : let pid: u32 = std::fs::read_to_string(pidfile_path)?.parse()?;
699 0 : let pid = nix::unistd::Pid::from_raw(pid as i32);
700 0 : if send_sigterm {
701 0 : kill(pid, Signal::SIGTERM).ok();
702 0 : }
703 0 : crate::background_process::wait_until_stopped("compute_ctl", pid)?;
704 0 : Ok(())
705 0 : }
706 :
707 0 : fn read_postgresql_conf(&self) -> Result<String> {
708 : // Slurp the endpoints/<endpoint id>/postgresql.conf file into
709 : // memory. We will include it in the spec file that we pass to
710 : // `compute_ctl`, and `compute_ctl` will write it to the postgresql.conf
711 : // in the data directory.
712 0 : let postgresql_conf_path = self.endpoint_path().join("postgresql.conf");
713 0 : match std::fs::read(&postgresql_conf_path) {
714 0 : Ok(content) => Ok(String::from_utf8(content)?),
715 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok("".to_string()),
716 0 : Err(e) => Err(anyhow::Error::new(e).context(format!(
717 0 : "failed to read config file in {}",
718 0 : postgresql_conf_path.to_str().unwrap()
719 0 : ))),
720 : }
721 0 : }
722 :
723 : /// Map safekeepers ids to the actual connection strings.
724 0 : fn build_safekeepers_connstrs(&self, sk_ids: Vec<NodeId>) -> Result<Vec<String>> {
725 0 : let mut safekeeper_connstrings = Vec::new();
726 0 : if self.mode == ComputeMode::Primary {
727 0 : for sk_id in sk_ids {
728 0 : let sk = self
729 0 : .env
730 0 : .safekeepers
731 0 : .iter()
732 0 : .find(|node| node.id == sk_id)
733 0 : .ok_or_else(|| anyhow!("safekeeper {sk_id} does not exist"))?;
734 0 : safekeeper_connstrings.push(format!("127.0.0.1:{}", sk.get_compute_port()));
735 : }
736 0 : }
737 0 : Ok(safekeeper_connstrings)
738 0 : }
739 :
740 : /// Generate a JWT with the correct claims.
741 0 : pub fn generate_jwt(&self, scope: Option<ComputeClaimsScope>) -> Result<String> {
742 0 : self.env.generate_auth_token(&ComputeClaims {
743 0 : audience: match scope {
744 0 : Some(ComputeClaimsScope::Admin) => Some(vec![COMPUTE_AUDIENCE.to_owned()]),
745 0 : _ => None,
746 : },
747 0 : compute_id: match scope {
748 0 : Some(ComputeClaimsScope::Admin) => None,
749 0 : _ => Some(self.endpoint_id.clone()),
750 : },
751 0 : scope,
752 : })
753 0 : }
754 :
755 0 : pub async fn start(&self, args: EndpointStartArgs) -> Result<()> {
756 0 : if self.status() == EndpointStatus::Running {
757 0 : anyhow::bail!("The endpoint is already running");
758 0 : }
759 :
760 0 : let postgresql_conf = self.read_postgresql_conf()?;
761 :
762 : // We always start the compute node from scratch, so if the Postgres
763 : // data dir exists from a previous launch, remove it first.
764 0 : if self.pgdata().exists() {
765 0 : std::fs::remove_dir_all(self.pgdata())?;
766 0 : }
767 :
768 0 : let safekeeper_connstrings = self.build_safekeepers_connstrs(args.safekeepers)?;
769 :
770 : // check for file remote_extensions_spec.json
771 : // if it is present, read it and pass to compute_ctl
772 0 : let remote_extensions_spec_path = self.endpoint_path().join("remote_extensions_spec.json");
773 0 : let remote_extensions_spec = std::fs::File::open(remote_extensions_spec_path);
774 : let remote_extensions: Option<RemoteExtSpec>;
775 :
776 0 : if let Ok(spec_file) = remote_extensions_spec {
777 0 : remote_extensions = serde_json::from_reader(spec_file).ok();
778 0 : } else {
779 0 : remote_extensions = None;
780 0 : };
781 :
782 : // For the sake of backwards-compatibility, also fill in 'pageserver_connstring'
783 : //
784 : // XXX: I believe this is not really needed, except to make
785 : // test_forward_compatibility happy.
786 : //
787 : // Use a closure so that we can conviniently return None in the middle of the
788 : // loop.
789 0 : let pageserver_connstring: Option<String> = (|| {
790 0 : let num_shards = args.pageserver_conninfo.shard_count.count();
791 0 : let mut connstrings = Vec::new();
792 0 : for shard_no in 0..num_shards {
793 0 : let shard_index = ShardIndex {
794 0 : shard_count: args.pageserver_conninfo.shard_count,
795 0 : shard_number: ShardNumber(shard_no),
796 0 : };
797 0 : let shard = args
798 0 : .pageserver_conninfo
799 0 : .shards
800 0 : .get(&shard_index)
801 0 : .ok_or_else(|| {
802 0 : anyhow!(
803 0 : "shard {} not found in pageserver_connection_info",
804 : shard_index
805 : )
806 0 : })?;
807 0 : let pageserver = shard
808 0 : .pageservers
809 0 : .first()
810 0 : .ok_or(anyhow!("must have at least one pageserver"))?;
811 0 : if let Some(libpq_url) = &pageserver.libpq_url {
812 0 : connstrings.push(libpq_url.clone());
813 0 : } else {
814 0 : return Ok::<_, anyhow::Error>(None);
815 : }
816 : }
817 0 : Ok(Some(connstrings.join(",")))
818 0 : })()?;
819 :
820 : // Create config file
821 0 : let config = {
822 0 : let mut spec = ComputeSpec {
823 0 : skip_pg_catalog_updates: self.skip_pg_catalog_updates,
824 : format_version: 1.0,
825 0 : operation_uuid: None,
826 0 : features: self.features.clone(),
827 0 : swap_size_bytes: None,
828 0 : disk_quota_bytes: None,
829 0 : disable_lfc_resizing: None,
830 : cluster: Cluster {
831 0 : cluster_id: None, // project ID: not used
832 0 : name: None, // project name: not used
833 0 : state: None,
834 0 : roles: if args.create_test_user {
835 0 : vec![Role {
836 0 : name: PgIdent::from_str("test").unwrap(),
837 0 : encrypted_password: None,
838 0 : options: None,
839 0 : }]
840 : } else {
841 0 : Vec::new()
842 : },
843 0 : databases: if args.create_test_user {
844 0 : vec![Database {
845 0 : name: PgIdent::from_str("neondb").unwrap(),
846 0 : owner: PgIdent::from_str("test").unwrap(),
847 0 : options: None,
848 0 : restrict_conn: false,
849 0 : invalid: false,
850 0 : }]
851 : } else {
852 0 : Vec::new()
853 : },
854 0 : settings: None,
855 0 : postgresql_conf: Some(postgresql_conf.clone()),
856 : },
857 0 : delta_operations: None,
858 0 : tenant_id: Some(self.tenant_id),
859 0 : timeline_id: Some(self.timeline_id),
860 0 : project_id: None,
861 0 : branch_id: None,
862 0 : endpoint_id: Some(self.endpoint_id.clone()),
863 0 : mode: self.mode,
864 0 : pageserver_connection_info: Some(args.pageserver_conninfo.clone()),
865 0 : pageserver_connstring,
866 0 : safekeepers_generation: args.safekeepers_generation.map(|g| g.into_inner()),
867 0 : safekeeper_connstrings,
868 0 : storage_auth_token: args.auth_token.clone(),
869 0 : remote_extensions,
870 0 : pgbouncer_settings: None,
871 0 : shard_stripe_size: args.pageserver_conninfo.stripe_size, // redundant with pageserver_connection_info.stripe_size
872 0 : local_proxy_config: None,
873 0 : reconfigure_concurrency: self.reconfigure_concurrency,
874 0 : drop_subscriptions_before_start: self.drop_subscriptions_before_start,
875 0 : audit_log_level: ComputeAudit::Disabled,
876 0 : logs_export_host: None::<String>,
877 0 : endpoint_storage_addr: Some(args.endpoint_storage_addr),
878 0 : endpoint_storage_token: Some(args.endpoint_storage_token),
879 0 : autoprewarm: args.autoprewarm,
880 0 : offload_lfc_interval_seconds: args.offload_lfc_interval_seconds,
881 : suspend_timeout_seconds: -1, // Only used in neon_local.
882 0 : databricks_settings: None,
883 : };
884 :
885 : // this strange code is needed to support respec() in tests
886 0 : if self.cluster.is_some() {
887 0 : debug!("Cluster is already set in the endpoint spec, using it");
888 0 : spec.cluster = self.cluster.clone().unwrap();
889 :
890 0 : debug!("spec.cluster {:?}", spec.cluster);
891 :
892 : // fill missing fields again
893 0 : if args.create_test_user {
894 0 : spec.cluster.roles.push(Role {
895 0 : name: PgIdent::from_str("test").unwrap(),
896 0 : encrypted_password: None,
897 0 : options: None,
898 0 : });
899 0 : spec.cluster.databases.push(Database {
900 0 : name: PgIdent::from_str("neondb").unwrap(),
901 0 : owner: PgIdent::from_str("test").unwrap(),
902 0 : options: None,
903 0 : restrict_conn: false,
904 0 : invalid: false,
905 0 : });
906 0 : }
907 0 : spec.cluster.postgresql_conf = Some(postgresql_conf);
908 0 : }
909 :
910 0 : ComputeConfig {
911 0 : spec: Some(spec),
912 0 : compute_ctl_config: self.compute_ctl_config.clone(),
913 0 : }
914 : };
915 :
916 0 : let config_path = self.endpoint_path().join("config.json");
917 0 : std::fs::write(config_path, serde_json::to_string_pretty(&config)?)?;
918 :
919 : // Open log file. We'll redirect the stdout and stderr of `compute_ctl` to it.
920 0 : let logfile = std::fs::OpenOptions::new()
921 0 : .create(true)
922 0 : .append(true)
923 0 : .open(self.endpoint_path().join("compute.log"))?;
924 :
925 : // Launch compute_ctl
926 0 : let conn_str = self.connstr("cloud_admin", "postgres");
927 0 : println!("Starting postgres node at '{conn_str}'");
928 0 : if args.create_test_user {
929 0 : let conn_str = self.connstr("test", "neondb");
930 0 : println!("Also at '{conn_str}'");
931 0 : }
932 0 : let mut cmd = Command::new(self.env.neon_distrib_dir.join("compute_ctl"));
933 0 : cmd.args([
934 0 : "--external-http-port",
935 0 : &self.external_http_address.port().to_string(),
936 0 : ])
937 0 : .args([
938 0 : "--internal-http-port",
939 0 : &self.internal_http_address.port().to_string(),
940 0 : ])
941 0 : .args(["--pgdata", self.pgdata().to_str().unwrap()])
942 0 : .args(["--connstr", &conn_str])
943 0 : .arg("--config")
944 0 : .arg(self.endpoint_path().join("config.json").as_os_str())
945 0 : .args([
946 : "--pgbin",
947 0 : self.env
948 0 : .pg_bin_dir(self.pg_version)?
949 0 : .join("postgres")
950 0 : .to_str()
951 0 : .unwrap(),
952 : ])
953 : // TODO: It would be nice if we generated compute IDs with the same
954 : // algorithm as the real control plane.
955 0 : .args(["--compute-id", &self.endpoint_id])
956 0 : .stdin(std::process::Stdio::null())
957 0 : .stderr(logfile.try_clone()?)
958 0 : .stdout(logfile);
959 :
960 0 : if let Some(remote_ext_base_url) = args.remote_ext_base_url {
961 0 : cmd.args(["--remote-ext-base-url", &remote_ext_base_url]);
962 0 : }
963 :
964 0 : if args.dev {
965 0 : cmd.arg("--dev");
966 0 : }
967 :
968 0 : if let Some(privileged_role_name) = self.privileged_role_name.clone() {
969 0 : cmd.args(["--privileged-role-name", &privileged_role_name]);
970 0 : }
971 :
972 0 : let child = cmd.spawn()?;
973 : // set up a scopeguard to kill & wait for the child in case we panic or bail below
974 0 : let child = scopeguard::guard(child, |mut child| {
975 0 : println!("SIGKILL & wait the started process");
976 0 : (|| {
977 : // TODO: use another signal that can be caught by the child so it can clean up any children it spawned
978 0 : child.kill().context("SIGKILL child")?;
979 0 : child.wait().context("wait() for child process")?;
980 0 : anyhow::Ok(())
981 : })()
982 0 : .with_context(|| format!("scopeguard kill&wait child {child:?}"))
983 0 : .unwrap();
984 0 : });
985 :
986 : // Write down the pid so we can wait for it when we want to stop
987 : // TODO use background_process::start_process instead: https://github.com/neondatabase/neon/pull/6482
988 0 : let pid = child.id();
989 0 : let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
990 0 : std::fs::write(pidfile_path, pid.to_string())?;
991 :
992 : // Wait for it to start
993 : const ATTEMPT_INTERVAL: Duration = Duration::from_millis(100);
994 0 : let start_at = Instant::now();
995 : loop {
996 0 : match self.get_status().await {
997 0 : Ok(state) => {
998 0 : match state.status {
999 : ComputeStatus::Init => {
1000 0 : let timeout = args.start_timeout;
1001 0 : if Instant::now().duration_since(start_at) > timeout {
1002 0 : bail!(
1003 0 : "compute startup timed out {:?}; still in Init state",
1004 : timeout
1005 : );
1006 0 : }
1007 : // keep retrying
1008 : }
1009 : ComputeStatus::Running => {
1010 : // All good!
1011 0 : break;
1012 : }
1013 : ComputeStatus::Failed => {
1014 0 : bail!(
1015 0 : "compute startup failed: {}",
1016 0 : state
1017 0 : .error
1018 0 : .as_deref()
1019 0 : .unwrap_or("<no error from compute_ctl>")
1020 : );
1021 : }
1022 : ComputeStatus::Empty
1023 : | ComputeStatus::ConfigurationPending
1024 : | ComputeStatus::Configuration
1025 : | ComputeStatus::TerminationPendingFast
1026 : | ComputeStatus::TerminationPendingImmediate
1027 : | ComputeStatus::Terminated
1028 : | ComputeStatus::RefreshConfigurationPending
1029 : | ComputeStatus::RefreshConfiguration => {
1030 0 : bail!("unexpected compute status: {:?}", state.status)
1031 : }
1032 : }
1033 : }
1034 0 : Err(e) => {
1035 0 : if Instant::now().duration_since(start_at) > args.start_timeout {
1036 0 : return Err(e).context(format!(
1037 0 : "timed out {:?} waiting to connect to compute_ctl HTTP",
1038 : args.start_timeout
1039 : ));
1040 0 : }
1041 : }
1042 : }
1043 0 : tokio::time::sleep(ATTEMPT_INTERVAL).await;
1044 : }
1045 :
1046 : // disarm the scopeguard, let the child outlive this function (and neon_local invocation)
1047 0 : drop(scopeguard::ScopeGuard::into_inner(child));
1048 :
1049 0 : Ok(())
1050 0 : }
1051 :
1052 : // Update the pageservers in the spec file of the endpoint. This is useful to test the spec refresh scenario.
1053 0 : pub async fn update_pageservers_in_config(
1054 0 : &self,
1055 0 : pageserver_conninfo: &PageserverConnectionInfo,
1056 0 : ) -> Result<()> {
1057 0 : let config_path = self.endpoint_path().join("config.json");
1058 0 : let mut config: ComputeConfig = {
1059 0 : let file = std::fs::File::open(&config_path)?;
1060 0 : serde_json::from_reader(file)?
1061 : };
1062 :
1063 0 : let mut spec = config.spec.unwrap();
1064 0 : spec.pageserver_connection_info = Some(pageserver_conninfo.clone());
1065 0 : config.spec = Some(spec);
1066 :
1067 0 : let file = std::fs::File::create(&config_path)?;
1068 0 : serde_json::to_writer_pretty(file, &config)?;
1069 :
1070 0 : Ok(())
1071 0 : }
1072 :
1073 : // Call the /status HTTP API
1074 0 : pub async fn get_status(&self) -> Result<ComputeStatusResponse> {
1075 0 : let client = reqwest::Client::new();
1076 :
1077 0 : let response = client
1078 0 : .request(
1079 0 : reqwest::Method::GET,
1080 0 : format!(
1081 0 : "http://{}:{}/status",
1082 0 : self.external_http_address.ip(),
1083 0 : self.external_http_address.port()
1084 : ),
1085 : )
1086 0 : .bearer_auth(self.generate_jwt(None::<ComputeClaimsScope>)?)
1087 0 : .send()
1088 0 : .await?;
1089 :
1090 : // Interpret the response
1091 0 : let status = response.status();
1092 0 : if !(status.is_client_error() || status.is_server_error()) {
1093 0 : Ok(response.json().await?)
1094 : } else {
1095 : // reqwest does not export its error construction utility functions, so let's craft the message ourselves
1096 0 : let url = response.url().to_owned();
1097 0 : let msg = match response.text().await {
1098 0 : Ok(err_body) => format!("Error: {err_body}"),
1099 0 : Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
1100 : };
1101 0 : Err(anyhow::anyhow!(msg))
1102 : }
1103 0 : }
1104 :
1105 0 : pub async fn reconfigure(
1106 0 : &self,
1107 0 : pageserver_conninfo: Option<&PageserverConnectionInfo>,
1108 0 : safekeepers: Option<Vec<NodeId>>,
1109 0 : safekeeper_generation: Option<SafekeeperGeneration>,
1110 0 : ) -> Result<()> {
1111 0 : let (mut spec, compute_ctl_config) = {
1112 0 : let config_path = self.endpoint_path().join("config.json");
1113 0 : let file = std::fs::File::open(config_path)?;
1114 0 : let config: ComputeConfig = serde_json::from_reader(file)?;
1115 :
1116 0 : (config.spec.unwrap(), config.compute_ctl_config)
1117 : };
1118 :
1119 0 : let postgresql_conf = self.read_postgresql_conf()?;
1120 0 : spec.cluster.postgresql_conf = Some(postgresql_conf);
1121 :
1122 0 : if let Some(pageserver_conninfo) = pageserver_conninfo {
1123 : // If pageservers are provided, we need to ensure that they are not empty.
1124 : // This is a requirement for the compute_ctl configuration.
1125 0 : anyhow::ensure!(
1126 0 : !pageserver_conninfo.shards.is_empty(),
1127 0 : "no pageservers provided"
1128 : );
1129 0 : spec.pageserver_connection_info = Some(pageserver_conninfo.clone());
1130 0 : spec.shard_stripe_size = pageserver_conninfo.stripe_size;
1131 0 : }
1132 :
1133 : // If safekeepers are not specified, don't change them.
1134 0 : if let Some(safekeepers) = safekeepers {
1135 0 : let safekeeper_connstrings = self.build_safekeepers_connstrs(safekeepers)?;
1136 0 : spec.safekeeper_connstrings = safekeeper_connstrings;
1137 0 : if let Some(g) = safekeeper_generation {
1138 0 : spec.safekeepers_generation = Some(g.into_inner());
1139 0 : }
1140 0 : }
1141 :
1142 0 : let client = reqwest::Client::builder()
1143 0 : .timeout(Duration::from_secs(120))
1144 0 : .build()
1145 0 : .unwrap();
1146 0 : let response = client
1147 0 : .post(format!(
1148 0 : "http://{}:{}/configure",
1149 0 : self.external_http_address.ip(),
1150 0 : self.external_http_address.port()
1151 : ))
1152 0 : .header(CONTENT_TYPE.as_str(), "application/json")
1153 0 : .bearer_auth(self.generate_jwt(None::<ComputeClaimsScope>)?)
1154 0 : .body(
1155 0 : serde_json::to_string(&ConfigurationRequest {
1156 0 : spec,
1157 0 : compute_ctl_config,
1158 0 : })
1159 0 : .unwrap(),
1160 : )
1161 0 : .send()
1162 0 : .await?;
1163 :
1164 0 : let status = response.status();
1165 0 : if !(status.is_client_error() || status.is_server_error()) {
1166 0 : Ok(())
1167 : } else {
1168 0 : let url = response.url().to_owned();
1169 0 : let msg = match response.text().await {
1170 0 : Ok(err_body) => format!("Error: {err_body}"),
1171 0 : Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
1172 : };
1173 0 : Err(anyhow::anyhow!(msg))
1174 : }
1175 0 : }
1176 :
1177 0 : pub async fn reconfigure_pageservers(
1178 0 : &self,
1179 0 : pageservers: &PageserverConnectionInfo,
1180 0 : ) -> Result<()> {
1181 0 : self.reconfigure(Some(pageservers), None, None).await
1182 0 : }
1183 :
1184 0 : pub async fn reconfigure_safekeepers(
1185 0 : &self,
1186 0 : safekeepers: Vec<NodeId>,
1187 0 : generation: SafekeeperGeneration,
1188 0 : ) -> Result<()> {
1189 0 : self.reconfigure(None, Some(safekeepers), Some(generation))
1190 0 : .await
1191 0 : }
1192 :
1193 0 : pub async fn stop(
1194 0 : &self,
1195 0 : mode: EndpointTerminateMode,
1196 0 : destroy: bool,
1197 0 : ) -> Result<TerminateResponse> {
1198 : // pg_ctl stop is fast but doesn't allow us to collect LSN. /terminate is
1199 : // slow, and test runs time out. Solution: special mode "immediate-terminate"
1200 : // which uses /terminate
1201 0 : let response = if let EndpointTerminateMode::ImmediateTerminate = mode {
1202 0 : let ip = self.external_http_address.ip();
1203 0 : let port = self.external_http_address.port();
1204 0 : let url = format!("http://{ip}:{port}/terminate?mode=immediate");
1205 0 : let token = self.generate_jwt(Some(ComputeClaimsScope::Admin))?;
1206 0 : let request = reqwest::Client::new().post(url).bearer_auth(token);
1207 0 : let response = request.send().await.context("/terminate")?;
1208 0 : let text = response.text().await.context("/terminate result")?;
1209 0 : serde_json::from_str(&text).with_context(|| format!("deserializing {text}"))?
1210 : } else {
1211 0 : self.pg_ctl(&["-m", &mode.to_string(), "stop"], &None)?;
1212 0 : TerminateResponse { lsn: None }
1213 : };
1214 :
1215 : // Also wait for the compute_ctl process to die. It might have some
1216 : // cleanup work to do after postgres stops, like syncing safekeepers,
1217 : // etc.
1218 : //
1219 : // If destroying or stop mode is immediate, send it SIGTERM before
1220 : // waiting. Sometimes we do *not* want this cleanup: tests intentionally
1221 : // do stop when majority of safekeepers is down, so sync-safekeepers
1222 : // would hang otherwise. This could be a separate flag though.
1223 0 : let send_sigterm = destroy || !matches!(mode, EndpointTerminateMode::Fast);
1224 0 : self.wait_for_compute_ctl_to_exit(send_sigterm)?;
1225 0 : if destroy {
1226 0 : println!(
1227 0 : "Destroying postgres data directory '{}'",
1228 0 : self.pgdata().to_str().unwrap()
1229 : );
1230 0 : std::fs::remove_dir_all(self.endpoint_path())?;
1231 0 : }
1232 0 : Ok(response)
1233 0 : }
1234 :
1235 0 : pub async fn refresh_configuration(&self) -> Result<()> {
1236 0 : let client = reqwest::Client::builder()
1237 0 : .timeout(Duration::from_secs(30))
1238 0 : .build()
1239 0 : .unwrap();
1240 0 : let response = client
1241 0 : .post(format!(
1242 0 : "http://{}:{}/refresh_configuration",
1243 0 : self.internal_http_address.ip(),
1244 0 : self.internal_http_address.port()
1245 0 : ))
1246 0 : .send()
1247 0 : .await?;
1248 :
1249 0 : let status = response.status();
1250 0 : if !(status.is_client_error() || status.is_server_error()) {
1251 0 : Ok(())
1252 : } else {
1253 0 : let url = response.url().to_owned();
1254 0 : let msg = match response.text().await {
1255 0 : Ok(err_body) => format!("Error: {err_body}"),
1256 0 : Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
1257 : };
1258 0 : Err(anyhow::anyhow!(msg))
1259 : }
1260 0 : }
1261 :
1262 0 : pub fn connstr(&self, user: &str, db_name: &str) -> String {
1263 0 : format!(
1264 0 : "postgresql://{}@{}:{}/{}",
1265 : user,
1266 0 : self.pg_address.ip(),
1267 0 : self.pg_address.port(),
1268 : db_name
1269 : )
1270 0 : }
1271 : }
1272 :
1273 : /// If caller is telling us what pageserver to use, this is not a tenant which is
1274 : /// fully managed by storage controller, therefore not sharded.
1275 0 : pub fn local_pageserver_conf_to_conn_info(
1276 0 : conf: &crate::local_env::PageServerConf,
1277 0 : ) -> Result<PageserverConnectionInfo> {
1278 0 : let libpq_url = {
1279 0 : let (host, port) = parse_host_port(&conf.listen_pg_addr)?;
1280 0 : let port = port.unwrap_or(5432);
1281 0 : Some(format!("postgres://no_user@{host}:{port}"))
1282 : };
1283 0 : let grpc_url = if let Some(grpc_addr) = &conf.listen_grpc_addr {
1284 0 : let (host, port) = parse_host_port(grpc_addr)?;
1285 0 : let port = port.unwrap_or(DEFAULT_PAGESERVER_GRPC_PORT);
1286 0 : Some(format!("grpc://no_user@{host}:{port}"))
1287 : } else {
1288 0 : None
1289 : };
1290 0 : let ps_conninfo = PageserverShardConnectionInfo {
1291 0 : id: Some(conf.id),
1292 0 : libpq_url,
1293 0 : grpc_url,
1294 0 : };
1295 :
1296 0 : let shard_info = PageserverShardInfo {
1297 0 : pageservers: vec![ps_conninfo],
1298 0 : };
1299 :
1300 0 : let shards: HashMap<_, _> = vec![(ShardIndex::unsharded(), shard_info)]
1301 0 : .into_iter()
1302 0 : .collect();
1303 0 : Ok(PageserverConnectionInfo {
1304 0 : shard_count: ShardCount::unsharded(),
1305 0 : stripe_size: None,
1306 0 : shards,
1307 0 : prefer_protocol: PageserverProtocol::default(),
1308 0 : })
1309 0 : }
1310 :
1311 0 : pub fn tenant_locate_response_to_conn_info(
1312 0 : response: &pageserver_api::controller_api::TenantLocateResponse,
1313 0 : ) -> Result<PageserverConnectionInfo> {
1314 0 : let mut shards = HashMap::new();
1315 0 : for shard in response.shards.iter() {
1316 0 : tracing::info!("parsing {}", shard.listen_pg_addr);
1317 0 : let libpq_url = {
1318 0 : let host = &shard.listen_pg_addr;
1319 0 : let port = shard.listen_pg_port;
1320 0 : Some(format!("postgres://no_user@{host}:{port}"))
1321 : };
1322 0 : let grpc_url = if let Some(grpc_addr) = &shard.listen_grpc_addr {
1323 0 : let host = grpc_addr;
1324 0 : let port = shard.listen_grpc_port.expect("no gRPC port");
1325 0 : Some(format!("grpc://no_user@{host}:{port}"))
1326 : } else {
1327 0 : None
1328 : };
1329 :
1330 0 : let shard_info = PageserverShardInfo {
1331 0 : pageservers: vec![PageserverShardConnectionInfo {
1332 0 : id: Some(shard.node_id),
1333 0 : libpq_url,
1334 0 : grpc_url,
1335 0 : }],
1336 0 : };
1337 :
1338 0 : shards.insert(shard.shard_id.to_index(), shard_info);
1339 : }
1340 :
1341 0 : let stripe_size = if response.shard_params.count.is_unsharded() {
1342 0 : None
1343 : } else {
1344 0 : Some(response.shard_params.stripe_size)
1345 : };
1346 0 : Ok(PageserverConnectionInfo {
1347 0 : shard_count: response.shard_params.count,
1348 0 : stripe_size,
1349 0 : shards,
1350 0 : prefer_protocol: PageserverProtocol::default(),
1351 0 : })
1352 0 : }
|