LCOV - code coverage report
Current view: top level - libs/compute_api/src - spec.rs (source / functions) Coverage Total Hit
Test: 1d5975439f3c9882b18414799141ebf9a3922c58.info Lines: 64.3 % 297 191
Test Date: 2025-07-31 15:59:03 Functions: 6.6 % 244 16

            Line data    Source code
       1              : //! The ComputeSpec contains all the information needed to start up
       2              : //! the right version of PostgreSQL, and connect it to the storage nodes.
       3              : //! It can be passed as part of the `config.json`, or the control plane can
       4              : //! provide it by calling the compute_ctl's `/compute_ctl` endpoint, or
       5              : //! compute_ctl can fetch it by calling the control plane's API.
       6              : use std::collections::HashMap;
       7              : use std::fmt::Display;
       8              : 
       9              : use anyhow::anyhow;
      10              : use indexmap::IndexMap;
      11              : use regex::Regex;
      12              : use remote_storage::RemotePath;
      13              : use serde::{Deserialize, Serialize};
      14              : use url::Url;
      15              : use utils::id::{NodeId, TenantId, TimelineId};
      16              : use utils::lsn::Lsn;
      17              : use utils::shard::{ShardCount, ShardIndex, ShardNumber, ShardStripeSize};
      18              : 
      19              : use crate::responses::TlsConfig;
      20              : 
      21              : /// String type alias representing Postgres identifier and
      22              : /// intended to be used for DB / role names.
      23              : pub type PgIdent = String;
      24              : 
      25              : /// String type alias representing Postgres extension version
      26              : pub type ExtVersion = String;
      27              : 
      28            7 : fn default_reconfigure_concurrency() -> usize {
      29            7 :     1
      30            7 : }
      31              : 
      32              : /// Cluster spec or configuration represented as an optional number of
      33              : /// delta operations + final cluster state description.
      34            0 : #[derive(Clone, Debug, Default, Deserialize, Serialize)]
      35              : pub struct ComputeSpec {
      36              :     pub format_version: f32,
      37              : 
      38              :     // The control plane also includes a 'timestamp' field in the JSON document,
      39              :     // but we don't use it for anything. Serde will ignore missing fields when
      40              :     // deserializing it.
      41              :     pub operation_uuid: Option<String>,
      42              : 
      43              :     /// Compute features to enable. These feature flags are provided, when we
      44              :     /// know all the details about client's compute, so they cannot be used
      45              :     /// to change `Empty` compute behavior.
      46              :     #[serde(default)]
      47              :     pub features: Vec<ComputeFeature>,
      48              : 
      49              :     /// If compute_ctl was passed `--resize-swap-on-bind`, a value of `Some(_)` instructs
      50              :     /// compute_ctl to `/neonvm/bin/resize-swap` with the given size, when the spec is first
      51              :     /// received.
      52              :     ///
      53              :     /// Both this field and `--resize-swap-on-bind` are required, so that the control plane's
      54              :     /// spec generation doesn't need to be aware of the actual compute it's running on, while
      55              :     /// guaranteeing gradual rollout of swap. Otherwise, without `--resize-swap-on-bind`, we could
      56              :     /// end up trying to resize swap in VMs without it -- or end up *not* resizing swap, thus
      57              :     /// giving every VM much more swap than it should have (32GiB).
      58              :     ///
      59              :     /// Eventually we may remove `--resize-swap-on-bind` and exclusively use `swap_size_bytes` for
      60              :     /// enabling the swap resizing behavior once rollout is complete.
      61              :     ///
      62              :     /// See neondatabase/cloud#12047 for more.
      63              :     #[serde(default)]
      64              :     pub swap_size_bytes: Option<u64>,
      65              : 
      66              :     /// If compute_ctl was passed `--set-disk-quota-for-fs`, a value of `Some(_)` instructs
      67              :     /// compute_ctl to run `/neonvm/bin/set-disk-quota` with the given size and fs, when the
      68              :     /// spec is first received.
      69              :     ///
      70              :     /// Both this field and `--set-disk-quota-for-fs` are required, so that the control plane's
      71              :     /// spec generation doesn't need to be aware of the actual compute it's running on, while
      72              :     /// guaranteeing gradual rollout of disk quota.
      73              :     #[serde(default)]
      74              :     pub disk_quota_bytes: Option<u64>,
      75              : 
      76              :     /// Disables the vm-monitor behavior that resizes LFC on upscale/downscale, instead relying on
      77              :     /// the initial size of LFC.
      78              :     ///
      79              :     /// This is intended for use when the LFC size is being overridden from the default but
      80              :     /// autoscaling is still enabled, and we don't want the vm-monitor to interfere with the custom
      81              :     /// LFC sizing.
      82              :     #[serde(default)]
      83              :     pub disable_lfc_resizing: Option<bool>,
      84              : 
      85              :     /// Expected cluster state at the end of transition process.
      86              :     pub cluster: Cluster,
      87              :     pub delta_operations: Option<Vec<DeltaOp>>,
      88              : 
      89              :     /// An optional hint that can be passed to speed up startup time if we know
      90              :     /// that no pg catalog mutations (like role creation, database creation,
      91              :     /// extension creation) need to be done on the actual database to start.
      92              :     #[serde(default)] // Default false
      93              :     pub skip_pg_catalog_updates: bool,
      94              : 
      95              :     // Information needed to connect to the storage layer.
      96              :     //
      97              :     // `tenant_id`, `timeline_id` and `pageserver_connstring` are always needed.
      98              :     //
      99              :     // Depending on `mode`, this can be a primary read-write node, a read-only
     100              :     // replica, or a read-only node pinned at an older LSN.
     101              :     // `safekeeper_connstrings` must be set for a primary.
     102              :     //
     103              :     // For backwards compatibility, the control plane may leave out all of
     104              :     // these, and instead set the "neon.tenant_id", "neon.timeline_id",
     105              :     // etc. GUCs in cluster.settings. TODO: Once the control plane has been
     106              :     // updated to fill these fields, we can make these non optional.
     107              :     pub tenant_id: Option<TenantId>,
     108              :     pub timeline_id: Option<TimelineId>,
     109              : 
     110              :     /// Pageserver information can be passed in three different ways:
     111              :     /// 1. Here in `pageserver_connection_info`
     112              :     /// 2. In the `pageserver_connstring` field.
     113              :     /// 3. in `cluster.settings`.
     114              :     ///
     115              :     /// The goal is to use method 1. everywhere. But for backwards-compatibility with old
     116              :     /// versions of the control plane, `compute_ctl` will check 2. and 3. if the
     117              :     /// `pageserver_connection_info` field is missing.
     118              :     ///
     119              :     /// If both `pageserver_connection_info` and `pageserver_connstring`+`shard_stripe_size` are
     120              :     /// given, they must contain the same information.
     121              :     pub pageserver_connection_info: Option<PageserverConnectionInfo>,
     122              : 
     123              :     pub pageserver_connstring: Option<String>,
     124              : 
     125              :     /// Stripe size for pageserver sharding, in pages. This is set together with the legacy
     126              :     /// `pageserver_connstring` field. When the modern `pageserver_connection_info` field is used,
     127              :     /// the stripe size is stored in `pageserver_connection_info.stripe_size` instead.
     128              :     pub shard_stripe_size: Option<ShardStripeSize>,
     129              : 
     130              :     // More neon ids that we expose to the compute_ctl
     131              :     // and to postgres as neon extension GUCs.
     132              :     pub project_id: Option<String>,
     133              :     pub branch_id: Option<String>,
     134              :     pub endpoint_id: Option<String>,
     135              : 
     136              :     /// Safekeeper membership config generation. It is put in
     137              :     /// neon.safekeepers GUC and serves two purposes:
     138              :     /// 1) Non zero value forces walproposer to use membership configurations.
     139              :     /// 2) If walproposer wants to update list of safekeepers to connect to
     140              :     ///    taking them from some safekeeper mconf, it should check what value
     141              :     ///    is newer by comparing the generation.
     142              :     ///
     143              :     /// Note: it could be SafekeeperGeneration, but this needs linking
     144              :     /// compute_ctl with postgres_ffi.
     145              :     #[serde(default)]
     146              :     pub safekeepers_generation: Option<u32>,
     147              :     #[serde(default)]
     148              :     pub safekeeper_connstrings: Vec<String>,
     149              : 
     150              :     #[serde(default)]
     151              :     pub mode: ComputeMode,
     152              : 
     153              :     /// If set, 'storage_auth_token' is used as the password to authenticate to
     154              :     /// the pageserver and safekeepers.
     155              :     pub storage_auth_token: Option<String>,
     156              : 
     157              :     // information about available remote extensions
     158              :     pub remote_extensions: Option<RemoteExtSpec>,
     159              : 
     160              :     pub pgbouncer_settings: Option<IndexMap<String, String>>,
     161              : 
     162              :     /// Local Proxy configuration used for JWT authentication
     163              :     #[serde(default)]
     164              :     pub local_proxy_config: Option<LocalProxySpec>,
     165              : 
     166              :     /// Number of concurrent connections during the parallel RunInEachDatabase
     167              :     /// phase of the apply config process.
     168              :     ///
     169              :     /// We need a higher concurrency during reconfiguration in case of many DBs,
     170              :     /// but instance is already running and used by client. We can easily get out of
     171              :     /// `max_connections` limit, and the current code won't handle that.
     172              :     ///
     173              :     /// Default is 1, but also allow control plane to override this value for specific
     174              :     /// projects. It's also recommended to bump `superuser_reserved_connections` +=
     175              :     /// `reconfigure_concurrency` for such projects to ensure that we always have
     176              :     /// enough spare connections for reconfiguration process to succeed.
     177              :     #[serde(default = "default_reconfigure_concurrency")]
     178              :     pub reconfigure_concurrency: usize,
     179              : 
     180              :     /// If set to true, the compute_ctl will drop all subscriptions before starting the
     181              :     /// compute. This is needed when we start an endpoint on a branch, so that child
     182              :     /// would not compete with parent branch subscriptions
     183              :     /// over the same replication content from publisher.
     184              :     #[serde(default)] // Default false
     185              :     pub drop_subscriptions_before_start: bool,
     186              : 
     187              :     /// Log level for compute audit logging
     188              :     #[serde(default)]
     189              :     pub audit_log_level: ComputeAudit,
     190              : 
     191              :     /// Hostname and the port of the otel collector. Leave empty to disable Postgres logs forwarding.
     192              :     /// Example: config-shy-breeze-123-collector-monitoring.neon-telemetry.svc.cluster.local:10514
     193              :     pub logs_export_host: Option<String>,
     194              : 
     195              :     /// Address of endpoint storage service
     196              :     pub endpoint_storage_addr: Option<String>,
     197              :     /// JWT for authorizing requests to endpoint storage service
     198              :     pub endpoint_storage_token: Option<String>,
     199              : 
     200              :     #[serde(default)]
     201              :     /// Download LFC state from endpoint storage and pass it to Postgres on compute startup
     202              :     pub autoprewarm: bool,
     203              : 
     204              :     #[serde(default)]
     205              :     /// Upload LFC state to endpoint storage periodically. Default value (None) means "don't upload"
     206              :     pub offload_lfc_interval_seconds: Option<std::num::NonZeroU64>,
     207              : 
     208              :     /// Suspend timeout in seconds.
     209              :     ///
     210              :     /// We use this value to derive other values, such as the installed extensions metric.
     211              :     pub suspend_timeout_seconds: i64,
     212              : 
     213              :     // Databricks specific options for compute instance.
     214              :     pub databricks_settings: Option<DatabricksSettings>,
     215              : }
     216              : 
     217              : /// Feature flag to signal `compute_ctl` to enable certain experimental functionality.
     218            0 : #[derive(Serialize, Clone, Copy, Debug, Deserialize, PartialEq, Eq)]
     219              : #[serde(rename_all = "snake_case")]
     220              : pub enum ComputeFeature {
     221              :     // XXX: Add more feature flags here.
     222              :     /// Enable the experimental activity monitor logic, which uses `pg_stat_database` to
     223              :     /// track short-lived connections as user activity.
     224              :     ActivityMonitorExperimental,
     225              : 
     226              :     /// Enable TLS functionality.
     227              :     TlsExperimental,
     228              : 
     229              :     /// This is a special feature flag that is used to represent unknown feature flags.
     230              :     /// Basically all unknown to enum flags are represented as this one. See unit test
     231              :     /// `parse_unknown_features()` for more details.
     232              :     #[serde(other)]
     233              :     UnknownFeature,
     234              : }
     235              : 
     236            0 : #[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
     237              : pub struct PageserverConnectionInfo {
     238              :     /// NB: 0 for unsharded tenants, 1 for sharded tenants with 1 shard, following storage
     239              :     pub shard_count: ShardCount,
     240              : 
     241              :     /// INVARIANT: null if shard_count is 0, otherwise non-null and immutable
     242              :     pub stripe_size: Option<ShardStripeSize>,
     243              : 
     244              :     pub shards: HashMap<ShardIndex, PageserverShardInfo>,
     245              : 
     246              :     /// If the compute supports both protocols, this indicates which one it should use.  The compute
     247              :     /// may use other available protocols too, if it doesn't support the preferred one. The URL's
     248              :     /// for the protocol specified here must be present for all shards, i.e. do not mark a protocol
     249              :     /// as preferred if it cannot actually be used with all the pageservers.
     250              :     #[serde(default)]
     251              :     pub prefer_protocol: PageserverProtocol,
     252              : }
     253              : 
     254              : /// Extract PageserverConnectionInfo from a comma-separated list of libpq connection strings.
     255              : ///
     256              : /// This is used for backwards-compatibility, to parse the legacy
     257              : /// [ComputeSpec::pageserver_connstring] field, or the 'neon.pageserver_connstring' GUC. Nowadays,
     258              : /// the 'pageserver_connection_info' field should be used instead.
     259              : impl PageserverConnectionInfo {
     260            1 :     pub fn from_connstr(
     261            1 :         connstr: &str,
     262            1 :         stripe_size: Option<ShardStripeSize>,
     263            1 :     ) -> Result<PageserverConnectionInfo, anyhow::Error> {
     264            1 :         let shard_infos: Vec<_> = connstr
     265            1 :             .split(',')
     266            1 :             .map(|connstr| PageserverShardInfo {
     267            1 :                 pageservers: vec![PageserverShardConnectionInfo {
     268            1 :                     id: None,
     269            1 :                     libpq_url: Some(connstr.to_string()),
     270            1 :                     grpc_url: None,
     271            1 :                 }],
     272            1 :             })
     273            1 :             .collect();
     274              : 
     275            1 :         match shard_infos.len() {
     276            0 :             0 => anyhow::bail!("empty connection string"),
     277              :             1 => {
     278              :                 // We assume that if there's only connection string, it means "unsharded",
     279              :                 // rather than a sharded system with just a single shard. The latter is
     280              :                 // possible in principle, but we never do it.
     281            1 :                 let shard_count = ShardCount::unsharded();
     282            1 :                 let only_shard = shard_infos.first().unwrap().clone();
     283            1 :                 let shards = vec![(ShardIndex::unsharded(), only_shard)];
     284            1 :                 Ok(PageserverConnectionInfo {
     285            1 :                     shard_count,
     286            1 :                     stripe_size: None,
     287            1 :                     shards: shards.into_iter().collect(),
     288            1 :                     prefer_protocol: PageserverProtocol::Libpq,
     289            1 :                 })
     290              :             }
     291            0 :             n => {
     292            0 :                 if stripe_size.is_none() {
     293            0 :                     anyhow::bail!("{n} shards but no stripe_size");
     294            0 :                 }
     295            0 :                 let shard_count = ShardCount(n.try_into()?);
     296            0 :                 let shards = shard_infos
     297            0 :                     .into_iter()
     298            0 :                     .enumerate()
     299            0 :                     .map(|(idx, shard_info)| {
     300            0 :                         (
     301            0 :                             ShardIndex {
     302            0 :                                 shard_count,
     303            0 :                                 shard_number: ShardNumber(
     304            0 :                                     idx.try_into().expect("shard number fits in u8"),
     305            0 :                                 ),
     306            0 :                             },
     307            0 :                             shard_info,
     308            0 :                         )
     309            0 :                     })
     310            0 :                     .collect();
     311            0 :                 Ok(PageserverConnectionInfo {
     312            0 :                     shard_count,
     313            0 :                     stripe_size,
     314            0 :                     shards,
     315            0 :                     prefer_protocol: PageserverProtocol::Libpq,
     316            0 :                 })
     317              :             }
     318              :         }
     319            1 :     }
     320              : 
     321              :     /// Convenience routine to get the connection string for a shard.
     322            0 :     pub fn shard_url(
     323            0 :         &self,
     324            0 :         shard_number: ShardNumber,
     325            0 :         protocol: PageserverProtocol,
     326            0 :     ) -> anyhow::Result<&str> {
     327            0 :         let shard_index = ShardIndex {
     328            0 :             shard_number,
     329            0 :             shard_count: self.shard_count,
     330            0 :         };
     331            0 :         let shard = self.shards.get(&shard_index).ok_or_else(|| {
     332            0 :             anyhow::anyhow!("shard connection info missing for shard {}", shard_index)
     333            0 :         })?;
     334              : 
     335              :         // Just use the first pageserver in the list. That's good enough for this
     336              :         // convenience routine; if you need more control, like round robin policy or
     337              :         // failover support, roll your own. (As of this writing, we never have more than
     338              :         // one pageserver per shard anyway, but that will change in the future.)
     339            0 :         let pageserver = shard
     340            0 :             .pageservers
     341            0 :             .first()
     342            0 :             .ok_or(anyhow::anyhow!("must have at least one pageserver"))?;
     343              : 
     344            0 :         let result = match protocol {
     345            0 :             PageserverProtocol::Grpc => pageserver
     346            0 :                 .grpc_url
     347            0 :                 .as_ref()
     348            0 :                 .ok_or(anyhow::anyhow!("no grpc_url for shard {shard_index}"))?,
     349            0 :             PageserverProtocol::Libpq => pageserver
     350            0 :                 .libpq_url
     351            0 :                 .as_ref()
     352            0 :                 .ok_or(anyhow::anyhow!("no libpq_url for shard {shard_index}"))?,
     353              :         };
     354            0 :         Ok(result)
     355            0 :     }
     356              : }
     357              : 
     358            0 : #[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
     359              : pub struct PageserverShardInfo {
     360              :     pub pageservers: Vec<PageserverShardConnectionInfo>,
     361              : }
     362              : 
     363            0 : #[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
     364              : pub struct PageserverShardConnectionInfo {
     365              :     pub id: Option<NodeId>,
     366              :     pub libpq_url: Option<String>,
     367              :     pub grpc_url: Option<String>,
     368              : }
     369              : 
     370            0 : #[derive(Clone, Debug, Default, Deserialize, Serialize)]
     371              : pub struct RemoteExtSpec {
     372              :     pub public_extensions: Option<Vec<String>>,
     373              :     pub custom_extensions: Option<Vec<String>>,
     374              :     pub library_index: HashMap<String, String>,
     375              :     pub extension_data: HashMap<String, ExtensionData>,
     376              : }
     377              : 
     378            0 : #[derive(Clone, Debug, Serialize, Deserialize)]
     379              : pub struct ExtensionData {
     380              :     pub control_data: HashMap<String, String>,
     381              :     pub archive_path: String,
     382              : }
     383              : 
     384              : impl RemoteExtSpec {
     385            7 :     pub fn get_ext(
     386            7 :         &self,
     387            7 :         ext_name: &str,
     388            7 :         is_library: bool,
     389            7 :         build_tag: &str,
     390            7 :         pg_major_version: &str,
     391            7 :     ) -> anyhow::Result<(String, RemotePath)> {
     392            7 :         let mut real_ext_name = ext_name;
     393            7 :         if is_library {
     394              :             // sometimes library names might have a suffix like
     395              :             // library.so or library.so.3. We strip this off
     396              :             // because library_index is based on the name without the file extension
     397            1 :             let strip_lib_suffix = Regex::new(r"\.so.*").unwrap();
     398            1 :             let lib_raw_name = strip_lib_suffix.replace(real_ext_name, "").to_string();
     399              : 
     400            1 :             real_ext_name = self
     401            1 :                 .library_index
     402            1 :                 .get(&lib_raw_name)
     403            1 :                 .ok_or(anyhow::anyhow!("library {} is not found", lib_raw_name))?;
     404            6 :         }
     405              : 
     406              :         // Check if extension is present in public or custom.
     407              :         // If not, then it is not allowed to be used by this compute.
     408            7 :         if !self
     409            7 :             .public_extensions
     410            7 :             .as_ref()
     411            7 :             .is_some_and(|exts| exts.iter().any(|e| e == real_ext_name))
     412            4 :             && !self
     413            4 :                 .custom_extensions
     414            4 :                 .as_ref()
     415            4 :                 .is_some_and(|exts| exts.iter().any(|e| e == real_ext_name))
     416              :         {
     417            3 :             return Err(anyhow::anyhow!("extension {} is not found", real_ext_name));
     418            4 :         }
     419              : 
     420            4 :         match self.extension_data.get(real_ext_name) {
     421            4 :             Some(_ext_data) => Ok((
     422            4 :                 real_ext_name.to_string(),
     423            4 :                 Self::build_remote_path(build_tag, pg_major_version, real_ext_name)?,
     424              :             )),
     425            0 :             None => Err(anyhow::anyhow!(
     426            0 :                 "real_ext_name {} is not found",
     427            0 :                 real_ext_name
     428            0 :             )),
     429              :         }
     430            7 :     }
     431              : 
     432              :     /// Get the architecture-specific portion of the remote extension path. We
     433              :     /// use the Go naming convention due to Kubernetes.
     434            5 :     fn get_arch() -> &'static str {
     435            5 :         match std::env::consts::ARCH {
     436            5 :             "x86_64" => "amd64",
     437            0 :             "aarch64" => "arm64",
     438            0 :             arch => arch,
     439              :         }
     440            5 :     }
     441              : 
     442              :     /// Build a [`RemotePath`] for an extension.
     443            5 :     fn build_remote_path(
     444            5 :         build_tag: &str,
     445            5 :         pg_major_version: &str,
     446            5 :         ext_name: &str,
     447            5 :     ) -> anyhow::Result<RemotePath> {
     448            5 :         let arch = Self::get_arch();
     449              : 
     450              :         // Construct the path to the extension archive
     451              :         // BUILD_TAG/PG_MAJOR_VERSION/extensions/EXTENSION_NAME.tar.zst
     452              :         //
     453              :         // Keep it in sync with path generation in
     454              :         // https://github.com/neondatabase/build-custom-extensions/tree/main
     455            5 :         RemotePath::from_string(&format!(
     456            5 :             "{build_tag}/{arch}/{pg_major_version}/extensions/{ext_name}.tar.zst"
     457            5 :         ))
     458            5 :     }
     459              : }
     460              : 
     461            0 : #[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Deserialize, Serialize)]
     462              : pub enum ComputeMode {
     463              :     /// A read-write node
     464              :     #[default]
     465              :     Primary,
     466              :     /// A read-only node, pinned at a particular LSN
     467              :     Static(Lsn),
     468              :     /// A read-only node that follows the tip of the branch in hot standby mode
     469              :     ///
     470              :     /// Future versions may want to distinguish between replicas with hot standby
     471              :     /// feedback and other kinds of replication configurations.
     472              :     Replica,
     473              : }
     474              : 
     475              : impl ComputeMode {
     476              :     /// Convert the compute mode to a string that can be used to identify the type of compute,
     477              :     /// which means that if it's a static compute, the LSN will not be included.
     478            0 :     pub fn to_type_str(&self) -> &'static str {
     479            0 :         match self {
     480            0 :             ComputeMode::Primary => "primary",
     481            0 :             ComputeMode::Static(_) => "static",
     482            0 :             ComputeMode::Replica => "replica",
     483              :         }
     484            0 :     }
     485              : }
     486              : 
     487              : impl Display for ComputeMode {
     488            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     489            0 :         f.write_str(self.to_type_str())
     490            0 :     }
     491              : }
     492              : 
     493              : /// Log level for audit logging
     494            0 : #[derive(Clone, Debug, Default, Eq, PartialEq, Deserialize, Serialize)]
     495              : pub enum ComputeAudit {
     496              :     #[default]
     497              :     Disabled,
     498              :     // Deprecated, use Base instead
     499              :     Log,
     500              :     // (pgaudit.log = 'ddl', pgaudit.log_parameter='off')
     501              :     // logged to the standard postgresql log stream
     502              :     Base,
     503              :     // Deprecated, use Full or Extended instead
     504              :     Hipaa,
     505              :     // (pgaudit.log = 'all, -misc', pgaudit.log_parameter='off')
     506              :     // logged to separate files collected by rsyslog
     507              :     // into dedicated log storage with strict access
     508              :     Extended,
     509              :     // (pgaudit.log='all', pgaudit.log_parameter='on'),
     510              :     // logged to separate files collected by rsyslog
     511              :     // into dedicated log storage with strict access.
     512              :     Full,
     513              : }
     514              : 
     515            0 : #[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
     516              : pub struct Cluster {
     517              :     pub cluster_id: Option<String>,
     518              :     pub name: Option<String>,
     519              :     pub state: Option<String>,
     520              :     pub roles: Vec<Role>,
     521              :     pub databases: Vec<Database>,
     522              : 
     523              :     /// Desired contents of 'postgresql.conf' file. (The 'compute_ctl'
     524              :     /// tool may add additional settings to the final file.)
     525              :     pub postgresql_conf: Option<String>,
     526              : 
     527              :     /// Additional settings that will be appended to the 'postgresql.conf' file.
     528              :     pub settings: GenericOptions,
     529              : }
     530              : 
     531              : /// Single cluster state changing operation that could not be represented as
     532              : /// a static `Cluster` structure. For example:
     533              : /// - DROP DATABASE
     534              : /// - DROP ROLE
     535              : /// - ALTER ROLE name RENAME TO new_name
     536              : /// - ALTER DATABASE name RENAME TO new_name
     537            0 : #[derive(Clone, Debug, Deserialize, Serialize)]
     538              : pub struct DeltaOp {
     539              :     pub action: String,
     540              :     pub name: PgIdent,
     541              :     pub new_name: Option<PgIdent>,
     542              : }
     543              : 
     544              : /// Rust representation of Postgres role info with only those fields
     545              : /// that matter for us.
     546            0 : #[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
     547              : pub struct Role {
     548              :     pub name: PgIdent,
     549              :     pub encrypted_password: Option<String>,
     550              :     pub options: GenericOptions,
     551              : }
     552              : 
     553              : /// Rust representation of Postgres database info with only those fields
     554              : /// that matter for us.
     555            0 : #[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
     556              : pub struct Database {
     557              :     pub name: PgIdent,
     558              :     pub owner: PgIdent,
     559              :     pub options: GenericOptions,
     560              :     // These are derived flags, not present in the spec file.
     561              :     // They are never set by the control plane.
     562              :     #[serde(skip_deserializing, default)]
     563              :     pub restrict_conn: bool,
     564              :     #[serde(skip_deserializing, default)]
     565              :     pub invalid: bool,
     566              : }
     567              : 
     568              : /// Common type representing both SQL statement params with or without value,
     569              : /// like `LOGIN` or `OWNER username` in the `CREATE/ALTER ROLE`, and config
     570              : /// options like `wal_level = logical`.
     571            0 : #[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
     572              : pub struct GenericOption {
     573              :     pub name: String,
     574              :     pub value: Option<String>,
     575              :     pub vartype: String,
     576              : }
     577              : 
     578              : /// Postgres compute TLS settings.
     579            0 : #[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
     580              : pub struct PgComputeTlsSettings {
     581              :     // Absolute path to the certificate file for server-side TLS.
     582              :     pub cert_file: String,
     583              :     // Absolute path to the private key file for server-side TLS.
     584              :     pub key_file: String,
     585              :     // Absolute path to the certificate authority file for verifying client certificates.
     586              :     pub ca_file: String,
     587              : }
     588              : 
     589              : /// Databricks specific options for compute instance.
     590              : /// This is used to store any other settings that needs to be propagate to Compute
     591              : /// but should not be persisted to ComputeSpec in the database.
     592            0 : #[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
     593              : pub struct DatabricksSettings {
     594              :     pub pg_compute_tls_settings: PgComputeTlsSettings,
     595              :     // Absolute file path to databricks_pg_hba.conf file.
     596              :     pub databricks_pg_hba: String,
     597              :     // Absolute file path to databricks_pg_ident.conf file.
     598              :     pub databricks_pg_ident: String,
     599              :     // Hostname portion of the Databricks workspace URL of the endpoint, or empty string if not known.
     600              :     // A valid hostname is required for the compute instance to support PAT logins.
     601              :     pub databricks_workspace_host: String,
     602              : }
     603              : 
     604              : /// Optional collection of `GenericOption`'s. Type alias allows us to
     605              : /// declare a `trait` on it.
     606              : pub type GenericOptions = Option<Vec<GenericOption>>;
     607              : 
     608              : /// Configured the local_proxy application with the relevant JWKS and roles it should
     609              : /// use for authorizing connect requests using JWT.
     610            0 : #[derive(Clone, Debug, Deserialize, Serialize)]
     611              : pub struct LocalProxySpec {
     612              :     #[serde(default)]
     613              :     #[serde(skip_serializing_if = "Option::is_none")]
     614              :     pub jwks: Option<Vec<JwksSettings>>,
     615              :     #[serde(default)]
     616              :     #[serde(skip_serializing_if = "Option::is_none")]
     617              :     pub tls: Option<TlsConfig>,
     618              : }
     619              : 
     620            0 : #[derive(Clone, Debug, Deserialize, Serialize)]
     621              : pub struct JwksSettings {
     622              :     pub id: String,
     623              :     pub role_names: Vec<String>,
     624              :     pub jwks_url: String,
     625              :     pub provider_name: String,
     626              :     pub jwt_audience: Option<String>,
     627              : }
     628              : 
     629              : /// Protocol used to connect to a Pageserver.
     630            0 : #[derive(Clone, Copy, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
     631              : pub enum PageserverProtocol {
     632              :     /// The original protocol based on libpq and COPY. Uses postgresql:// or postgres:// scheme.
     633              :     #[default]
     634              :     #[serde(rename = "libpq")]
     635              :     Libpq,
     636              :     /// A newer, gRPC-based protocol. Uses grpc:// scheme.
     637              :     #[serde(rename = "grpc")]
     638              :     Grpc,
     639              : }
     640              : 
     641              : impl PageserverProtocol {
     642              :     /// Parses the protocol from a connstring scheme. Defaults to Libpq if no scheme is given.
     643              :     /// Errors if the connstring is an invalid URL.
     644            0 :     pub fn from_connstring(connstring: &str) -> anyhow::Result<Self> {
     645            0 :         let scheme = match Url::parse(connstring) {
     646            0 :             Ok(url) => url.scheme().to_lowercase(),
     647            0 :             Err(url::ParseError::RelativeUrlWithoutBase) => return Ok(Self::default()),
     648            0 :             Err(err) => return Err(anyhow!("invalid connstring URL: {err}")),
     649              :         };
     650            0 :         match scheme.as_str() {
     651            0 :             "postgresql" | "postgres" => Ok(Self::Libpq),
     652            0 :             "grpc" => Ok(Self::Grpc),
     653            0 :             scheme => Err(anyhow!("invalid protocol scheme: {scheme}")),
     654              :         }
     655            0 :     }
     656              : 
     657              :     /// Returns the URL scheme for the protocol, for use in connstrings.
     658            0 :     pub fn scheme(&self) -> &'static str {
     659            0 :         match self {
     660            0 :             Self::Libpq => "postgresql",
     661            0 :             Self::Grpc => "grpc",
     662              :         }
     663            0 :     }
     664              : }
     665              : 
     666              : impl Display for PageserverProtocol {
     667            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     668            0 :         f.write_str(self.scheme())
     669            0 :     }
     670              : }
     671              : 
     672              : #[cfg(test)]
     673              : mod tests {
     674              :     use std::fs::File;
     675              : 
     676              :     use super::*;
     677              : 
     678              :     #[test]
     679            1 :     fn allow_installing_remote_extensions() {
     680            1 :         let rspec: RemoteExtSpec = serde_json::from_value(serde_json::json!({
     681            1 :             "public_extensions": null,
     682            1 :             "custom_extensions": null,
     683            1 :             "library_index": {},
     684            1 :             "extension_data": {},
     685              :         }))
     686            1 :         .unwrap();
     687              : 
     688            1 :         rspec
     689            1 :             .get_ext("ext", false, "latest", "v17")
     690            1 :             .expect_err("Extension should not be found");
     691              : 
     692            1 :         let rspec: RemoteExtSpec = serde_json::from_value(serde_json::json!({
     693            1 :             "public_extensions": [],
     694            1 :             "custom_extensions": null,
     695            1 :             "library_index": {},
     696            1 :             "extension_data": {},
     697              :         }))
     698            1 :         .unwrap();
     699              : 
     700            1 :         rspec
     701            1 :             .get_ext("ext", false, "latest", "v17")
     702            1 :             .expect_err("Extension should not be found");
     703              : 
     704            1 :         let rspec: RemoteExtSpec = serde_json::from_value(serde_json::json!({
     705            1 :             "public_extensions": [],
     706            1 :             "custom_extensions": [],
     707            1 :             "library_index": {
     708            1 :                 "ext": "ext"
     709              :             },
     710            1 :             "extension_data": {
     711            1 :                 "ext": {
     712            1 :                     "control_data": {
     713            1 :                         "ext.control": ""
     714              :                     },
     715            1 :                     "archive_path": ""
     716              :                 }
     717              :             },
     718              :         }))
     719            1 :         .unwrap();
     720              : 
     721            1 :         rspec
     722            1 :             .get_ext("ext", false, "latest", "v17")
     723            1 :             .expect_err("Extension should not be found");
     724              : 
     725            1 :         let rspec: RemoteExtSpec = serde_json::from_value(serde_json::json!({
     726            1 :             "public_extensions": [],
     727            1 :             "custom_extensions": ["ext"],
     728            1 :             "library_index": {
     729            1 :                 "ext": "ext"
     730              :             },
     731            1 :             "extension_data": {
     732            1 :                 "ext": {
     733            1 :                     "control_data": {
     734            1 :                         "ext.control": ""
     735              :                     },
     736            1 :                     "archive_path": ""
     737              :                 }
     738              :             },
     739              :         }))
     740            1 :         .unwrap();
     741              : 
     742            1 :         rspec
     743            1 :             .get_ext("ext", false, "latest", "v17")
     744            1 :             .expect("Extension should be found");
     745              : 
     746            1 :         let rspec: RemoteExtSpec = serde_json::from_value(serde_json::json!({
     747            1 :             "public_extensions": ["ext"],
     748            1 :             "custom_extensions": [],
     749            1 :             "library_index": {
     750            1 :                 "extlib": "ext",
     751              :             },
     752            1 :             "extension_data": {
     753            1 :                 "ext": {
     754            1 :                     "control_data": {
     755            1 :                         "ext.control": ""
     756              :                     },
     757            1 :                     "archive_path": ""
     758              :                 }
     759              :             },
     760              :         }))
     761            1 :         .unwrap();
     762              : 
     763            1 :         rspec
     764            1 :             .get_ext("ext", false, "latest", "v17")
     765            1 :             .expect("Extension should be found");
     766              : 
     767              :         // test library index for the case when library name
     768              :         // doesn't match the extension name
     769            1 :         rspec
     770            1 :             .get_ext("extlib", true, "latest", "v17")
     771            1 :             .expect("Library should be found");
     772            1 :     }
     773              : 
     774              :     #[test]
     775            1 :     fn remote_extension_path() {
     776            1 :         let rspec: RemoteExtSpec = serde_json::from_value(serde_json::json!({
     777            1 :             "public_extensions": ["ext"],
     778            1 :             "custom_extensions": [],
     779            1 :             "library_index": {
     780            1 :                 "extlib": "ext",
     781              :             },
     782            1 :             "extension_data": {
     783            1 :                 "ext": {
     784            1 :                     "control_data": {
     785            1 :                         "ext.control": ""
     786              :                     },
     787            1 :                     "archive_path": ""
     788              :                 }
     789              :             },
     790              :         }))
     791            1 :         .unwrap();
     792              : 
     793            1 :         let (_ext_name, ext_path) = rspec
     794            1 :             .get_ext("ext", false, "latest", "v17")
     795            1 :             .expect("Extension should be found");
     796              :         // Starting with a forward slash would have consequences for the
     797              :         // Url::join() that occurs when downloading a remote extension.
     798            1 :         assert!(!ext_path.to_string().starts_with("/"));
     799            1 :         assert_eq!(
     800              :             ext_path,
     801            1 :             RemoteExtSpec::build_remote_path("latest", "v17", "ext").unwrap()
     802              :         );
     803            1 :     }
     804              : 
     805              :     #[test]
     806            1 :     fn parse_spec_file() {
     807            1 :         let file = File::open("tests/cluster_spec.json").unwrap();
     808            1 :         let spec: ComputeSpec = serde_json::from_reader(file).unwrap();
     809              : 
     810              :         // Features list defaults to empty vector.
     811            1 :         assert!(spec.features.is_empty());
     812              : 
     813              :         // Reconfigure concurrency defaults to 1.
     814            1 :         assert_eq!(spec.reconfigure_concurrency, 1);
     815            1 :     }
     816              : 
     817              :     #[test]
     818            1 :     fn parse_unknown_fields() {
     819              :         // Forward compatibility test
     820            1 :         let file = File::open("tests/cluster_spec.json").unwrap();
     821            1 :         let mut json: serde_json::Value = serde_json::from_reader(file).unwrap();
     822            1 :         let ob = json.as_object_mut().unwrap();
     823            1 :         ob.insert("unknown_field_123123123".into(), "hello".into());
     824            1 :         let _spec: ComputeSpec = serde_json::from_value(json).unwrap();
     825            1 :     }
     826              : 
     827              :     #[test]
     828            1 :     fn parse_unknown_features() {
     829              :         // Test that unknown feature flags do not cause any errors.
     830            1 :         let file = File::open("tests/cluster_spec.json").unwrap();
     831            1 :         let mut json: serde_json::Value = serde_json::from_reader(file).unwrap();
     832            1 :         let ob = json.as_object_mut().unwrap();
     833              : 
     834              :         // Add unknown feature flags.
     835            1 :         let features = vec!["foo_bar_feature", "baz_feature"];
     836            1 :         ob.insert("features".into(), features.into());
     837              : 
     838            1 :         let spec: ComputeSpec = serde_json::from_value(json).unwrap();
     839              : 
     840            1 :         assert!(spec.features.len() == 2);
     841            1 :         assert!(spec.features.contains(&ComputeFeature::UnknownFeature));
     842            1 :         assert_eq!(spec.features, vec![ComputeFeature::UnknownFeature; 2]);
     843            1 :     }
     844              : 
     845              :     #[test]
     846            1 :     fn parse_known_features() {
     847              :         // Test that we can properly parse known feature flags.
     848            1 :         let file = File::open("tests/cluster_spec.json").unwrap();
     849            1 :         let mut json: serde_json::Value = serde_json::from_reader(file).unwrap();
     850            1 :         let ob = json.as_object_mut().unwrap();
     851              : 
     852              :         // Add known feature flags.
     853            1 :         let features = vec!["activity_monitor_experimental"];
     854            1 :         ob.insert("features".into(), features.into());
     855              : 
     856            1 :         let spec: ComputeSpec = serde_json::from_value(json).unwrap();
     857              : 
     858            1 :         assert_eq!(
     859              :             spec.features,
     860            1 :             vec![ComputeFeature::ActivityMonitorExperimental]
     861              :         );
     862            1 :     }
     863              : }
        

Generated by: LCOV version 2.1-beta