Line data Source code
1 : use std::str::FromStr;
2 : use std::sync::{Arc, LazyLock};
3 : use std::time::{Duration, Instant};
4 :
5 : use anyhow::Context;
6 : use control_plane::storage_controller::{AttachHookRequest, InspectRequest};
7 : use futures::Future;
8 : use http_utils::endpoint::{
9 : self, auth_middleware, check_permission_with, profile_cpu_handler, profile_heap_handler,
10 : request_span,
11 : };
12 : use http_utils::error::ApiError;
13 : use http_utils::failpoints::failpoints_handler;
14 : use http_utils::json::{json_request, json_response};
15 : use http_utils::request::{must_get_query_param, parse_query_param, parse_request_param};
16 : use http_utils::{RequestExt, RouterBuilder};
17 : use hyper::header::CONTENT_TYPE;
18 : use hyper::{Body, Request, Response, StatusCode, Uri};
19 : use metrics::{BuildInfo, NeonMetrics};
20 : use pageserver_api::controller_api::{
21 : MetadataHealthListOutdatedRequest, MetadataHealthListOutdatedResponse,
22 : MetadataHealthListUnhealthyResponse, MetadataHealthUpdateRequest, MetadataHealthUpdateResponse,
23 : NodeAvailability, NodeConfigureRequest, NodeRegisterRequest, SafekeeperSchedulingPolicyRequest,
24 : ShardsPreferredAzsRequest, TenantCreateRequest, TenantPolicyRequest, TenantShardMigrateRequest,
25 : TimelineImportRequest,
26 : };
27 : use pageserver_api::models::{
28 : DetachBehavior, LsnLeaseRequest, TenantConfigPatchRequest, TenantConfigRequest,
29 : TenantLocationConfigRequest, TenantShardSplitRequest, TenantTimeTravelRequest,
30 : TimelineArchivalConfigRequest, TimelineCreateRequest,
31 : };
32 : use pageserver_api::shard::TenantShardId;
33 : use pageserver_api::upcall_api::{
34 : PutTimelineImportStatusRequest, ReAttachRequest, TimelineImportStatusRequest, ValidateRequest,
35 : };
36 : use pageserver_client::{BlockUnblock, mgmt_api};
37 : use routerify::Middleware;
38 : use tokio_util::sync::CancellationToken;
39 : use tracing::warn;
40 : use utils::auth::{Scope, SwappableJwtAuth};
41 : use utils::id::{NodeId, TenantId, TimelineId};
42 :
43 : use crate::http;
44 : use crate::metrics::{
45 : HttpRequestLatencyLabelGroup, HttpRequestStatusLabelGroup, METRICS_REGISTRY,
46 : PageserverRequestLabelGroup,
47 : };
48 : use crate::persistence::SafekeeperUpsert;
49 : use crate::reconciler::ReconcileError;
50 : use crate::service::{LeadershipStatus, RECONCILE_TIMEOUT, STARTUP_RECONCILE_TIMEOUT, Service};
51 :
52 : /// State available to HTTP request handlers
53 : pub struct HttpState {
54 : service: Arc<crate::service::Service>,
55 : auth: Option<Arc<SwappableJwtAuth>>,
56 : rate_limiter: governor::DefaultKeyedRateLimiter<TenantId>,
57 : neon_metrics: NeonMetrics,
58 : allowlist_routes: &'static [&'static str],
59 : }
60 :
61 : impl HttpState {
62 0 : pub fn new(
63 0 : service: Arc<crate::service::Service>,
64 0 : auth: Option<Arc<SwappableJwtAuth>>,
65 0 : build_info: BuildInfo,
66 0 : ) -> Self {
67 0 : let quota = governor::Quota::per_second(service.get_config().tenant_rate_limit);
68 0 : Self {
69 0 : service,
70 0 : auth,
71 0 : rate_limiter: governor::RateLimiter::keyed(quota),
72 0 : neon_metrics: NeonMetrics::new(build_info),
73 0 : allowlist_routes: &[
74 0 : "/status",
75 0 : "/live",
76 0 : "/ready",
77 0 : "/metrics",
78 0 : "/profile/cpu",
79 0 : "/profile/heap",
80 0 : ],
81 0 : }
82 0 : }
83 : }
84 :
85 : #[inline(always)]
86 0 : fn get_state(request: &Request<Body>) -> &HttpState {
87 0 : request
88 0 : .data::<Arc<HttpState>>()
89 0 : .expect("unknown state type")
90 0 : .as_ref()
91 0 : }
92 :
93 : /// Rate limits tenant requests.
94 : ///
95 : /// TODO: this should be a request middleware, but requires us to extract the tenant ID from
96 : /// different URLs in a systematic way.
97 : ///
98 : /// TODO: consider returning a 429 response if these start piling up.
99 0 : async fn maybe_rate_limit(request: &Request<Body>, tenant_id: TenantId) {
100 0 : // Check if the tenant should be rate-limited.
101 0 : let rate_limiter = &get_state(request).rate_limiter;
102 0 : if rate_limiter.check_key(&tenant_id).is_ok() {
103 0 : return;
104 0 : }
105 0 :
106 0 : // Measure the rate limiting delay.
107 0 : let _timer = METRICS_REGISTRY
108 0 : .metrics_group
109 0 : .storage_controller_http_request_rate_limited
110 0 : .start_timer();
111 :
112 : // Log rate limited tenants once every 10 seconds.
113 : static LOG_RATE_LIMITER: LazyLock<governor::DefaultKeyedRateLimiter<TenantId>> =
114 0 : LazyLock::new(|| {
115 0 : let quota = governor::Quota::with_period(Duration::from_secs(10)).unwrap();
116 0 : governor::RateLimiter::keyed(quota)
117 0 : });
118 :
119 0 : if LOG_RATE_LIMITER.check_key(&tenant_id).is_ok() {
120 0 : warn!("tenant {tenant_id} is rate limited")
121 0 : }
122 :
123 : // Wait for quota.
124 0 : rate_limiter.until_key_ready(&tenant_id).await;
125 0 : }
126 :
127 : /// Pageserver calls into this on startup, to learn which tenants it should attach
128 0 : async fn handle_re_attach(req: Request<Body>) -> Result<Response<Body>, ApiError> {
129 0 : check_permissions(&req, Scope::GenerationsApi)?;
130 :
131 0 : let mut req = match maybe_forward(req).await {
132 0 : ForwardOutcome::Forwarded(res) => {
133 0 : return res;
134 : }
135 0 : ForwardOutcome::NotForwarded(req) => req,
136 : };
137 :
138 0 : let reattach_req = json_request::<ReAttachRequest>(&mut req).await?;
139 0 : let state = get_state(&req);
140 0 : json_response(StatusCode::OK, state.service.re_attach(reattach_req).await?)
141 0 : }
142 :
143 : /// Pageserver calls into this before doing deletions, to confirm that it still
144 : /// holds the latest generation for the tenants with deletions enqueued
145 0 : async fn handle_validate(req: Request<Body>) -> Result<Response<Body>, ApiError> {
146 0 : check_permissions(&req, Scope::GenerationsApi)?;
147 :
148 0 : let mut req = match maybe_forward(req).await {
149 0 : ForwardOutcome::Forwarded(res) => {
150 0 : return res;
151 : }
152 0 : ForwardOutcome::NotForwarded(req) => req,
153 : };
154 :
155 0 : let validate_req = json_request::<ValidateRequest>(&mut req).await?;
156 0 : let state = get_state(&req);
157 0 : json_response(StatusCode::OK, state.service.validate(validate_req).await?)
158 0 : }
159 :
160 0 : async fn handle_get_timeline_import_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
161 0 : check_permissions(&req, Scope::GenerationsApi)?;
162 :
163 0 : let mut req = match maybe_forward(req).await {
164 0 : ForwardOutcome::Forwarded(res) => {
165 0 : return res;
166 : }
167 0 : ForwardOutcome::NotForwarded(req) => req,
168 : };
169 :
170 0 : let get_req = json_request::<TimelineImportStatusRequest>(&mut req).await?;
171 :
172 0 : let state = get_state(&req);
173 0 :
174 0 : json_response(
175 0 : StatusCode::OK,
176 0 : state
177 0 : .service
178 0 : .handle_timeline_shard_import_progress(get_req)
179 0 : .await?,
180 : )
181 0 : }
182 :
183 0 : async fn handle_put_timeline_import_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
184 0 : check_permissions(&req, Scope::GenerationsApi)?;
185 :
186 0 : let mut req = match maybe_forward(req).await {
187 0 : ForwardOutcome::Forwarded(res) => {
188 0 : return res;
189 : }
190 0 : ForwardOutcome::NotForwarded(req) => req,
191 : };
192 :
193 0 : let put_req = json_request::<PutTimelineImportStatusRequest>(&mut req).await?;
194 :
195 0 : let state = get_state(&req);
196 0 : json_response(
197 0 : StatusCode::OK,
198 0 : state
199 0 : .service
200 0 : .handle_timeline_shard_import_progress_upcall(put_req)
201 0 : .await?,
202 : )
203 0 : }
204 :
205 : /// Call into this before attaching a tenant to a pageserver, to acquire a generation number
206 : /// (in the real control plane this is unnecessary, because the same program is managing
207 : /// generation numbers and doing attachments).
208 0 : async fn handle_attach_hook(req: Request<Body>) -> Result<Response<Body>, ApiError> {
209 0 : check_permissions(&req, Scope::Admin)?;
210 :
211 0 : let mut req = match maybe_forward(req).await {
212 0 : ForwardOutcome::Forwarded(res) => {
213 0 : return res;
214 : }
215 0 : ForwardOutcome::NotForwarded(req) => req,
216 : };
217 :
218 0 : let attach_req = json_request::<AttachHookRequest>(&mut req).await?;
219 0 : let state = get_state(&req);
220 0 :
221 0 : json_response(
222 0 : StatusCode::OK,
223 0 : state
224 0 : .service
225 0 : .attach_hook(attach_req)
226 0 : .await
227 0 : .map_err(ApiError::InternalServerError)?,
228 : )
229 0 : }
230 :
231 0 : async fn handle_inspect(req: Request<Body>) -> Result<Response<Body>, ApiError> {
232 0 : check_permissions(&req, Scope::Admin)?;
233 :
234 0 : let mut req = match maybe_forward(req).await {
235 0 : ForwardOutcome::Forwarded(res) => {
236 0 : return res;
237 : }
238 0 : ForwardOutcome::NotForwarded(req) => req,
239 : };
240 :
241 0 : let inspect_req = json_request::<InspectRequest>(&mut req).await?;
242 :
243 0 : let state = get_state(&req);
244 0 :
245 0 : json_response(StatusCode::OK, state.service.inspect(inspect_req))
246 0 : }
247 :
248 0 : async fn handle_tenant_create(
249 0 : service: Arc<Service>,
250 0 : req: Request<Body>,
251 0 : ) -> Result<Response<Body>, ApiError> {
252 0 : check_permissions(&req, Scope::PageServerApi)?;
253 :
254 0 : let mut req = match maybe_forward(req).await {
255 0 : ForwardOutcome::Forwarded(res) => {
256 0 : return res;
257 : }
258 0 : ForwardOutcome::NotForwarded(req) => req,
259 : };
260 :
261 0 : let create_req = json_request::<TenantCreateRequest>(&mut req).await?;
262 :
263 : json_response(
264 : StatusCode::CREATED,
265 0 : service.tenant_create(create_req).await?,
266 : )
267 0 : }
268 :
269 0 : async fn handle_tenant_location_config(
270 0 : service: Arc<Service>,
271 0 : req: Request<Body>,
272 0 : ) -> Result<Response<Body>, ApiError> {
273 0 : let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
274 0 : check_permissions(&req, Scope::PageServerApi)?;
275 :
276 0 : let mut req = match maybe_forward(req).await {
277 0 : ForwardOutcome::Forwarded(res) => {
278 0 : return res;
279 : }
280 0 : ForwardOutcome::NotForwarded(req) => req,
281 : };
282 :
283 0 : let config_req = json_request::<TenantLocationConfigRequest>(&mut req).await?;
284 : json_response(
285 : StatusCode::OK,
286 0 : service
287 0 : .tenant_location_config(tenant_shard_id, config_req)
288 0 : .await?,
289 : )
290 0 : }
291 :
292 0 : async fn handle_tenant_config_patch(
293 0 : service: Arc<Service>,
294 0 : req: Request<Body>,
295 0 : ) -> Result<Response<Body>, ApiError> {
296 0 : check_permissions(&req, Scope::PageServerApi)?;
297 :
298 0 : let mut req = match maybe_forward(req).await {
299 0 : ForwardOutcome::Forwarded(res) => {
300 0 : return res;
301 : }
302 0 : ForwardOutcome::NotForwarded(req) => req,
303 : };
304 :
305 0 : let config_req = json_request::<TenantConfigPatchRequest>(&mut req).await?;
306 :
307 : json_response(
308 : StatusCode::OK,
309 0 : service.tenant_config_patch(config_req).await?,
310 : )
311 0 : }
312 :
313 0 : async fn handle_tenant_config_set(
314 0 : service: Arc<Service>,
315 0 : req: Request<Body>,
316 0 : ) -> Result<Response<Body>, ApiError> {
317 0 : check_permissions(&req, Scope::PageServerApi)?;
318 :
319 0 : let mut req = match maybe_forward(req).await {
320 0 : ForwardOutcome::Forwarded(res) => {
321 0 : return res;
322 : }
323 0 : ForwardOutcome::NotForwarded(req) => req,
324 : };
325 :
326 0 : let config_req = json_request::<TenantConfigRequest>(&mut req).await?;
327 :
328 0 : json_response(StatusCode::OK, service.tenant_config_set(config_req).await?)
329 0 : }
330 :
331 0 : async fn handle_tenant_config_get(
332 0 : service: Arc<Service>,
333 0 : req: Request<Body>,
334 0 : ) -> Result<Response<Body>, ApiError> {
335 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
336 0 : check_permissions(&req, Scope::PageServerApi)?;
337 0 : maybe_rate_limit(&req, tenant_id).await;
338 :
339 0 : match maybe_forward(req).await {
340 0 : ForwardOutcome::Forwarded(res) => {
341 0 : return res;
342 : }
343 0 : ForwardOutcome::NotForwarded(_req) => {}
344 0 : };
345 0 :
346 0 : json_response(StatusCode::OK, service.tenant_config_get(tenant_id)?)
347 0 : }
348 :
349 0 : async fn handle_tenant_time_travel_remote_storage(
350 0 : service: Arc<Service>,
351 0 : req: Request<Body>,
352 0 : ) -> Result<Response<Body>, ApiError> {
353 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
354 0 : check_permissions(&req, Scope::PageServerApi)?;
355 0 : maybe_rate_limit(&req, tenant_id).await;
356 :
357 0 : let mut req = match maybe_forward(req).await {
358 0 : ForwardOutcome::Forwarded(res) => {
359 0 : return res;
360 : }
361 0 : ForwardOutcome::NotForwarded(req) => req,
362 : };
363 :
364 0 : let time_travel_req = json_request::<TenantTimeTravelRequest>(&mut req).await?;
365 :
366 0 : let timestamp_raw = must_get_query_param(&req, "travel_to")?;
367 0 : let _timestamp = humantime::parse_rfc3339(×tamp_raw).map_err(|_e| {
368 0 : ApiError::BadRequest(anyhow::anyhow!(
369 0 : "Invalid time for travel_to: {timestamp_raw:?}"
370 0 : ))
371 0 : })?;
372 :
373 0 : let done_if_after_raw = must_get_query_param(&req, "done_if_after")?;
374 0 : let _done_if_after = humantime::parse_rfc3339(&done_if_after_raw).map_err(|_e| {
375 0 : ApiError::BadRequest(anyhow::anyhow!(
376 0 : "Invalid time for done_if_after: {done_if_after_raw:?}"
377 0 : ))
378 0 : })?;
379 :
380 0 : service
381 0 : .tenant_time_travel_remote_storage(
382 0 : &time_travel_req,
383 0 : tenant_id,
384 0 : timestamp_raw,
385 0 : done_if_after_raw,
386 0 : )
387 0 : .await?;
388 0 : json_response(StatusCode::OK, ())
389 0 : }
390 :
391 0 : fn map_reqwest_hyper_status(status: reqwest::StatusCode) -> Result<hyper::StatusCode, ApiError> {
392 0 : hyper::StatusCode::from_u16(status.as_u16())
393 0 : .context("invalid status code")
394 0 : .map_err(ApiError::InternalServerError)
395 0 : }
396 :
397 0 : async fn handle_tenant_secondary_download(
398 0 : service: Arc<Service>,
399 0 : req: Request<Body>,
400 0 : ) -> Result<Response<Body>, ApiError> {
401 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
402 0 : let wait = parse_query_param(&req, "wait_ms")?.map(Duration::from_millis);
403 0 : maybe_rate_limit(&req, tenant_id).await;
404 :
405 0 : match maybe_forward(req).await {
406 0 : ForwardOutcome::Forwarded(res) => {
407 0 : return res;
408 : }
409 0 : ForwardOutcome::NotForwarded(_req) => {}
410 : };
411 :
412 0 : let (status, progress) = service.tenant_secondary_download(tenant_id, wait).await?;
413 0 : json_response(map_reqwest_hyper_status(status)?, progress)
414 0 : }
415 :
416 0 : async fn handle_tenant_delete(
417 0 : service: Arc<Service>,
418 0 : req: Request<Body>,
419 0 : ) -> Result<Response<Body>, ApiError> {
420 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
421 0 : check_permissions(&req, Scope::PageServerApi)?;
422 0 : maybe_rate_limit(&req, tenant_id).await;
423 :
424 0 : match maybe_forward(req).await {
425 0 : ForwardOutcome::Forwarded(res) => {
426 0 : return res;
427 : }
428 0 : ForwardOutcome::NotForwarded(_req) => {}
429 : };
430 :
431 0 : let status_code = service
432 0 : .tenant_delete(tenant_id)
433 0 : .await
434 0 : .and_then(map_reqwest_hyper_status)?;
435 :
436 0 : if status_code == StatusCode::NOT_FOUND {
437 : // The pageserver uses 404 for successful deletion, but we use 200
438 0 : json_response(StatusCode::OK, ())
439 : } else {
440 0 : json_response(status_code, ())
441 : }
442 0 : }
443 :
444 0 : async fn handle_tenant_timeline_create(
445 0 : service: Arc<Service>,
446 0 : req: Request<Body>,
447 0 : ) -> Result<Response<Body>, ApiError> {
448 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
449 0 : check_permissions(&req, Scope::PageServerApi)?;
450 0 : maybe_rate_limit(&req, tenant_id).await;
451 :
452 0 : let mut req = match maybe_forward(req).await {
453 0 : ForwardOutcome::Forwarded(res) => {
454 0 : return res;
455 : }
456 0 : ForwardOutcome::NotForwarded(req) => req,
457 : };
458 :
459 0 : let create_req = json_request::<TimelineCreateRequest>(&mut req).await?;
460 : json_response(
461 : StatusCode::CREATED,
462 0 : service
463 0 : .tenant_timeline_create(tenant_id, create_req)
464 0 : .await?,
465 : )
466 0 : }
467 :
468 0 : async fn handle_tenant_timeline_delete(
469 0 : service: Arc<Service>,
470 0 : req: Request<Body>,
471 0 : ) -> Result<Response<Body>, ApiError> {
472 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
473 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
474 :
475 0 : check_permissions(&req, Scope::PageServerApi)?;
476 0 : maybe_rate_limit(&req, tenant_id).await;
477 :
478 0 : match maybe_forward(req).await {
479 0 : ForwardOutcome::Forwarded(res) => {
480 0 : return res;
481 : }
482 0 : ForwardOutcome::NotForwarded(_req) => {}
483 0 : };
484 0 :
485 0 : service
486 0 : .maybe_delete_timeline_import(tenant_id, timeline_id)
487 0 : .await?;
488 :
489 : // For timeline deletions, which both implement an "initially return 202, then 404 once
490 : // we're done" semantic, we wrap with a retry loop to expose a simpler API upstream.
491 0 : async fn deletion_wrapper<R, F>(service: Arc<Service>, f: F) -> Result<Response<Body>, ApiError>
492 0 : where
493 0 : R: std::future::Future<Output = Result<StatusCode, ApiError>> + Send + 'static,
494 0 : F: Fn(Arc<Service>) -> R + Send + Sync + 'static,
495 0 : {
496 : // On subsequent retries, wait longer.
497 : // Enable callers with a 25 second request timeout to reliably get a response
498 : const MAX_WAIT: Duration = Duration::from_secs(25);
499 : const MAX_RETRY_PERIOD: Duration = Duration::from_secs(5);
500 :
501 0 : let started_at = Instant::now();
502 0 :
503 0 : // To keep deletion reasonably snappy for small tenants, initially check after 1 second if deletion
504 0 : // completed.
505 0 : let mut retry_period = Duration::from_secs(1);
506 :
507 : loop {
508 0 : let status = f(service.clone()).await?;
509 0 : match status {
510 : StatusCode::ACCEPTED => {
511 0 : tracing::info!("Deletion accepted, waiting to try again...");
512 0 : tokio::time::sleep(retry_period).await;
513 0 : retry_period = MAX_RETRY_PERIOD;
514 : }
515 : StatusCode::CONFLICT => {
516 0 : tracing::info!("Deletion already in progress, waiting to try again...");
517 0 : tokio::time::sleep(retry_period).await;
518 : }
519 : StatusCode::NOT_FOUND => {
520 0 : tracing::info!("Deletion complete");
521 0 : return json_response(StatusCode::OK, ());
522 : }
523 : _ => {
524 0 : tracing::warn!("Unexpected status {status}");
525 0 : return json_response(status, ());
526 : }
527 : }
528 :
529 0 : let now = Instant::now();
530 0 : if now + retry_period > started_at + MAX_WAIT {
531 0 : tracing::info!("Deletion timed out waiting for 404");
532 : // REQUEST_TIMEOUT would be more appropriate, but CONFLICT is already part of
533 : // the pageserver's swagger definition for this endpoint, and has the same desired
534 : // effect of causing the control plane to retry later.
535 0 : return json_response(StatusCode::CONFLICT, ());
536 0 : }
537 : }
538 0 : }
539 :
540 0 : deletion_wrapper(service, move |service| async move {
541 0 : service
542 0 : .tenant_timeline_delete(tenant_id, timeline_id)
543 0 : .await
544 0 : .and_then(map_reqwest_hyper_status)
545 0 : })
546 0 : .await
547 0 : }
548 :
549 0 : async fn handle_tenant_timeline_archival_config(
550 0 : service: Arc<Service>,
551 0 : req: Request<Body>,
552 0 : ) -> Result<Response<Body>, ApiError> {
553 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
554 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
555 :
556 0 : check_permissions(&req, Scope::PageServerApi)?;
557 0 : maybe_rate_limit(&req, tenant_id).await;
558 :
559 0 : let mut req = match maybe_forward(req).await {
560 0 : ForwardOutcome::Forwarded(res) => {
561 0 : return res;
562 : }
563 0 : ForwardOutcome::NotForwarded(req) => req,
564 : };
565 :
566 0 : let create_req = json_request::<TimelineArchivalConfigRequest>(&mut req).await?;
567 :
568 0 : service
569 0 : .tenant_timeline_archival_config(tenant_id, timeline_id, create_req)
570 0 : .await?;
571 :
572 0 : json_response(StatusCode::OK, ())
573 0 : }
574 :
575 0 : async fn handle_tenant_timeline_detach_ancestor(
576 0 : service: Arc<Service>,
577 0 : req: Request<Body>,
578 0 : ) -> Result<Response<Body>, ApiError> {
579 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
580 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
581 0 : let behavior: Option<DetachBehavior> = parse_query_param(&req, "detach_behavior")?;
582 :
583 0 : check_permissions(&req, Scope::PageServerApi)?;
584 0 : maybe_rate_limit(&req, tenant_id).await;
585 :
586 0 : match maybe_forward(req).await {
587 0 : ForwardOutcome::Forwarded(res) => {
588 0 : return res;
589 : }
590 0 : ForwardOutcome::NotForwarded(_req) => {}
591 : };
592 :
593 0 : let res = service
594 0 : .tenant_timeline_detach_ancestor(tenant_id, timeline_id, behavior)
595 0 : .await?;
596 :
597 0 : json_response(StatusCode::OK, res)
598 0 : }
599 :
600 0 : async fn handle_tenant_timeline_block_unblock_gc(
601 0 : service: Arc<Service>,
602 0 : req: Request<Body>,
603 0 : dir: BlockUnblock,
604 0 : ) -> Result<Response<Body>, ApiError> {
605 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
606 0 : check_permissions(&req, Scope::PageServerApi)?;
607 0 : maybe_rate_limit(&req, tenant_id).await;
608 :
609 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
610 :
611 0 : service
612 0 : .tenant_timeline_block_unblock_gc(tenant_id, timeline_id, dir)
613 0 : .await?;
614 :
615 0 : json_response(StatusCode::OK, ())
616 0 : }
617 :
618 0 : async fn handle_tenant_timeline_download_heatmap_layers(
619 0 : service: Arc<Service>,
620 0 : req: Request<Body>,
621 0 : ) -> Result<Response<Body>, ApiError> {
622 0 : let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
623 :
624 0 : check_permissions(&req, Scope::PageServerApi)?;
625 0 : maybe_rate_limit(&req, tenant_shard_id.tenant_id).await;
626 :
627 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
628 0 : let concurrency: Option<usize> = parse_query_param(&req, "concurrency")?;
629 0 : let recurse = parse_query_param(&req, "recurse")?.unwrap_or(false);
630 0 :
631 0 : service
632 0 : .tenant_timeline_download_heatmap_layers(tenant_shard_id, timeline_id, concurrency, recurse)
633 0 : .await?;
634 :
635 0 : json_response(StatusCode::OK, ())
636 0 : }
637 :
638 0 : async fn handle_tenant_timeline_lsn_lease(
639 0 : service: Arc<Service>,
640 0 : req: Request<Body>,
641 0 : ) -> Result<Response<Body>, ApiError> {
642 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
643 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
644 :
645 0 : check_permissions(&req, Scope::PageServerApi)?;
646 0 : maybe_rate_limit(&req, tenant_id).await;
647 :
648 0 : let mut req = match maybe_forward(req).await {
649 0 : ForwardOutcome::Forwarded(res) => {
650 0 : return res;
651 : }
652 0 : ForwardOutcome::NotForwarded(req) => req,
653 : };
654 :
655 0 : let lsn_lease_request = json_request::<LsnLeaseRequest>(&mut req).await?;
656 :
657 0 : service
658 0 : .tenant_timeline_lsn_lease(tenant_id, timeline_id, lsn_lease_request.lsn)
659 0 : .await?;
660 :
661 0 : json_response(StatusCode::OK, ())
662 0 : }
663 :
664 : // For metric labels where we would like to include the approximate path, but exclude high-cardinality fields like query parameters
665 : // and tenant/timeline IDs. Since we are proxying to arbitrary paths, we don't have routing templates to
666 : // compare to, so we can just filter out our well known ID format with regexes.
667 3 : fn path_without_ids(path: &str) -> String {
668 : static ID_REGEX: std::sync::OnceLock<regex::Regex> = std::sync::OnceLock::new();
669 3 : ID_REGEX
670 3 : .get_or_init(|| regex::Regex::new(r"([0-9a-fA-F]{32}(-[0-9]{4})?|\?.*)").unwrap())
671 3 : .replace_all(path, "")
672 3 : .to_string()
673 3 : }
674 :
675 0 : async fn handle_tenant_timeline_passthrough(
676 0 : service: Arc<Service>,
677 0 : req: Request<Body>,
678 0 : ) -> Result<Response<Body>, ApiError> {
679 0 : let tenant_or_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
680 0 : check_permissions(&req, Scope::PageServerApi)?;
681 0 : maybe_rate_limit(&req, tenant_or_shard_id.tenant_id).await;
682 :
683 0 : let req = match maybe_forward(req).await {
684 0 : ForwardOutcome::Forwarded(res) => {
685 0 : return res;
686 : }
687 0 : ForwardOutcome::NotForwarded(req) => req,
688 : };
689 :
690 0 : let Some(path) = req.uri().path_and_query() else {
691 : // This should never happen, our request router only calls us if there is a path
692 0 : return Err(ApiError::BadRequest(anyhow::anyhow!("Missing path")));
693 : };
694 :
695 0 : let method = match *req.method() {
696 0 : hyper::Method::GET => reqwest::Method::GET,
697 0 : hyper::Method::POST => reqwest::Method::POST,
698 0 : hyper::Method::PUT => reqwest::Method::PUT,
699 0 : hyper::Method::DELETE => reqwest::Method::DELETE,
700 0 : hyper::Method::PATCH => reqwest::Method::PATCH,
701 0 : _ => return Err(ApiError::BadRequest(anyhow::anyhow!("Unsupported method"))),
702 : };
703 :
704 0 : tracing::info!(
705 0 : "Proxying request for tenant {} ({})",
706 : tenant_or_shard_id.tenant_id,
707 : path
708 : );
709 :
710 : // Find the node that holds shard zero
711 0 : let (node, tenant_shard_id) = if tenant_or_shard_id.is_unsharded() {
712 0 : service
713 0 : .tenant_shard0_node(tenant_or_shard_id.tenant_id)
714 0 : .await?
715 : } else {
716 : (
717 0 : service.tenant_shard_node(tenant_or_shard_id).await?,
718 0 : tenant_or_shard_id,
719 : )
720 : };
721 :
722 : // Callers will always pass an unsharded tenant ID. Before proxying, we must
723 : // rewrite this to a shard-aware shard zero ID.
724 0 : let path = format!("{}", path);
725 0 : let tenant_str = tenant_or_shard_id.tenant_id.to_string();
726 0 : let tenant_shard_str = format!("{}", tenant_shard_id);
727 0 : let path = path.replace(&tenant_str, &tenant_shard_str);
728 0 :
729 0 : let latency = &METRICS_REGISTRY
730 0 : .metrics_group
731 0 : .storage_controller_passthrough_request_latency;
732 0 :
733 0 : let path_label = path_without_ids(&path)
734 0 : .split('/')
735 0 : .filter(|token| !token.is_empty())
736 0 : .collect::<Vec<_>>()
737 0 : .join("_");
738 0 : let labels = PageserverRequestLabelGroup {
739 0 : pageserver_id: &node.get_id().to_string(),
740 0 : path: &path_label,
741 0 : method: crate::metrics::Method::Get,
742 0 : };
743 0 :
744 0 : let _timer = latency.start_timer(labels.clone());
745 0 :
746 0 : let client = mgmt_api::Client::new(
747 0 : service.get_http_client().clone(),
748 0 : node.base_url(),
749 0 : service.get_config().pageserver_jwt_token.as_deref(),
750 0 : );
751 0 : let resp = client.op_raw(method, path).await.map_err(|e|
752 : // We return 503 here because if we can't successfully send a request to the pageserver,
753 : // either we aren't available or the pageserver is unavailable.
754 0 : ApiError::ResourceUnavailable(format!("Error sending pageserver API request to {node}: {e}").into()))?;
755 :
756 0 : if !resp.status().is_success() {
757 0 : let error_counter = &METRICS_REGISTRY
758 0 : .metrics_group
759 0 : .storage_controller_passthrough_request_error;
760 0 : error_counter.inc(labels);
761 0 : }
762 :
763 : // Transform 404 into 503 if we raced with a migration
764 0 : if resp.status() == reqwest::StatusCode::NOT_FOUND {
765 : // Look up node again: if we migrated it will be different
766 0 : let new_node = service.tenant_shard_node(tenant_shard_id).await?;
767 0 : if new_node.get_id() != node.get_id() {
768 : // Rather than retry here, send the client a 503 to prompt a retry: this matches
769 : // the pageserver's use of 503, and all clients calling this API should retry on 503.
770 0 : return Err(ApiError::ResourceUnavailable(
771 0 : format!("Pageserver {node} returned 404, was migrated to {new_node}").into(),
772 0 : ));
773 0 : }
774 0 : }
775 :
776 : // We have a reqest::Response, would like a http::Response
777 0 : let mut builder = hyper::Response::builder().status(map_reqwest_hyper_status(resp.status())?);
778 0 : for (k, v) in resp.headers() {
779 0 : builder = builder.header(k.as_str(), v.as_bytes());
780 0 : }
781 :
782 0 : let response = builder
783 0 : .body(Body::wrap_stream(resp.bytes_stream()))
784 0 : .map_err(|e| ApiError::InternalServerError(e.into()))?;
785 :
786 0 : Ok(response)
787 0 : }
788 :
789 0 : async fn handle_tenant_locate(
790 0 : service: Arc<Service>,
791 0 : req: Request<Body>,
792 0 : ) -> Result<Response<Body>, ApiError> {
793 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
794 :
795 0 : check_permissions(&req, Scope::Admin)?;
796 : // NB: don't rate limit: admin operation.
797 :
798 0 : match maybe_forward(req).await {
799 0 : ForwardOutcome::Forwarded(res) => {
800 0 : return res;
801 : }
802 0 : ForwardOutcome::NotForwarded(_req) => {}
803 0 : };
804 0 :
805 0 : json_response(StatusCode::OK, service.tenant_locate(tenant_id)?)
806 0 : }
807 :
808 0 : async fn handle_tenant_describe(
809 0 : service: Arc<Service>,
810 0 : req: Request<Body>,
811 0 : ) -> Result<Response<Body>, ApiError> {
812 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
813 0 : check_permissions(&req, Scope::Scrubber)?;
814 : // NB: don't rate limit: scrubber operation.
815 :
816 0 : match maybe_forward(req).await {
817 0 : ForwardOutcome::Forwarded(res) => {
818 0 : return res;
819 : }
820 0 : ForwardOutcome::NotForwarded(_req) => {}
821 0 : };
822 0 :
823 0 : json_response(StatusCode::OK, service.tenant_describe(tenant_id)?)
824 0 : }
825 :
826 0 : async fn handle_tenant_list(
827 0 : service: Arc<Service>,
828 0 : req: Request<Body>,
829 0 : ) -> Result<Response<Body>, ApiError> {
830 0 : check_permissions(&req, Scope::Admin)?;
831 :
832 0 : let limit: Option<usize> = parse_query_param(&req, "limit")?;
833 0 : let start_after: Option<TenantId> = parse_query_param(&req, "start_after")?;
834 0 : tracing::info!("start_after: {:?}", start_after);
835 :
836 0 : match maybe_forward(req).await {
837 0 : ForwardOutcome::Forwarded(res) => {
838 0 : return res;
839 : }
840 0 : ForwardOutcome::NotForwarded(_req) => {}
841 0 : };
842 0 :
843 0 : json_response(StatusCode::OK, service.tenant_list(limit, start_after))
844 0 : }
845 :
846 0 : async fn handle_node_register(req: Request<Body>) -> Result<Response<Body>, ApiError> {
847 0 : check_permissions(&req, Scope::Infra)?;
848 :
849 0 : let mut req = match maybe_forward(req).await {
850 0 : ForwardOutcome::Forwarded(res) => {
851 0 : return res;
852 : }
853 0 : ForwardOutcome::NotForwarded(req) => req,
854 : };
855 :
856 0 : let register_req = json_request::<NodeRegisterRequest>(&mut req).await?;
857 0 : let state = get_state(&req);
858 0 : state.service.node_register(register_req).await?;
859 0 : json_response(StatusCode::OK, ())
860 0 : }
861 :
862 0 : async fn handle_node_list(req: Request<Body>) -> Result<Response<Body>, ApiError> {
863 0 : check_permissions(&req, Scope::Infra)?;
864 :
865 0 : let req = match maybe_forward(req).await {
866 0 : ForwardOutcome::Forwarded(res) => {
867 0 : return res;
868 : }
869 0 : ForwardOutcome::NotForwarded(req) => req,
870 0 : };
871 0 :
872 0 : let state = get_state(&req);
873 0 : let mut nodes = state.service.node_list().await?;
874 0 : nodes.sort_by_key(|n| n.get_id());
875 0 : let api_nodes = nodes.into_iter().map(|n| n.describe()).collect::<Vec<_>>();
876 0 :
877 0 : json_response(StatusCode::OK, api_nodes)
878 0 : }
879 :
880 0 : async fn handle_node_drop(req: Request<Body>) -> Result<Response<Body>, ApiError> {
881 0 : check_permissions(&req, Scope::Admin)?;
882 :
883 0 : let req = match maybe_forward(req).await {
884 0 : ForwardOutcome::Forwarded(res) => {
885 0 : return res;
886 : }
887 0 : ForwardOutcome::NotForwarded(req) => req,
888 0 : };
889 0 :
890 0 : let state = get_state(&req);
891 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
892 0 : json_response(StatusCode::OK, state.service.node_drop(node_id).await?)
893 0 : }
894 :
895 0 : async fn handle_node_delete(req: Request<Body>) -> Result<Response<Body>, ApiError> {
896 0 : check_permissions(&req, Scope::Admin)?;
897 :
898 0 : let req = match maybe_forward(req).await {
899 0 : ForwardOutcome::Forwarded(res) => {
900 0 : return res;
901 : }
902 0 : ForwardOutcome::NotForwarded(req) => req,
903 0 : };
904 0 :
905 0 : let state = get_state(&req);
906 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
907 0 : json_response(StatusCode::OK, state.service.node_delete(node_id).await?)
908 0 : }
909 :
910 0 : async fn handle_tombstone_list(req: Request<Body>) -> Result<Response<Body>, ApiError> {
911 0 : check_permissions(&req, Scope::Admin)?;
912 :
913 0 : let req = match maybe_forward(req).await {
914 0 : ForwardOutcome::Forwarded(res) => {
915 0 : return res;
916 : }
917 0 : ForwardOutcome::NotForwarded(req) => req,
918 0 : };
919 0 :
920 0 : let state = get_state(&req);
921 0 : let mut nodes = state.service.tombstone_list().await?;
922 0 : nodes.sort_by_key(|n| n.get_id());
923 0 : let api_nodes = nodes.into_iter().map(|n| n.describe()).collect::<Vec<_>>();
924 0 :
925 0 : json_response(StatusCode::OK, api_nodes)
926 0 : }
927 :
928 0 : async fn handle_tombstone_delete(req: Request<Body>) -> Result<Response<Body>, ApiError> {
929 0 : check_permissions(&req, Scope::Admin)?;
930 :
931 0 : let req = match maybe_forward(req).await {
932 0 : ForwardOutcome::Forwarded(res) => {
933 0 : return res;
934 : }
935 0 : ForwardOutcome::NotForwarded(req) => req,
936 0 : };
937 0 :
938 0 : let state = get_state(&req);
939 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
940 : json_response(
941 : StatusCode::OK,
942 0 : state.service.tombstone_delete(node_id).await?,
943 : )
944 0 : }
945 :
946 0 : async fn handle_node_configure(req: Request<Body>) -> Result<Response<Body>, ApiError> {
947 0 : check_permissions(&req, Scope::Admin)?;
948 :
949 0 : let mut req = match maybe_forward(req).await {
950 0 : ForwardOutcome::Forwarded(res) => {
951 0 : return res;
952 : }
953 0 : ForwardOutcome::NotForwarded(req) => req,
954 : };
955 :
956 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
957 0 : let config_req = json_request::<NodeConfigureRequest>(&mut req).await?;
958 0 : if node_id != config_req.node_id {
959 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
960 0 : "Path and body node_id differ"
961 0 : )));
962 0 : }
963 0 : let state = get_state(&req);
964 0 :
965 0 : json_response(
966 0 : StatusCode::OK,
967 0 : state
968 0 : .service
969 0 : .external_node_configure(
970 0 : config_req.node_id,
971 0 : config_req.availability.map(NodeAvailability::from),
972 0 : config_req.scheduling,
973 0 : )
974 0 : .await?,
975 : )
976 0 : }
977 :
978 0 : async fn handle_node_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
979 0 : check_permissions(&req, Scope::Infra)?;
980 :
981 0 : let req = match maybe_forward(req).await {
982 0 : ForwardOutcome::Forwarded(res) => {
983 0 : return res;
984 : }
985 0 : ForwardOutcome::NotForwarded(req) => req,
986 0 : };
987 0 :
988 0 : let state = get_state(&req);
989 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
990 :
991 0 : let node_status = state.service.get_node(node_id).await?.describe();
992 0 :
993 0 : json_response(StatusCode::OK, node_status)
994 0 : }
995 :
996 0 : async fn handle_node_shards(req: Request<Body>) -> Result<Response<Body>, ApiError> {
997 0 : check_permissions(&req, Scope::Admin)?;
998 :
999 0 : let state = get_state(&req);
1000 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
1001 :
1002 0 : let node_status = state.service.get_node_shards(node_id).await?;
1003 :
1004 0 : json_response(StatusCode::OK, node_status)
1005 0 : }
1006 :
1007 0 : async fn handle_get_leader(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1008 0 : check_permissions(&req, Scope::Admin)?;
1009 :
1010 0 : let req = match maybe_forward(req).await {
1011 0 : ForwardOutcome::Forwarded(res) => {
1012 0 : return res;
1013 : }
1014 0 : ForwardOutcome::NotForwarded(req) => req,
1015 0 : };
1016 0 :
1017 0 : let state = get_state(&req);
1018 0 : let leader = state.service.get_leader().await.map_err(|err| {
1019 0 : ApiError::InternalServerError(anyhow::anyhow!(
1020 0 : "Failed to read leader from database: {err}"
1021 0 : ))
1022 0 : })?;
1023 :
1024 0 : json_response(StatusCode::OK, leader)
1025 0 : }
1026 :
1027 0 : async fn handle_node_drain(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1028 0 : check_permissions(&req, Scope::Infra)?;
1029 :
1030 0 : let req = match maybe_forward(req).await {
1031 0 : ForwardOutcome::Forwarded(res) => {
1032 0 : return res;
1033 : }
1034 0 : ForwardOutcome::NotForwarded(req) => req,
1035 0 : };
1036 0 :
1037 0 : let state = get_state(&req);
1038 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
1039 :
1040 0 : state.service.start_node_drain(node_id).await?;
1041 :
1042 0 : json_response(StatusCode::ACCEPTED, ())
1043 0 : }
1044 :
1045 0 : async fn handle_cancel_node_drain(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1046 0 : check_permissions(&req, Scope::Infra)?;
1047 :
1048 0 : let req = match maybe_forward(req).await {
1049 0 : ForwardOutcome::Forwarded(res) => {
1050 0 : return res;
1051 : }
1052 0 : ForwardOutcome::NotForwarded(req) => req,
1053 0 : };
1054 0 :
1055 0 : let state = get_state(&req);
1056 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
1057 :
1058 0 : state.service.cancel_node_drain(node_id).await?;
1059 :
1060 0 : json_response(StatusCode::ACCEPTED, ())
1061 0 : }
1062 :
1063 0 : async fn handle_node_fill(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1064 0 : check_permissions(&req, Scope::Infra)?;
1065 :
1066 0 : let req = match maybe_forward(req).await {
1067 0 : ForwardOutcome::Forwarded(res) => {
1068 0 : return res;
1069 : }
1070 0 : ForwardOutcome::NotForwarded(req) => req,
1071 0 : };
1072 0 :
1073 0 : let state = get_state(&req);
1074 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
1075 :
1076 0 : state.service.start_node_fill(node_id).await?;
1077 :
1078 0 : json_response(StatusCode::ACCEPTED, ())
1079 0 : }
1080 :
1081 0 : async fn handle_cancel_node_fill(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1082 0 : check_permissions(&req, Scope::Infra)?;
1083 :
1084 0 : let req = match maybe_forward(req).await {
1085 0 : ForwardOutcome::Forwarded(res) => {
1086 0 : return res;
1087 : }
1088 0 : ForwardOutcome::NotForwarded(req) => req,
1089 0 : };
1090 0 :
1091 0 : let state = get_state(&req);
1092 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
1093 :
1094 0 : state.service.cancel_node_fill(node_id).await?;
1095 :
1096 0 : json_response(StatusCode::ACCEPTED, ())
1097 0 : }
1098 :
1099 0 : async fn handle_safekeeper_list(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1100 0 : check_permissions(&req, Scope::Infra)?;
1101 :
1102 0 : let req = match maybe_forward(req).await {
1103 0 : ForwardOutcome::Forwarded(res) => {
1104 0 : return res;
1105 : }
1106 0 : ForwardOutcome::NotForwarded(req) => req,
1107 0 : };
1108 0 :
1109 0 : let state = get_state(&req);
1110 0 : let safekeepers = state.service.safekeepers_list().await?;
1111 0 : json_response(StatusCode::OK, safekeepers)
1112 0 : }
1113 :
1114 0 : async fn handle_metadata_health_update(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1115 0 : check_permissions(&req, Scope::Scrubber)?;
1116 :
1117 0 : let mut req = match maybe_forward(req).await {
1118 0 : ForwardOutcome::Forwarded(res) => {
1119 0 : return res;
1120 : }
1121 0 : ForwardOutcome::NotForwarded(req) => req,
1122 : };
1123 :
1124 0 : let update_req = json_request::<MetadataHealthUpdateRequest>(&mut req).await?;
1125 0 : let state = get_state(&req);
1126 0 :
1127 0 : state.service.metadata_health_update(update_req).await?;
1128 :
1129 0 : json_response(StatusCode::OK, MetadataHealthUpdateResponse {})
1130 0 : }
1131 :
1132 0 : async fn handle_metadata_health_list_unhealthy(
1133 0 : req: Request<Body>,
1134 0 : ) -> Result<Response<Body>, ApiError> {
1135 0 : check_permissions(&req, Scope::Admin)?;
1136 :
1137 0 : let req = match maybe_forward(req).await {
1138 0 : ForwardOutcome::Forwarded(res) => {
1139 0 : return res;
1140 : }
1141 0 : ForwardOutcome::NotForwarded(req) => req,
1142 0 : };
1143 0 :
1144 0 : let state = get_state(&req);
1145 0 : let unhealthy_tenant_shards = state.service.metadata_health_list_unhealthy().await?;
1146 :
1147 0 : json_response(
1148 0 : StatusCode::OK,
1149 0 : MetadataHealthListUnhealthyResponse {
1150 0 : unhealthy_tenant_shards,
1151 0 : },
1152 0 : )
1153 0 : }
1154 :
1155 0 : async fn handle_metadata_health_list_outdated(
1156 0 : req: Request<Body>,
1157 0 : ) -> Result<Response<Body>, ApiError> {
1158 0 : check_permissions(&req, Scope::Admin)?;
1159 :
1160 0 : let mut req = match maybe_forward(req).await {
1161 0 : ForwardOutcome::Forwarded(res) => {
1162 0 : return res;
1163 : }
1164 0 : ForwardOutcome::NotForwarded(req) => req,
1165 : };
1166 :
1167 0 : let list_outdated_req = json_request::<MetadataHealthListOutdatedRequest>(&mut req).await?;
1168 0 : let state = get_state(&req);
1169 0 : let health_records = state
1170 0 : .service
1171 0 : .metadata_health_list_outdated(list_outdated_req.not_scrubbed_for)
1172 0 : .await?;
1173 :
1174 0 : json_response(
1175 0 : StatusCode::OK,
1176 0 : MetadataHealthListOutdatedResponse { health_records },
1177 0 : )
1178 0 : }
1179 :
1180 0 : async fn handle_tenant_shard_split(
1181 0 : service: Arc<Service>,
1182 0 : req: Request<Body>,
1183 0 : ) -> Result<Response<Body>, ApiError> {
1184 0 : check_permissions(&req, Scope::Admin)?;
1185 : // NB: don't rate limit: admin operation.
1186 :
1187 0 : let mut req = match maybe_forward(req).await {
1188 0 : ForwardOutcome::Forwarded(res) => {
1189 0 : return res;
1190 : }
1191 0 : ForwardOutcome::NotForwarded(req) => req,
1192 : };
1193 :
1194 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
1195 0 : let split_req = json_request::<TenantShardSplitRequest>(&mut req).await?;
1196 :
1197 : json_response(
1198 : StatusCode::OK,
1199 0 : service.tenant_shard_split(tenant_id, split_req).await?,
1200 : )
1201 0 : }
1202 :
1203 0 : async fn handle_tenant_shard_migrate(
1204 0 : service: Arc<Service>,
1205 0 : req: Request<Body>,
1206 0 : ) -> Result<Response<Body>, ApiError> {
1207 0 : check_permissions(&req, Scope::Admin)?;
1208 : // NB: don't rate limit: admin operation.
1209 :
1210 0 : let mut req = match maybe_forward(req).await {
1211 0 : ForwardOutcome::Forwarded(res) => {
1212 0 : return res;
1213 : }
1214 0 : ForwardOutcome::NotForwarded(req) => req,
1215 : };
1216 :
1217 0 : let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
1218 0 : let migrate_req = json_request::<TenantShardMigrateRequest>(&mut req).await?;
1219 : json_response(
1220 : StatusCode::OK,
1221 0 : service
1222 0 : .tenant_shard_migrate(tenant_shard_id, migrate_req)
1223 0 : .await?,
1224 : )
1225 0 : }
1226 :
1227 0 : async fn handle_tenant_shard_migrate_secondary(
1228 0 : service: Arc<Service>,
1229 0 : req: Request<Body>,
1230 0 : ) -> Result<Response<Body>, ApiError> {
1231 0 : check_permissions(&req, Scope::Admin)?;
1232 : // NB: don't rate limit: admin operation.
1233 :
1234 0 : let mut req = match maybe_forward(req).await {
1235 0 : ForwardOutcome::Forwarded(res) => {
1236 0 : return res;
1237 : }
1238 0 : ForwardOutcome::NotForwarded(req) => req,
1239 : };
1240 :
1241 0 : let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
1242 0 : let migrate_req = json_request::<TenantShardMigrateRequest>(&mut req).await?;
1243 : json_response(
1244 : StatusCode::OK,
1245 0 : service
1246 0 : .tenant_shard_migrate_secondary(tenant_shard_id, migrate_req)
1247 0 : .await?,
1248 : )
1249 0 : }
1250 :
1251 0 : async fn handle_tenant_shard_cancel_reconcile(
1252 0 : service: Arc<Service>,
1253 0 : req: Request<Body>,
1254 0 : ) -> Result<Response<Body>, ApiError> {
1255 0 : check_permissions(&req, Scope::Admin)?;
1256 : // NB: don't rate limit: admin operation.
1257 :
1258 0 : let req = match maybe_forward(req).await {
1259 0 : ForwardOutcome::Forwarded(res) => {
1260 0 : return res;
1261 : }
1262 0 : ForwardOutcome::NotForwarded(req) => req,
1263 : };
1264 :
1265 0 : let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
1266 : json_response(
1267 : StatusCode::OK,
1268 0 : service
1269 0 : .tenant_shard_cancel_reconcile(tenant_shard_id)
1270 0 : .await?,
1271 : )
1272 0 : }
1273 :
1274 0 : async fn handle_tenant_update_policy(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1275 0 : check_permissions(&req, Scope::Admin)?;
1276 : // NB: don't rate limit: admin operation.
1277 :
1278 0 : let mut req = match maybe_forward(req).await {
1279 0 : ForwardOutcome::Forwarded(res) => {
1280 0 : return res;
1281 : }
1282 0 : ForwardOutcome::NotForwarded(req) => req,
1283 : };
1284 :
1285 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
1286 0 : let update_req = json_request::<TenantPolicyRequest>(&mut req).await?;
1287 0 : let state = get_state(&req);
1288 0 :
1289 0 : json_response(
1290 0 : StatusCode::OK,
1291 0 : state
1292 0 : .service
1293 0 : .tenant_update_policy(tenant_id, update_req)
1294 0 : .await?,
1295 : )
1296 0 : }
1297 :
1298 0 : async fn handle_update_preferred_azs(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1299 0 : check_permissions(&req, Scope::Admin)?;
1300 :
1301 0 : let mut req = match maybe_forward(req).await {
1302 0 : ForwardOutcome::Forwarded(res) => {
1303 0 : return res;
1304 : }
1305 0 : ForwardOutcome::NotForwarded(req) => req,
1306 : };
1307 :
1308 0 : let azs_req = json_request::<ShardsPreferredAzsRequest>(&mut req).await?;
1309 0 : let state = get_state(&req);
1310 0 :
1311 0 : json_response(
1312 0 : StatusCode::OK,
1313 0 : state.service.update_shards_preferred_azs(azs_req).await?,
1314 : )
1315 0 : }
1316 :
1317 0 : async fn handle_step_down(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1318 0 : check_permissions(&req, Scope::ControllerPeer)?;
1319 :
1320 0 : let req = match maybe_forward(req).await {
1321 0 : ForwardOutcome::Forwarded(res) => {
1322 0 : return res;
1323 : }
1324 0 : ForwardOutcome::NotForwarded(req) => req,
1325 0 : };
1326 0 :
1327 0 : let state = get_state(&req);
1328 0 : let result = state.service.step_down().await;
1329 :
1330 0 : json_response(StatusCode::OK, result)
1331 0 : }
1332 :
1333 0 : async fn handle_tenant_drop(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1334 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
1335 0 : check_permissions(&req, Scope::PageServerApi)?;
1336 0 : maybe_rate_limit(&req, tenant_id).await;
1337 :
1338 0 : let req = match maybe_forward(req).await {
1339 0 : ForwardOutcome::Forwarded(res) => {
1340 0 : return res;
1341 : }
1342 0 : ForwardOutcome::NotForwarded(req) => req,
1343 0 : };
1344 0 :
1345 0 : let state = get_state(&req);
1346 0 :
1347 0 : json_response(StatusCode::OK, state.service.tenant_drop(tenant_id).await?)
1348 0 : }
1349 :
1350 0 : async fn handle_tenant_import(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1351 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
1352 0 : check_permissions(&req, Scope::PageServerApi)?;
1353 0 : maybe_rate_limit(&req, tenant_id).await;
1354 :
1355 0 : let req = match maybe_forward(req).await {
1356 0 : ForwardOutcome::Forwarded(res) => {
1357 0 : return res;
1358 : }
1359 0 : ForwardOutcome::NotForwarded(req) => req,
1360 0 : };
1361 0 :
1362 0 : let state = get_state(&req);
1363 0 :
1364 0 : json_response(
1365 0 : StatusCode::OK,
1366 0 : state.service.tenant_import(tenant_id).await?,
1367 : )
1368 0 : }
1369 :
1370 0 : async fn handle_timeline_import(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1371 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
1372 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
1373 0 : check_permissions(&req, Scope::PageServerApi)?;
1374 0 : maybe_rate_limit(&req, tenant_id).await;
1375 :
1376 0 : let mut req = match maybe_forward(req).await {
1377 0 : ForwardOutcome::Forwarded(res) => {
1378 0 : return res;
1379 : }
1380 0 : ForwardOutcome::NotForwarded(req) => req,
1381 : };
1382 :
1383 0 : let import_req = json_request::<TimelineImportRequest>(&mut req).await?;
1384 :
1385 0 : let state = get_state(&req);
1386 0 :
1387 0 : if import_req.tenant_id != tenant_id || import_req.timeline_id != timeline_id {
1388 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
1389 0 : "tenant id or timeline id mismatch: url={tenant_id}/{timeline_id}, body={}/{}",
1390 0 : import_req.tenant_id,
1391 0 : import_req.timeline_id
1392 0 : )));
1393 0 : }
1394 0 :
1395 0 : json_response(
1396 0 : StatusCode::OK,
1397 0 : state.service.timeline_import(import_req).await?,
1398 : )
1399 0 : }
1400 :
1401 0 : async fn handle_tenant_timeline_locate(
1402 0 : service: Arc<Service>,
1403 0 : req: Request<Body>,
1404 0 : ) -> Result<Response<Body>, ApiError> {
1405 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
1406 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
1407 :
1408 0 : check_permissions(&req, Scope::Admin)?;
1409 0 : maybe_rate_limit(&req, tenant_id).await;
1410 :
1411 0 : match maybe_forward(req).await {
1412 0 : ForwardOutcome::Forwarded(res) => {
1413 0 : return res;
1414 : }
1415 0 : ForwardOutcome::NotForwarded(_req) => {}
1416 0 : };
1417 0 :
1418 0 : json_response(
1419 0 : StatusCode::OK,
1420 0 : service
1421 0 : .tenant_timeline_locate(tenant_id, timeline_id)
1422 0 : .await?,
1423 : )
1424 0 : }
1425 :
1426 0 : async fn handle_tenants_dump(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1427 0 : check_permissions(&req, Scope::Admin)?;
1428 :
1429 0 : let req = match maybe_forward(req).await {
1430 0 : ForwardOutcome::Forwarded(res) => {
1431 0 : return res;
1432 : }
1433 0 : ForwardOutcome::NotForwarded(req) => req,
1434 0 : };
1435 0 :
1436 0 : let state = get_state(&req);
1437 0 : state.service.tenants_dump()
1438 0 : }
1439 :
1440 0 : async fn handle_scheduler_dump(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1441 0 : check_permissions(&req, Scope::Admin)?;
1442 :
1443 0 : let req = match maybe_forward(req).await {
1444 0 : ForwardOutcome::Forwarded(res) => {
1445 0 : return res;
1446 : }
1447 0 : ForwardOutcome::NotForwarded(req) => req,
1448 0 : };
1449 0 :
1450 0 : let state = get_state(&req);
1451 0 : state.service.scheduler_dump()
1452 0 : }
1453 :
1454 0 : async fn handle_consistency_check(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1455 0 : check_permissions(&req, Scope::Admin)?;
1456 :
1457 0 : let req = match maybe_forward(req).await {
1458 0 : ForwardOutcome::Forwarded(res) => {
1459 0 : return res;
1460 : }
1461 0 : ForwardOutcome::NotForwarded(req) => req,
1462 0 : };
1463 0 :
1464 0 : let state = get_state(&req);
1465 0 :
1466 0 : json_response(StatusCode::OK, state.service.consistency_check().await?)
1467 0 : }
1468 :
1469 0 : async fn handle_reconcile_all(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1470 0 : check_permissions(&req, Scope::Admin)?;
1471 :
1472 0 : let req = match maybe_forward(req).await {
1473 0 : ForwardOutcome::Forwarded(res) => {
1474 0 : return res;
1475 : }
1476 0 : ForwardOutcome::NotForwarded(req) => req,
1477 0 : };
1478 0 :
1479 0 : let state = get_state(&req);
1480 0 :
1481 0 : json_response(StatusCode::OK, state.service.reconcile_all_now().await?)
1482 0 : }
1483 :
1484 : /// Status endpoint is just used for checking that our HTTP listener is up
1485 : ///
1486 : /// This serves as our k8s startup probe.
1487 0 : async fn handle_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1488 0 : match maybe_forward(req).await {
1489 0 : ForwardOutcome::Forwarded(res) => {
1490 0 : return res;
1491 : }
1492 0 : ForwardOutcome::NotForwarded(_req) => {}
1493 0 : };
1494 0 :
1495 0 : json_response(StatusCode::OK, ())
1496 0 : }
1497 :
1498 : /// Liveness endpoint indicates that this storage controller is in a state
1499 : /// where it can fulfill it's responsibilties. Namely, startup has finished
1500 : /// and it is the current leader.
1501 : ///
1502 : /// This serves as our k8s liveness probe.
1503 0 : async fn handle_live(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1504 0 : let req = match maybe_forward(req).await {
1505 0 : ForwardOutcome::Forwarded(res) => {
1506 0 : return res;
1507 : }
1508 0 : ForwardOutcome::NotForwarded(req) => req,
1509 0 : };
1510 0 :
1511 0 : let state = get_state(&req);
1512 0 : let live = state.service.startup_complete.is_ready()
1513 0 : && state.service.get_leadership_status() == LeadershipStatus::Leader;
1514 :
1515 0 : if live {
1516 0 : json_response(StatusCode::OK, ())
1517 : } else {
1518 0 : json_response(StatusCode::SERVICE_UNAVAILABLE, ())
1519 : }
1520 0 : }
1521 :
1522 : /// Readiness endpoint indicates when we're done doing startup I/O (e.g. reconciling
1523 : /// with remote pageserver nodes). This is intended for use as a kubernetes readiness probe.
1524 0 : async fn handle_ready(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1525 0 : let req = match maybe_forward(req).await {
1526 0 : ForwardOutcome::Forwarded(res) => {
1527 0 : return res;
1528 : }
1529 0 : ForwardOutcome::NotForwarded(req) => req,
1530 0 : };
1531 0 :
1532 0 : let state = get_state(&req);
1533 0 : if state.service.startup_complete.is_ready() {
1534 0 : json_response(StatusCode::OK, ())
1535 : } else {
1536 0 : json_response(StatusCode::SERVICE_UNAVAILABLE, ())
1537 : }
1538 0 : }
1539 :
1540 : impl From<ReconcileError> for ApiError {
1541 0 : fn from(value: ReconcileError) -> Self {
1542 0 : ApiError::Conflict(format!("Reconciliation error: {}", value))
1543 0 : }
1544 : }
1545 :
1546 : /// Return the safekeeper record by instance id, or 404.
1547 : ///
1548 : /// Not used by anything except manual testing.
1549 0 : async fn handle_get_safekeeper(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1550 0 : check_permissions(&req, Scope::Infra)?;
1551 :
1552 0 : let id = parse_request_param::<i64>(&req, "id")?;
1553 :
1554 0 : let req = match maybe_forward(req).await {
1555 0 : ForwardOutcome::Forwarded(res) => {
1556 0 : return res;
1557 : }
1558 0 : ForwardOutcome::NotForwarded(req) => req,
1559 0 : };
1560 0 :
1561 0 : let state = get_state(&req);
1562 :
1563 0 : let res = state.service.get_safekeeper(id).await;
1564 :
1565 0 : match res {
1566 0 : Ok(b) => json_response(StatusCode::OK, b),
1567 : Err(crate::persistence::DatabaseError::Query(diesel::result::Error::NotFound)) => {
1568 0 : Err(ApiError::NotFound("unknown instance id".into()))
1569 : }
1570 0 : Err(other) => Err(other.into()),
1571 : }
1572 0 : }
1573 :
1574 : /// Used as part of deployment scripts.
1575 : ///
1576 : /// Assumes information is only relayed to storage controller after first selecting an unique id on
1577 : /// control plane database, which means we have an id field in the request and payload.
1578 0 : async fn handle_upsert_safekeeper(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
1579 0 : check_permissions(&req, Scope::Infra)?;
1580 :
1581 0 : let body = json_request::<SafekeeperUpsert>(&mut req).await?;
1582 0 : let id = parse_request_param::<i64>(&req, "id")?;
1583 :
1584 0 : if id != body.id {
1585 : // it should be repeated
1586 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
1587 0 : "id mismatch: url={id:?}, body={:?}",
1588 0 : body.id
1589 0 : )));
1590 0 : }
1591 0 :
1592 0 : if id <= 0 {
1593 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
1594 0 : "id not allowed to be zero or negative: {id}"
1595 0 : )));
1596 0 : }
1597 :
1598 0 : let req = match maybe_forward(req).await {
1599 0 : ForwardOutcome::Forwarded(res) => {
1600 0 : return res;
1601 : }
1602 0 : ForwardOutcome::NotForwarded(req) => req,
1603 0 : };
1604 0 :
1605 0 : let state = get_state(&req);
1606 0 :
1607 0 : state.service.upsert_safekeeper(body).await?;
1608 :
1609 0 : Ok(Response::builder()
1610 0 : .status(StatusCode::NO_CONTENT)
1611 0 : .body(Body::empty())
1612 0 : .unwrap())
1613 0 : }
1614 :
1615 : /// Sets the scheduling policy of the specified safekeeper
1616 0 : async fn handle_safekeeper_scheduling_policy(
1617 0 : mut req: Request<Body>,
1618 0 : ) -> Result<Response<Body>, ApiError> {
1619 0 : check_permissions(&req, Scope::Admin)?;
1620 :
1621 0 : let body = json_request::<SafekeeperSchedulingPolicyRequest>(&mut req).await?;
1622 0 : let id = parse_request_param::<i64>(&req, "id")?;
1623 :
1624 0 : let req = match maybe_forward(req).await {
1625 0 : ForwardOutcome::Forwarded(res) => {
1626 0 : return res;
1627 : }
1628 0 : ForwardOutcome::NotForwarded(req) => req,
1629 0 : };
1630 0 :
1631 0 : let state = get_state(&req);
1632 0 :
1633 0 : state
1634 0 : .service
1635 0 : .set_safekeeper_scheduling_policy(id, body.scheduling_policy)
1636 0 : .await?;
1637 :
1638 0 : json_response(StatusCode::OK, ())
1639 0 : }
1640 :
1641 : /// Common wrapper for request handlers that call into Service and will operate on tenants: they must only
1642 : /// be allowed to run if Service has finished its initial reconciliation.
1643 0 : async fn tenant_service_handler<R, H>(
1644 0 : request: Request<Body>,
1645 0 : handler: H,
1646 0 : request_name: RequestName,
1647 0 : ) -> R::Output
1648 0 : where
1649 0 : R: std::future::Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
1650 0 : H: FnOnce(Arc<Service>, Request<Body>) -> R + Send + Sync + 'static,
1651 0 : {
1652 0 : let state = get_state(&request);
1653 0 : let service = state.service.clone();
1654 0 :
1655 0 : let startup_complete = service.startup_complete.clone();
1656 0 : if tokio::time::timeout(STARTUP_RECONCILE_TIMEOUT, startup_complete.wait())
1657 0 : .await
1658 0 : .is_err()
1659 : {
1660 : // This shouldn't happen: it is the responsibilty of [`Service::startup_reconcile`] to use appropriate
1661 : // timeouts around its remote calls, to bound its runtime.
1662 0 : return Err(ApiError::Timeout(
1663 0 : "Timed out waiting for service readiness".into(),
1664 0 : ));
1665 0 : }
1666 0 :
1667 0 : named_request_span(
1668 0 : request,
1669 0 : |request| async move { handler(service, request).await },
1670 0 : request_name,
1671 0 : )
1672 0 : .await
1673 0 : }
1674 :
1675 : /// Check if the required scope is held in the request's token, or if the request has
1676 : /// a token with 'admin' scope then always permit it.
1677 0 : fn check_permissions(request: &Request<Body>, required_scope: Scope) -> Result<(), ApiError> {
1678 0 : check_permission_with(request, |claims| {
1679 0 : match crate::auth::check_permission(claims, required_scope) {
1680 0 : Err(e) => match crate::auth::check_permission(claims, Scope::Admin) {
1681 0 : Ok(()) => Ok(()),
1682 0 : Err(_) => Err(e),
1683 : },
1684 0 : Ok(()) => Ok(()),
1685 : }
1686 0 : })
1687 0 : }
1688 :
1689 : #[derive(Clone, Debug)]
1690 : struct RequestMeta {
1691 : method: hyper::http::Method,
1692 : at: Instant,
1693 : }
1694 :
1695 0 : pub fn prologue_leadership_status_check_middleware<
1696 0 : B: hyper::body::HttpBody + Send + Sync + 'static,
1697 0 : >() -> Middleware<B, ApiError> {
1698 0 : Middleware::pre(move |req| async move {
1699 0 : let state = get_state(&req);
1700 0 : let leadership_status = state.service.get_leadership_status();
1701 :
1702 : enum AllowedRoutes {
1703 : All,
1704 : Some(&'static [&'static str]),
1705 : }
1706 :
1707 0 : let allowed_routes = match leadership_status {
1708 0 : LeadershipStatus::Leader => AllowedRoutes::All,
1709 0 : LeadershipStatus::SteppedDown => AllowedRoutes::All,
1710 0 : LeadershipStatus::Candidate => AllowedRoutes::Some(&[
1711 0 : "/ready",
1712 0 : "/status",
1713 0 : "/metrics",
1714 0 : "/profile/cpu",
1715 0 : "/profile/heap",
1716 0 : ]),
1717 : };
1718 :
1719 0 : match allowed_routes {
1720 0 : AllowedRoutes::All => Ok(req),
1721 0 : AllowedRoutes::Some(allowed) if allowed.contains(&req.uri().path()) => Ok(req),
1722 : _ => {
1723 0 : tracing::info!(
1724 0 : "Request {} not allowed due to current leadership state",
1725 0 : req.uri()
1726 : );
1727 :
1728 0 : Err(ApiError::ResourceUnavailable(
1729 0 : format!("Current leadership status is {leadership_status}").into(),
1730 0 : ))
1731 : }
1732 : }
1733 0 : })
1734 0 : }
1735 :
1736 0 : fn prologue_metrics_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>()
1737 0 : -> Middleware<B, ApiError> {
1738 0 : Middleware::pre(move |req| async move {
1739 0 : let meta = RequestMeta {
1740 0 : method: req.method().clone(),
1741 0 : at: Instant::now(),
1742 0 : };
1743 0 :
1744 0 : req.set_context(meta);
1745 0 :
1746 0 : Ok(req)
1747 0 : })
1748 0 : }
1749 :
1750 0 : fn epilogue_metrics_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>()
1751 0 : -> Middleware<B, ApiError> {
1752 0 : Middleware::post_with_info(move |resp, req_info| async move {
1753 0 : let request_name = match req_info.context::<RequestName>() {
1754 0 : Some(name) => name,
1755 : None => {
1756 0 : return Ok(resp);
1757 : }
1758 : };
1759 :
1760 0 : if let Some(meta) = req_info.context::<RequestMeta>() {
1761 0 : let status = &crate::metrics::METRICS_REGISTRY
1762 0 : .metrics_group
1763 0 : .storage_controller_http_request_status;
1764 0 : let latency = &crate::metrics::METRICS_REGISTRY
1765 0 : .metrics_group
1766 0 : .storage_controller_http_request_latency;
1767 0 :
1768 0 : status.inc(HttpRequestStatusLabelGroup {
1769 0 : path: request_name.0,
1770 0 : method: meta.method.clone().into(),
1771 0 : status: crate::metrics::StatusCode(resp.status()),
1772 0 : });
1773 0 :
1774 0 : latency.observe(
1775 0 : HttpRequestLatencyLabelGroup {
1776 0 : path: request_name.0,
1777 0 : method: meta.method.into(),
1778 0 : },
1779 0 : meta.at.elapsed().as_secs_f64(),
1780 0 : );
1781 0 : }
1782 0 : Ok(resp)
1783 0 : })
1784 0 : }
1785 :
1786 0 : pub async fn measured_metrics_handler(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1787 : pub const TEXT_FORMAT: &str = "text/plain; version=0.0.4";
1788 :
1789 0 : let req = match maybe_forward(req).await {
1790 0 : ForwardOutcome::Forwarded(res) => {
1791 0 : return res;
1792 : }
1793 0 : ForwardOutcome::NotForwarded(req) => req,
1794 0 : };
1795 0 :
1796 0 : let state = get_state(&req);
1797 0 : let payload = crate::metrics::METRICS_REGISTRY.encode(&state.neon_metrics);
1798 0 : let response = Response::builder()
1799 0 : .status(200)
1800 0 : .header(CONTENT_TYPE, TEXT_FORMAT)
1801 0 : .body(payload.into())
1802 0 : .unwrap();
1803 0 :
1804 0 : Ok(response)
1805 0 : }
1806 :
1807 : #[derive(Clone)]
1808 : struct RequestName(&'static str);
1809 :
1810 0 : async fn named_request_span<R, H>(
1811 0 : request: Request<Body>,
1812 0 : handler: H,
1813 0 : name: RequestName,
1814 0 : ) -> R::Output
1815 0 : where
1816 0 : R: Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
1817 0 : H: FnOnce(Request<Body>) -> R + Send + Sync + 'static,
1818 0 : {
1819 0 : request.set_context(name);
1820 0 : request_span(request, handler).await
1821 0 : }
1822 :
1823 : enum ForwardOutcome {
1824 : Forwarded(Result<Response<Body>, ApiError>),
1825 : NotForwarded(Request<Body>),
1826 : }
1827 :
1828 : /// Potentially forward the request to the current storage controler leader.
1829 : /// More specifically we forward when:
1830 : /// 1. Request is not one of:
1831 : /// ["/control/v1/step_down", "/status", "/ready", "/metrics", "/profile/cpu", "/profile/heap"]
1832 : /// 2. Current instance is in [`LeadershipStatus::SteppedDown`] state
1833 : /// 3. There is a leader in the database to forward to
1834 : /// 4. Leader from step (3) is not the current instance
1835 : ///
1836 : /// Why forward?
1837 : /// It turns out that we can't rely on external orchestration to promptly route trafic to the
1838 : /// new leader. This is downtime inducing. Forwarding provides a safe way out.
1839 : ///
1840 : /// Why is it safe?
1841 : /// If a storcon instance is persisted in the database, then we know that it is the current leader.
1842 : /// There's one exception: time between handling step-down request and the new leader updating the
1843 : /// database.
1844 : ///
1845 : /// Let's treat the happy case first. The stepped down node does not produce any side effects,
1846 : /// since all request handling happens on the leader.
1847 : ///
1848 : /// As for the edge case, we are guaranteed to always have a maximum of two running instances.
1849 : /// Hence, if we are in the edge case scenario the leader persisted in the database is the
1850 : /// stepped down instance that received the request. Condition (4) above covers this scenario.
1851 0 : async fn maybe_forward(req: Request<Body>) -> ForwardOutcome {
1852 : const NOT_FOR_FORWARD: &[&str] = &[
1853 : "/control/v1/step_down",
1854 : "/status",
1855 : "/live",
1856 : "/ready",
1857 : "/metrics",
1858 : "/profile/cpu",
1859 : "/profile/heap",
1860 : ];
1861 :
1862 0 : let uri = req.uri();
1863 0 : let uri_for_forward = !NOT_FOR_FORWARD.contains(&uri.path());
1864 0 :
1865 0 : // Fast return before trying to take any Service locks, if we will never forward anyway
1866 0 : if !uri_for_forward {
1867 0 : return ForwardOutcome::NotForwarded(req);
1868 0 : }
1869 0 :
1870 0 : let state = get_state(&req);
1871 0 : let leadership_status = state.service.get_leadership_status();
1872 0 :
1873 0 : if leadership_status != LeadershipStatus::SteppedDown {
1874 0 : return ForwardOutcome::NotForwarded(req);
1875 0 : }
1876 :
1877 0 : let leader = state.service.get_leader().await;
1878 0 : let leader = {
1879 0 : match leader {
1880 0 : Ok(Some(leader)) => leader,
1881 : Ok(None) => {
1882 0 : return ForwardOutcome::Forwarded(Err(ApiError::ResourceUnavailable(
1883 0 : "No leader to forward to while in stepped down state".into(),
1884 0 : )));
1885 : }
1886 0 : Err(err) => {
1887 0 : return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(
1888 0 : anyhow::anyhow!(
1889 0 : "Failed to get leader for forwarding while in stepped down state: {err}"
1890 0 : ),
1891 0 : )));
1892 : }
1893 : }
1894 : };
1895 :
1896 0 : let cfg = state.service.get_config();
1897 0 : if let Some(ref self_addr) = cfg.address_for_peers {
1898 0 : let leader_addr = match Uri::from_str(leader.address.as_str()) {
1899 0 : Ok(uri) => uri,
1900 0 : Err(err) => {
1901 0 : return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(
1902 0 : anyhow::anyhow!(
1903 0 : "Failed to parse leader uri for forwarding while in stepped down state: {err}"
1904 0 : ),
1905 0 : )));
1906 : }
1907 : };
1908 :
1909 0 : if *self_addr == leader_addr {
1910 0 : return ForwardOutcome::Forwarded(Err(ApiError::ResourceUnavailable(
1911 0 : "Leader is stepped down instance".into(),
1912 0 : )));
1913 0 : }
1914 0 : }
1915 :
1916 0 : tracing::info!("Forwarding {} to leader at {}", uri, leader.address);
1917 :
1918 : // Use [`RECONCILE_TIMEOUT`] as the max amount of time a request should block for and
1919 : // include some leeway to get the timeout for proxied requests.
1920 : const PROXIED_REQUEST_TIMEOUT: Duration = Duration::from_secs(RECONCILE_TIMEOUT.as_secs() + 10);
1921 :
1922 0 : let client = state.service.get_http_client().clone();
1923 :
1924 0 : let request: reqwest::Request = match convert_request(
1925 0 : req,
1926 0 : &client,
1927 0 : leader.address,
1928 0 : PROXIED_REQUEST_TIMEOUT,
1929 0 : )
1930 0 : .await
1931 : {
1932 0 : Ok(r) => r,
1933 0 : Err(err) => {
1934 0 : return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(anyhow::anyhow!(
1935 0 : "Failed to convert request for forwarding while in stepped down state: {err}"
1936 0 : ))));
1937 : }
1938 : };
1939 :
1940 0 : let response = match client.execute(request).await {
1941 0 : Ok(r) => r,
1942 0 : Err(err) => {
1943 0 : return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(anyhow::anyhow!(
1944 0 : "Failed to forward while in stepped down state: {err}"
1945 0 : ))));
1946 : }
1947 : };
1948 :
1949 0 : ForwardOutcome::Forwarded(convert_response(response).await)
1950 0 : }
1951 :
1952 : /// Convert a [`reqwest::Response`] to a [hyper::Response`] by passing through
1953 : /// a stable representation (string, bytes or integer)
1954 : ///
1955 : /// Ideally, we would not have to do this since both types use the http crate
1956 : /// under the hood. However, they use different versions of the crate and keeping
1957 : /// second order dependencies in sync is difficult.
1958 0 : async fn convert_response(resp: reqwest::Response) -> Result<hyper::Response<Body>, ApiError> {
1959 : use std::str::FromStr;
1960 :
1961 0 : let mut builder = hyper::Response::builder().status(resp.status().as_u16());
1962 0 : for (key, value) in resp.headers().into_iter() {
1963 0 : let key = hyper::header::HeaderName::from_str(key.as_str()).map_err(|err| {
1964 0 : ApiError::InternalServerError(anyhow::anyhow!("Response conversion failed: {err}"))
1965 0 : })?;
1966 :
1967 0 : let value = hyper::header::HeaderValue::from_bytes(value.as_bytes()).map_err(|err| {
1968 0 : ApiError::InternalServerError(anyhow::anyhow!("Response conversion failed: {err}"))
1969 0 : })?;
1970 :
1971 0 : builder = builder.header(key, value);
1972 : }
1973 :
1974 0 : let body = http::Body::wrap_stream(resp.bytes_stream());
1975 0 :
1976 0 : builder.body(body).map_err(|err| {
1977 0 : ApiError::InternalServerError(anyhow::anyhow!("Response conversion failed: {err}"))
1978 0 : })
1979 0 : }
1980 :
1981 : /// Convert a [`reqwest::Request`] to a [hyper::Request`] by passing through
1982 : /// a stable representation (string, bytes or integer)
1983 : ///
1984 : /// See [`convert_response`] for why we are doing it this way.
1985 0 : async fn convert_request(
1986 0 : req: hyper::Request<Body>,
1987 0 : client: &reqwest::Client,
1988 0 : to_address: String,
1989 0 : timeout: Duration,
1990 0 : ) -> Result<reqwest::Request, ApiError> {
1991 : use std::str::FromStr;
1992 :
1993 0 : let (parts, body) = req.into_parts();
1994 0 : let method = reqwest::Method::from_str(parts.method.as_str()).map_err(|err| {
1995 0 : ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
1996 0 : })?;
1997 :
1998 0 : let path_and_query = parts.uri.path_and_query().ok_or_else(|| {
1999 0 : ApiError::InternalServerError(anyhow::anyhow!(
2000 0 : "Request conversion failed: no path and query"
2001 0 : ))
2002 0 : })?;
2003 :
2004 0 : let uri = reqwest::Url::from_str(
2005 0 : format!(
2006 0 : "{}{}",
2007 0 : to_address.trim_end_matches("/"),
2008 0 : path_and_query.as_str()
2009 0 : )
2010 0 : .as_str(),
2011 0 : )
2012 0 : .map_err(|err| {
2013 0 : ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
2014 0 : })?;
2015 :
2016 0 : let mut headers = reqwest::header::HeaderMap::new();
2017 0 : for (key, value) in parts.headers.into_iter() {
2018 0 : let key = match key {
2019 0 : Some(k) => k,
2020 : None => {
2021 0 : continue;
2022 : }
2023 : };
2024 :
2025 0 : let key = reqwest::header::HeaderName::from_str(key.as_str()).map_err(|err| {
2026 0 : ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
2027 0 : })?;
2028 :
2029 0 : let value = reqwest::header::HeaderValue::from_bytes(value.as_bytes()).map_err(|err| {
2030 0 : ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
2031 0 : })?;
2032 :
2033 0 : headers.insert(key, value);
2034 : }
2035 :
2036 0 : let body = hyper::body::to_bytes(body).await.map_err(|err| {
2037 0 : ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
2038 0 : })?;
2039 :
2040 0 : client
2041 0 : .request(method, uri)
2042 0 : .headers(headers)
2043 0 : .body(body)
2044 0 : .timeout(timeout)
2045 0 : .build()
2046 0 : .map_err(|err| {
2047 0 : ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
2048 0 : })
2049 0 : }
2050 :
/// Build the HTTP router for the storage controller.
///
/// Middlewares are registered in this order: the leadership-status check, the prologue metrics
/// middleware, and the epilogue metrics middleware. When `auth` is `Some`, an auth middleware is
/// added as well. Its selector returns `None` for any path listed in `state.allowlist_routes`
/// (presumably meaning "do not authenticate" — see `auth_middleware`) and `state.auth`
/// otherwise.
///
/// An [`HttpState`] built from `service`, `auth` and `build_info` is attached as router data,
/// then every route is registered. Most handlers are wrapped in `named_request_span` or
/// `tenant_service_handler` together with a [`RequestName`], which the metrics middleware uses
/// as the `path` label.
///
/// Registration order matters. The wildcard `GET /v1/tenant/:tenant_shard_id/*` passthrough
/// near the end must come after every hand-written GET route under that prefix.
pub fn make_router(
    service: Arc<Service>,
    auth: Option<Arc<SwappableJwtAuth>>,
    build_info: BuildInfo,
) -> RouterBuilder<hyper::Body, ApiError> {
    let mut router = endpoint::make_router()
        .middleware(prologue_leadership_status_check_middleware())
        .middleware(prologue_metrics_middleware())
        .middleware(epilogue_metrics_middleware());
    if auth.is_some() {
        router = router.middleware(auth_middleware(|request| {
            let state = get_state(request);
            // Allowlisted paths get no auth provider; everything else uses the configured one.
            if state.allowlist_routes.contains(&request.uri().path()) {
                None
            } else {
                state.auth.as_deref()
            }
        }));
    }

    router
        .data(Arc::new(HttpState::new(service, auth, build_info)))
        // Non-prefixed generic endpoints (status, metrics, profiling)
        .get("/metrics", |r| {
            named_request_span(r, measured_metrics_handler, RequestName("metrics"))
        })
        .get("/status", |r| {
            named_request_span(r, handle_status, RequestName("status"))
        })
        .get("/live", |r| {
            named_request_span(r, handle_live, RequestName("live"))
        })
        .get("/ready", |r| {
            named_request_span(r, handle_ready, RequestName("ready"))
        })
        .get("/profile/cpu", |r| {
            named_request_span(r, profile_cpu_handler, RequestName("profile_cpu"))
        })
        .get("/profile/heap", |r| {
            named_request_span(r, profile_heap_handler, RequestName("profile_heap"))
        })
        // Upcalls for the pageserver: point the pageserver's `control_plane_api` config to this prefix
        .post("/upcall/v1/re-attach", |r| {
            named_request_span(r, handle_re_attach, RequestName("upcall_v1_reattach"))
        })
        .post("/upcall/v1/validate", |r| {
            named_request_span(r, handle_validate, RequestName("upcall_v1_validate"))
        })
        // GET and POST on the same path share one request name; the metrics also record the
        // HTTP method, so the two remain distinguishable.
        .get("/upcall/v1/timeline_import_status", |r| {
            named_request_span(
                r,
                handle_get_timeline_import_status,
                RequestName("upcall_v1_timeline_import_status"),
            )
        })
        .post("/upcall/v1/timeline_import_status", |r| {
            named_request_span(
                r,
                handle_put_timeline_import_status,
                RequestName("upcall_v1_timeline_import_status"),
            )
        })
        // Test/dev/debug endpoints
        .post("/debug/v1/attach-hook", |r| {
            named_request_span(r, handle_attach_hook, RequestName("debug_v1_attach_hook"))
        })
        .post("/debug/v1/inspect", |r| {
            named_request_span(r, handle_inspect, RequestName("debug_v1_inspect"))
        })
        .post("/debug/v1/tenant/:tenant_id/drop", |r| {
            named_request_span(r, handle_tenant_drop, RequestName("debug_v1_tenant_drop"))
        })
        .post("/debug/v1/node/:node_id/drop", |r| {
            named_request_span(r, handle_node_drop, RequestName("debug_v1_node_drop"))
        })
        .delete("/debug/v1/tombstone/:node_id", |r| {
            named_request_span(
                r,
                handle_tombstone_delete,
                RequestName("debug_v1_tombstone_delete"),
            )
        })
        .get("/debug/v1/tombstone", |r| {
            named_request_span(
                r,
                handle_tombstone_list,
                RequestName("debug_v1_tombstone_list"),
            )
        })
        .post("/debug/v1/tenant/:tenant_id/import", |r| {
            named_request_span(
                r,
                handle_tenant_import,
                RequestName("debug_v1_tenant_import"),
            )
        })
        .get("/debug/v1/tenant", |r| {
            named_request_span(r, handle_tenants_dump, RequestName("debug_v1_tenant"))
        })
        .get("/debug/v1/tenant/:tenant_id/locate", |r| {
            tenant_service_handler(
                r,
                handle_tenant_locate,
                RequestName("debug_v1_tenant_locate"),
            )
        })
        .post(
            "/debug/v1/tenant/:tenant_id/timeline/:timeline_id/import",
            |r| {
                named_request_span(
                    r,
                    handle_timeline_import,
                    RequestName("debug_v1_timeline_import"),
                )
            },
        )
        // NOTE(review): this request name lacks the "debug_" prefix used by the sibling debug
        // routes. It is a metrics label, so renaming it would change exported series.
        .get(
            "/debug/v1/tenant/:tenant_id/timeline/:timeline_id/locate",
            |r| {
                tenant_service_handler(
                    r,
                    handle_tenant_timeline_locate,
                    RequestName("v1_tenant_timeline_locate"),
                )
            },
        )
        .get("/debug/v1/scheduler", |r| {
            named_request_span(r, handle_scheduler_dump, RequestName("debug_v1_scheduler"))
        })
        .post("/debug/v1/consistency_check", |r| {
            named_request_span(
                r,
                handle_consistency_check,
                RequestName("debug_v1_consistency_check"),
            )
        })
        // NOTE(review): the next two routes use plain `request_span`, so no `RequestName` is put
        // in the request context. Confirm the metrics middlewares are meant to skip them.
        .post("/debug/v1/reconcile_all", |r| {
            request_span(r, handle_reconcile_all)
        })
        .put("/debug/v1/failpoints", |r| {
            request_span(r, |r| failpoints_handler(r, CancellationToken::new()))
        })
        // Node operations
        .post("/control/v1/node", |r| {
            named_request_span(r, handle_node_register, RequestName("control_v1_node"))
        })
        .delete("/control/v1/node/:node_id", |r| {
            named_request_span(r, handle_node_delete, RequestName("control_v1_node_delete"))
        })
        .get("/control/v1/node", |r| {
            named_request_span(r, handle_node_list, RequestName("control_v1_node"))
        })
        .put("/control/v1/node/:node_id/config", |r| {
            named_request_span(
                r,
                handle_node_configure,
                RequestName("control_v1_node_config"),
            )
        })
        .get("/control/v1/node/:node_id", |r| {
            named_request_span(r, handle_node_status, RequestName("control_v1_node_status"))
        })
        .get("/control/v1/node/:node_id/shards", |r| {
            named_request_span(
                r,
                handle_node_shards,
                RequestName("control_v1_node_describe"),
            )
        })
        .get("/control/v1/leader", |r| {
            named_request_span(r, handle_get_leader, RequestName("control_v1_get_leader"))
        })
        .put("/control/v1/node/:node_id/drain", |r| {
            named_request_span(r, handle_node_drain, RequestName("control_v1_node_drain"))
        })
        .delete("/control/v1/node/:node_id/drain", |r| {
            named_request_span(
                r,
                handle_cancel_node_drain,
                RequestName("control_v1_cancel_node_drain"),
            )
        })
        .put("/control/v1/node/:node_id/fill", |r| {
            named_request_span(r, handle_node_fill, RequestName("control_v1_node_fill"))
        })
        .delete("/control/v1/node/:node_id/fill", |r| {
            named_request_span(
                r,
                handle_cancel_node_fill,
                RequestName("control_v1_cancel_node_fill"),
            )
        })
        // Metadata health operations
        .post("/control/v1/metadata_health/update", |r| {
            named_request_span(
                r,
                handle_metadata_health_update,
                RequestName("control_v1_metadata_health_update"),
            )
        })
        .get("/control/v1/metadata_health/unhealthy", |r| {
            named_request_span(
                r,
                handle_metadata_health_list_unhealthy,
                RequestName("control_v1_metadata_health_list_unhealthy"),
            )
        })
        .post("/control/v1/metadata_health/outdated", |r| {
            named_request_span(
                r,
                handle_metadata_health_list_outdated,
                RequestName("control_v1_metadata_health_list_outdated"),
            )
        })
        // Safekeepers
        .get("/control/v1/safekeeper", |r| {
            named_request_span(
                r,
                handle_safekeeper_list,
                RequestName("control_v1_safekeeper_list"),
            )
        })
        .get("/control/v1/safekeeper/:id", |r| {
            named_request_span(r, handle_get_safekeeper, RequestName("v1_safekeeper"))
        })
        .post("/control/v1/safekeeper/:id", |r| {
            // id is in the body
            named_request_span(
                r,
                handle_upsert_safekeeper,
                RequestName("v1_safekeeper_post"),
            )
        })
        .post("/control/v1/safekeeper/:id/scheduling_policy", |r| {
            named_request_span(
                r,
                handle_safekeeper_scheduling_policy,
                RequestName("v1_safekeeper_status"),
            )
        })
        // Tenant Shard operations
        .put("/control/v1/tenant/:tenant_shard_id/migrate", |r| {
            tenant_service_handler(
                r,
                handle_tenant_shard_migrate,
                RequestName("control_v1_tenant_migrate"),
            )
        })
        .put(
            "/control/v1/tenant/:tenant_shard_id/migrate_secondary",
            |r| {
                tenant_service_handler(
                    r,
                    handle_tenant_shard_migrate_secondary,
                    RequestName("control_v1_tenant_migrate_secondary"),
                )
            },
        )
        .put(
            "/control/v1/tenant/:tenant_shard_id/cancel_reconcile",
            |r| {
                tenant_service_handler(
                    r,
                    handle_tenant_shard_cancel_reconcile,
                    RequestName("control_v1_tenant_cancel_reconcile"),
                )
            },
        )
        .put("/control/v1/tenant/:tenant_id/shard_split", |r| {
            tenant_service_handler(
                r,
                handle_tenant_shard_split,
                RequestName("control_v1_tenant_shard_split"),
            )
        })
        .get("/control/v1/tenant/:tenant_id", |r| {
            tenant_service_handler(
                r,
                handle_tenant_describe,
                RequestName("control_v1_tenant_describe"),
            )
        })
        .get("/control/v1/tenant", |r| {
            tenant_service_handler(r, handle_tenant_list, RequestName("control_v1_tenant_list"))
        })
        .put("/control/v1/tenant/:tenant_id/policy", |r| {
            named_request_span(
                r,
                handle_tenant_update_policy,
                RequestName("control_v1_tenant_policy"),
            )
        })
        .put("/control/v1/preferred_azs", |r| {
            named_request_span(
                r,
                handle_update_preferred_azs,
                RequestName("control_v1_preferred_azs"),
            )
        })
        .put("/control/v1/step_down", |r| {
            named_request_span(r, handle_step_down, RequestName("control_v1_step_down"))
        })
        // Tenant operations
        // The ^/v1/ endpoints act as a "Virtual Pageserver", enabling shard-naive clients to call into
        // this service to manage tenants that actually consist of many tenant shards, as if they are a single entity.
        .post("/v1/tenant", |r| {
            tenant_service_handler(r, handle_tenant_create, RequestName("v1_tenant"))
        })
        .delete("/v1/tenant/:tenant_id", |r| {
            tenant_service_handler(r, handle_tenant_delete, RequestName("v1_tenant"))
        })
        .patch("/v1/tenant/config", |r| {
            tenant_service_handler(
                r,
                handle_tenant_config_patch,
                RequestName("v1_tenant_config"),
            )
        })
        .put("/v1/tenant/config", |r| {
            tenant_service_handler(r, handle_tenant_config_set, RequestName("v1_tenant_config"))
        })
        .get("/v1/tenant/:tenant_id/config", |r| {
            tenant_service_handler(r, handle_tenant_config_get, RequestName("v1_tenant_config"))
        })
        .put("/v1/tenant/:tenant_shard_id/location_config", |r| {
            tenant_service_handler(
                r,
                handle_tenant_location_config,
                RequestName("v1_tenant_location_config"),
            )
        })
        .put("/v1/tenant/:tenant_id/time_travel_remote_storage", |r| {
            tenant_service_handler(
                r,
                handle_tenant_time_travel_remote_storage,
                RequestName("v1_tenant_time_travel_remote_storage"),
            )
        })
        .post("/v1/tenant/:tenant_id/secondary/download", |r| {
            tenant_service_handler(
                r,
                handle_tenant_secondary_download,
                RequestName("v1_tenant_secondary_download"),
            )
        })
        // Timeline operations
        .delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
            tenant_service_handler(
                r,
                handle_tenant_timeline_delete,
                RequestName("v1_tenant_timeline"),
            )
        })
        .post("/v1/tenant/:tenant_id/timeline", |r| {
            tenant_service_handler(
                r,
                handle_tenant_timeline_create,
                RequestName("v1_tenant_timeline"),
            )
        })
        .put(
            "/v1/tenant/:tenant_id/timeline/:timeline_id/archival_config",
            |r| {
                tenant_service_handler(
                    r,
                    handle_tenant_timeline_archival_config,
                    RequestName("v1_tenant_timeline_archival_config"),
                )
            },
        )
        .put(
            "/v1/tenant/:tenant_id/timeline/:timeline_id/detach_ancestor",
            |r| {
                tenant_service_handler(
                    r,
                    handle_tenant_timeline_detach_ancestor,
                    RequestName("v1_tenant_timeline_detach_ancestor"),
                )
            },
        )
        // block_gc and unblock_gc share one handler, parameterised by `BlockUnblock`.
        .post(
            "/v1/tenant/:tenant_id/timeline/:timeline_id/block_gc",
            |r| {
                tenant_service_handler(
                    r,
                    |s, r| handle_tenant_timeline_block_unblock_gc(s, r, BlockUnblock::Block),
                    RequestName("v1_tenant_timeline_block_unblock_gc"),
                )
            },
        )
        .post(
            "/v1/tenant/:tenant_id/timeline/:timeline_id/unblock_gc",
            |r| {
                tenant_service_handler(
                    r,
                    |s, r| handle_tenant_timeline_block_unblock_gc(s, r, BlockUnblock::Unblock),
                    RequestName("v1_tenant_timeline_block_unblock_gc"),
                )
            },
        )
        .post(
            "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/download_heatmap_layers",
            |r| {
                tenant_service_handler(
                    r,
                    handle_tenant_timeline_download_heatmap_layers,
                    RequestName("v1_tenant_timeline_download_heatmap_layers"),
                )
            },
        )
        // LSN lease passthrough to all shards
        .post(
            "/v1/tenant/:tenant_id/timeline/:timeline_id/lsn_lease",
            |r| {
                tenant_service_handler(
                    r,
                    handle_tenant_timeline_lsn_lease,
                    RequestName("v1_tenant_timeline_lsn_lease"),
                )
            },
        )
        // Tenant detail GET passthrough to shard zero:
        .get("/v1/tenant/:tenant_shard_id", |r| {
            tenant_service_handler(
                r,
                handle_tenant_timeline_passthrough,
                RequestName("v1_tenant_passthrough"),
            )
        })
        // The `*` in the URL is a wildcard: any tenant/timeline GET APIs on the pageserver
        // are implicitly exposed here. This must be last in the list to avoid
        // taking precedence over other GET methods we might implement by hand.
        .get("/v1/tenant/:tenant_shard_id/*", |r| {
            tenant_service_handler(
                r,
                handle_tenant_timeline_passthrough,
                RequestName("v1_tenant_passthrough"),
            )
        })
        // Tenant timeline mark_invisible passthrough to shard zero.
        // The wildcard above is registered for GET only, so registering this PUT after it
        // does not conflict with the "must be last" rule.
        .put(
            "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/mark_invisible",
            |r| {
                tenant_service_handler(
                    r,
                    handle_tenant_timeline_passthrough,
                    RequestName("v1_tenant_timeline_mark_invisible_passthrough"),
                )
            },
        )
}
2502 :
#[cfg(test)]
mod test {

    use super::path_without_ids;

    /// `path_without_ids` is expected to blank out tenant IDs (with or without a shard suffix)
    /// and timeline IDs, and to drop any query string.
    #[test]
    fn test_path_without_ids() {
        const EXPECTED: &str = "/v1/tenant//timeline/";

        let inputs = [
            "/v1/tenant/1a2b3344556677881122334455667788/timeline/AA223344556677881122334455667788",
            "/v1/tenant/1a2b3344556677881122334455667788-0108/timeline/AA223344556677881122334455667788",
            "/v1/tenant/1a2b3344556677881122334455667788-0108/timeline/AA223344556677881122334455667788?parameter=foo",
        ];

        for input in inputs {
            assert_eq!(path_without_ids(input), EXPECTED, "input: {input}");
        }
    }
}
|