Line data Source code
1 : use crate::background_process::{self, start_process, stop_process};
2 : use crate::local_env::LocalEnv;
3 : use anyhow::{Context, Result};
4 : use camino::Utf8PathBuf;
5 : use std::io::Write;
6 : use std::time::Duration;
7 :
8 : /// Directory within .neon which will be used by default for LocalFs remote storage.
9 : pub const ENDPOINT_STORAGE_REMOTE_STORAGE_DIR: &str = "local_fs_remote_storage/endpoint_storage";
10 : pub const ENDPOINT_STORAGE_DEFAULT_PORT: u16 = 9993;
11 :
12 : pub struct EndpointStorage {
13 : pub bin: Utf8PathBuf,
14 : pub data_dir: Utf8PathBuf,
15 : pub pemfile: Utf8PathBuf,
16 : pub port: u16,
17 : }
18 :
19 : impl EndpointStorage {
20 0 : pub fn from_env(env: &LocalEnv) -> EndpointStorage {
21 0 : EndpointStorage {
22 0 : bin: Utf8PathBuf::from_path_buf(env.endpoint_storage_bin()).unwrap(),
23 0 : data_dir: Utf8PathBuf::from_path_buf(env.endpoint_storage_data_dir()).unwrap(),
24 0 : pemfile: Utf8PathBuf::from_path_buf(env.public_key_path.clone()).unwrap(),
25 0 : port: env.endpoint_storage.port,
26 0 : }
27 0 : }
28 :
29 0 : fn config_path(&self) -> Utf8PathBuf {
30 0 : self.data_dir.join("endpoint_storage.json")
31 0 : }
32 :
33 0 : fn listen_addr(&self) -> Utf8PathBuf {
34 0 : format!("127.0.0.1:{}", self.port).into()
35 0 : }
36 :
37 0 : pub fn init(&self) -> Result<()> {
38 0 : println!("Initializing object storage in {:?}", self.data_dir);
39 0 : let parent = self.data_dir.parent().unwrap();
40 :
41 : #[derive(serde::Serialize)]
42 : struct Cfg {
43 : listen: Utf8PathBuf,
44 : pemfile: Utf8PathBuf,
45 : local_path: Utf8PathBuf,
46 : r#type: String,
47 : }
48 0 : let cfg = Cfg {
49 0 : listen: self.listen_addr(),
50 0 : pemfile: parent.join(self.pemfile.clone()),
51 0 : local_path: parent.join(ENDPOINT_STORAGE_REMOTE_STORAGE_DIR),
52 0 : r#type: "LocalFs".to_string(),
53 0 : };
54 0 : std::fs::create_dir_all(self.config_path().parent().unwrap())?;
55 0 : std::fs::write(self.config_path(), serde_json::to_string(&cfg)?)
56 0 : .context("write object storage config")?;
57 0 : Ok(())
58 0 : }
59 :
60 0 : pub async fn start(&self, retry_timeout: &Duration) -> Result<()> {
61 0 : println!("Starting endpoint_storage at {}", self.listen_addr());
62 0 : std::io::stdout().flush().context("flush stdout")?;
63 :
64 0 : let process_status_check = || async {
65 0 : let res = reqwest::Client::new().get(format!("http://{}/metrics", self.listen_addr()));
66 0 : match res.send().await {
67 0 : Ok(res) => Ok(res.status().is_success()),
68 0 : Err(_) => Ok(false),
69 : }
70 0 : };
71 :
72 0 : let res = start_process(
73 0 : "endpoint_storage",
74 0 : &self.data_dir.clone().into_std_path_buf(),
75 0 : &self.bin.clone().into_std_path_buf(),
76 0 : vec![self.config_path().to_string()],
77 0 : vec![("RUST_LOG".into(), "debug".into())],
78 0 : background_process::InitialPidFile::Create(self.pid_file()),
79 0 : retry_timeout,
80 0 : process_status_check,
81 0 : )
82 0 : .await;
83 0 : if res.is_err() {
84 0 : eprintln!("Logs:\n{}", std::fs::read_to_string(self.log_file())?);
85 0 : }
86 :
87 0 : res
88 0 : }
89 :
90 0 : pub fn stop(&self, immediate: bool) -> anyhow::Result<()> {
91 0 : stop_process(immediate, "endpoint_storage", &self.pid_file())
92 0 : }
93 :
94 0 : fn log_file(&self) -> Utf8PathBuf {
95 0 : self.data_dir.join("endpoint_storage.log")
96 0 : }
97 :
98 0 : fn pid_file(&self) -> Utf8PathBuf {
99 0 : self.data_dir.join("endpoint_storage.pid")
100 0 : }
101 : }
|