LCOV - code coverage report
Current view: top level - proxy/src - metrics.rs (source / functions) Coverage Total Hit
Test: b837401fb09d2d9818b70e630fdb67e9799b7b0d.info Lines: 80.5 % 118 95
Test Date: 2024-04-18 15:32:49 Functions: 76.6 % 47 36

            Line data    Source code
       1              : use std::sync::OnceLock;
       2              : 
       3              : use lasso::ThreadedRodeo;
       4              : use measured::{
       5              :     label::StaticLabelSet,
       6              :     metric::{histogram::Thresholds, name::MetricName},
       7              :     Counter, CounterVec, FixedCardinalityLabel, Gauge, Histogram, HistogramVec, LabelGroup,
       8              :     MetricGroup,
       9              : };
      10              : use metrics::{CounterPairAssoc, CounterPairVec, HyperLogLog, HyperLogLogVec};
      11              : 
      12              : use tokio::time::{self, Instant};
      13              : 
      14              : use crate::console::messages::ColdStartInfo;
      15              : 
// Root metric group for the proxy process. The shared instance is obtained via
// `Metrics::get()`.
//
// NOTE(review): `///` comments on `MetricGroup` fields are presumably used as
// metric help text by the derive, so maintainer notes here use `//`.
#[derive(MetricGroup)]
pub struct Metrics {
    // Proxy-wide metrics, namespaced as "proxy".
    #[metric(namespace = "proxy")]
    pub proxy: ProxyMetrics,

