Line data Source code
1 : //! `ComputeSpec` represents the contents of the spec.json file.
2 : //!
3 : //! The spec.json file is used to pass information to 'compute_ctl'. It contains
4 : //! all the information needed to start up the right version of PostgreSQL,
5 : //! and connect it to the storage nodes.
6 : use std::collections::HashMap;
7 :
8 : use serde::{Deserialize, Serialize};
9 : use utils::id::{TenantId, TimelineId};
10 : use utils::lsn::Lsn;
11 :
12 : use regex::Regex;
13 : use remote_storage::RemotePath;
14 :
15 : /// String type alias representing Postgres identifier and
16 : /// intended to be used for DB / role names.
17 : pub type PgIdent = String;
18 :
19 : /// String type alias representing Postgres extension version
20 : pub type ExtVersion = String;
21 :
22 6 : fn default_reconfigure_concurrency() -> usize {
23 6 : 1
24 6 : }
25 :
26 : /// Cluster spec or configuration represented as an optional number of
27 : /// delta operations + final cluster state description.
28 45 : #[derive(Clone, Debug, Default, Deserialize, Serialize)]
29 : pub struct ComputeSpec {
30 : pub format_version: f32,
31 :
32 : // The control plane also includes a 'timestamp' field in the JSON document,
33 : // but we don't use it for anything. Serde will ignore missing fields when
34 : // deserializing it.
35 : pub operation_uuid: Option<String>,
36 :
37 : /// Compute features to enable. These feature flags are provided, when we
38 : /// know all the details about client's compute, so they cannot be used
39 : /// to change `Empty` compute behavior.
40 : #[serde(default)]
41 : pub features: Vec<ComputeFeature>,
42 :
43 : /// If compute_ctl was passed `--resize-swap-on-bind`, a value of `Some(_)` instructs
44 : /// compute_ctl to `/neonvm/bin/resize-swap` with the given size, when the spec is first
45 : /// received.
46 : ///
47 : /// Both this field and `--resize-swap-on-bind` are required, so that the control plane's
48 : /// spec generation doesn't need to be aware of the actual compute it's running on, while
49 : /// guaranteeing gradual rollout of swap. Otherwise, without `--resize-swap-on-bind`, we could
50 : /// end up trying to resize swap in VMs without it -- or end up *not* resizing swap, thus
51 : /// giving every VM much more swap than it should have (32GiB).
52 : ///
53 : /// Eventually we may remove `--resize-swap-on-bind` and exclusively use `swap_size_bytes` for
54 : /// enabling the swap resizing behavior once rollout is complete.
55 : ///
56 : /// See neondatabase/cloud#12047 for more.
57 : #[serde(default)]
58 : pub swap_size_bytes: Option<u64>,
59 :
60 : /// If compute_ctl was passed `--set-disk-quota-for-fs`, a value of `Some(_)` instructs
61 : /// compute_ctl to run `/neonvm/bin/set-disk-quota` with the given size and fs, when the
62 : /// spec is first received.
63 : ///
64 : /// Both this field and `--set-disk-quota-for-fs` are required, so that the control plane's
65 : /// spec generation doesn't need to be aware of the actual compute it's running on, while
66 : /// guaranteeing gradual rollout of disk quota.
67 : #[serde(default)]
68 : pub disk_quota_bytes: Option<u64>,
69 :
70 : /// Disables the vm-monitor behavior that resizes LFC on upscale/downscale, instead relying on
71 : /// the initial size of LFC.
72 : ///
73 : /// This is intended for use when the LFC size is being overridden from the default but
74 : /// autoscaling is still enabled, and we don't want the vm-monitor to interfere with the custom
75 : /// LFC sizing.
76 : #[serde(default)]
77 : pub disable_lfc_resizing: Option<bool>,
78 :
79 : /// Expected cluster state at the end of transition process.
80 : pub cluster: Cluster,
81 : pub delta_operations: Option<Vec<DeltaOp>>,
82 :
83 : /// An optional hint that can be passed to speed up startup time if we know
84 : /// that no pg catalog mutations (like role creation, database creation,
85 : /// extension creation) need to be done on the actual database to start.
86 : #[serde(default)] // Default false
87 : pub skip_pg_catalog_updates: bool,
88 :
89 : // Information needed to connect to the storage layer.
90 : //
91 : // `tenant_id`, `timeline_id` and `pageserver_connstring` are always needed.
92 : //
93 : // Depending on `mode`, this can be a primary read-write node, a read-only
94 : // replica, or a read-only node pinned at an older LSN.
95 : // `safekeeper_connstrings` must be set for a primary.
96 : //
97 : // For backwards compatibility, the control plane may leave out all of
98 : // these, and instead set the "neon.tenant_id", "neon.timeline_id",
99 : // etc. GUCs in cluster.settings. TODO: Once the control plane has been
100 : // updated to fill these fields, we can make these non optional.
101 : pub tenant_id: Option<TenantId>,
102 : pub timeline_id: Option<TimelineId>,
103 : pub pageserver_connstring: Option<String>,
104 :
105 : #[serde(default)]
106 : pub safekeeper_connstrings: Vec<String>,
107 :
108 : #[serde(default)]
109 : pub mode: ComputeMode,
110 :
111 : /// If set, 'storage_auth_token' is used as the password to authenticate to
112 : /// the pageserver and safekeepers.
113 : pub storage_auth_token: Option<String>,
114 :
115 : // information about available remote extensions
116 : pub remote_extensions: Option<RemoteExtSpec>,
117 :
118 : pub pgbouncer_settings: Option<HashMap<String, String>>,
119 :
120 : // Stripe size for pageserver sharding, in pages
121 : #[serde(default)]
122 : pub shard_stripe_size: Option<usize>,
123 :
124 : /// Local Proxy configuration used for JWT authentication
125 : #[serde(default)]
126 : pub local_proxy_config: Option<LocalProxySpec>,
127 :
128 : /// Number of concurrent connections during the parallel RunInEachDatabase
129 : /// phase of the apply config process.
130 : ///
131 : /// We need a higher concurrency during reconfiguration in case of many DBs,
132 : /// but instance is already running and used by client. We can easily get out of
133 : /// `max_connections` limit, and the current code won't handle that.
134 : ///
135 : /// Default is 1, but also allow control plane to override this value for specific
136 : /// projects. It's also recommended to bump `superuser_reserved_connections` +=
137 : /// `reconfigure_concurrency` for such projects to ensure that we always have
138 : /// enough spare connections for reconfiguration process to succeed.
139 : #[serde(default = "default_reconfigure_concurrency")]
140 : pub reconfigure_concurrency: usize,
141 :
142 : /// If set to true, the compute_ctl will drop all subscriptions before starting the
143 : /// compute. This is needed when we start an endpoint on a branch, so that child
144 : /// would not compete with parent branch subscriptions
145 : /// over the same replication content from publisher.
146 : #[serde(default)] // Default false
147 : pub drop_subscriptions_before_start: bool,
148 : }
149 :
150 : /// Feature flag to signal `compute_ctl` to enable certain experimental functionality.
151 3 : #[derive(Serialize, Clone, Copy, Debug, Deserialize, PartialEq, Eq)]
152 : #[serde(rename_all = "snake_case")]
153 : pub enum ComputeFeature {
154 : // XXX: Add more feature flags here.
155 : /// Enable the experimental activity monitor logic, which uses `pg_stat_database` to
156 : /// track short-lived connections as user activity.
157 : ActivityMonitorExperimental,
158 :
159 : /// Pre-install and initialize anon extension for every database in the cluster
160 : AnonExtension,
161 :
162 : /// This is a special feature flag that is used to represent unknown feature flags.
163 : /// Basically all unknown to enum flags are represented as this one. See unit test
164 : /// `parse_unknown_features()` for more details.
165 : #[serde(other)]
166 : UnknownFeature,
167 : }
168 :
169 24 : #[derive(Clone, Debug, Default, Deserialize, Serialize)]
170 : pub struct RemoteExtSpec {
171 : pub public_extensions: Option<Vec<String>>,
172 : pub custom_extensions: Option<Vec<String>>,
173 : pub library_index: HashMap<String, String>,
174 : pub extension_data: HashMap<String, ExtensionData>,
175 : }
176 :
177 24 : #[derive(Clone, Debug, Serialize, Deserialize)]
178 : pub struct ExtensionData {
179 : pub control_data: HashMap<String, String>,
180 : pub archive_path: String,
181 : }
182 :
183 : impl RemoteExtSpec {
184 0 : pub fn get_ext(
185 0 : &self,
186 0 : ext_name: &str,
187 0 : is_library: bool,
188 0 : build_tag: &str,
189 0 : pg_major_version: &str,
190 0 : ) -> anyhow::Result<(String, RemotePath)> {
191 0 : let mut real_ext_name = ext_name;
192 0 : if is_library {
193 : // sometimes library names might have a suffix like
194 : // library.so or library.so.3. We strip this off
195 : // because library_index is based on the name without the file extension
196 0 : let strip_lib_suffix = Regex::new(r"\.so.*").unwrap();
197 0 : let lib_raw_name = strip_lib_suffix.replace(real_ext_name, "").to_string();
198 0 :
199 0 : real_ext_name = self
200 0 : .library_index
201 0 : .get(&lib_raw_name)
202 0 : .ok_or(anyhow::anyhow!("library {} is not found", lib_raw_name))?;
203 0 : }
204 :
205 : // Check if extension is present in public or custom.
206 : // If not, then it is not allowed to be used by this compute.
207 0 : if let Some(public_extensions) = &self.public_extensions {
208 0 : if !public_extensions.contains(&real_ext_name.to_string()) {
209 0 : if let Some(custom_extensions) = &self.custom_extensions {
210 0 : if !custom_extensions.contains(&real_ext_name.to_string()) {
211 0 : return Err(anyhow::anyhow!("extension {} is not found", real_ext_name));
212 0 : }
213 0 : }
214 0 : }
215 0 : }
216 :
217 0 : match self.extension_data.get(real_ext_name) {
218 0 : Some(_ext_data) => {
219 0 : // Construct the path to the extension archive
220 0 : // BUILD_TAG/PG_MAJOR_VERSION/extensions/EXTENSION_NAME.tar.zst
221 0 : //
222 0 : // Keep it in sync with path generation in
223 0 : // https://github.com/neondatabase/build-custom-extensions/tree/main
224 0 : let archive_path_str =
225 0 : format!("{build_tag}/{pg_major_version}/extensions/{real_ext_name}.tar.zst");
226 0 : Ok((
227 0 : real_ext_name.to_string(),
228 0 : RemotePath::from_string(&archive_path_str)?,
229 : ))
230 : }
231 0 : None => Err(anyhow::anyhow!(
232 0 : "real_ext_name {} is not found",
233 0 : real_ext_name
234 0 : )),
235 : }
236 0 : }
237 : }
238 :
239 0 : #[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Deserialize, Serialize)]
240 : pub enum ComputeMode {
241 : /// A read-write node
242 : #[default]
243 : Primary,
244 : /// A read-only node, pinned at a particular LSN
245 : Static(Lsn),
246 : /// A read-only node that follows the tip of the branch in hot standby mode
247 : ///
248 : /// Future versions may want to distinguish between replicas with hot standby
249 : /// feedback and other kinds of replication configurations.
250 : Replica,
251 : }
252 :
253 36 : #[derive(Clone, Debug, Default, Deserialize, Serialize)]
254 : pub struct Cluster {
255 : pub cluster_id: Option<String>,
256 : pub name: Option<String>,
257 : pub state: Option<String>,
258 : pub roles: Vec<Role>,
259 : pub databases: Vec<Database>,
260 :
261 : /// Desired contents of 'postgresql.conf' file. (The 'compute_ctl'
262 : /// tool may add additional settings to the final file.)
263 : pub postgresql_conf: Option<String>,
264 :
265 : /// Additional settings that will be appended to the 'postgresql.conf' file.
266 : pub settings: GenericOptions,
267 : }
268 :
269 : /// Single cluster state changing operation that could not be represented as
270 : /// a static `Cluster` structure. For example:
271 : /// - DROP DATABASE
272 : /// - DROP ROLE
273 : /// - ALTER ROLE name RENAME TO new_name
274 : /// - ALTER DATABASE name RENAME TO new_name
275 60 : #[derive(Clone, Debug, Deserialize, Serialize)]
276 : pub struct DeltaOp {
277 : pub action: String,
278 : pub name: PgIdent,
279 : pub new_name: Option<PgIdent>,
280 : }
281 :
282 : /// Rust representation of Postgres role info with only those fields
283 : /// that matter for us.
284 90 : #[derive(Clone, Debug, Deserialize, Serialize)]
285 : pub struct Role {
286 : pub name: PgIdent,
287 : pub encrypted_password: Option<String>,
288 : pub options: GenericOptions,
289 : }
290 :
291 : /// Rust representation of Postgres database info with only those fields
292 : /// that matter for us.
293 42 : #[derive(Clone, Debug, Deserialize, Serialize)]
294 : pub struct Database {
295 : pub name: PgIdent,
296 : pub owner: PgIdent,
297 : pub options: GenericOptions,
298 : // These are derived flags, not present in the spec file.
299 : // They are never set by the control plane.
300 : #[serde(skip_deserializing, default)]
301 : pub restrict_conn: bool,
302 : #[serde(skip_deserializing, default)]
303 : pub invalid: bool,
304 : }
305 :
306 : /// Common type representing both SQL statement params with or without value,
307 : /// like `LOGIN` or `OWNER username` in the `CREATE/ALTER ROLE`, and config
308 : /// options like `wal_level = logical`.
309 468 : #[derive(Clone, Debug, Deserialize, Serialize)]
310 : pub struct GenericOption {
311 : pub name: String,
312 : pub value: Option<String>,
313 : pub vartype: String,
314 : }
315 :
316 : /// Optional collection of `GenericOption`'s. Type alias allows us to
317 : /// declare a `trait` on it.
318 : pub type GenericOptions = Option<Vec<GenericOption>>;
319 :
320 : /// Configured the local_proxy application with the relevant JWKS and roles it should
321 : /// use for authorizing connect requests using JWT.
322 0 : #[derive(Clone, Debug, Deserialize, Serialize)]
323 : pub struct LocalProxySpec {
324 : #[serde(default)]
325 : #[serde(skip_serializing_if = "Option::is_none")]
326 : pub jwks: Option<Vec<JwksSettings>>,
327 : }
328 :
329 0 : #[derive(Clone, Debug, Deserialize, Serialize)]
330 : pub struct JwksSettings {
331 : pub id: String,
332 : pub role_names: Vec<String>,
333 : pub jwks_url: String,
334 : pub provider_name: String,
335 : pub jwt_audience: Option<String>,
336 : }
337 :
338 : #[cfg(test)]
339 : mod tests {
340 : use super::*;
341 : use std::fs::File;
342 :
343 : #[test]
344 1 : fn parse_spec_file() {
345 1 : let file = File::open("tests/cluster_spec.json").unwrap();
346 1 : let spec: ComputeSpec = serde_json::from_reader(file).unwrap();
347 1 :
348 1 : // Features list defaults to empty vector.
349 1 : assert!(spec.features.is_empty());
350 :
351 : // Reconfigure concurrency defaults to 1.
352 1 : assert_eq!(spec.reconfigure_concurrency, 1);
353 1 : }
354 :
355 : #[test]
356 1 : fn parse_unknown_fields() {
357 1 : // Forward compatibility test
358 1 : let file = File::open("tests/cluster_spec.json").unwrap();
359 1 : let mut json: serde_json::Value = serde_json::from_reader(file).unwrap();
360 1 : let ob = json.as_object_mut().unwrap();
361 1 : ob.insert("unknown_field_123123123".into(), "hello".into());
362 1 : let _spec: ComputeSpec = serde_json::from_value(json).unwrap();
363 1 : }
364 :
365 : #[test]
366 1 : fn parse_unknown_features() {
367 1 : // Test that unknown feature flags do not cause any errors.
368 1 : let file = File::open("tests/cluster_spec.json").unwrap();
369 1 : let mut json: serde_json::Value = serde_json::from_reader(file).unwrap();
370 1 : let ob = json.as_object_mut().unwrap();
371 1 :
372 1 : // Add unknown feature flags.
373 1 : let features = vec!["foo_bar_feature", "baz_feature"];
374 1 : ob.insert("features".into(), features.into());
375 1 :
376 1 : let spec: ComputeSpec = serde_json::from_value(json).unwrap();
377 1 :
378 1 : assert!(spec.features.len() == 2);
379 1 : assert!(spec.features.contains(&ComputeFeature::UnknownFeature));
380 1 : assert_eq!(spec.features, vec![ComputeFeature::UnknownFeature; 2]);
381 1 : }
382 :
383 : #[test]
384 1 : fn parse_known_features() {
385 1 : // Test that we can properly parse known feature flags.
386 1 : let file = File::open("tests/cluster_spec.json").unwrap();
387 1 : let mut json: serde_json::Value = serde_json::from_reader(file).unwrap();
388 1 : let ob = json.as_object_mut().unwrap();
389 1 :
390 1 : // Add known feature flags.
391 1 : let features = vec!["activity_monitor_experimental"];
392 1 : ob.insert("features".into(), features.into());
393 1 :
394 1 : let spec: ComputeSpec = serde_json::from_value(json).unwrap();
395 1 :
396 1 : assert_eq!(
397 1 : spec.features,
398 1 : vec![ComputeFeature::ActivityMonitorExperimental]
399 1 : );
400 1 : }
401 : }
|