Line data Source code
1 : use std::sync::{Arc, OnceLock};
2 :
3 : use lasso::ThreadedRodeo;
4 : use measured::label::{
5 : FixedCardinalitySet, LabelGroupSet, LabelGroupVisitor, LabelName, LabelSet, LabelValue,
6 : StaticLabelSet,
7 : };
8 : use measured::metric::group::Encoding;
9 : use measured::metric::histogram::Thresholds;
10 : use measured::metric::name::MetricName;
11 : use measured::{
12 : Counter, CounterVec, FixedCardinalityLabel, Gauge, GaugeVec, Histogram, HistogramVec,
13 : LabelGroup, MetricGroup,
14 : };
15 : use metrics::{CounterPairAssoc, CounterPairVec, HyperLogLogVec, InfoMetric};
16 : use tokio::time::{self, Instant};
17 :
18 : use crate::control_plane::messages::ColdStartInfo;
19 : use crate::error::ErrorKind;
20 :
// Root of the proxy's metrics registry; the shared instance is obtained through
// `Metrics::get()`.
//
// Each field is exported under the metric namespace named in its
// `#[metric(namespace = ...)]` attribute. `#[metric(new())]` asks the `MetricGroup`
// derive for a `Metrics::new()` constructor. `proxy` is built with
// `ProxyMetrics::new()` per its `init` attribute. The remaining fields are presumably
// built through their `Default` impls (TODO confirm against the derive).
#[derive(MetricGroup)]
#[metric(new())]
pub struct Metrics {
    #[metric(namespace = "proxy")]
    #[metric(init = ProxyMetrics::new())]
    pub proxy: ProxyMetrics,

    #[metric(namespace = "wake_compute_lock")]
    pub wake_compute_lock: ApiLockMetrics,

    #[metric(namespace = "service")]
    pub service: ServiceMetrics,

    #[metric(namespace = "cache")]
    pub cache: CacheMetrics,
}
37 :
38 : impl Metrics {
39 : #[track_caller]
40 344 : pub fn get() -> &'static Self {
41 : static SELF: OnceLock<Metrics> = OnceLock::new();
42 :
43 344 : SELF.get_or_init(|| {
44 59 : let mut metrics = Metrics::new();
45 :
46 59 : metrics.proxy.errors_total.init_all_dense();
47 59 : metrics.proxy.redis_errors_total.init_all_dense();
48 59 : metrics.proxy.redis_events_count.init_all_dense();
49 59 : metrics.proxy.retries_metric.init_all_dense();
50 59 : metrics.proxy.connection_failures_total.init_all_dense();
51 :
52 59 : metrics
53 59 : })
54 344 : }
55 : }
56 :
// Metrics exported under the `proxy` namespace (see `Metrics::proxy`).
//
// NOTE(review): the `///` comments on the fields below are presumably consumed by the
// `MetricGroup` derive as exported HELP text, so editing them may change scrape
// output. Reviewer notes therefore use plain `//` comments.
#[derive(MetricGroup)]
#[metric(new())]
pub struct ProxyMetrics {
    // Opened/closed counter pairs flattened into this namespace. Metric names and help
    // strings come from the `CounterPairAssoc` impls of the type parameters.
    #[metric(flatten)]
    pub db_connections: CounterPairVec<NumDbConnectionsGauge>,
    #[metric(flatten)]
    pub client_connections: CounterPairVec<NumClientConnectionsGauge>,
    #[metric(flatten)]
    pub connection_requests: CounterPairVec<NumConnectionRequestsGauge>,
    #[metric(flatten)]
    pub http_endpoint_pools: HttpEndpointPools,
    #[metric(flatten)]
    pub cancel_channel_size: CounterPairVec<CancelChannelSizeGauge>,

    /// Time it took for proxy to establish a connection to the compute endpoint.
    // largest bucket = 2^16 * 0.5ms = 32s
    // NOTE(review): if `exponential_buckets` yields `start * factor^i` for i in 0..16,
    // the largest finite bucket is 2^15 * 0.5ms ~= 16s, not 32s. Confirm against `measured`.
    // Observed from `LatencyTimer`'s `Drop`, once per `LatencyExclusions` variant.
    #[metric(metadata = Thresholds::exponential_buckets(0.0005, 2.0))]
    pub compute_connection_latency_seconds: HistogramVec<ComputeConnectionLatencySet, 16>,

