LCOV - code coverage report
Current view: top level - proxy/src - context.rs (source / functions) Coverage Total Hit
Test: c639aa5f7ab62b43d647b10f40d15a15686ce8a9.info Lines: 93.1 % 87 81
Test Date: 2024-02-12 20:26:03 Functions: 73.7 % 19 14

            Line data    Source code
       1              : //! Connection request monitoring contexts
       2              : 
       3              : use chrono::Utc;
       4              : use once_cell::sync::OnceCell;
       5              : use smol_str::SmolStr;
       6              : use std::net::IpAddr;
       7              : use tokio::sync::mpsc;
       8              : use uuid::Uuid;
       9              : 
      10              : use crate::{
      11              :     console::messages::MetricsAuxInfo,
      12              :     error::ErrorKind,
      13              :     metrics::{LatencyTimer, ENDPOINT_ERRORS_BY_KIND, ERROR_BY_KIND},
      14              :     BranchId, DbName, EndpointId, ProjectId, RoleName,
      15              : };
      16              : 
      17              : pub mod parquet;
      18              : 
      19              : static LOG_CHAN: OnceCell<mpsc::WeakUnboundedSender<RequestMonitoring>> = OnceCell::new();
      20              : 
      21            0 : #[derive(Clone)]
      22              : /// Context data for a single request to connect to a database.
      23              : ///
      24              : /// This data should **not** be used for connection logic, only for observability and limiting purposes.
      25              : /// All connection logic should instead use strongly typed state machines, not a bunch of Options.
      26              : pub struct RequestMonitoring {
      27              :     pub peer_addr: IpAddr,
      28              :     pub session_id: Uuid,
      29              :     pub protocol: &'static str,
      30              :     first_packet: chrono::DateTime<Utc>,
      31              :     region: &'static str,
      32              : 
      33              :     // filled in as they are discovered
      34              :     project: Option<ProjectId>,
      35              :     branch: Option<BranchId>,
      36              :     endpoint_id: Option<EndpointId>,
      37              :     dbname: Option<DbName>,
      38              :     user: Option<RoleName>,
      39              :     application: Option<SmolStr>,
      40              :     error_kind: Option<ErrorKind>,
      41              :     pub(crate) auth_method: Option<AuthMethod>,
      42              :     success: bool,
      43              : 
      44              :     // extra
      45              :     // This sender is here to keep the request monitoring channel open while requests are taking place.
      46              :     sender: Option<mpsc::UnboundedSender<RequestMonitoring>>,
      47              :     pub latency_timer: LatencyTimer,
      48              : }
      49              : 
      50            0 : #[derive(Clone, Debug)]
      51              : pub enum AuthMethod {
      52              :     // aka link aka passwordless
      53              :     Web,
      54              :     ScramSha256,
      55              :     ScramSha256Plus,
      56              :     Cleartext,
      57              : }
      58              : 
      59              : impl RequestMonitoring {
      60          176 :     pub fn new(
      61          176 :         session_id: Uuid,
      62          176 :         peer_addr: IpAddr,
      63          176 :         protocol: &'static str,
      64          176 :         region: &'static str,
      65          176 :     ) -> Self {
      66          176 :         Self {
      67          176 :             peer_addr,
      68          176 :             session_id,
      69          176 :             protocol,
      70          176 :             first_packet: Utc::now(),
      71          176 :             region,
      72          176 : 
      73          176 :             project: None,
      74          176 :             branch: None,
      75          176 :             endpoint_id: None,
      76          176 :             dbname: None,
      77          176 :             user: None,
      78          176 :             application: None,
      79          176 :             error_kind: None,
      80          176 :             auth_method: None,
      81          176 :             success: false,
      82          176 : 
      83          176 :             sender: LOG_CHAN.get().and_then(|tx| tx.upgrade()),
      84          176 :             latency_timer: LatencyTimer::new(protocol),
      85          176 :         }
      86          176 :     }
      87              : 
      88              :     #[cfg(test)]
      89           64 :     pub fn test() -> Self {
      90           64 :         RequestMonitoring::new(Uuid::now_v7(), [127, 0, 0, 1].into(), "test", "test")
      91           64 :     }
      92              : 
      93            4 :     pub fn console_application_name(&self) -> String {
      94            4 :         format!(
      95            4 :             "{}/{}",
      96            4 :             self.application.as_deref().unwrap_or_default(),
      97            4 :             self.protocol
      98            4 :         )
      99            4 :     }
     100              : 
     101           81 :     pub fn set_project(&mut self, x: MetricsAuxInfo) {
     102           81 :         self.branch = Some(x.branch_id);
     103           81 :         self.endpoint_id = Some(x.endpoint_id);
     104           81 :         self.project = Some(x.project_id);
     105           81 :     }
     106              : 
     107            0 :     pub fn set_project_id(&mut self, project_id: ProjectId) {
     108            0 :         self.project = Some(project_id);
     109            0 :     }
     110              : 
     111          109 :     pub fn set_endpoint_id(&mut self, endpoint_id: EndpointId) {
     112          109 :         crate::metrics::CONNECTING_ENDPOINTS
     113          109 :             .with_label_values(&[self.protocol])
     114          109 :             .measure(&endpoint_id);
     115          109 :         self.endpoint_id = Some(endpoint_id);
     116          109 :     }
     117              : 
     118           75 :     pub fn set_application(&mut self, app: Option<SmolStr>) {
     119           75 :         self.application = app.or_else(|| self.application.clone());
     120           75 :     }
     121              : 
     122           98 :     pub fn set_dbname(&mut self, dbname: DbName) {
     123           98 :         self.dbname = Some(dbname);
     124           98 :     }
     125              : 
     126          125 :     pub fn set_user(&mut self, user: RoleName) {
     127          125 :         self.user = Some(user);
     128          125 :     }
     129              : 
     130           53 :     pub fn set_auth_method(&mut self, auth_method: AuthMethod) {
     131           53 :         self.auth_method = Some(auth_method);
     132           53 :     }
     133              : 
     134           22 :     pub fn set_error_kind(&mut self, kind: ErrorKind) {
     135           22 :         ERROR_BY_KIND
     136           22 :             .with_label_values(&[kind.to_metric_label()])
     137           22 :             .inc();
     138           22 :         if let Some(ep) = &self.endpoint_id {
     139           10 :             ENDPOINT_ERRORS_BY_KIND
     140           10 :                 .with_label_values(&[kind.to_metric_label()])
     141           10 :                 .measure(ep);
     142           12 :         }
     143           22 :         self.error_kind = Some(kind);
     144           22 :     }
     145              : 
     146           84 :     pub fn set_success(&mut self) {
     147           84 :         self.success = true;
     148           84 :     }
     149              : 
     150          240 :     pub fn log(&mut self) {
     151          240 :         if let Some(tx) = self.sender.take() {
     152            0 :             let _: Result<(), _> = tx.send(self.clone());
     153          240 :         }
     154          240 :     }
     155              : }
     156              : 
     157              : impl Drop for RequestMonitoring {
     158          176 :     fn drop(&mut self) {
     159          176 :         self.log()
     160          176 :     }
     161              : }
        

Generated by: LCOV version 2.1-beta