TLA Line data Source code
1 : //! Code to manage the storage broker
2 : //!
3 : //! In the local test environment, the broker's pid file is stored in
4 : //!
5 : //! ```text
6 : //! <base_data_dir>/storage_broker.pid
7 : //! ```
8 : use anyhow::Context;
9 :
10 : use camino::Utf8PathBuf;
11 :
12 : use crate::{background_process, local_env};
13 :
14 CBC 5 : pub fn start_broker_process(env: &local_env::LocalEnv) -> anyhow::Result<()> {
15 5 : let broker = &env.broker;
16 5 : let listen_addr = &broker.listen_addr;
17 5 :
18 5 : print!("Starting neon broker at {}", listen_addr);
19 5 :
20 5 : let args = [format!("--listen-addr={listen_addr}")];
21 5 :
22 5 : let client = reqwest::blocking::Client::new();
23 5 : background_process::start_process(
24 5 : "storage_broker",
25 5 : &env.base_data_dir,
26 5 : &env.storage_broker_bin(),
27 5 : args,
28 5 : [],
29 5 : background_process::InitialPidFile::Create(&storage_broker_pid_file_path(env)),
30 8 : || {
31 8 : let url = broker.client_url();
32 8 : let status_url = url.join("status").with_context(|| {
33 UBC 0 : format!("Failed to append /status path to broker endpoint {url}")
34 CBC 8 : })?;
35 8 : let request = client
36 8 : .get(status_url)
37 8 : .build()
38 8 : .with_context(|| format!("Failed to construct request to broker endpoint {url}"))?;
39 8 : match client.execute(request) {
40 5 : Ok(resp) => Ok(resp.status().is_success()),
41 3 : Err(_) => Ok(false),
42 : }
43 8 : },
44 5 : )
45 5 : .context("Failed to spawn storage_broker subprocess")?;
46 5 : Ok(())
47 5 : }
48 :
49 5 : pub fn stop_broker_process(env: &local_env::LocalEnv) -> anyhow::Result<()> {
50 5 : background_process::stop_process(true, "storage_broker", &storage_broker_pid_file_path(env))
51 5 : }
52 :
53 10 : fn storage_broker_pid_file_path(env: &local_env::LocalEnv) -> Utf8PathBuf {
54 10 : Utf8PathBuf::from_path_buf(env.base_data_dir.join("storage_broker.pid"))
55 10 : .expect("non-Unicode path")
56 10 : }
|