    // Wake-compute API lock metrics, namespaced as "wake_compute_lock".
    #[metric(namespace = "wake_compute_lock")]
    pub wake_compute_lock: ApiLockMetrics,
}
      24              : 
      25              : impl Metrics {
      26          144 :     pub fn get() -> &'static Self {
      27          144 :         static SELF: OnceLock<Metrics> = OnceLock::new();
      28          144 :         SELF.get_or_init(|| Metrics {
      29           72 :             proxy: ProxyMetrics::default(),
      30           72 :             wake_compute_lock: ApiLockMetrics::new(),
      31          144 :         })
      32          144 :     }
      33              : }
      34              : 
      35           72 : #[derive(MetricGroup)]
      36              : #[metric(new())]
      37              : pub struct ProxyMetrics {
      38              :     #[metric(flatten)]
      39              :     pub db_connections: CounterPairVec<NumDbConnectionsGauge>,
      40              :     #[metric(flatten)]
      41              :     pub client_connections: CounterPairVec<NumClientConnectionsGauge>,
      42              :     #[metric(flatten)]
      43              :     pub connection_requests: CounterPairVec<NumConnectionRequestsGauge>,
      44              :     #[metric(flatten)]
      45              :     pub http_endpoint_pools: HttpEndpointPools,
      46              : 
      47              :     /// Time it took for proxy to establish a connection to the compute endpoint.
      48              :     // largest bucket = 2^16 * 0.5ms = 32s
      49              :     #[metric(metadata = Thresholds::exponential_buckets(0.0005, 2.0))]
      50              :     pub compute_connection_latency_seconds: HistogramVec<ComputeConnectionLatencySet, 16>,
      51              : 
      52              :     /// Time it took for proxy to receive a response from control plane.
      53              :     #[metric(
      54              :         // largest bucket = 2^16 * 0.2ms = 13s
      55              :         metadata = Thresholds::exponential_buckets(0.0002, 2.0),
      56              :     )]
      57              :     pub console_request_latency: HistogramVec<ConsoleRequestSet, 16>,
      58              : 
      59              :     /// Time it takes to acquire a token to call console plane.
      60              :     // largest bucket = 3^16 * 0.05ms = 2.15s
      61              :     #[metric(metadata = Thresholds::exponential_buckets(0.00005, 3.0))]
      62              :     pub control_plane_token_acquire_seconds: Histogram<16>,
      63              : 
      64              :     /// Size of the HTTP request body lengths.
      65              :     // smallest bucket = 16 bytes
      66              :     // largest bucket = 4^12 * 16 bytes = 256MB
      67              :     #[metric(metadata = Thresholds::exponential_buckets(16.0, 4.0))]
      68              :     pub http_conn_content_length_bytes: HistogramVec<StaticLabelSet<HttpDirection>, 12>,
      69              : 
      70              :     /// Time it takes to reclaim unused connection pools.
      71              :     #[metric(metadata = Thresholds::exponential_buckets(1e-6, 2.0))]
      72              :     pub http_pool_reclaimation_lag_seconds: Histogram<16>,
      73              : 
      74              :     /// Number of opened connections to a database.
      75              :     pub http_pool_opened_connections: Gauge,
      76              : 
      77              :     /// Number of cache hits/misses for allowed ips.
      78              :     pub allowed_ips_cache_misses: CounterVec<StaticLabelSet<CacheOutcome>>,
      79              : 
      80              :     /// Number of allowed ips
      81              :     #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0]))]
      82              :     pub allowed_ips_number: Histogram<10>,
      83              : 
      84              :     /// Number of connections (per sni).
      85              :     pub accepted_connections_by_sni: CounterVec<StaticLabelSet<SniKind>>,
      86              : 
      87              :     /// Number of connection failures (per kind).
      88              :     pub connection_failures_total: CounterVec<StaticLabelSet<ConnectionFailureKind>>,
      89              : 
      90              :     /// Number of wake-up failures (per kind).
      91              :     pub connection_failures_breakdown: CounterVec<ConnectionFailuresBreakdownSet>,
      92              : 
      93              :     /// Number of bytes sent/received between all clients and backends.
      94              :     pub io_bytes: CounterVec<StaticLabelSet<Direction>>,
      95              : 
      96              :     /// Number of errors by a given classification.
      97              :     pub errors_total: CounterVec<StaticLabelSet<crate::error::ErrorKind>>,
      98              : 
      99              :     /// Number of cancellation requests (per found/not_found).
     100              :     pub cancellation_requests_total: CounterVec<CancellationRequestSet>,
     101              : 
     102              :     /// Number of errors by a given classification
     103              :     pub redis_errors_total: CounterVec<RedisErrorsSet>,
     104              : 
     105              :     /// Number of TLS handshake failures
     106              :     pub tls_handshake_failures: Counter,
     107              : 
     108              :     /// Number of connection requests affected by authentication rate limits
     109              :     pub requests_auth_rate_limits_total: Counter,
     110              : 
     111              :     /// HLL approximate cardinality of endpoints that are connecting
     112              :     pub connecting_endpoints: HyperLogLogVec<StaticLabelSet<Protocol>, 32>,
     113              : 
     114              :     /// Number of endpoints affected by errors of a given classification
     115              :     pub endpoints_affected_by_errors: HyperLogLogVec<StaticLabelSet<crate::error::ErrorKind>, 32>,
     116              : 
     117              :     /// Number of endpoints affected by authentication rate limits
     118              :     pub endpoints_auth_rate_limits: HyperLogLog<32>,
     119              : 
     120              :     /// Number of invalid endpoints (per protocol, per rejected).
     121              :     pub invalid_endpoints_total: CounterVec<InvalidEndpointsSet>,
     122              : }
     123              : 
// Metrics for the wake-compute API lock (namespaced as "wake_compute_lock" by
// `Metrics`). `///` field comments are left as-is since they are presumably
// used as metric help text.
#[derive(MetricGroup)]
#[metric(new())]
pub struct ApiLockMetrics {
    /// Number of semaphores registered in this api lock
    pub semaphores_registered: Counter,
    /// Number of semaphores unregistered in this api lock
    pub semaphores_unregistered: Counter,
    /// Time it takes to reclaim unused semaphores in the api lock
    // largest bucket = 2^16 * 1us ~= 65ms
    #[metric(metadata = Thresholds::exponential_buckets(1e-6, 2.0))]
    pub reclamation_lag_seconds: Histogram<16>,
    /// Time it takes to acquire a semaphore lock
    // largest bucket = 2^16 * 0.1ms ~= 6.5s
    #[metric(metadata = Thresholds::exponential_buckets(1e-4, 2.0))]
    pub semaphore_acquire_seconds: Histogram<16>,
}
     138              : 
     139              : impl Default for ProxyMetrics {
     140           72 :     fn default() -> Self {
     141           72 :         Self::new()
     142           72 :     }
     143              : }
     144              : 
