Line data Source code
1 : use std::sync::{Arc, OnceLock};
2 :
3 : use lasso::ThreadedRodeo;
4 : use measured::label::{
5 : FixedCardinalitySet, LabelGroupSet, LabelName, LabelSet, LabelValue, StaticLabelSet,
6 : };
7 : use measured::metric::histogram::Thresholds;
8 : use measured::metric::name::MetricName;
9 : use measured::{
10 : Counter, CounterVec, FixedCardinalityLabel, Gauge, Histogram, HistogramVec, LabelGroup,
11 : MetricGroup,
12 : };
13 : use metrics::{CounterPairAssoc, CounterPairVec, HyperLogLog, HyperLogLogVec};
14 : use tokio::time::{self, Instant};
15 :
16 : use crate::control_plane::messages::ColdStartInfo;
17 : use crate::error::ErrorKind;
18 :
/// Root metric group of the proxy.
///
/// Constructed from a shared [`ThreadPoolMetrics`] handle, which is passed on
/// to [`ProxyMetrics::new`].
#[derive(MetricGroup)]
#[metric(new(thread_pool: Arc<ThreadPoolMetrics>))]
pub struct Metrics {
    // Proxy-wide metrics, placed under the `proxy` namespace.
    #[metric(namespace = "proxy")]
    #[metric(init = ProxyMetrics::new(thread_pool))]
    pub proxy: ProxyMetrics,

    // API-lock metrics placed under the `wake_compute_lock` namespace.
    #[metric(namespace = "wake_compute_lock")]
    pub wake_compute_lock: ApiLockMetrics,
}
29 :
30 : static SELF: OnceLock<Metrics> = OnceLock::new();
31 : impl Metrics {
32 0 : pub fn install(thread_pool: Arc<ThreadPoolMetrics>) {
33 0 : SELF.set(Metrics::new(thread_pool))
34 0 : .ok()
35 0 : .expect("proxy metrics must not be installed more than once");
36 0 : }
37 :
38 143 : pub fn get() -> &'static Self {
39 143 : #[cfg(test)]
40 143 : return SELF.get_or_init(|| Metrics::new(Arc::new(ThreadPoolMetrics::new(0))));
41 143 :
42 143 : #[cfg(not(test))]
43 143 : SELF.get()
44 143 : .expect("proxy metrics must be installed by the main() function")
45 143 : }
46 : }
47 :
/// Metrics stored in the `proxy` field of [`Metrics`].
///
/// NOTE: the `///` comments on the fields below are left untouched on purpose;
/// presumably the `MetricGroup` derive exposes them as metric help text —
/// confirm before rewording any of them.
#[derive(MetricGroup)]
#[metric(new(thread_pool: Arc<ThreadPoolMetrics>))]
pub struct ProxyMetrics {
    // Opened/closed database connection counter pair (see `NumDbConnectionsGauge`).
    #[metric(flatten)]
    pub db_connections: CounterPairVec<NumDbConnectionsGauge>,
    // Opened/closed client connection counter pair (see `NumClientConnectionsGauge`).
    #[metric(flatten)]
    pub client_connections: CounterPairVec<NumClientConnectionsGauge>,
    // Accepted/closed connection counter pair (see `NumConnectionRequestsGauge`).
    #[metric(flatten)]
    pub connection_requests: CounterPairVec<NumConnectionRequestsGauge>,
    // Registered/unregistered HTTP endpoint pool counters.
    #[metric(flatten)]
    pub http_endpoint_pools: HttpEndpointPools,
    // Cancellation channel message counter pair (see `CancelChannelSizeGauge`).
    #[metric(flatten)]
    pub cancel_channel_size: CounterPairVec<CancelChannelSizeGauge>,

    /// Time it took for proxy to establish a connection to the compute endpoint.
    // largest bucket = 2^16 * 0.5ms = 32s
    #[metric(metadata = Thresholds::exponential_buckets(0.0005, 2.0))]
    pub compute_connection_latency_seconds: HistogramVec<ComputeConnectionLatencySet, 16>,

