LCOV - code coverage report
Current view: top level - proxy/src - metrics.rs (source / functions) Coverage Total Hit
Test: c8f8d331b83562868d9054d9e0e68f866772aeaa.info Lines: 56.6 % 189 107
Test Date: 2025-07-26 17:20:05 Functions: 48.4 % 31 15

            Line data    Source code
       1              : use std::sync::{Arc, OnceLock};
       2              : 
       3              : use lasso::ThreadedRodeo;
       4              : use measured::label::{
       5              :     FixedCardinalitySet, LabelGroupSet, LabelGroupVisitor, LabelName, LabelSet, LabelValue,
       6              :     StaticLabelSet,
       7              : };
       8              : use measured::metric::histogram::Thresholds;
       9              : use measured::metric::name::MetricName;
      10              : use measured::{
      11              :     Counter, CounterVec, FixedCardinalityLabel, Gauge, Histogram, HistogramVec, LabelGroup,
      12              :     MetricGroup,
      13              : };
      14              : use metrics::{CounterPairAssoc, CounterPairVec, HyperLogLogVec, InfoMetric};
      15              : use tokio::time::{self, Instant};
      16              : 
      17              : use crate::control_plane::messages::ColdStartInfo;
      18              : use crate::error::ErrorKind;
      19              : 
// Root metric group of the proxy: bundles the `proxy`, `wake_compute_lock` and
// `service` sub-groups, each under its own namespace.
// The derived constructor takes the shared thread-pool metrics handle and passes it
// on to `ProxyMetrics::new`.
// NOTE(review): plain `//` comments are used here on purpose — `///` comments on
// metric fields look like they double as metric help text; confirm before converting.
#[derive(MetricGroup)]
#[metric(new(thread_pool: Arc<ThreadPoolMetrics>))]
pub struct Metrics {
    #[metric(namespace = "proxy")]
    #[metric(init = ProxyMetrics::new(thread_pool))]
    pub proxy: ProxyMetrics,

    #[metric(namespace = "wake_compute_lock")]
    pub wake_compute_lock: ApiLockMetrics,

    #[metric(namespace = "service")]
    pub service: ServiceMetrics,
}
      33              : 
      34              : static SELF: OnceLock<Metrics> = OnceLock::new();
      35              : impl Metrics {
      36            0 :     pub fn install(thread_pool: Arc<ThreadPoolMetrics>) {
      37            0 :         let mut metrics = Metrics::new(thread_pool);
      38              : 
      39            0 :         metrics.proxy.errors_total.init_all_dense();
      40            0 :         metrics.proxy.redis_errors_total.init_all_dense();
      41            0 :         metrics.proxy.redis_events_count.init_all_dense();
      42            0 :         metrics.proxy.retries_metric.init_all_dense();
      43            0 :         metrics.proxy.connection_failures_total.init_all_dense();
      44              : 
      45            0 :         SELF.set(metrics)
      46            0 :             .ok()
      47            0 :             .expect("proxy metrics must not be installed more than once");
      48            0 :     }
      49              : 
      50          169 :     pub fn get() -> &'static Self {
      51              :         #[cfg(test)]
      52          169 :         return SELF.get_or_init(|| Metrics::new(Arc::new(ThreadPoolMetrics::new(0))));
      53              : 
      54              :         #[cfg(not(test))]
      55            0 :         SELF.get()
      56            0 :             .expect("proxy metrics must be installed by the main() function")
      57          169 :     }
      58              : }
      59              : 
// All metrics exported under the `proxy` namespace of `Metrics`.
// The derived constructor receives the shared thread-pool metrics handle, which is
// stored in `scram_pool`.
// NOTE(review): the `///` comments on the fields below look like they double as the
// exported metric help text, so they are left untouched; reviewer notes use `//`.
#[derive(MetricGroup)]
#[metric(new(thread_pool: Arc<ThreadPoolMetrics>))]
pub struct ProxyMetrics {
    #[metric(flatten)]
    pub db_connections: CounterPairVec<NumDbConnectionsGauge>,
    #[metric(flatten)]
    pub client_connections: CounterPairVec<NumClientConnectionsGauge>,
    #[metric(flatten)]
    pub connection_requests: CounterPairVec<NumConnectionRequestsGauge>,
    #[metric(flatten)]
    pub http_endpoint_pools: HttpEndpointPools,
    #[metric(flatten)]
    pub cancel_channel_size: CounterPairVec<CancelChannelSizeGauge>,

    /// Time it took for proxy to establish a connection to the compute endpoint.
    // 16 buckets, doubling from 0.5ms.
    // NOTE(review): the previous note claimed "largest bucket = 2^16 * 0.5ms = 32s";
    // if the first bucket is 0.5ms itself, the largest is 2^15 * 0.5ms ≈ 16s — confirm.
    #[metric(metadata = Thresholds::exponential_buckets(0.0005, 2.0))]
    pub compute_connection_latency_seconds: HistogramVec<ComputeConnectionLatencySet, 16>,

