Line data Source code
1 : use crate::{
2 : auth::backend::ComputeCredentialKeys,
3 : compute::{self, PostgresConnection},
4 : config::RetryConfig,
5 : console::{self, errors::WakeComputeError, locks::ApiLocks, CachedNodeInfo, NodeInfo},
6 : context::RequestMonitoring,
7 : error::ReportableError,
8 : metrics::{ConnectOutcome, ConnectionFailureKind, Metrics, RetriesMetricGroup, RetryType},
9 : proxy::{
10 : retry::{retry_after, ShouldRetry},
11 : wake_compute::wake_compute,
12 : },
13 : Host,
14 : };
15 : use async_trait::async_trait;
16 : use pq_proto::StartupMessageParams;
17 : use tokio::time;
18 : use tracing::{error, info, warn};
19 :
/// Timeout handed to every `ConnectMechanism::connect_once` call made by
/// `connect_to_compute` (two seconds).
const CONNECT_TIMEOUT: time::Duration = time::Duration::from_millis(2_000);
21 :
22 : /// If we couldn't connect, a cached connection info might be to blame
23 : /// (e.g. the compute node's address might've changed at the wrong time).
24 : /// Invalidate the cache entry (if any) to prevent subsequent errors.
25 8 : #[tracing::instrument(name = "invalidate_cache", skip_all)]
26 : pub fn invalidate_cache(node_info: console::CachedNodeInfo) -> NodeInfo {
27 : let is_cached = node_info.cached();
28 : if is_cached {
29 : warn!("invalidating stalled compute node info cache entry");
30 : }
31 : let label = match is_cached {
32 : true => ConnectionFailureKind::ComputeCached,
33 : false => ConnectionFailureKind::ComputeUncached,
34 : };
35 : Metrics::get().proxy.connection_failures_total.inc(label);
36 :
37 : node_info.invalidate()
38 : }
39 :
40 : #[async_trait]
41 : pub trait ConnectMechanism {
42 : type Connection;
43 : type ConnectError: ReportableError;
44 : type Error: From<Self::ConnectError>;
45 : async fn connect_once(
46 : &self,
47 : ctx: &mut RequestMonitoring,
48 : node_info: &console::CachedNodeInfo,
49 : timeout: time::Duration,
50 : ) -> Result<Self::Connection, Self::ConnectError>;
51 :
52 : fn update_connect_config(&self, conf: &mut compute::ConnCfg);
53 : }
54 :
55 : #[async_trait]
56 : pub trait ComputeConnectBackend {
57 : async fn wake_compute(
58 : &self,
59 : ctx: &mut RequestMonitoring,
60 : ) -> Result<CachedNodeInfo, console::errors::WakeComputeError>;
61 :
62 : fn get_keys(&self) -> Option<&ComputeCredentialKeys>;
63 : }
64 :
65 : pub struct TcpMechanism<'a> {
66 : /// KV-dictionary with PostgreSQL connection params.
67 : pub params: &'a StartupMessageParams,
68 :
69 : /// connect_to_compute concurrency lock
70 : pub locks: &'static ApiLocks<Host>,
71 : }
72 :
73 : #[async_trait]
74 : impl ConnectMechanism for TcpMechanism<'_> {
75 : type Connection = PostgresConnection;
76 : type ConnectError = compute::ConnectionError;
77 : type Error = compute::ConnectionError;
78 :
79 0 : #[tracing::instrument(fields(pid = tracing::field::Empty), skip_all)]
80 : async fn connect_once(
81 : &self,
82 : ctx: &mut RequestMonitoring,
83 : node_info: &console::CachedNodeInfo,
84 : timeout: time::Duration,
85 0 : ) -> Result<PostgresConnection, Self::Error> {
86 0 : let host = node_info.config.get_host()?;
87 0 : let permit = self.locks.get_permit(&host).await?;
88 0 : permit.release_result(node_info.connect(ctx, timeout).await)
89 0 : }
90 :
91 0 : fn update_connect_config(&self, config: &mut compute::ConnCfg) {
92 0 : config.set_startup_params(self.params);
93 0 : }
94 : }
95 :
96 : /// Try to connect to the compute node, retrying if necessary.
97 28 : #[tracing::instrument(skip_all)]
98 : pub async fn connect_to_compute<M: ConnectMechanism, B: ComputeConnectBackend>(
99 : ctx: &mut RequestMonitoring,
100 : mechanism: &M,
101 : user_info: &B,
102 : allow_self_signed_compute: bool,
103 : wake_compute_retry_config: RetryConfig,
104 : connect_to_compute_retry_config: RetryConfig,
105 : ) -> Result<M::Connection, M::Error>
106 : where
107 : M::ConnectError: ShouldRetry + std::fmt::Debug,
108 : M::Error: From<WakeComputeError>,
109 : {
110 : let mut num_retries = 0;
111 : let mut node_info =
112 : wake_compute(&mut num_retries, ctx, user_info, wake_compute_retry_config).await?;
113 : if let Some(keys) = user_info.get_keys() {
114 : node_info.set_keys(keys);
115 : }
116 : node_info.allow_self_signed_compute = allow_self_signed_compute;
117 : // let mut node_info = credentials.get_node_info(ctx, user_info).await?;
118 : mechanism.update_connect_config(&mut node_info.config);
119 : let retry_type = RetryType::ConnectToCompute;
120 :
121 : // try once
122 : let err = match mechanism
123 : .connect_once(ctx, &node_info, CONNECT_TIMEOUT)
124 : .await
125 : {
126 : Ok(res) => {
127 : ctx.latency_timer.success();
128 : Metrics::get().proxy.retries_metric.observe(
129 : RetriesMetricGroup {
130 : outcome: ConnectOutcome::Success,
131 : retry_type,
132 : },
133 : num_retries.into(),
134 : );
135 : return Ok(res);
136 : }
137 : Err(e) => e,
138 : };
139 :
140 : error!(error = ?err, "could not connect to compute node");
141 :
142 : let node_info = if !node_info.cached() || !err.should_retry_database_address() {
143 : // If we just recieved this from cplane and dodn't get it from cache, we shouldn't retry.
144 : // Do not need to retrieve a new node_info, just return the old one.
145 : if !err.should_retry(num_retries, connect_to_compute_retry_config) {
146 : Metrics::get().proxy.retries_metric.observe(
147 : RetriesMetricGroup {
148 : outcome: ConnectOutcome::Failed,
149 : retry_type,
150 : },
151 : num_retries.into(),
152 : );
153 : return Err(err.into());
154 : }
155 : node_info
156 : } else {
157 : // if we failed to connect, it's likely that the compute node was suspended, wake a new compute node
158 : info!("compute node's state has likely changed; requesting a wake-up");
159 : let old_node_info = invalidate_cache(node_info);
160 : let mut node_info =
161 : wake_compute(&mut num_retries, ctx, user_info, wake_compute_retry_config).await?;
162 : node_info.reuse_settings(old_node_info);
163 :
164 : mechanism.update_connect_config(&mut node_info.config);
165 : node_info
166 : };
167 :
168 : // now that we have a new node, try connect to it repeatedly.
169 : // this can error for a few reasons, for instance:
170 : // * DNS connection settings haven't quite propagated yet
171 : info!("wake_compute success. attempting to connect");
172 : num_retries = 1;
173 : loop {
174 : match mechanism
175 : .connect_once(ctx, &node_info, CONNECT_TIMEOUT)
176 : .await
177 : {
178 : Ok(res) => {
179 : ctx.latency_timer.success();
180 : Metrics::get().proxy.retries_metric.observe(
181 : RetriesMetricGroup {
182 : outcome: ConnectOutcome::Success,
183 : retry_type,
184 : },
185 : num_retries.into(),
186 : );
187 : info!(?num_retries, "connected to compute node after");
188 : return Ok(res);
189 : }
190 : Err(e) => {
191 : let retriable = e.should_retry(num_retries, connect_to_compute_retry_config);
192 : if !retriable {
193 : error!(error = ?e, num_retries, retriable, "couldn't connect to compute node");
194 : Metrics::get().proxy.retries_metric.observe(
195 : RetriesMetricGroup {
196 : outcome: ConnectOutcome::Failed,
197 : retry_type,
198 : },
199 : num_retries.into(),
200 : );
201 : return Err(e.into());
202 : }
203 : warn!(error = ?e, num_retries, retriable, "couldn't connect to compute node");
204 : }
205 : }
206 :
207 : let wait_duration = retry_after(num_retries, connect_to_compute_retry_config);
208 : num_retries += 1;
209 :
210 : let pause = ctx
211 : .latency_timer
212 : .pause(crate::metrics::Waiting::RetryTimeout);
213 : time::sleep(wait_duration).await;
214 : drop(pause);
215 : }
216 : }
|