    /// Time it took for proxy to receive a response from control plane.
    #[metric(
        // largest bucket = 2^16 * 0.2ms = 13s
        metadata = Thresholds::exponential_buckets(0.0002, 2.0),
    )]
    pub console_request_latency: HistogramVec<ConsoleRequestSet, 16>,

    /// Time it takes to acquire a token to call console plane.
    // largest bucket = 3^16 * 0.05ms ≈ 2152s (~36min).
    // NOTE(review): this note used to say 2.15s, which does not match the
    // arithmetic for start = 0.00005s and factor = 3 — confirm the intended range.
    #[metric(metadata = Thresholds::exponential_buckets(0.00005, 3.0))]
    pub control_plane_token_acquire_seconds: Histogram<16>,

    /// Size of the HTTP request body lengths.
    // smallest bucket = 16 bytes
    // largest bucket = 4^12 * 16 bytes = 256MB
    #[metric(metadata = Thresholds::exponential_buckets(16.0, 4.0))]
    pub http_conn_content_length_bytes: HistogramVec<StaticLabelSet<HttpDirection>, 12>,

    /// Time it takes to reclaim unused connection pools.
    #[metric(metadata = Thresholds::exponential_buckets(1e-6, 2.0))]
    pub http_pool_reclaimation_lag_seconds: Histogram<16>,

    /// Number of opened connections to a database.
    pub http_pool_opened_connections: Gauge,

    /// Number of cache hits/misses for allowed ips.
    pub allowed_ips_cache_misses: CounterVec<StaticLabelSet<CacheOutcome>>,

    /// Number of allowed ips
    #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0]))]
    pub allowed_ips_number: Histogram<10>,

    /// Number of connections (per sni).
    pub accepted_connections_by_sni: CounterVec<StaticLabelSet<SniKind>>,

    /// Number of connection failures (per kind).
    pub connection_failures_total: CounterVec<StaticLabelSet<ConnectionFailureKind>>,

    /// Number of wake-up failures (per kind).
    pub connection_failures_breakdown: CounterVec<ConnectionFailuresBreakdownSet>,

    /// Number of bytes sent/received between all clients and backends.
    pub io_bytes: CounterVec<StaticLabelSet<Direction>>,

    /// Number of errors by a given classification.
    pub errors_total: CounterVec<StaticLabelSet<crate::error::ErrorKind>>,

    /// Number of cancellation requests (per found/not_found).
    pub cancellation_requests_total: CounterVec<CancellationRequestSet>,

    /// Number of errors by a given classification
    pub redis_errors_total: CounterVec<RedisErrorsSet>,

    /// Number of TLS handshake failures
    pub tls_handshake_failures: Counter,

    /// Number of connection requests affected by authentication rate limits
    pub requests_auth_rate_limits_total: Counter,

    /// HLL approximate cardinality of endpoints that are connecting
    pub connecting_endpoints: HyperLogLogVec<StaticLabelSet<Protocol>, 32>,

    /// Number of endpoints affected by errors of a given classification
    pub endpoints_affected_by_errors: HyperLogLogVec<StaticLabelSet<crate::error::ErrorKind>, 32>,

    /// Number of endpoints affected by authentication rate limits
    pub endpoints_auth_rate_limits: HyperLogLog<32>,

    /// Number of invalid endpoints (per protocol, per rejected).
    pub invalid_endpoints_total: CounterVec<InvalidEndpointsSet>,

    /// Number of retries (per outcome, per retry_type).
    #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]))]
    pub retries_metric: HistogramVec<RetriesMetricSet, 9>,

    /// Number of events consumed from redis (per event type).
    pub redis_events_count: CounterVec<StaticLabelSet<RedisEventsCount>>,

    // API-lock metrics placed under the `connect_compute_lock` namespace.
    #[metric(namespace = "connect_compute_lock")]
    pub connect_compute_lock: ApiLockMetrics,

    // Thread-pool metrics shared via `Arc`; initialised from the `thread_pool`
    // constructor argument and placed under the `scram_pool` namespace.
    #[metric(namespace = "scram_pool")]
    #[metric(init = thread_pool)]
    pub scram_pool: Arc<ThreadPoolMetrics>,
}
152 :
/// Metrics describing one API lock: semaphore registration counters plus
/// reclamation-lag and acquire-time histograms.
///
/// Used for both `wake_compute_lock` (in [`Metrics`]) and
/// `connect_compute_lock` (in [`ProxyMetrics`]).
#[derive(MetricGroup)]
#[metric(new())]
pub struct ApiLockMetrics {
    /// Number of semaphores registered in this api lock
    pub semaphores_registered: Counter,
    /// Number of semaphores unregistered in this api lock
    pub semaphores_unregistered: Counter,
    /// Time it takes to reclaim unused semaphores in the api lock
    // 16 exponential buckets starting at 1µs, doubling each step.
    #[metric(metadata = Thresholds::exponential_buckets(1e-6, 2.0))]
    pub reclamation_lag_seconds: Histogram<16>,
    /// Time it takes to acquire a semaphore lock
    // 16 exponential buckets starting at 0.1ms, doubling each step.
    #[metric(metadata = Thresholds::exponential_buckets(1e-4, 2.0))]
    pub semaphore_acquire_seconds: Histogram<16>,
}
167 :
168 : impl Default for ApiLockMetrics {
169 88 : fn default() -> Self {
170 88 : Self::new()
171 88 : }
172 : }
173 :
/// Label for `http_conn_content_length_bytes`, telling request bodies apart
/// from response bodies. Declared with the singleton label name `direction`.
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "direction")]
pub enum HttpDirection {
    Request,
    Response,
}
180 :
/// Label for the `io_bytes` counter. Declared with the singleton label name
/// `direction`.
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "direction")]
pub enum Direction {
    // presumably "transmitted" — confirm against the call sites
    Tx,
    // presumably "received" — confirm against the call sites
    Rx,
}
187 :
/// Protocol label attached to connection metrics and to [`LatencyTimer`].
/// Declared with the singleton label name `protocol`.
///
/// [`Protocol::as_str`] gives the textual form of each variant.
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
#[label(singleton = "protocol")]
pub enum Protocol {
    Http,
    Ws,
    Tcp,
    SniRouter,
}
196 :
197 : impl Protocol {
198 0 : pub fn as_str(&self) -> &'static str {
199 0 : match self {
200 0 : Protocol::Http => "http",
201 0 : Protocol::Ws => "ws",
202 0 : Protocol::Tcp => "tcp",
203 0 : Protocol::SniRouter => "sni_router",
204 : }
205 0 : }
206 : }
207 :
208 : impl std::fmt::Display for Protocol {
209 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
210 0 : f.write_str(self.as_str())
211 0 : }
212 : }
213 :
/// Boolean label value; convertible from `bool` via `From<bool>`.
///
/// Used for the `retry` and `rejected` labels of the label groups in this file.
#[derive(FixedCardinalityLabel, Copy, Clone)]
pub enum Bool {
    True,
    False,
}
219 :
/// Success/failure label. Declared with the singleton label name `outcome`.
///
/// Not referenced by any metric in the visible part of this file.
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "outcome")]
pub enum Outcome {
    Success,
    Failed,
}
226 :
/// Cache hit/miss label for `allowed_ips_cache_misses`. Declared with the
/// singleton label name `outcome`.
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "outcome")]
pub enum CacheOutcome {
    Hit,
    Miss,
}
233 :
/// Label group for `console_request_latency`; its label set type is
/// `ConsoleRequestSet`.
#[derive(LabelGroup)]
#[label(set = ConsoleRequestSet)]
pub struct ConsoleRequest<'a> {
    // Dynamic string label backed by a `ThreadedRodeo`
    // (presumably interned on first use — confirm against `measured`).
    #[label(dynamic_with = ThreadedRodeo, default)]
    pub request: &'a str,
}
240 :
/// Pair of counters tracking how many HTTP endpoint pools were registered and
/// unregistered. [`HttpEndpointPools::guard`] bumps the first counter and
/// returns a guard that bumps the second one on drop.
#[derive(MetricGroup, Default)]
pub struct HttpEndpointPools {
    /// Number of endpoints we have registered pools for
    pub http_pool_endpoints_registered_total: Counter,
    /// Number of endpoints we have unregistered pools for
    pub http_pool_endpoints_unregistered_total: Counter,
}
248 :
/// Guard handed out by [`HttpEndpointPools::guard`].
///
/// Its `Drop` implementation increments the counter it borrows.
pub struct HttpEndpointPoolsGuard<'a> {
    /// Counter incremented on drop. Despite the field name it is a
    /// monotonically increasing "unregistered" counter, not a decrement.
    dec: &'a Counter,
}
252 :
253 : impl Drop for HttpEndpointPoolsGuard<'_> {
254 2 : fn drop(&mut self) {
255 2 : self.dec.inc();
256 2 : }
257 : }
258 :
259 : impl HttpEndpointPools {
260 2 : pub fn guard(&self) -> HttpEndpointPoolsGuard<'_> {
261 2 : self.http_pool_endpoints_registered_total.inc();
262 2 : HttpEndpointPoolsGuard {
263 2 : dec: &self.http_pool_endpoints_unregistered_total,
264 2 : }
265 2 : }
266 : }
/// Marker type describing the opened/closed database connection counter pair,
/// labelled by [`Protocol`].
pub struct NumDbConnectionsGauge;
impl CounterPairAssoc for NumDbConnectionsGauge {
    // Name and help text of the incrementing ("opened") counter.
    const INC_NAME: &'static MetricName = MetricName::from_str("opened_db_connections_total");
    // Name and help text of the matching "closed" counter.
    const DEC_NAME: &'static MetricName = MetricName::from_str("closed_db_connections_total");
    const INC_HELP: &'static str = "Number of opened connections to a database.";
    const DEC_HELP: &'static str = "Number of closed connections to a database.";
    type LabelGroupSet = StaticLabelSet<Protocol>;
}
/// Counter-pair guard type specialised to [`NumDbConnectionsGauge`].
pub type NumDbConnectionsGuard<'a> = metrics::MeasuredCounterPairGuard<'a, NumDbConnectionsGauge>;
276 :
/// Marker type describing the opened/closed client connection counter pair,
/// labelled by [`Protocol`].
pub struct NumClientConnectionsGauge;
impl CounterPairAssoc for NumClientConnectionsGauge {
    // Name and help text of the incrementing ("opened") counter.
    const INC_NAME: &'static MetricName = MetricName::from_str("opened_client_connections_total");
    // Name and help text of the matching "closed" counter.
    const DEC_NAME: &'static MetricName = MetricName::from_str("closed_client_connections_total");
    const INC_HELP: &'static str = "Number of opened connections from a client.";
    const DEC_HELP: &'static str = "Number of closed connections from a client.";
    type LabelGroupSet = StaticLabelSet<Protocol>;
}
/// Counter-pair guard type specialised to [`NumClientConnectionsGauge`].
pub type NumClientConnectionsGuard<'a> =
    metrics::MeasuredCounterPairGuard<'a, NumClientConnectionsGauge>;
