LCOV - code coverage report
Current view: top level - proxy/src - context.rs (source / functions) Coverage Total Hit
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info Lines: 96.9 % 64 62
Test Date: 2024-02-07 07:37:29 Functions: 84.6 % 13 11

            Line data    Source code
       1              : //! Connection request monitoring contexts
       2              : 
       3              : use chrono::Utc;
       4              : use once_cell::sync::OnceCell;
       5              : use smol_str::SmolStr;
       6              : use std::net::IpAddr;
       7              : use tokio::sync::mpsc;
       8              : use uuid::Uuid;
       9              : 
      10              : use crate::{
      11              :     console::messages::MetricsAuxInfo, error::ErrorKind, metrics::LatencyTimer, BranchId,
      12              :     EndpointId, ProjectId, RoleName,
      13              : };
      14              : 
      15              : pub mod parquet;
      16              : 
      17              : static LOG_CHAN: OnceCell<mpsc::WeakUnboundedSender<RequestMonitoring>> = OnceCell::new();
      18              : 
      19            0 : #[derive(Clone)]
      20              : /// Context data for a single request to connect to a database.
      21              : ///
      22              : /// This data should **not** be used for connection logic, only for observability and limiting purposes.
      23              : /// All connection logic should instead use strongly typed state machines, not a bunch of Options.
      24              : pub struct RequestMonitoring {
      25              :     pub peer_addr: IpAddr,
      26              :     pub session_id: Uuid,
      27              :     pub protocol: &'static str,
      28              :     first_packet: chrono::DateTime<Utc>,
      29              :     region: &'static str,
      30              : 
      31              :     // filled in as they are discovered
      32              :     project: Option<ProjectId>,
      33              :     branch: Option<BranchId>,
      34              :     endpoint_id: Option<EndpointId>,
      35              :     user: Option<RoleName>,
      36              :     application: Option<SmolStr>,
      37              :     error_kind: Option<ErrorKind>,
      38              :     success: bool,
      39              : 
      40              :     // extra
      41              :     // This sender is here to keep the request monitoring channel open while requests are taking place.
      42              :     sender: Option<mpsc::UnboundedSender<RequestMonitoring>>,
      43              :     pub latency_timer: LatencyTimer,
      44              : }
      45              : 
      46              : impl RequestMonitoring {
      47          149 :     pub fn new(
      48          149 :         session_id: Uuid,
      49          149 :         peer_addr: IpAddr,
      50          149 :         protocol: &'static str,
      51          149 :         region: &'static str,
      52          149 :     ) -> Self {
      53          149 :         Self {
      54          149 :             peer_addr,
      55          149 :             session_id,
      56          149 :             protocol,
      57          149 :             first_packet: Utc::now(),
      58          149 :             region,
      59          149 : 
      60          149 :             project: None,
      61          149 :             branch: None,
      62          149 :             endpoint_id: None,
      63          149 :             user: None,
      64          149 :             application: None,
      65          149 :             error_kind: None,
      66          149 :             success: false,
      67          149 : 
      68          149 :             sender: LOG_CHAN.get().and_then(|tx| tx.upgrade()),
      69          149 :             latency_timer: LatencyTimer::new(protocol),
      70          149 :         }
      71          149 :     }
      72              : 
      73              :     #[cfg(test)]
      74           40 :     pub fn test() -> Self {
      75           40 :         RequestMonitoring::new(Uuid::now_v7(), [127, 0, 0, 1].into(), "test", "test")
      76           40 :     }
      77              : 
      78            4 :     pub fn console_application_name(&self) -> String {
      79            4 :         format!(
      80            4 :             "{}/{}",
      81            4 :             self.application.as_deref().unwrap_or_default(),
      82            4 :             self.protocol
      83            4 :         )
      84            4 :     }
      85              : 
      86           84 :     pub fn set_project(&mut self, x: MetricsAuxInfo) {
      87           84 :         self.branch = Some(x.branch_id);
      88           84 :         self.endpoint_id = Some(x.endpoint_id);
      89           84 :         self.project = Some(x.project_id);
      90           84 :     }
      91              : 
      92          106 :     pub fn set_endpoint_id(&mut self, endpoint_id: EndpointId) {
      93          106 :         crate::metrics::CONNECTING_ENDPOINTS
      94          106 :             .with_label_values(&[self.protocol])
      95          106 :             .measure(&endpoint_id);
      96          106 :         self.endpoint_id = Some(endpoint_id);
      97          106 :     }
      98              : 
      99          115 :     pub fn set_application(&mut self, app: Option<SmolStr>) {
     100          115 :         self.application = app.or_else(|| self.application.clone());
     101          115 :     }
     102              : 
     103          122 :     pub fn set_user(&mut self, user: RoleName) {
     104          122 :         self.user = Some(user);
     105          122 :     }
     106              : 
     107           81 :     pub fn set_success(&mut self) {
     108           81 :         self.success = true;
     109           81 :     }
     110              : 
     111          230 :     pub fn log(&mut self) {
     112          230 :         if let Some(tx) = self.sender.take() {
     113            0 :             let _: Result<(), _> = tx.send(self.clone());
     114          230 :         }
     115          230 :     }
     116              : }
     117              : 
     118              : impl Drop for RequestMonitoring {
     119          149 :     fn drop(&mut self) {
     120          149 :         self.log()
     121          149 :     }
     122              : }
        

Generated by: LCOV version 2.1-beta