Line data Source code
1 : use std::{collections::HashMap, sync::Arc, time::Duration};
2 :
3 : use pageserver_api::config::NodeMetadata;
4 : use posthog_client_lite::{
5 : CaptureEvent, FeatureResolverBackgroundLoop, PostHogClientConfig, PostHogEvaluationError,
6 : PostHogFlagFilterPropertyValue,
7 : };
8 : use remote_storage::RemoteStorageKind;
9 : use serde_json::json;
10 : use tokio_util::sync::CancellationToken;
11 : use utils::id::TenantId;
12 :
13 : use crate::{config::PageServerConf, metrics::FEATURE_FLAG_EVALUATION};
14 :
15 : #[derive(Clone)]
16 : pub struct FeatureResolver {
17 : inner: Option<Arc<FeatureResolverBackgroundLoop>>,
18 : internal_properties: Option<Arc<HashMap<String, PostHogFlagFilterPropertyValue>>>,
19 : }
20 :
21 : impl FeatureResolver {
22 119 : pub fn new_disabled() -> Self {
23 119 : Self {
24 119 : inner: None,
25 119 : internal_properties: None,
26 119 : }
27 119 : }
28 :
29 0 : pub fn spawn(
30 0 : conf: &PageServerConf,
31 0 : shutdown_pageserver: CancellationToken,
32 0 : handle: &tokio::runtime::Handle,
33 0 : ) -> anyhow::Result<Self> {
34 : // DO NOT block in this function: make it return as fast as possible to avoid startup delays.
35 0 : if let Some(posthog_config) = &conf.posthog_config {
36 0 : let inner = FeatureResolverBackgroundLoop::new(
37 0 : PostHogClientConfig {
38 0 : server_api_key: posthog_config.server_api_key.clone(),
39 0 : client_api_key: posthog_config.client_api_key.clone(),
40 0 : project_id: posthog_config.project_id.clone(),
41 0 : private_api_url: posthog_config.private_api_url.clone(),
42 0 : public_api_url: posthog_config.public_api_url.clone(),
43 0 : },
44 0 : shutdown_pageserver,
45 0 : );
46 0 : let inner = Arc::new(inner);
47 :
48 : // The properties shared by all tenants on this pageserver.
49 0 : let internal_properties = {
50 0 : let mut properties = HashMap::new();
51 0 : properties.insert(
52 0 : "pageserver_id".to_string(),
53 0 : PostHogFlagFilterPropertyValue::String(conf.id.to_string()),
54 0 : );
55 0 : if let Some(availability_zone) = &conf.availability_zone {
56 0 : properties.insert(
57 0 : "availability_zone".to_string(),
58 0 : PostHogFlagFilterPropertyValue::String(availability_zone.clone()),
59 0 : );
60 0 : }
61 : // Infer region based on the remote storage config.
62 0 : if let Some(remote_storage) = &conf.remote_storage_config {
63 0 : match &remote_storage.storage {
64 0 : RemoteStorageKind::AwsS3(config) => {
65 0 : properties.insert(
66 0 : "region".to_string(),
67 0 : PostHogFlagFilterPropertyValue::String(format!(
68 0 : "aws-{}",
69 0 : config.bucket_region
70 0 : )),
71 0 : );
72 0 : }
73 0 : RemoteStorageKind::AzureContainer(config) => {
74 0 : properties.insert(
75 0 : "region".to_string(),
76 0 : PostHogFlagFilterPropertyValue::String(format!(
77 0 : "azure-{}",
78 0 : config.container_region
79 0 : )),
80 0 : );
81 0 : }
82 0 : RemoteStorageKind::LocalFs { .. } => {
83 0 : properties.insert(
84 0 : "region".to_string(),
85 0 : PostHogFlagFilterPropertyValue::String("local".to_string()),
86 0 : );
87 0 : }
88 : }
89 0 : }
90 : // TODO: move this to a background task so that we don't block startup in case of slow disk
91 0 : let metadata_path = conf.metadata_path();
92 0 : match std::fs::read_to_string(&metadata_path) {
93 0 : Ok(metadata_str) => match serde_json::from_str::<NodeMetadata>(&metadata_str) {
94 0 : Ok(metadata) => {
95 0 : properties.insert(
96 0 : "hostname".to_string(),
97 0 : PostHogFlagFilterPropertyValue::String(metadata.http_host),
98 0 : );
99 0 : if let Some(cplane_region) = metadata.other.get("region_id") {
100 0 : if let Some(cplane_region) = cplane_region.as_str() {
101 0 : // This region contains the cell number
102 0 : properties.insert(
103 0 : "neon_region".to_string(),
104 0 : PostHogFlagFilterPropertyValue::String(
105 0 : cplane_region.to_string(),
106 0 : ),
107 0 : );
108 0 : }
109 0 : }
110 : }
111 0 : Err(e) => {
112 0 : tracing::warn!("Failed to parse metadata.json: {}", e);
113 : }
114 : },
115 0 : Err(e) => {
116 0 : tracing::warn!("Failed to read metadata.json: {}", e);
117 : }
118 : }
119 0 : Arc::new(properties)
120 : };
121 0 : let fake_tenants = {
122 0 : let mut tenants = Vec::new();
123 0 : for i in 0..10 {
124 0 : let distinct_id = format!(
125 0 : "fake_tenant_{}_{}_{}",
126 0 : conf.availability_zone.as_deref().unwrap_or_default(),
127 0 : conf.id,
128 0 : i
129 0 : );
130 0 : let properties = Self::collect_properties_inner(
131 0 : distinct_id.clone(),
132 0 : Some(&internal_properties),
133 0 : );
134 0 : tenants.push(CaptureEvent {
135 0 : event: "initial_tenant_report".to_string(),
136 0 : distinct_id,
137 0 : properties: json!({ "$set": properties }), // use `$set` to set the person properties instead of the event properties
138 0 : });
139 0 : }
140 0 : tenants
141 0 : };
142 0 : // TODO: make refresh period configurable
143 0 : inner
144 0 : .clone()
145 0 : .spawn(handle, Duration::from_secs(60), fake_tenants);
146 0 : Ok(FeatureResolver {
147 0 : inner: Some(inner),
148 0 : internal_properties: Some(internal_properties),
149 0 : })
150 : } else {
151 0 : Ok(FeatureResolver {
152 0 : inner: None,
153 0 : internal_properties: None,
154 0 : })
155 : }
156 0 : }
157 :
158 0 : fn collect_properties_inner(
159 0 : tenant_id: String,
160 0 : internal_properties: Option<&HashMap<String, PostHogFlagFilterPropertyValue>>,
161 0 : ) -> HashMap<String, PostHogFlagFilterPropertyValue> {
162 0 : let mut properties = HashMap::new();
163 0 : if let Some(internal_properties) = internal_properties {
164 0 : for (key, value) in internal_properties.iter() {
165 0 : properties.insert(key.clone(), value.clone());
166 0 : }
167 0 : }
168 0 : properties.insert(
169 0 : "tenant_id".to_string(),
170 0 : PostHogFlagFilterPropertyValue::String(tenant_id),
171 0 : );
172 0 : properties
173 0 : }
174 :
175 : /// Collect all properties availble for the feature flag evaluation.
176 0 : pub(crate) fn collect_properties(
177 0 : &self,
178 0 : tenant_id: TenantId,
179 0 : ) -> HashMap<String, PostHogFlagFilterPropertyValue> {
180 0 : Self::collect_properties_inner(tenant_id.to_string(), self.internal_properties.as_deref())
181 0 : }
182 :
183 : /// Evaluate a multivariate feature flag. Currently, we do not support any properties.
184 : ///
185 : /// Error handling: the caller should inspect the error and decide the behavior when a feature flag
186 : /// cannot be evaluated (i.e., default to false if it cannot be resolved). The error should *not* be
187 : /// propagated beyond where the feature flag gets resolved.
188 0 : pub fn evaluate_multivariate(
189 0 : &self,
190 0 : flag_key: &str,
191 0 : tenant_id: TenantId,
192 0 : ) -> Result<String, PostHogEvaluationError> {
193 0 : if let Some(inner) = &self.inner {
194 0 : let res = inner.feature_store().evaluate_multivariate(
195 0 : flag_key,
196 0 : &tenant_id.to_string(),
197 0 : &self.collect_properties(tenant_id),
198 0 : );
199 0 : match &res {
200 0 : Ok(value) => {
201 0 : FEATURE_FLAG_EVALUATION
202 0 : .with_label_values(&[flag_key, "ok", value])
203 0 : .inc();
204 0 : }
205 0 : Err(e) => {
206 0 : FEATURE_FLAG_EVALUATION
207 0 : .with_label_values(&[flag_key, "error", e.as_variant_str()])
208 0 : .inc();
209 0 : }
210 : }
211 0 : res
212 : } else {
213 0 : Err(PostHogEvaluationError::NotAvailable(
214 0 : "PostHog integration is not enabled".to_string(),
215 0 : ))
216 : }
217 0 : }
218 :
219 : /// Evaluate a boolean feature flag. Currently, we do not support any properties.
220 : ///
221 : /// Returns `Ok(())` if the flag is evaluated to true, otherwise returns an error.
222 : ///
223 : /// Error handling: the caller should inspect the error and decide the behavior when a feature flag
224 : /// cannot be evaluated (i.e., default to false if it cannot be resolved). The error should *not* be
225 : /// propagated beyond where the feature flag gets resolved.
226 0 : pub fn evaluate_boolean(
227 0 : &self,
228 0 : flag_key: &str,
229 0 : tenant_id: TenantId,
230 0 : ) -> Result<(), PostHogEvaluationError> {
231 0 : if let Some(inner) = &self.inner {
232 0 : let res = inner.feature_store().evaluate_boolean(
233 0 : flag_key,
234 0 : &tenant_id.to_string(),
235 0 : &self.collect_properties(tenant_id),
236 0 : );
237 0 : match &res {
238 0 : Ok(()) => {
239 0 : FEATURE_FLAG_EVALUATION
240 0 : .with_label_values(&[flag_key, "ok", "true"])
241 0 : .inc();
242 0 : }
243 0 : Err(e) => {
244 0 : FEATURE_FLAG_EVALUATION
245 0 : .with_label_values(&[flag_key, "error", e.as_variant_str()])
246 0 : .inc();
247 0 : }
248 : }
249 0 : res
250 : } else {
251 0 : Err(PostHogEvaluationError::NotAvailable(
252 0 : "PostHog integration is not enabled".to_string(),
253 0 : ))
254 : }
255 0 : }
256 :
257 0 : pub fn is_feature_flag_boolean(&self, flag_key: &str) -> Result<bool, PostHogEvaluationError> {
258 0 : if let Some(inner) = &self.inner {
259 0 : inner.feature_store().is_feature_flag_boolean(flag_key)
260 : } else {
261 0 : Err(PostHogEvaluationError::NotAvailable(
262 0 : "PostHog integration is not enabled".to_string(),
263 0 : ))
264 : }
265 0 : }
266 : }
|