Line data Source code
1 : use std::{collections::HashMap, sync::Arc, time::Duration};
2 :
3 : use arc_swap::ArcSwap;
4 : use pageserver_api::config::NodeMetadata;
5 : use posthog_client_lite::{
6 : CaptureEvent, FeatureResolverBackgroundLoop, PostHogEvaluationError,
7 : PostHogFlagFilterPropertyValue,
8 : };
9 : use rand::Rng;
10 : use remote_storage::RemoteStorageKind;
11 : use serde_json::json;
12 : use tokio_util::sync::CancellationToken;
13 : use utils::id::TenantId;
14 :
15 : use crate::{config::PageServerConf, metrics::FEATURE_FLAG_EVALUATION, tenant::TenantShard};
16 :
/// Refresh interval (10 minutes) handed to the background loop when the PostHog
/// configuration does not specify `refresh_interval`.
const DEFAULT_POSTHOG_REFRESH_INTERVAL: Duration = Duration::from_secs(10 * 60);
18 :
19 : #[derive(Clone)]
20 : pub struct FeatureResolver {
21 : inner: Option<Arc<FeatureResolverBackgroundLoop>>,
22 : internal_properties: Option<Arc<HashMap<String, PostHogFlagFilterPropertyValue>>>,
23 : force_overrides_for_testing: Arc<ArcSwap<HashMap<String, String>>>,
24 : }
25 :
26 : impl FeatureResolver {
27 119 : pub fn new_disabled() -> Self {
28 119 : Self {
29 119 : inner: None,
30 119 : internal_properties: None,
31 119 : force_overrides_for_testing: Arc::new(ArcSwap::new(Arc::new(HashMap::new()))),
32 119 : }
33 119 : }
34 :
35 0 : pub fn update(&self, spec: String) -> anyhow::Result<()> {
36 0 : if let Some(inner) = &self.inner {
37 0 : inner.update(spec)?;
38 0 : }
39 0 : Ok(())
40 0 : }
41 :
42 0 : pub fn spawn(
43 0 : conf: &PageServerConf,
44 0 : shutdown_pageserver: CancellationToken,
45 0 : handle: &tokio::runtime::Handle,
46 0 : ) -> anyhow::Result<Self> {
47 : // DO NOT block in this function: make it return as fast as possible to avoid startup delays.
48 0 : if let Some(posthog_config) = &conf.posthog_config {
49 0 : let posthog_client_config = match posthog_config.clone().try_into_posthog_config() {
50 0 : Ok(config) => config,
51 0 : Err(e) => {
52 0 : tracing::warn!(
53 0 : "invalid posthog config, skipping posthog integration: {}",
54 : e
55 : );
56 0 : return Ok(FeatureResolver {
57 0 : inner: None,
58 0 : internal_properties: None,
59 0 : force_overrides_for_testing: Arc::new(ArcSwap::new(Arc::new(
60 0 : HashMap::new(),
61 0 : ))),
62 0 : });
63 : }
64 : };
65 0 : let inner =
66 0 : FeatureResolverBackgroundLoop::new(posthog_client_config, shutdown_pageserver);
67 0 : let inner = Arc::new(inner);
68 :
69 : // The properties shared by all tenants on this pageserver.
70 0 : let internal_properties = {
71 0 : let mut properties = HashMap::new();
72 0 : properties.insert(
73 0 : "pageserver_id".to_string(),
74 0 : PostHogFlagFilterPropertyValue::String(conf.id.to_string()),
75 : );
76 0 : if let Some(availability_zone) = &conf.availability_zone {
77 0 : properties.insert(
78 0 : "availability_zone".to_string(),
79 0 : PostHogFlagFilterPropertyValue::String(availability_zone.clone()),
80 0 : );
81 0 : }
82 : // Infer region based on the remote storage config.
83 0 : if let Some(remote_storage) = &conf.remote_storage_config {
84 0 : match &remote_storage.storage {
85 0 : RemoteStorageKind::AwsS3(config) => {
86 0 : properties.insert(
87 0 : "region".to_string(),
88 0 : PostHogFlagFilterPropertyValue::String(format!(
89 0 : "aws-{}",
90 0 : config.bucket_region
91 0 : )),
92 0 : );
93 0 : }
94 0 : RemoteStorageKind::AzureContainer(config) => {
95 0 : properties.insert(
96 0 : "region".to_string(),
97 0 : PostHogFlagFilterPropertyValue::String(format!(
98 0 : "azure-{}",
99 0 : config.container_region
100 0 : )),
101 0 : );
102 0 : }
103 0 : RemoteStorageKind::LocalFs { .. } => {
104 0 : properties.insert(
105 0 : "region".to_string(),
106 0 : PostHogFlagFilterPropertyValue::String("local".to_string()),
107 0 : );
108 0 : }
109 : }
110 0 : }
111 : // TODO: move this to a background task so that we don't block startup in case of slow disk
112 0 : let metadata_path = conf.metadata_path();
113 0 : match std::fs::read_to_string(&metadata_path) {
114 0 : Ok(metadata_str) => match serde_json::from_str::<NodeMetadata>(&metadata_str) {
115 0 : Ok(metadata) => {
116 0 : properties.insert(
117 0 : "hostname".to_string(),
118 0 : PostHogFlagFilterPropertyValue::String(metadata.http_host),
119 : );
120 0 : if let Some(cplane_region) = metadata.other.get("region_id") {
121 0 : if let Some(cplane_region) = cplane_region.as_str() {
122 0 : // This region contains the cell number
123 0 : properties.insert(
124 0 : "neon_region".to_string(),
125 0 : PostHogFlagFilterPropertyValue::String(
126 0 : cplane_region.to_string(),
127 0 : ),
128 0 : );
129 0 : }
130 0 : }
131 : }
132 0 : Err(e) => {
133 0 : tracing::warn!("Failed to parse metadata.json: {}", e);
134 : }
135 : },
136 0 : Err(e) => {
137 0 : tracing::warn!("Failed to read metadata.json: {}", e);
138 : }
139 : }
140 0 : Arc::new(properties)
141 : };
142 :
143 0 : let fake_tenants = {
144 0 : let mut tenants = Vec::new();
145 0 : for i in 0..10 {
146 0 : let distinct_id = format!(
147 0 : "fake_tenant_{}_{}_{}",
148 0 : conf.availability_zone.as_deref().unwrap_or_default(),
149 0 : conf.id,
150 0 : i
151 0 : );
152 0 :
153 0 : let tenant_properties = PerTenantProperties {
154 0 : remote_size_mb: Some(rand::thread_rng().gen_range(100.0..1000000.00)),
155 0 : }
156 0 : .into_posthog_properties();
157 0 :
158 0 : let properties = Self::collect_properties_inner(
159 0 : distinct_id.clone(),
160 0 : Some(&internal_properties),
161 0 : &tenant_properties,
162 0 : );
163 0 : tenants.push(CaptureEvent {
164 0 : event: "initial_tenant_report".to_string(),
165 0 : distinct_id,
166 0 : properties: json!({ "$set": properties }), // use `$set` to set the person properties instead of the event properties
167 0 : });
168 0 : }
169 0 : tenants
170 : };
171 0 : inner.clone().spawn(
172 0 : handle,
173 0 : posthog_config
174 0 : .refresh_interval
175 0 : .unwrap_or(DEFAULT_POSTHOG_REFRESH_INTERVAL),
176 0 : fake_tenants,
177 : );
178 0 : Ok(FeatureResolver {
179 0 : inner: Some(inner),
180 0 : internal_properties: Some(internal_properties),
181 0 : force_overrides_for_testing: Arc::new(ArcSwap::new(Arc::new(HashMap::new()))),
182 0 : })
183 : } else {
184 0 : Ok(FeatureResolver {
185 0 : inner: None,
186 0 : internal_properties: None,
187 0 : force_overrides_for_testing: Arc::new(ArcSwap::new(Arc::new(HashMap::new()))),
188 0 : })
189 : }
190 0 : }
191 :
192 0 : fn collect_properties_inner(
193 0 : tenant_id: String,
194 0 : internal_properties: Option<&HashMap<String, PostHogFlagFilterPropertyValue>>,
195 0 : tenant_properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
196 0 : ) -> HashMap<String, PostHogFlagFilterPropertyValue> {
197 0 : let mut properties = HashMap::new();
198 0 : if let Some(internal_properties) = internal_properties {
199 0 : for (key, value) in internal_properties.iter() {
200 0 : properties.insert(key.clone(), value.clone());
201 0 : }
202 0 : }
203 0 : properties.insert(
204 0 : "tenant_id".to_string(),
205 0 : PostHogFlagFilterPropertyValue::String(tenant_id),
206 : );
207 0 : for (key, value) in tenant_properties.iter() {
208 0 : properties.insert(key.clone(), value.clone());
209 0 : }
210 0 : properties
211 0 : }
212 :
213 : /// Collect all properties availble for the feature flag evaluation.
214 0 : pub(crate) fn collect_properties(
215 0 : &self,
216 0 : tenant_id: TenantId,
217 0 : tenant_properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
218 0 : ) -> HashMap<String, PostHogFlagFilterPropertyValue> {
219 0 : Self::collect_properties_inner(
220 0 : tenant_id.to_string(),
221 0 : self.internal_properties.as_deref(),
222 0 : tenant_properties,
223 : )
224 0 : }
225 :
226 : /// Evaluate a multivariate feature flag. Currently, we do not support any properties.
227 : ///
228 : /// Error handling: the caller should inspect the error and decide the behavior when a feature flag
229 : /// cannot be evaluated (i.e., default to false if it cannot be resolved). The error should *not* be
230 : /// propagated beyond where the feature flag gets resolved.
231 0 : pub fn evaluate_multivariate(
232 0 : &self,
233 0 : flag_key: &str,
234 0 : tenant_id: TenantId,
235 0 : tenant_properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
236 0 : ) -> Result<String, PostHogEvaluationError> {
237 0 : let force_overrides = self.force_overrides_for_testing.load();
238 0 : if let Some(value) = force_overrides.get(flag_key) {
239 0 : return Ok(value.clone());
240 0 : }
241 :
242 0 : if let Some(inner) = &self.inner {
243 0 : let res = inner.feature_store().evaluate_multivariate(
244 0 : flag_key,
245 0 : &tenant_id.to_string(),
246 0 : &self.collect_properties(tenant_id, tenant_properties),
247 0 : );
248 0 : match &res {
249 0 : Ok(value) => {
250 0 : FEATURE_FLAG_EVALUATION
251 0 : .with_label_values(&[flag_key, "ok", value])
252 0 : .inc();
253 0 : }
254 0 : Err(e) => {
255 0 : FEATURE_FLAG_EVALUATION
256 0 : .with_label_values(&[flag_key, "error", e.as_variant_str()])
257 0 : .inc();
258 0 : }
259 : }
260 0 : res
261 : } else {
262 0 : Err(PostHogEvaluationError::NotAvailable(
263 0 : "PostHog integration is not enabled".to_string(),
264 0 : ))
265 : }
266 0 : }
267 :
268 : /// Evaluate a boolean feature flag. Currently, we do not support any properties.
269 : ///
270 : /// Returns `Ok(())` if the flag is evaluated to true, otherwise returns an error.
271 : ///
272 : /// Error handling: the caller should inspect the error and decide the behavior when a feature flag
273 : /// cannot be evaluated (i.e., default to false if it cannot be resolved). The error should *not* be
274 : /// propagated beyond where the feature flag gets resolved.
275 0 : pub fn evaluate_boolean(
276 0 : &self,
277 0 : flag_key: &str,
278 0 : tenant_id: TenantId,
279 0 : tenant_properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
280 0 : ) -> Result<(), PostHogEvaluationError> {
281 0 : let force_overrides = self.force_overrides_for_testing.load();
282 0 : if let Some(value) = force_overrides.get(flag_key) {
283 0 : return if value == "true" {
284 0 : Ok(())
285 : } else {
286 0 : Err(PostHogEvaluationError::NoConditionGroupMatched)
287 : };
288 0 : }
289 :
290 0 : if let Some(inner) = &self.inner {
291 0 : let res = inner.feature_store().evaluate_boolean(
292 0 : flag_key,
293 0 : &tenant_id.to_string(),
294 0 : &self.collect_properties(tenant_id, tenant_properties),
295 0 : );
296 0 : match &res {
297 0 : Ok(()) => {
298 0 : FEATURE_FLAG_EVALUATION
299 0 : .with_label_values(&[flag_key, "ok", "true"])
300 0 : .inc();
301 0 : }
302 0 : Err(e) => {
303 0 : FEATURE_FLAG_EVALUATION
304 0 : .with_label_values(&[flag_key, "error", e.as_variant_str()])
305 0 : .inc();
306 0 : }
307 : }
308 0 : res
309 : } else {
310 0 : Err(PostHogEvaluationError::NotAvailable(
311 0 : "PostHog integration is not enabled".to_string(),
312 0 : ))
313 : }
314 0 : }
315 :
316 0 : pub fn is_feature_flag_boolean(&self, flag_key: &str) -> Result<bool, PostHogEvaluationError> {
317 0 : if let Some(inner) = &self.inner {
318 0 : inner.feature_store().is_feature_flag_boolean(flag_key)
319 : } else {
320 0 : Err(PostHogEvaluationError::NotAvailable(
321 0 : "PostHog integration is not enabled, cannot auto-determine the flag type"
322 0 : .to_string(),
323 0 : ))
324 : }
325 0 : }
326 :
327 : /// Force override a feature flag for testing. This is only for testing purposes. Assume the caller only call it
328 : /// from a single thread so it won't race.
329 0 : pub fn force_override_for_testing(&self, flag_key: &str, value: Option<&str>) {
330 0 : let mut force_overrides = self.force_overrides_for_testing.load().as_ref().clone();
331 0 : if let Some(value) = value {
332 0 : force_overrides.insert(flag_key.to_string(), value.to_string());
333 0 : } else {
334 0 : force_overrides.remove(flag_key);
335 0 : }
336 0 : self.force_overrides_for_testing
337 0 : .store(Arc::new(force_overrides));
338 0 : }
339 : }
340 :
/// Typed per-tenant inputs, converted into PostHog properties via
/// `into_posthog_properties`.
struct PerTenantProperties {
    /// Tenant size in MiB; `None` means the property is not reported.
    pub remote_size_mb: Option<f64>,
}
344 :
345 : impl PerTenantProperties {
346 0 : pub fn into_posthog_properties(self) -> HashMap<String, PostHogFlagFilterPropertyValue> {
347 0 : let mut properties = HashMap::new();
348 0 : if let Some(remote_size_mb) = self.remote_size_mb {
349 0 : properties.insert(
350 0 : "tenant_remote_size_mb".to_string(),
351 0 : PostHogFlagFilterPropertyValue::Number(remote_size_mb),
352 0 : );
353 0 : }
354 0 : properties
355 0 : }
356 : }
357 :
358 : #[derive(Clone)]
359 : pub struct TenantFeatureResolver {
360 : inner: FeatureResolver,
361 : tenant_id: TenantId,
362 : cached_tenant_properties: Arc<ArcSwap<HashMap<String, PostHogFlagFilterPropertyValue>>>,
363 : }
364 :
365 : impl TenantFeatureResolver {
366 118 : pub fn new(inner: FeatureResolver, tenant_id: TenantId) -> Self {
367 118 : Self {
368 118 : inner,
369 118 : tenant_id,
370 118 : cached_tenant_properties: Arc::new(ArcSwap::new(Arc::new(HashMap::new()))),
371 118 : }
372 118 : }
373 :
374 0 : pub fn evaluate_multivariate(&self, flag_key: &str) -> Result<String, PostHogEvaluationError> {
375 0 : self.inner.evaluate_multivariate(
376 0 : flag_key,
377 0 : self.tenant_id,
378 0 : &self.cached_tenant_properties.load(),
379 : )
380 0 : }
381 :
382 0 : pub fn evaluate_boolean(&self, flag_key: &str) -> Result<(), PostHogEvaluationError> {
383 0 : self.inner.evaluate_boolean(
384 0 : flag_key,
385 0 : self.tenant_id,
386 0 : &self.cached_tenant_properties.load(),
387 : )
388 0 : }
389 :
390 0 : pub fn collect_properties(&self) -> HashMap<String, PostHogFlagFilterPropertyValue> {
391 0 : self.inner
392 0 : .collect_properties(self.tenant_id, &self.cached_tenant_properties.load())
393 0 : }
394 :
395 0 : pub fn is_feature_flag_boolean(&self, flag_key: &str) -> Result<bool, PostHogEvaluationError> {
396 0 : self.inner.is_feature_flag_boolean(flag_key)
397 0 : }
398 :
399 0 : pub fn update_cached_tenant_properties(&self, tenant_shard: &TenantShard) {
400 0 : let mut remote_size_mb = None;
401 0 : for timeline in tenant_shard.list_timelines() {
402 0 : let size = timeline.metrics.resident_physical_size_get();
403 0 : if size == 0 {
404 0 : remote_size_mb = None;
405 0 : }
406 0 : if let Some(ref mut remote_size_mb) = remote_size_mb {
407 0 : *remote_size_mb += size as f64 / 1024.0 / 1024.0;
408 0 : }
409 : }
410 0 : self.cached_tenant_properties.store(Arc::new(
411 0 : PerTenantProperties { remote_size_mb }.into_posthog_properties(),
412 0 : ));
413 0 : }
414 : }
|