LCOV - code coverage report
Current view: top level - proxy/src/control_plane - mod.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 47.5 % 59 28
Test Date: 2025-07-16 12:29:03 Functions: 40.0 % 5 2

            Line data    Source code
       1              : //! Various stuff for dealing with the Neon Console.
       2              : //! Later we might move some API wrappers here.
       3              : 
       4              : /// Payloads used in the console's APIs.
       5              : pub mod messages;
       6              : 
       7              : /// Wrappers for console APIs and their mocks.
       8              : pub mod client;
       9              : 
      10              : pub(crate) mod errors;
      11              : 
      12              : use std::sync::Arc;
      13              : 
      14              : use messages::EndpointRateLimitConfig;
      15              : 
      16              : use crate::auth::backend::ComputeUserInfo;
      17              : use crate::auth::backend::jwt::AuthRule;
      18              : use crate::auth::{AuthError, IpPattern, check_peer_addr_is_in_list};
      19              : use crate::cache::{Cached, TimedLru};
      20              : use crate::config::ComputeConfig;
      21              : use crate::context::RequestContext;
      22              : use crate::control_plane::messages::{ControlPlaneErrorMessage, MetricsAuxInfo};
      23              : use crate::intern::{AccountIdInt, EndpointIdInt, ProjectIdInt};
      24              : use crate::protocol2::ConnectionInfoExtra;
      25              : use crate::rate_limiter::{EndpointRateLimiter, LeakyBucketConfig};
      26              : use crate::types::{EndpointCacheKey, EndpointId, RoleName};
      27              : use crate::{compute, scram};
      28              : 
      29              : /// Various cache-related types.
      30              : pub mod caches {
      31              :     pub use super::client::ApiCaches;
      32              : }
      33              : 
      34              : /// Various cache-related types.
      35              : pub mod locks {
      36              :     pub use super::client::ApiLocks;
      37              : }
      38              : 
      39              : /// Console's management API.
      40              : pub mod mgmt;
      41              : 
      42              : /// Auth secret which is managed by the cloud.
      43              : #[derive(Clone, Eq, PartialEq, Debug)]
      44              : pub(crate) enum AuthSecret {
      45              :     /// [SCRAM](crate::scram) authentication info.
      46              :     Scram(scram::ServerSecret),
      47              : }
      48              : 
      49              : #[derive(Default)]
      50              : pub(crate) struct AuthInfo {
      51              :     pub(crate) secret: Option<AuthSecret>,
      52              :     /// List of IP addresses allowed for the autorization.
      53              :     pub(crate) allowed_ips: Vec<IpPattern>,
      54              :     /// List of VPC endpoints allowed for the autorization.
      55              :     pub(crate) allowed_vpc_endpoint_ids: Vec<String>,
      56              :     /// Project ID. This is used for cache invalidation.
      57              :     pub(crate) project_id: Option<ProjectIdInt>,
      58              :     /// Account ID. This is used for cache invalidation.
      59              :     pub(crate) account_id: Option<AccountIdInt>,
      60              :     /// Are public connections or VPC connections blocked?
      61              :     pub(crate) access_blocker_flags: AccessBlockerFlags,
      62              :     /// The rate limits for this endpoint.
      63              :     pub(crate) rate_limits: EndpointRateLimitConfig,
      64              : }
      65              : 
      66              : /// Info for establishing a connection to a compute node.
      67              : #[derive(Clone)]
      68              : pub(crate) struct NodeInfo {
      69              :     pub(crate) conn_info: compute::ConnectInfo,
      70              : 
      71              :     /// Labels for proxy's metrics.
      72              :     pub(crate) aux: MetricsAuxInfo,
      73              : }
      74              : 
      75              : impl NodeInfo {
      76            0 :     pub(crate) async fn connect(
      77            0 :         &self,
      78            0 :         ctx: &RequestContext,
      79            0 :         config: &ComputeConfig,
      80            0 :     ) -> Result<compute::ComputeConnection, compute::ConnectionError> {
      81            0 :         self.conn_info.connect(ctx, &self.aux, config).await
      82            0 :     }
      83              : }
      84              : 
      85              : #[derive(Copy, Clone, Default)]
      86              : pub(crate) struct AccessBlockerFlags {
      87              :     pub public_access_blocked: bool,
      88              :     pub vpc_access_blocked: bool,
      89              : }
      90              : 
      91              : pub(crate) type NodeInfoCache =
      92              :     TimedLru<EndpointCacheKey, Result<NodeInfo, Box<ControlPlaneErrorMessage>>>;
      93              : pub(crate) type CachedNodeInfo = Cached<&'static NodeInfoCache, NodeInfo>;
      94              : 
      95              : #[derive(Clone)]
      96              : pub struct RoleAccessControl {
      97              :     pub secret: Option<AuthSecret>,
      98              : }
      99              : 
     100              : #[derive(Clone)]
     101              : pub struct EndpointAccessControl {
     102              :     pub allowed_ips: Arc<Vec<IpPattern>>,
     103              :     pub allowed_vpce: Arc<Vec<String>>,
     104              :     pub flags: AccessBlockerFlags,
     105              : 
     106              :     pub rate_limits: EndpointRateLimitConfig,
     107              : }
     108              : 
     109              : impl EndpointAccessControl {
     110            3 :     pub fn check(
     111            3 :         &self,
     112            3 :         ctx: &RequestContext,
     113            3 :         check_ip_allowed: bool,
     114            3 :         check_vpc_allowed: bool,
     115            3 :     ) -> Result<(), AuthError> {
     116            3 :         if check_ip_allowed && !check_peer_addr_is_in_list(&ctx.peer_addr(), &self.allowed_ips) {
     117            0 :             return Err(AuthError::IpAddressNotAllowed(ctx.peer_addr()));
     118            3 :         }
     119              : 
     120              :         // check if a VPC endpoint ID is coming in and if yes, if it's allowed
     121            3 :         if check_vpc_allowed {
     122            0 :             if self.flags.vpc_access_blocked {
     123            0 :                 return Err(AuthError::NetworkNotAllowed);
     124            0 :             }
     125              : 
     126            0 :             let incoming_vpc_endpoint_id = match ctx.extra() {
     127            0 :                 None => return Err(AuthError::MissingVPCEndpointId),
     128            0 :                 Some(ConnectionInfoExtra::Aws { vpce_id }) => vpce_id.to_string(),
     129            0 :                 Some(ConnectionInfoExtra::Azure { link_id }) => link_id.to_string(),
     130              :             };
     131              : 
     132            0 :             let vpce = &self.allowed_vpce;
     133              :             // TODO: For now an empty VPC endpoint ID list means all are allowed. We should replace that.
     134            0 :             if !vpce.is_empty() && !vpce.contains(&incoming_vpc_endpoint_id) {
     135            0 :                 return Err(AuthError::vpc_endpoint_id_not_allowed(
     136            0 :                     incoming_vpc_endpoint_id,
     137            0 :                 ));
     138            0 :             }
     139            3 :         } else if self.flags.public_access_blocked {
     140            0 :             return Err(AuthError::NetworkNotAllowed);
     141            3 :         }
     142              : 
     143            3 :         Ok(())
     144            3 :     }
     145              : 
     146            3 :     pub fn connection_attempt_rate_limit(
     147            3 :         &self,
     148            3 :         ctx: &RequestContext,
     149            3 :         endpoint: &EndpointId,
     150            3 :         rate_limiter: &EndpointRateLimiter,
     151            3 :     ) -> Result<(), AuthError> {
     152            3 :         let endpoint = EndpointIdInt::from(endpoint);
     153              : 
     154            3 :         let limits = &self.rate_limits.connection_attempts;
     155            3 :         let config = match ctx.protocol() {
     156            0 :             crate::metrics::Protocol::Http => limits.http,
     157            0 :             crate::metrics::Protocol::Ws => limits.ws,
     158            3 :             crate::metrics::Protocol::Tcp => limits.tcp,
     159            0 :             crate::metrics::Protocol::SniRouter => return Ok(()),
     160              :         };
     161            3 :         let config = config.and_then(|config| {
     162            0 :             if config.rps <= 0.0 || config.burst <= 0.0 {
     163            0 :                 return None;
     164            0 :             }
     165              : 
     166            0 :             Some(LeakyBucketConfig::new(config.rps, config.burst))
     167            0 :         });
     168              : 
     169            3 :         if !rate_limiter.check(endpoint, config, 1) {
     170            0 :             return Err(AuthError::too_many_connections());
     171            3 :         }
     172              : 
     173            3 :         Ok(())
     174            3 :     }
     175              : }
     176              : 
     177              : /// This will allocate per each call, but the http requests alone
     178              : /// already require a few allocations, so it should be fine.
     179              : pub(crate) trait ControlPlaneApi {
     180              :     async fn get_role_access_control(
     181              :         &self,
     182              :         ctx: &RequestContext,
     183              :         endpoint: &EndpointId,
     184              :         role: &RoleName,
     185              :     ) -> Result<RoleAccessControl, errors::GetAuthInfoError>;
     186              : 
     187              :     async fn get_endpoint_access_control(
     188              :         &self,
     189              :         ctx: &RequestContext,
     190              :         endpoint: &EndpointId,
     191              :         role: &RoleName,
     192              :     ) -> Result<EndpointAccessControl, errors::GetAuthInfoError>;
     193              : 
     194              :     async fn get_endpoint_jwks(
     195              :         &self,
     196              :         ctx: &RequestContext,
     197              :         endpoint: &EndpointId,
     198              :     ) -> Result<Vec<AuthRule>, errors::GetEndpointJwksError>;
     199              : 
     200              :     /// Wake up the compute node and return the corresponding connection info.
     201              :     async fn wake_compute(
     202              :         &self,
     203              :         ctx: &RequestContext,
     204              :         user_info: &ComputeUserInfo,
     205              :     ) -> Result<CachedNodeInfo, errors::WakeComputeError>;
     206              : }
        

Generated by: LCOV version 2.1-beta