    /// Time it took for proxy to receive a response from control plane.
    #[metric(
        // largest bucket = 2^16 * 0.2ms = 13s
        // NOTE(review): with thresholds 0.2ms * 2^i for i in 0..16 the largest would be
        // 2^15 * 0.2ms ~= 6.6s. Confirm `exponential_buckets` semantics.
        metadata = Thresholds::exponential_buckets(0.0002, 2.0),
    )]
    pub console_request_latency: HistogramVec<ConsoleRequestSet, 16>,

    /// Size of the HTTP request body lengths.
    // smallest bucket = 16 bytes
    // largest bucket = 4^12 * 16 bytes = 256MB
    // NOTE(review): with 12 thresholds 16 * 4^i for i in 0..12 the largest would be
    // 16 * 4^11 bytes = 64MiB. Confirm `exponential_buckets` semantics.
    #[metric(metadata = Thresholds::exponential_buckets(16.0, 4.0))]
    pub http_conn_content_length_bytes: HistogramVec<StaticLabelSet<HttpDirection>, 12>,

    /// Time it takes to reclaim unused connection pools.
    // NOTE(review): the field name spells "reclaimation". Renaming it would presumably
    // rename the exported metric, so leave it unless dashboards are migrated too.
    #[metric(metadata = Thresholds::exponential_buckets(1e-6, 2.0))]
    pub http_pool_reclaimation_lag_seconds: Histogram<16>,

    /// Number of opened connections to a database.
    pub http_pool_opened_connections: Gauge,

    /// Number of allowed ips
    #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0]))]
    pub allowed_ips_number: Histogram<10>,

    /// Number of allowed VPC endpoints IDs
    #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0]))]
    pub allowed_vpc_endpoint_ids: Histogram<10>,

    /// Number of connections, by the method we used to determine the endpoint.
    pub accepted_connections_by_sni: CounterVec<SniSet>,

    /// Number of connection failures (per kind).
    // Densely pre-initialised in `Metrics::get`.
    pub connection_failures_total: CounterVec<StaticLabelSet<ConnectionFailureKind>>,

    /// Number of wake-up failures (per kind).
    pub connection_failures_breakdown: CounterVec<ConnectionFailuresBreakdownSet>,

    /// Number of bytes sent/received between all clients and backends.
    pub io_bytes: CounterVec<StaticLabelSet<Direction>>,

    /// Number of IO errors while logging.
    pub logging_errors_count: Counter,

    /// Number of errors by a given classification.
    // Densely pre-initialised in `Metrics::get`.
    pub errors_total: CounterVec<StaticLabelSet<crate::error::ErrorKind>>,

    /// Number of cancellation requests (per found/not_found).
    pub cancellation_requests_total: CounterVec<CancellationRequestSet>,

    /// Number of errors by a given classification
    // NOTE(review): the label here is the redis `channel` (see `RedisErrors`), not an
    // error classification. The HELP text above may be misleading.
    // Densely pre-initialised in `Metrics::get`.
    pub redis_errors_total: CounterVec<RedisErrorsSet>,

    /// Number of TLS handshake failures
    pub tls_handshake_failures: Counter,

    /// Number of SHA 256 rounds executed.
    pub sha_rounds: Counter,

    /// HLL approximate cardinality of endpoints that are connecting
    pub connecting_endpoints: HyperLogLogVec<StaticLabelSet<Protocol>, 32>,

    /// Number of endpoints affected by errors of a given classification
    pub endpoints_affected_by_errors: HyperLogLogVec<StaticLabelSet<crate::error::ErrorKind>, 32>,

    /// Number of retries (per outcome, per retry_type).
    // Nine explicit thresholds (0..=8 retries) to match `HistogramVec<_, 9>`.
    // Densely pre-initialised in `Metrics::get`.
    #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]))]
    pub retries_metric: HistogramVec<RetriesMetricSet, 9>,

    /// Number of events consumed from redis (per event type).
    // Densely pre-initialised in `Metrics::get`.
    pub redis_events_count: CounterVec<StaticLabelSet<RedisEventsCount>>,

    #[metric(namespace = "connect_compute_lock")]
    pub connect_compute_lock: ApiLockMetrics,

    // Empty until the lock is set. `OnceLockWrapper` collects nothing while unset.
    #[metric(namespace = "scram_pool")]
    pub scram_pool: OnceLockWrapper<Arc<ThreadPoolMetrics>>,
}
153 :
/// Newtype around [`OnceLock`] so a lazily-initialised value can live inside a
/// [`MetricGroup`]. Until the lock is set, the wrapper contributes no metrics.
pub struct OnceLockWrapper<T>(pub OnceLock<T>);

