Line data Source code
1 : //! Code to manage the storage broker
2 : //!
3 : //! In the local test environment, the broker's pid file is stored in
4 : //!
5 : //! ```text
6 : //! <base data dir>/storage_broker.pid
7 : //! ```
8 : use anyhow::Context;
9 :
10 : use camino::Utf8PathBuf;
11 :
12 : use crate::{background_process, local_env};
13 :
14 0 : pub async fn start_broker_process(env: &local_env::LocalEnv) -> anyhow::Result<()> {
15 0 : let broker = &env.broker;
16 0 : let listen_addr = &broker.listen_addr;
17 0 :
18 0 : print!("Starting neon broker at {}", listen_addr);
19 0 :
20 0 : let args = [format!("--listen-addr={listen_addr}")];
21 0 :
22 0 : let client = reqwest::Client::new();
23 0 : background_process::start_process(
24 0 : "storage_broker",
25 0 : &env.base_data_dir,
26 0 : &env.storage_broker_bin(),
27 0 : args,
28 0 : [],
29 0 : background_process::InitialPidFile::Create(storage_broker_pid_file_path(env)),
30 0 : || async {
31 0 : let url = broker.client_url();
32 0 : let status_url = url.join("status").with_context(|| {
33 0 : format!("Failed to append /status path to broker endpoint {url}")
34 0 : })?;
35 0 : let request = client
36 0 : .get(status_url)
37 0 : .build()
38 0 : .with_context(|| format!("Failed to construct request to broker endpoint {url}"))?;
39 0 : match client.execute(request).await {
40 0 : Ok(resp) => Ok(resp.status().is_success()),
41 0 : Err(_) => Ok(false),
42 : }
43 0 : },
44 0 : )
45 0 : .await
46 0 : .context("Failed to spawn storage_broker subprocess")?;
47 0 : Ok(())
48 0 : }
49 :
50 0 : pub fn stop_broker_process(env: &local_env::LocalEnv) -> anyhow::Result<()> {
51 0 : background_process::stop_process(true, "storage_broker", &storage_broker_pid_file_path(env))
52 0 : }
53 :
54 0 : fn storage_broker_pid_file_path(env: &local_env::LocalEnv) -> Utf8PathBuf {
55 0 : Utf8PathBuf::from_path_buf(env.base_data_dir.join("storage_broker.pid"))
56 0 : .expect("non-Unicode path")
57 0 : }
|