Line data Source code
1 : use crate::metrics::{
2 : HttpRequestLatencyLabelGroup, HttpRequestStatusLabelGroup, PageserverRequestLabelGroup,
3 : METRICS_REGISTRY,
4 : };
5 : use crate::reconciler::ReconcileError;
6 : use crate::service::{Service, STARTUP_RECONCILE_TIMEOUT};
7 : use futures::Future;
8 : use hyper::header::CONTENT_TYPE;
9 : use hyper::{Body, Request, Response};
10 : use hyper::{StatusCode, Uri};
11 : use pageserver_api::models::{
12 : TenantConfigRequest, TenantCreateRequest, TenantLocationConfigRequest, TenantShardSplitRequest,
13 : TenantTimeTravelRequest, TimelineCreateRequest,
14 : };
15 : use pageserver_api::shard::TenantShardId;
16 : use pageserver_client::mgmt_api;
17 : use std::sync::Arc;
18 : use std::time::{Duration, Instant};
19 : use tokio_util::sync::CancellationToken;
20 : use utils::auth::{Scope, SwappableJwtAuth};
21 : use utils::failpoint_support::failpoints_handler;
22 : use utils::http::endpoint::{auth_middleware, check_permission_with, request_span};
23 : use utils::http::request::{must_get_query_param, parse_query_param, parse_request_param};
24 : use utils::id::{TenantId, TimelineId};
25 :
26 : use utils::{
27 : http::{
28 : endpoint::{self},
29 : error::ApiError,
30 : json::{json_request, json_response},
31 : RequestExt, RouterBuilder,
32 : },
33 : id::NodeId,
34 : };
35 :
36 : use pageserver_api::controller_api::{
37 : NodeAvailability, NodeConfigureRequest, NodeRegisterRequest, TenantPolicyRequest,
38 : TenantShardMigrateRequest,
39 : };
40 : use pageserver_api::upcall_api::{ReAttachRequest, ValidateRequest};
41 :
42 : use control_plane::storage_controller::{AttachHookRequest, InspectRequest};
43 :
44 : use routerify::Middleware;
45 :
46 : /// State available to HTTP request handlers
47 : #[derive(Clone)]
48 : pub struct HttpState {
49 : service: Arc<crate::service::Service>,
50 : auth: Option<Arc<SwappableJwtAuth>>,
51 : allowlist_routes: Vec<Uri>,
52 : }
53 :
54 : impl HttpState {
55 0 : pub fn new(service: Arc<crate::service::Service>, auth: Option<Arc<SwappableJwtAuth>>) -> Self {
56 0 : let allowlist_routes = ["/status", "/ready", "/metrics"]
57 0 : .iter()
58 0 : .map(|v| v.parse().unwrap())
59 0 : .collect::<Vec<_>>();
60 0 : Self {
61 0 : service,
62 0 : auth,
63 0 : allowlist_routes,
64 0 : }
65 0 : }
66 : }
67 :
68 : #[inline(always)]
69 0 : fn get_state(request: &Request<Body>) -> &HttpState {
70 0 : request
71 0 : .data::<Arc<HttpState>>()
72 0 : .expect("unknown state type")
73 0 : .as_ref()
74 0 : }
75 :
76 : /// Pageserver calls into this on startup, to learn which tenants it should attach
77 0 : async fn handle_re_attach(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
78 0 : check_permissions(&req, Scope::GenerationsApi)?;
79 :
80 0 : let reattach_req = json_request::<ReAttachRequest>(&mut req).await?;
81 0 : let state = get_state(&req);
82 0 : json_response(StatusCode::OK, state.service.re_attach(reattach_req).await?)
83 0 : }
84 :
85 : /// Pageserver calls into this before doing deletions, to confirm that it still
86 : /// holds the latest generation for the tenants with deletions enqueued
87 0 : async fn handle_validate(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
88 0 : check_permissions(&req, Scope::GenerationsApi)?;
89 :
90 0 : let validate_req = json_request::<ValidateRequest>(&mut req).await?;
91 0 : let state = get_state(&req);
92 0 : json_response(StatusCode::OK, state.service.validate(validate_req))
93 0 : }
94 :
95 : /// Call into this before attaching a tenant to a pageserver, to acquire a generation number
96 : /// (in the real control plane this is unnecessary, because the same program is managing
97 : /// generation numbers and doing attachments).
98 0 : async fn handle_attach_hook(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
99 0 : check_permissions(&req, Scope::Admin)?;
100 :
101 0 : let attach_req = json_request::<AttachHookRequest>(&mut req).await?;
102 0 : let state = get_state(&req);
103 0 :
104 0 : json_response(
105 0 : StatusCode::OK,
106 0 : state
107 0 : .service
108 0 : .attach_hook(attach_req)
109 0 : .await
110 0 : .map_err(ApiError::InternalServerError)?,
111 : )
112 0 : }
113 :
114 0 : async fn handle_inspect(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
115 0 : check_permissions(&req, Scope::Admin)?;
116 :
117 0 : let inspect_req = json_request::<InspectRequest>(&mut req).await?;
118 :
119 0 : let state = get_state(&req);
120 0 :
121 0 : json_response(StatusCode::OK, state.service.inspect(inspect_req))
122 0 : }
123 :
124 0 : async fn handle_tenant_create(
125 0 : service: Arc<Service>,
126 0 : mut req: Request<Body>,
127 0 : ) -> Result<Response<Body>, ApiError> {
128 0 : check_permissions(&req, Scope::PageServerApi)?;
129 :
130 0 : let create_req = json_request::<TenantCreateRequest>(&mut req).await?;
131 :
132 : json_response(
133 : StatusCode::CREATED,
134 0 : service.tenant_create(create_req).await?,
135 : )
136 0 : }
137 :
138 : // For tenant and timeline deletions, which both implement an "initially return 202, then 404 once
139 : // we're done" semantic, we wrap with a retry loop to expose a simpler API upstream. This avoids
140 : // needing to track a "deleting" state for tenants.
141 0 : async fn deletion_wrapper<R, F>(service: Arc<Service>, f: F) -> Result<Response<Body>, ApiError>
142 0 : where
143 0 : R: std::future::Future<Output = Result<StatusCode, ApiError>> + Send + 'static,
144 0 : F: Fn(Arc<Service>) -> R + Send + Sync + 'static,
145 0 : {
146 0 : let started_at = Instant::now();
147 0 : // To keep deletion reasonably snappy for small tenants, initially check after 1 second if deletion
148 0 : // completed.
149 0 : let mut retry_period = Duration::from_secs(1);
150 0 : // On subsequent retries, wait longer.
151 0 : let max_retry_period = Duration::from_secs(5);
152 0 : // Enable callers with a 30 second request timeout to reliably get a response
153 0 : let max_wait = Duration::from_secs(25);
154 :
155 : loop {
156 0 : let status = f(service.clone()).await?;
157 0 : match status {
158 : StatusCode::ACCEPTED => {
159 0 : tracing::info!("Deletion accepted, waiting to try again...");
160 0 : tokio::time::sleep(retry_period).await;
161 0 : retry_period = max_retry_period;
162 : }
163 : StatusCode::NOT_FOUND => {
164 0 : tracing::info!("Deletion complete");
165 0 : return json_response(StatusCode::OK, ());
166 : }
167 : _ => {
168 0 : tracing::warn!("Unexpected status {status}");
169 0 : return json_response(status, ());
170 : }
171 : }
172 :
173 0 : let now = Instant::now();
174 0 : if now + retry_period > started_at + max_wait {
175 0 : tracing::info!("Deletion timed out waiting for 404");
176 : // REQUEST_TIMEOUT would be more appropriate, but CONFLICT is already part of
177 : // the pageserver's swagger definition for this endpoint, and has the same desired
178 : // effect of causing the control plane to retry later.
179 0 : return json_response(StatusCode::CONFLICT, ());
180 0 : }
181 : }
182 0 : }
183 :
184 0 : async fn handle_tenant_location_config(
185 0 : service: Arc<Service>,
186 0 : mut req: Request<Body>,
187 0 : ) -> Result<Response<Body>, ApiError> {
188 0 : let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
189 0 : check_permissions(&req, Scope::PageServerApi)?;
190 :
191 0 : let config_req = json_request::<TenantLocationConfigRequest>(&mut req).await?;
192 : json_response(
193 : StatusCode::OK,
194 0 : service
195 0 : .tenant_location_config(tenant_shard_id, config_req)
196 0 : .await?,
197 : )
198 0 : }
199 :
200 0 : async fn handle_tenant_config_set(
201 0 : service: Arc<Service>,
202 0 : mut req: Request<Body>,
203 0 : ) -> Result<Response<Body>, ApiError> {
204 0 : check_permissions(&req, Scope::PageServerApi)?;
205 :
206 0 : let config_req = json_request::<TenantConfigRequest>(&mut req).await?;
207 :
208 0 : json_response(StatusCode::OK, service.tenant_config_set(config_req).await?)
209 0 : }
210 :
211 0 : async fn handle_tenant_config_get(
212 0 : service: Arc<Service>,
213 0 : req: Request<Body>,
214 0 : ) -> Result<Response<Body>, ApiError> {
215 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
216 0 : check_permissions(&req, Scope::PageServerApi)?;
217 :
218 0 : json_response(StatusCode::OK, service.tenant_config_get(tenant_id)?)
219 0 : }
220 :
221 0 : async fn handle_tenant_time_travel_remote_storage(
222 0 : service: Arc<Service>,
223 0 : mut req: Request<Body>,
224 0 : ) -> Result<Response<Body>, ApiError> {
225 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
226 0 : check_permissions(&req, Scope::PageServerApi)?;
227 :
228 0 : let time_travel_req = json_request::<TenantTimeTravelRequest>(&mut req).await?;
229 :
230 0 : let timestamp_raw = must_get_query_param(&req, "travel_to")?;
231 0 : let _timestamp = humantime::parse_rfc3339(×tamp_raw).map_err(|_e| {
232 0 : ApiError::BadRequest(anyhow::anyhow!(
233 0 : "Invalid time for travel_to: {timestamp_raw:?}"
234 0 : ))
235 0 : })?;
236 :
237 0 : let done_if_after_raw = must_get_query_param(&req, "done_if_after")?;
238 0 : let _done_if_after = humantime::parse_rfc3339(&done_if_after_raw).map_err(|_e| {
239 0 : ApiError::BadRequest(anyhow::anyhow!(
240 0 : "Invalid time for done_if_after: {done_if_after_raw:?}"
241 0 : ))
242 0 : })?;
243 :
244 0 : service
245 0 : .tenant_time_travel_remote_storage(
246 0 : &time_travel_req,
247 0 : tenant_id,
248 0 : timestamp_raw,
249 0 : done_if_after_raw,
250 0 : )
251 0 : .await?;
252 0 : json_response(StatusCode::OK, ())
253 0 : }
254 :
255 0 : async fn handle_tenant_secondary_download(
256 0 : service: Arc<Service>,
257 0 : req: Request<Body>,
258 0 : ) -> Result<Response<Body>, ApiError> {
259 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
260 0 : let wait = parse_query_param(&req, "wait_ms")?.map(Duration::from_millis);
261 :
262 0 : let (status, progress) = service.tenant_secondary_download(tenant_id, wait).await?;
263 0 : json_response(status, progress)
264 0 : }
265 :
266 0 : async fn handle_tenant_delete(
267 0 : service: Arc<Service>,
268 0 : req: Request<Body>,
269 0 : ) -> Result<Response<Body>, ApiError> {
270 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
271 0 : check_permissions(&req, Scope::PageServerApi)?;
272 :
273 0 : deletion_wrapper(service, move |service| async move {
274 0 : service.tenant_delete(tenant_id).await
275 0 : })
276 0 : .await
277 0 : }
278 :
279 0 : async fn handle_tenant_timeline_create(
280 0 : service: Arc<Service>,
281 0 : mut req: Request<Body>,
282 0 : ) -> Result<Response<Body>, ApiError> {
283 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
284 0 : check_permissions(&req, Scope::PageServerApi)?;
285 :
286 0 : let create_req = json_request::<TimelineCreateRequest>(&mut req).await?;
287 : json_response(
288 : StatusCode::CREATED,
289 0 : service
290 0 : .tenant_timeline_create(tenant_id, create_req)
291 0 : .await?,
292 : )
293 0 : }
294 :
295 0 : async fn handle_tenant_timeline_delete(
296 0 : service: Arc<Service>,
297 0 : req: Request<Body>,
298 0 : ) -> Result<Response<Body>, ApiError> {
299 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
300 0 : check_permissions(&req, Scope::PageServerApi)?;
301 :
302 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
303 :
304 0 : deletion_wrapper(service, move |service| async move {
305 0 : service.tenant_timeline_delete(tenant_id, timeline_id).await
306 0 : })
307 0 : .await
308 0 : }
309 :
310 0 : async fn handle_tenant_timeline_passthrough(
311 0 : service: Arc<Service>,
312 0 : req: Request<Body>,
313 0 : ) -> Result<Response<Body>, ApiError> {
314 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
315 0 : check_permissions(&req, Scope::PageServerApi)?;
316 :
317 0 : let Some(path) = req.uri().path_and_query() else {
318 : // This should never happen, our request router only calls us if there is a path
319 0 : return Err(ApiError::BadRequest(anyhow::anyhow!("Missing path")));
320 : };
321 :
322 0 : tracing::info!("Proxying request for tenant {} ({})", tenant_id, path);
323 :
324 : // Find the node that holds shard zero
325 0 : let (node, tenant_shard_id) = service.tenant_shard0_node(tenant_id)?;
326 :
327 : // Callers will always pass an unsharded tenant ID. Before proxying, we must
328 : // rewrite this to a shard-aware shard zero ID.
329 0 : let path = format!("{}", path);
330 0 : let tenant_str = tenant_id.to_string();
331 0 : let tenant_shard_str = format!("{}", tenant_shard_id);
332 0 : let path = path.replace(&tenant_str, &tenant_shard_str);
333 0 :
334 0 : let latency = &METRICS_REGISTRY
335 0 : .metrics_group
336 0 : .storage_controller_passthrough_request_latency;
337 0 :
338 0 : // This is a bit awkward. We remove the param from the request
339 0 : // and join the words by '_' to get a label for the request.
340 0 : let just_path = path.replace(&tenant_shard_str, "");
341 0 : let path_label = just_path
342 0 : .split('/')
343 0 : .filter(|token| !token.is_empty())
344 0 : .collect::<Vec<_>>()
345 0 : .join("_");
346 0 : let labels = PageserverRequestLabelGroup {
347 0 : pageserver_id: &node.get_id().to_string(),
348 0 : path: &path_label,
349 0 : method: crate::metrics::Method::Get,
350 0 : };
351 0 :
352 0 : let _timer = latency.start_timer(labels.clone());
353 0 :
354 0 : let client = mgmt_api::Client::new(node.base_url(), service.get_config().jwt_token.as_deref());
355 0 : let resp = client.get_raw(path).await.map_err(|_e|
356 : // FIXME: give APiError a proper Unavailable variant. We return 503 here because
357 : // if we can't successfully send a request to the pageserver, we aren't available.
358 0 : ApiError::ShuttingDown)?;
359 :
360 0 : if !resp.status().is_success() {
361 0 : let error_counter = &METRICS_REGISTRY
362 0 : .metrics_group
363 0 : .storage_controller_passthrough_request_error;
364 0 : error_counter.inc(labels);
365 0 : }
366 :
367 : // We have a reqest::Response, would like a http::Response
368 0 : let mut builder = hyper::Response::builder()
369 0 : .status(resp.status())
370 0 : .version(resp.version());
371 0 : for (k, v) in resp.headers() {
372 0 : builder = builder.header(k, v);
373 0 : }
374 :
375 0 : let response = builder
376 0 : .body(Body::wrap_stream(resp.bytes_stream()))
377 0 : .map_err(|e| ApiError::InternalServerError(e.into()))?;
378 :
379 0 : Ok(response)
380 0 : }
381 :
382 0 : async fn handle_tenant_locate(
383 0 : service: Arc<Service>,
384 0 : req: Request<Body>,
385 0 : ) -> Result<Response<Body>, ApiError> {
386 0 : check_permissions(&req, Scope::Admin)?;
387 :
388 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
389 0 : json_response(StatusCode::OK, service.tenant_locate(tenant_id)?)
390 0 : }
391 :
392 0 : async fn handle_tenant_describe(
393 0 : service: Arc<Service>,
394 0 : req: Request<Body>,
395 0 : ) -> Result<Response<Body>, ApiError> {
396 0 : check_permissions(&req, Scope::Admin)?;
397 :
398 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
399 0 : json_response(StatusCode::OK, service.tenant_describe(tenant_id)?)
400 0 : }
401 :
402 0 : async fn handle_tenant_list(
403 0 : service: Arc<Service>,
404 0 : req: Request<Body>,
405 0 : ) -> Result<Response<Body>, ApiError> {
406 0 : check_permissions(&req, Scope::Admin)?;
407 :
408 0 : json_response(StatusCode::OK, service.tenant_list())
409 0 : }
410 :
411 0 : async fn handle_node_register(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
412 0 : check_permissions(&req, Scope::Admin)?;
413 :
414 0 : let register_req = json_request::<NodeRegisterRequest>(&mut req).await?;
415 0 : let state = get_state(&req);
416 0 : state.service.node_register(register_req).await?;
417 0 : json_response(StatusCode::OK, ())
418 0 : }
419 :
420 0 : async fn handle_node_list(req: Request<Body>) -> Result<Response<Body>, ApiError> {
421 0 : check_permissions(&req, Scope::Admin)?;
422 :
423 0 : let state = get_state(&req);
424 0 : let nodes = state.service.node_list().await?;
425 0 : let api_nodes = nodes.into_iter().map(|n| n.describe()).collect::<Vec<_>>();
426 0 :
427 0 : json_response(StatusCode::OK, api_nodes)
428 0 : }
429 :
430 0 : async fn handle_node_drop(req: Request<Body>) -> Result<Response<Body>, ApiError> {
431 0 : check_permissions(&req, Scope::Admin)?;
432 :
433 0 : let state = get_state(&req);
434 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
435 0 : json_response(StatusCode::OK, state.service.node_drop(node_id).await?)
436 0 : }
437 :
438 0 : async fn handle_node_configure(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
439 0 : check_permissions(&req, Scope::Admin)?;
440 :
441 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
442 0 : let config_req = json_request::<NodeConfigureRequest>(&mut req).await?;
443 0 : if node_id != config_req.node_id {
444 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
445 0 : "Path and body node_id differ"
446 0 : )));
447 0 : }
448 0 : let state = get_state(&req);
449 0 :
450 0 : json_response(
451 0 : StatusCode::OK,
452 0 : state
453 0 : .service
454 0 : .node_configure(
455 0 : config_req.node_id,
456 0 : config_req.availability.map(NodeAvailability::from),
457 0 : config_req.scheduling,
458 0 : )
459 0 : .await?,
460 : )
461 0 : }
462 :
463 0 : async fn handle_tenant_shard_split(
464 0 : service: Arc<Service>,
465 0 : mut req: Request<Body>,
466 0 : ) -> Result<Response<Body>, ApiError> {
467 0 : check_permissions(&req, Scope::Admin)?;
468 :
469 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
470 0 : let split_req = json_request::<TenantShardSplitRequest>(&mut req).await?;
471 :
472 : json_response(
473 : StatusCode::OK,
474 0 : service.tenant_shard_split(tenant_id, split_req).await?,
475 : )
476 0 : }
477 :
478 0 : async fn handle_tenant_shard_migrate(
479 0 : service: Arc<Service>,
480 0 : mut req: Request<Body>,
481 0 : ) -> Result<Response<Body>, ApiError> {
482 0 : check_permissions(&req, Scope::Admin)?;
483 :
484 0 : let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
485 0 : let migrate_req = json_request::<TenantShardMigrateRequest>(&mut req).await?;
486 : json_response(
487 : StatusCode::OK,
488 0 : service
489 0 : .tenant_shard_migrate(tenant_shard_id, migrate_req)
490 0 : .await?,
491 : )
492 0 : }
493 :
494 0 : async fn handle_tenant_update_policy(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
495 0 : check_permissions(&req, Scope::Admin)?;
496 :
497 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
498 0 : let update_req = json_request::<TenantPolicyRequest>(&mut req).await?;
499 0 : let state = get_state(&req);
500 0 :
501 0 : json_response(
502 0 : StatusCode::OK,
503 0 : state
504 0 : .service
505 0 : .tenant_update_policy(tenant_id, update_req)
506 0 : .await?,
507 : )
508 0 : }
509 :
510 0 : async fn handle_tenant_drop(req: Request<Body>) -> Result<Response<Body>, ApiError> {
511 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
512 0 : check_permissions(&req, Scope::PageServerApi)?;
513 :
514 0 : let state = get_state(&req);
515 0 :
516 0 : json_response(StatusCode::OK, state.service.tenant_drop(tenant_id).await?)
517 0 : }
518 :
519 0 : async fn handle_tenants_dump(req: Request<Body>) -> Result<Response<Body>, ApiError> {
520 0 : check_permissions(&req, Scope::Admin)?;
521 :
522 0 : let state = get_state(&req);
523 0 : state.service.tenants_dump()
524 0 : }
525 :
526 0 : async fn handle_scheduler_dump(req: Request<Body>) -> Result<Response<Body>, ApiError> {
527 0 : check_permissions(&req, Scope::Admin)?;
528 :
529 0 : let state = get_state(&req);
530 0 : state.service.scheduler_dump()
531 0 : }
532 :
533 0 : async fn handle_consistency_check(req: Request<Body>) -> Result<Response<Body>, ApiError> {
534 0 : check_permissions(&req, Scope::Admin)?;
535 :
536 0 : let state = get_state(&req);
537 0 :
538 0 : json_response(StatusCode::OK, state.service.consistency_check().await?)
539 0 : }
540 :
541 0 : async fn handle_reconcile_all(req: Request<Body>) -> Result<Response<Body>, ApiError> {
542 0 : check_permissions(&req, Scope::Admin)?;
543 :
544 0 : let state = get_state(&req);
545 0 :
546 0 : json_response(StatusCode::OK, state.service.reconcile_all_now().await?)
547 0 : }
548 :
549 : /// Status endpoint is just used for checking that our HTTP listener is up
550 0 : async fn handle_status(_req: Request<Body>) -> Result<Response<Body>, ApiError> {
551 0 : json_response(StatusCode::OK, ())
552 0 : }
553 :
554 : /// Readiness endpoint indicates when we're done doing startup I/O (e.g. reconciling
555 : /// with remote pageserver nodes). This is intended for use as a kubernetes readiness probe.
556 0 : async fn handle_ready(req: Request<Body>) -> Result<Response<Body>, ApiError> {
557 0 : let state = get_state(&req);
558 0 : if state.service.startup_complete.is_ready() {
559 0 : json_response(StatusCode::OK, ())
560 : } else {
561 0 : json_response(StatusCode::SERVICE_UNAVAILABLE, ())
562 : }
563 0 : }
564 :
565 : impl From<ReconcileError> for ApiError {
566 0 : fn from(value: ReconcileError) -> Self {
567 0 : ApiError::Conflict(format!("Reconciliation error: {}", value))
568 0 : }
569 : }
570 :
571 : /// Common wrapper for request handlers that call into Service and will operate on tenants: they must only
572 : /// be allowed to run if Service has finished its initial reconciliation.
573 0 : async fn tenant_service_handler<R, H>(
574 0 : request: Request<Body>,
575 0 : handler: H,
576 0 : request_name: RequestName,
577 0 : ) -> R::Output
578 0 : where
579 0 : R: std::future::Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
580 0 : H: FnOnce(Arc<Service>, Request<Body>) -> R + Send + Sync + 'static,
581 0 : {
582 0 : let state = get_state(&request);
583 0 : let service = state.service.clone();
584 0 :
585 0 : let startup_complete = service.startup_complete.clone();
586 0 : if tokio::time::timeout(STARTUP_RECONCILE_TIMEOUT, startup_complete.wait())
587 0 : .await
588 0 : .is_err()
589 : {
590 : // This shouldn't happen: it is the responsibilty of [`Service::startup_reconcile`] to use appropriate
591 : // timeouts around its remote calls, to bound its runtime.
592 0 : return Err(ApiError::Timeout(
593 0 : "Timed out waiting for service readiness".into(),
594 0 : ));
595 0 : }
596 0 :
597 0 : named_request_span(
598 0 : request,
599 0 : |request| async move { handler(service, request).await },
600 0 : request_name,
601 0 : )
602 0 : .await
603 0 : }
604 :
605 : /// Check if the required scope is held in the request's token, or if the request has
606 : /// a token with 'admin' scope then always permit it.
607 0 : fn check_permissions(request: &Request<Body>, required_scope: Scope) -> Result<(), ApiError> {
608 0 : check_permission_with(request, |claims| {
609 0 : match crate::auth::check_permission(claims, required_scope) {
610 0 : Err(e) => match crate::auth::check_permission(claims, Scope::Admin) {
611 0 : Ok(()) => Ok(()),
612 0 : Err(_) => Err(e),
613 : },
614 0 : Ok(()) => Ok(()),
615 : }
616 0 : })
617 0 : }
618 :
619 : #[derive(Clone, Debug)]
620 : struct RequestMeta {
621 : method: hyper::http::Method,
622 : at: Instant,
623 : }
624 :
625 0 : fn prologue_metrics_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>(
626 0 : ) -> Middleware<B, ApiError> {
627 0 : Middleware::pre(move |req| async move {
628 0 : let meta = RequestMeta {
629 0 : method: req.method().clone(),
630 0 : at: Instant::now(),
631 0 : };
632 0 :
633 0 : req.set_context(meta);
634 0 :
635 0 : Ok(req)
636 0 : })
637 0 : }
638 :
639 0 : fn epilogue_metrics_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>(
640 0 : ) -> Middleware<B, ApiError> {
641 0 : Middleware::post_with_info(move |resp, req_info| async move {
642 0 : let request_name = match req_info.context::<RequestName>() {
643 0 : Some(name) => name,
644 : None => {
645 0 : return Ok(resp);
646 : }
647 : };
648 :
649 0 : if let Some(meta) = req_info.context::<RequestMeta>() {
650 0 : let status = &crate::metrics::METRICS_REGISTRY
651 0 : .metrics_group
652 0 : .storage_controller_http_request_status;
653 0 : let latency = &crate::metrics::METRICS_REGISTRY
654 0 : .metrics_group
655 0 : .storage_controller_http_request_latency;
656 0 :
657 0 : status.inc(HttpRequestStatusLabelGroup {
658 0 : path: request_name.0,
659 0 : method: meta.method.clone().into(),
660 0 : status: crate::metrics::StatusCode(resp.status()),
661 0 : });
662 0 :
663 0 : latency.observe(
664 0 : HttpRequestLatencyLabelGroup {
665 0 : path: request_name.0,
666 0 : method: meta.method.into(),
667 0 : },
668 0 : meta.at.elapsed().as_secs_f64(),
669 0 : );
670 0 : }
671 0 : Ok(resp)
672 0 : })
673 0 : }
674 :
675 0 : pub async fn measured_metrics_handler(_req: Request<Body>) -> Result<Response<Body>, ApiError> {
676 0 : pub const TEXT_FORMAT: &str = "text/plain; version=0.0.4";
677 0 :
678 0 : let payload = crate::metrics::METRICS_REGISTRY.encode();
679 0 : let response = Response::builder()
680 0 : .status(200)
681 0 : .header(CONTENT_TYPE, TEXT_FORMAT)
682 0 : .body(payload.into())
683 0 : .unwrap();
684 0 :
685 0 : Ok(response)
686 0 : }
687 :
/// Static label identifying a route. `named_request_span` stores it in the request context
/// and the epilogue metrics middleware uses it as the `path` label.
#[derive(Clone)]
struct RequestName(&'static str);
690 :
691 0 : async fn named_request_span<R, H>(
692 0 : request: Request<Body>,
693 0 : handler: H,
694 0 : name: RequestName,
695 0 : ) -> R::Output
696 0 : where
697 0 : R: Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
698 0 : H: FnOnce(Request<Body>) -> R + Send + Sync + 'static,
699 0 : {
700 0 : request.set_context(name);
701 0 : request_span(request, handler).await
702 0 : }
703 :
704 0 : pub fn make_router(
705 0 : service: Arc<Service>,
706 0 : auth: Option<Arc<SwappableJwtAuth>>,
707 0 : ) -> RouterBuilder<hyper::Body, ApiError> {
708 0 : let mut router = endpoint::make_router()
709 0 : .middleware(prologue_metrics_middleware())
710 0 : .middleware(epilogue_metrics_middleware());
711 0 : if auth.is_some() {
712 0 : router = router.middleware(auth_middleware(|request| {
713 0 : let state = get_state(request);
714 0 : if state.allowlist_routes.contains(request.uri()) {
715 0 : None
716 : } else {
717 0 : state.auth.as_deref()
718 : }
719 0 : }));
720 0 : }
721 :
722 0 : router
723 0 : .data(Arc::new(HttpState::new(service, auth)))
724 0 : .get("/metrics", |r| {
725 0 : named_request_span(r, measured_metrics_handler, RequestName("metrics"))
726 0 : })
727 0 : // Non-prefixed generic endpoints (status, metrics)
728 0 : .get("/status", |r| {
729 0 : named_request_span(r, handle_status, RequestName("status"))
730 0 : })
731 0 : .get("/ready", |r| {
732 0 : named_request_span(r, handle_ready, RequestName("ready"))
733 0 : })
734 0 : // Upcalls for the pageserver: point the pageserver's `control_plane_api` config to this prefix
735 0 : .post("/upcall/v1/re-attach", |r| {
736 0 : named_request_span(r, handle_re_attach, RequestName("upcall_v1_reattach"))
737 0 : })
738 0 : .post("/upcall/v1/validate", |r| {
739 0 : named_request_span(r, handle_validate, RequestName("upcall_v1_validate"))
740 0 : })
741 0 : // Test/dev/debug endpoints
742 0 : .post("/debug/v1/attach-hook", |r| {
743 0 : named_request_span(r, handle_attach_hook, RequestName("debug_v1_attach_hook"))
744 0 : })
745 0 : .post("/debug/v1/inspect", |r| {
746 0 : named_request_span(r, handle_inspect, RequestName("debug_v1_inspect"))
747 0 : })
748 0 : .post("/debug/v1/tenant/:tenant_id/drop", |r| {
749 0 : named_request_span(r, handle_tenant_drop, RequestName("debug_v1_tenant_drop"))
750 0 : })
751 0 : .post("/debug/v1/node/:node_id/drop", |r| {
752 0 : named_request_span(r, handle_node_drop, RequestName("debug_v1_node_drop"))
753 0 : })
754 0 : .get("/debug/v1/tenant", |r| {
755 0 : named_request_span(r, handle_tenants_dump, RequestName("debug_v1_tenant"))
756 0 : })
757 0 : .get("/debug/v1/tenant/:tenant_id/locate", |r| {
758 0 : tenant_service_handler(
759 0 : r,
760 0 : handle_tenant_locate,
761 0 : RequestName("debug_v1_tenant_locate"),
762 0 : )
763 0 : })
764 0 : .get("/debug/v1/scheduler", |r| {
765 0 : named_request_span(r, handle_scheduler_dump, RequestName("debug_v1_scheduler"))
766 0 : })
767 0 : .post("/debug/v1/consistency_check", |r| {
768 0 : named_request_span(
769 0 : r,
770 0 : handle_consistency_check,
771 0 : RequestName("debug_v1_consistency_check"),
772 0 : )
773 0 : })
774 0 : .post("/debug/v1/reconcile_all", |r| {
775 0 : request_span(r, handle_reconcile_all)
776 0 : })
777 0 : .put("/debug/v1/failpoints", |r| {
778 0 : request_span(r, |r| failpoints_handler(r, CancellationToken::new()))
779 0 : })
780 0 : // Node operations
781 0 : .post("/control/v1/node", |r| {
782 0 : named_request_span(r, handle_node_register, RequestName("control_v1_node"))
783 0 : })
784 0 : .get("/control/v1/node", |r| {
785 0 : named_request_span(r, handle_node_list, RequestName("control_v1_node"))
786 0 : })
787 0 : .put("/control/v1/node/:node_id/config", |r| {
788 0 : named_request_span(
789 0 : r,
790 0 : handle_node_configure,
791 0 : RequestName("control_v1_node_config"),
792 0 : )
793 0 : })
794 0 : // Tenant Shard operations
795 0 : .put("/control/v1/tenant/:tenant_shard_id/migrate", |r| {
796 0 : tenant_service_handler(
797 0 : r,
798 0 : handle_tenant_shard_migrate,
799 0 : RequestName("control_v1_tenant_migrate"),
800 0 : )
801 0 : })
802 0 : .put("/control/v1/tenant/:tenant_id/shard_split", |r| {
803 0 : tenant_service_handler(
804 0 : r,
805 0 : handle_tenant_shard_split,
806 0 : RequestName("control_v1_tenant_shard_split"),
807 0 : )
808 0 : })
809 0 : .get("/control/v1/tenant/:tenant_id", |r| {
810 0 : tenant_service_handler(
811 0 : r,
812 0 : handle_tenant_describe,
813 0 : RequestName("control_v1_tenant_describe"),
814 0 : )
815 0 : })
816 0 : .get("/control/v1/tenant", |r| {
817 0 : tenant_service_handler(r, handle_tenant_list, RequestName("control_v1_tenant_list"))
818 0 : })
819 0 : .put("/control/v1/tenant/:tenant_id/policy", |r| {
820 0 : named_request_span(
821 0 : r,
822 0 : handle_tenant_update_policy,
823 0 : RequestName("control_v1_tenant_policy"),
824 0 : )
825 0 : })
826 0 : // Tenant operations
827 0 : // The ^/v1/ endpoints act as a "Virtual Pageserver", enabling shard-naive clients to call into
828 0 : // this service to manage tenants that actually consist of many tenant shards, as if they are a single entity.
829 0 : .post("/v1/tenant", |r| {
830 0 : tenant_service_handler(r, handle_tenant_create, RequestName("v1_tenant"))
831 0 : })
832 0 : .delete("/v1/tenant/:tenant_id", |r| {
833 0 : tenant_service_handler(r, handle_tenant_delete, RequestName("v1_tenant"))
834 0 : })
835 0 : .put("/v1/tenant/config", |r| {
836 0 : tenant_service_handler(r, handle_tenant_config_set, RequestName("v1_tenant_config"))
837 0 : })
838 0 : .get("/v1/tenant/:tenant_id/config", |r| {
839 0 : tenant_service_handler(r, handle_tenant_config_get, RequestName("v1_tenant_config"))
840 0 : })
841 0 : .put("/v1/tenant/:tenant_shard_id/location_config", |r| {
842 0 : tenant_service_handler(
843 0 : r,
844 0 : handle_tenant_location_config,
845 0 : RequestName("v1_tenant_location_config"),
846 0 : )
847 0 : })
848 0 : .put("/v1/tenant/:tenant_id/time_travel_remote_storage", |r| {
849 0 : tenant_service_handler(
850 0 : r,
851 0 : handle_tenant_time_travel_remote_storage,
852 0 : RequestName("v1_tenant_time_travel_remote_storage"),
853 0 : )
854 0 : })
855 0 : .post("/v1/tenant/:tenant_id/secondary/download", |r| {
856 0 : tenant_service_handler(
857 0 : r,
858 0 : handle_tenant_secondary_download,
859 0 : RequestName("v1_tenant_secondary_download"),
860 0 : )
861 0 : })
862 0 : // Timeline operations
863 0 : .delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
864 0 : tenant_service_handler(
865 0 : r,
866 0 : handle_tenant_timeline_delete,
867 0 : RequestName("v1_tenant_timeline"),
868 0 : )
869 0 : })
870 0 : .post("/v1/tenant/:tenant_id/timeline", |r| {
871 0 : tenant_service_handler(
872 0 : r,
873 0 : handle_tenant_timeline_create,
874 0 : RequestName("v1_tenant_timeline"),
875 0 : )
876 0 : })
877 0 : // Tenant detail GET passthrough to shard zero
878 0 : .get("/v1/tenant/:tenant_id", |r| {
879 0 : tenant_service_handler(
880 0 : r,
881 0 : handle_tenant_timeline_passthrough,
882 0 : RequestName("v1_tenant_passthrough"),
883 0 : )
884 0 : })
885 0 : // Timeline GET passthrough to shard zero. Note that the `*` in the URL is a wildcard: any future
886 0 : // timeline GET APIs will be implicitly included.
887 0 : .get("/v1/tenant/:tenant_id/timeline*", |r| {
888 0 : tenant_service_handler(
889 0 : r,
890 0 : handle_tenant_timeline_passthrough,
891 0 : RequestName("v1_tenant_timeline_passthrough"),
892 0 : )
893 0 : })
894 0 : }
|