Line data Source code
1 : use std::convert::Infallible;
2 : use std::net::IpAddr;
3 : use std::net::Ipv6Addr;
4 : use std::net::SocketAddr;
5 : use std::sync::Arc;
6 : use std::thread;
7 :
8 : use crate::compute::{ComputeNode, ComputeState, ParsedSpec};
9 : use compute_api::requests::ConfigurationRequest;
10 : use compute_api::responses::{ComputeStatus, ComputeStatusResponse, GenericAPIError};
11 :
12 : use anyhow::Result;
13 : use hyper::service::{make_service_fn, service_fn};
14 : use hyper::{Body, Method, Request, Response, Server, StatusCode};
15 : use num_cpus;
16 : use serde_json;
17 : use tokio::task;
18 : use tracing::{error, info, warn};
19 : use tracing_utils::http::OtelName;
20 :
21 2218 : fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse {
22 2218 : ComputeStatusResponse {
23 2218 : start_time: state.start_time,
24 2218 : tenant: state
25 2218 : .pspec
26 2218 : .as_ref()
27 2218 : .map(|pspec| pspec.tenant_id.to_string()),
28 2218 : timeline: state
29 2218 : .pspec
30 2218 : .as_ref()
31 2218 : .map(|pspec| pspec.timeline_id.to_string()),
32 2218 : status: state.status,
33 2218 : last_active: state.last_active,
34 2218 : error: state.error.clone(),
35 2218 : }
36 2218 : }
37 :
38 : // Service function to handle all available routes.
39 2261 : async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body> {
40 2261 : //
41 2261 : // NOTE: The URI path is currently included in traces. That's OK because
42 2261 : // it doesn't contain any variable parts or sensitive information. But
43 2261 : // please keep that in mind if you change the routing here.
44 2261 : //
45 2261 : match (req.method(), req.uri().path()) {
46 : // Serialized compute state.
47 1997 : (&Method::GET, "/status") => {
48 1997 : info!("serving /status GET request");
49 1997 : let state = compute.state.lock().unwrap();
50 1997 : let status_response = status_response_from_state(&state);
51 1997 : Response::new(Body::from(serde_json::to_string(&status_response).unwrap()))
52 : }
53 :
54 : // Startup metrics in JSON format. Keep /metrics reserved for a possible
55 : // future use for Prometheus metrics format.
56 0 : (&Method::GET, "/metrics.json") => {
57 0 : info!("serving /metrics.json GET request");
58 0 : let metrics = compute.state.lock().unwrap().metrics.clone();
59 0 : Response::new(Body::from(serde_json::to_string(&metrics).unwrap()))
60 : }
61 :
62 : // Collect Postgres current usage insights
63 0 : (&Method::GET, "/insights") => {
64 0 : info!("serving /insights GET request");
65 0 : let status = compute.get_status();
66 0 : if status != ComputeStatus::Running {
67 0 : let msg = format!("compute is not running, current status: {:?}", status);
68 0 : error!(msg);
69 0 : return Response::new(Body::from(msg));
70 0 : }
71 :
72 0 : let insights = compute.collect_insights().await;
73 0 : Response::new(Body::from(insights))
74 : }
75 :
76 264 : (&Method::POST, "/check_writability") => {
77 0 : info!("serving /check_writability POST request");
78 0 : let status = compute.get_status();
79 0 : if status != ComputeStatus::Running {
80 0 : let msg = format!(
81 0 : "invalid compute status for check_writability request: {:?}",
82 0 : status
83 0 : );
84 0 : error!(msg);
85 0 : return Response::new(Body::from(msg));
86 0 : }
87 :
88 0 : let res = crate::checker::check_writability(compute).await;
89 0 : match res {
90 0 : Ok(_) => Response::new(Body::from("true")),
91 0 : Err(e) => {
92 0 : error!("check_writability failed: {}", e);
93 0 : Response::new(Body::from(e.to_string()))
94 : }
95 : }
96 : }
97 :
98 0 : (&Method::GET, "/info") => {
99 0 : let num_cpus = num_cpus::get_physical();
100 0 : info!("serving /info GET request. num_cpus: {}", num_cpus);
101 0 : Response::new(Body::from(
102 0 : serde_json::json!({
103 0 : "num_cpus": num_cpus,
104 0 : })
105 0 : .to_string(),
106 0 : ))
107 : }
108 :
109 : // Accept spec in JSON format and request compute configuration. If
110 : // anything goes wrong after we set the compute status to `ConfigurationPending`
111 : // and update compute state with new spec, we basically leave compute
112 : // in the potentially wrong state. That said, it's control-plane's
113 : // responsibility to watch compute state after reconfiguration request
114 : // and to clean restart in case of errors.
115 264 : (&Method::POST, "/configure") => {
116 221 : info!("serving /configure POST request");
117 442 : match handle_configure_request(req, compute).await {
118 221 : Ok(msg) => Response::new(Body::from(msg)),
119 0 : Err((msg, code)) => {
120 0 : error!("error handling /configure request: {msg}");
121 0 : render_json_error(&msg, code)
122 : }
123 : }
124 : }
125 :
126 : // download extension files from remote extension storage on demand
127 43 : (&Method::POST, route) if route.starts_with("/extension_server/") => {
128 43 : info!("serving {:?} POST request", route);
129 43 : info!("req.uri {:?}", req.uri());
130 :
131 : // don't even try to download extensions
132 : // if no remote storage is configured
133 43 : if compute.ext_remote_storage.is_none() {
134 41 : info!("no extensions remote storage configured");
135 41 : let mut resp = Response::new(Body::from("no remote storage configured"));
136 41 : *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
137 41 : return resp;
138 2 : }
139 2 :
140 2 : let mut is_library = false;
141 2 : if let Some(params) = req.uri().query() {
142 0 : info!("serving {:?} POST request with params: {}", route, params);
143 0 : if params == "is_library=true" {
144 0 : is_library = true;
145 0 : } else {
146 0 : let mut resp = Response::new(Body::from("Wrong request parameters"));
147 0 : *resp.status_mut() = StatusCode::BAD_REQUEST;
148 0 : return resp;
149 : }
150 2 : }
151 2 : let filename = route.split('/').last().unwrap().to_string();
152 2 : info!("serving /extension_server POST request, filename: {filename:?} is_library: {is_library}");
153 :
154 : // get ext_name and path from spec
155 : // don't lock compute_state for too long
156 2 : let ext = {
157 2 : let compute_state = compute.state.lock().unwrap();
158 2 : let pspec = compute_state.pspec.as_ref().expect("spec must be set");
159 2 : let spec = &pspec.spec;
160 :
161 : // debug only
162 2 : info!("spec: {:?}", spec);
163 :
164 2 : let remote_extensions = match spec.remote_extensions.as_ref() {
165 2 : Some(r) => r,
166 : None => {
167 0 : info!("no remote extensions spec was provided");
168 0 : let mut resp = Response::new(Body::from("no remote storage configured"));
169 0 : *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
170 0 : return resp;
171 : }
172 : };
173 :
174 2 : remote_extensions.get_ext(
175 2 : &filename,
176 2 : is_library,
177 2 : &compute.build_tag,
178 2 : &compute.pgversion,
179 2 : )
180 2 : };
181 2 :
182 2 : match ext {
183 2 : Ok((ext_name, ext_path)) => {
184 10 : match compute.download_extension(ext_name, ext_path).await {
185 2 : Ok(_) => Response::new(Body::from("OK")),
186 0 : Err(e) => {
187 0 : error!("extension download failed: {}", e);
188 0 : let mut resp = Response::new(Body::from(e.to_string()));
189 0 : *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
190 0 : resp
191 : }
192 : }
193 : }
194 0 : Err(e) => {
195 0 : warn!("extension download failed to find extension: {}", e);
196 0 : let mut resp = Response::new(Body::from("failed to find file"));
197 0 : *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
198 0 : resp
199 : }
200 : }
201 : }
202 :
203 : // Return the `404 Not Found` for any other routes.
204 : _ => {
205 0 : let mut not_found = Response::new(Body::from("404 Not Found"));
206 0 : *not_found.status_mut() = StatusCode::NOT_FOUND;
207 0 : not_found
208 : }
209 : }
210 2261 : }
211 :
212 221 : async fn handle_configure_request(
213 221 : req: Request<Body>,
214 221 : compute: &Arc<ComputeNode>,
215 221 : ) -> Result<String, (String, StatusCode)> {
216 221 : if !compute.live_config_allowed {
217 0 : return Err((
218 0 : "live configuration is not allowed for this compute node".to_string(),
219 0 : StatusCode::PRECONDITION_FAILED,
220 0 : ));
221 221 : }
222 :
223 221 : let body_bytes = hyper::body::to_bytes(req.into_body()).await.unwrap();
224 221 : let spec_raw = String::from_utf8(body_bytes.to_vec()).unwrap();
225 221 : if let Ok(request) = serde_json::from_str::<ConfigurationRequest>(&spec_raw) {
226 221 : let spec = request.spec;
227 :
228 221 : let parsed_spec = match ParsedSpec::try_from(spec) {
229 221 : Ok(ps) => ps,
230 0 : Err(msg) => return Err((msg, StatusCode::BAD_REQUEST)),
231 : };
232 :
233 : // XXX: wrap state update under lock in code blocks. Otherwise,
234 : // we will try to `Send` `mut state` into the spawned thread
235 : // bellow, which will cause error:
236 : // ```
237 : // error: future cannot be sent between threads safely
238 : // ```
239 : {
240 221 : let mut state = compute.state.lock().unwrap();
241 221 : if state.status != ComputeStatus::Empty && state.status != ComputeStatus::Running {
242 0 : let msg = format!(
243 0 : "invalid compute status for configuration request: {:?}",
244 0 : state.status.clone()
245 0 : );
246 0 : return Err((msg, StatusCode::PRECONDITION_FAILED));
247 221 : }
248 221 : state.pspec = Some(parsed_spec);
249 221 : state.status = ComputeStatus::ConfigurationPending;
250 221 : compute.state_changed.notify_all();
251 221 : drop(state);
252 221 : info!("set new spec and notified waiters");
253 : }
254 :
255 : // Spawn a blocking thread to wait for compute to become Running.
256 : // This is needed to do not block the main pool of workers and
257 : // be able to serve other requests while some particular request
258 : // is waiting for compute to finish configuration.
259 221 : let c = compute.clone();
260 221 : task::spawn_blocking(move || {
261 221 : let mut state = c.state.lock().unwrap();
262 450 : while state.status != ComputeStatus::Running {
263 229 : state = c.state_changed.wait(state).unwrap();
264 229 : info!(
265 229 : "waiting for compute to become Running, current status: {:?}",
266 229 : state.status
267 229 : );
268 :
269 229 : if state.status == ComputeStatus::Failed {
270 0 : let err = state.error.as_ref().map_or("unknown error", |x| x);
271 0 : let msg = format!("compute configuration failed: {:?}", err);
272 0 : return Err((msg, StatusCode::INTERNAL_SERVER_ERROR));
273 229 : }
274 : }
275 :
276 221 : Ok(())
277 221 : })
278 442 : .await
279 221 : .unwrap()?;
280 :
281 : // Return current compute state if everything went well.
282 221 : let state = compute.state.lock().unwrap().clone();
283 221 : let status_response = status_response_from_state(&state);
284 221 : Ok(serde_json::to_string(&status_response).unwrap())
285 : } else {
286 0 : Err(("invalid spec".to_string(), StatusCode::BAD_REQUEST))
287 : }
288 221 : }
289 :
290 0 : fn render_json_error(e: &str, status: StatusCode) -> Response<Body> {
291 0 : let error = GenericAPIError {
292 0 : error: e.to_string(),
293 0 : };
294 0 : Response::builder()
295 0 : .status(status)
296 0 : .body(Body::from(serde_json::to_string(&error).unwrap()))
297 0 : .unwrap()
298 0 : }
299 :
300 : // Main Hyper HTTP server function that runs it and blocks waiting on it forever.
301 : #[tokio::main]
302 575 : async fn serve(port: u16, state: Arc<ComputeNode>) {
303 575 : // this usually binds to both IPv4 and IPv6 on linux
304 575 : // see e.g. https://github.com/rust-lang/rust/pull/34440
305 575 : let addr = SocketAddr::new(IpAddr::from(Ipv6Addr::UNSPECIFIED), port);
306 575 :
307 2261 : let make_service = make_service_fn(move |_conn| {
308 2261 : let state = state.clone();
309 2261 : async move {
310 2261 : Ok::<_, Infallible>(service_fn(move |req: Request<Body>| {
311 2261 : let state = state.clone();
312 2261 : async move {
313 2261 : Ok::<_, Infallible>(
314 2261 : // NOTE: We include the URI path in the string. It
315 2261 : // doesn't contain any variable parts or sensitive
316 2261 : // information in this API.
317 2261 : tracing_utils::http::tracing_handler(
318 2261 : req,
319 2261 : |req| routes(req, &state),
320 2261 : OtelName::UriPath,
321 2261 : )
322 452 : .await,
323 : )
324 2261 : }
325 2261 : }))
326 2261 : }
327 2261 : });
328 575 :
329 575 : info!("starting HTTP server on {}", addr);
330 :
331 575 : let server = Server::bind(&addr).serve(make_service);
332 :
333 : // Run this server forever
334 2261 : if let Err(e) = server.await {
335 0 : error!("server error: {}", e);
336 0 : }
337 : }
338 :
339 : /// Launch a separate Hyper HTTP API server thread and return its `JoinHandle`.
340 575 : pub fn launch_http_server(port: u16, state: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
341 575 : let state = Arc::clone(state);
342 575 :
343 575 : Ok(thread::Builder::new()
344 575 : .name("http-endpoint".into())
345 575 : .spawn(move || serve(port, state))?)
346 575 : }
|