LCOV - code coverage report
Current view: top level - proxy/src/serverless - backend.rs (source / functions) Coverage Total Hit
Test: 915229b2d22dd355ad718d9afbb773e7f2fba970.info Lines: 0.0 % 287 0
Test Date: 2025-07-24 10:33:41 Functions: 0.0 % 23 0

            Line data    Source code
       1              : use std::io;
       2              : use std::net::{IpAddr, SocketAddr};
       3              : use std::sync::Arc;
       4              : use std::time::Duration;
       5              : 
       6              : use async_trait::async_trait;
       7              : use ed25519_dalek::SigningKey;
       8              : use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer};
       9              : use jose_jwk::jose_b64;
      10              : use postgres_client::config::SslMode;
      11              : use rand_core::OsRng;
      12              : use rustls::pki_types::{DnsName, ServerName};
      13              : use tokio::net::{TcpStream, lookup_host};
      14              : use tokio_rustls::TlsConnector;
      15              : use tracing::field::display;
      16              : use tracing::{debug, info};
      17              : 
      18              : use super::AsyncRW;
      19              : use super::conn_pool::poll_client;
      20              : use super::conn_pool_lib::{Client, ConnInfo, EndpointConnPool, GlobalConnPool};
      21              : use super::http_conn_pool::{self, HttpConnPool, LocalProxyClient, poll_http2_client};
      22              : use super::local_conn_pool::{self, EXT_NAME, EXT_SCHEMA, EXT_VERSION, LocalConnPool};
      23              : use crate::auth::backend::local::StaticAuthRules;
      24              : use crate::auth::backend::{ComputeCredentialKeys, ComputeCredentials, ComputeUserInfo};
      25              : use crate::auth::{self, AuthError};
      26              : use crate::compute_ctl::{
      27              :     ComputeCtlError, ExtensionInstallRequest, Privilege, SetRoleGrantsRequest,
      28              : };
      29              : use crate::config::{ComputeConfig, ProxyConfig};
      30              : use crate::context::RequestContext;
      31              : use crate::control_plane::CachedNodeInfo;
      32              : use crate::control_plane::client::ApiLockError;
      33              : use crate::control_plane::errors::{GetAuthInfoError, WakeComputeError};
      34              : use crate::control_plane::locks::ApiLocks;
      35              : use crate::error::{ErrorKind, ReportableError, UserFacingError};
      36              : use crate::intern::EndpointIdInt;
      37              : use crate::proxy::connect_compute::ConnectMechanism;
      38              : use crate::proxy::retry::{CouldRetry, ShouldRetryWakeCompute};
      39              : use crate::rate_limiter::EndpointRateLimiter;
      40              : use crate::types::{EndpointId, Host, LOCAL_PROXY_SUFFIX};
      41              : 
      42              : pub(crate) struct PoolingBackend {
      43              :     pub(crate) http_conn_pool:
      44              :         Arc<GlobalConnPool<LocalProxyClient, HttpConnPool<LocalProxyClient>>>,
      45              :     pub(crate) local_pool: Arc<LocalConnPool<postgres_client::Client>>,
      46              :     pub(crate) pool:
      47              :         Arc<GlobalConnPool<postgres_client::Client, EndpointConnPool<postgres_client::Client>>>,
      48              : 
      49              :     pub(crate) config: &'static ProxyConfig,
      50              :     pub(crate) auth_backend: &'static crate::auth::Backend<'static, ()>,
      51              :     pub(crate) endpoint_rate_limiter: Arc<EndpointRateLimiter>,
      52              : }
      53              : 
      54              : impl PoolingBackend {
      55            0 :     pub(crate) async fn authenticate_with_password(
      56            0 :         &self,
      57            0 :         ctx: &RequestContext,
      58            0 :         user_info: &ComputeUserInfo,
      59            0 :         password: &[u8],
      60            0 :     ) -> Result<ComputeCredentials, AuthError> {
      61            0 :         ctx.set_auth_method(crate::context::AuthMethod::Cleartext);
      62              : 
      63            0 :         let user_info = user_info.clone();
      64            0 :         let backend = self.auth_backend.as_ref().map(|()| user_info.clone());
      65            0 :         let access_control = backend.get_endpoint_access_control(ctx).await?;
      66            0 :         access_control.check(
      67            0 :             ctx,
      68            0 :             self.config.authentication_config.ip_allowlist_check_enabled,
      69            0 :             self.config.authentication_config.is_vpc_acccess_proxy,
      70            0 :         )?;
      71              : 
      72            0 :         access_control.connection_attempt_rate_limit(
      73            0 :             ctx,
      74            0 :             &user_info.endpoint,
      75            0 :             &self.endpoint_rate_limiter,
      76            0 :         )?;
      77              : 
      78            0 :         let role_access = backend.get_role_secret(ctx).await?;
      79            0 :         let Some(secret) = role_access.secret else {
      80              :             // If we don't have an authentication secret, for the http flow we can just return an error.
      81            0 :             info!("authentication info not found");
      82            0 :             return Err(AuthError::password_failed(&*user_info.user));
      83              :         };
      84              : 
      85            0 :         let ep = EndpointIdInt::from(&user_info.endpoint);
      86            0 :         let auth_outcome = crate::auth::validate_password_and_exchange(
      87            0 :             &self.config.authentication_config.thread_pool,
      88            0 :             ep,
      89            0 :             password,
      90            0 :             secret,
      91            0 :         )
      92            0 :         .await?;
      93            0 :         let res = match auth_outcome {
      94            0 :             crate::sasl::Outcome::Success(key) => {
      95            0 :                 info!("user successfully authenticated");
      96            0 :                 Ok(key)
      97              :             }
      98            0 :             crate::sasl::Outcome::Failure(reason) => {
      99            0 :                 info!("auth backend failed with an error: {reason}");
     100            0 :                 Err(AuthError::password_failed(&*user_info.user))
     101              :             }
     102              :         };
     103            0 :         res.map(|key| ComputeCredentials {
     104            0 :             info: user_info,
     105            0 :             keys: key,
     106            0 :         })
     107            0 :     }
     108              : 
     109            0 :     pub(crate) async fn authenticate_with_jwt(
     110            0 :         &self,
     111            0 :         ctx: &RequestContext,
     112            0 :         user_info: &ComputeUserInfo,
     113            0 :         jwt: String,
     114            0 :     ) -> Result<ComputeCredentials, AuthError> {
     115            0 :         ctx.set_auth_method(crate::context::AuthMethod::Jwt);
     116              : 
     117            0 :         match &self.auth_backend {
     118            0 :             crate::auth::Backend::ControlPlane(console, ()) => {
     119            0 :                 let keys = self
     120            0 :                     .config
     121            0 :                     .authentication_config
     122            0 :                     .jwks_cache
     123            0 :                     .check_jwt(
     124            0 :                         ctx,
     125            0 :                         user_info.endpoint.clone(),
     126            0 :                         &user_info.user,
     127            0 :                         &**console,
     128            0 :                         &jwt,
     129            0 :                     )
     130            0 :                     .await?;
     131              : 
     132            0 :                 Ok(ComputeCredentials {
     133            0 :                     info: user_info.clone(),
     134            0 :                     keys,
     135            0 :                 })
     136              :             }
     137              :             crate::auth::Backend::Local(_) => {
     138            0 :                 let keys = self
     139            0 :                     .config
     140            0 :                     .authentication_config
     141            0 :                     .jwks_cache
     142            0 :                     .check_jwt(
     143            0 :                         ctx,
     144            0 :                         user_info.endpoint.clone(),
     145            0 :                         &user_info.user,
     146            0 :                         &StaticAuthRules,
     147            0 :                         &jwt,
     148            0 :                     )
     149            0 :                     .await?;
     150              : 
     151            0 :                 Ok(ComputeCredentials {
     152            0 :                     info: user_info.clone(),
     153            0 :                     keys,
     154            0 :                 })
     155              :             }
     156              :         }
     157            0 :     }
     158              : 
     159              :     // Wake up the destination if needed. Code here is a bit involved because
     160              :     // we reuse the code from the usual proxy and we need to prepare few structures
     161              :     // that this code expects.
     162              :     #[tracing::instrument(skip_all, fields(
     163              :         pid = tracing::field::Empty,
     164              :         compute_id = tracing::field::Empty,
     165              :         conn_id = tracing::field::Empty,
     166              :     ))]
     167              :     pub(crate) async fn connect_to_compute(
     168              :         &self,
     169              :         ctx: &RequestContext,
     170              :         conn_info: ConnInfo,
     171              :         keys: ComputeCredentials,
     172              :         force_new: bool,
     173              :     ) -> Result<Client<postgres_client::Client>, HttpConnError> {
     174              :         let maybe_client = if force_new {
     175              :             debug!("pool: pool is disabled");
     176              :             None
     177              :         } else {
     178              :             debug!("pool: looking for an existing connection");
     179              :             self.pool.get(ctx, &conn_info)?
     180              :         };
     181              : 
     182              :         if let Some(client) = maybe_client {
     183              :             return Ok(client);
     184              :         }
     185              :         let conn_id = uuid::Uuid::new_v4();
     186              :         tracing::Span::current().record("conn_id", display(conn_id));
     187              :         info!(%conn_id, "pool: opening a new connection '{conn_info}'");
     188              :         let backend = self.auth_backend.as_ref().map(|()| keys.info);
     189              :         crate::proxy::connect_compute::connect_to_compute(
     190              :             ctx,
     191              :             &TokioMechanism {
     192              :                 conn_id,
     193              :                 conn_info,
     194              :                 pool: self.pool.clone(),
     195              :                 locks: &self.config.connect_compute_locks,
     196              :                 keys: keys.keys,
     197              :             },
     198              :             &backend,
     199              :             self.config.wake_compute_retry_config,
     200              :             &self.config.connect_to_compute,
     201              :         )
     202              :         .await
     203              :     }
     204              : 
     205              :     // Wake up the destination if needed
     206              :     #[tracing::instrument(skip_all, fields(
     207              :         compute_id = tracing::field::Empty,
     208              :         conn_id = tracing::field::Empty,
     209              :     ))]
     210              :     pub(crate) async fn connect_to_local_proxy(
     211              :         &self,
     212              :         ctx: &RequestContext,
     213              :         conn_info: ConnInfo,
     214              :     ) -> Result<http_conn_pool::Client<LocalProxyClient>, HttpConnError> {
     215              :         debug!("pool: looking for an existing connection");
     216              :         if let Ok(Some(client)) = self.http_conn_pool.get(ctx, &conn_info) {
     217              :             return Ok(client);
     218              :         }
     219              : 
     220              :         let conn_id = uuid::Uuid::new_v4();
     221              :         tracing::Span::current().record("conn_id", display(conn_id));
     222              :         debug!(%conn_id, "pool: opening a new connection '{conn_info}'");
     223              :         let backend = self.auth_backend.as_ref().map(|()| ComputeUserInfo {
     224            0 :             user: conn_info.user_info.user.clone(),
     225            0 :             endpoint: EndpointId::from(format!(
     226            0 :                 "{}{LOCAL_PROXY_SUFFIX}",
     227            0 :                 conn_info.user_info.endpoint.normalize()
     228              :             )),
     229            0 :             options: conn_info.user_info.options.clone(),
     230            0 :         });
     231              :         crate::proxy::connect_compute::connect_to_compute(
     232              :             ctx,
     233              :             &HyperMechanism {
     234              :                 conn_id,
     235              :                 conn_info,
     236              :                 pool: self.http_conn_pool.clone(),
     237              :                 locks: &self.config.connect_compute_locks,
     238              :             },
     239              :             &backend,
     240              :             self.config.wake_compute_retry_config,
     241              :             &self.config.connect_to_compute,
     242              :         )
     243              :         .await
     244              :     }
     245              : 
     246              :     /// Connect to postgres over localhost.
     247              :     ///
     248              :     /// We expect postgres to be started here, so we won't do any retries.
     249              :     ///
     250              :     /// # Panics
     251              :     ///
     252              :     /// Panics if called with a non-local_proxy backend.
     253              :     #[tracing::instrument(skip_all, fields(
     254              :         pid = tracing::field::Empty,
     255              :         conn_id = tracing::field::Empty,
     256              :     ))]
     257              :     pub(crate) async fn connect_to_local_postgres(
     258              :         &self,
     259              :         ctx: &RequestContext,
     260              :         conn_info: ConnInfo,
     261              :         disable_pg_session_jwt: bool,
     262              :     ) -> Result<Client<postgres_client::Client>, HttpConnError> {
     263              :         if let Some(client) = self.local_pool.get(ctx, &conn_info)? {
     264              :             return Ok(client);
     265              :         }
     266              : 
     267              :         let local_backend = match &self.auth_backend {
     268              :             auth::Backend::ControlPlane(_, ()) => {
     269              :                 unreachable!("only local_proxy can connect to local postgres")
     270              :             }
     271              :             auth::Backend::Local(local) => local,
     272              :         };
     273              : 
     274              :         if !self.local_pool.initialized(&conn_info) {
     275              :             // only install and grant usage one at a time.
     276              :             let _permit = local_backend
     277              :                 .initialize
     278              :                 .acquire()
     279              :                 .await
     280              :                 .expect("semaphore should never be closed");
     281              : 
     282              :             // check again for race
     283              :             if !self.local_pool.initialized(&conn_info) && !disable_pg_session_jwt {
     284              :                 local_backend
     285              :                     .compute_ctl
     286              :                     .install_extension(&ExtensionInstallRequest {
     287              :                         extension: EXT_NAME,
     288              :                         database: conn_info.dbname.clone(),
     289              :                         version: EXT_VERSION,
     290              :                     })
     291              :                     .await?;
     292              : 
     293              :                 local_backend
     294              :                     .compute_ctl
     295              :                     .grant_role(&SetRoleGrantsRequest {
     296              :                         schema: EXT_SCHEMA,
     297              :                         privileges: vec![Privilege::Usage],
     298              :                         database: conn_info.dbname.clone(),
     299              :                         role: conn_info.user_info.user.clone(),
     300              :                     })
     301              :                     .await?;
     302              : 
     303              :                 self.local_pool.set_initialized(&conn_info);
     304              :             }
     305              :         }
     306              : 
     307              :         let conn_id = uuid::Uuid::new_v4();
     308              :         tracing::Span::current().record("conn_id", display(conn_id));
     309              :         info!(%conn_id, "local_pool: opening a new connection '{conn_info}'");
     310              : 
     311              :         let (key, jwk) = create_random_jwk();
     312              : 
     313              :         let mut config = local_backend
     314              :             .node_info
     315              :             .conn_info
     316              :             .to_postgres_client_config();
     317              :         config
     318              :             .user(&conn_info.user_info.user)
     319              :             .dbname(&conn_info.dbname);
     320              :         if !disable_pg_session_jwt {
     321              :             config.set_param(
     322              :                 "options",
     323              :                 &format!(
     324              :                     "-c pg_session_jwt.jwk={}",
     325              :                     serde_json::to_string(&jwk).expect("serializing jwk to json should not fail")
     326              :                 ),
     327              :             );
     328              :         }
     329              : 
     330              :         let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
     331              :         let (client, connection) = config.connect(&postgres_client::NoTls).await?;
     332              :         drop(pause);
     333              : 
     334              :         let pid = client.get_process_id();
     335              :         tracing::Span::current().record("pid", pid);
     336              : 
     337              :         let mut handle = local_conn_pool::poll_client(
     338              :             self.local_pool.clone(),
     339              :             ctx,
     340              :             conn_info,
     341              :             client,
     342              :             connection,
     343              :             key,
     344              :             conn_id,
     345              :             local_backend.node_info.aux.clone(),
     346              :         );
     347              : 
     348              :         {
     349              :             let (client, mut discard) = handle.inner();
     350              :             debug!("setting up backend session state");
     351              : 
     352              :             // initiates the auth session
     353              :             if !disable_pg_session_jwt
     354              :                 && let Err(e) = client.batch_execute("select auth.init();").await
     355              :             {
     356              :                 discard.discard();
     357              :                 return Err(e.into());
     358              :             }
     359              : 
     360              :             info!("backend session state initialized");
     361              :         }
     362              : 
     363              :         Ok(handle)
     364              :     }
     365              : }
     366              : 
     367            0 : fn create_random_jwk() -> (SigningKey, jose_jwk::Key) {
     368            0 :     let key = SigningKey::generate(&mut OsRng);
     369              : 
     370            0 :     let jwk = jose_jwk::Key::Okp(jose_jwk::Okp {
     371            0 :         crv: jose_jwk::OkpCurves::Ed25519,
     372            0 :         x: jose_b64::serde::Bytes::from(key.verifying_key().to_bytes().to_vec()),
     373            0 :         d: None,
     374            0 :     });
     375              : 
     376            0 :     (key, jwk)
     377            0 : }
     378              : 
     379              : #[derive(Debug, thiserror::Error)]
     380              : pub(crate) enum HttpConnError {
     381              :     #[error("pooled connection closed at inconsistent state")]
     382              :     ConnectionClosedAbruptly(#[from] tokio::sync::watch::error::SendError<uuid::Uuid>),
     383              :     #[error("could not connect to postgres in compute")]
     384              :     PostgresConnectionError(#[from] postgres_client::Error),
     385              :     #[error("could not connect to local-proxy in compute")]
     386              :     LocalProxyConnectionError(#[from] LocalProxyConnError),
     387              :     #[error("could not parse JWT payload")]
     388              :     JwtPayloadError(serde_json::Error),
     389              : 
     390              :     #[error("could not install extension: {0}")]
     391              :     ComputeCtl(#[from] ComputeCtlError),
     392              :     #[error("could not get auth info")]
     393              :     GetAuthInfo(#[from] GetAuthInfoError),
     394              :     #[error("user not authenticated")]
     395              :     AuthError(#[from] AuthError),
     396              :     #[error("wake_compute returned error")]
     397              :     WakeCompute(#[from] WakeComputeError),
     398              :     #[error("error acquiring resource permit: {0}")]
     399              :     TooManyConnectionAttempts(#[from] ApiLockError),
     400              : }
     401              : 
     402              : #[derive(Debug, thiserror::Error)]
     403              : pub(crate) enum LocalProxyConnError {
     404              :     #[error("error with connection to local-proxy")]
     405              :     Io(#[source] std::io::Error),
     406              :     #[error("could not establish h2 connection")]
     407              :     H2(#[from] hyper::Error),
     408              : }
     409              : 
     410              : impl ReportableError for HttpConnError {
     411            0 :     fn get_error_kind(&self) -> ErrorKind {
     412            0 :         match self {
     413            0 :             HttpConnError::ConnectionClosedAbruptly(_) => ErrorKind::Compute,
     414            0 :             HttpConnError::PostgresConnectionError(p) => {
     415            0 :                 if p.as_db_error().is_some() {
     416              :                     // postgres rejected the connection
     417            0 :                     ErrorKind::Postgres
     418              :                 } else {
     419              :                     // couldn't even reach postgres
     420            0 :                     ErrorKind::Compute
     421              :                 }
     422              :             }
     423            0 :             HttpConnError::LocalProxyConnectionError(_) => ErrorKind::Compute,
     424            0 :             HttpConnError::ComputeCtl(_) => ErrorKind::Service,
     425            0 :             HttpConnError::JwtPayloadError(_) => ErrorKind::User,
     426            0 :             HttpConnError::GetAuthInfo(a) => a.get_error_kind(),
     427            0 :             HttpConnError::AuthError(a) => a.get_error_kind(),
     428            0 :             HttpConnError::WakeCompute(w) => w.get_error_kind(),
     429            0 :             HttpConnError::TooManyConnectionAttempts(w) => w.get_error_kind(),
     430              :         }
     431            0 :     }
     432              : }
     433              : 
     434              : impl UserFacingError for HttpConnError {
     435            0 :     fn to_string_client(&self) -> String {
     436            0 :         match self {
     437            0 :             HttpConnError::ConnectionClosedAbruptly(_) => self.to_string(),
     438            0 :             HttpConnError::PostgresConnectionError(p) => p.to_string(),
     439            0 :             HttpConnError::LocalProxyConnectionError(p) => p.to_string(),
     440            0 :             HttpConnError::ComputeCtl(_) => "could not set up the JWT authorization database extension".to_string(),
     441            0 :             HttpConnError::JwtPayloadError(p) => p.to_string(),
     442            0 :             HttpConnError::GetAuthInfo(c) => c.to_string_client(),
     443            0 :             HttpConnError::AuthError(c) => c.to_string_client(),
     444            0 :             HttpConnError::WakeCompute(c) => c.to_string_client(),
     445              :             HttpConnError::TooManyConnectionAttempts(_) => {
     446            0 :                 "Failed to acquire permit to connect to the database. Too many database connection attempts are currently ongoing.".to_owned()
     447              :             }
     448              :         }
     449            0 :     }
     450              : }
     451              : 
     452              : impl CouldRetry for HttpConnError {
     453            0 :     fn could_retry(&self) -> bool {
     454            0 :         match self {
     455            0 :             HttpConnError::PostgresConnectionError(e) => e.could_retry(),
     456            0 :             HttpConnError::LocalProxyConnectionError(e) => e.could_retry(),
     457            0 :             HttpConnError::ComputeCtl(_) => false,
     458            0 :             HttpConnError::ConnectionClosedAbruptly(_) => false,
     459            0 :             HttpConnError::JwtPayloadError(_) => false,
     460            0 :             HttpConnError::GetAuthInfo(_) => false,
     461            0 :             HttpConnError::AuthError(_) => false,
     462            0 :             HttpConnError::WakeCompute(_) => false,
     463            0 :             HttpConnError::TooManyConnectionAttempts(_) => false,
     464              :         }
     465            0 :     }
     466              : }
     467              : impl ShouldRetryWakeCompute for HttpConnError {
     468            0 :     fn should_retry_wake_compute(&self) -> bool {
     469            0 :         match self {
     470            0 :             HttpConnError::PostgresConnectionError(e) => e.should_retry_wake_compute(),
     471              :             // we never checked cache validity
     472            0 :             HttpConnError::TooManyConnectionAttempts(_) => false,
     473            0 :             _ => true,
     474              :         }
     475            0 :     }
     476              : }
     477              : 
     478              : impl ReportableError for LocalProxyConnError {
     479            0 :     fn get_error_kind(&self) -> ErrorKind {
     480            0 :         match self {
     481            0 :             LocalProxyConnError::Io(_) => ErrorKind::Compute,
     482            0 :             LocalProxyConnError::H2(_) => ErrorKind::Compute,
     483              :         }
     484            0 :     }
     485              : }
     486              : 
     487              : impl UserFacingError for LocalProxyConnError {
     488            0 :     fn to_string_client(&self) -> String {
     489            0 :         "Could not establish HTTP connection to the database".to_string()
     490            0 :     }
     491              : }
     492              : 
     493              : impl CouldRetry for LocalProxyConnError {
     494            0 :     fn could_retry(&self) -> bool {
     495            0 :         match self {
     496            0 :             LocalProxyConnError::Io(_) => false,
     497            0 :             LocalProxyConnError::H2(_) => false,
     498              :         }
     499            0 :     }
     500              : }
     501              : impl ShouldRetryWakeCompute for LocalProxyConnError {
     502            0 :     fn should_retry_wake_compute(&self) -> bool {
     503            0 :         match self {
     504            0 :             LocalProxyConnError::Io(_) => false,
     505            0 :             LocalProxyConnError::H2(_) => false,
     506              :         }
     507            0 :     }
     508              : }
     509              : 
     510              : struct TokioMechanism {
     511              :     pool: Arc<GlobalConnPool<postgres_client::Client, EndpointConnPool<postgres_client::Client>>>,
     512              :     conn_info: ConnInfo,
     513              :     conn_id: uuid::Uuid,
     514              :     keys: ComputeCredentialKeys,
     515              : 
     516              :     /// connect_to_compute concurrency lock
     517              :     locks: &'static ApiLocks<Host>,
     518              : }
     519              : 
     520              : #[async_trait]
     521              : impl ConnectMechanism for TokioMechanism {
     522              :     type Connection = Client<postgres_client::Client>;
     523              :     type ConnectError = HttpConnError;
     524              :     type Error = HttpConnError;
     525              : 
     526            0 :     async fn connect_once(
     527              :         &self,
     528              :         ctx: &RequestContext,
     529              :         node_info: &CachedNodeInfo,
     530              :         compute_config: &ComputeConfig,
     531            0 :     ) -> Result<Self::Connection, Self::ConnectError> {
     532            0 :         let permit = self.locks.get_permit(&node_info.conn_info.host).await?;
     533              : 
     534            0 :         let mut config = node_info.conn_info.to_postgres_client_config();
     535            0 :         let config = config
     536            0 :             .user(&self.conn_info.user_info.user)
     537            0 :             .dbname(&self.conn_info.dbname)
     538            0 :             .connect_timeout(compute_config.timeout);
     539              : 
     540            0 :         if let ComputeCredentialKeys::AuthKeys(auth_keys) = self.keys {
     541            0 :             config.auth_keys(auth_keys);
     542            0 :         }
     543              : 
     544            0 :         let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
     545            0 :         let res = config.connect(compute_config).await;
     546            0 :         drop(pause);
     547            0 :         let (client, connection) = permit.release_result(res)?;
     548              : 
     549            0 :         tracing::Span::current().record("pid", tracing::field::display(client.get_process_id()));
     550            0 :         tracing::Span::current().record(
     551            0 :             "compute_id",
     552            0 :             tracing::field::display(&node_info.aux.compute_id),
     553              :         );
     554              : 
     555            0 :         if let Some(query_id) = ctx.get_testodrome_id() {
     556            0 :             info!("latency={}, query_id={}", ctx.get_proxy_latency(), query_id);
     557            0 :         }
     558              : 
     559            0 :         Ok(poll_client(
     560            0 :             self.pool.clone(),
     561            0 :             ctx,
     562            0 :             self.conn_info.clone(),
     563            0 :             client,
     564            0 :             connection,
     565            0 :             self.conn_id,
     566            0 :             node_info.aux.clone(),
     567            0 :         ))
     568            0 :     }
     569              : }
     570              : 
     571              : struct HyperMechanism {
     572              :     pool: Arc<GlobalConnPool<LocalProxyClient, HttpConnPool<LocalProxyClient>>>,
     573              :     conn_info: ConnInfo,
     574              :     conn_id: uuid::Uuid,
     575              : 
     576              :     /// connect_to_compute concurrency lock
     577              :     locks: &'static ApiLocks<Host>,
     578              : }
     579              : 
     580              : #[async_trait]
     581              : impl ConnectMechanism for HyperMechanism {
     582              :     type Connection = http_conn_pool::Client<LocalProxyClient>;
     583              :     type ConnectError = HttpConnError;
     584              :     type Error = HttpConnError;
     585              : 
     586            0 :     async fn connect_once(
     587              :         &self,
     588              :         ctx: &RequestContext,
     589              :         node_info: &CachedNodeInfo,
     590              :         config: &ComputeConfig,
     591            0 :     ) -> Result<Self::Connection, Self::ConnectError> {
     592            0 :         let host_addr = node_info.conn_info.host_addr;
     593            0 :         let host = &node_info.conn_info.host;
     594            0 :         let permit = self.locks.get_permit(host).await?;
     595              : 
     596            0 :         let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
     597              : 
     598            0 :         let tls = if node_info.conn_info.ssl_mode == SslMode::Disable {
     599            0 :             None
     600              :         } else {
     601            0 :             Some(&config.tls)
     602              :         };
     603              : 
     604            0 :         let port = node_info.conn_info.port;
     605            0 :         let res = connect_http2(host_addr, host, port, config.timeout, tls).await;
     606            0 :         drop(pause);
     607            0 :         let (client, connection) = permit.release_result(res)?;
     608              : 
     609            0 :         tracing::Span::current().record(
     610            0 :             "compute_id",
     611            0 :             tracing::field::display(&node_info.aux.compute_id),
     612              :         );
     613              : 
     614            0 :         if let Some(query_id) = ctx.get_testodrome_id() {
     615            0 :             info!("latency={}, query_id={}", ctx.get_proxy_latency(), query_id);
     616            0 :         }
     617              : 
     618            0 :         Ok(poll_http2_client(
     619            0 :             self.pool.clone(),
     620            0 :             ctx,
     621            0 :             &self.conn_info,
     622            0 :             client,
     623            0 :             connection,
     624            0 :             self.conn_id,
     625            0 :             node_info.aux.clone(),
     626            0 :         ))
     627            0 :     }
     628              : }
     629              : 
     630            0 : async fn connect_http2(
     631            0 :     host_addr: Option<IpAddr>,
     632            0 :     host: &str,
     633            0 :     port: u16,
     634            0 :     timeout: Duration,
     635            0 :     tls: Option<&Arc<rustls::ClientConfig>>,
     636            0 : ) -> Result<
     637            0 :     (
     638            0 :         http_conn_pool::LocalProxyClient,
     639            0 :         http_conn_pool::LocalProxyConnection,
     640            0 :     ),
     641            0 :     LocalProxyConnError,
     642            0 : > {
     643            0 :     let addrs = match host_addr {
     644            0 :         Some(addr) => vec![SocketAddr::new(addr, port)],
     645            0 :         None => lookup_host((host, port))
     646            0 :             .await
     647            0 :             .map_err(LocalProxyConnError::Io)?
     648            0 :             .collect(),
     649              :     };
     650            0 :     let mut last_err = None;
     651              : 
     652            0 :     let mut addrs = addrs.into_iter();
     653            0 :     let stream = loop {
     654            0 :         let Some(addr) = addrs.next() else {
     655            0 :             return Err(last_err.unwrap_or_else(|| {
     656            0 :                 LocalProxyConnError::Io(io::Error::new(
     657            0 :                     io::ErrorKind::InvalidInput,
     658            0 :                     "could not resolve any addresses",
     659            0 :                 ))
     660            0 :             }));
     661              :         };
     662              : 
     663            0 :         match tokio::time::timeout(timeout, TcpStream::connect(addr)).await {
     664            0 :             Ok(Ok(stream)) => {
     665            0 :                 stream.set_nodelay(true).map_err(LocalProxyConnError::Io)?;
     666            0 :                 break stream;
     667              :             }
     668            0 :             Ok(Err(e)) => {
     669            0 :                 last_err = Some(LocalProxyConnError::Io(e));
     670            0 :             }
     671            0 :             Err(e) => {
     672            0 :                 last_err = Some(LocalProxyConnError::Io(io::Error::new(
     673            0 :                     io::ErrorKind::TimedOut,
     674            0 :                     e,
     675            0 :                 )));
     676            0 :             }
     677              :         }
     678              :     };
     679              : 
     680            0 :     let stream = if let Some(tls) = tls {
     681            0 :         let host = DnsName::try_from(host)
     682            0 :             .map_err(io::Error::other)
     683            0 :             .map_err(LocalProxyConnError::Io)?
     684            0 :             .to_owned();
     685            0 :         let stream = TlsConnector::from(tls.clone())
     686            0 :             .connect(ServerName::DnsName(host), stream)
     687            0 :             .await
     688            0 :             .map_err(LocalProxyConnError::Io)?;
     689            0 :         Box::pin(stream) as AsyncRW
     690              :     } else {
     691            0 :         Box::pin(stream) as AsyncRW
     692              :     };
     693              : 
     694            0 :     let (client, connection) = hyper::client::conn::http2::Builder::new(TokioExecutor::new())
     695            0 :         .timer(TokioTimer::new())
     696            0 :         .keep_alive_interval(Duration::from_secs(20))
     697            0 :         .keep_alive_while_idle(true)
     698            0 :         .keep_alive_timeout(Duration::from_secs(5))
     699            0 :         .handshake(TokioIo::new(stream))
     700            0 :         .await?;
     701              : 
     702            0 :     Ok((client, connection))
     703            0 : }
        

Generated by: LCOV version 2.1-beta