Line data Source code
1 : use std::sync::{Arc, OnceLock};
2 :
3 : use lasso::ThreadedRodeo;
4 : use measured::{
5 : label::{FixedCardinalitySet, LabelGroupSet, LabelName, LabelSet, LabelValue, StaticLabelSet},
6 : metric::{histogram::Thresholds, name::MetricName},
7 : Counter, CounterVec, FixedCardinalityLabel, Gauge, Histogram, HistogramVec, LabelGroup,
8 : MetricGroup,
9 : };
10 : use metrics::{CounterPairAssoc, CounterPairVec, HyperLogLog, HyperLogLogVec};
11 :
12 : use tokio::time::{self, Instant};
13 :
14 : use crate::console::messages::ColdStartInfo;
15 :
/// Top-level metric group of the proxy.
///
/// Constructed from a shared [`ThreadPoolMetrics`] handle, which is forwarded
/// to the nested [`ProxyMetrics`] group.
#[derive(MetricGroup)]
#[metric(new(thread_pool: Arc<ThreadPoolMetrics>))]
pub struct Metrics {
    // Proxy-wide metrics, placed under the "proxy" namespace.
    #[metric(namespace = "proxy")]
    #[metric(init = ProxyMetrics::new(thread_pool))]
    pub proxy: ProxyMetrics,

    // API lock metrics, placed under the "wake_compute_lock" namespace.
    #[metric(namespace = "wake_compute_lock")]
    pub wake_compute_lock: ApiLockMetrics,
}
26 :
27 : static SELF: OnceLock<Metrics> = OnceLock::new();
28 : impl Metrics {
29 0 : pub fn install(thread_pool: Arc<ThreadPoolMetrics>) {
30 0 : SELF.set(Metrics::new(thread_pool))
31 0 : .ok()
32 0 : .expect("proxy metrics must not be installed more than once");
33 0 : }
34 :
35 135 : pub fn get() -> &'static Self {
36 135 : #[cfg(test)]
37 135 : return SELF.get_or_init(|| Metrics::new(Arc::new(ThreadPoolMetrics::new(0))));
38 135 :
39 135 : #[cfg(not(test))]
40 135 : SELF.get()
41 135 : .expect("proxy metrics must be installed by the main() function")
42 135 : }
43 : }
44 :
/// All proxy-level metrics: connection counters, latency histograms, error
/// counters, HyperLogLog cardinality estimators and nested lock / thread-pool
/// metric groups.
///
/// NOTE(review): the `///` comments on the fields below are presumably exported
/// by the `MetricGroup` derive as metric help text — confirm before rewording
/// any of them.
#[derive(MetricGroup)]
#[metric(new(thread_pool: Arc<ThreadPoolMetrics>))]
pub struct ProxyMetrics {
    #[metric(flatten)]
    pub db_connections: CounterPairVec<NumDbConnectionsGauge>,
    #[metric(flatten)]
    pub client_connections: CounterPairVec<NumClientConnectionsGauge>,
    #[metric(flatten)]
    pub connection_requests: CounterPairVec<NumConnectionRequestsGauge>,
    #[metric(flatten)]
    pub http_endpoint_pools: HttpEndpointPools,

    /// Time it took for proxy to establish a connection to the compute endpoint.
    // largest bucket = 2^16 * 0.5ms = 32s
    #[metric(metadata = Thresholds::exponential_buckets(0.0005, 2.0))]
    pub compute_connection_latency_seconds: HistogramVec<ComputeConnectionLatencySet, 16>,

