use std::collections::VecDeque;
use std::sync::atomic::{self, AtomicUsize};
use std::sync::{Arc, Weak};

use dashmap::DashMap;
use hyper::client::conn::http2;
use hyper_util::rt::{TokioExecutor, TokioIo};
use parking_lot::RwLock;
use rand::Rng;
use tokio::net::TcpStream;
use tracing::{debug, error, info, info_span, Instrument};

use super::conn_pool_lib::{ClientInnerExt, ConnInfo};
use crate::context::RequestMonitoring;
use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
use crate::metrics::{HttpEndpointPoolsGuard, Metrics};
use crate::usage_metrics::{Ids, MetricCounter, USAGE_METRICS};
use crate::EndpointCacheKey;

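// `Send` is hyper's HTTP/2 request sender: a cheap, cloneable handle that multiplexes
// requests over a single TCP connection. `Connect` is the companion future that drives
// the connection itself and must be polled to completion (see `poll_http2_client`).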
pub(crate) type Send = http2::SendRequest<hyper::body::Incoming>;
pub(crate) type Connect =
    http2::Connection<TokioIo<TcpStream>, hyper::body::Incoming, TokioExecutor>;

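// A pooled connection: the cloneable send handle plus the identifiers needed to find it
// again (`conn_id`) and to attribute usage metrics (`aux`).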
#[derive(Clone)]
pub(crate) struct ConnPoolEntry<C: ClientInnerExt + Clone> {
    conn: C,
    conn_id: uuid::Uuid,
    aux: MetricsAuxInfo,
}

// Per-endpoint connection pool.
// The number of open connections is limited by `max_conns_per_endpoint`.
pub(crate) struct EndpointConnPool<C: ClientInnerExt + Clone> {
    // TODO(conrad):
    // either we should open more connections depending on stream count
    // (not exposed by hyper, we'd need our own counter)
    // or we can change this to an Option rather than a VecDeque.
    //
    // Opening more connections to the same db because we run out of streams
    // seems somewhat redundant though.
    //
    // Probably we should use a semaphore and keep just the single connection. TBD.
    conns: VecDeque<ConnPoolEntry<C>>,
    _guard: HttpEndpointPoolsGuard<'static>,
    global_connections_count: Arc<AtomicUsize>,
}
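// If the TODO above lands, the pool could shrink to a single connection plus a stream
// budget. A hedged sketch of that shape (hypothetical, not what the code does today):
//
//     struct EndpointConnPool<C: ClientInnerExt + Clone> {
//         conn: Option<ConnPoolEntry<C>>,
//         stream_permits: Arc<tokio::sync::Semaphore>,
//         global_connections_count: Arc<AtomicUsize>,
//     }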

impl<C: ClientInnerExt + Clone> EndpointConnPool<C> {
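    /// Take a live connection from the pool. Closed entries are discarded as they are
    /// encountered; for a live entry, a clone is pushed back to the end of the queue and
    /// the entry is returned, so successive calls rotate through the pooled connections
    /// (and multiple callers can share one multiplexed connection).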
    fn get_conn_entry(&mut self) -> Option<ConnPoolEntry<C>> {
        let Self { conns, .. } = self;

        loop {
            let conn = conns.pop_front()?;
            if !conn.conn.is_closed() {
                conns.push_back(conn.clone());
                return Some(conn);
            }
        }
    }

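    /// Drop every pooled entry belonging to `conn_id` and adjust the global connection
    /// counter and metric accordingly. Returns `true` if anything was removed.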
    fn remove_conn(&mut self, conn_id: uuid::Uuid) -> bool {
        let Self {
            conns,
            global_connections_count,
            ..
        } = self;

        let old_len = conns.len();
        conns.retain(|conn| conn.conn_id != conn_id);
        let new_len = conns.len();
        let removed = old_len - new_len;
        if removed > 0 {
            global_connections_count.fetch_sub(removed, atomic::Ordering::Relaxed);
            Metrics::get()
                .proxy
                .http_pool_opened_connections
                .get_metric()
                .dec_by(removed as i64);
        }
        removed > 0
    }
}

impl<C: ClientInnerExt + Clone> Drop for EndpointConnPool<C> {
    fn drop(&mut self) {
        if !self.conns.is_empty() {
            self.global_connections_count
                .fetch_sub(self.conns.len(), atomic::Ordering::Relaxed);
            Metrics::get()
                .proxy
                .http_pool_opened_connections
                .get_metric()
                .dec_by(self.conns.len() as i64);
        }
    }
}

pub(crate) struct GlobalConnPool<C: ClientInnerExt + Clone> {
    // endpoint -> per-endpoint connection pool
    //
    // This is expected to be a fairly contended map, so return a reference to the
    // per-endpoint pool as early as possible and release the lock.
    global_pool: DashMap<EndpointCacheKey, Arc<RwLock<EndpointConnPool<C>>>>,

    /// Number of endpoint-connection pools.
    ///
    /// [`DashMap::len`] iterates over all inner pools and acquires a read lock on each.
    /// That seems like far too much effort, so we keep a counter updated with relaxed
    /// atomics instead. It's only used for diagnostics.
    global_pool_size: AtomicUsize,

    /// Total number of connections in the pool.
    global_connections_count: Arc<AtomicUsize>,

    config: &'static crate::config::HttpConfig,
}

impl<C: ClientInnerExt + Clone> GlobalConnPool<C> {
    pub(crate) fn new(config: &'static crate::config::HttpConfig) -> Arc<Self> {
        let shards = config.pool_options.pool_shards;
        Arc::new(Self {
            global_pool: DashMap::with_shard_amount(shards),
            global_pool_size: AtomicUsize::new(0),
            config,
            global_connections_count: Arc::new(AtomicUsize::new(0)),
        })
    }

    pub(crate) fn shutdown(&self) {
        // drops all strong references to endpoint-pools
        self.global_pool.clear();
    }

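    /// Run the garbage-collection loop: a randomly chosen shard of the global pool is
    /// swept every `gc_epoch / shard_count`, so on average each shard is visited once
    /// per epoch.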
    pub(crate) async fn gc_worker(&self, mut rng: impl Rng) {
        let epoch = self.config.pool_options.gc_epoch;
        let mut interval = tokio::time::interval(epoch / (self.global_pool.shards().len()) as u32);
        loop {
            interval.tick().await;

            let shard = rng.gen_range(0..self.global_pool.shards().len());
            self.gc(shard);
        }
    }

    fn gc(&self, shard: usize) {
        debug!(shard, "pool: performing epoch reclamation");

        // acquire a random shard lock
        let mut shard = self.global_pool.shards()[shard].write();

        let timer = Metrics::get()
            .proxy
            .http_pool_reclaimation_lag_seconds
            .start_timer();
        let current_len = shard.len();
        let mut clients_removed = 0;
        shard.retain(|endpoint, x| {
            // if the current endpoint pool is unique (no other strong or weak references)
            // then it is currently not in use by any connections.
            if let Some(pool) = Arc::get_mut(x.get_mut()) {
                let EndpointConnPool { conns, .. } = pool.get_mut();

                let old_len = conns.len();

                conns.retain(|conn| !conn.conn.is_closed());

                let new_len = conns.len();
                let removed = old_len - new_len;
                clients_removed += removed;

                // we only remove this pool if it has no active connections
                if conns.is_empty() {
                    info!("pool: discarding pool for endpoint {endpoint}");
                    return false;
                }
            }

            true
        });

        let new_len = shard.len();
        drop(shard);
        timer.observe();

        // Do logging outside of the lock.
        if clients_removed > 0 {
            let size = self
                .global_connections_count
                .fetch_sub(clients_removed, atomic::Ordering::Relaxed)
                - clients_removed;
            Metrics::get()
                .proxy
                .http_pool_opened_connections
                .get_metric()
                .dec_by(clients_removed as i64);
            info!("pool: performed global pool gc. removed {clients_removed} clients, total number of clients in pool is {size}");
        }
        let removed = current_len - new_len;

        if removed > 0 {
            let global_pool_size = self
                .global_pool_size
                .fetch_sub(removed, atomic::Ordering::Relaxed)
                - removed;
            info!("pool: performed global pool gc. size now {global_pool_size}");
        }
    }
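    // Note on the uniqueness check above: `Arc::get_mut` only returns `Some` while no
    // other strong *or* weak references exist, which is what lets `gc` treat a pool as
    // idle. A minimal, standalone illustration of that std behaviour (not part of the
    // pool API):
    //
    //     use std::sync::Arc;
    //     let mut pool = Arc::new(());
    //     assert!(Arc::get_mut(&mut pool).is_some()); // sole owner
    //     let strong = Arc::clone(&pool);
    //     assert!(Arc::get_mut(&mut pool).is_none()); // another strong ref exists
    //     drop(strong);
    //     let weak = Arc::downgrade(&pool);
    //     assert!(Arc::get_mut(&mut pool).is_none()); // even a weak ref blocks it
    //     drop(weak);
    //     assert!(Arc::get_mut(&mut pool).is_some()); // unique again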

    pub(crate) fn get(
        self: &Arc<Self>,
        ctx: &RequestMonitoring,
        conn_info: &ConnInfo,
    ) -> Option<Client<C>> {
        let endpoint = conn_info.endpoint_cache_key()?;
        let endpoint_pool = self.get_or_create_endpoint_pool(&endpoint);
        let client = endpoint_pool.write().get_conn_entry()?;

        tracing::Span::current().record("conn_id", tracing::field::display(client.conn_id));
        info!(
            cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
            "pool: reusing connection '{conn_info}'"
        );
        ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit);
        ctx.success();
        Some(Client::new(client.conn, client.aux))
    }

    fn get_or_create_endpoint_pool(
        self: &Arc<Self>,
        endpoint: &EndpointCacheKey,
    ) -> Arc<RwLock<EndpointConnPool<C>>> {
        // fast path
        if let Some(pool) = self.global_pool.get(endpoint) {
            return pool.clone();
        }

        // slow path
        let new_pool = Arc::new(RwLock::new(EndpointConnPool {
            conns: VecDeque::new(),
            _guard: Metrics::get().proxy.http_endpoint_pools.guard(),
            global_connections_count: self.global_connections_count.clone(),
        }));

        // find or create a pool for this endpoint
        let mut created = false;
        let pool = self
            .global_pool
            .entry(endpoint.clone())
            .or_insert_with(|| {
                created = true;
                new_pool
            })
            .clone();

        // log new global pool size
        if created {
            let global_pool_size = self
                .global_pool_size
                .fetch_add(1, atomic::Ordering::Relaxed)
                + 1;
            info!(
                "pool: created new pool for '{endpoint}', global pool size now {global_pool_size}"
            );
        }

        pool
    }
}

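/// Register a freshly established HTTP/2 connection with the per-endpoint pool (when the
/// connection info maps to one) and spawn a task that drives the connection to completion,
/// removing it from the pool once it closes or errors.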
pub(crate) fn poll_http2_client(
    global_pool: Arc<GlobalConnPool<Send>>,
    ctx: &RequestMonitoring,
    conn_info: &ConnInfo,
    client: Send,
    connection: Connect,
    conn_id: uuid::Uuid,
    aux: MetricsAuxInfo,
) -> Client<Send> {
    let conn_gauge = Metrics::get().proxy.db_connections.guard(ctx.protocol());
    let session_id = ctx.session_id();

    let span = info_span!(parent: None, "connection", %conn_id);
    let cold_start_info = ctx.cold_start_info();
    span.in_scope(|| {
        info!(cold_start_info = cold_start_info.as_str(), %conn_info, %session_id, "new connection");
    });

    let pool = match conn_info.endpoint_cache_key() {
        Some(endpoint) => {
            let pool = global_pool.get_or_create_endpoint_pool(&endpoint);

            pool.write().conns.push_back(ConnPoolEntry {
                conn: client.clone(),
                conn_id,
                aux: aux.clone(),
            });

            Arc::downgrade(&pool)
        }
        None => Weak::new(),
    };

    tokio::spawn(
        async move {
            let _conn_gauge = conn_gauge;
            let res = connection.await;
            match res {
                Ok(()) => info!("connection closed"),
                Err(e) => error!(%session_id, "connection error: {}", e),
            }

            // remove from connection pool
            if let Some(pool) = pool.clone().upgrade() {
                if pool.write().remove_conn(conn_id) {
                    info!("closed connection removed");
                }
            }
        }
        .instrument(span),
    );

    Client::new(client, aux)
}
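// A hedged sketch of how the two entry points above are meant to compose in a request
// handler (hypothetical caller: `pool`, `ctx`, `conn_info`, `conn_id`, `aux` and
// `tcp_stream` are assumed to come from the surrounding handshake code; the builder call
// follows hyper's `client::conn::http2` API):
//
//     let client = match pool.get(&ctx, &conn_info) {
//         // pool hit: reuse an existing multiplexed HTTP/2 connection
//         Some(client) => client,
//         // pool miss: establish a new connection and hand it to the pool
//         None => {
//             let (send, connection) = http2::Builder::new(TokioExecutor::new())
//                 .handshake(TokioIo::new(tcp_stream))
//                 .await?;
//             poll_http2_client(pool.clone(), &ctx, &conn_info, send, connection, conn_id, aux)
//         }
//     };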

pub(crate) struct Client<C: ClientInnerExt + Clone> {
    pub(crate) inner: C,
    aux: MetricsAuxInfo,
}

impl<C: ClientInnerExt + Clone> Client<C> {
    pub(self) fn new(inner: C, aux: MetricsAuxInfo) -> Self {
        Self { inner, aux }
    }

    pub(crate) fn metrics(&self) -> Arc<MetricCounter> {
        USAGE_METRICS.register(Ids {
            endpoint_id: self.aux.endpoint_id,
            branch_id: self.aux.branch_id,
        })
    }
}

impl ClientInnerExt for Send {
    fn is_closed(&self) -> bool {
        self.is_closed()
    }

    fn get_process_id(&self) -> i32 {
        // ideally this would return something meaningful, but hyper's HTTP/2 send handle
        // does not expose a backend process id, so return a sentinel for now
        -1
    }
}