Line data Source code
1 : use std::sync::{Arc, OnceLock};
2 :
3 : use lasso::ThreadedRodeo;
4 : use measured::label::{
5 : FixedCardinalitySet, LabelGroupSet, LabelGroupVisitor, LabelName, LabelSet, LabelValue,
6 : StaticLabelSet,
7 : };
8 : use measured::metric::histogram::Thresholds;
9 : use measured::metric::name::MetricName;
10 : use measured::{
11 : Counter, CounterVec, FixedCardinalityLabel, Gauge, GaugeVec, Histogram, HistogramVec,
12 : LabelGroup, MetricGroup,
13 : };
14 : use metrics::{CounterPairAssoc, CounterPairVec, HyperLogLogVec, InfoMetric};
15 : use tokio::time::{self, Instant};
16 :
17 : use crate::control_plane::messages::ColdStartInfo;
18 : use crate::error::ErrorKind;
19 :
// Root metric group for the proxy process. The `#[metric(new(..))]` attribute has the
// `MetricGroup` derive generate `Metrics::new(thread_pool)`, which `Metrics::install` calls.
#[derive(MetricGroup)]
#[metric(new(thread_pool: Arc<ThreadPoolMetrics>))]
pub struct Metrics {
    // The only sub-group that needs the shared thread-pool metrics handle.
    #[metric(namespace = "proxy")]
    #[metric(init = ProxyMetrics::new(thread_pool))]
    pub proxy: ProxyMetrics,

    #[metric(namespace = "wake_compute_lock")]
    pub wake_compute_lock: ApiLockMetrics,

    #[metric(namespace = "service")]
    pub service: ServiceMetrics,

    #[metric(namespace = "cache")]
    pub cache: CacheMetrics,
}
36 :
37 : static SELF: OnceLock<Metrics> = OnceLock::new();
38 : impl Metrics {
39 0 : pub fn install(thread_pool: Arc<ThreadPoolMetrics>) {
40 0 : let mut metrics = Metrics::new(thread_pool);
41 :
42 0 : metrics.proxy.errors_total.init_all_dense();
43 0 : metrics.proxy.redis_errors_total.init_all_dense();
44 0 : metrics.proxy.redis_events_count.init_all_dense();
45 0 : metrics.proxy.retries_metric.init_all_dense();
46 0 : metrics.proxy.connection_failures_total.init_all_dense();
47 :
48 0 : SELF.set(metrics)
49 0 : .ok()
50 0 : .expect("proxy metrics must not be installed more than once");
51 0 : }
52 :
53 241 : pub fn get() -> &'static Self {
54 : #[cfg(test)]
55 241 : return SELF.get_or_init(|| Metrics::new(Arc::new(ThreadPoolMetrics::new(0))));
56 :
57 : #[cfg(not(test))]
58 0 : SELF.get()
59 0 : .expect("proxy metrics must be installed by the main() function")
60 241 : }
61 : }
62 :
// Metrics exported under the `proxy` namespace (see `Metrics::proxy`).
// NOTE(review): the `///` field comments are presumably used by the `MetricGroup` derive as
// the exported help text, so they are kept verbatim; rewording them changes scraped output.
#[derive(MetricGroup)]
#[metric(new(thread_pool: Arc<ThreadPoolMetrics>))]
pub struct ProxyMetrics {
    // Flattened sub-groups: counter pairs (see the `CounterPairAssoc` impls in this file) and
    // the endpoint-pool counters.
    #[metric(flatten)]
    pub db_connections: CounterPairVec<NumDbConnectionsGauge>,
    #[metric(flatten)]
    pub client_connections: CounterPairVec<NumClientConnectionsGauge>,
    #[metric(flatten)]
    pub connection_requests: CounterPairVec<NumConnectionRequestsGauge>,
    #[metric(flatten)]
    pub http_endpoint_pools: HttpEndpointPools,
    #[metric(flatten)]
    pub cancel_channel_size: CounterPairVec<CancelChannelSizeGauge>,

