LCOV - code coverage report
Current view: top level - proxy/src/control_plane - mod.rs (source / functions) Coverage Total Hit
Test: 553e39c2773e5840c720c90d86e56f89a4330d43.info Lines: 48.6 % 70 34
Test Date: 2025-06-13 20:01:21 Functions: 40.0 % 5 2

            Line data    Source code
       1              : //! Various stuff for dealing with the Neon Console.
       2              : //! Later we might move some API wrappers here.
       3              : 
       4              : /// Payloads used in the console's APIs.
       5              : pub mod messages;
       6              : 
       7              : /// Wrappers for console APIs and their mocks.
       8              : pub mod client;
       9              : 
      10              : pub(crate) mod errors;
      11              : 
      12              : use std::sync::Arc;
      13              : 
      14              : use messages::EndpointRateLimitConfig;
      15              : 
      16              : use crate::auth::backend::ComputeUserInfo;
      17              : use crate::auth::backend::jwt::AuthRule;
      18              : use crate::auth::{AuthError, IpPattern, check_peer_addr_is_in_list};
      19              : use crate::cache::{Cached, TimedLru};
      20              : use crate::config::ComputeConfig;
      21              : use crate::context::RequestContext;
      22              : use crate::control_plane::messages::{ControlPlaneErrorMessage, MetricsAuxInfo};
      23              : use crate::intern::{AccountIdInt, EndpointIdInt, ProjectIdInt};
      24              : use crate::protocol2::ConnectionInfoExtra;
      25              : use crate::rate_limiter::{EndpointRateLimiter, LeakyBucketConfig};
      26              : use crate::types::{EndpointCacheKey, EndpointId, RoleName};
      27              : use crate::{compute, scram};
      28              : 
      29              : /// Various cache-related types.
      30              : pub mod caches {
      31              :     pub use super::client::ApiCaches;
      32              : }
      33              : 
      34              : /// Various cache-related types.
      35              : pub mod locks {
      36              :     pub use super::client::ApiLocks;
      37              : }
      38              : 
      39              : /// Console's management API.
      40              : pub mod mgmt;
      41              : 
      42              : /// Auth secret which is managed by the cloud.
      43              : #[derive(Clone, Eq, PartialEq, Debug)]
      44              : pub(crate) enum AuthSecret {
      45              :     /// [SCRAM](crate::scram) authentication info.
      46              :     Scram(scram::ServerSecret),
      47              : }
      48              : 
      49              : #[derive(Default)]
      50              : pub(crate) struct AuthInfo {
      51              :     pub(crate) secret: Option<AuthSecret>,
      52              :     /// List of IP addresses allowed for the autorization.
      53              :     pub(crate) allowed_ips: Vec<IpPattern>,
      54              :     /// List of VPC endpoints allowed for the autorization.
      55              :     pub(crate) allowed_vpc_endpoint_ids: Vec<String>,
      56              :     /// Project ID. This is used for cache invalidation.
      57              :     pub(crate) project_id: Option<ProjectIdInt>,
      58              :     /// Account ID. This is used for cache invalidation.
      59              :     pub(crate) account_id: Option<AccountIdInt>,
      60              :     /// Are public connections or VPC connections blocked?
      61              :     pub(crate) access_blocker_flags: AccessBlockerFlags,
      62              :     /// The rate limits for this endpoint.
      63              :     pub(crate) rate_limits: EndpointRateLimitConfig,
      64              : }
      65              : 
      66              : /// Info for establishing a connection to a compute node.
      67              : #[derive(Clone)]
      68              : pub(crate) struct NodeInfo {
      69              :     pub(crate) conn_info: compute::ConnectInfo,
      70              : 
      71              :     /// Labels for proxy's metrics.
      72              :     pub(crate) aux: MetricsAuxInfo,
      73              : }
      74              : 
      75              : impl NodeInfo {
      76            0 :     pub(crate) async fn connect(
      77            0 :         &self,
      78            0 :         ctx: &RequestContext,
      79            0 :         auth: &compute::AuthInfo,
      80            0 :         config: &ComputeConfig,
      81            0 :         user_info: ComputeUserInfo,
      82            0 :     ) -> Result<compute::PostgresConnection, compute::ConnectionError> {
      83            0 :         self.conn_info
      84            0 :             .connect(ctx, self.aux.clone(), auth, config, user_info)
      85            0 :             .await
      86            0 :     }
      87              : }
      88              : 
      89              : #[derive(Copy, Clone, Default)]
      90              : pub(crate) struct AccessBlockerFlags {
      91              :     pub public_access_blocked: bool,
      92              :     pub vpc_access_blocked: bool,
      93              : }
      94              : 
      95              : pub(crate) type NodeInfoCache =
      96              :     TimedLru<EndpointCacheKey, Result<NodeInfo, Box<ControlPlaneErrorMessage>>>;
      97              : pub(crate) type CachedNodeInfo = Cached<&'static NodeInfoCache, NodeInfo>;
      98              : 
      99              : #[derive(Clone)]
     100              : pub struct RoleAccessControl {
     101              :     pub secret: Option<AuthSecret>,
     102              : }
     103              : 
     104              : #[derive(Clone)]
     105              : pub struct EndpointAccessControl {
     106              :     pub allowed_ips: Arc<Vec<IpPattern>>,
     107              :     pub allowed_vpce: Arc<Vec<String>>,
     108              :     pub flags: AccessBlockerFlags,
     109              : 
     110              :     pub rate_limits: EndpointRateLimitConfig,
     111              : }
     112              : 
     113              : impl EndpointAccessControl {
     114            3 :     pub fn check(
     115            3 :         &self,
     116            3 :         ctx: &RequestContext,
     117            3 :         check_ip_allowed: bool,
     118            3 :         check_vpc_allowed: bool,
     119            3 :     ) -> Result<(), AuthError> {
     120            3 :         if check_ip_allowed && !check_peer_addr_is_in_list(&ctx.peer_addr(), &self.allowed_ips) {
     121            0 :             return Err(AuthError::IpAddressNotAllowed(ctx.peer_addr()));
     122            3 :         }
     123            3 : 
     124            3 :         // check if a VPC endpoint ID is coming in and if yes, if it's allowed
     125            3 :         if check_vpc_allowed {
     126            0 :             if self.flags.vpc_access_blocked {
     127            0 :                 return Err(AuthError::NetworkNotAllowed);
     128            0 :             }
     129              : 
     130            0 :             let incoming_vpc_endpoint_id = match ctx.extra() {
     131            0 :                 None => return Err(AuthError::MissingVPCEndpointId),
     132            0 :                 Some(ConnectionInfoExtra::Aws { vpce_id }) => vpce_id.to_string(),
     133            0 :                 Some(ConnectionInfoExtra::Azure { link_id }) => link_id.to_string(),
     134              :             };
     135              : 
     136            0 :             let vpce = &self.allowed_vpce;
     137            0 :             // TODO: For now an empty VPC endpoint ID list means all are allowed. We should replace that.
     138            0 :             if !vpce.is_empty() && !vpce.contains(&incoming_vpc_endpoint_id) {
     139            0 :                 return Err(AuthError::vpc_endpoint_id_not_allowed(
     140            0 :                     incoming_vpc_endpoint_id,
     141            0 :                 ));
     142            0 :             }
     143            3 :         } else if self.flags.public_access_blocked {
     144            0 :             return Err(AuthError::NetworkNotAllowed);
     145            3 :         }
     146              : 
     147            3 :         Ok(())
     148            3 :     }
     149              : 
     150            3 :     pub fn connection_attempt_rate_limit(
     151            3 :         &self,
     152            3 :         ctx: &RequestContext,
     153            3 :         endpoint: &EndpointId,
     154            3 :         rate_limiter: &EndpointRateLimiter,
     155            3 :     ) -> Result<(), AuthError> {
     156            3 :         let endpoint = EndpointIdInt::from(endpoint);
     157            3 : 
     158            3 :         let limits = &self.rate_limits.connection_attempts;
     159            3 :         let config = match ctx.protocol() {
     160            0 :             crate::metrics::Protocol::Http => limits.http,
     161            0 :             crate::metrics::Protocol::Ws => limits.ws,
     162            3 :             crate::metrics::Protocol::Tcp => limits.tcp,
     163            0 :             crate::metrics::Protocol::SniRouter => return Ok(()),
     164              :         };
     165            3 :         let config = config.and_then(|config| {
     166            0 :             if config.rps <= 0.0 || config.burst <= 0.0 {
     167            0 :                 return None;
     168            0 :             }
     169            0 : 
     170            0 :             Some(LeakyBucketConfig::new(config.rps, config.burst))
     171            3 :         });
     172            3 : 
     173            3 :         if !rate_limiter.check(endpoint, config, 1) {
     174            0 :             return Err(AuthError::too_many_connections());
     175            3 :         }
     176            3 : 
     177            3 :         Ok(())
     178            3 :     }
     179              : }
     180              : 
     181              : /// This will allocate per each call, but the http requests alone
     182              : /// already require a few allocations, so it should be fine.
     183              : pub(crate) trait ControlPlaneApi {
     184              :     async fn get_role_access_control(
     185              :         &self,
     186              :         ctx: &RequestContext,
     187              :         endpoint: &EndpointId,
     188              :         role: &RoleName,
     189              :     ) -> Result<RoleAccessControl, errors::GetAuthInfoError>;
     190              : 
     191              :     async fn get_endpoint_access_control(
     192              :         &self,
     193              :         ctx: &RequestContext,
     194              :         endpoint: &EndpointId,
     195              :         role: &RoleName,
     196              :     ) -> Result<EndpointAccessControl, errors::GetAuthInfoError>;
     197              : 
     198              :     async fn get_endpoint_jwks(
     199              :         &self,
     200              :         ctx: &RequestContext,
     201              :         endpoint: &EndpointId,
     202              :     ) -> Result<Vec<AuthRule>, errors::GetEndpointJwksError>;
     203              : 
     204              :     /// Wake up the compute node and return the corresponding connection info.
     205              :     async fn wake_compute(
     206              :         &self,
     207              :         ctx: &RequestContext,
     208              :         user_info: &ComputeUserInfo,
     209              :     ) -> Result<CachedNodeInfo, errors::WakeComputeError>;
     210              : }
        

Generated by: LCOV version 2.1-beta