LCOV - code coverage report
Current view: top level - proxy/src/proxy - connect_compute.rs (source / functions) Coverage Total Hit
Test: 915229b2d22dd355ad718d9afbb773e7f2fba970.info Lines: 0.0 % 4 0
Test Date: 2025-07-24 10:33:41 Functions: 0.0 % 1 0

            Line data    Source code
       1              : use async_trait::async_trait;
       2              : use tokio::time;
       3              : use tracing::{debug, info, warn};
       4              : 
       5              : use crate::compute::{self, COULD_NOT_CONNECT, ComputeConnection};
       6              : use crate::config::{ComputeConfig, RetryConfig};
       7              : use crate::context::RequestContext;
       8              : use crate::control_plane::errors::WakeComputeError;
       9              : use crate::control_plane::locks::ApiLocks;
      10              : use crate::control_plane::{self, NodeInfo};
      11              : use crate::error::ReportableError;
      12              : use crate::metrics::{
      13              :     ConnectOutcome, ConnectionFailureKind, Metrics, RetriesMetricGroup, RetryType,
      14              : };
      15              : use crate::proxy::retry::{CouldRetry, ShouldRetryWakeCompute, retry_after, should_retry};
      16              : use crate::proxy::wake_compute::{WakeComputeBackend, wake_compute};
      17              : use crate::types::Host;
      18              : 
      19              : /// If we couldn't connect, a cached connection info might be to blame
      20              : /// (e.g. the compute node's address might've changed at the wrong time).
      21              : /// Invalidate the cache entry (if any) to prevent subsequent errors.
      22              : #[tracing::instrument(skip_all)]
      23              : pub(crate) fn invalidate_cache(node_info: control_plane::CachedNodeInfo) -> NodeInfo {
      24              :     let is_cached = node_info.cached();
      25              :     if is_cached {
      26              :         warn!("invalidating stalled compute node info cache entry");
      27              :     }
      28              :     let label = if is_cached {
      29              :         ConnectionFailureKind::ComputeCached
      30              :     } else {
      31              :         ConnectionFailureKind::ComputeUncached
      32              :     };
      33              :     Metrics::get().proxy.connection_failures_total.inc(label);
      34              : 
      35              :     node_info.invalidate()
      36              : }
      37              : 
      38              : #[async_trait]
      39              : pub(crate) trait ConnectMechanism {
      40              :     type Connection;
      41              :     type ConnectError: ReportableError;
      42              :     type Error: From<Self::ConnectError>;
      43              :     async fn connect_once(
      44              :         &self,
      45              :         ctx: &RequestContext,
      46              :         node_info: &control_plane::CachedNodeInfo,
      47              :         config: &ComputeConfig,
      48              :     ) -> Result<Self::Connection, Self::ConnectError>;
      49              : }
      50              : 
      51              : pub(crate) struct TcpMechanism {
      52              :     /// connect_to_compute concurrency lock
      53              :     pub(crate) locks: &'static ApiLocks<Host>,
      54              : }
      55              : 
      56              : #[async_trait]
      57              : impl ConnectMechanism for TcpMechanism {
      58              :     type Connection = ComputeConnection;
      59              :     type ConnectError = compute::ConnectionError;
      60              :     type Error = compute::ConnectionError;
      61              : 
      62              :     #[tracing::instrument(skip_all, fields(
      63              :         pid = tracing::field::Empty,
      64              :         compute_id = tracing::field::Empty
      65              :     ))]
      66              :     async fn connect_once(
      67              :         &self,
      68              :         ctx: &RequestContext,
      69              :         node_info: &control_plane::CachedNodeInfo,
      70              :         config: &ComputeConfig,
      71            0 :     ) -> Result<ComputeConnection, Self::Error> {
      72            0 :         let permit = self.locks.get_permit(&node_info.conn_info.host).await?;
      73            0 :         permit.release_result(node_info.connect(ctx, config).await)
      74            0 :     }
      75              : }
      76              : 
      77              : /// Try to connect to the compute node, retrying if necessary.
      78              : #[tracing::instrument(skip_all)]
      79              : pub(crate) async fn connect_to_compute<M: ConnectMechanism, B: WakeComputeBackend>(
      80              :     ctx: &RequestContext,
      81              :     mechanism: &M,
      82              :     user_info: &B,
      83              :     wake_compute_retry_config: RetryConfig,
      84              :     compute: &ComputeConfig,
      85              : ) -> Result<M::Connection, M::Error>
      86              : where
      87              :     M::ConnectError: CouldRetry + ShouldRetryWakeCompute + std::fmt::Debug,
      88              :     M::Error: From<WakeComputeError>,
      89              : {
      90              :     let mut num_retries = 0;
      91              :     let node_info =
      92              :         wake_compute(&mut num_retries, ctx, user_info, wake_compute_retry_config).await?;
      93              : 
      94              :     // try once
      95              :     let err = match mechanism.connect_once(ctx, &node_info, compute).await {
      96              :         Ok(res) => {
      97              :             ctx.success();
      98              :             Metrics::get().proxy.retries_metric.observe(
      99              :                 RetriesMetricGroup {
     100              :                     outcome: ConnectOutcome::Success,
     101              :                     retry_type: RetryType::ConnectToCompute,
     102              :                 },
     103              :                 num_retries.into(),
     104              :             );
     105              :             return Ok(res);
     106              :         }
     107              :         Err(e) => e,
     108              :     };
     109              : 
     110              :     debug!(error = ?err, COULD_NOT_CONNECT);
     111              : 
     112              :     let node_info = if !node_info.cached() || !err.should_retry_wake_compute() {
     113              :         // If we just received this from cplane and not from the cache, we shouldn't retry.
     114              :         // Do not need to retrieve a new node_info, just return the old one.
     115              :         if !should_retry(&err, num_retries, compute.retry) {
     116              :             Metrics::get().proxy.retries_metric.observe(
     117              :                 RetriesMetricGroup {
     118              :                     outcome: ConnectOutcome::Failed,
     119              :                     retry_type: RetryType::ConnectToCompute,
     120              :                 },
     121              :                 num_retries.into(),
     122              :             );
     123              :             return Err(err.into());
     124              :         }
     125              :         node_info
     126              :     } else {
     127              :         // if we failed to connect, it's likely that the compute node was suspended, wake a new compute node
     128              :         debug!("compute node's state has likely changed; requesting a wake-up");
     129              :         invalidate_cache(node_info);
     130              :         // TODO: increment num_retries?
     131              :         wake_compute(&mut num_retries, ctx, user_info, wake_compute_retry_config).await?
     132              :     };
     133              : 
     134              :     // now that we have a new node, try connect to it repeatedly.
     135              :     // this can error for a few reasons, for instance:
     136              :     // * DNS connection settings haven't quite propagated yet
     137              :     debug!("wake_compute success. attempting to connect");
     138              :     num_retries = 1;
     139              :     loop {
     140              :         match mechanism.connect_once(ctx, &node_info, compute).await {
     141              :             Ok(res) => {
     142              :                 ctx.success();
     143              :                 Metrics::get().proxy.retries_metric.observe(
     144              :                     RetriesMetricGroup {
     145              :                         outcome: ConnectOutcome::Success,
     146              :                         retry_type: RetryType::ConnectToCompute,
     147              :                     },
     148              :                     num_retries.into(),
     149              :                 );
     150              :                 // TODO: is this necessary? We have a metric.
     151              :                 info!(?num_retries, "connected to compute node after");
     152              :                 return Ok(res);
     153              :             }
     154              :             Err(e) => {
     155              :                 if !should_retry(&e, num_retries, compute.retry) {
     156              :                     // Don't log an error here, caller will print the error
     157              :                     Metrics::get().proxy.retries_metric.observe(
     158              :                         RetriesMetricGroup {
     159              :                             outcome: ConnectOutcome::Failed,
     160              :                             retry_type: RetryType::ConnectToCompute,
     161              :                         },
     162              :                         num_retries.into(),
     163              :                     );
     164              :                     return Err(e.into());
     165              :                 }
     166              : 
     167              :                 warn!(error = ?e, num_retries, retriable = true, COULD_NOT_CONNECT);
     168              :             }
     169              :         }
     170              : 
     171              :         let wait_duration = retry_after(num_retries, compute.retry);
     172              :         num_retries += 1;
     173              : 
     174              :         let pause = ctx.latency_timer_pause(crate::metrics::Waiting::RetryTimeout);
     175              :         time::sleep(wait_duration).await;
     176              :         drop(pause);
     177              :     }
     178              : }
        

Generated by: LCOV version 2.1-beta