LCOV - code coverage report
Current view: top level - pageserver/src - feature_resolver.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 4.9 % 306 15
Test Date: 2025-07-16 12:29:03 Functions: 12.5 % 16 2

            Line data    Source code
       1              : use std::{
       2              :     collections::HashMap,
       3              :     sync::{Arc, atomic::AtomicBool},
       4              :     time::Duration,
       5              : };
       6              : 
       7              : use arc_swap::ArcSwap;
       8              : use pageserver_api::config::NodeMetadata;
       9              : use posthog_client_lite::{
      10              :     CaptureEvent, FeatureResolverBackgroundLoop, PostHogEvaluationError,
      11              :     PostHogFlagFilterPropertyValue,
      12              : };
      13              : use rand::Rng;
      14              : use remote_storage::RemoteStorageKind;
      15              : use serde_json::json;
      16              : use tokio_util::sync::CancellationToken;
      17              : use utils::id::TenantId;
      18              : 
      19              : use crate::{config::PageServerConf, metrics::FEATURE_FLAG_EVALUATION, tenant::TenantShard};
      20              : 
      21              : const DEFAULT_POSTHOG_REFRESH_INTERVAL: Duration = Duration::from_secs(600);
      22              : 
/// Pageserver-wide feature flag resolver backed by PostHog.
///
/// Every field is either `None` or behind an `Arc`, so clones share the same background loop,
/// shared properties and testing overrides.
#[derive(Clone)]
pub struct FeatureResolver {
    /// Background loop owning the PostHog feature store; `None` when the integration is disabled.
    inner: Option<Arc<FeatureResolverBackgroundLoop>>,
    /// Properties shared by all tenants on this pageserver (node id, AZ, region, hostname, ...);
    /// `None` when the integration is disabled.
    internal_properties: Option<Arc<HashMap<String, PostHogFlagFilterPropertyValue>>>,
    /// Flag key -> forced value, consulted before PostHog by the evaluate functions. Testing only.
    force_overrides_for_testing: Arc<ArcSwap<HashMap<String, String>>>,
}
      29              : 
      30              : impl FeatureResolver {
      31          120 :     pub fn new_disabled() -> Self {
      32          120 :         Self {
      33          120 :             inner: None,
      34          120 :             internal_properties: None,
      35          120 :             force_overrides_for_testing: Arc::new(ArcSwap::new(Arc::new(HashMap::new()))),
      36          120 :         }
      37          120 :     }
      38              : 
      39            0 :     pub fn update(&self, spec: String) -> anyhow::Result<()> {
      40            0 :         if let Some(inner) = &self.inner {
      41            0 :             inner.update(spec)?;
      42            0 :         }
      43            0 :         Ok(())
      44            0 :     }
      45              : 
      46            0 :     pub fn spawn(
      47            0 :         conf: &PageServerConf,
      48            0 :         shutdown_pageserver: CancellationToken,
      49            0 :         handle: &tokio::runtime::Handle,
      50            0 :     ) -> anyhow::Result<Self> {
      51              :         // DO NOT block in this function: make it return as fast as possible to avoid startup delays.
      52            0 :         if let Some(posthog_config) = &conf.posthog_config {
      53            0 :             let posthog_client_config = match posthog_config.clone().try_into_posthog_config() {
      54            0 :                 Ok(config) => config,
      55            0 :                 Err(e) => {
      56            0 :                     tracing::warn!(
      57            0 :                         "invalid posthog config, skipping posthog integration: {}",
      58              :                         e
      59              :                     );
      60            0 :                     return Ok(FeatureResolver {
      61            0 :                         inner: None,
      62            0 :                         internal_properties: None,
      63            0 :                         force_overrides_for_testing: Arc::new(ArcSwap::new(Arc::new(
      64            0 :                             HashMap::new(),
      65            0 :                         ))),
      66            0 :                     });
      67              :                 }
      68              :             };
      69            0 :             let inner =
      70            0 :                 FeatureResolverBackgroundLoop::new(posthog_client_config, shutdown_pageserver);
      71            0 :             let inner = Arc::new(inner);
      72              : 
      73              :             // The properties shared by all tenants on this pageserver.
      74            0 :             let internal_properties = {
      75            0 :                 let mut properties = HashMap::new();
      76            0 :                 properties.insert(
      77            0 :                     "pageserver_id".to_string(),
      78            0 :                     PostHogFlagFilterPropertyValue::String(conf.id.to_string()),
      79              :                 );
      80            0 :                 if let Some(availability_zone) = &conf.availability_zone {
      81            0 :                     properties.insert(
      82            0 :                         "availability_zone".to_string(),
      83            0 :                         PostHogFlagFilterPropertyValue::String(availability_zone.clone()),
      84            0 :                     );
      85            0 :                 }
      86              :                 // Infer region based on the remote storage config.
      87            0 :                 if let Some(remote_storage) = &conf.remote_storage_config {
      88            0 :                     match &remote_storage.storage {
      89            0 :                         RemoteStorageKind::AwsS3(config) => {
      90            0 :                             properties.insert(
      91            0 :                                 "region".to_string(),
      92            0 :                                 PostHogFlagFilterPropertyValue::String(format!(
      93            0 :                                     "aws-{}",
      94            0 :                                     config.bucket_region
      95            0 :                                 )),
      96            0 :                             );
      97            0 :                         }
      98            0 :                         RemoteStorageKind::AzureContainer(config) => {
      99            0 :                             properties.insert(
     100            0 :                                 "region".to_string(),
     101            0 :                                 PostHogFlagFilterPropertyValue::String(format!(
     102            0 :                                     "azure-{}",
     103            0 :                                     config.container_region
     104            0 :                                 )),
     105            0 :                             );
     106            0 :                         }
     107            0 :                         RemoteStorageKind::LocalFs { .. } => {
     108            0 :                             properties.insert(
     109            0 :                                 "region".to_string(),
     110            0 :                                 PostHogFlagFilterPropertyValue::String("local".to_string()),
     111            0 :                             );
     112            0 :                         }
     113              :                     }
     114            0 :                 }
     115              :                 // TODO: move this to a background task so that we don't block startup in case of slow disk
     116            0 :                 let metadata_path = conf.metadata_path();
     117            0 :                 match std::fs::read_to_string(&metadata_path) {
     118            0 :                     Ok(metadata_str) => match serde_json::from_str::<NodeMetadata>(&metadata_str) {
     119            0 :                         Ok(metadata) => {
     120            0 :                             properties.insert(
     121            0 :                                 "hostname".to_string(),
     122            0 :                                 PostHogFlagFilterPropertyValue::String(metadata.http_host),
     123              :                             );
     124            0 :                             if let Some(cplane_region) = metadata.other.get("region_id") {
     125            0 :                                 if let Some(cplane_region) = cplane_region.as_str() {
     126            0 :                                     // This region contains the cell number
     127            0 :                                     properties.insert(
     128            0 :                                         "neon_region".to_string(),
     129            0 :                                         PostHogFlagFilterPropertyValue::String(
     130            0 :                                             cplane_region.to_string(),
     131            0 :                                         ),
     132            0 :                                     );
     133            0 :                                 }
     134            0 :                             }
     135              :                         }
     136            0 :                         Err(e) => {
     137            0 :                             tracing::warn!("Failed to parse metadata.json: {}", e);
     138              :                         }
     139              :                     },
     140            0 :                     Err(e) => {
     141            0 :                         tracing::warn!("Failed to read metadata.json: {}", e);
     142              :                     }
     143              :                 }
     144            0 :                 Arc::new(properties)
     145              :             };
     146              : 
     147            0 :             let fake_tenants = {
     148            0 :                 let mut tenants = Vec::new();
     149            0 :                 for i in 0..10 {
     150            0 :                     let distinct_id = format!(
     151            0 :                         "fake_tenant_{}_{}_{}",
     152            0 :                         conf.availability_zone.as_deref().unwrap_or_default(),
     153            0 :                         conf.id,
     154            0 :                         i
     155            0 :                     );
     156            0 : 
     157            0 :                     let tenant_properties = PerTenantProperties {
     158            0 :                         remote_size_mb: Some(rand::thread_rng().gen_range(100.0..1000000.00)),
     159            0 :                     }
     160            0 :                     .into_posthog_properties();
     161            0 : 
     162            0 :                     let properties = Self::collect_properties_inner(
     163            0 :                         distinct_id.clone(),
     164            0 :                         Some(&internal_properties),
     165            0 :                         &tenant_properties,
     166            0 :                     );
     167            0 :                     tenants.push(CaptureEvent {
     168            0 :                         event: "initial_tenant_report".to_string(),
     169            0 :                         distinct_id,
     170            0 :                         properties: json!({ "$set": properties }), // use `$set` to set the person properties instead of the event properties
     171            0 :                     });
     172            0 :                 }
     173            0 :                 tenants
     174              :             };
     175            0 :             inner.clone().spawn(
     176            0 :                 handle,
     177            0 :                 posthog_config
     178            0 :                     .refresh_interval
     179            0 :                     .unwrap_or(DEFAULT_POSTHOG_REFRESH_INTERVAL),
     180            0 :                 fake_tenants,
     181              :             );
     182            0 :             Ok(FeatureResolver {
     183            0 :                 inner: Some(inner),
     184            0 :                 internal_properties: Some(internal_properties),
     185            0 :                 force_overrides_for_testing: Arc::new(ArcSwap::new(Arc::new(HashMap::new()))),
     186            0 :             })
     187              :         } else {
     188            0 :             Ok(FeatureResolver {
     189            0 :                 inner: None,
     190            0 :                 internal_properties: None,
     191            0 :                 force_overrides_for_testing: Arc::new(ArcSwap::new(Arc::new(HashMap::new()))),
     192            0 :             })
     193              :         }
     194            0 :     }
     195              : 
     196            0 :     fn collect_properties_inner(
     197            0 :         tenant_id: String,
     198            0 :         internal_properties: Option<&HashMap<String, PostHogFlagFilterPropertyValue>>,
     199            0 :         tenant_properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
     200            0 :     ) -> HashMap<String, PostHogFlagFilterPropertyValue> {
     201            0 :         let mut properties = HashMap::new();
     202            0 :         if let Some(internal_properties) = internal_properties {
     203            0 :             for (key, value) in internal_properties.iter() {
     204            0 :                 properties.insert(key.clone(), value.clone());
     205            0 :             }
     206            0 :         }
     207            0 :         properties.insert(
     208            0 :             "tenant_id".to_string(),
     209            0 :             PostHogFlagFilterPropertyValue::String(tenant_id),
     210              :         );
     211            0 :         for (key, value) in tenant_properties.iter() {
     212            0 :             properties.insert(key.clone(), value.clone());
     213            0 :         }
     214            0 :         properties
     215            0 :     }
     216              : 
     217              :     /// Collect all properties availble for the feature flag evaluation.
     218            0 :     pub(crate) fn collect_properties(
     219            0 :         &self,
     220            0 :         tenant_id: TenantId,
     221            0 :         tenant_properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
     222            0 :     ) -> HashMap<String, PostHogFlagFilterPropertyValue> {
     223            0 :         Self::collect_properties_inner(
     224            0 :             tenant_id.to_string(),
     225            0 :             self.internal_properties.as_deref(),
     226            0 :             tenant_properties,
     227              :         )
     228            0 :     }
     229              : 
     230              :     /// Evaluate a multivariate feature flag. Currently, we do not support any properties.
     231              :     ///
     232              :     /// Error handling: the caller should inspect the error and decide the behavior when a feature flag
     233              :     /// cannot be evaluated (i.e., default to false if it cannot be resolved). The error should *not* be
     234              :     /// propagated beyond where the feature flag gets resolved.
     235            0 :     pub fn evaluate_multivariate(
     236            0 :         &self,
     237            0 :         flag_key: &str,
     238            0 :         tenant_id: TenantId,
     239            0 :         tenant_properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
     240            0 :     ) -> Result<String, PostHogEvaluationError> {
     241            0 :         let force_overrides = self.force_overrides_for_testing.load();
     242            0 :         if let Some(value) = force_overrides.get(flag_key) {
     243            0 :             return Ok(value.clone());
     244            0 :         }
     245              : 
     246            0 :         if let Some(inner) = &self.inner {
     247            0 :             let res = inner.feature_store().evaluate_multivariate(
     248            0 :                 flag_key,
     249            0 :                 &tenant_id.to_string(),
     250            0 :                 &self.collect_properties(tenant_id, tenant_properties),
     251            0 :             );
     252            0 :             match &res {
     253            0 :                 Ok(value) => {
     254            0 :                     FEATURE_FLAG_EVALUATION
     255            0 :                         .with_label_values(&[flag_key, "ok", value])
     256            0 :                         .inc();
     257            0 :                 }
     258            0 :                 Err(e) => {
     259            0 :                     FEATURE_FLAG_EVALUATION
     260            0 :                         .with_label_values(&[flag_key, "error", e.as_variant_str()])
     261            0 :                         .inc();
     262            0 :                 }
     263              :             }
     264            0 :             res
     265              :         } else {
     266            0 :             Err(PostHogEvaluationError::NotAvailable(
     267            0 :                 "PostHog integration is not enabled".to_string(),
     268            0 :             ))
     269              :         }
     270            0 :     }
     271              : 
     272              :     /// Evaluate a boolean feature flag. Currently, we do not support any properties.
     273              :     ///
     274              :     /// Returns `Ok(())` if the flag is evaluated to true, otherwise returns an error.
     275              :     ///
     276              :     /// Error handling: the caller should inspect the error and decide the behavior when a feature flag
     277              :     /// cannot be evaluated (i.e., default to false if it cannot be resolved). The error should *not* be
     278              :     /// propagated beyond where the feature flag gets resolved.
     279            0 :     pub fn evaluate_boolean(
     280            0 :         &self,
     281            0 :         flag_key: &str,
     282            0 :         tenant_id: TenantId,
     283            0 :         tenant_properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
     284            0 :     ) -> Result<(), PostHogEvaluationError> {
     285            0 :         let force_overrides = self.force_overrides_for_testing.load();
     286            0 :         if let Some(value) = force_overrides.get(flag_key) {
     287            0 :             return if value == "true" {
     288            0 :                 Ok(())
     289              :             } else {
     290            0 :                 Err(PostHogEvaluationError::NoConditionGroupMatched)
     291              :             };
     292            0 :         }
     293              : 
     294            0 :         if let Some(inner) = &self.inner {
     295            0 :             let res = inner.feature_store().evaluate_boolean(
     296            0 :                 flag_key,
     297            0 :                 &tenant_id.to_string(),
     298            0 :                 &self.collect_properties(tenant_id, tenant_properties),
     299            0 :             );
     300            0 :             match &res {
     301            0 :                 Ok(()) => {
     302            0 :                     FEATURE_FLAG_EVALUATION
     303            0 :                         .with_label_values(&[flag_key, "ok", "true"])
     304            0 :                         .inc();
     305            0 :                 }
     306            0 :                 Err(e) => {
     307            0 :                     FEATURE_FLAG_EVALUATION
     308            0 :                         .with_label_values(&[flag_key, "error", e.as_variant_str()])
     309            0 :                         .inc();
     310            0 :                 }
     311              :             }
     312            0 :             res
     313              :         } else {
     314            0 :             Err(PostHogEvaluationError::NotAvailable(
     315            0 :                 "PostHog integration is not enabled".to_string(),
     316            0 :             ))
     317              :         }
     318            0 :     }
     319              : 
     320            0 :     pub fn is_feature_flag_boolean(&self, flag_key: &str) -> Result<bool, PostHogEvaluationError> {
     321            0 :         if let Some(inner) = &self.inner {
     322            0 :             inner.feature_store().is_feature_flag_boolean(flag_key)
     323              :         } else {
     324            0 :             Err(PostHogEvaluationError::NotAvailable(
     325            0 :                 "PostHog integration is not enabled, cannot auto-determine the flag type"
     326            0 :                     .to_string(),
     327            0 :             ))
     328              :         }
     329            0 :     }
     330              : 
     331              :     /// Force override a feature flag for testing. This is only for testing purposes. Assume the caller only call it
     332              :     /// from a single thread so it won't race.
     333            0 :     pub fn force_override_for_testing(&self, flag_key: &str, value: Option<&str>) {
     334            0 :         let mut force_overrides = self.force_overrides_for_testing.load().as_ref().clone();
     335            0 :         if let Some(value) = value {
     336            0 :             force_overrides.insert(flag_key.to_string(), value.to_string());
     337            0 :         } else {
     338            0 :             force_overrides.remove(flag_key);
     339            0 :         }
     340            0 :         self.force_overrides_for_testing
     341            0 :             .store(Arc::new(force_overrides));
     342            0 :     }
     343              : }
     344              : 
     345              : struct PerTenantProperties {
     346              :     pub remote_size_mb: Option<f64>,
     347              : }
     348              : 
     349              : impl PerTenantProperties {
     350            0 :     pub fn into_posthog_properties(self) -> HashMap<String, PostHogFlagFilterPropertyValue> {
     351            0 :         let mut properties = HashMap::new();
     352            0 :         if let Some(remote_size_mb) = self.remote_size_mb {
     353            0 :             properties.insert(
     354            0 :                 "tenant_remote_size_mb".to_string(),
     355            0 :                 PostHogFlagFilterPropertyValue::Number(remote_size_mb),
     356            0 :             );
     357            0 :         }
     358            0 :         properties
     359            0 :     }
     360              : }
     361              : 
