Line data Source code
1 : //! Connection request monitoring contexts
2 :
3 : use std::net::IpAddr;
4 :
5 : use chrono::Utc;
6 : use once_cell::sync::OnceCell;
7 : use pq_proto::StartupMessageParams;
8 : use smol_str::SmolStr;
9 : use tokio::sync::mpsc;
10 : use tracing::field::display;
11 : use tracing::{debug, info, info_span, Span};
12 : use try_lock::TryLock;
13 : use uuid::Uuid;
14 :
15 : use self::parquet::RequestData;
16 : use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
17 : use crate::error::ErrorKind;
18 : use crate::intern::{BranchIdInt, ProjectIdInt};
19 : use crate::metrics::{
20 : ConnectOutcome, InvalidEndpointsGroup, LatencyTimer, Metrics, Protocol, Waiting,
21 : };
22 : use crate::{DbName, EndpointId, RoleName};
23 :
24 : pub mod parquet;
25 :
26 : pub(crate) static LOG_CHAN: OnceCell<mpsc::WeakUnboundedSender<RequestData>> = OnceCell::new();
27 : pub(crate) static LOG_CHAN_DISCONNECT: OnceCell<mpsc::WeakUnboundedSender<RequestData>> =
28 : OnceCell::new();
29 :
30 : /// Context data for a single request to connect to a database.
31 : ///
32 : /// This data should **not** be used for connection logic, only for observability and limiting purposes.
33 : /// All connection logic should instead use strongly typed state machines, not a bunch of Options.
34 : pub struct RequestMonitoring(
35 : /// To allow easier use of the ctx object, we have interior mutability.
36 : /// I would typically use a RefCell but that would break the `Send` requirements
37 : /// so we need something with thread-safety. `TryLock` is a cheap alternative
38 : /// that offers similar semantics to a `RefCell` but with synchronisation.
39 : TryLock<RequestMonitoringInner>,
40 : );
41 :
42 : struct RequestMonitoringInner {
43 : pub(crate) peer_addr: IpAddr,
44 : pub(crate) session_id: Uuid,
45 : pub(crate) protocol: Protocol,
46 : first_packet: chrono::DateTime<Utc>,
47 : region: &'static str,
48 : pub(crate) span: Span,
49 :
50 : // filled in as they are discovered
51 : project: Option<ProjectIdInt>,
52 : branch: Option<BranchIdInt>,
53 : endpoint_id: Option<EndpointId>,
54 : dbname: Option<DbName>,
55 : user: Option<RoleName>,
56 : application: Option<SmolStr>,
57 : error_kind: Option<ErrorKind>,
58 : pub(crate) auth_method: Option<AuthMethod>,
59 : success: bool,
60 : pub(crate) cold_start_info: ColdStartInfo,
61 : pg_options: Option<StartupMessageParams>,
62 :
63 : // extra
64 : // This sender is here to keep the request monitoring channel open while requests are taking place.
65 : sender: Option<mpsc::UnboundedSender<RequestData>>,
66 : // This sender is only used to log the length of session in case of success.
67 : disconnect_sender: Option<mpsc::UnboundedSender<RequestData>>,
68 : pub(crate) latency_timer: LatencyTimer,
69 : // Whether proxy decided that it's not a valid endpoint end rejected it before going to cplane.
70 : rejected: Option<bool>,
71 : disconnect_timestamp: Option<chrono::DateTime<Utc>>,
72 : }
73 :
/// Authentication method recorded on a request context via `set_auth_method`.
#[derive(Clone, Debug)]
pub(crate) enum AuthMethod {
    // aka passwordless, fka link
    Web,
    ScramSha256,
    ScramSha256Plus,
    Cleartext,
}
82 :
83 : impl Clone for RequestMonitoring {
84 0 : fn clone(&self) -> Self {
85 0 : let inner = self.0.try_lock().expect("should not deadlock");
86 0 : let new = RequestMonitoringInner {
87 0 : peer_addr: inner.peer_addr,
88 0 : session_id: inner.session_id,
89 0 : protocol: inner.protocol,
90 0 : first_packet: inner.first_packet,
91 0 : region: inner.region,
92 0 : span: info_span!("background_task"),
93 :
94 0 : project: inner.project,
95 0 : branch: inner.branch,
96 0 : endpoint_id: inner.endpoint_id.clone(),
97 0 : dbname: inner.dbname.clone(),
98 0 : user: inner.user.clone(),
99 0 : application: inner.application.clone(),
100 0 : error_kind: inner.error_kind,
101 0 : auth_method: inner.auth_method.clone(),
102 0 : success: inner.success,
103 0 : rejected: inner.rejected,
104 0 : cold_start_info: inner.cold_start_info,
105 0 : pg_options: inner.pg_options.clone(),
106 0 :
107 0 : sender: None,
108 0 : disconnect_sender: None,
109 0 : latency_timer: LatencyTimer::noop(inner.protocol),
110 0 : disconnect_timestamp: inner.disconnect_timestamp,
111 0 : };
112 0 :
113 0 : Self(TryLock::new(new))
114 0 : }
115 : }
116 :
117 : impl RequestMonitoring {
118 67 : pub fn new(
119 67 : session_id: Uuid,
120 67 : peer_addr: IpAddr,
121 67 : protocol: Protocol,
122 67 : region: &'static str,
123 67 : ) -> Self {
124 67 : let span = info_span!(
125 67 : "connect_request",
126 67 : %protocol,
127 67 : ?session_id,
128 67 : %peer_addr,
129 67 : ep = tracing::field::Empty,
130 67 : role = tracing::field::Empty,
131 67 : );
132 :
133 67 : let inner = RequestMonitoringInner {
134 67 : peer_addr,
135 67 : session_id,
136 67 : protocol,
137 67 : first_packet: Utc::now(),
138 67 : region,
139 67 : span,
140 67 :
141 67 : project: None,
142 67 : branch: None,
143 67 : endpoint_id: None,
144 67 : dbname: None,
145 67 : user: None,
146 67 : application: None,
147 67 : error_kind: None,
148 67 : auth_method: None,
149 67 : success: false,
150 67 : rejected: None,
151 67 : cold_start_info: ColdStartInfo::Unknown,
152 67 : pg_options: None,
153 67 :
154 67 : sender: LOG_CHAN.get().and_then(|tx| tx.upgrade()),
155 67 : disconnect_sender: LOG_CHAN_DISCONNECT.get().and_then(|tx| tx.upgrade()),
156 67 : latency_timer: LatencyTimer::new(protocol),
157 67 : disconnect_timestamp: None,
158 67 : };
159 67 :
160 67 : Self(TryLock::new(inner))
161 67 : }
162 :
163 : #[cfg(test)]
164 67 : pub(crate) fn test() -> Self {
165 67 : RequestMonitoring::new(Uuid::now_v7(), [127, 0, 0, 1].into(), Protocol::Tcp, "test")
166 67 : }
167 :
168 0 : pub(crate) fn console_application_name(&self) -> String {
169 0 : let this = self.0.try_lock().expect("should not deadlock");
170 0 : format!(
171 0 : "{}/{}",
172 0 : this.application.as_deref().unwrap_or_default(),
173 0 : this.protocol
174 0 : )
175 0 : }
176 :
177 0 : pub(crate) fn set_rejected(&self, rejected: bool) {
178 0 : let mut this = self.0.try_lock().expect("should not deadlock");
179 0 : this.rejected = Some(rejected);
180 0 : }
181 :
182 0 : pub(crate) fn set_cold_start_info(&self, info: ColdStartInfo) {
183 0 : self.0
184 0 : .try_lock()
185 0 : .expect("should not deadlock")
186 0 : .set_cold_start_info(info);
187 0 : }
188 :
189 0 : pub(crate) fn set_db_options(&self, options: StartupMessageParams) {
190 0 : let mut this = self.0.try_lock().expect("should not deadlock");
191 0 : this.set_application(options.get("application_name").map(SmolStr::from));
192 0 : if let Some(user) = options.get("user") {
193 0 : this.set_user(user.into());
194 0 : }
195 0 : if let Some(dbname) = options.get("database") {
196 0 : this.set_dbname(dbname.into());
197 0 : }
198 :
199 0 : this.pg_options = Some(options);
200 0 : }
201 :
202 0 : pub(crate) fn set_project(&self, x: MetricsAuxInfo) {
203 0 : let mut this = self.0.try_lock().expect("should not deadlock");
204 0 : if this.endpoint_id.is_none() {
205 0 : this.set_endpoint_id(x.endpoint_id.as_str().into());
206 0 : }
207 0 : this.branch = Some(x.branch_id);
208 0 : this.project = Some(x.project_id);
209 0 : this.set_cold_start_info(x.cold_start_info);
210 0 : }
211 :
212 0 : pub(crate) fn set_project_id(&self, project_id: ProjectIdInt) {
213 0 : let mut this = self.0.try_lock().expect("should not deadlock");
214 0 : this.project = Some(project_id);
215 0 : }
216 :
217 28 : pub(crate) fn set_endpoint_id(&self, endpoint_id: EndpointId) {
218 28 : self.0
219 28 : .try_lock()
220 28 : .expect("should not deadlock")
221 28 : .set_endpoint_id(endpoint_id);
222 28 : }
223 :
224 0 : pub(crate) fn set_dbname(&self, dbname: DbName) {
225 0 : self.0
226 0 : .try_lock()
227 0 : .expect("should not deadlock")
228 0 : .set_dbname(dbname);
229 0 : }
230 :
231 0 : pub(crate) fn set_user(&self, user: RoleName) {
232 0 : self.0
233 0 : .try_lock()
234 0 : .expect("should not deadlock")
235 0 : .set_user(user);
236 0 : }
237 :
238 15 : pub(crate) fn set_auth_method(&self, auth_method: AuthMethod) {
239 15 : let mut this = self.0.try_lock().expect("should not deadlock");
240 15 : this.auth_method = Some(auth_method);
241 15 : }
242 :
243 0 : pub fn has_private_peer_addr(&self) -> bool {
244 0 : self.0
245 0 : .try_lock()
246 0 : .expect("should not deadlock")
247 0 : .has_private_peer_addr()
248 0 : }
249 :
250 0 : pub(crate) fn set_error_kind(&self, kind: ErrorKind) {
251 0 : let mut this = self.0.try_lock().expect("should not deadlock");
252 0 : // Do not record errors from the private address to metrics.
253 0 : if !this.has_private_peer_addr() {
254 0 : Metrics::get().proxy.errors_total.inc(kind);
255 0 : }
256 0 : if let Some(ep) = &this.endpoint_id {
257 0 : let metric = &Metrics::get().proxy.endpoints_affected_by_errors;
258 0 : let label = metric.with_labels(kind);
259 0 : metric.get_metric(label).measure(ep);
260 0 : }
261 0 : this.error_kind = Some(kind);
262 0 : }
263 :
264 0 : pub fn set_success(&self) {
265 0 : let mut this = self.0.try_lock().expect("should not deadlock");
266 0 : this.success = true;
267 0 : }
268 :
269 0 : pub fn log_connect(&self) {
270 0 : self.0
271 0 : .try_lock()
272 0 : .expect("should not deadlock")
273 0 : .log_connect();
274 0 : }
275 :
276 0 : pub(crate) fn protocol(&self) -> Protocol {
277 0 : self.0.try_lock().expect("should not deadlock").protocol
278 0 : }
279 :
280 0 : pub(crate) fn span(&self) -> Span {
281 0 : self.0.try_lock().expect("should not deadlock").span.clone()
282 0 : }
283 :
284 0 : pub(crate) fn session_id(&self) -> Uuid {
285 0 : self.0.try_lock().expect("should not deadlock").session_id
286 0 : }
287 :
288 6 : pub(crate) fn peer_addr(&self) -> IpAddr {
289 6 : self.0.try_lock().expect("should not deadlock").peer_addr
290 6 : }
291 :
292 0 : pub(crate) fn cold_start_info(&self) -> ColdStartInfo {
293 0 : self.0
294 0 : .try_lock()
295 0 : .expect("should not deadlock")
296 0 : .cold_start_info
297 0 : }
298 :
299 22 : pub(crate) fn latency_timer_pause(&self, waiting_for: Waiting) -> LatencyTimerPause<'_> {
300 22 : LatencyTimerPause {
301 22 : ctx: self,
302 22 : start: tokio::time::Instant::now(),
303 22 : waiting_for,
304 22 : }
305 22 : }
306 :
307 4 : pub(crate) fn success(&self) {
308 4 : self.0
309 4 : .try_lock()
310 4 : .expect("should not deadlock")
311 4 : .latency_timer
312 4 : .success();
313 4 : }
314 : }
315 :
316 : pub(crate) struct LatencyTimerPause<'a> {
317 : ctx: &'a RequestMonitoring,
318 : start: tokio::time::Instant,
319 : waiting_for: Waiting,
320 : }
321 :
322 : impl Drop for LatencyTimerPause<'_> {
323 22 : fn drop(&mut self) {
324 22 : self.ctx
325 22 : .0
326 22 : .try_lock()
327 22 : .expect("should not deadlock")
328 22 : .latency_timer
329 22 : .unpause(self.start, self.waiting_for);
330 22 : }
331 : }
332 :
333 : impl RequestMonitoringInner {
334 0 : fn set_cold_start_info(&mut self, info: ColdStartInfo) {
335 0 : self.cold_start_info = info;
336 0 : self.latency_timer.cold_start_info(info);
337 0 : }
338 :
339 28 : fn set_endpoint_id(&mut self, endpoint_id: EndpointId) {
340 28 : if self.endpoint_id.is_none() {
341 28 : self.span.record("ep", display(&endpoint_id));
342 28 : let metric = &Metrics::get().proxy.connecting_endpoints;
343 28 : let label = metric.with_labels(self.protocol);
344 28 : metric.get_metric(label).measure(&endpoint_id);
345 28 : self.endpoint_id = Some(endpoint_id);
346 28 : }
347 28 : }
348 :
349 0 : fn set_application(&mut self, app: Option<SmolStr>) {
350 0 : if let Some(app) = app {
351 0 : self.application = Some(app);
352 0 : }
353 0 : }
354 :
355 0 : fn set_dbname(&mut self, dbname: DbName) {
356 0 : self.dbname = Some(dbname);
357 0 : }
358 :
359 0 : fn set_user(&mut self, user: RoleName) {
360 0 : self.span.record("role", display(&user));
361 0 : self.user = Some(user);
362 0 : }
363 :
364 0 : fn has_private_peer_addr(&self) -> bool {
365 0 : match self.peer_addr {
366 0 : IpAddr::V4(ip) => ip.is_private(),
367 0 : IpAddr::V6(_) => false,
368 : }
369 0 : }
370 :
371 0 : fn log_connect(&mut self) {
372 0 : let outcome = if self.success {
373 0 : ConnectOutcome::Success
374 : } else {
375 0 : ConnectOutcome::Failed
376 : };
377 0 : if let Some(rejected) = self.rejected {
378 0 : let ep = self
379 0 : .endpoint_id
380 0 : .as_ref()
381 0 : .map(|x| x.as_str())
382 0 : .unwrap_or_default();
383 0 : // This makes sense only if cache is disabled
384 0 : info!(
385 : ?outcome,
386 : ?rejected,
387 : ?ep,
388 0 : "check endpoint is valid with outcome"
389 : );
390 0 : Metrics::get()
391 0 : .proxy
392 0 : .invalid_endpoints_total
393 0 : .inc(InvalidEndpointsGroup {
394 0 : protocol: self.protocol,
395 0 : rejected: rejected.into(),
396 0 : outcome,
397 0 : });
398 0 : }
399 0 : if let Some(tx) = self.sender.take() {
400 0 : tx.send(RequestData::from(&*self))
401 0 : .inspect_err(|e| debug!("tx send failed: {e}"))
402 0 : .ok();
403 0 : }
404 0 : }
405 :
406 67 : fn log_disconnect(&mut self) {
407 67 : // If we are here, it's guaranteed that the user successfully connected to the endpoint.
408 67 : // Here we log the length of the session.
409 67 : self.disconnect_timestamp = Some(Utc::now());
410 67 : if let Some(tx) = self.disconnect_sender.take() {
411 0 : tx.send(RequestData::from(&*self))
412 0 : .inspect_err(|e| debug!("tx send failed: {e}"))
413 0 : .ok();
414 67 : }
415 67 : }
416 : }
417 :
418 : impl Drop for RequestMonitoringInner {
419 67 : fn drop(&mut self) {
420 67 : if self.sender.is_some() {
421 0 : self.log_connect();
422 67 : } else {
423 67 : self.log_disconnect();
424 67 : }
425 67 : }
426 : }
|