// Written by hand instead of derived: `#[derive(Default)]` would add a `T: Default`
// bound that an empty `OnceLock<T>` does not need.
impl<T> Default for OnceLockWrapper<T> {
    fn default() -> Self {
        OnceLockWrapper(OnceLock::default())
    }
}
162 :
163 : impl<Enc: Encoding, T: MetricGroup<Enc>> MetricGroup<Enc> for OnceLockWrapper<T> {
164 0 : fn collect_group_into(&self, enc: &mut Enc) -> Result<(), Enc::Err> {
165 0 : if let Some(inner) = self.0.get() {
166 0 : inner.collect_group_into(enc)?;
167 0 : }
168 0 : Ok(())
169 0 : }
170 : }
171 :
// Metrics for an "api lock". `Metrics` registers this group under the
// `wake_compute_lock` namespace and `ProxyMetrics` under `connect_compute_lock`.
// NOTE(review): the `///` field comments are presumably exported as HELP text.
#[derive(MetricGroup)]
#[metric(new())]
pub struct ApiLockMetrics {
    /// Number of semaphores registered in this api lock
    pub semaphores_registered: Counter,
    /// Number of semaphores unregistered in this api lock
    pub semaphores_unregistered: Counter,
    /// Time it takes to reclaim unused semaphores in the api lock
    // 16 exponential thresholds starting at 1us.
    #[metric(metadata = Thresholds::exponential_buckets(1e-6, 2.0))]
    pub reclamation_lag_seconds: Histogram<16>,
    /// Time it takes to acquire a semaphore lock
    // 16 exponential thresholds starting at 0.1ms.
    #[metric(metadata = Thresholds::exponential_buckets(1e-4, 2.0))]
    pub semaphore_acquire_seconds: Histogram<16>,
}
186 :
187 : impl Default for ApiLockMetrics {
188 118 : fn default() -> Self {
189 118 : Self::new()
190 118 : }
191 : }
192 :
// Label `direction` for `http_conn_content_length_bytes`.
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "direction")]
pub enum HttpDirection {
    Request,
    Response,
}

// Label `direction` for `io_bytes` (transmitted vs received).
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "direction")]
pub enum Direction {
    Tx,
    Rx,
}

// Label `protocol` for the connection counter pairs, `connecting_endpoints`, and
// latency/SNI label groups. Variant order determines the label encoding.
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
#[label(singleton = "protocol")]
pub enum Protocol {
    Http,
    Ws,
    Tcp,
    SniRouter,
}
215 :
216 : impl Protocol {
217 6 : pub fn as_str(self) -> &'static str {
218 6 : match self {
219 0 : Protocol::Http => "http",
220 0 : Protocol::Ws => "ws",
221 6 : Protocol::Tcp => "tcp",
222 0 : Protocol::SniRouter => "sni_router",
223 : }
224 6 : }
225 : }
226 :
227 : impl std::fmt::Display for Protocol {
228 6 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
229 6 : f.write_str(self.as_str())
230 6 : }
231 : }
232 :
// Boolean usable as a metric label (see `ConnectionFailuresBreakdownGroup::retry`).
// Built from `bool` via the `From<bool>` impl.
#[derive(FixedCardinalityLabel, Copy, Clone)]
pub enum Bool {
    True,
    False,
}
238 :
// Label group for `console_request_latency`. The `request` label value is dynamic
// and interned through a `ThreadedRodeo`, per the `dynamic_with` attribute.
#[derive(LabelGroup)]
#[label(set = ConsoleRequestSet)]
pub struct ConsoleRequest<'a> {
    #[label(dynamic_with = ThreadedRodeo, default)]
    pub request: &'a str,
}
245 :
// Two monotonic counters tracking endpoint connection pools. Both are flattened
// into `ProxyMetrics`. `HttpEndpointPools::guard` increments the first, and dropping
// the returned guard increments the second.
// NOTE(review): the `///` field comments are presumably exported as HELP text.
#[derive(MetricGroup, Default)]
pub struct HttpEndpointPools {
    /// Number of endpoints we have registered pools for
    pub http_pool_endpoints_registered_total: Counter,
    /// Number of endpoints we have unregistered pools for
    pub http_pool_endpoints_unregistered_total: Counter,
}
253 :
254 : pub struct HttpEndpointPoolsGuard<'a> {
255 : dec: &'a Counter,
256 : }
257 :
258 : impl Drop for HttpEndpointPoolsGuard<'_> {
259 2 : fn drop(&mut self) {
260 2 : self.dec.inc();
261 2 : }
262 : }
263 :
264 : impl HttpEndpointPools {
265 2 : pub fn guard(&self) -> HttpEndpointPoolsGuard<'_> {
266 2 : self.http_pool_endpoints_registered_total.inc();
267 2 : HttpEndpointPoolsGuard {
268 2 : dec: &self.http_pool_endpoints_unregistered_total,
269 2 : }
270 2 : }
271 : }
// Counter pair `opened_db_connections_total` / `closed_db_connections_total`,
// labelled by `Protocol`. Flattened into `ProxyMetrics::db_connections`.
pub struct NumDbConnectionsGauge;
impl CounterPairAssoc for NumDbConnectionsGauge {
    const INC_NAME: &'static MetricName = MetricName::from_str("opened_db_connections_total");
    const DEC_NAME: &'static MetricName = MetricName::from_str("closed_db_connections_total");
    const INC_HELP: &'static str = "Number of opened connections to a database.";
    const DEC_HELP: &'static str = "Number of closed connections to a database.";
    type LabelGroupSet = StaticLabelSet<Protocol>;
}
// Guard for this counter pair. Presumably bumps the DEC counter when dropped;
// see `metrics::MeasuredCounterPairGuard`.
pub type NumDbConnectionsGuard<'a> = metrics::MeasuredCounterPairGuard<'a, NumDbConnectionsGauge>;
281 :
// Counter pair `opened_client_connections_total` / `closed_client_connections_total`,
// labelled by `Protocol`. Flattened into `ProxyMetrics::client_connections`.
pub struct NumClientConnectionsGauge;
impl CounterPairAssoc for NumClientConnectionsGauge {
    const INC_NAME: &'static MetricName = MetricName::from_str("opened_client_connections_total");
    const DEC_NAME: &'static MetricName = MetricName::from_str("closed_client_connections_total");
    const INC_HELP: &'static str = "Number of opened connections from a client.";
    const DEC_HELP: &'static str = "Number of closed connections from a client.";
    type LabelGroupSet = StaticLabelSet<Protocol>;
}
// Guard for this counter pair. Presumably bumps the DEC counter when dropped;
// see `metrics::MeasuredCounterPairGuard`.
pub type NumClientConnectionsGuard<'a> =
    metrics::MeasuredCounterPairGuard<'a, NumClientConnectionsGauge>;
