Line data Source code
1 : use std::sync::{Arc, OnceLock};
2 :
3 : use lasso::ThreadedRodeo;
4 : use measured::label::{
5 : FixedCardinalitySet, LabelGroupSet, LabelName, LabelSet, LabelValue, StaticLabelSet,
6 : };
7 : use measured::metric::histogram::Thresholds;
8 : use measured::metric::name::MetricName;
9 : use measured::{
10 : Counter, CounterVec, FixedCardinalityLabel, Gauge, Histogram, HistogramVec, LabelGroup,
11 : MetricGroup,
12 : };
13 : use metrics::{CounterPairAssoc, CounterPairVec, HyperLogLog, HyperLogLogVec};
14 : use tokio::time::{self, Instant};
15 :
16 : use crate::control_plane::messages::ColdStartInfo;
17 : use crate::error::ErrorKind;
18 :
19 49 : #[derive(MetricGroup)]
20 : #[metric(new(thread_pool: Arc<ThreadPoolMetrics>))]
21 : pub struct Metrics {
22 : #[metric(namespace = "proxy")]
23 : #[metric(init = ProxyMetrics::new(thread_pool))]
24 : pub proxy: ProxyMetrics,
25 :
26 : #[metric(namespace = "wake_compute_lock")]
27 : pub wake_compute_lock: ApiLockMetrics,
28 : }
29 :
30 : static SELF: OnceLock<Metrics> = OnceLock::new();
31 : impl Metrics {
32 0 : pub fn install(thread_pool: Arc<ThreadPoolMetrics>) {
33 0 : let mut metrics = Metrics::new(thread_pool);
34 0 :
35 0 : metrics.proxy.errors_total.init_all_dense();
36 0 : metrics.proxy.redis_errors_total.init_all_dense();
37 0 : metrics.proxy.redis_events_count.init_all_dense();
38 0 : metrics.proxy.retries_metric.init_all_dense();
39 0 : metrics.proxy.invalid_endpoints_total.init_all_dense();
40 0 : metrics.proxy.connection_failures_total.init_all_dense();
41 0 :
42 0 : SELF.set(metrics)
43 0 : .ok()
44 0 : .expect("proxy metrics must not be installed more than once");
45 0 : }
46 :
47 163 : pub fn get() -> &'static Self {
48 163 : #[cfg(test)]
49 163 : return SELF.get_or_init(|| Metrics::new(Arc::new(ThreadPoolMetrics::new(0))));
50 163 :
51 163 : #[cfg(not(test))]
52 163 : SELF.get()
53 163 : .expect("proxy metrics must be installed by the main() function")
54 163 : }
55 : }
56 :
57 49 : #[derive(MetricGroup)]
58 : #[metric(new(thread_pool: Arc<ThreadPoolMetrics>))]
59 : pub struct ProxyMetrics {
60 : #[metric(flatten)]
61 : pub db_connections: CounterPairVec<NumDbConnectionsGauge>,
62 : #[metric(flatten)]
63 : pub client_connections: CounterPairVec<NumClientConnectionsGauge>,
64 : #[metric(flatten)]
65 : pub connection_requests: CounterPairVec<NumConnectionRequestsGauge>,
66 : #[metric(flatten)]
67 : pub http_endpoint_pools: HttpEndpointPools,
68 : #[metric(flatten)]
69 : pub cancel_channel_size: CounterPairVec<CancelChannelSizeGauge>,
70 :
71 : /// Time it took for proxy to establish a connection to the compute endpoint.
72 : // largest bucket = 2^16 * 0.5ms = 32s
73 : #[metric(metadata = Thresholds::exponential_buckets(0.0005, 2.0))]
74 : pub compute_connection_latency_seconds: HistogramVec<ComputeConnectionLatencySet, 16>,
75 :
76 : /// Time it took for proxy to receive a response from control plane.
77 : #[metric(
78 : // largest bucket = 2^16 * 0.2ms = 13s
79 : metadata = Thresholds::exponential_buckets(0.0002, 2.0),
80 : )]
81 : pub console_request_latency: HistogramVec<ConsoleRequestSet, 16>,
82 :
83 : /// Time it takes to acquire a token to call console plane.
84 : // largest bucket = 3^16 * 0.05ms = 2.15s
85 : #[metric(metadata = Thresholds::exponential_buckets(0.00005, 3.0))]
86 : pub control_plane_token_acquire_seconds: Histogram<16>,
87 :
88 : /// Size of the HTTP request body lengths.
89 : // smallest bucket = 16 bytes
90 : // largest bucket = 4^12 * 16 bytes = 256MB
91 : #[metric(metadata = Thresholds::exponential_buckets(16.0, 4.0))]
92 : pub http_conn_content_length_bytes: HistogramVec<StaticLabelSet<HttpDirection>, 12>,
93 :
94 : /// Time it takes to reclaim unused connection pools.
95 : #[metric(metadata = Thresholds::exponential_buckets(1e-6, 2.0))]
96 : pub http_pool_reclaimation_lag_seconds: Histogram<16>,
97 :
98 : /// Number of opened connections to a database.
99 : pub http_pool_opened_connections: Gauge,
100 :
101 : /// Number of cache hits/misses for allowed ips.
102 : pub allowed_ips_cache_misses: CounterVec<StaticLabelSet<CacheOutcome>>,
103 :
104 : /// Number of allowed ips
105 : #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0]))]
106 : pub allowed_ips_number: Histogram<10>,
107 :
108 : /// Number of cache hits/misses for VPC endpoint IDs.
109 : pub vpc_endpoint_id_cache_stats: CounterVec<StaticLabelSet<CacheOutcome>>,
110 :
111 : /// Number of cache hits/misses for access blocker flags.
112 : pub access_blocker_flags_cache_stats: CounterVec<StaticLabelSet<CacheOutcome>>,
113 :
114 : /// Number of allowed VPC endpoints IDs
115 : #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0]))]
116 : pub allowed_vpc_endpoint_ids: Histogram<10>,
117 :
118 : /// Number of connections, by the method we used to determine the endpoint.
119 : pub accepted_connections_by_sni: CounterVec<SniSet>,
120 :
121 : /// Number of connection failures (per kind).
122 : pub connection_failures_total: CounterVec<StaticLabelSet<ConnectionFailureKind>>,
123 :
124 : /// Number of wake-up failures (per kind).
125 : pub connection_failures_breakdown: CounterVec<ConnectionFailuresBreakdownSet>,
126 :
127 : /// Number of bytes sent/received between all clients and backends.
128 : pub io_bytes: CounterVec<StaticLabelSet<Direction>>,
129 :
130 : /// Number of errors by a given classification.
131 : pub errors_total: CounterVec<StaticLabelSet<crate::error::ErrorKind>>,
132 :
133 : /// Number of cancellation requests (per found/not_found).
134 : pub cancellation_requests_total: CounterVec<CancellationRequestSet>,
135 :
136 : /// Number of errors by a given classification
137 : pub redis_errors_total: CounterVec<RedisErrorsSet>,
138 :
139 : /// Number of TLS handshake failures
140 : pub tls_handshake_failures: Counter,
141 :
142 : /// Number of connection requests affected by authentication rate limits
143 : pub requests_auth_rate_limits_total: Counter,
144 :
145 : /// HLL approximate cardinality of endpoints that are connecting
146 : pub connecting_endpoints: HyperLogLogVec<StaticLabelSet<Protocol>, 32>,
147 :
148 : /// Number of endpoints affected by errors of a given classification
149 : pub endpoints_affected_by_errors: HyperLogLogVec<StaticLabelSet<crate::error::ErrorKind>, 32>,
150 :
151 : /// Number of endpoints affected by authentication rate limits
152 : pub endpoints_auth_rate_limits: HyperLogLog<32>,
153 :
154 : /// Number of invalid endpoints (per protocol, per rejected).
155 : pub invalid_endpoints_total: CounterVec<InvalidEndpointsSet>,
156 :
157 : /// Number of retries (per outcome, per retry_type).
158 : #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]))]
159 : pub retries_metric: HistogramVec<RetriesMetricSet, 9>,
160 :
161 : /// Number of events consumed from redis (per event type).
162 : pub redis_events_count: CounterVec<StaticLabelSet<RedisEventsCount>>,
163 :
164 : #[metric(namespace = "connect_compute_lock")]
165 : pub connect_compute_lock: ApiLockMetrics,
166 :
167 : #[metric(namespace = "scram_pool")]
168 : #[metric(init = thread_pool)]
169 : pub scram_pool: Arc<ThreadPoolMetrics>,
170 : }
171 :
172 98 : #[derive(MetricGroup)]
173 : #[metric(new())]
174 : pub struct ApiLockMetrics {
175 : /// Number of semaphores registered in this api lock
176 : pub semaphores_registered: Counter,
177 : /// Number of semaphores unregistered in this api lock
178 : pub semaphores_unregistered: Counter,
179 : /// Time it takes to reclaim unused semaphores in the api lock
180 : #[metric(metadata = Thresholds::exponential_buckets(1e-6, 2.0))]
181 : pub reclamation_lag_seconds: Histogram<16>,
182 : /// Time it takes to acquire a semaphore lock
183 : #[metric(metadata = Thresholds::exponential_buckets(1e-4, 2.0))]
184 : pub semaphore_acquire_seconds: Histogram<16>,
185 : }
186 :
187 : impl Default for ApiLockMetrics {
188 98 : fn default() -> Self {
189 98 : Self::new()
190 98 : }
191 : }
192 :
193 : #[derive(FixedCardinalityLabel, Copy, Clone)]
194 : #[label(singleton = "direction")]
195 : pub enum HttpDirection {
196 : Request,
197 : Response,
198 : }
199 :
200 : #[derive(FixedCardinalityLabel, Copy, Clone)]
201 : #[label(singleton = "direction")]
202 : pub enum Direction {
203 : Tx,
204 : Rx,
205 : }
206 :
207 : #[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
208 : #[label(singleton = "protocol")]
209 : pub enum Protocol {
210 : Http,
211 : Ws,
212 : Tcp,
213 : SniRouter,
214 : }
215 :
216 : impl Protocol {
217 4 : pub fn as_str(self) -> &'static str {
218 4 : match self {
219 0 : Protocol::Http => "http",
220 0 : Protocol::Ws => "ws",
221 4 : Protocol::Tcp => "tcp",
222 0 : Protocol::SniRouter => "sni_router",
223 : }
224 4 : }
225 : }
226 :
227 : impl std::fmt::Display for Protocol {
228 4 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
229 4 : f.write_str(self.as_str())
230 4 : }
231 : }
232 :
233 : #[derive(FixedCardinalityLabel, Copy, Clone)]
234 : pub enum Bool {
235 : True,
236 : False,
237 : }
238 :
239 : #[derive(FixedCardinalityLabel, Copy, Clone)]
240 : #[label(singleton = "outcome")]
241 : pub enum Outcome {
242 : Success,
243 : Failed,
244 : }
245 :
246 : #[derive(FixedCardinalityLabel, Copy, Clone)]
247 : #[label(singleton = "outcome")]
248 : pub enum CacheOutcome {
249 : Hit,
250 : Miss,
251 : }
252 :
253 98 : #[derive(LabelGroup)]
254 : #[label(set = ConsoleRequestSet)]
255 : pub struct ConsoleRequest<'a> {
256 : #[label(dynamic_with = ThreadedRodeo, default)]
257 : pub request: &'a str,
258 : }
259 :
260 : #[derive(MetricGroup, Default)]
261 : pub struct HttpEndpointPools {
262 : /// Number of endpoints we have registered pools for
263 : pub http_pool_endpoints_registered_total: Counter,
264 : /// Number of endpoints we have unregistered pools for
265 : pub http_pool_endpoints_unregistered_total: Counter,
266 : }
267 :
268 : pub struct HttpEndpointPoolsGuard<'a> {
269 : dec: &'a Counter,
270 : }
271 :
272 : impl Drop for HttpEndpointPoolsGuard<'_> {
273 2 : fn drop(&mut self) {
274 2 : self.dec.inc();
275 2 : }
276 : }
277 :
278 : impl HttpEndpointPools {
279 2 : pub fn guard(&self) -> HttpEndpointPoolsGuard<'_> {
280 2 : self.http_pool_endpoints_registered_total.inc();
281 2 : HttpEndpointPoolsGuard {
282 2 : dec: &self.http_pool_endpoints_unregistered_total,
283 2 : }
284 2 : }
285 : }
286 : pub struct NumDbConnectionsGauge;
287 : impl CounterPairAssoc for NumDbConnectionsGauge {
288 : const INC_NAME: &'static MetricName = MetricName::from_str("opened_db_connections_total");
289 : const DEC_NAME: &'static MetricName = MetricName::from_str("closed_db_connections_total");
290 : const INC_HELP: &'static str = "Number of opened connections to a database.";
291 : const DEC_HELP: &'static str = "Number of closed connections to a database.";
292 : type LabelGroupSet = StaticLabelSet<Protocol>;
293 : }
294 : pub type NumDbConnectionsGuard<'a> = metrics::MeasuredCounterPairGuard<'a, NumDbConnectionsGauge>;
295 :
296 : pub struct NumClientConnectionsGauge;
297 : impl CounterPairAssoc for NumClientConnectionsGauge {
298 : const INC_NAME: &'static MetricName = MetricName::from_str("opened_client_connections_total");
299 : const DEC_NAME: &'static MetricName = MetricName::from_str("closed_client_connections_total");
300 : const INC_HELP: &'static str = "Number of opened connections from a client.";
301 : const DEC_HELP: &'static str = "Number of closed connections from a client.";
302 : type LabelGroupSet = StaticLabelSet<Protocol>;
303 : }
304 : pub type NumClientConnectionsGuard<'a> =
305 : metrics::MeasuredCounterPairGuard<'a, NumClientConnectionsGauge>;
306 :
307 : pub struct NumConnectionRequestsGauge;
308 : impl CounterPairAssoc for NumConnectionRequestsGauge {
309 : const INC_NAME: &'static MetricName = MetricName::from_str("accepted_connections_total");
310 : const DEC_NAME: &'static MetricName = MetricName::from_str("closed_connections_total");
311 : const INC_HELP: &'static str = "Number of client connections accepted.";
312 : const DEC_HELP: &'static str = "Number of client connections closed.";
313 : type LabelGroupSet = StaticLabelSet<Protocol>;
314 : }
315 : pub type NumConnectionRequestsGuard<'a> =
316 : metrics::MeasuredCounterPairGuard<'a, NumConnectionRequestsGauge>;
317 :
318 : pub struct CancelChannelSizeGauge;
319 : impl CounterPairAssoc for CancelChannelSizeGauge {
320 : const INC_NAME: &'static MetricName = MetricName::from_str("opened_msgs_cancel_channel_total");
321 : const DEC_NAME: &'static MetricName = MetricName::from_str("closed_msgs_cancel_channel_total");
322 : const INC_HELP: &'static str = "Number of processing messages in the cancellation channel.";
323 : const DEC_HELP: &'static str = "Number of closed messages in the cancellation channel.";
324 : type LabelGroupSet = StaticLabelSet<RedisMsgKind>;
325 : }
326 : pub type CancelChannelSizeGuard<'a> = metrics::MeasuredCounterPairGuard<'a, CancelChannelSizeGauge>;
327 :
328 294 : #[derive(LabelGroup)]
329 : #[label(set = ComputeConnectionLatencySet)]
330 : pub struct ComputeConnectionLatencyGroup {
331 : protocol: Protocol,
332 : cold_start_info: ColdStartInfo,
333 : outcome: ConnectOutcome,
334 : excluded: LatencyExclusions,
335 : }
336 :
337 : #[derive(FixedCardinalityLabel, Copy, Clone)]
338 : pub enum LatencyExclusions {
339 : Client,
340 : ClientAndCplane,
341 : ClientCplaneCompute,
342 : ClientCplaneComputeRetry,
343 : }
344 :
345 196 : #[derive(LabelGroup)]
346 : #[label(set = SniSet)]
347 : pub struct SniGroup {
348 : pub protocol: Protocol,
349 : pub kind: SniKind,
350 : }
351 :
352 : #[derive(FixedCardinalityLabel, Copy, Clone)]
353 : pub enum SniKind {
354 : /// Domain name based routing. SNI for libpq/websockets. Host for HTTP
355 : Sni,
356 : /// Metadata based routing. `options` for libpq/websockets. Header for HTTP
357 : NoSni,
358 : /// Metadata based routing, using the password field.
359 : PasswordHack,
360 : }
361 :
362 : #[derive(FixedCardinalityLabel, Copy, Clone)]
363 : #[label(singleton = "kind")]
364 : pub enum ConnectionFailureKind {
365 : ComputeCached,
366 : ComputeUncached,
367 : }
368 :
369 196 : #[derive(LabelGroup)]
370 : #[label(set = ConnectionFailuresBreakdownSet)]
371 : pub struct ConnectionFailuresBreakdownGroup {
372 : pub kind: ErrorKind,
373 : pub retry: Bool,
374 : }
375 :
376 98 : #[derive(LabelGroup, Copy, Clone)]
377 : #[label(set = RedisErrorsSet)]
378 : pub struct RedisErrors<'a> {
379 : #[label(dynamic_with = ThreadedRodeo, default)]
380 : pub channel: &'a str,
381 : }
382 :
383 : #[derive(FixedCardinalityLabel, Copy, Clone)]
384 : pub enum CancellationOutcome {
385 : NotFound,
386 : Found,
387 : RateLimitExceeded,
388 : }
389 :
390 147 : #[derive(LabelGroup)]
391 : #[label(set = CancellationRequestSet)]
392 : pub struct CancellationRequest {
393 : pub kind: CancellationOutcome,
394 : }
395 :
396 : #[derive(Clone, Copy)]
397 : pub enum Waiting {
398 : Cplane,
399 : Client,
400 : Compute,
401 : RetryTimeout,
402 : }
403 :
404 : #[derive(FixedCardinalityLabel, Copy, Clone)]
405 : #[label(singleton = "kind")]
406 : #[allow(clippy::enum_variant_names)]
407 : pub enum RedisMsgKind {
408 : HSet,
409 : HSetMultiple,
410 : HGet,
411 : HGetAll,
412 : HDel,
413 : }
414 :
415 : #[derive(Default, Clone)]
416 : pub struct LatencyAccumulated {
417 : cplane: time::Duration,
418 : client: time::Duration,
419 : compute: time::Duration,
420 : retry: time::Duration,
421 : }
422 :
423 : impl std::fmt::Display for LatencyAccumulated {
424 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
425 0 : write!(
426 0 : f,
427 0 : "client: {}, cplane: {}, compute: {}, retry: {}",
428 0 : self.client.as_micros(),
429 0 : self.cplane.as_micros(),
430 0 : self.compute.as_micros(),
431 0 : self.retry.as_micros()
432 0 : )
433 0 : }
434 : }
435 :
436 : pub struct LatencyTimer {
437 : // time since the stopwatch was started
438 : start: time::Instant,
439 : // time since the stopwatch was stopped
440 : stop: Option<time::Instant>,
441 : // accumulated time on the stopwatch
442 : accumulated: LatencyAccumulated,
443 : // label data
444 : protocol: Protocol,
445 : cold_start_info: ColdStartInfo,
446 : outcome: ConnectOutcome,
447 :
448 : skip_reporting: bool,
449 : }
450 :
451 : impl LatencyTimer {
452 75 : pub fn new(protocol: Protocol) -> Self {
453 75 : Self {
454 75 : start: time::Instant::now(),
455 75 : stop: None,
456 75 : accumulated: LatencyAccumulated::default(),
457 75 : protocol,
458 75 : cold_start_info: ColdStartInfo::Unknown,
459 75 : // assume failed unless otherwise specified
460 75 : outcome: ConnectOutcome::Failed,
461 75 : skip_reporting: false,
462 75 : }
463 75 : }
464 :
465 0 : pub(crate) fn noop(protocol: Protocol) -> Self {
466 0 : Self {
467 0 : start: time::Instant::now(),
468 0 : stop: None,
469 0 : accumulated: LatencyAccumulated::default(),
470 0 : protocol,
471 0 : cold_start_info: ColdStartInfo::Unknown,
472 0 : // assume failed unless otherwise specified
473 0 : outcome: ConnectOutcome::Failed,
474 0 : skip_reporting: true,
475 0 : }
476 0 : }
477 :
478 28 : pub fn unpause(&mut self, start: Instant, waiting_for: Waiting) {
479 28 : let dur = start.elapsed();
480 28 : match waiting_for {
481 0 : Waiting::Cplane => self.accumulated.cplane += dur,
482 15 : Waiting::Client => self.accumulated.client += dur,
483 7 : Waiting::Compute => self.accumulated.compute += dur,
484 6 : Waiting::RetryTimeout => self.accumulated.retry += dur,
485 : }
486 28 : }
487 :
488 0 : pub fn cold_start_info(&mut self, cold_start_info: ColdStartInfo) {
489 0 : self.cold_start_info = cold_start_info;
490 0 : }
491 :
492 7 : pub fn success(&mut self) {
493 7 : // stop the stopwatch and record the time that we have accumulated
494 7 : self.stop = Some(time::Instant::now());
495 7 :
496 7 : // success
497 7 : self.outcome = ConnectOutcome::Success;
498 7 : }
499 :
500 0 : pub fn accumulated(&self) -> LatencyAccumulated {
501 0 : self.accumulated.clone()
502 0 : }
503 : }
504 :
505 : #[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
506 : pub enum ConnectOutcome {
507 : Success,
508 : Failed,
509 : }
510 :
511 : impl Drop for LatencyTimer {
512 75 : fn drop(&mut self) {
513 75 : if self.skip_reporting {
514 0 : return;
515 75 : }
516 75 :
517 75 : let duration = self
518 75 : .stop
519 75 : .unwrap_or_else(time::Instant::now)
520 75 : .duration_since(self.start);
521 75 :
522 75 : let metric = &Metrics::get().proxy.compute_connection_latency_seconds;
523 75 :
524 75 : // Excluding client communication from the accumulated time.
525 75 : metric.observe(
526 75 : ComputeConnectionLatencyGroup {
527 75 : protocol: self.protocol,
528 75 : cold_start_info: self.cold_start_info,
529 75 : outcome: self.outcome,
530 75 : excluded: LatencyExclusions::Client,
531 75 : },
532 75 : duration
533 75 : .saturating_sub(self.accumulated.client)
534 75 : .as_secs_f64(),
535 75 : );
536 75 :
537 75 : // Exclude client and cplane communication from the accumulated time.
538 75 : let accumulated_total = self.accumulated.client + self.accumulated.cplane;
539 75 : metric.observe(
540 75 : ComputeConnectionLatencyGroup {
541 75 : protocol: self.protocol,
542 75 : cold_start_info: self.cold_start_info,
543 75 : outcome: self.outcome,
544 75 : excluded: LatencyExclusions::ClientAndCplane,
545 75 : },
546 75 : duration.saturating_sub(accumulated_total).as_secs_f64(),
547 75 : );
548 75 :
549 75 : // Exclude client, cplane, compute communication from the accumulated time.
550 75 : let accumulated_total =
551 75 : self.accumulated.client + self.accumulated.cplane + self.accumulated.compute;
552 75 : metric.observe(
553 75 : ComputeConnectionLatencyGroup {
554 75 : protocol: self.protocol,
555 75 : cold_start_info: self.cold_start_info,
556 75 : outcome: self.outcome,
557 75 : excluded: LatencyExclusions::ClientCplaneCompute,
558 75 : },
559 75 : duration.saturating_sub(accumulated_total).as_secs_f64(),
560 75 : );
561 75 :
562 75 : // Exclude client, cplane, compute, retry communication from the accumulated time.
563 75 : let accumulated_total = self.accumulated.client
564 75 : + self.accumulated.cplane
565 75 : + self.accumulated.compute
566 75 : + self.accumulated.retry;
567 75 : metric.observe(
568 75 : ComputeConnectionLatencyGroup {
569 75 : protocol: self.protocol,
570 75 : cold_start_info: self.cold_start_info,
571 75 : outcome: self.outcome,
572 75 : excluded: LatencyExclusions::ClientCplaneComputeRetry,
573 75 : },
574 75 : duration.saturating_sub(accumulated_total).as_secs_f64(),
575 75 : );
576 75 : }
577 : }
578 :
579 : impl From<bool> for Bool {
580 3 : fn from(value: bool) -> Self {
581 3 : if value { Bool::True } else { Bool::False }
582 3 : }
583 : }
584 :
585 245 : #[derive(LabelGroup)]
586 : #[label(set = InvalidEndpointsSet)]
587 : pub struct InvalidEndpointsGroup {
588 : pub protocol: Protocol,
589 : pub rejected: Bool,
590 : pub outcome: ConnectOutcome,
591 : }
592 :
593 196 : #[derive(LabelGroup)]
594 : #[label(set = RetriesMetricSet)]
595 : pub struct RetriesMetricGroup {
596 : pub outcome: ConnectOutcome,
597 : pub retry_type: RetryType,
598 : }
599 :
600 : #[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
601 : pub enum RetryType {
602 : WakeCompute,
603 : ConnectToCompute,
604 : }
605 :
606 : #[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
607 : #[label(singleton = "event")]
608 : pub enum RedisEventsCount {
609 : EndpointCreated,
610 : BranchCreated,
611 : ProjectCreated,
612 : CancelSession,
613 : PasswordUpdate,
614 : AllowedIpsUpdate,
615 : AllowedVpcEndpointIdsUpdateForProjects,
616 : AllowedVpcEndpointIdsUpdateForAllProjectsInOrg,
617 : BlockPublicOrVpcAccessUpdate,
618 : }
619 :
620 : pub struct ThreadPoolWorkers(usize);
621 : #[derive(Copy, Clone)]
622 : pub struct ThreadPoolWorkerId(pub usize);
623 :
624 : impl LabelValue for ThreadPoolWorkerId {
625 0 : fn visit<V: measured::label::LabelVisitor>(&self, v: V) -> V::Output {
626 0 : v.write_int(self.0 as i64)
627 0 : }
628 : }
629 :
630 : impl LabelGroup for ThreadPoolWorkerId {
631 0 : fn visit_values(&self, v: &mut impl measured::label::LabelGroupVisitor) {
632 0 : v.write_value(LabelName::from_str("worker"), self);
633 0 : }
634 : }
635 :
636 : impl LabelGroupSet for ThreadPoolWorkers {
637 : type Group<'a> = ThreadPoolWorkerId;
638 :
639 110 : fn cardinality(&self) -> Option<usize> {
640 110 : Some(self.0)
641 110 : }
642 :
643 6 : fn encode_dense(&self, value: Self::Unique) -> Option<usize> {
644 6 : Some(value)
645 6 : }
646 :
647 0 : fn decode_dense(&self, value: usize) -> Self::Group<'_> {
648 0 : ThreadPoolWorkerId(value)
649 0 : }
650 :
651 : type Unique = usize;
652 :
653 6 : fn encode(&self, value: Self::Group<'_>) -> Option<Self::Unique> {
654 6 : Some(value.0)
655 6 : }
656 :
657 0 : fn decode(&self, value: &Self::Unique) -> Self::Group<'_> {
658 0 : ThreadPoolWorkerId(*value)
659 0 : }
660 : }
661 :
662 : impl LabelSet for ThreadPoolWorkers {
663 : type Value<'a> = ThreadPoolWorkerId;
664 :
665 0 : fn dynamic_cardinality(&self) -> Option<usize> {
666 0 : Some(self.0)
667 0 : }
668 :
669 0 : fn encode(&self, value: Self::Value<'_>) -> Option<usize> {
670 0 : (value.0 < self.0).then_some(value.0)
671 0 : }
672 :
673 0 : fn decode(&self, value: usize) -> Self::Value<'_> {
674 0 : ThreadPoolWorkerId(value)
675 0 : }
676 : }
677 :
678 : impl FixedCardinalitySet for ThreadPoolWorkers {
679 0 : fn cardinality(&self) -> usize {
680 0 : self.0
681 0 : }
682 : }
683 :
684 55 : #[derive(MetricGroup)]
685 : #[metric(new(workers: usize))]
686 : pub struct ThreadPoolMetrics {
687 : #[metric(init = CounterVec::with_label_set(ThreadPoolWorkers(workers)))]
688 : pub worker_task_turns_total: CounterVec<ThreadPoolWorkers>,
689 : #[metric(init = CounterVec::with_label_set(ThreadPoolWorkers(workers)))]
690 : pub worker_task_skips_total: CounterVec<ThreadPoolWorkers>,
691 : }
|