LCOV - code coverage report
Current view: top level - proxy/src - metrics.rs (source / functions) Coverage Total Hit
Test: a14d6a1f0ccf210374e9eaed9918e97cd6f5d5ba.info Lines: 61.7 % 196 121
Test Date: 2025-08-04 14:37:31 Functions: 51.5 % 33 17

            Line data    Source code
       1              : use std::sync::{Arc, OnceLock};
       2              : 
       3              : use lasso::ThreadedRodeo;
       4              : use measured::label::{
       5              :     FixedCardinalitySet, LabelGroupSet, LabelGroupVisitor, LabelName, LabelSet, LabelValue,
       6              :     StaticLabelSet,
       7              : };
       8              : use measured::metric::group::Encoding;
       9              : use measured::metric::histogram::Thresholds;
      10              : use measured::metric::name::MetricName;
      11              : use measured::{
      12              :     Counter, CounterVec, FixedCardinalityLabel, Gauge, GaugeVec, Histogram, HistogramVec,
      13              :     LabelGroup, MetricGroup,
      14              : };
      15              : use metrics::{CounterPairAssoc, CounterPairVec, HyperLogLogVec, InfoMetric};
      16              : use tokio::time::{self, Instant};
      17              : 
      18              : use crate::control_plane::messages::ColdStartInfo;
      19              : use crate::error::ErrorKind;
      20              : 
      21              : #[derive(MetricGroup)]
      22              : #[metric(new())]
                        /// Root metric group of the proxy.
                        ///
                        /// Bundles four sub-groups, each tagged with its own `namespace`. The shared
                        /// process-wide instance is handed out by [`Metrics::get`].
      23              : pub struct Metrics {
                            // Proxy-level metrics; constructed explicitly with `ProxyMetrics::new()`.
      24              :     #[metric(namespace = "proxy")]
      25              :     #[metric(init = ProxyMetrics::new())]
      26              :     pub proxy: ProxyMetrics,
      27              : 
                            // Same metric shape as `ProxyMetrics::connect_compute_lock`, but under the
                            // `wake_compute_lock` namespace.
      28              :     #[metric(namespace = "wake_compute_lock")]
      29              :     pub wake_compute_lock: ApiLockMetrics,
      30              : 
                            // `ServiceMetrics` is not defined in this part of the file.
      31              :     #[metric(namespace = "service")]
      32              :     pub service: ServiceMetrics,
      33              : 
                            // `CacheMetrics` is not defined in this part of the file.
      34              :     #[metric(namespace = "cache")]
      35              :     pub cache: CacheMetrics,
      36              : }
      37              : 
      38              : impl Metrics {
                            /// Returns the process-wide [`Metrics`] instance, building it on first use.
                            ///
                            /// The instance is stored in a function-local `OnceLock`. The initialising
                            /// closure creates the group with `Metrics::new()` and calls
                            /// `init_all_dense()` on five of the proxy metric vectors before the value
                            /// is stored; later calls return the stored reference.
      39              :     #[track_caller]
      40          344 :     pub fn get() -> &'static Self {
      41              :         static SELF: OnceLock<Metrics> = OnceLock::new();
      42              : 
      43          344 :         SELF.get_or_init(|| {
      44           59 :             let mut metrics = Metrics::new();
      45              : 
                                    // NOTE(review): presumably pre-creates every label combination so these
                                    // series are exported before their first observation — confirm against
                                    // the `measured` documentation.
      46           59 :             metrics.proxy.errors_total.init_all_dense();
      47           59 :             metrics.proxy.redis_errors_total.init_all_dense();
      48           59 :             metrics.proxy.redis_events_count.init_all_dense();
      49           59 :             metrics.proxy.retries_metric.init_all_dense();
      50           59 :             metrics.proxy.connection_failures_total.init_all_dense();
      51              : 
      52           59 :             metrics
      53           59 :         })
      54          344 :     }
      55              : }
      56              : 
      57              : #[derive(MetricGroup)]
      58              : #[metric(new())]
                        /// All metrics registered under the `proxy` namespace of [`Metrics`].
                        //
                        // NOTE(review): the `///` docs on the fields below are presumably picked up by
                        // the derive as exported metric help text — confirm before rewording any of
                        // them. Reviewer notes are therefore written as plain `//` comments.
      59              : pub struct ProxyMetrics {
                            // Opened/closed counter pair; names and help come from `NumDbConnectionsGauge`.
      60              :     #[metric(flatten)]
      61              :     pub db_connections: CounterPairVec<NumDbConnectionsGauge>,
                            // Opened/closed counter pair; see `NumClientConnectionsGauge`.
      62              :     #[metric(flatten)]
      63              :     pub client_connections: CounterPairVec<NumClientConnectionsGauge>,
                            // Accepted/closed counter pair; see `NumConnectionRequestsGauge`.
      64              :     #[metric(flatten)]
      65              :     pub connection_requests: CounterPairVec<NumConnectionRequestsGauge>,
                            // Registered/unregistered counters, driven through `HttpEndpointPools::guard`.
      66              :     #[metric(flatten)]
      67              :     pub http_endpoint_pools: HttpEndpointPools,
                            // Counter pair labelled by `RedisMsgKind`; see `CancelChannelSizeGauge`.
      68              :     #[metric(flatten)]
      69              :     pub cancel_channel_size: CounterPairVec<CancelChannelSizeGauge>,
      70              : 
      71              :     /// Time it took for proxy to establish a connection to the compute endpoint.
      72              :     // largest bucket = 2^16 * 0.5ms = 32s
                            // Observed four times per timer in `LatencyTimer::drop`, once per
                            // `LatencyExclusions` variant.
      73              :     #[metric(metadata = Thresholds::exponential_buckets(0.0005, 2.0))]
      74              :     pub compute_connection_latency_seconds: HistogramVec<ComputeConnectionLatencySet, 16>,
      75              : 
      76              :     /// Time it took for proxy to receive a response from control plane.
      77              :     #[metric(
      78              :         // largest bucket = 2^16 * 0.2ms = 13s
      79              :         metadata = Thresholds::exponential_buckets(0.0002, 2.0),
      80              :     )]
      81              :     pub console_request_latency: HistogramVec<ConsoleRequestSet, 16>,
      82              : 
      83              :     /// Size of the HTTP request body lengths.
      84              :     // smallest bucket = 16 bytes
      85              :     // largest bucket = 4^12 * 16 bytes = 256MB
                            // Labelled by `HttpDirection` (request or response).
      86              :     #[metric(metadata = Thresholds::exponential_buckets(16.0, 4.0))]
      87              :     pub http_conn_content_length_bytes: HistogramVec<StaticLabelSet<HttpDirection>, 12>,
      88              : 
      89              :     /// Time it takes to reclaim unused connection pools.
                            // Same bucket layout (start 1e-6, factor 2, 16 buckets) as
                            // `ApiLockMetrics::reclamation_lag_seconds`.
      90              :     #[metric(metadata = Thresholds::exponential_buckets(1e-6, 2.0))]
      91              :     pub http_pool_reclaimation_lag_seconds: Histogram<16>,
      92              : 
      93              :     /// Number of opened connections to a database.
      94              :     pub http_pool_opened_connections: Gauge,
      95              : 
      96              :     /// Number of allowed ips
      97              :     #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0]))]
      98              :     pub allowed_ips_number: Histogram<10>,
      99              : 
     100              :     /// Number of allowed VPC endpoints IDs
     101              :     #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0]))]
     102              :     pub allowed_vpc_endpoint_ids: Histogram<10>,
     103              : 
     104              :     /// Number of connections, by the method we used to determine the endpoint.
                            // Labels: `protocol` and `kind` (see `SniGroup` / `SniKind`).
     105              :     pub accepted_connections_by_sni: CounterVec<SniSet>,
     106              : 
     107              :     /// Number of connection failures (per kind).
     108              :     pub connection_failures_total: CounterVec<StaticLabelSet<ConnectionFailureKind>>,
     109              : 
     110              :     /// Number of wake-up failures (per kind).
                            // Labels: error `kind` and `retry` (see `ConnectionFailuresBreakdownGroup`).
     111              :     pub connection_failures_breakdown: CounterVec<ConnectionFailuresBreakdownSet>,
     112              : 
     113              :     /// Number of bytes sent/received between all clients and backends.
     114              :     pub io_bytes: CounterVec<StaticLabelSet<Direction>>,
     115              : 
     116              :     /// Number of IO errors while logging.
     117              :     pub logging_errors_count: Counter,
     118              : 
     119              :     /// Number of errors by a given classification.
     120              :     pub errors_total: CounterVec<StaticLabelSet<crate::error::ErrorKind>>,
     121              : 
     122              :     /// Number of cancellation requests (per found/not_found).
                            // `CancellationOutcome` also has a `RateLimitExceeded` variant.
     123              :     pub cancellation_requests_total: CounterVec<CancellationRequestSet>,
     124              : 
     125              :     /// Number of errors by a given classification
                            // The only label is the dynamic `channel` string (see `RedisErrors`).
     126              :     pub redis_errors_total: CounterVec<RedisErrorsSet>,
     127              : 
     128              :     /// Number of TLS handshake failures
     129              :     pub tls_handshake_failures: Counter,
     130              : 
     131              :     /// Number of SHA 256 rounds executed.
     132              :     pub sha_rounds: Counter,
     133              : 
     134              :     /// HLL approximate cardinality of endpoints that are connecting
     135              :     pub connecting_endpoints: HyperLogLogVec<StaticLabelSet<Protocol>, 32>,
     136              : 
     137              :     /// Number of endpoints affected by errors of a given classification
     138              :     pub endpoints_affected_by_errors: HyperLogLogVec<StaticLabelSet<crate::error::ErrorKind>, 32>,
     139              : 
     140              :     /// Number of retries (per outcome, per retry_type).
                            // Nine explicit buckets, one per integer value from 0 to 8.
     141              :     #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]))]
     142              :     pub retries_metric: HistogramVec<RetriesMetricSet, 9>,
     143              : 
     144              :     /// Number of events consumed from redis (per event type).
     145              :     pub redis_events_count: CounterVec<StaticLabelSet<RedisEventsCount>>,
     146              : 
                            // Same metric shape as `Metrics::wake_compute_lock`, nested one level deeper.
     147              :     #[metric(namespace = "connect_compute_lock")]
     148              :     pub connect_compute_lock: ApiLockMetrics,
     149              : 
                            // Collected only once the inner `OnceLock` has been set; until then
                            // `OnceLockWrapper` emits nothing for this namespace.
     150              :     #[metric(namespace = "scram_pool")]
     151              :     pub scram_pool: OnceLockWrapper<Arc<ThreadPoolMetrics>>,
     152              : }
     153              : 
     154              : /// A wrapper over [`OnceLock`] that implements [`MetricGroup`].
                        ///
                        /// The wrapped group is collected only after the inner cell has been set; an
                        /// empty cell contributes nothing to the output.
     155              : pub struct OnceLockWrapper<T>(pub OnceLock<T>);
     156              : 
     157              : impl<T> Default for OnceLockWrapper<T> {
                            /// Creates a wrapper around an empty `OnceLock`.
                            // Written by hand so that no `T: Default` bound is required.
     158           59 :     fn default() -> Self {
     159           59 :         Self(OnceLock::new())
     160           59 :     }
     161              : }
     162              : 
     163              : impl<Enc: Encoding, T: MetricGroup<Enc>> MetricGroup<Enc> for OnceLockWrapper<T> {
                            /// Forwards collection to the inner group if the cell has been set.
                            ///
                            /// Returns `Ok(())` without writing anything when the cell is still empty;
                            /// errors from the inner group are propagated unchanged.
     164            0 :     fn collect_group_into(&self, enc: &mut Enc) -> Result<(), Enc::Err> {
     165            0 :         if let Some(inner) = self.0.get() {
     166            0 :             inner.collect_group_into(enc)?;
     167            0 :         }
     168            0 :         Ok(())
     169            0 :     }
     170              : }
     171              : 
     172              : #[derive(MetricGroup)]
     173              : #[metric(new())]
                        /// Metrics for one API lock.
                        ///
                        /// Used twice in this file: as `Metrics::wake_compute_lock` and as
                        /// `ProxyMetrics::connect_compute_lock`, each under its own namespace.
     174              : pub struct ApiLockMetrics {
     175              :     /// Number of semaphores registered in this api lock
     176              :     pub semaphores_registered: Counter,
     177              :     /// Number of semaphores unregistered in this api lock
     178              :     pub semaphores_unregistered: Counter,
     179              :     /// Time it takes to reclaim unused semaphores in the api lock
                            // 16 exponential buckets starting at 1e-6 with factor 2.
     180              :     #[metric(metadata = Thresholds::exponential_buckets(1e-6, 2.0))]
     181              :     pub reclamation_lag_seconds: Histogram<16>,
     182              :     /// Time it takes to acquire a semaphore lock
                            // 16 exponential buckets starting at 1e-4 with factor 2.
     183              :     #[metric(metadata = Thresholds::exponential_buckets(1e-4, 2.0))]
     184              :     pub semaphore_acquire_seconds: Histogram<16>,
     185              : }
     186              : 
     187              : impl Default for ApiLockMetrics {
                            /// Equivalent to [`ApiLockMetrics::new`], so the configured histogram
                            /// thresholds are applied for default-constructed values too.
     188          118 :     fn default() -> Self {
     189          118 :         Self::new()
     190          118 :     }
     191              : }
     192              : 
     193              : #[derive(FixedCardinalityLabel, Copy, Clone)]
     194              : #[label(singleton = "direction")]
                        /// `direction` label of `ProxyMetrics::http_conn_content_length_bytes`.
     195              : pub enum HttpDirection {
     196              :     Request,
     197              :     Response,
     198              : }
     199              : 
     200              : #[derive(FixedCardinalityLabel, Copy, Clone)]
     201              : #[label(singleton = "direction")]
                        /// `direction` label of `ProxyMetrics::io_bytes`.
     202              : pub enum Direction {
     203              :     Tx,
     204              :     Rx,
     205              : }
     206              : 
     207              : #[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
     208              : #[label(singleton = "protocol")]
                        /// `protocol` label shared by the connection counter pairs, the SNI counter,
                        /// the connecting-endpoints HLL and the compute connection latency histogram.
                        ///
                        /// [`Protocol::as_str`] and the `Display` impl give the lowercase string form.
     209              : pub enum Protocol {
     210              :     Http,
     211              :     Ws,
     212              :     Tcp,
     213              :     SniRouter,
     214              : }
     215              : 
     216              : impl Protocol {
                            /// Returns the static string form of the protocol: `"http"`, `"ws"`,
                            /// `"tcp"` or `"sni_router"`.
     217            6 :     pub fn as_str(self) -> &'static str {
                                // Exhaustive on purpose: a new variant fails to compile until it is named.
     218            6 :         match self {
     219            0 :             Protocol::Http => "http",
     220            0 :             Protocol::Ws => "ws",
     221            6 :             Protocol::Tcp => "tcp",
     222            0 :             Protocol::SniRouter => "sni_router",
     223              :         }
     224            6 :     }
     225              : }
     226              : 
     227              : impl std::fmt::Display for Protocol {
                            /// Writes exactly the string returned by [`Protocol::as_str`].
     228            6 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     229            6 :         f.write_str(self.as_str())
     230            6 :     }
     231              : }
     232              : 
     233              : #[derive(FixedCardinalityLabel, Copy, Clone)]
                        /// Boolean usable as a fixed-cardinality label value.
                        ///
                        /// Convertible from `bool` through the `From<bool>` impl in this file; used as
                        /// the `retry` label of `ConnectionFailuresBreakdownGroup`.
     234              : pub enum Bool {
     235              :     True,
     236              :     False,
     237              : }
     238              : 
     239              : #[derive(LabelGroup)]
     240              : #[label(set = ConsoleRequestSet)]
                        /// Label group for `ProxyMetrics::console_request_latency`.
                        ///
                        /// The derive generates the matching set type `ConsoleRequestSet`.
     241              : pub struct ConsoleRequest<'a> {
                            // Dynamic string label whose values are stored in a `ThreadedRodeo`.
     242              :     #[label(dynamic_with = ThreadedRodeo, default)]
     243              :     pub request: &'a str,
     244              : }
     245              : 
     246              : #[derive(MetricGroup, Default)]
                        /// Registered/unregistered counters for HTTP endpoint pools.
                        ///
                        /// Both counters only ever increase; [`HttpEndpointPools::guard`] bumps the
                        /// first immediately and the second when the returned guard is dropped.
     247              : pub struct HttpEndpointPools {
     248              :     /// Number of endpoints we have registered pools for
     249              :     pub http_pool_endpoints_registered_total: Counter,
     250              :     /// Number of endpoints we have unregistered pools for
     251              :     pub http_pool_endpoints_unregistered_total: Counter,
     252              : }
     253              : 
     254              : pub struct HttpEndpointPoolsGuard<'a> {
                            // Guard returned by `HttpEndpointPools::guard`.
                            //
                            // Despite its name, `dec` is never decremented: it refers to the
                            // "unregistered" counter, which is incremented when the guard is dropped.
     255              :     dec: &'a Counter,
     256              : }
     257              : 
     258              : impl Drop for HttpEndpointPoolsGuard<'_> {
                            /// Increments the counter the guard was created with (the "unregistered"
                            /// total), marking the end of the registration.
     259            2 :     fn drop(&mut self) {
     260            2 :         self.dec.inc();
     261            2 :     }
     262              : }
     263              : 
     264              : impl HttpEndpointPools {
                            /// Records a newly registered endpoint pool.
                            ///
                            /// Increments `http_pool_endpoints_registered_total` right away and returns
                            /// a guard that increments `http_pool_endpoints_unregistered_total` when it
                            /// is dropped.
     265            2 :     pub fn guard(&self) -> HttpEndpointPoolsGuard<'_> {
     266            2 :         self.http_pool_endpoints_registered_total.inc();
     267            2 :         HttpEndpointPoolsGuard {
     268            2 :             dec: &self.http_pool_endpoints_unregistered_total,
     269            2 :         }
     270            2 :     }
     271              : }
     272              : pub struct NumDbConnectionsGauge;
                        // `NumDbConnectionsGauge` is a field-less marker type: it only carries the
                        // metric names, help texts and label set of `ProxyMetrics::db_connections`.
     273              : impl CounterPairAssoc for NumDbConnectionsGauge {
     274              :     const INC_NAME: &'static MetricName = MetricName::from_str("opened_db_connections_total");
     275              :     const DEC_NAME: &'static MetricName = MetricName::from_str("closed_db_connections_total");
     276              :     const INC_HELP: &'static str = "Number of opened connections to a database.";
     277              :     const DEC_HELP: &'static str = "Number of closed connections to a database.";
                            // One counter pair per `Protocol` value.
     278              :     type LabelGroupSet = StaticLabelSet<Protocol>;
     279              : }
                        /// Guard type of the database-connection counter pair.
     280              : pub type NumDbConnectionsGuard<'a> = metrics::MeasuredCounterPairGuard<'a, NumDbConnectionsGauge>;
     281              : 
     282              : pub struct NumClientConnectionsGauge;
                        // `NumClientConnectionsGauge` is a field-less marker type: it only carries the
                        // metric names, help texts and label set of `ProxyMetrics::client_connections`.
     283              : impl CounterPairAssoc for NumClientConnectionsGauge {
     284              :     const INC_NAME: &'static MetricName = MetricName::from_str("opened_client_connections_total");
     285              :     const DEC_NAME: &'static MetricName = MetricName::from_str("closed_client_connections_total");
     286              :     const INC_HELP: &'static str = "Number of opened connections from a client.";
     287              :     const DEC_HELP: &'static str = "Number of closed connections from a client.";
                            // One counter pair per `Protocol` value.
     288              :     type LabelGroupSet = StaticLabelSet<Protocol>;
     289              : }
                        /// Guard type of the client-connection counter pair.
     290              : pub type NumClientConnectionsGuard<'a> =
     291              :     metrics::MeasuredCounterPairGuard<'a, NumClientConnectionsGauge>;
     292              : 
     293              : pub struct NumConnectionRequestsGauge;
                        // `NumConnectionRequestsGauge` is a field-less marker type: it only carries the
                        // metric names, help texts and label set of `ProxyMetrics::connection_requests`.
     294              : impl CounterPairAssoc for NumConnectionRequestsGauge {
     295              :     const INC_NAME: &'static MetricName = MetricName::from_str("accepted_connections_total");
     296              :     const DEC_NAME: &'static MetricName = MetricName::from_str("closed_connections_total");
     297              :     const INC_HELP: &'static str = "Number of client connections accepted.";
     298              :     const DEC_HELP: &'static str = "Number of client connections closed.";
                            // One counter pair per `Protocol` value.
     299              :     type LabelGroupSet = StaticLabelSet<Protocol>;
     300              : }
                        /// Guard type of the connection-request counter pair.
     301              : pub type NumConnectionRequestsGuard<'a> =
     302              :     metrics::MeasuredCounterPairGuard<'a, NumConnectionRequestsGauge>;
     303              : 
     304              : pub struct CancelChannelSizeGauge;
                        // `CancelChannelSizeGauge` is a field-less marker type: it only carries the
                        // metric names, help texts and label set of `ProxyMetrics::cancel_channel_size`.
     305              : impl CounterPairAssoc for CancelChannelSizeGauge {
     306              :     const INC_NAME: &'static MetricName = MetricName::from_str("opened_msgs_cancel_channel_total");
     307              :     const DEC_NAME: &'static MetricName = MetricName::from_str("closed_msgs_cancel_channel_total");
     308              :     const INC_HELP: &'static str = "Number of processing messages in the cancellation channel.";
     309              :     const DEC_HELP: &'static str = "Number of closed messages in the cancellation channel.";
                            // Unlike the connection pairs, this one is labelled by `RedisMsgKind`.
     310              :     type LabelGroupSet = StaticLabelSet<RedisMsgKind>;
     311              : }
                        /// Guard type of the cancellation-channel counter pair.
     312              : pub type CancelChannelSizeGuard<'a> = metrics::MeasuredCounterPairGuard<'a, CancelChannelSizeGauge>;
     313              : 
     314              : #[derive(LabelGroup)]
     315              : #[label(set = ComputeConnectionLatencySet)]
                        /// Labels attached to each `compute_connection_latency_seconds` observation.
                        ///
                        /// Only built in `LatencyTimer::drop`, which copies the first three fields
                        /// from the timer and varies `excluded` across its four observations.
     316              : pub struct ComputeConnectionLatencyGroup {
     317              :     protocol: Protocol,
     318              :     cold_start_info: ColdStartInfo,
     319              :     outcome: ConnectOutcome,
                            // Which accumulated waits were subtracted from the observed duration.
     320              :     excluded: LatencyExclusions,
     321              : }
     322              : 
     323              : #[derive(FixedCardinalityLabel, Copy, Clone)]
                        /// Which accumulated waiting times `LatencyTimer::drop` subtracted from the
                        /// total duration before observing it. Each variant excludes everything the
                        /// previous one does, plus one more category.
     324              : pub enum LatencyExclusions {
                            // total - client
     325              :     Client,
                            // total - (client + cplane)
     326              :     ClientAndCplane,
                            // total - (client + cplane + compute)
     327              :     ClientCplaneCompute,
                            // total - (client + cplane + compute + retry)
     328              :     ClientCplaneComputeRetry,
     329              : }
     330              : 
     331              : #[derive(LabelGroup)]
     332              : #[label(set = SniSet)]
                        /// Label group for `ProxyMetrics::accepted_connections_by_sni`.
     333              : pub struct SniGroup {
     334              :     pub protocol: Protocol,
     335              :     pub kind: SniKind,
     336              : }
     337              : 
     338              : #[derive(FixedCardinalityLabel, Copy, Clone)]
                        /// How the target endpoint was determined; the `kind` label of [`SniGroup`].
     339              : pub enum SniKind {
     340              :     /// Domain name based routing. SNI for libpq/websockets. Host for HTTP
     341              :     Sni,
     342              :     /// Metadata based routing. `options` for libpq/websockets. Header for HTTP
     343              :     NoSni,
     344              :     /// Metadata based routing, using the password field.
     345              :     PasswordHack,
     346              : }
     347              : 
     348              : #[derive(FixedCardinalityLabel, Copy, Clone)]
     349              : #[label(singleton = "kind")]
                        /// `kind` label of `ProxyMetrics::connection_failures_total`.
                        //
                        // NOTE(review): presumably distinguishes failures against cached vs. freshly
                        // fetched compute info — confirm at the call sites.
     350              : pub enum ConnectionFailureKind {
     351              :     ComputeCached,
     352              :     ComputeUncached,
     353              : }
     354              : 
     355              : #[derive(LabelGroup)]
     356              : #[label(set = ConnectionFailuresBreakdownSet)]
                        /// Label group for `ProxyMetrics::connection_failures_breakdown`.
     357              : pub struct ConnectionFailuresBreakdownGroup {
     358              :     pub kind: ErrorKind,
                            // A `bool` can be converted with `.into()` thanks to `From<bool> for Bool`.
     359              :     pub retry: Bool,
     360              : }
     361              : 
     362              : #[derive(LabelGroup, Copy, Clone)]
     363              : #[label(set = RedisErrorsSet)]
                        /// Label group for `ProxyMetrics::redis_errors_total`.
     364              : pub struct RedisErrors<'a> {
                            // Dynamic string label whose values are stored in a `ThreadedRodeo`.
     365              :     #[label(dynamic_with = ThreadedRodeo, default)]
     366              :     pub channel: &'a str,
     367              : }
     368              : 
     369              : #[derive(FixedCardinalityLabel, Copy, Clone)]
                        /// Outcome of a cancellation request; the `kind` label of
                        /// [`CancellationRequest`].
     370              : pub enum CancellationOutcome {
     371              :     NotFound,
     372              :     Found,
     373              :     RateLimitExceeded,
     374              : }
     375              : 
     376              : #[derive(LabelGroup)]
     377              : #[label(set = CancellationRequestSet)]
                        /// Label group for `ProxyMetrics::cancellation_requests_total`.
     378              : pub struct CancellationRequest {
     379              :     pub kind: CancellationOutcome,
     380              : }
     381              : 
     382              : #[derive(Clone, Copy)]
                        /// What a [`LatencyTimer`] was waiting on during a paused interval.
                        ///
                        /// `LatencyTimer::unpause` uses it to pick the [`LatencyAccumulated`] field that
                        /// receives the elapsed time.
     383              : pub enum Waiting {
                            /// Added to `LatencyAccumulated::cplane`.
     384              :     Cplane,
                            /// Added to `LatencyAccumulated::client`.
     385              :     Client,
                            /// Added to `LatencyAccumulated::compute`.
     386              :     Compute,
                            /// Added to `LatencyAccumulated::retry`.
     387              :     RetryTimeout,
     388              : }
     389              : 
     390              : #[derive(FixedCardinalityLabel, Copy, Clone)]
     391              : #[label(singleton = "kind")]
                        // NOTE(review): the reason for the lint exception below is not visible here —
                        // presumably the Redis command names are kept verbatim; confirm and document.
     392              : #[allow(clippy::enum_variant_names)]
                        /// `kind` label of the cancellation-channel counter pair
                        /// (`CancelChannelSizeGauge`).
     393              : pub enum RedisMsgKind {
     394              :     Set,
     395              :     Get,
     396              :     Expire,
     397              :     HGet,
     398              : }
     399              : 
     400              : #[derive(Default, Clone)]
                        /// Waiting time accumulated by a [`LatencyTimer`], split by what was waited on.
                        ///
                        /// All fields start at zero (`Default`) and are only increased by
                        /// `LatencyTimer::unpause`.
     401              : pub struct LatencyAccumulated {
                            /// Time attributed to `Waiting::Cplane`.
     402              :     pub cplane: time::Duration,
                            /// Time attributed to `Waiting::Client`.
     403              :     pub client: time::Duration,
                            /// Time attributed to `Waiting::Compute`.
     404              :     pub compute: time::Duration,
                            /// Time attributed to `Waiting::RetryTimeout`.
     405              :     pub retry: time::Duration,
     406              : }
     407              : 
     408              : impl std::fmt::Display for LatencyAccumulated {
                            /// Formats the four durations as whole microseconds, in the order
                            /// client, cplane, compute, retry (which differs from the field order).
     409            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     410            0 :         write!(
     411            0 :             f,
     412            0 :             "client: {}, cplane: {}, compute: {}, retry: {}",
     413            0 :             self.client.as_micros(),
     414            0 :             self.cplane.as_micros(),
     415            0 :             self.compute.as_micros(),
     416            0 :             self.retry.as_micros()
     417              :         )
     418            0 :     }
     419              : }
     420              : 
     421              : pub struct LatencyTimer {
                            // Stopwatch for a connection attempt. Its `Drop` impl reports the measured
                            // time to `compute_connection_latency_seconds` unless `skip_reporting` is set.
                            //
     422              :     // instant at which the stopwatch was started
     423              :     start: time::Instant,
     424              :     // instant at which the stopwatch was stopped; `None` until `success()` is called
     425              :     stop: Option<time::Instant>,
     426              :     // waiting time accumulated so far, split by category (see `unpause`)
     427              :     accumulated: LatencyAccumulated,
     428              :     // label data copied into every observation
     429              :     protocol: Protocol,
     430              :     cold_start_info: ColdStartInfo,
     431              :     outcome: ConnectOutcome,
     432              : 
                            // when true, dropping the timer records nothing (set by `noop`)
     433              :     skip_reporting: bool,
     434              : }
     435              : 
     436              : impl LatencyTimer {
                            /// Starts a timer for `protocol` at the current instant.
                            ///
                            /// The timer starts with no accumulated waits, `ColdStartInfo::Unknown` and
                            /// a `Failed` outcome; it reports its measurement when dropped.
     437           77 :     pub fn new(protocol: Protocol) -> Self {
     438           77 :         Self {
     439           77 :             start: time::Instant::now(),
     440           77 :             stop: None,
     441           77 :             accumulated: LatencyAccumulated::default(),
     442           77 :             protocol,
     443           77 :             cold_start_info: ColdStartInfo::Unknown,
     444           77 :             // assume failed unless otherwise specified
     445           77 :             outcome: ConnectOutcome::Failed,
     446           77 :             skip_reporting: false,
     447           77 :         }
     448           77 :     }
     449              : 
                            /// Same as [`LatencyTimer::new`], except that `skip_reporting` is set, so
                            /// dropping the timer observes nothing.
     450            0 :     pub(crate) fn noop(protocol: Protocol) -> Self {
     451            0 :         Self {
     452            0 :             start: time::Instant::now(),
     453            0 :             stop: None,
     454            0 :             accumulated: LatencyAccumulated::default(),
     455            0 :             protocol,
     456            0 :             cold_start_info: ColdStartInfo::Unknown,
     457            0 :             // assume failed unless otherwise specified
     458            0 :             outcome: ConnectOutcome::Failed,
     459            0 :             skip_reporting: true,
     460            0 :         }
     461            0 :     }
     462              : 
                            /// Adds the time elapsed since `start` to the accumulator selected by
                            /// `waiting_for`.
                            ///
                            /// `start` is the instant at which the caller began waiting; it is unrelated
                            /// to the timer's own `start` field.
     463           52 :     pub fn unpause(&mut self, start: Instant, waiting_for: Waiting) {
     464           52 :         let dur = start.elapsed();
     465           52 :         match waiting_for {
     466            0 :             Waiting::Cplane => self.accumulated.cplane += dur,
     467           39 :             Waiting::Client => self.accumulated.client += dur,
     468            7 :             Waiting::Compute => self.accumulated.compute += dur,
     469            6 :             Waiting::RetryTimeout => self.accumulated.retry += dur,
     470              :         }
     471           52 :     }
     472              : 
                            /// Sets the cold-start label value that will be reported on drop.
     473            0 :     pub fn cold_start_info(&mut self, cold_start_info: ColdStartInfo) {
     474            0 :         self.cold_start_info = cold_start_info;
     475            0 :     }
     476              : 
                            /// Marks the attempt as successful and freezes the end instant.
     477            8 :     pub fn success(&mut self) {
     478              :         // record the stop instant now; `Drop` uses it instead of the time of the drop
     479            8 :         self.stop = Some(time::Instant::now());
     480              : 
     481              :         // report the observation with the `Success` outcome label
     482            8 :         self.outcome = ConnectOutcome::Success;
     483            8 :     }
     484              : 
                            /// Returns a copy of the waiting times accumulated so far.
     485            0 :     pub fn accumulated(&self) -> LatencyAccumulated {
     486            0 :         self.accumulated.clone()
     487            0 :     }
     488              : }
     489              : 
     490              : #[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
                        /// `outcome` label used by the compute connection latency histogram and by
                        /// [`RetriesMetricGroup`]. A `LatencyTimer` starts as `Failed` and switches to
                        /// `Success` only through `LatencyTimer::success`.
     491              : pub enum ConnectOutcome {
     492              :     Success,
     493              :     Failed,
     494              : }
     495              : 
     496              : impl Drop for LatencyTimer {
                            /// Reports the measured latency, unless `skip_reporting` is set.
                            ///
                            /// The total duration runs from `start` to the recorded `stop` instant, or
                            /// to "now" if `success()` was never called. It is observed four times on
                            /// `compute_connection_latency_seconds`, each time with a larger set of
                            /// accumulated waits subtracted and the matching `LatencyExclusions` label.
     497           77 :     fn drop(&mut self) {
     498           77 :         if self.skip_reporting {
     499            0 :             return;
     500           77 :         }
     501              : 
     502           77 :         let duration = self
     503           77 :             .stop
     504           77 :             .unwrap_or_else(time::Instant::now)
     505           77 :             .duration_since(self.start);
     506              : 
     507           77 :         let metric = &Metrics::get().proxy.compute_connection_latency_seconds;
     508              : 
                                // Every subtraction below uses `saturating_sub`, so the observed value is
                                // clamped at zero if the accumulated waits exceed the total duration.
                                //
     509              :         // Total duration minus the time spent waiting on the client.
     510           77 :         metric.observe(
     511           77 :             ComputeConnectionLatencyGroup {
     512           77 :                 protocol: self.protocol,
     513           77 :                 cold_start_info: self.cold_start_info,
     514           77 :                 outcome: self.outcome,
     515           77 :                 excluded: LatencyExclusions::Client,
     516           77 :             },
     517           77 :             duration
     518           77 :                 .saturating_sub(self.accumulated.client)
     519           77 :                 .as_secs_f64(),
     520              :         );
     521              : 
     522              :         // Total duration minus the client and cplane waits.
     523           77 :         let accumulated_total = self.accumulated.client + self.accumulated.cplane;
     524           77 :         metric.observe(
     525           77 :             ComputeConnectionLatencyGroup {
     526           77 :                 protocol: self.protocol,
     527           77 :                 cold_start_info: self.cold_start_info,
     528           77 :                 outcome: self.outcome,
     529           77 :                 excluded: LatencyExclusions::ClientAndCplane,
     530           77 :             },
     531           77 :             duration.saturating_sub(accumulated_total).as_secs_f64(),
     532              :         );
     533              : 
     534              :         // Total duration minus the client, cplane and compute waits.
     535           77 :         let accumulated_total =
     536           77 :             self.accumulated.client + self.accumulated.cplane + self.accumulated.compute;
     537           77 :         metric.observe(
     538           77 :             ComputeConnectionLatencyGroup {
     539           77 :                 protocol: self.protocol,
     540           77 :                 cold_start_info: self.cold_start_info,
     541           77 :                 outcome: self.outcome,
     542           77 :                 excluded: LatencyExclusions::ClientCplaneCompute,
     543           77 :             },
     544           77 :             duration.saturating_sub(accumulated_total).as_secs_f64(),
     545              :         );
     546              : 
     547              :         // Total duration minus the client, cplane, compute and retry waits.
     548           77 :         let accumulated_total = self.accumulated.client
     549           77 :             + self.accumulated.cplane
     550           77 :             + self.accumulated.compute
     551           77 :             + self.accumulated.retry;
     552           77 :         metric.observe(
     553           77 :             ComputeConnectionLatencyGroup {
     554           77 :                 protocol: self.protocol,
     555           77 :                 cold_start_info: self.cold_start_info,
     556           77 :                 outcome: self.outcome,
     557           77 :                 excluded: LatencyExclusions::ClientCplaneComputeRetry,
     558           77 :             },
     559           77 :             duration.saturating_sub(accumulated_total).as_secs_f64(),
     560              :         );
     561           77 :     }
     562              : }
     563              : 
     564              : impl From<bool> for Bool {
                            /// Maps `true` to `Bool::True` and `false` to `Bool::False`.
     565            3 :     fn from(value: bool) -> Self {
     566            3 :         if value { Bool::True } else { Bool::False }
     567            3 :     }
     568              : }
     569              : 
     570              : #[derive(LabelGroup)]
     571              : #[label(set = RetriesMetricSet)]
                        /// Label group combining a connection outcome with the kind of retry.
                        ///
                        /// `RetriesMetricSet` is the label-set type named by the `set` attribute above.
     572              : pub struct RetriesMetricGroup {
                            // `ConnectOutcome` is declared outside this excerpt.
     573              :     pub outcome: ConnectOutcome,
                            // Which retried operation this observation belongs to.
     574              :     pub retry_type: RetryType,
     575              : }
     576              : 
     577              : #[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
                        /// Label value used by `RetriesMetricGroup::retry_type` to distinguish retried operations.
                        ///
                        /// NOTE(review): the variant names suggest "waking a compute" versus "connecting to a
                        /// compute"; the code that picks the variant is not visible here, so confirm there.
     578              : pub enum RetryType {
     579              :     WakeCompute,
     580              :     ConnectToCompute,
     581              : }
     582              : 
     583              : #[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
     584              : #[label(singleton = "event")]
                        /// Fixed set of event kinds usable as a metric label.
                        ///
                        /// The `singleton = "event"` attribute presumably lets this enum act as a one-label
                        /// group whose label name is `event` (confirm against the `measured` derive docs).
                        ///
                        /// NOTE(review): variant meanings are only implied by their names; the code that
                        /// records these events is not visible in this excerpt.
     585              : pub enum RedisEventsCount {
     586              :     EndpointCreated,
     587              :     BranchCreated,
     588              :     ProjectCreated,
     589              :     CancelSession,
     590              :     InvalidateRole,
     591              :     InvalidateEndpoint,
     592              :     InvalidateProject,
     593              :     InvalidateProjects,
     594              :     InvalidateOrg,
     595              : }
     596              : 
     597              : pub struct ThreadPoolWorkers(usize);
                        // `ThreadPoolWorkers` (above) wraps a worker count; the trait impls in this file
                        // return that count as the cardinality of the label set.
     598              : #[derive(Copy, Clone)]
                        /// Index of a single thread-pool worker; written out as the `worker` label value.
     599              : pub struct ThreadPoolWorkerId(pub usize);
     600              : 
     601              : impl LabelValue for ThreadPoolWorkerId {
                            /// Writes the worker index to the visitor as an integer label value.
                            ///
                            /// The index is converted with `as i64`, which does not check for overflow;
                            /// a `usize` above `i64::MAX` would wrap to a negative number.
     602            0 :     fn visit<V: measured::label::LabelVisitor>(&self, v: V) -> V::Output {
     603            0 :         v.write_int(self.0 as i64)
     604            0 :     }
     605              : }
     606              : 
     607              : impl LabelGroup for ThreadPoolWorkerId {
                            /// Emits a single label named `worker` whose value is this worker id.
     608            0 :     fn visit_values(&self, v: &mut impl measured::label::LabelGroupVisitor) {
     609            0 :         v.write_value(LabelName::from_str("worker"), self);
     610            0 :     }
     611              : }
     612              : 
     613              : impl LabelGroupSet for ThreadPoolWorkers {
                            // Every conversion in this impl is the identity on the wrapped `usize`:
                            // a worker id is its own unique key and its own dense index.
     614              :     type Group<'a> = ThreadPoolWorkerId;
     615              : 
                            /// Reports the wrapped worker count as the number of label groups.
     616           14 :     fn cardinality(&self) -> Option<usize> {
     617           14 :         Some(self.0)
     618           14 :     }
     619              : 
                            /// Uses the unique key unchanged as the dense index.
     620           16 :     fn encode_dense(&self, value: Self::Unique) -> Option<usize> {
     621           16 :         Some(value)
     622           16 :     }
     623              : 
                            /// Rebuilds a worker id from a dense index.
     624            0 :     fn decode_dense(&self, value: usize) -> Self::Group<'_> {
     625            0 :         ThreadPoolWorkerId(value)
     626            0 :     }
     627              : 
     628              :     type Unique = usize;
     629              : 
                            /// Returns the wrapped worker index as the unique key; never returns `None`.
                            ///
                            /// NOTE(review): there is no range check here, whereas `LabelSet::encode` for
                            /// this same type rejects ids `>= self.0`. Confirm that callers never pass an
                            /// id outside `0..self.0`, or that the metric vector tolerates it.
     630           16 :     fn encode(&self, value: Self::Group<'_>) -> Option<Self::Unique> {
     631           16 :         Some(value.0)
     632           16 :     }
     633              : 
                            /// Rebuilds a worker id from a unique key.
     634            0 :     fn decode(&self, value: &Self::Unique) -> Self::Group<'_> {
     635            0 :         ThreadPoolWorkerId(*value)
     636            0 :     }
     637              : }
     638              : 
     639              : impl LabelSet for ThreadPoolWorkers {
     640              :     type Value<'a> = ThreadPoolWorkerId;
     641              : 
                            /// Reports the wrapped worker count as the cardinality.
     642            0 :     fn dynamic_cardinality(&self) -> Option<usize> {
     643            0 :         Some(self.0)
     644            0 :     }
     645              : 
                            /// Returns the worker index when it is below the worker count, otherwise `None`.
     646            0 :     fn encode(&self, value: Self::Value<'_>) -> Option<usize> {
     647            0 :         (value.0 < self.0).then_some(value.0)
     648            0 :     }
     649              : 
                            /// Wraps an index back into a worker id; the index is not range-checked.
     650            0 :     fn decode(&self, value: usize) -> Self::Value<'_> {
     651            0 :         ThreadPoolWorkerId(value)
     652            0 :     }
     653              : }
     654              : 
     655              : impl FixedCardinalitySet for ThreadPoolWorkers {
                            /// Returns the wrapped worker count.
     656            0 :     fn cardinality(&self) -> usize {
     657            0 :         self.0
     658            0 :     }
     659              : }
     660              : 
     661              : #[derive(MetricGroup)]
     662              : #[metric(new(workers: usize))]
                        // Per-worker counters for the thread pool.
                        //
                        // Both vectors are initialised with `ThreadPoolWorkers(workers)`, i.e. a label set
                        // sized by the worker count. `workers` is the name declared in the
                        // `new(workers: usize)` attribute above (presumably a parameter of the generated
                        // constructor; confirm against the `measured` derive docs).
     663              : pub struct ThreadPoolMetrics {
                            // NOTE(review): what counts as a "turn" or a "skip" is decided by the code
                            // that increments these counters, which is not visible in this excerpt.
     664              :     #[metric(init = CounterVec::with_label_set(ThreadPoolWorkers(workers)))]
     665              :     pub worker_task_turns_total: CounterVec<ThreadPoolWorkers>,
     666              :     #[metric(init = CounterVec::with_label_set(ThreadPoolWorkers(workers)))]
     667              :     pub worker_task_skips_total: CounterVec<ThreadPoolWorkers>,
     668              : }
     669              : 
     670              : #[derive(MetricGroup, Default)]
                        // Metric group holding a single info metric labelled by `ServiceInfo`.
                        // It is mounted under the `service` namespace by the top-level `Metrics` struct.
     671              : pub struct ServiceMetrics {
     672              :     pub info: InfoMetric<ServiceInfo>,
     673              : }
     674              : 
     675              : #[derive(Default)]
                        /// Label payload for the service info metric: the current lifecycle state.
                        ///
                        /// The derived `Default` yields `ServiceState::Init`, that enum's `#[default]` variant.
     676              : pub struct ServiceInfo {
     677              :     pub state: ServiceState,
     678              : }
     679              : 
     680              : impl ServiceInfo {
                            /// Builds a `ServiceInfo` whose state is `ServiceState::Running`.
     681            0 :     pub const fn running() -> Self {
     682            0 :         ServiceInfo {
     683            0 :             state: ServiceState::Running,
     684            0 :         }
     685            0 :     }
     686              : 
                            /// Builds a `ServiceInfo` whose state is `ServiceState::Terminating`.
     687            0 :     pub const fn terminating() -> Self {
     688            0 :         ServiceInfo {
     689            0 :             state: ServiceState::Terminating,
     690            0 :         }
     691            0 :     }
     692              : }
     693              : 
     694              : impl LabelGroup for ServiceInfo {
                            /// Emits a single label named `state` carrying the current `ServiceState`.
     695            0 :     fn visit_values(&self, v: &mut impl LabelGroupVisitor) {
                                // The label name is a `const`, so it is built at compile time.
     696              :         const STATE: &LabelName = LabelName::from_str("state");
     697            0 :         v.write_value(STATE, &self.state);
     698            0 :     }
     699              : }
     700              : 
     701              : #[derive(FixedCardinalityLabel, Clone, Copy, Debug, Default)]
     702              : #[label(singleton = "state")]
                        /// Lifecycle state reported through `ServiceInfo`.
                        ///
                        /// `Init` is the default; `Running` and `Terminating` are produced by
                        /// `ServiceInfo::running()` and `ServiceInfo::terminating()`.
     703              : pub enum ServiceState {
     704              :     #[default]
     705              :     Init,
     706              :     Running,
     707              :     Terminating,
     708              : }
     709              : 
     710              : #[derive(MetricGroup)]
     711              : #[metric(new())]
                        // Metrics shared by the caches listed in `CacheKind`: every vector below is
                        // labelled by cache kind, alone or combined with a removal cause or lookup outcome.
                        //
                        // NOTE(review): the `///` comments on the fields are presumably exported as
                        // metric help text by the `MetricGroup` derive; treat them as runtime strings
                        // and confirm before rewording.
     712              : pub struct CacheMetrics {
     713              :     /// The capacity of the cache
     714              :     pub capacity: GaugeVec<StaticLabelSet<CacheKind>>,
     715              :     /// The total number of entries inserted into the cache
     716              :     pub inserted_total: CounterVec<StaticLabelSet<CacheKind>>,
     717              :     /// The total number of entries removed from the cache
     718              :     pub evicted_total: CounterVec<CacheEvictionSet>,
     719              :     /// The total number of cache requests
     720              :     pub request_total: CounterVec<CacheOutcomeSet>,
     721              : }
     722              : 
     723              : impl Default for CacheMetrics {
                            /// Delegates to `CacheMetrics::new()`.
     724           59 :     fn default() -> Self {
     725           59 :         Self::new()
     726           59 :     }
     727              : }
     728              : 
     729              : #[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
     730              : #[label(singleton = "cache")]
                        /// Identifies which cache a `CacheMetrics` observation refers to.
                        ///
                        /// The `singleton = "cache"` attribute presumably makes `cache` the label name when
                        /// this enum is used alone (confirm against the `measured` derive docs).
     731              : pub enum CacheKind {
     732              :     NodeInfo,
     733              :     ProjectInfoEndpoints,
     734              :     ProjectInfoRoles,
     735              :     Schema,
     736              :     Pbkdf2,
     737              : }
     738              : 
     739              : #[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
                        /// Reason an entry left a cache; used as the `cause` field of `CacheEviction`.
                        ///
                        /// NOTE(review): the exact condition behind each variant is set by the cache
                        /// implementation, which is not visible in this excerpt.
     740              : pub enum CacheRemovalCause {
     741              :     Expired,
     742              :     Explicit,
     743              :     Replaced,
     744              :     Size,
     745              : }
     746              : 
     747              : #[derive(LabelGroup)]
     748              : #[label(set = CacheEvictionSet)]
                        /// Label group for `CacheMetrics::evicted_total`: which cache, and why the entry
                        /// was removed. `CacheEvictionSet` is the label-set type named by the `set` attribute.
     749              : pub struct CacheEviction {
     750              :     pub cache: CacheKind,
     751              :     pub cause: CacheRemovalCause,
     752              : }
     753              : 
     754              : #[derive(FixedCardinalityLabel, Copy, Clone)]
                        /// Result of a cache lookup; used as the `outcome` field of `CacheOutcomeGroup`.
     755              : pub enum CacheOutcome {
     756              :     Hit,
     757              :     Miss,
     758              : }
     759              : 
     760              : #[derive(LabelGroup)]
     761              : #[label(set = CacheOutcomeSet)]
                        /// Label group for `CacheMetrics::request_total`: which cache was queried and
                        /// whether it hit or missed. `CacheOutcomeSet` is the label-set type named by the
                        /// `set` attribute.
     762              : pub struct CacheOutcomeGroup {
     763              :     pub cache: CacheKind,
     764              :     pub outcome: CacheOutcome,
     765              : }
        

Generated by: LCOV version 2.1-beta