LCOV - code coverage report
Current view: top level - proxy/src - context.rs (source / functions) Coverage Total Hit
Test: 53437f7e869ac68c86c7d3e4c20964c0156f158c.info Lines: 36.3 % 284 103
Test Date: 2024-09-20 16:14:12 Functions: 28.2 % 39 11

            Line data    Source code
       1              : //! Connection request monitoring contexts
       2              : 
       3              : use chrono::Utc;
       4              : use once_cell::sync::OnceCell;
       5              : use pq_proto::StartupMessageParams;
       6              : use smol_str::SmolStr;
       7              : use std::net::IpAddr;
       8              : use tokio::sync::mpsc;
       9              : use tracing::{debug, field::display, info, info_span, Span};
      10              : use try_lock::TryLock;
      11              : use uuid::Uuid;
      12              : 
      13              : use crate::{
      14              :     console::messages::{ColdStartInfo, MetricsAuxInfo},
      15              :     error::ErrorKind,
      16              :     intern::{BranchIdInt, ProjectIdInt},
      17              :     metrics::{ConnectOutcome, InvalidEndpointsGroup, LatencyTimer, Metrics, Protocol, Waiting},
      18              :     DbName, EndpointId, RoleName,
      19              : };
      20              : 
      21              : use self::parquet::RequestData;
      22              : 
      23              : pub mod parquet;
      24              : 
      25              : pub(crate) static LOG_CHAN: OnceCell<mpsc::WeakUnboundedSender<RequestData>> = OnceCell::new();
      26              : pub(crate) static LOG_CHAN_DISCONNECT: OnceCell<mpsc::WeakUnboundedSender<RequestData>> =
      27              :     OnceCell::new();
      28              : 
      29              : /// Context data for a single request to connect to a database.
      30              : ///
      31              : /// This data should **not** be used for connection logic, only for observability and limiting purposes.
      32              : /// All connection logic should instead use strongly typed state machines, not a bunch of Options.
      33              : pub struct RequestMonitoring(
      34              :     /// To allow easier use of the ctx object, we have interior mutability.
      35              :     /// I would typically use a RefCell but that would break the `Send` requirements
      36              :     /// so we need something with thread-safety. `TryLock` is a cheap alternative
      37              :     /// that offers similar semantics to a `RefCell` but with synchronisation.
      38              :     TryLock<RequestMonitoringInner>,
      39              : );
      40              : 
      41              : struct RequestMonitoringInner {
      42              :     pub(crate) peer_addr: IpAddr,
      43              :     pub(crate) session_id: Uuid,
      44              :     pub(crate) protocol: Protocol,
      45              :     first_packet: chrono::DateTime<Utc>,
      46              :     region: &'static str,
      47              :     pub(crate) span: Span,
      48              : 
      49              :     // filled in as they are discovered
      50              :     project: Option<ProjectIdInt>,
      51              :     branch: Option<BranchIdInt>,
      52              :     endpoint_id: Option<EndpointId>,
      53              :     dbname: Option<DbName>,
      54              :     user: Option<RoleName>,
      55              :     application: Option<SmolStr>,
      56              :     error_kind: Option<ErrorKind>,
      57              :     pub(crate) auth_method: Option<AuthMethod>,
      58              :     success: bool,
      59              :     pub(crate) cold_start_info: ColdStartInfo,
      60              :     pg_options: Option<StartupMessageParams>,
      61              : 
      62              :     // extra
      63              :     // This sender is here to keep the request monitoring channel open while requests are taking place.
      64              :     sender: Option<mpsc::UnboundedSender<RequestData>>,
      65              :     // This sender is only used to log the length of the session in case of success.
      66              :     disconnect_sender: Option<mpsc::UnboundedSender<RequestData>>,
      67              :     pub(crate) latency_timer: LatencyTimer,
      68              :     // Whether proxy decided that it's not a valid endpoint and rejected it before going to cplane.
      69              :     rejected: Option<bool>,
      70              :     disconnect_timestamp: Option<chrono::DateTime<Utc>>,
      71              : }
      72              : 
      73              : #[derive(Clone, Debug)]
      74              : pub(crate) enum AuthMethod {
      75              :     // aka passwordless, fka link
      76              :     Web,
      77              :     ScramSha256,
      78              :     ScramSha256Plus,
      79              :     Cleartext,
      80              : }
      81              : 
      82              : impl Clone for RequestMonitoring {
      83            0 :     fn clone(&self) -> Self {
      84            0 :         let inner = self.0.try_lock().expect("should not deadlock");
      85            0 :         let new = RequestMonitoringInner {
      86            0 :             peer_addr: inner.peer_addr,
      87            0 :             session_id: inner.session_id,
      88            0 :             protocol: inner.protocol,
      89            0 :             first_packet: inner.first_packet,
      90            0 :             region: inner.region,
      91            0 :             span: info_span!("background_task"),
      92              : 
      93            0 :             project: inner.project,
      94            0 :             branch: inner.branch,
      95            0 :             endpoint_id: inner.endpoint_id.clone(),
      96            0 :             dbname: inner.dbname.clone(),
      97            0 :             user: inner.user.clone(),
      98            0 :             application: inner.application.clone(),
      99            0 :             error_kind: inner.error_kind,
     100            0 :             auth_method: inner.auth_method.clone(),
     101            0 :             success: inner.success,
     102            0 :             rejected: inner.rejected,
     103            0 :             cold_start_info: inner.cold_start_info,
     104            0 :             pg_options: inner.pg_options.clone(),
     105            0 : 
     106            0 :             sender: None,
     107            0 :             disconnect_sender: None,
     108            0 :             latency_timer: LatencyTimer::noop(inner.protocol),
     109            0 :             disconnect_timestamp: inner.disconnect_timestamp,
     110            0 :         };
     111            0 : 
     112            0 :         Self(TryLock::new(new))
     113            0 :     }
     114              : }
     115              : 
     116              : impl RequestMonitoring {
     117           61 :     pub fn new(
     118           61 :         session_id: Uuid,
     119           61 :         peer_addr: IpAddr,
     120           61 :         protocol: Protocol,
     121           61 :         region: &'static str,
     122           61 :     ) -> Self {
     123           61 :         let span = info_span!(
     124           61 :             "connect_request",
     125           61 :             %protocol,
     126           61 :             ?session_id,
     127           61 :             %peer_addr,
     128           61 :             ep = tracing::field::Empty,
     129           61 :             role = tracing::field::Empty,
     130           61 :         );
     131              : 
     132           61 :         let inner = RequestMonitoringInner {
     133           61 :             peer_addr,
     134           61 :             session_id,
     135           61 :             protocol,
     136           61 :             first_packet: Utc::now(),
     137           61 :             region,
     138           61 :             span,
     139           61 : 
     140           61 :             project: None,
     141           61 :             branch: None,
     142           61 :             endpoint_id: None,
     143           61 :             dbname: None,
     144           61 :             user: None,
     145           61 :             application: None,
     146           61 :             error_kind: None,
     147           61 :             auth_method: None,
     148           61 :             success: false,
     149           61 :             rejected: None,
     150           61 :             cold_start_info: ColdStartInfo::Unknown,
     151           61 :             pg_options: None,
     152           61 : 
     153           61 :             sender: LOG_CHAN.get().and_then(|tx| tx.upgrade()),
     154           61 :             disconnect_sender: LOG_CHAN_DISCONNECT.get().and_then(|tx| tx.upgrade()),
     155           61 :             latency_timer: LatencyTimer::new(protocol),
     156           61 :             disconnect_timestamp: None,
     157           61 :         };
     158           61 : 
     159           61 :         Self(TryLock::new(inner))
     160           61 :     }
     161              : 
     162              :     #[cfg(test)]
     163           61 :     pub(crate) fn test() -> Self {
     164           61 :         RequestMonitoring::new(Uuid::now_v7(), [127, 0, 0, 1].into(), Protocol::Tcp, "test")
     165           61 :     }
     166              : 
     167            0 :     pub(crate) fn console_application_name(&self) -> String {
     168            0 :         let this = self.0.try_lock().expect("should not deadlock");
     169            0 :         format!(
     170            0 :             "{}/{}",
     171            0 :             this.application.as_deref().unwrap_or_default(),
     172            0 :             this.protocol
     173            0 :         )
     174            0 :     }
     175              : 
     176            0 :     pub(crate) fn set_rejected(&self, rejected: bool) {
     177            0 :         let mut this = self.0.try_lock().expect("should not deadlock");
     178            0 :         this.rejected = Some(rejected);
     179            0 :     }
     180              : 
     181            0 :     pub(crate) fn set_cold_start_info(&self, info: ColdStartInfo) {
     182            0 :         self.0
     183            0 :             .try_lock()
     184            0 :             .expect("should not deadlock")
     185            0 :             .set_cold_start_info(info);
     186            0 :     }
     187              : 
     188            0 :     pub(crate) fn set_db_options(&self, options: StartupMessageParams) {
     189            0 :         let mut this = self.0.try_lock().expect("should not deadlock");
     190            0 :         this.set_application(options.get("application_name").map(SmolStr::from));
     191            0 :         if let Some(user) = options.get("user") {
     192            0 :             this.set_user(user.into());
     193            0 :         }
     194            0 :         if let Some(dbname) = options.get("database") {
     195            0 :             this.set_dbname(dbname.into());
     196            0 :         }
     197              : 
     198            0 :         this.pg_options = Some(options);
     199            0 :     }
     200              : 
     201            0 :     pub(crate) fn set_project(&self, x: MetricsAuxInfo) {
     202            0 :         let mut this = self.0.try_lock().expect("should not deadlock");
     203            0 :         if this.endpoint_id.is_none() {
     204            0 :             this.set_endpoint_id(x.endpoint_id.as_str().into());
     205            0 :         }
     206            0 :         this.branch = Some(x.branch_id);
     207            0 :         this.project = Some(x.project_id);
     208            0 :         this.set_cold_start_info(x.cold_start_info);
     209            0 :     }
     210              : 
     211            0 :     pub(crate) fn set_project_id(&self, project_id: ProjectIdInt) {
     212            0 :         let mut this = self.0.try_lock().expect("should not deadlock");
     213            0 :         this.project = Some(project_id);
     214            0 :     }
     215              : 
     216           28 :     pub(crate) fn set_endpoint_id(&self, endpoint_id: EndpointId) {
     217           28 :         self.0
     218           28 :             .try_lock()
     219           28 :             .expect("should not deadlock")
     220           28 :             .set_endpoint_id(endpoint_id);
     221           28 :     }
     222              : 
     223            0 :     pub(crate) fn set_dbname(&self, dbname: DbName) {
     224            0 :         self.0
     225            0 :             .try_lock()
     226            0 :             .expect("should not deadlock")
     227            0 :             .set_dbname(dbname);
     228            0 :     }
     229              : 
     230            0 :     pub(crate) fn set_user(&self, user: RoleName) {
     231            0 :         self.0
     232            0 :             .try_lock()
     233            0 :             .expect("should not deadlock")
     234            0 :             .set_user(user);
     235            0 :     }
     236              : 
     237           15 :     pub(crate) fn set_auth_method(&self, auth_method: AuthMethod) {
     238           15 :         let mut this = self.0.try_lock().expect("should not deadlock");
     239           15 :         this.auth_method = Some(auth_method);
     240           15 :     }
     241              : 
     242            0 :     pub fn has_private_peer_addr(&self) -> bool {
     243            0 :         self.0
     244            0 :             .try_lock()
     245            0 :             .expect("should not deadlock")
     246            0 :             .has_private_peer_addr()
     247            0 :     }
     248              : 
     249            0 :     pub(crate) fn set_error_kind(&self, kind: ErrorKind) {
     250            0 :         let mut this = self.0.try_lock().expect("should not deadlock");
     251            0 :         // Do not record errors from the private address to metrics.
     252            0 :         if !this.has_private_peer_addr() {
     253            0 :             Metrics::get().proxy.errors_total.inc(kind);
     254            0 :         }
     255            0 :         if let Some(ep) = &this.endpoint_id {
     256            0 :             let metric = &Metrics::get().proxy.endpoints_affected_by_errors;
     257            0 :             let label = metric.with_labels(kind);
     258            0 :             metric.get_metric(label).measure(ep);
     259            0 :         }
     260            0 :         this.error_kind = Some(kind);
     261            0 :     }
     262              : 
     263            0 :     pub fn set_success(&self) {
     264            0 :         let mut this = self.0.try_lock().expect("should not deadlock");
     265            0 :         this.success = true;
     266            0 :     }
     267              : 
     268            0 :     pub fn log_connect(&self) {
     269            0 :         self.0
     270            0 :             .try_lock()
     271            0 :             .expect("should not deadlock")
     272            0 :             .log_connect();
     273            0 :     }
     274              : 
     275            0 :     pub(crate) fn protocol(&self) -> Protocol {
     276            0 :         self.0.try_lock().expect("should not deadlock").protocol
     277            0 :     }
     278              : 
     279            0 :     pub(crate) fn span(&self) -> Span {
     280            0 :         self.0.try_lock().expect("should not deadlock").span.clone()
     281            0 :     }
     282              : 
     283            0 :     pub(crate) fn session_id(&self) -> Uuid {
     284            0 :         self.0.try_lock().expect("should not deadlock").session_id
     285            0 :     }
     286              : 
     287            6 :     pub(crate) fn peer_addr(&self) -> IpAddr {
     288            6 :         self.0.try_lock().expect("should not deadlock").peer_addr
     289            6 :     }
     290              : 
     291            0 :     pub(crate) fn cold_start_info(&self) -> ColdStartInfo {
     292            0 :         self.0
     293            0 :             .try_lock()
     294            0 :             .expect("should not deadlock")
     295            0 :             .cold_start_info
     296            0 :     }
     297              : 
     298           22 :     pub(crate) fn latency_timer_pause(&self, waiting_for: Waiting) -> LatencyTimerPause<'_> {
     299           22 :         LatencyTimerPause {
     300           22 :             ctx: self,
     301           22 :             start: tokio::time::Instant::now(),
     302           22 :             waiting_for,
     303           22 :         }
     304           22 :     }
     305              : 
     306            4 :     pub(crate) fn success(&self) {
     307            4 :         self.0
     308            4 :             .try_lock()
     309            4 :             .expect("should not deadlock")
     310            4 :             .latency_timer
     311            4 :             .success();
     312            4 :     }
     313              : }
     314              : 
     315              : pub(crate) struct LatencyTimerPause<'a> {
     316              :     ctx: &'a RequestMonitoring,
     317              :     start: tokio::time::Instant,
     318              :     waiting_for: Waiting,
     319              : }
     320              : 
     321              : impl Drop for LatencyTimerPause<'_> {
     322           22 :     fn drop(&mut self) {
     323           22 :         self.ctx
     324           22 :             .0
     325           22 :             .try_lock()
     326           22 :             .expect("should not deadlock")
     327           22 :             .latency_timer
     328           22 :             .unpause(self.start, self.waiting_for);
     329           22 :     }
     330              : }
     331              : 
     332              : impl RequestMonitoringInner {
     333            0 :     fn set_cold_start_info(&mut self, info: ColdStartInfo) {
     334            0 :         self.cold_start_info = info;
     335            0 :         self.latency_timer.cold_start_info(info);
     336            0 :     }
     337              : 
     338           28 :     fn set_endpoint_id(&mut self, endpoint_id: EndpointId) {
     339           28 :         if self.endpoint_id.is_none() {
     340           28 :             self.span.record("ep", display(&endpoint_id));
     341           28 :             let metric = &Metrics::get().proxy.connecting_endpoints;
     342           28 :             let label = metric.with_labels(self.protocol);
     343           28 :             metric.get_metric(label).measure(&endpoint_id);
     344           28 :             self.endpoint_id = Some(endpoint_id);
     345           28 :         }
     346           28 :     }
     347              : 
     348            0 :     fn set_application(&mut self, app: Option<SmolStr>) {
     349            0 :         if let Some(app) = app {
     350            0 :             self.application = Some(app);
     351            0 :         }
     352            0 :     }
     353              : 
     354            0 :     fn set_dbname(&mut self, dbname: DbName) {
     355            0 :         self.dbname = Some(dbname);
     356            0 :     }
     357              : 
     358            0 :     fn set_user(&mut self, user: RoleName) {
     359            0 :         self.span.record("role", display(&user));
     360            0 :         self.user = Some(user);
     361            0 :     }
     362              : 
     363            0 :     fn has_private_peer_addr(&self) -> bool {
     364            0 :         match self.peer_addr {
     365            0 :             IpAddr::V4(ip) => ip.is_private(),
     366            0 :             IpAddr::V6(_) => false,
     367              :         }
     368            0 :     }
     369              : 
     370            0 :     fn log_connect(&mut self) {
     371            0 :         let outcome = if self.success {
     372            0 :             ConnectOutcome::Success
     373              :         } else {
     374            0 :             ConnectOutcome::Failed
     375              :         };
     376            0 :         if let Some(rejected) = self.rejected {
     377            0 :             let ep = self
     378            0 :                 .endpoint_id
     379            0 :                 .as_ref()
     380            0 :                 .map(|x| x.as_str())
     381            0 :                 .unwrap_or_default();
     382            0 :                 // This makes sense only if the cache is disabled
     383            0 :             info!(
     384              :                 ?outcome,
     385              :                 ?rejected,
     386              :                 ?ep,
     387            0 :                 "check endpoint is valid with outcome"
     388              :             );
     389            0 :             Metrics::get()
     390            0 :                 .proxy
     391            0 :                 .invalid_endpoints_total
     392            0 :                 .inc(InvalidEndpointsGroup {
     393            0 :                     protocol: self.protocol,
     394            0 :                     rejected: rejected.into(),
     395            0 :                     outcome,
     396            0 :                 });
     397            0 :         }
     398            0 :         if let Some(tx) = self.sender.take() {
     399            0 :             tx.send(RequestData::from(&*self))
     400            0 :                 .inspect_err(|e| debug!("tx send failed: {e}"))
     401            0 :                 .ok();
     402            0 :         }
     403            0 :     }
     404              : 
     405           61 :     fn log_disconnect(&mut self) {
     406           61 :         // If we are here, it's guaranteed that the user successfully connected to the endpoint.
     407           61 :         // Here we log the length of the session.
     408           61 :         self.disconnect_timestamp = Some(Utc::now());
     409           61 :         if let Some(tx) = self.disconnect_sender.take() {
     410            0 :             tx.send(RequestData::from(&*self))
     411            0 :                 .inspect_err(|e| debug!("tx send failed: {e}"))
     412            0 :                 .ok();
     413           61 :         }
     414           61 :     }
     415              : }
     416              : 
     417              : impl Drop for RequestMonitoringInner {
     418           61 :     fn drop(&mut self) {
     419           61 :         if self.sender.is_some() {
     420            0 :             self.log_connect();
     421           61 :         } else {
     422           61 :             self.log_disconnect();
     423           61 :         }
     424           61 :     }
     425              : }
        

Generated by: LCOV version 2.1-beta