287 :
/// Marker type describing the accepted/closed connection counter pair,
/// labelled by [`Protocol`].
pub struct NumConnectionRequestsGauge;
impl CounterPairAssoc for NumConnectionRequestsGauge {
    // Name and help text of the incrementing ("accepted") counter.
    const INC_NAME: &'static MetricName = MetricName::from_str("accepted_connections_total");
    // Name and help text of the matching "closed" counter.
    const DEC_NAME: &'static MetricName = MetricName::from_str("closed_connections_total");
    const INC_HELP: &'static str = "Number of client connections accepted.";
    const DEC_HELP: &'static str = "Number of client connections closed.";
    type LabelGroupSet = StaticLabelSet<Protocol>;
}
/// Counter-pair guard type specialised to [`NumConnectionRequestsGauge`].
pub type NumConnectionRequestsGuard<'a> =
    metrics::MeasuredCounterPairGuard<'a, NumConnectionRequestsGauge>;
298 :
/// Marker type describing the cancellation-channel message counter pair,
/// labelled by [`RedisMsgKind`].
pub struct CancelChannelSizeGauge;
impl CounterPairAssoc for CancelChannelSizeGauge {
    // Name and help text of the incrementing ("opened") counter.
    const INC_NAME: &'static MetricName = MetricName::from_str("opened_msgs_cancel_channel_total");
    // Name and help text of the matching "closed" counter.
    const DEC_NAME: &'static MetricName = MetricName::from_str("closed_msgs_cancel_channel_total");
    const INC_HELP: &'static str = "Number of processing messages in the cancellation channel.";
    const DEC_HELP: &'static str = "Number of closed messages in the cancellation channel.";
    type LabelGroupSet = StaticLabelSet<RedisMsgKind>;
}
/// Counter-pair guard type specialised to [`CancelChannelSizeGauge`].
pub type CancelChannelSizeGuard<'a> = metrics::MeasuredCounterPairGuard<'a, CancelChannelSizeGauge>;
308 :
/// Label group for `compute_connection_latency_seconds`; its label set type is
/// `ComputeConnectionLatencySet`.
///
/// Built in `LatencyTimer`'s `Drop` implementation, once per
/// [`LatencyExclusions`] variant.
#[derive(LabelGroup)]
#[label(set = ComputeConnectionLatencySet)]
pub struct ComputeConnectionLatencyGroup {
    protocol: Protocol,
    cold_start_info: ColdStartInfo,
    outcome: ConnectOutcome,
    // Which waiting periods were subtracted from the observed duration.
    excluded: LatencyExclusions,
}
317 :
/// Identifies which accumulated waiting periods were subtracted from a latency
/// observation. Each variant excludes everything the previous one does plus
/// one more component, as done in `LatencyTimer`'s `Drop` implementation.
#[derive(FixedCardinalityLabel, Copy, Clone)]
pub enum LatencyExclusions {
    // Client time excluded.
    Client,
    // Client and control-plane time excluded.
    ClientAndCplane,
    // Client, control-plane and compute time excluded.
    ClientCplaneCompute,
    // Client, control-plane, compute and retry time excluded.
    ClientCplaneComputeRetry,
}
325 :
/// Label for `accepted_connections_by_sni`. Declared with the singleton label
/// name `kind`.
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "kind")]
pub enum SniKind {
    Sni,
    NoSni,
    PasswordHack,
}
333 :
/// Label for `connection_failures_total`. Declared with the singleton label
/// name `kind`.
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "kind")]
pub enum ConnectionFailureKind {
    ComputeCached,
    ComputeUncached,
}
340 :
/// Label group for `connection_failures_breakdown`; its label set type is
/// `ConnectionFailuresBreakdownSet`.
#[derive(LabelGroup)]
#[label(set = ConnectionFailuresBreakdownSet)]
pub struct ConnectionFailuresBreakdownGroup {
    // Error classification of the failure.
    pub kind: ErrorKind,
    // presumably whether the failure is retried — confirm against the call sites
    pub retry: Bool,
}
347 :
/// Label group for `redis_errors_total`; its label set type is
/// `RedisErrorsSet`.
#[derive(LabelGroup, Copy, Clone)]
#[label(set = RedisErrorsSet)]
pub struct RedisErrors<'a> {
    // Dynamic string label backed by a `ThreadedRodeo`
    // (presumably interned on first use — confirm against `measured`).
    #[label(dynamic_with = ThreadedRodeo, default)]
    pub channel: &'a str,
}
354 :
/// Outcome label of a cancellation request; carried as the `kind` field of
/// [`CancellationRequest`].
#[derive(FixedCardinalityLabel, Copy, Clone)]
pub enum CancellationOutcome {
    NotFound,
    Found,
    RateLimitExceeded,
}
361 :
/// Label group for `cancellation_requests_total`; its label set type is
/// `CancellationRequestSet`.
#[derive(LabelGroup)]
#[label(set = CancellationRequestSet)]
pub struct CancellationRequest {
    pub kind: CancellationOutcome,
}
367 :
/// What a [`LatencyTimer`] was waiting on during a paused interval.
///
/// `LatencyTimer::unpause` uses this to choose which accumulator receives the
/// elapsed time.
#[derive(Clone, Copy)]
pub enum Waiting {
    /// Time is added to the control-plane accumulator.
    Cplane,
    /// Time is added to the client accumulator.
    Client,
    /// Time is added to the compute accumulator.
    Compute,
    /// Time is added to the retry accumulator.
    RetryTimeout,
}
375 :
/// Label for the cancellation-channel counter pair
/// ([`CancelChannelSizeGauge`]). Declared with the singleton label name `kind`.
///
/// The variant names look like Redis hash commands — confirm against the
/// code that sends these messages.
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "kind")]
pub enum RedisMsgKind {
    HSet,
    HSetMultiple,
    HGet,
    HGetAll,
    HDel,
}
385 :
/// Per-component waiting time collected by a [`LatencyTimer`].
///
/// Every field starts at zero (`Default`) and grows in
/// `LatencyTimer::unpause` according to the [`Waiting`] variant passed in.
#[derive(Default)]
struct Accumulated {
    /// Time attributed to `Waiting::Cplane`.
    cplane: time::Duration,
    /// Time attributed to `Waiting::Client`.
    client: time::Duration,
    /// Time attributed to `Waiting::Compute`.
    compute: time::Duration,
    /// Time attributed to `Waiting::RetryTimeout`.
    retry: time::Duration,
}
393 :
/// Stopwatch for the latency of establishing a compute connection.
///
/// When dropped (unless `skip_reporting` is set) it records observations into
/// `compute_connection_latency_seconds`, one per [`LatencyExclusions`] variant.
pub struct LatencyTimer {
    // instant at which the stopwatch was started
    start: time::Instant,
    // instant at which the stopwatch was stopped; `None` until `success()` is
    // called, in which case the drop time is used instead
    stop: Option<time::Instant>,
    // accumulated time on the stopwatch
    accumulated: Accumulated,
    // label data
    protocol: Protocol,
    cold_start_info: ColdStartInfo,
    outcome: ConnectOutcome,

