Line data Source code
1 : use std::fs::File;
2 : use std::thread;
3 : use std::{path::Path, sync::Arc};
4 :
5 : use anyhow::Result;
6 : use compute_api::responses::{ComputeConfig, ComputeStatus};
7 : use tracing::{error, info, instrument};
8 :
9 : use crate::compute::{ComputeNode, ParsedSpec};
10 : use crate::spec::get_config_from_control_plane;
11 :
12 : #[instrument(skip_all)]
13 : fn configurator_main_loop(compute: &Arc<ComputeNode>) {
14 : info!("waiting for reconfiguration requests");
15 : loop {
16 : let mut state = compute.state.lock().unwrap();
17 : /* BEGIN_HADRON */
18 : // RefreshConfiguration should only be used inside the loop
19 : assert_ne!(state.status, ComputeStatus::RefreshConfiguration);
20 : /* END_HADRON */
21 :
22 : if compute.params.lakebase_mode {
23 : while state.status != ComputeStatus::ConfigurationPending
24 : && state.status != ComputeStatus::RefreshConfigurationPending
25 : && state.status != ComputeStatus::Failed
26 : {
27 : info!("configurator: compute status: {:?}, sleeping", state.status);
28 : state = compute.state_changed.wait(state).unwrap();
29 : }
30 : } else {
31 : // We have to re-check the status after re-acquiring the lock because it could be that
32 : // the status has changed while we were waiting for the lock, and we might not need to
33 : // wait on the condition variable. Otherwise, we might end up in some soft-/deadlock, i.e.
34 : // we are waiting for a condition variable that will never be signaled.
35 : if state.status != ComputeStatus::ConfigurationPending {
36 : state = compute.state_changed.wait(state).unwrap();
37 : }
38 : }
39 :
40 : // Re-check the status after waking up
41 : if state.status == ComputeStatus::ConfigurationPending {
42 : info!("got configuration request");
43 : state.set_status(ComputeStatus::Configuration, &compute.state_changed);
44 : drop(state);
45 :
46 : let mut new_status = ComputeStatus::Failed;
47 : if let Err(e) = compute.reconfigure() {
48 : error!("could not configure compute node: {}", e);
49 : } else {
50 : new_status = ComputeStatus::Running;
51 : info!("compute node configured");
52 : }
53 :
54 : // XXX: used to test that API is blocking
55 : // std::thread::sleep(std::time::Duration::from_millis(10000));
56 :
57 : compute.set_status(new_status);
58 : } else if state.status == ComputeStatus::RefreshConfigurationPending {
59 : info!(
60 : "compute node suspects its configuration is out of date, now refreshing configuration"
61 : );
62 : state.set_status(ComputeStatus::RefreshConfiguration, &compute.state_changed);
63 : // Drop the lock guard here to avoid holding the lock while downloading config from the control plane / HCC.
64 : // This is the only thread that can move compute_ctl out of the `RefreshConfiguration` state, so it
65 : // is safe to drop the lock like this.
66 : drop(state);
67 :
68 : let get_config_result: anyhow::Result<ComputeConfig> =
69 : if let Some(config_path) = &compute.params.config_path_test_only {
70 : // This path is only to make testing easier. In production we always get the config from the HCC.
71 : info!(
72 : "reloading config.json from path: {}",
73 : config_path.to_string_lossy()
74 : );
75 : let path = Path::new(config_path);
76 : if let Ok(file) = File::open(path) {
77 : match serde_json::from_reader::<File, ComputeConfig>(file) {
78 : Ok(config) => Ok(config),
79 : Err(e) => {
80 : error!("could not parse config file: {}", e);
81 : Err(anyhow::anyhow!("could not parse config file: {}", e))
82 : }
83 : }
84 : } else {
85 : error!(
86 : "could not open config file at path: {:?}",
87 : config_path.to_string_lossy()
88 : );
89 : Err(anyhow::anyhow!(
90 : "could not open config file at path: {}",
91 : config_path.to_string_lossy()
92 : ))
93 : }
94 : } else if let Some(control_plane_uri) = &compute.params.control_plane_uri {
95 : get_config_from_control_plane(control_plane_uri, &compute.params.compute_id)
96 : } else {
97 : Err(anyhow::anyhow!("config_path_test_only is not set"))
98 : };
99 :
100 : // Parse any received ComputeSpec and transpose the result into a Result<Option<ParsedSpec>>.
101 : let parsed_spec_result: Result<Option<ParsedSpec>> =
102 0 : get_config_result.and_then(|config| {
103 0 : if let Some(spec) = config.spec {
104 0 : if let Ok(pspec) = ParsedSpec::try_from(spec) {
105 0 : Ok(Some(pspec))
106 : } else {
107 0 : Err(anyhow::anyhow!("could not parse spec"))
108 : }
109 : } else {
110 0 : Ok(None)
111 : }
112 0 : });
113 :
114 : let new_status: ComputeStatus;
115 : match parsed_spec_result {
116 : // Control plane (HCM) returned a spec and we were able to parse it.
117 : Ok(Some(pspec)) => {
118 : {
119 : let mut state = compute.state.lock().unwrap();
120 : // Defensive programming to make sure this thread is indeed the only one that can move the compute
121 : // node out of the `RefreshConfiguration` state. Would be nice if we can encode this invariant
122 : // into the type system.
123 : assert_eq!(state.status, ComputeStatus::RefreshConfiguration);
124 :
125 0 : if state.pspec.as_ref().map(|ps| ps.pageserver_connstr.clone())
126 : == Some(pspec.pageserver_connstr.clone())
127 : {
128 : info!(
129 : "Refresh configuration: Retrieved spec is the same as the current spec. Waiting for control plane to update the spec before attempting reconfiguration."
130 : );
131 : state.status = ComputeStatus::Running;
132 : compute.state_changed.notify_all();
133 : drop(state);
134 : std::thread::sleep(std::time::Duration::from_secs(5));
135 : continue;
136 : }
137 : // state.pspec is consumed by compute.reconfigure() below. Note that compute.reconfigure() will acquire
138 : // the compute.state lock again so we need to have the lock guard go out of scope here. We could add a
139 : // "locked" variant of compute.reconfigure() that takes the lock guard as an argument to make this cleaner,
140 : // but it's not worth forking the codebase too much for this minor point alone right now.
141 : state.pspec = Some(pspec);
142 : }
143 : match compute.reconfigure() {
144 : Ok(_) => {
145 : info!("Refresh configuration: compute node configured");
146 : new_status = ComputeStatus::Running;
147 : }
148 : Err(e) => {
149 : error!(
150 : "Refresh configuration: could not configure compute node: {}",
151 : e
152 : );
153 : // Set the compute node back to the `RefreshConfigurationPending` state if the configuration
154 : // was not successful. It should be okay to treat this situation the same as if the loop
155 : // hasn't executed yet as long as the detection side keeps notifying.
156 : new_status = ComputeStatus::RefreshConfigurationPending;
157 : }
158 : }
159 : }
160 : // Control plane (HCM)'s response does not contain a spec. This is the "Empty" attachment case.
161 : Ok(None) => {
162 : info!(
163 : "Compute Manager signaled that this compute is no longer attached to any storage. Exiting."
164 : );
165 : // We just immediately terminate the whole compute_ctl in this case. It's not necessary to attempt a
166 : // clean shutdown as Postgres is probably not responding anyway (which is why we are in this refresh
167 : // configuration state).
168 : std::process::exit(1);
169 : }
170 : // Various error cases:
171 : // - The request to the control plane (HCM) either failed or returned a malformed spec.
172 : // - compute_ctl itself is configured incorrectly (e.g., compute_id is not set).
173 : Err(e) => {
174 : error!(
175 : "Refresh configuration: error getting a parsed spec: {:?}",
176 : e
177 : );
178 : new_status = ComputeStatus::RefreshConfigurationPending;
179 : // We may be dealing with an overloaded HCM if we end up in this path. Backoff 5 seconds before
180 : // retrying to avoid hammering the HCM.
181 : std::thread::sleep(std::time::Duration::from_secs(5));
182 : }
183 : }
184 : compute.set_status(new_status);
185 : } else if state.status == ComputeStatus::Failed {
186 : info!("compute node is now in Failed state, exiting");
187 : break;
188 : } else {
189 : info!("woken up for compute status: {:?}, sleeping", state.status);
190 : }
191 : }
192 : }
193 :
194 0 : pub fn launch_configurator(compute: &Arc<ComputeNode>) -> thread::JoinHandle<()> {
195 0 : let compute = Arc::clone(compute);
196 :
197 0 : let runtime = tokio::runtime::Handle::current();
198 :
199 0 : thread::Builder::new()
200 0 : .name("compute-configurator".into())
201 0 : .spawn(move || {
202 0 : let _rt_guard = runtime.enter();
203 0 : configurator_main_loop(&compute);
204 0 : info!("configurator thread is exited");
205 0 : })
206 0 : .expect("cannot launch configurator thread")
207 0 : }
|