/// `direction` label for `http_conn_content_length_bytes`.
///
/// Distinguishes request bodies from response bodies.
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "direction")]
pub enum HttpDirection {
    Request,
    Response,
}
     151              : 
/// `direction` label for the `io_bytes` counter.
///
/// Presumably `Tx` = bytes sent and `Rx` = bytes received, matching the
/// "sent/received" wording of that metric's help text.
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "direction")]
pub enum Direction {
    Tx,
    Rx,
}
     158              : 
/// `protocol` label: the client-facing protocol a connection arrived over.
///
/// `Protocol::as_str` / `Display` render the variants as `http`, `ws`, `tcp`
/// and `sni_router`.
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
#[label(singleton = "protocol")]
pub enum Protocol {
    Http,
    Ws,
    Tcp,
    SniRouter,
}
     167              : 
     168              : impl Protocol {
     169            0 :     pub fn as_str(&self) -> &'static str {
     170            0 :         match self {
     171            0 :             Protocol::Http => "http",
     172            0 :             Protocol::Ws => "ws",
     173            0 :             Protocol::Tcp => "tcp",
     174            0 :             Protocol::SniRouter => "sni_router",
     175              :         }
     176            0 :     }
     177              : }
     178              : 
     179              : impl std::fmt::Display for Protocol {
     180            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     181            0 :         f.write_str(self.as_str())
     182            0 :     }
     183              : }
     184              : 
/// Boolean label value for use inside label groups.
///
/// Used by `ConnectionFailuresBreakdownGroup::retry` and
/// `InvalidEndpointsGroup::rejected`. It has no `#[label(singleton)]`, so the
/// label name presumably comes from the enclosing group's field name.
/// Convert from `bool` with `Bool::from` / `.into()`.
#[derive(FixedCardinalityLabel, Copy, Clone)]
pub enum Bool {
    True,
    False,
}
     190              : 
/// Generic `outcome` label (success / failed).
///
/// Not referenced elsewhere in this file; presumably used by metrics defined
/// in other modules. Compare `ConnectOutcome`, which has the same variants but
/// no singleton label name.
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "outcome")]
pub enum Outcome {
    Success,
    Failed,
}
     197              : 
/// `outcome` label for the `allowed_ips_cache_misses` counter.
///
/// Records whether the allowed-IPs cache lookup hit or missed.
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "outcome")]
pub enum CacheOutcome {
    Hit,
    Miss,
}
     204              : 
/// Label group for `console_request_latency`: a single dynamic `request` label.
///
/// Values are interned through a [`ThreadedRodeo`] (per `dynamic_with`), so any
/// string is accepted rather than a fixed enum.
/// NOTE(review): label cardinality is therefore bounded only by callers.
/// Confirm they pass a small, fixed set of request names.
#[derive(LabelGroup)]
#[label(set = ConsoleRequestSet)]
pub struct ConsoleRequest<'a> {
    #[label(dynamic_with = ThreadedRodeo, default)]
    pub request: &'a str,
}
     211              : 
