Line data Source code
1 : use std::{
2 : collections::HashMap,
3 : sync::{Arc, atomic::AtomicBool},
4 : time::Duration,
5 : };
6 :
7 : use arc_swap::ArcSwap;
8 : use pageserver_api::config::NodeMetadata;
9 : use posthog_client_lite::{
10 : CaptureEvent, FeatureResolverBackgroundLoop, PostHogEvaluationError,
11 : PostHogFlagFilterPropertyValue,
12 : };
13 : use rand::Rng;
14 : use remote_storage::RemoteStorageKind;
15 : use serde_json::json;
16 : use tokio_util::sync::CancellationToken;
17 : use utils::id::TenantId;
18 :
19 : use crate::{config::PageServerConf, metrics::FEATURE_FLAG_EVALUATION, tenant::TenantShard};
20 :
21 : const DEFAULT_POSTHOG_REFRESH_INTERVAL: Duration = Duration::from_secs(600);
22 :
23 : #[derive(Clone)]
24 : pub struct FeatureResolver {
25 : inner: Option<Arc<FeatureResolverBackgroundLoop>>,
26 : internal_properties: Option<Arc<HashMap<String, PostHogFlagFilterPropertyValue>>>,
27 : force_overrides_for_testing: Arc<ArcSwap<HashMap<String, String>>>,
28 : }
29 :
30 : impl FeatureResolver {
31 120 : pub fn new_disabled() -> Self {
32 120 : Self {
33 120 : inner: None,
34 120 : internal_properties: None,
35 120 : force_overrides_for_testing: Arc::new(ArcSwap::new(Arc::new(HashMap::new()))),
36 120 : }
37 120 : }
38 :
39 0 : pub fn update(&self, spec: String) -> anyhow::Result<()> {
40 0 : if let Some(inner) = &self.inner {
41 0 : inner.update(spec)?;
42 0 : }
43 0 : Ok(())
44 0 : }
45 :
46 0 : pub fn spawn(
47 0 : conf: &PageServerConf,
48 0 : shutdown_pageserver: CancellationToken,
49 0 : handle: &tokio::runtime::Handle,
50 0 : ) -> anyhow::Result<Self> {
51 : // DO NOT block in this function: make it return as fast as possible to avoid startup delays.
52 0 : if let Some(posthog_config) = &conf.posthog_config {
53 0 : let posthog_client_config = match posthog_config.clone().try_into_posthog_config() {
54 0 : Ok(config) => config,
55 0 : Err(e) => {
56 0 : tracing::warn!(
57 0 : "invalid posthog config, skipping posthog integration: {}",
58 : e
59 : );
60 0 : return Ok(FeatureResolver {
61 0 : inner: None,
62 0 : internal_properties: None,
63 0 : force_overrides_for_testing: Arc::new(ArcSwap::new(Arc::new(
64 0 : HashMap::new(),
65 0 : ))),
66 0 : });
67 : }
68 : };
69 0 : let inner =
70 0 : FeatureResolverBackgroundLoop::new(posthog_client_config, shutdown_pageserver);
71 0 : let inner = Arc::new(inner);
72 :
73 : // The properties shared by all tenants on this pageserver.
74 0 : let internal_properties = {
75 0 : let mut properties = HashMap::new();
76 0 : properties.insert(
77 0 : "pageserver_id".to_string(),
78 0 : PostHogFlagFilterPropertyValue::String(conf.id.to_string()),
79 : );
80 0 : if let Some(availability_zone) = &conf.availability_zone {
81 0 : properties.insert(
82 0 : "availability_zone".to_string(),
83 0 : PostHogFlagFilterPropertyValue::String(availability_zone.clone()),
84 0 : );
85 0 : }
86 : // Infer region based on the remote storage config.
87 0 : if let Some(remote_storage) = &conf.remote_storage_config {
88 0 : match &remote_storage.storage {
89 0 : RemoteStorageKind::AwsS3(config) => {
90 0 : properties.insert(
91 0 : "region".to_string(),
92 0 : PostHogFlagFilterPropertyValue::String(format!(
93 0 : "aws-{}",
94 0 : config.bucket_region
95 0 : )),
96 0 : );
97 0 : }
98 0 : RemoteStorageKind::AzureContainer(config) => {
99 0 : properties.insert(
100 0 : "region".to_string(),
101 0 : PostHogFlagFilterPropertyValue::String(format!(
102 0 : "azure-{}",
103 0 : config.container_region
104 0 : )),
105 0 : );
106 0 : }
107 0 : RemoteStorageKind::LocalFs { .. } => {
108 0 : properties.insert(
109 0 : "region".to_string(),
110 0 : PostHogFlagFilterPropertyValue::String("local".to_string()),
111 0 : );
112 0 : }
113 : }
114 0 : }
115 : // TODO: move this to a background task so that we don't block startup in case of slow disk
116 0 : let metadata_path = conf.metadata_path();
117 0 : match std::fs::read_to_string(&metadata_path) {
118 0 : Ok(metadata_str) => match serde_json::from_str::<NodeMetadata>(&metadata_str) {
119 0 : Ok(metadata) => {
120 0 : properties.insert(
121 0 : "hostname".to_string(),
122 0 : PostHogFlagFilterPropertyValue::String(metadata.http_host),
123 : );
124 0 : if let Some(cplane_region) = metadata.other.get("region_id") {
125 0 : if let Some(cplane_region) = cplane_region.as_str() {
126 0 : // This region contains the cell number
127 0 : properties.insert(
128 0 : "neon_region".to_string(),
129 0 : PostHogFlagFilterPropertyValue::String(
130 0 : cplane_region.to_string(),
131 0 : ),
132 0 : );
133 0 : }
134 0 : }
135 : }
136 0 : Err(e) => {
137 0 : tracing::warn!("Failed to parse metadata.json: {}", e);
138 : }
139 : },
140 0 : Err(e) => {
141 0 : tracing::warn!("Failed to read metadata.json: {}", e);
142 : }
143 : }
144 0 : Arc::new(properties)
145 : };
146 :
147 0 : let fake_tenants = {
148 0 : let mut tenants = Vec::new();
149 0 : for i in 0..10 {
150 0 : let distinct_id = format!(
151 0 : "fake_tenant_{}_{}_{}",
152 0 : conf.availability_zone.as_deref().unwrap_or_default(),
153 0 : conf.id,
154 0 : i
155 0 : );
156 0 :
157 0 : let tenant_properties = PerTenantProperties {
158 0 : remote_size_mb: Some(rand::thread_rng().gen_range(100.0..1000000.00)),
159 0 : }
160 0 : .into_posthog_properties();
161 0 :
162 0 : let properties = Self::collect_properties_inner(
163 0 : distinct_id.clone(),
164 0 : Some(&internal_properties),
165 0 : &tenant_properties,
166 0 : );
167 0 : tenants.push(CaptureEvent {
168 0 : event: "initial_tenant_report".to_string(),
169 0 : distinct_id,
170 0 : properties: json!({ "$set": properties }), // use `$set` to set the person properties instead of the event properties
171 0 : });
172 0 : }
173 0 : tenants
174 : };
175 0 : inner.clone().spawn(
176 0 : handle,
177 0 : posthog_config
178 0 : .refresh_interval
179 0 : .unwrap_or(DEFAULT_POSTHOG_REFRESH_INTERVAL),
180 0 : fake_tenants,
181 : );
182 0 : Ok(FeatureResolver {
183 0 : inner: Some(inner),
184 0 : internal_properties: Some(internal_properties),
185 0 : force_overrides_for_testing: Arc::new(ArcSwap::new(Arc::new(HashMap::new()))),
186 0 : })
187 : } else {
188 0 : Ok(FeatureResolver {
189 0 : inner: None,
190 0 : internal_properties: None,
191 0 : force_overrides_for_testing: Arc::new(ArcSwap::new(Arc::new(HashMap::new()))),
192 0 : })
193 : }
194 0 : }
195 :
196 0 : fn collect_properties_inner(
197 0 : tenant_id: String,
198 0 : internal_properties: Option<&HashMap<String, PostHogFlagFilterPropertyValue>>,
199 0 : tenant_properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
200 0 : ) -> HashMap<String, PostHogFlagFilterPropertyValue> {
201 0 : let mut properties = HashMap::new();
202 0 : if let Some(internal_properties) = internal_properties {
203 0 : for (key, value) in internal_properties.iter() {
204 0 : properties.insert(key.clone(), value.clone());
205 0 : }
206 0 : }
207 0 : properties.insert(
208 0 : "tenant_id".to_string(),
209 0 : PostHogFlagFilterPropertyValue::String(tenant_id),
210 : );
211 0 : for (key, value) in tenant_properties.iter() {
212 0 : properties.insert(key.clone(), value.clone());
213 0 : }
214 0 : properties
215 0 : }
216 :
217 : /// Collect all properties availble for the feature flag evaluation.
218 0 : pub(crate) fn collect_properties(
219 0 : &self,
220 0 : tenant_id: TenantId,
221 0 : tenant_properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
222 0 : ) -> HashMap<String, PostHogFlagFilterPropertyValue> {
223 0 : Self::collect_properties_inner(
224 0 : tenant_id.to_string(),
225 0 : self.internal_properties.as_deref(),
226 0 : tenant_properties,
227 : )
228 0 : }
229 :
230 : /// Evaluate a multivariate feature flag. Currently, we do not support any properties.
231 : ///
232 : /// Error handling: the caller should inspect the error and decide the behavior when a feature flag
233 : /// cannot be evaluated (i.e., default to false if it cannot be resolved). The error should *not* be
234 : /// propagated beyond where the feature flag gets resolved.
235 0 : pub fn evaluate_multivariate(
236 0 : &self,
237 0 : flag_key: &str,
238 0 : tenant_id: TenantId,
239 0 : tenant_properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
240 0 : ) -> Result<String, PostHogEvaluationError> {
241 0 : let force_overrides = self.force_overrides_for_testing.load();
242 0 : if let Some(value) = force_overrides.get(flag_key) {
243 0 : return Ok(value.clone());
244 0 : }
245 :
246 0 : if let Some(inner) = &self.inner {
247 0 : let res = inner.feature_store().evaluate_multivariate(
248 0 : flag_key,
249 0 : &tenant_id.to_string(),
250 0 : &self.collect_properties(tenant_id, tenant_properties),
251 0 : );
252 0 : match &res {
253 0 : Ok(value) => {
254 0 : FEATURE_FLAG_EVALUATION
255 0 : .with_label_values(&[flag_key, "ok", value])
256 0 : .inc();
257 0 : }
258 0 : Err(e) => {
259 0 : FEATURE_FLAG_EVALUATION
260 0 : .with_label_values(&[flag_key, "error", e.as_variant_str()])
261 0 : .inc();
262 0 : }
263 : }
264 0 : res
265 : } else {
266 0 : Err(PostHogEvaluationError::NotAvailable(
267 0 : "PostHog integration is not enabled".to_string(),
268 0 : ))
269 : }
270 0 : }
271 :
272 : /// Evaluate a boolean feature flag. Currently, we do not support any properties.
273 : ///
274 : /// Returns `Ok(())` if the flag is evaluated to true, otherwise returns an error.
275 : ///
276 : /// Error handling: the caller should inspect the error and decide the behavior when a feature flag
277 : /// cannot be evaluated (i.e., default to false if it cannot be resolved). The error should *not* be
278 : /// propagated beyond where the feature flag gets resolved.
279 0 : pub fn evaluate_boolean(
280 0 : &self,
281 0 : flag_key: &str,
282 0 : tenant_id: TenantId,
283 0 : tenant_properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
284 0 : ) -> Result<(), PostHogEvaluationError> {
285 0 : let force_overrides = self.force_overrides_for_testing.load();
286 0 : if let Some(value) = force_overrides.get(flag_key) {
287 0 : return if value == "true" {
288 0 : Ok(())
289 : } else {
290 0 : Err(PostHogEvaluationError::NoConditionGroupMatched)
291 : };
292 0 : }
293 :
294 0 : if let Some(inner) = &self.inner {
295 0 : let res = inner.feature_store().evaluate_boolean(
296 0 : flag_key,
297 0 : &tenant_id.to_string(),
298 0 : &self.collect_properties(tenant_id, tenant_properties),
299 0 : );
300 0 : match &res {
301 0 : Ok(()) => {
302 0 : FEATURE_FLAG_EVALUATION
303 0 : .with_label_values(&[flag_key, "ok", "true"])
304 0 : .inc();
305 0 : }
306 0 : Err(e) => {
307 0 : FEATURE_FLAG_EVALUATION
308 0 : .with_label_values(&[flag_key, "error", e.as_variant_str()])
309 0 : .inc();
310 0 : }
311 : }
312 0 : res
313 : } else {
314 0 : Err(PostHogEvaluationError::NotAvailable(
315 0 : "PostHog integration is not enabled".to_string(),
316 0 : ))
317 : }
318 0 : }
319 :
320 0 : pub fn is_feature_flag_boolean(&self, flag_key: &str) -> Result<bool, PostHogEvaluationError> {
321 0 : if let Some(inner) = &self.inner {
322 0 : inner.feature_store().is_feature_flag_boolean(flag_key)
323 : } else {
324 0 : Err(PostHogEvaluationError::NotAvailable(
325 0 : "PostHog integration is not enabled, cannot auto-determine the flag type"
326 0 : .to_string(),
327 0 : ))
328 : }
329 0 : }
330 :
331 : /// Force override a feature flag for testing. This is only for testing purposes. Assume the caller only call it
332 : /// from a single thread so it won't race.
333 0 : pub fn force_override_for_testing(&self, flag_key: &str, value: Option<&str>) {
334 0 : let mut force_overrides = self.force_overrides_for_testing.load().as_ref().clone();
335 0 : if let Some(value) = value {
336 0 : force_overrides.insert(flag_key.to_string(), value.to_string());
337 0 : } else {
338 0 : force_overrides.remove(flag_key);
339 0 : }
340 0 : self.force_overrides_for_testing
341 0 : .store(Arc::new(force_overrides));
342 0 : }
343 : }
344 :
345 : struct PerTenantProperties {
346 : pub remote_size_mb: Option<f64>,
347 : }
348 :
349 : impl PerTenantProperties {
350 0 : pub fn into_posthog_properties(self) -> HashMap<String, PostHogFlagFilterPropertyValue> {
351 0 : let mut properties = HashMap::new();
352 0 : if let Some(remote_size_mb) = self.remote_size_mb {
353 0 : properties.insert(
354 0 : "tenant_remote_size_mb".to_string(),
355 0 : PostHogFlagFilterPropertyValue::Number(remote_size_mb),
356 0 : );
357 0 : }
358 0 : properties
359 0 : }
360 : }
361 :
362 : pub struct TenantFeatureResolver {
363 : inner: FeatureResolver,
364 : tenant_id: TenantId,
365 : cached_tenant_properties: ArcSwap<HashMap<String, PostHogFlagFilterPropertyValue>>,
366 :
367 : // Add feature flag on the critical path below.
368 : //
369 : // If a feature flag will be used on the critical path, we will update it in the tenant housekeeping loop insetad of
370 : // resolving directly by calling `evaluate_multivariate` or `evaluate_boolean`. Remember to update the flag in the
371 : // housekeeping loop. The user should directly read this atomic flag instead of using the set of evaluate functions.
372 : pub feature_test_remote_size_flag: AtomicBool,
373 : }
374 :
375 : impl TenantFeatureResolver {
376 119 : pub fn new(inner: FeatureResolver, tenant_id: TenantId) -> Self {
377 119 : Self {
378 119 : inner,
379 119 : tenant_id,
380 119 : cached_tenant_properties: ArcSwap::new(Arc::new(HashMap::new())),
381 119 : feature_test_remote_size_flag: AtomicBool::new(false),
382 119 : }
383 119 : }
384 :
385 0 : pub fn evaluate_multivariate(&self, flag_key: &str) -> Result<String, PostHogEvaluationError> {
386 0 : self.inner.evaluate_multivariate(
387 0 : flag_key,
388 0 : self.tenant_id,
389 0 : &self.cached_tenant_properties.load(),
390 : )
391 0 : }
392 :
393 0 : pub fn evaluate_boolean(&self, flag_key: &str) -> Result<(), PostHogEvaluationError> {
394 0 : self.inner.evaluate_boolean(
395 0 : flag_key,
396 0 : self.tenant_id,
397 0 : &self.cached_tenant_properties.load(),
398 : )
399 0 : }
400 :
401 0 : pub fn collect_properties(&self) -> HashMap<String, PostHogFlagFilterPropertyValue> {
402 0 : self.inner
403 0 : .collect_properties(self.tenant_id, &self.cached_tenant_properties.load())
404 0 : }
405 :
406 0 : pub fn is_feature_flag_boolean(&self, flag_key: &str) -> Result<bool, PostHogEvaluationError> {
407 0 : self.inner.is_feature_flag_boolean(flag_key)
408 0 : }
409 :
410 : /// Refresh the cached properties and flags on the critical path.
411 0 : pub fn refresh_properties_and_flags(&self, tenant_shard: &TenantShard) {
412 0 : let mut remote_size_mb = Some(0.0);
413 0 : for timeline in tenant_shard.list_timelines() {
414 0 : let size = timeline.metrics.resident_physical_size_get();
415 0 : if size == 0 {
416 0 : remote_size_mb = None;
417 0 : break;
418 0 : }
419 0 : if let Some(ref mut remote_size_mb) = remote_size_mb {
420 0 : *remote_size_mb += size as f64 / 1024.0 / 1024.0;
421 0 : }
422 : }
423 0 : self.cached_tenant_properties.store(Arc::new(
424 0 : PerTenantProperties { remote_size_mb }.into_posthog_properties(),
425 : ));
426 :
427 : // BEGIN: Update the feature flag on the critical path.
428 0 : self.feature_test_remote_size_flag.store(
429 0 : self.evaluate_boolean("test-remote-size-flag").is_ok(),
430 0 : std::sync::atomic::Ordering::Relaxed,
431 : );
432 : // END: Update the feature flag on the critical path.
433 0 : }
434 : }
|