LCOV - code coverage report
Current view: top level - storage_controller/src/service - feature_flag.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 0.0 % 68 0
Test Date: 2025-07-16 12:29:03 Functions: 0.0 % 8 0

            Line data    Source code
       1              : use std::{sync::Arc, time::Duration};
       2              : 
       3              : use futures::StreamExt;
       4              : use pageserver_api::config::PostHogConfig;
       5              : use pageserver_client::mgmt_api;
       6              : use posthog_client_lite::PostHogClient;
       7              : use reqwest::StatusCode;
       8              : use tokio::time::MissedTickBehavior;
       9              : use tokio_util::sync::CancellationToken;
      10              : 
      11              : use crate::{pageserver_client::PageserverClient, service::Service};
      12              : 
      13              : pub struct FeatureFlagService {
      14              :     service: Arc<Service>,
      15              :     config: PostHogConfig,
      16              :     client: PostHogClient,
      17              :     http_client: reqwest::Client,
      18              : }
      19              : 
      20              : const DEFAULT_POSTHOG_REFRESH_INTERVAL: Duration = Duration::from_secs(30);
      21              : 
      22              : impl FeatureFlagService {
      23            0 :     pub fn new(service: Arc<Service>, config: PostHogConfig) -> Result<Self, &'static str> {
      24            0 :         let client = PostHogClient::new(config.clone().try_into_posthog_config()?);
      25            0 :         Ok(Self {
      26            0 :             service,
      27            0 :             config,
      28            0 :             client,
      29            0 :             http_client: reqwest::Client::new(),
      30            0 :         })
      31            0 :     }
      32              : 
      33            0 :     async fn refresh(self: Arc<Self>, cancel: CancellationToken) -> Result<(), anyhow::Error> {
      34            0 :         let nodes = {
      35            0 :             let inner = self.service.inner.read().unwrap();
      36            0 :             inner.nodes.clone()
      37              :         };
      38              : 
      39            0 :         let feature_flag_spec = self.client.get_feature_flags_local_evaluation_raw().await?;
      40            0 :         let stream = futures::stream::iter(nodes.values().cloned()).map(|node| {
      41            0 :             let this = self.clone();
      42            0 :             let feature_flag_spec = feature_flag_spec.clone();
      43            0 :             async move {
      44            0 :                 let res = async {
      45            0 :                     let client = PageserverClient::new(
      46            0 :                         node.get_id(),
      47            0 :                         this.http_client.clone(),
      48            0 :                         node.base_url(),
      49              :                         // TODO: what if we rotate the token during storcon lifetime?
      50            0 :                         this.service.config.pageserver_jwt_token.as_deref(),
      51              :                     );
      52              : 
      53            0 :                     client.update_feature_flag_spec(feature_flag_spec).await?;
      54            0 :                     tracing::info!(
      55            0 :                         "Updated {}({}) with feature flag spec",
      56            0 :                         node.get_id(),
      57            0 :                         node.base_url()
      58              :                     );
      59            0 :                     Ok::<_, mgmt_api::Error>(())
      60            0 :                 };
      61              : 
      62            0 :                 if let Err(e) = res.await {
      63            0 :                     if let mgmt_api::Error::ApiError(status, _) = e {
      64            0 :                         if status == StatusCode::NOT_FOUND {
      65              :                             // This is expected during deployments where the API is not available, so we can ignore it
      66            0 :                             return;
      67            0 :                         }
      68            0 :                     }
      69            0 :                     tracing::warn!(
      70            0 :                         "Failed to update feature flag spec for {}: {e}",
      71            0 :                         node.get_id()
      72              :                     );
      73            0 :                 }
      74            0 :             }
      75            0 :         });
      76            0 :         let mut stream = stream.buffer_unordered(8);
      77              : 
      78            0 :         while stream.next().await.is_some() {
      79            0 :             if cancel.is_cancelled() {
      80            0 :                 return Ok(());
      81            0 :             }
      82              :         }
      83              : 
      84            0 :         Ok(())
      85            0 :     }
      86              : 
      87            0 :     pub async fn run(self: Arc<Self>, cancel: CancellationToken) {
      88            0 :         let refresh_interval = self
      89            0 :             .config
      90            0 :             .refresh_interval
      91            0 :             .unwrap_or(DEFAULT_POSTHOG_REFRESH_INTERVAL);
      92            0 :         let mut interval = tokio::time::interval(refresh_interval);
      93            0 :         interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
      94            0 :         tracing::info!(
      95            0 :             "Starting feature flag service with refresh interval: {:?}",
      96              :             refresh_interval
      97              :         );
      98              :         loop {
      99            0 :             tokio::select! {
     100            0 :                 _ = interval.tick() => {}
     101            0 :                 _ = cancel.cancelled() => {
     102            0 :                     break;
     103              :                 }
     104              :             }
     105            0 :             let res = self.clone().refresh(cancel.clone()).await;
     106            0 :             if let Err(e) = res {
     107            0 :                 tracing::error!("Failed to refresh feature flags: {e:#?}");
     108            0 :             }
     109              :         }
     110            0 :     }
     111              : }
        

Generated by: LCOV version 2.1-beta