LCOV - code coverage report
Current view: top level - proxy/src - context.rs (source / functions) Coverage Total Hit
Test: 86c536b7fe84b2afe03c3bb264199e9c319ae0f8.info Lines: 42.1 % 164 69
Test Date: 2024-06-24 16:38:41 Functions: 27.3 % 22 6

            Line data    Source code
       1              : //! Connection request monitoring contexts
       2              : 
       3              : use chrono::Utc;
       4              : use once_cell::sync::OnceCell;
       5              : use pq_proto::StartupMessageParams;
       6              : use smol_str::SmolStr;
       7              : use std::net::IpAddr;
       8              : use tokio::sync::mpsc;
       9              : use tracing::{field::display, info, info_span, Span};
      10              : use uuid::Uuid;
      11              : 
      12              : use crate::{
      13              :     console::messages::{ColdStartInfo, MetricsAuxInfo},
      14              :     error::ErrorKind,
      15              :     intern::{BranchIdInt, ProjectIdInt},
      16              :     metrics::{ConnectOutcome, InvalidEndpointsGroup, LatencyTimer, Metrics, Protocol},
      17              :     DbName, EndpointId, RoleName,
      18              : };
      19              : 
      20              : use self::parquet::RequestData;
      21              : 
      22              : pub mod parquet;
      23              : 
      24              : pub static LOG_CHAN: OnceCell<mpsc::WeakUnboundedSender<RequestData>> = OnceCell::new();
      25              : pub static LOG_CHAN_DISCONNECT: OnceCell<mpsc::WeakUnboundedSender<RequestData>> = OnceCell::new();
      26              : 
      27              : /// Context data for a single request to connect to a database.
      28              : ///
      29              : /// This data should **not** be used for connection logic, only for observability and limiting purposes.
      30              : /// All connection logic should instead use strongly typed state machines, not a bunch of Options.
      31              : pub struct RequestMonitoring {
      32              :     pub peer_addr: IpAddr,
      33              :     pub session_id: Uuid,
      34              :     pub protocol: Protocol,
      35              :     first_packet: chrono::DateTime<Utc>,
      36              :     region: &'static str,
      37              :     pub span: Span,
      38              : 
      39              :     // filled in as they are discovered
      40              :     project: Option<ProjectIdInt>,
      41              :     branch: Option<BranchIdInt>,
      42              :     endpoint_id: Option<EndpointId>,
      43              :     dbname: Option<DbName>,
      44              :     user: Option<RoleName>,
      45              :     application: Option<SmolStr>,
      46              :     error_kind: Option<ErrorKind>,
      47              :     pub(crate) auth_method: Option<AuthMethod>,
      48              :     success: bool,
      49              :     pub(crate) cold_start_info: ColdStartInfo,
      50              :     pg_options: Option<StartupMessageParams>,
      51              : 
      52              :     // extra
      53              :     // This sender is here to keep the request monitoring channel open while requests are taking place.
      54              :     sender: Option<mpsc::UnboundedSender<RequestData>>,
      55              :     // This sender is only used to log the length of session in case of success.
      56              :     disconnect_sender: Option<mpsc::UnboundedSender<RequestData>>,
      57              :     pub latency_timer: LatencyTimer,
      58              :     // Whether proxy decided that it's not a valid endpoint end rejected it before going to cplane.
      59              :     rejected: Option<bool>,
      60              :     disconnect_timestamp: Option<chrono::DateTime<Utc>>,
      61              : }
      62              : 
      63              : #[derive(Clone, Debug)]
      64              : pub enum AuthMethod {
      65              :     // aka link aka passwordless
      66              :     Web,
      67              :     ScramSha256,
      68              :     ScramSha256Plus,
      69              :     Cleartext,
      70              : }
      71              : 
      72              : impl RequestMonitoring {
      73           70 :     pub fn new(
      74           70 :         session_id: Uuid,
      75           70 :         peer_addr: IpAddr,
      76           70 :         protocol: Protocol,
      77           70 :         region: &'static str,
      78           70 :     ) -> Self {
      79           70 :         let span = info_span!(
      80           70 :             "connect_request",
      81           70 :             %protocol,
      82           70 :             ?session_id,
      83           70 :             %peer_addr,
      84           70 :             ep = tracing::field::Empty,
      85           70 :             role = tracing::field::Empty,
      86           70 :         );
      87              : 
      88           70 :         Self {
      89           70 :             peer_addr,
      90           70 :             session_id,
      91           70 :             protocol,
      92           70 :             first_packet: Utc::now(),
      93           70 :             region,
      94           70 :             span,
      95           70 : 
      96           70 :             project: None,
      97           70 :             branch: None,
      98           70 :             endpoint_id: None,
      99           70 :             dbname: None,
     100           70 :             user: None,
     101           70 :             application: None,
     102           70 :             error_kind: None,
     103           70 :             auth_method: None,
     104           70 :             success: false,
     105           70 :             rejected: None,
     106           70 :             cold_start_info: ColdStartInfo::Unknown,
     107           70 :             pg_options: None,
     108           70 : 
     109           70 :             sender: LOG_CHAN.get().and_then(|tx| tx.upgrade()),
     110           70 :             disconnect_sender: LOG_CHAN_DISCONNECT.get().and_then(|tx| tx.upgrade()),
     111           70 :             latency_timer: LatencyTimer::new(protocol),
     112           70 :             disconnect_timestamp: None,
     113           70 :         }
     114           70 :     }
     115              : 
     116              :     #[cfg(test)]
     117           70 :     pub fn test() -> Self {
     118           70 :         RequestMonitoring::new(Uuid::now_v7(), [127, 0, 0, 1].into(), Protocol::Tcp, "test")
     119           70 :     }
     120              : 
     121            0 :     pub fn console_application_name(&self) -> String {
     122            0 :         format!(
     123            0 :             "{}/{}",
     124            0 :             self.application.as_deref().unwrap_or_default(),
     125            0 :             self.protocol
     126            0 :         )
     127            0 :     }
     128              : 
     129            0 :     pub fn set_rejected(&mut self, rejected: bool) {
     130            0 :         self.rejected = Some(rejected);
     131            0 :     }
     132              : 
     133            0 :     pub fn set_cold_start_info(&mut self, info: ColdStartInfo) {
     134            0 :         self.cold_start_info = info;
     135            0 :         self.latency_timer.cold_start_info(info);
     136            0 :     }
     137              : 
     138            0 :     pub fn set_db_options(&mut self, options: StartupMessageParams) {
     139            0 :         self.set_application(options.get("application_name").map(SmolStr::from));
     140            0 :         if let Some(user) = options.get("user") {
     141            0 :             self.set_user(user.into());
     142            0 :         }
     143            0 :         if let Some(dbname) = options.get("database") {
     144            0 :             self.set_dbname(dbname.into());
     145            0 :         }
     146              : 
     147            0 :         self.pg_options = Some(options);
     148            0 :     }
     149              : 
     150            0 :     pub fn set_project(&mut self, x: MetricsAuxInfo) {
     151            0 :         if self.endpoint_id.is_none() {
     152            0 :             self.set_endpoint_id(x.endpoint_id.as_str().into())
     153            0 :         }
     154            0 :         self.branch = Some(x.branch_id);
     155            0 :         self.project = Some(x.project_id);
     156            0 :         self.set_cold_start_info(x.cold_start_info);
     157            0 :     }
     158              : 
     159            0 :     pub fn set_project_id(&mut self, project_id: ProjectIdInt) {
     160            0 :         self.project = Some(project_id);
     161            0 :     }
     162              : 
     163           16 :     pub fn set_endpoint_id(&mut self, endpoint_id: EndpointId) {
     164           16 :         if self.endpoint_id.is_none() {
     165           16 :             self.span.record("ep", display(&endpoint_id));
     166           16 :             let metric = &Metrics::get().proxy.connecting_endpoints;
     167           16 :             let label = metric.with_labels(self.protocol);
     168           16 :             metric.get_metric(label).measure(&endpoint_id);
     169           16 :             self.endpoint_id = Some(endpoint_id);
     170           16 :         }
     171           16 :     }
     172              : 
     173            0 :     fn set_application(&mut self, app: Option<SmolStr>) {
     174            0 :         if let Some(app) = app {
     175            0 :             self.application = Some(app);
     176            0 :         }
     177            0 :     }
     178              : 
     179            0 :     pub fn set_dbname(&mut self, dbname: DbName) {
     180            0 :         self.dbname = Some(dbname);
     181            0 :     }
     182              : 
     183            0 :     pub fn set_user(&mut self, user: RoleName) {
     184            0 :         self.span.record("role", display(&user));
     185            0 :         self.user = Some(user);
     186            0 :     }
     187              : 
     188            6 :     pub fn set_auth_method(&mut self, auth_method: AuthMethod) {
     189            6 :         self.auth_method = Some(auth_method);
     190            6 :     }
     191              : 
     192            0 :     pub fn has_private_peer_addr(&self) -> bool {
     193            0 :         match self.peer_addr {
     194            0 :             IpAddr::V4(ip) => ip.is_private(),
     195            0 :             _ => false,
     196              :         }
     197            0 :     }
     198              : 
     199            0 :     pub fn set_error_kind(&mut self, kind: ErrorKind) {
     200            0 :         // Do not record errors from the private address to metrics.
     201            0 :         if !self.has_private_peer_addr() {
     202            0 :             Metrics::get().proxy.errors_total.inc(kind);
     203            0 :         }
     204            0 :         if let Some(ep) = &self.endpoint_id {
     205            0 :             let metric = &Metrics::get().proxy.endpoints_affected_by_errors;
     206            0 :             let label = metric.with_labels(kind);
     207            0 :             metric.get_metric(label).measure(ep);
     208            0 :         }
     209            0 :         self.error_kind = Some(kind);
     210            0 :     }
     211              : 
     212            0 :     pub fn set_success(&mut self) {
     213            0 :         self.success = true;
     214            0 :     }
     215              : 
     216            0 :     pub fn log_connect(&mut self) {
     217            0 :         let outcome = if self.success {
     218            0 :             ConnectOutcome::Success
     219              :         } else {
     220            0 :             ConnectOutcome::Failed
     221              :         };
     222            0 :         if let Some(rejected) = self.rejected {
     223            0 :             let ep = self
     224            0 :                 .endpoint_id
     225            0 :                 .as_ref()
     226            0 :                 .map(|x| x.as_str())
     227            0 :                 .unwrap_or_default();
     228            0 :             // This makes sense only if cache is disabled
     229            0 :             info!(
     230              :                 ?outcome,
     231              :                 ?rejected,
     232              :                 ?ep,
     233            0 :                 "check endpoint is valid with outcome"
     234              :             );
     235            0 :             Metrics::get()
     236            0 :                 .proxy
     237            0 :                 .invalid_endpoints_total
     238            0 :                 .inc(InvalidEndpointsGroup {
     239            0 :                     protocol: self.protocol,
     240            0 :                     rejected: rejected.into(),
     241            0 :                     outcome,
     242            0 :                 });
     243            0 :         }
     244            0 :         if let Some(tx) = self.sender.take() {
     245            0 :             let _: Result<(), _> = tx.send(RequestData::from(&*self));
     246            0 :         }
     247            0 :     }
     248              : 
     249           70 :     fn log_disconnect(&mut self) {
     250           70 :         // If we are here, it's guaranteed that the user successfully connected to the endpoint.
     251           70 :         // Here we log the length of the session.
     252           70 :         self.disconnect_timestamp = Some(Utc::now());
     253           70 :         if let Some(tx) = self.disconnect_sender.take() {
     254            0 :             let _: Result<(), _> = tx.send(RequestData::from(&*self));
     255           70 :         }
     256           70 :     }
     257              : }
     258              : 
     259              : impl Drop for RequestMonitoring {
     260           70 :     fn drop(&mut self) {
     261           70 :         if self.sender.is_some() {
     262            0 :             self.log_connect();
     263           70 :         } else {
     264           70 :             self.log_disconnect();
     265           70 :         }
     266           70 :     }
     267              : }
        

Generated by: LCOV version 2.1-beta