    /// Time it took for proxy to receive a response from control plane.
    #[metric(
        // largest bucket = 2^16 * 0.2ms = 13s
        metadata = Thresholds::exponential_buckets(0.0002, 2.0),
    )]
    pub console_request_latency: HistogramVec<ConsoleRequestSet, 16>,

    /// Time it takes to acquire a token to call console plane.
    // NOTE(review): "console plane" above is probably meant to be "control plane";
    // left untouched because the doc text may be exported as metric help.
    // largest bucket = 3^16 * 0.05ms ≈ 2152s (~36 minutes)
    #[metric(metadata = Thresholds::exponential_buckets(0.00005, 3.0))]
    pub control_plane_token_acquire_seconds: Histogram<16>,

    /// Size of the HTTP request body lengths.
    // smallest bucket = 16 bytes
    // largest bucket = 4^12 * 16 bytes = 256MB
    #[metric(metadata = Thresholds::exponential_buckets(16.0, 4.0))]
    pub http_conn_content_length_bytes: HistogramVec<StaticLabelSet<HttpDirection>, 12>,

    /// Time it takes to reclaim unused connection pools.
    #[metric(metadata = Thresholds::exponential_buckets(1e-6, 2.0))]
    pub http_pool_reclaimation_lag_seconds: Histogram<16>,

    /// Number of opened connections to a database.
    pub http_pool_opened_connections: Gauge,

    /// Number of cache hits/misses for allowed ips.
    pub allowed_ips_cache_misses: CounterVec<StaticLabelSet<CacheOutcome>>,

    /// Number of allowed ips
    #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0]))]
    pub allowed_ips_number: Histogram<10>,

    /// Number of connections (per sni).
    pub accepted_connections_by_sni: CounterVec<StaticLabelSet<SniKind>>,

    /// Number of connection failures (per kind).
    pub connection_failures_total: CounterVec<StaticLabelSet<ConnectionFailureKind>>,

    /// Number of wake-up failures (per kind).
    pub connection_failures_breakdown: CounterVec<ConnectionFailuresBreakdownSet>,

    /// Number of bytes sent/received between all clients and backends.
    pub io_bytes: CounterVec<StaticLabelSet<Direction>>,

    /// Number of errors by a given classification.
    pub errors_total: CounterVec<StaticLabelSet<crate::error::ErrorKind>>,

    /// Number of cancellation requests (per found/not_found).
    pub cancellation_requests_total: CounterVec<CancellationRequestSet>,

    /// Number of errors by a given classification
    pub redis_errors_total: CounterVec<RedisErrorsSet>,

    /// Number of TLS handshake failures
    pub tls_handshake_failures: Counter,

    /// Number of connection requests affected by authentication rate limits
    pub requests_auth_rate_limits_total: Counter,

    /// HLL approximate cardinality of endpoints that are connecting
    pub connecting_endpoints: HyperLogLogVec<StaticLabelSet<Protocol>, 32>,

    /// Number of endpoints affected by errors of a given classification
    pub endpoints_affected_by_errors: HyperLogLogVec<StaticLabelSet<crate::error::ErrorKind>, 32>,

    /// Number of endpoints affected by authentication rate limits
    pub endpoints_auth_rate_limits: HyperLogLog<32>,

    /// Number of invalid endpoints (per protocol, per rejected).
    pub invalid_endpoints_total: CounterVec<InvalidEndpointsSet>,

    /// Number of retries (per outcome, per retry_type).
    #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]))]
    pub retries_metric: HistogramVec<RetriesMetricSet, 9>,

    /// Number of events consumed from redis (per event type).
    pub redis_events_count: CounterVec<StaticLabelSet<RedisEventsCount>>,

    // Lock metrics placed under the "connect_compute_lock" namespace.
    #[metric(namespace = "connect_compute_lock")]
    pub connect_compute_lock: ApiLockMetrics,

    // Thread-pool metrics handle passed to the constructor, placed under the
    // "scram_pool" namespace.
    #[metric(namespace = "scram_pool")]
    #[metric(init = thread_pool)]
    pub scram_pool: Arc<ThreadPoolMetrics>,
}
147 :
/// Metrics describing one API lock: semaphore registration counters and the
/// reclamation / acquisition timing histograms.
///
/// The same group is instantiated under several namespaces (for example
/// `wake_compute_lock` in [`Metrics`] and `connect_compute_lock` in
/// [`ProxyMetrics`]).
#[derive(MetricGroup)]
#[metric(new())]
pub struct ApiLockMetrics {
    /// Number of semaphores registered in this api lock
    pub semaphores_registered: Counter,
    /// Number of semaphores unregistered in this api lock
    pub semaphores_unregistered: Counter,
    /// Time it takes to reclaim unused semaphores in the api lock
    #[metric(metadata = Thresholds::exponential_buckets(1e-6, 2.0))]
    pub reclamation_lag_seconds: Histogram<16>,
    /// Time it takes to acquire a semaphore lock
    #[metric(metadata = Thresholds::exponential_buckets(1e-4, 2.0))]
    pub semaphore_acquire_seconds: Histogram<16>,
}
162 :
163 : impl Default for ApiLockMetrics {
164 80 : fn default() -> Self {
165 80 : Self::new()
166 80 : }
167 : }
168 :
/// Whether an HTTP body belongs to a request or a response.
///
/// Exposed as the singleton label `direction`; used to label
/// `ProxyMetrics::http_conn_content_length_bytes`.
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "direction")]
pub enum HttpDirection {
    Request,
    Response,
}
175 :
/// Traffic direction (transmit / receive).
///
/// Exposed as the singleton label `direction`; used to label
/// `ProxyMetrics::io_bytes`.
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "direction")]
pub enum Direction {
    Tx,
    Rx,
}
182 :
/// Protocol through which a connection reached the proxy.
///
/// Exposed as the singleton label `protocol`.
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
#[label(singleton = "protocol")]
pub enum Protocol {
    Http,
    Ws,
    Tcp,
    SniRouter,
}
191 :
192 : impl Protocol {
193 0 : pub fn as_str(&self) -> &'static str {
194 0 : match self {
195 0 : Protocol::Http => "http",
196 0 : Protocol::Ws => "ws",
197 0 : Protocol::Tcp => "tcp",
198 0 : Protocol::SniRouter => "sni_router",
199 : }
200 0 : }
201 : }
202 :
203 : impl std::fmt::Display for Protocol {
204 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
205 0 : f.write_str(self.as_str())
206 0 : }
207 : }
208 :
/// Boolean usable as a fixed-cardinality metric label.
///
/// A `From<bool>` conversion is provided elsewhere in this file.
#[derive(FixedCardinalityLabel, Copy, Clone)]
pub enum Bool {
    True,
    False,
}
214 :
/// Generic success / failure label, exposed as the singleton label `outcome`.
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "outcome")]
pub enum Outcome {
    Success,
    Failed,
}
221 :
/// Result of a cache lookup, exposed as the singleton label `outcome`.
///
/// Used to label `ProxyMetrics::allowed_ips_cache_misses`.
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "outcome")]
pub enum CacheOutcome {
    Hit,
    Miss,
}
228 :
/// Label group carrying the name of a console request.
///
/// Its generated label set, `ConsoleRequestSet`, labels
/// `ProxyMetrics::console_request_latency`.
#[derive(LabelGroup)]
#[label(set = ConsoleRequestSet)]
pub struct ConsoleRequest<'a> {
    // Dynamic (not fixed-cardinality) label backed by a `ThreadedRodeo`.
    #[label(dynamic_with = ThreadedRodeo, default)]
    pub request: &'a str,
}
235 :
/// Pair of counters tracking how many endpoint pools were registered and
/// unregistered; see `HttpEndpointPools::guard` for how they are driven.
#[derive(MetricGroup, Default)]
pub struct HttpEndpointPools {
    /// Number of endpoints we have registered pools for
    pub http_pool_endpoints_registered_total: Counter,
    /// Number of endpoints we have unregistered pools for
    pub http_pool_endpoints_unregistered_total: Counter,
}
243 :
/// Guard returned by `HttpEndpointPools::guard`; dropping it increments the
/// counter it holds.
pub struct HttpEndpointPoolsGuard<'a> {
    // Counter incremented when the guard is dropped (the "unregistered" total).
    dec: &'a Counter,
}
247 :
248 : impl Drop for HttpEndpointPoolsGuard<'_> {
249 2 : fn drop(&mut self) {
250 2 : self.dec.inc();
251 2 : }
252 : }
253 :
254 : impl HttpEndpointPools {
255 2 : pub fn guard(&self) -> HttpEndpointPoolsGuard<'_> {
256 2 : self.http_pool_endpoints_registered_total.inc();
257 2 : HttpEndpointPoolsGuard {
258 2 : dec: &self.http_pool_endpoints_unregistered_total,
259 2 : }
260 2 : }
261 : }
/// Marker type describing the opened/closed counter pair for database
/// connections, labelled by [`Protocol`].
pub struct NumDbConnectionsGauge;
impl CounterPairAssoc for NumDbConnectionsGauge {
    const INC_NAME: &'static MetricName = MetricName::from_str("opened_db_connections_total");
    const DEC_NAME: &'static MetricName = MetricName::from_str("closed_db_connections_total");
    const INC_HELP: &'static str = "Number of opened connections to a database.";
    const DEC_HELP: &'static str = "Number of closed connections to a database.";
    type LabelGroupSet = StaticLabelSet<Protocol>;
}
/// Guard type of the database-connection counter pair.
pub type NumDbConnectionsGuard<'a> = metrics::MeasuredCounterPairGuard<'a, NumDbConnectionsGauge>;
271 :
/// Marker type describing the opened/closed counter pair for client
/// connections, labelled by [`Protocol`].
pub struct NumClientConnectionsGauge;
impl CounterPairAssoc for NumClientConnectionsGauge {
    const INC_NAME: &'static MetricName = MetricName::from_str("opened_client_connections_total");
    const DEC_NAME: &'static MetricName = MetricName::from_str("closed_client_connections_total");
    const INC_HELP: &'static str = "Number of opened connections from a client.";
    const DEC_HELP: &'static str = "Number of closed connections from a client.";
    type LabelGroupSet = StaticLabelSet<Protocol>;
}
/// Guard type of the client-connection counter pair.
pub type NumClientConnectionsGuard<'a> =
    metrics::MeasuredCounterPairGuard<'a, NumClientConnectionsGauge>;
