Line data Source code
1 : use std::str::FromStr;
2 : use std::sync::{Arc, LazyLock};
3 : use std::time::{Duration, Instant};
4 :
5 : use anyhow::Context;
6 : use control_plane::storage_controller::{AttachHookRequest, InspectRequest};
7 : use futures::Future;
8 : use http_utils::endpoint::{
9 : self, auth_middleware, check_permission_with, profile_cpu_handler, profile_heap_handler,
10 : request_span,
11 : };
12 : use http_utils::error::ApiError;
13 : use http_utils::failpoints::failpoints_handler;
14 : use http_utils::json::{json_request, json_response};
15 : use http_utils::request::{must_get_query_param, parse_query_param, parse_request_param};
16 : use http_utils::{RequestExt, RouterBuilder};
17 : use hyper::header::CONTENT_TYPE;
18 : use hyper::{Body, Request, Response, StatusCode, Uri};
19 : use metrics::{BuildInfo, NeonMetrics};
20 : use pageserver_api::controller_api::{
21 : MetadataHealthListOutdatedRequest, MetadataHealthListOutdatedResponse,
22 : MetadataHealthListUnhealthyResponse, MetadataHealthUpdateRequest, MetadataHealthUpdateResponse,
23 : NodeAvailability, NodeConfigureRequest, NodeRegisterRequest, SafekeeperSchedulingPolicyRequest,
24 : ShardsPreferredAzsRequest, TenantCreateRequest, TenantPolicyRequest, TenantShardMigrateRequest,
25 : TimelineImportRequest, TimelineSafekeeperMigrateRequest,
26 : };
27 : use pageserver_api::models::{
28 : DetachBehavior, LsnLeaseRequest, TenantConfigPatchRequest, TenantConfigRequest,
29 : TenantLocationConfigRequest, TenantShardSplitRequest, TenantTimeTravelRequest,
30 : TimelineArchivalConfigRequest, TimelineCreateRequest,
31 : };
32 : use pageserver_api::shard::TenantShardId;
33 : use pageserver_api::upcall_api::{
34 : PutTimelineImportStatusRequest, ReAttachRequest, TimelineImportStatusRequest, ValidateRequest,
35 : };
36 : use pageserver_client::{BlockUnblock, mgmt_api};
37 :
38 : use routerify::Middleware;
39 : use tokio_util::sync::CancellationToken;
40 : use tracing::warn;
41 : use utils::auth::{Scope, SwappableJwtAuth};
42 : use utils::id::{NodeId, TenantId, TimelineId};
43 :
44 : use crate::http;
45 : use crate::metrics::{
46 : HttpRequestLatencyLabelGroup, HttpRequestStatusLabelGroup, METRICS_REGISTRY,
47 : PageserverRequestLabelGroup,
48 : };
49 : use crate::persistence::SafekeeperUpsert;
50 : use crate::reconciler::ReconcileError;
51 : use crate::service::{LeadershipStatus, RECONCILE_TIMEOUT, STARTUP_RECONCILE_TIMEOUT, Service};
52 :
53 : /// State available to HTTP request handlers
54 : pub struct HttpState {
55 : service: Arc<crate::service::Service>,
56 : auth: Option<Arc<SwappableJwtAuth>>,
57 : rate_limiter: governor::DefaultKeyedRateLimiter<TenantId>,
58 : neon_metrics: NeonMetrics,
59 : allowlist_routes: &'static [&'static str],
60 : }
61 :
62 : impl HttpState {
63 0 : pub fn new(
64 0 : service: Arc<crate::service::Service>,
65 0 : auth: Option<Arc<SwappableJwtAuth>>,
66 0 : build_info: BuildInfo,
67 0 : ) -> Self {
68 0 : let quota = governor::Quota::per_second(service.get_config().tenant_rate_limit);
69 0 : Self {
70 0 : service,
71 0 : auth,
72 0 : rate_limiter: governor::RateLimiter::keyed(quota),
73 0 : neon_metrics: NeonMetrics::new(build_info),
74 0 : allowlist_routes: &[
75 0 : "/status",
76 0 : "/live",
77 0 : "/ready",
78 0 : "/metrics",
79 0 : "/profile/cpu",
80 0 : "/profile/heap",
81 0 : ],
82 0 : }
83 0 : }
84 : }
85 :
86 : #[inline(always)]
87 0 : fn get_state(request: &Request<Body>) -> &HttpState {
88 0 : request
89 0 : .data::<Arc<HttpState>>()
90 0 : .expect("unknown state type")
91 0 : .as_ref()
92 0 : }
93 :
94 : /// Rate limits tenant requests.
95 : ///
96 : /// TODO: this should be a request middleware, but requires us to extract the tenant ID from
97 : /// different URLs in a systematic way.
98 : ///
99 : /// TODO: consider returning a 429 response if these start piling up.
100 0 : async fn maybe_rate_limit(request: &Request<Body>, tenant_id: TenantId) {
101 : // Check if the tenant should be rate-limited.
102 0 : let rate_limiter = &get_state(request).rate_limiter;
103 0 : if rate_limiter.check_key(&tenant_id).is_ok() {
104 0 : return;
105 0 : }
106 :
107 : // Measure the rate limiting delay.
108 0 : let _timer = METRICS_REGISTRY
109 0 : .metrics_group
110 0 : .storage_controller_http_request_rate_limited
111 0 : .start_timer();
112 :
113 : // Log rate limited tenants once every 10 seconds.
114 : static LOG_RATE_LIMITER: LazyLock<governor::DefaultKeyedRateLimiter<TenantId>> =
115 0 : LazyLock::new(|| {
116 0 : let quota = governor::Quota::with_period(Duration::from_secs(10)).unwrap();
117 0 : governor::RateLimiter::keyed(quota)
118 0 : });
119 :
120 0 : if LOG_RATE_LIMITER.check_key(&tenant_id).is_ok() {
121 0 : warn!("tenant {tenant_id} is rate limited")
122 0 : }
123 :
124 : // Wait for quota.
125 0 : rate_limiter.until_key_ready(&tenant_id).await;
126 0 : }
127 :
128 : /// Pageserver calls into this on startup, to learn which tenants it should attach
129 0 : async fn handle_re_attach(req: Request<Body>) -> Result<Response<Body>, ApiError> {
130 0 : check_permissions(&req, Scope::GenerationsApi)?;
131 :
132 0 : let mut req = match maybe_forward(req).await {
133 0 : ForwardOutcome::Forwarded(res) => {
134 0 : return res;
135 : }
136 0 : ForwardOutcome::NotForwarded(req) => req,
137 : };
138 :
139 0 : let reattach_req = json_request::<ReAttachRequest>(&mut req).await?;
140 0 : let state = get_state(&req);
141 0 : json_response(StatusCode::OK, state.service.re_attach(reattach_req).await?)
142 0 : }
143 :
144 : /// Pageserver calls into this before doing deletions, to confirm that it still
145 : /// holds the latest generation for the tenants with deletions enqueued
146 0 : async fn handle_validate(req: Request<Body>) -> Result<Response<Body>, ApiError> {
147 0 : check_permissions(&req, Scope::GenerationsApi)?;
148 :
149 0 : let mut req = match maybe_forward(req).await {
150 0 : ForwardOutcome::Forwarded(res) => {
151 0 : return res;
152 : }
153 0 : ForwardOutcome::NotForwarded(req) => req,
154 : };
155 :
156 0 : let validate_req = json_request::<ValidateRequest>(&mut req).await?;
157 0 : let state = get_state(&req);
158 0 : json_response(StatusCode::OK, state.service.validate(validate_req).await?)
159 0 : }
160 :
161 0 : async fn handle_get_timeline_import_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
162 0 : check_permissions(&req, Scope::GenerationsApi)?;
163 :
164 0 : let mut req = match maybe_forward(req).await {
165 0 : ForwardOutcome::Forwarded(res) => {
166 0 : return res;
167 : }
168 0 : ForwardOutcome::NotForwarded(req) => req,
169 : };
170 :
171 0 : let get_req = json_request::<TimelineImportStatusRequest>(&mut req).await?;
172 :
173 0 : let state = get_state(&req);
174 :
175 0 : json_response(
176 : StatusCode::OK,
177 0 : state
178 0 : .service
179 0 : .handle_timeline_shard_import_progress(get_req)
180 0 : .await?,
181 : )
182 0 : }
183 :
184 0 : async fn handle_put_timeline_import_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
185 0 : check_permissions(&req, Scope::GenerationsApi)?;
186 :
187 0 : let mut req = match maybe_forward(req).await {
188 0 : ForwardOutcome::Forwarded(res) => {
189 0 : return res;
190 : }
191 0 : ForwardOutcome::NotForwarded(req) => req,
192 : };
193 :
194 0 : let put_req = json_request::<PutTimelineImportStatusRequest>(&mut req).await?;
195 :
196 0 : let state = get_state(&req);
197 0 : json_response(
198 : StatusCode::OK,
199 0 : state
200 0 : .service
201 0 : .handle_timeline_shard_import_progress_upcall(put_req)
202 0 : .await?,
203 : )
204 0 : }
205 :
206 : /// Call into this before attaching a tenant to a pageserver, to acquire a generation number
207 : /// (in the real control plane this is unnecessary, because the same program is managing
208 : /// generation numbers and doing attachments).
209 0 : async fn handle_attach_hook(req: Request<Body>) -> Result<Response<Body>, ApiError> {
210 0 : check_permissions(&req, Scope::Admin)?;
211 :
212 0 : let mut req = match maybe_forward(req).await {
213 0 : ForwardOutcome::Forwarded(res) => {
214 0 : return res;
215 : }
216 0 : ForwardOutcome::NotForwarded(req) => req,
217 : };
218 :
219 0 : let attach_req = json_request::<AttachHookRequest>(&mut req).await?;
220 0 : let state = get_state(&req);
221 :
222 0 : json_response(
223 : StatusCode::OK,
224 0 : state
225 0 : .service
226 0 : .attach_hook(attach_req)
227 0 : .await
228 0 : .map_err(ApiError::InternalServerError)?,
229 : )
230 0 : }
231 :
232 0 : async fn handle_inspect(req: Request<Body>) -> Result<Response<Body>, ApiError> {
233 0 : check_permissions(&req, Scope::Admin)?;
234 :
235 0 : let mut req = match maybe_forward(req).await {
236 0 : ForwardOutcome::Forwarded(res) => {
237 0 : return res;
238 : }
239 0 : ForwardOutcome::NotForwarded(req) => req,
240 : };
241 :
242 0 : let inspect_req = json_request::<InspectRequest>(&mut req).await?;
243 :
244 0 : let state = get_state(&req);
245 :
246 0 : json_response(StatusCode::OK, state.service.inspect(inspect_req))
247 0 : }
248 :
249 0 : async fn handle_tenant_create(
250 0 : service: Arc<Service>,
251 0 : req: Request<Body>,
252 0 : ) -> Result<Response<Body>, ApiError> {
253 0 : check_permissions(&req, Scope::PageServerApi)?;
254 :
255 0 : let mut req = match maybe_forward(req).await {
256 0 : ForwardOutcome::Forwarded(res) => {
257 0 : return res;
258 : }
259 0 : ForwardOutcome::NotForwarded(req) => req,
260 : };
261 :
262 0 : let create_req = json_request::<TenantCreateRequest>(&mut req).await?;
263 :
264 0 : json_response(
265 : StatusCode::CREATED,
266 0 : service.tenant_create(create_req).await?,
267 : )
268 0 : }
269 :
270 0 : async fn handle_tenant_location_config(
271 0 : service: Arc<Service>,
272 0 : req: Request<Body>,
273 0 : ) -> Result<Response<Body>, ApiError> {
274 0 : let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
275 0 : check_permissions(&req, Scope::PageServerApi)?;
276 :
277 0 : let mut req = match maybe_forward(req).await {
278 0 : ForwardOutcome::Forwarded(res) => {
279 0 : return res;
280 : }
281 0 : ForwardOutcome::NotForwarded(req) => req,
282 : };
283 :
284 0 : let config_req = json_request::<TenantLocationConfigRequest>(&mut req).await?;
285 0 : json_response(
286 : StatusCode::OK,
287 0 : service
288 0 : .tenant_location_config(tenant_shard_id, config_req)
289 0 : .await?,
290 : )
291 0 : }
292 :
293 0 : async fn handle_tenant_config_patch(
294 0 : service: Arc<Service>,
295 0 : req: Request<Body>,
296 0 : ) -> Result<Response<Body>, ApiError> {
297 0 : check_permissions(&req, Scope::PageServerApi)?;
298 :
299 0 : let mut req = match maybe_forward(req).await {
300 0 : ForwardOutcome::Forwarded(res) => {
301 0 : return res;
302 : }
303 0 : ForwardOutcome::NotForwarded(req) => req,
304 : };
305 :
306 0 : let config_req = json_request::<TenantConfigPatchRequest>(&mut req).await?;
307 :
308 0 : json_response(
309 : StatusCode::OK,
310 0 : service.tenant_config_patch(config_req).await?,
311 : )
312 0 : }
313 :
314 0 : async fn handle_tenant_config_set(
315 0 : service: Arc<Service>,
316 0 : req: Request<Body>,
317 0 : ) -> Result<Response<Body>, ApiError> {
318 0 : check_permissions(&req, Scope::PageServerApi)?;
319 :
320 0 : let mut req = match maybe_forward(req).await {
321 0 : ForwardOutcome::Forwarded(res) => {
322 0 : return res;
323 : }
324 0 : ForwardOutcome::NotForwarded(req) => req,
325 : };
326 :
327 0 : let config_req = json_request::<TenantConfigRequest>(&mut req).await?;
328 :
329 0 : json_response(StatusCode::OK, service.tenant_config_set(config_req).await?)
330 0 : }
331 :
332 0 : async fn handle_tenant_config_get(
333 0 : service: Arc<Service>,
334 0 : req: Request<Body>,
335 0 : ) -> Result<Response<Body>, ApiError> {
336 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
337 0 : check_permissions(&req, Scope::PageServerApi)?;
338 0 : maybe_rate_limit(&req, tenant_id).await;
339 :
340 0 : match maybe_forward(req).await {
341 0 : ForwardOutcome::Forwarded(res) => {
342 0 : return res;
343 : }
344 0 : ForwardOutcome::NotForwarded(_req) => {}
345 : };
346 :
347 0 : json_response(StatusCode::OK, service.tenant_config_get(tenant_id)?)
348 0 : }
349 :
350 0 : async fn handle_tenant_time_travel_remote_storage(
351 0 : service: Arc<Service>,
352 0 : req: Request<Body>,
353 0 : ) -> Result<Response<Body>, ApiError> {
354 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
355 0 : check_permissions(&req, Scope::PageServerApi)?;
356 0 : maybe_rate_limit(&req, tenant_id).await;
357 :
358 0 : let mut req = match maybe_forward(req).await {
359 0 : ForwardOutcome::Forwarded(res) => {
360 0 : return res;
361 : }
362 0 : ForwardOutcome::NotForwarded(req) => req,
363 : };
364 :
365 0 : let time_travel_req = json_request::<TenantTimeTravelRequest>(&mut req).await?;
366 :
367 0 : let timestamp_raw = must_get_query_param(&req, "travel_to")?;
368 0 : let _timestamp = humantime::parse_rfc3339(×tamp_raw).map_err(|_e| {
369 0 : ApiError::BadRequest(anyhow::anyhow!(
370 0 : "Invalid time for travel_to: {timestamp_raw:?}"
371 0 : ))
372 0 : })?;
373 :
374 0 : let done_if_after_raw = must_get_query_param(&req, "done_if_after")?;
375 0 : let _done_if_after = humantime::parse_rfc3339(&done_if_after_raw).map_err(|_e| {
376 0 : ApiError::BadRequest(anyhow::anyhow!(
377 0 : "Invalid time for done_if_after: {done_if_after_raw:?}"
378 0 : ))
379 0 : })?;
380 :
381 0 : service
382 0 : .tenant_time_travel_remote_storage(
383 0 : &time_travel_req,
384 0 : tenant_id,
385 0 : timestamp_raw,
386 0 : done_if_after_raw,
387 0 : )
388 0 : .await?;
389 0 : json_response(StatusCode::OK, ())
390 0 : }
391 :
392 0 : fn map_reqwest_hyper_status(status: reqwest::StatusCode) -> Result<hyper::StatusCode, ApiError> {
393 0 : hyper::StatusCode::from_u16(status.as_u16())
394 0 : .context("invalid status code")
395 0 : .map_err(ApiError::InternalServerError)
396 0 : }
397 :
398 0 : async fn handle_tenant_secondary_download(
399 0 : service: Arc<Service>,
400 0 : req: Request<Body>,
401 0 : ) -> Result<Response<Body>, ApiError> {
402 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
403 0 : let wait = parse_query_param(&req, "wait_ms")?.map(Duration::from_millis);
404 0 : maybe_rate_limit(&req, tenant_id).await;
405 :
406 0 : match maybe_forward(req).await {
407 0 : ForwardOutcome::Forwarded(res) => {
408 0 : return res;
409 : }
410 0 : ForwardOutcome::NotForwarded(_req) => {}
411 : };
412 :
413 0 : let (status, progress) = service.tenant_secondary_download(tenant_id, wait).await?;
414 0 : json_response(map_reqwest_hyper_status(status)?, progress)
415 0 : }
416 :
417 0 : async fn handle_tenant_delete(
418 0 : service: Arc<Service>,
419 0 : req: Request<Body>,
420 0 : ) -> Result<Response<Body>, ApiError> {
421 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
422 0 : check_permissions(&req, Scope::PageServerApi)?;
423 0 : maybe_rate_limit(&req, tenant_id).await;
424 :
425 0 : match maybe_forward(req).await {
426 0 : ForwardOutcome::Forwarded(res) => {
427 0 : return res;
428 : }
429 0 : ForwardOutcome::NotForwarded(_req) => {}
430 : };
431 :
432 0 : let status_code = service
433 0 : .tenant_delete(tenant_id)
434 0 : .await
435 0 : .and_then(map_reqwest_hyper_status)?;
436 :
437 0 : if status_code == StatusCode::NOT_FOUND {
438 : // The pageserver uses 404 for successful deletion, but we use 200
439 0 : json_response(StatusCode::OK, ())
440 : } else {
441 0 : json_response(status_code, ())
442 : }
443 0 : }
444 :
445 0 : async fn handle_tenant_timeline_create(
446 0 : service: Arc<Service>,
447 0 : req: Request<Body>,
448 0 : ) -> Result<Response<Body>, ApiError> {
449 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
450 0 : check_permissions(&req, Scope::PageServerApi)?;
451 0 : maybe_rate_limit(&req, tenant_id).await;
452 :
453 0 : let mut req = match maybe_forward(req).await {
454 0 : ForwardOutcome::Forwarded(res) => {
455 0 : return res;
456 : }
457 0 : ForwardOutcome::NotForwarded(req) => req,
458 : };
459 :
460 0 : let create_req = json_request::<TimelineCreateRequest>(&mut req).await?;
461 0 : json_response(
462 : StatusCode::CREATED,
463 0 : service
464 0 : .tenant_timeline_create(tenant_id, create_req)
465 0 : .await?,
466 : )
467 0 : }
468 :
/// Deletes the timeline named by the `tenant_id` / `timeline_id` path parameters.
///
/// Requires `Scope::PageServerApi` and is subject to the per-tenant rate limit. After
/// `Service::maybe_delete_timeline_import` succeeds, `Service::tenant_timeline_delete` is
/// called repeatedly until it reports completion, an unexpected status, or the wait budget
/// is exhausted:
///
/// * 404 from the service is reported to the caller as 200 (deletion complete);
/// * 202 and 409 cause a sleep followed by another attempt;
/// * any other status is returned to the caller as-is;
/// * if the next retry would exceed the overall wait budget, 409 is returned.
async fn handle_tenant_timeline_delete(
    service: Arc<Service>,
    req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
    let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
    let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;

    check_permissions(&req, Scope::PageServerApi)?;
    maybe_rate_limit(&req, tenant_id).await;

    // If the request was handled by `maybe_forward`, return its response unchanged.
    match maybe_forward(req).await {
        ForwardOutcome::Forwarded(res) => {
            return res;
        }
        ForwardOutcome::NotForwarded(_req) => {}
    };

    service
        .maybe_delete_timeline_import(tenant_id, timeline_id)
        .await?;

    // Timeline deletion implements an "initially return 202, then 404 once we're done"
    // semantic; we wrap it with a retry loop to expose a simpler API upstream.
    //
    // `f` performs one deletion attempt and yields the resulting status code.
    async fn deletion_wrapper<R, F>(service: Arc<Service>, f: F) -> Result<Response<Body>, ApiError>
    where
        R: std::future::Future<Output = Result<StatusCode, ApiError>> + Send + 'static,
        F: Fn(Arc<Service>) -> R + Send + Sync + 'static,
    {
        // On subsequent retries, wait longer.
        // Enable callers with a 25 second request timeout to reliably get a response
        const MAX_WAIT: Duration = Duration::from_secs(25);
        const MAX_RETRY_PERIOD: Duration = Duration::from_secs(5);

        let started_at = Instant::now();

        // To keep deletion reasonably snappy for small tenants, initially check after 1 second if deletion
        // completed.
        let mut retry_period = Duration::from_secs(1);

        loop {
            // An error from the attempt itself aborts the loop immediately.
            let status = f(service.clone()).await?;
            match status {
                StatusCode::ACCEPTED => {
                    tracing::info!("Deletion accepted, waiting to try again...");
                    tokio::time::sleep(retry_period).await;
                    // After the first accepted attempt, switch to the longer retry period.
                    retry_period = MAX_RETRY_PERIOD;
                }
                StatusCode::CONFLICT => {
                    // Note: the retry period is left unchanged in this case.
                    tracing::info!("Deletion already in progress, waiting to try again...");
                    tokio::time::sleep(retry_period).await;
                }
                StatusCode::NOT_FOUND => {
                    tracing::info!("Deletion complete");
                    return json_response(StatusCode::OK, ());
                }
                _ => {
                    tracing::warn!("Unexpected status {status}");
                    return json_response(status, ());
                }
            }

            // Give up if waiting one more retry period would exceed the overall budget.
            let now = Instant::now();
            if now + retry_period > started_at + MAX_WAIT {
                tracing::info!("Deletion timed out waiting for 404");
                // REQUEST_TIMEOUT would be more appropriate, but CONFLICT is already part of
                // the pageserver's swagger definition for this endpoint, and has the same desired
                // effect of causing the control plane to retry later.
                return json_response(StatusCode::CONFLICT, ());
            }
        }
    }

    deletion_wrapper(service, move |service| async move {
        service
            .tenant_timeline_delete(tenant_id, timeline_id)
            .await
            .and_then(map_reqwest_hyper_status)
    })
    .await
}
549 :
550 0 : async fn handle_tenant_timeline_archival_config(
551 0 : service: Arc<Service>,
552 0 : req: Request<Body>,
553 0 : ) -> Result<Response<Body>, ApiError> {
554 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
555 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
556 :
557 0 : check_permissions(&req, Scope::PageServerApi)?;
558 0 : maybe_rate_limit(&req, tenant_id).await;
559 :
560 0 : let mut req = match maybe_forward(req).await {
561 0 : ForwardOutcome::Forwarded(res) => {
562 0 : return res;
563 : }
564 0 : ForwardOutcome::NotForwarded(req) => req,
565 : };
566 :
567 0 : let create_req = json_request::<TimelineArchivalConfigRequest>(&mut req).await?;
568 :
569 0 : service
570 0 : .tenant_timeline_archival_config(tenant_id, timeline_id, create_req)
571 0 : .await?;
572 :
573 0 : json_response(StatusCode::OK, ())
574 0 : }
575 :
576 0 : async fn handle_tenant_timeline_detach_ancestor(
577 0 : service: Arc<Service>,
578 0 : req: Request<Body>,
579 0 : ) -> Result<Response<Body>, ApiError> {
580 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
581 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
582 0 : let behavior: Option<DetachBehavior> = parse_query_param(&req, "detach_behavior")?;
583 :
584 0 : check_permissions(&req, Scope::PageServerApi)?;
585 0 : maybe_rate_limit(&req, tenant_id).await;
586 :
587 0 : match maybe_forward(req).await {
588 0 : ForwardOutcome::Forwarded(res) => {
589 0 : return res;
590 : }
591 0 : ForwardOutcome::NotForwarded(_req) => {}
592 : };
593 :
594 0 : let res = service
595 0 : .tenant_timeline_detach_ancestor(tenant_id, timeline_id, behavior)
596 0 : .await?;
597 :
598 0 : json_response(StatusCode::OK, res)
599 0 : }
600 :
601 0 : async fn handle_tenant_timeline_block_unblock_gc(
602 0 : service: Arc<Service>,
603 0 : req: Request<Body>,
604 0 : dir: BlockUnblock,
605 0 : ) -> Result<Response<Body>, ApiError> {
606 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
607 0 : check_permissions(&req, Scope::PageServerApi)?;
608 0 : maybe_rate_limit(&req, tenant_id).await;
609 :
610 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
611 :
612 0 : service
613 0 : .tenant_timeline_block_unblock_gc(tenant_id, timeline_id, dir)
614 0 : .await?;
615 :
616 0 : json_response(StatusCode::OK, ())
617 0 : }
618 :
619 0 : async fn handle_tenant_timeline_download_heatmap_layers(
620 0 : service: Arc<Service>,
621 0 : req: Request<Body>,
622 0 : ) -> Result<Response<Body>, ApiError> {
623 0 : let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
624 :
625 0 : check_permissions(&req, Scope::PageServerApi)?;
626 0 : maybe_rate_limit(&req, tenant_shard_id.tenant_id).await;
627 :
628 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
629 0 : let concurrency: Option<usize> = parse_query_param(&req, "concurrency")?;
630 0 : let recurse = parse_query_param(&req, "recurse")?.unwrap_or(false);
631 :
632 0 : service
633 0 : .tenant_timeline_download_heatmap_layers(tenant_shard_id, timeline_id, concurrency, recurse)
634 0 : .await?;
635 :
636 0 : json_response(StatusCode::OK, ())
637 0 : }
638 :
639 0 : async fn handle_tenant_timeline_safekeeper_migrate(
640 0 : service: Arc<Service>,
641 0 : req: Request<Body>,
642 0 : ) -> Result<Response<Body>, ApiError> {
643 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
644 0 : check_permissions(&req, Scope::PageServerApi)?;
645 0 : maybe_rate_limit(&req, tenant_id).await;
646 :
647 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
648 :
649 0 : let mut req = match maybe_forward(req).await {
650 0 : ForwardOutcome::Forwarded(res) => {
651 0 : return res;
652 : }
653 0 : ForwardOutcome::NotForwarded(req) => req,
654 : };
655 :
656 0 : let migrate_req = json_request::<TimelineSafekeeperMigrateRequest>(&mut req).await?;
657 :
658 0 : service
659 0 : .tenant_timeline_safekeeper_migrate(tenant_id, timeline_id, migrate_req)
660 0 : .await?;
661 :
662 0 : json_response(StatusCode::OK, ())
663 0 : }
664 :
665 0 : async fn handle_tenant_timeline_lsn_lease(
666 0 : service: Arc<Service>,
667 0 : req: Request<Body>,
668 0 : ) -> Result<Response<Body>, ApiError> {
669 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
670 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
671 :
672 0 : check_permissions(&req, Scope::PageServerApi)?;
673 0 : maybe_rate_limit(&req, tenant_id).await;
674 :
675 0 : let mut req = match maybe_forward(req).await {
676 0 : ForwardOutcome::Forwarded(res) => {
677 0 : return res;
678 : }
679 0 : ForwardOutcome::NotForwarded(req) => req,
680 : };
681 :
682 0 : let lsn_lease_request = json_request::<LsnLeaseRequest>(&mut req).await?;
683 :
684 0 : service
685 0 : .tenant_timeline_lsn_lease(tenant_id, timeline_id, lsn_lease_request.lsn)
686 0 : .await?;
687 :
688 0 : json_response(StatusCode::OK, ())
689 0 : }
690 :
691 : // For metric labels where we would like to include the approximate path, but exclude high-cardinality fields like query parameters
692 : // and tenant/timeline IDs. Since we are proxying to arbitrary paths, we don't have routing templates to
693 : // compare to, so we can just filter out our well known ID format with regexes.
694 3 : fn path_without_ids(path: &str) -> String {
695 : static ID_REGEX: std::sync::OnceLock<regex::Regex> = std::sync::OnceLock::new();
696 3 : ID_REGEX
697 3 : .get_or_init(|| regex::Regex::new(r"([0-9a-fA-F]{32}(-[0-9]{4})?|\?.*)").unwrap())
698 3 : .replace_all(path, "")
699 3 : .to_string()
700 3 : }
701 :
702 0 : async fn handle_tenant_timeline_passthrough(
703 0 : service: Arc<Service>,
704 0 : req: Request<Body>,
705 0 : ) -> Result<Response<Body>, ApiError> {
706 0 : let tenant_or_shard_id: TenantShardId = parse_request_param(&req, "tenant_id")?;
707 0 : check_permissions(&req, Scope::PageServerApi)?;
708 0 : maybe_rate_limit(&req, tenant_or_shard_id.tenant_id).await;
709 :
710 0 : let req = match maybe_forward(req).await {
711 0 : ForwardOutcome::Forwarded(res) => {
712 0 : return res;
713 : }
714 0 : ForwardOutcome::NotForwarded(req) => req,
715 : };
716 :
717 0 : let Some(path) = req.uri().path_and_query() else {
718 : // This should never happen, our request router only calls us if there is a path
719 0 : return Err(ApiError::BadRequest(anyhow::anyhow!("Missing path")));
720 : };
721 :
722 0 : let method = match *req.method() {
723 0 : hyper::Method::GET => reqwest::Method::GET,
724 0 : hyper::Method::POST => reqwest::Method::POST,
725 0 : hyper::Method::PUT => reqwest::Method::PUT,
726 0 : hyper::Method::DELETE => reqwest::Method::DELETE,
727 0 : hyper::Method::PATCH => reqwest::Method::PATCH,
728 0 : _ => return Err(ApiError::BadRequest(anyhow::anyhow!("Unsupported method"))),
729 : };
730 :
731 0 : tracing::info!(
732 0 : "Proxying request for tenant {} ({})",
733 : tenant_or_shard_id.tenant_id,
734 : path
735 : );
736 :
737 : // Find the node that holds shard zero
738 0 : let (node, tenant_shard_id) = if tenant_or_shard_id.is_unsharded() {
739 0 : service
740 0 : .tenant_shard0_node(tenant_or_shard_id.tenant_id)
741 0 : .await?
742 : } else {
743 : (
744 0 : service.tenant_shard_node(tenant_or_shard_id).await?,
745 0 : tenant_or_shard_id,
746 : )
747 : };
748 :
749 : // Callers will always pass an unsharded tenant ID. Before proxying, we must
750 : // rewrite this to a shard-aware shard zero ID.
751 0 : let path = format!("{path}");
752 0 : let tenant_str = tenant_or_shard_id.tenant_id.to_string();
753 0 : let tenant_shard_str = format!("{tenant_shard_id}");
754 0 : let path = path.replace(&tenant_str, &tenant_shard_str);
755 :
756 0 : let latency = &METRICS_REGISTRY
757 0 : .metrics_group
758 0 : .storage_controller_passthrough_request_latency;
759 :
760 0 : let path_label = path_without_ids(&path)
761 0 : .split('/')
762 0 : .filter(|token| !token.is_empty())
763 0 : .collect::<Vec<_>>()
764 0 : .join("_");
765 0 : let labels = PageserverRequestLabelGroup {
766 0 : pageserver_id: &node.get_id().to_string(),
767 0 : path: &path_label,
768 0 : method: crate::metrics::Method::Get,
769 0 : };
770 :
771 0 : let _timer = latency.start_timer(labels.clone());
772 :
773 0 : let client = mgmt_api::Client::new(
774 0 : service.get_http_client().clone(),
775 0 : node.base_url(),
776 0 : service.get_config().pageserver_jwt_token.as_deref(),
777 : );
778 0 : let resp = client.op_raw(method, path).await.map_err(|e|
779 : // We return 503 here because if we can't successfully send a request to the pageserver,
780 : // either we aren't available or the pageserver is unavailable.
781 0 : ApiError::ResourceUnavailable(format!("Error sending pageserver API request to {node}: {e}").into()))?;
782 :
783 0 : if !resp.status().is_success() {
784 0 : let error_counter = &METRICS_REGISTRY
785 0 : .metrics_group
786 0 : .storage_controller_passthrough_request_error;
787 0 : error_counter.inc(labels);
788 0 : }
789 :
790 : // Transform 404 into 503 if we raced with a migration
791 0 : if resp.status() == reqwest::StatusCode::NOT_FOUND {
792 : // Look up node again: if we migrated it will be different
793 0 : let new_node = service.tenant_shard_node(tenant_shard_id).await?;
794 0 : if new_node.get_id() != node.get_id() {
795 : // Rather than retry here, send the client a 503 to prompt a retry: this matches
796 : // the pageserver's use of 503, and all clients calling this API should retry on 503.
797 0 : return Err(ApiError::ResourceUnavailable(
798 0 : format!("Pageserver {node} returned 404, was migrated to {new_node}").into(),
799 0 : ));
800 0 : }
801 0 : }
802 :
803 : // We have a reqest::Response, would like a http::Response
804 0 : let mut builder = hyper::Response::builder().status(map_reqwest_hyper_status(resp.status())?);
805 0 : for (k, v) in resp.headers() {
806 0 : builder = builder.header(k.as_str(), v.as_bytes());
807 0 : }
808 :
809 0 : let response = builder
810 0 : .body(Body::wrap_stream(resp.bytes_stream()))
811 0 : .map_err(|e| ApiError::InternalServerError(e.into()))?;
812 :
813 0 : Ok(response)
814 0 : }
815 :
816 0 : async fn handle_tenant_locate(
817 0 : service: Arc<Service>,
818 0 : req: Request<Body>,
819 0 : ) -> Result<Response<Body>, ApiError> {
820 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
821 :
822 0 : check_permissions(&req, Scope::Admin)?;
823 : // NB: don't rate limit: admin operation.
824 :
825 0 : match maybe_forward(req).await {
826 0 : ForwardOutcome::Forwarded(res) => {
827 0 : return res;
828 : }
829 0 : ForwardOutcome::NotForwarded(_req) => {}
830 : };
831 :
832 0 : json_response(StatusCode::OK, service.tenant_locate(tenant_id)?)
833 0 : }
834 :
835 0 : async fn handle_tenant_describe(
836 0 : service: Arc<Service>,
837 0 : req: Request<Body>,
838 0 : ) -> Result<Response<Body>, ApiError> {
839 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
840 0 : check_permissions(&req, Scope::Scrubber)?;
841 : // NB: don't rate limit: scrubber operation.
842 :
843 0 : match maybe_forward(req).await {
844 0 : ForwardOutcome::Forwarded(res) => {
845 0 : return res;
846 : }
847 0 : ForwardOutcome::NotForwarded(_req) => {}
848 : };
849 :
850 0 : json_response(StatusCode::OK, service.tenant_describe(tenant_id)?)
851 0 : }
852 :
853 : /* BEGIN_HADRON */
854 0 : async fn handle_tenant_timeline_describe(
855 0 : service: Arc<Service>,
856 0 : req: Request<Body>,
857 0 : ) -> Result<Response<Body>, ApiError> {
858 0 : check_permissions(&req, Scope::Scrubber)?;
859 :
860 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
861 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
862 0 : match maybe_forward(req).await {
863 0 : ForwardOutcome::Forwarded(res) => {
864 0 : return res;
865 : }
866 0 : ForwardOutcome::NotForwarded(_req) => {}
867 : };
868 :
869 0 : json_response(
870 : StatusCode::OK,
871 0 : service
872 0 : .tenant_timeline_describe(tenant_id, timeline_id)
873 0 : .await?,
874 : )
875 0 : }
876 : /* END_HADRON */
877 :
878 0 : async fn handle_tenant_list(
879 0 : service: Arc<Service>,
880 0 : req: Request<Body>,
881 0 : ) -> Result<Response<Body>, ApiError> {
882 0 : check_permissions(&req, Scope::Admin)?;
883 :
884 0 : let limit: Option<usize> = parse_query_param(&req, "limit")?;
885 0 : let start_after: Option<TenantId> = parse_query_param(&req, "start_after")?;
886 0 : tracing::info!("start_after: {:?}", start_after);
887 :
888 0 : match maybe_forward(req).await {
889 0 : ForwardOutcome::Forwarded(res) => {
890 0 : return res;
891 : }
892 0 : ForwardOutcome::NotForwarded(_req) => {}
893 : };
894 :
895 0 : json_response(StatusCode::OK, service.tenant_list(limit, start_after))
896 0 : }
897 :
898 0 : async fn handle_node_register(req: Request<Body>) -> Result<Response<Body>, ApiError> {
899 0 : check_permissions(&req, Scope::Infra)?;
900 :
901 0 : let mut req = match maybe_forward(req).await {
902 0 : ForwardOutcome::Forwarded(res) => {
903 0 : return res;
904 : }
905 0 : ForwardOutcome::NotForwarded(req) => req,
906 : };
907 :
908 0 : let register_req = json_request::<NodeRegisterRequest>(&mut req).await?;
909 0 : let state = get_state(&req);
910 0 : state.service.node_register(register_req).await?;
911 0 : json_response(StatusCode::OK, ())
912 0 : }
913 :
914 0 : async fn handle_node_list(req: Request<Body>) -> Result<Response<Body>, ApiError> {
915 0 : check_permissions(&req, Scope::Infra)?;
916 :
917 0 : let req = match maybe_forward(req).await {
918 0 : ForwardOutcome::Forwarded(res) => {
919 0 : return res;
920 : }
921 0 : ForwardOutcome::NotForwarded(req) => req,
922 : };
923 :
924 0 : let state = get_state(&req);
925 0 : let mut nodes = state.service.node_list().await?;
926 0 : nodes.sort_by_key(|n| n.get_id());
927 0 : let api_nodes = nodes.into_iter().map(|n| n.describe()).collect::<Vec<_>>();
928 :
929 0 : json_response(StatusCode::OK, api_nodes)
930 0 : }
931 :
932 0 : async fn handle_node_drop(req: Request<Body>) -> Result<Response<Body>, ApiError> {
933 0 : check_permissions(&req, Scope::Admin)?;
934 :
935 0 : let req = match maybe_forward(req).await {
936 0 : ForwardOutcome::Forwarded(res) => {
937 0 : return res;
938 : }
939 0 : ForwardOutcome::NotForwarded(req) => req,
940 : };
941 :
942 0 : let state = get_state(&req);
943 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
944 0 : json_response(StatusCode::OK, state.service.node_drop(node_id).await?)
945 0 : }
946 :
947 0 : async fn handle_node_delete_old(req: Request<Body>) -> Result<Response<Body>, ApiError> {
948 0 : check_permissions(&req, Scope::Admin)?;
949 :
950 0 : let req = match maybe_forward(req).await {
951 0 : ForwardOutcome::Forwarded(res) => {
952 0 : return res;
953 : }
954 0 : ForwardOutcome::NotForwarded(req) => req,
955 : };
956 :
957 0 : let state = get_state(&req);
958 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
959 0 : json_response(
960 : StatusCode::OK,
961 0 : state.service.node_delete_old(node_id).await?,
962 : )
963 0 : }
964 :
965 0 : async fn handle_tombstone_list(req: Request<Body>) -> Result<Response<Body>, ApiError> {
966 0 : check_permissions(&req, Scope::Admin)?;
967 :
968 0 : let req = match maybe_forward(req).await {
969 0 : ForwardOutcome::Forwarded(res) => {
970 0 : return res;
971 : }
972 0 : ForwardOutcome::NotForwarded(req) => req,
973 : };
974 :
975 0 : let state = get_state(&req);
976 0 : let mut nodes = state.service.tombstone_list().await?;
977 0 : nodes.sort_by_key(|n| n.get_id());
978 0 : let api_nodes = nodes.into_iter().map(|n| n.describe()).collect::<Vec<_>>();
979 :
980 0 : json_response(StatusCode::OK, api_nodes)
981 0 : }
982 :
983 0 : async fn handle_tombstone_delete(req: Request<Body>) -> Result<Response<Body>, ApiError> {
984 0 : check_permissions(&req, Scope::Admin)?;
985 :
986 0 : let req = match maybe_forward(req).await {
987 0 : ForwardOutcome::Forwarded(res) => {
988 0 : return res;
989 : }
990 0 : ForwardOutcome::NotForwarded(req) => req,
991 : };
992 :
993 0 : let state = get_state(&req);
994 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
995 0 : json_response(
996 : StatusCode::OK,
997 0 : state.service.tombstone_delete(node_id).await?,
998 : )
999 0 : }
1000 :
1001 0 : async fn handle_node_configure(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1002 0 : check_permissions(&req, Scope::Admin)?;
1003 :
1004 0 : let mut req = match maybe_forward(req).await {
1005 0 : ForwardOutcome::Forwarded(res) => {
1006 0 : return res;
1007 : }
1008 0 : ForwardOutcome::NotForwarded(req) => req,
1009 : };
1010 :
1011 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
1012 0 : let config_req = json_request::<NodeConfigureRequest>(&mut req).await?;
1013 0 : if node_id != config_req.node_id {
1014 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
1015 0 : "Path and body node_id differ"
1016 0 : )));
1017 0 : }
1018 0 : let state = get_state(&req);
1019 :
1020 0 : json_response(
1021 : StatusCode::OK,
1022 0 : state
1023 0 : .service
1024 0 : .external_node_configure(
1025 0 : config_req.node_id,
1026 0 : config_req.availability.map(NodeAvailability::from),
1027 0 : config_req.scheduling,
1028 0 : )
1029 0 : .await?,
1030 : )
1031 0 : }
1032 :
1033 0 : async fn handle_node_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1034 0 : check_permissions(&req, Scope::Infra)?;
1035 :
1036 0 : let req = match maybe_forward(req).await {
1037 0 : ForwardOutcome::Forwarded(res) => {
1038 0 : return res;
1039 : }
1040 0 : ForwardOutcome::NotForwarded(req) => req,
1041 : };
1042 :
1043 0 : let state = get_state(&req);
1044 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
1045 :
1046 0 : let node_status = state.service.get_node(node_id).await?.describe();
1047 :
1048 0 : json_response(StatusCode::OK, node_status)
1049 0 : }
1050 :
1051 0 : async fn handle_node_shards(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1052 0 : check_permissions(&req, Scope::Admin)?;
1053 :
1054 0 : let state = get_state(&req);
1055 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
1056 :
1057 0 : let node_status = state.service.get_node_shards(node_id).await?;
1058 :
1059 0 : json_response(StatusCode::OK, node_status)
1060 0 : }
1061 :
1062 0 : async fn handle_get_leader(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1063 0 : check_permissions(&req, Scope::Admin)?;
1064 :
1065 0 : let req = match maybe_forward(req).await {
1066 0 : ForwardOutcome::Forwarded(res) => {
1067 0 : return res;
1068 : }
1069 0 : ForwardOutcome::NotForwarded(req) => req,
1070 : };
1071 :
1072 0 : let state = get_state(&req);
1073 0 : let leader = state.service.get_leader().await.map_err(|err| {
1074 0 : ApiError::InternalServerError(anyhow::anyhow!(
1075 0 : "Failed to read leader from database: {err}"
1076 0 : ))
1077 0 : })?;
1078 :
1079 0 : json_response(StatusCode::OK, leader)
1080 0 : }
1081 :
1082 0 : async fn handle_node_delete(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1083 0 : check_permissions(&req, Scope::Admin)?;
1084 :
1085 0 : let req = match maybe_forward(req).await {
1086 0 : ForwardOutcome::Forwarded(res) => {
1087 0 : return res;
1088 : }
1089 0 : ForwardOutcome::NotForwarded(req) => req,
1090 : };
1091 :
1092 0 : let state = get_state(&req);
1093 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
1094 0 : json_response(
1095 : StatusCode::OK,
1096 0 : state.service.start_node_delete(node_id).await?,
1097 : )
1098 0 : }
1099 :
1100 0 : async fn handle_cancel_node_delete(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1101 0 : check_permissions(&req, Scope::Infra)?;
1102 :
1103 0 : let req = match maybe_forward(req).await {
1104 0 : ForwardOutcome::Forwarded(res) => {
1105 0 : return res;
1106 : }
1107 0 : ForwardOutcome::NotForwarded(req) => req,
1108 : };
1109 :
1110 0 : let state = get_state(&req);
1111 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
1112 0 : json_response(
1113 : StatusCode::ACCEPTED,
1114 0 : state.service.cancel_node_delete(node_id).await?,
1115 : )
1116 0 : }
1117 :
1118 0 : async fn handle_node_drain(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1119 0 : check_permissions(&req, Scope::Infra)?;
1120 :
1121 0 : let req = match maybe_forward(req).await {
1122 0 : ForwardOutcome::Forwarded(res) => {
1123 0 : return res;
1124 : }
1125 0 : ForwardOutcome::NotForwarded(req) => req,
1126 : };
1127 :
1128 0 : let state = get_state(&req);
1129 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
1130 :
1131 0 : state.service.start_node_drain(node_id).await?;
1132 :
1133 0 : json_response(StatusCode::ACCEPTED, ())
1134 0 : }
1135 :
1136 0 : async fn handle_cancel_node_drain(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1137 0 : check_permissions(&req, Scope::Infra)?;
1138 :
1139 0 : let req = match maybe_forward(req).await {
1140 0 : ForwardOutcome::Forwarded(res) => {
1141 0 : return res;
1142 : }
1143 0 : ForwardOutcome::NotForwarded(req) => req,
1144 : };
1145 :
1146 0 : let state = get_state(&req);
1147 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
1148 :
1149 0 : state.service.cancel_node_drain(node_id).await?;
1150 :
1151 0 : json_response(StatusCode::ACCEPTED, ())
1152 0 : }
1153 :
1154 0 : async fn handle_node_fill(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1155 0 : check_permissions(&req, Scope::Infra)?;
1156 :
1157 0 : let req = match maybe_forward(req).await {
1158 0 : ForwardOutcome::Forwarded(res) => {
1159 0 : return res;
1160 : }
1161 0 : ForwardOutcome::NotForwarded(req) => req,
1162 : };
1163 :
1164 0 : let state = get_state(&req);
1165 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
1166 :
1167 0 : state.service.start_node_fill(node_id).await?;
1168 :
1169 0 : json_response(StatusCode::ACCEPTED, ())
1170 0 : }
1171 :
1172 0 : async fn handle_cancel_node_fill(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1173 0 : check_permissions(&req, Scope::Infra)?;
1174 :
1175 0 : let req = match maybe_forward(req).await {
1176 0 : ForwardOutcome::Forwarded(res) => {
1177 0 : return res;
1178 : }
1179 0 : ForwardOutcome::NotForwarded(req) => req,
1180 : };
1181 :
1182 0 : let state = get_state(&req);
1183 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
1184 :
1185 0 : state.service.cancel_node_fill(node_id).await?;
1186 :
1187 0 : json_response(StatusCode::ACCEPTED, ())
1188 0 : }
1189 :
1190 0 : async fn handle_safekeeper_list(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1191 0 : check_permissions(&req, Scope::Infra)?;
1192 :
1193 0 : let req = match maybe_forward(req).await {
1194 0 : ForwardOutcome::Forwarded(res) => {
1195 0 : return res;
1196 : }
1197 0 : ForwardOutcome::NotForwarded(req) => req,
1198 : };
1199 :
1200 0 : let state = get_state(&req);
1201 0 : let safekeepers = state.service.safekeepers_list().await?;
1202 0 : json_response(StatusCode::OK, safekeepers)
1203 0 : }
1204 :
1205 0 : async fn handle_metadata_health_update(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1206 0 : check_permissions(&req, Scope::Scrubber)?;
1207 :
1208 0 : let mut req = match maybe_forward(req).await {
1209 0 : ForwardOutcome::Forwarded(res) => {
1210 0 : return res;
1211 : }
1212 0 : ForwardOutcome::NotForwarded(req) => req,
1213 : };
1214 :
1215 0 : let update_req = json_request::<MetadataHealthUpdateRequest>(&mut req).await?;
1216 0 : let state = get_state(&req);
1217 :
1218 0 : state.service.metadata_health_update(update_req).await?;
1219 :
1220 0 : json_response(StatusCode::OK, MetadataHealthUpdateResponse {})
1221 0 : }
1222 :
1223 0 : async fn handle_metadata_health_list_unhealthy(
1224 0 : req: Request<Body>,
1225 0 : ) -> Result<Response<Body>, ApiError> {
1226 0 : check_permissions(&req, Scope::Admin)?;
1227 :
1228 0 : let req = match maybe_forward(req).await {
1229 0 : ForwardOutcome::Forwarded(res) => {
1230 0 : return res;
1231 : }
1232 0 : ForwardOutcome::NotForwarded(req) => req,
1233 : };
1234 :
1235 0 : let state = get_state(&req);
1236 0 : let unhealthy_tenant_shards = state.service.metadata_health_list_unhealthy().await?;
1237 :
1238 0 : json_response(
1239 : StatusCode::OK,
1240 0 : MetadataHealthListUnhealthyResponse {
1241 0 : unhealthy_tenant_shards,
1242 0 : },
1243 : )
1244 0 : }
1245 :
1246 0 : async fn handle_metadata_health_list_outdated(
1247 0 : req: Request<Body>,
1248 0 : ) -> Result<Response<Body>, ApiError> {
1249 0 : check_permissions(&req, Scope::Admin)?;
1250 :
1251 0 : let mut req = match maybe_forward(req).await {
1252 0 : ForwardOutcome::Forwarded(res) => {
1253 0 : return res;
1254 : }
1255 0 : ForwardOutcome::NotForwarded(req) => req,
1256 : };
1257 :
1258 0 : let list_outdated_req = json_request::<MetadataHealthListOutdatedRequest>(&mut req).await?;
1259 0 : let state = get_state(&req);
1260 0 : let health_records = state
1261 0 : .service
1262 0 : .metadata_health_list_outdated(list_outdated_req.not_scrubbed_for)
1263 0 : .await?;
1264 :
1265 0 : json_response(
1266 : StatusCode::OK,
1267 0 : MetadataHealthListOutdatedResponse { health_records },
1268 : )
1269 0 : }
1270 :
1271 0 : async fn handle_tenant_shard_split(
1272 0 : service: Arc<Service>,
1273 0 : req: Request<Body>,
1274 0 : ) -> Result<Response<Body>, ApiError> {
1275 0 : check_permissions(&req, Scope::Admin)?;
1276 : // NB: don't rate limit: admin operation.
1277 :
1278 0 : let mut req = match maybe_forward(req).await {
1279 0 : ForwardOutcome::Forwarded(res) => {
1280 0 : return res;
1281 : }
1282 0 : ForwardOutcome::NotForwarded(req) => req,
1283 : };
1284 :
1285 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
1286 0 : let split_req = json_request::<TenantShardSplitRequest>(&mut req).await?;
1287 :
1288 0 : json_response(
1289 : StatusCode::OK,
1290 0 : service.tenant_shard_split(tenant_id, split_req).await?,
1291 : )
1292 0 : }
1293 :
1294 0 : async fn handle_tenant_shard_migrate(
1295 0 : service: Arc<Service>,
1296 0 : req: Request<Body>,
1297 0 : ) -> Result<Response<Body>, ApiError> {
1298 0 : check_permissions(&req, Scope::Admin)?;
1299 : // NB: don't rate limit: admin operation.
1300 :
1301 0 : let mut req = match maybe_forward(req).await {
1302 0 : ForwardOutcome::Forwarded(res) => {
1303 0 : return res;
1304 : }
1305 0 : ForwardOutcome::NotForwarded(req) => req,
1306 : };
1307 :
1308 0 : let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
1309 0 : let migrate_req = json_request::<TenantShardMigrateRequest>(&mut req).await?;
1310 0 : json_response(
1311 : StatusCode::OK,
1312 0 : service
1313 0 : .tenant_shard_migrate(tenant_shard_id, migrate_req)
1314 0 : .await?,
1315 : )
1316 0 : }
1317 :
1318 0 : async fn handle_tenant_shard_migrate_secondary(
1319 0 : service: Arc<Service>,
1320 0 : req: Request<Body>,
1321 0 : ) -> Result<Response<Body>, ApiError> {
1322 0 : check_permissions(&req, Scope::Admin)?;
1323 : // NB: don't rate limit: admin operation.
1324 :
1325 0 : let mut req = match maybe_forward(req).await {
1326 0 : ForwardOutcome::Forwarded(res) => {
1327 0 : return res;
1328 : }
1329 0 : ForwardOutcome::NotForwarded(req) => req,
1330 : };
1331 :
1332 0 : let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
1333 0 : let migrate_req = json_request::<TenantShardMigrateRequest>(&mut req).await?;
1334 0 : json_response(
1335 : StatusCode::OK,
1336 0 : service
1337 0 : .tenant_shard_migrate_secondary(tenant_shard_id, migrate_req)
1338 0 : .await?,
1339 : )
1340 0 : }
1341 :
1342 0 : async fn handle_tenant_shard_cancel_reconcile(
1343 0 : service: Arc<Service>,
1344 0 : req: Request<Body>,
1345 0 : ) -> Result<Response<Body>, ApiError> {
1346 0 : check_permissions(&req, Scope::Admin)?;
1347 : // NB: don't rate limit: admin operation.
1348 :
1349 0 : let req = match maybe_forward(req).await {
1350 0 : ForwardOutcome::Forwarded(res) => {
1351 0 : return res;
1352 : }
1353 0 : ForwardOutcome::NotForwarded(req) => req,
1354 : };
1355 :
1356 0 : let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
1357 0 : json_response(
1358 : StatusCode::OK,
1359 0 : service
1360 0 : .tenant_shard_cancel_reconcile(tenant_shard_id)
1361 0 : .await?,
1362 : )
1363 0 : }
1364 :
1365 0 : async fn handle_tenant_update_policy(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1366 0 : check_permissions(&req, Scope::Admin)?;
1367 : // NB: don't rate limit: admin operation.
1368 :
1369 0 : let mut req = match maybe_forward(req).await {
1370 0 : ForwardOutcome::Forwarded(res) => {
1371 0 : return res;
1372 : }
1373 0 : ForwardOutcome::NotForwarded(req) => req,
1374 : };
1375 :
1376 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
1377 0 : let update_req = json_request::<TenantPolicyRequest>(&mut req).await?;
1378 0 : let state = get_state(&req);
1379 :
1380 0 : json_response(
1381 : StatusCode::OK,
1382 0 : state
1383 0 : .service
1384 0 : .tenant_update_policy(tenant_id, update_req)
1385 0 : .await?,
1386 : )
1387 0 : }
1388 :
1389 0 : async fn handle_update_preferred_azs(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1390 0 : check_permissions(&req, Scope::Admin)?;
1391 :
1392 0 : let mut req = match maybe_forward(req).await {
1393 0 : ForwardOutcome::Forwarded(res) => {
1394 0 : return res;
1395 : }
1396 0 : ForwardOutcome::NotForwarded(req) => req,
1397 : };
1398 :
1399 0 : let azs_req = json_request::<ShardsPreferredAzsRequest>(&mut req).await?;
1400 0 : let state = get_state(&req);
1401 :
1402 0 : json_response(
1403 : StatusCode::OK,
1404 0 : state.service.update_shards_preferred_azs(azs_req).await?,
1405 : )
1406 0 : }
1407 :
1408 0 : async fn handle_step_down(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1409 0 : check_permissions(&req, Scope::ControllerPeer)?;
1410 :
1411 0 : let req = match maybe_forward(req).await {
1412 0 : ForwardOutcome::Forwarded(res) => {
1413 0 : return res;
1414 : }
1415 0 : ForwardOutcome::NotForwarded(req) => req,
1416 : };
1417 :
1418 0 : let state = get_state(&req);
1419 0 : let result = state.service.step_down().await;
1420 :
1421 0 : json_response(StatusCode::OK, result)
1422 0 : }
1423 :
1424 0 : async fn handle_tenant_drop(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1425 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
1426 0 : check_permissions(&req, Scope::PageServerApi)?;
1427 0 : maybe_rate_limit(&req, tenant_id).await;
1428 :
1429 0 : let req = match maybe_forward(req).await {
1430 0 : ForwardOutcome::Forwarded(res) => {
1431 0 : return res;
1432 : }
1433 0 : ForwardOutcome::NotForwarded(req) => req,
1434 : };
1435 :
1436 0 : let state = get_state(&req);
1437 :
1438 0 : json_response(StatusCode::OK, state.service.tenant_drop(tenant_id).await?)
1439 0 : }
1440 :
1441 0 : async fn handle_tenant_import(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1442 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
1443 0 : check_permissions(&req, Scope::PageServerApi)?;
1444 0 : maybe_rate_limit(&req, tenant_id).await;
1445 :
1446 0 : let req = match maybe_forward(req).await {
1447 0 : ForwardOutcome::Forwarded(res) => {
1448 0 : return res;
1449 : }
1450 0 : ForwardOutcome::NotForwarded(req) => req,
1451 : };
1452 :
1453 0 : let state = get_state(&req);
1454 :
1455 0 : json_response(
1456 : StatusCode::OK,
1457 0 : state.service.tenant_import(tenant_id).await?,
1458 : )
1459 0 : }
1460 :
1461 0 : async fn handle_timeline_import(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1462 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
1463 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
1464 0 : check_permissions(&req, Scope::PageServerApi)?;
1465 0 : maybe_rate_limit(&req, tenant_id).await;
1466 :
1467 0 : let mut req = match maybe_forward(req).await {
1468 0 : ForwardOutcome::Forwarded(res) => {
1469 0 : return res;
1470 : }
1471 0 : ForwardOutcome::NotForwarded(req) => req,
1472 : };
1473 :
1474 0 : let import_req = json_request::<TimelineImportRequest>(&mut req).await?;
1475 :
1476 0 : let state = get_state(&req);
1477 :
1478 0 : if import_req.tenant_id != tenant_id || import_req.timeline_id != timeline_id {
1479 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
1480 0 : "tenant id or timeline id mismatch: url={tenant_id}/{timeline_id}, body={}/{}",
1481 0 : import_req.tenant_id,
1482 0 : import_req.timeline_id
1483 0 : )));
1484 0 : }
1485 :
1486 0 : json_response(
1487 : StatusCode::OK,
1488 0 : state.service.timeline_import(import_req).await?,
1489 : )
1490 0 : }
1491 :
1492 0 : async fn handle_tenant_timeline_locate(
1493 0 : service: Arc<Service>,
1494 0 : req: Request<Body>,
1495 0 : ) -> Result<Response<Body>, ApiError> {
1496 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
1497 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
1498 :
1499 0 : check_permissions(&req, Scope::Admin)?;
1500 0 : maybe_rate_limit(&req, tenant_id).await;
1501 :
1502 0 : match maybe_forward(req).await {
1503 0 : ForwardOutcome::Forwarded(res) => {
1504 0 : return res;
1505 : }
1506 0 : ForwardOutcome::NotForwarded(_req) => {}
1507 : };
1508 :
1509 0 : json_response(
1510 : StatusCode::OK,
1511 0 : service
1512 0 : .tenant_timeline_locate(tenant_id, timeline_id)
1513 0 : .await?,
1514 : )
1515 0 : }
1516 :
1517 0 : async fn handle_tenants_dump(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1518 0 : check_permissions(&req, Scope::Admin)?;
1519 :
1520 0 : let req = match maybe_forward(req).await {
1521 0 : ForwardOutcome::Forwarded(res) => {
1522 0 : return res;
1523 : }
1524 0 : ForwardOutcome::NotForwarded(req) => req,
1525 : };
1526 :
1527 0 : let state = get_state(&req);
1528 0 : state.service.tenants_dump()
1529 0 : }
1530 :
1531 0 : async fn handle_scheduler_dump(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1532 0 : check_permissions(&req, Scope::Admin)?;
1533 :
1534 0 : let req = match maybe_forward(req).await {
1535 0 : ForwardOutcome::Forwarded(res) => {
1536 0 : return res;
1537 : }
1538 0 : ForwardOutcome::NotForwarded(req) => req,
1539 : };
1540 :
1541 0 : let state = get_state(&req);
1542 0 : state.service.scheduler_dump()
1543 0 : }
1544 :
1545 0 : async fn handle_consistency_check(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1546 0 : check_permissions(&req, Scope::Admin)?;
1547 :
1548 0 : let req = match maybe_forward(req).await {
1549 0 : ForwardOutcome::Forwarded(res) => {
1550 0 : return res;
1551 : }
1552 0 : ForwardOutcome::NotForwarded(req) => req,
1553 : };
1554 :
1555 0 : let state = get_state(&req);
1556 :
1557 0 : json_response(StatusCode::OK, state.service.consistency_check().await?)
1558 0 : }
1559 :
1560 0 : async fn handle_reconcile_all(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1561 0 : check_permissions(&req, Scope::Admin)?;
1562 :
1563 0 : let req = match maybe_forward(req).await {
1564 0 : ForwardOutcome::Forwarded(res) => {
1565 0 : return res;
1566 : }
1567 0 : ForwardOutcome::NotForwarded(req) => req,
1568 : };
1569 :
1570 0 : let state = get_state(&req);
1571 :
1572 0 : json_response(StatusCode::OK, state.service.reconcile_all_now().await?)
1573 0 : }
1574 :
1575 : /// Status endpoint is just used for checking that our HTTP listener is up
1576 : ///
1577 : /// This serves as our k8s startup probe.
1578 0 : async fn handle_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1579 0 : match maybe_forward(req).await {
1580 0 : ForwardOutcome::Forwarded(res) => {
1581 0 : return res;
1582 : }
1583 0 : ForwardOutcome::NotForwarded(_req) => {}
1584 : };
1585 :
1586 0 : json_response(StatusCode::OK, ())
1587 0 : }
1588 :
1589 : /// Liveness endpoint indicates that this storage controller is in a state
1590 : /// where it can fulfill it's responsibilties. Namely, startup has finished
1591 : /// and it is the current leader.
1592 : ///
1593 : /// This serves as our k8s liveness probe.
1594 0 : async fn handle_live(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1595 0 : let req = match maybe_forward(req).await {
1596 0 : ForwardOutcome::Forwarded(res) => {
1597 0 : return res;
1598 : }
1599 0 : ForwardOutcome::NotForwarded(req) => req,
1600 : };
1601 :
1602 0 : let state = get_state(&req);
1603 0 : let live = state.service.startup_complete.is_ready()
1604 0 : && state.service.get_leadership_status() == LeadershipStatus::Leader;
1605 :
1606 0 : if live {
1607 0 : json_response(StatusCode::OK, ())
1608 : } else {
1609 0 : json_response(StatusCode::SERVICE_UNAVAILABLE, ())
1610 : }
1611 0 : }
1612 :
1613 : /// Readiness endpoint indicates when we're done doing startup I/O (e.g. reconciling
1614 : /// with remote pageserver nodes). This is intended for use as a kubernetes readiness probe.
1615 0 : async fn handle_ready(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1616 0 : let req = match maybe_forward(req).await {
1617 0 : ForwardOutcome::Forwarded(res) => {
1618 0 : return res;
1619 : }
1620 0 : ForwardOutcome::NotForwarded(req) => req,
1621 : };
1622 :
1623 0 : let state = get_state(&req);
1624 0 : if state.service.startup_complete.is_ready() {
1625 0 : json_response(StatusCode::OK, ())
1626 : } else {
1627 0 : json_response(StatusCode::SERVICE_UNAVAILABLE, ())
1628 : }
1629 0 : }
1630 :
1631 : impl From<ReconcileError> for ApiError {
1632 0 : fn from(value: ReconcileError) -> Self {
1633 0 : ApiError::Conflict(format!("Reconciliation error: {value}"))
1634 0 : }
1635 : }
1636 :
1637 : /// Return the safekeeper record by instance id, or 404.
1638 : ///
1639 : /// Not used by anything except manual testing.
1640 0 : async fn handle_get_safekeeper(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1641 0 : check_permissions(&req, Scope::Infra)?;
1642 :
1643 0 : let id = parse_request_param::<i64>(&req, "id")?;
1644 :
1645 0 : let req = match maybe_forward(req).await {
1646 0 : ForwardOutcome::Forwarded(res) => {
1647 0 : return res;
1648 : }
1649 0 : ForwardOutcome::NotForwarded(req) => req,
1650 : };
1651 :
1652 0 : let state = get_state(&req);
1653 :
1654 0 : let res = state.service.get_safekeeper(id).await;
1655 :
1656 0 : match res {
1657 0 : Ok(b) => json_response(StatusCode::OK, b),
1658 : Err(crate::persistence::DatabaseError::Query(diesel::result::Error::NotFound)) => {
1659 0 : Err(ApiError::NotFound("unknown instance id".into()))
1660 : }
1661 0 : Err(other) => Err(other.into()),
1662 : }
1663 0 : }
1664 :
1665 : /// Used as part of deployment scripts.
1666 : ///
1667 : /// Assumes information is only relayed to storage controller after first selecting an unique id on
1668 : /// control plane database, which means we have an id field in the request and payload.
1669 0 : async fn handle_upsert_safekeeper(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
1670 0 : check_permissions(&req, Scope::Infra)?;
1671 :
1672 0 : let body = json_request::<SafekeeperUpsert>(&mut req).await?;
1673 0 : let id = parse_request_param::<i64>(&req, "id")?;
1674 :
1675 0 : if id != body.id {
1676 : // it should be repeated
1677 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
1678 0 : "id mismatch: url={id:?}, body={:?}",
1679 0 : body.id
1680 0 : )));
1681 0 : }
1682 :
1683 0 : if id <= 0 {
1684 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
1685 0 : "id not allowed to be zero or negative: {id}"
1686 0 : )));
1687 0 : }
1688 :
1689 0 : let req = match maybe_forward(req).await {
1690 0 : ForwardOutcome::Forwarded(res) => {
1691 0 : return res;
1692 : }
1693 0 : ForwardOutcome::NotForwarded(req) => req,
1694 : };
1695 :
1696 0 : let state = get_state(&req);
1697 :
1698 0 : state.service.upsert_safekeeper(body).await?;
1699 :
1700 0 : Ok(Response::builder()
1701 0 : .status(StatusCode::NO_CONTENT)
1702 0 : .body(Body::empty())
1703 0 : .unwrap())
1704 0 : }
1705 :
1706 : /// Sets the scheduling policy of the specified safekeeper
1707 0 : async fn handle_safekeeper_scheduling_policy(
1708 0 : mut req: Request<Body>,
1709 0 : ) -> Result<Response<Body>, ApiError> {
1710 0 : check_permissions(&req, Scope::Admin)?;
1711 :
1712 0 : let body = json_request::<SafekeeperSchedulingPolicyRequest>(&mut req).await?;
1713 0 : let id = parse_request_param::<i64>(&req, "id")?;
1714 :
1715 0 : let req = match maybe_forward(req).await {
1716 0 : ForwardOutcome::Forwarded(res) => {
1717 0 : return res;
1718 : }
1719 0 : ForwardOutcome::NotForwarded(req) => req,
1720 : };
1721 :
1722 0 : let state = get_state(&req);
1723 :
1724 0 : state
1725 0 : .service
1726 0 : .set_safekeeper_scheduling_policy(id, body.scheduling_policy)
1727 0 : .await?;
1728 :
1729 0 : json_response(StatusCode::OK, ())
1730 0 : }
1731 :
1732 : /// Common wrapper for request handlers that call into Service and will operate on tenants: they must only
1733 : /// be allowed to run if Service has finished its initial reconciliation.
1734 0 : async fn tenant_service_handler<R, H>(
1735 0 : request: Request<Body>,
1736 0 : handler: H,
1737 0 : request_name: RequestName,
1738 0 : ) -> R::Output
1739 0 : where
1740 0 : R: std::future::Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
1741 0 : H: FnOnce(Arc<Service>, Request<Body>) -> R + Send + Sync + 'static,
1742 0 : {
1743 0 : let state = get_state(&request);
1744 0 : let service = state.service.clone();
1745 :
1746 0 : let startup_complete = service.startup_complete.clone();
1747 0 : if tokio::time::timeout(STARTUP_RECONCILE_TIMEOUT, startup_complete.wait())
1748 0 : .await
1749 0 : .is_err()
1750 : {
1751 : // This shouldn't happen: it is the responsibilty of [`Service::startup_reconcile`] to use appropriate
1752 : // timeouts around its remote calls, to bound its runtime.
1753 0 : return Err(ApiError::Timeout(
1754 0 : "Timed out waiting for service readiness".into(),
1755 0 : ));
1756 0 : }
1757 :
1758 0 : named_request_span(
1759 0 : request,
1760 0 : |request| async move { handler(service, request).await },
1761 0 : request_name,
1762 : )
1763 0 : .await
1764 0 : }
1765 :
1766 : /// Check if the required scope is held in the request's token, or if the request has
1767 : /// a token with 'admin' scope then always permit it.
1768 0 : fn check_permissions(request: &Request<Body>, required_scope: Scope) -> Result<(), ApiError> {
1769 0 : check_permission_with(request, |claims| {
1770 0 : match crate::auth::check_permission(claims, required_scope) {
1771 0 : Err(e) => match crate::auth::check_permission(claims, Scope::Admin) {
1772 0 : Ok(()) => Ok(()),
1773 0 : Err(_) => Err(e),
1774 : },
1775 0 : Ok(()) => Ok(()),
1776 : }
1777 0 : })
1778 0 : }
1779 :
/// Per-request bookkeeping for the HTTP metrics middlewares.
///
/// Stored in the request context by `prologue_metrics_middleware` and read back by
/// `epilogue_metrics_middleware`.
#[derive(Clone, Debug)]
struct RequestMeta {
    /// HTTP method of the request; used as the `method` metric label.
    method: hyper::http::Method,
    /// Instant captured when the prologue middleware ran; latency is measured from here.
    at: Instant,
}
1785 :
1786 0 : pub fn prologue_leadership_status_check_middleware<
1787 0 : B: hyper::body::HttpBody + Send + Sync + 'static,
1788 0 : >() -> Middleware<B, ApiError> {
1789 0 : Middleware::pre(move |req| async move {
1790 0 : let state = get_state(&req);
1791 0 : let leadership_status = state.service.get_leadership_status();
1792 :
1793 : enum AllowedRoutes {
1794 : All,
1795 : Some(&'static [&'static str]),
1796 : }
1797 :
1798 0 : let allowed_routes = match leadership_status {
1799 0 : LeadershipStatus::Leader => AllowedRoutes::All,
1800 0 : LeadershipStatus::SteppedDown => AllowedRoutes::All,
1801 0 : LeadershipStatus::Candidate => AllowedRoutes::Some(&[
1802 0 : "/ready",
1803 0 : "/status",
1804 0 : "/metrics",
1805 0 : "/profile/cpu",
1806 0 : "/profile/heap",
1807 0 : ]),
1808 : };
1809 :
1810 0 : match allowed_routes {
1811 0 : AllowedRoutes::All => Ok(req),
1812 0 : AllowedRoutes::Some(allowed) if allowed.contains(&req.uri().path()) => Ok(req),
1813 : _ => {
1814 0 : tracing::info!(
1815 0 : "Request {} not allowed due to current leadership state",
1816 0 : req.uri()
1817 : );
1818 :
1819 0 : Err(ApiError::ResourceUnavailable(
1820 0 : format!("Current leadership status is {leadership_status}").into(),
1821 0 : ))
1822 : }
1823 : }
1824 0 : })
1825 0 : }
1826 :
1827 0 : fn prologue_metrics_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>()
1828 0 : -> Middleware<B, ApiError> {
1829 0 : Middleware::pre(move |req| async move {
1830 0 : let meta = RequestMeta {
1831 0 : method: req.method().clone(),
1832 0 : at: Instant::now(),
1833 0 : };
1834 :
1835 0 : req.set_context(meta);
1836 :
1837 0 : Ok(req)
1838 0 : })
1839 0 : }
1840 :
1841 0 : fn epilogue_metrics_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>()
1842 0 : -> Middleware<B, ApiError> {
1843 0 : Middleware::post_with_info(move |resp, req_info| async move {
1844 0 : let request_name = match req_info.context::<RequestName>() {
1845 0 : Some(name) => name,
1846 : None => {
1847 0 : return Ok(resp);
1848 : }
1849 : };
1850 :
1851 0 : if let Some(meta) = req_info.context::<RequestMeta>() {
1852 0 : let status = &crate::metrics::METRICS_REGISTRY
1853 0 : .metrics_group
1854 0 : .storage_controller_http_request_status;
1855 0 : let latency = &crate::metrics::METRICS_REGISTRY
1856 0 : .metrics_group
1857 0 : .storage_controller_http_request_latency;
1858 0 :
1859 0 : status.inc(HttpRequestStatusLabelGroup {
1860 0 : path: request_name.0,
1861 0 : method: meta.method.clone().into(),
1862 0 : status: crate::metrics::StatusCode(resp.status()),
1863 0 : });
1864 0 :
1865 0 : latency.observe(
1866 0 : HttpRequestLatencyLabelGroup {
1867 0 : path: request_name.0,
1868 0 : method: meta.method.into(),
1869 0 : },
1870 0 : meta.at.elapsed().as_secs_f64(),
1871 0 : );
1872 0 : }
1873 0 : Ok(resp)
1874 0 : })
1875 0 : }
1876 :
1877 0 : pub async fn measured_metrics_handler(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1878 : pub const TEXT_FORMAT: &str = "text/plain; version=0.0.4";
1879 :
1880 0 : let req = match maybe_forward(req).await {
1881 0 : ForwardOutcome::Forwarded(res) => {
1882 0 : return res;
1883 : }
1884 0 : ForwardOutcome::NotForwarded(req) => req,
1885 : };
1886 :
1887 0 : let state = get_state(&req);
1888 0 : let payload = crate::metrics::METRICS_REGISTRY.encode(&state.neon_metrics);
1889 0 : let response = Response::builder()
1890 0 : .status(200)
1891 0 : .header(CONTENT_TYPE, TEXT_FORMAT)
1892 0 : .body(payload.into())
1893 0 : .unwrap();
1894 :
1895 0 : Ok(response)
1896 0 : }
1897 :
/// Static label identifying the route a request was dispatched to.
///
/// `named_request_span` stores it in the request context and
/// `epilogue_metrics_middleware` uses it as the `path` label of the HTTP
/// request metrics. Requests without a `RequestName` produce no such metrics.
#[derive(Clone)]
struct RequestName(&'static str);
1900 :
/// Run `handler` for `request` via [`request_span`], after tagging the request
/// with `name`.
///
/// The [`RequestName`] is placed in the request context so that
/// `epilogue_metrics_middleware` can label the metrics it records for this
/// request. The handler's own result is returned unchanged.
async fn named_request_span<R, H>(
    request: Request<Body>,
    handler: H,
    name: RequestName,
) -> R::Output
where
    R: Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
    H: FnOnce(Request<Body>) -> R + Send + Sync + 'static,
{
    // Must happen before the request is handed over: the context travels with it.
    request.set_context(name);
    request_span(request, handler).await
}
1913 :
/// Result of [`maybe_forward`]: either the request was consumed by the
/// forwarding logic, or it is handed back for local handling.
enum ForwardOutcome {
    /// The forwarding path took ownership of the request. Holds the final
    /// result to return to the client: the leader's converted response, or the
    /// error that prevented forwarding.
    Forwarded(Result<Response<Body>, ApiError>),
    /// The request was not forwarded and is returned untouched so the caller
    /// can process it locally.
    NotForwarded(Request<Body>),
}
1918 :
1919 : /// Potentially forward the request to the current storage controler leader.
1920 : /// More specifically we forward when:
1921 : /// 1. Request is not one of:
1922 : /// ["/control/v1/step_down", "/status", "/ready", "/metrics", "/profile/cpu", "/profile/heap"]
1923 : /// 2. Current instance is in [`LeadershipStatus::SteppedDown`] state
1924 : /// 3. There is a leader in the database to forward to
1925 : /// 4. Leader from step (3) is not the current instance
1926 : ///
1927 : /// Why forward?
1928 : /// It turns out that we can't rely on external orchestration to promptly route trafic to the
1929 : /// new leader. This is downtime inducing. Forwarding provides a safe way out.
1930 : ///
1931 : /// Why is it safe?
1932 : /// If a storcon instance is persisted in the database, then we know that it is the current leader.
1933 : /// There's one exception: time between handling step-down request and the new leader updating the
1934 : /// database.
1935 : ///
1936 : /// Let's treat the happy case first. The stepped down node does not produce any side effects,
1937 : /// since all request handling happens on the leader.
1938 : ///
1939 : /// As for the edge case, we are guaranteed to always have a maximum of two running instances.
1940 : /// Hence, if we are in the edge case scenario the leader persisted in the database is the
1941 : /// stepped down instance that received the request. Condition (4) above covers this scenario.
1942 0 : async fn maybe_forward(req: Request<Body>) -> ForwardOutcome {
1943 : const NOT_FOR_FORWARD: &[&str] = &[
1944 : "/control/v1/step_down",
1945 : "/status",
1946 : "/live",
1947 : "/ready",
1948 : "/metrics",
1949 : "/profile/cpu",
1950 : "/profile/heap",
1951 : ];
1952 :
1953 0 : let uri = req.uri();
1954 0 : let uri_for_forward = !NOT_FOR_FORWARD.contains(&uri.path());
1955 :
1956 : // Fast return before trying to take any Service locks, if we will never forward anyway
1957 0 : if !uri_for_forward {
1958 0 : return ForwardOutcome::NotForwarded(req);
1959 0 : }
1960 :
1961 0 : let state = get_state(&req);
1962 0 : let leadership_status = state.service.get_leadership_status();
1963 :
1964 0 : if leadership_status != LeadershipStatus::SteppedDown {
1965 0 : return ForwardOutcome::NotForwarded(req);
1966 0 : }
1967 :
1968 0 : let leader = state.service.get_leader().await;
1969 0 : let leader = {
1970 0 : match leader {
1971 0 : Ok(Some(leader)) => leader,
1972 : Ok(None) => {
1973 0 : return ForwardOutcome::Forwarded(Err(ApiError::ResourceUnavailable(
1974 0 : "No leader to forward to while in stepped down state".into(),
1975 0 : )));
1976 : }
1977 0 : Err(err) => {
1978 0 : return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(
1979 0 : anyhow::anyhow!(
1980 0 : "Failed to get leader for forwarding while in stepped down state: {err}"
1981 0 : ),
1982 0 : )));
1983 : }
1984 : }
1985 : };
1986 :
1987 0 : let cfg = state.service.get_config();
1988 0 : if let Some(ref self_addr) = cfg.address_for_peers {
1989 0 : let leader_addr = match Uri::from_str(leader.address.as_str()) {
1990 0 : Ok(uri) => uri,
1991 0 : Err(err) => {
1992 0 : return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(
1993 0 : anyhow::anyhow!(
1994 0 : "Failed to parse leader uri for forwarding while in stepped down state: {err}"
1995 0 : ),
1996 0 : )));
1997 : }
1998 : };
1999 :
2000 0 : if *self_addr == leader_addr {
2001 0 : return ForwardOutcome::Forwarded(Err(ApiError::ResourceUnavailable(
2002 0 : "Leader is stepped down instance".into(),
2003 0 : )));
2004 0 : }
2005 0 : }
2006 :
2007 0 : tracing::info!("Forwarding {} to leader at {}", uri, leader.address);
2008 :
2009 : // Use [`RECONCILE_TIMEOUT`] as the max amount of time a request should block for and
2010 : // include some leeway to get the timeout for proxied requests.
2011 : const PROXIED_REQUEST_TIMEOUT: Duration = Duration::from_secs(RECONCILE_TIMEOUT.as_secs() + 10);
2012 :
2013 0 : let client = state.service.get_http_client().clone();
2014 :
2015 0 : let request: reqwest::Request = match convert_request(
2016 0 : req,
2017 0 : &client,
2018 0 : leader.address,
2019 : PROXIED_REQUEST_TIMEOUT,
2020 : )
2021 0 : .await
2022 : {
2023 0 : Ok(r) => r,
2024 0 : Err(err) => {
2025 0 : return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(anyhow::anyhow!(
2026 0 : "Failed to convert request for forwarding while in stepped down state: {err}"
2027 0 : ))));
2028 : }
2029 : };
2030 :
2031 0 : let response = match client.execute(request).await {
2032 0 : Ok(r) => r,
2033 0 : Err(err) => {
2034 0 : return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(anyhow::anyhow!(
2035 0 : "Failed to forward while in stepped down state: {err}"
2036 0 : ))));
2037 : }
2038 : };
2039 :
2040 0 : ForwardOutcome::Forwarded(convert_response(response).await)
2041 0 : }
2042 :
2043 : /// Convert a [`reqwest::Response`] to a [hyper::Response`] by passing through
2044 : /// a stable representation (string, bytes or integer)
2045 : ///
2046 : /// Ideally, we would not have to do this since both types use the http crate
2047 : /// under the hood. However, they use different versions of the crate and keeping
2048 : /// second order dependencies in sync is difficult.
2049 0 : async fn convert_response(resp: reqwest::Response) -> Result<hyper::Response<Body>, ApiError> {
2050 : use std::str::FromStr;
2051 :
2052 0 : let mut builder = hyper::Response::builder().status(resp.status().as_u16());
2053 0 : for (key, value) in resp.headers().into_iter() {
2054 0 : let key = hyper::header::HeaderName::from_str(key.as_str()).map_err(|err| {
2055 0 : ApiError::InternalServerError(anyhow::anyhow!("Response conversion failed: {err}"))
2056 0 : })?;
2057 :
2058 0 : let value = hyper::header::HeaderValue::from_bytes(value.as_bytes()).map_err(|err| {
2059 0 : ApiError::InternalServerError(anyhow::anyhow!("Response conversion failed: {err}"))
2060 0 : })?;
2061 :
2062 0 : builder = builder.header(key, value);
2063 : }
2064 :
2065 0 : let body = http::Body::wrap_stream(resp.bytes_stream());
2066 :
2067 0 : builder.body(body).map_err(|err| {
2068 0 : ApiError::InternalServerError(anyhow::anyhow!("Response conversion failed: {err}"))
2069 0 : })
2070 0 : }
2071 :
2072 : /// Convert a [`reqwest::Request`] to a [hyper::Request`] by passing through
2073 : /// a stable representation (string, bytes or integer)
2074 : ///
2075 : /// See [`convert_response`] for why we are doing it this way.
2076 0 : async fn convert_request(
2077 0 : req: hyper::Request<Body>,
2078 0 : client: &reqwest::Client,
2079 0 : to_address: String,
2080 0 : timeout: Duration,
2081 0 : ) -> Result<reqwest::Request, ApiError> {
2082 : use std::str::FromStr;
2083 :
2084 0 : let (parts, body) = req.into_parts();
2085 0 : let method = reqwest::Method::from_str(parts.method.as_str()).map_err(|err| {
2086 0 : ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
2087 0 : })?;
2088 :
2089 0 : let path_and_query = parts.uri.path_and_query().ok_or_else(|| {
2090 0 : ApiError::InternalServerError(anyhow::anyhow!(
2091 0 : "Request conversion failed: no path and query"
2092 0 : ))
2093 0 : })?;
2094 :
2095 0 : let uri = reqwest::Url::from_str(
2096 0 : format!(
2097 0 : "{}{}",
2098 0 : to_address.trim_end_matches("/"),
2099 0 : path_and_query.as_str()
2100 0 : )
2101 0 : .as_str(),
2102 : )
2103 0 : .map_err(|err| {
2104 0 : ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
2105 0 : })?;
2106 :
2107 0 : let mut headers = reqwest::header::HeaderMap::new();
2108 0 : for (key, value) in parts.headers.into_iter() {
2109 0 : let key = match key {
2110 0 : Some(k) => k,
2111 : None => {
2112 0 : continue;
2113 : }
2114 : };
2115 :
2116 0 : let key = reqwest::header::HeaderName::from_str(key.as_str()).map_err(|err| {
2117 0 : ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
2118 0 : })?;
2119 :
2120 0 : let value = reqwest::header::HeaderValue::from_bytes(value.as_bytes()).map_err(|err| {
2121 0 : ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
2122 0 : })?;
2123 :
2124 0 : headers.insert(key, value);
2125 : }
2126 :
2127 0 : let body = hyper::body::to_bytes(body).await.map_err(|err| {
2128 0 : ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
2129 0 : })?;
2130 :
2131 0 : client
2132 0 : .request(method, uri)
2133 0 : .headers(headers)
2134 0 : .body(body)
2135 0 : .timeout(timeout)
2136 0 : .build()
2137 0 : .map_err(|err| {
2138 0 : ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
2139 0 : })
2140 0 : }
2141 :
/// Build the storage controller's HTTP router.
///
/// Installs, in order, the leadership-status check, the metrics prologue and
/// the metrics epilogue middlewares, plus an authentication middleware when
/// `auth` is provided. It then attaches a shared [`HttpState`] built from
/// `service`, `auth` and `build_info`, and registers every route.
///
/// Most routes are wrapped in `named_request_span` or `tenant_service_handler`
/// together with a [`RequestName`]. Routes registered in this function with a
/// bare `request_span` attach no [`RequestName`], and
/// `epilogue_metrics_middleware` skips requests without one.
pub fn make_router(
    service: Arc<Service>,
    auth: Option<Arc<SwappableJwtAuth>>,
    build_info: BuildInfo,
) -> RouterBuilder<hyper::Body, ApiError> {
    let mut router = endpoint::make_router()
        .middleware(prologue_leadership_status_check_middleware())
        .middleware(prologue_metrics_middleware())
        .middleware(epilogue_metrics_middleware());
    if auth.is_some() {
        // Paths listed in `allowlist_routes` yield `None` here; presumably that makes
        // `auth_middleware` skip authentication for them (confirm against its implementation).
        router = router.middleware(auth_middleware(|request| {
            let state = get_state(request);
            if state.allowlist_routes.contains(&request.uri().path()) {
                None
            } else {
                state.auth.as_deref()
            }
        }));
    }

    router
        .data(Arc::new(HttpState::new(service, auth, build_info)))
        // Non-prefixed generic endpoints (status, metrics, profiling)
        .get("/metrics", |r| {
            named_request_span(r, measured_metrics_handler, RequestName("metrics"))
        })
        .get("/status", |r| {
            named_request_span(r, handle_status, RequestName("status"))
        })
        .get("/live", |r| {
            named_request_span(r, handle_live, RequestName("live"))
        })
        .get("/ready", |r| {
            named_request_span(r, handle_ready, RequestName("ready"))
        })
        .get("/profile/cpu", |r| {
            named_request_span(r, profile_cpu_handler, RequestName("profile_cpu"))
        })
        .get("/profile/heap", |r| {
            named_request_span(r, profile_heap_handler, RequestName("profile_heap"))
        })
        // Upcalls for the pageserver: point the pageserver's `control_plane_api` config to this prefix
        .post("/upcall/v1/re-attach", |r| {
            named_request_span(r, handle_re_attach, RequestName("upcall_v1_reattach"))
        })
        .post("/upcall/v1/validate", |r| {
            named_request_span(r, handle_validate, RequestName("upcall_v1_validate"))
        })
        .get("/upcall/v1/timeline_import_status", |r| {
            named_request_span(
                r,
                handle_get_timeline_import_status,
                RequestName("upcall_v1_timeline_import_status"),
            )
        })
        .post("/upcall/v1/timeline_import_status", |r| {
            named_request_span(
                r,
                handle_put_timeline_import_status,
                RequestName("upcall_v1_timeline_import_status"),
            )
        })
        // Test/dev/debug endpoints
        .post("/debug/v1/attach-hook", |r| {
            named_request_span(r, handle_attach_hook, RequestName("debug_v1_attach_hook"))
        })
        .post("/debug/v1/inspect", |r| {
            named_request_span(r, handle_inspect, RequestName("debug_v1_inspect"))
        })
        .post("/debug/v1/tenant/:tenant_id/drop", |r| {
            named_request_span(r, handle_tenant_drop, RequestName("debug_v1_tenant_drop"))
        })
        .post("/debug/v1/node/:node_id/drop", |r| {
            named_request_span(r, handle_node_drop, RequestName("debug_v1_node_drop"))
        })
        .delete("/debug/v1/tombstone/:node_id", |r| {
            named_request_span(
                r,
                handle_tombstone_delete,
                RequestName("debug_v1_tombstone_delete"),
            )
        })
        .get("/debug/v1/tombstone", |r| {
            named_request_span(
                r,
                handle_tombstone_list,
                RequestName("debug_v1_tombstone_list"),
            )
        })
        .post("/debug/v1/tenant/:tenant_id/import", |r| {
            named_request_span(
                r,
                handle_tenant_import,
                RequestName("debug_v1_tenant_import"),
            )
        })
        .get("/debug/v1/tenant", |r| {
            named_request_span(r, handle_tenants_dump, RequestName("debug_v1_tenant"))
        })
        .get("/debug/v1/tenant/:tenant_id/locate", |r| {
            tenant_service_handler(
                r,
                handle_tenant_locate,
                RequestName("debug_v1_tenant_locate"),
            )
        })
        .post(
            "/debug/v1/tenant/:tenant_id/timeline/:timeline_id/import",
            |r| {
                named_request_span(
                    r,
                    handle_timeline_import,
                    RequestName("debug_v1_timeline_import"),
                )
            },
        )
        .get(
            "/debug/v1/tenant/:tenant_id/timeline/:timeline_id/locate",
            |r| {
                tenant_service_handler(
                    r,
                    handle_tenant_timeline_locate,
                    RequestName("v1_tenant_timeline_locate"),
                )
            },
        )
        .get("/debug/v1/scheduler", |r| {
            named_request_span(r, handle_scheduler_dump, RequestName("debug_v1_scheduler"))
        })
        .post("/debug/v1/consistency_check", |r| {
            named_request_span(
                r,
                handle_consistency_check,
                RequestName("debug_v1_consistency_check"),
            )
        })
        // The next two routes use plain `request_span`, so no `RequestName` is attached here.
        .post("/debug/v1/reconcile_all", |r| {
            request_span(r, handle_reconcile_all)
        })
        .put("/debug/v1/failpoints", |r| {
            request_span(r, |r| failpoints_handler(r, CancellationToken::new()))
        })
        // Node operations
        .post("/control/v1/node", |r| {
            named_request_span(r, handle_node_register, RequestName("control_v1_node"))
        })
        // This endpoint is deprecated and will be removed in a future version.
        // Use PUT /control/v1/node/:node_id/delete instead.
        .delete("/control/v1/node/:node_id", |r| {
            named_request_span(
                r,
                handle_node_delete_old,
                RequestName("control_v1_node_delete"),
            )
        })
        .get("/control/v1/node", |r| {
            named_request_span(r, handle_node_list, RequestName("control_v1_node"))
        })
        .put("/control/v1/node/:node_id/config", |r| {
            named_request_span(
                r,
                handle_node_configure,
                RequestName("control_v1_node_config"),
            )
        })
        .get("/control/v1/node/:node_id", |r| {
            named_request_span(r, handle_node_status, RequestName("control_v1_node_status"))
        })
        .get("/control/v1/node/:node_id/shards", |r| {
            named_request_span(
                r,
                handle_node_shards,
                RequestName("control_v1_node_describe"),
            )
        })
        .get("/control/v1/leader", |r| {
            named_request_span(r, handle_get_leader, RequestName("control_v1_get_leader"))
        })
        .put("/control/v1/node/:node_id/delete", |r| {
            named_request_span(
                r,
                handle_node_delete,
                RequestName("control_v1_start_node_delete"),
            )
        })
        .delete("/control/v1/node/:node_id/delete", |r| {
            named_request_span(
                r,
                handle_cancel_node_delete,
                RequestName("control_v1_cancel_node_delete"),
            )
        })
        .put("/control/v1/node/:node_id/drain", |r| {
            named_request_span(r, handle_node_drain, RequestName("control_v1_node_drain"))
        })
        .delete("/control/v1/node/:node_id/drain", |r| {
            named_request_span(
                r,
                handle_cancel_node_drain,
                RequestName("control_v1_cancel_node_drain"),
            )
        })
        .put("/control/v1/node/:node_id/fill", |r| {
            named_request_span(r, handle_node_fill, RequestName("control_v1_node_fill"))
        })
        .delete("/control/v1/node/:node_id/fill", |r| {
            named_request_span(
                r,
                handle_cancel_node_fill,
                RequestName("control_v1_cancel_node_fill"),
            )
        })
        // Metadata health operations
        .post("/control/v1/metadata_health/update", |r| {
            named_request_span(
                r,
                handle_metadata_health_update,
                RequestName("control_v1_metadata_health_update"),
            )
        })
        .get("/control/v1/metadata_health/unhealthy", |r| {
            named_request_span(
                r,
                handle_metadata_health_list_unhealthy,
                RequestName("control_v1_metadata_health_list_unhealthy"),
            )
        })
        .post("/control/v1/metadata_health/outdated", |r| {
            named_request_span(
                r,
                handle_metadata_health_list_outdated,
                RequestName("control_v1_metadata_health_list_outdated"),
            )
        })
        // Safekeepers
        .get("/control/v1/safekeeper", |r| {
            named_request_span(
                r,
                handle_safekeeper_list,
                RequestName("control_v1_safekeeper_list"),
            )
        })
        .get("/control/v1/safekeeper/:id", |r| {
            named_request_span(r, handle_get_safekeeper, RequestName("v1_safekeeper"))
        })
        .post("/control/v1/safekeeper/:id", |r| {
            // id is in the body
            named_request_span(
                r,
                handle_upsert_safekeeper,
                RequestName("v1_safekeeper_post"),
            )
        })
        .post("/control/v1/safekeeper/:id/scheduling_policy", |r| {
            named_request_span(
                r,
                handle_safekeeper_scheduling_policy,
                RequestName("v1_safekeeper_scheduling_policy"),
            )
        })
        // Tenant Shard operations
        .put("/control/v1/tenant/:tenant_shard_id/migrate", |r| {
            tenant_service_handler(
                r,
                handle_tenant_shard_migrate,
                RequestName("control_v1_tenant_migrate"),
            )
        })
        .put(
            "/control/v1/tenant/:tenant_shard_id/migrate_secondary",
            |r| {
                tenant_service_handler(
                    r,
                    handle_tenant_shard_migrate_secondary,
                    RequestName("control_v1_tenant_migrate_secondary"),
                )
            },
        )
        .put(
            "/control/v1/tenant/:tenant_shard_id/cancel_reconcile",
            |r| {
                tenant_service_handler(
                    r,
                    handle_tenant_shard_cancel_reconcile,
                    RequestName("control_v1_tenant_cancel_reconcile"),
                )
            },
        )
        .put("/control/v1/tenant/:tenant_id/shard_split", |r| {
            tenant_service_handler(
                r,
                handle_tenant_shard_split,
                RequestName("control_v1_tenant_shard_split"),
            )
        })
        .get("/control/v1/tenant/:tenant_id", |r| {
            tenant_service_handler(
                r,
                handle_tenant_describe,
                RequestName("control_v1_tenant_describe"),
            )
        })
        .get("/control/v1/tenant", |r| {
            tenant_service_handler(r, handle_tenant_list, RequestName("control_v1_tenant_list"))
        })
        .put("/control/v1/tenant/:tenant_id/policy", |r| {
            named_request_span(
                r,
                handle_tenant_update_policy,
                RequestName("control_v1_tenant_policy"),
            )
        })
        .put("/control/v1/preferred_azs", |r| {
            named_request_span(
                r,
                handle_update_preferred_azs,
                RequestName("control_v1_preferred_azs"),
            )
        })
        .put("/control/v1/step_down", |r| {
            named_request_span(r, handle_step_down, RequestName("control_v1_step_down"))
        })
        // Tenant operations
        // The ^/v1/ endpoints act as a "Virtual Pageserver", enabling shard-naive clients to call into
        // this service to manage tenants that actually consist of many tenant shards, as if they are a single entity.
        .post("/v1/tenant", |r| {
            tenant_service_handler(r, handle_tenant_create, RequestName("v1_tenant"))
        })
        .delete("/v1/tenant/:tenant_id", |r| {
            tenant_service_handler(r, handle_tenant_delete, RequestName("v1_tenant"))
        })
        .patch("/v1/tenant/config", |r| {
            tenant_service_handler(
                r,
                handle_tenant_config_patch,
                RequestName("v1_tenant_config"),
            )
        })
        .put("/v1/tenant/config", |r| {
            tenant_service_handler(r, handle_tenant_config_set, RequestName("v1_tenant_config"))
        })
        .get("/v1/tenant/:tenant_id/config", |r| {
            tenant_service_handler(r, handle_tenant_config_get, RequestName("v1_tenant_config"))
        })
        .put("/v1/tenant/:tenant_shard_id/location_config", |r| {
            tenant_service_handler(
                r,
                handle_tenant_location_config,
                RequestName("v1_tenant_location_config"),
            )
        })
        .put("/v1/tenant/:tenant_id/time_travel_remote_storage", |r| {
            tenant_service_handler(
                r,
                handle_tenant_time_travel_remote_storage,
                RequestName("v1_tenant_time_travel_remote_storage"),
            )
        })
        .post("/v1/tenant/:tenant_id/secondary/download", |r| {
            tenant_service_handler(
                r,
                handle_tenant_secondary_download,
                RequestName("v1_tenant_secondary_download"),
            )
        })
        // Timeline operations
        .get("/control/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
            tenant_service_handler(
                r,
                handle_tenant_timeline_describe,
                RequestName("v1_tenant_timeline_describe"),
            )
        })
        .delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
            tenant_service_handler(
                r,
                handle_tenant_timeline_delete,
                RequestName("v1_tenant_timeline"),
            )
        })
        .post("/v1/tenant/:tenant_id/timeline", |r| {
            tenant_service_handler(
                r,
                handle_tenant_timeline_create,
                RequestName("v1_tenant_timeline"),
            )
        })
        .put(
            "/v1/tenant/:tenant_id/timeline/:timeline_id/archival_config",
            |r| {
                tenant_service_handler(
                    r,
                    handle_tenant_timeline_archival_config,
                    RequestName("v1_tenant_timeline_archival_config"),
                )
            },
        )
        .put(
            "/v1/tenant/:tenant_id/timeline/:timeline_id/detach_ancestor",
            |r| {
                tenant_service_handler(
                    r,
                    handle_tenant_timeline_detach_ancestor,
                    RequestName("v1_tenant_timeline_detach_ancestor"),
                )
            },
        )
        // block_gc and unblock_gc share one handler (and one metrics name), selected by `BlockUnblock`.
        .post(
            "/v1/tenant/:tenant_id/timeline/:timeline_id/block_gc",
            |r| {
                tenant_service_handler(
                    r,
                    |s, r| handle_tenant_timeline_block_unblock_gc(s, r, BlockUnblock::Block),
                    RequestName("v1_tenant_timeline_block_unblock_gc"),
                )
            },
        )
        .post(
            "/v1/tenant/:tenant_id/timeline/:timeline_id/unblock_gc",
            |r| {
                tenant_service_handler(
                    r,
                    |s, r| handle_tenant_timeline_block_unblock_gc(s, r, BlockUnblock::Unblock),
                    RequestName("v1_tenant_timeline_block_unblock_gc"),
                )
            },
        )
        .post(
            "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/download_heatmap_layers",
            |r| {
                tenant_service_handler(
                    r,
                    handle_tenant_timeline_download_heatmap_layers,
                    RequestName("v1_tenant_timeline_download_heatmap_layers"),
                )
            },
        )
        .post(
            "/v1/tenant/:tenant_id/timeline/:timeline_id/safekeeper_migrate",
            |r| {
                tenant_service_handler(
                    r,
                    handle_tenant_timeline_safekeeper_migrate,
                    RequestName("v1_tenant_timeline_safekeeper_migrate"),
                )
            },
        )
        // LSN lease passthrough to all shards
        .post(
            "/v1/tenant/:tenant_id/timeline/:timeline_id/lsn_lease",
            |r| {
                tenant_service_handler(
                    r,
                    handle_tenant_timeline_lsn_lease,
                    RequestName("v1_tenant_timeline_lsn_lease"),
                )
            },
        )
        // Tenant detail GET passthrough to shard zero:
        .get("/v1/tenant/:tenant_id", |r| {
            tenant_service_handler(
                r,
                handle_tenant_timeline_passthrough,
                RequestName("v1_tenant_passthrough"),
            )
        })
        // The `*` in the URL is a wildcard: any tenant/timeline GET APIs on the pageserver
        // are implicitly exposed here. This must be the last GET route in the list to avoid
        // taking precedence over other GET methods we might implement by hand.
        .get("/v1/tenant/:tenant_id/*", |r| {
            tenant_service_handler(
                r,
                handle_tenant_timeline_passthrough,
                RequestName("v1_tenant_passthrough"),
            )
        })
        // Tenant timeline mark_invisible passthrough to shard zero
        .put(
            "/v1/tenant/:tenant_id/timeline/:timeline_id/mark_invisible",
            |r| {
                tenant_service_handler(
                    r,
                    handle_tenant_timeline_passthrough,
                    RequestName("v1_tenant_timeline_mark_invisible_passthrough"),
                )
            },
        )
}
2630 :
#[cfg(test)]
mod test {

    use super::path_without_ids;

    /// Tenant, tenant-shard and timeline ids (and any query string) must all be
    /// removed from the path, leaving only the static route skeleton.
    #[test]
    fn test_path_without_ids() {
        // Plain tenant id, tenant-shard id, and tenant-shard id with a query string.
        let inputs = [
            "/v1/tenant/1a2b3344556677881122334455667788/timeline/AA223344556677881122334455667788",
            "/v1/tenant/1a2b3344556677881122334455667788-0108/timeline/AA223344556677881122334455667788",
            "/v1/tenant/1a2b3344556677881122334455667788-0108/timeline/AA223344556677881122334455667788?parameter=foo",
        ];

        for input in inputs {
            assert_eq!(path_without_ids(input), "/v1/tenant//timeline/");
        }
    }
}
|