LCOV - code coverage report
Current view: top level - proxy/src/proxy - connect_compute.rs (source / functions) Coverage Total Hit
Test: 75747cdbffeb0b6d2a2a311584368de68cd9aadc.info Lines: 18.2 % 11 2
Test Date: 2024-06-24 06:52:57 Functions: 23.1 % 13 3

            Line data    Source code
       1              : use crate::{
       2              :     auth::backend::ComputeCredentialKeys,
       3              :     compute::{self, PostgresConnection},
       4              :     config::RetryConfig,
       5              :     console::{self, errors::WakeComputeError, locks::ApiLocks, CachedNodeInfo, NodeInfo},
       6              :     context::RequestMonitoring,
       7              :     error::ReportableError,
       8              :     metrics::{ConnectOutcome, ConnectionFailureKind, Metrics, RetriesMetricGroup, RetryType},
       9              :     proxy::{
      10              :         retry::{retry_after, ShouldRetry},
      11              :         wake_compute::wake_compute,
      12              :     },
      13              :     Host,
      14              : };
      15              : use async_trait::async_trait;
      16              : use pq_proto::StartupMessageParams;
      17              : use tokio::time;
      18              : use tracing::{error, info, warn};
      19              : 
      20              : const CONNECT_TIMEOUT: time::Duration = time::Duration::from_secs(2); // passed as `timeout` to every `connect_once` call in `connect_to_compute`
      21              : 
      22              : /// If we couldn't connect, a cached connection info might be to blame
      23              : /// (e.g. the compute node's address might've changed at the wrong time).
      24              : /// Invalidate the cache entry (if any) to prevent subsequent errors, count the failure in `connection_failures_total`, and return the value produced by `node_info.invalidate()`.
      25            8 : #[tracing::instrument(name = "invalidate_cache", skip_all)]
      26              : pub fn invalidate_cache(node_info: console::CachedNodeInfo) -> NodeInfo {
      27              :     let is_cached = node_info.cached(); // sampled once; drives both the warning and the metric label
      28              :     if is_cached {
      29              :         warn!("invalidating stalled compute node info cache entry");
      30              :     }
      31              :     let label = match is_cached {
      32              :         true => ConnectionFailureKind::ComputeCached,
      33              :         false => ConnectionFailureKind::ComputeUncached,
      34              :     };
      35              :     Metrics::get().proxy.connection_failures_total.inc(label); // incremented on every call, cached or not
      36              : 
      37              :     node_info.invalidate()
      38              : }
      39              : 
      40              : #[async_trait]
      41              : pub trait ConnectMechanism { // strategy used by `connect_to_compute` to perform a single connection attempt
      42              :     type Connection; // value yielded by a successful `connect_once`
      43              :     type ConnectError: ReportableError; // error of one `connect_once` attempt
      44              :     type Error: From<Self::ConnectError>; // error type returned by `connect_to_compute`; attempt errors are converted with `.into()`
      45              :     async fn connect_once(
      46              :         &self,
      47              :         ctx: &mut RequestMonitoring,
      48              :         node_info: &console::CachedNodeInfo,
      49              :         timeout: time::Duration,
      50              :     ) -> Result<Self::Connection, Self::ConnectError>; // one attempt only; the retry loop lives in `connect_to_compute`
      51              : 
      52              :     fn update_connect_config(&self, conf: &mut compute::ConnCfg); // called on each node config obtained from `wake_compute`, before connecting
      53              : }
      54              : 
      55              : #[async_trait]
      56              : pub trait ComputeConnectBackend { // what `connect_to_compute` needs from `user_info`: a wake-up call and optional credential keys
      57              :     async fn wake_compute(
      58              :         &self,
      59              :         ctx: &mut RequestMonitoring,
      60              :     ) -> Result<CachedNodeInfo, console::errors::WakeComputeError>; // yields the node info to connect to, or a wake-up error
      61              : 
      62              :     fn get_keys(&self) -> Option<&ComputeCredentialKeys>; // when `Some`, `connect_to_compute` copies the keys into the node info via `set_keys`
      63              : }
      64              : 
      65              : pub struct TcpMechanism<'a> { // `ConnectMechanism` whose `Connection` is a `PostgresConnection`
      66              :     /// KV-dictionary with PostgreSQL connection params.
      67              :     pub params: &'a StartupMessageParams,
      68              : 
      69              :     /// connect_to_compute concurrency lock; `connect_once` takes a permit keyed by the node's host before connecting
      70              :     pub locks: &'static ApiLocks<Host>,
      71              : }
      72              : 
      73              : #[async_trait]
      74              : impl ConnectMechanism for TcpMechanism<'_> {
      75              :     type Connection = PostgresConnection;
      76              :     type ConnectError = compute::ConnectionError;
      77              :     type Error = compute::ConnectionError; // same type as `ConnectError`, so the conversion into `Error` is the identity
      78              : 
      79            0 :     #[tracing::instrument(fields(pid = tracing::field::Empty), skip_all)]
      80              :     async fn connect_once(
      81              :         &self,
      82              :         ctx: &mut RequestMonitoring,
      83              :         node_info: &console::CachedNodeInfo,
      84              :         timeout: time::Duration,
      85            0 :     ) -> Result<PostgresConnection, Self::Error> {
      86            0 :         let host = node_info.config.get_host()?; // the host is the key into the concurrency lock
      87            0 :         let permit = self.locks.get_permit(&host).await?; // wait for a permit; failures propagate via `?`
      88            0 :         permit.release_result(node_info.connect(ctx, timeout).await) // the connect outcome is handed to the permit; its return value is this function's result
      89            0 :     }
      90              : 
      91            0 :     fn update_connect_config(&self, config: &mut compute::ConnCfg) {
      92            0 :         config.set_startup_params(self.params); // apply the stored startup params to the connection config
      93            0 :     }
      94              : }
      95              : 
      96              : /// Try to connect to the compute node, retrying if necessary. Flow: wake_compute -> one connect attempt -> on failure either keep the node info or invalidate it and wake again -> retry loop with a sleep between attempts.
      97           28 : #[tracing::instrument(skip_all)]
      98              : pub async fn connect_to_compute<M: ConnectMechanism, B: ComputeConnectBackend>(
      99              :     ctx: &mut RequestMonitoring,
     100              :     mechanism: &M,
     101              :     user_info: &B,
     102              :     allow_self_signed_compute: bool,
     103              :     wake_compute_retry_config: RetryConfig,
     104              :     connect_to_compute_retry_config: RetryConfig,
     105              : ) -> Result<M::Connection, M::Error>
     106              : where
     107              :     M::ConnectError: ShouldRetry + std::fmt::Debug,
     108              :     M::Error: From<WakeComputeError>,
     109              : {
     110              :     let mut num_retries = 0; // handed to wake_compute by &mut and reported with the retries metric
     111              :     let mut node_info =
     112              :         wake_compute(&mut num_retries, ctx, user_info, wake_compute_retry_config).await?;
     113              :     if let Some(keys) = user_info.get_keys() {
     114              :         node_info.set_keys(keys);
     115              :     }
     116              :     node_info.allow_self_signed_compute = allow_self_signed_compute;
     117              :     // let mut node_info = credentials.get_node_info(ctx, user_info).await?;
     118              :     mechanism.update_connect_config(&mut node_info.config);
     119              :     let retry_type = RetryType::ConnectToCompute; // label shared by every retries_metric observation below
     120              : 
     121              :     // try once
     122              :     let err = match mechanism
     123              :         .connect_once(ctx, &node_info, CONNECT_TIMEOUT)
     124              :         .await
     125              :     {
     126              :         Ok(res) => {
     127              :             ctx.latency_timer.success();
     128              :             Metrics::get().proxy.retries_metric.observe(
     129              :                 RetriesMetricGroup {
     130              :                     outcome: ConnectOutcome::Success,
     131              :                     retry_type,
     132              :                 },
     133              :                 num_retries.into(),
     134              :             );
     135              :             return Ok(res);
     136              :         }
     137              :         Err(e) => e,
     138              :     };
     139              : 
     140              :     error!(error = ?err, "could not connect to compute node");
     141              : 
     142              :     let node_info = if !node_info.cached() || !err.should_retry_database_address() { // keep the current node_info when it did not come from the cache, or when the error reports should_retry_database_address() == false
     143              :         // If we just received this from cplane and didn't get it from cache, we shouldn't retry.
     144              :         // Do not need to retrieve a new node_info, just return the old one.
     145              :         if !err.should_retry(num_retries, connect_to_compute_retry_config) {
     146              :             Metrics::get().proxy.retries_metric.observe(
     147              :                 RetriesMetricGroup {
     148              :                     outcome: ConnectOutcome::Failed,
     149              :                     retry_type,
     150              :                 },
     151              :                 num_retries.into(),
     152              :             );
     153              :             return Err(err.into());
     154              :         }
     155              :         node_info
     156              :     } else {
     157              :         // if we failed to connect, it's likely that the compute node was suspended, wake a new compute node
     158              :         info!("compute node's state has likely changed; requesting a wake-up");
     159              :         let old_node_info = invalidate_cache(node_info);
     160              :         let mut node_info =
     161              :             wake_compute(&mut num_retries, ctx, user_info, wake_compute_retry_config).await?;
     162              :         node_info.reuse_settings(old_node_info); // presumably carries over the keys / self-signed flag set above; confirm in reuse_settings
     163              : 
     164              :         mechanism.update_connect_config(&mut node_info.config);
     165              :         node_info
     166              :     };
     167              : 
     168              :     // now that we have a new node, try connect to it repeatedly.
     169              :     // this can error for a few reasons, for instance:
     170              :     // * DNS connection settings haven't quite propagated yet
     171              :     info!("wake_compute success. attempting to connect"); // NOTE(review): also logged when the first branch above kept the existing node_info without a second wake_compute
     172              :     num_retries = 1; // NOTE(review): overwrites whatever wake_compute wrote through &mut num_retries; the loop's count starts at 1 after the failed first attempt
     173              :     loop {
     174              :         match mechanism
     175              :             .connect_once(ctx, &node_info, CONNECT_TIMEOUT)
     176              :             .await
     177              :         {
     178              :             Ok(res) => {
     179              :                 ctx.latency_timer.success();
     180              :                 Metrics::get().proxy.retries_metric.observe(
     181              :                     RetriesMetricGroup {
     182              :                         outcome: ConnectOutcome::Success,
     183              :                         retry_type,
     184              :                     },
     185              :                     num_retries.into(),
     186              :                 );
     187              :                 info!(?num_retries, "connected to compute node after");
     188              :                 return Ok(res);
     189              :             }
     190              :             Err(e) => {
     191              :                 let retriable = e.should_retry(num_retries, connect_to_compute_retry_config);
     192              :                 if !retriable {
     193              :                     error!(error = ?e, num_retries, retriable, "couldn't connect to compute node");
     194              :                     Metrics::get().proxy.retries_metric.observe(
     195              :                         RetriesMetricGroup {
     196              :                             outcome: ConnectOutcome::Failed,
     197              :                             retry_type,
     198              :                         },
     199              :                         num_retries.into(),
     200              :                     );
     201              :                     return Err(e.into());
     202              :                 }
     203              :                 warn!(error = ?e, num_retries, retriable, "couldn't connect to compute node"); // retryable failure: log at warn level and fall through to the sleep below
     204              :             }
     205              :         }
     206              : 
     207              :         let wait_duration = retry_after(num_retries, connect_to_compute_retry_config); // delay is computed from the count before it is incremented
     208              :         num_retries += 1;
     209              : 
     210              :         let pause = ctx
     211              :             .latency_timer
     212              :             .pause(crate::metrics::Waiting::RetryTimeout); // guard is held across the sleep; presumably attributes the wait to RetryTimeout rather than to measured latency
     213              :         time::sleep(wait_duration).await;
     214              :         drop(pause);
     215              :     }
     216              : }
        

Generated by: LCOV version 2.1-beta