Line data Source code
1 : use std::sync::{Arc, OnceLock};
2 :
3 : use lasso::ThreadedRodeo;
4 : use measured::label::{
5 : FixedCardinalitySet, LabelGroupSet, LabelGroupVisitor, LabelName, LabelSet, LabelValue,
6 : StaticLabelSet,
7 : };
8 : use measured::metric::histogram::Thresholds;
9 : use measured::metric::name::MetricName;
10 : use measured::{
11 : Counter, CounterVec, FixedCardinalityLabel, Gauge, Histogram, HistogramVec, LabelGroup,
12 : MetricGroup,
13 : };
14 : use metrics::{CounterPairAssoc, CounterPairVec, HyperLogLogVec, InfoMetric};
15 : use tokio::time::{self, Instant};
16 :
17 : use crate::control_plane::messages::ColdStartInfo;
18 : use crate::error::ErrorKind;
19 :
/// Root metric group: bundles the proxy, wake-compute-lock and service metric
/// groups, each registered under its own namespace.
20 : #[derive(MetricGroup)]
21 : #[metric(new(thread_pool: Arc<ThreadPoolMetrics>))]
22 : pub struct Metrics {
23 : #[metric(namespace = "proxy")]
// Initialised explicitly so the thread-pool metrics handle is passed through.
24 : #[metric(init = ProxyMetrics::new(thread_pool))]
25 : pub proxy: ProxyMetrics,
26 :
27 : #[metric(namespace = "wake_compute_lock")]
28 : pub wake_compute_lock: ApiLockMetrics,
29 :
30 : #[metric(namespace = "service")]
31 : pub service: ServiceMetrics,
32 : }
33 :
34 : static SELF: OnceLock<Metrics> = OnceLock::new();
35 : impl Metrics {
36 0 : pub fn install(thread_pool: Arc<ThreadPoolMetrics>) {
37 0 : let mut metrics = Metrics::new(thread_pool);
38 :
39 0 : metrics.proxy.errors_total.init_all_dense();
40 0 : metrics.proxy.redis_errors_total.init_all_dense();
41 0 : metrics.proxy.redis_events_count.init_all_dense();
42 0 : metrics.proxy.retries_metric.init_all_dense();
43 0 : metrics.proxy.connection_failures_total.init_all_dense();
44 :
45 0 : SELF.set(metrics)
46 0 : .ok()
47 0 : .expect("proxy metrics must not be installed more than once");
48 0 : }
49 :
50 169 : pub fn get() -> &'static Self {
51 : #[cfg(test)]
52 169 : return SELF.get_or_init(|| Metrics::new(Arc::new(ThreadPoolMetrics::new(0))));
53 :
54 : #[cfg(not(test))]
55 0 : SELF.get()
56 0 : .expect("proxy metrics must be installed by the main() function")
57 169 : }
58 : }
59 :
/// Metric group for the proxy itself: connection counter pairs, latency and
/// size histograms, error counters and HyperLogLog cardinality estimates.
/// Constructed with a shared handle to the thread-pool metrics.
// NOTE(review): the `///` comments on the fields below are presumably exported
// as metric help text by the derive, so they are left untouched — confirm
// before rewording any of them.
60 : #[derive(MetricGroup)]
61 : #[metric(new(thread_pool: Arc<ThreadPoolMetrics>))]
62 : pub struct ProxyMetrics {
63 : #[metric(flatten)]
64 : pub db_connections: CounterPairVec<NumDbConnectionsGauge>,
65 : #[metric(flatten)]
66 : pub client_connections: CounterPairVec<NumClientConnectionsGauge>,
67 : #[metric(flatten)]
68 : pub connection_requests: CounterPairVec<NumConnectionRequestsGauge>,
69 : #[metric(flatten)]
70 : pub http_endpoint_pools: HttpEndpointPools,
71 : #[metric(flatten)]
72 : pub cancel_channel_size: CounterPairVec<CancelChannelSizeGauge>,
73 :
74 : /// Time it took for proxy to establish a connection to the compute endpoint.
75 : // largest bucket = 2^16 * 0.5ms = 32s
// NOTE(review): if the 16 thresholds are 0.5ms * 2^0 ..= 2^15, the largest
// finite bucket is ~16s rather than 32s — confirm against
// `Thresholds::exponential_buckets`.
76 : #[metric(metadata = Thresholds::exponential_buckets(0.0005, 2.0))]
77 : pub compute_connection_latency_seconds: HistogramVec<ComputeConnectionLatencySet, 16>,
78 :
79 : /// Time it took for proxy to receive a response from control plane.
80 : #[metric(
81 : // largest bucket = 2^16 * 0.2ms = 13s
// NOTE(review): same caveat as above; possibly 2^15 * 0.2ms ≈ 6.5s — confirm.
82 : metadata = Thresholds::exponential_buckets(0.0002, 2.0),
83 : )]
84 : pub console_request_latency: HistogramVec<ConsoleRequestSet, 16>,
85 :
86 : /// Size of the HTTP request body lengths.
87 : // smallest bucket = 16 bytes
88 : // largest bucket = 4^12 * 16 bytes = 256MB
// NOTE(review): with 12 thresholds the largest may be 4^11 * 16 bytes ≈ 64MB — confirm.
89 : #[metric(metadata = Thresholds::exponential_buckets(16.0, 4.0))]
90 : pub http_conn_content_length_bytes: HistogramVec<StaticLabelSet<HttpDirection>, 12>,
91 :
92 : /// Time it takes to reclaim unused connection pools.
// The field name (spelled "reclaimation") is kept as-is: it is presumably the
// exported metric name.
93 : #[metric(metadata = Thresholds::exponential_buckets(1e-6, 2.0))]
94 : pub http_pool_reclaimation_lag_seconds: Histogram<16>,
95 :
96 : /// Number of opened connections to a database.
97 : pub http_pool_opened_connections: Gauge,
98 :
99 : /// Number of allowed ips
100 : #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0]))]
101 : pub allowed_ips_number: Histogram<10>,
102 :
103 : /// Number of allowed VPC endpoints IDs
104 : #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0]))]
105 : pub allowed_vpc_endpoint_ids: Histogram<10>,
106 :
107 : /// Number of connections, by the method we used to determine the endpoint.
108 : pub accepted_connections_by_sni: CounterVec<SniSet>,
109 :
110 : /// Number of connection failures (per kind).
111 : pub connection_failures_total: CounterVec<StaticLabelSet<ConnectionFailureKind>>,
112 :
113 : /// Number of wake-up failures (per kind).
114 : pub connection_failures_breakdown: CounterVec<ConnectionFailuresBreakdownSet>,
115 :
116 : /// Number of bytes sent/received between all clients and backends.
117 : pub io_bytes: CounterVec<StaticLabelSet<Direction>>,
118 :
119 : /// Number of IO errors while logging.
120 : pub logging_errors_count: Counter,
121 :
122 : /// Number of errors by a given classification.
123 : pub errors_total: CounterVec<StaticLabelSet<crate::error::ErrorKind>>,
124 :
125 : /// Number of cancellation requests (per found/not_found).
126 : pub cancellation_requests_total: CounterVec<CancellationRequestSet>,
127 :
128 : /// Number of errors by a given classification
129 : pub redis_errors_total: CounterVec<RedisErrorsSet>,
130 :
131 : /// Number of TLS handshake failures
132 : pub tls_handshake_failures: Counter,
133 :
134 : /// HLL approximate cardinality of endpoints that are connecting
135 : pub connecting_endpoints: HyperLogLogVec<StaticLabelSet<Protocol>, 32>,
136 :
137 : /// Number of endpoints affected by errors of a given classification
138 : pub endpoints_affected_by_errors: HyperLogLogVec<StaticLabelSet<crate::error::ErrorKind>, 32>,
139 :
140 : /// Number of retries (per outcome, per retry_type).
141 : #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]))]
142 : pub retries_metric: HistogramVec<RetriesMetricSet, 9>,
143 :
144 : /// Number of events consumed from redis (per event type).
145 : pub redis_events_count: CounterVec<StaticLabelSet<RedisEventsCount>>,
146 :
147 : #[metric(namespace = "connect_compute_lock")]
148 : pub connect_compute_lock: ApiLockMetrics,
149 :
// Stores the handle passed to the constructor rather than building new metrics.
150 : #[metric(namespace = "scram_pool")]
151 : #[metric(init = thread_pool)]
152 : pub scram_pool: Arc<ThreadPoolMetrics>,
153 : }
154 :
/// Metrics for one API lock: semaphore registration counters plus histograms
/// for reclamation lag and lock acquisition time.
155 : #[derive(MetricGroup)]
156 : #[metric(new())]
157 : pub struct ApiLockMetrics {
158 : /// Number of semaphores registered in this api lock
159 : pub semaphores_registered: Counter,
160 : /// Number of semaphores unregistered in this api lock
161 : pub semaphores_unregistered: Counter,
162 : /// Time it takes to reclaim unused semaphores in the api lock
// 16 exponential buckets, first threshold 1e-6 s, factor 2.
163 : #[metric(metadata = Thresholds::exponential_buckets(1e-6, 2.0))]
164 : pub reclamation_lag_seconds: Histogram<16>,
165 : /// Time it takes to acquire a semaphore lock
// 16 exponential buckets, first threshold 1e-4 s, factor 2.
166 : #[metric(metadata = Thresholds::exponential_buckets(1e-4, 2.0))]
167 : pub semaphore_acquire_seconds: Histogram<16>,
168 : }
169 :
170 : impl Default for ApiLockMetrics {
171 102 : fn default() -> Self {
172 102 : Self::new()
173 102 : }
174 : }
175 :
/// Label value for the `direction` label of HTTP body-size metrics.
176 : #[derive(FixedCardinalityLabel, Copy, Clone)]
177 : #[label(singleton = "direction")]
178 : pub enum HttpDirection {
179 : Request,
180 : Response,
181 : }
182 :
/// Label value for the `direction` label of the byte-count metric.
// NOTE(review): presumably Tx = sent, Rx = received — confirm against the call sites.
183 : #[derive(FixedCardinalityLabel, Copy, Clone)]
184 : #[label(singleton = "direction")]
185 : pub enum Direction {
186 : Tx,
187 : Rx,
188 : }
189 :
/// Label value for the `protocol` label; also has a string form via
/// `as_str` and `Display`.
190 : #[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
191 : #[label(singleton = "protocol")]
192 : pub enum Protocol {
193 : Http,
194 : Ws,
195 : Tcp,
196 : SniRouter,
197 : }
198 :
199 : impl Protocol {
200 6 : pub fn as_str(self) -> &'static str {
201 6 : match self {
202 0 : Protocol::Http => "http",
203 0 : Protocol::Ws => "ws",
204 6 : Protocol::Tcp => "tcp",
205 0 : Protocol::SniRouter => "sni_router",
206 : }
207 6 : }
208 : }
209 :
210 : impl std::fmt::Display for Protocol {
211 6 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
212 6 : f.write_str(self.as_str())
213 6 : }
214 : }
215 :
/// Two-valued label; can be built from a `bool` via the `From<bool>` impl in
/// this file.
216 : #[derive(FixedCardinalityLabel, Copy, Clone)]
217 : pub enum Bool {
218 : True,
219 : False,
220 : }
221 :
/// Label value for the `outcome` label of a cache lookup.
222 : #[derive(FixedCardinalityLabel, Copy, Clone)]
223 : #[label(singleton = "outcome")]
224 : pub enum CacheOutcome {
225 : Hit,
226 : Miss,
227 : }
228 :
/// Label group for control-plane request latency; its label set is named
/// `ConsoleRequestSet`.
229 : #[derive(LabelGroup)]
230 : #[label(set = ConsoleRequestSet)]
231 : pub struct ConsoleRequest<'a> {
// Dynamic string label backed by a `ThreadedRodeo`.
232 : #[label(dynamic_with = ThreadedRodeo, default)]
233 : pub request: &'a str,
234 : }
235 :
/// Pair of counters tracking registration and unregistration of HTTP
/// endpoint pools; `guard` increments the first immediately and the second
/// when the returned guard is dropped.
236 : #[derive(MetricGroup, Default)]
237 : pub struct HttpEndpointPools {
238 : /// Number of endpoints we have registered pools for
239 : pub http_pool_endpoints_registered_total: Counter,
240 : /// Number of endpoints we have unregistered pools for
241 : pub http_pool_endpoints_unregistered_total: Counter,
242 : }
243 :
244 : pub struct HttpEndpointPoolsGuard<'a> {
245 : dec: &'a Counter,
246 : }
247 :
248 : impl Drop for HttpEndpointPoolsGuard<'_> {
249 2 : fn drop(&mut self) {
250 2 : self.dec.inc();
251 2 : }
252 : }
253 :
254 : impl HttpEndpointPools {
255 2 : pub fn guard(&self) -> HttpEndpointPoolsGuard<'_> {
256 2 : self.http_pool_endpoints_registered_total.inc();
257 2 : HttpEndpointPoolsGuard {
258 2 : dec: &self.http_pool_endpoints_unregistered_total,
259 2 : }
260 2 : }
261 : }
/// Marker type naming the opened/closed database connection counter pair,
/// labelled by [`Protocol`].
262 : pub struct NumDbConnectionsGauge;
263 : impl CounterPairAssoc for NumDbConnectionsGauge {
264 : const INC_NAME: &'static MetricName = MetricName::from_str("opened_db_connections_total");
265 : const DEC_NAME: &'static MetricName = MetricName::from_str("closed_db_connections_total");
266 : const INC_HELP: &'static str = "Number of opened connections to a database.";
267 : const DEC_HELP: &'static str = "Number of closed connections to a database.";
268 : type LabelGroupSet = StaticLabelSet<Protocol>;
269 : }
/// Guard type for the database connection counter pair.
270 : pub type NumDbConnectionsGuard<'a> = metrics::MeasuredCounterPairGuard<'a, NumDbConnectionsGauge>;
271 :
/// Marker type naming the opened/closed client connection counter pair,
/// labelled by [`Protocol`].
272 : pub struct NumClientConnectionsGauge;
273 : impl CounterPairAssoc for NumClientConnectionsGauge {
274 : const INC_NAME: &'static MetricName = MetricName::from_str("opened_client_connections_total");
275 : const DEC_NAME: &'static MetricName = MetricName::from_str("closed_client_connections_total");
276 : const INC_HELP: &'static str = "Number of opened connections from a client.";
277 : const DEC_HELP: &'static str = "Number of closed connections from a client.";
278 : type LabelGroupSet = StaticLabelSet<Protocol>;
279 : }
/// Guard type for the client connection counter pair.
280 : pub type NumClientConnectionsGuard<'a> =
281 : metrics::MeasuredCounterPairGuard<'a, NumClientConnectionsGauge>;
282 :
/// Marker type naming the accepted/closed connection counter pair,
/// labelled by [`Protocol`].
283 : pub struct NumConnectionRequestsGauge;
284 : impl CounterPairAssoc for NumConnectionRequestsGauge {
285 : const INC_NAME: &'static MetricName = MetricName::from_str("accepted_connections_total");
286 : const DEC_NAME: &'static MetricName = MetricName::from_str("closed_connections_total");
287 : const INC_HELP: &'static str = "Number of client connections accepted.";
288 : const DEC_HELP: &'static str = "Number of client connections closed.";
289 : type LabelGroupSet = StaticLabelSet<Protocol>;
290 : }
/// Guard type for the connection request counter pair.
291 : pub type NumConnectionRequestsGuard<'a> =
292 : metrics::MeasuredCounterPairGuard<'a, NumConnectionRequestsGauge>;
293 :
/// Marker type naming the counter pair for messages in the cancellation
/// channel, labelled by [`RedisMsgKind`].
294 : pub struct CancelChannelSizeGauge;
295 : impl CounterPairAssoc for CancelChannelSizeGauge {
296 : const INC_NAME: &'static MetricName = MetricName::from_str("opened_msgs_cancel_channel_total");
297 : const DEC_NAME: &'static MetricName = MetricName::from_str("closed_msgs_cancel_channel_total");
298 : const INC_HELP: &'static str = "Number of processing messages in the cancellation channel.";
299 : const DEC_HELP: &'static str = "Number of closed messages in the cancellation channel.";
300 : type LabelGroupSet = StaticLabelSet<RedisMsgKind>;
301 : }
/// Guard type for the cancellation channel counter pair.
302 : pub type CancelChannelSizeGuard<'a> = metrics::MeasuredCounterPairGuard<'a, CancelChannelSizeGauge>;
303 :
/// Labels attached to each compute connection latency observation; built by
/// the `Drop` impl of `LatencyTimer`.
304 : #[derive(LabelGroup)]
305 : #[label(set = ComputeConnectionLatencySet)]
306 : pub struct ComputeConnectionLatencyGroup {
307 : protocol: Protocol,
308 : cold_start_info: ColdStartInfo,
309 : outcome: ConnectOutcome,
// Which accumulated waits were subtracted from the observed duration.
310 : excluded: LatencyExclusions,
311 : }
312 :
/// Names the set of waits subtracted from a latency observation. Each variant
/// excludes everything the previous one does plus one more component
/// (client, then control plane, then compute, then retry).
313 : #[derive(FixedCardinalityLabel, Copy, Clone)]
314 : pub enum LatencyExclusions {
315 : Client,
316 : ClientAndCplane,
317 : ClientCplaneCompute,
318 : ClientCplaneComputeRetry,
319 : }
320 :
/// Label group (protocol, routing kind) for the accepted-connections counter;
/// its label set is named `SniSet`.
321 : #[derive(LabelGroup)]
322 : #[label(set = SniSet)]
323 : pub struct SniGroup {
324 : pub protocol: Protocol,
325 : pub kind: SniKind,
326 : }
327 :
/// How the target endpoint was determined for a connection.
328 : #[derive(FixedCardinalityLabel, Copy, Clone)]
329 : pub enum SniKind {
330 : /// Domain name based routing. SNI for libpq/websockets. Host for HTTP
331 : Sni,
332 : /// Metadata based routing. `options` for libpq/websockets. Header for HTTP
333 : NoSni,
334 : /// Metadata based routing, using the password field.
335 : PasswordHack,
336 : }
337 :
/// Label value for the `kind` label of the connection failure counter.
// NOTE(review): presumably distinguishes failures against cached vs. freshly
// fetched compute info — confirm against the call sites.
338 : #[derive(FixedCardinalityLabel, Copy, Clone)]
339 : #[label(singleton = "kind")]
340 : pub enum ConnectionFailureKind {
341 : ComputeCached,
342 : ComputeUncached,
343 : }
344 :
/// Label group (error kind, retry flag) for the connection failure breakdown
/// counter; its label set is named `ConnectionFailuresBreakdownSet`.
345 : #[derive(LabelGroup)]
346 : #[label(set = ConnectionFailuresBreakdownSet)]
347 : pub struct ConnectionFailuresBreakdownGroup {
348 : pub kind: ErrorKind,
349 : pub retry: Bool,
350 : }
351 :
/// Label group for the redis error counter; its label set is named
/// `RedisErrorsSet`.
352 : #[derive(LabelGroup, Copy, Clone)]
353 : #[label(set = RedisErrorsSet)]
354 : pub struct RedisErrors<'a> {
// Dynamic string label backed by a `ThreadedRodeo`.
355 : #[label(dynamic_with = ThreadedRodeo, default)]
356 : pub channel: &'a str,
357 : }
358 :
/// Result of handling a cancellation request, used as a label value.
359 : #[derive(FixedCardinalityLabel, Copy, Clone)]
360 : pub enum CancellationOutcome {
361 : NotFound,
362 : Found,
363 : RateLimitExceeded,
364 : }
365 :
/// Label group for the cancellation request counter; its label set is named
/// `CancellationRequestSet`.
366 : #[derive(LabelGroup)]
367 : #[label(set = CancellationRequestSet)]
368 : pub struct CancellationRequest {
369 : pub kind: CancellationOutcome,
370 : }
371 :
/// What a paused `LatencyTimer` was waiting on; `LatencyTimer::unpause` uses
/// it to pick which accumulated duration to add to.
372 : #[derive(Clone, Copy)]
373 : pub enum Waiting {
374 : Cplane,
375 : Client,
376 : Compute,
377 : RetryTimeout,
378 : }
379 :
/// Label value for the `kind` label of the cancellation channel counter pair.
380 : #[derive(FixedCardinalityLabel, Copy, Clone)]
381 : #[label(singleton = "kind")]
382 : #[allow(clippy::enum_variant_names)]
383 : pub enum RedisMsgKind {
384 : Set,
385 : Get,
386 : Expire,
387 : HGet,
388 : }
389 :
/// Time spent waiting, split by what was being waited on. All fields start
/// at zero (`Default`) and are added to by `LatencyTimer::unpause`.
390 : #[derive(Default, Clone)]
391 : pub struct LatencyAccumulated {
392 : pub cplane: time::Duration,
393 : pub client: time::Duration,
394 : pub compute: time::Duration,
395 : pub retry: time::Duration,
396 : }
397 :
398 : impl std::fmt::Display for LatencyAccumulated {
399 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
400 0 : write!(
401 0 : f,
402 0 : "client: {}, cplane: {}, compute: {}, retry: {}",
403 0 : self.client.as_micros(),
404 0 : self.cplane.as_micros(),
405 0 : self.compute.as_micros(),
406 0 : self.retry.as_micros()
407 : )
408 0 : }
409 : }
410 :
/// Stopwatch for one connection attempt. Its `Drop` impl records the elapsed
/// time into the compute connection latency histogram unless `skip_reporting`
/// is set.
411 : pub struct LatencyTimer {
412 : // instant at which the stopwatch was started
413 : start: time::Instant,
414 : // instant at which the stopwatch was stopped; `None` until `success()` is called
415 : stop: Option<time::Instant>,
416 : // time spent waiting, split by what was being waited on
417 : accumulated: LatencyAccumulated,
418 : // label data
419 : protocol: Protocol,
420 : cold_start_info: ColdStartInfo,
421 : outcome: ConnectOutcome,
422 :
// when true, dropping the timer records nothing
423 : skip_reporting: bool,
424 : }
425 :
426 : impl LatencyTimer {
427 77 : pub fn new(protocol: Protocol) -> Self {
428 77 : Self {
429 77 : start: time::Instant::now(),
430 77 : stop: None,
431 77 : accumulated: LatencyAccumulated::default(),
432 77 : protocol,
433 77 : cold_start_info: ColdStartInfo::Unknown,
434 77 : // assume failed unless otherwise specified
435 77 : outcome: ConnectOutcome::Failed,
436 77 : skip_reporting: false,
437 77 : }
438 77 : }
439 :
440 0 : pub(crate) fn noop(protocol: Protocol) -> Self {
441 0 : Self {
442 0 : start: time::Instant::now(),
443 0 : stop: None,
444 0 : accumulated: LatencyAccumulated::default(),
445 0 : protocol,
446 0 : cold_start_info: ColdStartInfo::Unknown,
447 0 : // assume failed unless otherwise specified
448 0 : outcome: ConnectOutcome::Failed,
449 0 : skip_reporting: true,
450 0 : }
451 0 : }
452 :
453 52 : pub fn unpause(&mut self, start: Instant, waiting_for: Waiting) {
454 52 : let dur = start.elapsed();
455 52 : match waiting_for {
456 0 : Waiting::Cplane => self.accumulated.cplane += dur,
457 39 : Waiting::Client => self.accumulated.client += dur,
458 7 : Waiting::Compute => self.accumulated.compute += dur,
459 6 : Waiting::RetryTimeout => self.accumulated.retry += dur,
460 : }
461 52 : }
462 :
463 0 : pub fn cold_start_info(&mut self, cold_start_info: ColdStartInfo) {
464 0 : self.cold_start_info = cold_start_info;
465 0 : }
466 :
467 8 : pub fn success(&mut self) {
468 : // stop the stopwatch and record the time that we have accumulated
469 8 : self.stop = Some(time::Instant::now());
470 :
471 : // success
472 8 : self.outcome = ConnectOutcome::Success;
473 8 : }
474 :
475 0 : pub fn accumulated(&self) -> LatencyAccumulated {
476 0 : self.accumulated.clone()
477 0 : }
478 : }
479 :
/// Outcome label. `LatencyTimer` starts as `Failed` and switches to `Success`
/// when `LatencyTimer::success` is called.
480 : #[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
481 : pub enum ConnectOutcome {
482 : Success,
483 : Failed,
484 : }
485 :
486 : impl Drop for LatencyTimer {
487 77 : fn drop(&mut self) {
488 77 : if self.skip_reporting {
489 0 : return;
490 77 : }
491 :
492 77 : let duration = self
493 77 : .stop
494 77 : .unwrap_or_else(time::Instant::now)
495 77 : .duration_since(self.start);
496 :
497 77 : let metric = &Metrics::get().proxy.compute_connection_latency_seconds;
498 :
499 : // Excluding client communication from the accumulated time.
500 77 : metric.observe(
501 77 : ComputeConnectionLatencyGroup {
502 77 : protocol: self.protocol,
503 77 : cold_start_info: self.cold_start_info,
504 77 : outcome: self.outcome,
505 77 : excluded: LatencyExclusions::Client,
506 77 : },
507 77 : duration
508 77 : .saturating_sub(self.accumulated.client)
509 77 : .as_secs_f64(),
510 : );
511 :
512 : // Exclude client and cplane communication from the accumulated time.
513 77 : let accumulated_total = self.accumulated.client + self.accumulated.cplane;
514 77 : metric.observe(
515 77 : ComputeConnectionLatencyGroup {
516 77 : protocol: self.protocol,
517 77 : cold_start_info: self.cold_start_info,
518 77 : outcome: self.outcome,
519 77 : excluded: LatencyExclusions::ClientAndCplane,
520 77 : },
521 77 : duration.saturating_sub(accumulated_total).as_secs_f64(),
522 : );
523 :
524 : // Exclude client, cplane, compute communication from the accumulated time.
525 77 : let accumulated_total =
526 77 : self.accumulated.client + self.accumulated.cplane + self.accumulated.compute;
527 77 : metric.observe(
528 77 : ComputeConnectionLatencyGroup {
529 77 : protocol: self.protocol,
530 77 : cold_start_info: self.cold_start_info,
531 77 : outcome: self.outcome,
532 77 : excluded: LatencyExclusions::ClientCplaneCompute,
533 77 : },
534 77 : duration.saturating_sub(accumulated_total).as_secs_f64(),
535 : );
536 :
537 : // Exclude client, cplane, compute, retry communication from the accumulated time.
538 77 : let accumulated_total = self.accumulated.client
539 77 : + self.accumulated.cplane
540 77 : + self.accumulated.compute
541 77 : + self.accumulated.retry;
542 77 : metric.observe(
543 77 : ComputeConnectionLatencyGroup {
544 77 : protocol: self.protocol,
545 77 : cold_start_info: self.cold_start_info,
546 77 : outcome: self.outcome,
547 77 : excluded: LatencyExclusions::ClientCplaneComputeRetry,
548 77 : },
549 77 : duration.saturating_sub(accumulated_total).as_secs_f64(),
550 : );
551 77 : }
552 : }
553 :
554 : impl From<bool> for Bool {
555 3 : fn from(value: bool) -> Self {
556 3 : if value { Bool::True } else { Bool::False }
557 3 : }
558 : }
559 :
/// Label group (protocol, rejected flag, outcome); its label set is named
/// `InvalidEndpointsSet`.
560 : #[derive(LabelGroup)]
561 : #[label(set = InvalidEndpointsSet)]
562 : pub struct InvalidEndpointsGroup {
563 : pub protocol: Protocol,
564 : pub rejected: Bool,
565 : pub outcome: ConnectOutcome,
566 : }
567 :
/// Label group (outcome, retry type) for the retries histogram; its label set
/// is named `RetriesMetricSet`.
568 : #[derive(LabelGroup)]
569 : #[label(set = RetriesMetricSet)]
570 : pub struct RetriesMetricGroup {
571 : pub outcome: ConnectOutcome,
572 : pub retry_type: RetryType,
573 : }
574 :
/// Which operation was being retried, used as a label value.
575 : #[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
576 : pub enum RetryType {
577 : WakeCompute,
578 : ConnectToCompute,
579 : }
580 :
/// Label value for the `event` label of the redis events counter.
581 : #[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
582 : #[label(singleton = "event")]
583 : pub enum RedisEventsCount {
584 : EndpointCreated,
585 : BranchCreated,
586 : ProjectCreated,
587 : CancelSession,
588 : InvalidateRole,
589 : InvalidateEndpoint,
590 : InvalidateProject,
591 : InvalidateProjects,
592 : InvalidateOrg,
593 : }
594 :
/// Label set whose cardinality is the wrapped worker count.
595 : pub struct ThreadPoolWorkers(usize);
/// Label value wrapping a worker index; written out under the `worker` label.
596 : #[derive(Copy, Clone)]
597 : pub struct ThreadPoolWorkerId(pub usize);
598 :
599 : impl LabelValue for ThreadPoolWorkerId {
600 0 : fn visit<V: measured::label::LabelVisitor>(&self, v: V) -> V::Output {
601 0 : v.write_int(self.0 as i64)
602 0 : }
603 : }
604 :
605 : impl LabelGroup for ThreadPoolWorkerId {
606 0 : fn visit_values(&self, v: &mut impl measured::label::LabelGroupVisitor) {
607 0 : v.write_value(LabelName::from_str("worker"), self);
608 0 : }
609 : }
610 :
611 : impl LabelGroupSet for ThreadPoolWorkers {
612 : type Group<'a> = ThreadPoolWorkerId;
613 :
614 114 : fn cardinality(&self) -> Option<usize> {
615 114 : Some(self.0)
616 114 : }
617 :
618 5 : fn encode_dense(&self, value: Self::Unique) -> Option<usize> {
619 5 : Some(value)
620 5 : }
621 :
622 0 : fn decode_dense(&self, value: usize) -> Self::Group<'_> {
623 0 : ThreadPoolWorkerId(value)
624 0 : }
625 :
626 : type Unique = usize;
627 :
628 5 : fn encode(&self, value: Self::Group<'_>) -> Option<Self::Unique> {
629 5 : Some(value.0)
630 5 : }
631 :
632 0 : fn decode(&self, value: &Self::Unique) -> Self::Group<'_> {
633 0 : ThreadPoolWorkerId(*value)
634 0 : }
635 : }
636 :
637 : impl LabelSet for ThreadPoolWorkers {
638 : type Value<'a> = ThreadPoolWorkerId;
639 :
640 0 : fn dynamic_cardinality(&self) -> Option<usize> {
641 0 : Some(self.0)
642 0 : }
643 :
644 0 : fn encode(&self, value: Self::Value<'_>) -> Option<usize> {
645 0 : (value.0 < self.0).then_some(value.0)
646 0 : }
647 :
648 0 : fn decode(&self, value: usize) -> Self::Value<'_> {
649 0 : ThreadPoolWorkerId(value)
650 0 : }
651 : }
652 :
653 : impl FixedCardinalitySet for ThreadPoolWorkers {
654 0 : fn cardinality(&self) -> usize {
655 0 : self.0
656 0 : }
657 : }
658 :
/// Per-worker counters for a thread pool. Both vectors are built over a
/// `ThreadPoolWorkers` label set sized by the `workers` constructor argument.
659 : #[derive(MetricGroup)]
660 : #[metric(new(workers: usize))]
661 : pub struct ThreadPoolMetrics {
662 : #[metric(init = CounterVec::with_label_set(ThreadPoolWorkers(workers)))]
663 : pub worker_task_turns_total: CounterVec<ThreadPoolWorkers>,
664 : #[metric(init = CounterVec::with_label_set(ThreadPoolWorkers(workers)))]
665 : pub worker_task_skips_total: CounterVec<ThreadPoolWorkers>,
666 : }
667 :
/// Metric group holding a single info metric labelled by [`ServiceInfo`].
668 : #[derive(MetricGroup, Default)]
669 : pub struct ServiceMetrics {
670 : pub info: InfoMetric<ServiceInfo>,
671 : }
672 :
/// Label data for the service info metric; defaults to the default
/// `ServiceState`.
673 : #[derive(Default)]
674 : pub struct ServiceInfo {
675 : pub state: ServiceState,
676 : }
677 :
678 : impl ServiceInfo {
679 0 : pub const fn running() -> Self {
680 0 : ServiceInfo {
681 0 : state: ServiceState::Running,
682 0 : }
683 0 : }
684 :
685 0 : pub const fn terminating() -> Self {
686 0 : ServiceInfo {
687 0 : state: ServiceState::Terminating,
688 0 : }
689 0 : }
690 : }
691 :
692 : impl LabelGroup for ServiceInfo {
693 0 : fn visit_values(&self, v: &mut impl LabelGroupVisitor) {
694 : const STATE: &LabelName = LabelName::from_str("state");
695 0 : v.write_value(STATE, &self.state);
696 0 : }
697 : }
698 :
/// Service lifecycle state, exported under the `state` label. `Init` is the
/// default.
699 : #[derive(FixedCardinalityLabel, Clone, Copy, Debug, Default)]
700 : #[label(singleton = "state")]
701 : pub enum ServiceState {
702 : #[default]
703 : Init,
704 : Running,
705 : Terminating,
706 : }
|