Line data Source code
1 : use std::fs::File;
2 : use std::thread;
3 : use std::{path::Path, sync::Arc};
4 :
5 : use anyhow::Result;
6 : use compute_api::responses::{ComputeConfig, ComputeStatus};
7 : use tracing::{error, info, instrument};
8 :
9 : use crate::compute::{ComputeNode, ParsedSpec};
10 : use crate::spec::get_config_from_control_plane;
11 :
12 : #[instrument(skip_all)]
13 : fn configurator_main_loop(compute: &Arc<ComputeNode>) {
14 : info!("waiting for reconfiguration requests");
15 : loop {
16 : let mut state = compute.state.lock().unwrap();
17 : /* BEGIN_HADRON */
18 : // RefreshConfiguration should only be used inside the loop
19 : assert_ne!(state.status, ComputeStatus::RefreshConfiguration);
20 : /* END_HADRON */
21 :
22 : if compute.params.lakebase_mode {
23 : while state.status != ComputeStatus::ConfigurationPending
24 : && state.status != ComputeStatus::RefreshConfigurationPending
25 : && state.status != ComputeStatus::Failed
26 : {
27 : info!("configurator: compute status: {:?}, sleeping", state.status);
28 : state = compute.state_changed.wait(state).unwrap();
29 : }
30 : } else {
31 : // We have to re-check the status after re-acquiring the lock because it could be that
32 : // the status has changed while we were waiting for the lock, and we might not need to
33 : // wait on the condition variable. Otherwise, we might end up in some soft-/deadlock, i.e.
34 : // we are waiting for a condition variable that will never be signaled.
35 : if state.status != ComputeStatus::ConfigurationPending {
36 : state = compute.state_changed.wait(state).unwrap();
37 : }
38 : }
39 :
40 : // Re-check the status after waking up
41 : if state.status == ComputeStatus::ConfigurationPending {
42 : info!("got configuration request");
43 : state.set_status(ComputeStatus::Configuration, &compute.state_changed);
44 : drop(state);
45 :
46 : let mut new_status = ComputeStatus::Failed;
47 : if let Err(e) = compute.reconfigure() {
48 : error!("could not configure compute node: {}", e);
49 : } else {
50 : new_status = ComputeStatus::Running;
51 : info!("compute node configured");
52 : }
53 :
54 : // XXX: used to test that API is blocking
55 : // std::thread::sleep(std::time::Duration::from_millis(10000));
56 :
57 : compute.set_status(new_status);
58 : } else if state.status == ComputeStatus::RefreshConfigurationPending {
59 : info!(
60 : "compute node suspects its configuration is out of date, now refreshing configuration"
61 : );
62 : state.set_status(ComputeStatus::RefreshConfiguration, &compute.state_changed);
63 : // Drop the lock guard here to avoid holding the lock while downloading config from the control plane / HCC.
64 : // This is the only thread that can move compute_ctl out of the `RefreshConfiguration` state, so it
65 : // is safe to drop the lock like this.
66 : drop(state);
67 :
68 : let get_config_result: anyhow::Result<ComputeConfig> =
69 : if let Some(config_path) = &compute.params.config_path_test_only {
70 : // This path is only to make testing easier. In production we always get the config from the HCC.
71 : info!(
72 : "reloading config.json from path: {}",
73 : config_path.to_string_lossy()
74 : );
75 : let path = Path::new(config_path);
76 : if let Ok(file) = File::open(path) {
77 : match serde_json::from_reader::<File, ComputeConfig>(file) {
78 : Ok(config) => Ok(config),
79 : Err(e) => {
80 : error!("could not parse config file: {}", e);
81 : Err(anyhow::anyhow!("could not parse config file: {}", e))
82 : }
83 : }
84 : } else {
85 : error!(
86 : "could not open config file at path: {:?}",
87 : config_path.to_string_lossy()
88 : );
89 : Err(anyhow::anyhow!(
90 : "could not open config file at path: {}",
91 : config_path.to_string_lossy()
92 : ))
93 : }
94 : } else if let Some(control_plane_uri) = &compute.params.control_plane_uri {
95 : get_config_from_control_plane(control_plane_uri, &compute.params.compute_id)
96 : } else {
97 : Err(anyhow::anyhow!("config_path_test_only is not set"))
98 : };
99 :
100 : // Parse any received ComputeSpec and transpose the result into a Result<Option<ParsedSpec>>.
101 : let parsed_spec_result: Result<Option<ParsedSpec>> =
102 0 : get_config_result.and_then(|config| {
103 0 : if let Some(spec) = config.spec {
104 0 : if let Ok(pspec) = ParsedSpec::try_from(spec) {
105 0 : Ok(Some(pspec))
106 : } else {
107 0 : Err(anyhow::anyhow!("could not parse spec"))
108 : }
109 : } else {
110 0 : Ok(None)
111 : }
112 0 : });
113 :
114 : let new_status: ComputeStatus;
115 : match parsed_spec_result {
116 : // Control plane (HCM) returned a spec and we were able to parse it.
117 : Ok(Some(pspec)) => {
118 : {
119 : let mut state = compute.state.lock().unwrap();
120 : // Defensive programming to make sure this thread is indeed the only one that can move the compute
121 : // node out of the `RefreshConfiguration` state. Would be nice if we can encode this invariant
122 : // into the type system.
123 : assert_eq!(state.status, ComputeStatus::RefreshConfiguration);
124 :
125 : if state
126 : .pspec
127 : .as_ref()
128 0 : .map(|ps| ps.pageserver_conninfo.clone())
129 : == Some(pspec.pageserver_conninfo.clone())
130 : {
131 : info!(
132 : "Refresh configuration: Retrieved spec is the same as the current spec. Waiting for control plane to update the spec before attempting reconfiguration."
133 : );
134 : state.status = ComputeStatus::Running;
135 : compute.state_changed.notify_all();
136 : drop(state);
137 : std::thread::sleep(std::time::Duration::from_secs(5));
138 : continue;
139 : }
140 : // state.pspec is consumed by compute.reconfigure() below. Note that compute.reconfigure() will acquire
141 : // the compute.state lock again so we need to have the lock guard go out of scope here. We could add a
142 : // "locked" variant of compute.reconfigure() that takes the lock guard as an argument to make this cleaner,
143 : // but it's not worth forking the codebase too much for this minor point alone right now.
144 : state.pspec = Some(pspec);
145 : }
146 : match compute.reconfigure() {
147 : Ok(_) => {
148 : info!("Refresh configuration: compute node configured");
149 : new_status = ComputeStatus::Running;
150 : }
151 : Err(e) => {
152 : error!(
153 : "Refresh configuration: could not configure compute node: {}",
154 : e
155 : );
156 : // Set the compute node back to the `RefreshConfigurationPending` state if the configuration
157 : // was not successful. It should be okay to treat this situation the same as if the loop
158 : // hasn't executed yet as long as the detection side keeps notifying.
159 : new_status = ComputeStatus::RefreshConfigurationPending;
160 : }
161 : }
162 : }
163 : // Control plane (HCM)'s response does not contain a spec. This is the "Empty" attachment case.
164 : Ok(None) => {
165 : info!(
166 : "Compute Manager signaled that this compute is no longer attached to any storage. Exiting."
167 : );
168 : // We just immediately terminate the whole compute_ctl in this case. It's not necessary to attempt a
169 : // clean shutdown as Postgres is probably not responding anyway (which is why we are in this refresh
170 : // configuration state).
171 : std::process::exit(1);
172 : }
173 : // Various error cases:
174 : // - The request to the control plane (HCM) either failed or returned a malformed spec.
175 : // - compute_ctl itself is configured incorrectly (e.g., compute_id is not set).
176 : Err(e) => {
177 : error!(
178 : "Refresh configuration: error getting a parsed spec: {:?}",
179 : e
180 : );
181 : new_status = ComputeStatus::RefreshConfigurationPending;
182 : // We may be dealing with an overloaded HCM if we end up in this path. Backoff 5 seconds before
183 : // retrying to avoid hammering the HCM.
184 : std::thread::sleep(std::time::Duration::from_secs(5));
185 : }
186 : }
187 : compute.set_status(new_status);
188 : } else if state.status == ComputeStatus::Failed {
189 : info!("compute node is now in Failed state, exiting");
190 : break;
191 : } else {
192 : info!("woken up for compute status: {:?}, sleeping", state.status);
193 : }
194 : }
195 : }
196 :
197 0 : pub fn launch_configurator(compute: &Arc<ComputeNode>) -> thread::JoinHandle<()> {
198 0 : let compute = Arc::clone(compute);
199 :
200 0 : let runtime = tokio::runtime::Handle::current();
201 :
202 0 : thread::Builder::new()
203 0 : .name("compute-configurator".into())
204 0 : .spawn(move || {
205 0 : let _rt_guard = runtime.enter();
206 0 : configurator_main_loop(&compute);
207 0 : info!("configurator thread is exited");
208 0 : })
209 0 : .expect("cannot launch configurator thread")
210 0 : }
|