Line data Source code
1 : //! Connection request monitoring contexts
2 :
3 : use std::net::IpAddr;
4 :
5 : use chrono::Utc;
6 : use once_cell::sync::OnceCell;
7 : use pq_proto::StartupMessageParams;
8 : use smol_str::SmolStr;
9 : use tokio::sync::mpsc;
10 : use tracing::field::display;
11 : use tracing::{debug, info_span, Span};
12 : use try_lock::TryLock;
13 : use uuid::Uuid;
14 :
15 : use self::parquet::RequestData;
16 : use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
17 : use crate::error::ErrorKind;
18 : use crate::intern::{BranchIdInt, ProjectIdInt};
19 : use crate::metrics::{
20 : ConnectOutcome, InvalidEndpointsGroup, LatencyTimer, Metrics, Protocol, Waiting,
21 : };
22 : use crate::protocol2::ConnectionInfo;
23 : use crate::types::{DbName, EndpointId, RoleName};
24 :
25 : pub mod parquet;
26 :
27 : pub(crate) static LOG_CHAN: OnceCell<mpsc::WeakUnboundedSender<RequestData>> = OnceCell::new();
28 : pub(crate) static LOG_CHAN_DISCONNECT: OnceCell<mpsc::WeakUnboundedSender<RequestData>> =
29 : OnceCell::new();
30 :
31 : /// Context data for a single request to connect to a database.
32 : ///
33 : /// This data should **not** be used for connection logic, only for observability and limiting purposes.
34 : /// All connection logic should instead use strongly typed state machines, not a bunch of Options.
35 : pub struct RequestContext(
36 : /// To allow easier use of the ctx object, we have interior mutability.
37 : /// I would typically use a RefCell but that would break the `Send` requirements
38 : /// so we need something with thread-safety. `TryLock` is a cheap alternative
39 : /// that offers similar semantics to a `RefCell` but with synchronisation.
40 : TryLock<RequestContextInner>,
41 : );
42 :
43 : struct RequestContextInner {
44 : pub(crate) conn_info: ConnectionInfo,
45 : pub(crate) session_id: Uuid,
46 : pub(crate) protocol: Protocol,
47 : first_packet: chrono::DateTime<Utc>,
48 : region: &'static str,
49 : pub(crate) span: Span,
50 :
51 : // filled in as they are discovered
52 : project: Option<ProjectIdInt>,
53 : branch: Option<BranchIdInt>,
54 : endpoint_id: Option<EndpointId>,
55 : dbname: Option<DbName>,
56 : user: Option<RoleName>,
57 : application: Option<SmolStr>,
58 : error_kind: Option<ErrorKind>,
59 : pub(crate) auth_method: Option<AuthMethod>,
60 : success: bool,
61 : pub(crate) cold_start_info: ColdStartInfo,
62 : pg_options: Option<StartupMessageParams>,
63 :
64 : // extra
65 : // This sender is here to keep the request monitoring channel open while requests are taking place.
66 : sender: Option<mpsc::UnboundedSender<RequestData>>,
67 : // This sender is only used to log the length of session in case of success.
68 : disconnect_sender: Option<mpsc::UnboundedSender<RequestData>>,
69 : pub(crate) latency_timer: LatencyTimer,
70 : // Whether proxy decided that it's not a valid endpoint end rejected it before going to cplane.
71 : rejected: Option<bool>,
72 : disconnect_timestamp: Option<chrono::DateTime<Utc>>,
73 : }
74 :
/// Authentication method recorded for a request.
#[derive(Debug, Clone)]
pub(crate) enum AuthMethod {
    /// Passwordless authentication (formerly known as "link").
    ConsoleRedirect,
    /// SCRAM-SHA-256.
    ScramSha256,
    /// SCRAM-SHA-256-PLUS (SCRAM with channel binding).
    ScramSha256Plus,
    /// Cleartext password.
    Cleartext,
}
83 :
84 : impl Clone for RequestContext {
85 0 : fn clone(&self) -> Self {
86 0 : let inner = self.0.try_lock().expect("should not deadlock");
87 0 : let new = RequestContextInner {
88 0 : conn_info: inner.conn_info.clone(),
89 0 : session_id: inner.session_id,
90 0 : protocol: inner.protocol,
91 0 : first_packet: inner.first_packet,
92 0 : region: inner.region,
93 0 : span: info_span!("background_task"),
94 :
95 0 : project: inner.project,
96 0 : branch: inner.branch,
97 0 : endpoint_id: inner.endpoint_id.clone(),
98 0 : dbname: inner.dbname.clone(),
99 0 : user: inner.user.clone(),
100 0 : application: inner.application.clone(),
101 0 : error_kind: inner.error_kind,
102 0 : auth_method: inner.auth_method.clone(),
103 0 : success: inner.success,
104 0 : rejected: inner.rejected,
105 0 : cold_start_info: inner.cold_start_info,
106 0 : pg_options: inner.pg_options.clone(),
107 0 :
108 0 : sender: None,
109 0 : disconnect_sender: None,
110 0 : latency_timer: LatencyTimer::noop(inner.protocol),
111 0 : disconnect_timestamp: inner.disconnect_timestamp,
112 0 : };
113 0 :
114 0 : Self(TryLock::new(new))
115 0 : }
116 : }
117 :
118 : impl RequestContext {
119 70 : pub fn new(
120 70 : session_id: Uuid,
121 70 : conn_info: ConnectionInfo,
122 70 : protocol: Protocol,
123 70 : region: &'static str,
124 70 : ) -> Self {
125 : // TODO: be careful with long lived spans
126 70 : let span = info_span!(
127 70 : "connect_request",
128 70 : %protocol,
129 70 : ?session_id,
130 70 : %conn_info,
131 70 : ep = tracing::field::Empty,
132 70 : role = tracing::field::Empty,
133 70 : );
134 :
135 70 : let inner = RequestContextInner {
136 70 : conn_info,
137 70 : session_id,
138 70 : protocol,
139 70 : first_packet: Utc::now(),
140 70 : region,
141 70 : span,
142 70 :
143 70 : project: None,
144 70 : branch: None,
145 70 : endpoint_id: None,
146 70 : dbname: None,
147 70 : user: None,
148 70 : application: None,
149 70 : error_kind: None,
150 70 : auth_method: None,
151 70 : success: false,
152 70 : rejected: None,
153 70 : cold_start_info: ColdStartInfo::Unknown,
154 70 : pg_options: None,
155 70 :
156 70 : sender: LOG_CHAN.get().and_then(|tx| tx.upgrade()),
157 70 : disconnect_sender: LOG_CHAN_DISCONNECT.get().and_then(|tx| tx.upgrade()),
158 70 : latency_timer: LatencyTimer::new(protocol),
159 70 : disconnect_timestamp: None,
160 70 : };
161 70 :
162 70 : Self(TryLock::new(inner))
163 70 : }
164 :
165 : #[cfg(test)]
166 70 : pub(crate) fn test() -> Self {
167 : use std::net::SocketAddr;
168 70 : let ip = IpAddr::from([127, 0, 0, 1]);
169 70 : let addr = SocketAddr::new(ip, 5432);
170 70 : let conn_info = ConnectionInfo { addr, extra: None };
171 70 : RequestContext::new(Uuid::now_v7(), conn_info, Protocol::Tcp, "test")
172 70 : }
173 :
174 0 : pub(crate) fn console_application_name(&self) -> String {
175 0 : let this = self.0.try_lock().expect("should not deadlock");
176 0 : format!(
177 0 : "{}/{}",
178 0 : this.application.as_deref().unwrap_or_default(),
179 0 : this.protocol
180 0 : )
181 0 : }
182 :
183 0 : pub(crate) fn set_rejected(&self, rejected: bool) {
184 0 : let mut this = self.0.try_lock().expect("should not deadlock");
185 0 : this.rejected = Some(rejected);
186 0 : }
187 :
188 0 : pub(crate) fn set_cold_start_info(&self, info: ColdStartInfo) {
189 0 : self.0
190 0 : .try_lock()
191 0 : .expect("should not deadlock")
192 0 : .set_cold_start_info(info);
193 0 : }
194 :
195 0 : pub(crate) fn set_db_options(&self, options: StartupMessageParams) {
196 0 : let mut this = self.0.try_lock().expect("should not deadlock");
197 0 : this.set_application(options.get("application_name").map(SmolStr::from));
198 0 : if let Some(user) = options.get("user") {
199 0 : this.set_user(user.into());
200 0 : }
201 0 : if let Some(dbname) = options.get("database") {
202 0 : this.set_dbname(dbname.into());
203 0 : }
204 :
205 0 : this.pg_options = Some(options);
206 0 : }
207 :
208 0 : pub(crate) fn set_project(&self, x: MetricsAuxInfo) {
209 0 : let mut this = self.0.try_lock().expect("should not deadlock");
210 0 : if this.endpoint_id.is_none() {
211 0 : this.set_endpoint_id(x.endpoint_id.as_str().into());
212 0 : }
213 0 : this.branch = Some(x.branch_id);
214 0 : this.project = Some(x.project_id);
215 0 : this.set_cold_start_info(x.cold_start_info);
216 0 : }
217 :
218 0 : pub(crate) fn set_project_id(&self, project_id: ProjectIdInt) {
219 0 : let mut this = self.0.try_lock().expect("should not deadlock");
220 0 : this.project = Some(project_id);
221 0 : }
222 :
223 28 : pub(crate) fn set_endpoint_id(&self, endpoint_id: EndpointId) {
224 28 : self.0
225 28 : .try_lock()
226 28 : .expect("should not deadlock")
227 28 : .set_endpoint_id(endpoint_id);
228 28 : }
229 :
230 0 : pub(crate) fn set_dbname(&self, dbname: DbName) {
231 0 : self.0
232 0 : .try_lock()
233 0 : .expect("should not deadlock")
234 0 : .set_dbname(dbname);
235 0 : }
236 :
237 0 : pub(crate) fn set_user(&self, user: RoleName) {
238 0 : self.0
239 0 : .try_lock()
240 0 : .expect("should not deadlock")
241 0 : .set_user(user);
242 0 : }
243 :
244 15 : pub(crate) fn set_auth_method(&self, auth_method: AuthMethod) {
245 15 : let mut this = self.0.try_lock().expect("should not deadlock");
246 15 : this.auth_method = Some(auth_method);
247 15 : }
248 :
249 0 : pub fn has_private_peer_addr(&self) -> bool {
250 0 : self.0
251 0 : .try_lock()
252 0 : .expect("should not deadlock")
253 0 : .has_private_peer_addr()
254 0 : }
255 :
256 0 : pub(crate) fn set_error_kind(&self, kind: ErrorKind) {
257 0 : let mut this = self.0.try_lock().expect("should not deadlock");
258 0 : // Do not record errors from the private address to metrics.
259 0 : if !this.has_private_peer_addr() {
260 0 : Metrics::get().proxy.errors_total.inc(kind);
261 0 : }
262 0 : if let Some(ep) = &this.endpoint_id {
263 0 : let metric = &Metrics::get().proxy.endpoints_affected_by_errors;
264 0 : let label = metric.with_labels(kind);
265 0 : metric.get_metric(label).measure(ep);
266 0 : }
267 0 : this.error_kind = Some(kind);
268 0 : }
269 :
270 0 : pub fn set_success(&self) {
271 0 : let mut this = self.0.try_lock().expect("should not deadlock");
272 0 : this.success = true;
273 0 : }
274 :
275 0 : pub fn log_connect(&self) {
276 0 : self.0
277 0 : .try_lock()
278 0 : .expect("should not deadlock")
279 0 : .log_connect();
280 0 : }
281 :
282 0 : pub(crate) fn protocol(&self) -> Protocol {
283 0 : self.0.try_lock().expect("should not deadlock").protocol
284 0 : }
285 :
286 0 : pub(crate) fn span(&self) -> Span {
287 0 : self.0.try_lock().expect("should not deadlock").span.clone()
288 0 : }
289 :
290 0 : pub(crate) fn session_id(&self) -> Uuid {
291 0 : self.0.try_lock().expect("should not deadlock").session_id
292 0 : }
293 :
294 6 : pub(crate) fn peer_addr(&self) -> IpAddr {
295 6 : self.0
296 6 : .try_lock()
297 6 : .expect("should not deadlock")
298 6 : .conn_info
299 6 : .addr
300 6 : .ip()
301 6 : }
302 :
303 0 : pub(crate) fn cold_start_info(&self) -> ColdStartInfo {
304 0 : self.0
305 0 : .try_lock()
306 0 : .expect("should not deadlock")
307 0 : .cold_start_info
308 0 : }
309 :
310 28 : pub(crate) fn latency_timer_pause(&self, waiting_for: Waiting) -> LatencyTimerPause<'_> {
311 28 : LatencyTimerPause {
312 28 : ctx: self,
313 28 : start: tokio::time::Instant::now(),
314 28 : waiting_for,
315 28 : }
316 28 : }
317 :
318 4 : pub(crate) fn success(&self) {
319 4 : self.0
320 4 : .try_lock()
321 4 : .expect("should not deadlock")
322 4 : .latency_timer
323 4 : .success();
324 4 : }
325 : }
326 :
327 : pub(crate) struct LatencyTimerPause<'a> {
328 : ctx: &'a RequestContext,
329 : start: tokio::time::Instant,
330 : waiting_for: Waiting,
331 : }
332 :
333 : impl Drop for LatencyTimerPause<'_> {
334 28 : fn drop(&mut self) {
335 28 : self.ctx
336 28 : .0
337 28 : .try_lock()
338 28 : .expect("should not deadlock")
339 28 : .latency_timer
340 28 : .unpause(self.start, self.waiting_for);
341 28 : }
342 : }
343 :
344 : impl RequestContextInner {
345 0 : fn set_cold_start_info(&mut self, info: ColdStartInfo) {
346 0 : self.cold_start_info = info;
347 0 : self.latency_timer.cold_start_info(info);
348 0 : }
349 :
350 28 : fn set_endpoint_id(&mut self, endpoint_id: EndpointId) {
351 28 : if self.endpoint_id.is_none() {
352 28 : self.span.record("ep", display(&endpoint_id));
353 28 : let metric = &Metrics::get().proxy.connecting_endpoints;
354 28 : let label = metric.with_labels(self.protocol);
355 28 : metric.get_metric(label).measure(&endpoint_id);
356 28 : self.endpoint_id = Some(endpoint_id);
357 28 : }
358 28 : }
359 :
360 0 : fn set_application(&mut self, app: Option<SmolStr>) {
361 0 : if let Some(app) = app {
362 0 : self.application = Some(app);
363 0 : }
364 0 : }
365 :
366 0 : fn set_dbname(&mut self, dbname: DbName) {
367 0 : self.dbname = Some(dbname);
368 0 : }
369 :
370 0 : fn set_user(&mut self, user: RoleName) {
371 0 : self.span.record("role", display(&user));
372 0 : self.user = Some(user);
373 0 : }
374 :
375 0 : fn has_private_peer_addr(&self) -> bool {
376 0 : match self.conn_info.addr.ip() {
377 0 : IpAddr::V4(ip) => ip.is_private(),
378 0 : IpAddr::V6(_) => false,
379 : }
380 0 : }
381 :
382 0 : fn log_connect(&mut self) {
383 0 : let outcome = if self.success {
384 0 : ConnectOutcome::Success
385 : } else {
386 0 : ConnectOutcome::Failed
387 : };
388 :
389 : // TODO: get rid of entirely/refactor
390 : // check for false positives
391 : // AND false negatives
392 0 : if let Some(rejected) = self.rejected {
393 0 : let ep = self
394 0 : .endpoint_id
395 0 : .as_ref()
396 0 : .map(|x| x.as_str())
397 0 : .unwrap_or_default();
398 0 : // This makes sense only if cache is disabled
399 0 : debug!(
400 : ?outcome,
401 : ?rejected,
402 : ?ep,
403 0 : "check endpoint is valid with outcome"
404 : );
405 0 : Metrics::get()
406 0 : .proxy
407 0 : .invalid_endpoints_total
408 0 : .inc(InvalidEndpointsGroup {
409 0 : protocol: self.protocol,
410 0 : rejected: rejected.into(),
411 0 : outcome,
412 0 : });
413 0 : }
414 0 : if let Some(tx) = self.sender.take() {
415 0 : tx.send(RequestData::from(&*self))
416 0 : .inspect_err(|e| debug!("tx send failed: {e}"))
417 0 : .ok();
418 0 : }
419 0 : }
420 :
421 70 : fn log_disconnect(&mut self) {
422 70 : // If we are here, it's guaranteed that the user successfully connected to the endpoint.
423 70 : // Here we log the length of the session.
424 70 : self.disconnect_timestamp = Some(Utc::now());
425 70 : if let Some(tx) = self.disconnect_sender.take() {
426 0 : tx.send(RequestData::from(&*self))
427 0 : .inspect_err(|e| debug!("tx send failed: {e}"))
428 0 : .ok();
429 70 : }
430 70 : }
431 : }
432 :
433 : impl Drop for RequestContextInner {
434 70 : fn drop(&mut self) {
435 70 : if self.sender.is_some() {
436 0 : self.log_connect();
437 70 : } else {
438 70 : self.log_disconnect();
439 70 : }
440 70 : }
441 : }
|