292 :
// Counter pair `accepted_connections_total` / `closed_connections_total`,
// labelled by `Protocol`. Flattened into `ProxyMetrics::connection_requests`.
pub struct NumConnectionRequestsGauge;
impl CounterPairAssoc for NumConnectionRequestsGauge {
    const INC_NAME: &'static MetricName = MetricName::from_str("accepted_connections_total");
    const DEC_NAME: &'static MetricName = MetricName::from_str("closed_connections_total");
    const INC_HELP: &'static str = "Number of client connections accepted.";
    const DEC_HELP: &'static str = "Number of client connections closed.";
    type LabelGroupSet = StaticLabelSet<Protocol>;
}
// Guard for this counter pair. Presumably bumps the DEC counter when dropped;
// see `metrics::MeasuredCounterPairGuard`.
pub type NumConnectionRequestsGuard<'a> =
    metrics::MeasuredCounterPairGuard<'a, NumConnectionRequestsGauge>;
303 :
// Counter pair `opened_msgs_cancel_channel_total` / `closed_msgs_cancel_channel_total`,
// labelled by `RedisMsgKind`. Flattened into `ProxyMetrics::cancel_channel_size`.
pub struct CancelChannelSizeGauge;
impl CounterPairAssoc for CancelChannelSizeGauge {
    const INC_NAME: &'static MetricName = MetricName::from_str("opened_msgs_cancel_channel_total");
    const DEC_NAME: &'static MetricName = MetricName::from_str("closed_msgs_cancel_channel_total");
    const INC_HELP: &'static str = "Number of processing messages in the cancellation channel.";
    const DEC_HELP: &'static str = "Number of closed messages in the cancellation channel.";
    type LabelGroupSet = StaticLabelSet<RedisMsgKind>;
}
// Guard for this counter pair. Presumably bumps the DEC counter when dropped;
// see `metrics::MeasuredCounterPairGuard`.
pub type CancelChannelSizeGuard<'a> = metrics::MeasuredCounterPairGuard<'a, CancelChannelSizeGauge>;
313 :
// Labels for `compute_connection_latency_seconds`. Built in `LatencyTimer`'s `Drop`.
#[derive(LabelGroup)]
#[label(set = ComputeConnectionLatencySet)]
pub struct ComputeConnectionLatencyGroup {
    protocol: Protocol,
    cold_start_info: ColdStartInfo,
    outcome: ConnectOutcome,
    excluded: LatencyExclusions,
}

