LCOV - code coverage report
Current view: top level - libs/posthog_client_lite/src - background_loop.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 0.0 % 77 0
Test Date: 2025-07-16 12:29:03 Functions: 0.0 % 7 0

            Line data    Source code
       1              : //! A background loop that fetches feature flags from PostHog and updates the feature store.
       2              : 
       3              : use std::{
       4              :     sync::Arc,
       5              :     time::{Duration, SystemTime},
       6              : };
       7              : 
       8              : use arc_swap::ArcSwap;
       9              : use tokio_util::sync::CancellationToken;
      10              : use tracing::{Instrument, info_span};
      11              : 
      12              : use crate::{
      13              :     CaptureEvent, FeatureStore, LocalEvaluationResponse, PostHogClient, PostHogClientConfig,
      14              : };
      15              : 
      16              : /// A background loop that fetches feature flags from PostHog and updates the feature store.
      17              : pub struct FeatureResolverBackgroundLoop {
      18              :     posthog_client: PostHogClient,
      19              :     feature_store: ArcSwap<(SystemTime, Arc<FeatureStore>)>,
      20              :     cancel: CancellationToken,
      21              : }
      22              : 
      23              : impl FeatureResolverBackgroundLoop {
      24            0 :     pub fn new(config: PostHogClientConfig, shutdown_pageserver: CancellationToken) -> Self {
      25            0 :         Self {
      26            0 :             posthog_client: PostHogClient::new(config),
      27            0 :             feature_store: ArcSwap::new(Arc::new((
      28            0 :                 SystemTime::UNIX_EPOCH,
      29            0 :                 Arc::new(FeatureStore::new()),
      30            0 :             ))),
      31            0 :             cancel: shutdown_pageserver,
      32            0 :         }
      33            0 :     }
      34              : 
      35              :     /// Update the feature store with a new feature flag spec bypassing the normal refresh loop.
      36            0 :     pub fn update(&self, spec: String) -> anyhow::Result<()> {
      37            0 :         let resp: LocalEvaluationResponse = serde_json::from_str(&spec)?;
      38            0 :         self.update_feature_store_nofail(resp, "http_propagate");
      39            0 :         Ok(())
      40            0 :     }
      41              : 
      42            0 :     fn update_feature_store_nofail(&self, resp: LocalEvaluationResponse, source: &'static str) {
      43            0 :         let project_id = self.posthog_client.config.project_id.parse::<u64>().ok();
      44            0 :         match FeatureStore::new_with_flags(resp.flags, project_id) {
      45            0 :             Ok(feature_store) => {
      46            0 :                 self.feature_store
      47            0 :                     .store(Arc::new((SystemTime::now(), Arc::new(feature_store))));
      48            0 :                 tracing::info!("Feature flag updated from {}", source);
      49              :             }
      50            0 :             Err(e) => {
      51            0 :                 tracing::warn!("Cannot process feature flag spec from {}: {}", source, e);
      52              :             }
      53              :         }
      54            0 :     }
      55              : 
      56            0 :     pub fn spawn(
      57            0 :         self: Arc<Self>,
      58            0 :         handle: &tokio::runtime::Handle,
      59            0 :         refresh_period: Duration,
      60            0 :         fake_tenants: Vec<CaptureEvent>,
      61            0 :     ) {
      62            0 :         let this = self.clone();
      63            0 :         let cancel = self.cancel.clone();
      64              : 
      65              :         // Main loop of updating the feature flags.
      66            0 :         handle.spawn(
      67            0 :             async move {
      68            0 :                 tracing::info!(
      69            0 :                     "Starting PostHog feature resolver with refresh period: {:?}",
      70              :                     refresh_period
      71              :                 );
      72            0 :                 let mut ticker = tokio::time::interval(refresh_period);
      73            0 :                 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
      74              :                 loop {
      75            0 :                     tokio::select! {
      76            0 :                         _ = ticker.tick() => {}
      77            0 :                         _ = cancel.cancelled() => break
      78              :                     }
      79              :                     {
      80            0 :                         let last_update = this.feature_store.load().0;
      81            0 :                         if let Ok(elapsed) = last_update.elapsed() {
      82            0 :                             if elapsed < refresh_period {
      83            0 :                                 tracing::debug!(
      84            0 :                                     "Skipping feature flag refresh because it's too soon"
      85              :                                 );
      86            0 :                                 continue;
      87            0 :                             }
      88            0 :                         }
      89              :                     }
      90            0 :                     let resp = match this
      91            0 :                         .posthog_client
      92            0 :                         .get_feature_flags_local_evaluation()
      93            0 :                         .await
      94              :                     {
      95            0 :                         Ok(resp) => resp,
      96            0 :                         Err(e) => {
      97            0 :                             tracing::warn!("Cannot get feature flags: {}", e);
      98            0 :                             continue;
      99              :                         }
     100              :                     };
     101            0 :                     this.update_feature_store_nofail(resp, "refresh_loop");
     102              :                 }
     103            0 :                 tracing::info!("PostHog feature resolver stopped");
     104            0 :             }
     105            0 :             .instrument(info_span!("posthog_feature_resolver")),
     106              :         );
     107              : 
     108              :         // Report fake tenants to PostHog so that we have the combination of all the properties in the UI.
     109              :         // Do one report per pageserver restart.
     110            0 :         let this = self.clone();
     111            0 :         handle.spawn(
     112            0 :             async move {
     113            0 :                 tracing::info!("Starting PostHog feature reporter");
     114            0 :                 for tenant in &fake_tenants {
     115            0 :                     tracing::info!("Reporting fake tenant: {:?}", tenant);
     116              :                 }
     117            0 :                 if let Err(e) = this.posthog_client.capture_event_batch(&fake_tenants).await {
     118            0 :                     tracing::warn!("Cannot report fake tenants: {}", e);
     119            0 :                 }
     120            0 :             }
     121            0 :             .instrument(info_span!("posthog_feature_reporter")),
     122              :         );
     123            0 :     }
     124              : 
     125            0 :     pub fn feature_store(&self) -> Arc<FeatureStore> {
     126            0 :         self.feature_store.load().1.clone()
     127            0 :     }
     128              : }
        

Generated by: LCOV version 2.1-beta