Line data Source code
1 : //! Algorithms for controlling concurrency limits.
2 : use async_trait::async_trait;
3 : use std::time::Duration;
4 :
5 : use super::{limiter::Outcome, Aimd};
6 :
7 : /// An algorithm for controlling a concurrency limit.
8 : #[async_trait]
9 : pub trait LimitAlgorithm: Send + Sync + 'static {
10 : /// Update the concurrency limit in response to a new job completion.
11 : async fn update(&mut self, old_limit: usize, sample: Sample) -> usize;
12 : }
13 :
14 : /// The result of a job (or jobs), including the [Outcome] (loss) and latency (delay).
15 0 : #[derive(Debug, Clone, PartialEq, Eq)]
16 : pub struct Sample {
17 : pub(crate) latency: Duration,
18 : /// Jobs in flight when the sample was taken.
19 : pub(crate) in_flight: usize,
20 : pub(crate) outcome: Outcome,
21 : }
22 :
23 125 : #[derive(Clone, Copy, Debug, Default, clap::ValueEnum)]
24 : pub enum RateLimitAlgorithm {
25 : Fixed,
26 : #[default]
27 : Aimd,
28 : }
29 :
30 : pub struct Fixed;
31 :
32 : #[async_trait]
33 : impl LimitAlgorithm for Fixed {
34 10 : async fn update(&mut self, old_limit: usize, _sample: Sample) -> usize {
35 10 : old_limit
36 10 : }
37 : }
38 :
39 0 : #[derive(Clone, Copy, Debug)]
40 : pub struct RateLimiterConfig {
41 : pub disable: bool,
42 : pub algorithm: RateLimitAlgorithm,
43 : pub timeout: Duration,
44 : pub initial_limit: usize,
45 : pub aimd_config: Option<AimdConfig>,
46 : }
47 :
48 : impl RateLimiterConfig {
49 17 : pub fn create_rate_limit_algorithm(self) -> Box<dyn LimitAlgorithm> {
50 17 : match self.algorithm {
51 6 : RateLimitAlgorithm::Fixed => Box::new(Fixed),
52 11 : RateLimitAlgorithm::Aimd => Box::new(Aimd::new(self.aimd_config.unwrap())), // For aimd algorithm config is mandatory.
53 : }
54 17 : }
55 : }
56 :
57 : impl Default for RateLimiterConfig {
58 16 : fn default() -> Self {
59 16 : Self {
60 16 : disable: true,
61 16 : algorithm: RateLimitAlgorithm::Aimd,
62 16 : timeout: Duration::from_secs(1),
63 16 : initial_limit: 100,
64 16 : aimd_config: Some(AimdConfig::default()),
65 16 : }
66 16 : }
67 : }
68 :
69 25 : #[derive(clap::Parser, Clone, Copy, Debug)]
70 : pub struct AimdConfig {
71 : /// Minimum limit for AIMD algorithm. Makes sense only if `rate_limit_algorithm` is `Aimd`.
72 25 : #[clap(long, default_value_t = 1)]
73 0 : pub aimd_min_limit: usize,
74 : /// Maximum limit for AIMD algorithm. Makes sense only if `rate_limit_algorithm` is `Aimd`.
75 25 : #[clap(long, default_value_t = 1500)]
76 0 : pub aimd_max_limit: usize,
77 : /// Increase AIMD increase by value in case of success. Makes sense only if `rate_limit_algorithm` is `Aimd`.
78 25 : #[clap(long, default_value_t = 10)]
79 0 : pub aimd_increase_by: usize,
80 : /// Decrease AIMD decrease by value in case of timout/429. Makes sense only if `rate_limit_algorithm` is `Aimd`.
81 25 : #[clap(long, default_value_t = 0.9)]
82 0 : pub aimd_decrease_factor: f32,
83 : /// A threshold below which the limit won't be increased. Makes sense only if `rate_limit_algorithm` is `Aimd`.
84 25 : #[clap(long, default_value_t = 0.8)]
85 0 : pub aimd_min_utilisation_threshold: f32,
86 : }
87 :
88 : impl Default for AimdConfig {
89 24 : fn default() -> Self {
90 24 : Self {
91 24 : aimd_min_limit: 1,
92 24 : aimd_max_limit: 1500,
93 24 : aimd_increase_by: 10,
94 24 : aimd_decrease_factor: 0.9,
95 24 : aimd_min_utilisation_threshold: 0.8,
96 24 : }
97 24 : }
98 : }
|