LCOV - code coverage report
Current view: top level - proxy/src/serverless - local_conn_pool.rs (source / functions) Coverage Total Hit
Test: b4ae4c4857f9ef3e144e982a35ee23bc84c71983.info Lines: 15.6 % 422 66
Test Date: 2024-10-22 22:13:45 Functions: 8.0 % 50 4

            Line data    Source code
       1              : //! Manages the pool of connections between local_proxy and postgres.
       2              : //!
       3              : //! The pool is keyed by database and role_name, and can contain multiple connections
       4              : //! shared between users.
       5              : //!
       6              : //! The pool manages the pg_session_jwt extension used for authorizing
       7              : //! requests in the db.
       8              : //!
       9              : //! The first time a db/role pair is seen, local_proxy attempts to install the extension
      10              : //! and grant usage to the role on the given schema.
      11              : 
      12              : use std::collections::HashMap;
      13              : use std::pin::pin;
      14              : use std::sync::{Arc, Weak};
      15              : use std::task::{ready, Poll};
      16              : use std::time::Duration;
      17              : 
      18              : use futures::future::poll_fn;
      19              : use futures::Future;
      20              : use indexmap::IndexMap;
      21              : use jose_jwk::jose_b64::base64ct::{Base64UrlUnpadded, Encoding};
      22              : use p256::ecdsa::{Signature, SigningKey};
      23              : use parking_lot::RwLock;
      24              : use serde_json::value::RawValue;
      25              : use signature::Signer;
      26              : use tokio::time::Instant;
      27              : use tokio_postgres::tls::NoTlsStream;
      28              : use tokio_postgres::types::ToSql;
      29              : use tokio_postgres::{AsyncMessage, ReadyForQueryStatus, Socket};
      30              : use tokio_util::sync::CancellationToken;
      31              : use tracing::{error, info, info_span, warn, Instrument, Span};
      32              : 
      33              : use super::backend::HttpConnError;
      34              : use super::conn_pool_lib::{ClientInnerExt, ConnInfo};
      35              : use crate::context::RequestMonitoring;
      36              : use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
      37              : use crate::metrics::Metrics;
      38              : use crate::usage_metrics::{Ids, MetricCounter, USAGE_METRICS};
      39              : use crate::{DbName, RoleName};
      40              : 
      41              : pub(crate) const EXT_NAME: &str = "pg_session_jwt";
      42              : pub(crate) const EXT_VERSION: &str = "0.1.2";
      43              : pub(crate) const EXT_SCHEMA: &str = "auth";
      44              : 
      45              : struct ConnPoolEntry<C: ClientInnerExt> {
      46              :     conn: ClientInner<C>,
      47              :     _last_access: std::time::Instant,
      48              : }
      49              : 
      50              : // Per-endpoint connection pool, (dbname, username) -> DbUserConnPool
      51              : // Number of open connections is limited by the `max_conns_per_endpoint`.
      52              : pub(crate) struct EndpointConnPool<C: ClientInnerExt> {
      53              :     pools: HashMap<(DbName, RoleName), DbUserConnPool<C>>,
      54              :     total_conns: usize,
      55              :     max_conns: usize,
      56              :     global_pool_size_max_conns: usize,
      57              : }
      58              : 
      59              : impl<C: ClientInnerExt> EndpointConnPool<C> {
      60            0 :     fn get_conn_entry(&mut self, db_user: (DbName, RoleName)) -> Option<ConnPoolEntry<C>> {
      61            0 :         let Self {
      62            0 :             pools, total_conns, ..
      63            0 :         } = self;
      64            0 :         pools
      65            0 :             .get_mut(&db_user)
      66            0 :             .and_then(|pool_entries| pool_entries.get_conn_entry(total_conns))
      67            0 :     }
      68              : 
      69            0 :     fn remove_client(&mut self, db_user: (DbName, RoleName), conn_id: uuid::Uuid) -> bool {
      70            0 :         let Self {
      71            0 :             pools, total_conns, ..
      72            0 :         } = self;
      73            0 :         if let Some(pool) = pools.get_mut(&db_user) {
      74            0 :             let old_len = pool.conns.len();
      75            0 :             pool.conns.retain(|conn| conn.conn.conn_id != conn_id);
      76            0 :             let new_len = pool.conns.len();
      77            0 :             let removed = old_len - new_len;
      78            0 :             if removed > 0 {
      79            0 :                 Metrics::get()
      80            0 :                     .proxy
      81            0 :                     .http_pool_opened_connections
      82            0 :                     .get_metric()
      83            0 :                     .dec_by(removed as i64);
      84            0 :             }
      85            0 :             *total_conns -= removed;
      86            0 :             removed > 0
      87              :         } else {
      88            0 :             false
      89              :         }
      90            0 :     }
      91              : 
      92            0 :     fn put(pool: &RwLock<Self>, conn_info: &ConnInfo, client: ClientInner<C>) {
      93            0 :         let conn_id = client.conn_id;
      94            0 : 
      95            0 :         if client.is_closed() {
      96            0 :             info!(%conn_id, "local_pool: throwing away connection '{conn_info}' because connection is closed");
      97            0 :             return;
      98            0 :         }
      99            0 :         let global_max_conn = pool.read().global_pool_size_max_conns;
     100            0 :         if pool.read().total_conns >= global_max_conn {
     101            0 :             info!(%conn_id, "local_pool: throwing away connection '{conn_info}' because pool is full");
     102            0 :             return;
     103            0 :         }
     104            0 : 
     105            0 :         // return connection to the pool
     106            0 :         let mut returned = false;
     107            0 :         let mut per_db_size = 0;
     108            0 :         let total_conns = {
     109            0 :             let mut pool = pool.write();
     110            0 : 
     111            0 :             if pool.total_conns < pool.max_conns {
     112            0 :                 let pool_entries = pool.pools.entry(conn_info.db_and_user()).or_default();
     113            0 :                 pool_entries.conns.push(ConnPoolEntry {
     114            0 :                     conn: client,
     115            0 :                     _last_access: std::time::Instant::now(),
     116            0 :                 });
     117            0 : 
     118            0 :                 returned = true;
     119            0 :                 per_db_size = pool_entries.conns.len();
     120            0 : 
     121            0 :                 pool.total_conns += 1;
     122            0 :                 Metrics::get()
     123            0 :                     .proxy
     124            0 :                     .http_pool_opened_connections
     125            0 :                     .get_metric()
     126            0 :                     .inc();
     127            0 :             }
     128              : 
     129            0 :             pool.total_conns
     130            0 :         };
     131            0 : 
     132            0 :         // do logging outside of the mutex
     133            0 :         if returned {
     134            0 :             info!(%conn_id, "local_pool: returning connection '{conn_info}' back to the pool, total_conns={total_conns}, for this (db, user)={per_db_size}");
     135              :         } else {
     136            0 :             info!(%conn_id, "local_pool: throwing away connection '{conn_info}' because pool is full, total_conns={total_conns}");
     137              :         }
     138            0 :     }
     139              : }
     140              : 
     141              : impl<C: ClientInnerExt> Drop for EndpointConnPool<C> {
     142            0 :     fn drop(&mut self) {
     143            0 :         if self.total_conns > 0 {
     144            0 :             Metrics::get()
     145            0 :                 .proxy
     146            0 :                 .http_pool_opened_connections
     147            0 :                 .get_metric()
     148            0 :                 .dec_by(self.total_conns as i64);
     149            0 :         }
     150            0 :     }
     151              : }
     152              : 
     153              : pub(crate) struct DbUserConnPool<C: ClientInnerExt> {
     154              :     conns: Vec<ConnPoolEntry<C>>,
     155              : 
     156              :     // true if we have definitely installed the extension and
     157              :     // granted the role access to the auth schema.
     158              :     initialized: bool,
     159              : }
     160              : 
     161              : impl<C: ClientInnerExt> Default for DbUserConnPool<C> {
     162            0 :     fn default() -> Self {
     163            0 :         Self {
     164            0 :             conns: Vec::new(),
     165            0 :             initialized: false,
     166            0 :         }
     167            0 :     }
     168              : }
     169              : 
     170              : impl<C: ClientInnerExt> DbUserConnPool<C> {
     171            0 :     fn clear_closed_clients(&mut self, conns: &mut usize) -> usize {
     172            0 :         let old_len = self.conns.len();
     173            0 : 
     174            0 :         self.conns.retain(|conn| !conn.conn.is_closed());
     175            0 : 
     176            0 :         let new_len = self.conns.len();
     177            0 :         let removed = old_len - new_len;
     178            0 :         *conns -= removed;
     179            0 :         removed
     180            0 :     }
     181              : 
     182            0 :     fn get_conn_entry(&mut self, conns: &mut usize) -> Option<ConnPoolEntry<C>> {
     183            0 :         let mut removed = self.clear_closed_clients(conns);
     184            0 :         let conn = self.conns.pop();
     185            0 :         if conn.is_some() {
     186            0 :             *conns -= 1;
     187            0 :             removed += 1;
     188            0 :         }
     189            0 :         Metrics::get()
     190            0 :             .proxy
     191            0 :             .http_pool_opened_connections
     192            0 :             .get_metric()
     193            0 :             .dec_by(removed as i64);
     194            0 :         conn
     195            0 :     }
     196              : }
     197              : 
     198              : pub(crate) struct LocalConnPool<C: ClientInnerExt> {
     199              :     global_pool: RwLock<EndpointConnPool<C>>,
     200              : 
     201              :     config: &'static crate::config::HttpConfig,
     202              : }
     203              : 
     204              : impl<C: ClientInnerExt> LocalConnPool<C> {
     205            0 :     pub(crate) fn new(config: &'static crate::config::HttpConfig) -> Arc<Self> {
     206            0 :         Arc::new(Self {
     207            0 :             global_pool: RwLock::new(EndpointConnPool {
     208            0 :                 pools: HashMap::new(),
     209            0 :                 total_conns: 0,
     210            0 :                 max_conns: config.pool_options.max_conns_per_endpoint,
     211            0 :                 global_pool_size_max_conns: config.pool_options.max_total_conns,
     212            0 :             }),
     213            0 :             config,
     214            0 :         })
     215            0 :     }
     216              : 
     217            0 :     pub(crate) fn get_idle_timeout(&self) -> Duration {
     218            0 :         self.config.pool_options.idle_timeout
     219            0 :     }
     220              : 
     221            0 :     pub(crate) fn get(
     222            0 :         self: &Arc<Self>,
     223            0 :         ctx: &RequestMonitoring,
     224            0 :         conn_info: &ConnInfo,
     225            0 :     ) -> Result<Option<LocalClient<C>>, HttpConnError> {
     226            0 :         let client = self
     227            0 :             .global_pool
     228            0 :             .write()
     229            0 :             .get_conn_entry(conn_info.db_and_user())
     230            0 :             .map(|entry| entry.conn);
     231              : 
     232              :         // ok return cached connection if found and establish a new one otherwise
     233            0 :         if let Some(client) = client {
     234            0 :             if client.is_closed() {
     235            0 :                 info!("local_pool: cached connection '{conn_info}' is closed, opening a new one");
     236            0 :                 return Ok(None);
     237            0 :             }
     238            0 :             tracing::Span::current().record("conn_id", tracing::field::display(client.conn_id));
     239            0 :             tracing::Span::current().record(
     240            0 :                 "pid",
     241            0 :                 tracing::field::display(client.inner.get_process_id()),
     242            0 :             );
     243            0 :             info!(
     244            0 :                 cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
     245            0 :                 "local_pool: reusing connection '{conn_info}'"
     246              :             );
     247            0 :             client.session.send(ctx.session_id())?;
     248            0 :             ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit);
     249            0 :             ctx.success();
     250            0 :             return Ok(Some(LocalClient::new(
     251            0 :                 client,
     252            0 :                 conn_info.clone(),
     253            0 :                 Arc::downgrade(self),
     254            0 :             )));
     255            0 :         }
     256            0 :         Ok(None)
     257            0 :     }
     258              : 
     259            0 :     pub(crate) fn initialized(self: &Arc<Self>, conn_info: &ConnInfo) -> bool {
     260            0 :         self.global_pool
     261            0 :             .read()
     262            0 :             .pools
     263            0 :             .get(&conn_info.db_and_user())
     264            0 :             .map_or(false, |pool| pool.initialized)
     265            0 :     }
     266              : 
     267            0 :     pub(crate) fn set_initialized(self: &Arc<Self>, conn_info: &ConnInfo) {
     268            0 :         self.global_pool
     269            0 :             .write()
     270            0 :             .pools
     271            0 :             .entry(conn_info.db_and_user())
     272            0 :             .or_default()
     273            0 :             .initialized = true;
     274            0 :     }
     275              : }
     276              : 
     277              : #[allow(clippy::too_many_arguments)]
     278            0 : pub(crate) fn poll_client(
     279            0 :     global_pool: Arc<LocalConnPool<tokio_postgres::Client>>,
     280            0 :     ctx: &RequestMonitoring,
     281            0 :     conn_info: ConnInfo,
     282            0 :     client: tokio_postgres::Client,
     283            0 :     mut connection: tokio_postgres::Connection<Socket, NoTlsStream>,
     284            0 :     key: SigningKey,
     285            0 :     conn_id: uuid::Uuid,
     286            0 :     aux: MetricsAuxInfo,
     287            0 : ) -> LocalClient<tokio_postgres::Client> {
     288            0 :     let conn_gauge = Metrics::get().proxy.db_connections.guard(ctx.protocol());
     289            0 :     let mut session_id = ctx.session_id();
     290            0 :     let (tx, mut rx) = tokio::sync::watch::channel(session_id);
     291              : 
     292            0 :     let span = info_span!(parent: None, "connection", %conn_id);
     293            0 :     let cold_start_info = ctx.cold_start_info();
     294            0 :     span.in_scope(|| {
     295            0 :         info!(cold_start_info = cold_start_info.as_str(), %conn_info, %session_id, "new connection");
     296            0 :     });
     297            0 :     let pool = Arc::downgrade(&global_pool);
     298            0 :     let pool_clone = pool.clone();
     299            0 : 
     300            0 :     let db_user = conn_info.db_and_user();
     301            0 :     let idle = global_pool.get_idle_timeout();
     302            0 :     let cancel = CancellationToken::new();
     303            0 :     let cancelled = cancel.clone().cancelled_owned();
     304            0 : 
     305            0 :     tokio::spawn(
     306            0 :     async move {
     307            0 :         let _conn_gauge = conn_gauge;
     308            0 :         let mut idle_timeout = pin!(tokio::time::sleep(idle));
     309            0 :         let mut cancelled = pin!(cancelled);
     310            0 : 
     311            0 :         poll_fn(move |cx| {
     312            0 :             if cancelled.as_mut().poll(cx).is_ready() {
     313            0 :                 info!("connection dropped");
     314            0 :                 return Poll::Ready(())
     315            0 :             }
     316            0 : 
     317            0 :             match rx.has_changed() {
     318              :                 Ok(true) => {
     319            0 :                     session_id = *rx.borrow_and_update();
     320            0 :                     info!(%session_id, "changed session");
     321            0 :                     idle_timeout.as_mut().reset(Instant::now() + idle);
     322              :                 }
     323              :                 Err(_) => {
     324            0 :                     info!("connection dropped");
     325            0 :                     return Poll::Ready(())
     326              :                 }
     327            0 :                 _ => {}
     328              :             }
     329              : 
     330              :             // 5 minute idle connection timeout
     331            0 :             if idle_timeout.as_mut().poll(cx).is_ready() {
     332            0 :                 idle_timeout.as_mut().reset(Instant::now() + idle);
     333            0 :                 info!("connection idle");
     334            0 :                 if let Some(pool) = pool.clone().upgrade() {
     335              :                     // remove client from pool - should close the connection if it's idle.
     336              :                     // does nothing if the client is currently checked-out and in-use
     337            0 :                     if pool.global_pool.write().remove_client(db_user.clone(), conn_id) {
     338            0 :                         info!("idle connection removed");
     339            0 :                     }
     340            0 :                 }
     341            0 :             }
     342              : 
     343              :             loop {
     344            0 :                 let message = ready!(connection.poll_message(cx));
     345              : 
     346            0 :                 match message {
     347            0 :                     Some(Ok(AsyncMessage::Notice(notice))) => {
     348            0 :                         info!(%session_id, "notice: {}", notice);
     349              :                     }
     350            0 :                     Some(Ok(AsyncMessage::Notification(notif))) => {
     351            0 :                         warn!(%session_id, pid = notif.process_id(), channel = notif.channel(), "notification received");
     352              :                     }
     353              :                     Some(Ok(_)) => {
     354            0 :                         warn!(%session_id, "unknown message");
     355              :                     }
     356            0 :                     Some(Err(e)) => {
     357            0 :                         error!(%session_id, "connection error: {}", e);
     358            0 :                         break
     359              :                     }
     360              :                     None => {
     361            0 :                         info!("connection closed");
     362            0 :                         break
     363              :                     }
     364              :                 }
     365              :             }
     366              : 
     367              :             // remove from connection pool
     368            0 :             if let Some(pool) = pool.clone().upgrade() {
     369            0 :                 if pool.global_pool.write().remove_client(db_user.clone(), conn_id) {
     370            0 :                     info!("closed connection removed");
     371            0 :                 }
     372            0 :             }
     373              : 
     374            0 :             Poll::Ready(())
     375            0 :         }).await;
     376              : 
     377            0 :     }
     378            0 :     .instrument(span));
     379            0 : 
     380            0 :     let inner = ClientInner {
     381            0 :         inner: client,
     382            0 :         session: tx,
     383            0 :         cancel,
     384            0 :         aux,
     385            0 :         conn_id,
     386            0 :         key,
     387            0 :         jti: 0,
     388            0 :     };
     389            0 :     LocalClient::new(inner, conn_info, pool_clone)
     390            0 : }
     391              : 
     392              : pub(crate) struct ClientInner<C: ClientInnerExt> {
     393              :     inner: C,
     394              :     session: tokio::sync::watch::Sender<uuid::Uuid>,
     395              :     cancel: CancellationToken,
     396              :     aux: MetricsAuxInfo,
     397              :     conn_id: uuid::Uuid,
     398              : 
     399              :     // needed for pg_session_jwt state
     400              :     key: SigningKey,
     401              :     jti: u64,
     402              : }
     403              : 
     404              : impl<C: ClientInnerExt> Drop for ClientInner<C> {
     405            0 :     fn drop(&mut self) {
     406            0 :         // on client drop, tell the conn to shut down
     407            0 :         self.cancel.cancel();
     408            0 :     }
     409              : }
     410              : 
     411              : impl<C: ClientInnerExt> ClientInner<C> {
     412            0 :     pub(crate) fn is_closed(&self) -> bool {
     413            0 :         self.inner.is_closed()
     414            0 :     }
     415              : }
     416              : 
     417              : impl ClientInner<tokio_postgres::Client> {
     418            0 :     pub(crate) async fn set_jwt_session(&mut self, payload: &[u8]) -> Result<(), HttpConnError> {
     419            0 :         self.jti += 1;
     420            0 :         let token = resign_jwt(&self.key, payload, self.jti)?;
     421              : 
     422              :         // initiates the auth session
     423            0 :         self.inner.simple_query("discard all").await?;
     424            0 :         self.inner
     425            0 :             .query(
     426            0 :                 "select auth.jwt_session_init($1)",
     427            0 :                 &[&token as &(dyn ToSql + Sync)],
     428            0 :             )
     429            0 :             .await?;
     430              : 
     431            0 :         let pid = self.inner.get_process_id();
     432            0 :         info!(pid, jti = self.jti, "user session state init");
     433              : 
     434            0 :         Ok(())
     435            0 :     }
     436              : }
     437              : 
     438              : pub(crate) struct LocalClient<C: ClientInnerExt> {
     439              :     span: Span,
     440              :     inner: Option<ClientInner<C>>,
     441              :     conn_info: ConnInfo,
     442              :     pool: Weak<LocalConnPool<C>>,
     443              : }
     444              : 
     445              : pub(crate) struct Discard<'a, C: ClientInnerExt> {
     446              :     conn_info: &'a ConnInfo,
     447              :     pool: &'a mut Weak<LocalConnPool<C>>,
     448              : }
     449              : 
     450              : impl<C: ClientInnerExt> LocalClient<C> {
     451            0 :     pub(self) fn new(
     452            0 :         inner: ClientInner<C>,
     453            0 :         conn_info: ConnInfo,
     454            0 :         pool: Weak<LocalConnPool<C>>,
     455            0 :     ) -> Self {
     456            0 :         Self {
     457            0 :             inner: Some(inner),
     458            0 :             span: Span::current(),
     459            0 :             conn_info,
     460            0 :             pool,
     461            0 :         }
     462            0 :     }
     463              : 
     464            0 :     pub(crate) fn client_inner(&mut self) -> (&mut ClientInner<C>, Discard<'_, C>) {
     465            0 :         let Self {
     466            0 :             inner,
     467            0 :             pool,
     468            0 :             conn_info,
     469            0 :             span: _,
     470            0 :         } = self;
     471            0 :         let inner_m = inner.as_mut().expect("client inner should not be removed");
     472            0 :         (inner_m, Discard { conn_info, pool })
     473            0 :     }
     474              : 
     475            0 :     pub(crate) fn inner(&mut self) -> (&mut C, Discard<'_, C>) {
     476            0 :         let Self {
     477            0 :             inner,
     478            0 :             pool,
     479            0 :             conn_info,
     480            0 :             span: _,
     481            0 :         } = self;
     482            0 :         let inner = inner.as_mut().expect("client inner should not be removed");
     483            0 :         (&mut inner.inner, Discard { conn_info, pool })
     484            0 :     }
     485              : }
     486              : 
     487              : /// implements relatively efficient in-place json object key upserting
     488              : ///
     489              : /// only supports top-level keys
     490            1 : fn upsert_json_object(
     491            1 :     payload: &[u8],
     492            1 :     key: &str,
     493            1 :     value: &RawValue,
     494            1 : ) -> Result<String, serde_json::Error> {
     495            1 :     let mut payload = serde_json::from_slice::<IndexMap<&str, &RawValue>>(payload)?;
     496            1 :     payload.insert(key, value);
     497            1 :     serde_json::to_string(&payload)
     498            1 : }
     499              : 
     500            1 : fn resign_jwt(sk: &SigningKey, payload: &[u8], jti: u64) -> Result<String, HttpConnError> {
     501            1 :     let mut buffer = itoa::Buffer::new();
     502            1 : 
     503            1 :     // encode the jti integer to a json rawvalue
     504            1 :     let jti = serde_json::from_str::<&RawValue>(buffer.format(jti)).unwrap();
     505              : 
     506              :     // update the jti in-place
     507            1 :     let payload =
     508            1 :         upsert_json_object(payload, "jti", jti).map_err(HttpConnError::JwtPayloadError)?;
     509              : 
     510              :     // sign the jwt
     511            1 :     let token = sign_jwt(sk, payload.as_bytes());
     512            1 : 
     513            1 :     Ok(token)
     514            1 : }
     515              : 
     516            1 : fn sign_jwt(sk: &SigningKey, payload: &[u8]) -> String {
     517            1 :     let header_len = 20;
     518            1 :     let payload_len = Base64UrlUnpadded::encoded_len(payload);
     519            1 :     let signature_len = Base64UrlUnpadded::encoded_len(&[0; 64]);
     520            1 :     let total_len = header_len + payload_len + signature_len + 2;
     521            1 : 
     522            1 :     let mut jwt = String::with_capacity(total_len);
     523            1 :     let cap = jwt.capacity();
     524            1 : 
     525            1 :     // we only need an empty header with the alg specified.
     526            1 :     // base64url(r#"{"alg":"ES256"}"#) == "eyJhbGciOiJFUzI1NiJ9"
     527            1 :     jwt.push_str("eyJhbGciOiJFUzI1NiJ9.");
     528            1 : 
     529            1 :     // encode the jwt payload in-place
     530            1 :     base64::encode_config_buf(payload, base64::URL_SAFE_NO_PAD, &mut jwt);
     531            1 : 
     532            1 :     // create the signature from the encoded header || payload
     533            1 :     let sig: Signature = sk.sign(jwt.as_bytes());
     534            1 : 
     535            1 :     jwt.push('.');
     536            1 : 
     537            1 :     // encode the jwt signature in-place
     538            1 :     base64::encode_config_buf(sig.to_bytes(), base64::URL_SAFE_NO_PAD, &mut jwt);
     539            1 : 
     540            1 :     debug_assert_eq!(
     541            1 :         jwt.len(),
     542              :         total_len,
     543            0 :         "the jwt len should match our expected len"
     544              :     );
     545            1 :     debug_assert_eq!(jwt.capacity(), cap, "the jwt capacity should not change");
     546              : 
     547            1 :     jwt
     548            1 : }
     549              : 
     550              : impl<C: ClientInnerExt> LocalClient<C> {
     551            0 :     pub(crate) fn metrics(&self) -> Arc<MetricCounter> {
     552            0 :         let aux = &self.inner.as_ref().unwrap().aux;
     553            0 :         USAGE_METRICS.register(Ids {
     554            0 :             endpoint_id: aux.endpoint_id,
     555            0 :             branch_id: aux.branch_id,
     556            0 :         })
     557            0 :     }
     558              : 
     559            0 :     fn do_drop(&mut self) -> Option<impl FnOnce() + use<C>> {
     560            0 :         let conn_info = self.conn_info.clone();
     561            0 :         let client = self
     562            0 :             .inner
     563            0 :             .take()
     564            0 :             .expect("client inner should not be removed");
     565            0 :         if let Some(conn_pool) = std::mem::take(&mut self.pool).upgrade() {
     566            0 :             let current_span = self.span.clone();
     567            0 :             // return connection to the pool
     568            0 :             return Some(move || {
     569            0 :                 let _span = current_span.enter();
     570            0 :                 EndpointConnPool::put(&conn_pool.global_pool, &conn_info, client);
     571            0 :             });
     572            0 :         }
     573            0 :         None
     574            0 :     }
     575              : }
     576              : 
     577              : impl<C: ClientInnerExt> Drop for LocalClient<C> {
     578            0 :     fn drop(&mut self) {
     579            0 :         if let Some(drop) = self.do_drop() {
     580            0 :             tokio::task::spawn_blocking(drop);
     581            0 :         }
     582            0 :     }
     583              : }
     584              : 
     585              : impl<C: ClientInnerExt> Discard<'_, C> {
     586            0 :     pub(crate) fn check_idle(&mut self, status: ReadyForQueryStatus) {
     587            0 :         let conn_info = &self.conn_info;
     588            0 :         if status != ReadyForQueryStatus::Idle && std::mem::take(self.pool).strong_count() > 0 {
     589            0 :             info!(
     590            0 :                 "local_pool: throwing away connection '{conn_info}' because connection is not idle"
     591              :             );
     592            0 :         }
     593            0 :     }
     594            0 :     pub(crate) fn discard(&mut self) {
     595            0 :         let conn_info = &self.conn_info;
     596            0 :         if std::mem::take(self.pool).strong_count() > 0 {
     597            0 :             info!("local_pool: throwing away connection '{conn_info}' because connection is potentially in a broken state");
     598            0 :         }
     599            0 :     }
     600              : }
     601              : 
     602              : #[cfg(test)]
     603              : mod tests {
     604              :     use p256::ecdsa::SigningKey;
     605              :     use typed_json::json;
     606              : 
     607              :     use super::resign_jwt;
     608              : 
     609              :     #[test]
     610            1 :     fn jwt_token_snapshot() {
     611            1 :         let key = SigningKey::from_bytes(&[1; 32].into()).unwrap();
     612            1 :         let data =
     613            1 :             json!({"foo":"bar","jti":"foo\nbar","nested":{"jti":"tricky nesting"}}).to_string();
     614            1 : 
     615            1 :         let jwt = resign_jwt(&key, data.as_bytes(), 2).unwrap();
     616            1 : 
     617            1 :         // To validate the JWT, copy the JWT string and paste it into https://jwt.io/.
     618            1 :         // In the public-key box, paste the following jwk public key
     619            1 :         // `{"kty":"EC","crv":"P-256","x":"b_A7lJJBzh2t1DUZ5pYOCoW0GmmgXDKBA6orzhWUyhY","y":"PE91OlW_AdxT9sCwx-7ni0DG_30lqW4igrmJzvccFEo"}`
     620            1 : 
     621            1 :         // let pub_key = p256::ecdsa::VerifyingKey::from(&key);
     622            1 :         // let pub_key = p256::PublicKey::from(pub_key);
     623            1 :         // println!("{}", pub_key.to_jwk_string());
     624            1 : 
     625            1 :         assert_eq!(jwt, "eyJhbGciOiJFUzI1NiJ9.eyJmb28iOiJiYXIiLCJqdGkiOjIsIm5lc3RlZCI6eyJqdGkiOiJ0cmlja3kgbmVzdGluZyJ9fQ.pYf0LxoJ8sDgpmsYOgrbNecOSipnPBEGwnZzB-JhW2cONrKlqRsgXwK8_cOsyolGy-hTTe8GXbWTl_UdpF5RyA");
     626            1 :     }
     627              : }
        

Generated by: LCOV version 2.1-beta