Line data Source code
1 : //! Connection request monitoring contexts
2 :
3 : use chrono::Utc;
4 : use once_cell::sync::OnceCell;
5 : use pq_proto::StartupMessageParams;
6 : use smol_str::SmolStr;
7 : use std::net::IpAddr;
8 : use tokio::sync::mpsc;
9 : use tracing::{field::display, info, info_span, Span};
10 : use uuid::Uuid;
11 :
12 : use crate::{
13 : console::messages::{ColdStartInfo, MetricsAuxInfo},
14 : error::ErrorKind,
15 : intern::{BranchIdInt, ProjectIdInt},
16 : metrics::{ConnectOutcome, InvalidEndpointsGroup, LatencyTimer, Metrics, Protocol},
17 : DbName, EndpointId, RoleName,
18 : };
19 :
20 : use self::parquet::RequestData;
21 :
22 : pub mod parquet;
23 :
/// Weak handle to the channel on which `RequestMonitoring::log_connect` sends its
/// [`RequestData`] record.
///
/// `RequestMonitoring::new` tries to upgrade it to a strong sender for each request.
pub static LOG_CHAN: OnceCell<mpsc::WeakUnboundedSender<RequestData>> = OnceCell::new();
/// Weak handle to the channel on which `RequestMonitoring::log_disconnect` sends its
/// [`RequestData`] record.
///
/// `RequestMonitoring::new` tries to upgrade it to a strong sender for each request.
pub static LOG_CHAN_DISCONNECT: OnceCell<mpsc::WeakUnboundedSender<RequestData>> = OnceCell::new();
26 :
/// Context data for a single request to connect to a database.
///
/// This data should **not** be used for connection logic, only for observability and limiting purposes.
/// All connection logic should instead use strongly typed state machines, not a bunch of Options.
pub struct RequestMonitoring {
    pub peer_addr: IpAddr,
    pub session_id: Uuid,
    pub protocol: Protocol,
    // Wall-clock time captured when this context is constructed.
    first_packet: chrono::DateTime<Utc>,
    region: &'static str,
    // The `connect_request` tracing span; its `ep` and `role` fields are recorded once known.
    pub span: Span,

    // filled in as they are discovered
    project: Option<ProjectIdInt>,
    branch: Option<BranchIdInt>,
    endpoint_id: Option<EndpointId>,
    dbname: Option<DbName>,
    user: Option<RoleName>,
    application: Option<SmolStr>,
    error_kind: Option<ErrorKind>,
    pub(crate) auth_method: Option<AuthMethod>,
    // Starts out `false`; flipped by `set_success` and read by `log_connect` to pick the outcome.
    success: bool,
    pub(crate) cold_start_info: ColdStartInfo,
    pg_options: Option<StartupMessageParams>,