// Which accumulated waiting times were subtracted from the measured latency. Each
// variant adds one more category: client, then cplane, then compute, then retry.
#[derive(FixedCardinalityLabel, Copy, Clone)]
pub enum LatencyExclusions {
    Client,
    ClientAndCplane,
    ClientCplaneCompute,
    ClientCplaneComputeRetry,
}
330 :
// Labels for `accepted_connections_by_sni`: the protocol plus how the endpoint was
// determined.
#[derive(LabelGroup)]
#[label(set = SniSet)]
pub struct SniGroup {
    pub protocol: Protocol,
    pub kind: SniKind,
}

#[derive(FixedCardinalityLabel, Copy, Clone)]
pub enum SniKind {
    /// Domain name based routing. SNI for libpq/websockets. Host for HTTP
    Sni,
    /// Metadata based routing. `options` for libpq/websockets. Header for HTTP
    NoSni,
    /// Metadata based routing, using the password field.
    PasswordHack,
}
347 :
// Label `kind` for `connection_failures_total` (whether the compute info was cached).
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "kind")]
pub enum ConnectionFailureKind {
    ComputeCached,
    ComputeUncached,
}

// Labels for `connection_failures_breakdown`: the error classification and whether
// a retry is involved. `retry` is built from a `bool` via `Bool::from`.
#[derive(LabelGroup)]
#[label(set = ConnectionFailuresBreakdownSet)]
pub struct ConnectionFailuresBreakdownGroup {
    pub kind: ErrorKind,
    pub retry: Bool,
}
361 :
// Labels for `redis_errors_total`. The `channel` value is dynamic and interned
// through a `ThreadedRodeo`, per the `dynamic_with` attribute.
#[derive(LabelGroup, Copy, Clone)]
#[label(set = RedisErrorsSet)]
pub struct RedisErrors<'a> {
    #[label(dynamic_with = ThreadedRodeo, default)]
    pub channel: &'a str,
}
368 :
// Result of handling a cancellation request.
#[derive(FixedCardinalityLabel, Copy, Clone)]
pub enum CancellationOutcome {
    NotFound,
    Found,
    RateLimitExceeded,
}

// Labels for `cancellation_requests_total`.
#[derive(LabelGroup)]
#[label(set = CancellationRequestSet)]
pub struct CancellationRequest {
    pub kind: CancellationOutcome,
}
381 :
/// What a connection attempt was waiting on. Passed to `LatencyTimer::unpause`
/// to choose which `LatencyAccumulated` bucket receives the elapsed time.
#[derive(Clone, Copy)]
pub enum Waiting {
    /// Accumulated into `LatencyAccumulated::cplane`.
    Cplane,
    /// Accumulated into `LatencyAccumulated::client`.
    Client,
    /// Accumulated into `LatencyAccumulated::compute`.
    Compute,
    /// Accumulated into `LatencyAccumulated::retry`.
    RetryTimeout,
}
389 :
// Label `kind` for the cancellation-channel counter pair (`CancelChannelSizeGauge`).
// NOTE(review): none of the current variants share a prefix/suffix with each other or
// with the enum name, so this clippy allowance may be stale. Confirm before removing.
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "kind")]
#[allow(clippy::enum_variant_names)]
pub enum RedisMsgKind {
    Set,
    Get,
    Expire,
    HGet,
}
399 :
/// Time a connection attempt spent waiting on things other than the proxy itself,
/// split by who it was waiting on (filled in by `LatencyTimer::unpause`).
#[derive(Default, Clone)]
pub struct LatencyAccumulated {
    pub cplane: time::Duration,
    pub client: time::Duration,
    pub compute: time::Duration,
    pub retry: time::Duration,
}

/// Renders each bucket as whole microseconds, in the order client, cplane,
/// compute, retry.
impl std::fmt::Display for LatencyAccumulated {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            cplane,
            client,
            compute,
            retry,
        } = self;
        write!(
            f,
            "client: {}, cplane: {}, compute: {}, retry: {}",
            client.as_micros(),
            cplane.as_micros(),
            compute.as_micros(),
            retry.as_micros(),
        )
    }
}
420 :
/// Stopwatch for one connection attempt. When dropped, it records the attempt's
/// latency into `compute_connection_latency_seconds`, unless `skip_reporting` is set.
pub struct LatencyTimer {
    // instant at which the stopwatch was started
    start: time::Instant,
    // instant at which the stopwatch was stopped (set by `success`). `None` while
    // still running.
    stop: Option<time::Instant>,
    // accumulated waiting time, subtracted from the total when reporting
    accumulated: LatencyAccumulated,
    // label data
    protocol: Protocol,
    cold_start_info: ColdStartInfo,
    outcome: ConnectOutcome,

