LCOV - code coverage report
Current view: top level - proxy/src - context.rs (source / functions) Coverage Total Hit
Test: ccf45ed1c149555259baec52d6229a81013dcd6a.info Lines: 41.2 % 250 103
Test Date: 2024-08-21 17:32:46 Functions: 30.6 % 36 11

            Line data    Source code
       1              : //! Connection request monitoring contexts
       2              : 
       3              : use chrono::Utc;
       4              : use once_cell::sync::OnceCell;
       5              : use pq_proto::StartupMessageParams;
       6              : use smol_str::SmolStr;
       7              : use std::net::IpAddr;
       8              : use tokio::sync::mpsc;
       9              : use tracing::{field::display, info, info_span, Span};
      10              : use try_lock::TryLock;
      11              : use uuid::Uuid;
      12              : 
      13              : use crate::{
      14              :     console::messages::{ColdStartInfo, MetricsAuxInfo},
      15              :     error::ErrorKind,
      16              :     intern::{BranchIdInt, ProjectIdInt},
      17              :     metrics::{ConnectOutcome, InvalidEndpointsGroup, LatencyTimer, Metrics, Protocol, Waiting},
      18              :     DbName, EndpointId, RoleName,
      19              : };
      20              : 
      21              : use self::parquet::RequestData;
      22              : 
      23              : pub mod parquet;
      24              : 
      25              : pub static LOG_CHAN: OnceCell<mpsc::WeakUnboundedSender<RequestData>> = OnceCell::new();
      26              : pub static LOG_CHAN_DISCONNECT: OnceCell<mpsc::WeakUnboundedSender<RequestData>> = OnceCell::new();
      27              : 
      28              : /// Context data for a single request to connect to a database.
      29              : ///
      30              : /// This data should **not** be used for connection logic, only for observability and limiting purposes.
      31              : /// All connection logic should instead use strongly typed state machines, not a bunch of Options.
      32              : pub struct RequestMonitoring(
      33              :     /// To allow easier use of the ctx object, we have interior mutability.
      34              :     /// I would typically use a RefCell but that would break the `Send` requirements
      35              :     /// so we need something with thread-safety. `TryLock` is a cheap alternative
      36              :     /// that offers similar semantics to a `RefCell` but with synchronisation.
      37              :     TryLock<RequestMonitoringInner>,
      38              : );
      39              : 
      40              : struct RequestMonitoringInner {
      41              :     pub peer_addr: IpAddr,
      42              :     pub session_id: Uuid,
      43              :     pub protocol: Protocol,
      44              :     first_packet: chrono::DateTime<Utc>,
      45              :     region: &'static str,
      46              :     pub span: Span,
      47              : 
      48              :     // filled in as they are discovered
      49              :     project: Option<ProjectIdInt>,
      50              :     branch: Option<BranchIdInt>,
      51              :     endpoint_id: Option<EndpointId>,
      52              :     dbname: Option<DbName>,
      53              :     user: Option<RoleName>,
      54              :     application: Option<SmolStr>,
      55              :     error_kind: Option<ErrorKind>,
      56              :     pub(crate) auth_method: Option<AuthMethod>,
      57              :     success: bool,
      58              :     pub(crate) cold_start_info: ColdStartInfo,
      59              :     pg_options: Option<StartupMessageParams>,
      60              : 
      61              :     // extra
      62              :     // This sender is here to keep the request monitoring channel open while requests are taking place.
      63              :     sender: Option<mpsc::UnboundedSender<RequestData>>,
      64              :     // This sender is only used to log the length of session in case of success.
      65              :     disconnect_sender: Option<mpsc::UnboundedSender<RequestData>>,
      66              :     pub latency_timer: LatencyTimer,
      67              :     // Whether proxy decided that it's not a valid endpoint and rejected it before going to cplane.
      68              :     rejected: Option<bool>,
      69              :     disconnect_timestamp: Option<chrono::DateTime<Utc>>,
      70              : }
      71              : 
      72              : #[derive(Clone, Debug)]
      73              : pub enum AuthMethod {
      74              :     // Also known as "link" or "passwordless" authentication.
      75              :     Web,
      76              :     ScramSha256,
      77              :     ScramSha256Plus,
      78              :     Cleartext,
      79              : }
      80              : 
      81              : impl RequestMonitoring {
      82          114 :     pub fn new(
      83          114 :         session_id: Uuid,
      84          114 :         peer_addr: IpAddr,
      85          114 :         protocol: Protocol,
      86          114 :         region: &'static str,
      87          114 :     ) -> Self {
      88          114 :         let span = info_span!(
      89          114 :             "connect_request",
      90          114 :             %protocol,
      91          114 :             ?session_id,
      92          114 :             %peer_addr,
      93          114 :             ep = tracing::field::Empty,
      94          114 :             role = tracing::field::Empty,
      95          114 :         );
      96              : 
      97          114 :         let inner = RequestMonitoringInner {
      98          114 :             peer_addr,
      99          114 :             session_id,
     100          114 :             protocol,
     101          114 :             first_packet: Utc::now(),
     102          114 :             region,
     103          114 :             span,
     104          114 : 
     105          114 :             project: None,
     106          114 :             branch: None,
     107          114 :             endpoint_id: None,
     108          114 :             dbname: None,
     109          114 :             user: None,
     110          114 :             application: None,
     111          114 :             error_kind: None,
     112          114 :             auth_method: None,
     113          114 :             success: false,
     114          114 :             rejected: None,
     115          114 :             cold_start_info: ColdStartInfo::Unknown,
     116          114 :             pg_options: None,
     117          114 : 
     118          114 :             sender: LOG_CHAN.get().and_then(|tx| tx.upgrade()),
     119          114 :             disconnect_sender: LOG_CHAN_DISCONNECT.get().and_then(|tx| tx.upgrade()),
     120          114 :             latency_timer: LatencyTimer::new(protocol),
     121          114 :             disconnect_timestamp: None,
     122          114 :         };
     123          114 : 
     124          114 :         Self(TryLock::new(inner))
     125          114 :     }
     126              : 
     127              :     #[cfg(test)]
     128          114 :     pub fn test() -> Self {
     129          114 :         RequestMonitoring::new(Uuid::now_v7(), [127, 0, 0, 1].into(), Protocol::Tcp, "test")
     130          114 :     }
     131              : 
     132            0 :     pub fn console_application_name(&self) -> String {
     133            0 :         let this = self.0.try_lock().expect("should not deadlock");
     134            0 :         format!(
     135            0 :             "{}/{}",
     136            0 :             this.application.as_deref().unwrap_or_default(),
     137            0 :             this.protocol
     138            0 :         )
     139            0 :     }
     140              : 
     141            0 :     pub fn set_rejected(&self, rejected: bool) {
     142            0 :         let mut this = self.0.try_lock().expect("should not deadlock");
     143            0 :         this.rejected = Some(rejected);
     144            0 :     }
     145              : 
     146            0 :     pub fn set_cold_start_info(&self, info: ColdStartInfo) {
     147            0 :         self.0
     148            0 :             .try_lock()
     149            0 :             .expect("should not deadlock")
     150            0 :             .set_cold_start_info(info);
     151            0 :     }
     152              : 
     153            0 :     pub fn set_db_options(&self, options: StartupMessageParams) {
     154            0 :         let mut this = self.0.try_lock().expect("should not deadlock");
     155            0 :         this.set_application(options.get("application_name").map(SmolStr::from));
     156            0 :         if let Some(user) = options.get("user") {
     157            0 :             this.set_user(user.into());
     158            0 :         }
     159            0 :         if let Some(dbname) = options.get("database") {
     160            0 :             this.set_dbname(dbname.into());
     161            0 :         }
     162              : 
     163            0 :         this.pg_options = Some(options);
     164            0 :     }
     165              : 
     166            0 :     pub fn set_project(&self, x: MetricsAuxInfo) {
     167            0 :         let mut this = self.0.try_lock().expect("should not deadlock");
     168            0 :         if this.endpoint_id.is_none() {
     169            0 :             this.set_endpoint_id(x.endpoint_id.as_str().into());
     170            0 :         }
     171            0 :         this.branch = Some(x.branch_id);
     172            0 :         this.project = Some(x.project_id);
     173            0 :         this.set_cold_start_info(x.cold_start_info);
     174            0 :     }
     175              : 
     176            0 :     pub fn set_project_id(&self, project_id: ProjectIdInt) {
     177            0 :         let mut this = self.0.try_lock().expect("should not deadlock");
     178            0 :         this.project = Some(project_id);
     179            0 :     }
     180              : 
     181           56 :     pub fn set_endpoint_id(&self, endpoint_id: EndpointId) {
     182           56 :         self.0
     183           56 :             .try_lock()
     184           56 :             .expect("should not deadlock")
     185           56 :             .set_endpoint_id(endpoint_id);
     186           56 :     }
     187              : 
     188            0 :     pub fn set_dbname(&self, dbname: DbName) {
     189            0 :         self.0
     190            0 :             .try_lock()
     191            0 :             .expect("should not deadlock")
     192            0 :             .set_dbname(dbname);
     193            0 :     }
     194              : 
     195            0 :     pub fn set_user(&self, user: RoleName) {
     196            0 :         self.0
     197            0 :             .try_lock()
     198            0 :             .expect("should not deadlock")
     199            0 :             .set_user(user);
     200            0 :     }
     201              : 
     202           30 :     pub fn set_auth_method(&self, auth_method: AuthMethod) {
     203           30 :         let mut this = self.0.try_lock().expect("should not deadlock");
     204           30 :         this.auth_method = Some(auth_method);
     205           30 :     }
     206              : 
     207            0 :     pub fn has_private_peer_addr(&self) -> bool {
     208            0 :         self.0
     209            0 :             .try_lock()
     210            0 :             .expect("should not deadlock")
     211            0 :             .has_private_peer_addr()
     212            0 :     }
     213              : 
     214            0 :     pub fn set_error_kind(&self, kind: ErrorKind) {
     215            0 :         let mut this = self.0.try_lock().expect("should not deadlock");
     216            0 :         // Do not record errors from the private address to metrics.
     217            0 :         if !this.has_private_peer_addr() {
     218            0 :             Metrics::get().proxy.errors_total.inc(kind);
     219            0 :         }
     220            0 :         if let Some(ep) = &this.endpoint_id {
     221            0 :             let metric = &Metrics::get().proxy.endpoints_affected_by_errors;
     222            0 :             let label = metric.with_labels(kind);
     223            0 :             metric.get_metric(label).measure(ep);
     224            0 :         }
     225            0 :         this.error_kind = Some(kind);
     226            0 :     }
     227              : 
     228            0 :     pub fn set_success(&self) {
     229            0 :         let mut this = self.0.try_lock().expect("should not deadlock");
     230            0 :         this.success = true;
     231            0 :     }
     232              : 
     233            0 :     pub fn log_connect(&self) {
     234            0 :         self.0
     235            0 :             .try_lock()
     236            0 :             .expect("should not deadlock")
     237            0 :             .log_connect();
     238            0 :     }
     239              : 
     240            0 :     pub fn protocol(&self) -> Protocol {
     241            0 :         self.0.try_lock().expect("should not deadlock").protocol
     242            0 :     }
     243              : 
     244            0 :     pub fn span(&self) -> Span {
     245            0 :         self.0.try_lock().expect("should not deadlock").span.clone()
     246            0 :     }
     247              : 
     248            0 :     pub fn session_id(&self) -> Uuid {
     249            0 :         self.0.try_lock().expect("should not deadlock").session_id
     250            0 :     }
     251              : 
     252           12 :     pub fn peer_addr(&self) -> IpAddr {
     253           12 :         self.0.try_lock().expect("should not deadlock").peer_addr
     254           12 :     }
     255              : 
     256            0 :     pub fn cold_start_info(&self) -> ColdStartInfo {
     257            0 :         self.0
     258            0 :             .try_lock()
     259            0 :             .expect("should not deadlock")
     260            0 :             .cold_start_info
     261            0 :     }
     262              : 
     263           42 :     pub fn latency_timer_pause(&self, waiting_for: Waiting) -> LatencyTimerPause<'_> {
     264           42 :         LatencyTimerPause {
     265           42 :             ctx: self,
     266           42 :             start: tokio::time::Instant::now(),
     267           42 :             waiting_for,
     268           42 :         }
     269           42 :     }
     270              : 
     271            8 :     pub fn success(&self) {
     272            8 :         self.0
     273            8 :             .try_lock()
     274            8 :             .expect("should not deadlock")
     275            8 :             .latency_timer
     276            8 :             .success();
     277            8 :     }
     278              : }
     279              : 
     280              : pub struct LatencyTimerPause<'a> {
     281              :     ctx: &'a RequestMonitoring,
     282              :     start: tokio::time::Instant,
     283              :     waiting_for: Waiting,
     284              : }
     285              : 
     286              : impl Drop for LatencyTimerPause<'_> {
     287           42 :     fn drop(&mut self) {
     288           42 :         self.ctx
     289           42 :             .0
     290           42 :             .try_lock()
     291           42 :             .expect("should not deadlock")
     292           42 :             .latency_timer
     293           42 :             .unpause(self.start, self.waiting_for);
     294           42 :     }
     295              : }
     296              : 
     297              : impl RequestMonitoringInner {
     298            0 :     fn set_cold_start_info(&mut self, info: ColdStartInfo) {
     299            0 :         self.cold_start_info = info;
     300            0 :         self.latency_timer.cold_start_info(info);
     301            0 :     }
     302              : 
     303           56 :     fn set_endpoint_id(&mut self, endpoint_id: EndpointId) {
     304           56 :         if self.endpoint_id.is_none() {
     305           56 :             self.span.record("ep", display(&endpoint_id));
     306           56 :             let metric = &Metrics::get().proxy.connecting_endpoints;
     307           56 :             let label = metric.with_labels(self.protocol);
     308           56 :             metric.get_metric(label).measure(&endpoint_id);
     309           56 :             self.endpoint_id = Some(endpoint_id);
     310           56 :         }
     311           56 :     }
     312              : 
     313            0 :     fn set_application(&mut self, app: Option<SmolStr>) {
     314            0 :         if let Some(app) = app {
     315            0 :             self.application = Some(app);
     316            0 :         }
     317            0 :     }
     318              : 
     319            0 :     fn set_dbname(&mut self, dbname: DbName) {
     320            0 :         self.dbname = Some(dbname);
     321            0 :     }
     322              : 
     323            0 :     fn set_user(&mut self, user: RoleName) {
     324            0 :         self.span.record("role", display(&user));
     325            0 :         self.user = Some(user);
     326            0 :     }
     327              : 
     328            0 :     fn has_private_peer_addr(&self) -> bool {
     329            0 :         match self.peer_addr {
     330            0 :             IpAddr::V4(ip) => ip.is_private(),
     331            0 :             IpAddr::V6(_) => false,
     332              :         }
     333            0 :     }
     334              : 
     335            0 :     fn log_connect(&mut self) {
     336            0 :         let outcome = if self.success {
     337            0 :             ConnectOutcome::Success
     338              :         } else {
     339            0 :             ConnectOutcome::Failed
     340              :         };
     341            0 :         if let Some(rejected) = self.rejected {
     342            0 :             let ep = self
     343            0 :                 .endpoint_id
     344            0 :                 .as_ref()
     345            0 :                 .map(|x| x.as_str())
     346            0 :                 .unwrap_or_default();
     347            0 :             // This makes sense only if cache is disabled
     348            0 :             info!(
     349              :                 ?outcome,
     350              :                 ?rejected,
     351              :                 ?ep,
     352            0 :                 "check endpoint is valid with outcome"
     353              :             );
     354            0 :             Metrics::get()
     355            0 :                 .proxy
     356            0 :                 .invalid_endpoints_total
     357            0 :                 .inc(InvalidEndpointsGroup {
     358            0 :                     protocol: self.protocol,
     359            0 :                     rejected: rejected.into(),
     360            0 :                     outcome,
     361            0 :                 });
     362            0 :         }
     363            0 :         if let Some(tx) = self.sender.take() {
     364            0 :             let _: Result<(), _> = tx.send(RequestData::from(&*self));
     365            0 :         }
     366            0 :     }
     367              : 
     368          114 :     fn log_disconnect(&mut self) {
     369          114 :         // If we are here, it's guaranteed that the user successfully connected to the endpoint.
     370          114 :         // Here we log the length of the session.
     371          114 :         self.disconnect_timestamp = Some(Utc::now());
     372          114 :         if let Some(tx) = self.disconnect_sender.take() {
     373            0 :             let _: Result<(), _> = tx.send(RequestData::from(&*self));
     374          114 :         }
     375          114 :     }
     376              : }
     377              : 
     378              : impl Drop for RequestMonitoringInner {
     379          114 :     fn drop(&mut self) {
     380          114 :         if self.sender.is_some() {
     381            0 :             self.log_connect();
     382          114 :         } else {
     383          114 :             self.log_disconnect();
     384          114 :         }
     385          114 :     }
     386              : }
        

Generated by: LCOV version 2.1-beta