Line data Source code
1 : use std::io;
2 : use std::sync::Arc;
3 : use std::time::Duration;
4 :
5 : use async_trait::async_trait;
6 : use ed25519_dalek::SigningKey;
7 : use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer};
8 : use jose_jwk::jose_b64;
9 : use rand::rngs::OsRng;
10 : use tokio::net::{lookup_host, TcpStream};
11 : use tracing::field::display;
12 : use tracing::{debug, info};
13 :
14 : use super::conn_pool::poll_client;
15 : use super::conn_pool_lib::{Client, ConnInfo, EndpointConnPool, GlobalConnPool};
16 : use super::http_conn_pool::{self, poll_http2_client, HttpConnPool, Send};
17 : use super::local_conn_pool::{self, LocalConnPool, EXT_NAME, EXT_SCHEMA, EXT_VERSION};
18 : use crate::auth::backend::local::StaticAuthRules;
19 : use crate::auth::backend::{ComputeCredentials, ComputeUserInfo};
20 : use crate::auth::{self, check_peer_addr_is_in_list, AuthError};
21 : use crate::compute;
22 : use crate::compute_ctl::{
23 : ComputeCtlError, ExtensionInstallRequest, Privilege, SetRoleGrantsRequest,
24 : };
25 : use crate::config::{ComputeConfig, ProxyConfig};
26 : use crate::context::RequestContext;
27 : use crate::control_plane::client::ApiLockError;
28 : use crate::control_plane::errors::{GetAuthInfoError, WakeComputeError};
29 : use crate::control_plane::locks::ApiLocks;
30 : use crate::control_plane::CachedNodeInfo;
31 : use crate::error::{ErrorKind, ReportableError, UserFacingError};
32 : use crate::intern::EndpointIdInt;
33 : use crate::protocol2::ConnectionInfoExtra;
34 : use crate::proxy::connect_compute::ConnectMechanism;
35 : use crate::proxy::retry::{CouldRetry, ShouldRetryWakeCompute};
36 : use crate::rate_limiter::EndpointRateLimiter;
37 : use crate::types::{EndpointId, Host, LOCAL_PROXY_SUFFIX};
38 :
39 : pub(crate) struct PoolingBackend {
40 : pub(crate) http_conn_pool: Arc<GlobalConnPool<Send, HttpConnPool<Send>>>,
41 : pub(crate) local_pool: Arc<LocalConnPool<postgres_client::Client>>,
42 : pub(crate) pool:
43 : Arc<GlobalConnPool<postgres_client::Client, EndpointConnPool<postgres_client::Client>>>,
44 :
45 : pub(crate) config: &'static ProxyConfig,
46 : pub(crate) auth_backend: &'static crate::auth::Backend<'static, ()>,
47 : pub(crate) endpoint_rate_limiter: Arc<EndpointRateLimiter>,
48 : }
49 :
50 : impl PoolingBackend {
51 0 : pub(crate) async fn authenticate_with_password(
52 0 : &self,
53 0 : ctx: &RequestContext,
54 0 : user_info: &ComputeUserInfo,
55 0 : password: &[u8],
56 0 : ) -> Result<ComputeCredentials, AuthError> {
57 0 : ctx.set_auth_method(crate::context::AuthMethod::Cleartext);
58 0 :
59 0 : let user_info = user_info.clone();
60 0 : let backend = self.auth_backend.as_ref().map(|()| user_info.clone());
61 0 : let allowed_ips = backend.get_allowed_ips(ctx).await?;
62 :
63 0 : if self.config.authentication_config.ip_allowlist_check_enabled
64 0 : && !check_peer_addr_is_in_list(&ctx.peer_addr(), &allowed_ips)
65 : {
66 0 : return Err(AuthError::ip_address_not_allowed(ctx.peer_addr()));
67 0 : }
68 :
69 0 : let access_blocker_flags = backend.get_block_public_or_vpc_access(ctx).await?;
70 0 : if self.config.authentication_config.is_vpc_acccess_proxy {
71 0 : if access_blocker_flags.vpc_access_blocked {
72 0 : return Err(AuthError::NetworkNotAllowed);
73 0 : }
74 0 :
75 0 : let extra = ctx.extra();
76 0 : let incoming_endpoint_id = match extra {
77 0 : None => String::new(),
78 0 : Some(ConnectionInfoExtra::Aws { vpce_id }) => {
79 0 : // Convert the vcpe_id to a string
80 0 : String::from_utf8(vpce_id.to_vec()).unwrap_or_default()
81 : }
82 0 : Some(ConnectionInfoExtra::Azure { link_id }) => link_id.to_string(),
83 : };
84 :
85 0 : if incoming_endpoint_id.is_empty() {
86 0 : return Err(AuthError::MissingVPCEndpointId);
87 0 : }
88 :
89 0 : let allowed_vpc_endpoint_ids = backend.get_allowed_vpc_endpoint_ids(ctx).await?;
90 : // TODO: For now an empty VPC endpoint ID list means all are allowed. We should replace that.
91 0 : if !allowed_vpc_endpoint_ids.is_empty()
92 0 : && !allowed_vpc_endpoint_ids.contains(&incoming_endpoint_id)
93 : {
94 0 : return Err(AuthError::vpc_endpoint_id_not_allowed(incoming_endpoint_id));
95 0 : }
96 0 : } else if access_blocker_flags.public_access_blocked {
97 0 : return Err(AuthError::NetworkNotAllowed);
98 0 : }
99 :
100 0 : if !self
101 0 : .endpoint_rate_limiter
102 0 : .check(user_info.endpoint.clone().into(), 1)
103 : {
104 0 : return Err(AuthError::too_many_connections());
105 0 : }
106 0 : let cached_secret = backend.get_role_secret(ctx).await?;
107 0 : let secret = match cached_secret.value.clone() {
108 0 : Some(secret) => self.config.authentication_config.check_rate_limit(
109 0 : ctx,
110 0 : secret,
111 0 : &user_info.endpoint,
112 0 : true,
113 0 : )?,
114 : None => {
115 : // If we don't have an authentication secret, for the http flow we can just return an error.
116 0 : info!("authentication info not found");
117 0 : return Err(AuthError::password_failed(&*user_info.user));
118 : }
119 : };
120 0 : let ep = EndpointIdInt::from(&user_info.endpoint);
121 0 : let auth_outcome = crate::auth::validate_password_and_exchange(
122 0 : &self.config.authentication_config.thread_pool,
123 0 : ep,
124 0 : password,
125 0 : secret,
126 0 : )
127 0 : .await?;
128 0 : let res = match auth_outcome {
129 0 : crate::sasl::Outcome::Success(key) => {
130 0 : info!("user successfully authenticated");
131 0 : Ok(key)
132 : }
133 0 : crate::sasl::Outcome::Failure(reason) => {
134 0 : info!("auth backend failed with an error: {reason}");
135 0 : Err(AuthError::password_failed(&*user_info.user))
136 : }
137 : };
138 0 : res.map(|key| ComputeCredentials {
139 0 : info: user_info,
140 0 : keys: key,
141 0 : })
142 0 : }
143 :
144 0 : pub(crate) async fn authenticate_with_jwt(
145 0 : &self,
146 0 : ctx: &RequestContext,
147 0 : user_info: &ComputeUserInfo,
148 0 : jwt: String,
149 0 : ) -> Result<ComputeCredentials, AuthError> {
150 0 : ctx.set_auth_method(crate::context::AuthMethod::Jwt);
151 0 :
152 0 : match &self.auth_backend {
153 0 : crate::auth::Backend::ControlPlane(console, ()) => {
154 0 : self.config
155 0 : .authentication_config
156 0 : .jwks_cache
157 0 : .check_jwt(
158 0 : ctx,
159 0 : user_info.endpoint.clone(),
160 0 : &user_info.user,
161 0 : &**console,
162 0 : &jwt,
163 0 : )
164 0 : .await?;
165 :
166 0 : Ok(ComputeCredentials {
167 0 : info: user_info.clone(),
168 0 : keys: crate::auth::backend::ComputeCredentialKeys::None,
169 0 : })
170 : }
171 : crate::auth::Backend::Local(_) => {
172 0 : let keys = self
173 0 : .config
174 0 : .authentication_config
175 0 : .jwks_cache
176 0 : .check_jwt(
177 0 : ctx,
178 0 : user_info.endpoint.clone(),
179 0 : &user_info.user,
180 0 : &StaticAuthRules,
181 0 : &jwt,
182 0 : )
183 0 : .await?;
184 :
185 0 : Ok(ComputeCredentials {
186 0 : info: user_info.clone(),
187 0 : keys,
188 0 : })
189 : }
190 : }
191 0 : }
192 :
193 : // Wake up the destination if needed. Code here is a bit involved because
194 : // we reuse the code from the usual proxy and we need to prepare few structures
195 : // that this code expects.
196 : #[tracing::instrument(fields(pid = tracing::field::Empty), skip_all)]
197 : pub(crate) async fn connect_to_compute(
198 : &self,
199 : ctx: &RequestContext,
200 : conn_info: ConnInfo,
201 : keys: ComputeCredentials,
202 : force_new: bool,
203 : ) -> Result<Client<postgres_client::Client>, HttpConnError> {
204 : let maybe_client = if force_new {
205 : debug!("pool: pool is disabled");
206 : None
207 : } else {
208 : debug!("pool: looking for an existing connection");
209 : self.pool.get(ctx, &conn_info)?
210 : };
211 :
212 : if let Some(client) = maybe_client {
213 : return Ok(client);
214 : }
215 : let conn_id = uuid::Uuid::new_v4();
216 : tracing::Span::current().record("conn_id", display(conn_id));
217 : info!(%conn_id, "pool: opening a new connection '{conn_info}'");
218 0 : let backend = self.auth_backend.as_ref().map(|()| keys);
219 : crate::proxy::connect_compute::connect_to_compute(
220 : ctx,
221 : &TokioMechanism {
222 : conn_id,
223 : conn_info,
224 : pool: self.pool.clone(),
225 : locks: &self.config.connect_compute_locks,
226 : },
227 : &backend,
228 : self.config.wake_compute_retry_config,
229 : &self.config.connect_to_compute,
230 : )
231 : .await
232 : }
233 :
234 : // Wake up the destination if needed
235 : #[tracing::instrument(fields(pid = tracing::field::Empty), skip_all)]
236 : pub(crate) async fn connect_to_local_proxy(
237 : &self,
238 : ctx: &RequestContext,
239 : conn_info: ConnInfo,
240 : ) -> Result<http_conn_pool::Client<Send>, HttpConnError> {
241 : debug!("pool: looking for an existing connection");
242 : if let Ok(Some(client)) = self.http_conn_pool.get(ctx, &conn_info) {
243 : return Ok(client);
244 : }
245 :
246 : let conn_id = uuid::Uuid::new_v4();
247 : tracing::Span::current().record("conn_id", display(conn_id));
248 : debug!(%conn_id, "pool: opening a new connection '{conn_info}'");
249 0 : let backend = self.auth_backend.as_ref().map(|()| ComputeCredentials {
250 0 : info: ComputeUserInfo {
251 0 : user: conn_info.user_info.user.clone(),
252 0 : endpoint: EndpointId::from(format!(
253 0 : "{}{LOCAL_PROXY_SUFFIX}",
254 0 : conn_info.user_info.endpoint.normalize()
255 0 : )),
256 0 : options: conn_info.user_info.options.clone(),
257 0 : },
258 0 : keys: crate::auth::backend::ComputeCredentialKeys::None,
259 0 : });
260 : crate::proxy::connect_compute::connect_to_compute(
261 : ctx,
262 : &HyperMechanism {
263 : conn_id,
264 : conn_info,
265 : pool: self.http_conn_pool.clone(),
266 : locks: &self.config.connect_compute_locks,
267 : },
268 : &backend,
269 : self.config.wake_compute_retry_config,
270 : &self.config.connect_to_compute,
271 : )
272 : .await
273 : }
274 :
275 : /// Connect to postgres over localhost.
276 : ///
277 : /// We expect postgres to be started here, so we won't do any retries.
278 : ///
279 : /// # Panics
280 : ///
281 : /// Panics if called with a non-local_proxy backend.
282 : #[tracing::instrument(fields(pid = tracing::field::Empty), skip_all)]
283 : pub(crate) async fn connect_to_local_postgres(
284 : &self,
285 : ctx: &RequestContext,
286 : conn_info: ConnInfo,
287 : ) -> Result<Client<postgres_client::Client>, HttpConnError> {
288 : if let Some(client) = self.local_pool.get(ctx, &conn_info)? {
289 : return Ok(client);
290 : }
291 :
292 : let local_backend = match &self.auth_backend {
293 : auth::Backend::ControlPlane(_, ()) => {
294 : unreachable!("only local_proxy can connect to local postgres")
295 : }
296 : auth::Backend::Local(local) => local,
297 : };
298 :
299 : if !self.local_pool.initialized(&conn_info) {
300 : // only install and grant usage one at a time.
301 : let _permit = local_backend
302 : .initialize
303 : .acquire()
304 : .await
305 : .expect("semaphore should never be closed");
306 :
307 : // check again for race
308 : if !self.local_pool.initialized(&conn_info) {
309 : local_backend
310 : .compute_ctl
311 : .install_extension(&ExtensionInstallRequest {
312 : extension: EXT_NAME,
313 : database: conn_info.dbname.clone(),
314 : version: EXT_VERSION,
315 : })
316 : .await?;
317 :
318 : local_backend
319 : .compute_ctl
320 : .grant_role(&SetRoleGrantsRequest {
321 : schema: EXT_SCHEMA,
322 : privileges: vec![Privilege::Usage],
323 : database: conn_info.dbname.clone(),
324 : role: conn_info.user_info.user.clone(),
325 : })
326 : .await?;
327 :
328 : self.local_pool.set_initialized(&conn_info);
329 : }
330 : }
331 :
332 : let conn_id = uuid::Uuid::new_v4();
333 : tracing::Span::current().record("conn_id", display(conn_id));
334 : info!(%conn_id, "local_pool: opening a new connection '{conn_info}'");
335 :
336 : let mut node_info = local_backend.node_info.clone();
337 :
338 : let (key, jwk) = create_random_jwk();
339 :
340 : let config = node_info
341 : .config
342 : .user(&conn_info.user_info.user)
343 : .dbname(&conn_info.dbname)
344 : .set_param(
345 : "options",
346 : &format!(
347 : "-c pg_session_jwt.jwk={}",
348 : serde_json::to_string(&jwk).expect("serializing jwk to json should not fail")
349 : ),
350 : );
351 :
352 : let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
353 : let (client, connection) = config.connect(postgres_client::NoTls).await?;
354 : drop(pause);
355 :
356 : let pid = client.get_process_id();
357 : tracing::Span::current().record("pid", pid);
358 :
359 : let mut handle = local_conn_pool::poll_client(
360 : self.local_pool.clone(),
361 : ctx,
362 : conn_info,
363 : client,
364 : connection,
365 : key,
366 : conn_id,
367 : node_info.aux.clone(),
368 : );
369 :
370 : {
371 : let (client, mut discard) = handle.inner();
372 : debug!("setting up backend session state");
373 :
374 : // initiates the auth session
375 : if let Err(e) = client.batch_execute("select auth.init();").await {
376 : discard.discard();
377 : return Err(e.into());
378 : }
379 :
380 : info!("backend session state initialized");
381 : }
382 :
383 : Ok(handle)
384 : }
385 : }
386 :
387 0 : fn create_random_jwk() -> (SigningKey, jose_jwk::Key) {
388 0 : let key = SigningKey::generate(&mut OsRng);
389 0 :
390 0 : let jwk = jose_jwk::Key::Okp(jose_jwk::Okp {
391 0 : crv: jose_jwk::OkpCurves::Ed25519,
392 0 : x: jose_b64::serde::Bytes::from(key.verifying_key().to_bytes().to_vec()),
393 0 : d: None,
394 0 : });
395 0 :
396 0 : (key, jwk)
397 0 : }
398 :
399 : #[derive(Debug, thiserror::Error)]
400 : pub(crate) enum HttpConnError {
401 : #[error("pooled connection closed at inconsistent state")]
402 : ConnectionClosedAbruptly(#[from] tokio::sync::watch::error::SendError<uuid::Uuid>),
403 : #[error("could not connect to postgres in compute")]
404 : PostgresConnectionError(#[from] postgres_client::Error),
405 : #[error("could not connect to local-proxy in compute")]
406 : LocalProxyConnectionError(#[from] LocalProxyConnError),
407 : #[error("could not parse JWT payload")]
408 : JwtPayloadError(serde_json::Error),
409 :
410 : #[error("could not install extension: {0}")]
411 : ComputeCtl(#[from] ComputeCtlError),
412 : #[error("could not get auth info")]
413 : GetAuthInfo(#[from] GetAuthInfoError),
414 : #[error("user not authenticated")]
415 : AuthError(#[from] AuthError),
416 : #[error("wake_compute returned error")]
417 : WakeCompute(#[from] WakeComputeError),
418 : #[error("error acquiring resource permit: {0}")]
419 : TooManyConnectionAttempts(#[from] ApiLockError),
420 : }
421 :
422 : #[derive(Debug, thiserror::Error)]
423 : pub(crate) enum LocalProxyConnError {
424 : #[error("error with connection to local-proxy")]
425 : Io(#[source] std::io::Error),
426 : #[error("could not establish h2 connection")]
427 : H2(#[from] hyper::Error),
428 : }
429 :
430 : impl ReportableError for HttpConnError {
431 0 : fn get_error_kind(&self) -> ErrorKind {
432 0 : match self {
433 0 : HttpConnError::ConnectionClosedAbruptly(_) => ErrorKind::Compute,
434 0 : HttpConnError::PostgresConnectionError(p) => p.get_error_kind(),
435 0 : HttpConnError::LocalProxyConnectionError(_) => ErrorKind::Compute,
436 0 : HttpConnError::ComputeCtl(_) => ErrorKind::Service,
437 0 : HttpConnError::JwtPayloadError(_) => ErrorKind::User,
438 0 : HttpConnError::GetAuthInfo(a) => a.get_error_kind(),
439 0 : HttpConnError::AuthError(a) => a.get_error_kind(),
440 0 : HttpConnError::WakeCompute(w) => w.get_error_kind(),
441 0 : HttpConnError::TooManyConnectionAttempts(w) => w.get_error_kind(),
442 : }
443 0 : }
444 : }
445 :
446 : impl UserFacingError for HttpConnError {
447 0 : fn to_string_client(&self) -> String {
448 0 : match self {
449 0 : HttpConnError::ConnectionClosedAbruptly(_) => self.to_string(),
450 0 : HttpConnError::PostgresConnectionError(p) => p.to_string(),
451 0 : HttpConnError::LocalProxyConnectionError(p) => p.to_string(),
452 0 : HttpConnError::ComputeCtl(_) => "could not set up the JWT authorization database extension".to_string(),
453 0 : HttpConnError::JwtPayloadError(p) => p.to_string(),
454 0 : HttpConnError::GetAuthInfo(c) => c.to_string_client(),
455 0 : HttpConnError::AuthError(c) => c.to_string_client(),
456 0 : HttpConnError::WakeCompute(c) => c.to_string_client(),
457 : HttpConnError::TooManyConnectionAttempts(_) => {
458 0 : "Failed to acquire permit to connect to the database. Too many database connection attempts are currently ongoing.".to_owned()
459 : }
460 : }
461 0 : }
462 : }
463 :
464 : impl CouldRetry for HttpConnError {
465 0 : fn could_retry(&self) -> bool {
466 0 : match self {
467 0 : HttpConnError::PostgresConnectionError(e) => e.could_retry(),
468 0 : HttpConnError::LocalProxyConnectionError(e) => e.could_retry(),
469 0 : HttpConnError::ComputeCtl(_) => false,
470 0 : HttpConnError::ConnectionClosedAbruptly(_) => false,
471 0 : HttpConnError::JwtPayloadError(_) => false,
472 0 : HttpConnError::GetAuthInfo(_) => false,
473 0 : HttpConnError::AuthError(_) => false,
474 0 : HttpConnError::WakeCompute(_) => false,
475 0 : HttpConnError::TooManyConnectionAttempts(_) => false,
476 : }
477 0 : }
478 : }
479 : impl ShouldRetryWakeCompute for HttpConnError {
480 0 : fn should_retry_wake_compute(&self) -> bool {
481 0 : match self {
482 0 : HttpConnError::PostgresConnectionError(e) => e.should_retry_wake_compute(),
483 : // we never checked cache validity
484 0 : HttpConnError::TooManyConnectionAttempts(_) => false,
485 0 : _ => true,
486 : }
487 0 : }
488 : }
489 :
490 : impl ReportableError for LocalProxyConnError {
491 0 : fn get_error_kind(&self) -> ErrorKind {
492 0 : match self {
493 0 : LocalProxyConnError::Io(_) => ErrorKind::Compute,
494 0 : LocalProxyConnError::H2(_) => ErrorKind::Compute,
495 : }
496 0 : }
497 : }
498 :
499 : impl UserFacingError for LocalProxyConnError {
500 0 : fn to_string_client(&self) -> String {
501 0 : "Could not establish HTTP connection to the database".to_string()
502 0 : }
503 : }
504 :
505 : impl CouldRetry for LocalProxyConnError {
506 0 : fn could_retry(&self) -> bool {
507 0 : match self {
508 0 : LocalProxyConnError::Io(_) => false,
509 0 : LocalProxyConnError::H2(_) => false,
510 : }
511 0 : }
512 : }
513 : impl ShouldRetryWakeCompute for LocalProxyConnError {
514 0 : fn should_retry_wake_compute(&self) -> bool {
515 0 : match self {
516 0 : LocalProxyConnError::Io(_) => false,
517 0 : LocalProxyConnError::H2(_) => false,
518 : }
519 0 : }
520 : }
521 :
522 : struct TokioMechanism {
523 : pool: Arc<GlobalConnPool<postgres_client::Client, EndpointConnPool<postgres_client::Client>>>,
524 : conn_info: ConnInfo,
525 : conn_id: uuid::Uuid,
526 :
527 : /// connect_to_compute concurrency lock
528 : locks: &'static ApiLocks<Host>,
529 : }
530 :
531 : #[async_trait]
532 : impl ConnectMechanism for TokioMechanism {
533 : type Connection = Client<postgres_client::Client>;
534 : type ConnectError = HttpConnError;
535 : type Error = HttpConnError;
536 :
537 0 : async fn connect_once(
538 0 : &self,
539 0 : ctx: &RequestContext,
540 0 : node_info: &CachedNodeInfo,
541 0 : compute_config: &ComputeConfig,
542 0 : ) -> Result<Self::Connection, Self::ConnectError> {
543 0 : let host = node_info.config.get_host();
544 0 : let permit = self.locks.get_permit(&host).await?;
545 :
546 0 : let mut config = (*node_info.config).clone();
547 0 : let config = config
548 0 : .user(&self.conn_info.user_info.user)
549 0 : .dbname(&self.conn_info.dbname)
550 0 : .connect_timeout(compute_config.timeout);
551 0 :
552 0 : let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
553 0 : let res = config.connect(postgres_client::NoTls).await;
554 0 : drop(pause);
555 0 : let (client, connection) = permit.release_result(res)?;
556 :
557 0 : tracing::Span::current().record("pid", tracing::field::display(client.get_process_id()));
558 0 : Ok(poll_client(
559 0 : self.pool.clone(),
560 0 : ctx,
561 0 : self.conn_info.clone(),
562 0 : client,
563 0 : connection,
564 0 : self.conn_id,
565 0 : node_info.aux.clone(),
566 0 : ))
567 0 : }
568 :
569 0 : fn update_connect_config(&self, _config: &mut compute::ConnCfg) {}
570 : }
571 :
572 : struct HyperMechanism {
573 : pool: Arc<GlobalConnPool<Send, HttpConnPool<Send>>>,
574 : conn_info: ConnInfo,
575 : conn_id: uuid::Uuid,
576 :
577 : /// connect_to_compute concurrency lock
578 : locks: &'static ApiLocks<Host>,
579 : }
580 :
581 : #[async_trait]
582 : impl ConnectMechanism for HyperMechanism {
583 : type Connection = http_conn_pool::Client<Send>;
584 : type ConnectError = HttpConnError;
585 : type Error = HttpConnError;
586 :
587 0 : async fn connect_once(
588 0 : &self,
589 0 : ctx: &RequestContext,
590 0 : node_info: &CachedNodeInfo,
591 0 : config: &ComputeConfig,
592 0 : ) -> Result<Self::Connection, Self::ConnectError> {
593 0 : let host = node_info.config.get_host();
594 0 : let permit = self.locks.get_permit(&host).await?;
595 :
596 0 : let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
597 0 :
598 0 : let port = node_info.config.get_port();
599 0 : let res = connect_http2(&host, port, config.timeout).await;
600 0 : drop(pause);
601 0 : let (client, connection) = permit.release_result(res)?;
602 :
603 0 : Ok(poll_http2_client(
604 0 : self.pool.clone(),
605 0 : ctx,
606 0 : &self.conn_info,
607 0 : client,
608 0 : connection,
609 0 : self.conn_id,
610 0 : node_info.aux.clone(),
611 0 : ))
612 0 : }
613 :
614 0 : fn update_connect_config(&self, _config: &mut compute::ConnCfg) {}
615 : }
616 :
617 0 : async fn connect_http2(
618 0 : host: &str,
619 0 : port: u16,
620 0 : timeout: Duration,
621 0 : ) -> Result<(http_conn_pool::Send, http_conn_pool::Connect), LocalProxyConnError> {
622 : // assumption: host is an ip address so this should not actually perform any requests.
623 : // todo: add that assumption as a guarantee in the control-plane API.
624 0 : let mut addrs = lookup_host((host, port))
625 0 : .await
626 0 : .map_err(LocalProxyConnError::Io)?;
627 :
628 0 : let mut last_err = None;
629 :
630 0 : let stream = loop {
631 0 : let Some(addr) = addrs.next() else {
632 0 : return Err(last_err.unwrap_or_else(|| {
633 0 : LocalProxyConnError::Io(io::Error::new(
634 0 : io::ErrorKind::InvalidInput,
635 0 : "could not resolve any addresses",
636 0 : ))
637 0 : }));
638 : };
639 :
640 0 : match tokio::time::timeout(timeout, TcpStream::connect(addr)).await {
641 0 : Ok(Ok(stream)) => {
642 0 : stream.set_nodelay(true).map_err(LocalProxyConnError::Io)?;
643 0 : break stream;
644 : }
645 0 : Ok(Err(e)) => {
646 0 : last_err = Some(LocalProxyConnError::Io(e));
647 0 : }
648 0 : Err(e) => {
649 0 : last_err = Some(LocalProxyConnError::Io(io::Error::new(
650 0 : io::ErrorKind::TimedOut,
651 0 : e,
652 0 : )));
653 0 : }
654 : }
655 : };
656 :
657 0 : let (client, connection) = hyper::client::conn::http2::Builder::new(TokioExecutor::new())
658 0 : .timer(TokioTimer::new())
659 0 : .keep_alive_interval(Duration::from_secs(20))
660 0 : .keep_alive_while_idle(true)
661 0 : .keep_alive_timeout(Duration::from_secs(5))
662 0 : .handshake(TokioIo::new(stream))
663 0 : .await?;
664 :
665 0 : Ok((client, connection))
666 0 : }
|