Line data Source code
1 : use std::sync::{Arc, OnceLock};
2 :
3 : use lasso::ThreadedRodeo;
4 : use measured::label::{
5 : FixedCardinalitySet, LabelGroupSet, LabelName, LabelSet, LabelValue, StaticLabelSet,
6 : };
7 : use measured::metric::histogram::Thresholds;
8 : use measured::metric::name::MetricName;
9 : use measured::{
10 : Counter, CounterVec, FixedCardinalityLabel, Gauge, Histogram, HistogramVec, LabelGroup,
11 : MetricGroup,
12 : };
13 : use metrics::{CounterPairAssoc, CounterPairVec, HyperLogLog, HyperLogLogVec};
14 : use tokio::time::{self, Instant};
15 :
16 : use crate::control_plane::messages::ColdStartInfo;
17 : use crate::error::ErrorKind;
18 :
// Root metric group. The derive generates `Metrics::new(thread_pool)`, which is
// what `Metrics::install` / `Metrics::get` call to build the global instance.
#[derive(MetricGroup)]
#[metric(new(thread_pool: Arc<ThreadPoolMetrics>))]
pub struct Metrics {
    // Proxy-level metrics, grouped under the `proxy` namespace. The thread pool
    // metrics handle is handed on to `ProxyMetrics::new`.
    #[metric(namespace = "proxy")]
    #[metric(init = ProxyMetrics::new(thread_pool))]
    pub proxy: ProxyMetrics,

    // API lock metrics grouped under the `wake_compute_lock` namespace.
    #[metric(namespace = "wake_compute_lock")]
    pub wake_compute_lock: ApiLockMetrics,
}
29 :
30 : static SELF: OnceLock<Metrics> = OnceLock::new();
31 : impl Metrics {
32 0 : pub fn install(thread_pool: Arc<ThreadPoolMetrics>) {
33 0 : SELF.set(Metrics::new(thread_pool))
34 0 : .ok()
35 0 : .expect("proxy metrics must not be installed more than once");
36 0 : }
37 :
38 143 : pub fn get() -> &'static Self {
39 143 : #[cfg(test)]
40 143 : return SELF.get_or_init(|| Metrics::new(Arc::new(ThreadPoolMetrics::new(0))));
41 143 :
42 143 : #[cfg(not(test))]
43 143 : SELF.get()
44 143 : .expect("proxy metrics must be installed by the main() function")
45 143 : }
46 : }
47 :
// Metrics owned by the proxy; `Metrics` nests this group under the `proxy` namespace.
//
// NOTE(review): the `///` field docs below are presumably picked up by the
// `MetricGroup` derive as metric help text — confirm before rewording any of them.
//
// NOTE(review): the bucket ranges quoted on the exponential histograms give both
// `start * factor^N` and `start * factor^(N-1)`. If the first threshold equals
// `start`, the largest finite bucket is the `N-1` value — confirm against
// `Thresholds::exponential_buckets`.
#[derive(MetricGroup)]
#[metric(new(thread_pool: Arc<ThreadPoolMetrics>))]
pub struct ProxyMetrics {
    #[metric(flatten)]
    pub db_connections: CounterPairVec<NumDbConnectionsGauge>,
    #[metric(flatten)]
    pub client_connections: CounterPairVec<NumClientConnectionsGauge>,
    #[metric(flatten)]
    pub connection_requests: CounterPairVec<NumConnectionRequestsGauge>,
    #[metric(flatten)]
    pub http_endpoint_pools: HttpEndpointPools,
    #[metric(flatten)]
    pub cancel_channel_size: CounterPairVec<CancelChannelSizeGauge>,

    /// Time it took for proxy to establish a connection to the compute endpoint.
    // 16 buckets, start 0.5ms, factor 2:
    // 2^16 * 0.5ms ~= 32.8s, 2^15 * 0.5ms ~= 16.4s
    #[metric(metadata = Thresholds::exponential_buckets(0.0005, 2.0))]
    pub compute_connection_latency_seconds: HistogramVec<ComputeConnectionLatencySet, 16>,

