Line data Source code
1 : use std::sync::{Arc, OnceLock};
2 :
3 : use lasso::ThreadedRodeo;
4 : use measured::label::{
5 : FixedCardinalitySet, LabelGroupSet, LabelName, LabelSet, LabelValue, StaticLabelSet,
6 : };
7 : use measured::metric::histogram::Thresholds;
8 : use measured::metric::name::MetricName;
9 : use measured::{
10 : Counter, CounterVec, FixedCardinalityLabel, Gauge, Histogram, HistogramVec, LabelGroup,
11 : MetricGroup,
12 : };
13 : use metrics::{CounterPairAssoc, CounterPairVec, HyperLogLog, HyperLogLogVec};
14 : use tokio::time::{self, Instant};
15 :
16 : use crate::control_plane::messages::ColdStartInfo;
17 : use crate::error::ErrorKind;
18 :
19 40 : #[derive(MetricGroup)]
20 : #[metric(new(thread_pool: Arc<ThreadPoolMetrics>))]
21 : pub struct Metrics {
22 : #[metric(namespace = "proxy")]
23 : #[metric(init = ProxyMetrics::new(thread_pool))]
24 : pub proxy: ProxyMetrics,
25 :
26 : #[metric(namespace = "wake_compute_lock")]
27 : pub wake_compute_lock: ApiLockMetrics,
28 : }
29 :
30 : static SELF: OnceLock<Metrics> = OnceLock::new();
31 : impl Metrics {
32 0 : pub fn install(thread_pool: Arc<ThreadPoolMetrics>) {
33 0 : SELF.set(Metrics::new(thread_pool))
34 0 : .ok()
35 0 : .expect("proxy metrics must not be installed more than once");
36 0 : }
37 :
38 141 : pub fn get() -> &'static Self {
39 141 : #[cfg(test)]
40 141 : return SELF.get_or_init(|| Metrics::new(Arc::new(ThreadPoolMetrics::new(0))));
41 141 :
42 141 : #[cfg(not(test))]
43 141 : SELF.get()
44 141 : .expect("proxy metrics must be installed by the main() function")
45 141 : }
46 : }
47 :
48 40 : #[derive(MetricGroup)]
49 : #[metric(new(thread_pool: Arc<ThreadPoolMetrics>))]
50 : pub struct ProxyMetrics {
51 : #[metric(flatten)]
52 : pub db_connections: CounterPairVec<NumDbConnectionsGauge>,
53 : #[metric(flatten)]
54 : pub client_connections: CounterPairVec<NumClientConnectionsGauge>,
55 : #[metric(flatten)]
56 : pub connection_requests: CounterPairVec<NumConnectionRequestsGauge>,
57 : #[metric(flatten)]
58 : pub http_endpoint_pools: HttpEndpointPools,
59 :
60 : /// Time it took for proxy to establish a connection to the compute endpoint.
61 : // largest bucket = 2^16 * 0.5ms = 32s
62 : #[metric(metadata = Thresholds::exponential_buckets(0.0005, 2.0))]
63 : pub compute_connection_latency_seconds: HistogramVec<ComputeConnectionLatencySet, 16>,
64 :
65 : /// Time it took for proxy to receive a response from control plane.
66 : #[metric(
67 : // largest bucket = 2^16 * 0.2ms = 13s
68 : metadata = Thresholds::exponential_buckets(0.0002, 2.0),
69 : )]
70 : pub console_request_latency: HistogramVec<ConsoleRequestSet, 16>,
71 :
72 : /// Time it takes to acquire a token to call console plane.
73 : // largest bucket = 3^16 * 0.05ms = 2.15s
74 : #[metric(metadata = Thresholds::exponential_buckets(0.00005, 3.0))]
75 : pub control_plane_token_acquire_seconds: Histogram<16>,
76 :
77 : /// Size of the HTTP request body lengths.
78 : // smallest bucket = 16 bytes
79 : // largest bucket = 4^12 * 16 bytes = 256MB
80 : #[metric(metadata = Thresholds::exponential_buckets(16.0, 4.0))]
81 : pub http_conn_content_length_bytes: HistogramVec<StaticLabelSet<HttpDirection>, 12>,
82 :
83 : /// Time it takes to reclaim unused connection pools.
84 : #[metric(metadata = Thresholds::exponential_buckets(1e-6, 2.0))]
85 : pub http_pool_reclaimation_lag_seconds: Histogram<16>,
86 :
87 : /// Number of opened connections to a database.
88 : pub http_pool_opened_connections: Gauge,
89 :
90 : /// Number of cache hits/misses for allowed ips.
91 : pub allowed_ips_cache_misses: CounterVec<StaticLabelSet<CacheOutcome>>,
92 :
93 : /// Number of allowed ips
94 : #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0]))]
95 : pub allowed_ips_number: Histogram<10>,
96 :
97 : /// Number of connections (per sni).
98 : pub accepted_connections_by_sni: CounterVec<StaticLabelSet<SniKind>>,
99 :
100 : /// Number of connection failures (per kind).
101 : pub connection_failures_total: CounterVec<StaticLabelSet<ConnectionFailureKind>>,
102 :
103 : /// Number of wake-up failures (per kind).
104 : pub connection_failures_breakdown: CounterVec<ConnectionFailuresBreakdownSet>,
105 :
106 : /// Number of bytes sent/received between all clients and backends.
107 : pub io_bytes: CounterVec<StaticLabelSet<Direction>>,
108 :
109 : /// Number of errors by a given classification.
110 : pub errors_total: CounterVec<StaticLabelSet<crate::error::ErrorKind>>,
111 :
112 : /// Number of cancellation requests (per found/not_found).
113 : pub cancellation_requests_total: CounterVec<CancellationRequestSet>,
114 :
115 : /// Number of errors by a given classification
116 : pub redis_errors_total: CounterVec<RedisErrorsSet>,
117 :
118 : /// Number of TLS handshake failures
119 : pub tls_handshake_failures: Counter,
120 :
121 : /// Number of connection requests affected by authentication rate limits
122 : pub requests_auth_rate_limits_total: Counter,
123 :
124 : /// HLL approximate cardinality of endpoints that are connecting
125 : pub connecting_endpoints: HyperLogLogVec<StaticLabelSet<Protocol>, 32>,
126 :
127 : /// Number of endpoints affected by errors of a given classification
128 : pub endpoints_affected_by_errors: HyperLogLogVec<StaticLabelSet<crate::error::ErrorKind>, 32>,
129 :
130 : /// Number of endpoints affected by authentication rate limits
131 : pub endpoints_auth_rate_limits: HyperLogLog<32>,
132 :
133 : /// Number of invalid endpoints (per protocol, per rejected).
134 : pub invalid_endpoints_total: CounterVec<InvalidEndpointsSet>,
135 :
136 : /// Number of retries (per outcome, per retry_type).
137 : #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]))]
138 : pub retries_metric: HistogramVec<RetriesMetricSet, 9>,
139 :
140 : /// Number of events consumed from redis (per event type).
141 : pub redis_events_count: CounterVec<StaticLabelSet<RedisEventsCount>>,
142 :
143 : #[metric(namespace = "connect_compute_lock")]
144 : pub connect_compute_lock: ApiLockMetrics,
145 :
146 : #[metric(namespace = "scram_pool")]
147 : #[metric(init = thread_pool)]
148 : pub scram_pool: Arc<ThreadPoolMetrics>,
149 : }
150 :
151 80 : #[derive(MetricGroup)]
152 : #[metric(new())]
153 : pub struct ApiLockMetrics {
154 : /// Number of semaphores registered in this api lock
155 : pub semaphores_registered: Counter,
156 : /// Number of semaphores unregistered in this api lock
157 : pub semaphores_unregistered: Counter,
158 : /// Time it takes to reclaim unused semaphores in the api lock
159 : #[metric(metadata = Thresholds::exponential_buckets(1e-6, 2.0))]
160 : pub reclamation_lag_seconds: Histogram<16>,
161 : /// Time it takes to acquire a semaphore lock
162 : #[metric(metadata = Thresholds::exponential_buckets(1e-4, 2.0))]
163 : pub semaphore_acquire_seconds: Histogram<16>,
164 : }
165 :
166 : impl Default for ApiLockMetrics {
167 80 : fn default() -> Self {
168 80 : Self::new()
169 80 : }
170 : }
171 :
172 0 : #[derive(FixedCardinalityLabel, Copy, Clone)]
173 : #[label(singleton = "direction")]
174 : pub enum HttpDirection {
175 : Request,
176 : Response,
177 : }
178 :
179 0 : #[derive(FixedCardinalityLabel, Copy, Clone)]
180 : #[label(singleton = "direction")]
181 : pub enum Direction {
182 : Tx,
183 : Rx,
184 : }
185 :
186 0 : #[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
187 : #[label(singleton = "protocol")]
188 : pub enum Protocol {
189 : Http,
190 : Ws,
191 : Tcp,
192 : SniRouter,
193 : }
194 :
195 : impl Protocol {
196 0 : pub fn as_str(&self) -> &'static str {
197 0 : match self {
198 0 : Protocol::Http => "http",
199 0 : Protocol::Ws => "ws",
200 0 : Protocol::Tcp => "tcp",
201 0 : Protocol::SniRouter => "sni_router",
202 : }
203 0 : }
204 : }
205 :
206 : impl std::fmt::Display for Protocol {
207 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
208 0 : f.write_str(self.as_str())
209 0 : }
210 : }
211 :
212 : #[derive(FixedCardinalityLabel, Copy, Clone)]
213 : pub enum Bool {
214 : True,
215 : False,
216 : }
217 :
218 0 : #[derive(FixedCardinalityLabel, Copy, Clone)]
219 : #[label(singleton = "outcome")]
220 : pub enum Outcome {
221 : Success,
222 : Failed,
223 : }
224 :
225 0 : #[derive(FixedCardinalityLabel, Copy, Clone)]
226 : #[label(singleton = "outcome")]
227 : pub enum CacheOutcome {
228 : Hit,
229 : Miss,
230 : }
231 :
232 80 : #[derive(LabelGroup)]
233 : #[label(set = ConsoleRequestSet)]
234 : pub struct ConsoleRequest<'a> {
235 : #[label(dynamic_with = ThreadedRodeo, default)]
236 : pub request: &'a str,
237 : }
238 :
239 : #[derive(MetricGroup, Default)]
240 : pub struct HttpEndpointPools {
241 : /// Number of endpoints we have registered pools for
242 : pub http_pool_endpoints_registered_total: Counter,
243 : /// Number of endpoints we have unregistered pools for
244 : pub http_pool_endpoints_unregistered_total: Counter,
245 : }
246 :
247 : pub struct HttpEndpointPoolsGuard<'a> {
248 : dec: &'a Counter,
249 : }
250 :
251 : impl Drop for HttpEndpointPoolsGuard<'_> {
252 2 : fn drop(&mut self) {
253 2 : self.dec.inc();
254 2 : }
255 : }
256 :
257 : impl HttpEndpointPools {
258 2 : pub fn guard(&self) -> HttpEndpointPoolsGuard<'_> {
259 2 : self.http_pool_endpoints_registered_total.inc();
260 2 : HttpEndpointPoolsGuard {
261 2 : dec: &self.http_pool_endpoints_unregistered_total,
262 2 : }
263 2 : }
264 : }
265 : pub struct NumDbConnectionsGauge;
266 : impl CounterPairAssoc for NumDbConnectionsGauge {
267 : const INC_NAME: &'static MetricName = MetricName::from_str("opened_db_connections_total");
268 : const DEC_NAME: &'static MetricName = MetricName::from_str("closed_db_connections_total");
269 : const INC_HELP: &'static str = "Number of opened connections to a database.";
270 : const DEC_HELP: &'static str = "Number of closed connections to a database.";
271 : type LabelGroupSet = StaticLabelSet<Protocol>;
272 : }
273 : pub type NumDbConnectionsGuard<'a> = metrics::MeasuredCounterPairGuard<'a, NumDbConnectionsGauge>;
274 :
275 : pub struct NumClientConnectionsGauge;
276 : impl CounterPairAssoc for NumClientConnectionsGauge {
277 : const INC_NAME: &'static MetricName = MetricName::from_str("opened_client_connections_total");
278 : const DEC_NAME: &'static MetricName = MetricName::from_str("closed_client_connections_total");
279 : const INC_HELP: &'static str = "Number of opened connections from a client.";
280 : const DEC_HELP: &'static str = "Number of closed connections from a client.";
281 : type LabelGroupSet = StaticLabelSet<Protocol>;
282 : }
283 : pub type NumClientConnectionsGuard<'a> =
284 : metrics::MeasuredCounterPairGuard<'a, NumClientConnectionsGauge>;
285 :
286 : pub struct NumConnectionRequestsGauge;
287 : impl CounterPairAssoc for NumConnectionRequestsGauge {
288 : const INC_NAME: &'static MetricName = MetricName::from_str("accepted_connections_total");
289 : const DEC_NAME: &'static MetricName = MetricName::from_str("closed_connections_total");
290 : const INC_HELP: &'static str = "Number of client connections accepted.";
291 : const DEC_HELP: &'static str = "Number of client connections closed.";
292 : type LabelGroupSet = StaticLabelSet<Protocol>;
293 : }
294 : pub type NumConnectionRequestsGuard<'a> =
295 : metrics::MeasuredCounterPairGuard<'a, NumConnectionRequestsGauge>;
296 :
297 240 : #[derive(LabelGroup)]
298 : #[label(set = ComputeConnectionLatencySet)]
299 : pub struct ComputeConnectionLatencyGroup {
300 : protocol: Protocol,
301 : cold_start_info: ColdStartInfo,
302 : outcome: ConnectOutcome,
303 : excluded: LatencyExclusions,
304 : }
305 :
306 : #[derive(FixedCardinalityLabel, Copy, Clone)]
307 : pub enum LatencyExclusions {
308 : Client,
309 : ClientAndCplane,
310 : ClientCplaneCompute,
311 : ClientCplaneComputeRetry,
312 : }
313 :
314 0 : #[derive(FixedCardinalityLabel, Copy, Clone)]
315 : #[label(singleton = "kind")]
316 : pub enum SniKind {
317 : Sni,
318 : NoSni,
319 : PasswordHack,
320 : }
321 :
322 0 : #[derive(FixedCardinalityLabel, Copy, Clone)]
323 : #[label(singleton = "kind")]
324 : pub enum ConnectionFailureKind {
325 : ComputeCached,
326 : ComputeUncached,
327 : }
328 :
329 160 : #[derive(LabelGroup)]
330 : #[label(set = ConnectionFailuresBreakdownSet)]
331 : pub struct ConnectionFailuresBreakdownGroup {
332 : pub kind: ErrorKind,
333 : pub retry: Bool,
334 : }
335 :
336 80 : #[derive(LabelGroup, Copy, Clone)]
337 : #[label(set = RedisErrorsSet)]
338 : pub struct RedisErrors<'a> {
339 : #[label(dynamic_with = ThreadedRodeo, default)]
340 : pub channel: &'a str,
341 : }
342 :
343 : #[derive(FixedCardinalityLabel, Copy, Clone)]
344 : pub enum CancellationSource {
345 : FromClient,
346 : FromRedis,
347 : Local,
348 : }
349 :
350 : #[derive(FixedCardinalityLabel, Copy, Clone)]
351 : pub enum CancellationOutcome {
352 : NotFound,
353 : Found,
354 : }
355 :
356 160 : #[derive(LabelGroup)]
357 : #[label(set = CancellationRequestSet)]
358 : pub struct CancellationRequest {
359 : pub source: CancellationSource,
360 : pub kind: CancellationOutcome,
361 : }
362 :
363 : #[derive(Clone, Copy)]
364 : pub enum Waiting {
365 : Cplane,
366 : Client,
367 : Compute,
368 : RetryTimeout,
369 : }
370 :
371 : #[derive(Default)]
372 : struct Accumulated {
373 : cplane: time::Duration,
374 : client: time::Duration,
375 : compute: time::Duration,
376 : retry: time::Duration,
377 : }
378 :
379 : pub struct LatencyTimer {
380 : // time since the stopwatch was started
381 : start: time::Instant,
382 : // time since the stopwatch was stopped
383 : stop: Option<time::Instant>,
384 : // accumulated time on the stopwatch
385 : accumulated: Accumulated,
386 : // label data
387 : protocol: Protocol,
388 : cold_start_info: ColdStartInfo,
389 : outcome: ConnectOutcome,
390 :
391 : skip_reporting: bool,
392 : }
393 :
394 : impl LatencyTimer {
395 67 : pub fn new(protocol: Protocol) -> Self {
396 67 : Self {
397 67 : start: time::Instant::now(),
398 67 : stop: None,
399 67 : accumulated: Accumulated::default(),
400 67 : protocol,
401 67 : cold_start_info: ColdStartInfo::Unknown,
402 67 : // assume failed unless otherwise specified
403 67 : outcome: ConnectOutcome::Failed,
404 67 : skip_reporting: false,
405 67 : }
406 67 : }
407 :
408 0 : pub(crate) fn noop(protocol: Protocol) -> Self {
409 0 : Self {
410 0 : start: time::Instant::now(),
411 0 : stop: None,
412 0 : accumulated: Accumulated::default(),
413 0 : protocol,
414 0 : cold_start_info: ColdStartInfo::Unknown,
415 0 : // assume failed unless otherwise specified
416 0 : outcome: ConnectOutcome::Failed,
417 0 : skip_reporting: true,
418 0 : }
419 0 : }
420 :
421 22 : pub fn unpause(&mut self, start: Instant, waiting_for: Waiting) {
422 22 : let dur = start.elapsed();
423 22 : match waiting_for {
424 0 : Waiting::Cplane => self.accumulated.cplane += dur,
425 15 : Waiting::Client => self.accumulated.client += dur,
426 1 : Waiting::Compute => self.accumulated.compute += dur,
427 6 : Waiting::RetryTimeout => self.accumulated.retry += dur,
428 : }
429 22 : }
430 :
431 0 : pub fn cold_start_info(&mut self, cold_start_info: ColdStartInfo) {
432 0 : self.cold_start_info = cold_start_info;
433 0 : }
434 :
435 4 : pub fn success(&mut self) {
436 4 : // stop the stopwatch and record the time that we have accumulated
437 4 : self.stop = Some(time::Instant::now());
438 4 :
439 4 : // success
440 4 : self.outcome = ConnectOutcome::Success;
441 4 : }
442 : }
443 :
444 : #[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
445 : pub enum ConnectOutcome {
446 : Success,
447 : Failed,
448 : }
449 :
450 : impl Drop for LatencyTimer {
451 67 : fn drop(&mut self) {
452 67 : if self.skip_reporting {
453 0 : return;
454 67 : }
455 67 :
456 67 : let duration = self
457 67 : .stop
458 67 : .unwrap_or_else(time::Instant::now)
459 67 : .duration_since(self.start);
460 67 :
461 67 : let metric = &Metrics::get().proxy.compute_connection_latency_seconds;
462 67 :
463 67 : // Excluding client communication from the accumulated time.
464 67 : metric.observe(
465 67 : ComputeConnectionLatencyGroup {
466 67 : protocol: self.protocol,
467 67 : cold_start_info: self.cold_start_info,
468 67 : outcome: self.outcome,
469 67 : excluded: LatencyExclusions::Client,
470 67 : },
471 67 : duration
472 67 : .saturating_sub(self.accumulated.client)
473 67 : .as_secs_f64(),
474 67 : );
475 67 :
476 67 : // Exclude client and cplane communication from the accumulated time.
477 67 : let accumulated_total = self.accumulated.client + self.accumulated.cplane;
478 67 : metric.observe(
479 67 : ComputeConnectionLatencyGroup {
480 67 : protocol: self.protocol,
481 67 : cold_start_info: self.cold_start_info,
482 67 : outcome: self.outcome,
483 67 : excluded: LatencyExclusions::ClientAndCplane,
484 67 : },
485 67 : duration.saturating_sub(accumulated_total).as_secs_f64(),
486 67 : );
487 67 :
488 67 : // Exclude client cplane, compue communication from the accumulated time.
489 67 : let accumulated_total =
490 67 : self.accumulated.client + self.accumulated.cplane + self.accumulated.compute;
491 67 : metric.observe(
492 67 : ComputeConnectionLatencyGroup {
493 67 : protocol: self.protocol,
494 67 : cold_start_info: self.cold_start_info,
495 67 : outcome: self.outcome,
496 67 : excluded: LatencyExclusions::ClientCplaneCompute,
497 67 : },
498 67 : duration.saturating_sub(accumulated_total).as_secs_f64(),
499 67 : );
500 67 :
501 67 : // Exclude client cplane, compue, retry communication from the accumulated time.
502 67 : let accumulated_total = self.accumulated.client
503 67 : + self.accumulated.cplane
504 67 : + self.accumulated.compute
505 67 : + self.accumulated.retry;
506 67 : metric.observe(
507 67 : ComputeConnectionLatencyGroup {
508 67 : protocol: self.protocol,
509 67 : cold_start_info: self.cold_start_info,
510 67 : outcome: self.outcome,
511 67 : excluded: LatencyExclusions::ClientCplaneComputeRetry,
512 67 : },
513 67 : duration.saturating_sub(accumulated_total).as_secs_f64(),
514 67 : );
515 67 : }
516 : }
517 :
518 : impl From<bool> for Bool {
519 3 : fn from(value: bool) -> Self {
520 3 : if value {
521 2 : Bool::True
522 : } else {
523 1 : Bool::False
524 : }
525 3 : }
526 : }
527 :
528 200 : #[derive(LabelGroup)]
529 : #[label(set = InvalidEndpointsSet)]
530 : pub struct InvalidEndpointsGroup {
531 : pub protocol: Protocol,
532 : pub rejected: Bool,
533 : pub outcome: ConnectOutcome,
534 : }
535 :
536 160 : #[derive(LabelGroup)]
537 : #[label(set = RetriesMetricSet)]
538 : pub struct RetriesMetricGroup {
539 : pub outcome: ConnectOutcome,
540 : pub retry_type: RetryType,
541 : }
542 :
543 : #[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
544 : pub enum RetryType {
545 : WakeCompute,
546 : ConnectToCompute,
547 : }
548 :
549 0 : #[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
550 : #[label(singleton = "event")]
551 : pub enum RedisEventsCount {
552 : EndpointCreated,
553 : BranchCreated,
554 : ProjectCreated,
555 : CancelSession,
556 : PasswordUpdate,
557 : AllowedIpsUpdate,
558 : }
559 :
560 : pub struct ThreadPoolWorkers(usize);
561 : #[derive(Copy, Clone)]
562 : pub struct ThreadPoolWorkerId(pub usize);
563 :
564 : impl LabelValue for ThreadPoolWorkerId {
565 0 : fn visit<V: measured::label::LabelVisitor>(&self, v: V) -> V::Output {
566 0 : v.write_int(self.0 as i64)
567 0 : }
568 : }
569 :
570 : impl LabelGroup for ThreadPoolWorkerId {
571 0 : fn visit_values(&self, v: &mut impl measured::label::LabelGroupVisitor) {
572 0 : v.write_value(LabelName::from_str("worker"), self);
573 0 : }
574 : }
575 :
576 : impl LabelGroupSet for ThreadPoolWorkers {
577 : type Group<'a> = ThreadPoolWorkerId;
578 :
579 92 : fn cardinality(&self) -> Option<usize> {
580 92 : Some(self.0)
581 92 : }
582 :
583 6 : fn encode_dense(&self, value: Self::Unique) -> Option<usize> {
584 6 : Some(value)
585 6 : }
586 :
587 0 : fn decode_dense(&self, value: usize) -> Self::Group<'_> {
588 0 : ThreadPoolWorkerId(value)
589 0 : }
590 :
591 : type Unique = usize;
592 :
593 6 : fn encode(&self, value: Self::Group<'_>) -> Option<Self::Unique> {
594 6 : Some(value.0)
595 6 : }
596 :
597 0 : fn decode(&self, value: &Self::Unique) -> Self::Group<'_> {
598 0 : ThreadPoolWorkerId(*value)
599 0 : }
600 : }
601 :
602 : impl LabelSet for ThreadPoolWorkers {
603 : type Value<'a> = ThreadPoolWorkerId;
604 :
605 0 : fn dynamic_cardinality(&self) -> Option<usize> {
606 0 : Some(self.0)
607 0 : }
608 :
609 0 : fn encode(&self, value: Self::Value<'_>) -> Option<usize> {
610 0 : (value.0 < self.0).then_some(value.0)
611 0 : }
612 :
613 0 : fn decode(&self, value: usize) -> Self::Value<'_> {
614 0 : ThreadPoolWorkerId(value)
615 0 : }
616 : }
617 :
618 : impl FixedCardinalitySet for ThreadPoolWorkers {
619 0 : fn cardinality(&self) -> usize {
620 0 : self.0
621 0 : }
622 : }
623 :
624 46 : #[derive(MetricGroup)]
625 : #[metric(new(workers: usize))]
626 : pub struct ThreadPoolMetrics {
627 : #[metric(init = CounterVec::with_label_set(ThreadPoolWorkers(workers)))]
628 : pub worker_task_turns_total: CounterVec<ThreadPoolWorkers>,
629 : #[metric(init = CounterVec::with_label_set(ThreadPoolWorkers(workers)))]
630 : pub worker_task_skips_total: CounterVec<ThreadPoolWorkers>,
631 : }
|