Line data Source code
1 : //! Connection request monitoring contexts
2 :
3 : use chrono::Utc;
4 : use once_cell::sync::OnceCell;
5 : use smol_str::SmolStr;
6 : use std::net::IpAddr;
7 : use tokio::sync::mpsc;
8 : use tracing::{field::display, info, info_span, Span};
9 : use uuid::Uuid;
10 :
11 : use crate::{
12 : console::messages::{ColdStartInfo, MetricsAuxInfo},
13 : error::ErrorKind,
14 : intern::{BranchIdInt, ProjectIdInt},
15 : metrics::{ConnectOutcome, InvalidEndpointsGroup, LatencyTimer, Metrics, Protocol},
16 : DbName, EndpointId, RoleName,
17 : };
18 :
19 : use self::parquet::RequestData;
20 :
21 : pub mod parquet;
22 :
/// Process-wide weak handle to the channel that carries [`RequestData`] records.
///
/// `RequestMonitoring::new` upgrades this to a strong sender when the cell has been set and
/// the channel is still alive; otherwise the request simply carries no sender.
/// The code that initialises this cell is not part of this file section.
23 : static LOG_CHAN: OnceCell<mpsc::WeakUnboundedSender<RequestData>> = OnceCell::new();
24 :
25 : /// Context data for a single request to connect to a database.
26 : ///
27 : /// This data should **not** be used for connection logic, only for observability and limiting purposes.
28 : /// All connection logic should instead use strongly typed state machines, not a bunch of Options.
29 : pub struct RequestMonitoring {
30 : pub peer_addr: IpAddr,
31 : pub session_id: Uuid,
32 : pub protocol: Protocol,
33 : first_packet: chrono::DateTime<Utc>,
34 : region: &'static str,
35 : pub span: Span,
36 :
37 : // filled in as they are discovered
38 : project: Option<ProjectIdInt>,
39 : branch: Option<BranchIdInt>,
40 : endpoint_id: Option<EndpointId>,
41 : dbname: Option<DbName>,
42 : user: Option<RoleName>,
43 : application: Option<SmolStr>,
44 : error_kind: Option<ErrorKind>,
45 : pub(crate) auth_method: Option<AuthMethod>,
46 : success: bool,
47 : pub(crate) cold_start_info: ColdStartInfo,
48 :
49 : // extra
50 : // This sender is here to keep the request monitoring channel open while requests are taking place.
51 : sender: Option<mpsc::UnboundedSender<RequestData>>,
52 : pub latency_timer: LatencyTimer,
53 : // Whether proxy decided that it's not a valid endpoint end rejected it before going to cplane.
54 : rejected: Option<bool>,
55 : }
56 :
/// Authentication method observed for a connection request.
///
/// This is a plain fieldless enum, so it is `Copy` and comparable in addition to being
/// `Clone` and `Debug`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum AuthMethod {
    /// Web authentication, also known as "link" or "passwordless".
    Web,
    /// SCRAM-SHA-256.
    ScramSha256,
    /// SCRAM-SHA-256-PLUS.
    ScramSha256Plus,
    /// Cleartext password.
    Cleartext,
}
65 :
66 : impl RequestMonitoring {
67 70 : pub fn new(
68 70 : session_id: Uuid,
69 70 : peer_addr: IpAddr,
70 70 : protocol: Protocol,
71 70 : region: &'static str,
72 70 : ) -> Self {
73 70 : let span = info_span!(
74 70 : "connect_request",
75 70 : %protocol,
76 70 : ?session_id,
77 70 : %peer_addr,
78 70 : ep = tracing::field::Empty,
79 70 : role = tracing::field::Empty,
80 70 : );
81 :
82 70 : Self {
83 70 : peer_addr,
84 70 : session_id,
85 70 : protocol,
86 70 : first_packet: Utc::now(),
87 70 : region,
88 70 : span,
89 70 :
90 70 : project: None,
91 70 : branch: None,
92 70 : endpoint_id: None,
93 70 : dbname: None,
94 70 : user: None,
95 70 : application: None,
96 70 : error_kind: None,
97 70 : auth_method: None,
98 70 : success: false,
99 70 : rejected: None,
100 70 : cold_start_info: ColdStartInfo::Unknown,
101 70 :
102 70 : sender: LOG_CHAN.get().and_then(|tx| tx.upgrade()),
103 70 : latency_timer: LatencyTimer::new(protocol),
104 70 : }
105 70 : }
106 :
107 : #[cfg(test)]
108 70 : pub fn test() -> Self {
109 70 : RequestMonitoring::new(Uuid::now_v7(), [127, 0, 0, 1].into(), Protocol::Tcp, "test")
110 70 : }
111 :
112 0 : pub fn console_application_name(&self) -> String {
113 0 : format!(
114 0 : "{}/{}",
115 0 : self.application.as_deref().unwrap_or_default(),
116 0 : self.protocol
117 0 : )
118 0 : }
119 :
120 0 : pub fn set_rejected(&mut self, rejected: bool) {
121 0 : self.rejected = Some(rejected);
122 0 : }
123 :
124 0 : pub fn set_cold_start_info(&mut self, info: ColdStartInfo) {
125 0 : self.cold_start_info = info;
126 0 : self.latency_timer.cold_start_info(info);
127 0 : }
128 :
129 0 : pub fn set_project(&mut self, x: MetricsAuxInfo) {
130 0 : if self.endpoint_id.is_none() {
131 0 : self.set_endpoint_id(x.endpoint_id.as_str().into())
132 0 : }
133 0 : self.branch = Some(x.branch_id);
134 0 : self.project = Some(x.project_id);
135 0 : self.set_cold_start_info(x.cold_start_info);
136 0 : }
137 :
138 0 : pub fn set_project_id(&mut self, project_id: ProjectIdInt) {
139 0 : self.project = Some(project_id);
140 0 : }
141 :
142 16 : pub fn set_endpoint_id(&mut self, endpoint_id: EndpointId) {
143 16 : if self.endpoint_id.is_none() {
144 16 : self.span.record("ep", display(&endpoint_id));
145 16 : let metric = &Metrics::get().proxy.connecting_endpoints;
146 16 : let label = metric.with_labels(self.protocol);
147 16 : metric.get_metric(label).measure(&endpoint_id);
148 16 : self.endpoint_id = Some(endpoint_id);
149 16 : }
150 16 : }
151 :
152 26 : pub fn set_application(&mut self, app: Option<SmolStr>) {
153 26 : self.application = app.or_else(|| self.application.clone());
154 26 : }
155 :
156 2 : pub fn set_dbname(&mut self, dbname: DbName) {
157 2 : self.dbname = Some(dbname);
158 2 : }
159 :
160 26 : pub fn set_user(&mut self, user: RoleName) {
161 26 : self.span.record("role", display(&user));
162 26 : self.user = Some(user);
163 26 : }
164 :
165 6 : pub fn set_auth_method(&mut self, auth_method: AuthMethod) {
166 6 : self.auth_method = Some(auth_method);
167 6 : }
168 :
169 0 : pub fn has_private_peer_addr(&self) -> bool {
170 0 : match self.peer_addr {
171 0 : IpAddr::V4(ip) => ip.is_private(),
172 0 : _ => false,
173 : }
174 0 : }
175 :
176 0 : pub fn set_error_kind(&mut self, kind: ErrorKind) {
177 0 : // Do not record errors from the private address to metrics.
178 0 : if !self.has_private_peer_addr() {
179 0 : Metrics::get().proxy.errors_total.inc(kind);
180 0 : }
181 0 : if let Some(ep) = &self.endpoint_id {
182 0 : let metric = &Metrics::get().proxy.endpoints_affected_by_errors;
183 0 : let label = metric.with_labels(kind);
184 0 : metric.get_metric(label).measure(ep);
185 0 : }
186 0 : self.error_kind = Some(kind);
187 0 : }
188 :
189 0 : pub fn set_success(&mut self) {
190 0 : self.success = true;
191 0 : }
192 :
193 0 : pub fn log(self) {}
194 : }
195 :
196 : impl Drop for RequestMonitoring {
197 70 : fn drop(&mut self) {
198 70 : let outcome = if self.success {
199 0 : ConnectOutcome::Success
200 : } else {
201 70 : ConnectOutcome::Failed
202 : };
203 70 : if let Some(rejected) = self.rejected {
204 0 : let ep = self
205 0 : .endpoint_id
206 0 : .as_ref()
207 0 : .map(|x| x.as_str())
208 0 : .unwrap_or_default();
209 0 : // This makes sense only if cache is disabled
210 0 : info!(
211 0 : ?outcome,
212 0 : ?rejected,
213 0 : ?ep,
214 0 : "check endpoint is valid with outcome"
215 0 : );
216 0 : Metrics::get()
217 0 : .proxy
218 0 : .invalid_endpoints_total
219 0 : .inc(InvalidEndpointsGroup {
220 0 : protocol: self.protocol,
221 0 : rejected: rejected.into(),
222 0 : outcome,
223 0 : });
224 70 : }
225 70 : if let Some(tx) = self.sender.take() {
226 0 : let _: Result<(), _> = tx.send(RequestData::from(&*self));
227 70 : }
228 70 : }
229 : }
|