    /// Time it took for proxy to receive a response from control plane.
    #[metric(
        // 16 buckets, start 0.2ms, factor 2:
        // 2^16 * 0.2ms ~= 13.1s, 2^15 * 0.2ms ~= 6.6s
        metadata = Thresholds::exponential_buckets(0.0002, 2.0),
    )]
    pub console_request_latency: HistogramVec<ConsoleRequestSet, 16>,

    /// Time it takes to acquire a token to call console plane.
    // 16 buckets, start 0.05ms, factor 3:
    // 3^16 * 0.05ms ~= 2152s (~36min), 3^15 * 0.05ms ~= 717s (~12min)
    #[metric(metadata = Thresholds::exponential_buckets(0.00005, 3.0))]
    pub control_plane_token_acquire_seconds: Histogram<16>,

    /// Size of the HTTP request body lengths.
    // 12 buckets, start 16 bytes, factor 4:
    // smallest bucket = 16 bytes
    // 4^12 * 16 bytes = 256MiB, 4^11 * 16 bytes = 64MiB
    #[metric(metadata = Thresholds::exponential_buckets(16.0, 4.0))]
    pub http_conn_content_length_bytes: HistogramVec<StaticLabelSet<HttpDirection>, 12>,

    /// Time it takes to reclaim unused connection pools.
    // 16 buckets, start 1us, factor 2:
    // 2^16 * 1us ~= 65.5ms, 2^15 * 1us ~= 32.8ms
    #[metric(metadata = Thresholds::exponential_buckets(1e-6, 2.0))]
    pub http_pool_reclaimation_lag_seconds: Histogram<16>,

    /// Number of opened connections to a database.
    pub http_pool_opened_connections: Gauge,

    /// Number of cache hits/misses for allowed ips.
    pub allowed_ips_cache_misses: CounterVec<StaticLabelSet<CacheOutcome>>,

    /// Number of allowed ips
    #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0]))]
    pub allowed_ips_number: Histogram<10>,

    /// Number of cache hits/misses for VPC endpoint IDs.
    pub vpc_endpoint_id_cache_stats: CounterVec<StaticLabelSet<CacheOutcome>>,

    /// Number of cache hits/misses for access blocker flags.
    pub access_blocker_flags_cache_stats: CounterVec<StaticLabelSet<CacheOutcome>>,

    /// Number of allowed VPC endpoints IDs
    #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0]))]
    pub allowed_vpc_endpoint_ids: Histogram<10>,

    /// Number of connections (per sni).
    pub accepted_connections_by_sni: CounterVec<StaticLabelSet<SniKind>>,

    /// Number of connection failures (per kind).
    pub connection_failures_total: CounterVec<StaticLabelSet<ConnectionFailureKind>>,

    /// Number of wake-up failures (per kind).
    pub connection_failures_breakdown: CounterVec<ConnectionFailuresBreakdownSet>,

    /// Number of bytes sent/received between all clients and backends.
    pub io_bytes: CounterVec<StaticLabelSet<Direction>>,

    /// Number of errors by a given classification.
    pub errors_total: CounterVec<StaticLabelSet<crate::error::ErrorKind>>,

    /// Number of cancellation requests (per found/not_found).
    pub cancellation_requests_total: CounterVec<CancellationRequestSet>,

    /// Number of errors by a given classification
    pub redis_errors_total: CounterVec<RedisErrorsSet>,

    /// Number of TLS handshake failures
    pub tls_handshake_failures: Counter,

    /// Number of connection requests affected by authentication rate limits
    pub requests_auth_rate_limits_total: Counter,

    /// HLL approximate cardinality of endpoints that are connecting
    pub connecting_endpoints: HyperLogLogVec<StaticLabelSet<Protocol>, 32>,

    /// Number of endpoints affected by errors of a given classification
    pub endpoints_affected_by_errors: HyperLogLogVec<StaticLabelSet<crate::error::ErrorKind>, 32>,

    /// Number of endpoints affected by authentication rate limits
    pub endpoints_auth_rate_limits: HyperLogLog<32>,

    /// Number of invalid endpoints (per protocol, per rejected).
    pub invalid_endpoints_total: CounterVec<InvalidEndpointsSet>,

    /// Number of retries (per outcome, per retry_type).
    // One bucket per retry count from 0 through 8.
    #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]))]
    pub retries_metric: HistogramVec<RetriesMetricSet, 9>,

    /// Number of events consumed from redis (per event type).
    pub redis_events_count: CounterVec<StaticLabelSet<RedisEventsCount>>,

    #[metric(namespace = "connect_compute_lock")]
    pub connect_compute_lock: ApiLockMetrics,

    // Shared handle passed in through `ProxyMetrics::new(thread_pool)` and stored as-is.
    #[metric(namespace = "scram_pool")]
    #[metric(init = thread_pool)]
    pub scram_pool: Arc<ThreadPoolMetrics>,
}
162 :
// Metrics for one API lock. `Metrics` and `ProxyMetrics` each embed an instance
// under its own namespace (`wake_compute_lock`, `connect_compute_lock`).
#[derive(MetricGroup)]
#[metric(new())]
pub struct ApiLockMetrics {
    /// Number of semaphores registered in this api lock
    pub semaphores_registered: Counter,
    /// Number of semaphores unregistered in this api lock
    pub semaphores_unregistered: Counter,
    /// Time it takes to reclaim unused semaphores in the api lock
    // 16 buckets, start 1us, factor 2.
    #[metric(metadata = Thresholds::exponential_buckets(1e-6, 2.0))]
    pub reclamation_lag_seconds: Histogram<16>,
    /// Time it takes to acquire a semaphore lock
    // 16 buckets, start 0.1ms, factor 2.
    #[metric(metadata = Thresholds::exponential_buckets(1e-4, 2.0))]
    pub semaphore_acquire_seconds: Histogram<16>,
}
177 :
178 : impl Default for ApiLockMetrics {
179 88 : fn default() -> Self {
180 88 : Self::new()
181 88 : }
182 : }
183 :
/// Label separating HTTP requests from responses; used by the
/// `http_conn_content_length_bytes` histogram.
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "direction")]
pub enum HttpDirection {
    Request,
    Response,
}
190 :
/// Label for the `io_bytes` counter.
// NOTE(review): presumably Tx = bytes sent and Rx = bytes received from the
// proxy's point of view — confirm at the call sites.
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "direction")]
pub enum Direction {
    Tx,
    Rx,
}
197 :
/// Client-facing protocol; used as a label on the connection counter pairs,
/// the latency histogram and several other proxy metrics.
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
#[label(singleton = "protocol")]
pub enum Protocol {
    Http,
    Ws,
    Tcp,
    SniRouter,
}
206 :
207 : impl Protocol {
208 0 : pub fn as_str(self) -> &'static str {
209 0 : match self {
210 0 : Protocol::Http => "http",
211 0 : Protocol::Ws => "ws",
212 0 : Protocol::Tcp => "tcp",
213 0 : Protocol::SniRouter => "sni_router",
214 : }
215 0 : }
216 : }
217 :
218 : impl std::fmt::Display for Protocol {
219 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
220 0 : f.write_str(self.as_str())
221 0 : }
222 : }
223 :
/// Boolean label value; used for the `retry` and `rejected` labels and
/// convertible from `bool` via `From`.
#[derive(FixedCardinalityLabel, Copy, Clone)]
pub enum Bool {
    True,
    False,
}
229 :
/// Generic success/failure label.
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "outcome")]
pub enum Outcome {
    Success,
    Failed,
}
236 :
/// Cache lookup result; labels the allowed-ips, VPC endpoint id and
/// access-blocker-flags cache counters.
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "outcome")]
pub enum CacheOutcome {
    Hit,
    Miss,
}
243 :
/// Label group for the `console_request_latency` histogram.
#[derive(LabelGroup)]
#[label(set = ConsoleRequestSet)]
pub struct ConsoleRequest<'a> {
    // Free-form request name; values are interned through a `ThreadedRodeo`.
    #[label(dynamic_with = ThreadedRodeo, default)]
    pub request: &'a str,
}
250 :
// Pair of counters tracking HTTP endpoint pool registration. `guard()` bumps the
// first one and returns a guard that bumps the second one when dropped.
#[derive(MetricGroup, Default)]
pub struct HttpEndpointPools {
    /// Number of endpoints we have registered pools for
    pub http_pool_endpoints_registered_total: Counter,
    /// Number of endpoints we have unregistered pools for
    pub http_pool_endpoints_unregistered_total: Counter,
}
258 :
/// Guard returned by `HttpEndpointPools::guard`; dropping it increments the
/// counter it refers to.
pub struct HttpEndpointPoolsGuard<'a> {
    // The "unregistered" counter. It is *incremented* on drop, not decremented.
    dec: &'a Counter,
}
262 :
263 : impl Drop for HttpEndpointPoolsGuard<'_> {
264 2 : fn drop(&mut self) {
265 2 : self.dec.inc();
266 2 : }
267 : }
268 :
269 : impl HttpEndpointPools {
270 2 : pub fn guard(&self) -> HttpEndpointPoolsGuard<'_> {
271 2 : self.http_pool_endpoints_registered_total.inc();
272 2 : HttpEndpointPoolsGuard {
273 2 : dec: &self.http_pool_endpoints_unregistered_total,
274 2 : }
275 2 : }
276 : }
/// Marker type describing the opened/closed database connections counter pair.
pub struct NumDbConnectionsGauge;
impl CounterPairAssoc for NumDbConnectionsGauge {
    // Metric names and help strings for the two counters of the pair.
    const INC_NAME: &'static MetricName = MetricName::from_str("opened_db_connections_total");
    const DEC_NAME: &'static MetricName = MetricName::from_str("closed_db_connections_total");
    const INC_HELP: &'static str = "Number of opened connections to a database.";
    const DEC_HELP: &'static str = "Number of closed connections to a database.";
    // Both counters are labelled by protocol.
    type LabelGroupSet = StaticLabelSet<Protocol>;
}
/// Counter-pair guard specialised to [`NumDbConnectionsGauge`].
pub type NumDbConnectionsGuard<'a> = metrics::MeasuredCounterPairGuard<'a, NumDbConnectionsGauge>;
286 :
/// Marker type describing the opened/closed client connections counter pair.
pub struct NumClientConnectionsGauge;
impl CounterPairAssoc for NumClientConnectionsGauge {
    // Metric names and help strings for the two counters of the pair.
    const INC_NAME: &'static MetricName = MetricName::from_str("opened_client_connections_total");
    const DEC_NAME: &'static MetricName = MetricName::from_str("closed_client_connections_total");
    const INC_HELP: &'static str = "Number of opened connections from a client.";
    const DEC_HELP: &'static str = "Number of closed connections from a client.";
    // Both counters are labelled by protocol.
    type LabelGroupSet = StaticLabelSet<Protocol>;
}
/// Counter-pair guard specialised to [`NumClientConnectionsGauge`].
pub type NumClientConnectionsGuard<'a> =
    metrics::MeasuredCounterPairGuard<'a, NumClientConnectionsGauge>;
