Line data Source code
1 : use std::str::FromStr;
2 : use std::sync::Arc;
3 : use std::time::Duration;
4 :
5 : use anyhow::{Context, Ok, bail, ensure};
6 : use arc_swap::ArcSwapOption;
7 : use camino::{Utf8Path, Utf8PathBuf};
8 : use clap::ValueEnum;
9 : use compute_api::spec::LocalProxySpec;
10 : use remote_storage::RemoteStorageConfig;
11 : use thiserror::Error;
12 : use tokio::sync::Notify;
13 : use tracing::{debug, error, info, warn};
14 :
15 : use crate::auth::backend::jwt::JwkCache;
16 : use crate::auth::backend::local::JWKS_ROLE_MAP;
17 : use crate::control_plane::locks::ApiLocks;
18 : use crate::control_plane::messages::{EndpointJwksResponse, JwksSettings};
19 : use crate::ext::TaskExt;
20 : use crate::intern::RoleNameInt;
21 : use crate::rate_limiter::{RateLimitAlgorithm, RateLimiterConfig};
22 : use crate::scram::threadpool::ThreadPool;
23 : use crate::serverless::GlobalConnPoolOptions;
24 : use crate::serverless::cancel_set::CancelSet;
25 : #[cfg(feature = "rest_broker")]
26 : use crate::serverless::rest::DbSchemaCache;
27 : pub use crate::tls::server_config::{TlsConfig, configure_tls};
28 : use crate::types::{Host, RoleName};
29 :
30 : pub struct ProxyConfig {
31 : pub tls_config: ArcSwapOption<TlsConfig>,
32 : pub metric_collection: Option<MetricCollectionConfig>,
33 : pub http_config: HttpConfig,
34 : pub authentication_config: AuthenticationConfig,
35 : #[cfg(feature = "rest_broker")]
36 : pub rest_config: RestConfig,
37 : pub proxy_protocol_v2: ProxyProtocolV2,
38 : pub handshake_timeout: Duration,
39 : pub wake_compute_retry_config: RetryConfig,
40 : pub connect_compute_locks: ApiLocks<Host>,
41 : pub connect_to_compute: ComputeConfig,
42 : pub greetings: String, // Greeting message sent to the client after connection establishment and contains session_id.
43 : #[cfg(feature = "testing")]
44 : pub disable_pg_session_jwt: bool,
45 : }
46 :
47 : pub struct ComputeConfig {
48 : pub retry: RetryConfig,
49 : pub tls: Arc<rustls::ClientConfig>,
50 : pub timeout: Duration,
51 : }
52 :
53 : #[derive(Copy, Clone, Debug, ValueEnum, PartialEq)]
54 : pub enum ProxyProtocolV2 {
55 : /// Connection will error if PROXY protocol v2 header is missing
56 : Required,
57 : /// Connection will error if PROXY protocol v2 header is provided
58 : Rejected,
59 : }
60 :
61 : #[derive(Debug)]
62 : pub struct MetricCollectionConfig {
63 : pub endpoint: reqwest::Url,
64 : pub interval: Duration,
65 : pub backup_metric_collection_config: MetricBackupCollectionConfig,
66 : }
67 :
68 : pub struct HttpConfig {
69 : pub accept_websockets: bool,
70 : pub pool_options: GlobalConnPoolOptions,
71 : pub cancel_set: CancelSet,
72 : pub client_conn_threshold: u64,
73 : pub max_request_size_bytes: usize,
74 : pub max_response_size_bytes: usize,
75 : }
76 :
77 : pub struct AuthenticationConfig {
78 : pub thread_pool: Arc<ThreadPool>,
79 : pub scram_protocol_timeout: tokio::time::Duration,
80 : pub ip_allowlist_check_enabled: bool,
81 : pub is_vpc_acccess_proxy: bool,
82 : pub jwks_cache: JwkCache,
83 : pub is_auth_broker: bool,
84 : pub accept_jwts: bool,
85 : pub console_redirect_confirmation_timeout: tokio::time::Duration,
86 : }
87 :
88 : #[cfg(feature = "rest_broker")]
89 : pub struct RestConfig {
90 : pub is_rest_broker: bool,
91 : pub db_schema_cache: Option<DbSchemaCache>,
92 : pub max_schema_size: usize,
93 : pub hostname_prefix: String,
94 : }
95 :
96 : #[derive(Debug)]
97 : pub struct MetricBackupCollectionConfig {
98 : pub remote_storage_config: Option<RemoteStorageConfig>,
99 : pub chunk_size: usize,
100 : }
101 :
102 1 : pub fn remote_storage_from_toml(s: &str) -> anyhow::Result<RemoteStorageConfig> {
103 1 : RemoteStorageConfig::from_toml(&s.parse()?)
104 1 : }
105 :
106 : /// Helper for cmdline cache options parsing.
107 : #[derive(Debug)]
108 : pub struct CacheOptions {
109 : /// Max number of entries.
110 : pub size: Option<u64>,
111 : /// Entry's time-to-live.
112 : pub absolute_ttl: Option<Duration>,
113 : /// Entry's time-to-idle.
114 : pub idle_ttl: Option<Duration>,
115 : }
116 :
117 : impl CacheOptions {
118 : /// Default options for [`crate::cache::node_info::NodeInfoCache`].
119 : pub const CACHE_DEFAULT_OPTIONS: &'static str = "size=4000,idle_ttl=4m";
120 :
121 : /// Parse cache options passed via cmdline.
122 : /// Example: [`Self::CACHE_DEFAULT_OPTIONS`].
123 4 : fn parse(options: &str) -> anyhow::Result<Self> {
124 4 : let mut size = None;
125 4 : let mut absolute_ttl = None;
126 4 : let mut idle_ttl = None;
127 :
128 7 : for option in options.split(',') {
129 7 : let (key, value) = option
130 7 : .split_once('=')
131 7 : .with_context(|| format!("bad key-value pair: {option}"))?;
132 :
133 7 : match key {
134 7 : "size" => size = Some(value.parse()?),
135 3 : "absolute_ttl" | "ttl" => absolute_ttl = Some(humantime::parse_duration(value)?),
136 0 : "idle_ttl" | "tti" => idle_ttl = Some(humantime::parse_duration(value)?),
137 0 : unknown => bail!("unknown key: {unknown}"),
138 : }
139 : }
140 :
141 4 : Ok(Self {
142 4 : size,
143 4 : absolute_ttl,
144 4 : idle_ttl,
145 4 : })
146 4 : }
147 :
148 13 : pub fn moka<K, V, C>(
149 13 : &self,
150 13 : mut builder: moka::sync::CacheBuilder<K, V, C>,
151 13 : ) -> moka::sync::CacheBuilder<K, V, C> {
152 13 : if let Some(size) = self.size {
153 13 : builder = builder.max_capacity(size);
154 13 : }
155 13 : if let Some(ttl) = self.absolute_ttl {
156 13 : builder = builder.time_to_live(ttl);
157 13 : }
158 13 : if let Some(tti) = self.idle_ttl {
159 0 : builder = builder.time_to_idle(tti);
160 13 : }
161 13 : builder
162 13 : }
163 : }
164 :
165 : impl FromStr for CacheOptions {
166 : type Err = anyhow::Error;
167 :
168 4 : fn from_str(options: &str) -> Result<Self, Self::Err> {
169 4 : let error = || format!("failed to parse cache options '{options}'");
170 4 : Self::parse(options).with_context(error)
171 4 : }
172 : }
173 :
174 : /// Helper for cmdline cache options parsing.
175 : #[derive(Debug)]
176 : pub struct ProjectInfoCacheOptions {
177 : /// Max number of entries.
178 : pub size: u64,
179 : /// Entry's time-to-live.
180 : pub ttl: Duration,
181 : /// Max number of roles per endpoint.
182 : pub max_roles: u64,
183 : /// Gc interval.
184 : pub gc_interval: Duration,
185 : }
186 :
187 : impl ProjectInfoCacheOptions {
188 : /// Default options for [`crate::cache::project_info::ProjectInfoCache`].
189 : pub const CACHE_DEFAULT_OPTIONS: &'static str =
190 : "size=10000,ttl=4m,max_roles=10,gc_interval=60m";
191 :
192 : /// Parse cache options passed via cmdline.
193 : /// Example: [`Self::CACHE_DEFAULT_OPTIONS`].
194 0 : fn parse(options: &str) -> anyhow::Result<Self> {
195 0 : let mut size = None;
196 0 : let mut ttl = None;
197 0 : let mut max_roles = None;
198 0 : let mut gc_interval = None;
199 :
200 0 : for option in options.split(',') {
201 0 : let (key, value) = option
202 0 : .split_once('=')
203 0 : .with_context(|| format!("bad key-value pair: {option}"))?;
204 :
205 0 : match key {
206 0 : "size" => size = Some(value.parse()?),
207 0 : "ttl" => ttl = Some(humantime::parse_duration(value)?),
208 0 : "max_roles" => max_roles = Some(value.parse()?),
209 0 : "gc_interval" => gc_interval = Some(humantime::parse_duration(value)?),
210 0 : unknown => bail!("unknown key: {unknown}"),
211 : }
212 : }
213 :
214 : // TTL doesn't matter if cache is always empty.
215 0 : if let Some(0) = size {
216 0 : ttl.get_or_insert(Duration::default());
217 0 : }
218 :
219 0 : Ok(Self {
220 0 : size: size.context("missing `size`")?,
221 0 : ttl: ttl.context("missing `ttl`")?,
222 0 : max_roles: max_roles.context("missing `max_roles`")?,
223 0 : gc_interval: gc_interval.context("missing `gc_interval`")?,
224 : })
225 0 : }
226 : }
227 :
228 : impl FromStr for ProjectInfoCacheOptions {
229 : type Err = anyhow::Error;
230 :
231 0 : fn from_str(options: &str) -> Result<Self, Self::Err> {
232 0 : let error = || format!("failed to parse cache options '{options}'");
233 0 : Self::parse(options).with_context(error)
234 0 : }
235 : }
236 :
237 : /// This is a config for connect to compute and wake compute.
238 : #[derive(Clone, Copy, Debug)]
239 : pub struct RetryConfig {
240 : /// Number of times we should retry.
241 : pub max_retries: u32,
242 : /// Retry duration is base_delay * backoff_factor ^ n, where n starts at 0
243 : pub base_delay: tokio::time::Duration,
244 : /// Exponential base for retry wait duration
245 : pub backoff_factor: f64,
246 : }
247 :
248 : impl RetryConfig {
249 : // Default options for RetryConfig.
250 :
251 : /// Total delay for 5 retries with 200ms base delay and 2 backoff factor is about 6s.
252 : pub const CONNECT_TO_COMPUTE_DEFAULT_VALUES: &'static str =
253 : "num_retries=5,base_retry_wait_duration=200ms,retry_wait_exponent_base=2";
254 : /// Total delay for 8 retries with 100ms base delay and 1.6 backoff factor is about 7s.
255 : /// Cplane has timeout of 60s on each request. 8m7s in total.
256 : pub const WAKE_COMPUTE_DEFAULT_VALUES: &'static str =
257 : "num_retries=8,base_retry_wait_duration=100ms,retry_wait_exponent_base=1.6";
258 :
259 : /// Parse retry options passed via cmdline.
260 : /// Example: [`Self::CONNECT_TO_COMPUTE_DEFAULT_VALUES`].
261 0 : pub fn parse(options: &str) -> anyhow::Result<Self> {
262 0 : let mut num_retries = None;
263 0 : let mut base_retry_wait_duration = None;
264 0 : let mut retry_wait_exponent_base = None;
265 :
266 0 : for option in options.split(',') {
267 0 : let (key, value) = option
268 0 : .split_once('=')
269 0 : .with_context(|| format!("bad key-value pair: {option}"))?;
270 :
271 0 : match key {
272 0 : "num_retries" => num_retries = Some(value.parse()?),
273 0 : "base_retry_wait_duration" => {
274 0 : base_retry_wait_duration = Some(humantime::parse_duration(value)?);
275 : }
276 0 : "retry_wait_exponent_base" => retry_wait_exponent_base = Some(value.parse()?),
277 0 : unknown => bail!("unknown key: {unknown}"),
278 : }
279 : }
280 :
281 0 : Ok(Self {
282 0 : max_retries: num_retries.context("missing `num_retries`")?,
283 0 : base_delay: base_retry_wait_duration.context("missing `base_retry_wait_duration`")?,
284 0 : backoff_factor: retry_wait_exponent_base
285 0 : .context("missing `retry_wait_exponent_base`")?,
286 : })
287 0 : }
288 : }
289 :
290 : /// Helper for cmdline cache options parsing.
291 : #[derive(serde::Deserialize)]
292 : pub struct ConcurrencyLockOptions {
293 : /// The number of shards the lock map should have
294 : pub shards: usize,
295 : /// The number of allowed concurrent requests for each endpoitn
296 : #[serde(flatten)]
297 : pub limiter: RateLimiterConfig,
298 : /// Garbage collection epoch
299 : #[serde(deserialize_with = "humantime_serde::deserialize")]
300 : pub epoch: Duration,
301 : /// Lock timeout
302 : #[serde(deserialize_with = "humantime_serde::deserialize")]
303 : pub timeout: Duration,
304 : }
305 :
306 : impl ConcurrencyLockOptions {
307 : /// Default options for [`crate::control_plane::client::ApiLocks`].
308 : pub const DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK: &'static str = "permits=0";
309 : /// Default options for [`crate::control_plane::client::ApiLocks`].
310 : pub const DEFAULT_OPTIONS_CONNECT_COMPUTE_LOCK: &'static str =
311 : "shards=64,permits=100,epoch=10m,timeout=10ms";
312 :
313 : // pub const DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK: &'static str = "shards=32,permits=4,epoch=10m,timeout=1s";
314 :
315 : /// Parse lock options passed via cmdline.
316 : /// Example: [`Self::DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK`].
317 4 : fn parse(options: &str) -> anyhow::Result<Self> {
318 4 : let options = options.trim();
319 4 : if options.starts_with('{') && options.ends_with('}') {
320 1 : return Ok(serde_json::from_str(options)?);
321 3 : }
322 :
323 3 : let mut shards = None;
324 3 : let mut permits = None;
325 3 : let mut epoch = None;
326 3 : let mut timeout = None;
327 :
328 9 : for option in options.split(',') {
329 9 : let (key, value) = option
330 9 : .split_once('=')
331 9 : .with_context(|| format!("bad key-value pair: {option}"))?;
332 :
333 9 : match key {
334 9 : "shards" => shards = Some(value.parse()?),
335 7 : "permits" => permits = Some(value.parse()?),
336 4 : "epoch" => epoch = Some(humantime::parse_duration(value)?),
337 2 : "timeout" => timeout = Some(humantime::parse_duration(value)?),
338 0 : unknown => bail!("unknown key: {unknown}"),
339 : }
340 : }
341 :
342 : // these dont matter if lock is disabled
343 3 : if let Some(0) = permits {
344 1 : timeout = Some(Duration::default());
345 1 : epoch = Some(Duration::default());
346 1 : shards = Some(2);
347 2 : }
348 :
349 3 : let permits = permits.context("missing `permits`")?;
350 3 : let out = Self {
351 3 : shards: shards.context("missing `shards`")?,
352 3 : limiter: RateLimiterConfig {
353 3 : algorithm: RateLimitAlgorithm::Fixed,
354 3 : initial_limit: permits,
355 3 : },
356 3 : epoch: epoch.context("missing `epoch`")?,
357 3 : timeout: timeout.context("missing `timeout`")?,
358 : };
359 :
360 3 : ensure!(out.shards > 1, "shard count must be > 1");
361 3 : ensure!(
362 3 : out.shards.is_power_of_two(),
363 0 : "shard count must be a power of two"
364 : );
365 :
366 3 : Ok(out)
367 4 : }
368 : }
369 :
370 : impl FromStr for ConcurrencyLockOptions {
371 : type Err = anyhow::Error;
372 :
373 4 : fn from_str(options: &str) -> Result<Self, Self::Err> {
374 4 : let error = || format!("failed to parse cache lock options '{options}'");
375 4 : Self::parse(options).with_context(error)
376 4 : }
377 : }
378 :
379 : #[derive(Error, Debug)]
380 : pub(crate) enum RefreshConfigError {
381 : #[error(transparent)]
382 : Read(#[from] std::io::Error),
383 : #[error(transparent)]
384 : Parse(#[from] serde_json::Error),
385 : #[error(transparent)]
386 : Validate(anyhow::Error),
387 : #[error(transparent)]
388 : Tls(anyhow::Error),
389 : }
390 :
391 0 : pub(crate) async fn refresh_config_loop(config: &ProxyConfig, path: Utf8PathBuf, rx: Arc<Notify>) {
392 0 : let mut init = true;
393 : loop {
394 0 : rx.notified().await;
395 :
396 0 : match refresh_config_inner(config, &path).await {
397 0 : std::result::Result::Ok(()) => {}
398 : // don't log for file not found errors if this is the first time we are checking
399 : // for computes that don't use local_proxy, this is not an error.
400 0 : Err(RefreshConfigError::Read(e))
401 0 : if init && e.kind() == std::io::ErrorKind::NotFound =>
402 : {
403 0 : debug!(error=?e, ?path, "could not read config file");
404 : }
405 0 : Err(RefreshConfigError::Tls(e)) => {
406 0 : error!(error=?e, ?path, "could not read TLS certificates");
407 : }
408 0 : Err(e) => {
409 0 : error!(error=?e, ?path, "could not read config file");
410 : }
411 : }
412 :
413 0 : init = false;
414 : }
415 : }
416 :
417 0 : pub(crate) async fn refresh_config_inner(
418 0 : config: &ProxyConfig,
419 0 : path: &Utf8Path,
420 0 : ) -> Result<(), RefreshConfigError> {
421 0 : let bytes = tokio::fs::read(&path).await?;
422 0 : let data: LocalProxySpec = serde_json::from_slice(&bytes)?;
423 :
424 0 : let mut jwks_set = vec![];
425 :
426 0 : fn parse_jwks_settings(jwks: compute_api::spec::JwksSettings) -> anyhow::Result<JwksSettings> {
427 0 : let mut jwks_url = url::Url::from_str(&jwks.jwks_url).context("parsing JWKS url")?;
428 :
429 0 : ensure!(
430 0 : jwks_url.has_authority()
431 0 : && (jwks_url.scheme() == "http" || jwks_url.scheme() == "https"),
432 0 : "Invalid JWKS url. Must be HTTP",
433 : );
434 :
435 0 : ensure!(
436 0 : jwks_url.host().is_some_and(|h| h != url::Host::Domain("")),
437 0 : "Invalid JWKS url. No domain listed",
438 : );
439 :
440 : // clear username, password and ports
441 0 : jwks_url
442 0 : .set_username("")
443 0 : .expect("url can be a base and has a valid host and is not a file. should not error");
444 0 : jwks_url
445 0 : .set_password(None)
446 0 : .expect("url can be a base and has a valid host and is not a file. should not error");
447 : // local testing is hard if we need to have a specific restricted port
448 0 : if cfg!(not(feature = "testing")) {
449 0 : jwks_url.set_port(None).expect(
450 0 : "url can be a base and has a valid host and is not a file. should not error",
451 0 : );
452 0 : }
453 :
454 : // clear query params
455 0 : jwks_url.set_fragment(None);
456 0 : jwks_url.query_pairs_mut().clear().finish();
457 :
458 0 : if jwks_url.scheme() != "https" {
459 : // local testing is hard if we need to set up https support.
460 0 : if cfg!(not(feature = "testing")) {
461 0 : jwks_url
462 0 : .set_scheme("https")
463 0 : .expect("should not error to set the scheme to https if it was http");
464 0 : } else {
465 0 : warn!(scheme = jwks_url.scheme(), "JWKS url is not HTTPS");
466 : }
467 0 : }
468 :
469 0 : Ok(JwksSettings {
470 0 : id: jwks.id,
471 0 : jwks_url,
472 0 : _provider_name: jwks.provider_name,
473 0 : jwt_audience: jwks.jwt_audience,
474 0 : role_names: jwks
475 0 : .role_names
476 0 : .into_iter()
477 0 : .map(RoleName::from)
478 0 : .map(|s| RoleNameInt::from(&s))
479 0 : .collect(),
480 : })
481 0 : }
482 :
483 0 : for jwks in data.jwks.into_iter().flatten() {
484 0 : jwks_set.push(parse_jwks_settings(jwks).map_err(RefreshConfigError::Validate)?);
485 : }
486 :
487 0 : info!("successfully loaded new config");
488 0 : JWKS_ROLE_MAP.store(Some(Arc::new(EndpointJwksResponse { jwks: jwks_set })));
489 :
490 0 : if let Some(tls_config) = data.tls {
491 0 : let tls_config = tokio::task::spawn_blocking(move || {
492 0 : crate::tls::server_config::configure_tls(
493 0 : tls_config.key_path.as_ref(),
494 0 : tls_config.cert_path.as_ref(),
495 0 : None,
496 : false,
497 : )
498 0 : })
499 0 : .await
500 0 : .propagate_task_panic()
501 0 : .map_err(RefreshConfigError::Tls)?;
502 0 : config.tls_config.store(Some(Arc::new(tls_config)));
503 0 : }
504 :
505 0 : std::result::Result::Ok(())
506 0 : }
507 :
508 : #[cfg(test)]
509 : mod tests {
510 : use super::*;
511 : use crate::rate_limiter::Aimd;
512 :
513 : #[test]
514 1 : fn test_parse_cache_options() -> anyhow::Result<()> {
515 : let CacheOptions {
516 1 : size,
517 1 : absolute_ttl,
518 : idle_ttl: _,
519 1 : } = "size=4096,ttl=5min".parse()?;
520 1 : assert_eq!(size, Some(4096));
521 1 : assert_eq!(absolute_ttl, Some(Duration::from_secs(5 * 60)));
522 :
523 : let CacheOptions {
524 1 : size,
525 1 : absolute_ttl,
526 : idle_ttl: _,
527 1 : } = "ttl=4m,size=2".parse()?;
528 1 : assert_eq!(size, Some(2));
529 1 : assert_eq!(absolute_ttl, Some(Duration::from_secs(4 * 60)));
530 :
531 : let CacheOptions {
532 1 : size,
533 1 : absolute_ttl,
534 : idle_ttl: _,
535 1 : } = "size=0,ttl=1s".parse()?;
536 1 : assert_eq!(size, Some(0));
537 1 : assert_eq!(absolute_ttl, Some(Duration::from_secs(1)));
538 :
539 : let CacheOptions {
540 1 : size,
541 1 : absolute_ttl,
542 : idle_ttl: _,
543 1 : } = "size=0".parse()?;
544 1 : assert_eq!(size, Some(0));
545 1 : assert_eq!(absolute_ttl, None);
546 :
547 1 : Ok(())
548 1 : }
549 :
550 : #[test]
551 1 : fn test_parse_lock_options() -> anyhow::Result<()> {
552 : let ConcurrencyLockOptions {
553 1 : epoch,
554 1 : limiter,
555 1 : shards,
556 1 : timeout,
557 1 : } = "shards=32,permits=4,epoch=10m,timeout=1s".parse()?;
558 1 : assert_eq!(epoch, Duration::from_secs(10 * 60));
559 1 : assert_eq!(timeout, Duration::from_secs(1));
560 1 : assert_eq!(shards, 32);
561 1 : assert_eq!(limiter.initial_limit, 4);
562 1 : assert_eq!(limiter.algorithm, RateLimitAlgorithm::Fixed);
563 :
564 : let ConcurrencyLockOptions {
565 1 : epoch,
566 1 : limiter,
567 1 : shards,
568 1 : timeout,
569 1 : } = "epoch=60s,shards=16,timeout=100ms,permits=8".parse()?;
570 1 : assert_eq!(epoch, Duration::from_secs(60));
571 1 : assert_eq!(timeout, Duration::from_millis(100));
572 1 : assert_eq!(shards, 16);
573 1 : assert_eq!(limiter.initial_limit, 8);
574 1 : assert_eq!(limiter.algorithm, RateLimitAlgorithm::Fixed);
575 :
576 : let ConcurrencyLockOptions {
577 1 : epoch,
578 1 : limiter,
579 1 : shards,
580 1 : timeout,
581 1 : } = "permits=0".parse()?;
582 1 : assert_eq!(epoch, Duration::ZERO);
583 1 : assert_eq!(timeout, Duration::ZERO);
584 1 : assert_eq!(shards, 2);
585 1 : assert_eq!(limiter.initial_limit, 0);
586 1 : assert_eq!(limiter.algorithm, RateLimitAlgorithm::Fixed);
587 :
588 1 : Ok(())
589 1 : }
590 :
591 : #[test]
592 1 : fn test_parse_json_lock_options() -> anyhow::Result<()> {
593 : let ConcurrencyLockOptions {
594 1 : epoch,
595 1 : limiter,
596 1 : shards,
597 1 : timeout,
598 1 : } = r#"{"shards":32,"initial_limit":44,"aimd":{"min":5,"max":500,"inc":10,"dec":0.9,"utilisation":0.8},"epoch":"10m","timeout":"1s"}"#
599 1 : .parse()?;
600 1 : assert_eq!(epoch, Duration::from_secs(10 * 60));
601 1 : assert_eq!(timeout, Duration::from_secs(1));
602 1 : assert_eq!(shards, 32);
603 1 : assert_eq!(limiter.initial_limit, 44);
604 1 : assert_eq!(
605 : limiter.algorithm,
606 : RateLimitAlgorithm::Aimd {
607 : conf: Aimd {
608 : min: 5,
609 : max: 500,
610 : dec: 0.9,
611 : inc: 10,
612 : utilisation: 0.8
613 : }
614 : },
615 : );
616 :
617 1 : Ok(())
618 1 : }
619 : }
|