Line data Source code
1 : //! Production console backend.
2 :
3 : use std::sync::Arc;
4 : use std::time::Duration;
5 :
6 : use ::http::header::AUTHORIZATION;
7 : use ::http::HeaderName;
8 : use futures::TryFutureExt;
9 : use postgres_client::config::SslMode;
10 : use tokio::time::Instant;
11 : use tracing::{debug, info, info_span, warn, Instrument};
12 :
13 : use super::super::messages::{ControlPlaneErrorMessage, GetEndpointAccessControl, WakeCompute};
14 : use crate::auth::backend::jwt::AuthRule;
15 : use crate::auth::backend::ComputeUserInfo;
16 : use crate::cache::Cached;
17 : use crate::context::RequestContext;
18 : use crate::control_plane::caches::ApiCaches;
19 : use crate::control_plane::errors::{
20 : ControlPlaneError, GetAuthInfoError, GetEndpointJwksError, WakeComputeError,
21 : };
22 : use crate::control_plane::locks::ApiLocks;
23 : use crate::control_plane::messages::{ColdStartInfo, EndpointJwksResponse, Reason};
24 : use crate::control_plane::{
25 : AccessBlockerFlags, AuthInfo, AuthSecret, CachedAccessBlockerFlags, CachedAllowedIps,
26 : CachedAllowedVpcEndpointIds, CachedNodeInfo, CachedRoleSecret, NodeInfo,
27 : };
28 : use crate::metrics::{CacheOutcome, Metrics};
29 : use crate::rate_limiter::WakeComputeRateLimiter;
30 : use crate::types::{EndpointCacheKey, EndpointId};
31 : use crate::{compute, http, scram};
32 :
33 : pub(crate) const X_REQUEST_ID: HeaderName = HeaderName::from_static("x-request-id");
34 :
35 : #[derive(Clone)]
36 : pub struct NeonControlPlaneClient {
37 : endpoint: http::Endpoint,
38 : pub caches: &'static ApiCaches,
39 : pub(crate) locks: &'static ApiLocks<EndpointCacheKey>,
40 : pub(crate) wake_compute_endpoint_rate_limiter: Arc<WakeComputeRateLimiter>,
41 : // Put in a shared ref so we don't copy the secret all over memory.
42 : jwt: Arc<str>,
43 : }
44 :
45 : impl NeonControlPlaneClient {
46 : /// Construct an API object containing the auth parameters.
47 0 : pub fn new(
48 0 : endpoint: http::Endpoint,
49 0 : jwt: Arc<str>,
50 0 : caches: &'static ApiCaches,
51 0 : locks: &'static ApiLocks<EndpointCacheKey>,
52 0 : wake_compute_endpoint_rate_limiter: Arc<WakeComputeRateLimiter>,
53 0 : ) -> Self {
54 0 : Self {
55 0 : endpoint,
56 0 : caches,
57 0 : locks,
58 0 : wake_compute_endpoint_rate_limiter,
59 0 : jwt,
60 0 : }
61 0 : }
62 :
63 0 : pub(crate) fn url(&self) -> &str {
64 0 : self.endpoint.url().as_str()
65 0 : }
66 :
67 0 : async fn do_get_auth_info(
68 0 : &self,
69 0 : ctx: &RequestContext,
70 0 : user_info: &ComputeUserInfo,
71 0 : ) -> Result<AuthInfo, GetAuthInfoError> {
72 0 : if !self
73 0 : .caches
74 0 : .endpoints_cache
75 0 : .is_valid(ctx, &user_info.endpoint.normalize())
76 : {
77 : // TODO: refactor this because it's weird:
78 : // this is a failure to authenticate, but we return Ok.
79 0 : info!("endpoint is not valid, skipping the request");
80 0 : return Ok(AuthInfo::default());
81 0 : }
82 0 : self.do_get_auth_req(user_info, &ctx.session_id(), Some(ctx))
83 0 : .await
84 0 : }
85 :
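/// Fetch the endpoint access controls from the control plane.
/// `ctx` is `None` for cancellation requests: the application name then
/// defaults to "auth_cancellation" and no latency timer is paused.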
86 0 : async fn do_get_auth_req(
87 0 : &self,
88 0 : user_info: &ComputeUserInfo,
89 0 : session_id: &uuid::Uuid,
90 0 : ctx: Option<&RequestContext>,
91 0 : ) -> Result<AuthInfo, GetAuthInfoError> {
92 0 : let request_id: String = session_id.to_string();
93 0 : let application_name = if let Some(ctx) = ctx {
94 0 : ctx.console_application_name()
95 : } else {
96 0 : "auth_cancellation".to_string()
97 : };
98 :
99 0 : async {
100 0 : let request = self
101 0 : .endpoint
102 0 : .get_path("get_endpoint_access_control")
103 0 : .header(X_REQUEST_ID, &request_id)
104 0 : .header(AUTHORIZATION, format!("Bearer {}", &self.jwt))
105 0 : .query(&[("session_id", session_id)])
106 0 : .query(&[
107 0 : ("application_name", application_name.as_str()),
108 0 : ("endpointish", user_info.endpoint.as_str()),
109 0 : ("role", user_info.user.as_str()),
110 0 : ])
111 0 : .build()?;
112 :
113 0 : debug!(url = request.url().as_str(), "sending http request");
114 0 : let start = Instant::now();
115 0 : let response = match ctx {
116 0 : Some(ctx) => {
117 0 : let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Cplane);
118 0 : let rsp = self.endpoint.execute(request).await;
119 0 : drop(pause);
120 0 : rsp?
121 : }
122 0 : None => self.endpoint.execute(request).await?,
123 : };
124 :
125 0 : info!(duration = ?start.elapsed(), "received http response");
126 0 : let body = match parse_body::<GetEndpointAccessControl>(response).await {
127 0 : Ok(body) => body,
128 : // Error 404 is special: it's ok not to have a secret.
129 : // TODO(anna): retry
130 0 : Err(e) => {
131 0 : return if e.get_reason().is_not_found() {
132 : // TODO: refactor this because it's weird:
133 : // this is a failure to authenticate, but we return Ok.
134 0 : Ok(AuthInfo::default())
135 : } else {
136 0 : Err(e.into())
137 : };
138 : }
139 : };
140 :
141 0 : let secret = if body.role_secret.is_empty() {
142 0 : None
143 : } else {
144 0 : let secret = scram::ServerSecret::parse(&body.role_secret)
145 0 : .map(AuthSecret::Scram)
146 0 : .ok_or(GetAuthInfoError::BadSecret)?;
147 0 : Some(secret)
148 : };
149 0 : let allowed_ips = body.allowed_ips.unwrap_or_default();
150 0 : Metrics::get()
151 0 : .proxy
152 0 : .allowed_ips_number
153 0 : .observe(allowed_ips.len() as f64);
154 0 : let allowed_vpc_endpoint_ids = body.allowed_vpc_endpoint_ids.unwrap_or_default();
155 0 : Metrics::get()
156 0 : .proxy
157 0 : .allowed_vpc_endpoint_ids
158 0 : .observe(allowed_vpc_endpoint_ids.len() as f64);
159 0 : let block_public_connections = body.block_public_connections.unwrap_or_default();
160 0 : let block_vpc_connections = body.block_vpc_connections.unwrap_or_default();
161 0 : Ok(AuthInfo {
162 0 : secret,
163 0 : allowed_ips,
164 0 : allowed_vpc_endpoint_ids,
165 0 : project_id: body.project_id,
166 0 : account_id: body.account_id,
167 0 : access_blocker_flags: AccessBlockerFlags {
168 0 : public_access_blocked: block_public_connections,
169 0 : vpc_access_blocked: block_vpc_connections,
170 0 : },
171 0 : })
172 0 : }
173 0 : .inspect_err(|e| tracing::debug!(error = ?e))
174 0 : .instrument(info_span!("do_get_auth_info"))
175 0 : .await
176 0 : }
177 :
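/// Fetch the JWKS authentication rules configured for an endpoint.
/// Fails with `EndpointNotFound` if the endpoint is not known to be valid.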
178 0 : async fn do_get_endpoint_jwks(
179 0 : &self,
180 0 : ctx: &RequestContext,
181 0 : endpoint: EndpointId,
182 0 : ) -> Result<Vec<AuthRule>, GetEndpointJwksError> {
183 0 : if !self
184 0 : .caches
185 0 : .endpoints_cache
186 0 : .is_valid(ctx, &endpoint.normalize())
187 : {
188 0 : return Err(GetEndpointJwksError::EndpointNotFound);
189 0 : }
190 0 : let request_id = ctx.session_id().to_string();
191 0 : async {
192 0 : let request = self
193 0 : .endpoint
194 0 : .get_with_url(|url| {
195 0 : url.path_segments_mut()
196 0 : .push("endpoints")
197 0 : .push(endpoint.as_str())
198 0 : .push("jwks");
199 0 : })
200 0 : .header(X_REQUEST_ID, &request_id)
201 0 : .header(AUTHORIZATION, format!("Bearer {}", &self.jwt))
202 0 : .query(&[("session_id", ctx.session_id())])
203 0 : .build()
204 0 : .map_err(GetEndpointJwksError::RequestBuild)?;
205 :
206 0 : debug!(url = request.url().as_str(), "sending http request");
207 0 : let start = Instant::now();
208 0 : let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Cplane);
209 0 : let response = self
210 0 : .endpoint
211 0 : .execute(request)
212 0 : .await
213 0 : .map_err(GetEndpointJwksError::RequestExecute)?;
214 0 : drop(pause);
215 0 : info!(duration = ?start.elapsed(), "received http response");
216 :
217 0 : let body = parse_body::<EndpointJwksResponse>(response).await?;
218 :
219 0 : let rules = body
220 0 : .jwks
221 0 : .into_iter()
222 0 : .map(|jwks| AuthRule {
223 0 : id: jwks.id,
224 0 : jwks_url: jwks.jwks_url,
225 0 : audience: jwks.jwt_audience,
226 0 : role_names: jwks.role_names,
227 0 : })
228 0 : .collect();
229 0 :
230 0 : Ok(rules)
231 0 : }
232 0 : .inspect_err(|e| tracing::debug!(error = ?e))
233 0 : .instrument(info_span!("do_get_endpoint_jwks"))
234 0 : .await
235 0 : }
236 :
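/// Ask the control plane to wake the compute node behind this endpoint and
/// return its connection info. Only host and port are set on the config; the
/// rest is filled in later from the startup message.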
237 0 : async fn do_wake_compute(
238 0 : &self,
239 0 : ctx: &RequestContext,
240 0 : user_info: &ComputeUserInfo,
241 0 : ) -> Result<NodeInfo, WakeComputeError> {
242 0 : let request_id = ctx.session_id().to_string();
243 0 : let application_name = ctx.console_application_name();
244 0 : async {
245 0 : let mut request_builder = self
246 0 : .endpoint
247 0 : .get_path("wake_compute")
248 0 : .header("X-Request-ID", &request_id)
249 0 : .header("Authorization", format!("Bearer {}", &self.jwt))
250 0 : .query(&[("session_id", ctx.session_id())])
251 0 : .query(&[
252 0 : ("application_name", application_name.as_str()),
253 0 : ("endpointish", user_info.endpoint.as_str()),
254 0 : ]);
255 0 :
256 0 : let options = user_info.options.to_deep_object();
257 0 : if !options.is_empty() {
258 0 : request_builder = request_builder.query(&options);
259 0 : }
260 :
261 0 : let request = request_builder.build()?;
262 :
263 0 : debug!(url = request.url().as_str(), "sending http request");
264 0 : let start = Instant::now();
265 0 : let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Cplane);
266 0 : let response = self.endpoint.execute(request).await?;
267 0 : drop(pause);
268 0 : info!(duration = ?start.elapsed(), "received http response");
269 0 : let body = parse_body::<WakeCompute>(response).await?;
270 :
271 : // Unfortunately, ownership won't let us use `Option::ok_or` here.
272 0 : let (host, port) = match parse_host_port(&body.address) {
273 0 : None => return Err(WakeComputeError::BadComputeAddress(body.address)),
274 0 : Some(x) => x,
275 0 : };
276 0 :
277 0 : // Don't set anything but host and port! This config will be cached.
278 0 : // We'll set username and such later using the startup message.
279 0 : // TODO: add more type safety (in progress).
280 0 : let mut config = compute::ConnCfg::new(host.to_owned(), port);
281 0 : config.ssl_mode(SslMode::Disable); // TLS is not configured on compute nodes.
282 0 :
283 0 : let node = NodeInfo {
284 0 : config,
285 0 : aux: body.aux,
286 0 : };
287 0 :
288 0 : Ok(node)
289 0 : }
290 0 : .inspect_err(|e| tracing::debug!(error = ?e))
291 0 : .instrument(info_span!("do_wake_compute"))
292 0 : .await
293 0 : }
294 : }
295 :
296 : impl super::ControlPlaneApi for NeonControlPlaneClient {
297 : #[tracing::instrument(skip_all)]
298 : async fn get_role_secret(
299 : &self,
300 : ctx: &RequestContext,
301 : user_info: &ComputeUserInfo,
302 : ) -> Result<CachedRoleSecret, GetAuthInfoError> {
303 : let normalized_ep = &user_info.endpoint.normalize();
304 : let user = &user_info.user;
305 : if let Some(role_secret) = self
306 : .caches
307 : .project_info
308 : .get_role_secret(normalized_ep, user)
309 : {
310 : return Ok(role_secret);
311 : }
312 : let auth_info = self.do_get_auth_info(ctx, user_info).await?;
313 : let account_id = auth_info.account_id;
314 : if let Some(project_id) = auth_info.project_id {
315 : let normalized_ep_int = normalized_ep.into();
316 : self.caches.project_info.insert_role_secret(
317 : project_id,
318 : normalized_ep_int,
319 : user.into(),
320 : auth_info.secret.clone(),
321 : );
322 : self.caches.project_info.insert_allowed_ips(
323 : project_id,
324 : normalized_ep_int,
325 : Arc::new(auth_info.allowed_ips),
326 : );
327 : self.caches.project_info.insert_allowed_vpc_endpoint_ids(
328 : account_id,
329 : project_id,
330 : normalized_ep_int,
331 : Arc::new(auth_info.allowed_vpc_endpoint_ids),
332 : );
333 : self.caches.project_info.insert_block_public_or_vpc_access(
334 : project_id,
335 : normalized_ep_int,
336 : auth_info.access_blocker_flags,
337 : );
338 : ctx.set_project_id(project_id);
339 : }
340 : // We've just fetched the secret, so there's no need to invalidate it.
341 : Ok(Cached::new_uncached(auth_info.secret))
342 : }
343 :
344 0 : async fn get_allowed_ips(
345 0 : &self,
346 0 : ctx: &RequestContext,
347 0 : user_info: &ComputeUserInfo,
348 0 : ) -> Result<CachedAllowedIps, GetAuthInfoError> {
349 0 : let normalized_ep = &user_info.endpoint.normalize();
350 0 : if let Some(allowed_ips) = self.caches.project_info.get_allowed_ips(normalized_ep) {
351 0 : Metrics::get()
352 0 : .proxy
353 0 : .allowed_ips_cache_misses // TODO SR: Should we rename this variable to something like allowed_ip_cache_stats?
354 0 : .inc(CacheOutcome::Hit);
355 0 : return Ok(allowed_ips);
356 0 : }
357 0 : Metrics::get()
358 0 : .proxy
359 0 : .allowed_ips_cache_misses
360 0 : .inc(CacheOutcome::Miss);
361 0 : let auth_info = self.do_get_auth_info(ctx, user_info).await?;
362 0 : let allowed_ips = Arc::new(auth_info.allowed_ips);
363 0 : let allowed_vpc_endpoint_ids = Arc::new(auth_info.allowed_vpc_endpoint_ids);
364 0 : let access_blocker_flags = auth_info.access_blocker_flags;
365 0 : let user = &user_info.user;
366 0 : let account_id = auth_info.account_id;
367 0 : if let Some(project_id) = auth_info.project_id {
368 0 : let normalized_ep_int = normalized_ep.into();
369 0 : self.caches.project_info.insert_role_secret(
370 0 : project_id,
371 0 : normalized_ep_int,
372 0 : user.into(),
373 0 : auth_info.secret.clone(),
374 0 : );
375 0 : self.caches.project_info.insert_allowed_ips(
376 0 : project_id,
377 0 : normalized_ep_int,
378 0 : allowed_ips.clone(),
379 0 : );
380 0 : self.caches.project_info.insert_allowed_vpc_endpoint_ids(
381 0 : account_id,
382 0 : project_id,
383 0 : normalized_ep_int,
384 0 : allowed_vpc_endpoint_ids.clone(),
385 0 : );
386 0 : self.caches.project_info.insert_block_public_or_vpc_access(
387 0 : project_id,
388 0 : normalized_ep_int,
389 0 : access_blocker_flags,
390 0 : );
391 0 : ctx.set_project_id(project_id);
392 0 : }
393 0 : Ok(Cached::new_uncached(allowed_ips))
394 0 : }
395 :
396 0 : async fn get_allowed_vpc_endpoint_ids(
397 0 : &self,
398 0 : ctx: &RequestContext,
399 0 : user_info: &ComputeUserInfo,
400 0 : ) -> Result<CachedAllowedVpcEndpointIds, GetAuthInfoError> {
401 0 : let normalized_ep = &user_info.endpoint.normalize();
402 0 : if let Some(allowed_vpc_endpoint_ids) = self
403 0 : .caches
404 0 : .project_info
405 0 : .get_allowed_vpc_endpoint_ids(normalized_ep)
406 : {
407 0 : Metrics::get()
408 0 : .proxy
409 0 : .vpc_endpoint_id_cache_stats
410 0 : .inc(CacheOutcome::Hit);
411 0 : return Ok(allowed_vpc_endpoint_ids);
412 0 : }
413 0 :
414 0 : Metrics::get()
415 0 : .proxy
416 0 : .vpc_endpoint_id_cache_stats
417 0 : .inc(CacheOutcome::Miss);
418 :
419 0 : let auth_info = self.do_get_auth_info(ctx, user_info).await?;
420 0 : let allowed_ips = Arc::new(auth_info.allowed_ips);
421 0 : let allowed_vpc_endpoint_ids = Arc::new(auth_info.allowed_vpc_endpoint_ids);
422 0 : let access_blocker_flags = auth_info.access_blocker_flags;
423 0 : let user = &user_info.user;
424 0 : let account_id = auth_info.account_id;
425 0 : if let Some(project_id) = auth_info.project_id {
426 0 : let normalized_ep_int = normalized_ep.into();
427 0 : self.caches.project_info.insert_role_secret(
428 0 : project_id,
429 0 : normalized_ep_int,
430 0 : user.into(),
431 0 : auth_info.secret.clone(),
432 0 : );
433 0 : self.caches.project_info.insert_allowed_ips(
434 0 : project_id,
435 0 : normalized_ep_int,
436 0 : allowed_ips.clone(),
437 0 : );
438 0 : self.caches.project_info.insert_allowed_vpc_endpoint_ids(
439 0 : account_id,
440 0 : project_id,
441 0 : normalized_ep_int,
442 0 : allowed_vpc_endpoint_ids.clone(),
443 0 : );
444 0 : self.caches.project_info.insert_block_public_or_vpc_access(
445 0 : project_id,
446 0 : normalized_ep_int,
447 0 : access_blocker_flags,
448 0 : );
449 0 : ctx.set_project_id(project_id);
450 0 : }
451 0 : Ok(Cached::new_uncached(allowed_vpc_endpoint_ids))
452 0 : }
453 :
454 0 : async fn get_block_public_or_vpc_access(
455 0 : &self,
456 0 : ctx: &RequestContext,
457 0 : user_info: &ComputeUserInfo,
458 0 : ) -> Result<CachedAccessBlockerFlags, GetAuthInfoError> {
459 0 : let normalized_ep = &user_info.endpoint.normalize();
460 0 : if let Some(access_blocker_flags) = self
461 0 : .caches
462 0 : .project_info
463 0 : .get_block_public_or_vpc_access(normalized_ep)
464 : {
465 0 : Metrics::get()
466 0 : .proxy
467 0 : .access_blocker_flags_cache_stats
468 0 : .inc(CacheOutcome::Hit);
469 0 : return Ok(access_blocker_flags);
470 0 : }
471 0 :
472 0 : Metrics::get()
473 0 : .proxy
474 0 : .access_blocker_flags_cache_stats
475 0 : .inc(CacheOutcome::Miss);
476 :
477 0 : let auth_info = self.do_get_auth_info(ctx, user_info).await?;
478 0 : let allowed_ips = Arc::new(auth_info.allowed_ips);
479 0 : let allowed_vpc_endpoint_ids = Arc::new(auth_info.allowed_vpc_endpoint_ids);
480 0 : let access_blocker_flags = auth_info.access_blocker_flags;
481 0 : let user = &user_info.user;
482 0 : let account_id = auth_info.account_id;
483 0 : if let Some(project_id) = auth_info.project_id {
484 0 : let normalized_ep_int = normalized_ep.into();
485 0 : self.caches.project_info.insert_role_secret(
486 0 : project_id,
487 0 : normalized_ep_int,
488 0 : user.into(),
489 0 : auth_info.secret.clone(),
490 0 : );
491 0 : self.caches.project_info.insert_allowed_ips(
492 0 : project_id,
493 0 : normalized_ep_int,
494 0 : allowed_ips.clone(),
495 0 : );
496 0 : self.caches.project_info.insert_allowed_vpc_endpoint_ids(
497 0 : account_id,
498 0 : project_id,
499 0 : normalized_ep_int,
500 0 : allowed_vpc_endpoint_ids.clone(),
501 0 : );
502 0 : self.caches.project_info.insert_block_public_or_vpc_access(
503 0 : project_id,
504 0 : normalized_ep_int,
505 0 : access_blocker_flags.clone(),
506 0 : );
507 0 : ctx.set_project_id(project_id);
508 0 : }
509 0 : Ok(Cached::new_uncached(access_blocker_flags))
510 0 : }
511 :
512 : #[tracing::instrument(skip_all)]
513 : async fn get_endpoint_jwks(
514 : &self,
515 : ctx: &RequestContext,
516 : endpoint: EndpointId,
517 : ) -> Result<Vec<AuthRule>, GetEndpointJwksError> {
518 : self.do_get_endpoint_jwks(ctx, endpoint).await
519 : }
520 :
521 : #[tracing::instrument(skip_all)]
522 : async fn wake_compute(
523 : &self,
524 : ctx: &RequestContext,
525 : user_info: &ComputeUserInfo,
526 : ) -> Result<CachedNodeInfo, WakeComputeError> {
527 : let key = user_info.endpoint_cache_key();
528 :
529 : macro_rules! check_cache {
530 : () => {
531 : if let Some(cached) = self.caches.node_info.get(&key) {
532 : let (cached, info) = cached.take_value();
533 0 : let info = info.map_err(|c| {
534 0 : info!(key = &*key, "found cached wake_compute error");
535 0 : WakeComputeError::ControlPlane(ControlPlaneError::Message(Box::new(*c)))
536 0 : })?;
537 :
538 : debug!(key = &*key, "found cached compute node info");
539 : ctx.set_project(info.aux.clone());
540 0 : return Ok(cached.map(|()| info));
541 : }
542 : };
543 : }
544 :
545 : // Every time we issue a wake-up HTTP request, the compute node stays up
546 : // for a while (how long depends heavily on the console's scale-to-zero policy).
547 : // The connection info remains the same during that period,
548 : // so we can cache it to reduce load and latency.
549 : check_cache!();
550 :
551 : let permit = self.locks.get_permit(&key).await?;
552 :
553 : // After acquiring the permit, the cache may have been filled by another task,
554 : // so double-check it.
555 : if permit.should_check_cache() {
556 : // TODO: if there is something in the cache, mark the permit as success.
557 : check_cache!();
558 : }
559 :
560 : // check rate limit
561 : if !self
562 : .wake_compute_endpoint_rate_limiter
563 : .check(user_info.endpoint.normalize_intern(), 1)
564 : {
565 : return Err(WakeComputeError::TooManyConnections);
566 : }
567 :
568 : let node = permit.release_result(self.do_wake_compute(ctx, user_info).await);
569 : match node {
570 : Ok(node) => {
571 : ctx.set_project(node.aux.clone());
572 : debug!(key = &*key, "created a cache entry for woken compute node");
573 :
574 : let mut stored_node = node.clone();
575 : // store the cached node as 'warm_cached'
576 : stored_node.aux.cold_start_info = ColdStartInfo::WarmCached;
577 :
578 : let (_, cached) = self.caches.node_info.insert_unit(key, Ok(stored_node));
579 :
580 0 : Ok(cached.map(|()| node))
581 : }
582 : Err(err) => match err {
583 : WakeComputeError::ControlPlane(ControlPlaneError::Message(err)) => {
584 : let Some(status) = &err.status else {
585 : return Err(WakeComputeError::ControlPlane(ControlPlaneError::Message(
586 : err,
587 : )));
588 : };
589 :
590 : let reason = status
591 : .details
592 : .error_info
593 0 : .map_or(Reason::Unknown, |x| x.reason);
594 :
595 : // if we can retry this error, do not cache it.
596 : if reason.can_retry() {
597 : return Err(WakeComputeError::ControlPlane(ControlPlaneError::Message(
598 : err,
599 : )));
600 : }
601 :
602 : // at this point, we should only have quota errors.
603 : debug!(
604 : key = &*key,
605 : "created a cache entry for the wake compute error"
606 : );
607 :
608 : self.caches.node_info.insert_ttl(
609 : key,
610 : Err(err.clone()),
611 : Duration::from_secs(30),
612 : );
613 :
614 : Err(WakeComputeError::ControlPlane(ControlPlaneError::Message(
615 : err,
616 : )))
617 : }
618 : err => return Err(err),
619 : },
620 : }
621 : }
622 : }
623 :
624 : /// Parse the HTTP response body, taking the status code into account.
625 0 : async fn parse_body<T: for<'a> serde::Deserialize<'a>>(
626 0 : response: http::Response,
627 0 : ) -> Result<T, ControlPlaneError> {
628 0 : let status = response.status();
629 0 : if status.is_success() {
630 : // We shouldn't log the raw body because it may contain secrets.
631 0 : info!("request succeeded, processing the body");
632 0 : return Ok(response.json().await?);
633 0 : }
634 0 : let s = response.bytes().await?;
635 : // Log the plaintext body so we can detect cases not covered by the error struct.
636 0 : info!("response_error plaintext: {:?}", s);
637 :
638 : // Don't throw an error here because it's not as important
639 : // as the fact that the request itself has failed.
640 0 : let mut body = serde_json::from_slice(&s).unwrap_or_else(|e| {
641 0 : warn!("failed to parse error body: {e}");
642 0 : ControlPlaneErrorMessage {
643 0 : error: "reason unclear (malformed error message)".into(),
644 0 : http_status_code: status,
645 0 : status: None,
646 0 : }
647 0 : });
648 0 : body.http_status_code = status;
649 0 :
650 0 : warn!("console responded with an error ({status}): {body:?}");
651 0 : Err(ControlPlaneError::Message(Box::new(body)))
652 0 : }
653 :
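/// Split `input` into a host and a port at the last `:`,
/// stripping square brackets from bracketed IPv6 hosts.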
654 3 : fn parse_host_port(input: &str) -> Option<(&str, u16)> {
655 3 : let (host, port) = input.rsplit_once(':')?;
656 3 : let ipv6_brackets: &[_] = &['[', ']'];
657 3 : Some((host.trim_matches(ipv6_brackets), port.parse().ok()?))
658 3 : }
659 :
660 : #[cfg(test)]
661 : mod tests {
662 : use super::*;
663 :
664 : #[test]
665 1 : fn test_parse_host_port_v4() {
666 1 : let (host, port) = parse_host_port("127.0.0.1:5432").expect("failed to parse");
667 1 : assert_eq!(host, "127.0.0.1");
668 1 : assert_eq!(port, 5432);
669 1 : }
670 :
671 : #[test]
672 1 : fn test_parse_host_port_v6() {
673 1 : let (host, port) = parse_host_port("[2001:db8::1]:5432").expect("failed to parse");
674 1 : assert_eq!(host, "2001:db8::1");
675 1 : assert_eq!(port, 5432);
676 1 : }
677 :
678 : #[test]
679 1 : fn test_parse_host_port_url() {
680 1 : let (host, port) = parse_host_port("compute-foo-bar-1234.default.svc.cluster.local:5432")
681 1 : .expect("failed to parse");
682 1 : assert_eq!(host, "compute-foo-bar-1234.default.svc.cluster.local");
683 1 : assert_eq!(port, 5432);
684 1 : }
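
// Supplementary sketch, not part of the original suite: it only exercises the
// existing `None` paths of `parse_host_port` (missing separator / non-numeric port).
#[test]
fn test_parse_host_port_invalid() {
    // No ':' separator at all.
    assert!(parse_host_port("localhost").is_none());
    // Separator present, but the port does not parse as a u16.
    assert!(parse_host_port("localhost:not-a-port").is_none());
}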
685 : }