Line data Source code
1 : use std::sync::OnceLock;
2 :
3 : use lasso::ThreadedRodeo;
4 : use measured::{
5 : label::StaticLabelSet,
6 : metric::{histogram::Thresholds, name::MetricName},
7 : Counter, CounterVec, FixedCardinalityLabel, Gauge, Histogram, HistogramVec, LabelGroup,
8 : MetricGroup,
9 : };
10 : use metrics::{CounterPairAssoc, CounterPairVec, HyperLogLog, HyperLogLogVec};
11 :
12 : use tokio::time::{self, Instant};
13 :
14 : use crate::console::messages::ColdStartInfo;
15 :
// Top-level metric group: bundles the proxy metrics and the wake-compute
// API-lock metrics, each registered under its own namespace.
16 : #[derive(MetricGroup)]
17 : pub struct Metrics {
// Metrics of the proxy itself, under the `proxy` namespace.
18 : #[metric(namespace = "proxy")]
19 : pub proxy: ProxyMetrics,
20 :
// Metrics of the wake-compute API lock, under the `wake_compute_lock` namespace.
21 : #[metric(namespace = "wake_compute_lock")]
22 : pub wake_compute_lock: ApiLockMetrics,
23 : }
24 :
25 : impl Metrics {
26 144 : pub fn get() -> &'static Self {
27 144 : static SELF: OnceLock<Metrics> = OnceLock::new();
28 144 : SELF.get_or_init(|| Metrics {
29 72 : proxy: ProxyMetrics::default(),
30 72 : wake_compute_lock: ApiLockMetrics::new(),
31 144 : })
32 144 : }
33 : }
34 :
// All metrics exported by the proxy. `#[metric(new())]` generates the `new()`
// constructor used by the `Default` impl.
// NOTE(review): the `///` comments on fields presumably feed the exported
// metric help text, so only `//` comments are touched here - confirm.
35 72 : #[derive(MetricGroup)]
36 : #[metric(new())]
37 : pub struct ProxyMetrics {
38 : #[metric(flatten)]
39 : pub db_connections: CounterPairVec<NumDbConnectionsGauge>,
40 : #[metric(flatten)]
41 : pub client_connections: CounterPairVec<NumClientConnectionsGauge>,
42 : #[metric(flatten)]
43 : pub connection_requests: CounterPairVec<NumConnectionRequestsGauge>,
44 : #[metric(flatten)]
45 : pub http_endpoint_pools: HttpEndpointPools,
46 :
47 : /// Time it took for proxy to establish a connection to the compute endpoint.
48 : // 16 buckets starting at 0.5ms and doubling. NOTE(review): if the first bucket equals the start value, the largest is 2^15 * 0.5ms ~= 16.4s (2^16 * 0.5ms would be ~32.8s) - confirm.
49 : #[metric(metadata = Thresholds::exponential_buckets(0.0005, 2.0))]
50 : pub compute_connection_latency_seconds: HistogramVec<ComputeConnectionLatencySet, 16>,
51 :
52 : /// Time it took for proxy to receive a response from control plane.
53 : #[metric(
54 : // 16 buckets starting at 0.2ms and doubling. NOTE(review): if the first bucket equals the start value, the largest is 2^15 * 0.2ms ~= 6.6s (2^16 * 0.2ms would be ~13.1s) - confirm.
55 : metadata = Thresholds::exponential_buckets(0.0002, 2.0),
56 : )]
57 : pub console_request_latency: HistogramVec<ConsoleRequestSet, 16>,
58 :
59 : /// Time it takes to acquire a token to call console plane.
60 : // 16 buckets starting at 0.05ms and tripling. 3^16 * 0.05ms is ~2152s (about 36 minutes), not 2.15s; NOTE(review): the largest bucket is 3^15 * 0.05ms ~= 717s if the first bucket equals the start value - confirm this range is intended.
61 : #[metric(metadata = Thresholds::exponential_buckets(0.00005, 3.0))]
62 : pub control_plane_token_acquire_seconds: Histogram<16>,
63 :
64 : /// Size of the HTTP request body lengths.
65 : // smallest bucket = 16 bytes
66 : // 12 buckets growing 4x each. NOTE(review): with a 16-byte smallest bucket the largest is 4^11 * 16 bytes = 64MiB (4^12 * 16 bytes would be 256MiB) - confirm.
67 : #[metric(metadata = Thresholds::exponential_buckets(16.0, 4.0))]
68 : pub http_conn_content_length_bytes: HistogramVec<StaticLabelSet<HttpDirection>, 12>,
69 :
70 : /// Time it takes to reclaim unused connection pools.
// 16 buckets starting at 1 microsecond and doubling.
71 : #[metric(metadata = Thresholds::exponential_buckets(1e-6, 2.0))]
72 : pub http_pool_reclaimation_lag_seconds: Histogram<16>,
73 :
74 : /// Number of opened connections to a database.
75 : pub http_pool_opened_connections: Gauge,
76 :
77 : /// Number of cache hits/misses for allowed ips.
78 : pub allowed_ips_cache_misses: CounterVec<StaticLabelSet<CacheOutcome>>,
79 :
80 : /// Number of allowed ips
// Explicit thresholds: fine-grained up to 5, then coarser up to 100.
81 : #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0]))]
82 : pub allowed_ips_number: Histogram<10>,
83 :
84 : /// Number of connections (per sni).
85 : pub accepted_connections_by_sni: CounterVec<StaticLabelSet<SniKind>>,
86 :
87 : /// Number of connection failures (per kind).
88 : pub connection_failures_total: CounterVec<StaticLabelSet<ConnectionFailureKind>>,
89 :
90 : /// Number of wake-up failures (per kind).
91 : pub connection_failures_breakdown: CounterVec<ConnectionFailuresBreakdownSet>,
92 :
93 : /// Number of bytes sent/received between all clients and backends.
94 : pub io_bytes: CounterVec<StaticLabelSet<Direction>>,
95 :
96 : /// Number of errors by a given classification.
97 : pub errors_total: CounterVec<StaticLabelSet<crate::error::ErrorKind>>,
98 :
99 : /// Number of cancellation requests (per found/not_found).
100 : pub cancellation_requests_total: CounterVec<CancellationRequestSet>,
101 :
102 : /// Number of errors by a given classification
103 : pub redis_errors_total: CounterVec<RedisErrorsSet>,
104 :
105 : /// Number of TLS handshake failures
106 : pub tls_handshake_failures: Counter,
107 :
108 : /// Number of connection requests affected by authentication rate limits
109 : pub requests_auth_rate_limits_total: Counter,
110 :
111 : /// HLL approximate cardinality of endpoints that are connecting
112 : pub connecting_endpoints: HyperLogLogVec<StaticLabelSet<Protocol>, 32>,
113 :
114 : /// Number of endpoints affected by errors of a given classification
115 : pub endpoints_affected_by_errors: HyperLogLogVec<StaticLabelSet<crate::error::ErrorKind>, 32>,
116 :
117 : /// Number of endpoints affected by authentication rate limits
118 : pub endpoints_auth_rate_limits: HyperLogLog<32>,
119 :
120 : /// Number of invalid endpoints (per protocol, per rejected).
121 : pub invalid_endpoints_total: CounterVec<InvalidEndpointsSet>,
122 : }
123 :
// Metrics describing an API lock and its semaphores. `Metrics` registers one
// instance under the `wake_compute_lock` namespace. `#[metric(new())]`
// generates the `new()` constructor called from `Metrics::get`.
124 72 : #[derive(MetricGroup)]
125 : #[metric(new())]
126 : pub struct ApiLockMetrics {
127 : /// Number of semaphores registered in this api lock
128 : pub semaphores_registered: Counter,
129 : /// Number of semaphores unregistered in this api lock
130 : pub semaphores_unregistered: Counter,
131 : /// Time it takes to reclaim unused semaphores in the api lock
// 16 buckets starting at 1 microsecond and doubling.
132 : #[metric(metadata = Thresholds::exponential_buckets(1e-6, 2.0))]
133 : pub reclamation_lag_seconds: Histogram<16>,
134 : /// Time it takes to acquire a semaphore lock
// 16 buckets starting at 0.1ms and doubling.
135 : #[metric(metadata = Thresholds::exponential_buckets(1e-4, 2.0))]
136 : pub semaphore_acquire_seconds: Histogram<16>,
137 : }
138 :
139 : impl Default for ProxyMetrics {
140 72 : fn default() -> Self {
141 72 : Self::new()
142 72 : }
143 : }
144 :
/// Distinguishes HTTP requests from responses; used as the label of
/// `http_conn_content_length_bytes`, with the label named `direction`.
145 0 : #[derive(FixedCardinalityLabel, Copy, Clone)]
146 : #[label(singleton = "direction")]
147 : pub enum HttpDirection {
148 : Request,
149 : Response,
150 : }
151 :
/// Transfer direction (`Tx` / `Rx`); used as the label of `io_bytes`, with
/// the label named `direction`.
152 0 : #[derive(FixedCardinalityLabel, Copy, Clone)]
153 : #[label(singleton = "direction")]
154 : pub enum Direction {
155 : Tx,
156 : Rx,
157 : }
158 :
/// Protocol a connection came in on; used as the `protocol` label of the
/// connection counter pairs and several other metrics in this file.
159 0 : #[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
160 : #[label(singleton = "protocol")]
161 : pub enum Protocol {
162 : Http,
163 : Ws,
164 : Tcp,
165 : SniRouter,
166 : }
167 :
168 : impl Protocol {
169 0 : pub fn as_str(&self) -> &'static str {
170 0 : match self {
171 0 : Protocol::Http => "http",
172 0 : Protocol::Ws => "ws",
173 0 : Protocol::Tcp => "tcp",
174 0 : Protocol::SniRouter => "sni_router",
175 : }
176 0 : }
177 : }
178 :
179 : impl std::fmt::Display for Protocol {
180 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
181 0 : f.write_str(self.as_str())
182 0 : }
183 : }
184 :
/// Boolean that can be used as a fixed-cardinality label value; build it from
/// a plain `bool` via the `From<bool>` impl in this file.
185 : #[derive(FixedCardinalityLabel, Copy, Clone)]
186 : pub enum Bool {
187 : True,
188 : False,
189 : }
190 :
/// Generic success/failure label value, with the label named `outcome`.
191 0 : #[derive(FixedCardinalityLabel, Copy, Clone)]
192 : #[label(singleton = "outcome")]
193 : pub enum Outcome {
194 : Success,
195 : Failed,
196 : }
197 :
/// Result of a cache lookup; used as the label of `allowed_ips_cache_misses`,
/// with the label named `outcome`.
198 0 : #[derive(FixedCardinalityLabel, Copy, Clone)]
199 : #[label(singleton = "outcome")]
200 : pub enum CacheOutcome {
201 : Hit,
202 : Miss,
203 : }
204 :
/// Label group for `console_request_latency`; the derive generates the
/// matching `ConsoleRequestSet`.
205 144 : #[derive(LabelGroup)]
206 : #[label(set = ConsoleRequestSet)]
207 : pub struct ConsoleRequest<'a> {
// Free-form request name. NOTE(review): `dynamic_with = ThreadedRodeo`
// presumably interns each distinct string - keep the value set small.
208 : #[label(dynamic_with = ThreadedRodeo, default)]
209 : pub request: &'a str,
210 : }
211 :
// Pair of counters tracking how many per-endpoint HTTP pools were registered
// and unregistered. `guard()` bumps the first immediately and the second when
// the returned guard is dropped.
212 : #[derive(MetricGroup, Default)]
213 : pub struct HttpEndpointPools {
214 : /// Number of endpoints we have registered pools for
215 : pub http_pool_endpoints_registered_total: Counter,
216 : /// Number of endpoints we have unregistered pools for
217 : pub http_pool_endpoints_unregistered_total: Counter,
218 : }
219 :
/// Guard handed out by `HttpEndpointPools::guard`. Dropping it increments the
/// counter it borrows.
220 : pub struct HttpEndpointPoolsGuard<'a> {
// Despite the name, this counter is *incremented* on drop: it is the
// "unregistered" total that balances the "registered" total.
221 : dec: &'a Counter,
222 : }
223 :
224 : impl Drop for HttpEndpointPoolsGuard<'_> {
225 4 : fn drop(&mut self) {
226 4 : self.dec.inc();
227 4 : }
228 : }
229 :
230 : impl HttpEndpointPools {
231 4 : pub fn guard(&self) -> HttpEndpointPoolsGuard {
232 4 : self.http_pool_endpoints_registered_total.inc();
233 4 : HttpEndpointPoolsGuard {
234 4 : dec: &self.http_pool_endpoints_unregistered_total,
235 4 : }
236 4 : }
237 : }
/// Marker type describing the opened/closed counter pair for database
/// connections, labelled by `Protocol`.
237 : pub struct NumDbConnectionsGauge;
// Names and help texts of the two counters that make up the pair.
238 : impl CounterPairAssoc for NumDbConnectionsGauge {
239 : const INC_NAME: &'static MetricName = MetricName::from_str("opened_db_connections_total");
240 : const DEC_NAME: &'static MetricName = MetricName::from_str("closed_db_connections_total");
241 : const INC_HELP: &'static str = "Number of opened connections to a database.";
242 : const DEC_HELP: &'static str = "Number of closed connections to a database.";
243 : type LabelGroupSet = StaticLabelSet<Protocol>;
244 : }
/// `MeasuredCounterPairGuard` specialised to the database-connection pair.
245 : pub type NumDbConnectionsGuard<'a> = metrics::MeasuredCounterPairGuard<'a, NumDbConnectionsGauge>;
247 :
/// Marker type describing the opened/closed counter pair for client
/// connections, labelled by `Protocol`.
247 : pub struct NumClientConnectionsGauge;
// Names and help texts of the two counters that make up the pair.
248 : impl CounterPairAssoc for NumClientConnectionsGauge {
249 : const INC_NAME: &'static MetricName = MetricName::from_str("opened_client_connections_total");
250 : const DEC_NAME: &'static MetricName = MetricName::from_str("closed_client_connections_total");
251 : const INC_HELP: &'static str = "Number of opened connections from a client.";
252 : const DEC_HELP: &'static str = "Number of closed connections from a client.";
253 : type LabelGroupSet = StaticLabelSet<Protocol>;
254 : }
/// `MeasuredCounterPairGuard` specialised to the client-connection pair.
255 : pub type NumClientConnectionsGuard<'a> =
256 : metrics::MeasuredCounterPairGuard<'a, NumClientConnectionsGauge>;
258 :
/// Marker type describing the accepted/closed counter pair for connection
/// requests, labelled by `Protocol`.
258 : pub struct NumConnectionRequestsGauge;
// Names and help texts of the two counters that make up the pair.
259 : impl CounterPairAssoc for NumConnectionRequestsGauge {
260 : const INC_NAME: &'static MetricName = MetricName::from_str("accepted_connections_total");
261 : const DEC_NAME: &'static MetricName = MetricName::from_str("closed_connections_total");
262 : const INC_HELP: &'static str = "Number of client connections accepted.";
263 : const DEC_HELP: &'static str = "Number of client connections closed.";
264 : type LabelGroupSet = StaticLabelSet<Protocol>;
265 : }
/// `MeasuredCounterPairGuard` specialised to the connection-request pair.
266 : pub type NumConnectionRequestsGuard<'a> =
267 : metrics::MeasuredCounterPairGuard<'a, NumConnectionRequestsGauge>;
269 :
/// Labels attached to each `compute_connection_latency_seconds` observation;
/// the derive generates the matching `ComputeConnectionLatencySet`.
/// Instances are built in `LatencyTimer`'s `Drop` impl.
269 432 : #[derive(LabelGroup)]
270 : #[label(set = ComputeConnectionLatencySet)]
271 : pub struct ComputeConnectionLatencyGroup {
272 : protocol: Protocol,
273 : cold_start_info: ColdStartInfo,
274 : outcome: ConnectOutcome,
// Which waiting time was subtracted from the observed duration.
275 : excluded: LatencyExclusions,
276 : }
278 :
/// Says which accumulated waiting time was subtracted from a latency
/// observation: client time only, or client plus control-plane time.
278 : #[derive(FixedCardinalityLabel, Copy, Clone)]
279 : pub enum LatencyExclusions {
280 : Client,
281 : ClientAndCplane,
282 : }
284 :
/// Label of `accepted_connections_by_sni`, with the label named `kind`.
/// NOTE(review): presumably describes how the endpoint was identified
/// (SNI, no SNI, or the password hack) - confirm against callers.
284 0 : #[derive(FixedCardinalityLabel, Copy, Clone)]
285 : #[label(singleton = "kind")]
286 : pub enum SniKind {
287 : Sni,
288 : NoSni,
289 : PasswordHack,
290 : }
292 :
/// Label of `connection_failures_total`, with the label named `kind`.
/// NOTE(review): presumably tells whether the compute info came from a
/// cache - confirm against callers.
292 0 : #[derive(FixedCardinalityLabel, Copy, Clone)]
293 : #[label(singleton = "kind")]
294 : pub enum ConnectionFailureKind {
295 : ComputeCached,
296 : ComputeUncached,
297 : }
299 :
/// Classification of a wake-up failure; used as the `kind` field of
/// `ConnectionFailuresBreakdownGroup`.
300 0 : #[derive(FixedCardinalityLabel, Copy, Clone)]
301 : #[label(singleton = "kind")]
302 : pub enum WakeupFailureKind {
303 : BadComputeAddress,
304 : ApiTransportError,
305 : QuotaExceeded,
306 : ApiConsoleLocked,
307 : ApiConsoleBadRequest,
308 : ApiConsoleOtherServerError,
309 : ApiConsoleOtherError,
310 : TimeoutError,
311 : }
312 :
/// Labels of `connection_failures_breakdown`; the derive generates the
/// matching `ConnectionFailuresBreakdownSet`.
313 288 : #[derive(LabelGroup)]
314 : #[label(set = ConnectionFailuresBreakdownSet)]
315 : pub struct ConnectionFailuresBreakdownGroup {
316 : pub kind: WakeupFailureKind,
// NOTE(review): presumably whether the failed attempt is retried - confirm.
317 : pub retry: Bool,
318 : }
319 :
/// Labels of `redis_errors_total`; the derive generates the matching
/// `RedisErrorsSet`.
320 144 : #[derive(LabelGroup, Copy, Clone)]
321 : #[label(set = RedisErrorsSet)]
322 : pub struct RedisErrors<'a> {
// Free-form channel name. NOTE(review): `dynamic_with = ThreadedRodeo`
// presumably interns each distinct string - keep the value set small.
323 : #[label(dynamic_with = ThreadedRodeo, default)]
324 : pub channel: &'a str,
325 : }
326 :
/// Where a cancellation request originated; used as the `source` field of
/// `CancellationRequest`.
327 : #[derive(FixedCardinalityLabel, Copy, Clone)]
328 : pub enum CancellationSource {
329 : FromClient,
330 : FromRedis,
331 : Local,
332 : }
333 :
/// Found / not-found result of a cancellation request; used as the `kind`
/// field of `CancellationRequest`.
334 : #[derive(FixedCardinalityLabel, Copy, Clone)]
335 : pub enum CancellationOutcome {
336 : NotFound,
337 : Found,
338 : }
339 :
/// Labels of `cancellation_requests_total`; the derive generates the
/// matching `CancellationRequestSet`.
340 288 : #[derive(LabelGroup)]
341 : #[label(set = CancellationRequestSet)]
342 : pub struct CancellationRequest {
343 : pub source: CancellationSource,
344 : pub kind: CancellationOutcome,
345 : }
346 :
/// What a paused `LatencyTimer` is waiting for. It selects which bucket of
/// the timer's accumulated durations receives the pause time.
347 : pub enum Waiting {
348 : Cplane,
349 : Client,
350 : Compute,
351 : }
352 :
/// Total time a `LatencyTimer` spent paused, split by what it waited for.
/// All buckets start at zero.
353 : #[derive(Default)]
354 : struct Accumulated {
355 : cplane: time::Duration,
356 : client: time::Duration,
// Accumulated by pauses, but not read by the code visible in this file.
357 : compute: time::Duration,
358 : }
359 :
/// Stopwatch for connection establishment. When dropped, it reports the
/// elapsed time to `compute_connection_latency_seconds`, minus the time
/// spent in pauses.
359 : pub struct LatencyTimer {
360 : // instant at which the stopwatch was started
361 : start: time::Instant,
362 : // instant at which `success()` stopped the stopwatch; `None` while still running
363 : stop: Option<time::Instant>,
364 : // time spent paused, split by what was being waited for
365 : accumulated: Accumulated,
366 : // label data
367 : protocol: Protocol,
368 : cold_start_info: ColdStartInfo,
369 : outcome: ConnectOutcome,
370 : }
372 :
/// Guard returned by `LatencyTimer::pause`. When dropped, it adds the time
/// since its creation to the timer bucket selected by `waiting_for`.
372 : pub struct LatencyTimerPause<'a> {
// The timer being paused; borrowed mutably for the guard's lifetime.
373 : timer: &'a mut LatencyTimer,
// Instant at which the pause began.
374 : start: time::Instant,
375 : waiting_for: Waiting,
376 : }
378 :
379 : impl LatencyTimer {
380 70 : pub fn new(protocol: Protocol) -> Self {
381 70 : Self {
382 70 : start: time::Instant::now(),
383 70 : stop: None,
384 70 : accumulated: Accumulated::default(),
385 70 : protocol,
386 70 : cold_start_info: ColdStartInfo::Unknown,
387 70 : // assume failed unless otherwise specified
388 70 : outcome: ConnectOutcome::Failed,
389 70 : }
390 70 : }
391 :
392 30 : pub fn pause(&mut self, waiting_for: Waiting) -> LatencyTimerPause<'_> {
393 30 : LatencyTimerPause {
394 30 : timer: self,
395 30 : start: Instant::now(),
396 30 : waiting_for,
397 30 : }
398 30 : }
399 :
400 0 : pub fn cold_start_info(&mut self, cold_start_info: ColdStartInfo) {
401 0 : self.cold_start_info = cold_start_info;
402 0 : }
403 :
404 8 : pub fn success(&mut self) {
405 8 : // stop the stopwatch and record the time that we have accumulated
406 8 : self.stop = Some(time::Instant::now());
407 8 :
408 8 : // success
409 8 : self.outcome = ConnectOutcome::Success;
410 8 : }
411 : }
412 :
413 : impl Drop for LatencyTimerPause<'_> {
414 30 : fn drop(&mut self) {
415 30 : let dur = self.start.elapsed();
416 30 : match self.waiting_for {
417 0 : Waiting::Cplane => self.timer.accumulated.cplane += dur,
418 30 : Waiting::Client => self.timer.accumulated.client += dur,
419 0 : Waiting::Compute => self.timer.accumulated.compute += dur,
420 : }
421 30 : }
422 : }
423 :
/// Outcome label of a connection attempt. `LatencyTimer` starts as `Failed`
/// and switches to `Success` in `LatencyTimer::success`.
423 : #[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
424 : pub enum ConnectOutcome {
425 : Success,
426 : Failed,
427 : }
429 :
430 : impl Drop for LatencyTimer {
431 70 : fn drop(&mut self) {
432 70 : let duration = self
433 70 : .stop
434 70 : .unwrap_or_else(time::Instant::now)
435 70 : .duration_since(self.start);
436 70 :
437 70 : let metric = &Metrics::get().proxy.compute_connection_latency_seconds;
438 70 :
439 70 : // Excluding client communication from the accumulated time.
440 70 : metric.observe(
441 70 : ComputeConnectionLatencyGroup {
442 70 : protocol: self.protocol,
443 70 : cold_start_info: self.cold_start_info,
444 70 : outcome: self.outcome,
445 70 : excluded: LatencyExclusions::Client,
446 70 : },
447 70 : duration
448 70 : .saturating_sub(self.accumulated.client)
449 70 : .as_secs_f64(),
450 70 : );
451 70 :
452 70 : // Exclude client and cplane communication from the accumulated time.
453 70 : let accumulated_total = self.accumulated.client + self.accumulated.cplane;
454 70 : metric.observe(
455 70 : ComputeConnectionLatencyGroup {
456 70 : protocol: self.protocol,
457 70 : cold_start_info: self.cold_start_info,
458 70 : outcome: self.outcome,
459 70 : excluded: LatencyExclusions::ClientAndCplane,
460 70 : },
461 70 : duration.saturating_sub(accumulated_total).as_secs_f64(),
462 70 : );
463 70 : }
464 : }
465 :
466 : impl From<bool> for Bool {
467 6 : fn from(value: bool) -> Self {
468 6 : if value {
469 4 : Bool::True
470 : } else {
471 2 : Bool::False
472 : }
473 6 : }
474 : }
475 :
/// Labels of `invalid_endpoints_total`; the derive generates the matching
/// `InvalidEndpointsSet`.
475 360 : #[derive(LabelGroup)]
476 : #[label(set = InvalidEndpointsSet)]
477 : pub struct InvalidEndpointsGroup {
478 : pub protocol: Protocol,
479 : pub rejected: Bool,
480 : pub outcome: ConnectOutcome,
481 : }
|