// Registered/unregistered counter pair for HTTP endpoint pools.
// `HttpEndpointPools::guard` increments the "registered" counter, and the
// returned guard increments the "unregistered" counter when dropped.
// The number of live pools is therefore registered - unregistered.
#[derive(MetricGroup, Default)]
pub struct HttpEndpointPools {
    /// Number of endpoints we have registered pools for
    pub http_pool_endpoints_registered_total: Counter,
    /// Number of endpoints we have unregistered pools for
    pub http_pool_endpoints_unregistered_total: Counter,
}
     219              : 
     220              : pub struct HttpEndpointPoolsGuard<'a> {
     221              :     dec: &'a Counter,
     222              : }
     223              : 
     224              : impl Drop for HttpEndpointPoolsGuard<'_> {
     225            4 :     fn drop(&mut self) {
     226            4 :         self.dec.inc();
     227            4 :     }
     228              : }
     229              : 
     230              : impl HttpEndpointPools {
     231            4 :     pub fn guard(&self) -> HttpEndpointPoolsGuard {
     232            4 :         self.http_pool_endpoints_registered_total.inc();
     233            4 :         HttpEndpointPoolsGuard {
     234            4 :             dec: &self.http_pool_endpoints_unregistered_total,
     235            4 :         }
     236            4 :     }
     237              : }
     238              : pub struct NumDbConnectionsGauge;
     239              : impl CounterPairAssoc for NumDbConnectionsGauge {
     240              :     const INC_NAME: &'static MetricName = MetricName::from_str("opened_db_connections_total");
     241              :     const DEC_NAME: &'static MetricName = MetricName::from_str("closed_db_connections_total");
     242              :     const INC_HELP: &'static str = "Number of opened connections to a database.";
     243              :     const DEC_HELP: &'static str = "Number of closed connections to a database.";
     244              :     type LabelGroupSet = StaticLabelSet<Protocol>;
     245              : }
     246              : pub type NumDbConnectionsGuard<'a> = metrics::MeasuredCounterPairGuard<'a, NumDbConnectionsGauge>;
     247              : 
     248              : pub struct NumClientConnectionsGauge;
     249              : impl CounterPairAssoc for NumClientConnectionsGauge {
     250              :     const INC_NAME: &'static MetricName = MetricName::from_str("opened_client_connections_total");
     251              :     const DEC_NAME: &'static MetricName = MetricName::from_str("closed_client_connections_total");
     252              :     const INC_HELP: &'static str = "Number of opened connections from a client.";
     253              :     const DEC_HELP: &'static str = "Number of closed connections from a client.";
     254              :     type LabelGroupSet = StaticLabelSet<Protocol>;
     255              : }
     256              : pub type NumClientConnectionsGuard<'a> =
     257              :     metrics::MeasuredCounterPairGuard<'a, NumClientConnectionsGauge>;
     258              : 
     259              : pub struct NumConnectionRequestsGauge;
     260              : impl CounterPairAssoc for NumConnectionRequestsGauge {
     261              :     const INC_NAME: &'static MetricName = MetricName::from_str("accepted_connections_total");
     262              :     const DEC_NAME: &'static MetricName = MetricName::from_str("closed_connections_total");
     263              :     const INC_HELP: &'static str = "Number of client connections accepted.";
     264              :     const DEC_HELP: &'static str = "Number of client connections closed.";
     265              :     type LabelGroupSet = StaticLabelSet<Protocol>;
     266              : }
     267              : pub type NumConnectionRequestsGuard<'a> =
     268              :     metrics::MeasuredCounterPairGuard<'a, NumConnectionRequestsGauge>;
     269              : 
/// Label group for `compute_connection_latency_seconds`.
///
/// Built by `LatencyTimer`'s `Drop` impl, which records one observation for
/// each `LatencyExclusions` variant every time a timer is dropped.
#[derive(LabelGroup)]
#[label(set = ComputeConnectionLatencySet)]
pub struct ComputeConnectionLatencyGroup {
    /// Client protocol of the connection attempt.
    protocol: Protocol,
    /// Cold-start classification, set via `LatencyTimer::cold_start_info`.
    cold_start_info: ColdStartInfo,
    /// Whether the connection attempt succeeded.
    outcome: ConnectOutcome,
    /// Which waiting time was subtracted from the observed latency.
    excluded: LatencyExclusions,
}
     278              : 
/// Which paused time was subtracted from a compute-connection latency observation.
#[derive(FixedCardinalityLabel, Copy, Clone)]
pub enum LatencyExclusions {
    /// Time spent waiting on the client was excluded.
    Client,
    /// Time spent waiting on the client and on the control plane was excluded.
    ClientAndCplane,
}
     284              : 
/// `kind` label for `accepted_connections_by_sni`.
///
/// NOTE(review): presumably this records how the endpoint was identified for
/// the connection (via SNI, without SNI, or via the password hack). Confirm
/// against callers.
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "kind")]
pub enum SniKind {
    Sni,
    NoSni,
    PasswordHack,
}
     292              : 
/// `kind` label for `connection_failures_total`.
///
/// NOTE(review): presumably this distinguishes failures when compute info came
/// from a cache vs. a fresh lookup. Confirm against callers.
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "kind")]
pub enum ConnectionFailureKind {
    ComputeCached,
    ComputeUncached,
}
     299              : 
