Line data Source code
1 : use std::fmt::Display;
2 : use std::time::{Duration, Instant};
3 :
4 : use metrics::IntCounter;
5 :
/// Guards an operation that is both expensive and fallible.
///
/// When the guarded operation fails repeatedly, the circuit is "broken" and
/// callers stop attempting it for some period of time. This avoids
/// denial-of-service from retries and mitigates the log spam that repeated
/// failures would otherwise produce.
pub struct CircuitBreaker {
    // ---- Configuration ----
    /// Identifier attached to the log lines emitted when the circuit breaks or
    /// recovers, so that they can be traced back to a specific breaker.
    name: String,

    /// Number of consecutive failures at which the circuit is broken.
    fail_threshold: usize,

    /// How long a broken circuit stays broken before it is reset automatically.
    /// With `None` a broken circuit stays broken forever, or until `success()`
    /// is called.
    reset_period: Option<Duration>,

    /// Override switch: when true, no circuit-breaking happens at all, which
    /// permits something to keep running even if it would otherwise have
    /// tripped the breaker.
    short_circuit: bool,

    // ---- Runtime state ----
    /// Consecutive failures recorded since the last success.
    fail_count: usize,

    /// The instant at which the circuit was broken; `None` while it is intact.
    broken_at: Option<Instant>,
}
32 :
33 : impl CircuitBreaker {
34 452 : pub fn new(name: String, fail_threshold: usize, reset_period: Option<Duration>) -> Self {
35 452 : Self {
36 452 : name,
37 452 : fail_count: 0,
38 452 : fail_threshold,
39 452 : broken_at: None,
40 452 : reset_period,
41 452 : short_circuit: false,
42 452 : }
43 452 : }
44 :
45 : /// Construct an unbreakable circuit breaker, for use in unit tests etc.
46 0 : pub fn short_circuit() -> Self {
47 0 : Self {
48 0 : name: String::new(),
49 0 : fail_threshold: 0,
50 0 : fail_count: 0,
51 0 : broken_at: None,
52 0 : reset_period: None,
53 0 : short_circuit: true,
54 0 : }
55 0 : }
56 :
57 0 : pub fn fail<E>(&mut self, metric: &IntCounter, error: E)
58 0 : where
59 0 : E: Display,
60 0 : {
61 0 : if self.short_circuit {
62 0 : return;
63 0 : }
64 0 :
65 0 : self.fail_count += 1;
66 0 : if self.broken_at.is_none() && self.fail_count >= self.fail_threshold {
67 0 : self.break_circuit(metric, error);
68 0 : }
69 0 : }
70 :
71 : /// Call this after successfully executing an operation
72 0 : pub fn success(&mut self, metric: &IntCounter) {
73 0 : self.fail_count = 0;
74 0 : if let Some(broken_at) = &self.broken_at {
75 0 : tracing::info!(breaker=%self.name, "Circuit breaker failure ended (was broken for {})",
76 0 : humantime::format_duration(broken_at.elapsed()));
77 0 : self.broken_at = None;
78 0 : metric.inc();
79 0 : }
80 0 : }
81 :
82 : /// Call this before attempting an operation, and skip the operation if we are currently broken.
83 0 : pub fn is_broken(&mut self) -> bool {
84 0 : if self.short_circuit {
85 0 : return false;
86 0 : }
87 :
88 0 : if let Some(broken_at) = self.broken_at {
89 0 : match self.reset_period {
90 0 : Some(reset_period) if broken_at.elapsed() > reset_period => {
91 0 : self.reset_circuit();
92 0 : false
93 : }
94 0 : _ => true,
95 : }
96 : } else {
97 0 : false
98 : }
99 0 : }
100 :
101 0 : fn break_circuit<E>(&mut self, metric: &IntCounter, error: E)
102 0 : where
103 0 : E: Display,
104 0 : {
105 0 : self.broken_at = Some(Instant::now());
106 0 : tracing::error!(breaker=%self.name, "Circuit breaker broken! Last error: {error}");
107 0 : metric.inc();
108 0 : }
109 :
110 0 : fn reset_circuit(&mut self) {
111 0 : self.broken_at = None;
112 0 : self.fail_count = 0;
113 0 : }
114 : }
|