    // NOTE(review): if `exponential_buckets(start, factor)` yields `start * factor^i` for
    // `i in 0..N` (the Prometheus convention), the largest finite bound is
    // `start * factor^(N-1)`, one `factor` smaller than the "largest bucket" figures quoted
    // below. Confirm against `measured` before relying on them.
    // Observed by `LatencyTimer` on drop, once per `LatencyExclusions` variant.
    /// Time it took for proxy to establish a connection to the compute endpoint.
    // largest bucket = 2^16 * 0.5ms = 32s
    #[metric(metadata = Thresholds::exponential_buckets(0.0005, 2.0))]
    pub compute_connection_latency_seconds: HistogramVec<ComputeConnectionLatencySet, 16>,

    /// Time it took for proxy to receive a response from control plane.
    #[metric(
        // largest bucket = 2^16 * 0.2ms = 13s
        metadata = Thresholds::exponential_buckets(0.0002, 2.0),
    )]
    pub console_request_latency: HistogramVec<ConsoleRequestSet, 16>,

    /// Size of the HTTP request body lengths.
    // smallest bucket = 16 bytes
    // largest bucket = 4^12 * 16 bytes = 256MB
    #[metric(metadata = Thresholds::exponential_buckets(16.0, 4.0))]
    pub http_conn_content_length_bytes: HistogramVec<StaticLabelSet<HttpDirection>, 12>,

    // "reclaimation" (sic): the field name presumably becomes the metric name, so fixing the
    // spelling would rename the exported series.
    /// Time it takes to reclaim unused connection pools.
    #[metric(metadata = Thresholds::exponential_buckets(1e-6, 2.0))]
    pub http_pool_reclaimation_lag_seconds: Histogram<16>,

    /// Number of opened connections to a database.
    pub http_pool_opened_connections: Gauge,

    /// Number of allowed ips
    #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0]))]
    pub allowed_ips_number: Histogram<10>,

    /// Number of allowed VPC endpoints IDs
    #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0]))]
    pub allowed_vpc_endpoint_ids: Histogram<10>,

    // Labelled by `SniGroup` (protocol + `SniKind`).
    /// Number of connections, by the method we used to determine the endpoint.
    pub accepted_connections_by_sni: CounterVec<SniSet>,

    // All label values pre-registered in `Metrics::install` (`init_all_dense`).
    /// Number of connection failures (per kind).
    pub connection_failures_total: CounterVec<StaticLabelSet<ConnectionFailureKind>>,

    // Labelled by `ErrorKind` plus a `retry` flag (see `ConnectionFailuresBreakdownGroup`).
    /// Number of wake-up failures (per kind).
    pub connection_failures_breakdown: CounterVec<ConnectionFailuresBreakdownSet>,

    /// Number of bytes sent/received between all clients and backends.
    pub io_bytes: CounterVec<StaticLabelSet<Direction>>,

    /// Number of IO errors while logging.
    pub logging_errors_count: Counter,

    // All label values pre-registered in `Metrics::install` (`init_all_dense`).
    /// Number of errors by a given classification.
    pub errors_total: CounterVec<StaticLabelSet<crate::error::ErrorKind>>,

    /// Number of cancellation requests (per found/not_found).
    pub cancellation_requests_total: CounterVec<CancellationRequestSet>,

    // Labelled by redis `channel` (see `RedisErrors`), not by error kind; the help text
    // below duplicates the one on `errors_total`. `init_all_dense` is called on it in
    // `Metrics::install`.
    /// Number of errors by a given classification
    pub redis_errors_total: CounterVec<RedisErrorsSet>,

    /// Number of TLS handshake failures
    pub tls_handshake_failures: Counter,

    /// HLL approximate cardinality of endpoints that are connecting
    pub connecting_endpoints: HyperLogLogVec<StaticLabelSet<Protocol>, 32>,

    /// Number of endpoints affected by errors of a given classification
    pub endpoints_affected_by_errors: HyperLogLogVec<StaticLabelSet<crate::error::ErrorKind>, 32>,

    // `init_all_dense` is called on it in `Metrics::install`.
    /// Number of retries (per outcome, per retry_type).
    #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]))]
    pub retries_metric: HistogramVec<RetriesMetricSet, 9>,

    // All label values pre-registered in `Metrics::install` (`init_all_dense`).
    /// Number of events consumed from redis (per event type).
    pub redis_events_count: CounterVec<StaticLabelSet<RedisEventsCount>>,

    #[metric(namespace = "connect_compute_lock")]
    pub connect_compute_lock: ApiLockMetrics,

    // Shares the `Arc<ThreadPoolMetrics>` passed to `ProxyMetrics::new`.
    #[metric(namespace = "scram_pool")]
    #[metric(init = thread_pool)]
    pub scram_pool: Arc<ThreadPoolMetrics>,
}
157 :
// Metrics for one API lock. Instantiated as `Metrics::wake_compute_lock` and
// `ProxyMetrics::connect_compute_lock`, each under its own namespace.
#[derive(MetricGroup)]
#[metric(new())]
pub struct ApiLockMetrics {
    /// Number of semaphores registered in this api lock
    pub semaphores_registered: Counter,
    /// Number of semaphores unregistered in this api lock
    pub semaphores_unregistered: Counter,
    /// Time it takes to reclaim unused semaphores in the api lock
    #[metric(metadata = Thresholds::exponential_buckets(1e-6, 2.0))]
    pub reclamation_lag_seconds: Histogram<16>,
    /// Time it takes to acquire a semaphore lock
    #[metric(metadata = Thresholds::exponential_buckets(1e-4, 2.0))]
    pub semaphore_acquire_seconds: Histogram<16>,
}
172 :
173 : impl Default for ApiLockMetrics {
174 106 : fn default() -> Self {
175 106 : Self::new()
176 106 : }
177 : }
178 :
// Label for `http_conn_content_length_bytes`, emitted under the label name `direction`.
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "direction")]
pub enum HttpDirection {
    Request,
    Response,
}

