Line data Source code
1 : use std::{sync::Arc, time::Duration};
2 :
3 : use futures::StreamExt;
4 : use pageserver_api::config::PostHogConfig;
5 : use pageserver_client::mgmt_api;
6 : use posthog_client_lite::PostHogClient;
7 : use reqwest::StatusCode;
8 : use tokio::time::MissedTickBehavior;
9 : use tokio_util::sync::CancellationToken;
10 :
11 : use crate::{pageserver_client::PageserverClient, service::Service};
12 :
13 : pub struct FeatureFlagService {
14 : service: Arc<Service>,
15 : config: PostHogConfig,
16 : client: PostHogClient,
17 : http_client: reqwest::Client,
18 : }
19 :
/// Refresh period used by `FeatureFlagService::run` when `PostHogConfig::refresh_interval` is unset.
const DEFAULT_POSTHOG_REFRESH_INTERVAL: Duration = Duration::from_millis(30 * 1000);
21 :
22 : impl FeatureFlagService {
23 0 : pub fn new(service: Arc<Service>, config: PostHogConfig) -> Result<Self, &'static str> {
24 0 : let client = PostHogClient::new(config.clone().try_into_posthog_config()?);
25 0 : Ok(Self {
26 0 : service,
27 0 : config,
28 0 : client,
29 0 : http_client: reqwest::Client::new(),
30 0 : })
31 0 : }
32 :
33 0 : async fn refresh(self: Arc<Self>, cancel: CancellationToken) -> Result<(), anyhow::Error> {
34 0 : let nodes = {
35 0 : let inner = self.service.inner.read().unwrap();
36 0 : inner.nodes.clone()
37 : };
38 :
39 0 : let feature_flag_spec = self.client.get_feature_flags_local_evaluation_raw().await?;
40 0 : let stream = futures::stream::iter(nodes.values().cloned()).map(|node| {
41 0 : let this = self.clone();
42 0 : let feature_flag_spec = feature_flag_spec.clone();
43 0 : async move {
44 0 : let res = async {
45 0 : let client = PageserverClient::new(
46 0 : node.get_id(),
47 0 : this.http_client.clone(),
48 0 : node.base_url(),
49 : // TODO: what if we rotate the token during storcon lifetime?
50 0 : this.service.config.pageserver_jwt_token.as_deref(),
51 : );
52 :
53 0 : client.update_feature_flag_spec(feature_flag_spec).await?;
54 0 : tracing::info!(
55 0 : "Updated {}({}) with feature flag spec",
56 0 : node.get_id(),
57 0 : node.base_url()
58 : );
59 0 : Ok::<_, mgmt_api::Error>(())
60 0 : };
61 :
62 0 : if let Err(e) = res.await {
63 0 : if let mgmt_api::Error::ApiError(status, _) = e {
64 0 : if status == StatusCode::NOT_FOUND {
65 : // This is expected during deployments where the API is not available, so we can ignore it
66 0 : return;
67 0 : }
68 0 : }
69 0 : tracing::warn!(
70 0 : "Failed to update feature flag spec for {}: {e}",
71 0 : node.get_id()
72 : );
73 0 : }
74 0 : }
75 0 : });
76 0 : let mut stream = stream.buffer_unordered(8);
77 :
78 0 : while stream.next().await.is_some() {
79 0 : if cancel.is_cancelled() {
80 0 : return Ok(());
81 0 : }
82 : }
83 :
84 0 : Ok(())
85 0 : }
86 :
87 0 : pub async fn run(self: Arc<Self>, cancel: CancellationToken) {
88 0 : let refresh_interval = self
89 0 : .config
90 0 : .refresh_interval
91 0 : .unwrap_or(DEFAULT_POSTHOG_REFRESH_INTERVAL);
92 0 : let mut interval = tokio::time::interval(refresh_interval);
93 0 : interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
94 0 : tracing::info!(
95 0 : "Starting feature flag service with refresh interval: {:?}",
96 : refresh_interval
97 : );
98 : loop {
99 0 : tokio::select! {
100 0 : _ = interval.tick() => {}
101 0 : _ = cancel.cancelled() => {
102 0 : break;
103 : }
104 : }
105 0 : let res = self.clone().refresh(cancel.clone()).await;
106 0 : if let Err(e) = res {
107 0 : tracing::error!("Failed to refresh feature flags: {e:#?}");
108 0 : }
109 : }
110 0 : }
111 : }
|