Line data Source code
1 : use std::sync::{Arc, OnceLock};
2 :
3 : use lasso::ThreadedRodeo;
4 : use measured::label::{
5 : FixedCardinalitySet, LabelGroupSet, LabelName, LabelSet, LabelValue, StaticLabelSet,
6 : };
7 : use measured::metric::histogram::Thresholds;
8 : use measured::metric::name::MetricName;
9 : use measured::{
10 : Counter, CounterVec, FixedCardinalityLabel, Gauge, Histogram, HistogramVec, LabelGroup,
11 : MetricGroup,
12 : };
13 : use metrics::{CounterPairAssoc, CounterPairVec, HyperLogLog, HyperLogLogVec};
14 : use tokio::time::{self, Instant};
15 :
16 : use crate::control_plane::messages::ColdStartInfo;
17 : use crate::error::ErrorKind;
18 :
19 : #[derive(MetricGroup)]
20 : #[metric(new(thread_pool: Arc<ThreadPoolMetrics>))]
21 : pub struct Metrics {
22 : #[metric(namespace = "proxy")]
23 : #[metric(init = ProxyMetrics::new(thread_pool))]
24 : pub proxy: ProxyMetrics,
25 :
26 : #[metric(namespace = "wake_compute_lock")]
27 : pub wake_compute_lock: ApiLockMetrics,
28 : }
29 :
30 : static SELF: OnceLock<Metrics> = OnceLock::new();
31 : impl Metrics {
32 0 : pub fn install(thread_pool: Arc<ThreadPoolMetrics>) {
33 0 : let mut metrics = Metrics::new(thread_pool);
34 :
35 0 : metrics.proxy.errors_total.init_all_dense();
36 0 : metrics.proxy.redis_errors_total.init_all_dense();
37 0 : metrics.proxy.redis_events_count.init_all_dense();
38 0 : metrics.proxy.retries_metric.init_all_dense();
39 0 : metrics.proxy.invalid_endpoints_total.init_all_dense();
40 0 : metrics.proxy.connection_failures_total.init_all_dense();
41 :
42 0 : SELF.set(metrics)
43 0 : .ok()
44 0 : .expect("proxy metrics must not be installed more than once");
45 0 : }
46 :
47 169 : pub fn get() -> &'static Self {
48 : #[cfg(test)]
49 169 : return SELF.get_or_init(|| Metrics::new(Arc::new(ThreadPoolMetrics::new(0))));
50 :
51 : #[cfg(not(test))]
52 0 : SELF.get()
53 0 : .expect("proxy metrics must be installed by the main() function")
54 169 : }
55 : }
56 :
57 : #[derive(MetricGroup)]
58 : #[metric(new(thread_pool: Arc<ThreadPoolMetrics>))]
59 : pub struct ProxyMetrics {
60 : #[metric(flatten)]
61 : pub db_connections: CounterPairVec<NumDbConnectionsGauge>,
62 : #[metric(flatten)]
63 : pub client_connections: CounterPairVec<NumClientConnectionsGauge>,
64 : #[metric(flatten)]
65 : pub connection_requests: CounterPairVec<NumConnectionRequestsGauge>,
66 : #[metric(flatten)]
67 : pub http_endpoint_pools: HttpEndpointPools,
68 : #[metric(flatten)]
69 : pub cancel_channel_size: CounterPairVec<CancelChannelSizeGauge>,
70 :
71 : /// Time it took for proxy to establish a connection to the compute endpoint.
72 : // largest bucket = 2^16 * 0.5ms = 32s
73 : #[metric(metadata = Thresholds::exponential_buckets(0.0005, 2.0))]
74 : pub compute_connection_latency_seconds: HistogramVec<ComputeConnectionLatencySet, 16>,
75 :
76 : /// Time it took for proxy to receive a response from control plane.
77 : #[metric(
78 : // largest bucket = 2^16 * 0.2ms = 13s
79 : metadata = Thresholds::exponential_buckets(0.0002, 2.0),
80 : )]
81 : pub console_request_latency: HistogramVec<ConsoleRequestSet, 16>,
82 :
83 : /// Time it takes to acquire a token to call console plane.
84 : // largest bucket = 3^16 * 0.05ms = 2.15s
85 : #[metric(metadata = Thresholds::exponential_buckets(0.00005, 3.0))]
86 : pub control_plane_token_acquire_seconds: Histogram<16>,
87 :
88 : /// Size of the HTTP request body lengths.
89 : // smallest bucket = 16 bytes
90 : // largest bucket = 4^12 * 16 bytes = 256MB
91 : #[metric(metadata = Thresholds::exponential_buckets(16.0, 4.0))]
92 : pub http_conn_content_length_bytes: HistogramVec<StaticLabelSet<HttpDirection>, 12>,
93 :
94 : /// Time it takes to reclaim unused connection pools.
95 : #[metric(metadata = Thresholds::exponential_buckets(1e-6, 2.0))]
96 : pub http_pool_reclaimation_lag_seconds: Histogram<16>,
97 :
98 : /// Number of opened connections to a database.
99 : pub http_pool_opened_connections: Gauge,
100 :
101 : /// Number of cache hits/misses for allowed ips.
102 : pub allowed_ips_cache_misses: CounterVec<StaticLabelSet<CacheOutcome>>,
103 :
104 : /// Number of allowed ips
105 : #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0]))]
106 : pub allowed_ips_number: Histogram<10>,
107 :
108 : /// Number of cache hits/misses for VPC endpoint IDs.
109 : pub vpc_endpoint_id_cache_stats: CounterVec<StaticLabelSet<CacheOutcome>>,
110 :
111 : /// Number of cache hits/misses for access blocker flags.
112 : pub access_blocker_flags_cache_stats: CounterVec<StaticLabelSet<CacheOutcome>>,
113 :
114 : /// Number of allowed VPC endpoints IDs
115 : #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0]))]
116 : pub allowed_vpc_endpoint_ids: Histogram<10>,
117 :
118 : /// Number of connections, by the method we used to determine the endpoint.
119 : pub accepted_connections_by_sni: CounterVec<SniSet>,
120 :
121 : /// Number of connection failures (per kind).
122 : pub connection_failures_total: CounterVec<StaticLabelSet<ConnectionFailureKind>>,
123 :
124 : /// Number of wake-up failures (per kind).
125 : pub connection_failures_breakdown: CounterVec<ConnectionFailuresBreakdownSet>,
126 :
127 : /// Number of bytes sent/received between all clients and backends.
128 : pub io_bytes: CounterVec<StaticLabelSet<Direction>>,
129 :
130 : /// Number of errors by a given classification.
131 : pub errors_total: CounterVec<StaticLabelSet<crate::error::ErrorKind>>,
132 :
133 : /// Number of cancellation requests (per found/not_found).
134 : pub cancellation_requests_total: CounterVec<CancellationRequestSet>,
135 :
136 : /// Number of errors by a given classification
137 : pub redis_errors_total: CounterVec<RedisErrorsSet>,
138 :
139 : /// Number of TLS handshake failures
140 : pub tls_handshake_failures: Counter,
141 :
142 : /// Number of connection requests affected by authentication rate limits
143 : pub requests_auth_rate_limits_total: Counter,
144 :
145 : /// HLL approximate cardinality of endpoints that are connecting
146 : pub connecting_endpoints: HyperLogLogVec<StaticLabelSet<Protocol>, 32>,
147 :
148 : /// Number of endpoints affected by errors of a given classification
149 : pub endpoints_affected_by_errors: HyperLogLogVec<StaticLabelSet<crate::error::ErrorKind>, 32>,
150 :
151 : /// Number of endpoints affected by authentication rate limits
152 : pub endpoints_auth_rate_limits: HyperLogLog<32>,
153 :
154 : /// Number of invalid endpoints (per protocol, per rejected).
155 : pub invalid_endpoints_total: CounterVec<InvalidEndpointsSet>,
156 :
157 : /// Number of retries (per outcome, per retry_type).
158 : #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]))]
159 : pub retries_metric: HistogramVec<RetriesMetricSet, 9>,
160 :
161 : /// Number of events consumed from redis (per event type).
162 : pub redis_events_count: CounterVec<StaticLabelSet<RedisEventsCount>>,
163 :
164 : #[metric(namespace = "connect_compute_lock")]
165 : pub connect_compute_lock: ApiLockMetrics,
166 :
167 : #[metric(namespace = "scram_pool")]
168 : #[metric(init = thread_pool)]
169 : pub scram_pool: Arc<ThreadPoolMetrics>,
170 : }
171 :
172 : #[derive(MetricGroup)]
173 : #[metric(new())]
174 : pub struct ApiLockMetrics {
175 : /// Number of semaphores registered in this api lock
176 : pub semaphores_registered: Counter,
177 : /// Number of semaphores unregistered in this api lock
178 : pub semaphores_unregistered: Counter,
179 : /// Time it takes to reclaim unused semaphores in the api lock
180 : #[metric(metadata = Thresholds::exponential_buckets(1e-6, 2.0))]
181 : pub reclamation_lag_seconds: Histogram<16>,
182 : /// Time it takes to acquire a semaphore lock
183 : #[metric(metadata = Thresholds::exponential_buckets(1e-4, 2.0))]
184 : pub semaphore_acquire_seconds: Histogram<16>,
185 : }
186 :
187 : impl Default for ApiLockMetrics {
188 102 : fn default() -> Self {
189 102 : Self::new()
190 102 : }
191 : }
192 :
193 : #[derive(FixedCardinalityLabel, Copy, Clone)]
194 : #[label(singleton = "direction")]
195 : pub enum HttpDirection {
196 : Request,
197 : Response,
198 : }
199 :
200 : #[derive(FixedCardinalityLabel, Copy, Clone)]
201 : #[label(singleton = "direction")]
202 : pub enum Direction {
203 : Tx,
204 : Rx,
205 : }
206 :
207 : #[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
208 : #[label(singleton = "protocol")]
209 : pub enum Protocol {
210 : Http,
211 : Ws,
212 : Tcp,
213 : SniRouter,
214 : }
215 :
216 : impl Protocol {
217 6 : pub fn as_str(self) -> &'static str {
218 6 : match self {
219 0 : Protocol::Http => "http",
220 0 : Protocol::Ws => "ws",
221 6 : Protocol::Tcp => "tcp",
222 0 : Protocol::SniRouter => "sni_router",
223 : }
224 6 : }
225 : }
226 :
227 : impl std::fmt::Display for Protocol {
228 6 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
229 6 : f.write_str(self.as_str())
230 6 : }
231 : }
232 :
233 : #[derive(FixedCardinalityLabel, Copy, Clone)]
234 : pub enum Bool {
235 : True,
236 : False,
237 : }
238 :
239 : #[derive(FixedCardinalityLabel, Copy, Clone)]
240 : #[label(singleton = "outcome")]
241 : pub enum CacheOutcome {
242 : Hit,
243 : Miss,
244 : }
245 :
246 : #[derive(LabelGroup)]
247 : #[label(set = ConsoleRequestSet)]
248 : pub struct ConsoleRequest<'a> {
249 : #[label(dynamic_with = ThreadedRodeo, default)]
250 : pub request: &'a str,
251 : }
252 :
253 : #[derive(MetricGroup, Default)]
254 : pub struct HttpEndpointPools {
255 : /// Number of endpoints we have registered pools for
256 : pub http_pool_endpoints_registered_total: Counter,
257 : /// Number of endpoints we have unregistered pools for
258 : pub http_pool_endpoints_unregistered_total: Counter,
259 : }
260 :
261 : pub struct HttpEndpointPoolsGuard<'a> {
262 : dec: &'a Counter,
263 : }
264 :
265 : impl Drop for HttpEndpointPoolsGuard<'_> {
266 2 : fn drop(&mut self) {
267 2 : self.dec.inc();
268 2 : }
269 : }
270 :
271 : impl HttpEndpointPools {
272 2 : pub fn guard(&self) -> HttpEndpointPoolsGuard<'_> {
273 2 : self.http_pool_endpoints_registered_total.inc();
274 2 : HttpEndpointPoolsGuard {
275 2 : dec: &self.http_pool_endpoints_unregistered_total,
276 2 : }
277 2 : }
278 : }
279 : pub struct NumDbConnectionsGauge;
280 : impl CounterPairAssoc for NumDbConnectionsGauge {
281 : const INC_NAME: &'static MetricName = MetricName::from_str("opened_db_connections_total");
282 : const DEC_NAME: &'static MetricName = MetricName::from_str("closed_db_connections_total");
283 : const INC_HELP: &'static str = "Number of opened connections to a database.";
284 : const DEC_HELP: &'static str = "Number of closed connections to a database.";
285 : type LabelGroupSet = StaticLabelSet<Protocol>;
286 : }
287 : pub type NumDbConnectionsGuard<'a> = metrics::MeasuredCounterPairGuard<'a, NumDbConnectionsGauge>;
288 :
289 : pub struct NumClientConnectionsGauge;
290 : impl CounterPairAssoc for NumClientConnectionsGauge {
291 : const INC_NAME: &'static MetricName = MetricName::from_str("opened_client_connections_total");
292 : const DEC_NAME: &'static MetricName = MetricName::from_str("closed_client_connections_total");
293 : const INC_HELP: &'static str = "Number of opened connections from a client.";
294 : const DEC_HELP: &'static str = "Number of closed connections from a client.";
295 : type LabelGroupSet = StaticLabelSet<Protocol>;
296 : }
297 : pub type NumClientConnectionsGuard<'a> =
298 : metrics::MeasuredCounterPairGuard<'a, NumClientConnectionsGauge>;
299 :
300 : pub struct NumConnectionRequestsGauge;
301 : impl CounterPairAssoc for NumConnectionRequestsGauge {
302 : const INC_NAME: &'static MetricName = MetricName::from_str("accepted_connections_total");
303 : const DEC_NAME: &'static MetricName = MetricName::from_str("closed_connections_total");
304 : const INC_HELP: &'static str = "Number of client connections accepted.";
305 : const DEC_HELP: &'static str = "Number of client connections closed.";
306 : type LabelGroupSet = StaticLabelSet<Protocol>;
307 : }
308 : pub type NumConnectionRequestsGuard<'a> =
309 : metrics::MeasuredCounterPairGuard<'a, NumConnectionRequestsGauge>;
310 :
311 : pub struct CancelChannelSizeGauge;
312 : impl CounterPairAssoc for CancelChannelSizeGauge {
313 : const INC_NAME: &'static MetricName = MetricName::from_str("opened_msgs_cancel_channel_total");
314 : const DEC_NAME: &'static MetricName = MetricName::from_str("closed_msgs_cancel_channel_total");
315 : const INC_HELP: &'static str = "Number of processing messages in the cancellation channel.";
316 : const DEC_HELP: &'static str = "Number of closed messages in the cancellation channel.";
317 : type LabelGroupSet = StaticLabelSet<RedisMsgKind>;
318 : }
319 : pub type CancelChannelSizeGuard<'a> = metrics::MeasuredCounterPairGuard<'a, CancelChannelSizeGauge>;
320 :
321 : #[derive(LabelGroup)]
322 : #[label(set = ComputeConnectionLatencySet)]
323 : pub struct ComputeConnectionLatencyGroup {
324 : protocol: Protocol,
325 : cold_start_info: ColdStartInfo,
326 : outcome: ConnectOutcome,
327 : excluded: LatencyExclusions,
328 : }
329 :
330 : #[derive(FixedCardinalityLabel, Copy, Clone)]
331 : pub enum LatencyExclusions {
332 : Client,
333 : ClientAndCplane,
334 : ClientCplaneCompute,
335 : ClientCplaneComputeRetry,
336 : }
337 :
338 : #[derive(LabelGroup)]
339 : #[label(set = SniSet)]
340 : pub struct SniGroup {
341 : pub protocol: Protocol,
342 : pub kind: SniKind,
343 : }
344 :
345 : #[derive(FixedCardinalityLabel, Copy, Clone)]
346 : pub enum SniKind {
347 : /// Domain name based routing. SNI for libpq/websockets. Host for HTTP
348 : Sni,
349 : /// Metadata based routing. `options` for libpq/websockets. Header for HTTP
350 : NoSni,
351 : /// Metadata based routing, using the password field.
352 : PasswordHack,
353 : }
354 :
355 : #[derive(FixedCardinalityLabel, Copy, Clone)]
356 : #[label(singleton = "kind")]
357 : pub enum ConnectionFailureKind {
358 : ComputeCached,
359 : ComputeUncached,
360 : }
361 :
362 : #[derive(LabelGroup)]
363 : #[label(set = ConnectionFailuresBreakdownSet)]
364 : pub struct ConnectionFailuresBreakdownGroup {
365 : pub kind: ErrorKind,
366 : pub retry: Bool,
367 : }
368 :
369 : #[derive(LabelGroup, Copy, Clone)]
370 : #[label(set = RedisErrorsSet)]
371 : pub struct RedisErrors<'a> {
372 : #[label(dynamic_with = ThreadedRodeo, default)]
373 : pub channel: &'a str,
374 : }
375 :
376 : #[derive(FixedCardinalityLabel, Copy, Clone)]
377 : pub enum CancellationOutcome {
378 : NotFound,
379 : Found,
380 : RateLimitExceeded,
381 : }
382 :
383 : #[derive(LabelGroup)]
384 : #[label(set = CancellationRequestSet)]
385 : pub struct CancellationRequest {
386 : pub kind: CancellationOutcome,
387 : }
388 :
/// What a connection attempt was waiting on; `LatencyTimer::unpause` uses it
/// to pick which accumulated duration to extend.
#[derive(Copy, Clone)]
pub enum Waiting {
    Cplane,
    Client,
    Compute,
    RetryTimeout,
}
396 :
397 : #[derive(FixedCardinalityLabel, Copy, Clone)]
398 : #[label(singleton = "kind")]
399 : #[allow(clippy::enum_variant_names)]
400 : pub enum RedisMsgKind {
401 : HSet,
402 : HSetMultiple,
403 : HGet,
404 : HGetAll,
405 : HDel,
406 : }
407 :
408 : #[derive(Default, Clone)]
409 : pub struct LatencyAccumulated {
410 : cplane: time::Duration,
411 : client: time::Duration,
412 : compute: time::Duration,
413 : retry: time::Duration,
414 : }
415 :
416 : impl std::fmt::Display for LatencyAccumulated {
417 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
418 0 : write!(
419 0 : f,
420 0 : "client: {}, cplane: {}, compute: {}, retry: {}",
421 0 : self.client.as_micros(),
422 0 : self.cplane.as_micros(),
423 0 : self.compute.as_micros(),
424 0 : self.retry.as_micros()
425 : )
426 0 : }
427 : }
428 :
429 : pub struct LatencyTimer {
430 : // time since the stopwatch was started
431 : start: time::Instant,
432 : // time since the stopwatch was stopped
433 : stop: Option<time::Instant>,
434 : // accumulated time on the stopwatch
435 : accumulated: LatencyAccumulated,
436 : // label data
437 : protocol: Protocol,
438 : cold_start_info: ColdStartInfo,
439 : outcome: ConnectOutcome,
440 :
441 : skip_reporting: bool,
442 : }
443 :
444 : impl LatencyTimer {
445 77 : pub fn new(protocol: Protocol) -> Self {
446 77 : Self {
447 77 : start: time::Instant::now(),
448 77 : stop: None,
449 77 : accumulated: LatencyAccumulated::default(),
450 77 : protocol,
451 77 : cold_start_info: ColdStartInfo::Unknown,
452 77 : // assume failed unless otherwise specified
453 77 : outcome: ConnectOutcome::Failed,
454 77 : skip_reporting: false,
455 77 : }
456 77 : }
457 :
458 0 : pub(crate) fn noop(protocol: Protocol) -> Self {
459 0 : Self {
460 0 : start: time::Instant::now(),
461 0 : stop: None,
462 0 : accumulated: LatencyAccumulated::default(),
463 0 : protocol,
464 0 : cold_start_info: ColdStartInfo::Unknown,
465 0 : // assume failed unless otherwise specified
466 0 : outcome: ConnectOutcome::Failed,
467 0 : skip_reporting: true,
468 0 : }
469 0 : }
470 :
471 52 : pub fn unpause(&mut self, start: Instant, waiting_for: Waiting) {
472 52 : let dur = start.elapsed();
473 52 : match waiting_for {
474 0 : Waiting::Cplane => self.accumulated.cplane += dur,
475 39 : Waiting::Client => self.accumulated.client += dur,
476 7 : Waiting::Compute => self.accumulated.compute += dur,
477 6 : Waiting::RetryTimeout => self.accumulated.retry += dur,
478 : }
479 52 : }
480 :
481 0 : pub fn cold_start_info(&mut self, cold_start_info: ColdStartInfo) {
482 0 : self.cold_start_info = cold_start_info;
483 0 : }
484 :
485 8 : pub fn success(&mut self) {
486 : // stop the stopwatch and record the time that we have accumulated
487 8 : self.stop = Some(time::Instant::now());
488 :
489 : // success
490 8 : self.outcome = ConnectOutcome::Success;
491 8 : }
492 :
493 0 : pub fn accumulated(&self) -> LatencyAccumulated {
494 0 : self.accumulated.clone()
495 0 : }
496 : }
497 :
498 : #[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
499 : pub enum ConnectOutcome {
500 : Success,
501 : Failed,
502 : }
503 :
504 : impl Drop for LatencyTimer {
505 77 : fn drop(&mut self) {
506 77 : if self.skip_reporting {
507 0 : return;
508 77 : }
509 :
510 77 : let duration = self
511 77 : .stop
512 77 : .unwrap_or_else(time::Instant::now)
513 77 : .duration_since(self.start);
514 :
515 77 : let metric = &Metrics::get().proxy.compute_connection_latency_seconds;
516 :
517 : // Excluding client communication from the accumulated time.
518 77 : metric.observe(
519 77 : ComputeConnectionLatencyGroup {
520 77 : protocol: self.protocol,
521 77 : cold_start_info: self.cold_start_info,
522 77 : outcome: self.outcome,
523 77 : excluded: LatencyExclusions::Client,
524 77 : },
525 77 : duration
526 77 : .saturating_sub(self.accumulated.client)
527 77 : .as_secs_f64(),
528 : );
529 :
530 : // Exclude client and cplane communication from the accumulated time.
531 77 : let accumulated_total = self.accumulated.client + self.accumulated.cplane;
532 77 : metric.observe(
533 77 : ComputeConnectionLatencyGroup {
534 77 : protocol: self.protocol,
535 77 : cold_start_info: self.cold_start_info,
536 77 : outcome: self.outcome,
537 77 : excluded: LatencyExclusions::ClientAndCplane,
538 77 : },
539 77 : duration.saturating_sub(accumulated_total).as_secs_f64(),
540 : );
541 :
542 : // Exclude client, cplane, compute communication from the accumulated time.
543 77 : let accumulated_total =
544 77 : self.accumulated.client + self.accumulated.cplane + self.accumulated.compute;
545 77 : metric.observe(
546 77 : ComputeConnectionLatencyGroup {
547 77 : protocol: self.protocol,
548 77 : cold_start_info: self.cold_start_info,
549 77 : outcome: self.outcome,
550 77 : excluded: LatencyExclusions::ClientCplaneCompute,
551 77 : },
552 77 : duration.saturating_sub(accumulated_total).as_secs_f64(),
553 : );
554 :
555 : // Exclude client, cplane, compute, retry communication from the accumulated time.
556 77 : let accumulated_total = self.accumulated.client
557 77 : + self.accumulated.cplane
558 77 : + self.accumulated.compute
559 77 : + self.accumulated.retry;
560 77 : metric.observe(
561 77 : ComputeConnectionLatencyGroup {
562 77 : protocol: self.protocol,
563 77 : cold_start_info: self.cold_start_info,
564 77 : outcome: self.outcome,
565 77 : excluded: LatencyExclusions::ClientCplaneComputeRetry,
566 77 : },
567 77 : duration.saturating_sub(accumulated_total).as_secs_f64(),
568 : );
569 77 : }
570 : }
571 :
572 : impl From<bool> for Bool {
573 3 : fn from(value: bool) -> Self {
574 3 : if value { Bool::True } else { Bool::False }
575 3 : }
576 : }
577 :
578 : #[derive(LabelGroup)]
579 : #[label(set = InvalidEndpointsSet)]
580 : pub struct InvalidEndpointsGroup {
581 : pub protocol: Protocol,
582 : pub rejected: Bool,
583 : pub outcome: ConnectOutcome,
584 : }
585 :
586 : #[derive(LabelGroup)]
587 : #[label(set = RetriesMetricSet)]
588 : pub struct RetriesMetricGroup {
589 : pub outcome: ConnectOutcome,
590 : pub retry_type: RetryType,
591 : }
592 :
593 : #[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
594 : pub enum RetryType {
595 : WakeCompute,
596 : ConnectToCompute,
597 : }
598 :
599 : #[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
600 : #[label(singleton = "event")]
601 : pub enum RedisEventsCount {
602 : EndpointCreated,
603 : BranchCreated,
604 : ProjectCreated,
605 : CancelSession,
606 : InvalidateRole,
607 : InvalidateEndpoint,
608 : InvalidateProject,
609 : InvalidateProjects,
610 : InvalidateOrg,
611 : }
612 :
/// Label set for per-worker metrics; the wrapped value is the number of workers.
pub struct ThreadPoolWorkers(usize);

