Line data Source code
1 : use std::{
2 : collections::HashMap,
3 : sync::{Arc, atomic::AtomicBool},
4 : time::Duration,
5 : };
6 :
7 : use arc_swap::ArcSwap;
8 : use pageserver_api::config::NodeMetadata;
9 : use posthog_client_lite::{
10 : CaptureEvent, FeatureResolverBackgroundLoop, PostHogEvaluationError,
11 : PostHogFlagFilterPropertyValue,
12 : };
13 : use rand::Rng;
14 : use remote_storage::RemoteStorageKind;
15 : use serde_json::json;
16 : use tokio_util::sync::CancellationToken;
17 : use utils::id::TenantId;
18 :
19 : use crate::{config::PageServerConf, metrics::FEATURE_FLAG_EVALUATION, tenant::TenantShard};
20 :
/// Refresh interval handed to the PostHog background loop when the PostHog config does not set
/// `refresh_interval` (ten minutes).
const DEFAULT_POSTHOG_REFRESH_INTERVAL: Duration = Duration::from_secs(10 * 60);
22 :
/// Pageserver-wide feature-flag resolver backed by PostHog.
///
/// Cheap to clone: all state is behind `Arc`s, so clones share the same background loop,
/// internal properties and test overrides.
#[derive(Clone)]
pub struct FeatureResolver {
    // Background loop that owns the PostHog feature store; `None` when the integration is
    // disabled (no PostHog config, or an invalid one).
    inner: Option<Arc<FeatureResolverBackgroundLoop>>,
    // Properties shared by all tenants on this pageserver (node id, AZ, region, hostname, ...);
    // `None` when the integration is disabled.
    internal_properties: Option<Arc<HashMap<String, PostHogFlagFilterPropertyValue>>>,
    // Flag key -> forced value; consulted before PostHog on every evaluation. Testing only.
    force_overrides_for_testing: Arc<ArcSwap<HashMap<String, String>>>,
}
29 :
30 : impl FeatureResolver {
31 120 : pub fn new_disabled() -> Self {
32 120 : Self {
33 120 : inner: None,
34 120 : internal_properties: None,
35 120 : force_overrides_for_testing: Arc::new(ArcSwap::new(Arc::new(HashMap::new()))),
36 120 : }
37 120 : }
38 :
39 0 : pub fn update(&self, spec: String) -> anyhow::Result<()> {
40 0 : if let Some(inner) = &self.inner {
41 0 : inner.update(spec)?;
42 0 : }
43 0 : Ok(())
44 0 : }
45 :
46 0 : pub fn spawn(
47 0 : conf: &PageServerConf,
48 0 : shutdown_pageserver: CancellationToken,
49 0 : handle: &tokio::runtime::Handle,
50 0 : ) -> anyhow::Result<Self> {
51 : // DO NOT block in this function: make it return as fast as possible to avoid startup delays.
52 0 : if let Some(posthog_config) = &conf.posthog_config {
53 0 : let posthog_client_config = match posthog_config.clone().try_into_posthog_config() {
54 0 : Ok(config) => config,
55 0 : Err(e) => {
56 0 : tracing::warn!(
57 0 : "invalid posthog config, skipping posthog integration: {}",
58 : e
59 : );
60 0 : return Ok(FeatureResolver {
61 0 : inner: None,
62 0 : internal_properties: None,
63 0 : force_overrides_for_testing: Arc::new(ArcSwap::new(Arc::new(
64 0 : HashMap::new(),
65 0 : ))),
66 0 : });
67 : }
68 : };
69 0 : let inner =
70 0 : FeatureResolverBackgroundLoop::new(posthog_client_config, shutdown_pageserver);
71 0 : let inner = Arc::new(inner);
72 :
73 : // The properties shared by all tenants on this pageserver.
74 0 : let internal_properties = {
75 0 : let mut properties = HashMap::new();
76 0 : properties.insert(
77 0 : "pageserver_id".to_string(),
78 0 : PostHogFlagFilterPropertyValue::String(conf.id.to_string()),
79 : );
80 0 : if let Some(availability_zone) = &conf.availability_zone {
81 0 : properties.insert(
82 0 : "availability_zone".to_string(),
83 0 : PostHogFlagFilterPropertyValue::String(availability_zone.clone()),
84 0 : );
85 0 : }
86 : // Infer region based on the remote storage config.
87 0 : if let Some(remote_storage) = &conf.remote_storage_config {
88 0 : match &remote_storage.storage {
89 0 : RemoteStorageKind::AwsS3(config) => {
90 0 : properties.insert(
91 0 : "region".to_string(),
92 0 : PostHogFlagFilterPropertyValue::String(format!(
93 0 : "aws-{}",
94 0 : config.bucket_region
95 0 : )),
96 0 : );
97 0 : }
98 0 : RemoteStorageKind::AzureContainer(config) => {
99 0 : properties.insert(
100 0 : "region".to_string(),
101 0 : PostHogFlagFilterPropertyValue::String(format!(
102 0 : "azure-{}",
103 0 : config.container_region
104 0 : )),
105 0 : );
106 0 : }
107 0 : RemoteStorageKind::LocalFs { .. } => {
108 0 : properties.insert(
109 0 : "region".to_string(),
110 0 : PostHogFlagFilterPropertyValue::String("local".to_string()),
111 0 : );
112 0 : }
113 : }
114 0 : }
115 : // TODO: move this to a background task so that we don't block startup in case of slow disk
116 0 : let metadata_path = conf.metadata_path();
117 0 : match std::fs::read_to_string(&metadata_path) {
118 0 : Ok(metadata_str) => match serde_json::from_str::<NodeMetadata>(&metadata_str) {
119 0 : Ok(metadata) => {
120 0 : properties.insert(
121 0 : "hostname".to_string(),
122 0 : PostHogFlagFilterPropertyValue::String(metadata.http_host),
123 : );
124 0 : if let Some(cplane_region) = metadata.other.get("region_id") {
125 0 : if let Some(cplane_region) = cplane_region.as_str() {
126 0 : // This region contains the cell number
127 0 : properties.insert(
128 0 : "neon_region".to_string(),
129 0 : PostHogFlagFilterPropertyValue::String(
130 0 : cplane_region.to_string(),
131 0 : ),
132 0 : );
133 0 : }
134 0 : }
135 : }
136 0 : Err(e) => {
137 0 : tracing::warn!("Failed to parse metadata.json: {}", e);
138 : }
139 : },
140 0 : Err(e) => {
141 0 : tracing::warn!("Failed to read metadata.json: {}", e);
142 : }
143 : }
144 0 : Arc::new(properties)
145 : };
146 :
147 0 : let fake_tenants = {
148 0 : let mut tenants = Vec::new();
149 0 : for i in 0..10 {
150 0 : let distinct_id = format!(
151 0 : "fake_tenant_{}_{}_{}",
152 0 : conf.availability_zone.as_deref().unwrap_or_default(),
153 0 : conf.id,
154 0 : i
155 0 : );
156 0 :
157 0 : let tenant_properties = PerTenantProperties {
158 0 : remote_size_mb: Some(rand::rng().random_range(100.0..1000000.00)),
159 0 : db_count_max: Some(rand::rng().random_range(1..1000)),
160 0 : rel_count_max: Some(rand::rng().random_range(1..1000)),
161 0 : }
162 0 : .into_posthog_properties();
163 0 :
164 0 : let properties = Self::collect_properties_inner(
165 0 : distinct_id.clone(),
166 0 : Some(&internal_properties),
167 0 : &tenant_properties,
168 0 : );
169 0 : tenants.push(CaptureEvent {
170 0 : event: "initial_tenant_report".to_string(),
171 0 : distinct_id,
172 0 : properties: json!({ "$set": properties }), // use `$set` to set the person properties instead of the event properties
173 0 : });
174 0 : }
175 0 : tenants
176 : };
177 0 : inner.clone().spawn(
178 0 : handle,
179 0 : posthog_config
180 0 : .refresh_interval
181 0 : .unwrap_or(DEFAULT_POSTHOG_REFRESH_INTERVAL),
182 0 : fake_tenants,
183 : );
184 0 : Ok(FeatureResolver {
185 0 : inner: Some(inner),
186 0 : internal_properties: Some(internal_properties),
187 0 : force_overrides_for_testing: Arc::new(ArcSwap::new(Arc::new(HashMap::new()))),
188 0 : })
189 : } else {
190 0 : Ok(FeatureResolver {
191 0 : inner: None,
192 0 : internal_properties: None,
193 0 : force_overrides_for_testing: Arc::new(ArcSwap::new(Arc::new(HashMap::new()))),
194 0 : })
195 : }
196 0 : }
197 :
198 0 : fn collect_properties_inner(
199 0 : tenant_id: String,
200 0 : internal_properties: Option<&HashMap<String, PostHogFlagFilterPropertyValue>>,
201 0 : tenant_properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
202 0 : ) -> HashMap<String, PostHogFlagFilterPropertyValue> {
203 0 : let mut properties = HashMap::new();
204 0 : if let Some(internal_properties) = internal_properties {
205 0 : for (key, value) in internal_properties.iter() {
206 0 : properties.insert(key.clone(), value.clone());
207 0 : }
208 0 : }
209 0 : properties.insert(
210 0 : "tenant_id".to_string(),
211 0 : PostHogFlagFilterPropertyValue::String(tenant_id),
212 : );
213 0 : for (key, value) in tenant_properties.iter() {
214 0 : properties.insert(key.clone(), value.clone());
215 0 : }
216 0 : properties
217 0 : }
218 :
219 : /// Collect all properties availble for the feature flag evaluation.
220 0 : pub(crate) fn collect_properties(
221 0 : &self,
222 0 : tenant_id: TenantId,
223 0 : tenant_properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
224 0 : ) -> HashMap<String, PostHogFlagFilterPropertyValue> {
225 0 : Self::collect_properties_inner(
226 0 : tenant_id.to_string(),
227 0 : self.internal_properties.as_deref(),
228 0 : tenant_properties,
229 : )
230 0 : }
231 :
232 : /// Evaluate a multivariate feature flag. Currently, we do not support any properties.
233 : ///
234 : /// Error handling: the caller should inspect the error and decide the behavior when a feature flag
235 : /// cannot be evaluated (i.e., default to false if it cannot be resolved). The error should *not* be
236 : /// propagated beyond where the feature flag gets resolved.
237 0 : pub fn evaluate_multivariate(
238 0 : &self,
239 0 : flag_key: &str,
240 0 : tenant_id: TenantId,
241 0 : tenant_properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
242 0 : ) -> Result<String, PostHogEvaluationError> {
243 0 : let force_overrides = self.force_overrides_for_testing.load();
244 0 : if let Some(value) = force_overrides.get(flag_key) {
245 0 : return Ok(value.clone());
246 0 : }
247 :
248 0 : if let Some(inner) = &self.inner {
249 0 : let res = inner.feature_store().evaluate_multivariate(
250 0 : flag_key,
251 0 : &tenant_id.to_string(),
252 0 : &self.collect_properties(tenant_id, tenant_properties),
253 0 : );
254 0 : match &res {
255 0 : Ok(value) => {
256 0 : FEATURE_FLAG_EVALUATION
257 0 : .with_label_values(&[flag_key, "ok", value])
258 0 : .inc();
259 0 : }
260 0 : Err(e) => {
261 0 : FEATURE_FLAG_EVALUATION
262 0 : .with_label_values(&[flag_key, "error", e.as_variant_str()])
263 0 : .inc();
264 0 : }
265 : }
266 0 : res
267 : } else {
268 0 : Err(PostHogEvaluationError::NotAvailable(
269 0 : "PostHog integration is not enabled".to_string(),
270 0 : ))
271 : }
272 0 : }
273 :
274 : /// Evaluate a boolean feature flag. Currently, we do not support any properties.
275 : ///
276 : /// Returns `Ok(())` if the flag is evaluated to true, otherwise returns an error.
277 : ///
278 : /// Error handling: the caller should inspect the error and decide the behavior when a feature flag
279 : /// cannot be evaluated (i.e., default to false if it cannot be resolved). The error should *not* be
280 : /// propagated beyond where the feature flag gets resolved.
281 0 : pub fn evaluate_boolean(
282 0 : &self,
283 0 : flag_key: &str,
284 0 : tenant_id: TenantId,
285 0 : tenant_properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
286 0 : ) -> Result<(), PostHogEvaluationError> {
287 0 : let force_overrides = self.force_overrides_for_testing.load();
288 0 : if let Some(value) = force_overrides.get(flag_key) {
289 0 : return if value == "true" {
290 0 : Ok(())
291 : } else {
292 0 : Err(PostHogEvaluationError::NoConditionGroupMatched)
293 : };
294 0 : }
295 :
296 0 : if let Some(inner) = &self.inner {
297 0 : let res = inner.feature_store().evaluate_boolean(
298 0 : flag_key,
299 0 : &tenant_id.to_string(),
300 0 : &self.collect_properties(tenant_id, tenant_properties),
301 0 : );
302 0 : match &res {
303 0 : Ok(()) => {
304 0 : FEATURE_FLAG_EVALUATION
305 0 : .with_label_values(&[flag_key, "ok", "true"])
306 0 : .inc();
307 0 : }
308 0 : Err(e) => {
309 0 : FEATURE_FLAG_EVALUATION
310 0 : .with_label_values(&[flag_key, "error", e.as_variant_str()])
311 0 : .inc();
312 0 : }
313 : }
314 0 : res
315 : } else {
316 0 : Err(PostHogEvaluationError::NotAvailable(
317 0 : "PostHog integration is not enabled".to_string(),
318 0 : ))
319 : }
320 0 : }
321 :
322 0 : pub fn is_feature_flag_boolean(&self, flag_key: &str) -> Result<bool, PostHogEvaluationError> {
323 0 : if let Some(inner) = &self.inner {
324 0 : inner.feature_store().is_feature_flag_boolean(flag_key)
325 : } else {
326 0 : Err(PostHogEvaluationError::NotAvailable(
327 0 : "PostHog integration is not enabled, cannot auto-determine the flag type"
328 0 : .to_string(),
329 0 : ))
330 : }
331 0 : }
332 :
333 : /// Force override a feature flag for testing. This is only for testing purposes. Assume the caller only call it
334 : /// from a single thread so it won't race.
335 0 : pub fn force_override_for_testing(&self, flag_key: &str, value: Option<&str>) {
336 0 : let mut force_overrides = self.force_overrides_for_testing.load().as_ref().clone();
337 0 : if let Some(value) = value {
338 0 : force_overrides.insert(flag_key.to_string(), value.to_string());
339 0 : } else {
340 0 : force_overrides.remove(flag_key);
341 0 : }
342 0 : self.force_overrides_for_testing
343 0 : .store(Arc::new(force_overrides));
344 0 : }
345 : }
346 :
/// Tenant-level properties reported to PostHog.
///
/// `None` fields are omitted from the map produced by `into_posthog_properties`.
struct PerTenantProperties {
    // Tenant size in MiB (bytes / 1024 / 1024 when computed from timeline metrics).
    pub remote_size_mb: Option<f64>,
    // Largest database count seen across the tenant's timelines.
    pub db_count_max: Option<usize>,
    // Largest relation count seen across the tenant's timelines.
    pub rel_count_max: Option<usize>,
}
352 :
353 : impl PerTenantProperties {
354 0 : pub fn into_posthog_properties(self) -> HashMap<String, PostHogFlagFilterPropertyValue> {
355 0 : let mut properties = HashMap::new();
356 0 : if let Some(remote_size_mb) = self.remote_size_mb {
357 0 : properties.insert(
358 0 : "tenant_remote_size_mb".to_string(),
359 0 : PostHogFlagFilterPropertyValue::Number(remote_size_mb),
360 0 : );
361 0 : }
362 0 : if let Some(db_count) = self.db_count_max {
363 0 : properties.insert(
364 0 : "tenant_db_count_max".to_string(),
365 0 : PostHogFlagFilterPropertyValue::Number(db_count as f64),
366 0 : );
367 0 : }
368 0 : if let Some(rel_count) = self.rel_count_max {
369 0 : properties.insert(
370 0 : "tenant_rel_count_max".to_string(),
371 0 : PostHogFlagFilterPropertyValue::Number(rel_count as f64),
372 0 : );
373 0 : }
374 0 : properties
375 0 : }
376 : }
377 :
/// Per-tenant view over a shared `FeatureResolver`.
///
/// Caches the tenant's own PostHog properties and holds pre-evaluated flags that are read on the
/// critical path.
pub struct TenantFeatureResolver {
    inner: FeatureResolver,
    tenant_id: TenantId,
    // Tenant-specific properties passed to every evaluation; replaced wholesale by
    // `refresh_properties_and_flags`.
    cached_tenant_properties: ArcSwap<HashMap<String, PostHogFlagFilterPropertyValue>>,