    /// Time it took for proxy to receive a response from control plane.
    #[metric(
        // 16 buckets, doubling from 0.2ms.
        // NOTE(review): the previous note claimed "largest bucket = 2^16 * 0.2ms = 13s";
        // if the first bucket is 0.2ms itself, the largest is 2^15 * 0.2ms ≈ 6.5s — confirm.
        metadata = Thresholds::exponential_buckets(0.0002, 2.0),
    )]
    pub console_request_latency: HistogramVec<ConsoleRequestSet, 16>,

    /// Size of the HTTP request body lengths.
    // smallest bucket = 16 bytes; 12 buckets, each 4x the previous one.
    // NOTE(review): the previous note claimed "largest bucket = 4^12 * 16 bytes = 256MB";
    // if the first bucket is 16 bytes itself, the largest is 4^11 * 16 bytes = 64MiB — confirm.
    #[metric(metadata = Thresholds::exponential_buckets(16.0, 4.0))]
    pub http_conn_content_length_bytes: HistogramVec<StaticLabelSet<HttpDirection>, 12>,

    /// Time it takes to reclaim unused connection pools.
    // 16 buckets, doubling from 1 microsecond.
    #[metric(metadata = Thresholds::exponential_buckets(1e-6, 2.0))]
    pub http_pool_reclaimation_lag_seconds: Histogram<16>,

    /// Number of opened connections to a database.
    pub http_pool_opened_connections: Gauge,

    /// Number of allowed ips
    #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0]))]
    pub allowed_ips_number: Histogram<10>,

    /// Number of allowed VPC endpoints IDs
    #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0]))]
    pub allowed_vpc_endpoint_ids: Histogram<10>,

    /// Number of connections, by the method we used to determine the endpoint.
    pub accepted_connections_by_sni: CounterVec<SniSet>,

    /// Number of connection failures (per kind).
    pub connection_failures_total: CounterVec<StaticLabelSet<ConnectionFailureKind>>,

    /// Number of wake-up failures (per kind).
    pub connection_failures_breakdown: CounterVec<ConnectionFailuresBreakdownSet>,

    /// Number of bytes sent/received between all clients and backends.
    pub io_bytes: CounterVec<StaticLabelSet<Direction>>,

    /// Number of IO errors while logging.
    pub logging_errors_count: Counter,

    /// Number of errors by a given classification.
    pub errors_total: CounterVec<StaticLabelSet<crate::error::ErrorKind>>,

    /// Number of cancellation requests (per found/not_found).
    pub cancellation_requests_total: CounterVec<CancellationRequestSet>,

    /// Number of errors by a given classification
    pub redis_errors_total: CounterVec<RedisErrorsSet>,

    /// Number of TLS handshake failures
    pub tls_handshake_failures: Counter,

    /// HLL approximate cardinality of endpoints that are connecting
    pub connecting_endpoints: HyperLogLogVec<StaticLabelSet<Protocol>, 32>,

    /// Number of endpoints affected by errors of a given classification
    pub endpoints_affected_by_errors: HyperLogLogVec<StaticLabelSet<crate::error::ErrorKind>, 32>,

    /// Number of retries (per outcome, per retry_type).
    // Buckets are the integers 0 through 8.
    #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]))]
    pub retries_metric: HistogramVec<RetriesMetricSet, 9>,

    /// Number of events consumed from redis (per event type).
    pub redis_events_count: CounterVec<StaticLabelSet<RedisEventsCount>>,

    #[metric(namespace = "connect_compute_lock")]
    pub connect_compute_lock: ApiLockMetrics,

    // Shared handle passed in through the derived constructor rather than built here.
    #[metric(namespace = "scram_pool")]
    #[metric(init = thread_pool)]
    pub scram_pool: Arc<ThreadPoolMetrics>,
}
     154              : 
// Metrics describing one API lock (a collection of semaphores).
// Used under the `wake_compute_lock` namespace in `Metrics` and under
// `connect_compute_lock` in `ProxyMetrics`.
// `#[metric(new())]` requests an argument-less constructor, which `Default` delegates to.
#[derive(MetricGroup)]
#[metric(new())]
pub struct ApiLockMetrics {
    /// Number of semaphores registered in this api lock
    pub semaphores_registered: Counter,
    /// Number of semaphores unregistered in this api lock
    pub semaphores_unregistered: Counter,
    /// Time it takes to reclaim unused semaphores in the api lock
    // 16 buckets, doubling from 1 microsecond.
    #[metric(metadata = Thresholds::exponential_buckets(1e-6, 2.0))]
    pub reclamation_lag_seconds: Histogram<16>,
    /// Time it takes to acquire a semaphore lock
    // 16 buckets, doubling from 100 microseconds.
    #[metric(metadata = Thresholds::exponential_buckets(1e-4, 2.0))]
    pub semaphore_acquire_seconds: Histogram<16>,
}
     169              : 
     170              : impl Default for ApiLockMetrics {
     171          102 :     fn default() -> Self {
     172          102 :         Self::new()
     173          102 :     }
     174              : }
     175              : 
