LCOV - code coverage report
Current view: top level - proxy/src/control_plane/client - cplane_proxy_v1.rs (source / functions) Coverage Total Hit
Test: 4be46b1c0003aa3bbac9ade362c676b419df4c20.info Lines: 7.0 % 298 21
Test Date: 2025-07-22 17:50:06 Functions: 10.0 % 40 4

            Line data    Source code
       1              : //! Production console backend.
       2              : 
       3              : use std::net::IpAddr;
       4              : use std::str::FromStr;
       5              : use std::sync::Arc;
       6              : use std::time::Duration;
       7              : 
       8              : use ::http::HeaderName;
       9              : use ::http::header::AUTHORIZATION;
      10              : use bytes::Bytes;
      11              : use futures::TryFutureExt;
      12              : use hyper::StatusCode;
      13              : use postgres_client::config::SslMode;
      14              : use tokio::time::Instant;
      15              : use tracing::{Instrument, debug, info, info_span, warn};
      16              : 
      17              : use super::super::messages::{ControlPlaneErrorMessage, GetEndpointAccessControl, WakeCompute};
      18              : use crate::auth::backend::ComputeUserInfo;
      19              : use crate::auth::backend::jwt::AuthRule;
      20              : use crate::context::RequestContext;
      21              : use crate::control_plane::caches::ApiCaches;
      22              : use crate::control_plane::errors::{
      23              :     ControlPlaneError, GetAuthInfoError, GetEndpointJwksError, WakeComputeError,
      24              : };
      25              : use crate::control_plane::locks::ApiLocks;
      26              : use crate::control_plane::messages::{ColdStartInfo, EndpointJwksResponse};
      27              : use crate::control_plane::{
      28              :     AccessBlockerFlags, AuthInfo, AuthSecret, CachedNodeInfo, EndpointAccessControl, NodeInfo,
      29              :     RoleAccessControl,
      30              : };
      31              : use crate::metrics::Metrics;
      32              : use crate::proxy::retry::CouldRetry;
      33              : use crate::rate_limiter::WakeComputeRateLimiter;
      34              : use crate::types::{EndpointCacheKey, EndpointId, RoleName};
      35              : use crate::{compute, http, scram};
      36              : 
      37              : pub(crate) const X_REQUEST_ID: HeaderName = HeaderName::from_static("x-request-id");
      38              : 
      39              : #[derive(Clone)]
      40              : pub struct NeonControlPlaneClient {
      41              :     endpoint: http::Endpoint,
      42              :     pub caches: &'static ApiCaches,
      43              :     pub(crate) locks: &'static ApiLocks<EndpointCacheKey>,
      44              :     pub(crate) wake_compute_endpoint_rate_limiter: Arc<WakeComputeRateLimiter>,
      45              :     // put in a shared ref so we don't copy secrets all over in memory
      46              :     jwt: Arc<str>,
      47              : }
      48              : 
      49              : impl NeonControlPlaneClient {
      50              :     /// Construct an API object containing the auth parameters.
      51            0 :     pub fn new(
      52            0 :         endpoint: http::Endpoint,
      53            0 :         jwt: Arc<str>,
      54            0 :         caches: &'static ApiCaches,
      55            0 :         locks: &'static ApiLocks<EndpointCacheKey>,
      56            0 :         wake_compute_endpoint_rate_limiter: Arc<WakeComputeRateLimiter>,
      57            0 :     ) -> Self {
      58            0 :         Self {
      59            0 :             endpoint,
      60            0 :             caches,
      61            0 :             locks,
      62            0 :             wake_compute_endpoint_rate_limiter,
      63            0 :             jwt,
      64            0 :         }
      65            0 :     }
      66              : 
      67            0 :     pub(crate) fn url(&self) -> &str {
      68            0 :         self.endpoint.url().as_str()
      69            0 :     }
      70              : 
      71            0 :     async fn get_and_cache_auth_info<T>(
      72            0 :         &self,
      73            0 :         ctx: &RequestContext,
      74            0 :         endpoint: &EndpointId,
      75            0 :         role: &RoleName,
      76            0 :         cache_key: &EndpointId,
      77            0 :         extract: impl FnOnce(&EndpointAccessControl, &RoleAccessControl) -> T,
      78            0 :     ) -> Result<T, GetAuthInfoError> {
      79            0 :         match self.do_get_auth_req(ctx, endpoint, role).await {
      80            0 :             Ok(auth_info) => {
      81            0 :                 let control = EndpointAccessControl {
      82            0 :                     allowed_ips: Arc::new(auth_info.allowed_ips),
      83            0 :                     allowed_vpce: Arc::new(auth_info.allowed_vpc_endpoint_ids),
      84            0 :                     flags: auth_info.access_blocker_flags,
      85            0 :                     rate_limits: auth_info.rate_limits,
      86            0 :                 };
      87            0 :                 let role_control = RoleAccessControl {
      88            0 :                     secret: auth_info.secret,
      89            0 :                 };
      90            0 :                 let res = extract(&control, &role_control);
      91              : 
      92            0 :                 self.caches.project_info.insert_endpoint_access(
      93            0 :                     auth_info.account_id,
      94            0 :                     auth_info.project_id,
      95            0 :                     cache_key.into(),
      96            0 :                     role.into(),
      97            0 :                     control,
      98            0 :                     role_control,
      99            0 :                 );
     100              : 
     101            0 :                 if let Some(project_id) = auth_info.project_id {
     102            0 :                     ctx.set_project_id(project_id);
     103            0 :                 }
     104              : 
     105            0 :                 Ok(res)
     106              :             }
     107            0 :             Err(err) => match err {
     108            0 :                 GetAuthInfoError::ApiError(ControlPlaneError::Message(ref msg)) => {
     109            0 :                     let retry_info = msg.status.as_ref().and_then(|s| s.details.retry_info);
     110              : 
     111              :                     // If we can retry this error, do not cache it,
     112              :                     // unless we were given a retry delay.
     113            0 :                     if msg.could_retry() && retry_info.is_none() {
     114            0 :                         return Err(err);
     115            0 :                     }
     116              : 
     117            0 :                     self.caches.project_info.insert_endpoint_access_err(
     118            0 :                         cache_key.into(),
     119            0 :                         role.into(),
     120            0 :                         msg.clone(),
     121            0 :                         retry_info.map(|r| Duration::from_millis(r.retry_delay_ms)),
     122              :                     );
     123              : 
     124            0 :                     Err(err)
     125              :                 }
     126            0 :                 err => Err(err),
     127              :             },
     128              :         }
     129            0 :     }
     130              : 
     131            0 :     async fn do_get_auth_req(
     132            0 :         &self,
     133            0 :         ctx: &RequestContext,
     134            0 :         endpoint: &EndpointId,
     135            0 :         role: &RoleName,
     136            0 :     ) -> Result<AuthInfo, GetAuthInfoError> {
     137            0 :         async {
     138            0 :             let response = {
     139            0 :                 let request = self
     140            0 :                     .endpoint
     141            0 :                     .get_path("get_endpoint_access_control")
     142            0 :                     .header(X_REQUEST_ID, ctx.session_id().to_string())
     143            0 :                     .header(AUTHORIZATION, format!("Bearer {}", &self.jwt))
     144            0 :                     .query(&[("session_id", ctx.session_id())])
     145            0 :                     .query(&[
     146            0 :                         ("application_name", ctx.console_application_name().as_str()),
     147            0 :                         ("endpointish", endpoint.as_str()),
     148            0 :                         ("role", role.as_str()),
     149            0 :                     ])
     150            0 :                     .build()?;
     151              : 
     152            0 :                 debug!(url = request.url().as_str(), "sending http request");
     153            0 :                 let start = Instant::now();
     154            0 :                 let _pause = ctx.latency_timer_pause_at(start, crate::metrics::Waiting::Cplane);
     155            0 :                 let response = self.endpoint.execute(request).await?;
     156              : 
     157            0 :                 info!(duration = ?start.elapsed(), "received http response");
     158              : 
     159            0 :                 response
     160              :             };
     161              : 
     162            0 :             let body = match parse_body::<GetEndpointAccessControl>(
     163            0 :                 response.status(),
     164            0 :                 response.bytes().await?,
     165              :             ) {
     166            0 :                 Ok(body) => body,
     167              :                 // Error 404 is special: it's ok not to have a secret.
     168              :                 // TODO(anna): retry
     169            0 :                 Err(e) => {
     170            0 :                     return if e.get_reason().is_not_found() {
     171              :                         // TODO: refactor this because it's weird
     172              :                         // this is a failure to authenticate but we return Ok.
     173            0 :                         Ok(AuthInfo::default())
     174              :                     } else {
     175            0 :                         Err(e.into())
     176              :                     };
     177              :                 }
     178              :             };
     179              : 
     180            0 :             let secret = if body.role_secret.is_empty() {
     181            0 :                 None
     182              :             } else {
     183            0 :                 let secret = scram::ServerSecret::parse(&body.role_secret)
     184            0 :                     .map(AuthSecret::Scram)
     185            0 :                     .ok_or(GetAuthInfoError::BadSecret)?;
     186            0 :                 Some(secret)
     187              :             };
     188            0 :             let allowed_ips = body.allowed_ips.unwrap_or_default();
     189            0 :             Metrics::get()
     190            0 :                 .proxy
     191            0 :                 .allowed_ips_number
     192            0 :                 .observe(allowed_ips.len() as f64);
     193            0 :             let allowed_vpc_endpoint_ids = body.allowed_vpc_endpoint_ids.unwrap_or_default();
     194            0 :             Metrics::get()
     195            0 :                 .proxy
     196            0 :                 .allowed_vpc_endpoint_ids
     197            0 :                 .observe(allowed_vpc_endpoint_ids.len() as f64);
     198            0 :             let block_public_connections = body.block_public_connections.unwrap_or_default();
     199            0 :             let block_vpc_connections = body.block_vpc_connections.unwrap_or_default();
     200            0 :             Ok(AuthInfo {
     201            0 :                 secret,
     202            0 :                 allowed_ips,
     203            0 :                 allowed_vpc_endpoint_ids,
     204            0 :                 project_id: body.project_id,
     205            0 :                 account_id: body.account_id,
     206            0 :                 access_blocker_flags: AccessBlockerFlags {
     207            0 :                     public_access_blocked: block_public_connections,
     208            0 :                     vpc_access_blocked: block_vpc_connections,
     209            0 :                 },
     210            0 :                 rate_limits: body.rate_limits,
     211            0 :             })
     212            0 :         }
     213            0 :         .inspect_err(|e| tracing::debug!(error = ?e))
     214            0 :         .instrument(info_span!("do_get_auth_info"))
     215            0 :         .await
     216            0 :     }
     217              : 
     218            0 :     async fn do_get_endpoint_jwks(
     219            0 :         &self,
     220            0 :         ctx: &RequestContext,
     221            0 :         endpoint: &EndpointId,
     222            0 :     ) -> Result<Vec<AuthRule>, GetEndpointJwksError> {
     223            0 :         let request_id = ctx.session_id().to_string();
     224            0 :         async {
     225            0 :             let request = self
     226            0 :                 .endpoint
     227            0 :                 .get_with_url(|url| {
     228            0 :                     url.path_segments_mut()
     229            0 :                         .push("endpoints")
     230            0 :                         .push(endpoint.as_str())
     231            0 :                         .push("jwks");
     232            0 :                 })
     233            0 :                 .header(X_REQUEST_ID, &request_id)
     234            0 :                 .header(AUTHORIZATION, format!("Bearer {}", &self.jwt))
     235            0 :                 .query(&[("session_id", ctx.session_id())])
     236            0 :                 .build()
     237            0 :                 .map_err(GetEndpointJwksError::RequestBuild)?;
     238              : 
     239            0 :             debug!(url = request.url().as_str(), "sending http request");
     240            0 :             let start = Instant::now();
     241            0 :             let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Cplane);
     242            0 :             let response = self
     243            0 :                 .endpoint
     244            0 :                 .execute(request)
     245            0 :                 .await
     246            0 :                 .map_err(GetEndpointJwksError::RequestExecute)?;
     247            0 :             drop(pause);
     248            0 :             info!(duration = ?start.elapsed(), "received http response");
     249              : 
     250            0 :             let body = parse_body::<EndpointJwksResponse>(
     251            0 :                 response.status(),
     252            0 :                 response.bytes().await.map_err(ControlPlaneError::from)?,
     253            0 :             )?;
     254              : 
     255            0 :             let rules = body
     256            0 :                 .jwks
     257            0 :                 .into_iter()
     258            0 :                 .map(|jwks| AuthRule {
     259            0 :                     id: jwks.id,
     260            0 :                     jwks_url: jwks.jwks_url,
     261            0 :                     audience: jwks.jwt_audience,
     262            0 :                     role_names: jwks.role_names,
     263            0 :                 })
     264            0 :                 .collect();
     265              : 
     266            0 :             Ok(rules)
     267            0 :         }
     268            0 :         .inspect_err(|e| tracing::debug!(error = ?e))
     269            0 :         .instrument(info_span!("do_get_endpoint_jwks"))
     270            0 :         .await
     271            0 :     }
     272              : 
     273            0 :     async fn do_wake_compute(
     274            0 :         &self,
     275            0 :         ctx: &RequestContext,
     276            0 :         user_info: &ComputeUserInfo,
     277            0 :     ) -> Result<NodeInfo, WakeComputeError> {
     278            0 :         let request_id = ctx.session_id().to_string();
     279            0 :         let application_name = ctx.console_application_name();
     280            0 :         async {
     281            0 :             let mut request_builder = self
     282            0 :                 .endpoint
     283            0 :                 .get_path("wake_compute")
     284            0 :                 .header("X-Request-ID", &request_id)
     285            0 :                 .header("Authorization", format!("Bearer {}", &self.jwt))
     286            0 :                 .query(&[("session_id", ctx.session_id())])
     287            0 :                 .query(&[
     288            0 :                     ("application_name", application_name.as_str()),
     289            0 :                     ("endpointish", user_info.endpoint.as_str()),
     290            0 :                 ]);
     291              : 
     292            0 :             let options = user_info.options.to_deep_object();
     293            0 :             if !options.is_empty() {
     294            0 :                 request_builder = request_builder.query(&options);
     295            0 :             }
     296              : 
     297            0 :             let request = request_builder.build()?;
     298              : 
     299            0 :             debug!(url = request.url().as_str(), "sending http request");
     300            0 :             let start = Instant::now();
     301            0 :             let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Cplane);
     302            0 :             let response = self.endpoint.execute(request).await?;
     303            0 :             drop(pause);
     304            0 :             info!(duration = ?start.elapsed(), "received http response");
     305            0 :             let body = parse_body::<WakeCompute>(response.status(), response.bytes().await?)?;
     306              : 
     307            0 :             let Some((host, port)) = parse_host_port(&body.address) else {
     308            0 :                 return Err(WakeComputeError::BadComputeAddress(body.address));
     309              :             };
     310              : 
     311            0 :             let host_addr = IpAddr::from_str(host).ok();
     312              : 
     313            0 :             let ssl_mode = match &body.server_name {
     314            0 :                 Some(_) => SslMode::Require,
     315            0 :                 None => SslMode::Disable,
     316              :             };
     317            0 :             let host = match body.server_name {
     318            0 :                 Some(host) => host.into(),
     319            0 :                 None => host.into(),
     320              :             };
     321              : 
     322            0 :             let node = NodeInfo {
     323            0 :                 conn_info: compute::ConnectInfo {
     324            0 :                     host_addr,
     325            0 :                     host,
     326            0 :                     port,
     327            0 :                     ssl_mode,
     328            0 :                 },
     329            0 :                 aux: body.aux,
     330            0 :             };
     331              : 
     332            0 :             Ok(node)
     333            0 :         }
     334            0 :         .inspect_err(|e| tracing::debug!(error = ?e))
     335            0 :         .instrument(info_span!("do_wake_compute"))
     336            0 :         .await
     337            0 :     }
     338              : }
     339              : 
     340              : impl super::ControlPlaneApi for NeonControlPlaneClient {
     341              :     #[tracing::instrument(skip_all)]
     342              :     async fn get_role_access_control(
     343              :         &self,
     344              :         ctx: &RequestContext,
     345              :         endpoint: &EndpointId,
     346              :         role: &RoleName,
     347              :     ) -> Result<RoleAccessControl, GetAuthInfoError> {
     348              :         let key = endpoint.normalize();
     349              : 
     350              :         if let Some((role_control, ttl)) = self
     351              :             .caches
     352              :             .project_info
     353              :             .get_role_secret_with_ttl(&key, role)
     354              :         {
     355              :             return match role_control {
     356              :                 Err(mut msg) => {
     357              :                     info!(key = &*key, "found cached get_role_access_control error");
     358              : 
     359              :                     // if retry_delay_ms is set change it to the remaining TTL
     360            0 :                     replace_retry_delay_ms(&mut msg, |_| ttl.as_millis() as u64);
     361              : 
     362              :                     Err(GetAuthInfoError::ApiError(ControlPlaneError::Message(msg)))
     363              :                 }
     364              :                 Ok(role_control) => {
     365              :                     debug!(key = &*key, "found cached role access control");
     366              :                     Ok(role_control)
     367              :                 }
     368              :             };
     369              :         }
     370              : 
     371            0 :         self.get_and_cache_auth_info(ctx, endpoint, role, &key, |_, role_control| {
     372            0 :             role_control.clone()
     373            0 :         })
     374              :         .await
     375              :     }
     376              : 
     377              :     #[tracing::instrument(skip_all)]
     378              :     async fn get_endpoint_access_control(
     379              :         &self,
     380              :         ctx: &RequestContext,
     381              :         endpoint: &EndpointId,
     382              :         role: &RoleName,
     383              :     ) -> Result<EndpointAccessControl, GetAuthInfoError> {
     384              :         let key = endpoint.normalize();
     385              : 
     386              :         if let Some((control, ttl)) = self.caches.project_info.get_endpoint_access_with_ttl(&key) {
     387              :             return match control {
     388              :                 Err(mut msg) => {
     389              :                     info!(
     390              :                         key = &*key,
     391              :                         "found cached get_endpoint_access_control error"
     392              :                     );
     393              : 
     394              :                     // if retry_delay_ms is set change it to the remaining TTL
     395            0 :                     replace_retry_delay_ms(&mut msg, |_| ttl.as_millis() as u64);
     396              : 
     397              :                     Err(GetAuthInfoError::ApiError(ControlPlaneError::Message(msg)))
     398              :                 }
     399              :                 Ok(control) => {
     400              :                     debug!(key = &*key, "found cached endpoint access control");
     401              :                     Ok(control)
     402              :                 }
     403              :             };
     404              :         }
     405              : 
     406            0 :         self.get_and_cache_auth_info(ctx, endpoint, role, &key, |control, _| control.clone())
     407              :             .await
     408              :     }
     409              : 
     410              :     #[tracing::instrument(skip_all)]
     411              :     async fn get_endpoint_jwks(
     412              :         &self,
     413              :         ctx: &RequestContext,
     414              :         endpoint: &EndpointId,
     415              :     ) -> Result<Vec<AuthRule>, GetEndpointJwksError> {
     416              :         self.do_get_endpoint_jwks(ctx, endpoint).await
     417              :     }
     418              : 
     419              :     #[tracing::instrument(skip_all)]
     420              :     async fn wake_compute(
     421              :         &self,
     422              :         ctx: &RequestContext,
     423              :         user_info: &ComputeUserInfo,
     424              :     ) -> Result<CachedNodeInfo, WakeComputeError> {
     425              :         let key = user_info.endpoint_cache_key();
     426              : 
     427              :         macro_rules! check_cache {
     428              :             () => {
     429              :                 if let Some(cached) = self.caches.node_info.get_with_created_at(&key) {
     430              :                     let (cached, (info, created_at)) = cached.take_value();
     431              :                     return match info {
     432              :                         Err(mut msg) => {
     433              :                             info!(key = &*key, "found cached wake_compute error");
     434              : 
     435              :                             // if retry_delay_ms is set, reduce it by the amount of time it spent in cache
     436            0 :                             replace_retry_delay_ms(&mut msg, |delay| {
     437            0 :                                 delay.saturating_sub(created_at.elapsed().as_millis() as u64)
     438            0 :                             });
     439              : 
     440              :                             Err(WakeComputeError::ControlPlane(ControlPlaneError::Message(
     441              :                                 msg,
     442              :                             )))
     443              :                         }
     444              :                         Ok(info) => {
     445              :                             debug!(key = &*key, "found cached compute node info");
     446              :                             ctx.set_project(info.aux.clone());
     447              :                             Ok(cached.map(|()| info))
     448              :                         }
     449              :                     };
     450              :                 }
     451              :             };
     452              :         }
     453              : 
     454              :         // Every time we do a wakeup http request, the compute node will stay up
     455              :         // for some time (highly depends on the console's scale-to-zero policy);
     456              :         // The connection info remains the same during that period of time,
     457              :         // which means that we might cache it to reduce the load and latency.
     458              :         check_cache!();
     459              : 
     460              :         let permit = self.locks.get_permit(&key).await?;
     461              : 
     462              :         // after getting back a permit - it's possible the cache was filled
     463              :         // double check
     464              :         if permit.should_check_cache() {
     465              :             // TODO: if there is something in the cache, mark the permit as success.
     466              :             check_cache!();
     467              :         }
     468              : 
     469              :         // check rate limit
     470              :         if !self
     471              :             .wake_compute_endpoint_rate_limiter
     472              :             .check(user_info.endpoint.normalize_intern(), 1)
     473              :         {
     474              :             return Err(WakeComputeError::TooManyConnections);
     475              :         }
     476              : 
     477              :         let node = permit.release_result(self.do_wake_compute(ctx, user_info).await);
     478              :         match node {
     479              :             Ok(node) => {
     480              :                 ctx.set_project(node.aux.clone());
     481              :                 debug!(key = &*key, "created a cache entry for woken compute node");
     482              : 
     483              :                 let mut stored_node = node.clone();
     484              :                 // store the cached node as 'warm_cached'
     485              :                 stored_node.aux.cold_start_info = ColdStartInfo::WarmCached;
     486              : 
     487              :                 let (_, cached) = self.caches.node_info.insert_unit(key, Ok(stored_node));
     488              : 
     489              :                 Ok(cached.map(|()| node))
     490              :             }
     491              :             Err(err) => match err {
     492              :                 WakeComputeError::ControlPlane(ControlPlaneError::Message(ref msg)) => {
     493              :                     let retry_info = msg.status.as_ref().and_then(|s| s.details.retry_info);
     494              : 
     495              :                     // If we can retry this error, do not cache it,
     496              :                     // unless we were given a retry delay.
     497              :                     if msg.could_retry() && retry_info.is_none() {
     498              :                         return Err(err);
     499              :                     }
     500              : 
     501              :                     debug!(
     502              :                         key = &*key,
     503              :                         "created a cache entry for the wake compute error"
     504              :                     );
     505              : 
     506            0 :                     let ttl = retry_info.map_or(Duration::from_secs(30), |r| {
     507            0 :                         Duration::from_millis(r.retry_delay_ms)
     508            0 :                     });
     509              : 
     510              :                     self.caches.node_info.insert_ttl(key, Err(msg.clone()), ttl);
     511              : 
     512              :                     Err(err)
     513              :                 }
     514              :                 err => Err(err),
     515              :             },
     516              :         }
     517              :     }
     518              : }
     519              : 
     520            0 : fn replace_retry_delay_ms(msg: &mut ControlPlaneErrorMessage, f: impl FnOnce(u64) -> u64) {
     521            0 :     if let Some(status) = &mut msg.status
     522            0 :         && let Some(retry_info) = &mut status.details.retry_info
     523            0 :     {
     524            0 :         retry_info.retry_delay_ms = f(retry_info.retry_delay_ms);
     525            0 :     }
     526            0 : }
     527              : 
     528              : /// Parse http response body, taking status code into account.
     529            0 : fn parse_body<T: for<'a> serde::Deserialize<'a>>(
     530            0 :     status: StatusCode,
     531            0 :     body: Bytes,
     532            0 : ) -> Result<T, ControlPlaneError> {
     533            0 :     if status.is_success() {
     534              :         // We shouldn't log raw body because it may contain secrets.
     535            0 :         info!("request succeeded, processing the body");
     536            0 :         return Ok(serde_json::from_slice(&body).map_err(std::io::Error::other)?);
     537            0 :     }
     538              : 
     539              :     // Log plaintext to be able to detect, whether there are some cases not covered by the error struct.
     540            0 :     info!("response_error plaintext: {:?}", body);
     541              : 
     542              :     // Don't throw an error here because it's not as important
     543              :     // as the fact that the request itself has failed.
     544            0 :     let mut body = serde_json::from_slice(&body).unwrap_or_else(|e| {
     545            0 :         warn!("failed to parse error body: {e}");
     546            0 :         Box::new(ControlPlaneErrorMessage {
     547            0 :             error: "reason unclear (malformed error message)".into(),
     548            0 :             http_status_code: status,
     549            0 :             status: None,
     550            0 :         })
     551            0 :     });
     552            0 :     body.http_status_code = status;
     553              : 
     554            0 :     warn!("console responded with an error ({status}): {body:?}");
     555            0 :     Err(ControlPlaneError::Message(body))
     556            0 : }
     557              : 
     558            3 : fn parse_host_port(input: &str) -> Option<(&str, u16)> {
     559            3 :     let (host, port) = input.rsplit_once(':')?;
     560            3 :     let ipv6_brackets: &[_] = &['[', ']'];
     561            3 :     Some((host.trim_matches(ipv6_brackets), port.parse().ok()?))
     562            3 : }
     563              : 
     564              : #[cfg(test)]
     565              : mod tests {
     566              :     use super::*;
     567              : 
     568              :     #[test]
     569            1 :     fn test_parse_host_port_v4() {
     570            1 :         let (host, port) = parse_host_port("127.0.0.1:5432").expect("failed to parse");
     571            1 :         assert_eq!(host, "127.0.0.1");
     572            1 :         assert_eq!(port, 5432);
     573            1 :     }
     574              : 
     575              :     #[test]
     576            1 :     fn test_parse_host_port_v6() {
     577            1 :         let (host, port) = parse_host_port("[2001:db8::1]:5432").expect("failed to parse");
     578            1 :         assert_eq!(host, "2001:db8::1");
     579            1 :         assert_eq!(port, 5432);
     580            1 :     }
     581              : 
     582              :     #[test]
     583            1 :     fn test_parse_host_port_url() {
     584            1 :         let (host, port) = parse_host_port("compute-foo-bar-1234.default.svc.cluster.local:5432")
     585            1 :             .expect("failed to parse");
     586            1 :         assert_eq!(host, "compute-foo-bar-1234.default.svc.cluster.local");
     587            1 :         assert_eq!(port, 5432);
     588            1 :     }
     589              : }
        

Generated by: LCOV version 2.1-beta