Line data Source code
1 : use std::io;
2 : use std::sync::Arc;
3 : use std::time::Duration;
4 :
5 : use async_trait::async_trait;
6 : use ed25519_dalek::SigningKey;
7 : use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer};
8 : use jose_jwk::jose_b64;
9 : use rand::rngs::OsRng;
10 : use tokio::net::{lookup_host, TcpStream};
11 : use tracing::field::display;
12 : use tracing::{debug, info};
13 :
14 : use super::conn_pool::poll_client;
15 : use super::conn_pool_lib::{Client, ConnInfo, EndpointConnPool, GlobalConnPool};
16 : use super::http_conn_pool::{self, poll_http2_client, HttpConnPool, Send};
17 : use super::local_conn_pool::{self, LocalConnPool, EXT_NAME, EXT_SCHEMA, EXT_VERSION};
18 : use crate::auth::backend::local::StaticAuthRules;
19 : use crate::auth::backend::{ComputeCredentials, ComputeUserInfo};
20 : use crate::auth::{self, check_peer_addr_is_in_list, AuthError};
21 : use crate::compute;
22 : use crate::compute_ctl::{
23 : ComputeCtlError, ExtensionInstallRequest, Privilege, SetRoleGrantsRequest,
24 : };
25 : use crate::config::{ComputeConfig, ProxyConfig};
26 : use crate::context::RequestContext;
27 : use crate::control_plane::client::ApiLockError;
28 : use crate::control_plane::errors::{GetAuthInfoError, WakeComputeError};
29 : use crate::control_plane::locks::ApiLocks;
30 : use crate::control_plane::CachedNodeInfo;
31 : use crate::error::{ErrorKind, ReportableError, UserFacingError};
32 : use crate::intern::EndpointIdInt;
33 : use crate::proxy::connect_compute::ConnectMechanism;
34 : use crate::proxy::retry::{CouldRetry, ShouldRetryWakeCompute};
35 : use crate::rate_limiter::EndpointRateLimiter;
36 : use crate::types::{EndpointId, Host, LOCAL_PROXY_SUFFIX};
37 :
38 : pub(crate) struct PoolingBackend {
39 : pub(crate) http_conn_pool: Arc<GlobalConnPool<Send, HttpConnPool<Send>>>,
40 : pub(crate) local_pool: Arc<LocalConnPool<postgres_client::Client>>,
41 : pub(crate) pool:
42 : Arc<GlobalConnPool<postgres_client::Client, EndpointConnPool<postgres_client::Client>>>,
43 :
44 : pub(crate) config: &'static ProxyConfig,
45 : pub(crate) auth_backend: &'static crate::auth::Backend<'static, ()>,
46 : pub(crate) endpoint_rate_limiter: Arc<EndpointRateLimiter>,
47 : }
48 :
49 : impl PoolingBackend {
50 0 : pub(crate) async fn authenticate_with_password(
51 0 : &self,
52 0 : ctx: &RequestContext,
53 0 : user_info: &ComputeUserInfo,
54 0 : password: &[u8],
55 0 : ) -> Result<ComputeCredentials, AuthError> {
56 0 : ctx.set_auth_method(crate::context::AuthMethod::Cleartext);
57 0 :
58 0 : let user_info = user_info.clone();
59 0 : let backend = self.auth_backend.as_ref().map(|()| user_info.clone());
60 0 : let (allowed_ips, maybe_secret) = backend.get_allowed_ips_and_secret(ctx).await?;
61 0 : if self.config.authentication_config.ip_allowlist_check_enabled
62 0 : && !check_peer_addr_is_in_list(&ctx.peer_addr(), &allowed_ips)
63 : {
64 0 : return Err(AuthError::ip_address_not_allowed(ctx.peer_addr()));
65 0 : }
66 0 : if !self
67 0 : .endpoint_rate_limiter
68 0 : .check(user_info.endpoint.clone().into(), 1)
69 : {
70 0 : return Err(AuthError::too_many_connections());
71 0 : }
72 0 : let cached_secret = match maybe_secret {
73 0 : Some(secret) => secret,
74 0 : None => backend.get_role_secret(ctx).await?,
75 : };
76 :
77 0 : let secret = match cached_secret.value.clone() {
78 0 : Some(secret) => self.config.authentication_config.check_rate_limit(
79 0 : ctx,
80 0 : secret,
81 0 : &user_info.endpoint,
82 0 : true,
83 0 : )?,
84 : None => {
85 : // If we don't have an authentication secret, for the http flow we can just return an error.
86 0 : info!("authentication info not found");
87 0 : return Err(AuthError::password_failed(&*user_info.user));
88 : }
89 : };
90 0 : let ep = EndpointIdInt::from(&user_info.endpoint);
91 0 : let auth_outcome = crate::auth::validate_password_and_exchange(
92 0 : &self.config.authentication_config.thread_pool,
93 0 : ep,
94 0 : password,
95 0 : secret,
96 0 : )
97 0 : .await?;
98 0 : let res = match auth_outcome {
99 0 : crate::sasl::Outcome::Success(key) => {
100 0 : info!("user successfully authenticated");
101 0 : Ok(key)
102 : }
103 0 : crate::sasl::Outcome::Failure(reason) => {
104 0 : info!("auth backend failed with an error: {reason}");
105 0 : Err(AuthError::password_failed(&*user_info.user))
106 : }
107 : };
108 0 : res.map(|key| ComputeCredentials {
109 0 : info: user_info,
110 0 : keys: key,
111 0 : })
112 0 : }
113 :
114 0 : pub(crate) async fn authenticate_with_jwt(
115 0 : &self,
116 0 : ctx: &RequestContext,
117 0 : user_info: &ComputeUserInfo,
118 0 : jwt: String,
119 0 : ) -> Result<ComputeCredentials, AuthError> {
120 0 : ctx.set_auth_method(crate::context::AuthMethod::Jwt);
121 0 :
122 0 : match &self.auth_backend {
123 0 : crate::auth::Backend::ControlPlane(console, ()) => {
124 0 : self.config
125 0 : .authentication_config
126 0 : .jwks_cache
127 0 : .check_jwt(
128 0 : ctx,
129 0 : user_info.endpoint.clone(),
130 0 : &user_info.user,
131 0 : &**console,
132 0 : &jwt,
133 0 : )
134 0 : .await?;
135 :
136 0 : Ok(ComputeCredentials {
137 0 : info: user_info.clone(),
138 0 : keys: crate::auth::backend::ComputeCredentialKeys::None,
139 0 : })
140 : }
141 : crate::auth::Backend::Local(_) => {
142 0 : let keys = self
143 0 : .config
144 0 : .authentication_config
145 0 : .jwks_cache
146 0 : .check_jwt(
147 0 : ctx,
148 0 : user_info.endpoint.clone(),
149 0 : &user_info.user,
150 0 : &StaticAuthRules,
151 0 : &jwt,
152 0 : )
153 0 : .await?;
154 :
155 0 : Ok(ComputeCredentials {
156 0 : info: user_info.clone(),
157 0 : keys,
158 0 : })
159 : }
160 : }
161 0 : }
162 :
163 : // Wake up the destination if needed. Code here is a bit involved because
164 : // we reuse the code from the usual proxy and we need to prepare few structures
165 : // that this code expects.
166 : #[tracing::instrument(fields(pid = tracing::field::Empty), skip_all)]
167 : pub(crate) async fn connect_to_compute(
168 : &self,
169 : ctx: &RequestContext,
170 : conn_info: ConnInfo,
171 : keys: ComputeCredentials,
172 : force_new: bool,
173 : ) -> Result<Client<postgres_client::Client>, HttpConnError> {
174 : let maybe_client = if force_new {
175 : debug!("pool: pool is disabled");
176 : None
177 : } else {
178 : debug!("pool: looking for an existing connection");
179 : self.pool.get(ctx, &conn_info)?
180 : };
181 :
182 : if let Some(client) = maybe_client {
183 : return Ok(client);
184 : }
185 : let conn_id = uuid::Uuid::new_v4();
186 : tracing::Span::current().record("conn_id", display(conn_id));
187 : info!(%conn_id, "pool: opening a new connection '{conn_info}'");
188 0 : let backend = self.auth_backend.as_ref().map(|()| keys);
189 : crate::proxy::connect_compute::connect_to_compute(
190 : ctx,
191 : &TokioMechanism {
192 : conn_id,
193 : conn_info,
194 : pool: self.pool.clone(),
195 : locks: &self.config.connect_compute_locks,
196 : },
197 : &backend,
198 : self.config.wake_compute_retry_config,
199 : &self.config.connect_to_compute,
200 : )
201 : .await
202 : }
203 :
204 : // Wake up the destination if needed
205 : #[tracing::instrument(fields(pid = tracing::field::Empty), skip_all)]
206 : pub(crate) async fn connect_to_local_proxy(
207 : &self,
208 : ctx: &RequestContext,
209 : conn_info: ConnInfo,
210 : ) -> Result<http_conn_pool::Client<Send>, HttpConnError> {
211 : debug!("pool: looking for an existing connection");
212 : if let Ok(Some(client)) = self.http_conn_pool.get(ctx, &conn_info) {
213 : return Ok(client);
214 : }
215 :
216 : let conn_id = uuid::Uuid::new_v4();
217 : tracing::Span::current().record("conn_id", display(conn_id));
218 : debug!(%conn_id, "pool: opening a new connection '{conn_info}'");
219 0 : let backend = self.auth_backend.as_ref().map(|()| ComputeCredentials {
220 0 : info: ComputeUserInfo {
221 0 : user: conn_info.user_info.user.clone(),
222 0 : endpoint: EndpointId::from(format!(
223 0 : "{}{LOCAL_PROXY_SUFFIX}",
224 0 : conn_info.user_info.endpoint.normalize()
225 0 : )),
226 0 : options: conn_info.user_info.options.clone(),
227 0 : },
228 0 : keys: crate::auth::backend::ComputeCredentialKeys::None,
229 0 : });
230 : crate::proxy::connect_compute::connect_to_compute(
231 : ctx,
232 : &HyperMechanism {
233 : conn_id,
234 : conn_info,
235 : pool: self.http_conn_pool.clone(),
236 : locks: &self.config.connect_compute_locks,
237 : },
238 : &backend,
239 : self.config.wake_compute_retry_config,
240 : &self.config.connect_to_compute,
241 : )
242 : .await
243 : }
244 :
245 : /// Connect to postgres over localhost.
246 : ///
247 : /// We expect postgres to be started here, so we won't do any retries.
248 : ///
249 : /// # Panics
250 : ///
251 : /// Panics if called with a non-local_proxy backend.
252 : #[tracing::instrument(fields(pid = tracing::field::Empty), skip_all)]
253 : pub(crate) async fn connect_to_local_postgres(
254 : &self,
255 : ctx: &RequestContext,
256 : conn_info: ConnInfo,
257 : ) -> Result<Client<postgres_client::Client>, HttpConnError> {
258 : if let Some(client) = self.local_pool.get(ctx, &conn_info)? {
259 : return Ok(client);
260 : }
261 :
262 : let local_backend = match &self.auth_backend {
263 : auth::Backend::ControlPlane(_, ()) => {
264 : unreachable!("only local_proxy can connect to local postgres")
265 : }
266 : auth::Backend::Local(local) => local,
267 : };
268 :
269 : if !self.local_pool.initialized(&conn_info) {
270 : // only install and grant usage one at a time.
271 : let _permit = local_backend
272 : .initialize
273 : .acquire()
274 : .await
275 : .expect("semaphore should never be closed");
276 :
277 : // check again for race
278 : if !self.local_pool.initialized(&conn_info) {
279 : local_backend
280 : .compute_ctl
281 : .install_extension(&ExtensionInstallRequest {
282 : extension: EXT_NAME,
283 : database: conn_info.dbname.clone(),
284 : version: EXT_VERSION,
285 : })
286 : .await?;
287 :
288 : local_backend
289 : .compute_ctl
290 : .grant_role(&SetRoleGrantsRequest {
291 : schema: EXT_SCHEMA,
292 : privileges: vec![Privilege::Usage],
293 : database: conn_info.dbname.clone(),
294 : role: conn_info.user_info.user.clone(),
295 : })
296 : .await?;
297 :
298 : self.local_pool.set_initialized(&conn_info);
299 : }
300 : }
301 :
302 : let conn_id = uuid::Uuid::new_v4();
303 : tracing::Span::current().record("conn_id", display(conn_id));
304 : info!(%conn_id, "local_pool: opening a new connection '{conn_info}'");
305 :
306 : let mut node_info = local_backend.node_info.clone();
307 :
308 : let (key, jwk) = create_random_jwk();
309 :
310 : let config = node_info
311 : .config
312 : .user(&conn_info.user_info.user)
313 : .dbname(&conn_info.dbname)
314 : .set_param(
315 : "options",
316 : &format!(
317 : "-c pg_session_jwt.jwk={}",
318 : serde_json::to_string(&jwk).expect("serializing jwk to json should not fail")
319 : ),
320 : );
321 :
322 : let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
323 : let (client, connection) = config.connect(postgres_client::NoTls).await?;
324 : drop(pause);
325 :
326 : let pid = client.get_process_id();
327 : tracing::Span::current().record("pid", pid);
328 :
329 : let mut handle = local_conn_pool::poll_client(
330 : self.local_pool.clone(),
331 : ctx,
332 : conn_info,
333 : client,
334 : connection,
335 : key,
336 : conn_id,
337 : node_info.aux.clone(),
338 : );
339 :
340 : {
341 : let (client, mut discard) = handle.inner();
342 : debug!("setting up backend session state");
343 :
344 : // initiates the auth session
345 : if let Err(e) = client.execute("select auth.init()", &[]).await {
346 : discard.discard();
347 : return Err(e.into());
348 : }
349 :
350 : info!("backend session state initialized");
351 : }
352 :
353 : Ok(handle)
354 : }
355 : }
356 :
357 0 : fn create_random_jwk() -> (SigningKey, jose_jwk::Key) {
358 0 : let key = SigningKey::generate(&mut OsRng);
359 0 :
360 0 : let jwk = jose_jwk::Key::Okp(jose_jwk::Okp {
361 0 : crv: jose_jwk::OkpCurves::Ed25519,
362 0 : x: jose_b64::serde::Bytes::from(key.verifying_key().to_bytes().to_vec()),
363 0 : d: None,
364 0 : });
365 0 :
366 0 : (key, jwk)
367 0 : }
368 :
369 : #[derive(Debug, thiserror::Error)]
370 : pub(crate) enum HttpConnError {
371 : #[error("pooled connection closed at inconsistent state")]
372 : ConnectionClosedAbruptly(#[from] tokio::sync::watch::error::SendError<uuid::Uuid>),
373 : #[error("could not connection to postgres in compute")]
374 : PostgresConnectionError(#[from] postgres_client::Error),
375 : #[error("could not connection to local-proxy in compute")]
376 : LocalProxyConnectionError(#[from] LocalProxyConnError),
377 : #[error("could not parse JWT payload")]
378 : JwtPayloadError(serde_json::Error),
379 :
380 : #[error("could not install extension: {0}")]
381 : ComputeCtl(#[from] ComputeCtlError),
382 : #[error("could not get auth info")]
383 : GetAuthInfo(#[from] GetAuthInfoError),
384 : #[error("user not authenticated")]
385 : AuthError(#[from] AuthError),
386 : #[error("wake_compute returned error")]
387 : WakeCompute(#[from] WakeComputeError),
388 : #[error("error acquiring resource permit: {0}")]
389 : TooManyConnectionAttempts(#[from] ApiLockError),
390 : }
391 :
392 : #[derive(Debug, thiserror::Error)]
393 : pub(crate) enum LocalProxyConnError {
394 : #[error("error with connection to local-proxy")]
395 : Io(#[source] std::io::Error),
396 : #[error("could not establish h2 connection")]
397 : H2(#[from] hyper::Error),
398 : }
399 :
400 : impl ReportableError for HttpConnError {
401 0 : fn get_error_kind(&self) -> ErrorKind {
402 0 : match self {
403 0 : HttpConnError::ConnectionClosedAbruptly(_) => ErrorKind::Compute,
404 0 : HttpConnError::PostgresConnectionError(p) => p.get_error_kind(),
405 0 : HttpConnError::LocalProxyConnectionError(_) => ErrorKind::Compute,
406 0 : HttpConnError::ComputeCtl(_) => ErrorKind::Service,
407 0 : HttpConnError::JwtPayloadError(_) => ErrorKind::User,
408 0 : HttpConnError::GetAuthInfo(a) => a.get_error_kind(),
409 0 : HttpConnError::AuthError(a) => a.get_error_kind(),
410 0 : HttpConnError::WakeCompute(w) => w.get_error_kind(),
411 0 : HttpConnError::TooManyConnectionAttempts(w) => w.get_error_kind(),
412 : }
413 0 : }
414 : }
415 :
416 : impl UserFacingError for HttpConnError {
417 0 : fn to_string_client(&self) -> String {
418 0 : match self {
419 0 : HttpConnError::ConnectionClosedAbruptly(_) => self.to_string(),
420 0 : HttpConnError::PostgresConnectionError(p) => p.to_string(),
421 0 : HttpConnError::LocalProxyConnectionError(p) => p.to_string(),
422 0 : HttpConnError::ComputeCtl(_) => "could not set up the JWT authorization database extension".to_string(),
423 0 : HttpConnError::JwtPayloadError(p) => p.to_string(),
424 0 : HttpConnError::GetAuthInfo(c) => c.to_string_client(),
425 0 : HttpConnError::AuthError(c) => c.to_string_client(),
426 0 : HttpConnError::WakeCompute(c) => c.to_string_client(),
427 : HttpConnError::TooManyConnectionAttempts(_) => {
428 0 : "Failed to acquire permit to connect to the database. Too many database connection attempts are currently ongoing.".to_owned()
429 : }
430 : }
431 0 : }
432 : }
433 :
434 : impl CouldRetry for HttpConnError {
435 0 : fn could_retry(&self) -> bool {
436 0 : match self {
437 0 : HttpConnError::PostgresConnectionError(e) => e.could_retry(),
438 0 : HttpConnError::LocalProxyConnectionError(e) => e.could_retry(),
439 0 : HttpConnError::ComputeCtl(_) => false,
440 0 : HttpConnError::ConnectionClosedAbruptly(_) => false,
441 0 : HttpConnError::JwtPayloadError(_) => false,
442 0 : HttpConnError::GetAuthInfo(_) => false,
443 0 : HttpConnError::AuthError(_) => false,
444 0 : HttpConnError::WakeCompute(_) => false,
445 0 : HttpConnError::TooManyConnectionAttempts(_) => false,
446 : }
447 0 : }
448 : }
449 : impl ShouldRetryWakeCompute for HttpConnError {
450 0 : fn should_retry_wake_compute(&self) -> bool {
451 0 : match self {
452 0 : HttpConnError::PostgresConnectionError(e) => e.should_retry_wake_compute(),
453 : // we never checked cache validity
454 0 : HttpConnError::TooManyConnectionAttempts(_) => false,
455 0 : _ => true,
456 : }
457 0 : }
458 : }
459 :
460 : impl ReportableError for LocalProxyConnError {
461 0 : fn get_error_kind(&self) -> ErrorKind {
462 0 : match self {
463 0 : LocalProxyConnError::Io(_) => ErrorKind::Compute,
464 0 : LocalProxyConnError::H2(_) => ErrorKind::Compute,
465 : }
466 0 : }
467 : }
468 :
469 : impl UserFacingError for LocalProxyConnError {
470 0 : fn to_string_client(&self) -> String {
471 0 : "Could not establish HTTP connection to the database".to_string()
472 0 : }
473 : }
474 :
475 : impl CouldRetry for LocalProxyConnError {
476 0 : fn could_retry(&self) -> bool {
477 0 : match self {
478 0 : LocalProxyConnError::Io(_) => false,
479 0 : LocalProxyConnError::H2(_) => false,
480 : }
481 0 : }
482 : }
483 : impl ShouldRetryWakeCompute for LocalProxyConnError {
484 0 : fn should_retry_wake_compute(&self) -> bool {
485 0 : match self {
486 0 : LocalProxyConnError::Io(_) => false,
487 0 : LocalProxyConnError::H2(_) => false,
488 : }
489 0 : }
490 : }
491 :
492 : struct TokioMechanism {
493 : pool: Arc<GlobalConnPool<postgres_client::Client, EndpointConnPool<postgres_client::Client>>>,
494 : conn_info: ConnInfo,
495 : conn_id: uuid::Uuid,
496 :
497 : /// connect_to_compute concurrency lock
498 : locks: &'static ApiLocks<Host>,
499 : }
500 :
501 : #[async_trait]
502 : impl ConnectMechanism for TokioMechanism {
503 : type Connection = Client<postgres_client::Client>;
504 : type ConnectError = HttpConnError;
505 : type Error = HttpConnError;
506 :
507 0 : async fn connect_once(
508 0 : &self,
509 0 : ctx: &RequestContext,
510 0 : node_info: &CachedNodeInfo,
511 0 : compute_config: &ComputeConfig,
512 0 : ) -> Result<Self::Connection, Self::ConnectError> {
513 0 : let host = node_info.config.get_host();
514 0 : let permit = self.locks.get_permit(&host).await?;
515 :
516 0 : let mut config = (*node_info.config).clone();
517 0 : let config = config
518 0 : .user(&self.conn_info.user_info.user)
519 0 : .dbname(&self.conn_info.dbname)
520 0 : .connect_timeout(compute_config.timeout);
521 0 :
522 0 : let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
523 0 : let res = config.connect(postgres_client::NoTls).await;
524 0 : drop(pause);
525 0 : let (client, connection) = permit.release_result(res)?;
526 :
527 0 : tracing::Span::current().record("pid", tracing::field::display(client.get_process_id()));
528 0 : Ok(poll_client(
529 0 : self.pool.clone(),
530 0 : ctx,
531 0 : self.conn_info.clone(),
532 0 : client,
533 0 : connection,
534 0 : self.conn_id,
535 0 : node_info.aux.clone(),
536 0 : ))
537 0 : }
538 :
539 0 : fn update_connect_config(&self, _config: &mut compute::ConnCfg) {}
540 : }
541 :
542 : struct HyperMechanism {
543 : pool: Arc<GlobalConnPool<Send, HttpConnPool<Send>>>,
544 : conn_info: ConnInfo,
545 : conn_id: uuid::Uuid,
546 :
547 : /// connect_to_compute concurrency lock
548 : locks: &'static ApiLocks<Host>,
549 : }
550 :
551 : #[async_trait]
552 : impl ConnectMechanism for HyperMechanism {
553 : type Connection = http_conn_pool::Client<Send>;
554 : type ConnectError = HttpConnError;
555 : type Error = HttpConnError;
556 :
557 0 : async fn connect_once(
558 0 : &self,
559 0 : ctx: &RequestContext,
560 0 : node_info: &CachedNodeInfo,
561 0 : config: &ComputeConfig,
562 0 : ) -> Result<Self::Connection, Self::ConnectError> {
563 0 : let host = node_info.config.get_host();
564 0 : let permit = self.locks.get_permit(&host).await?;
565 :
566 0 : let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
567 0 :
568 0 : let port = node_info.config.get_port();
569 0 : let res = connect_http2(&host, port, config.timeout).await;
570 0 : drop(pause);
571 0 : let (client, connection) = permit.release_result(res)?;
572 :
573 0 : Ok(poll_http2_client(
574 0 : self.pool.clone(),
575 0 : ctx,
576 0 : &self.conn_info,
577 0 : client,
578 0 : connection,
579 0 : self.conn_id,
580 0 : node_info.aux.clone(),
581 0 : ))
582 0 : }
583 :
584 0 : fn update_connect_config(&self, _config: &mut compute::ConnCfg) {}
585 : }
586 :
587 0 : async fn connect_http2(
588 0 : host: &str,
589 0 : port: u16,
590 0 : timeout: Duration,
591 0 : ) -> Result<(http_conn_pool::Send, http_conn_pool::Connect), LocalProxyConnError> {
592 : // assumption: host is an ip address so this should not actually perform any requests.
593 : // todo: add that assumption as a guarantee in the control-plane API.
594 0 : let mut addrs = lookup_host((host, port))
595 0 : .await
596 0 : .map_err(LocalProxyConnError::Io)?;
597 :
598 0 : let mut last_err = None;
599 :
600 0 : let stream = loop {
601 0 : let Some(addr) = addrs.next() else {
602 0 : return Err(last_err.unwrap_or_else(|| {
603 0 : LocalProxyConnError::Io(io::Error::new(
604 0 : io::ErrorKind::InvalidInput,
605 0 : "could not resolve any addresses",
606 0 : ))
607 0 : }));
608 : };
609 :
610 0 : match tokio::time::timeout(timeout, TcpStream::connect(addr)).await {
611 0 : Ok(Ok(stream)) => {
612 0 : stream.set_nodelay(true).map_err(LocalProxyConnError::Io)?;
613 0 : break stream;
614 : }
615 0 : Ok(Err(e)) => {
616 0 : last_err = Some(LocalProxyConnError::Io(e));
617 0 : }
618 0 : Err(e) => {
619 0 : last_err = Some(LocalProxyConnError::Io(io::Error::new(
620 0 : io::ErrorKind::TimedOut,
621 0 : e,
622 0 : )));
623 0 : }
624 : };
625 : };
626 :
627 0 : let (client, connection) = hyper::client::conn::http2::Builder::new(TokioExecutor::new())
628 0 : .timer(TokioTimer::new())
629 0 : .keep_alive_interval(Duration::from_secs(20))
630 0 : .keep_alive_while_idle(true)
631 0 : .keep_alive_timeout(Duration::from_secs(5))
632 0 : .handshake(TokioIo::new(stream))
633 0 : .await?;
634 :
635 0 : Ok((client, connection))
636 0 : }
|