297 :
/// Marker type describing the accepted/closed connection requests counter pair.
pub struct NumConnectionRequestsGauge;
impl CounterPairAssoc for NumConnectionRequestsGauge {
    // Metric names and help strings for the two counters of the pair.
    const INC_NAME: &'static MetricName = MetricName::from_str("accepted_connections_total");
    const DEC_NAME: &'static MetricName = MetricName::from_str("closed_connections_total");
    const INC_HELP: &'static str = "Number of client connections accepted.";
    const DEC_HELP: &'static str = "Number of client connections closed.";
    // Both counters are labelled by protocol.
    type LabelGroupSet = StaticLabelSet<Protocol>;
}
/// Counter-pair guard specialised to [`NumConnectionRequestsGauge`].
pub type NumConnectionRequestsGuard<'a> =
    metrics::MeasuredCounterPairGuard<'a, NumConnectionRequestsGauge>;
308 :
/// Marker type describing the counter pair for messages in the cancellation channel.
pub struct CancelChannelSizeGauge;
impl CounterPairAssoc for CancelChannelSizeGauge {
    // Metric names and help strings for the two counters of the pair.
    const INC_NAME: &'static MetricName = MetricName::from_str("opened_msgs_cancel_channel_total");
    const DEC_NAME: &'static MetricName = MetricName::from_str("closed_msgs_cancel_channel_total");
    const INC_HELP: &'static str = "Number of processing messages in the cancellation channel.";
    const DEC_HELP: &'static str = "Number of closed messages in the cancellation channel.";
    // Both counters are labelled by the kind of redis message.
    type LabelGroupSet = StaticLabelSet<RedisMsgKind>;
}
/// Counter-pair guard specialised to [`CancelChannelSizeGauge`].
pub type CancelChannelSizeGuard<'a> = metrics::MeasuredCounterPairGuard<'a, CancelChannelSizeGauge>;
318 :
/// Label group for `compute_connection_latency_seconds`; built by
/// `LatencyTimer`'s `Drop` impl, once per `LatencyExclusions` variant.
#[derive(LabelGroup)]
#[label(set = ComputeConnectionLatencySet)]
pub struct ComputeConnectionLatencyGroup {
    protocol: Protocol,
    cold_start_info: ColdStartInfo,
    outcome: ConnectOutcome,
    // Which waiting times were subtracted from the observed duration.
    excluded: LatencyExclusions,
}
327 :
/// Names which accumulated waiting times were subtracted from a latency
/// observation. Each variant excludes everything the previous one does plus
/// one more component (client, then cplane, then compute, then retry).
#[derive(FixedCardinalityLabel, Copy, Clone)]
pub enum LatencyExclusions {
    Client,
    ClientAndCplane,
    ClientCplaneCompute,
    ClientCplaneComputeRetry,
}
335 :
/// Label for the `accepted_connections_by_sni` counter.
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "kind")]
pub enum SniKind {
    Sni,
    NoSni,
    PasswordHack,
}
343 :
/// Label for the `connection_failures_total` counter.
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "kind")]
pub enum ConnectionFailureKind {
    ComputeCached,
    ComputeUncached,
}
350 :
/// Label group for the `connection_failures_breakdown` counter.
#[derive(LabelGroup)]
#[label(set = ConnectionFailuresBreakdownSet)]
pub struct ConnectionFailuresBreakdownGroup {
    pub kind: ErrorKind,
    pub retry: Bool,
}
357 :
/// Label group for the `redis_errors_total` counter.
#[derive(LabelGroup, Copy, Clone)]
#[label(set = RedisErrorsSet)]
pub struct RedisErrors<'a> {
    // Free-form channel name; values are interned through a `ThreadedRodeo`.
    #[label(dynamic_with = ThreadedRodeo, default)]
    pub channel: &'a str,
}
364 :
/// Result of a cancellation request; the `kind` label of `CancellationRequest`.
#[derive(FixedCardinalityLabel, Copy, Clone)]
pub enum CancellationOutcome {
    NotFound,
    Found,
    RateLimitExceeded,
}
371 :
/// Label group for the `cancellation_requests_total` counter.
#[derive(LabelGroup)]
#[label(set = CancellationRequestSet)]
pub struct CancellationRequest {
    pub kind: CancellationOutcome,
}
377 :
/// Party a connection was waiting on; tells `LatencyTimer::unpause` which
/// accumulator the elapsed time is added to.
#[derive(Clone, Copy)]
pub enum Waiting {
    Cplane,
    Client,
    Compute,
    RetryTimeout,
}
385 :
/// Kind of redis message; labels the cancellation channel counter pair.
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "kind")]
// Every variant starts with `H`, which is what trips the lint.
// NOTE(review): the names presumably mirror redis hash commands — confirm.
#[allow(clippy::enum_variant_names)]
pub enum RedisMsgKind {
    HSet,
    HSetMultiple,
    HGet,
    HGetAll,
    HDel,
}
396 :
/// Waiting time accumulated per party by `LatencyTimer::unpause`. All fields
/// start at zero via `Default`.
#[derive(Default, Clone)]
pub struct LatencyAccumulated {
    // time attributed to `Waiting::Cplane`
    cplane: time::Duration,
    // time attributed to `Waiting::Client`
    client: time::Duration,
    // time attributed to `Waiting::Compute`
    compute: time::Duration,
    // time attributed to `Waiting::RetryTimeout`
    retry: time::Duration,
}
404 :
405 : impl std::fmt::Display for LatencyAccumulated {
406 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
407 0 : write!(
408 0 : f,
409 0 : "client: {}, cplane: {}, compute: {}, retry: {}",
410 0 : self.client.as_micros(),
411 0 : self.cplane.as_micros(),
412 0 : self.compute.as_micros(),
413 0 : self.retry.as_micros()
414 0 : )
415 0 : }
416 : }
417 :
/// Stopwatch for establishing a connection. When dropped it reports to the
/// `compute_connection_latency_seconds` histogram, unless `skip_reporting` is set.
pub struct LatencyTimer {
    // instant at which the stopwatch was started
    start: time::Instant,
    // instant at which the stopwatch was stopped; stays `None` until `success()` is called
    stop: Option<time::Instant>,
    // waiting time accumulated on the stopwatch, split by party (see `unpause`)
    accumulated: LatencyAccumulated,
    // label data
    protocol: Protocol,
    cold_start_info: ColdStartInfo,
    outcome: ConnectOutcome,

