Line data Source code
1 : //! Code to manage the storage broker
2 : //!
3 : //! In the local test environment, the storage broker stores its data directly in
4 : //!
5 : //! ```text
6 : //! .neon
7 : //! ```
8 : use std::time::Duration;
9 :
10 : use anyhow::Context;
11 :
12 : use camino::Utf8PathBuf;
13 :
14 : use crate::{background_process, local_env};
15 :
16 0 : pub async fn start_broker_process(
17 0 : env: &local_env::LocalEnv,
18 0 : retry_timeout: &Duration,
19 0 : ) -> anyhow::Result<()> {
20 0 : let broker = &env.broker;
21 0 : let listen_addr = &broker.listen_addr;
22 0 :
23 0 : print!("Starting neon broker at {}", listen_addr);
24 0 :
25 0 : let args = [format!("--listen-addr={listen_addr}")];
26 0 :
27 0 : let client = reqwest::Client::new();
28 0 : background_process::start_process(
29 0 : "storage_broker",
30 0 : &env.base_data_dir,
31 0 : &env.storage_broker_bin(),
32 0 : args,
33 0 : [],
34 0 : background_process::InitialPidFile::Create(storage_broker_pid_file_path(env)),
35 0 : retry_timeout,
36 0 : || async {
37 0 : let url = broker.client_url();
38 0 : let status_url = url.join("status").with_context(|| {
39 0 : format!("Failed to append /status path to broker endpoint {url}")
40 0 : })?;
41 0 : let request = client
42 0 : .get(status_url)
43 0 : .build()
44 0 : .with_context(|| format!("Failed to construct request to broker endpoint {url}"))?;
45 0 : match client.execute(request).await {
46 0 : Ok(resp) => Ok(resp.status().is_success()),
47 0 : Err(_) => Ok(false),
48 : }
49 0 : },
50 0 : )
51 0 : .await
52 0 : .context("Failed to spawn storage_broker subprocess")?;
53 0 : Ok(())
54 0 : }
55 :
56 0 : pub fn stop_broker_process(env: &local_env::LocalEnv) -> anyhow::Result<()> {
57 0 : background_process::stop_process(true, "storage_broker", &storage_broker_pid_file_path(env))
58 0 : }
59 :
60 0 : fn storage_broker_pid_file_path(env: &local_env::LocalEnv) -> Utf8PathBuf {
61 0 : Utf8PathBuf::from_path_buf(env.base_data_dir.join("storage_broker.pid"))
62 0 : .expect("non-Unicode path")
63 0 : }
|