/// Label telling HTTP request bodies apart from response bodies.
///
/// Used as the only label of `ProxyMetrics::http_conn_content_length_bytes`.
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "direction")]
pub enum HttpDirection {
    Request,
    Response,
}
     182              : 
/// Traffic direction label; the only label of `ProxyMetrics::io_bytes`.
// NOTE(review): which side `Tx`/`Rx` is measured from is not visible here — confirm at the call sites.
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "direction")]
pub enum Direction {
    Tx,
    Rx,
}
     189              : 
/// Client-facing protocol a connection arrived on.
///
/// Used as a metric label (for example for the connection counter pairs and
/// `connecting_endpoints`); [`Protocol::as_str`] gives its textual name.
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
#[label(singleton = "protocol")]
pub enum Protocol {
    Http,
    Ws,
    Tcp,
    SniRouter,
}
     198              : 
     199              : impl Protocol {
     200            6 :     pub fn as_str(self) -> &'static str {
     201            6 :         match self {
     202            0 :             Protocol::Http => "http",
     203            0 :             Protocol::Ws => "ws",
     204            6 :             Protocol::Tcp => "tcp",
     205            0 :             Protocol::SniRouter => "sni_router",
     206              :         }
     207            6 :     }
     208              : }
     209              : 
     210              : impl std::fmt::Display for Protocol {
     211            6 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     212            6 :         f.write_str(self.as_str())
     213            6 :     }
     214              : }
     215              : 
/// Two-valued metric label, convertible from `bool` through `From<bool>`.
///
/// Used for `ConnectionFailuresBreakdownGroup::retry` and
/// `InvalidEndpointsGroup::rejected`.
#[derive(FixedCardinalityLabel, Copy, Clone)]
pub enum Bool {
    True,
    False,
}
     221              : 
/// Hit-or-miss label for cache lookups.
// NOTE(review): no user of this label is visible in this part of the file.
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "outcome")]
pub enum CacheOutcome {
    Hit,
    Miss,
}
     228              : 
/// Label group for `ProxyMetrics::console_request_latency`, through the generated
/// `ConsoleRequestSet`.
// NOTE(review): `dynamic_with = ThreadedRodeo` presumably interns the request name at
// runtime, so the set of distinct values should stay small — confirm.
#[derive(LabelGroup)]
#[label(set = ConsoleRequestSet)]
pub struct ConsoleRequest<'a> {
    #[label(dynamic_with = ThreadedRodeo, default)]
    pub request: &'a str,
}
     235              : 
// Pair of counters tracking registered and unregistered HTTP endpoint pools.
// Flattened into `ProxyMetrics`; `HttpEndpointPools::guard` bumps the first counter
// and returns a guard that bumps the second one when dropped.
#[derive(MetricGroup, Default)]
pub struct HttpEndpointPools {
    /// Number of endpoints we have registered pools for
    pub http_pool_endpoints_registered_total: Counter,
    /// Number of endpoints we have unregistered pools for
    pub http_pool_endpoints_unregistered_total: Counter,
}
     243              : 
/// Guard returned by `HttpEndpointPools::guard`; increments `dec` when dropped.
pub struct HttpEndpointPoolsGuard<'a> {
    // Counter bumped on drop (the "unregistered" counter when built by `guard`).
    dec: &'a Counter,
}
     247              : 
     248              : impl Drop for HttpEndpointPoolsGuard<'_> {
     249            2 :     fn drop(&mut self) {
     250            2 :         self.dec.inc();
     251            2 :     }
     252              : }
     253              : 
     254              : impl HttpEndpointPools {
     255            2 :     pub fn guard(&self) -> HttpEndpointPoolsGuard<'_> {
     256            2 :         self.http_pool_endpoints_registered_total.inc();
     257            2 :         HttpEndpointPoolsGuard {
     258            2 :             dec: &self.http_pool_endpoints_unregistered_total,
     259            2 :         }
     260            2 :     }
     261              : }
/// Marker type describing the opened/closed database connection counter pair
/// (`ProxyMetrics::db_connections`), labelled by [`Protocol`].
pub struct NumDbConnectionsGauge;
impl CounterPairAssoc for NumDbConnectionsGauge {
    // Names and help strings of the "incremented" and "decremented" counters of the pair.
    const INC_NAME: &'static MetricName = MetricName::from_str("opened_db_connections_total");
    const DEC_NAME: &'static MetricName = MetricName::from_str("closed_db_connections_total");
    const INC_HELP: &'static str = "Number of opened connections to a database.";
    const DEC_HELP: &'static str = "Number of closed connections to a database.";
    type LabelGroupSet = StaticLabelSet<Protocol>;
}
/// Guard type of the database connection counter pair.
pub type NumDbConnectionsGuard<'a> = metrics::MeasuredCounterPairGuard<'a, NumDbConnectionsGauge>;
     271              : 
