use std::str::FromStr;
use std::sync::{Arc, LazyLock};
use std::time::{Duration, Instant};

use anyhow::Context;
use control_plane::storage_controller::{AttachHookRequest, InspectRequest};
use futures::Future;
use http_utils::endpoint::{
    self, auth_middleware, check_permission_with, profile_cpu_handler, profile_heap_handler,
    request_span,
};
use http_utils::error::ApiError;
use http_utils::failpoints::failpoints_handler;
use http_utils::json::{json_request, json_response};
use http_utils::request::{must_get_query_param, parse_query_param, parse_request_param};
use http_utils::{RequestExt, RouterBuilder};
use hyper::header::CONTENT_TYPE;
use hyper::{Body, Request, Response, StatusCode, Uri};
use metrics::{BuildInfo, NeonMetrics};
use pageserver_api::controller_api::{
    MetadataHealthListOutdatedRequest, MetadataHealthListOutdatedResponse,
    MetadataHealthListUnhealthyResponse, MetadataHealthUpdateRequest, MetadataHealthUpdateResponse,
    NodeAvailability, NodeConfigureRequest, NodeRegisterRequest, SafekeeperSchedulingPolicyRequest,
    ShardsPreferredAzsRequest, TenantCreateRequest, TenantPolicyRequest, TenantShardMigrateRequest,
};
use pageserver_api::models::{
    DetachBehavior, TenantConfigPatchRequest, TenantConfigRequest, TenantLocationConfigRequest,
    TenantShardSplitRequest, TenantTimeTravelRequest, TimelineArchivalConfigRequest,
    TimelineCreateRequest,
};
use pageserver_api::shard::TenantShardId;
use pageserver_api::upcall_api::{ReAttachRequest, ValidateRequest};
use pageserver_client::{BlockUnblock, mgmt_api};
use routerify::Middleware;
use tokio_util::sync::CancellationToken;
use tracing::warn;
use utils::auth::{Scope, SwappableJwtAuth};
use utils::id::{NodeId, TenantId, TimelineId};

use crate::http;
use crate::metrics::{
    HttpRequestLatencyLabelGroup, HttpRequestStatusLabelGroup, METRICS_REGISTRY,
    PageserverRequestLabelGroup,
};
use crate::persistence::SafekeeperUpsert;
use crate::reconciler::ReconcileError;
use crate::service::{LeadershipStatus, RECONCILE_TIMEOUT, STARTUP_RECONCILE_TIMEOUT, Service};

/// State available to HTTP request handlers
pub struct HttpState {
    service: Arc<crate::service::Service>,
    auth: Option<Arc<SwappableJwtAuth>>,
    rate_limiter: governor::DefaultKeyedRateLimiter<TenantId>,
    neon_metrics: NeonMetrics,
    allowlist_routes: &'static [&'static str],
}

impl HttpState {
    pub fn new(
        service: Arc<crate::service::Service>,
        auth: Option<Arc<SwappableJwtAuth>>,
        build_info: BuildInfo,
    ) -> Self {
        let quota = governor::Quota::per_second(service.get_config().tenant_rate_limit);
        Self {
            service,
            auth,
            rate_limiter: governor::RateLimiter::keyed(quota),
            neon_metrics: NeonMetrics::new(build_info),
            allowlist_routes: &[
                "/status",
                "/ready",
                "/metrics",
                "/profile/cpu",
                "/profile/heap",
            ],
        }
    }
}

#[inline(always)]
fn get_state(request: &Request<Body>) -> &HttpState {
    request
        .data::<Arc<HttpState>>()
        .expect("unknown state type")
        .as_ref()
}
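
// NB: `RequestExt::data::<T>()` relies on the router having been built with the shared state
// attached, e.g. something like `RouterBuilder::data(Arc::new(HttpState::new(...)))` in the
// router construction (a sketch, not the exact call site). Given that, the `expect` above is
// infallible for any request that came through that router.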

/// Rate limits tenant requests.
///
/// TODO: this should be a request middleware, but requires us to extract the tenant ID from
/// different URLs in a systematic way.
///
/// TODO: consider returning a 429 response if these start piling up.
async fn maybe_rate_limit(request: &Request<Body>, tenant_id: TenantId) {
    // Check if the tenant should be rate-limited.
    let rate_limiter = &get_state(request).rate_limiter;
    if rate_limiter.check_key(&tenant_id).is_ok() {
        return;
    }

    // Measure the rate limiting delay.
    let _timer = METRICS_REGISTRY
        .metrics_group
        .storage_controller_http_request_rate_limited
        .start_timer();

    // Log rate limited tenants once every 10 seconds.
    static LOG_RATE_LIMITER: LazyLock<governor::DefaultKeyedRateLimiter<TenantId>> =
        LazyLock::new(|| {
            let quota = governor::Quota::with_period(Duration::from_secs(10)).unwrap();
            governor::RateLimiter::keyed(quota)
        });

    if LOG_RATE_LIMITER.check_key(&tenant_id).is_ok() {
        warn!("tenant {tenant_id} is rate limited")
    }

    // Wait for quota.
    rate_limiter.until_key_ready(&tenant_id).await;
}
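
// A minimal sketch of the keyed-limiter pattern used above, with a made-up quota and key type
// (illustrative only; the real quota comes from `tenant_rate_limit` in the service config, and
// `until_key_ready` must be awaited in an async context):
//
//     use std::num::NonZeroU32;
//     let quota = governor::Quota::per_second(NonZeroU32::new(10).unwrap());
//     let limiter: governor::DefaultKeyedRateLimiter<u64> = governor::RateLimiter::keyed(quota);
//     if limiter.check_key(&42).is_err() {
//         limiter.until_key_ready(&42).await; // block until key 42 has quota again
//     }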

/// Pageserver calls into this on startup, to learn which tenants it should attach
async fn handle_re_attach(req: Request<Body>) -> Result<Response<Body>, ApiError> {
    check_permissions(&req, Scope::GenerationsApi)?;

    let mut req = match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(req) => req,
    };

    let reattach_req = json_request::<ReAttachRequest>(&mut req).await?;
    let state = get_state(&req);
    json_response(StatusCode::OK, state.service.re_attach(reattach_req).await?)
}

/// Pageserver calls into this before doing deletions, to confirm that it still
/// holds the latest generation for the tenants with deletions enqueued
async fn handle_validate(req: Request<Body>) -> Result<Response<Body>, ApiError> {
    check_permissions(&req, Scope::GenerationsApi)?;

    let mut req = match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(req) => req,
    };

    let validate_req = json_request::<ValidateRequest>(&mut req).await?;
    let state = get_state(&req);
    json_response(StatusCode::OK, state.service.validate(validate_req).await?)
}

/// Call into this before attaching a tenant to a pageserver, to acquire a generation number
/// (in the real control plane this is unnecessary, because the same program is managing
/// generation numbers and doing attachments).
async fn handle_attach_hook(req: Request<Body>) -> Result<Response<Body>, ApiError> {
    check_permissions(&req, Scope::Admin)?;

    let mut req = match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(req) => req,
    };

    let attach_req = json_request::<AttachHookRequest>(&mut req).await?;
    let state = get_state(&req);

    json_response(
        StatusCode::OK,
        state
            .service
            .attach_hook(attach_req)
            .await
            .map_err(ApiError::InternalServerError)?,
    )
}

async fn handle_inspect(req: Request<Body>) -> Result<Response<Body>, ApiError> {
    check_permissions(&req, Scope::Admin)?;

    let mut req = match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(req) => req,
    };

    let inspect_req = json_request::<InspectRequest>(&mut req).await?;

    let state = get_state(&req);

    json_response(StatusCode::OK, state.service.inspect(inspect_req))
}

async fn handle_tenant_create(
    service: Arc<Service>,
    req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
    check_permissions(&req, Scope::PageServerApi)?;

    let mut req = match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(req) => req,
    };

    let create_req = json_request::<TenantCreateRequest>(&mut req).await?;

    json_response(
        StatusCode::CREATED,
        service.tenant_create(create_req).await?,
    )
}

async fn handle_tenant_location_config(
    service: Arc<Service>,
    req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
    let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
    check_permissions(&req, Scope::PageServerApi)?;

    let mut req = match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(req) => req,
    };

    let config_req = json_request::<TenantLocationConfigRequest>(&mut req).await?;
    json_response(
        StatusCode::OK,
        service
            .tenant_location_config(tenant_shard_id, config_req)
            .await?,
    )
}

async fn handle_tenant_config_patch(
    service: Arc<Service>,
    req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
    check_permissions(&req, Scope::PageServerApi)?;

    let mut req = match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(req) => req,
    };

    let config_req = json_request::<TenantConfigPatchRequest>(&mut req).await?;

    json_response(
        StatusCode::OK,
        service.tenant_config_patch(config_req).await?,
    )
}

async fn handle_tenant_config_set(
    service: Arc<Service>,
    req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
    check_permissions(&req, Scope::PageServerApi)?;

    let mut req = match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(req) => req,
    };

    let config_req = json_request::<TenantConfigRequest>(&mut req).await?;

    json_response(StatusCode::OK, service.tenant_config_set(config_req).await?)
}

async fn handle_tenant_config_get(
    service: Arc<Service>,
    req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
    let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
    check_permissions(&req, Scope::PageServerApi)?;
    maybe_rate_limit(&req, tenant_id).await;

    match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(_req) => {}
    };

    json_response(StatusCode::OK, service.tenant_config_get(tenant_id)?)
}

async fn handle_tenant_time_travel_remote_storage(
    service: Arc<Service>,
    req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
    let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
    check_permissions(&req, Scope::PageServerApi)?;
    maybe_rate_limit(&req, tenant_id).await;

    let mut req = match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(req) => req,
    };

    let time_travel_req = json_request::<TenantTimeTravelRequest>(&mut req).await?;

    let timestamp_raw = must_get_query_param(&req, "travel_to")?;
    let _timestamp = humantime::parse_rfc3339(&timestamp_raw).map_err(|_e| {
        ApiError::BadRequest(anyhow::anyhow!(
            "Invalid time for travel_to: {timestamp_raw:?}"
        ))
    })?;

    let done_if_after_raw = must_get_query_param(&req, "done_if_after")?;
    let _done_if_after = humantime::parse_rfc3339(&done_if_after_raw).map_err(|_e| {
        ApiError::BadRequest(anyhow::anyhow!(
            "Invalid time for done_if_after: {done_if_after_raw:?}"
        ))
    })?;

    service
        .tenant_time_travel_remote_storage(
            &time_travel_req,
            tenant_id,
            timestamp_raw,
            done_if_after_raw,
        )
        .await?;
    json_response(StatusCode::OK, ())
}
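
// For reference, `humantime::parse_rfc3339` accepts UTC timestamps of the form
// "2024-01-02T03:04:05Z" (the date here is made up for illustration):
//
//     let t: std::time::SystemTime = humantime::parse_rfc3339("2024-01-02T03:04:05Z").unwrap();
//
// Both query parameters are validated up front, but the raw strings are what gets passed
// through to the service layer.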

fn map_reqwest_hyper_status(status: reqwest::StatusCode) -> Result<hyper::StatusCode, ApiError> {
    hyper::StatusCode::from_u16(status.as_u16())
        .context("invalid status code")
        .map_err(ApiError::InternalServerError)
}

async fn handle_tenant_secondary_download(
    service: Arc<Service>,
    req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
    let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
    let wait = parse_query_param(&req, "wait_ms")?.map(Duration::from_millis);
    maybe_rate_limit(&req, tenant_id).await;

    match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(_req) => {}
    };

    let (status, progress) = service.tenant_secondary_download(tenant_id, wait).await?;
    json_response(map_reqwest_hyper_status(status)?, progress)
}

async fn handle_tenant_delete(
    service: Arc<Service>,
    req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
    let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
    check_permissions(&req, Scope::PageServerApi)?;
    maybe_rate_limit(&req, tenant_id).await;

    match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(_req) => {}
    };

    let status_code = service
        .tenant_delete(tenant_id)
        .await
        .and_then(map_reqwest_hyper_status)?;

    if status_code == StatusCode::NOT_FOUND {
        // The pageserver uses 404 for successful deletion, but we use 200
        json_response(StatusCode::OK, ())
    } else {
        json_response(status_code, ())
    }
}

async fn handle_tenant_timeline_create(
    service: Arc<Service>,
    req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
    let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
    check_permissions(&req, Scope::PageServerApi)?;
    maybe_rate_limit(&req, tenant_id).await;

    let mut req = match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(req) => req,
    };

    let create_req = json_request::<TimelineCreateRequest>(&mut req).await?;
    json_response(
        StatusCode::CREATED,
        service
            .tenant_timeline_create(tenant_id, create_req)
            .await?,
    )
}

async fn handle_tenant_timeline_delete(
    service: Arc<Service>,
    req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
    let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
    let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;

    check_permissions(&req, Scope::PageServerApi)?;
    maybe_rate_limit(&req, tenant_id).await;

    match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(_req) => {}
    };

    // Timeline deletions implement an "initially return 202, then 404 once we're done" semantic,
    // so we wrap them in a retry loop to expose a simpler API upstream.
    async fn deletion_wrapper<R, F>(service: Arc<Service>, f: F) -> Result<Response<Body>, ApiError>
    where
        R: std::future::Future<Output = Result<StatusCode, ApiError>> + Send + 'static,
        F: Fn(Arc<Service>) -> R + Send + Sync + 'static,
    {
        // On subsequent retries, wait longer.
        // Enable callers with a 25 second request timeout to reliably get a response
        const MAX_WAIT: Duration = Duration::from_secs(25);
        const MAX_RETRY_PERIOD: Duration = Duration::from_secs(5);

        let started_at = Instant::now();

        // To keep deletion reasonably snappy for small tenants, initially check after 1 second
        // whether deletion completed.
        let mut retry_period = Duration::from_secs(1);

        loop {
            let status = f(service.clone()).await?;
            match status {
                StatusCode::ACCEPTED => {
                    tracing::info!("Deletion accepted, waiting to try again...");
                    tokio::time::sleep(retry_period).await;
                    retry_period = MAX_RETRY_PERIOD;
                }
                StatusCode::CONFLICT => {
                    tracing::info!("Deletion already in progress, waiting to try again...");
                    tokio::time::sleep(retry_period).await;
                }
                StatusCode::NOT_FOUND => {
                    tracing::info!("Deletion complete");
                    return json_response(StatusCode::OK, ());
                }
                _ => {
                    tracing::warn!("Unexpected status {status}");
                    return json_response(status, ());
                }
            }

            let now = Instant::now();
            if now + retry_period > started_at + MAX_WAIT {
                tracing::info!("Deletion timed out waiting for 404");
                // REQUEST_TIMEOUT would be more appropriate, but CONFLICT is already part of
                // the pageserver's swagger definition for this endpoint, and has the same desired
                // effect of causing the control plane to retry later.
                return json_response(StatusCode::CONFLICT, ());
            }
        }
    }

    deletion_wrapper(service, move |service| async move {
        service
            .tenant_timeline_delete(tenant_id, timeline_id)
            .await
            .and_then(map_reqwest_hyper_status)
    })
    .await
}
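
// Illustrative cadence for the wrapper above, under its constants: the first re-check happens
// ~1s after an initial 202, subsequent checks run every 5s, and once another retry would overrun
// the 25s budget the wrapper gives up with a 409 so that the caller retries later.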

async fn handle_tenant_timeline_archival_config(
    service: Arc<Service>,
    req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
    let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
    let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;

    check_permissions(&req, Scope::PageServerApi)?;
    maybe_rate_limit(&req, tenant_id).await;

    let mut req = match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(req) => req,
    };

    let create_req = json_request::<TimelineArchivalConfigRequest>(&mut req).await?;

    service
        .tenant_timeline_archival_config(tenant_id, timeline_id, create_req)
        .await?;

    json_response(StatusCode::OK, ())
}

async fn handle_tenant_timeline_detach_ancestor(
    service: Arc<Service>,
    req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
    let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
    let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
    let behavior: Option<DetachBehavior> = parse_query_param(&req, "detach_behavior")?;

    check_permissions(&req, Scope::PageServerApi)?;
    maybe_rate_limit(&req, tenant_id).await;

    match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(_req) => {}
    };

    let res = service
        .tenant_timeline_detach_ancestor(tenant_id, timeline_id, behavior)
        .await?;

    json_response(StatusCode::OK, res)
}

async fn handle_tenant_timeline_block_unblock_gc(
    service: Arc<Service>,
    req: Request<Body>,
    dir: BlockUnblock,
) -> Result<Response<Body>, ApiError> {
    let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
    check_permissions(&req, Scope::PageServerApi)?;
    maybe_rate_limit(&req, tenant_id).await;

    let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;

    service
        .tenant_timeline_block_unblock_gc(tenant_id, timeline_id, dir)
        .await?;

    json_response(StatusCode::OK, ())
}

async fn handle_tenant_timeline_download_heatmap_layers(
    service: Arc<Service>,
    req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
    let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;

    check_permissions(&req, Scope::PageServerApi)?;
    maybe_rate_limit(&req, tenant_shard_id.tenant_id).await;

    let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
    let concurrency: Option<usize> = parse_query_param(&req, "concurrency")?;
    let recurse = parse_query_param(&req, "recurse")?.unwrap_or(false);

    service
        .tenant_timeline_download_heatmap_layers(tenant_shard_id, timeline_id, concurrency, recurse)
        .await?;

    json_response(StatusCode::OK, ())
}

// For metric labels where we would like to include the approximate path, but exclude
// high-cardinality fields like query parameters and tenant/timeline IDs. Since we are proxying
// to arbitrary paths, we don't have routing templates to compare to, so we just filter out our
// well-known ID format with a regex.
fn path_without_ids(path: &str) -> String {
    static ID_REGEX: std::sync::OnceLock<regex::Regex> = std::sync::OnceLock::new();
    ID_REGEX
        .get_or_init(|| regex::Regex::new(r"([0-9a-fA-F]{32}(-[0-9]{4})?|\?.*)").unwrap())
        .replace_all(path, "")
        .to_string()
}
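
// A small sanity-check sketch for `path_without_ids`; the IDs below are made up. Note that the
// regex also strips a trailing "-NNNN" shard suffix and any query string.
#[cfg(test)]
mod path_without_ids_sketch {
    use super::path_without_ids;

    #[test]
    fn strips_ids_shard_suffixes_and_queries() {
        assert_eq!(
            path_without_ids(
                "/v1/tenant/0123456789abcdef0123456789abcdef-0004/timeline?wait_ms=250"
            ),
            "/v1/tenant//timeline"
        );
    }
}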

async fn handle_tenant_timeline_passthrough(
    service: Arc<Service>,
    req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
    let tenant_or_shard_id: TenantShardId = parse_request_param(&req, "tenant_id")?;
    check_permissions(&req, Scope::PageServerApi)?;
    maybe_rate_limit(&req, tenant_or_shard_id.tenant_id).await;

    let req = match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(req) => req,
    };

    let Some(path) = req.uri().path_and_query() else {
        // This should never happen: our request router only calls us if there is a path
        return Err(ApiError::BadRequest(anyhow::anyhow!("Missing path")));
    };

    tracing::info!(
        "Proxying request for tenant {} ({})",
        tenant_or_shard_id.tenant_id,
        path
    );

    // Find the node that holds shard zero
    let (node, tenant_shard_id) = if tenant_or_shard_id.is_unsharded() {
        service
            .tenant_shard0_node(tenant_or_shard_id.tenant_id)
            .await?
    } else {
        (
            service.tenant_shard_node(tenant_or_shard_id).await?,
            tenant_or_shard_id,
        )
    };

    // Callers will always pass an unsharded tenant ID. Before proxying, we must
    // rewrite this to a shard-aware shard zero ID.
    let path = format!("{}", path);
    let tenant_str = tenant_or_shard_id.tenant_id.to_string();
    let tenant_shard_str = format!("{}", tenant_shard_id);
    let path = path.replace(&tenant_str, &tenant_shard_str);

    let latency = &METRICS_REGISTRY
        .metrics_group
        .storage_controller_passthrough_request_latency;

    let path_label = path_without_ids(&path)
        .split('/')
        .filter(|token| !token.is_empty())
        .collect::<Vec<_>>()
        .join("_");
    let labels = PageserverRequestLabelGroup {
        pageserver_id: &node.get_id().to_string(),
        path: &path_label,
        method: crate::metrics::Method::Get,
    };

    let _timer = latency.start_timer(labels.clone());

    let client = mgmt_api::Client::new(
        node.base_url(),
        service.get_config().pageserver_jwt_token.as_deref(),
        service.get_config().ssl_ca_cert.clone(),
    )
    .map_err(|e| ApiError::InternalServerError(anyhow::anyhow!(e)))?;
    let resp = client.get_raw(path).await.map_err(|e|
        // We return 503 here because if we can't successfully send a request to the pageserver,
        // either we aren't available or the pageserver is unavailable.
        ApiError::ResourceUnavailable(format!("Error sending pageserver API request to {node}: {e}").into()))?;

    if !resp.status().is_success() {
        let error_counter = &METRICS_REGISTRY
            .metrics_group
            .storage_controller_passthrough_request_error;
        error_counter.inc(labels);
    }

    // Transform 404 into 503 if we raced with a migration
    if resp.status() == reqwest::StatusCode::NOT_FOUND {
        // Look up node again: if we migrated it will be different
        let new_node = service.tenant_shard_node(tenant_shard_id).await?;
        if new_node.get_id() != node.get_id() {
            // Rather than retry here, send the client a 503 to prompt a retry: this matches
            // the pageserver's use of 503, and all clients calling this API should retry on 503.
            return Err(ApiError::ResourceUnavailable(
                format!("Pageserver {node} returned 404, was migrated to {new_node}").into(),
            ));
        }
    }

    // We have a reqwest::Response, and would like an http::Response
    let mut builder = hyper::Response::builder().status(map_reqwest_hyper_status(resp.status())?);
    for (k, v) in resp.headers() {
        builder = builder.header(k.as_str(), v.as_bytes());
    }

    let response = builder
        .body(Body::wrap_stream(resp.bytes_stream()))
        .map_err(|e| ApiError::InternalServerError(e.into()))?;

    Ok(response)
}
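
// Sketch of the path rewrite performed above, with a made-up tenant ID: an incoming
//   /v1/tenant/0123456789abcdef0123456789abcdef/timeline
// is proxied to shard zero as something like
//   /v1/tenant/0123456789abcdef0123456789abcdef-0004/timeline
// where the suffix comes from the `TenantShardId` display format (shard number and count).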

async fn handle_tenant_locate(
    service: Arc<Service>,
    req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
    let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;

    check_permissions(&req, Scope::Admin)?;
    // NB: don't rate limit: admin operation.

    match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(_req) => {}
    };

    json_response(StatusCode::OK, service.tenant_locate(tenant_id)?)
}

async fn handle_tenant_describe(
    service: Arc<Service>,
    req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
    let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
    check_permissions(&req, Scope::Scrubber)?;
    // NB: don't rate limit: scrubber operation.

    match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(_req) => {}
    };

    json_response(StatusCode::OK, service.tenant_describe(tenant_id)?)
}

async fn handle_tenant_list(
    service: Arc<Service>,
    req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
    check_permissions(&req, Scope::Admin)?;

    let limit: Option<usize> = parse_query_param(&req, "limit")?;
    let start_after: Option<TenantId> = parse_query_param(&req, "start_after")?;
    tracing::info!("start_after: {:?}", start_after);

    match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(_req) => {}
    };

    json_response(StatusCode::OK, service.tenant_list(limit, start_after))
}

async fn handle_node_register(req: Request<Body>) -> Result<Response<Body>, ApiError> {
    check_permissions(&req, Scope::Infra)?;

    let mut req = match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(req) => req,
    };

    let register_req = json_request::<NodeRegisterRequest>(&mut req).await?;
    let state = get_state(&req);
    state.service.node_register(register_req).await?;
    json_response(StatusCode::OK, ())
}

async fn handle_node_list(req: Request<Body>) -> Result<Response<Body>, ApiError> {
    check_permissions(&req, Scope::Infra)?;

    let req = match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(req) => req,
    };

    let state = get_state(&req);
    let mut nodes = state.service.node_list().await?;
    nodes.sort_by_key(|n| n.get_id());
    let api_nodes = nodes.into_iter().map(|n| n.describe()).collect::<Vec<_>>();

    json_response(StatusCode::OK, api_nodes)
}

async fn handle_node_drop(req: Request<Body>) -> Result<Response<Body>, ApiError> {
    check_permissions(&req, Scope::Admin)?;

    let req = match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(req) => req,
    };

    let state = get_state(&req);
    let node_id: NodeId = parse_request_param(&req, "node_id")?;
    json_response(StatusCode::OK, state.service.node_drop(node_id).await?)
}

async fn handle_node_delete(req: Request<Body>) -> Result<Response<Body>, ApiError> {
    check_permissions(&req, Scope::Admin)?;

    let req = match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(req) => req,
    };

    let state = get_state(&req);
    let node_id: NodeId = parse_request_param(&req, "node_id")?;
    json_response(StatusCode::OK, state.service.node_delete(node_id).await?)
}

async fn handle_node_configure(req: Request<Body>) -> Result<Response<Body>, ApiError> {
    check_permissions(&req, Scope::Admin)?;

    let mut req = match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(req) => req,
    };

    let node_id: NodeId = parse_request_param(&req, "node_id")?;
    let config_req = json_request::<NodeConfigureRequest>(&mut req).await?;
    if node_id != config_req.node_id {
        return Err(ApiError::BadRequest(anyhow::anyhow!(
            "Path and body node_id differ"
        )));
    }
    let state = get_state(&req);

    json_response(
        StatusCode::OK,
        state
            .service
            .external_node_configure(
                config_req.node_id,
                config_req.availability.map(NodeAvailability::from),
                config_req.scheduling,
            )
            .await?,
    )
}

async fn handle_node_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
    check_permissions(&req, Scope::Infra)?;

    let req = match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(req) => req,
    };

    let state = get_state(&req);
    let node_id: NodeId = parse_request_param(&req, "node_id")?;

    let node_status = state.service.get_node(node_id).await?;

    json_response(StatusCode::OK, node_status)
}

async fn handle_node_shards(req: Request<Body>) -> Result<Response<Body>, ApiError> {
    check_permissions(&req, Scope::Admin)?;

    let state = get_state(&req);
    let node_id: NodeId = parse_request_param(&req, "node_id")?;

    let node_status = state.service.get_node_shards(node_id).await?;

    json_response(StatusCode::OK, node_status)
}

async fn handle_get_leader(req: Request<Body>) -> Result<Response<Body>, ApiError> {
    check_permissions(&req, Scope::Admin)?;

    let req = match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(req) => req,
    };

    let state = get_state(&req);
    let leader = state.service.get_leader().await.map_err(|err| {
        ApiError::InternalServerError(anyhow::anyhow!(
            "Failed to read leader from database: {err}"
        ))
    })?;

    json_response(StatusCode::OK, leader)
}

async fn handle_node_drain(req: Request<Body>) -> Result<Response<Body>, ApiError> {
    check_permissions(&req, Scope::Infra)?;

    let req = match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(req) => req,
    };

    let state = get_state(&req);
    let node_id: NodeId = parse_request_param(&req, "node_id")?;

    state.service.start_node_drain(node_id).await?;

    json_response(StatusCode::ACCEPTED, ())
}

async fn handle_cancel_node_drain(req: Request<Body>) -> Result<Response<Body>, ApiError> {
    check_permissions(&req, Scope::Infra)?;

    let req = match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(req) => req,
    };

    let state = get_state(&req);
    let node_id: NodeId = parse_request_param(&req, "node_id")?;

    state.service.cancel_node_drain(node_id).await?;

    json_response(StatusCode::ACCEPTED, ())
}

async fn handle_node_fill(req: Request<Body>) -> Result<Response<Body>, ApiError> {
    check_permissions(&req, Scope::Infra)?;

    let req = match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(req) => req,
    };

    let state = get_state(&req);
    let node_id: NodeId = parse_request_param(&req, "node_id")?;

    state.service.start_node_fill(node_id).await?;

    json_response(StatusCode::ACCEPTED, ())
}

async fn handle_cancel_node_fill(req: Request<Body>) -> Result<Response<Body>, ApiError> {
    check_permissions(&req, Scope::Infra)?;

    let req = match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(req) => req,
    };

    let state = get_state(&req);
    let node_id: NodeId = parse_request_param(&req, "node_id")?;

    state.service.cancel_node_fill(node_id).await?;

    json_response(StatusCode::ACCEPTED, ())
}

async fn handle_safekeeper_list(req: Request<Body>) -> Result<Response<Body>, ApiError> {
    check_permissions(&req, Scope::Infra)?;

    let req = match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(req) => req,
    };

    let state = get_state(&req);
    let safekeepers = state.service.safekeepers_list().await?;
    json_response(StatusCode::OK, safekeepers)
}

async fn handle_metadata_health_update(req: Request<Body>) -> Result<Response<Body>, ApiError> {
    check_permissions(&req, Scope::Scrubber)?;

    let mut req = match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(req) => req,
    };

    let update_req = json_request::<MetadataHealthUpdateRequest>(&mut req).await?;
    let state = get_state(&req);

    state.service.metadata_health_update(update_req).await?;

    json_response(StatusCode::OK, MetadataHealthUpdateResponse {})
}

async fn handle_metadata_health_list_unhealthy(
    req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
    check_permissions(&req, Scope::Admin)?;

    let req = match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(req) => req,
    };

    let state = get_state(&req);
    let unhealthy_tenant_shards = state.service.metadata_health_list_unhealthy().await?;

    json_response(
        StatusCode::OK,
        MetadataHealthListUnhealthyResponse {
            unhealthy_tenant_shards,
        },
    )
}

async fn handle_metadata_health_list_outdated(
    req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
    check_permissions(&req, Scope::Admin)?;

    let mut req = match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(req) => req,
    };

    let list_outdated_req = json_request::<MetadataHealthListOutdatedRequest>(&mut req).await?;
    let state = get_state(&req);
    let health_records = state
        .service
        .metadata_health_list_outdated(list_outdated_req.not_scrubbed_for)
        .await?;

    json_response(
        StatusCode::OK,
        MetadataHealthListOutdatedResponse { health_records },
    )
}

async fn handle_tenant_shard_split(
    service: Arc<Service>,
    req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
    check_permissions(&req, Scope::Admin)?;
    // NB: don't rate limit: admin operation.

    let mut req = match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(req) => req,
    };

    let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
    let split_req = json_request::<TenantShardSplitRequest>(&mut req).await?;

    json_response(
        StatusCode::OK,
        service.tenant_shard_split(tenant_id, split_req).await?,
    )
}

async fn handle_tenant_shard_migrate(
    service: Arc<Service>,
    req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
    check_permissions(&req, Scope::Admin)?;
    // NB: don't rate limit: admin operation.

    let mut req = match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(req) => req,
    };

    let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
    let migrate_req = json_request::<TenantShardMigrateRequest>(&mut req).await?;
    json_response(
        StatusCode::OK,
        service
            .tenant_shard_migrate(tenant_shard_id, migrate_req)
            .await?,
    )
}

async fn handle_tenant_shard_migrate_secondary(
    service: Arc<Service>,
    req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
    check_permissions(&req, Scope::Admin)?;
    // NB: don't rate limit: admin operation.

    let mut req = match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(req) => req,
    };

    let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
    let migrate_req = json_request::<TenantShardMigrateRequest>(&mut req).await?;
    json_response(
        StatusCode::OK,
        service
            .tenant_shard_migrate_secondary(tenant_shard_id, migrate_req)
            .await?,
    )
}

async fn handle_tenant_shard_cancel_reconcile(
    service: Arc<Service>,
    req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
    check_permissions(&req, Scope::Admin)?;
    // NB: don't rate limit: admin operation.

    let req = match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(req) => req,
    };

    let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
    json_response(
        StatusCode::OK,
        service
            .tenant_shard_cancel_reconcile(tenant_shard_id)
            .await?,
    )
}

async fn handle_tenant_update_policy(req: Request<Body>) -> Result<Response<Body>, ApiError> {
    check_permissions(&req, Scope::Admin)?;
    // NB: don't rate limit: admin operation.

    let mut req = match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(req) => req,
    };

    let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
    let update_req = json_request::<TenantPolicyRequest>(&mut req).await?;
    let state = get_state(&req);

    json_response(
        StatusCode::OK,
        state
            .service
            .tenant_update_policy(tenant_id, update_req)
            .await?,
    )
}

async fn handle_update_preferred_azs(req: Request<Body>) -> Result<Response<Body>, ApiError> {
    check_permissions(&req, Scope::Admin)?;

    let mut req = match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(req) => req,
    };

    let azs_req = json_request::<ShardsPreferredAzsRequest>(&mut req).await?;
    let state = get_state(&req);

    json_response(
        StatusCode::OK,
        state.service.update_shards_preferred_azs(azs_req).await?,
    )
}

async fn handle_step_down(req: Request<Body>) -> Result<Response<Body>, ApiError> {
    check_permissions(&req, Scope::ControllerPeer)?;

    let req = match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(req) => req,
    };

    let state = get_state(&req);
    json_response(StatusCode::OK, state.service.step_down().await)
}

async fn handle_tenant_drop(req: Request<Body>) -> Result<Response<Body>, ApiError> {
    let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
    check_permissions(&req, Scope::PageServerApi)?;
    maybe_rate_limit(&req, tenant_id).await;

    let req = match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(req) => req,
    };

    let state = get_state(&req);

    json_response(StatusCode::OK, state.service.tenant_drop(tenant_id).await?)
}

async fn handle_tenant_import(req: Request<Body>) -> Result<Response<Body>, ApiError> {
    let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
    check_permissions(&req, Scope::PageServerApi)?;
    maybe_rate_limit(&req, tenant_id).await;

    let req = match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(req) => req,
    };

    let state = get_state(&req);

    json_response(
        StatusCode::OK,
        state.service.tenant_import(tenant_id).await?,
    )
}

async fn handle_tenants_dump(req: Request<Body>) -> Result<Response<Body>, ApiError> {
    check_permissions(&req, Scope::Admin)?;

    let req = match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(req) => req,
    };

    let state = get_state(&req);
    state.service.tenants_dump()
}

async fn handle_scheduler_dump(req: Request<Body>) -> Result<Response<Body>, ApiError> {
    check_permissions(&req, Scope::Admin)?;

    let req = match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(req) => req,
    };

    let state = get_state(&req);
    state.service.scheduler_dump()
}

async fn handle_consistency_check(req: Request<Body>) -> Result<Response<Body>, ApiError> {
    check_permissions(&req, Scope::Admin)?;

    let req = match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(req) => req,
    };

    let state = get_state(&req);

    json_response(StatusCode::OK, state.service.consistency_check().await?)
}

async fn handle_reconcile_all(req: Request<Body>) -> Result<Response<Body>, ApiError> {
    check_permissions(&req, Scope::Admin)?;

    let req = match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(req) => req,
    };

    let state = get_state(&req);

    json_response(StatusCode::OK, state.service.reconcile_all_now().await?)
}

/// Status endpoint is just used for checking that our HTTP listener is up
async fn handle_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
    match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(_req) => {}
    };

    json_response(StatusCode::OK, ())
}

/// Readiness endpoint indicates when we're done doing startup I/O (e.g. reconciling
/// with remote pageserver nodes). This is intended for use as a kubernetes readiness probe.
async fn handle_ready(req: Request<Body>) -> Result<Response<Body>, ApiError> {
    let req = match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(req) => req,
    };

    let state = get_state(&req);
    if state.service.startup_complete.is_ready() {
        json_response(StatusCode::OK, ())
    } else {
        json_response(StatusCode::SERVICE_UNAVAILABLE, ())
    }
}

impl From<ReconcileError> for ApiError {
    fn from(value: ReconcileError) -> Self {
        ApiError::Conflict(format!("Reconciliation error: {}", value))
    }
}

/// Return the safekeeper record by instance id, or 404.
///
/// Not used by anything except manual testing.
async fn handle_get_safekeeper(req: Request<Body>) -> Result<Response<Body>, ApiError> {
    check_permissions(&req, Scope::Infra)?;

    let id = parse_request_param::<i64>(&req, "id")?;

    let req = match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(req) => req,
    };

    let state = get_state(&req);

    let res = state.service.get_safekeeper(id).await;

    match res {
        Ok(b) => json_response(StatusCode::OK, b),
        Err(crate::persistence::DatabaseError::Query(diesel::result::Error::NotFound)) => {
            Err(ApiError::NotFound("unknown instance id".into()))
        }
        Err(other) => Err(other.into()),
    }
}

/// Used as part of deployment scripts.
///
/// Assumes information is only relayed to the storage controller after first selecting a unique
/// id on the control plane database, which means we have an id field in the request and payload.
async fn handle_upsert_safekeeper(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
    check_permissions(&req, Scope::Infra)?;

    let body = json_request::<SafekeeperUpsert>(&mut req).await?;
    let id = parse_request_param::<i64>(&req, "id")?;

    if id != body.id {
        // The id in the path is expected to be repeated in the body.
        return Err(ApiError::BadRequest(anyhow::anyhow!(
            "id mismatch: url={id:?}, body={:?}",
            body.id
        )));
    }

    let req = match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(req) => req,
    };

    let state = get_state(&req);

    state.service.upsert_safekeeper(body).await?;

    Ok(Response::builder()
        .status(StatusCode::NO_CONTENT)
        .body(Body::empty())
        .unwrap())
}

/// Sets the scheduling policy of the specified safekeeper
async fn handle_safekeeper_scheduling_policy(
    mut req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
    check_permissions(&req, Scope::Admin)?;

    let body = json_request::<SafekeeperSchedulingPolicyRequest>(&mut req).await?;
    let id = parse_request_param::<i64>(&req, "id")?;

    let req = match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(req) => req,
    };

    let state = get_state(&req);

    state
        .service
        .set_safekeeper_scheduling_policy(id, body.scheduling_policy)
        .await?;

    json_response(StatusCode::OK, ())
}

/// Common wrapper for request handlers that call into Service and will operate on tenants: they
/// must only be allowed to run if Service has finished its initial reconciliation.
async fn tenant_service_handler<R, H>(
    request: Request<Body>,
    handler: H,
    request_name: RequestName,
) -> R::Output
where
    R: std::future::Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
    H: FnOnce(Arc<Service>, Request<Body>) -> R + Send + Sync + 'static,
{
    let state = get_state(&request);
    let service = state.service.clone();

    let startup_complete = service.startup_complete.clone();
    if tokio::time::timeout(STARTUP_RECONCILE_TIMEOUT, startup_complete.wait())
        .await
        .is_err()
    {
        // This shouldn't happen: it is the responsibility of [`Service::startup_reconcile`] to
        // use appropriate timeouts around its remote calls, to bound its runtime.
        return Err(ApiError::Timeout(
            "Timed out waiting for service readiness".into(),
        ));
    }

    named_request_span(
        request,
        |request| async move { handler(service, request).await },
        request_name,
    )
    .await
}
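
// Typical wiring for the wrapper above (a sketch, assuming a routerify-style route table like
// the one this module's router builder sets up; the path and request name are illustrative):
//
//     .put("/v1/tenant/:tenant_id/config", |r| {
//         tenant_service_handler(r, handle_tenant_config_set, RequestName("v1_tenant_config"))
//     })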

/// Check if the required scope is held in the request's token, or if the request has
/// a token with 'admin' scope then always permit it.
fn check_permissions(request: &Request<Body>, required_scope: Scope) -> Result<(), ApiError> {
    check_permission_with(request, |claims| {
        match crate::auth::check_permission(claims, required_scope) {
            Err(e) => match crate::auth::check_permission(claims, Scope::Admin) {
                Ok(()) => Ok(()),
                Err(_) => Err(e),
            },
            Ok(()) => Ok(()),
        }
    })
}
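
// E.g. a token whose claims carry the admin scope passes `check_permissions(&req, Scope::Infra)`
// even though it lacks the infra scope, because of the `Scope::Admin` fallback above; the error
// returned on failure is the one for the originally required scope.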

#[derive(Clone, Debug)]
struct RequestMeta {
    method: hyper::http::Method,
    at: Instant,
}

pub fn prologue_leadership_status_check_middleware<
    B: hyper::body::HttpBody + Send + Sync + 'static,
>() -> Middleware<B, ApiError> {
    Middleware::pre(move |req| async move {
        let state = get_state(&req);
        let leadership_status = state.service.get_leadership_status();

        enum AllowedRoutes {
            All,
            Some(&'static [&'static str]),
        }

        let allowed_routes = match leadership_status {
            LeadershipStatus::Leader => AllowedRoutes::All,
            LeadershipStatus::SteppedDown => AllowedRoutes::All,
            LeadershipStatus::Candidate => AllowedRoutes::Some(&[
                "/ready",
                "/status",
                "/metrics",
                "/profile/cpu",
                "/profile/heap",
            ]),
        };

        match allowed_routes {
            AllowedRoutes::All => Ok(req),
            AllowedRoutes::Some(allowed) if allowed.contains(&req.uri().path()) => Ok(req),
            _ => {
                tracing::info!(
                    "Request {} not allowed due to current leadership state",
                    req.uri()
                );

                Err(ApiError::ResourceUnavailable(
                    format!("Current leadership status is {leadership_status}").into(),
                ))
            }
        }
    })
}

fn prologue_metrics_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>()
-> Middleware<B, ApiError> {
    Middleware::pre(move |req| async move {
        let meta = RequestMeta {
            method: req.method().clone(),
            at: Instant::now(),
        };

        req.set_context(meta);

        Ok(req)
    })
}

fn epilogue_metrics_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>()
-> Middleware<B, ApiError> {
    Middleware::post_with_info(move |resp, req_info| async move {
        let request_name = match req_info.context::<RequestName>() {
            Some(name) => name,
            None => {
                return Ok(resp);
            }
        };

        if let Some(meta) = req_info.context::<RequestMeta>() {
            let status = &crate::metrics::METRICS_REGISTRY
                .metrics_group
                .storage_controller_http_request_status;
            let latency = &crate::metrics::METRICS_REGISTRY
                .metrics_group
                .storage_controller_http_request_latency;

            status.inc(HttpRequestStatusLabelGroup {
                path: request_name.0,
                method: meta.method.clone().into(),
                status: crate::metrics::StatusCode(resp.status()),
            });

            latency.observe(
                HttpRequestLatencyLabelGroup {
                    path: request_name.0,
                    method: meta.method.into(),
                },
                meta.at.elapsed().as_secs_f64(),
            );
        }
        Ok(resp)
    })
}

pub async fn measured_metrics_handler(req: Request<Body>) -> Result<Response<Body>, ApiError> {
    pub const TEXT_FORMAT: &str = "text/plain; version=0.0.4";

    let req = match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(req) => req,
    };

    let state = get_state(&req);
    let payload = crate::metrics::METRICS_REGISTRY.encode(&state.neon_metrics);
    let response = Response::builder()
        .status(200)
        .header(CONTENT_TYPE, TEXT_FORMAT)
        .body(payload.into())
        .unwrap();

    Ok(response)
}

#[derive(Clone)]
struct RequestName(&'static str);

async fn named_request_span<R, H>(
    request: Request<Body>,
    handler: H,
    name: RequestName,
) -> R::Output
where
    R: Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
    H: FnOnce(Request<Body>) -> R + Send + Sync + 'static,
{
    request.set_context(name);
    request_span(request, handler).await
}
1609 :
1610 : enum ForwardOutcome {
1611 : Forwarded(Result<Response<Body>, ApiError>),
1612 : NotForwarded(Request<Body>),
1613 : }
1614 :
1615 : /// Potentially forward the request to the current storage controller leader.
1616 : /// More specifically, we forward when:
1617 : /// 1. The request is not one of:
1618 : /// ["/control/v1/step_down", "/status", "/ready", "/metrics", "/profile/cpu", "/profile/heap"]
1619 : /// 2. The current instance is in the [`LeadershipStatus::SteppedDown`] state
1620 : /// 3. There is a leader in the database to forward to
1621 : /// 4. The leader from step (3) is not the current instance
1622 : ///
1623 : /// Why forward?
1624 : /// It turns out that we can't rely on external orchestration to promptly route traffic to the
1625 : /// new leader, which induces downtime. Forwarding provides a safe way out.
1626 : ///
1627 : /// Why is it safe?
1628 : /// If a storcon instance is persisted in the database, then we know that it is the current leader.
1629 : /// There's one exception: the window between handling a step-down request and the new leader
1630 : /// updating the database.
1631 : ///
1632 : /// Let's treat the happy case first. The stepped-down node does not produce any side effects,
1633 : /// since all request handling happens on the leader.
1634 : ///
1635 : /// As for the edge case, we are guaranteed to have at most two running instances at any time.
1636 : /// Hence, in the edge-case scenario the leader persisted in the database is the stepped-down
1637 : /// instance that received the request. Condition (4) above covers this scenario.
1638 0 : async fn maybe_forward(req: Request<Body>) -> ForwardOutcome {
1639 : const NOT_FOR_FORWARD: &[&str] = &[
1640 : "/control/v1/step_down",
1641 : "/status",
1642 : "/ready",
1643 : "/metrics",
1644 : "/profile/cpu",
1645 : "/profile/heap",
1646 : ];
1647 :
1648 0 : let uri = req.uri();
1649 0 : let uri_for_forward = !NOT_FOR_FORWARD.contains(&uri.path());
1650 0 :
1651 0 : // Fast return before trying to take any Service locks, if we will never forward anyway
1652 0 : if !uri_for_forward {
1653 0 : return ForwardOutcome::NotForwarded(req);
1654 0 : }
1655 0 :
1656 0 : let state = get_state(&req);
1657 0 : let leadership_status = state.service.get_leadership_status();
1658 0 :
1659 0 : if leadership_status != LeadershipStatus::SteppedDown {
1660 0 : return ForwardOutcome::NotForwarded(req);
1661 0 : }
1662 :
1663 0 : let leader = state.service.get_leader().await;
1664 0 : let leader = {
1665 0 : match leader {
1666 0 : Ok(Some(leader)) => leader,
1667 : Ok(None) => {
1668 0 : return ForwardOutcome::Forwarded(Err(ApiError::ResourceUnavailable(
1669 0 : "No leader to forward to while in stepped down state".into(),
1670 0 : )));
1671 : }
1672 0 : Err(err) => {
1673 0 : return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(
1674 0 : anyhow::anyhow!(
1675 0 : "Failed to get leader for forwarding while in stepped down state: {err}"
1676 0 : ),
1677 0 : )));
1678 : }
1679 : }
1680 : };
1681 :
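 : // Guard against forwarding to ourselves: in the step-down edge case described above, the
 : // leader persisted in the database may still be this very instance.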
1682 0 : let cfg = state.service.get_config();
1683 0 : if let Some(ref self_addr) = cfg.address_for_peers {
1684 0 : let leader_addr = match Uri::from_str(leader.address.as_str()) {
1685 0 : Ok(uri) => uri,
1686 0 : Err(err) => {
1687 0 : return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(
1688 0 : anyhow::anyhow!(
1689 0 : "Failed to parse leader uri for forwarding while in stepped down state: {err}"
1690 0 : ),
1691 0 : )));
1692 : }
1693 : };
1694 :
1695 0 : if *self_addr == leader_addr {
1696 0 : return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(anyhow::anyhow!(
1697 0 : "Leader is stepped down instance"
1698 0 : ))));
1699 0 : }
1700 0 : }
1701 :
1702 0 : tracing::info!("Forwarding {} to leader at {}", uri, leader.address);
1703 :
1704 : // A request may legitimately block for up to [`RECONCILE_TIMEOUT`] on the leader, so
1705 : // add some leeway on top of that when choosing the timeout for proxied requests.
1706 : const PROXIED_REQUEST_TIMEOUT: Duration = Duration::from_secs(RECONCILE_TIMEOUT.as_secs() + 10);
1707 0 : let client = reqwest::ClientBuilder::new()
1708 0 : .timeout(PROXIED_REQUEST_TIMEOUT)
1709 0 : .build();
1710 0 : let client = match client {
1711 0 : Ok(client) => client,
1712 0 : Err(err) => {
1713 0 : return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(anyhow::anyhow!(
1714 0 : "Failed to build leader client for forwarding while in stepped down state: {err}"
1715 0 : ))));
1716 : }
1717 : };
1718 :
1719 0 : let request: reqwest::Request = match convert_request(req, &client, leader.address).await {
1720 0 : Ok(r) => r,
1721 0 : Err(err) => {
1722 0 : return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(anyhow::anyhow!(
1723 0 : "Failed to convert request for forwarding while in stepped down state: {err}"
1724 0 : ))));
1725 : }
1726 : };
1727 :
1728 0 : let response = match client.execute(request).await {
1729 0 : Ok(r) => r,
1730 0 : Err(err) => {
1731 0 : return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(anyhow::anyhow!(
1732 0 : "Failed to forward while in stepped down state: {err}"
1733 0 : ))));
1734 : }
1735 : };
1736 :
1737 0 : ForwardOutcome::Forwarded(convert_response(response).await)
1738 0 : }
1739 :
1740 : /// Convert a [`reqwest::Response`] to a [`hyper::Response`] by passing through
1741 : /// a stable representation (string, bytes or integer).
1742 : ///
1743 : /// Ideally, we would not have to do this since both types use the http crate
1744 : /// under the hood. However, they use different versions of the crate, and keeping
1745 : /// second-order dependencies in sync is difficult.
1746 0 : async fn convert_response(resp: reqwest::Response) -> Result<hyper::Response<Body>, ApiError> {
1747 : use std::str::FromStr;
1748 :
1749 0 : let mut builder = hyper::Response::builder().status(resp.status().as_u16());
1750 0 : for (key, value) in resp.headers().into_iter() {
1751 0 : let key = hyper::header::HeaderName::from_str(key.as_str()).map_err(|err| {
1752 0 : ApiError::InternalServerError(anyhow::anyhow!("Response conversion failed: {err}"))
1753 0 : })?;
1754 :
1755 0 : let value = hyper::header::HeaderValue::from_bytes(value.as_bytes()).map_err(|err| {
1756 0 : ApiError::InternalServerError(anyhow::anyhow!("Response conversion failed: {err}"))
1757 0 : })?;
1758 :
1759 0 : builder = builder.header(key, value);
1760 : }
1761 :
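 : // Stream the upstream body through rather than buffering it in memory.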
1762 0 : let body = http::Body::wrap_stream(resp.bytes_stream());
1763 0 :
1764 0 : builder.body(body).map_err(|err| {
1765 0 : ApiError::InternalServerError(anyhow::anyhow!("Response conversion failed: {err}"))
1766 0 : })
1767 0 : }
1768 :
1769 : /// Convert a [`hyper::Request`] to a [`reqwest::Request`] by passing through
1770 : /// a stable representation (string, bytes or integer).
1771 : ///
1772 : /// See [`convert_response`] for why we are doing it this way.
1773 0 : async fn convert_request(
1774 0 : req: hyper::Request<Body>,
1775 0 : client: &reqwest::Client,
1776 0 : to_address: String,
1777 0 : ) -> Result<reqwest::Request, ApiError> {
1778 : use std::str::FromStr;
1779 :
1780 0 : let (parts, body) = req.into_parts();
1781 0 : let method = reqwest::Method::from_str(parts.method.as_str()).map_err(|err| {
1782 0 : ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
1783 0 : })?;
1784 :
1785 0 : let path_and_query = parts.uri.path_and_query().ok_or_else(|| {
1786 0 : ApiError::InternalServerError(anyhow::anyhow!(
1787 0 : "Request conversion failed: no path and query"
1788 0 : ))
1789 0 : })?;
1790 :
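 : // Rebuild the target URL by joining the leader's base address with the original path and
 : // query string.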
1791 0 : let uri = reqwest::Url::from_str(
1792 0 : format!(
1793 0 : "{}{}",
1794 0 : to_address.trim_end_matches("/"),
1795 0 : path_and_query.as_str()
1796 0 : )
1797 0 : .as_str(),
1798 0 : )
1799 0 : .map_err(|err| {
1800 0 : ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
1801 0 : })?;
1802 :
1803 0 : let mut headers = reqwest::header::HeaderMap::new();
1804 0 : for (key, value) in parts.headers.into_iter() {
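 : // `HeaderMap::into_iter` yields `None` for the extra values of a repeated header name;
 : // those repeated values are skipped here, so only the first value of each name is
 : // forwarded.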
1805 0 : let key = match key {
1806 0 : Some(k) => k,
1807 : None => {
1808 0 : continue;
1809 : }
1810 : };
1811 :
1812 0 : let key = reqwest::header::HeaderName::from_str(key.as_str()).map_err(|err| {
1813 0 : ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
1814 0 : })?;
1815 :
1816 0 : let value = reqwest::header::HeaderValue::from_bytes(value.as_bytes()).map_err(|err| {
1817 0 : ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
1818 0 : })?;
1819 :
1820 0 : headers.insert(key, value);
1821 : }
1822 :
1823 0 : let body = hyper::body::to_bytes(body).await.map_err(|err| {
1824 0 : ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
1825 0 : })?;
1826 :
1827 0 : client
1828 0 : .request(method, uri)
1829 0 : .headers(headers)
1830 0 : .body(body)
1831 0 : .build()
1832 0 : .map_err(|err| {
1833 0 : ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
1834 0 : })
1835 0 : }
1836 :
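 : /// Builds the storage controller's HTTP router: leadership and metrics middlewares first,
 : /// then optional JWT auth (skipping the allowlisted routes), then all route handlers.
 : ///
 : /// A minimal sketch of wiring the router into a hyper server (assuming routerify's
 : /// `RouterService`; not taken verbatim from this codebase):
 : ///
 : /// ```ignore
 : /// let router = make_router(service, auth, build_info).build().unwrap();
 : /// let svc = routerify::RouterService::new(router).unwrap();
 : /// hyper::Server::bind(&addr).serve(svc).await?;
 : /// ```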
1837 0 : pub fn make_router(
1838 0 : service: Arc<Service>,
1839 0 : auth: Option<Arc<SwappableJwtAuth>>,
1840 0 : build_info: BuildInfo,
1841 0 : ) -> RouterBuilder<hyper::Body, ApiError> {
1842 0 : let mut router = endpoint::make_router()
1843 0 : .middleware(prologue_leadership_status_check_middleware())
1844 0 : .middleware(prologue_metrics_middleware())
1845 0 : .middleware(epilogue_metrics_middleware());
1846 0 : if auth.is_some() {
1847 0 : router = router.middleware(auth_middleware(|request| {
1848 0 : let state = get_state(request);
1849 0 : if state.allowlist_routes.contains(&request.uri().path()) {
1850 0 : None
1851 : } else {
1852 0 : state.auth.as_deref()
1853 : }
1854 0 : }));
1855 0 : }
1856 :
1857 0 : router
1858 0 : .data(Arc::new(HttpState::new(service, auth, build_info)))
1859 0 : .get("/metrics", |r| {
1860 0 : named_request_span(r, measured_metrics_handler, RequestName("metrics"))
1861 0 : })
1862 0 : // Non-prefixed generic endpoints (status, metrics, profiling)
1863 0 : .get("/status", |r| {
1864 0 : named_request_span(r, handle_status, RequestName("status"))
1865 0 : })
1866 0 : .get("/ready", |r| {
1867 0 : named_request_span(r, handle_ready, RequestName("ready"))
1868 0 : })
1869 0 : .get("/profile/cpu", |r| {
1870 0 : named_request_span(r, profile_cpu_handler, RequestName("profile_cpu"))
1871 0 : })
1872 0 : .get("/profile/heap", |r| {
1873 0 : named_request_span(r, profile_heap_handler, RequestName("profile_heap"))
1874 0 : })
1875 0 : // Upcalls for the pageserver: point the pageserver's `control_plane_api` config to this prefix
1876 0 : .post("/upcall/v1/re-attach", |r| {
1877 0 : named_request_span(r, handle_re_attach, RequestName("upcall_v1_reattach"))
1878 0 : })
1879 0 : .post("/upcall/v1/validate", |r| {
1880 0 : named_request_span(r, handle_validate, RequestName("upcall_v1_validate"))
1881 0 : })
1882 0 : // Test/dev/debug endpoints
1883 0 : .post("/debug/v1/attach-hook", |r| {
1884 0 : named_request_span(r, handle_attach_hook, RequestName("debug_v1_attach_hook"))
1885 0 : })
1886 0 : .post("/debug/v1/inspect", |r| {
1887 0 : named_request_span(r, handle_inspect, RequestName("debug_v1_inspect"))
1888 0 : })
1889 0 : .post("/debug/v1/tenant/:tenant_id/drop", |r| {
1890 0 : named_request_span(r, handle_tenant_drop, RequestName("debug_v1_tenant_drop"))
1891 0 : })
1892 0 : .post("/debug/v1/node/:node_id/drop", |r| {
1893 0 : named_request_span(r, handle_node_drop, RequestName("debug_v1_node_drop"))
1894 0 : })
1895 0 : .post("/debug/v1/tenant/:tenant_id/import", |r| {
1896 0 : named_request_span(
1897 0 : r,
1898 0 : handle_tenant_import,
1899 0 : RequestName("debug_v1_tenant_import"),
1900 0 : )
1901 0 : })
1902 0 : .get("/debug/v1/tenant", |r| {
1903 0 : named_request_span(r, handle_tenants_dump, RequestName("debug_v1_tenant"))
1904 0 : })
1905 0 : .get("/debug/v1/tenant/:tenant_id/locate", |r| {
1906 0 : tenant_service_handler(
1907 0 : r,
1908 0 : handle_tenant_locate,
1909 0 : RequestName("debug_v1_tenant_locate"),
1910 0 : )
1911 0 : })
1912 0 : .get("/debug/v1/scheduler", |r| {
1913 0 : named_request_span(r, handle_scheduler_dump, RequestName("debug_v1_scheduler"))
1914 0 : })
1915 0 : .post("/debug/v1/consistency_check", |r| {
1916 0 : named_request_span(
1917 0 : r,
1918 0 : handle_consistency_check,
1919 0 : RequestName("debug_v1_consistency_check"),
1920 0 : )
1921 0 : })
1922 0 : .post("/debug/v1/reconcile_all", |r| {
1923 0 : request_span(r, handle_reconcile_all)
1924 0 : })
1925 0 : .put("/debug/v1/failpoints", |r| {
1926 0 : request_span(r, |r| failpoints_handler(r, CancellationToken::new()))
1927 0 : })
1928 0 : // Node operations
1929 0 : .post("/control/v1/node", |r| {
1930 0 : named_request_span(r, handle_node_register, RequestName("control_v1_node"))
1931 0 : })
1932 0 : .delete("/control/v1/node/:node_id", |r| {
1933 0 : named_request_span(r, handle_node_delete, RequestName("control_v1_node_delete"))
1934 0 : })
1935 0 : .get("/control/v1/node", |r| {
1936 0 : named_request_span(r, handle_node_list, RequestName("control_v1_node"))
1937 0 : })
1938 0 : .put("/control/v1/node/:node_id/config", |r| {
1939 0 : named_request_span(
1940 0 : r,
1941 0 : handle_node_configure,
1942 0 : RequestName("control_v1_node_config"),
1943 0 : )
1944 0 : })
1945 0 : .get("/control/v1/node/:node_id", |r| {
1946 0 : named_request_span(r, handle_node_status, RequestName("control_v1_node_status"))
1947 0 : })
1948 0 : .get("/control/v1/node/:node_id/shards", |r| {
1949 0 : named_request_span(
1950 0 : r,
1951 0 : handle_node_shards,
1952 0 : RequestName("control_v1_node_describe"),
1953 0 : )
1954 0 : })
1955 0 : .get("/control/v1/leader", |r| {
1956 0 : named_request_span(r, handle_get_leader, RequestName("control_v1_get_leader"))
1957 0 : })
1958 0 : .put("/control/v1/node/:node_id/drain", |r| {
1959 0 : named_request_span(r, handle_node_drain, RequestName("control_v1_node_drain"))
1960 0 : })
1961 0 : .delete("/control/v1/node/:node_id/drain", |r| {
1962 0 : named_request_span(
1963 0 : r,
1964 0 : handle_cancel_node_drain,
1965 0 : RequestName("control_v1_cancel_node_drain"),
1966 0 : )
1967 0 : })
1968 0 : .put("/control/v1/node/:node_id/fill", |r| {
1969 0 : named_request_span(r, handle_node_fill, RequestName("control_v1_node_fill"))
1970 0 : })
1971 0 : .delete("/control/v1/node/:node_id/fill", |r| {
1972 0 : named_request_span(
1973 0 : r,
1974 0 : handle_cancel_node_fill,
1975 0 : RequestName("control_v1_cancel_node_fill"),
1976 0 : )
1977 0 : })
1978 0 : // Metadata health operations
1979 0 : .post("/control/v1/metadata_health/update", |r| {
1980 0 : named_request_span(
1981 0 : r,
1982 0 : handle_metadata_health_update,
1983 0 : RequestName("control_v1_metadata_health_update"),
1984 0 : )
1985 0 : })
1986 0 : .get("/control/v1/metadata_health/unhealthy", |r| {
1987 0 : named_request_span(
1988 0 : r,
1989 0 : handle_metadata_health_list_unhealthy,
1990 0 : RequestName("control_v1_metadata_health_list_unhealthy"),
1991 0 : )
1992 0 : })
1993 0 : .post("/control/v1/metadata_health/outdated", |r| {
1994 0 : named_request_span(
1995 0 : r,
1996 0 : handle_metadata_health_list_outdated,
1997 0 : RequestName("control_v1_metadata_health_list_outdated"),
1998 0 : )
1999 0 : })
2000 0 : // Safekeepers
2001 0 : .get("/control/v1/safekeeper", |r| {
2002 0 : named_request_span(
2003 0 : r,
2004 0 : handle_safekeeper_list,
2005 0 : RequestName("control_v1_safekeeper_list"),
2006 0 : )
2007 0 : })
2008 0 : .get("/control/v1/safekeeper/:id", |r| {
2009 0 : named_request_span(r, handle_get_safekeeper, RequestName("v1_safekeeper"))
2010 0 : })
2011 0 : .post("/control/v1/safekeeper/:id", |r| {
2012 0 : // id is in the body
2013 0 : named_request_span(
2014 0 : r,
2015 0 : handle_upsert_safekeeper,
2016 0 : RequestName("v1_safekeeper_post"),
2017 0 : )
2018 0 : })
2019 0 : .post("/control/v1/safekeeper/:id/scheduling_policy", |r| {
2020 0 : named_request_span(
2021 0 : r,
2022 0 : handle_safekeeper_scheduling_policy,
2023 0 : RequestName("v1_safekeeper_status"),
2024 0 : )
2025 0 : })
2026 0 : // Tenant Shard operations
2027 0 : .put("/control/v1/tenant/:tenant_shard_id/migrate", |r| {
2028 0 : tenant_service_handler(
2029 0 : r,
2030 0 : handle_tenant_shard_migrate,
2031 0 : RequestName("control_v1_tenant_migrate"),
2032 0 : )
2033 0 : })
2034 0 : .put(
2035 0 : "/control/v1/tenant/:tenant_shard_id/migrate_secondary",
2036 0 : |r| {
2037 0 : tenant_service_handler(
2038 0 : r,
2039 0 : handle_tenant_shard_migrate_secondary,
2040 0 : RequestName("control_v1_tenant_migrate_secondary"),
2041 0 : )
2042 0 : },
2043 0 : )
2044 0 : .put(
2045 0 : "/control/v1/tenant/:tenant_shard_id/cancel_reconcile",
2046 0 : |r| {
2047 0 : tenant_service_handler(
2048 0 : r,
2049 0 : handle_tenant_shard_cancel_reconcile,
2050 0 : RequestName("control_v1_tenant_cancel_reconcile"),
2051 0 : )
2052 0 : },
2053 0 : )
2054 0 : .put("/control/v1/tenant/:tenant_id/shard_split", |r| {
2055 0 : tenant_service_handler(
2056 0 : r,
2057 0 : handle_tenant_shard_split,
2058 0 : RequestName("control_v1_tenant_shard_split"),
2059 0 : )
2060 0 : })
2061 0 : .get("/control/v1/tenant/:tenant_id", |r| {
2062 0 : tenant_service_handler(
2063 0 : r,
2064 0 : handle_tenant_describe,
2065 0 : RequestName("control_v1_tenant_describe"),
2066 0 : )
2067 0 : })
2068 0 : .get("/control/v1/tenant", |r| {
2069 0 : tenant_service_handler(r, handle_tenant_list, RequestName("control_v1_tenant_list"))
2070 0 : })
2071 0 : .put("/control/v1/tenant/:tenant_id/policy", |r| {
2072 0 : named_request_span(
2073 0 : r,
2074 0 : handle_tenant_update_policy,
2075 0 : RequestName("control_v1_tenant_policy"),
2076 0 : )
2077 0 : })
2078 0 : .put("/control/v1/preferred_azs", |r| {
2079 0 : named_request_span(
2080 0 : r,
2081 0 : handle_update_preferred_azs,
2082 0 : RequestName("control_v1_preferred_azs"),
2083 0 : )
2084 0 : })
2085 0 : .put("/control/v1/step_down", |r| {
2086 0 : named_request_span(r, handle_step_down, RequestName("control_v1_step_down"))
2087 0 : })
2088 0 : // Tenant operations
2089 0 : // The ^/v1/ endpoints act as a "Virtual Pageserver", enabling shard-naive clients to call into
2090 0 : // this service to manage tenants that actually consist of many tenant shards, as if they are a single entity.
2091 0 : .post("/v1/tenant", |r| {
2092 0 : tenant_service_handler(r, handle_tenant_create, RequestName("v1_tenant"))
2093 0 : })
2094 0 : .delete("/v1/tenant/:tenant_id", |r| {
2095 0 : tenant_service_handler(r, handle_tenant_delete, RequestName("v1_tenant"))
2096 0 : })
2097 0 : .patch("/v1/tenant/config", |r| {
2098 0 : tenant_service_handler(
2099 0 : r,
2100 0 : handle_tenant_config_patch,
2101 0 : RequestName("v1_tenant_config"),
2102 0 : )
2103 0 : })
2104 0 : .put("/v1/tenant/config", |r| {
2105 0 : tenant_service_handler(r, handle_tenant_config_set, RequestName("v1_tenant_config"))
2106 0 : })
2107 0 : .get("/v1/tenant/:tenant_id/config", |r| {
2108 0 : tenant_service_handler(r, handle_tenant_config_get, RequestName("v1_tenant_config"))
2109 0 : })
2110 0 : .put("/v1/tenant/:tenant_shard_id/location_config", |r| {
2111 0 : tenant_service_handler(
2112 0 : r,
2113 0 : handle_tenant_location_config,
2114 0 : RequestName("v1_tenant_location_config"),
2115 0 : )
2116 0 : })
2117 0 : .put("/v1/tenant/:tenant_id/time_travel_remote_storage", |r| {
2118 0 : tenant_service_handler(
2119 0 : r,
2120 0 : handle_tenant_time_travel_remote_storage,
2121 0 : RequestName("v1_tenant_time_travel_remote_storage"),
2122 0 : )
2123 0 : })
2124 0 : .post("/v1/tenant/:tenant_id/secondary/download", |r| {
2125 0 : tenant_service_handler(
2126 0 : r,
2127 0 : handle_tenant_secondary_download,
2128 0 : RequestName("v1_tenant_secondary_download"),
2129 0 : )
2130 0 : })
2131 0 : // Timeline operations
2132 0 : .delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
2133 0 : tenant_service_handler(
2134 0 : r,
2135 0 : handle_tenant_timeline_delete,
2136 0 : RequestName("v1_tenant_timeline"),
2137 0 : )
2138 0 : })
2139 0 : .post("/v1/tenant/:tenant_id/timeline", |r| {
2140 0 : tenant_service_handler(
2141 0 : r,
2142 0 : handle_tenant_timeline_create,
2143 0 : RequestName("v1_tenant_timeline"),
2144 0 : )
2145 0 : })
2146 0 : .put(
2147 0 : "/v1/tenant/:tenant_id/timeline/:timeline_id/archival_config",
2148 0 : |r| {
2149 0 : tenant_service_handler(
2150 0 : r,
2151 0 : handle_tenant_timeline_archival_config,
2152 0 : RequestName("v1_tenant_timeline_archival_config"),
2153 0 : )
2154 0 : },
2155 0 : )
2156 0 : .put(
2157 0 : "/v1/tenant/:tenant_id/timeline/:timeline_id/detach_ancestor",
2158 0 : |r| {
2159 0 : tenant_service_handler(
2160 0 : r,
2161 0 : handle_tenant_timeline_detach_ancestor,
2162 0 : RequestName("v1_tenant_timeline_detach_ancestor"),
2163 0 : )
2164 0 : },
2165 0 : )
2166 0 : .post(
2167 0 : "/v1/tenant/:tenant_id/timeline/:timeline_id/block_gc",
2168 0 : |r| {
2169 0 : tenant_service_handler(
2170 0 : r,
2171 0 : |s, r| handle_tenant_timeline_block_unblock_gc(s, r, BlockUnblock::Block),
2172 0 : RequestName("v1_tenant_timeline_block_unblock_gc"),
2173 0 : )
2174 0 : },
2175 0 : )
2176 0 : .post(
2177 0 : "/v1/tenant/:tenant_id/timeline/:timeline_id/unblock_gc",
2178 0 : |r| {
2179 0 : tenant_service_handler(
2180 0 : r,
2181 0 : |s, r| handle_tenant_timeline_block_unblock_gc(s, r, BlockUnblock::Unblock),
2182 0 : RequestName("v1_tenant_timeline_block_unblock_gc"),
2183 0 : )
2184 0 : },
2185 0 : )
2186 0 : .post(
2187 0 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/download_heatmap_layers",
2188 0 : |r| {
2189 0 : tenant_service_handler(
2190 0 : r,
2191 0 : handle_tenant_timeline_download_heatmap_layers,
2192 0 : RequestName("v1_tenant_timeline_download_heatmap_layers"),
2193 0 : )
2194 0 : },
2195 0 : )
2196 0 : // Tenant detail GET passthrough to shard zero:
2197 0 : .get("/v1/tenant/:tenant_id", |r| {
2198 0 : tenant_service_handler(
2199 0 : r,
2200 0 : handle_tenant_timeline_passthrough,
2201 0 : RequestName("v1_tenant_passthrough"),
2202 0 : )
2203 0 : })
2204 0 : // The `*` in the URL is a wildcard: any tenant/timeline GET APIs on the pageserver
2205 0 : // are implicitly exposed here. This must be last in the list to avoid
2206 0 : // taking precedence over other GET methods we might implement by hand.
2207 0 : .get("/v1/tenant/:tenant_id/*", |r| {
2208 0 : tenant_service_handler(
2209 0 : r,
2210 0 : handle_tenant_timeline_passthrough,
2211 0 : RequestName("v1_tenant_passthrough"),
2212 0 : )
2213 0 : })
2214 0 : }
2215 :
2216 : #[cfg(test)]
2217 : mod test {
2218 :
2219 : use super::path_without_ids;
2220 :
2221 : #[test]
2222 1 : fn test_path_without_ids() {
2223 1 : assert_eq!(
2224 1 : path_without_ids(
2225 1 : "/v1/tenant/1a2b3344556677881122334455667788/timeline/AA223344556677881122334455667788"
2226 1 : ),
2227 1 : "/v1/tenant//timeline/"
2228 1 : );
2229 1 : assert_eq!(
2230 1 : path_without_ids(
2231 1 : "/v1/tenant/1a2b3344556677881122334455667788-0108/timeline/AA223344556677881122334455667788"
2232 1 : ),
2233 1 : "/v1/tenant//timeline/"
2234 1 : );
2235 1 : assert_eq!(
2236 1 : path_without_ids(
2237 1 : "/v1/tenant/1a2b3344556677881122334455667788-0108/timeline/AA223344556677881122334455667788?parameter=foo"
2238 1 : ),
2239 1 : "/v1/tenant//timeline/"
2240 1 : );
2241 1 : }
2242 : }