Line data Source code
//! `endpoint_storage` is a service that provides an API for uploading and downloading
//! files. It is used by compute and the control plane to access LFC prewarm data.
//! This service is deployed either as a separate component or as part of the compute image
//! for large computes.
5 : mod app;
6 : use anyhow::Context;
7 : use clap::Parser;
8 : use std::net::{IpAddr, Ipv4Addr, SocketAddr};
9 : use tracing::info;
10 : use utils::logging;
11 :
/// Default value for `Config::max_upload_file_limit` when the config omits it.
///
/// Evaluates to `100 * 1024 * 1024` (100 MiB if the limit is counted in bytes).
// See set() -- presumably the upload handler that enforces this limit; confirm against `app`.
const fn max_upload_file_limit() -> usize {
    const MIB: usize = 1024 * 1024;
    100 * MIB
}
16 :
/// Default value for `Config::listen`: the unspecified IPv4 address (0.0.0.0) on port 51243.
const fn listen() -> SocketAddr {
    let any_v4 = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
    SocketAddr::new(any_v4, 51243)
}
20 :
// Command-line arguments, parsed with clap's derive API.
//
// The notes added here are plain `//` comments on purpose: clap turns `///` doc comments
// into user-visible help text, so adding those would change the `--help` output.
#[derive(Parser)]
struct Args {
    // Positional argument; `main` treats it as the path of a JSON config file.
    // `exclusive` means it cannot be combined with any other argument.
    #[arg(exclusive = true)]
    config_file: Option<String>,
    // When set, `main` skips `app::check_storage_permissions`.
    // `requires = "config"` means it is only accepted together with `--config`.
    #[arg(long, default_value = "false", requires = "config")]
    /// to allow testing k8s helm chart where we don't have s3 credentials
    no_s3_check_on_startup: bool,
    // Inline config: `main` parses this string itself as JSON rather than opening a file.
    // NOTE(review): `value_name = "FILE"` looks inconsistent with that usage -- confirm
    // whether the help placeholder is intended.
    #[arg(long, value_name = "FILE")]
    /// inline config mode for k8s helm chart
    config: Option<String>,
}
32 :
33 0 : #[derive(serde::Deserialize)]
34 : struct Config {
35 : #[serde(default = "listen")]
36 : listen: std::net::SocketAddr,
37 : pemfile: camino::Utf8PathBuf,
38 : #[serde(flatten)]
39 : storage_kind: remote_storage::TypedRemoteStorageKind,
40 : #[serde(default = "max_upload_file_limit")]
41 : max_upload_file_limit: usize,
42 : }
43 :
44 : #[tokio::main]
45 0 : async fn main() -> anyhow::Result<()> {
46 0 : logging::init(
47 0 : logging::LogFormat::Plain,
48 0 : logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
49 0 : logging::Output::Stdout,
50 0 : )?;
51 :
52 0 : let args = Args::parse();
53 0 : let config: Config = if let Some(config_path) = args.config_file {
54 0 : info!("Reading config from {config_path}");
55 0 : let config = std::fs::read_to_string(config_path)?;
56 0 : serde_json::from_str(&config).context("parsing config")?
57 0 : } else if let Some(config) = args.config {
58 0 : info!("Reading inline config");
59 0 : serde_json::from_str(&config).context("parsing config")?
60 : } else {
61 0 : anyhow::bail!("Supply either config file path or --config=inline-config");
62 : };
63 :
64 0 : info!("Reading pemfile from {}", config.pemfile.clone());
65 0 : let pemfile = std::fs::read(config.pemfile.clone())?;
66 0 : info!("Loading public key from {}", config.pemfile.clone());
67 0 : let auth = endpoint_storage::JwtAuth::new(&pemfile)?;
68 :
69 0 : let listener = tokio::net::TcpListener::bind(config.listen).await.unwrap();
70 0 : info!("listening on {}", listener.local_addr().unwrap());
71 :
72 0 : let storage =
73 0 : remote_storage::GenericRemoteStorage::from_storage_kind(config.storage_kind).await?;
74 0 : let cancel = tokio_util::sync::CancellationToken::new();
75 0 : if !args.no_s3_check_on_startup {
76 0 : app::check_storage_permissions(&storage, cancel.clone()).await?;
77 0 : }
78 :
79 0 : let proxy = std::sync::Arc::new(endpoint_storage::Storage {
80 0 : auth,
81 0 : storage,
82 0 : cancel: cancel.clone(),
83 0 : max_upload_file_limit: config.max_upload_file_limit,
84 0 : });
85 :
86 0 : tokio::spawn(utils::signals::signal_handler(cancel.clone()));
87 0 : axum::serve(listener, app::app(proxy))
88 0 : .with_graceful_shutdown(async move { cancel.cancelled().await })
89 0 : .await?;
90 0 : Ok(())
91 0 : }
|