/// Marker type describing the opened/closed client connection counter pair
/// (`ProxyMetrics::client_connections`), labelled by [`Protocol`].
pub struct NumClientConnectionsGauge;
impl CounterPairAssoc for NumClientConnectionsGauge {
    // Names and help strings of the "incremented" and "decremented" counters of the pair.
    const INC_NAME: &'static MetricName = MetricName::from_str("opened_client_connections_total");
    const DEC_NAME: &'static MetricName = MetricName::from_str("closed_client_connections_total");
    const INC_HELP: &'static str = "Number of opened connections from a client.";
    const DEC_HELP: &'static str = "Number of closed connections from a client.";
    type LabelGroupSet = StaticLabelSet<Protocol>;
}
/// Guard type of the client connection counter pair.
pub type NumClientConnectionsGuard<'a> =
    metrics::MeasuredCounterPairGuard<'a, NumClientConnectionsGauge>;
     282              : 
/// Marker type describing the accepted/closed connection request counter pair
/// (`ProxyMetrics::connection_requests`), labelled by [`Protocol`].
pub struct NumConnectionRequestsGauge;
impl CounterPairAssoc for NumConnectionRequestsGauge {
    // Names and help strings of the "incremented" and "decremented" counters of the pair.
    const INC_NAME: &'static MetricName = MetricName::from_str("accepted_connections_total");
    const DEC_NAME: &'static MetricName = MetricName::from_str("closed_connections_total");
    const INC_HELP: &'static str = "Number of client connections accepted.";
    const DEC_HELP: &'static str = "Number of client connections closed.";
    type LabelGroupSet = StaticLabelSet<Protocol>;
}
/// Guard type of the connection request counter pair.
pub type NumConnectionRequestsGuard<'a> =
    metrics::MeasuredCounterPairGuard<'a, NumConnectionRequestsGauge>;
     293              : 
/// Marker type describing the counter pair for messages in the cancellation channel
/// (`ProxyMetrics::cancel_channel_size`), labelled by [`RedisMsgKind`].
pub struct CancelChannelSizeGauge;
impl CounterPairAssoc for CancelChannelSizeGauge {
    // Names and help strings of the "incremented" and "decremented" counters of the pair.
    const INC_NAME: &'static MetricName = MetricName::from_str("opened_msgs_cancel_channel_total");
    const DEC_NAME: &'static MetricName = MetricName::from_str("closed_msgs_cancel_channel_total");
    const INC_HELP: &'static str = "Number of processing messages in the cancellation channel.";
    const DEC_HELP: &'static str = "Number of closed messages in the cancellation channel.";
    type LabelGroupSet = StaticLabelSet<RedisMsgKind>;
}
/// Guard type of the cancellation channel counter pair.
pub type CancelChannelSizeGuard<'a> = metrics::MeasuredCounterPairGuard<'a, CancelChannelSizeGauge>;
     303              : 
/// Label group for `ProxyMetrics::compute_connection_latency_seconds`, through the
/// generated `ComputeConnectionLatencySet`.
///
/// Built in `LatencyTimer`'s `Drop` implementation, once per [`LatencyExclusions`] variant.
#[derive(LabelGroup)]
#[label(set = ComputeConnectionLatencySet)]
pub struct ComputeConnectionLatencyGroup {
    protocol: Protocol,
    cold_start_info: ColdStartInfo,
    outcome: ConnectOutcome,
    // Which waiting periods were subtracted from the observed duration.
    excluded: LatencyExclusions,
}
     312              : 
/// Which accumulated waiting periods have been subtracted from a latency observation.
///
/// Each variant excludes everything the previous one does plus one more period,
/// mirroring the four observations made when a `LatencyTimer` is dropped.
#[derive(FixedCardinalityLabel, Copy, Clone)]
pub enum LatencyExclusions {
    /// Time spent waiting for the client is excluded.
    Client,
    /// Client and control-plane waiting time is excluded.
    ClientAndCplane,
    /// Client, control-plane and compute waiting time is excluded.
    ClientCplaneCompute,
    /// Client, control-plane, compute and retry-timeout waiting time is excluded.
    ClientCplaneComputeRetry,
}
     320              : 
/// Label group for `ProxyMetrics::accepted_connections_by_sni`, through the generated `SniSet`.
#[derive(LabelGroup)]
#[label(set = SniSet)]
pub struct SniGroup {
    pub protocol: Protocol,
    pub kind: SniKind,
}
     327              : 
/// How the target endpoint of an accepted connection was determined.
#[derive(FixedCardinalityLabel, Copy, Clone)]
pub enum SniKind {
    /// Domain name based routing. SNI for libpq/websockets. Host for HTTP
    Sni,
    /// Metadata based routing. `options` for libpq/websockets. Header for HTTP
    NoSni,
    /// Metadata based routing, using the password field.
    PasswordHack,
}
     337              : 
/// Label for `ProxyMetrics::connection_failures_total`.
// NOTE(review): presumably distinguishes failures against cached vs. freshly fetched
// compute info — confirm at the call sites.
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "kind")]
pub enum ConnectionFailureKind {
    ComputeCached,
    ComputeUncached,
}
     344              : 
