Line data Source code
1 : //! `ComputeSpec` represents the contents of the spec.json file.
2 : //!
3 : //! The spec.json file is used to pass information to 'compute_ctl'. It contains
4 : //! all the information needed to start up the right version of PostgreSQL,
5 : //! and connect it to the storage nodes.
6 : use std::collections::HashMap;
7 :
8 : use regex::Regex;
9 : use remote_storage::RemotePath;
10 : use serde::{Deserialize, Serialize};
11 : use utils::id::{TenantId, TimelineId};
12 : use utils::lsn::Lsn;
13 :
14 : /// String type alias representing Postgres identifier and
15 : /// intended to be used for DB / role names.
16 : pub type PgIdent = String;
17 :
18 : /// String type alias representing Postgres extension version
19 : pub type ExtVersion = String;
20 :
21 6 : fn default_reconfigure_concurrency() -> usize {
22 6 : 1
23 6 : }
24 :
25 : /// Cluster spec or configuration represented as an optional number of
26 : /// delta operations + final cluster state description.
27 45 : #[derive(Clone, Debug, Default, Deserialize, Serialize)]
28 : pub struct ComputeSpec {
29 : pub format_version: f32,
30 :
31 : // The control plane also includes a 'timestamp' field in the JSON document,
32 : // but we don't use it for anything. Serde will ignore missing fields when
33 : // deserializing it.
34 : pub operation_uuid: Option<String>,
35 :
36 : /// Compute features to enable. These feature flags are provided, when we
37 : /// know all the details about client's compute, so they cannot be used
38 : /// to change `Empty` compute behavior.
39 : #[serde(default)]
40 : pub features: Vec<ComputeFeature>,
41 :
42 : /// If compute_ctl was passed `--resize-swap-on-bind`, a value of `Some(_)` instructs
43 : /// compute_ctl to `/neonvm/bin/resize-swap` with the given size, when the spec is first
44 : /// received.
45 : ///
46 : /// Both this field and `--resize-swap-on-bind` are required, so that the control plane's
47 : /// spec generation doesn't need to be aware of the actual compute it's running on, while
48 : /// guaranteeing gradual rollout of swap. Otherwise, without `--resize-swap-on-bind`, we could
49 : /// end up trying to resize swap in VMs without it -- or end up *not* resizing swap, thus
50 : /// giving every VM much more swap than it should have (32GiB).
51 : ///
52 : /// Eventually we may remove `--resize-swap-on-bind` and exclusively use `swap_size_bytes` for
53 : /// enabling the swap resizing behavior once rollout is complete.
54 : ///
55 : /// See neondatabase/cloud#12047 for more.
56 : #[serde(default)]
57 : pub swap_size_bytes: Option<u64>,
58 :
59 : /// If compute_ctl was passed `--set-disk-quota-for-fs`, a value of `Some(_)` instructs
60 : /// compute_ctl to run `/neonvm/bin/set-disk-quota` with the given size and fs, when the
61 : /// spec is first received.
62 : ///
63 : /// Both this field and `--set-disk-quota-for-fs` are required, so that the control plane's
64 : /// spec generation doesn't need to be aware of the actual compute it's running on, while
65 : /// guaranteeing gradual rollout of disk quota.
66 : #[serde(default)]
67 : pub disk_quota_bytes: Option<u64>,
68 :
69 : /// Disables the vm-monitor behavior that resizes LFC on upscale/downscale, instead relying on
70 : /// the initial size of LFC.
71 : ///
72 : /// This is intended for use when the LFC size is being overridden from the default but
73 : /// autoscaling is still enabled, and we don't want the vm-monitor to interfere with the custom
74 : /// LFC sizing.
75 : #[serde(default)]
76 : pub disable_lfc_resizing: Option<bool>,
77 :
78 : /// Expected cluster state at the end of transition process.
79 : pub cluster: Cluster,
80 : pub delta_operations: Option<Vec<DeltaOp>>,
81 :
82 : /// An optional hint that can be passed to speed up startup time if we know
83 : /// that no pg catalog mutations (like role creation, database creation,
84 : /// extension creation) need to be done on the actual database to start.
85 : #[serde(default)] // Default false
86 : pub skip_pg_catalog_updates: bool,
87 :
88 : // Information needed to connect to the storage layer.
89 : //
90 : // `tenant_id`, `timeline_id` and `pageserver_connstring` are always needed.
91 : //
92 : // Depending on `mode`, this can be a primary read-write node, a read-only
93 : // replica, or a read-only node pinned at an older LSN.
94 : // `safekeeper_connstrings` must be set for a primary.
95 : //
96 : // For backwards compatibility, the control plane may leave out all of
97 : // these, and instead set the "neon.tenant_id", "neon.timeline_id",
98 : // etc. GUCs in cluster.settings. TODO: Once the control plane has been
99 : // updated to fill these fields, we can make these non optional.
100 : pub tenant_id: Option<TenantId>,
101 : pub timeline_id: Option<TimelineId>,
102 : pub pageserver_connstring: Option<String>,
103 :
104 : #[serde(default)]
105 : pub safekeeper_connstrings: Vec<String>,
106 :
107 : #[serde(default)]
108 : pub mode: ComputeMode,
109 :
110 : /// If set, 'storage_auth_token' is used as the password to authenticate to
111 : /// the pageserver and safekeepers.
112 : pub storage_auth_token: Option<String>,
113 :
114 : // information about available remote extensions
115 : pub remote_extensions: Option<RemoteExtSpec>,
116 :
117 : pub pgbouncer_settings: Option<HashMap<String, String>>,
118 :
119 : // Stripe size for pageserver sharding, in pages
120 : #[serde(default)]
121 : pub shard_stripe_size: Option<usize>,
122 :
123 : /// Local Proxy configuration used for JWT authentication
124 : #[serde(default)]
125 : pub local_proxy_config: Option<LocalProxySpec>,
126 :
127 : /// Number of concurrent connections during the parallel RunInEachDatabase
128 : /// phase of the apply config process.
129 : ///
130 : /// We need a higher concurrency during reconfiguration in case of many DBs,
131 : /// but instance is already running and used by client. We can easily get out of
132 : /// `max_connections` limit, and the current code won't handle that.
133 : ///
134 : /// Default is 1, but also allow control plane to override this value for specific
135 : /// projects. It's also recommended to bump `superuser_reserved_connections` +=
136 : /// `reconfigure_concurrency` for such projects to ensure that we always have
137 : /// enough spare connections for reconfiguration process to succeed.
138 : #[serde(default = "default_reconfigure_concurrency")]
139 : pub reconfigure_concurrency: usize,
140 :
141 : /// If set to true, the compute_ctl will drop all subscriptions before starting the
142 : /// compute. This is needed when we start an endpoint on a branch, so that child
143 : /// would not compete with parent branch subscriptions
144 : /// over the same replication content from publisher.
145 : #[serde(default)] // Default false
146 : pub drop_subscriptions_before_start: bool,
147 : }
148 :
149 : /// Feature flag to signal `compute_ctl` to enable certain experimental functionality.
150 3 : #[derive(Serialize, Clone, Copy, Debug, Deserialize, PartialEq, Eq)]
151 : #[serde(rename_all = "snake_case")]
152 : pub enum ComputeFeature {
153 : // XXX: Add more feature flags here.
154 : /// Enable the experimental activity monitor logic, which uses `pg_stat_database` to
155 : /// track short-lived connections as user activity.
156 : ActivityMonitorExperimental,
157 :
158 : /// Pre-install and initialize anon extension for every database in the cluster
159 : AnonExtension,
160 :
161 : /// This is a special feature flag that is used to represent unknown feature flags.
162 : /// Basically all unknown to enum flags are represented as this one. See unit test
163 : /// `parse_unknown_features()` for more details.
164 : #[serde(other)]
165 : UnknownFeature,
166 : }
167 :
168 44 : #[derive(Clone, Debug, Default, Deserialize, Serialize)]
169 : pub struct RemoteExtSpec {
170 : pub public_extensions: Option<Vec<String>>,
171 : pub custom_extensions: Option<Vec<String>>,
172 : pub library_index: HashMap<String, String>,
173 : pub extension_data: HashMap<String, ExtensionData>,
174 : }
175 :
176 30 : #[derive(Clone, Debug, Serialize, Deserialize)]
177 : pub struct ExtensionData {
178 : pub control_data: HashMap<String, String>,
179 : pub archive_path: String,
180 : }
181 :
182 : impl RemoteExtSpec {
183 6 : pub fn get_ext(
184 6 : &self,
185 6 : ext_name: &str,
186 6 : is_library: bool,
187 6 : build_tag: &str,
188 6 : pg_major_version: &str,
189 6 : ) -> anyhow::Result<(String, RemotePath)> {
190 6 : let mut real_ext_name = ext_name;
191 6 : if is_library {
192 : // sometimes library names might have a suffix like
193 : // library.so or library.so.3. We strip this off
194 : // because library_index is based on the name without the file extension
195 1 : let strip_lib_suffix = Regex::new(r"\.so.*").unwrap();
196 1 : let lib_raw_name = strip_lib_suffix.replace(real_ext_name, "").to_string();
197 1 :
198 1 : real_ext_name = self
199 1 : .library_index
200 1 : .get(&lib_raw_name)
201 1 : .ok_or(anyhow::anyhow!("library {} is not found", lib_raw_name))?;
202 5 : }
203 :
204 : // Check if extension is present in public or custom.
205 : // If not, then it is not allowed to be used by this compute.
206 6 : if !self
207 6 : .public_extensions
208 6 : .as_ref()
209 6 : .is_some_and(|exts| exts.iter().any(|e| e == real_ext_name))
210 4 : && !self
211 4 : .custom_extensions
212 4 : .as_ref()
213 4 : .is_some_and(|exts| exts.iter().any(|e| e == real_ext_name))
214 : {
215 3 : return Err(anyhow::anyhow!("extension {} is not found", real_ext_name));
216 3 : }
217 3 :
218 3 : match self.extension_data.get(real_ext_name) {
219 3 : Some(_ext_data) => {
220 3 : // Construct the path to the extension archive
221 3 : // BUILD_TAG/PG_MAJOR_VERSION/extensions/EXTENSION_NAME.tar.zst
222 3 : //
223 3 : // Keep it in sync with path generation in
224 3 : // https://github.com/neondatabase/build-custom-extensions/tree/main
225 3 : let archive_path_str =
226 3 : format!("{build_tag}/{pg_major_version}/extensions/{real_ext_name}.tar.zst");
227 3 : Ok((
228 3 : real_ext_name.to_string(),
229 3 : RemotePath::from_string(&archive_path_str)?,
230 : ))
231 : }
232 0 : None => Err(anyhow::anyhow!(
233 0 : "real_ext_name {} is not found",
234 0 : real_ext_name
235 0 : )),
236 : }
237 6 : }
238 : }
239 :
240 0 : #[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Deserialize, Serialize)]
241 : pub enum ComputeMode {
242 : /// A read-write node
243 : #[default]
244 : Primary,
245 : /// A read-only node, pinned at a particular LSN
246 : Static(Lsn),
247 : /// A read-only node that follows the tip of the branch in hot standby mode
248 : ///
249 : /// Future versions may want to distinguish between replicas with hot standby
250 : /// feedback and other kinds of replication configurations.
251 : Replica,
252 : }
253 :
254 36 : #[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
255 : pub struct Cluster {
256 : pub cluster_id: Option<String>,
257 : pub name: Option<String>,
258 : pub state: Option<String>,
259 : pub roles: Vec<Role>,
260 : pub databases: Vec<Database>,
261 :
262 : /// Desired contents of 'postgresql.conf' file. (The 'compute_ctl'
263 : /// tool may add additional settings to the final file.)
264 : pub postgresql_conf: Option<String>,
265 :
266 : /// Additional settings that will be appended to the 'postgresql.conf' file.
267 : pub settings: GenericOptions,
268 : }
269 :
270 : /// Single cluster state changing operation that could not be represented as
271 : /// a static `Cluster` structure. For example:
272 : /// - DROP DATABASE
273 : /// - DROP ROLE
274 : /// - ALTER ROLE name RENAME TO new_name
275 : /// - ALTER DATABASE name RENAME TO new_name
276 60 : #[derive(Clone, Debug, Deserialize, Serialize)]
277 : pub struct DeltaOp {
278 : pub action: String,
279 : pub name: PgIdent,
280 : pub new_name: Option<PgIdent>,
281 : }
282 :
283 : /// Rust representation of Postgres role info with only those fields
284 : /// that matter for us.
285 90 : #[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
286 : pub struct Role {
287 : pub name: PgIdent,
288 : pub encrypted_password: Option<String>,
289 : pub options: GenericOptions,
290 : }
291 :
292 : /// Rust representation of Postgres database info with only those fields
293 : /// that matter for us.
294 42 : #[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
295 : pub struct Database {
296 : pub name: PgIdent,
297 : pub owner: PgIdent,
298 : pub options: GenericOptions,
299 : // These are derived flags, not present in the spec file.
300 : // They are never set by the control plane.
301 : #[serde(skip_deserializing, default)]
302 : pub restrict_conn: bool,
303 : #[serde(skip_deserializing, default)]
304 : pub invalid: bool,
305 : }
306 :
307 : /// Common type representing both SQL statement params with or without value,
308 : /// like `LOGIN` or `OWNER username` in the `CREATE/ALTER ROLE`, and config
309 : /// options like `wal_level = logical`.
310 468 : #[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
311 : pub struct GenericOption {
312 : pub name: String,
313 : pub value: Option<String>,
314 : pub vartype: String,
315 : }
316 :
317 : /// Optional collection of `GenericOption`'s. Type alias allows us to
318 : /// declare a `trait` on it.
319 : pub type GenericOptions = Option<Vec<GenericOption>>;
320 :
321 : /// Configured the local_proxy application with the relevant JWKS and roles it should
322 : /// use for authorizing connect requests using JWT.
323 0 : #[derive(Clone, Debug, Deserialize, Serialize)]
324 : pub struct LocalProxySpec {
325 : #[serde(default)]
326 : #[serde(skip_serializing_if = "Option::is_none")]
327 : pub jwks: Option<Vec<JwksSettings>>,
328 : }
329 :
330 0 : #[derive(Clone, Debug, Deserialize, Serialize)]
331 : pub struct JwksSettings {
332 : pub id: String,
333 : pub role_names: Vec<String>,
334 : pub jwks_url: String,
335 : pub provider_name: String,
336 : pub jwt_audience: Option<String>,
337 : }
338 :
339 : #[cfg(test)]
340 : mod tests {
341 : use std::fs::File;
342 :
343 : use super::*;
344 :
345 : #[test]
346 1 : fn allow_installing_remote_extensions() {
347 1 : let rspec: RemoteExtSpec = serde_json::from_value(serde_json::json!({
348 1 : "public_extensions": null,
349 1 : "custom_extensions": null,
350 1 : "library_index": {},
351 1 : "extension_data": {},
352 1 : }))
353 1 : .unwrap();
354 1 :
355 1 : rspec
356 1 : .get_ext("ext", false, "latest", "v17")
357 1 : .expect_err("Extension should not be found");
358 1 :
359 1 : let rspec: RemoteExtSpec = serde_json::from_value(serde_json::json!({
360 1 : "public_extensions": [],
361 1 : "custom_extensions": null,
362 1 : "library_index": {},
363 1 : "extension_data": {},
364 1 : }))
365 1 : .unwrap();
366 1 :
367 1 : rspec
368 1 : .get_ext("ext", false, "latest", "v17")
369 1 : .expect_err("Extension should not be found");
370 1 :
371 1 : let rspec: RemoteExtSpec = serde_json::from_value(serde_json::json!({
372 1 : "public_extensions": [],
373 1 : "custom_extensions": [],
374 1 : "library_index": {
375 1 : "ext": "ext"
376 1 : },
377 1 : "extension_data": {
378 1 : "ext": {
379 1 : "control_data": {
380 1 : "ext.control": ""
381 1 : },
382 1 : "archive_path": ""
383 1 : }
384 1 : },
385 1 : }))
386 1 : .unwrap();
387 1 :
388 1 : rspec
389 1 : .get_ext("ext", false, "latest", "v17")
390 1 : .expect_err("Extension should not be found");
391 1 :
392 1 : let rspec: RemoteExtSpec = serde_json::from_value(serde_json::json!({
393 1 : "public_extensions": [],
394 1 : "custom_extensions": ["ext"],
395 1 : "library_index": {
396 1 : "ext": "ext"
397 1 : },
398 1 : "extension_data": {
399 1 : "ext": {
400 1 : "control_data": {
401 1 : "ext.control": ""
402 1 : },
403 1 : "archive_path": ""
404 1 : }
405 1 : },
406 1 : }))
407 1 : .unwrap();
408 1 :
409 1 : rspec
410 1 : .get_ext("ext", false, "latest", "v17")
411 1 : .expect("Extension should be found");
412 1 :
413 1 : let rspec: RemoteExtSpec = serde_json::from_value(serde_json::json!({
414 1 : "public_extensions": ["ext"],
415 1 : "custom_extensions": [],
416 1 : "library_index": {
417 1 : "extlib": "ext",
418 1 : },
419 1 : "extension_data": {
420 1 : "ext": {
421 1 : "control_data": {
422 1 : "ext.control": ""
423 1 : },
424 1 : "archive_path": ""
425 1 : }
426 1 : },
427 1 : }))
428 1 : .unwrap();
429 1 :
430 1 : rspec
431 1 : .get_ext("ext", false, "latest", "v17")
432 1 : .expect("Extension should be found");
433 1 :
434 1 : // test library index for the case when library name
435 1 : // doesn't match the extension name
436 1 : rspec
437 1 : .get_ext("extlib", true, "latest", "v17")
438 1 : .expect("Library should be found");
439 1 : }
440 :
441 : #[test]
442 1 : fn parse_spec_file() {
443 1 : let file = File::open("tests/cluster_spec.json").unwrap();
444 1 : let spec: ComputeSpec = serde_json::from_reader(file).unwrap();
445 1 :
446 1 : // Features list defaults to empty vector.
447 1 : assert!(spec.features.is_empty());
448 :
449 : // Reconfigure concurrency defaults to 1.
450 1 : assert_eq!(spec.reconfigure_concurrency, 1);
451 1 : }
452 :
453 : #[test]
454 1 : fn parse_unknown_fields() {
455 1 : // Forward compatibility test
456 1 : let file = File::open("tests/cluster_spec.json").unwrap();
457 1 : let mut json: serde_json::Value = serde_json::from_reader(file).unwrap();
458 1 : let ob = json.as_object_mut().unwrap();
459 1 : ob.insert("unknown_field_123123123".into(), "hello".into());
460 1 : let _spec: ComputeSpec = serde_json::from_value(json).unwrap();
461 1 : }
462 :
463 : #[test]
464 1 : fn parse_unknown_features() {
465 1 : // Test that unknown feature flags do not cause any errors.
466 1 : let file = File::open("tests/cluster_spec.json").unwrap();
467 1 : let mut json: serde_json::Value = serde_json::from_reader(file).unwrap();
468 1 : let ob = json.as_object_mut().unwrap();
469 1 :
470 1 : // Add unknown feature flags.
471 1 : let features = vec!["foo_bar_feature", "baz_feature"];
472 1 : ob.insert("features".into(), features.into());
473 1 :
474 1 : let spec: ComputeSpec = serde_json::from_value(json).unwrap();
475 1 :
476 1 : assert!(spec.features.len() == 2);
477 1 : assert!(spec.features.contains(&ComputeFeature::UnknownFeature));
478 1 : assert_eq!(spec.features, vec![ComputeFeature::UnknownFeature; 2]);
479 1 : }
480 :
481 : #[test]
482 1 : fn parse_known_features() {
483 1 : // Test that we can properly parse known feature flags.
484 1 : let file = File::open("tests/cluster_spec.json").unwrap();
485 1 : let mut json: serde_json::Value = serde_json::from_reader(file).unwrap();
486 1 : let ob = json.as_object_mut().unwrap();
487 1 :
488 1 : // Add known feature flags.
489 1 : let features = vec!["activity_monitor_experimental"];
490 1 : ob.insert("features".into(), features.into());
491 1 :
492 1 : let spec: ComputeSpec = serde_json::from_value(json).unwrap();
493 1 :
494 1 : assert_eq!(
495 1 : spec.features,
496 1 : vec![ComputeFeature::ActivityMonitorExperimental]
497 1 : );
498 1 : }
499 : }
|