Line data Source code
1 : //! Connection request monitoring contexts
2 :
3 : use chrono::Utc;
4 : use once_cell::sync::OnceCell;
5 : use smol_str::SmolStr;
6 : use std::net::IpAddr;
7 : use tokio::sync::mpsc;
8 : use uuid::Uuid;
9 :
10 : use crate::{
11 : console::messages::MetricsAuxInfo,
12 : error::ErrorKind,
13 : metrics::{LatencyTimer, ENDPOINT_ERRORS_BY_KIND, ERROR_BY_KIND},
14 : BranchId, DbName, EndpointId, ProjectId, RoleName,
15 : };
16 :
17 : pub mod parquet;
18 :
19 : static LOG_CHAN: OnceCell<mpsc::WeakUnboundedSender<RequestMonitoring>> = OnceCell::new();
20 :
21 0 : #[derive(Clone)]
22 : /// Context data for a single request to connect to a database.
23 : ///
24 : /// This data should **not** be used for connection logic, only for observability and limiting purposes.
25 : /// All connection logic should instead use strongly typed state machines, not a bunch of Options.
26 : pub struct RequestMonitoring {
27 : pub peer_addr: IpAddr,
28 : pub session_id: Uuid,
29 : pub protocol: &'static str,
30 : first_packet: chrono::DateTime<Utc>,
31 : region: &'static str,
32 :
33 : // filled in as they are discovered
34 : project: Option<ProjectId>,
35 : branch: Option<BranchId>,
36 : endpoint_id: Option<EndpointId>,
37 : dbname: Option<DbName>,
38 : user: Option<RoleName>,
39 : application: Option<SmolStr>,
40 : error_kind: Option<ErrorKind>,
41 : pub(crate) auth_method: Option<AuthMethod>,
42 : success: bool,
43 :
44 : // extra
45 : // This sender is here to keep the request monitoring channel open while requests are taking place.
46 : sender: Option<mpsc::UnboundedSender<RequestMonitoring>>,
47 : pub latency_timer: LatencyTimer,
48 : }
49 :
50 0 : #[derive(Clone, Debug)]
51 : pub enum AuthMethod {
52 : // aka link aka passwordless
53 : Web,
54 : ScramSha256,
55 : ScramSha256Plus,
56 : Cleartext,
57 : }
58 :
59 : impl RequestMonitoring {
60 176 : pub fn new(
61 176 : session_id: Uuid,
62 176 : peer_addr: IpAddr,
63 176 : protocol: &'static str,
64 176 : region: &'static str,
65 176 : ) -> Self {
66 176 : Self {
67 176 : peer_addr,
68 176 : session_id,
69 176 : protocol,
70 176 : first_packet: Utc::now(),
71 176 : region,
72 176 :
73 176 : project: None,
74 176 : branch: None,
75 176 : endpoint_id: None,
76 176 : dbname: None,
77 176 : user: None,
78 176 : application: None,
79 176 : error_kind: None,
80 176 : auth_method: None,
81 176 : success: false,
82 176 :
83 176 : sender: LOG_CHAN.get().and_then(|tx| tx.upgrade()),
84 176 : latency_timer: LatencyTimer::new(protocol),
85 176 : }
86 176 : }
87 :
88 : #[cfg(test)]
89 64 : pub fn test() -> Self {
90 64 : RequestMonitoring::new(Uuid::now_v7(), [127, 0, 0, 1].into(), "test", "test")
91 64 : }
92 :
93 4 : pub fn console_application_name(&self) -> String {
94 4 : format!(
95 4 : "{}/{}",
96 4 : self.application.as_deref().unwrap_or_default(),
97 4 : self.protocol
98 4 : )
99 4 : }
100 :
101 81 : pub fn set_project(&mut self, x: MetricsAuxInfo) {
102 81 : self.branch = Some(x.branch_id);
103 81 : self.endpoint_id = Some(x.endpoint_id);
104 81 : self.project = Some(x.project_id);
105 81 : }
106 :
107 0 : pub fn set_project_id(&mut self, project_id: ProjectId) {
108 0 : self.project = Some(project_id);
109 0 : }
110 :
111 109 : pub fn set_endpoint_id(&mut self, endpoint_id: EndpointId) {
112 109 : crate::metrics::CONNECTING_ENDPOINTS
113 109 : .with_label_values(&[self.protocol])
114 109 : .measure(&endpoint_id);
115 109 : self.endpoint_id = Some(endpoint_id);
116 109 : }
117 :
118 75 : pub fn set_application(&mut self, app: Option<SmolStr>) {
119 75 : self.application = app.or_else(|| self.application.clone());
120 75 : }
121 :
122 98 : pub fn set_dbname(&mut self, dbname: DbName) {
123 98 : self.dbname = Some(dbname);
124 98 : }
125 :
126 125 : pub fn set_user(&mut self, user: RoleName) {
127 125 : self.user = Some(user);
128 125 : }
129 :
130 53 : pub fn set_auth_method(&mut self, auth_method: AuthMethod) {
131 53 : self.auth_method = Some(auth_method);
132 53 : }
133 :
134 22 : pub fn set_error_kind(&mut self, kind: ErrorKind) {
135 22 : ERROR_BY_KIND
136 22 : .with_label_values(&[kind.to_metric_label()])
137 22 : .inc();
138 22 : if let Some(ep) = &self.endpoint_id {
139 10 : ENDPOINT_ERRORS_BY_KIND
140 10 : .with_label_values(&[kind.to_metric_label()])
141 10 : .measure(ep);
142 12 : }
143 22 : self.error_kind = Some(kind);
144 22 : }
145 :
146 84 : pub fn set_success(&mut self) {
147 84 : self.success = true;
148 84 : }
149 :
150 240 : pub fn log(&mut self) {
151 240 : if let Some(tx) = self.sender.take() {
152 0 : let _: Result<(), _> = tx.send(self.clone());
153 240 : }
154 240 : }
155 : }
156 :
157 : impl Drop for RequestMonitoring {
158 176 : fn drop(&mut self) {
159 176 : self.log()
160 176 : }
161 : }
|