LCOV - code coverage report
Current view: top level - proxy/src - context.rs (source / functions) Coverage Total Hit
Test: 36bb8dd7c7efcb53483d1a7d9f7cb33e8406dcf0.info Lines: 62.7 % 102 64
Test Date: 2024-04-08 10:22:05 Functions: 52.9 % 17 9

            Line data    Source code
       1              : //! Connection request monitoring contexts
       2              : 
       3              : use chrono::Utc;
       4              : use once_cell::sync::OnceCell;
       5              : use smol_str::SmolStr;
       6              : use std::net::IpAddr;
       7              : use tokio::sync::mpsc;
       8              : use tracing::{field::display, info_span, Span};
       9              : use uuid::Uuid;
      10              : 
      11              : use crate::{
      12              :     console::messages::{ColdStartInfo, MetricsAuxInfo},
      13              :     error::ErrorKind,
      14              :     intern::{BranchIdInt, ProjectIdInt},
      15              :     metrics::{LatencyTimer, ENDPOINT_ERRORS_BY_KIND, ERROR_BY_KIND},
      16              :     DbName, EndpointId, RoleName,
      17              : };
      18              : 
      19              : use self::parquet::RequestData;
      20              : 
      21              : pub mod parquet;
      22              : 
      23              : static LOG_CHAN: OnceCell<mpsc::WeakUnboundedSender<RequestData>> = OnceCell::new();
      24              : 
      25              : /// Context data for a single request to connect to a database.
      26              : ///
      27              : /// This data should **not** be used for connection logic, only for observability and limiting purposes.
      28              : /// All connection logic should instead use strongly typed state machines, not a bunch of Options.
      29              : pub struct RequestMonitoring {
      30              :     pub peer_addr: IpAddr,
      31              :     pub session_id: Uuid,
      32              :     pub protocol: &'static str,
      33              :     first_packet: chrono::DateTime<Utc>,
      34              :     region: &'static str,
      35              :     pub span: Span,
      36              : 
      37              :     // filled in as they are discovered
      38              :     project: Option<ProjectIdInt>,
      39              :     branch: Option<BranchIdInt>,
      40              :     endpoint_id: Option<EndpointId>,
      41              :     dbname: Option<DbName>,
      42              :     user: Option<RoleName>,
      43              :     application: Option<SmolStr>,
      44              :     error_kind: Option<ErrorKind>,
      45              :     pub(crate) auth_method: Option<AuthMethod>,
      46              :     success: bool,
      47              :     pub(crate) cold_start_info: ColdStartInfo,
      48              : 
      49              :     // extra
      50              :     // This sender is here to keep the request monitoring channel open while requests are taking place.
      51              :     sender: Option<mpsc::UnboundedSender<RequestData>>,
      52              :     pub latency_timer: LatencyTimer,
      53              : }
      54              : 
      55              : #[derive(Clone, Debug)]
      56              : pub enum AuthMethod {
      57              :     // aka link aka passwordless
      58              :     Web,
      59              :     ScramSha256,
      60              :     ScramSha256Plus,
      61              :     Cleartext,
      62              : }
      63              : 
      64              : impl RequestMonitoring {
      65           70 :     pub fn new(
      66           70 :         session_id: Uuid,
      67           70 :         peer_addr: IpAddr,
      68           70 :         protocol: &'static str,
      69           70 :         region: &'static str,
      70           70 :     ) -> Self {
      71           70 :         let span = info_span!(
      72           70 :             "connect_request",
      73           70 :             %protocol,
      74           70 :             ?session_id,
      75           70 :             %peer_addr,
      76           70 :             ep = tracing::field::Empty,
      77           70 :         );
      78              : 
      79           70 :         Self {
      80           70 :             peer_addr,
      81           70 :             session_id,
      82           70 :             protocol,
      83           70 :             first_packet: Utc::now(),
      84           70 :             region,
      85           70 :             span,
      86           70 : 
      87           70 :             project: None,
      88           70 :             branch: None,
      89           70 :             endpoint_id: None,
      90           70 :             dbname: None,
      91           70 :             user: None,
      92           70 :             application: None,
      93           70 :             error_kind: None,
      94           70 :             auth_method: None,
      95           70 :             success: false,
      96           70 :             cold_start_info: ColdStartInfo::Unknown,
      97           70 : 
      98           70 :             sender: LOG_CHAN.get().and_then(|tx| tx.upgrade()),
      99           70 :             latency_timer: LatencyTimer::new(protocol),
     100           70 :         }
     101           70 :     }
     102              : 
     103              :     #[cfg(test)]
     104           70 :     pub fn test() -> Self {
     105           70 :         RequestMonitoring::new(Uuid::now_v7(), [127, 0, 0, 1].into(), "test", "test")
     106           70 :     }
     107              : 
     108            0 :     pub fn console_application_name(&self) -> String {
     109            0 :         format!(
     110            0 :             "{}/{}",
     111            0 :             self.application.as_deref().unwrap_or_default(),
     112            0 :             self.protocol
     113            0 :         )
     114            0 :     }
     115              : 
     116            0 :     pub fn set_cold_start_info(&mut self, info: ColdStartInfo) {
     117            0 :         self.cold_start_info = info;
     118            0 :         self.latency_timer.cold_start_info(info);
     119            0 :     }
     120              : 
     121            0 :     pub fn set_project(&mut self, x: MetricsAuxInfo) {
     122            0 :         if self.endpoint_id.is_none() {
     123            0 :             self.set_endpoint_id(x.endpoint_id.as_str().into())
     124            0 :         }
     125            0 :         self.branch = Some(x.branch_id);
     126            0 :         self.project = Some(x.project_id);
     127            0 :         self.set_cold_start_info(x.cold_start_info);
     128            0 :     }
     129              : 
     130            0 :     pub fn set_project_id(&mut self, project_id: ProjectIdInt) {
     131            0 :         self.project = Some(project_id);
     132            0 :     }
     133              : 
     134           16 :     pub fn set_endpoint_id(&mut self, endpoint_id: EndpointId) {
     135           16 :         if self.endpoint_id.is_none() {
     136           16 :             self.span.record("ep", display(&endpoint_id));
     137           16 :             crate::metrics::CONNECTING_ENDPOINTS
     138           16 :                 .with_label_values(&[self.protocol])
     139           16 :                 .measure(&endpoint_id);
     140           16 :             self.endpoint_id = Some(endpoint_id);
     141           16 :         }
     142           16 :     }
     143              : 
     144           26 :     pub fn set_application(&mut self, app: Option<SmolStr>) {
     145           26 :         self.application = app.or_else(|| self.application.clone());
     146           26 :     }
     147              : 
     148            2 :     pub fn set_dbname(&mut self, dbname: DbName) {
     149            2 :         self.dbname = Some(dbname);
     150            2 :     }
     151              : 
     152           26 :     pub fn set_user(&mut self, user: RoleName) {
     153           26 :         self.user = Some(user);
     154           26 :     }
     155              : 
     156            6 :     pub fn set_auth_method(&mut self, auth_method: AuthMethod) {
     157            6 :         self.auth_method = Some(auth_method);
     158            6 :     }
     159              : 
     160            0 :     pub fn set_error_kind(&mut self, kind: ErrorKind) {
     161            0 :         ERROR_BY_KIND
     162            0 :             .with_label_values(&[kind.to_metric_label()])
     163            0 :             .inc();
     164            0 :         if let Some(ep) = &self.endpoint_id {
     165            0 :             ENDPOINT_ERRORS_BY_KIND
     166            0 :                 .with_label_values(&[kind.to_metric_label()])
     167            0 :                 .measure(ep);
     168            0 :         }
     169            0 :         self.error_kind = Some(kind);
     170            0 :     }
     171              : 
     172            0 :     pub fn set_success(&mut self) {
     173            0 :         self.success = true;
     174            0 :     }
     175              : 
     176            0 :     pub fn log(self) {}
     177              : }
     178              : 
     179              : impl Drop for RequestMonitoring {
     180           70 :     fn drop(&mut self) {
     181           70 :         if let Some(tx) = self.sender.take() {
     182            0 :             let _: Result<(), _> = tx.send(RequestData::from(&*self));
     183           70 :         }
     184           70 :     }
     185              : }
        

Generated by: LCOV version 2.1-beta