|             Line data    Source code 
       1              : use std::str::FromStr as _;
       2              : 
       3              : use crate::worker_process::lfc_metrics::LfcMetricsCollector;
       4              : 
       5              : use measured::MetricGroup;
       6              : use measured::metric::MetricEncoding;
       7              : use measured::metric::gauge::GaugeState;
       8              : use measured::metric::group::Encoding;
       9              : use utils::id::{TenantId, TimelineId};
      10              : 
      11              : pub struct CommunicatorWorkerProcessStruct {
      12              :     runtime: tokio::runtime::Runtime,
      13              : 
      14              :     /*** Metrics ***/
      15              :     pub(crate) lfc_metrics: LfcMetricsCollector,
      16              : }
      17              : 
      18              : /// Launch the communicator process's Rust subsystems
      19            0 : pub(super) fn init(
      20            0 :     tenant_id: Option<&str>,
      21            0 :     timeline_id: Option<&str>,
      22            0 : ) -> Result<&'static CommunicatorWorkerProcessStruct, String> {
      23              :     // The caller validated these already
      24            0 :     let _tenant_id = tenant_id
      25            0 :         .map(TenantId::from_str)
      26            0 :         .transpose()
      27            0 :         .map_err(|e| format!("invalid tenant ID: {e}"))?;
      28            0 :     let _timeline_id = timeline_id
      29            0 :         .map(TimelineId::from_str)
      30            0 :         .transpose()
      31            0 :         .map_err(|e| format!("invalid timeline ID: {e}"))?;
      32              : 
      33            0 :     let runtime = tokio::runtime::Builder::new_multi_thread()
      34            0 :         .enable_all()
      35            0 :         .thread_name("communicator thread")
      36            0 :         .build()
      37            0 :         .unwrap();
      38              : 
      39            0 :     let worker_struct = CommunicatorWorkerProcessStruct {
      40            0 :         // Note: it's important to not drop the runtime, or all the tasks are dropped
      41            0 :         // too. Including it in the returned struct is one way to keep it around.
      42            0 :         runtime,
      43            0 : 
      44            0 :         // metrics
      45            0 :         lfc_metrics: LfcMetricsCollector,
      46            0 :     };
      47            0 :     let worker_struct = Box::leak(Box::new(worker_struct));
      48              : 
      49              :     // Start the listener on the control socket
      50            0 :     worker_struct
      51            0 :         .runtime
      52            0 :         .block_on(worker_struct.launch_control_socket_listener())
      53            0 :         .map_err(|e| e.to_string())?;
      54              : 
      55            0 :     Ok(worker_struct)
      56            0 : }
      57              : 
      58              : impl<T> MetricGroup<T> for CommunicatorWorkerProcessStruct
      59              : where
      60              :     T: Encoding,
      61              :     GaugeState: MetricEncoding<T>,
      62              : {
      63            0 :     fn collect_group_into(&self, enc: &mut T) -> Result<(), T::Err> {
      64            0 :         self.lfc_metrics.collect_group_into(enc)
      65            0 :     }
      66              : }
         |