//! The ComputeSpec contains all the information needed to start up
//! the right version of PostgreSQL, and connect it to the storage nodes.
//! It can be passed as part of the `config.json`, or the control plane can
//! provide it by calling compute_ctl's `/configure` endpoint, or
//! compute_ctl can fetch it by calling the control plane's API.
use std::collections::HashMap;
use std::fmt::Display;

use anyhow::anyhow;
use indexmap::IndexMap;
use regex::Regex;
use remote_storage::RemotePath;
use serde::{Deserialize, Serialize};
use url::Url;
use utils::id::{NodeId, TenantId, TimelineId};
use utils::lsn::Lsn;
use utils::shard::{ShardCount, ShardIndex, ShardNumber, ShardStripeSize};

use crate::responses::TlsConfig;

/// String type alias representing a Postgres identifier,
/// intended to be used for DB / role names.
pub type PgIdent = String;

/// String type alias representing a Postgres extension version.
pub type ExtVersion = String;

fn default_reconfigure_concurrency() -> usize {
    1
}

/// Cluster spec or configuration represented as an optional number of
/// delta operations + final cluster state description.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct ComputeSpec {
    pub format_version: f32,

    // The control plane also includes a 'timestamp' field in the JSON document,
    // but we don't use it for anything. Serde will ignore missing fields when
    // deserializing it.
    pub operation_uuid: Option<String>,

    /// Compute features to enable. These feature flags are provided when we
    /// know all the details about the client's compute, so they cannot be used
    /// to change `Empty` compute behavior.
    #[serde(default)]
    pub features: Vec<ComputeFeature>,

    /// If compute_ctl was passed `--resize-swap-on-bind`, a value of `Some(_)` instructs
    /// compute_ctl to run `/neonvm/bin/resize-swap` with the given size, when the spec is
    /// first received.
    ///
    /// Both this field and `--resize-swap-on-bind` are required, so that the control plane's
    /// spec generation doesn't need to be aware of the actual compute it's running on, while
    /// guaranteeing gradual rollout of swap. Otherwise, without `--resize-swap-on-bind`, we could
    /// end up trying to resize swap in VMs without it -- or end up *not* resizing swap, thus
    /// giving every VM much more swap than it should have (32GiB).
    ///
    /// Eventually we may remove `--resize-swap-on-bind` and exclusively use `swap_size_bytes` for
    /// enabling the swap resizing behavior once rollout is complete.
    ///
    /// See neondatabase/cloud#12047 for more.
    #[serde(default)]
    pub swap_size_bytes: Option<u64>,

    /// If compute_ctl was passed `--set-disk-quota-for-fs`, a value of `Some(_)` instructs
    /// compute_ctl to run `/neonvm/bin/set-disk-quota` with the given size and fs, when the
    /// spec is first received.
    ///
    /// Both this field and `--set-disk-quota-for-fs` are required, so that the control plane's
    /// spec generation doesn't need to be aware of the actual compute it's running on, while
    /// guaranteeing gradual rollout of disk quota.
    #[serde(default)]
    pub disk_quota_bytes: Option<u64>,

    /// Disables the vm-monitor behavior that resizes LFC on upscale/downscale, instead relying on
    /// the initial size of LFC.
    ///
    /// This is intended for use when the LFC size is being overridden from the default but
    /// autoscaling is still enabled, and we don't want the vm-monitor to interfere with the custom
    /// LFC sizing.
    #[serde(default)]
    pub disable_lfc_resizing: Option<bool>,

    /// Expected cluster state at the end of the transition process.
    pub cluster: Cluster,
    pub delta_operations: Option<Vec<DeltaOp>>,

    /// An optional hint that can be passed to speed up startup time if we know
    /// that no pg catalog mutations (like role creation, database creation,
    /// extension creation) need to be done on the actual database to start.
    #[serde(default)] // Default false
    pub skip_pg_catalog_updates: bool,

    // Information needed to connect to the storage layer.
    //
    // `tenant_id`, `timeline_id` and `pageserver_connstring` are always needed.
    //
    // Depending on `mode`, this can be a primary read-write node, a read-only
    // replica, or a read-only node pinned at an older LSN.
    // `safekeeper_connstrings` must be set for a primary.
    //
    // For backwards compatibility, the control plane may leave out all of
    // these, and instead set the "neon.tenant_id", "neon.timeline_id",
    // etc. GUCs in cluster.settings. TODO: Once the control plane has been
    // updated to fill these fields, we can make these non-optional.
    pub tenant_id: Option<TenantId>,
    pub timeline_id: Option<TimelineId>,

    /// Pageserver information can be passed in three different ways:
    /// 1. Here in `pageserver_connection_info`
    /// 2. In the `pageserver_connstring` field
    /// 3. In `cluster.settings`
    ///
    /// The goal is to use method 1 everywhere. But for backwards compatibility with old
    /// versions of the control plane, `compute_ctl` will check 2 and 3 if the
    /// `pageserver_connection_info` field is missing.
    ///
    /// If both `pageserver_connection_info` and `pageserver_connstring`+`shard_stripe_size` are
    /// given, they must contain the same information.
    pub pageserver_connection_info: Option<PageserverConnectionInfo>,

    pub pageserver_connstring: Option<String>,

    /// Stripe size for pageserver sharding, in pages. This is set together with the legacy
    /// `pageserver_connstring` field. When the modern `pageserver_connection_info` field is used,
    /// the stripe size is stored in `pageserver_connection_info.stripe_size` instead.
    pub shard_stripe_size: Option<ShardStripeSize>,

    // More neon ids that we expose to the compute_ctl
    // and to postgres as neon extension GUCs.
    pub project_id: Option<String>,
    pub branch_id: Option<String>,
    pub endpoint_id: Option<String>,

    /// Safekeeper membership config generation. It is put in the
    /// neon.safekeepers GUC and serves two purposes:
    /// 1) A non-zero value forces walproposer to use membership configurations.
    /// 2) If walproposer wants to update the list of safekeepers to connect to,
    ///    taking them from some safekeeper mconf, it should check which value
    ///    is newer by comparing the generations.
    ///
    /// Note: it could be SafekeeperGeneration, but that would require linking
    /// compute_ctl with postgres_ffi.
    #[serde(default)]
    pub safekeepers_generation: Option<u32>,
    #[serde(default)]
    pub safekeeper_connstrings: Vec<String>,

    #[serde(default)]
    pub mode: ComputeMode,

    /// If set, 'storage_auth_token' is used as the password to authenticate to
    /// the pageserver and safekeepers.
    pub storage_auth_token: Option<String>,

    // Information about available remote extensions.
    pub remote_extensions: Option<RemoteExtSpec>,

    pub pgbouncer_settings: Option<IndexMap<String, String>>,

    /// Local Proxy configuration used for JWT authentication
    #[serde(default)]
    pub local_proxy_config: Option<LocalProxySpec>,

    /// Number of concurrent connections during the parallel RunInEachDatabase
    /// phase of the apply config process.
    ///
    /// We need higher concurrency during reconfiguration when there are many DBs,
    /// but the instance is already running and in use by clients. We can easily hit
    /// the `max_connections` limit, and the current code won't handle that.
    ///
    /// Default is 1, but we also allow the control plane to override this value for
    /// specific projects. It's also recommended to bump `superuser_reserved_connections` +=
    /// `reconfigure_concurrency` for such projects to ensure that we always have
    /// enough spare connections for the reconfiguration process to succeed.
    #[serde(default = "default_reconfigure_concurrency")]
    pub reconfigure_concurrency: usize,

    /// If set to true, compute_ctl will drop all subscriptions before starting the
    /// compute. This is needed when we start an endpoint on a branch, so that the
    /// child does not compete with the parent branch's subscriptions
    /// for the same replication content from the publisher.
    #[serde(default)] // Default false
    pub drop_subscriptions_before_start: bool,

    /// Log level for compute audit logging
    #[serde(default)]
    pub audit_log_level: ComputeAudit,

    /// Hostname and port of the otel collector. Leave empty to disable Postgres log forwarding.
    /// Example: config-shy-breeze-123-collector-monitoring.neon-telemetry.svc.cluster.local:10514
    pub logs_export_host: Option<String>,

    /// Address of the endpoint storage service
    pub endpoint_storage_addr: Option<String>,
    /// JWT for authorizing requests to the endpoint storage service
    pub endpoint_storage_token: Option<String>,

    /// Download LFC state from endpoint storage and pass it to Postgres on compute startup
    #[serde(default)]
    pub autoprewarm: bool,

    /// Upload LFC state to endpoint storage periodically. The default (None) means "don't upload".
    #[serde(default)]
    pub offload_lfc_interval_seconds: Option<std::num::NonZeroU64>,

    /// Suspend timeout in seconds.
    ///
    /// We use this value to derive other values, such as the installed extensions metric.
    pub suspend_timeout_seconds: i64,

    // Databricks-specific options for the compute instance.
    pub databricks_settings: Option<DatabricksSettings>,
}

/// Feature flag to signal `compute_ctl` to enable certain experimental functionality.
#[derive(Serialize, Clone, Copy, Debug, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ComputeFeature {
    // XXX: Add more feature flags here.
    /// Enable the experimental activity monitor logic, which uses `pg_stat_database` to
    /// track short-lived connections as user activity.
    ActivityMonitorExperimental,

    /// Enable TLS functionality.
    TlsExperimental,

    /// This is a special feature flag that is used to represent unknown feature flags.
    /// Basically, all flags unknown to the enum are represented as this one. See unit test
    /// `parse_unknown_features()` for more details.
    #[serde(other)]
    UnknownFeature,
}

#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
pub struct PageserverConnectionInfo {
    /// NB: 0 for unsharded tenants, 1 for sharded tenants with 1 shard, following the
    /// storage convention.
    pub shard_count: ShardCount,

    /// INVARIANT: null if shard_count is 0, otherwise non-null and immutable
    pub stripe_size: Option<ShardStripeSize>,

    pub shards: HashMap<ShardIndex, PageserverShardInfo>,

    /// If the compute supports both protocols, this indicates which one it should use. The compute
    /// may use other available protocols too, if it doesn't support the preferred one. The URLs
    /// for the protocol specified here must be present for all shards, i.e. do not mark a protocol
    /// as preferred if it cannot actually be used with all the pageservers.
    #[serde(default)]
    pub prefer_protocol: PageserverProtocol,
}

/// Extract PageserverConnectionInfo from a comma-separated list of libpq connection strings.
///
/// This is used for backwards compatibility, to parse the legacy
/// [ComputeSpec::pageserver_connstring] field, or the 'neon.pageserver_connstring' GUC. Nowadays,
/// the 'pageserver_connection_info' field should be used instead.
impl PageserverConnectionInfo {
    pub fn from_connstr(
        connstr: &str,
        stripe_size: Option<ShardStripeSize>,
    ) -> Result<PageserverConnectionInfo, anyhow::Error> {
        let shard_infos: Vec<_> = connstr
            .split(',')
            .map(|connstr| PageserverShardInfo {
                pageservers: vec![PageserverShardConnectionInfo {
                    id: None,
                    libpq_url: Some(connstr.to_string()),
                    grpc_url: None,
                }],
            })
            .collect();

        match shard_infos.len() {
            0 => anyhow::bail!("empty connection string"),
            1 => {
                // We assume that if there's only one connection string, it means "unsharded",
                // rather than a sharded system with just a single shard. The latter is
                // possible in principle, but we never do it.
                let shard_count = ShardCount::unsharded();
                let only_shard = shard_infos.first().unwrap().clone();
                let shards = vec![(ShardIndex::unsharded(), only_shard)];
                Ok(PageserverConnectionInfo {
                    shard_count,
                    stripe_size: None,
                    shards: shards.into_iter().collect(),
                    prefer_protocol: PageserverProtocol::Libpq,
                })
            }
            n => {
                if stripe_size.is_none() {
                    anyhow::bail!("{n} shards but no stripe_size");
                }
                let shard_count = ShardCount(n.try_into()?);
                let shards = shard_infos
                    .into_iter()
                    .enumerate()
                    .map(|(idx, shard_info)| {
                        (
                            ShardIndex {
                                shard_count,
                                shard_number: ShardNumber(
                                    idx.try_into().expect("shard number fits in u8"),
                                ),
                            },
                            shard_info,
                        )
                    })
                    .collect();
                Ok(PageserverConnectionInfo {
                    shard_count,
                    stripe_size,
                    shards,
                    prefer_protocol: PageserverProtocol::Libpq,
                })
            }
        }
    }

    /// Convenience routine to get the connection string for a shard.
    pub fn shard_url(
        &self,
        shard_number: ShardNumber,
        protocol: PageserverProtocol,
    ) -> anyhow::Result<&str> {
        let shard_index = ShardIndex {
            shard_number,
            shard_count: self.shard_count,
        };
        let shard = self.shards.get(&shard_index).ok_or_else(|| {
            anyhow::anyhow!("shard connection info missing for shard {}", shard_index)
        })?;

        // Just use the first pageserver in the list. That's good enough for this
        // convenience routine; if you need more control, like a round-robin policy or
        // failover support, roll your own. (As of this writing, we never have more than
        // one pageserver per shard anyway, but that will change in the future.)
        let pageserver = shard
            .pageservers
            .first()
            .ok_or(anyhow::anyhow!("must have at least one pageserver"))?;

        let result = match protocol {
            PageserverProtocol::Grpc => pageserver
                .grpc_url
                .as_ref()
                .ok_or(anyhow::anyhow!("no grpc_url for shard {shard_index}"))?,
            PageserverProtocol::Libpq => pageserver
                .libpq_url
                .as_ref()
                .ok_or(anyhow::anyhow!("no libpq_url for shard {shard_index}"))?,
        };
        Ok(result)
    }
}

#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
pub struct PageserverShardInfo {
    pub pageservers: Vec<PageserverShardConnectionInfo>,
}

#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
pub struct PageserverShardConnectionInfo {
    pub id: Option<NodeId>,
    pub libpq_url: Option<String>,
    pub grpc_url: Option<String>,
}

#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct RemoteExtSpec {
    pub public_extensions: Option<Vec<String>>,
    pub custom_extensions: Option<Vec<String>>,
    pub library_index: HashMap<String, String>,
    pub extension_data: HashMap<String, ExtensionData>,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExtensionData {
    pub control_data: HashMap<String, String>,
    pub archive_path: String,
}

impl RemoteExtSpec {
    pub fn get_ext(
        &self,
        ext_name: &str,
        is_library: bool,
        build_tag: &str,
        pg_major_version: &str,
    ) -> anyhow::Result<(String, RemotePath)> {
        let mut real_ext_name = ext_name;
        if is_library {
            // Sometimes library names have a suffix like library.so or
            // library.so.3. We strip this off because library_index is
            // keyed by the name without the file extension.
            let strip_lib_suffix = Regex::new(r"\.so.*").unwrap();
            let lib_raw_name = strip_lib_suffix.replace(real_ext_name, "").to_string();

            real_ext_name = self
                .library_index
                .get(&lib_raw_name)
                .ok_or(anyhow::anyhow!("library {} is not found", lib_raw_name))?;
        }

        // Check if the extension is present in the public or custom lists.
        // If not, then it is not allowed to be used by this compute.
        if !self
            .public_extensions
            .as_ref()
            .is_some_and(|exts| exts.iter().any(|e| e == real_ext_name))
            && !self
                .custom_extensions
                .as_ref()
                .is_some_and(|exts| exts.iter().any(|e| e == real_ext_name))
        {
            return Err(anyhow::anyhow!("extension {} is not found", real_ext_name));
        }

        match self.extension_data.get(real_ext_name) {
            Some(_ext_data) => Ok((
                real_ext_name.to_string(),
                Self::build_remote_path(build_tag, pg_major_version, real_ext_name)?,
            )),
            None => Err(anyhow::anyhow!(
                "real_ext_name {} is not found",
                real_ext_name
            )),
        }
    }

    /// Get the architecture-specific portion of the remote extension path. We
    /// use the Go naming convention due to Kubernetes.
    fn get_arch() -> &'static str {
        match std::env::consts::ARCH {
            "x86_64" => "amd64",
            "aarch64" => "arm64",
            arch => arch,
        }
    }

    /// Build a [`RemotePath`] for an extension.
    fn build_remote_path(
        build_tag: &str,
        pg_major_version: &str,
        ext_name: &str,
    ) -> anyhow::Result<RemotePath> {
        let arch = Self::get_arch();

        // Construct the path to the extension archive:
        // BUILD_TAG/ARCH/PG_MAJOR_VERSION/extensions/EXTENSION_NAME.tar.zst
        //
        // Keep it in sync with path generation in
        // https://github.com/neondatabase/build-custom-extensions/tree/main
        RemotePath::from_string(&format!(
            "{build_tag}/{arch}/{pg_major_version}/extensions/{ext_name}.tar.zst"
        ))
    }
}

#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Deserialize, Serialize)]
pub enum ComputeMode {
    /// A read-write node
    #[default]
    Primary,
    /// A read-only node, pinned at a particular LSN
    Static(Lsn),
    /// A read-only node that follows the tip of the branch in hot standby mode
    ///
    /// Future versions may want to distinguish between replicas with hot standby
    /// feedback and other kinds of replication configurations.
    Replica,
}

impl ComputeMode {
    /// Convert the compute mode to a string that can be used to identify the type of compute,
    /// which means that if it's a static compute, the LSN will not be included.
    pub fn to_type_str(&self) -> &'static str {
        match self {
            ComputeMode::Primary => "primary",
            ComputeMode::Static(_) => "static",
            ComputeMode::Replica => "replica",
        }
    }
}

impl Display for ComputeMode {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.to_type_str())
    }
}

/// Log level for audit logging
#[derive(Clone, Debug, Default, Eq, PartialEq, Deserialize, Serialize)]
pub enum ComputeAudit {
    #[default]
    Disabled,
    // Deprecated, use Base instead
    Log,
    // (pgaudit.log = 'ddl', pgaudit.log_parameter='off'),
    // logged to the standard postgresql log stream
    Base,
    // Deprecated, use Full or Extended instead
    Hipaa,
    // (pgaudit.log = 'all, -misc', pgaudit.log_parameter='off'),
    // logged to separate files collected by rsyslog
    // into dedicated log storage with strict access
    Extended,
    // (pgaudit.log='all', pgaudit.log_parameter='on'),
    // logged to separate files collected by rsyslog
    // into dedicated log storage with strict access.
    Full,
}

#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
pub struct Cluster {
    pub cluster_id: Option<String>,
    pub name: Option<String>,
    pub state: Option<String>,
    pub roles: Vec<Role>,
    pub databases: Vec<Database>,

    /// Desired contents of 'postgresql.conf' file. (The 'compute_ctl'
    /// tool may add additional settings to the final file.)
    pub postgresql_conf: Option<String>,

    /// Additional settings that will be appended to the 'postgresql.conf' file.
    pub settings: GenericOptions,
}

/// Single cluster state changing operation that could not be represented as
/// a static `Cluster` structure. For example:
/// - DROP DATABASE
/// - DROP ROLE
/// - ALTER ROLE name RENAME TO new_name
/// - ALTER DATABASE name RENAME TO new_name
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct DeltaOp {
    pub action: String,
    pub name: PgIdent,
    pub new_name: Option<PgIdent>,
}

/// Rust representation of Postgres role info with only those fields
/// that matter for us.
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
pub struct Role {
    pub name: PgIdent,
    pub encrypted_password: Option<String>,
    pub options: GenericOptions,
}

/// Rust representation of Postgres database info with only those fields
/// that matter for us.
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
pub struct Database {
    pub name: PgIdent,
    pub owner: PgIdent,
    pub options: GenericOptions,
    // These are derived flags, not present in the spec file.
    // They are never set by the control plane.
    #[serde(skip_deserializing, default)]
    pub restrict_conn: bool,
    #[serde(skip_deserializing, default)]
    pub invalid: bool,
}

/// Common type representing both SQL statement params with or without value,
/// like `LOGIN` or `OWNER username` in the `CREATE/ALTER ROLE`, and config
/// options like `wal_level = logical`.
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
pub struct GenericOption {
    pub name: String,
    pub value: Option<String>,
    pub vartype: String,
}

/// Postgres compute TLS settings.
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
pub struct PgComputeTlsSettings {
    // Absolute path to the certificate file for server-side TLS.
    pub cert_file: String,
    // Absolute path to the private key file for server-side TLS.
    pub key_file: String,
    // Absolute path to the certificate authority file for verifying client certificates.
    pub ca_file: String,
}

/// Databricks-specific options for the compute instance.
/// This is used to store any other settings that need to be propagated to the compute
/// but should not be persisted to ComputeSpec in the database.
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
pub struct DatabricksSettings {
    pub pg_compute_tls_settings: PgComputeTlsSettings,
    // Absolute file path to the databricks_pg_hba.conf file.
    pub databricks_pg_hba: String,
    // Absolute file path to the databricks_pg_ident.conf file.
    pub databricks_pg_ident: String,
    // Hostname portion of the Databricks workspace URL of the endpoint, or empty string if not known.
    // A valid hostname is required for the compute instance to support PAT logins.
    pub databricks_workspace_host: String,
}

/// Optional collection of `GenericOption`'s. Type alias allows us to
/// declare a `trait` on it.
pub type GenericOptions = Option<Vec<GenericOption>>;

/// Configures the local_proxy application with the relevant JWKS and roles it should
/// use for authorizing connect requests using JWT.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct LocalProxySpec {
    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub jwks: Option<Vec<JwksSettings>>,
    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tls: Option<TlsConfig>,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct JwksSettings {
    pub id: String,
    pub role_names: Vec<String>,
    pub jwks_url: String,
    pub provider_name: String,
    pub jwt_audience: Option<String>,
}

/// Protocol used to connect to a Pageserver.
#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
pub enum PageserverProtocol {
    /// The original protocol based on libpq and COPY. Uses the postgresql:// or postgres:// scheme.
    #[default]
    #[serde(rename = "libpq")]
    Libpq,
    /// A newer, gRPC-based protocol. Uses the grpc:// scheme.
    #[serde(rename = "grpc")]
    Grpc,
}

impl PageserverProtocol {
    /// Parses the protocol from a connstring scheme. Defaults to Libpq if no scheme is given.
    /// Errors if the connstring is an invalid URL.
    pub fn from_connstring(connstring: &str) -> anyhow::Result<Self> {
        let scheme = match Url::parse(connstring) {
            Ok(url) => url.scheme().to_lowercase(),
            Err(url::ParseError::RelativeUrlWithoutBase) => return Ok(Self::default()),
            Err(err) => return Err(anyhow!("invalid connstring URL: {err}")),
        };
        match scheme.as_str() {
            "postgresql" | "postgres" => Ok(Self::Libpq),
            "grpc" => Ok(Self::Grpc),
            scheme => Err(anyhow!("invalid protocol scheme: {scheme}")),
        }
    }

    /// Returns the URL scheme for the protocol, for use in connstrings.
    pub fn scheme(&self) -> &'static str {
        match self {
            Self::Libpq => "postgresql",
            Self::Grpc => "grpc",
        }
    }
}

impl Display for PageserverProtocol {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.scheme())
    }
}

#[cfg(test)]
mod tests {
    use std::fs::File;

    use super::*;

    #[test]
    fn allow_installing_remote_extensions() {
        let rspec: RemoteExtSpec = serde_json::from_value(serde_json::json!({
            "public_extensions": null,
            "custom_extensions": null,
            "library_index": {},
            "extension_data": {},
        }))
        .unwrap();

        rspec
            .get_ext("ext", false, "latest", "v17")
            .expect_err("Extension should not be found");

        let rspec: RemoteExtSpec = serde_json::from_value(serde_json::json!({
            "public_extensions": [],
            "custom_extensions": null,
            "library_index": {},
            "extension_data": {},
        }))
        .unwrap();

        rspec
            .get_ext("ext", false, "latest", "v17")
            .expect_err("Extension should not be found");

        let rspec: RemoteExtSpec = serde_json::from_value(serde_json::json!({
            "public_extensions": [],
            "custom_extensions": [],
            "library_index": {
                "ext": "ext"
            },
            "extension_data": {
                "ext": {
                    "control_data": {
                        "ext.control": ""
                    },
                    "archive_path": ""
                }
            },
        }))
        .unwrap();

        rspec
            .get_ext("ext", false, "latest", "v17")
            .expect_err("Extension should not be found");

        let rspec: RemoteExtSpec = serde_json::from_value(serde_json::json!({
            "public_extensions": [],
            "custom_extensions": ["ext"],
            "library_index": {
                "ext": "ext"
            },
            "extension_data": {
                "ext": {
                    "control_data": {
                        "ext.control": ""
                    },
                    "archive_path": ""
                }
            },
        }))
        .unwrap();

        rspec
            .get_ext("ext", false, "latest", "v17")
            .expect("Extension should be found");

        let rspec: RemoteExtSpec = serde_json::from_value(serde_json::json!({
            "public_extensions": ["ext"],
            "custom_extensions": [],
            "library_index": {
                "extlib": "ext",
            },
            "extension_data": {
                "ext": {
                    "control_data": {
                        "ext.control": ""
                    },
                    "archive_path": ""
                }
            },
        }))
        .unwrap();

        rspec
            .get_ext("ext", false, "latest", "v17")
            .expect("Extension should be found");

        // test library index for the case when library name
        // doesn't match the extension name
        rspec
            .get_ext("extlib", true, "latest", "v17")
            .expect("Library should be found");
    }

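    // Illustrative sketch, not from the original source: exercises the ".so"
    // suffix stripping in get_ext(). Lookups of "extlib.so" and "extlib.so.3"
    // should resolve through library_index just like plain "extlib".
    #[test]
    fn strip_library_suffix() {
        let rspec: RemoteExtSpec = serde_json::from_value(serde_json::json!({
            "public_extensions": ["ext"],
            "custom_extensions": [],
            "library_index": {
                "extlib": "ext",
            },
            "extension_data": {
                "ext": {
                    "control_data": {
                        "ext.control": ""
                    },
                    "archive_path": ""
                }
            },
        }))
        .unwrap();

        rspec
            .get_ext("extlib.so", true, "latest", "v17")
            .expect("Library should be found");
        rspec
            .get_ext("extlib.so.3", true, "latest", "v17")
            .expect("Library should be found");
    }
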
    #[test]
    fn remote_extension_path() {
        let rspec: RemoteExtSpec = serde_json::from_value(serde_json::json!({
            "public_extensions": ["ext"],
            "custom_extensions": [],
            "library_index": {
                "extlib": "ext",
            },
            "extension_data": {
                "ext": {
                    "control_data": {
                        "ext.control": ""
                    },
                    "archive_path": ""
                }
            },
        }))
        .unwrap();

        let (_ext_name, ext_path) = rspec
            .get_ext("ext", false, "latest", "v17")
            .expect("Extension should be found");
        // Starting with a forward slash would have consequences for the
        // Url::join() that occurs when downloading a remote extension.
        assert!(!ext_path.to_string().starts_with("/"));
        assert_eq!(
            ext_path,
            RemoteExtSpec::build_remote_path("latest", "v17", "ext").unwrap()
        );
    }

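    // Illustrative sketch, not from the original source: a single libpq
    // connstring should be treated as "unsharded" by from_connstr(), and
    // shard_url() should hand the same string back for shard zero.
    #[test]
    fn parse_legacy_pageserver_connstring() {
        let connstr = "postgresql://no_user@pageserver:6400";
        let info = PageserverConnectionInfo::from_connstr(connstr, None)
            .expect("single connstring should parse");

        assert_eq!(info.shard_count, ShardCount::unsharded());
        assert_eq!(info.stripe_size, None);
        assert_eq!(info.prefer_protocol, PageserverProtocol::Libpq);

        let url = info
            .shard_url(ShardNumber(0), PageserverProtocol::Libpq)
            .expect("shard 0 should have a libpq URL");
        assert_eq!(url, connstr);

        // No gRPC URL was provided, so requesting one should fail.
        assert!(
            info.shard_url(ShardNumber(0), PageserverProtocol::Grpc)
                .is_err()
        );
    }
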
    #[test]
    fn parse_spec_file() {
        let file = File::open("tests/cluster_spec.json").unwrap();
        let spec: ComputeSpec = serde_json::from_reader(file).unwrap();

        // Features list defaults to empty vector.
        assert!(spec.features.is_empty());

        // Reconfigure concurrency defaults to 1.
        assert_eq!(spec.reconfigure_concurrency, 1);
    }

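    // Illustrative sketch, not from the original source: per the comments on
    // ComputeSpec, everything except format_version, cluster, and
    // suspend_timeout_seconds is either an Option or #[serde(default)], so a
    // bare-bones spec should deserialize with missing fields as None/defaults.
    #[test]
    fn parse_minimal_spec() {
        let spec: ComputeSpec = serde_json::from_value(serde_json::json!({
            "format_version": 1.0,
            "cluster": {
                "roles": [],
                "databases": [],
            },
            "suspend_timeout_seconds": -1,
        }))
        .unwrap();

        assert_eq!(spec.mode, ComputeMode::Primary);
        assert!(spec.tenant_id.is_none());
        assert!(spec.pageserver_connection_info.is_none());
    }
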
    #[test]
    fn parse_unknown_fields() {
        // Forward compatibility test
        let file = File::open("tests/cluster_spec.json").unwrap();
        let mut json: serde_json::Value = serde_json::from_reader(file).unwrap();
        let ob = json.as_object_mut().unwrap();
        ob.insert("unknown_field_123123123".into(), "hello".into());
        let _spec: ComputeSpec = serde_json::from_value(json).unwrap();
    }

    #[test]
    fn parse_unknown_features() {
        // Test that unknown feature flags do not cause any errors.
        let file = File::open("tests/cluster_spec.json").unwrap();
        let mut json: serde_json::Value = serde_json::from_reader(file).unwrap();
        let ob = json.as_object_mut().unwrap();

        // Add unknown feature flags.
        let features = vec!["foo_bar_feature", "baz_feature"];
        ob.insert("features".into(), features.into());

        let spec: ComputeSpec = serde_json::from_value(json).unwrap();

        assert!(spec.features.len() == 2);
        assert!(spec.features.contains(&ComputeFeature::UnknownFeature));
        assert_eq!(spec.features, vec![ComputeFeature::UnknownFeature; 2]);
    }

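    // Illustrative sketch, not from the original source: to_type_str()
    // identifies the compute type and deliberately drops the LSN for static
    // computes; Display delegates to it.
    #[test]
    fn compute_mode_type_str() {
        assert_eq!(ComputeMode::Primary.to_type_str(), "primary");
        assert_eq!(ComputeMode::Static(Lsn(0x100)).to_type_str(), "static");
        assert_eq!(ComputeMode::Replica.to_string(), "replica");
    }
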
    #[test]
    fn parse_known_features() {
        // Test that we can properly parse known feature flags.
        let file = File::open("tests/cluster_spec.json").unwrap();
        let mut json: serde_json::Value = serde_json::from_reader(file).unwrap();
        let ob = json.as_object_mut().unwrap();

        // Add known feature flags.
        let features = vec!["activity_monitor_experimental"];
        ob.insert("features".into(), features.into());

        let spec: ComputeSpec = serde_json::from_value(json).unwrap();

        assert_eq!(
            spec.features,
            vec![ComputeFeature::ActivityMonitorExperimental]
        );
    }
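
    // Illustrative sketch, not from the original source: scheme-based protocol
    // selection in PageserverProtocol::from_connstring(). A scheme-less libpq
    // keyword string parses as a relative URL and falls back to the default
    // (libpq).
    #[test]
    fn parse_pageserver_protocol() {
        use PageserverProtocol::{Grpc, Libpq};

        assert_eq!(
            PageserverProtocol::from_connstring("postgresql://pageserver:6400").unwrap(),
            Libpq
        );
        assert_eq!(
            PageserverProtocol::from_connstring("grpc://pageserver:51051").unwrap(),
            Grpc
        );
        assert_eq!(
            PageserverProtocol::from_connstring("host=pageserver port=6400").unwrap(),
            Libpq
        );
        // Unknown schemes are rejected.
        assert!(PageserverProtocol::from_connstring("http://pageserver").is_err());
    }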
}