/// Index of a single worker, used as the value of the `worker` label.
#[derive(Clone, Copy)]
pub struct ThreadPoolWorkerId(pub usize);
616 :
617 : impl LabelValue for ThreadPoolWorkerId {
618 0 : fn visit<V: measured::label::LabelVisitor>(&self, v: V) -> V::Output {
619 0 : v.write_int(self.0 as i64)
620 0 : }
621 : }
622 :
623 : impl LabelGroup for ThreadPoolWorkerId {
624 0 : fn visit_values(&self, v: &mut impl measured::label::LabelGroupVisitor) {
625 0 : v.write_value(LabelName::from_str("worker"), self);
626 0 : }
627 : }
628 :
629 : impl LabelGroupSet for ThreadPoolWorkers {
630 : type Group<'a> = ThreadPoolWorkerId;
631 :
632 114 : fn cardinality(&self) -> Option<usize> {
633 114 : Some(self.0)
634 114 : }
635 :
636 5 : fn encode_dense(&self, value: Self::Unique) -> Option<usize> {
637 5 : Some(value)
638 5 : }
639 :
640 0 : fn decode_dense(&self, value: usize) -> Self::Group<'_> {
641 0 : ThreadPoolWorkerId(value)
642 0 : }
643 :
644 : type Unique = usize;
645 :
646 5 : fn encode(&self, value: Self::Group<'_>) -> Option<Self::Unique> {
647 5 : Some(value.0)
648 5 : }
649 :
650 0 : fn decode(&self, value: &Self::Unique) -> Self::Group<'_> {
651 0 : ThreadPoolWorkerId(*value)
652 0 : }
653 : }
654 :
655 : impl LabelSet for ThreadPoolWorkers {
656 : type Value<'a> = ThreadPoolWorkerId;
657 :
658 0 : fn dynamic_cardinality(&self) -> Option<usize> {
659 0 : Some(self.0)
660 0 : }
661 :
662 0 : fn encode(&self, value: Self::Value<'_>) -> Option<usize> {
663 0 : (value.0 < self.0).then_some(value.0)
664 0 : }
665 :
666 0 : fn decode(&self, value: usize) -> Self::Value<'_> {
667 0 : ThreadPoolWorkerId(value)
668 0 : }
669 : }
670 :
671 : impl FixedCardinalitySet for ThreadPoolWorkers {
672 0 : fn cardinality(&self) -> usize {
673 0 : self.0
674 0 : }
675 : }
676 :
677 : #[derive(MetricGroup)]
678 : #[metric(new(workers: usize))]
679 : pub struct ThreadPoolMetrics {
680 : #[metric(init = CounterVec::with_label_set(ThreadPoolWorkers(workers)))]
681 : pub worker_task_turns_total: CounterVec<ThreadPoolWorkers>,
682 : #[metric(init = CounterVec::with_label_set(ThreadPoolWorkers(workers)))]
683 : pub worker_task_skips_total: CounterVec<ThreadPoolWorkers>,
684 : }
|