/// Label group for `ProxyMetrics::connection_failures_breakdown`, through the
/// generated `ConnectionFailuresBreakdownSet`.
#[derive(LabelGroup)]
#[label(set = ConnectionFailuresBreakdownSet)]
pub struct ConnectionFailuresBreakdownGroup {
    pub kind: ErrorKind,
    pub retry: Bool,
}
     351              : 
/// Label group for `ProxyMetrics::redis_errors_total`, through the generated `RedisErrorsSet`.
// NOTE(review): `dynamic_with = ThreadedRodeo` presumably interns the channel name at
// runtime — confirm the set of channels is bounded.
#[derive(LabelGroup, Copy, Clone)]
#[label(set = RedisErrorsSet)]
pub struct RedisErrors<'a> {
    #[label(dynamic_with = ThreadedRodeo, default)]
    pub channel: &'a str,
}
     358              : 
/// Outcome of a cancellation request; the `kind` label of [`CancellationRequest`].
#[derive(FixedCardinalityLabel, Copy, Clone)]
pub enum CancellationOutcome {
    NotFound,
    Found,
    RateLimitExceeded,
}
     365              : 
/// Label group for `ProxyMetrics::cancellation_requests_total`, through the
/// generated `CancellationRequestSet`.
#[derive(LabelGroup)]
#[label(set = CancellationRequestSet)]
pub struct CancellationRequest {
    pub kind: CancellationOutcome,
}
     371              : 
/// What a `LatencyTimer` was waiting on during a paused interval.
///
/// `LatencyTimer::unpause` adds the elapsed time to the matching field of
/// `LatencyAccumulated`.
#[derive(Clone, Copy)]
pub enum Waiting {
    /// Accumulated into `LatencyAccumulated::cplane`.
    Cplane,
    /// Accumulated into `LatencyAccumulated::client`.
    Client,
    /// Accumulated into `LatencyAccumulated::compute`.
    Compute,
    /// Accumulated into `LatencyAccumulated::retry`.
    RetryTimeout,
}
     379              : 
/// Kind of redis message; the label of the `CancelChannelSizeGauge` counter pair.
// NOTE(review): the reason for allowing `clippy::enum_variant_names` is not evident
// from the current variants — confirm it is still needed.
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "kind")]
#[allow(clippy::enum_variant_names)]
pub enum RedisMsgKind {
    Set,
    Get,
    Expire,
    HGet,
}
     389              : 
/// Time a `LatencyTimer` spent waiting, split by what it was waiting on.
///
/// All fields start at zero (`Default`) and are increased by `LatencyTimer::unpause`.
#[derive(Default, Clone)]
pub struct LatencyAccumulated {
    /// Time attributed to `Waiting::Cplane`.
    pub cplane: time::Duration,
    /// Time attributed to `Waiting::Client`.
    pub client: time::Duration,
    /// Time attributed to `Waiting::Compute`.
    pub compute: time::Duration,
    /// Time attributed to `Waiting::RetryTimeout`.
    pub retry: time::Duration,
}
     397              : 
     398              : impl std::fmt::Display for LatencyAccumulated {
     399            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     400            0 :         write!(
     401            0 :             f,
     402            0 :             "client: {}, cplane: {}, compute: {}, retry: {}",
     403            0 :             self.client.as_micros(),
     404            0 :             self.cplane.as_micros(),
     405            0 :             self.compute.as_micros(),
     406            0 :             self.retry.as_micros()
     407              :         )
     408            0 :     }
     409              : }
     410              : 
/// Stopwatch for the time it takes to connect a client to compute.
///
/// Waiting periods are accumulated through `unpause`; when the timer is dropped it
/// reports to `compute_connection_latency_seconds` unless `skip_reporting` is set.
pub struct LatencyTimer {
    // instant at which the stopwatch was started
    start: time::Instant,
    // instant at which the stopwatch was stopped (`None` until `success` is called)
    stop: Option<time::Instant>,
    // accumulated waiting time, split by what was being waited on
    accumulated: LatencyAccumulated,
    // label data
    protocol: Protocol,
    cold_start_info: ColdStartInfo,
    outcome: ConnectOutcome,

