Line data Source code
1 : //! Connection request monitoring contexts
2 :
3 : use chrono::Utc;
4 : use once_cell::sync::OnceCell;
5 : use smol_str::SmolStr;
6 : use std::net::IpAddr;
7 : use tokio::sync::mpsc;
8 : use tracing::{field::display, info_span, Span};
9 : use uuid::Uuid;
10 :
11 : use crate::{
12 : console::messages::MetricsAuxInfo,
13 : error::ErrorKind,
14 : metrics::{LatencyTimer, ENDPOINT_ERRORS_BY_KIND, ERROR_BY_KIND},
15 : BranchId, DbName, EndpointId, ProjectId, RoleName,
16 : };
17 :
18 : pub mod parquet;
19 :
/// Process-wide weak handle to the channel that request contexts are sent to.
///
/// [`RequestMonitoring::new`] tries to upgrade this handle into a strong sender that the
/// context then carries, and the `Drop` impl of [`RequestMonitoring`] sends a clone of the
/// context through that sender. While the cell is unset, or once the upgrade fails,
/// contexts are created without a sender and nothing is sent on drop.
// NOTE(review): the code that initialises this cell is not visible in this section.
static LOG_CHAN: OnceCell<mpsc::WeakUnboundedSender<RequestMonitoring>> = OnceCell::new();
21 :
22 0 : #[derive(Clone)]
23 : /// Context data for a single request to connect to a database.
24 : ///
25 : /// This data should **not** be used for connection logic, only for observability and limiting purposes.
26 : /// All connection logic should instead use strongly typed state machines, not a bunch of Options.
27 : pub struct RequestMonitoring {
28 : pub peer_addr: IpAddr,
29 : pub session_id: Uuid,
30 : pub protocol: &'static str,
31 : first_packet: chrono::DateTime<Utc>,
32 : region: &'static str,
33 : pub span: Span,
34 :
35 : // filled in as they are discovered
36 : project: Option<ProjectId>,
37 : branch: Option<BranchId>,
38 : endpoint_id: Option<EndpointId>,
39 : dbname: Option<DbName>,
40 : user: Option<RoleName>,
41 : application: Option<SmolStr>,
42 : error_kind: Option<ErrorKind>,
43 : pub(crate) auth_method: Option<AuthMethod>,
44 : success: bool,
45 : is_cold_start: Option<bool>,
46 :
47 : // extra
48 : // This sender is here to keep the request monitoring channel open while requests are taking place.
49 : sender: Option<mpsc::UnboundedSender<RequestMonitoring>>,
50 : pub latency_timer: LatencyTimer,
51 : }
52 :
/// Authentication method recorded on a [`RequestMonitoring`] context.
#[derive(Clone)]
#[derive(Debug)]
pub enum AuthMethod {
    /// Web authentication, also known as "link" or "passwordless".
    Web,
    /// SCRAM-SHA-256.
    ScramSha256,
    /// SCRAM-SHA-256-PLUS.
    ScramSha256Plus,
    /// Cleartext password.
    Cleartext,
}
61 :
62 : impl RequestMonitoring {
63 64 : pub fn new(
64 64 : session_id: Uuid,
65 64 : peer_addr: IpAddr,
66 64 : protocol: &'static str,
67 64 : region: &'static str,
68 64 : ) -> Self {
69 64 : let span = info_span!(
70 64 : "connect_request",
71 64 : %protocol,
72 64 : ?session_id,
73 64 : %peer_addr,
74 64 : ep = tracing::field::Empty,
75 64 : );
76 :
77 64 : Self {
78 64 : peer_addr,
79 64 : session_id,
80 64 : protocol,
81 64 : first_packet: Utc::now(),
82 64 : region,
83 64 : span,
84 64 :
85 64 : project: None,
86 64 : branch: None,
87 64 : endpoint_id: None,
88 64 : dbname: None,
89 64 : user: None,
90 64 : application: None,
91 64 : error_kind: None,
92 64 : auth_method: None,
93 64 : success: false,
94 64 : is_cold_start: None,
95 64 :
96 64 : sender: LOG_CHAN.get().and_then(|tx| tx.upgrade()),
97 64 : latency_timer: LatencyTimer::new(protocol),
98 64 : }
99 64 : }
100 :
101 : #[cfg(test)]
102 64 : pub fn test() -> Self {
103 64 : RequestMonitoring::new(Uuid::now_v7(), [127, 0, 0, 1].into(), "test", "test")
104 64 : }
105 :
106 0 : pub fn console_application_name(&self) -> String {
107 0 : format!(
108 0 : "{}/{}",
109 0 : self.application.as_deref().unwrap_or_default(),
110 0 : self.protocol
111 0 : )
112 0 : }
113 :
114 0 : pub fn set_project(&mut self, x: MetricsAuxInfo) {
115 0 : self.set_endpoint_id(x.endpoint_id);
116 0 : self.branch = Some(x.branch_id);
117 0 : self.project = Some(x.project_id);
118 0 : self.is_cold_start = x.is_cold_start;
119 0 : }
120 :
121 0 : pub fn set_project_id(&mut self, project_id: ProjectId) {
122 0 : self.project = Some(project_id);
123 0 : }
124 :
125 14 : pub fn set_endpoint_id(&mut self, endpoint_id: EndpointId) {
126 14 : self.span.record("ep", display(&endpoint_id));
127 14 : crate::metrics::CONNECTING_ENDPOINTS
128 14 : .with_label_values(&[self.protocol])
129 14 : .measure(&endpoint_id);
130 14 : self.endpoint_id = Some(endpoint_id);
131 14 : }
132 :
133 26 : pub fn set_application(&mut self, app: Option<SmolStr>) {
134 26 : self.application = app.or_else(|| self.application.clone());
135 26 : }
136 :
137 2 : pub fn set_dbname(&mut self, dbname: DbName) {
138 2 : self.dbname = Some(dbname);
139 2 : }
140 :
141 26 : pub fn set_user(&mut self, user: RoleName) {
142 26 : self.user = Some(user);
143 26 : }
144 :
145 0 : pub fn set_auth_method(&mut self, auth_method: AuthMethod) {
146 0 : self.auth_method = Some(auth_method);
147 0 : }
148 :
149 0 : pub fn set_error_kind(&mut self, kind: ErrorKind) {
150 0 : ERROR_BY_KIND
151 0 : .with_label_values(&[kind.to_metric_label()])
152 0 : .inc();
153 0 : if let Some(ep) = &self.endpoint_id {
154 0 : ENDPOINT_ERRORS_BY_KIND
155 0 : .with_label_values(&[kind.to_metric_label()])
156 0 : .measure(ep);
157 0 : }
158 0 : self.error_kind = Some(kind);
159 0 : }
160 :
161 0 : pub fn set_success(&mut self) {
162 0 : self.success = true;
163 0 : }
164 :
165 0 : pub fn log(self) {}
166 : }
167 :
168 : impl Drop for RequestMonitoring {
169 64 : fn drop(&mut self) {
170 64 : if let Some(tx) = self.sender.take() {
171 0 : let _: Result<(), _> = tx.send(self.clone());
172 64 : }
173 64 : }
174 : }
|