TLA Line data Source code
1 : //! Connection request monitoring contexts
2 :
3 : use chrono::Utc;
4 : use once_cell::sync::OnceCell;
5 : use smol_str::SmolStr;
6 : use std::net::IpAddr;
7 : use tokio::sync::mpsc;
8 : use uuid::Uuid;
9 :
10 : use crate::{console::messages::MetricsAuxInfo, error::ErrorKind, metrics::LatencyTimer};
11 :
12 : pub mod parquet;
13 :
14 : static LOG_CHAN: OnceCell<mpsc::WeakUnboundedSender<RequestMonitoring>> = OnceCell::new();
15 :
16 UBC 0 : #[derive(Clone)]
17 : /// Context data for a single request to connect to a database.
18 : ///
19 : /// This data should **not** be used for connection logic, only for observability and limiting purposes.
20 : /// All connection logic should instead use strongly typed state machines, not a bunch of Options.
21 : pub struct RequestMonitoring {
22 : pub peer_addr: IpAddr,
23 : pub session_id: Uuid,
24 : pub protocol: &'static str,
25 : first_packet: chrono::DateTime<Utc>,
26 : region: &'static str,
27 :
28 : // filled in as they are discovered
29 : project: Option<SmolStr>,
30 : branch: Option<SmolStr>,
31 : endpoint_id: Option<SmolStr>,
32 : user: Option<SmolStr>,
33 : application: Option<SmolStr>,
34 : error_kind: Option<ErrorKind>,
35 :
36 : // extra
37 : // This sender is here to keep the request monitoring channel open while requests are taking place.
38 : sender: Option<mpsc::UnboundedSender<RequestMonitoring>>,
39 : pub latency_timer: LatencyTimer,
40 : }
41 :
42 : impl RequestMonitoring {
43 CBC 127 : pub fn new(
44 127 : session_id: Uuid,
45 127 : peer_addr: IpAddr,
46 127 : protocol: &'static str,
47 127 : region: &'static str,
48 127 : ) -> Self {
49 127 : Self {
50 127 : peer_addr,
51 127 : session_id,
52 127 : protocol,
53 127 : first_packet: Utc::now(),
54 127 : region,
55 127 :
56 127 : project: None,
57 127 : branch: None,
58 127 : endpoint_id: None,
59 127 : user: None,
60 127 : application: None,
61 127 : error_kind: None,
62 127 :
63 127 : sender: LOG_CHAN.get().and_then(|tx| tx.upgrade()),
64 127 : latency_timer: LatencyTimer::new(protocol),
65 127 : }
66 127 : }
67 :
68 : #[cfg(test)]
69 20 : pub fn test() -> Self {
70 20 : RequestMonitoring::new(Uuid::now_v7(), [127, 0, 0, 1].into(), "test", "test")
71 20 : }
72 :
73 4 : pub fn console_application_name(&self) -> String {
74 4 : format!(
75 4 : "{}/{}",
76 4 : self.application.as_deref().unwrap_or_default(),
77 4 : self.protocol
78 4 : )
79 4 : }
80 :
81 75 : pub fn set_project(&mut self, x: MetricsAuxInfo) {
82 75 : self.branch = Some(x.branch_id);
83 75 : self.endpoint_id = Some(x.endpoint_id);
84 75 : self.project = Some(x.project_id);
85 75 : }
86 :
87 196 : pub fn set_endpoint_id(&mut self, endpoint_id: Option<SmolStr>) {
88 196 : self.endpoint_id = endpoint_id.or_else(|| self.endpoint_id.clone());
89 196 : }
90 :
91 100 : pub fn set_application(&mut self, app: Option<SmolStr>) {
92 100 : self.application = app.or_else(|| self.application.clone());
93 100 : }
94 :
95 145 : pub fn set_user(&mut self, user: SmolStr) {
96 145 : self.user = Some(user);
97 145 : }
98 :
99 206 : pub fn log(&mut self) {
100 206 : if let Some(tx) = self.sender.take() {
101 UBC 0 : let _: Result<(), _> = tx.send(self.clone());
102 CBC 206 : }
103 206 : }
104 : }
105 :
106 : impl Drop for RequestMonitoring {
107 127 : fn drop(&mut self) {
108 127 : self.log()
109 127 : }
110 : }
|