Line data Source code
1 : //! Code to manage compute endpoints
2 : //!
3 : //! In the local test environment, the data for each endpoint is stored in
4 : //!
5 : //! ```text
6 : //! .neon/endpoints/<endpoint id>
7 : //! ```
8 : //!
9 : //! Some basic information about the endpoint, like the tenant and timeline IDs,
10 : //! is stored in the `endpoint.json` file. The `endpoint.json` file is created
11 : //! when the endpoint is created, and doesn't change afterwards.
12 : //!
13 : //! The endpoint is managed by the `compute_ctl` binary. When an endpoint is
14 : //! started, we launch `compute_ctl`. It synchronizes the safekeepers, downloads
15 : //! the basebackup from the pageserver to initialize the data directory, and
16 : //! finally launches the PostgreSQL process. It watches the PostgreSQL process
17 : //! until it exits.
18 : //!
19 : //! When an endpoint is created, a `postgresql.conf` file is also created in
20 : //! the endpoint's directory. The file can be modified before starting PostgreSQL.
21 : //! However, the `postgresql.conf` file in the endpoint directory is not used directly
22 : //! by PostgreSQL. It is passed to `compute_ctl`, and `compute_ctl` writes another
23 : //! copy of it in the data directory.
24 : //!
25 : //! Directory contents:
26 : //!
27 : //! ```text
28 : //! .neon/endpoints/main/
29 : //! compute.log - log output of `compute_ctl` and `postgres`
30 : //! endpoint.json - serialized `EndpointConf` struct
31 : //! postgresql.conf - postgresql settings
32 : //! config.json - passed to `compute_ctl`
33 : //! pgdata/
34 : //! postgresql.conf - copy of postgresql.conf created by `compute_ctl`
35 : //! zenith.signal
36 : //! <other PostgreSQL files>
37 : //! ```
38 : //!
39 : use std::collections::BTreeMap;
40 : use std::fmt::Display;
41 : use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream};
42 : use std::path::PathBuf;
43 : use std::process::Command;
44 : use std::str::FromStr;
45 : use std::sync::Arc;
46 : use std::time::{Duration, Instant};
47 :
48 : use anyhow::{Context, Result, anyhow, bail};
49 : use base64::Engine;
50 : use base64::prelude::BASE64_URL_SAFE_NO_PAD;
51 : use compute_api::requests::{
52 : COMPUTE_AUDIENCE, ComputeClaims, ComputeClaimsScope, ConfigurationRequest,
53 : };
54 : use compute_api::responses::{
55 : ComputeConfig, ComputeCtlConfig, ComputeStatus, ComputeStatusResponse, TerminateResponse,
56 : TlsConfig,
57 : };
58 : use compute_api::spec::{
59 : Cluster, ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, Database, PageserverProtocol,
60 : PgIdent, RemoteExtSpec, Role,
61 : };
62 : use jsonwebtoken::jwk::{
63 : AlgorithmParameters, CommonParameters, EllipticCurve, Jwk, JwkSet, KeyAlgorithm, KeyOperations,
64 : OctetKeyPairParameters, OctetKeyPairType, PublicKeyUse,
65 : };
66 : use nix::sys::signal::{Signal, kill};
67 : use pageserver_api::shard::ShardStripeSize;
68 : use pem::Pem;
69 : use reqwest::header::CONTENT_TYPE;
70 : use safekeeper_api::PgMajorVersion;
71 : use safekeeper_api::membership::SafekeeperGeneration;
72 : use serde::{Deserialize, Serialize};
73 : use sha2::{Digest, Sha256};
74 : use spki::der::Decode;
75 : use spki::{SubjectPublicKeyInfo, SubjectPublicKeyInfoRef};
76 : use tracing::debug;
77 : use url::Host;
78 : use utils::id::{NodeId, TenantId, TimelineId};
79 :
80 : use crate::local_env::LocalEnv;
81 : use crate::postgresql_conf::PostgresConf;
82 :
83 : // contents of a endpoint.json file
84 0 : #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
85 : pub struct EndpointConf {
86 : endpoint_id: String,
87 : tenant_id: TenantId,
88 : timeline_id: TimelineId,
89 : mode: ComputeMode,
90 : pg_port: u16,
91 : external_http_port: u16,
92 : internal_http_port: u16,
93 : pg_version: PgMajorVersion,
94 : grpc: bool,
95 : skip_pg_catalog_updates: bool,
96 : reconfigure_concurrency: usize,
97 : drop_subscriptions_before_start: bool,
98 : features: Vec<ComputeFeature>,
99 : cluster: Option<Cluster>,
100 : compute_ctl_config: ComputeCtlConfig,
101 : }
102 :
103 : //
104 : // ComputeControlPlane
105 : //
106 : pub struct ComputeControlPlane {
107 : base_port: u16,
108 :
109 : // endpoint ID is the key
110 : pub endpoints: BTreeMap<String, Arc<Endpoint>>,
111 :
112 : env: LocalEnv,
113 : }
114 :
115 : impl ComputeControlPlane {
116 : // Load current endpoints from the endpoints/ subdirectories
117 0 : pub fn load(env: LocalEnv) -> Result<ComputeControlPlane> {
118 0 : let mut endpoints = BTreeMap::default();
119 0 : for endpoint_dir in std::fs::read_dir(env.endpoints_path())
120 0 : .with_context(|| format!("failed to list {}", env.endpoints_path().display()))?
121 : {
122 0 : let ep_res = Endpoint::from_dir_entry(endpoint_dir?, &env);
123 0 : let ep = match ep_res {
124 0 : Ok(ep) => ep,
125 0 : Err(e) => match e.downcast::<std::io::Error>() {
126 0 : Ok(e) => {
127 : // A parallel task could delete an endpoint while we have just scanned the directory
128 0 : if e.kind() == std::io::ErrorKind::NotFound {
129 0 : continue;
130 : } else {
131 0 : Err(e)?
132 : }
133 : }
134 0 : Err(e) => Err(e)?,
135 : },
136 : };
137 0 : endpoints.insert(ep.endpoint_id.clone(), Arc::new(ep));
138 : }
139 :
140 0 : Ok(ComputeControlPlane {
141 0 : base_port: 55431,
142 0 : endpoints,
143 0 : env,
144 0 : })
145 0 : }
146 :
147 0 : fn get_port(&mut self) -> u16 {
148 0 : 1 + self
149 0 : .endpoints
150 0 : .values()
151 0 : .map(|ep| std::cmp::max(ep.pg_address.port(), ep.external_http_address.port()))
152 0 : .max()
153 0 : .unwrap_or(self.base_port)
154 0 : }
155 :
156 : /// Create a JSON Web Key Set. This ideally matches the way we create a JWKS
157 : /// from the production control plane.
158 0 : fn create_jwks_from_pem(pem: &Pem) -> Result<JwkSet> {
159 0 : let spki: SubjectPublicKeyInfoRef = SubjectPublicKeyInfo::from_der(pem.contents())?;
160 0 : let public_key = spki.subject_public_key.raw_bytes();
161 :
162 0 : let mut hasher = Sha256::new();
163 0 : hasher.update(public_key);
164 0 : let key_hash = hasher.finalize();
165 :
166 0 : Ok(JwkSet {
167 0 : keys: vec![Jwk {
168 0 : common: CommonParameters {
169 0 : public_key_use: Some(PublicKeyUse::Signature),
170 0 : key_operations: Some(vec![KeyOperations::Verify]),
171 0 : key_algorithm: Some(KeyAlgorithm::EdDSA),
172 0 : key_id: Some(BASE64_URL_SAFE_NO_PAD.encode(key_hash)),
173 0 : x509_url: None::<String>,
174 0 : x509_chain: None::<Vec<String>>,
175 0 : x509_sha1_fingerprint: None::<String>,
176 0 : x509_sha256_fingerprint: None::<String>,
177 0 : },
178 0 : algorithm: AlgorithmParameters::OctetKeyPair(OctetKeyPairParameters {
179 0 : key_type: OctetKeyPairType::OctetKeyPair,
180 0 : curve: EllipticCurve::Ed25519,
181 0 : x: BASE64_URL_SAFE_NO_PAD.encode(public_key),
182 0 : }),
183 0 : }],
184 0 : })
185 0 : }
186 :
187 : #[allow(clippy::too_many_arguments)]
188 0 : pub fn new_endpoint(
189 0 : &mut self,
190 0 : endpoint_id: &str,
191 0 : tenant_id: TenantId,
192 0 : timeline_id: TimelineId,
193 0 : pg_port: Option<u16>,
194 0 : external_http_port: Option<u16>,
195 0 : internal_http_port: Option<u16>,
196 0 : pg_version: PgMajorVersion,
197 0 : mode: ComputeMode,
198 0 : grpc: bool,
199 0 : skip_pg_catalog_updates: bool,
200 0 : drop_subscriptions_before_start: bool,
201 0 : ) -> Result<Arc<Endpoint>> {
202 0 : let pg_port = pg_port.unwrap_or_else(|| self.get_port());
203 0 : let external_http_port = external_http_port.unwrap_or_else(|| self.get_port() + 1);
204 0 : let internal_http_port = internal_http_port.unwrap_or_else(|| external_http_port + 1);
205 0 : let compute_ctl_config = ComputeCtlConfig {
206 0 : jwks: Self::create_jwks_from_pem(&self.env.read_public_key()?)?,
207 0 : tls: None::<TlsConfig>,
208 : };
209 0 : let ep = Arc::new(Endpoint {
210 0 : endpoint_id: endpoint_id.to_owned(),
211 0 : pg_address: SocketAddr::new(IpAddr::from(Ipv4Addr::LOCALHOST), pg_port),
212 0 : external_http_address: SocketAddr::new(
213 0 : IpAddr::from(Ipv4Addr::UNSPECIFIED),
214 0 : external_http_port,
215 0 : ),
216 0 : internal_http_address: SocketAddr::new(
217 0 : IpAddr::from(Ipv4Addr::LOCALHOST),
218 0 : internal_http_port,
219 0 : ),
220 0 : env: self.env.clone(),
221 0 : timeline_id,
222 0 : mode,
223 0 : tenant_id,
224 0 : pg_version,
225 0 : // We don't setup roles and databases in the spec locally, so we don't need to
226 0 : // do catalog updates. Catalog updates also include check availability
227 0 : // data creation. Yet, we have tests that check that size and db dump
228 0 : // before and after start are the same. So, skip catalog updates,
229 0 : // with this we basically test a case of waking up an idle compute, where
230 0 : // we also skip catalog updates in the cloud.
231 0 : skip_pg_catalog_updates,
232 0 : drop_subscriptions_before_start,
233 0 : grpc,
234 0 : reconfigure_concurrency: 1,
235 0 : features: vec![],
236 0 : cluster: None,
237 0 : compute_ctl_config: compute_ctl_config.clone(),
238 0 : });
239 :
240 0 : ep.create_endpoint_dir()?;
241 0 : std::fs::write(
242 0 : ep.endpoint_path().join("endpoint.json"),
243 0 : serde_json::to_string_pretty(&EndpointConf {
244 0 : endpoint_id: endpoint_id.to_string(),
245 0 : tenant_id,
246 0 : timeline_id,
247 0 : mode,
248 0 : external_http_port,
249 0 : internal_http_port,
250 0 : pg_port,
251 0 : pg_version,
252 0 : grpc,
253 0 : skip_pg_catalog_updates,
254 0 : drop_subscriptions_before_start,
255 0 : reconfigure_concurrency: 1,
256 0 : features: vec![],
257 0 : cluster: None,
258 0 : compute_ctl_config,
259 0 : })?,
260 0 : )?;
261 0 : std::fs::write(
262 0 : ep.endpoint_path().join("postgresql.conf"),
263 0 : ep.setup_pg_conf()?.to_string(),
264 0 : )?;
265 :
266 0 : self.endpoints
267 0 : .insert(ep.endpoint_id.clone(), Arc::clone(&ep));
268 :
269 0 : Ok(ep)
270 0 : }
271 :
272 0 : pub fn check_conflicting_endpoints(
273 0 : &self,
274 0 : mode: ComputeMode,
275 0 : tenant_id: TenantId,
276 0 : timeline_id: TimelineId,
277 0 : ) -> Result<()> {
278 0 : if matches!(mode, ComputeMode::Primary) {
279 : // this check is not complete, as you could have a concurrent attempt at
280 : // creating another primary, both reading the state before checking it here,
281 : // but it's better than nothing.
282 0 : let mut duplicates = self.endpoints.iter().filter(|(_k, v)| {
283 0 : v.tenant_id == tenant_id
284 0 : && v.timeline_id == timeline_id
285 0 : && v.mode == mode
286 0 : && v.status() != EndpointStatus::Stopped
287 0 : });
288 :
289 0 : if let Some((key, _)) = duplicates.next() {
290 0 : bail!(
291 0 : "attempting to create a duplicate primary endpoint on tenant {tenant_id}, timeline {timeline_id}: endpoint {key:?} exists already. please don't do this, it is not supported."
292 : );
293 0 : }
294 0 : }
295 0 : Ok(())
296 0 : }
297 : }
298 :
299 : ///////////////////////////////////////////////////////////////////////////////
300 :
301 : pub struct Endpoint {
302 : /// used as the directory name
303 : endpoint_id: String,
304 : pub tenant_id: TenantId,
305 : pub timeline_id: TimelineId,
306 : pub mode: ComputeMode,
307 : /// If true, the endpoint should use gRPC to communicate with Pageservers.
308 : pub grpc: bool,
309 :
310 : // port and address of the Postgres server and `compute_ctl`'s HTTP APIs
311 : pub pg_address: SocketAddr,
312 : pub external_http_address: SocketAddr,
313 : pub internal_http_address: SocketAddr,
314 :
315 : // postgres major version in the format: 14, 15, etc.
316 : pg_version: PgMajorVersion,
317 :
318 : // These are not part of the endpoint as such, but the environment
319 : // the endpoint runs in.
320 : pub env: LocalEnv,
321 :
322 : // Optimizations
323 : skip_pg_catalog_updates: bool,
324 :
325 : drop_subscriptions_before_start: bool,
326 : reconfigure_concurrency: usize,
327 : // Feature flags
328 : features: Vec<ComputeFeature>,
329 : // Cluster settings
330 : cluster: Option<Cluster>,
331 :
332 : /// The compute_ctl config for the endpoint's compute.
333 : compute_ctl_config: ComputeCtlConfig,
334 : }
335 :
/// Coarse state of an endpoint, derived by `Endpoint::status` from two observations:
/// whether `postmaster.pid` exists in the data directory, and whether the Postgres
/// port accepts a TCP connection.
#[derive(Eq, PartialEq)]
pub enum EndpointStatus {
    /// Pidfile present and the port accepts connections.
    Running,
    /// No pidfile and the port does not accept connections.
    Stopped,
    /// Pidfile present but the port does not accept connections.
    Crashed,
    /// The port accepts connections although there is no pidfile.
    RunningNoPidfile,
}
343 :
344 : impl Display for EndpointStatus {
345 0 : fn fmt(&self, writer: &mut std::fmt::Formatter) -> std::fmt::Result {
346 0 : writer.write_str(match self {
347 0 : Self::Running => "running",
348 0 : Self::Stopped => "stopped",
349 0 : Self::Crashed => "crashed",
350 0 : Self::RunningNoPidfile => "running, no pidfile",
351 : })
352 0 : }
353 : }
354 :
355 : #[derive(Default, Clone, Copy, clap::ValueEnum)]
356 : pub enum EndpointTerminateMode {
357 : #[default]
358 : /// Use pg_ctl stop -m fast
359 : Fast,
360 : /// Use pg_ctl stop -m immediate
361 : Immediate,
362 : /// Use /terminate?mode=immediate
363 : ImmediateTerminate,
364 : }
365 :
366 : impl std::fmt::Display for EndpointTerminateMode {
367 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
368 0 : f.write_str(match &self {
369 0 : EndpointTerminateMode::Fast => "fast",
370 0 : EndpointTerminateMode::Immediate => "immediate",
371 0 : EndpointTerminateMode::ImmediateTerminate => "immediate-terminate",
372 : })
373 0 : }
374 : }
375 :
376 : pub struct EndpointStartArgs {
377 : pub auth_token: Option<String>,
378 : pub endpoint_storage_token: String,
379 : pub endpoint_storage_addr: String,
380 : pub safekeepers_generation: Option<SafekeeperGeneration>,
381 : pub safekeepers: Vec<NodeId>,
382 : pub pageservers: Vec<(PageserverProtocol, Host, u16)>,
383 : pub remote_ext_base_url: Option<String>,
384 : pub shard_stripe_size: usize,
385 : pub create_test_user: bool,
386 : pub start_timeout: Duration,
387 : pub autoprewarm: bool,
388 : pub offload_lfc_interval_seconds: Option<std::num::NonZeroU64>,
389 : pub dev: bool,
390 : }
391 :
392 : impl Endpoint {
393 0 : fn from_dir_entry(entry: std::fs::DirEntry, env: &LocalEnv) -> Result<Endpoint> {
394 0 : if !entry.file_type()?.is_dir() {
395 0 : anyhow::bail!(
396 0 : "Endpoint::from_dir_entry failed: '{}' is not a directory",
397 0 : entry.path().display()
398 : );
399 0 : }
400 :
401 : // parse data directory name
402 0 : let fname = entry.file_name();
403 0 : let endpoint_id = fname.to_str().unwrap().to_string();
404 :
405 : // Read the endpoint.json file
406 0 : let conf: EndpointConf =
407 0 : serde_json::from_slice(&std::fs::read(entry.path().join("endpoint.json"))?)?;
408 :
409 0 : debug!("serialized endpoint conf: {:?}", conf);
410 :
411 0 : Ok(Endpoint {
412 0 : pg_address: SocketAddr::new(IpAddr::from(Ipv4Addr::LOCALHOST), conf.pg_port),
413 0 : external_http_address: SocketAddr::new(
414 0 : IpAddr::from(Ipv4Addr::UNSPECIFIED),
415 0 : conf.external_http_port,
416 0 : ),
417 0 : internal_http_address: SocketAddr::new(
418 0 : IpAddr::from(Ipv4Addr::LOCALHOST),
419 0 : conf.internal_http_port,
420 0 : ),
421 0 : endpoint_id,
422 0 : env: env.clone(),
423 0 : timeline_id: conf.timeline_id,
424 0 : mode: conf.mode,
425 0 : tenant_id: conf.tenant_id,
426 0 : pg_version: conf.pg_version,
427 0 : grpc: conf.grpc,
428 0 : skip_pg_catalog_updates: conf.skip_pg_catalog_updates,
429 0 : reconfigure_concurrency: conf.reconfigure_concurrency,
430 0 : drop_subscriptions_before_start: conf.drop_subscriptions_before_start,
431 0 : features: conf.features,
432 0 : cluster: conf.cluster,
433 0 : compute_ctl_config: conf.compute_ctl_config,
434 0 : })
435 0 : }
436 :
437 0 : fn create_endpoint_dir(&self) -> Result<()> {
438 0 : std::fs::create_dir_all(self.endpoint_path()).with_context(|| {
439 0 : format!(
440 0 : "could not create endpoint directory {}",
441 0 : self.endpoint_path().display()
442 : )
443 0 : })
444 0 : }
445 :
446 : // Generate postgresql.conf with default configuration
447 0 : fn setup_pg_conf(&self) -> Result<PostgresConf> {
448 0 : let mut conf = PostgresConf::new();
449 0 : conf.append("max_wal_senders", "10");
450 0 : conf.append("wal_log_hints", "off");
451 0 : conf.append("max_replication_slots", "10");
452 0 : conf.append("hot_standby", "on");
453 : // Set to 1MB to both exercise getPage requests/LFC, and still have enough room for
454 : // Postgres to operate. Everything smaller might be not enough for Postgres under load,
455 : // and can cause errors like 'no unpinned buffers available', see
456 : // <https://github.com/neondatabase/neon/issues/9956>
457 0 : conf.append("shared_buffers", "1MB");
458 : // Postgres defaults to effective_io_concurrency=1, which does not exercise the pageserver's
459 : // batching logic. Set this to 2 so that we exercise the code a bit without letting
460 : // individual tests do a lot of concurrent work on underpowered test machines
461 0 : conf.append("effective_io_concurrency", "2");
462 0 : conf.append("fsync", "off");
463 0 : conf.append("max_connections", "100");
464 0 : conf.append("wal_level", "logical");
465 : // wal_sender_timeout is the maximum time to wait for WAL replication.
466 : // It also defines how often the walreciever will send a feedback message to the wal sender.
467 0 : conf.append("wal_sender_timeout", "5s");
468 0 : conf.append("listen_addresses", &self.pg_address.ip().to_string());
469 0 : conf.append("port", &self.pg_address.port().to_string());
470 0 : conf.append("wal_keep_size", "0");
471 : // walproposer panics when basebackup is invalid, it is pointless to restart in this case.
472 0 : conf.append("restart_after_crash", "off");
473 :
474 : // Load the 'neon' extension
475 0 : conf.append("shared_preload_libraries", "neon");
476 :
477 0 : conf.append_line("");
478 : // Replication-related configurations, such as WAL sending
479 0 : match &self.mode {
480 : ComputeMode::Primary => {
481 : // Configure backpressure
482 : // - Replication write lag depends on how fast the walreceiver can process incoming WAL.
483 : // This lag determines latency of get_page_at_lsn. Speed of applying WAL is about 10MB/sec,
484 : // so to avoid expiration of 1 minute timeout, this lag should not be larger than 600MB.
485 : // Actually latency should be much smaller (better if < 1sec). But we assume that recently
486 : // updates pages are not requested from pageserver.
487 : // - Replication flush lag depends on speed of persisting data by checkpointer (creation of
488 : // delta/image layers) and advancing disk_consistent_lsn. Safekeepers are able to
489 : // remove/archive WAL only beyond disk_consistent_lsn. Too large a lag can cause long
490 : // recovery time (in case of pageserver crash) and disk space overflow at safekeepers.
491 : // - Replication apply lag depends on speed of uploading changes to S3 by uploader thread.
492 : // To be able to restore database in case of pageserver node crash, safekeeper should not
493 : // remove WAL beyond this point. Too large lag can cause space exhaustion in safekeepers
494 : // (if they are not able to upload WAL to S3).
495 0 : conf.append("max_replication_write_lag", "15MB");
496 0 : conf.append("max_replication_flush_lag", "10GB");
497 :
498 0 : if !self.env.safekeepers.is_empty() {
499 : // Configure Postgres to connect to the safekeepers
500 0 : conf.append("synchronous_standby_names", "walproposer");
501 :
502 0 : let safekeepers = self
503 0 : .env
504 0 : .safekeepers
505 0 : .iter()
506 0 : .map(|sk| format!("localhost:{}", sk.get_compute_port()))
507 0 : .collect::<Vec<String>>()
508 0 : .join(",");
509 0 : conf.append("neon.safekeepers", &safekeepers);
510 0 : } else {
511 0 : // We only use setup without safekeepers for tests,
512 0 : // and don't care about data durability on pageserver,
513 0 : // so set more relaxed synchronous_commit.
514 0 : conf.append("synchronous_commit", "remote_write");
515 0 :
516 0 : // Configure the node to stream WAL directly to the pageserver
517 0 : // This isn't really a supported configuration, but can be useful for
518 0 : // testing.
519 0 : conf.append("synchronous_standby_names", "pageserver");
520 0 : }
521 : }
522 0 : ComputeMode::Static(lsn) => {
523 0 : conf.append("recovery_target_lsn", &lsn.to_string());
524 0 : }
525 : ComputeMode::Replica => {
526 0 : assert!(!self.env.safekeepers.is_empty());
527 :
528 : // TODO: use future host field from safekeeper spec
529 : // Pass the list of safekeepers to the replica so that it can connect to any of them,
530 : // whichever is available.
531 0 : let sk_ports = self
532 0 : .env
533 0 : .safekeepers
534 0 : .iter()
535 0 : .map(|x| x.get_compute_port().to_string())
536 0 : .collect::<Vec<_>>()
537 0 : .join(",");
538 0 : let sk_hosts = vec!["localhost"; self.env.safekeepers.len()].join(",");
539 :
540 0 : let connstr = format!(
541 0 : "host={} port={} options='-c timeline_id={} tenant_id={}' application_name=replica replication=true",
542 : sk_hosts,
543 : sk_ports,
544 0 : &self.timeline_id.to_string(),
545 0 : &self.tenant_id.to_string(),
546 : );
547 :
548 0 : let slot_name = format!("repl_{}_", self.timeline_id);
549 0 : conf.append("primary_conninfo", connstr.as_str());
550 0 : conf.append("primary_slot_name", slot_name.as_str());
551 0 : conf.append("hot_standby", "on");
552 : // prefetching of blocks referenced in WAL doesn't make sense for us
553 : // Neon hot standby ignores pages that are not in the shared_buffers
554 0 : if self.pg_version >= PgMajorVersion::PG15 {
555 0 : conf.append("recovery_prefetch", "off");
556 0 : }
557 : }
558 : }
559 :
560 0 : Ok(conf)
561 0 : }
562 :
563 0 : pub fn endpoint_path(&self) -> PathBuf {
564 0 : self.env.endpoints_path().join(&self.endpoint_id)
565 0 : }
566 :
567 0 : pub fn pgdata(&self) -> PathBuf {
568 0 : self.endpoint_path().join("pgdata")
569 0 : }
570 :
571 0 : pub fn status(&self) -> EndpointStatus {
572 0 : let timeout = Duration::from_millis(300);
573 0 : let has_pidfile = self.pgdata().join("postmaster.pid").exists();
574 0 : let can_connect = TcpStream::connect_timeout(&self.pg_address, timeout).is_ok();
575 :
576 0 : match (has_pidfile, can_connect) {
577 0 : (true, true) => EndpointStatus::Running,
578 0 : (false, false) => EndpointStatus::Stopped,
579 0 : (true, false) => EndpointStatus::Crashed,
580 0 : (false, true) => EndpointStatus::RunningNoPidfile,
581 : }
582 0 : }
583 :
584 0 : fn pg_ctl(&self, args: &[&str], auth_token: &Option<String>) -> Result<()> {
585 0 : let pg_ctl_path = self.env.pg_bin_dir(self.pg_version)?.join("pg_ctl");
586 0 : let mut cmd = Command::new(&pg_ctl_path);
587 0 : cmd.args(
588 0 : [
589 0 : &[
590 0 : "-D",
591 0 : self.pgdata().to_str().unwrap(),
592 0 : "-w", //wait till pg_ctl actually does what was asked
593 0 : ],
594 0 : args,
595 0 : ]
596 0 : .concat(),
597 0 : )
598 0 : .env_clear()
599 0 : .env(
600 : "LD_LIBRARY_PATH",
601 0 : self.env.pg_lib_dir(self.pg_version)?.to_str().unwrap(),
602 : )
603 0 : .env(
604 : "DYLD_LIBRARY_PATH",
605 0 : self.env.pg_lib_dir(self.pg_version)?.to_str().unwrap(),
606 : );
607 :
608 : // Pass authentication token used for the connections to pageserver and safekeepers
609 0 : if let Some(token) = auth_token {
610 0 : cmd.env("NEON_AUTH_TOKEN", token);
611 0 : }
612 :
613 0 : let pg_ctl = cmd
614 0 : .output()
615 0 : .context(format!("{} failed", pg_ctl_path.display()))?;
616 0 : if !pg_ctl.status.success() {
617 0 : anyhow::bail!(
618 0 : "pg_ctl failed, exit code: {}, stdout: {}, stderr: {}",
619 : pg_ctl.status,
620 0 : String::from_utf8_lossy(&pg_ctl.stdout),
621 0 : String::from_utf8_lossy(&pg_ctl.stderr),
622 : );
623 0 : }
624 :
625 0 : Ok(())
626 0 : }
627 :
628 0 : fn wait_for_compute_ctl_to_exit(&self, send_sigterm: bool) -> Result<()> {
629 : // TODO use background_process::stop_process instead: https://github.com/neondatabase/neon/pull/6482
630 0 : let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
631 0 : let pid: u32 = std::fs::read_to_string(pidfile_path)?.parse()?;
632 0 : let pid = nix::unistd::Pid::from_raw(pid as i32);
633 0 : if send_sigterm {
634 0 : kill(pid, Signal::SIGTERM).ok();
635 0 : }
636 0 : crate::background_process::wait_until_stopped("compute_ctl", pid)?;
637 0 : Ok(())
638 0 : }
639 :
640 0 : fn read_postgresql_conf(&self) -> Result<String> {
641 : // Slurp the endpoints/<endpoint id>/postgresql.conf file into
642 : // memory. We will include it in the spec file that we pass to
643 : // `compute_ctl`, and `compute_ctl` will write it to the postgresql.conf
644 : // in the data directory.
645 0 : let postgresql_conf_path = self.endpoint_path().join("postgresql.conf");
646 0 : match std::fs::read(&postgresql_conf_path) {
647 0 : Ok(content) => Ok(String::from_utf8(content)?),
648 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok("".to_string()),
649 0 : Err(e) => Err(anyhow::Error::new(e).context(format!(
650 0 : "failed to read config file in {}",
651 0 : postgresql_conf_path.to_str().unwrap()
652 0 : ))),
653 : }
654 0 : }
655 :
656 0 : fn build_pageserver_connstr(pageservers: &[(PageserverProtocol, Host, u16)]) -> String {
657 0 : pageservers
658 0 : .iter()
659 0 : .map(|(scheme, host, port)| format!("{scheme}://no_user@{host}:{port}"))
660 0 : .collect::<Vec<_>>()
661 0 : .join(",")
662 0 : }
663 :
664 : /// Map safekeepers ids to the actual connection strings.
665 0 : fn build_safekeepers_connstrs(&self, sk_ids: Vec<NodeId>) -> Result<Vec<String>> {
666 0 : let mut safekeeper_connstrings = Vec::new();
667 0 : if self.mode == ComputeMode::Primary {
668 0 : for sk_id in sk_ids {
669 0 : let sk = self
670 0 : .env
671 0 : .safekeepers
672 0 : .iter()
673 0 : .find(|node| node.id == sk_id)
674 0 : .ok_or_else(|| anyhow!("safekeeper {sk_id} does not exist"))?;
675 0 : safekeeper_connstrings.push(format!("127.0.0.1:{}", sk.get_compute_port()));
676 : }
677 0 : }
678 0 : Ok(safekeeper_connstrings)
679 0 : }
680 :
681 : /// Generate a JWT with the correct claims.
682 0 : pub fn generate_jwt(&self, scope: Option<ComputeClaimsScope>) -> Result<String> {
683 0 : self.env.generate_auth_token(&ComputeClaims {
684 0 : audience: match scope {
685 0 : Some(ComputeClaimsScope::Admin) => Some(vec![COMPUTE_AUDIENCE.to_owned()]),
686 0 : _ => None,
687 : },
688 0 : compute_id: match scope {
689 0 : Some(ComputeClaimsScope::Admin) => None,
690 0 : _ => Some(self.endpoint_id.clone()),
691 : },
692 0 : scope,
693 : })
694 0 : }
695 :
696 0 : pub async fn start(&self, args: EndpointStartArgs) -> Result<()> {
697 0 : if self.status() == EndpointStatus::Running {
698 0 : anyhow::bail!("The endpoint is already running");
699 0 : }
700 :
701 0 : let postgresql_conf = self.read_postgresql_conf()?;
702 :
703 : // We always start the compute node from scratch, so if the Postgres
704 : // data dir exists from a previous launch, remove it first.
705 0 : if self.pgdata().exists() {
706 0 : std::fs::remove_dir_all(self.pgdata())?;
707 0 : }
708 :
709 0 : let pageserver_connstring = Self::build_pageserver_connstr(&args.pageservers);
710 0 : assert!(!pageserver_connstring.is_empty());
711 :
712 0 : let safekeeper_connstrings = self.build_safekeepers_connstrs(args.safekeepers)?;
713 :
714 : // check for file remote_extensions_spec.json
715 : // if it is present, read it and pass to compute_ctl
716 0 : let remote_extensions_spec_path = self.endpoint_path().join("remote_extensions_spec.json");
717 0 : let remote_extensions_spec = std::fs::File::open(remote_extensions_spec_path);
718 : let remote_extensions: Option<RemoteExtSpec>;
719 :
720 0 : if let Ok(spec_file) = remote_extensions_spec {
721 0 : remote_extensions = serde_json::from_reader(spec_file).ok();
722 0 : } else {
723 0 : remote_extensions = None;
724 0 : };
725 :
726 : // Create config file
727 0 : let config = {
728 0 : let mut spec = ComputeSpec {
729 0 : skip_pg_catalog_updates: self.skip_pg_catalog_updates,
730 : format_version: 1.0,
731 0 : operation_uuid: None,
732 0 : features: self.features.clone(),
733 0 : swap_size_bytes: None,
734 0 : disk_quota_bytes: None,
735 0 : disable_lfc_resizing: None,
736 : cluster: Cluster {
737 0 : cluster_id: None, // project ID: not used
738 0 : name: None, // project name: not used
739 0 : state: None,
740 0 : roles: if args.create_test_user {
741 0 : vec![Role {
742 0 : name: PgIdent::from_str("test").unwrap(),
743 0 : encrypted_password: None,
744 0 : options: None,
745 0 : }]
746 : } else {
747 0 : Vec::new()
748 : },
749 0 : databases: if args.create_test_user {
750 0 : vec![Database {
751 0 : name: PgIdent::from_str("neondb").unwrap(),
752 0 : owner: PgIdent::from_str("test").unwrap(),
753 0 : options: None,
754 0 : restrict_conn: false,
755 0 : invalid: false,
756 0 : }]
757 : } else {
758 0 : Vec::new()
759 : },
760 0 : settings: None,
761 0 : postgresql_conf: Some(postgresql_conf.clone()),
762 : },
763 0 : delta_operations: None,
764 0 : tenant_id: Some(self.tenant_id),
765 0 : timeline_id: Some(self.timeline_id),
766 0 : project_id: None,
767 0 : branch_id: None,
768 0 : endpoint_id: Some(self.endpoint_id.clone()),
769 0 : mode: self.mode,
770 0 : pageserver_connstring: Some(pageserver_connstring),
771 0 : safekeepers_generation: args.safekeepers_generation.map(|g| g.into_inner()),
772 0 : safekeeper_connstrings,
773 0 : storage_auth_token: args.auth_token.clone(),
774 0 : remote_extensions,
775 0 : pgbouncer_settings: None,
776 0 : shard_stripe_size: Some(args.shard_stripe_size),
777 0 : local_proxy_config: None,
778 0 : reconfigure_concurrency: self.reconfigure_concurrency,
779 0 : drop_subscriptions_before_start: self.drop_subscriptions_before_start,
780 0 : audit_log_level: ComputeAudit::Disabled,
781 0 : logs_export_host: None::<String>,
782 0 : endpoint_storage_addr: Some(args.endpoint_storage_addr),
783 0 : endpoint_storage_token: Some(args.endpoint_storage_token),
784 0 : autoprewarm: args.autoprewarm,
785 0 : offload_lfc_interval_seconds: args.offload_lfc_interval_seconds,
786 : suspend_timeout_seconds: -1, // Only used in neon_local.
787 : };
788 :
789 : // this strange code is needed to support respec() in tests
790 0 : if self.cluster.is_some() {
791 0 : debug!("Cluster is already set in the endpoint spec, using it");
792 0 : spec.cluster = self.cluster.clone().unwrap();
793 :
794 0 : debug!("spec.cluster {:?}", spec.cluster);
795 :
796 : // fill missing fields again
797 0 : if args.create_test_user {
798 0 : spec.cluster.roles.push(Role {
799 0 : name: PgIdent::from_str("test").unwrap(),
800 0 : encrypted_password: None,
801 0 : options: None,
802 0 : });
803 0 : spec.cluster.databases.push(Database {
804 0 : name: PgIdent::from_str("neondb").unwrap(),
805 0 : owner: PgIdent::from_str("test").unwrap(),
806 0 : options: None,
807 0 : restrict_conn: false,
808 0 : invalid: false,
809 0 : });
810 0 : }
811 0 : spec.cluster.postgresql_conf = Some(postgresql_conf);
812 0 : }
813 :
814 0 : ComputeConfig {
815 0 : spec: Some(spec),
816 0 : compute_ctl_config: self.compute_ctl_config.clone(),
817 0 : }
818 : };
819 :
820 0 : let config_path = self.endpoint_path().join("config.json");
821 0 : std::fs::write(config_path, serde_json::to_string_pretty(&config)?)?;
822 :
823 : // Open log file. We'll redirect the stdout and stderr of `compute_ctl` to it.
824 0 : let logfile = std::fs::OpenOptions::new()
825 0 : .create(true)
826 0 : .append(true)
827 0 : .open(self.endpoint_path().join("compute.log"))?;
828 :
829 : // Launch compute_ctl
830 0 : let conn_str = self.connstr("cloud_admin", "postgres");
831 0 : println!("Starting postgres node at '{conn_str}'");
832 0 : if args.create_test_user {
833 0 : let conn_str = self.connstr("test", "neondb");
834 0 : println!("Also at '{conn_str}'");
835 0 : }
836 0 : let mut cmd = Command::new(self.env.neon_distrib_dir.join("compute_ctl"));
837 0 : cmd.args([
838 0 : "--external-http-port",
839 0 : &self.external_http_address.port().to_string(),
840 0 : ])
841 0 : .args([
842 0 : "--internal-http-port",
843 0 : &self.internal_http_address.port().to_string(),
844 0 : ])
845 0 : .args(["--pgdata", self.pgdata().to_str().unwrap()])
846 0 : .args(["--connstr", &conn_str])
847 0 : .arg("--config")
848 0 : .arg(self.endpoint_path().join("config.json").as_os_str())
849 0 : .args([
850 : "--pgbin",
851 0 : self.env
852 0 : .pg_bin_dir(self.pg_version)?
853 0 : .join("postgres")
854 0 : .to_str()
855 0 : .unwrap(),
856 : ])
857 : // TODO: It would be nice if we generated compute IDs with the same
858 : // algorithm as the real control plane.
859 0 : .args(["--compute-id", &self.endpoint_id])
860 0 : .stdin(std::process::Stdio::null())
861 0 : .stderr(logfile.try_clone()?)
862 0 : .stdout(logfile);
863 :
864 0 : if let Some(remote_ext_base_url) = args.remote_ext_base_url {
865 0 : cmd.args(["--remote-ext-base-url", &remote_ext_base_url]);
866 0 : }
867 :
868 0 : if args.dev {
869 0 : cmd.arg("--dev");
870 0 : }
871 :
872 0 : let child = cmd.spawn()?;
873 : // set up a scopeguard to kill & wait for the child in case we panic or bail below
874 0 : let child = scopeguard::guard(child, |mut child| {
875 0 : println!("SIGKILL & wait the started process");
876 0 : (|| {
877 : // TODO: use another signal that can be caught by the child so it can clean up any children it spawned
878 0 : child.kill().context("SIGKILL child")?;
879 0 : child.wait().context("wait() for child process")?;
880 0 : anyhow::Ok(())
881 : })()
882 0 : .with_context(|| format!("scopeguard kill&wait child {child:?}"))
883 0 : .unwrap();
884 0 : });
885 :
886 : // Write down the pid so we can wait for it when we want to stop
887 : // TODO use background_process::start_process instead: https://github.com/neondatabase/neon/pull/6482
888 0 : let pid = child.id();
889 0 : let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
890 0 : std::fs::write(pidfile_path, pid.to_string())?;
891 :
892 : // Wait for it to start
893 : const ATTEMPT_INTERVAL: Duration = Duration::from_millis(100);
894 0 : let start_at = Instant::now();
895 : loop {
896 0 : match self.get_status().await {
897 0 : Ok(state) => {
898 0 : match state.status {
899 : ComputeStatus::Init => {
900 0 : let timeout = args.start_timeout;
901 0 : if Instant::now().duration_since(start_at) > timeout {
902 0 : bail!(
903 0 : "compute startup timed out {:?}; still in Init state",
904 : timeout
905 : );
906 0 : }
907 : // keep retrying
908 : }
909 : ComputeStatus::Running => {
910 : // All good!
911 0 : break;
912 : }
913 : ComputeStatus::Failed => {
914 0 : bail!(
915 0 : "compute startup failed: {}",
916 0 : state
917 0 : .error
918 0 : .as_deref()
919 0 : .unwrap_or("<no error from compute_ctl>")
920 : );
921 : }
922 : ComputeStatus::Empty
923 : | ComputeStatus::ConfigurationPending
924 : | ComputeStatus::Configuration
925 : | ComputeStatus::TerminationPending { .. }
926 : | ComputeStatus::Terminated => {
927 0 : bail!("unexpected compute status: {:?}", state.status)
928 : }
929 : }
930 : }
931 0 : Err(e) => {
932 0 : if Instant::now().duration_since(start_at) > args.start_timeout {
933 0 : return Err(e).context(format!(
934 0 : "timed out {:?} waiting to connect to compute_ctl HTTP",
935 : args.start_timeout
936 : ));
937 0 : }
938 : }
939 : }
940 0 : tokio::time::sleep(ATTEMPT_INTERVAL).await;
941 : }
942 :
943 : // disarm the scopeguard, let the child outlive this function (and neon_local invocation)
944 0 : drop(scopeguard::ScopeGuard::into_inner(child));
945 :
946 0 : Ok(())
947 0 : }
948 :
949 : // Call the /status HTTP API
950 0 : pub async fn get_status(&self) -> Result<ComputeStatusResponse> {
951 0 : let client = reqwest::Client::new();
952 :
953 0 : let response = client
954 0 : .request(
955 0 : reqwest::Method::GET,
956 0 : format!(
957 0 : "http://{}:{}/status",
958 0 : self.external_http_address.ip(),
959 0 : self.external_http_address.port()
960 : ),
961 : )
962 0 : .bearer_auth(self.generate_jwt(None::<ComputeClaimsScope>)?)
963 0 : .send()
964 0 : .await?;
965 :
966 : // Interpret the response
967 0 : let status = response.status();
968 0 : if !(status.is_client_error() || status.is_server_error()) {
969 0 : Ok(response.json().await?)
970 : } else {
971 : // reqwest does not export its error construction utility functions, so let's craft the message ourselves
972 0 : let url = response.url().to_owned();
973 0 : let msg = match response.text().await {
974 0 : Ok(err_body) => format!("Error: {err_body}"),
975 0 : Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
976 : };
977 0 : Err(anyhow::anyhow!(msg))
978 : }
979 0 : }
980 :
981 0 : pub async fn reconfigure(
982 0 : &self,
983 0 : pageservers: Option<Vec<(PageserverProtocol, Host, u16)>>,
984 0 : stripe_size: Option<ShardStripeSize>,
985 0 : safekeepers: Option<Vec<NodeId>>,
986 0 : safekeeper_generation: Option<SafekeeperGeneration>,
987 0 : ) -> Result<()> {
988 0 : let (mut spec, compute_ctl_config) = {
989 0 : let config_path = self.endpoint_path().join("config.json");
990 0 : let file = std::fs::File::open(config_path)?;
991 0 : let config: ComputeConfig = serde_json::from_reader(file)?;
992 :
993 0 : (config.spec.unwrap(), config.compute_ctl_config)
994 : };
995 :
996 0 : let postgresql_conf = self.read_postgresql_conf()?;
997 0 : spec.cluster.postgresql_conf = Some(postgresql_conf);
998 :
999 : // If pageservers are not specified, don't change them.
1000 0 : if let Some(pageservers) = pageservers {
1001 0 : anyhow::ensure!(!pageservers.is_empty(), "no pageservers provided");
1002 :
1003 0 : let pageserver_connstr = Self::build_pageserver_connstr(&pageservers);
1004 0 : spec.pageserver_connstring = Some(pageserver_connstr);
1005 0 : if stripe_size.is_some() {
1006 0 : spec.shard_stripe_size = stripe_size.map(|s| s.0 as usize);
1007 0 : }
1008 0 : }
1009 :
1010 : // If safekeepers are not specified, don't change them.
1011 0 : if let Some(safekeepers) = safekeepers {
1012 0 : let safekeeper_connstrings = self.build_safekeepers_connstrs(safekeepers)?;
1013 0 : spec.safekeeper_connstrings = safekeeper_connstrings;
1014 0 : if let Some(g) = safekeeper_generation {
1015 0 : spec.safekeepers_generation = Some(g.into_inner());
1016 0 : }
1017 0 : }
1018 :
1019 0 : let client = reqwest::Client::builder()
1020 0 : .timeout(Duration::from_secs(120))
1021 0 : .build()
1022 0 : .unwrap();
1023 0 : let response = client
1024 0 : .post(format!(
1025 0 : "http://{}:{}/configure",
1026 0 : self.external_http_address.ip(),
1027 0 : self.external_http_address.port()
1028 : ))
1029 0 : .header(CONTENT_TYPE.as_str(), "application/json")
1030 0 : .bearer_auth(self.generate_jwt(None::<ComputeClaimsScope>)?)
1031 0 : .body(
1032 0 : serde_json::to_string(&ConfigurationRequest {
1033 0 : spec,
1034 0 : compute_ctl_config,
1035 0 : })
1036 0 : .unwrap(),
1037 : )
1038 0 : .send()
1039 0 : .await?;
1040 :
1041 0 : let status = response.status();
1042 0 : if !(status.is_client_error() || status.is_server_error()) {
1043 0 : Ok(())
1044 : } else {
1045 0 : let url = response.url().to_owned();
1046 0 : let msg = match response.text().await {
1047 0 : Ok(err_body) => format!("Error: {err_body}"),
1048 0 : Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
1049 : };
1050 0 : Err(anyhow::anyhow!(msg))
1051 : }
1052 0 : }
1053 :
1054 0 : pub async fn reconfigure_pageservers(
1055 0 : &self,
1056 0 : pageservers: Vec<(PageserverProtocol, Host, u16)>,
1057 0 : stripe_size: Option<ShardStripeSize>,
1058 0 : ) -> Result<()> {
1059 0 : self.reconfigure(Some(pageservers), stripe_size, None, None)
1060 0 : .await
1061 0 : }
1062 :
1063 0 : pub async fn reconfigure_safekeepers(
1064 0 : &self,
1065 0 : safekeepers: Vec<NodeId>,
1066 0 : generation: SafekeeperGeneration,
1067 0 : ) -> Result<()> {
1068 0 : self.reconfigure(None, None, Some(safekeepers), Some(generation))
1069 0 : .await
1070 0 : }
1071 :
1072 0 : pub async fn stop(
1073 0 : &self,
1074 0 : mode: EndpointTerminateMode,
1075 0 : destroy: bool,
1076 0 : ) -> Result<TerminateResponse> {
1077 : // pg_ctl stop is fast but doesn't allow us to collect LSN. /terminate is
1078 : // slow, and test runs time out. Solution: special mode "immediate-terminate"
1079 : // which uses /terminate
1080 0 : let response = if let EndpointTerminateMode::ImmediateTerminate = mode {
1081 0 : let ip = self.external_http_address.ip();
1082 0 : let port = self.external_http_address.port();
1083 0 : let url = format!("http://{ip}:{port}/terminate?mode=immediate");
1084 0 : let token = self.generate_jwt(Some(ComputeClaimsScope::Admin))?;
1085 0 : let request = reqwest::Client::new().post(url).bearer_auth(token);
1086 0 : let response = request.send().await.context("/terminate")?;
1087 0 : let text = response.text().await.context("/terminate result")?;
1088 0 : serde_json::from_str(&text).with_context(|| format!("deserializing {text}"))?
1089 : } else {
1090 0 : self.pg_ctl(&["-m", &mode.to_string(), "stop"], &None)?;
1091 0 : TerminateResponse { lsn: None }
1092 : };
1093 :
1094 : // Also wait for the compute_ctl process to die. It might have some
1095 : // cleanup work to do after postgres stops, like syncing safekeepers,
1096 : // etc.
1097 : //
1098 : // If destroying or stop mode is immediate, send it SIGTERM before
1099 : // waiting. Sometimes we do *not* want this cleanup: tests intentionally
1100 : // do stop when majority of safekeepers is down, so sync-safekeepers
1101 : // would hang otherwise. This could be a separate flag though.
1102 0 : let send_sigterm = destroy || !matches!(mode, EndpointTerminateMode::Fast);
1103 0 : self.wait_for_compute_ctl_to_exit(send_sigterm)?;
1104 0 : if destroy {
1105 0 : println!(
1106 0 : "Destroying postgres data directory '{}'",
1107 0 : self.pgdata().to_str().unwrap()
1108 : );
1109 0 : std::fs::remove_dir_all(self.endpoint_path())?;
1110 0 : }
1111 0 : Ok(response)
1112 0 : }
1113 :
1114 0 : pub fn connstr(&self, user: &str, db_name: &str) -> String {
1115 0 : format!(
1116 0 : "postgresql://{}@{}:{}/{}",
1117 : user,
1118 0 : self.pg_address.ip(),
1119 0 : self.pg_address.port(),
1120 : db_name
1121 : )
1122 0 : }
1123 : }
|