Line data Source code
1 : use std::sync::OnceLock;
2 :
3 : use lasso::ThreadedRodeo;
4 : use measured::{
5 : label::StaticLabelSet,
6 : metric::{histogram::Thresholds, name::MetricName},
7 : Counter, CounterVec, FixedCardinalityLabel, Gauge, Histogram, HistogramVec, LabelGroup,
8 : MetricGroup,
9 : };
10 : use metrics::{CounterPairAssoc, CounterPairVec, HyperLogLog, HyperLogLogVec};
11 :
12 : use tokio::time::{self, Instant};
13 :
14 : use crate::console::messages::ColdStartInfo;
15 :
// Root of all proxy metrics; the process-wide instance is obtained via `Metrics::get()`.
#[derive(MetricGroup)]
pub struct Metrics {
    // General proxy metrics, grouped under the `proxy` namespace.
    #[metric(namespace = "proxy")]
    pub proxy: ProxyMetrics,

    // Metrics for the wake-compute API lock, grouped under the `wake_compute_lock` namespace.
    #[metric(namespace = "wake_compute_lock")]
    pub wake_compute_lock: ApiLockMetrics,
}
24 :
25 : impl Metrics {
26 178 : pub fn get() -> &'static Self {
27 178 : static SELF: OnceLock<Metrics> = OnceLock::new();
28 178 : SELF.get_or_init(|| Metrics {
29 72 : proxy: ProxyMetrics::default(),
30 72 : wake_compute_lock: ApiLockMetrics::new(),
31 178 : })
32 178 : }
33 : }
34 :
// Proxy-wide metrics, exposed through `Metrics::proxy` under the `proxy` namespace.
//
// NOTE(review): the `///` field comments below are presumably used as metric HELP
// text by `#[derive(MetricGroup)]`. Editing them likely changes exported metadata,
// so change them deliberately.
//
// The bucket-range notes below assume `Thresholds::<N>::exponential_buckets(start, factor)`
// produces N upper bounds `start * factor^i` for i in 0..N. Under that assumption the
// largest finite bound is `start * factor^(N-1)` — TODO confirm against the `measured` docs.
#[derive(MetricGroup)]
#[metric(new())]
pub struct ProxyMetrics {
    #[metric(flatten)]
    pub db_connections: CounterPairVec<NumDbConnectionsGauge>,
    #[metric(flatten)]
    pub client_connections: CounterPairVec<NumClientConnectionsGauge>,
    #[metric(flatten)]
    pub connection_requests: CounterPairVec<NumConnectionRequestsGauge>,
    #[metric(flatten)]
    pub http_endpoint_pools: HttpEndpointPools,

    /// Time it took for proxy to establish a connection to the compute endpoint.
    // 16 buckets starting at 0.5ms, doubling: largest bound = 2^15 * 0.5ms ≈ 16.4s
    #[metric(metadata = Thresholds::exponential_buckets(0.0005, 2.0))]
    pub compute_connection_latency_seconds: HistogramVec<ComputeConnectionLatencySet, 16>,

