Line data Source code
1 : //! Connection request monitoring contexts
2 :
3 : use std::net::IpAddr;
4 :
5 : use chrono::Utc;
6 : use once_cell::sync::OnceCell;
7 : use smol_str::SmolStr;
8 : use tokio::sync::mpsc;
9 : use tracing::field::display;
10 : use tracing::{Span, error, info_span};
11 : use try_lock::TryLock;
12 : use uuid::Uuid;
13 :
14 : use self::parquet::RequestData;
15 : use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
16 : use crate::error::ErrorKind;
17 : use crate::intern::{BranchIdInt, ProjectIdInt};
18 : use crate::metrics::{LatencyAccumulated, LatencyTimer, Metrics, Protocol, Waiting};
19 : use crate::pqproto::StartupMessageParams;
20 : use crate::protocol2::{ConnectionInfo, ConnectionInfoExtra};
21 : use crate::types::{DbName, EndpointId, RoleName};
22 :
23 : pub mod parquet;
24 :
/// Weak handle to the channel that receives one `RequestData` record per connection request
/// (sent by `RequestContextInner::log_connect`).
///
/// `RequestContext::new` upgrades it to a strong sender, if it has been set and the receiver
/// side is still alive, and keeps that sender for the lifetime of the request.
pub(crate) static LOG_CHAN: OnceCell<mpsc::WeakUnboundedSender<RequestData>> = OnceCell::new();
/// Weak handle to the channel that receives a `RequestData` record when a connected session
/// ends (sent by `RequestContextInner::log_disconnect`).
///
/// It is upgraded per request by `RequestContext::new`, just like `LOG_CHAN`.
pub(crate) static LOG_CHAN_DISCONNECT: OnceCell<mpsc::WeakUnboundedSender<RequestData>> =
    OnceCell::new();
28 :
/// Context data for a single request to connect to a database.
///
/// This data should **not** be used for connection logic, only for observability and limiting purposes.
/// All connection logic should instead use strongly typed state machines, not a bunch of Options.
///
/// Every accessor acquires the inner state with `try_lock` and panics with
/// "should not deadlock" if the lock is already held. Callers must therefore never hold
/// one access to the context while making another on the same context.
pub struct RequestContext(
    /// To allow easier use of the ctx object, we have interior mutability.
    /// I would typically use a RefCell but that would break the `Send` requirements
    /// so we need something with thread-safety. `TryLock` is a cheap alternative
    /// that offers similar semantics to a `RefCell` but with synchronisation.
    TryLock<RequestContextInner>,
);
40 :
/// Mutable request state guarded by `RequestContext`'s lock.
///
/// NOTE: after `Drop::drop` runs, Rust drops fields in declaration order (`span` before the
/// senders and `latency_timer`). Reordering fields may change drop-time behaviour.
struct RequestContextInner {
    /// Peer connection info (peer `addr` plus optional `extra`).
    pub(crate) conn_info: ConnectionInfo,
    /// Identifier of this session, recorded in the request span.
    pub(crate) session_id: Uuid,
    /// Protocol the client connected with; also labels metrics and the latency timer.
    pub(crate) protocol: Protocol,
    /// Wall-clock time at which the context was created.
    first_packet: chrono::DateTime<Utc>,
    /// Request tracing span; its `ep` and `role` fields are filled in later.
    pub(crate) span: Span,

    // filled in as they are discovered
    /// Project id, from control-plane info or set directly.
    project: Option<ProjectIdInt>,
    /// Branch id, from control-plane info.
    branch: Option<BranchIdInt>,
    /// Endpoint id; only the first value set is kept.
    endpoint_id: Option<EndpointId>,
    /// Database name from the startup `database` parameter.
    dbname: Option<DbName>,
    /// Role name from the startup `user` parameter (or set directly).
    user: Option<RoleName>,
    /// Client `application_name`, if it sent one.
    application: Option<SmolStr>,
    /// Client user agent, if known.
    user_agent: Option<SmolStr>,
    /// Last error kind recorded via `set_error_kind`.
    error_kind: Option<ErrorKind>,
    /// Authentication method used by the client.
    pub(crate) auth_method: Option<AuthMethod>,
    /// Issuer of the JWT the client authenticated with, if any.
    jwt_issuer: Option<String>,
    /// Set by `RequestContext::set_success`.
    success: bool,
    /// Cold start info; also forwarded to the latency timer.
    pub(crate) cold_start_info: ColdStartInfo,
    /// Full startup parameters as sent by the client.
    pg_options: Option<StartupMessageParams>,
    /// Query id parsed from a `neon_query_id:` token in the `options` startup parameter.
    testodrome_query_id: Option<SmolStr>,

