LCOV - differential code coverage report
Current view: top level - control_plane/src - endpoint.rs (source / functions) Coverage Total Hit UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 90.3 % 433 391 42 391
Current Date: 2023-10-19 02:04:12 Functions: 62.8 % 43 27 16 27
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : //! Code to manage compute endpoints
       2                 : //!
       3                 : //! In the local test environment, the data for each endpoint is stored in
       4                 : //!
       5                 : //! ```text
       6                 : //!   .neon/endpoints/<endpoint id>
       7                 : //! ```
       8                 : //!
       9                 : //! Some basic information about the endpoint, like the tenant and timeline IDs,
      10                 : //! are stored in the `endpoint.json` file. The `endpoint.json` file is created
      11                 : //! when the endpoint is created, and doesn't change afterwards.
      12                 : //!
      13                 : //! The endpoint is managed by the `compute_ctl` binary. When an endpoint is
      14                 : //! started, we launch `compute_ctl` It synchronizes the safekeepers, downloads
      15                 : //! the basebackup from the pageserver to initialize the the data directory, and
      16                 : //! finally launches the PostgreSQL process. It watches the PostgreSQL process
      17                 : //! until it exits.
      18                 : //!
      19                 : //! When an endpoint is created, a `postgresql.conf` file is also created in
      20                 : //! the endpoint's directory. The file can be modified before starting PostgreSQL.
      21                 : //! However, the `postgresql.conf` file in the endpoint directory is not used directly
      22                 : //! by PostgreSQL. It is passed to `compute_ctl`, and `compute_ctl` writes another
      23                 : //! copy of it in the data directory.
      24                 : //!
      25                 : //! Directory contents:
      26                 : //!
      27                 : //! ```text
      28                 : //! .neon/endpoints/main/
      29                 : //!     compute.log               - log output of `compute_ctl` and `postgres`
      30                 : //!     endpoint.json             - serialized `EndpointConf` struct
      31                 : //!     postgresql.conf           - postgresql settings
      32                 : //!     spec.json                 - passed to `compute_ctl`
      33                 : //!     pgdata/
      34                 : //!         postgresql.conf       - copy of postgresql.conf created by `compute_ctl`
      35                 : //!         zenith.signal
      36                 : //!         <other PostgreSQL files>
      37                 : //! ```
      38                 : //!
      39                 : use std::collections::BTreeMap;
      40                 : use std::net::SocketAddr;
      41                 : use std::net::TcpStream;
      42                 : use std::path::PathBuf;
      43                 : use std::process::Command;
      44                 : use std::sync::Arc;
      45                 : use std::time::Duration;
      46                 : 
      47                 : use anyhow::{anyhow, bail, Context, Result};
      48                 : use serde::{Deserialize, Serialize};
      49                 : use serde_with::{serde_as, DisplayFromStr};
      50                 : use utils::id::{NodeId, TenantId, TimelineId};
      51                 : 
      52                 : use crate::local_env::LocalEnv;
      53                 : use crate::pageserver::PageServerNode;
      54                 : use crate::postgresql_conf::PostgresConf;
      55                 : 
      56                 : use compute_api::responses::{ComputeState, ComputeStatus};
      57                 : use compute_api::spec::{Cluster, ComputeMode, ComputeSpec};
      58                 : 
      59                 : // contents of a endpoint.json file
      60                 : #[serde_as]
      61 CBC      120981 : #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
      62                 : pub struct EndpointConf {
      63                 :     endpoint_id: String,
      64                 :     #[serde_as(as = "DisplayFromStr")]
      65                 :     tenant_id: TenantId,
      66                 :     #[serde_as(as = "DisplayFromStr")]
      67                 :     timeline_id: TimelineId,
      68                 :     mode: ComputeMode,
      69                 :     pg_port: u16,
      70                 :     http_port: u16,
      71                 :     pg_version: u32,
      72                 :     skip_pg_catalog_updates: bool,
      73                 :     pageserver_id: NodeId,
      74                 : }
      75                 : 
      76                 : //
      77                 : // ComputeControlPlane
      78                 : //
      79                 : pub struct ComputeControlPlane {
      80                 :     base_port: u16,
      81                 : 
      82                 :     // endpoint ID is the key
      83                 :     pub endpoints: BTreeMap<String, Arc<Endpoint>>,
      84                 : 
      85                 :     env: LocalEnv,
      86                 : }
      87                 : 
      88                 : impl ComputeControlPlane {
      89                 :     // Load current endpoints from the endpoints/ subdirectories
      90            1842 :     pub fn load(env: LocalEnv) -> Result<ComputeControlPlane> {
      91            1842 :         let mut endpoints = BTreeMap::default();
      92            5761 :         for endpoint_dir in std::fs::read_dir(env.endpoints_path())
      93            1842 :             .with_context(|| format!("failed to list {}", env.endpoints_path().display()))?
      94            5761 :         {
      95            5761 :             let ep = Endpoint::from_dir_entry(endpoint_dir?, &env)?;
      96            5761 :             endpoints.insert(ep.endpoint_id.clone(), Arc::new(ep));
      97                 :         }
      98                 : 
      99            1842 :         Ok(ComputeControlPlane {
     100            1842 :             base_port: 55431,
     101            1842 :             endpoints,
     102            1842 :             env,
     103            1842 :         })
     104            1842 :     }
     105                 : 
     106               6 :     fn get_port(&mut self) -> u16 {
     107               6 :         1 + self
     108               6 :             .endpoints
     109               6 :             .values()
     110               6 :             .map(|ep| std::cmp::max(ep.pg_address.port(), ep.http_address.port()))
     111               6 :             .max()
     112               6 :             .unwrap_or(self.base_port)
     113               6 :     }
     114                 : 
     115                 :     #[allow(clippy::too_many_arguments)]
     116             568 :     pub fn new_endpoint(
     117             568 :         &mut self,
     118             568 :         endpoint_id: &str,
     119             568 :         tenant_id: TenantId,
     120             568 :         timeline_id: TimelineId,
     121             568 :         pg_port: Option<u16>,
     122             568 :         http_port: Option<u16>,
     123             568 :         pg_version: u32,
     124             568 :         mode: ComputeMode,
     125             568 :         pageserver_id: NodeId,
     126             568 :     ) -> Result<Arc<Endpoint>> {
     127             568 :         let pg_port = pg_port.unwrap_or_else(|| self.get_port());
     128             568 :         let http_port = http_port.unwrap_or_else(|| self.get_port() + 1);
     129             568 :         let pageserver =
     130             568 :             PageServerNode::from_env(&self.env, self.env.get_pageserver_conf(pageserver_id)?);
     131             568 :         let ep = Arc::new(Endpoint {
     132             568 :             endpoint_id: endpoint_id.to_owned(),
     133             568 :             pg_address: SocketAddr::new("127.0.0.1".parse().unwrap(), pg_port),
     134             568 :             http_address: SocketAddr::new("127.0.0.1".parse().unwrap(), http_port),
     135             568 :             env: self.env.clone(),
     136             568 :             pageserver,
     137             568 :             timeline_id,
     138             568 :             mode,
     139             568 :             tenant_id,
     140             568 :             pg_version,
     141             568 :             // We don't setup roles and databases in the spec locally, so we don't need to
     142             568 :             // do catalog updates. Catalog updates also include check availability
     143             568 :             // data creation. Yet, we have tests that check that size and db dump
     144             568 :             // before and after start are the same. So, skip catalog updates,
     145             568 :             // with this we basically test a case of waking up an idle compute, where
     146             568 :             // we also skip catalog updates in the cloud.
     147             568 :             skip_pg_catalog_updates: true,
     148             568 :         });
     149             568 : 
     150             568 :         ep.create_endpoint_dir()?;
     151                 :         std::fs::write(
     152             568 :             ep.endpoint_path().join("endpoint.json"),
     153             568 :             serde_json::to_string_pretty(&EndpointConf {
     154             568 :                 endpoint_id: endpoint_id.to_string(),
     155             568 :                 tenant_id,
     156             568 :                 timeline_id,
     157             568 :                 mode,
     158             568 :                 http_port,
     159             568 :                 pg_port,
     160             568 :                 pg_version,
     161             568 :                 skip_pg_catalog_updates: true,
     162             568 :                 pageserver_id,
     163             568 :             })?,
     164 UBC           0 :         )?;
     165                 :         std::fs::write(
     166 CBC         568 :             ep.endpoint_path().join("postgresql.conf"),
     167             568 :             ep.setup_pg_conf()?.to_string(),
     168 UBC           0 :         )?;
     169                 : 
     170 CBC         568 :         self.endpoints
     171             568 :             .insert(ep.endpoint_id.clone(), Arc::clone(&ep));
     172             568 : 
     173             568 :         Ok(ep)
     174             568 :     }
     175                 : }
     176                 : 
     177                 : ///////////////////////////////////////////////////////////////////////////////
     178                 : 
     179 UBC           0 : #[derive(Debug)]
     180                 : pub struct Endpoint {
     181                 :     /// used as the directory name
     182                 :     endpoint_id: String,
     183                 :     pub tenant_id: TenantId,
     184                 :     pub timeline_id: TimelineId,
     185                 :     pub mode: ComputeMode,
     186                 : 
     187                 :     // port and address of the Postgres server and `compute_ctl`'s HTTP API
     188                 :     pub pg_address: SocketAddr,
     189                 :     pub http_address: SocketAddr,
     190                 : 
     191                 :     // postgres major version in the format: 14, 15, etc.
     192                 :     pg_version: u32,
     193                 : 
     194                 :     // These are not part of the endpoint as such, but the environment
     195                 :     // the endpoint runs in.
     196                 :     pub env: LocalEnv,
     197                 :     pageserver: PageServerNode,
     198                 : 
     199                 :     // Optimizations
     200                 :     skip_pg_catalog_updates: bool,
     201                 : }
     202                 : 
     203                 : impl Endpoint {
     204 CBC        5761 :     fn from_dir_entry(entry: std::fs::DirEntry, env: &LocalEnv) -> Result<Endpoint> {
     205            5761 :         if !entry.file_type()?.is_dir() {
     206 UBC           0 :             anyhow::bail!(
     207               0 :                 "Endpoint::from_dir_entry failed: '{}' is not a directory",
     208               0 :                 entry.path().display()
     209               0 :             );
     210 CBC        5761 :         }
     211            5761 : 
     212            5761 :         // parse data directory name
     213            5761 :         let fname = entry.file_name();
     214            5761 :         let endpoint_id = fname.to_str().unwrap().to_string();
     215                 : 
     216                 :         // Read the endpoint.json file
     217            5761 :         let conf: EndpointConf =
     218            5761 :             serde_json::from_slice(&std::fs::read(entry.path().join("endpoint.json"))?)?;
     219                 : 
     220            5761 :         let pageserver =
     221            5761 :             PageServerNode::from_env(env, env.get_pageserver_conf(conf.pageserver_id)?);
     222                 : 
     223            5761 :         Ok(Endpoint {
     224            5761 :             pg_address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.pg_port),
     225            5761 :             http_address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.http_port),
     226            5761 :             endpoint_id,
     227            5761 :             env: env.clone(),
     228            5761 :             pageserver,
     229            5761 :             timeline_id: conf.timeline_id,
     230            5761 :             mode: conf.mode,
     231            5761 :             tenant_id: conf.tenant_id,
     232            5761 :             pg_version: conf.pg_version,
     233            5761 :             skip_pg_catalog_updates: conf.skip_pg_catalog_updates,
     234            5761 :         })
     235            5761 :     }
     236                 : 
     237             568 :     fn create_endpoint_dir(&self) -> Result<()> {
     238             568 :         std::fs::create_dir_all(self.endpoint_path()).with_context(|| {
     239 UBC           0 :             format!(
     240               0 :                 "could not create endpoint directory {}",
     241               0 :                 self.endpoint_path().display()
     242               0 :             )
     243 CBC         568 :         })
     244             568 :     }
     245                 : 
     246                 :     // Generate postgresql.conf with default configuration
     247             568 :     fn setup_pg_conf(&self) -> Result<PostgresConf> {
     248             568 :         let mut conf = PostgresConf::new();
     249             568 :         conf.append("max_wal_senders", "10");
     250             568 :         conf.append("wal_log_hints", "off");
     251             568 :         conf.append("max_replication_slots", "10");
     252             568 :         conf.append("hot_standby", "on");
     253             568 :         conf.append("shared_buffers", "1MB");
     254             568 :         conf.append("fsync", "off");
     255             568 :         conf.append("max_connections", "100");
     256             568 :         conf.append("wal_level", "logical");
     257             568 :         // wal_sender_timeout is the maximum time to wait for WAL replication.
     258             568 :         // It also defines how often the walreciever will send a feedback message to the wal sender.
     259             568 :         conf.append("wal_sender_timeout", "5s");
     260             568 :         conf.append("listen_addresses", &self.pg_address.ip().to_string());
     261             568 :         conf.append("port", &self.pg_address.port().to_string());
     262             568 :         conf.append("wal_keep_size", "0");
     263             568 :         // walproposer panics when basebackup is invalid, it is pointless to restart in this case.
     264             568 :         conf.append("restart_after_crash", "off");
     265             568 : 
     266             568 :         // Load the 'neon' extension
     267             568 :         conf.append("shared_preload_libraries", "neon");
     268             568 : 
     269             568 :         conf.append_line("");
     270             568 :         // Replication-related configurations, such as WAL sending
     271             568 :         match &self.mode {
     272                 :             ComputeMode::Primary => {
     273                 :                 // Configure backpressure
     274                 :                 // - Replication write lag depends on how fast the walreceiver can process incoming WAL.
     275                 :                 //   This lag determines latency of get_page_at_lsn. Speed of applying WAL is about 10MB/sec,
     276                 :                 //   so to avoid expiration of 1 minute timeout, this lag should not be larger than 600MB.
     277                 :                 //   Actually latency should be much smaller (better if < 1sec). But we assume that recently
     278                 :                 //   updates pages are not requested from pageserver.
     279                 :                 // - Replication flush lag depends on speed of persisting data by checkpointer (creation of
     280                 :                 //   delta/image layers) and advancing disk_consistent_lsn. Safekeepers are able to
     281                 :                 //   remove/archive WAL only beyond disk_consistent_lsn. Too large a lag can cause long
     282                 :                 //   recovery time (in case of pageserver crash) and disk space overflow at safekeepers.
     283                 :                 // - Replication apply lag depends on speed of uploading changes to S3 by uploader thread.
     284                 :                 //   To be able to restore database in case of pageserver node crash, safekeeper should not
     285                 :                 //   remove WAL beyond this point. Too large lag can cause space exhaustion in safekeepers
     286                 :                 //   (if they are not able to upload WAL to S3).
     287             479 :                 conf.append("max_replication_write_lag", "15MB");
     288             479 :                 conf.append("max_replication_flush_lag", "10GB");
     289             479 : 
     290             479 :                 if !self.env.safekeepers.is_empty() {
     291             479 :                     // Configure Postgres to connect to the safekeepers
     292             479 :                     conf.append("synchronous_standby_names", "walproposer");
     293             479 : 
     294             479 :                     let safekeepers = self
     295             479 :                         .env
     296             479 :                         .safekeepers
     297             479 :                         .iter()
     298             650 :                         .map(|sk| format!("localhost:{}", sk.get_compute_port()))
     299             479 :                         .collect::<Vec<String>>()
     300             479 :                         .join(",");
     301             479 :                     conf.append("neon.safekeepers", &safekeepers);
     302             479 :                 } else {
     303 UBC           0 :                     // We only use setup without safekeepers for tests,
     304               0 :                     // and don't care about data durability on pageserver,
     305               0 :                     // so set more relaxed synchronous_commit.
     306               0 :                     conf.append("synchronous_commit", "remote_write");
     307               0 : 
     308               0 :                     // Configure the node to stream WAL directly to the pageserver
     309               0 :                     // This isn't really a supported configuration, but can be useful for
     310               0 :                     // testing.
     311               0 :                     conf.append("synchronous_standby_names", "pageserver");
     312               0 :                 }
     313                 :             }
     314 CBC          88 :             ComputeMode::Static(lsn) => {
     315              88 :                 conf.append("recovery_target_lsn", &lsn.to_string());
     316              88 :             }
     317                 :             ComputeMode::Replica => {
     318               1 :                 assert!(!self.env.safekeepers.is_empty());
     319                 : 
     320                 :                 // TODO: use future host field from safekeeper spec
     321                 :                 // Pass the list of safekeepers to the replica so that it can connect to any of them,
     322                 :                 // whichever is available.
     323               1 :                 let sk_ports = self
     324               1 :                     .env
     325               1 :                     .safekeepers
     326               1 :                     .iter()
     327               1 :                     .map(|x| x.get_compute_port().to_string())
     328               1 :                     .collect::<Vec<_>>()
     329               1 :                     .join(",");
     330               1 :                 let sk_hosts = vec!["localhost"; self.env.safekeepers.len()].join(",");
     331               1 : 
     332               1 :                 let connstr = format!(
     333               1 :                     "host={} port={} options='-c timeline_id={} tenant_id={}' application_name=replica replication=true",
     334               1 :                     sk_hosts,
     335               1 :                     sk_ports,
     336               1 :                     &self.timeline_id.to_string(),
     337               1 :                     &self.tenant_id.to_string(),
     338               1 :                 );
     339               1 : 
     340               1 :                 let slot_name = format!("repl_{}_", self.timeline_id);
     341               1 :                 conf.append("primary_conninfo", connstr.as_str());
     342               1 :                 conf.append("primary_slot_name", slot_name.as_str());
     343               1 :                 conf.append("hot_standby", "on");
     344               1 :                 // prefetching of blocks referenced in WAL doesn't make sense for us
     345               1 :                 // Neon hot standby ignores pages that are not in the shared_buffers
     346               1 :                 if self.pg_version >= 15 {
     347 UBC           0 :                     conf.append("recovery_prefetch", "off");
     348 CBC           1 :                 }
     349                 :             }
     350                 :         }
     351                 : 
     352             568 :         Ok(conf)
     353             568 :     }
     354                 : 
     355            8217 :     pub fn endpoint_path(&self) -> PathBuf {
     356            8217 :         self.env.endpoints_path().join(&self.endpoint_id)
     357            8217 :     }
     358                 : 
     359            2664 :     pub fn pgdata(&self) -> PathBuf {
     360            2664 :         self.endpoint_path().join("pgdata")
     361            2664 :     }
     362                 : 
     363             641 :     pub fn status(&self) -> &str {
     364             641 :         let timeout = Duration::from_millis(300);
     365             641 :         let has_pidfile = self.pgdata().join("postmaster.pid").exists();
     366             641 :         let can_connect = TcpStream::connect_timeout(&self.pg_address, timeout).is_ok();
     367             641 : 
     368             641 :         match (has_pidfile, can_connect) {
     369 UBC           0 :             (true, true) => "running",
     370 CBC         641 :             (false, false) => "stopped",
     371 UBC           0 :             (true, false) => "crashed",
     372               0 :             (false, true) => "running, no pidfile",
     373                 :         }
     374 CBC         641 :     }
     375                 : 
     376             634 :     fn pg_ctl(&self, args: &[&str], auth_token: &Option<String>) -> Result<()> {
     377             634 :         let pg_ctl_path = self.env.pg_bin_dir(self.pg_version)?.join("pg_ctl");
     378             634 :         let mut cmd = Command::new(&pg_ctl_path);
     379             634 :         cmd.args(
     380             634 :             [
     381             634 :                 &[
     382             634 :                     "-D",
     383             634 :                     self.pgdata().to_str().unwrap(),
     384             634 :                     "-w", //wait till pg_ctl actually does what was asked
     385             634 :                 ],
     386             634 :                 args,
     387             634 :             ]
     388             634 :             .concat(),
     389             634 :         )
     390             634 :         .env_clear()
     391             634 :         .env(
     392             634 :             "LD_LIBRARY_PATH",
     393             634 :             self.env.pg_lib_dir(self.pg_version)?.to_str().unwrap(),
     394             634 :         )
     395             634 :         .env(
     396             634 :             "DYLD_LIBRARY_PATH",
     397             634 :             self.env.pg_lib_dir(self.pg_version)?.to_str().unwrap(),
     398                 :         );
     399                 : 
     400                 :         // Pass authentication token used for the connections to pageserver and safekeepers
     401             634 :         if let Some(token) = auth_token {
     402 UBC           0 :             cmd.env("NEON_AUTH_TOKEN", token);
     403 CBC         634 :         }
     404                 : 
     405             634 :         let pg_ctl = cmd
     406             634 :             .output()
     407             634 :             .context(format!("{} failed", pg_ctl_path.display()))?;
     408             634 :         if !pg_ctl.status.success() {
     409              21 :             anyhow::bail!(
     410              21 :                 "pg_ctl failed, exit code: {}, stdout: {}, stderr: {}",
     411              21 :                 pg_ctl.status,
     412              21 :                 String::from_utf8_lossy(&pg_ctl.stdout),
     413              21 :                 String::from_utf8_lossy(&pg_ctl.stderr),
     414              21 :             );
     415             613 :         }
     416             613 : 
     417             613 :         // Also wait for the compute_ctl process to die. It might have some cleanup
     418             613 :         // work to do after postgres stops, like syncing safekeepers, etc.
     419             613 :         //
     420             613 :         // TODO use background_process::stop_process instead
     421             613 :         let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
     422             613 :         let pid: u32 = std::fs::read_to_string(pidfile_path)?.parse()?;
     423             613 :         let pid = nix::unistd::Pid::from_raw(pid as i32);
     424             613 :         crate::background_process::wait_until_stopped("compute_ctl", pid)?;
     425                 : 
     426             613 :         Ok(())
     427             634 :     }
     428                 : 
     429             641 :     pub fn start(
     430             641 :         &self,
     431             641 :         auth_token: &Option<String>,
     432             641 :         safekeepers: Vec<NodeId>,
     433             641 :         remote_ext_config: Option<&String>,
     434             641 :     ) -> Result<()> {
     435             641 :         if self.status() == "running" {
     436 UBC           0 :             anyhow::bail!("The endpoint is already running");
     437 CBC         641 :         }
     438             641 : 
     439             641 :         // Slurp the endpoints/<endpoint id>/postgresql.conf file into
     440             641 :         // memory. We will include it in the spec file that we pass to
     441             641 :         // `compute_ctl`, and `compute_ctl` will write it to the postgresql.conf
     442             641 :         // in the data directory.
     443             641 :         let postgresql_conf_path = self.endpoint_path().join("postgresql.conf");
     444             641 :         let postgresql_conf = match std::fs::read(&postgresql_conf_path) {
     445             641 :             Ok(content) => String::from_utf8(content)?,
     446 UBC           0 :             Err(e) if e.kind() == std::io::ErrorKind::NotFound => "".to_string(),
     447               0 :             Err(e) => {
     448               0 :                 return Err(anyhow::Error::new(e).context(format!(
     449               0 :                     "failed to read config file in {}",
     450               0 :                     postgresql_conf_path.to_str().unwrap()
     451               0 :                 )))
     452                 :             }
     453                 :         };
     454                 : 
     455                 :         // We always start the compute node from scratch, so if the Postgres
     456                 :         // data dir exists from a previous launch, remove it first.
     457 CBC         641 :         if self.pgdata().exists() {
     458              76 :             std::fs::remove_dir_all(self.pgdata())?;
     459             565 :         }
     460                 : 
     461             641 :         let pageserver_connstring = {
     462             641 :             let config = &self.pageserver.pg_connection_config;
     463             641 :             let (host, port) = (config.host(), config.port());
     464             641 : 
     465             641 :             // NOTE: avoid spaces in connection string, because it is less error prone if we forward it somewhere.
     466             641 :             format!("postgresql://no_user@{host}:{port}")
     467             641 :         };
     468             641 :         let mut safekeeper_connstrings = Vec::new();
     469             641 :         if self.mode == ComputeMode::Primary {
     470            1301 :             for sk_id in safekeepers {
     471             749 :                 let sk = self
     472             749 :                     .env
     473             749 :                     .safekeepers
     474             749 :                     .iter()
     475            1048 :                     .find(|node| node.id == sk_id)
     476             749 :                     .ok_or_else(|| anyhow!("safekeeper {sk_id} does not exist"))?;
     477             749 :                 safekeeper_connstrings.push(format!("127.0.0.1:{}", sk.get_compute_port()));
     478                 :             }
     479              89 :         }
     480                 : 
     481                 :         // Create spec file
     482             641 :         let spec = ComputeSpec {
     483             641 :             skip_pg_catalog_updates: self.skip_pg_catalog_updates,
     484             641 :             format_version: 1.0,
     485             641 :             operation_uuid: None,
     486             641 :             cluster: Cluster {
     487             641 :                 cluster_id: None, // project ID: not used
     488             641 :                 name: None,       // project name: not used
     489             641 :                 state: None,
     490             641 :                 roles: vec![],
     491             641 :                 databases: vec![],
     492             641 :                 settings: None,
     493             641 :                 postgresql_conf: Some(postgresql_conf),
     494             641 :             },
     495             641 :             delta_operations: None,
     496             641 :             tenant_id: Some(self.tenant_id),
     497             641 :             timeline_id: Some(self.timeline_id),
     498             641 :             mode: self.mode,
     499             641 :             pageserver_connstring: Some(pageserver_connstring),
     500             641 :             safekeeper_connstrings,
     501             641 :             storage_auth_token: auth_token.clone(),
     502             641 :             remote_extensions: None,
     503             641 :         };
     504             641 :         let spec_path = self.endpoint_path().join("spec.json");
     505             641 :         std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;
     506                 : 
     507                 :         // Open log file. We'll redirect the stdout and stderr of `compute_ctl` to it.
     508             641 :         let logfile = std::fs::OpenOptions::new()
     509             641 :             .create(true)
     510             641 :             .append(true)
     511             641 :             .open(self.endpoint_path().join("compute.log"))?;
     512                 : 
     513                 :         // Launch compute_ctl
     514             641 :         println!("Starting postgres node at '{}'", self.connstr());
     515             641 :         let mut cmd = Command::new(self.env.neon_distrib_dir.join("compute_ctl"));
     516             641 :         cmd.args(["--http-port", &self.http_address.port().to_string()])
     517             641 :             .args(["--pgdata", self.pgdata().to_str().unwrap()])
     518             641 :             .args(["--connstr", &self.connstr()])
     519             641 :             .args([
     520             641 :                 "--spec-path",
     521             641 :                 self.endpoint_path().join("spec.json").to_str().unwrap(),
     522             641 :             ])
     523             641 :             .args([
     524             641 :                 "--pgbin",
     525             641 :                 self.env
     526             641 :                     .pg_bin_dir(self.pg_version)?
     527             641 :                     .join("postgres")
     528             641 :                     .to_str()
     529             641 :                     .unwrap(),
     530             641 :             ])
     531             641 :             .stdin(std::process::Stdio::null())
     532             641 :             .stderr(logfile.try_clone()?)
     533             641 :             .stdout(logfile);
     534                 : 
     535             641 :         if let Some(remote_ext_config) = remote_ext_config {
     536 UBC           0 :             cmd.args(["--remote-ext-config", remote_ext_config]);
     537 CBC         641 :         }
     538                 : 
     539             641 :         let child = cmd.spawn()?;
     540                 : 
     541                 :         // Write down the pid so we can wait for it when we want to stop
     542                 :         // TODO use background_process::start_process instead
     543             641 :         let pid = child.id();
     544             641 :         let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
     545             641 :         std::fs::write(pidfile_path, pid.to_string())?;
     546                 : 
     547                 :         // Wait for it to start
     548             641 :         let mut attempt = 0;
     549                 :         const ATTEMPT_INTERVAL: Duration = Duration::from_millis(100);
     550                 :         const MAX_ATTEMPTS: u32 = 10 * 30; // Wait up to 30 s
     551                 :         loop {
     552            2069 :             attempt += 1;
     553            2069 :             match self.get_status() {
     554            1436 :                 Ok(state) => {
     555            1436 :                     match state.status {
     556                 :                         ComputeStatus::Init => {
     557             795 :                             if attempt == MAX_ATTEMPTS {
     558 UBC           0 :                                 bail!("compute startup timed out; still in Init state");
     559 CBC         795 :                             }
     560                 :                             // keep retrying
     561                 :                         }
     562                 :                         ComputeStatus::Running => {
     563                 :                             // All good!
     564             632 :                             break;
     565                 :                         }
     566                 :                         ComputeStatus::Failed => {
     567               9 :                             bail!(
     568               9 :                                 "compute startup failed: {}",
     569               9 :                                 state
     570               9 :                                     .error
     571               9 :                                     .as_deref()
     572               9 :                                     .unwrap_or("<no error from compute_ctl>")
     573               9 :                             );
     574                 :                         }
     575                 :                         ComputeStatus::Empty
     576                 :                         | ComputeStatus::ConfigurationPending
     577                 :                         | ComputeStatus::Configuration => {
     578 UBC           0 :                             bail!("unexpected compute status: {:?}", state.status)
     579                 :                         }
     580                 :                     }
     581                 :                 }
     582 CBC         633 :                 Err(e) => {
     583             633 :                     if attempt == MAX_ATTEMPTS {
     584 UBC           0 :                         return Err(e).context("timed out waiting to connect to compute_ctl HTTP");
     585 CBC         633 :                     }
     586                 :                 }
     587                 :             }
     588            1428 :             std::thread::sleep(ATTEMPT_INTERVAL);
     589                 :         }
     590                 : 
     591             632 :         Ok(())
     592             641 :     }
     593                 : 
     594                 :     // Call the /status HTTP API
     595            2069 :     pub fn get_status(&self) -> Result<ComputeState> {
     596            2069 :         let client = reqwest::blocking::Client::new();
     597                 : 
     598            2069 :         let response = client
     599            2069 :             .request(
     600            2069 :                 reqwest::Method::GET,
     601            2069 :                 format!(
     602            2069 :                     "http://{}:{}/status",
     603            2069 :                     self.http_address.ip(),
     604            2069 :                     self.http_address.port()
     605            2069 :                 ),
     606            2069 :             )
     607            2069 :             .send()?;
     608                 : 
     609                 :         // Interpret the response
     610            1436 :         let status = response.status();
     611            1436 :         if !(status.is_client_error() || status.is_server_error()) {
     612            1436 :             Ok(response.json()?)
     613                 :         } else {
     614                 :             // reqwest does not export its error construction utility functions, so let's craft the message ourselves
     615 UBC           0 :             let url = response.url().to_owned();
     616               0 :             let msg = match response.text() {
     617               0 :                 Ok(err_body) => format!("Error: {}", err_body),
     618               0 :                 Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
     619                 :             };
     620               0 :             Err(anyhow::anyhow!(msg))
     621                 :         }
     622 CBC        2069 :     }
     623                 : 
     624             634 :     pub fn stop(&self, destroy: bool) -> Result<()> {
     625             634 :         // If we are going to destroy data directory,
     626             634 :         // use immediate shutdown mode, otherwise,
     627             634 :         // shutdown gracefully to leave the data directory sane.
     628             634 :         //
     629             634 :         // Postgres is always started from scratch, so stop
     630             634 :         // without destroy only used for testing and debugging.
     631             634 :         //
     632             634 :         if destroy {
     633              31 :             self.pg_ctl(&["-m", "immediate", "stop"], &None)?;
     634              31 :             println!(
     635              31 :                 "Destroying postgres data directory '{}'",
     636              31 :                 self.pgdata().to_str().unwrap()
     637              31 :             );
     638              31 :             std::fs::remove_dir_all(self.endpoint_path())?;
     639                 :         } else {
     640             603 :             self.pg_ctl(&["stop"], &None)?;
     641                 :         }
     642             613 :         Ok(())
     643             634 :     }
     644                 : 
     645            1282 :     pub fn connstr(&self) -> String {
     646            1282 :         format!(
     647            1282 :             "postgresql://{}@{}:{}/{}",
     648            1282 :             "cloud_admin",
     649            1282 :             self.pg_address.ip(),
     650            1282 :             self.pg_address.port(),
     651            1282 :             "postgres"
     652            1282 :         )
     653            1282 :     }
     654                 : }
        

Generated by: LCOV version 2.1-beta