// Label for `io_bytes`, also emitted under the label name `direction`
// (presumably transmit/receive — confirm at call sites).
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "direction")]
pub enum Direction {
    Tx,
    Rx,
}
192 :
// Client protocol, emitted under the label name `protocol`. Used by the connection counter
// pairs, `connecting_endpoints`, `SniGroup`, `ComputeConnectionLatencyGroup` and
// `InvalidEndpointsGroup`. `Protocol::as_str` gives the string form used by `Display`.
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
#[label(singleton = "protocol")]
pub enum Protocol {
    Http,
    Ws,
    Tcp,
    SniRouter,
}
201 :
202 : impl Protocol {
203 6 : pub fn as_str(self) -> &'static str {
204 6 : match self {
205 0 : Protocol::Http => "http",
206 0 : Protocol::Ws => "ws",
207 6 : Protocol::Tcp => "tcp",
208 0 : Protocol::SniRouter => "sni_router",
209 : }
210 6 : }
211 : }
212 :
213 : impl std::fmt::Display for Protocol {
214 6 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
215 6 : f.write_str(self.as_str())
216 6 : }
217 : }
218 :
// Boolean label value, built from `bool` via the `From<bool>` impl in this file. It has no
// `singleton` name, so the label name presumably comes from the enclosing `LabelGroup`
// field (e.g. `retry`, `rejected`).
#[derive(FixedCardinalityLabel, Copy, Clone)]
pub enum Bool {
    True,
    False,
}
224 :
// Label group for `console_request_latency`. `request` is interned in a `ThreadedRodeo`
// (`dynamic_with`) rather than being a fixed-cardinality label.
#[derive(LabelGroup)]
#[label(set = ConsoleRequestSet)]
pub struct ConsoleRequest<'a> {
    #[label(dynamic_with = ThreadedRodeo, default)]
    pub request: &'a str,
}
231 :
// Registered/unregistered endpoint-pool counters, flattened into `ProxyMetrics`.
// Prefer `HttpEndpointPools::guard`, which increments the first counter immediately and
// the second when the returned guard is dropped.
#[derive(MetricGroup, Default)]
pub struct HttpEndpointPools {
    /// Number of endpoints we have registered pools for
    pub http_pool_endpoints_registered_total: Counter,
    /// Number of endpoints we have unregistered pools for
    pub http_pool_endpoints_unregistered_total: Counter,
}
239 :
240 : pub struct HttpEndpointPoolsGuard<'a> {
241 : dec: &'a Counter,
242 : }
243 :
244 : impl Drop for HttpEndpointPoolsGuard<'_> {
245 2 : fn drop(&mut self) {
246 2 : self.dec.inc();
247 2 : }
248 : }
249 :
250 : impl HttpEndpointPools {
251 2 : pub fn guard(&self) -> HttpEndpointPoolsGuard<'_> {
252 2 : self.http_pool_endpoints_registered_total.inc();
253 2 : HttpEndpointPoolsGuard {
254 2 : dec: &self.http_pool_endpoints_unregistered_total,
255 2 : }
256 2 : }
257 : }
// `CounterPairAssoc` impls: each names an increment/decrement counter pair (exported via the
// `CounterPairVec` fields of `ProxyMetrics`), their help text, and the label set they share.
// The `*Guard` aliases are the matching `metrics::MeasuredCounterPairGuard` types —
// presumably they bump INC on creation and DEC on drop; confirm in the `metrics` crate.

