Line data Source code
1 : //! Connection request monitoring contexts
2 :
3 : use chrono::Utc;
4 : use once_cell::sync::OnceCell;
5 : use pq_proto::StartupMessageParams;
6 : use smol_str::SmolStr;
7 : use std::net::IpAddr;
8 : use tokio::sync::mpsc;
9 : use tracing::{field::display, info, info_span, Span};
10 : use try_lock::TryLock;
11 : use uuid::Uuid;
12 :
13 : use crate::{
14 : console::messages::{ColdStartInfo, MetricsAuxInfo},
15 : error::ErrorKind,
16 : intern::{BranchIdInt, ProjectIdInt},
17 : metrics::{ConnectOutcome, InvalidEndpointsGroup, LatencyTimer, Metrics, Protocol, Waiting},
18 : DbName, EndpointId, RoleName,
19 : };
20 :
21 : use self::parquet::RequestData;
22 :
23 : pub mod parquet;
24 :
25 : pub static LOG_CHAN: OnceCell<mpsc::WeakUnboundedSender<RequestData>> = OnceCell::new();
26 : pub static LOG_CHAN_DISCONNECT: OnceCell<mpsc::WeakUnboundedSender<RequestData>> = OnceCell::new();
27 :
28 : /// Context data for a single request to connect to a database.
29 : ///
30 : /// This data should **not** be used for connection logic, only for observability and limiting purposes.
31 : /// All connection logic should instead use strongly typed state machines, not a bunch of Options.
32 : pub struct RequestMonitoring(
33 : /// To allow easier use of the ctx object, we have interior mutability.
34 : /// I would typically use a RefCell but that would break the `Send` requirements
35 : /// so we need something with thread-safety. `TryLock` is a cheap alternative
36 : /// that offers similar semantics to a `RefCell` but with synchronisation.
37 : TryLock<RequestMonitoringInner>,
38 : );
39 :
40 : struct RequestMonitoringInner {
41 : pub peer_addr: IpAddr,
42 : pub session_id: Uuid,
43 : pub protocol: Protocol,
44 : first_packet: chrono::DateTime<Utc>,
45 : region: &'static str,
46 : pub span: Span,
47 :
48 : // filled in as they are discovered
49 : project: Option<ProjectIdInt>,
50 : branch: Option<BranchIdInt>,
51 : endpoint_id: Option<EndpointId>,
52 : dbname: Option<DbName>,
53 : user: Option<RoleName>,
54 : application: Option<SmolStr>,
55 : error_kind: Option<ErrorKind>,
56 : pub(crate) auth_method: Option<AuthMethod>,
57 : success: bool,
58 : pub(crate) cold_start_info: ColdStartInfo,
59 : pg_options: Option<StartupMessageParams>,
60 :
61 : // extra
62 : // This sender is here to keep the request monitoring channel open while requests are taking place.
63 : sender: Option<mpsc::UnboundedSender<RequestData>>,
64 : // This sender is only used to log the length of session in case of success.
65 : disconnect_sender: Option<mpsc::UnboundedSender<RequestData>>,
66 : pub latency_timer: LatencyTimer,
67 : // Whether proxy decided that it's not a valid endpoint end rejected it before going to cplane.
68 : rejected: Option<bool>,
69 : disconnect_timestamp: Option<chrono::DateTime<Utc>>,
70 : }
71 :
/// Authentication method used for a connection request.
///
/// `PartialEq`/`Eq` are derived so that values can be compared directly
/// (for example in assertions).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AuthMethod {
    /// Web authentication, also known as "link" or "passwordless".
    Web,
    ScramSha256,
    ScramSha256Plus,
    Cleartext,
}
80 :
81 : impl RequestMonitoring {
82 114 : pub fn new(
83 114 : session_id: Uuid,
84 114 : peer_addr: IpAddr,
85 114 : protocol: Protocol,
86 114 : region: &'static str,
87 114 : ) -> Self {
88 114 : let span = info_span!(
89 114 : "connect_request",
90 114 : %protocol,
91 114 : ?session_id,
92 114 : %peer_addr,
93 114 : ep = tracing::field::Empty,
94 114 : role = tracing::field::Empty,
95 114 : );
96 :
97 114 : let inner = RequestMonitoringInner {
98 114 : peer_addr,
99 114 : session_id,
100 114 : protocol,
101 114 : first_packet: Utc::now(),
102 114 : region,
103 114 : span,
104 114 :
105 114 : project: None,
106 114 : branch: None,
107 114 : endpoint_id: None,
108 114 : dbname: None,
109 114 : user: None,
110 114 : application: None,
111 114 : error_kind: None,
112 114 : auth_method: None,
113 114 : success: false,
114 114 : rejected: None,
115 114 : cold_start_info: ColdStartInfo::Unknown,
116 114 : pg_options: None,
117 114 :
118 114 : sender: LOG_CHAN.get().and_then(|tx| tx.upgrade()),
119 114 : disconnect_sender: LOG_CHAN_DISCONNECT.get().and_then(|tx| tx.upgrade()),
120 114 : latency_timer: LatencyTimer::new(protocol),
121 114 : disconnect_timestamp: None,
122 114 : };
123 114 :
124 114 : Self(TryLock::new(inner))
125 114 : }
126 :
127 : #[cfg(test)]
128 114 : pub fn test() -> Self {
129 114 : RequestMonitoring::new(Uuid::now_v7(), [127, 0, 0, 1].into(), Protocol::Tcp, "test")
130 114 : }
131 :
132 0 : pub fn console_application_name(&self) -> String {
133 0 : let this = self.0.try_lock().expect("should not deadlock");
134 0 : format!(
135 0 : "{}/{}",
136 0 : this.application.as_deref().unwrap_or_default(),
137 0 : this.protocol
138 0 : )
139 0 : }
140 :
141 0 : pub fn set_rejected(&self, rejected: bool) {
142 0 : let mut this = self.0.try_lock().expect("should not deadlock");
143 0 : this.rejected = Some(rejected);
144 0 : }
145 :
146 0 : pub fn set_cold_start_info(&self, info: ColdStartInfo) {
147 0 : self.0
148 0 : .try_lock()
149 0 : .expect("should not deadlock")
150 0 : .set_cold_start_info(info);
151 0 : }
152 :
153 0 : pub fn set_db_options(&self, options: StartupMessageParams) {
154 0 : let mut this = self.0.try_lock().expect("should not deadlock");
155 0 : this.set_application(options.get("application_name").map(SmolStr::from));
156 0 : if let Some(user) = options.get("user") {
157 0 : this.set_user(user.into());
158 0 : }
159 0 : if let Some(dbname) = options.get("database") {
160 0 : this.set_dbname(dbname.into());
161 0 : }
162 :
163 0 : this.pg_options = Some(options);
164 0 : }
165 :
166 0 : pub fn set_project(&self, x: MetricsAuxInfo) {
167 0 : let mut this = self.0.try_lock().expect("should not deadlock");
168 0 : if this.endpoint_id.is_none() {
169 0 : this.set_endpoint_id(x.endpoint_id.as_str().into());
170 0 : }
171 0 : this.branch = Some(x.branch_id);
172 0 : this.project = Some(x.project_id);
173 0 : this.set_cold_start_info(x.cold_start_info);
174 0 : }
175 :
176 0 : pub fn set_project_id(&self, project_id: ProjectIdInt) {
177 0 : let mut this = self.0.try_lock().expect("should not deadlock");
178 0 : this.project = Some(project_id);
179 0 : }
180 :
181 56 : pub fn set_endpoint_id(&self, endpoint_id: EndpointId) {
182 56 : self.0
183 56 : .try_lock()
184 56 : .expect("should not deadlock")
185 56 : .set_endpoint_id(endpoint_id);
186 56 : }
187 :
188 0 : pub fn set_dbname(&self, dbname: DbName) {
189 0 : self.0
190 0 : .try_lock()
191 0 : .expect("should not deadlock")
192 0 : .set_dbname(dbname);
193 0 : }
194 :
195 0 : pub fn set_user(&self, user: RoleName) {
196 0 : self.0
197 0 : .try_lock()
198 0 : .expect("should not deadlock")
199 0 : .set_user(user);
200 0 : }
201 :
202 30 : pub fn set_auth_method(&self, auth_method: AuthMethod) {
203 30 : let mut this = self.0.try_lock().expect("should not deadlock");
204 30 : this.auth_method = Some(auth_method);
205 30 : }
206 :
207 0 : pub fn has_private_peer_addr(&self) -> bool {
208 0 : self.0
209 0 : .try_lock()
210 0 : .expect("should not deadlock")
211 0 : .has_private_peer_addr()
212 0 : }
213 :
214 0 : pub fn set_error_kind(&self, kind: ErrorKind) {
215 0 : let mut this = self.0.try_lock().expect("should not deadlock");
216 0 : // Do not record errors from the private address to metrics.
217 0 : if !this.has_private_peer_addr() {
218 0 : Metrics::get().proxy.errors_total.inc(kind);
219 0 : }
220 0 : if let Some(ep) = &this.endpoint_id {
221 0 : let metric = &Metrics::get().proxy.endpoints_affected_by_errors;
222 0 : let label = metric.with_labels(kind);
223 0 : metric.get_metric(label).measure(ep);
224 0 : }
225 0 : this.error_kind = Some(kind);
226 0 : }
227 :
228 0 : pub fn set_success(&self) {
229 0 : let mut this = self.0.try_lock().expect("should not deadlock");
230 0 : this.success = true;
231 0 : }
232 :
233 0 : pub fn log_connect(&self) {
234 0 : self.0
235 0 : .try_lock()
236 0 : .expect("should not deadlock")
237 0 : .log_connect();
238 0 : }
239 :
240 0 : pub fn protocol(&self) -> Protocol {
241 0 : self.0.try_lock().expect("should not deadlock").protocol
242 0 : }
243 :
244 0 : pub fn span(&self) -> Span {
245 0 : self.0.try_lock().expect("should not deadlock").span.clone()
246 0 : }
247 :
248 0 : pub fn session_id(&self) -> Uuid {
249 0 : self.0.try_lock().expect("should not deadlock").session_id
250 0 : }
251 :
252 12 : pub fn peer_addr(&self) -> IpAddr {
253 12 : self.0.try_lock().expect("should not deadlock").peer_addr
254 12 : }
255 :
256 0 : pub fn cold_start_info(&self) -> ColdStartInfo {
257 0 : self.0
258 0 : .try_lock()
259 0 : .expect("should not deadlock")
260 0 : .cold_start_info
261 0 : }
262 :
263 42 : pub fn latency_timer_pause(&self, waiting_for: Waiting) -> LatencyTimerPause<'_> {
264 42 : LatencyTimerPause {
265 42 : ctx: self,
266 42 : start: tokio::time::Instant::now(),
267 42 : waiting_for,
268 42 : }
269 42 : }
270 :
271 8 : pub fn success(&self) {
272 8 : self.0
273 8 : .try_lock()
274 8 : .expect("should not deadlock")
275 8 : .latency_timer
276 8 : .success();
277 8 : }
278 : }
279 :
280 : pub struct LatencyTimerPause<'a> {
281 : ctx: &'a RequestMonitoring,
282 : start: tokio::time::Instant,
283 : waiting_for: Waiting,
284 : }
285 :
286 : impl Drop for LatencyTimerPause<'_> {
287 42 : fn drop(&mut self) {
288 42 : self.ctx
289 42 : .0
290 42 : .try_lock()
291 42 : .expect("should not deadlock")
292 42 : .latency_timer
293 42 : .unpause(self.start, self.waiting_for);
294 42 : }
295 : }
296 :
297 : impl RequestMonitoringInner {
298 0 : fn set_cold_start_info(&mut self, info: ColdStartInfo) {
299 0 : self.cold_start_info = info;
300 0 : self.latency_timer.cold_start_info(info);
301 0 : }
302 :
303 56 : fn set_endpoint_id(&mut self, endpoint_id: EndpointId) {
304 56 : if self.endpoint_id.is_none() {
305 56 : self.span.record("ep", display(&endpoint_id));
306 56 : let metric = &Metrics::get().proxy.connecting_endpoints;
307 56 : let label = metric.with_labels(self.protocol);
308 56 : metric.get_metric(label).measure(&endpoint_id);
309 56 : self.endpoint_id = Some(endpoint_id);
310 56 : }
311 56 : }
312 :
313 0 : fn set_application(&mut self, app: Option<SmolStr>) {
314 0 : if let Some(app) = app {
315 0 : self.application = Some(app);
316 0 : }
317 0 : }
318 :
319 0 : fn set_dbname(&mut self, dbname: DbName) {
320 0 : self.dbname = Some(dbname);
321 0 : }
322 :
323 0 : fn set_user(&mut self, user: RoleName) {
324 0 : self.span.record("role", display(&user));
325 0 : self.user = Some(user);
326 0 : }
327 :
328 0 : fn has_private_peer_addr(&self) -> bool {
329 0 : match self.peer_addr {
330 0 : IpAddr::V4(ip) => ip.is_private(),
331 0 : IpAddr::V6(_) => false,
332 : }
333 0 : }
334 :
335 0 : fn log_connect(&mut self) {
336 0 : let outcome = if self.success {
337 0 : ConnectOutcome::Success
338 : } else {
339 0 : ConnectOutcome::Failed
340 : };
341 0 : if let Some(rejected) = self.rejected {
342 0 : let ep = self
343 0 : .endpoint_id
344 0 : .as_ref()
345 0 : .map(|x| x.as_str())
346 0 : .unwrap_or_default();
347 0 : // This makes sense only if cache is disabled
348 0 : info!(
349 : ?outcome,
350 : ?rejected,
351 : ?ep,
352 0 : "check endpoint is valid with outcome"
353 : );
354 0 : Metrics::get()
355 0 : .proxy
356 0 : .invalid_endpoints_total
357 0 : .inc(InvalidEndpointsGroup {
358 0 : protocol: self.protocol,
359 0 : rejected: rejected.into(),
360 0 : outcome,
361 0 : });
362 0 : }
363 0 : if let Some(tx) = self.sender.take() {
364 0 : let _: Result<(), _> = tx.send(RequestData::from(&*self));
365 0 : }
366 0 : }
367 :
368 114 : fn log_disconnect(&mut self) {
369 114 : // If we are here, it's guaranteed that the user successfully connected to the endpoint.
370 114 : // Here we log the length of the session.
371 114 : self.disconnect_timestamp = Some(Utc::now());
372 114 : if let Some(tx) = self.disconnect_sender.take() {
373 0 : let _: Result<(), _> = tx.send(RequestData::from(&*self));
374 114 : }
375 114 : }
376 : }
377 :
378 : impl Drop for RequestMonitoringInner {
379 114 : fn drop(&mut self) {
380 114 : if self.sender.is_some() {
381 0 : self.log_connect();
382 114 : } else {
383 114 : self.log_disconnect();
384 114 : }
385 114 : }
386 : }
|