LCOV - code coverage report
Current view: top level - proxy/src - context.rs (source / functions) Coverage Total Hit
Test: 190869232aac3a234374e5bb62582e91cf5f5818.info Lines: 57.6 % 85 49
Test Date: 2024-02-23 13:21:27 Functions: 42.1 % 19 8

            Line data    Source code
       1              : //! Connection request monitoring contexts
       2              : 
       3              : use chrono::Utc;
       4              : use once_cell::sync::OnceCell;
       5              : use smol_str::SmolStr;
       6              : use std::net::IpAddr;
       7              : use tokio::sync::mpsc;
       8              : use uuid::Uuid;
       9              : 
      10              : use crate::{
      11              :     console::messages::MetricsAuxInfo,
      12              :     error::ErrorKind,
      13              :     metrics::{LatencyTimer, ENDPOINT_ERRORS_BY_KIND, ERROR_BY_KIND},
      14              :     BranchId, DbName, EndpointId, ProjectId, RoleName,
      15              : };
      16              : 
      17              : pub mod parquet;
      18              : 
      19              : static LOG_CHAN: OnceCell<mpsc::WeakUnboundedSender<RequestMonitoring>> = OnceCell::new();
      20              : 
      21            0 : #[derive(Clone)]
      22              : /// Context data for a single request to connect to a database.
      23              : ///
      24              : /// This data should **not** be used for connection logic, only for observability and limiting purposes.
      25              : /// All connection logic should instead use strongly typed state machines, not a bunch of Options.
      26              : pub struct RequestMonitoring {
      27              :     pub peer_addr: IpAddr,
      28              :     pub session_id: Uuid,
      29              :     pub protocol: &'static str,
      30              :     first_packet: chrono::DateTime<Utc>,
      31              :     region: &'static str,
      32              : 
      33              :     // filled in as they are discovered
      34              :     project: Option<ProjectId>,
      35              :     branch: Option<BranchId>,
      36              :     endpoint_id: Option<EndpointId>,
      37              :     dbname: Option<DbName>,
      38              :     user: Option<RoleName>,
      39              :     application: Option<SmolStr>,
      40              :     error_kind: Option<ErrorKind>,
      41              :     pub(crate) auth_method: Option<AuthMethod>,
      42              :     success: bool,
      43              : 
      44              :     // extra
      45              :     // This sender is here to keep the request monitoring channel open while requests are taking place.
      46              :     sender: Option<mpsc::UnboundedSender<RequestMonitoring>>,
      47              :     pub latency_timer: LatencyTimer,
      48              : }
      49              : 
      50            0 : #[derive(Clone, Debug)]
      51              : pub enum AuthMethod {
      52              :     // aka link aka passwordless
      53              :     Web,
      54              :     ScramSha256,
      55              :     ScramSha256Plus,
      56              :     Cleartext,
      57              : }
      58              : 
      59              : impl RequestMonitoring {
      60           64 :     pub fn new(
      61           64 :         session_id: Uuid,
      62           64 :         peer_addr: IpAddr,
      63           64 :         protocol: &'static str,
      64           64 :         region: &'static str,
      65           64 :     ) -> Self {
      66           64 :         Self {
      67           64 :             peer_addr,
      68           64 :             session_id,
      69           64 :             protocol,
      70           64 :             first_packet: Utc::now(),
      71           64 :             region,
      72           64 : 
      73           64 :             project: None,
      74           64 :             branch: None,
      75           64 :             endpoint_id: None,
      76           64 :             dbname: None,
      77           64 :             user: None,
      78           64 :             application: None,
      79           64 :             error_kind: None,
      80           64 :             auth_method: None,
      81           64 :             success: false,
      82           64 : 
      83           64 :             sender: LOG_CHAN.get().and_then(|tx| tx.upgrade()),
      84           64 :             latency_timer: LatencyTimer::new(protocol),
      85           64 :         }
      86           64 :     }
      87              : 
      88              :     #[cfg(test)]
      89           64 :     pub fn test() -> Self {
      90           64 :         RequestMonitoring::new(Uuid::now_v7(), [127, 0, 0, 1].into(), "test", "test")
      91           64 :     }
      92              : 
      93            0 :     pub fn console_application_name(&self) -> String {
      94            0 :         format!(
      95            0 :             "{}/{}",
      96            0 :             self.application.as_deref().unwrap_or_default(),
      97            0 :             self.protocol
      98            0 :         )
      99            0 :     }
     100              : 
     101            0 :     pub fn set_project(&mut self, x: MetricsAuxInfo) {
     102            0 :         self.branch = Some(x.branch_id);
     103            0 :         self.endpoint_id = Some(x.endpoint_id);
     104            0 :         self.project = Some(x.project_id);
     105            0 :     }
     106              : 
     107            0 :     pub fn set_project_id(&mut self, project_id: ProjectId) {
     108            0 :         self.project = Some(project_id);
     109            0 :     }
     110              : 
     111           14 :     pub fn set_endpoint_id(&mut self, endpoint_id: EndpointId) {
     112           14 :         crate::metrics::CONNECTING_ENDPOINTS
     113           14 :             .with_label_values(&[self.protocol])
     114           14 :             .measure(&endpoint_id);
     115           14 :         self.endpoint_id = Some(endpoint_id);
     116           14 :     }
     117              : 
     118           26 :     pub fn set_application(&mut self, app: Option<SmolStr>) {
     119           26 :         self.application = app.or_else(|| self.application.clone());
     120           26 :     }
     121              : 
     122            2 :     pub fn set_dbname(&mut self, dbname: DbName) {
     123            2 :         self.dbname = Some(dbname);
     124            2 :     }
     125              : 
     126           26 :     pub fn set_user(&mut self, user: RoleName) {
     127           26 :         self.user = Some(user);
     128           26 :     }
     129              : 
     130            0 :     pub fn set_auth_method(&mut self, auth_method: AuthMethod) {
     131            0 :         self.auth_method = Some(auth_method);
     132            0 :     }
     133              : 
     134            0 :     pub fn set_error_kind(&mut self, kind: ErrorKind) {
     135            0 :         ERROR_BY_KIND
     136            0 :             .with_label_values(&[kind.to_metric_label()])
     137            0 :             .inc();
     138            0 :         if let Some(ep) = &self.endpoint_id {
     139            0 :             ENDPOINT_ERRORS_BY_KIND
     140            0 :                 .with_label_values(&[kind.to_metric_label()])
     141            0 :                 .measure(ep);
     142            0 :         }
     143            0 :         self.error_kind = Some(kind);
     144            0 :     }
     145              : 
     146            0 :     pub fn set_success(&mut self) {
     147            0 :         self.success = true;
     148            0 :     }
     149              : 
     150            0 :     pub fn log(self) {}
     151              : }
     152              : 
     153              : impl Drop for RequestMonitoring {
     154           64 :     fn drop(&mut self) {
     155           64 :         if let Some(tx) = self.sender.take() {
     156            0 :             let _: Result<(), _> = tx.send(self.clone());
     157           64 :         }
     158           64 :     }
     159              : }
        

Generated by: LCOV version 2.1-beta