// Database connections opened/closed, labelled by `Protocol`.
pub struct NumDbConnectionsGauge;
impl CounterPairAssoc for NumDbConnectionsGauge {
    const INC_NAME: &'static MetricName = MetricName::from_str("opened_db_connections_total");
    const DEC_NAME: &'static MetricName = MetricName::from_str("closed_db_connections_total");
    const INC_HELP: &'static str = "Number of opened connections to a database.";
    const DEC_HELP: &'static str = "Number of closed connections to a database.";
    type LabelGroupSet = StaticLabelSet<Protocol>;
}
pub type NumDbConnectionsGuard<'a> = metrics::MeasuredCounterPairGuard<'a, NumDbConnectionsGauge>;

// Client connections opened/closed, labelled by `Protocol`.
pub struct NumClientConnectionsGauge;
impl CounterPairAssoc for NumClientConnectionsGauge {
    const INC_NAME: &'static MetricName = MetricName::from_str("opened_client_connections_total");
    const DEC_NAME: &'static MetricName = MetricName::from_str("closed_client_connections_total");
    const INC_HELP: &'static str = "Number of opened connections from a client.";
    const DEC_HELP: &'static str = "Number of closed connections from a client.";
    type LabelGroupSet = StaticLabelSet<Protocol>;
}
pub type NumClientConnectionsGuard<'a> =
    metrics::MeasuredCounterPairGuard<'a, NumClientConnectionsGauge>;

// Client connections accepted/closed, labelled by `Protocol`.
pub struct NumConnectionRequestsGauge;
impl CounterPairAssoc for NumConnectionRequestsGauge {
    const INC_NAME: &'static MetricName = MetricName::from_str("accepted_connections_total");
    const DEC_NAME: &'static MetricName = MetricName::from_str("closed_connections_total");
    const INC_HELP: &'static str = "Number of client connections accepted.";
    const DEC_HELP: &'static str = "Number of client connections closed.";
    type LabelGroupSet = StaticLabelSet<Protocol>;
}
pub type NumConnectionRequestsGuard<'a> =
    metrics::MeasuredCounterPairGuard<'a, NumConnectionRequestsGauge>;

// Messages entering/leaving the cancellation channel, labelled by `RedisMsgKind`.
pub struct CancelChannelSizeGauge;
impl CounterPairAssoc for CancelChannelSizeGauge {
    const INC_NAME: &'static MetricName = MetricName::from_str("opened_msgs_cancel_channel_total");
    const DEC_NAME: &'static MetricName = MetricName::from_str("closed_msgs_cancel_channel_total");
    const INC_HELP: &'static str = "Number of processing messages in the cancellation channel.";
    const DEC_HELP: &'static str = "Number of closed messages in the cancellation channel.";
    type LabelGroupSet = StaticLabelSet<RedisMsgKind>;
}
pub type CancelChannelSizeGuard<'a> = metrics::MeasuredCounterPairGuard<'a, CancelChannelSizeGauge>;
299 :
// Label group for `compute_connection_latency_seconds`. `LatencyTimer`'s `Drop` impl records
// one observation per `LatencyExclusions` variant.
#[derive(LabelGroup)]
#[label(set = ComputeConnectionLatencySet)]
pub struct ComputeConnectionLatencyGroup {
    protocol: Protocol,
    cold_start_info: ColdStartInfo,
    outcome: ConnectOutcome,
    excluded: LatencyExclusions,
}

// Which accumulated waiting times were subtracted from the observed latency. Each variant
// adds one more phase to the one before it.
#[derive(FixedCardinalityLabel, Copy, Clone)]
pub enum LatencyExclusions {
    // client waiting time only
    Client,
    // client + control plane
    ClientAndCplane,
    // client + control plane + compute
    ClientCplaneCompute,
    // client + control plane + compute + retry timeouts
    ClientCplaneComputeRetry,
}
316 :
// Label group for `accepted_connections_by_sni`.
#[derive(LabelGroup)]
#[label(set = SniSet)]
pub struct SniGroup {
    pub protocol: Protocol,
    pub kind: SniKind,
}

