LCOV - code coverage report
Current view: top level - proxy/src/serverless - http_conn_pool.rs (source / functions) Coverage Total Hit
Test: 6df3fc19ec669bcfbbf9aba41d1338898d24eaa0.info Lines: 0.0 % 203 0
Test Date: 2025-03-12 18:28:53 Functions: 0.0 % 23 0

            Line data    Source code
       1              : use std::collections::VecDeque;
       2              : use std::sync::atomic::{self, AtomicUsize};
       3              : use std::sync::{Arc, Weak};
       4              : 
       5              : use hyper::client::conn::http2;
       6              : use hyper_util::rt::{TokioExecutor, TokioIo};
       7              : use parking_lot::RwLock;
       8              : use smol_str::ToSmolStr;
       9              : use tracing::{Instrument, debug, error, info, info_span};
      10              : 
      11              : use super::AsyncRW;
      12              : use super::backend::HttpConnError;
      13              : use super::conn_pool_lib::{
      14              :     ClientDataEnum, ClientInnerCommon, ClientInnerExt, ConnInfo, ConnPoolEntry,
      15              :     EndpointConnPoolExt, GlobalConnPool,
      16              : };
      17              : use crate::context::RequestContext;
      18              : use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
      19              : use crate::metrics::{HttpEndpointPoolsGuard, Metrics};
      20              : use crate::protocol2::ConnectionInfoExtra;
      21              : use crate::types::EndpointCacheKey;
      22              : use crate::usage_metrics::{Ids, MetricCounter, TrafficDirection, USAGE_METRICS};
      23              : 
      24              : pub(crate) type Send = http2::SendRequest<hyper::body::Incoming>;
      25              : pub(crate) type Connect = http2::Connection<TokioIo<AsyncRW>, hyper::body::Incoming, TokioExecutor>;
      26              : 
      27              : #[derive(Clone)]
      28              : pub(crate) struct ClientDataHttp();
      29              : 
      30              : // Per-endpoint connection pool
      31              : // Number of open connections is limited by the `max_conns_per_endpoint`.
      32              : pub(crate) struct HttpConnPool<C: ClientInnerExt + Clone> {
      33              :     // TODO(conrad):
      34              :     // either we should open more connections depending on stream count
      35              :     // (not exposed by hyper, need our own counter)
      36              :     // or we can change this to an Option rather than a VecDeque.
      37              :     //
      38              :     // Opening more connections to the same db because we run out of streams
      39              :     // seems somewhat redundant though.
      40              :     //
      41              :     // Probably we should run a semaphore and just the single conn. TBD.
      42              :     conns: VecDeque<ConnPoolEntry<C>>,
      43              :     _guard: HttpEndpointPoolsGuard<'static>,
      44              :     global_connections_count: Arc<AtomicUsize>,
      45              : }
      46              : 
      47              : impl<C: ClientInnerExt + Clone> HttpConnPool<C> {
      48            0 :     fn get_conn_entry(&mut self) -> Option<ConnPoolEntry<C>> {
      49            0 :         let Self { conns, .. } = self;
      50              : 
      51              :         loop {
      52            0 :             let conn = conns.pop_front()?;
      53            0 :             if !conn.conn.inner.is_closed() {
      54            0 :                 let new_conn = ConnPoolEntry {
      55            0 :                     conn: conn.conn.clone(),
      56            0 :                     _last_access: std::time::Instant::now(),
      57            0 :                 };
      58            0 : 
      59            0 :                 conns.push_back(new_conn);
      60            0 :                 return Some(conn);
      61            0 :             }
      62              :         }
      63            0 :     }
      64              : 
      65            0 :     fn remove_conn(&mut self, conn_id: uuid::Uuid) -> bool {
      66            0 :         let Self {
      67            0 :             conns,
      68            0 :             global_connections_count,
      69            0 :             ..
      70            0 :         } = self;
      71            0 : 
      72            0 :         let old_len = conns.len();
      73            0 :         conns.retain(|entry| entry.conn.conn_id != conn_id);
      74            0 :         let new_len = conns.len();
      75            0 :         let removed = old_len - new_len;
      76            0 :         if removed > 0 {
      77            0 :             global_connections_count.fetch_sub(removed, atomic::Ordering::Relaxed);
      78            0 :             Metrics::get()
      79            0 :                 .proxy
      80            0 :                 .http_pool_opened_connections
      81            0 :                 .get_metric()
      82            0 :                 .dec_by(removed as i64);
      83            0 :         }
      84            0 :         removed > 0
      85            0 :     }
      86              : }
      87              : 
      88              : impl<C: ClientInnerExt + Clone> EndpointConnPoolExt<C> for HttpConnPool<C> {
      89            0 :     fn clear_closed(&mut self) -> usize {
      90            0 :         let Self { conns, .. } = self;
      91            0 :         let old_len = conns.len();
      92            0 :         conns.retain(|entry| !entry.conn.inner.is_closed());
      93            0 : 
      94            0 :         let new_len = conns.len();
      95            0 :         old_len - new_len
      96            0 :     }
      97              : 
      98            0 :     fn total_conns(&self) -> usize {
      99            0 :         self.conns.len()
     100            0 :     }
     101              : }
     102              : 
     103              : impl<C: ClientInnerExt + Clone> Drop for HttpConnPool<C> {
     104            0 :     fn drop(&mut self) {
     105            0 :         if !self.conns.is_empty() {
     106            0 :             self.global_connections_count
     107            0 :                 .fetch_sub(self.conns.len(), atomic::Ordering::Relaxed);
     108            0 :             Metrics::get()
     109            0 :                 .proxy
     110            0 :                 .http_pool_opened_connections
     111            0 :                 .get_metric()
     112            0 :                 .dec_by(self.conns.len() as i64);
     113            0 :         }
     114            0 :     }
     115              : }
     116              : 
     117              : impl<C: ClientInnerExt + Clone> GlobalConnPool<C, HttpConnPool<C>> {
     118              :     #[expect(unused_results)]
     119            0 :     pub(crate) fn get(
     120            0 :         self: &Arc<Self>,
     121            0 :         ctx: &RequestContext,
     122            0 :         conn_info: &ConnInfo,
     123            0 :     ) -> Result<Option<Client<C>>, HttpConnError> {
     124              :         let result: Result<Option<Client<C>>, HttpConnError>;
     125            0 :         let Some(endpoint) = conn_info.endpoint_cache_key() else {
     126            0 :             result = Ok(None);
     127            0 :             return result;
     128              :         };
     129            0 :         let endpoint_pool = self.get_or_create_endpoint_pool(&endpoint);
     130            0 :         let Some(client) = endpoint_pool.write().get_conn_entry() else {
     131            0 :             result = Ok(None);
     132            0 :             return result;
     133              :         };
     134              : 
     135            0 :         tracing::Span::current().record("conn_id", tracing::field::display(client.conn.conn_id));
     136            0 :         debug!(
     137            0 :             cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
     138            0 :             "pool: reusing connection '{conn_info}'"
     139              :         );
     140            0 :         ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit);
     141            0 :         ctx.success();
     142            0 : 
     143            0 :         Ok(Some(Client::new(client.conn.clone())))
     144            0 :     }
     145              : 
     146            0 :     fn get_or_create_endpoint_pool(
     147            0 :         self: &Arc<Self>,
     148            0 :         endpoint: &EndpointCacheKey,
     149            0 :     ) -> Arc<RwLock<HttpConnPool<C>>> {
     150              :         // fast path
     151            0 :         if let Some(pool) = self.global_pool.get(endpoint) {
     152            0 :             return pool.clone();
     153            0 :         }
     154            0 : 
     155            0 :         // slow path
     156            0 :         let new_pool = Arc::new(RwLock::new(HttpConnPool {
     157            0 :             conns: VecDeque::new(),
     158            0 :             _guard: Metrics::get().proxy.http_endpoint_pools.guard(),
     159            0 :             global_connections_count: self.global_connections_count.clone(),
     160            0 :         }));
     161            0 : 
     162            0 :         // find or create a pool for this endpoint
     163            0 :         let mut created = false;
     164            0 :         let pool = self
     165            0 :             .global_pool
     166            0 :             .entry(endpoint.clone())
     167            0 :             .or_insert_with(|| {
     168            0 :                 created = true;
     169            0 :                 new_pool
     170            0 :             })
     171            0 :             .clone();
     172            0 : 
     173            0 :         // log new global pool size
     174            0 :         if created {
     175            0 :             let global_pool_size = self
     176            0 :                 .global_pool_size
     177            0 :                 .fetch_add(1, atomic::Ordering::Relaxed)
     178            0 :                 + 1;
     179            0 :             info!(
     180            0 :                 "pool: created new pool for '{endpoint}', global pool size now {global_pool_size}"
     181              :             );
     182            0 :         }
     183              : 
     184            0 :         pool
     185            0 :     }
     186              : }
     187              : 
     188            0 : pub(crate) fn poll_http2_client(
     189            0 :     global_pool: Arc<GlobalConnPool<Send, HttpConnPool<Send>>>,
     190            0 :     ctx: &RequestContext,
     191            0 :     conn_info: &ConnInfo,
     192            0 :     client: Send,
     193            0 :     connection: Connect,
     194            0 :     conn_id: uuid::Uuid,
     195            0 :     aux: MetricsAuxInfo,
     196            0 : ) -> Client<Send> {
     197            0 :     let conn_gauge = Metrics::get().proxy.db_connections.guard(ctx.protocol());
     198            0 :     let session_id = ctx.session_id();
     199              : 
     200            0 :     let span = info_span!(parent: None, "connection", %conn_id);
     201            0 :     let cold_start_info = ctx.cold_start_info();
     202            0 :     span.in_scope(|| {
     203            0 :         info!(cold_start_info = cold_start_info.as_str(), %conn_info, %session_id, "new connection");
     204            0 :     });
     205              : 
     206            0 :     let pool = match conn_info.endpoint_cache_key() {
     207            0 :         Some(endpoint) => {
     208            0 :             let pool = global_pool.get_or_create_endpoint_pool(&endpoint);
     209            0 :             let client = ClientInnerCommon {
     210            0 :                 inner: client.clone(),
     211            0 :                 aux: aux.clone(),
     212            0 :                 conn_id,
     213            0 :                 data: ClientDataEnum::Http(ClientDataHttp()),
     214            0 :             };
     215            0 :             pool.write().conns.push_back(ConnPoolEntry {
     216            0 :                 conn: client,
     217            0 :                 _last_access: std::time::Instant::now(),
     218            0 :             });
     219            0 :             Metrics::get()
     220            0 :                 .proxy
     221            0 :                 .http_pool_opened_connections
     222            0 :                 .get_metric()
     223            0 :                 .inc();
     224            0 : 
     225            0 :             Arc::downgrade(&pool)
     226              :         }
     227            0 :         None => Weak::new(),
     228              :     };
     229              : 
     230            0 :     tokio::spawn(
     231            0 :         async move {
     232            0 :             let _conn_gauge = conn_gauge;
     233            0 :             let res = connection.await;
     234            0 :             match res {
     235            0 :                 Ok(()) => info!("connection closed"),
     236            0 :                 Err(e) => error!(%session_id, "connection error: {e:?}"),
     237              :             }
     238              : 
     239              :             // remove from connection pool
     240            0 :             if let Some(pool) = pool.clone().upgrade() {
     241            0 :                 if pool.write().remove_conn(conn_id) {
     242            0 :                     info!("closed connection removed");
     243            0 :                 }
     244            0 :             }
     245            0 :         }
     246            0 :         .instrument(span),
     247            0 :     );
     248            0 : 
     249            0 :     let client = ClientInnerCommon {
     250            0 :         inner: client,
     251            0 :         aux,
     252            0 :         conn_id,
     253            0 :         data: ClientDataEnum::Http(ClientDataHttp()),
     254            0 :     };
     255            0 : 
     256            0 :     Client::new(client)
     257            0 : }
     258              : 
     259              : pub(crate) struct Client<C: ClientInnerExt + Clone> {
     260              :     pub(crate) inner: ClientInnerCommon<C>,
     261              : }
     262              : 
     263              : impl<C: ClientInnerExt + Clone> Client<C> {
     264            0 :     pub(self) fn new(inner: ClientInnerCommon<C>) -> Self {
     265            0 :         Self { inner }
     266            0 :     }
     267              : 
     268            0 :     pub(crate) fn metrics(
     269            0 :         &self,
     270            0 :         direction: TrafficDirection,
     271            0 :         ctx: &RequestContext,
     272            0 :     ) -> Arc<MetricCounter> {
     273            0 :         let aux = &self.inner.aux;
     274              : 
     275            0 :         let private_link_id = match ctx.extra() {
     276            0 :             None => None,
     277            0 :             Some(ConnectionInfoExtra::Aws { vpce_id }) => Some(vpce_id.clone()),
     278            0 :             Some(ConnectionInfoExtra::Azure { link_id }) => Some(link_id.to_smolstr()),
     279              :         };
     280              : 
     281            0 :         USAGE_METRICS.register(Ids {
     282            0 :             endpoint_id: aux.endpoint_id,
     283            0 :             branch_id: aux.branch_id,
     284            0 :             direction,
     285            0 :             private_link_id,
     286            0 :         })
     287            0 :     }
     288              : }
     289              : 
     290              : impl ClientInnerExt for Send {
     291            0 :     fn is_closed(&self) -> bool {
     292            0 :         self.is_closed()
     293            0 :     }
     294              : 
     295            0 :     fn get_process_id(&self) -> i32 {
     296            0 :         // ideally throw something meaningful
     297            0 :         -1
     298            0 :     }
     299              : }
        

Generated by: LCOV version 2.1-beta