Line data Source code
1 : use crate::metrics::{
2 : HttpRequestLatencyLabelGroup, HttpRequestStatusLabelGroup, PageserverRequestLabelGroup,
3 : METRICS_REGISTRY,
4 : };
5 : use crate::reconciler::ReconcileError;
6 : use crate::service::{Service, STARTUP_RECONCILE_TIMEOUT};
7 : use anyhow::Context;
8 : use futures::Future;
9 : use hyper::header::CONTENT_TYPE;
10 : use hyper::{Body, Request, Response};
11 : use hyper::{StatusCode, Uri};
12 : use metrics::{BuildInfo, NeonMetrics};
13 : use pageserver_api::controller_api::TenantCreateRequest;
14 : use pageserver_api::models::{
15 : TenantConfigRequest, TenantLocationConfigRequest, TenantShardSplitRequest,
16 : TenantTimeTravelRequest, TimelineCreateRequest,
17 : };
18 : use pageserver_api::shard::TenantShardId;
19 : use pageserver_client::mgmt_api;
20 : use std::sync::Arc;
21 : use std::time::{Duration, Instant};
22 : use tokio_util::sync::CancellationToken;
23 : use utils::auth::{Scope, SwappableJwtAuth};
24 : use utils::failpoint_support::failpoints_handler;
25 : use utils::http::endpoint::{auth_middleware, check_permission_with, request_span};
26 : use utils::http::request::{must_get_query_param, parse_query_param, parse_request_param};
27 : use utils::id::{TenantId, TimelineId};
28 :
29 : use utils::{
30 : http::{
31 : endpoint::{self},
32 : error::ApiError,
33 : json::{json_request, json_response},
34 : RequestExt, RouterBuilder,
35 : },
36 : id::NodeId,
37 : };
38 :
39 : use pageserver_api::controller_api::{
40 : NodeAvailability, NodeConfigureRequest, NodeRegisterRequest, TenantPolicyRequest,
41 : TenantShardMigrateRequest,
42 : };
43 : use pageserver_api::upcall_api::{ReAttachRequest, ValidateRequest};
44 :
45 : use control_plane::storage_controller::{AttachHookRequest, InspectRequest};
46 :
47 : use routerify::Middleware;
48 :
49 : /// State available to HTTP request handlers
50 : pub struct HttpState {
51 : service: Arc<crate::service::Service>,
52 : auth: Option<Arc<SwappableJwtAuth>>,
53 : neon_metrics: NeonMetrics,
54 : allowlist_routes: Vec<Uri>,
55 : }
56 :
57 : impl HttpState {
58 0 : pub fn new(
59 0 : service: Arc<crate::service::Service>,
60 0 : auth: Option<Arc<SwappableJwtAuth>>,
61 0 : build_info: BuildInfo,
62 0 : ) -> Self {
63 0 : let allowlist_routes = ["/status", "/ready", "/metrics"]
64 0 : .iter()
65 0 : .map(|v| v.parse().unwrap())
66 0 : .collect::<Vec<_>>();
67 0 : Self {
68 0 : service,
69 0 : auth,
70 0 : neon_metrics: NeonMetrics::new(build_info),
71 0 : allowlist_routes,
72 0 : }
73 0 : }
74 : }
75 :
76 : #[inline(always)]
77 0 : fn get_state(request: &Request<Body>) -> &HttpState {
78 0 : request
79 0 : .data::<Arc<HttpState>>()
80 0 : .expect("unknown state type")
81 0 : .as_ref()
82 0 : }
83 :
84 : /// Pageserver calls into this on startup, to learn which tenants it should attach
85 0 : async fn handle_re_attach(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
86 0 : check_permissions(&req, Scope::GenerationsApi)?;
87 :
88 0 : let reattach_req = json_request::<ReAttachRequest>(&mut req).await?;
89 0 : let state = get_state(&req);
90 0 : json_response(StatusCode::OK, state.service.re_attach(reattach_req).await?)
91 0 : }
92 :
93 : /// Pageserver calls into this before doing deletions, to confirm that it still
94 : /// holds the latest generation for the tenants with deletions enqueued
95 0 : async fn handle_validate(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
96 0 : check_permissions(&req, Scope::GenerationsApi)?;
97 :
98 0 : let validate_req = json_request::<ValidateRequest>(&mut req).await?;
99 0 : let state = get_state(&req);
100 0 : json_response(StatusCode::OK, state.service.validate(validate_req))
101 0 : }
102 :
103 : /// Call into this before attaching a tenant to a pageserver, to acquire a generation number
104 : /// (in the real control plane this is unnecessary, because the same program is managing
105 : /// generation numbers and doing attachments).
106 0 : async fn handle_attach_hook(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
107 0 : check_permissions(&req, Scope::Admin)?;
108 :
109 0 : let attach_req = json_request::<AttachHookRequest>(&mut req).await?;
110 0 : let state = get_state(&req);
111 0 :
112 0 : json_response(
113 0 : StatusCode::OK,
114 0 : state
115 0 : .service
116 0 : .attach_hook(attach_req)
117 0 : .await
118 0 : .map_err(ApiError::InternalServerError)?,
119 : )
120 0 : }
121 :
122 0 : async fn handle_inspect(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
123 0 : check_permissions(&req, Scope::Admin)?;
124 :
125 0 : let inspect_req = json_request::<InspectRequest>(&mut req).await?;
126 :
127 0 : let state = get_state(&req);
128 0 :
129 0 : json_response(StatusCode::OK, state.service.inspect(inspect_req))
130 0 : }
131 :
132 0 : async fn handle_tenant_create(
133 0 : service: Arc<Service>,
134 0 : mut req: Request<Body>,
135 0 : ) -> Result<Response<Body>, ApiError> {
136 0 : check_permissions(&req, Scope::PageServerApi)?;
137 :
138 0 : let create_req = json_request::<TenantCreateRequest>(&mut req).await?;
139 :
140 : json_response(
141 : StatusCode::CREATED,
142 0 : service.tenant_create(create_req).await?,
143 : )
144 0 : }
145 :
146 0 : async fn handle_tenant_location_config(
147 0 : service: Arc<Service>,
148 0 : mut req: Request<Body>,
149 0 : ) -> Result<Response<Body>, ApiError> {
150 0 : let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
151 0 : check_permissions(&req, Scope::PageServerApi)?;
152 :
153 0 : let config_req = json_request::<TenantLocationConfigRequest>(&mut req).await?;
154 : json_response(
155 : StatusCode::OK,
156 0 : service
157 0 : .tenant_location_config(tenant_shard_id, config_req)
158 0 : .await?,
159 : )
160 0 : }
161 :
162 0 : async fn handle_tenant_config_set(
163 0 : service: Arc<Service>,
164 0 : mut req: Request<Body>,
165 0 : ) -> Result<Response<Body>, ApiError> {
166 0 : check_permissions(&req, Scope::PageServerApi)?;
167 :
168 0 : let config_req = json_request::<TenantConfigRequest>(&mut req).await?;
169 :
170 0 : json_response(StatusCode::OK, service.tenant_config_set(config_req).await?)
171 0 : }
172 :
173 0 : async fn handle_tenant_config_get(
174 0 : service: Arc<Service>,
175 0 : req: Request<Body>,
176 0 : ) -> Result<Response<Body>, ApiError> {
177 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
178 0 : check_permissions(&req, Scope::PageServerApi)?;
179 :
180 0 : json_response(StatusCode::OK, service.tenant_config_get(tenant_id)?)
181 0 : }
182 :
183 0 : async fn handle_tenant_time_travel_remote_storage(
184 0 : service: Arc<Service>,
185 0 : mut req: Request<Body>,
186 0 : ) -> Result<Response<Body>, ApiError> {
187 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
188 0 : check_permissions(&req, Scope::PageServerApi)?;
189 :
190 0 : let time_travel_req = json_request::<TenantTimeTravelRequest>(&mut req).await?;
191 :
192 0 : let timestamp_raw = must_get_query_param(&req, "travel_to")?;
193 0 : let _timestamp = humantime::parse_rfc3339(×tamp_raw).map_err(|_e| {
194 0 : ApiError::BadRequest(anyhow::anyhow!(
195 0 : "Invalid time for travel_to: {timestamp_raw:?}"
196 0 : ))
197 0 : })?;
198 :
199 0 : let done_if_after_raw = must_get_query_param(&req, "done_if_after")?;
200 0 : let _done_if_after = humantime::parse_rfc3339(&done_if_after_raw).map_err(|_e| {
201 0 : ApiError::BadRequest(anyhow::anyhow!(
202 0 : "Invalid time for done_if_after: {done_if_after_raw:?}"
203 0 : ))
204 0 : })?;
205 :
206 0 : service
207 0 : .tenant_time_travel_remote_storage(
208 0 : &time_travel_req,
209 0 : tenant_id,
210 0 : timestamp_raw,
211 0 : done_if_after_raw,
212 0 : )
213 0 : .await?;
214 0 : json_response(StatusCode::OK, ())
215 0 : }
216 :
217 0 : fn map_reqwest_hyper_status(status: reqwest::StatusCode) -> Result<hyper::StatusCode, ApiError> {
218 0 : hyper::StatusCode::from_u16(status.as_u16())
219 0 : .context("invalid status code")
220 0 : .map_err(ApiError::InternalServerError)
221 0 : }
222 :
223 0 : async fn handle_tenant_secondary_download(
224 0 : service: Arc<Service>,
225 0 : req: Request<Body>,
226 0 : ) -> Result<Response<Body>, ApiError> {
227 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
228 0 : let wait = parse_query_param(&req, "wait_ms")?.map(Duration::from_millis);
229 :
230 0 : let (status, progress) = service.tenant_secondary_download(tenant_id, wait).await?;
231 0 : json_response(map_reqwest_hyper_status(status)?, progress)
232 0 : }
233 :
234 0 : async fn handle_tenant_delete(
235 0 : service: Arc<Service>,
236 0 : req: Request<Body>,
237 0 : ) -> Result<Response<Body>, ApiError> {
238 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
239 0 : check_permissions(&req, Scope::PageServerApi)?;
240 :
241 0 : let status_code = service
242 0 : .tenant_delete(tenant_id)
243 0 : .await
244 0 : .and_then(map_reqwest_hyper_status)?;
245 :
246 0 : if status_code == StatusCode::NOT_FOUND {
247 : // The pageserver uses 404 for successful deletion, but we use 200
248 0 : json_response(StatusCode::OK, ())
249 : } else {
250 0 : json_response(status_code, ())
251 : }
252 0 : }
253 :
254 0 : async fn handle_tenant_timeline_create(
255 0 : service: Arc<Service>,
256 0 : mut req: Request<Body>,
257 0 : ) -> Result<Response<Body>, ApiError> {
258 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
259 0 : check_permissions(&req, Scope::PageServerApi)?;
260 :
261 0 : let create_req = json_request::<TimelineCreateRequest>(&mut req).await?;
262 : json_response(
263 : StatusCode::CREATED,
264 0 : service
265 0 : .tenant_timeline_create(tenant_id, create_req)
266 0 : .await?,
267 : )
268 0 : }
269 :
270 0 : async fn handle_tenant_timeline_delete(
271 0 : service: Arc<Service>,
272 0 : req: Request<Body>,
273 0 : ) -> Result<Response<Body>, ApiError> {
274 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
275 0 : check_permissions(&req, Scope::PageServerApi)?;
276 :
277 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
278 :
279 : // For timeline deletions, which both implement an "initially return 202, then 404 once
280 : // we're done" semantic, we wrap with a retry loop to expose a simpler API upstream.
281 0 : async fn deletion_wrapper<R, F>(service: Arc<Service>, f: F) -> Result<Response<Body>, ApiError>
282 0 : where
283 0 : R: std::future::Future<Output = Result<StatusCode, ApiError>> + Send + 'static,
284 0 : F: Fn(Arc<Service>) -> R + Send + Sync + 'static,
285 0 : {
286 0 : let started_at = Instant::now();
287 0 : // To keep deletion reasonably snappy for small tenants, initially check after 1 second if deletion
288 0 : // completed.
289 0 : let mut retry_period = Duration::from_secs(1);
290 0 : // On subsequent retries, wait longer.
291 0 : let max_retry_period = Duration::from_secs(5);
292 0 : // Enable callers with a 30 second request timeout to reliably get a response
293 0 : let max_wait = Duration::from_secs(25);
294 :
295 : loop {
296 0 : let status = f(service.clone()).await?;
297 0 : match status {
298 : StatusCode::ACCEPTED => {
299 0 : tracing::info!("Deletion accepted, waiting to try again...");
300 0 : tokio::time::sleep(retry_period).await;
301 0 : retry_period = max_retry_period;
302 : }
303 : StatusCode::NOT_FOUND => {
304 0 : tracing::info!("Deletion complete");
305 0 : return json_response(StatusCode::OK, ());
306 : }
307 : _ => {
308 0 : tracing::warn!("Unexpected status {status}");
309 0 : return json_response(status, ());
310 : }
311 : }
312 :
313 0 : let now = Instant::now();
314 0 : if now + retry_period > started_at + max_wait {
315 0 : tracing::info!("Deletion timed out waiting for 404");
316 : // REQUEST_TIMEOUT would be more appropriate, but CONFLICT is already part of
317 : // the pageserver's swagger definition for this endpoint, and has the same desired
318 : // effect of causing the control plane to retry later.
319 0 : return json_response(StatusCode::CONFLICT, ());
320 0 : }
321 : }
322 0 : }
323 :
324 0 : deletion_wrapper(service, move |service| async move {
325 0 : service
326 0 : .tenant_timeline_delete(tenant_id, timeline_id)
327 0 : .await
328 0 : .and_then(map_reqwest_hyper_status)
329 0 : })
330 0 : .await
331 0 : }
332 :
333 0 : async fn handle_tenant_timeline_passthrough(
334 0 : service: Arc<Service>,
335 0 : req: Request<Body>,
336 0 : ) -> Result<Response<Body>, ApiError> {
337 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
338 0 : check_permissions(&req, Scope::PageServerApi)?;
339 :
340 0 : let Some(path) = req.uri().path_and_query() else {
341 : // This should never happen, our request router only calls us if there is a path
342 0 : return Err(ApiError::BadRequest(anyhow::anyhow!("Missing path")));
343 : };
344 :
345 0 : tracing::info!("Proxying request for tenant {} ({})", tenant_id, path);
346 :
347 : // Find the node that holds shard zero
348 0 : let (node, tenant_shard_id) = service.tenant_shard0_node(tenant_id)?;
349 :
350 : // Callers will always pass an unsharded tenant ID. Before proxying, we must
351 : // rewrite this to a shard-aware shard zero ID.
352 0 : let path = format!("{}", path);
353 0 : let tenant_str = tenant_id.to_string();
354 0 : let tenant_shard_str = format!("{}", tenant_shard_id);
355 0 : let path = path.replace(&tenant_str, &tenant_shard_str);
356 0 :
357 0 : let latency = &METRICS_REGISTRY
358 0 : .metrics_group
359 0 : .storage_controller_passthrough_request_latency;
360 0 :
361 0 : // This is a bit awkward. We remove the param from the request
362 0 : // and join the words by '_' to get a label for the request.
363 0 : let just_path = path.replace(&tenant_shard_str, "");
364 0 : let path_label = just_path
365 0 : .split('/')
366 0 : .filter(|token| !token.is_empty())
367 0 : .collect::<Vec<_>>()
368 0 : .join("_");
369 0 : let labels = PageserverRequestLabelGroup {
370 0 : pageserver_id: &node.get_id().to_string(),
371 0 : path: &path_label,
372 0 : method: crate::metrics::Method::Get,
373 0 : };
374 0 :
375 0 : let _timer = latency.start_timer(labels.clone());
376 0 :
377 0 : let client = mgmt_api::Client::new(node.base_url(), service.get_config().jwt_token.as_deref());
378 0 : let resp = client.get_raw(path).await.map_err(|_e|
379 : // FIXME: give APiError a proper Unavailable variant. We return 503 here because
380 : // if we can't successfully send a request to the pageserver, we aren't available.
381 0 : ApiError::ShuttingDown)?;
382 :
383 0 : if !resp.status().is_success() {
384 0 : let error_counter = &METRICS_REGISTRY
385 0 : .metrics_group
386 0 : .storage_controller_passthrough_request_error;
387 0 : error_counter.inc(labels);
388 0 : }
389 :
390 : // We have a reqest::Response, would like a http::Response
391 0 : let mut builder = hyper::Response::builder().status(map_reqwest_hyper_status(resp.status())?);
392 0 : for (k, v) in resp.headers() {
393 0 : builder = builder.header(k.as_str(), v.as_bytes());
394 0 : }
395 :
396 0 : let response = builder
397 0 : .body(Body::wrap_stream(resp.bytes_stream()))
398 0 : .map_err(|e| ApiError::InternalServerError(e.into()))?;
399 :
400 0 : Ok(response)
401 0 : }
402 :
403 0 : async fn handle_tenant_locate(
404 0 : service: Arc<Service>,
405 0 : req: Request<Body>,
406 0 : ) -> Result<Response<Body>, ApiError> {
407 0 : check_permissions(&req, Scope::Admin)?;
408 :
409 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
410 0 : json_response(StatusCode::OK, service.tenant_locate(tenant_id)?)
411 0 : }
412 :
413 0 : async fn handle_tenant_describe(
414 0 : service: Arc<Service>,
415 0 : req: Request<Body>,
416 0 : ) -> Result<Response<Body>, ApiError> {
417 0 : check_permissions(&req, Scope::Admin)?;
418 :
419 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
420 0 : json_response(StatusCode::OK, service.tenant_describe(tenant_id)?)
421 0 : }
422 :
423 0 : async fn handle_tenant_list(
424 0 : service: Arc<Service>,
425 0 : req: Request<Body>,
426 0 : ) -> Result<Response<Body>, ApiError> {
427 0 : check_permissions(&req, Scope::Admin)?;
428 :
429 0 : json_response(StatusCode::OK, service.tenant_list())
430 0 : }
431 :
432 0 : async fn handle_node_register(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
433 0 : check_permissions(&req, Scope::Admin)?;
434 :
435 0 : let register_req = json_request::<NodeRegisterRequest>(&mut req).await?;
436 0 : let state = get_state(&req);
437 0 : state.service.node_register(register_req).await?;
438 0 : json_response(StatusCode::OK, ())
439 0 : }
440 :
441 0 : async fn handle_node_list(req: Request<Body>) -> Result<Response<Body>, ApiError> {
442 0 : check_permissions(&req, Scope::Admin)?;
443 :
444 0 : let state = get_state(&req);
445 0 : let nodes = state.service.node_list().await?;
446 0 : let api_nodes = nodes.into_iter().map(|n| n.describe()).collect::<Vec<_>>();
447 0 :
448 0 : json_response(StatusCode::OK, api_nodes)
449 0 : }
450 :
451 0 : async fn handle_node_drop(req: Request<Body>) -> Result<Response<Body>, ApiError> {
452 0 : check_permissions(&req, Scope::Admin)?;
453 :
454 0 : let state = get_state(&req);
455 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
456 0 : json_response(StatusCode::OK, state.service.node_drop(node_id).await?)
457 0 : }
458 :
459 0 : async fn handle_node_configure(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
460 0 : check_permissions(&req, Scope::Admin)?;
461 :
462 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
463 0 : let config_req = json_request::<NodeConfigureRequest>(&mut req).await?;
464 0 : if node_id != config_req.node_id {
465 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
466 0 : "Path and body node_id differ"
467 0 : )));
468 0 : }
469 0 : let state = get_state(&req);
470 0 :
471 0 : json_response(
472 0 : StatusCode::OK,
473 0 : state
474 0 : .service
475 0 : .node_configure(
476 0 : config_req.node_id,
477 0 : config_req.availability.map(NodeAvailability::from),
478 0 : config_req.scheduling,
479 0 : )
480 0 : .await?,
481 : )
482 0 : }
483 :
484 0 : async fn handle_node_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
485 0 : check_permissions(&req, Scope::Admin)?;
486 :
487 0 : let state = get_state(&req);
488 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
489 :
490 0 : let node_status = state.service.get_node(node_id).await?;
491 :
492 0 : json_response(StatusCode::OK, node_status)
493 0 : }
494 :
495 0 : async fn handle_node_drain(req: Request<Body>) -> Result<Response<Body>, ApiError> {
496 0 : check_permissions(&req, Scope::Admin)?;
497 :
498 0 : let state = get_state(&req);
499 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
500 :
501 0 : state.service.start_node_drain(node_id).await?;
502 :
503 0 : json_response(StatusCode::ACCEPTED, ())
504 0 : }
505 :
506 0 : async fn handle_cancel_node_drain(req: Request<Body>) -> Result<Response<Body>, ApiError> {
507 0 : check_permissions(&req, Scope::Admin)?;
508 :
509 0 : let state = get_state(&req);
510 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
511 :
512 0 : state.service.cancel_node_drain(node_id).await?;
513 :
514 0 : json_response(StatusCode::ACCEPTED, ())
515 0 : }
516 :
517 0 : async fn handle_node_fill(req: Request<Body>) -> Result<Response<Body>, ApiError> {
518 0 : check_permissions(&req, Scope::Admin)?;
519 :
520 0 : let state = get_state(&req);
521 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
522 :
523 0 : state.service.start_node_fill(node_id).await?;
524 :
525 0 : json_response(StatusCode::ACCEPTED, ())
526 0 : }
527 :
528 0 : async fn handle_cancel_node_fill(req: Request<Body>) -> Result<Response<Body>, ApiError> {
529 0 : check_permissions(&req, Scope::Admin)?;
530 :
531 0 : let state = get_state(&req);
532 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
533 :
534 0 : state.service.cancel_node_fill(node_id).await?;
535 :
536 0 : json_response(StatusCode::ACCEPTED, ())
537 0 : }
538 :
539 0 : async fn handle_tenant_shard_split(
540 0 : service: Arc<Service>,
541 0 : mut req: Request<Body>,
542 0 : ) -> Result<Response<Body>, ApiError> {
543 0 : check_permissions(&req, Scope::Admin)?;
544 :
545 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
546 0 : let split_req = json_request::<TenantShardSplitRequest>(&mut req).await?;
547 :
548 : json_response(
549 : StatusCode::OK,
550 0 : service.tenant_shard_split(tenant_id, split_req).await?,
551 : )
552 0 : }
553 :
554 0 : async fn handle_tenant_shard_migrate(
555 0 : service: Arc<Service>,
556 0 : mut req: Request<Body>,
557 0 : ) -> Result<Response<Body>, ApiError> {
558 0 : check_permissions(&req, Scope::Admin)?;
559 :
560 0 : let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
561 0 : let migrate_req = json_request::<TenantShardMigrateRequest>(&mut req).await?;
562 : json_response(
563 : StatusCode::OK,
564 0 : service
565 0 : .tenant_shard_migrate(tenant_shard_id, migrate_req)
566 0 : .await?,
567 : )
568 0 : }
569 :
570 0 : async fn handle_tenant_update_policy(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
571 0 : check_permissions(&req, Scope::Admin)?;
572 :
573 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
574 0 : let update_req = json_request::<TenantPolicyRequest>(&mut req).await?;
575 0 : let state = get_state(&req);
576 0 :
577 0 : json_response(
578 0 : StatusCode::OK,
579 0 : state
580 0 : .service
581 0 : .tenant_update_policy(tenant_id, update_req)
582 0 : .await?,
583 : )
584 0 : }
585 :
586 0 : async fn handle_tenant_drop(req: Request<Body>) -> Result<Response<Body>, ApiError> {
587 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
588 0 : check_permissions(&req, Scope::PageServerApi)?;
589 :
590 0 : let state = get_state(&req);
591 0 :
592 0 : json_response(StatusCode::OK, state.service.tenant_drop(tenant_id).await?)
593 0 : }
594 :
595 0 : async fn handle_tenant_import(req: Request<Body>) -> Result<Response<Body>, ApiError> {
596 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
597 0 : check_permissions(&req, Scope::PageServerApi)?;
598 :
599 0 : let state = get_state(&req);
600 0 :
601 0 : json_response(
602 0 : StatusCode::OK,
603 0 : state.service.tenant_import(tenant_id).await?,
604 : )
605 0 : }
606 :
607 0 : async fn handle_tenants_dump(req: Request<Body>) -> Result<Response<Body>, ApiError> {
608 0 : check_permissions(&req, Scope::Admin)?;
609 :
610 0 : let state = get_state(&req);
611 0 : state.service.tenants_dump()
612 0 : }
613 :
614 0 : async fn handle_scheduler_dump(req: Request<Body>) -> Result<Response<Body>, ApiError> {
615 0 : check_permissions(&req, Scope::Admin)?;
616 :
617 0 : let state = get_state(&req);
618 0 : state.service.scheduler_dump()
619 0 : }
620 :
621 0 : async fn handle_consistency_check(req: Request<Body>) -> Result<Response<Body>, ApiError> {
622 0 : check_permissions(&req, Scope::Admin)?;
623 :
624 0 : let state = get_state(&req);
625 0 :
626 0 : json_response(StatusCode::OK, state.service.consistency_check().await?)
627 0 : }
628 :
629 0 : async fn handle_reconcile_all(req: Request<Body>) -> Result<Response<Body>, ApiError> {
630 0 : check_permissions(&req, Scope::Admin)?;
631 :
632 0 : let state = get_state(&req);
633 0 :
634 0 : json_response(StatusCode::OK, state.service.reconcile_all_now().await?)
635 0 : }
636 :
637 : /// Status endpoint is just used for checking that our HTTP listener is up
638 0 : async fn handle_status(_req: Request<Body>) -> Result<Response<Body>, ApiError> {
639 0 : json_response(StatusCode::OK, ())
640 0 : }
641 :
642 : /// Readiness endpoint indicates when we're done doing startup I/O (e.g. reconciling
643 : /// with remote pageserver nodes). This is intended for use as a kubernetes readiness probe.
644 0 : async fn handle_ready(req: Request<Body>) -> Result<Response<Body>, ApiError> {
645 0 : let state = get_state(&req);
646 0 : if state.service.startup_complete.is_ready() {
647 0 : json_response(StatusCode::OK, ())
648 : } else {
649 0 : json_response(StatusCode::SERVICE_UNAVAILABLE, ())
650 : }
651 0 : }
652 :
653 : impl From<ReconcileError> for ApiError {
654 0 : fn from(value: ReconcileError) -> Self {
655 0 : ApiError::Conflict(format!("Reconciliation error: {}", value))
656 0 : }
657 : }
658 :
659 : /// Common wrapper for request handlers that call into Service and will operate on tenants: they must only
660 : /// be allowed to run if Service has finished its initial reconciliation.
661 0 : async fn tenant_service_handler<R, H>(
662 0 : request: Request<Body>,
663 0 : handler: H,
664 0 : request_name: RequestName,
665 0 : ) -> R::Output
666 0 : where
667 0 : R: std::future::Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
668 0 : H: FnOnce(Arc<Service>, Request<Body>) -> R + Send + Sync + 'static,
669 0 : {
670 0 : let state = get_state(&request);
671 0 : let service = state.service.clone();
672 0 :
673 0 : let startup_complete = service.startup_complete.clone();
674 0 : if tokio::time::timeout(STARTUP_RECONCILE_TIMEOUT, startup_complete.wait())
675 0 : .await
676 0 : .is_err()
677 : {
678 : // This shouldn't happen: it is the responsibilty of [`Service::startup_reconcile`] to use appropriate
679 : // timeouts around its remote calls, to bound its runtime.
680 0 : return Err(ApiError::Timeout(
681 0 : "Timed out waiting for service readiness".into(),
682 0 : ));
683 0 : }
684 0 :
685 0 : named_request_span(
686 0 : request,
687 0 : |request| async move { handler(service, request).await },
688 0 : request_name,
689 0 : )
690 0 : .await
691 0 : }
692 :
693 : /// Check if the required scope is held in the request's token, or if the request has
694 : /// a token with 'admin' scope then always permit it.
695 0 : fn check_permissions(request: &Request<Body>, required_scope: Scope) -> Result<(), ApiError> {
696 0 : check_permission_with(request, |claims| {
697 0 : match crate::auth::check_permission(claims, required_scope) {
698 0 : Err(e) => match crate::auth::check_permission(claims, Scope::Admin) {
699 0 : Ok(()) => Ok(()),
700 0 : Err(_) => Err(e),
701 : },
702 0 : Ok(()) => Ok(()),
703 : }
704 0 : })
705 0 : }
706 :
707 : #[derive(Clone, Debug)]
708 : struct RequestMeta {
709 : method: hyper::http::Method,
710 : at: Instant,
711 : }
712 :
713 0 : fn prologue_metrics_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>(
714 0 : ) -> Middleware<B, ApiError> {
715 0 : Middleware::pre(move |req| async move {
716 0 : let meta = RequestMeta {
717 0 : method: req.method().clone(),
718 0 : at: Instant::now(),
719 0 : };
720 0 :
721 0 : req.set_context(meta);
722 0 :
723 0 : Ok(req)
724 0 : })
725 0 : }
726 :
727 0 : fn epilogue_metrics_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>(
728 0 : ) -> Middleware<B, ApiError> {
729 0 : Middleware::post_with_info(move |resp, req_info| async move {
730 0 : let request_name = match req_info.context::<RequestName>() {
731 0 : Some(name) => name,
732 0 : None => {
733 0 : return Ok(resp);
734 0 : }
735 0 : };
736 0 :
737 0 : if let Some(meta) = req_info.context::<RequestMeta>() {
738 0 : let status = &crate::metrics::METRICS_REGISTRY
739 0 : .metrics_group
740 0 : .storage_controller_http_request_status;
741 0 : let latency = &crate::metrics::METRICS_REGISTRY
742 0 : .metrics_group
743 0 : .storage_controller_http_request_latency;
744 0 :
745 0 : status.inc(HttpRequestStatusLabelGroup {
746 0 : path: request_name.0,
747 0 : method: meta.method.clone().into(),
748 0 : status: crate::metrics::StatusCode(resp.status()),
749 0 : });
750 0 :
751 0 : latency.observe(
752 0 : HttpRequestLatencyLabelGroup {
753 0 : path: request_name.0,
754 0 : method: meta.method.into(),
755 0 : },
756 0 : meta.at.elapsed().as_secs_f64(),
757 0 : );
758 0 : }
759 0 : Ok(resp)
760 0 : })
761 0 : }
762 :
763 0 : pub async fn measured_metrics_handler(req: Request<Body>) -> Result<Response<Body>, ApiError> {
764 0 : pub const TEXT_FORMAT: &str = "text/plain; version=0.0.4";
765 0 :
766 0 : let state = get_state(&req);
767 0 : let payload = crate::metrics::METRICS_REGISTRY.encode(&state.neon_metrics);
768 0 : let response = Response::builder()
769 0 : .status(200)
770 0 : .header(CONTENT_TYPE, TEXT_FORMAT)
771 0 : .body(payload.into())
772 0 : .unwrap();
773 0 :
774 0 : Ok(response)
775 0 : }
776 :
/// Static name attached to a request's context; the epilogue metrics middleware uses it
/// as the `path` label of the HTTP request metrics.
#[derive(Clone)]
struct RequestName(&'static str);
779 :
780 0 : async fn named_request_span<R, H>(
781 0 : request: Request<Body>,
782 0 : handler: H,
783 0 : name: RequestName,
784 0 : ) -> R::Output
785 0 : where
786 0 : R: Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
787 0 : H: FnOnce(Request<Body>) -> R + Send + Sync + 'static,
788 0 : {
789 0 : request.set_context(name);
790 0 : request_span(request, handler).await
791 0 : }
792 :
793 0 : pub fn make_router(
794 0 : service: Arc<Service>,
795 0 : auth: Option<Arc<SwappableJwtAuth>>,
796 0 : build_info: BuildInfo,
797 0 : ) -> RouterBuilder<hyper::Body, ApiError> {
798 0 : let mut router = endpoint::make_router()
799 0 : .middleware(prologue_metrics_middleware())
800 0 : .middleware(epilogue_metrics_middleware());
801 0 : if auth.is_some() {
802 0 : router = router.middleware(auth_middleware(|request| {
803 0 : let state = get_state(request);
804 0 : if state.allowlist_routes.contains(request.uri()) {
805 0 : None
806 : } else {
807 0 : state.auth.as_deref()
808 : }
809 0 : }));
810 0 : }
811 :
812 0 : router
813 0 : .data(Arc::new(HttpState::new(service, auth, build_info)))
814 0 : .get("/metrics", |r| {
815 0 : named_request_span(r, measured_metrics_handler, RequestName("metrics"))
816 0 : })
817 0 : // Non-prefixed generic endpoints (status, metrics)
818 0 : .get("/status", |r| {
819 0 : named_request_span(r, handle_status, RequestName("status"))
820 0 : })
821 0 : .get("/ready", |r| {
822 0 : named_request_span(r, handle_ready, RequestName("ready"))
823 0 : })
824 0 : // Upcalls for the pageserver: point the pageserver's `control_plane_api` config to this prefix
825 0 : .post("/upcall/v1/re-attach", |r| {
826 0 : named_request_span(r, handle_re_attach, RequestName("upcall_v1_reattach"))
827 0 : })
828 0 : .post("/upcall/v1/validate", |r| {
829 0 : named_request_span(r, handle_validate, RequestName("upcall_v1_validate"))
830 0 : })
831 0 : // Test/dev/debug endpoints
832 0 : .post("/debug/v1/attach-hook", |r| {
833 0 : named_request_span(r, handle_attach_hook, RequestName("debug_v1_attach_hook"))
834 0 : })
835 0 : .post("/debug/v1/inspect", |r| {
836 0 : named_request_span(r, handle_inspect, RequestName("debug_v1_inspect"))
837 0 : })
838 0 : .post("/debug/v1/tenant/:tenant_id/drop", |r| {
839 0 : named_request_span(r, handle_tenant_drop, RequestName("debug_v1_tenant_drop"))
840 0 : })
841 0 : .post("/debug/v1/node/:node_id/drop", |r| {
842 0 : named_request_span(r, handle_node_drop, RequestName("debug_v1_node_drop"))
843 0 : })
844 0 : .post("/debug/v1/tenant/:tenant_id/import", |r| {
845 0 : named_request_span(
846 0 : r,
847 0 : handle_tenant_import,
848 0 : RequestName("debug_v1_tenant_import"),
849 0 : )
850 0 : })
851 0 : .get("/debug/v1/tenant", |r| {
852 0 : named_request_span(r, handle_tenants_dump, RequestName("debug_v1_tenant"))
853 0 : })
854 0 : .get("/debug/v1/tenant/:tenant_id/locate", |r| {
855 0 : tenant_service_handler(
856 0 : r,
857 0 : handle_tenant_locate,
858 0 : RequestName("debug_v1_tenant_locate"),
859 0 : )
860 0 : })
861 0 : .get("/debug/v1/scheduler", |r| {
862 0 : named_request_span(r, handle_scheduler_dump, RequestName("debug_v1_scheduler"))
863 0 : })
864 0 : .post("/debug/v1/consistency_check", |r| {
865 0 : named_request_span(
866 0 : r,
867 0 : handle_consistency_check,
868 0 : RequestName("debug_v1_consistency_check"),
869 0 : )
870 0 : })
871 0 : .post("/debug/v1/reconcile_all", |r| {
872 0 : request_span(r, handle_reconcile_all)
873 0 : })
874 0 : .put("/debug/v1/failpoints", |r| {
875 0 : request_span(r, |r| failpoints_handler(r, CancellationToken::new()))
876 0 : })
877 0 : // Node operations
878 0 : .post("/control/v1/node", |r| {
879 0 : named_request_span(r, handle_node_register, RequestName("control_v1_node"))
880 0 : })
881 0 : .get("/control/v1/node", |r| {
882 0 : named_request_span(r, handle_node_list, RequestName("control_v1_node"))
883 0 : })
884 0 : .put("/control/v1/node/:node_id/config", |r| {
885 0 : named_request_span(
886 0 : r,
887 0 : handle_node_configure,
888 0 : RequestName("control_v1_node_config"),
889 0 : )
890 0 : })
891 0 : .get("/control/v1/node/:node_id", |r| {
892 0 : named_request_span(r, handle_node_status, RequestName("control_v1_node_status"))
893 0 : })
894 0 : .put("/control/v1/node/:node_id/drain", |r| {
895 0 : named_request_span(r, handle_node_drain, RequestName("control_v1_node_drain"))
896 0 : })
897 0 : .delete("/control/v1/node/:node_id/drain", |r| {
898 0 : named_request_span(
899 0 : r,
900 0 : handle_cancel_node_drain,
901 0 : RequestName("control_v1_cancel_node_drain"),
902 0 : )
903 0 : })
904 0 : .put("/control/v1/node/:node_id/fill", |r| {
905 0 : named_request_span(r, handle_node_fill, RequestName("control_v1_node_fill"))
906 0 : })
907 0 : .delete("/control/v1/node/:node_id/fill", |r| {
908 0 : named_request_span(
909 0 : r,
910 0 : handle_cancel_node_fill,
911 0 : RequestName("control_v1_cancel_node_fill"),
912 0 : )
913 0 : })
914 0 : // Drain and fill are cancelled via the DELETE routes registered on the same paths above.
915 0 : // Tenant Shard operations
916 0 : .put("/control/v1/tenant/:tenant_shard_id/migrate", |r| {
917 0 : tenant_service_handler(
918 0 : r,
919 0 : handle_tenant_shard_migrate,
920 0 : RequestName("control_v1_tenant_migrate"),
921 0 : )
922 0 : })
923 0 : .put("/control/v1/tenant/:tenant_id/shard_split", |r| {
924 0 : tenant_service_handler(
925 0 : r,
926 0 : handle_tenant_shard_split,
927 0 : RequestName("control_v1_tenant_shard_split"),
928 0 : )
929 0 : })
930 0 : .get("/control/v1/tenant/:tenant_id", |r| {
931 0 : tenant_service_handler(
932 0 : r,
933 0 : handle_tenant_describe,
934 0 : RequestName("control_v1_tenant_describe"),
935 0 : )
936 0 : })
937 0 : .get("/control/v1/tenant", |r| {
938 0 : tenant_service_handler(r, handle_tenant_list, RequestName("control_v1_tenant_list"))
939 0 : })
940 0 : .put("/control/v1/tenant/:tenant_id/policy", |r| {
941 0 : named_request_span(
942 0 : r,
943 0 : handle_tenant_update_policy,
944 0 : RequestName("control_v1_tenant_policy"),
945 0 : )
946 0 : })
947 0 : // Tenant operations
948 0 : // The ^/v1/ endpoints act as a "Virtual Pageserver", enabling shard-naive clients to call into
949 0 : // this service to manage tenants that actually consist of many tenant shards, as if they are a single entity.
950 0 : .post("/v1/tenant", |r| {
951 0 : tenant_service_handler(r, handle_tenant_create, RequestName("v1_tenant"))
952 0 : })
953 0 : .delete("/v1/tenant/:tenant_id", |r| {
954 0 : tenant_service_handler(r, handle_tenant_delete, RequestName("v1_tenant"))
955 0 : })
956 0 : .put("/v1/tenant/config", |r| {
957 0 : tenant_service_handler(r, handle_tenant_config_set, RequestName("v1_tenant_config"))
958 0 : })
959 0 : .get("/v1/tenant/:tenant_id/config", |r| {
960 0 : tenant_service_handler(r, handle_tenant_config_get, RequestName("v1_tenant_config"))
961 0 : })
962 0 : .put("/v1/tenant/:tenant_shard_id/location_config", |r| {
963 0 : tenant_service_handler(
964 0 : r,
965 0 : handle_tenant_location_config,
966 0 : RequestName("v1_tenant_location_config"),
967 0 : )
968 0 : })
969 0 : .put("/v1/tenant/:tenant_id/time_travel_remote_storage", |r| {
970 0 : tenant_service_handler(
971 0 : r,
972 0 : handle_tenant_time_travel_remote_storage,
973 0 : RequestName("v1_tenant_time_travel_remote_storage"),
974 0 : )
975 0 : })
976 0 : .post("/v1/tenant/:tenant_id/secondary/download", |r| {
977 0 : tenant_service_handler(
978 0 : r,
979 0 : handle_tenant_secondary_download,
980 0 : RequestName("v1_tenant_secondary_download"),
981 0 : )
982 0 : })
983 0 : // Timeline operations
984 0 : .delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
985 0 : tenant_service_handler(
986 0 : r,
987 0 : handle_tenant_timeline_delete,
988 0 : RequestName("v1_tenant_timeline"),
989 0 : )
990 0 : })
991 0 : .post("/v1/tenant/:tenant_id/timeline", |r| {
992 0 : tenant_service_handler(
993 0 : r,
994 0 : handle_tenant_timeline_create,
995 0 : RequestName("v1_tenant_timeline"),
996 0 : )
997 0 : })
998 0 : // Tenant detail GET passthrough to shard zero:
999 0 : .get("/v1/tenant/:tenant_id", |r| {
1000 0 : tenant_service_handler(
1001 0 : r,
1002 0 : handle_tenant_timeline_passthrough,
1003 0 : RequestName("v1_tenant_passthrough"),
1004 0 : )
1005 0 : })
1006 0 : // The `*` in the URL is a wildcard: any tenant/timeline GET APIs on the pageserver
1007 0 : // are implicitly exposed here. This must be last in the list to avoid
1008 0 : // taking precedence over other GET methods we might implement by hand.
1009 0 : .get("/v1/tenant/:tenant_id/*", |r| {
1010 0 : tenant_service_handler(
1011 0 : r,
1012 0 : handle_tenant_timeline_passthrough,
1013 0 : RequestName("v1_tenant_passthrough"),
1014 0 : )
1015 0 : })
1016 0 : }
|