LCOV - code coverage report
Current view: top level - pageserver/src - feature_resolver.rs (source / functions) Coverage Total Hit
Test: c8f8d331b83562868d9054d9e0e68f866772aeaa.info Lines: 4.4 % 338 15
Test Date: 2025-07-26 17:20:05 Functions: 11.1 % 18 2

            Line data    Source code
       1              : use std::{
       2              :     collections::HashMap,
       3              :     sync::{Arc, atomic::AtomicBool},
       4              :     time::Duration,
       5              : };
       6              : 
       7              : use arc_swap::ArcSwap;
       8              : use pageserver_api::config::NodeMetadata;
       9              : use posthog_client_lite::{
      10              :     CaptureEvent, FeatureResolverBackgroundLoop, PostHogEvaluationError,
      11              :     PostHogFlagFilterPropertyValue,
      12              : };
      13              : use rand::Rng;
      14              : use remote_storage::RemoteStorageKind;
      15              : use serde_json::json;
      16              : use tokio_util::sync::CancellationToken;
      17              : use utils::id::TenantId;
      18              : 
      19              : use crate::{config::PageServerConf, metrics::FEATURE_FLAG_EVALUATION, tenant::TenantShard};
      20              : 
      21              : const DEFAULT_POSTHOG_REFRESH_INTERVAL: Duration = Duration::from_secs(600);
      22              : 
      23              : #[derive(Clone)]
      24              : pub struct FeatureResolver {
      25              :     inner: Option<Arc<FeatureResolverBackgroundLoop>>,
      26              :     internal_properties: Option<Arc<HashMap<String, PostHogFlagFilterPropertyValue>>>,
      27              :     force_overrides_for_testing: Arc<ArcSwap<HashMap<String, String>>>,
      28              : }
      29              : 
      30              : impl FeatureResolver {
      31          120 :     pub fn new_disabled() -> Self {
      32          120 :         Self {
      33          120 :             inner: None,
      34          120 :             internal_properties: None,
      35          120 :             force_overrides_for_testing: Arc::new(ArcSwap::new(Arc::new(HashMap::new()))),
      36          120 :         }
      37          120 :     }
      38              : 
      39            0 :     pub fn update(&self, spec: String) -> anyhow::Result<()> {
      40            0 :         if let Some(inner) = &self.inner {
      41            0 :             inner.update(spec)?;
      42            0 :         }
      43            0 :         Ok(())
      44            0 :     }
      45              : 
      46            0 :     pub fn spawn(
      47            0 :         conf: &PageServerConf,
      48            0 :         shutdown_pageserver: CancellationToken,
      49            0 :         handle: &tokio::runtime::Handle,
      50            0 :     ) -> anyhow::Result<Self> {
      51              :         // DO NOT block in this function: make it return as fast as possible to avoid startup delays.
      52            0 :         if let Some(posthog_config) = &conf.posthog_config {
      53            0 :             let posthog_client_config = match posthog_config.clone().try_into_posthog_config() {
      54            0 :                 Ok(config) => config,
      55            0 :                 Err(e) => {
      56            0 :                     tracing::warn!(
      57            0 :                         "invalid posthog config, skipping posthog integration: {}",
      58              :                         e
      59              :                     );
      60            0 :                     return Ok(FeatureResolver {
      61            0 :                         inner: None,
      62            0 :                         internal_properties: None,
      63            0 :                         force_overrides_for_testing: Arc::new(ArcSwap::new(Arc::new(
      64            0 :                             HashMap::new(),
      65            0 :                         ))),
      66            0 :                     });
      67              :                 }
      68              :             };
      69            0 :             let inner =
      70            0 :                 FeatureResolverBackgroundLoop::new(posthog_client_config, shutdown_pageserver);
      71            0 :             let inner = Arc::new(inner);
      72              : 
      73              :             // The properties shared by all tenants on this pageserver.
      74            0 :             let internal_properties = {
      75            0 :                 let mut properties = HashMap::new();
      76            0 :                 properties.insert(
      77            0 :                     "pageserver_id".to_string(),
      78            0 :                     PostHogFlagFilterPropertyValue::String(conf.id.to_string()),
      79              :                 );
      80            0 :                 if let Some(availability_zone) = &conf.availability_zone {
      81            0 :                     properties.insert(
      82            0 :                         "availability_zone".to_string(),
      83            0 :                         PostHogFlagFilterPropertyValue::String(availability_zone.clone()),
      84            0 :                     );
      85            0 :                 }
      86              :                 // Infer region based on the remote storage config.
      87            0 :                 if let Some(remote_storage) = &conf.remote_storage_config {
      88            0 :                     match &remote_storage.storage {
      89            0 :                         RemoteStorageKind::AwsS3(config) => {
      90            0 :                             properties.insert(
      91            0 :                                 "region".to_string(),
      92            0 :                                 PostHogFlagFilterPropertyValue::String(format!(
      93            0 :                                     "aws-{}",
      94            0 :                                     config.bucket_region
      95            0 :                                 )),
      96            0 :                             );
      97            0 :                         }
      98            0 :                         RemoteStorageKind::AzureContainer(config) => {
      99            0 :                             properties.insert(
     100            0 :                                 "region".to_string(),
     101            0 :                                 PostHogFlagFilterPropertyValue::String(format!(
     102            0 :                                     "azure-{}",
     103            0 :                                     config.container_region
     104            0 :                                 )),
     105            0 :                             );
     106            0 :                         }
     107            0 :                         RemoteStorageKind::LocalFs { .. } => {
     108            0 :                             properties.insert(
     109            0 :                                 "region".to_string(),
     110            0 :                                 PostHogFlagFilterPropertyValue::String("local".to_string()),
     111            0 :                             );
     112            0 :                         }
     113              :                     }
     114            0 :                 }
     115              :                 // TODO: move this to a background task so that we don't block startup in case of slow disk
     116            0 :                 let metadata_path = conf.metadata_path();
     117            0 :                 match std::fs::read_to_string(&metadata_path) {
     118            0 :                     Ok(metadata_str) => match serde_json::from_str::<NodeMetadata>(&metadata_str) {
     119            0 :                         Ok(metadata) => {
     120            0 :                             properties.insert(
     121            0 :                                 "hostname".to_string(),
     122            0 :                                 PostHogFlagFilterPropertyValue::String(metadata.http_host),
     123              :                             );
     124            0 :                             if let Some(cplane_region) = metadata.other.get("region_id") {
     125            0 :                                 if let Some(cplane_region) = cplane_region.as_str() {
     126            0 :                                     // This region contains the cell number
     127            0 :                                     properties.insert(
     128            0 :                                         "neon_region".to_string(),
     129            0 :                                         PostHogFlagFilterPropertyValue::String(
     130            0 :                                             cplane_region.to_string(),
     131            0 :                                         ),
     132            0 :                                     );
     133            0 :                                 }
     134            0 :                             }
     135              :                         }
     136            0 :                         Err(e) => {
     137            0 :                             tracing::warn!("Failed to parse metadata.json: {}", e);
     138              :                         }
     139              :                     },
     140            0 :                     Err(e) => {
     141            0 :                         tracing::warn!("Failed to read metadata.json: {}", e);
     142              :                     }
     143              :                 }
     144            0 :                 Arc::new(properties)
     145              :             };
     146              : 
     147            0 :             let fake_tenants = {
     148            0 :                 let mut tenants = Vec::new();
     149            0 :                 for i in 0..10 {
     150            0 :                     let distinct_id = format!(
     151            0 :                         "fake_tenant_{}_{}_{}",
     152            0 :                         conf.availability_zone.as_deref().unwrap_or_default(),
     153            0 :                         conf.id,
     154            0 :                         i
     155            0 :                     );
     156            0 : 
     157            0 :                     let tenant_properties = PerTenantProperties {
     158            0 :                         remote_size_mb: Some(rand::rng().random_range(100.0..1000000.00)),
     159            0 :                         db_count_max: Some(rand::rng().random_range(1..1000)),
     160            0 :                         rel_count_max: Some(rand::rng().random_range(1..1000)),
     161            0 :                     }
     162            0 :                     .into_posthog_properties();
     163            0 : 
     164            0 :                     let properties = Self::collect_properties_inner(
     165            0 :                         distinct_id.clone(),
     166            0 :                         Some(&internal_properties),
     167            0 :                         &tenant_properties,
     168            0 :                     );
     169            0 :                     tenants.push(CaptureEvent {
     170            0 :                         event: "initial_tenant_report".to_string(),
     171            0 :                         distinct_id,
     172            0 :                         properties: json!({ "$set": properties }), // use `$set` to set the person properties instead of the event properties
     173            0 :                     });
     174            0 :                 }
     175            0 :                 tenants
     176              :             };
     177            0 :             inner.clone().spawn(
     178            0 :                 handle,
     179            0 :                 posthog_config
     180            0 :                     .refresh_interval
     181            0 :                     .unwrap_or(DEFAULT_POSTHOG_REFRESH_INTERVAL),
     182            0 :                 fake_tenants,
     183              :             );
     184            0 :             Ok(FeatureResolver {
     185            0 :                 inner: Some(inner),
     186            0 :                 internal_properties: Some(internal_properties),
     187            0 :                 force_overrides_for_testing: Arc::new(ArcSwap::new(Arc::new(HashMap::new()))),
     188            0 :             })
     189              :         } else {
     190            0 :             Ok(FeatureResolver {
     191            0 :                 inner: None,
     192            0 :                 internal_properties: None,
     193            0 :                 force_overrides_for_testing: Arc::new(ArcSwap::new(Arc::new(HashMap::new()))),
     194            0 :             })
     195              :         }
     196            0 :     }
     197              : 
     198            0 :     fn collect_properties_inner(
     199            0 :         tenant_id: String,
     200            0 :         internal_properties: Option<&HashMap<String, PostHogFlagFilterPropertyValue>>,
     201            0 :         tenant_properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
     202            0 :     ) -> HashMap<String, PostHogFlagFilterPropertyValue> {
     203            0 :         let mut properties = HashMap::new();
     204            0 :         if let Some(internal_properties) = internal_properties {
     205            0 :             for (key, value) in internal_properties.iter() {
     206            0 :                 properties.insert(key.clone(), value.clone());
     207            0 :             }
     208            0 :         }
     209            0 :         properties.insert(
     210            0 :             "tenant_id".to_string(),
     211            0 :             PostHogFlagFilterPropertyValue::String(tenant_id),
     212              :         );
     213            0 :         for (key, value) in tenant_properties.iter() {
     214            0 :             properties.insert(key.clone(), value.clone());
     215            0 :         }
     216            0 :         properties
     217            0 :     }
     218              : 
     219              :     /// Collect all properties available for the feature flag evaluation.
     220            0 :     pub(crate) fn collect_properties(
     221            0 :         &self,
     222            0 :         tenant_id: TenantId,
     223            0 :         tenant_properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
     224            0 :     ) -> HashMap<String, PostHogFlagFilterPropertyValue> {
     225            0 :         Self::collect_properties_inner(
     226            0 :             tenant_id.to_string(),
     227            0 :             self.internal_properties.as_deref(),
     228            0 :             tenant_properties,
     229              :         )
     230            0 :     }
     231              : 
     232              :     /// Evaluate a multivariate feature flag against the pageserver-wide and per-tenant properties.
     233              :     ///
     234              :     /// Error handling: the caller should inspect the error and decide the behavior when a feature flag
     235              :     /// cannot be evaluated (i.e., default to false if it cannot be resolved). The error should *not* be
     236              :     /// propagated beyond where the feature flag gets resolved.
     237            0 :     pub fn evaluate_multivariate(
     238            0 :         &self,
     239            0 :         flag_key: &str,
     240            0 :         tenant_id: TenantId,
     241            0 :         tenant_properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
     242            0 :     ) -> Result<String, PostHogEvaluationError> {
     243            0 :         let force_overrides = self.force_overrides_for_testing.load();
     244            0 :         if let Some(value) = force_overrides.get(flag_key) {
     245            0 :             return Ok(value.clone());
     246            0 :         }
     247              : 
     248            0 :         if let Some(inner) = &self.inner {
     249            0 :             let res = inner.feature_store().evaluate_multivariate(
     250            0 :                 flag_key,
     251            0 :                 &tenant_id.to_string(),
     252            0 :                 &self.collect_properties(tenant_id, tenant_properties),
     253            0 :             );
     254            0 :             match &res {
     255            0 :                 Ok(value) => {
     256            0 :                     FEATURE_FLAG_EVALUATION
     257            0 :                         .with_label_values(&[flag_key, "ok", value])
     258            0 :                         .inc();
     259            0 :                 }
     260            0 :                 Err(e) => {
     261            0 :                     FEATURE_FLAG_EVALUATION
     262            0 :                         .with_label_values(&[flag_key, "error", e.as_variant_str()])
     263            0 :                         .inc();
     264            0 :                 }
     265              :             }
     266            0 :             res
     267              :         } else {
     268            0 :             Err(PostHogEvaluationError::NotAvailable(
     269            0 :                 "PostHog integration is not enabled".to_string(),
     270            0 :             ))
     271              :         }
     272            0 :     }
     273              : 
     274              :     /// Evaluate a boolean feature flag against the pageserver-wide and per-tenant properties.
     275              :     ///
     276              :     /// Returns `Ok(())` if the flag is evaluated to true, otherwise returns an error.
     277              :     ///
     278              :     /// Error handling: the caller should inspect the error and decide the behavior when a feature flag
     279              :     /// cannot be evaluated (i.e., default to false if it cannot be resolved). The error should *not* be
     280              :     /// propagated beyond where the feature flag gets resolved.
     281            0 :     pub fn evaluate_boolean(
     282            0 :         &self,
     283            0 :         flag_key: &str,
     284            0 :         tenant_id: TenantId,
     285            0 :         tenant_properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
     286            0 :     ) -> Result<(), PostHogEvaluationError> {
     287            0 :         let force_overrides = self.force_overrides_for_testing.load();
     288            0 :         if let Some(value) = force_overrides.get(flag_key) {
     289            0 :             return if value == "true" {
     290            0 :                 Ok(())
     291              :             } else {
     292            0 :                 Err(PostHogEvaluationError::NoConditionGroupMatched)
     293              :             };
     294            0 :         }
     295              : 
     296            0 :         if let Some(inner) = &self.inner {
     297            0 :             let res = inner.feature_store().evaluate_boolean(
     298            0 :                 flag_key,
     299            0 :                 &tenant_id.to_string(),
     300            0 :                 &self.collect_properties(tenant_id, tenant_properties),
     301            0 :             );
     302            0 :             match &res {
     303            0 :                 Ok(()) => {
     304            0 :                     FEATURE_FLAG_EVALUATION
     305            0 :                         .with_label_values(&[flag_key, "ok", "true"])
     306            0 :                         .inc();
     307            0 :                 }
     308            0 :                 Err(e) => {
     309            0 :                     FEATURE_FLAG_EVALUATION
     310            0 :                         .with_label_values(&[flag_key, "error", e.as_variant_str()])
     311            0 :                         .inc();
     312            0 :                 }
     313              :             }
     314            0 :             res
     315              :         } else {
     316            0 :             Err(PostHogEvaluationError::NotAvailable(
     317            0 :                 "PostHog integration is not enabled".to_string(),
     318            0 :             ))
     319              :         }
     320            0 :     }
     321              : 
     322            0 :     pub fn is_feature_flag_boolean(&self, flag_key: &str) -> Result<bool, PostHogEvaluationError> {
     323            0 :         if let Some(inner) = &self.inner {
     324            0 :             inner.feature_store().is_feature_flag_boolean(flag_key)
     325              :         } else {
     326            0 :             Err(PostHogEvaluationError::NotAvailable(
     327            0 :                 "PostHog integration is not enabled, cannot auto-determine the flag type"
     328            0 :                     .to_string(),
     329            0 :             ))
     330              :         }
     331            0 :     }
     332              : 
     333              :     /// Force-override a feature flag. This is only for testing purposes. Assumes the caller only calls it
     334              :     /// from a single thread so it won't race.
     335            0 :     pub fn force_override_for_testing(&self, flag_key: &str, value: Option<&str>) {
     336            0 :         let mut force_overrides = self.force_overrides_for_testing.load().as_ref().clone();
     337            0 :         if let Some(value) = value {
     338            0 :             force_overrides.insert(flag_key.to_string(), value.to_string());
     339            0 :         } else {
     340            0 :             force_overrides.remove(flag_key);
     341            0 :         }
     342            0 :         self.force_overrides_for_testing
     343            0 :             .store(Arc::new(force_overrides));
     344            0 :     }
     345              : }
     346              : 
     347              : struct PerTenantProperties {
     348              :     pub remote_size_mb: Option<f64>,
     349              :     pub db_count_max: Option<usize>,
     350              :     pub rel_count_max: Option<usize>,
     351              : }
     352              : 
     353              : impl PerTenantProperties {
     354            0 :     pub fn into_posthog_properties(self) -> HashMap<String, PostHogFlagFilterPropertyValue> {
     355            0 :         let mut properties = HashMap::new();
     356            0 :         if let Some(remote_size_mb) = self.remote_size_mb {
     357            0 :             properties.insert(
     358            0 :                 "tenant_remote_size_mb".to_string(),
     359            0 :                 PostHogFlagFilterPropertyValue::Number(remote_size_mb),
     360            0 :             );
     361            0 :         }
     362            0 :         if let Some(db_count) = self.db_count_max {
     363            0 :             properties.insert(
     364            0 :                 "tenant_db_count_max".to_string(),
     365            0 :                 PostHogFlagFilterPropertyValue::Number(db_count as f64),
     366            0 :             );
     367            0 :         }
     368            0 :         if let Some(rel_count) = self.rel_count_max {
     369            0 :             properties.insert(
     370            0 :                 "tenant_rel_count_max".to_string(),
     371            0 :                 PostHogFlagFilterPropertyValue::Number(rel_count as f64),
     372            0 :             );
     373            0 :         }
     374            0 :         properties
     375            0 :     }
     376              : }
     377              : 
     378              : pub struct TenantFeatureResolver {
     379              :     inner: FeatureResolver,
     380              :     tenant_id: TenantId,
     381              :     cached_tenant_properties: ArcSwap<HashMap<String, PostHogFlagFilterPropertyValue>>,
     382              : 
     383              :     // Add feature flag on the critical path below.
     384              :     //
     385              :     // If a feature flag will be used on the critical path, we will update it in the tenant housekeeping loop instead of
     386              :     // resolving directly by calling `evaluate_multivariate` or `evaluate_boolean`. Remember to update the flag in the
     387              :     // housekeeping loop. The user should directly read this atomic flag instead of using the set of evaluate functions.
     388              :     pub feature_test_remote_size_flag: AtomicBool,
     389              : }
     390              : 
     391              : impl TenantFeatureResolver {
     392          119 :     pub fn new(inner: FeatureResolver, tenant_id: TenantId) -> Self {
     393          119 :         Self {
     394          119 :             inner,
     395          119 :             tenant_id,
     396          119 :             cached_tenant_properties: ArcSwap::new(Arc::new(HashMap::new())),
     397          119 :             feature_test_remote_size_flag: AtomicBool::new(false),
     398          119 :         }
     399          119 :     }
     400              : 
     401            0 :     pub fn evaluate_multivariate(&self, flag_key: &str) -> Result<String, PostHogEvaluationError> {
     402            0 :         self.inner.evaluate_multivariate(
     403            0 :             flag_key,
     404            0 :             self.tenant_id,
     405            0 :             &self.cached_tenant_properties.load(),
     406              :         )
     407            0 :     }
     408              : 
     409            0 :     pub fn evaluate_boolean(&self, flag_key: &str) -> Result<(), PostHogEvaluationError> {
     410            0 :         self.inner.evaluate_boolean(
     411            0 :             flag_key,
     412            0 :             self.tenant_id,
     413            0 :             &self.cached_tenant_properties.load(),
     414              :         )
     415            0 :     }
     416              : 
     417            0 :     pub fn collect_properties(&self) -> HashMap<String, PostHogFlagFilterPropertyValue> {
     418            0 :         self.inner
     419            0 :             .collect_properties(self.tenant_id, &self.cached_tenant_properties.load())
     420            0 :     }
     421              : 
     422            0 :     pub fn is_feature_flag_boolean(&self, flag_key: &str) -> Result<bool, PostHogEvaluationError> {
     423            0 :         self.inner.is_feature_flag_boolean(flag_key)
     424            0 :     }
     425              : 
     426              :     /// Refresh the cached properties and flags on the critical path.
     427            0 :     pub fn refresh_properties_and_flags(&self, tenant_shard: &TenantShard) {
     428              :         // If any timeline reports a size of 0 => this property is none.
     429            0 :         let mut remote_size_mb = Some(0.0);
     430              :         // Any of the db or rel count is available => this property is available.
     431            0 :         let mut db_count_max = None;
     432            0 :         let mut rel_count_max = None;
     433            0 :         for timeline in tenant_shard.list_timelines() {
     434            0 :             let size = timeline.metrics.resident_physical_size_get();
     435            0 :             if size == 0 {
     436            0 :                 remote_size_mb = None;
     437            0 :                 break;
     438            0 :             }
     439            0 :             if let Some(ref mut remote_size_mb) = remote_size_mb {
     440            0 :                 *remote_size_mb += size as f64 / 1024.0 / 1024.0;
     441            0 :             }
     442            0 :             if let Some(data) = timeline.db_rel_count.load_full() {
     443            0 :                 let (db_count, rel_count) = *data.as_ref();
     444            0 :                 if db_count_max.is_none() {
     445            0 :                     db_count_max = Some(db_count);
     446            0 :                 }
     447            0 :                 if rel_count_max.is_none() {
     448            0 :                     rel_count_max = Some(rel_count);
     449            0 :                 }
     450            0 :                 db_count_max = db_count_max.map(|max| max.max(db_count));
     451            0 :                 rel_count_max = rel_count_max.map(|max| max.max(rel_count));
     452            0 :             }
     453              :         }
     454            0 :         self.cached_tenant_properties.store(Arc::new(
     455            0 :             PerTenantProperties {
     456            0 :                 remote_size_mb,
     457            0 :                 db_count_max,
     458            0 :                 rel_count_max,
     459            0 :             }
     460            0 :             .into_posthog_properties(),
     461              :         ));
     462              : 
     463              :         // BEGIN: Update the feature flag on the critical path.
     464            0 :         self.feature_test_remote_size_flag.store(
     465            0 :             self.evaluate_boolean("test-remote-size-flag").is_ok(),
     466            0 :             std::sync::atomic::Ordering::Relaxed,
     467              :         );
     468              :         // END: Update the feature flag on the critical path.
     469            0 :     }
     470              : }
        

Generated by: LCOV version 2.1-beta