LCOV - code coverage report
Current view: top level - proxy/src/context - mod.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 30.0 % 310 93
Test Date: 2025-07-16 12:29:03 Functions: 26.7 % 45 12

            Line data    Source code
       1              : //! Connection request monitoring contexts
       2              : 
       3              : use std::net::IpAddr;
       4              : 
       5              : use chrono::Utc;
       6              : use once_cell::sync::OnceCell;
       7              : use smol_str::SmolStr;
       8              : use tokio::sync::mpsc;
       9              : use tracing::field::display;
      10              : use tracing::{Span, error, info_span};
      11              : use try_lock::TryLock;
      12              : use uuid::Uuid;
      13              : 
      14              : use self::parquet::RequestData;
      15              : use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
      16              : use crate::error::ErrorKind;
      17              : use crate::intern::{BranchIdInt, ProjectIdInt};
      18              : use crate::metrics::{LatencyAccumulated, LatencyTimer, Metrics, Protocol, Waiting};
      19              : use crate::pqproto::StartupMessageParams;
      20              : use crate::protocol2::{ConnectionInfo, ConnectionInfoExtra};
      21              : use crate::types::{DbName, EndpointId, RoleName};
      22              : 
      23              : pub mod parquet;
      24              : 
      25              : pub(crate) static LOG_CHAN: OnceCell<mpsc::WeakUnboundedSender<RequestData>> = OnceCell::new();
      26              : pub(crate) static LOG_CHAN_DISCONNECT: OnceCell<mpsc::WeakUnboundedSender<RequestData>> =
      27              :     OnceCell::new();
      28              : 
      29              : /// Context data for a single request to connect to a database.
      30              : ///
      31              : /// This data should **not** be used for connection logic, only for observability and limiting purposes.
      32              : /// All connection logic should instead use strongly typed state machines, not a bunch of Options.
      33              : pub struct RequestContext(
      34              :     /// To allow easier use of the ctx object, we have interior mutability.
      35              :     /// I would typically use a RefCell but that would break the `Send` requirements
      36              :     /// so we need something with thread-safety. `TryLock` is a cheap alternative
      37              :     /// that offers similar semantics to a `RefCell` but with synchronisation.
      38              :     TryLock<RequestContextInner>,
      39              : );
      40              : 
      41              : struct RequestContextInner {
      42              :     pub(crate) conn_info: ConnectionInfo,
      43              :     pub(crate) session_id: Uuid,
      44              :     pub(crate) protocol: Protocol,
      45              :     first_packet: chrono::DateTime<Utc>,
      46              :     pub(crate) span: Span,
      47              : 
      48              :     // filled in as they are discovered
      49              :     project: Option<ProjectIdInt>,
      50              :     branch: Option<BranchIdInt>,
      51              :     endpoint_id: Option<EndpointId>,
      52              :     dbname: Option<DbName>,
      53              :     user: Option<RoleName>,
      54              :     application: Option<SmolStr>,
      55              :     user_agent: Option<SmolStr>,
      56              :     error_kind: Option<ErrorKind>,
      57              :     pub(crate) auth_method: Option<AuthMethod>,
      58              :     jwt_issuer: Option<String>,
      59              :     success: bool,
      60              :     pub(crate) cold_start_info: ColdStartInfo,
      61              :     pg_options: Option<StartupMessageParams>,
      62              :     testodrome_query_id: Option<SmolStr>,
      63              : 
      64              :     // extra
      65              :     // This sender is here to keep the request monitoring channel open while requests are taking place.
      66              :     sender: Option<mpsc::UnboundedSender<RequestData>>,
      67              :     // This sender is only used to log the length of session in case of success.
      68              :     disconnect_sender: Option<mpsc::UnboundedSender<RequestData>>,
      69              :     pub(crate) latency_timer: LatencyTimer,
      70              :     disconnect_timestamp: Option<chrono::DateTime<Utc>>,
      71              : }
      72              : 
      73              : #[derive(Clone, Debug)]
      74              : pub(crate) enum AuthMethod {
      75              :     // aka link
      76              :     ConsoleRedirect,
      77              :     ScramSha256,
      78              :     ScramSha256Plus,
      79              :     Cleartext,
      80              :     Jwt,
      81              : }
      82              : 
      83              : impl Clone for RequestContext {
      84            0 :     fn clone(&self) -> Self {
      85            0 :         let inner = self.0.try_lock().expect("should not deadlock");
      86            0 :         let new = RequestContextInner {
      87            0 :             conn_info: inner.conn_info.clone(),
      88            0 :             session_id: inner.session_id,
      89            0 :             protocol: inner.protocol,
      90            0 :             first_packet: inner.first_packet,
      91            0 :             span: info_span!("background_task"),
      92              : 
      93            0 :             project: inner.project,
      94            0 :             branch: inner.branch,
      95            0 :             endpoint_id: inner.endpoint_id.clone(),
      96            0 :             dbname: inner.dbname.clone(),
      97            0 :             user: inner.user.clone(),
      98            0 :             application: inner.application.clone(),
      99            0 :             user_agent: inner.user_agent.clone(),
     100            0 :             error_kind: inner.error_kind,
     101            0 :             auth_method: inner.auth_method.clone(),
     102            0 :             jwt_issuer: inner.jwt_issuer.clone(),
     103            0 :             success: inner.success,
     104            0 :             cold_start_info: inner.cold_start_info,
     105            0 :             pg_options: inner.pg_options.clone(),
     106            0 :             testodrome_query_id: inner.testodrome_query_id.clone(),
     107              : 
     108            0 :             sender: None,
     109            0 :             disconnect_sender: None,
     110            0 :             latency_timer: LatencyTimer::noop(inner.protocol),
     111            0 :             disconnect_timestamp: inner.disconnect_timestamp,
     112              :         };
     113              : 
     114            0 :         Self(TryLock::new(new))
     115            0 :     }
     116              : }
     117              : 
     118              : impl RequestContext {
     119           77 :     pub fn new(session_id: Uuid, conn_info: ConnectionInfo, protocol: Protocol) -> Self {
     120              :         // TODO: be careful with long lived spans
     121           77 :         let span = info_span!(
     122              :             "connect_request",
     123              :             %protocol,
     124              :             ?session_id,
     125              :             %conn_info,
     126              :             ep = tracing::field::Empty,
     127              :             role = tracing::field::Empty,
     128              :         );
     129              : 
     130           77 :         let inner = RequestContextInner {
     131           77 :             conn_info,
     132           77 :             session_id,
     133           77 :             protocol,
     134           77 :             first_packet: Utc::now(),
     135           77 :             span,
     136              : 
     137           77 :             project: None,
     138           77 :             branch: None,
     139           77 :             endpoint_id: None,
     140           77 :             dbname: None,
     141           77 :             user: None,
     142           77 :             application: None,
     143           77 :             user_agent: None,
     144           77 :             error_kind: None,
     145           77 :             auth_method: None,
     146           77 :             jwt_issuer: None,
     147              :             success: false,
     148           77 :             cold_start_info: ColdStartInfo::Unknown,
     149           77 :             pg_options: None,
     150           77 :             testodrome_query_id: None,
     151              : 
     152           77 :             sender: LOG_CHAN.get().and_then(|tx| tx.upgrade()),
     153           77 :             disconnect_sender: LOG_CHAN_DISCONNECT.get().and_then(|tx| tx.upgrade()),
     154           77 :             latency_timer: LatencyTimer::new(protocol),
     155           77 :             disconnect_timestamp: None,
     156              :         };
     157              : 
     158           77 :         Self(TryLock::new(inner))
     159           77 :     }
     160              : 
     161              :     #[cfg(test)]
     162           77 :     pub(crate) fn test() -> Self {
     163              :         use std::net::SocketAddr;
     164           77 :         let ip = IpAddr::from([127, 0, 0, 1]);
     165           77 :         let addr = SocketAddr::new(ip, 5432);
     166           77 :         let conn_info = ConnectionInfo { addr, extra: None };
     167           77 :         RequestContext::new(Uuid::now_v7(), conn_info, Protocol::Tcp)
     168           77 :     }
     169              : 
     170            0 :     pub(crate) fn console_application_name(&self) -> String {
     171            0 :         let this = self.0.try_lock().expect("should not deadlock");
     172            0 :         format!(
     173            0 :             "{}/{}",
     174            0 :             this.application.as_deref().unwrap_or_default(),
     175            0 :             this.protocol
     176              :         )
     177            0 :     }
     178              : 
     179            0 :     pub(crate) fn set_cold_start_info(&self, info: ColdStartInfo) {
     180            0 :         self.0
     181            0 :             .try_lock()
     182            0 :             .expect("should not deadlock")
     183            0 :             .set_cold_start_info(info);
     184            0 :     }
     185              : 
     186            0 :     pub(crate) fn set_db_options(&self, options: StartupMessageParams) {
     187            0 :         let mut this = self.0.try_lock().expect("should not deadlock");
     188            0 :         this.set_application(options.get("application_name").map(SmolStr::from));
     189            0 :         if let Some(user) = options.get("user") {
     190            0 :             this.set_user(user.into());
     191            0 :         }
     192            0 :         if let Some(dbname) = options.get("database") {
     193            0 :             this.set_dbname(dbname.into());
     194            0 :         }
     195              : 
     196              :         // Look for a testodrome query id inside the whitespace-separated "options" parameter.
     197            0 :         if let Some(options_str) = options.get("options") {
     198              :             // The first token of the form "neon_query_id:<value>" supplies the id.
     199            0 :             for option in options_str.split_whitespace() {
     200            0 :                 if let Some(value) = option.strip_prefix("neon_query_id:") {
     201            0 :                     this.set_testodrome_id(value.into());
     202            0 :                     break;
     203            0 :                 }
     204              :             }
     205            0 :         }
     206              : 
     207            0 :         this.pg_options = Some(options);
     208            0 :     }
     209              : 
     210            0 :     pub(crate) fn set_project(&self, x: MetricsAuxInfo) {
     211            0 :         let mut this = self.0.try_lock().expect("should not deadlock");
     212            0 :         if this.endpoint_id.is_none() {
     213            0 :             this.set_endpoint_id(x.endpoint_id.as_str().into());
     214            0 :         }
     215            0 :         this.branch = Some(x.branch_id);
     216            0 :         this.project = Some(x.project_id);
     217            0 :         this.set_cold_start_info(x.cold_start_info);
     218            0 :     }
     219              : 
     220            0 :     pub(crate) fn set_project_id(&self, project_id: ProjectIdInt) {
     221            0 :         let mut this = self.0.try_lock().expect("should not deadlock");
     222            0 :         this.project = Some(project_id);
     223            0 :     }
     224              : 
     225           29 :     pub(crate) fn set_endpoint_id(&self, endpoint_id: EndpointId) {
     226           29 :         self.0
     227           29 :             .try_lock()
     228           29 :             .expect("should not deadlock")
     229           29 :             .set_endpoint_id(endpoint_id);
     230           29 :     }
     231              : 
     232            0 :     pub(crate) fn set_dbname(&self, dbname: DbName) {
     233            0 :         self.0
     234            0 :             .try_lock()
     235            0 :             .expect("should not deadlock")
     236            0 :             .set_dbname(dbname);
     237            0 :     }
     238              : 
     239            0 :     pub(crate) fn set_user(&self, user: RoleName) {
     240            0 :         self.0
     241            0 :             .try_lock()
     242            0 :             .expect("should not deadlock")
     243            0 :             .set_user(user);
     244            0 :     }
     245              : 
     246            0 :     pub(crate) fn set_user_agent(&self, user_agent: Option<SmolStr>) {
     247            0 :         self.0
     248            0 :             .try_lock()
     249            0 :             .expect("should not deadlock")
     250            0 :             .set_user_agent(user_agent);
     251            0 :     }
     252              : 
     253            0 :     pub(crate) fn set_testodrome_id(&self, query_id: SmolStr) {
     254            0 :         self.0
     255            0 :             .try_lock()
     256            0 :             .expect("should not deadlock")
     257            0 :             .set_testodrome_id(query_id);
     258            0 :     }
     259              : 
     260           15 :     pub(crate) fn set_auth_method(&self, auth_method: AuthMethod) {
     261           15 :         let mut this = self.0.try_lock().expect("should not deadlock");
     262           15 :         this.auth_method = Some(auth_method);
     263           15 :     }
     264              : 
     265           12 :     pub(crate) fn set_jwt_issuer(&self, jwt_issuer: String) {
     266           12 :         let mut this = self.0.try_lock().expect("should not deadlock");
     267           12 :         this.jwt_issuer = Some(jwt_issuer);
     268           12 :     }
     269              : 
     270            0 :     pub fn has_private_peer_addr(&self) -> bool {
     271            0 :         self.0
     272            0 :             .try_lock()
     273            0 :             .expect("should not deadlock")
     274            0 :             .has_private_peer_addr()
     275            0 :     }
     276              : 
     277            0 :     pub(crate) fn set_error_kind(&self, kind: ErrorKind) {
     278            0 :         let mut this = self.0.try_lock().expect("should not deadlock");
     279              :         // Do not record errors from the private address to metrics.
     280            0 :         if !this.has_private_peer_addr() {
     281            0 :             Metrics::get().proxy.errors_total.inc(kind);
     282            0 :         }
     283            0 :         if let Some(ep) = &this.endpoint_id {
     284            0 :             let metric = &Metrics::get().proxy.endpoints_affected_by_errors;
     285            0 :             let label = metric.with_labels(kind);
     286            0 :             metric.get_metric(label).measure(ep);
     287            0 :         }
     288            0 :         this.error_kind = Some(kind);
     289            0 :     }
     290              : 
     291            0 :     pub fn set_success(&self) {
     292            0 :         let mut this = self.0.try_lock().expect("should not deadlock");
     293            0 :         this.success = true;
     294            0 :     }
     295              : 
     296            0 :     pub fn log_connect(self) -> DisconnectLogger {
     297            0 :         let mut this = self.0.into_inner();
     298            0 :         this.log_connect();
     299              : 
     300              :         // close current span.
     301            0 :         this.span = Span::none();
     302              : 
     303            0 :         DisconnectLogger(this)
     304            0 :     }
     305              : 
     306           16 :     pub(crate) fn protocol(&self) -> Protocol {
     307           16 :         self.0.try_lock().expect("should not deadlock").protocol
     308           16 :     }
     309              : 
     310            0 :     pub(crate) fn span(&self) -> Span {
     311            0 :         self.0.try_lock().expect("should not deadlock").span.clone()
     312            0 :     }
     313              : 
     314            0 :     pub(crate) fn session_id(&self) -> Uuid {
     315            0 :         self.0.try_lock().expect("should not deadlock").session_id
     316            0 :     }
     317              : 
     318            3 :     pub(crate) fn peer_addr(&self) -> IpAddr {
     319            3 :         self.0
     320            3 :             .try_lock()
     321            3 :             .expect("should not deadlock")
     322            3 :             .conn_info
     323            3 :             .addr
     324            3 :             .ip()
     325            3 :     }
     326              : 
     327            0 :     pub(crate) fn extra(&self) -> Option<ConnectionInfoExtra> {
     328            0 :         self.0
     329            0 :             .try_lock()
     330            0 :             .expect("should not deadlock")
     331            0 :             .conn_info
     332            0 :             .extra
     333            0 :             .clone()
     334            0 :     }
     335              : 
     336            0 :     pub(crate) fn cold_start_info(&self) -> ColdStartInfo {
     337            0 :         self.0
     338            0 :             .try_lock()
     339            0 :             .expect("should not deadlock")
     340            0 :             .cold_start_info
     341            0 :     }
     342              : 
     343           52 :     pub(crate) fn latency_timer_pause(&self, waiting_for: Waiting) -> LatencyTimerPause<'_> {
     344           52 :         LatencyTimerPause {
     345           52 :             ctx: self,
     346           52 :             start: tokio::time::Instant::now(),
     347           52 :             waiting_for,
     348           52 :         }
     349           52 :     }
     350              : 
     351            0 :     pub(crate) fn latency_timer_pause_at(
     352            0 :         &self,
     353            0 :         at: tokio::time::Instant,
     354            0 :         waiting_for: Waiting,
     355            0 :     ) -> LatencyTimerPause<'_> {
     356            0 :         LatencyTimerPause {
     357            0 :             ctx: self,
     358            0 :             start: at,
     359            0 :             waiting_for,
     360            0 :         }
     361            0 :     }
     362              : 
     363            0 :     pub(crate) fn get_proxy_latency(&self) -> LatencyAccumulated {
     364            0 :         self.0
     365            0 :             .try_lock()
     366            0 :             .expect("should not deadlock")
     367            0 :             .latency_timer
     368            0 :             .accumulated()
     369            0 :     }
     370              : 
     371            0 :     pub(crate) fn get_testodrome_id(&self) -> Option<SmolStr> {
     372            0 :         self.0
     373            0 :             .try_lock()
     374            0 :             .expect("should not deadlock")
     375            0 :             .testodrome_query_id
     376            0 :             .clone()
     377            0 :     }
     378              : 
     379            8 :     pub(crate) fn success(&self) {
     380            8 :         self.0
     381            8 :             .try_lock()
     382            8 :             .expect("should not deadlock")
     383            8 :             .latency_timer
     384            8 :             .success();
     385            8 :     }
     386              : }
     387              : 
     388              : pub(crate) struct LatencyTimerPause<'a> {
     389              :     ctx: &'a RequestContext,
     390              :     start: tokio::time::Instant,
     391              :     waiting_for: Waiting,
     392              : }
     393              : 
     394              : impl Drop for LatencyTimerPause<'_> {
     395           52 :     fn drop(&mut self) {
     396           52 :         self.ctx
     397           52 :             .0
     398           52 :             .try_lock()
     399           52 :             .expect("should not deadlock")
     400           52 :             .latency_timer
     401           52 :             .unpause(self.start, self.waiting_for);
     402           52 :     }
     403              : }
     404              : 
     405              : impl RequestContextInner {
     406            0 :     fn set_cold_start_info(&mut self, info: ColdStartInfo) {
     407            0 :         self.cold_start_info = info;
     408            0 :         self.latency_timer.cold_start_info(info);
     409            0 :     }
     410              : 
     411           29 :     fn set_endpoint_id(&mut self, endpoint_id: EndpointId) {
     412           29 :         if self.endpoint_id.is_none() {
     413           29 :             self.span.record("ep", display(&endpoint_id));
     414           29 :             let metric = &Metrics::get().proxy.connecting_endpoints;
     415           29 :             let label = metric.with_labels(self.protocol);
     416           29 :             metric.get_metric(label).measure(&endpoint_id);
     417           29 :             self.endpoint_id = Some(endpoint_id);
     418           29 :         }
     419           29 :     }
     420              : 
     421            0 :     fn set_application(&mut self, app: Option<SmolStr>) {
     422            0 :         if let Some(app) = app {
     423            0 :             self.application = Some(app);
     424            0 :         }
     425            0 :     }
     426              : 
     427            0 :     fn set_user_agent(&mut self, user_agent: Option<SmolStr>) {
     428            0 :         self.user_agent = user_agent;
     429            0 :     }
     430              : 
     431            0 :     fn set_dbname(&mut self, dbname: DbName) {
     432            0 :         self.dbname = Some(dbname);
     433            0 :     }
     434              : 
     435            0 :     fn set_user(&mut self, user: RoleName) {
     436            0 :         self.span.record("role", display(&user));
     437            0 :         self.user = Some(user);
     438            0 :     }
     439              : 
     440            0 :     fn set_testodrome_id(&mut self, query_id: SmolStr) {
     441            0 :         self.testodrome_query_id = Some(query_id);
     442            0 :     }
     443              : 
     444            0 :     fn has_private_peer_addr(&self) -> bool {
     445            0 :         match self.conn_info.addr.ip() {
     446            0 :             IpAddr::V4(ip) => ip.is_private(),
     447            0 :             IpAddr::V6(_) => false,
     448              :         }
     449            0 :     }
     450              : 
     451            0 :     fn log_connect(&mut self) {
     452            0 :         if let Some(tx) = self.sender.take() {
     453              :             // If type changes, this error handling needs to be updated.
     454            0 :             let tx: mpsc::UnboundedSender<RequestData> = tx;
     455            0 :             if let Err(e) = tx.send(RequestData::from(&*self)) {
     456            0 :                 error!("log_connect channel send failed: {e}");
     457            0 :             }
     458            0 :         }
     459            0 :     }
     460              : 
     461            0 :     fn log_disconnect(&mut self) {
     462              :         // If we are here, it's guaranteed that the user successfully connected to the endpoint.
     463              :         // Here we log the length of the session.
     464            0 :         self.disconnect_timestamp = Some(Utc::now());
     465            0 :         if let Some(tx) = self.disconnect_sender.take() {
     466              :             // If type changes, this error handling needs to be updated.
     467            0 :             let tx: mpsc::UnboundedSender<RequestData> = tx;
     468            0 :             if let Err(e) = tx.send(RequestData::from(&*self)) {
     469            0 :                 error!("log_disconnect channel send failed: {e}");
     470            0 :             }
     471            0 :         }
     472            0 :     }
     473              : }
     474              : 
     475              : impl Drop for RequestContextInner {
     476           77 :     fn drop(&mut self) {
     477           77 :         if self.sender.is_some() {
     478            0 :             self.log_connect();
     479           77 :         }
     480           77 :     }
     481              : }
     482              : 
     483              : pub struct DisconnectLogger(RequestContextInner);
     484              : 
     485              : impl Drop for DisconnectLogger {
     486            0 :     fn drop(&mut self) {
     487            0 :         self.0.log_disconnect();
     488            0 :     }
     489              : }
        

Generated by: LCOV version 2.1-beta