LCOV - code coverage report
Current view: top level - proxy/src/serverless - http_conn_pool.rs (source / functions)
Test:      b4ae4c4857f9ef3e144e982a35ee23bc84c71983.info
Test Date: 2024-10-22 22:13:45

            Coverage    Total    Hit
Lines:      0.0 %       232      0
Functions:  0.0 %       31       0

            Line data    Source code
       1              : use std::collections::VecDeque;
       2              : use std::sync::atomic::{self, AtomicUsize};
       3              : use std::sync::{Arc, Weak};
       4              : 
       5              : use dashmap::DashMap;
       6              : use hyper::client::conn::http2;
       7              : use hyper_util::rt::{TokioExecutor, TokioIo};
       8              : use parking_lot::RwLock;
       9              : use rand::Rng;
      10              : use tokio::net::TcpStream;
      11              : use tracing::{debug, error, info, info_span, Instrument};
      12              : 
      13              : use super::conn_pool_lib::{ClientInnerExt, ConnInfo};
      14              : use crate::context::RequestMonitoring;
      15              : use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
      16              : use crate::metrics::{HttpEndpointPoolsGuard, Metrics};
      17              : use crate::usage_metrics::{Ids, MetricCounter, USAGE_METRICS};
      18              : use crate::EndpointCacheKey;
      19              : 
      20              : pub(crate) type Send = http2::SendRequest<hyper::body::Incoming>;
      21              : pub(crate) type Connect =
      22              :     http2::Connection<TokioIo<TcpStream>, hyper::body::Incoming, TokioExecutor>;
      23              : 
      24              : #[derive(Clone)]
      25              : pub(crate) struct ConnPoolEntry<C: ClientInnerExt + Clone> {
      26              :     conn: C,
      27              :     conn_id: uuid::Uuid,
      28              :     aux: MetricsAuxInfo,
      29              : }
      30              : 
       31              : // Per-endpoint connection pool.
       32              : // The number of open connections is limited by `max_conns_per_endpoint`.
      33              : pub(crate) struct EndpointConnPool<C: ClientInnerExt + Clone> {
      34              :     // TODO(conrad):
      35              :     // either we should open more connections depending on stream count
      36              :     // (not exposed by hyper, need our own counter)
      37              :     // or we can change this to an Option rather than a VecDeque.
      38              :     //
      39              :     // Opening more connections to the same db because we run out of streams
      40              :     // seems somewhat redundant though.
      41              :     //
       42              :     // Probably we should use a semaphore and keep just a single conn. TBD.
      43              :     conns: VecDeque<ConnPoolEntry<C>>,
      44              :     _guard: HttpEndpointPoolsGuard<'static>,
      45              :     global_connections_count: Arc<AtomicUsize>,
      46              : }
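
The TODO above floats an alternative design: a single connection guarded by a
semaphore that bounds in-flight streams. A minimal sketch of that idea, with
the caveat that `SingleConnPool`, `get_stream`, and the permit count are
hypothetical names and values, not items that exist in this crate:

    use tokio::sync::{OwnedSemaphorePermit, Semaphore};

    // Hypothetical single-connection pool: one HTTP/2 connection shared by
    // a bounded number of concurrent streams.
    struct SingleConnPool<C: ClientInnerExt + Clone> {
        conn: Option<ConnPoolEntry<C>>,
        stream_permits: Arc<Semaphore>, // e.g. Semaphore::new(100)
    }

    impl<C: ClientInnerExt + Clone> SingleConnPool<C> {
        async fn get_stream(&self) -> Option<(ConnPoolEntry<C>, OwnedSemaphorePermit)> {
            // Wait for a free stream slot before handing out the connection.
            let permit = self.stream_permits.clone().acquire_owned().await.ok()?;
            // Refuse to hand out a connection the peer has already closed.
            let entry = self.conn.clone().filter(|e| !e.conn.is_closed())?;
            Some((entry, permit))
        }
    }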
      47              : 
      48              : impl<C: ClientInnerExt + Clone> EndpointConnPool<C> {
      49            0 :     fn get_conn_entry(&mut self) -> Option<ConnPoolEntry<C>> {
      50            0 :         let Self { conns, .. } = self;
      51              : 
      52              :         loop {
      53            0 :             let conn = conns.pop_front()?;
      54            0 :             if !conn.conn.is_closed() {
      55            0 :                 conns.push_back(conn.clone());
      56            0 :                 return Some(conn);
      57            0 :             }
      58              :         }
      59            0 :     }
      60              : 
      61            0 :     fn remove_conn(&mut self, conn_id: uuid::Uuid) -> bool {
      62            0 :         let Self {
      63            0 :             conns,
      64            0 :             global_connections_count,
      65            0 :             ..
      66            0 :         } = self;
      67            0 : 
      68            0 :         let old_len = conns.len();
      69            0 :         conns.retain(|conn| conn.conn_id != conn_id);
      70            0 :         let new_len = conns.len();
      71            0 :         let removed = old_len - new_len;
      72            0 :         if removed > 0 {
      73            0 :             global_connections_count.fetch_sub(removed, atomic::Ordering::Relaxed);
      74            0 :             Metrics::get()
      75            0 :                 .proxy
      76            0 :                 .http_pool_opened_connections
      77            0 :                 .get_metric()
      78            0 :                 .dec_by(removed as i64);
      79            0 :         }
      80            0 :         removed > 0
      81            0 :     }
      82              : }
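
`get_conn_entry` pops entries from the front and requeues live ones at the
back, so repeated calls round-robin across the pooled connections while
closed ones simply fall out of the deque. A self-contained sketch of that
rotation (the integer entries stand in for connections):

    use std::collections::VecDeque;

    fn rotation_demo() {
        let mut conns: VecDeque<u32> = VecDeque::from([1, 2, 3]);
        // Take the front entry and requeue it, as get_conn_entry does
        // with a connection that is still open.
        let first = conns.pop_front().unwrap();
        conns.push_back(first);
        assert_eq!(conns, VecDeque::from([2, 3, 1]));
    }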
      83              : 
      84              : impl<C: ClientInnerExt + Clone> Drop for EndpointConnPool<C> {
      85            0 :     fn drop(&mut self) {
      86            0 :         if !self.conns.is_empty() {
      87            0 :             self.global_connections_count
      88            0 :                 .fetch_sub(self.conns.len(), atomic::Ordering::Relaxed);
      89            0 :             Metrics::get()
      90            0 :                 .proxy
      91            0 :                 .http_pool_opened_connections
      92            0 :                 .get_metric()
      93            0 :                 .dec_by(self.conns.len() as i64);
      94            0 :         }
      95            0 :     }
      96              : }
      97              : 
      98              : pub(crate) struct GlobalConnPool<C: ClientInnerExt + Clone> {
      99              :     // endpoint -> per-endpoint connection pool
     100              :     //
      101              :     // This is likely to be a fairly contended map, so return a reference to the per-endpoint
     102              :     // pool as early as possible and release the lock.
     103              :     global_pool: DashMap<EndpointCacheKey, Arc<RwLock<EndpointConnPool<C>>>>,
     104              : 
     105              :     /// Number of endpoint-connection pools
     106              :     ///
      107              :     /// [`DashMap::len`] iterates over all inner shards and acquires a read lock on each.
     108              :     /// That seems like far too much effort, so we're using a relaxed increment counter instead.
     109              :     /// It's only used for diagnostics.
     110              :     global_pool_size: AtomicUsize,
     111              : 
     112              :     /// Total number of connections in the pool
     113              :     global_connections_count: Arc<AtomicUsize>,
     114              : 
     115              :     config: &'static crate::config::HttpConfig,
     116              : }
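
The counters above trade exactness for cheap, lock-free bookkeeping: sizes
are adjusted with `fetch_add`/`fetch_sub` and read with a plain `load`, all
under `Ordering::Relaxed`, which suffices because the values only feed
diagnostics. A minimal sketch of the pattern (function names are
illustrative):

    use std::sync::atomic::{AtomicUsize, Ordering};

    fn track_insert(counter: &AtomicUsize) -> usize {
        // fetch_add returns the previous value, so add one for the new size.
        counter.fetch_add(1, Ordering::Relaxed) + 1
    }

    fn approx_size(counter: &AtomicUsize) -> usize {
        // May briefly lag the true size; acceptable for diagnostics.
        counter.load(Ordering::Relaxed)
    }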
     117              : 
     118              : impl<C: ClientInnerExt + Clone> GlobalConnPool<C> {
     119            0 :     pub(crate) fn new(config: &'static crate::config::HttpConfig) -> Arc<Self> {
     120            0 :         let shards = config.pool_options.pool_shards;
     121            0 :         Arc::new(Self {
     122            0 :             global_pool: DashMap::with_shard_amount(shards),
     123            0 :             global_pool_size: AtomicUsize::new(0),
     124            0 :             config,
     125            0 :             global_connections_count: Arc::new(AtomicUsize::new(0)),
     126            0 :         })
     127            0 :     }
     128              : 
     129            0 :     pub(crate) fn shutdown(&self) {
     130            0 :         // drops all strong references to endpoint-pools
     131            0 :         self.global_pool.clear();
     132            0 :     }
     133              : 
     134            0 :     pub(crate) async fn gc_worker(&self, mut rng: impl Rng) {
     135            0 :         let epoch = self.config.pool_options.gc_epoch;
     136            0 :         let mut interval = tokio::time::interval(epoch / (self.global_pool.shards().len()) as u32);
     137              :         loop {
     138            0 :             interval.tick().await;
     139              : 
     140            0 :             let shard = rng.gen_range(0..self.global_pool.shards().len());
     141            0 :             self.gc(shard);
     142              :         }
     143              :     }
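
`gc_worker` ticks every `gc_epoch / shard_count` and reclaims one randomly
chosen shard per tick, so each shard is visited roughly once per epoch in
expectation. A sketch of the arithmetic with assumed values (a 60s epoch and
16 shards are hypothetical, not this proxy's defaults):

    use std::time::Duration;

    fn cadence_demo() {
        let epoch = Duration::from_secs(60); // hypothetical gc_epoch
        let shards: u32 = 16;                // hypothetical shard count
        // One shard is reclaimed every 3.75s on average.
        assert_eq!(epoch / shards, Duration::from_millis(3750));
    }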
     144              : 
     145            0 :     fn gc(&self, shard: usize) {
     146            0 :         debug!(shard, "pool: performing epoch reclamation");
     147              : 
     148              :         // acquire a random shard lock
     149            0 :         let mut shard = self.global_pool.shards()[shard].write();
     150            0 : 
     151            0 :         let timer = Metrics::get()
     152            0 :             .proxy
     153            0 :             .http_pool_reclaimation_lag_seconds
     154            0 :             .start_timer();
     155            0 :         let current_len = shard.len();
     156            0 :         let mut clients_removed = 0;
     157            0 :         shard.retain(|endpoint, x| {
     158              :             // if the current endpoint pool is unique (no other strong or weak references)
     159              :             // then it is currently not in use by any connections.
     160            0 :             if let Some(pool) = Arc::get_mut(x.get_mut()) {
     161            0 :                 let EndpointConnPool { conns, .. } = pool.get_mut();
     162            0 : 
     163            0 :                 let old_len = conns.len();
     164            0 : 
     165            0 :                 conns.retain(|conn| !conn.conn.is_closed());
     166            0 : 
     167            0 :                 let new_len = conns.len();
     168            0 :                 let removed = old_len - new_len;
     169            0 :                 clients_removed += removed;
     170            0 : 
     171            0 :                 // we only remove this pool if it has no active connections
     172            0 :                 if conns.is_empty() {
     173            0 :                     info!("pool: discarding pool for endpoint {endpoint}");
     174            0 :                     return false;
     175            0 :                 }
     176            0 :             }
     177              : 
     178            0 :             true
     179            0 :         });
     180            0 : 
     181            0 :         let new_len = shard.len();
     182            0 :         drop(shard);
     183            0 :         timer.observe();
     184            0 : 
     185            0 :         // Do logging outside of the lock.
     186            0 :         if clients_removed > 0 {
     187            0 :             let size = self
     188            0 :                 .global_connections_count
     189            0 :                 .fetch_sub(clients_removed, atomic::Ordering::Relaxed)
     190            0 :                 - clients_removed;
     191            0 :             Metrics::get()
     192            0 :                 .proxy
     193            0 :                 .http_pool_opened_connections
     194            0 :                 .get_metric()
     195            0 :                 .dec_by(clients_removed as i64);
     196            0 :             info!("pool: performed global pool gc. removed {clients_removed} clients, total number of clients in pool is {size}");
     197            0 :         }
     198            0 :         let removed = current_len - new_len;
     199            0 : 
     200            0 :         if removed > 0 {
     201            0 :             let global_pool_size = self
     202            0 :                 .global_pool_size
     203            0 :                 .fetch_sub(removed, atomic::Ordering::Relaxed)
     204            0 :                 - removed;
     205            0 :             info!("pool: performed global pool gc. size now {global_pool_size}");
     206            0 :         }
     207            0 :     }
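
The retain closure in `gc` leans on `Arc::get_mut`, which returns `Some`
only when no other strong or weak references exist; that is how an endpoint
pool with no remaining users is detected. A self-contained sketch of that
uniqueness test:

    use std::sync::Arc;

    fn uniqueness_demo() {
        let mut pool = Arc::new(0u32);
        assert!(Arc::get_mut(&mut pool).is_some()); // unique: reclaimable

        let in_use = Arc::clone(&pool);
        assert!(Arc::get_mut(&mut pool).is_none()); // shared: left alone
        drop(in_use);
        assert!(Arc::get_mut(&mut pool).is_some()); // unique again
    }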
     208              : 
     209            0 :     pub(crate) fn get(
     210            0 :         self: &Arc<Self>,
     211            0 :         ctx: &RequestMonitoring,
     212            0 :         conn_info: &ConnInfo,
     213            0 :     ) -> Option<Client<C>> {
     214            0 :         let endpoint = conn_info.endpoint_cache_key()?;
     215            0 :         let endpoint_pool = self.get_or_create_endpoint_pool(&endpoint);
     216            0 :         let client = endpoint_pool.write().get_conn_entry()?;
     217              : 
     218            0 :         tracing::Span::current().record("conn_id", tracing::field::display(client.conn_id));
     219            0 :         info!(
     220            0 :             cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
     221            0 :             "pool: reusing connection '{conn_info}'"
     222              :         );
     223            0 :         ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit);
     224            0 :         ctx.success();
     225            0 :         Some(Client::new(client.conn, client.aux))
     226            0 :     }
     227              : 
     228            0 :     fn get_or_create_endpoint_pool(
     229            0 :         self: &Arc<Self>,
     230            0 :         endpoint: &EndpointCacheKey,
     231            0 :     ) -> Arc<RwLock<EndpointConnPool<C>>> {
     232              :         // fast path
     233            0 :         if let Some(pool) = self.global_pool.get(endpoint) {
     234            0 :             return pool.clone();
     235            0 :         }
     236            0 : 
     237            0 :         // slow path
     238            0 :         let new_pool = Arc::new(RwLock::new(EndpointConnPool {
     239            0 :             conns: VecDeque::new(),
     240            0 :             _guard: Metrics::get().proxy.http_endpoint_pools.guard(),
     241            0 :             global_connections_count: self.global_connections_count.clone(),
     242            0 :         }));
     243            0 : 
     244            0 :         // find or create a pool for this endpoint
     245            0 :         let mut created = false;
     246            0 :         let pool = self
     247            0 :             .global_pool
     248            0 :             .entry(endpoint.clone())
     249            0 :             .or_insert_with(|| {
     250            0 :                 created = true;
     251            0 :                 new_pool
     252            0 :             })
     253            0 :             .clone();
     254            0 : 
     255            0 :         // log new global pool size
     256            0 :         if created {
     257            0 :             let global_pool_size = self
     258            0 :                 .global_pool_size
     259            0 :                 .fetch_add(1, atomic::Ordering::Relaxed)
     260            0 :                 + 1;
     261            0 :             info!(
     262            0 :                 "pool: created new pool for '{endpoint}', global pool size now {global_pool_size}"
     263              :             );
     264            0 :         }
     265              : 
     266            0 :         pool
     267            0 :     }
     268              : }
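
`get_or_create_endpoint_pool` uses a two-phase lookup: a read-only `get` on
the fast path, then `entry().or_insert_with(...)` on the slow path, so the
speculatively built pool is simply dropped if another thread wins the race.
A generic sketch of the same pattern (the `String` payload is illustrative):

    use dashmap::DashMap;
    use std::sync::Arc;

    fn get_or_create(map: &DashMap<String, Arc<String>>, key: &str) -> Arc<String> {
        // Fast path: shared (read) access only.
        if let Some(v) = map.get(key) {
            return v.clone();
        }
        // Slow path: the closure runs only if the key is still vacant, so
        // racing callers converge on a single inserted value.
        map.entry(key.to_string())
            .or_insert_with(|| Arc::new(String::from("fresh")))
            .clone()
    }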
     269              : 
     270            0 : pub(crate) fn poll_http2_client(
     271            0 :     global_pool: Arc<GlobalConnPool<Send>>,
     272            0 :     ctx: &RequestMonitoring,
     273            0 :     conn_info: &ConnInfo,
     274            0 :     client: Send,
     275            0 :     connection: Connect,
     276            0 :     conn_id: uuid::Uuid,
     277            0 :     aux: MetricsAuxInfo,
     278            0 : ) -> Client<Send> {
     279            0 :     let conn_gauge = Metrics::get().proxy.db_connections.guard(ctx.protocol());
     280            0 :     let session_id = ctx.session_id();
     281              : 
     282            0 :     let span = info_span!(parent: None, "connection", %conn_id);
     283            0 :     let cold_start_info = ctx.cold_start_info();
     284            0 :     span.in_scope(|| {
     285            0 :         info!(cold_start_info = cold_start_info.as_str(), %conn_info, %session_id, "new connection");
     286            0 :     });
     287              : 
     288            0 :     let pool = match conn_info.endpoint_cache_key() {
     289            0 :         Some(endpoint) => {
     290            0 :             let pool = global_pool.get_or_create_endpoint_pool(&endpoint);
     291            0 : 
     292            0 :             pool.write().conns.push_back(ConnPoolEntry {
     293            0 :                 conn: client.clone(),
     294            0 :                 conn_id,
     295            0 :                 aux: aux.clone(),
      296            0 :             }); // NB: no corresponding `global_connections_count` increment appears in this file
     297            0 : 
     298            0 :             Arc::downgrade(&pool)
     299              :         }
     300            0 :         None => Weak::new(),
     301              :     };
     302              : 
     303            0 :     tokio::spawn(
     304            0 :         async move {
     305            0 :             let _conn_gauge = conn_gauge;
     306            0 :             let res = connection.await;
     307            0 :             match res {
     308            0 :                 Ok(()) => info!("connection closed"),
     309            0 :                 Err(e) => error!(%session_id, "connection error: {}", e),
     310              :             }
     311              : 
     312              :             // remove from connection pool
     313            0 :             if let Some(pool) = pool.clone().upgrade() {
     314            0 :                 if pool.write().remove_conn(conn_id) {
     315            0 :                     info!("closed connection removed");
     316            0 :                 }
     317            0 :             }
     318            0 :         }
     319            0 :         .instrument(span),
     320            0 :     );
     321            0 : 
     322            0 :     Client::new(client, aux)
     323            0 : }
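
A (`Send`, `Connect`) pair for `poll_http2_client` comes out of hyper's
HTTP/2 client handshake. A minimal sketch, assuming a plain TCP target
address (the real proxy's connection establishment involves more setup than
shown here):

    async fn connect_sketch(addr: &str) -> Result<(Send, Connect), Box<dyn std::error::Error>> {
        let stream = TcpStream::connect(addr).await?;
        // The handshake yields the request handle and the connection
        // driver; the driver is the future poll_http2_client spawns.
        let (send_request, connection) =
            http2::handshake(TokioExecutor::new(), TokioIo::new(stream)).await?;
        Ok((send_request, connection))
    }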
     324              : 
     325              : pub(crate) struct Client<C: ClientInnerExt + Clone> {
     326              :     pub(crate) inner: C,
     327              :     aux: MetricsAuxInfo,
     328              : }
     329              : 
     330              : impl<C: ClientInnerExt + Clone> Client<C> {
     331            0 :     pub(self) fn new(inner: C, aux: MetricsAuxInfo) -> Self {
     332            0 :         Self { inner, aux }
     333            0 :     }
     334              : 
     335            0 :     pub(crate) fn metrics(&self) -> Arc<MetricCounter> {
     336            0 :         USAGE_METRICS.register(Ids {
     337            0 :             endpoint_id: self.aux.endpoint_id,
     338            0 :             branch_id: self.aux.branch_id,
     339            0 :         })
     340            0 :     }
     341              : }
     342              : 
     343              : impl ClientInnerExt for Send {
     344            0 :     fn is_closed(&self) -> bool {
      345            0 :         self.is_closed() // hyper's inherent `SendRequest::is_closed`, not recursion
     346            0 :     }
     347              : 
     348            0 :     fn get_process_id(&self) -> i32 {
      349            0 :         // ideally we would return something meaningful here
     350            0 :         -1
     351            0 :     }
     352              : }
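
Note that `self.is_closed()` in the impl above is not infinite recursion:
Rust prefers inherent methods over trait methods of the same name, so the
call dispatches to hyper's inherent `SendRequest::is_closed`. A
self-contained demonstration of that resolution order:

    struct S;
    impl S {
        fn probe(&self) -> i32 { 1 } // inherent method
    }

    trait Ext {
        fn probe(&self) -> i32;
    }

    impl Ext for S {
        fn probe(&self) -> i32 {
            // Resolves to the inherent S::probe, not back to Ext::probe.
            self.probe() + 10
        }
    }

    fn resolution_demo() {
        assert_eq!(Ext::probe(&S), 11); // delegation, not recursion
    }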
        

Generated by: LCOV version 2.1-beta