Line data Source code
1 : use async_trait::async_trait;
2 : use pq_proto::StartupMessageParams;
3 : use tokio::time;
4 : use tracing::{debug, info, warn};
5 :
6 : use super::retry::ShouldRetryWakeCompute;
7 : use crate::auth::backend::{ComputeCredentialKeys, ComputeUserInfo};
8 : use crate::compute::{self, PostgresConnection, COULD_NOT_CONNECT};
9 : use crate::config::{ComputeConfig, RetryConfig};
10 : use crate::context::RequestContext;
11 : use crate::control_plane::errors::WakeComputeError;
12 : use crate::control_plane::locks::ApiLocks;
13 : use crate::control_plane::{self, CachedNodeInfo, NodeInfo};
14 : use crate::error::ReportableError;
15 : use crate::metrics::{
16 : ConnectOutcome, ConnectionFailureKind, Metrics, RetriesMetricGroup, RetryType,
17 : };
18 : use crate::proxy::retry::{retry_after, should_retry, CouldRetry};
19 : use crate::proxy::wake_compute::wake_compute;
20 : use crate::types::Host;
21 :
22 : /// If we couldn't connect, a cached connection info might be to blame
23 : /// (e.g. the compute node's address might've changed at the wrong time).
24 : /// Invalidate the cache entry (if any) to prevent subsequent errors.
25 : #[tracing::instrument(name = "invalidate_cache", skip_all)]
26 : pub(crate) fn invalidate_cache(node_info: control_plane::CachedNodeInfo) -> NodeInfo {
27 : let is_cached = node_info.cached();
28 : if is_cached {
29 : warn!("invalidating stalled compute node info cache entry");
30 : }
31 : let label = if is_cached {
32 : ConnectionFailureKind::ComputeCached
33 : } else {
34 : ConnectionFailureKind::ComputeUncached
35 : };
36 : Metrics::get().proxy.connection_failures_total.inc(label);
37 :
38 : node_info.invalidate()
39 : }
40 :
41 : #[async_trait]
42 : pub(crate) trait ConnectMechanism {
43 : type Connection;
44 : type ConnectError: ReportableError;
45 : type Error: From<Self::ConnectError>;
46 : async fn connect_once(
47 : &self,
48 : ctx: &RequestContext,
49 : node_info: &control_plane::CachedNodeInfo,
50 : config: &ComputeConfig,
51 : ) -> Result<Self::Connection, Self::ConnectError>;
52 :
53 : fn update_connect_config(&self, conf: &mut compute::ConnCfg);
54 : }
55 :
56 : #[async_trait]
57 : pub(crate) trait ComputeConnectBackend {
58 : async fn wake_compute(
59 : &self,
60 : ctx: &RequestContext,
61 : ) -> Result<CachedNodeInfo, control_plane::errors::WakeComputeError>;
62 :
63 : fn get_keys(&self) -> &ComputeCredentialKeys;
64 : }
65 :
66 : pub(crate) struct TcpMechanism<'a> {
67 : pub(crate) params_compat: bool,
68 :
69 : /// KV-dictionary with PostgreSQL connection params.
70 : pub(crate) params: &'a StartupMessageParams,
71 :
72 : /// connect_to_compute concurrency lock
73 : pub(crate) locks: &'static ApiLocks<Host>,
74 :
75 : pub(crate) user_info: ComputeUserInfo,
76 : }
77 :
78 : #[async_trait]
79 : impl ConnectMechanism for TcpMechanism<'_> {
80 : type Connection = PostgresConnection;
81 : type ConnectError = compute::ConnectionError;
82 : type Error = compute::ConnectionError;
83 :
84 : #[tracing::instrument(fields(pid = tracing::field::Empty), skip_all)]
85 : async fn connect_once(
86 : &self,
87 : ctx: &RequestContext,
88 : node_info: &control_plane::CachedNodeInfo,
89 : config: &ComputeConfig,
90 0 : ) -> Result<PostgresConnection, Self::Error> {
91 0 : let host = node_info.config.get_host();
92 0 : let permit = self.locks.get_permit(&host).await?;
93 0 : permit.release_result(node_info.connect(ctx, config, self.user_info.clone()).await)
94 0 : }
95 :
96 0 : fn update_connect_config(&self, config: &mut compute::ConnCfg) {
97 0 : config.set_startup_params(self.params, self.params_compat);
98 0 : }
99 : }
100 :
101 : /// Try to connect to the compute node, retrying if necessary.
102 : #[tracing::instrument(skip_all)]
103 : pub(crate) async fn connect_to_compute<M: ConnectMechanism, B: ComputeConnectBackend>(
104 : ctx: &RequestContext,
105 : mechanism: &M,
106 : user_info: &B,
107 : wake_compute_retry_config: RetryConfig,
108 : compute: &ComputeConfig,
109 : ) -> Result<M::Connection, M::Error>
110 : where
111 : M::ConnectError: CouldRetry + ShouldRetryWakeCompute + std::fmt::Debug,
112 : M::Error: From<WakeComputeError>,
113 : {
114 : let mut num_retries = 0;
115 : let mut node_info =
116 : wake_compute(&mut num_retries, ctx, user_info, wake_compute_retry_config).await?;
117 :
118 : node_info.set_keys(user_info.get_keys());
119 : mechanism.update_connect_config(&mut node_info.config);
120 :
121 : // try once
122 : let err = match mechanism.connect_once(ctx, &node_info, compute).await {
123 : Ok(res) => {
124 : ctx.success();
125 : Metrics::get().proxy.retries_metric.observe(
126 : RetriesMetricGroup {
127 : outcome: ConnectOutcome::Success,
128 : retry_type: RetryType::ConnectToCompute,
129 : },
130 : num_retries.into(),
131 : );
132 : return Ok(res);
133 : }
134 : Err(e) => e,
135 : };
136 :
137 : debug!(error = ?err, COULD_NOT_CONNECT);
138 :
139 : let node_info = if !node_info.cached() || !err.should_retry_wake_compute() {
140 : // If we just recieved this from cplane and didn't get it from cache, we shouldn't retry.
141 : // Do not need to retrieve a new node_info, just return the old one.
142 : if should_retry(&err, num_retries, compute.retry) {
143 : Metrics::get().proxy.retries_metric.observe(
144 : RetriesMetricGroup {
145 : outcome: ConnectOutcome::Failed,
146 : retry_type: RetryType::ConnectToCompute,
147 : },
148 : num_retries.into(),
149 : );
150 : return Err(err.into());
151 : }
152 : node_info
153 : } else {
154 : // if we failed to connect, it's likely that the compute node was suspended, wake a new compute node
155 : debug!("compute node's state has likely changed; requesting a wake-up");
156 : let old_node_info = invalidate_cache(node_info);
157 : // TODO: increment num_retries?
158 : let mut node_info =
159 : wake_compute(&mut num_retries, ctx, user_info, wake_compute_retry_config).await?;
160 : node_info.reuse_settings(old_node_info);
161 :
162 : mechanism.update_connect_config(&mut node_info.config);
163 : node_info
164 : };
165 :
166 : // now that we have a new node, try connect to it repeatedly.
167 : // this can error for a few reasons, for instance:
168 : // * DNS connection settings haven't quite propagated yet
169 : debug!("wake_compute success. attempting to connect");
170 : num_retries = 1;
171 : loop {
172 : match mechanism.connect_once(ctx, &node_info, compute).await {
173 : Ok(res) => {
174 : ctx.success();
175 : Metrics::get().proxy.retries_metric.observe(
176 : RetriesMetricGroup {
177 : outcome: ConnectOutcome::Success,
178 : retry_type: RetryType::ConnectToCompute,
179 : },
180 : num_retries.into(),
181 : );
182 : // TODO: is this necessary? We have a metric.
183 : info!(?num_retries, "connected to compute node after");
184 : return Ok(res);
185 : }
186 : Err(e) => {
187 : if !should_retry(&e, num_retries, compute.retry) {
188 : // Don't log an error here, caller will print the error
189 : Metrics::get().proxy.retries_metric.observe(
190 : RetriesMetricGroup {
191 : outcome: ConnectOutcome::Failed,
192 : retry_type: RetryType::ConnectToCompute,
193 : },
194 : num_retries.into(),
195 : );
196 : return Err(e.into());
197 : }
198 :
199 : warn!(error = ?e, num_retries, retriable = true, COULD_NOT_CONNECT);
200 : }
201 : };
202 :
203 : let wait_duration = retry_after(num_retries, compute.retry);
204 : num_retries += 1;
205 :
206 : let pause = ctx.latency_timer_pause(crate::metrics::Waiting::RetryTimeout);
207 : time::sleep(wait_duration).await;
208 : drop(pause);
209 : }
210 : }
|