Line data Source code
1 : //! Code to manage the storage broker
2 : //!
//! In the local test environment, the broker's PID file is stored at
//!
//! ```text
//! <base data dir>/storage_broker.pid
//! ```
8 : use anyhow::Context;
9 :
10 : use std::path::PathBuf;
11 :
12 : use crate::{background_process, local_env};
13 :
14 4 : pub fn start_broker_process(env: &local_env::LocalEnv) -> anyhow::Result<()> {
15 4 : let broker = &env.broker;
16 4 : let listen_addr = &broker.listen_addr;
17 4 :
18 4 : print!("Starting neon broker at {}", listen_addr);
19 4 :
20 4 : let args = [format!("--listen-addr={listen_addr}")];
21 4 :
22 4 : let client = reqwest::blocking::Client::new();
23 4 : background_process::start_process(
24 4 : "storage_broker",
25 4 : &env.base_data_dir,
26 4 : &env.storage_broker_bin(),
27 4 : args,
28 4 : [],
29 4 : background_process::InitialPidFile::Create(&storage_broker_pid_file_path(env)),
30 7 : || {
31 7 : let url = broker.client_url();
32 7 : let status_url = url.join("status").with_context(|| {
33 0 : format!("Failed to append /status path to broker endpoint {url}",)
34 7 : })?;
35 7 : let request = client
36 7 : .get(status_url)
37 7 : .build()
38 7 : .with_context(|| format!("Failed to construct request to broker endpoint {url}"))?;
39 7 : match client.execute(request) {
40 4 : Ok(resp) => Ok(resp.status().is_success()),
41 3 : Err(_) => Ok(false),
42 : }
43 7 : },
44 4 : )
45 4 : .context("Failed to spawn storage_broker subprocess")?;
46 4 : Ok(())
47 4 : }
48 :
49 4 : pub fn stop_broker_process(env: &local_env::LocalEnv) -> anyhow::Result<()> {
50 4 : background_process::stop_process(true, "storage_broker", &storage_broker_pid_file_path(env))
51 4 : }
52 :
53 8 : fn storage_broker_pid_file_path(env: &local_env::LocalEnv) -> PathBuf {
54 8 : env.base_data_dir.join("storage_broker.pid")
55 8 : }
|