LCOV - code coverage report
Current view: top level - proxy/src/serverless - backend.rs (source / functions)
Test:      322b88762cba8ea666f63cda880cccab6936bf37.info
Test Date: 2024-02-29 11:57:12

             Coverage    Total    Hit
Lines:          0.0 %     61      0
Functions:      0.0 %     24      0

            Line data    Source code
       1              : use std::{sync::Arc, time::Duration};
       2              : 
       3              : use async_trait::async_trait;
       4              : use tracing::{field::display, info};
       5              : 
       6              : use crate::{
       7              :     auth::{backend::ComputeCredentials, check_peer_addr_is_in_list, AuthError},
       8              :     compute,
       9              :     config::ProxyConfig,
      10              :     console::{
      11              :         errors::{GetAuthInfoError, WakeComputeError},
      12              :         CachedNodeInfo,
      13              :     },
      14              :     context::RequestMonitoring,
      15              :     proxy::connect_compute::ConnectMechanism,
      16              : };
      17              : 
      18              : use super::conn_pool::{poll_client, Client, ConnInfo, GlobalConnPool};
      19              : 
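                      : /// Backend for the serverless (HTTP) flow: authenticates the user and hands
                      : /// out compute connections from the shared connection pool.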
      20              : pub struct PoolingBackend {
      21              :     pub pool: Arc<GlobalConnPool<tokio_postgres::Client>>,
      22              :     pub config: &'static ProxyConfig,
      23              : }
      24              : 
      25              : impl PoolingBackend {
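                      :     /// Check that the peer address is in the endpoint's allowed-IP list, then
                      :     /// validate the supplied password against the role secret from the auth
                      :     /// backend (using the cached value when available).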
      26            0 :     pub async fn authenticate(
      27            0 :         &self,
      28            0 :         ctx: &mut RequestMonitoring,
      29            0 :         conn_info: &ConnInfo,
      30            0 :     ) -> Result<ComputeCredentials, AuthError> {
      31            0 :         let user_info = conn_info.user_info.clone();
      32            0 :         let backend = self.config.auth_backend.as_ref().map(|_| user_info.clone());
      33            0 :         let (allowed_ips, maybe_secret) = backend.get_allowed_ips_and_secret(ctx).await?;
      34            0 :         if !check_peer_addr_is_in_list(&ctx.peer_addr, &allowed_ips) {
      35            0 :             return Err(AuthError::ip_address_not_allowed(ctx.peer_addr));
      36            0 :         }
      37            0 :         let cached_secret = match maybe_secret {
      38            0 :             Some(secret) => secret,
      39            0 :             None => backend.get_role_secret(ctx).await?,
      40              :         };
      41              : 
      42            0 :         let secret = match cached_secret.value.clone() {
      43            0 :             Some(secret) => secret,
      44              :             None => {
      45              :                 // If we don't have an authentication secret, we can simply return an error for the HTTP flow.
      46            0 :                 info!("authentication info not found");
      47            0 :                 return Err(AuthError::auth_failed(&*user_info.user));
      48              :             }
      49              :         };
      50            0 :         let auth_outcome =
      51            0 :             crate::auth::validate_password_and_exchange(&conn_info.password, secret)?;
      52            0 :         let res = match auth_outcome {
      53            0 :             crate::sasl::Outcome::Success(key) => Ok(key),
      54            0 :             crate::sasl::Outcome::Failure(reason) => {
      55            0 :                 info!("auth backend failed with an error: {reason}");
      56            0 :                 Err(AuthError::auth_failed(&*conn_info.user_info.user))
      57              :             }
      58              :         };
      59            0 :         res.map(|key| ComputeCredentials {
      60            0 :             info: user_info,
      61            0 :             keys: key,
      62            0 :         })
      63            0 :     }
      64              : 
      65              :     // Wake up the destination if needed. Code here is a bit involved because
      66              :     // we reuse the code from the usual proxy and we need to prepare a few structures
      67              :     // that this code expects.
      68            0 :     #[tracing::instrument(fields(pid = tracing::field::Empty, conn_id = tracing::field::Empty), skip_all)]
      69              :     pub async fn connect_to_compute(
      70              :         &self,
      71              :         ctx: &mut RequestMonitoring,
      72              :         conn_info: ConnInfo,
      73              :         keys: ComputeCredentials,
      74              :         force_new: bool,
      75              :     ) -> Result<Client<tokio_postgres::Client>, HttpConnError> {
      76              :         let maybe_client = if !force_new {
      77            0 :             info!("pool: looking for an existing connection");
      78              :             self.pool.get(ctx, &conn_info).await?
      79              :         } else {
      80            0 :             info!("pool: pool is disabled");
      81              :             None
      82              :         };
      83              : 
      84              :         if let Some(client) = maybe_client {
      85              :             return Ok(client);
      86              :         }
      87              :         let conn_id = uuid::Uuid::new_v4();
      88              :         tracing::Span::current().record("conn_id", display(conn_id));
      89            0 :         info!(%conn_id, "pool: opening a new connection '{conn_info}'");
      90            0 :         let backend = self.config.auth_backend.as_ref().map(|_| keys);
      91              :         crate::proxy::connect_compute::connect_to_compute(
      92              :             ctx,
      93              :             &TokioMechanism {
      94              :                 conn_id,
      95              :                 conn_info,
      96              :                 pool: self.pool.clone(),
      97              :             },
      98              :             &backend,
       99              :             false, // do not allow self-signed compute for the HTTP flow
     100              :         )
     101              :         .await
     102              :     }
     103              : }
     104              : 
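                      : /// Errors that can arise while authenticating or connecting to compute on
                      : /// behalf of an HTTP (serverless) request.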
     105            0 : #[derive(Debug, thiserror::Error)]
     106              : pub enum HttpConnError {
      107              :     #[error("pooled connection closed in an inconsistent state")]
     108              :     ConnectionClosedAbruptly(#[from] tokio::sync::watch::error::SendError<uuid::Uuid>),
      109              :     #[error("could not connect to compute")]
     110              :     ConnectionError(#[from] tokio_postgres::Error),
     111              : 
     112              :     #[error("could not get auth info")]
     113              :     GetAuthInfo(#[from] GetAuthInfoError),
     114              :     #[error("user not authenticated")]
     115              :     AuthError(#[from] AuthError),
     116              :     #[error("wake_compute returned error")]
     117              :     WakeCompute(#[from] WakeComputeError),
     118              : }
     119              : 
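                      : /// `ConnectMechanism` implementation that opens plain `tokio_postgres`
                      : /// connections and registers them with the global connection pool.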
     120              : struct TokioMechanism {
     121              :     pool: Arc<GlobalConnPool<tokio_postgres::Client>>,
     122              :     conn_info: ConnInfo,
     123              :     conn_id: uuid::Uuid,
     124              : }
     125              : 
     126              : #[async_trait]
     127              : impl ConnectMechanism for TokioMechanism {
     128              :     type Connection = Client<tokio_postgres::Client>;
     129              :     type ConnectError = tokio_postgres::Error;
     130              :     type Error = HttpConnError;
     131              : 
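                      :     /// Open a single connection to the compute node described by `node_info`
                      :     /// and hand the client/connection pair to `poll_client` so the pool can
                      :     /// track and later reuse it.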
     132            0 :     async fn connect_once(
     133            0 :         &self,
     134            0 :         ctx: &mut RequestMonitoring,
     135            0 :         node_info: &CachedNodeInfo,
     136            0 :         timeout: Duration,
     137            0 :     ) -> Result<Self::Connection, Self::ConnectError> {
     138            0 :         let mut config = (*node_info.config).clone();
     139            0 :         let config = config
     140            0 :             .user(&self.conn_info.user_info.user)
     141            0 :             .password(&*self.conn_info.password)
     142            0 :             .dbname(&self.conn_info.dbname)
     143            0 :             .connect_timeout(timeout);
     144              : 
     145            0 :         let (client, connection) = config.connect(tokio_postgres::NoTls).await?;
     146              : 
     147            0 :         tracing::Span::current().record("pid", &tracing::field::display(client.get_process_id()));
     148            0 :         Ok(poll_client(
     149            0 :             self.pool.clone(),
     150            0 :             ctx,
     151            0 :             self.conn_info.clone(),
     152            0 :             client,
     153            0 :             connection,
     154            0 :             self.conn_id,
     155            0 :             node_info.aux.clone(),
     156            0 :         ))
     157            0 :     }
     158              : 
     159            0 :     fn update_connect_config(&self, _config: &mut compute::ConnCfg) {}
     160              : }
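
For orientation, a minimal, hypothetical usage sketch of the types above, relying only on the
signatures shown in this file. The function name handle_request and the way ctx, conn_info and
backend are obtained are assumptions; they stand in for whatever the serverless request handler
actually does elsewhere in the crate.

    // Hypothetical caller, for illustration only; `ctx`, `conn_info` and `backend`
    // are assumed to be supplied by the serverless request handler.
    async fn handle_request(
        ctx: &mut RequestMonitoring,
        conn_info: ConnInfo,
        backend: &PoolingBackend,
    ) -> Result<(), HttpConnError> {
        // Authenticate first: this checks the allowed-IP list and the password.
        let keys = backend.authenticate(ctx, &conn_info).await?;
        // Reuse a pooled connection if one exists, otherwise a new one is opened
        // through TokioMechanism (`force_new = false` keeps the pool enabled).
        let _client = backend
            .connect_to_compute(ctx, conn_info, keys, /* force_new */ false)
            .await?;
        Ok(())
    }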
        

Generated by: LCOV version 2.1-beta