LCOV - code coverage report
Current view: top level - proxy/src - config.rs (source / functions) Coverage Total Hit
Test: c8f8d331b83562868d9054d9e0e68f866772aeaa.info Lines: 52.8 % 282 149
Test Date: 2025-07-26 17:20:05 Functions: 32.1 % 28 9

            Line data    Source code
       1              : use std::str::FromStr;
       2              : use std::sync::Arc;
       3              : use std::time::Duration;
       4              : 
       5              : use anyhow::{Context, Ok, bail, ensure};
       6              : use arc_swap::ArcSwapOption;
       7              : use camino::{Utf8Path, Utf8PathBuf};
       8              : use clap::ValueEnum;
       9              : use compute_api::spec::LocalProxySpec;
      10              : use remote_storage::RemoteStorageConfig;
      11              : use thiserror::Error;
      12              : use tokio::sync::Notify;
      13              : use tracing::{debug, error, info, warn};
      14              : 
      15              : use crate::auth::backend::jwt::JwkCache;
      16              : use crate::auth::backend::local::JWKS_ROLE_MAP;
      17              : use crate::control_plane::locks::ApiLocks;
      18              : use crate::control_plane::messages::{EndpointJwksResponse, JwksSettings};
      19              : use crate::ext::TaskExt;
      20              : use crate::intern::RoleNameInt;
      21              : use crate::rate_limiter::{RateLimitAlgorithm, RateLimiterConfig};
      22              : use crate::scram::threadpool::ThreadPool;
      23              : use crate::serverless::GlobalConnPoolOptions;
      24              : use crate::serverless::cancel_set::CancelSet;
      25              : #[cfg(feature = "rest_broker")]
      26              : use crate::serverless::rest::DbSchemaCache;
      27              : pub use crate::tls::server_config::{TlsConfig, configure_tls};
      28              : use crate::types::{Host, RoleName};
      29              : 
      30              : pub struct ProxyConfig {
      31              :     pub tls_config: ArcSwapOption<TlsConfig>,
      32              :     pub metric_collection: Option<MetricCollectionConfig>,
      33              :     pub http_config: HttpConfig,
      34              :     pub authentication_config: AuthenticationConfig,
      35              :     #[cfg(feature = "rest_broker")]
      36              :     pub rest_config: RestConfig,
      37              :     pub proxy_protocol_v2: ProxyProtocolV2,
      38              :     pub handshake_timeout: Duration,
      39              :     pub wake_compute_retry_config: RetryConfig,
      40              :     pub connect_compute_locks: ApiLocks<Host>,
      41              :     pub connect_to_compute: ComputeConfig,
      42              :     pub greetings: String, // Greeting message sent to the client after connection establishment and contains session_id.
      43              :     #[cfg(feature = "testing")]
      44              :     pub disable_pg_session_jwt: bool,
      45              : }
      46              : 
      47              : pub struct ComputeConfig {
      48              :     pub retry: RetryConfig,
      49              :     pub tls: Arc<rustls::ClientConfig>,
      50              :     pub timeout: Duration,
      51              : }
      52              : 
      53              : #[derive(Copy, Clone, Debug, ValueEnum, PartialEq)]
      54              : pub enum ProxyProtocolV2 {
      55              :     /// Connection will error if PROXY protocol v2 header is missing
      56              :     Required,
      57              :     /// Connection will error if PROXY protocol v2 header is provided
      58              :     Rejected,
      59              : }
      60              : 
      61              : #[derive(Debug)]
      62              : pub struct MetricCollectionConfig {
      63              :     pub endpoint: reqwest::Url,
      64              :     pub interval: Duration,
      65              :     pub backup_metric_collection_config: MetricBackupCollectionConfig,
      66              : }
      67              : 
      68              : pub struct HttpConfig {
      69              :     pub accept_websockets: bool,
      70              :     pub pool_options: GlobalConnPoolOptions,
      71              :     pub cancel_set: CancelSet,
      72              :     pub client_conn_threshold: u64,
      73              :     pub max_request_size_bytes: usize,
      74              :     pub max_response_size_bytes: usize,
      75              : }
      76              : 
      77              : pub struct AuthenticationConfig {
      78              :     pub thread_pool: Arc<ThreadPool>,
      79              :     pub scram_protocol_timeout: tokio::time::Duration,
      80              :     pub ip_allowlist_check_enabled: bool,
      81              :     pub is_vpc_acccess_proxy: bool,
      82              :     pub jwks_cache: JwkCache,
      83              :     pub is_auth_broker: bool,
      84              :     pub accept_jwts: bool,
      85              :     pub console_redirect_confirmation_timeout: tokio::time::Duration,
      86              : }
      87              : 
      88              : #[cfg(feature = "rest_broker")]
      89              : pub struct RestConfig {
      90              :     pub is_rest_broker: bool,
      91              :     pub db_schema_cache: Option<DbSchemaCache>,
      92              :     pub max_schema_size: usize,
      93              :     pub hostname_prefix: String,
      94              : }
      95              : 
      96              : #[derive(Debug)]
      97              : pub struct MetricBackupCollectionConfig {
      98              :     pub remote_storage_config: Option<RemoteStorageConfig>,
      99              :     pub chunk_size: usize,
     100              : }
     101              : 
     102            1 : pub fn remote_storage_from_toml(s: &str) -> anyhow::Result<RemoteStorageConfig> {
     103            1 :     RemoteStorageConfig::from_toml(&s.parse()?)
     104            1 : }
     105              : 
     106              : /// Helper for cmdline cache options parsing.
     107              : #[derive(Debug)]
     108              : pub struct CacheOptions {
     109              :     /// Max number of entries.
     110              :     pub size: Option<u64>,
     111              :     /// Entry's time-to-live.
     112              :     pub absolute_ttl: Option<Duration>,
     113              :     /// Entry's time-to-idle.
     114              :     pub idle_ttl: Option<Duration>,
     115              : }
     116              : 
     117              : impl CacheOptions {
     118              :     /// Default options for [`crate::cache::node_info::NodeInfoCache`].
     119              :     pub const CACHE_DEFAULT_OPTIONS: &'static str = "size=4000,idle_ttl=4m";
     120              : 
     121              :     /// Parse cache options passed via cmdline.
     122              :     /// Example: [`Self::CACHE_DEFAULT_OPTIONS`].
     123            4 :     fn parse(options: &str) -> anyhow::Result<Self> {
     124            4 :         let mut size = None;
     125            4 :         let mut absolute_ttl = None;
     126            4 :         let mut idle_ttl = None;
     127              : 
     128            7 :         for option in options.split(',') {
     129            7 :             let (key, value) = option
     130            7 :                 .split_once('=')
     131            7 :                 .with_context(|| format!("bad key-value pair: {option}"))?;
     132              : 
     133            7 :             match key {
     134            7 :                 "size" => size = Some(value.parse()?),
     135            3 :                 "absolute_ttl" | "ttl" => absolute_ttl = Some(humantime::parse_duration(value)?),
     136            0 :                 "idle_ttl" | "tti" => idle_ttl = Some(humantime::parse_duration(value)?),
     137            0 :                 unknown => bail!("unknown key: {unknown}"),
     138              :             }
     139              :         }
     140              : 
     141            4 :         Ok(Self {
     142            4 :             size,
     143            4 :             absolute_ttl,
     144            4 :             idle_ttl,
     145            4 :         })
     146            4 :     }
     147              : 
     148           13 :     pub fn moka<K, V, C>(
     149           13 :         &self,
     150           13 :         mut builder: moka::sync::CacheBuilder<K, V, C>,
     151           13 :     ) -> moka::sync::CacheBuilder<K, V, C> {
     152           13 :         if let Some(size) = self.size {
     153           13 :             builder = builder.max_capacity(size);
     154           13 :         }
     155           13 :         if let Some(ttl) = self.absolute_ttl {
     156           13 :             builder = builder.time_to_live(ttl);
     157           13 :         }
     158           13 :         if let Some(tti) = self.idle_ttl {
     159            0 :             builder = builder.time_to_idle(tti);
     160           13 :         }
     161           13 :         builder
     162           13 :     }
     163              : }
     164              : 
     165              : impl FromStr for CacheOptions {
     166              :     type Err = anyhow::Error;
     167              : 
     168            4 :     fn from_str(options: &str) -> Result<Self, Self::Err> {
     169            4 :         let error = || format!("failed to parse cache options '{options}'");
     170            4 :         Self::parse(options).with_context(error)
     171            4 :     }
     172              : }
     173              : 
     174              : /// Helper for cmdline cache options parsing.
     175              : #[derive(Debug)]
     176              : pub struct ProjectInfoCacheOptions {
     177              :     /// Max number of entries.
     178              :     pub size: u64,
     179              :     /// Entry's time-to-live.
     180              :     pub ttl: Duration,
     181              :     /// Max number of roles per endpoint.
     182              :     pub max_roles: u64,
     183              :     /// Gc interval.
     184              :     pub gc_interval: Duration,
     185              : }
     186              : 
     187              : impl ProjectInfoCacheOptions {
     188              :     /// Default options for [`crate::cache::project_info::ProjectInfoCache`].
     189              :     pub const CACHE_DEFAULT_OPTIONS: &'static str =
     190              :         "size=10000,ttl=4m,max_roles=10,gc_interval=60m";
     191              : 
     192              :     /// Parse cache options passed via cmdline.
     193              :     /// Example: [`Self::CACHE_DEFAULT_OPTIONS`].
     194            0 :     fn parse(options: &str) -> anyhow::Result<Self> {
     195            0 :         let mut size = None;
     196            0 :         let mut ttl = None;
     197            0 :         let mut max_roles = None;
     198            0 :         let mut gc_interval = None;
     199              : 
     200            0 :         for option in options.split(',') {
     201            0 :             let (key, value) = option
     202            0 :                 .split_once('=')
     203            0 :                 .with_context(|| format!("bad key-value pair: {option}"))?;
     204              : 
     205            0 :             match key {
     206            0 :                 "size" => size = Some(value.parse()?),
     207            0 :                 "ttl" => ttl = Some(humantime::parse_duration(value)?),
     208            0 :                 "max_roles" => max_roles = Some(value.parse()?),
     209            0 :                 "gc_interval" => gc_interval = Some(humantime::parse_duration(value)?),
     210            0 :                 unknown => bail!("unknown key: {unknown}"),
     211              :             }
     212              :         }
     213              : 
     214              :         // TTL doesn't matter if cache is always empty.
     215            0 :         if let Some(0) = size {
     216            0 :             ttl.get_or_insert(Duration::default());
     217            0 :         }
     218              : 
     219            0 :         Ok(Self {
     220            0 :             size: size.context("missing `size`")?,
     221            0 :             ttl: ttl.context("missing `ttl`")?,
     222            0 :             max_roles: max_roles.context("missing `max_roles`")?,
     223            0 :             gc_interval: gc_interval.context("missing `gc_interval`")?,
     224              :         })
     225            0 :     }
     226              : }
     227              : 
     228              : impl FromStr for ProjectInfoCacheOptions {
     229              :     type Err = anyhow::Error;
     230              : 
     231            0 :     fn from_str(options: &str) -> Result<Self, Self::Err> {
     232            0 :         let error = || format!("failed to parse cache options '{options}'");
     233            0 :         Self::parse(options).with_context(error)
     234            0 :     }
     235              : }
     236              : 
     237              : /// Retry configuration used when connecting to compute and when waking compute.
     238              : #[derive(Clone, Copy, Debug)]
     239              : pub struct RetryConfig {
     240              :     /// Number of times we should retry.
     241              :     pub max_retries: u32,
     242              :     /// Retry duration is base_delay * backoff_factor ^ n, where n starts at 0
     243              :     pub base_delay: tokio::time::Duration,
     244              :     /// Exponential base for retry wait duration
     245              :     pub backoff_factor: f64,
     246              : }
     247              : 
     248              : impl RetryConfig {
     249              :     // Default options for RetryConfig.
     250              : 
     251              :     /// Total delay for 5 retries with 200ms base delay and 2 backoff factor is about 6s.
     252              :     pub const CONNECT_TO_COMPUTE_DEFAULT_VALUES: &'static str =
     253              :         "num_retries=5,base_retry_wait_duration=200ms,retry_wait_exponent_base=2";
     254              :     /// Total delay for 8 retries with 100ms base delay and 1.6 backoff factor is about 7s.
     255              :     /// Cplane has timeout of 60s on each request. 8m7s in total.
     256              :     pub const WAKE_COMPUTE_DEFAULT_VALUES: &'static str =
     257              :         "num_retries=8,base_retry_wait_duration=100ms,retry_wait_exponent_base=1.6";
     258              : 
     259              :     /// Parse retry options passed via cmdline.
     260              :     /// Example: [`Self::CONNECT_TO_COMPUTE_DEFAULT_VALUES`].
     261            0 :     pub fn parse(options: &str) -> anyhow::Result<Self> {
     262            0 :         let mut num_retries = None;
     263            0 :         let mut base_retry_wait_duration = None;
     264            0 :         let mut retry_wait_exponent_base = None;
     265              : 
     266            0 :         for option in options.split(',') {
     267            0 :             let (key, value) = option
     268            0 :                 .split_once('=')
     269            0 :                 .with_context(|| format!("bad key-value pair: {option}"))?;
     270              : 
     271            0 :             match key {
     272            0 :                 "num_retries" => num_retries = Some(value.parse()?),
     273            0 :                 "base_retry_wait_duration" => {
     274            0 :                     base_retry_wait_duration = Some(humantime::parse_duration(value)?);
     275              :                 }
     276            0 :                 "retry_wait_exponent_base" => retry_wait_exponent_base = Some(value.parse()?),
     277            0 :                 unknown => bail!("unknown key: {unknown}"),
     278              :             }
     279              :         }
     280              : 
     281            0 :         Ok(Self {
     282            0 :             max_retries: num_retries.context("missing `num_retries`")?,
     283            0 :             base_delay: base_retry_wait_duration.context("missing `base_retry_wait_duration`")?,
     284            0 :             backoff_factor: retry_wait_exponent_base
     285            0 :                 .context("missing `retry_wait_exponent_base`")?,
     286              :         })
     287            0 :     }
     288              : }
     289              : 
     290              : /// Helper for cmdline lock options parsing.
     291              : #[derive(serde::Deserialize)]
     292              : pub struct ConcurrencyLockOptions {
     293              :     /// The number of shards the lock map should have
     294              :     pub shards: usize,
     295              :     /// The number of allowed concurrent requests for each endpoint
     296              :     #[serde(flatten)]
     297              :     pub limiter: RateLimiterConfig,
     298              :     /// Garbage collection epoch
     299              :     #[serde(deserialize_with = "humantime_serde::deserialize")]
     300              :     pub epoch: Duration,
     301              :     /// Lock timeout
     302              :     #[serde(deserialize_with = "humantime_serde::deserialize")]
     303              :     pub timeout: Duration,
     304              : }
     305              : 
     306              : impl ConcurrencyLockOptions {
     307              :     /// Default options for [`crate::control_plane::client::ApiLocks`].
     308              :     pub const DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK: &'static str = "permits=0";
     309              :     /// Default options for [`crate::control_plane::client::ApiLocks`].
     310              :     pub const DEFAULT_OPTIONS_CONNECT_COMPUTE_LOCK: &'static str =
     311              :         "shards=64,permits=100,epoch=10m,timeout=10ms";
     312              : 
     313              :     // pub const DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK: &'static str = "shards=32,permits=4,epoch=10m,timeout=1s";
     314              : 
     315              :     /// Parse lock options passed via cmdline.
     316              :     /// Example: [`Self::DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK`].
     317            4 :     fn parse(options: &str) -> anyhow::Result<Self> {
     318            4 :         let options = options.trim();
     319            4 :         if options.starts_with('{') && options.ends_with('}') {
     320            1 :             return Ok(serde_json::from_str(options)?);
     321            3 :         }
     322              : 
     323            3 :         let mut shards = None;
     324            3 :         let mut permits = None;
     325            3 :         let mut epoch = None;
     326            3 :         let mut timeout = None;
     327              : 
     328            9 :         for option in options.split(',') {
     329            9 :             let (key, value) = option
     330            9 :                 .split_once('=')
     331            9 :                 .with_context(|| format!("bad key-value pair: {option}"))?;
     332              : 
     333            9 :             match key {
     334            9 :                 "shards" => shards = Some(value.parse()?),
     335            7 :                 "permits" => permits = Some(value.parse()?),
     336            4 :                 "epoch" => epoch = Some(humantime::parse_duration(value)?),
     337            2 :                 "timeout" => timeout = Some(humantime::parse_duration(value)?),
     338            0 :                 unknown => bail!("unknown key: {unknown}"),
     339              :             }
     340              :         }
     341              : 
     342              :         // these don't matter if the lock is disabled (permits=0)
     343            3 :         if let Some(0) = permits {
     344            1 :             timeout = Some(Duration::default());
     345            1 :             epoch = Some(Duration::default());
     346            1 :             shards = Some(2);
     347            2 :         }
     348              : 
     349            3 :         let permits = permits.context("missing `permits`")?;
     350            3 :         let out = Self {
     351            3 :             shards: shards.context("missing `shards`")?,
     352            3 :             limiter: RateLimiterConfig {
     353            3 :                 algorithm: RateLimitAlgorithm::Fixed,
     354            3 :                 initial_limit: permits,
     355            3 :             },
     356            3 :             epoch: epoch.context("missing `epoch`")?,
     357            3 :             timeout: timeout.context("missing `timeout`")?,
     358              :         };
     359              : 
     360            3 :         ensure!(out.shards > 1, "shard count must be > 1");
     361            3 :         ensure!(
     362            3 :             out.shards.is_power_of_two(),
     363            0 :             "shard count must be a power of two"
     364              :         );
     365              : 
     366            3 :         Ok(out)
     367            4 :     }
     368              : }
     369              : 
     370              : impl FromStr for ConcurrencyLockOptions {
     371              :     type Err = anyhow::Error;
     372              : 
     373            4 :     fn from_str(options: &str) -> Result<Self, Self::Err> {
     374            4 :         let error = || format!("failed to parse cache lock options '{options}'");
     375            4 :         Self::parse(options).with_context(error)
     376            4 :     }
     377              : }
     378              : 
     379              : #[derive(Error, Debug)]
     380              : pub(crate) enum RefreshConfigError {
     381              :     #[error(transparent)]
     382              :     Read(#[from] std::io::Error),
     383              :     #[error(transparent)]
     384              :     Parse(#[from] serde_json::Error),
     385              :     #[error(transparent)]
     386              :     Validate(anyhow::Error),
     387              :     #[error(transparent)]
     388              :     Tls(anyhow::Error),
     389              : }
     390              : 
     391            0 : pub(crate) async fn refresh_config_loop(config: &ProxyConfig, path: Utf8PathBuf, rx: Arc<Notify>) {
     392            0 :     let mut init = true;
     393              :     loop {
     394            0 :         rx.notified().await;
     395              : 
     396            0 :         match refresh_config_inner(config, &path).await {
     397            0 :             std::result::Result::Ok(()) => {}
     398              :             // don't log for file not found errors if this is the first time we are checking
     399              :             // for computes that don't use local_proxy, this is not an error.
     400            0 :             Err(RefreshConfigError::Read(e))
     401            0 :                 if init && e.kind() == std::io::ErrorKind::NotFound =>
     402              :             {
     403            0 :                 debug!(error=?e, ?path, "could not read config file");
     404              :             }
     405            0 :             Err(RefreshConfigError::Tls(e)) => {
     406            0 :                 error!(error=?e, ?path, "could not read TLS certificates");
     407              :             }
     408            0 :             Err(e) => {
     409            0 :                 error!(error=?e, ?path, "could not read config file");
     410              :             }
     411              :         }
     412              : 
     413            0 :         init = false;
     414              :     }
     415              : }
     416              : 
     417            0 : pub(crate) async fn refresh_config_inner(
     418            0 :     config: &ProxyConfig,
     419            0 :     path: &Utf8Path,
     420            0 : ) -> Result<(), RefreshConfigError> {
     421            0 :     let bytes = tokio::fs::read(&path).await?;
     422            0 :     let data: LocalProxySpec = serde_json::from_slice(&bytes)?;
     423              : 
     424            0 :     let mut jwks_set = vec![];
     425              : 
     426            0 :     fn parse_jwks_settings(jwks: compute_api::spec::JwksSettings) -> anyhow::Result<JwksSettings> {
     427            0 :         let mut jwks_url = url::Url::from_str(&jwks.jwks_url).context("parsing JWKS url")?;
     428              : 
     429            0 :         ensure!(
     430            0 :             jwks_url.has_authority()
     431            0 :                 && (jwks_url.scheme() == "http" || jwks_url.scheme() == "https"),
     432            0 :             "Invalid JWKS url. Must be HTTP",
     433              :         );
     434              : 
     435            0 :         ensure!(
     436            0 :             jwks_url.host().is_some_and(|h| h != url::Host::Domain("")),
     437            0 :             "Invalid JWKS url. No domain listed",
     438              :         );
     439              : 
     440              :         // clear username, password and ports
     441            0 :         jwks_url
     442            0 :             .set_username("")
     443            0 :             .expect("url can be a base and has a valid host and is not a file. should not error");
     444            0 :         jwks_url
     445            0 :             .set_password(None)
     446            0 :             .expect("url can be a base and has a valid host and is not a file. should not error");
     447              :         // local testing is hard if we need to have a specific restricted port
     448            0 :         if cfg!(not(feature = "testing")) {
     449            0 :             jwks_url.set_port(None).expect(
     450            0 :                 "url can be a base and has a valid host and is not a file. should not error",
     451            0 :             );
     452            0 :         }
     453              : 
     454              :         // clear fragment and query params
     455            0 :         jwks_url.set_fragment(None);
     456            0 :         jwks_url.query_pairs_mut().clear().finish();
     457              : 
     458            0 :         if jwks_url.scheme() != "https" {
     459              :             // local testing is hard if we need to set up https support.
     460            0 :             if cfg!(not(feature = "testing")) {
     461            0 :                 jwks_url
     462            0 :                     .set_scheme("https")
     463            0 :                     .expect("should not error to set the scheme to https if it was http");
     464            0 :             } else {
     465            0 :                 warn!(scheme = jwks_url.scheme(), "JWKS url is not HTTPS");
     466              :             }
     467            0 :         }
     468              : 
     469            0 :         Ok(JwksSettings {
     470            0 :             id: jwks.id,
     471            0 :             jwks_url,
     472            0 :             _provider_name: jwks.provider_name,
     473            0 :             jwt_audience: jwks.jwt_audience,
     474            0 :             role_names: jwks
     475            0 :                 .role_names
     476            0 :                 .into_iter()
     477            0 :                 .map(RoleName::from)
     478            0 :                 .map(|s| RoleNameInt::from(&s))
     479            0 :                 .collect(),
     480              :         })
     481            0 :     }
     482              : 
     483            0 :     for jwks in data.jwks.into_iter().flatten() {
     484            0 :         jwks_set.push(parse_jwks_settings(jwks).map_err(RefreshConfigError::Validate)?);
     485              :     }
     486              : 
     487            0 :     info!("successfully loaded new config");
     488            0 :     JWKS_ROLE_MAP.store(Some(Arc::new(EndpointJwksResponse { jwks: jwks_set })));
     489              : 
     490            0 :     if let Some(tls_config) = data.tls {
     491            0 :         let tls_config = tokio::task::spawn_blocking(move || {
     492            0 :             crate::tls::server_config::configure_tls(
     493            0 :                 tls_config.key_path.as_ref(),
     494            0 :                 tls_config.cert_path.as_ref(),
     495            0 :                 None,
     496              :                 false,
     497              :             )
     498            0 :         })
     499            0 :         .await
     500            0 :         .propagate_task_panic()
     501            0 :         .map_err(RefreshConfigError::Tls)?;
     502            0 :         config.tls_config.store(Some(Arc::new(tls_config)));
     503            0 :     }
     504              : 
     505            0 :     std::result::Result::Ok(())
     506            0 : }
     507              : 
     508              : #[cfg(test)]
     509              : mod tests {
     510              :     use super::*;
     511              :     use crate::rate_limiter::Aimd;
     512              : 
     513              :     #[test]
     514            1 :     fn test_parse_cache_options() -> anyhow::Result<()> {
     515              :         let CacheOptions {
     516            1 :             size,
     517            1 :             absolute_ttl,
     518              :             idle_ttl: _,
     519            1 :         } = "size=4096,ttl=5min".parse()?;
     520            1 :         assert_eq!(size, Some(4096));
     521            1 :         assert_eq!(absolute_ttl, Some(Duration::from_secs(5 * 60)));
     522              : 
     523              :         let CacheOptions {
     524            1 :             size,
     525            1 :             absolute_ttl,
     526              :             idle_ttl: _,
     527            1 :         } = "ttl=4m,size=2".parse()?;
     528            1 :         assert_eq!(size, Some(2));
     529            1 :         assert_eq!(absolute_ttl, Some(Duration::from_secs(4 * 60)));
     530              : 
     531              :         let CacheOptions {
     532            1 :             size,
     533            1 :             absolute_ttl,
     534              :             idle_ttl: _,
     535            1 :         } = "size=0,ttl=1s".parse()?;
     536            1 :         assert_eq!(size, Some(0));
     537            1 :         assert_eq!(absolute_ttl, Some(Duration::from_secs(1)));
     538              : 
     539              :         let CacheOptions {
     540            1 :             size,
     541            1 :             absolute_ttl,
     542              :             idle_ttl: _,
     543            1 :         } = "size=0".parse()?;
     544            1 :         assert_eq!(size, Some(0));
     545            1 :         assert_eq!(absolute_ttl, None);
     546              : 
     547            1 :         Ok(())
     548            1 :     }
     549              : 
     550              :     #[test]
     551            1 :     fn test_parse_lock_options() -> anyhow::Result<()> {
     552              :         let ConcurrencyLockOptions {
     553            1 :             epoch,
     554            1 :             limiter,
     555            1 :             shards,
     556            1 :             timeout,
     557            1 :         } = "shards=32,permits=4,epoch=10m,timeout=1s".parse()?;
     558            1 :         assert_eq!(epoch, Duration::from_secs(10 * 60));
     559            1 :         assert_eq!(timeout, Duration::from_secs(1));
     560            1 :         assert_eq!(shards, 32);
     561            1 :         assert_eq!(limiter.initial_limit, 4);
     562            1 :         assert_eq!(limiter.algorithm, RateLimitAlgorithm::Fixed);
     563              : 
     564              :         let ConcurrencyLockOptions {
     565            1 :             epoch,
     566            1 :             limiter,
     567            1 :             shards,
     568            1 :             timeout,
     569            1 :         } = "epoch=60s,shards=16,timeout=100ms,permits=8".parse()?;
     570            1 :         assert_eq!(epoch, Duration::from_secs(60));
     571            1 :         assert_eq!(timeout, Duration::from_millis(100));
     572            1 :         assert_eq!(shards, 16);
     573            1 :         assert_eq!(limiter.initial_limit, 8);
     574            1 :         assert_eq!(limiter.algorithm, RateLimitAlgorithm::Fixed);
     575              : 
     576              :         let ConcurrencyLockOptions {
     577            1 :             epoch,
     578            1 :             limiter,
     579            1 :             shards,
     580            1 :             timeout,
     581            1 :         } = "permits=0".parse()?;
     582            1 :         assert_eq!(epoch, Duration::ZERO);
     583            1 :         assert_eq!(timeout, Duration::ZERO);
     584            1 :         assert_eq!(shards, 2);
     585            1 :         assert_eq!(limiter.initial_limit, 0);
     586            1 :         assert_eq!(limiter.algorithm, RateLimitAlgorithm::Fixed);
     587              : 
     588            1 :         Ok(())
     589            1 :     }
     590              : 
     591              :     #[test]
     592            1 :     fn test_parse_json_lock_options() -> anyhow::Result<()> {
     593              :         let ConcurrencyLockOptions {
     594            1 :             epoch,
     595            1 :             limiter,
     596            1 :             shards,
     597            1 :             timeout,
     598            1 :         } = r#"{"shards":32,"initial_limit":44,"aimd":{"min":5,"max":500,"inc":10,"dec":0.9,"utilisation":0.8},"epoch":"10m","timeout":"1s"}"#
     599            1 :             .parse()?;
     600            1 :         assert_eq!(epoch, Duration::from_secs(10 * 60));
     601            1 :         assert_eq!(timeout, Duration::from_secs(1));
     602            1 :         assert_eq!(shards, 32);
     603            1 :         assert_eq!(limiter.initial_limit, 44);
     604            1 :         assert_eq!(
     605              :             limiter.algorithm,
     606              :             RateLimitAlgorithm::Aimd {
     607              :                 conf: Aimd {
     608              :                     min: 5,
     609              :                     max: 500,
     610              :                     dec: 0.9,
     611              :                     inc: 10,
     612              :                     utilisation: 0.8
     613              :                 }
     614              :             },
     615              :         );
     616              : 
     617            1 :         Ok(())
     618            1 :     }
     619              : }
        

Generated by: LCOV version 2.1-beta