use std::io;
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer};
use p256::ecdsa::SigningKey;
use p256::elliptic_curve::JwkEcKey;
use rand::rngs::OsRng;
use tokio::net::{lookup_host, TcpStream};
use tracing::field::display;
use tracing::{debug, info};

use super::conn_pool::poll_client;
use super::conn_pool_lib::{Client, ConnInfo, GlobalConnPool};
use super::http_conn_pool::{self, poll_http2_client, Send};
use super::local_conn_pool::{self, LocalClient, LocalConnPool, EXT_NAME, EXT_SCHEMA, EXT_VERSION};
use crate::auth::backend::local::StaticAuthRules;
use crate::auth::backend::{ComputeCredentials, ComputeUserInfo};
use crate::auth::{self, check_peer_addr_is_in_list, AuthError};
use crate::compute;
use crate::compute_ctl::{
    ComputeCtlError, ExtensionInstallRequest, Privilege, SetRoleGrantsRequest,
};
use crate::config::ProxyConfig;
use crate::context::RequestMonitoring;
use crate::control_plane::errors::{GetAuthInfoError, WakeComputeError};
use crate::control_plane::locks::ApiLocks;
use crate::control_plane::provider::ApiLockError;
use crate::control_plane::CachedNodeInfo;
use crate::error::{ErrorKind, ReportableError, UserFacingError};
use crate::intern::EndpointIdInt;
use crate::proxy::connect_compute::ConnectMechanism;
use crate::proxy::retry::{CouldRetry, ShouldRetryWakeCompute};
use crate::rate_limiter::EndpointRateLimiter;
use crate::types::{EndpointId, Host};

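/// Backend for the serverless (HTTP) flow. Holds the connection pools (HTTP/2
/// connections to local_proxy, direct postgres connections, and connections to
/// the local postgres), together with the proxy config, auth backend and
/// endpoint rate limiter needed to authenticate users and reach a compute node.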
pub(crate) struct PoolingBackend {
    pub(crate) http_conn_pool: Arc<super::http_conn_pool::GlobalConnPool<Send>>,
    pub(crate) local_pool: Arc<LocalConnPool<tokio_postgres::Client>>,
    pub(crate) pool: Arc<GlobalConnPool<tokio_postgres::Client>>,

    pub(crate) config: &'static ProxyConfig,
    pub(crate) auth_backend: &'static crate::auth::Backend<'static, ()>,
    pub(crate) endpoint_rate_limiter: Arc<EndpointRateLimiter>,
}

impl PoolingBackend {
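    /// Authenticate an HTTP request with a password: enforce the endpoint's IP
    /// allowlist and connection rate limit, look up the role's secret (from the
    /// cache or from the auth backend), and validate the password against it.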
    pub(crate) async fn authenticate_with_password(
        &self,
        ctx: &RequestMonitoring,
        user_info: &ComputeUserInfo,
        password: &[u8],
    ) -> Result<ComputeCredentials, AuthError> {
        let user_info = user_info.clone();
        let backend = self.auth_backend.as_ref().map(|()| user_info.clone());
        let (allowed_ips, maybe_secret) = backend.get_allowed_ips_and_secret(ctx).await?;
        if self.config.authentication_config.ip_allowlist_check_enabled
            && !check_peer_addr_is_in_list(&ctx.peer_addr(), &allowed_ips)
        {
            return Err(AuthError::ip_address_not_allowed(ctx.peer_addr()));
        }
        if !self
            .endpoint_rate_limiter
            .check(user_info.endpoint.clone().into(), 1)
        {
            return Err(AuthError::too_many_connections());
        }
        let cached_secret = match maybe_secret {
            Some(secret) => secret,
            None => backend.get_role_secret(ctx).await?,
        };

        let secret = match cached_secret.value.clone() {
            Some(secret) => self.config.authentication_config.check_rate_limit(
                ctx,
                secret,
                &user_info.endpoint,
                true,
            )?,
            None => {
                // If we don't have an authentication secret, the HTTP flow can simply return an error.
                info!("authentication info not found");
                return Err(AuthError::auth_failed(&*user_info.user));
            }
        };
        let ep = EndpointIdInt::from(&user_info.endpoint);
        let auth_outcome = crate::auth::validate_password_and_exchange(
            &self.config.authentication_config.thread_pool,
            ep,
            password,
            secret,
        )
        .await?;
        let res = match auth_outcome {
            crate::sasl::Outcome::Success(key) => {
                info!("user successfully authenticated");
                Ok(key)
            }
            crate::sasl::Outcome::Failure(reason) => {
                info!("auth backend failed with an error: {reason}");
                Err(AuthError::auth_failed(&*user_info.user))
            }
        };
        res.map(|key| ComputeCredentials {
            info: user_info,
            keys: key,
        })
    }

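    /// Authenticate an HTTP request with a JWT: verify the token against the
    /// endpoint's JWKS via the control-plane backend, or against the static
    /// auth rules when running as local_proxy.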
    pub(crate) async fn authenticate_with_jwt(
        &self,
        ctx: &RequestMonitoring,
        user_info: &ComputeUserInfo,
        jwt: String,
    ) -> Result<ComputeCredentials, AuthError> {
        match &self.auth_backend {
            crate::auth::Backend::ControlPlane(console, ()) => {
                self.config
                    .authentication_config
                    .jwks_cache
                    .check_jwt(
                        ctx,
                        user_info.endpoint.clone(),
                        &user_info.user,
                        &**console,
                        &jwt,
                    )
                    .await
                    .map_err(|e| AuthError::auth_failed(e.to_string()))?;

                Ok(ComputeCredentials {
                    info: user_info.clone(),
                    keys: crate::auth::backend::ComputeCredentialKeys::None,
                })
            }
            crate::auth::Backend::Local(_) => {
                let keys = self
                    .config
                    .authentication_config
                    .jwks_cache
                    .check_jwt(
                        ctx,
                        user_info.endpoint.clone(),
                        &user_info.user,
                        &StaticAuthRules,
                        &jwt,
                    )
                    .await
                    .map_err(|e| AuthError::auth_failed(e.to_string()))?;

                Ok(ComputeCredentials {
                    info: user_info.clone(),
                    keys,
                })
            }
        }
    }

    // Wake up the destination if needed. The code here is a bit involved because
    // we reuse the code from the usual proxy, and we need to prepare a few
    // structures that this code expects.
    #[tracing::instrument(fields(pid = tracing::field::Empty), skip_all)]
    pub(crate) async fn connect_to_compute(
        &self,
        ctx: &RequestMonitoring,
        conn_info: ConnInfo,
        keys: ComputeCredentials,
        force_new: bool,
    ) -> Result<Client<tokio_postgres::Client>, HttpConnError> {
        let maybe_client = if force_new {
            info!("pool: pool is disabled");
            None
        } else {
            info!("pool: looking for an existing connection");
            self.pool.get(ctx, &conn_info)?
        };

        if let Some(client) = maybe_client {
            return Ok(client);
        }
        let conn_id = uuid::Uuid::new_v4();
        tracing::Span::current().record("conn_id", display(conn_id));
        info!(%conn_id, "pool: opening a new connection '{conn_info}'");
        let backend = self.auth_backend.as_ref().map(|()| keys);
        crate::proxy::connect_compute::connect_to_compute(
            ctx,
            &TokioMechanism {
                conn_id,
                conn_info,
                pool: self.pool.clone(),
                locks: &self.config.connect_compute_locks,
            },
            &backend,
            false, // do not allow self signed compute for http flow
            self.config.wake_compute_retry_config,
            self.config.connect_to_compute_retry_config,
        )
        .await
    }

    // Wake up the destination if needed
    #[tracing::instrument(fields(pid = tracing::field::Empty), skip_all)]
    pub(crate) async fn connect_to_local_proxy(
        &self,
        ctx: &RequestMonitoring,
        conn_info: ConnInfo,
    ) -> Result<http_conn_pool::Client<Send>, HttpConnError> {
        info!("pool: looking for an existing connection");
        if let Some(client) = self.http_conn_pool.get(ctx, &conn_info) {
            return Ok(client);
        }

        let conn_id = uuid::Uuid::new_v4();
        tracing::Span::current().record("conn_id", display(conn_id));
        info!(%conn_id, "pool: opening a new connection '{conn_info}'");
        let backend = self.auth_backend.as_ref().map(|()| ComputeCredentials {
            info: ComputeUserInfo {
                user: conn_info.user_info.user.clone(),
                endpoint: EndpointId::from(format!("{}-local-proxy", conn_info.user_info.endpoint)),
                options: conn_info.user_info.options.clone(),
            },
            keys: crate::auth::backend::ComputeCredentialKeys::None,
        });
        crate::proxy::connect_compute::connect_to_compute(
            ctx,
            &HyperMechanism {
                conn_id,
                conn_info,
                pool: self.http_conn_pool.clone(),
                locks: &self.config.connect_compute_locks,
            },
            &backend,
            false, // do not allow self signed compute for http flow
            self.config.wake_compute_retry_config,
            self.config.connect_to_compute_retry_config,
        )
        .await
    }

    /// Connect to postgres over localhost.
    ///
    /// We expect postgres to be started here, so we won't do any retries.
    ///
    /// # Panics
    ///
    /// Panics if called with a non-local_proxy backend.
    #[tracing::instrument(fields(pid = tracing::field::Empty), skip_all)]
    pub(crate) async fn connect_to_local_postgres(
        &self,
        ctx: &RequestMonitoring,
        conn_info: ConnInfo,
    ) -> Result<LocalClient<tokio_postgres::Client>, HttpConnError> {
        if let Some(client) = self.local_pool.get(ctx, &conn_info)? {
            return Ok(client);
        }

        let local_backend = match &self.auth_backend {
            auth::Backend::ControlPlane(_, ()) => {
                unreachable!("only local_proxy can connect to local postgres")
            }
            auth::Backend::Local(local) => local,
        };

        if !self.local_pool.initialized(&conn_info) {
            // only install and grant usage one at a time.
            let _permit = local_backend.initialize.acquire().await.unwrap();

            // check again for race
            if !self.local_pool.initialized(&conn_info) {
                local_backend
                    .compute_ctl
                    .install_extension(&ExtensionInstallRequest {
                        extension: EXT_NAME,
                        database: conn_info.dbname.clone(),
                        version: EXT_VERSION,
                    })
                    .await?;

                local_backend
                    .compute_ctl
                    .grant_role(&SetRoleGrantsRequest {
                        schema: EXT_SCHEMA,
                        privileges: vec![Privilege::Usage],
                        database: conn_info.dbname.clone(),
                        role: conn_info.user_info.user.clone(),
                    })
                    .await?;

                self.local_pool.set_initialized(&conn_info);
            }
        }

        let conn_id = uuid::Uuid::new_v4();
        tracing::Span::current().record("conn_id", display(conn_id));
        info!(%conn_id, "local_pool: opening a new connection '{conn_info}'");

        let mut node_info = local_backend.node_info.clone();

        let (key, jwk) = create_random_jwk();

        let config = node_info
            .config
            .user(&conn_info.user_info.user)
            .dbname(&conn_info.dbname)
            .options(&format!(
                "-c pg_session_jwt.jwk={}",
                serde_json::to_string(&jwk).expect("serializing jwk to json should not fail")
            ));

        let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
        let (client, connection) = config.connect(tokio_postgres::NoTls).await?;
        drop(pause);

        let pid = client.get_process_id();
        tracing::Span::current().record("pid", pid);

        let mut handle = local_conn_pool::poll_client(
            self.local_pool.clone(),
            ctx,
            conn_info,
            client,
            connection,
            key,
            conn_id,
            node_info.aux.clone(),
        );

        {
            let (client, mut discard) = handle.inner();
            debug!("setting up backend session state");

            // initiates the auth session
            if let Err(e) = client.query("select auth.init()", &[]).await {
                discard.discard();
                return Err(e.into());
            }

            info!("backend session state initialized");
        }

        Ok(handle)
    }
}

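/// Generate an ephemeral P-256 key pair for a single local connection: the
/// signing key stays with the pooled client (it is handed to
/// `local_conn_pool::poll_client`), while the public half is exported as a JWK
/// and passed to postgres via the `pg_session_jwt.jwk` option in
/// `connect_to_local_postgres`.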
fn create_random_jwk() -> (SigningKey, JwkEcKey) {
    let key = SigningKey::random(&mut OsRng);
    let jwk = p256::PublicKey::from(key.verifying_key()).to_jwk();
    (key, jwk)
}

#[derive(Debug, thiserror::Error)]
pub(crate) enum HttpConnError {
    #[error("pooled connection closed at inconsistent state")]
    ConnectionClosedAbruptly(#[from] tokio::sync::watch::error::SendError<uuid::Uuid>),
    #[error("could not connect to postgres in compute")]
    PostgresConnectionError(#[from] tokio_postgres::Error),
    #[error("could not connect to local-proxy in compute")]
    LocalProxyConnectionError(#[from] LocalProxyConnError),
    #[error("could not parse JWT payload")]
    JwtPayloadError(serde_json::Error),

    #[error("could not install extension: {0}")]
    ComputeCtl(#[from] ComputeCtlError),
    #[error("could not get auth info")]
    GetAuthInfo(#[from] GetAuthInfoError),
    #[error("user not authenticated")]
    AuthError(#[from] AuthError),
    #[error("wake_compute returned error")]
    WakeCompute(#[from] WakeComputeError),
    #[error("error acquiring resource permit: {0}")]
    TooManyConnectionAttempts(#[from] ApiLockError),
}

#[derive(Debug, thiserror::Error)]
pub(crate) enum LocalProxyConnError {
    #[error("error with connection to local-proxy")]
    Io(#[source] std::io::Error),
    #[error("could not establish h2 connection")]
    H2(#[from] hyper::Error),
}

impl ReportableError for HttpConnError {
    fn get_error_kind(&self) -> ErrorKind {
        match self {
            HttpConnError::ConnectionClosedAbruptly(_) => ErrorKind::Compute,
            HttpConnError::PostgresConnectionError(p) => p.get_error_kind(),
            HttpConnError::LocalProxyConnectionError(_) => ErrorKind::Compute,
            HttpConnError::ComputeCtl(_) => ErrorKind::Service,
            HttpConnError::JwtPayloadError(_) => ErrorKind::User,
            HttpConnError::GetAuthInfo(a) => a.get_error_kind(),
            HttpConnError::AuthError(a) => a.get_error_kind(),
            HttpConnError::WakeCompute(w) => w.get_error_kind(),
            HttpConnError::TooManyConnectionAttempts(w) => w.get_error_kind(),
        }
    }
}

impl UserFacingError for HttpConnError {
    fn to_string_client(&self) -> String {
        match self {
            HttpConnError::ConnectionClosedAbruptly(_) => self.to_string(),
            HttpConnError::PostgresConnectionError(p) => p.to_string(),
            HttpConnError::LocalProxyConnectionError(p) => p.to_string(),
            HttpConnError::ComputeCtl(_) => {
                "could not set up the JWT authorization database extension".to_string()
            }
            HttpConnError::JwtPayloadError(p) => p.to_string(),
            HttpConnError::GetAuthInfo(c) => c.to_string_client(),
            HttpConnError::AuthError(c) => c.to_string_client(),
            HttpConnError::WakeCompute(c) => c.to_string_client(),
            HttpConnError::TooManyConnectionAttempts(_) => {
                "Failed to acquire permit to connect to the database. Too many database connection attempts are currently ongoing.".to_owned()
            }
        }
    }
}

impl CouldRetry for HttpConnError {
    fn could_retry(&self) -> bool {
        match self {
            HttpConnError::PostgresConnectionError(e) => e.could_retry(),
            HttpConnError::LocalProxyConnectionError(e) => e.could_retry(),
            HttpConnError::ComputeCtl(_) => false,
            HttpConnError::ConnectionClosedAbruptly(_) => false,
            HttpConnError::JwtPayloadError(_) => false,
            HttpConnError::GetAuthInfo(_) => false,
            HttpConnError::AuthError(_) => false,
            HttpConnError::WakeCompute(_) => false,
            HttpConnError::TooManyConnectionAttempts(_) => false,
        }
    }
}
impl ShouldRetryWakeCompute for HttpConnError {
    fn should_retry_wake_compute(&self) -> bool {
        match self {
            HttpConnError::PostgresConnectionError(e) => e.should_retry_wake_compute(),
            // we never checked cache validity
            HttpConnError::TooManyConnectionAttempts(_) => false,
            _ => true,
        }
    }
}

impl ReportableError for LocalProxyConnError {
    fn get_error_kind(&self) -> ErrorKind {
        match self {
            LocalProxyConnError::Io(_) => ErrorKind::Compute,
            LocalProxyConnError::H2(_) => ErrorKind::Compute,
        }
    }
}

impl UserFacingError for LocalProxyConnError {
    fn to_string_client(&self) -> String {
        "Could not establish HTTP connection to the database".to_string()
    }
}

impl CouldRetry for LocalProxyConnError {
    fn could_retry(&self) -> bool {
        match self {
            LocalProxyConnError::Io(_) => false,
            LocalProxyConnError::H2(_) => false,
        }
    }
}
impl ShouldRetryWakeCompute for LocalProxyConnError {
    fn should_retry_wake_compute(&self) -> bool {
        match self {
            LocalProxyConnError::Io(_) => false,
            LocalProxyConnError::H2(_) => false,
        }
    }
}

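/// `ConnectMechanism` for the postgres connection pool: opens a plain
/// `tokio_postgres` connection to the compute node and registers it with the
/// global pool via `poll_client`.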
struct TokioMechanism {
    pool: Arc<GlobalConnPool<tokio_postgres::Client>>,
    conn_info: ConnInfo,
    conn_id: uuid::Uuid,

    /// connect_to_compute concurrency lock
    locks: &'static ApiLocks<Host>,
}

#[async_trait]
impl ConnectMechanism for TokioMechanism {
    type Connection = Client<tokio_postgres::Client>;
    type ConnectError = HttpConnError;
    type Error = HttpConnError;

    async fn connect_once(
        &self,
        ctx: &RequestMonitoring,
        node_info: &CachedNodeInfo,
        timeout: Duration,
    ) -> Result<Self::Connection, Self::ConnectError> {
        let host = node_info.config.get_host()?;
        let permit = self.locks.get_permit(&host).await?;

        let mut config = (*node_info.config).clone();
        let config = config
            .user(&self.conn_info.user_info.user)
            .dbname(&self.conn_info.dbname)
            .connect_timeout(timeout);

        let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
        let res = config.connect(tokio_postgres::NoTls).await;
        drop(pause);
        let (client, connection) = permit.release_result(res)?;

        tracing::Span::current().record("pid", tracing::field::display(client.get_process_id()));
        Ok(poll_client(
            self.pool.clone(),
            ctx,
            self.conn_info.clone(),
            client,
            connection,
            self.conn_id,
            node_info.aux.clone(),
        ))
    }

    fn update_connect_config(&self, _config: &mut compute::ConnCfg) {}
}

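/// `ConnectMechanism` for the local_proxy pool: opens an HTTP/2 connection to
/// the local_proxy in the compute node and registers it with the HTTP
/// connection pool via `poll_http2_client`.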
struct HyperMechanism {
    pool: Arc<http_conn_pool::GlobalConnPool<Send>>,
    conn_info: ConnInfo,
    conn_id: uuid::Uuid,

    /// connect_to_compute concurrency lock
    locks: &'static ApiLocks<Host>,
}

#[async_trait]
impl ConnectMechanism for HyperMechanism {
    type Connection = http_conn_pool::Client<Send>;
    type ConnectError = HttpConnError;
    type Error = HttpConnError;

    async fn connect_once(
        &self,
        ctx: &RequestMonitoring,
        node_info: &CachedNodeInfo,
        timeout: Duration,
    ) -> Result<Self::Connection, Self::ConnectError> {
        let host = node_info.config.get_host()?;
        let permit = self.locks.get_permit(&host).await?;

        let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);

        let port = *node_info.config.get_ports().first().ok_or_else(|| {
            HttpConnError::WakeCompute(WakeComputeError::BadComputeAddress(
                "local-proxy port missing on compute address".into(),
            ))
        })?;
        let res = connect_http2(&host, port, timeout).await;
        drop(pause);
        let (client, connection) = permit.release_result(res)?;

        Ok(poll_http2_client(
            self.pool.clone(),
            ctx,
            &self.conn_info,
            client,
            connection,
            self.conn_id,
            node_info.aux.clone(),
        ))
    }

    fn update_connect_config(&self, _config: &mut compute::ConnCfg) {}
}

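/// Establish an HTTP/2 connection to `host:port`: resolve the host, try each
/// address in turn with the given timeout, and keep the resulting connection
/// alive with periodic pings once the handshake succeeds.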
async fn connect_http2(
    host: &str,
    port: u16,
    timeout: Duration,
) -> Result<(http_conn_pool::Send, http_conn_pool::Connect), LocalProxyConnError> {
    // assumption: host is an ip address, so this lookup should not actually perform any DNS queries.
    // todo: add that assumption as a guarantee in the control-plane API.
    let mut addrs = lookup_host((host, port))
        .await
        .map_err(LocalProxyConnError::Io)?;

    let mut last_err = None;

    let stream = loop {
        let Some(addr) = addrs.next() else {
            return Err(last_err.unwrap_or_else(|| {
                LocalProxyConnError::Io(io::Error::new(
                    io::ErrorKind::InvalidInput,
                    "could not resolve any addresses",
                ))
            }));
        };

        match tokio::time::timeout(timeout, TcpStream::connect(addr)).await {
            Ok(Ok(stream)) => {
                stream.set_nodelay(true).map_err(LocalProxyConnError::Io)?;
                break stream;
            }
            Ok(Err(e)) => {
                last_err = Some(LocalProxyConnError::Io(e));
            }
            Err(e) => {
                last_err = Some(LocalProxyConnError::Io(io::Error::new(
                    io::ErrorKind::TimedOut,
                    e,
                )));
            }
        };
    };

    let (client, connection) = hyper::client::conn::http2::Builder::new(TokioExecutor::new())
        .timer(TokioTimer::new())
        .keep_alive_interval(Duration::from_secs(20))
        .keep_alive_while_idle(true)
        .keep_alive_timeout(Duration::from_secs(5))
        .handshake(TokioIo::new(stream))
        .await?;

    Ok((client, connection))
}
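
// A minimal test sketch: the public JWK produced by `create_random_jwk` must
// serialize to JSON, since `connect_to_local_postgres` embeds it verbatim in
// the `pg_session_jwt.jwk` startup option. The exact JSON layout is up to the
// `p256`/`elliptic-curve` crates, so we only assert that the output is valid
// JSON.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn random_jwk_serializes_to_json() {
        let (_key, jwk) = create_random_jwk();
        let json = serde_json::to_string(&jwk).expect("serializing jwk to json should not fail");
        let value: serde_json::Value =
            serde_json::from_str(&json).expect("jwk json should parse back");
        assert!(!value.is_null());
    }
}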