Line data Source code
1 : //! Connection request monitoring contexts
2 :
3 : use chrono::Utc;
4 : use once_cell::sync::OnceCell;
5 : use smol_str::SmolStr;
6 : use std::net::IpAddr;
7 : use tokio::sync::mpsc;
8 : use tracing::{field::display, info, info_span, Span};
9 : use uuid::Uuid;
10 :
11 : use crate::{
12 : console::messages::{ColdStartInfo, MetricsAuxInfo},
13 : error::ErrorKind,
14 : intern::{BranchIdInt, ProjectIdInt},
15 : metrics::{ConnectOutcome, InvalidEndpointsGroup, LatencyTimer, Metrics, Protocol},
16 : DbName, EndpointId, RoleName,
17 : };
18 :
19 : use self::parquet::RequestData;
20 :
21 : pub mod parquet;
22 :
23 : pub static LOG_CHAN: OnceCell<mpsc::WeakUnboundedSender<RequestData>> = OnceCell::new();
24 : pub static LOG_CHAN_DISCONNECT: OnceCell<mpsc::WeakUnboundedSender<RequestData>> = OnceCell::new();
25 :
26 : /// Context data for a single request to connect to a database.
27 : ///
28 : /// This data should **not** be used for connection logic, only for observability and limiting purposes.
29 : /// All connection logic should instead use strongly typed state machines, not a bunch of Options.
30 : pub struct RequestMonitoring {
31 : pub peer_addr: IpAddr,
32 : pub session_id: Uuid,
33 : pub protocol: Protocol,
34 : first_packet: chrono::DateTime<Utc>,
35 : region: &'static str,
36 : pub span: Span,
37 :
38 : // filled in as they are discovered
39 : project: Option<ProjectIdInt>,
40 : branch: Option<BranchIdInt>,
41 : endpoint_id: Option<EndpointId>,
42 : dbname: Option<DbName>,
43 : user: Option<RoleName>,
44 : application: Option<SmolStr>,
45 : error_kind: Option<ErrorKind>,
46 : pub(crate) auth_method: Option<AuthMethod>,
47 : success: bool,
48 : pub(crate) cold_start_info: ColdStartInfo,
49 :
50 : // extra
51 : // This sender is here to keep the request monitoring channel open while requests are taking place.
52 : sender: Option<mpsc::UnboundedSender<RequestData>>,
53 : // This sender is only used to log the length of session in case of success.
54 : disconnect_sender: Option<mpsc::UnboundedSender<RequestData>>,
55 : pub latency_timer: LatencyTimer,
56 : // Whether proxy decided that it's not a valid endpoint end rejected it before going to cplane.
57 : rejected: Option<bool>,
58 : disconnect_timestamp: Option<chrono::DateTime<Utc>>,
59 : }
60 :
/// Authentication method recorded on a [`RequestMonitoring`] context for observability.
#[derive(Clone, Debug)]
pub enum AuthMethod {
    /// Web authentication, aka link, aka passwordless.
    Web,
    // NOTE(review): presumably SCRAM-SHA-256 without channel binding — confirm against the
    // authentication code.
    ScramSha256,
    // NOTE(review): presumably SCRAM-SHA-256-PLUS (with channel binding) — confirm.
    ScramSha256Plus,
    // NOTE(review): presumably a password sent in cleartext — confirm.
    Cleartext,
}
69 :
70 : impl RequestMonitoring {
71 70 : pub fn new(
72 70 : session_id: Uuid,
73 70 : peer_addr: IpAddr,
74 70 : protocol: Protocol,
75 70 : region: &'static str,
76 70 : ) -> Self {
77 70 : let span = info_span!(
78 70 : "connect_request",
79 70 : %protocol,
80 70 : ?session_id,
81 70 : %peer_addr,
82 70 : ep = tracing::field::Empty,
83 70 : role = tracing::field::Empty,
84 70 : );
85 :
86 70 : Self {
87 70 : peer_addr,
88 70 : session_id,
89 70 : protocol,
90 70 : first_packet: Utc::now(),
91 70 : region,
92 70 : span,
93 70 :
94 70 : project: None,
95 70 : branch: None,
96 70 : endpoint_id: None,
97 70 : dbname: None,
98 70 : user: None,
99 70 : application: None,
100 70 : error_kind: None,
101 70 : auth_method: None,
102 70 : success: false,
103 70 : rejected: None,
104 70 : cold_start_info: ColdStartInfo::Unknown,
105 70 :
106 70 : sender: LOG_CHAN.get().and_then(|tx| tx.upgrade()),
107 70 : disconnect_sender: LOG_CHAN_DISCONNECT.get().and_then(|tx| tx.upgrade()),
108 70 : latency_timer: LatencyTimer::new(protocol),
109 70 : disconnect_timestamp: None,
110 70 : }
111 70 : }
112 :
113 : #[cfg(test)]
114 70 : pub fn test() -> Self {
115 70 : RequestMonitoring::new(Uuid::now_v7(), [127, 0, 0, 1].into(), Protocol::Tcp, "test")
116 70 : }
117 :
118 0 : pub fn console_application_name(&self) -> String {
119 0 : format!(
120 0 : "{}/{}",
121 0 : self.application.as_deref().unwrap_or_default(),
122 0 : self.protocol
123 0 : )
124 0 : }
125 :
126 0 : pub fn set_rejected(&mut self, rejected: bool) {
127 0 : self.rejected = Some(rejected);
128 0 : }
129 :
130 0 : pub fn set_cold_start_info(&mut self, info: ColdStartInfo) {
131 0 : self.cold_start_info = info;
132 0 : self.latency_timer.cold_start_info(info);
133 0 : }
134 :
135 0 : pub fn set_project(&mut self, x: MetricsAuxInfo) {
136 0 : if self.endpoint_id.is_none() {
137 0 : self.set_endpoint_id(x.endpoint_id.as_str().into())
138 0 : }
139 0 : self.branch = Some(x.branch_id);
140 0 : self.project = Some(x.project_id);
141 0 : self.set_cold_start_info(x.cold_start_info);
142 0 : }
143 :
144 0 : pub fn set_project_id(&mut self, project_id: ProjectIdInt) {
145 0 : self.project = Some(project_id);
146 0 : }
147 :
148 16 : pub fn set_endpoint_id(&mut self, endpoint_id: EndpointId) {
149 16 : if self.endpoint_id.is_none() {
150 16 : self.span.record("ep", display(&endpoint_id));
151 16 : let metric = &Metrics::get().proxy.connecting_endpoints;
152 16 : let label = metric.with_labels(self.protocol);
153 16 : metric.get_metric(label).measure(&endpoint_id);
154 16 : self.endpoint_id = Some(endpoint_id);
155 16 : }
156 16 : }
157 :
158 26 : pub fn set_application(&mut self, app: Option<SmolStr>) {
159 26 : self.application = app.or_else(|| self.application.clone());
160 26 : }
161 :
162 2 : pub fn set_dbname(&mut self, dbname: DbName) {
163 2 : self.dbname = Some(dbname);
164 2 : }
165 :
166 26 : pub fn set_user(&mut self, user: RoleName) {
167 26 : self.span.record("role", display(&user));
168 26 : self.user = Some(user);
169 26 : }
170 :
171 6 : pub fn set_auth_method(&mut self, auth_method: AuthMethod) {
172 6 : self.auth_method = Some(auth_method);
173 6 : }
174 :
175 0 : pub fn has_private_peer_addr(&self) -> bool {
176 0 : match self.peer_addr {
177 0 : IpAddr::V4(ip) => ip.is_private(),
178 0 : _ => false,
179 : }
180 0 : }
181 :
182 0 : pub fn set_error_kind(&mut self, kind: ErrorKind) {
183 0 : // Do not record errors from the private address to metrics.
184 0 : if !self.has_private_peer_addr() {
185 0 : Metrics::get().proxy.errors_total.inc(kind);
186 0 : }
187 0 : if let Some(ep) = &self.endpoint_id {
188 0 : let metric = &Metrics::get().proxy.endpoints_affected_by_errors;
189 0 : let label = metric.with_labels(kind);
190 0 : metric.get_metric(label).measure(ep);
191 0 : }
192 0 : self.error_kind = Some(kind);
193 0 : }
194 :
195 0 : pub fn set_success(&mut self) {
196 0 : self.success = true;
197 0 : }
198 :
199 0 : pub fn log_connect(&mut self) {
200 0 : let outcome = if self.success {
201 0 : ConnectOutcome::Success
202 : } else {
203 0 : ConnectOutcome::Failed
204 : };
205 0 : if let Some(rejected) = self.rejected {
206 0 : let ep = self
207 0 : .endpoint_id
208 0 : .as_ref()
209 0 : .map(|x| x.as_str())
210 0 : .unwrap_or_default();
211 0 : // This makes sense only if cache is disabled
212 0 : info!(
213 : ?outcome,
214 : ?rejected,
215 : ?ep,
216 0 : "check endpoint is valid with outcome"
217 : );
218 0 : Metrics::get()
219 0 : .proxy
220 0 : .invalid_endpoints_total
221 0 : .inc(InvalidEndpointsGroup {
222 0 : protocol: self.protocol,
223 0 : rejected: rejected.into(),
224 0 : outcome,
225 0 : });
226 0 : }
227 0 : if let Some(tx) = self.sender.take() {
228 0 : let _: Result<(), _> = tx.send(RequestData::from(&*self));
229 0 : }
230 0 : }
231 :
232 70 : fn log_disconnect(&mut self) {
233 70 : // If we are here, it's guaranteed that the user successfully connected to the endpoint.
234 70 : // Here we log the length of the session.
235 70 : self.disconnect_timestamp = Some(Utc::now());
236 70 : if let Some(tx) = self.disconnect_sender.take() {
237 0 : let _: Result<(), _> = tx.send(RequestData::from(&*self));
238 70 : }
239 70 : }
240 : }
241 :
242 : impl Drop for RequestMonitoring {
243 70 : fn drop(&mut self) {
244 70 : if self.sender.is_some() {
245 0 : self.log_connect();
246 70 : } else {
247 70 : self.log_disconnect();
248 70 : }
249 70 : }
250 : }
|