Line data Source code
1 : use std::str::FromStr;
2 : use std::sync::Arc;
3 : use std::time::Duration;
4 :
5 : use anyhow::{bail, ensure, Context, Ok};
6 : use clap::ValueEnum;
7 : use remote_storage::RemoteStorageConfig;
8 :
9 : use crate::auth::backend::jwt::JwkCache;
10 : use crate::auth::backend::AuthRateLimiter;
11 : use crate::control_plane::locks::ApiLocks;
12 : use crate::rate_limiter::{RateBucketInfo, RateLimitAlgorithm, RateLimiterConfig};
13 : use crate::scram::threadpool::ThreadPool;
14 : use crate::serverless::cancel_set::CancelSet;
15 : use crate::serverless::GlobalConnPoolOptions;
16 : pub use crate::tls::server_config::{configure_tls, TlsConfig};
17 : use crate::types::Host;
18 :
/// Top-level proxy configuration, aggregating the per-subsystem configs defined in this module.
pub struct ProxyConfig {
    /// TLS configuration; `None` presumably means TLS is not configured — TODO confirm with callers.
    pub tls_config: Option<TlsConfig>,
    /// Metric collection settings; `None` when metric collection is not configured.
    pub metric_collection: Option<MetricCollectionConfig>,
    /// HTTP / serverless endpoint settings.
    pub http_config: HttpConfig,
    /// Authentication settings.
    pub authentication_config: AuthenticationConfig,
    /// Whether a PROXY protocol v2 header is required, supported or rejected.
    pub proxy_protocol_v2: ProxyProtocolV2,
    /// Region name (format not visible here).
    pub region: String,
    /// Handshake timeout.
    pub handshake_timeout: Duration,
    /// Retry policy for wake-compute requests.
    pub wake_compute_retry_config: RetryConfig,
    /// Concurrency locks used when connecting to compute, keyed by host.
    pub connect_compute_locks: ApiLocks<Host>,
    /// Settings for connecting to compute (retry policy, TLS client config, timeout).
    pub connect_to_compute: ComputeConfig,
}
31 :
/// Settings used when connecting to compute.
pub struct ComputeConfig {
    /// Retry policy for connection attempts.
    pub retry: RetryConfig,
    /// rustls client configuration, shared via `Arc`.
    pub tls: Arc<rustls::ClientConfig>,
    /// Connection timeout — NOTE(review): whether this is per attempt or overall is not visible here.
    pub timeout: Duration,
}
37 :
/// How the proxy treats a PROXY protocol v2 header on incoming connections.
///
/// Derives `clap::ValueEnum`, so it can be used directly as a clap argument value.
#[derive(Copy, Clone, Debug, ValueEnum, PartialEq)]
pub enum ProxyProtocolV2 {
    /// Connection will error if PROXY protocol v2 header is missing
    Required,
    /// Connection will parse PROXY protocol v2 header, but accept the connection if it's missing.
    Supported,
    /// Connection will error if PROXY protocol v2 header is provided
    Rejected,
}
47 :
/// Settings for periodic metric collection.
#[derive(Debug)]
pub struct MetricCollectionConfig {
    /// URL that collected metrics are sent to — presumably via HTTP; confirm with the collector.
    pub endpoint: reqwest::Url,
    /// Collection interval.
    pub interval: Duration,
    /// Settings for the backup copy of collected metrics.
    pub backup_metric_collection_config: MetricBackupCollectionConfig,
}
54 :
/// Settings for the HTTP (serverless) frontend.
pub struct HttpConfig {
    /// Whether websocket connections are accepted.
    pub accept_websockets: bool,
    /// Options for the global connection pool.
    pub pool_options: GlobalConnPoolOptions,
    /// Set of cancellation handles (see `crate::serverless::cancel_set`).
    pub cancel_set: CancelSet,
    /// Client connection threshold — NOTE(review): exact semantics not visible here.
    pub client_conn_threshold: u64,
    /// Maximum accepted request body size, in bytes.
    pub max_request_size_bytes: usize,
    /// Maximum response size, in bytes.
    pub max_response_size_bytes: usize,
}
63 :
/// Authentication-related settings.
pub struct AuthenticationConfig {
    /// Thread pool from `crate::scram::threadpool`, presumably for SCRAM computations.
    pub thread_pool: Arc<ThreadPool>,
    /// Timeout for the SCRAM protocol exchange.
    pub scram_protocol_timeout: tokio::time::Duration,
    /// Whether `rate_limiter` is applied.
    pub rate_limiter_enabled: bool,
    /// Authentication rate limiter.
    pub rate_limiter: AuthRateLimiter,
    /// Subnet prefix length used to group client IPs for rate limiting — assumption, confirm.
    pub rate_limit_ip_subnet: u8,
    /// Whether IP allowlists are checked.
    pub ip_allowlist_check_enabled: bool,
    /// Whether this proxy serves VPC access (field name typo kept for compatibility).
    pub is_vpc_acccess_proxy: bool,
    /// Cache of JSON Web Key sets.
    pub jwks_cache: JwkCache,
    /// Whether this proxy runs as an auth broker.
    pub is_auth_broker: bool,
    /// Whether JWTs are accepted for authentication.
    pub accept_jwts: bool,
    /// Timeout for waiting on console-redirect confirmation.
    pub console_redirect_confirmation_timeout: tokio::time::Duration,
}
77 :
/// Configuration of the endpoint cache, which is filled by reading a stream (see `stream_name`).
#[derive(Debug)]
pub struct EndpointCacheConfig {
    /// Batch size to receive all endpoints on the startup.
    pub initial_batch_size: usize,
    /// Batch size to receive endpoints.
    pub default_batch_size: usize,
    /// Timeout for the stream read operation.
    pub xread_timeout: Duration,
    /// Stream name to read from.
    pub stream_name: String,
    /// Limiter info (to distinguish when to enable cache).
    pub limiter_info: Vec<RateBucketInfo>,
    /// Disable cache.
    /// If true, cache is ignored, but reports all statistics.
    pub disable_cache: bool,
    /// Retry interval for the stream read operation.
    pub retry_interval: Duration,
}
96 :
97 : impl EndpointCacheConfig {
98 : /// Default options for [`crate::control_plane::NodeInfoCache`].
99 : /// Notice that by default the limiter is empty, which means that cache is disabled.
100 : pub const CACHE_DEFAULT_OPTIONS: &'static str =
101 : "initial_batch_size=1000,default_batch_size=10,xread_timeout=5m,stream_name=controlPlane,disable_cache=true,limiter_info=1000@1s,retry_interval=1s";
102 :
103 : /// Parse cache options passed via cmdline.
104 : /// Example: [`Self::CACHE_DEFAULT_OPTIONS`].
105 0 : fn parse(options: &str) -> anyhow::Result<Self> {
106 0 : let mut initial_batch_size = None;
107 0 : let mut default_batch_size = None;
108 0 : let mut xread_timeout = None;
109 0 : let mut stream_name = None;
110 0 : let mut limiter_info = vec![];
111 0 : let mut disable_cache = false;
112 0 : let mut retry_interval = None;
113 :
114 0 : for option in options.split(',') {
115 0 : let (key, value) = option
116 0 : .split_once('=')
117 0 : .with_context(|| format!("bad key-value pair: {option}"))?;
118 :
119 0 : match key {
120 0 : "initial_batch_size" => initial_batch_size = Some(value.parse()?),
121 0 : "default_batch_size" => default_batch_size = Some(value.parse()?),
122 0 : "xread_timeout" => xread_timeout = Some(humantime::parse_duration(value)?),
123 0 : "stream_name" => stream_name = Some(value.to_string()),
124 0 : "limiter_info" => limiter_info.push(RateBucketInfo::from_str(value)?),
125 0 : "disable_cache" => disable_cache = value.parse()?,
126 0 : "retry_interval" => retry_interval = Some(humantime::parse_duration(value)?),
127 0 : unknown => bail!("unknown key: {unknown}"),
128 : }
129 : }
130 0 : RateBucketInfo::validate(&mut limiter_info)?;
131 :
132 : Ok(Self {
133 0 : initial_batch_size: initial_batch_size.context("missing `initial_batch_size`")?,
134 0 : default_batch_size: default_batch_size.context("missing `default_batch_size`")?,
135 0 : xread_timeout: xread_timeout.context("missing `xread_timeout`")?,
136 0 : stream_name: stream_name.context("missing `stream_name`")?,
137 0 : disable_cache,
138 0 : limiter_info,
139 0 : retry_interval: retry_interval.context("missing `retry_interval`")?,
140 : })
141 0 : }
142 : }
143 :
144 : impl FromStr for EndpointCacheConfig {
145 : type Err = anyhow::Error;
146 :
147 0 : fn from_str(options: &str) -> Result<Self, Self::Err> {
148 0 : let error = || format!("failed to parse endpoint cache options '{options}'");
149 0 : Self::parse(options).with_context(error)
150 0 : }
151 : }
/// Settings for storing a backup copy of collected metrics.
#[derive(Debug)]
pub struct MetricBackupCollectionConfig {
    /// Remote storage to write backups to; `None` when no remote storage is configured.
    pub remote_storage_config: Option<RemoteStorageConfig>,
    /// Chunk size — NOTE(review): unit (bytes vs. records) is not visible here.
    pub chunk_size: usize,
}
157 :
158 1 : pub fn remote_storage_from_toml(s: &str) -> anyhow::Result<RemoteStorageConfig> {
159 1 : RemoteStorageConfig::from_toml(&s.parse()?)
160 1 : }
161 :
/// Helper for cmdline cache options parsing.
///
/// Parsed from a `size=<n>,ttl=<duration>` string via `FromStr`.
#[derive(Debug)]
pub struct CacheOptions {
    /// Max number of entries.
    pub size: usize,
    /// Entry's time-to-live.
    pub ttl: Duration,
}
170 :
171 : impl CacheOptions {
172 : /// Default options for [`crate::control_plane::NodeInfoCache`].
173 : pub const CACHE_DEFAULT_OPTIONS: &'static str = "size=4000,ttl=4m";
174 :
175 : /// Parse cache options passed via cmdline.
176 : /// Example: [`Self::CACHE_DEFAULT_OPTIONS`].
177 4 : fn parse(options: &str) -> anyhow::Result<Self> {
178 4 : let mut size = None;
179 4 : let mut ttl = None;
180 :
181 7 : for option in options.split(',') {
182 7 : let (key, value) = option
183 7 : .split_once('=')
184 7 : .with_context(|| format!("bad key-value pair: {option}"))?;
185 :
186 7 : match key {
187 7 : "size" => size = Some(value.parse()?),
188 3 : "ttl" => ttl = Some(humantime::parse_duration(value)?),
189 0 : unknown => bail!("unknown key: {unknown}"),
190 : }
191 : }
192 :
193 : // TTL doesn't matter if cache is always empty.
194 4 : if let Some(0) = size {
195 2 : ttl.get_or_insert(Duration::default());
196 2 : }
197 :
198 : Ok(Self {
199 4 : size: size.context("missing `size`")?,
200 4 : ttl: ttl.context("missing `ttl`")?,
201 : })
202 4 : }
203 : }
204 :
205 : impl FromStr for CacheOptions {
206 : type Err = anyhow::Error;
207 :
208 4 : fn from_str(options: &str) -> Result<Self, Self::Err> {
209 4 : let error = || format!("failed to parse cache options '{options}'");
210 4 : Self::parse(options).with_context(error)
211 4 : }
212 : }
213 :
/// Helper for cmdline cache options parsing.
///
/// Parsed from a `size=..,ttl=..,max_roles=..,gc_interval=..` string via `FromStr`.
#[derive(Debug)]
pub struct ProjectInfoCacheOptions {
    /// Max number of entries.
    pub size: usize,
    /// Entry's time-to-live.
    pub ttl: Duration,
    /// Max number of roles per endpoint.
    pub max_roles: usize,
    /// Gc interval.
    pub gc_interval: Duration,
}
226 :
227 : impl ProjectInfoCacheOptions {
228 : /// Default options for [`crate::control_plane::NodeInfoCache`].
229 : pub const CACHE_DEFAULT_OPTIONS: &'static str =
230 : "size=10000,ttl=4m,max_roles=10,gc_interval=60m";
231 :
232 : /// Parse cache options passed via cmdline.
233 : /// Example: [`Self::CACHE_DEFAULT_OPTIONS`].
234 0 : fn parse(options: &str) -> anyhow::Result<Self> {
235 0 : let mut size = None;
236 0 : let mut ttl = None;
237 0 : let mut max_roles = None;
238 0 : let mut gc_interval = None;
239 :
240 0 : for option in options.split(',') {
241 0 : let (key, value) = option
242 0 : .split_once('=')
243 0 : .with_context(|| format!("bad key-value pair: {option}"))?;
244 :
245 0 : match key {
246 0 : "size" => size = Some(value.parse()?),
247 0 : "ttl" => ttl = Some(humantime::parse_duration(value)?),
248 0 : "max_roles" => max_roles = Some(value.parse()?),
249 0 : "gc_interval" => gc_interval = Some(humantime::parse_duration(value)?),
250 0 : unknown => bail!("unknown key: {unknown}"),
251 : }
252 : }
253 :
254 : // TTL doesn't matter if cache is always empty.
255 0 : if let Some(0) = size {
256 0 : ttl.get_or_insert(Duration::default());
257 0 : }
258 :
259 : Ok(Self {
260 0 : size: size.context("missing `size`")?,
261 0 : ttl: ttl.context("missing `ttl`")?,
262 0 : max_roles: max_roles.context("missing `max_roles`")?,
263 0 : gc_interval: gc_interval.context("missing `gc_interval`")?,
264 : })
265 0 : }
266 : }
267 :
268 : impl FromStr for ProjectInfoCacheOptions {
269 : type Err = anyhow::Error;
270 :
271 0 : fn from_str(options: &str) -> Result<Self, Self::Err> {
272 0 : let error = || format!("failed to parse cache options '{options}'");
273 0 : Self::parse(options).with_context(error)
274 0 : }
275 : }
276 :
/// Retry policy with exponential backoff, used both for connecting to compute and for waking compute.
#[derive(Clone, Copy, Debug)]
pub struct RetryConfig {
    /// Number of times we should retry.
    pub max_retries: u32,
    /// Retry duration is base_delay * backoff_factor ^ n, where n starts at 0
    pub base_delay: tokio::time::Duration,
    /// Exponential base for retry wait duration
    pub backoff_factor: f64,
}
287 :
288 : impl RetryConfig {
289 : // Default options for RetryConfig.
290 :
291 : /// Total delay for 5 retries with 200ms base delay and 2 backoff factor is about 6s.
292 : pub const CONNECT_TO_COMPUTE_DEFAULT_VALUES: &'static str =
293 : "num_retries=5,base_retry_wait_duration=200ms,retry_wait_exponent_base=2";
294 : /// Total delay for 8 retries with 100ms base delay and 1.6 backoff factor is about 7s.
295 : /// Cplane has timeout of 60s on each request. 8m7s in total.
296 : pub const WAKE_COMPUTE_DEFAULT_VALUES: &'static str =
297 : "num_retries=8,base_retry_wait_duration=100ms,retry_wait_exponent_base=1.6";
298 :
299 : /// Parse retry options passed via cmdline.
300 : /// Example: [`Self::CONNECT_TO_COMPUTE_DEFAULT_VALUES`].
301 0 : pub fn parse(options: &str) -> anyhow::Result<Self> {
302 0 : let mut num_retries = None;
303 0 : let mut base_retry_wait_duration = None;
304 0 : let mut retry_wait_exponent_base = None;
305 :
306 0 : for option in options.split(',') {
307 0 : let (key, value) = option
308 0 : .split_once('=')
309 0 : .with_context(|| format!("bad key-value pair: {option}"))?;
310 :
311 0 : match key {
312 0 : "num_retries" => num_retries = Some(value.parse()?),
313 0 : "base_retry_wait_duration" => {
314 0 : base_retry_wait_duration = Some(humantime::parse_duration(value)?);
315 : }
316 0 : "retry_wait_exponent_base" => retry_wait_exponent_base = Some(value.parse()?),
317 0 : unknown => bail!("unknown key: {unknown}"),
318 : }
319 : }
320 :
321 : Ok(Self {
322 0 : max_retries: num_retries.context("missing `num_retries`")?,
323 0 : base_delay: base_retry_wait_duration.context("missing `base_retry_wait_duration`")?,
324 0 : backoff_factor: retry_wait_exponent_base
325 0 : .context("missing `retry_wait_exponent_base`")?,
326 : })
327 0 : }
328 : }
329 :
330 : /// Helper for cmdline cache options parsing.
331 5 : #[derive(serde::Deserialize)]
332 : pub struct ConcurrencyLockOptions {
333 : /// The number of shards the lock map should have
334 : pub shards: usize,
335 : /// The number of allowed concurrent requests for each endpoitn
336 : #[serde(flatten)]
337 : pub limiter: RateLimiterConfig,
338 : /// Garbage collection epoch
339 : #[serde(deserialize_with = "humantime_serde::deserialize")]
340 : pub epoch: Duration,
341 : /// Lock timeout
342 : #[serde(deserialize_with = "humantime_serde::deserialize")]
343 : pub timeout: Duration,
344 : }
345 :
346 : impl ConcurrencyLockOptions {
347 : /// Default options for [`crate::control_plane::client::ApiLocks`].
348 : pub const DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK: &'static str = "permits=0";
349 : /// Default options for [`crate::control_plane::client::ApiLocks`].
350 : pub const DEFAULT_OPTIONS_CONNECT_COMPUTE_LOCK: &'static str =
351 : "shards=64,permits=100,epoch=10m,timeout=10ms";
352 :
353 : // pub const DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK: &'static str = "shards=32,permits=4,epoch=10m,timeout=1s";
354 :
355 : /// Parse lock options passed via cmdline.
356 : /// Example: [`Self::DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK`].
357 4 : fn parse(options: &str) -> anyhow::Result<Self> {
358 4 : let options = options.trim();
359 4 : if options.starts_with('{') && options.ends_with('}') {
360 1 : return Ok(serde_json::from_str(options)?);
361 3 : }
362 3 :
363 3 : let mut shards = None;
364 3 : let mut permits = None;
365 3 : let mut epoch = None;
366 3 : let mut timeout = None;
367 :
368 9 : for option in options.split(',') {
369 9 : let (key, value) = option
370 9 : .split_once('=')
371 9 : .with_context(|| format!("bad key-value pair: {option}"))?;
372 :
373 9 : match key {
374 9 : "shards" => shards = Some(value.parse()?),
375 7 : "permits" => permits = Some(value.parse()?),
376 4 : "epoch" => epoch = Some(humantime::parse_duration(value)?),
377 2 : "timeout" => timeout = Some(humantime::parse_duration(value)?),
378 0 : unknown => bail!("unknown key: {unknown}"),
379 : }
380 : }
381 :
382 : // these dont matter if lock is disabled
383 3 : if let Some(0) = permits {
384 1 : timeout = Some(Duration::default());
385 1 : epoch = Some(Duration::default());
386 1 : shards = Some(2);
387 2 : }
388 :
389 3 : let permits = permits.context("missing `permits`")?;
390 3 : let out = Self {
391 3 : shards: shards.context("missing `shards`")?,
392 3 : limiter: RateLimiterConfig {
393 3 : algorithm: RateLimitAlgorithm::Fixed,
394 3 : initial_limit: permits,
395 3 : },
396 3 : epoch: epoch.context("missing `epoch`")?,
397 3 : timeout: timeout.context("missing `timeout`")?,
398 : };
399 :
400 3 : ensure!(out.shards > 1, "shard count must be > 1");
401 3 : ensure!(
402 3 : out.shards.is_power_of_two(),
403 0 : "shard count must be a power of two"
404 : );
405 :
406 3 : Ok(out)
407 4 : }
408 : }
409 :
410 : impl FromStr for ConcurrencyLockOptions {
411 : type Err = anyhow::Error;
412 :
413 4 : fn from_str(options: &str) -> Result<Self, Self::Err> {
414 4 : let error = || format!("failed to parse cache lock options '{options}'");
415 4 : Self::parse(options).with_context(error)
416 4 : }
417 : }
418 :
#[cfg(test)]
mod tests {
    use super::*;
    use crate::rate_limiter::Aimd;