    // extra
    // This sender is here to keep the request monitoring channel open while requests are taking place.
    sender: Option<mpsc::UnboundedSender<RequestData>>,
    // This sender is only used to log the length of session in case of success.
    disconnect_sender: Option<mpsc::UnboundedSender<RequestData>>,
    /// Proxy latency timer for this request (a no-op timer for cloned contexts).
    pub(crate) latency_timer: LatencyTimer,
    /// Time at which the disconnect record was produced, if it has been.
    disconnect_timestamp: Option<chrono::DateTime<Utc>>,
}
72 :
/// Authentication mechanism the client used, as recorded on the request context.
#[derive(Debug, Clone)]
pub(crate) enum AuthMethod {
    /// Console redirect flow (a.k.a. "link" authentication).
    ConsoleRedirect,
    /// SCRAM-SHA-256.
    ScramSha256,
    /// SCRAM-SHA-256-PLUS.
    ScramSha256Plus,
    /// Cleartext password.
    Cleartext,
    /// JSON Web Token.
    Jwt,
}
82 :
83 : impl Clone for RequestContext {
84 0 : fn clone(&self) -> Self {
85 0 : let inner = self.0.try_lock().expect("should not deadlock");
86 0 : let new = RequestContextInner {
87 0 : conn_info: inner.conn_info.clone(),
88 0 : session_id: inner.session_id,
89 0 : protocol: inner.protocol,
90 0 : first_packet: inner.first_packet,
91 0 : span: info_span!("background_task"),
92 :
93 0 : project: inner.project,
94 0 : branch: inner.branch,
95 0 : endpoint_id: inner.endpoint_id.clone(),
96 0 : dbname: inner.dbname.clone(),
97 0 : user: inner.user.clone(),
98 0 : application: inner.application.clone(),
99 0 : user_agent: inner.user_agent.clone(),
100 0 : error_kind: inner.error_kind,
101 0 : auth_method: inner.auth_method.clone(),
102 0 : jwt_issuer: inner.jwt_issuer.clone(),
103 0 : success: inner.success,
104 0 : cold_start_info: inner.cold_start_info,
105 0 : pg_options: inner.pg_options.clone(),
106 0 : testodrome_query_id: inner.testodrome_query_id.clone(),
107 :
108 0 : sender: None,
109 0 : disconnect_sender: None,
110 0 : latency_timer: LatencyTimer::noop(inner.protocol),
111 0 : disconnect_timestamp: inner.disconnect_timestamp,
112 : };
113 :
114 0 : Self(TryLock::new(new))
115 0 : }
116 : }
117 :
118 : impl RequestContext {
119 77 : pub fn new(session_id: Uuid, conn_info: ConnectionInfo, protocol: Protocol) -> Self {
120 : // TODO: be careful with long lived spans
121 77 : let span = info_span!(
122 : "connect_request",
123 : %protocol,
124 : ?session_id,
125 : %conn_info,
126 : ep = tracing::field::Empty,
127 : role = tracing::field::Empty,
128 : );
129 :
130 77 : let inner = RequestContextInner {
131 77 : conn_info,
132 77 : session_id,
133 77 : protocol,
134 77 : first_packet: Utc::now(),
135 77 : span,
136 :
137 77 : project: None,
138 77 : branch: None,
139 77 : endpoint_id: None,
140 77 : dbname: None,
141 77 : user: None,
142 77 : application: None,
143 77 : user_agent: None,
144 77 : error_kind: None,
145 77 : auth_method: None,
146 77 : jwt_issuer: None,
147 : success: false,
148 77 : cold_start_info: ColdStartInfo::Unknown,
149 77 : pg_options: None,
150 77 : testodrome_query_id: None,
151 :
152 77 : sender: LOG_CHAN.get().and_then(|tx| tx.upgrade()),
153 77 : disconnect_sender: LOG_CHAN_DISCONNECT.get().and_then(|tx| tx.upgrade()),
154 77 : latency_timer: LatencyTimer::new(protocol),
155 77 : disconnect_timestamp: None,
156 : };
157 :
158 77 : Self(TryLock::new(inner))
159 77 : }
160 :
161 : #[cfg(test)]
162 77 : pub(crate) fn test() -> Self {
163 : use std::net::SocketAddr;
164 77 : let ip = IpAddr::from([127, 0, 0, 1]);
165 77 : let addr = SocketAddr::new(ip, 5432);
166 77 : let conn_info = ConnectionInfo { addr, extra: None };
167 77 : RequestContext::new(Uuid::now_v7(), conn_info, Protocol::Tcp)
168 77 : }
169 :
170 0 : pub(crate) fn console_application_name(&self) -> String {
171 0 : let this = self.0.try_lock().expect("should not deadlock");
172 0 : format!(
173 0 : "{}/{}",
174 0 : this.application.as_deref().unwrap_or_default(),
175 0 : this.protocol
176 : )
177 0 : }
178 :
179 0 : pub(crate) fn set_cold_start_info(&self, info: ColdStartInfo) {
180 0 : self.0
181 0 : .try_lock()
182 0 : .expect("should not deadlock")
183 0 : .set_cold_start_info(info);
184 0 : }
185 :
186 0 : pub(crate) fn set_db_options(&self, options: StartupMessageParams) {
187 0 : let mut this = self.0.try_lock().expect("should not deadlock");
188 0 : this.set_application(options.get("application_name").map(SmolStr::from));
189 0 : if let Some(user) = options.get("user") {
190 0 : this.set_user(user.into());
191 0 : }
192 0 : if let Some(dbname) = options.get("database") {
193 0 : this.set_dbname(dbname.into());
194 0 : }
195 :
196 : // Try to get testodrome_query_id directly from parameters
197 0 : if let Some(options_str) = options.get("options") {
198 : // If not found directly, try to extract it from the options string
199 0 : for option in options_str.split_whitespace() {
200 0 : if let Some(value) = option.strip_prefix("neon_query_id:") {
201 0 : this.set_testodrome_id(value.into());
202 0 : break;
203 0 : }
204 : }
205 0 : }
206 :
207 0 : this.pg_options = Some(options);
208 0 : }
209 :
210 0 : pub(crate) fn set_project(&self, x: MetricsAuxInfo) {
211 0 : let mut this = self.0.try_lock().expect("should not deadlock");
212 0 : if this.endpoint_id.is_none() {
213 0 : this.set_endpoint_id(x.endpoint_id.as_str().into());
214 0 : }
215 0 : this.branch = Some(x.branch_id);
216 0 : this.project = Some(x.project_id);
217 0 : this.set_cold_start_info(x.cold_start_info);
218 0 : }
219 :
220 0 : pub(crate) fn set_project_id(&self, project_id: ProjectIdInt) {
221 0 : let mut this = self.0.try_lock().expect("should not deadlock");
222 0 : this.project = Some(project_id);
223 0 : }
224 :
225 29 : pub(crate) fn set_endpoint_id(&self, endpoint_id: EndpointId) {
226 29 : self.0
227 29 : .try_lock()
228 29 : .expect("should not deadlock")
229 29 : .set_endpoint_id(endpoint_id);
230 29 : }
231 :
232 0 : pub(crate) fn set_dbname(&self, dbname: DbName) {
233 0 : self.0
234 0 : .try_lock()
235 0 : .expect("should not deadlock")
236 0 : .set_dbname(dbname);
237 0 : }
238 :
239 0 : pub(crate) fn set_user(&self, user: RoleName) {
240 0 : self.0
241 0 : .try_lock()
242 0 : .expect("should not deadlock")
243 0 : .set_user(user);
244 0 : }
245 :
246 0 : pub(crate) fn set_user_agent(&self, user_agent: Option<SmolStr>) {
247 0 : self.0
248 0 : .try_lock()
249 0 : .expect("should not deadlock")
250 0 : .set_user_agent(user_agent);
251 0 : }
252 :
253 0 : pub(crate) fn set_testodrome_id(&self, query_id: SmolStr) {
254 0 : self.0
255 0 : .try_lock()
256 0 : .expect("should not deadlock")
257 0 : .set_testodrome_id(query_id);
258 0 : }
259 :
260 15 : pub(crate) fn set_auth_method(&self, auth_method: AuthMethod) {
261 15 : let mut this = self.0.try_lock().expect("should not deadlock");
262 15 : this.auth_method = Some(auth_method);
263 15 : }
264 :
265 12 : pub(crate) fn set_jwt_issuer(&self, jwt_issuer: String) {
266 12 : let mut this = self.0.try_lock().expect("should not deadlock");
267 12 : this.jwt_issuer = Some(jwt_issuer);
268 12 : }
269 :
270 0 : pub fn has_private_peer_addr(&self) -> bool {
271 0 : self.0
272 0 : .try_lock()
273 0 : .expect("should not deadlock")
274 0 : .has_private_peer_addr()
275 0 : }
276 :
277 0 : pub(crate) fn set_error_kind(&self, kind: ErrorKind) {
278 0 : let mut this = self.0.try_lock().expect("should not deadlock");
279 : // Do not record errors from the private address to metrics.
280 0 : if !this.has_private_peer_addr() {
281 0 : Metrics::get().proxy.errors_total.inc(kind);
282 0 : }
283 0 : if let Some(ep) = &this.endpoint_id {
284 0 : let metric = &Metrics::get().proxy.endpoints_affected_by_errors;
285 0 : let label = metric.with_labels(kind);
286 0 : metric.get_metric(label).measure(ep);
287 0 : }
288 0 : this.error_kind = Some(kind);
289 0 : }
290 :
291 0 : pub fn set_success(&self) {
292 0 : let mut this = self.0.try_lock().expect("should not deadlock");
293 0 : this.success = true;
294 0 : }
295 :
296 0 : pub fn log_connect(self) -> DisconnectLogger {
297 0 : let mut this = self.0.into_inner();
298 0 : this.log_connect();
299 :
300 : // close current span.
301 0 : this.span = Span::none();
302 :
303 0 : DisconnectLogger(this)
304 0 : }
305 :
306 16 : pub(crate) fn protocol(&self) -> Protocol {
307 16 : self.0.try_lock().expect("should not deadlock").protocol
308 16 : }
309 :
310 0 : pub(crate) fn span(&self) -> Span {
311 0 : self.0.try_lock().expect("should not deadlock").span.clone()
312 0 : }
313 :
314 0 : pub(crate) fn session_id(&self) -> Uuid {
315 0 : self.0.try_lock().expect("should not deadlock").session_id
316 0 : }
317 :
318 3 : pub(crate) fn peer_addr(&self) -> IpAddr {
319 3 : self.0
320 3 : .try_lock()
321 3 : .expect("should not deadlock")
322 3 : .conn_info
323 3 : .addr
324 3 : .ip()
325 3 : }
326 :
327 0 : pub(crate) fn extra(&self) -> Option<ConnectionInfoExtra> {
328 0 : self.0
329 0 : .try_lock()
330 0 : .expect("should not deadlock")
331 0 : .conn_info
332 0 : .extra
333 0 : .clone()
334 0 : }
335 :
336 0 : pub(crate) fn cold_start_info(&self) -> ColdStartInfo {
337 0 : self.0
338 0 : .try_lock()
339 0 : .expect("should not deadlock")
340 0 : .cold_start_info
341 0 : }
342 :
343 52 : pub(crate) fn latency_timer_pause(&self, waiting_for: Waiting) -> LatencyTimerPause<'_> {
344 52 : LatencyTimerPause {
345 52 : ctx: self,
346 52 : start: tokio::time::Instant::now(),
347 52 : waiting_for,
348 52 : }
349 52 : }
350 :
351 0 : pub(crate) fn latency_timer_pause_at(
352 0 : &self,
353 0 : at: tokio::time::Instant,
354 0 : waiting_for: Waiting,
355 0 : ) -> LatencyTimerPause<'_> {
356 0 : LatencyTimerPause {
357 0 : ctx: self,
358 0 : start: at,
359 0 : waiting_for,
360 0 : }
361 0 : }
362 :
363 0 : pub(crate) fn get_proxy_latency(&self) -> LatencyAccumulated {
364 0 : self.0
365 0 : .try_lock()
366 0 : .expect("should not deadlock")
367 0 : .latency_timer
368 0 : .accumulated()
369 0 : }
370 :
371 0 : pub(crate) fn get_testodrome_id(&self) -> Option<SmolStr> {
372 0 : self.0
373 0 : .try_lock()
374 0 : .expect("should not deadlock")
375 0 : .testodrome_query_id
376 0 : .clone()
377 0 : }
378 :
379 8 : pub(crate) fn success(&self) {
380 8 : self.0
381 8 : .try_lock()
382 8 : .expect("should not deadlock")
383 8 : .latency_timer
384 8 : .success();
385 8 : }
386 : }
387 :
388 : pub(crate) struct LatencyTimerPause<'a> {
389 : ctx: &'a RequestContext,
390 : start: tokio::time::Instant,
391 : waiting_for: Waiting,
392 : }
393 :
394 : impl Drop for LatencyTimerPause<'_> {
395 52 : fn drop(&mut self) {
396 52 : self.ctx
397 52 : .0
398 52 : .try_lock()
399 52 : .expect("should not deadlock")
400 52 : .latency_timer
401 52 : .unpause(self.start, self.waiting_for);
402 52 : }
403 : }
404 :
405 : impl RequestContextInner {
406 0 : fn set_cold_start_info(&mut self, info: ColdStartInfo) {
407 0 : self.cold_start_info = info;
408 0 : self.latency_timer.cold_start_info(info);
409 0 : }
410 :
411 29 : fn set_endpoint_id(&mut self, endpoint_id: EndpointId) {
412 29 : if self.endpoint_id.is_none() {
413 29 : self.span.record("ep", display(&endpoint_id));
414 29 : let metric = &Metrics::get().proxy.connecting_endpoints;
415 29 : let label = metric.with_labels(self.protocol);
416 29 : metric.get_metric(label).measure(&endpoint_id);
417 29 : self.endpoint_id = Some(endpoint_id);
418 29 : }
419 29 : }
420 :
421 0 : fn set_application(&mut self, app: Option<SmolStr>) {
422 0 : if let Some(app) = app {
423 0 : self.application = Some(app);
424 0 : }
425 0 : }
426 :
427 0 : fn set_user_agent(&mut self, user_agent: Option<SmolStr>) {
428 0 : self.user_agent = user_agent;
429 0 : }
430 :
431 0 : fn set_dbname(&mut self, dbname: DbName) {
432 0 : self.dbname = Some(dbname);
433 0 : }
434 :
435 0 : fn set_user(&mut self, user: RoleName) {
436 0 : self.span.record("role", display(&user));
437 0 : self.user = Some(user);
438 0 : }
439 :
440 0 : fn set_testodrome_id(&mut self, query_id: SmolStr) {
441 0 : self.testodrome_query_id = Some(query_id);
442 0 : }
443 :
444 0 : fn has_private_peer_addr(&self) -> bool {
445 0 : match self.conn_info.addr.ip() {
446 0 : IpAddr::V4(ip) => ip.is_private(),
447 0 : IpAddr::V6(_) => false,
448 : }
449 0 : }
450 :
451 0 : fn log_connect(&mut self) {
452 0 : if let Some(tx) = self.sender.take() {
453 : // If type changes, this error handling needs to be updated.
454 0 : let tx: mpsc::UnboundedSender<RequestData> = tx;
455 0 : if let Err(e) = tx.send(RequestData::from(&*self)) {
456 0 : error!("log_connect channel send failed: {e}");
457 0 : }
458 0 : }
459 0 : }
460 :
461 0 : fn log_disconnect(&mut self) {
462 : // If we are here, it's guaranteed that the user successfully connected to the endpoint.
463 : // Here we log the length of the session.
464 0 : self.disconnect_timestamp = Some(Utc::now());
465 0 : if let Some(tx) = self.disconnect_sender.take() {
466 : // If type changes, this error handling needs to be updated.
467 0 : let tx: mpsc::UnboundedSender<RequestData> = tx;
468 0 : if let Err(e) = tx.send(RequestData::from(&*self)) {
469 0 : error!("log_disconnect channel send failed: {e}");
470 0 : }
471 0 : }
472 0 : }
473 : }
474 :
475 : impl Drop for RequestContextInner {
476 77 : fn drop(&mut self) {
477 77 : if self.sender.is_some() {
478 0 : self.log_connect();
479 77 : }
480 77 : }
481 : }
482 :
483 : pub struct DisconnectLogger(RequestContextInner);
484 :
485 : impl Drop for DisconnectLogger {
486 0 : fn drop(&mut self) {
487 0 : self.0.log_disconnect();
488 0 : }
489 : }
|