    // when true, dropping the timer records nothing (see `noop`)
    skip_reporting: bool,
}
     425              : 
     426              : impl LatencyTimer {
     427           77 :     pub fn new(protocol: Protocol) -> Self {
     428           77 :         Self {
     429           77 :             start: time::Instant::now(),
     430           77 :             stop: None,
     431           77 :             accumulated: LatencyAccumulated::default(),
     432           77 :             protocol,
     433           77 :             cold_start_info: ColdStartInfo::Unknown,
     434           77 :             // assume failed unless otherwise specified
     435           77 :             outcome: ConnectOutcome::Failed,
     436           77 :             skip_reporting: false,
     437           77 :         }
     438           77 :     }
     439              : 
     440            0 :     pub(crate) fn noop(protocol: Protocol) -> Self {
     441            0 :         Self {
     442            0 :             start: time::Instant::now(),
     443            0 :             stop: None,
     444            0 :             accumulated: LatencyAccumulated::default(),
     445            0 :             protocol,
     446            0 :             cold_start_info: ColdStartInfo::Unknown,
     447            0 :             // assume failed unless otherwise specified
     448            0 :             outcome: ConnectOutcome::Failed,
     449            0 :             skip_reporting: true,
     450            0 :         }
     451            0 :     }
     452              : 
     453           52 :     pub fn unpause(&mut self, start: Instant, waiting_for: Waiting) {
     454           52 :         let dur = start.elapsed();
     455           52 :         match waiting_for {
     456            0 :             Waiting::Cplane => self.accumulated.cplane += dur,
     457           39 :             Waiting::Client => self.accumulated.client += dur,
     458            7 :             Waiting::Compute => self.accumulated.compute += dur,
     459            6 :             Waiting::RetryTimeout => self.accumulated.retry += dur,
     460              :         }
     461           52 :     }
     462              : 
     463            0 :     pub fn cold_start_info(&mut self, cold_start_info: ColdStartInfo) {
     464            0 :         self.cold_start_info = cold_start_info;
     465            0 :     }
     466              : 
     467            8 :     pub fn success(&mut self) {
     468              :         // stop the stopwatch and record the time that we have accumulated
     469            8 :         self.stop = Some(time::Instant::now());
     470              : 
     471              :         // success
     472            8 :         self.outcome = ConnectOutcome::Success;
     473            8 :     }
     474              : 
     475            0 :     pub fn accumulated(&self) -> LatencyAccumulated {
     476            0 :         self.accumulated.clone()
     477            0 :     }
     478              : }
     479              : 
/// Outcome label for connection attempts.
///
/// A `LatencyTimer` starts as `Failed` and switches to `Success` when
/// `LatencyTimer::success` is called.
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
pub enum ConnectOutcome {
    Success,
    Failed,
}
     485              : 
     486              : impl Drop for LatencyTimer {
     487           77 :     fn drop(&mut self) {
     488           77 :         if self.skip_reporting {
     489            0 :             return;
     490           77 :         }
     491              : 
     492           77 :         let duration = self
     493           77 :             .stop
     494           77 :             .unwrap_or_else(time::Instant::now)
     495           77 :             .duration_since(self.start);
     496              : 
     497           77 :         let metric = &Metrics::get().proxy.compute_connection_latency_seconds;
     498              : 
     499              :         // Excluding client communication from the accumulated time.
     500           77 :         metric.observe(
     501           77 :             ComputeConnectionLatencyGroup {
     502           77 :                 protocol: self.protocol,
     503           77 :                 cold_start_info: self.cold_start_info,
     504           77 :                 outcome: self.outcome,
     505           77 :                 excluded: LatencyExclusions::Client,
     506           77 :             },
     507           77 :             duration
     508           77 :                 .saturating_sub(self.accumulated.client)
     509           77 :                 .as_secs_f64(),
     510              :         );
     511              : 
     512              :         // Exclude client and cplane communication from the accumulated time.
     513           77 :         let accumulated_total = self.accumulated.client + self.accumulated.cplane;
     514           77 :         metric.observe(
     515           77 :             ComputeConnectionLatencyGroup {
     516           77 :                 protocol: self.protocol,
     517           77 :                 cold_start_info: self.cold_start_info,
     518           77 :                 outcome: self.outcome,
     519           77 :                 excluded: LatencyExclusions::ClientAndCplane,
     520           77 :             },
     521           77 :             duration.saturating_sub(accumulated_total).as_secs_f64(),
     522              :         );
     523              : 
     524              :         // Exclude client, cplane, compute communication from the accumulated time.
     525           77 :         let accumulated_total =
     526           77 :             self.accumulated.client + self.accumulated.cplane + self.accumulated.compute;
     527           77 :         metric.observe(
     528           77 :             ComputeConnectionLatencyGroup {
     529           77 :                 protocol: self.protocol,
     530           77 :                 cold_start_info: self.cold_start_info,
     531           77 :                 outcome: self.outcome,
     532           77 :                 excluded: LatencyExclusions::ClientCplaneCompute,
     533           77 :             },
     534           77 :             duration.saturating_sub(accumulated_total).as_secs_f64(),
     535              :         );
     536              : 
     537              :         // Exclude client, cplane, compute, retry communication from the accumulated time.
     538           77 :         let accumulated_total = self.accumulated.client
     539           77 :             + self.accumulated.cplane
     540           77 :             + self.accumulated.compute
     541           77 :             + self.accumulated.retry;
     542           77 :         metric.observe(
     543           77 :             ComputeConnectionLatencyGroup {
     544           77 :                 protocol: self.protocol,
     545           77 :                 cold_start_info: self.cold_start_info,
     546           77 :                 outcome: self.outcome,
     547           77 :                 excluded: LatencyExclusions::ClientCplaneComputeRetry,
     548           77 :             },
     549           77 :             duration.saturating_sub(accumulated_total).as_secs_f64(),
     550              :         );
     551           77 :     }
     552              : }
     553              : 
     554              : impl From<bool> for Bool {
     555            3 :     fn from(value: bool) -> Self {
     556            3 :         if value { Bool::True } else { Bool::False }
     557            3 :     }
     558              : }
     559              : 
