LCOV - differential code coverage report
Current view: top level - control_plane/src - endpoint.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 89.7 % 516 463 53 463
Current Date: 2024-01-09 02:06:09 Functions: 73.5 % 49 36 13 36
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : //! Code to manage compute endpoints
       2                 : //!
       3                 : //! In the local test environment, the data for each endpoint is stored in
       4                 : //!
       5                 : //! ```text
       6                 : //!   .neon/endpoints/<endpoint id>
       7                 : //! ```
       8                 : //!
       9                 : //! Some basic information about the endpoint, like the tenant and timeline IDs,
      10                 : //! are stored in the `endpoint.json` file. The `endpoint.json` file is created
      11                 : //! when the endpoint is created, and doesn't change afterwards.
      12                 : //!
      13                 : //! The endpoint is managed by the `compute_ctl` binary. When an endpoint is
      14                 : //! started, we launch `compute_ctl` It synchronizes the safekeepers, downloads
      15                 : //! the basebackup from the pageserver to initialize the the data directory, and
      16                 : //! finally launches the PostgreSQL process. It watches the PostgreSQL process
      17                 : //! until it exits.
      18                 : //!
      19                 : //! When an endpoint is created, a `postgresql.conf` file is also created in
      20                 : //! the endpoint's directory. The file can be modified before starting PostgreSQL.
      21                 : //! However, the `postgresql.conf` file in the endpoint directory is not used directly
      22                 : //! by PostgreSQL. It is passed to `compute_ctl`, and `compute_ctl` writes another
      23                 : //! copy of it in the data directory.
      24                 : //!
      25                 : //! Directory contents:
      26                 : //!
      27                 : //! ```text
      28                 : //! .neon/endpoints/main/
      29                 : //!     compute.log               - log output of `compute_ctl` and `postgres`
      30                 : //!     endpoint.json             - serialized `EndpointConf` struct
      31                 : //!     postgresql.conf           - postgresql settings
      32                 : //!     spec.json                 - passed to `compute_ctl`
      33                 : //!     pgdata/
      34                 : //!         postgresql.conf       - copy of postgresql.conf created by `compute_ctl`
      35                 : //!         zenith.signal
      36                 : //!         <other PostgreSQL files>
      37                 : //! ```
      38                 : //!
      39                 : use std::collections::BTreeMap;
      40                 : use std::net::SocketAddr;
      41                 : use std::net::TcpStream;
      42                 : use std::path::PathBuf;
      43                 : use std::process::Command;
      44                 : use std::sync::Arc;
      45                 : use std::time::Duration;
      46                 : 
      47                 : use anyhow::{anyhow, bail, Context, Result};
      48                 : use compute_api::spec::RemoteExtSpec;
      49                 : use nix::sys::signal::kill;
      50                 : use nix::sys::signal::Signal;
      51                 : use serde::{Deserialize, Serialize};
      52                 : use utils::id::{NodeId, TenantId, TimelineId};
      53                 : 
      54                 : use crate::local_env::LocalEnv;
      55                 : use crate::pageserver::PageServerNode;
      56                 : use crate::postgresql_conf::PostgresConf;
      57                 : 
      58                 : use compute_api::responses::{ComputeState, ComputeStatus};
      59                 : use compute_api::spec::{Cluster, ComputeMode, ComputeSpec};
      60                 : 
      61                 : // contents of a endpoint.json file
      62 CBC       73549 : #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
      63                 : pub struct EndpointConf {
      64                 :     endpoint_id: String,
      65                 :     tenant_id: TenantId,
      66                 :     timeline_id: TimelineId,
      67                 :     mode: ComputeMode,
      68                 :     pg_port: u16,
      69                 :     http_port: u16,
      70                 :     pg_version: u32,
      71                 :     skip_pg_catalog_updates: bool,
      72                 :     pageserver_id: NodeId,
      73                 : }
      74                 : 
      75                 : //
      76                 : // ComputeControlPlane
      77                 : //
      78                 : pub struct ComputeControlPlane {
      79                 :     base_port: u16,
      80                 : 
      81                 :     // endpoint ID is the key
      82                 :     pub endpoints: BTreeMap<String, Arc<Endpoint>>,
      83                 : 
      84                 :     env: LocalEnv,
      85                 : }
      86                 : 
      87                 : impl ComputeControlPlane {
      88                 :     // Load current endpoints from the endpoints/ subdirectories
      89            1810 :     pub fn load(env: LocalEnv) -> Result<ComputeControlPlane> {
      90            1810 :         let mut endpoints = BTreeMap::default();
      91            3654 :         for endpoint_dir in std::fs::read_dir(env.endpoints_path())
      92            1810 :             .with_context(|| format!("failed to list {}", env.endpoints_path().display()))?
      93            3654 :         {
      94            3654 :             let ep = Endpoint::from_dir_entry(endpoint_dir?, &env)?;
      95            3654 :             endpoints.insert(ep.endpoint_id.clone(), Arc::new(ep));
      96                 :         }
      97                 : 
      98            1810 :         Ok(ComputeControlPlane {
      99            1810 :             base_port: 55431,
     100            1810 :             endpoints,
     101            1810 :             env,
     102            1810 :         })
     103            1810 :     }
     104                 : 
     105               6 :     fn get_port(&mut self) -> u16 {
     106               6 :         1 + self
     107               6 :             .endpoints
     108               6 :             .values()
     109               6 :             .map(|ep| std::cmp::max(ep.pg_address.port(), ep.http_address.port()))
     110               6 :             .max()
     111               6 :             .unwrap_or(self.base_port)
     112               6 :     }
     113                 : 
     114                 :     #[allow(clippy::too_many_arguments)]
     115             484 :     pub fn new_endpoint(
     116             484 :         &mut self,
     117             484 :         endpoint_id: &str,
     118             484 :         tenant_id: TenantId,
     119             484 :         timeline_id: TimelineId,
     120             484 :         pg_port: Option<u16>,
     121             484 :         http_port: Option<u16>,
     122             484 :         pg_version: u32,
     123             484 :         mode: ComputeMode,
     124             484 :         pageserver_id: NodeId,
     125             484 :     ) -> Result<Arc<Endpoint>> {
     126             484 :         let pg_port = pg_port.unwrap_or_else(|| self.get_port());
     127             484 :         let http_port = http_port.unwrap_or_else(|| self.get_port() + 1);
     128             484 :         let pageserver =
     129             484 :             PageServerNode::from_env(&self.env, self.env.get_pageserver_conf(pageserver_id)?);
     130                 : 
     131             484 :         let ep = Arc::new(Endpoint {
     132             484 :             endpoint_id: endpoint_id.to_owned(),
     133             484 :             pg_address: SocketAddr::new("127.0.0.1".parse().unwrap(), pg_port),
     134             484 :             http_address: SocketAddr::new("127.0.0.1".parse().unwrap(), http_port),
     135             484 :             env: self.env.clone(),
     136             484 :             pageserver,
     137             484 :             timeline_id,
     138             484 :             mode,
     139             484 :             tenant_id,
     140             484 :             pg_version,
     141             484 :             // We don't setup roles and databases in the spec locally, so we don't need to
     142             484 :             // do catalog updates. Catalog updates also include check availability
     143             484 :             // data creation. Yet, we have tests that check that size and db dump
     144             484 :             // before and after start are the same. So, skip catalog updates,
     145             484 :             // with this we basically test a case of waking up an idle compute, where
     146             484 :             // we also skip catalog updates in the cloud.
     147             484 :             skip_pg_catalog_updates: true,
     148             484 :         });
     149             484 : 
     150             484 :         ep.create_endpoint_dir()?;
     151                 :         std::fs::write(
     152             484 :             ep.endpoint_path().join("endpoint.json"),
     153             484 :             serde_json::to_string_pretty(&EndpointConf {
     154             484 :                 endpoint_id: endpoint_id.to_string(),
     155             484 :                 tenant_id,
     156             484 :                 timeline_id,
     157             484 :                 mode,
     158             484 :                 http_port,
     159             484 :                 pg_port,
     160             484 :                 pg_version,
     161             484 :                 skip_pg_catalog_updates: true,
     162             484 :                 pageserver_id,
     163             484 :             })?,
     164 UBC           0 :         )?;
     165                 :         std::fs::write(
     166 CBC         484 :             ep.endpoint_path().join("postgresql.conf"),
     167             484 :             ep.setup_pg_conf()?.to_string(),
     168 UBC           0 :         )?;
     169                 : 
     170 CBC         484 :         self.endpoints
     171             484 :             .insert(ep.endpoint_id.clone(), Arc::clone(&ep));
     172             484 : 
     173             484 :         Ok(ep)
     174             484 :     }
     175                 : 
     176            1051 :     pub fn check_conflicting_endpoints(
     177            1051 :         &self,
     178            1051 :         mode: ComputeMode,
     179            1051 :         tenant_id: TenantId,
     180            1051 :         timeline_id: TimelineId,
     181            1051 :     ) -> Result<()> {
     182            1051 :         if matches!(mode, ComputeMode::Primary) {
     183                 :             // this check is not complete, as you could have a concurrent attempt at
     184                 :             // creating another primary, both reading the state before checking it here,
     185                 :             // but it's better than nothing.
     186            1414 :             let mut duplicates = self.endpoints.iter().filter(|(_k, v)| {
     187            1414 :                 v.tenant_id == tenant_id
     188            1331 :                     && v.timeline_id == timeline_id
     189             906 :                     && v.mode == mode
     190             902 :                     && v.status() != "stopped"
     191            1414 :             });
     192                 : 
     193             949 :             if let Some((key, _)) = duplicates.next() {
     194              25 :                 bail!("attempting to create a duplicate primary endpoint on tenant {tenant_id}, timeline {timeline_id}: endpoint {key:?} exists already. please don't do this, it is not supported.");
     195             924 :             }
     196             102 :         }
     197            1026 :         Ok(())
     198            1051 :     }
     199                 : }
     200                 : 
     201                 : ///////////////////////////////////////////////////////////////////////////////
     202                 : 
     203 UBC           0 : #[derive(Debug)]
     204                 : pub struct Endpoint {
     205                 :     /// used as the directory name
     206                 :     endpoint_id: String,
     207                 :     pub tenant_id: TenantId,
     208                 :     pub timeline_id: TimelineId,
     209                 :     pub mode: ComputeMode,
     210                 : 
     211                 :     // port and address of the Postgres server and `compute_ctl`'s HTTP API
     212                 :     pub pg_address: SocketAddr,
     213                 :     pub http_address: SocketAddr,
     214                 : 
     215                 :     // postgres major version in the format: 14, 15, etc.
     216                 :     pg_version: u32,
     217                 : 
     218                 :     // These are not part of the endpoint as such, but the environment
     219                 :     // the endpoint runs in.
     220                 :     pub env: LocalEnv,
     221                 :     pageserver: PageServerNode,
     222                 : 
     223                 :     // Optimizations
     224                 :     skip_pg_catalog_updates: bool,
     225                 : }
     226                 : 
     227                 : impl Endpoint {
     228 CBC        3654 :     fn from_dir_entry(entry: std::fs::DirEntry, env: &LocalEnv) -> Result<Endpoint> {
     229            3654 :         if !entry.file_type()?.is_dir() {
     230 UBC           0 :             anyhow::bail!(
     231               0 :                 "Endpoint::from_dir_entry failed: '{}' is not a directory",
     232               0 :                 entry.path().display()
     233               0 :             );
     234 CBC        3654 :         }
     235            3654 : 
     236            3654 :         // parse data directory name
     237            3654 :         let fname = entry.file_name();
     238            3654 :         let endpoint_id = fname.to_str().unwrap().to_string();
     239                 : 
     240                 :         // Read the endpoint.json file
     241            3654 :         let conf: EndpointConf =
     242            3654 :             serde_json::from_slice(&std::fs::read(entry.path().join("endpoint.json"))?)?;
     243                 : 
     244            3654 :         let pageserver =
     245            3654 :             PageServerNode::from_env(env, env.get_pageserver_conf(conf.pageserver_id)?);
     246                 : 
     247            3654 :         Ok(Endpoint {
     248            3654 :             pg_address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.pg_port),
     249            3654 :             http_address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.http_port),
     250            3654 :             endpoint_id,
     251            3654 :             env: env.clone(),
     252            3654 :             pageserver,
     253            3654 :             timeline_id: conf.timeline_id,
     254            3654 :             mode: conf.mode,
     255            3654 :             tenant_id: conf.tenant_id,
     256            3654 :             pg_version: conf.pg_version,
     257            3654 :             skip_pg_catalog_updates: conf.skip_pg_catalog_updates,
     258            3654 :         })
     259            3654 :     }
     260                 : 
     261             484 :     fn create_endpoint_dir(&self) -> Result<()> {
     262             484 :         std::fs::create_dir_all(self.endpoint_path()).with_context(|| {
     263 UBC           0 :             format!(
     264               0 :                 "could not create endpoint directory {}",
     265               0 :                 self.endpoint_path().display()
     266               0 :             )
     267 CBC         484 :         })
     268             484 :     }
     269                 : 
     270                 :     // Generate postgresql.conf with default configuration
     271             484 :     fn setup_pg_conf(&self) -> Result<PostgresConf> {
     272             484 :         let mut conf = PostgresConf::new();
     273             484 :         conf.append("max_wal_senders", "10");
     274             484 :         conf.append("wal_log_hints", "off");
     275             484 :         conf.append("max_replication_slots", "10");
     276             484 :         conf.append("hot_standby", "on");
     277             484 :         conf.append("shared_buffers", "1MB");
     278             484 :         conf.append("fsync", "off");
     279             484 :         conf.append("max_connections", "100");
     280             484 :         conf.append("wal_level", "logical");
     281             484 :         // wal_sender_timeout is the maximum time to wait for WAL replication.
     282             484 :         // It also defines how often the walreciever will send a feedback message to the wal sender.
     283             484 :         conf.append("wal_sender_timeout", "5s");
     284             484 :         conf.append("listen_addresses", &self.pg_address.ip().to_string());
     285             484 :         conf.append("port", &self.pg_address.port().to_string());
     286             484 :         conf.append("wal_keep_size", "0");
     287             484 :         // walproposer panics when basebackup is invalid, it is pointless to restart in this case.
     288             484 :         conf.append("restart_after_crash", "off");
     289             484 : 
     290             484 :         // Load the 'neon' extension
     291             484 :         conf.append("shared_preload_libraries", "neon");
     292             484 : 
     293             484 :         conf.append_line("");
     294             484 :         // Replication-related configurations, such as WAL sending
     295             484 :         match &self.mode {
     296                 :             ComputeMode::Primary => {
     297                 :                 // Configure backpressure
     298                 :                 // - Replication write lag depends on how fast the walreceiver can process incoming WAL.
     299                 :                 //   This lag determines latency of get_page_at_lsn. Speed of applying WAL is about 10MB/sec,
     300                 :                 //   so to avoid expiration of 1 minute timeout, this lag should not be larger than 600MB.
     301                 :                 //   Actually latency should be much smaller (better if < 1sec). But we assume that recently
     302                 :                 //   updates pages are not requested from pageserver.
     303                 :                 // - Replication flush lag depends on speed of persisting data by checkpointer (creation of
     304                 :                 //   delta/image layers) and advancing disk_consistent_lsn. Safekeepers are able to
     305                 :                 //   remove/archive WAL only beyond disk_consistent_lsn. Too large a lag can cause long
     306                 :                 //   recovery time (in case of pageserver crash) and disk space overflow at safekeepers.
     307                 :                 // - Replication apply lag depends on speed of uploading changes to S3 by uploader thread.
     308                 :                 //   To be able to restore database in case of pageserver node crash, safekeeper should not
     309                 :                 //   remove WAL beyond this point. Too large lag can cause space exhaustion in safekeepers
     310                 :                 //   (if they are not able to upload WAL to S3).
     311             433 :                 conf.append("max_replication_write_lag", "15MB");
     312             433 :                 conf.append("max_replication_flush_lag", "10GB");
     313             433 : 
     314             433 :                 if !self.env.safekeepers.is_empty() {
     315             433 :                     // Configure Postgres to connect to the safekeepers
     316             433 :                     conf.append("synchronous_standby_names", "walproposer");
     317             433 : 
     318             433 :                     let safekeepers = self
     319             433 :                         .env
     320             433 :                         .safekeepers
     321             433 :                         .iter()
     322             572 :                         .map(|sk| format!("localhost:{}", sk.get_compute_port()))
     323             433 :                         .collect::<Vec<String>>()
     324             433 :                         .join(",");
     325             433 :                     conf.append("neon.safekeepers", &safekeepers);
     326             433 :                 } else {
     327 UBC           0 :                     // We only use setup without safekeepers for tests,
     328               0 :                     // and don't care about data durability on pageserver,
     329               0 :                     // so set more relaxed synchronous_commit.
     330               0 :                     conf.append("synchronous_commit", "remote_write");
     331               0 : 
     332               0 :                     // Configure the node to stream WAL directly to the pageserver
     333               0 :                     // This isn't really a supported configuration, but can be useful for
     334               0 :                     // testing.
     335               0 :                     conf.append("synchronous_standby_names", "pageserver");
     336               0 :                 }
     337                 :             }
     338 CBC          49 :             ComputeMode::Static(lsn) => {
     339              49 :                 conf.append("recovery_target_lsn", &lsn.to_string());
     340              49 :             }
     341                 :             ComputeMode::Replica => {
     342               2 :                 assert!(!self.env.safekeepers.is_empty());
     343                 : 
     344                 :                 // TODO: use future host field from safekeeper spec
     345                 :                 // Pass the list of safekeepers to the replica so that it can connect to any of them,
     346                 :                 // whichever is available.
     347               2 :                 let sk_ports = self
     348               2 :                     .env
     349               2 :                     .safekeepers
     350               2 :                     .iter()
     351               2 :                     .map(|x| x.get_compute_port().to_string())
     352               2 :                     .collect::<Vec<_>>()
     353               2 :                     .join(",");
     354               2 :                 let sk_hosts = vec!["localhost"; self.env.safekeepers.len()].join(",");
     355               2 : 
     356               2 :                 let connstr = format!(
     357               2 :                     "host={} port={} options='-c timeline_id={} tenant_id={}' application_name=replica replication=true",
     358               2 :                     sk_hosts,
     359               2 :                     sk_ports,
     360               2 :                     &self.timeline_id.to_string(),
     361               2 :                     &self.tenant_id.to_string(),
     362               2 :                 );
     363               2 : 
     364               2 :                 let slot_name = format!("repl_{}_", self.timeline_id);
     365               2 :                 conf.append("primary_conninfo", connstr.as_str());
     366               2 :                 conf.append("primary_slot_name", slot_name.as_str());
     367               2 :                 conf.append("hot_standby", "on");
     368               2 :                 // prefetching of blocks referenced in WAL doesn't make sense for us
     369               2 :                 // Neon hot standby ignores pages that are not in the shared_buffers
     370               2 :                 if self.pg_version >= 15 {
     371 UBC           0 :                     conf.append("recovery_prefetch", "off");
     372 CBC           2 :                 }
     373                 :             }
     374                 :         }
     375                 : 
     376             484 :         Ok(conf)
     377             484 :     }
     378                 : 
     379            9100 :     pub fn endpoint_path(&self) -> PathBuf {
     380            9100 :         self.env.endpoints_path().join(&self.endpoint_id)
     381            9100 :     }
     382                 : 
     383            3164 :     pub fn pgdata(&self) -> PathBuf {
     384            3164 :         self.endpoint_path().join("pgdata")
     385            3164 :     }
     386                 : 
     387            1447 :     pub fn status(&self) -> &str {
     388            1447 :         let timeout = Duration::from_millis(300);
     389            1447 :         let has_pidfile = self.pgdata().join("postmaster.pid").exists();
     390            1447 :         let can_connect = TcpStream::connect_timeout(&self.pg_address, timeout).is_ok();
     391            1447 : 
     392            1447 :         match (has_pidfile, can_connect) {
     393              25 :             (true, true) => "running",
     394            1422 :             (false, false) => "stopped",
     395 UBC           0 :             (true, false) => "crashed",
     396               0 :             (false, true) => "running, no pidfile",
     397                 :         }
     398 CBC        1447 :     }
     399                 : 
     400             536 :     fn pg_ctl(&self, args: &[&str], auth_token: &Option<String>) -> Result<()> {
     401             536 :         let pg_ctl_path = self.env.pg_bin_dir(self.pg_version)?.join("pg_ctl");
     402             536 :         let mut cmd = Command::new(&pg_ctl_path);
     403             536 :         cmd.args(
     404             536 :             [
     405             536 :                 &[
     406             536 :                     "-D",
     407             536 :                     self.pgdata().to_str().unwrap(),
     408             536 :                     "-w", //wait till pg_ctl actually does what was asked
     409             536 :                 ],
     410             536 :                 args,
     411             536 :             ]
     412             536 :             .concat(),
     413             536 :         )
     414             536 :         .env_clear()
     415             536 :         .env(
     416             536 :             "LD_LIBRARY_PATH",
     417             536 :             self.env.pg_lib_dir(self.pg_version)?.to_str().unwrap(),
     418             536 :         )
     419             536 :         .env(
     420             536 :             "DYLD_LIBRARY_PATH",
     421             536 :             self.env.pg_lib_dir(self.pg_version)?.to_str().unwrap(),
     422                 :         );
     423                 : 
     424                 :         // Pass authentication token used for the connections to pageserver and safekeepers
     425             536 :         if let Some(token) = auth_token {
     426 UBC           0 :             cmd.env("NEON_AUTH_TOKEN", token);
     427 CBC         536 :         }
     428                 : 
     429             536 :         let pg_ctl = cmd
     430             536 :             .output()
     431             536 :             .context(format!("{} failed", pg_ctl_path.display()))?;
     432             536 :         if !pg_ctl.status.success() {
     433 UBC           0 :             anyhow::bail!(
     434               0 :                 "pg_ctl failed, exit code: {}, stdout: {}, stderr: {}",
     435               0 :                 pg_ctl.status,
     436               0 :                 String::from_utf8_lossy(&pg_ctl.stdout),
     437               0 :                 String::from_utf8_lossy(&pg_ctl.stderr),
     438               0 :             );
     439 CBC         536 :         }
     440             536 : 
     441             536 :         Ok(())
     442             536 :     }
     443                 : 
     444             536 :     fn wait_for_compute_ctl_to_exit(&self, send_sigterm: bool) -> Result<()> {
     445             536 :         // TODO use background_process::stop_process instead
     446             536 :         let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
     447             536 :         let pid: u32 = std::fs::read_to_string(pidfile_path)?.parse()?;
     448             536 :         let pid = nix::unistd::Pid::from_raw(pid as i32);
     449             536 :         if send_sigterm {
     450              27 :             kill(pid, Signal::SIGTERM).ok();
     451             509 :         }
     452             536 :         crate::background_process::wait_until_stopped("compute_ctl", pid)?;
     453             536 :         Ok(())
     454             536 :     }
     455                 : 
     456             762 :     fn read_postgresql_conf(&self) -> Result<String> {
     457             762 :         // Slurp the endpoints/<endpoint id>/postgresql.conf file into
     458             762 :         // memory. We will include it in the spec file that we pass to
     459             762 :         // `compute_ctl`, and `compute_ctl` will write it to the postgresql.conf
     460             762 :         // in the data directory.
     461             762 :         let postgresql_conf_path = self.endpoint_path().join("postgresql.conf");
     462             762 :         match std::fs::read(&postgresql_conf_path) {
     463             762 :             Ok(content) => Ok(String::from_utf8(content)?),
     464 UBC           0 :             Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok("".to_string()),
     465               0 :             Err(e) => Err(anyhow::Error::new(e).context(format!(
     466               0 :                 "failed to read config file in {}",
     467               0 :                 postgresql_conf_path.to_str().unwrap()
     468               0 :             ))),
     469                 :         }
     470 CBC         762 :     }
     471                 : 
     472             545 :     pub async fn start(
     473             545 :         &self,
     474             545 :         auth_token: &Option<String>,
     475             545 :         safekeepers: Vec<NodeId>,
     476             545 :         remote_ext_config: Option<&String>,
     477             545 :     ) -> Result<()> {
     478             545 :         if self.status() == "running" {
     479 UBC           0 :             anyhow::bail!("The endpoint is already running");
     480 CBC         545 :         }
     481                 : 
     482             545 :         let postgresql_conf = self.read_postgresql_conf()?;
     483                 : 
     484                 :         // We always start the compute node from scratch, so if the Postgres
     485                 :         // data dir exists from a previous launch, remove it first.
     486             545 :         if self.pgdata().exists() {
     487              64 :             std::fs::remove_dir_all(self.pgdata())?;
     488             481 :         }
     489                 : 
     490             545 :         let pageserver_connstring = {
     491             545 :             let config = &self.pageserver.pg_connection_config;
     492             545 :             let (host, port) = (config.host(), config.port());
     493             545 : 
     494             545 :             // NOTE: avoid spaces in connection string, because it is less error prone if we forward it somewhere.
     495             545 :             format!("postgresql://no_user@{host}:{port}")
     496             545 :         };
     497             545 :         let mut safekeeper_connstrings = Vec::new();
     498             545 :         if self.mode == ComputeMode::Primary {
     499            1129 :             for sk_id in safekeepers {
     500             635 :                 let sk = self
     501             635 :                     .env
     502             635 :                     .safekeepers
     503             635 :                     .iter()
     504             850 :                     .find(|node| node.id == sk_id)
     505             635 :                     .ok_or_else(|| anyhow!("safekeeper {sk_id} does not exist"))?;
     506             635 :                 safekeeper_connstrings.push(format!("127.0.0.1:{}", sk.get_compute_port()));
     507                 :             }
     508              51 :         }
     509                 : 
     510                 :         // check for file remote_extensions_spec.json
     511                 :         // if it is present, read it and pass to compute_ctl
     512             545 :         let remote_extensions_spec_path = self.endpoint_path().join("remote_extensions_spec.json");
     513             545 :         let remote_extensions_spec = std::fs::File::open(remote_extensions_spec_path);
     514                 :         let remote_extensions: Option<RemoteExtSpec>;
     515                 : 
     516             545 :         if let Ok(spec_file) = remote_extensions_spec {
     517               1 :             remote_extensions = serde_json::from_reader(spec_file).ok();
     518             544 :         } else {
     519             544 :             remote_extensions = None;
     520             544 :         };
     521                 : 
     522                 :         // Create spec file
     523             545 :         let spec = ComputeSpec {
     524             545 :             skip_pg_catalog_updates: self.skip_pg_catalog_updates,
     525             545 :             format_version: 1.0,
     526             545 :             operation_uuid: None,
     527             545 :             features: vec![],
     528             545 :             cluster: Cluster {
     529             545 :                 cluster_id: None, // project ID: not used
     530             545 :                 name: None,       // project name: not used
     531             545 :                 state: None,
     532             545 :                 roles: vec![],
     533             545 :                 databases: vec![],
     534             545 :                 settings: None,
     535             545 :                 postgresql_conf: Some(postgresql_conf),
     536             545 :             },
     537             545 :             delta_operations: None,
     538             545 :             tenant_id: Some(self.tenant_id),
     539             545 :             timeline_id: Some(self.timeline_id),
     540             545 :             mode: self.mode,
     541             545 :             pageserver_connstring: Some(pageserver_connstring),
     542             545 :             safekeeper_connstrings,
     543             545 :             storage_auth_token: auth_token.clone(),
     544             545 :             remote_extensions,
     545             545 :             pgbouncer_settings: None,
     546             545 :         };
     547             545 :         let spec_path = self.endpoint_path().join("spec.json");
     548             545 :         std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;
     549                 : 
     550                 :         // Open log file. We'll redirect the stdout and stderr of `compute_ctl` to it.
     551             545 :         let logfile = std::fs::OpenOptions::new()
     552             545 :             .create(true)
     553             545 :             .append(true)
     554             545 :             .open(self.endpoint_path().join("compute.log"))?;
     555                 : 
     556                 :         // Launch compute_ctl
     557             545 :         println!("Starting postgres node at '{}'", self.connstr());
     558             545 :         let mut cmd = Command::new(self.env.neon_distrib_dir.join("compute_ctl"));
     559             545 :         cmd.args(["--http-port", &self.http_address.port().to_string()])
     560             545 :             .args(["--pgdata", self.pgdata().to_str().unwrap()])
     561             545 :             .args(["--connstr", &self.connstr()])
     562             545 :             .args([
     563             545 :                 "--spec-path",
     564             545 :                 self.endpoint_path().join("spec.json").to_str().unwrap(),
     565             545 :             ])
     566             545 :             .args([
     567             545 :                 "--pgbin",
     568             545 :                 self.env
     569             545 :                     .pg_bin_dir(self.pg_version)?
     570             545 :                     .join("postgres")
     571             545 :                     .to_str()
     572             545 :                     .unwrap(),
     573             545 :             ])
     574             545 :             .stdin(std::process::Stdio::null())
     575             545 :             .stderr(logfile.try_clone()?)
     576             545 :             .stdout(logfile);
     577                 : 
     578             545 :         if let Some(remote_ext_config) = remote_ext_config {
     579               1 :             cmd.args(["--remote-ext-config", remote_ext_config]);
     580             544 :         }
     581                 : 
     582             545 :         let child = cmd.spawn()?;
     583                 : 
     584                 :         // Write down the pid so we can wait for it when we want to stop
     585                 :         // TODO use background_process::start_process instead
     586             545 :         let pid = child.id();
     587             545 :         let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
     588             545 :         std::fs::write(pidfile_path, pid.to_string())?;
     589                 : 
     590                 :         // Wait for it to start
     591             545 :         let mut attempt = 0;
     592                 :         const ATTEMPT_INTERVAL: Duration = Duration::from_millis(100);
     593                 :         const MAX_ATTEMPTS: u32 = 10 * 30; // Wait up to 30 s
     594                 :         loop {
     595            2420 :             attempt += 1;
     596            6170 :             match self.get_status().await {
     597            1875 :                 Ok(state) => {
     598            1875 :                     match state.status {
     599                 :                         ComputeStatus::Init => {
     600            1330 :                             if attempt == MAX_ATTEMPTS {
     601 UBC           0 :                                 bail!("compute startup timed out; still in Init state");
     602 CBC        1330 :                             }
     603                 :                             // keep retrying
     604                 :                         }
     605                 :                         ComputeStatus::Running => {
     606                 :                             // All good!
     607             537 :                             break;
     608                 :                         }
     609                 :                         ComputeStatus::Failed => {
     610               8 :                             bail!(
     611               8 :                                 "compute startup failed: {}",
     612               8 :                                 state
     613               8 :                                     .error
     614               8 :                                     .as_deref()
     615               8 :                                     .unwrap_or("<no error from compute_ctl>")
     616               8 :                             );
     617                 :                         }
     618                 :                         ComputeStatus::Empty
     619                 :                         | ComputeStatus::ConfigurationPending
     620                 :                         | ComputeStatus::Configuration => {
     621 UBC           0 :                             bail!("unexpected compute status: {:?}", state.status)
     622                 :                         }
     623                 :                     }
     624                 :                 }
     625 CBC         545 :                 Err(e) => {
     626             545 :                     if attempt == MAX_ATTEMPTS {
     627 UBC           0 :                         return Err(e).context("timed out waiting to connect to compute_ctl HTTP");
     628 CBC         545 :                     }
     629                 :                 }
     630                 :             }
     631            1875 :             std::thread::sleep(ATTEMPT_INTERVAL);
     632                 :         }
     633                 : 
     634             537 :         Ok(())
     635             545 :     }
     636                 : 
     637                 :     // Call the /status HTTP API
     638            2420 :     pub async fn get_status(&self) -> Result<ComputeState> {
     639            2420 :         let client = reqwest::Client::new();
     640                 : 
     641            2420 :         let response = client
     642            2420 :             .request(
     643            2420 :                 reqwest::Method::GET,
     644            2420 :                 format!(
     645            2420 :                     "http://{}:{}/status",
     646            2420 :                     self.http_address.ip(),
     647            2420 :                     self.http_address.port()
     648            2420 :                 ),
     649            2420 :             )
     650            2420 :             .send()
     651            6170 :             .await?;
     652                 : 
     653                 :         // Interpret the response
     654            1875 :         let status = response.status();
     655            1875 :         if !(status.is_client_error() || status.is_server_error()) {
     656            1875 :             Ok(response.json().await?)
     657                 :         } else {
     658                 :             // reqwest does not export its error construction utility functions, so let's craft the message ourselves
     659 UBC           0 :             let url = response.url().to_owned();
     660               0 :             let msg = match response.text().await {
     661               0 :                 Ok(err_body) => format!("Error: {}", err_body),
     662               0 :                 Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
     663                 :             };
     664               0 :             Err(anyhow::anyhow!(msg))
     665                 :         }
     666 CBC        2420 :     }
     667                 : 
     668             217 :     pub async fn reconfigure(&self, pageserver_id: Option<NodeId>) -> Result<()> {
     669             217 :         let mut spec: ComputeSpec = {
     670             217 :             let spec_path = self.endpoint_path().join("spec.json");
     671             217 :             let file = std::fs::File::open(spec_path)?;
     672             217 :             serde_json::from_reader(file)?
     673                 :         };
     674                 : 
     675             217 :         let postgresql_conf = self.read_postgresql_conf()?;
     676             217 :         spec.cluster.postgresql_conf = Some(postgresql_conf);
     677                 : 
     678             217 :         if let Some(pageserver_id) = pageserver_id {
     679             217 :             let endpoint_config_path = self.endpoint_path().join("endpoint.json");
     680             217 :             let mut endpoint_conf: EndpointConf = {
     681             217 :                 let file = std::fs::File::open(&endpoint_config_path)?;
     682             217 :                 serde_json::from_reader(file)?
     683                 :             };
     684             217 :             endpoint_conf.pageserver_id = pageserver_id;
     685             217 :             std::fs::write(
     686             217 :                 endpoint_config_path,
     687             217 :                 serde_json::to_string_pretty(&endpoint_conf)?,
     688 UBC           0 :             )?;
     689                 : 
     690 CBC         217 :             let pageserver =
     691             217 :                 PageServerNode::from_env(&self.env, self.env.get_pageserver_conf(pageserver_id)?);
     692             217 :             let ps_http_conf = &pageserver.pg_connection_config;
     693             217 :             let (host, port) = (ps_http_conf.host(), ps_http_conf.port());
     694             217 :             spec.pageserver_connstring = Some(format!("postgresql://no_user@{host}:{port}"));
     695 UBC           0 :         }
     696                 : 
     697 CBC         217 :         let client = reqwest::Client::new();
     698             217 :         let response = client
     699             217 :             .post(format!(
     700             217 :                 "http://{}:{}/configure",
     701             217 :                 self.http_address.ip(),
     702             217 :                 self.http_address.port()
     703             217 :             ))
     704             217 :             .body(format!(
     705             217 :                 "{{\"spec\":{}}}",
     706             217 :                 serde_json::to_string_pretty(&spec)?
     707                 :             ))
     708             217 :             .send()
     709             651 :             .await?;
     710                 : 
     711             217 :         let status = response.status();
     712             217 :         if !(status.is_client_error() || status.is_server_error()) {
     713             217 :             Ok(())
     714                 :         } else {
     715 UBC           0 :             let url = response.url().to_owned();
     716               0 :             let msg = match response.text().await {
     717               0 :                 Ok(err_body) => format!("Error: {}", err_body),
     718               0 :                 Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
     719                 :             };
     720               0 :             Err(anyhow::anyhow!(msg))
     721                 :         }
     722 CBC         217 :     }
     723                 : 
     724             536 :     pub fn stop(&self, destroy: bool) -> Result<()> {
     725             536 :         // If we are going to destroy data directory,
     726             536 :         // use immediate shutdown mode, otherwise,
     727             536 :         // shutdown gracefully to leave the data directory sane.
     728             536 :         //
     729             536 :         // Postgres is always started from scratch, so stop
     730             536 :         // without destroy only used for testing and debugging.
     731             536 :         //
     732             536 :         self.pg_ctl(
     733             536 :             if destroy {
     734              27 :                 &["-m", "immediate", "stop"]
     735                 :             } else {
     736             509 :                 &["stop"]
     737                 :             },
     738             536 :             &None,
     739 UBC           0 :         )?;
     740                 : 
     741                 :         // Also wait for the compute_ctl process to die. It might have some
     742                 :         // cleanup work to do after postgres stops, like syncing safekeepers,
     743                 :         // etc.
     744                 :         //
     745                 :         // If destroying, send it SIGTERM before waiting. Sometimes we do *not*
     746                 :         // want this cleanup: tests intentionally do stop when majority of
     747                 :         // safekeepers is down, so sync-safekeepers would hang otherwise. This
     748                 :         // could be a separate flag though.
     749 CBC         536 :         self.wait_for_compute_ctl_to_exit(destroy)?;
     750             536 :         if destroy {
     751              27 :             println!(
     752              27 :                 "Destroying postgres data directory '{}'",
     753              27 :                 self.pgdata().to_str().unwrap()
     754              27 :             );
     755              27 :             std::fs::remove_dir_all(self.endpoint_path())?;
     756             509 :         }
     757             536 :         Ok(())
     758             536 :     }
     759                 : 
     760            1090 :     pub fn connstr(&self) -> String {
     761            1090 :         format!(
     762            1090 :             "postgresql://{}@{}:{}/{}",
     763            1090 :             "cloud_admin",
     764            1090 :             self.pg_address.ip(),
     765            1090 :             self.pg_address.port(),
     766            1090 :             "postgres"
     767            1090 :         )
     768            1090 :     }
     769                 : }
        

Generated by: LCOV version 2.1-beta