LCOV - code coverage report
Current view: top level - control_plane/src - endpoint.rs (source / functions) Coverage Total Hit
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info Lines: 86.8 % 547 475
Test Date: 2024-02-07 07:37:29 Functions: 71.7 % 53 38

            Line data    Source code
       1              : //! Code to manage compute endpoints
       2              : //!
       3              : //! In the local test environment, the data for each endpoint is stored in
       4              : //!
       5              : //! ```text
       6              : //!   .neon/endpoints/<endpoint id>
       7              : //! ```
       8              : //!
       9              : //! Some basic information about the endpoint, like the tenant and timeline IDs,
      10              : //! are stored in the `endpoint.json` file. The `endpoint.json` file is created
      11              : //! when the endpoint is created, and doesn't change afterwards.
      12              : //!
      13              : //! The endpoint is managed by the `compute_ctl` binary. When an endpoint is
      14              : //! started, we launch `compute_ctl` It synchronizes the safekeepers, downloads
      15              : //! the basebackup from the pageserver to initialize the the data directory, and
      16              : //! finally launches the PostgreSQL process. It watches the PostgreSQL process
      17              : //! until it exits.
      18              : //!
      19              : //! When an endpoint is created, a `postgresql.conf` file is also created in
      20              : //! the endpoint's directory. The file can be modified before starting PostgreSQL.
      21              : //! However, the `postgresql.conf` file in the endpoint directory is not used directly
      22              : //! by PostgreSQL. It is passed to `compute_ctl`, and `compute_ctl` writes another
      23              : //! copy of it in the data directory.
      24              : //!
      25              : //! Directory contents:
      26              : //!
      27              : //! ```text
      28              : //! .neon/endpoints/main/
      29              : //!     compute.log               - log output of `compute_ctl` and `postgres`
      30              : //!     endpoint.json             - serialized `EndpointConf` struct
      31              : //!     postgresql.conf           - postgresql settings
      32              : //!     spec.json                 - passed to `compute_ctl`
      33              : //!     pgdata/
      34              : //!         postgresql.conf       - copy of postgresql.conf created by `compute_ctl`
      35              : //!         zenith.signal
      36              : //!         <other PostgreSQL files>
      37              : //! ```
      38              : //!
      39              : use std::collections::BTreeMap;
      40              : use std::net::SocketAddr;
      41              : use std::net::TcpStream;
      42              : use std::path::PathBuf;
      43              : use std::process::Command;
      44              : use std::sync::Arc;
      45              : use std::time::Duration;
      46              : 
      47              : use anyhow::{anyhow, bail, Context, Result};
      48              : use compute_api::spec::RemoteExtSpec;
      49              : use nix::sys::signal::kill;
      50              : use nix::sys::signal::Signal;
      51              : use serde::{Deserialize, Serialize};
      52              : use url::Host;
      53              : use utils::id::{NodeId, TenantId, TimelineId};
      54              : 
      55              : use crate::attachment_service::AttachmentService;
      56              : use crate::local_env::LocalEnv;
      57              : use crate::postgresql_conf::PostgresConf;
      58              : 
      59              : use compute_api::responses::{ComputeState, ComputeStatus};
      60              : use compute_api::spec::{Cluster, ComputeFeature, ComputeMode, ComputeSpec};
      61              : 
      62              : // contents of a endpoint.json file
      63        74024 : #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
      64              : pub struct EndpointConf {
      65              :     endpoint_id: String,
      66              :     tenant_id: TenantId,
      67              :     timeline_id: TimelineId,
      68              :     mode: ComputeMode,
      69              :     pg_port: u16,
      70              :     http_port: u16,
      71              :     pg_version: u32,
      72              :     skip_pg_catalog_updates: bool,
      73              :     features: Vec<ComputeFeature>,
      74              : }
      75              : 
      76              : //
      77              : // ComputeControlPlane
      78              : //
      79              : pub struct ComputeControlPlane {
      80              :     base_port: u16,
      81              : 
      82              :     // endpoint ID is the key
      83              :     pub endpoints: BTreeMap<String, Arc<Endpoint>>,
      84              : 
      85              :     env: LocalEnv,
      86              : }
      87              : 
      88              : impl ComputeControlPlane {
      89              :     // Load current endpoints from the endpoints/ subdirectories
      90         2400 :     pub fn load(env: LocalEnv) -> Result<ComputeControlPlane> {
      91         2400 :         let mut endpoints = BTreeMap::default();
      92         3896 :         for endpoint_dir in std::fs::read_dir(env.endpoints_path())
      93         2400 :             .with_context(|| format!("failed to list {}", env.endpoints_path().display()))?
      94         3896 :         {
      95         3896 :             let ep = Endpoint::from_dir_entry(endpoint_dir?, &env)?;
      96         3896 :             endpoints.insert(ep.endpoint_id.clone(), Arc::new(ep));
      97              :         }
      98              : 
      99         2400 :         Ok(ComputeControlPlane {
     100         2400 :             base_port: 55431,
     101         2400 :             endpoints,
     102         2400 :             env,
     103         2400 :         })
     104         2400 :     }
     105              : 
     106            8 :     fn get_port(&mut self) -> u16 {
     107            8 :         1 + self
     108            8 :             .endpoints
     109            8 :             .values()
     110            8 :             .map(|ep| std::cmp::max(ep.pg_address.port(), ep.http_address.port()))
     111            8 :             .max()
     112            8 :             .unwrap_or(self.base_port)
     113            8 :     }
     114              : 
     115              :     #[allow(clippy::too_many_arguments)]
     116          522 :     pub fn new_endpoint(
     117          522 :         &mut self,
     118          522 :         endpoint_id: &str,
     119          522 :         tenant_id: TenantId,
     120          522 :         timeline_id: TimelineId,
     121          522 :         pg_port: Option<u16>,
     122          522 :         http_port: Option<u16>,
     123          522 :         pg_version: u32,
     124          522 :         mode: ComputeMode,
     125          522 :     ) -> Result<Arc<Endpoint>> {
     126          522 :         let pg_port = pg_port.unwrap_or_else(|| self.get_port());
     127          522 :         let http_port = http_port.unwrap_or_else(|| self.get_port() + 1);
     128          522 :         let ep = Arc::new(Endpoint {
     129          522 :             endpoint_id: endpoint_id.to_owned(),
     130          522 :             pg_address: SocketAddr::new("127.0.0.1".parse().unwrap(), pg_port),
     131          522 :             http_address: SocketAddr::new("127.0.0.1".parse().unwrap(), http_port),
     132          522 :             env: self.env.clone(),
     133          522 :             timeline_id,
     134          522 :             mode,
     135          522 :             tenant_id,
     136          522 :             pg_version,
     137          522 :             // We don't setup roles and databases in the spec locally, so we don't need to
     138          522 :             // do catalog updates. Catalog updates also include check availability
     139          522 :             // data creation. Yet, we have tests that check that size and db dump
     140          522 :             // before and after start are the same. So, skip catalog updates,
     141          522 :             // with this we basically test a case of waking up an idle compute, where
     142          522 :             // we also skip catalog updates in the cloud.
     143          522 :             skip_pg_catalog_updates: true,
     144          522 :             features: vec![],
     145          522 :         });
     146          522 : 
     147          522 :         ep.create_endpoint_dir()?;
     148              :         std::fs::write(
     149          522 :             ep.endpoint_path().join("endpoint.json"),
     150          522 :             serde_json::to_string_pretty(&EndpointConf {
     151          522 :                 endpoint_id: endpoint_id.to_string(),
     152          522 :                 tenant_id,
     153          522 :                 timeline_id,
     154          522 :                 mode,
     155          522 :                 http_port,
     156          522 :                 pg_port,
     157          522 :                 pg_version,
     158          522 :                 skip_pg_catalog_updates: true,
     159          522 :                 features: vec![],
     160          522 :             })?,
     161            0 :         )?;
     162              :         std::fs::write(
     163          522 :             ep.endpoint_path().join("postgresql.conf"),
     164          522 :             ep.setup_pg_conf()?.to_string(),
     165            0 :         )?;
     166              : 
     167          522 :         self.endpoints
     168          522 :             .insert(ep.endpoint_id.clone(), Arc::clone(&ep));
     169          522 : 
     170          522 :         Ok(ep)
     171          522 :     }
     172              : 
     173         1131 :     pub fn check_conflicting_endpoints(
     174         1131 :         &self,
     175         1131 :         mode: ComputeMode,
     176         1131 :         tenant_id: TenantId,
     177         1131 :         timeline_id: TimelineId,
     178         1131 :     ) -> Result<()> {
     179         1131 :         if matches!(mode, ComputeMode::Primary) {
     180              :             // this check is not complete, as you could have a concurrent attempt at
     181              :             // creating another primary, both reading the state before checking it here,
     182              :             // but it's better than nothing.
     183         1514 :             let mut duplicates = self.endpoints.iter().filter(|(_k, v)| {
     184         1514 :                 v.tenant_id == tenant_id
     185         1393 :                     && v.timeline_id == timeline_id
     186          957 :                     && v.mode == mode
     187          953 :                     && v.status() != EndpointStatus::Stopped
     188         1514 :             });
     189              : 
     190         1023 :             if let Some((key, _)) = duplicates.next() {
     191           29 :                 bail!("attempting to create a duplicate primary endpoint on tenant {tenant_id}, timeline {timeline_id}: endpoint {key:?} exists already. please don't do this, it is not supported.");
     192          994 :             }
     193          108 :         }
     194         1102 :         Ok(())
     195         1131 :     }
     196              : }
     197              : 
     198              : ///////////////////////////////////////////////////////////////////////////////
     199              : 
     200            0 : #[derive(Debug)]
     201              : pub struct Endpoint {
     202              :     /// used as the directory name
     203              :     endpoint_id: String,
     204              :     pub tenant_id: TenantId,
     205              :     pub timeline_id: TimelineId,
     206              :     pub mode: ComputeMode,
     207              : 
     208              :     // port and address of the Postgres server and `compute_ctl`'s HTTP API
     209              :     pub pg_address: SocketAddr,
     210              :     pub http_address: SocketAddr,
     211              : 
     212              :     // postgres major version in the format: 14, 15, etc.
     213              :     pg_version: u32,
     214              : 
     215              :     // These are not part of the endpoint as such, but the environment
     216              :     // the endpoint runs in.
     217              :     pub env: LocalEnv,
     218              : 
     219              :     // Optimizations
     220              :     skip_pg_catalog_updates: bool,
     221              : 
     222              :     // Feature flags
     223              :     features: Vec<ComputeFeature>,
     224              : }
     225              : 
     226         1540 : #[derive(PartialEq, Eq)]
     227              : pub enum EndpointStatus {
     228              :     Running,
     229              :     Stopped,
     230              :     Crashed,
     231              :     RunningNoPidfile,
     232              : }
     233              : 
     234              : impl std::fmt::Display for EndpointStatus {
     235            0 :     fn fmt(&self, writer: &mut std::fmt::Formatter) -> std::fmt::Result {
     236            0 :         let s = match self {
     237            0 :             Self::Running => "running",
     238            0 :             Self::Stopped => "stopped",
     239            0 :             Self::Crashed => "crashed",
     240            0 :             Self::RunningNoPidfile => "running, no pidfile",
     241              :         };
     242            0 :         write!(writer, "{}", s)
     243            0 :     }
     244              : }
     245              : 
     246              : impl Endpoint {
     247         3896 :     fn from_dir_entry(entry: std::fs::DirEntry, env: &LocalEnv) -> Result<Endpoint> {
     248         3896 :         if !entry.file_type()?.is_dir() {
     249            0 :             anyhow::bail!(
     250            0 :                 "Endpoint::from_dir_entry failed: '{}' is not a directory",
     251            0 :                 entry.path().display()
     252            0 :             );
     253         3896 :         }
     254         3896 : 
     255         3896 :         // parse data directory name
     256         3896 :         let fname = entry.file_name();
     257         3896 :         let endpoint_id = fname.to_str().unwrap().to_string();
     258              : 
     259              :         // Read the endpoint.json file
     260         3896 :         let conf: EndpointConf =
     261         3896 :             serde_json::from_slice(&std::fs::read(entry.path().join("endpoint.json"))?)?;
     262              : 
     263         3896 :         Ok(Endpoint {
     264         3896 :             pg_address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.pg_port),
     265         3896 :             http_address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.http_port),
     266         3896 :             endpoint_id,
     267         3896 :             env: env.clone(),
     268         3896 :             timeline_id: conf.timeline_id,
     269         3896 :             mode: conf.mode,
     270         3896 :             tenant_id: conf.tenant_id,
     271         3896 :             pg_version: conf.pg_version,
     272         3896 :             skip_pg_catalog_updates: conf.skip_pg_catalog_updates,
     273         3896 :             features: conf.features,
     274         3896 :         })
     275         3896 :     }
     276              : 
     277          522 :     fn create_endpoint_dir(&self) -> Result<()> {
     278          522 :         std::fs::create_dir_all(self.endpoint_path()).with_context(|| {
     279            0 :             format!(
     280            0 :                 "could not create endpoint directory {}",
     281            0 :                 self.endpoint_path().display()
     282            0 :             )
     283          522 :         })
     284          522 :     }
     285              : 
     286              :     // Generate postgresql.conf with default configuration
     287          522 :     fn setup_pg_conf(&self) -> Result<PostgresConf> {
     288          522 :         let mut conf = PostgresConf::new();
     289          522 :         conf.append("max_wal_senders", "10");
     290          522 :         conf.append("wal_log_hints", "off");
     291          522 :         conf.append("max_replication_slots", "10");
     292          522 :         conf.append("hot_standby", "on");
     293          522 :         conf.append("hot_standby_feedback", "on");
     294          522 :         conf.append("shared_buffers", "1MB");
     295          522 :         conf.append("fsync", "off");
     296          522 :         conf.append("max_connections", "100");
     297          522 :         conf.append("wal_level", "logical");
     298          522 :         // wal_sender_timeout is the maximum time to wait for WAL replication.
     299          522 :         // It also defines how often the walreciever will send a feedback message to the wal sender.
     300          522 :         conf.append("wal_sender_timeout", "5s");
     301          522 :         conf.append("listen_addresses", &self.pg_address.ip().to_string());
     302          522 :         conf.append("port", &self.pg_address.port().to_string());
     303          522 :         conf.append("wal_keep_size", "0");
     304          522 :         // walproposer panics when basebackup is invalid, it is pointless to restart in this case.
     305          522 :         conf.append("restart_after_crash", "off");
     306          522 : 
     307          522 :         // Load the 'neon' extension
     308          522 :         conf.append("shared_preload_libraries", "neon");
     309          522 : 
     310          522 :         conf.append_line("");
     311          522 :         // Replication-related configurations, such as WAL sending
     312          522 :         match &self.mode {
     313              :             ComputeMode::Primary => {
     314              :                 // Configure backpressure
     315              :                 // - Replication write lag depends on how fast the walreceiver can process incoming WAL.
     316              :                 //   This lag determines latency of get_page_at_lsn. Speed of applying WAL is about 10MB/sec,
     317              :                 //   so to avoid expiration of 1 minute timeout, this lag should not be larger than 600MB.
     318              :                 //   Actually latency should be much smaller (better if < 1sec). But we assume that recently
     319              :                 //   updates pages are not requested from pageserver.
     320              :                 // - Replication flush lag depends on speed of persisting data by checkpointer (creation of
     321              :                 //   delta/image layers) and advancing disk_consistent_lsn. Safekeepers are able to
     322              :                 //   remove/archive WAL only beyond disk_consistent_lsn. Too large a lag can cause long
     323              :                 //   recovery time (in case of pageserver crash) and disk space overflow at safekeepers.
     324              :                 // - Replication apply lag depends on speed of uploading changes to S3 by uploader thread.
     325              :                 //   To be able to restore database in case of pageserver node crash, safekeeper should not
     326              :                 //   remove WAL beyond this point. Too large lag can cause space exhaustion in safekeepers
     327              :                 //   (if they are not able to upload WAL to S3).
     328          468 :                 conf.append("max_replication_write_lag", "15MB");
     329          468 :                 conf.append("max_replication_flush_lag", "10GB");
     330          468 : 
     331          468 :                 if !self.env.safekeepers.is_empty() {
     332          468 :                     // Configure Postgres to connect to the safekeepers
     333          468 :                     conf.append("synchronous_standby_names", "walproposer");
     334          468 : 
     335          468 :                     let safekeepers = self
     336          468 :                         .env
     337          468 :                         .safekeepers
     338          468 :                         .iter()
     339          609 :                         .map(|sk| format!("localhost:{}", sk.get_compute_port()))
     340          468 :                         .collect::<Vec<String>>()
     341          468 :                         .join(",");
     342          468 :                     conf.append("neon.safekeepers", &safekeepers);
     343          468 :                 } else {
     344            0 :                     // We only use setup without safekeepers for tests,
     345            0 :                     // and don't care about data durability on pageserver,
     346            0 :                     // so set more relaxed synchronous_commit.
     347            0 :                     conf.append("synchronous_commit", "remote_write");
     348            0 : 
     349            0 :                     // Configure the node to stream WAL directly to the pageserver
     350            0 :                     // This isn't really a supported configuration, but can be useful for
     351            0 :                     // testing.
     352            0 :                     conf.append("synchronous_standby_names", "pageserver");
     353            0 :                 }
     354              :             }
     355           49 :             ComputeMode::Static(lsn) => {
     356           49 :                 conf.append("recovery_target_lsn", &lsn.to_string());
     357           49 :             }
     358              :             ComputeMode::Replica => {
     359            5 :                 assert!(!self.env.safekeepers.is_empty());
     360              : 
     361              :                 // TODO: use future host field from safekeeper spec
     362              :                 // Pass the list of safekeepers to the replica so that it can connect to any of them,
     363              :                 // whichever is available.
     364            5 :                 let sk_ports = self
     365            5 :                     .env
     366            5 :                     .safekeepers
     367            5 :                     .iter()
     368            5 :                     .map(|x| x.get_compute_port().to_string())
     369            5 :                     .collect::<Vec<_>>()
     370            5 :                     .join(",");
     371            5 :                 let sk_hosts = vec!["localhost"; self.env.safekeepers.len()].join(",");
     372            5 : 
     373            5 :                 let connstr = format!(
     374            5 :                     "host={} port={} options='-c timeline_id={} tenant_id={}' application_name=replica replication=true",
     375            5 :                     sk_hosts,
     376            5 :                     sk_ports,
     377            5 :                     &self.timeline_id.to_string(),
     378            5 :                     &self.tenant_id.to_string(),
     379            5 :                 );
     380            5 : 
     381            5 :                 let slot_name = format!("repl_{}_", self.timeline_id);
     382            5 :                 conf.append("primary_conninfo", connstr.as_str());
     383            5 :                 conf.append("primary_slot_name", slot_name.as_str());
     384            5 :                 conf.append("hot_standby", "on");
     385            5 :                 // prefetching of blocks referenced in WAL doesn't make sense for us
     386            5 :                 // Neon hot standby ignores pages that are not in the shared_buffers
     387            5 :                 if self.pg_version >= 15 {
     388            0 :                     conf.append("recovery_prefetch", "off");
     389            5 :                 }
     390              :             }
     391              :         }
     392              : 
     393          522 :         Ok(conf)
     394          522 :     }
     395              : 
     396         9490 :     pub fn endpoint_path(&self) -> PathBuf {
     397         9490 :         self.env.endpoints_path().join(&self.endpoint_id)
     398         9490 :     }
     399              : 
     400         3376 :     pub fn pgdata(&self) -> PathBuf {
     401         3376 :         self.endpoint_path().join("pgdata")
     402         3376 :     }
     403              : 
     404         1540 :     pub fn status(&self) -> EndpointStatus {
     405         1540 :         let timeout = Duration::from_millis(300);
     406         1540 :         let has_pidfile = self.pgdata().join("postmaster.pid").exists();
     407         1540 :         let can_connect = TcpStream::connect_timeout(&self.pg_address, timeout).is_ok();
     408         1540 : 
     409         1540 :         match (has_pidfile, can_connect) {
     410           29 :             (true, true) => EndpointStatus::Running,
     411         1511 :             (false, false) => EndpointStatus::Stopped,
     412            0 :             (true, false) => EndpointStatus::Crashed,
     413            0 :             (false, true) => EndpointStatus::RunningNoPidfile,
     414              :         }
     415         1540 :     }
     416              : 
     417          576 :     fn pg_ctl(&self, args: &[&str], auth_token: &Option<String>) -> Result<()> {
     418          576 :         let pg_ctl_path = self.env.pg_bin_dir(self.pg_version)?.join("pg_ctl");
     419          576 :         let mut cmd = Command::new(&pg_ctl_path);
     420          576 :         cmd.args(
     421          576 :             [
     422          576 :                 &[
     423          576 :                     "-D",
     424          576 :                     self.pgdata().to_str().unwrap(),
     425          576 :                     "-w", //wait till pg_ctl actually does what was asked
     426          576 :                 ],
     427          576 :                 args,
     428          576 :             ]
     429          576 :             .concat(),
     430          576 :         )
     431          576 :         .env_clear()
     432          576 :         .env(
     433          576 :             "LD_LIBRARY_PATH",
     434          576 :             self.env.pg_lib_dir(self.pg_version)?.to_str().unwrap(),
     435          576 :         )
     436          576 :         .env(
     437          576 :             "DYLD_LIBRARY_PATH",
     438          576 :             self.env.pg_lib_dir(self.pg_version)?.to_str().unwrap(),
     439              :         );
     440              : 
     441              :         // Pass authentication token used for the connections to pageserver and safekeepers
     442          576 :         if let Some(token) = auth_token {
     443            0 :             cmd.env("NEON_AUTH_TOKEN", token);
     444          576 :         }
     445              : 
     446          576 :         let pg_ctl = cmd
     447          576 :             .output()
     448          576 :             .context(format!("{} failed", pg_ctl_path.display()))?;
     449          576 :         if !pg_ctl.status.success() {
     450            0 :             anyhow::bail!(
     451            0 :                 "pg_ctl failed, exit code: {}, stdout: {}, stderr: {}",
     452            0 :                 pg_ctl.status,
     453            0 :                 String::from_utf8_lossy(&pg_ctl.stdout),
     454            0 :                 String::from_utf8_lossy(&pg_ctl.stderr),
     455            0 :             );
     456          576 :         }
     457          576 : 
     458          576 :         Ok(())
     459          576 :     }
     460              : 
     461          576 :     fn wait_for_compute_ctl_to_exit(&self, send_sigterm: bool) -> Result<()> {
     462          576 :         // TODO use background_process::stop_process instead: https://github.com/neondatabase/neon/pull/6482
     463          576 :         let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
     464          576 :         let pid: u32 = std::fs::read_to_string(pidfile_path)?.parse()?;
     465          576 :         let pid = nix::unistd::Pid::from_raw(pid as i32);
     466          576 :         if send_sigterm {
     467           26 :             kill(pid, Signal::SIGTERM).ok();
     468          550 :         }
     469          576 :         crate::background_process::wait_until_stopped("compute_ctl", pid)?;
     470          576 :         Ok(())
     471          576 :     }
     472              : 
     473          805 :     fn read_postgresql_conf(&self) -> Result<String> {
     474          805 :         // Slurp the endpoints/<endpoint id>/postgresql.conf file into
     475          805 :         // memory. We will include it in the spec file that we pass to
     476          805 :         // `compute_ctl`, and `compute_ctl` will write it to the postgresql.conf
     477          805 :         // in the data directory.
     478          805 :         let postgresql_conf_path = self.endpoint_path().join("postgresql.conf");
     479          805 :         match std::fs::read(&postgresql_conf_path) {
     480          805 :             Ok(content) => Ok(String::from_utf8(content)?),
     481            0 :             Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok("".to_string()),
     482            0 :             Err(e) => Err(anyhow::Error::new(e).context(format!(
     483            0 :                 "failed to read config file in {}",
     484            0 :                 postgresql_conf_path.to_str().unwrap()
     485            0 :             ))),
     486              :         }
     487          805 :     }
     488              : 
     489          805 :     fn build_pageserver_connstr(pageservers: &[(Host, u16)]) -> String {
     490          805 :         pageservers
     491          805 :             .iter()
     492          823 :             .map(|(host, port)| format!("postgresql://no_user@{host}:{port}"))
     493          805 :             .collect::<Vec<_>>()
     494          805 :             .join(",")
     495          805 :     }
     496              : 
     497          584 :     pub async fn start(
     498          584 :         &self,
     499          584 :         auth_token: &Option<String>,
     500          584 :         safekeepers: Vec<NodeId>,
     501          584 :         pageservers: Vec<(Host, u16)>,
     502          584 :         remote_ext_config: Option<&String>,
     503          584 :         shard_stripe_size: usize,
     504          584 :     ) -> Result<()> {
     505          584 :         if self.status() == EndpointStatus::Running {
     506            0 :             anyhow::bail!("The endpoint is already running");
     507          584 :         }
     508              : 
     509          584 :         let postgresql_conf = self.read_postgresql_conf()?;
     510              : 
     511              :         // We always start the compute node from scratch, so if the Postgres
     512              :         // data dir exists from a previous launch, remove it first.
     513          584 :         if self.pgdata().exists() {
     514           66 :             std::fs::remove_dir_all(self.pgdata())?;
     515          518 :         }
     516              : 
     517          584 :         let pageserver_connstring = Self::build_pageserver_connstr(&pageservers);
     518          584 :         assert!(!pageserver_connstring.is_empty());
     519              : 
     520          584 :         let mut safekeeper_connstrings = Vec::new();
     521          584 :         if self.mode == ComputeMode::Primary {
     522         1203 :             for sk_id in safekeepers {
     523          673 :                 let sk = self
     524          673 :                     .env
     525          673 :                     .safekeepers
     526          673 :                     .iter()
     527          891 :                     .find(|node| node.id == sk_id)
     528          673 :                     .ok_or_else(|| anyhow!("safekeeper {sk_id} does not exist"))?;
     529          673 :                 safekeeper_connstrings.push(format!("127.0.0.1:{}", sk.get_compute_port()));
     530              :             }
     531           54 :         }
     532              : 
     533              :         // check for file remote_extensions_spec.json
     534              :         // if it is present, read it and pass to compute_ctl
     535          584 :         let remote_extensions_spec_path = self.endpoint_path().join("remote_extensions_spec.json");
     536          584 :         let remote_extensions_spec = std::fs::File::open(remote_extensions_spec_path);
     537              :         let remote_extensions: Option<RemoteExtSpec>;
     538              : 
     539          584 :         if let Ok(spec_file) = remote_extensions_spec {
     540            1 :             remote_extensions = serde_json::from_reader(spec_file).ok();
     541          583 :         } else {
     542          583 :             remote_extensions = None;
     543          583 :         };
     544              : 
     545              :         // Create spec file
     546          584 :         let spec = ComputeSpec {
     547          584 :             skip_pg_catalog_updates: self.skip_pg_catalog_updates,
     548          584 :             format_version: 1.0,
     549          584 :             operation_uuid: None,
     550          584 :             features: self.features.clone(),
     551          584 :             cluster: Cluster {
     552          584 :                 cluster_id: None, // project ID: not used
     553          584 :                 name: None,       // project name: not used
     554          584 :                 state: None,
     555          584 :                 roles: vec![],
     556          584 :                 databases: vec![],
     557          584 :                 settings: None,
     558          584 :                 postgresql_conf: Some(postgresql_conf),
     559          584 :             },
     560          584 :             delta_operations: None,
     561          584 :             tenant_id: Some(self.tenant_id),
     562          584 :             timeline_id: Some(self.timeline_id),
     563          584 :             mode: self.mode,
     564          584 :             pageserver_connstring: Some(pageserver_connstring),
     565          584 :             safekeeper_connstrings,
     566          584 :             storage_auth_token: auth_token.clone(),
     567          584 :             remote_extensions,
     568          584 :             pgbouncer_settings: None,
     569          584 :             shard_stripe_size: Some(shard_stripe_size),
     570          584 :         };
     571          584 :         let spec_path = self.endpoint_path().join("spec.json");
     572          584 :         std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;
     573              : 
     574              :         // Open log file. We'll redirect the stdout and stderr of `compute_ctl` to it.
     575          584 :         let logfile = std::fs::OpenOptions::new()
     576          584 :             .create(true)
     577          584 :             .append(true)
     578          584 :             .open(self.endpoint_path().join("compute.log"))?;
     579              : 
     580              :         // Launch compute_ctl
     581          584 :         println!("Starting postgres node at '{}'", self.connstr());
     582          584 :         let mut cmd = Command::new(self.env.neon_distrib_dir.join("compute_ctl"));
     583          584 :         cmd.args(["--http-port", &self.http_address.port().to_string()])
     584          584 :             .args(["--pgdata", self.pgdata().to_str().unwrap()])
     585          584 :             .args(["--connstr", &self.connstr()])
     586          584 :             .args([
     587          584 :                 "--spec-path",
     588          584 :                 self.endpoint_path().join("spec.json").to_str().unwrap(),
     589          584 :             ])
     590          584 :             .args([
     591          584 :                 "--pgbin",
     592          584 :                 self.env
     593          584 :                     .pg_bin_dir(self.pg_version)?
     594          584 :                     .join("postgres")
     595          584 :                     .to_str()
     596          584 :                     .unwrap(),
     597          584 :             ])
     598          584 :             .stdin(std::process::Stdio::null())
     599          584 :             .stderr(logfile.try_clone()?)
     600          584 :             .stdout(logfile);
     601              : 
     602          584 :         if let Some(remote_ext_config) = remote_ext_config {
     603            1 :             cmd.args(["--remote-ext-config", remote_ext_config]);
     604          583 :         }
     605              : 
     606          584 :         let child = cmd.spawn()?;
     607              :         // set up a scopeguard to kill & wait for the child in case we panic or bail below
     608          584 :         let child = scopeguard::guard(child, |mut child| {
     609            8 :             println!("SIGKILL & wait the started process");
     610            8 :             (|| {
     611            8 :                 // TODO: use another signal that can be caught by the child so it can clean up any children it spawned
     612            8 :                 child.kill().context("SIGKILL child")?;
     613            8 :                 child.wait().context("wait() for child process")?;
     614            8 :                 anyhow::Ok(())
     615            8 :             })()
     616            8 :             .with_context(|| format!("scopeguard kill&wait child {child:?}"))
     617            8 :             .unwrap();
     618          584 :         });
     619          584 : 
     620          584 :         // Write down the pid so we can wait for it when we want to stop
     621          584 :         // TODO use background_process::start_process instead: https://github.com/neondatabase/neon/pull/6482
     622          584 :         let pid = child.id();
     623          584 :         let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
     624          584 :         std::fs::write(pidfile_path, pid.to_string())?;
     625              : 
     626              :         // Wait for it to start
     627          584 :         let mut attempt = 0;
     628              :         const ATTEMPT_INTERVAL: Duration = Duration::from_millis(100);
     629              :         const MAX_ATTEMPTS: u32 = 10 * 30; // Wait up to 30 s
     630              :         loop {
     631         4055 :             attempt += 1;
     632        10997 :             match self.get_status().await {
     633         3471 :                 Ok(state) => {
     634         3471 :                     match state.status {
     635              :                         ComputeStatus::Init => {
     636         2887 :                             if attempt == MAX_ATTEMPTS {
     637            0 :                                 bail!("compute startup timed out; still in Init state");
     638         2887 :                             }
     639              :                             // keep retrying
     640              :                         }
     641              :                         ComputeStatus::Running => {
     642              :                             // All good!
     643          576 :                             break;
     644              :                         }
     645              :                         ComputeStatus::Failed => {
     646            8 :                             bail!(
     647            8 :                                 "compute startup failed: {}",
     648            8 :                                 state
     649            8 :                                     .error
     650            8 :                                     .as_deref()
     651            8 :                                     .unwrap_or("<no error from compute_ctl>")
     652            8 :                             );
     653              :                         }
     654              :                         ComputeStatus::Empty
     655              :                         | ComputeStatus::ConfigurationPending
     656              :                         | ComputeStatus::Configuration => {
     657            0 :                             bail!("unexpected compute status: {:?}", state.status)
     658              :                         }
     659              :                     }
     660              :                 }
     661          584 :                 Err(e) => {
     662          584 :                     if attempt == MAX_ATTEMPTS {
     663            0 :                         return Err(e).context("timed out waiting to connect to compute_ctl HTTP");
     664          584 :                     }
     665              :                 }
     666              :             }
     667         3471 :             std::thread::sleep(ATTEMPT_INTERVAL);
     668              :         }
     669              : 
     670              :         // disarm the scopeguard, let the child outlive this function (and neon_local invoction)
     671          576 :         drop(scopeguard::ScopeGuard::into_inner(child));
     672          576 : 
     673          576 :         Ok(())
     674          584 :     }
     675              : 
     676              :     // Call the /status HTTP API
     677         4055 :     pub async fn get_status(&self) -> Result<ComputeState> {
     678         4055 :         let client = reqwest::Client::new();
     679              : 
     680         4055 :         let response = client
     681         4055 :             .request(
     682         4055 :                 reqwest::Method::GET,
     683         4055 :                 format!(
     684         4055 :                     "http://{}:{}/status",
     685         4055 :                     self.http_address.ip(),
     686         4055 :                     self.http_address.port()
     687         4055 :                 ),
     688         4055 :             )
     689         4055 :             .send()
     690        10997 :             .await?;
     691              : 
     692              :         // Interpret the response
     693         3471 :         let status = response.status();
     694         3471 :         if !(status.is_client_error() || status.is_server_error()) {
     695         3471 :             Ok(response.json().await?)
     696              :         } else {
     697              :             // reqwest does not export its error construction utility functions, so let's craft the message ourselves
     698            0 :             let url = response.url().to_owned();
     699            0 :             let msg = match response.text().await {
     700            0 :                 Ok(err_body) => format!("Error: {}", err_body),
     701            0 :                 Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
     702              :             };
     703            0 :             Err(anyhow::anyhow!(msg))
     704              :         }
     705         4055 :     }
     706              : 
     707          221 :     pub async fn reconfigure(&self, mut pageservers: Vec<(Host, u16)>) -> Result<()> {
     708          221 :         let mut spec: ComputeSpec = {
     709          221 :             let spec_path = self.endpoint_path().join("spec.json");
     710          221 :             let file = std::fs::File::open(spec_path)?;
     711          221 :             serde_json::from_reader(file)?
     712              :         };
     713              : 
     714          221 :         let postgresql_conf = self.read_postgresql_conf()?;
     715          221 :         spec.cluster.postgresql_conf = Some(postgresql_conf);
     716          221 : 
     717          221 :         // If we weren't given explicit pageservers, query the attachment service
     718          221 :         if pageservers.is_empty() {
     719            0 :             let attachment_service = AttachmentService::from_env(&self.env);
     720            0 :             let locate_result = attachment_service.tenant_locate(self.tenant_id).await?;
     721            0 :             pageservers = locate_result
     722            0 :                 .shards
     723            0 :                 .into_iter()
     724            0 :                 .map(|shard| {
     725            0 :                     (
     726            0 :                         Host::parse(&shard.listen_pg_addr)
     727            0 :                             .expect("Attachment service reported bad hostname"),
     728            0 :                         shard.listen_pg_port,
     729            0 :                     )
     730            0 :                 })
     731            0 :                 .collect::<Vec<_>>();
     732          221 :         }
     733              : 
     734          221 :         let pageserver_connstr = Self::build_pageserver_connstr(&pageservers);
     735          221 :         assert!(!pageserver_connstr.is_empty());
     736          221 :         spec.pageserver_connstring = Some(pageserver_connstr);
     737          221 : 
     738          221 :         let client = reqwest::Client::new();
     739          221 :         let response = client
     740          221 :             .post(format!(
     741          221 :                 "http://{}:{}/configure",
     742          221 :                 self.http_address.ip(),
     743          221 :                 self.http_address.port()
     744          221 :             ))
     745          221 :             .body(format!(
     746          221 :                 "{{\"spec\":{}}}",
     747          221 :                 serde_json::to_string_pretty(&spec)?
     748              :             ))
     749          221 :             .send()
     750          663 :             .await?;
     751              : 
     752          221 :         let status = response.status();
     753          221 :         if !(status.is_client_error() || status.is_server_error()) {
     754          221 :             Ok(())
     755              :         } else {
     756            0 :             let url = response.url().to_owned();
     757            0 :             let msg = match response.text().await {
     758            0 :                 Ok(err_body) => format!("Error: {}", err_body),
     759            0 :                 Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
     760              :             };
     761            0 :             Err(anyhow::anyhow!(msg))
     762              :         }
     763          221 :     }
     764              : 
     765          576 :     pub fn stop(&self, destroy: bool) -> Result<()> {
     766          576 :         // If we are going to destroy data directory,
     767          576 :         // use immediate shutdown mode, otherwise,
     768          576 :         // shutdown gracefully to leave the data directory sane.
     769          576 :         //
     770          576 :         // Postgres is always started from scratch, so stop
     771          576 :         // without destroy only used for testing and debugging.
     772          576 :         //
     773          576 :         self.pg_ctl(
     774          576 :             if destroy {
     775           26 :                 &["-m", "immediate", "stop"]
     776              :             } else {
     777          550 :                 &["stop"]
     778              :             },
     779          576 :             &None,
     780            0 :         )?;
     781              : 
     782              :         // Also wait for the compute_ctl process to die. It might have some
     783              :         // cleanup work to do after postgres stops, like syncing safekeepers,
     784              :         // etc.
     785              :         //
     786              :         // If destroying, send it SIGTERM before waiting. Sometimes we do *not*
     787              :         // want this cleanup: tests intentionally do stop when majority of
     788              :         // safekeepers is down, so sync-safekeepers would hang otherwise. This
     789              :         // could be a separate flag though.
     790          576 :         self.wait_for_compute_ctl_to_exit(destroy)?;
     791          576 :         if destroy {
     792           26 :             println!(
     793           26 :                 "Destroying postgres data directory '{}'",
     794           26 :                 self.pgdata().to_str().unwrap()
     795           26 :             );
     796           26 :             std::fs::remove_dir_all(self.endpoint_path())?;
     797          550 :         }
     798          576 :         Ok(())
     799          576 :     }
     800              : 
     801         1168 :     pub fn connstr(&self) -> String {
     802         1168 :         format!(
     803         1168 :             "postgresql://{}@{}:{}/{}",
     804         1168 :             "cloud_admin",
     805         1168 :             self.pg_address.ip(),
     806         1168 :             self.pg_address.port(),
     807         1168 :             "postgres"
     808         1168 :         )
     809         1168 :     }
     810              : }
        

Generated by: LCOV version 2.1-beta