282 :
/// Marker type describing the accepted/closed counter pair for connection
/// requests, labelled by [`Protocol`].
pub struct NumConnectionRequestsGauge;
impl CounterPairAssoc for NumConnectionRequestsGauge {
    const INC_NAME: &'static MetricName = MetricName::from_str("accepted_connections_total");
    const DEC_NAME: &'static MetricName = MetricName::from_str("closed_connections_total");
    const INC_HELP: &'static str = "Number of client connections accepted.";
    const DEC_HELP: &'static str = "Number of client connections closed.";
    type LabelGroupSet = StaticLabelSet<Protocol>;
}
/// Guard type of the connection-request counter pair.
pub type NumConnectionRequestsGuard<'a> =
    metrics::MeasuredCounterPairGuard<'a, NumConnectionRequestsGauge>;
293 :
/// Labels attached to each observation of
/// `ProxyMetrics::compute_connection_latency_seconds`.
///
/// Instances are built in the `Drop` implementation of `LatencyTimer`.
#[derive(LabelGroup)]
#[label(set = ComputeConnectionLatencySet)]
pub struct ComputeConnectionLatencyGroup {
    protocol: Protocol,
    cold_start_info: ColdStartInfo,
    outcome: ConnectOutcome,
    // Which waiting periods were subtracted from the reported duration.
    excluded: LatencyExclusions,
}
302 :
/// Which accumulated waiting periods were subtracted from a reported
/// connection latency.
///
/// Each variant excludes everything the previous one does plus one more
/// category: client, control plane, compute, retry timeouts.
#[derive(FixedCardinalityLabel, Copy, Clone)]
pub enum LatencyExclusions {
    Client,
    ClientAndCplane,
    ClientCplaneCompute,
    ClientCplaneComputeRetry,
}
310 :
/// Classification of an accepted connection, exposed as the singleton label
/// `kind`; used to label `ProxyMetrics::accepted_connections_by_sni`.
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "kind")]
pub enum SniKind {
    Sni,
    NoSni,
    PasswordHack,
}
318 :
/// Kind of connection failure, exposed as the singleton label `kind`; used to
/// label `ProxyMetrics::connection_failures_total`.
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "kind")]
pub enum ConnectionFailureKind {
    ComputeCached,
    ComputeUncached,
}
325 :
/// Reason a wake-up attempt failed, exposed as the singleton label `kind`.
///
/// Carried by `ConnectionFailuresBreakdownGroup::kind`.
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "kind")]
pub enum WakeupFailureKind {
    BadComputeAddress,
    ApiTransportError,
    QuotaExceeded,
    ApiConsoleLocked,
    ApiConsoleBadRequest,
    ApiConsoleOtherServerError,
    ApiConsoleOtherError,
    TimeoutError,
}
338 :
/// Labels for `ProxyMetrics::connection_failures_breakdown`: the failure kind
/// and a `retry` flag.
#[derive(LabelGroup)]
#[label(set = ConnectionFailuresBreakdownSet)]
pub struct ConnectionFailuresBreakdownGroup {
    pub kind: WakeupFailureKind,
    // NOTE(review): presumably whether the failure is going to be retried — confirm at call sites.
    pub retry: Bool,
}
345 :
/// Label group carrying a Redis channel name; its generated label set,
/// `RedisErrorsSet`, labels `ProxyMetrics::redis_errors_total`.
#[derive(LabelGroup, Copy, Clone)]
#[label(set = RedisErrorsSet)]
pub struct RedisErrors<'a> {
    // Dynamic (not fixed-cardinality) label backed by a `ThreadedRodeo`.
    #[label(dynamic_with = ThreadedRodeo, default)]
    pub channel: &'a str,
}
352 :
/// Origin of a cancellation request; the `source` label of
/// `CancellationRequest`.
#[derive(FixedCardinalityLabel, Copy, Clone)]
pub enum CancellationSource {
    FromClient,
    FromRedis,
    Local,
}
359 :
/// Found / not-found outcome of a cancellation request; the `kind` label of
/// `CancellationRequest`.
#[derive(FixedCardinalityLabel, Copy, Clone)]
pub enum CancellationOutcome {
    NotFound,
    Found,
}
365 :
/// Labels for `ProxyMetrics::cancellation_requests_total`: where the request
/// came from and whether its target was found.
#[derive(LabelGroup)]
#[label(set = CancellationRequestSet)]
pub struct CancellationRequest {
    pub source: CancellationSource,
    pub kind: CancellationOutcome,
}
372 :
/// Category of waiting time reported to `LatencyTimer::unpause`; selects which
/// accumulator of the timer the elapsed time is added to.
#[derive(Clone, Copy)]
pub enum Waiting {
    Cplane,
    Client,
    Compute,
    RetryTimeout,
}
380 :
/// Waiting time accumulated by a `LatencyTimer`, split by category
/// (one field per `Waiting` variant). All durations start at zero.
#[derive(Default)]
struct Accumulated {
    cplane: time::Duration,
    client: time::Duration,
    compute: time::Duration,
    retry: time::Duration,
}
388 :
/// Stopwatch measuring how long it takes to establish a compute connection.
///
/// Unless `skip_reporting` is set, the measurement is reported to
/// `compute_connection_latency_seconds` when the timer is dropped.
pub struct LatencyTimer {
    // instant at which the stopwatch was started
    start: time::Instant,
    // instant at which the stopwatch was stopped; `None` while still running
    stop: Option<time::Instant>,
    // waiting time accumulated on the stopwatch, split by category
    accumulated: Accumulated,
    // label data
    protocol: Protocol,
    cold_start_info: ColdStartInfo,
    outcome: ConnectOutcome,

