Line data Source code
1 : use crate::http;
2 : use crate::metrics::{
3 : HttpRequestLatencyLabelGroup, HttpRequestStatusLabelGroup, PageserverRequestLabelGroup,
4 : METRICS_REGISTRY,
5 : };
6 : use crate::persistence::SafekeeperUpsert;
7 : use crate::reconciler::ReconcileError;
8 : use crate::service::{LeadershipStatus, Service, RECONCILE_TIMEOUT, STARTUP_RECONCILE_TIMEOUT};
9 : use anyhow::Context;
10 : use futures::Future;
11 : use http_utils::{
12 : endpoint::{
13 : self, auth_middleware, check_permission_with, profile_cpu_handler, profile_heap_handler,
14 : request_span,
15 : },
16 : error::ApiError,
17 : failpoints::failpoints_handler,
18 : json::{json_request, json_response},
19 : request::{must_get_query_param, parse_query_param, parse_request_param},
20 : RequestExt, RouterBuilder,
21 : };
22 : use hyper::header::CONTENT_TYPE;
23 : use hyper::{Body, Request, Response};
24 : use hyper::{StatusCode, Uri};
25 : use metrics::{BuildInfo, NeonMetrics};
26 : use pageserver_api::controller_api::{
27 : MetadataHealthListOutdatedRequest, MetadataHealthListOutdatedResponse,
28 : MetadataHealthListUnhealthyResponse, MetadataHealthUpdateRequest, MetadataHealthUpdateResponse,
29 : SafekeeperSchedulingPolicyRequest, ShardsPreferredAzsRequest, TenantCreateRequest,
30 : };
31 : use pageserver_api::models::{
32 : TenantConfigPatchRequest, TenantConfigRequest, TenantLocationConfigRequest,
33 : TenantShardSplitRequest, TenantTimeTravelRequest, TimelineArchivalConfigRequest,
34 : TimelineCreateRequest,
35 : };
36 : use pageserver_api::shard::TenantShardId;
37 : use pageserver_client::{mgmt_api, BlockUnblock};
38 : use std::str::FromStr;
39 : use std::sync::Arc;
40 : use std::time::{Duration, Instant};
41 : use tokio_util::sync::CancellationToken;
42 : use utils::auth::{Scope, SwappableJwtAuth};
43 : use utils::id::{NodeId, TenantId, TimelineId};
44 :
45 : use pageserver_api::controller_api::{
46 : NodeAvailability, NodeConfigureRequest, NodeRegisterRequest, TenantPolicyRequest,
47 : TenantShardMigrateRequest,
48 : };
49 : use pageserver_api::upcall_api::{ReAttachRequest, ValidateRequest};
50 :
51 : use control_plane::storage_controller::{AttachHookRequest, InspectRequest};
52 :
53 : use routerify::Middleware;
54 :
55 : /// State available to HTTP request handlers
56 : pub struct HttpState {
57 : service: Arc<crate::service::Service>,
58 : auth: Option<Arc<SwappableJwtAuth>>,
59 : neon_metrics: NeonMetrics,
60 : allowlist_routes: &'static [&'static str],
61 : }
62 :
63 : impl HttpState {
64 0 : pub fn new(
65 0 : service: Arc<crate::service::Service>,
66 0 : auth: Option<Arc<SwappableJwtAuth>>,
67 0 : build_info: BuildInfo,
68 0 : ) -> Self {
69 0 : Self {
70 0 : service,
71 0 : auth,
72 0 : neon_metrics: NeonMetrics::new(build_info),
73 0 : allowlist_routes: &[
74 0 : "/status",
75 0 : "/ready",
76 0 : "/metrics",
77 0 : "/profile/cpu",
78 0 : "/profile/heap",
79 0 : ],
80 0 : }
81 0 : }
82 : }
83 :
84 : #[inline(always)]
85 0 : fn get_state(request: &Request<Body>) -> &HttpState {
86 0 : request
87 0 : .data::<Arc<HttpState>>()
88 0 : .expect("unknown state type")
89 0 : .as_ref()
90 0 : }
91 :
92 : /// Pageserver calls into this on startup, to learn which tenants it should attach
93 0 : async fn handle_re_attach(req: Request<Body>) -> Result<Response<Body>, ApiError> {
94 0 : check_permissions(&req, Scope::GenerationsApi)?;
95 :
96 0 : let mut req = match maybe_forward(req).await {
97 0 : ForwardOutcome::Forwarded(res) => {
98 0 : return res;
99 : }
100 0 : ForwardOutcome::NotForwarded(req) => req,
101 : };
102 :
103 0 : let reattach_req = json_request::<ReAttachRequest>(&mut req).await?;
104 0 : let state = get_state(&req);
105 0 : json_response(StatusCode::OK, state.service.re_attach(reattach_req).await?)
106 0 : }
107 :
108 : /// Pageserver calls into this before doing deletions, to confirm that it still
109 : /// holds the latest generation for the tenants with deletions enqueued
110 0 : async fn handle_validate(req: Request<Body>) -> Result<Response<Body>, ApiError> {
111 0 : check_permissions(&req, Scope::GenerationsApi)?;
112 :
113 0 : let mut req = match maybe_forward(req).await {
114 0 : ForwardOutcome::Forwarded(res) => {
115 0 : return res;
116 : }
117 0 : ForwardOutcome::NotForwarded(req) => req,
118 : };
119 :
120 0 : let validate_req = json_request::<ValidateRequest>(&mut req).await?;
121 0 : let state = get_state(&req);
122 0 : json_response(StatusCode::OK, state.service.validate(validate_req).await?)
123 0 : }
124 :
125 : /// Call into this before attaching a tenant to a pageserver, to acquire a generation number
126 : /// (in the real control plane this is unnecessary, because the same program is managing
127 : /// generation numbers and doing attachments).
128 0 : async fn handle_attach_hook(req: Request<Body>) -> Result<Response<Body>, ApiError> {
129 0 : check_permissions(&req, Scope::Admin)?;
130 :
131 0 : let mut req = match maybe_forward(req).await {
132 0 : ForwardOutcome::Forwarded(res) => {
133 0 : return res;
134 : }
135 0 : ForwardOutcome::NotForwarded(req) => req,
136 : };
137 :
138 0 : let attach_req = json_request::<AttachHookRequest>(&mut req).await?;
139 0 : let state = get_state(&req);
140 0 :
141 0 : json_response(
142 0 : StatusCode::OK,
143 0 : state
144 0 : .service
145 0 : .attach_hook(attach_req)
146 0 : .await
147 0 : .map_err(ApiError::InternalServerError)?,
148 : )
149 0 : }
150 :
151 0 : async fn handle_inspect(req: Request<Body>) -> Result<Response<Body>, ApiError> {
152 0 : check_permissions(&req, Scope::Admin)?;
153 :
154 0 : let mut req = match maybe_forward(req).await {
155 0 : ForwardOutcome::Forwarded(res) => {
156 0 : return res;
157 : }
158 0 : ForwardOutcome::NotForwarded(req) => req,
159 : };
160 :
161 0 : let inspect_req = json_request::<InspectRequest>(&mut req).await?;
162 :
163 0 : let state = get_state(&req);
164 0 :
165 0 : json_response(StatusCode::OK, state.service.inspect(inspect_req))
166 0 : }
167 :
168 0 : async fn handle_tenant_create(
169 0 : service: Arc<Service>,
170 0 : req: Request<Body>,
171 0 : ) -> Result<Response<Body>, ApiError> {
172 0 : check_permissions(&req, Scope::PageServerApi)?;
173 :
174 0 : let mut req = match maybe_forward(req).await {
175 0 : ForwardOutcome::Forwarded(res) => {
176 0 : return res;
177 : }
178 0 : ForwardOutcome::NotForwarded(req) => req,
179 : };
180 :
181 0 : let create_req = json_request::<TenantCreateRequest>(&mut req).await?;
182 :
183 : json_response(
184 : StatusCode::CREATED,
185 0 : service.tenant_create(create_req).await?,
186 : )
187 0 : }
188 :
189 0 : async fn handle_tenant_location_config(
190 0 : service: Arc<Service>,
191 0 : req: Request<Body>,
192 0 : ) -> Result<Response<Body>, ApiError> {
193 0 : let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
194 0 : check_permissions(&req, Scope::PageServerApi)?;
195 :
196 0 : let mut req = match maybe_forward(req).await {
197 0 : ForwardOutcome::Forwarded(res) => {
198 0 : return res;
199 : }
200 0 : ForwardOutcome::NotForwarded(req) => req,
201 : };
202 :
203 0 : let config_req = json_request::<TenantLocationConfigRequest>(&mut req).await?;
204 : json_response(
205 : StatusCode::OK,
206 0 : service
207 0 : .tenant_location_config(tenant_shard_id, config_req)
208 0 : .await?,
209 : )
210 0 : }
211 :
212 0 : async fn handle_tenant_config_patch(
213 0 : service: Arc<Service>,
214 0 : req: Request<Body>,
215 0 : ) -> Result<Response<Body>, ApiError> {
216 0 : check_permissions(&req, Scope::PageServerApi)?;
217 :
218 0 : let mut req = match maybe_forward(req).await {
219 0 : ForwardOutcome::Forwarded(res) => {
220 0 : return res;
221 : }
222 0 : ForwardOutcome::NotForwarded(req) => req,
223 : };
224 :
225 0 : let config_req = json_request::<TenantConfigPatchRequest>(&mut req).await?;
226 :
227 : json_response(
228 : StatusCode::OK,
229 0 : service.tenant_config_patch(config_req).await?,
230 : )
231 0 : }
232 :
233 0 : async fn handle_tenant_config_set(
234 0 : service: Arc<Service>,
235 0 : req: Request<Body>,
236 0 : ) -> Result<Response<Body>, ApiError> {
237 0 : check_permissions(&req, Scope::PageServerApi)?;
238 :
239 0 : let mut req = match maybe_forward(req).await {
240 0 : ForwardOutcome::Forwarded(res) => {
241 0 : return res;
242 : }
243 0 : ForwardOutcome::NotForwarded(req) => req,
244 : };
245 :
246 0 : let config_req = json_request::<TenantConfigRequest>(&mut req).await?;
247 :
248 0 : json_response(StatusCode::OK, service.tenant_config_set(config_req).await?)
249 0 : }
250 :
251 0 : async fn handle_tenant_config_get(
252 0 : service: Arc<Service>,
253 0 : req: Request<Body>,
254 0 : ) -> Result<Response<Body>, ApiError> {
255 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
256 0 : check_permissions(&req, Scope::PageServerApi)?;
257 :
258 0 : match maybe_forward(req).await {
259 0 : ForwardOutcome::Forwarded(res) => {
260 0 : return res;
261 : }
262 0 : ForwardOutcome::NotForwarded(_req) => {}
263 0 : };
264 0 :
265 0 : json_response(StatusCode::OK, service.tenant_config_get(tenant_id)?)
266 0 : }
267 :
268 0 : async fn handle_tenant_time_travel_remote_storage(
269 0 : service: Arc<Service>,
270 0 : req: Request<Body>,
271 0 : ) -> Result<Response<Body>, ApiError> {
272 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
273 0 : check_permissions(&req, Scope::PageServerApi)?;
274 :
275 0 : let mut req = match maybe_forward(req).await {
276 0 : ForwardOutcome::Forwarded(res) => {
277 0 : return res;
278 : }
279 0 : ForwardOutcome::NotForwarded(req) => req,
280 : };
281 :
282 0 : let time_travel_req = json_request::<TenantTimeTravelRequest>(&mut req).await?;
283 :
284 0 : let timestamp_raw = must_get_query_param(&req, "travel_to")?;
285 0 : let _timestamp = humantime::parse_rfc3339(×tamp_raw).map_err(|_e| {
286 0 : ApiError::BadRequest(anyhow::anyhow!(
287 0 : "Invalid time for travel_to: {timestamp_raw:?}"
288 0 : ))
289 0 : })?;
290 :
291 0 : let done_if_after_raw = must_get_query_param(&req, "done_if_after")?;
292 0 : let _done_if_after = humantime::parse_rfc3339(&done_if_after_raw).map_err(|_e| {
293 0 : ApiError::BadRequest(anyhow::anyhow!(
294 0 : "Invalid time for done_if_after: {done_if_after_raw:?}"
295 0 : ))
296 0 : })?;
297 :
298 0 : service
299 0 : .tenant_time_travel_remote_storage(
300 0 : &time_travel_req,
301 0 : tenant_id,
302 0 : timestamp_raw,
303 0 : done_if_after_raw,
304 0 : )
305 0 : .await?;
306 0 : json_response(StatusCode::OK, ())
307 0 : }
308 :
309 0 : fn map_reqwest_hyper_status(status: reqwest::StatusCode) -> Result<hyper::StatusCode, ApiError> {
310 0 : hyper::StatusCode::from_u16(status.as_u16())
311 0 : .context("invalid status code")
312 0 : .map_err(ApiError::InternalServerError)
313 0 : }
314 :
315 0 : async fn handle_tenant_secondary_download(
316 0 : service: Arc<Service>,
317 0 : req: Request<Body>,
318 0 : ) -> Result<Response<Body>, ApiError> {
319 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
320 0 : let wait = parse_query_param(&req, "wait_ms")?.map(Duration::from_millis);
321 0 :
322 0 : match maybe_forward(req).await {
323 0 : ForwardOutcome::Forwarded(res) => {
324 0 : return res;
325 : }
326 0 : ForwardOutcome::NotForwarded(_req) => {}
327 : };
328 :
329 0 : let (status, progress) = service.tenant_secondary_download(tenant_id, wait).await?;
330 0 : json_response(map_reqwest_hyper_status(status)?, progress)
331 0 : }
332 :
333 0 : async fn handle_tenant_delete(
334 0 : service: Arc<Service>,
335 0 : req: Request<Body>,
336 0 : ) -> Result<Response<Body>, ApiError> {
337 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
338 0 : check_permissions(&req, Scope::PageServerApi)?;
339 :
340 0 : match maybe_forward(req).await {
341 0 : ForwardOutcome::Forwarded(res) => {
342 0 : return res;
343 : }
344 0 : ForwardOutcome::NotForwarded(_req) => {}
345 : };
346 :
347 0 : let status_code = service
348 0 : .tenant_delete(tenant_id)
349 0 : .await
350 0 : .and_then(map_reqwest_hyper_status)?;
351 :
352 0 : if status_code == StatusCode::NOT_FOUND {
353 : // The pageserver uses 404 for successful deletion, but we use 200
354 0 : json_response(StatusCode::OK, ())
355 : } else {
356 0 : json_response(status_code, ())
357 : }
358 0 : }
359 :
360 0 : async fn handle_tenant_timeline_create(
361 0 : service: Arc<Service>,
362 0 : req: Request<Body>,
363 0 : ) -> Result<Response<Body>, ApiError> {
364 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
365 0 : check_permissions(&req, Scope::PageServerApi)?;
366 :
367 0 : let mut req = match maybe_forward(req).await {
368 0 : ForwardOutcome::Forwarded(res) => {
369 0 : return res;
370 : }
371 0 : ForwardOutcome::NotForwarded(req) => req,
372 : };
373 :
374 0 : let create_req = json_request::<TimelineCreateRequest>(&mut req).await?;
375 : json_response(
376 : StatusCode::CREATED,
377 0 : service
378 0 : .tenant_timeline_create(tenant_id, create_req)
379 0 : .await?,
380 : )
381 0 : }
382 :
383 0 : async fn handle_tenant_timeline_delete(
384 0 : service: Arc<Service>,
385 0 : req: Request<Body>,
386 0 : ) -> Result<Response<Body>, ApiError> {
387 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
388 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
389 :
390 0 : check_permissions(&req, Scope::PageServerApi)?;
391 :
392 0 : match maybe_forward(req).await {
393 0 : ForwardOutcome::Forwarded(res) => {
394 0 : return res;
395 : }
396 0 : ForwardOutcome::NotForwarded(_req) => {}
397 : };
398 :
399 : // For timeline deletions, which both implement an "initially return 202, then 404 once
400 : // we're done" semantic, we wrap with a retry loop to expose a simpler API upstream.
401 0 : async fn deletion_wrapper<R, F>(service: Arc<Service>, f: F) -> Result<Response<Body>, ApiError>
402 0 : where
403 0 : R: std::future::Future<Output = Result<StatusCode, ApiError>> + Send + 'static,
404 0 : F: Fn(Arc<Service>) -> R + Send + Sync + 'static,
405 0 : {
406 : // On subsequent retries, wait longer.
407 : // Enable callers with a 25 second request timeout to reliably get a response
408 : const MAX_WAIT: Duration = Duration::from_secs(25);
409 : const MAX_RETRY_PERIOD: Duration = Duration::from_secs(5);
410 :
411 0 : let started_at = Instant::now();
412 0 :
413 0 : // To keep deletion reasonably snappy for small tenants, initially check after 1 second if deletion
414 0 : // completed.
415 0 : let mut retry_period = Duration::from_secs(1);
416 :
417 : loop {
418 0 : let status = f(service.clone()).await?;
419 0 : match status {
420 : StatusCode::ACCEPTED => {
421 0 : tracing::info!("Deletion accepted, waiting to try again...");
422 0 : tokio::time::sleep(retry_period).await;
423 0 : retry_period = MAX_RETRY_PERIOD;
424 : }
425 : StatusCode::CONFLICT => {
426 0 : tracing::info!("Deletion already in progress, waiting to try again...");
427 0 : tokio::time::sleep(retry_period).await;
428 : }
429 : StatusCode::NOT_FOUND => {
430 0 : tracing::info!("Deletion complete");
431 0 : return json_response(StatusCode::OK, ());
432 : }
433 : _ => {
434 0 : tracing::warn!("Unexpected status {status}");
435 0 : return json_response(status, ());
436 : }
437 : }
438 :
439 0 : let now = Instant::now();
440 0 : if now + retry_period > started_at + MAX_WAIT {
441 0 : tracing::info!("Deletion timed out waiting for 404");
442 : // REQUEST_TIMEOUT would be more appropriate, but CONFLICT is already part of
443 : // the pageserver's swagger definition for this endpoint, and has the same desired
444 : // effect of causing the control plane to retry later.
445 0 : return json_response(StatusCode::CONFLICT, ());
446 0 : }
447 : }
448 0 : }
449 :
450 0 : deletion_wrapper(service, move |service| async move {
451 0 : service
452 0 : .tenant_timeline_delete(tenant_id, timeline_id)
453 0 : .await
454 0 : .and_then(map_reqwest_hyper_status)
455 0 : })
456 0 : .await
457 0 : }
458 :
459 0 : async fn handle_tenant_timeline_archival_config(
460 0 : service: Arc<Service>,
461 0 : req: Request<Body>,
462 0 : ) -> Result<Response<Body>, ApiError> {
463 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
464 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
465 :
466 0 : check_permissions(&req, Scope::PageServerApi)?;
467 :
468 0 : let mut req = match maybe_forward(req).await {
469 0 : ForwardOutcome::Forwarded(res) => {
470 0 : return res;
471 : }
472 0 : ForwardOutcome::NotForwarded(req) => req,
473 : };
474 :
475 0 : let create_req = json_request::<TimelineArchivalConfigRequest>(&mut req).await?;
476 :
477 0 : service
478 0 : .tenant_timeline_archival_config(tenant_id, timeline_id, create_req)
479 0 : .await?;
480 :
481 0 : json_response(StatusCode::OK, ())
482 0 : }
483 :
484 0 : async fn handle_tenant_timeline_detach_ancestor(
485 0 : service: Arc<Service>,
486 0 : req: Request<Body>,
487 0 : ) -> Result<Response<Body>, ApiError> {
488 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
489 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
490 :
491 0 : check_permissions(&req, Scope::PageServerApi)?;
492 :
493 0 : match maybe_forward(req).await {
494 0 : ForwardOutcome::Forwarded(res) => {
495 0 : return res;
496 : }
497 0 : ForwardOutcome::NotForwarded(_req) => {}
498 : };
499 :
500 0 : let res = service
501 0 : .tenant_timeline_detach_ancestor(tenant_id, timeline_id)
502 0 : .await?;
503 :
504 0 : json_response(StatusCode::OK, res)
505 0 : }
506 :
507 0 : async fn handle_tenant_timeline_block_unblock_gc(
508 0 : service: Arc<Service>,
509 0 : req: Request<Body>,
510 0 : dir: BlockUnblock,
511 0 : ) -> Result<Response<Body>, ApiError> {
512 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
513 0 : check_permissions(&req, Scope::PageServerApi)?;
514 :
515 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
516 :
517 0 : service
518 0 : .tenant_timeline_block_unblock_gc(tenant_id, timeline_id, dir)
519 0 : .await?;
520 :
521 0 : json_response(StatusCode::OK, ())
522 0 : }
523 :
524 0 : async fn handle_tenant_timeline_download_heatmap_layers(
525 0 : service: Arc<Service>,
526 0 : req: Request<Body>,
527 0 : ) -> Result<Response<Body>, ApiError> {
528 0 : let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
529 :
530 0 : check_permissions(&req, Scope::PageServerApi)?;
531 :
532 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
533 0 : let concurrency: Option<usize> = parse_query_param(&req, "concurrency")?;
534 :
535 0 : service
536 0 : .tenant_timeline_download_heatmap_layers(tenant_shard_id, timeline_id, concurrency)
537 0 : .await?;
538 :
539 0 : json_response(StatusCode::OK, ())
540 0 : }
541 :
542 : // For metric labels where we would like to include the approximate path, but exclude high-cardinality fields like query parameters
543 : // and tenant/timeline IDs. Since we are proxying to arbitrary paths, we don't have routing templates to
544 : // compare to, so we can just filter out our well known ID format with regexes.
545 3 : fn path_without_ids(path: &str) -> String {
546 : static ID_REGEX: std::sync::OnceLock<regex::Regex> = std::sync::OnceLock::new();
547 3 : ID_REGEX
548 3 : .get_or_init(|| regex::Regex::new(r"([0-9a-fA-F]{32}(-[0-9]{4})?|\?.*)").unwrap())
549 3 : .replace_all(path, "")
550 3 : .to_string()
551 3 : }
552 :
553 0 : async fn handle_tenant_timeline_passthrough(
554 0 : service: Arc<Service>,
555 0 : req: Request<Body>,
556 0 : ) -> Result<Response<Body>, ApiError> {
557 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
558 0 : check_permissions(&req, Scope::PageServerApi)?;
559 :
560 0 : let req = match maybe_forward(req).await {
561 0 : ForwardOutcome::Forwarded(res) => {
562 0 : return res;
563 : }
564 0 : ForwardOutcome::NotForwarded(req) => req,
565 : };
566 :
567 0 : let Some(path) = req.uri().path_and_query() else {
568 : // This should never happen, our request router only calls us if there is a path
569 0 : return Err(ApiError::BadRequest(anyhow::anyhow!("Missing path")));
570 : };
571 :
572 0 : tracing::info!("Proxying request for tenant {} ({})", tenant_id, path);
573 :
574 : // Find the node that holds shard zero
575 0 : let (node, tenant_shard_id) = service.tenant_shard0_node(tenant_id).await?;
576 :
577 : // Callers will always pass an unsharded tenant ID. Before proxying, we must
578 : // rewrite this to a shard-aware shard zero ID.
579 0 : let path = format!("{}", path);
580 0 : let tenant_str = tenant_id.to_string();
581 0 : let tenant_shard_str = format!("{}", tenant_shard_id);
582 0 : let path = path.replace(&tenant_str, &tenant_shard_str);
583 0 :
584 0 : let latency = &METRICS_REGISTRY
585 0 : .metrics_group
586 0 : .storage_controller_passthrough_request_latency;
587 0 :
588 0 : let path_label = path_without_ids(&path)
589 0 : .split('/')
590 0 : .filter(|token| !token.is_empty())
591 0 : .collect::<Vec<_>>()
592 0 : .join("_");
593 0 : let labels = PageserverRequestLabelGroup {
594 0 : pageserver_id: &node.get_id().to_string(),
595 0 : path: &path_label,
596 0 : method: crate::metrics::Method::Get,
597 0 : };
598 0 :
599 0 : let _timer = latency.start_timer(labels.clone());
600 0 :
601 0 : let client = mgmt_api::Client::new(node.base_url(), service.get_config().jwt_token.as_deref());
602 0 : let resp = client.get_raw(path).await.map_err(|e|
603 : // We return 503 here because if we can't successfully send a request to the pageserver,
604 : // either we aren't available or the pageserver is unavailable.
605 0 : ApiError::ResourceUnavailable(format!("Error sending pageserver API request to {node}: {e}").into()))?;
606 :
607 0 : if !resp.status().is_success() {
608 0 : let error_counter = &METRICS_REGISTRY
609 0 : .metrics_group
610 0 : .storage_controller_passthrough_request_error;
611 0 : error_counter.inc(labels);
612 0 : }
613 :
614 : // Transform 404 into 503 if we raced with a migration
615 0 : if resp.status() == reqwest::StatusCode::NOT_FOUND {
616 : // Look up node again: if we migrated it will be different
617 0 : let (new_node, _tenant_shard_id) = service.tenant_shard0_node(tenant_id).await?;
618 0 : if new_node.get_id() != node.get_id() {
619 : // Rather than retry here, send the client a 503 to prompt a retry: this matches
620 : // the pageserver's use of 503, and all clients calling this API should retry on 503.
621 0 : return Err(ApiError::ResourceUnavailable(
622 0 : format!("Pageserver {node} returned 404, was migrated to {new_node}").into(),
623 0 : ));
624 0 : }
625 0 : }
626 :
627 : // We have a reqest::Response, would like a http::Response
628 0 : let mut builder = hyper::Response::builder().status(map_reqwest_hyper_status(resp.status())?);
629 0 : for (k, v) in resp.headers() {
630 0 : builder = builder.header(k.as_str(), v.as_bytes());
631 0 : }
632 :
633 0 : let response = builder
634 0 : .body(Body::wrap_stream(resp.bytes_stream()))
635 0 : .map_err(|e| ApiError::InternalServerError(e.into()))?;
636 :
637 0 : Ok(response)
638 0 : }
639 :
640 0 : async fn handle_tenant_locate(
641 0 : service: Arc<Service>,
642 0 : req: Request<Body>,
643 0 : ) -> Result<Response<Body>, ApiError> {
644 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
645 :
646 0 : check_permissions(&req, Scope::Admin)?;
647 :
648 0 : match maybe_forward(req).await {
649 0 : ForwardOutcome::Forwarded(res) => {
650 0 : return res;
651 : }
652 0 : ForwardOutcome::NotForwarded(_req) => {}
653 0 : };
654 0 :
655 0 : json_response(StatusCode::OK, service.tenant_locate(tenant_id)?)
656 0 : }
657 :
658 0 : async fn handle_tenant_describe(
659 0 : service: Arc<Service>,
660 0 : req: Request<Body>,
661 0 : ) -> Result<Response<Body>, ApiError> {
662 0 : check_permissions(&req, Scope::Scrubber)?;
663 :
664 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
665 :
666 0 : match maybe_forward(req).await {
667 0 : ForwardOutcome::Forwarded(res) => {
668 0 : return res;
669 : }
670 0 : ForwardOutcome::NotForwarded(_req) => {}
671 0 : };
672 0 :
673 0 : json_response(StatusCode::OK, service.tenant_describe(tenant_id)?)
674 0 : }
675 :
676 0 : async fn handle_tenant_list(
677 0 : service: Arc<Service>,
678 0 : req: Request<Body>,
679 0 : ) -> Result<Response<Body>, ApiError> {
680 0 : check_permissions(&req, Scope::Admin)?;
681 :
682 0 : let limit: Option<usize> = parse_query_param(&req, "limit")?;
683 0 : let start_after: Option<TenantId> = parse_query_param(&req, "start_after")?;
684 0 : tracing::info!("start_after: {:?}", start_after);
685 :
686 0 : match maybe_forward(req).await {
687 0 : ForwardOutcome::Forwarded(res) => {
688 0 : return res;
689 : }
690 0 : ForwardOutcome::NotForwarded(_req) => {}
691 0 : };
692 0 :
693 0 : json_response(StatusCode::OK, service.tenant_list(limit, start_after))
694 0 : }
695 :
696 0 : async fn handle_node_register(req: Request<Body>) -> Result<Response<Body>, ApiError> {
697 0 : check_permissions(&req, Scope::Infra)?;
698 :
699 0 : let mut req = match maybe_forward(req).await {
700 0 : ForwardOutcome::Forwarded(res) => {
701 0 : return res;
702 : }
703 0 : ForwardOutcome::NotForwarded(req) => req,
704 : };
705 :
706 0 : let register_req = json_request::<NodeRegisterRequest>(&mut req).await?;
707 0 : let state = get_state(&req);
708 0 : state.service.node_register(register_req).await?;
709 0 : json_response(StatusCode::OK, ())
710 0 : }
711 :
712 0 : async fn handle_node_list(req: Request<Body>) -> Result<Response<Body>, ApiError> {
713 0 : check_permissions(&req, Scope::Infra)?;
714 :
715 0 : let req = match maybe_forward(req).await {
716 0 : ForwardOutcome::Forwarded(res) => {
717 0 : return res;
718 : }
719 0 : ForwardOutcome::NotForwarded(req) => req,
720 0 : };
721 0 :
722 0 : let state = get_state(&req);
723 0 : let mut nodes = state.service.node_list().await?;
724 0 : nodes.sort_by_key(|n| n.get_id());
725 0 : let api_nodes = nodes.into_iter().map(|n| n.describe()).collect::<Vec<_>>();
726 0 :
727 0 : json_response(StatusCode::OK, api_nodes)
728 0 : }
729 :
730 0 : async fn handle_node_drop(req: Request<Body>) -> Result<Response<Body>, ApiError> {
731 0 : check_permissions(&req, Scope::Admin)?;
732 :
733 0 : let req = match maybe_forward(req).await {
734 0 : ForwardOutcome::Forwarded(res) => {
735 0 : return res;
736 : }
737 0 : ForwardOutcome::NotForwarded(req) => req,
738 0 : };
739 0 :
740 0 : let state = get_state(&req);
741 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
742 0 : json_response(StatusCode::OK, state.service.node_drop(node_id).await?)
743 0 : }
744 :
745 0 : async fn handle_node_delete(req: Request<Body>) -> Result<Response<Body>, ApiError> {
746 0 : check_permissions(&req, Scope::Admin)?;
747 :
748 0 : let req = match maybe_forward(req).await {
749 0 : ForwardOutcome::Forwarded(res) => {
750 0 : return res;
751 : }
752 0 : ForwardOutcome::NotForwarded(req) => req,
753 0 : };
754 0 :
755 0 : let state = get_state(&req);
756 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
757 0 : json_response(StatusCode::OK, state.service.node_delete(node_id).await?)
758 0 : }
759 :
760 0 : async fn handle_node_configure(req: Request<Body>) -> Result<Response<Body>, ApiError> {
761 0 : check_permissions(&req, Scope::Admin)?;
762 :
763 0 : let mut req = match maybe_forward(req).await {
764 0 : ForwardOutcome::Forwarded(res) => {
765 0 : return res;
766 : }
767 0 : ForwardOutcome::NotForwarded(req) => req,
768 : };
769 :
770 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
771 0 : let config_req = json_request::<NodeConfigureRequest>(&mut req).await?;
772 0 : if node_id != config_req.node_id {
773 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
774 0 : "Path and body node_id differ"
775 0 : )));
776 0 : }
777 0 : let state = get_state(&req);
778 0 :
779 0 : json_response(
780 0 : StatusCode::OK,
781 0 : state
782 0 : .service
783 0 : .external_node_configure(
784 0 : config_req.node_id,
785 0 : config_req.availability.map(NodeAvailability::from),
786 0 : config_req.scheduling,
787 0 : )
788 0 : .await?,
789 : )
790 0 : }
791 :
792 0 : async fn handle_node_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
793 0 : check_permissions(&req, Scope::Infra)?;
794 :
795 0 : let req = match maybe_forward(req).await {
796 0 : ForwardOutcome::Forwarded(res) => {
797 0 : return res;
798 : }
799 0 : ForwardOutcome::NotForwarded(req) => req,
800 0 : };
801 0 :
802 0 : let state = get_state(&req);
803 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
804 :
805 0 : let node_status = state.service.get_node(node_id).await?;
806 :
807 0 : json_response(StatusCode::OK, node_status)
808 0 : }
809 :
810 0 : async fn handle_node_shards(req: Request<Body>) -> Result<Response<Body>, ApiError> {
811 0 : check_permissions(&req, Scope::Admin)?;
812 :
813 0 : let state = get_state(&req);
814 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
815 :
816 0 : let node_status = state.service.get_node_shards(node_id).await?;
817 :
818 0 : json_response(StatusCode::OK, node_status)
819 0 : }
820 :
821 0 : async fn handle_get_leader(req: Request<Body>) -> Result<Response<Body>, ApiError> {
822 0 : check_permissions(&req, Scope::Admin)?;
823 :
824 0 : let req = match maybe_forward(req).await {
825 0 : ForwardOutcome::Forwarded(res) => {
826 0 : return res;
827 : }
828 0 : ForwardOutcome::NotForwarded(req) => req,
829 0 : };
830 0 :
831 0 : let state = get_state(&req);
832 0 : let leader = state.service.get_leader().await.map_err(|err| {
833 0 : ApiError::InternalServerError(anyhow::anyhow!(
834 0 : "Failed to read leader from database: {err}"
835 0 : ))
836 0 : })?;
837 :
838 0 : json_response(StatusCode::OK, leader)
839 0 : }
840 :
841 0 : async fn handle_node_drain(req: Request<Body>) -> Result<Response<Body>, ApiError> {
842 0 : check_permissions(&req, Scope::Infra)?;
843 :
844 0 : let req = match maybe_forward(req).await {
845 0 : ForwardOutcome::Forwarded(res) => {
846 0 : return res;
847 : }
848 0 : ForwardOutcome::NotForwarded(req) => req,
849 0 : };
850 0 :
851 0 : let state = get_state(&req);
852 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
853 :
854 0 : state.service.start_node_drain(node_id).await?;
855 :
856 0 : json_response(StatusCode::ACCEPTED, ())
857 0 : }
858 :
859 0 : async fn handle_cancel_node_drain(req: Request<Body>) -> Result<Response<Body>, ApiError> {
860 0 : check_permissions(&req, Scope::Infra)?;
861 :
862 0 : let req = match maybe_forward(req).await {
863 0 : ForwardOutcome::Forwarded(res) => {
864 0 : return res;
865 : }
866 0 : ForwardOutcome::NotForwarded(req) => req,
867 0 : };
868 0 :
869 0 : let state = get_state(&req);
870 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
871 :
872 0 : state.service.cancel_node_drain(node_id).await?;
873 :
874 0 : json_response(StatusCode::ACCEPTED, ())
875 0 : }
876 :
877 0 : async fn handle_node_fill(req: Request<Body>) -> Result<Response<Body>, ApiError> {
878 0 : check_permissions(&req, Scope::Infra)?;
879 :
880 0 : let req = match maybe_forward(req).await {
881 0 : ForwardOutcome::Forwarded(res) => {
882 0 : return res;
883 : }
884 0 : ForwardOutcome::NotForwarded(req) => req,
885 0 : };
886 0 :
887 0 : let state = get_state(&req);
888 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
889 :
890 0 : state.service.start_node_fill(node_id).await?;
891 :
892 0 : json_response(StatusCode::ACCEPTED, ())
893 0 : }
894 :
895 0 : async fn handle_cancel_node_fill(req: Request<Body>) -> Result<Response<Body>, ApiError> {
896 0 : check_permissions(&req, Scope::Infra)?;
897 :
898 0 : let req = match maybe_forward(req).await {
899 0 : ForwardOutcome::Forwarded(res) => {
900 0 : return res;
901 : }
902 0 : ForwardOutcome::NotForwarded(req) => req,
903 0 : };
904 0 :
905 0 : let state = get_state(&req);
906 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
907 :
908 0 : state.service.cancel_node_fill(node_id).await?;
909 :
910 0 : json_response(StatusCode::ACCEPTED, ())
911 0 : }
912 :
913 0 : async fn handle_safekeeper_list(req: Request<Body>) -> Result<Response<Body>, ApiError> {
914 0 : check_permissions(&req, Scope::Infra)?;
915 :
916 0 : let req = match maybe_forward(req).await {
917 0 : ForwardOutcome::Forwarded(res) => {
918 0 : return res;
919 : }
920 0 : ForwardOutcome::NotForwarded(req) => req,
921 0 : };
922 0 :
923 0 : let state = get_state(&req);
924 0 : let safekeepers = state.service.safekeepers_list().await?;
925 0 : json_response(StatusCode::OK, safekeepers)
926 0 : }
927 :
928 0 : async fn handle_metadata_health_update(req: Request<Body>) -> Result<Response<Body>, ApiError> {
929 0 : check_permissions(&req, Scope::Scrubber)?;
930 :
931 0 : let mut req = match maybe_forward(req).await {
932 0 : ForwardOutcome::Forwarded(res) => {
933 0 : return res;
934 : }
935 0 : ForwardOutcome::NotForwarded(req) => req,
936 : };
937 :
938 0 : let update_req = json_request::<MetadataHealthUpdateRequest>(&mut req).await?;
939 0 : let state = get_state(&req);
940 0 :
941 0 : state.service.metadata_health_update(update_req).await?;
942 :
943 0 : json_response(StatusCode::OK, MetadataHealthUpdateResponse {})
944 0 : }
945 :
946 0 : async fn handle_metadata_health_list_unhealthy(
947 0 : req: Request<Body>,
948 0 : ) -> Result<Response<Body>, ApiError> {
949 0 : check_permissions(&req, Scope::Admin)?;
950 :
951 0 : let req = match maybe_forward(req).await {
952 0 : ForwardOutcome::Forwarded(res) => {
953 0 : return res;
954 : }
955 0 : ForwardOutcome::NotForwarded(req) => req,
956 0 : };
957 0 :
958 0 : let state = get_state(&req);
959 0 : let unhealthy_tenant_shards = state.service.metadata_health_list_unhealthy().await?;
960 :
961 0 : json_response(
962 0 : StatusCode::OK,
963 0 : MetadataHealthListUnhealthyResponse {
964 0 : unhealthy_tenant_shards,
965 0 : },
966 0 : )
967 0 : }
968 :
969 0 : async fn handle_metadata_health_list_outdated(
970 0 : req: Request<Body>,
971 0 : ) -> Result<Response<Body>, ApiError> {
972 0 : check_permissions(&req, Scope::Admin)?;
973 :
974 0 : let mut req = match maybe_forward(req).await {
975 0 : ForwardOutcome::Forwarded(res) => {
976 0 : return res;
977 : }
978 0 : ForwardOutcome::NotForwarded(req) => req,
979 : };
980 :
981 0 : let list_outdated_req = json_request::<MetadataHealthListOutdatedRequest>(&mut req).await?;
982 0 : let state = get_state(&req);
983 0 : let health_records = state
984 0 : .service
985 0 : .metadata_health_list_outdated(list_outdated_req.not_scrubbed_for)
986 0 : .await?;
987 :
988 0 : json_response(
989 0 : StatusCode::OK,
990 0 : MetadataHealthListOutdatedResponse { health_records },
991 0 : )
992 0 : }
993 :
994 0 : async fn handle_tenant_shard_split(
995 0 : service: Arc<Service>,
996 0 : req: Request<Body>,
997 0 : ) -> Result<Response<Body>, ApiError> {
998 0 : check_permissions(&req, Scope::Admin)?;
999 :
1000 0 : let mut req = match maybe_forward(req).await {
1001 0 : ForwardOutcome::Forwarded(res) => {
1002 0 : return res;
1003 : }
1004 0 : ForwardOutcome::NotForwarded(req) => req,
1005 : };
1006 :
1007 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
1008 0 : let split_req = json_request::<TenantShardSplitRequest>(&mut req).await?;
1009 :
1010 : json_response(
1011 : StatusCode::OK,
1012 0 : service.tenant_shard_split(tenant_id, split_req).await?,
1013 : )
1014 0 : }
1015 :
1016 0 : async fn handle_tenant_shard_migrate(
1017 0 : service: Arc<Service>,
1018 0 : req: Request<Body>,
1019 0 : ) -> Result<Response<Body>, ApiError> {
1020 0 : check_permissions(&req, Scope::Admin)?;
1021 :
1022 0 : let mut req = match maybe_forward(req).await {
1023 0 : ForwardOutcome::Forwarded(res) => {
1024 0 : return res;
1025 : }
1026 0 : ForwardOutcome::NotForwarded(req) => req,
1027 : };
1028 :
1029 0 : let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
1030 0 : let migrate_req = json_request::<TenantShardMigrateRequest>(&mut req).await?;
1031 : json_response(
1032 : StatusCode::OK,
1033 0 : service
1034 0 : .tenant_shard_migrate(tenant_shard_id, migrate_req)
1035 0 : .await?,
1036 : )
1037 0 : }
1038 :
1039 0 : async fn handle_tenant_shard_migrate_secondary(
1040 0 : service: Arc<Service>,
1041 0 : req: Request<Body>,
1042 0 : ) -> Result<Response<Body>, ApiError> {
1043 0 : check_permissions(&req, Scope::Admin)?;
1044 :
1045 0 : let mut req = match maybe_forward(req).await {
1046 0 : ForwardOutcome::Forwarded(res) => {
1047 0 : return res;
1048 : }
1049 0 : ForwardOutcome::NotForwarded(req) => req,
1050 : };
1051 :
1052 0 : let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
1053 0 : let migrate_req = json_request::<TenantShardMigrateRequest>(&mut req).await?;
1054 : json_response(
1055 : StatusCode::OK,
1056 0 : service
1057 0 : .tenant_shard_migrate_secondary(tenant_shard_id, migrate_req)
1058 0 : .await?,
1059 : )
1060 0 : }
1061 :
1062 0 : async fn handle_tenant_shard_cancel_reconcile(
1063 0 : service: Arc<Service>,
1064 0 : req: Request<Body>,
1065 0 : ) -> Result<Response<Body>, ApiError> {
1066 0 : check_permissions(&req, Scope::Admin)?;
1067 :
1068 0 : let req = match maybe_forward(req).await {
1069 0 : ForwardOutcome::Forwarded(res) => {
1070 0 : return res;
1071 : }
1072 0 : ForwardOutcome::NotForwarded(req) => req,
1073 : };
1074 :
1075 0 : let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
1076 : json_response(
1077 : StatusCode::OK,
1078 0 : service
1079 0 : .tenant_shard_cancel_reconcile(tenant_shard_id)
1080 0 : .await?,
1081 : )
1082 0 : }
1083 :
1084 0 : async fn handle_tenant_update_policy(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1085 0 : check_permissions(&req, Scope::Admin)?;
1086 :
1087 0 : let mut req = match maybe_forward(req).await {
1088 0 : ForwardOutcome::Forwarded(res) => {
1089 0 : return res;
1090 : }
1091 0 : ForwardOutcome::NotForwarded(req) => req,
1092 : };
1093 :
1094 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
1095 0 : let update_req = json_request::<TenantPolicyRequest>(&mut req).await?;
1096 0 : let state = get_state(&req);
1097 0 :
1098 0 : json_response(
1099 0 : StatusCode::OK,
1100 0 : state
1101 0 : .service
1102 0 : .tenant_update_policy(tenant_id, update_req)
1103 0 : .await?,
1104 : )
1105 0 : }
1106 :
1107 0 : async fn handle_update_preferred_azs(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1108 0 : check_permissions(&req, Scope::Admin)?;
1109 :
1110 0 : let mut req = match maybe_forward(req).await {
1111 0 : ForwardOutcome::Forwarded(res) => {
1112 0 : return res;
1113 : }
1114 0 : ForwardOutcome::NotForwarded(req) => req,
1115 : };
1116 :
1117 0 : let azs_req = json_request::<ShardsPreferredAzsRequest>(&mut req).await?;
1118 0 : let state = get_state(&req);
1119 0 :
1120 0 : json_response(
1121 0 : StatusCode::OK,
1122 0 : state.service.update_shards_preferred_azs(azs_req).await?,
1123 : )
1124 0 : }
1125 :
1126 0 : async fn handle_step_down(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1127 0 : check_permissions(&req, Scope::ControllerPeer)?;
1128 :
1129 0 : let req = match maybe_forward(req).await {
1130 0 : ForwardOutcome::Forwarded(res) => {
1131 0 : return res;
1132 : }
1133 0 : ForwardOutcome::NotForwarded(req) => req,
1134 0 : };
1135 0 :
1136 0 : let state = get_state(&req);
1137 0 : json_response(StatusCode::OK, state.service.step_down().await)
1138 0 : }
1139 :
1140 0 : async fn handle_tenant_drop(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1141 0 : check_permissions(&req, Scope::PageServerApi)?;
1142 :
1143 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
1144 :
1145 0 : let req = match maybe_forward(req).await {
1146 0 : ForwardOutcome::Forwarded(res) => {
1147 0 : return res;
1148 : }
1149 0 : ForwardOutcome::NotForwarded(req) => req,
1150 0 : };
1151 0 :
1152 0 : let state = get_state(&req);
1153 0 :
1154 0 : json_response(StatusCode::OK, state.service.tenant_drop(tenant_id).await?)
1155 0 : }
1156 :
1157 0 : async fn handle_tenant_import(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1158 0 : check_permissions(&req, Scope::PageServerApi)?;
1159 :
1160 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
1161 :
1162 0 : let req = match maybe_forward(req).await {
1163 0 : ForwardOutcome::Forwarded(res) => {
1164 0 : return res;
1165 : }
1166 0 : ForwardOutcome::NotForwarded(req) => req,
1167 0 : };
1168 0 :
1169 0 : let state = get_state(&req);
1170 0 :
1171 0 : json_response(
1172 0 : StatusCode::OK,
1173 0 : state.service.tenant_import(tenant_id).await?,
1174 : )
1175 0 : }
1176 :
1177 0 : async fn handle_tenants_dump(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1178 0 : check_permissions(&req, Scope::Admin)?;
1179 :
1180 0 : let req = match maybe_forward(req).await {
1181 0 : ForwardOutcome::Forwarded(res) => {
1182 0 : return res;
1183 : }
1184 0 : ForwardOutcome::NotForwarded(req) => req,
1185 0 : };
1186 0 :
1187 0 : let state = get_state(&req);
1188 0 : state.service.tenants_dump()
1189 0 : }
1190 :
1191 0 : async fn handle_scheduler_dump(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1192 0 : check_permissions(&req, Scope::Admin)?;
1193 :
1194 0 : let req = match maybe_forward(req).await {
1195 0 : ForwardOutcome::Forwarded(res) => {
1196 0 : return res;
1197 : }
1198 0 : ForwardOutcome::NotForwarded(req) => req,
1199 0 : };
1200 0 :
1201 0 : let state = get_state(&req);
1202 0 : state.service.scheduler_dump()
1203 0 : }
1204 :
1205 0 : async fn handle_consistency_check(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1206 0 : check_permissions(&req, Scope::Admin)?;
1207 :
1208 0 : let req = match maybe_forward(req).await {
1209 0 : ForwardOutcome::Forwarded(res) => {
1210 0 : return res;
1211 : }
1212 0 : ForwardOutcome::NotForwarded(req) => req,
1213 0 : };
1214 0 :
1215 0 : let state = get_state(&req);
1216 0 :
1217 0 : json_response(StatusCode::OK, state.service.consistency_check().await?)
1218 0 : }
1219 :
1220 0 : async fn handle_reconcile_all(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1221 0 : check_permissions(&req, Scope::Admin)?;
1222 :
1223 0 : let req = match maybe_forward(req).await {
1224 0 : ForwardOutcome::Forwarded(res) => {
1225 0 : return res;
1226 : }
1227 0 : ForwardOutcome::NotForwarded(req) => req,
1228 0 : };
1229 0 :
1230 0 : let state = get_state(&req);
1231 0 :
1232 0 : json_response(StatusCode::OK, state.service.reconcile_all_now().await?)
1233 0 : }
1234 :
1235 : /// Status endpoint is just used for checking that our HTTP listener is up
1236 0 : async fn handle_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1237 0 : match maybe_forward(req).await {
1238 0 : ForwardOutcome::Forwarded(res) => {
1239 0 : return res;
1240 : }
1241 0 : ForwardOutcome::NotForwarded(_req) => {}
1242 0 : };
1243 0 :
1244 0 : json_response(StatusCode::OK, ())
1245 0 : }
1246 :
1247 : /// Readiness endpoint indicates when we're done doing startup I/O (e.g. reconciling
1248 : /// with remote pageserver nodes). This is intended for use as a kubernetes readiness probe.
1249 0 : async fn handle_ready(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1250 0 : let req = match maybe_forward(req).await {
1251 0 : ForwardOutcome::Forwarded(res) => {
1252 0 : return res;
1253 : }
1254 0 : ForwardOutcome::NotForwarded(req) => req,
1255 0 : };
1256 0 :
1257 0 : let state = get_state(&req);
1258 0 : if state.service.startup_complete.is_ready() {
1259 0 : json_response(StatusCode::OK, ())
1260 : } else {
1261 0 : json_response(StatusCode::SERVICE_UNAVAILABLE, ())
1262 : }
1263 0 : }
1264 :
1265 : impl From<ReconcileError> for ApiError {
1266 0 : fn from(value: ReconcileError) -> Self {
1267 0 : ApiError::Conflict(format!("Reconciliation error: {}", value))
1268 0 : }
1269 : }
1270 :
1271 : /// Return the safekeeper record by instance id, or 404.
1272 : ///
1273 : /// Not used by anything except manual testing.
1274 0 : async fn handle_get_safekeeper(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1275 0 : check_permissions(&req, Scope::Infra)?;
1276 :
1277 0 : let id = parse_request_param::<i64>(&req, "id")?;
1278 :
1279 0 : let req = match maybe_forward(req).await {
1280 0 : ForwardOutcome::Forwarded(res) => {
1281 0 : return res;
1282 : }
1283 0 : ForwardOutcome::NotForwarded(req) => req,
1284 0 : };
1285 0 :
1286 0 : let state = get_state(&req);
1287 :
1288 0 : let res = state.service.get_safekeeper(id).await;
1289 :
1290 0 : match res {
1291 0 : Ok(b) => json_response(StatusCode::OK, b),
1292 : Err(crate::persistence::DatabaseError::Query(diesel::result::Error::NotFound)) => {
1293 0 : Err(ApiError::NotFound("unknown instance id".into()))
1294 : }
1295 0 : Err(other) => Err(other.into()),
1296 : }
1297 0 : }
1298 :
1299 : /// Used as part of deployment scripts.
1300 : ///
1301 : /// Assumes information is only relayed to storage controller after first selecting an unique id on
1302 : /// control plane database, which means we have an id field in the request and payload.
1303 0 : async fn handle_upsert_safekeeper(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
1304 0 : check_permissions(&req, Scope::Infra)?;
1305 :
1306 0 : let body = json_request::<SafekeeperUpsert>(&mut req).await?;
1307 0 : let id = parse_request_param::<i64>(&req, "id")?;
1308 :
1309 0 : if id != body.id {
1310 : // it should be repeated
1311 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
1312 0 : "id mismatch: url={id:?}, body={:?}",
1313 0 : body.id
1314 0 : )));
1315 0 : }
1316 :
1317 0 : let req = match maybe_forward(req).await {
1318 0 : ForwardOutcome::Forwarded(res) => {
1319 0 : return res;
1320 : }
1321 0 : ForwardOutcome::NotForwarded(req) => req,
1322 0 : };
1323 0 :
1324 0 : let state = get_state(&req);
1325 0 :
1326 0 : state.service.upsert_safekeeper(body).await?;
1327 :
1328 0 : Ok(Response::builder()
1329 0 : .status(StatusCode::NO_CONTENT)
1330 0 : .body(Body::empty())
1331 0 : .unwrap())
1332 0 : }
1333 :
1334 : /// Sets the scheduling policy of the specified safekeeper
1335 0 : async fn handle_safekeeper_scheduling_policy(
1336 0 : mut req: Request<Body>,
1337 0 : ) -> Result<Response<Body>, ApiError> {
1338 0 : check_permissions(&req, Scope::Admin)?;
1339 :
1340 0 : let body = json_request::<SafekeeperSchedulingPolicyRequest>(&mut req).await?;
1341 0 : let id = parse_request_param::<i64>(&req, "id")?;
1342 :
1343 0 : let req = match maybe_forward(req).await {
1344 0 : ForwardOutcome::Forwarded(res) => {
1345 0 : return res;
1346 : }
1347 0 : ForwardOutcome::NotForwarded(req) => req,
1348 0 : };
1349 0 :
1350 0 : let state = get_state(&req);
1351 0 :
1352 0 : state
1353 0 : .service
1354 0 : .set_safekeeper_scheduling_policy(id, body.scheduling_policy)
1355 0 : .await?;
1356 :
1357 0 : Ok(Response::builder()
1358 0 : .status(StatusCode::NO_CONTENT)
1359 0 : .body(Body::empty())
1360 0 : .unwrap())
1361 0 : }
1362 :
1363 : /// Common wrapper for request handlers that call into Service and will operate on tenants: they must only
1364 : /// be allowed to run if Service has finished its initial reconciliation.
1365 0 : async fn tenant_service_handler<R, H>(
1366 0 : request: Request<Body>,
1367 0 : handler: H,
1368 0 : request_name: RequestName,
1369 0 : ) -> R::Output
1370 0 : where
1371 0 : R: std::future::Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
1372 0 : H: FnOnce(Arc<Service>, Request<Body>) -> R + Send + Sync + 'static,
1373 0 : {
1374 0 : let state = get_state(&request);
1375 0 : let service = state.service.clone();
1376 0 :
1377 0 : let startup_complete = service.startup_complete.clone();
1378 0 : if tokio::time::timeout(STARTUP_RECONCILE_TIMEOUT, startup_complete.wait())
1379 0 : .await
1380 0 : .is_err()
1381 : {
1382 : // This shouldn't happen: it is the responsibilty of [`Service::startup_reconcile`] to use appropriate
1383 : // timeouts around its remote calls, to bound its runtime.
1384 0 : return Err(ApiError::Timeout(
1385 0 : "Timed out waiting for service readiness".into(),
1386 0 : ));
1387 0 : }
1388 0 :
1389 0 : named_request_span(
1390 0 : request,
1391 0 : |request| async move { handler(service, request).await },
1392 0 : request_name,
1393 0 : )
1394 0 : .await
1395 0 : }
1396 :
1397 : /// Check if the required scope is held in the request's token, or if the request has
1398 : /// a token with 'admin' scope then always permit it.
1399 0 : fn check_permissions(request: &Request<Body>, required_scope: Scope) -> Result<(), ApiError> {
1400 0 : check_permission_with(request, |claims| {
1401 0 : match crate::auth::check_permission(claims, required_scope) {
1402 0 : Err(e) => match crate::auth::check_permission(claims, Scope::Admin) {
1403 0 : Ok(()) => Ok(()),
1404 0 : Err(_) => Err(e),
1405 : },
1406 0 : Ok(()) => Ok(()),
1407 : }
1408 0 : })
1409 0 : }
1410 :
/// Per-request data stored by [`prologue_metrics_middleware`] and read back by
/// [`epilogue_metrics_middleware`] to label and time the request.
#[derive(Clone, Debug)]
struct RequestMeta {
    /// HTTP method of the request; used as the `method` metrics label.
    method: hyper::http::Method,
    /// When the prologue middleware saw the request; latency is measured from this instant.
    at: Instant,
}
1416 :
1417 0 : pub fn prologue_leadership_status_check_middleware<
1418 0 : B: hyper::body::HttpBody + Send + Sync + 'static,
1419 0 : >() -> Middleware<B, ApiError> {
1420 0 : Middleware::pre(move |req| async move {
1421 0 : let state = get_state(&req);
1422 0 : let leadership_status = state.service.get_leadership_status();
1423 :
1424 : enum AllowedRoutes {
1425 : All,
1426 : Some(&'static [&'static str]),
1427 : }
1428 :
1429 0 : let allowed_routes = match leadership_status {
1430 0 : LeadershipStatus::Leader => AllowedRoutes::All,
1431 0 : LeadershipStatus::SteppedDown => AllowedRoutes::All,
1432 0 : LeadershipStatus::Candidate => AllowedRoutes::Some(&[
1433 0 : "/ready",
1434 0 : "/status",
1435 0 : "/metrics",
1436 0 : "/profile/cpu",
1437 0 : "/profile/heap",
1438 0 : ]),
1439 : };
1440 :
1441 0 : match allowed_routes {
1442 0 : AllowedRoutes::All => Ok(req),
1443 0 : AllowedRoutes::Some(allowed) if allowed.contains(&req.uri().path()) => Ok(req),
1444 : _ => {
1445 0 : tracing::info!(
1446 0 : "Request {} not allowed due to current leadership state",
1447 0 : req.uri()
1448 : );
1449 :
1450 0 : Err(ApiError::ResourceUnavailable(
1451 0 : format!("Current leadership status is {leadership_status}").into(),
1452 0 : ))
1453 : }
1454 : }
1455 0 : })
1456 0 : }
1457 :
1458 0 : fn prologue_metrics_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>(
1459 0 : ) -> Middleware<B, ApiError> {
1460 0 : Middleware::pre(move |req| async move {
1461 0 : let meta = RequestMeta {
1462 0 : method: req.method().clone(),
1463 0 : at: Instant::now(),
1464 0 : };
1465 0 :
1466 0 : req.set_context(meta);
1467 0 :
1468 0 : Ok(req)
1469 0 : })
1470 0 : }
1471 :
1472 0 : fn epilogue_metrics_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>(
1473 0 : ) -> Middleware<B, ApiError> {
1474 0 : Middleware::post_with_info(move |resp, req_info| async move {
1475 0 : let request_name = match req_info.context::<RequestName>() {
1476 0 : Some(name) => name,
1477 : None => {
1478 0 : return Ok(resp);
1479 : }
1480 : };
1481 :
1482 0 : if let Some(meta) = req_info.context::<RequestMeta>() {
1483 0 : let status = &crate::metrics::METRICS_REGISTRY
1484 0 : .metrics_group
1485 0 : .storage_controller_http_request_status;
1486 0 : let latency = &crate::metrics::METRICS_REGISTRY
1487 0 : .metrics_group
1488 0 : .storage_controller_http_request_latency;
1489 0 :
1490 0 : status.inc(HttpRequestStatusLabelGroup {
1491 0 : path: request_name.0,
1492 0 : method: meta.method.clone().into(),
1493 0 : status: crate::metrics::StatusCode(resp.status()),
1494 0 : });
1495 0 :
1496 0 : latency.observe(
1497 0 : HttpRequestLatencyLabelGroup {
1498 0 : path: request_name.0,
1499 0 : method: meta.method.into(),
1500 0 : },
1501 0 : meta.at.elapsed().as_secs_f64(),
1502 0 : );
1503 0 : }
1504 0 : Ok(resp)
1505 0 : })
1506 0 : }
1507 :
1508 0 : pub async fn measured_metrics_handler(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1509 : pub const TEXT_FORMAT: &str = "text/plain; version=0.0.4";
1510 :
1511 0 : let req = match maybe_forward(req).await {
1512 0 : ForwardOutcome::Forwarded(res) => {
1513 0 : return res;
1514 : }
1515 0 : ForwardOutcome::NotForwarded(req) => req,
1516 0 : };
1517 0 :
1518 0 : let state = get_state(&req);
1519 0 : let payload = crate::metrics::METRICS_REGISTRY.encode(&state.neon_metrics);
1520 0 : let response = Response::builder()
1521 0 : .status(200)
1522 0 : .header(CONTENT_TYPE, TEXT_FORMAT)
1523 0 : .body(payload.into())
1524 0 : .unwrap();
1525 0 :
1526 0 : Ok(response)
1527 0 : }
1528 :
/// Static name of a route. [`named_request_span`] attaches it to the request, and
/// [`epilogue_metrics_middleware`] uses it as the `path` label of the HTTP metrics.
#[derive(Clone)]
struct RequestName(&'static str);
1531 :
1532 0 : async fn named_request_span<R, H>(
1533 0 : request: Request<Body>,
1534 0 : handler: H,
1535 0 : name: RequestName,
1536 0 : ) -> R::Output
1537 0 : where
1538 0 : R: Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
1539 0 : H: FnOnce(Request<Body>) -> R + Send + Sync + 'static,
1540 0 : {
1541 0 : request.set_context(name);
1542 0 : request_span(request, handler).await
1543 0 : }
1544 :
/// Result of [`maybe_forward`].
enum ForwardOutcome {
    /// The request was taken over by the forwarding path. This holds the leader's response, or
    /// the error raised while trying to forward; handlers return it to the client as-is.
    Forwarded(Result<Response<Body>, ApiError>),
    /// The request was not forwarded and is handed back for local handling.
    NotForwarded(Request<Body>),
}
1549 :
/// Potentially forward the request to the current storage controller leader.
/// More specifically we forward when:
/// 1. Request is not one of:
///    ["/control/v1/step_down", "/status", "/ready", "/metrics", "/profile/cpu", "/profile/heap"]
/// 2. Current instance is in [`LeadershipStatus::SteppedDown`] state
/// 3. There is a leader in the database to forward to
/// 4. Leader from step (3) is not the current instance
///
/// Why forward?
/// It turns out that we can't rely on external orchestration to promptly route traffic to the
/// new leader. This is downtime inducing. Forwarding provides a safe way out.
///
/// Why is it safe?
/// If a storcon instance is persisted in the database, then we know that it is the current leader.
/// There's one exception: the time between handling a step-down request and the new leader
/// updating the database.
///
/// Let's treat the happy case first. The stepped down node does not produce any side effects,
/// since all request handling happens on the leader.
///
/// As for the edge case, we are guaranteed to always have a maximum of two running instances.
/// Hence, if we are in the edge case scenario the leader persisted in the database is the
/// stepped down instance that received the request. Condition (4) above covers this scenario.
///
/// Returns [`ForwardOutcome::NotForwarded`] with the untouched request when (1) or (2) does not
/// hold. Otherwise it returns [`ForwardOutcome::Forwarded`] with one of:
/// - the leader's response;
/// - `ResourceUnavailable` if no leader is recorded;
/// - `InternalServerError` if any of these fails: the leader lookup, parsing the leader's URI,
///   the self-forwarding check, building the client, converting the request, or executing the
///   forwarded call.
async fn maybe_forward(req: Request<Body>) -> ForwardOutcome {
    const NOT_FOR_FORWARD: &[&str] = &[
        "/control/v1/step_down",
        "/status",
        "/ready",
        "/metrics",
        "/profile/cpu",
        "/profile/heap",
    ];

    let uri = req.uri();
    let uri_for_forward = !NOT_FOR_FORWARD.contains(&uri.path());

    // Fast return before trying to take any Service locks, if we will never forward anyway
    if !uri_for_forward {
        return ForwardOutcome::NotForwarded(req);
    }

    let state = get_state(&req);
    let leadership_status = state.service.get_leadership_status();

    // Condition (2): only a stepped-down instance forwards; any other status is handled locally.
    if leadership_status != LeadershipStatus::SteppedDown {
        return ForwardOutcome::NotForwarded(req);
    }

    // Condition (3): look up the leader recorded in the database.
    let leader = state.service.get_leader().await;
    let leader = {
        match leader {
            Ok(Some(leader)) => leader,
            Ok(None) => {
                return ForwardOutcome::Forwarded(Err(ApiError::ResourceUnavailable(
                    "No leader to forward to while in stepped down state".into(),
                )));
            }
            Err(err) => {
                return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(
                    anyhow::anyhow!(
                        "Failed to get leader for forwarding while in stepped down state: {err}"
                    ),
                )));
            }
        }
    };

    // Condition (4): never forward to ourselves. This can only be detected when our own peer
    // address is configured; without it the check is skipped.
    let cfg = state.service.get_config();
    if let Some(ref self_addr) = cfg.address_for_peers {
        let leader_addr = match Uri::from_str(leader.address.as_str()) {
            Ok(uri) => uri,
            Err(err) => {
                return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(
                    anyhow::anyhow!(
                        "Failed to parse leader uri for forwarding while in stepped down state: {err}"
                    ),
                )));
            }
        };

        if *self_addr == leader_addr {
            return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(anyhow::anyhow!(
                "Leader is stepped down instance"
            ))));
        }
    }

    tracing::info!("Forwarding {} to leader at {}", uri, leader.address);

    // Use [`RECONCILE_TIMEOUT`] as the max amount of time a request should block for, plus some
    // leeway, to get the timeout for proxied requests.
    const PROXIED_REQUEST_TIMEOUT: Duration = Duration::from_secs(RECONCILE_TIMEOUT.as_secs() + 10);
    // NOTE(review): a fresh client is built for every forwarded request.
    let client = reqwest::ClientBuilder::new()
        .timeout(PROXIED_REQUEST_TIMEOUT)
        .build();
    let client = match client {
        Ok(client) => client,
        Err(err) => {
            return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(anyhow::anyhow!(
                "Failed to build leader client for forwarding while in stepped down state: {err}"
            ))));
        }
    };

    let request: reqwest::Request = match convert_request(req, &client, leader.address).await {
        Ok(r) => r,
        Err(err) => {
            return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(anyhow::anyhow!(
                "Failed to convert request for forwarding while in stepped down state: {err}"
            ))));
        }
    };

    let response = match client.execute(request).await {
        Ok(r) => r,
        Err(err) => {
            return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(anyhow::anyhow!(
                "Failed to forward while in stepped down state: {err}"
            ))));
        }
    };

    ForwardOutcome::Forwarded(convert_response(response).await)
}
1674 :
1675 : /// Convert a [`reqwest::Response`] to a [hyper::Response`] by passing through
1676 : /// a stable representation (string, bytes or integer)
1677 : ///
1678 : /// Ideally, we would not have to do this since both types use the http crate
1679 : /// under the hood. However, they use different versions of the crate and keeping
1680 : /// second order dependencies in sync is difficult.
1681 0 : async fn convert_response(resp: reqwest::Response) -> Result<hyper::Response<Body>, ApiError> {
1682 : use std::str::FromStr;
1683 :
1684 0 : let mut builder = hyper::Response::builder().status(resp.status().as_u16());
1685 0 : for (key, value) in resp.headers().into_iter() {
1686 0 : let key = hyper::header::HeaderName::from_str(key.as_str()).map_err(|err| {
1687 0 : ApiError::InternalServerError(anyhow::anyhow!("Response conversion failed: {err}"))
1688 0 : })?;
1689 :
1690 0 : let value = hyper::header::HeaderValue::from_bytes(value.as_bytes()).map_err(|err| {
1691 0 : ApiError::InternalServerError(anyhow::anyhow!("Response conversion failed: {err}"))
1692 0 : })?;
1693 :
1694 0 : builder = builder.header(key, value);
1695 : }
1696 :
1697 0 : let body = http::Body::wrap_stream(resp.bytes_stream());
1698 0 :
1699 0 : builder.body(body).map_err(|err| {
1700 0 : ApiError::InternalServerError(anyhow::anyhow!("Response conversion failed: {err}"))
1701 0 : })
1702 0 : }
1703 :
1704 : /// Convert a [`reqwest::Request`] to a [hyper::Request`] by passing through
1705 : /// a stable representation (string, bytes or integer)
1706 : ///
1707 : /// See [`convert_response`] for why we are doing it this way.
1708 0 : async fn convert_request(
1709 0 : req: hyper::Request<Body>,
1710 0 : client: &reqwest::Client,
1711 0 : to_address: String,
1712 0 : ) -> Result<reqwest::Request, ApiError> {
1713 : use std::str::FromStr;
1714 :
1715 0 : let (parts, body) = req.into_parts();
1716 0 : let method = reqwest::Method::from_str(parts.method.as_str()).map_err(|err| {
1717 0 : ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
1718 0 : })?;
1719 :
1720 0 : let path_and_query = parts.uri.path_and_query().ok_or_else(|| {
1721 0 : ApiError::InternalServerError(anyhow::anyhow!(
1722 0 : "Request conversion failed: no path and query"
1723 0 : ))
1724 0 : })?;
1725 :
1726 0 : let uri = reqwest::Url::from_str(
1727 0 : format!(
1728 0 : "{}{}",
1729 0 : to_address.trim_end_matches("/"),
1730 0 : path_and_query.as_str()
1731 0 : )
1732 0 : .as_str(),
1733 0 : )
1734 0 : .map_err(|err| {
1735 0 : ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
1736 0 : })?;
1737 :
1738 0 : let mut headers = reqwest::header::HeaderMap::new();
1739 0 : for (key, value) in parts.headers.into_iter() {
1740 0 : let key = match key {
1741 0 : Some(k) => k,
1742 : None => {
1743 0 : continue;
1744 : }
1745 : };
1746 :
1747 0 : let key = reqwest::header::HeaderName::from_str(key.as_str()).map_err(|err| {
1748 0 : ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
1749 0 : })?;
1750 :
1751 0 : let value = reqwest::header::HeaderValue::from_bytes(value.as_bytes()).map_err(|err| {
1752 0 : ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
1753 0 : })?;
1754 :
1755 0 : headers.insert(key, value);
1756 : }
1757 :
1758 0 : let body = hyper::body::to_bytes(body).await.map_err(|err| {
1759 0 : ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
1760 0 : })?;
1761 :
1762 0 : client
1763 0 : .request(method, uri)
1764 0 : .headers(headers)
1765 0 : .body(body)
1766 0 : .build()
1767 0 : .map_err(|err| {
1768 0 : ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
1769 0 : })
1770 0 : }
1771 :
1772 0 : pub fn make_router(
1773 0 : service: Arc<Service>,
1774 0 : auth: Option<Arc<SwappableJwtAuth>>,
1775 0 : build_info: BuildInfo,
1776 0 : ) -> RouterBuilder<hyper::Body, ApiError> {
1777 0 : let mut router = endpoint::make_router()
1778 0 : .middleware(prologue_leadership_status_check_middleware())
1779 0 : .middleware(prologue_metrics_middleware())
1780 0 : .middleware(epilogue_metrics_middleware());
1781 0 : if auth.is_some() {
1782 0 : router = router.middleware(auth_middleware(|request| {
1783 0 : let state = get_state(request);
1784 0 : if state.allowlist_routes.contains(&request.uri().path()) {
1785 0 : None
1786 : } else {
1787 0 : state.auth.as_deref()
1788 : }
1789 0 : }));
1790 0 : }
1791 :
1792 0 : router
1793 0 : .data(Arc::new(HttpState::new(service, auth, build_info)))
1794 0 : .get("/metrics", |r| {
1795 0 : named_request_span(r, measured_metrics_handler, RequestName("metrics"))
1796 0 : })
1797 0 : // Non-prefixed generic endpoints (status, metrics, profiling)
1798 0 : .get("/status", |r| {
1799 0 : named_request_span(r, handle_status, RequestName("status"))
1800 0 : })
1801 0 : .get("/ready", |r| {
1802 0 : named_request_span(r, handle_ready, RequestName("ready"))
1803 0 : })
1804 0 : .get("/profile/cpu", |r| {
1805 0 : named_request_span(r, profile_cpu_handler, RequestName("profile_cpu"))
1806 0 : })
1807 0 : .get("/profile/heap", |r| {
1808 0 : named_request_span(r, profile_heap_handler, RequestName("profile_heap"))
1809 0 : })
1810 0 : // Upcalls for the pageserver: point the pageserver's `control_plane_api` config to this prefix
1811 0 : .post("/upcall/v1/re-attach", |r| {
1812 0 : named_request_span(r, handle_re_attach, RequestName("upcall_v1_reattach"))
1813 0 : })
1814 0 : .post("/upcall/v1/validate", |r| {
1815 0 : named_request_span(r, handle_validate, RequestName("upcall_v1_validate"))
1816 0 : })
1817 0 : // Test/dev/debug endpoints
1818 0 : .post("/debug/v1/attach-hook", |r| {
1819 0 : named_request_span(r, handle_attach_hook, RequestName("debug_v1_attach_hook"))
1820 0 : })
1821 0 : .post("/debug/v1/inspect", |r| {
1822 0 : named_request_span(r, handle_inspect, RequestName("debug_v1_inspect"))
1823 0 : })
1824 0 : .post("/debug/v1/tenant/:tenant_id/drop", |r| {
1825 0 : named_request_span(r, handle_tenant_drop, RequestName("debug_v1_tenant_drop"))
1826 0 : })
1827 0 : .post("/debug/v1/node/:node_id/drop", |r| {
1828 0 : named_request_span(r, handle_node_drop, RequestName("debug_v1_node_drop"))
1829 0 : })
1830 0 : .post("/debug/v1/tenant/:tenant_id/import", |r| {
1831 0 : named_request_span(
1832 0 : r,
1833 0 : handle_tenant_import,
1834 0 : RequestName("debug_v1_tenant_import"),
1835 0 : )
1836 0 : })
1837 0 : .get("/debug/v1/tenant", |r| {
1838 0 : named_request_span(r, handle_tenants_dump, RequestName("debug_v1_tenant"))
1839 0 : })
1840 0 : .get("/debug/v1/tenant/:tenant_id/locate", |r| {
1841 0 : tenant_service_handler(
1842 0 : r,
1843 0 : handle_tenant_locate,
1844 0 : RequestName("debug_v1_tenant_locate"),
1845 0 : )
1846 0 : })
1847 0 : .get("/debug/v1/scheduler", |r| {
1848 0 : named_request_span(r, handle_scheduler_dump, RequestName("debug_v1_scheduler"))
1849 0 : })
1850 0 : .post("/debug/v1/consistency_check", |r| {
1851 0 : named_request_span(
1852 0 : r,
1853 0 : handle_consistency_check,
1854 0 : RequestName("debug_v1_consistency_check"),
1855 0 : )
1856 0 : })
1857 0 : .post("/debug/v1/reconcile_all", |r| {
1858 0 : request_span(r, handle_reconcile_all)
1859 0 : })
1860 0 : .put("/debug/v1/failpoints", |r| {
1861 0 : request_span(r, |r| failpoints_handler(r, CancellationToken::new()))
1862 0 : })
1863 0 : // Node operations
1864 0 : .post("/control/v1/node", |r| {
1865 0 : named_request_span(r, handle_node_register, RequestName("control_v1_node"))
1866 0 : })
1867 0 : .delete("/control/v1/node/:node_id", |r| {
1868 0 : named_request_span(r, handle_node_delete, RequestName("control_v1_node_delete"))
1869 0 : })
1870 0 : .get("/control/v1/node", |r| {
1871 0 : named_request_span(r, handle_node_list, RequestName("control_v1_node"))
1872 0 : })
1873 0 : .put("/control/v1/node/:node_id/config", |r| {
1874 0 : named_request_span(
1875 0 : r,
1876 0 : handle_node_configure,
1877 0 : RequestName("control_v1_node_config"),
1878 0 : )
1879 0 : })
1880 0 : .get("/control/v1/node/:node_id", |r| {
1881 0 : named_request_span(r, handle_node_status, RequestName("control_v1_node_status"))
1882 0 : })
1883 0 : .get("/control/v1/node/:node_id/shards", |r| {
1884 0 : named_request_span(
1885 0 : r,
1886 0 : handle_node_shards,
1887 0 : RequestName("control_v1_node_describe"),
1888 0 : )
1889 0 : })
1890 0 : .get("/control/v1/leader", |r| {
1891 0 : named_request_span(r, handle_get_leader, RequestName("control_v1_get_leader"))
1892 0 : })
1893 0 : .put("/control/v1/node/:node_id/drain", |r| {
1894 0 : named_request_span(r, handle_node_drain, RequestName("control_v1_node_drain"))
1895 0 : })
1896 0 : .delete("/control/v1/node/:node_id/drain", |r| {
1897 0 : named_request_span(
1898 0 : r,
1899 0 : handle_cancel_node_drain,
1900 0 : RequestName("control_v1_cancel_node_drain"),
1901 0 : )
1902 0 : })
1903 0 : .put("/control/v1/node/:node_id/fill", |r| {
1904 0 : named_request_span(r, handle_node_fill, RequestName("control_v1_node_fill"))
1905 0 : })
1906 0 : .delete("/control/v1/node/:node_id/fill", |r| {
1907 0 : named_request_span(
1908 0 : r,
1909 0 : handle_cancel_node_fill,
1910 0 : RequestName("control_v1_cancel_node_fill"),
1911 0 : )
1912 0 : })
1913 0 : // Metadata health operations
1914 0 : .post("/control/v1/metadata_health/update", |r| {
1915 0 : named_request_span(
1916 0 : r,
1917 0 : handle_metadata_health_update,
1918 0 : RequestName("control_v1_metadata_health_update"),
1919 0 : )
1920 0 : })
1921 0 : .get("/control/v1/metadata_health/unhealthy", |r| {
1922 0 : named_request_span(
1923 0 : r,
1924 0 : handle_metadata_health_list_unhealthy,
1925 0 : RequestName("control_v1_metadata_health_list_unhealthy"),
1926 0 : )
1927 0 : })
1928 0 : .post("/control/v1/metadata_health/outdated", |r| {
1929 0 : named_request_span(
1930 0 : r,
1931 0 : handle_metadata_health_list_outdated,
1932 0 : RequestName("control_v1_metadata_health_list_outdated"),
1933 0 : )
1934 0 : })
1935 0 : // Safekeepers
1936 0 : .get("/control/v1/safekeeper", |r| {
1937 0 : named_request_span(
1938 0 : r,
1939 0 : handle_safekeeper_list,
1940 0 : RequestName("control_v1_safekeeper_list"),
1941 0 : )
1942 0 : })
1943 0 : .get("/control/v1/safekeeper/:id", |r| {
1944 0 : named_request_span(r, handle_get_safekeeper, RequestName("v1_safekeeper"))
1945 0 : })
1946 0 : .post("/control/v1/safekeeper/:id", |r| {
1947 0 : // id is in the body
1948 0 : named_request_span(
1949 0 : r,
1950 0 : handle_upsert_safekeeper,
1951 0 : RequestName("v1_safekeeper_post"),
1952 0 : )
1953 0 : })
1954 0 : .post("/control/v1/safekeeper/:id/scheduling_policy", |r| {
1955 0 : named_request_span(
1956 0 : r,
1957 0 : handle_safekeeper_scheduling_policy,
1958 0 : RequestName("v1_safekeeper_status"),
1959 0 : )
1960 0 : })
1961 0 : // Tenant Shard operations
1962 0 : .put("/control/v1/tenant/:tenant_shard_id/migrate", |r| {
1963 0 : tenant_service_handler(
1964 0 : r,
1965 0 : handle_tenant_shard_migrate,
1966 0 : RequestName("control_v1_tenant_migrate"),
1967 0 : )
1968 0 : })
1969 0 : .put(
1970 0 : "/control/v1/tenant/:tenant_shard_id/migrate_secondary",
1971 0 : |r| {
1972 0 : tenant_service_handler(
1973 0 : r,
1974 0 : handle_tenant_shard_migrate_secondary,
1975 0 : RequestName("control_v1_tenant_migrate_secondary"),
1976 0 : )
1977 0 : },
1978 0 : )
1979 0 : .put(
1980 0 : "/control/v1/tenant/:tenant_shard_id/cancel_reconcile",
1981 0 : |r| {
1982 0 : tenant_service_handler(
1983 0 : r,
1984 0 : handle_tenant_shard_cancel_reconcile,
1985 0 : RequestName("control_v1_tenant_cancel_reconcile"),
1986 0 : )
1987 0 : },
1988 0 : )
1989 0 : .put("/control/v1/tenant/:tenant_id/shard_split", |r| {
1990 0 : tenant_service_handler(
1991 0 : r,
1992 0 : handle_tenant_shard_split,
1993 0 : RequestName("control_v1_tenant_shard_split"),
1994 0 : )
1995 0 : })
1996 0 : .get("/control/v1/tenant/:tenant_id", |r| {
1997 0 : tenant_service_handler(
1998 0 : r,
1999 0 : handle_tenant_describe,
2000 0 : RequestName("control_v1_tenant_describe"),
2001 0 : )
2002 0 : })
2003 0 : .get("/control/v1/tenant", |r| {
2004 0 : tenant_service_handler(r, handle_tenant_list, RequestName("control_v1_tenant_list"))
2005 0 : })
2006 0 : .put("/control/v1/tenant/:tenant_id/policy", |r| {
2007 0 : named_request_span(
2008 0 : r,
2009 0 : handle_tenant_update_policy,
2010 0 : RequestName("control_v1_tenant_policy"),
2011 0 : )
2012 0 : })
2013 0 : .put("/control/v1/preferred_azs", |r| {
2014 0 : named_request_span(
2015 0 : r,
2016 0 : handle_update_preferred_azs,
2017 0 : RequestName("control_v1_preferred_azs"),
2018 0 : )
2019 0 : })
2020 0 : .put("/control/v1/step_down", |r| {
2021 0 : named_request_span(r, handle_step_down, RequestName("control_v1_step_down"))
2022 0 : })
2023 0 : // Tenant operations
2024 0 : // The ^/v1/ endpoints act as a "Virtual Pageserver", enabling shard-naive clients to call into
2025 0 : // this service to manage tenants that actually consist of many tenant shards, as if they are a single entity.
2026 0 : .post("/v1/tenant", |r| {
2027 0 : tenant_service_handler(r, handle_tenant_create, RequestName("v1_tenant"))
2028 0 : })
2029 0 : .delete("/v1/tenant/:tenant_id", |r| {
2030 0 : tenant_service_handler(r, handle_tenant_delete, RequestName("v1_tenant"))
2031 0 : })
2032 0 : .patch("/v1/tenant/config", |r| {
2033 0 : tenant_service_handler(
2034 0 : r,
2035 0 : handle_tenant_config_patch,
2036 0 : RequestName("v1_tenant_config"),
2037 0 : )
2038 0 : })
2039 0 : .put("/v1/tenant/config", |r| {
2040 0 : tenant_service_handler(r, handle_tenant_config_set, RequestName("v1_tenant_config"))
2041 0 : })
2042 0 : .get("/v1/tenant/:tenant_id/config", |r| {
2043 0 : tenant_service_handler(r, handle_tenant_config_get, RequestName("v1_tenant_config"))
2044 0 : })
2045 0 : .put("/v1/tenant/:tenant_shard_id/location_config", |r| {
2046 0 : tenant_service_handler(
2047 0 : r,
2048 0 : handle_tenant_location_config,
2049 0 : RequestName("v1_tenant_location_config"),
2050 0 : )
2051 0 : })
2052 0 : .put("/v1/tenant/:tenant_id/time_travel_remote_storage", |r| {
2053 0 : tenant_service_handler(
2054 0 : r,
2055 0 : handle_tenant_time_travel_remote_storage,
2056 0 : RequestName("v1_tenant_time_travel_remote_storage"),
2057 0 : )
2058 0 : })
2059 0 : .post("/v1/tenant/:tenant_id/secondary/download", |r| {
2060 0 : tenant_service_handler(
2061 0 : r,
2062 0 : handle_tenant_secondary_download,
2063 0 : RequestName("v1_tenant_secondary_download"),
2064 0 : )
2065 0 : })
2066 0 : // Timeline operations
2067 0 : .delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
2068 0 : tenant_service_handler(
2069 0 : r,
2070 0 : handle_tenant_timeline_delete,
2071 0 : RequestName("v1_tenant_timeline"),
2072 0 : )
2073 0 : })
2074 0 : .post("/v1/tenant/:tenant_id/timeline", |r| {
2075 0 : tenant_service_handler(
2076 0 : r,
2077 0 : handle_tenant_timeline_create,
2078 0 : RequestName("v1_tenant_timeline"),
2079 0 : )
2080 0 : })
2081 0 : .put(
2082 0 : "/v1/tenant/:tenant_id/timeline/:timeline_id/archival_config",
2083 0 : |r| {
2084 0 : tenant_service_handler(
2085 0 : r,
2086 0 : handle_tenant_timeline_archival_config,
2087 0 : RequestName("v1_tenant_timeline_archival_config"),
2088 0 : )
2089 0 : },
2090 0 : )
2091 0 : .put(
2092 0 : "/v1/tenant/:tenant_id/timeline/:timeline_id/detach_ancestor",
2093 0 : |r| {
2094 0 : tenant_service_handler(
2095 0 : r,
2096 0 : handle_tenant_timeline_detach_ancestor,
2097 0 : RequestName("v1_tenant_timeline_detach_ancestor"),
2098 0 : )
2099 0 : },
2100 0 : )
2101 0 : .post(
2102 0 : "/v1/tenant/:tenant_id/timeline/:timeline_id/block_gc",
2103 0 : |r| {
2104 0 : tenant_service_handler(
2105 0 : r,
2106 0 : |s, r| handle_tenant_timeline_block_unblock_gc(s, r, BlockUnblock::Block),
2107 0 : RequestName("v1_tenant_timeline_block_unblock_gc"),
2108 0 : )
2109 0 : },
2110 0 : )
2111 0 : .post(
2112 0 : "/v1/tenant/:tenant_id/timeline/:timeline_id/unblock_gc",
2113 0 : |r| {
2114 0 : tenant_service_handler(
2115 0 : r,
2116 0 : |s, r| handle_tenant_timeline_block_unblock_gc(s, r, BlockUnblock::Unblock),
2117 0 : RequestName("v1_tenant_timeline_block_unblock_gc"),
2118 0 : )
2119 0 : },
2120 0 : )
2121 0 : .post(
2122 0 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/download_heatmap_layers",
2123 0 : |r| {
2124 0 : tenant_service_handler(
2125 0 : r,
2126 0 : handle_tenant_timeline_download_heatmap_layers,
2127 0 : RequestName("v1_tenant_timeline_download_heatmap_layers"),
2128 0 : )
2129 0 : },
2130 0 : )
2131 0 : // Tenant detail GET passthrough to shard zero:
2132 0 : .get("/v1/tenant/:tenant_id", |r| {
2133 0 : tenant_service_handler(
2134 0 : r,
2135 0 : handle_tenant_timeline_passthrough,
2136 0 : RequestName("v1_tenant_passthrough"),
2137 0 : )
2138 0 : })
2139 0 : // The `*` in the URL is a wildcard: any tenant/timeline GET APIs on the pageserver
2140 0 : // are implicitly exposed here. This must be last in the list to avoid
2141 0 : // taking precedence over other GET methods we might implement by hand.
2142 0 : .get("/v1/tenant/:tenant_id/*", |r| {
2143 0 : tenant_service_handler(
2144 0 : r,
2145 0 : handle_tenant_timeline_passthrough,
2146 0 : RequestName("v1_tenant_passthrough"),
2147 0 : )
2148 0 : })
2149 0 : }
2150 :
#[cfg(test)]
mod test {

    use super::path_without_ids;

    /// `path_without_ids` must reduce each of these paths to its static route skeleton
    /// "/v1/tenant//timeline/". The inputs cover a plain tenant ID, a tenant ID with a
    /// shard suffix (`-0108`), and the same path followed by a query string.
    #[test]
    fn test_path_without_ids() {
        let inputs = [
            "/v1/tenant/1a2b3344556677881122334455667788/timeline/AA223344556677881122334455667788",
            "/v1/tenant/1a2b3344556677881122334455667788-0108/timeline/AA223344556677881122334455667788",
            "/v1/tenant/1a2b3344556677881122334455667788-0108/timeline/AA223344556677881122334455667788?parameter=foo",
        ];
        for &path in inputs.iter() {
            assert_eq!(path_without_ids(path), "/v1/tenant//timeline/");
        }
    }
}
|