Line data Source code
1 : use std::{sync::Arc, time::Duration};
2 :
3 : use async_trait::async_trait;
4 : use tracing::{field::display, info};
5 :
6 : use crate::{
7 : auth::{
8 : backend::{local::StaticAuthRules, ComputeCredentials, ComputeUserInfo},
9 : check_peer_addr_is_in_list, AuthError,
10 : },
11 : compute,
12 : config::{AuthenticationConfig, ProxyConfig},
13 : console::{
14 : errors::{GetAuthInfoError, WakeComputeError},
15 : locks::ApiLocks,
16 : provider::ApiLockError,
17 : CachedNodeInfo,
18 : },
19 : context::RequestMonitoring,
20 : error::{ErrorKind, ReportableError, UserFacingError},
21 : intern::EndpointIdInt,
22 : proxy::{
23 : connect_compute::ConnectMechanism,
24 : retry::{CouldRetry, ShouldRetryWakeCompute},
25 : },
26 : rate_limiter::EndpointRateLimiter,
27 : Host,
28 : };
29 :
30 : use super::conn_pool::{poll_client, AuthData, Client, ConnInfo, GlobalConnPool};
31 :
32 : pub(crate) struct PoolingBackend {
33 : pub(crate) pool: Arc<GlobalConnPool<tokio_postgres::Client>>,
34 : pub(crate) config: &'static ProxyConfig,
35 : pub(crate) endpoint_rate_limiter: Arc<EndpointRateLimiter>,
36 : }
37 :
38 : impl PoolingBackend {
39 0 : pub(crate) async fn authenticate_with_password(
40 0 : &self,
41 0 : ctx: &RequestMonitoring,
42 0 : config: &AuthenticationConfig,
43 0 : user_info: &ComputeUserInfo,
44 0 : password: &[u8],
45 0 : ) -> Result<ComputeCredentials, AuthError> {
46 0 : let user_info = user_info.clone();
47 0 : let backend = self
48 0 : .config
49 0 : .auth_backend
50 0 : .as_ref()
51 0 : .map(|()| user_info.clone());
52 0 : let (allowed_ips, maybe_secret) = backend.get_allowed_ips_and_secret(ctx).await?;
53 0 : if !check_peer_addr_is_in_list(&ctx.peer_addr(), &allowed_ips) {
54 0 : return Err(AuthError::ip_address_not_allowed(ctx.peer_addr()));
55 0 : }
56 0 : if !self
57 0 : .endpoint_rate_limiter
58 0 : .check(user_info.endpoint.clone().into(), 1)
59 : {
60 0 : return Err(AuthError::too_many_connections());
61 0 : }
62 0 : let cached_secret = match maybe_secret {
63 0 : Some(secret) => secret,
64 0 : None => backend.get_role_secret(ctx).await?,
65 : };
66 :
67 0 : let secret = match cached_secret.value.clone() {
68 0 : Some(secret) => self.config.authentication_config.check_rate_limit(
69 0 : ctx,
70 0 : config,
71 0 : secret,
72 0 : &user_info.endpoint,
73 0 : true,
74 0 : )?,
75 : None => {
76 : // If we don't have an authentication secret, for the http flow we can just return an error.
77 0 : info!("authentication info not found");
78 0 : return Err(AuthError::auth_failed(&*user_info.user));
79 : }
80 : };
81 0 : let ep = EndpointIdInt::from(&user_info.endpoint);
82 0 : let auth_outcome =
83 0 : crate::auth::validate_password_and_exchange(&config.thread_pool, ep, password, secret)
84 0 : .await?;
85 0 : let res = match auth_outcome {
86 0 : crate::sasl::Outcome::Success(key) => {
87 0 : info!("user successfully authenticated");
88 0 : Ok(key)
89 : }
90 0 : crate::sasl::Outcome::Failure(reason) => {
91 0 : info!("auth backend failed with an error: {reason}");
92 0 : Err(AuthError::auth_failed(&*user_info.user))
93 : }
94 : };
95 0 : res.map(|key| ComputeCredentials {
96 0 : info: user_info,
97 0 : keys: key,
98 0 : })
99 0 : }
100 :
101 0 : pub(crate) async fn authenticate_with_jwt(
102 0 : &self,
103 0 : ctx: &RequestMonitoring,
104 0 : user_info: &ComputeUserInfo,
105 0 : jwt: &str,
106 0 : ) -> Result<ComputeCredentials, AuthError> {
107 0 : match &self.config.auth_backend {
108 : crate::auth::Backend::Console(_, ()) => {
109 0 : Err(AuthError::auth_failed("JWT login is not yet supported"))
110 : }
111 0 : crate::auth::Backend::Web(_, ()) => Err(AuthError::auth_failed(
112 0 : "JWT login over web auth proxy is not supported",
113 0 : )),
114 0 : crate::auth::Backend::Local(cache) => {
115 0 : cache
116 0 : .jwks_cache
117 0 : .check_jwt(
118 0 : ctx,
119 0 : user_info.endpoint.clone(),
120 0 : user_info.user.clone(),
121 0 : &StaticAuthRules,
122 0 : jwt,
123 0 : )
124 0 : .await
125 0 : .map_err(|e| AuthError::auth_failed(e.to_string()))?;
126 0 : Ok(ComputeCredentials {
127 0 : info: user_info.clone(),
128 0 : keys: crate::auth::backend::ComputeCredentialKeys::None,
129 0 : })
130 : }
131 : }
132 0 : }
133 :
134 : // Wake up the destination if needed. Code here is a bit involved because
135 : // we reuse the code from the usual proxy and we need to prepare few structures
136 : // that this code expects.
137 0 : #[tracing::instrument(fields(pid = tracing::field::Empty), skip_all)]
138 : pub(crate) async fn connect_to_compute(
139 : &self,
140 : ctx: &RequestMonitoring,
141 : conn_info: ConnInfo,
142 : keys: ComputeCredentials,
143 : force_new: bool,
144 : ) -> Result<Client<tokio_postgres::Client>, HttpConnError> {
145 : let maybe_client = if force_new {
146 : info!("pool: pool is disabled");
147 : None
148 : } else {
149 : info!("pool: looking for an existing connection");
150 : self.pool.get(ctx, &conn_info)?
151 : };
152 :
153 : if let Some(client) = maybe_client {
154 : return Ok(client);
155 : }
156 : let conn_id = uuid::Uuid::new_v4();
157 : tracing::Span::current().record("conn_id", display(conn_id));
158 : info!(%conn_id, "pool: opening a new connection '{conn_info}'");
159 0 : let backend = self.config.auth_backend.as_ref().map(|()| keys);
160 : crate::proxy::connect_compute::connect_to_compute(
161 : ctx,
162 : &TokioMechanism {
163 : conn_id,
164 : conn_info,
165 : pool: self.pool.clone(),
166 : locks: &self.config.connect_compute_locks,
167 : },
168 : &backend,
169 : false, // do not allow self signed compute for http flow
170 : self.config.wake_compute_retry_config,
171 : self.config.connect_to_compute_retry_config,
172 : )
173 : .await
174 : }
175 : }
176 :
177 0 : #[derive(Debug, thiserror::Error)]
178 : pub(crate) enum HttpConnError {
179 : #[error("pooled connection closed at inconsistent state")]
180 : ConnectionClosedAbruptly(#[from] tokio::sync::watch::error::SendError<uuid::Uuid>),
181 : #[error("could not connection to compute")]
182 : ConnectionError(#[from] tokio_postgres::Error),
183 :
184 : #[error("could not get auth info")]
185 : GetAuthInfo(#[from] GetAuthInfoError),
186 : #[error("user not authenticated")]
187 : AuthError(#[from] AuthError),
188 : #[error("wake_compute returned error")]
189 : WakeCompute(#[from] WakeComputeError),
190 : #[error("error acquiring resource permit: {0}")]
191 : TooManyConnectionAttempts(#[from] ApiLockError),
192 : }
193 :
194 : impl ReportableError for HttpConnError {
195 0 : fn get_error_kind(&self) -> ErrorKind {
196 0 : match self {
197 0 : HttpConnError::ConnectionClosedAbruptly(_) => ErrorKind::Compute,
198 0 : HttpConnError::ConnectionError(p) => p.get_error_kind(),
199 0 : HttpConnError::GetAuthInfo(a) => a.get_error_kind(),
200 0 : HttpConnError::AuthError(a) => a.get_error_kind(),
201 0 : HttpConnError::WakeCompute(w) => w.get_error_kind(),
202 0 : HttpConnError::TooManyConnectionAttempts(w) => w.get_error_kind(),
203 : }
204 0 : }
205 : }
206 :
207 : impl UserFacingError for HttpConnError {
208 0 : fn to_string_client(&self) -> String {
209 0 : match self {
210 0 : HttpConnError::ConnectionClosedAbruptly(_) => self.to_string(),
211 0 : HttpConnError::ConnectionError(p) => p.to_string(),
212 0 : HttpConnError::GetAuthInfo(c) => c.to_string_client(),
213 0 : HttpConnError::AuthError(c) => c.to_string_client(),
214 0 : HttpConnError::WakeCompute(c) => c.to_string_client(),
215 : HttpConnError::TooManyConnectionAttempts(_) => {
216 0 : "Failed to acquire permit to connect to the database. Too many database connection attempts are currently ongoing.".to_owned()
217 : }
218 : }
219 0 : }
220 : }
221 :
222 : impl CouldRetry for HttpConnError {
223 0 : fn could_retry(&self) -> bool {
224 0 : match self {
225 0 : HttpConnError::ConnectionError(e) => e.could_retry(),
226 0 : HttpConnError::ConnectionClosedAbruptly(_) => false,
227 0 : HttpConnError::GetAuthInfo(_) => false,
228 0 : HttpConnError::AuthError(_) => false,
229 0 : HttpConnError::WakeCompute(_) => false,
230 0 : HttpConnError::TooManyConnectionAttempts(_) => false,
231 : }
232 0 : }
233 : }
234 : impl ShouldRetryWakeCompute for HttpConnError {
235 0 : fn should_retry_wake_compute(&self) -> bool {
236 0 : match self {
237 0 : HttpConnError::ConnectionError(e) => e.should_retry_wake_compute(),
238 : // we never checked cache validity
239 0 : HttpConnError::TooManyConnectionAttempts(_) => false,
240 0 : _ => true,
241 : }
242 0 : }
243 : }
244 :
245 : struct TokioMechanism {
246 : pool: Arc<GlobalConnPool<tokio_postgres::Client>>,
247 : conn_info: ConnInfo,
248 : conn_id: uuid::Uuid,
249 :
250 : /// connect_to_compute concurrency lock
251 : locks: &'static ApiLocks<Host>,
252 : }
253 :
254 : #[async_trait]
255 : impl ConnectMechanism for TokioMechanism {
256 : type Connection = Client<tokio_postgres::Client>;
257 : type ConnectError = HttpConnError;
258 : type Error = HttpConnError;
259 :
260 0 : async fn connect_once(
261 0 : &self,
262 0 : ctx: &RequestMonitoring,
263 0 : node_info: &CachedNodeInfo,
264 0 : timeout: Duration,
265 0 : ) -> Result<Self::Connection, Self::ConnectError> {
266 0 : let host = node_info.config.get_host()?;
267 0 : let permit = self.locks.get_permit(&host).await?;
268 0 :
269 0 : let mut config = (*node_info.config).clone();
270 0 : let config = config
271 0 : .user(&self.conn_info.user_info.user)
272 0 : .dbname(&self.conn_info.dbname)
273 0 : .connect_timeout(timeout);
274 0 :
275 0 : match &self.conn_info.auth {
276 0 : AuthData::Jwt(_) => {}
277 0 : AuthData::Password(pw) => {
278 0 : config.password(pw);
279 0 : }
280 0 : }
281 0 :
282 0 : let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
283 0 : let res = config.connect(tokio_postgres::NoTls).await;
284 0 : drop(pause);
285 0 : let (client, connection) = permit.release_result(res)?;
286 0 :
287 0 : tracing::Span::current().record("pid", tracing::field::display(client.get_process_id()));
288 0 : Ok(poll_client(
289 0 : self.pool.clone(),
290 0 : ctx,
291 0 : self.conn_info.clone(),
292 0 : client,
293 0 : connection,
294 0 : self.conn_id,
295 0 : node_info.aux.clone(),
296 0 : ))
297 0 : }
298 :
299 0 : fn update_connect_config(&self, _config: &mut compute::ConnCfg) {}
300 : }
|