    // when true, `Drop` records nothing (used by `LatencyTimer::noop`)
    skip_reporting: bool,
}
435 :
436 : impl LatencyTimer {
437 77 : pub fn new(protocol: Protocol) -> Self {
438 77 : Self {
439 77 : start: time::Instant::now(),
440 77 : stop: None,
441 77 : accumulated: LatencyAccumulated::default(),
442 77 : protocol,
443 77 : cold_start_info: ColdStartInfo::Unknown,
444 77 : // assume failed unless otherwise specified
445 77 : outcome: ConnectOutcome::Failed,
446 77 : skip_reporting: false,
447 77 : }
448 77 : }
449 :
450 0 : pub(crate) fn noop(protocol: Protocol) -> Self {
451 0 : Self {
452 0 : start: time::Instant::now(),
453 0 : stop: None,
454 0 : accumulated: LatencyAccumulated::default(),
455 0 : protocol,
456 0 : cold_start_info: ColdStartInfo::Unknown,
457 0 : // assume failed unless otherwise specified
458 0 : outcome: ConnectOutcome::Failed,
459 0 : skip_reporting: true,
460 0 : }
461 0 : }
462 :
463 52 : pub fn unpause(&mut self, start: Instant, waiting_for: Waiting) {
464 52 : let dur = start.elapsed();
465 52 : match waiting_for {
466 0 : Waiting::Cplane => self.accumulated.cplane += dur,
467 39 : Waiting::Client => self.accumulated.client += dur,
468 7 : Waiting::Compute => self.accumulated.compute += dur,
469 6 : Waiting::RetryTimeout => self.accumulated.retry += dur,
470 : }
471 52 : }
472 :
473 0 : pub fn cold_start_info(&mut self, cold_start_info: ColdStartInfo) {
474 0 : self.cold_start_info = cold_start_info;
475 0 : }
476 :
477 8 : pub fn success(&mut self) {
478 : // stop the stopwatch and record the time that we have accumulated
479 8 : self.stop = Some(time::Instant::now());
480 :
481 : // success
482 8 : self.outcome = ConnectOutcome::Success;
483 8 : }
484 :
485 0 : pub fn accumulated(&self) -> LatencyAccumulated {
486 0 : self.accumulated.clone()
487 0 : }
488 : }
489 :
// Outcome label for connection latency and retry metrics. `LatencyTimer` starts as
// `Failed` and switches to `Success` only when `LatencyTimer::success` is called.
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
pub enum ConnectOutcome {
    Success,
    Failed,
}
495 :
496 : impl Drop for LatencyTimer {
497 77 : fn drop(&mut self) {
498 77 : if self.skip_reporting {
499 0 : return;
500 77 : }
501 :
502 77 : let duration = self
503 77 : .stop
504 77 : .unwrap_or_else(time::Instant::now)
505 77 : .duration_since(self.start);
506 :
507 77 : let metric = &Metrics::get().proxy.compute_connection_latency_seconds;
508 :
509 : // Excluding client communication from the accumulated time.
510 77 : metric.observe(
511 77 : ComputeConnectionLatencyGroup {
512 77 : protocol: self.protocol,
513 77 : cold_start_info: self.cold_start_info,
514 77 : outcome: self.outcome,
515 77 : excluded: LatencyExclusions::Client,
516 77 : },
517 77 : duration
518 77 : .saturating_sub(self.accumulated.client)
519 77 : .as_secs_f64(),
520 : );
521 :
522 : // Exclude client and cplane communication from the accumulated time.
523 77 : let accumulated_total = self.accumulated.client + self.accumulated.cplane;
524 77 : metric.observe(
525 77 : ComputeConnectionLatencyGroup {
526 77 : protocol: self.protocol,
527 77 : cold_start_info: self.cold_start_info,
528 77 : outcome: self.outcome,
529 77 : excluded: LatencyExclusions::ClientAndCplane,
530 77 : },
531 77 : duration.saturating_sub(accumulated_total).as_secs_f64(),
532 : );
533 :
534 : // Exclude client, cplane, compute communication from the accumulated time.
535 77 : let accumulated_total =
536 77 : self.accumulated.client + self.accumulated.cplane + self.accumulated.compute;
537 77 : metric.observe(
538 77 : ComputeConnectionLatencyGroup {
539 77 : protocol: self.protocol,
540 77 : cold_start_info: self.cold_start_info,
541 77 : outcome: self.outcome,
542 77 : excluded: LatencyExclusions::ClientCplaneCompute,
543 77 : },
544 77 : duration.saturating_sub(accumulated_total).as_secs_f64(),
545 : );
546 :
547 : // Exclude client, cplane, compute, retry communication from the accumulated time.
548 77 : let accumulated_total = self.accumulated.client
549 77 : + self.accumulated.cplane
550 77 : + self.accumulated.compute
551 77 : + self.accumulated.retry;
552 77 : metric.observe(
553 77 : ComputeConnectionLatencyGroup {
554 77 : protocol: self.protocol,
555 77 : cold_start_info: self.cold_start_info,
556 77 : outcome: self.outcome,
557 77 : excluded: LatencyExclusions::ClientCplaneComputeRetry,
558 77 : },
559 77 : duration.saturating_sub(accumulated_total).as_secs_f64(),
560 : );
561 77 : }
562 : }
563 :
564 : impl From<bool> for Bool {
565 3 : fn from(value: bool) -> Self {
566 3 : if value { Bool::True } else { Bool::False }
567 3 : }
568 : }
569 :
// Labels for the `retries_metric` histogram: the final outcome and which operation
// was being retried.
#[derive(LabelGroup)]
#[label(set = RetriesMetricSet)]
pub struct RetriesMetricGroup {
    pub outcome: ConnectOutcome,
    pub retry_type: RetryType,
}

