LCOV - code coverage report
Current view: top level - proxy/src - metrics.rs (source / functions) Coverage Total Hit
Test: 727bdccc1d7d53837da843959afb612f56da4e79.info Lines: 70.7 % 191 135
Test Date: 2025-01-30 15:18:43 Functions: 76.8 % 56 43

            Line data    Source code
       1              : use std::sync::{Arc, OnceLock};
       2              : 
       3              : use lasso::ThreadedRodeo;
       4              : use measured::label::{
       5              :     FixedCardinalitySet, LabelGroupSet, LabelName, LabelSet, LabelValue, StaticLabelSet,
       6              : };
       7              : use measured::metric::histogram::Thresholds;
       8              : use measured::metric::name::MetricName;
       9              : use measured::{
      10              :     Counter, CounterVec, FixedCardinalityLabel, Gauge, Histogram, HistogramVec, LabelGroup,
      11              :     MetricGroup,
      12              : };
      13              : use metrics::{CounterPairAssoc, CounterPairVec, HyperLogLog, HyperLogLogVec};
      14              : use tokio::time::{self, Instant};
      15              : 
      16              : use crate::control_plane::messages::ColdStartInfo;
      17              : use crate::error::ErrorKind;
      18              : 
      19           44 : #[derive(MetricGroup)]
      20              : #[metric(new(thread_pool: Arc<ThreadPoolMetrics>))]
      21              : pub struct Metrics {
      22              :     #[metric(namespace = "proxy")]
      23              :     #[metric(init = ProxyMetrics::new(thread_pool))]
      24              :     pub proxy: ProxyMetrics,
      25              : 
      26              :     #[metric(namespace = "wake_compute_lock")]
      27              :     pub wake_compute_lock: ApiLockMetrics,
      28              : }
      29              : 
      30              : static SELF: OnceLock<Metrics> = OnceLock::new();
      31              : impl Metrics {
      32            0 :     pub fn install(thread_pool: Arc<ThreadPoolMetrics>) {
      33            0 :         SELF.set(Metrics::new(thread_pool))
      34            0 :             .ok()
      35            0 :             .expect("proxy metrics must not be installed more than once");
      36            0 :     }
      37              : 
      38          143 :     pub fn get() -> &'static Self {
      39          143 :         #[cfg(test)]
      40          143 :         return SELF.get_or_init(|| Metrics::new(Arc::new(ThreadPoolMetrics::new(0))));
      41          143 : 
      42          143 :         #[cfg(not(test))]
      43          143 :         SELF.get()
      44          143 :             .expect("proxy metrics must be installed by the main() function")
      45          143 :     }
      46              : }
      47              : 
      48           44 : #[derive(MetricGroup)]
      49              : #[metric(new(thread_pool: Arc<ThreadPoolMetrics>))]
      50              : pub struct ProxyMetrics {
      51              :     #[metric(flatten)]
      52              :     pub db_connections: CounterPairVec<NumDbConnectionsGauge>,
      53              :     #[metric(flatten)]
      54              :     pub client_connections: CounterPairVec<NumClientConnectionsGauge>,
      55              :     #[metric(flatten)]
      56              :     pub connection_requests: CounterPairVec<NumConnectionRequestsGauge>,
      57              :     #[metric(flatten)]
      58              :     pub http_endpoint_pools: HttpEndpointPools,
      59              :     #[metric(flatten)]
      60              :     pub cancel_channel_size: CounterPairVec<CancelChannelSizeGauge>,
      61              : 
      62              :     /// Time it took for proxy to establish a connection to the compute endpoint.
      63              :     // largest bucket = 2^16 * 0.5ms = 32s
      64              :     #[metric(metadata = Thresholds::exponential_buckets(0.0005, 2.0))]
      65              :     pub compute_connection_latency_seconds: HistogramVec<ComputeConnectionLatencySet, 16>,
      66              : 
      67              :     /// Time it took for proxy to receive a response from control plane.
      68              :     #[metric(
      69              :         // largest bucket = 2^16 * 0.2ms = 13s
      70              :         metadata = Thresholds::exponential_buckets(0.0002, 2.0),
      71              :     )]
      72              :     pub console_request_latency: HistogramVec<ConsoleRequestSet, 16>,
      73              : 
      74              :     /// Time it takes to acquire a token to call console plane.
      75              :     // largest bucket = 3^16 * 0.05ms = 2.15s
      76              :     #[metric(metadata = Thresholds::exponential_buckets(0.00005, 3.0))]
      77              :     pub control_plane_token_acquire_seconds: Histogram<16>,
      78              : 
      79              :     /// Size of the HTTP request body lengths.
      80              :     // smallest bucket = 16 bytes
      81              :     // largest bucket = 4^12 * 16 bytes = 256MB
      82              :     #[metric(metadata = Thresholds::exponential_buckets(16.0, 4.0))]
      83              :     pub http_conn_content_length_bytes: HistogramVec<StaticLabelSet<HttpDirection>, 12>,
      84              : 
      85              :     /// Time it takes to reclaim unused connection pools.
      86              :     #[metric(metadata = Thresholds::exponential_buckets(1e-6, 2.0))]
      87              :     pub http_pool_reclaimation_lag_seconds: Histogram<16>,
      88              : 
      89              :     /// Number of opened connections to a database.
      90              :     pub http_pool_opened_connections: Gauge,
      91              : 
      92              :     /// Number of cache hits/misses for allowed ips.
      93              :     pub allowed_ips_cache_misses: CounterVec<StaticLabelSet<CacheOutcome>>,
      94              : 
      95              :     /// Number of allowed ips
      96              :     #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0]))]
      97              :     pub allowed_ips_number: Histogram<10>,
      98              : 
      99              :     /// Number of connections (per sni).
     100              :     pub accepted_connections_by_sni: CounterVec<StaticLabelSet<SniKind>>,
     101              : 
     102              :     /// Number of connection failures (per kind).
     103              :     pub connection_failures_total: CounterVec<StaticLabelSet<ConnectionFailureKind>>,
     104              : 
     105              :     /// Number of wake-up failures (per kind).
     106              :     pub connection_failures_breakdown: CounterVec<ConnectionFailuresBreakdownSet>,
     107              : 
     108              :     /// Number of bytes sent/received between all clients and backends.
     109              :     pub io_bytes: CounterVec<StaticLabelSet<Direction>>,
     110              : 
     111              :     /// Number of errors by a given classification.
     112              :     pub errors_total: CounterVec<StaticLabelSet<crate::error::ErrorKind>>,
     113              : 
     114              :     /// Number of cancellation requests (per found/not_found).
     115              :     pub cancellation_requests_total: CounterVec<CancellationRequestSet>,
     116              : 
     117              :     /// Number of errors by a given classification
     118              :     pub redis_errors_total: CounterVec<RedisErrorsSet>,
     119              : 
     120              :     /// Number of TLS handshake failures
     121              :     pub tls_handshake_failures: Counter,
     122              : 
     123              :     /// Number of connection requests affected by authentication rate limits
     124              :     pub requests_auth_rate_limits_total: Counter,
     125              : 
     126              :     /// HLL approximate cardinality of endpoints that are connecting
     127              :     pub connecting_endpoints: HyperLogLogVec<StaticLabelSet<Protocol>, 32>,
     128              : 
     129              :     /// Number of endpoints affected by errors of a given classification
     130              :     pub endpoints_affected_by_errors: HyperLogLogVec<StaticLabelSet<crate::error::ErrorKind>, 32>,
     131              : 
     132              :     /// Number of endpoints affected by authentication rate limits
     133              :     pub endpoints_auth_rate_limits: HyperLogLog<32>,
     134              : 
     135              :     /// Number of invalid endpoints (per protocol, per rejected).
     136              :     pub invalid_endpoints_total: CounterVec<InvalidEndpointsSet>,
     137              : 
     138              :     /// Number of retries (per outcome, per retry_type).
     139              :     #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]))]
     140              :     pub retries_metric: HistogramVec<RetriesMetricSet, 9>,
     141              : 
     142              :     /// Number of events consumed from redis (per event type).
     143              :     pub redis_events_count: CounterVec<StaticLabelSet<RedisEventsCount>>,
     144              : 
     145              :     #[metric(namespace = "connect_compute_lock")]
     146              :     pub connect_compute_lock: ApiLockMetrics,
     147              : 
     148              :     #[metric(namespace = "scram_pool")]
     149              :     #[metric(init = thread_pool)]
     150              :     pub scram_pool: Arc<ThreadPoolMetrics>,
     151              : }
     152              : 
     153           88 : #[derive(MetricGroup)]
     154              : #[metric(new())]
     155              : pub struct ApiLockMetrics {
     156              :     /// Number of semaphores registered in this api lock
     157              :     pub semaphores_registered: Counter,
     158              :     /// Number of semaphores unregistered in this api lock
     159              :     pub semaphores_unregistered: Counter,
     160              :     /// Time it takes to reclaim unused semaphores in the api lock
     161              :     #[metric(metadata = Thresholds::exponential_buckets(1e-6, 2.0))]
     162              :     pub reclamation_lag_seconds: Histogram<16>,
     163              :     /// Time it takes to acquire a semaphore lock
     164              :     #[metric(metadata = Thresholds::exponential_buckets(1e-4, 2.0))]
     165              :     pub semaphore_acquire_seconds: Histogram<16>,
     166              : }
     167              : 
     168              : impl Default for ApiLockMetrics {
     169           88 :     fn default() -> Self {
     170           88 :         Self::new()
     171           88 :     }
     172              : }
     173              : 
     174              : #[derive(FixedCardinalityLabel, Copy, Clone)]
     175              : #[label(singleton = "direction")]
     176              : pub enum HttpDirection {
     177              :     Request,
     178              :     Response,
     179              : }
     180              : 
     181              : #[derive(FixedCardinalityLabel, Copy, Clone)]
     182              : #[label(singleton = "direction")]
     183              : pub enum Direction {
     184              :     Tx,
     185              :     Rx,
     186              : }
     187              : 
     188              : #[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
     189              : #[label(singleton = "protocol")]
     190              : pub enum Protocol {
     191              :     Http,
     192              :     Ws,
     193              :     Tcp,
     194              :     SniRouter,
     195              : }
     196              : 
     197              : impl Protocol {
     198            0 :     pub fn as_str(&self) -> &'static str {
     199            0 :         match self {
     200            0 :             Protocol::Http => "http",
     201            0 :             Protocol::Ws => "ws",
     202            0 :             Protocol::Tcp => "tcp",
     203            0 :             Protocol::SniRouter => "sni_router",
     204              :         }
     205            0 :     }
     206              : }
     207              : 
     208              : impl std::fmt::Display for Protocol {
     209            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     210            0 :         f.write_str(self.as_str())
     211            0 :     }
     212              : }
     213              : 
     214              : #[derive(FixedCardinalityLabel, Copy, Clone)]
     215              : pub enum Bool {
     216              :     True,
     217              :     False,
     218              : }
     219              : 
     220              : #[derive(FixedCardinalityLabel, Copy, Clone)]
     221              : #[label(singleton = "outcome")]
     222              : pub enum Outcome {
     223              :     Success,
     224              :     Failed,
     225              : }
     226              : 
     227              : #[derive(FixedCardinalityLabel, Copy, Clone)]
     228              : #[label(singleton = "outcome")]
     229              : pub enum CacheOutcome {
     230              :     Hit,
     231              :     Miss,
     232              : }
     233              : 
     234           88 : #[derive(LabelGroup)]
     235              : #[label(set = ConsoleRequestSet)]
     236              : pub struct ConsoleRequest<'a> {
     237              :     #[label(dynamic_with = ThreadedRodeo, default)]
     238              :     pub request: &'a str,
     239              : }
     240              : 
     241              : #[derive(MetricGroup, Default)]
     242              : pub struct HttpEndpointPools {
     243              :     /// Number of endpoints we have registered pools for
     244              :     pub http_pool_endpoints_registered_total: Counter,
     245              :     /// Number of endpoints we have unregistered pools for
     246              :     pub http_pool_endpoints_unregistered_total: Counter,
     247              : }
     248              : 
     249              : pub struct HttpEndpointPoolsGuard<'a> {
     250              :     dec: &'a Counter,
     251              : }
     252              : 
     253              : impl Drop for HttpEndpointPoolsGuard<'_> {
     254            2 :     fn drop(&mut self) {
     255            2 :         self.dec.inc();
     256            2 :     }
     257              : }
     258              : 
     259              : impl HttpEndpointPools {
     260            2 :     pub fn guard(&self) -> HttpEndpointPoolsGuard<'_> {
     261            2 :         self.http_pool_endpoints_registered_total.inc();
     262            2 :         HttpEndpointPoolsGuard {
     263            2 :             dec: &self.http_pool_endpoints_unregistered_total,
     264            2 :         }
     265            2 :     }
     266              : }
     267              : pub struct NumDbConnectionsGauge;
     268              : impl CounterPairAssoc for NumDbConnectionsGauge {
     269              :     const INC_NAME: &'static MetricName = MetricName::from_str("opened_db_connections_total");
     270              :     const DEC_NAME: &'static MetricName = MetricName::from_str("closed_db_connections_total");
     271              :     const INC_HELP: &'static str = "Number of opened connections to a database.";
     272              :     const DEC_HELP: &'static str = "Number of closed connections to a database.";
     273              :     type LabelGroupSet = StaticLabelSet<Protocol>;
     274              : }
     275              : pub type NumDbConnectionsGuard<'a> = metrics::MeasuredCounterPairGuard<'a, NumDbConnectionsGauge>;
     276              : 
     277              : pub struct NumClientConnectionsGauge;
     278              : impl CounterPairAssoc for NumClientConnectionsGauge {
     279              :     const INC_NAME: &'static MetricName = MetricName::from_str("opened_client_connections_total");
     280              :     const DEC_NAME: &'static MetricName = MetricName::from_str("closed_client_connections_total");
     281              :     const INC_HELP: &'static str = "Number of opened connections from a client.";
     282              :     const DEC_HELP: &'static str = "Number of closed connections from a client.";
     283              :     type LabelGroupSet = StaticLabelSet<Protocol>;
     284              : }
     285              : pub type NumClientConnectionsGuard<'a> =
     286              :     metrics::MeasuredCounterPairGuard<'a, NumClientConnectionsGauge>;
     287              : 
     288              : pub struct NumConnectionRequestsGauge;
     289              : impl CounterPairAssoc for NumConnectionRequestsGauge {
     290              :     const INC_NAME: &'static MetricName = MetricName::from_str("accepted_connections_total");
     291              :     const DEC_NAME: &'static MetricName = MetricName::from_str("closed_connections_total");
     292              :     const INC_HELP: &'static str = "Number of client connections accepted.";
     293              :     const DEC_HELP: &'static str = "Number of client connections closed.";
     294              :     type LabelGroupSet = StaticLabelSet<Protocol>;
     295              : }
     296              : pub type NumConnectionRequestsGuard<'a> =
     297              :     metrics::MeasuredCounterPairGuard<'a, NumConnectionRequestsGauge>;
     298              : 
     299              : pub struct CancelChannelSizeGauge;
     300              : impl CounterPairAssoc for CancelChannelSizeGauge {
     301              :     const INC_NAME: &'static MetricName = MetricName::from_str("opened_msgs_cancel_channel_total");
     302              :     const DEC_NAME: &'static MetricName = MetricName::from_str("closed_msgs_cancel_channel_total");
     303              :     const INC_HELP: &'static str = "Number of processing messages in the cancellation channel.";
     304              :     const DEC_HELP: &'static str = "Number of closed messages in the cancellation channel.";
     305              :     type LabelGroupSet = StaticLabelSet<RedisMsgKind>;
     306              : }
     307              : pub type CancelChannelSizeGuard<'a> = metrics::MeasuredCounterPairGuard<'a, CancelChannelSizeGauge>;
     308              : 
     309          264 : #[derive(LabelGroup)]
     310              : #[label(set = ComputeConnectionLatencySet)]
     311              : pub struct ComputeConnectionLatencyGroup {
     312              :     protocol: Protocol,
     313              :     cold_start_info: ColdStartInfo,
     314              :     outcome: ConnectOutcome,
     315              :     excluded: LatencyExclusions,
     316              : }
     317              : 
     318              : #[derive(FixedCardinalityLabel, Copy, Clone)]
     319              : pub enum LatencyExclusions {
     320              :     Client,
     321              :     ClientAndCplane,
     322              :     ClientCplaneCompute,
     323              :     ClientCplaneComputeRetry,
     324              : }
     325              : 
     326              : #[derive(FixedCardinalityLabel, Copy, Clone)]
     327              : #[label(singleton = "kind")]
     328              : pub enum SniKind {
     329              :     Sni,
     330              :     NoSni,
     331              :     PasswordHack,
     332              : }
     333              : 
     334              : #[derive(FixedCardinalityLabel, Copy, Clone)]
     335              : #[label(singleton = "kind")]
     336              : pub enum ConnectionFailureKind {
     337              :     ComputeCached,
     338              :     ComputeUncached,
     339              : }
     340              : 
     341          176 : #[derive(LabelGroup)]
     342              : #[label(set = ConnectionFailuresBreakdownSet)]
     343              : pub struct ConnectionFailuresBreakdownGroup {
     344              :     pub kind: ErrorKind,
     345              :     pub retry: Bool,
     346              : }
     347              : 
     348           88 : #[derive(LabelGroup, Copy, Clone)]
     349              : #[label(set = RedisErrorsSet)]
     350              : pub struct RedisErrors<'a> {
     351              :     #[label(dynamic_with = ThreadedRodeo, default)]
     352              :     pub channel: &'a str,
     353              : }
     354              : 
     355              : #[derive(FixedCardinalityLabel, Copy, Clone)]
     356              : pub enum CancellationOutcome {
     357              :     NotFound,
     358              :     Found,
     359              :     RateLimitExceeded,
     360              : }
     361              : 
     362          132 : #[derive(LabelGroup)]
     363              : #[label(set = CancellationRequestSet)]
     364              : pub struct CancellationRequest {
     365              :     pub kind: CancellationOutcome,
     366              : }
     367              : 
     368              : #[derive(Clone, Copy)]
     369              : pub enum Waiting {
     370              :     Cplane,
     371              :     Client,
     372              :     Compute,
     373              :     RetryTimeout,
     374              : }
     375              : 
     376              : #[derive(FixedCardinalityLabel, Copy, Clone)]
     377              : #[label(singleton = "kind")]
     378              : pub enum RedisMsgKind {
     379              :     HSet,
     380              :     HSetMultiple,
     381              :     HGet,
     382              :     HGetAll,
     383              :     HDel,
     384              : }
     385              : 
     386              : #[derive(Default)]
     387              : struct Accumulated {
     388              :     cplane: time::Duration,
     389              :     client: time::Duration,
     390              :     compute: time::Duration,
     391              :     retry: time::Duration,
     392              : }
     393              : 
     394              : pub struct LatencyTimer {
     395              :     // time since the stopwatch was started
     396              :     start: time::Instant,
     397              :     // time since the stopwatch was stopped
     398              :     stop: Option<time::Instant>,
     399              :     // accumulated time on the stopwatch
     400              :     accumulated: Accumulated,
     401              :     // label data
     402              :     protocol: Protocol,
     403              :     cold_start_info: ColdStartInfo,
     404              :     outcome: ConnectOutcome,
     405              : 
     406              :     skip_reporting: bool,
     407              : }
     408              : 
     409              : impl LatencyTimer {
     410           70 :     pub fn new(protocol: Protocol) -> Self {
     411           70 :         Self {
     412           70 :             start: time::Instant::now(),
     413           70 :             stop: None,
     414           70 :             accumulated: Accumulated::default(),
     415           70 :             protocol,
     416           70 :             cold_start_info: ColdStartInfo::Unknown,
     417           70 :             // assume failed unless otherwise specified
     418           70 :             outcome: ConnectOutcome::Failed,
     419           70 :             skip_reporting: false,
     420           70 :         }
     421           70 :     }
     422              : 
     423            0 :     pub(crate) fn noop(protocol: Protocol) -> Self {
     424            0 :         Self {
     425            0 :             start: time::Instant::now(),
     426            0 :             stop: None,
     427            0 :             accumulated: Accumulated::default(),
     428            0 :             protocol,
     429            0 :             cold_start_info: ColdStartInfo::Unknown,
     430            0 :             // assume failed unless otherwise specified
     431            0 :             outcome: ConnectOutcome::Failed,
     432            0 :             skip_reporting: true,
     433            0 :         }
     434            0 :     }
     435              : 
     436           28 :     pub fn unpause(&mut self, start: Instant, waiting_for: Waiting) {
     437           28 :         let dur = start.elapsed();
     438           28 :         match waiting_for {
     439            0 :             Waiting::Cplane => self.accumulated.cplane += dur,
     440           15 :             Waiting::Client => self.accumulated.client += dur,
     441            7 :             Waiting::Compute => self.accumulated.compute += dur,
     442            6 :             Waiting::RetryTimeout => self.accumulated.retry += dur,
     443              :         }
     444           28 :     }
     445              : 
     446            0 :     pub fn cold_start_info(&mut self, cold_start_info: ColdStartInfo) {
     447            0 :         self.cold_start_info = cold_start_info;
     448            0 :     }
     449              : 
     450            4 :     pub fn success(&mut self) {
     451            4 :         // stop the stopwatch and record the time that we have accumulated
     452            4 :         self.stop = Some(time::Instant::now());
     453            4 : 
     454            4 :         // success
     455            4 :         self.outcome = ConnectOutcome::Success;
     456            4 :     }
     457              : }
     458              : 
     459              : #[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
     460              : pub enum ConnectOutcome {
     461              :     Success,
     462              :     Failed,
     463              : }
     464              : 
     465              : impl Drop for LatencyTimer {
     466           70 :     fn drop(&mut self) {
     467           70 :         if self.skip_reporting {
     468            0 :             return;
     469           70 :         }
     470           70 : 
     471           70 :         let duration = self
     472           70 :             .stop
     473           70 :             .unwrap_or_else(time::Instant::now)
     474           70 :             .duration_since(self.start);
     475           70 : 
     476           70 :         let metric = &Metrics::get().proxy.compute_connection_latency_seconds;
     477           70 : 
     478           70 :         // Excluding client communication from the accumulated time.
     479           70 :         metric.observe(
     480           70 :             ComputeConnectionLatencyGroup {
     481           70 :                 protocol: self.protocol,
     482           70 :                 cold_start_info: self.cold_start_info,
     483           70 :                 outcome: self.outcome,
     484           70 :                 excluded: LatencyExclusions::Client,
     485           70 :             },
     486           70 :             duration
     487           70 :                 .saturating_sub(self.accumulated.client)
     488           70 :                 .as_secs_f64(),
     489           70 :         );
     490           70 : 
     491           70 :         // Exclude client and cplane communication from the accumulated time.
     492           70 :         let accumulated_total = self.accumulated.client + self.accumulated.cplane;
     493           70 :         metric.observe(
     494           70 :             ComputeConnectionLatencyGroup {
     495           70 :                 protocol: self.protocol,
     496           70 :                 cold_start_info: self.cold_start_info,
     497           70 :                 outcome: self.outcome,
     498           70 :                 excluded: LatencyExclusions::ClientAndCplane,
     499           70 :             },
     500           70 :             duration.saturating_sub(accumulated_total).as_secs_f64(),
     501           70 :         );
     502           70 : 
     503           70 :         // Exclude client cplane, compue communication from the accumulated time.
     504           70 :         let accumulated_total =
     505           70 :             self.accumulated.client + self.accumulated.cplane + self.accumulated.compute;
     506           70 :         metric.observe(
     507           70 :             ComputeConnectionLatencyGroup {
     508           70 :                 protocol: self.protocol,
     509           70 :                 cold_start_info: self.cold_start_info,
     510           70 :                 outcome: self.outcome,
     511           70 :                 excluded: LatencyExclusions::ClientCplaneCompute,
     512           70 :             },
     513           70 :             duration.saturating_sub(accumulated_total).as_secs_f64(),
     514           70 :         );
     515           70 : 
     516           70 :         // Exclude client cplane, compue, retry communication from the accumulated time.
     517           70 :         let accumulated_total = self.accumulated.client
     518           70 :             + self.accumulated.cplane
     519           70 :             + self.accumulated.compute
     520           70 :             + self.accumulated.retry;
     521           70 :         metric.observe(
     522           70 :             ComputeConnectionLatencyGroup {
     523           70 :                 protocol: self.protocol,
     524           70 :                 cold_start_info: self.cold_start_info,
     525           70 :                 outcome: self.outcome,
     526           70 :                 excluded: LatencyExclusions::ClientCplaneComputeRetry,
     527           70 :             },
     528           70 :             duration.saturating_sub(accumulated_total).as_secs_f64(),
     529           70 :         );
     530           70 :     }
     531              : }
     532              : 
     533              : impl From<bool> for Bool {
     534            3 :     fn from(value: bool) -> Self {
     535            3 :         if value {
     536            2 :             Bool::True
     537              :         } else {
     538            1 :             Bool::False
     539              :         }
     540            3 :     }
     541              : }
     542              : 
     543          220 : #[derive(LabelGroup)]
     544              : #[label(set = InvalidEndpointsSet)]
     545              : pub struct InvalidEndpointsGroup {
     546              :     pub protocol: Protocol,
     547              :     pub rejected: Bool,
     548              :     pub outcome: ConnectOutcome,
     549              : }
     550              : 
     551          176 : #[derive(LabelGroup)]
     552              : #[label(set = RetriesMetricSet)]
     553              : pub struct RetriesMetricGroup {
     554              :     pub outcome: ConnectOutcome,
     555              :     pub retry_type: RetryType,
     556              : }
     557              : 
     558              : #[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
     559              : pub enum RetryType {
     560              :     WakeCompute,
     561              :     ConnectToCompute,
     562              : }
     563              : 
     564              : #[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
     565              : #[label(singleton = "event")]
     566              : pub enum RedisEventsCount {
     567              :     EndpointCreated,
     568              :     BranchCreated,
     569              :     ProjectCreated,
     570              :     CancelSession,
     571              :     PasswordUpdate,
     572              :     AllowedIpsUpdate,
     573              : }
     574              : 
     575              : pub struct ThreadPoolWorkers(usize);
     576              : #[derive(Copy, Clone)]
     577              : pub struct ThreadPoolWorkerId(pub usize);
     578              : 
     579              : impl LabelValue for ThreadPoolWorkerId {
     580            0 :     fn visit<V: measured::label::LabelVisitor>(&self, v: V) -> V::Output {
     581            0 :         v.write_int(self.0 as i64)
     582            0 :     }
     583              : }
     584              : 
     585              : impl LabelGroup for ThreadPoolWorkerId {
     586            0 :     fn visit_values(&self, v: &mut impl measured::label::LabelGroupVisitor) {
     587            0 :         v.write_value(LabelName::from_str("worker"), self);
     588            0 :     }
     589              : }
     590              : 
     591              : impl LabelGroupSet for ThreadPoolWorkers {
     592              :     type Group<'a> = ThreadPoolWorkerId;
     593              : 
     594          100 :     fn cardinality(&self) -> Option<usize> {
     595          100 :         Some(self.0)
     596          100 :     }
     597              : 
     598            8 :     fn encode_dense(&self, value: Self::Unique) -> Option<usize> {
     599            8 :         Some(value)
     600            8 :     }
     601              : 
     602            0 :     fn decode_dense(&self, value: usize) -> Self::Group<'_> {
     603            0 :         ThreadPoolWorkerId(value)
     604            0 :     }
     605              : 
     606              :     type Unique = usize;
     607              : 
     608            8 :     fn encode(&self, value: Self::Group<'_>) -> Option<Self::Unique> {
     609            8 :         Some(value.0)
     610            8 :     }
     611              : 
     612            0 :     fn decode(&self, value: &Self::Unique) -> Self::Group<'_> {
     613            0 :         ThreadPoolWorkerId(*value)
     614            0 :     }
     615              : }
     616              : 
     617              : impl LabelSet for ThreadPoolWorkers {
     618              :     type Value<'a> = ThreadPoolWorkerId;
     619              : 
     620            0 :     fn dynamic_cardinality(&self) -> Option<usize> {
     621            0 :         Some(self.0)
     622            0 :     }
     623              : 
     624            0 :     fn encode(&self, value: Self::Value<'_>) -> Option<usize> {
     625            0 :         (value.0 < self.0).then_some(value.0)
     626            0 :     }
     627              : 
     628            0 :     fn decode(&self, value: usize) -> Self::Value<'_> {
     629            0 :         ThreadPoolWorkerId(value)
     630            0 :     }
     631              : }
     632              : 
     633              : impl FixedCardinalitySet for ThreadPoolWorkers {
     634            0 :     fn cardinality(&self) -> usize {
     635            0 :         self.0
     636            0 :     }
     637              : }
     638              : 
     639           50 : #[derive(MetricGroup)]
     640              : #[metric(new(workers: usize))]
     641              : pub struct ThreadPoolMetrics {
     642              :     #[metric(init = CounterVec::with_label_set(ThreadPoolWorkers(workers)))]
     643              :     pub worker_task_turns_total: CounterVec<ThreadPoolWorkers>,
     644              :     #[metric(init = CounterVec::with_label_set(ThreadPoolWorkers(workers)))]
     645              :     pub worker_task_skips_total: CounterVec<ThreadPoolWorkers>,
     646              : }
        

Generated by: LCOV version 2.1-beta