Line data Source code
1 : use std::str::FromStr;
2 : use std::sync::Arc;
3 : use std::time::Duration;
4 :
5 : use anyhow::{Context, Ok, bail, ensure};
6 : use clap::ValueEnum;
7 : use remote_storage::RemoteStorageConfig;
8 :
9 : use crate::auth::backend::AuthRateLimiter;
10 : use crate::auth::backend::jwt::JwkCache;
11 : use crate::control_plane::locks::ApiLocks;
12 : use crate::rate_limiter::{RateBucketInfo, RateLimitAlgorithm, RateLimiterConfig};
13 : use crate::scram::threadpool::ThreadPool;
14 : use crate::serverless::GlobalConnPoolOptions;
15 : use crate::serverless::cancel_set::CancelSet;
16 : pub use crate::tls::server_config::{TlsConfig, configure_tls};
17 : use crate::types::Host;
18 :
19 : pub struct ProxyConfig {
20 : pub tls_config: Option<TlsConfig>,
21 : pub metric_collection: Option<MetricCollectionConfig>,
22 : pub http_config: HttpConfig,
23 : pub authentication_config: AuthenticationConfig,
24 : pub proxy_protocol_v2: ProxyProtocolV2,
25 : pub region: String,
26 : pub handshake_timeout: Duration,
27 : pub wake_compute_retry_config: RetryConfig,
28 : pub connect_compute_locks: ApiLocks<Host>,
29 : pub connect_to_compute: ComputeConfig,
30 : }
31 :
32 : pub struct ComputeConfig {
33 : pub retry: RetryConfig,
34 : pub tls: Arc<rustls::ClientConfig>,
35 : pub timeout: Duration,
36 : }
37 :
38 : #[derive(Copy, Clone, Debug, ValueEnum, PartialEq)]
39 : pub enum ProxyProtocolV2 {
40 : /// Connection will error if PROXY protocol v2 header is missing
41 : Required,
42 : /// Connection will parse PROXY protocol v2 header, but accept the connection if it's missing.
43 : Supported,
44 : /// Connection will error if PROXY protocol v2 header is provided
45 : Rejected,
46 : }
47 :
48 : #[derive(Debug)]
49 : pub struct MetricCollectionConfig {
50 : pub endpoint: reqwest::Url,
51 : pub interval: Duration,
52 : pub backup_metric_collection_config: MetricBackupCollectionConfig,
53 : }
54 :
55 : pub struct HttpConfig {
56 : pub accept_websockets: bool,
57 : pub pool_options: GlobalConnPoolOptions,
58 : pub cancel_set: CancelSet,
59 : pub client_conn_threshold: u64,
60 : pub max_request_size_bytes: usize,
61 : pub max_response_size_bytes: usize,
62 : }
63 :
64 : pub struct AuthenticationConfig {
65 : pub thread_pool: Arc<ThreadPool>,
66 : pub scram_protocol_timeout: tokio::time::Duration,
67 : pub rate_limiter_enabled: bool,
68 : pub rate_limiter: AuthRateLimiter,
69 : pub rate_limit_ip_subnet: u8,
70 : pub ip_allowlist_check_enabled: bool,
71 : pub is_vpc_acccess_proxy: bool,
72 : pub jwks_cache: JwkCache,
73 : pub is_auth_broker: bool,
74 : pub accept_jwts: bool,
75 : pub console_redirect_confirmation_timeout: tokio::time::Duration,
76 : }
77 :
78 : #[derive(Debug)]
79 : pub struct EndpointCacheConfig {
80 : /// Batch size to receive all endpoints on the startup.
81 : pub initial_batch_size: usize,
82 : /// Batch size to receive endpoints.
83 : pub default_batch_size: usize,
84 : /// Timeouts for the stream read operation.
85 : pub xread_timeout: Duration,
86 : /// Stream name to read from.
87 : pub stream_name: String,
88 : /// Limiter info (to distinguish when to enable cache).
89 : pub limiter_info: Vec<RateBucketInfo>,
90 : /// Disable cache.
91 : /// If true, cache is ignored, but reports all statistics.
92 : pub disable_cache: bool,
93 : /// Retry interval for the stream read operation.
94 : pub retry_interval: Duration,
95 : }
96 :
97 : impl EndpointCacheConfig {
98 : /// Default options for [`crate::control_plane::NodeInfoCache`].
99 : /// Notice that by default the limiter is empty, which means that cache is disabled.
100 : pub const CACHE_DEFAULT_OPTIONS: &'static str = "initial_batch_size=1000,default_batch_size=10,xread_timeout=5m,stream_name=controlPlane,disable_cache=true,limiter_info=1000@1s,retry_interval=1s";
101 :
102 : /// Parse cache options passed via cmdline.
103 : /// Example: [`Self::CACHE_DEFAULT_OPTIONS`].
104 0 : fn parse(options: &str) -> anyhow::Result<Self> {
105 0 : let mut initial_batch_size = None;
106 0 : let mut default_batch_size = None;
107 0 : let mut xread_timeout = None;
108 0 : let mut stream_name = None;
109 0 : let mut limiter_info = vec![];
110 0 : let mut disable_cache = false;
111 0 : let mut retry_interval = None;
112 :
113 0 : for option in options.split(',') {
114 0 : let (key, value) = option
115 0 : .split_once('=')
116 0 : .with_context(|| format!("bad key-value pair: {option}"))?;
117 :
118 0 : match key {
119 0 : "initial_batch_size" => initial_batch_size = Some(value.parse()?),
120 0 : "default_batch_size" => default_batch_size = Some(value.parse()?),
121 0 : "xread_timeout" => xread_timeout = Some(humantime::parse_duration(value)?),
122 0 : "stream_name" => stream_name = Some(value.to_string()),
123 0 : "limiter_info" => limiter_info.push(RateBucketInfo::from_str(value)?),
124 0 : "disable_cache" => disable_cache = value.parse()?,
125 0 : "retry_interval" => retry_interval = Some(humantime::parse_duration(value)?),
126 0 : unknown => bail!("unknown key: {unknown}"),
127 : }
128 : }
129 0 : RateBucketInfo::validate(&mut limiter_info)?;
130 :
131 : Ok(Self {
132 0 : initial_batch_size: initial_batch_size.context("missing `initial_batch_size`")?,
133 0 : default_batch_size: default_batch_size.context("missing `default_batch_size`")?,
134 0 : xread_timeout: xread_timeout.context("missing `xread_timeout`")?,
135 0 : stream_name: stream_name.context("missing `stream_name`")?,
136 0 : disable_cache,
137 0 : limiter_info,
138 0 : retry_interval: retry_interval.context("missing `retry_interval`")?,
139 : })
140 0 : }
141 : }
142 :
143 : impl FromStr for EndpointCacheConfig {
144 : type Err = anyhow::Error;
145 :
146 0 : fn from_str(options: &str) -> Result<Self, Self::Err> {
147 0 : let error = || format!("failed to parse endpoint cache options '{options}'");
148 0 : Self::parse(options).with_context(error)
149 0 : }
150 : }
151 : #[derive(Debug)]
152 : pub struct MetricBackupCollectionConfig {
153 : pub remote_storage_config: Option<RemoteStorageConfig>,
154 : pub chunk_size: usize,
155 : }
156 :
157 1 : pub fn remote_storage_from_toml(s: &str) -> anyhow::Result<RemoteStorageConfig> {
158 1 : RemoteStorageConfig::from_toml(&s.parse()?)
159 1 : }
160 :
/// Cache options parsed from the command line (`size=...,ttl=...`).
#[derive(Debug)]
pub struct CacheOptions {
    /// Maximum number of entries.
    pub size: usize,
    /// Time-to-live of an entry.
    pub ttl: Duration,
}
169 :
170 : impl CacheOptions {
171 : /// Default options for [`crate::control_plane::NodeInfoCache`].
172 : pub const CACHE_DEFAULT_OPTIONS: &'static str = "size=4000,ttl=4m";
173 :
174 : /// Parse cache options passed via cmdline.
175 : /// Example: [`Self::CACHE_DEFAULT_OPTIONS`].
176 4 : fn parse(options: &str) -> anyhow::Result<Self> {
177 4 : let mut size = None;
178 4 : let mut ttl = None;
179 :
180 7 : for option in options.split(',') {
181 7 : let (key, value) = option
182 7 : .split_once('=')
183 7 : .with_context(|| format!("bad key-value pair: {option}"))?;
184 :
185 7 : match key {
186 7 : "size" => size = Some(value.parse()?),
187 3 : "ttl" => ttl = Some(humantime::parse_duration(value)?),
188 0 : unknown => bail!("unknown key: {unknown}"),
189 : }
190 : }
191 :
192 : // TTL doesn't matter if cache is always empty.
193 4 : if let Some(0) = size {
194 2 : ttl.get_or_insert(Duration::default());
195 2 : }
196 :
197 : Ok(Self {
198 4 : size: size.context("missing `size`")?,
199 4 : ttl: ttl.context("missing `ttl`")?,
200 : })
201 4 : }
202 : }
203 :
204 : impl FromStr for CacheOptions {
205 : type Err = anyhow::Error;
206 :
207 4 : fn from_str(options: &str) -> Result<Self, Self::Err> {
208 4 : let error = || format!("failed to parse cache options '{options}'");
209 4 : Self::parse(options).with_context(error)
210 4 : }
211 : }
212 :
/// Project info cache options parsed from the command line
/// (`size=...,ttl=...,max_roles=...,gc_interval=...`).
#[derive(Debug)]
pub struct ProjectInfoCacheOptions {
    /// Maximum number of entries.
    pub size: usize,
    /// Time-to-live of an entry.
    pub ttl: Duration,
    /// Maximum number of roles per endpoint.
    pub max_roles: usize,
    /// Garbage collection interval.
    pub gc_interval: Duration,
}
225 :
226 : impl ProjectInfoCacheOptions {
227 : /// Default options for [`crate::control_plane::NodeInfoCache`].
228 : pub const CACHE_DEFAULT_OPTIONS: &'static str =
229 : "size=10000,ttl=4m,max_roles=10,gc_interval=60m";
230 :
231 : /// Parse cache options passed via cmdline.
232 : /// Example: [`Self::CACHE_DEFAULT_OPTIONS`].
233 0 : fn parse(options: &str) -> anyhow::Result<Self> {
234 0 : let mut size = None;
235 0 : let mut ttl = None;
236 0 : let mut max_roles = None;
237 0 : let mut gc_interval = None;
238 :
239 0 : for option in options.split(',') {
240 0 : let (key, value) = option
241 0 : .split_once('=')
242 0 : .with_context(|| format!("bad key-value pair: {option}"))?;
243 :
244 0 : match key {
245 0 : "size" => size = Some(value.parse()?),
246 0 : "ttl" => ttl = Some(humantime::parse_duration(value)?),
247 0 : "max_roles" => max_roles = Some(value.parse()?),
248 0 : "gc_interval" => gc_interval = Some(humantime::parse_duration(value)?),
249 0 : unknown => bail!("unknown key: {unknown}"),
250 : }
251 : }
252 :
253 : // TTL doesn't matter if cache is always empty.
254 0 : if let Some(0) = size {
255 0 : ttl.get_or_insert(Duration::default());
256 0 : }
257 :
258 : Ok(Self {
259 0 : size: size.context("missing `size`")?,
260 0 : ttl: ttl.context("missing `ttl`")?,
261 0 : max_roles: max_roles.context("missing `max_roles`")?,
262 0 : gc_interval: gc_interval.context("missing `gc_interval`")?,
263 : })
264 0 : }
265 : }
266 :
267 : impl FromStr for ProjectInfoCacheOptions {
268 : type Err = anyhow::Error;
269 :
270 0 : fn from_str(options: &str) -> Result<Self, Self::Err> {
271 0 : let error = || format!("failed to parse cache options '{options}'");
272 0 : Self::parse(options).with_context(error)
273 0 : }
274 : }
275 :
276 : /// This is a config for connect to compute and wake compute.
277 : #[derive(Clone, Copy, Debug)]
278 : pub struct RetryConfig {
279 : /// Number of times we should retry.
280 : pub max_retries: u32,
281 : /// Retry duration is base_delay * backoff_factor ^ n, where n starts at 0
282 : pub base_delay: tokio::time::Duration,
283 : /// Exponential base for retry wait duration
284 : pub backoff_factor: f64,
285 : }
286 :
287 : impl RetryConfig {
288 : // Default options for RetryConfig.
289 :
290 : /// Total delay for 5 retries with 200ms base delay and 2 backoff factor is about 6s.
291 : pub const CONNECT_TO_COMPUTE_DEFAULT_VALUES: &'static str =
292 : "num_retries=5,base_retry_wait_duration=200ms,retry_wait_exponent_base=2";
293 : /// Total delay for 8 retries with 100ms base delay and 1.6 backoff factor is about 7s.
294 : /// Cplane has timeout of 60s on each request. 8m7s in total.
295 : pub const WAKE_COMPUTE_DEFAULT_VALUES: &'static str =
296 : "num_retries=8,base_retry_wait_duration=100ms,retry_wait_exponent_base=1.6";
297 :
298 : /// Parse retry options passed via cmdline.
299 : /// Example: [`Self::CONNECT_TO_COMPUTE_DEFAULT_VALUES`].
300 0 : pub fn parse(options: &str) -> anyhow::Result<Self> {
301 0 : let mut num_retries = None;
302 0 : let mut base_retry_wait_duration = None;
303 0 : let mut retry_wait_exponent_base = None;
304 :
305 0 : for option in options.split(',') {
306 0 : let (key, value) = option
307 0 : .split_once('=')
308 0 : .with_context(|| format!("bad key-value pair: {option}"))?;
309 :
310 0 : match key {
311 0 : "num_retries" => num_retries = Some(value.parse()?),
312 0 : "base_retry_wait_duration" => {
313 0 : base_retry_wait_duration = Some(humantime::parse_duration(value)?);
314 : }
315 0 : "retry_wait_exponent_base" => retry_wait_exponent_base = Some(value.parse()?),
316 0 : unknown => bail!("unknown key: {unknown}"),
317 : }
318 : }
319 :
320 : Ok(Self {
321 0 : max_retries: num_retries.context("missing `num_retries`")?,
322 0 : base_delay: base_retry_wait_duration.context("missing `base_retry_wait_duration`")?,
323 0 : backoff_factor: retry_wait_exponent_base
324 0 : .context("missing `retry_wait_exponent_base`")?,
325 : })
326 0 : }
327 : }
328 :
329 : /// Helper for cmdline cache options parsing.
330 5 : #[derive(serde::Deserialize)]
331 : pub struct ConcurrencyLockOptions {
332 : /// The number of shards the lock map should have
333 : pub shards: usize,
334 : /// The number of allowed concurrent requests for each endpoitn
335 : #[serde(flatten)]
336 : pub limiter: RateLimiterConfig,
337 : /// Garbage collection epoch
338 : #[serde(deserialize_with = "humantime_serde::deserialize")]
339 : pub epoch: Duration,
340 : /// Lock timeout
341 : #[serde(deserialize_with = "humantime_serde::deserialize")]
342 : pub timeout: Duration,
343 : }
344 :
345 : impl ConcurrencyLockOptions {
346 : /// Default options for [`crate::control_plane::client::ApiLocks`].
347 : pub const DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK: &'static str = "permits=0";
348 : /// Default options for [`crate::control_plane::client::ApiLocks`].
349 : pub const DEFAULT_OPTIONS_CONNECT_COMPUTE_LOCK: &'static str =
350 : "shards=64,permits=100,epoch=10m,timeout=10ms";
351 :
352 : // pub const DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK: &'static str = "shards=32,permits=4,epoch=10m,timeout=1s";
353 :
354 : /// Parse lock options passed via cmdline.
355 : /// Example: [`Self::DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK`].
356 4 : fn parse(options: &str) -> anyhow::Result<Self> {
357 4 : let options = options.trim();
358 4 : if options.starts_with('{') && options.ends_with('}') {
359 1 : return Ok(serde_json::from_str(options)?);
360 3 : }
361 3 :
362 3 : let mut shards = None;
363 3 : let mut permits = None;
364 3 : let mut epoch = None;
365 3 : let mut timeout = None;
366 :
367 9 : for option in options.split(',') {
368 9 : let (key, value) = option
369 9 : .split_once('=')
370 9 : .with_context(|| format!("bad key-value pair: {option}"))?;
371 :
372 9 : match key {
373 9 : "shards" => shards = Some(value.parse()?),
374 7 : "permits" => permits = Some(value.parse()?),
375 4 : "epoch" => epoch = Some(humantime::parse_duration(value)?),
376 2 : "timeout" => timeout = Some(humantime::parse_duration(value)?),
377 0 : unknown => bail!("unknown key: {unknown}"),
378 : }
379 : }
380 :
381 : // these dont matter if lock is disabled
382 3 : if let Some(0) = permits {
383 1 : timeout = Some(Duration::default());
384 1 : epoch = Some(Duration::default());
385 1 : shards = Some(2);
386 2 : }
387 :
388 3 : let permits = permits.context("missing `permits`")?;
389 3 : let out = Self {
390 3 : shards: shards.context("missing `shards`")?,
391 3 : limiter: RateLimiterConfig {
392 3 : algorithm: RateLimitAlgorithm::Fixed,
393 3 : initial_limit: permits,
394 3 : },
395 3 : epoch: epoch.context("missing `epoch`")?,
396 3 : timeout: timeout.context("missing `timeout`")?,
397 : };
398 :
399 3 : ensure!(out.shards > 1, "shard count must be > 1");
400 3 : ensure!(
401 3 : out.shards.is_power_of_two(),
402 0 : "shard count must be a power of two"
403 : );
404 :
405 3 : Ok(out)
406 4 : }
407 : }
408 :
409 : impl FromStr for ConcurrencyLockOptions {
410 : type Err = anyhow::Error;
411 :
412 4 : fn from_str(options: &str) -> Result<Self, Self::Err> {
413 4 : let error = || format!("failed to parse cache lock options '{options}'");
414 4 : Self::parse(options).with_context(error)
415 4 : }
416 : }
417 :
#[cfg(test)]
mod tests {
    use super::*;
    use crate::rate_limiter::Aimd;