// The operation whose retries are being counted.
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
pub enum RetryType {
    WakeCompute,
    ConnectToCompute,
}
582 :
// Label `event` for `redis_events_count`, which `Metrics::get` densely
// pre-initialises so every variant is exported from the start.
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
#[label(singleton = "event")]
pub enum RedisEventsCount {
    EndpointCreated,
    BranchCreated,
    ProjectCreated,
    CancelSession,
    InvalidateRole,
    InvalidateEndpoint,
    InvalidateProject,
    InvalidateProjects,
    InvalidateOrg,
}
596 :
// Label set for per-worker thread-pool counters. The wrapped value is the number of
// workers and is used as the set's cardinality (see `ThreadPoolMetrics::new`).
pub struct ThreadPoolWorkers(usize);
// A single worker index, exported under the label name `worker`.
#[derive(Copy, Clone)]
pub struct ThreadPoolWorkerId(pub usize);
600 :
601 : impl LabelValue for ThreadPoolWorkerId {
602 0 : fn visit<V: measured::label::LabelVisitor>(&self, v: V) -> V::Output {
603 0 : v.write_int(self.0 as i64)
604 0 : }
605 : }
606 :
607 : impl LabelGroup for ThreadPoolWorkerId {
608 0 : fn visit_values(&self, v: &mut impl measured::label::LabelGroupVisitor) {
609 0 : v.write_value(LabelName::from_str("worker"), self);
610 0 : }
611 : }
612 :
613 : impl LabelGroupSet for ThreadPoolWorkers {
614 : type Group<'a> = ThreadPoolWorkerId;
615 :
616 14 : fn cardinality(&self) -> Option<usize> {
617 14 : Some(self.0)
618 14 : }
619 :
620 16 : fn encode_dense(&self, value: Self::Unique) -> Option<usize> {
621 16 : Some(value)
622 16 : }
623 :
624 0 : fn decode_dense(&self, value: usize) -> Self::Group<'_> {
625 0 : ThreadPoolWorkerId(value)
626 0 : }
627 :
628 : type Unique = usize;
629 :
630 16 : fn encode(&self, value: Self::Group<'_>) -> Option<Self::Unique> {
631 16 : Some(value.0)
632 16 : }
633 :
634 0 : fn decode(&self, value: &Self::Unique) -> Self::Group<'_> {
635 0 : ThreadPoolWorkerId(*value)
636 0 : }
637 : }
638 :
639 : impl LabelSet for ThreadPoolWorkers {
640 : type Value<'a> = ThreadPoolWorkerId;
641 :
642 0 : fn dynamic_cardinality(&self) -> Option<usize> {
643 0 : Some(self.0)
644 0 : }
645 :
646 0 : fn encode(&self, value: Self::Value<'_>) -> Option<usize> {
647 0 : (value.0 < self.0).then_some(value.0)
648 0 : }
649 :
650 0 : fn decode(&self, value: usize) -> Self::Value<'_> {
651 0 : ThreadPoolWorkerId(value)
652 0 : }
653 : }
654 :
655 : impl FixedCardinalitySet for ThreadPoolWorkers {
656 0 : fn cardinality(&self) -> usize {
657 0 : self.0
658 0 : }
659 : }
660 :
// Per-worker counters for a thread pool (registered as `ProxyMetrics::scram_pool`).
// `ThreadPoolMetrics::new(workers)` sizes both label sets to `workers` entries.
#[derive(MetricGroup)]
#[metric(new(workers: usize))]
pub struct ThreadPoolMetrics {
    #[metric(init = CounterVec::with_label_set(ThreadPoolWorkers(workers)))]
    pub worker_task_turns_total: CounterVec<ThreadPoolWorkers>,
    #[metric(init = CounterVec::with_label_set(ThreadPoolWorkers(workers)))]
    pub worker_task_skips_total: CounterVec<ThreadPoolWorkers>,
}
669 :
// Service-level metrics (namespace `service`): a single info metric whose labels
// describe the service's lifecycle state.
#[derive(MetricGroup, Default)]
pub struct ServiceMetrics {
    pub info: InfoMetric<ServiceInfo>,
}

