LCOV - code coverage report
Current view: top level - proxy/src - context.rs (source / functions) Coverage Total Hit
Test: 322b88762cba8ea666f63cda880cccab6936bf37.info Lines: 61.5 % 96 59
Test Date: 2024-02-29 11:57:12 Functions: 42.1 % 19 8

            Line data    Source code
       1              : //! Connection request monitoring contexts
       2              : 
       3              : use chrono::Utc;
       4              : use once_cell::sync::OnceCell;
       5              : use smol_str::SmolStr;
       6              : use std::net::IpAddr;
       7              : use tokio::sync::mpsc;
       8              : use tracing::{field::display, info_span, Span};
       9              : use uuid::Uuid;
      10              : 
      11              : use crate::{
      12              :     console::messages::MetricsAuxInfo,
      13              :     error::ErrorKind,
      14              :     metrics::{LatencyTimer, ENDPOINT_ERRORS_BY_KIND, ERROR_BY_KIND},
      15              :     BranchId, DbName, EndpointId, ProjectId, RoleName,
      16              : };
      17              : 
      18              : pub mod parquet;
      19              : 
      20              : static LOG_CHAN: OnceCell<mpsc::WeakUnboundedSender<RequestMonitoring>> = OnceCell::new();
      21              : 
      22            0 : #[derive(Clone)]
      23              : /// Context data for a single request to connect to a database.
      24              : ///
      25              : /// This data should **not** be used for connection logic, only for observability and limiting purposes.
      26              : /// All connection logic should instead use strongly typed state machines, not a bunch of Options.
      27              : pub struct RequestMonitoring {
      28              :     pub peer_addr: IpAddr,
      29              :     pub session_id: Uuid,
      30              :     pub protocol: &'static str,
      31              :     first_packet: chrono::DateTime<Utc>,
      32              :     region: &'static str,
      33              :     pub span: Span,
      34              : 
      35              :     // filled in as they are discovered
      36              :     project: Option<ProjectId>,
      37              :     branch: Option<BranchId>,
      38              :     endpoint_id: Option<EndpointId>,
      39              :     dbname: Option<DbName>,
      40              :     user: Option<RoleName>,
      41              :     application: Option<SmolStr>,
      42              :     error_kind: Option<ErrorKind>,
      43              :     pub(crate) auth_method: Option<AuthMethod>,
      44              :     success: bool,
      45              :     is_cold_start: Option<bool>,
      46              : 
      47              :     // extra
      48              :     // This sender is here to keep the request monitoring channel open while requests are taking place.
      49              :     sender: Option<mpsc::UnboundedSender<RequestMonitoring>>,
      50              :     pub latency_timer: LatencyTimer,
      51              : }
      52              : 
      53            0 : #[derive(Clone, Debug)]
      54              : pub enum AuthMethod {
      55              :     // aka link aka passwordless
      56              :     Web,
      57              :     ScramSha256,
      58              :     ScramSha256Plus,
      59              :     Cleartext,
      60              : }
      61              : 
      62              : impl RequestMonitoring {
      63           64 :     pub fn new(
      64           64 :         session_id: Uuid,
      65           64 :         peer_addr: IpAddr,
      66           64 :         protocol: &'static str,
      67           64 :         region: &'static str,
      68           64 :     ) -> Self {
      69           64 :         let span = info_span!(
      70           64 :             "connect_request",
      71           64 :             %protocol,
      72           64 :             ?session_id,
      73           64 :             %peer_addr,
      74           64 :             ep = tracing::field::Empty,
      75           64 :         );
      76              : 
      77           64 :         Self {
      78           64 :             peer_addr,
      79           64 :             session_id,
      80           64 :             protocol,
      81           64 :             first_packet: Utc::now(),
      82           64 :             region,
      83           64 :             span,
      84           64 : 
      85           64 :             project: None,
      86           64 :             branch: None,
      87           64 :             endpoint_id: None,
      88           64 :             dbname: None,
      89           64 :             user: None,
      90           64 :             application: None,
      91           64 :             error_kind: None,
      92           64 :             auth_method: None,
      93           64 :             success: false,
      94           64 :             is_cold_start: None,
      95           64 : 
      96           64 :             sender: LOG_CHAN.get().and_then(|tx| tx.upgrade()),
      97           64 :             latency_timer: LatencyTimer::new(protocol),
      98           64 :         }
      99           64 :     }
     100              : 
     101              :     #[cfg(test)]
     102           64 :     pub fn test() -> Self {
     103           64 :         RequestMonitoring::new(Uuid::now_v7(), [127, 0, 0, 1].into(), "test", "test")
     104           64 :     }
     105              : 
     106            0 :     pub fn console_application_name(&self) -> String {
     107            0 :         format!(
     108            0 :             "{}/{}",
     109            0 :             self.application.as_deref().unwrap_or_default(),
     110            0 :             self.protocol
     111            0 :         )
     112            0 :     }
     113              : 
     114            0 :     pub fn set_project(&mut self, x: MetricsAuxInfo) {
     115            0 :         self.set_endpoint_id(x.endpoint_id);
     116            0 :         self.branch = Some(x.branch_id);
     117            0 :         self.project = Some(x.project_id);
     118            0 :         self.is_cold_start = x.is_cold_start;
     119            0 :     }
     120              : 
     121            0 :     pub fn set_project_id(&mut self, project_id: ProjectId) {
     122            0 :         self.project = Some(project_id);
     123            0 :     }
     124              : 
     125           14 :     pub fn set_endpoint_id(&mut self, endpoint_id: EndpointId) {
     126           14 :         self.span.record("ep", display(&endpoint_id));
     127           14 :         crate::metrics::CONNECTING_ENDPOINTS
     128           14 :             .with_label_values(&[self.protocol])
     129           14 :             .measure(&endpoint_id);
     130           14 :         self.endpoint_id = Some(endpoint_id);
     131           14 :     }
     132              : 
     133           26 :     pub fn set_application(&mut self, app: Option<SmolStr>) {
     134           26 :         self.application = app.or_else(|| self.application.clone());
     135           26 :     }
     136              : 
     137            2 :     pub fn set_dbname(&mut self, dbname: DbName) {
     138            2 :         self.dbname = Some(dbname);
     139            2 :     }
     140              : 
     141           26 :     pub fn set_user(&mut self, user: RoleName) {
     142           26 :         self.user = Some(user);
     143           26 :     }
     144              : 
     145            0 :     pub fn set_auth_method(&mut self, auth_method: AuthMethod) {
     146            0 :         self.auth_method = Some(auth_method);
     147            0 :     }
     148              : 
     149            0 :     pub fn set_error_kind(&mut self, kind: ErrorKind) {
     150            0 :         ERROR_BY_KIND
     151            0 :             .with_label_values(&[kind.to_metric_label()])
     152            0 :             .inc();
     153            0 :         if let Some(ep) = &self.endpoint_id {
     154            0 :             ENDPOINT_ERRORS_BY_KIND
     155            0 :                 .with_label_values(&[kind.to_metric_label()])
     156            0 :                 .measure(ep);
     157            0 :         }
     158            0 :         self.error_kind = Some(kind);
     159            0 :     }
     160              : 
     161            0 :     pub fn set_success(&mut self) {
     162            0 :         self.success = true;
     163            0 :     }
     164              : 
     165            0 :     pub fn log(self) {}
     166              : }
     167              : 
     168              : impl Drop for RequestMonitoring {
     169           64 :     fn drop(&mut self) {
     170           64 :         if let Some(tx) = self.sender.take() {
     171            0 :             let _: Result<(), _> = tx.send(self.clone());
     172           64 :         }
     173           64 :     }
     174              : }
        

Generated by: LCOV version 2.1-beta