Line data Source code
1 : use std::io;
2 : use std::net::{IpAddr, SocketAddr};
3 : use std::sync::Arc;
4 : use std::time::Duration;
5 :
6 : use async_trait::async_trait;
7 : use ed25519_dalek::SigningKey;
8 : use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer};
9 : use jose_jwk::jose_b64;
10 : use postgres_client::config::SslMode;
11 : use rand_core::OsRng;
12 : use rustls::pki_types::{DnsName, ServerName};
13 : use tokio::net::{TcpStream, lookup_host};
14 : use tokio_rustls::TlsConnector;
15 : use tracing::field::display;
16 : use tracing::{debug, info};
17 :
18 : use super::AsyncRW;
19 : use super::conn_pool::poll_client;
20 : use super::conn_pool_lib::{Client, ConnInfo, EndpointConnPool, GlobalConnPool};
21 : use super::http_conn_pool::{self, HttpConnPool, LocalProxyClient, poll_http2_client};
22 : use super::local_conn_pool::{self, EXT_NAME, EXT_SCHEMA, EXT_VERSION, LocalConnPool};
23 : use crate::auth::backend::local::StaticAuthRules;
24 : use crate::auth::backend::{ComputeCredentialKeys, ComputeCredentials, ComputeUserInfo};
25 : use crate::auth::{self, AuthError};
26 : use crate::compute_ctl::{
27 : ComputeCtlError, ExtensionInstallRequest, Privilege, SetRoleGrantsRequest,
28 : };
29 : use crate::config::{ComputeConfig, ProxyConfig};
30 : use crate::context::RequestContext;
31 : use crate::control_plane::CachedNodeInfo;
32 : use crate::control_plane::client::ApiLockError;
33 : use crate::control_plane::errors::{GetAuthInfoError, WakeComputeError};
34 : use crate::control_plane::locks::ApiLocks;
35 : use crate::error::{ErrorKind, ReportableError, UserFacingError};
36 : use crate::intern::EndpointIdInt;
37 : use crate::proxy::connect_compute::ConnectMechanism;
38 : use crate::proxy::retry::{CouldRetry, ShouldRetryWakeCompute};
39 : use crate::rate_limiter::EndpointRateLimiter;
40 : use crate::types::{EndpointId, Host, LOCAL_PROXY_SUFFIX};
41 :
42 : pub(crate) struct PoolingBackend {
43 : pub(crate) http_conn_pool:
44 : Arc<GlobalConnPool<LocalProxyClient, HttpConnPool<LocalProxyClient>>>,
45 : pub(crate) local_pool: Arc<LocalConnPool<postgres_client::Client>>,
46 : pub(crate) pool:
47 : Arc<GlobalConnPool<postgres_client::Client, EndpointConnPool<postgres_client::Client>>>,
48 :
49 : pub(crate) config: &'static ProxyConfig,
50 : pub(crate) auth_backend: &'static crate::auth::Backend<'static, ()>,
51 : pub(crate) endpoint_rate_limiter: Arc<EndpointRateLimiter>,
52 : }
53 :
54 : impl PoolingBackend {
55 0 : pub(crate) async fn authenticate_with_password(
56 0 : &self,
57 0 : ctx: &RequestContext,
58 0 : user_info: &ComputeUserInfo,
59 0 : password: &[u8],
60 0 : ) -> Result<ComputeCredentials, AuthError> {
61 0 : ctx.set_auth_method(crate::context::AuthMethod::Cleartext);
62 :
63 0 : let user_info = user_info.clone();
64 0 : let backend = self.auth_backend.as_ref().map(|()| user_info.clone());
65 0 : let access_control = backend.get_endpoint_access_control(ctx).await?;
66 0 : access_control.check(
67 0 : ctx,
68 0 : self.config.authentication_config.ip_allowlist_check_enabled,
69 0 : self.config.authentication_config.is_vpc_acccess_proxy,
70 0 : )?;
71 :
72 0 : access_control.connection_attempt_rate_limit(
73 0 : ctx,
74 0 : &user_info.endpoint,
75 0 : &self.endpoint_rate_limiter,
76 0 : )?;
77 :
78 0 : let role_access = backend.get_role_secret(ctx).await?;
79 0 : let Some(secret) = role_access.secret else {
80 : // If we don't have an authentication secret, for the http flow we can just return an error.
81 0 : info!("authentication info not found");
82 0 : return Err(AuthError::password_failed(&*user_info.user));
83 : };
84 :
85 0 : let ep = EndpointIdInt::from(&user_info.endpoint);
86 0 : let auth_outcome = crate::auth::validate_password_and_exchange(
87 0 : &self.config.authentication_config.thread_pool,
88 0 : ep,
89 0 : password,
90 0 : secret,
91 0 : )
92 0 : .await?;
93 0 : let res = match auth_outcome {
94 0 : crate::sasl::Outcome::Success(key) => {
95 0 : info!("user successfully authenticated");
96 0 : Ok(key)
97 : }
98 0 : crate::sasl::Outcome::Failure(reason) => {
99 0 : info!("auth backend failed with an error: {reason}");
100 0 : Err(AuthError::password_failed(&*user_info.user))
101 : }
102 : };
103 0 : res.map(|key| ComputeCredentials {
104 0 : info: user_info,
105 0 : keys: key,
106 0 : })
107 0 : }
108 :
109 0 : pub(crate) async fn authenticate_with_jwt(
110 0 : &self,
111 0 : ctx: &RequestContext,
112 0 : user_info: &ComputeUserInfo,
113 0 : jwt: String,
114 0 : ) -> Result<ComputeCredentials, AuthError> {
115 0 : ctx.set_auth_method(crate::context::AuthMethod::Jwt);
116 :
117 0 : match &self.auth_backend {
118 0 : crate::auth::Backend::ControlPlane(console, ()) => {
119 0 : let keys = self
120 0 : .config
121 0 : .authentication_config
122 0 : .jwks_cache
123 0 : .check_jwt(
124 0 : ctx,
125 0 : user_info.endpoint.clone(),
126 0 : &user_info.user,
127 0 : &**console,
128 0 : &jwt,
129 0 : )
130 0 : .await?;
131 :
132 0 : Ok(ComputeCredentials {
133 0 : info: user_info.clone(),
134 0 : keys,
135 0 : })
136 : }
137 : crate::auth::Backend::Local(_) => {
138 0 : let keys = self
139 0 : .config
140 0 : .authentication_config
141 0 : .jwks_cache
142 0 : .check_jwt(
143 0 : ctx,
144 0 : user_info.endpoint.clone(),
145 0 : &user_info.user,
146 0 : &StaticAuthRules,
147 0 : &jwt,
148 0 : )
149 0 : .await?;
150 :
151 0 : Ok(ComputeCredentials {
152 0 : info: user_info.clone(),
153 0 : keys,
154 0 : })
155 : }
156 : }
157 0 : }
158 :
159 : // Wake up the destination if needed. Code here is a bit involved because
160 : // we reuse the code from the usual proxy and we need to prepare few structures
161 : // that this code expects.
162 : #[tracing::instrument(skip_all, fields(
163 : pid = tracing::field::Empty,
164 : compute_id = tracing::field::Empty,
165 : conn_id = tracing::field::Empty,
166 : ))]
167 : pub(crate) async fn connect_to_compute(
168 : &self,
169 : ctx: &RequestContext,
170 : conn_info: ConnInfo,
171 : keys: ComputeCredentials,
172 : force_new: bool,
173 : ) -> Result<Client<postgres_client::Client>, HttpConnError> {
174 : let maybe_client = if force_new {
175 : debug!("pool: pool is disabled");
176 : None
177 : } else {
178 : debug!("pool: looking for an existing connection");
179 : self.pool.get(ctx, &conn_info)?
180 : };
181 :
182 : if let Some(client) = maybe_client {
183 : return Ok(client);
184 : }
185 : let conn_id = uuid::Uuid::new_v4();
186 : tracing::Span::current().record("conn_id", display(conn_id));
187 : info!(%conn_id, "pool: opening a new connection '{conn_info}'");
188 : let backend = self.auth_backend.as_ref().map(|()| keys.info);
189 : crate::proxy::connect_compute::connect_to_compute(
190 : ctx,
191 : &TokioMechanism {
192 : conn_id,
193 : conn_info,
194 : pool: self.pool.clone(),
195 : locks: &self.config.connect_compute_locks,
196 : keys: keys.keys,
197 : },
198 : &backend,
199 : self.config.wake_compute_retry_config,
200 : &self.config.connect_to_compute,
201 : )
202 : .await
203 : }
204 :
205 : // Wake up the destination if needed
206 : #[tracing::instrument(skip_all, fields(
207 : compute_id = tracing::field::Empty,
208 : conn_id = tracing::field::Empty,
209 : ))]
210 : pub(crate) async fn connect_to_local_proxy(
211 : &self,
212 : ctx: &RequestContext,
213 : conn_info: ConnInfo,
214 : ) -> Result<http_conn_pool::Client<LocalProxyClient>, HttpConnError> {
215 : debug!("pool: looking for an existing connection");
216 : if let Ok(Some(client)) = self.http_conn_pool.get(ctx, &conn_info) {
217 : return Ok(client);
218 : }
219 :
220 : let conn_id = uuid::Uuid::new_v4();
221 : tracing::Span::current().record("conn_id", display(conn_id));
222 : debug!(%conn_id, "pool: opening a new connection '{conn_info}'");
223 : let backend = self.auth_backend.as_ref().map(|()| ComputeUserInfo {
224 0 : user: conn_info.user_info.user.clone(),
225 0 : endpoint: EndpointId::from(format!(
226 0 : "{}{LOCAL_PROXY_SUFFIX}",
227 0 : conn_info.user_info.endpoint.normalize()
228 : )),
229 0 : options: conn_info.user_info.options.clone(),
230 0 : });
231 : crate::proxy::connect_compute::connect_to_compute(
232 : ctx,
233 : &HyperMechanism {
234 : conn_id,
235 : conn_info,
236 : pool: self.http_conn_pool.clone(),
237 : locks: &self.config.connect_compute_locks,
238 : },
239 : &backend,
240 : self.config.wake_compute_retry_config,
241 : &self.config.connect_to_compute,
242 : )
243 : .await
244 : }
245 :
246 : /// Connect to postgres over localhost.
247 : ///
248 : /// We expect postgres to be started here, so we won't do any retries.
249 : ///
250 : /// # Panics
251 : ///
252 : /// Panics if called with a non-local_proxy backend.
253 : #[tracing::instrument(skip_all, fields(
254 : pid = tracing::field::Empty,
255 : conn_id = tracing::field::Empty,
256 : ))]
257 : pub(crate) async fn connect_to_local_postgres(
258 : &self,
259 : ctx: &RequestContext,
260 : conn_info: ConnInfo,
261 : disable_pg_session_jwt: bool,
262 : ) -> Result<Client<postgres_client::Client>, HttpConnError> {
263 : if let Some(client) = self.local_pool.get(ctx, &conn_info)? {
264 : return Ok(client);
265 : }
266 :
267 : let local_backend = match &self.auth_backend {
268 : auth::Backend::ControlPlane(_, ()) => {
269 : unreachable!("only local_proxy can connect to local postgres")
270 : }
271 : auth::Backend::Local(local) => local,
272 : };
273 :
274 : if !self.local_pool.initialized(&conn_info) {
275 : // only install and grant usage one at a time.
276 : let _permit = local_backend
277 : .initialize
278 : .acquire()
279 : .await
280 : .expect("semaphore should never be closed");
281 :
282 : // check again for race
283 : if !self.local_pool.initialized(&conn_info) && !disable_pg_session_jwt {
284 : local_backend
285 : .compute_ctl
286 : .install_extension(&ExtensionInstallRequest {
287 : extension: EXT_NAME,
288 : database: conn_info.dbname.clone(),
289 : version: EXT_VERSION,
290 : })
291 : .await?;
292 :
293 : local_backend
294 : .compute_ctl
295 : .grant_role(&SetRoleGrantsRequest {
296 : schema: EXT_SCHEMA,
297 : privileges: vec![Privilege::Usage],
298 : database: conn_info.dbname.clone(),
299 : role: conn_info.user_info.user.clone(),
300 : })
301 : .await?;
302 :
303 : self.local_pool.set_initialized(&conn_info);
304 : }
305 : }
306 :
307 : let conn_id = uuid::Uuid::new_v4();
308 : tracing::Span::current().record("conn_id", display(conn_id));
309 : info!(%conn_id, "local_pool: opening a new connection '{conn_info}'");
310 :
311 : let (key, jwk) = create_random_jwk();
312 :
313 : let mut config = local_backend
314 : .node_info
315 : .conn_info
316 : .to_postgres_client_config();
317 : config
318 : .user(&conn_info.user_info.user)
319 : .dbname(&conn_info.dbname);
320 : if !disable_pg_session_jwt {
321 : config.set_param(
322 : "options",
323 : &format!(
324 : "-c pg_session_jwt.jwk={}",
325 : serde_json::to_string(&jwk).expect("serializing jwk to json should not fail")
326 : ),
327 : );
328 : }
329 :
330 : let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
331 : let (client, connection) = config.connect(&postgres_client::NoTls).await?;
332 : drop(pause);
333 :
334 : let pid = client.get_process_id();
335 : tracing::Span::current().record("pid", pid);
336 :
337 : let mut handle = local_conn_pool::poll_client(
338 : self.local_pool.clone(),
339 : ctx,
340 : conn_info,
341 : client,
342 : connection,
343 : key,
344 : conn_id,
345 : local_backend.node_info.aux.clone(),
346 : );
347 :
348 : {
349 : let (client, mut discard) = handle.inner();
350 : debug!("setting up backend session state");
351 :
352 : // initiates the auth session
353 : if !disable_pg_session_jwt
354 : && let Err(e) = client.batch_execute("select auth.init();").await
355 : {
356 : discard.discard();
357 : return Err(e.into());
358 : }
359 :
360 : info!("backend session state initialized");
361 : }
362 :
363 : Ok(handle)
364 : }
365 : }
366 :
367 0 : fn create_random_jwk() -> (SigningKey, jose_jwk::Key) {
368 0 : let key = SigningKey::generate(&mut OsRng);
369 :
370 0 : let jwk = jose_jwk::Key::Okp(jose_jwk::Okp {
371 0 : crv: jose_jwk::OkpCurves::Ed25519,
372 0 : x: jose_b64::serde::Bytes::from(key.verifying_key().to_bytes().to_vec()),
373 0 : d: None,
374 0 : });
375 :
376 0 : (key, jwk)
377 0 : }
378 :
379 : #[derive(Debug, thiserror::Error)]
380 : pub(crate) enum HttpConnError {
381 : #[error("pooled connection closed at inconsistent state")]
382 : ConnectionClosedAbruptly(#[from] tokio::sync::watch::error::SendError<uuid::Uuid>),
383 : #[error("could not connect to postgres in compute")]
384 : PostgresConnectionError(#[from] postgres_client::Error),
385 : #[error("could not connect to local-proxy in compute")]
386 : LocalProxyConnectionError(#[from] LocalProxyConnError),
387 : #[error("could not parse JWT payload")]
388 : JwtPayloadError(serde_json::Error),
389 :
390 : #[error("could not install extension: {0}")]
391 : ComputeCtl(#[from] ComputeCtlError),
392 : #[error("could not get auth info")]
393 : GetAuthInfo(#[from] GetAuthInfoError),
394 : #[error("user not authenticated")]
395 : AuthError(#[from] AuthError),
396 : #[error("wake_compute returned error")]
397 : WakeCompute(#[from] WakeComputeError),
398 : #[error("error acquiring resource permit: {0}")]
399 : TooManyConnectionAttempts(#[from] ApiLockError),
400 : }
401 :
402 : #[derive(Debug, thiserror::Error)]
403 : pub(crate) enum LocalProxyConnError {
404 : #[error("error with connection to local-proxy")]
405 : Io(#[source] std::io::Error),
406 : #[error("could not establish h2 connection")]
407 : H2(#[from] hyper::Error),
408 : }
409 :
410 : impl ReportableError for HttpConnError {
411 0 : fn get_error_kind(&self) -> ErrorKind {
412 0 : match self {
413 0 : HttpConnError::ConnectionClosedAbruptly(_) => ErrorKind::Compute,
414 0 : HttpConnError::PostgresConnectionError(p) => {
415 0 : if p.as_db_error().is_some() {
416 : // postgres rejected the connection
417 0 : ErrorKind::Postgres
418 : } else {
419 : // couldn't even reach postgres
420 0 : ErrorKind::Compute
421 : }
422 : }
423 0 : HttpConnError::LocalProxyConnectionError(_) => ErrorKind::Compute,
424 0 : HttpConnError::ComputeCtl(_) => ErrorKind::Service,
425 0 : HttpConnError::JwtPayloadError(_) => ErrorKind::User,
426 0 : HttpConnError::GetAuthInfo(a) => a.get_error_kind(),
427 0 : HttpConnError::AuthError(a) => a.get_error_kind(),
428 0 : HttpConnError::WakeCompute(w) => w.get_error_kind(),
429 0 : HttpConnError::TooManyConnectionAttempts(w) => w.get_error_kind(),
430 : }
431 0 : }
432 : }
433 :
434 : impl UserFacingError for HttpConnError {
435 0 : fn to_string_client(&self) -> String {
436 0 : match self {
437 0 : HttpConnError::ConnectionClosedAbruptly(_) => self.to_string(),
438 0 : HttpConnError::PostgresConnectionError(p) => p.to_string(),
439 0 : HttpConnError::LocalProxyConnectionError(p) => p.to_string(),
440 0 : HttpConnError::ComputeCtl(_) => "could not set up the JWT authorization database extension".to_string(),
441 0 : HttpConnError::JwtPayloadError(p) => p.to_string(),
442 0 : HttpConnError::GetAuthInfo(c) => c.to_string_client(),
443 0 : HttpConnError::AuthError(c) => c.to_string_client(),
444 0 : HttpConnError::WakeCompute(c) => c.to_string_client(),
445 : HttpConnError::TooManyConnectionAttempts(_) => {
446 0 : "Failed to acquire permit to connect to the database. Too many database connection attempts are currently ongoing.".to_owned()
447 : }
448 : }
449 0 : }
450 : }
451 :
452 : impl CouldRetry for HttpConnError {
453 0 : fn could_retry(&self) -> bool {
454 0 : match self {
455 0 : HttpConnError::PostgresConnectionError(e) => e.could_retry(),
456 0 : HttpConnError::LocalProxyConnectionError(e) => e.could_retry(),
457 0 : HttpConnError::ComputeCtl(_) => false,
458 0 : HttpConnError::ConnectionClosedAbruptly(_) => false,
459 0 : HttpConnError::JwtPayloadError(_) => false,
460 0 : HttpConnError::GetAuthInfo(_) => false,
461 0 : HttpConnError::AuthError(_) => false,
462 0 : HttpConnError::WakeCompute(_) => false,
463 0 : HttpConnError::TooManyConnectionAttempts(_) => false,
464 : }
465 0 : }
466 : }
467 : impl ShouldRetryWakeCompute for HttpConnError {
468 0 : fn should_retry_wake_compute(&self) -> bool {
469 0 : match self {
470 0 : HttpConnError::PostgresConnectionError(e) => e.should_retry_wake_compute(),
471 : // we never checked cache validity
472 0 : HttpConnError::TooManyConnectionAttempts(_) => false,
473 0 : _ => true,
474 : }
475 0 : }
476 : }
477 :
478 : impl ReportableError for LocalProxyConnError {
479 0 : fn get_error_kind(&self) -> ErrorKind {
480 0 : match self {
481 0 : LocalProxyConnError::Io(_) => ErrorKind::Compute,
482 0 : LocalProxyConnError::H2(_) => ErrorKind::Compute,
483 : }
484 0 : }
485 : }
486 :
487 : impl UserFacingError for LocalProxyConnError {
488 0 : fn to_string_client(&self) -> String {
489 0 : "Could not establish HTTP connection to the database".to_string()
490 0 : }
491 : }
492 :
493 : impl CouldRetry for LocalProxyConnError {
494 0 : fn could_retry(&self) -> bool {
495 0 : match self {
496 0 : LocalProxyConnError::Io(_) => false,
497 0 : LocalProxyConnError::H2(_) => false,
498 : }
499 0 : }
500 : }
501 : impl ShouldRetryWakeCompute for LocalProxyConnError {
502 0 : fn should_retry_wake_compute(&self) -> bool {
503 0 : match self {
504 0 : LocalProxyConnError::Io(_) => false,
505 0 : LocalProxyConnError::H2(_) => false,
506 : }
507 0 : }
508 : }
509 :
510 : struct TokioMechanism {
511 : pool: Arc<GlobalConnPool<postgres_client::Client, EndpointConnPool<postgres_client::Client>>>,
512 : conn_info: ConnInfo,
513 : conn_id: uuid::Uuid,
514 : keys: ComputeCredentialKeys,
515 :
516 : /// connect_to_compute concurrency lock
517 : locks: &'static ApiLocks<Host>,
518 : }
519 :
520 : #[async_trait]
521 : impl ConnectMechanism for TokioMechanism {
522 : type Connection = Client<postgres_client::Client>;
523 : type ConnectError = HttpConnError;
524 : type Error = HttpConnError;
525 :
526 0 : async fn connect_once(
527 : &self,
528 : ctx: &RequestContext,
529 : node_info: &CachedNodeInfo,
530 : compute_config: &ComputeConfig,
531 0 : ) -> Result<Self::Connection, Self::ConnectError> {
532 0 : let permit = self.locks.get_permit(&node_info.conn_info.host).await?;
533 :
534 0 : let mut config = node_info.conn_info.to_postgres_client_config();
535 0 : let config = config
536 0 : .user(&self.conn_info.user_info.user)
537 0 : .dbname(&self.conn_info.dbname)
538 0 : .connect_timeout(compute_config.timeout);
539 :
540 0 : if let ComputeCredentialKeys::AuthKeys(auth_keys) = self.keys {
541 0 : config.auth_keys(auth_keys);
542 0 : }
543 :
544 0 : let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
545 0 : let res = config.connect(compute_config).await;
546 0 : drop(pause);
547 0 : let (client, connection) = permit.release_result(res)?;
548 :
549 0 : tracing::Span::current().record("pid", tracing::field::display(client.get_process_id()));
550 0 : tracing::Span::current().record(
551 0 : "compute_id",
552 0 : tracing::field::display(&node_info.aux.compute_id),
553 : );
554 :
555 0 : if let Some(query_id) = ctx.get_testodrome_id() {
556 0 : info!("latency={}, query_id={}", ctx.get_proxy_latency(), query_id);
557 0 : }
558 :
559 0 : Ok(poll_client(
560 0 : self.pool.clone(),
561 0 : ctx,
562 0 : self.conn_info.clone(),
563 0 : client,
564 0 : connection,
565 0 : self.conn_id,
566 0 : node_info.aux.clone(),
567 0 : ))
568 0 : }
569 : }
570 :
571 : struct HyperMechanism {
572 : pool: Arc<GlobalConnPool<LocalProxyClient, HttpConnPool<LocalProxyClient>>>,
573 : conn_info: ConnInfo,
574 : conn_id: uuid::Uuid,
575 :
576 : /// connect_to_compute concurrency lock
577 : locks: &'static ApiLocks<Host>,
578 : }
579 :
580 : #[async_trait]
581 : impl ConnectMechanism for HyperMechanism {
582 : type Connection = http_conn_pool::Client<LocalProxyClient>;
583 : type ConnectError = HttpConnError;
584 : type Error = HttpConnError;
585 :
586 0 : async fn connect_once(
587 : &self,
588 : ctx: &RequestContext,
589 : node_info: &CachedNodeInfo,
590 : config: &ComputeConfig,
591 0 : ) -> Result<Self::Connection, Self::ConnectError> {
592 0 : let host_addr = node_info.conn_info.host_addr;
593 0 : let host = &node_info.conn_info.host;
594 0 : let permit = self.locks.get_permit(host).await?;
595 :
596 0 : let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
597 :
598 0 : let tls = if node_info.conn_info.ssl_mode == SslMode::Disable {
599 0 : None
600 : } else {
601 0 : Some(&config.tls)
602 : };
603 :
604 0 : let port = node_info.conn_info.port;
605 0 : let res = connect_http2(host_addr, host, port, config.timeout, tls).await;
606 0 : drop(pause);
607 0 : let (client, connection) = permit.release_result(res)?;
608 :
609 0 : tracing::Span::current().record(
610 0 : "compute_id",
611 0 : tracing::field::display(&node_info.aux.compute_id),
612 : );
613 :
614 0 : if let Some(query_id) = ctx.get_testodrome_id() {
615 0 : info!("latency={}, query_id={}", ctx.get_proxy_latency(), query_id);
616 0 : }
617 :
618 0 : Ok(poll_http2_client(
619 0 : self.pool.clone(),
620 0 : ctx,
621 0 : &self.conn_info,
622 0 : client,
623 0 : connection,
624 0 : self.conn_id,
625 0 : node_info.aux.clone(),
626 0 : ))
627 0 : }
628 : }
629 :
630 0 : async fn connect_http2(
631 0 : host_addr: Option<IpAddr>,
632 0 : host: &str,
633 0 : port: u16,
634 0 : timeout: Duration,
635 0 : tls: Option<&Arc<rustls::ClientConfig>>,
636 0 : ) -> Result<
637 0 : (
638 0 : http_conn_pool::LocalProxyClient,
639 0 : http_conn_pool::LocalProxyConnection,
640 0 : ),
641 0 : LocalProxyConnError,
642 0 : > {
643 0 : let addrs = match host_addr {
644 0 : Some(addr) => vec![SocketAddr::new(addr, port)],
645 0 : None => lookup_host((host, port))
646 0 : .await
647 0 : .map_err(LocalProxyConnError::Io)?
648 0 : .collect(),
649 : };
650 0 : let mut last_err = None;
651 :
652 0 : let mut addrs = addrs.into_iter();
653 0 : let stream = loop {
654 0 : let Some(addr) = addrs.next() else {
655 0 : return Err(last_err.unwrap_or_else(|| {
656 0 : LocalProxyConnError::Io(io::Error::new(
657 0 : io::ErrorKind::InvalidInput,
658 0 : "could not resolve any addresses",
659 0 : ))
660 0 : }));
661 : };
662 :
663 0 : match tokio::time::timeout(timeout, TcpStream::connect(addr)).await {
664 0 : Ok(Ok(stream)) => {
665 0 : stream.set_nodelay(true).map_err(LocalProxyConnError::Io)?;
666 0 : break stream;
667 : }
668 0 : Ok(Err(e)) => {
669 0 : last_err = Some(LocalProxyConnError::Io(e));
670 0 : }
671 0 : Err(e) => {
672 0 : last_err = Some(LocalProxyConnError::Io(io::Error::new(
673 0 : io::ErrorKind::TimedOut,
674 0 : e,
675 0 : )));
676 0 : }
677 : }
678 : };
679 :
680 0 : let stream = if let Some(tls) = tls {
681 0 : let host = DnsName::try_from(host)
682 0 : .map_err(io::Error::other)
683 0 : .map_err(LocalProxyConnError::Io)?
684 0 : .to_owned();
685 0 : let stream = TlsConnector::from(tls.clone())
686 0 : .connect(ServerName::DnsName(host), stream)
687 0 : .await
688 0 : .map_err(LocalProxyConnError::Io)?;
689 0 : Box::pin(stream) as AsyncRW
690 : } else {
691 0 : Box::pin(stream) as AsyncRW
692 : };
693 :
694 0 : let (client, connection) = hyper::client::conn::http2::Builder::new(TokioExecutor::new())
695 0 : .timer(TokioTimer::new())
696 0 : .keep_alive_interval(Duration::from_secs(20))
697 0 : .keep_alive_while_idle(true)
698 0 : .keep_alive_timeout(Duration::from_secs(5))
699 0 : .handshake(TokioIo::new(stream))
700 0 : .await?;
701 :
702 0 : Ok((client, connection))
703 0 : }
|