LCOV - code coverage report
Current view: top level - control_plane/src - endpoint.rs (source / functions)
Test:      aca8877be6ceba750c1be359ed71bc1799d52b30.info
Test Date: 2024-02-14 18:05:35
Coverage:  Lines: 87.8 % (469 of 534 hit)    Functions: 71.7 % (38 of 53 hit)

            Line data    Source code
       1              : //! Code to manage compute endpoints
       2              : //!
       3              : //! In the local test environment, the data for each endpoint is stored in
       4              : //!
       5              : //! ```text
       6              : //!   .neon/endpoints/<endpoint id>
       7              : //! ```
       8              : //!
       9              : //! Some basic information about the endpoint, like the tenant and timeline IDs,
      10              : //! is stored in the `endpoint.json` file. This file is created when the
      11              : //! endpoint is created, and doesn't change afterwards.
      12              : //!
      13              : //! The endpoint is managed by the `compute_ctl` binary. When an endpoint is
      14              : //! started, we launch `compute_ctl`. It synchronizes the safekeepers, downloads
      15              : //! the basebackup from the pageserver to initialize the data directory, and
      16              : //! finally launches the PostgreSQL process. It watches the PostgreSQL process
      17              : //! until it exits.
      18              : //!
      19              : //! When an endpoint is created, a `postgresql.conf` file is also created in
      20              : //! the endpoint's directory. The file can be modified before starting PostgreSQL.
      21              : //! However, the `postgresql.conf` file in the endpoint directory is not used directly
      22              : //! by PostgreSQL. It is passed to `compute_ctl`, and `compute_ctl` writes another
      23              : //! copy of it in the data directory.
      24              : //!
      25              : //! Directory contents:
      26              : //!
      27              : //! ```text
      28              : //! .neon/endpoints/main/
      29              : //!     compute.log               - log output of `compute_ctl` and `postgres`
      30              : //!     endpoint.json             - serialized `EndpointConf` struct
      31              : //!     postgresql.conf           - postgresql settings
      32              : //!     spec.json                 - passed to `compute_ctl`
      33              : //!     pgdata/
      34              : //!         postgresql.conf       - copy of postgresql.conf created by `compute_ctl`
      35              : //!         zenith.signal
      36              : //!         <other PostgreSQL files>
      37              : //! ```
      38              : //!
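For illustration, an `endpoint.json` for an endpoint named `main` might look roughly like the
following. The values are hypothetical; the fields mirror the `EndpointConf` struct defined
below, and the exact encoding of `mode` follows `ComputeMode`'s serde representation:

```text
{
  "endpoint_id": "main",
  "tenant_id": "<tenant id>",
  "timeline_id": "<timeline id>",
  "mode": "Primary",
  "pg_port": 55432,
  "http_port": 55433,
  "pg_version": 15,
  "skip_pg_catalog_updates": true,
  "features": []
}
```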
      39              : use std::collections::BTreeMap;
      40              : use std::net::SocketAddr;
      41              : use std::net::TcpStream;
      42              : use std::path::PathBuf;
      43              : use std::process::Command;
      44              : use std::sync::Arc;
      45              : use std::time::Duration;
      46              : 
      47              : use anyhow::{anyhow, bail, Context, Result};
      48              : use compute_api::spec::RemoteExtSpec;
      49              : use nix::sys::signal::kill;
      50              : use nix::sys::signal::Signal;
      51              : use serde::{Deserialize, Serialize};
      52              : use url::Host;
      53              : use utils::id::{NodeId, TenantId, TimelineId};
      54              : 
      55              : use crate::attachment_service::AttachmentService;
      56              : use crate::local_env::LocalEnv;
      57              : use crate::postgresql_conf::PostgresConf;
      58              : 
      59              : use compute_api::responses::{ComputeState, ComputeStatus};
      60              : use compute_api::spec::{Cluster, ComputeFeature, ComputeMode, ComputeSpec};
      61              : 
      62              : // Contents of an endpoint.json file
      63        73815 : #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
      64              : pub struct EndpointConf {
      65              :     endpoint_id: String,
      66              :     tenant_id: TenantId,
      67              :     timeline_id: TimelineId,
      68              :     mode: ComputeMode,
      69              :     pg_port: u16,
      70              :     http_port: u16,
      71              :     pg_version: u32,
      72              :     skip_pg_catalog_updates: bool,
      73              :     features: Vec<ComputeFeature>,
      74              : }
      75              : 
      76              : //
      77              : // ComputeControlPlane
      78              : //
      79              : pub struct ComputeControlPlane {
      80              :     base_port: u16,
      81              : 
      82              :     // endpoint ID is the key
      83              :     pub endpoints: BTreeMap<String, Arc<Endpoint>>,
      84              : 
      85              :     env: LocalEnv,
      86              : }
      87              : 
      88              : impl ComputeControlPlane {
      89              :     // Load current endpoints from the endpoints/ subdirectories
      90         2408 :     pub fn load(env: LocalEnv) -> Result<ComputeControlPlane> {
      91         2408 :         let mut endpoints = BTreeMap::default();
      92         3885 :         for endpoint_dir in std::fs::read_dir(env.endpoints_path())
      93         2408 :             .with_context(|| format!("failed to list {}", env.endpoints_path().display()))?
      94         3885 :         {
      95         3885 :             let ep = Endpoint::from_dir_entry(endpoint_dir?, &env)?;
      96         3885 :             endpoints.insert(ep.endpoint_id.clone(), Arc::new(ep));
      97              :         }
      98              : 
      99         2408 :         Ok(ComputeControlPlane {
     100         2408 :             base_port: 55431,
     101         2408 :             endpoints,
     102         2408 :             env,
     103         2408 :         })
     104         2408 :     }
     105              : 
     106            6 :     fn get_port(&mut self) -> u16 {
     107            6 :         1 + self
     108            6 :             .endpoints
     109            6 :             .values()
     110            6 :             .map(|ep| std::cmp::max(ep.pg_address.port(), ep.http_address.port()))
     111            6 :             .max()
     112            6 :             .unwrap_or(self.base_port)
     113            6 :     }
     114              : 
     115              :     #[allow(clippy::too_many_arguments)]
     116          518 :     pub fn new_endpoint(
     117          518 :         &mut self,
     118          518 :         endpoint_id: &str,
     119          518 :         tenant_id: TenantId,
     120          518 :         timeline_id: TimelineId,
     121          518 :         pg_port: Option<u16>,
     122          518 :         http_port: Option<u16>,
     123          518 :         pg_version: u32,
     124          518 :         mode: ComputeMode,
     125          518 :     ) -> Result<Arc<Endpoint>> {
     126          518 :         let pg_port = pg_port.unwrap_or_else(|| self.get_port());
     127          518 :         let http_port = http_port.unwrap_or_else(|| self.get_port() + 1);
     128          518 :         let ep = Arc::new(Endpoint {
     129          518 :             endpoint_id: endpoint_id.to_owned(),
     130          518 :             pg_address: SocketAddr::new("127.0.0.1".parse().unwrap(), pg_port),
     131          518 :             http_address: SocketAddr::new("127.0.0.1".parse().unwrap(), http_port),
     132          518 :             env: self.env.clone(),
     133          518 :             timeline_id,
     134          518 :             mode,
     135          518 :             tenant_id,
     136          518 :             pg_version,
     137          518 :             // We don't set up roles and databases in the spec locally, so we don't
     138          518 :             // need to do catalog updates. Catalog updates also include creating the
     139          518 :             // availability check data. Yet, we have tests that check that the size and
     140          518 :             // db dump are the same before and after start. So we skip catalog updates;
     141          518 :             // this basically tests the case of waking up an idle compute, where we
     142          518 :             // also skip catalog updates in the cloud.
     143          518 :             skip_pg_catalog_updates: true,
     144          518 :             features: vec![],
     145          518 :         });
     146          518 : 
     147          518 :         ep.create_endpoint_dir()?;
     148              :         std::fs::write(
     149          518 :             ep.endpoint_path().join("endpoint.json"),
     150          518 :             serde_json::to_string_pretty(&EndpointConf {
     151          518 :                 endpoint_id: endpoint_id.to_string(),
     152          518 :                 tenant_id,
     153          518 :                 timeline_id,
     154          518 :                 mode,
     155          518 :                 http_port,
     156          518 :                 pg_port,
     157          518 :                 pg_version,
     158          518 :                 skip_pg_catalog_updates: true,
     159          518 :                 features: vec![],
     160          518 :             })?,
     161            0 :         )?;
     162              :         std::fs::write(
     163          518 :             ep.endpoint_path().join("postgresql.conf"),
     164          518 :             ep.setup_pg_conf()?.to_string(),
     165            0 :         )?;
     166              : 
     167          518 :         self.endpoints
     168          518 :             .insert(ep.endpoint_id.clone(), Arc::clone(&ep));
     169          518 : 
     170          518 :         Ok(ep)
     171          518 :     }
     172              : 
     173         1126 :     pub fn check_conflicting_endpoints(
     174         1126 :         &self,
     175         1126 :         mode: ComputeMode,
     176         1126 :         tenant_id: TenantId,
     177         1126 :         timeline_id: TimelineId,
     178         1126 :     ) -> Result<()> {
     179         1126 :         if matches!(mode, ComputeMode::Primary) {
     180              :             // this check is not complete, as you could have a concurrent attempt at
     181              :             // creating another primary, both reading the state before checking it here,
     182              :             // but it's better than nothing.
     183         1513 :             let mut duplicates = self.endpoints.iter().filter(|(_k, v)| {
     184         1513 :                 v.tenant_id == tenant_id
     185         1392 :                     && v.timeline_id == timeline_id
     186          956 :                     && v.mode == mode
     187          952 :                     && v.status() != EndpointStatus::Stopped
     188         1513 :             });
     189              : 
     190         1024 :             if let Some((key, _)) = duplicates.next() {
     191           29 :                 bail!("attempting to create a duplicate primary endpoint on tenant {tenant_id}, timeline {timeline_id}: endpoint {key:?} exists already. please don't do this, it is not supported.");
     192          995 :             }
     193          102 :         }
     194         1097 :         Ok(())
     195         1126 :     }
     196              : }
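As a rough sketch of how this API is typically driven (for example, from a `neon_local`
command handler), the snippet below loads the existing endpoints, checks for a conflicting
primary, and creates a new endpoint. The function name and the hard-coded `pg_version` are
illustrative assumptions, not part of this module:

```rust
// Hypothetical caller: `env`, `tenant_id` and `timeline_id` would come from
// the surrounding command-line handling.
fn create_main_endpoint(
    env: LocalEnv,
    tenant_id: TenantId,
    timeline_id: TimelineId,
) -> Result<()> {
    // Scan .neon/endpoints/ and build the in-memory view of all endpoints.
    let mut cplane = ComputeControlPlane::load(env)?;

    // Refuse to create a second running primary on the same timeline.
    cplane.check_conflicting_endpoints(ComputeMode::Primary, tenant_id, timeline_id)?;

    // Creates the endpoint directory, endpoint.json and postgresql.conf.
    // Ports are picked automatically because we pass `None`.
    let ep = cplane.new_endpoint(
        "main",
        tenant_id,
        timeline_id,
        None, // pg_port
        None, // http_port
        15,   // pg_version (illustrative)
        ComputeMode::Primary,
    )?;
    println!("endpoint created, connection string: {}", ep.connstr());
    Ok(())
}
```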
     197              : 
     198              : ///////////////////////////////////////////////////////////////////////////////
     199              : 
     200            0 : #[derive(Debug)]
     201              : pub struct Endpoint {
     202              :     /// used as the directory name
     203              :     endpoint_id: String,
     204              :     pub tenant_id: TenantId,
     205              :     pub timeline_id: TimelineId,
     206              :     pub mode: ComputeMode,
     207              : 
     208              :     // port and address of the Postgres server and `compute_ctl`'s HTTP API
     209              :     pub pg_address: SocketAddr,
     210              :     pub http_address: SocketAddr,
     211              : 
     212              :     // postgres major version in the format: 14, 15, etc.
     213              :     pg_version: u32,
     214              : 
     215              :     // These are not part of the endpoint as such, but the environment
     216              :     // the endpoint runs in.
     217              :     pub env: LocalEnv,
     218              : 
     219              :     // Optimizations
     220              :     skip_pg_catalog_updates: bool,
     221              : 
     222              :     // Feature flags
     223              :     features: Vec<ComputeFeature>,
     224              : }
     225              : 
     226         1543 : #[derive(PartialEq, Eq)]
     227              : pub enum EndpointStatus {
     228              :     Running,
     229              :     Stopped,
     230              :     Crashed,
     231              :     RunningNoPidfile,
     232              : }
     233              : 
     234              : impl std::fmt::Display for EndpointStatus {
     235            0 :     fn fmt(&self, writer: &mut std::fmt::Formatter) -> std::fmt::Result {
     236            0 :         let s = match self {
     237            0 :             Self::Running => "running",
     238            0 :             Self::Stopped => "stopped",
     239            0 :             Self::Crashed => "crashed",
     240            0 :             Self::RunningNoPidfile => "running, no pidfile",
     241              :         };
     242            0 :         write!(writer, "{}", s)
     243            0 :     }
     244              : }
     245              : 
     246              : impl Endpoint {
     247         3885 :     fn from_dir_entry(entry: std::fs::DirEntry, env: &LocalEnv) -> Result<Endpoint> {
     248         3885 :         if !entry.file_type()?.is_dir() {
     249            0 :             anyhow::bail!(
     250            0 :                 "Endpoint::from_dir_entry failed: '{}' is not a directory",
     251            0 :                 entry.path().display()
     252            0 :             );
     253         3885 :         }
     254         3885 : 
     255         3885 :         // parse data directory name
     256         3885 :         let fname = entry.file_name();
     257         3885 :         let endpoint_id = fname.to_str().unwrap().to_string();
     258              : 
     259              :         // Read the endpoint.json file
     260         3885 :         let conf: EndpointConf =
     261         3885 :             serde_json::from_slice(&std::fs::read(entry.path().join("endpoint.json"))?)?;
     262              : 
     263         3885 :         Ok(Endpoint {
     264         3885 :             pg_address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.pg_port),
     265         3885 :             http_address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.http_port),
     266         3885 :             endpoint_id,
     267         3885 :             env: env.clone(),
     268         3885 :             timeline_id: conf.timeline_id,
     269         3885 :             mode: conf.mode,
     270         3885 :             tenant_id: conf.tenant_id,
     271         3885 :             pg_version: conf.pg_version,
     272         3885 :             skip_pg_catalog_updates: conf.skip_pg_catalog_updates,
     273         3885 :             features: conf.features,
     274         3885 :         })
     275         3885 :     }
     276              : 
     277          518 :     fn create_endpoint_dir(&self) -> Result<()> {
     278          518 :         std::fs::create_dir_all(self.endpoint_path()).with_context(|| {
     279            0 :             format!(
     280            0 :                 "could not create endpoint directory {}",
     281            0 :                 self.endpoint_path().display()
     282            0 :             )
     283          518 :         })
     284          518 :     }
     285              : 
     286              :     // Generate postgresql.conf with default configuration
     287          518 :     fn setup_pg_conf(&self) -> Result<PostgresConf> {
     288          518 :         let mut conf = PostgresConf::new();
     289          518 :         conf.append("max_wal_senders", "10");
     290          518 :         conf.append("wal_log_hints", "off");
     291          518 :         conf.append("max_replication_slots", "10");
     292          518 :         conf.append("hot_standby", "on");
     293          518 :         conf.append("shared_buffers", "1MB");
     294          518 :         conf.append("fsync", "off");
     295          518 :         conf.append("max_connections", "100");
     296          518 :         conf.append("wal_level", "logical");
     297          518 :         // wal_sender_timeout is the maximum time to wait for WAL replication.
     298          518 :         // It also defines how often the walreceiver will send a feedback message to the wal sender.
     299          518 :         conf.append("wal_sender_timeout", "5s");
     300          518 :         conf.append("listen_addresses", &self.pg_address.ip().to_string());
     301          518 :         conf.append("port", &self.pg_address.port().to_string());
     302          518 :         conf.append("wal_keep_size", "0");
     303          518 :         // walproposer panics when the basebackup is invalid; it is pointless to restart in this case.
     304          518 :         conf.append("restart_after_crash", "off");
     305          518 : 
     306          518 :         // Load the 'neon' extension
     307          518 :         conf.append("shared_preload_libraries", "neon");
     308          518 : 
     309          518 :         conf.append_line("");
     310          518 :         // Replication-related configurations, such as WAL sending
     311          518 :         match &self.mode {
     312              :             ComputeMode::Primary => {
     313              :                 // Configure backpressure
     314              :                 // - Replication write lag depends on how fast the walreceiver can process incoming WAL.
     315              :                 //   This lag determines latency of get_page_at_lsn. Speed of applying WAL is about 10MB/sec,
     316              :                 //   so to avoid expiration of the 1 minute timeout, this lag should not be larger than 600MB.
     317              :                 //   Actually the latency should be much smaller (better if < 1 sec). But we assume that recently
     318              :                 //   updated pages are not requested from the pageserver.
     319              :                 // - Replication flush lag depends on speed of persisting data by checkpointer (creation of
     320              :                 //   delta/image layers) and advancing disk_consistent_lsn. Safekeepers are able to
     321              :                 //   remove/archive WAL only beyond disk_consistent_lsn. Too large a lag can cause long
     322              :                 //   recovery time (in case of pageserver crash) and disk space overflow at safekeepers.
     323              :                 // - Replication apply lag depends on speed of uploading changes to S3 by uploader thread.
     324              :                 //   To be able to restore the database in case of a pageserver node crash, safekeepers should not
     325              :                 //   remove WAL beyond this point. Too large a lag can cause space exhaustion in safekeepers
     326              :                 //   (if they are not able to upload WAL to S3).
     327          467 :                 conf.append("max_replication_write_lag", "15MB");
     328          467 :                 conf.append("max_replication_flush_lag", "10GB");
     329          467 : 
     330          467 :                 if !self.env.safekeepers.is_empty() {
     331          467 :                     // Configure Postgres to connect to the safekeepers
     332          467 :                     conf.append("synchronous_standby_names", "walproposer");
     333          467 : 
     334          467 :                     let safekeepers = self
     335          467 :                         .env
     336          467 :                         .safekeepers
     337          467 :                         .iter()
     338          608 :                         .map(|sk| format!("localhost:{}", sk.get_compute_port()))
     339          467 :                         .collect::<Vec<String>>()
     340          467 :                         .join(",");
     341          467 :                     conf.append("neon.safekeepers", &safekeepers);
     342          467 :                 } else {
     343            0 :                     // We only use a setup without safekeepers for tests,
     344            0 :                     // and don't care about data durability on the pageserver,
     345            0 :                     // so set a more relaxed synchronous_commit.
     346            0 :                     conf.append("synchronous_commit", "remote_write");
     347            0 : 
     348            0 :                     // Configure the node to stream WAL directly to the pageserver
     349            0 :                     // This isn't really a supported configuration, but can be useful for
     350            0 :                     // testing.
     351            0 :                     conf.append("synchronous_standby_names", "pageserver");
     352            0 :                 }
     353              :             }
     354           49 :             ComputeMode::Static(lsn) => {
     355           49 :                 conf.append("recovery_target_lsn", &lsn.to_string());
     356           49 :             }
     357              :             ComputeMode::Replica => {
     358            2 :                 assert!(!self.env.safekeepers.is_empty());
     359              : 
     360              :                 // TODO: use future host field from safekeeper spec
     361              :                 // Pass the list of safekeepers to the replica so that it can connect to any of them,
     362              :                 // whichever is available.
     363            2 :                 let sk_ports = self
     364            2 :                     .env
     365            2 :                     .safekeepers
     366            2 :                     .iter()
     367            2 :                     .map(|x| x.get_compute_port().to_string())
     368            2 :                     .collect::<Vec<_>>()
     369            2 :                     .join(",");
     370            2 :                 let sk_hosts = vec!["localhost"; self.env.safekeepers.len()].join(",");
     371            2 : 
     372            2 :                 let connstr = format!(
     373            2 :                     "host={} port={} options='-c timeline_id={} tenant_id={}' application_name=replica replication=true",
     374            2 :                     sk_hosts,
     375            2 :                     sk_ports,
     376            2 :                     &self.timeline_id.to_string(),
     377            2 :                     &self.tenant_id.to_string(),
     378            2 :                 );
     379            2 : 
     380            2 :                 let slot_name = format!("repl_{}_", self.timeline_id);
     381            2 :                 conf.append("primary_conninfo", connstr.as_str());
     382            2 :                 conf.append("primary_slot_name", slot_name.as_str());
     383            2 :                 conf.append("hot_standby", "on");
     384            2 :                 // Prefetching of blocks referenced in WAL doesn't make sense for us:
     385            2 :                 // Neon hot standby ignores pages that are not in shared_buffers.
     386            2 :                 if self.pg_version >= 15 {
     387            0 :                     conf.append("recovery_prefetch", "off");
     388            2 :                 }
     389              :             }
     390              :         }
     391              : 
     392          518 :         Ok(conf)
     393          518 :     }
     394              : 
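Putting the branches above together: for a primary endpoint with a single safekeeper, the
generated `postgresql.conf` contains roughly the following settings (the port and the
safekeeper address are illustrative, and the exact quoting is whatever `PostgresConf`
produces when rendered to a string):

```text
max_wal_senders = 10
wal_log_hints = off
max_replication_slots = 10
hot_standby = on
shared_buffers = 1MB
fsync = off
max_connections = 100
wal_level = logical
wal_sender_timeout = 5s
listen_addresses = 127.0.0.1
port = 55432
wal_keep_size = 0
restart_after_crash = off
shared_preload_libraries = neon

max_replication_write_lag = 15MB
max_replication_flush_lag = 10GB
synchronous_standby_names = walproposer
neon.safekeepers = localhost:5454
```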
     395         9480 :     pub fn endpoint_path(&self) -> PathBuf {
     396         9480 :         self.env.endpoints_path().join(&self.endpoint_id)
     397         9480 :     }
     398              : 
     399         3374 :     pub fn pgdata(&self) -> PathBuf {
     400         3374 :         self.endpoint_path().join("pgdata")
     401         3374 :     }
     402              : 
     403         1543 :     pub fn status(&self) -> EndpointStatus {
     404         1543 :         let timeout = Duration::from_millis(300);
     405         1543 :         let has_pidfile = self.pgdata().join("postmaster.pid").exists();
     406         1543 :         let can_connect = TcpStream::connect_timeout(&self.pg_address, timeout).is_ok();
     407         1543 : 
     408         1543 :         match (has_pidfile, can_connect) {
     409           35 :             (true, true) => EndpointStatus::Running,
     410         1508 :             (false, false) => EndpointStatus::Stopped,
     411            0 :             (true, false) => EndpointStatus::Crashed,
     412            0 :             (false, true) => EndpointStatus::RunningNoPidfile,
     413              :         }
     414         1543 :     }
     415              : 
     416          574 :     fn pg_ctl(&self, args: &[&str], auth_token: &Option<String>) -> Result<()> {
     417          574 :         let pg_ctl_path = self.env.pg_bin_dir(self.pg_version)?.join("pg_ctl");
     418          574 :         let mut cmd = Command::new(&pg_ctl_path);
     419          574 :         cmd.args(
     420          574 :             [
     421          574 :                 &[
     422          574 :                     "-D",
     423          574 :                     self.pgdata().to_str().unwrap(),
     424          574 :                     "-w", //wait till pg_ctl actually does what was asked
     425          574 :                 ],
     426          574 :                 args,
     427          574 :             ]
     428          574 :             .concat(),
     429          574 :         )
     430          574 :         .env_clear()
     431          574 :         .env(
     432          574 :             "LD_LIBRARY_PATH",
     433          574 :             self.env.pg_lib_dir(self.pg_version)?.to_str().unwrap(),
     434          574 :         )
     435          574 :         .env(
     436          574 :             "DYLD_LIBRARY_PATH",
     437          574 :             self.env.pg_lib_dir(self.pg_version)?.to_str().unwrap(),
     438              :         );
     439              : 
     440              :         // Pass authentication token used for the connections to pageserver and safekeepers
     441          574 :         if let Some(token) = auth_token {
     442            0 :             cmd.env("NEON_AUTH_TOKEN", token);
     443          574 :         }
     444              : 
     445          574 :         let pg_ctl = cmd
     446          574 :             .output()
     447          574 :             .context(format!("{} failed", pg_ctl_path.display()))?;
     448          574 :         if !pg_ctl.status.success() {
     449            2 :             anyhow::bail!(
     450            2 :                 "pg_ctl failed, exit code: {}, stdout: {}, stderr: {}",
     451            2 :                 pg_ctl.status,
     452            2 :                 String::from_utf8_lossy(&pg_ctl.stdout),
     453            2 :                 String::from_utf8_lossy(&pg_ctl.stderr),
     454            2 :             );
     455          572 :         }
     456          572 : 
     457          572 :         Ok(())
     458          574 :     }
     459              : 
     460          572 :     fn wait_for_compute_ctl_to_exit(&self, send_sigterm: bool) -> Result<()> {
     461          572 :         // TODO use background_process::stop_process instead: https://github.com/neondatabase/neon/pull/6482
     462          572 :         let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
     463          572 :         let pid: u32 = std::fs::read_to_string(pidfile_path)?.parse()?;
     464          572 :         let pid = nix::unistd::Pid::from_raw(pid as i32);
     465          572 :         if send_sigterm {
     466           26 :             kill(pid, Signal::SIGTERM).ok();
     467          546 :         }
     468          572 :         crate::background_process::wait_until_stopped("compute_ctl", pid)?;
     469          572 :         Ok(())
     470          572 :     }
     471              : 
     472          813 :     fn read_postgresql_conf(&self) -> Result<String> {
     473          813 :         // Slurp the endpoints/<endpoint id>/postgresql.conf file into
     474          813 :         // memory. We will include it in the spec file that we pass to
     475          813 :         // `compute_ctl`, and `compute_ctl` will write it to the postgresql.conf
     476          813 :         // in the data directory.
     477          813 :         let postgresql_conf_path = self.endpoint_path().join("postgresql.conf");
     478          813 :         match std::fs::read(&postgresql_conf_path) {
     479          813 :             Ok(content) => Ok(String::from_utf8(content)?),
     480            0 :             Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok("".to_string()),
     481            0 :             Err(e) => Err(anyhow::Error::new(e).context(format!(
     482            0 :                 "failed to read config file in {}",
     483            0 :                 postgresql_conf_path.to_str().unwrap()
     484            0 :             ))),
     485              :         }
     486          813 :     }
     487              : 
     488          813 :     fn build_pageserver_connstr(pageservers: &[(Host, u16)]) -> String {
     489          813 :         pageservers
     490          813 :             .iter()
     491          912 :             .map(|(host, port)| format!("postgresql://no_user@{host}:{port}"))
     492          813 :             .collect::<Vec<_>>()
     493          813 :             .join(",")
     494          813 :     }
     495              : 
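For example (ports are made up), a tenant attached to two local pageserver shards yields a
comma-separated connection string list like:

```text
postgresql://no_user@localhost:64000,postgresql://no_user@localhost:64001
```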
     496          582 :     pub async fn start(
     497          582 :         &self,
     498          582 :         auth_token: &Option<String>,
     499          582 :         safekeepers: Vec<NodeId>,
     500          582 :         pageservers: Vec<(Host, u16)>,
     501          582 :         remote_ext_config: Option<&String>,
     502          582 :         shard_stripe_size: usize,
     503          582 :     ) -> Result<()> {
     504          582 :         if self.status() == EndpointStatus::Running {
     505            0 :             anyhow::bail!("The endpoint is already running");
     506          582 :         }
     507              : 
     508          582 :         let postgresql_conf = self.read_postgresql_conf()?;
     509              : 
     510              :         // We always start the compute node from scratch, so if the Postgres
     511              :         // data dir exists from a previous launch, remove it first.
     512          582 :         if self.pgdata().exists() {
     513           67 :             std::fs::remove_dir_all(self.pgdata())?;
     514          515 :         }
     515              : 
     516          582 :         let pageserver_connstring = Self::build_pageserver_connstr(&pageservers);
     517          582 :         assert!(!pageserver_connstring.is_empty());
     518              : 
     519          582 :         let mut safekeeper_connstrings = Vec::new();
     520          582 :         if self.mode == ComputeMode::Primary {
     521         1205 :             for sk_id in safekeepers {
     522          674 :                 let sk = self
     523          674 :                     .env
     524          674 :                     .safekeepers
     525          674 :                     .iter()
     526          892 :                     .find(|node| node.id == sk_id)
     527          674 :                     .ok_or_else(|| anyhow!("safekeeper {sk_id} does not exist"))?;
     528          674 :                 safekeeper_connstrings.push(format!("127.0.0.1:{}", sk.get_compute_port()));
     529              :             }
     530           51 :         }
     531              : 
     532              :         // Check for the remote_extensions_spec.json file.
     533              :         // If it is present, read it and pass it to compute_ctl.
     534          582 :         let remote_extensions_spec_path = self.endpoint_path().join("remote_extensions_spec.json");
     535          582 :         let remote_extensions_spec = std::fs::File::open(remote_extensions_spec_path);
     536              :         let remote_extensions: Option<RemoteExtSpec>;
     537              : 
     538          582 :         if let Ok(spec_file) = remote_extensions_spec {
     539            1 :             remote_extensions = serde_json::from_reader(spec_file).ok();
     540          581 :         } else {
     541          581 :             remote_extensions = None;
     542          581 :         };
     543              : 
     544              :         // Create spec file
     545          582 :         let spec = ComputeSpec {
     546          582 :             skip_pg_catalog_updates: self.skip_pg_catalog_updates,
     547          582 :             format_version: 1.0,
     548          582 :             operation_uuid: None,
     549          582 :             features: self.features.clone(),
     550          582 :             cluster: Cluster {
     551          582 :                 cluster_id: None, // project ID: not used
     552          582 :                 name: None,       // project name: not used
     553          582 :                 state: None,
     554          582 :                 roles: vec![],
     555          582 :                 databases: vec![],
     556          582 :                 settings: None,
     557          582 :                 postgresql_conf: Some(postgresql_conf),
     558          582 :             },
     559          582 :             delta_operations: None,
     560          582 :             tenant_id: Some(self.tenant_id),
     561          582 :             timeline_id: Some(self.timeline_id),
     562          582 :             mode: self.mode,
     563          582 :             pageserver_connstring: Some(pageserver_connstring),
     564          582 :             safekeeper_connstrings,
     565          582 :             storage_auth_token: auth_token.clone(),
     566          582 :             remote_extensions,
     567          582 :             pgbouncer_settings: None,
     568          582 :             shard_stripe_size: Some(shard_stripe_size),
     569          582 :         };
     570          582 :         let spec_path = self.endpoint_path().join("spec.json");
     571          582 :         std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;
     572              : 
     573              :         // Open log file. We'll redirect the stdout and stderr of `compute_ctl` to it.
     574          582 :         let logfile = std::fs::OpenOptions::new()
     575          582 :             .create(true)
     576          582 :             .append(true)
     577          582 :             .open(self.endpoint_path().join("compute.log"))?;
     578              : 
     579              :         // Launch compute_ctl
     580          582 :         println!("Starting postgres node at '{}'", self.connstr());
     581          582 :         let mut cmd = Command::new(self.env.neon_distrib_dir.join("compute_ctl"));
     582          582 :         cmd.args(["--http-port", &self.http_address.port().to_string()])
     583          582 :             .args(["--pgdata", self.pgdata().to_str().unwrap()])
     584          582 :             .args(["--connstr", &self.connstr()])
     585          582 :             .args([
     586          582 :                 "--spec-path",
     587          582 :                 self.endpoint_path().join("spec.json").to_str().unwrap(),
     588          582 :             ])
     589          582 :             .args([
     590          582 :                 "--pgbin",
     591          582 :                 self.env
     592          582 :                     .pg_bin_dir(self.pg_version)?
     593          582 :                     .join("postgres")
     594          582 :                     .to_str()
     595          582 :                     .unwrap(),
     596          582 :             ])
     597          582 :             .stdin(std::process::Stdio::null())
     598          582 :             .stderr(logfile.try_clone()?)
     599          582 :             .stdout(logfile);
     600              : 
     601          582 :         if let Some(remote_ext_config) = remote_ext_config {
     602            1 :             cmd.args(["--remote-ext-config", remote_ext_config]);
     603          581 :         }
     604              : 
     605          582 :         let child = cmd.spawn()?;
     606              :         // set up a scopeguard to kill & wait for the child in case we panic or bail below
     607          582 :         let child = scopeguard::guard(child, |mut child| {
     608            8 :             println!("SIGKILL & wait the started process");
     609            8 :             (|| {
     610            8 :                 // TODO: use another signal that can be caught by the child so it can clean up any children it spawned
     611            8 :                 child.kill().context("SIGKILL child")?;
     612            8 :                 child.wait().context("wait() for child process")?;
     613            8 :                 anyhow::Ok(())
     614            8 :             })()
     615            8 :             .with_context(|| format!("scopeguard kill&wait child {child:?}"))
     616            8 :             .unwrap();
     617          582 :         });
     618          582 : 
     619          582 :         // Write down the pid so we can wait for it when we want to stop
     620          582 :         // TODO use background_process::start_process instead: https://github.com/neondatabase/neon/pull/6482
     621          582 :         let pid = child.id();
     622          582 :         let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
     623          582 :         std::fs::write(pidfile_path, pid.to_string())?;
     624              : 
     625              :         // Wait for it to start
     626          582 :         let mut attempt = 0;
     627              :         const ATTEMPT_INTERVAL: Duration = Duration::from_millis(100);
     628              :         const MAX_ATTEMPTS: u32 = 10 * 30; // Wait up to 30 s
     629              :         loop {
     630         3533 :             attempt += 1;
     631         9431 :             match self.get_status().await {
     632         2949 :                 Ok(state) => {
     633         2949 :                     match state.status {
     634              :                         ComputeStatus::Init => {
     635         2367 :                             if attempt == MAX_ATTEMPTS {
     636            0 :                                 bail!("compute startup timed out; still in Init state");
     637         2367 :                             }
     638              :                             // keep retrying
     639              :                         }
     640              :                         ComputeStatus::Running => {
     641              :                             // All good!
     642          574 :                             break;
     643              :                         }
     644              :                         ComputeStatus::Failed => {
     645            8 :                             bail!(
     646            8 :                                 "compute startup failed: {}",
     647            8 :                                 state
     648            8 :                                     .error
     649            8 :                                     .as_deref()
     650            8 :                                     .unwrap_or("<no error from compute_ctl>")
     651            8 :                             );
     652              :                         }
     653              :                         ComputeStatus::Empty
     654              :                         | ComputeStatus::ConfigurationPending
     655              :                         | ComputeStatus::Configuration => {
     656            0 :                             bail!("unexpected compute status: {:?}", state.status)
     657              :                         }
     658              :                     }
     659              :                 }
     660          584 :                 Err(e) => {
     661          584 :                     if attempt == MAX_ATTEMPTS {
     662            0 :                         return Err(e).context("timed out waiting to connect to compute_ctl HTTP");
     663          584 :                     }
     664              :                 }
     665              :             }
     666         2951 :             std::thread::sleep(ATTEMPT_INTERVAL);
     667              :         }
     668              : 
     669              :         // Disarm the scopeguard and let the child outlive this function (and the neon_local invocation).
     670          574 :         drop(scopeguard::ScopeGuard::into_inner(child));
     671          574 : 
     672          574 :         Ok(())
     673          582 :     }
     674              : 
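Concretely, the spawned `compute_ctl` invocation for an endpoint named `main` looks roughly
like the following (paths and ports are illustrative; `--remote-ext-config` is only appended
when a remote extension config was given, and stdout/stderr are redirected to `compute.log`):

```text
<neon distrib dir>/compute_ctl \
    --http-port 55433 \
    --pgdata .neon/endpoints/main/pgdata \
    --connstr postgresql://cloud_admin@127.0.0.1:55432/postgres \
    --spec-path .neon/endpoints/main/spec.json \
    --pgbin <pg bin dir>/postgres
```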
     675              :     // Call the /status HTTP API
     676         3533 :     pub async fn get_status(&self) -> Result<ComputeState> {
     677         3533 :         let client = reqwest::Client::new();
     678              : 
     679         3533 :         let response = client
     680         3533 :             .request(
     681         3533 :                 reqwest::Method::GET,
     682         3533 :                 format!(
     683         3533 :                     "http://{}:{}/status",
     684         3533 :                     self.http_address.ip(),
     685         3533 :                     self.http_address.port()
     686         3533 :                 ),
     687         3533 :             )
     688         3533 :             .send()
     689         9431 :             .await?;
     690              : 
     691              :         // Interpret the response
     692         2949 :         let status = response.status();
     693         2949 :         if !(status.is_client_error() || status.is_server_error()) {
     694         2949 :             Ok(response.json().await?)
     695              :         } else {
     696              :             // reqwest does not export its error construction utility functions, so let's craft the message ourselves
     697            0 :             let url = response.url().to_owned();
     698            0 :             let msg = match response.text().await {
     699            0 :                 Ok(err_body) => format!("Error: {}", err_body),
     700            0 :                 Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
     701              :             };
     702            0 :             Err(anyhow::anyhow!(msg))
     703              :         }
     704         3533 :     }
     705              : 
     706          231 :     pub async fn reconfigure(&self, mut pageservers: Vec<(Host, u16)>) -> Result<()> {
     707          231 :         let mut spec: ComputeSpec = {
     708          231 :             let spec_path = self.endpoint_path().join("spec.json");
     709          231 :             let file = std::fs::File::open(spec_path)?;
     710          231 :             serde_json::from_reader(file)?
     711              :         };
     712              : 
     713          231 :         let postgresql_conf = self.read_postgresql_conf()?;
     714          231 :         spec.cluster.postgresql_conf = Some(postgresql_conf);
     715          231 : 
     716          231 :         // If we weren't given explicit pageservers, query the attachment service
     717          231 :         if pageservers.is_empty() {
     718            0 :             let attachment_service = AttachmentService::from_env(&self.env);
     719            0 :             let locate_result = attachment_service.tenant_locate(self.tenant_id).await?;
     720            0 :             pageservers = locate_result
     721            0 :                 .shards
     722            0 :                 .into_iter()
     723            0 :                 .map(|shard| {
     724            0 :                     (
     725            0 :                         Host::parse(&shard.listen_pg_addr)
     726            0 :                             .expect("Attachment service reported bad hostname"),
     727            0 :                         shard.listen_pg_port,
     728            0 :                     )
     729            0 :                 })
     730            0 :                 .collect::<Vec<_>>();
     731          231 :         }
     732              : 
     733          231 :         let pageserver_connstr = Self::build_pageserver_connstr(&pageservers);
     734          231 :         assert!(!pageserver_connstr.is_empty());
     735          231 :         spec.pageserver_connstring = Some(pageserver_connstr);
     736          231 : 
     737          231 :         let client = reqwest::Client::new();
     738          231 :         let response = client
     739          231 :             .post(format!(
     740          231 :                 "http://{}:{}/configure",
     741          231 :                 self.http_address.ip(),
     742          231 :                 self.http_address.port()
     743          231 :             ))
     744          231 :             .body(format!(
     745          231 :                 "{{\"spec\":{}}}",
     746          231 :                 serde_json::to_string_pretty(&spec)?
     747              :             ))
     748          231 :             .send()
     749          693 :             .await?;
     750              : 
     751          231 :         let status = response.status();
     752          231 :         if !(status.is_client_error() || status.is_server_error()) {
     753          231 :             Ok(())
     754              :         } else {
     755            0 :             let url = response.url().to_owned();
     756            0 :             let msg = match response.text().await {
     757            0 :                 Ok(err_body) => format!("Error: {}", err_body),
     758            0 :                 Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
     759              :             };
     760            0 :             Err(anyhow::anyhow!(msg))
     761              :         }
     762          231 :     }
     763              : 
     764          574 :     pub fn stop(&self, mode: &str, destroy: bool) -> Result<()> {
     765          574 :         self.pg_ctl(&["-m", mode, "stop"], &None)?;
     766              : 
     767              :         // Also wait for the compute_ctl process to die. It might have some
     768              :         // cleanup work to do after postgres stops, like syncing safekeepers,
     769              :         // etc.
     770              :         //
     771              :         // If destroying, send it SIGTERM before waiting. Sometimes we do *not*
     772              :         // want this cleanup: tests intentionally do stop when majority of
     773              :         // safekeepers is down, so sync-safekeepers would hang otherwise. This
     774              :         // want this cleanup: tests intentionally stop endpoints while a majority of
     775              :         // safekeepers is down, in which case sync-safekeepers would hang. This
     776          572 :         if destroy {
     777           26 :             println!(
     778           26 :                 "Destroying postgres data directory '{}'",
     779           26 :                 self.pgdata().to_str().unwrap()
     780           26 :             );
     781           26 :             std::fs::remove_dir_all(self.endpoint_path())?;
     782          546 :         }
     783          572 :         Ok(())
     784          574 :     }
     785              : 
     786         1164 :     pub fn connstr(&self) -> String {
     787         1164 :         format!(
     788         1164 :             "postgresql://{}@{}:{}/{}",
     789         1164 :             "cloud_admin",
     790         1164 :             self.pg_address.ip(),
     791         1164 :             self.pg_address.port(),
     792         1164 :             "postgres"
     793         1164 :         )
     794         1164 :     }
     795              : }
        

Generated by: LCOV version 2.1-beta