Line data Source code
1 : use std::sync::{Arc, OnceLock};
2 :
3 : use lasso::ThreadedRodeo;
4 : use measured::label::{
5 : FixedCardinalitySet, LabelGroupSet, LabelName, LabelSet, LabelValue, StaticLabelSet,
6 : };
7 : use measured::metric::histogram::Thresholds;
8 : use measured::metric::name::MetricName;
9 : use measured::{
10 : Counter, CounterVec, FixedCardinalityLabel, Gauge, Histogram, HistogramVec, LabelGroup,
11 : MetricGroup,
12 : };
13 : use metrics::{CounterPairAssoc, CounterPairVec, HyperLogLog, HyperLogLogVec};
14 : use tokio::time::{self, Instant};
15 :
16 : use crate::control_plane::messages::ColdStartInfo;
17 : use crate::error::ErrorKind;
18 :
19 45 : #[derive(MetricGroup)]
20 : #[metric(new(thread_pool: Arc<ThreadPoolMetrics>))]
21 : pub struct Metrics {
22 : #[metric(namespace = "proxy")]
23 : #[metric(init = ProxyMetrics::new(thread_pool))]
24 : pub proxy: ProxyMetrics,
25 :
26 : #[metric(namespace = "wake_compute_lock")]
27 : pub wake_compute_lock: ApiLockMetrics,
28 : }
29 :
30 : static SELF: OnceLock<Metrics> = OnceLock::new();
31 : impl Metrics {
32 0 : pub fn install(thread_pool: Arc<ThreadPoolMetrics>) {
33 0 : SELF.set(Metrics::new(thread_pool))
34 0 : .ok()
35 0 : .expect("proxy metrics must not be installed more than once");
36 0 : }
37 :
38 144 : pub fn get() -> &'static Self {
39 144 : #[cfg(test)]
40 144 : return SELF.get_or_init(|| Metrics::new(Arc::new(ThreadPoolMetrics::new(0))));
41 144 :
42 144 : #[cfg(not(test))]
43 144 : SELF.get()
44 144 : .expect("proxy metrics must be installed by the main() function")
45 144 : }
46 : }
47 :
48 45 : #[derive(MetricGroup)]
49 : #[metric(new(thread_pool: Arc<ThreadPoolMetrics>))]
50 : pub struct ProxyMetrics {
51 : #[metric(flatten)]
52 : pub db_connections: CounterPairVec<NumDbConnectionsGauge>,
53 : #[metric(flatten)]
54 : pub client_connections: CounterPairVec<NumClientConnectionsGauge>,
55 : #[metric(flatten)]
56 : pub connection_requests: CounterPairVec<NumConnectionRequestsGauge>,
57 : #[metric(flatten)]
58 : pub http_endpoint_pools: HttpEndpointPools,
59 :
60 : /// Time it took for proxy to establish a connection to the compute endpoint.
61 : // largest bucket = 2^16 * 0.5ms = 32s
62 : #[metric(metadata = Thresholds::exponential_buckets(0.0005, 2.0))]
63 : pub compute_connection_latency_seconds: HistogramVec<ComputeConnectionLatencySet, 16>,
64 :
65 : /// Time it took for proxy to receive a response from control plane.
66 : #[metric(
67 : // largest bucket = 2^16 * 0.2ms = 13s
68 : metadata = Thresholds::exponential_buckets(0.0002, 2.0),
69 : )]
70 : pub console_request_latency: HistogramVec<ConsoleRequestSet, 16>,
71 :
72 : /// Time it takes to acquire a token to call console plane.
73 : // largest bucket = 3^16 * 0.05ms = 2.15s
74 : #[metric(metadata = Thresholds::exponential_buckets(0.00005, 3.0))]
75 : pub control_plane_token_acquire_seconds: Histogram<16>,
76 :
77 : /// Size of the HTTP request body lengths.
78 : // smallest bucket = 16 bytes
79 : // largest bucket = 4^12 * 16 bytes = 256MB
80 : #[metric(metadata = Thresholds::exponential_buckets(16.0, 4.0))]
81 : pub http_conn_content_length_bytes: HistogramVec<StaticLabelSet<HttpDirection>, 12>,
82 :
83 : /// Time it takes to reclaim unused connection pools.
84 : #[metric(metadata = Thresholds::exponential_buckets(1e-6, 2.0))]
85 : pub http_pool_reclaimation_lag_seconds: Histogram<16>,
86 :
87 : /// Number of opened connections to a database.
88 : pub http_pool_opened_connections: Gauge,
89 :
90 : /// Number of cache hits/misses for allowed ips.
91 : pub allowed_ips_cache_misses: CounterVec<StaticLabelSet<CacheOutcome>>,
92 :
93 : /// Number of allowed ips
94 : #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0]))]
95 : pub allowed_ips_number: Histogram<10>,
96 :
97 : /// Number of connections (per sni).
98 : pub accepted_connections_by_sni: CounterVec<StaticLabelSet<SniKind>>,
99 :
100 : /// Number of connection failures (per kind).
101 : pub connection_failures_total: CounterVec<StaticLabelSet<ConnectionFailureKind>>,
102 :
103 : /// Number of wake-up failures (per kind).
104 : pub connection_failures_breakdown: CounterVec<ConnectionFailuresBreakdownSet>,
105 :
106 : /// Number of bytes sent/received between all clients and backends.
107 : pub io_bytes: CounterVec<StaticLabelSet<Direction>>,
108 :
109 : /// Number of errors by a given classification.
110 : pub errors_total: CounterVec<StaticLabelSet<crate::error::ErrorKind>>,
111 :
112 : /// Number of cancellation requests (per found/not_found).
113 : pub cancellation_requests_total: CounterVec<CancellationRequestSet>,
114 :
115 : /// Number of errors by a given classification
116 : pub redis_errors_total: CounterVec<RedisErrorsSet>,
117 :
118 : /// Number of TLS handshake failures
119 : pub tls_handshake_failures: Counter,
120 :
121 : /// Number of connection requests affected by authentication rate limits
122 : pub requests_auth_rate_limits_total: Counter,
123 :
124 : /// HLL approximate cardinality of endpoints that are connecting
125 : pub connecting_endpoints: HyperLogLogVec<StaticLabelSet<Protocol>, 32>,
126 :
127 : /// Number of endpoints affected by errors of a given classification
128 : pub endpoints_affected_by_errors: HyperLogLogVec<StaticLabelSet<crate::error::ErrorKind>, 32>,
129 :
130 : /// Number of endpoints affected by authentication rate limits
131 : pub endpoints_auth_rate_limits: HyperLogLog<32>,
132 :
133 : /// Number of invalid endpoints (per protocol, per rejected).
134 : pub invalid_endpoints_total: CounterVec<InvalidEndpointsSet>,
135 :
136 : /// Number of retries (per outcome, per retry_type).
137 : #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]))]
138 : pub retries_metric: HistogramVec<RetriesMetricSet, 9>,
139 :
140 : /// Number of events consumed from redis (per event type).
141 : pub redis_events_count: CounterVec<StaticLabelSet<RedisEventsCount>>,
142 :
143 : #[metric(namespace = "connect_compute_lock")]
144 : pub connect_compute_lock: ApiLockMetrics,
145 :
146 : #[metric(namespace = "scram_pool")]
147 : #[metric(init = thread_pool)]
148 : pub scram_pool: Arc<ThreadPoolMetrics>,
149 : }
150 :
151 90 : #[derive(MetricGroup)]
152 : #[metric(new())]
153 : pub struct ApiLockMetrics {
154 : /// Number of semaphores registered in this api lock
155 : pub semaphores_registered: Counter,
156 : /// Number of semaphores unregistered in this api lock
157 : pub semaphores_unregistered: Counter,
158 : /// Time it takes to reclaim unused semaphores in the api lock
159 : #[metric(metadata = Thresholds::exponential_buckets(1e-6, 2.0))]
160 : pub reclamation_lag_seconds: Histogram<16>,
161 : /// Time it takes to acquire a semaphore lock
162 : #[metric(metadata = Thresholds::exponential_buckets(1e-4, 2.0))]
163 : pub semaphore_acquire_seconds: Histogram<16>,
164 : }
165 :
166 : impl Default for ApiLockMetrics {
167 90 : fn default() -> Self {
168 90 : Self::new()
169 90 : }
170 : }
171 :
172 0 : #[derive(FixedCardinalityLabel, Copy, Clone)]
173 : #[label(singleton = "direction")]
174 : pub enum HttpDirection {
175 : Request,
176 : Response,
177 : }
178 :
179 0 : #[derive(FixedCardinalityLabel, Copy, Clone)]
180 : #[label(singleton = "direction")]
181 : pub enum Direction {
182 : Tx,
183 : Rx,
184 : }
185 :
186 0 : #[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
187 : #[label(singleton = "protocol")]
188 : pub enum Protocol {
189 : Http,
190 : Ws,
191 : Tcp,
192 : SniRouter,
193 : }
194 :
195 : impl Protocol {
196 0 : pub fn as_str(&self) -> &'static str {
197 0 : match self {
198 0 : Protocol::Http => "http",
199 0 : Protocol::Ws => "ws",
200 0 : Protocol::Tcp => "tcp",
201 0 : Protocol::SniRouter => "sni_router",
202 : }
203 0 : }
204 : }
205 :
206 : impl std::fmt::Display for Protocol {
207 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
208 0 : f.write_str(self.as_str())
209 0 : }
210 : }
211 :
212 : #[derive(FixedCardinalityLabel, Copy, Clone)]
213 : pub enum Bool {
214 : True,
215 : False,
216 : }
217 :
218 0 : #[derive(FixedCardinalityLabel, Copy, Clone)]
219 : #[label(singleton = "outcome")]
220 : pub enum Outcome {
221 : Success,
222 : Failed,
223 : }
224 :
225 0 : #[derive(FixedCardinalityLabel, Copy, Clone)]
226 : #[label(singleton = "outcome")]
227 : pub enum CacheOutcome {
228 : Hit,
229 : Miss,
230 : }
231 :
232 90 : #[derive(LabelGroup)]
233 : #[label(set = ConsoleRequestSet)]
234 : pub struct ConsoleRequest<'a> {
235 : #[label(dynamic_with = ThreadedRodeo, default)]
236 : pub request: &'a str,
237 : }
238 :
239 : #[derive(MetricGroup, Default)]
240 : pub struct HttpEndpointPools {
241 : /// Number of endpoints we have registered pools for
242 : pub http_pool_endpoints_registered_total: Counter,
243 : /// Number of endpoints we have unregistered pools for
244 : pub http_pool_endpoints_unregistered_total: Counter,
245 : }
246 :
247 : pub struct HttpEndpointPoolsGuard<'a> {
248 : dec: &'a Counter,
249 : }
250 :
251 : impl Drop for HttpEndpointPoolsGuard<'_> {
252 2 : fn drop(&mut self) {
253 2 : self.dec.inc();
254 2 : }
255 : }
256 :
257 : impl HttpEndpointPools {
258 2 : pub fn guard(&self) -> HttpEndpointPoolsGuard<'_> {
259 2 : self.http_pool_endpoints_registered_total.inc();
260 2 : HttpEndpointPoolsGuard {
261 2 : dec: &self.http_pool_endpoints_unregistered_total,
262 2 : }
263 2 : }
264 : }
265 : pub struct NumDbConnectionsGauge;
266 : impl CounterPairAssoc for NumDbConnectionsGauge {
267 : const INC_NAME: &'static MetricName = MetricName::from_str("opened_db_connections_total");
268 : const DEC_NAME: &'static MetricName = MetricName::from_str("closed_db_connections_total");
269 : const INC_HELP: &'static str = "Number of opened connections to a database.";
270 : const DEC_HELP: &'static str = "Number of closed connections to a database.";
271 : type LabelGroupSet = StaticLabelSet<Protocol>;
272 : }
273 : pub type NumDbConnectionsGuard<'a> = metrics::MeasuredCounterPairGuard<'a, NumDbConnectionsGauge>;
274 :
275 : pub struct NumClientConnectionsGauge;
276 : impl CounterPairAssoc for NumClientConnectionsGauge {
277 : const INC_NAME: &'static MetricName = MetricName::from_str("opened_client_connections_total");
278 : const DEC_NAME: &'static MetricName = MetricName::from_str("closed_client_connections_total");
279 : const INC_HELP: &'static str = "Number of opened connections from a client.";
280 : const DEC_HELP: &'static str = "Number of closed connections from a client.";
281 : type LabelGroupSet = StaticLabelSet<Protocol>;
282 : }
283 : pub type NumClientConnectionsGuard<'a> =
284 : metrics::MeasuredCounterPairGuard<'a, NumClientConnectionsGauge>;
285 :
286 : pub struct NumConnectionRequestsGauge;
287 : impl CounterPairAssoc for NumConnectionRequestsGauge {
288 : const INC_NAME: &'static MetricName = MetricName::from_str("accepted_connections_total");
289 : const DEC_NAME: &'static MetricName = MetricName::from_str("closed_connections_total");
290 : const INC_HELP: &'static str = "Number of client connections accepted.";
291 : const DEC_HELP: &'static str = "Number of client connections closed.";
292 : type LabelGroupSet = StaticLabelSet<Protocol>;
293 : }
294 : pub type NumConnectionRequestsGuard<'a> =
295 : metrics::MeasuredCounterPairGuard<'a, NumConnectionRequestsGauge>;
296 :
297 270 : #[derive(LabelGroup)]
298 : #[label(set = ComputeConnectionLatencySet)]
299 : pub struct ComputeConnectionLatencyGroup {
300 : protocol: Protocol,
301 : cold_start_info: ColdStartInfo,
302 : outcome: ConnectOutcome,
303 : excluded: LatencyExclusions,
304 : }
305 :
306 : #[derive(FixedCardinalityLabel, Copy, Clone)]
307 : pub enum LatencyExclusions {
308 : Client,
309 : ClientAndCplane,
310 : ClientCplaneCompute,
311 : ClientCplaneComputeRetry,
312 : }
313 :
314 0 : #[derive(FixedCardinalityLabel, Copy, Clone)]
315 : #[label(singleton = "kind")]
316 : pub enum SniKind {
317 : Sni,
318 : NoSni,
319 : PasswordHack,
320 : }
321 :
322 0 : #[derive(FixedCardinalityLabel, Copy, Clone)]
323 : #[label(singleton = "kind")]
324 : pub enum ConnectionFailureKind {
325 : ComputeCached,
326 : ComputeUncached,
327 : }
328 :
329 180 : #[derive(LabelGroup)]
330 : #[label(set = ConnectionFailuresBreakdownSet)]
331 : pub struct ConnectionFailuresBreakdownGroup {
332 : pub kind: ErrorKind,
333 : pub retry: Bool,
334 : }
335 :
336 90 : #[derive(LabelGroup, Copy, Clone)]
337 : #[label(set = RedisErrorsSet)]
338 : pub struct RedisErrors<'a> {
339 : #[label(dynamic_with = ThreadedRodeo, default)]
340 : pub channel: &'a str,
341 : }
342 :
343 : #[derive(FixedCardinalityLabel, Copy, Clone)]
344 : pub enum CancellationSource {
345 : FromClient,
346 : FromRedis,
347 : Local,
348 : }
349 :
350 : #[derive(FixedCardinalityLabel, Copy, Clone)]
351 : pub enum CancellationOutcome {
352 : NotFound,
353 : Found,
354 : RateLimitExceeded,
355 : }
356 :
357 180 : #[derive(LabelGroup)]
358 : #[label(set = CancellationRequestSet)]
359 : pub struct CancellationRequest {
360 : pub source: CancellationSource,
361 : pub kind: CancellationOutcome,
362 : }
363 :
/// What a [`LatencyTimer`] was waiting on during a paused interval.
///
/// `LatencyTimer::unpause` uses it to pick the accumulator that receives the
/// elapsed time.
#[derive(Copy, Clone)]
pub enum Waiting {
    Cplane,
    Client,
    Compute,
    RetryTimeout,
}
371 :
372 : #[derive(Default)]
373 : struct Accumulated {
374 : cplane: time::Duration,
375 : client: time::Duration,
376 : compute: time::Duration,
377 : retry: time::Duration,
378 : }
379 :
380 : pub struct LatencyTimer {
381 : // time since the stopwatch was started
382 : start: time::Instant,
383 : // time since the stopwatch was stopped
384 : stop: Option<time::Instant>,
385 : // accumulated time on the stopwatch
386 : accumulated: Accumulated,
387 : // label data
388 : protocol: Protocol,
389 : cold_start_info: ColdStartInfo,
390 : outcome: ConnectOutcome,
391 :
392 : skip_reporting: bool,
393 : }
394 :
395 : impl LatencyTimer {
396 70 : pub fn new(protocol: Protocol) -> Self {
397 70 : Self {
398 70 : start: time::Instant::now(),
399 70 : stop: None,
400 70 : accumulated: Accumulated::default(),
401 70 : protocol,
402 70 : cold_start_info: ColdStartInfo::Unknown,
403 70 : // assume failed unless otherwise specified
404 70 : outcome: ConnectOutcome::Failed,
405 70 : skip_reporting: false,
406 70 : }
407 70 : }
408 :
409 0 : pub(crate) fn noop(protocol: Protocol) -> Self {
410 0 : Self {
411 0 : start: time::Instant::now(),
412 0 : stop: None,
413 0 : accumulated: Accumulated::default(),
414 0 : protocol,
415 0 : cold_start_info: ColdStartInfo::Unknown,
416 0 : // assume failed unless otherwise specified
417 0 : outcome: ConnectOutcome::Failed,
418 0 : skip_reporting: true,
419 0 : }
420 0 : }
421 :
422 28 : pub fn unpause(&mut self, start: Instant, waiting_for: Waiting) {
423 28 : let dur = start.elapsed();
424 28 : match waiting_for {
425 0 : Waiting::Cplane => self.accumulated.cplane += dur,
426 15 : Waiting::Client => self.accumulated.client += dur,
427 7 : Waiting::Compute => self.accumulated.compute += dur,
428 6 : Waiting::RetryTimeout => self.accumulated.retry += dur,
429 : }
430 28 : }
431 :
432 0 : pub fn cold_start_info(&mut self, cold_start_info: ColdStartInfo) {
433 0 : self.cold_start_info = cold_start_info;
434 0 : }
435 :
436 4 : pub fn success(&mut self) {
437 4 : // stop the stopwatch and record the time that we have accumulated
438 4 : self.stop = Some(time::Instant::now());
439 4 :
440 4 : // success
441 4 : self.outcome = ConnectOutcome::Success;
442 4 : }
443 : }
444 :
445 : #[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
446 : pub enum ConnectOutcome {
447 : Success,
448 : Failed,
449 : }
450 :
451 : impl Drop for LatencyTimer {
452 70 : fn drop(&mut self) {
453 70 : if self.skip_reporting {
454 0 : return;
455 70 : }
456 70 :
457 70 : let duration = self
458 70 : .stop
459 70 : .unwrap_or_else(time::Instant::now)
460 70 : .duration_since(self.start);
461 70 :
462 70 : let metric = &Metrics::get().proxy.compute_connection_latency_seconds;
463 70 :
464 70 : // Excluding client communication from the accumulated time.
465 70 : metric.observe(
466 70 : ComputeConnectionLatencyGroup {
467 70 : protocol: self.protocol,
468 70 : cold_start_info: self.cold_start_info,
469 70 : outcome: self.outcome,
470 70 : excluded: LatencyExclusions::Client,
471 70 : },
472 70 : duration
473 70 : .saturating_sub(self.accumulated.client)
474 70 : .as_secs_f64(),
475 70 : );
476 70 :
477 70 : // Exclude client and cplane communication from the accumulated time.
478 70 : let accumulated_total = self.accumulated.client + self.accumulated.cplane;
479 70 : metric.observe(
480 70 : ComputeConnectionLatencyGroup {
481 70 : protocol: self.protocol,
482 70 : cold_start_info: self.cold_start_info,
483 70 : outcome: self.outcome,
484 70 : excluded: LatencyExclusions::ClientAndCplane,
485 70 : },
486 70 : duration.saturating_sub(accumulated_total).as_secs_f64(),
487 70 : );
488 70 :
489 70 : // Exclude client cplane, compue communication from the accumulated time.
490 70 : let accumulated_total =
491 70 : self.accumulated.client + self.accumulated.cplane + self.accumulated.compute;
492 70 : metric.observe(
493 70 : ComputeConnectionLatencyGroup {
494 70 : protocol: self.protocol,
495 70 : cold_start_info: self.cold_start_info,
496 70 : outcome: self.outcome,
497 70 : excluded: LatencyExclusions::ClientCplaneCompute,
498 70 : },
499 70 : duration.saturating_sub(accumulated_total).as_secs_f64(),
500 70 : );
501 70 :
502 70 : // Exclude client cplane, compue, retry communication from the accumulated time.
503 70 : let accumulated_total = self.accumulated.client
504 70 : + self.accumulated.cplane
505 70 : + self.accumulated.compute
506 70 : + self.accumulated.retry;
507 70 : metric.observe(
508 70 : ComputeConnectionLatencyGroup {
509 70 : protocol: self.protocol,
510 70 : cold_start_info: self.cold_start_info,
511 70 : outcome: self.outcome,
512 70 : excluded: LatencyExclusions::ClientCplaneComputeRetry,
513 70 : },
514 70 : duration.saturating_sub(accumulated_total).as_secs_f64(),
515 70 : );
516 70 : }
517 : }
518 :
519 : impl From<bool> for Bool {
520 3 : fn from(value: bool) -> Self {
521 3 : if value {
522 2 : Bool::True
523 : } else {
524 1 : Bool::False
525 : }
526 3 : }
527 : }
528 :
529 225 : #[derive(LabelGroup)]
530 : #[label(set = InvalidEndpointsSet)]
531 : pub struct InvalidEndpointsGroup {
532 : pub protocol: Protocol,
533 : pub rejected: Bool,
534 : pub outcome: ConnectOutcome,
535 : }
536 :
537 180 : #[derive(LabelGroup)]
538 : #[label(set = RetriesMetricSet)]
539 : pub struct RetriesMetricGroup {
540 : pub outcome: ConnectOutcome,
541 : pub retry_type: RetryType,
542 : }
543 :
544 : #[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
545 : pub enum RetryType {
546 : WakeCompute,
547 : ConnectToCompute,
548 : }
549 :
550 0 : #[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
551 : #[label(singleton = "event")]
552 : pub enum RedisEventsCount {
553 : EndpointCreated,
554 : BranchCreated,
555 : ProjectCreated,
556 : CancelSession,
557 : PasswordUpdate,
558 : AllowedIpsUpdate,
559 : }
560 :
/// Label set for thread pool workers; the wrapped number is reported as the
/// set's cardinality.
pub struct ThreadPoolWorkers(usize);

