LCOV - code coverage report
Current view: top level - control_plane/src - endpoint.rs (source / functions)
Test:         8ac049b474321fdc72ddcb56d7165153a1a900e8.info
Test Date:    2023-09-06 10:18:01
Coverage:     Lines: 90.3 % (392 of 434 hit)    Functions: 62.8 % (27 of 43 hit)

            Line data    Source code
       1              : //! Code to manage compute endpoints
       2              : //!
       3              : //! In the local test environment, the data for each endpoint is stored in
       4              : //!
       5              : //! ```text
       6              : //!   .neon/endpoints/<endpoint id>
       7              : //! ```
       8              : //!
       9              : //! Some basic information about the endpoint, like the tenant and timeline IDs,
      10              : //! are stored in the `endpoint.json` file. The `endpoint.json` file is created
      11              : //! when the endpoint is created, and doesn't change afterwards.
      12              : //!
      13              : //! The endpoint is managed by the `compute_ctl` binary. When an endpoint is
       14              : //! started, we launch `compute_ctl`. It synchronizes the safekeepers, downloads
       15              : //! the basebackup from the pageserver to initialize the data directory, and
      16              : //! finally launches the PostgreSQL process. It watches the PostgreSQL process
      17              : //! until it exits.
      18              : //!
      19              : //! When an endpoint is created, a `postgresql.conf` file is also created in
      20              : //! the endpoint's directory. The file can be modified before starting PostgreSQL.
      21              : //! However, the `postgresql.conf` file in the endpoint directory is not used directly
      22              : //! by PostgreSQL. It is passed to `compute_ctl`, and `compute_ctl` writes another
      23              : //! copy of it in the data directory.
      24              : //!
      25              : //! Directory contents:
      26              : //!
      27              : //! ```text
      28              : //! .neon/endpoints/main/
      29              : //!     compute.log               - log output of `compute_ctl` and `postgres`
      30              : //!     endpoint.json             - serialized `EndpointConf` struct
      31              : //!     postgresql.conf           - postgresql settings
      32              : //!     spec.json                 - passed to `compute_ctl`
      33              : //!     pgdata/
      34              : //!         postgresql.conf       - copy of postgresql.conf created by `compute_ctl`
      35              : //!         zenith.signal
      36              : //!         <other PostgreSQL files>
      37              : //! ```
      38              : //!
      39              : use std::collections::BTreeMap;
      40              : use std::net::SocketAddr;
      41              : use std::net::TcpStream;
      42              : use std::path::PathBuf;
      43              : use std::process::Command;
      44              : use std::sync::Arc;
      45              : use std::time::Duration;
      46              : 
      47              : use anyhow::{anyhow, bail, Context, Result};
      48              : use serde::{Deserialize, Serialize};
      49              : use serde_with::{serde_as, DisplayFromStr};
      50              : use utils::id::{NodeId, TenantId, TimelineId};
      51              : 
      52              : use crate::local_env::LocalEnv;
      53              : use crate::pageserver::PageServerNode;
      54              : use crate::postgresql_conf::PostgresConf;
      55              : 
      56              : use compute_api::responses::{ComputeState, ComputeStatus};
      57              : use compute_api::spec::{Cluster, ComputeMode, ComputeSpec};
      58              : 
       59              : // Contents of an endpoint.json file
      60              : #[serde_as]
      61       130530 : #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
      62              : pub struct EndpointConf {
      63              :     endpoint_id: String,
      64              :     #[serde_as(as = "DisplayFromStr")]
      65              :     tenant_id: TenantId,
      66              :     #[serde_as(as = "DisplayFromStr")]
      67              :     timeline_id: TimelineId,
      68              :     mode: ComputeMode,
      69              :     pg_port: u16,
      70              :     http_port: u16,
      71              :     pg_version: u32,
      72              :     skip_pg_catalog_updates: bool,
      73              : }
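For illustration, a serialized `endpoint.json` for an endpoint named `main` would look roughly like the following. All values are made up for this sketch, and the exact representation of `mode` depends on `ComputeMode`'s serde derive:

```text
{
    "endpoint_id": "main",
    "tenant_id": "de200bd42b49cc1814412c7e592dd6e9",
    "timeline_id": "9e4e7fb0f4799b4bb6b8c4e0d0a9e8f1",
    "mode": "Primary",
    "pg_port": 55432,
    "http_port": 55433,
    "pg_version": 15,
    "skip_pg_catalog_updates": true
}
```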
      74              : 
      75              : //
      76              : // ComputeControlPlane
      77              : //
      78              : pub struct ComputeControlPlane {
      79              :     base_port: u16,
      80              : 
      81              :     // endpoint ID is the key
      82              :     pub endpoints: BTreeMap<String, Arc<Endpoint>>,
      83              : 
      84              :     env: LocalEnv,
      85              :     pageserver: Arc<PageServerNode>,
      86              : }
      87              : 
      88              : impl ComputeControlPlane {
      89              :     // Load current endpoints from the endpoints/ subdirectories
      90         1916 :     pub fn load(env: LocalEnv) -> Result<ComputeControlPlane> {
      91         1916 :         let pageserver = Arc::new(PageServerNode::from_env(&env));
      92         1916 : 
      93         1916 :         let mut endpoints = BTreeMap::default();
      94         6870 :         for endpoint_dir in std::fs::read_dir(env.endpoints_path())
      95         1916 :             .with_context(|| format!("failed to list {}", env.endpoints_path().display()))?
      96         6870 :         {
      97         6870 :             let ep = Endpoint::from_dir_entry(endpoint_dir?, &env, &pageserver)?;
      98         6870 :             endpoints.insert(ep.endpoint_id.clone(), Arc::new(ep));
      99              :         }
     100              : 
     101         1916 :         Ok(ComputeControlPlane {
     102         1916 :             base_port: 55431,
     103         1916 :             endpoints,
     104         1916 :             env,
     105         1916 :             pageserver,
     106         1916 :         })
     107         1916 :     }
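A minimal usage sketch of `load`, assuming the caller lives inside the `control_plane` crate and already has a `LocalEnv`; the module path and the surrounding setup are assumptions for illustration:

```rust
use anyhow::Result;

use crate::endpoint::ComputeControlPlane;
use crate::local_env::LocalEnv;

/// List every endpoint found under .neon/endpoints/ together with its status.
fn list_endpoints(env: LocalEnv) -> Result<()> {
    let cplane = ComputeControlPlane::load(env)?;
    for (id, ep) in &cplane.endpoints {
        // `status()` probes the postmaster.pid file and the Postgres port.
        println!("{id}: {}", ep.status());
    }
    Ok(())
}
```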
     108              : 
     109            6 :     fn get_port(&mut self) -> u16 {
     110            6 :         1 + self
     111            6 :             .endpoints
     112            6 :             .values()
     113            6 :             .map(|ep| std::cmp::max(ep.pg_address.port(), ep.http_address.port()))
     114            6 :             .max()
     115            6 :             .unwrap_or(self.base_port)
     116            6 :     }
     117              : 
     118              :     #[allow(clippy::too_many_arguments)]
     119          599 :     pub fn new_endpoint(
     120          599 :         &mut self,
     121          599 :         endpoint_id: &str,
     122          599 :         tenant_id: TenantId,
     123          599 :         timeline_id: TimelineId,
     124          599 :         pg_port: Option<u16>,
     125          599 :         http_port: Option<u16>,
     126          599 :         pg_version: u32,
     127          599 :         mode: ComputeMode,
     128          599 :     ) -> Result<Arc<Endpoint>> {
     129          599 :         let pg_port = pg_port.unwrap_or_else(|| self.get_port());
     130          599 :         let http_port = http_port.unwrap_or_else(|| self.get_port() + 1);
     131          599 :         let ep = Arc::new(Endpoint {
     132          599 :             endpoint_id: endpoint_id.to_owned(),
     133          599 :             pg_address: SocketAddr::new("127.0.0.1".parse().unwrap(), pg_port),
     134          599 :             http_address: SocketAddr::new("127.0.0.1".parse().unwrap(), http_port),
     135          599 :             env: self.env.clone(),
     136          599 :             pageserver: Arc::clone(&self.pageserver),
     137          599 :             timeline_id,
     138          599 :             mode,
     139          599 :             tenant_id,
     140          599 :             pg_version,
      141          599 :             // We don't set up roles and databases in the spec locally, so we don't
      142          599 :             // need catalog updates. Catalog updates also include creation of the
      143          599 :             // availability check data. Yet, we have tests that check that the size and
      144          599 :             // db dump are the same before and after a start. So skip catalog updates;
      145          599 :             // with this we essentially test the case of waking up an idle compute,
      146          599 :             // where catalog updates are skipped in the cloud as well.
     147          599 :             skip_pg_catalog_updates: true,
     148          599 :         });
     149          599 : 
     150          599 :         ep.create_endpoint_dir()?;
     151              :         std::fs::write(
     152          599 :             ep.endpoint_path().join("endpoint.json"),
     153          599 :             serde_json::to_string_pretty(&EndpointConf {
     154          599 :                 endpoint_id: endpoint_id.to_string(),
     155          599 :                 tenant_id,
     156          599 :                 timeline_id,
     157          599 :                 mode,
     158          599 :                 http_port,
     159          599 :                 pg_port,
     160          599 :                 pg_version,
     161          599 :                 skip_pg_catalog_updates: true,
     162          599 :             })?,
     163            0 :         )?;
     164              :         std::fs::write(
     165          599 :             ep.endpoint_path().join("postgresql.conf"),
     166          599 :             ep.setup_pg_conf()?.to_string(),
     167            0 :         )?;
     168              : 
     169          599 :         self.endpoints
     170          599 :             .insert(ep.endpoint_id.clone(), Arc::clone(&ep));
     171          599 : 
     172          599 :         Ok(ep)
     173          599 :     }
     174              : }
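Creating a brand-new endpoint goes through `new_endpoint`. A sketch of how a caller inside this crate might use it, with placeholder IDs and a hard-coded Postgres version (both assumptions for illustration):

```rust
use anyhow::Result;
use compute_api::spec::ComputeMode;
use utils::id::{TenantId, TimelineId};

use crate::endpoint::ComputeControlPlane;

fn create_main_endpoint(
    cplane: &mut ComputeControlPlane,
    tenant_id: TenantId,
    timeline_id: TimelineId,
) -> Result<()> {
    // Writes .neon/endpoints/main/{endpoint.json,postgresql.conf} and
    // registers the new endpoint in `cplane.endpoints`.
    let ep = cplane.new_endpoint(
        "main",
        tenant_id,
        timeline_id,
        None, // pg_port: pick the next free port automatically
        None, // http_port: likewise
        15,   // pg_version
        ComputeMode::Primary,
    )?;
    println!("created endpoint, connection string: {}", ep.connstr());
    Ok(())
}
```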
     175              : 
     176              : ///////////////////////////////////////////////////////////////////////////////
     177              : 
     178            0 : #[derive(Debug)]
     179              : pub struct Endpoint {
     180              :     /// used as the directory name
     181              :     endpoint_id: String,
     182              :     pub tenant_id: TenantId,
     183              :     pub timeline_id: TimelineId,
     184              :     pub mode: ComputeMode,
     185              : 
     186              :     // port and address of the Postgres server and `compute_ctl`'s HTTP API
     187              :     pub pg_address: SocketAddr,
     188              :     pub http_address: SocketAddr,
     189              : 
     190              :     // postgres major version in the format: 14, 15, etc.
     191              :     pg_version: u32,
     192              : 
     193              :     // These are not part of the endpoint as such, but the environment
     194              :     // the endpoint runs in.
     195              :     pub env: LocalEnv,
     196              :     pageserver: Arc<PageServerNode>,
     197              : 
     198              :     // Optimizations
     199              :     skip_pg_catalog_updates: bool,
     200              : }
     201              : 
     202              : impl Endpoint {
     203         6870 :     fn from_dir_entry(
     204         6870 :         entry: std::fs::DirEntry,
     205         6870 :         env: &LocalEnv,
     206         6870 :         pageserver: &Arc<PageServerNode>,
     207         6870 :     ) -> Result<Endpoint> {
     208         6870 :         if !entry.file_type()?.is_dir() {
     209            0 :             anyhow::bail!(
     210            0 :                 "Endpoint::from_dir_entry failed: '{}' is not a directory",
     211            0 :                 entry.path().display()
     212            0 :             );
     213         6870 :         }
     214         6870 : 
     215         6870 :         // parse data directory name
     216         6870 :         let fname = entry.file_name();
     217         6870 :         let endpoint_id = fname.to_str().unwrap().to_string();
     218              : 
     219              :         // Read the endpoint.json file
     220         6870 :         let conf: EndpointConf =
     221         6870 :             serde_json::from_slice(&std::fs::read(entry.path().join("endpoint.json"))?)?;
     222              : 
     223         6870 :         Ok(Endpoint {
     224         6870 :             pg_address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.pg_port),
     225         6870 :             http_address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.http_port),
     226         6870 :             endpoint_id,
     227         6870 :             env: env.clone(),
     228         6870 :             pageserver: Arc::clone(pageserver),
     229         6870 :             timeline_id: conf.timeline_id,
     230         6870 :             mode: conf.mode,
     231         6870 :             tenant_id: conf.tenant_id,
     232         6870 :             pg_version: conf.pg_version,
     233         6870 :             skip_pg_catalog_updates: conf.skip_pg_catalog_updates,
     234         6870 :         })
     235         6870 :     }
     236              : 
     237          599 :     fn create_endpoint_dir(&self) -> Result<()> {
     238          599 :         std::fs::create_dir_all(self.endpoint_path()).with_context(|| {
     239            0 :             format!(
     240            0 :                 "could not create endpoint directory {}",
     241            0 :                 self.endpoint_path().display()
     242            0 :             )
     243          599 :         })
     244          599 :     }
     245              : 
     246              :     // Generate postgresql.conf with default configuration
     247          599 :     fn setup_pg_conf(&self) -> Result<PostgresConf> {
     248          599 :         let mut conf = PostgresConf::new();
     249          599 :         conf.append("max_wal_senders", "10");
     250          599 :         conf.append("wal_log_hints", "off");
     251          599 :         conf.append("max_replication_slots", "10");
     252          599 :         conf.append("hot_standby", "on");
     253          599 :         conf.append("shared_buffers", "1MB");
     254          599 :         conf.append("fsync", "off");
     255          599 :         conf.append("max_connections", "100");
     256          599 :         conf.append("wal_level", "replica");
     257          599 :         // wal_sender_timeout is the maximum time to wait for WAL replication.
      258          599 :         // It also defines how often the walreceiver will send a feedback message to the WAL sender.
     259          599 :         conf.append("wal_sender_timeout", "5s");
     260          599 :         conf.append("listen_addresses", &self.pg_address.ip().to_string());
     261          599 :         conf.append("port", &self.pg_address.port().to_string());
     262          599 :         conf.append("wal_keep_size", "0");
      263          599 :         // walproposer panics when the basebackup is invalid; it is pointless to restart in this case.
     264          599 :         conf.append("restart_after_crash", "off");
     265          599 : 
     266          599 :         // Load the 'neon' extension
     267          599 :         conf.append("shared_preload_libraries", "neon");
     268          599 : 
     269          599 :         conf.append_line("");
     270          599 :         // Replication-related configurations, such as WAL sending
     271          599 :         match &self.mode {
     272              :             ComputeMode::Primary => {
     273              :                 // Configure backpressure
     274              :                 // - Replication write lag depends on how fast the walreceiver can process incoming WAL.
     275              :                 //   This lag determines latency of get_page_at_lsn. Speed of applying WAL is about 10MB/sec,
      276              :                 //   so to avoid expiration of the 1-minute timeout, this lag should not be larger than 600MB.
      277              :                 //   Actually, latency should be much smaller (ideally < 1 sec). But we assume that recently
      278              :                 //   updated pages are not requested from the pageserver.
     279              :                 // - Replication flush lag depends on speed of persisting data by checkpointer (creation of
     280              :                 //   delta/image layers) and advancing disk_consistent_lsn. Safekeepers are able to
     281              :                 //   remove/archive WAL only beyond disk_consistent_lsn. Too large a lag can cause long
     282              :                 //   recovery time (in case of pageserver crash) and disk space overflow at safekeepers.
     283              :                 // - Replication apply lag depends on speed of uploading changes to S3 by uploader thread.
      284              :                 //   To be able to restore the database in case of a pageserver node crash, the safekeeper should not
      285              :                 //   remove WAL beyond this point. Too large a lag can cause space exhaustion in the safekeepers
     286              :                 //   (if they are not able to upload WAL to S3).
     287          510 :                 conf.append("max_replication_write_lag", "15MB");
     288          510 :                 conf.append("max_replication_flush_lag", "10GB");
     289          510 : 
     290          510 :                 if !self.env.safekeepers.is_empty() {
     291          510 :                     // Configure Postgres to connect to the safekeepers
     292          510 :                     conf.append("synchronous_standby_names", "walproposer");
     293          510 : 
     294          510 :                     let safekeepers = self
     295          510 :                         .env
     296          510 :                         .safekeepers
     297          510 :                         .iter()
     298          695 :                         .map(|sk| format!("localhost:{}", sk.get_compute_port()))
     299          510 :                         .collect::<Vec<String>>()
     300          510 :                         .join(",");
     301          510 :                     conf.append("neon.safekeepers", &safekeepers);
     302          510 :                 } else {
      303            0 :                     // We only use a setup without safekeepers for tests,
      304            0 :                     // and don't care about data durability on the pageserver,
      305            0 :                     // so set a more relaxed synchronous_commit.
     306            0 :                     conf.append("synchronous_commit", "remote_write");
     307            0 : 
     308            0 :                     // Configure the node to stream WAL directly to the pageserver
     309            0 :                     // This isn't really a supported configuration, but can be useful for
     310            0 :                     // testing.
     311            0 :                     conf.append("synchronous_standby_names", "pageserver");
     312            0 :                 }
     313              :             }
     314           88 :             ComputeMode::Static(lsn) => {
     315           88 :                 conf.append("recovery_target_lsn", &lsn.to_string());
     316           88 :             }
     317              :             ComputeMode::Replica => {
     318            1 :                 assert!(!self.env.safekeepers.is_empty());
     319              : 
     320              :                 // TODO: use future host field from safekeeper spec
     321              :                 // Pass the list of safekeepers to the replica so that it can connect to any of them,
     322              :                 // whichever is available.
     323            1 :                 let sk_ports = self
     324            1 :                     .env
     325            1 :                     .safekeepers
     326            1 :                     .iter()
     327            1 :                     .map(|x| x.get_compute_port().to_string())
     328            1 :                     .collect::<Vec<_>>()
     329            1 :                     .join(",");
     330            1 :                 let sk_hosts = vec!["localhost"; self.env.safekeepers.len()].join(",");
     331            1 : 
     332            1 :                 let connstr = format!(
     333            1 :                     "host={} port={} options='-c timeline_id={} tenant_id={}' application_name=replica replication=true",
     334            1 :                     sk_hosts,
     335            1 :                     sk_ports,
     336            1 :                     &self.timeline_id.to_string(),
     337            1 :                     &self.tenant_id.to_string(),
     338            1 :                 );
     339            1 : 
     340            1 :                 let slot_name = format!("repl_{}_", self.timeline_id);
     341            1 :                 conf.append("primary_conninfo", connstr.as_str());
     342            1 :                 conf.append("primary_slot_name", slot_name.as_str());
     343            1 :                 conf.append("hot_standby", "on");
      344            1 :                 // Prefetching of blocks referenced in WAL doesn't make sense for us:
      345            1 :                 // a Neon hot standby ignores pages that are not in shared_buffers.
     346            1 :                 if self.pg_version >= 15 {
     347            0 :                     conf.append("recovery_prefetch", "off");
     348            1 :                 }
     349              :             }
     350              :         }
     351              : 
     352          599 :         Ok(conf)
     353          599 :     }
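For a `Primary` endpoint with a single safekeeper, the generated `postgresql.conf` comes out roughly as below. The port numbers are illustrative, and the exact quoting and ordering depend on how `PostgresConf::append` renders values:

```text
max_wal_senders = 10
wal_log_hints = off
max_replication_slots = 10
hot_standby = on
shared_buffers = 1MB
fsync = off
max_connections = 100
wal_level = replica
wal_sender_timeout = 5s
listen_addresses = '127.0.0.1'
port = 55432
wal_keep_size = 0
restart_after_crash = off
shared_preload_libraries = neon

max_replication_write_lag = 15MB
max_replication_flush_lag = 10GB
synchronous_standby_names = 'walproposer'
neon.safekeepers = 'localhost:5454'
```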
     354              : 
     355         8516 :     pub fn endpoint_path(&self) -> PathBuf {
     356         8516 :         self.env.endpoints_path().join(&self.endpoint_id)
     357         8516 :     }
     358              : 
     359         2743 :     pub fn pgdata(&self) -> PathBuf {
     360         2743 :         self.endpoint_path().join("pgdata")
     361         2743 :     }
     362              : 
     363          663 :     pub fn status(&self) -> &str {
     364          663 :         let timeout = Duration::from_millis(300);
     365          663 :         let has_pidfile = self.pgdata().join("postmaster.pid").exists();
     366          663 :         let can_connect = TcpStream::connect_timeout(&self.pg_address, timeout).is_ok();
     367          663 : 
     368          663 :         match (has_pidfile, can_connect) {
     369            0 :             (true, true) => "running",
     370          663 :             (false, false) => "stopped",
     371            0 :             (true, false) => "crashed",
     372            0 :             (false, true) => "running, no pidfile",
     373              :         }
     374          663 :     }
     375              : 
     376          656 :     fn pg_ctl(&self, args: &[&str], auth_token: &Option<String>) -> Result<()> {
     377          656 :         let pg_ctl_path = self.env.pg_bin_dir(self.pg_version)?.join("pg_ctl");
     378          656 :         let mut cmd = Command::new(&pg_ctl_path);
     379          656 :         cmd.args(
     380          656 :             [
     381          656 :                 &[
     382          656 :                     "-D",
     383          656 :                     self.pgdata().to_str().unwrap(),
      384          656 :                     "-w", // wait till pg_ctl actually does what was asked
     385          656 :                 ],
     386          656 :                 args,
     387          656 :             ]
     388          656 :             .concat(),
     389          656 :         )
     390          656 :         .env_clear()
     391          656 :         .env(
     392          656 :             "LD_LIBRARY_PATH",
     393          656 :             self.env.pg_lib_dir(self.pg_version)?.to_str().unwrap(),
     394          656 :         )
     395          656 :         .env(
     396          656 :             "DYLD_LIBRARY_PATH",
     397          656 :             self.env.pg_lib_dir(self.pg_version)?.to_str().unwrap(),
     398              :         );
     399              : 
     400              :         // Pass authentication token used for the connections to pageserver and safekeepers
     401          656 :         if let Some(token) = auth_token {
     402            0 :             cmd.env("NEON_AUTH_TOKEN", token);
     403          656 :         }
     404              : 
     405          656 :         let pg_ctl = cmd
     406          656 :             .output()
     407          656 :             .context(format!("{} failed", pg_ctl_path.display()))?;
     408          656 :         if !pg_ctl.status.success() {
     409           26 :             anyhow::bail!(
     410           26 :                 "pg_ctl failed, exit code: {}, stdout: {}, stderr: {}",
     411           26 :                 pg_ctl.status,
     412           26 :                 String::from_utf8_lossy(&pg_ctl.stdout),
     413           26 :                 String::from_utf8_lossy(&pg_ctl.stderr),
     414           26 :             );
     415          630 :         }
     416          630 : 
     417          630 :         // Also wait for the compute_ctl process to die. It might have some cleanup
     418          630 :         // work to do after postgres stops, like syncing safekeepers, etc.
     419          630 :         //
     420          630 :         // TODO use background_process::stop_process instead
     421          630 :         let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
     422          630 :         let pid: u32 = std::fs::read_to_string(pidfile_path)?.parse()?;
     423          630 :         let pid = nix::unistd::Pid::from_raw(pid as i32);
     424          630 :         crate::background_process::wait_until_stopped("compute_ctl", pid)?;
     425              : 
     426          630 :         Ok(())
     427          656 :     }
     428              : 
     429          663 :     pub fn start(
     430          663 :         &self,
     431          663 :         auth_token: &Option<String>,
     432          663 :         safekeepers: Vec<NodeId>,
     433          663 :         remote_ext_config: Option<&String>,
     434          663 :     ) -> Result<()> {
     435          663 :         if self.status() == "running" {
     436            0 :             anyhow::bail!("The endpoint is already running");
     437          663 :         }
     438          663 : 
     439          663 :         // Slurp the endpoints/<endpoint id>/postgresql.conf file into
     440          663 :         // memory. We will include it in the spec file that we pass to
     441          663 :         // `compute_ctl`, and `compute_ctl` will write it to the postgresql.conf
     442          663 :         // in the data directory.
     443          663 :         let postgresql_conf_path = self.endpoint_path().join("postgresql.conf");
     444          663 :         let postgresql_conf = match std::fs::read(&postgresql_conf_path) {
     445          663 :             Ok(content) => String::from_utf8(content)?,
     446            0 :             Err(e) if e.kind() == std::io::ErrorKind::NotFound => "".to_string(),
     447            0 :             Err(e) => {
     448            0 :                 return Err(anyhow::Error::new(e).context(format!(
     449            0 :                     "failed to read config file in {}",
     450            0 :                     postgresql_conf_path.to_str().unwrap()
     451            0 :                 )))
     452              :             }
     453              :         };
     454              : 
     455              :         // We always start the compute node from scratch, so if the Postgres
     456              :         // data dir exists from a previous launch, remove it first.
     457          663 :         if self.pgdata().exists() {
     458           67 :             std::fs::remove_dir_all(self.pgdata())?;
     459          596 :         }
     460              : 
     461          663 :         let pageserver_connstring = {
     462          663 :             let config = &self.pageserver.pg_connection_config;
     463          663 :             let (host, port) = (config.host(), config.port());
     464          663 : 
     465          663 :             // NOTE: avoid spaces in connection string, because it is less error prone if we forward it somewhere.
     466          663 :             format!("postgresql://no_user@{host}:{port}")
     467          663 :         };
     468          663 :         let mut safekeeper_connstrings = Vec::new();
     469          663 :         if self.mode == ComputeMode::Primary {
     470         1351 :             for sk_id in safekeepers {
     471          777 :                 let sk = self
     472          777 :                     .env
     473          777 :                     .safekeepers
     474          777 :                     .iter()
     475         1085 :                     .find(|node| node.id == sk_id)
     476          777 :                     .ok_or_else(|| anyhow!("safekeeper {sk_id} does not exist"))?;
     477          777 :                 safekeeper_connstrings.push(format!("127.0.0.1:{}", sk.get_compute_port()));
     478              :             }
     479           89 :         }
     480              : 
     481              :         // Create spec file
     482          663 :         let spec = ComputeSpec {
     483          663 :             skip_pg_catalog_updates: self.skip_pg_catalog_updates,
     484          663 :             format_version: 1.0,
     485          663 :             operation_uuid: None,
     486          663 :             cluster: Cluster {
     487          663 :                 cluster_id: None, // project ID: not used
     488          663 :                 name: None,       // project name: not used
     489          663 :                 state: None,
     490          663 :                 roles: vec![],
     491          663 :                 databases: vec![],
     492          663 :                 settings: None,
     493          663 :                 postgresql_conf: Some(postgresql_conf),
     494          663 :             },
     495          663 :             delta_operations: None,
     496          663 :             tenant_id: Some(self.tenant_id),
     497          663 :             timeline_id: Some(self.timeline_id),
     498          663 :             mode: self.mode,
     499          663 :             pageserver_connstring: Some(pageserver_connstring),
     500          663 :             safekeeper_connstrings,
     501          663 :             storage_auth_token: auth_token.clone(),
     502          663 :             remote_extensions: None,
     503          663 :         };
     504          663 :         let spec_path = self.endpoint_path().join("spec.json");
     505          663 :         std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;
     506              : 
     507              :         // Open log file. We'll redirect the stdout and stderr of `compute_ctl` to it.
     508          663 :         let logfile = std::fs::OpenOptions::new()
     509          663 :             .create(true)
     510          663 :             .append(true)
     511          663 :             .open(self.endpoint_path().join("compute.log"))?;
     512              : 
     513              :         // Launch compute_ctl
     514          663 :         println!("Starting postgres node at '{}'", self.connstr());
     515          663 :         let mut cmd = Command::new(self.env.neon_distrib_dir.join("compute_ctl"));
     516          663 :         cmd.args(["--http-port", &self.http_address.port().to_string()])
     517          663 :             .args(["--pgdata", self.pgdata().to_str().unwrap()])
     518          663 :             .args(["--connstr", &self.connstr()])
     519          663 :             .args([
     520          663 :                 "--spec-path",
     521          663 :                 self.endpoint_path().join("spec.json").to_str().unwrap(),
     522          663 :             ])
     523          663 :             .args([
     524          663 :                 "--pgbin",
     525          663 :                 self.env
     526          663 :                     .pg_bin_dir(self.pg_version)?
     527          663 :                     .join("postgres")
     528          663 :                     .to_str()
     529          663 :                     .unwrap(),
     530          663 :             ])
     531          663 :             .stdin(std::process::Stdio::null())
     532          663 :             .stderr(logfile.try_clone()?)
     533          663 :             .stdout(logfile);
     534              : 
     535          663 :         if let Some(remote_ext_config) = remote_ext_config {
     536            0 :             cmd.args(["--remote-ext-config", remote_ext_config]);
     537          663 :         }
     538              : 
     539          663 :         let child = cmd.spawn()?;
     540              : 
     541              :         // Write down the pid so we can wait for it when we want to stop
     542              :         // TODO use background_process::start_process instead
     543          663 :         let pid = child.id();
     544          663 :         let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
     545          663 :         std::fs::write(pidfile_path, pid.to_string())?;
     546              : 
     547              :         // Wait for it to start
     548          663 :         let mut attempt = 0;
     549              :         const ATTEMPT_INTERVAL: Duration = Duration::from_millis(100);
     550              :         const MAX_ATTEMPTS: u32 = 10 * 30; // Wait up to 30 s
     551              :         loop {
     552         2041 :             attempt += 1;
     553         2041 :             match self.get_status() {
     554         1382 :                 Ok(state) => {
     555         1382 :                     match state.status {
     556              :                         ComputeStatus::Init => {
     557          719 :                             if attempt == MAX_ATTEMPTS {
     558            0 :                                 bail!("compute startup timed out; still in Init state");
     559          719 :                             }
     560              :                             // keep retrying
     561              :                         }
     562              :                         ComputeStatus::Running => {
     563              :                             // All good!
     564          654 :                             break;
     565              :                         }
     566              :                         ComputeStatus::Failed => {
     567            9 :                             bail!(
     568            9 :                                 "compute startup failed: {}",
     569            9 :                                 state
     570            9 :                                     .error
     571            9 :                                     .as_deref()
     572            9 :                                     .unwrap_or("<no error from compute_ctl>")
     573            9 :                             );
     574              :                         }
     575              :                         ComputeStatus::Empty
     576              :                         | ComputeStatus::ConfigurationPending
     577              :                         | ComputeStatus::Configuration => {
     578            0 :                             bail!("unexpected compute status: {:?}", state.status)
     579              :                         }
     580              :                     }
     581              :                 }
     582          659 :                 Err(e) => {
     583          659 :                     if attempt == MAX_ATTEMPTS {
     584            0 :                         return Err(e).context("timed out waiting to connect to compute_ctl HTTP");
     585          659 :                     }
     586              :                 }
     587              :             }
     588         1378 :             std::thread::sleep(ATTEMPT_INTERVAL);
     589              :         }
     590              : 
     591          654 :         Ok(())
     592          663 :     }
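Putting `start` and `stop` together, a caller might drive an endpoint as in the sketch below. The safekeeper ID and the absence of an auth token and remote extensions are assumptions for illustration:

```rust
use anyhow::Result;
use utils::id::NodeId;

use crate::endpoint::Endpoint;

fn run_once(ep: &Endpoint) -> Result<()> {
    // Launch compute_ctl (and thus Postgres), streaming WAL to safekeeper 1.
    ep.start(&None, vec![NodeId(1)], None)?;
    println!("endpoint is {} at {}", ep.status(), ep.connstr());

    // Graceful shutdown; pass `true` instead to also remove the whole
    // endpoint directory.
    ep.stop(false)?;
    Ok(())
}
```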
     593              : 
     594              :     // Call the /status HTTP API
     595         2041 :     pub fn get_status(&self) -> Result<ComputeState> {
     596         2041 :         let client = reqwest::blocking::Client::new();
     597              : 
     598         2041 :         let response = client
     599         2041 :             .request(
     600         2041 :                 reqwest::Method::GET,
     601         2041 :                 format!(
     602         2041 :                     "http://{}:{}/status",
     603         2041 :                     self.http_address.ip(),
     604         2041 :                     self.http_address.port()
     605         2041 :                 ),
     606         2041 :             )
     607         2041 :             .send()?;
     608              : 
     609              :         // Interpret the response
     610         1382 :         let status = response.status();
     611         1382 :         if !(status.is_client_error() || status.is_server_error()) {
     612         1382 :             Ok(response.json()?)
     613              :         } else {
     614              :             // reqwest does not export its error construction utility functions, so let's craft the message ourselves
     615            0 :             let url = response.url().to_owned();
     616            0 :             let msg = match response.text() {
     617            0 :                 Ok(err_body) => format!("Error: {}", err_body),
     618            0 :                 Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
     619              :             };
     620            0 :             Err(anyhow::anyhow!(msg))
     621              :         }
     622         2041 :     }
     623              : 
     624          656 :     pub fn stop(&self, destroy: bool) -> Result<()> {
      625          656 :         // If we are going to destroy the data directory,
      626          656 :         // use immediate shutdown mode; otherwise,
      627          656 :         // shut down gracefully to leave the data directory sane.
      628          656 :         //
      629          656 :         // Postgres is always started from scratch, so stopping
      630          656 :         // without destroy is only used for testing and debugging.
     631          656 :         //
     632          656 :         if destroy {
     633           31 :             self.pg_ctl(&["-m", "immediate", "stop"], &None)?;
     634           31 :             println!(
     635           31 :                 "Destroying postgres data directory '{}'",
     636           31 :                 self.pgdata().to_str().unwrap()
     637           31 :             );
     638           31 :             std::fs::remove_dir_all(self.endpoint_path())?;
     639              :         } else {
     640          625 :             self.pg_ctl(&["stop"], &None)?;
     641              :         }
     642          630 :         Ok(())
     643          656 :     }
     644              : 
     645         1326 :     pub fn connstr(&self) -> String {
     646         1326 :         format!(
     647         1326 :             "postgresql://{}@{}:{}/{}",
     648         1326 :             "cloud_admin",
     649         1326 :             self.pg_address.ip(),
     650         1326 :             self.pg_address.port(),
     651         1326 :             "postgres"
     652         1326 :         )
     653         1326 :     }
     654              : }
        

Generated by: LCOV version 2.1-beta