    // when true, dropping the timer records nothing
    skip_reporting: bool,
}
432 :
433 : impl LatencyTimer {
434 70 : pub fn new(protocol: Protocol) -> Self {
435 70 : Self {
436 70 : start: time::Instant::now(),
437 70 : stop: None,
438 70 : accumulated: LatencyAccumulated::default(),
439 70 : protocol,
440 70 : cold_start_info: ColdStartInfo::Unknown,
441 70 : // assume failed unless otherwise specified
442 70 : outcome: ConnectOutcome::Failed,
443 70 : skip_reporting: false,
444 70 : }
445 70 : }
446 :
447 0 : pub(crate) fn noop(protocol: Protocol) -> Self {
448 0 : Self {
449 0 : start: time::Instant::now(),
450 0 : stop: None,
451 0 : accumulated: LatencyAccumulated::default(),
452 0 : protocol,
453 0 : cold_start_info: ColdStartInfo::Unknown,
454 0 : // assume failed unless otherwise specified
455 0 : outcome: ConnectOutcome::Failed,
456 0 : skip_reporting: true,
457 0 : }
458 0 : }
459 :
460 28 : pub fn unpause(&mut self, start: Instant, waiting_for: Waiting) {
461 28 : let dur = start.elapsed();
462 28 : match waiting_for {
463 0 : Waiting::Cplane => self.accumulated.cplane += dur,
464 15 : Waiting::Client => self.accumulated.client += dur,
465 7 : Waiting::Compute => self.accumulated.compute += dur,
466 6 : Waiting::RetryTimeout => self.accumulated.retry += dur,
467 : }
468 28 : }
469 :
470 0 : pub fn cold_start_info(&mut self, cold_start_info: ColdStartInfo) {
471 0 : self.cold_start_info = cold_start_info;
472 0 : }
473 :
474 4 : pub fn success(&mut self) {
475 4 : // stop the stopwatch and record the time that we have accumulated
476 4 : self.stop = Some(time::Instant::now());
477 4 :
478 4 : // success
479 4 : self.outcome = ConnectOutcome::Success;
480 4 : }
481 :
482 0 : pub fn accumulated(&self) -> LatencyAccumulated {
483 0 : self.accumulated.clone()
484 0 : }
485 : }
486 :
/// Outcome label used by the latency, invalid-endpoints and retries label groups.
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
pub enum ConnectOutcome {
    Success,
    Failed,
}
492 :
493 : impl Drop for LatencyTimer {
494 70 : fn drop(&mut self) {
495 70 : if self.skip_reporting {
496 0 : return;
497 70 : }
498 70 :
499 70 : let duration = self
500 70 : .stop
501 70 : .unwrap_or_else(time::Instant::now)
502 70 : .duration_since(self.start);
503 70 :
504 70 : let metric = &Metrics::get().proxy.compute_connection_latency_seconds;
505 70 :
506 70 : // Excluding client communication from the accumulated time.
507 70 : metric.observe(
508 70 : ComputeConnectionLatencyGroup {
509 70 : protocol: self.protocol,
510 70 : cold_start_info: self.cold_start_info,
511 70 : outcome: self.outcome,
512 70 : excluded: LatencyExclusions::Client,
513 70 : },
514 70 : duration
515 70 : .saturating_sub(self.accumulated.client)
516 70 : .as_secs_f64(),
517 70 : );
518 70 :
519 70 : // Exclude client and cplane communication from the accumulated time.
520 70 : let accumulated_total = self.accumulated.client + self.accumulated.cplane;
521 70 : metric.observe(
522 70 : ComputeConnectionLatencyGroup {
523 70 : protocol: self.protocol,
524 70 : cold_start_info: self.cold_start_info,
525 70 : outcome: self.outcome,
526 70 : excluded: LatencyExclusions::ClientAndCplane,
527 70 : },
528 70 : duration.saturating_sub(accumulated_total).as_secs_f64(),
529 70 : );
530 70 :
531 70 : // Exclude client, cplane, compute communication from the accumulated time.
532 70 : let accumulated_total =
533 70 : self.accumulated.client + self.accumulated.cplane + self.accumulated.compute;
534 70 : metric.observe(
535 70 : ComputeConnectionLatencyGroup {
536 70 : protocol: self.protocol,
537 70 : cold_start_info: self.cold_start_info,
538 70 : outcome: self.outcome,
539 70 : excluded: LatencyExclusions::ClientCplaneCompute,
540 70 : },
541 70 : duration.saturating_sub(accumulated_total).as_secs_f64(),
542 70 : );
543 70 :
544 70 : // Exclude client, cplane, compute, retry communication from the accumulated time.
545 70 : let accumulated_total = self.accumulated.client
546 70 : + self.accumulated.cplane
547 70 : + self.accumulated.compute
548 70 : + self.accumulated.retry;
549 70 : metric.observe(
550 70 : ComputeConnectionLatencyGroup {
551 70 : protocol: self.protocol,
552 70 : cold_start_info: self.cold_start_info,
553 70 : outcome: self.outcome,
554 70 : excluded: LatencyExclusions::ClientCplaneComputeRetry,
555 70 : },
556 70 : duration.saturating_sub(accumulated_total).as_secs_f64(),
557 70 : );
558 70 : }
559 : }
560 :
561 : impl From<bool> for Bool {
562 3 : fn from(value: bool) -> Self {
563 3 : if value { Bool::True } else { Bool::False }
564 3 : }
565 : }
566 :
/// Label group for the `invalid_endpoints_total` counter.
#[derive(LabelGroup)]
#[label(set = InvalidEndpointsSet)]
pub struct InvalidEndpointsGroup {
    pub protocol: Protocol,
    pub rejected: Bool,
    pub outcome: ConnectOutcome,
}
574 :
/// Label group for the `retries_metric` histogram.
#[derive(LabelGroup)]
#[label(set = RetriesMetricSet)]
pub struct RetriesMetricGroup {
    pub outcome: ConnectOutcome,
    pub retry_type: RetryType,
}
581 :
/// Which operation was retried; the `retry_type` label of `RetriesMetricGroup`.
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
pub enum RetryType {
    WakeCompute,
    ConnectToCompute,
}
587 :
/// Event type label for the `redis_events_count` counter.
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
#[label(singleton = "event")]
pub enum RedisEventsCount {
    EndpointCreated,
    BranchCreated,
    ProjectCreated,
    CancelSession,
    PasswordUpdate,
    AllowedIpsUpdate,
    AllowedVpcEndpointIdsUpdateForProjects,
    AllowedVpcEndpointIdsUpdateForAllProjectsInOrg,
    BlockPublicOrVpcAccessUpdate,
}
601 :
/// Label set with one entry per thread pool worker; the wrapped value is the
/// number of workers and doubles as the set's cardinality.
pub struct ThreadPoolWorkers(usize);
/// Index of a thread pool worker; written out as the `worker` label.
#[derive(Copy, Clone)]
pub struct ThreadPoolWorkerId(pub usize);
605 :
606 : impl LabelValue for ThreadPoolWorkerId {
607 0 : fn visit<V: measured::label::LabelVisitor>(&self, v: V) -> V::Output {
608 0 : v.write_int(self.0 as i64)
609 0 : }
610 : }
611 :
612 : impl LabelGroup for ThreadPoolWorkerId {
613 0 : fn visit_values(&self, v: &mut impl measured::label::LabelGroupVisitor) {
614 0 : v.write_value(LabelName::from_str("worker"), self);
615 0 : }
616 : }
617 :
618 : impl LabelGroupSet for ThreadPoolWorkers {
619 : type Group<'a> = ThreadPoolWorkerId;
620 :
621 100 : fn cardinality(&self) -> Option<usize> {
622 100 : Some(self.0)
623 100 : }
624 :
625 10 : fn encode_dense(&self, value: Self::Unique) -> Option<usize> {
626 10 : Some(value)
627 10 : }
628 :
629 0 : fn decode_dense(&self, value: usize) -> Self::Group<'_> {
630 0 : ThreadPoolWorkerId(value)
631 0 : }
632 :
633 : type Unique = usize;
634 :
635 10 : fn encode(&self, value: Self::Group<'_>) -> Option<Self::Unique> {
636 10 : Some(value.0)
637 10 : }
638 :
639 0 : fn decode(&self, value: &Self::Unique) -> Self::Group<'_> {
640 0 : ThreadPoolWorkerId(*value)
641 0 : }
642 : }
643 :
644 : impl LabelSet for ThreadPoolWorkers {
645 : type Value<'a> = ThreadPoolWorkerId;
646 :
647 0 : fn dynamic_cardinality(&self) -> Option<usize> {
648 0 : Some(self.0)
649 0 : }
650 :
651 0 : fn encode(&self, value: Self::Value<'_>) -> Option<usize> {
652 0 : (value.0 < self.0).then_some(value.0)
653 0 : }
654 :
655 0 : fn decode(&self, value: usize) -> Self::Value<'_> {
656 0 : ThreadPoolWorkerId(value)
657 0 : }
658 : }
659 :
660 : impl FixedCardinalitySet for ThreadPoolWorkers {
661 0 : fn cardinality(&self) -> usize {
662 0 : self.0
663 0 : }
664 : }
665 :
// Per-worker thread pool counters. The derive generates
// `ThreadPoolMetrics::new(workers)`; both vectors use a `ThreadPoolWorkers`
// label set sized to `workers`.
#[derive(MetricGroup)]
#[metric(new(workers: usize))]
pub struct ThreadPoolMetrics {
    #[metric(init = CounterVec::with_label_set(ThreadPoolWorkers(workers)))]
    pub worker_task_turns_total: CounterVec<ThreadPoolWorkers>,
    #[metric(init = CounterVec::with_label_set(ThreadPoolWorkers(workers)))]
    pub worker_task_skips_total: CounterVec<ThreadPoolWorkers>,
}
|