LCOV - code coverage report
Current view: top level - control_plane/src - endpoint_storage.rs (source / functions) Coverage Total Hit
Test: a1cc1f33dc9899e4da66eb51e44e911a4b3bd648.info Lines: 0.0 % 67 0
Test Date: 2025-07-31 11:35:14 Functions: 0.0 % 11 0

            Line data    Source code
       1              : use crate::background_process::{self, start_process, stop_process};
       2              : use crate::local_env::LocalEnv;
       3              : use anyhow::{Context, Result};
       4              : use camino::Utf8PathBuf;
       5              : use postgres_backend::AuthType;
       6              : use std::io::Write;
       7              : use std::net::SocketAddr;
       8              : use std::time::Duration;
       9              : 
      10              : /// Directory within .neon which will be used by default for LocalFs remote storage.
      11              : pub const ENDPOINT_STORAGE_REMOTE_STORAGE_DIR: &str = "local_fs_remote_storage/endpoint_storage";
      12              : pub const ENDPOINT_STORAGE_DEFAULT_ADDR: SocketAddr =
      13              :     SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9993);
      14              : 
/// Handle for managing a local `endpoint_storage` process: writing its config file,
/// starting it and stopping it.
pub struct EndpointStorage {
    /// Path of the executable that is launched by `start`.
    pub bin: Utf8PathBuf,
    /// Directory holding the generated `endpoint_storage.json` config as well as the
    /// `endpoint_storage.pid` and `endpoint_storage.log` paths used by this type.
    pub data_dir: Utf8PathBuf,
    /// Key file path written to the config as `pemfile`; `init` joins it onto the
    /// parent of `data_dir`, so an absolute path is used as-is.
    pub pemfile: Utf8PathBuf,
    /// Socket address written to the config as `listen` and probed over HTTP by `start`.
    pub addr: SocketAddr,
    /// Authentication type written to the config as `auth_type`.
    pub auth_type: AuthType,
}
      22              : 
      23              : impl EndpointStorage {
      24            0 :     pub fn from_env(env: &LocalEnv) -> EndpointStorage {
      25            0 :         let auth_type = match env.token_auth_type {
      26            0 :             AuthType::HadronJWT => AuthType::HadronJWT,
      27            0 :             AuthType::NeonJWT | AuthType::Trust => AuthType::NeonJWT,
      28              :         };
      29              : 
      30            0 :         EndpointStorage {
      31            0 :             bin: Utf8PathBuf::from_path_buf(env.endpoint_storage_bin()).unwrap(),
      32            0 :             data_dir: Utf8PathBuf::from_path_buf(env.endpoint_storage_data_dir()).unwrap(),
      33            0 :             pemfile: Utf8PathBuf::from_path_buf(env.public_key_path.clone()).unwrap(),
      34            0 :             addr: env.endpoint_storage.listen_addr,
      35            0 :             auth_type,
      36            0 :         }
      37            0 :     }
      38              : 
      39            0 :     fn config_path(&self) -> Utf8PathBuf {
      40            0 :         self.data_dir.join("endpoint_storage.json")
      41            0 :     }
      42              : 
      43            0 :     fn listen_addr(&self) -> Utf8PathBuf {
      44            0 :         format!("{}:{}", self.addr.ip(), self.addr.port()).into()
      45            0 :     }
      46              : 
      47            0 :     pub fn init(&self) -> Result<()> {
      48            0 :         println!("Initializing object storage in {:?}", self.data_dir);
      49            0 :         let parent = self.data_dir.parent().unwrap();
      50              : 
      51              :         #[derive(serde::Serialize)]
      52              :         struct Cfg {
      53              :             listen: Utf8PathBuf,
      54              :             pemfile: Utf8PathBuf,
      55              :             local_path: Utf8PathBuf,
      56              :             r#type: String,
      57              :             auth_type: AuthType,
      58              :         }
      59            0 :         let cfg = Cfg {
      60            0 :             listen: self.listen_addr(),
      61            0 :             pemfile: parent.join(self.pemfile.clone()),
      62            0 :             local_path: parent.join(ENDPOINT_STORAGE_REMOTE_STORAGE_DIR),
      63            0 :             r#type: "LocalFs".to_string(),
      64            0 :             auth_type: self.auth_type,
      65            0 :         };
      66            0 :         std::fs::create_dir_all(self.config_path().parent().unwrap())?;
      67            0 :         std::fs::write(self.config_path(), serde_json::to_string(&cfg)?)
      68            0 :             .context("write object storage config")?;
      69            0 :         Ok(())
      70            0 :     }
      71              : 
      72            0 :     pub async fn start(&self, retry_timeout: &Duration) -> Result<()> {
      73            0 :         println!("Starting endpoint_storage at {}", self.listen_addr());
      74            0 :         std::io::stdout().flush().context("flush stdout")?;
      75              : 
      76            0 :         let process_status_check = || async {
      77            0 :             let res = reqwest::Client::new().get(format!("http://{}/metrics", self.listen_addr()));
      78            0 :             match res.send().await {
      79            0 :                 Ok(res) => Ok(res.status().is_success()),
      80            0 :                 Err(_) => Ok(false),
      81              :             }
      82            0 :         };
      83              : 
      84            0 :         let res = start_process(
      85            0 :             "endpoint_storage",
      86            0 :             &self.data_dir.clone().into_std_path_buf(),
      87            0 :             &self.bin.clone().into_std_path_buf(),
      88            0 :             vec![self.config_path().to_string()],
      89            0 :             vec![("RUST_LOG".into(), "debug".into())],
      90            0 :             background_process::InitialPidFile::Create(self.pid_file()),
      91            0 :             retry_timeout,
      92            0 :             process_status_check,
      93            0 :         )
      94            0 :         .await;
      95            0 :         if res.is_err() {
      96            0 :             eprintln!("Logs:\n{}", std::fs::read_to_string(self.log_file())?);
      97            0 :         }
      98              : 
      99            0 :         res
     100            0 :     }
     101              : 
     102            0 :     pub fn stop(&self, immediate: bool) -> anyhow::Result<()> {
     103            0 :         stop_process(immediate, "endpoint_storage", &self.pid_file())
     104            0 :     }
     105              : 
     106            0 :     fn log_file(&self) -> Utf8PathBuf {
     107            0 :         self.data_dir.join("endpoint_storage.log")
     108            0 :     }
     109              : 
     110            0 :     fn pid_file(&self) -> Utf8PathBuf {
     111            0 :         self.data_dir.join("endpoint_storage.pid")
     112            0 :     }
     113              : }
        

Generated by: LCOV version 2.1-beta