LCOV - code coverage report
Current view: top level - control_plane/src - object_storage.rs (source / functions) Coverage Total Hit
Test: 37bd82a80da9937a25818120dcf8e865ea9f7fd2.info Lines: 0.0 % 67 0
Test Date: 2025-04-11 14:30:22 Functions: 0.0 % 11 0

            Line data    Source code
       1              : use crate::background_process::{self, start_process, stop_process};
       2              : use crate::local_env::LocalEnv;
       3              : use anyhow::anyhow;
       4              : use anyhow::{Context, Result};
       5              : use camino::Utf8PathBuf;
       6              : use std::io::Write;
       7              : use std::time::Duration;
       8              : 
       9              : /// Directory within .neon which will be used by default for LocalFs remote storage.
      10              : pub const OBJECT_STORAGE_REMOTE_STORAGE_DIR: &str = "local_fs_remote_storage/object_storage";
      11              : pub const OBJECT_STORAGE_DEFAULT_PORT: u16 = 9993;
      12              : 
      13              : pub struct ObjectStorage {
      14              :     pub bin: Utf8PathBuf,
      15              :     pub data_dir: Utf8PathBuf,
      16              :     pub pemfile: Utf8PathBuf,
      17              :     pub port: u16,
      18              : }
      19              : 
      20              : impl ObjectStorage {
      21            0 :     pub fn from_env(env: &LocalEnv) -> ObjectStorage {
      22            0 :         ObjectStorage {
      23            0 :             bin: Utf8PathBuf::from_path_buf(env.object_storage_bin()).unwrap(),
      24            0 :             data_dir: Utf8PathBuf::from_path_buf(env.object_storage_data_dir()).unwrap(),
      25            0 :             pemfile: Utf8PathBuf::from_path_buf(env.public_key_path.clone()).unwrap(),
      26            0 :             port: env.object_storage.port,
      27            0 :         }
      28            0 :     }
      29              : 
      30            0 :     fn config_path(&self) -> Utf8PathBuf {
      31            0 :         self.data_dir.join("object_storage.json")
      32            0 :     }
      33              : 
      34            0 :     fn listen_addr(&self) -> Utf8PathBuf {
      35            0 :         format!("127.0.0.1:{}", self.port).into()
      36            0 :     }
      37              : 
      38            0 :     pub fn init(&self) -> Result<()> {
      39            0 :         println!("Initializing object storage in {:?}", self.data_dir);
      40            0 :         let parent = self.data_dir.parent().unwrap();
      41              : 
      42              :         #[derive(serde::Serialize)]
      43              :         struct Cfg {
      44              :             listen: Utf8PathBuf,
      45              :             pemfile: Utf8PathBuf,
      46              :             local_path: Utf8PathBuf,
      47              :             r#type: String,
      48              :         }
      49            0 :         let cfg = Cfg {
      50            0 :             listen: self.listen_addr(),
      51            0 :             pemfile: parent.join(self.pemfile.clone()),
      52            0 :             local_path: parent.join(OBJECT_STORAGE_REMOTE_STORAGE_DIR),
      53            0 :             r#type: "LocalFs".to_string(),
      54            0 :         };
      55            0 :         std::fs::create_dir_all(self.config_path().parent().unwrap())?;
      56            0 :         std::fs::write(self.config_path(), serde_json::to_string(&cfg)?)
      57            0 :             .context("write object storage config")?;
      58            0 :         Ok(())
      59            0 :     }
      60              : 
      61            0 :     pub async fn start(&self, retry_timeout: &Duration) -> Result<()> {
      62            0 :         println!("Starting s3 proxy at {}", self.listen_addr());
      63            0 :         std::io::stdout().flush().context("flush stdout")?;
      64              : 
      65            0 :         let process_status_check = || async {
      66            0 :             tokio::time::sleep(Duration::from_millis(500)).await;
      67            0 :             let res = reqwest::Client::new()
      68            0 :                 .get(format!("http://{}/metrics", self.listen_addr()))
      69            0 :                 .send()
      70            0 :                 .await;
      71            0 :             match res {
      72            0 :                 Ok(response) if response.status().is_success() => Ok(true),
      73            0 :                 Ok(_) => Err(anyhow!("Failed to query /metrics")),
      74            0 :                 Err(e) => Err(anyhow!("Failed to check node status: {e}")),
      75              :             }
      76            0 :         };
      77              : 
      78            0 :         let res = start_process(
      79            0 :             "object_storage",
      80            0 :             &self.data_dir.clone().into_std_path_buf(),
      81            0 :             &self.bin.clone().into_std_path_buf(),
      82            0 :             vec![self.config_path().to_string()],
      83            0 :             vec![("RUST_LOG".into(), "debug".into())],
      84            0 :             background_process::InitialPidFile::Create(self.pid_file()),
      85            0 :             retry_timeout,
      86            0 :             process_status_check,
      87            0 :         )
      88            0 :         .await;
      89            0 :         if res.is_err() {
      90            0 :             eprintln!("Logs:\n{}", std::fs::read_to_string(self.log_file())?);
      91            0 :         }
      92              : 
      93            0 :         res
      94            0 :     }
      95              : 
      96            0 :     pub fn stop(&self, immediate: bool) -> anyhow::Result<()> {
      97            0 :         stop_process(immediate, "object_storage", &self.pid_file())
      98            0 :     }
      99              : 
     100            0 :     fn log_file(&self) -> Utf8PathBuf {
     101            0 :         self.data_dir.join("object_storage.log")
     102            0 :     }
     103              : 
     104            0 :     fn pid_file(&self) -> Utf8PathBuf {
     105            0 :         self.data_dir.join("object_storage.pid")
     106            0 :     }
     107              : }
        

Generated by: LCOV version 2.1-beta