Line data Source code
1 : //! Connection request monitoring contexts
2 :
3 : use chrono::Utc;
4 : use once_cell::sync::OnceCell;
5 : use smol_str::SmolStr;
6 : use std::net::IpAddr;
7 : use tokio::sync::mpsc;
8 : use tracing::{field::display, info_span, Span};
9 : use uuid::Uuid;
10 :
11 : use crate::{
12 : console::messages::{ColdStartInfo, MetricsAuxInfo},
13 : error::ErrorKind,
14 : intern::{BranchIdInt, ProjectIdInt},
15 : metrics::{LatencyTimer, ENDPOINT_ERRORS_BY_KIND, ERROR_BY_KIND},
16 : DbName, EndpointId, RoleName,
17 : };
18 :
19 : use self::parquet::RequestData;
20 :
21 : pub mod parquet;
22 :
23 : static LOG_CHAN: OnceCell<mpsc::WeakUnboundedSender<RequestData>> = OnceCell::new();
24 :
25 : /// Context data for a single request to connect to a database.
26 : ///
27 : /// This data should **not** be used for connection logic, only for observability and limiting purposes.
28 : /// All connection logic should instead use strongly typed state machines, not a bunch of Options.
29 : pub struct RequestMonitoring {
30 : pub peer_addr: IpAddr,
31 : pub session_id: Uuid,
32 : pub protocol: &'static str,
33 : first_packet: chrono::DateTime<Utc>,
34 : region: &'static str,
35 : pub span: Span,
36 :
37 : // filled in as they are discovered
38 : project: Option<ProjectIdInt>,
39 : branch: Option<BranchIdInt>,
40 : endpoint_id: Option<EndpointId>,
41 : dbname: Option<DbName>,
42 : user: Option<RoleName>,
43 : application: Option<SmolStr>,
44 : error_kind: Option<ErrorKind>,
45 : pub(crate) auth_method: Option<AuthMethod>,
46 : success: bool,
47 : pub(crate) cold_start_info: ColdStartInfo,
48 :
49 : // extra
50 : // This sender is here to keep the request monitoring channel open while requests are taking place.
51 : sender: Option<mpsc::UnboundedSender<RequestData>>,
52 : pub latency_timer: LatencyTimer,
53 : }
54 :
55 : #[derive(Clone, Debug)]
56 : pub enum AuthMethod {
57 : // aka link aka passwordless
58 : Web,
59 : ScramSha256,
60 : ScramSha256Plus,
61 : Cleartext,
62 : }
63 :
64 : impl RequestMonitoring {
65 70 : pub fn new(
66 70 : session_id: Uuid,
67 70 : peer_addr: IpAddr,
68 70 : protocol: &'static str,
69 70 : region: &'static str,
70 70 : ) -> Self {
71 70 : let span = info_span!(
72 70 : "connect_request",
73 70 : %protocol,
74 70 : ?session_id,
75 70 : %peer_addr,
76 70 : ep = tracing::field::Empty,
77 70 : );
78 :
79 70 : Self {
80 70 : peer_addr,
81 70 : session_id,
82 70 : protocol,
83 70 : first_packet: Utc::now(),
84 70 : region,
85 70 : span,
86 70 :
87 70 : project: None,
88 70 : branch: None,
89 70 : endpoint_id: None,
90 70 : dbname: None,
91 70 : user: None,
92 70 : application: None,
93 70 : error_kind: None,
94 70 : auth_method: None,
95 70 : success: false,
96 70 : cold_start_info: ColdStartInfo::Unknown,
97 70 :
98 70 : sender: LOG_CHAN.get().and_then(|tx| tx.upgrade()),
99 70 : latency_timer: LatencyTimer::new(protocol),
100 70 : }
101 70 : }
102 :
103 : #[cfg(test)]
104 70 : pub fn test() -> Self {
105 70 : RequestMonitoring::new(Uuid::now_v7(), [127, 0, 0, 1].into(), "test", "test")
106 70 : }
107 :
108 0 : pub fn console_application_name(&self) -> String {
109 0 : format!(
110 0 : "{}/{}",
111 0 : self.application.as_deref().unwrap_or_default(),
112 0 : self.protocol
113 0 : )
114 0 : }
115 :
116 0 : pub fn set_cold_start_info(&mut self, info: ColdStartInfo) {
117 0 : self.cold_start_info = info;
118 0 : self.latency_timer.cold_start_info(info);
119 0 : }
120 :
121 0 : pub fn set_project(&mut self, x: MetricsAuxInfo) {
122 0 : if self.endpoint_id.is_none() {
123 0 : self.set_endpoint_id(x.endpoint_id.as_str().into())
124 0 : }
125 0 : self.branch = Some(x.branch_id);
126 0 : self.project = Some(x.project_id);
127 0 : self.set_cold_start_info(x.cold_start_info);
128 0 : }
129 :
130 0 : pub fn set_project_id(&mut self, project_id: ProjectIdInt) {
131 0 : self.project = Some(project_id);
132 0 : }
133 :
134 16 : pub fn set_endpoint_id(&mut self, endpoint_id: EndpointId) {
135 16 : if self.endpoint_id.is_none() {
136 16 : self.span.record("ep", display(&endpoint_id));
137 16 : crate::metrics::CONNECTING_ENDPOINTS
138 16 : .with_label_values(&[self.protocol])
139 16 : .measure(&endpoint_id);
140 16 : self.endpoint_id = Some(endpoint_id);
141 16 : }
142 16 : }
143 :
144 26 : pub fn set_application(&mut self, app: Option<SmolStr>) {
145 26 : self.application = app.or_else(|| self.application.clone());
146 26 : }
147 :
148 2 : pub fn set_dbname(&mut self, dbname: DbName) {
149 2 : self.dbname = Some(dbname);
150 2 : }
151 :
152 26 : pub fn set_user(&mut self, user: RoleName) {
153 26 : self.user = Some(user);
154 26 : }
155 :
156 6 : pub fn set_auth_method(&mut self, auth_method: AuthMethod) {
157 6 : self.auth_method = Some(auth_method);
158 6 : }
159 :
160 0 : pub fn set_error_kind(&mut self, kind: ErrorKind) {
161 0 : ERROR_BY_KIND
162 0 : .with_label_values(&[kind.to_metric_label()])
163 0 : .inc();
164 0 : if let Some(ep) = &self.endpoint_id {
165 0 : ENDPOINT_ERRORS_BY_KIND
166 0 : .with_label_values(&[kind.to_metric_label()])
167 0 : .measure(ep);
168 0 : }
169 0 : self.error_kind = Some(kind);
170 0 : }
171 :
172 0 : pub fn set_success(&mut self) {
173 0 : self.success = true;
174 0 : }
175 :
176 0 : pub fn log(self) {}
177 : }
178 :
179 : impl Drop for RequestMonitoring {
180 70 : fn drop(&mut self) {
181 70 : if let Some(tx) = self.sender.take() {
182 0 : let _: Result<(), _> = tx.send(RequestData::from(&*self));
183 70 : }
184 70 : }
185 : }
|