Line data Source code
1 : use std::io;
2 : use std::net::{IpAddr, SocketAddr};
3 : use std::sync::Arc;
4 : use std::time::Duration;
5 :
6 : use async_trait::async_trait;
7 : use ed25519_dalek::SigningKey;
8 : use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer};
9 : use jose_jwk::jose_b64;
10 : use postgres_client::config::SslMode;
11 : use rand::rngs::OsRng;
12 : use rustls::pki_types::{DnsName, ServerName};
13 : use tokio::net::{TcpStream, lookup_host};
14 : use tokio_rustls::TlsConnector;
15 : use tracing::field::display;
16 : use tracing::{debug, info};
17 :
18 : use super::AsyncRW;
19 : use super::conn_pool::poll_client;
20 : use super::conn_pool_lib::{Client, ConnInfo, EndpointConnPool, GlobalConnPool};
21 : use super::http_conn_pool::{self, HttpConnPool, Send, poll_http2_client};
22 : use super::local_conn_pool::{self, EXT_NAME, EXT_SCHEMA, EXT_VERSION, LocalConnPool};
23 : use crate::auth::backend::local::StaticAuthRules;
24 : use crate::auth::backend::{ComputeCredentialKeys, ComputeCredentials, ComputeUserInfo};
25 : use crate::auth::{self, AuthError};
26 : use crate::compute_ctl::{
27 : ComputeCtlError, ExtensionInstallRequest, Privilege, SetRoleGrantsRequest,
28 : };
29 : use crate::config::{ComputeConfig, ProxyConfig};
30 : use crate::context::RequestContext;
31 : use crate::control_plane::CachedNodeInfo;
32 : use crate::control_plane::client::ApiLockError;
33 : use crate::control_plane::errors::{GetAuthInfoError, WakeComputeError};
34 : use crate::control_plane::locks::ApiLocks;
35 : use crate::error::{ErrorKind, ReportableError, UserFacingError};
36 : use crate::intern::EndpointIdInt;
37 : use crate::proxy::connect_compute::ConnectMechanism;
38 : use crate::proxy::retry::{CouldRetry, ShouldRetryWakeCompute};
39 : use crate::rate_limiter::EndpointRateLimiter;
40 : use crate::types::{EndpointId, Host, LOCAL_PROXY_SUFFIX};
41 :
42 : pub(crate) struct PoolingBackend {
43 : pub(crate) http_conn_pool: Arc<GlobalConnPool<Send, HttpConnPool<Send>>>,
44 : pub(crate) local_pool: Arc<LocalConnPool<postgres_client::Client>>,
45 : pub(crate) pool:
46 : Arc<GlobalConnPool<postgres_client::Client, EndpointConnPool<postgres_client::Client>>>,
47 :
48 : pub(crate) config: &'static ProxyConfig,
49 : pub(crate) auth_backend: &'static crate::auth::Backend<'static, ()>,
50 : pub(crate) endpoint_rate_limiter: Arc<EndpointRateLimiter>,
51 : }
52 :
53 : impl PoolingBackend {
54 0 : pub(crate) async fn authenticate_with_password(
55 0 : &self,
56 0 : ctx: &RequestContext,
57 0 : user_info: &ComputeUserInfo,
58 0 : password: &[u8],
59 0 : ) -> Result<ComputeCredentials, AuthError> {
60 0 : ctx.set_auth_method(crate::context::AuthMethod::Cleartext);
61 :
62 0 : let user_info = user_info.clone();
63 0 : let backend = self.auth_backend.as_ref().map(|()| user_info.clone());
64 0 : let access_control = backend.get_endpoint_access_control(ctx).await?;
65 0 : access_control.check(
66 0 : ctx,
67 0 : self.config.authentication_config.ip_allowlist_check_enabled,
68 0 : self.config.authentication_config.is_vpc_acccess_proxy,
69 0 : )?;
70 :
71 0 : access_control.connection_attempt_rate_limit(
72 0 : ctx,
73 0 : &user_info.endpoint,
74 0 : &self.endpoint_rate_limiter,
75 0 : )?;
76 :
77 0 : let role_access = backend.get_role_secret(ctx).await?;
78 0 : let Some(secret) = role_access.secret else {
79 : // If we don't have an authentication secret, for the http flow we can just return an error.
80 0 : info!("authentication info not found");
81 0 : return Err(AuthError::password_failed(&*user_info.user));
82 : };
83 :
84 0 : let ep = EndpointIdInt::from(&user_info.endpoint);
85 0 : let auth_outcome = crate::auth::validate_password_and_exchange(
86 0 : &self.config.authentication_config.thread_pool,
87 0 : ep,
88 0 : password,
89 0 : secret,
90 0 : )
91 0 : .await?;
92 0 : let res = match auth_outcome {
93 0 : crate::sasl::Outcome::Success(key) => {
94 0 : info!("user successfully authenticated");
95 0 : Ok(key)
96 : }
97 0 : crate::sasl::Outcome::Failure(reason) => {
98 0 : info!("auth backend failed with an error: {reason}");
99 0 : Err(AuthError::password_failed(&*user_info.user))
100 : }
101 : };
102 0 : res.map(|key| ComputeCredentials {
103 0 : info: user_info,
104 0 : keys: key,
105 0 : })
106 0 : }
107 :
108 0 : pub(crate) async fn authenticate_with_jwt(
109 0 : &self,
110 0 : ctx: &RequestContext,
111 0 : user_info: &ComputeUserInfo,
112 0 : jwt: String,
113 0 : ) -> Result<ComputeCredentials, AuthError> {
114 0 : ctx.set_auth_method(crate::context::AuthMethod::Jwt);
115 :
116 0 : match &self.auth_backend {
117 0 : crate::auth::Backend::ControlPlane(console, ()) => {
118 0 : let keys = self
119 0 : .config
120 0 : .authentication_config
121 0 : .jwks_cache
122 0 : .check_jwt(
123 0 : ctx,
124 0 : user_info.endpoint.clone(),
125 0 : &user_info.user,
126 0 : &**console,
127 0 : &jwt,
128 0 : )
129 0 : .await?;
130 :
131 0 : Ok(ComputeCredentials {
132 0 : info: user_info.clone(),
133 0 : keys,
134 0 : })
135 : }
136 : crate::auth::Backend::Local(_) => {
137 0 : let keys = self
138 0 : .config
139 0 : .authentication_config
140 0 : .jwks_cache
141 0 : .check_jwt(
142 0 : ctx,
143 0 : user_info.endpoint.clone(),
144 0 : &user_info.user,
145 0 : &StaticAuthRules,
146 0 : &jwt,
147 0 : )
148 0 : .await?;
149 :
150 0 : Ok(ComputeCredentials {
151 0 : info: user_info.clone(),
152 0 : keys,
153 0 : })
154 : }
155 : }
156 0 : }
157 :
158 : // Wake up the destination if needed. Code here is a bit involved because
159 : // we reuse the code from the usual proxy and we need to prepare few structures
160 : // that this code expects.
161 : #[tracing::instrument(skip_all, fields(
162 : pid = tracing::field::Empty,
163 : compute_id = tracing::field::Empty,
164 : conn_id = tracing::field::Empty,
165 : ))]
166 : pub(crate) async fn connect_to_compute(
167 : &self,
168 : ctx: &RequestContext,
169 : conn_info: ConnInfo,
170 : keys: ComputeCredentials,
171 : force_new: bool,
172 : ) -> Result<Client<postgres_client::Client>, HttpConnError> {
173 : let maybe_client = if force_new {
174 : debug!("pool: pool is disabled");
175 : None
176 : } else {
177 : debug!("pool: looking for an existing connection");
178 : self.pool.get(ctx, &conn_info)?
179 : };
180 :
181 : if let Some(client) = maybe_client {
182 : return Ok(client);
183 : }
184 : let conn_id = uuid::Uuid::new_v4();
185 : tracing::Span::current().record("conn_id", display(conn_id));
186 : info!(%conn_id, "pool: opening a new connection '{conn_info}'");
187 : let backend = self.auth_backend.as_ref().map(|()| keys.info);
188 : crate::proxy::connect_compute::connect_to_compute(
189 : ctx,
190 : &TokioMechanism {
191 : conn_id,
192 : conn_info,
193 : pool: self.pool.clone(),
194 : locks: &self.config.connect_compute_locks,
195 : keys: keys.keys,
196 : },
197 : &backend,
198 : self.config.wake_compute_retry_config,
199 : &self.config.connect_to_compute,
200 : )
201 : .await
202 : }
203 :
204 : // Wake up the destination if needed
205 : #[tracing::instrument(skip_all, fields(
206 : compute_id = tracing::field::Empty,
207 : conn_id = tracing::field::Empty,
208 : ))]
209 : pub(crate) async fn connect_to_local_proxy(
210 : &self,
211 : ctx: &RequestContext,
212 : conn_info: ConnInfo,
213 : ) -> Result<http_conn_pool::Client<Send>, HttpConnError> {
214 : debug!("pool: looking for an existing connection");
215 : if let Ok(Some(client)) = self.http_conn_pool.get(ctx, &conn_info) {
216 : return Ok(client);
217 : }
218 :
219 : let conn_id = uuid::Uuid::new_v4();
220 : tracing::Span::current().record("conn_id", display(conn_id));
221 : debug!(%conn_id, "pool: opening a new connection '{conn_info}'");
222 : let backend = self.auth_backend.as_ref().map(|()| ComputeUserInfo {
223 0 : user: conn_info.user_info.user.clone(),
224 0 : endpoint: EndpointId::from(format!(
225 0 : "{}{LOCAL_PROXY_SUFFIX}",
226 0 : conn_info.user_info.endpoint.normalize()
227 : )),
228 0 : options: conn_info.user_info.options.clone(),
229 0 : });
230 : crate::proxy::connect_compute::connect_to_compute(
231 : ctx,
232 : &HyperMechanism {
233 : conn_id,
234 : conn_info,
235 : pool: self.http_conn_pool.clone(),
236 : locks: &self.config.connect_compute_locks,
237 : },
238 : &backend,
239 : self.config.wake_compute_retry_config,
240 : &self.config.connect_to_compute,
241 : )
242 : .await
243 : }
244 :
245 : /// Connect to postgres over localhost.
246 : ///
247 : /// We expect postgres to be started here, so we won't do any retries.
248 : ///
249 : /// # Panics
250 : ///
251 : /// Panics if called with a non-local_proxy backend.
252 : #[tracing::instrument(skip_all, fields(
253 : pid = tracing::field::Empty,
254 : conn_id = tracing::field::Empty,
255 : ))]
256 : pub(crate) async fn connect_to_local_postgres(
257 : &self,
258 : ctx: &RequestContext,
259 : conn_info: ConnInfo,
260 : disable_pg_session_jwt: bool,
261 : ) -> Result<Client<postgres_client::Client>, HttpConnError> {
262 : if let Some(client) = self.local_pool.get(ctx, &conn_info)? {
263 : return Ok(client);
264 : }
265 :
266 : let local_backend = match &self.auth_backend {
267 : auth::Backend::ControlPlane(_, ()) => {
268 : unreachable!("only local_proxy can connect to local postgres")
269 : }
270 : auth::Backend::Local(local) => local,
271 : };
272 :
273 : if !self.local_pool.initialized(&conn_info) {
274 : // only install and grant usage one at a time.
275 : let _permit = local_backend
276 : .initialize
277 : .acquire()
278 : .await
279 : .expect("semaphore should never be closed");
280 :
281 : // check again for race
282 : if !self.local_pool.initialized(&conn_info) && !disable_pg_session_jwt {
283 : local_backend
284 : .compute_ctl
285 : .install_extension(&ExtensionInstallRequest {
286 : extension: EXT_NAME,
287 : database: conn_info.dbname.clone(),
288 : version: EXT_VERSION,
289 : })
290 : .await?;
291 :
292 : local_backend
293 : .compute_ctl
294 : .grant_role(&SetRoleGrantsRequest {
295 : schema: EXT_SCHEMA,
296 : privileges: vec![Privilege::Usage],
297 : database: conn_info.dbname.clone(),
298 : role: conn_info.user_info.user.clone(),
299 : })
300 : .await?;
301 :
302 : self.local_pool.set_initialized(&conn_info);
303 : }
304 : }
305 :
306 : let conn_id = uuid::Uuid::new_v4();
307 : tracing::Span::current().record("conn_id", display(conn_id));
308 : info!(%conn_id, "local_pool: opening a new connection '{conn_info}'");
309 :
310 : let (key, jwk) = create_random_jwk();
311 :
312 : let mut config = local_backend
313 : .node_info
314 : .conn_info
315 : .to_postgres_client_config();
316 : config
317 : .user(&conn_info.user_info.user)
318 : .dbname(&conn_info.dbname);
319 : if !disable_pg_session_jwt {
320 : config.set_param(
321 : "options",
322 : &format!(
323 : "-c pg_session_jwt.jwk={}",
324 : serde_json::to_string(&jwk).expect("serializing jwk to json should not fail")
325 : ),
326 : );
327 : }
328 :
329 : let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
330 : let (client, connection) = config.connect(&postgres_client::NoTls).await?;
331 : drop(pause);
332 :
333 : let pid = client.get_process_id();
334 : tracing::Span::current().record("pid", pid);
335 :
336 : let mut handle = local_conn_pool::poll_client(
337 : self.local_pool.clone(),
338 : ctx,
339 : conn_info,
340 : client,
341 : connection,
342 : key,
343 : conn_id,
344 : local_backend.node_info.aux.clone(),
345 : );
346 :
347 : {
348 : let (client, mut discard) = handle.inner();
349 : debug!("setting up backend session state");
350 :
351 : // initiates the auth session
352 : if !disable_pg_session_jwt
353 : && let Err(e) = client.batch_execute("select auth.init();").await
354 : {
355 : discard.discard();
356 : return Err(e.into());
357 : }
358 :
359 : info!("backend session state initialized");
360 : }
361 :
362 : Ok(handle)
363 : }
364 : }
365 :
366 0 : fn create_random_jwk() -> (SigningKey, jose_jwk::Key) {
367 0 : let key = SigningKey::generate(&mut OsRng);
368 :
369 0 : let jwk = jose_jwk::Key::Okp(jose_jwk::Okp {
370 0 : crv: jose_jwk::OkpCurves::Ed25519,
371 0 : x: jose_b64::serde::Bytes::from(key.verifying_key().to_bytes().to_vec()),
372 0 : d: None,
373 0 : });
374 :
375 0 : (key, jwk)
376 0 : }
377 :
378 : #[derive(Debug, thiserror::Error)]
379 : pub(crate) enum HttpConnError {
380 : #[error("pooled connection closed at inconsistent state")]
381 : ConnectionClosedAbruptly(#[from] tokio::sync::watch::error::SendError<uuid::Uuid>),
382 : #[error("could not connect to postgres in compute")]
383 : PostgresConnectionError(#[from] postgres_client::Error),
384 : #[error("could not connect to local-proxy in compute")]
385 : LocalProxyConnectionError(#[from] LocalProxyConnError),
386 : #[error("could not parse JWT payload")]
387 : JwtPayloadError(serde_json::Error),
388 :
389 : #[error("could not install extension: {0}")]
390 : ComputeCtl(#[from] ComputeCtlError),
391 : #[error("could not get auth info")]
392 : GetAuthInfo(#[from] GetAuthInfoError),
393 : #[error("user not authenticated")]
394 : AuthError(#[from] AuthError),
395 : #[error("wake_compute returned error")]
396 : WakeCompute(#[from] WakeComputeError),
397 : #[error("error acquiring resource permit: {0}")]
398 : TooManyConnectionAttempts(#[from] ApiLockError),
399 : }
400 :
401 : #[derive(Debug, thiserror::Error)]
402 : pub(crate) enum LocalProxyConnError {
403 : #[error("error with connection to local-proxy")]
404 : Io(#[source] std::io::Error),
405 : #[error("could not establish h2 connection")]
406 : H2(#[from] hyper::Error),
407 : }
408 :
409 : impl ReportableError for HttpConnError {
410 0 : fn get_error_kind(&self) -> ErrorKind {
411 0 : match self {
412 0 : HttpConnError::ConnectionClosedAbruptly(_) => ErrorKind::Compute,
413 0 : HttpConnError::PostgresConnectionError(p) => {
414 0 : if p.as_db_error().is_some() {
415 : // postgres rejected the connection
416 0 : ErrorKind::Postgres
417 : } else {
418 : // couldn't even reach postgres
419 0 : ErrorKind::Compute
420 : }
421 : }
422 0 : HttpConnError::LocalProxyConnectionError(_) => ErrorKind::Compute,
423 0 : HttpConnError::ComputeCtl(_) => ErrorKind::Service,
424 0 : HttpConnError::JwtPayloadError(_) => ErrorKind::User,
425 0 : HttpConnError::GetAuthInfo(a) => a.get_error_kind(),
426 0 : HttpConnError::AuthError(a) => a.get_error_kind(),
427 0 : HttpConnError::WakeCompute(w) => w.get_error_kind(),
428 0 : HttpConnError::TooManyConnectionAttempts(w) => w.get_error_kind(),
429 : }
430 0 : }
431 : }
432 :
433 : impl UserFacingError for HttpConnError {
434 0 : fn to_string_client(&self) -> String {
435 0 : match self {
436 0 : HttpConnError::ConnectionClosedAbruptly(_) => self.to_string(),
437 0 : HttpConnError::PostgresConnectionError(p) => p.to_string(),
438 0 : HttpConnError::LocalProxyConnectionError(p) => p.to_string(),
439 0 : HttpConnError::ComputeCtl(_) => "could not set up the JWT authorization database extension".to_string(),
440 0 : HttpConnError::JwtPayloadError(p) => p.to_string(),
441 0 : HttpConnError::GetAuthInfo(c) => c.to_string_client(),
442 0 : HttpConnError::AuthError(c) => c.to_string_client(),
443 0 : HttpConnError::WakeCompute(c) => c.to_string_client(),
444 : HttpConnError::TooManyConnectionAttempts(_) => {
445 0 : "Failed to acquire permit to connect to the database. Too many database connection attempts are currently ongoing.".to_owned()
446 : }
447 : }
448 0 : }
449 : }
450 :
451 : impl CouldRetry for HttpConnError {
452 0 : fn could_retry(&self) -> bool {
453 0 : match self {
454 0 : HttpConnError::PostgresConnectionError(e) => e.could_retry(),
455 0 : HttpConnError::LocalProxyConnectionError(e) => e.could_retry(),
456 0 : HttpConnError::ComputeCtl(_) => false,
457 0 : HttpConnError::ConnectionClosedAbruptly(_) => false,
458 0 : HttpConnError::JwtPayloadError(_) => false,
459 0 : HttpConnError::GetAuthInfo(_) => false,
460 0 : HttpConnError::AuthError(_) => false,
461 0 : HttpConnError::WakeCompute(_) => false,
462 0 : HttpConnError::TooManyConnectionAttempts(_) => false,
463 : }
464 0 : }
465 : }
466 : impl ShouldRetryWakeCompute for HttpConnError {
467 0 : fn should_retry_wake_compute(&self) -> bool {
468 0 : match self {
469 0 : HttpConnError::PostgresConnectionError(e) => e.should_retry_wake_compute(),
470 : // we never checked cache validity
471 0 : HttpConnError::TooManyConnectionAttempts(_) => false,
472 0 : _ => true,
473 : }
474 0 : }
475 : }
476 :
477 : impl ReportableError for LocalProxyConnError {
478 0 : fn get_error_kind(&self) -> ErrorKind {
479 0 : match self {
480 0 : LocalProxyConnError::Io(_) => ErrorKind::Compute,
481 0 : LocalProxyConnError::H2(_) => ErrorKind::Compute,
482 : }
483 0 : }
484 : }
485 :
486 : impl UserFacingError for LocalProxyConnError {
487 0 : fn to_string_client(&self) -> String {
488 0 : "Could not establish HTTP connection to the database".to_string()
489 0 : }
490 : }
491 :
492 : impl CouldRetry for LocalProxyConnError {
493 0 : fn could_retry(&self) -> bool {
494 0 : match self {
495 0 : LocalProxyConnError::Io(_) => false,
496 0 : LocalProxyConnError::H2(_) => false,
497 : }
498 0 : }
499 : }
500 : impl ShouldRetryWakeCompute for LocalProxyConnError {
501 0 : fn should_retry_wake_compute(&self) -> bool {
502 0 : match self {
503 0 : LocalProxyConnError::Io(_) => false,
504 0 : LocalProxyConnError::H2(_) => false,
505 : }
506 0 : }
507 : }
508 :
509 : struct TokioMechanism {
510 : pool: Arc<GlobalConnPool<postgres_client::Client, EndpointConnPool<postgres_client::Client>>>,
511 : conn_info: ConnInfo,
512 : conn_id: uuid::Uuid,
513 : keys: ComputeCredentialKeys,
514 :
515 : /// connect_to_compute concurrency lock
516 : locks: &'static ApiLocks<Host>,
517 : }
518 :
519 : #[async_trait]
520 : impl ConnectMechanism for TokioMechanism {
521 : type Connection = Client<postgres_client::Client>;
522 : type ConnectError = HttpConnError;
523 : type Error = HttpConnError;
524 :
525 0 : async fn connect_once(
526 : &self,
527 : ctx: &RequestContext,
528 : node_info: &CachedNodeInfo,
529 : compute_config: &ComputeConfig,
530 0 : ) -> Result<Self::Connection, Self::ConnectError> {
531 0 : let permit = self.locks.get_permit(&node_info.conn_info.host).await?;
532 :
533 0 : let mut config = node_info.conn_info.to_postgres_client_config();
534 0 : let config = config
535 0 : .user(&self.conn_info.user_info.user)
536 0 : .dbname(&self.conn_info.dbname)
537 0 : .connect_timeout(compute_config.timeout);
538 :
539 0 : if let ComputeCredentialKeys::AuthKeys(auth_keys) = self.keys {
540 0 : config.auth_keys(auth_keys);
541 0 : }
542 :
543 0 : let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
544 0 : let res = config.connect(compute_config).await;
545 0 : drop(pause);
546 0 : let (client, connection) = permit.release_result(res)?;
547 :
548 0 : tracing::Span::current().record("pid", tracing::field::display(client.get_process_id()));
549 0 : tracing::Span::current().record(
550 0 : "compute_id",
551 0 : tracing::field::display(&node_info.aux.compute_id),
552 : );
553 :
554 0 : if let Some(query_id) = ctx.get_testodrome_id() {
555 0 : info!("latency={}, query_id={}", ctx.get_proxy_latency(), query_id);
556 0 : }
557 :
558 0 : Ok(poll_client(
559 0 : self.pool.clone(),
560 0 : ctx,
561 0 : self.conn_info.clone(),
562 0 : client,
563 0 : connection,
564 0 : self.conn_id,
565 0 : node_info.aux.clone(),
566 0 : ))
567 0 : }
568 : }
569 :
570 : struct HyperMechanism {
571 : pool: Arc<GlobalConnPool<Send, HttpConnPool<Send>>>,
572 : conn_info: ConnInfo,
573 : conn_id: uuid::Uuid,
574 :
575 : /// connect_to_compute concurrency lock
576 : locks: &'static ApiLocks<Host>,
577 : }
578 :
579 : #[async_trait]
580 : impl ConnectMechanism for HyperMechanism {
581 : type Connection = http_conn_pool::Client<Send>;
582 : type ConnectError = HttpConnError;
583 : type Error = HttpConnError;
584 :
585 0 : async fn connect_once(
586 : &self,
587 : ctx: &RequestContext,
588 : node_info: &CachedNodeInfo,
589 : config: &ComputeConfig,
590 0 : ) -> Result<Self::Connection, Self::ConnectError> {
591 0 : let host_addr = node_info.conn_info.host_addr;
592 0 : let host = &node_info.conn_info.host;
593 0 : let permit = self.locks.get_permit(host).await?;
594 :
595 0 : let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
596 :
597 0 : let tls = if node_info.conn_info.ssl_mode == SslMode::Disable {
598 0 : None
599 : } else {
600 0 : Some(&config.tls)
601 : };
602 :
603 0 : let port = node_info.conn_info.port;
604 0 : let res = connect_http2(host_addr, host, port, config.timeout, tls).await;
605 0 : drop(pause);
606 0 : let (client, connection) = permit.release_result(res)?;
607 :
608 0 : tracing::Span::current().record(
609 0 : "compute_id",
610 0 : tracing::field::display(&node_info.aux.compute_id),
611 : );
612 :
613 0 : if let Some(query_id) = ctx.get_testodrome_id() {
614 0 : info!("latency={}, query_id={}", ctx.get_proxy_latency(), query_id);
615 0 : }
616 :
617 0 : Ok(poll_http2_client(
618 0 : self.pool.clone(),
619 0 : ctx,
620 0 : &self.conn_info,
621 0 : client,
622 0 : connection,
623 0 : self.conn_id,
624 0 : node_info.aux.clone(),
625 0 : ))
626 0 : }
627 : }
628 :
629 0 : async fn connect_http2(
630 0 : host_addr: Option<IpAddr>,
631 0 : host: &str,
632 0 : port: u16,
633 0 : timeout: Duration,
634 0 : tls: Option<&Arc<rustls::ClientConfig>>,
635 0 : ) -> Result<(http_conn_pool::Send, http_conn_pool::Connect), LocalProxyConnError> {
636 0 : let addrs = match host_addr {
637 0 : Some(addr) => vec![SocketAddr::new(addr, port)],
638 0 : None => lookup_host((host, port))
639 0 : .await
640 0 : .map_err(LocalProxyConnError::Io)?
641 0 : .collect(),
642 : };
643 0 : let mut last_err = None;
644 :
645 0 : let mut addrs = addrs.into_iter();
646 0 : let stream = loop {
647 0 : let Some(addr) = addrs.next() else {
648 0 : return Err(last_err.unwrap_or_else(|| {
649 0 : LocalProxyConnError::Io(io::Error::new(
650 0 : io::ErrorKind::InvalidInput,
651 0 : "could not resolve any addresses",
652 0 : ))
653 0 : }));
654 : };
655 :
656 0 : match tokio::time::timeout(timeout, TcpStream::connect(addr)).await {
657 0 : Ok(Ok(stream)) => {
658 0 : stream.set_nodelay(true).map_err(LocalProxyConnError::Io)?;
659 0 : break stream;
660 : }
661 0 : Ok(Err(e)) => {
662 0 : last_err = Some(LocalProxyConnError::Io(e));
663 0 : }
664 0 : Err(e) => {
665 0 : last_err = Some(LocalProxyConnError::Io(io::Error::new(
666 0 : io::ErrorKind::TimedOut,
667 0 : e,
668 0 : )));
669 0 : }
670 : }
671 : };
672 :
673 0 : let stream = if let Some(tls) = tls {
674 0 : let host = DnsName::try_from(host)
675 0 : .map_err(io::Error::other)
676 0 : .map_err(LocalProxyConnError::Io)?
677 0 : .to_owned();
678 0 : let stream = TlsConnector::from(tls.clone())
679 0 : .connect(ServerName::DnsName(host), stream)
680 0 : .await
681 0 : .map_err(LocalProxyConnError::Io)?;
682 0 : Box::pin(stream) as AsyncRW
683 : } else {
684 0 : Box::pin(stream) as AsyncRW
685 : };
686 :
687 0 : let (client, connection) = hyper::client::conn::http2::Builder::new(TokioExecutor::new())
688 0 : .timer(TokioTimer::new())
689 0 : .keep_alive_interval(Duration::from_secs(20))
690 0 : .keep_alive_while_idle(true)
691 0 : .keep_alive_timeout(Duration::from_secs(5))
692 0 : .handshake(TokioIo::new(stream))
693 0 : .await?;
694 :
695 0 : Ok((client, connection))
696 0 : }
|