// How the target endpoint was determined for a connection.
#[derive(FixedCardinalityLabel, Copy, Clone)]
pub enum SniKind {
    /// Domain name based routing. SNI for libpq/websockets. Host for HTTP
    Sni,
    /// Metadata based routing. `options` for libpq/websockets. Header for HTTP
    NoSni,
    /// Metadata based routing, using the password field.
    PasswordHack,
}
333 :
// Label for `connection_failures_total` (label name `kind`). All values are pre-registered
// in `Metrics::install`. Variant meaning is presumably cached vs. freshly fetched compute
// info — confirm at call sites.
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "kind")]
pub enum ConnectionFailureKind {
    ComputeCached,
    ComputeUncached,
}

// Label group for `connection_failures_breakdown`: error classification plus a retry flag.
#[derive(LabelGroup)]
#[label(set = ConnectionFailuresBreakdownSet)]
pub struct ConnectionFailuresBreakdownGroup {
    pub kind: ErrorKind,
    pub retry: Bool,
}

// Label group for `redis_errors_total`. `channel` is interned in a `ThreadedRodeo`
// (`dynamic_with`), so it is not a fixed-cardinality label.
#[derive(LabelGroup, Copy, Clone)]
#[label(set = RedisErrorsSet)]
pub struct RedisErrors<'a> {
    #[label(dynamic_with = ThreadedRodeo, default)]
    pub channel: &'a str,
}

// Result of handling a cancellation request.
#[derive(FixedCardinalityLabel, Copy, Clone)]
pub enum CancellationOutcome {
    NotFound,
    Found,
    RateLimitExceeded,
}

// Label group for `cancellation_requests_total`.
#[derive(LabelGroup)]
#[label(set = CancellationRequestSet)]
pub struct CancellationRequest {
    pub kind: CancellationOutcome,
}
367 :
// Which party a `LatencyTimer` was waiting on. Passed to `LatencyTimer::unpause` to pick the
// `LatencyAccumulated` field to add to. Not a metric label (no `FixedCardinalityLabel`).
#[derive(Clone, Copy)]
pub enum Waiting {
    Cplane,
    Client,
    Compute,
    RetryTimeout,
}

// Label for the `cancel_channel_size` counter pair (label name `kind`).
// NOTE(review): the current variants do not appear to share a common prefix/suffix, so the
// `enum_variant_names` allow may be stale — confirm before removing.
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "kind")]
#[allow(clippy::enum_variant_names)]
pub enum RedisMsgKind {
    Set,
    Get,
    Expire,
    HGet,
}
385 :
/// Time a [`LatencyTimer`] spent waiting on each party, as accumulated by
/// `LatencyTimer::unpause`.
#[derive(Default, Clone)]
pub struct LatencyAccumulated {
    pub cplane: time::Duration,
    pub client: time::Duration,
    pub compute: time::Duration,
    pub retry: time::Duration,
}

