LCOV - code coverage report
Current view: top level - proxy/src - metrics.rs (source / functions) Coverage Total Hit
Test: 472031e0b71f3195f7f21b1f2b20de09fd07bb56.info Lines: 66.5 % 212 141
Test Date: 2025-05-26 10:37:33 Functions: 79.0 % 62 49

            Line data    Source code
       1              : use std::sync::{Arc, OnceLock};
       2              : 
       3              : use lasso::ThreadedRodeo;
       4              : use measured::label::{
       5              :     FixedCardinalitySet, LabelGroupSet, LabelName, LabelSet, LabelValue, StaticLabelSet,
       6              : };
       7              : use measured::metric::histogram::Thresholds;
       8              : use measured::metric::name::MetricName;
       9              : use measured::{
      10              :     Counter, CounterVec, FixedCardinalityLabel, Gauge, Histogram, HistogramVec, LabelGroup,
      11              :     MetricGroup,
      12              : };
      13              : use metrics::{CounterPairAssoc, CounterPairVec, HyperLogLog, HyperLogLogVec};
      14              : use tokio::time::{self, Instant};
      15              : 
      16              : use crate::control_plane::messages::ColdStartInfo;
      17              : use crate::error::ErrorKind;
      18              : 
      19           49 : #[derive(MetricGroup)]
      20              : #[metric(new(thread_pool: Arc<ThreadPoolMetrics>))]
      21              : pub struct Metrics {
      22              :     #[metric(namespace = "proxy")]
      23              :     #[metric(init = ProxyMetrics::new(thread_pool))]
      24              :     pub proxy: ProxyMetrics,
      25              : 
      26              :     #[metric(namespace = "wake_compute_lock")]
      27              :     pub wake_compute_lock: ApiLockMetrics,
      28              : }
      29              : 
      30              : static SELF: OnceLock<Metrics> = OnceLock::new();
      31              : impl Metrics {
      32            0 :     pub fn install(thread_pool: Arc<ThreadPoolMetrics>) {
      33            0 :         let mut metrics = Metrics::new(thread_pool);
      34            0 : 
      35            0 :         metrics.proxy.errors_total.init_all_dense();
      36            0 :         metrics.proxy.redis_errors_total.init_all_dense();
      37            0 :         metrics.proxy.redis_events_count.init_all_dense();
      38            0 :         metrics.proxy.retries_metric.init_all_dense();
      39            0 :         metrics.proxy.invalid_endpoints_total.init_all_dense();
      40            0 :         metrics.proxy.connection_failures_total.init_all_dense();
      41            0 : 
      42            0 :         SELF.set(metrics)
      43            0 :             .ok()
      44            0 :             .expect("proxy metrics must not be installed more than once");
      45            0 :     }
      46              : 
      47          163 :     pub fn get() -> &'static Self {
      48          163 :         #[cfg(test)]
      49          163 :         return SELF.get_or_init(|| Metrics::new(Arc::new(ThreadPoolMetrics::new(0))));
      50          163 : 
      51          163 :         #[cfg(not(test))]
      52          163 :         SELF.get()
      53          163 :             .expect("proxy metrics must be installed by the main() function")
      54          163 :     }
      55              : }
      56              : 
      57           49 : #[derive(MetricGroup)]
      58              : #[metric(new(thread_pool: Arc<ThreadPoolMetrics>))]
      59              : pub struct ProxyMetrics {
      60              :     #[metric(flatten)]
      61              :     pub db_connections: CounterPairVec<NumDbConnectionsGauge>,
      62              :     #[metric(flatten)]
      63              :     pub client_connections: CounterPairVec<NumClientConnectionsGauge>,
      64              :     #[metric(flatten)]
      65              :     pub connection_requests: CounterPairVec<NumConnectionRequestsGauge>,
      66              :     #[metric(flatten)]
      67              :     pub http_endpoint_pools: HttpEndpointPools,
      68              :     #[metric(flatten)]
      69              :     pub cancel_channel_size: CounterPairVec<CancelChannelSizeGauge>,
      70              : 
      71              :     /// Time it took for proxy to establish a connection to the compute endpoint.
      72              :     // largest bucket = 2^16 * 0.5ms = 32s
      73              :     #[metric(metadata = Thresholds::exponential_buckets(0.0005, 2.0))]
      74              :     pub compute_connection_latency_seconds: HistogramVec<ComputeConnectionLatencySet, 16>,
      75              : 
      76              :     /// Time it took for proxy to receive a response from control plane.
      77              :     #[metric(
      78              :         // largest bucket = 2^16 * 0.2ms = 13s
      79              :         metadata = Thresholds::exponential_buckets(0.0002, 2.0),
      80              :     )]
      81              :     pub console_request_latency: HistogramVec<ConsoleRequestSet, 16>,
      82              : 
      83              :     /// Time it takes to acquire a token to call the control plane.
      84              :     // largest bucket = 3^16 * 0.05ms = 2152s (~36min)
      85              :     #[metric(metadata = Thresholds::exponential_buckets(0.00005, 3.0))]
      86              :     pub control_plane_token_acquire_seconds: Histogram<16>,
      87              : 
      88              :     /// HTTP body lengths in bytes, per direction (request/response).
      89              :     // smallest bucket = 16 bytes
      90              :     // largest bucket = 4^12 * 16 bytes = 256MB
      91              :     #[metric(metadata = Thresholds::exponential_buckets(16.0, 4.0))]
      92              :     pub http_conn_content_length_bytes: HistogramVec<StaticLabelSet<HttpDirection>, 12>,
      93              : 
      94              :     /// Time it takes to reclaim unused connection pools.
      95              :     #[metric(metadata = Thresholds::exponential_buckets(1e-6, 2.0))]
      96              :     pub http_pool_reclaimation_lag_seconds: Histogram<16>,
      97              : 
      98              :     /// Number of opened connections to a database.
      99              :     pub http_pool_opened_connections: Gauge,
     100              : 
     101              :     /// Number of cache hits/misses for allowed ips.
     102              :     pub allowed_ips_cache_misses: CounterVec<StaticLabelSet<CacheOutcome>>,
     103              : 
     104              :     /// Number of allowed ips
     105              :     #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0]))]
     106              :     pub allowed_ips_number: Histogram<10>,
     107              : 
     108              :     /// Number of cache hits/misses for VPC endpoint IDs.
     109              :     pub vpc_endpoint_id_cache_stats: CounterVec<StaticLabelSet<CacheOutcome>>,
     110              : 
     111              :     /// Number of cache hits/misses for access blocker flags.
     112              :     pub access_blocker_flags_cache_stats: CounterVec<StaticLabelSet<CacheOutcome>>,
     113              : 
     114              :     /// Number of allowed VPC endpoints IDs
     115              :     #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0]))]
     116              :     pub allowed_vpc_endpoint_ids: Histogram<10>,
     117              : 
     118              :     /// Number of connections, by the method we used to determine the endpoint.
     119              :     pub accepted_connections_by_sni: CounterVec<SniSet>,
     120              : 
     121              :     /// Number of connection failures (per kind).
     122              :     pub connection_failures_total: CounterVec<StaticLabelSet<ConnectionFailureKind>>,
     123              : 
     124              :     /// Number of wake-up failures (per kind).
     125              :     pub connection_failures_breakdown: CounterVec<ConnectionFailuresBreakdownSet>,
     126              : 
     127              :     /// Number of bytes sent/received between all clients and backends.
     128              :     pub io_bytes: CounterVec<StaticLabelSet<Direction>>,
     129              : 
     130              :     /// Number of errors by a given classification.
     131              :     pub errors_total: CounterVec<StaticLabelSet<crate::error::ErrorKind>>,
     132              : 
     133              :     /// Number of cancellation requests (per kind: not found, found, rate limit exceeded).
     134              :     pub cancellation_requests_total: CounterVec<CancellationRequestSet>,
     135              : 
     136              :     /// Number of redis errors (per channel).
     137              :     pub redis_errors_total: CounterVec<RedisErrorsSet>,
     138              : 
     139              :     /// Number of TLS handshake failures
     140              :     pub tls_handshake_failures: Counter,
     141              : 
     142              :     /// Number of connection requests affected by authentication rate limits
     143              :     pub requests_auth_rate_limits_total: Counter,
     144              : 
     145              :     /// HLL approximate cardinality of endpoints that are connecting
     146              :     pub connecting_endpoints: HyperLogLogVec<StaticLabelSet<Protocol>, 32>,
     147              : 
     148              :     /// Number of endpoints affected by errors of a given classification
     149              :     pub endpoints_affected_by_errors: HyperLogLogVec<StaticLabelSet<crate::error::ErrorKind>, 32>,
     150              : 
     151              :     /// Number of endpoints affected by authentication rate limits
     152              :     pub endpoints_auth_rate_limits: HyperLogLog<32>,
     153              : 
     154              :     /// Number of invalid endpoints (per protocol, per rejected, per outcome).
     155              :     pub invalid_endpoints_total: CounterVec<InvalidEndpointsSet>,
     156              : 
     157              :     /// Number of retries (per outcome, per retry_type).
     158              :     #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]))]
     159              :     pub retries_metric: HistogramVec<RetriesMetricSet, 9>,
     160              : 
     161              :     /// Number of events consumed from redis (per event type).
     162              :     pub redis_events_count: CounterVec<StaticLabelSet<RedisEventsCount>>,
     163              : 
     164              :     #[metric(namespace = "connect_compute_lock")]
     165              :     pub connect_compute_lock: ApiLockMetrics,
     166              : 
     167              :     #[metric(namespace = "scram_pool")]
     168              :     #[metric(init = thread_pool)]
     169              :     pub scram_pool: Arc<ThreadPoolMetrics>,
     170              : }
     171              : 
     172           98 : #[derive(MetricGroup)]
     173              : #[metric(new())]
     174              : pub struct ApiLockMetrics {
     175              :     /// Number of semaphores registered in this api lock
     176              :     pub semaphores_registered: Counter,
     177              :     /// Number of semaphores unregistered in this api lock
     178              :     pub semaphores_unregistered: Counter,
     179              :     /// Time it takes to reclaim unused semaphores in the api lock
     180              :     #[metric(metadata = Thresholds::exponential_buckets(1e-6, 2.0))]
     181              :     pub reclamation_lag_seconds: Histogram<16>,
     182              :     /// Time it takes to acquire a semaphore lock
     183              :     #[metric(metadata = Thresholds::exponential_buckets(1e-4, 2.0))]
     184              :     pub semaphore_acquire_seconds: Histogram<16>,
     185              : }
     186              : 
     187              : impl Default for ApiLockMetrics {
     188           98 :     fn default() -> Self {
     189           98 :         Self::new()
     190           98 :     }
     191              : }
     192              : 
     193              : #[derive(FixedCardinalityLabel, Copy, Clone)]
     194              : #[label(singleton = "direction")]
     195              : pub enum HttpDirection {
     196              :     Request,
     197              :     Response,
     198              : }
     199              : 
     200              : #[derive(FixedCardinalityLabel, Copy, Clone)]
     201              : #[label(singleton = "direction")]
     202              : pub enum Direction {
     203              :     Tx,
     204              :     Rx,
     205              : }
     206              : 
     207              : #[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
     208              : #[label(singleton = "protocol")]
     209              : pub enum Protocol {
     210              :     Http,
     211              :     Ws,
     212              :     Tcp,
     213              :     SniRouter,
     214              : }
     215              : 
     216              : impl Protocol {
     217            4 :     pub fn as_str(self) -> &'static str {
     218            4 :         match self {
     219            0 :             Protocol::Http => "http",
     220            0 :             Protocol::Ws => "ws",
     221            4 :             Protocol::Tcp => "tcp",
     222            0 :             Protocol::SniRouter => "sni_router",
     223              :         }
     224            4 :     }
     225              : }
     226              : 
     227              : impl std::fmt::Display for Protocol {
     228            4 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     229            4 :         f.write_str(self.as_str())
     230            4 :     }
     231              : }
     232              : 
     233              : #[derive(FixedCardinalityLabel, Copy, Clone)]
     234              : pub enum Bool {
     235              :     True,
     236              :     False,
     237              : }
     238              : 
     239              : #[derive(FixedCardinalityLabel, Copy, Clone)]
     240              : #[label(singleton = "outcome")]
     241              : pub enum Outcome {
     242              :     Success,
     243              :     Failed,
     244              : }
     245              : 
     246              : #[derive(FixedCardinalityLabel, Copy, Clone)]
     247              : #[label(singleton = "outcome")]
     248              : pub enum CacheOutcome {
     249              :     Hit,
     250              :     Miss,
     251              : }
     252              : 
     253           98 : #[derive(LabelGroup)]
     254              : #[label(set = ConsoleRequestSet)]
     255              : pub struct ConsoleRequest<'a> {
     256              :     #[label(dynamic_with = ThreadedRodeo, default)]
     257              :     pub request: &'a str,
     258              : }
     259              : 
     260              : #[derive(MetricGroup, Default)]
     261              : pub struct HttpEndpointPools {
     262              :     /// Number of endpoints we have registered pools for
     263              :     pub http_pool_endpoints_registered_total: Counter,
     264              :     /// Number of endpoints we have unregistered pools for
     265              :     pub http_pool_endpoints_unregistered_total: Counter,
     266              : }
     267              : 
     268              : pub struct HttpEndpointPoolsGuard<'a> {
     269              :     dec: &'a Counter,
     270              : }
     271              : 
     272              : impl Drop for HttpEndpointPoolsGuard<'_> {
     273            2 :     fn drop(&mut self) {
     274            2 :         self.dec.inc();
     275            2 :     }
     276              : }
     277              : 
     278              : impl HttpEndpointPools {
     279            2 :     pub fn guard(&self) -> HttpEndpointPoolsGuard<'_> {
     280            2 :         self.http_pool_endpoints_registered_total.inc();
     281            2 :         HttpEndpointPoolsGuard {
     282            2 :             dec: &self.http_pool_endpoints_unregistered_total,
     283            2 :         }
     284            2 :     }
     285              : }
     286              : pub struct NumDbConnectionsGauge;
     287              : impl CounterPairAssoc for NumDbConnectionsGauge {
     288              :     const INC_NAME: &'static MetricName = MetricName::from_str("opened_db_connections_total");
     289              :     const DEC_NAME: &'static MetricName = MetricName::from_str("closed_db_connections_total");
     290              :     const INC_HELP: &'static str = "Number of opened connections to a database.";
     291              :     const DEC_HELP: &'static str = "Number of closed connections to a database.";
     292              :     type LabelGroupSet = StaticLabelSet<Protocol>;
     293              : }
     294              : pub type NumDbConnectionsGuard<'a> = metrics::MeasuredCounterPairGuard<'a, NumDbConnectionsGauge>;
     295              : 
     296              : pub struct NumClientConnectionsGauge;
     297              : impl CounterPairAssoc for NumClientConnectionsGauge {
     298              :     const INC_NAME: &'static MetricName = MetricName::from_str("opened_client_connections_total");
     299              :     const DEC_NAME: &'static MetricName = MetricName::from_str("closed_client_connections_total");
     300              :     const INC_HELP: &'static str = "Number of opened connections from a client.";
     301              :     const DEC_HELP: &'static str = "Number of closed connections from a client.";
     302              :     type LabelGroupSet = StaticLabelSet<Protocol>;
     303              : }
     304              : pub type NumClientConnectionsGuard<'a> =
     305              :     metrics::MeasuredCounterPairGuard<'a, NumClientConnectionsGauge>;
     306              : 
     307              : pub struct NumConnectionRequestsGauge;
     308              : impl CounterPairAssoc for NumConnectionRequestsGauge {
     309              :     const INC_NAME: &'static MetricName = MetricName::from_str("accepted_connections_total");
     310              :     const DEC_NAME: &'static MetricName = MetricName::from_str("closed_connections_total");
     311              :     const INC_HELP: &'static str = "Number of client connections accepted.";
     312              :     const DEC_HELP: &'static str = "Number of client connections closed.";
     313              :     type LabelGroupSet = StaticLabelSet<Protocol>;
     314              : }
     315              : pub type NumConnectionRequestsGuard<'a> =
     316              :     metrics::MeasuredCounterPairGuard<'a, NumConnectionRequestsGauge>;
     317              : 
     318              : pub struct CancelChannelSizeGauge;
     319              : impl CounterPairAssoc for CancelChannelSizeGauge {
     320              :     const INC_NAME: &'static MetricName = MetricName::from_str("opened_msgs_cancel_channel_total");
     321              :     const DEC_NAME: &'static MetricName = MetricName::from_str("closed_msgs_cancel_channel_total");
     322              :     const INC_HELP: &'static str = "Number of processing messages in the cancellation channel.";
     323              :     const DEC_HELP: &'static str = "Number of closed messages in the cancellation channel.";
     324              :     type LabelGroupSet = StaticLabelSet<RedisMsgKind>;
     325              : }
     326              : pub type CancelChannelSizeGuard<'a> = metrics::MeasuredCounterPairGuard<'a, CancelChannelSizeGauge>;
     327              : 
     328          294 : #[derive(LabelGroup)]
     329              : #[label(set = ComputeConnectionLatencySet)]
     330              : pub struct ComputeConnectionLatencyGroup {
     331              :     protocol: Protocol,
     332              :     cold_start_info: ColdStartInfo,
     333              :     outcome: ConnectOutcome,
     334              :     excluded: LatencyExclusions,
     335              : }
     336              : 
     337              : #[derive(FixedCardinalityLabel, Copy, Clone)]
     338              : pub enum LatencyExclusions {
     339              :     Client,
     340              :     ClientAndCplane,
     341              :     ClientCplaneCompute,
     342              :     ClientCplaneComputeRetry,
     343              : }
     344              : 
     345          196 : #[derive(LabelGroup)]
     346              : #[label(set = SniSet)]
     347              : pub struct SniGroup {
     348              :     pub protocol: Protocol,
     349              :     pub kind: SniKind,
     350              : }
     351              : 
     352              : #[derive(FixedCardinalityLabel, Copy, Clone)]
     353              : pub enum SniKind {
     354              :     /// Domain name based routing. SNI for libpq/websockets. Host for HTTP
     355              :     Sni,
     356              :     /// Metadata based routing. `options` for libpq/websockets. Header for HTTP
     357              :     NoSni,
     358              :     /// Metadata based routing, using the password field.
     359              :     PasswordHack,
     360              : }
     361              : 
     362              : #[derive(FixedCardinalityLabel, Copy, Clone)]
     363              : #[label(singleton = "kind")]
     364              : pub enum ConnectionFailureKind {
     365              :     ComputeCached,
     366              :     ComputeUncached,
     367              : }
     368              : 
     369          196 : #[derive(LabelGroup)]
     370              : #[label(set = ConnectionFailuresBreakdownSet)]
     371              : pub struct ConnectionFailuresBreakdownGroup {
     372              :     pub kind: ErrorKind,
     373              :     pub retry: Bool,
     374              : }
     375              : 
     376           98 : #[derive(LabelGroup, Copy, Clone)]
     377              : #[label(set = RedisErrorsSet)]
     378              : pub struct RedisErrors<'a> {
     379              :     #[label(dynamic_with = ThreadedRodeo, default)]
     380              :     pub channel: &'a str,
     381              : }
     382              : 
     383              : #[derive(FixedCardinalityLabel, Copy, Clone)]
     384              : pub enum CancellationOutcome {
     385              :     NotFound,
     386              :     Found,
     387              :     RateLimitExceeded,
     388              : }
     389              : 
     390          147 : #[derive(LabelGroup)]
     391              : #[label(set = CancellationRequestSet)]
     392              : pub struct CancellationRequest {
     393              :     pub kind: CancellationOutcome,
     394              : }
     395              : 
     396              : #[derive(Clone, Copy)]
     397              : pub enum Waiting {
     398              :     Cplane,
     399              :     Client,
     400              :     Compute,
     401              :     RetryTimeout,
     402              : }
     403              : 
     404              : #[derive(FixedCardinalityLabel, Copy, Clone)]
     405              : #[label(singleton = "kind")]
     406              : #[allow(clippy::enum_variant_names)]
     407              : pub enum RedisMsgKind {
     408              :     HSet,
     409              :     HSetMultiple,
     410              :     HGet,
     411              :     HGetAll,
     412              :     HDel,
     413              : }
     414              : 
     415              : #[derive(Default, Clone)]
     416              : pub struct LatencyAccumulated {
     417              :     cplane: time::Duration,
     418              :     client: time::Duration,
     419              :     compute: time::Duration,
     420              :     retry: time::Duration,
     421              : }
     422              : 
     423              : impl std::fmt::Display for LatencyAccumulated {
     424            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     425            0 :         write!(
     426            0 :             f,
     427            0 :             "client: {}, cplane: {}, compute: {}, retry: {}",
     428            0 :             self.client.as_micros(),
     429            0 :             self.cplane.as_micros(),
     430            0 :             self.compute.as_micros(),
     431            0 :             self.retry.as_micros()
     432            0 :         )
     433            0 :     }
     434              : }
     435              : 
     436              : pub struct LatencyTimer {
     437              :     // instant at which the stopwatch was started
     438              :     start: time::Instant,
     439              :     // instant at which the stopwatch was stopped (None until stopped)
     440              :     stop: Option<time::Instant>,
     441              :     // accumulated time on the stopwatch
     442              :     accumulated: LatencyAccumulated,
     443              :     // label data
     444              :     protocol: Protocol,
     445              :     cold_start_info: ColdStartInfo,
     446              :     outcome: ConnectOutcome,
     447              : 
     448              :     skip_reporting: bool,
     449              : }
     450              : 
     451              : impl LatencyTimer {
     452           75 :     pub fn new(protocol: Protocol) -> Self {
     453           75 :         Self {
     454           75 :             start: time::Instant::now(),
     455           75 :             stop: None,
     456           75 :             accumulated: LatencyAccumulated::default(),
     457           75 :             protocol,
     458           75 :             cold_start_info: ColdStartInfo::Unknown,
     459           75 :             // assume failed unless otherwise specified
     460           75 :             outcome: ConnectOutcome::Failed,
     461           75 :             skip_reporting: false,
     462           75 :         }
     463           75 :     }
     464              : 
     465            0 :     pub(crate) fn noop(protocol: Protocol) -> Self {
     466            0 :         Self {
     467            0 :             start: time::Instant::now(),
     468            0 :             stop: None,
     469            0 :             accumulated: LatencyAccumulated::default(),
     470            0 :             protocol,
     471            0 :             cold_start_info: ColdStartInfo::Unknown,
     472            0 :             // assume failed unless otherwise specified
     473            0 :             outcome: ConnectOutcome::Failed,
     474            0 :             skip_reporting: true,
     475            0 :         }
     476            0 :     }
     477              : 
     478           28 :     pub fn unpause(&mut self, start: Instant, waiting_for: Waiting) {
     479           28 :         let dur = start.elapsed();
     480           28 :         match waiting_for {
     481            0 :             Waiting::Cplane => self.accumulated.cplane += dur,
     482           15 :             Waiting::Client => self.accumulated.client += dur,
     483            7 :             Waiting::Compute => self.accumulated.compute += dur,
     484            6 :             Waiting::RetryTimeout => self.accumulated.retry += dur,
     485              :         }
     486           28 :     }
     487              : 
     488            0 :     pub fn cold_start_info(&mut self, cold_start_info: ColdStartInfo) {
     489            0 :         self.cold_start_info = cold_start_info;
     490            0 :     }
     491              : 
     492            7 :     pub fn success(&mut self) {
     493            7 :         // stop the stopwatch and record the time that we have accumulated
     494            7 :         self.stop = Some(time::Instant::now());
     495            7 : 
     496            7 :         // success
     497            7 :         self.outcome = ConnectOutcome::Success;
     498            7 :     }
     499              : 
     500            0 :     pub fn accumulated(&self) -> LatencyAccumulated {
     501            0 :         self.accumulated.clone()
     502            0 :     }
     503              : }
     504              : 
     505              : #[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
     506              : pub enum ConnectOutcome {
     507              :     Success,
     508              :     Failed,
     509              : }
     510              : 
     511              : impl Drop for LatencyTimer {
     512           75 :     fn drop(&mut self) {
     513           75 :         if self.skip_reporting {
     514            0 :             return;
     515           75 :         }
     516           75 : 
     517           75 :         let duration = self
     518           75 :             .stop
     519           75 :             .unwrap_or_else(time::Instant::now)
     520           75 :             .duration_since(self.start);
     521           75 : 
     522           75 :         let metric = &Metrics::get().proxy.compute_connection_latency_seconds;
     523           75 : 
     524           75 :         // Exclude time spent waiting on the client from the total duration.
     525           75 :         metric.observe(
     526           75 :             ComputeConnectionLatencyGroup {
     527           75 :                 protocol: self.protocol,
     528           75 :                 cold_start_info: self.cold_start_info,
     529           75 :                 outcome: self.outcome,
     530           75 :                 excluded: LatencyExclusions::Client,
     531           75 :             },
     532           75 :             duration
     533           75 :                 .saturating_sub(self.accumulated.client)
     534           75 :                 .as_secs_f64(),
     535           75 :         );
     536           75 : 
     537           75 :         // Exclude time spent waiting on the client and cplane from the total duration.
     538           75 :         let accumulated_total = self.accumulated.client + self.accumulated.cplane;
     539           75 :         metric.observe(
     540           75 :             ComputeConnectionLatencyGroup {
     541           75 :                 protocol: self.protocol,
     542           75 :                 cold_start_info: self.cold_start_info,
     543           75 :                 outcome: self.outcome,
     544           75 :                 excluded: LatencyExclusions::ClientAndCplane,
     545           75 :             },
     546           75 :             duration.saturating_sub(accumulated_total).as_secs_f64(),
     547           75 :         );
     548           75 : 
     549           75 :         // Exclude time spent waiting on the client, cplane and compute from the total duration.
     550           75 :         let accumulated_total =
     551           75 :             self.accumulated.client + self.accumulated.cplane + self.accumulated.compute;
     552           75 :         metric.observe(
     553           75 :             ComputeConnectionLatencyGroup {
     554           75 :                 protocol: self.protocol,
     555           75 :                 cold_start_info: self.cold_start_info,
     556           75 :                 outcome: self.outcome,
     557           75 :                 excluded: LatencyExclusions::ClientCplaneCompute,
     558           75 :             },
     559           75 :             duration.saturating_sub(accumulated_total).as_secs_f64(),
     560           75 :         );
     561           75 : 
     562           75 :         // Exclude client, cplane, compute, retry communication from the accumulated time.
     563           75 :         let accumulated_total = self.accumulated.client
     564           75 :             + self.accumulated.cplane
     565           75 :             + self.accumulated.compute
     566           75 :             + self.accumulated.retry;
     567           75 :         metric.observe(
     568           75 :             ComputeConnectionLatencyGroup {
     569           75 :                 protocol: self.protocol,
     570           75 :                 cold_start_info: self.cold_start_info,
     571           75 :                 outcome: self.outcome,
     572           75 :                 excluded: LatencyExclusions::ClientCplaneComputeRetry,
     573           75 :             },
     574           75 :             duration.saturating_sub(accumulated_total).as_secs_f64(),
     575           75 :         );
     576           75 :     }
     577              : }
     578              : 
     579              : impl From<bool> for Bool {
     580            3 :     fn from(value: bool) -> Self {
     581            3 :         if value { Bool::True } else { Bool::False }
     582            3 :     }
     583              : }
     584              : 
     585          245 : #[derive(LabelGroup)]
     586              : #[label(set = InvalidEndpointsSet)]
     587              : pub struct InvalidEndpointsGroup {
     588              :     pub protocol: Protocol,
     589              :     pub rejected: Bool,
     590              :     pub outcome: ConnectOutcome,
     591              : }
     592              : 
     593          196 : #[derive(LabelGroup)]
     594              : #[label(set = RetriesMetricSet)]
     595              : pub struct RetriesMetricGroup {
     596              :     pub outcome: ConnectOutcome,
     597              :     pub retry_type: RetryType,
     598              : }
     599              : 
     600              : #[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
     601              : pub enum RetryType {
     602              :     WakeCompute,
     603              :     ConnectToCompute,
     604              : }
     605              : 
     606              : #[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
     607              : #[label(singleton = "event")]
     608              : pub enum RedisEventsCount {
     609              :     EndpointCreated,
     610              :     BranchCreated,
     611              :     ProjectCreated,
     612              :     CancelSession,
     613              :     PasswordUpdate,
     614              :     AllowedIpsUpdate,
     615              :     AllowedVpcEndpointIdsUpdateForProjects,
     616              :     AllowedVpcEndpointIdsUpdateForAllProjectsInOrg,
     617              :     BlockPublicOrVpcAccessUpdate,
     618              : }
     619              : 
     620              : pub struct ThreadPoolWorkers(usize);
     621              : #[derive(Copy, Clone)]
     622              : pub struct ThreadPoolWorkerId(pub usize);
     623              : 
     624              : impl LabelValue for ThreadPoolWorkerId {
     625            0 :     fn visit<V: measured::label::LabelVisitor>(&self, v: V) -> V::Output {
     626            0 :         v.write_int(self.0 as i64)
     627            0 :     }
     628              : }
     629              : 
     630              : impl LabelGroup for ThreadPoolWorkerId {
     631            0 :     fn visit_values(&self, v: &mut impl measured::label::LabelGroupVisitor) {
     632            0 :         v.write_value(LabelName::from_str("worker"), self);
     633            0 :     }
     634              : }
     635              : 
     636              : impl LabelGroupSet for ThreadPoolWorkers {
     637              :     type Group<'a> = ThreadPoolWorkerId;
     638              : 
     639          110 :     fn cardinality(&self) -> Option<usize> {
     640          110 :         Some(self.0)
     641          110 :     }
     642              : 
     643            6 :     fn encode_dense(&self, value: Self::Unique) -> Option<usize> {
     644            6 :         Some(value)
     645            6 :     }
     646              : 
     647            0 :     fn decode_dense(&self, value: usize) -> Self::Group<'_> {
     648            0 :         ThreadPoolWorkerId(value)
     649            0 :     }
     650              : 
     651              :     type Unique = usize;
     652              : 
     653            6 :     fn encode(&self, value: Self::Group<'_>) -> Option<Self::Unique> {
     654            6 :         Some(value.0)
     655            6 :     }
     656              : 
     657            0 :     fn decode(&self, value: &Self::Unique) -> Self::Group<'_> {
     658            0 :         ThreadPoolWorkerId(*value)
     659            0 :     }
     660              : }
     661              : 
     662              : impl LabelSet for ThreadPoolWorkers {
     663              :     type Value<'a> = ThreadPoolWorkerId;
     664              : 
     665            0 :     fn dynamic_cardinality(&self) -> Option<usize> {
     666            0 :         Some(self.0)
     667            0 :     }
     668              : 
     669            0 :     fn encode(&self, value: Self::Value<'_>) -> Option<usize> {
     670            0 :         (value.0 < self.0).then_some(value.0)
     671            0 :     }
     672              : 
     673            0 :     fn decode(&self, value: usize) -> Self::Value<'_> {
     674            0 :         ThreadPoolWorkerId(value)
     675            0 :     }
     676              : }
     677              : 
     678              : impl FixedCardinalitySet for ThreadPoolWorkers {
     679            0 :     fn cardinality(&self) -> usize {
     680            0 :         self.0
     681            0 :     }
     682              : }
     683              : 
     684           55 : #[derive(MetricGroup)]
     685              : #[metric(new(workers: usize))]
     686              : pub struct ThreadPoolMetrics {
     687              :     #[metric(init = CounterVec::with_label_set(ThreadPoolWorkers(workers)))]
     688              :     pub worker_task_turns_total: CounterVec<ThreadPoolWorkers>,
     689              :     #[metric(init = CounterVec::with_label_set(ThreadPoolWorkers(workers)))]
     690              :     pub worker_task_skips_total: CounterVec<ThreadPoolWorkers>,
     691              : }
        

Generated by: LCOV version 2.1-beta