    /// Time it took for proxy to receive a response from control plane.
    #[metric(
        // 16 buckets starting at 0.2ms, doubling: largest bound = 2^15 * 0.2ms ≈ 6.6s
        metadata = Thresholds::exponential_buckets(0.0002, 2.0),
    )]
    pub console_request_latency: HistogramVec<ConsoleRequestSet, 16>,

    /// Time it takes to acquire a token to call console plane.
    // 16 buckets starting at 0.05ms, tripling: largest bound = 3^15 * 0.05ms ≈ 717s.
    // NOTE(review): if a top bucket of ~2s was intended, start/factor need revisiting.
    // 3^16 * 0.05ms is ≈ 2152s, not 2.15s.
    #[metric(metadata = Thresholds::exponential_buckets(0.00005, 3.0))]
    pub control_plane_token_acquire_seconds: Histogram<16>,

    /// Size of the HTTP request body lengths.
    // smallest bucket = 16 bytes
    // 12 buckets growing 4x: largest bound = 4^11 * 16 bytes = 64MiB
    #[metric(metadata = Thresholds::exponential_buckets(16.0, 4.0))]
    pub http_conn_content_length_bytes: HistogramVec<StaticLabelSet<HttpDirection>, 12>,

    /// Time it takes to reclaim unused connection pools.
    // 16 buckets starting at 1µs, doubling: largest bound = 2^15 * 1µs ≈ 32.8ms.
    // The "reclaimation" spelling is part of the exported metric name; renaming it
    // would change the metric.
    #[metric(metadata = Thresholds::exponential_buckets(1e-6, 2.0))]
    pub http_pool_reclaimation_lag_seconds: Histogram<16>,

    /// Number of opened connections to a database.
    pub http_pool_opened_connections: Gauge,

    /// Number of cache hits/misses for allowed ips.
    // Despite the name, this counts both hits and misses (labelled by `CacheOutcome`).
    pub allowed_ips_cache_misses: CounterVec<StaticLabelSet<CacheOutcome>>,

    /// Number of allowed ips
    #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0]))]
    pub allowed_ips_number: Histogram<10>,

    /// Number of connections (per sni).
    pub accepted_connections_by_sni: CounterVec<StaticLabelSet<SniKind>>,

    /// Number of connection failures (per kind).
    pub connection_failures_total: CounterVec<StaticLabelSet<ConnectionFailureKind>>,

    /// Number of wake-up failures (per kind).
    pub connection_failures_breakdown: CounterVec<ConnectionFailuresBreakdownSet>,

    /// Number of bytes sent/received between all clients and backends.
    pub io_bytes: CounterVec<StaticLabelSet<Direction>>,

    /// Number of errors by a given classification.
    pub errors_total: CounterVec<StaticLabelSet<crate::error::ErrorKind>>,

    /// Number of cancellation requests (per found/not_found).
    pub cancellation_requests_total: CounterVec<CancellationRequestSet>,

    /// Number of errors by a given classification
    // labelled by redis channel (see `RedisErrors`)
    pub redis_errors_total: CounterVec<RedisErrorsSet>,

    /// Number of TLS handshake failures
    pub tls_handshake_failures: Counter,

    /// Number of connection requests affected by authentication rate limits
    pub requests_auth_rate_limits_total: Counter,

    /// HLL approximate cardinality of endpoints that are connecting
    pub connecting_endpoints: HyperLogLogVec<StaticLabelSet<Protocol>, 32>,

    /// Number of endpoints affected by errors of a given classification
    pub endpoints_affected_by_errors: HyperLogLogVec<StaticLabelSet<crate::error::ErrorKind>, 32>,

    /// Number of endpoints affected by authentication rate limits
    pub endpoints_auth_rate_limits: HyperLogLog<32>,

    /// Number of invalid endpoints (per protocol, per rejected).
    pub invalid_endpoints_total: CounterVec<InvalidEndpointsSet>,

    /// Number of retries (per outcome, per retry_type).
    // one bucket per retry count from 0 to 8
    #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]))]
    pub retries_metric: HistogramVec<RetriesMetricSet, 9>,

    /// Number of events consumed from redis (per event type).
    pub redis_events_count: CounterVec<StaticLabelSet<RedisEventsCount>>,

    // Metrics for the connect-compute API lock, nested under `connect_compute_lock`.
    #[metric(namespace = "connect_compute_lock")]
    pub connect_compute_lock: ApiLockMetrics,
}
133 :
// Metrics for a semaphore-based API lock. This struct is instantiated as both
// `Metrics::wake_compute_lock` and `ProxyMetrics::connect_compute_lock`.
// NOTE(review): the `///` field comments are presumably exported as metric HELP text.
#[derive(MetricGroup)]
#[metric(new())]
pub struct ApiLockMetrics {
    /// Number of semaphores registered in this api lock
    pub semaphores_registered: Counter,
    /// Number of semaphores unregistered in this api lock
    pub semaphores_unregistered: Counter,
    /// Time it takes to reclaim unused semaphores in the api lock
    // 16 buckets starting at 1µs, doubling: largest bound ≈ 2^15 * 1µs ≈ 32.8ms.
    // This assumes `exponential_buckets` yields `start * factor^i` for i in 0..16 — TODO confirm.
    #[metric(metadata = Thresholds::exponential_buckets(1e-6, 2.0))]
    pub reclamation_lag_seconds: Histogram<16>,
    /// Time it takes to acquire a semaphore lock
    // 16 buckets starting at 0.1ms, doubling: largest bound ≈ 2^15 * 0.1ms ≈ 3.3s (same assumption)
    #[metric(metadata = Thresholds::exponential_buckets(1e-4, 2.0))]
    pub semaphore_acquire_seconds: Histogram<16>,
}
148 :
149 : impl Default for ProxyMetrics {
150 72 : fn default() -> Self {
151 72 : Self::new()
152 72 : }
153 : }
154 :
155 : impl Default for ApiLockMetrics {
156 72 : fn default() -> Self {
157 72 : Self::new()
158 72 : }
159 : }
160 :
/// `direction` label for `ProxyMetrics::http_conn_content_length_bytes`.
///
/// It distinguishes whether the measured HTTP body belongs to a request or to a response.
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "direction")]
pub enum HttpDirection {
    /// Body of an HTTP request.
    Request,
    /// Body of an HTTP response.
    Response,
}
167 :
/// `direction` label for `ProxyMetrics::io_bytes`.
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "direction")]
pub enum Direction {
    /// Presumably bytes transmitted (sent) — confirm at call sites.
    Tx,
    /// Presumably bytes received — confirm at call sites.
    Rx,
}
174 :
/// `protocol` label for connection-related metrics.
///
/// It is used by the counter pairs, `connecting_endpoints` and the compute
/// connection latency. The textual form comes from `Protocol::as_str`.
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
#[label(singleton = "protocol")]
pub enum Protocol {
    /// `"http"`
    Http,
    /// `"ws"` (presumably WebSocket)
    Ws,
    /// `"tcp"`
    Tcp,
    /// `"sni_router"`
    SniRouter,
}
183 :
184 : impl Protocol {
185 0 : pub fn as_str(&self) -> &'static str {
186 0 : match self {
187 0 : Protocol::Http => "http",
188 0 : Protocol::Ws => "ws",
189 0 : Protocol::Tcp => "tcp",
190 0 : Protocol::SniRouter => "sni_router",
191 : }
192 0 : }
193 : }
194 :
195 : impl std::fmt::Display for Protocol {
196 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
197 0 : f.write_str(self.as_str())
198 0 : }
199 : }
200 :
/// Boolean label value.
///
/// It is used for the `retry` field of `ConnectionFailuresBreakdownGroup` and the
/// `rejected` field of `InvalidEndpointsGroup`. It can be built from a Rust `bool`
/// via `From<bool>`.
#[derive(FixedCardinalityLabel, Copy, Clone)]
pub enum Bool {
    True,
    False,
}
206 :
/// Generic success/failure `outcome` label.
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "outcome")]
pub enum Outcome {
    Success,
    Failed,
}
213 :
/// `outcome` label for cache lookups, used by `ProxyMetrics::allowed_ips_cache_misses`.
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "outcome")]
pub enum CacheOutcome {
    /// The value was found in the cache.
    Hit,
    /// The value was not found in the cache.
    Miss,
}
220 :
/// Label group for `ProxyMetrics::console_request_latency`.
#[derive(LabelGroup)]
#[label(set = ConsoleRequestSet)]
pub struct ConsoleRequest<'a> {
    /// Identifies the request. This is presumably the control-plane call name —
    /// confirm at call sites. It is a dynamic label whose values are stored in a
    /// lasso `ThreadedRodeo` string interner.
    #[label(dynamic_with = ThreadedRodeo, default)]
    pub request: &'a str,
}
227 :
// Counters for per-endpoint HTTP connection pools; flattened into `ProxyMetrics`.
// Use `HttpEndpointPools::guard` so that each registration is paired with an
// unregistration.
// NOTE(review): the `///` field comments are presumably exported as metric HELP text.
#[derive(MetricGroup, Default)]
pub struct HttpEndpointPools {
    /// Number of endpoints we have registered pools for
    pub http_pool_endpoints_registered_total: Counter,
    /// Number of endpoints we have unregistered pools for
    pub http_pool_endpoints_unregistered_total: Counter,
}
235 :
/// Guard returned by `HttpEndpointPools::guard`.
///
/// Dropping it increments the "unregistered" counter.
pub struct HttpEndpointPoolsGuard<'a> {
    // counter incremented on drop (`http_pool_endpoints_unregistered_total`)
    dec: &'a Counter,
}
239 :
240 : impl Drop for HttpEndpointPoolsGuard<'_> {
241 4 : fn drop(&mut self) {
242 4 : self.dec.inc();
243 4 : }
244 : }
245 :
246 : impl HttpEndpointPools {
247 4 : pub fn guard(&self) -> HttpEndpointPoolsGuard {
248 4 : self.http_pool_endpoints_registered_total.inc();
249 4 : HttpEndpointPoolsGuard {
250 4 : dec: &self.http_pool_endpoints_unregistered_total,
251 4 : }
252 4 : }
253 : }
/// Counter-pair descriptor for connections to a database, labelled by [`Protocol`].
///
/// It exports `opened_db_connections_total` and `closed_db_connections_total`.
pub struct NumDbConnectionsGauge;
impl CounterPairAssoc for NumDbConnectionsGauge {
    const INC_NAME: &'static MetricName = MetricName::from_str("opened_db_connections_total");
    const DEC_NAME: &'static MetricName = MetricName::from_str("closed_db_connections_total");
    const INC_HELP: &'static str = "Number of opened connections to a database.";
    const DEC_HELP: &'static str = "Number of closed connections to a database.";
    type LabelGroupSet = StaticLabelSet<Protocol>;
}
/// Guard for [`NumDbConnectionsGauge`] from the `metrics` crate.
///
/// It presumably counts "opened" on creation and "closed" on drop — confirm in
/// `metrics::MeasuredCounterPairGuard`.
pub type NumDbConnectionsGuard<'a> = metrics::MeasuredCounterPairGuard<'a, NumDbConnectionsGauge>;
263 :
/// Counter-pair descriptor for connections from clients, labelled by [`Protocol`].
///
/// It exports `opened_client_connections_total` and `closed_client_connections_total`.
pub struct NumClientConnectionsGauge;
impl CounterPairAssoc for NumClientConnectionsGauge {
    const INC_NAME: &'static MetricName = MetricName::from_str("opened_client_connections_total");
    const DEC_NAME: &'static MetricName = MetricName::from_str("closed_client_connections_total");
    const INC_HELP: &'static str = "Number of opened connections from a client.";
    const DEC_HELP: &'static str = "Number of closed connections from a client.";
    type LabelGroupSet = StaticLabelSet<Protocol>;
}
/// Guard for [`NumClientConnectionsGauge`] from the `metrics` crate.
///
/// It presumably counts "opened" on creation and "closed" on drop — confirm in
/// `metrics::MeasuredCounterPairGuard`.
pub type NumClientConnectionsGuard<'a> =
    metrics::MeasuredCounterPairGuard<'a, NumClientConnectionsGauge>;
