LCOV - code coverage report
Current view: top level - control_plane/src - endpoint.rs (source / functions) Coverage Total Hit
Test: a1cc1f33dc9899e4da66eb51e44e911a4b3bd648.info Lines: 0.0 % 833 0
Test Date: 2025-07-31 11:35:14 Functions: 0.0 % 59 0

            Line data    Source code
       1              : //! Code to manage compute endpoints
       2              : //!
       3              : //! In the local test environment, the data for each endpoint is stored in
       4              : //!
       5              : //! ```text
       6              : //!   .neon/endpoints/<endpoint id>
       7              : //! ```
       8              : //!
       9              : //! Some basic information about the endpoint, like the tenant and timeline IDs,
      10              : //! are stored in the `endpoint.json` file. The `endpoint.json` file is created
      11              : //! when the endpoint is created, and doesn't change afterwards.
      12              : //!
      13              : //! The endpoint is managed by the `compute_ctl` binary. When an endpoint is
      14              : //! started, we launch `compute_ctl` It synchronizes the safekeepers, downloads
      15              : //! the basebackup from the pageserver to initialize the data directory, and
      16              : //! finally launches the PostgreSQL process. It watches the PostgreSQL process
      17              : //! until it exits.
      18              : //!
      19              : //! When an endpoint is created, a `postgresql.conf` file is also created in
      20              : //! the endpoint's directory. The file can be modified before starting PostgreSQL.
      21              : //! However, the `postgresql.conf` file in the endpoint directory is not used directly
      22              : //! by PostgreSQL. It is passed to `compute_ctl`, and `compute_ctl` writes another
      23              : //! copy of it in the data directory.
      24              : //!
      25              : //! Directory contents:
      26              : //!
      27              : //! ```text
      28              : //! .neon/endpoints/main/
      29              : //!     compute.log               - log output of `compute_ctl` and `postgres`
      30              : //!     endpoint.json             - serialized `EndpointConf` struct
      31              : //!     postgresql.conf           - postgresql settings
      32              : //!     config.json                 - passed to `compute_ctl`
      33              : //!     pgdata/
      34              : //!         postgresql.conf       - copy of postgresql.conf created by `compute_ctl`
      35              : //!         neon.signal
      36              : //!         zenith.signal         - copy of neon.signal, for backward compatibility
      37              : //!         <other PostgreSQL files>
      38              : //! ```
      39              : //!
      40              : use anyhow::{Context, Result, anyhow, bail};
      41              : use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD};
      42              : use compute_api::requests::{
      43              :     COMPUTE_AUDIENCE, ComputeClaims, ComputeClaimsScope, ConfigurationRequest,
      44              : };
      45              : use compute_api::responses::{
      46              :     ComputeConfig, ComputeCtlConfig, ComputeStatus, ComputeStatusResponse, TerminateResponse,
      47              :     TlsConfig,
      48              : };
      49              : use compute_api::spec::{
      50              :     Cluster, ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, Database, PageserverProtocol,
      51              :     PageserverShardInfo, PgIdent, RemoteExtSpec, Role,
      52              : };
      53              : 
      54              : // re-export these, because they're used in the reconfigure() function
      55              : pub use compute_api::spec::{PageserverConnectionInfo, PageserverShardConnectionInfo};
      56              : 
      57              : use jsonwebtoken::jwk::{
      58              :     AlgorithmParameters, CommonParameters, EllipticCurve, Jwk, JwkSet, KeyAlgorithm, KeyOperations,
      59              :     OctetKeyPairParameters, OctetKeyPairType, PublicKeyUse, RSAKeyParameters, RSAKeyType,
      60              : };
      61              : use nix::sys::signal::{Signal, kill};
      62              : use pem::Pem;
      63              : use reqwest::header::CONTENT_TYPE;
      64              : use rsa::{RsaPublicKey, pkcs1::DecodeRsaPublicKey, traits::PublicKeyParts};
      65              : use safekeeper_api::PgMajorVersion;
      66              : use safekeeper_api::membership::SafekeeperGeneration;
      67              : use serde::{Deserialize, Serialize};
      68              : use sha2::{Digest, Sha256};
      69              : use spki::der::Decode;
      70              : use spki::{SubjectPublicKeyInfo, SubjectPublicKeyInfoRef};
      71              : use std::collections::{BTreeMap, HashMap};
      72              : use std::fmt::Display;
      73              : use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream};
      74              : use std::path::PathBuf;
      75              : use std::process::Command;
      76              : use std::str::FromStr;
      77              : use std::sync::Arc;
      78              : use std::time::{Duration, Instant};
      79              : use tracing::debug;
      80              : use utils::id::{NodeId, TenantId, TimelineId};
      81              : use utils::shard::{ShardCount, ShardIndex, ShardNumber};
      82              : use x509_parser::parse_x509_certificate;
      83              : 
      84              : use pageserver_api::config::DEFAULT_GRPC_LISTEN_PORT as DEFAULT_PAGESERVER_GRPC_PORT;
      85              : use postgres_connection::parse_host_port;
      86              : 
      87              : use crate::local_env::LocalEnv;
      88              : use crate::postgresql_conf::PostgresConf;
      89              : 
      90              : // contents of a endpoint.json file
      91            0 : #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
      92              : pub struct EndpointConf {
      93              :     endpoint_id: String,
      94              :     tenant_id: TenantId,
      95              :     timeline_id: TimelineId,
      96              :     mode: ComputeMode,
      97              :     pg_port: u16,
      98              :     external_http_port: u16,
      99              :     internal_http_port: u16,
     100              :     pg_version: PgMajorVersion,
     101              :     grpc: bool,
     102              :     skip_pg_catalog_updates: bool,
     103              :     reconfigure_concurrency: usize,
     104              :     drop_subscriptions_before_start: bool,
     105              :     features: Vec<ComputeFeature>,
     106              :     cluster: Option<Cluster>,
     107              :     compute_ctl_config: ComputeCtlConfig,
     108              :     privileged_role_name: Option<String>,
     109              : }
     110              : 
     111              : //
     112              : // ComputeControlPlane
     113              : //
     114              : pub struct ComputeControlPlane {
     115              :     base_port: u16,
     116              : 
     117              :     // endpoint ID is the key
     118              :     pub endpoints: BTreeMap<String, Arc<Endpoint>>,
     119              : 
     120              :     env: LocalEnv,
     121              : }
     122              : 
     123              : impl ComputeControlPlane {
     124              :     // Load current endpoints from the endpoints/ subdirectories
     125            0 :     pub fn load(env: LocalEnv) -> Result<ComputeControlPlane> {
     126            0 :         let mut endpoints = BTreeMap::default();
     127            0 :         for endpoint_dir in std::fs::read_dir(env.endpoints_path())
     128            0 :             .with_context(|| format!("failed to list {}", env.endpoints_path().display()))?
     129              :         {
     130            0 :             let ep_res = Endpoint::from_dir_entry(endpoint_dir?, &env);
     131            0 :             let ep = match ep_res {
     132            0 :                 Ok(ep) => ep,
     133            0 :                 Err(e) => match e.downcast::<std::io::Error>() {
     134            0 :                     Ok(e) => {
     135              :                         // A parallel task could delete an endpoint while we have just scanned the directory
     136            0 :                         if e.kind() == std::io::ErrorKind::NotFound {
     137            0 :                             continue;
     138              :                         } else {
     139            0 :                             Err(e)?
     140              :                         }
     141              :                     }
     142            0 :                     Err(e) => Err(e)?,
     143              :                 },
     144              :             };
     145            0 :             endpoints.insert(ep.endpoint_id.clone(), Arc::new(ep));
     146              :         }
     147              : 
     148            0 :         Ok(ComputeControlPlane {
     149            0 :             base_port: 55431,
     150            0 :             endpoints,
     151            0 :             env,
     152            0 :         })
     153            0 :     }
     154              : 
     155            0 :     fn get_port(&mut self) -> u16 {
     156            0 :         1 + self
     157            0 :             .endpoints
     158            0 :             .values()
     159            0 :             .map(|ep| std::cmp::max(ep.pg_address.port(), ep.external_http_address.port()))
     160            0 :             .max()
     161            0 :             .unwrap_or(self.base_port)
     162            0 :     }
     163              : 
     164              :     // BEGIN HADRON
     165              : 
     166              :     /// Extract SubjectPublicKeyInfo from a PEM that can be either a X509 certificate or a public key
     167            0 :     fn extract_spki_from_pem(pem: &Pem) -> Result<Vec<u8>> {
     168            0 :         if pem.tag() == "CERTIFICATE" {
     169              :             // Handle X509 certificate
     170            0 :             let (_, cert) = parse_x509_certificate(pem.contents())?;
     171            0 :             let public_key = cert.public_key();
     172            0 :             Ok(public_key.subject_public_key.data.to_vec())
     173              :         } else {
     174              :             // Handle public key directly
     175            0 :             let spki: SubjectPublicKeyInfoRef = SubjectPublicKeyInfo::from_der(pem.contents())?;
     176            0 :             Ok(spki.subject_public_key.raw_bytes().to_vec())
     177              :         }
     178            0 :     }
     179              : 
     180              :     /// Create RSA JWK from certificate PEM
     181            0 :     fn create_rsa_jwk_from_cert(pem: &Pem, key_hash: &[u8]) -> Result<Jwk> {
     182            0 :         let public_key = Self::extract_spki_from_pem(pem)?;
     183              : 
     184              :         // Extract RSA parameters (n, e) from RSA public key DER data
     185            0 :         let rsa_key = RsaPublicKey::from_pkcs1_der(&public_key)?;
     186            0 :         let n = rsa_key.n().to_bytes_be();
     187            0 :         let e = rsa_key.e().to_bytes_be();
     188              : 
     189            0 :         Ok(Jwk {
     190            0 :             common: CommonParameters {
     191            0 :                 public_key_use: Some(PublicKeyUse::Signature),
     192            0 :                 key_operations: Some(vec![KeyOperations::Verify]),
     193            0 :                 key_algorithm: Some(KeyAlgorithm::RS256),
     194            0 :                 key_id: Some(URL_SAFE_NO_PAD.encode(key_hash)),
     195            0 :                 x509_url: None::<String>,
     196            0 :                 x509_chain: None::<Vec<String>>,
     197            0 :                 x509_sha1_fingerprint: None::<String>,
     198            0 :                 x509_sha256_fingerprint: None::<String>,
     199            0 :             },
     200            0 :             algorithm: AlgorithmParameters::RSA(RSAKeyParameters {
     201            0 :                 key_type: RSAKeyType::RSA,
     202            0 :                 n: URL_SAFE_NO_PAD.encode(n),
     203            0 :                 e: URL_SAFE_NO_PAD.encode(e),
     204            0 :             }),
     205            0 :         })
     206            0 :     }
     207              : 
     208              :     // END HADRON
     209              : 
     210              :     /// Create a JSON Web Key Set. This ideally matches the way we create a JWKS
     211              :     /// from the production control plane.
     212            0 :     fn create_jwks_from_pem(pem: &Pem) -> Result<JwkSet> {
     213            0 :         let public_key = Self::extract_spki_from_pem(pem)?;
     214              : 
     215            0 :         let mut hasher = Sha256::new();
     216            0 :         hasher.update(&public_key);
     217            0 :         let key_hash = hasher.finalize();
     218              : 
     219              :         // BEGIN HADRON
     220            0 :         if pem.tag() == "CERTIFICATE" {
     221              :             // Assume RSA if we are parsing keys from a certificate.
     222            0 :             let jwk = Self::create_rsa_jwk_from_cert(pem, &key_hash)?;
     223            0 :             return Ok(JwkSet { keys: vec![jwk] });
     224            0 :         }
     225              :         // END HADRON
     226              : 
     227            0 :         Ok(JwkSet {
     228            0 :             keys: vec![Jwk {
     229            0 :                 common: CommonParameters {
     230            0 :                     public_key_use: Some(PublicKeyUse::Signature),
     231            0 :                     key_operations: Some(vec![KeyOperations::Verify]),
     232            0 :                     key_algorithm: Some(KeyAlgorithm::EdDSA),
     233            0 :                     key_id: Some(URL_SAFE_NO_PAD.encode(key_hash)),
     234            0 :                     x509_url: None::<String>,
     235            0 :                     x509_chain: None::<Vec<String>>,
     236            0 :                     x509_sha1_fingerprint: None::<String>,
     237            0 :                     x509_sha256_fingerprint: None::<String>,
     238            0 :                 },
     239            0 :                 algorithm: AlgorithmParameters::OctetKeyPair(OctetKeyPairParameters {
     240            0 :                     key_type: OctetKeyPairType::OctetKeyPair,
     241            0 :                     curve: EllipticCurve::Ed25519,
     242            0 :                     x: URL_SAFE_NO_PAD.encode(public_key),
     243            0 :                 }),
     244            0 :             }],
     245            0 :         })
     246            0 :     }
     247              : 
     248              :     #[allow(clippy::too_many_arguments)]
     249            0 :     pub fn new_endpoint(
     250            0 :         &mut self,
     251            0 :         endpoint_id: &str,
     252            0 :         tenant_id: TenantId,
     253            0 :         timeline_id: TimelineId,
     254            0 :         pg_port: Option<u16>,
     255            0 :         external_http_port: Option<u16>,
     256            0 :         internal_http_port: Option<u16>,
     257            0 :         pg_version: PgMajorVersion,
     258            0 :         mode: ComputeMode,
     259            0 :         grpc: bool,
     260            0 :         skip_pg_catalog_updates: bool,
     261            0 :         drop_subscriptions_before_start: bool,
     262            0 :         privileged_role_name: Option<String>,
     263            0 :     ) -> Result<Arc<Endpoint>> {
     264            0 :         let pg_port = pg_port.unwrap_or_else(|| self.get_port());
     265            0 :         let external_http_port = external_http_port.unwrap_or_else(|| self.get_port() + 1);
     266            0 :         let internal_http_port = internal_http_port.unwrap_or_else(|| external_http_port + 1);
     267            0 :         let compute_ctl_config = ComputeCtlConfig {
     268            0 :             jwks: Self::create_jwks_from_pem(&self.env.read_public_key()?)?,
     269            0 :             tls: None::<TlsConfig>,
     270              :         };
     271            0 :         let ep = Arc::new(Endpoint {
     272            0 :             endpoint_id: endpoint_id.to_owned(),
     273            0 :             pg_address: SocketAddr::new(IpAddr::from(Ipv4Addr::LOCALHOST), pg_port),
     274            0 :             external_http_address: SocketAddr::new(
     275            0 :                 IpAddr::from(Ipv4Addr::UNSPECIFIED),
     276            0 :                 external_http_port,
     277            0 :             ),
     278            0 :             internal_http_address: SocketAddr::new(
     279            0 :                 IpAddr::from(Ipv4Addr::LOCALHOST),
     280            0 :                 internal_http_port,
     281            0 :             ),
     282            0 :             env: self.env.clone(),
     283            0 :             timeline_id,
     284            0 :             mode,
     285            0 :             tenant_id,
     286            0 :             pg_version,
     287            0 :             // We don't setup roles and databases in the spec locally, so we don't need to
     288            0 :             // do catalog updates. Catalog updates also include check availability
     289            0 :             // data creation. Yet, we have tests that check that size and db dump
     290            0 :             // before and after start are the same. So, skip catalog updates,
     291            0 :             // with this we basically test a case of waking up an idle compute, where
     292            0 :             // we also skip catalog updates in the cloud.
     293            0 :             skip_pg_catalog_updates,
     294            0 :             drop_subscriptions_before_start,
     295            0 :             grpc,
     296            0 :             reconfigure_concurrency: 1,
     297            0 :             features: vec![],
     298            0 :             cluster: None,
     299            0 :             compute_ctl_config: compute_ctl_config.clone(),
     300            0 :             privileged_role_name: privileged_role_name.clone(),
     301            0 :         });
     302              : 
     303            0 :         ep.create_endpoint_dir()?;
     304            0 :         std::fs::write(
     305            0 :             ep.endpoint_path().join("endpoint.json"),
     306            0 :             serde_json::to_string_pretty(&EndpointConf {
     307            0 :                 endpoint_id: endpoint_id.to_string(),
     308            0 :                 tenant_id,
     309            0 :                 timeline_id,
     310            0 :                 mode,
     311            0 :                 external_http_port,
     312            0 :                 internal_http_port,
     313            0 :                 pg_port,
     314            0 :                 pg_version,
     315            0 :                 grpc,
     316            0 :                 skip_pg_catalog_updates,
     317            0 :                 drop_subscriptions_before_start,
     318            0 :                 reconfigure_concurrency: 1,
     319            0 :                 features: vec![],
     320            0 :                 cluster: None,
     321            0 :                 compute_ctl_config,
     322            0 :                 privileged_role_name,
     323            0 :             })?,
     324            0 :         )?;
     325            0 :         std::fs::write(
     326            0 :             ep.endpoint_path().join("postgresql.conf"),
     327            0 :             ep.setup_pg_conf()?.to_string(),
     328            0 :         )?;
     329              : 
     330            0 :         self.endpoints
     331            0 :             .insert(ep.endpoint_id.clone(), Arc::clone(&ep));
     332              : 
     333            0 :         Ok(ep)
     334            0 :     }
     335              : 
     336            0 :     pub fn check_conflicting_endpoints(
     337            0 :         &self,
     338            0 :         mode: ComputeMode,
     339            0 :         tenant_id: TenantId,
     340            0 :         timeline_id: TimelineId,
     341            0 :     ) -> Result<()> {
     342            0 :         if matches!(mode, ComputeMode::Primary) {
     343              :             // this check is not complete, as you could have a concurrent attempt at
     344              :             // creating another primary, both reading the state before checking it here,
     345              :             // but it's better than nothing.
     346            0 :             let mut duplicates = self.endpoints.iter().filter(|(_k, v)| {
     347            0 :                 v.tenant_id == tenant_id
     348            0 :                     && v.timeline_id == timeline_id
     349            0 :                     && v.mode == mode
     350            0 :                     && v.status() != EndpointStatus::Stopped
     351            0 :             });
     352              : 
     353            0 :             if let Some((key, _)) = duplicates.next() {
     354            0 :                 bail!(
     355            0 :                     "attempting to create a duplicate primary endpoint on tenant {tenant_id}, timeline {timeline_id}: endpoint {key:?} exists already. please don't do this, it is not supported."
     356              :                 );
     357            0 :             }
     358            0 :         }
     359            0 :         Ok(())
     360            0 :     }
     361              : }
     362              : 
     363              : ///////////////////////////////////////////////////////////////////////////////
     364              : 
     365              : pub struct Endpoint {
     366              :     /// used as the directory name
     367              :     endpoint_id: String,
     368              :     pub tenant_id: TenantId,
     369              :     pub timeline_id: TimelineId,
     370              :     pub mode: ComputeMode,
     371              :     /// If true, the endpoint should use gRPC to communicate with Pageservers.
     372              :     pub grpc: bool,
     373              : 
     374              :     // port and address of the Postgres server and `compute_ctl`'s HTTP APIs
     375              :     pub pg_address: SocketAddr,
     376              :     pub external_http_address: SocketAddr,
     377              :     pub internal_http_address: SocketAddr,
     378              : 
     379              :     // postgres major version in the format: 14, 15, etc.
     380              :     pg_version: PgMajorVersion,
     381              : 
     382              :     // These are not part of the endpoint as such, but the environment
     383              :     // the endpoint runs in.
     384              :     pub env: LocalEnv,
     385              : 
     386              :     // Optimizations
     387              :     skip_pg_catalog_updates: bool,
     388              : 
     389              :     drop_subscriptions_before_start: bool,
     390              :     reconfigure_concurrency: usize,
     391              :     // Feature flags
     392              :     features: Vec<ComputeFeature>,
     393              :     // Cluster settings
     394              :     cluster: Option<Cluster>,
     395              : 
     396              :     /// The compute_ctl config for the endpoint's compute.
     397              :     compute_ctl_config: ComputeCtlConfig,
     398              : 
     399              :     /// The name of the privileged role for the endpoint.
     400              :     privileged_role_name: Option<String>,
     401              : }
     402              : 
     403              : #[derive(PartialEq, Eq)]
     404              : pub enum EndpointStatus {
     405              :     Running,
     406              :     Stopped,
     407              :     Crashed,
     408              :     RunningNoPidfile,
     409              : }
     410              : 
     411              : impl Display for EndpointStatus {
     412            0 :     fn fmt(&self, writer: &mut std::fmt::Formatter) -> std::fmt::Result {
     413            0 :         writer.write_str(match self {
     414            0 :             Self::Running => "running",
     415            0 :             Self::Stopped => "stopped",
     416            0 :             Self::Crashed => "crashed",
     417            0 :             Self::RunningNoPidfile => "running, no pidfile",
     418              :         })
     419            0 :     }
     420              : }
     421              : 
     422              : #[derive(Default, Clone, Copy, clap::ValueEnum)]
     423              : pub enum EndpointTerminateMode {
     424              :     #[default]
     425              :     /// Use pg_ctl stop -m fast
     426              :     Fast,
     427              :     /// Use pg_ctl stop -m immediate
     428              :     Immediate,
     429              :     /// Use /terminate?mode=immediate
     430              :     ImmediateTerminate,
     431              : }
     432              : 
     433              : impl std::fmt::Display for EndpointTerminateMode {
     434            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     435            0 :         f.write_str(match &self {
     436            0 :             EndpointTerminateMode::Fast => "fast",
     437            0 :             EndpointTerminateMode::Immediate => "immediate",
     438            0 :             EndpointTerminateMode::ImmediateTerminate => "immediate-terminate",
     439              :         })
     440            0 :     }
     441              : }
     442              : 
     443              : pub struct EndpointStartArgs {
     444              :     pub auth_token: Option<String>,
     445              :     pub endpoint_storage_token: String,
     446              :     pub endpoint_storage_addr: String,
     447              :     pub safekeepers_generation: Option<SafekeeperGeneration>,
     448              :     pub safekeepers: Vec<NodeId>,
     449              :     pub pageserver_conninfo: PageserverConnectionInfo,
     450              :     pub remote_ext_base_url: Option<String>,
     451              :     pub create_test_user: bool,
     452              :     pub start_timeout: Duration,
     453              :     pub autoprewarm: bool,
     454              :     pub offload_lfc_interval_seconds: Option<std::num::NonZeroU64>,
     455              :     pub dev: bool,
     456              : }
     457              : 
     458              : impl Endpoint {
     459            0 :     fn from_dir_entry(entry: std::fs::DirEntry, env: &LocalEnv) -> Result<Endpoint> {
     460            0 :         if !entry.file_type()?.is_dir() {
     461            0 :             anyhow::bail!(
     462            0 :                 "Endpoint::from_dir_entry failed: '{}' is not a directory",
     463            0 :                 entry.path().display()
     464              :             );
     465            0 :         }
     466              : 
     467              :         // parse data directory name
     468            0 :         let fname = entry.file_name();
     469            0 :         let endpoint_id = fname.to_str().unwrap().to_string();
     470              : 
     471              :         // Read the endpoint.json file
     472            0 :         let conf: EndpointConf =
     473            0 :             serde_json::from_slice(&std::fs::read(entry.path().join("endpoint.json"))?)?;
     474              : 
     475            0 :         debug!("serialized endpoint conf: {:?}", conf);
     476              : 
     477            0 :         Ok(Endpoint {
     478            0 :             pg_address: SocketAddr::new(IpAddr::from(Ipv4Addr::LOCALHOST), conf.pg_port),
     479            0 :             external_http_address: SocketAddr::new(
     480            0 :                 IpAddr::from(Ipv4Addr::UNSPECIFIED),
     481            0 :                 conf.external_http_port,
     482            0 :             ),
     483            0 :             internal_http_address: SocketAddr::new(
     484            0 :                 IpAddr::from(Ipv4Addr::LOCALHOST),
     485            0 :                 conf.internal_http_port,
     486            0 :             ),
     487            0 :             endpoint_id,
     488            0 :             env: env.clone(),
     489            0 :             timeline_id: conf.timeline_id,
     490            0 :             mode: conf.mode,
     491            0 :             tenant_id: conf.tenant_id,
     492            0 :             pg_version: conf.pg_version,
     493            0 :             grpc: conf.grpc,
     494            0 :             skip_pg_catalog_updates: conf.skip_pg_catalog_updates,
     495            0 :             reconfigure_concurrency: conf.reconfigure_concurrency,
     496            0 :             drop_subscriptions_before_start: conf.drop_subscriptions_before_start,
     497            0 :             features: conf.features,
     498            0 :             cluster: conf.cluster,
     499            0 :             compute_ctl_config: conf.compute_ctl_config,
     500            0 :             privileged_role_name: conf.privileged_role_name,
     501            0 :         })
     502            0 :     }
     503              : 
     504            0 :     fn create_endpoint_dir(&self) -> Result<()> {
     505            0 :         std::fs::create_dir_all(self.endpoint_path()).with_context(|| {
     506            0 :             format!(
     507            0 :                 "could not create endpoint directory {}",
     508            0 :                 self.endpoint_path().display()
     509              :             )
     510            0 :         })
     511            0 :     }
     512              : 
     513              :     // Generate postgresql.conf with default configuration
     514            0 :     fn setup_pg_conf(&self) -> Result<PostgresConf> {
     515            0 :         let mut conf = PostgresConf::new();
     516            0 :         conf.append("max_wal_senders", "10");
     517            0 :         conf.append("wal_log_hints", "off");
     518            0 :         conf.append("max_replication_slots", "10");
     519            0 :         conf.append("hot_standby", "on");
     520              :         // Set to 1MB to both exercise getPage requests/LFC, and still have enough room for
     521              :         // Postgres to operate. Everything smaller might be not enough for Postgres under load,
     522              :         // and can cause errors like 'no unpinned buffers available', see
     523              :         // <https://github.com/neondatabase/neon/issues/9956>
     524            0 :         conf.append("shared_buffers", "1MB");
     525              :         // Postgres defaults to effective_io_concurrency=1, which does not exercise the pageserver's
     526              :         // batching logic.  Set this to 2 so that we exercise the code a bit without letting
     527              :         // individual tests do a lot of concurrent work on underpowered test machines
     528            0 :         conf.append("effective_io_concurrency", "2");
     529            0 :         conf.append("fsync", "off");
     530            0 :         conf.append("max_connections", "100");
     531            0 :         conf.append("wal_level", "logical");
     532              :         // wal_sender_timeout is the maximum time to wait for WAL replication.
     533              :         // It also defines how often the walreceiver will send a feedback message to the wal sender.
     534            0 :         conf.append("wal_sender_timeout", "5s");
     535            0 :         conf.append("listen_addresses", &self.pg_address.ip().to_string());
     536            0 :         conf.append("port", &self.pg_address.port().to_string());
     537            0 :         conf.append("wal_keep_size", "0");
     538              :         // walproposer panics when basebackup is invalid, it is pointless to restart in this case.
     539            0 :         conf.append("restart_after_crash", "off");
     540              : 
     541              :         // Load the 'neon' extension
     542            0 :         conf.append("shared_preload_libraries", "neon");
     543              : 
     544            0 :         conf.append_line("");
     545              :         // Replication-related configurations, such as WAL sending
     546            0 :         match &self.mode {
     547              :             ComputeMode::Primary => {
     548              :                 // Configure backpressure
     549              :                 // - Replication write lag depends on how fast the walreceiver can process incoming WAL.
     550              :                 //   This lag determines latency of get_page_at_lsn. Speed of applying WAL is about 10MB/sec,
     551              :                 //   so to avoid expiration of 1 minute timeout, this lag should not be larger than 600MB.
     552              :                 //   Actually latency should be much smaller (better if < 1sec). But we assume that recently
     553              :                 //   updates pages are not requested from pageserver.
     554              :                 // - Replication flush lag depends on speed of persisting data by checkpointer (creation of
     555              :                 //   delta/image layers) and advancing disk_consistent_lsn. Safekeepers are able to
     556              :                 //   remove/archive WAL only beyond disk_consistent_lsn. Too large a lag can cause long
     557              :                 //   recovery time (in case of pageserver crash) and disk space overflow at safekeepers.
     558              :                 // - Replication apply lag depends on speed of uploading changes to S3 by uploader thread.
     559              :                 //   To be able to restore database in case of pageserver node crash, safekeeper should not
     560              :                 //   remove WAL beyond this point. Too large lag can cause space exhaustion in safekeepers
     561              :                 //   (if they are not able to upload WAL to S3).
     562            0 :                 conf.append("max_replication_write_lag", "15MB");
     563            0 :                 conf.append("max_replication_flush_lag", "10GB");
     564              : 
     565            0 :                 if !self.env.safekeepers.is_empty() {
     566              :                     // Configure Postgres to connect to the safekeepers
     567            0 :                     conf.append("synchronous_standby_names", "walproposer");
     568              : 
     569            0 :                     let safekeepers = self
     570            0 :                         .env
     571            0 :                         .safekeepers
     572            0 :                         .iter()
     573            0 :                         .map(|sk| format!("localhost:{}", sk.get_compute_port()))
     574            0 :                         .collect::<Vec<String>>()
     575            0 :                         .join(",");
     576            0 :                     conf.append("neon.safekeepers", &safekeepers);
     577            0 :                 } else {
     578            0 :                     // We only use setup without safekeepers for tests,
     579            0 :                     // and don't care about data durability on pageserver,
     580            0 :                     // so set more relaxed synchronous_commit.
     581            0 :                     conf.append("synchronous_commit", "remote_write");
     582            0 : 
     583            0 :                     // Configure the node to stream WAL directly to the pageserver
     584            0 :                     // This isn't really a supported configuration, but can be useful for
     585            0 :                     // testing.
     586            0 :                     conf.append("synchronous_standby_names", "pageserver");
     587            0 :                 }
     588              :             }
     589            0 :             ComputeMode::Static(lsn) => {
     590            0 :                 conf.append("recovery_target_lsn", &lsn.to_string());
     591            0 :             }
     592              :             ComputeMode::Replica => {
     593            0 :                 assert!(!self.env.safekeepers.is_empty());
     594              : 
     595              :                 // TODO: use future host field from safekeeper spec
     596              :                 // Pass the list of safekeepers to the replica so that it can connect to any of them,
     597              :                 // whichever is available.
     598            0 :                 let sk_ports = self
     599            0 :                     .env
     600            0 :                     .safekeepers
     601            0 :                     .iter()
     602            0 :                     .map(|x| x.get_compute_port().to_string())
     603            0 :                     .collect::<Vec<_>>()
     604            0 :                     .join(",");
     605            0 :                 let sk_hosts = vec!["localhost"; self.env.safekeepers.len()].join(",");
     606              : 
     607            0 :                 let connstr = format!(
     608            0 :                     "host={} port={} options='-c timeline_id={} tenant_id={}' application_name=replica replication=true",
     609              :                     sk_hosts,
     610              :                     sk_ports,
     611            0 :                     &self.timeline_id.to_string(),
     612            0 :                     &self.tenant_id.to_string(),
     613              :                 );
     614              : 
     615            0 :                 let slot_name = format!("repl_{}_", self.timeline_id);
     616            0 :                 conf.append("primary_conninfo", connstr.as_str());
     617            0 :                 conf.append("primary_slot_name", slot_name.as_str());
     618            0 :                 conf.append("hot_standby", "on");
     619              :                 // prefetching of blocks referenced in WAL doesn't make sense for us
     620              :                 // Neon hot standby ignores pages that are not in the shared_buffers
     621            0 :                 if self.pg_version >= PgMajorVersion::PG15 {
     622            0 :                     conf.append("recovery_prefetch", "off");
     623            0 :                 }
     624              :             }
     625              :         }
     626              : 
     627            0 :         Ok(conf)
     628            0 :     }
     629              : 
     630            0 :     pub fn endpoint_path(&self) -> PathBuf {
     631            0 :         self.env.endpoints_path().join(&self.endpoint_id)
     632            0 :     }
     633              : 
     634            0 :     pub fn pgdata(&self) -> PathBuf {
     635            0 :         self.endpoint_path().join("pgdata")
     636            0 :     }
     637              : 
     638            0 :     pub fn status(&self) -> EndpointStatus {
     639            0 :         let timeout = Duration::from_millis(300);
     640            0 :         let has_pidfile = self.pgdata().join("postmaster.pid").exists();
     641            0 :         let can_connect = TcpStream::connect_timeout(&self.pg_address, timeout).is_ok();
     642              : 
     643            0 :         match (has_pidfile, can_connect) {
     644            0 :             (true, true) => EndpointStatus::Running,
     645            0 :             (false, false) => EndpointStatus::Stopped,
     646            0 :             (true, false) => EndpointStatus::Crashed,
     647            0 :             (false, true) => EndpointStatus::RunningNoPidfile,
     648              :         }
     649            0 :     }
     650              : 
     651            0 :     fn pg_ctl(&self, args: &[&str], auth_token: &Option<String>) -> Result<()> {
     652            0 :         let pg_ctl_path = self.env.pg_bin_dir(self.pg_version)?.join("pg_ctl");
     653            0 :         let mut cmd = Command::new(&pg_ctl_path);
     654            0 :         cmd.args(
     655            0 :             [
     656            0 :                 &[
     657            0 :                     "-D",
     658            0 :                     self.pgdata().to_str().unwrap(),
     659            0 :                     "-w", //wait till pg_ctl actually does what was asked
     660            0 :                 ],
     661            0 :                 args,
     662            0 :             ]
     663            0 :             .concat(),
     664            0 :         )
     665            0 :         .env_clear()
     666            0 :         .env(
     667              :             "LD_LIBRARY_PATH",
     668            0 :             self.env.pg_lib_dir(self.pg_version)?.to_str().unwrap(),
     669              :         )
     670            0 :         .env(
     671              :             "DYLD_LIBRARY_PATH",
     672            0 :             self.env.pg_lib_dir(self.pg_version)?.to_str().unwrap(),
     673              :         );
     674              : 
     675              :         // Pass authentication token used for the connections to pageserver and safekeepers
     676            0 :         if let Some(token) = auth_token {
     677            0 :             cmd.env("NEON_AUTH_TOKEN", token);
     678            0 :         }
     679              : 
     680            0 :         let pg_ctl = cmd
     681            0 :             .output()
     682            0 :             .context(format!("{} failed", pg_ctl_path.display()))?;
     683            0 :         if !pg_ctl.status.success() {
     684            0 :             anyhow::bail!(
     685            0 :                 "pg_ctl failed, exit code: {}, stdout: {}, stderr: {}",
     686              :                 pg_ctl.status,
     687            0 :                 String::from_utf8_lossy(&pg_ctl.stdout),
     688            0 :                 String::from_utf8_lossy(&pg_ctl.stderr),
     689              :             );
     690            0 :         }
     691              : 
     692            0 :         Ok(())
     693            0 :     }
     694              : 
     695            0 :     fn wait_for_compute_ctl_to_exit(&self, send_sigterm: bool) -> Result<()> {
     696              :         // TODO use background_process::stop_process instead: https://github.com/neondatabase/neon/pull/6482
     697            0 :         let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
     698            0 :         let pid: u32 = std::fs::read_to_string(pidfile_path)?.parse()?;
     699            0 :         let pid = nix::unistd::Pid::from_raw(pid as i32);
     700            0 :         if send_sigterm {
     701            0 :             kill(pid, Signal::SIGTERM).ok();
     702            0 :         }
     703            0 :         crate::background_process::wait_until_stopped("compute_ctl", pid)?;
     704            0 :         Ok(())
     705            0 :     }
     706              : 
     707            0 :     fn read_postgresql_conf(&self) -> Result<String> {
     708              :         // Slurp the endpoints/<endpoint id>/postgresql.conf file into
     709              :         // memory. We will include it in the spec file that we pass to
     710              :         // `compute_ctl`, and `compute_ctl` will write it to the postgresql.conf
     711              :         // in the data directory.
     712            0 :         let postgresql_conf_path = self.endpoint_path().join("postgresql.conf");
     713            0 :         match std::fs::read(&postgresql_conf_path) {
     714            0 :             Ok(content) => Ok(String::from_utf8(content)?),
     715            0 :             Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok("".to_string()),
     716            0 :             Err(e) => Err(anyhow::Error::new(e).context(format!(
     717            0 :                 "failed to read config file in {}",
     718            0 :                 postgresql_conf_path.to_str().unwrap()
     719            0 :             ))),
     720              :         }
     721            0 :     }
     722              : 
     723              :     /// Map safekeepers ids to the actual connection strings.
     724            0 :     fn build_safekeepers_connstrs(&self, sk_ids: Vec<NodeId>) -> Result<Vec<String>> {
     725            0 :         let mut safekeeper_connstrings = Vec::new();
     726            0 :         if self.mode == ComputeMode::Primary {
     727            0 :             for sk_id in sk_ids {
     728            0 :                 let sk = self
     729            0 :                     .env
     730            0 :                     .safekeepers
     731            0 :                     .iter()
     732            0 :                     .find(|node| node.id == sk_id)
     733            0 :                     .ok_or_else(|| anyhow!("safekeeper {sk_id} does not exist"))?;
     734            0 :                 safekeeper_connstrings.push(format!("127.0.0.1:{}", sk.get_compute_port()));
     735              :             }
     736            0 :         }
     737            0 :         Ok(safekeeper_connstrings)
     738            0 :     }
     739              : 
     740              :     /// Generate a JWT with the correct claims.
     741            0 :     pub fn generate_jwt(&self, scope: Option<ComputeClaimsScope>) -> Result<String> {
     742            0 :         self.env.generate_auth_token(&ComputeClaims {
     743            0 :             audience: match scope {
     744            0 :                 Some(ComputeClaimsScope::Admin) => Some(vec![COMPUTE_AUDIENCE.to_owned()]),
     745            0 :                 _ => None,
     746              :             },
     747            0 :             compute_id: match scope {
     748            0 :                 Some(ComputeClaimsScope::Admin) => None,
     749            0 :                 _ => Some(self.endpoint_id.clone()),
     750              :             },
     751            0 :             scope,
     752              :         })
     753            0 :     }
     754              : 
     755            0 :     pub async fn start(&self, args: EndpointStartArgs) -> Result<()> {
     756            0 :         if self.status() == EndpointStatus::Running {
     757            0 :             anyhow::bail!("The endpoint is already running");
     758            0 :         }
     759              : 
     760            0 :         let postgresql_conf = self.read_postgresql_conf()?;
     761              : 
     762              :         // We always start the compute node from scratch, so if the Postgres
     763              :         // data dir exists from a previous launch, remove it first.
     764            0 :         if self.pgdata().exists() {
     765            0 :             std::fs::remove_dir_all(self.pgdata())?;
     766            0 :         }
     767              : 
     768            0 :         let safekeeper_connstrings = self.build_safekeepers_connstrs(args.safekeepers)?;
     769              : 
     770              :         // check for file remote_extensions_spec.json
     771              :         // if it is present, read it and pass to compute_ctl
     772            0 :         let remote_extensions_spec_path = self.endpoint_path().join("remote_extensions_spec.json");
     773            0 :         let remote_extensions_spec = std::fs::File::open(remote_extensions_spec_path);
     774              :         let remote_extensions: Option<RemoteExtSpec>;
     775              : 
     776            0 :         if let Ok(spec_file) = remote_extensions_spec {
     777            0 :             remote_extensions = serde_json::from_reader(spec_file).ok();
     778            0 :         } else {
     779            0 :             remote_extensions = None;
     780            0 :         };
     781              : 
     782              :         // For the sake of backwards-compatibility, also fill in 'pageserver_connstring'
     783              :         //
     784              :         // XXX: I believe this is not really needed, except to make
     785              :         // test_forward_compatibility happy.
     786              :         //
     787              :         // Use a closure so that we can conviniently return None in the middle of the
     788              :         // loop.
     789            0 :         let pageserver_connstring: Option<String> = (|| {
     790            0 :             let num_shards = args.pageserver_conninfo.shard_count.count();
     791            0 :             let mut connstrings = Vec::new();
     792            0 :             for shard_no in 0..num_shards {
     793            0 :                 let shard_index = ShardIndex {
     794            0 :                     shard_count: args.pageserver_conninfo.shard_count,
     795            0 :                     shard_number: ShardNumber(shard_no),
     796            0 :                 };
     797            0 :                 let shard = args
     798            0 :                     .pageserver_conninfo
     799            0 :                     .shards
     800            0 :                     .get(&shard_index)
     801            0 :                     .ok_or_else(|| {
     802            0 :                         anyhow!(
     803            0 :                             "shard {} not found in pageserver_connection_info",
     804              :                             shard_index
     805              :                         )
     806            0 :                     })?;
     807            0 :                 let pageserver = shard
     808            0 :                     .pageservers
     809            0 :                     .first()
     810            0 :                     .ok_or(anyhow!("must have at least one pageserver"))?;
     811            0 :                 if let Some(libpq_url) = &pageserver.libpq_url {
     812            0 :                     connstrings.push(libpq_url.clone());
     813            0 :                 } else {
     814            0 :                     return Ok::<_, anyhow::Error>(None);
     815              :                 }
     816              :             }
     817            0 :             Ok(Some(connstrings.join(",")))
     818            0 :         })()?;
     819              : 
     820              :         // Create config file
     821            0 :         let config = {
     822            0 :             let mut spec = ComputeSpec {
     823            0 :                 skip_pg_catalog_updates: self.skip_pg_catalog_updates,
     824              :                 format_version: 1.0,
     825            0 :                 operation_uuid: None,
     826            0 :                 features: self.features.clone(),
     827            0 :                 swap_size_bytes: None,
     828            0 :                 disk_quota_bytes: None,
     829            0 :                 disable_lfc_resizing: None,
     830              :                 cluster: Cluster {
     831            0 :                     cluster_id: None, // project ID: not used
     832            0 :                     name: None,       // project name: not used
     833            0 :                     state: None,
     834            0 :                     roles: if args.create_test_user {
     835            0 :                         vec![Role {
     836            0 :                             name: PgIdent::from_str("test").unwrap(),
     837            0 :                             encrypted_password: None,
     838            0 :                             options: None,
     839            0 :                         }]
     840              :                     } else {
     841            0 :                         Vec::new()
     842              :                     },
     843            0 :                     databases: if args.create_test_user {
     844            0 :                         vec![Database {
     845            0 :                             name: PgIdent::from_str("neondb").unwrap(),
     846            0 :                             owner: PgIdent::from_str("test").unwrap(),
     847            0 :                             options: None,
     848            0 :                             restrict_conn: false,
     849            0 :                             invalid: false,
     850            0 :                         }]
     851              :                     } else {
     852            0 :                         Vec::new()
     853              :                     },
     854            0 :                     settings: None,
     855            0 :                     postgresql_conf: Some(postgresql_conf.clone()),
     856              :                 },
     857            0 :                 delta_operations: None,
     858            0 :                 tenant_id: Some(self.tenant_id),
     859            0 :                 timeline_id: Some(self.timeline_id),
     860            0 :                 project_id: None,
     861            0 :                 branch_id: None,
     862            0 :                 endpoint_id: Some(self.endpoint_id.clone()),
     863            0 :                 mode: self.mode,
     864            0 :                 pageserver_connection_info: Some(args.pageserver_conninfo.clone()),
     865            0 :                 pageserver_connstring,
     866            0 :                 safekeepers_generation: args.safekeepers_generation.map(|g| g.into_inner()),
     867            0 :                 safekeeper_connstrings,
     868            0 :                 storage_auth_token: args.auth_token.clone(),
     869            0 :                 remote_extensions,
     870            0 :                 pgbouncer_settings: None,
     871            0 :                 shard_stripe_size: args.pageserver_conninfo.stripe_size, // redundant with pageserver_connection_info.stripe_size
     872            0 :                 local_proxy_config: None,
     873            0 :                 reconfigure_concurrency: self.reconfigure_concurrency,
     874            0 :                 drop_subscriptions_before_start: self.drop_subscriptions_before_start,
     875            0 :                 audit_log_level: ComputeAudit::Disabled,
     876            0 :                 logs_export_host: None::<String>,
     877            0 :                 endpoint_storage_addr: Some(args.endpoint_storage_addr),
     878            0 :                 endpoint_storage_token: Some(args.endpoint_storage_token),
     879            0 :                 autoprewarm: args.autoprewarm,
     880            0 :                 offload_lfc_interval_seconds: args.offload_lfc_interval_seconds,
     881              :                 suspend_timeout_seconds: -1, // Only used in neon_local.
     882            0 :                 databricks_settings: None,
     883              :             };
     884              : 
     885              :             // this strange code is needed to support respec() in tests
     886            0 :             if self.cluster.is_some() {
     887            0 :                 debug!("Cluster is already set in the endpoint spec, using it");
     888            0 :                 spec.cluster = self.cluster.clone().unwrap();
     889              : 
     890            0 :                 debug!("spec.cluster {:?}", spec.cluster);
     891              : 
     892              :                 // fill missing fields again
     893            0 :                 if args.create_test_user {
     894            0 :                     spec.cluster.roles.push(Role {
     895            0 :                         name: PgIdent::from_str("test").unwrap(),
     896            0 :                         encrypted_password: None,
     897            0 :                         options: None,
     898            0 :                     });
     899            0 :                     spec.cluster.databases.push(Database {
     900            0 :                         name: PgIdent::from_str("neondb").unwrap(),
     901            0 :                         owner: PgIdent::from_str("test").unwrap(),
     902            0 :                         options: None,
     903            0 :                         restrict_conn: false,
     904            0 :                         invalid: false,
     905            0 :                     });
     906            0 :                 }
     907            0 :                 spec.cluster.postgresql_conf = Some(postgresql_conf);
     908            0 :             }
     909              : 
     910            0 :             ComputeConfig {
     911            0 :                 spec: Some(spec),
     912            0 :                 compute_ctl_config: self.compute_ctl_config.clone(),
     913            0 :             }
     914              :         };
     915              : 
     916            0 :         let config_path = self.endpoint_path().join("config.json");
     917            0 :         std::fs::write(config_path, serde_json::to_string_pretty(&config)?)?;
     918              : 
     919              :         // Open log file. We'll redirect the stdout and stderr of `compute_ctl` to it.
     920            0 :         let logfile = std::fs::OpenOptions::new()
     921            0 :             .create(true)
     922            0 :             .append(true)
     923            0 :             .open(self.endpoint_path().join("compute.log"))?;
     924              : 
     925              :         // Launch compute_ctl
     926            0 :         let conn_str = self.connstr("cloud_admin", "postgres");
     927            0 :         println!("Starting postgres node at '{conn_str}'");
     928            0 :         if args.create_test_user {
     929            0 :             let conn_str = self.connstr("test", "neondb");
     930            0 :             println!("Also at '{conn_str}'");
     931            0 :         }
     932            0 :         let mut cmd = Command::new(self.env.neon_distrib_dir.join("compute_ctl"));
     933            0 :         cmd.args([
     934            0 :             "--external-http-port",
     935            0 :             &self.external_http_address.port().to_string(),
     936            0 :         ])
     937            0 :         .args([
     938            0 :             "--internal-http-port",
     939            0 :             &self.internal_http_address.port().to_string(),
     940            0 :         ])
     941            0 :         .args(["--pgdata", self.pgdata().to_str().unwrap()])
     942            0 :         .args(["--connstr", &conn_str])
     943            0 :         .arg("--config")
     944            0 :         .arg(self.endpoint_path().join("config.json").as_os_str())
     945            0 :         .args([
     946              :             "--pgbin",
     947            0 :             self.env
     948            0 :                 .pg_bin_dir(self.pg_version)?
     949            0 :                 .join("postgres")
     950            0 :                 .to_str()
     951            0 :                 .unwrap(),
     952              :         ])
     953              :         // TODO: It would be nice if we generated compute IDs with the same
     954              :         // algorithm as the real control plane.
     955            0 :         .args(["--compute-id", &self.endpoint_id])
     956            0 :         .stdin(std::process::Stdio::null())
     957            0 :         .stderr(logfile.try_clone()?)
     958            0 :         .stdout(logfile);
     959              : 
     960            0 :         if let Some(remote_ext_base_url) = args.remote_ext_base_url {
     961            0 :             cmd.args(["--remote-ext-base-url", &remote_ext_base_url]);
     962            0 :         }
     963              : 
     964            0 :         if args.dev {
     965            0 :             cmd.arg("--dev");
     966            0 :         }
     967              : 
     968            0 :         if let Some(privileged_role_name) = self.privileged_role_name.clone() {
     969            0 :             cmd.args(["--privileged-role-name", &privileged_role_name]);
     970            0 :         }
     971              : 
     972            0 :         let child = cmd.spawn()?;
     973              :         // set up a scopeguard to kill & wait for the child in case we panic or bail below
     974            0 :         let child = scopeguard::guard(child, |mut child| {
     975            0 :             println!("SIGKILL & wait the started process");
     976            0 :             (|| {
     977              :                 // TODO: use another signal that can be caught by the child so it can clean up any children it spawned
     978            0 :                 child.kill().context("SIGKILL child")?;
     979            0 :                 child.wait().context("wait() for child process")?;
     980            0 :                 anyhow::Ok(())
     981              :             })()
     982            0 :             .with_context(|| format!("scopeguard kill&wait child {child:?}"))
     983            0 :             .unwrap();
     984            0 :         });
     985              : 
     986              :         // Write down the pid so we can wait for it when we want to stop
     987              :         // TODO use background_process::start_process instead: https://github.com/neondatabase/neon/pull/6482
     988            0 :         let pid = child.id();
     989            0 :         let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
     990            0 :         std::fs::write(pidfile_path, pid.to_string())?;
     991              : 
     992              :         // Wait for it to start
     993              :         const ATTEMPT_INTERVAL: Duration = Duration::from_millis(100);
     994            0 :         let start_at = Instant::now();
     995              :         loop {
     996            0 :             match self.get_status().await {
     997            0 :                 Ok(state) => {
     998            0 :                     match state.status {
     999              :                         ComputeStatus::Init => {
    1000            0 :                             let timeout = args.start_timeout;
    1001            0 :                             if Instant::now().duration_since(start_at) > timeout {
    1002            0 :                                 bail!(
    1003            0 :                                     "compute startup timed out {:?}; still in Init state",
    1004              :                                     timeout
    1005              :                                 );
    1006            0 :                             }
    1007              :                             // keep retrying
    1008              :                         }
    1009              :                         ComputeStatus::Running => {
    1010              :                             // All good!
    1011            0 :                             break;
    1012              :                         }
    1013              :                         ComputeStatus::Failed => {
    1014            0 :                             bail!(
    1015            0 :                                 "compute startup failed: {}",
    1016            0 :                                 state
    1017            0 :                                     .error
    1018            0 :                                     .as_deref()
    1019            0 :                                     .unwrap_or("<no error from compute_ctl>")
    1020              :                             );
    1021              :                         }
    1022              :                         ComputeStatus::Empty
    1023              :                         | ComputeStatus::ConfigurationPending
    1024              :                         | ComputeStatus::Configuration
    1025              :                         | ComputeStatus::TerminationPendingFast
    1026              :                         | ComputeStatus::TerminationPendingImmediate
    1027              :                         | ComputeStatus::Terminated
    1028              :                         | ComputeStatus::RefreshConfigurationPending
    1029              :                         | ComputeStatus::RefreshConfiguration => {
    1030            0 :                             bail!("unexpected compute status: {:?}", state.status)
    1031              :                         }
    1032              :                     }
    1033              :                 }
    1034            0 :                 Err(e) => {
    1035            0 :                     if Instant::now().duration_since(start_at) > args.start_timeout {
    1036            0 :                         return Err(e).context(format!(
    1037            0 :                             "timed out {:?} waiting to connect to compute_ctl HTTP",
    1038              :                             args.start_timeout
    1039              :                         ));
    1040            0 :                     }
    1041              :                 }
    1042              :             }
    1043            0 :             tokio::time::sleep(ATTEMPT_INTERVAL).await;
    1044              :         }
    1045              : 
    1046              :         // disarm the scopeguard, let the child outlive this function (and neon_local invoction)
    1047            0 :         drop(scopeguard::ScopeGuard::into_inner(child));
    1048              : 
    1049            0 :         Ok(())
    1050            0 :     }
    1051              : 
    1052              :     // Update the pageservers in the spec file of the endpoint. This is useful to test the spec refresh scenario.
    1053            0 :     pub async fn update_pageservers_in_config(
    1054            0 :         &self,
    1055            0 :         pageserver_conninfo: &PageserverConnectionInfo,
    1056            0 :     ) -> Result<()> {
    1057            0 :         let config_path = self.endpoint_path().join("config.json");
    1058            0 :         let mut config: ComputeConfig = {
    1059            0 :             let file = std::fs::File::open(&config_path)?;
    1060            0 :             serde_json::from_reader(file)?
    1061              :         };
    1062              : 
    1063            0 :         let mut spec = config.spec.unwrap();
    1064            0 :         spec.pageserver_connection_info = Some(pageserver_conninfo.clone());
    1065            0 :         config.spec = Some(spec);
    1066              : 
    1067            0 :         let file = std::fs::File::create(&config_path)?;
    1068            0 :         serde_json::to_writer_pretty(file, &config)?;
    1069              : 
    1070            0 :         Ok(())
    1071            0 :     }
    1072              : 
    1073              :     // Call the /status HTTP API
    1074            0 :     pub async fn get_status(&self) -> Result<ComputeStatusResponse> {
    1075            0 :         let client = reqwest::Client::new();
    1076              : 
    1077            0 :         let response = client
    1078            0 :             .request(
    1079            0 :                 reqwest::Method::GET,
    1080            0 :                 format!(
    1081            0 :                     "http://{}:{}/status",
    1082            0 :                     self.external_http_address.ip(),
    1083            0 :                     self.external_http_address.port()
    1084              :                 ),
    1085              :             )
    1086            0 :             .bearer_auth(self.generate_jwt(None::<ComputeClaimsScope>)?)
    1087            0 :             .send()
    1088            0 :             .await?;
    1089              : 
    1090              :         // Interpret the response
    1091            0 :         let status = response.status();
    1092            0 :         if !(status.is_client_error() || status.is_server_error()) {
    1093            0 :             Ok(response.json().await?)
    1094              :         } else {
    1095              :             // reqwest does not export its error construction utility functions, so let's craft the message ourselves
    1096            0 :             let url = response.url().to_owned();
    1097            0 :             let msg = match response.text().await {
    1098            0 :                 Ok(err_body) => format!("Error: {err_body}"),
    1099            0 :                 Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
    1100              :             };
    1101            0 :             Err(anyhow::anyhow!(msg))
    1102              :         }
    1103            0 :     }
    1104              : 
    1105            0 :     pub async fn reconfigure(
    1106            0 :         &self,
    1107            0 :         pageserver_conninfo: Option<&PageserverConnectionInfo>,
    1108            0 :         safekeepers: Option<Vec<NodeId>>,
    1109            0 :         safekeeper_generation: Option<SafekeeperGeneration>,
    1110            0 :     ) -> Result<()> {
    1111            0 :         let (mut spec, compute_ctl_config) = {
    1112            0 :             let config_path = self.endpoint_path().join("config.json");
    1113            0 :             let file = std::fs::File::open(config_path)?;
    1114            0 :             let config: ComputeConfig = serde_json::from_reader(file)?;
    1115              : 
    1116            0 :             (config.spec.unwrap(), config.compute_ctl_config)
    1117              :         };
    1118              : 
    1119            0 :         let postgresql_conf = self.read_postgresql_conf()?;
    1120            0 :         spec.cluster.postgresql_conf = Some(postgresql_conf);
    1121              : 
    1122            0 :         if let Some(pageserver_conninfo) = pageserver_conninfo {
    1123              :             // If pageservers are provided, we need to ensure that they are not empty.
    1124              :             // This is a requirement for the compute_ctl configuration.
    1125            0 :             anyhow::ensure!(
    1126            0 :                 !pageserver_conninfo.shards.is_empty(),
    1127            0 :                 "no pageservers provided"
    1128              :             );
    1129            0 :             spec.pageserver_connection_info = Some(pageserver_conninfo.clone());
    1130            0 :             spec.shard_stripe_size = pageserver_conninfo.stripe_size;
    1131            0 :         }
    1132              : 
    1133              :         // If safekeepers are not specified, don't change them.
    1134            0 :         if let Some(safekeepers) = safekeepers {
    1135            0 :             let safekeeper_connstrings = self.build_safekeepers_connstrs(safekeepers)?;
    1136            0 :             spec.safekeeper_connstrings = safekeeper_connstrings;
    1137            0 :             if let Some(g) = safekeeper_generation {
    1138            0 :                 spec.safekeepers_generation = Some(g.into_inner());
    1139            0 :             }
    1140            0 :         }
    1141              : 
    1142            0 :         let client = reqwest::Client::builder()
    1143            0 :             .timeout(Duration::from_secs(120))
    1144            0 :             .build()
    1145            0 :             .unwrap();
    1146            0 :         let response = client
    1147            0 :             .post(format!(
    1148            0 :                 "http://{}:{}/configure",
    1149            0 :                 self.external_http_address.ip(),
    1150            0 :                 self.external_http_address.port()
    1151              :             ))
    1152            0 :             .header(CONTENT_TYPE.as_str(), "application/json")
    1153            0 :             .bearer_auth(self.generate_jwt(None::<ComputeClaimsScope>)?)
    1154            0 :             .body(
    1155            0 :                 serde_json::to_string(&ConfigurationRequest {
    1156            0 :                     spec,
    1157            0 :                     compute_ctl_config,
    1158            0 :                 })
    1159            0 :                 .unwrap(),
    1160              :             )
    1161            0 :             .send()
    1162            0 :             .await?;
    1163              : 
    1164            0 :         let status = response.status();
    1165            0 :         if !(status.is_client_error() || status.is_server_error()) {
    1166            0 :             Ok(())
    1167              :         } else {
    1168            0 :             let url = response.url().to_owned();
    1169            0 :             let msg = match response.text().await {
    1170            0 :                 Ok(err_body) => format!("Error: {err_body}"),
    1171            0 :                 Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
    1172              :             };
    1173            0 :             Err(anyhow::anyhow!(msg))
    1174              :         }
    1175            0 :     }
    1176              : 
    1177            0 :     pub async fn reconfigure_pageservers(
    1178            0 :         &self,
    1179            0 :         pageservers: &PageserverConnectionInfo,
    1180            0 :     ) -> Result<()> {
    1181            0 :         self.reconfigure(Some(pageservers), None, None).await
    1182            0 :     }
    1183              : 
    1184            0 :     pub async fn reconfigure_safekeepers(
    1185            0 :         &self,
    1186            0 :         safekeepers: Vec<NodeId>,
    1187            0 :         generation: SafekeeperGeneration,
    1188            0 :     ) -> Result<()> {
    1189            0 :         self.reconfigure(None, Some(safekeepers), Some(generation))
    1190            0 :             .await
    1191            0 :     }
    1192              : 
    1193            0 :     pub async fn stop(
    1194            0 :         &self,
    1195            0 :         mode: EndpointTerminateMode,
    1196            0 :         destroy: bool,
    1197            0 :     ) -> Result<TerminateResponse> {
    1198              :         // pg_ctl stop is fast but doesn't allow us to collect LSN. /terminate is
    1199              :         // slow, and test runs time out. Solution: special mode "immediate-terminate"
    1200              :         // which uses /terminate
    1201            0 :         let response = if let EndpointTerminateMode::ImmediateTerminate = mode {
    1202            0 :             let ip = self.external_http_address.ip();
    1203            0 :             let port = self.external_http_address.port();
    1204            0 :             let url = format!("http://{ip}:{port}/terminate?mode=immediate");
    1205            0 :             let token = self.generate_jwt(Some(ComputeClaimsScope::Admin))?;
    1206            0 :             let request = reqwest::Client::new().post(url).bearer_auth(token);
    1207            0 :             let response = request.send().await.context("/terminate")?;
    1208            0 :             let text = response.text().await.context("/terminate result")?;
    1209            0 :             serde_json::from_str(&text).with_context(|| format!("deserializing {text}"))?
    1210              :         } else {
    1211            0 :             self.pg_ctl(&["-m", &mode.to_string(), "stop"], &None)?;
    1212            0 :             TerminateResponse { lsn: None }
    1213              :         };
    1214              : 
    1215              :         // Also wait for the compute_ctl process to die. It might have some
    1216              :         // cleanup work to do after postgres stops, like syncing safekeepers,
    1217              :         // etc.
    1218              :         //
    1219              :         // If destroying or stop mode is immediate, send it SIGTERM before
    1220              :         // waiting. Sometimes we do *not* want this cleanup: tests intentionally
    1221              :         // do stop when majority of safekeepers is down, so sync-safekeepers
    1222              :         // would hang otherwise. This could be a separate flag though.
    1223            0 :         let send_sigterm = destroy || !matches!(mode, EndpointTerminateMode::Fast);
    1224            0 :         self.wait_for_compute_ctl_to_exit(send_sigterm)?;
    1225            0 :         if destroy {
    1226            0 :             println!(
    1227            0 :                 "Destroying postgres data directory '{}'",
    1228            0 :                 self.pgdata().to_str().unwrap()
    1229              :             );
    1230            0 :             std::fs::remove_dir_all(self.endpoint_path())?;
    1231            0 :         }
    1232            0 :         Ok(response)
    1233            0 :     }
    1234              : 
    1235            0 :     pub async fn refresh_configuration(&self) -> Result<()> {
    1236            0 :         let client = reqwest::Client::builder()
    1237            0 :             .timeout(Duration::from_secs(30))
    1238            0 :             .build()
    1239            0 :             .unwrap();
    1240            0 :         let response = client
    1241            0 :             .post(format!(
    1242            0 :                 "http://{}:{}/refresh_configuration",
    1243            0 :                 self.internal_http_address.ip(),
    1244            0 :                 self.internal_http_address.port()
    1245            0 :             ))
    1246            0 :             .send()
    1247            0 :             .await?;
    1248              : 
    1249            0 :         let status = response.status();
    1250            0 :         if !(status.is_client_error() || status.is_server_error()) {
    1251            0 :             Ok(())
    1252              :         } else {
    1253            0 :             let url = response.url().to_owned();
    1254            0 :             let msg = match response.text().await {
    1255            0 :                 Ok(err_body) => format!("Error: {err_body}"),
    1256            0 :                 Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
    1257              :             };
    1258            0 :             Err(anyhow::anyhow!(msg))
    1259              :         }
    1260            0 :     }
    1261              : 
    1262            0 :     pub fn connstr(&self, user: &str, db_name: &str) -> String {
    1263            0 :         format!(
    1264            0 :             "postgresql://{}@{}:{}/{}",
    1265              :             user,
    1266            0 :             self.pg_address.ip(),
    1267            0 :             self.pg_address.port(),
    1268              :             db_name
    1269              :         )
    1270            0 :     }
    1271              : }
    1272              : 
    1273              : /// If caller is telling us what pageserver to use, this is not a tenant which is
    1274              : /// fully managed by storage controller, therefore not sharded.
    1275            0 : pub fn local_pageserver_conf_to_conn_info(
    1276            0 :     conf: &crate::local_env::PageServerConf,
    1277            0 : ) -> Result<PageserverConnectionInfo> {
    1278            0 :     let libpq_url = {
    1279            0 :         let (host, port) = parse_host_port(&conf.listen_pg_addr)?;
    1280            0 :         let port = port.unwrap_or(5432);
    1281            0 :         Some(format!("postgres://no_user@{host}:{port}"))
    1282              :     };
    1283            0 :     let grpc_url = if let Some(grpc_addr) = &conf.listen_grpc_addr {
    1284            0 :         let (host, port) = parse_host_port(grpc_addr)?;
    1285            0 :         let port = port.unwrap_or(DEFAULT_PAGESERVER_GRPC_PORT);
    1286            0 :         Some(format!("grpc://no_user@{host}:{port}"))
    1287              :     } else {
    1288            0 :         None
    1289              :     };
    1290            0 :     let ps_conninfo = PageserverShardConnectionInfo {
    1291            0 :         id: Some(conf.id),
    1292            0 :         libpq_url,
    1293            0 :         grpc_url,
    1294            0 :     };
    1295              : 
    1296            0 :     let shard_info = PageserverShardInfo {
    1297            0 :         pageservers: vec![ps_conninfo],
    1298            0 :     };
    1299              : 
    1300            0 :     let shards: HashMap<_, _> = vec![(ShardIndex::unsharded(), shard_info)]
    1301            0 :         .into_iter()
    1302            0 :         .collect();
    1303            0 :     Ok(PageserverConnectionInfo {
    1304            0 :         shard_count: ShardCount::unsharded(),
    1305            0 :         stripe_size: None,
    1306            0 :         shards,
    1307            0 :         prefer_protocol: PageserverProtocol::default(),
    1308            0 :     })
    1309            0 : }
    1310              : 
    1311            0 : pub fn tenant_locate_response_to_conn_info(
    1312            0 :     response: &pageserver_api::controller_api::TenantLocateResponse,
    1313            0 : ) -> Result<PageserverConnectionInfo> {
    1314            0 :     let mut shards = HashMap::new();
    1315            0 :     for shard in response.shards.iter() {
    1316            0 :         tracing::info!("parsing {}", shard.listen_pg_addr);
    1317            0 :         let libpq_url = {
    1318            0 :             let host = &shard.listen_pg_addr;
    1319            0 :             let port = shard.listen_pg_port;
    1320            0 :             Some(format!("postgres://no_user@{host}:{port}"))
    1321              :         };
    1322            0 :         let grpc_url = if let Some(grpc_addr) = &shard.listen_grpc_addr {
    1323            0 :             let host = grpc_addr;
    1324            0 :             let port = shard.listen_grpc_port.expect("no gRPC port");
    1325            0 :             Some(format!("grpc://no_user@{host}:{port}"))
    1326              :         } else {
    1327            0 :             None
    1328              :         };
    1329              : 
    1330            0 :         let shard_info = PageserverShardInfo {
    1331            0 :             pageservers: vec![PageserverShardConnectionInfo {
    1332            0 :                 id: Some(shard.node_id),
    1333            0 :                 libpq_url,
    1334            0 :                 grpc_url,
    1335            0 :             }],
    1336            0 :         };
    1337              : 
    1338            0 :         shards.insert(shard.shard_id.to_index(), shard_info);
    1339              :     }
    1340              : 
    1341            0 :     let stripe_size = if response.shard_params.count.is_unsharded() {
    1342            0 :         None
    1343              :     } else {
    1344            0 :         Some(response.shard_params.stripe_size)
    1345              :     };
    1346            0 :     Ok(PageserverConnectionInfo {
    1347            0 :         shard_count: response.shard_params.count,
    1348            0 :         stripe_size,
    1349            0 :         shards,
    1350            0 :         prefer_protocol: PageserverProtocol::default(),
    1351            0 :     })
    1352            0 : }
        

Generated by: LCOV version 2.1-beta