    /// `CacheOptions` accepts keys in any order and lets `ttl` be omitted
    /// when `size=0`.
    #[test]
    fn test_parse_cache_options() -> anyhow::Result<()> {
        let cases = [
            ("size=4096,ttl=5min", 4096, Duration::from_secs(5 * 60)),
            ("ttl=4m,size=2", 2, Duration::from_secs(4 * 60)),
            ("size=0,ttl=1s", 0, Duration::from_secs(1)),
            ("size=0", 0, Duration::default()),
        ];

        for (input, want_size, want_ttl) in cases {
            let parsed: CacheOptions = input.parse()?;
            assert_eq!(parsed.size, want_size);
            assert_eq!(parsed.ttl, want_ttl);
        }

        Ok(())
    }

    /// The `key=value` form always yields a fixed limiter; `permits=0`
    /// fills in placeholder values for the remaining fields.
    #[test]
    fn test_parse_lock_options() -> anyhow::Result<()> {
        // (input, epoch, timeout, shards, permits)
        let cases = [
            (
                "shards=32,permits=4,epoch=10m,timeout=1s",
                Duration::from_secs(10 * 60),
                Duration::from_secs(1),
                32,
                4,
            ),
            (
                "epoch=60s,shards=16,timeout=100ms,permits=8",
                Duration::from_secs(60),
                Duration::from_millis(100),
                16,
                8,
            ),
            ("permits=0", Duration::ZERO, Duration::ZERO, 2, 0),
        ];

        for (input, want_epoch, want_timeout, want_shards, want_permits) in cases {
            let parsed: ConcurrencyLockOptions = input.parse()?;
            assert_eq!(parsed.epoch, want_epoch);
            assert_eq!(parsed.timeout, want_timeout);
            assert_eq!(parsed.shards, want_shards);
            assert_eq!(parsed.limiter.initial_limit, want_permits);
            assert_eq!(parsed.limiter.algorithm, RateLimitAlgorithm::Fixed);
        }

        Ok(())
    }

    /// The JSON form is deserialized with serde and can select the AIMD
    /// algorithm.
    #[test]
    fn test_parse_json_lock_options() -> anyhow::Result<()> {
        let input = r#"{"shards":32,"initial_limit":44,"aimd":{"min":5,"max":500,"inc":10,"dec":0.9,"utilisation":0.8},"epoch":"10m","timeout":"1s"}"#;
        let parsed: ConcurrencyLockOptions = input.parse()?;

        assert_eq!(parsed.epoch, Duration::from_secs(10 * 60));
        assert_eq!(parsed.timeout, Duration::from_secs(1));
        assert_eq!(parsed.shards, 32);
        assert_eq!(parsed.limiter.initial_limit, 44);

        let want_algorithm = RateLimitAlgorithm::Aimd {
            conf: Aimd {
                min: 5,
                max: 500,
                dec: 0.9,
                inc: 10,
                utilisation: 0.8,
            },
        };
        assert_eq!(parsed.limiter.algorithm, want_algorithm);

        Ok(())
    }
}
|