// Labels for the `service` info metric. The default state is `ServiceState::Init`
// (the `#[default]` variant).
#[derive(Default)]
pub struct ServiceInfo {
    pub state: ServiceState,
}
679 :
680 : impl ServiceInfo {
681 0 : pub const fn running() -> Self {
682 0 : ServiceInfo {
683 0 : state: ServiceState::Running,
684 0 : }
685 0 : }
686 :
687 0 : pub const fn terminating() -> Self {
688 0 : ServiceInfo {
689 0 : state: ServiceState::Terminating,
690 0 : }
691 0 : }
692 : }
693 :
694 : impl LabelGroup for ServiceInfo {
695 0 : fn visit_values(&self, v: &mut impl LabelGroupVisitor) {
696 : const STATE: &LabelName = LabelName::from_str("state");
697 0 : v.write_value(STATE, &self.state);
698 0 : }
699 : }
700 :
// Lifecycle state reported through `ServiceInfo` (label `state`). `Init` is the
// default, and `ServiceInfo::running` / `ServiceInfo::terminating` select the others.
#[derive(FixedCardinalityLabel, Clone, Copy, Debug, Default)]
#[label(singleton = "state")]
pub enum ServiceState {
    #[default]
    Init,
    Running,
    Terminating,
}
709 :
// Cache metrics (namespace `cache`), labelled by which cache they describe.
// NOTE(review): the `///` field comments are presumably exported as HELP text.
#[derive(MetricGroup)]
#[metric(new())]
pub struct CacheMetrics {
    /// The capacity of the cache
    pub capacity: GaugeVec<StaticLabelSet<CacheKind>>,
    /// The total number of entries inserted into the cache
    pub inserted_total: CounterVec<StaticLabelSet<CacheKind>>,
    /// The total number of entries removed from the cache
    // Labelled by cache and removal cause (`CacheEviction`).
    pub evicted_total: CounterVec<CacheEvictionSet>,
    /// The total number of cache requests
    // Labelled by cache and hit/miss (`CacheOutcomeGroup`).
    pub request_total: CounterVec<CacheOutcomeSet>,
}
722 :
723 : impl Default for CacheMetrics {
724 59 : fn default() -> Self {
725 59 : Self::new()
726 59 : }
727 : }
728 :
// Label `cache`: which cache a cache metric refers to.
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
#[label(singleton = "cache")]
pub enum CacheKind {
    NodeInfo,
    ProjectInfoEndpoints,
    ProjectInfoRoles,
    Schema,
    Pbkdf2,
}

// Why an entry left a cache. Used as the `cause` label of `evicted_total`.
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
pub enum CacheRemovalCause {
    Expired,
    Explicit,
    Replaced,
    Size,
}

// Labels for `CacheMetrics::evicted_total`.
#[derive(LabelGroup)]
#[label(set = CacheEvictionSet)]
pub struct CacheEviction {
    pub cache: CacheKind,
    pub cause: CacheRemovalCause,
}
753 :
// Whether a cache lookup found an entry.
#[derive(FixedCardinalityLabel, Copy, Clone)]
pub enum CacheOutcome {
    Hit,
    Miss,
}

// Labels for `CacheMetrics::request_total`.
#[derive(LabelGroup)]
#[label(set = CacheOutcomeSet)]
pub struct CacheOutcomeGroup {
    pub cache: CacheKind,
    pub outcome: CacheOutcome,
}
|