Line data Source code
1 : use crate::metrics::{
2 : HttpRequestLatencyLabelGroup, HttpRequestStatusLabelGroup, PageserverRequestLabelGroup,
3 : METRICS_REGISTRY,
4 : };
5 : use crate::persistence::SafekeeperPersistence;
6 : use crate::reconciler::ReconcileError;
7 : use crate::service::{LeadershipStatus, Service, STARTUP_RECONCILE_TIMEOUT};
8 : use anyhow::Context;
9 : use futures::Future;
10 : use hyper::header::CONTENT_TYPE;
11 : use hyper::{Body, Request, Response};
12 : use hyper::{StatusCode, Uri};
13 : use metrics::{BuildInfo, NeonMetrics};
14 : use pageserver_api::controller_api::{
15 : MetadataHealthListOutdatedRequest, MetadataHealthListOutdatedResponse,
16 : MetadataHealthListUnhealthyResponse, MetadataHealthUpdateRequest, MetadataHealthUpdateResponse,
17 : ShardsPreferredAzsRequest, TenantCreateRequest,
18 : };
19 : use pageserver_api::models::{
20 : TenantConfigRequest, TenantLocationConfigRequest, TenantShardSplitRequest,
21 : TenantTimeTravelRequest, TimelineArchivalConfigRequest, TimelineCreateRequest,
22 : };
23 : use pageserver_api::shard::TenantShardId;
24 : use pageserver_client::{mgmt_api, BlockUnblock};
25 : use std::sync::Arc;
26 : use std::time::{Duration, Instant};
27 : use tokio_util::sync::CancellationToken;
28 : use utils::auth::{Scope, SwappableJwtAuth};
29 : use utils::failpoint_support::failpoints_handler;
30 : use utils::http::endpoint::{auth_middleware, check_permission_with, request_span};
31 : use utils::http::request::{must_get_query_param, parse_query_param, parse_request_param};
32 : use utils::id::{TenantId, TimelineId};
33 :
34 : use utils::{
35 : http::{
36 : endpoint::{self},
37 : error::ApiError,
38 : json::{json_request, json_response},
39 : RequestExt, RouterBuilder,
40 : },
41 : id::NodeId,
42 : };
43 :
44 : use pageserver_api::controller_api::{
45 : NodeAvailability, NodeConfigureRequest, NodeRegisterRequest, TenantPolicyRequest,
46 : TenantShardMigrateRequest,
47 : };
48 : use pageserver_api::upcall_api::{ReAttachRequest, ValidateRequest};
49 :
50 : use control_plane::storage_controller::{AttachHookRequest, InspectRequest};
51 :
52 : use routerify::Middleware;
53 :
54 : /// State available to HTTP request handlers
55 : pub struct HttpState {
56 : service: Arc<crate::service::Service>,
57 : auth: Option<Arc<SwappableJwtAuth>>,
58 : neon_metrics: NeonMetrics,
59 : allowlist_routes: Vec<Uri>,
60 : }
61 :
62 : impl HttpState {
63 0 : pub fn new(
64 0 : service: Arc<crate::service::Service>,
65 0 : auth: Option<Arc<SwappableJwtAuth>>,
66 0 : build_info: BuildInfo,
67 0 : ) -> Self {
68 0 : let allowlist_routes = ["/status", "/ready", "/metrics"]
69 0 : .iter()
70 0 : .map(|v| v.parse().unwrap())
71 0 : .collect::<Vec<_>>();
72 0 : Self {
73 0 : service,
74 0 : auth,
75 0 : neon_metrics: NeonMetrics::new(build_info),
76 0 : allowlist_routes,
77 0 : }
78 0 : }
79 : }
80 :
81 : #[inline(always)]
82 0 : fn get_state(request: &Request<Body>) -> &HttpState {
83 0 : request
84 0 : .data::<Arc<HttpState>>()
85 0 : .expect("unknown state type")
86 0 : .as_ref()
87 0 : }
88 :
89 : /// Pageserver calls into this on startup, to learn which tenants it should attach
90 0 : async fn handle_re_attach(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
91 0 : check_permissions(&req, Scope::GenerationsApi)?;
92 :
93 0 : let reattach_req = json_request::<ReAttachRequest>(&mut req).await?;
94 0 : let state = get_state(&req);
95 0 : json_response(StatusCode::OK, state.service.re_attach(reattach_req).await?)
96 0 : }
97 :
98 : /// Pageserver calls into this before doing deletions, to confirm that it still
99 : /// holds the latest generation for the tenants with deletions enqueued
100 0 : async fn handle_validate(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
101 0 : check_permissions(&req, Scope::GenerationsApi)?;
102 :
103 0 : let validate_req = json_request::<ValidateRequest>(&mut req).await?;
104 0 : let state = get_state(&req);
105 0 : json_response(StatusCode::OK, state.service.validate(validate_req).await?)
106 0 : }
107 :
108 : /// Call into this before attaching a tenant to a pageserver, to acquire a generation number
109 : /// (in the real control plane this is unnecessary, because the same program is managing
110 : /// generation numbers and doing attachments).
111 0 : async fn handle_attach_hook(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
112 0 : check_permissions(&req, Scope::Admin)?;
113 :
114 0 : let attach_req = json_request::<AttachHookRequest>(&mut req).await?;
115 0 : let state = get_state(&req);
116 0 :
117 0 : json_response(
118 0 : StatusCode::OK,
119 0 : state
120 0 : .service
121 0 : .attach_hook(attach_req)
122 0 : .await
123 0 : .map_err(ApiError::InternalServerError)?,
124 : )
125 0 : }
126 :
127 0 : async fn handle_inspect(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
128 0 : check_permissions(&req, Scope::Admin)?;
129 :
130 0 : let inspect_req = json_request::<InspectRequest>(&mut req).await?;
131 :
132 0 : let state = get_state(&req);
133 0 :
134 0 : json_response(StatusCode::OK, state.service.inspect(inspect_req))
135 0 : }
136 :
137 0 : async fn handle_tenant_create(
138 0 : service: Arc<Service>,
139 0 : mut req: Request<Body>,
140 0 : ) -> Result<Response<Body>, ApiError> {
141 0 : check_permissions(&req, Scope::PageServerApi)?;
142 :
143 0 : let create_req = json_request::<TenantCreateRequest>(&mut req).await?;
144 :
145 : json_response(
146 : StatusCode::CREATED,
147 0 : service.tenant_create(create_req).await?,
148 : )
149 0 : }
150 :
151 0 : async fn handle_tenant_location_config(
152 0 : service: Arc<Service>,
153 0 : mut req: Request<Body>,
154 0 : ) -> Result<Response<Body>, ApiError> {
155 0 : let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
156 0 : check_permissions(&req, Scope::PageServerApi)?;
157 :
158 0 : let config_req = json_request::<TenantLocationConfigRequest>(&mut req).await?;
159 : json_response(
160 : StatusCode::OK,
161 0 : service
162 0 : .tenant_location_config(tenant_shard_id, config_req)
163 0 : .await?,
164 : )
165 0 : }
166 :
167 0 : async fn handle_tenant_config_set(
168 0 : service: Arc<Service>,
169 0 : mut req: Request<Body>,
170 0 : ) -> Result<Response<Body>, ApiError> {
171 0 : check_permissions(&req, Scope::PageServerApi)?;
172 :
173 0 : let config_req = json_request::<TenantConfigRequest>(&mut req).await?;
174 :
175 0 : json_response(StatusCode::OK, service.tenant_config_set(config_req).await?)
176 0 : }
177 :
178 0 : async fn handle_tenant_config_get(
179 0 : service: Arc<Service>,
180 0 : req: Request<Body>,
181 0 : ) -> Result<Response<Body>, ApiError> {
182 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
183 0 : check_permissions(&req, Scope::PageServerApi)?;
184 :
185 0 : json_response(StatusCode::OK, service.tenant_config_get(tenant_id)?)
186 0 : }
187 :
188 0 : async fn handle_tenant_time_travel_remote_storage(
189 0 : service: Arc<Service>,
190 0 : mut req: Request<Body>,
191 0 : ) -> Result<Response<Body>, ApiError> {
192 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
193 0 : check_permissions(&req, Scope::PageServerApi)?;
194 :
195 0 : let time_travel_req = json_request::<TenantTimeTravelRequest>(&mut req).await?;
196 :
197 0 : let timestamp_raw = must_get_query_param(&req, "travel_to")?;
198 0 : let _timestamp = humantime::parse_rfc3339(×tamp_raw).map_err(|_e| {
199 0 : ApiError::BadRequest(anyhow::anyhow!(
200 0 : "Invalid time for travel_to: {timestamp_raw:?}"
201 0 : ))
202 0 : })?;
203 :
204 0 : let done_if_after_raw = must_get_query_param(&req, "done_if_after")?;
205 0 : let _done_if_after = humantime::parse_rfc3339(&done_if_after_raw).map_err(|_e| {
206 0 : ApiError::BadRequest(anyhow::anyhow!(
207 0 : "Invalid time for done_if_after: {done_if_after_raw:?}"
208 0 : ))
209 0 : })?;
210 :
211 0 : service
212 0 : .tenant_time_travel_remote_storage(
213 0 : &time_travel_req,
214 0 : tenant_id,
215 0 : timestamp_raw,
216 0 : done_if_after_raw,
217 0 : )
218 0 : .await?;
219 0 : json_response(StatusCode::OK, ())
220 0 : }
221 :
222 0 : fn map_reqwest_hyper_status(status: reqwest::StatusCode) -> Result<hyper::StatusCode, ApiError> {
223 0 : hyper::StatusCode::from_u16(status.as_u16())
224 0 : .context("invalid status code")
225 0 : .map_err(ApiError::InternalServerError)
226 0 : }
227 :
228 0 : async fn handle_tenant_secondary_download(
229 0 : service: Arc<Service>,
230 0 : req: Request<Body>,
231 0 : ) -> Result<Response<Body>, ApiError> {
232 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
233 0 : let wait = parse_query_param(&req, "wait_ms")?.map(Duration::from_millis);
234 :
235 0 : let (status, progress) = service.tenant_secondary_download(tenant_id, wait).await?;
236 0 : json_response(map_reqwest_hyper_status(status)?, progress)
237 0 : }
238 :
239 0 : async fn handle_tenant_delete(
240 0 : service: Arc<Service>,
241 0 : req: Request<Body>,
242 0 : ) -> Result<Response<Body>, ApiError> {
243 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
244 0 : check_permissions(&req, Scope::PageServerApi)?;
245 :
246 0 : let status_code = service
247 0 : .tenant_delete(tenant_id)
248 0 : .await
249 0 : .and_then(map_reqwest_hyper_status)?;
250 :
251 0 : if status_code == StatusCode::NOT_FOUND {
252 : // The pageserver uses 404 for successful deletion, but we use 200
253 0 : json_response(StatusCode::OK, ())
254 : } else {
255 0 : json_response(status_code, ())
256 : }
257 0 : }
258 :
259 0 : async fn handle_tenant_timeline_create(
260 0 : service: Arc<Service>,
261 0 : mut req: Request<Body>,
262 0 : ) -> Result<Response<Body>, ApiError> {
263 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
264 0 : check_permissions(&req, Scope::PageServerApi)?;
265 :
266 0 : let create_req = json_request::<TimelineCreateRequest>(&mut req).await?;
267 : json_response(
268 : StatusCode::CREATED,
269 0 : service
270 0 : .tenant_timeline_create(tenant_id, create_req)
271 0 : .await?,
272 : )
273 0 : }
274 :
275 0 : async fn handle_tenant_timeline_delete(
276 0 : service: Arc<Service>,
277 0 : req: Request<Body>,
278 0 : ) -> Result<Response<Body>, ApiError> {
279 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
280 0 : check_permissions(&req, Scope::PageServerApi)?;
281 :
282 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
283 :
284 : // For timeline deletions, which both implement an "initially return 202, then 404 once
285 : // we're done" semantic, we wrap with a retry loop to expose a simpler API upstream.
286 0 : async fn deletion_wrapper<R, F>(service: Arc<Service>, f: F) -> Result<Response<Body>, ApiError>
287 0 : where
288 0 : R: std::future::Future<Output = Result<StatusCode, ApiError>> + Send + 'static,
289 0 : F: Fn(Arc<Service>) -> R + Send + Sync + 'static,
290 0 : {
291 0 : let started_at = Instant::now();
292 0 : // To keep deletion reasonably snappy for small tenants, initially check after 1 second if deletion
293 0 : // completed.
294 0 : let mut retry_period = Duration::from_secs(1);
295 0 : // On subsequent retries, wait longer.
296 0 : let max_retry_period = Duration::from_secs(5);
297 0 : // Enable callers with a 30 second request timeout to reliably get a response
298 0 : let max_wait = Duration::from_secs(25);
299 :
300 : loop {
301 0 : let status = f(service.clone()).await?;
302 0 : match status {
303 : StatusCode::ACCEPTED => {
304 0 : tracing::info!("Deletion accepted, waiting to try again...");
305 0 : tokio::time::sleep(retry_period).await;
306 0 : retry_period = max_retry_period;
307 : }
308 : StatusCode::NOT_FOUND => {
309 0 : tracing::info!("Deletion complete");
310 0 : return json_response(StatusCode::OK, ());
311 : }
312 : _ => {
313 0 : tracing::warn!("Unexpected status {status}");
314 0 : return json_response(status, ());
315 : }
316 : }
317 :
318 0 : let now = Instant::now();
319 0 : if now + retry_period > started_at + max_wait {
320 0 : tracing::info!("Deletion timed out waiting for 404");
321 : // REQUEST_TIMEOUT would be more appropriate, but CONFLICT is already part of
322 : // the pageserver's swagger definition for this endpoint, and has the same desired
323 : // effect of causing the control plane to retry later.
324 0 : return json_response(StatusCode::CONFLICT, ());
325 0 : }
326 : }
327 0 : }
328 :
329 0 : deletion_wrapper(service, move |service| async move {
330 0 : service
331 0 : .tenant_timeline_delete(tenant_id, timeline_id)
332 0 : .await
333 0 : .and_then(map_reqwest_hyper_status)
334 0 : })
335 0 : .await
336 0 : }
337 :
338 0 : async fn handle_tenant_timeline_archival_config(
339 0 : service: Arc<Service>,
340 0 : mut req: Request<Body>,
341 0 : ) -> Result<Response<Body>, ApiError> {
342 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
343 0 : check_permissions(&req, Scope::PageServerApi)?;
344 :
345 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
346 :
347 0 : let create_req = json_request::<TimelineArchivalConfigRequest>(&mut req).await?;
348 :
349 0 : service
350 0 : .tenant_timeline_archival_config(tenant_id, timeline_id, create_req)
351 0 : .await?;
352 :
353 0 : json_response(StatusCode::OK, ())
354 0 : }
355 :
356 0 : async fn handle_tenant_timeline_detach_ancestor(
357 0 : service: Arc<Service>,
358 0 : req: Request<Body>,
359 0 : ) -> Result<Response<Body>, ApiError> {
360 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
361 0 : check_permissions(&req, Scope::PageServerApi)?;
362 :
363 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
364 :
365 0 : let res = service
366 0 : .tenant_timeline_detach_ancestor(tenant_id, timeline_id)
367 0 : .await?;
368 :
369 0 : json_response(StatusCode::OK, res)
370 0 : }
371 :
372 0 : async fn handle_tenant_timeline_block_unblock_gc(
373 0 : service: Arc<Service>,
374 0 : req: Request<Body>,
375 0 : dir: BlockUnblock,
376 0 : ) -> Result<Response<Body>, ApiError> {
377 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
378 0 : check_permissions(&req, Scope::PageServerApi)?;
379 :
380 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
381 :
382 0 : service
383 0 : .tenant_timeline_block_unblock_gc(tenant_id, timeline_id, dir)
384 0 : .await?;
385 :
386 0 : json_response(StatusCode::OK, ())
387 0 : }
388 :
389 0 : async fn handle_tenant_timeline_passthrough(
390 0 : service: Arc<Service>,
391 0 : req: Request<Body>,
392 0 : ) -> Result<Response<Body>, ApiError> {
393 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
394 0 : check_permissions(&req, Scope::PageServerApi)?;
395 :
396 0 : let Some(path) = req.uri().path_and_query() else {
397 : // This should never happen, our request router only calls us if there is a path
398 0 : return Err(ApiError::BadRequest(anyhow::anyhow!("Missing path")));
399 : };
400 :
401 0 : tracing::info!("Proxying request for tenant {} ({})", tenant_id, path);
402 :
403 : // Find the node that holds shard zero
404 0 : let (node, tenant_shard_id) = service.tenant_shard0_node(tenant_id)?;
405 :
406 : // Callers will always pass an unsharded tenant ID. Before proxying, we must
407 : // rewrite this to a shard-aware shard zero ID.
408 0 : let path = format!("{}", path);
409 0 : let tenant_str = tenant_id.to_string();
410 0 : let tenant_shard_str = format!("{}", tenant_shard_id);
411 0 : let path = path.replace(&tenant_str, &tenant_shard_str);
412 0 :
413 0 : let latency = &METRICS_REGISTRY
414 0 : .metrics_group
415 0 : .storage_controller_passthrough_request_latency;
416 0 :
417 0 : // This is a bit awkward. We remove the param from the request
418 0 : // and join the words by '_' to get a label for the request.
419 0 : let just_path = path.replace(&tenant_shard_str, "");
420 0 : let path_label = just_path
421 0 : .split('/')
422 0 : .filter(|token| !token.is_empty())
423 0 : .collect::<Vec<_>>()
424 0 : .join("_");
425 0 : let labels = PageserverRequestLabelGroup {
426 0 : pageserver_id: &node.get_id().to_string(),
427 0 : path: &path_label,
428 0 : method: crate::metrics::Method::Get,
429 0 : };
430 0 :
431 0 : let _timer = latency.start_timer(labels.clone());
432 0 :
433 0 : let client = mgmt_api::Client::new(node.base_url(), service.get_config().jwt_token.as_deref());
434 0 : let resp = client.get_raw(path).await.map_err(|_e|
435 : // FIXME: give APiError a proper Unavailable variant. We return 503 here because
436 : // if we can't successfully send a request to the pageserver, we aren't available.
437 0 : ApiError::ShuttingDown)?;
438 :
439 0 : if !resp.status().is_success() {
440 0 : let error_counter = &METRICS_REGISTRY
441 0 : .metrics_group
442 0 : .storage_controller_passthrough_request_error;
443 0 : error_counter.inc(labels);
444 0 : }
445 :
446 : // We have a reqest::Response, would like a http::Response
447 0 : let mut builder = hyper::Response::builder().status(map_reqwest_hyper_status(resp.status())?);
448 0 : for (k, v) in resp.headers() {
449 0 : builder = builder.header(k.as_str(), v.as_bytes());
450 0 : }
451 :
452 0 : let response = builder
453 0 : .body(Body::wrap_stream(resp.bytes_stream()))
454 0 : .map_err(|e| ApiError::InternalServerError(e.into()))?;
455 :
456 0 : Ok(response)
457 0 : }
458 :
459 0 : async fn handle_tenant_locate(
460 0 : service: Arc<Service>,
461 0 : req: Request<Body>,
462 0 : ) -> Result<Response<Body>, ApiError> {
463 0 : check_permissions(&req, Scope::Admin)?;
464 :
465 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
466 0 : json_response(StatusCode::OK, service.tenant_locate(tenant_id)?)
467 0 : }
468 :
469 0 : async fn handle_tenant_describe(
470 0 : service: Arc<Service>,
471 0 : req: Request<Body>,
472 0 : ) -> Result<Response<Body>, ApiError> {
473 0 : check_permissions(&req, Scope::Scrubber)?;
474 :
475 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
476 0 : json_response(StatusCode::OK, service.tenant_describe(tenant_id)?)
477 0 : }
478 :
479 0 : async fn handle_tenant_list(
480 0 : service: Arc<Service>,
481 0 : req: Request<Body>,
482 0 : ) -> Result<Response<Body>, ApiError> {
483 0 : check_permissions(&req, Scope::Admin)?;
484 :
485 0 : json_response(StatusCode::OK, service.tenant_list())
486 0 : }
487 :
488 0 : async fn handle_node_register(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
489 0 : check_permissions(&req, Scope::Admin)?;
490 :
491 0 : let register_req = json_request::<NodeRegisterRequest>(&mut req).await?;
492 0 : let state = get_state(&req);
493 0 : state.service.node_register(register_req).await?;
494 0 : json_response(StatusCode::OK, ())
495 0 : }
496 :
497 0 : async fn handle_node_list(req: Request<Body>) -> Result<Response<Body>, ApiError> {
498 0 : check_permissions(&req, Scope::Admin)?;
499 :
500 0 : let state = get_state(&req);
501 0 : let nodes = state.service.node_list().await?;
502 0 : let api_nodes = nodes.into_iter().map(|n| n.describe()).collect::<Vec<_>>();
503 0 :
504 0 : json_response(StatusCode::OK, api_nodes)
505 0 : }
506 :
507 0 : async fn handle_node_drop(req: Request<Body>) -> Result<Response<Body>, ApiError> {
508 0 : check_permissions(&req, Scope::Admin)?;
509 :
510 0 : let state = get_state(&req);
511 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
512 0 : json_response(StatusCode::OK, state.service.node_drop(node_id).await?)
513 0 : }
514 :
515 0 : async fn handle_node_delete(req: Request<Body>) -> Result<Response<Body>, ApiError> {
516 0 : check_permissions(&req, Scope::Admin)?;
517 :
518 0 : let state = get_state(&req);
519 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
520 0 : json_response(StatusCode::OK, state.service.node_delete(node_id).await?)
521 0 : }
522 :
523 0 : async fn handle_node_configure(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
524 0 : check_permissions(&req, Scope::Admin)?;
525 :
526 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
527 0 : let config_req = json_request::<NodeConfigureRequest>(&mut req).await?;
528 0 : if node_id != config_req.node_id {
529 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
530 0 : "Path and body node_id differ"
531 0 : )));
532 0 : }
533 0 : let state = get_state(&req);
534 0 :
535 0 : json_response(
536 0 : StatusCode::OK,
537 0 : state
538 0 : .service
539 0 : .external_node_configure(
540 0 : config_req.node_id,
541 0 : config_req.availability.map(NodeAvailability::from),
542 0 : config_req.scheduling,
543 0 : )
544 0 : .await?,
545 : )
546 0 : }
547 :
548 0 : async fn handle_node_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
549 0 : check_permissions(&req, Scope::Admin)?;
550 :
551 0 : let state = get_state(&req);
552 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
553 :
554 0 : let node_status = state.service.get_node(node_id).await?;
555 :
556 0 : json_response(StatusCode::OK, node_status)
557 0 : }
558 :
559 0 : async fn handle_node_shards(req: Request<Body>) -> Result<Response<Body>, ApiError> {
560 0 : check_permissions(&req, Scope::Admin)?;
561 :
562 0 : let state = get_state(&req);
563 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
564 :
565 0 : let node_status = state.service.get_node_shards(node_id).await?;
566 :
567 0 : json_response(StatusCode::OK, node_status)
568 0 : }
569 :
570 0 : async fn handle_get_leader(req: Request<Body>) -> Result<Response<Body>, ApiError> {
571 0 : check_permissions(&req, Scope::Admin)?;
572 :
573 0 : let state = get_state(&req);
574 0 : let leader = state.service.get_leader().await.map_err(|err| {
575 0 : ApiError::InternalServerError(anyhow::anyhow!(
576 0 : "Failed to read leader from database: {err}"
577 0 : ))
578 0 : })?;
579 :
580 0 : json_response(StatusCode::OK, leader)
581 0 : }
582 :
583 0 : async fn handle_node_drain(req: Request<Body>) -> Result<Response<Body>, ApiError> {
584 0 : check_permissions(&req, Scope::Admin)?;
585 :
586 0 : let state = get_state(&req);
587 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
588 :
589 0 : state.service.start_node_drain(node_id).await?;
590 :
591 0 : json_response(StatusCode::ACCEPTED, ())
592 0 : }
593 :
594 0 : async fn handle_cancel_node_drain(req: Request<Body>) -> Result<Response<Body>, ApiError> {
595 0 : check_permissions(&req, Scope::Admin)?;
596 :
597 0 : let state = get_state(&req);
598 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
599 :
600 0 : state.service.cancel_node_drain(node_id).await?;
601 :
602 0 : json_response(StatusCode::ACCEPTED, ())
603 0 : }
604 :
605 0 : async fn handle_node_fill(req: Request<Body>) -> Result<Response<Body>, ApiError> {
606 0 : check_permissions(&req, Scope::Admin)?;
607 :
608 0 : let state = get_state(&req);
609 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
610 :
611 0 : state.service.start_node_fill(node_id).await?;
612 :
613 0 : json_response(StatusCode::ACCEPTED, ())
614 0 : }
615 :
616 0 : async fn handle_cancel_node_fill(req: Request<Body>) -> Result<Response<Body>, ApiError> {
617 0 : check_permissions(&req, Scope::Admin)?;
618 :
619 0 : let state = get_state(&req);
620 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
621 :
622 0 : state.service.cancel_node_fill(node_id).await?;
623 :
624 0 : json_response(StatusCode::ACCEPTED, ())
625 0 : }
626 :
627 0 : async fn handle_metadata_health_update(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
628 0 : check_permissions(&req, Scope::Scrubber)?;
629 :
630 0 : let update_req = json_request::<MetadataHealthUpdateRequest>(&mut req).await?;
631 0 : let state = get_state(&req);
632 0 :
633 0 : state.service.metadata_health_update(update_req).await?;
634 :
635 0 : json_response(StatusCode::OK, MetadataHealthUpdateResponse {})
636 0 : }
637 :
638 0 : async fn handle_metadata_health_list_unhealthy(
639 0 : req: Request<Body>,
640 0 : ) -> Result<Response<Body>, ApiError> {
641 0 : check_permissions(&req, Scope::Admin)?;
642 :
643 0 : let state = get_state(&req);
644 0 : let unhealthy_tenant_shards = state.service.metadata_health_list_unhealthy().await?;
645 :
646 0 : json_response(
647 0 : StatusCode::OK,
648 0 : MetadataHealthListUnhealthyResponse {
649 0 : unhealthy_tenant_shards,
650 0 : },
651 0 : )
652 0 : }
653 :
654 0 : async fn handle_metadata_health_list_outdated(
655 0 : mut req: Request<Body>,
656 0 : ) -> Result<Response<Body>, ApiError> {
657 0 : check_permissions(&req, Scope::Admin)?;
658 :
659 0 : let list_outdated_req = json_request::<MetadataHealthListOutdatedRequest>(&mut req).await?;
660 0 : let state = get_state(&req);
661 0 : let health_records = state
662 0 : .service
663 0 : .metadata_health_list_outdated(list_outdated_req.not_scrubbed_for)
664 0 : .await?;
665 :
666 0 : json_response(
667 0 : StatusCode::OK,
668 0 : MetadataHealthListOutdatedResponse { health_records },
669 0 : )
670 0 : }
671 :
672 0 : async fn handle_tenant_shard_split(
673 0 : service: Arc<Service>,
674 0 : mut req: Request<Body>,
675 0 : ) -> Result<Response<Body>, ApiError> {
676 0 : check_permissions(&req, Scope::Admin)?;
677 :
678 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
679 0 : let split_req = json_request::<TenantShardSplitRequest>(&mut req).await?;
680 :
681 : json_response(
682 : StatusCode::OK,
683 0 : service.tenant_shard_split(tenant_id, split_req).await?,
684 : )
685 0 : }
686 :
687 0 : async fn handle_tenant_shard_migrate(
688 0 : service: Arc<Service>,
689 0 : mut req: Request<Body>,
690 0 : ) -> Result<Response<Body>, ApiError> {
691 0 : check_permissions(&req, Scope::Admin)?;
692 :
693 0 : let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
694 0 : let migrate_req = json_request::<TenantShardMigrateRequest>(&mut req).await?;
695 : json_response(
696 : StatusCode::OK,
697 0 : service
698 0 : .tenant_shard_migrate(tenant_shard_id, migrate_req)
699 0 : .await?,
700 : )
701 0 : }
702 :
703 0 : async fn handle_tenant_update_policy(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
704 0 : check_permissions(&req, Scope::Admin)?;
705 :
706 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
707 0 : let update_req = json_request::<TenantPolicyRequest>(&mut req).await?;
708 0 : let state = get_state(&req);
709 0 :
710 0 : json_response(
711 0 : StatusCode::OK,
712 0 : state
713 0 : .service
714 0 : .tenant_update_policy(tenant_id, update_req)
715 0 : .await?,
716 : )
717 0 : }
718 :
719 0 : async fn handle_update_preferred_azs(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
720 0 : check_permissions(&req, Scope::Admin)?;
721 :
722 0 : let azs_req = json_request::<ShardsPreferredAzsRequest>(&mut req).await?;
723 0 : let state = get_state(&req);
724 0 :
725 0 : json_response(
726 0 : StatusCode::OK,
727 0 : state.service.update_shards_preferred_azs(azs_req).await?,
728 : )
729 0 : }
730 :
731 0 : async fn handle_step_down(req: Request<Body>) -> Result<Response<Body>, ApiError> {
732 0 : check_permissions(&req, Scope::Admin)?;
733 :
734 0 : let state = get_state(&req);
735 0 : json_response(StatusCode::OK, state.service.step_down().await)
736 0 : }
737 :
738 0 : async fn handle_tenant_drop(req: Request<Body>) -> Result<Response<Body>, ApiError> {
739 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
740 0 : check_permissions(&req, Scope::PageServerApi)?;
741 :
742 0 : let state = get_state(&req);
743 0 :
744 0 : json_response(StatusCode::OK, state.service.tenant_drop(tenant_id).await?)
745 0 : }
746 :
747 0 : async fn handle_tenant_import(req: Request<Body>) -> Result<Response<Body>, ApiError> {
748 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
749 0 : check_permissions(&req, Scope::PageServerApi)?;
750 :
751 0 : let state = get_state(&req);
752 0 :
753 0 : json_response(
754 0 : StatusCode::OK,
755 0 : state.service.tenant_import(tenant_id).await?,
756 : )
757 0 : }
758 :
759 0 : async fn handle_tenants_dump(req: Request<Body>) -> Result<Response<Body>, ApiError> {
760 0 : check_permissions(&req, Scope::Admin)?;
761 :
762 0 : let state = get_state(&req);
763 0 : state.service.tenants_dump()
764 0 : }
765 :
766 0 : async fn handle_scheduler_dump(req: Request<Body>) -> Result<Response<Body>, ApiError> {
767 0 : check_permissions(&req, Scope::Admin)?;
768 :
769 0 : let state = get_state(&req);
770 0 : state.service.scheduler_dump()
771 0 : }
772 :
773 0 : async fn handle_consistency_check(req: Request<Body>) -> Result<Response<Body>, ApiError> {
774 0 : check_permissions(&req, Scope::Admin)?;
775 :
776 0 : let state = get_state(&req);
777 0 :
778 0 : json_response(StatusCode::OK, state.service.consistency_check().await?)
779 0 : }
780 :
781 0 : async fn handle_reconcile_all(req: Request<Body>) -> Result<Response<Body>, ApiError> {
782 0 : check_permissions(&req, Scope::Admin)?;
783 :
784 0 : let state = get_state(&req);
785 0 :
786 0 : json_response(StatusCode::OK, state.service.reconcile_all_now().await?)
787 0 : }
788 :
789 : /// Status endpoint is just used for checking that our HTTP listener is up
790 0 : async fn handle_status(_req: Request<Body>) -> Result<Response<Body>, ApiError> {
791 0 : json_response(StatusCode::OK, ())
792 0 : }
793 :
794 : /// Readiness endpoint indicates when we're done doing startup I/O (e.g. reconciling
795 : /// with remote pageserver nodes). This is intended for use as a kubernetes readiness probe.
796 0 : async fn handle_ready(req: Request<Body>) -> Result<Response<Body>, ApiError> {
797 0 : let state = get_state(&req);
798 0 : if state.service.startup_complete.is_ready() {
799 0 : json_response(StatusCode::OK, ())
800 : } else {
801 0 : json_response(StatusCode::SERVICE_UNAVAILABLE, ())
802 : }
803 0 : }
804 :
805 : impl From<ReconcileError> for ApiError {
806 0 : fn from(value: ReconcileError) -> Self {
807 0 : ApiError::Conflict(format!("Reconciliation error: {}", value))
808 0 : }
809 : }
810 :
811 : /// Return the safekeeper record by instance id, or 404.
812 : ///
813 : /// Not used by anything except manual testing.
814 0 : async fn handle_get_safekeeper(req: Request<Body>) -> Result<Response<Body>, ApiError> {
815 0 : check_permissions(&req, Scope::Admin)?;
816 :
817 0 : let id = parse_request_param::<i64>(&req, "id")?;
818 :
819 0 : let state = get_state(&req);
820 :
821 0 : let res = state.service.get_safekeeper(id).await;
822 :
823 0 : match res {
824 0 : Ok(b) => json_response(StatusCode::OK, b),
825 : Err(crate::persistence::DatabaseError::Query(diesel::result::Error::NotFound)) => {
826 0 : Err(ApiError::NotFound("unknown instance_id".into()))
827 : }
828 0 : Err(other) => Err(other.into()),
829 : }
830 0 : }
831 :
832 : /// Used as part of deployment scripts.
833 : ///
834 : /// Assumes information is only relayed to storage controller after first selecting an unique id on
835 : /// control plane database, which means we have an id field in the request and payload.
836 0 : async fn handle_upsert_safekeeper(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
837 0 : check_permissions(&req, Scope::Admin)?;
838 :
839 0 : let body = json_request::<SafekeeperPersistence>(&mut req).await?;
840 0 : let id = parse_request_param::<i64>(&req, "id")?;
841 :
842 0 : if id != body.id {
843 : // it should be repeated
844 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
845 0 : "id mismatch: url={id:?}, body={:?}",
846 0 : body.id
847 0 : )));
848 0 : }
849 0 :
850 0 : let state = get_state(&req);
851 0 :
852 0 : state.service.upsert_safekeeper(body).await?;
853 :
854 0 : Ok(Response::builder()
855 0 : .status(StatusCode::NO_CONTENT)
856 0 : .body(Body::empty())
857 0 : .unwrap())
858 0 : }
859 :
860 : /// Common wrapper for request handlers that call into Service and will operate on tenants: they must only
861 : /// be allowed to run if Service has finished its initial reconciliation.
862 0 : async fn tenant_service_handler<R, H>(
863 0 : request: Request<Body>,
864 0 : handler: H,
865 0 : request_name: RequestName,
866 0 : ) -> R::Output
867 0 : where
868 0 : R: std::future::Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
869 0 : H: FnOnce(Arc<Service>, Request<Body>) -> R + Send + Sync + 'static,
870 0 : {
871 0 : let state = get_state(&request);
872 0 : let service = state.service.clone();
873 0 :
874 0 : let startup_complete = service.startup_complete.clone();
875 0 : if tokio::time::timeout(STARTUP_RECONCILE_TIMEOUT, startup_complete.wait())
876 0 : .await
877 0 : .is_err()
878 : {
879 : // This shouldn't happen: it is the responsibilty of [`Service::startup_reconcile`] to use appropriate
880 : // timeouts around its remote calls, to bound its runtime.
881 0 : return Err(ApiError::Timeout(
882 0 : "Timed out waiting for service readiness".into(),
883 0 : ));
884 0 : }
885 0 :
886 0 : named_request_span(
887 0 : request,
888 0 : |request| async move { handler(service, request).await },
889 0 : request_name,
890 0 : )
891 0 : .await
892 0 : }
893 :
894 : /// Check if the required scope is held in the request's token, or if the request has
895 : /// a token with 'admin' scope then always permit it.
896 0 : fn check_permissions(request: &Request<Body>, required_scope: Scope) -> Result<(), ApiError> {
897 0 : check_permission_with(request, |claims| {
898 0 : match crate::auth::check_permission(claims, required_scope) {
899 0 : Err(e) => match crate::auth::check_permission(claims, Scope::Admin) {
900 0 : Ok(()) => Ok(()),
901 0 : Err(_) => Err(e),
902 : },
903 0 : Ok(()) => Ok(()),
904 : }
905 0 : })
906 0 : }
907 :
/// Per-request bookkeeping stored in the request context by `prologue_metrics_middleware`
/// and read back by `epilogue_metrics_middleware` to label and time the request.
#[derive(Clone, Debug)]
struct RequestMeta {
    /// HTTP method of the request, used as a metrics label.
    method: hyper::http::Method,
    /// When the prologue middleware saw the request; latency is measured from this instant.
    at: Instant,
}
913 :
914 0 : pub fn prologue_leadership_status_check_middleware<
915 0 : B: hyper::body::HttpBody + Send + Sync + 'static,
916 0 : >() -> Middleware<B, ApiError> {
917 0 : Middleware::pre(move |req| async move {
918 0 : let state = get_state(&req);
919 0 : let leadership_status = state.service.get_leadership_status();
920 :
921 : enum AllowedRoutes<'a> {
922 : All,
923 : Some(Vec<&'a str>),
924 : }
925 :
926 0 : let allowed_routes = match leadership_status {
927 0 : LeadershipStatus::Leader => AllowedRoutes::All,
928 : LeadershipStatus::SteppedDown => {
929 : // TODO: does it make sense to allow /status here?
930 0 : AllowedRoutes::Some(["/control/v1/step_down", "/status", "/metrics"].to_vec())
931 : }
932 : LeadershipStatus::Candidate => {
933 0 : AllowedRoutes::Some(["/ready", "/status", "/metrics"].to_vec())
934 : }
935 : };
936 :
937 0 : let uri = req.uri().to_string();
938 0 : match allowed_routes {
939 0 : AllowedRoutes::All => Ok(req),
940 0 : AllowedRoutes::Some(allowed) if allowed.contains(&uri.as_str()) => Ok(req),
941 : _ => {
942 0 : tracing::info!(
943 0 : "Request {} not allowed due to current leadership state",
944 0 : req.uri()
945 : );
946 :
947 0 : Err(ApiError::ResourceUnavailable(
948 0 : format!("Current leadership status is {leadership_status}").into(),
949 0 : ))
950 : }
951 : }
952 0 : })
953 0 : }
954 :
955 0 : fn prologue_metrics_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>(
956 0 : ) -> Middleware<B, ApiError> {
957 0 : Middleware::pre(move |req| async move {
958 0 : let meta = RequestMeta {
959 0 : method: req.method().clone(),
960 0 : at: Instant::now(),
961 0 : };
962 0 :
963 0 : req.set_context(meta);
964 0 :
965 0 : Ok(req)
966 0 : })
967 0 : }
968 :
969 0 : fn epilogue_metrics_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>(
970 0 : ) -> Middleware<B, ApiError> {
971 0 : Middleware::post_with_info(move |resp, req_info| async move {
972 0 : let request_name = match req_info.context::<RequestName>() {
973 0 : Some(name) => name,
974 : None => {
975 0 : return Ok(resp);
976 : }
977 : };
978 :
979 0 : if let Some(meta) = req_info.context::<RequestMeta>() {
980 0 : let status = &crate::metrics::METRICS_REGISTRY
981 0 : .metrics_group
982 0 : .storage_controller_http_request_status;
983 0 : let latency = &crate::metrics::METRICS_REGISTRY
984 0 : .metrics_group
985 0 : .storage_controller_http_request_latency;
986 0 :
987 0 : status.inc(HttpRequestStatusLabelGroup {
988 0 : path: request_name.0,
989 0 : method: meta.method.clone().into(),
990 0 : status: crate::metrics::StatusCode(resp.status()),
991 0 : });
992 0 :
993 0 : latency.observe(
994 0 : HttpRequestLatencyLabelGroup {
995 0 : path: request_name.0,
996 0 : method: meta.method.into(),
997 0 : },
998 0 : meta.at.elapsed().as_secs_f64(),
999 0 : );
1000 0 : }
1001 0 : Ok(resp)
1002 0 : })
1003 0 : }
1004 :
1005 0 : pub async fn measured_metrics_handler(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1006 : pub const TEXT_FORMAT: &str = "text/plain; version=0.0.4";
1007 :
1008 0 : let state = get_state(&req);
1009 0 : let payload = crate::metrics::METRICS_REGISTRY.encode(&state.neon_metrics);
1010 0 : let response = Response::builder()
1011 0 : .status(200)
1012 0 : .header(CONTENT_TYPE, TEXT_FORMAT)
1013 0 : .body(payload.into())
1014 0 : .unwrap();
1015 0 :
1016 0 : Ok(response)
1017 0 : }
1018 :
/// Static name of a route, stored in the request context by `named_request_span`.
///
/// `epilogue_metrics_middleware` uses it as the `path` label for HTTP metrics; requests
/// that carry no `RequestName` are not recorded.
#[derive(Clone)]
struct RequestName(&'static str);
1021 :
/// Run `handler` for `request` inside a request span, first tagging the request with `name`.
///
/// The [`RequestName`] tag is what lets `epilogue_metrics_middleware` record status and
/// latency metrics for this request; handlers invoked via plain `request_span` are not measured.
async fn named_request_span<R, H>(
    request: Request<Body>,
    handler: H,
    name: RequestName,
) -> R::Output
where
    R: Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
    H: FnOnce(Request<Body>) -> R + Send + Sync + 'static,
{
    // `set_context` stores the name for the post-routing metrics middleware.
    request.set_context(name);
    request_span(request, handler).await
}
1034 :
1035 0 : pub fn make_router(
1036 0 : service: Arc<Service>,
1037 0 : auth: Option<Arc<SwappableJwtAuth>>,
1038 0 : build_info: BuildInfo,
1039 0 : ) -> RouterBuilder<hyper::Body, ApiError> {
1040 0 : let mut router = endpoint::make_router()
1041 0 : .middleware(prologue_leadership_status_check_middleware())
1042 0 : .middleware(prologue_metrics_middleware())
1043 0 : .middleware(epilogue_metrics_middleware());
1044 0 : if auth.is_some() {
1045 0 : router = router.middleware(auth_middleware(|request| {
1046 0 : let state = get_state(request);
1047 0 : if state.allowlist_routes.contains(request.uri()) {
1048 0 : None
1049 : } else {
1050 0 : state.auth.as_deref()
1051 : }
1052 0 : }));
1053 0 : }
1054 :
1055 0 : router
1056 0 : .data(Arc::new(HttpState::new(service, auth, build_info)))
1057 0 : .get("/metrics", |r| {
1058 0 : named_request_span(r, measured_metrics_handler, RequestName("metrics"))
1059 0 : })
1060 0 : // Non-prefixed generic endpoints (status, metrics)
1061 0 : .get("/status", |r| {
1062 0 : named_request_span(r, handle_status, RequestName("status"))
1063 0 : })
1064 0 : .get("/ready", |r| {
1065 0 : named_request_span(r, handle_ready, RequestName("ready"))
1066 0 : })
1067 0 : // Upcalls for the pageserver: point the pageserver's `control_plane_api` config to this prefix
1068 0 : .post("/upcall/v1/re-attach", |r| {
1069 0 : named_request_span(r, handle_re_attach, RequestName("upcall_v1_reattach"))
1070 0 : })
1071 0 : .post("/upcall/v1/validate", |r| {
1072 0 : named_request_span(r, handle_validate, RequestName("upcall_v1_validate"))
1073 0 : })
1074 0 : // Test/dev/debug endpoints
1075 0 : .post("/debug/v1/attach-hook", |r| {
1076 0 : named_request_span(r, handle_attach_hook, RequestName("debug_v1_attach_hook"))
1077 0 : })
1078 0 : .post("/debug/v1/inspect", |r| {
1079 0 : named_request_span(r, handle_inspect, RequestName("debug_v1_inspect"))
1080 0 : })
1081 0 : .post("/debug/v1/tenant/:tenant_id/drop", |r| {
1082 0 : named_request_span(r, handle_tenant_drop, RequestName("debug_v1_tenant_drop"))
1083 0 : })
1084 0 : .post("/debug/v1/node/:node_id/drop", |r| {
1085 0 : named_request_span(r, handle_node_drop, RequestName("debug_v1_node_drop"))
1086 0 : })
1087 0 : .post("/debug/v1/tenant/:tenant_id/import", |r| {
1088 0 : named_request_span(
1089 0 : r,
1090 0 : handle_tenant_import,
1091 0 : RequestName("debug_v1_tenant_import"),
1092 0 : )
1093 0 : })
1094 0 : .get("/debug/v1/tenant", |r| {
1095 0 : named_request_span(r, handle_tenants_dump, RequestName("debug_v1_tenant"))
1096 0 : })
1097 0 : .get("/debug/v1/tenant/:tenant_id/locate", |r| {
1098 0 : tenant_service_handler(
1099 0 : r,
1100 0 : handle_tenant_locate,
1101 0 : RequestName("debug_v1_tenant_locate"),
1102 0 : )
1103 0 : })
1104 0 : .get("/debug/v1/scheduler", |r| {
1105 0 : named_request_span(r, handle_scheduler_dump, RequestName("debug_v1_scheduler"))
1106 0 : })
1107 0 : .post("/debug/v1/consistency_check", |r| {
1108 0 : named_request_span(
1109 0 : r,
1110 0 : handle_consistency_check,
1111 0 : RequestName("debug_v1_consistency_check"),
1112 0 : )
1113 0 : })
1114 0 : .post("/debug/v1/reconcile_all", |r| {
1115 0 : request_span(r, handle_reconcile_all)
1116 0 : })
1117 0 : .put("/debug/v1/failpoints", |r| {
1118 0 : request_span(r, |r| failpoints_handler(r, CancellationToken::new()))
1119 0 : })
1120 0 : // Node operations
1121 0 : .post("/control/v1/node", |r| {
1122 0 : named_request_span(r, handle_node_register, RequestName("control_v1_node"))
1123 0 : })
1124 0 : .delete("/control/v1/node/:node_id", |r| {
1125 0 : named_request_span(r, handle_node_delete, RequestName("control_v1_node_delete"))
1126 0 : })
1127 0 : .get("/control/v1/node", |r| {
1128 0 : named_request_span(r, handle_node_list, RequestName("control_v1_node"))
1129 0 : })
1130 0 : .put("/control/v1/node/:node_id/config", |r| {
1131 0 : named_request_span(
1132 0 : r,
1133 0 : handle_node_configure,
1134 0 : RequestName("control_v1_node_config"),
1135 0 : )
1136 0 : })
1137 0 : .get("/control/v1/node/:node_id", |r| {
1138 0 : named_request_span(r, handle_node_status, RequestName("control_v1_node_status"))
1139 0 : })
1140 0 : .get("/control/v1/node/:node_id/shards", |r| {
1141 0 : named_request_span(
1142 0 : r,
1143 0 : handle_node_shards,
1144 0 : RequestName("control_v1_node_describe"),
1145 0 : )
1146 0 : })
1147 0 : .get("/control/v1/leader", |r| {
1148 0 : named_request_span(r, handle_get_leader, RequestName("control_v1_get_leader"))
1149 0 : })
1150 0 : .put("/control/v1/node/:node_id/drain", |r| {
1151 0 : named_request_span(r, handle_node_drain, RequestName("control_v1_node_drain"))
1152 0 : })
1153 0 : .delete("/control/v1/node/:node_id/drain", |r| {
1154 0 : named_request_span(
1155 0 : r,
1156 0 : handle_cancel_node_drain,
1157 0 : RequestName("control_v1_cancel_node_drain"),
1158 0 : )
1159 0 : })
1160 0 : .put("/control/v1/node/:node_id/fill", |r| {
1161 0 : named_request_span(r, handle_node_fill, RequestName("control_v1_node_fill"))
1162 0 : })
1163 0 : .delete("/control/v1/node/:node_id/fill", |r| {
1164 0 : named_request_span(
1165 0 : r,
1166 0 : handle_cancel_node_fill,
1167 0 : RequestName("control_v1_cancel_node_fill"),
1168 0 : )
1169 0 : })
1170 0 : // Metadata health operations
1171 0 : .post("/control/v1/metadata_health/update", |r| {
1172 0 : named_request_span(
1173 0 : r,
1174 0 : handle_metadata_health_update,
1175 0 : RequestName("control_v1_metadata_health_update"),
1176 0 : )
1177 0 : })
1178 0 : .get("/control/v1/metadata_health/unhealthy", |r| {
1179 0 : named_request_span(
1180 0 : r,
1181 0 : handle_metadata_health_list_unhealthy,
1182 0 : RequestName("control_v1_metadata_health_list_unhealthy"),
1183 0 : )
1184 0 : })
1185 0 : .post("/control/v1/metadata_health/outdated", |r| {
1186 0 : named_request_span(
1187 0 : r,
1188 0 : handle_metadata_health_list_outdated,
1189 0 : RequestName("control_v1_metadata_health_list_outdated"),
1190 0 : )
1191 0 : })
1192 0 : // Tenant Shard operations
1193 0 : .put("/control/v1/tenant/:tenant_shard_id/migrate", |r| {
1194 0 : tenant_service_handler(
1195 0 : r,
1196 0 : handle_tenant_shard_migrate,
1197 0 : RequestName("control_v1_tenant_migrate"),
1198 0 : )
1199 0 : })
1200 0 : .put("/control/v1/tenant/:tenant_id/shard_split", |r| {
1201 0 : tenant_service_handler(
1202 0 : r,
1203 0 : handle_tenant_shard_split,
1204 0 : RequestName("control_v1_tenant_shard_split"),
1205 0 : )
1206 0 : })
1207 0 : .get("/control/v1/tenant/:tenant_id", |r| {
1208 0 : tenant_service_handler(
1209 0 : r,
1210 0 : handle_tenant_describe,
1211 0 : RequestName("control_v1_tenant_describe"),
1212 0 : )
1213 0 : })
1214 0 : .get("/control/v1/tenant", |r| {
1215 0 : tenant_service_handler(r, handle_tenant_list, RequestName("control_v1_tenant_list"))
1216 0 : })
1217 0 : .put("/control/v1/tenant/:tenant_id/policy", |r| {
1218 0 : named_request_span(
1219 0 : r,
1220 0 : handle_tenant_update_policy,
1221 0 : RequestName("control_v1_tenant_policy"),
1222 0 : )
1223 0 : })
1224 0 : .put("/control/v1/preferred_azs", |r| {
1225 0 : named_request_span(
1226 0 : r,
1227 0 : handle_update_preferred_azs,
1228 0 : RequestName("control_v1_preferred_azs"),
1229 0 : )
1230 0 : })
1231 0 : .put("/control/v1/step_down", |r| {
1232 0 : named_request_span(r, handle_step_down, RequestName("control_v1_step_down"))
1233 0 : })
1234 0 : .get("/control/v1/safekeeper/:id", |r| {
1235 0 : named_request_span(r, handle_get_safekeeper, RequestName("v1_safekeeper"))
1236 0 : })
1237 0 : .post("/control/v1/safekeeper/:id", |r| {
1238 0 : // id is in the body
1239 0 : named_request_span(r, handle_upsert_safekeeper, RequestName("v1_safekeeper"))
1240 0 : })
1241 0 : // Tenant operations
1242 0 : // The ^/v1/ endpoints act as a "Virtual Pageserver", enabling shard-naive clients to call into
1243 0 : // this service to manage tenants that actually consist of many tenant shards, as if they are a single entity.
1244 0 : .post("/v1/tenant", |r| {
1245 0 : tenant_service_handler(r, handle_tenant_create, RequestName("v1_tenant"))
1246 0 : })
1247 0 : .delete("/v1/tenant/:tenant_id", |r| {
1248 0 : tenant_service_handler(r, handle_tenant_delete, RequestName("v1_tenant"))
1249 0 : })
1250 0 : .put("/v1/tenant/config", |r| {
1251 0 : tenant_service_handler(r, handle_tenant_config_set, RequestName("v1_tenant_config"))
1252 0 : })
1253 0 : .get("/v1/tenant/:tenant_id/config", |r| {
1254 0 : tenant_service_handler(r, handle_tenant_config_get, RequestName("v1_tenant_config"))
1255 0 : })
1256 0 : .put("/v1/tenant/:tenant_shard_id/location_config", |r| {
1257 0 : tenant_service_handler(
1258 0 : r,
1259 0 : handle_tenant_location_config,
1260 0 : RequestName("v1_tenant_location_config"),
1261 0 : )
1262 0 : })
1263 0 : .put("/v1/tenant/:tenant_id/time_travel_remote_storage", |r| {
1264 0 : tenant_service_handler(
1265 0 : r,
1266 0 : handle_tenant_time_travel_remote_storage,
1267 0 : RequestName("v1_tenant_time_travel_remote_storage"),
1268 0 : )
1269 0 : })
1270 0 : .post("/v1/tenant/:tenant_id/secondary/download", |r| {
1271 0 : tenant_service_handler(
1272 0 : r,
1273 0 : handle_tenant_secondary_download,
1274 0 : RequestName("v1_tenant_secondary_download"),
1275 0 : )
1276 0 : })
1277 0 : // Timeline operations
1278 0 : .delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
1279 0 : tenant_service_handler(
1280 0 : r,
1281 0 : handle_tenant_timeline_delete,
1282 0 : RequestName("v1_tenant_timeline"),
1283 0 : )
1284 0 : })
1285 0 : .post("/v1/tenant/:tenant_id/timeline", |r| {
1286 0 : tenant_service_handler(
1287 0 : r,
1288 0 : handle_tenant_timeline_create,
1289 0 : RequestName("v1_tenant_timeline"),
1290 0 : )
1291 0 : })
1292 0 : .post(
1293 0 : "/v1/tenant/:tenant_id/timeline/:timeline_id/archival_config",
1294 0 : |r| {
1295 0 : tenant_service_handler(
1296 0 : r,
1297 0 : handle_tenant_timeline_archival_config,
1298 0 : RequestName("v1_tenant_timeline_archival_config"),
1299 0 : )
1300 0 : },
1301 0 : )
1302 0 : .put(
1303 0 : "/v1/tenant/:tenant_id/timeline/:timeline_id/detach_ancestor",
1304 0 : |r| {
1305 0 : tenant_service_handler(
1306 0 : r,
1307 0 : handle_tenant_timeline_detach_ancestor,
1308 0 : RequestName("v1_tenant_timeline_detach_ancestor"),
1309 0 : )
1310 0 : },
1311 0 : )
1312 0 : .post(
1313 0 : "/v1/tenant/:tenant_id/timeline/:timeline_id/block_gc",
1314 0 : |r| {
1315 0 : tenant_service_handler(
1316 0 : r,
1317 0 : |s, r| handle_tenant_timeline_block_unblock_gc(s, r, BlockUnblock::Block),
1318 0 : RequestName("v1_tenant_timeline_block_unblock_gc"),
1319 0 : )
1320 0 : },
1321 0 : )
1322 0 : .post(
1323 0 : "/v1/tenant/:tenant_id/timeline/:timeline_id/unblock_gc",
1324 0 : |r| {
1325 0 : tenant_service_handler(
1326 0 : r,
1327 0 : |s, r| handle_tenant_timeline_block_unblock_gc(s, r, BlockUnblock::Unblock),
1328 0 : RequestName("v1_tenant_timeline_block_unblock_gc"),
1329 0 : )
1330 0 : },
1331 0 : )
1332 0 : // Tenant detail GET passthrough to shard zero:
1333 0 : .get("/v1/tenant/:tenant_id", |r| {
1334 0 : tenant_service_handler(
1335 0 : r,
1336 0 : handle_tenant_timeline_passthrough,
1337 0 : RequestName("v1_tenant_passthrough"),
1338 0 : )
1339 0 : })
1340 0 : // The `*` in the URL is a wildcard: any tenant/timeline GET APIs on the pageserver
1341 0 : // are implicitly exposed here. This must be last in the list to avoid
1342 0 : // taking precedence over other GET methods we might implement by hand.
1343 0 : .get("/v1/tenant/:tenant_id/*", |r| {
1344 0 : tenant_service_handler(
1345 0 : r,
1346 0 : handle_tenant_timeline_passthrough,
1347 0 : RequestName("v1_tenant_passthrough"),
1348 0 : )
1349 0 : })
1350 0 : }
|