LCOV - code coverage report
Current view: top level - control_plane/src - endpoint.rs (source / functions) Coverage Total Hit
Test: 2e3a7638747e564a4f6d1af1cc0c3b3438fbb740.info Lines: 0.0 % 583 0
Test Date: 2024-11-20 01:36:58 Functions: 0.0 % 48 0

            Line data    Source code
       1              : //! Code to manage compute endpoints
       2              : //!
       3              : //! In the local test environment, the data for each endpoint is stored in
       4              : //!
       5              : //! ```text
       6              : //!   .neon/endpoints/<endpoint id>
       7              : //! ```
       8              : //!
       9              : //! Some basic information about the endpoint, like the tenant and timeline IDs,
      10              : //! are stored in the `endpoint.json` file. The `endpoint.json` file is created
      11              : //! when the endpoint is created, and doesn't change afterwards.
      12              : //!
      13              : //! The endpoint is managed by the `compute_ctl` binary. When an endpoint is
      14              : //! started, we launch `compute_ctl`. It synchronizes the safekeepers, downloads
      15              : //! the basebackup from the pageserver to initialize the data directory, and
      16              : //! finally launches the PostgreSQL process. It watches the PostgreSQL process
      17              : //! until it exits.
      18              : //!
      19              : //! When an endpoint is created, a `postgresql.conf` file is also created in
      20              : //! the endpoint's directory. The file can be modified before starting PostgreSQL.
      21              : //! However, the `postgresql.conf` file in the endpoint directory is not used directly
      22              : //! by PostgreSQL. It is passed to `compute_ctl`, and `compute_ctl` writes another
      23              : //! copy of it in the data directory.
      24              : //!
      25              : //! Directory contents:
      26              : //!
      27              : //! ```text
      28              : //! .neon/endpoints/main/
      29              : //!     compute.log               - log output of `compute_ctl` and `postgres`
      30              : //!     endpoint.json             - serialized `EndpointConf` struct
      31              : //!     postgresql.conf           - postgresql settings
      32              : //!     spec.json                 - passed to `compute_ctl`
      33              : //!     pgdata/
      34              : //!         postgresql.conf       - copy of postgresql.conf created by `compute_ctl`
      35              : //!         zenith.signal
      36              : //!         <other PostgreSQL files>
      37              : //! ```
      38              : //!
      39              : use std::collections::BTreeMap;
      40              : use std::net::SocketAddr;
      41              : use std::net::TcpStream;
      42              : use std::path::PathBuf;
      43              : use std::process::Command;
      44              : use std::str::FromStr;
      45              : use std::sync::Arc;
      46              : use std::time::Duration;
      47              : 
      48              : use anyhow::{anyhow, bail, Context, Result};
      49              : use compute_api::spec::Database;
      50              : use compute_api::spec::PgIdent;
      51              : use compute_api::spec::RemoteExtSpec;
      52              : use compute_api::spec::Role;
      53              : use nix::sys::signal::kill;
      54              : use nix::sys::signal::Signal;
      55              : use pageserver_api::shard::ShardStripeSize;
      56              : use serde::{Deserialize, Serialize};
      57              : use url::Host;
      58              : use utils::id::{NodeId, TenantId, TimelineId};
      59              : 
      60              : use crate::local_env::LocalEnv;
      61              : use crate::postgresql_conf::PostgresConf;
      62              : use crate::storage_controller::StorageController;
      63              : 
      64              : use compute_api::responses::{ComputeState, ComputeStatus};
      65              : use compute_api::spec::{Cluster, ComputeFeature, ComputeMode, ComputeSpec};
      66              : 
      67              : // Contents of an `endpoint.json` file.
                        //
                        // Serialized as pretty-printed JSON by `ComputeControlPlane::new_endpoint` and
                        // deserialized again by `Endpoint::from_dir_entry`.
      68            0 : #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
      69              : pub struct EndpointConf {
                            // ID the endpoint was created with. Note that `Endpoint::from_dir_entry`
                            // takes the ID from the directory name rather than from this field.
      70              :     endpoint_id: String,
      71              :     tenant_id: TenantId,
      72              :     timeline_id: TimelineId,
      73              :     mode: ComputeMode,
                            // Port of the Postgres server (becomes `Endpoint::pg_address` on 127.0.0.1).
      74              :     pg_port: u16,
                            // Port of the HTTP API (becomes `Endpoint::http_address` on 127.0.0.1).
      75              :     http_port: u16,
                            // Postgres major version, e.g. 14 or 15.
      76              :     pg_version: u32,
      77              :     skip_pg_catalog_updates: bool,
                            // Feature flags; `new_endpoint` always writes an empty list.
      78              :     features: Vec<ComputeFeature>,
      79              : }
      80              : 
      81              : //
      82              : // ComputeControlPlane
      83              : //
                        // Holds the set of known endpoints together with the local environment they
                        // live in. Populated from disk by `ComputeControlPlane::load` and extended by
                        // `ComputeControlPlane::new_endpoint`.
      84              : pub struct ComputeControlPlane {
                            // Fallback used by `get_port` when no endpoint exists yet: the first
                            // automatically assigned port is `base_port + 1`.
      85              :     base_port: u16,
      86              : 
      87              :     // endpoint ID is the key
      88              :     pub endpoints: BTreeMap<String, Arc<Endpoint>>,
      89              : 
                            // Local environment; cloned into every `Endpoint` created by `new_endpoint`.
      90              :     env: LocalEnv,
      91              : }
      92              : 
      93              : impl ComputeControlPlane {
      94              :     // Load current endpoints from the endpoints/ subdirectories
      95            0 :     pub fn load(env: LocalEnv) -> Result<ComputeControlPlane> {
      96            0 :         let mut endpoints = BTreeMap::default();
      97            0 :         for endpoint_dir in std::fs::read_dir(env.endpoints_path())
      98            0 :             .with_context(|| format!("failed to list {}", env.endpoints_path().display()))?
      99              :         {
     100            0 :             let ep_res = Endpoint::from_dir_entry(endpoint_dir?, &env);
     101            0 :             let ep = match ep_res {
     102            0 :                 Ok(ep) => ep,
     103            0 :                 Err(e) => match e.downcast::<std::io::Error>() {
     104            0 :                     Ok(e) => {
     105            0 :                         // A parallel task could delete an endpoint while we have just scanned the directory
     106            0 :                         if e.kind() == std::io::ErrorKind::NotFound {
     107            0 :                             continue;
     108              :                         } else {
     109            0 :                             Err(e)?
     110              :                         }
     111              :                     }
     112            0 :                     Err(e) => Err(e)?,
     113              :                 },
     114              :             };
     115            0 :             endpoints.insert(ep.endpoint_id.clone(), Arc::new(ep));
     116              :         }
     117              : 
     118            0 :         Ok(ComputeControlPlane {
     119            0 :             base_port: 55431,
     120            0 :             endpoints,
     121            0 :             env,
     122            0 :         })
     123            0 :     }
     124              : 
     125            0 :     fn get_port(&mut self) -> u16 {
     126            0 :         1 + self
     127            0 :             .endpoints
     128            0 :             .values()
     129            0 :             .map(|ep| std::cmp::max(ep.pg_address.port(), ep.http_address.port()))
     130            0 :             .max()
     131            0 :             .unwrap_or(self.base_port)
     132            0 :     }
     133              : 
     134              :     #[allow(clippy::too_many_arguments)]
     135            0 :     pub fn new_endpoint(
     136            0 :         &mut self,
     137            0 :         endpoint_id: &str,
     138            0 :         tenant_id: TenantId,
     139            0 :         timeline_id: TimelineId,
     140            0 :         pg_port: Option<u16>,
     141            0 :         http_port: Option<u16>,
     142            0 :         pg_version: u32,
     143            0 :         mode: ComputeMode,
     144            0 :         skip_pg_catalog_updates: bool,
     145            0 :     ) -> Result<Arc<Endpoint>> {
     146            0 :         let pg_port = pg_port.unwrap_or_else(|| self.get_port());
     147            0 :         let http_port = http_port.unwrap_or_else(|| self.get_port() + 1);
     148            0 :         let ep = Arc::new(Endpoint {
     149            0 :             endpoint_id: endpoint_id.to_owned(),
     150            0 :             pg_address: SocketAddr::new("127.0.0.1".parse().unwrap(), pg_port),
     151            0 :             http_address: SocketAddr::new("127.0.0.1".parse().unwrap(), http_port),
     152            0 :             env: self.env.clone(),
     153            0 :             timeline_id,
     154            0 :             mode,
     155            0 :             tenant_id,
     156            0 :             pg_version,
     157            0 :             // We don't setup roles and databases in the spec locally, so we don't need to
     158            0 :             // do catalog updates. Catalog updates also include check availability
     159            0 :             // data creation. Yet, we have tests that check that size and db dump
     160            0 :             // before and after start are the same. So, skip catalog updates,
     161            0 :             // with this we basically test a case of waking up an idle compute, where
     162            0 :             // we also skip catalog updates in the cloud.
     163            0 :             skip_pg_catalog_updates,
     164            0 :             features: vec![],
     165            0 :         });
     166            0 : 
     167            0 :         ep.create_endpoint_dir()?;
     168              :         std::fs::write(
     169            0 :             ep.endpoint_path().join("endpoint.json"),
     170            0 :             serde_json::to_string_pretty(&EndpointConf {
     171            0 :                 endpoint_id: endpoint_id.to_string(),
     172            0 :                 tenant_id,
     173            0 :                 timeline_id,
     174            0 :                 mode,
     175            0 :                 http_port,
     176            0 :                 pg_port,
     177            0 :                 pg_version,
     178            0 :                 skip_pg_catalog_updates,
     179            0 :                 features: vec![],
     180            0 :             })?,
     181            0 :         )?;
     182              :         std::fs::write(
     183            0 :             ep.endpoint_path().join("postgresql.conf"),
     184            0 :             ep.setup_pg_conf()?.to_string(),
     185            0 :         )?;
     186              : 
     187            0 :         self.endpoints
     188            0 :             .insert(ep.endpoint_id.clone(), Arc::clone(&ep));
     189            0 : 
     190            0 :         Ok(ep)
     191            0 :     }
     192              : 
     193            0 :     pub fn check_conflicting_endpoints(
     194            0 :         &self,
     195            0 :         mode: ComputeMode,
     196            0 :         tenant_id: TenantId,
     197            0 :         timeline_id: TimelineId,
     198            0 :     ) -> Result<()> {
     199            0 :         if matches!(mode, ComputeMode::Primary) {
     200              :             // this check is not complete, as you could have a concurrent attempt at
     201              :             // creating another primary, both reading the state before checking it here,
     202              :             // but it's better than nothing.
     203            0 :             let mut duplicates = self.endpoints.iter().filter(|(_k, v)| {
     204            0 :                 v.tenant_id == tenant_id
     205            0 :                     && v.timeline_id == timeline_id
     206            0 :                     && v.mode == mode
     207            0 :                     && v.status() != EndpointStatus::Stopped
     208            0 :             });
     209              : 
     210            0 :             if let Some((key, _)) = duplicates.next() {
     211            0 :                 bail!("attempting to create a duplicate primary endpoint on tenant {tenant_id}, timeline {timeline_id}: endpoint {key:?} exists already. please don't do this, it is not supported.");
     212            0 :             }
     213            0 :         }
     214            0 :         Ok(())
     215            0 :     }
     216              : }
     217              : 
     218              : ///////////////////////////////////////////////////////////////////////////////
     219              : 
                        // A single compute endpoint of the local environment. Instances are built
                        // either by `ComputeControlPlane::new_endpoint` or, from an existing
                        // directory, by `Endpoint::from_dir_entry`.
     220              : #[derive(Debug)]
     221              : pub struct Endpoint {
     222              :     /// used as the directory name
     223              :     endpoint_id: String,
     224              :     pub tenant_id: TenantId,
     225              :     pub timeline_id: TimelineId,
     226              :     pub mode: ComputeMode,
     227              : 
     228              :     // port and address of the Postgres server and `compute_ctl`'s HTTP API
                            // (both constructors in this file bind them to 127.0.0.1)
     229              :     pub pg_address: SocketAddr,
     230              :     pub http_address: SocketAddr,
     231              : 
     232              :     // postgres major version in the format: 14, 15, etc.
     233              :     pg_version: u32,
     234              : 
     235              :     // These are not part of the endpoint as such, but the environment
     236              :     // the endpoint runs in.
     237              :     pub env: LocalEnv,
     238              : 
     239              :     // Optimizations
     240              :     skip_pg_catalog_updates: bool,
     241              : 
     242              :     // Feature flags
     243              :     features: Vec<ComputeFeature>,
     244              : }
     245              : 
     246              : #[derive(PartialEq, Eq)]
     247              : pub enum EndpointStatus {
     248              :     Running,
     249              :     Stopped,
     250              :     Crashed,
     251              :     RunningNoPidfile,
     252              : }
     253              : 
     254              : impl std::fmt::Display for EndpointStatus {
     255            0 :     fn fmt(&self, writer: &mut std::fmt::Formatter) -> std::fmt::Result {
     256            0 :         let s = match self {
     257            0 :             Self::Running => "running",
     258            0 :             Self::Stopped => "stopped",
     259            0 :             Self::Crashed => "crashed",
     260            0 :             Self::RunningNoPidfile => "running, no pidfile",
     261              :         };
     262            0 :         write!(writer, "{}", s)
     263            0 :     }
     264              : }
     265              : 
     266              : impl Endpoint {
     267            0 :     fn from_dir_entry(entry: std::fs::DirEntry, env: &LocalEnv) -> Result<Endpoint> {
     268            0 :         if !entry.file_type()?.is_dir() {
     269            0 :             anyhow::bail!(
     270            0 :                 "Endpoint::from_dir_entry failed: '{}' is not a directory",
     271            0 :                 entry.path().display()
     272            0 :             );
     273            0 :         }
     274            0 : 
     275            0 :         // parse data directory name
     276            0 :         let fname = entry.file_name();
     277            0 :         let endpoint_id = fname.to_str().unwrap().to_string();
     278              : 
     279              :         // Read the endpoint.json file
     280            0 :         let conf: EndpointConf =
     281            0 :             serde_json::from_slice(&std::fs::read(entry.path().join("endpoint.json"))?)?;
     282              : 
     283            0 :         Ok(Endpoint {
     284            0 :             pg_address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.pg_port),
     285            0 :             http_address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.http_port),
     286            0 :             endpoint_id,
     287            0 :             env: env.clone(),
     288            0 :             timeline_id: conf.timeline_id,
     289            0 :             mode: conf.mode,
     290            0 :             tenant_id: conf.tenant_id,
     291            0 :             pg_version: conf.pg_version,
     292            0 :             skip_pg_catalog_updates: conf.skip_pg_catalog_updates,
     293            0 :             features: conf.features,
     294            0 :         })
     295            0 :     }
     296              : 
     297            0 :     fn create_endpoint_dir(&self) -> Result<()> {
     298            0 :         std::fs::create_dir_all(self.endpoint_path()).with_context(|| {
     299            0 :             format!(
     300            0 :                 "could not create endpoint directory {}",
     301            0 :                 self.endpoint_path().display()
     302            0 :             )
     303            0 :         })
     304            0 :     }
     305              : 
     306              :     // Generate postgresql.conf with default configuration
     307            0 :     fn setup_pg_conf(&self) -> Result<PostgresConf> {
     308            0 :         let mut conf = PostgresConf::new();
     309            0 :         conf.append("max_wal_senders", "10");
     310            0 :         conf.append("wal_log_hints", "off");
     311            0 :         conf.append("max_replication_slots", "10");
     312            0 :         conf.append("hot_standby", "on");
     313            0 :         conf.append("shared_buffers", "1MB");
     314            0 :         conf.append("fsync", "off");
     315            0 :         conf.append("max_connections", "100");
     316            0 :         conf.append("wal_level", "logical");
     317            0 :         // wal_sender_timeout is the maximum time to wait for WAL replication.
     318            0 :         // It also defines how often the walreciever will send a feedback message to the wal sender.
     319            0 :         conf.append("wal_sender_timeout", "5s");
     320            0 :         conf.append("listen_addresses", &self.pg_address.ip().to_string());
     321            0 :         conf.append("port", &self.pg_address.port().to_string());
     322            0 :         conf.append("wal_keep_size", "0");
     323            0 :         // walproposer panics when basebackup is invalid, it is pointless to restart in this case.
     324            0 :         conf.append("restart_after_crash", "off");
     325            0 : 
     326            0 :         // Load the 'neon' extension
     327            0 :         conf.append("shared_preload_libraries", "neon");
     328            0 : 
     329            0 :         conf.append_line("");
     330            0 :         // Replication-related configurations, such as WAL sending
     331            0 :         match &self.mode {
     332              :             ComputeMode::Primary => {
     333              :                 // Configure backpressure
     334              :                 // - Replication write lag depends on how fast the walreceiver can process incoming WAL.
     335              :                 //   This lag determines latency of get_page_at_lsn. Speed of applying WAL is about 10MB/sec,
     336              :                 //   so to avoid expiration of 1 minute timeout, this lag should not be larger than 600MB.
     337              :                 //   Actually latency should be much smaller (better if < 1sec). But we assume that recently
     338              :                 //   updates pages are not requested from pageserver.
     339              :                 // - Replication flush lag depends on speed of persisting data by checkpointer (creation of
     340              :                 //   delta/image layers) and advancing disk_consistent_lsn. Safekeepers are able to
     341              :                 //   remove/archive WAL only beyond disk_consistent_lsn. Too large a lag can cause long
     342              :                 //   recovery time (in case of pageserver crash) and disk space overflow at safekeepers.
     343              :                 // - Replication apply lag depends on speed of uploading changes to S3 by uploader thread.
     344              :                 //   To be able to restore database in case of pageserver node crash, safekeeper should not
     345              :                 //   remove WAL beyond this point. Too large lag can cause space exhaustion in safekeepers
     346              :                 //   (if they are not able to upload WAL to S3).
     347            0 :                 conf.append("max_replication_write_lag", "15MB");
     348            0 :                 conf.append("max_replication_flush_lag", "10GB");
     349            0 : 
     350            0 :                 if !self.env.safekeepers.is_empty() {
     351            0 :                     // Configure Postgres to connect to the safekeepers
     352            0 :                     conf.append("synchronous_standby_names", "walproposer");
     353            0 : 
     354            0 :                     let safekeepers = self
     355            0 :                         .env
     356            0 :                         .safekeepers
     357            0 :                         .iter()
     358            0 :                         .map(|sk| format!("localhost:{}", sk.get_compute_port()))
     359            0 :                         .collect::<Vec<String>>()
     360            0 :                         .join(",");
     361            0 :                     conf.append("neon.safekeepers", &safekeepers);
     362            0 :                 } else {
     363            0 :                     // We only use setup without safekeepers for tests,
     364            0 :                     // and don't care about data durability on pageserver,
     365            0 :                     // so set more relaxed synchronous_commit.
     366            0 :                     conf.append("synchronous_commit", "remote_write");
     367            0 : 
     368            0 :                     // Configure the node to stream WAL directly to the pageserver
     369            0 :                     // This isn't really a supported configuration, but can be useful for
     370            0 :                     // testing.
     371            0 :                     conf.append("synchronous_standby_names", "pageserver");
     372            0 :                 }
     373              :             }
     374            0 :             ComputeMode::Static(lsn) => {
     375            0 :                 conf.append("recovery_target_lsn", &lsn.to_string());
     376            0 :             }
     377              :             ComputeMode::Replica => {
     378            0 :                 assert!(!self.env.safekeepers.is_empty());
     379              : 
     380              :                 // TODO: use future host field from safekeeper spec
     381              :                 // Pass the list of safekeepers to the replica so that it can connect to any of them,
     382              :                 // whichever is available.
     383            0 :                 let sk_ports = self
     384            0 :                     .env
     385            0 :                     .safekeepers
     386            0 :                     .iter()
     387            0 :                     .map(|x| x.get_compute_port().to_string())
     388            0 :                     .collect::<Vec<_>>()
     389            0 :                     .join(",");
     390            0 :                 let sk_hosts = vec!["localhost"; self.env.safekeepers.len()].join(",");
     391            0 : 
     392            0 :                 let connstr = format!(
     393            0 :                     "host={} port={} options='-c timeline_id={} tenant_id={}' application_name=replica replication=true",
     394            0 :                     sk_hosts,
     395            0 :                     sk_ports,
     396            0 :                     &self.timeline_id.to_string(),
     397            0 :                     &self.tenant_id.to_string(),
     398            0 :                 );
     399            0 : 
     400            0 :                 let slot_name = format!("repl_{}_", self.timeline_id);
     401            0 :                 conf.append("primary_conninfo", connstr.as_str());
     402            0 :                 conf.append("primary_slot_name", slot_name.as_str());
     403            0 :                 conf.append("hot_standby", "on");
     404            0 :                 // prefetching of blocks referenced in WAL doesn't make sense for us
     405            0 :                 // Neon hot standby ignores pages that are not in the shared_buffers
     406            0 :                 if self.pg_version >= 15 {
     407            0 :                     conf.append("recovery_prefetch", "off");
     408            0 :                 }
     409              :             }
     410              :         }
     411              : 
     412            0 :         Ok(conf)
     413            0 :     }
     414              : 
     415            0 :     pub fn endpoint_path(&self) -> PathBuf {
     416            0 :         self.env.endpoints_path().join(&self.endpoint_id)
     417            0 :     }
     418              : 
     419            0 :     pub fn pgdata(&self) -> PathBuf {
     420            0 :         self.endpoint_path().join("pgdata")
     421            0 :     }
     422              : 
     423            0 :     pub fn status(&self) -> EndpointStatus {
     424            0 :         let timeout = Duration::from_millis(300);
     425            0 :         let has_pidfile = self.pgdata().join("postmaster.pid").exists();
     426            0 :         let can_connect = TcpStream::connect_timeout(&self.pg_address, timeout).is_ok();
     427            0 : 
     428            0 :         match (has_pidfile, can_connect) {
     429            0 :             (true, true) => EndpointStatus::Running,
     430            0 :             (false, false) => EndpointStatus::Stopped,
     431            0 :             (true, false) => EndpointStatus::Crashed,
     432            0 :             (false, true) => EndpointStatus::RunningNoPidfile,
     433              :         }
     434            0 :     }
     435              : 
     436            0 :     fn pg_ctl(&self, args: &[&str], auth_token: &Option<String>) -> Result<()> {
     437            0 :         let pg_ctl_path = self.env.pg_bin_dir(self.pg_version)?.join("pg_ctl");
     438            0 :         let mut cmd = Command::new(&pg_ctl_path);
     439            0 :         cmd.args(
     440            0 :             [
     441            0 :                 &[
     442            0 :                     "-D",
     443            0 :                     self.pgdata().to_str().unwrap(),
     444            0 :                     "-w", //wait till pg_ctl actually does what was asked
     445            0 :                 ],
     446            0 :                 args,
     447            0 :             ]
     448            0 :             .concat(),
     449            0 :         )
     450            0 :         .env_clear()
     451            0 :         .env(
     452            0 :             "LD_LIBRARY_PATH",
     453            0 :             self.env.pg_lib_dir(self.pg_version)?.to_str().unwrap(),
     454            0 :         )
     455            0 :         .env(
     456            0 :             "DYLD_LIBRARY_PATH",
     457            0 :             self.env.pg_lib_dir(self.pg_version)?.to_str().unwrap(),
     458              :         );
     459              : 
     460              :         // Pass authentication token used for the connections to pageserver and safekeepers
     461            0 :         if let Some(token) = auth_token {
     462            0 :             cmd.env("NEON_AUTH_TOKEN", token);
     463            0 :         }
     464              : 
     465            0 :         let pg_ctl = cmd
     466            0 :             .output()
     467            0 :             .context(format!("{} failed", pg_ctl_path.display()))?;
     468            0 :         if !pg_ctl.status.success() {
     469            0 :             anyhow::bail!(
     470            0 :                 "pg_ctl failed, exit code: {}, stdout: {}, stderr: {}",
     471            0 :                 pg_ctl.status,
     472            0 :                 String::from_utf8_lossy(&pg_ctl.stdout),
     473            0 :                 String::from_utf8_lossy(&pg_ctl.stderr),
     474            0 :             );
     475            0 :         }
     476            0 : 
     477            0 :         Ok(())
     478            0 :     }
     479              : 
     480            0 :     fn wait_for_compute_ctl_to_exit(&self, send_sigterm: bool) -> Result<()> {
     481            0 :         // TODO use background_process::stop_process instead: https://github.com/neondatabase/neon/pull/6482
     482            0 :         let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
     483            0 :         let pid: u32 = std::fs::read_to_string(pidfile_path)?.parse()?;
     484            0 :         let pid = nix::unistd::Pid::from_raw(pid as i32);
     485            0 :         if send_sigterm {
     486            0 :             kill(pid, Signal::SIGTERM).ok();
     487            0 :         }
     488            0 :         crate::background_process::wait_until_stopped("compute_ctl", pid)?;
     489            0 :         Ok(())
     490            0 :     }
     491              : 
     492            0 :     fn read_postgresql_conf(&self) -> Result<String> {
     493            0 :         // Slurp the endpoints/<endpoint id>/postgresql.conf file into
     494            0 :         // memory. We will include it in the spec file that we pass to
     495            0 :         // `compute_ctl`, and `compute_ctl` will write it to the postgresql.conf
     496            0 :         // in the data directory.
     497            0 :         let postgresql_conf_path = self.endpoint_path().join("postgresql.conf");
     498            0 :         match std::fs::read(&postgresql_conf_path) {
     499            0 :             Ok(content) => Ok(String::from_utf8(content)?),
     500            0 :             Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok("".to_string()),
     501            0 :             Err(e) => Err(anyhow::Error::new(e).context(format!(
     502            0 :                 "failed to read config file in {}",
     503            0 :                 postgresql_conf_path.to_str().unwrap()
     504            0 :             ))),
     505              :         }
     506            0 :     }
     507              : 
     508            0 :     fn build_pageserver_connstr(pageservers: &[(Host, u16)]) -> String {
     509            0 :         pageservers
     510            0 :             .iter()
     511            0 :             .map(|(host, port)| format!("postgresql://no_user@{host}:{port}"))
     512            0 :             .collect::<Vec<_>>()
     513            0 :             .join(",")
     514            0 :     }
     515              : 
     516              :     /// Map safekeepers ids to the actual connection strings.
     517            0 :     fn build_safekeepers_connstrs(&self, sk_ids: Vec<NodeId>) -> Result<Vec<String>> {
     518            0 :         let mut safekeeper_connstrings = Vec::new();
     519            0 :         if self.mode == ComputeMode::Primary {
     520            0 :             for sk_id in sk_ids {
     521            0 :                 let sk = self
     522            0 :                     .env
     523            0 :                     .safekeepers
     524            0 :                     .iter()
     525            0 :                     .find(|node| node.id == sk_id)
     526            0 :                     .ok_or_else(|| anyhow!("safekeeper {sk_id} does not exist"))?;
     527            0 :                 safekeeper_connstrings.push(format!("127.0.0.1:{}", sk.get_compute_port()));
     528              :             }
     529            0 :         }
     530            0 :         Ok(safekeeper_connstrings)
     531            0 :     }
     532              : 
     533            0 :     pub async fn start(
     534            0 :         &self,
     535            0 :         auth_token: &Option<String>,
     536            0 :         safekeepers: Vec<NodeId>,
     537            0 :         pageservers: Vec<(Host, u16)>,
     538            0 :         remote_ext_config: Option<&String>,
     539            0 :         shard_stripe_size: usize,
     540            0 :         create_test_user: bool,
     541            0 :     ) -> Result<()> {
     542            0 :         if self.status() == EndpointStatus::Running {
     543            0 :             anyhow::bail!("The endpoint is already running");
     544            0 :         }
     545              : 
     546            0 :         let postgresql_conf = self.read_postgresql_conf()?;
     547              : 
     548              :         // We always start the compute node from scratch, so if the Postgres
     549              :         // data dir exists from a previous launch, remove it first.
     550            0 :         if self.pgdata().exists() {
     551            0 :             std::fs::remove_dir_all(self.pgdata())?;
     552            0 :         }
     553              : 
     554            0 :         let pageserver_connstring = Self::build_pageserver_connstr(&pageservers);
     555            0 :         assert!(!pageserver_connstring.is_empty());
     556              : 
     557            0 :         let safekeeper_connstrings = self.build_safekeepers_connstrs(safekeepers)?;
     558              : 
     559              :         // check for file remote_extensions_spec.json
     560              :         // if it is present, read it and pass to compute_ctl
     561            0 :         let remote_extensions_spec_path = self.endpoint_path().join("remote_extensions_spec.json");
     562            0 :         let remote_extensions_spec = std::fs::File::open(remote_extensions_spec_path);
     563              :         let remote_extensions: Option<RemoteExtSpec>;
     564              : 
     565            0 :         if let Ok(spec_file) = remote_extensions_spec {
     566            0 :             remote_extensions = serde_json::from_reader(spec_file).ok();
     567            0 :         } else {
     568            0 :             remote_extensions = None;
     569            0 :         };
     570              : 
     571              :         // Create spec file
     572            0 :         let spec = ComputeSpec {
     573            0 :             skip_pg_catalog_updates: self.skip_pg_catalog_updates,
     574            0 :             format_version: 1.0,
     575            0 :             operation_uuid: None,
     576            0 :             features: self.features.clone(),
     577            0 :             swap_size_bytes: None,
     578            0 :             disk_quota_bytes: None,
     579            0 :             cluster: Cluster {
     580            0 :                 cluster_id: None, // project ID: not used
     581            0 :                 name: None,       // project name: not used
     582            0 :                 state: None,
     583            0 :                 roles: if create_test_user {
     584            0 :                     vec![Role {
     585            0 :                         name: PgIdent::from_str("test").unwrap(),
     586            0 :                         encrypted_password: None,
     587            0 :                         options: None,
     588            0 :                     }]
     589              :                 } else {
     590            0 :                     Vec::new()
     591              :                 },
     592            0 :                 databases: if create_test_user {
     593            0 :                     vec![Database {
     594            0 :                         name: PgIdent::from_str("neondb").unwrap(),
     595            0 :                         owner: PgIdent::from_str("test").unwrap(),
     596            0 :                         options: None,
     597            0 :                         restrict_conn: false,
     598            0 :                         invalid: false,
     599            0 :                     }]
     600              :                 } else {
     601            0 :                     Vec::new()
     602              :                 },
     603            0 :                 settings: None,
     604            0 :                 postgresql_conf: Some(postgresql_conf),
     605            0 :             },
     606            0 :             delta_operations: None,
     607            0 :             tenant_id: Some(self.tenant_id),
     608            0 :             timeline_id: Some(self.timeline_id),
     609            0 :             mode: self.mode,
     610            0 :             pageserver_connstring: Some(pageserver_connstring),
     611            0 :             safekeeper_connstrings,
     612            0 :             storage_auth_token: auth_token.clone(),
     613            0 :             remote_extensions,
     614            0 :             pgbouncer_settings: None,
     615            0 :             shard_stripe_size: Some(shard_stripe_size),
     616            0 :             local_proxy_config: None,
     617            0 :         };
     618            0 :         let spec_path = self.endpoint_path().join("spec.json");
     619            0 :         std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;
     620              : 
     621              :         // Open log file. We'll redirect the stdout and stderr of `compute_ctl` to it.
     622            0 :         let logfile = std::fs::OpenOptions::new()
     623            0 :             .create(true)
     624            0 :             .append(true)
     625            0 :             .open(self.endpoint_path().join("compute.log"))?;
     626              : 
     627              :         // Launch compute_ctl
     628            0 :         let conn_str = self.connstr("cloud_admin", "postgres");
     629            0 :         println!("Starting postgres node at '{}'", conn_str);
     630            0 :         if create_test_user {
     631            0 :             let conn_str = self.connstr("test", "neondb");
     632            0 :             println!("Also at '{}'", conn_str);
     633            0 :         }
     634            0 :         let mut cmd = Command::new(self.env.neon_distrib_dir.join("compute_ctl"));
     635            0 :         cmd.args(["--http-port", &self.http_address.port().to_string()])
     636            0 :             .args(["--pgdata", self.pgdata().to_str().unwrap()])
     637            0 :             .args(["--connstr", &conn_str])
     638            0 :             .args([
     639            0 :                 "--spec-path",
     640            0 :                 self.endpoint_path().join("spec.json").to_str().unwrap(),
     641            0 :             ])
     642            0 :             .args([
     643            0 :                 "--pgbin",
     644            0 :                 self.env
     645            0 :                     .pg_bin_dir(self.pg_version)?
     646            0 :                     .join("postgres")
     647            0 :                     .to_str()
     648            0 :                     .unwrap(),
     649            0 :             ])
     650            0 :             .stdin(std::process::Stdio::null())
     651            0 :             .stderr(logfile.try_clone()?)
     652            0 :             .stdout(logfile);
     653              : 
     654            0 :         if let Some(remote_ext_config) = remote_ext_config {
     655            0 :             cmd.args(["--remote-ext-config", remote_ext_config]);
     656            0 :         }
     657              : 
     658            0 :         let child = cmd.spawn()?;
     659              :         // set up a scopeguard to kill & wait for the child in case we panic or bail below
     660            0 :         let child = scopeguard::guard(child, |mut child| {
     661            0 :             println!("SIGKILL & wait the started process");
     662            0 :             (|| {
     663            0 :                 // TODO: use another signal that can be caught by the child so it can clean up any children it spawned
     664            0 :                 child.kill().context("SIGKILL child")?;
     665            0 :                 child.wait().context("wait() for child process")?;
     666            0 :                 anyhow::Ok(())
     667            0 :             })()
     668            0 :             .with_context(|| format!("scopeguard kill&wait child {child:?}"))
     669            0 :             .unwrap();
     670            0 :         });
     671            0 : 
     672            0 :         // Write down the pid so we can wait for it when we want to stop
     673            0 :         // TODO use background_process::start_process instead: https://github.com/neondatabase/neon/pull/6482
     674            0 :         let pid = child.id();
     675            0 :         let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
     676            0 :         std::fs::write(pidfile_path, pid.to_string())?;
     677              : 
     678              :         // Wait for it to start
     679            0 :         let mut attempt = 0;
     680              :         const ATTEMPT_INTERVAL: Duration = Duration::from_millis(100);
     681              :         const MAX_ATTEMPTS: u32 = 10 * 90; // Wait up to 1.5 min
     682              :         loop {
     683            0 :             attempt += 1;
     684            0 :             match self.get_status().await {
     685            0 :                 Ok(state) => {
     686            0 :                     match state.status {
     687              :                         ComputeStatus::Init => {
     688            0 :                             if attempt == MAX_ATTEMPTS {
     689            0 :                                 bail!("compute startup timed out; still in Init state");
     690            0 :                             }
     691              :                             // keep retrying
     692              :                         }
     693              :                         ComputeStatus::Running => {
     694              :                             // All good!
     695            0 :                             break;
     696              :                         }
     697              :                         ComputeStatus::Failed => {
     698            0 :                             bail!(
     699            0 :                                 "compute startup failed: {}",
     700            0 :                                 state
     701            0 :                                     .error
     702            0 :                                     .as_deref()
     703            0 :                                     .unwrap_or("<no error from compute_ctl>")
     704            0 :                             );
     705              :                         }
     706              :                         ComputeStatus::Empty
     707              :                         | ComputeStatus::ConfigurationPending
     708              :                         | ComputeStatus::Configuration
     709              :                         | ComputeStatus::TerminationPending
     710              :                         | ComputeStatus::Terminated => {
     711            0 :                             bail!("unexpected compute status: {:?}", state.status)
     712              :                         }
     713              :                     }
     714              :                 }
     715            0 :                 Err(e) => {
     716            0 :                     if attempt == MAX_ATTEMPTS {
     717            0 :                         return Err(e).context("timed out waiting to connect to compute_ctl HTTP");
     718            0 :                     }
     719              :                 }
     720              :             }
     721            0 :             tokio::time::sleep(ATTEMPT_INTERVAL).await;
     722              :         }
     723              : 
     724              :         // disarm the scopeguard, let the child outlive this function (and neon_local invoction)
     725            0 :         drop(scopeguard::ScopeGuard::into_inner(child));
     726            0 : 
     727            0 :         Ok(())
     728            0 :     }
     729              : 
     730              :     // Call the /status HTTP API
     731            0 :     pub async fn get_status(&self) -> Result<ComputeState> {
     732            0 :         let client = reqwest::Client::new();
     733              : 
     734            0 :         let response = client
     735            0 :             .request(
     736            0 :                 reqwest::Method::GET,
     737            0 :                 format!(
     738            0 :                     "http://{}:{}/status",
     739            0 :                     self.http_address.ip(),
     740            0 :                     self.http_address.port()
     741            0 :                 ),
     742            0 :             )
     743            0 :             .send()
     744            0 :             .await?;
     745              : 
     746              :         // Interpret the response
     747            0 :         let status = response.status();
     748            0 :         if !(status.is_client_error() || status.is_server_error()) {
     749            0 :             Ok(response.json().await?)
     750              :         } else {
     751              :             // reqwest does not export its error construction utility functions, so let's craft the message ourselves
     752            0 :             let url = response.url().to_owned();
     753            0 :             let msg = match response.text().await {
     754            0 :                 Ok(err_body) => format!("Error: {}", err_body),
     755            0 :                 Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
     756              :             };
     757            0 :             Err(anyhow::anyhow!(msg))
     758              :         }
     759            0 :     }
     760              : 
     761            0 :     pub async fn reconfigure(
     762            0 :         &self,
     763            0 :         mut pageservers: Vec<(Host, u16)>,
     764            0 :         stripe_size: Option<ShardStripeSize>,
     765            0 :         safekeepers: Option<Vec<NodeId>>,
     766            0 :     ) -> Result<()> {
     767            0 :         let mut spec: ComputeSpec = {
     768            0 :             let spec_path = self.endpoint_path().join("spec.json");
     769            0 :             let file = std::fs::File::open(spec_path)?;
     770            0 :             serde_json::from_reader(file)?
     771              :         };
     772              : 
     773            0 :         let postgresql_conf = self.read_postgresql_conf()?;
     774            0 :         spec.cluster.postgresql_conf = Some(postgresql_conf);
     775            0 : 
     776            0 :         // If we weren't given explicit pageservers, query the storage controller
     777            0 :         if pageservers.is_empty() {
     778            0 :             let storage_controller = StorageController::from_env(&self.env);
     779            0 :             let locate_result = storage_controller.tenant_locate(self.tenant_id).await?;
     780            0 :             pageservers = locate_result
     781            0 :                 .shards
     782            0 :                 .into_iter()
     783            0 :                 .map(|shard| {
     784            0 :                     (
     785            0 :                         Host::parse(&shard.listen_pg_addr)
     786            0 :                             .expect("Storage controller reported bad hostname"),
     787            0 :                         shard.listen_pg_port,
     788            0 :                     )
     789            0 :                 })
     790            0 :                 .collect::<Vec<_>>();
     791            0 :         }
     792              : 
     793            0 :         let pageserver_connstr = Self::build_pageserver_connstr(&pageservers);
     794            0 :         assert!(!pageserver_connstr.is_empty());
     795            0 :         spec.pageserver_connstring = Some(pageserver_connstr);
     796            0 :         if stripe_size.is_some() {
     797            0 :             spec.shard_stripe_size = stripe_size.map(|s| s.0 as usize);
     798            0 :         }
     799              : 
     800              :         // If safekeepers are not specified, don't change them.
     801            0 :         if let Some(safekeepers) = safekeepers {
     802            0 :             let safekeeper_connstrings = self.build_safekeepers_connstrs(safekeepers)?;
     803            0 :             spec.safekeeper_connstrings = safekeeper_connstrings;
     804            0 :         }
     805              : 
     806            0 :         let client = reqwest::Client::builder()
     807            0 :             .timeout(Duration::from_secs(30))
     808            0 :             .build()
     809            0 :             .unwrap();
     810            0 :         let response = client
     811            0 :             .post(format!(
     812            0 :                 "http://{}:{}/configure",
     813            0 :                 self.http_address.ip(),
     814            0 :                 self.http_address.port()
     815            0 :             ))
     816            0 :             .body(format!(
     817            0 :                 "{{\"spec\":{}}}",
     818            0 :                 serde_json::to_string_pretty(&spec)?
     819              :             ))
     820            0 :             .send()
     821            0 :             .await?;
     822              : 
     823            0 :         let status = response.status();
     824            0 :         if !(status.is_client_error() || status.is_server_error()) {
     825            0 :             Ok(())
     826              :         } else {
     827            0 :             let url = response.url().to_owned();
     828            0 :             let msg = match response.text().await {
     829            0 :                 Ok(err_body) => format!("Error: {}", err_body),
     830            0 :                 Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
     831              :             };
     832            0 :             Err(anyhow::anyhow!(msg))
     833              :         }
     834            0 :     }
     835              : 
     836            0 :     pub fn stop(&self, mode: &str, destroy: bool) -> Result<()> {
     837            0 :         self.pg_ctl(&["-m", mode, "stop"], &None)?;
     838              : 
     839              :         // Also wait for the compute_ctl process to die. It might have some
     840              :         // cleanup work to do after postgres stops, like syncing safekeepers,
     841              :         // etc.
     842              :         //
     843              :         // If destroying or stop mode is immediate, send it SIGTERM before
     844              :         // waiting. Sometimes we do *not* want this cleanup: tests intentionally
     845              :         // do stop when majority of safekeepers is down, so sync-safekeepers
     846              :         // would hang otherwise. This could be a separate flag though.
     847            0 :         let send_sigterm = destroy || mode == "immediate";
     848            0 :         self.wait_for_compute_ctl_to_exit(send_sigterm)?;
     849            0 :         if destroy {
     850            0 :             println!(
     851            0 :                 "Destroying postgres data directory '{}'",
     852            0 :                 self.pgdata().to_str().unwrap()
     853            0 :             );
     854            0 :             std::fs::remove_dir_all(self.endpoint_path())?;
     855            0 :         }
     856            0 :         Ok(())
     857            0 :     }
     858              : 
     859            0 :     pub fn connstr(&self, user: &str, db_name: &str) -> String {
     860            0 :         format!(
     861            0 :             "postgresql://{}@{}:{}/{}",
     862            0 :             user,
     863            0 :             self.pg_address.ip(),
     864            0 :             self.pg_address.port(),
     865            0 :             db_name
     866            0 :         )
     867            0 :     }
     868              : }
        

Generated by: LCOV version 2.1-beta