LCOV - code coverage report
Current view: top level - libs/compute_api/src - spec.rs (source / functions) Coverage Total Hit
Test: 727bdccc1d7d53837da843959afb612f56da4e79.info Lines: 57.0 % 107 61
Test Date: 2025-01-30 15:18:43 Functions: 9.1 % 242 22

            Line data    Source code
       1              : //! `ComputeSpec` represents the contents of the spec.json file.
       2              : //!
       3              : //! The spec.json file is used to pass information to 'compute_ctl'. It contains
       4              : //! all the information needed to start up the right version of PostgreSQL,
       5              : //! and connect it to the storage nodes.
       6              : use std::collections::HashMap;
       7              : 
       8              : use serde::{Deserialize, Serialize};
       9              : use utils::id::{TenantId, TimelineId};
      10              : use utils::lsn::Lsn;
      11              : 
      12              : use regex::Regex;
      13              : use remote_storage::RemotePath;
      14              : 
      15              : /// String type alias representing Postgres identifier and
      16              : /// intended to be used for DB / role names.
      17              : pub type PgIdent = String;
      18              : 
      19              : /// String type alias representing Postgres extension version
      20              : pub type ExtVersion = String;
      21              : 
      22            6 : fn default_reconfigure_concurrency() -> usize {
      23            6 :     1
      24            6 : }
      25              : 
      26              : /// Cluster spec or configuration represented as an optional number of
      27              : /// delta operations + final cluster state description.
      28           45 : #[derive(Clone, Debug, Default, Deserialize, Serialize)]
      29              : pub struct ComputeSpec {
      30              :     pub format_version: f32,
      31              : 
      32              :     // The control plane also includes a 'timestamp' field in the JSON document,
      33              :     // but we don't use it for anything. Serde will ignore missing fields when
      34              :     // deserializing it.
      35              :     pub operation_uuid: Option<String>,
      36              : 
      37              :     /// Compute features to enable. These feature flags are provided, when we
      38              :     /// know all the details about client's compute, so they cannot be used
      39              :     /// to change `Empty` compute behavior.
      40              :     #[serde(default)]
      41              :     pub features: Vec<ComputeFeature>,
      42              : 
      43              :     /// If compute_ctl was passed `--resize-swap-on-bind`, a value of `Some(_)` instructs
      44              :     /// compute_ctl to `/neonvm/bin/resize-swap` with the given size, when the spec is first
      45              :     /// received.
      46              :     ///
      47              :     /// Both this field and `--resize-swap-on-bind` are required, so that the control plane's
      48              :     /// spec generation doesn't need to be aware of the actual compute it's running on, while
      49              :     /// guaranteeing gradual rollout of swap. Otherwise, without `--resize-swap-on-bind`, we could
      50              :     /// end up trying to resize swap in VMs without it -- or end up *not* resizing swap, thus
      51              :     /// giving every VM much more swap than it should have (32GiB).
      52              :     ///
      53              :     /// Eventually we may remove `--resize-swap-on-bind` and exclusively use `swap_size_bytes` for
      54              :     /// enabling the swap resizing behavior once rollout is complete.
      55              :     ///
      56              :     /// See neondatabase/cloud#12047 for more.
      57              :     #[serde(default)]
      58              :     pub swap_size_bytes: Option<u64>,
      59              : 
      60              :     /// If compute_ctl was passed `--set-disk-quota-for-fs`, a value of `Some(_)` instructs
      61              :     /// compute_ctl to run `/neonvm/bin/set-disk-quota` with the given size and fs, when the
      62              :     /// spec is first received.
      63              :     ///
      64              :     /// Both this field and `--set-disk-quota-for-fs` are required, so that the control plane's
      65              :     /// spec generation doesn't need to be aware of the actual compute it's running on, while
      66              :     /// guaranteeing gradual rollout of disk quota.
      67              :     #[serde(default)]
      68              :     pub disk_quota_bytes: Option<u64>,
      69              : 
      70              :     /// Disables the vm-monitor behavior that resizes LFC on upscale/downscale, instead relying on
      71              :     /// the initial size of LFC.
      72              :     ///
      73              :     /// This is intended for use when the LFC size is being overridden from the default but
      74              :     /// autoscaling is still enabled, and we don't want the vm-monitor to interfere with the custom
      75              :     /// LFC sizing.
      76              :     #[serde(default)]
      77              :     pub disable_lfc_resizing: Option<bool>,
      78              : 
      79              :     /// Expected cluster state at the end of transition process.
      80              :     pub cluster: Cluster,
      81              :     pub delta_operations: Option<Vec<DeltaOp>>,
      82              : 
      83              :     /// An optional hint that can be passed to speed up startup time if we know
      84              :     /// that no pg catalog mutations (like role creation, database creation,
      85              :     /// extension creation) need to be done on the actual database to start.
      86              :     #[serde(default)] // Default false
      87              :     pub skip_pg_catalog_updates: bool,
      88              : 
      89              :     // Information needed to connect to the storage layer.
      90              :     //
      91              :     // `tenant_id`, `timeline_id` and `pageserver_connstring` are always needed.
      92              :     //
      93              :     // Depending on `mode`, this can be a primary read-write node, a read-only
      94              :     // replica, or a read-only node pinned at an older LSN.
      95              :     // `safekeeper_connstrings` must be set for a primary.
      96              :     //
      97              :     // For backwards compatibility, the control plane may leave out all of
      98              :     // these, and instead set the "neon.tenant_id", "neon.timeline_id",
      99              :     // etc. GUCs in cluster.settings. TODO: Once the control plane has been
     100              :     // updated to fill these fields, we can make these non optional.
     101              :     pub tenant_id: Option<TenantId>,
     102              :     pub timeline_id: Option<TimelineId>,
     103              :     pub pageserver_connstring: Option<String>,
     104              : 
     105              :     #[serde(default)]
     106              :     pub safekeeper_connstrings: Vec<String>,
     107              : 
     108              :     #[serde(default)]
     109              :     pub mode: ComputeMode,
     110              : 
     111              :     /// If set, 'storage_auth_token' is used as the password to authenticate to
     112              :     /// the pageserver and safekeepers.
     113              :     pub storage_auth_token: Option<String>,
     114              : 
     115              :     // information about available remote extensions
     116              :     pub remote_extensions: Option<RemoteExtSpec>,
     117              : 
     118              :     pub pgbouncer_settings: Option<HashMap<String, String>>,
     119              : 
     120              :     // Stripe size for pageserver sharding, in pages
     121              :     #[serde(default)]
     122              :     pub shard_stripe_size: Option<usize>,
     123              : 
     124              :     /// Local Proxy configuration used for JWT authentication
     125              :     #[serde(default)]
     126              :     pub local_proxy_config: Option<LocalProxySpec>,
     127              : 
     128              :     /// Number of concurrent connections during the parallel RunInEachDatabase
     129              :     /// phase of the apply config process.
     130              :     ///
     131              :     /// We need a higher concurrency during reconfiguration in case of many DBs,
     132              :     /// but instance is already running and used by client. We can easily get out of
     133              :     /// `max_connections` limit, and the current code won't handle that.
     134              :     ///
     135              :     /// Default is 1, but also allow control plane to override this value for specific
     136              :     /// projects. It's also recommended to bump `superuser_reserved_connections` +=
     137              :     /// `reconfigure_concurrency` for such projects to ensure that we always have
     138              :     /// enough spare connections for reconfiguration process to succeed.
     139              :     #[serde(default = "default_reconfigure_concurrency")]
     140              :     pub reconfigure_concurrency: usize,
     141              : 
     142              :     /// If set to true, the compute_ctl will drop all subscriptions before starting the
     143              :     /// compute. This is needed when we start an endpoint on a branch, so that child
     144              :     /// would not compete with parent branch subscriptions
     145              :     /// over the same replication content from publisher.
     146              :     #[serde(default)] // Default false
     147              :     pub drop_subscriptions_before_start: bool,
     148              : }
     149              : 
     150              : /// Feature flag to signal `compute_ctl` to enable certain experimental functionality.
     151            3 : #[derive(Serialize, Clone, Copy, Debug, Deserialize, PartialEq, Eq)]
     152              : #[serde(rename_all = "snake_case")]
     153              : pub enum ComputeFeature {
     154              :     // XXX: Add more feature flags here.
     155              :     /// Enable the experimental activity monitor logic, which uses `pg_stat_database` to
     156              :     /// track short-lived connections as user activity.
     157              :     ActivityMonitorExperimental,
     158              : 
     159              :     /// Pre-install and initialize anon extension for every database in the cluster
     160              :     AnonExtension,
     161              : 
     162              :     /// This is a special feature flag that is used to represent unknown feature flags.
     163              :     /// Basically all unknown to enum flags are represented as this one. See unit test
     164              :     /// `parse_unknown_features()` for more details.
     165              :     #[serde(other)]
     166              :     UnknownFeature,
     167              : }
     168              : 
     169           24 : #[derive(Clone, Debug, Default, Deserialize, Serialize)]
     170              : pub struct RemoteExtSpec {
     171              :     pub public_extensions: Option<Vec<String>>,
     172              :     pub custom_extensions: Option<Vec<String>>,
     173              :     pub library_index: HashMap<String, String>,
     174              :     pub extension_data: HashMap<String, ExtensionData>,
     175              : }
     176              : 
     177           24 : #[derive(Clone, Debug, Serialize, Deserialize)]
     178              : pub struct ExtensionData {
     179              :     pub control_data: HashMap<String, String>,
     180              :     pub archive_path: String,
     181              : }
     182              : 
     183              : impl RemoteExtSpec {
     184            0 :     pub fn get_ext(
     185            0 :         &self,
     186            0 :         ext_name: &str,
     187            0 :         is_library: bool,
     188            0 :         build_tag: &str,
     189            0 :         pg_major_version: &str,
     190            0 :     ) -> anyhow::Result<(String, RemotePath)> {
     191            0 :         let mut real_ext_name = ext_name;
     192            0 :         if is_library {
     193              :             // sometimes library names might have a suffix like
     194              :             // library.so or library.so.3. We strip this off
     195              :             // because library_index is based on the name without the file extension
     196            0 :             let strip_lib_suffix = Regex::new(r"\.so.*").unwrap();
     197            0 :             let lib_raw_name = strip_lib_suffix.replace(real_ext_name, "").to_string();
     198            0 : 
     199            0 :             real_ext_name = self
     200            0 :                 .library_index
     201            0 :                 .get(&lib_raw_name)
     202            0 :                 .ok_or(anyhow::anyhow!("library {} is not found", lib_raw_name))?;
     203            0 :         }
     204              : 
     205              :         // Check if extension is present in public or custom.
     206              :         // If not, then it is not allowed to be used by this compute.
     207            0 :         if let Some(public_extensions) = &self.public_extensions {
     208            0 :             if !public_extensions.contains(&real_ext_name.to_string()) {
     209            0 :                 if let Some(custom_extensions) = &self.custom_extensions {
     210            0 :                     if !custom_extensions.contains(&real_ext_name.to_string()) {
     211            0 :                         return Err(anyhow::anyhow!("extension {} is not found", real_ext_name));
     212            0 :                     }
     213            0 :                 }
     214            0 :             }
     215            0 :         }
     216              : 
     217            0 :         match self.extension_data.get(real_ext_name) {
     218            0 :             Some(_ext_data) => {
     219            0 :                 // Construct the path to the extension archive
     220            0 :                 // BUILD_TAG/PG_MAJOR_VERSION/extensions/EXTENSION_NAME.tar.zst
     221            0 :                 //
     222            0 :                 // Keep it in sync with path generation in
     223            0 :                 // https://github.com/neondatabase/build-custom-extensions/tree/main
     224            0 :                 let archive_path_str =
     225            0 :                     format!("{build_tag}/{pg_major_version}/extensions/{real_ext_name}.tar.zst");
     226            0 :                 Ok((
     227            0 :                     real_ext_name.to_string(),
     228            0 :                     RemotePath::from_string(&archive_path_str)?,
     229              :                 ))
     230              :             }
     231            0 :             None => Err(anyhow::anyhow!(
     232            0 :                 "real_ext_name {} is not found",
     233            0 :                 real_ext_name
     234            0 :             )),
     235              :         }
     236            0 :     }
     237              : }
     238              : 
     239            0 : #[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Deserialize, Serialize)]
     240              : pub enum ComputeMode {
     241              :     /// A read-write node
     242              :     #[default]
     243              :     Primary,
     244              :     /// A read-only node, pinned at a particular LSN
     245              :     Static(Lsn),
     246              :     /// A read-only node that follows the tip of the branch in hot standby mode
     247              :     ///
     248              :     /// Future versions may want to distinguish between replicas with hot standby
     249              :     /// feedback and other kinds of replication configurations.
     250              :     Replica,
     251              : }
     252              : 
     253           36 : #[derive(Clone, Debug, Default, Deserialize, Serialize)]
     254              : pub struct Cluster {
     255              :     pub cluster_id: Option<String>,
     256              :     pub name: Option<String>,
     257              :     pub state: Option<String>,
     258              :     pub roles: Vec<Role>,
     259              :     pub databases: Vec<Database>,
     260              : 
     261              :     /// Desired contents of 'postgresql.conf' file. (The 'compute_ctl'
     262              :     /// tool may add additional settings to the final file.)
     263              :     pub postgresql_conf: Option<String>,
     264              : 
     265              :     /// Additional settings that will be appended to the 'postgresql.conf' file.
     266              :     pub settings: GenericOptions,
     267              : }
     268              : 
     269              : /// Single cluster state changing operation that could not be represented as
     270              : /// a static `Cluster` structure. For example:
     271              : /// - DROP DATABASE
     272              : /// - DROP ROLE
     273              : /// - ALTER ROLE name RENAME TO new_name
     274              : /// - ALTER DATABASE name RENAME TO new_name
     275           60 : #[derive(Clone, Debug, Deserialize, Serialize)]
     276              : pub struct DeltaOp {
     277              :     pub action: String,
     278              :     pub name: PgIdent,
     279              :     pub new_name: Option<PgIdent>,
     280              : }
     281              : 
     282              : /// Rust representation of Postgres role info with only those fields
     283              : /// that matter for us.
     284           90 : #[derive(Clone, Debug, Deserialize, Serialize)]
     285              : pub struct Role {
     286              :     pub name: PgIdent,
     287              :     pub encrypted_password: Option<String>,
     288              :     pub options: GenericOptions,
     289              : }
     290              : 
     291              : /// Rust representation of Postgres database info with only those fields
     292              : /// that matter for us.
     293           42 : #[derive(Clone, Debug, Deserialize, Serialize)]
     294              : pub struct Database {
     295              :     pub name: PgIdent,
     296              :     pub owner: PgIdent,
     297              :     pub options: GenericOptions,
     298              :     // These are derived flags, not present in the spec file.
     299              :     // They are never set by the control plane.
     300              :     #[serde(skip_deserializing, default)]
     301              :     pub restrict_conn: bool,
     302              :     #[serde(skip_deserializing, default)]
     303              :     pub invalid: bool,
     304              : }
     305              : 
     306              : /// Common type representing both SQL statement params with or without value,
     307              : /// like `LOGIN` or `OWNER username` in the `CREATE/ALTER ROLE`, and config
     308              : /// options like `wal_level = logical`.
     309          468 : #[derive(Clone, Debug, Deserialize, Serialize)]
     310              : pub struct GenericOption {
     311              :     pub name: String,
     312              :     pub value: Option<String>,
     313              :     pub vartype: String,
     314              : }
     315              : 
     316              : /// Optional collection of `GenericOption`'s. Type alias allows us to
     317              : /// declare a `trait` on it.
     318              : pub type GenericOptions = Option<Vec<GenericOption>>;
     319              : 
     320              : /// Configured the local_proxy application with the relevant JWKS and roles it should
     321              : /// use for authorizing connect requests using JWT.
     322            0 : #[derive(Clone, Debug, Deserialize, Serialize)]
     323              : pub struct LocalProxySpec {
     324              :     #[serde(default)]
     325              :     #[serde(skip_serializing_if = "Option::is_none")]
     326              :     pub jwks: Option<Vec<JwksSettings>>,
     327              : }
     328              : 
     329            0 : #[derive(Clone, Debug, Deserialize, Serialize)]
     330              : pub struct JwksSettings {
     331              :     pub id: String,
     332              :     pub role_names: Vec<String>,
     333              :     pub jwks_url: String,
     334              :     pub provider_name: String,
     335              :     pub jwt_audience: Option<String>,
     336              : }
     337              : 
     338              : #[cfg(test)]
     339              : mod tests {
     340              :     use super::*;
     341              :     use std::fs::File;
     342              : 
     343              :     #[test]
     344            1 :     fn parse_spec_file() {
     345            1 :         let file = File::open("tests/cluster_spec.json").unwrap();
     346            1 :         let spec: ComputeSpec = serde_json::from_reader(file).unwrap();
     347            1 : 
     348            1 :         // Features list defaults to empty vector.
     349            1 :         assert!(spec.features.is_empty());
     350              : 
     351              :         // Reconfigure concurrency defaults to 1.
     352            1 :         assert_eq!(spec.reconfigure_concurrency, 1);
     353            1 :     }
     354              : 
     355              :     #[test]
     356            1 :     fn parse_unknown_fields() {
     357            1 :         // Forward compatibility test
     358            1 :         let file = File::open("tests/cluster_spec.json").unwrap();
     359            1 :         let mut json: serde_json::Value = serde_json::from_reader(file).unwrap();
     360            1 :         let ob = json.as_object_mut().unwrap();
     361            1 :         ob.insert("unknown_field_123123123".into(), "hello".into());
     362            1 :         let _spec: ComputeSpec = serde_json::from_value(json).unwrap();
     363            1 :     }
     364              : 
     365              :     #[test]
     366            1 :     fn parse_unknown_features() {
     367            1 :         // Test that unknown feature flags do not cause any errors.
     368            1 :         let file = File::open("tests/cluster_spec.json").unwrap();
     369            1 :         let mut json: serde_json::Value = serde_json::from_reader(file).unwrap();
     370            1 :         let ob = json.as_object_mut().unwrap();
     371            1 : 
     372            1 :         // Add unknown feature flags.
     373            1 :         let features = vec!["foo_bar_feature", "baz_feature"];
     374            1 :         ob.insert("features".into(), features.into());
     375            1 : 
     376            1 :         let spec: ComputeSpec = serde_json::from_value(json).unwrap();
     377            1 : 
     378            1 :         assert!(spec.features.len() == 2);
     379            1 :         assert!(spec.features.contains(&ComputeFeature::UnknownFeature));
     380            1 :         assert_eq!(spec.features, vec![ComputeFeature::UnknownFeature; 2]);
     381            1 :     }
     382              : 
     383              :     #[test]
     384            1 :     fn parse_known_features() {
     385            1 :         // Test that we can properly parse known feature flags.
     386            1 :         let file = File::open("tests/cluster_spec.json").unwrap();
     387            1 :         let mut json: serde_json::Value = serde_json::from_reader(file).unwrap();
     388            1 :         let ob = json.as_object_mut().unwrap();
     389            1 : 
     390            1 :         // Add known feature flags.
     391            1 :         let features = vec!["activity_monitor_experimental"];
     392            1 :         ob.insert("features".into(), features.into());
     393            1 : 
     394            1 :         let spec: ComputeSpec = serde_json::from_value(json).unwrap();
     395            1 : 
     396            1 :         assert_eq!(
     397            1 :             spec.features,
     398            1 :             vec![ComputeFeature::ActivityMonitorExperimental]
     399            1 :         );
     400            1 :     }
     401              : }
        

Generated by: LCOV version 2.1-beta