Line data Source code
1 : //! `endpoint_storage` is a service which provides API for uploading and downloading
2 : //! files. It is used by compute and control plane for accessing LFC prewarm data.
3 : //! This service is deployed either as a separate component or as part of compute image
4 : //! for large computes.
5 : mod app;
6 : use anyhow::Context;
7 : use clap::Parser;
8 : use postgres_backend::AuthType;
9 : use std::net::{IpAddr, Ipv4Addr, SocketAddr};
10 : use tracing::info;
11 : use utils::auth::JwtAuth;
12 : use utils::logging;
13 :
14 : //see set()
/// Serde default for `Config::max_upload_file_limit`.
///
/// Evaluates to `100 * 1024 * 1024` (100 MiB, presumably counted in bytes).
const fn max_upload_file_limit() -> usize {
    const MIB: usize = 1024 * 1024;
    100 * MIB
}
18 :
/// Serde default for `Config::listen`: every IPv4 interface (`0.0.0.0`) on port 51243.
const fn listen() -> SocketAddr {
    const DEFAULT_PORT: u16 = 51243;
    SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), DEFAULT_PORT)
}
22 :
23 0 : const fn default_auth_type() -> AuthType {
24 0 : AuthType::NeonJWT
25 0 : }
26 :
27 : #[derive(Parser)]
28 : struct Args {
29 : #[arg(exclusive = true)]
30 : config_file: Option<String>,
31 : #[arg(long, default_value = "false", requires = "config")]
32 : /// to allow testing k8s helm chart where we don't have s3 credentials
33 : no_s3_check_on_startup: bool,
34 : #[arg(long, value_name = "FILE")]
35 : /// inline config mode for k8s helm chart
36 : config: Option<String>,
37 : }
38 :
39 0 : #[derive(serde::Deserialize)]
40 : struct Config {
41 : #[serde(default = "listen")]
42 : listen: std::net::SocketAddr,
43 : pemfile: camino::Utf8PathBuf,
44 : #[serde(flatten)]
45 : storage_kind: remote_storage::TypedRemoteStorageKind,
46 : #[serde(default = "max_upload_file_limit")]
47 : max_upload_file_limit: usize,
48 : #[serde(default = "default_auth_type")]
49 : auth_type: AuthType,
50 : }
51 :
52 : #[tokio::main]
53 0 : async fn main() -> anyhow::Result<()> {
54 0 : logging::init(
55 0 : logging::LogFormat::Plain,
56 0 : logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
57 0 : logging::Output::Stdout,
58 0 : )?;
59 :
60 0 : let args = Args::parse();
61 0 : let config: Config = if let Some(config_path) = args.config_file {
62 0 : info!("Reading config from {config_path}");
63 0 : let config = std::fs::read_to_string(config_path)?;
64 0 : serde_json::from_str(&config).context("parsing config")?
65 0 : } else if let Some(config) = args.config {
66 0 : info!("Reading inline config");
67 0 : serde_json::from_str(&config).context("parsing config")?
68 : } else {
69 0 : anyhow::bail!("Supply either config file path or --config=inline-config");
70 : };
71 :
72 0 : if config.auth_type == AuthType::Trust {
73 0 : anyhow::bail!("Trust based auth is not supported");
74 0 : }
75 :
76 0 : let auth = match config.auth_type {
77 0 : AuthType::NeonJWT => JwtAuth::from_key_path(&config.pemfile)?,
78 0 : AuthType::HadronJWT => JwtAuth::from_cert_path(&config.pemfile)?,
79 0 : AuthType::Trust => unreachable!(),
80 : };
81 :
82 0 : let listener = tokio::net::TcpListener::bind(config.listen).await.unwrap();
83 0 : info!("listening on {}", listener.local_addr().unwrap());
84 :
85 0 : let storage =
86 0 : remote_storage::GenericRemoteStorage::from_storage_kind(config.storage_kind).await?;
87 0 : let cancel = tokio_util::sync::CancellationToken::new();
88 0 : if !args.no_s3_check_on_startup {
89 0 : app::check_storage_permissions(&storage, cancel.clone()).await?;
90 0 : }
91 :
92 0 : let proxy = std::sync::Arc::new(endpoint_storage::Storage {
93 0 : auth,
94 0 : storage,
95 0 : cancel: cancel.clone(),
96 0 : max_upload_file_limit: config.max_upload_file_limit,
97 0 : });
98 :
99 0 : tokio::spawn(utils::signals::signal_handler(cancel.clone()));
100 0 : axum::serve(listener, app::app(proxy))
101 0 : .with_graceful_shutdown(async move { cancel.cancelled().await })
102 0 : .await?;
103 0 : Ok(())
104 0 : }
|