274 :
/// Counter-pair descriptor for accepted/closed client connections, labelled by [`Protocol`].
///
/// It exports `accepted_connections_total` and `closed_connections_total`.
pub struct NumConnectionRequestsGauge;
impl CounterPairAssoc for NumConnectionRequestsGauge {
    const INC_NAME: &'static MetricName = MetricName::from_str("accepted_connections_total");
    const DEC_NAME: &'static MetricName = MetricName::from_str("closed_connections_total");
    const INC_HELP: &'static str = "Number of client connections accepted.";
    const DEC_HELP: &'static str = "Number of client connections closed.";
    type LabelGroupSet = StaticLabelSet<Protocol>;
}
/// Guard for [`NumConnectionRequestsGauge`] from the `metrics` crate.
///
/// It presumably counts "accepted" on creation and "closed" on drop — confirm in
/// `metrics::MeasuredCounterPairGuard`.
pub type NumConnectionRequestsGuard<'a> =
    metrics::MeasuredCounterPairGuard<'a, NumConnectionRequestsGauge>;
285 :
/// Label group for `ProxyMetrics::compute_connection_latency_seconds`.
///
/// `LatencyTimer`'s `Drop` impl builds one of these for each `LatencyExclusions` variant.
#[derive(LabelGroup)]
#[label(set = ComputeConnectionLatencySet)]
pub struct ComputeConnectionLatencyGroup {
    // protocol passed to `LatencyTimer::new`
    protocol: Protocol,
    // cold-start info set via `LatencyTimer::cold_start_info` (defaults to Unknown)
    cold_start_info: ColdStartInfo,
    // Success if `LatencyTimer::success` was called, otherwise Failed
    outcome: ConnectOutcome,
    // which accumulated waiting times were subtracted from the observed duration
    excluded: LatencyExclusions,
}
294 :
/// Which paused time was subtracted from the total duration before it was observed
/// in `compute_connection_latency_seconds`.
///
/// Each variant excludes everything the previous one did, plus one more category.
#[derive(FixedCardinalityLabel, Copy, Clone)]
pub enum LatencyExclusions {
    /// Time spent waiting on the client is excluded.
    Client,
    /// Client and control-plane waiting time are excluded.
    ClientAndCplane,
    /// Client, control-plane and compute waiting time are excluded.
    ClientCplaneCompute,
    /// Client, control-plane, compute and retry-timeout waiting time are excluded.
    ClientCplaneComputeRetry,
}
302 :
/// `kind` label for `ProxyMetrics::accepted_connections_by_sni`.
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "kind")]
pub enum SniKind {
    Sni,
    NoSni,
    PasswordHack,
}
310 :
/// `kind` label for `ProxyMetrics::connection_failures_total`.
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "kind")]
pub enum ConnectionFailureKind {
    /// Presumably a failure while using cached compute info — confirm at call sites.
    ComputeCached,
    /// Presumably a failure with freshly fetched compute info — confirm at call sites.
    ComputeUncached,
}
317 :
/// `kind` label for wake-up failures.
///
/// It is used in `ConnectionFailuresBreakdownGroup` for
/// `ProxyMetrics::connection_failures_breakdown`.
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "kind")]
pub enum WakeupFailureKind {
    BadComputeAddress,
    ApiTransportError,
    QuotaExceeded,
    ApiConsoleLocked,
    ApiConsoleBadRequest,
    ApiConsoleOtherServerError,
    ApiConsoleOtherError,
    TimeoutError,
}
330 :
/// Label group for `ProxyMetrics::connection_failures_breakdown` ("wake-up failures per kind").
#[derive(LabelGroup)]
#[label(set = ConnectionFailuresBreakdownSet)]
pub struct ConnectionFailuresBreakdownGroup {
    /// Category of the wake-up failure.
    pub kind: WakeupFailureKind,
    /// Retry flag for this failure. This is presumably whether it will be retried —
    /// confirm at call sites.
    pub retry: Bool,
}
337 :
/// Label group for `ProxyMetrics::redis_errors_total`.
#[derive(LabelGroup, Copy, Clone)]
#[label(set = RedisErrorsSet)]
pub struct RedisErrors<'a> {
    /// Redis channel the error is attributed to.
    ///
    /// It is a dynamic label whose values are stored in a lasso `ThreadedRodeo`
    /// string interner.
    #[label(dynamic_with = ThreadedRodeo, default)]
    pub channel: &'a str,
}
344 :
/// Where a cancellation request came from; used in [`CancellationRequest`].
#[derive(FixedCardinalityLabel, Copy, Clone)]
pub enum CancellationSource {
    /// Sent by a client.
    FromClient,
    /// Received via redis.
    FromRedis,
    /// Originated locally (NOTE(review): exact meaning not visible here).
    Local,
}
351 :
/// Whether a cancellation request's target was found (the "found/not_found" split
/// of `cancellation_requests_total`).
#[derive(FixedCardinalityLabel, Copy, Clone)]
pub enum CancellationOutcome {
    NotFound,
    Found,
}
357 :
/// Label group for `ProxyMetrics::cancellation_requests_total`.
#[derive(LabelGroup)]
#[label(set = CancellationRequestSet)]
pub struct CancellationRequest {
    /// Origin of the cancellation request.
    pub source: CancellationSource,
    /// Whether the request's target was found.
    pub kind: CancellationOutcome,
}
364 :
/// What a `LatencyTimer` is waiting on while paused via `LatencyTimer::pause`.
///
/// The variant selects the accumulator that the paused time is added to. That
/// time can then be excluded from the reported compute connection latency.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Waiting {
    /// Waiting on the control plane.
    Cplane,
    /// Waiting on the client.
    Client,
    /// Waiting on compute.
    Compute,
    /// Waiting for a retry timeout to elapse.
    RetryTimeout,
}
371 :
/// Time a `LatencyTimer` has spent paused, split by what it was waiting for.
///
/// Each field is incremented by `LatencyTimerPause`'s `Drop` impl.
#[derive(Default)]
struct Accumulated {
    // time paused with `Waiting::Cplane`
    cplane: time::Duration,
    // time paused with `Waiting::Client`
    client: time::Duration,
    // time paused with `Waiting::Compute`
    compute: time::Duration,
    // time paused with `Waiting::RetryTimeout`
    retry: time::Duration,
}
379 :
/// Stopwatch for a compute connection attempt.
///
/// On drop it observes the elapsed time in
/// `ProxyMetrics::compute_connection_latency_seconds`. It does so once per
/// `LatencyExclusions` variant, each time with a different share of the paused
/// time subtracted.
pub struct LatencyTimer {
    // instant at which the stopwatch was started
    start: time::Instant,
    // instant at which the stopwatch was stopped (set by `success`); when `None`,
    // the drop instant is used instead
    stop: Option<time::Instant>,
    // time spent paused, per waiting category
    accumulated: Accumulated,
    // label data
    protocol: Protocol,
    cold_start_info: ColdStartInfo,
    outcome: ConnectOutcome,
}
392 :
/// Guard returned by `LatencyTimer::pause`.
///
/// When dropped, it adds the time since `start` to the timer's accumulator that
/// matches `waiting_for`.
pub struct LatencyTimerPause<'a> {
    // timer whose accumulated waiting time is updated on drop
    timer: &'a mut LatencyTimer,
    // instant at which the pause began
    start: time::Instant,
    // which accumulator receives the paused time
    waiting_for: Waiting,
}
398 :
399 : impl LatencyTimer {
400 70 : pub fn new(protocol: Protocol) -> Self {
401 70 : Self {
402 70 : start: time::Instant::now(),
403 70 : stop: None,
404 70 : accumulated: Accumulated::default(),
405 70 : protocol,
406 70 : cold_start_info: ColdStartInfo::Unknown,
407 70 : // assume failed unless otherwise specified
408 70 : outcome: ConnectOutcome::Failed,
409 70 : }
410 70 : }
411 :
412 42 : pub fn pause(&mut self, waiting_for: Waiting) -> LatencyTimerPause<'_> {
413 42 : LatencyTimerPause {
414 42 : timer: self,
415 42 : start: Instant::now(),
416 42 : waiting_for,
417 42 : }
418 42 : }
419 :
420 0 : pub fn cold_start_info(&mut self, cold_start_info: ColdStartInfo) {
421 0 : self.cold_start_info = cold_start_info;
422 0 : }
423 :
424 8 : pub fn success(&mut self) {
425 8 : // stop the stopwatch and record the time that we have accumulated
426 8 : self.stop = Some(time::Instant::now());
427 8 :
428 8 : // success
429 8 : self.outcome = ConnectOutcome::Success;
430 8 : }
431 : }
432 :
433 : impl Drop for LatencyTimerPause<'_> {
434 42 : fn drop(&mut self) {
435 42 : let dur = self.start.elapsed();
436 42 : match self.waiting_for {
437 0 : Waiting::Cplane => self.timer.accumulated.cplane += dur,
438 30 : Waiting::Client => self.timer.accumulated.client += dur,
439 0 : Waiting::Compute => self.timer.accumulated.compute += dur,
440 12 : Waiting::RetryTimeout => self.timer.accumulated.retry += dur,
441 : }
442 42 : }
443 : }
444 :
/// Outcome of a connection attempt.
///
/// `LatencyTimer` starts at `Failed` and switches to `Success` when
/// `LatencyTimer::success` is called.
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
pub enum ConnectOutcome {
    Success,
    Failed,
}
450 :
451 : impl Drop for LatencyTimer {
452 70 : fn drop(&mut self) {
453 70 : let duration = self
454 70 : .stop
455 70 : .unwrap_or_else(time::Instant::now)
456 70 : .duration_since(self.start);
457 70 :
458 70 : let metric = &Metrics::get().proxy.compute_connection_latency_seconds;
459 70 :
460 70 : // Excluding client communication from the accumulated time.
461 70 : metric.observe(
462 70 : ComputeConnectionLatencyGroup {
463 70 : protocol: self.protocol,
464 70 : cold_start_info: self.cold_start_info,
465 70 : outcome: self.outcome,
466 70 : excluded: LatencyExclusions::Client,
467 70 : },
468 70 : duration
469 70 : .saturating_sub(self.accumulated.client)
470 70 : .as_secs_f64(),
471 70 : );
472 70 :
473 70 : // Exclude client and cplane communication from the accumulated time.
474 70 : let accumulated_total = self.accumulated.client + self.accumulated.cplane;
475 70 : metric.observe(
476 70 : ComputeConnectionLatencyGroup {
477 70 : protocol: self.protocol,
478 70 : cold_start_info: self.cold_start_info,
479 70 : outcome: self.outcome,
480 70 : excluded: LatencyExclusions::ClientAndCplane,
481 70 : },
482 70 : duration.saturating_sub(accumulated_total).as_secs_f64(),
483 70 : );
484 70 :
485 70 : // Exclude client cplane, compue communication from the accumulated time.
486 70 : let accumulated_total =
487 70 : self.accumulated.client + self.accumulated.cplane + self.accumulated.compute;
488 70 : metric.observe(
489 70 : ComputeConnectionLatencyGroup {
490 70 : protocol: self.protocol,
491 70 : cold_start_info: self.cold_start_info,
492 70 : outcome: self.outcome,
493 70 : excluded: LatencyExclusions::ClientCplaneCompute,
494 70 : },
495 70 : duration.saturating_sub(accumulated_total).as_secs_f64(),
496 70 : );
497 70 :
498 70 : // Exclude client cplane, compue, retry communication from the accumulated time.
499 70 : let accumulated_total = self.accumulated.client
500 70 : + self.accumulated.cplane
501 70 : + self.accumulated.compute
502 70 : + self.accumulated.retry;
503 70 : metric.observe(
504 70 : ComputeConnectionLatencyGroup {
505 70 : protocol: self.protocol,
506 70 : cold_start_info: self.cold_start_info,
507 70 : outcome: self.outcome,
508 70 : excluded: LatencyExclusions::ClientCplaneComputeRetry,
509 70 : },
510 70 : duration.saturating_sub(accumulated_total).as_secs_f64(),
511 70 : );
512 70 : }
513 : }
514 :
515 : impl From<bool> for Bool {
516 6 : fn from(value: bool) -> Self {
517 6 : if value {
518 4 : Bool::True
519 : } else {
520 2 : Bool::False
521 : }
522 6 : }
523 : }
524 :
/// Label group for `ProxyMetrics::invalid_endpoints_total` ("per protocol, per rejected").
#[derive(LabelGroup)]
#[label(set = InvalidEndpointsSet)]
pub struct InvalidEndpointsGroup {
    /// Protocol the request arrived over.
    pub protocol: Protocol,
    /// Whether the request was rejected.
    pub rejected: Bool,
    /// Outcome of the connection attempt.
    pub outcome: ConnectOutcome,
}
532 :
/// Label group for `ProxyMetrics::retries_metric` ("per outcome, per retry_type").
#[derive(LabelGroup)]
#[label(set = RetriesMetricSet)]
pub struct RetriesMetricGroup {
    /// Final outcome of the retried operation.
    pub outcome: ConnectOutcome,
    /// Which operation was retried.
    pub retry_type: RetryType,
}
539 :
/// Operation being retried; the `retry_type` label of [`RetriesMetricGroup`].
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
pub enum RetryType {
    /// Retries of waking compute.
    WakeCompute,
    /// Retries of connecting to compute.
    ConnectToCompute,
}
545 :
/// `event` label for `ProxyMetrics::redis_events_count` (events consumed from redis).
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
#[label(singleton = "event")]
pub enum RedisEventsCount {
    EndpointCreated,
    BranchCreated,
    ProjectCreated,
    CancelSession,
    PasswordUpdate,
    AllowedIpsUpdate,
}
|