Line data Source code
1 : use std::fs::File;
2 : use std::thread;
3 : use std::{path::Path, sync::Arc};
4 :
5 : use compute_api::responses::{ComputeConfig, ComputeStatus};
6 : use tracing::{error, info, instrument};
7 :
8 : use crate::compute::{ComputeNode, ParsedSpec};
9 : use crate::spec::get_config_from_control_plane;
10 :
11 : #[instrument(skip_all)]
12 : fn configurator_main_loop(compute: &Arc<ComputeNode>) {
13 : info!("waiting for reconfiguration requests");
14 : loop {
15 : let mut state = compute.state.lock().unwrap();
16 :
17 : if compute.params.lakebase_mode {
18 : while state.status != ComputeStatus::ConfigurationPending
19 : && state.status != ComputeStatus::RefreshConfigurationPending
20 : && state.status != ComputeStatus::Failed
21 : {
22 : info!("configurator: compute status: {:?}, sleeping", state.status);
23 : state = compute.state_changed.wait(state).unwrap();
24 : }
25 : } else {
26 : // We have to re-check the status after re-acquiring the lock because it could be that
27 : // the status has changed while we were waiting for the lock, and we might not need to
28 : // wait on the condition variable. Otherwise, we might end up in some soft-/deadlock, i.e.
29 : // we are waiting for a condition variable that will never be signaled.
30 : if state.status != ComputeStatus::ConfigurationPending {
31 : state = compute.state_changed.wait(state).unwrap();
32 : }
33 : }
34 :
35 : // Re-check the status after waking up
36 : if state.status == ComputeStatus::ConfigurationPending {
37 : info!("got configuration request");
38 : state.set_status(ComputeStatus::Configuration, &compute.state_changed);
39 : drop(state);
40 :
41 : let mut new_status = ComputeStatus::Failed;
42 : if let Err(e) = compute.reconfigure() {
43 : error!("could not configure compute node: {}", e);
44 : } else {
45 : new_status = ComputeStatus::Running;
46 : info!("compute node configured");
47 : }
48 :
49 : // XXX: used to test that API is blocking
50 : // std::thread::sleep(std::time::Duration::from_millis(10000));
51 :
52 : compute.set_status(new_status);
53 : } else if state.status == ComputeStatus::RefreshConfigurationPending {
54 : info!(
55 : "compute node suspects its configuration is out of date, now refreshing configuration"
56 : );
57 : // Drop the lock guard here to avoid holding the lock while downloading spec from the control plane / HCC.
58 : // This is the only thread that can move compute_ctl out of the `RefreshConfigurationPending` state, so it
59 : // is safe to drop the lock like this.
60 : drop(state);
61 :
62 : let spec = if let Some(config_path) = &compute.params.config_path_test_only {
63 : // This path is only to make testing easier. In production we always get the spec from the HCC.
64 : info!(
65 : "reloading config.json from path: {}",
66 : config_path.to_string_lossy()
67 : );
68 : let path = Path::new(config_path);
69 : if let Ok(file) = File::open(path) {
70 : match serde_json::from_reader::<File, ComputeConfig>(file) {
71 : Ok(config) => config.spec,
72 : Err(e) => {
73 : error!("could not parse spec file: {}", e);
74 : None
75 : }
76 : }
77 : } else {
78 : error!(
79 : "could not open config file at path: {}",
80 : config_path.to_string_lossy()
81 : );
82 : None
83 : }
84 : } else if let Some(control_plane_uri) = &compute.params.control_plane_uri {
85 : match get_config_from_control_plane(control_plane_uri, &compute.params.compute_id) {
86 : Ok(config) => config.spec,
87 : Err(e) => {
88 : error!("could not get config from control plane: {}", e);
89 : None
90 : }
91 : }
92 : } else {
93 : None
94 : };
95 :
96 : if let Some(spec) = spec {
97 : if let Ok(pspec) = ParsedSpec::try_from(spec) {
98 : {
99 : let mut state = compute.state.lock().unwrap();
100 : // Defensive programming to make sure this thread is indeed the only one that can move the compute
101 : // node out of the `RefreshConfigurationPending` state. Would be nice if we can encode this invariant
102 : // into the type system.
103 : assert_eq!(state.status, ComputeStatus::RefreshConfigurationPending);
104 :
105 0 : if state.pspec.as_ref().map(|ps| ps.pageserver_connstr.clone())
106 : == Some(pspec.pageserver_connstr.clone())
107 : {
108 : info!(
109 : "Refresh configuration: Retrieved spec is the same as the current spec. Waiting for control plane to update the spec before attempting reconfiguration."
110 : );
111 : state.status = ComputeStatus::Running;
112 : compute.state_changed.notify_all();
113 : drop(state);
114 : std::thread::sleep(std::time::Duration::from_secs(5));
115 : continue;
116 : }
117 : // state.pspec is consumed by compute.reconfigure() below. Note that compute.reconfigure() will acquire
118 : // the compute.state lock again so we need to have the lock guard go out of scope here. We could add a
119 : // "locked" variant of compute.reconfigure() that takes the lock guard as an argument to make this cleaner,
120 : // but it's not worth forking the codebase too much for this minor point alone right now.
121 : state.pspec = Some(pspec);
122 : }
123 : match compute.reconfigure() {
124 : Ok(_) => {
125 : info!("Refresh configuration: compute node configured");
126 : compute.set_status(ComputeStatus::Running);
127 : }
128 : Err(e) => {
129 : error!(
130 : "Refresh configuration: could not configure compute node: {}",
131 : e
132 : );
133 : // Leave the compute node in the `RefreshConfigurationPending` state if the configuration
134 : // was not successful. It should be okay to treat this situation the same as if the loop
135 : // hasn't executed yet as long as the detection side keeps notifying.
136 : }
137 : }
138 : }
139 : }
140 : } else if state.status == ComputeStatus::Failed {
141 : info!("compute node is now in Failed state, exiting");
142 : break;
143 : } else {
144 : info!("woken up for compute status: {:?}, sleeping", state.status);
145 : }
146 : }
147 : }
148 :
149 0 : pub fn launch_configurator(compute: &Arc<ComputeNode>) -> thread::JoinHandle<()> {
150 0 : let compute = Arc::clone(compute);
151 :
152 0 : let runtime = tokio::runtime::Handle::current();
153 :
154 0 : thread::Builder::new()
155 0 : .name("compute-configurator".into())
156 0 : .spawn(move || {
157 0 : let _rt_guard = runtime.enter();
158 0 : configurator_main_loop(&compute);
159 0 : info!("configurator thread is exited");
160 0 : })
161 0 : .expect("cannot launch configurator thread")
162 0 : }
|