|             Line data    Source code 
       1              : use std::io;
       2              : use std::sync::Arc;
       3              : use std::time::Duration;
       4              : 
       5              : use async_trait::async_trait;
       6              : use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer};
       7              : use p256::ecdsa::SigningKey;
       8              : use p256::elliptic_curve::JwkEcKey;
       9              : use rand::rngs::OsRng;
      10              : use tokio::net::{lookup_host, TcpStream};
      11              : use tracing::field::display;
      12              : use tracing::{debug, info};
      13              : 
      14              : use super::conn_pool::poll_client;
      15              : use super::conn_pool_lib::{Client, ConnInfo, GlobalConnPool};
      16              : use super::http_conn_pool::{self, poll_http2_client, Send};
      17              : use super::local_conn_pool::{self, LocalClient, LocalConnPool, EXT_NAME, EXT_SCHEMA, EXT_VERSION};
      18              : use crate::auth::backend::local::StaticAuthRules;
      19              : use crate::auth::backend::{ComputeCredentials, ComputeUserInfo};
      20              : use crate::auth::{self, check_peer_addr_is_in_list, AuthError};
      21              : use crate::compute_ctl::{
      22              :     ComputeCtlError, ExtensionInstallRequest, Privilege, SetRoleGrantsRequest,
      23              : };
      24              : use crate::config::ProxyConfig;
      25              : use crate::context::RequestMonitoring;
      26              : use crate::control_plane::errors::{GetAuthInfoError, WakeComputeError};
      27              : use crate::control_plane::locks::ApiLocks;
      28              : use crate::control_plane::provider::ApiLockError;
      29              : use crate::control_plane::CachedNodeInfo;
      30              : use crate::error::{ErrorKind, ReportableError, UserFacingError};
      31              : use crate::intern::EndpointIdInt;
      32              : use crate::proxy::connect_compute::ConnectMechanism;
      33              : use crate::proxy::retry::{CouldRetry, ShouldRetryWakeCompute};
      34              : use crate::rate_limiter::EndpointRateLimiter;
      35              : use crate::{compute, EndpointId, Host};
      36              : 
      37              : pub(crate) struct PoolingBackend {
      38              :     pub(crate) http_conn_pool: Arc<super::http_conn_pool::GlobalConnPool<Send>>,
      39              :     pub(crate) local_pool: Arc<LocalConnPool<tokio_postgres::Client>>,
      40              :     pub(crate) pool: Arc<GlobalConnPool<tokio_postgres::Client>>,
      41              : 
      42              :     pub(crate) config: &'static ProxyConfig,
      43              :     pub(crate) auth_backend: &'static crate::auth::Backend<'static, ()>,
      44              :     pub(crate) endpoint_rate_limiter: Arc<EndpointRateLimiter>,
      45              : }
      46              : 
      47              : impl PoolingBackend {
      48            0 :     pub(crate) async fn authenticate_with_password(
      49            0 :         &self,
      50            0 :         ctx: &RequestMonitoring,
      51            0 :         user_info: &ComputeUserInfo,
      52            0 :         password: &[u8],
      53            0 :     ) -> Result<ComputeCredentials, AuthError> {
      54            0 :         let user_info = user_info.clone();
      55            0 :         let backend = self.auth_backend.as_ref().map(|()| user_info.clone());
      56            0 :         let (allowed_ips, maybe_secret) = backend.get_allowed_ips_and_secret(ctx).await?;
      57            0 :         if self.config.authentication_config.ip_allowlist_check_enabled
      58            0 :             && !check_peer_addr_is_in_list(&ctx.peer_addr(), &allowed_ips)
      59              :         {
      60            0 :             return Err(AuthError::ip_address_not_allowed(ctx.peer_addr()));
      61            0 :         }
      62            0 :         if !self
      63            0 :             .endpoint_rate_limiter
      64            0 :             .check(user_info.endpoint.clone().into(), 1)
      65              :         {
      66            0 :             return Err(AuthError::too_many_connections());
      67            0 :         }
      68            0 :         let cached_secret = match maybe_secret {
      69            0 :             Some(secret) => secret,
      70            0 :             None => backend.get_role_secret(ctx).await?,
      71              :         };
      72              : 
      73            0 :         let secret = match cached_secret.value.clone() {
      74            0 :             Some(secret) => self.config.authentication_config.check_rate_limit(
      75            0 :                 ctx,
      76            0 :                 secret,
      77            0 :                 &user_info.endpoint,
      78            0 :                 true,
      79            0 :             )?,
      80              :             None => {
      81              :                 // If we don't have an authentication secret, for the http flow we can just return an error.
      82            0 :                 info!("authentication info not found");
      83            0 :                 return Err(AuthError::auth_failed(&*user_info.user));
      84              :             }
      85              :         };
      86            0 :         let ep = EndpointIdInt::from(&user_info.endpoint);
      87            0 :         let auth_outcome = crate::auth::validate_password_and_exchange(
      88            0 :             &self.config.authentication_config.thread_pool,
      89            0 :             ep,
      90            0 :             password,
      91            0 :             secret,
      92            0 :         )
      93            0 :         .await?;
      94            0 :         let res = match auth_outcome {
      95            0 :             crate::sasl::Outcome::Success(key) => {
      96            0 :                 info!("user successfully authenticated");
      97            0 :                 Ok(key)
      98              :             }
      99            0 :             crate::sasl::Outcome::Failure(reason) => {
     100            0 :                 info!("auth backend failed with an error: {reason}");
     101            0 :                 Err(AuthError::auth_failed(&*user_info.user))
     102              :             }
     103              :         };
     104            0 :         res.map(|key| ComputeCredentials {
     105            0 :             info: user_info,
     106            0 :             keys: key,
     107            0 :         })
     108            0 :     }
     109              : 
     110            0 :     pub(crate) async fn authenticate_with_jwt(
     111            0 :         &self,
     112            0 :         ctx: &RequestMonitoring,
     113            0 :         user_info: &ComputeUserInfo,
     114            0 :         jwt: String,
     115            0 :     ) -> Result<ComputeCredentials, AuthError> {
     116            0 :         match &self.auth_backend {
     117            0 :             crate::auth::Backend::ControlPlane(console, ()) => {
     118            0 :                 self.config
     119            0 :                     .authentication_config
     120            0 :                     .jwks_cache
     121            0 :                     .check_jwt(
     122            0 :                         ctx,
     123            0 :                         user_info.endpoint.clone(),
     124            0 :                         &user_info.user,
     125            0 :                         &**console,
     126            0 :                         &jwt,
     127            0 :                     )
     128            0 :                     .await
     129            0 :                     .map_err(|e| AuthError::auth_failed(e.to_string()))?;
     130              : 
     131            0 :                 Ok(ComputeCredentials {
     132            0 :                     info: user_info.clone(),
     133            0 :                     keys: crate::auth::backend::ComputeCredentialKeys::None,
     134            0 :                 })
     135              :             }
     136              :             crate::auth::Backend::Local(_) => {
     137            0 :                 let keys = self
     138            0 :                     .config
     139            0 :                     .authentication_config
     140            0 :                     .jwks_cache
     141            0 :                     .check_jwt(
     142            0 :                         ctx,
     143            0 :                         user_info.endpoint.clone(),
     144            0 :                         &user_info.user,
     145            0 :                         &StaticAuthRules,
     146            0 :                         &jwt,
     147            0 :                     )
     148            0 :                     .await
     149            0 :                     .map_err(|e| AuthError::auth_failed(e.to_string()))?;
     150              : 
     151            0 :                 Ok(ComputeCredentials {
     152            0 :                     info: user_info.clone(),
     153            0 :                     keys,
     154            0 :                 })
     155              :             }
     156              :         }
     157            0 :     }
     158              : 
     159              :     // Wake up the destination if needed. Code here is a bit involved because
     160              :     // we reuse the code from the usual proxy and we need to prepare few structures
     161              :     // that this code expects.
     162            0 :     #[tracing::instrument(fields(pid = tracing::field::Empty), skip_all)]
     163              :     pub(crate) async fn connect_to_compute(
     164              :         &self,
     165              :         ctx: &RequestMonitoring,
     166              :         conn_info: ConnInfo,
     167              :         keys: ComputeCredentials,
     168              :         force_new: bool,
     169              :     ) -> Result<Client<tokio_postgres::Client>, HttpConnError> {
     170              :         let maybe_client = if force_new {
     171              :             info!("pool: pool is disabled");
     172              :             None
     173              :         } else {
     174              :             info!("pool: looking for an existing connection");
     175              :             self.pool.get(ctx, &conn_info)?
     176              :         };
     177              : 
     178              :         if let Some(client) = maybe_client {
     179              :             return Ok(client);
     180              :         }
     181              :         let conn_id = uuid::Uuid::new_v4();
     182              :         tracing::Span::current().record("conn_id", display(conn_id));
     183              :         info!(%conn_id, "pool: opening a new connection '{conn_info}'");
     184            0 :         let backend = self.auth_backend.as_ref().map(|()| keys);
     185              :         crate::proxy::connect_compute::connect_to_compute(
     186              :             ctx,
     187              :             &TokioMechanism {
     188              :                 conn_id,
     189              :                 conn_info,
     190              :                 pool: self.pool.clone(),
     191              :                 locks: &self.config.connect_compute_locks,
     192              :             },
     193              :             &backend,
     194              :             false, // do not allow self signed compute for http flow
     195              :             self.config.wake_compute_retry_config,
     196              :             self.config.connect_to_compute_retry_config,
     197              :         )
     198              :         .await
     199              :     }
     200              : 
     201              :     // Wake up the destination if needed
     202            0 :     #[tracing::instrument(fields(pid = tracing::field::Empty), skip_all)]
     203              :     pub(crate) async fn connect_to_local_proxy(
     204              :         &self,
     205              :         ctx: &RequestMonitoring,
     206              :         conn_info: ConnInfo,
     207              :     ) -> Result<http_conn_pool::Client<Send>, HttpConnError> {
     208              :         info!("pool: looking for an existing connection");
     209              :         if let Some(client) = self.http_conn_pool.get(ctx, &conn_info) {
     210              :             return Ok(client);
     211              :         }
     212              : 
     213              :         let conn_id = uuid::Uuid::new_v4();
     214              :         tracing::Span::current().record("conn_id", display(conn_id));
     215              :         info!(%conn_id, "pool: opening a new connection '{conn_info}'");
     216            0 :         let backend = self.auth_backend.as_ref().map(|()| ComputeCredentials {
     217            0 :             info: ComputeUserInfo {
     218            0 :                 user: conn_info.user_info.user.clone(),
     219            0 :                 endpoint: EndpointId::from(format!("{}-local-proxy", conn_info.user_info.endpoint)),
     220            0 :                 options: conn_info.user_info.options.clone(),
     221            0 :             },
     222            0 :             keys: crate::auth::backend::ComputeCredentialKeys::None,
     223            0 :         });
     224              :         crate::proxy::connect_compute::connect_to_compute(
     225              :             ctx,
     226              :             &HyperMechanism {
     227              :                 conn_id,
     228              :                 conn_info,
     229              :                 pool: self.http_conn_pool.clone(),
     230              :                 locks: &self.config.connect_compute_locks,
     231              :             },
     232              :             &backend,
     233              :             false, // do not allow self signed compute for http flow
     234              :             self.config.wake_compute_retry_config,
     235              :             self.config.connect_to_compute_retry_config,
     236              :         )
     237              :         .await
     238              :     }
     239              : 
     240              :     /// Connect to postgres over localhost.
     241              :     ///
     242              :     /// We expect postgres to be started here, so we won't do any retries.
     243              :     ///
     244              :     /// # Panics
     245              :     ///
     246              :     /// Panics if called with a non-local_proxy backend.
     247            0 :     #[tracing::instrument(fields(pid = tracing::field::Empty), skip_all)]
     248              :     pub(crate) async fn connect_to_local_postgres(
     249              :         &self,
     250              :         ctx: &RequestMonitoring,
     251              :         conn_info: ConnInfo,
     252              :     ) -> Result<LocalClient<tokio_postgres::Client>, HttpConnError> {
     253              :         if let Some(client) = self.local_pool.get(ctx, &conn_info)? {
     254              :             return Ok(client);
     255              :         }
     256              : 
     257              :         let local_backend = match &self.auth_backend {
     258              :             auth::Backend::ControlPlane(_, ()) => {
     259              :                 unreachable!("only local_proxy can connect to local postgres")
     260              :             }
     261              :             auth::Backend::Local(local) => local,
     262              :         };
     263              : 
     264              :         if !self.local_pool.initialized(&conn_info) {
     265              :             // only install and grant usage one at a time.
     266              :             let _permit = local_backend.initialize.acquire().await.unwrap();
     267              : 
     268              :             // check again for race
     269              :             if !self.local_pool.initialized(&conn_info) {
     270              :                 local_backend
     271              :                     .compute_ctl
     272              :                     .install_extension(&ExtensionInstallRequest {
     273              :                         extension: EXT_NAME,
     274              :                         database: conn_info.dbname.clone(),
     275              :                         version: EXT_VERSION,
     276              :                     })
     277              :                     .await?;
     278              : 
     279              :                 local_backend
     280              :                     .compute_ctl
     281              :                     .grant_role(&SetRoleGrantsRequest {
     282              :                         schema: EXT_SCHEMA,
     283              :                         privileges: vec![Privilege::Usage],
     284              :                         database: conn_info.dbname.clone(),
     285              :                         role: conn_info.user_info.user.clone(),
     286              :                     })
     287              :                     .await?;
     288              : 
     289              :                 self.local_pool.set_initialized(&conn_info);
     290              :             }
     291              :         }
     292              : 
     293              :         let conn_id = uuid::Uuid::new_v4();
     294              :         tracing::Span::current().record("conn_id", display(conn_id));
     295              :         info!(%conn_id, "local_pool: opening a new connection '{conn_info}'");
     296              : 
     297              :         let mut node_info = local_backend.node_info.clone();
     298              : 
     299              :         let (key, jwk) = create_random_jwk();
     300              : 
     301              :         let config = node_info
     302              :             .config
     303              :             .user(&conn_info.user_info.user)
     304              :             .dbname(&conn_info.dbname)
     305              :             .options(&format!(
     306              :                 "-c pg_session_jwt.jwk={}",
     307              :                 serde_json::to_string(&jwk).expect("serializing jwk to json should not fail")
     308              :             ));
     309              : 
     310              :         let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
     311              :         let (client, connection) = config.connect(tokio_postgres::NoTls).await?;
     312              :         drop(pause);
     313              : 
     314              :         let pid = client.get_process_id();
     315              :         tracing::Span::current().record("pid", pid);
     316              : 
     317              :         let mut handle = local_conn_pool::poll_client(
     318              :             self.local_pool.clone(),
     319              :             ctx,
     320              :             conn_info,
     321              :             client,
     322              :             connection,
     323              :             key,
     324              :             conn_id,
     325              :             node_info.aux.clone(),
     326              :         );
     327              : 
     328              :         {
     329              :             let (client, mut discard) = handle.inner();
     330              :             debug!("setting up backend session state");
     331              : 
     332              :             // initiates the auth session
     333              :             if let Err(e) = client.query("select auth.init()", &[]).await {
     334              :                 discard.discard();
     335              :                 return Err(e.into());
     336              :             }
     337              : 
     338              :             info!("backend session state initialized");
     339              :         }
     340              : 
     341              :         Ok(handle)
     342              :     }
     343              : }
     344              : 
     345            0 : fn create_random_jwk() -> (SigningKey, JwkEcKey) {
     346            0 :     let key = SigningKey::random(&mut OsRng);
     347            0 :     let jwk = p256::PublicKey::from(key.verifying_key()).to_jwk();
     348            0 :     (key, jwk)
     349            0 : }
     350              : 
     351            0 : #[derive(Debug, thiserror::Error)]
     352              : pub(crate) enum HttpConnError {
     353              :     #[error("pooled connection closed at inconsistent state")]
     354              :     ConnectionClosedAbruptly(#[from] tokio::sync::watch::error::SendError<uuid::Uuid>),
     355              :     #[error("could not connection to postgres in compute")]
     356              :     PostgresConnectionError(#[from] tokio_postgres::Error),
     357              :     #[error("could not connection to local-proxy in compute")]
     358              :     LocalProxyConnectionError(#[from] LocalProxyConnError),
     359              :     #[error("could not parse JWT payload")]
     360              :     JwtPayloadError(serde_json::Error),
     361              : 
     362              :     #[error("could not install extension: {0}")]
     363              :     ComputeCtl(#[from] ComputeCtlError),
     364              :     #[error("could not get auth info")]
     365              :     GetAuthInfo(#[from] GetAuthInfoError),
     366              :     #[error("user not authenticated")]
     367              :     AuthError(#[from] AuthError),
     368              :     #[error("wake_compute returned error")]
     369              :     WakeCompute(#[from] WakeComputeError),
     370              :     #[error("error acquiring resource permit: {0}")]
     371              :     TooManyConnectionAttempts(#[from] ApiLockError),
     372              : }
     373              : 
     374            0 : #[derive(Debug, thiserror::Error)]
     375              : pub(crate) enum LocalProxyConnError {
     376              :     #[error("error with connection to local-proxy")]
     377              :     Io(#[source] std::io::Error),
     378              :     #[error("could not establish h2 connection")]
     379              :     H2(#[from] hyper::Error),
     380              : }
     381              : 
     382              : impl ReportableError for HttpConnError {
     383            0 :     fn get_error_kind(&self) -> ErrorKind {
     384            0 :         match self {
     385            0 :             HttpConnError::ConnectionClosedAbruptly(_) => ErrorKind::Compute,
     386            0 :             HttpConnError::PostgresConnectionError(p) => p.get_error_kind(),
     387            0 :             HttpConnError::LocalProxyConnectionError(_) => ErrorKind::Compute,
     388            0 :             HttpConnError::ComputeCtl(_) => ErrorKind::Service,
     389            0 :             HttpConnError::JwtPayloadError(_) => ErrorKind::User,
     390            0 :             HttpConnError::GetAuthInfo(a) => a.get_error_kind(),
     391            0 :             HttpConnError::AuthError(a) => a.get_error_kind(),
     392            0 :             HttpConnError::WakeCompute(w) => w.get_error_kind(),
     393            0 :             HttpConnError::TooManyConnectionAttempts(w) => w.get_error_kind(),
     394              :         }
     395            0 :     }
     396              : }
     397              : 
     398              : impl UserFacingError for HttpConnError {
     399            0 :     fn to_string_client(&self) -> String {
     400            0 :         match self {
     401            0 :             HttpConnError::ConnectionClosedAbruptly(_) => self.to_string(),
     402            0 :             HttpConnError::PostgresConnectionError(p) => p.to_string(),
     403            0 :             HttpConnError::LocalProxyConnectionError(p) => p.to_string(),
     404            0 :             HttpConnError::ComputeCtl(_) => "could not set up the JWT authorization database extension".to_string(),
     405            0 :             HttpConnError::JwtPayloadError(p) => p.to_string(),
     406            0 :             HttpConnError::GetAuthInfo(c) => c.to_string_client(),
     407            0 :             HttpConnError::AuthError(c) => c.to_string_client(),
     408            0 :             HttpConnError::WakeCompute(c) => c.to_string_client(),
     409              :             HttpConnError::TooManyConnectionAttempts(_) => {
     410            0 :                 "Failed to acquire permit to connect to the database. Too many database connection attempts are currently ongoing.".to_owned()
     411              :             }
     412              :         }
     413            0 :     }
     414              : }
     415              : 
     416              : impl CouldRetry for HttpConnError {
     417            0 :     fn could_retry(&self) -> bool {
     418            0 :         match self {
     419            0 :             HttpConnError::PostgresConnectionError(e) => e.could_retry(),
     420            0 :             HttpConnError::LocalProxyConnectionError(e) => e.could_retry(),
     421            0 :             HttpConnError::ComputeCtl(_) => false,
     422            0 :             HttpConnError::ConnectionClosedAbruptly(_) => false,
     423            0 :             HttpConnError::JwtPayloadError(_) => false,
     424            0 :             HttpConnError::GetAuthInfo(_) => false,
     425            0 :             HttpConnError::AuthError(_) => false,
     426            0 :             HttpConnError::WakeCompute(_) => false,
     427            0 :             HttpConnError::TooManyConnectionAttempts(_) => false,
     428              :         }
     429            0 :     }
     430              : }
     431              : impl ShouldRetryWakeCompute for HttpConnError {
     432            0 :     fn should_retry_wake_compute(&self) -> bool {
     433            0 :         match self {
     434            0 :             HttpConnError::PostgresConnectionError(e) => e.should_retry_wake_compute(),
     435              :             // we never checked cache validity
     436            0 :             HttpConnError::TooManyConnectionAttempts(_) => false,
     437            0 :             _ => true,
     438              :         }
     439            0 :     }
     440              : }
     441              : 
     442              : impl ReportableError for LocalProxyConnError {
     443            0 :     fn get_error_kind(&self) -> ErrorKind {
     444            0 :         match self {
     445            0 :             LocalProxyConnError::Io(_) => ErrorKind::Compute,
     446            0 :             LocalProxyConnError::H2(_) => ErrorKind::Compute,
     447              :         }
     448            0 :     }
     449              : }
     450              : 
     451              : impl UserFacingError for LocalProxyConnError {
     452            0 :     fn to_string_client(&self) -> String {
     453            0 :         "Could not establish HTTP connection to the database".to_string()
     454            0 :     }
     455              : }
     456              : 
     457              : impl CouldRetry for LocalProxyConnError {
     458            0 :     fn could_retry(&self) -> bool {
     459            0 :         match self {
     460            0 :             LocalProxyConnError::Io(_) => false,
     461            0 :             LocalProxyConnError::H2(_) => false,
     462              :         }
     463            0 :     }
     464              : }
     465              : impl ShouldRetryWakeCompute for LocalProxyConnError {
     466            0 :     fn should_retry_wake_compute(&self) -> bool {
     467            0 :         match self {
     468            0 :             LocalProxyConnError::Io(_) => false,
     469            0 :             LocalProxyConnError::H2(_) => false,
     470              :         }
     471            0 :     }
     472              : }
     473              : 
/// Connect mechanism that opens a postgres connection to compute and hands it
/// to the global connection pool (see its `ConnectMechanism` impl).
struct TokioMechanism {
    /// Pool passed to `poll_client` together with each new connection.
    pool: Arc<GlobalConnPool<tokio_postgres::Client>>,
    /// Supplies the user and database name for the connection; a clone is
    /// passed to `poll_client`.
    conn_info: ConnInfo,
    /// Identifier generated by the caller for this connection; passed to `poll_client`.
    conn_id: uuid::Uuid,

    /// connect_to_compute concurrency lock
    locks: &'static ApiLocks<Host>,
}
     482              : 
     483              : #[async_trait]
     484              : impl ConnectMechanism for TokioMechanism {
     485              :     type Connection = Client<tokio_postgres::Client>;
     486              :     type ConnectError = HttpConnError;
     487              :     type Error = HttpConnError;
     488              : 
     489            0 :     async fn connect_once(
     490            0 :         &self,
     491            0 :         ctx: &RequestMonitoring,
     492            0 :         node_info: &CachedNodeInfo,
     493            0 :         timeout: Duration,
     494            0 :     ) -> Result<Self::Connection, Self::ConnectError> {
     495            0 :         let host = node_info.config.get_host()?;
     496            0 :         let permit = self.locks.get_permit(&host).await?;
     497              : 
     498            0 :         let mut config = (*node_info.config).clone();
     499            0 :         let config = config
     500            0 :             .user(&self.conn_info.user_info.user)
     501            0 :             .dbname(&self.conn_info.dbname)
     502            0 :             .connect_timeout(timeout);
     503            0 : 
     504            0 :         let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
     505            0 :         let res = config.connect(tokio_postgres::NoTls).await;
     506            0 :         drop(pause);
     507            0 :         let (client, connection) = permit.release_result(res)?;
     508              : 
     509            0 :         tracing::Span::current().record("pid", tracing::field::display(client.get_process_id()));
     510            0 :         Ok(poll_client(
     511            0 :             self.pool.clone(),
     512            0 :             ctx,
     513            0 :             self.conn_info.clone(),
     514            0 :             client,
     515            0 :             connection,
     516            0 :             self.conn_id,
     517            0 :             node_info.aux.clone(),
     518            0 :         ))
     519            0 :     }
     520              : 
     521            0 :     fn update_connect_config(&self, _config: &mut compute::ConnCfg) {}
     522              : }
     523              : 
/// Connect mechanism that opens an HTTP/2 connection to the compute's
/// local-proxy and hands it to the HTTP connection pool (see its
/// `ConnectMechanism` impl).
struct HyperMechanism {
    /// Pool passed to `poll_http2_client` together with each new connection.
    pool: Arc<http_conn_pool::GlobalConnPool<Send>>,
    /// Connection parameters; passed by reference to `poll_http2_client`.
    conn_info: ConnInfo,
    /// Identifier generated by the caller for this connection.
    // NOTE(review): presumably forwarded to `poll_http2_client` like in
    // `TokioMechanism` — confirm against the full `connect_once` body.
    conn_id: uuid::Uuid,

    /// connect_to_compute concurrency lock
    locks: &'static ApiLocks<Host>,
}
     532              : 
     533              : #[async_trait]
     534              : impl ConnectMechanism for HyperMechanism {
     535              :     type Connection = http_conn_pool::Client<Send>;
     536              :     type ConnectError = HttpConnError;
     537              :     type Error = HttpConnError;
     538              : 
     539            0 :     async fn connect_once(
     540            0 :         &self,
     541            0 :         ctx: &RequestMonitoring,
     542            0 :         node_info: &CachedNodeInfo,
     543            0 :         timeout: Duration,
     544            0 :     ) -> Result<Self::Connection, Self::ConnectError> {
     545            0 :         let host = node_info.config.get_host()?;
     546            0 :         let permit = self.locks.get_permit(&host).await?;
     547              : 
     548            0 :         let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
     549              : 
     550            0 :         let port = *node_info.config.get_ports().first().ok_or_else(|| {
     551            0 :             HttpConnError::WakeCompute(WakeComputeError::BadComputeAddress(
     552            0 :                 "local-proxy port missing on compute address".into(),
     553            0 :             ))
     554            0 :         })?;
     555            0 :         let res = connect_http2(&host, port, timeout).await;
     556            0 :         drop(pause);
     557            0 :         let (client, connection) = permit.release_result(res)?;
     558              : 
     559            0 :         Ok(poll_http2_client(
     560            0 :             self.pool.clone(),
     561            0 :             ctx,
     562            0 :             &self.conn_info,
     563            0 :             client,
     564            0 :             connection,
     565            0 :             self.conn_id,
     566            0 :             node_info.aux.clone(),
     567            0 :         ))
     568            0 :     }
     569              : 
     570            0 :     fn update_connect_config(&self, _config: &mut compute::ConnCfg) {}
     571              : }
     572              : 
     573            0 : async fn connect_http2(
     574            0 :     host: &str,
     575            0 :     port: u16,
     576            0 :     timeout: Duration,
     577            0 : ) -> Result<(http_conn_pool::Send, http_conn_pool::Connect), LocalProxyConnError> {
     578              :     // assumption: host is an ip address so this should not actually perform any requests.
     579              :     // todo: add that assumption as a guarantee in the control-plane API.
     580            0 :     let mut addrs = lookup_host((host, port))
     581            0 :         .await
     582            0 :         .map_err(LocalProxyConnError::Io)?;
     583              : 
     584            0 :     let mut last_err = None;
     585              : 
     586            0 :     let stream = loop {
     587            0 :         let Some(addr) = addrs.next() else {
     588            0 :             return Err(last_err.unwrap_or_else(|| {
     589            0 :                 LocalProxyConnError::Io(io::Error::new(
     590            0 :                     io::ErrorKind::InvalidInput,
     591            0 :                     "could not resolve any addresses",
     592            0 :                 ))
     593            0 :             }));
     594              :         };
     595              : 
     596            0 :         match tokio::time::timeout(timeout, TcpStream::connect(addr)).await {
     597            0 :             Ok(Ok(stream)) => {
     598            0 :                 stream.set_nodelay(true).map_err(LocalProxyConnError::Io)?;
     599            0 :                 break stream;
     600              :             }
     601            0 :             Ok(Err(e)) => {
     602            0 :                 last_err = Some(LocalProxyConnError::Io(e));
     603            0 :             }
     604            0 :             Err(e) => {
     605            0 :                 last_err = Some(LocalProxyConnError::Io(io::Error::new(
     606            0 :                     io::ErrorKind::TimedOut,
     607            0 :                     e,
     608            0 :                 )));
     609            0 :             }
     610              :         };
     611              :     };
     612              : 
     613            0 :     let (client, connection) = hyper::client::conn::http2::Builder::new(TokioExecutor::new())
     614            0 :         .timer(TokioTimer::new())
     615            0 :         .keep_alive_interval(Duration::from_secs(20))
     616            0 :         .keep_alive_while_idle(true)
     617            0 :         .keep_alive_timeout(Duration::from_secs(5))
     618            0 :         .handshake(TokioIo::new(stream))
     619            0 :         .await?;
     620              : 
     621            0 :     Ok((client, connection))
     622            0 : }
         |