Line data Source code
1 : use std::str::FromStr as _;
2 :
3 : use crate::worker_process::lfc_metrics::LfcMetricsCollector;
4 :
5 : use measured::MetricGroup;
6 : use measured::metric::MetricEncoding;
7 : use measured::metric::gauge::GaugeState;
8 : use measured::metric::group::Encoding;
9 : use utils::id::{TenantId, TimelineId};
10 :
11 : pub struct CommunicatorWorkerProcessStruct {
12 : runtime: tokio::runtime::Runtime,
13 :
14 : /*** Metrics ***/
15 : pub(crate) lfc_metrics: LfcMetricsCollector,
16 : }
17 :
18 : /// Launch the communicator process's Rust subsystems
19 0 : pub(super) fn init(
20 0 : tenant_id: Option<&str>,
21 0 : timeline_id: Option<&str>,
22 0 : ) -> Result<&'static CommunicatorWorkerProcessStruct, String> {
23 : // The caller validated these already
24 0 : let _tenant_id = tenant_id
25 0 : .map(TenantId::from_str)
26 0 : .transpose()
27 0 : .map_err(|e| format!("invalid tenant ID: {e}"))?;
28 0 : let _timeline_id = timeline_id
29 0 : .map(TimelineId::from_str)
30 0 : .transpose()
31 0 : .map_err(|e| format!("invalid timeline ID: {e}"))?;
32 :
33 0 : let runtime = tokio::runtime::Builder::new_multi_thread()
34 0 : .enable_all()
35 0 : .thread_name("communicator thread")
36 0 : .build()
37 0 : .unwrap();
38 :
39 0 : let worker_struct = CommunicatorWorkerProcessStruct {
40 0 : // Note: it's important to not drop the runtime, or all the tasks are dropped
41 0 : // too. Including it in the returned struct is one way to keep it around.
42 0 : runtime,
43 0 :
44 0 : // metrics
45 0 : lfc_metrics: LfcMetricsCollector,
46 0 : };
47 0 : let worker_struct = Box::leak(Box::new(worker_struct));
48 :
49 : // Start the listener on the control socket
50 0 : worker_struct
51 0 : .runtime
52 0 : .block_on(worker_struct.launch_control_socket_listener())
53 0 : .map_err(|e| e.to_string())?;
54 :
55 0 : Ok(worker_struct)
56 0 : }
57 :
58 : impl<T> MetricGroup<T> for CommunicatorWorkerProcessStruct
59 : where
60 : T: Encoding,
61 : GaugeState: MetricEncoding<T>,
62 : {
63 0 : fn collect_group_into(&self, enc: &mut T) -> Result<(), T::Err> {
64 0 : self.lfc_metrics.collect_group_into(enc)
65 0 : }
66 : }
|