Line data Source code
1 : use async_trait::async_trait;
2 : use tracing::{error, info};
3 :
4 : use crate::config::RetryConfig;
5 : use crate::context::RequestContext;
6 : use crate::control_plane::CachedNodeInfo;
7 : use crate::control_plane::errors::{ControlPlaneError, WakeComputeError};
8 : use crate::error::ReportableError;
9 : use crate::metrics::{
10 : ConnectOutcome, ConnectionFailuresBreakdownGroup, Metrics, RetriesMetricGroup, RetryType,
11 : };
12 : use crate::proxy::retry::{retry_after, should_retry};
13 :
// Implemented as a macro rather than a function so that the emitted `tracing` event retains
// the callsite of the invocation.
//
// Control-plane `Message` errors are logged at `info` level; every other wake-compute error
// is logged at `error` level. Fields and message text are identical in both cases.
macro_rules! log_wake_compute_error {
    (error = ?$error:expr, $num_retries:expr, retriable = $retriable:literal) => {
        if matches!(
            &$error,
            WakeComputeError::ControlPlane(ControlPlaneError::Message(_))
        ) {
            info!(error = ?$error, num_retries = $num_retries, retriable = $retriable, "couldn't wake compute node");
        } else {
            error!(error = ?$error, num_retries = $num_retries, retriable = $retriable, "couldn't wake compute node");
        }
    };
}
25 :
26 : #[async_trait]
27 : pub(crate) trait WakeComputeBackend {
28 : async fn wake_compute(&self, ctx: &RequestContext) -> Result<CachedNodeInfo, WakeComputeError>;
29 : }
30 :
31 19 : pub(crate) async fn wake_compute<B: WakeComputeBackend>(
32 19 : num_retries: &mut u32,
33 19 : ctx: &RequestContext,
34 19 : api: &B,
35 19 : config: RetryConfig,
36 19 : ) -> Result<CachedNodeInfo, WakeComputeError> {
37 : loop {
38 21 : match api.wake_compute(ctx).await {
39 3 : Err(e) if !should_retry(&e, *num_retries, config) => {
40 1 : log_wake_compute_error!(error = ?e, num_retries, retriable = false);
41 1 : report_error(&e, false);
42 1 : Metrics::get().proxy.retries_metric.observe(
43 1 : RetriesMetricGroup {
44 1 : outcome: ConnectOutcome::Failed,
45 1 : retry_type: RetryType::WakeCompute,
46 1 : },
47 1 : (*num_retries).into(),
48 : );
49 1 : return Err(e);
50 : }
51 2 : Err(e) => {
52 2 : log_wake_compute_error!(error = ?e, num_retries, retriable = true);
53 2 : report_error(&e, true);
54 : }
55 18 : Ok(n) => {
56 18 : Metrics::get().proxy.retries_metric.observe(
57 18 : RetriesMetricGroup {
58 18 : outcome: ConnectOutcome::Success,
59 18 : retry_type: RetryType::WakeCompute,
60 18 : },
61 18 : (*num_retries).into(),
62 : );
63 : // TODO: is this necessary? We have a metric.
64 : // TODO: this log line is misleading as "wake_compute" might return cached (and stale) info.
65 18 : info!(?num_retries, "compute node woken up after");
66 18 : return Ok(n);
67 : }
68 : }
69 :
70 2 : let wait_duration = retry_after(*num_retries, config);
71 2 : *num_retries += 1;
72 2 : let pause = ctx.latency_timer_pause(crate::metrics::Waiting::RetryTimeout);
73 2 : tokio::time::sleep(wait_duration).await;
74 2 : drop(pause);
75 : }
76 19 : }
77 :
78 3 : fn report_error(e: &WakeComputeError, retry: bool) {
79 3 : let kind = e.get_error_kind();
80 :
81 3 : Metrics::get()
82 3 : .proxy
83 3 : .connection_failures_breakdown
84 3 : .inc(ConnectionFailuresBreakdownGroup {
85 3 : kind,
86 3 : retry: retry.into(),
87 3 : });
88 3 : }
|