    /// Cache options: keys in any order, and the zero-TTL default for `size=0`.
    #[test]
    fn test_parse_cache_options() -> anyhow::Result<()> {
        let cases = [
            ("size=4096,ttl=5min", 4096, Duration::from_secs(5 * 60)),
            ("ttl=4m,size=2", 2, Duration::from_secs(4 * 60)),
            ("size=0,ttl=1s", 0, Duration::from_secs(1)),
            ("size=0", 0, Duration::default()),
        ];

        for (input, expected_size, expected_ttl) in cases {
            let parsed: CacheOptions = input.parse()?;
            assert_eq!(parsed.size, expected_size, "size for {input:?}");
            assert_eq!(parsed.ttl, expected_ttl, "ttl for {input:?}");
        }

        Ok(())
    }

    /// Lock options in `key=value` form, including the disabled (`permits=0`) shortcut.
    #[test]
    fn test_parse_lock_options() -> anyhow::Result<()> {
        let opts: ConcurrencyLockOptions = "shards=32,permits=4,epoch=10m,timeout=1s".parse()?;
        assert_eq!(opts.epoch, Duration::from_secs(10 * 60));
        assert_eq!(opts.timeout, Duration::from_secs(1));
        assert_eq!(opts.shards, 32);
        assert_eq!(opts.limiter.initial_limit, 4);
        assert_eq!(opts.limiter.algorithm, RateLimitAlgorithm::Fixed);

        let opts: ConcurrencyLockOptions = "epoch=60s,shards=16,timeout=100ms,permits=8".parse()?;
        assert_eq!(opts.epoch, Duration::from_secs(60));
        assert_eq!(opts.timeout, Duration::from_millis(100));
        assert_eq!(opts.shards, 16);
        assert_eq!(opts.limiter.initial_limit, 8);
        assert_eq!(opts.limiter.algorithm, RateLimitAlgorithm::Fixed);

        let opts: ConcurrencyLockOptions = "permits=0".parse()?;
        assert_eq!(opts.epoch, Duration::ZERO);
        assert_eq!(opts.timeout, Duration::ZERO);
        assert_eq!(opts.shards, 2);
        assert_eq!(opts.limiter.initial_limit, 0);
        assert_eq!(opts.limiter.algorithm, RateLimitAlgorithm::Fixed);

        Ok(())
    }

    /// Lock options in JSON form, which can select the AIMD algorithm.
    #[test]
    fn test_parse_json_lock_options() -> anyhow::Result<()> {
        let json = r#"{"shards":32,"initial_limit":44,"aimd":{"min":5,"max":500,"inc":10,"dec":0.9,"utilisation":0.8},"epoch":"10m","timeout":"1s"}"#;
        let opts: ConcurrencyLockOptions = json.parse()?;

        assert_eq!(opts.epoch, Duration::from_secs(10 * 60));
        assert_eq!(opts.timeout, Duration::from_secs(1));
        assert_eq!(opts.shards, 32);
        assert_eq!(opts.limiter.initial_limit, 44);

        let expected_algorithm = RateLimitAlgorithm::Aimd {
            conf: Aimd {
                min: 5,
                max: 500,
                dec: 0.9,
                inc: 10,
                utilisation: 0.8,
            },
        };
        assert_eq!(opts.limiter.algorithm, expected_algorithm);

        Ok(())
    }
}
|