Line data Source code
1 : //! Code to manage the storage broker
2 : //!
3 : //! In the local test environment, the broker's pid file is stored in
4 : //!
5 : //! ```text
6 : //! .neon/storage_broker.pid
7 : //! ```
8 : use anyhow::Context;
9 :
10 : use camino::Utf8PathBuf;
11 :
12 : use crate::{background_process, local_env};
13 :
14 3 : pub async fn start_broker_process(env: &local_env::LocalEnv) -> anyhow::Result<()> {
15 3 : let broker = &env.broker;
16 3 : let listen_addr = &broker.listen_addr;
17 3 :
18 3 : print!("Starting neon broker at {}", listen_addr);
19 3 :
20 3 : let args = [format!("--listen-addr={listen_addr}")];
21 3 :
22 3 : let client = reqwest::Client::new();
23 3 : background_process::start_process(
24 3 : "storage_broker",
25 3 : &env.base_data_dir,
26 3 : &env.storage_broker_bin(),
27 3 : args,
28 3 : [],
29 3 : background_process::InitialPidFile::Create(storage_broker_pid_file_path(env)),
30 4 : || async {
31 4 : let url = broker.client_url();
32 4 : let status_url = url.join("status").with_context(|| {
33 0 : format!("Failed to append /status path to broker endpoint {url}")
34 4 : })?;
35 4 : let request = client
36 4 : .get(status_url)
37 4 : .build()
38 4 : .with_context(|| format!("Failed to construct request to broker endpoint {url}"))?;
39 10 : match client.execute(request).await {
40 3 : Ok(resp) => Ok(resp.status().is_success()),
41 1 : Err(_) => Ok(false),
42 : }
43 4 : },
44 3 : )
45 10 : .await
46 3 : .context("Failed to spawn storage_broker subprocess")?;
47 3 : Ok(())
48 3 : }
49 :
50 3 : pub fn stop_broker_process(env: &local_env::LocalEnv) -> anyhow::Result<()> {
51 3 : background_process::stop_process(true, "storage_broker", &storage_broker_pid_file_path(env))
52 3 : }
53 :
54 6 : fn storage_broker_pid_file_path(env: &local_env::LocalEnv) -> Utf8PathBuf {
55 6 : Utf8PathBuf::from_path_buf(env.base_data_dir.join("storage_broker.pid"))
56 6 : .expect("non-Unicode path")
57 6 : }
|