LCOV - code coverage report
Current view: top level - proxy/src/proxy - connect_compute.rs (source / functions) Coverage Total Hit
Test: 1d18b743246dcf78c27c0bad0234a4c0da6fde89.info Lines: 0.0 % 8 0
Test Date: 2025-02-14 00:11:37 Functions: 0.0 % 2 0

            Line data    Source code
       1              : use async_trait::async_trait;
       2              : use pq_proto::StartupMessageParams;
       3              : use tokio::time;
       4              : use tracing::{debug, info, warn};
       5              : 
       6              : use super::retry::ShouldRetryWakeCompute;
       7              : use crate::auth::backend::{ComputeCredentialKeys, ComputeUserInfo};
       8              : use crate::compute::{self, PostgresConnection, COULD_NOT_CONNECT};
       9              : use crate::config::{ComputeConfig, RetryConfig};
      10              : use crate::context::RequestContext;
      11              : use crate::control_plane::errors::WakeComputeError;
      12              : use crate::control_plane::locks::ApiLocks;
      13              : use crate::control_plane::{self, CachedNodeInfo, NodeInfo};
      14              : use crate::error::ReportableError;
      15              : use crate::metrics::{
      16              :     ConnectOutcome, ConnectionFailureKind, Metrics, RetriesMetricGroup, RetryType,
      17              : };
      18              : use crate::proxy::retry::{retry_after, should_retry, CouldRetry};
      19              : use crate::proxy::wake_compute::wake_compute;
      20              : use crate::types::Host;
      21              : 
      22              : /// If we couldn't connect, a cached connection info might be to blame
      23              : /// (e.g. the compute node's address might've changed at the wrong time).
      24              : /// Invalidate the cache entry (if any) to prevent subsequent errors.
      25              : #[tracing::instrument(name = "invalidate_cache", skip_all)]
      26              : pub(crate) fn invalidate_cache(node_info: control_plane::CachedNodeInfo) -> NodeInfo {
      27              :     let is_cached = node_info.cached();
      28              :     if is_cached {
      29              :         warn!("invalidating stalled compute node info cache entry");
      30              :     }
      31              :     let label = if is_cached {
      32              :         ConnectionFailureKind::ComputeCached
      33              :     } else {
      34              :         ConnectionFailureKind::ComputeUncached
      35              :     };
      36              :     Metrics::get().proxy.connection_failures_total.inc(label);
      37              : 
      38              :     node_info.invalidate()
      39              : }
      40              : 
      41              : #[async_trait]
      42              : pub(crate) trait ConnectMechanism {
      43              :     type Connection;
      44              :     type ConnectError: ReportableError;
      45              :     type Error: From<Self::ConnectError>;
      46              :     async fn connect_once(
      47              :         &self,
      48              :         ctx: &RequestContext,
      49              :         node_info: &control_plane::CachedNodeInfo,
      50              :         config: &ComputeConfig,
      51              :     ) -> Result<Self::Connection, Self::ConnectError>;
      52              : 
      53              :     fn update_connect_config(&self, conf: &mut compute::ConnCfg);
      54              : }
      55              : 
      56              : #[async_trait]
      57              : pub(crate) trait ComputeConnectBackend {
      58              :     async fn wake_compute(
      59              :         &self,
      60              :         ctx: &RequestContext,
      61              :     ) -> Result<CachedNodeInfo, control_plane::errors::WakeComputeError>;
      62              : 
      63              :     fn get_keys(&self) -> &ComputeCredentialKeys;
      64              : }
      65              : 
      66              : pub(crate) struct TcpMechanism<'a> {
      67              :     pub(crate) params_compat: bool,
      68              : 
      69              :     /// KV-dictionary with PostgreSQL connection params.
      70              :     pub(crate) params: &'a StartupMessageParams,
      71              : 
      72              :     /// connect_to_compute concurrency lock
      73              :     pub(crate) locks: &'static ApiLocks<Host>,
      74              : 
      75              :     pub(crate) user_info: ComputeUserInfo,
      76              : }
      77              : 
      78              : #[async_trait]
      79              : impl ConnectMechanism for TcpMechanism<'_> {
      80              :     type Connection = PostgresConnection;
      81              :     type ConnectError = compute::ConnectionError;
      82              :     type Error = compute::ConnectionError;
      83              : 
      84              :     #[tracing::instrument(fields(pid = tracing::field::Empty), skip_all)]
      85              :     async fn connect_once(
      86              :         &self,
      87              :         ctx: &RequestContext,
      88              :         node_info: &control_plane::CachedNodeInfo,
      89              :         config: &ComputeConfig,
      90            0 :     ) -> Result<PostgresConnection, Self::Error> {
      91            0 :         let host = node_info.config.get_host();
      92            0 :         let permit = self.locks.get_permit(&host).await?;
      93            0 :         permit.release_result(node_info.connect(ctx, config, self.user_info.clone()).await)
      94            0 :     }
      95              : 
      96            0 :     fn update_connect_config(&self, config: &mut compute::ConnCfg) {
      97            0 :         config.set_startup_params(self.params, self.params_compat);
      98            0 :     }
      99              : }
     100              : 
     101              : /// Try to connect to the compute node, retrying if necessary.
     102              : #[tracing::instrument(skip_all)]
     103              : pub(crate) async fn connect_to_compute<M: ConnectMechanism, B: ComputeConnectBackend>(
     104              :     ctx: &RequestContext,
     105              :     mechanism: &M,
     106              :     user_info: &B,
     107              :     wake_compute_retry_config: RetryConfig,
     108              :     compute: &ComputeConfig,
     109              : ) -> Result<M::Connection, M::Error>
     110              : where
     111              :     M::ConnectError: CouldRetry + ShouldRetryWakeCompute + std::fmt::Debug,
     112              :     M::Error: From<WakeComputeError>,
     113              : {
     114              :     let mut num_retries = 0;
     115              :     let mut node_info =
     116              :         wake_compute(&mut num_retries, ctx, user_info, wake_compute_retry_config).await?;
     117              : 
     118              :     node_info.set_keys(user_info.get_keys());
     119              :     mechanism.update_connect_config(&mut node_info.config);
     120              : 
     121              :     // try once
     122              :     let err = match mechanism.connect_once(ctx, &node_info, compute).await {
     123              :         Ok(res) => {
     124              :             ctx.success();
     125              :             Metrics::get().proxy.retries_metric.observe(
     126              :                 RetriesMetricGroup {
     127              :                     outcome: ConnectOutcome::Success,
     128              :                     retry_type: RetryType::ConnectToCompute,
     129              :                 },
     130              :                 num_retries.into(),
     131              :             );
     132              :             return Ok(res);
     133              :         }
     134              :         Err(e) => e,
     135              :     };
     136              : 
     137              :     debug!(error = ?err, COULD_NOT_CONNECT);
     138              : 
     139              :     let node_info = if !node_info.cached() || !err.should_retry_wake_compute() {
     140              :         // If we just recieved this from cplane and didn't get it from cache, we shouldn't retry.
     141              :         // Do not need to retrieve a new node_info, just return the old one.
     142              :         if should_retry(&err, num_retries, compute.retry) {
     143              :             Metrics::get().proxy.retries_metric.observe(
     144              :                 RetriesMetricGroup {
     145              :                     outcome: ConnectOutcome::Failed,
     146              :                     retry_type: RetryType::ConnectToCompute,
     147              :                 },
     148              :                 num_retries.into(),
     149              :             );
     150              :             return Err(err.into());
     151              :         }
     152              :         node_info
     153              :     } else {
     154              :         // if we failed to connect, it's likely that the compute node was suspended, wake a new compute node
     155              :         debug!("compute node's state has likely changed; requesting a wake-up");
     156              :         let old_node_info = invalidate_cache(node_info);
     157              :         // TODO: increment num_retries?
     158              :         let mut node_info =
     159              :             wake_compute(&mut num_retries, ctx, user_info, wake_compute_retry_config).await?;
     160              :         node_info.reuse_settings(old_node_info);
     161              : 
     162              :         mechanism.update_connect_config(&mut node_info.config);
     163              :         node_info
     164              :     };
     165              : 
     166              :     // now that we have a new node, try connect to it repeatedly.
     167              :     // this can error for a few reasons, for instance:
     168              :     // * DNS connection settings haven't quite propagated yet
     169              :     debug!("wake_compute success. attempting to connect");
     170              :     num_retries = 1;
     171              :     loop {
     172              :         match mechanism.connect_once(ctx, &node_info, compute).await {
     173              :             Ok(res) => {
     174              :                 ctx.success();
     175              :                 Metrics::get().proxy.retries_metric.observe(
     176              :                     RetriesMetricGroup {
     177              :                         outcome: ConnectOutcome::Success,
     178              :                         retry_type: RetryType::ConnectToCompute,
     179              :                     },
     180              :                     num_retries.into(),
     181              :                 );
     182              :                 // TODO: is this necessary? We have a metric.
     183              :                 info!(?num_retries, "connected to compute node after");
     184              :                 return Ok(res);
     185              :             }
     186              :             Err(e) => {
     187              :                 if !should_retry(&e, num_retries, compute.retry) {
     188              :                     // Don't log an error here, caller will print the error
     189              :                     Metrics::get().proxy.retries_metric.observe(
     190              :                         RetriesMetricGroup {
     191              :                             outcome: ConnectOutcome::Failed,
     192              :                             retry_type: RetryType::ConnectToCompute,
     193              :                         },
     194              :                         num_retries.into(),
     195              :                     );
     196              :                     return Err(e.into());
     197              :                 }
     198              : 
     199              :                 warn!(error = ?e, num_retries, retriable = true, COULD_NOT_CONNECT);
     200              :             }
     201              :         };
     202              : 
     203              :         let wait_duration = retry_after(num_retries, compute.retry);
     204              :         num_retries += 1;
     205              : 
     206              :         let pause = ctx.latency_timer_pause(crate::metrics::Waiting::RetryTimeout);
     207              :         time::sleep(wait_duration).await;
     208              :         drop(pause);
     209              :     }
     210              : }
        

Generated by: LCOV version 2.1-beta