/// Label group backing the generated `InvalidEndpointsSet`.
// NOTE(review): no metric using `InvalidEndpointsSet` is visible in this part of the file.
#[derive(LabelGroup)]
#[label(set = InvalidEndpointsSet)]
pub struct InvalidEndpointsGroup {
    pub protocol: Protocol,
    pub rejected: Bool,
    pub outcome: ConnectOutcome,
}
     567              : 
     568              : #[derive(LabelGroup)]
     569              : #[label(set = RetriesMetricSet)]
                        /// Label group made of two labels: `outcome` and `retry_type`.
                        ///
                        /// The `#[label(set = ...)]` attribute names `RetriesMetricSet` as the
                        /// label-set type that goes with this group.
     570              : pub struct RetriesMetricGroup {
                            /// Connection outcome label.
     571              :     pub outcome: ConnectOutcome,
                            /// Which kind of retry the sample belongs to (see `RetryType`).
     572              :     pub retry_type: RetryType,
     573              : }
     574              : 
     575              : #[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
                        /// Two-variant label value used as the `retry_type` field of `RetriesMetricGroup`.
                        // NOTE(review): variant meanings below are read off the names only — confirm against the call sites.
     576              : pub enum RetryType {
                            /// Presumably a retried wake-compute request.
     577              :     WakeCompute,
                            /// Presumably a retried connection attempt to compute.
     578              :     ConnectToCompute,
     579              : }
     580              : 
     581              : #[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
     582              : #[label(singleton = "event")]
                        /// Nine-variant label value enumerating event kinds.
                        ///
                        /// `#[label(singleton = "event")]` supplies the label name `event`.
                        // NOTE(review): presumably these are the Redis notification kinds being counted (type name only) — confirm.
     583              : pub enum RedisEventsCount {
     584              :     EndpointCreated,
     585              :     BranchCreated,
     586              :     ProjectCreated,
     587              :     CancelSession,
     588              :     InvalidateRole,
     589              :     InvalidateEndpoint,
     590              :     InvalidateProject,
     591              :     InvalidateProjects,
     592              :     InvalidateOrg,
     593              : }
     594              : 
     595              : pub struct ThreadPoolWorkers(usize); // label set over worker ids; the wrapped count is what the `cardinality` impls below return
     596              : #[derive(Copy, Clone)]
                        /// Identifies one worker by a plain `usize`.
                        ///
                        /// The `LabelValue` / `LabelGroup` impls below emit it as an integer
                        /// label named `worker`.
     597              : pub struct ThreadPoolWorkerId(pub usize);
     598              : 
     599              : impl LabelValue for ThreadPoolWorkerId {
                            /// Hands the worker id to the visitor as an integer via `write_int`
                            /// and returns the visitor's output.
                            // NOTE(review): `usize as i64` wraps for ids above `i64::MAX`;
                            // presumably unreachable for realistic worker counts — confirm.
     600            0 :     fn visit<V: measured::label::LabelVisitor>(&self, v: V) -> V::Output {
     601            0 :         v.write_int(self.0 as i64)
     602            0 :     }
     603              : }
     604              : 
     605              : impl LabelGroup for ThreadPoolWorkerId {
                            /// Emits exactly one label: name `worker`, value this id.
     606            0 :     fn visit_values(&self, v: &mut impl measured::label::LabelGroupVisitor) {
     607            0 :         v.write_value(LabelName::from_str("worker"), self);
     608            0 :     }
     609              : }
     610              : 
     611              : impl LabelGroupSet for ThreadPoolWorkers {
                            // The label group handled by this set is a single worker id.
     612              :     type Group<'a> = ThreadPoolWorkerId;
     613              : 
                            /// Reports the wrapped worker count as the cardinality; always `Some`.
     614          114 :     fn cardinality(&self) -> Option<usize> {
     615          114 :         Some(self.0)
     616          114 :     }
     617              : 
                            /// Returns the given value unchanged, wrapped in `Some`.
     618            5 :     fn encode_dense(&self, value: Self::Unique) -> Option<usize> {
     619            5 :         Some(value)
     620            5 :     }
     621              : 
                            /// Wraps the given index back into a `ThreadPoolWorkerId`.
     622            0 :     fn decode_dense(&self, value: usize) -> Self::Group<'_> {
     623            0 :         ThreadPoolWorkerId(value)
     624            0 :     }
     625              : 
                            // The unique encoding of a worker id is the raw `usize` itself.
     626              :     type Unique = usize;
     627              : 
                            /// Returns the raw id of `value`, wrapped in `Some`.
                            // NOTE(review): no range check here, whereas `LabelSet::encode` for this
                            // same type only accepts ids with `value.0 < self.0`. An id at or above
                            // the worker count is therefore passed through as-is — confirm intended.
     628            5 :     fn encode(&self, value: Self::Group<'_>) -> Option<Self::Unique> {
     629            5 :         Some(value.0)
     630            5 :     }
     631              : 
                            /// Wraps the referenced unique value back into a `ThreadPoolWorkerId`.
     632            0 :     fn decode(&self, value: &Self::Unique) -> Self::Group<'_> {
     633            0 :         ThreadPoolWorkerId(*value)
     634            0 :     }
     635              : }
     636              : 
     637              : impl LabelSet for ThreadPoolWorkers {
                            // Each value of this set is a single worker id.
     638              :     type Value<'a> = ThreadPoolWorkerId;
     639              : 
                            /// Reports the wrapped worker count; always `Some`.
     640            0 :     fn dynamic_cardinality(&self) -> Option<usize> {
     641            0 :         Some(self.0)
     642            0 :     }
     643              : 
                            /// Returns `Some(id)` when the id is strictly below the worker count,
                            /// and `None` otherwise.
     644            0 :     fn encode(&self, value: Self::Value<'_>) -> Option<usize> {
     645            0 :         (value.0 < self.0).then_some(value.0)
     646            0 :     }
     647              : 
                            /// Wraps the given index back into a `ThreadPoolWorkerId`.
     648            0 :     fn decode(&self, value: usize) -> Self::Value<'_> {
     649            0 :         ThreadPoolWorkerId(value)
     650            0 :     }
     651              : }
     652              : 
     653              : impl FixedCardinalitySet for ThreadPoolWorkers {
                            /// Returns the wrapped worker count as the set's fixed cardinality.
     654            0 :     fn cardinality(&self) -> usize {
     655            0 :         self.0
     656            0 :     }
     657              : }
     658              : 
     659              : #[derive(MetricGroup)]
     660              : #[metric(new(workers: usize))]
                        /// Two counter vectors, each labelled by worker id.
                        ///
                        /// The `new(workers: usize)` attribute declares the `workers` argument that
                        /// both `init` expressions below use to size their `ThreadPoolWorkers` label set.
     661              : pub struct ThreadPoolMetrics {
                            // NOTE(review): presumably counts task turns per worker (field name only) — confirm.
     662              :     #[metric(init = CounterVec::with_label_set(ThreadPoolWorkers(workers)))]
     663              :     pub worker_task_turns_total: CounterVec<ThreadPoolWorkers>,
                            // NOTE(review): presumably counts skipped turns per worker (field name only) — confirm.
     664              :     #[metric(init = CounterVec::with_label_set(ThreadPoolWorkers(workers)))]
     665              :     pub worker_task_skips_total: CounterVec<ThreadPoolWorkers>,
     666              : }
     667              : 
     668              : #[derive(MetricGroup, Default)]
                        /// Metric group holding a single info metric labelled by `ServiceInfo`.
     669              : pub struct ServiceMetrics {
                            /// Info metric whose label group is the current `ServiceInfo`.
     670              :     pub info: InfoMetric<ServiceInfo>,
     671              : }
     672              : 
     673              : #[derive(Default)]
                        /// Label group carrying the service state.
                        ///
                        /// The derived `Default` yields `ServiceState::Init`, the variant marked
                        /// `#[default]` on `ServiceState`.
     674              : pub struct ServiceInfo {
                            /// State value emitted under the `state` label.
     675              :     pub state: ServiceState,
     676              : }
     677              : 
     678              : impl ServiceInfo {
                            /// Builds a `ServiceInfo` whose state is `ServiceState::Running`.
                            /// Declared `const fn`, so it can also be used in const contexts.
     679            0 :     pub const fn running() -> Self {
     680            0 :         ServiceInfo {
     681            0 :             state: ServiceState::Running,
     682            0 :         }
     683            0 :     }
     684              : 
                            /// Builds a `ServiceInfo` whose state is `ServiceState::Terminating`.
                            /// Declared `const fn`, so it can also be used in const contexts.
     685            0 :     pub const fn terminating() -> Self {
     686            0 :         ServiceInfo {
     687            0 :             state: ServiceState::Terminating,
     688            0 :         }
     689            0 :     }
     690              : }
     691              : 
     692              : impl LabelGroup for ServiceInfo {
                            /// Emits exactly one label: name `state`, value `self.state`.
     693            0 :     fn visit_values(&self, v: &mut impl LabelGroupVisitor) {
                                // The label name is built once as a constant instead of on every call.
     694              :         const STATE: &LabelName = LabelName::from_str("state");
     695            0 :         v.write_value(STATE, &self.state);
     696            0 :     }
     697              : }
     698              : 
     699              : #[derive(FixedCardinalityLabel, Clone, Copy, Debug, Default)]
     700              : #[label(singleton = "state")]
                        /// Three-variant state label used by `ServiceInfo`.
                        ///
                        /// `#[label(singleton = "state")]` supplies the label name `state`, the same
                        /// name `ServiceInfo`'s `LabelGroup` impl writes by hand.
     701              : pub enum ServiceState {
                            /// Default variant (marked `#[default]`).
     702              :     #[default]
     703              :     Init,
                            /// State set by `ServiceInfo::running()`.
     704              :     Running,
                            /// State set by `ServiceInfo::terminating()`.
     705              :     Terminating,
     706              : }
        

Generated by: LCOV version 2.1-beta