LCOV - code coverage report
Current view: top level - proxy/src - metrics.rs (source / functions) Coverage Total Hit
Test: 4be46b1c0003aa3bbac9ade362c676b419df4c20.info Lines: 60.8 % 176 107
Test Date: 2025-07-22 17:50:06 Functions: 53.6 % 28 15

            Line data    Source code
       1              : use std::sync::{Arc, OnceLock};
       2              : 
       3              : use lasso::ThreadedRodeo;
       4              : use measured::label::{
       5              :     FixedCardinalitySet, LabelGroupSet, LabelName, LabelSet, LabelValue, StaticLabelSet,
       6              : };
       7              : use measured::metric::histogram::Thresholds;
       8              : use measured::metric::name::MetricName;
       9              : use measured::{
      10              :     Counter, CounterVec, FixedCardinalityLabel, Gauge, Histogram, HistogramVec, LabelGroup,
      11              :     MetricGroup,
      12              : };
      13              : use metrics::{CounterPairAssoc, CounterPairVec, HyperLogLogVec};
      14              : use tokio::time::{self, Instant};
      15              : 
      16              : use crate::control_plane::messages::ColdStartInfo;
      17              : use crate::error::ErrorKind;
      18              : 
      19              : #[derive(MetricGroup)]
      20              : #[metric(new(thread_pool: Arc<ThreadPoolMetrics>))]
                        /// Top-level metric group for the proxy.
                        ///
                        /// The `new` constructor takes the shared `thread_pool` metrics and forwards them to
                        /// `ProxyMetrics::new` (see the `init` attribute on `proxy`).
      21              : pub struct Metrics {
      22              :     #[metric(namespace = "proxy")]
      23              :     #[metric(init = ProxyMetrics::new(thread_pool))]
                            /// Proxy-wide metrics, placed in the `proxy` namespace.
      24              :     pub proxy: ProxyMetrics,
      25              : 
      26              :     #[metric(namespace = "wake_compute_lock")]
                            /// API-lock metrics placed in the `wake_compute_lock` namespace.
      27              :     pub wake_compute_lock: ApiLockMetrics,
      28              : }
      29              : 
      30              : static SELF: OnceLock<Metrics> = OnceLock::new(); // process-wide `Metrics` singleton; set by `install`, or lazily by `get` in test builds
      31              : impl Metrics {
                            /// Builds the global `Metrics` from `thread_pool` and publishes it in `SELF`.
                            ///
                            /// # Panics
                            /// Panics if metrics have already been installed.
      32            0 :     pub fn install(thread_pool: Arc<ThreadPoolMetrics>) {
      33            0 :         let mut metrics = Metrics::new(thread_pool);
      34              : 
                                // NOTE(review): `init_all_dense` presumably pre-creates every label combination of
                                // these vecs so the series exist before the first event -- confirm against `measured`.
      35            0 :         metrics.proxy.errors_total.init_all_dense();
      36            0 :         metrics.proxy.redis_errors_total.init_all_dense();
      37            0 :         metrics.proxy.redis_events_count.init_all_dense();
      38            0 :         metrics.proxy.retries_metric.init_all_dense();
      39            0 :         metrics.proxy.connection_failures_total.init_all_dense();
      40              : 
                                // `set` returns the rejected value on failure; `.ok()` drops it so that `expect`
                                // does not require the `Err` payload (a `Metrics`) to implement `Debug`.
      41            0 :         SELF.set(metrics)
      42            0 :             .ok()
      43            0 :             .expect("proxy metrics must not be installed more than once");
      44            0 :     }
      45              : 
                            /// Returns the global metrics instance.
                            ///
                            /// Under `cfg(test)` the singleton is created on first use with
                            /// `ThreadPoolMetrics::new(0)`; otherwise `install` must have been called first.
                            ///
                            /// # Panics
                            /// In non-test builds, panics if `install` has not been called.
      46          169 :     pub fn get() -> &'static Self {
      47              :         #[cfg(test)]
      48          169 :         return SELF.get_or_init(|| Metrics::new(Arc::new(ThreadPoolMetrics::new(0))));
      49              : 
      50              :         #[cfg(not(test))]
      51            0 :         SELF.get()
      52            0 :             .expect("proxy metrics must be installed by the main() function")
      53          169 :     }
      54              : }
      55              : 
      56              : #[derive(MetricGroup)]
      57              : #[metric(new(thread_pool: Arc<ThreadPoolMetrics>))]
                        /// All metrics recorded by the proxy itself.
                        ///
                        /// The `new` constructor receives the shared `thread_pool` metrics, which are stored in
                        /// `scram_pool`.
      58              : pub struct ProxyMetrics {
      59              :     #[metric(flatten)]
                            /// Opened/closed database connection counters (see `NumDbConnectionsGauge`).
      60              :     pub db_connections: CounterPairVec<NumDbConnectionsGauge>,
      61              :     #[metric(flatten)]
                            /// Opened/closed client connection counters (see `NumClientConnectionsGauge`).
      62              :     pub client_connections: CounterPairVec<NumClientConnectionsGauge>,
      63              :     #[metric(flatten)]
                            /// Accepted/closed connection request counters (see `NumConnectionRequestsGauge`).
      64              :     pub connection_requests: CounterPairVec<NumConnectionRequestsGauge>,
      65              :     #[metric(flatten)]
                            /// Registered/unregistered HTTP endpoint pool counters.
      66              :     pub http_endpoint_pools: HttpEndpointPools,
      67              :     #[metric(flatten)]
                            /// Cancellation channel message counters (see `CancelChannelSizeGauge`).
      68              :     pub cancel_channel_size: CounterPairVec<CancelChannelSizeGauge>,
      69              : 
      70              :     /// Time it took for proxy to establish a connection to the compute endpoint.
      71              :     // largest bucket = 2^16 * 0.5ms = 32s
                            // NOTE(review): with 16 buckets starting at 0.5ms and factor 2, the top threshold may
                            // be 2^15 * 0.5ms (~16s) rather than 32s -- confirm against
                            // `Thresholds::exponential_buckets`. The same question applies to the other
                            // "largest bucket" comments in this struct.
      72              :     #[metric(metadata = Thresholds::exponential_buckets(0.0005, 2.0))]
      73              :     pub compute_connection_latency_seconds: HistogramVec<ComputeConnectionLatencySet, 16>,
      74              : 
      75              :     /// Time it took for proxy to receive a response from control plane.
      76              :     #[metric(
      77              :         // largest bucket = 2^16 * 0.2ms = 13s
      78              :         metadata = Thresholds::exponential_buckets(0.0002, 2.0),
      79              :     )]
      80              :     pub console_request_latency: HistogramVec<ConsoleRequestSet, 16>,
      81              : 
      82              :     /// Size of the HTTP request body lengths.
      83              :     // smallest bucket = 16 bytes
      84              :     // largest bucket = 4^12 * 16 bytes = 256MB
      85              :     #[metric(metadata = Thresholds::exponential_buckets(16.0, 4.0))]
      86              :     pub http_conn_content_length_bytes: HistogramVec<StaticLabelSet<HttpDirection>, 12>,
      87              : 
      88              :     /// Time it takes to reclaim unused connection pools.
      89              :     #[metric(metadata = Thresholds::exponential_buckets(1e-6, 2.0))]
      90              :     pub http_pool_reclaimation_lag_seconds: Histogram<16>,
      91              : 
      92              :     /// Number of opened connections to a database.
      93              :     pub http_pool_opened_connections: Gauge,
      94              : 
      95              :     /// Number of allowed ips
      96              :     #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0]))]
      97              :     pub allowed_ips_number: Histogram<10>,
      98              : 
      99              :     /// Number of allowed VPC endpoints IDs
     100              :     #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0]))]
     101              :     pub allowed_vpc_endpoint_ids: Histogram<10>,
     102              : 
     103              :     /// Number of connections, by the method we used to determine the endpoint.
     104              :     pub accepted_connections_by_sni: CounterVec<SniSet>,
     105              : 
     106              :     /// Number of connection failures (per kind).
     107              :     pub connection_failures_total: CounterVec<StaticLabelSet<ConnectionFailureKind>>,
     108              : 
     109              :     /// Number of wake-up failures (per kind).
                            // Labelled by `kind` and `retry`; see `ConnectionFailuresBreakdownGroup`.
     110              :     pub connection_failures_breakdown: CounterVec<ConnectionFailuresBreakdownSet>,
     111              : 
     112              :     /// Number of bytes sent/received between all clients and backends.
     113              :     pub io_bytes: CounterVec<StaticLabelSet<Direction>>,
     114              : 
     115              :     /// Number of IO errors while logging.
     116              :     pub logging_errors_count: Counter,
     117              : 
     118              :     /// Number of errors by a given classification.
     119              :     pub errors_total: CounterVec<StaticLabelSet<crate::error::ErrorKind>>,
     120              : 
     121              :     /// Number of cancellation requests (per outcome: `NotFound`, `Found`, `RateLimitExceeded`).
     122              :     pub cancellation_requests_total: CounterVec<CancellationRequestSet>,
     123              : 
     124              :     /// Number of redis errors (per `channel`, see `RedisErrors`).
     125              :     pub redis_errors_total: CounterVec<RedisErrorsSet>,
     126              : 
     127              :     /// Number of TLS handshake failures
     128              :     pub tls_handshake_failures: Counter,
     129              : 
     130              :     /// HLL approximate cardinality of endpoints that are connecting
     131              :     pub connecting_endpoints: HyperLogLogVec<StaticLabelSet<Protocol>, 32>,
     132              : 
     133              :     /// Number of endpoints affected by errors of a given classification
     134              :     pub endpoints_affected_by_errors: HyperLogLogVec<StaticLabelSet<crate::error::ErrorKind>, 32>,
     135              : 
     136              :     /// Number of retries (per outcome, per retry_type).
     137              :     #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]))]
     138              :     pub retries_metric: HistogramVec<RetriesMetricSet, 9>,
     139              : 
     140              :     /// Number of events consumed from redis (per event type).
     141              :     pub redis_events_count: CounterVec<StaticLabelSet<RedisEventsCount>>,
     142              : 
     143              :     #[metric(namespace = "connect_compute_lock")]
                            /// API-lock metrics placed in the `connect_compute_lock` namespace.
     144              :     pub connect_compute_lock: ApiLockMetrics,
     145              : 
     146              :     #[metric(namespace = "scram_pool")]
     147              :     #[metric(init = thread_pool)]
                            /// Thread pool metrics handed to `new`, placed in the `scram_pool` namespace.
     148              :     pub scram_pool: Arc<ThreadPoolMetrics>,
     149              : }
     150              : 
     151              : #[derive(MetricGroup)]
     152              : #[metric(new())]
                        /// Metrics for one API lock: semaphore registration counters plus reclamation and
                        /// acquisition timing histograms. Used for both `wake_compute_lock` and
                        /// `connect_compute_lock`.
     153              : pub struct ApiLockMetrics {
     154              :     /// Number of semaphores registered in this api lock
     155              :     pub semaphores_registered: Counter,
     156              :     /// Number of semaphores unregistered in this api lock
     157              :     pub semaphores_unregistered: Counter,
     158              :     /// Time it takes to reclaim unused semaphores in the api lock
     159              :     #[metric(metadata = Thresholds::exponential_buckets(1e-6, 2.0))]
     160              :     pub reclamation_lag_seconds: Histogram<16>,
     161              :     /// Time it takes to acquire a semaphore lock
     162              :     #[metric(metadata = Thresholds::exponential_buckets(1e-4, 2.0))]
     163              :     pub semaphore_acquire_seconds: Histogram<16>,
     164              : }
     165              : 
     166              : impl Default for ApiLockMetrics {
                            /// Equivalent to `ApiLockMetrics::new()`.
     167          102 :     fn default() -> Self {
     168          102 :         Self::new()
     169          102 :     }
     170              : }
     171              : 
     172              : #[derive(FixedCardinalityLabel, Copy, Clone)]
     173              : #[label(singleton = "direction")]
                        /// `direction` label for `http_conn_content_length_bytes`: request or response body.
     174              : pub enum HttpDirection {
     175              :     Request,
     176              :     Response,
     177              : }
     178              : 
     179              : #[derive(FixedCardinalityLabel, Copy, Clone)]
     180              : #[label(singleton = "direction")]
                        /// `direction` label for the `io_bytes` counter.
     181              : pub enum Direction {
                            // presumably bytes transmitted -- confirm at call sites
     182              :     Tx,
                            // presumably bytes received -- confirm at call sites
     183              :     Rx,
     184              : }
     185              : 
     186              : #[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
     187              : #[label(singleton = "protocol")]
                        /// `protocol` label shared by the connection counters and latency metrics.
                        /// `as_str` gives the lowercase name of each variant.
     188              : pub enum Protocol {
     189              :     Http,
     190              :     Ws,
     191              :     Tcp,
     192              :     SniRouter,
     193              : }
     194              : 
     195              : impl Protocol {
                            /// Returns the static lowercase name of the protocol
                            /// (`"http"`, `"ws"`, `"tcp"` or `"sni_router"`).
     196            6 :     pub fn as_str(self) -> &'static str {
     197            6 :         match self {
     198            0 :             Protocol::Http => "http",
     199            0 :             Protocol::Ws => "ws",
     200            6 :             Protocol::Tcp => "tcp",
     201            0 :             Protocol::SniRouter => "sni_router",
     202              :         }
     203            6 :     }
     204              : }
     205              : 
     206              : impl std::fmt::Display for Protocol {
                            /// Writes the same string that `Protocol::as_str` returns.
     207            6 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     208            6 :         f.write_str(self.as_str())
     209            6 :     }
     210              : }
     211              : 
     212              : #[derive(FixedCardinalityLabel, Copy, Clone)]
                        /// Two-valued label type; a `From<bool>` impl elsewhere in this file converts from `bool`.
     213              : pub enum Bool {
     214              :     True,
     215              :     False,
     216              : }
     217              : 
     218              : #[derive(FixedCardinalityLabel, Copy, Clone)]
     219              : #[label(singleton = "outcome")]
                        /// `outcome` label distinguishing cache hits from misses.
                        // NOTE(review): no metric in the visible part of this file uses this label.
     220              : pub enum CacheOutcome {
     221              :     Hit,
     222              :     Miss,
     223              : }
     224              : 
     225              : #[derive(LabelGroup)]
     226              : #[label(set = ConsoleRequestSet)]
                        /// Label group for `console_request_latency`, carrying the `request` label.
     227              : pub struct ConsoleRequest<'a> {
     228              :     #[label(dynamic_with = ThreadedRodeo, default)]
                            /// Dynamic string label value.
                            // NOTE(review): values are presumably interned through `ThreadedRodeo` -- confirm.
     229              :     pub request: &'a str,
     230              : }
     231              : 
     232              : #[derive(MetricGroup, Default)]
                        /// Pair of counters tracking HTTP endpoint pools. `guard` bumps the registered counter
                        /// and returns a guard that bumps the unregistered counter when dropped.
     233              : pub struct HttpEndpointPools {
     234              :     /// Number of endpoints we have registered pools for
     235              :     pub http_pool_endpoints_registered_total: Counter,
     236              :     /// Number of endpoints we have unregistered pools for
     237              :     pub http_pool_endpoints_unregistered_total: Counter,
     238              : }
     239              : 
     240              : pub struct HttpEndpointPoolsGuard<'a> {
                            // Counter incremented when the guard is dropped; `HttpEndpointPools::guard` points
                            // it at `http_pool_endpoints_unregistered_total`.
     241              :     dec: &'a Counter,
     242              : }
     243              : 
     244              : impl Drop for HttpEndpointPoolsGuard<'_> {
                            /// Records the unregistration by incrementing the counter held in `dec`.
     245            2 :     fn drop(&mut self) {
                                // `inc` is intentional: `dec` is a separate monotonic counter, not a gauge
                                // being decremented.
     246            2 :         self.dec.inc();
     247            2 :     }
     248              : }
     249              : 
     250              : impl HttpEndpointPools {
                            /// Increments the registered counter and returns a guard that increments the
                            /// unregistered counter when it is dropped.
     251            2 :     pub fn guard(&self) -> HttpEndpointPoolsGuard<'_> {
     252            2 :         self.http_pool_endpoints_registered_total.inc();
     253            2 :         HttpEndpointPoolsGuard {
     254            2 :             dec: &self.http_pool_endpoints_unregistered_total,
     255            2 :         }
     256            2 :     }
     257              : }
     258              : pub struct NumDbConnectionsGauge; // marker type: its `CounterPairAssoc` impl describes the opened/closed DB connection counters
     259              : impl CounterPairAssoc for NumDbConnectionsGauge {
                            // Names and help texts of the increment/decrement counters; labelled by `Protocol`.
     260              :     const INC_NAME: &'static MetricName = MetricName::from_str("opened_db_connections_total");
     261              :     const DEC_NAME: &'static MetricName = MetricName::from_str("closed_db_connections_total");
     262              :     const INC_HELP: &'static str = "Number of opened connections to a database.";
     263              :     const DEC_HELP: &'static str = "Number of closed connections to a database.";
     264              :     type LabelGroupSet = StaticLabelSet<Protocol>;
     265              : }
                        /// Guard type for the DB connection counter pair.
     266              : pub type NumDbConnectionsGuard<'a> = metrics::MeasuredCounterPairGuard<'a, NumDbConnectionsGauge>;
     267              : 
     268              : pub struct NumClientConnectionsGauge; // marker type: its `CounterPairAssoc` impl describes the opened/closed client connection counters
     269              : impl CounterPairAssoc for NumClientConnectionsGauge {
                            // Names and help texts of the increment/decrement counters; labelled by `Protocol`.
     270              :     const INC_NAME: &'static MetricName = MetricName::from_str("opened_client_connections_total");
     271              :     const DEC_NAME: &'static MetricName = MetricName::from_str("closed_client_connections_total");
     272              :     const INC_HELP: &'static str = "Number of opened connections from a client.";
     273              :     const DEC_HELP: &'static str = "Number of closed connections from a client.";
     274              :     type LabelGroupSet = StaticLabelSet<Protocol>;
     275              : }
                        /// Guard type for the client connection counter pair.
     276              : pub type NumClientConnectionsGuard<'a> =
     277              :     metrics::MeasuredCounterPairGuard<'a, NumClientConnectionsGauge>;
     278              : 
     279              : pub struct NumConnectionRequestsGauge; // marker type: its `CounterPairAssoc` impl describes the accepted/closed connection counters
     280              : impl CounterPairAssoc for NumConnectionRequestsGauge {
                            // Names and help texts of the increment/decrement counters; labelled by `Protocol`.
     281              :     const INC_NAME: &'static MetricName = MetricName::from_str("accepted_connections_total");
     282              :     const DEC_NAME: &'static MetricName = MetricName::from_str("closed_connections_total");
     283              :     const INC_HELP: &'static str = "Number of client connections accepted.";
     284              :     const DEC_HELP: &'static str = "Number of client connections closed.";
     285              :     type LabelGroupSet = StaticLabelSet<Protocol>;
     286              : }
                        /// Guard type for the connection request counter pair.
     287              : pub type NumConnectionRequestsGuard<'a> =
     288              :     metrics::MeasuredCounterPairGuard<'a, NumConnectionRequestsGauge>;
     289              : 
     290              : pub struct CancelChannelSizeGauge; // marker type: its `CounterPairAssoc` impl describes the cancellation channel message counters
     291              : impl CounterPairAssoc for CancelChannelSizeGauge {
                            // Names and help texts of the increment/decrement counters; labelled by `RedisMsgKind`.
     292              :     const INC_NAME: &'static MetricName = MetricName::from_str("opened_msgs_cancel_channel_total");
     293              :     const DEC_NAME: &'static MetricName = MetricName::from_str("closed_msgs_cancel_channel_total");
     294              :     const INC_HELP: &'static str = "Number of processing messages in the cancellation channel.";
     295              :     const DEC_HELP: &'static str = "Number of closed messages in the cancellation channel.";
     296              :     type LabelGroupSet = StaticLabelSet<RedisMsgKind>;
     297              : }
                        /// Guard type for the cancellation channel counter pair.
     298              : pub type CancelChannelSizeGuard<'a> = metrics::MeasuredCounterPairGuard<'a, CancelChannelSizeGauge>;
     299              : 
     300              : #[derive(LabelGroup)]
     301              : #[label(set = ComputeConnectionLatencySet)]
                        /// Label group for `compute_connection_latency_seconds`; `LatencyTimer`'s `Drop` impl
                        /// fills it in once per `LatencyExclusions` variant.
     302              : pub struct ComputeConnectionLatencyGroup {
     303              :     protocol: Protocol,
     304              :     cold_start_info: ColdStartInfo,
     305              :     outcome: ConnectOutcome,
                            // which accumulated waiting times were subtracted from the observed duration
     306              :     excluded: LatencyExclusions,
     307              : }
     308              : 
     309              : #[derive(FixedCardinalityLabel, Copy, Clone)]
                        /// Says which accumulated waiting times were subtracted from a latency observation.
                        /// Each variant adds one more component to the previous one.
     310              : pub enum LatencyExclusions {
                            /// Client time excluded.
     311              :     Client,
                            /// Client and control plane time excluded.
     312              :     ClientAndCplane,
                            /// Client, control plane and compute time excluded.
     313              :     ClientCplaneCompute,
                            /// Client, control plane, compute and retry time excluded.
     314              :     ClientCplaneComputeRetry,
     315              : }
     316              : 
     317              : #[derive(LabelGroup)]
     318              : #[label(set = SniSet)]
                        /// Label group for `accepted_connections_by_sni`: protocol plus routing method.
     319              : pub struct SniGroup {
     320              :     pub protocol: Protocol,
     321              :     pub kind: SniKind,
     322              : }
     323              : 
     324              : #[derive(FixedCardinalityLabel, Copy, Clone)]
                        /// How the target endpoint was determined for an accepted connection.
     325              : pub enum SniKind {
     326              :     /// Domain name based routing. SNI for libpq/websockets. Host for HTTP
     327              :     Sni,
     328              :     /// Metadata based routing. `options` for libpq/websockets. Header for HTTP
     329              :     NoSni,
     330              :     /// Metadata based routing, using the password field.
     331              :     PasswordHack,
     332              : }
     333              : 
     334              : #[derive(FixedCardinalityLabel, Copy, Clone)]
     335              : #[label(singleton = "kind")]
                        /// `kind` label for `connection_failures_total`.
                        // NOTE(review): variants presumably say whether the failed compute connection
                        // came from a cached entry -- confirm at call sites.
     336              : pub enum ConnectionFailureKind {
     337              :     ComputeCached,
     338              :     ComputeUncached,
     339              : }
     340              : 
     341              : #[derive(LabelGroup)]
     342              : #[label(set = ConnectionFailuresBreakdownSet)]
                        /// Label group for `connection_failures_breakdown`.
     343              : pub struct ConnectionFailuresBreakdownGroup {
                            /// Classification of the error.
     344              :     pub kind: ErrorKind,
                            // NOTE(review): presumably whether the failure is/was retried -- confirm at call sites
     345              :     pub retry: Bool,
     346              : }
     347              : 
     348              : #[derive(LabelGroup, Copy, Clone)]
     349              : #[label(set = RedisErrorsSet)]
                        /// Label group for `redis_errors_total`, carrying the `channel` label.
     350              : pub struct RedisErrors<'a> {
     351              :     #[label(dynamic_with = ThreadedRodeo, default)]
                            /// Dynamic string label value.
                            // NOTE(review): values are presumably interned through `ThreadedRodeo` -- confirm.
     352              :     pub channel: &'a str,
     353              : }
     354              : 
     355              : #[derive(FixedCardinalityLabel, Copy, Clone)]
                        /// Outcome of a cancellation request; used as the `kind` label in `CancellationRequest`.
     356              : pub enum CancellationOutcome {
     357              :     NotFound,
     358              :     Found,
     359              :     RateLimitExceeded,
     360              : }
     361              : 
     362              : #[derive(LabelGroup)]
     363              : #[label(set = CancellationRequestSet)]
                        /// Label group for `cancellation_requests_total`.
     364              : pub struct CancellationRequest {
                            /// Outcome of the cancellation request.
     365              :     pub kind: CancellationOutcome,
     366              : }
     367              : 
     368              : #[derive(Clone, Copy)]
                        /// What a `LatencyTimer` was waiting for; selects which `LatencyAccumulated` field
                        /// `LatencyTimer::unpause` adds the elapsed time to.
     369              : pub enum Waiting {
     370              :     Cplane,
     371              :     Client,
     372              :     Compute,
     373              :     RetryTimeout,
     374              : }
     375              : 
     376              : #[derive(FixedCardinalityLabel, Copy, Clone)]
     377              : #[label(singleton = "kind")]
     378              : #[allow(clippy::enum_variant_names)]
                        /// `kind` label for the cancellation channel counter pair (`CancelChannelSizeGauge`).
                        // NOTE(review): variant names look like redis commands -- confirm at call sites.
     379              : pub enum RedisMsgKind {
     380              :     Set,
     381              :     Get,
     382              :     Expire,
     383              :     HGet,
     384              : }
     385              : 
     386              : #[derive(Default, Clone)]
                        /// Time a `LatencyTimer` has spent waiting, split by what it was waiting for.
                        /// All fields start at zero (`Default`) and grow through `LatencyTimer::unpause`.
     387              : pub struct LatencyAccumulated {
     388              :     pub cplane: time::Duration,
     389              :     pub client: time::Duration,
     390              :     pub compute: time::Duration,
     391              :     pub retry: time::Duration,
     392              : }
     393              : 
     394              : impl std::fmt::Display for LatencyAccumulated {
                            /// Formats the four durations as whole microseconds, in the order
                            /// client, cplane, compute, retry.
     395            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     396            0 :         write!(
     397            0 :             f,
     398            0 :             "client: {}, cplane: {}, compute: {}, retry: {}",
     399            0 :             self.client.as_micros(),
     400            0 :             self.cplane.as_micros(),
     401            0 :             self.compute.as_micros(),
     402            0 :             self.retry.as_micros()
     403              :         )
     404            0 :     }
     405              : }
     406              : 
     407              : pub struct LatencyTimer {
     408              :     // instant at which the stopwatch was started
     409              :     start: time::Instant,
     410              :     // instant at which the stopwatch was stopped (set by `success`); `None` while it is still running
     411              :     stop: Option<time::Instant>,
     412              :     // accumulated time on the stopwatch
     413              :     accumulated: LatencyAccumulated,
     414              :     // label data
     415              :     protocol: Protocol,
     416              :     cold_start_info: ColdStartInfo,
     417              :     outcome: ConnectOutcome,
     418              : 
                            // when true, `Drop` records nothing (set by `noop`)
     419              :     skip_reporting: bool,
     420              : }
     421              : 
     422              : impl LatencyTimer {
                            /// Starts a timer for `protocol`. The outcome defaults to `Failed` and the cold start
                            /// info to `Unknown`; the measurement is reported when the timer is dropped.
     423           77 :     pub fn new(protocol: Protocol) -> Self {
     424           77 :         Self {
     425           77 :             start: time::Instant::now(),
     426           77 :             stop: None,
     427           77 :             accumulated: LatencyAccumulated::default(),
     428           77 :             protocol,
     429           77 :             cold_start_info: ColdStartInfo::Unknown,
     430           77 :             // assume failed unless otherwise specified
     431           77 :             outcome: ConnectOutcome::Failed,
     432           77 :             skip_reporting: false,
     433           77 :         }
     434           77 :     }
     435              : 
                            /// Same as `new`, except that `skip_reporting` is set, so dropping the timer
                            /// records nothing.
     436            0 :     pub(crate) fn noop(protocol: Protocol) -> Self {
     437            0 :         Self {
     438            0 :             start: time::Instant::now(),
     439            0 :             stop: None,
     440            0 :             accumulated: LatencyAccumulated::default(),
     441            0 :             protocol,
     442            0 :             cold_start_info: ColdStartInfo::Unknown,
     443            0 :             // assume failed unless otherwise specified
     444            0 :             outcome: ConnectOutcome::Failed,
     445            0 :             skip_reporting: true,
     446            0 :         }
     447            0 :     }
     448              : 
                            /// Adds the time elapsed since `start` to the accumulator selected by `waiting_for`.
     449           52 :     pub fn unpause(&mut self, start: Instant, waiting_for: Waiting) {
     450           52 :         let dur = start.elapsed();
     451           52 :         match waiting_for {
     452            0 :             Waiting::Cplane => self.accumulated.cplane += dur,
     453           39 :             Waiting::Client => self.accumulated.client += dur,
     454            7 :             Waiting::Compute => self.accumulated.compute += dur,
     455            6 :             Waiting::RetryTimeout => self.accumulated.retry += dur,
     456              :         }
     457           52 :     }
     458              : 
                            /// Sets the cold start info label reported with the measurement.
     459            0 :     pub fn cold_start_info(&mut self, cold_start_info: ColdStartInfo) {
     460            0 :         self.cold_start_info = cold_start_info;
     461            0 :     }
     462              : 
                            /// Stops the stopwatch at the current instant and marks the outcome as `Success`.
     463            8 :     pub fn success(&mut self) {
     464              :         // stop the stopwatch and record the time that we have accumulated
     465            8 :         self.stop = Some(time::Instant::now());
     466              : 
     467              :         // success
     468            8 :         self.outcome = ConnectOutcome::Success;
     469            8 :     }
     470              : 
                            /// Returns a copy of the waiting times accumulated so far.
     471            0 :     pub fn accumulated(&self) -> LatencyAccumulated {
     472            0 :         self.accumulated.clone()
     473            0 :     }
     474              : }
     475              : 
     476              : #[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
                        /// Outcome label. `LatencyTimer` starts as `Failed` and switches to `Success` only
                        /// when `LatencyTimer::success` is called.
     477              : pub enum ConnectOutcome {
     478              :     Success,
     479              :     Failed,
     480              : }
     481              : 
     482              : impl Drop for LatencyTimer {
                            /// Reports the measured latency to `compute_connection_latency_seconds`.
                            ///
                            /// Does nothing when `skip_reporting` is set. Otherwise makes four observations, one
                            /// per `LatencyExclusions` variant, each subtracting a larger share of the accumulated
                            /// waiting time from the total duration. `saturating_sub` clamps each result at zero.
     483           77 :     fn drop(&mut self) {
     484           77 :         if self.skip_reporting {
     485            0 :             return;
     486           77 :         }
     487              : 
                                // Total duration: from `start` until `stop`, or until now if never stopped.
     488           77 :         let duration = self
     489           77 :             .stop
     490           77 :             .unwrap_or_else(time::Instant::now)
     491           77 :             .duration_since(self.start);
     492              : 
     493           77 :         let metric = &Metrics::get().proxy.compute_connection_latency_seconds;
     494              : 
     495              :         // Total duration minus the time spent waiting on the client.
     496           77 :         metric.observe(
     497           77 :             ComputeConnectionLatencyGroup {
     498           77 :                 protocol: self.protocol,
     499           77 :                 cold_start_info: self.cold_start_info,
     500           77 :                 outcome: self.outcome,
     501           77 :                 excluded: LatencyExclusions::Client,
     502           77 :             },
     503           77 :             duration
     504           77 :                 .saturating_sub(self.accumulated.client)
     505           77 :                 .as_secs_f64(),
     506              :         );
     507              : 
     508              :         // Total duration minus client and cplane waiting time.
     509           77 :         let accumulated_total = self.accumulated.client + self.accumulated.cplane;
     510           77 :         metric.observe(
     511           77 :             ComputeConnectionLatencyGroup {
     512           77 :                 protocol: self.protocol,
     513           77 :                 cold_start_info: self.cold_start_info,
     514           77 :                 outcome: self.outcome,
     515           77 :                 excluded: LatencyExclusions::ClientAndCplane,
     516           77 :             },
     517           77 :             duration.saturating_sub(accumulated_total).as_secs_f64(),
     518              :         );
     519              : 
     520              :         // Total duration minus client, cplane and compute waiting time.
     521           77 :         let accumulated_total =
     522           77 :             self.accumulated.client + self.accumulated.cplane + self.accumulated.compute;
     523           77 :         metric.observe(
     524           77 :             ComputeConnectionLatencyGroup {
     525           77 :                 protocol: self.protocol,
     526           77 :                 cold_start_info: self.cold_start_info,
     527           77 :                 outcome: self.outcome,
     528           77 :                 excluded: LatencyExclusions::ClientCplaneCompute,
     529           77 :             },
     530           77 :             duration.saturating_sub(accumulated_total).as_secs_f64(),
     531              :         );
     532              : 
     533              :         // Total duration minus client, cplane, compute and retry waiting time.
     534           77 :         let accumulated_total = self.accumulated.client
     535           77 :             + self.accumulated.cplane
     536           77 :             + self.accumulated.compute
     537           77 :             + self.accumulated.retry;
     538           77 :         metric.observe(
     539           77 :             ComputeConnectionLatencyGroup {
     540           77 :                 protocol: self.protocol,
     541           77 :                 cold_start_info: self.cold_start_info,
     542           77 :                 outcome: self.outcome,
     543           77 :                 excluded: LatencyExclusions::ClientCplaneComputeRetry,
     544           77 :             },
     545           77 :             duration.saturating_sub(accumulated_total).as_secs_f64(),
     546              :         );
     547           77 :     }
     548              : }
     549              : 
     550              : impl From<bool> for Bool {
                            /// Maps `true` to `Bool::True` and `false` to `Bool::False`.
     551            3 :     fn from(value: bool) -> Self {
     552            3 :         if value { Bool::True } else { Bool::False }
     553            3 :     }
     554              : }
     555              : 
     556              : #[derive(LabelGroup)]
     557              : #[label(set = InvalidEndpointsSet)]
                        /// Label group with `protocol`, `rejected` and `outcome` labels.
                        // NOTE(review): no metric in the visible part of this file uses `InvalidEndpointsSet`.
     558              : pub struct InvalidEndpointsGroup {
     559              :     pub protocol: Protocol,
     560              :     pub rejected: Bool,
     561              :     pub outcome: ConnectOutcome,
     562              : }
     563              : 
     564              : #[derive(LabelGroup)] // Label group with two labels: `outcome` and `retry_type`.
     565              : #[label(set = RetriesMetricSet)] // NOTE(review): presumably names the generated label-set type; confirm against the `measured` derive docs.
     566              : pub struct RetriesMetricGroup {
     567              :     pub outcome: ConnectOutcome,
     568              :     pub retry_type: RetryType, // One of `RetryType::WakeCompute` / `RetryType::ConnectToCompute`.
     569              : }
     570              : 
     571              : #[derive(FixedCardinalityLabel, Clone, Copy, Debug)] // Two-variant label enum; `Copy`, so it is passed by value.
     572              : pub enum RetryType { // Used as the `retry_type` field of `RetriesMetricGroup`.
     573              :     WakeCompute,
     574              :     ConnectToCompute,
     575              : }
     576              : 
     577              : #[derive(FixedCardinalityLabel, Clone, Copy, Debug)] // Nine-variant label enum; `Copy`, so it is passed by value.
     578              : #[label(singleton = "event")] // NOTE(review): presumably lets the enum act as its own one-label group named "event"; confirm against the `measured` docs.
     579              : pub enum RedisEventsCount { // One variant per event kind counted under the "event" label.
     580              :     EndpointCreated,
     581              :     BranchCreated,
     582              :     ProjectCreated,
     583              :     CancelSession,
     584              :     InvalidateRole,
     585              :     InvalidateEndpoint,
     586              :     InvalidateProject,
     587              :     InvalidateProjects,
     588              :     InvalidateOrg,
     589              : }
     590              : 
     591              : pub struct ThreadPoolWorkers(usize); // Label set keyed by worker id; the wrapped `usize` is returned as the cardinality and is the exclusive upper bound in `LabelSet::encode`.
     592              : #[derive(Copy, Clone)] // Plain index newtype, cheap to copy.
     593              : pub struct ThreadPoolWorkerId(pub usize); // Worker index; emitted as the integer value of the `worker` label by the impls below.
     594              : 
     595              : impl LabelValue for ThreadPoolWorkerId { // Renders the worker id as an integer label value.
     596            0 :     fn visit<V: measured::label::LabelVisitor>(&self, v: V) -> V::Output {
     597            0 :         v.write_int(self.0 as i64) // `as` cast usize -> i64; it would wrap only for ids above `i64::MAX`.
     598            0 :     }
     599              : }
     600              : 
     601              : impl LabelGroup for ThreadPoolWorkerId { // A worker id is a label group with exactly one label.
     602            0 :     fn visit_values(&self, v: &mut impl measured::label::LabelGroupVisitor) {
     603            0 :         v.write_value(LabelName::from_str("worker"), self); // The single label is named "worker"; its value is this id.
     604            0 :     }
     605              : }
     606              : 
     607              : impl LabelGroupSet for ThreadPoolWorkers { // Maps `ThreadPoolWorkerId` groups to and from `usize` indices.
     608              :     type Group<'a> = ThreadPoolWorkerId;
     609              : 
     610          114 :     fn cardinality(&self) -> Option<usize> { // Always `Some`: the wrapped worker count.
     611          114 :         Some(self.0)
     612          114 :     }
     613              : 
     614            6 :     fn encode_dense(&self, value: Self::Unique) -> Option<usize> { // Identity: the unique key is returned unchanged as the dense index.
     615            6 :         Some(value)
     616            6 :     }
     617              : 
     618            0 :     fn decode_dense(&self, value: usize) -> Self::Group<'_> { // Wraps a dense index back into a worker id.
     619            0 :         ThreadPoolWorkerId(value)
     620            0 :     }
     621              : 
     622              :     type Unique = usize;
     623              : 
     624            6 :     fn encode(&self, value: Self::Group<'_>) -> Option<Self::Unique> { // Always `Some`; no range check against `self.0`.
     625            6 :         Some(value.0) // NOTE(review): unlike `LabelSet::encode` below, ids >= self.0 are accepted here; confirm callers never pass one.
     626            6 :     }
     627              : 
     628            0 :     fn decode(&self, value: &Self::Unique) -> Self::Group<'_> { // Wraps a unique key back into a worker id.
     629            0 :         ThreadPoolWorkerId(*value)
     630            0 :     }
     631              : }
     632              : 
     633              : impl LabelSet for ThreadPoolWorkers { // Single-label view of the same mapping, with a range check on encode.
     634              :     type Value<'a> = ThreadPoolWorkerId;
     635              : 
     636            0 :     fn dynamic_cardinality(&self) -> Option<usize> { // Always `Some`: the wrapped worker count.
     637            0 :         Some(self.0)
     638            0 :     }
     639              : 
     640            0 :     fn encode(&self, value: Self::Value<'_>) -> Option<usize> { // `Some(id)` when `id < self.0`, otherwise `None`.
     641            0 :         (value.0 < self.0).then_some(value.0)
     642            0 :     }
     643              : 
     644            0 :     fn decode(&self, value: usize) -> Self::Value<'_> { // Wraps an index back into a worker id; no range check.
     645            0 :         ThreadPoolWorkerId(value)
     646            0 :     }
     647              : }
     648              : 
     649              : impl FixedCardinalitySet for ThreadPoolWorkers { // Reports the set size as a plain `usize`.
     650            0 :     fn cardinality(&self) -> usize { // Same value that the `Option`-returning cardinality methods above wrap in `Some`.
     651            0 :         self.0
     652            0 :     }
     653              : }
     654              : 
     655              : #[derive(MetricGroup)] // Metric group holding two per-worker counter vectors.
     656              : #[metric(new(workers: usize))] // NOTE(review): presumably generates `new(workers: usize)`; `workers` is the name used in the `init` expressions below.
     657              : pub struct ThreadPoolMetrics {
     658              :     #[metric(init = CounterVec::with_label_set(ThreadPoolWorkers(workers)))] // Built with a `ThreadPoolWorkers(workers)` label set.
     659              :     pub worker_task_turns_total: CounterVec<ThreadPoolWorkers>,
     660              :     #[metric(init = CounterVec::with_label_set(ThreadPoolWorkers(workers)))] // Same label-set construction as the field above.
     661              :     pub worker_task_skips_total: CounterVec<ThreadPoolWorkers>,
     662              : }
        

Generated by: LCOV version 2.1-beta