Line data Source code
1 : //! Code to manage the storage broker
2 : //!
3 : //! In the local test environment, the storage broker stores its data directly in
4 : //!
5 : //! ```text
6 : //! .neon/storage_broker
7 : //! ```
8 : use std::time::Duration;
9 :
10 : use anyhow::Context;
11 : use camino::Utf8PathBuf;
12 :
13 : use crate::{background_process, local_env::LocalEnv};
14 :
15 : pub struct StorageBroker {
16 : env: LocalEnv,
17 : }
18 :
19 : impl StorageBroker {
20 : /// Create a new `StorageBroker` instance from the environment.
21 0 : pub fn from_env(env: &LocalEnv) -> Self {
22 0 : Self { env: env.clone() }
23 0 : }
24 :
25 0 : pub fn initialize(&self) -> anyhow::Result<()> {
26 0 : if self.env.generate_local_ssl_certs {
27 0 : self.env.generate_ssl_cert(
28 0 : &self.env.storage_broker_data_dir().join("server.crt"),
29 0 : &self.env.storage_broker_data_dir().join("server.key"),
30 0 : )?;
31 0 : }
32 0 : Ok(())
33 0 : }
34 :
35 : /// Start the storage broker process.
36 0 : pub async fn start(&self, retry_timeout: &Duration) -> anyhow::Result<()> {
37 0 : let broker = &self.env.broker;
38 0 :
39 0 : print!("Starting neon broker at {}", broker.client_url());
40 0 :
41 0 : let mut args = Vec::new();
42 :
43 0 : if let Some(addr) = &broker.listen_addr {
44 0 : args.push(format!("--listen-addr={addr}"));
45 0 : }
46 0 : if let Some(addr) = &broker.listen_https_addr {
47 0 : args.push(format!("--listen-https-addr={addr}"));
48 0 : }
49 :
50 0 : let client = self.env.create_http_client();
51 0 : background_process::start_process(
52 0 : "storage_broker",
53 0 : &self.env.storage_broker_data_dir(),
54 0 : &self.env.storage_broker_bin(),
55 0 : args,
56 0 : [],
57 0 : background_process::InitialPidFile::Create(self.pid_file_path()),
58 0 : retry_timeout,
59 0 : || async {
60 0 : let url = broker.client_url();
61 0 : let status_url = url.join("status").with_context(|| {
62 0 : format!("Failed to append /status path to broker endpoint {url}")
63 0 : })?;
64 0 : let request = client.get(status_url).build().with_context(|| {
65 0 : format!("Failed to construct request to broker endpoint {url}")
66 0 : })?;
67 0 : match client.execute(request).await {
68 0 : Ok(resp) => Ok(resp.status().is_success()),
69 0 : Err(_) => Ok(false),
70 : }
71 0 : },
72 0 : )
73 0 : .await
74 0 : .context("Failed to spawn storage_broker subprocess")?;
75 0 : Ok(())
76 0 : }
77 :
78 : /// Stop the storage broker process.
79 0 : pub fn stop(&self) -> anyhow::Result<()> {
80 0 : background_process::stop_process(true, "storage_broker", &self.pid_file_path())
81 0 : }
82 :
83 : /// Get the path to the PID file for the storage broker.
84 0 : fn pid_file_path(&self) -> Utf8PathBuf {
85 0 : Utf8PathBuf::from_path_buf(self.env.base_data_dir.join("storage_broker.pid"))
86 0 : .expect("non-Unicode path")
87 0 : }
88 : }
|