LCOV - code coverage report
Current view: top level - control_plane/src - broker.rs (source / functions) Coverage Total Hit
Test: 2620485e474b48c32427149a5d91ef8fc2cd649e.info Lines: 0.0 % 57 0
Test Date: 2025-05-01 22:50:11 Functions: 0.0 % 10 0

            Line data    Source code
       1              : //! Code to manage the storage broker
       2              : //!
       3              : //! In the local test environment, the storage broker stores its data directly in
       4              : //!
       5              : //! ```text
       6              : //!   .neon/storage_broker
       7              : //! ```
       8              : use std::time::Duration;
       9              : 
      10              : use anyhow::Context;
      11              : use camino::Utf8PathBuf;
      12              : 
      13              : use crate::{background_process, local_env::LocalEnv};
      14              : 
      15              : pub struct StorageBroker {
      16              :     env: LocalEnv,
      17              : }
      18              : 
      19              : impl StorageBroker {
      20              :     /// Create a new `StorageBroker` instance from the environment.
      21            0 :     pub fn from_env(env: &LocalEnv) -> Self {
      22            0 :         Self { env: env.clone() }
      23            0 :     }
      24              : 
      25            0 :     pub fn initialize(&self) -> anyhow::Result<()> {
      26            0 :         if self.env.generate_local_ssl_certs {
      27            0 :             self.env.generate_ssl_cert(
      28            0 :                 &self.env.storage_broker_data_dir().join("server.crt"),
      29            0 :                 &self.env.storage_broker_data_dir().join("server.key"),
      30            0 :             )?;
      31            0 :         }
      32            0 :         Ok(())
      33            0 :     }
      34              : 
      35              :     /// Start the storage broker process.
      36            0 :     pub async fn start(&self, retry_timeout: &Duration) -> anyhow::Result<()> {
      37            0 :         let broker = &self.env.broker;
      38            0 : 
      39            0 :         print!("Starting neon broker at {}", broker.client_url());
      40            0 : 
      41            0 :         let mut args = Vec::new();
      42              : 
      43            0 :         if let Some(addr) = &broker.listen_addr {
      44            0 :             args.push(format!("--listen-addr={addr}"));
      45            0 :         }
      46            0 :         if let Some(addr) = &broker.listen_https_addr {
      47            0 :             args.push(format!("--listen-https-addr={addr}"));
      48            0 :         }
      49              : 
      50            0 :         let client = self.env.create_http_client();
      51            0 :         background_process::start_process(
      52            0 :             "storage_broker",
      53            0 :             &self.env.storage_broker_data_dir(),
      54            0 :             &self.env.storage_broker_bin(),
      55            0 :             args,
      56            0 :             [],
      57            0 :             background_process::InitialPidFile::Create(self.pid_file_path()),
      58            0 :             retry_timeout,
      59            0 :             || async {
      60            0 :                 let url = broker.client_url();
      61            0 :                 let status_url = url.join("status").with_context(|| {
      62            0 :                     format!("Failed to append /status path to broker endpoint {url}")
      63            0 :                 })?;
      64            0 :                 let request = client.get(status_url).build().with_context(|| {
      65            0 :                     format!("Failed to construct request to broker endpoint {url}")
      66            0 :                 })?;
      67            0 :                 match client.execute(request).await {
      68            0 :                     Ok(resp) => Ok(resp.status().is_success()),
      69            0 :                     Err(_) => Ok(false),
      70              :                 }
      71            0 :             },
      72            0 :         )
      73            0 :         .await
      74            0 :         .context("Failed to spawn storage_broker subprocess")?;
      75            0 :         Ok(())
      76            0 :     }
      77              : 
      78              :     /// Stop the storage broker process.
      79            0 :     pub fn stop(&self) -> anyhow::Result<()> {
      80            0 :         background_process::stop_process(true, "storage_broker", &self.pid_file_path())
      81            0 :     }
      82              : 
      83              :     /// Get the path to the PID file for the storage broker.
      84            0 :     fn pid_file_path(&self) -> Utf8PathBuf {
      85            0 :         Utf8PathBuf::from_path_buf(self.env.base_data_dir.join("storage_broker.pid"))
      86            0 :             .expect("non-Unicode path")
      87            0 :     }
      88              : }
        

Generated by: LCOV version 2.1-beta