Line data Source code
1 : //! A background loop that fetches feature flags from PostHog and updates the feature store.
2 :
3 : use std::{
4 : sync::Arc,
5 : time::{Duration, SystemTime},
6 : };
7 :
8 : use arc_swap::ArcSwap;
9 : use tokio_util::sync::CancellationToken;
10 : use tracing::{Instrument, info_span};
11 :
12 : use crate::{
13 : CaptureEvent, FeatureStore, LocalEvaluationResponse, PostHogClient, PostHogClientConfig,
14 : };
15 :
16 : /// A background loop that fetches feature flags from PostHog and updates the feature store.
17 : pub struct FeatureResolverBackgroundLoop {
18 : posthog_client: PostHogClient,
19 : feature_store: ArcSwap<(SystemTime, Arc<FeatureStore>)>,
20 : cancel: CancellationToken,
21 : }
22 :
23 : impl FeatureResolverBackgroundLoop {
24 0 : pub fn new(config: PostHogClientConfig, shutdown_pageserver: CancellationToken) -> Self {
25 0 : Self {
26 0 : posthog_client: PostHogClient::new(config),
27 0 : feature_store: ArcSwap::new(Arc::new((
28 0 : SystemTime::UNIX_EPOCH,
29 0 : Arc::new(FeatureStore::new()),
30 0 : ))),
31 0 : cancel: shutdown_pageserver,
32 0 : }
33 0 : }
34 :
35 : /// Update the feature store with a new feature flag spec bypassing the normal refresh loop.
36 0 : pub fn update(&self, spec: String) -> anyhow::Result<()> {
37 0 : let resp: LocalEvaluationResponse = serde_json::from_str(&spec)?;
38 0 : self.update_feature_store_nofail(resp, "http_propagate");
39 0 : Ok(())
40 0 : }
41 :
42 0 : fn update_feature_store_nofail(&self, resp: LocalEvaluationResponse, source: &'static str) {
43 0 : let project_id = self.posthog_client.config.project_id.parse::<u64>().ok();
44 0 : match FeatureStore::new_with_flags(resp.flags, project_id) {
45 0 : Ok(feature_store) => {
46 0 : self.feature_store
47 0 : .store(Arc::new((SystemTime::now(), Arc::new(feature_store))));
48 0 : tracing::info!("Feature flag updated from {}", source);
49 : }
50 0 : Err(e) => {
51 0 : tracing::warn!("Cannot process feature flag spec from {}: {}", source, e);
52 : }
53 : }
54 0 : }
55 :
56 0 : pub fn spawn(
57 0 : self: Arc<Self>,
58 0 : handle: &tokio::runtime::Handle,
59 0 : refresh_period: Duration,
60 0 : fake_tenants: Vec<CaptureEvent>,
61 0 : ) {
62 0 : let this = self.clone();
63 0 : let cancel = self.cancel.clone();
64 :
65 : // Main loop of updating the feature flags.
66 0 : handle.spawn(
67 0 : async move {
68 0 : tracing::info!(
69 0 : "Starting PostHog feature resolver with refresh period: {:?}",
70 : refresh_period
71 : );
72 0 : let mut ticker = tokio::time::interval(refresh_period);
73 0 : ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
74 : loop {
75 0 : tokio::select! {
76 0 : _ = ticker.tick() => {}
77 0 : _ = cancel.cancelled() => break
78 : }
79 : {
80 0 : let last_update = this.feature_store.load().0;
81 0 : if let Ok(elapsed) = last_update.elapsed() {
82 0 : if elapsed < refresh_period {
83 0 : tracing::debug!(
84 0 : "Skipping feature flag refresh because it's too soon"
85 : );
86 0 : continue;
87 0 : }
88 0 : }
89 : }
90 0 : let resp = match this
91 0 : .posthog_client
92 0 : .get_feature_flags_local_evaluation()
93 0 : .await
94 : {
95 0 : Ok(resp) => resp,
96 0 : Err(e) => {
97 0 : tracing::warn!("Cannot get feature flags: {}", e);
98 0 : continue;
99 : }
100 : };
101 0 : this.update_feature_store_nofail(resp, "refresh_loop");
102 : }
103 0 : tracing::info!("PostHog feature resolver stopped");
104 0 : }
105 0 : .instrument(info_span!("posthog_feature_resolver")),
106 : );
107 :
108 : // Report fake tenants to PostHog so that we have the combination of all the properties in the UI.
109 : // Do one report per pageserver restart.
110 0 : let this = self.clone();
111 0 : handle.spawn(
112 0 : async move {
113 0 : tracing::info!("Starting PostHog feature reporter");
114 0 : for tenant in &fake_tenants {
115 0 : tracing::info!("Reporting fake tenant: {:?}", tenant);
116 : }
117 0 : if let Err(e) = this.posthog_client.capture_event_batch(&fake_tenants).await {
118 0 : tracing::warn!("Cannot report fake tenants: {}", e);
119 0 : }
120 0 : }
121 0 : .instrument(info_span!("posthog_feature_reporter")),
122 : );
123 0 : }
124 :
125 0 : pub fn feature_store(&self) -> Arc<FeatureStore> {
126 0 : self.feature_store.load().1.clone()
127 0 : }
128 : }
|