Line data Source code
1 : //! A background loop that fetches feature flags from PostHog and updates the feature store.
2 :
3 : use std::{sync::Arc, time::Duration};
4 :
5 : use arc_swap::ArcSwap;
6 : use tokio_util::sync::CancellationToken;
7 :
8 : use crate::{FeatureStore, PostHogClient, PostHogClientConfig};
9 :
10 : /// A background loop that fetches feature flags from PostHog and updates the feature store.
11 : pub struct FeatureResolverBackgroundLoop {
12 : posthog_client: PostHogClient,
13 : feature_store: ArcSwap<FeatureStore>,
14 : cancel: CancellationToken,
15 : }
16 :
17 : impl FeatureResolverBackgroundLoop {
18 0 : pub fn new(config: PostHogClientConfig, shutdown_pageserver: CancellationToken) -> Self {
19 0 : Self {
20 0 : posthog_client: PostHogClient::new(config),
21 0 : feature_store: ArcSwap::new(Arc::new(FeatureStore::new())),
22 0 : cancel: shutdown_pageserver,
23 0 : }
24 0 : }
25 :
26 0 : pub fn spawn(self: Arc<Self>, handle: &tokio::runtime::Handle, refresh_period: Duration) {
27 0 : let this = self.clone();
28 0 : let cancel = self.cancel.clone();
29 0 : handle.spawn(async move {
30 0 : tracing::info!("Starting PostHog feature resolver");
31 0 : let mut ticker = tokio::time::interval(refresh_period);
32 0 : ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
33 : loop {
34 0 : tokio::select! {
35 0 : _ = ticker.tick() => {}
36 0 : _ = cancel.cancelled() => break
37 : }
38 0 : let resp = match this
39 0 : .posthog_client
40 0 : .get_feature_flags_local_evaluation()
41 0 : .await
42 : {
43 0 : Ok(resp) => resp,
44 0 : Err(e) => {
45 0 : tracing::warn!("Cannot get feature flags: {}", e);
46 0 : continue;
47 : }
48 : };
49 0 : let feature_store = FeatureStore::new_with_flags(resp.flags);
50 0 : this.feature_store.store(Arc::new(feature_store));
51 : }
52 0 : tracing::info!("PostHog feature resolver stopped");
53 0 : });
54 0 : }
55 :
56 0 : pub fn feature_store(&self) -> Arc<FeatureStore> {
57 0 : self.feature_store.load_full()
58 0 : }
59 : }
|