Line data Source code
1 : use std::sync::{Arc, OnceLock};
2 :
3 : use lasso::ThreadedRodeo;
4 : use measured::label::{
5 : FixedCardinalitySet, LabelGroupSet, LabelName, LabelSet, LabelValue, StaticLabelSet,
6 : };
7 : use measured::metric::histogram::Thresholds;
8 : use measured::metric::name::MetricName;
9 : use measured::{
10 : Counter, CounterVec, FixedCardinalityLabel, Gauge, Histogram, HistogramVec, LabelGroup,
11 : MetricGroup,
12 : };
13 : use metrics::{CounterPairAssoc, CounterPairVec, HyperLogLogVec};
14 : use tokio::time::{self, Instant};
15 :
16 : use crate::control_plane::messages::ColdStartInfo;
17 : use crate::error::ErrorKind;
18 :
19 : #[derive(MetricGroup)]
20 : #[metric(new(thread_pool: Arc<ThreadPoolMetrics>))]
21 : pub struct Metrics {
22 : #[metric(namespace = "proxy")]
23 : #[metric(init = ProxyMetrics::new(thread_pool))]
24 : pub proxy: ProxyMetrics,
25 :
26 : #[metric(namespace = "wake_compute_lock")]
27 : pub wake_compute_lock: ApiLockMetrics,
28 : }
29 :
30 : static SELF: OnceLock<Metrics> = OnceLock::new();
31 : impl Metrics {
32 0 : pub fn install(thread_pool: Arc<ThreadPoolMetrics>) {
33 0 : let mut metrics = Metrics::new(thread_pool);
34 :
35 0 : metrics.proxy.errors_total.init_all_dense();
36 0 : metrics.proxy.redis_errors_total.init_all_dense();
37 0 : metrics.proxy.redis_events_count.init_all_dense();
38 0 : metrics.proxy.retries_metric.init_all_dense();
39 0 : metrics.proxy.connection_failures_total.init_all_dense();
40 :
41 0 : SELF.set(metrics)
42 0 : .ok()
43 0 : .expect("proxy metrics must not be installed more than once");
44 0 : }
45 :
46 169 : pub fn get() -> &'static Self {
47 : #[cfg(test)]
48 169 : return SELF.get_or_init(|| Metrics::new(Arc::new(ThreadPoolMetrics::new(0))));
49 :
50 : #[cfg(not(test))]
51 0 : SELF.get()
52 0 : .expect("proxy metrics must be installed by the main() function")
53 169 : }
54 : }
55 :
56 : #[derive(MetricGroup)]
57 : #[metric(new(thread_pool: Arc<ThreadPoolMetrics>))]
58 : pub struct ProxyMetrics {
59 : #[metric(flatten)]
60 : pub db_connections: CounterPairVec<NumDbConnectionsGauge>,
61 : #[metric(flatten)]
62 : pub client_connections: CounterPairVec<NumClientConnectionsGauge>,
63 : #[metric(flatten)]
64 : pub connection_requests: CounterPairVec<NumConnectionRequestsGauge>,
65 : #[metric(flatten)]
66 : pub http_endpoint_pools: HttpEndpointPools,
67 : #[metric(flatten)]
68 : pub cancel_channel_size: CounterPairVec<CancelChannelSizeGauge>,
69 :
70 : /// Time it took for proxy to establish a connection to the compute endpoint.
71 : // largest bucket = 2^16 * 0.5ms = 32s
72 : #[metric(metadata = Thresholds::exponential_buckets(0.0005, 2.0))]
73 : pub compute_connection_latency_seconds: HistogramVec<ComputeConnectionLatencySet, 16>,
74 :
75 : /// Time it took for proxy to receive a response from control plane.
76 : #[metric(
77 : // largest bucket = 2^16 * 0.2ms = 13s
78 : metadata = Thresholds::exponential_buckets(0.0002, 2.0),
79 : )]
80 : pub console_request_latency: HistogramVec<ConsoleRequestSet, 16>,
81 :
82 : /// Size of the HTTP request body lengths.
83 : // smallest bucket = 16 bytes
84 : // largest bucket = 4^12 * 16 bytes = 256MB
85 : #[metric(metadata = Thresholds::exponential_buckets(16.0, 4.0))]
86 : pub http_conn_content_length_bytes: HistogramVec<StaticLabelSet<HttpDirection>, 12>,
87 :
88 : /// Time it takes to reclaim unused connection pools.
89 : #[metric(metadata = Thresholds::exponential_buckets(1e-6, 2.0))]
90 : pub http_pool_reclaimation_lag_seconds: Histogram<16>,
91 :
92 : /// Number of opened connections to a database.
93 : pub http_pool_opened_connections: Gauge,
94 :
95 : /// Number of allowed ips
96 : #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0]))]
97 : pub allowed_ips_number: Histogram<10>,
98 :
99 : /// Number of allowed VPC endpoints IDs
100 : #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0]))]
101 : pub allowed_vpc_endpoint_ids: Histogram<10>,
102 :
103 : /// Number of connections, by the method we used to determine the endpoint.
104 : pub accepted_connections_by_sni: CounterVec<SniSet>,
105 :
106 : /// Number of connection failures (per kind).
107 : pub connection_failures_total: CounterVec<StaticLabelSet<ConnectionFailureKind>>,
108 :
109 : /// Number of wake-up failures (per kind).
110 : pub connection_failures_breakdown: CounterVec<ConnectionFailuresBreakdownSet>,
111 :
112 : /// Number of bytes sent/received between all clients and backends.
113 : pub io_bytes: CounterVec<StaticLabelSet<Direction>>,
114 :
115 : /// Number of errors by a given classification.
116 : pub errors_total: CounterVec<StaticLabelSet<crate::error::ErrorKind>>,
117 :
118 : /// Number of cancellation requests (per found/not_found).
119 : pub cancellation_requests_total: CounterVec<CancellationRequestSet>,
120 :
121 : /// Number of errors by a given classification
122 : pub redis_errors_total: CounterVec<RedisErrorsSet>,
123 :
124 : /// Number of TLS handshake failures
125 : pub tls_handshake_failures: Counter,
126 :
127 : /// HLL approximate cardinality of endpoints that are connecting
128 : pub connecting_endpoints: HyperLogLogVec<StaticLabelSet<Protocol>, 32>,
129 :
130 : /// Number of endpoints affected by errors of a given classification
131 : pub endpoints_affected_by_errors: HyperLogLogVec<StaticLabelSet<crate::error::ErrorKind>, 32>,
132 :
133 : /// Number of retries (per outcome, per retry_type).
134 : #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]))]
135 : pub retries_metric: HistogramVec<RetriesMetricSet, 9>,
136 :
137 : /// Number of events consumed from redis (per event type).
138 : pub redis_events_count: CounterVec<StaticLabelSet<RedisEventsCount>>,
139 :
140 : #[metric(namespace = "connect_compute_lock")]
141 : pub connect_compute_lock: ApiLockMetrics,
142 :
143 : #[metric(namespace = "scram_pool")]
144 : #[metric(init = thread_pool)]
145 : pub scram_pool: Arc<ThreadPoolMetrics>,
146 : }
147 :
148 : #[derive(MetricGroup)]
149 : #[metric(new())]
150 : pub struct ApiLockMetrics {
151 : /// Number of semaphores registered in this api lock
152 : pub semaphores_registered: Counter,
153 : /// Number of semaphores unregistered in this api lock
154 : pub semaphores_unregistered: Counter,
155 : /// Time it takes to reclaim unused semaphores in the api lock
156 : #[metric(metadata = Thresholds::exponential_buckets(1e-6, 2.0))]
157 : pub reclamation_lag_seconds: Histogram<16>,
158 : /// Time it takes to acquire a semaphore lock
159 : #[metric(metadata = Thresholds::exponential_buckets(1e-4, 2.0))]
160 : pub semaphore_acquire_seconds: Histogram<16>,
161 : }
162 :
163 : impl Default for ApiLockMetrics {
164 102 : fn default() -> Self {
165 102 : Self::new()
166 102 : }
167 : }
168 :
169 : #[derive(FixedCardinalityLabel, Copy, Clone)]
170 : #[label(singleton = "direction")]
171 : pub enum HttpDirection {
172 : Request,
173 : Response,
174 : }
175 :
176 : #[derive(FixedCardinalityLabel, Copy, Clone)]
177 : #[label(singleton = "direction")]
178 : pub enum Direction {
179 : Tx,
180 : Rx,
181 : }
182 :
183 : #[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
184 : #[label(singleton = "protocol")]
185 : pub enum Protocol {
186 : Http,
187 : Ws,
188 : Tcp,
189 : SniRouter,
190 : }
191 :
192 : impl Protocol {
193 6 : pub fn as_str(self) -> &'static str {
194 6 : match self {
195 0 : Protocol::Http => "http",
196 0 : Protocol::Ws => "ws",
197 6 : Protocol::Tcp => "tcp",
198 0 : Protocol::SniRouter => "sni_router",
199 : }
200 6 : }
201 : }
202 :
203 : impl std::fmt::Display for Protocol {
204 6 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
205 6 : f.write_str(self.as_str())
206 6 : }
207 : }
208 :
209 : #[derive(FixedCardinalityLabel, Copy, Clone)]
210 : pub enum Bool {
211 : True,
212 : False,
213 : }
214 :
215 : #[derive(FixedCardinalityLabel, Copy, Clone)]
216 : #[label(singleton = "outcome")]
217 : pub enum CacheOutcome {
218 : Hit,
219 : Miss,
220 : }
221 :
222 : #[derive(LabelGroup)]
223 : #[label(set = ConsoleRequestSet)]
224 : pub struct ConsoleRequest<'a> {
225 : #[label(dynamic_with = ThreadedRodeo, default)]
226 : pub request: &'a str,
227 : }
228 :
229 : #[derive(MetricGroup, Default)]
230 : pub struct HttpEndpointPools {
231 : /// Number of endpoints we have registered pools for
232 : pub http_pool_endpoints_registered_total: Counter,
233 : /// Number of endpoints we have unregistered pools for
234 : pub http_pool_endpoints_unregistered_total: Counter,
235 : }
236 :
237 : pub struct HttpEndpointPoolsGuard<'a> {
238 : dec: &'a Counter,
239 : }
240 :
241 : impl Drop for HttpEndpointPoolsGuard<'_> {
242 2 : fn drop(&mut self) {
243 2 : self.dec.inc();
244 2 : }
245 : }
246 :
247 : impl HttpEndpointPools {
248 2 : pub fn guard(&self) -> HttpEndpointPoolsGuard<'_> {
249 2 : self.http_pool_endpoints_registered_total.inc();
250 2 : HttpEndpointPoolsGuard {
251 2 : dec: &self.http_pool_endpoints_unregistered_total,
252 2 : }
253 2 : }
254 : }
255 : pub struct NumDbConnectionsGauge;
256 : impl CounterPairAssoc for NumDbConnectionsGauge {
257 : const INC_NAME: &'static MetricName = MetricName::from_str("opened_db_connections_total");
258 : const DEC_NAME: &'static MetricName = MetricName::from_str("closed_db_connections_total");
259 : const INC_HELP: &'static str = "Number of opened connections to a database.";
260 : const DEC_HELP: &'static str = "Number of closed connections to a database.";
261 : type LabelGroupSet = StaticLabelSet<Protocol>;
262 : }
263 : pub type NumDbConnectionsGuard<'a> = metrics::MeasuredCounterPairGuard<'a, NumDbConnectionsGauge>;
264 :
265 : pub struct NumClientConnectionsGauge;
266 : impl CounterPairAssoc for NumClientConnectionsGauge {
267 : const INC_NAME: &'static MetricName = MetricName::from_str("opened_client_connections_total");
268 : const DEC_NAME: &'static MetricName = MetricName::from_str("closed_client_connections_total");
269 : const INC_HELP: &'static str = "Number of opened connections from a client.";
270 : const DEC_HELP: &'static str = "Number of closed connections from a client.";
271 : type LabelGroupSet = StaticLabelSet<Protocol>;
272 : }
273 : pub type NumClientConnectionsGuard<'a> =
274 : metrics::MeasuredCounterPairGuard<'a, NumClientConnectionsGauge>;
275 :
276 : pub struct NumConnectionRequestsGauge;
277 : impl CounterPairAssoc for NumConnectionRequestsGauge {
278 : const INC_NAME: &'static MetricName = MetricName::from_str("accepted_connections_total");
279 : const DEC_NAME: &'static MetricName = MetricName::from_str("closed_connections_total");
280 : const INC_HELP: &'static str = "Number of client connections accepted.";
281 : const DEC_HELP: &'static str = "Number of client connections closed.";
282 : type LabelGroupSet = StaticLabelSet<Protocol>;
283 : }
284 : pub type NumConnectionRequestsGuard<'a> =
285 : metrics::MeasuredCounterPairGuard<'a, NumConnectionRequestsGauge>;
286 :
287 : pub struct CancelChannelSizeGauge;
288 : impl CounterPairAssoc for CancelChannelSizeGauge {
289 : const INC_NAME: &'static MetricName = MetricName::from_str("opened_msgs_cancel_channel_total");
290 : const DEC_NAME: &'static MetricName = MetricName::from_str("closed_msgs_cancel_channel_total");
291 : const INC_HELP: &'static str = "Number of processing messages in the cancellation channel.";
292 : const DEC_HELP: &'static str = "Number of closed messages in the cancellation channel.";
293 : type LabelGroupSet = StaticLabelSet<RedisMsgKind>;
294 : }
295 : pub type CancelChannelSizeGuard<'a> = metrics::MeasuredCounterPairGuard<'a, CancelChannelSizeGauge>;
296 :
297 : #[derive(LabelGroup)]
298 : #[label(set = ComputeConnectionLatencySet)]
299 : pub struct ComputeConnectionLatencyGroup {
300 : protocol: Protocol,
301 : cold_start_info: ColdStartInfo,
302 : outcome: ConnectOutcome,
303 : excluded: LatencyExclusions,
304 : }
305 :
306 : #[derive(FixedCardinalityLabel, Copy, Clone)]
307 : pub enum LatencyExclusions {
308 : Client,
309 : ClientAndCplane,
310 : ClientCplaneCompute,
311 : ClientCplaneComputeRetry,
312 : }
313 :
314 : #[derive(LabelGroup)]
315 : #[label(set = SniSet)]
316 : pub struct SniGroup {
317 : pub protocol: Protocol,
318 : pub kind: SniKind,
319 : }
320 :
321 : #[derive(FixedCardinalityLabel, Copy, Clone)]
322 : pub enum SniKind {
323 : /// Domain name based routing. SNI for libpq/websockets. Host for HTTP
324 : Sni,
325 : /// Metadata based routing. `options` for libpq/websockets. Header for HTTP
326 : NoSni,
327 : /// Metadata based routing, using the password field.
328 : PasswordHack,
329 : }
330 :
331 : #[derive(FixedCardinalityLabel, Copy, Clone)]
332 : #[label(singleton = "kind")]
333 : pub enum ConnectionFailureKind {
334 : ComputeCached,
335 : ComputeUncached,
336 : }
337 :
338 : #[derive(LabelGroup)]
339 : #[label(set = ConnectionFailuresBreakdownSet)]
340 : pub struct ConnectionFailuresBreakdownGroup {
341 : pub kind: ErrorKind,
342 : pub retry: Bool,
343 : }
344 :
345 : #[derive(LabelGroup, Copy, Clone)]
346 : #[label(set = RedisErrorsSet)]
347 : pub struct RedisErrors<'a> {
348 : #[label(dynamic_with = ThreadedRodeo, default)]
349 : pub channel: &'a str,
350 : }
351 :
352 : #[derive(FixedCardinalityLabel, Copy, Clone)]
353 : pub enum CancellationOutcome {
354 : NotFound,
355 : Found,
356 : RateLimitExceeded,
357 : }
358 :
359 : #[derive(LabelGroup)]
360 : #[label(set = CancellationRequestSet)]
361 : pub struct CancellationRequest {
362 : pub kind: CancellationOutcome,
363 : }
364 :
365 : #[derive(Clone, Copy)]
366 : pub enum Waiting {
367 : Cplane,
368 : Client,
369 : Compute,
370 : RetryTimeout,
371 : }
372 :
373 : #[derive(FixedCardinalityLabel, Copy, Clone)]
374 : #[label(singleton = "kind")]
375 : #[allow(clippy::enum_variant_names)]
376 : pub enum RedisMsgKind {
377 : Set,
378 : Get,
379 : Expire,
380 : HGet,
381 : }
382 :
383 : #[derive(Default, Clone)]
384 : pub struct LatencyAccumulated {
385 : cplane: time::Duration,
386 : client: time::Duration,
387 : compute: time::Duration,
388 : retry: time::Duration,
389 : }
390 :
391 : impl std::fmt::Display for LatencyAccumulated {
392 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
393 0 : write!(
394 0 : f,
395 0 : "client: {}, cplane: {}, compute: {}, retry: {}",
396 0 : self.client.as_micros(),
397 0 : self.cplane.as_micros(),
398 0 : self.compute.as_micros(),
399 0 : self.retry.as_micros()
400 : )
401 0 : }
402 : }
403 :
404 : pub struct LatencyTimer {
405 : // time since the stopwatch was started
406 : start: time::Instant,
407 : // time since the stopwatch was stopped
408 : stop: Option<time::Instant>,
409 : // accumulated time on the stopwatch
410 : accumulated: LatencyAccumulated,
411 : // label data
412 : protocol: Protocol,
413 : cold_start_info: ColdStartInfo,
414 : outcome: ConnectOutcome,
415 :
416 : skip_reporting: bool,
417 : }
418 :
419 : impl LatencyTimer {
420 77 : pub fn new(protocol: Protocol) -> Self {
421 77 : Self {
422 77 : start: time::Instant::now(),
423 77 : stop: None,
424 77 : accumulated: LatencyAccumulated::default(),
425 77 : protocol,
426 77 : cold_start_info: ColdStartInfo::Unknown,
427 77 : // assume failed unless otherwise specified
428 77 : outcome: ConnectOutcome::Failed,
429 77 : skip_reporting: false,
430 77 : }
431 77 : }
432 :
433 0 : pub(crate) fn noop(protocol: Protocol) -> Self {
434 0 : Self {
435 0 : start: time::Instant::now(),
436 0 : stop: None,
437 0 : accumulated: LatencyAccumulated::default(),
438 0 : protocol,
439 0 : cold_start_info: ColdStartInfo::Unknown,
440 0 : // assume failed unless otherwise specified
441 0 : outcome: ConnectOutcome::Failed,
442 0 : skip_reporting: true,
443 0 : }
444 0 : }
445 :
446 52 : pub fn unpause(&mut self, start: Instant, waiting_for: Waiting) {
447 52 : let dur = start.elapsed();
448 52 : match waiting_for {
449 0 : Waiting::Cplane => self.accumulated.cplane += dur,
450 39 : Waiting::Client => self.accumulated.client += dur,
451 7 : Waiting::Compute => self.accumulated.compute += dur,
452 6 : Waiting::RetryTimeout => self.accumulated.retry += dur,
453 : }
454 52 : }
455 :
456 0 : pub fn cold_start_info(&mut self, cold_start_info: ColdStartInfo) {
457 0 : self.cold_start_info = cold_start_info;
458 0 : }
459 :
460 8 : pub fn success(&mut self) {
461 : // stop the stopwatch and record the time that we have accumulated
462 8 : self.stop = Some(time::Instant::now());
463 :
464 : // success
465 8 : self.outcome = ConnectOutcome::Success;
466 8 : }
467 :
468 0 : pub fn accumulated(&self) -> LatencyAccumulated {
469 0 : self.accumulated.clone()
470 0 : }
471 : }
472 :
473 : #[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
474 : pub enum ConnectOutcome {
475 : Success,
476 : Failed,
477 : }
478 :
479 : impl Drop for LatencyTimer {
480 77 : fn drop(&mut self) {
481 77 : if self.skip_reporting {
482 0 : return;
483 77 : }
484 :
485 77 : let duration = self
486 77 : .stop
487 77 : .unwrap_or_else(time::Instant::now)
488 77 : .duration_since(self.start);
489 :
490 77 : let metric = &Metrics::get().proxy.compute_connection_latency_seconds;
491 :
492 : // Excluding client communication from the accumulated time.
493 77 : metric.observe(
494 77 : ComputeConnectionLatencyGroup {
495 77 : protocol: self.protocol,
496 77 : cold_start_info: self.cold_start_info,
497 77 : outcome: self.outcome,
498 77 : excluded: LatencyExclusions::Client,
499 77 : },
500 77 : duration
501 77 : .saturating_sub(self.accumulated.client)
502 77 : .as_secs_f64(),
503 : );
504 :
505 : // Exclude client and cplane communication from the accumulated time.
506 77 : let accumulated_total = self.accumulated.client + self.accumulated.cplane;
507 77 : metric.observe(
508 77 : ComputeConnectionLatencyGroup {
509 77 : protocol: self.protocol,
510 77 : cold_start_info: self.cold_start_info,
511 77 : outcome: self.outcome,
512 77 : excluded: LatencyExclusions::ClientAndCplane,
513 77 : },
514 77 : duration.saturating_sub(accumulated_total).as_secs_f64(),
515 : );
516 :
517 : // Exclude client, cplane, compute communication from the accumulated time.
518 77 : let accumulated_total =
519 77 : self.accumulated.client + self.accumulated.cplane + self.accumulated.compute;
520 77 : metric.observe(
521 77 : ComputeConnectionLatencyGroup {
522 77 : protocol: self.protocol,
523 77 : cold_start_info: self.cold_start_info,
524 77 : outcome: self.outcome,
525 77 : excluded: LatencyExclusions::ClientCplaneCompute,
526 77 : },
527 77 : duration.saturating_sub(accumulated_total).as_secs_f64(),
528 : );
529 :
530 : // Exclude client, cplane, compute, retry communication from the accumulated time.
531 77 : let accumulated_total = self.accumulated.client
532 77 : + self.accumulated.cplane
533 77 : + self.accumulated.compute
534 77 : + self.accumulated.retry;
535 77 : metric.observe(
536 77 : ComputeConnectionLatencyGroup {
537 77 : protocol: self.protocol,
538 77 : cold_start_info: self.cold_start_info,
539 77 : outcome: self.outcome,
540 77 : excluded: LatencyExclusions::ClientCplaneComputeRetry,
541 77 : },
542 77 : duration.saturating_sub(accumulated_total).as_secs_f64(),
543 : );
544 77 : }
545 : }
546 :
547 : impl From<bool> for Bool {
548 3 : fn from(value: bool) -> Self {
549 3 : if value { Bool::True } else { Bool::False }
550 3 : }
551 : }
552 :
553 : #[derive(LabelGroup)]
554 : #[label(set = InvalidEndpointsSet)]
555 : pub struct InvalidEndpointsGroup {
556 : pub protocol: Protocol,
557 : pub rejected: Bool,
558 : pub outcome: ConnectOutcome,
559 : }
560 :
561 : #[derive(LabelGroup)]
562 : #[label(set = RetriesMetricSet)]
563 : pub struct RetriesMetricGroup {
564 : pub outcome: ConnectOutcome,
565 : pub retry_type: RetryType,
566 : }
567 :
568 : #[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
569 : pub enum RetryType {
570 : WakeCompute,
571 : ConnectToCompute,
572 : }
573 :
574 : #[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
575 : #[label(singleton = "event")]
576 : pub enum RedisEventsCount {
577 : EndpointCreated,
578 : BranchCreated,
579 : ProjectCreated,
580 : CancelSession,
581 : InvalidateRole,
582 : InvalidateEndpoint,
583 : InvalidateProject,
584 : InvalidateProjects,
585 : InvalidateOrg,
586 : }
587 :
588 : pub struct ThreadPoolWorkers(usize);
589 : #[derive(Copy, Clone)]
590 : pub struct ThreadPoolWorkerId(pub usize);
591 :
592 : impl LabelValue for ThreadPoolWorkerId {
593 0 : fn visit<V: measured::label::LabelVisitor>(&self, v: V) -> V::Output {
594 0 : v.write_int(self.0 as i64)
595 0 : }
596 : }
597 :
598 : impl LabelGroup for ThreadPoolWorkerId {
599 0 : fn visit_values(&self, v: &mut impl measured::label::LabelGroupVisitor) {
600 0 : v.write_value(LabelName::from_str("worker"), self);
601 0 : }
602 : }
603 :
604 : impl LabelGroupSet for ThreadPoolWorkers {
605 : type Group<'a> = ThreadPoolWorkerId;
606 :
607 114 : fn cardinality(&self) -> Option<usize> {
608 114 : Some(self.0)
609 114 : }
610 :
611 6 : fn encode_dense(&self, value: Self::Unique) -> Option<usize> {
612 6 : Some(value)
613 6 : }
614 :
615 0 : fn decode_dense(&self, value: usize) -> Self::Group<'_> {
616 0 : ThreadPoolWorkerId(value)
617 0 : }
618 :
619 : type Unique = usize;
620 :
621 6 : fn encode(&self, value: Self::Group<'_>) -> Option<Self::Unique> {
622 6 : Some(value.0)
623 6 : }
624 :
625 0 : fn decode(&self, value: &Self::Unique) -> Self::Group<'_> {
626 0 : ThreadPoolWorkerId(*value)
627 0 : }
628 : }
629 :
630 : impl LabelSet for ThreadPoolWorkers {
631 : type Value<'a> = ThreadPoolWorkerId;
632 :
633 0 : fn dynamic_cardinality(&self) -> Option<usize> {
634 0 : Some(self.0)
635 0 : }
636 :
637 0 : fn encode(&self, value: Self::Value<'_>) -> Option<usize> {
638 0 : (value.0 < self.0).then_some(value.0)
639 0 : }
640 :
641 0 : fn decode(&self, value: usize) -> Self::Value<'_> {
642 0 : ThreadPoolWorkerId(value)
643 0 : }
644 : }
645 :
646 : impl FixedCardinalitySet for ThreadPoolWorkers {
647 0 : fn cardinality(&self) -> usize {
648 0 : self.0
649 0 : }
650 : }
651 :
652 : #[derive(MetricGroup)]
653 : #[metric(new(workers: usize))]
654 : pub struct ThreadPoolMetrics {
655 : #[metric(init = CounterVec::with_label_set(ThreadPoolWorkers(workers)))]
656 : pub worker_task_turns_total: CounterVec<ThreadPoolWorkers>,
657 : #[metric(init = CounterVec::with_label_set(ThreadPoolWorkers(workers)))]
658 : pub worker_task_skips_total: CounterVec<ThreadPoolWorkers>,
659 : }
|