/// `kind` label classifying wake-compute failures.
///
/// Used by `ConnectionFailuresBreakdownGroup` for the
/// `connection_failures_breakdown` counter ("Number of wake-up failures (per
/// kind)").
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "kind")]
pub enum WakeupFailureKind {
    BadComputeAddress,
    ApiTransportError,
    QuotaExceeded,
    ApiConsoleLocked,
    ApiConsoleBadRequest,
    ApiConsoleOtherServerError,
    ApiConsoleOtherError,
    TimeoutError,
}
     312              : 
/// Label group for `connection_failures_breakdown`: failure kind plus retry flag.
#[derive(LabelGroup)]
#[label(set = ConnectionFailuresBreakdownSet)]
pub struct ConnectionFailuresBreakdownGroup {
    /// Classification of the wake-up failure.
    pub kind: WakeupFailureKind,
    /// Whether the failure is treated as retryable (presumably; confirm at call sites).
    pub retry: Bool,
}
     319              : 
/// Label group for `redis_errors_total`: a single dynamic `channel` label.
///
/// Values are interned through a [`ThreadedRodeo`] (per `dynamic_with`).
#[derive(LabelGroup, Copy, Clone)]
#[label(set = RedisErrorsSet)]
pub struct RedisErrors<'a> {
    #[label(dynamic_with = ThreadedRodeo, default)]
    pub channel: &'a str,
}
     326              : 
/// Where a cancellation request originated (label for `cancellation_requests_total`).
#[derive(FixedCardinalityLabel, Copy, Clone)]
pub enum CancellationSource {
    /// Received directly from a client.
    FromClient,
    /// Received via Redis (presumably forwarded from another proxy instance).
    FromRedis,
    /// Originated locally (semantics not visible here; confirm at call sites).
    Local,
}
     333              : 
/// Whether the target of a cancellation request was found.
///
/// This is the found/not_found split of `cancellation_requests_total`.
#[derive(FixedCardinalityLabel, Copy, Clone)]
pub enum CancellationOutcome {
    NotFound,
    Found,
}
     339              : 
/// Label group for `cancellation_requests_total`: request source plus outcome.
#[derive(LabelGroup)]
#[label(set = CancellationRequestSet)]
pub struct CancellationRequest {
    /// Where the cancellation request came from.
    pub source: CancellationSource,
    /// Whether the request's target was found (the label is named `kind`).
    pub kind: CancellationOutcome,
}
     346              : 
     347              : pub enum Waiting {
     348              :     Cplane,
     349              :     Client,
     350              :     Compute,
     351              : }
     352              : 
     353              : #[derive(Default)]
     354              : struct Accumulated {
     355              :     cplane: time::Duration,
     356              :     client: time::Duration,
     357              :     compute: time::Duration,
     358              : }
     359              : 
     360              : pub struct LatencyTimer {
     361              :     // time since the stopwatch was started
     362              :     start: time::Instant,
     363              :     // time since the stopwatch was stopped
     364              :     stop: Option<time::Instant>,
     365              :     // accumulated time on the stopwatch
     366              :     accumulated: Accumulated,
     367              :     // label data
     368              :     protocol: Protocol,
     369              :     cold_start_info: ColdStartInfo,
     370              :     outcome: ConnectOutcome,
     371              : }
     372              : 
     373              : pub struct LatencyTimerPause<'a> {
     374              :     timer: &'a mut LatencyTimer,
     375              :     start: time::Instant,
     376              :     waiting_for: Waiting,
     377              : }
     378              : 
     379              : impl LatencyTimer {
     380           70 :     pub fn new(protocol: Protocol) -> Self {
     381           70 :         Self {
     382           70 :             start: time::Instant::now(),
     383           70 :             stop: None,
     384           70 :             accumulated: Accumulated::default(),
     385           70 :             protocol,
     386           70 :             cold_start_info: ColdStartInfo::Unknown,
     387           70 :             // assume failed unless otherwise specified
     388           70 :             outcome: ConnectOutcome::Failed,
     389           70 :         }
     390           70 :     }
     391              : 
     392           30 :     pub fn pause(&mut self, waiting_for: Waiting) -> LatencyTimerPause<'_> {
     393           30 :         LatencyTimerPause {
     394           30 :             timer: self,
     395           30 :             start: Instant::now(),
     396           30 :             waiting_for,
     397           30 :         }
     398           30 :     }
     399              : 
     400            0 :     pub fn cold_start_info(&mut self, cold_start_info: ColdStartInfo) {
     401            0 :         self.cold_start_info = cold_start_info;
     402            0 :     }
     403              : 
     404            8 :     pub fn success(&mut self) {
     405            8 :         // stop the stopwatch and record the time that we have accumulated
     406            8 :         self.stop = Some(time::Instant::now());
     407            8 : 
     408            8 :         // success
     409            8 :         self.outcome = ConnectOutcome::Success;
     410            8 :     }
     411              : }
     412              : 
     413              : impl Drop for LatencyTimerPause<'_> {
     414           30 :     fn drop(&mut self) {
     415           30 :         let dur = self.start.elapsed();
     416           30 :         match self.waiting_for {
     417            0 :             Waiting::Cplane => self.timer.accumulated.cplane += dur,
     418           30 :             Waiting::Client => self.timer.accumulated.client += dur,
     419            0 :             Waiting::Compute => self.timer.accumulated.compute += dur,
     420              :         }
     421           30 :     }
     422              : }
     423              : 
