Line data Source code
1 : //! Connection request monitoring contexts
2 :
3 : use chrono::Utc;
4 : use once_cell::sync::OnceCell;
5 : use pq_proto::StartupMessageParams;
6 : use smol_str::SmolStr;
7 : use std::net::IpAddr;
8 : use tokio::sync::mpsc;
9 : use tracing::{debug, field::display, info, info_span, Span};
10 : use try_lock::TryLock;
11 : use uuid::Uuid;
12 :
13 : use crate::{
14 : console::messages::{ColdStartInfo, MetricsAuxInfo},
15 : error::ErrorKind,
16 : intern::{BranchIdInt, ProjectIdInt},
17 : metrics::{ConnectOutcome, InvalidEndpointsGroup, LatencyTimer, Metrics, Protocol, Waiting},
18 : DbName, EndpointId, RoleName,
19 : };
20 :
21 : use self::parquet::RequestData;
22 :
23 : pub mod parquet;
24 :
25 : pub(crate) static LOG_CHAN: OnceCell<mpsc::WeakUnboundedSender<RequestData>> = OnceCell::new();
26 : pub(crate) static LOG_CHAN_DISCONNECT: OnceCell<mpsc::WeakUnboundedSender<RequestData>> =
27 : OnceCell::new();
28 :
29 : /// Context data for a single request to connect to a database.
30 : ///
31 : /// This data should **not** be used for connection logic, only for observability and limiting purposes.
32 : /// All connection logic should instead use strongly typed state machines, not a bunch of Options.
33 : pub struct RequestMonitoring(
34 : /// To allow easier use of the ctx object, we have interior mutability.
35 : /// I would typically use a RefCell but that would break the `Send` requirements
36 : /// so we need something with thread-safety. `TryLock` is a cheap alternative
37 : /// that offers similar semantics to a `RefCell` but with synchronisation.
38 : TryLock<RequestMonitoringInner>,
39 : );
40 :
41 : struct RequestMonitoringInner {
42 : pub(crate) peer_addr: IpAddr,
43 : pub(crate) session_id: Uuid,
44 : pub(crate) protocol: Protocol,
45 : first_packet: chrono::DateTime<Utc>,
46 : region: &'static str,
47 : pub(crate) span: Span,
48 :
49 : // filled in as they are discovered
50 : project: Option<ProjectIdInt>,
51 : branch: Option<BranchIdInt>,
52 : endpoint_id: Option<EndpointId>,
53 : dbname: Option<DbName>,
54 : user: Option<RoleName>,
55 : application: Option<SmolStr>,
56 : error_kind: Option<ErrorKind>,
57 : pub(crate) auth_method: Option<AuthMethod>,
58 : success: bool,
59 : pub(crate) cold_start_info: ColdStartInfo,
60 : pg_options: Option<StartupMessageParams>,
61 :
62 : // extra
63 : // This sender is here to keep the request monitoring channel open while requests are taking place.
64 : sender: Option<mpsc::UnboundedSender<RequestData>>,
65 : // This sender is only used to log the length of session in case of success.
66 : disconnect_sender: Option<mpsc::UnboundedSender<RequestData>>,
67 : pub(crate) latency_timer: LatencyTimer,
68 : // Whether proxy decided that it's not a valid endpoint end rejected it before going to cplane.
69 : rejected: Option<bool>,
70 : disconnect_timestamp: Option<chrono::DateTime<Utc>>,
71 : }
72 :
/// Authentication method used by a connection request.
///
/// Equality is derived so callers and tests can compare methods directly instead of
/// pattern-matching or comparing `Debug` output.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) enum AuthMethod {
    // aka passwordless, fka link
    Web,
    ScramSha256,
    ScramSha256Plus,
    Cleartext,
}
81 :
82 : impl RequestMonitoring {
83 61 : pub fn new(
84 61 : session_id: Uuid,
85 61 : peer_addr: IpAddr,
86 61 : protocol: Protocol,
87 61 : region: &'static str,
88 61 : ) -> Self {
89 61 : let span = info_span!(
90 61 : "connect_request",
91 61 : %protocol,
92 61 : ?session_id,
93 61 : %peer_addr,
94 61 : ep = tracing::field::Empty,
95 61 : role = tracing::field::Empty,
96 61 : );
97 :
98 61 : let inner = RequestMonitoringInner {
99 61 : peer_addr,
100 61 : session_id,
101 61 : protocol,
102 61 : first_packet: Utc::now(),
103 61 : region,
104 61 : span,
105 61 :
106 61 : project: None,
107 61 : branch: None,
108 61 : endpoint_id: None,
109 61 : dbname: None,
110 61 : user: None,
111 61 : application: None,
112 61 : error_kind: None,
113 61 : auth_method: None,
114 61 : success: false,
115 61 : rejected: None,
116 61 : cold_start_info: ColdStartInfo::Unknown,
117 61 : pg_options: None,
118 61 :
119 61 : sender: LOG_CHAN.get().and_then(|tx| tx.upgrade()),
120 61 : disconnect_sender: LOG_CHAN_DISCONNECT.get().and_then(|tx| tx.upgrade()),
121 61 : latency_timer: LatencyTimer::new(protocol),
122 61 : disconnect_timestamp: None,
123 61 : };
124 61 :
125 61 : Self(TryLock::new(inner))
126 61 : }
127 :
128 : #[cfg(test)]
129 61 : pub(crate) fn test() -> Self {
130 61 : RequestMonitoring::new(Uuid::now_v7(), [127, 0, 0, 1].into(), Protocol::Tcp, "test")
131 61 : }
132 :
133 0 : pub(crate) fn console_application_name(&self) -> String {
134 0 : let this = self.0.try_lock().expect("should not deadlock");
135 0 : format!(
136 0 : "{}/{}",
137 0 : this.application.as_deref().unwrap_or_default(),
138 0 : this.protocol
139 0 : )
140 0 : }
141 :
142 0 : pub(crate) fn set_rejected(&self, rejected: bool) {
143 0 : let mut this = self.0.try_lock().expect("should not deadlock");
144 0 : this.rejected = Some(rejected);
145 0 : }
146 :
147 0 : pub(crate) fn set_cold_start_info(&self, info: ColdStartInfo) {
148 0 : self.0
149 0 : .try_lock()
150 0 : .expect("should not deadlock")
151 0 : .set_cold_start_info(info);
152 0 : }
153 :
154 0 : pub(crate) fn set_db_options(&self, options: StartupMessageParams) {
155 0 : let mut this = self.0.try_lock().expect("should not deadlock");
156 0 : this.set_application(options.get("application_name").map(SmolStr::from));
157 0 : if let Some(user) = options.get("user") {
158 0 : this.set_user(user.into());
159 0 : }
160 0 : if let Some(dbname) = options.get("database") {
161 0 : this.set_dbname(dbname.into());
162 0 : }
163 :
164 0 : this.pg_options = Some(options);
165 0 : }
166 :
167 0 : pub(crate) fn set_project(&self, x: MetricsAuxInfo) {
168 0 : let mut this = self.0.try_lock().expect("should not deadlock");
169 0 : if this.endpoint_id.is_none() {
170 0 : this.set_endpoint_id(x.endpoint_id.as_str().into());
171 0 : }
172 0 : this.branch = Some(x.branch_id);
173 0 : this.project = Some(x.project_id);
174 0 : this.set_cold_start_info(x.cold_start_info);
175 0 : }
176 :
177 0 : pub(crate) fn set_project_id(&self, project_id: ProjectIdInt) {
178 0 : let mut this = self.0.try_lock().expect("should not deadlock");
179 0 : this.project = Some(project_id);
180 0 : }
181 :
182 28 : pub(crate) fn set_endpoint_id(&self, endpoint_id: EndpointId) {
183 28 : self.0
184 28 : .try_lock()
185 28 : .expect("should not deadlock")
186 28 : .set_endpoint_id(endpoint_id);
187 28 : }
188 :
189 0 : pub(crate) fn set_dbname(&self, dbname: DbName) {
190 0 : self.0
191 0 : .try_lock()
192 0 : .expect("should not deadlock")
193 0 : .set_dbname(dbname);
194 0 : }
195 :
196 0 : pub(crate) fn set_user(&self, user: RoleName) {
197 0 : self.0
198 0 : .try_lock()
199 0 : .expect("should not deadlock")
200 0 : .set_user(user);
201 0 : }
202 :
203 15 : pub(crate) fn set_auth_method(&self, auth_method: AuthMethod) {
204 15 : let mut this = self.0.try_lock().expect("should not deadlock");
205 15 : this.auth_method = Some(auth_method);
206 15 : }
207 :
208 0 : pub fn has_private_peer_addr(&self) -> bool {
209 0 : self.0
210 0 : .try_lock()
211 0 : .expect("should not deadlock")
212 0 : .has_private_peer_addr()
213 0 : }
214 :
215 0 : pub(crate) fn set_error_kind(&self, kind: ErrorKind) {
216 0 : let mut this = self.0.try_lock().expect("should not deadlock");
217 0 : // Do not record errors from the private address to metrics.
218 0 : if !this.has_private_peer_addr() {
219 0 : Metrics::get().proxy.errors_total.inc(kind);
220 0 : }
221 0 : if let Some(ep) = &this.endpoint_id {
222 0 : let metric = &Metrics::get().proxy.endpoints_affected_by_errors;
223 0 : let label = metric.with_labels(kind);
224 0 : metric.get_metric(label).measure(ep);
225 0 : }
226 0 : this.error_kind = Some(kind);
227 0 : }
228 :
229 0 : pub fn set_success(&self) {
230 0 : let mut this = self.0.try_lock().expect("should not deadlock");
231 0 : this.success = true;
232 0 : }
233 :
234 0 : pub fn log_connect(&self) {
235 0 : self.0
236 0 : .try_lock()
237 0 : .expect("should not deadlock")
238 0 : .log_connect();
239 0 : }
240 :
241 0 : pub(crate) fn protocol(&self) -> Protocol {
242 0 : self.0.try_lock().expect("should not deadlock").protocol
243 0 : }
244 :
245 0 : pub(crate) fn span(&self) -> Span {
246 0 : self.0.try_lock().expect("should not deadlock").span.clone()
247 0 : }
248 :
249 0 : pub(crate) fn session_id(&self) -> Uuid {
250 0 : self.0.try_lock().expect("should not deadlock").session_id
251 0 : }
252 :
253 6 : pub(crate) fn peer_addr(&self) -> IpAddr {
254 6 : self.0.try_lock().expect("should not deadlock").peer_addr
255 6 : }
256 :
257 0 : pub(crate) fn cold_start_info(&self) -> ColdStartInfo {
258 0 : self.0
259 0 : .try_lock()
260 0 : .expect("should not deadlock")
261 0 : .cold_start_info
262 0 : }
263 :
264 22 : pub(crate) fn latency_timer_pause(&self, waiting_for: Waiting) -> LatencyTimerPause<'_> {
265 22 : LatencyTimerPause {
266 22 : ctx: self,
267 22 : start: tokio::time::Instant::now(),
268 22 : waiting_for,
269 22 : }
270 22 : }
271 :
272 4 : pub(crate) fn success(&self) {
273 4 : self.0
274 4 : .try_lock()
275 4 : .expect("should not deadlock")
276 4 : .latency_timer
277 4 : .success();
278 4 : }
279 : }
280 :
281 : pub(crate) struct LatencyTimerPause<'a> {
282 : ctx: &'a RequestMonitoring,
283 : start: tokio::time::Instant,
284 : waiting_for: Waiting,
285 : }
286 :
287 : impl Drop for LatencyTimerPause<'_> {
288 22 : fn drop(&mut self) {
289 22 : self.ctx
290 22 : .0
291 22 : .try_lock()
292 22 : .expect("should not deadlock")
293 22 : .latency_timer
294 22 : .unpause(self.start, self.waiting_for);
295 22 : }
296 : }
297 :
298 : impl RequestMonitoringInner {
299 0 : fn set_cold_start_info(&mut self, info: ColdStartInfo) {
300 0 : self.cold_start_info = info;
301 0 : self.latency_timer.cold_start_info(info);
302 0 : }
303 :
304 28 : fn set_endpoint_id(&mut self, endpoint_id: EndpointId) {
305 28 : if self.endpoint_id.is_none() {
306 28 : self.span.record("ep", display(&endpoint_id));
307 28 : let metric = &Metrics::get().proxy.connecting_endpoints;
308 28 : let label = metric.with_labels(self.protocol);
309 28 : metric.get_metric(label).measure(&endpoint_id);
310 28 : self.endpoint_id = Some(endpoint_id);
311 28 : }
312 28 : }
313 :
314 0 : fn set_application(&mut self, app: Option<SmolStr>) {
315 0 : if let Some(app) = app {
316 0 : self.application = Some(app);
317 0 : }
318 0 : }
319 :
320 0 : fn set_dbname(&mut self, dbname: DbName) {
321 0 : self.dbname = Some(dbname);
322 0 : }
323 :
324 0 : fn set_user(&mut self, user: RoleName) {
325 0 : self.span.record("role", display(&user));
326 0 : self.user = Some(user);
327 0 : }
328 :
329 0 : fn has_private_peer_addr(&self) -> bool {
330 0 : match self.peer_addr {
331 0 : IpAddr::V4(ip) => ip.is_private(),
332 0 : IpAddr::V6(_) => false,
333 : }
334 0 : }
335 :
336 0 : fn log_connect(&mut self) {
337 0 : let outcome = if self.success {
338 0 : ConnectOutcome::Success
339 : } else {
340 0 : ConnectOutcome::Failed
341 : };
342 0 : if let Some(rejected) = self.rejected {
343 0 : let ep = self
344 0 : .endpoint_id
345 0 : .as_ref()
346 0 : .map(|x| x.as_str())
347 0 : .unwrap_or_default();
348 0 : // This makes sense only if cache is disabled
349 0 : info!(
350 : ?outcome,
351 : ?rejected,
352 : ?ep,
353 0 : "check endpoint is valid with outcome"
354 : );
355 0 : Metrics::get()
356 0 : .proxy
357 0 : .invalid_endpoints_total
358 0 : .inc(InvalidEndpointsGroup {
359 0 : protocol: self.protocol,
360 0 : rejected: rejected.into(),
361 0 : outcome,
362 0 : });
363 0 : }
364 0 : if let Some(tx) = self.sender.take() {
365 0 : tx.send(RequestData::from(&*self))
366 0 : .inspect_err(|e| debug!("tx send failed: {e}"))
367 0 : .ok();
368 0 : }
369 0 : }
370 :
371 61 : fn log_disconnect(&mut self) {
372 61 : // If we are here, it's guaranteed that the user successfully connected to the endpoint.
373 61 : // Here we log the length of the session.
374 61 : self.disconnect_timestamp = Some(Utc::now());
375 61 : if let Some(tx) = self.disconnect_sender.take() {
376 0 : tx.send(RequestData::from(&*self))
377 0 : .inspect_err(|e| debug!("tx send failed: {e}"))
378 0 : .ok();
379 61 : }
380 61 : }
381 : }
382 :
383 : impl Drop for RequestMonitoringInner {
384 61 : fn drop(&mut self) {
385 61 : if self.sender.is_some() {
386 0 : self.log_connect();
387 61 : } else {
388 61 : self.log_disconnect();
389 61 : }
390 61 : }
391 : }
|