impl std::fmt::Display for LatencyAccumulated {
    /// Renders every component in whole microseconds, client first.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            cplane,
            client,
            compute,
            retry,
        } = self;

        write!(
            f,
            "client: {}, cplane: {}, compute: {}, retry: {}",
            client.as_micros(),
            cplane.as_micros(),
            compute.as_micros(),
            retry.as_micros(),
        )
    }
}
406 :
407 : pub struct LatencyTimer {
408 : // time since the stopwatch was started
409 : start: time::Instant,
410 : // time since the stopwatch was stopped
411 : stop: Option<time::Instant>,
412 : // accumulated time on the stopwatch
413 : accumulated: LatencyAccumulated,
414 : // label data
415 : protocol: Protocol,
416 : cold_start_info: ColdStartInfo,
417 : outcome: ConnectOutcome,
418 :
419 : skip_reporting: bool,
420 : }
421 :
422 : impl LatencyTimer {
423 77 : pub fn new(protocol: Protocol) -> Self {
424 77 : Self {
425 77 : start: time::Instant::now(),
426 77 : stop: None,
427 77 : accumulated: LatencyAccumulated::default(),
428 77 : protocol,
429 77 : cold_start_info: ColdStartInfo::Unknown,
430 77 : // assume failed unless otherwise specified
431 77 : outcome: ConnectOutcome::Failed,
432 77 : skip_reporting: false,
433 77 : }
434 77 : }
435 :
436 0 : pub(crate) fn noop(protocol: Protocol) -> Self {
437 0 : Self {
438 0 : start: time::Instant::now(),
439 0 : stop: None,
440 0 : accumulated: LatencyAccumulated::default(),
441 0 : protocol,
442 0 : cold_start_info: ColdStartInfo::Unknown,
443 0 : // assume failed unless otherwise specified
444 0 : outcome: ConnectOutcome::Failed,
445 0 : skip_reporting: true,
446 0 : }
447 0 : }
448 :
449 52 : pub fn unpause(&mut self, start: Instant, waiting_for: Waiting) {
450 52 : let dur = start.elapsed();
451 52 : match waiting_for {
452 0 : Waiting::Cplane => self.accumulated.cplane += dur,
453 39 : Waiting::Client => self.accumulated.client += dur,
454 7 : Waiting::Compute => self.accumulated.compute += dur,
455 6 : Waiting::RetryTimeout => self.accumulated.retry += dur,
456 : }
457 52 : }
458 :
459 0 : pub fn cold_start_info(&mut self, cold_start_info: ColdStartInfo) {
460 0 : self.cold_start_info = cold_start_info;
461 0 : }
462 :
463 8 : pub fn success(&mut self) {
464 : // stop the stopwatch and record the time that we have accumulated
465 8 : self.stop = Some(time::Instant::now());
466 :
467 : // success
468 8 : self.outcome = ConnectOutcome::Success;
469 8 : }
470 :
471 0 : pub fn accumulated(&self) -> LatencyAccumulated {
472 0 : self.accumulated.clone()
473 0 : }
474 : }
475 :
// Whether a connection attempt succeeded. Used in `ComputeConnectionLatencyGroup`,
// `InvalidEndpointsGroup` and `RetriesMetricGroup`. A `LatencyTimer` starts as `Failed` and
// switches to `Success` in `LatencyTimer::success`.
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
pub enum ConnectOutcome {
    Success,
    Failed,
}
481 :
482 : impl Drop for LatencyTimer {
483 77 : fn drop(&mut self) {
484 77 : if self.skip_reporting {
485 0 : return;
486 77 : }
487 :
488 77 : let duration = self
489 77 : .stop
490 77 : .unwrap_or_else(time::Instant::now)
491 77 : .duration_since(self.start);
492 :
493 77 : let metric = &Metrics::get().proxy.compute_connection_latency_seconds;
494 :
495 : // Excluding client communication from the accumulated time.
496 77 : metric.observe(
497 77 : ComputeConnectionLatencyGroup {
498 77 : protocol: self.protocol,
499 77 : cold_start_info: self.cold_start_info,
500 77 : outcome: self.outcome,
501 77 : excluded: LatencyExclusions::Client,
502 77 : },
503 77 : duration
504 77 : .saturating_sub(self.accumulated.client)
505 77 : .as_secs_f64(),
506 : );
507 :
508 : // Exclude client and cplane communication from the accumulated time.
509 77 : let accumulated_total = self.accumulated.client + self.accumulated.cplane;
510 77 : metric.observe(
511 77 : ComputeConnectionLatencyGroup {
512 77 : protocol: self.protocol,
513 77 : cold_start_info: self.cold_start_info,
514 77 : outcome: self.outcome,
515 77 : excluded: LatencyExclusions::ClientAndCplane,
516 77 : },
517 77 : duration.saturating_sub(accumulated_total).as_secs_f64(),
518 : );
519 :
520 : // Exclude client, cplane, compute communication from the accumulated time.
521 77 : let accumulated_total =
522 77 : self.accumulated.client + self.accumulated.cplane + self.accumulated.compute;
523 77 : metric.observe(
524 77 : ComputeConnectionLatencyGroup {
525 77 : protocol: self.protocol,
526 77 : cold_start_info: self.cold_start_info,
527 77 : outcome: self.outcome,
528 77 : excluded: LatencyExclusions::ClientCplaneCompute,
529 77 : },
530 77 : duration.saturating_sub(accumulated_total).as_secs_f64(),
531 : );
532 :
533 : // Exclude client, cplane, compute, retry communication from the accumulated time.
534 77 : let accumulated_total = self.accumulated.client
535 77 : + self.accumulated.cplane
536 77 : + self.accumulated.compute
537 77 : + self.accumulated.retry;
538 77 : metric.observe(
539 77 : ComputeConnectionLatencyGroup {
540 77 : protocol: self.protocol,
541 77 : cold_start_info: self.cold_start_info,
542 77 : outcome: self.outcome,
543 77 : excluded: LatencyExclusions::ClientCplaneComputeRetry,
544 77 : },
545 77 : duration.saturating_sub(accumulated_total).as_secs_f64(),
546 : );
547 77 : }
548 : }
549 :
550 : impl From<bool> for Bool {
551 3 : fn from(value: bool) -> Self {
552 3 : if value { Bool::True } else { Bool::False }
553 3 : }
554 : }
555 :
// Label group over protocol, a `rejected` flag and the connect outcome.
// NOTE(review): no metric in this file uses `InvalidEndpointsSet`; presumably referenced
// elsewhere in the crate — confirm.
#[derive(LabelGroup)]
#[label(set = InvalidEndpointsSet)]
pub struct InvalidEndpointsGroup {
    pub protocol: Protocol,
    pub rejected: Bool,
    pub outcome: ConnectOutcome,
}

