Line data Source code
1 : use crate::background_process::{self, start_process, stop_process};
2 : use crate::local_env::LocalEnv;
3 : use anyhow::anyhow;
4 : use anyhow::{Context, Result};
5 : use camino::Utf8PathBuf;
6 : use std::io::Write;
7 : use std::time::Duration;
8 :
/// Relative directory (resolved against the `.neon` directory) that backs the
/// `LocalFs` remote storage used by object storage by default.
pub const OBJECT_STORAGE_REMOTE_STORAGE_DIR: &str = "local_fs_remote_storage/object_storage";

/// Default TCP port for the object storage service.
// NOTE(review): nothing in this file reads this constant; it is presumably
// consumed where `LocalEnv` defaults are built — confirm against callers.
pub const OBJECT_STORAGE_DEFAULT_PORT: u16 = 9993;
12 :
13 : pub struct ObjectStorage {
14 : pub bin: Utf8PathBuf,
15 : pub data_dir: Utf8PathBuf,
16 : pub pemfile: Utf8PathBuf,
17 : pub port: u16,
18 : }
19 :
20 : impl ObjectStorage {
21 0 : pub fn from_env(env: &LocalEnv) -> ObjectStorage {
22 0 : ObjectStorage {
23 0 : bin: Utf8PathBuf::from_path_buf(env.object_storage_bin()).unwrap(),
24 0 : data_dir: Utf8PathBuf::from_path_buf(env.object_storage_data_dir()).unwrap(),
25 0 : pemfile: Utf8PathBuf::from_path_buf(env.public_key_path.clone()).unwrap(),
26 0 : port: env.object_storage.port,
27 0 : }
28 0 : }
29 :
30 0 : fn config_path(&self) -> Utf8PathBuf {
31 0 : self.data_dir.join("object_storage.json")
32 0 : }
33 :
34 0 : fn listen_addr(&self) -> Utf8PathBuf {
35 0 : format!("127.0.0.1:{}", self.port).into()
36 0 : }
37 :
38 0 : pub fn init(&self) -> Result<()> {
39 0 : println!("Initializing object storage in {:?}", self.data_dir);
40 0 : let parent = self.data_dir.parent().unwrap();
41 :
42 : #[derive(serde::Serialize)]
43 : struct Cfg {
44 : listen: Utf8PathBuf,
45 : pemfile: Utf8PathBuf,
46 : local_path: Utf8PathBuf,
47 : r#type: String,
48 : }
49 0 : let cfg = Cfg {
50 0 : listen: self.listen_addr(),
51 0 : pemfile: parent.join(self.pemfile.clone()),
52 0 : local_path: parent.join(OBJECT_STORAGE_REMOTE_STORAGE_DIR),
53 0 : r#type: "LocalFs".to_string(),
54 0 : };
55 0 : std::fs::create_dir_all(self.config_path().parent().unwrap())?;
56 0 : std::fs::write(self.config_path(), serde_json::to_string(&cfg)?)
57 0 : .context("write object storage config")?;
58 0 : Ok(())
59 0 : }
60 :
61 0 : pub async fn start(&self, retry_timeout: &Duration) -> Result<()> {
62 0 : println!("Starting s3 proxy at {}", self.listen_addr());
63 0 : std::io::stdout().flush().context("flush stdout")?;
64 :
65 0 : let process_status_check = || async {
66 0 : tokio::time::sleep(Duration::from_millis(500)).await;
67 0 : let res = reqwest::Client::new()
68 0 : .get(format!("http://{}/metrics", self.listen_addr()))
69 0 : .send()
70 0 : .await;
71 0 : match res {
72 0 : Ok(response) if response.status().is_success() => Ok(true),
73 0 : Ok(_) => Err(anyhow!("Failed to query /metrics")),
74 0 : Err(e) => Err(anyhow!("Failed to check node status: {e}")),
75 : }
76 0 : };
77 :
78 0 : let res = start_process(
79 0 : "object_storage",
80 0 : &self.data_dir.clone().into_std_path_buf(),
81 0 : &self.bin.clone().into_std_path_buf(),
82 0 : vec![self.config_path().to_string()],
83 0 : vec![("RUST_LOG".into(), "debug".into())],
84 0 : background_process::InitialPidFile::Create(self.pid_file()),
85 0 : retry_timeout,
86 0 : process_status_check,
87 0 : )
88 0 : .await;
89 0 : if res.is_err() {
90 0 : eprintln!("Logs:\n{}", std::fs::read_to_string(self.log_file())?);
91 0 : }
92 :
93 0 : res
94 0 : }
95 :
96 0 : pub fn stop(&self, immediate: bool) -> anyhow::Result<()> {
97 0 : stop_process(immediate, "object_storage", &self.pid_file())
98 0 : }
99 :
100 0 : fn log_file(&self) -> Utf8PathBuf {
101 0 : self.data_dir.join("object_storage.log")
102 0 : }
103 :
104 0 : fn pid_file(&self) -> Utf8PathBuf {
105 0 : self.data_dir.join("object_storage.pid")
106 0 : }
107 : }
|