LCOV - code coverage report
Current view: top level - proxy/src/serverless - backend.rs (source / functions) Coverage Total Hit
Test: 07bee600374ccd486c69370d0972d9035964fe68.info Lines: 0.0 % 293 0
Test Date: 2025-02-20 13:11:02 Functions: 0.0 % 26 0

            Line data    Source code
       1              : use std::io;
       2              : use std::sync::Arc;
       3              : use std::time::Duration;
       4              : 
       5              : use async_trait::async_trait;
       6              : use ed25519_dalek::SigningKey;
       7              : use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer};
       8              : use jose_jwk::jose_b64;
       9              : use rand::rngs::OsRng;
      10              : use tokio::net::{lookup_host, TcpStream};
      11              : use tracing::field::display;
      12              : use tracing::{debug, info};
      13              : 
      14              : use super::conn_pool::poll_client;
      15              : use super::conn_pool_lib::{Client, ConnInfo, EndpointConnPool, GlobalConnPool};
      16              : use super::http_conn_pool::{self, poll_http2_client, HttpConnPool, Send};
      17              : use super::local_conn_pool::{self, LocalConnPool, EXT_NAME, EXT_SCHEMA, EXT_VERSION};
      18              : use crate::auth::backend::local::StaticAuthRules;
      19              : use crate::auth::backend::{ComputeCredentials, ComputeUserInfo};
      20              : use crate::auth::{self, check_peer_addr_is_in_list, AuthError};
      21              : use crate::compute;
      22              : use crate::compute_ctl::{
      23              :     ComputeCtlError, ExtensionInstallRequest, Privilege, SetRoleGrantsRequest,
      24              : };
      25              : use crate::config::{ComputeConfig, ProxyConfig};
      26              : use crate::context::RequestContext;
      27              : use crate::control_plane::client::ApiLockError;
      28              : use crate::control_plane::errors::{GetAuthInfoError, WakeComputeError};
      29              : use crate::control_plane::locks::ApiLocks;
      30              : use crate::control_plane::CachedNodeInfo;
      31              : use crate::error::{ErrorKind, ReportableError, UserFacingError};
      32              : use crate::intern::EndpointIdInt;
      33              : use crate::protocol2::ConnectionInfoExtra;
      34              : use crate::proxy::connect_compute::ConnectMechanism;
      35              : use crate::proxy::retry::{CouldRetry, ShouldRetryWakeCompute};
      36              : use crate::rate_limiter::EndpointRateLimiter;
      37              : use crate::types::{EndpointId, Host, LOCAL_PROXY_SUFFIX};
      38              : 
      39              : pub(crate) struct PoolingBackend {
      40              :     pub(crate) http_conn_pool: Arc<GlobalConnPool<Send, HttpConnPool<Send>>>,
      41              :     pub(crate) local_pool: Arc<LocalConnPool<postgres_client::Client>>,
      42              :     pub(crate) pool:
      43              :         Arc<GlobalConnPool<postgres_client::Client, EndpointConnPool<postgres_client::Client>>>,
      44              : 
      45              :     pub(crate) config: &'static ProxyConfig,
      46              :     pub(crate) auth_backend: &'static crate::auth::Backend<'static, ()>,
      47              :     pub(crate) endpoint_rate_limiter: Arc<EndpointRateLimiter>,
      48              : }
      49              : 
      50              : impl PoolingBackend {
      51            0 :     pub(crate) async fn authenticate_with_password(
      52            0 :         &self,
      53            0 :         ctx: &RequestContext,
      54            0 :         user_info: &ComputeUserInfo,
      55            0 :         password: &[u8],
      56            0 :     ) -> Result<ComputeCredentials, AuthError> {
      57            0 :         ctx.set_auth_method(crate::context::AuthMethod::Cleartext);
      58            0 : 
      59            0 :         let user_info = user_info.clone();
      60            0 :         let backend = self.auth_backend.as_ref().map(|()| user_info.clone());
      61            0 :         let allowed_ips = backend.get_allowed_ips(ctx).await?;
      62              : 
      63            0 :         if self.config.authentication_config.ip_allowlist_check_enabled
      64            0 :             && !check_peer_addr_is_in_list(&ctx.peer_addr(), &allowed_ips)
      65              :         {
      66            0 :             return Err(AuthError::ip_address_not_allowed(ctx.peer_addr()));
      67            0 :         }
      68              : 
      69            0 :         let access_blocker_flags = backend.get_block_public_or_vpc_access(ctx).await?;
      70            0 :         if self.config.authentication_config.is_vpc_acccess_proxy {
      71            0 :             if access_blocker_flags.vpc_access_blocked {
      72            0 :                 return Err(AuthError::NetworkNotAllowed);
      73            0 :             }
      74            0 : 
      75            0 :             let extra = ctx.extra();
      76            0 :             let incoming_endpoint_id = match extra {
      77            0 :                 None => String::new(),
      78            0 :                 Some(ConnectionInfoExtra::Aws { vpce_id }) => {
      79            0 :                     // Convert the vcpe_id to a string
      80            0 :                     String::from_utf8(vpce_id.to_vec()).unwrap_or_default()
      81              :                 }
      82            0 :                 Some(ConnectionInfoExtra::Azure { link_id }) => link_id.to_string(),
      83              :             };
      84              : 
      85            0 :             if incoming_endpoint_id.is_empty() {
      86            0 :                 return Err(AuthError::MissingVPCEndpointId);
      87            0 :             }
      88              : 
      89            0 :             let allowed_vpc_endpoint_ids = backend.get_allowed_vpc_endpoint_ids(ctx).await?;
      90              :             // TODO: For now an empty VPC endpoint ID list means all are allowed. We should replace that.
      91            0 :             if !allowed_vpc_endpoint_ids.is_empty()
      92            0 :                 && !allowed_vpc_endpoint_ids.contains(&incoming_endpoint_id)
      93              :             {
      94            0 :                 return Err(AuthError::vpc_endpoint_id_not_allowed(incoming_endpoint_id));
      95            0 :             }
      96            0 :         } else if access_blocker_flags.public_access_blocked {
      97            0 :             return Err(AuthError::NetworkNotAllowed);
      98            0 :         }
      99              : 
     100            0 :         if !self
     101            0 :             .endpoint_rate_limiter
     102            0 :             .check(user_info.endpoint.clone().into(), 1)
     103              :         {
     104            0 :             return Err(AuthError::too_many_connections());
     105            0 :         }
     106            0 :         let cached_secret = backend.get_role_secret(ctx).await?;
     107            0 :         let secret = match cached_secret.value.clone() {
     108            0 :             Some(secret) => self.config.authentication_config.check_rate_limit(
     109            0 :                 ctx,
     110            0 :                 secret,
     111            0 :                 &user_info.endpoint,
     112            0 :                 true,
     113            0 :             )?,
     114              :             None => {
     115              :                 // If we don't have an authentication secret, for the http flow we can just return an error.
     116            0 :                 info!("authentication info not found");
     117            0 :                 return Err(AuthError::password_failed(&*user_info.user));
     118              :             }
     119              :         };
     120            0 :         let ep = EndpointIdInt::from(&user_info.endpoint);
     121            0 :         let auth_outcome = crate::auth::validate_password_and_exchange(
     122            0 :             &self.config.authentication_config.thread_pool,
     123            0 :             ep,
     124            0 :             password,
     125            0 :             secret,
     126            0 :         )
     127            0 :         .await?;
     128            0 :         let res = match auth_outcome {
     129            0 :             crate::sasl::Outcome::Success(key) => {
     130            0 :                 info!("user successfully authenticated");
     131            0 :                 Ok(key)
     132              :             }
     133            0 :             crate::sasl::Outcome::Failure(reason) => {
     134            0 :                 info!("auth backend failed with an error: {reason}");
     135            0 :                 Err(AuthError::password_failed(&*user_info.user))
     136              :             }
     137              :         };
     138            0 :         res.map(|key| ComputeCredentials {
     139            0 :             info: user_info,
     140            0 :             keys: key,
     141            0 :         })
     142            0 :     }
     143              : 
     144            0 :     pub(crate) async fn authenticate_with_jwt(
     145            0 :         &self,
     146            0 :         ctx: &RequestContext,
     147            0 :         user_info: &ComputeUserInfo,
     148            0 :         jwt: String,
     149            0 :     ) -> Result<ComputeCredentials, AuthError> {
     150            0 :         ctx.set_auth_method(crate::context::AuthMethod::Jwt);
     151            0 : 
     152            0 :         match &self.auth_backend {
     153            0 :             crate::auth::Backend::ControlPlane(console, ()) => {
     154            0 :                 self.config
     155            0 :                     .authentication_config
     156            0 :                     .jwks_cache
     157            0 :                     .check_jwt(
     158            0 :                         ctx,
     159            0 :                         user_info.endpoint.clone(),
     160            0 :                         &user_info.user,
     161            0 :                         &**console,
     162            0 :                         &jwt,
     163            0 :                     )
     164            0 :                     .await?;
     165              : 
     166            0 :                 Ok(ComputeCredentials {
     167            0 :                     info: user_info.clone(),
     168            0 :                     keys: crate::auth::backend::ComputeCredentialKeys::None,
     169            0 :                 })
     170              :             }
     171              :             crate::auth::Backend::Local(_) => {
     172            0 :                 let keys = self
     173            0 :                     .config
     174            0 :                     .authentication_config
     175            0 :                     .jwks_cache
     176            0 :                     .check_jwt(
     177            0 :                         ctx,
     178            0 :                         user_info.endpoint.clone(),
     179            0 :                         &user_info.user,
     180            0 :                         &StaticAuthRules,
     181            0 :                         &jwt,
     182            0 :                     )
     183            0 :                     .await?;
     184              : 
     185            0 :                 Ok(ComputeCredentials {
     186            0 :                     info: user_info.clone(),
     187            0 :                     keys,
     188            0 :                 })
     189              :             }
     190              :         }
     191            0 :     }
     192              : 
     193              :     // Wake up the destination if needed. Code here is a bit involved because
     194              :     // we reuse the code from the usual proxy and we need to prepare few structures
     195              :     // that this code expects.
     196              :     #[tracing::instrument(fields(pid = tracing::field::Empty), skip_all)]
     197              :     pub(crate) async fn connect_to_compute(
     198              :         &self,
     199              :         ctx: &RequestContext,
     200              :         conn_info: ConnInfo,
     201              :         keys: ComputeCredentials,
     202              :         force_new: bool,
     203              :     ) -> Result<Client<postgres_client::Client>, HttpConnError> {
     204              :         let maybe_client = if force_new {
     205              :             debug!("pool: pool is disabled");
     206              :             None
     207              :         } else {
     208              :             debug!("pool: looking for an existing connection");
     209              :             self.pool.get(ctx, &conn_info)?
     210              :         };
     211              : 
     212              :         if let Some(client) = maybe_client {
     213              :             return Ok(client);
     214              :         }
     215              :         let conn_id = uuid::Uuid::new_v4();
     216              :         tracing::Span::current().record("conn_id", display(conn_id));
     217              :         info!(%conn_id, "pool: opening a new connection '{conn_info}'");
     218            0 :         let backend = self.auth_backend.as_ref().map(|()| keys);
     219              :         crate::proxy::connect_compute::connect_to_compute(
     220              :             ctx,
     221              :             &TokioMechanism {
     222              :                 conn_id,
     223              :                 conn_info,
     224              :                 pool: self.pool.clone(),
     225              :                 locks: &self.config.connect_compute_locks,
     226              :             },
     227              :             &backend,
     228              :             self.config.wake_compute_retry_config,
     229              :             &self.config.connect_to_compute,
     230              :         )
     231              :         .await
     232              :     }
     233              : 
     234              :     // Wake up the destination if needed
     235              :     #[tracing::instrument(fields(pid = tracing::field::Empty), skip_all)]
     236              :     pub(crate) async fn connect_to_local_proxy(
     237              :         &self,
     238              :         ctx: &RequestContext,
     239              :         conn_info: ConnInfo,
     240              :     ) -> Result<http_conn_pool::Client<Send>, HttpConnError> {
     241              :         debug!("pool: looking for an existing connection");
     242              :         if let Ok(Some(client)) = self.http_conn_pool.get(ctx, &conn_info) {
     243              :             return Ok(client);
     244              :         }
     245              : 
     246              :         let conn_id = uuid::Uuid::new_v4();
     247              :         tracing::Span::current().record("conn_id", display(conn_id));
     248              :         debug!(%conn_id, "pool: opening a new connection '{conn_info}'");
     249            0 :         let backend = self.auth_backend.as_ref().map(|()| ComputeCredentials {
     250            0 :             info: ComputeUserInfo {
     251            0 :                 user: conn_info.user_info.user.clone(),
     252            0 :                 endpoint: EndpointId::from(format!(
     253            0 :                     "{}{LOCAL_PROXY_SUFFIX}",
     254            0 :                     conn_info.user_info.endpoint.normalize()
     255            0 :                 )),
     256            0 :                 options: conn_info.user_info.options.clone(),
     257            0 :             },
     258            0 :             keys: crate::auth::backend::ComputeCredentialKeys::None,
     259            0 :         });
     260              :         crate::proxy::connect_compute::connect_to_compute(
     261              :             ctx,
     262              :             &HyperMechanism {
     263              :                 conn_id,
     264              :                 conn_info,
     265              :                 pool: self.http_conn_pool.clone(),
     266              :                 locks: &self.config.connect_compute_locks,
     267              :             },
     268              :             &backend,
     269              :             self.config.wake_compute_retry_config,
     270              :             &self.config.connect_to_compute,
     271              :         )
     272              :         .await
     273              :     }
     274              : 
     275              :     /// Connect to postgres over localhost.
     276              :     ///
     277              :     /// We expect postgres to be started here, so we won't do any retries.
     278              :     ///
     279              :     /// # Panics
     280              :     ///
     281              :     /// Panics if called with a non-local_proxy backend.
     282              :     #[tracing::instrument(fields(pid = tracing::field::Empty), skip_all)]
     283              :     pub(crate) async fn connect_to_local_postgres(
     284              :         &self,
     285              :         ctx: &RequestContext,
     286              :         conn_info: ConnInfo,
     287              :     ) -> Result<Client<postgres_client::Client>, HttpConnError> {
     288              :         if let Some(client) = self.local_pool.get(ctx, &conn_info)? {
     289              :             return Ok(client);
     290              :         }
     291              : 
     292              :         let local_backend = match &self.auth_backend {
     293              :             auth::Backend::ControlPlane(_, ()) => {
     294              :                 unreachable!("only local_proxy can connect to local postgres")
     295              :             }
     296              :             auth::Backend::Local(local) => local,
     297              :         };
     298              : 
     299              :         if !self.local_pool.initialized(&conn_info) {
     300              :             // only install and grant usage one at a time.
     301              :             let _permit = local_backend
     302              :                 .initialize
     303              :                 .acquire()
     304              :                 .await
     305              :                 .expect("semaphore should never be closed");
     306              : 
     307              :             // check again for race
     308              :             if !self.local_pool.initialized(&conn_info) {
     309              :                 local_backend
     310              :                     .compute_ctl
     311              :                     .install_extension(&ExtensionInstallRequest {
     312              :                         extension: EXT_NAME,
     313              :                         database: conn_info.dbname.clone(),
     314              :                         version: EXT_VERSION,
     315              :                     })
     316              :                     .await?;
     317              : 
     318              :                 local_backend
     319              :                     .compute_ctl
     320              :                     .grant_role(&SetRoleGrantsRequest {
     321              :                         schema: EXT_SCHEMA,
     322              :                         privileges: vec![Privilege::Usage],
     323              :                         database: conn_info.dbname.clone(),
     324              :                         role: conn_info.user_info.user.clone(),
     325              :                     })
     326              :                     .await?;
     327              : 
     328              :                 self.local_pool.set_initialized(&conn_info);
     329              :             }
     330              :         }
     331              : 
     332              :         let conn_id = uuid::Uuid::new_v4();
     333              :         tracing::Span::current().record("conn_id", display(conn_id));
     334              :         info!(%conn_id, "local_pool: opening a new connection '{conn_info}'");
     335              : 
     336              :         let mut node_info = local_backend.node_info.clone();
     337              : 
     338              :         let (key, jwk) = create_random_jwk();
     339              : 
     340              :         let config = node_info
     341              :             .config
     342              :             .user(&conn_info.user_info.user)
     343              :             .dbname(&conn_info.dbname)
     344              :             .set_param(
     345              :                 "options",
     346              :                 &format!(
     347              :                     "-c pg_session_jwt.jwk={}",
     348              :                     serde_json::to_string(&jwk).expect("serializing jwk to json should not fail")
     349              :                 ),
     350              :             );
     351              : 
     352              :         let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
     353              :         let (client, connection) = config.connect(postgres_client::NoTls).await?;
     354              :         drop(pause);
     355              : 
     356              :         let pid = client.get_process_id();
     357              :         tracing::Span::current().record("pid", pid);
     358              : 
     359              :         let mut handle = local_conn_pool::poll_client(
     360              :             self.local_pool.clone(),
     361              :             ctx,
     362              :             conn_info,
     363              :             client,
     364              :             connection,
     365              :             key,
     366              :             conn_id,
     367              :             node_info.aux.clone(),
     368              :         );
     369              : 
     370              :         {
     371              :             let (client, mut discard) = handle.inner();
     372              :             debug!("setting up backend session state");
     373              : 
     374              :             // initiates the auth session
     375              :             if let Err(e) = client.batch_execute("select auth.init();").await {
     376              :                 discard.discard();
     377              :                 return Err(e.into());
     378              :             }
     379              : 
     380              :             info!("backend session state initialized");
     381              :         }
     382              : 
     383              :         Ok(handle)
     384              :     }
     385              : }
     386              : 
     387            0 : fn create_random_jwk() -> (SigningKey, jose_jwk::Key) {
     388            0 :     let key = SigningKey::generate(&mut OsRng);
     389            0 : 
     390            0 :     let jwk = jose_jwk::Key::Okp(jose_jwk::Okp {
     391            0 :         crv: jose_jwk::OkpCurves::Ed25519,
     392            0 :         x: jose_b64::serde::Bytes::from(key.verifying_key().to_bytes().to_vec()),
     393            0 :         d: None,
     394            0 :     });
     395            0 : 
     396            0 :     (key, jwk)
     397            0 : }
     398              : 
     399              : #[derive(Debug, thiserror::Error)]
     400              : pub(crate) enum HttpConnError {
     401              :     #[error("pooled connection closed at inconsistent state")]
     402              :     ConnectionClosedAbruptly(#[from] tokio::sync::watch::error::SendError<uuid::Uuid>),
     403              :     #[error("could not connect to postgres in compute")]
     404              :     PostgresConnectionError(#[from] postgres_client::Error),
     405              :     #[error("could not connect to local-proxy in compute")]
     406              :     LocalProxyConnectionError(#[from] LocalProxyConnError),
     407              :     #[error("could not parse JWT payload")]
     408              :     JwtPayloadError(serde_json::Error),
     409              : 
     410              :     #[error("could not install extension: {0}")]
     411              :     ComputeCtl(#[from] ComputeCtlError),
     412              :     #[error("could not get auth info")]
     413              :     GetAuthInfo(#[from] GetAuthInfoError),
     414              :     #[error("user not authenticated")]
     415              :     AuthError(#[from] AuthError),
     416              :     #[error("wake_compute returned error")]
     417              :     WakeCompute(#[from] WakeComputeError),
     418              :     #[error("error acquiring resource permit: {0}")]
     419              :     TooManyConnectionAttempts(#[from] ApiLockError),
     420              : }
     421              : 
     422              : #[derive(Debug, thiserror::Error)]
     423              : pub(crate) enum LocalProxyConnError {
     424              :     #[error("error with connection to local-proxy")]
     425              :     Io(#[source] std::io::Error),
     426              :     #[error("could not establish h2 connection")]
     427              :     H2(#[from] hyper::Error),
     428              : }
     429              : 
     430              : impl ReportableError for HttpConnError {
     431            0 :     fn get_error_kind(&self) -> ErrorKind {
     432            0 :         match self {
     433            0 :             HttpConnError::ConnectionClosedAbruptly(_) => ErrorKind::Compute,
     434            0 :             HttpConnError::PostgresConnectionError(p) => p.get_error_kind(),
     435            0 :             HttpConnError::LocalProxyConnectionError(_) => ErrorKind::Compute,
     436            0 :             HttpConnError::ComputeCtl(_) => ErrorKind::Service,
     437            0 :             HttpConnError::JwtPayloadError(_) => ErrorKind::User,
     438            0 :             HttpConnError::GetAuthInfo(a) => a.get_error_kind(),
     439            0 :             HttpConnError::AuthError(a) => a.get_error_kind(),
     440            0 :             HttpConnError::WakeCompute(w) => w.get_error_kind(),
     441            0 :             HttpConnError::TooManyConnectionAttempts(w) => w.get_error_kind(),
     442              :         }
     443            0 :     }
     444              : }
     445              : 
     446              : impl UserFacingError for HttpConnError {
     447            0 :     fn to_string_client(&self) -> String {
     448            0 :         match self {
     449            0 :             HttpConnError::ConnectionClosedAbruptly(_) => self.to_string(),
     450            0 :             HttpConnError::PostgresConnectionError(p) => p.to_string(),
     451            0 :             HttpConnError::LocalProxyConnectionError(p) => p.to_string(),
     452            0 :             HttpConnError::ComputeCtl(_) => "could not set up the JWT authorization database extension".to_string(),
     453            0 :             HttpConnError::JwtPayloadError(p) => p.to_string(),
     454            0 :             HttpConnError::GetAuthInfo(c) => c.to_string_client(),
     455            0 :             HttpConnError::AuthError(c) => c.to_string_client(),
     456            0 :             HttpConnError::WakeCompute(c) => c.to_string_client(),
     457              :             HttpConnError::TooManyConnectionAttempts(_) => {
     458            0 :                 "Failed to acquire permit to connect to the database. Too many database connection attempts are currently ongoing.".to_owned()
     459              :             }
     460              :         }
     461            0 :     }
     462              : }
     463              : 
     464              : impl CouldRetry for HttpConnError {
     465            0 :     fn could_retry(&self) -> bool {
     466            0 :         match self {
     467            0 :             HttpConnError::PostgresConnectionError(e) => e.could_retry(),
     468            0 :             HttpConnError::LocalProxyConnectionError(e) => e.could_retry(),
     469            0 :             HttpConnError::ComputeCtl(_) => false,
     470            0 :             HttpConnError::ConnectionClosedAbruptly(_) => false,
     471            0 :             HttpConnError::JwtPayloadError(_) => false,
     472            0 :             HttpConnError::GetAuthInfo(_) => false,
     473            0 :             HttpConnError::AuthError(_) => false,
     474            0 :             HttpConnError::WakeCompute(_) => false,
     475            0 :             HttpConnError::TooManyConnectionAttempts(_) => false,
     476              :         }
     477            0 :     }
     478              : }
     479              : impl ShouldRetryWakeCompute for HttpConnError {
     480            0 :     fn should_retry_wake_compute(&self) -> bool {
     481            0 :         match self {
     482            0 :             HttpConnError::PostgresConnectionError(e) => e.should_retry_wake_compute(),
     483              :             // we never checked cache validity
     484            0 :             HttpConnError::TooManyConnectionAttempts(_) => false,
     485            0 :             _ => true,
     486              :         }
     487            0 :     }
     488              : }
     489              : 
     490              : impl ReportableError for LocalProxyConnError {
     491            0 :     fn get_error_kind(&self) -> ErrorKind {
     492            0 :         match self {
     493            0 :             LocalProxyConnError::Io(_) => ErrorKind::Compute,
     494            0 :             LocalProxyConnError::H2(_) => ErrorKind::Compute,
     495              :         }
     496            0 :     }
     497              : }
     498              : 
     499              : impl UserFacingError for LocalProxyConnError {
     500            0 :     fn to_string_client(&self) -> String {
     501            0 :         "Could not establish HTTP connection to the database".to_string()
     502            0 :     }
     503              : }
     504              : 
     505              : impl CouldRetry for LocalProxyConnError {
     506            0 :     fn could_retry(&self) -> bool {
     507            0 :         match self {
     508            0 :             LocalProxyConnError::Io(_) => false,
     509            0 :             LocalProxyConnError::H2(_) => false,
     510              :         }
     511            0 :     }
     512              : }
     513              : impl ShouldRetryWakeCompute for LocalProxyConnError {
     514            0 :     fn should_retry_wake_compute(&self) -> bool {
     515            0 :         match self {
     516            0 :             LocalProxyConnError::Io(_) => false,
     517            0 :             LocalProxyConnError::H2(_) => false,
     518              :         }
     519            0 :     }
     520              : }
     521              : 
     522              : struct TokioMechanism {
     523              :     pool: Arc<GlobalConnPool<postgres_client::Client, EndpointConnPool<postgres_client::Client>>>,
     524              :     conn_info: ConnInfo,
     525              :     conn_id: uuid::Uuid,
     526              : 
     527              :     /// connect_to_compute concurrency lock
     528              :     locks: &'static ApiLocks<Host>,
     529              : }
     530              : 
     531              : #[async_trait]
     532              : impl ConnectMechanism for TokioMechanism {
     533              :     type Connection = Client<postgres_client::Client>;
     534              :     type ConnectError = HttpConnError;
     535              :     type Error = HttpConnError;
     536              : 
     537            0 :     async fn connect_once(
     538            0 :         &self,
     539            0 :         ctx: &RequestContext,
     540            0 :         node_info: &CachedNodeInfo,
     541            0 :         compute_config: &ComputeConfig,
     542            0 :     ) -> Result<Self::Connection, Self::ConnectError> {
     543            0 :         let host = node_info.config.get_host();
     544            0 :         let permit = self.locks.get_permit(&host).await?;
     545              : 
     546            0 :         let mut config = (*node_info.config).clone();
     547            0 :         let config = config
     548            0 :             .user(&self.conn_info.user_info.user)
     549            0 :             .dbname(&self.conn_info.dbname)
     550            0 :             .connect_timeout(compute_config.timeout);
     551            0 : 
     552            0 :         let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
     553            0 :         let res = config.connect(postgres_client::NoTls).await;
     554            0 :         drop(pause);
     555            0 :         let (client, connection) = permit.release_result(res)?;
     556              : 
     557            0 :         tracing::Span::current().record("pid", tracing::field::display(client.get_process_id()));
     558            0 :         Ok(poll_client(
     559            0 :             self.pool.clone(),
     560            0 :             ctx,
     561            0 :             self.conn_info.clone(),
     562            0 :             client,
     563            0 :             connection,
     564            0 :             self.conn_id,
     565            0 :             node_info.aux.clone(),
     566            0 :         ))
     567            0 :     }
     568              : 
     569            0 :     fn update_connect_config(&self, _config: &mut compute::ConnCfg) {}
     570              : }
     571              : 
     572              : struct HyperMechanism {
     573              :     pool: Arc<GlobalConnPool<Send, HttpConnPool<Send>>>,
     574              :     conn_info: ConnInfo,
     575              :     conn_id: uuid::Uuid,
     576              : 
     577              :     /// connect_to_compute concurrency lock
     578              :     locks: &'static ApiLocks<Host>,
     579              : }
     580              : 
     581              : #[async_trait]
     582              : impl ConnectMechanism for HyperMechanism {
     583              :     type Connection = http_conn_pool::Client<Send>;
     584              :     type ConnectError = HttpConnError;
     585              :     type Error = HttpConnError;
     586              : 
     587            0 :     async fn connect_once(
     588            0 :         &self,
     589            0 :         ctx: &RequestContext,
     590            0 :         node_info: &CachedNodeInfo,
     591            0 :         config: &ComputeConfig,
     592            0 :     ) -> Result<Self::Connection, Self::ConnectError> {
     593            0 :         let host = node_info.config.get_host();
     594            0 :         let permit = self.locks.get_permit(&host).await?;
     595              : 
     596            0 :         let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
     597            0 : 
     598            0 :         let port = node_info.config.get_port();
     599            0 :         let res = connect_http2(&host, port, config.timeout).await;
     600            0 :         drop(pause);
     601            0 :         let (client, connection) = permit.release_result(res)?;
     602              : 
     603            0 :         Ok(poll_http2_client(
     604            0 :             self.pool.clone(),
     605            0 :             ctx,
     606            0 :             &self.conn_info,
     607            0 :             client,
     608            0 :             connection,
     609            0 :             self.conn_id,
     610            0 :             node_info.aux.clone(),
     611            0 :         ))
     612            0 :     }
     613              : 
     614            0 :     fn update_connect_config(&self, _config: &mut compute::ConnCfg) {}
     615              : }
     616              : 
     617            0 : async fn connect_http2(
     618            0 :     host: &str,
     619            0 :     port: u16,
     620            0 :     timeout: Duration,
     621            0 : ) -> Result<(http_conn_pool::Send, http_conn_pool::Connect), LocalProxyConnError> {
     622              :     // assumption: host is an ip address so this should not actually perform any requests.
     623              :     // todo: add that assumption as a guarantee in the control-plane API.
     624            0 :     let mut addrs = lookup_host((host, port))
     625            0 :         .await
     626            0 :         .map_err(LocalProxyConnError::Io)?;
     627              : 
     628            0 :     let mut last_err = None;
     629              : 
     630            0 :     let stream = loop {
     631            0 :         let Some(addr) = addrs.next() else {
     632            0 :             return Err(last_err.unwrap_or_else(|| {
     633            0 :                 LocalProxyConnError::Io(io::Error::new(
     634            0 :                     io::ErrorKind::InvalidInput,
     635            0 :                     "could not resolve any addresses",
     636            0 :                 ))
     637            0 :             }));
     638              :         };
     639              : 
     640            0 :         match tokio::time::timeout(timeout, TcpStream::connect(addr)).await {
     641            0 :             Ok(Ok(stream)) => {
     642            0 :                 stream.set_nodelay(true).map_err(LocalProxyConnError::Io)?;
     643            0 :                 break stream;
     644              :             }
     645            0 :             Ok(Err(e)) => {
     646            0 :                 last_err = Some(LocalProxyConnError::Io(e));
     647            0 :             }
     648            0 :             Err(e) => {
     649            0 :                 last_err = Some(LocalProxyConnError::Io(io::Error::new(
     650            0 :                     io::ErrorKind::TimedOut,
     651            0 :                     e,
     652            0 :                 )));
     653            0 :             }
     654              :         }
     655              :     };
     656              : 
     657            0 :     let (client, connection) = hyper::client::conn::http2::Builder::new(TokioExecutor::new())
     658            0 :         .timer(TokioTimer::new())
     659            0 :         .keep_alive_interval(Duration::from_secs(20))
     660            0 :         .keep_alive_while_idle(true)
     661            0 :         .keep_alive_timeout(Duration::from_secs(5))
     662            0 :         .handshake(TokioIo::new(stream))
     663            0 :         .await?;
     664              : 
     665            0 :     Ok((client, connection))
     666            0 : }
        

Generated by: LCOV version 2.1-beta