    // when true, dropping the timer reports nothing
    skip_reporting: bool,
}
403 :
404 : impl LatencyTimer {
405 61 : pub fn new(protocol: Protocol) -> Self {
406 61 : Self {
407 61 : start: time::Instant::now(),
408 61 : stop: None,
409 61 : accumulated: Accumulated::default(),
410 61 : protocol,
411 61 : cold_start_info: ColdStartInfo::Unknown,
412 61 : // assume failed unless otherwise specified
413 61 : outcome: ConnectOutcome::Failed,
414 61 : skip_reporting: false,
415 61 : }
416 61 : }
417 :
418 0 : pub(crate) fn noop(protocol: Protocol) -> Self {
419 0 : Self {
420 0 : start: time::Instant::now(),
421 0 : stop: None,
422 0 : accumulated: Accumulated::default(),
423 0 : protocol,
424 0 : cold_start_info: ColdStartInfo::Unknown,
425 0 : // assume failed unless otherwise specified
426 0 : outcome: ConnectOutcome::Failed,
427 0 : skip_reporting: true,
428 0 : }
429 0 : }
430 :
431 22 : pub fn unpause(&mut self, start: Instant, waiting_for: Waiting) {
432 22 : let dur = start.elapsed();
433 22 : match waiting_for {
434 0 : Waiting::Cplane => self.accumulated.cplane += dur,
435 15 : Waiting::Client => self.accumulated.client += dur,
436 1 : Waiting::Compute => self.accumulated.compute += dur,
437 6 : Waiting::RetryTimeout => self.accumulated.retry += dur,
438 : }
439 22 : }
440 :
441 0 : pub fn cold_start_info(&mut self, cold_start_info: ColdStartInfo) {
442 0 : self.cold_start_info = cold_start_info;
443 0 : }
444 :
445 4 : pub fn success(&mut self) {
446 4 : // stop the stopwatch and record the time that we have accumulated
447 4 : self.stop = Some(time::Instant::now());
448 4 :
449 4 : // success
450 4 : self.outcome = ConnectOutcome::Success;
451 4 : }
452 : }
453 :
/// Whether a connection attempt succeeded; used as a label by the latency,
/// invalid-endpoint and retry metrics.
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
pub enum ConnectOutcome {
    Success,
    Failed,
}
459 :
460 : impl Drop for LatencyTimer {
461 61 : fn drop(&mut self) {
462 61 : if self.skip_reporting {
463 0 : return;
464 61 : }
465 61 :
466 61 : let duration = self
467 61 : .stop
468 61 : .unwrap_or_else(time::Instant::now)
469 61 : .duration_since(self.start);
470 61 :
471 61 : let metric = &Metrics::get().proxy.compute_connection_latency_seconds;
472 61 :
473 61 : // Excluding client communication from the accumulated time.
474 61 : metric.observe(
475 61 : ComputeConnectionLatencyGroup {
476 61 : protocol: self.protocol,
477 61 : cold_start_info: self.cold_start_info,
478 61 : outcome: self.outcome,
479 61 : excluded: LatencyExclusions::Client,
480 61 : },
481 61 : duration
482 61 : .saturating_sub(self.accumulated.client)
483 61 : .as_secs_f64(),
484 61 : );
485 61 :
486 61 : // Exclude client and cplane communication from the accumulated time.
487 61 : let accumulated_total = self.accumulated.client + self.accumulated.cplane;
488 61 : metric.observe(
489 61 : ComputeConnectionLatencyGroup {
490 61 : protocol: self.protocol,
491 61 : cold_start_info: self.cold_start_info,
492 61 : outcome: self.outcome,
493 61 : excluded: LatencyExclusions::ClientAndCplane,
494 61 : },
495 61 : duration.saturating_sub(accumulated_total).as_secs_f64(),
496 61 : );
497 61 :
498 61 : // Exclude client cplane, compue communication from the accumulated time.
499 61 : let accumulated_total =
500 61 : self.accumulated.client + self.accumulated.cplane + self.accumulated.compute;
501 61 : metric.observe(
502 61 : ComputeConnectionLatencyGroup {
503 61 : protocol: self.protocol,
504 61 : cold_start_info: self.cold_start_info,
505 61 : outcome: self.outcome,
506 61 : excluded: LatencyExclusions::ClientCplaneCompute,
507 61 : },
508 61 : duration.saturating_sub(accumulated_total).as_secs_f64(),
509 61 : );
510 61 :
511 61 : // Exclude client cplane, compue, retry communication from the accumulated time.
512 61 : let accumulated_total = self.accumulated.client
513 61 : + self.accumulated.cplane
514 61 : + self.accumulated.compute
515 61 : + self.accumulated.retry;
516 61 : metric.observe(
517 61 : ComputeConnectionLatencyGroup {
518 61 : protocol: self.protocol,
519 61 : cold_start_info: self.cold_start_info,
520 61 : outcome: self.outcome,
521 61 : excluded: LatencyExclusions::ClientCplaneComputeRetry,
522 61 : },
523 61 : duration.saturating_sub(accumulated_total).as_secs_f64(),
524 61 : );
525 61 : }
526 : }
527 :
528 : impl From<bool> for Bool {
529 3 : fn from(value: bool) -> Self {
530 3 : if value {
531 2 : Bool::True
532 : } else {
533 1 : Bool::False
534 : }
535 3 : }
536 : }
537 :
/// Labels for `ProxyMetrics::invalid_endpoints_total`.
#[derive(LabelGroup)]
#[label(set = InvalidEndpointsSet)]
pub struct InvalidEndpointsGroup {
    pub protocol: Protocol,
    pub rejected: Bool,
    pub outcome: ConnectOutcome,
}
545 :
/// Labels for `ProxyMetrics::retries_metric`: the final outcome and which
/// operation was being retried.
#[derive(LabelGroup)]
#[label(set = RetriesMetricSet)]
pub struct RetriesMetricGroup {
    pub outcome: ConnectOutcome,
    pub retry_type: RetryType,
}
552 :
/// Operation that was retried; the `retry_type` label of
/// `RetriesMetricGroup`.
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
pub enum RetryType {
    WakeCompute,
    ConnectToCompute,
}
558 :
/// Type of an event consumed from Redis, exposed as the singleton label
/// `event`; used to label `ProxyMetrics::redis_events_count`.
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
#[label(singleton = "event")]
pub enum RedisEventsCount {
    EndpointCreated,
    BranchCreated,
    ProjectCreated,
    CancelSession,
    PasswordUpdate,
    AllowedIpsUpdate,
}
569 :
/// Label set for thread-pool workers; the wrapped value is the number of
/// workers and is reported as the set's cardinality.
pub struct ThreadPoolWorkers(usize);
/// Index of a thread-pool worker, written as the `worker` label.
#[derive(Copy, Clone)]
pub struct ThreadPoolWorkerId(pub usize);
573 :
574 : impl LabelValue for ThreadPoolWorkerId {
575 0 : fn visit<V: measured::label::LabelVisitor>(&self, v: V) -> V::Output {
576 0 : v.write_int(self.0 as i64)
577 0 : }
578 : }
579 :
580 : impl LabelGroup for ThreadPoolWorkerId {
581 0 : fn visit_values(&self, v: &mut impl measured::label::LabelGroupVisitor) {
582 0 : v.write_value(LabelName::from_str("worker"), self);
583 0 : }
584 : }
585 :
586 : impl LabelGroupSet for ThreadPoolWorkers {
587 : type Group<'a> = ThreadPoolWorkerId;
588 :
589 92 : fn cardinality(&self) -> Option<usize> {
590 92 : Some(self.0)
591 92 : }
592 :
593 5 : fn encode_dense(&self, value: Self::Unique) -> Option<usize> {
594 5 : Some(value)
595 5 : }
596 :
597 0 : fn decode_dense(&self, value: usize) -> Self::Group<'_> {
598 0 : ThreadPoolWorkerId(value)
599 0 : }
600 :
601 : type Unique = usize;
602 :
603 5 : fn encode(&self, value: Self::Group<'_>) -> Option<Self::Unique> {
604 5 : Some(value.0)
605 5 : }
606 :
607 0 : fn decode(&self, value: &Self::Unique) -> Self::Group<'_> {
608 0 : ThreadPoolWorkerId(*value)
609 0 : }
610 : }
611 :
612 : impl LabelSet for ThreadPoolWorkers {
613 : type Value<'a> = ThreadPoolWorkerId;
614 :
615 0 : fn dynamic_cardinality(&self) -> Option<usize> {
616 0 : Some(self.0)
617 0 : }
618 :
619 0 : fn encode(&self, value: Self::Value<'_>) -> Option<usize> {
620 0 : (value.0 < self.0).then_some(value.0)
621 0 : }
622 :
623 0 : fn decode(&self, value: usize) -> Self::Value<'_> {
624 0 : ThreadPoolWorkerId(value)
625 0 : }
626 : }
627 :
628 : impl FixedCardinalitySet for ThreadPoolWorkers {
629 0 : fn cardinality(&self) -> usize {
630 0 : self.0
631 0 : }
632 : }
633 :
/// Per-worker counters of a thread pool.
///
/// Both counter vectors are created with a [`ThreadPoolWorkers`] label set
/// sized by the `workers` constructor argument.
#[derive(MetricGroup)]
#[metric(new(workers: usize))]
pub struct ThreadPoolMetrics {
    // NOTE(review): presumably counts task turns executed by each worker — confirm at call sites.
    #[metric(init = CounterVec::with_label_set(ThreadPoolWorkers(workers)))]
    pub worker_task_turns_total: CounterVec<ThreadPoolWorkers>,
    // NOTE(review): presumably counts task turns skipped by each worker — confirm at call sites.
    #[metric(init = CounterVec::with_label_set(ThreadPoolWorkers(workers)))]
    pub worker_task_skips_total: CounterVec<ThreadPoolWorkers>,
}
|