Line data Source code
1 : //! Connection request monitoring contexts
2 :
3 : use chrono::Utc;
4 : use once_cell::sync::OnceCell;
5 : use smol_str::SmolStr;
6 : use std::net::IpAddr;
7 : use tokio::sync::mpsc;
8 : use uuid::Uuid;
9 :
10 : use crate::{
11 : console::messages::MetricsAuxInfo, error::ErrorKind, metrics::LatencyTimer, BranchId,
12 : EndpointId, ProjectId, RoleName,
13 : };
14 :
15 : pub mod parquet;
16 :
17 : static LOG_CHAN: OnceCell<mpsc::WeakUnboundedSender<RequestMonitoring>> = OnceCell::new();
18 :
19 0 : #[derive(Clone)]
20 : /// Context data for a single request to connect to a database.
21 : ///
22 : /// This data should **not** be used for connection logic, only for observability and limiting purposes.
23 : /// All connection logic should instead use strongly typed state machines, not a bunch of Options.
24 : pub struct RequestMonitoring {
25 : pub peer_addr: IpAddr,
26 : pub session_id: Uuid,
27 : pub protocol: &'static str,
28 : first_packet: chrono::DateTime<Utc>,
29 : region: &'static str,
30 :
31 : // filled in as they are discovered
32 : project: Option<ProjectId>,
33 : branch: Option<BranchId>,
34 : endpoint_id: Option<EndpointId>,
35 : user: Option<RoleName>,
36 : application: Option<SmolStr>,
37 : error_kind: Option<ErrorKind>,
38 : success: bool,
39 :
40 : // extra
41 : // This sender is here to keep the request monitoring channel open while requests are taking place.
42 : sender: Option<mpsc::UnboundedSender<RequestMonitoring>>,
43 : pub latency_timer: LatencyTimer,
44 : }
45 :
46 : impl RequestMonitoring {
47 149 : pub fn new(
48 149 : session_id: Uuid,
49 149 : peer_addr: IpAddr,
50 149 : protocol: &'static str,
51 149 : region: &'static str,
52 149 : ) -> Self {
53 149 : Self {
54 149 : peer_addr,
55 149 : session_id,
56 149 : protocol,
57 149 : first_packet: Utc::now(),
58 149 : region,
59 149 :
60 149 : project: None,
61 149 : branch: None,
62 149 : endpoint_id: None,
63 149 : user: None,
64 149 : application: None,
65 149 : error_kind: None,
66 149 : success: false,
67 149 :
68 149 : sender: LOG_CHAN.get().and_then(|tx| tx.upgrade()),
69 149 : latency_timer: LatencyTimer::new(protocol),
70 149 : }
71 149 : }
72 :
73 : #[cfg(test)]
74 40 : pub fn test() -> Self {
75 40 : RequestMonitoring::new(Uuid::now_v7(), [127, 0, 0, 1].into(), "test", "test")
76 40 : }
77 :
78 4 : pub fn console_application_name(&self) -> String {
79 4 : format!(
80 4 : "{}/{}",
81 4 : self.application.as_deref().unwrap_or_default(),
82 4 : self.protocol
83 4 : )
84 4 : }
85 :
86 84 : pub fn set_project(&mut self, x: MetricsAuxInfo) {
87 84 : self.branch = Some(x.branch_id);
88 84 : self.endpoint_id = Some(x.endpoint_id);
89 84 : self.project = Some(x.project_id);
90 84 : }
91 :
92 106 : pub fn set_endpoint_id(&mut self, endpoint_id: EndpointId) {
93 106 : crate::metrics::CONNECTING_ENDPOINTS
94 106 : .with_label_values(&[self.protocol])
95 106 : .measure(&endpoint_id);
96 106 : self.endpoint_id = Some(endpoint_id);
97 106 : }
98 :
99 115 : pub fn set_application(&mut self, app: Option<SmolStr>) {
100 115 : self.application = app.or_else(|| self.application.clone());
101 115 : }
102 :
103 122 : pub fn set_user(&mut self, user: RoleName) {
104 122 : self.user = Some(user);
105 122 : }
106 :
107 81 : pub fn set_success(&mut self) {
108 81 : self.success = true;
109 81 : }
110 :
111 230 : pub fn log(&mut self) {
112 230 : if let Some(tx) = self.sender.take() {
113 0 : let _: Result<(), _> = tx.send(self.clone());
114 230 : }
115 230 : }
116 : }
117 :
118 : impl Drop for RequestMonitoring {
119 149 : fn drop(&mut self) {
120 149 : self.log()
121 149 : }
122 : }
|