Line data Source code
1 : use async_trait::async_trait;
2 : use tokio::time;
3 : use tracing::{debug, info, warn};
4 :
5 : use crate::compute::{self, COULD_NOT_CONNECT, ComputeConnection};
6 : use crate::config::{ComputeConfig, RetryConfig};
7 : use crate::context::RequestContext;
8 : use crate::control_plane::errors::WakeComputeError;
9 : use crate::control_plane::locks::ApiLocks;
10 : use crate::control_plane::{self, NodeInfo};
11 : use crate::error::ReportableError;
12 : use crate::metrics::{
13 : ConnectOutcome, ConnectionFailureKind, Metrics, RetriesMetricGroup, RetryType,
14 : };
15 : use crate::proxy::retry::{CouldRetry, ShouldRetryWakeCompute, retry_after, should_retry};
16 : use crate::proxy::wake_compute::{WakeComputeBackend, wake_compute};
17 : use crate::types::Host;
18 :
19 : /// If we couldn't connect, a cached connection info might be to blame
20 : /// (e.g. the compute node's address might've changed at the wrong time).
21 : /// Invalidate the cache entry (if any) to prevent subsequent errors.
22 : #[tracing::instrument(skip_all)]
23 : pub(crate) fn invalidate_cache(node_info: control_plane::CachedNodeInfo) -> NodeInfo {
24 : let is_cached = node_info.cached();
25 : if is_cached {
26 : warn!("invalidating stalled compute node info cache entry");
27 : }
28 : let label = if is_cached {
29 : ConnectionFailureKind::ComputeCached
30 : } else {
31 : ConnectionFailureKind::ComputeUncached
32 : };
33 : Metrics::get().proxy.connection_failures_total.inc(label);
34 :
35 : node_info.invalidate()
36 : }
37 :
38 : #[async_trait]
39 : pub(crate) trait ConnectMechanism {
40 : type Connection;
41 : type ConnectError: ReportableError;
42 : type Error: From<Self::ConnectError>;
43 : async fn connect_once(
44 : &self,
45 : ctx: &RequestContext,
46 : node_info: &control_plane::CachedNodeInfo,
47 : config: &ComputeConfig,
48 : ) -> Result<Self::Connection, Self::ConnectError>;
49 : }
50 :
51 : pub(crate) struct TcpMechanism {
52 : /// connect_to_compute concurrency lock
53 : pub(crate) locks: &'static ApiLocks<Host>,
54 : }
55 :
56 : #[async_trait]
57 : impl ConnectMechanism for TcpMechanism {
58 : type Connection = ComputeConnection;
59 : type ConnectError = compute::ConnectionError;
60 : type Error = compute::ConnectionError;
61 :
62 : #[tracing::instrument(skip_all, fields(
63 : pid = tracing::field::Empty,
64 : compute_id = tracing::field::Empty
65 : ))]
66 : async fn connect_once(
67 : &self,
68 : ctx: &RequestContext,
69 : node_info: &control_plane::CachedNodeInfo,
70 : config: &ComputeConfig,
71 0 : ) -> Result<ComputeConnection, Self::Error> {
72 0 : let permit = self.locks.get_permit(&node_info.conn_info.host).await?;
73 0 : permit.release_result(node_info.connect(ctx, config).await)
74 0 : }
75 : }
76 :
77 : /// Try to connect to the compute node, retrying if necessary.
78 : #[tracing::instrument(skip_all)]
79 : pub(crate) async fn connect_to_compute<M: ConnectMechanism, B: WakeComputeBackend>(
80 : ctx: &RequestContext,
81 : mechanism: &M,
82 : user_info: &B,
83 : wake_compute_retry_config: RetryConfig,
84 : compute: &ComputeConfig,
85 : ) -> Result<M::Connection, M::Error>
86 : where
87 : M::ConnectError: CouldRetry + ShouldRetryWakeCompute + std::fmt::Debug,
88 : M::Error: From<WakeComputeError>,
89 : {
90 : let mut num_retries = 0;
91 : let node_info =
92 : wake_compute(&mut num_retries, ctx, user_info, wake_compute_retry_config).await?;
93 :
94 : // try once
95 : let err = match mechanism.connect_once(ctx, &node_info, compute).await {
96 : Ok(res) => {
97 : ctx.success();
98 : Metrics::get().proxy.retries_metric.observe(
99 : RetriesMetricGroup {
100 : outcome: ConnectOutcome::Success,
101 : retry_type: RetryType::ConnectToCompute,
102 : },
103 : num_retries.into(),
104 : );
105 : return Ok(res);
106 : }
107 : Err(e) => e,
108 : };
109 :
110 : debug!(error = ?err, COULD_NOT_CONNECT);
111 :
112 : let node_info = if !node_info.cached() || !err.should_retry_wake_compute() {
113 : // If we just received this from cplane and not from the cache, we shouldn't retry.
114 : // Do not need to retrieve a new node_info, just return the old one.
115 : if !should_retry(&err, num_retries, compute.retry) {
116 : Metrics::get().proxy.retries_metric.observe(
117 : RetriesMetricGroup {
118 : outcome: ConnectOutcome::Failed,
119 : retry_type: RetryType::ConnectToCompute,
120 : },
121 : num_retries.into(),
122 : );
123 : return Err(err.into());
124 : }
125 : node_info
126 : } else {
127 : // if we failed to connect, it's likely that the compute node was suspended, wake a new compute node
128 : debug!("compute node's state has likely changed; requesting a wake-up");
129 : invalidate_cache(node_info);
130 : // TODO: increment num_retries?
131 : wake_compute(&mut num_retries, ctx, user_info, wake_compute_retry_config).await?
132 : };
133 :
134 : // now that we have a new node, try connect to it repeatedly.
135 : // this can error for a few reasons, for instance:
136 : // * DNS connection settings haven't quite propagated yet
137 : debug!("wake_compute success. attempting to connect");
138 : num_retries = 1;
139 : loop {
140 : match mechanism.connect_once(ctx, &node_info, compute).await {
141 : Ok(res) => {
142 : ctx.success();
143 : Metrics::get().proxy.retries_metric.observe(
144 : RetriesMetricGroup {
145 : outcome: ConnectOutcome::Success,
146 : retry_type: RetryType::ConnectToCompute,
147 : },
148 : num_retries.into(),
149 : );
150 : // TODO: is this necessary? We have a metric.
151 : info!(?num_retries, "connected to compute node after");
152 : return Ok(res);
153 : }
154 : Err(e) => {
155 : if !should_retry(&e, num_retries, compute.retry) {
156 : // Don't log an error here, caller will print the error
157 : Metrics::get().proxy.retries_metric.observe(
158 : RetriesMetricGroup {
159 : outcome: ConnectOutcome::Failed,
160 : retry_type: RetryType::ConnectToCompute,
161 : },
162 : num_retries.into(),
163 : );
164 : return Err(e.into());
165 : }
166 :
167 : warn!(error = ?e, num_retries, retriable = true, COULD_NOT_CONNECT);
168 : }
169 : }
170 :
171 : let wait_duration = retry_after(num_retries, compute.retry);
172 : num_retries += 1;
173 :
174 : let pause = ctx.latency_timer_pause(crate::metrics::Waiting::RetryTimeout);
175 : time::sleep(wait_duration).await;
176 : drop(pause);
177 : }
178 : }
|