/// Per-tenant view over the shared [`FeatureResolver`], with cached tenant properties and
/// pre-resolved flags for hot paths.
pub struct TenantFeatureResolver {
    /// The pageserver-wide resolver that does the actual evaluation.
    inner: FeatureResolver,
    /// Tenant whose id is passed to every evaluation.
    tenant_id: TenantId,
    /// Tenant-specific properties, replaced wholesale by `refresh_properties_and_flags`.
    cached_tenant_properties: ArcSwap<HashMap<String, PostHogFlagFilterPropertyValue>>,

    // Add feature flag on the critical path below.
    //
    // If a feature flag will be used on the critical path, we will update it in the tenant housekeeping loop instead of
    // resolving directly by calling `evaluate_multivariate` or `evaluate_boolean`. Remember to update the flag in the
    // housekeeping loop. The user should directly read this atomic flag instead of using the set of evaluate functions.
    /// Cached result of the boolean flag `test-remote-size-flag`, set by
    /// `refresh_properties_and_flags`.
    pub feature_test_remote_size_flag: AtomicBool,
}
     374              : 
     375              : impl TenantFeatureResolver {
     376          119 :     pub fn new(inner: FeatureResolver, tenant_id: TenantId) -> Self {
     377          119 :         Self {
     378          119 :             inner,
     379          119 :             tenant_id,
     380          119 :             cached_tenant_properties: ArcSwap::new(Arc::new(HashMap::new())),
     381          119 :             feature_test_remote_size_flag: AtomicBool::new(false),
     382          119 :         }
     383          119 :     }
     384              : 
     385            0 :     pub fn evaluate_multivariate(&self, flag_key: &str) -> Result<String, PostHogEvaluationError> {
     386            0 :         self.inner.evaluate_multivariate(
     387            0 :             flag_key,
     388            0 :             self.tenant_id,
     389            0 :             &self.cached_tenant_properties.load(),
     390              :         )
     391            0 :     }
     392              : 
     393            0 :     pub fn evaluate_boolean(&self, flag_key: &str) -> Result<(), PostHogEvaluationError> {
     394            0 :         self.inner.evaluate_boolean(
     395            0 :             flag_key,
     396            0 :             self.tenant_id,
     397            0 :             &self.cached_tenant_properties.load(),
     398              :         )
     399            0 :     }
     400              : 
     401            0 :     pub fn collect_properties(&self) -> HashMap<String, PostHogFlagFilterPropertyValue> {
     402            0 :         self.inner
     403            0 :             .collect_properties(self.tenant_id, &self.cached_tenant_properties.load())
     404            0 :     }
     405              : 
     406            0 :     pub fn is_feature_flag_boolean(&self, flag_key: &str) -> Result<bool, PostHogEvaluationError> {
     407            0 :         self.inner.is_feature_flag_boolean(flag_key)
     408            0 :     }
     409              : 
     410              :     /// Refresh the cached properties and flags on the critical path.
     411            0 :     pub fn refresh_properties_and_flags(&self, tenant_shard: &TenantShard) {
     412            0 :         let mut remote_size_mb = Some(0.0);
     413            0 :         for timeline in tenant_shard.list_timelines() {
     414            0 :             let size = timeline.metrics.resident_physical_size_get();
     415            0 :             if size == 0 {
     416            0 :                 remote_size_mb = None;
     417            0 :                 break;
     418            0 :             }
     419            0 :             if let Some(ref mut remote_size_mb) = remote_size_mb {
     420            0 :                 *remote_size_mb += size as f64 / 1024.0 / 1024.0;
     421            0 :             }
     422              :         }
     423            0 :         self.cached_tenant_properties.store(Arc::new(
     424            0 :             PerTenantProperties { remote_size_mb }.into_posthog_properties(),
     425              :         ));
     426              : 
     427              :         // BEGIN: Update the feature flag on the critical path.
     428            0 :         self.feature_test_remote_size_flag.store(
     429            0 :             self.evaluate_boolean("test-remote-size-flag").is_ok(),
     430            0 :             std::sync::atomic::Ordering::Relaxed,
     431              :         );
     432              :         // END: Update the feature flag on the critical path.
     433            0 :     }
     434              : }
        

Generated by: LCOV version 2.1-beta