/// Outcome of a compute connection attempt.
///
/// Used in `ComputeConnectionLatencyGroup` and `InvalidEndpointsGroup`.
/// `LatencyTimer` starts at `Failed` and switches to `Success` only when
/// `LatencyTimer::success` is called.
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
pub enum ConnectOutcome {
    Success,
    Failed,
}
     429              : 
     430              : impl Drop for LatencyTimer {
     431           70 :     fn drop(&mut self) {
     432           70 :         let duration = self
     433           70 :             .stop
     434           70 :             .unwrap_or_else(time::Instant::now)
     435           70 :             .duration_since(self.start);
     436           70 : 
     437           70 :         let metric = &Metrics::get().proxy.compute_connection_latency_seconds;
     438           70 : 
     439           70 :         // Excluding client communication from the accumulated time.
     440           70 :         metric.observe(
     441           70 :             ComputeConnectionLatencyGroup {
     442           70 :                 protocol: self.protocol,
     443           70 :                 cold_start_info: self.cold_start_info,
     444           70 :                 outcome: self.outcome,
     445           70 :                 excluded: LatencyExclusions::Client,
     446           70 :             },
     447           70 :             duration
     448           70 :                 .saturating_sub(self.accumulated.client)
     449           70 :                 .as_secs_f64(),
     450           70 :         );
     451           70 : 
     452           70 :         // Exclude client and cplane communication from the accumulated time.
     453           70 :         let accumulated_total = self.accumulated.client + self.accumulated.cplane;
     454           70 :         metric.observe(
     455           70 :             ComputeConnectionLatencyGroup {
     456           70 :                 protocol: self.protocol,
     457           70 :                 cold_start_info: self.cold_start_info,
     458           70 :                 outcome: self.outcome,
     459           70 :                 excluded: LatencyExclusions::ClientAndCplane,
     460           70 :             },
     461           70 :             duration.saturating_sub(accumulated_total).as_secs_f64(),
     462           70 :         );
     463           70 :     }
     464              : }
     465              : 
     466              : impl From<bool> for Bool {
     467            6 :     fn from(value: bool) -> Self {
     468            6 :         if value {
     469            4 :             Bool::True
     470              :         } else {
     471            2 :             Bool::False
     472              :         }
     473            6 :     }
     474              : }
     475              : 
/// Label group for `invalid_endpoints_total` (per protocol, per rejected).
#[derive(LabelGroup)]
#[label(set = InvalidEndpointsSet)]
pub struct InvalidEndpointsGroup {
    /// Client protocol the request arrived over.
    pub protocol: Protocol,
    /// Whether the request was rejected (presumably before contacting compute; confirm at call sites).
    pub rejected: Bool,
    /// Outcome of the connection attempt.
    pub outcome: ConnectOutcome,
}
        

Generated by: LCOV version 2.1-beta