/// Label value identifying one thread pool worker by index.
#[derive(Clone, Copy)]
pub struct ThreadPoolWorkerId(pub usize);
564 :
565 : impl LabelValue for ThreadPoolWorkerId {
566 0 : fn visit<V: measured::label::LabelVisitor>(&self, v: V) -> V::Output {
567 0 : v.write_int(self.0 as i64)
568 0 : }
569 : }
570 :
571 : impl LabelGroup for ThreadPoolWorkerId {
572 0 : fn visit_values(&self, v: &mut impl measured::label::LabelGroupVisitor) {
573 0 : v.write_value(LabelName::from_str("worker"), self);
574 0 : }
575 : }
576 :
577 : impl LabelGroupSet for ThreadPoolWorkers {
578 : type Group<'a> = ThreadPoolWorkerId;
579 :
580 102 : fn cardinality(&self) -> Option<usize> {
581 102 : Some(self.0)
582 102 : }
583 :
584 8 : fn encode_dense(&self, value: Self::Unique) -> Option<usize> {
585 8 : Some(value)
586 8 : }
587 :
588 0 : fn decode_dense(&self, value: usize) -> Self::Group<'_> {
589 0 : ThreadPoolWorkerId(value)
590 0 : }
591 :
592 : type Unique = usize;
593 :
594 8 : fn encode(&self, value: Self::Group<'_>) -> Option<Self::Unique> {
595 8 : Some(value.0)
596 8 : }
597 :
598 0 : fn decode(&self, value: &Self::Unique) -> Self::Group<'_> {
599 0 : ThreadPoolWorkerId(*value)
600 0 : }
601 : }
602 :
603 : impl LabelSet for ThreadPoolWorkers {
604 : type Value<'a> = ThreadPoolWorkerId;
605 :
606 0 : fn dynamic_cardinality(&self) -> Option<usize> {
607 0 : Some(self.0)
608 0 : }
609 :
610 0 : fn encode(&self, value: Self::Value<'_>) -> Option<usize> {
611 0 : (value.0 < self.0).then_some(value.0)
612 0 : }
613 :
614 0 : fn decode(&self, value: usize) -> Self::Value<'_> {
615 0 : ThreadPoolWorkerId(value)
616 0 : }
617 : }
618 :
619 : impl FixedCardinalitySet for ThreadPoolWorkers {
620 0 : fn cardinality(&self) -> usize {
621 0 : self.0
622 0 : }
623 : }
624 :
625 51 : #[derive(MetricGroup)]
626 : #[metric(new(workers: usize))]
627 : pub struct ThreadPoolMetrics {
628 : #[metric(init = CounterVec::with_label_set(ThreadPoolWorkers(workers)))]
629 : pub worker_task_turns_total: CounterVec<ThreadPoolWorkers>,
630 : #[metric(init = CounterVec::with_label_set(ThreadPoolWorkers(workers)))]
631 : pub worker_task_skips_total: CounterVec<ThreadPoolWorkers>,
632 : }
|