    // Add feature flag on the critical path below.
    //
    // If a feature flag will be used on the critical path, we will update it in the tenant housekeeping loop instead of
    // resolving directly by calling `evaluate_multivariate` or `evaluate_boolean`. Remember to update the flag in the
    // housekeeping loop. The user should directly read this atomic flag instead of using the set of evaluate functions.
    /// Whether `test-remote-size-flag` evaluated to true at the last refresh (initially `false`).
    pub feature_test_remote_size_flag: AtomicBool,
}
390 :
391 : impl TenantFeatureResolver {
392 119 : pub fn new(inner: FeatureResolver, tenant_id: TenantId) -> Self {
393 119 : Self {
394 119 : inner,
395 119 : tenant_id,
396 119 : cached_tenant_properties: ArcSwap::new(Arc::new(HashMap::new())),
397 119 : feature_test_remote_size_flag: AtomicBool::new(false),
398 119 : }
399 119 : }
400 :
401 0 : pub fn evaluate_multivariate(&self, flag_key: &str) -> Result<String, PostHogEvaluationError> {
402 0 : self.inner.evaluate_multivariate(
403 0 : flag_key,
404 0 : self.tenant_id,
405 0 : &self.cached_tenant_properties.load(),
406 : )
407 0 : }
408 :
409 0 : pub fn evaluate_boolean(&self, flag_key: &str) -> Result<(), PostHogEvaluationError> {
410 0 : self.inner.evaluate_boolean(
411 0 : flag_key,
412 0 : self.tenant_id,
413 0 : &self.cached_tenant_properties.load(),
414 : )
415 0 : }
416 :
417 0 : pub fn collect_properties(&self) -> HashMap<String, PostHogFlagFilterPropertyValue> {
418 0 : self.inner
419 0 : .collect_properties(self.tenant_id, &self.cached_tenant_properties.load())
420 0 : }
421 :
422 0 : pub fn is_feature_flag_boolean(&self, flag_key: &str) -> Result<bool, PostHogEvaluationError> {
423 0 : self.inner.is_feature_flag_boolean(flag_key)
424 0 : }
425 :
426 : /// Refresh the cached properties and flags on the critical path.
427 0 : pub fn refresh_properties_and_flags(&self, tenant_shard: &TenantShard) {
428 : // Any of the remote size is none => this property is none.
429 0 : let mut remote_size_mb = Some(0.0);
430 : // Any of the db or rel count is available => this property is available.
431 0 : let mut db_count_max = None;
432 0 : let mut rel_count_max = None;
433 0 : for timeline in tenant_shard.list_timelines() {
434 0 : let size = timeline.metrics.resident_physical_size_get();
435 0 : if size == 0 {
436 0 : remote_size_mb = None;
437 0 : break;
438 0 : }
439 0 : if let Some(ref mut remote_size_mb) = remote_size_mb {
440 0 : *remote_size_mb += size as f64 / 1024.0 / 1024.0;
441 0 : }
442 0 : if let Some(data) = timeline.db_rel_count.load_full() {
443 0 : let (db_count, rel_count) = *data.as_ref();
444 0 : if db_count_max.is_none() {
445 0 : db_count_max = Some(db_count);
446 0 : }
447 0 : if rel_count_max.is_none() {
448 0 : rel_count_max = Some(rel_count);
449 0 : }
450 0 : db_count_max = db_count_max.map(|max| max.max(db_count));
451 0 : rel_count_max = rel_count_max.map(|max| max.max(rel_count));
452 0 : }
453 : }
454 0 : self.cached_tenant_properties.store(Arc::new(
455 0 : PerTenantProperties {
456 0 : remote_size_mb,
457 0 : db_count_max,
458 0 : rel_count_max,
459 0 : }
460 0 : .into_posthog_properties(),
461 : ));
462 :
463 : // BEGIN: Update the feature flag on the critical path.
464 0 : self.feature_test_remote_size_flag.store(
465 0 : self.evaluate_boolean("test-remote-size-flag").is_ok(),
466 0 : std::sync::atomic::Ordering::Relaxed,
467 : );
468 : // END: Update the feature flag on the critical path.
469 0 : }
470 : }
|