    // extra
    // This sender is here to keep the request monitoring channel open while requests are taking place.
    sender: Option<mpsc::UnboundedSender<RequestData>>,
    // This sender is only used to log the length of the session in case of success.
    disconnect_sender: Option<mpsc::UnboundedSender<RequestData>>,
    pub latency_timer: LatencyTimer,
    // Whether the proxy decided that it's not a valid endpoint and rejected it before going to cplane.
    // `None` until `set_rejected` is called.
    rejected: Option<bool>,
    // Set to the current time by `log_disconnect`.
    disconnect_timestamp: Option<chrono::DateTime<Utc>>,
}
62 :
/// Authentication method recorded on a [`RequestMonitoring`] context via `set_auth_method`.
#[derive(Clone, Debug)]
pub enum AuthMethod {
    // aka link aka passwordless
    Web,
    ScramSha256,
    ScramSha256Plus,
    Cleartext,
}
71 :
72 : impl RequestMonitoring {
73 70 : pub fn new(
74 70 : session_id: Uuid,
75 70 : peer_addr: IpAddr,
76 70 : protocol: Protocol,
77 70 : region: &'static str,
78 70 : ) -> Self {
79 70 : let span = info_span!(
80 70 : "connect_request",
81 70 : %protocol,
82 70 : ?session_id,
83 70 : %peer_addr,
84 70 : ep = tracing::field::Empty,
85 70 : role = tracing::field::Empty,
86 70 : );
87 :
88 70 : Self {
89 70 : peer_addr,
90 70 : session_id,
91 70 : protocol,
92 70 : first_packet: Utc::now(),
93 70 : region,
94 70 : span,
95 70 :
96 70 : project: None,
97 70 : branch: None,
98 70 : endpoint_id: None,
99 70 : dbname: None,
100 70 : user: None,
101 70 : application: None,
102 70 : error_kind: None,
103 70 : auth_method: None,
104 70 : success: false,
105 70 : rejected: None,
106 70 : cold_start_info: ColdStartInfo::Unknown,
107 70 : pg_options: None,
108 70 :
109 70 : sender: LOG_CHAN.get().and_then(|tx| tx.upgrade()),
110 70 : disconnect_sender: LOG_CHAN_DISCONNECT.get().and_then(|tx| tx.upgrade()),
111 70 : latency_timer: LatencyTimer::new(protocol),
112 70 : disconnect_timestamp: None,
113 70 : }
114 70 : }
115 :
/// Builds a context for unit tests: fresh v7 session id, peer `127.0.0.1`,
/// TCP protocol and region `"test"`.
#[cfg(test)]
pub fn test() -> Self {
    let session_id = Uuid::now_v7();
    let peer_addr = IpAddr::from([127, 0, 0, 1]);
    Self::new(session_id, peer_addr, Protocol::Tcp, "test")
}
120 :
121 0 : pub fn console_application_name(&self) -> String {
122 0 : format!(
123 0 : "{}/{}",
124 0 : self.application.as_deref().unwrap_or_default(),
125 0 : self.protocol
126 0 : )
127 0 : }
128 :
129 0 : pub fn set_rejected(&mut self, rejected: bool) {
130 0 : self.rejected = Some(rejected);
131 0 : }
132 :
133 0 : pub fn set_cold_start_info(&mut self, info: ColdStartInfo) {
134 0 : self.cold_start_info = info;
135 0 : self.latency_timer.cold_start_info(info);
136 0 : }
137 :
138 0 : pub fn set_db_options(&mut self, options: StartupMessageParams) {
139 0 : self.set_application(options.get("application_name").map(SmolStr::from));
140 0 : if let Some(user) = options.get("user") {
141 0 : self.set_user(user.into());
142 0 : }
143 0 : if let Some(dbname) = options.get("database") {
144 0 : self.set_dbname(dbname.into());
145 0 : }
146 :
147 0 : self.pg_options = Some(options);
148 0 : }
149 :
150 0 : pub fn set_project(&mut self, x: MetricsAuxInfo) {
151 0 : if self.endpoint_id.is_none() {
152 0 : self.set_endpoint_id(x.endpoint_id.as_str().into())
153 0 : }
154 0 : self.branch = Some(x.branch_id);
155 0 : self.project = Some(x.project_id);
156 0 : self.set_cold_start_info(x.cold_start_info);
157 0 : }
158 :
159 0 : pub fn set_project_id(&mut self, project_id: ProjectIdInt) {
160 0 : self.project = Some(project_id);
161 0 : }
162 :
163 16 : pub fn set_endpoint_id(&mut self, endpoint_id: EndpointId) {
164 16 : if self.endpoint_id.is_none() {
165 16 : self.span.record("ep", display(&endpoint_id));
166 16 : let metric = &Metrics::get().proxy.connecting_endpoints;
167 16 : let label = metric.with_labels(self.protocol);
168 16 : metric.get_metric(label).measure(&endpoint_id);
169 16 : self.endpoint_id = Some(endpoint_id);
170 16 : }
171 16 : }
172 :
173 0 : fn set_application(&mut self, app: Option<SmolStr>) {
174 0 : if let Some(app) = app {
175 0 : self.application = Some(app);
176 0 : }
177 0 : }
178 :
179 0 : pub fn set_dbname(&mut self, dbname: DbName) {
180 0 : self.dbname = Some(dbname);
181 0 : }
182 :
183 0 : pub fn set_user(&mut self, user: RoleName) {
184 0 : self.span.record("role", display(&user));
185 0 : self.user = Some(user);
186 0 : }
187 :
188 6 : pub fn set_auth_method(&mut self, auth_method: AuthMethod) {
189 6 : self.auth_method = Some(auth_method);
190 6 : }
191 :
192 0 : pub fn has_private_peer_addr(&self) -> bool {
193 0 : match self.peer_addr {
194 0 : IpAddr::V4(ip) => ip.is_private(),
195 0 : _ => false,
196 : }
197 0 : }
198 :
199 0 : pub fn set_error_kind(&mut self, kind: ErrorKind) {
200 0 : // Do not record errors from the private address to metrics.
201 0 : if !self.has_private_peer_addr() {
202 0 : Metrics::get().proxy.errors_total.inc(kind);
203 0 : }
204 0 : if let Some(ep) = &self.endpoint_id {
205 0 : let metric = &Metrics::get().proxy.endpoints_affected_by_errors;
206 0 : let label = metric.with_labels(kind);
207 0 : metric.get_metric(label).measure(ep);
208 0 : }
209 0 : self.error_kind = Some(kind);
210 0 : }
211 :
212 0 : pub fn set_success(&mut self) {
213 0 : self.success = true;
214 0 : }
215 :
216 0 : pub fn log_connect(&mut self) {
217 0 : let outcome = if self.success {
218 0 : ConnectOutcome::Success
219 : } else {
220 0 : ConnectOutcome::Failed
221 : };
222 0 : if let Some(rejected) = self.rejected {
223 0 : let ep = self
224 0 : .endpoint_id
225 0 : .as_ref()
226 0 : .map(|x| x.as_str())
227 0 : .unwrap_or_default();
228 0 : // This makes sense only if cache is disabled
229 0 : info!(
230 : ?outcome,
231 : ?rejected,
232 : ?ep,
233 0 : "check endpoint is valid with outcome"
234 : );
235 0 : Metrics::get()
236 0 : .proxy
237 0 : .invalid_endpoints_total
238 0 : .inc(InvalidEndpointsGroup {
239 0 : protocol: self.protocol,
240 0 : rejected: rejected.into(),
241 0 : outcome,
242 0 : });
243 0 : }
244 0 : if let Some(tx) = self.sender.take() {
245 0 : let _: Result<(), _> = tx.send(RequestData::from(&*self));
246 0 : }
247 0 : }
248 :
249 70 : fn log_disconnect(&mut self) {
250 70 : // If we are here, it's guaranteed that the user successfully connected to the endpoint.
251 70 : // Here we log the length of the session.
252 70 : self.disconnect_timestamp = Some(Utc::now());
253 70 : if let Some(tx) = self.disconnect_sender.take() {
254 0 : let _: Result<(), _> = tx.send(RequestData::from(&*self));
255 70 : }
256 70 : }
257 : }
258 :
259 : impl Drop for RequestMonitoring {
260 70 : fn drop(&mut self) {
261 70 : if self.sender.is_some() {
262 0 : self.log_connect();
263 70 : } else {
264 70 : self.log_disconnect();
265 70 : }
266 70 : }
267 : }
|