LCOV - differential code coverage report
Current view: top level - proxy/src - context.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 96.5 % 57 55 2 55
Current Date: 2024-01-09 02:06:09 Functions: 84.6 % 13 11 2 11
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : //! Connection request monitoring contexts
       2                 : 
       3                 : use chrono::Utc;
       4                 : use once_cell::sync::OnceCell;
       5                 : use smol_str::SmolStr;
       6                 : use std::net::IpAddr;
       7                 : use tokio::sync::mpsc;
       8                 : use uuid::Uuid;
       9                 : 
      10                 : use crate::{console::messages::MetricsAuxInfo, error::ErrorKind, metrics::LatencyTimer};
      11                 : 
      12                 : pub mod parquet;
      13                 : 
      14                 : static LOG_CHAN: OnceCell<mpsc::WeakUnboundedSender<RequestMonitoring>> = OnceCell::new();
      15                 : 
      16 UBC           0 : #[derive(Clone)]
      17                 : /// Context data for a single request to connect to a database.
      18                 : ///
      19                 : /// This data should **not** be used for connection logic, only for observability and limiting purposes.
      20                 : /// All connection logic should instead use strongly typed state machines, not a bunch of Options.
      21                 : pub struct RequestMonitoring {
      22                 :     pub peer_addr: IpAddr,
      23                 :     pub session_id: Uuid,
      24                 :     pub protocol: &'static str,
      25                 :     first_packet: chrono::DateTime<Utc>,
      26                 :     region: &'static str,
      27                 : 
      28                 :     // filled in as they are discovered
      29                 :     project: Option<SmolStr>,
      30                 :     branch: Option<SmolStr>,
      31                 :     endpoint_id: Option<SmolStr>,
      32                 :     user: Option<SmolStr>,
      33                 :     application: Option<SmolStr>,
      34                 :     error_kind: Option<ErrorKind>,
      35                 : 
      36                 :     // extra
      37                 :     // This sender is here to keep the request monitoring channel open while requests are taking place.
      38                 :     sender: Option<mpsc::UnboundedSender<RequestMonitoring>>,
      39                 :     pub latency_timer: LatencyTimer,
      40                 : }
      41                 : 
      42                 : impl RequestMonitoring {
      43 CBC         127 :     pub fn new(
      44             127 :         session_id: Uuid,
      45             127 :         peer_addr: IpAddr,
      46             127 :         protocol: &'static str,
      47             127 :         region: &'static str,
      48             127 :     ) -> Self {
      49             127 :         Self {
      50             127 :             peer_addr,
      51             127 :             session_id,
      52             127 :             protocol,
      53             127 :             first_packet: Utc::now(),
      54             127 :             region,
      55             127 : 
      56             127 :             project: None,
      57             127 :             branch: None,
      58             127 :             endpoint_id: None,
      59             127 :             user: None,
      60             127 :             application: None,
      61             127 :             error_kind: None,
      62             127 : 
      63             127 :             sender: LOG_CHAN.get().and_then(|tx| tx.upgrade()),
      64             127 :             latency_timer: LatencyTimer::new(protocol),
      65             127 :         }
      66             127 :     }
      67                 : 
      68                 :     #[cfg(test)]
      69              20 :     pub fn test() -> Self {
      70              20 :         RequestMonitoring::new(Uuid::now_v7(), [127, 0, 0, 1].into(), "test", "test")
      71              20 :     }
      72                 : 
      73               4 :     pub fn console_application_name(&self) -> String {
      74               4 :         format!(
      75               4 :             "{}/{}",
      76               4 :             self.application.as_deref().unwrap_or_default(),
      77               4 :             self.protocol
      78               4 :         )
      79               4 :     }
      80                 : 
      81              75 :     pub fn set_project(&mut self, x: MetricsAuxInfo) {
      82              75 :         self.branch = Some(x.branch_id);
      83              75 :         self.endpoint_id = Some(x.endpoint_id);
      84              75 :         self.project = Some(x.project_id);
      85              75 :     }
      86                 : 
      87             196 :     pub fn set_endpoint_id(&mut self, endpoint_id: Option<SmolStr>) {
      88             196 :         self.endpoint_id = endpoint_id.or_else(|| self.endpoint_id.clone());
      89             196 :     }
      90                 : 
      91             100 :     pub fn set_application(&mut self, app: Option<SmolStr>) {
      92             100 :         self.application = app.or_else(|| self.application.clone());
      93             100 :     }
      94                 : 
      95             145 :     pub fn set_user(&mut self, user: SmolStr) {
      96             145 :         self.user = Some(user);
      97             145 :     }
      98                 : 
      99             206 :     pub fn log(&mut self) {
     100             206 :         if let Some(tx) = self.sender.take() {
     101 UBC           0 :             let _: Result<(), _> = tx.send(self.clone());
     102 CBC         206 :         }
     103             206 :     }
     104                 : }
     105                 : 
     106                 : impl Drop for RequestMonitoring {
     107             127 :     fn drop(&mut self) {
     108             127 :         self.log()
     109             127 :     }
     110                 : }
        

Generated by: LCOV version 2.1-beta