LCOV - code coverage report
Current view: top level - pageserver/src - feature_resolver.rs (source / functions) Coverage Total Hit
Test: ac1e0b9bf1b4ead74961174b01ba016322d3f9a6.info Lines: 4.6 % 302 14
Test Date: 2025-07-08 09:16:10 Functions: 12.5 % 16 2

            Line data    Source code
       1              : use std::{collections::HashMap, sync::Arc, time::Duration};
       2              : 
       3              : use arc_swap::ArcSwap;
       4              : use pageserver_api::config::NodeMetadata;
       5              : use posthog_client_lite::{
       6              :     CaptureEvent, FeatureResolverBackgroundLoop, PostHogEvaluationError,
       7              :     PostHogFlagFilterPropertyValue,
       8              : };
       9              : use rand::Rng;
      10              : use remote_storage::RemoteStorageKind;
      11              : use serde_json::json;
      12              : use tokio_util::sync::CancellationToken;
      13              : use utils::id::TenantId;
      14              : 
      15              : use crate::{config::PageServerConf, metrics::FEATURE_FLAG_EVALUATION, tenant::TenantShard};
      16              : 
      17              : const DEFAULT_POSTHOG_REFRESH_INTERVAL: Duration = Duration::from_secs(600);
      18              : 
      19              : #[derive(Clone)]
      20              : pub struct FeatureResolver {
      21              :     inner: Option<Arc<FeatureResolverBackgroundLoop>>,
      22              :     internal_properties: Option<Arc<HashMap<String, PostHogFlagFilterPropertyValue>>>,
      23              :     force_overrides_for_testing: Arc<ArcSwap<HashMap<String, String>>>,
      24              : }
      25              : 
      26              : impl FeatureResolver {
      27          119 :     pub fn new_disabled() -> Self {
      28          119 :         Self {
      29          119 :             inner: None,
      30          119 :             internal_properties: None,
      31          119 :             force_overrides_for_testing: Arc::new(ArcSwap::new(Arc::new(HashMap::new()))),
      32          119 :         }
      33          119 :     }
      34              : 
      35            0 :     pub fn update(&self, spec: String) -> anyhow::Result<()> {
      36            0 :         if let Some(inner) = &self.inner {
      37            0 :             inner.update(spec)?;
      38            0 :         }
      39            0 :         Ok(())
      40            0 :     }
      41              : 
      42            0 :     pub fn spawn(
      43            0 :         conf: &PageServerConf,
      44            0 :         shutdown_pageserver: CancellationToken,
      45            0 :         handle: &tokio::runtime::Handle,
      46            0 :     ) -> anyhow::Result<Self> {
      47              :         // DO NOT block in this function: make it return as fast as possible to avoid startup delays.
      48            0 :         if let Some(posthog_config) = &conf.posthog_config {
      49            0 :             let posthog_client_config = match posthog_config.clone().try_into_posthog_config() {
      50            0 :                 Ok(config) => config,
      51            0 :                 Err(e) => {
      52            0 :                     tracing::warn!(
      53            0 :                         "invalid posthog config, skipping posthog integration: {}",
      54              :                         e
      55              :                     );
      56            0 :                     return Ok(FeatureResolver {
      57            0 :                         inner: None,
      58            0 :                         internal_properties: None,
      59            0 :                         force_overrides_for_testing: Arc::new(ArcSwap::new(Arc::new(
      60            0 :                             HashMap::new(),
      61            0 :                         ))),
      62            0 :                     });
      63              :                 }
      64              :             };
      65            0 :             let inner =
      66            0 :                 FeatureResolverBackgroundLoop::new(posthog_client_config, shutdown_pageserver);
      67            0 :             let inner = Arc::new(inner);
      68              : 
      69              :             // The properties shared by all tenants on this pageserver.
      70            0 :             let internal_properties = {
      71            0 :                 let mut properties = HashMap::new();
      72            0 :                 properties.insert(
      73            0 :                     "pageserver_id".to_string(),
      74            0 :                     PostHogFlagFilterPropertyValue::String(conf.id.to_string()),
      75              :                 );
      76            0 :                 if let Some(availability_zone) = &conf.availability_zone {
      77            0 :                     properties.insert(
      78            0 :                         "availability_zone".to_string(),
      79            0 :                         PostHogFlagFilterPropertyValue::String(availability_zone.clone()),
      80            0 :                     );
      81            0 :                 }
      82              :                 // Infer region based on the remote storage config.
      83            0 :                 if let Some(remote_storage) = &conf.remote_storage_config {
      84            0 :                     match &remote_storage.storage {
      85            0 :                         RemoteStorageKind::AwsS3(config) => {
      86            0 :                             properties.insert(
      87            0 :                                 "region".to_string(),
      88            0 :                                 PostHogFlagFilterPropertyValue::String(format!(
      89            0 :                                     "aws-{}",
      90            0 :                                     config.bucket_region
      91            0 :                                 )),
      92            0 :                             );
      93            0 :                         }
      94            0 :                         RemoteStorageKind::AzureContainer(config) => {
      95            0 :                             properties.insert(
      96            0 :                                 "region".to_string(),
      97            0 :                                 PostHogFlagFilterPropertyValue::String(format!(
      98            0 :                                     "azure-{}",
      99            0 :                                     config.container_region
     100            0 :                                 )),
     101            0 :                             );
     102            0 :                         }
     103            0 :                         RemoteStorageKind::LocalFs { .. } => {
     104            0 :                             properties.insert(
     105            0 :                                 "region".to_string(),
     106            0 :                                 PostHogFlagFilterPropertyValue::String("local".to_string()),
     107            0 :                             );
     108            0 :                         }
     109              :                     }
     110            0 :                 }
     111              :                 // TODO: move this to a background task so that we don't block startup in case of slow disk
     112            0 :                 let metadata_path = conf.metadata_path();
     113            0 :                 match std::fs::read_to_string(&metadata_path) {
     114            0 :                     Ok(metadata_str) => match serde_json::from_str::<NodeMetadata>(&metadata_str) {
     115            0 :                         Ok(metadata) => {
     116            0 :                             properties.insert(
     117            0 :                                 "hostname".to_string(),
     118            0 :                                 PostHogFlagFilterPropertyValue::String(metadata.http_host),
     119              :                             );
     120            0 :                             if let Some(cplane_region) = metadata.other.get("region_id") {
     121            0 :                                 if let Some(cplane_region) = cplane_region.as_str() {
     122            0 :                                     // This region contains the cell number
     123            0 :                                     properties.insert(
     124            0 :                                         "neon_region".to_string(),
     125            0 :                                         PostHogFlagFilterPropertyValue::String(
     126            0 :                                             cplane_region.to_string(),
     127            0 :                                         ),
     128            0 :                                     );
     129            0 :                                 }
     130            0 :                             }
     131              :                         }
     132            0 :                         Err(e) => {
     133            0 :                             tracing::warn!("Failed to parse metadata.json: {}", e);
     134              :                         }
     135              :                     },
     136            0 :                     Err(e) => {
     137            0 :                         tracing::warn!("Failed to read metadata.json: {}", e);
     138              :                     }
     139              :                 }
     140            0 :                 Arc::new(properties)
     141              :             };
     142              : 
     143            0 :             let fake_tenants = {
     144            0 :                 let mut tenants = Vec::new();
     145            0 :                 for i in 0..10 {
     146            0 :                     let distinct_id = format!(
     147            0 :                         "fake_tenant_{}_{}_{}",
     148            0 :                         conf.availability_zone.as_deref().unwrap_or_default(),
     149            0 :                         conf.id,
     150            0 :                         i
     151            0 :                     );
     152            0 : 
     153            0 :                     let tenant_properties = PerTenantProperties {
     154            0 :                         remote_size_mb: Some(rand::thread_rng().gen_range(100.0..1000000.00)),
     155            0 :                     }
     156            0 :                     .into_posthog_properties();
     157            0 : 
     158            0 :                     let properties = Self::collect_properties_inner(
     159            0 :                         distinct_id.clone(),
     160            0 :                         Some(&internal_properties),
     161            0 :                         &tenant_properties,
     162            0 :                     );
     163            0 :                     tenants.push(CaptureEvent {
     164            0 :                         event: "initial_tenant_report".to_string(),
     165            0 :                         distinct_id,
     166            0 :                         properties: json!({ "$set": properties }), // use `$set` to set the person properties instead of the event properties
     167            0 :                     });
     168            0 :                 }
     169            0 :                 tenants
     170              :             };
     171            0 :             inner.clone().spawn(
     172            0 :                 handle,
     173            0 :                 posthog_config
     174            0 :                     .refresh_interval
     175            0 :                     .unwrap_or(DEFAULT_POSTHOG_REFRESH_INTERVAL),
     176            0 :                 fake_tenants,
     177              :             );
     178            0 :             Ok(FeatureResolver {
     179            0 :                 inner: Some(inner),
     180            0 :                 internal_properties: Some(internal_properties),
     181            0 :                 force_overrides_for_testing: Arc::new(ArcSwap::new(Arc::new(HashMap::new()))),
     182            0 :             })
     183              :         } else {
     184            0 :             Ok(FeatureResolver {
     185            0 :                 inner: None,
     186            0 :                 internal_properties: None,
     187            0 :                 force_overrides_for_testing: Arc::new(ArcSwap::new(Arc::new(HashMap::new()))),
     188            0 :             })
     189              :         }
     190            0 :     }
     191              : 
     192            0 :     fn collect_properties_inner(
     193            0 :         tenant_id: String,
     194            0 :         internal_properties: Option<&HashMap<String, PostHogFlagFilterPropertyValue>>,
     195            0 :         tenant_properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
     196            0 :     ) -> HashMap<String, PostHogFlagFilterPropertyValue> {
     197            0 :         let mut properties = HashMap::new();
     198            0 :         if let Some(internal_properties) = internal_properties {
     199            0 :             for (key, value) in internal_properties.iter() {
     200            0 :                 properties.insert(key.clone(), value.clone());
     201            0 :             }
     202            0 :         }
     203            0 :         properties.insert(
     204            0 :             "tenant_id".to_string(),
     205            0 :             PostHogFlagFilterPropertyValue::String(tenant_id),
     206              :         );
     207            0 :         for (key, value) in tenant_properties.iter() {
     208            0 :             properties.insert(key.clone(), value.clone());
     209            0 :         }
     210            0 :         properties
     211            0 :     }
     212              : 
     213              :     /// Collect all properties availble for the feature flag evaluation.
     214            0 :     pub(crate) fn collect_properties(
     215            0 :         &self,
     216            0 :         tenant_id: TenantId,
     217            0 :         tenant_properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
     218            0 :     ) -> HashMap<String, PostHogFlagFilterPropertyValue> {
     219            0 :         Self::collect_properties_inner(
     220            0 :             tenant_id.to_string(),
     221            0 :             self.internal_properties.as_deref(),
     222            0 :             tenant_properties,
     223              :         )
     224            0 :     }
     225              : 
     226              :     /// Evaluate a multivariate feature flag. Currently, we do not support any properties.
     227              :     ///
     228              :     /// Error handling: the caller should inspect the error and decide the behavior when a feature flag
     229              :     /// cannot be evaluated (i.e., default to false if it cannot be resolved). The error should *not* be
     230              :     /// propagated beyond where the feature flag gets resolved.
     231            0 :     pub fn evaluate_multivariate(
     232            0 :         &self,
     233            0 :         flag_key: &str,
     234            0 :         tenant_id: TenantId,
     235            0 :         tenant_properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
     236            0 :     ) -> Result<String, PostHogEvaluationError> {
     237            0 :         let force_overrides = self.force_overrides_for_testing.load();
     238            0 :         if let Some(value) = force_overrides.get(flag_key) {
     239            0 :             return Ok(value.clone());
     240            0 :         }
     241              : 
     242            0 :         if let Some(inner) = &self.inner {
     243            0 :             let res = inner.feature_store().evaluate_multivariate(
     244            0 :                 flag_key,
     245            0 :                 &tenant_id.to_string(),
     246            0 :                 &self.collect_properties(tenant_id, tenant_properties),
     247            0 :             );
     248            0 :             match &res {
     249            0 :                 Ok(value) => {
     250            0 :                     FEATURE_FLAG_EVALUATION
     251            0 :                         .with_label_values(&[flag_key, "ok", value])
     252            0 :                         .inc();
     253            0 :                 }
     254            0 :                 Err(e) => {
     255            0 :                     FEATURE_FLAG_EVALUATION
     256            0 :                         .with_label_values(&[flag_key, "error", e.as_variant_str()])
     257            0 :                         .inc();
     258            0 :                 }
     259              :             }
     260            0 :             res
     261              :         } else {
     262            0 :             Err(PostHogEvaluationError::NotAvailable(
     263            0 :                 "PostHog integration is not enabled".to_string(),
     264            0 :             ))
     265              :         }
     266            0 :     }
     267              : 
     268              :     /// Evaluate a boolean feature flag. Currently, we do not support any properties.
     269              :     ///
     270              :     /// Returns `Ok(())` if the flag is evaluated to true, otherwise returns an error.
     271              :     ///
     272              :     /// Error handling: the caller should inspect the error and decide the behavior when a feature flag
     273              :     /// cannot be evaluated (i.e., default to false if it cannot be resolved). The error should *not* be
     274              :     /// propagated beyond where the feature flag gets resolved.
     275            0 :     pub fn evaluate_boolean(
     276            0 :         &self,
     277            0 :         flag_key: &str,
     278            0 :         tenant_id: TenantId,
     279            0 :         tenant_properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
     280            0 :     ) -> Result<(), PostHogEvaluationError> {
     281            0 :         let force_overrides = self.force_overrides_for_testing.load();
     282            0 :         if let Some(value) = force_overrides.get(flag_key) {
     283            0 :             return if value == "true" {
     284            0 :                 Ok(())
     285              :             } else {
     286            0 :                 Err(PostHogEvaluationError::NoConditionGroupMatched)
     287              :             };
     288            0 :         }
     289              : 
     290            0 :         if let Some(inner) = &self.inner {
     291            0 :             let res = inner.feature_store().evaluate_boolean(
     292            0 :                 flag_key,
     293            0 :                 &tenant_id.to_string(),
     294            0 :                 &self.collect_properties(tenant_id, tenant_properties),
     295            0 :             );
     296            0 :             match &res {
     297            0 :                 Ok(()) => {
     298            0 :                     FEATURE_FLAG_EVALUATION
     299            0 :                         .with_label_values(&[flag_key, "ok", "true"])
     300            0 :                         .inc();
     301            0 :                 }
     302            0 :                 Err(e) => {
     303            0 :                     FEATURE_FLAG_EVALUATION
     304            0 :                         .with_label_values(&[flag_key, "error", e.as_variant_str()])
     305            0 :                         .inc();
     306            0 :                 }
     307              :             }
     308            0 :             res
     309              :         } else {
     310            0 :             Err(PostHogEvaluationError::NotAvailable(
     311            0 :                 "PostHog integration is not enabled".to_string(),
     312            0 :             ))
     313              :         }
     314            0 :     }
     315              : 
     316            0 :     pub fn is_feature_flag_boolean(&self, flag_key: &str) -> Result<bool, PostHogEvaluationError> {
     317            0 :         if let Some(inner) = &self.inner {
     318            0 :             inner.feature_store().is_feature_flag_boolean(flag_key)
     319              :         } else {
     320            0 :             Err(PostHogEvaluationError::NotAvailable(
     321            0 :                 "PostHog integration is not enabled, cannot auto-determine the flag type"
     322            0 :                     .to_string(),
     323            0 :             ))
     324              :         }
     325            0 :     }
     326              : 
     327              :     /// Force override a feature flag for testing. This is only for testing purposes. Assume the caller only call it
     328              :     /// from a single thread so it won't race.
     329            0 :     pub fn force_override_for_testing(&self, flag_key: &str, value: Option<&str>) {
     330            0 :         let mut force_overrides = self.force_overrides_for_testing.load().as_ref().clone();
     331            0 :         if let Some(value) = value {
     332            0 :             force_overrides.insert(flag_key.to_string(), value.to_string());
     333            0 :         } else {
     334            0 :             force_overrides.remove(flag_key);
     335            0 :         }
     336            0 :         self.force_overrides_for_testing
     337            0 :             .store(Arc::new(force_overrides));
     338            0 :     }
     339              : }
     340              : 
     341              : struct PerTenantProperties {
     342              :     pub remote_size_mb: Option<f64>,
     343              : }
     344              : 
     345              : impl PerTenantProperties {
     346            0 :     pub fn into_posthog_properties(self) -> HashMap<String, PostHogFlagFilterPropertyValue> {
     347            0 :         let mut properties = HashMap::new();
     348            0 :         if let Some(remote_size_mb) = self.remote_size_mb {
     349            0 :             properties.insert(
     350            0 :                 "tenant_remote_size_mb".to_string(),
     351            0 :                 PostHogFlagFilterPropertyValue::Number(remote_size_mb),
     352            0 :             );
     353            0 :         }
     354            0 :         properties
     355            0 :     }
     356              : }
     357              : 
     358              : #[derive(Clone)]
     359              : pub struct TenantFeatureResolver {
     360              :     inner: FeatureResolver,
     361              :     tenant_id: TenantId,
     362              :     cached_tenant_properties: Arc<ArcSwap<HashMap<String, PostHogFlagFilterPropertyValue>>>,
     363              : }
     364              : 
     365              : impl TenantFeatureResolver {
     366          118 :     pub fn new(inner: FeatureResolver, tenant_id: TenantId) -> Self {
     367          118 :         Self {
     368          118 :             inner,
     369          118 :             tenant_id,
     370          118 :             cached_tenant_properties: Arc::new(ArcSwap::new(Arc::new(HashMap::new()))),
     371          118 :         }
     372          118 :     }
     373              : 
     374            0 :     pub fn evaluate_multivariate(&self, flag_key: &str) -> Result<String, PostHogEvaluationError> {
     375            0 :         self.inner.evaluate_multivariate(
     376            0 :             flag_key,
     377            0 :             self.tenant_id,
     378            0 :             &self.cached_tenant_properties.load(),
     379              :         )
     380            0 :     }
     381              : 
     382            0 :     pub fn evaluate_boolean(&self, flag_key: &str) -> Result<(), PostHogEvaluationError> {
     383            0 :         self.inner.evaluate_boolean(
     384            0 :             flag_key,
     385            0 :             self.tenant_id,
     386            0 :             &self.cached_tenant_properties.load(),
     387              :         )
     388            0 :     }
     389              : 
     390            0 :     pub fn collect_properties(&self) -> HashMap<String, PostHogFlagFilterPropertyValue> {
     391            0 :         self.inner
     392            0 :             .collect_properties(self.tenant_id, &self.cached_tenant_properties.load())
     393            0 :     }
     394              : 
     395            0 :     pub fn is_feature_flag_boolean(&self, flag_key: &str) -> Result<bool, PostHogEvaluationError> {
     396            0 :         self.inner.is_feature_flag_boolean(flag_key)
     397            0 :     }
     398              : 
     399            0 :     pub fn update_cached_tenant_properties(&self, tenant_shard: &TenantShard) {
     400            0 :         let mut remote_size_mb = None;
     401            0 :         for timeline in tenant_shard.list_timelines() {
     402            0 :             let size = timeline.metrics.resident_physical_size_get();
     403            0 :             if size == 0 {
     404            0 :                 remote_size_mb = None;
     405            0 :             }
     406            0 :             if let Some(ref mut remote_size_mb) = remote_size_mb {
     407            0 :                 *remote_size_mb += size as f64 / 1024.0 / 1024.0;
     408            0 :             }
     409              :         }
     410            0 :         self.cached_tenant_properties.store(Arc::new(
     411            0 :             PerTenantProperties { remote_size_mb }.into_posthog_properties(),
     412            0 :         ));
     413            0 :     }
     414              : }
        

Generated by: LCOV version 2.1-beta