    // when true, dropping the timer records nothing (see `noop`)
    skip_reporting: bool,
}
408 :
409 : impl LatencyTimer {
410 70 : pub fn new(protocol: Protocol) -> Self {
411 70 : Self {
412 70 : start: time::Instant::now(),
413 70 : stop: None,
414 70 : accumulated: Accumulated::default(),
415 70 : protocol,
416 70 : cold_start_info: ColdStartInfo::Unknown,
417 70 : // assume failed unless otherwise specified
418 70 : outcome: ConnectOutcome::Failed,
419 70 : skip_reporting: false,
420 70 : }
421 70 : }
422 :
423 0 : pub(crate) fn noop(protocol: Protocol) -> Self {
424 0 : Self {
425 0 : start: time::Instant::now(),
426 0 : stop: None,
427 0 : accumulated: Accumulated::default(),
428 0 : protocol,
429 0 : cold_start_info: ColdStartInfo::Unknown,
430 0 : // assume failed unless otherwise specified
431 0 : outcome: ConnectOutcome::Failed,
432 0 : skip_reporting: true,
433 0 : }
434 0 : }
435 :
436 28 : pub fn unpause(&mut self, start: Instant, waiting_for: Waiting) {
437 28 : let dur = start.elapsed();
438 28 : match waiting_for {
439 0 : Waiting::Cplane => self.accumulated.cplane += dur,
440 15 : Waiting::Client => self.accumulated.client += dur,
441 7 : Waiting::Compute => self.accumulated.compute += dur,
442 6 : Waiting::RetryTimeout => self.accumulated.retry += dur,
443 : }
444 28 : }
445 :
446 0 : pub fn cold_start_info(&mut self, cold_start_info: ColdStartInfo) {
447 0 : self.cold_start_info = cold_start_info;
448 0 : }
449 :
450 4 : pub fn success(&mut self) {
451 4 : // stop the stopwatch and record the time that we have accumulated
452 4 : self.stop = Some(time::Instant::now());
453 4 :
454 4 : // success
455 4 : self.outcome = ConnectOutcome::Success;
456 4 : }
457 : }
458 :
/// Outcome label of a connection attempt.
///
/// [`LatencyTimer`] starts with `Failed` and switches to `Success` in
/// `LatencyTimer::success`.
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
pub enum ConnectOutcome {
    Success,
    Failed,
}
464 :
465 : impl Drop for LatencyTimer {
466 70 : fn drop(&mut self) {
467 70 : if self.skip_reporting {
468 0 : return;
469 70 : }
470 70 :
471 70 : let duration = self
472 70 : .stop
473 70 : .unwrap_or_else(time::Instant::now)
474 70 : .duration_since(self.start);
475 70 :
476 70 : let metric = &Metrics::get().proxy.compute_connection_latency_seconds;
477 70 :
478 70 : // Excluding client communication from the accumulated time.
479 70 : metric.observe(
480 70 : ComputeConnectionLatencyGroup {
481 70 : protocol: self.protocol,
482 70 : cold_start_info: self.cold_start_info,
483 70 : outcome: self.outcome,
484 70 : excluded: LatencyExclusions::Client,
485 70 : },
486 70 : duration
487 70 : .saturating_sub(self.accumulated.client)
488 70 : .as_secs_f64(),
489 70 : );
490 70 :
491 70 : // Exclude client and cplane communication from the accumulated time.
492 70 : let accumulated_total = self.accumulated.client + self.accumulated.cplane;
493 70 : metric.observe(
494 70 : ComputeConnectionLatencyGroup {
495 70 : protocol: self.protocol,
496 70 : cold_start_info: self.cold_start_info,
497 70 : outcome: self.outcome,
498 70 : excluded: LatencyExclusions::ClientAndCplane,
499 70 : },
500 70 : duration.saturating_sub(accumulated_total).as_secs_f64(),
501 70 : );
502 70 :
503 70 : // Exclude client cplane, compue communication from the accumulated time.
504 70 : let accumulated_total =
505 70 : self.accumulated.client + self.accumulated.cplane + self.accumulated.compute;
506 70 : metric.observe(
507 70 : ComputeConnectionLatencyGroup {
508 70 : protocol: self.protocol,
509 70 : cold_start_info: self.cold_start_info,
510 70 : outcome: self.outcome,
511 70 : excluded: LatencyExclusions::ClientCplaneCompute,
512 70 : },
513 70 : duration.saturating_sub(accumulated_total).as_secs_f64(),
514 70 : );
515 70 :
516 70 : // Exclude client cplane, compue, retry communication from the accumulated time.
517 70 : let accumulated_total = self.accumulated.client
518 70 : + self.accumulated.cplane
519 70 : + self.accumulated.compute
520 70 : + self.accumulated.retry;
521 70 : metric.observe(
522 70 : ComputeConnectionLatencyGroup {
523 70 : protocol: self.protocol,
524 70 : cold_start_info: self.cold_start_info,
525 70 : outcome: self.outcome,
526 70 : excluded: LatencyExclusions::ClientCplaneComputeRetry,
527 70 : },
528 70 : duration.saturating_sub(accumulated_total).as_secs_f64(),
529 70 : );
530 70 : }
531 : }
532 :
533 : impl From<bool> for Bool {
534 3 : fn from(value: bool) -> Self {
535 3 : if value {
536 2 : Bool::True
537 : } else {
538 1 : Bool::False
539 : }
540 3 : }
541 : }
542 :
/// Label group for `invalid_endpoints_total`; its label set type is
/// `InvalidEndpointsSet`.
#[derive(LabelGroup)]
#[label(set = InvalidEndpointsSet)]
pub struct InvalidEndpointsGroup {
    pub protocol: Protocol,
    pub rejected: Bool,
    pub outcome: ConnectOutcome,
}
550 :
/// Label group for the `retries_metric` histogram; its label set type is
/// `RetriesMetricSet`.
#[derive(LabelGroup)]
#[label(set = RetriesMetricSet)]
pub struct RetriesMetricGroup {
    pub outcome: ConnectOutcome,
    pub retry_type: RetryType,
}
557 :
/// Kind of operation being retried; carried as the `retry_type` field of
/// [`RetriesMetricGroup`].
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
pub enum RetryType {
    WakeCompute,
    ConnectToCompute,
}
563 :
/// Event-type label for `redis_events_count`. Declared with the singleton
/// label name `event`.
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
#[label(singleton = "event")]
pub enum RedisEventsCount {
    EndpointCreated,
    BranchCreated,
    ProjectCreated,
    CancelSession,
    PasswordUpdate,
    AllowedIpsUpdate,
}
574 :
/// Label set for per-worker thread-pool metrics. The wrapped value is the
/// number of workers, which the trait impls in this file report as the set's
/// cardinality.
pub struct ThreadPoolWorkers(usize);
/// Label value identifying one worker by index; written under the label name
/// `worker`.
#[derive(Copy, Clone)]
pub struct ThreadPoolWorkerId(pub usize);
578 :
579 : impl LabelValue for ThreadPoolWorkerId {
580 0 : fn visit<V: measured::label::LabelVisitor>(&self, v: V) -> V::Output {
581 0 : v.write_int(self.0 as i64)
582 0 : }
583 : }
584 :
585 : impl LabelGroup for ThreadPoolWorkerId {
586 0 : fn visit_values(&self, v: &mut impl measured::label::LabelGroupVisitor) {
587 0 : v.write_value(LabelName::from_str("worker"), self);
588 0 : }
589 : }
590 :
591 : impl LabelGroupSet for ThreadPoolWorkers {
592 : type Group<'a> = ThreadPoolWorkerId;
593 :
594 100 : fn cardinality(&self) -> Option<usize> {
595 100 : Some(self.0)
596 100 : }
597 :
598 8 : fn encode_dense(&self, value: Self::Unique) -> Option<usize> {
599 8 : Some(value)
600 8 : }
601 :
602 0 : fn decode_dense(&self, value: usize) -> Self::Group<'_> {
603 0 : ThreadPoolWorkerId(value)
604 0 : }
605 :
606 : type Unique = usize;
607 :
608 8 : fn encode(&self, value: Self::Group<'_>) -> Option<Self::Unique> {
609 8 : Some(value.0)
610 8 : }
611 :
612 0 : fn decode(&self, value: &Self::Unique) -> Self::Group<'_> {
613 0 : ThreadPoolWorkerId(*value)
614 0 : }
615 : }
616 :
617 : impl LabelSet for ThreadPoolWorkers {
618 : type Value<'a> = ThreadPoolWorkerId;
619 :
620 0 : fn dynamic_cardinality(&self) -> Option<usize> {
621 0 : Some(self.0)
622 0 : }
623 :
624 0 : fn encode(&self, value: Self::Value<'_>) -> Option<usize> {
625 0 : (value.0 < self.0).then_some(value.0)
626 0 : }
627 :
628 0 : fn decode(&self, value: usize) -> Self::Value<'_> {
629 0 : ThreadPoolWorkerId(value)
630 0 : }
631 : }
632 :
633 : impl FixedCardinalitySet for ThreadPoolWorkers {
634 0 : fn cardinality(&self) -> usize {
635 0 : self.0
636 0 : }
637 : }
638 :
/// Per-worker counters for a thread pool.
///
/// Both counter vectors are created with a [`ThreadPoolWorkers`] label set
/// built from the `workers` constructor argument.
#[derive(MetricGroup)]
#[metric(new(workers: usize))]
pub struct ThreadPoolMetrics {
    // Per-worker counter; presumably counts task turns executed — confirm
    // against the thread pool implementation.
    #[metric(init = CounterVec::with_label_set(ThreadPoolWorkers(workers)))]
    pub worker_task_turns_total: CounterVec<ThreadPoolWorkers>,
    // Per-worker counter; presumably counts skipped task turns — confirm
    // against the thread pool implementation.
    #[metric(init = CounterVec::with_label_set(ThreadPoolWorkers(workers)))]
    pub worker_task_skips_total: CounterVec<ThreadPoolWorkers>,
}
|