// Label group for `retries_metric` (pre-registered in `Metrics::install`).
#[derive(LabelGroup)]
#[label(set = RetriesMetricSet)]
pub struct RetriesMetricGroup {
    pub outcome: ConnectOutcome,
    pub retry_type: RetryType,
}

// Which operation was being retried.
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
pub enum RetryType {
    WakeCompute,
    ConnectToCompute,
}

// Label for `redis_events_count` (label name `event`); all values are pre-registered in
// `Metrics::install`.
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
#[label(singleton = "event")]
pub enum RedisEventsCount {
    EndpointCreated,
    BranchCreated,
    ProjectCreated,
    CancelSession,
    InvalidateRole,
    InvalidateEndpoint,
    InvalidateProject,
    InvalidateProjects,
    InvalidateOrg,
}
590 :
// Label set over worker indices `0..n`, where `n` is the wrapped worker count
// (see the `LabelSet::encode` bounds check below).
pub struct ThreadPoolWorkers(usize);
// A single worker index, emitted under the label name `worker`.
#[derive(Copy, Clone)]
pub struct ThreadPoolWorkerId(pub usize);
594 :
595 : impl LabelValue for ThreadPoolWorkerId {
596 0 : fn visit<V: measured::label::LabelVisitor>(&self, v: V) -> V::Output {
597 0 : v.write_int(self.0 as i64)
598 0 : }
599 : }
600 :
601 : impl LabelGroup for ThreadPoolWorkerId {
602 0 : fn visit_values(&self, v: &mut impl measured::label::LabelGroupVisitor) {
603 0 : v.write_value(LabelName::from_str("worker"), self);
604 0 : }
605 : }
606 :
607 : impl LabelGroupSet for ThreadPoolWorkers {
608 : type Group<'a> = ThreadPoolWorkerId;
609 :
610 118 : fn cardinality(&self) -> Option<usize> {
611 118 : Some(self.0)
612 118 : }
613 :
614 5 : fn encode_dense(&self, value: Self::Unique) -> Option<usize> {
615 5 : Some(value)
616 5 : }
617 :
618 0 : fn decode_dense(&self, value: usize) -> Self::Group<'_> {
619 0 : ThreadPoolWorkerId(value)
620 0 : }
621 :
622 : type Unique = usize;
623 :
624 5 : fn encode(&self, value: Self::Group<'_>) -> Option<Self::Unique> {
625 5 : Some(value.0)
626 5 : }
627 :
628 0 : fn decode(&self, value: &Self::Unique) -> Self::Group<'_> {
629 0 : ThreadPoolWorkerId(*value)
630 0 : }
631 : }
632 :
633 : impl LabelSet for ThreadPoolWorkers {
634 : type Value<'a> = ThreadPoolWorkerId;
635 :
636 0 : fn dynamic_cardinality(&self) -> Option<usize> {
637 0 : Some(self.0)
638 0 : }
639 :
640 0 : fn encode(&self, value: Self::Value<'_>) -> Option<usize> {
641 0 : (value.0 < self.0).then_some(value.0)
642 0 : }
643 :
644 0 : fn decode(&self, value: usize) -> Self::Value<'_> {
645 0 : ThreadPoolWorkerId(value)
646 0 : }
647 : }
648 :
649 : impl FixedCardinalitySet for ThreadPoolWorkers {
650 0 : fn cardinality(&self) -> usize {
651 0 : self.0
652 0 : }
653 : }
654 :
// Per-worker thread-pool counters. `new(workers)` sizes both label sets to `workers`.
// Exposed through `ProxyMetrics::scram_pool` (namespace `scram_pool`).
#[derive(MetricGroup)]
#[metric(new(workers: usize))]
pub struct ThreadPoolMetrics {
    #[metric(init = CounterVec::with_label_set(ThreadPoolWorkers(workers)))]
    pub worker_task_turns_total: CounterVec<ThreadPoolWorkers>,
    #[metric(init = CounterVec::with_label_set(ThreadPoolWorkers(workers)))]
    pub worker_task_skips_total: CounterVec<ThreadPoolWorkers>,
}
663 :
// Service lifecycle info metric (namespace `service` in `Metrics`), carrying a
// `ServiceInfo` value.
#[derive(MetricGroup, Default)]
pub struct ServiceMetrics {
    pub info: InfoMetric<ServiceInfo>,
}
668 :
669 : #[derive(Default)]
670 : pub struct ServiceInfo {
671 : pub state: ServiceState,
672 : }
673 :
674 : impl ServiceInfo {
675 0 : pub const fn running() -> Self {
676 0 : ServiceInfo {
677 0 : state: ServiceState::Running,
678 0 : }
679 0 : }
680 :
681 0 : pub const fn terminating() -> Self {
682 0 : ServiceInfo {
683 0 : state: ServiceState::Terminating,
684 0 : }
685 0 : }
686 : }
687 :
688 : impl LabelGroup for ServiceInfo {
689 0 : fn visit_values(&self, v: &mut impl LabelGroupVisitor) {
690 : const STATE: &LabelName = LabelName::from_str("state");
691 0 : v.write_value(STATE, &self.state);
692 0 : }
693 : }
694 :
// Lifecycle state reported by `ServiceInfo` (label name `state`); defaults to `Init`.
#[derive(FixedCardinalityLabel, Clone, Copy, Debug, Default)]
#[label(singleton = "state")]
pub enum ServiceState {
    #[default]
    Init,
    Running,
    Terminating,
}
703 :
// Cache metrics (namespace `cache` in `Metrics`). Every series carries a `CacheKind` label;
// evictions also carry a `CacheRemovalCause`, and requests a `CacheOutcome`.
#[derive(MetricGroup)]
#[metric(new())]
pub struct CacheMetrics {
    /// The capacity of the cache
    pub capacity: GaugeVec<StaticLabelSet<CacheKind>>,
    /// The total number of entries inserted into the cache
    pub inserted_total: CounterVec<StaticLabelSet<CacheKind>>,
    /// The total number of entries removed from the cache
    pub evicted_total: CounterVec<CacheEvictionSet>,
    /// The total number of cache requests
    pub request_total: CounterVec<CacheOutcomeSet>,
}
716 :
717 : impl Default for CacheMetrics {
718 53 : fn default() -> Self {
719 53 : Self::new()
720 53 : }
721 : }
722 :
// Which cache a series refers to (label name `cache`).
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
#[label(singleton = "cache")]
pub enum CacheKind {
    NodeInfo,
    ProjectInfoEndpoints,
    ProjectInfoRoles,
    Schema,
}

// Why an entry left the cache; used in `CacheEviction`.
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
pub enum CacheRemovalCause {
    Expired,
    Explicit,
    Replaced,
    Size,
}

// Label group for `CacheMetrics::evicted_total`.
#[derive(LabelGroup)]
#[label(set = CacheEvictionSet)]
pub struct CacheEviction {
    pub cache: CacheKind,
    pub cause: CacheRemovalCause,
}

// Result of a cache lookup; used in `CacheOutcomeGroup`.
#[derive(FixedCardinalityLabel, Copy, Clone)]
pub enum CacheOutcome {
    Hit,
    Miss,
}

// Label group for `CacheMetrics::request_total`.
#[derive(LabelGroup)]
#[label(set = CacheOutcomeSet)]
pub struct CacheOutcomeGroup {
    pub cache: CacheKind,
    pub outcome: CacheOutcome,
}
|