Line data Source code
1 : use std::{
2 : fmt::Display,
3 : time::{Duration, Instant},
4 : };
5 :
6 : use metrics::IntCounter;
7 :
/// Guards an operation that is both expensive and fallible.
///
/// When the guarded operation fails repeatedly, the breaker "breaks" and callers are
/// expected to stop attempting the operation for some period of time. This avoids
/// denial-of-service from retries and mitigates the log spam from repeated failures.
///
/// `Debug` is derived so that the breaker can be printed with `{:?}` and embedded in
/// other types that derive `Debug`.
#[derive(Debug)]
pub struct CircuitBreaker {
    /// An identifier that enables us to log useful errors when a circuit is broken.
    name: String,

    /// Number of consecutive failures recorded since the last success (or reset).
    fail_count: usize,

    /// How many consecutive failures it takes to break the circuit.
    fail_threshold: usize,

    /// `Some(instant)` while the circuit is broken, holding the moment it broke;
    /// `None` while the circuit is intact.
    broken_at: Option<Instant>,

    /// If set, the circuit is automatically reset this long after it was broken.
    /// If `None`, a broken circuit stays broken forever, or until `success()` is called.
    reset_period: Option<Duration>,

    /// If this is true, no actual circuit-breaking happens. This is for overriding a
    /// circuit breaker to permit something to keep running even if it would otherwise
    /// have tripped it.
    short_circuit: bool,
}
34 :
35 : impl CircuitBreaker {
36 192 : pub fn new(name: String, fail_threshold: usize, reset_period: Option<Duration>) -> Self {
37 192 : Self {
38 192 : name,
39 192 : fail_count: 0,
40 192 : fail_threshold,
41 192 : broken_at: None,
42 192 : reset_period,
43 192 : short_circuit: false,
44 192 : }
45 192 : }
46 :
47 : /// Construct an unbreakable circuit breaker, for use in unit tests etc.
48 0 : pub fn short_circuit() -> Self {
49 0 : Self {
50 0 : name: String::new(),
51 0 : fail_threshold: 0,
52 0 : fail_count: 0,
53 0 : broken_at: None,
54 0 : reset_period: None,
55 0 : short_circuit: true,
56 0 : }
57 0 : }
58 :
59 0 : pub fn fail<E>(&mut self, metric: &IntCounter, error: E)
60 0 : where
61 0 : E: Display,
62 0 : {
63 0 : if self.short_circuit {
64 0 : return;
65 0 : }
66 0 :
67 0 : self.fail_count += 1;
68 0 : if self.broken_at.is_none() && self.fail_count >= self.fail_threshold {
69 0 : self.break_circuit(metric, error);
70 0 : }
71 0 : }
72 :
73 : /// Call this after successfully executing an operation
74 0 : pub fn success(&mut self, metric: &IntCounter) {
75 0 : self.fail_count = 0;
76 0 : if let Some(broken_at) = &self.broken_at {
77 0 : tracing::info!(breaker=%self.name, "Circuit breaker failure ended (was broken for {})",
78 0 : humantime::format_duration(broken_at.elapsed()));
79 0 : self.broken_at = None;
80 0 : metric.inc();
81 0 : }
82 0 : }
83 :
84 : /// Call this before attempting an operation, and skip the operation if we are currently broken.
85 0 : pub fn is_broken(&mut self) -> bool {
86 0 : if self.short_circuit {
87 0 : return false;
88 0 : }
89 :
90 0 : if let Some(broken_at) = self.broken_at {
91 0 : match self.reset_period {
92 0 : Some(reset_period) if broken_at.elapsed() > reset_period => {
93 0 : self.reset_circuit();
94 0 : false
95 : }
96 0 : _ => true,
97 : }
98 : } else {
99 0 : false
100 : }
101 0 : }
102 :
103 0 : fn break_circuit<E>(&mut self, metric: &IntCounter, error: E)
104 0 : where
105 0 : E: Display,
106 0 : {
107 0 : self.broken_at = Some(Instant::now());
108 0 : tracing::error!(breaker=%self.name, "Circuit breaker broken! Last error: {error}");
109 0 : metric.inc();
110 0 : }
111 :
112 0 : fn reset_circuit(&mut self) {
113 0 : self.broken_at = None;
114 0 : self.fail_count = 0;
115 0 : }
116 : }
|