Line data Source code
1 : //! A background loop that fetches feature flags from PostHog and updates the feature store.
2 :
3 : use std::{sync::Arc, time::Duration};
4 :
5 : use arc_swap::ArcSwap;
6 : use tokio_util::sync::CancellationToken;
7 : use tracing::{Instrument, info_span};
8 :
9 : use crate::{CaptureEvent, FeatureStore, PostHogClient, PostHogClientConfig};
10 :
11 : /// A background loop that fetches feature flags from PostHog and updates the feature store.
12 : pub struct FeatureResolverBackgroundLoop {
13 : posthog_client: PostHogClient,
14 : feature_store: ArcSwap<FeatureStore>,
15 : cancel: CancellationToken,
16 : }
17 :
18 : impl FeatureResolverBackgroundLoop {
19 0 : pub fn new(config: PostHogClientConfig, shutdown_pageserver: CancellationToken) -> Self {
20 0 : Self {
21 0 : posthog_client: PostHogClient::new(config),
22 0 : feature_store: ArcSwap::new(Arc::new(FeatureStore::new())),
23 0 : cancel: shutdown_pageserver,
24 0 : }
25 0 : }
26 :
27 0 : pub fn spawn(
28 0 : self: Arc<Self>,
29 0 : handle: &tokio::runtime::Handle,
30 0 : refresh_period: Duration,
31 0 : fake_tenants: Vec<CaptureEvent>,
32 0 : ) {
33 0 : let this = self.clone();
34 0 : let cancel = self.cancel.clone();
35 0 :
36 0 : // Main loop of updating the feature flags.
37 0 : handle.spawn(
38 0 : async move {
39 0 : tracing::info!("Starting PostHog feature resolver");
40 0 : let mut ticker = tokio::time::interval(refresh_period);
41 0 : ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
42 : loop {
43 0 : tokio::select! {
44 0 : _ = ticker.tick() => {}
45 0 : _ = cancel.cancelled() => break
46 : }
47 0 : let resp = match this
48 0 : .posthog_client
49 0 : .get_feature_flags_local_evaluation()
50 0 : .await
51 : {
52 0 : Ok(resp) => resp,
53 0 : Err(e) => {
54 0 : tracing::warn!("Cannot get feature flags: {}", e);
55 0 : continue;
56 : }
57 : };
58 0 : let project_id = this.posthog_client.config.project_id.parse::<u64>().ok();
59 0 : match FeatureStore::new_with_flags(resp.flags, project_id) {
60 0 : Ok(feature_store) => {
61 0 : this.feature_store.store(Arc::new(feature_store));
62 0 : tracing::info!("Feature flag updated");
63 : }
64 0 : Err(e) => {
65 0 : tracing::warn!("Cannot process feature flag spec: {}", e);
66 : }
67 : }
68 : }
69 0 : tracing::info!("PostHog feature resolver stopped");
70 0 : }
71 0 : .instrument(info_span!("posthog_feature_resolver")),
72 : );
73 :
74 : // Report fake tenants to PostHog so that we have the combination of all the properties in the UI.
75 : // Do one report per pageserver restart.
76 0 : let this = self.clone();
77 0 : handle.spawn(
78 0 : async move {
79 0 : tracing::info!("Starting PostHog feature reporter");
80 0 : for tenant in &fake_tenants {
81 0 : tracing::info!("Reporting fake tenant: {:?}", tenant);
82 : }
83 0 : if let Err(e) = this.posthog_client.capture_event_batch(&fake_tenants).await {
84 0 : tracing::warn!("Cannot report fake tenants: {}", e);
85 0 : }
86 0 : }
87 0 : .instrument(info_span!("posthog_feature_reporter")),
88 : );
89 0 : }
90 :
91 0 : pub fn feature_store(&self) -> Arc<FeatureStore> {
92 0 : self.feature_store.load_full()
93 0 : }
94 : }
|