Line data Source code
1 : use std::str::FromStr;
2 : use std::sync::{Arc, LazyLock};
3 : use std::time::{Duration, Instant};
4 :
5 : use anyhow::Context;
6 : use control_plane::storage_controller::{AttachHookRequest, InspectRequest};
7 : use futures::Future;
8 : use http_utils::endpoint::{
9 : self, auth_middleware, check_permission_with, profile_cpu_handler, profile_heap_handler,
10 : request_span,
11 : };
12 : use http_utils::error::ApiError;
13 : use http_utils::failpoints::failpoints_handler;
14 : use http_utils::json::{json_request, json_response};
15 : use http_utils::request::{must_get_query_param, parse_query_param, parse_request_param};
16 : use http_utils::{RequestExt, RouterBuilder};
17 : use hyper::header::CONTENT_TYPE;
18 : use hyper::{Body, Request, Response, StatusCode, Uri};
19 : use metrics::{BuildInfo, NeonMetrics};
20 : use pageserver_api::controller_api::{
21 : MetadataHealthListOutdatedRequest, MetadataHealthListOutdatedResponse,
22 : MetadataHealthListUnhealthyResponse, MetadataHealthUpdateRequest, MetadataHealthUpdateResponse,
23 : NodeAvailability, NodeConfigureRequest, NodeRegisterRequest, SafekeeperSchedulingPolicyRequest,
24 : ShardsPreferredAzsRequest, TenantCreateRequest, TenantPolicyRequest, TenantShardMigrateRequest,
25 : TimelineImportRequest, TimelineSafekeeperMigrateRequest,
26 : };
27 : use pageserver_api::models::{
28 : DetachBehavior, LsnLeaseRequest, TenantConfigPatchRequest, TenantConfigRequest,
29 : TenantLocationConfigRequest, TenantShardSplitRequest, TenantTimeTravelRequest,
30 : TimelineArchivalConfigRequest, TimelineCreateRequest,
31 : };
32 : use pageserver_api::shard::TenantShardId;
33 : use pageserver_api::upcall_api::{
34 : PutTimelineImportStatusRequest, ReAttachRequest, TimelineImportStatusRequest, ValidateRequest,
35 : };
36 : use pageserver_client::{BlockUnblock, mgmt_api};
37 :
38 : use routerify::Middleware;
39 : use tokio_util::sync::CancellationToken;
40 : use tracing::warn;
41 : use utils::auth::{Scope, SwappableJwtAuth};
42 : use utils::id::{NodeId, TenantId, TimelineId};
43 : use uuid::Uuid;
44 :
45 : use crate::http;
46 : use crate::metrics::{
47 : HttpRequestLatencyLabelGroup, HttpRequestStatusLabelGroup, METRICS_REGISTRY,
48 : PageserverRequestLabelGroup,
49 : };
50 : use crate::persistence::SafekeeperUpsert;
51 : use crate::reconciler::ReconcileError;
52 : use crate::service::{
53 : LeadershipStatus, RECONCILE_TIMEOUT, STARTUP_RECONCILE_TIMEOUT, Service,
54 : TenantMutationLocations,
55 : };
56 :
57 : /// State available to HTTP request handlers
58 : pub struct HttpState {
59 : service: Arc<crate::service::Service>,
60 : auth: Option<Arc<SwappableJwtAuth>>,
61 : rate_limiter: governor::DefaultKeyedRateLimiter<TenantId>,
62 : neon_metrics: NeonMetrics,
63 : allowlist_routes: &'static [&'static str],
64 : }
65 :
66 : impl HttpState {
67 0 : pub fn new(
68 0 : service: Arc<crate::service::Service>,
69 0 : auth: Option<Arc<SwappableJwtAuth>>,
70 0 : build_info: BuildInfo,
71 0 : ) -> Self {
72 0 : let quota = governor::Quota::per_second(service.get_config().tenant_rate_limit);
73 0 : Self {
74 0 : service,
75 0 : auth,
76 0 : rate_limiter: governor::RateLimiter::keyed(quota),
77 0 : neon_metrics: NeonMetrics::new(build_info),
78 0 : allowlist_routes: &[
79 0 : "/status",
80 0 : "/live",
81 0 : "/ready",
82 0 : "/metrics",
83 0 : "/profile/cpu",
84 0 : "/profile/heap",
85 0 : ],
86 0 : }
87 0 : }
88 : }
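// A minimal sketch, with assumptions marked, of how `allowlist_routes` is meant to be
// consumed when the router is assembled: the listed probe/metrics/profiling endpoints are
// served without a JWT, and every other route is checked by `auth_middleware`. The exact
// wiring below is hypothetical; the real router construction lives later in this file.
//
//   let state = Arc::new(HttpState::new(service, auth, build_info));
//   let router = RouterBuilder::new().data(state.clone()).middleware(
//       auth_middleware(|req| {
//           let state = get_state(req);
//           if state.allowlist_routes.contains(&req.uri().path()) {
//               None // allowlisted: skip JWT validation
//           } else {
//               state.auth.as_deref()
//           }
//       }),
//   );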
89 :
90 : #[inline(always)]
91 0 : fn get_state(request: &Request<Body>) -> &HttpState {
92 0 : request
93 0 : .data::<Arc<HttpState>>()
94 0 : .expect("unknown state type")
95 0 : .as_ref()
96 0 : }
97 :
98 : /// Rate limits tenant requests.
99 : ///
100 : /// TODO: this should be a request middleware, but requires us to extract the tenant ID from
101 : /// different URLs in a systematic way.
102 : ///
103 : /// TODO: consider returning a 429 response if these start piling up.
104 0 : async fn maybe_rate_limit(request: &Request<Body>, tenant_id: TenantId) {
105 : // Check if the tenant should be rate-limited.
106 0 : let rate_limiter = &get_state(request).rate_limiter;
107 0 : if rate_limiter.check_key(&tenant_id).is_ok() {
108 0 : return;
109 0 : }
110 :
111 : // Measure the rate limiting delay.
112 0 : let _timer = METRICS_REGISTRY
113 0 : .metrics_group
114 0 : .storage_controller_http_request_rate_limited
115 0 : .start_timer();
116 :
117 : // Log rate limited tenants once every 10 seconds.
118 : static LOG_RATE_LIMITER: LazyLock<governor::DefaultKeyedRateLimiter<TenantId>> =
119 0 : LazyLock::new(|| {
120 0 : let quota = governor::Quota::with_period(Duration::from_secs(10)).unwrap();
121 0 : governor::RateLimiter::keyed(quota)
122 0 : });
123 :
124 0 : if LOG_RATE_LIMITER.check_key(&tenant_id).is_ok() {
125 0 : warn!("tenant {tenant_id} is rate limited")
126 0 : }
127 :
128 : // Wait for quota.
129 0 : rate_limiter.until_key_ready(&tenant_id).await;
130 0 : }
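// A minimal sketch of the composition above, assuming `tenant_rate_limit` is a NonZeroU32
// requests-per-second figure from the config: the request limiter applies backpressure by
// waiting for quota instead of rejecting (no 429 yet, per the TODO above), while
// LOG_RATE_LIMITER only gates the warning so a throttled tenant logs at most once per
// 10-second window.
//
//   use std::num::NonZeroU32;
//   let quota = governor::Quota::per_second(NonZeroU32::new(100).unwrap());
//   let limiter: governor::DefaultKeyedRateLimiter<TenantId> =
//       governor::RateLimiter::keyed(quota);
//   if limiter.check_key(&tenant_id).is_err() {
//       limiter.until_key_ready(&tenant_id).await; // wait for quota rather than erroring
//   }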
131 :
132 : /// Pageserver calls into this on startup, to learn which tenants it should attach
133 0 : async fn handle_re_attach(req: Request<Body>) -> Result<Response<Body>, ApiError> {
134 0 : check_permissions(&req, Scope::GenerationsApi)?;
135 :
136 0 : let mut req = match maybe_forward(req).await {
137 0 : ForwardOutcome::Forwarded(res) => {
138 0 : return res;
139 : }
140 0 : ForwardOutcome::NotForwarded(req) => req,
141 : };
142 :
143 0 : let reattach_req = json_request::<ReAttachRequest>(&mut req).await?;
144 0 : let state = get_state(&req);
145 0 : json_response(StatusCode::OK, state.service.re_attach(reattach_req).await?)
146 0 : }
147 :
148 : /// Pageserver calls into this before doing deletions, to confirm that it still
149 : /// holds the latest generation for the tenants with deletions enqueued
150 0 : async fn handle_validate(req: Request<Body>) -> Result<Response<Body>, ApiError> {
151 0 : check_permissions(&req, Scope::GenerationsApi)?;
152 :
153 0 : let mut req = match maybe_forward(req).await {
154 0 : ForwardOutcome::Forwarded(res) => {
155 0 : return res;
156 : }
157 0 : ForwardOutcome::NotForwarded(req) => req,
158 : };
159 :
160 0 : let validate_req = json_request::<ValidateRequest>(&mut req).await?;
161 0 : let state = get_state(&req);
162 0 : json_response(StatusCode::OK, state.service.validate(validate_req).await?)
163 0 : }
164 :
165 0 : async fn handle_get_timeline_import_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
166 0 : check_permissions(&req, Scope::GenerationsApi)?;
167 :
168 0 : let mut req = match maybe_forward(req).await {
169 0 : ForwardOutcome::Forwarded(res) => {
170 0 : return res;
171 : }
172 0 : ForwardOutcome::NotForwarded(req) => req,
173 : };
174 :
175 0 : let get_req = json_request::<TimelineImportStatusRequest>(&mut req).await?;
176 :
177 0 : let state = get_state(&req);
178 :
179 0 : json_response(
180 : StatusCode::OK,
181 0 : state
182 0 : .service
183 0 : .handle_timeline_shard_import_progress(get_req)
184 0 : .await?,
185 : )
186 0 : }
187 :
188 0 : async fn handle_put_timeline_import_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
189 0 : check_permissions(&req, Scope::GenerationsApi)?;
190 :
191 0 : let mut req = match maybe_forward(req).await {
192 0 : ForwardOutcome::Forwarded(res) => {
193 0 : return res;
194 : }
195 0 : ForwardOutcome::NotForwarded(req) => req,
196 : };
197 :
198 0 : let put_req = json_request::<PutTimelineImportStatusRequest>(&mut req).await?;
199 :
200 0 : let state = get_state(&req);
201 0 : json_response(
202 : StatusCode::OK,
203 0 : state
204 0 : .service
205 0 : .handle_timeline_shard_import_progress_upcall(put_req)
206 0 : .await?,
207 : )
208 0 : }
209 :
210 : /// Call into this before attaching a tenant to a pageserver, to acquire a generation number
211 : /// (in the real control plane this is unnecessary, because the same program is managing
212 : /// generation numbers and doing attachments).
213 0 : async fn handle_attach_hook(req: Request<Body>) -> Result<Response<Body>, ApiError> {
214 0 : check_permissions(&req, Scope::Admin)?;
215 :
216 0 : let mut req = match maybe_forward(req).await {
217 0 : ForwardOutcome::Forwarded(res) => {
218 0 : return res;
219 : }
220 0 : ForwardOutcome::NotForwarded(req) => req,
221 : };
222 :
223 0 : let attach_req = json_request::<AttachHookRequest>(&mut req).await?;
224 0 : let state = get_state(&req);
225 :
226 0 : json_response(
227 : StatusCode::OK,
228 0 : state
229 0 : .service
230 0 : .attach_hook(attach_req)
231 0 : .await
232 0 : .map_err(ApiError::InternalServerError)?,
233 : )
234 0 : }
235 :
236 0 : async fn handle_inspect(req: Request<Body>) -> Result<Response<Body>, ApiError> {
237 0 : check_permissions(&req, Scope::Admin)?;
238 :
239 0 : let mut req = match maybe_forward(req).await {
240 0 : ForwardOutcome::Forwarded(res) => {
241 0 : return res;
242 : }
243 0 : ForwardOutcome::NotForwarded(req) => req,
244 : };
245 :
246 0 : let inspect_req = json_request::<InspectRequest>(&mut req).await?;
247 :
248 0 : let state = get_state(&req);
249 :
250 0 : json_response(StatusCode::OK, state.service.inspect(inspect_req))
251 0 : }
252 :
253 0 : async fn handle_tenant_create(
254 0 : service: Arc<Service>,
255 0 : req: Request<Body>,
256 0 : ) -> Result<Response<Body>, ApiError> {
257 0 : check_permissions(&req, Scope::PageServerApi)?;
258 :
259 0 : let mut req = match maybe_forward(req).await {
260 0 : ForwardOutcome::Forwarded(res) => {
261 0 : return res;
262 : }
263 0 : ForwardOutcome::NotForwarded(req) => req,
264 : };
265 :
266 0 : let create_req = json_request::<TenantCreateRequest>(&mut req).await?;
267 :
268 0 : json_response(
269 : StatusCode::CREATED,
270 0 : service.tenant_create(create_req).await?,
271 : )
272 0 : }
273 :
274 0 : async fn handle_tenant_location_config(
275 0 : service: Arc<Service>,
276 0 : req: Request<Body>,
277 0 : ) -> Result<Response<Body>, ApiError> {
278 0 : let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
279 0 : check_permissions(&req, Scope::PageServerApi)?;
280 :
281 0 : let mut req = match maybe_forward(req).await {
282 0 : ForwardOutcome::Forwarded(res) => {
283 0 : return res;
284 : }
285 0 : ForwardOutcome::NotForwarded(req) => req,
286 : };
287 :
288 0 : let config_req = json_request::<TenantLocationConfigRequest>(&mut req).await?;
289 0 : json_response(
290 : StatusCode::OK,
291 0 : service
292 0 : .tenant_location_config(tenant_shard_id, config_req)
293 0 : .await?,
294 : )
295 0 : }
296 :
297 0 : async fn handle_tenant_config_patch(
298 0 : service: Arc<Service>,
299 0 : req: Request<Body>,
300 0 : ) -> Result<Response<Body>, ApiError> {
301 0 : check_permissions(&req, Scope::PageServerApi)?;
302 :
303 0 : let mut req = match maybe_forward(req).await {
304 0 : ForwardOutcome::Forwarded(res) => {
305 0 : return res;
306 : }
307 0 : ForwardOutcome::NotForwarded(req) => req,
308 : };
309 :
310 0 : let config_req = json_request::<TenantConfigPatchRequest>(&mut req).await?;
311 :
312 0 : json_response(
313 : StatusCode::OK,
314 0 : service.tenant_config_patch(config_req).await?,
315 : )
316 0 : }
317 :
318 0 : async fn handle_tenant_config_set(
319 0 : service: Arc<Service>,
320 0 : req: Request<Body>,
321 0 : ) -> Result<Response<Body>, ApiError> {
322 0 : check_permissions(&req, Scope::PageServerApi)?;
323 :
324 0 : let mut req = match maybe_forward(req).await {
325 0 : ForwardOutcome::Forwarded(res) => {
326 0 : return res;
327 : }
328 0 : ForwardOutcome::NotForwarded(req) => req,
329 : };
330 :
331 0 : let config_req = json_request::<TenantConfigRequest>(&mut req).await?;
332 :
333 0 : json_response(StatusCode::OK, service.tenant_config_set(config_req).await?)
334 0 : }
335 :
336 0 : async fn handle_tenant_config_get(
337 0 : service: Arc<Service>,
338 0 : req: Request<Body>,
339 0 : ) -> Result<Response<Body>, ApiError> {
340 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
341 0 : check_permissions(&req, Scope::PageServerApi)?;
342 0 : maybe_rate_limit(&req, tenant_id).await;
343 :
344 0 : match maybe_forward(req).await {
345 0 : ForwardOutcome::Forwarded(res) => {
346 0 : return res;
347 : }
348 0 : ForwardOutcome::NotForwarded(_req) => {}
349 : };
350 :
351 0 : json_response(StatusCode::OK, service.tenant_config_get(tenant_id)?)
352 0 : }
353 :
354 0 : async fn handle_tenant_time_travel_remote_storage(
355 0 : service: Arc<Service>,
356 0 : req: Request<Body>,
357 0 : ) -> Result<Response<Body>, ApiError> {
358 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
359 0 : check_permissions(&req, Scope::PageServerApi)?;
360 0 : maybe_rate_limit(&req, tenant_id).await;
361 :
362 0 : let mut req = match maybe_forward(req).await {
363 0 : ForwardOutcome::Forwarded(res) => {
364 0 : return res;
365 : }
366 0 : ForwardOutcome::NotForwarded(req) => req,
367 : };
368 :
369 0 : let time_travel_req = json_request::<TenantTimeTravelRequest>(&mut req).await?;
370 :
371 0 : let timestamp_raw = must_get_query_param(&req, "travel_to")?;
372 0 : let _timestamp = humantime::parse_rfc3339(&timestamp_raw).map_err(|_e| {
373 0 : ApiError::BadRequest(anyhow::anyhow!(
374 0 : "Invalid time for travel_to: {timestamp_raw:?}"
375 0 : ))
376 0 : })?;
377 :
378 0 : let done_if_after_raw = must_get_query_param(&req, "done_if_after")?;
379 0 : let _done_if_after = humantime::parse_rfc3339(&done_if_after_raw).map_err(|_e| {
380 0 : ApiError::BadRequest(anyhow::anyhow!(
381 0 : "Invalid time for done_if_after: {done_if_after_raw:?}"
382 0 : ))
383 0 : })?;
384 :
385 0 : service
386 0 : .tenant_time_travel_remote_storage(
387 0 : &time_travel_req,
388 0 : tenant_id,
389 0 : timestamp_raw,
390 0 : done_if_after_raw,
391 0 : )
392 0 : .await?;
393 0 : json_response(StatusCode::OK, ())
394 0 : }
395 :
396 0 : fn map_reqwest_hyper_status(status: reqwest::StatusCode) -> Result<hyper::StatusCode, ApiError> {
397 0 : hyper::StatusCode::from_u16(status.as_u16())
398 0 : .context("invalid status code")
399 0 : .map_err(ApiError::InternalServerError)
400 0 : }
401 :
402 0 : async fn handle_tenant_secondary_download(
403 0 : service: Arc<Service>,
404 0 : req: Request<Body>,
405 0 : ) -> Result<Response<Body>, ApiError> {
406 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
407 0 : let wait = parse_query_param(&req, "wait_ms")?.map(Duration::from_millis);
408 0 : maybe_rate_limit(&req, tenant_id).await;
409 :
410 0 : match maybe_forward(req).await {
411 0 : ForwardOutcome::Forwarded(res) => {
412 0 : return res;
413 : }
414 0 : ForwardOutcome::NotForwarded(_req) => {}
415 : };
416 :
417 0 : let (status, progress) = service.tenant_secondary_download(tenant_id, wait).await?;
418 0 : json_response(map_reqwest_hyper_status(status)?, progress)
419 0 : }
420 :
421 0 : async fn handle_tenant_delete(
422 0 : service: Arc<Service>,
423 0 : req: Request<Body>,
424 0 : ) -> Result<Response<Body>, ApiError> {
425 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
426 0 : check_permissions(&req, Scope::PageServerApi)?;
427 0 : maybe_rate_limit(&req, tenant_id).await;
428 :
429 0 : match maybe_forward(req).await {
430 0 : ForwardOutcome::Forwarded(res) => {
431 0 : return res;
432 : }
433 0 : ForwardOutcome::NotForwarded(_req) => {}
434 : };
435 :
436 0 : let status_code = service
437 0 : .tenant_delete(tenant_id)
438 0 : .await
439 0 : .and_then(map_reqwest_hyper_status)?;
440 :
441 0 : if status_code == StatusCode::NOT_FOUND {
442 : // The pageserver uses 404 for successful deletion, but we use 200
443 0 : json_response(StatusCode::OK, ())
444 : } else {
445 0 : json_response(status_code, ())
446 : }
447 0 : }
448 :
449 0 : async fn handle_tenant_timeline_create(
450 0 : service: Arc<Service>,
451 0 : req: Request<Body>,
452 0 : ) -> Result<Response<Body>, ApiError> {
453 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
454 0 : check_permissions(&req, Scope::PageServerApi)?;
455 0 : maybe_rate_limit(&req, tenant_id).await;
456 :
457 0 : let mut req = match maybe_forward(req).await {
458 0 : ForwardOutcome::Forwarded(res) => {
459 0 : return res;
460 : }
461 0 : ForwardOutcome::NotForwarded(req) => req,
462 : };
463 :
464 0 : let create_req = json_request::<TimelineCreateRequest>(&mut req).await?;
465 0 : json_response(
466 : StatusCode::CREATED,
467 0 : service
468 0 : .tenant_timeline_create(tenant_id, create_req)
469 0 : .await?,
470 : )
471 0 : }
472 :
473 0 : async fn handle_tenant_timeline_delete(
474 0 : service: Arc<Service>,
475 0 : req: Request<Body>,
476 0 : ) -> Result<Response<Body>, ApiError> {
477 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
478 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
479 :
480 0 : check_permissions(&req, Scope::PageServerApi)?;
481 0 : maybe_rate_limit(&req, tenant_id).await;
482 :
483 0 : match maybe_forward(req).await {
484 0 : ForwardOutcome::Forwarded(res) => {
485 0 : return res;
486 : }
487 0 : ForwardOutcome::NotForwarded(_req) => {}
488 : };
489 :
490 0 : service
491 0 : .maybe_delete_timeline_import(tenant_id, timeline_id)
492 0 : .await?;
493 :
494 : // For timeline deletions, which implement an "initially return 202, then 404 once
495 : // we're done" semantic, we wrap with a retry loop to expose a simpler API upstream.
496 0 : async fn deletion_wrapper<R, F>(service: Arc<Service>, f: F) -> Result<Response<Body>, ApiError>
497 0 : where
498 0 : R: std::future::Future<Output = Result<StatusCode, ApiError>> + Send + 'static,
499 0 : F: Fn(Arc<Service>) -> R + Send + Sync + 'static,
500 0 : {
501 : // On subsequent retries, wait longer.
502 : // Enable callers with a 25 second request timeout to reliably get a response
503 : const MAX_WAIT: Duration = Duration::from_secs(25);
504 : const MAX_RETRY_PERIOD: Duration = Duration::from_secs(5);
505 :
506 0 : let started_at = Instant::now();
507 :
508 : // To keep deletion reasonably snappy for small tenants, initially check after 1 second whether
509 : // the deletion has completed.
510 0 : let mut retry_period = Duration::from_secs(1);
511 :
512 : loop {
513 0 : let status = f(service.clone()).await?;
514 0 : match status {
515 : StatusCode::ACCEPTED => {
516 0 : tracing::info!("Deletion accepted, waiting to try again...");
517 0 : tokio::time::sleep(retry_period).await;
518 0 : retry_period = MAX_RETRY_PERIOD;
519 : }
520 : StatusCode::CONFLICT => {
521 0 : tracing::info!("Deletion already in progress, waiting to try again...");
522 0 : tokio::time::sleep(retry_period).await;
523 : }
524 : StatusCode::NOT_FOUND => {
525 0 : tracing::info!("Deletion complete");
526 0 : return json_response(StatusCode::OK, ());
527 : }
528 : _ => {
529 0 : tracing::warn!("Unexpected status {status}");
530 0 : return json_response(status, ());
531 : }
532 : }
533 :
534 0 : let now = Instant::now();
535 0 : if now + retry_period > started_at + MAX_WAIT {
536 0 : tracing::info!("Deletion timed out waiting for 404");
537 : // REQUEST_TIMEOUT would be more appropriate, but CONFLICT is already part of
538 : // the pageserver's swagger definition for this endpoint, and has the same desired
539 : // effect of causing the control plane to retry later.
540 0 : return json_response(StatusCode::CONFLICT, ());
541 0 : }
542 : }
543 0 : }
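// Worked example of the schedule above, assuming every poll returns 202 ACCEPTED: polls run
// at roughly t=0s, 1s, 6s, 11s and 16s (first retry after 1s, then every MAX_RETRY_PERIOD of
// 5s). After the poll at t=16s the loop wakes at t=21s, sees now + retry_period (26s) exceed
// MAX_WAIT (25s), and returns 409 CONFLICT, so a caller with a 25-second request timeout
// still receives a response it can retry on.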
544 :
545 0 : deletion_wrapper(service, move |service| async move {
546 0 : service
547 0 : .tenant_timeline_delete(tenant_id, timeline_id)
548 0 : .await
549 0 : .and_then(map_reqwest_hyper_status)
550 0 : })
551 0 : .await
552 0 : }
553 :
554 0 : async fn handle_tenant_timeline_archival_config(
555 0 : service: Arc<Service>,
556 0 : req: Request<Body>,
557 0 : ) -> Result<Response<Body>, ApiError> {
558 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
559 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
560 :
561 0 : check_permissions(&req, Scope::PageServerApi)?;
562 0 : maybe_rate_limit(&req, tenant_id).await;
563 :
564 0 : let mut req = match maybe_forward(req).await {
565 0 : ForwardOutcome::Forwarded(res) => {
566 0 : return res;
567 : }
568 0 : ForwardOutcome::NotForwarded(req) => req,
569 : };
570 :
571 0 : let create_req = json_request::<TimelineArchivalConfigRequest>(&mut req).await?;
572 :
573 0 : service
574 0 : .tenant_timeline_archival_config(tenant_id, timeline_id, create_req)
575 0 : .await?;
576 :
577 0 : json_response(StatusCode::OK, ())
578 0 : }
579 :
580 0 : async fn handle_tenant_timeline_detach_ancestor(
581 0 : service: Arc<Service>,
582 0 : req: Request<Body>,
583 0 : ) -> Result<Response<Body>, ApiError> {
584 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
585 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
586 0 : let behavior: Option<DetachBehavior> = parse_query_param(&req, "detach_behavior")?;
587 :
588 0 : check_permissions(&req, Scope::PageServerApi)?;
589 0 : maybe_rate_limit(&req, tenant_id).await;
590 :
591 0 : match maybe_forward(req).await {
592 0 : ForwardOutcome::Forwarded(res) => {
593 0 : return res;
594 : }
595 0 : ForwardOutcome::NotForwarded(_req) => {}
596 : };
597 :
598 0 : let res = service
599 0 : .tenant_timeline_detach_ancestor(tenant_id, timeline_id, behavior)
600 0 : .await?;
601 :
602 0 : json_response(StatusCode::OK, res)
603 0 : }
604 :
605 0 : async fn handle_tenant_timeline_block_unblock_gc(
606 0 : service: Arc<Service>,
607 0 : req: Request<Body>,
608 0 : dir: BlockUnblock,
609 0 : ) -> Result<Response<Body>, ApiError> {
610 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
611 0 : check_permissions(&req, Scope::PageServerApi)?;
612 0 : maybe_rate_limit(&req, tenant_id).await;
613 :
614 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
615 :
616 0 : service
617 0 : .tenant_timeline_block_unblock_gc(tenant_id, timeline_id, dir)
618 0 : .await?;
619 :
620 0 : json_response(StatusCode::OK, ())
621 0 : }
622 :
623 0 : async fn handle_tenant_timeline_download_heatmap_layers(
624 0 : service: Arc<Service>,
625 0 : req: Request<Body>,
626 0 : ) -> Result<Response<Body>, ApiError> {
627 0 : let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
628 :
629 0 : check_permissions(&req, Scope::PageServerApi)?;
630 0 : maybe_rate_limit(&req, tenant_shard_id.tenant_id).await;
631 :
632 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
633 0 : let concurrency: Option<usize> = parse_query_param(&req, "concurrency")?;
634 0 : let recurse = parse_query_param(&req, "recurse")?.unwrap_or(false);
635 :
636 0 : service
637 0 : .tenant_timeline_download_heatmap_layers(tenant_shard_id, timeline_id, concurrency, recurse)
638 0 : .await?;
639 :
640 0 : json_response(StatusCode::OK, ())
641 0 : }
642 :
643 0 : async fn handle_tenant_timeline_safekeeper_migrate(
644 0 : service: Arc<Service>,
645 0 : req: Request<Body>,
646 0 : ) -> Result<Response<Body>, ApiError> {
647 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
648 0 : check_permissions(&req, Scope::PageServerApi)?;
649 0 : maybe_rate_limit(&req, tenant_id).await;
650 :
651 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
652 :
653 0 : let mut req = match maybe_forward(req).await {
654 0 : ForwardOutcome::Forwarded(res) => {
655 0 : return res;
656 : }
657 0 : ForwardOutcome::NotForwarded(req) => req,
658 : };
659 :
660 0 : let migrate_req = json_request::<TimelineSafekeeperMigrateRequest>(&mut req).await?;
661 :
662 0 : service
663 0 : .tenant_timeline_safekeeper_migrate(tenant_id, timeline_id, migrate_req)
664 0 : .await?;
665 :
666 0 : json_response(StatusCode::OK, ())
667 0 : }
668 :
669 0 : async fn handle_tenant_timeline_lsn_lease(
670 0 : service: Arc<Service>,
671 0 : req: Request<Body>,
672 0 : ) -> Result<Response<Body>, ApiError> {
673 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
674 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
675 :
676 0 : check_permissions(&req, Scope::PageServerApi)?;
677 0 : maybe_rate_limit(&req, tenant_id).await;
678 :
679 0 : let mut req = match maybe_forward(req).await {
680 0 : ForwardOutcome::Forwarded(res) => {
681 0 : return res;
682 : }
683 0 : ForwardOutcome::NotForwarded(req) => req,
684 : };
685 :
686 0 : let lsn_lease_request = json_request::<LsnLeaseRequest>(&mut req).await?;
687 :
688 0 : service
689 0 : .tenant_timeline_lsn_lease(tenant_id, timeline_id, lsn_lease_request.lsn)
690 0 : .await?;
691 :
692 0 : json_response(StatusCode::OK, ())
693 0 : }
694 :
695 : // Computes a metric label that includes the approximate path but excludes high-cardinality fields
696 : // like query parameters and tenant/timeline IDs. Since we are proxying to arbitrary paths, we don't
697 : // have routing templates to compare against, so we just filter out our well-known ID format with a regex.
698 3 : fn path_without_ids(path: &str) -> String {
699 : static ID_REGEX: std::sync::OnceLock<regex::Regex> = std::sync::OnceLock::new();
700 3 : ID_REGEX
701 3 : .get_or_init(|| regex::Regex::new(r"([0-9a-fA-F]{32}(-[0-9]{4})?|\?.*)").unwrap())
702 3 : .replace_all(path, "")
703 3 : .to_string()
704 3 : }
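// A hedged illustration of the regex above; this test module and its values are hypothetical.
// A 32-hex-character ID, its optional "-NNNN" shard suffix, and any query string are all
// stripped, leaving only the static path segments that feed the metric label.
#[cfg(test)]
mod path_without_ids_sketch {
    use super::path_without_ids;

    #[test]
    fn strips_ids_shard_suffixes_and_query_params() {
        assert_eq!(
            path_without_ids("/v1/tenant/1f2e3d4c5b6a79880f1e2d3c4b5a6978-0004/timeline?wait_ms=250"),
            "/v1/tenant//timeline"
        );
    }
}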
705 :
706 0 : async fn handle_tenant_timeline_passthrough(
707 0 : service: Arc<Service>,
708 0 : req: Request<Body>,
709 0 : ) -> Result<Response<Body>, ApiError> {
710 0 : let tenant_or_shard_id: TenantShardId = parse_request_param(&req, "tenant_id")?;
711 0 : check_permissions(&req, Scope::PageServerApi)?;
712 0 : maybe_rate_limit(&req, tenant_or_shard_id.tenant_id).await;
713 :
714 0 : let req = match maybe_forward(req).await {
715 0 : ForwardOutcome::Forwarded(res) => {
716 0 : return res;
717 : }
718 0 : ForwardOutcome::NotForwarded(req) => req,
719 : };
720 :
721 0 : let Some(path) = req.uri().path_and_query() else {
722 : // This should never happen: our request router only calls us if there is a path
723 0 : return Err(ApiError::BadRequest(anyhow::anyhow!("Missing path")));
724 : };
725 :
726 0 : let method = match *req.method() {
727 0 : hyper::Method::GET => reqwest::Method::GET,
728 0 : hyper::Method::POST => reqwest::Method::POST,
729 0 : hyper::Method::PUT => reqwest::Method::PUT,
730 0 : hyper::Method::DELETE => reqwest::Method::DELETE,
731 0 : hyper::Method::PATCH => reqwest::Method::PATCH,
732 0 : _ => return Err(ApiError::BadRequest(anyhow::anyhow!("Unsupported method"))),
733 : };
734 :
735 0 : tracing::info!(
736 0 : "Proxying request for tenant {} ({})",
737 : tenant_or_shard_id.tenant_id,
738 : path
739 : );
740 :
741 0 : let tenant_shard_id = if tenant_or_shard_id.is_unsharded() {
742 : // If the request contains only tenant ID, find the node that holds shard zero
743 0 : let (_, shard_id) = service
744 0 : .tenant_shard0_node(tenant_or_shard_id.tenant_id)
745 0 : .await?;
746 0 : shard_id
747 : } else {
748 0 : tenant_or_shard_id
749 : };
750 :
751 0 : let service_inner = service.clone();
752 :
753 0 : service.tenant_shard_remote_mutation(tenant_shard_id, |locations| async move {
754 0 : let TenantMutationLocations(locations) = locations;
755 0 : if locations.is_empty() {
756 0 : return Err(ApiError::NotFound(anyhow::anyhow!("Tenant {} not found", tenant_or_shard_id.tenant_id).into()));
757 0 : }
758 :
759 0 : let (tenant_or_shard_id, locations) = locations.into_iter().next().unwrap();
760 0 : let node = locations.latest.node;
761 :
762 : // Callers will always pass an unsharded tenant ID. Before proxying, we must
763 : // rewrite this to a shard-aware shard zero ID.
764 0 : let path = format!("{path}");
765 0 : let tenant_str = tenant_or_shard_id.tenant_id.to_string();
766 0 : let tenant_shard_str = format!("{tenant_shard_id}");
767 0 : let path = path.replace(&tenant_str, &tenant_shard_str);
768 :
769 0 : let latency = &METRICS_REGISTRY
770 0 : .metrics_group
771 0 : .storage_controller_passthrough_request_latency;
772 :
773 0 : let path_label = path_without_ids(&path)
774 0 : .split('/')
775 0 : .filter(|token| !token.is_empty())
776 0 : .collect::<Vec<_>>()
777 0 : .join("_");
778 0 : let labels = PageserverRequestLabelGroup {
779 0 : pageserver_id: &node.get_id().to_string(),
780 0 : path: &path_label,
781 0 : method: crate::metrics::Method::Get,
782 0 : };
783 :
784 0 : let _timer = latency.start_timer(labels.clone());
785 :
786 0 : let client = mgmt_api::Client::new(
787 0 : service_inner.get_http_client().clone(),
788 0 : node.base_url(),
789 0 : service_inner.get_config().pageserver_jwt_token.as_deref(),
790 : );
791 0 : let resp = client.op_raw(method, path).await.map_err(|e|
792 : // We return 503 here because if we can't successfully send a request to the pageserver,
793 : // either we aren't available or the pageserver is unavailable.
794 0 : ApiError::ResourceUnavailable(format!("Error sending pageserver API request to {node}: {e}").into()))?;
795 :
796 0 : if !resp.status().is_success() {
797 0 : let error_counter = &METRICS_REGISTRY
798 0 : .metrics_group
799 0 : .storage_controller_passthrough_request_error;
800 0 : error_counter.inc(labels);
801 0 : }
802 0 : let resp_status = resp.status();
803 :
804 : // We have a reqwest::Response, but would like an http::Response
805 0 : let mut builder = hyper::Response::builder().status(map_reqwest_hyper_status(resp_status)?);
806 0 : for (k, v) in resp.headers() {
807 0 : builder = builder.header(k.as_str(), v.as_bytes());
808 0 : }
809 0 : let resp_bytes = resp
810 0 : .bytes()
811 0 : .await
812 0 : .map_err(|e| ApiError::InternalServerError(e.into()))?;
813 : // Inspect 404 errors: at this point, we know that the tenant exists, but the pageserver we route
814 : // the request to might not yet be ready. Therefore, if it is a _tenant_ not found error, we can
815 : // convert it into a 503. TODO: we should make this part of the check in `tenant_shard_remote_mutation`.
816 : // However, `tenant_shard_remote_mutation` currently cannot inspect the HTTP error response body,
817 : // so we have to do it here instead.
818 0 : if resp_status == reqwest::StatusCode::NOT_FOUND {
819 0 : let resp_str = std::str::from_utf8(&resp_bytes)
820 0 : .map_err(|e| ApiError::InternalServerError(e.into()))?;
821 : // We only handle "tenant not found" errors; other 404s like timeline not found should
822 : // be forwarded as-is.
823 0 : if Service::is_tenant_not_found_error(resp_str, tenant_or_shard_id.tenant_id) {
824 : // Rather than retry here, send the client a 503 to prompt a retry: this matches
825 : // the pageserver's use of 503, and all clients calling this API should retry on 503.
826 0 : return Err(ApiError::ResourceUnavailable(
827 0 : format!(
828 0 : "Pageserver {node} returned tenant 404 due to ongoing migration, retry later"
829 0 : )
830 0 : .into(),
831 0 : ));
832 0 : }
833 0 : }
834 0 : let response = builder
835 0 : .body(Body::from(resp_bytes))
836 0 : .map_err(|e| ApiError::InternalServerError(e.into()))?;
837 0 : Ok(response)
838 0 : }).await?
839 0 : }
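// Illustration of the rewrite above, with hypothetical values: for an unsharded request to
// "/v1/tenant/<tenant_id>/timeline/<timeline_id>", `tenant_shard0_node` resolves shard zero
// (say shard 0 of a count-8 tenant), and the string replace turns the path into
// "/v1/tenant/<tenant_id>-0008/timeline/<timeline_id>" so the pageserver receives the
// shard-aware ID rather than the bare tenant ID the caller passed. The 404 handling above
// then distinguishes "tenant not found" (likely a migration in flight, surfaced as 503 so
// clients retry) from other 404s such as a missing timeline, which are forwarded as-is.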
840 :
841 0 : async fn handle_tenant_locate(
842 0 : service: Arc<Service>,
843 0 : req: Request<Body>,
844 0 : ) -> Result<Response<Body>, ApiError> {
845 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
846 :
847 0 : check_permissions(&req, Scope::Admin)?;
848 : // NB: don't rate limit: admin operation.
849 :
850 0 : match maybe_forward(req).await {
851 0 : ForwardOutcome::Forwarded(res) => {
852 0 : return res;
853 : }
854 0 : ForwardOutcome::NotForwarded(_req) => {}
855 : };
856 :
857 0 : json_response(StatusCode::OK, service.tenant_locate(tenant_id)?)
858 0 : }
859 :
860 0 : async fn handle_tenant_describe(
861 0 : service: Arc<Service>,
862 0 : req: Request<Body>,
863 0 : ) -> Result<Response<Body>, ApiError> {
864 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
865 0 : check_permissions(&req, Scope::Scrubber)?;
866 : // NB: don't rate limit: scrubber operation.
867 :
868 0 : match maybe_forward(req).await {
869 0 : ForwardOutcome::Forwarded(res) => {
870 0 : return res;
871 : }
872 0 : ForwardOutcome::NotForwarded(_req) => {}
873 : };
874 :
875 0 : json_response(StatusCode::OK, service.tenant_describe(tenant_id)?)
876 0 : }
877 :
878 : /* BEGIN_HADRON */
879 0 : async fn handle_tenant_timeline_describe(
880 0 : service: Arc<Service>,
881 0 : req: Request<Body>,
882 0 : ) -> Result<Response<Body>, ApiError> {
883 0 : check_permissions(&req, Scope::Scrubber)?;
884 :
885 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
886 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
887 0 : match maybe_forward(req).await {
888 0 : ForwardOutcome::Forwarded(res) => {
889 0 : return res;
890 : }
891 0 : ForwardOutcome::NotForwarded(_req) => {}
892 : };
893 :
894 0 : json_response(
895 : StatusCode::OK,
896 0 : service
897 0 : .tenant_timeline_describe(tenant_id, timeline_id)
898 0 : .await?,
899 : )
900 0 : }
901 : /* END_HADRON */
902 :
903 0 : async fn handle_tenant_list(
904 0 : service: Arc<Service>,
905 0 : req: Request<Body>,
906 0 : ) -> Result<Response<Body>, ApiError> {
907 0 : check_permissions(&req, Scope::Admin)?;
908 :
909 0 : let limit: Option<usize> = parse_query_param(&req, "limit")?;
910 0 : let start_after: Option<TenantId> = parse_query_param(&req, "start_after")?;
911 0 : tracing::info!("start_after: {:?}", start_after);
912 :
913 0 : match maybe_forward(req).await {
914 0 : ForwardOutcome::Forwarded(res) => {
915 0 : return res;
916 : }
917 0 : ForwardOutcome::NotForwarded(_req) => {}
918 : };
919 :
920 0 : json_response(StatusCode::OK, service.tenant_list(limit, start_after))
921 0 : }
922 :
923 0 : async fn handle_node_register(req: Request<Body>) -> Result<Response<Body>, ApiError> {
924 0 : check_permissions(&req, Scope::Infra)?;
925 :
926 0 : let mut req = match maybe_forward(req).await {
927 0 : ForwardOutcome::Forwarded(res) => {
928 0 : return res;
929 : }
930 0 : ForwardOutcome::NotForwarded(req) => req,
931 : };
932 :
933 0 : let register_req = json_request::<NodeRegisterRequest>(&mut req).await?;
934 0 : let state = get_state(&req);
935 0 : state.service.node_register(register_req).await?;
936 0 : json_response(StatusCode::OK, ())
937 0 : }
938 :
939 0 : async fn handle_node_list(req: Request<Body>) -> Result<Response<Body>, ApiError> {
940 0 : check_permissions(&req, Scope::Infra)?;
941 :
942 0 : let req = match maybe_forward(req).await {
943 0 : ForwardOutcome::Forwarded(res) => {
944 0 : return res;
945 : }
946 0 : ForwardOutcome::NotForwarded(req) => req,
947 : };
948 :
949 0 : let state = get_state(&req);
950 0 : let mut nodes = state.service.node_list().await?;
951 0 : nodes.sort_by_key(|n| n.get_id());
952 0 : let api_nodes = nodes.into_iter().map(|n| n.describe()).collect::<Vec<_>>();
953 :
954 0 : json_response(StatusCode::OK, api_nodes)
955 0 : }
956 :
957 0 : async fn handle_node_drop(req: Request<Body>) -> Result<Response<Body>, ApiError> {
958 0 : check_permissions(&req, Scope::Admin)?;
959 :
960 0 : let req = match maybe_forward(req).await {
961 0 : ForwardOutcome::Forwarded(res) => {
962 0 : return res;
963 : }
964 0 : ForwardOutcome::NotForwarded(req) => req,
965 : };
966 :
967 0 : let state = get_state(&req);
968 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
969 0 : json_response(StatusCode::OK, state.service.node_drop(node_id).await?)
970 0 : }
971 :
972 0 : async fn handle_node_delete_old(req: Request<Body>) -> Result<Response<Body>, ApiError> {
973 0 : check_permissions(&req, Scope::Admin)?;
974 :
975 0 : let req = match maybe_forward(req).await {
976 0 : ForwardOutcome::Forwarded(res) => {
977 0 : return res;
978 : }
979 0 : ForwardOutcome::NotForwarded(req) => req,
980 : };
981 :
982 0 : let state = get_state(&req);
983 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
984 0 : json_response(
985 : StatusCode::OK,
986 0 : state.service.node_delete_old(node_id).await?,
987 : )
988 0 : }
989 :
990 0 : async fn handle_tombstone_list(req: Request<Body>) -> Result<Response<Body>, ApiError> {
991 0 : check_permissions(&req, Scope::Admin)?;
992 :
993 0 : let req = match maybe_forward(req).await {
994 0 : ForwardOutcome::Forwarded(res) => {
995 0 : return res;
996 : }
997 0 : ForwardOutcome::NotForwarded(req) => req,
998 : };
999 :
1000 0 : let state = get_state(&req);
1001 0 : let mut nodes = state.service.tombstone_list().await?;
1002 0 : nodes.sort_by_key(|n| n.get_id());
1003 0 : let api_nodes = nodes.into_iter().map(|n| n.describe()).collect::<Vec<_>>();
1004 :
1005 0 : json_response(StatusCode::OK, api_nodes)
1006 0 : }
1007 :
1008 0 : async fn handle_tombstone_delete(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1009 0 : check_permissions(&req, Scope::Admin)?;
1010 :
1011 0 : let req = match maybe_forward(req).await {
1012 0 : ForwardOutcome::Forwarded(res) => {
1013 0 : return res;
1014 : }
1015 0 : ForwardOutcome::NotForwarded(req) => req,
1016 : };
1017 :
1018 0 : let state = get_state(&req);
1019 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
1020 0 : json_response(
1021 : StatusCode::OK,
1022 0 : state.service.tombstone_delete(node_id).await?,
1023 : )
1024 0 : }
1025 :
1026 0 : async fn handle_node_configure(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1027 0 : check_permissions(&req, Scope::Admin)?;
1028 :
1029 0 : let mut req = match maybe_forward(req).await {
1030 0 : ForwardOutcome::Forwarded(res) => {
1031 0 : return res;
1032 : }
1033 0 : ForwardOutcome::NotForwarded(req) => req,
1034 : };
1035 :
1036 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
1037 0 : let config_req = json_request::<NodeConfigureRequest>(&mut req).await?;
1038 0 : if node_id != config_req.node_id {
1039 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
1040 0 : "Path and body node_id differ"
1041 0 : )));
1042 0 : }
1043 0 : let state = get_state(&req);
1044 :
1045 0 : json_response(
1046 : StatusCode::OK,
1047 0 : state
1048 0 : .service
1049 0 : .external_node_configure(
1050 0 : config_req.node_id,
1051 0 : config_req.availability.map(NodeAvailability::from),
1052 0 : config_req.scheduling,
1053 0 : )
1054 0 : .await?,
1055 : )
1056 0 : }
1057 :
1058 0 : async fn handle_node_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1059 0 : check_permissions(&req, Scope::Infra)?;
1060 :
1061 0 : let req = match maybe_forward(req).await {
1062 0 : ForwardOutcome::Forwarded(res) => {
1063 0 : return res;
1064 : }
1065 0 : ForwardOutcome::NotForwarded(req) => req,
1066 : };
1067 :
1068 0 : let state = get_state(&req);
1069 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
1070 :
1071 0 : let node_status = state.service.get_node(node_id).await?.describe();
1072 :
1073 0 : json_response(StatusCode::OK, node_status)
1074 0 : }
1075 :
1076 0 : async fn handle_node_shards(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1077 0 : check_permissions(&req, Scope::Admin)?;
1078 :
1079 0 : let state = get_state(&req);
1080 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
1081 :
1082 0 : let node_status = state.service.get_node_shards(node_id).await?;
1083 :
1084 0 : json_response(StatusCode::OK, node_status)
1085 0 : }
1086 :
1087 0 : async fn handle_get_leader(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1088 0 : check_permissions(&req, Scope::Admin)?;
1089 :
1090 0 : let req = match maybe_forward(req).await {
1091 0 : ForwardOutcome::Forwarded(res) => {
1092 0 : return res;
1093 : }
1094 0 : ForwardOutcome::NotForwarded(req) => req,
1095 : };
1096 :
1097 0 : let state = get_state(&req);
1098 0 : let leader = state.service.get_leader().await.map_err(|err| {
1099 0 : ApiError::InternalServerError(anyhow::anyhow!(
1100 0 : "Failed to read leader from database: {err}"
1101 0 : ))
1102 0 : })?;
1103 :
1104 0 : json_response(StatusCode::OK, leader)
1105 0 : }
1106 :
1107 0 : async fn handle_node_delete(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1108 0 : check_permissions(&req, Scope::Admin)?;
1109 :
1110 0 : let req = match maybe_forward(req).await {
1111 0 : ForwardOutcome::Forwarded(res) => {
1112 0 : return res;
1113 : }
1114 0 : ForwardOutcome::NotForwarded(req) => req,
1115 : };
1116 :
1117 0 : let state = get_state(&req);
1118 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
1119 0 : let force: bool = parse_query_param(&req, "force")?.unwrap_or(false);
1120 0 : json_response(
1121 : StatusCode::OK,
1122 0 : state.service.start_node_delete(node_id, force).await?,
1123 : )
1124 0 : }
1125 :
1126 0 : async fn handle_cancel_node_delete(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1127 0 : check_permissions(&req, Scope::Infra)?;
1128 :
1129 0 : let req = match maybe_forward(req).await {
1130 0 : ForwardOutcome::Forwarded(res) => {
1131 0 : return res;
1132 : }
1133 0 : ForwardOutcome::NotForwarded(req) => req,
1134 : };
1135 :
1136 0 : let state = get_state(&req);
1137 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
1138 0 : json_response(
1139 : StatusCode::ACCEPTED,
1140 0 : state.service.cancel_node_delete(node_id).await?,
1141 : )
1142 0 : }
1143 :
1144 0 : async fn handle_node_drain(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1145 0 : check_permissions(&req, Scope::Infra)?;
1146 :
1147 0 : let req = match maybe_forward(req).await {
1148 0 : ForwardOutcome::Forwarded(res) => {
1149 0 : return res;
1150 : }
1151 0 : ForwardOutcome::NotForwarded(req) => req,
1152 : };
1153 :
1154 0 : let state = get_state(&req);
1155 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
1156 :
1157 0 : state.service.start_node_drain(node_id).await?;
1158 :
1159 0 : json_response(StatusCode::ACCEPTED, ())
1160 0 : }
1161 :
1162 0 : async fn handle_cancel_node_drain(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1163 0 : check_permissions(&req, Scope::Infra)?;
1164 :
1165 0 : let req = match maybe_forward(req).await {
1166 0 : ForwardOutcome::Forwarded(res) => {
1167 0 : return res;
1168 : }
1169 0 : ForwardOutcome::NotForwarded(req) => req,
1170 : };
1171 :
1172 0 : let state = get_state(&req);
1173 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
1174 :
1175 0 : state.service.cancel_node_drain(node_id).await?;
1176 :
1177 0 : json_response(StatusCode::ACCEPTED, ())
1178 0 : }
1179 :
1180 0 : async fn handle_node_fill(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1181 0 : check_permissions(&req, Scope::Infra)?;
1182 :
1183 0 : let req = match maybe_forward(req).await {
1184 0 : ForwardOutcome::Forwarded(res) => {
1185 0 : return res;
1186 : }
1187 0 : ForwardOutcome::NotForwarded(req) => req,
1188 : };
1189 :
1190 0 : let state = get_state(&req);
1191 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
1192 :
1193 0 : state.service.start_node_fill(node_id).await?;
1194 :
1195 0 : json_response(StatusCode::ACCEPTED, ())
1196 0 : }
1197 :
1198 0 : async fn handle_cancel_node_fill(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1199 0 : check_permissions(&req, Scope::Infra)?;
1200 :
1201 0 : let req = match maybe_forward(req).await {
1202 0 : ForwardOutcome::Forwarded(res) => {
1203 0 : return res;
1204 : }
1205 0 : ForwardOutcome::NotForwarded(req) => req,
1206 : };
1207 :
1208 0 : let state = get_state(&req);
1209 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
1210 :
1211 0 : state.service.cancel_node_fill(node_id).await?;
1212 :
1213 0 : json_response(StatusCode::ACCEPTED, ())
1214 0 : }
1215 :
1216 0 : async fn handle_safekeeper_list(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1217 0 : check_permissions(&req, Scope::Infra)?;
1218 :
1219 0 : let req = match maybe_forward(req).await {
1220 0 : ForwardOutcome::Forwarded(res) => {
1221 0 : return res;
1222 : }
1223 0 : ForwardOutcome::NotForwarded(req) => req,
1224 : };
1225 :
1226 0 : let state = get_state(&req);
1227 0 : let safekeepers = state.service.safekeepers_list().await?;
1228 0 : json_response(StatusCode::OK, safekeepers)
1229 0 : }
1230 :
1231 0 : async fn handle_metadata_health_update(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1232 0 : check_permissions(&req, Scope::Scrubber)?;
1233 :
1234 0 : let mut req = match maybe_forward(req).await {
1235 0 : ForwardOutcome::Forwarded(res) => {
1236 0 : return res;
1237 : }
1238 0 : ForwardOutcome::NotForwarded(req) => req,
1239 : };
1240 :
1241 0 : let update_req = json_request::<MetadataHealthUpdateRequest>(&mut req).await?;
1242 0 : let state = get_state(&req);
1243 :
1244 0 : state.service.metadata_health_update(update_req).await?;
1245 :
1246 0 : json_response(StatusCode::OK, MetadataHealthUpdateResponse {})
1247 0 : }
1248 :
1249 0 : async fn handle_metadata_health_list_unhealthy(
1250 0 : req: Request<Body>,
1251 0 : ) -> Result<Response<Body>, ApiError> {
1252 0 : check_permissions(&req, Scope::Admin)?;
1253 :
1254 0 : let req = match maybe_forward(req).await {
1255 0 : ForwardOutcome::Forwarded(res) => {
1256 0 : return res;
1257 : }
1258 0 : ForwardOutcome::NotForwarded(req) => req,
1259 : };
1260 :
1261 0 : let state = get_state(&req);
1262 0 : let unhealthy_tenant_shards = state.service.metadata_health_list_unhealthy().await?;
1263 :
1264 0 : json_response(
1265 : StatusCode::OK,
1266 0 : MetadataHealthListUnhealthyResponse {
1267 0 : unhealthy_tenant_shards,
1268 0 : },
1269 : )
1270 0 : }
1271 :
1272 0 : async fn handle_metadata_health_list_outdated(
1273 0 : req: Request<Body>,
1274 0 : ) -> Result<Response<Body>, ApiError> {
1275 0 : check_permissions(&req, Scope::Admin)?;
1276 :
1277 0 : let mut req = match maybe_forward(req).await {
1278 0 : ForwardOutcome::Forwarded(res) => {
1279 0 : return res;
1280 : }
1281 0 : ForwardOutcome::NotForwarded(req) => req,
1282 : };
1283 :
1284 0 : let list_outdated_req = json_request::<MetadataHealthListOutdatedRequest>(&mut req).await?;
1285 0 : let state = get_state(&req);
1286 0 : let health_records = state
1287 0 : .service
1288 0 : .metadata_health_list_outdated(list_outdated_req.not_scrubbed_for)
1289 0 : .await?;
1290 :
1291 0 : json_response(
1292 : StatusCode::OK,
1293 0 : MetadataHealthListOutdatedResponse { health_records },
1294 : )
1295 0 : }
1296 :
1297 0 : async fn handle_tenant_shard_split(
1298 0 : service: Arc<Service>,
1299 0 : req: Request<Body>,
1300 0 : ) -> Result<Response<Body>, ApiError> {
1301 0 : check_permissions(&req, Scope::Admin)?;
1302 : // NB: don't rate limit: admin operation.
1303 :
1304 0 : let mut req = match maybe_forward(req).await {
1305 0 : ForwardOutcome::Forwarded(res) => {
1306 0 : return res;
1307 : }
1308 0 : ForwardOutcome::NotForwarded(req) => req,
1309 : };
1310 :
1311 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
1312 0 : let split_req = json_request::<TenantShardSplitRequest>(&mut req).await?;
1313 :
1314 0 : json_response(
1315 : StatusCode::OK,
1316 0 : service.tenant_shard_split(tenant_id, split_req).await?,
1317 : )
1318 0 : }
1319 :
1320 0 : async fn handle_tenant_shard_migrate(
1321 0 : service: Arc<Service>,
1322 0 : req: Request<Body>,
1323 0 : ) -> Result<Response<Body>, ApiError> {
1324 0 : check_permissions(&req, Scope::Admin)?;
1325 : // NB: don't rate limit: admin operation.
1326 :
1327 0 : let mut req = match maybe_forward(req).await {
1328 0 : ForwardOutcome::Forwarded(res) => {
1329 0 : return res;
1330 : }
1331 0 : ForwardOutcome::NotForwarded(req) => req,
1332 : };
1333 :
1334 0 : let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
1335 0 : let migrate_req = json_request::<TenantShardMigrateRequest>(&mut req).await?;
1336 0 : json_response(
1337 : StatusCode::OK,
1338 0 : service
1339 0 : .tenant_shard_migrate(tenant_shard_id, migrate_req)
1340 0 : .await?,
1341 : )
1342 0 : }
1343 :
1344 0 : async fn handle_tenant_shard_migrate_secondary(
1345 0 : service: Arc<Service>,
1346 0 : req: Request<Body>,
1347 0 : ) -> Result<Response<Body>, ApiError> {
1348 0 : check_permissions(&req, Scope::Admin)?;
1349 : // NB: don't rate limit: admin operation.
1350 :
1351 0 : let mut req = match maybe_forward(req).await {
1352 0 : ForwardOutcome::Forwarded(res) => {
1353 0 : return res;
1354 : }
1355 0 : ForwardOutcome::NotForwarded(req) => req,
1356 : };
1357 :
1358 0 : let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
1359 0 : let migrate_req = json_request::<TenantShardMigrateRequest>(&mut req).await?;
1360 0 : json_response(
1361 : StatusCode::OK,
1362 0 : service
1363 0 : .tenant_shard_migrate_secondary(tenant_shard_id, migrate_req)
1364 0 : .await?,
1365 : )
1366 0 : }
1367 :
1368 0 : async fn handle_tenant_shard_cancel_reconcile(
1369 0 : service: Arc<Service>,
1370 0 : req: Request<Body>,
1371 0 : ) -> Result<Response<Body>, ApiError> {
1372 0 : check_permissions(&req, Scope::Admin)?;
1373 : // NB: don't rate limit: admin operation.
1374 :
1375 0 : let req = match maybe_forward(req).await {
1376 0 : ForwardOutcome::Forwarded(res) => {
1377 0 : return res;
1378 : }
1379 0 : ForwardOutcome::NotForwarded(req) => req,
1380 : };
1381 :
1382 0 : let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
1383 0 : json_response(
1384 : StatusCode::OK,
1385 0 : service
1386 0 : .tenant_shard_cancel_reconcile(tenant_shard_id)
1387 0 : .await?,
1388 : )
1389 0 : }
1390 :
1391 0 : async fn handle_tenant_update_policy(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1392 0 : check_permissions(&req, Scope::Admin)?;
1393 : // NB: don't rate limit: admin operation.
1394 :
1395 0 : let mut req = match maybe_forward(req).await {
1396 0 : ForwardOutcome::Forwarded(res) => {
1397 0 : return res;
1398 : }
1399 0 : ForwardOutcome::NotForwarded(req) => req,
1400 : };
1401 :
1402 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
1403 0 : let update_req = json_request::<TenantPolicyRequest>(&mut req).await?;
1404 0 : let state = get_state(&req);
1405 :
1406 0 : json_response(
1407 : StatusCode::OK,
1408 0 : state
1409 0 : .service
1410 0 : .tenant_update_policy(tenant_id, update_req)
1411 0 : .await?,
1412 : )
1413 0 : }
1414 :
1415 0 : async fn handle_update_preferred_azs(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1416 0 : check_permissions(&req, Scope::Admin)?;
1417 :
1418 0 : let mut req = match maybe_forward(req).await {
1419 0 : ForwardOutcome::Forwarded(res) => {
1420 0 : return res;
1421 : }
1422 0 : ForwardOutcome::NotForwarded(req) => req,
1423 : };
1424 :
1425 0 : let azs_req = json_request::<ShardsPreferredAzsRequest>(&mut req).await?;
1426 0 : let state = get_state(&req);
1427 :
1428 0 : json_response(
1429 : StatusCode::OK,
1430 0 : state.service.update_shards_preferred_azs(azs_req).await?,
1431 : )
1432 0 : }
1433 :
1434 0 : async fn handle_step_down(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1435 0 : check_permissions(&req, Scope::ControllerPeer)?;
1436 :
1437 0 : let req = match maybe_forward(req).await {
1438 0 : ForwardOutcome::Forwarded(res) => {
1439 0 : return res;
1440 : }
1441 0 : ForwardOutcome::NotForwarded(req) => req,
1442 : };
1443 :
1444 0 : let state = get_state(&req);
1445 0 : let result = state.service.step_down().await;
1446 :
1447 0 : json_response(StatusCode::OK, result)
1448 0 : }
1449 :
1450 0 : async fn handle_tenant_drop(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1451 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
1452 0 : check_permissions(&req, Scope::PageServerApi)?;
1453 0 : maybe_rate_limit(&req, tenant_id).await;
1454 :
1455 0 : let req = match maybe_forward(req).await {
1456 0 : ForwardOutcome::Forwarded(res) => {
1457 0 : return res;
1458 : }
1459 0 : ForwardOutcome::NotForwarded(req) => req,
1460 : };
1461 :
1462 0 : let state = get_state(&req);
1463 :
1464 0 : json_response(StatusCode::OK, state.service.tenant_drop(tenant_id).await?)
1465 0 : }
1466 :
1467 0 : async fn handle_tenant_import(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1468 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
1469 0 : check_permissions(&req, Scope::PageServerApi)?;
1470 0 : maybe_rate_limit(&req, tenant_id).await;
1471 :
1472 0 : let req = match maybe_forward(req).await {
1473 0 : ForwardOutcome::Forwarded(res) => {
1474 0 : return res;
1475 : }
1476 0 : ForwardOutcome::NotForwarded(req) => req,
1477 : };
1478 :
1479 0 : let state = get_state(&req);
1480 :
1481 0 : json_response(
1482 : StatusCode::OK,
1483 0 : state.service.tenant_import(tenant_id).await?,
1484 : )
1485 0 : }
1486 :
1487 0 : async fn handle_timeline_import(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1488 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
1489 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
1490 0 : check_permissions(&req, Scope::PageServerApi)?;
1491 0 : maybe_rate_limit(&req, tenant_id).await;
1492 :
1493 0 : let mut req = match maybe_forward(req).await {
1494 0 : ForwardOutcome::Forwarded(res) => {
1495 0 : return res;
1496 : }
1497 0 : ForwardOutcome::NotForwarded(req) => req,
1498 : };
1499 :
1500 0 : let import_req = json_request::<TimelineImportRequest>(&mut req).await?;
1501 :
1502 0 : let state = get_state(&req);
1503 :
1504 0 : if import_req.tenant_id != tenant_id || import_req.timeline_id != timeline_id {
1505 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
1506 0 : "tenant id or timeline id mismatch: url={tenant_id}/{timeline_id}, body={}/{}",
1507 0 : import_req.tenant_id,
1508 0 : import_req.timeline_id
1509 0 : )));
1510 0 : }
1511 :
1512 0 : json_response(
1513 : StatusCode::OK,
1514 0 : state.service.timeline_import(import_req).await?,
1515 : )
1516 0 : }
1517 :
1518 0 : async fn handle_tenant_timeline_locate(
1519 0 : service: Arc<Service>,
1520 0 : req: Request<Body>,
1521 0 : ) -> Result<Response<Body>, ApiError> {
1522 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
1523 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
1524 :
1525 0 : check_permissions(&req, Scope::Admin)?;
1526 0 : maybe_rate_limit(&req, tenant_id).await;
1527 :
1528 0 : match maybe_forward(req).await {
1529 0 : ForwardOutcome::Forwarded(res) => {
1530 0 : return res;
1531 : }
1532 0 : ForwardOutcome::NotForwarded(_req) => {}
1533 : };
1534 :
1535 0 : json_response(
1536 : StatusCode::OK,
1537 0 : service
1538 0 : .tenant_timeline_locate(tenant_id, timeline_id)
1539 0 : .await?,
1540 : )
1541 0 : }
1542 :
1543 0 : async fn handle_tenants_dump(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1544 0 : check_permissions(&req, Scope::Admin)?;
1545 :
1546 0 : let req = match maybe_forward(req).await {
1547 0 : ForwardOutcome::Forwarded(res) => {
1548 0 : return res;
1549 : }
1550 0 : ForwardOutcome::NotForwarded(req) => req,
1551 : };
1552 :
1553 0 : let state = get_state(&req);
1554 0 : state.service.tenants_dump()
1555 0 : }
1556 :
1557 0 : async fn handle_scheduler_dump(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1558 0 : check_permissions(&req, Scope::Admin)?;
1559 :
1560 0 : let req = match maybe_forward(req).await {
1561 0 : ForwardOutcome::Forwarded(res) => {
1562 0 : return res;
1563 : }
1564 0 : ForwardOutcome::NotForwarded(req) => req,
1565 : };
1566 :
1567 0 : let state = get_state(&req);
1568 0 : state.service.scheduler_dump()
1569 0 : }
1570 :
1571 0 : async fn handle_consistency_check(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1572 0 : check_permissions(&req, Scope::Admin)?;
1573 :
1574 0 : let req = match maybe_forward(req).await {
1575 0 : ForwardOutcome::Forwarded(res) => {
1576 0 : return res;
1577 : }
1578 0 : ForwardOutcome::NotForwarded(req) => req,
1579 : };
1580 :
1581 0 : let state = get_state(&req);
1582 :
1583 0 : json_response(StatusCode::OK, state.service.consistency_check().await?)
1584 0 : }
1585 :
1586 0 : async fn handle_reconcile_all(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1587 0 : check_permissions(&req, Scope::Admin)?;
1588 :
1589 0 : let req = match maybe_forward(req).await {
1590 0 : ForwardOutcome::Forwarded(res) => {
1591 0 : return res;
1592 : }
1593 0 : ForwardOutcome::NotForwarded(req) => req,
1594 : };
1595 :
1596 0 : let state = get_state(&req);
1597 :
1598 0 : json_response(StatusCode::OK, state.service.reconcile_all_now().await?)
1599 0 : }
1600 :
1601 : /// Status endpoint is just used for checking that our HTTP listener is up
1602 : ///
1603 : /// This serves as our k8s startup probe.
1604 0 : async fn handle_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1605 0 : match maybe_forward(req).await {
1606 0 : ForwardOutcome::Forwarded(res) => {
1607 0 : return res;
1608 : }
1609 0 : ForwardOutcome::NotForwarded(_req) => {}
1610 : };
1611 :
1612 0 : json_response(StatusCode::OK, ())
1613 0 : }
1614 :
1615 : /// Liveness endpoint indicates that this storage controller is in a state
1616 : /// where it can fulfill its responsibilities. Namely, startup has finished
1617 : /// and it is the current leader.
1618 : ///
1619 : /// This serves as our k8s liveness probe.
1620 0 : async fn handle_live(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1621 0 : let req = match maybe_forward(req).await {
1622 0 : ForwardOutcome::Forwarded(res) => {
1623 0 : return res;
1624 : }
1625 0 : ForwardOutcome::NotForwarded(req) => req,
1626 : };
1627 :
1628 0 : let state = get_state(&req);
1629 0 : let live = state.service.startup_complete.is_ready()
1630 0 : && state.service.get_leadership_status() == LeadershipStatus::Leader;
1631 :
1632 0 : if live {
1633 0 : json_response(StatusCode::OK, ())
1634 : } else {
1635 0 : json_response(StatusCode::SERVICE_UNAVAILABLE, ())
1636 : }
1637 0 : }
1638 :
1639 : /// Readiness endpoint indicates when we're done doing startup I/O (e.g. reconciling
1640 : /// with remote pageserver nodes). This is intended for use as a kubernetes readiness probe.
1641 0 : async fn handle_ready(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1642 0 : let req = match maybe_forward(req).await {
1643 0 : ForwardOutcome::Forwarded(res) => {
1644 0 : return res;
1645 : }
1646 0 : ForwardOutcome::NotForwarded(req) => req,
1647 : };
1648 :
1649 0 : let state = get_state(&req);
1650 0 : if state.service.startup_complete.is_ready() {
1651 0 : json_response(StatusCode::OK, ())
1652 : } else {
1653 0 : json_response(StatusCode::SERVICE_UNAVAILABLE, ())
1654 : }
1655 0 : }
1656 :
1657 : impl From<ReconcileError> for ApiError {
1658 0 : fn from(value: ReconcileError) -> Self {
1659 0 : ApiError::Conflict(format!("Reconciliation error: {value}"))
1660 0 : }
1661 : }
1662 :
1663 : /// Return the safekeeper record by instance id, or 404.
1664 : ///
1665 : /// Not used by anything except manual testing.
1666 0 : async fn handle_get_safekeeper(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1667 0 : check_permissions(&req, Scope::Infra)?;
1668 :
1669 0 : let id = parse_request_param::<i64>(&req, "id")?;
1670 :
1671 0 : let req = match maybe_forward(req).await {
1672 0 : ForwardOutcome::Forwarded(res) => {
1673 0 : return res;
1674 : }
1675 0 : ForwardOutcome::NotForwarded(req) => req,
1676 : };
1677 :
1678 0 : let state = get_state(&req);
1679 :
1680 0 : let res = state.service.get_safekeeper(id).await;
1681 :
1682 0 : match res {
1683 0 : Ok(b) => json_response(StatusCode::OK, b),
1684 : Err(crate::persistence::DatabaseError::Query(diesel::result::Error::NotFound)) => {
1685 0 : Err(ApiError::NotFound("unknown instance id".into()))
1686 : }
1687 0 : Err(other) => Err(other.into()),
1688 : }
1689 0 : }
1690 :
1691 : /// Used as part of deployment scripts.
1692 : ///
1693 : /// Assumes information is only relayed to the storage controller after first selecting a unique id in
1694 : /// the control plane database, which means we have an id field in both the request path and the payload.
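     : ///
     : /// Illustrative request shape (abridged; the full field set is defined by
     : /// [`SafekeeperUpsert`]): `POST /control/v1/safekeeper/42` with a JSON body whose
     : /// `id` field is also `42`.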
1695 0 : async fn handle_upsert_safekeeper(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
1696 0 : check_permissions(&req, Scope::Infra)?;
1697 :
1698 0 : let body = json_request::<SafekeeperUpsert>(&mut req).await?;
1699 0 : let id = parse_request_param::<i64>(&req, "id")?;
1700 :
1701 0 : if id != body.id {
1702 : // The id in the URL path must be repeated in the request body.
1703 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
1704 0 : "id mismatch: url={id:?}, body={:?}",
1705 0 : body.id
1706 0 : )));
1707 0 : }
1708 :
1709 0 : if id <= 0 {
1710 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
1711 0 : "id not allowed to be zero or negative: {id}"
1712 0 : )));
1713 0 : }
1714 :
1715 0 : let req = match maybe_forward(req).await {
1716 0 : ForwardOutcome::Forwarded(res) => {
1717 0 : return res;
1718 : }
1719 0 : ForwardOutcome::NotForwarded(req) => req,
1720 : };
1721 :
1722 0 : let state = get_state(&req);
1723 :
1724 0 : state.service.upsert_safekeeper(body).await?;
1725 :
1726 0 : Ok(Response::builder()
1727 0 : .status(StatusCode::NO_CONTENT)
1728 0 : .body(Body::empty())
1729 0 : .unwrap())
1730 0 : }
1731 :
1732 : /// Sets the scheduling policy of the specified safekeeper
1733 0 : async fn handle_safekeeper_scheduling_policy(
1734 0 : mut req: Request<Body>,
1735 0 : ) -> Result<Response<Body>, ApiError> {
1736 0 : check_permissions(&req, Scope::Admin)?;
1737 :
1738 0 : let body = json_request::<SafekeeperSchedulingPolicyRequest>(&mut req).await?;
1739 0 : let id = parse_request_param::<i64>(&req, "id")?;
1740 :
1741 0 : let req = match maybe_forward(req).await {
1742 0 : ForwardOutcome::Forwarded(res) => {
1743 0 : return res;
1744 : }
1745 0 : ForwardOutcome::NotForwarded(req) => req,
1746 : };
1747 :
1748 0 : let state = get_state(&req);
1749 :
1750 0 : state
1751 0 : .service
1752 0 : .set_safekeeper_scheduling_policy(id, body.scheduling_policy)
1753 0 : .await?;
1754 :
1755 0 : json_response(StatusCode::OK, ())
1756 0 : }
1757 :
1758 : /// Common wrapper for request handlers that call into Service and will operate on tenants: they must only
1759 : /// be allowed to run if Service has finished its initial reconciliation.
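     : ///
     : /// Illustrative usage, mirroring the router definitions below:
     : /// `tenant_service_handler(r, handle_tenant_locate, RequestName("debug_v1_tenant_locate"))`.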
1760 0 : async fn tenant_service_handler<R, H>(
1761 0 : request: Request<Body>,
1762 0 : handler: H,
1763 0 : request_name: RequestName,
1764 0 : ) -> R::Output
1765 0 : where
1766 0 : R: std::future::Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
1767 0 : H: FnOnce(Arc<Service>, Request<Body>) -> R + Send + Sync + 'static,
1768 0 : {
1769 0 : let state = get_state(&request);
1770 0 : let service = state.service.clone();
1771 :
1772 0 : let startup_complete = service.startup_complete.clone();
1773 0 : if tokio::time::timeout(STARTUP_RECONCILE_TIMEOUT, startup_complete.wait())
1774 0 : .await
1775 0 : .is_err()
1776 : {
1777 : // This shouldn't happen: it is the responsibility of [`Service::startup_reconcile`] to use appropriate
1778 : // timeouts around its remote calls, to bound its runtime.
1779 0 : return Err(ApiError::Timeout(
1780 0 : "Timed out waiting for service readiness".into(),
1781 0 : ));
1782 0 : }
1783 :
1784 0 : named_request_span(
1785 0 : request,
1786 0 : |request| async move { handler(service, request).await },
1787 0 : request_name,
1788 : )
1789 0 : .await
1790 0 : }
1791 :
1792 : /// Check that the required scope is held by the request's token. A request carrying
1793 : /// a token with 'admin' scope is always permitted.
1794 0 : fn check_permissions(request: &Request<Body>, required_scope: Scope) -> Result<(), ApiError> {
1795 0 : check_permission_with(request, |claims| {
1796 0 : match crate::auth::check_permission(claims, required_scope) {
1797 0 : Err(e) => match crate::auth::check_permission(claims, Scope::Admin) {
1798 0 : Ok(()) => Ok(()),
1799 0 : Err(_) => Err(e),
1800 : },
1801 0 : Ok(()) => Ok(()),
1802 : }
1803 0 : })
1804 0 : }
     :
1805 : /// Similar to `check_permissions()` above, but checks for TenantEndpoint scope specifically. Used by the compute spec-fetch API.
1806 : /// Access by Admin-scope tokens is also permitted.
1807 : /// TODO(william.huang): Merge with the previous function by refactoring `Scope` to make it carry the dependent arguments.
1808 : /// E.g., `Scope::TenantEndpoint(EndpointId)`, `Scope::Tenant(TenantId)`, etc.
1809 : #[allow(unused)]
1810 0 : fn check_endpoint_permission(request: &Request<Body>, endpoint_id: Uuid) -> Result<(), ApiError> {
1811 0 : check_permission_with(
1812 0 : request,
1813 0 : |claims| match crate::auth::check_endpoint_permission(claims, endpoint_id) {
1814 0 : Err(e) => match crate::auth::check_permission(claims, Scope::Admin) {
1815 0 : Ok(()) => Ok(()),
1816 0 : Err(_) => Err(e),
1817 : },
1818 0 : Ok(()) => Ok(()),
1819 0 : },
1820 : )
1821 0 : }
1822 :
1823 : #[derive(Clone, Debug)]
1824 : struct RequestMeta {
1825 : method: hyper::http::Method,
1826 : at: Instant,
1827 : }
1828 :
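     : /// Pre-request middleware: while this instance is a `Candidate`, only the health,
     : /// metrics and profiling routes are served; `Leader` and `SteppedDown` instances
     : /// accept all routes (stepped-down instances forward most of them, see
     : /// [`maybe_forward`]).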
1829 0 : pub fn prologue_leadership_status_check_middleware<
1830 0 : B: hyper::body::HttpBody + Send + Sync + 'static,
1831 0 : >() -> Middleware<B, ApiError> {
1832 0 : Middleware::pre(move |req| async move {
1833 0 : let state = get_state(&req);
1834 0 : let leadership_status = state.service.get_leadership_status();
1835 :
1836 : enum AllowedRoutes {
1837 : All,
1838 : Some(&'static [&'static str]),
1839 : }
1840 :
1841 0 : let allowed_routes = match leadership_status {
1842 0 : LeadershipStatus::Leader => AllowedRoutes::All,
1843 0 : LeadershipStatus::SteppedDown => AllowedRoutes::All,
1844 0 : LeadershipStatus::Candidate => AllowedRoutes::Some(&[
1845 0 : "/ready",
1846 0 : "/status",
1847 0 : "/metrics",
1848 0 : "/profile/cpu",
1849 0 : "/profile/heap",
1850 0 : ]),
1851 : };
1852 :
1853 0 : match allowed_routes {
1854 0 : AllowedRoutes::All => Ok(req),
1855 0 : AllowedRoutes::Some(allowed) if allowed.contains(&req.uri().path()) => Ok(req),
1856 : _ => {
1857 0 : tracing::info!(
1858 0 : "Request {} not allowed due to current leadership state",
1859 0 : req.uri()
1860 : );
1861 :
1862 0 : Err(ApiError::ResourceUnavailable(
1863 0 : format!("Current leadership status is {leadership_status}").into(),
1864 0 : ))
1865 : }
1866 : }
1867 0 : })
1868 0 : }
1869 :
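     : /// Pre-request middleware that stamps each request with its method and arrival
     : /// time, for consumption by [`epilogue_metrics_middleware`].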
1870 0 : fn prologue_metrics_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>()
1871 0 : -> Middleware<B, ApiError> {
1872 0 : Middleware::pre(move |req| async move {
1873 0 : let meta = RequestMeta {
1874 0 : method: req.method().clone(),
1875 0 : at: Instant::now(),
1876 0 : };
1877 :
1878 0 : req.set_context(meta);
1879 :
1880 0 : Ok(req)
1881 0 : })
1882 0 : }
1883 :
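     : /// Post-request middleware that emits per-request status and latency metrics for
     : /// requests tagged with a [`RequestName`].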
1884 0 : fn epilogue_metrics_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>()
1885 0 : -> Middleware<B, ApiError> {
1886 0 : Middleware::post_with_info(move |resp, req_info| async move {
1887 0 : let request_name = match req_info.context::<RequestName>() {
1888 0 : Some(name) => name,
1889 : None => {
1890 0 : return Ok(resp);
1891 : }
1892 : };
1893 :
1894 0 : if let Some(meta) = req_info.context::<RequestMeta>() {
1895 0 : let status = &crate::metrics::METRICS_REGISTRY
1896 0 : .metrics_group
1897 0 : .storage_controller_http_request_status;
1898 0 : let latency = &crate::metrics::METRICS_REGISTRY
1899 0 : .metrics_group
1900 0 : .storage_controller_http_request_latency;
1901 0 :
1902 0 : status.inc(HttpRequestStatusLabelGroup {
1903 0 : path: request_name.0,
1904 0 : method: meta.method.clone().into(),
1905 0 : status: crate::metrics::StatusCode(resp.status()),
1906 0 : });
1907 0 :
1908 0 : latency.observe(
1909 0 : HttpRequestLatencyLabelGroup {
1910 0 : path: request_name.0,
1911 0 : method: meta.method.into(),
1912 0 : },
1913 0 : meta.at.elapsed().as_secs_f64(),
1914 0 : );
1915 0 : }
1916 0 : Ok(resp)
1917 0 : })
1918 0 : }
1919 :
1920 0 : pub async fn measured_metrics_handler(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1921 : pub const TEXT_FORMAT: &str = "text/plain; version=0.0.4";
1922 :
1923 0 : let req = match maybe_forward(req).await {
1924 0 : ForwardOutcome::Forwarded(res) => {
1925 0 : return res;
1926 : }
1927 0 : ForwardOutcome::NotForwarded(req) => req,
1928 : };
1929 :
1930 0 : let state = get_state(&req);
1931 0 : let payload = crate::metrics::METRICS_REGISTRY.encode(&state.neon_metrics);
1932 0 : let response = Response::builder()
1933 0 : .status(200)
1934 0 : .header(CONTENT_TYPE, TEXT_FORMAT)
1935 0 : .body(payload.into())
1936 0 : .unwrap();
1937 :
1938 0 : Ok(response)
1939 0 : }
1940 :
1941 : #[derive(Clone)]
1942 : struct RequestName(&'static str);
1943 :
1944 0 : async fn named_request_span<R, H>(
1945 0 : request: Request<Body>,
1946 0 : handler: H,
1947 0 : name: RequestName,
1948 0 : ) -> R::Output
1949 0 : where
1950 0 : R: Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
1951 0 : H: FnOnce(Request<Body>) -> R + Send + Sync + 'static,
1952 0 : {
1953 0 : request.set_context(name);
1954 0 : request_span(request, handler).await
1955 0 : }
1956 :
1957 : enum ForwardOutcome {
1958 : Forwarded(Result<Response<Body>, ApiError>),
1959 : NotForwarded(Request<Body>),
1960 : }
1961 :
1962 : /// Potentially forward the request to the current storage controller leader.
1963 : /// More specifically we forward when:
1964 : /// 1. Request is not one of:
1965 : /// ["/control/v1/step_down", "/status", "/ready", "/metrics", "/profile/cpu", "/profile/heap"]
1966 : /// 2. Current instance is in [`LeadershipStatus::SteppedDown`] state
1967 : /// 3. There is a leader in the database to forward to
1968 : /// 4. Leader from step (3) is not the current instance
1969 : ///
1970 : /// Why forward?
1971 : /// It turns out that we can't rely on external orchestration to promptly route traffic to the
1972 : /// new leader, which induces downtime. Forwarding provides a safe way out.
1973 : ///
1974 : /// Why is it safe?
1975 : /// If a storcon instance is persisted in the database, then we know that it is the current leader.
1976 : /// There's one exception: time between handling step-down request and the new leader updating the
1977 : /// database.
1978 : ///
1979 : /// Let's treat the happy case first. The stepped down node does not produce any side effects,
1980 : /// since all request handling happens on the leader.
1981 : ///
1982 : /// As for the edge case, we are guaranteed to have at most two running instances.
1983 : /// Hence, if we are in the edge case scenario the leader persisted in the database is the
1984 : /// stepped down instance that received the request. Condition (4) above covers this scenario.
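     : ///
     : /// For illustration (a sketch, not an exhaustive decision table): a
     : /// `PUT /v1/tenant/:tenant_id/config` arriving at a stepped-down instance is
     : /// forwarded to the leader recorded in the database, while a `GET /status`
     : /// probe is always answered locally, whatever the leadership state.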
1985 0 : async fn maybe_forward(req: Request<Body>) -> ForwardOutcome {
1986 : const NOT_FOR_FORWARD: &[&str] = &[
1987 : "/control/v1/step_down",
1988 : "/status",
1989 : "/live",
1990 : "/ready",
1991 : "/metrics",
1992 : "/profile/cpu",
1993 : "/profile/heap",
1994 : ];
1995 :
1996 0 : let uri = req.uri();
1997 0 : let uri_for_forward = !NOT_FOR_FORWARD.contains(&uri.path());
1998 :
1999 : // Fast return before trying to take any Service locks, if we will never forward anyway
2000 0 : if !uri_for_forward {
2001 0 : return ForwardOutcome::NotForwarded(req);
2002 0 : }
2003 :
2004 0 : let state = get_state(&req);
2005 0 : let leadership_status = state.service.get_leadership_status();
2006 :
2007 0 : if leadership_status != LeadershipStatus::SteppedDown {
2008 0 : return ForwardOutcome::NotForwarded(req);
2009 0 : }
2010 :
2011 0 : let leader = state.service.get_leader().await;
2012 0 : let leader = {
2013 0 : match leader {
2014 0 : Ok(Some(leader)) => leader,
2015 : Ok(None) => {
2016 0 : return ForwardOutcome::Forwarded(Err(ApiError::ResourceUnavailable(
2017 0 : "No leader to forward to while in stepped down state".into(),
2018 0 : )));
2019 : }
2020 0 : Err(err) => {
2021 0 : return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(
2022 0 : anyhow::anyhow!(
2023 0 : "Failed to get leader for forwarding while in stepped down state: {err}"
2024 0 : ),
2025 0 : )));
2026 : }
2027 : }
2028 : };
2029 :
2030 0 : let cfg = state.service.get_config();
2031 0 : if let Some(ref self_addr) = cfg.address_for_peers {
2032 0 : let leader_addr = match Uri::from_str(leader.address.as_str()) {
2033 0 : Ok(uri) => uri,
2034 0 : Err(err) => {
2035 0 : return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(
2036 0 : anyhow::anyhow!(
2037 0 : "Failed to parse leader uri for forwarding while in stepped down state: {err}"
2038 0 : ),
2039 0 : )));
2040 : }
2041 : };
2042 :
2043 0 : if *self_addr == leader_addr {
2044 0 : return ForwardOutcome::Forwarded(Err(ApiError::ResourceUnavailable(
2045 0 : "Leader is stepped down instance".into(),
2046 0 : )));
2047 0 : }
2048 0 : }
2049 :
2050 0 : tracing::info!("Forwarding {} to leader at {}", uri, leader.address);
2051 :
2052 : // Use [`RECONCILE_TIMEOUT`] as the maximum time a request should block for, plus
2053 : // some leeway, as the timeout for proxied requests.
2054 : const PROXIED_REQUEST_TIMEOUT: Duration = Duration::from_secs(RECONCILE_TIMEOUT.as_secs() + 10);
2055 :
2056 0 : let client = state.service.get_http_client().clone();
2057 :
2058 0 : let request: reqwest::Request = match convert_request(
2059 0 : req,
2060 0 : &client,
2061 0 : leader.address,
2062 : PROXIED_REQUEST_TIMEOUT,
2063 : )
2064 0 : .await
2065 : {
2066 0 : Ok(r) => r,
2067 0 : Err(err) => {
2068 0 : return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(anyhow::anyhow!(
2069 0 : "Failed to convert request for forwarding while in stepped down state: {err}"
2070 0 : ))));
2071 : }
2072 : };
2073 :
2074 0 : let response = match client.execute(request).await {
2075 0 : Ok(r) => r,
2076 0 : Err(err) => {
2077 0 : return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(anyhow::anyhow!(
2078 0 : "Failed to forward while in stepped down state: {err}"
2079 0 : ))));
2080 : }
2081 : };
2082 :
2083 0 : ForwardOutcome::Forwarded(convert_response(response).await)
2084 0 : }
2085 :
2086 : /// Convert a [`reqwest::Response`] to a [`hyper::Response`] by passing through
2087 : /// a stable representation (string, bytes or integer)
2088 : ///
2089 : /// Ideally, we would not have to do this since both types use the http crate
2090 : /// under the hood. However, they use different versions of the crate and keeping
2091 : /// second order dependencies in sync is difficult.
2092 0 : async fn convert_response(resp: reqwest::Response) -> Result<hyper::Response<Body>, ApiError> {
2093 : use std::str::FromStr;
2094 :
2095 0 : let mut builder = hyper::Response::builder().status(resp.status().as_u16());
2096 0 : for (key, value) in resp.headers().into_iter() {
2097 0 : let key = hyper::header::HeaderName::from_str(key.as_str()).map_err(|err| {
2098 0 : ApiError::InternalServerError(anyhow::anyhow!("Response conversion failed: {err}"))
2099 0 : })?;
2100 :
2101 0 : let value = hyper::header::HeaderValue::from_bytes(value.as_bytes()).map_err(|err| {
2102 0 : ApiError::InternalServerError(anyhow::anyhow!("Response conversion failed: {err}"))
2103 0 : })?;
2104 :
2105 0 : builder = builder.header(key, value);
2106 : }
2107 :
2108 0 : let body = http::Body::wrap_stream(resp.bytes_stream());
2109 :
2110 0 : builder.body(body).map_err(|err| {
2111 0 : ApiError::InternalServerError(anyhow::anyhow!("Response conversion failed: {err}"))
2112 0 : })
2113 0 : }
2114 :
2115 : /// Convert a [`hyper::Request`] to a [`reqwest::Request`] by passing through
2116 : /// a stable representation (string, bytes or integer)
2117 : ///
2118 : /// See [`convert_response`] for why we are doing it this way.
2119 0 : async fn convert_request(
2120 0 : req: hyper::Request<Body>,
2121 0 : client: &reqwest::Client,
2122 0 : to_address: String,
2123 0 : timeout: Duration,
2124 0 : ) -> Result<reqwest::Request, ApiError> {
2125 : use std::str::FromStr;
2126 :
2127 0 : let (parts, body) = req.into_parts();
2128 0 : let method = reqwest::Method::from_str(parts.method.as_str()).map_err(|err| {
2129 0 : ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
2130 0 : })?;
2131 :
2132 0 : let path_and_query = parts.uri.path_and_query().ok_or_else(|| {
2133 0 : ApiError::InternalServerError(anyhow::anyhow!(
2134 0 : "Request conversion failed: no path and query"
2135 0 : ))
2136 0 : })?;
2137 :
2138 0 : let uri = reqwest::Url::from_str(
2139 0 : format!(
2140 0 : "{}{}",
2141 0 : to_address.trim_end_matches("/"),
2142 0 : path_and_query.as_str()
2143 0 : )
2144 0 : .as_str(),
2145 : )
2146 0 : .map_err(|err| {
2147 0 : ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
2148 0 : })?;
2149 :
2150 0 : let mut headers = reqwest::header::HeaderMap::new();
2151 0 : for (key, value) in parts.headers.into_iter() {
2152 0 : let key = match key {
2153 0 : Some(k) => k,
2154 : None => {
2155 0 : continue;
2156 : }
2157 : };
2158 :
2159 0 : let key = reqwest::header::HeaderName::from_str(key.as_str()).map_err(|err| {
2160 0 : ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
2161 0 : })?;
2162 :
2163 0 : let value = reqwest::header::HeaderValue::from_bytes(value.as_bytes()).map_err(|err| {
2164 0 : ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
2165 0 : })?;
2166 :
2167 0 : headers.insert(key, value);
2168 : }
2169 :
2170 0 : let body = hyper::body::to_bytes(body).await.map_err(|err| {
2171 0 : ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
2172 0 : })?;
2173 :
2174 0 : client
2175 0 : .request(method, uri)
2176 0 : .headers(headers)
2177 0 : .body(body)
2178 0 : .timeout(timeout)
2179 0 : .build()
2180 0 : .map_err(|err| {
2181 0 : ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
2182 0 : })
2183 0 : }
2184 :
2185 0 : pub fn make_router(
2186 0 : service: Arc<Service>,
2187 0 : auth: Option<Arc<SwappableJwtAuth>>,
2188 0 : build_info: BuildInfo,
2189 0 : ) -> RouterBuilder<hyper::Body, ApiError> {
2190 0 : let mut router = endpoint::make_router()
2191 0 : .middleware(prologue_leadership_status_check_middleware())
2192 0 : .middleware(prologue_metrics_middleware())
2193 0 : .middleware(epilogue_metrics_middleware());
2194 0 : if auth.is_some() {
2195 0 : router = router.middleware(auth_middleware(|request| {
2196 0 : let state = get_state(request);
2197 0 : if state.allowlist_routes.contains(&request.uri().path()) {
2198 0 : None
2199 : } else {
2200 0 : state.auth.as_deref()
2201 : }
2202 0 : }));
2203 0 : }
2204 :
2205 0 : router
2206 0 : .data(Arc::new(HttpState::new(service, auth, build_info)))
2207 : // Non-prefixed generic endpoints (status, metrics, profiling)
2208 0 : .get("/metrics", |r| {
2209 0 : named_request_span(r, measured_metrics_handler, RequestName("metrics"))
2210 0 : })
2211 0 : .get("/status", |r| {
2212 0 : named_request_span(r, handle_status, RequestName("status"))
2213 0 : })
2214 0 : .get("/live", |r| {
2215 0 : named_request_span(r, handle_live, RequestName("live"))
2216 0 : })
2217 0 : .get("/ready", |r| {
2218 0 : named_request_span(r, handle_ready, RequestName("ready"))
2219 0 : })
2220 0 : .get("/profile/cpu", |r| {
2221 0 : named_request_span(r, profile_cpu_handler, RequestName("profile_cpu"))
2222 0 : })
2223 0 : .get("/profile/heap", |r| {
2224 0 : named_request_span(r, profile_heap_handler, RequestName("profile_heap"))
2225 0 : })
2226 : // Upcalls for the pageserver: point the pageserver's `control_plane_api` config to this prefix
2227 0 : .post("/upcall/v1/re-attach", |r| {
2228 0 : named_request_span(r, handle_re_attach, RequestName("upcall_v1_reattach"))
2229 0 : })
2230 0 : .post("/upcall/v1/validate", |r| {
2231 0 : named_request_span(r, handle_validate, RequestName("upcall_v1_validate"))
2232 0 : })
2233 0 : .get("/upcall/v1/timeline_import_status", |r| {
2234 0 : named_request_span(
2235 0 : r,
2236 : handle_get_timeline_import_status,
2237 0 : RequestName("upcall_v1_timeline_import_status"),
2238 : )
2239 0 : })
2240 0 : .post("/upcall/v1/timeline_import_status", |r| {
2241 0 : named_request_span(
2242 0 : r,
2243 : handle_put_timeline_import_status,
2244 0 : RequestName("upcall_v1_timeline_import_status"),
2245 : )
2246 0 : })
2247 : // Test/dev/debug endpoints
2248 0 : .post("/debug/v1/attach-hook", |r| {
2249 0 : named_request_span(r, handle_attach_hook, RequestName("debug_v1_attach_hook"))
2250 0 : })
2251 0 : .post("/debug/v1/inspect", |r| {
2252 0 : named_request_span(r, handle_inspect, RequestName("debug_v1_inspect"))
2253 0 : })
2254 0 : .post("/debug/v1/tenant/:tenant_id/drop", |r| {
2255 0 : named_request_span(r, handle_tenant_drop, RequestName("debug_v1_tenant_drop"))
2256 0 : })
2257 0 : .post("/debug/v1/node/:node_id/drop", |r| {
2258 0 : named_request_span(r, handle_node_drop, RequestName("debug_v1_node_drop"))
2259 0 : })
2260 0 : .delete("/debug/v1/tombstone/:node_id", |r| {
2261 0 : named_request_span(
2262 0 : r,
2263 : handle_tombstone_delete,
2264 0 : RequestName("debug_v1_tombstone_delete"),
2265 : )
2266 0 : })
2267 0 : .get("/debug/v1/tombstone", |r| {
2268 0 : named_request_span(
2269 0 : r,
2270 : handle_tombstone_list,
2271 0 : RequestName("debug_v1_tombstone_list"),
2272 : )
2273 0 : })
2274 0 : .post("/debug/v1/tenant/:tenant_id/import", |r| {
2275 0 : named_request_span(
2276 0 : r,
2277 : handle_tenant_import,
2278 0 : RequestName("debug_v1_tenant_import"),
2279 : )
2280 0 : })
2281 0 : .get("/debug/v1/tenant", |r| {
2282 0 : named_request_span(r, handle_tenants_dump, RequestName("debug_v1_tenant"))
2283 0 : })
2284 0 : .get("/debug/v1/tenant/:tenant_id/locate", |r| {
2285 0 : tenant_service_handler(
2286 0 : r,
2287 : handle_tenant_locate,
2288 0 : RequestName("debug_v1_tenant_locate"),
2289 : )
2290 0 : })
2291 0 : .post(
2292 : "/debug/v1/tenant/:tenant_id/timeline/:timeline_id/import",
2293 0 : |r| {
2294 0 : named_request_span(
2295 0 : r,
2296 : handle_timeline_import,
2297 0 : RequestName("debug_v1_timeline_import"),
2298 : )
2299 0 : },
2300 : )
2301 0 : .get(
2302 : "/debug/v1/tenant/:tenant_id/timeline/:timeline_id/locate",
2303 0 : |r| {
2304 0 : tenant_service_handler(
2305 0 : r,
2306 : handle_tenant_timeline_locate,
2307 0 : RequestName("v1_tenant_timeline_locate"),
2308 : )
2309 0 : },
2310 : )
2311 0 : .get("/debug/v1/scheduler", |r| {
2312 0 : named_request_span(r, handle_scheduler_dump, RequestName("debug_v1_scheduler"))
2313 0 : })
2314 0 : .post("/debug/v1/consistency_check", |r| {
2315 0 : named_request_span(
2316 0 : r,
2317 : handle_consistency_check,
2318 0 : RequestName("debug_v1_consistency_check"),
2319 : )
2320 0 : })
2321 0 : .post("/debug/v1/reconcile_all", |r| {
2322 0 : request_span(r, handle_reconcile_all)
2323 0 : })
2324 0 : .put("/debug/v1/failpoints", |r| {
2325 0 : request_span(r, |r| failpoints_handler(r, CancellationToken::new()))
2326 0 : })
2327 : // Node operations
2328 0 : .post("/control/v1/node", |r| {
2329 0 : named_request_span(r, handle_node_register, RequestName("control_v1_node"))
2330 0 : })
2331 : // This endpoint is deprecated and will be removed in a future version.
2332 : // Use PUT /control/v1/node/:node_id/delete instead.
2333 0 : .delete("/control/v1/node/:node_id", |r| {
2334 0 : named_request_span(
2335 0 : r,
2336 : handle_node_delete_old,
2337 0 : RequestName("control_v1_node_delete"),
2338 : )
2339 0 : })
2340 0 : .get("/control/v1/node", |r| {
2341 0 : named_request_span(r, handle_node_list, RequestName("control_v1_node"))
2342 0 : })
2343 0 : .put("/control/v1/node/:node_id/config", |r| {
2344 0 : named_request_span(
2345 0 : r,
2346 : handle_node_configure,
2347 0 : RequestName("control_v1_node_config"),
2348 : )
2349 0 : })
2350 0 : .get("/control/v1/node/:node_id", |r| {
2351 0 : named_request_span(r, handle_node_status, RequestName("control_v1_node_status"))
2352 0 : })
2353 0 : .get("/control/v1/node/:node_id/shards", |r| {
2354 0 : named_request_span(
2355 0 : r,
2356 : handle_node_shards,
2357 0 : RequestName("control_v1_node_describe"),
2358 : )
2359 0 : })
2360 0 : .get("/control/v1/leader", |r| {
2361 0 : named_request_span(r, handle_get_leader, RequestName("control_v1_get_leader"))
2362 0 : })
2363 0 : .put("/control/v1/node/:node_id/delete", |r| {
2364 0 : named_request_span(
2365 0 : r,
2366 : handle_node_delete,
2367 0 : RequestName("control_v1_start_node_delete"),
2368 : )
2369 0 : })
2370 0 : .delete("/control/v1/node/:node_id/delete", |r| {
2371 0 : named_request_span(
2372 0 : r,
2373 : handle_cancel_node_delete,
2374 0 : RequestName("control_v1_cancel_node_delete"),
2375 : )
2376 0 : })
2377 0 : .put("/control/v1/node/:node_id/drain", |r| {
2378 0 : named_request_span(r, handle_node_drain, RequestName("control_v1_node_drain"))
2379 0 : })
2380 0 : .delete("/control/v1/node/:node_id/drain", |r| {
2381 0 : named_request_span(
2382 0 : r,
2383 : handle_cancel_node_drain,
2384 0 : RequestName("control_v1_cancel_node_drain"),
2385 : )
2386 0 : })
2387 0 : .put("/control/v1/node/:node_id/fill", |r| {
2388 0 : named_request_span(r, handle_node_fill, RequestName("control_v1_node_fill"))
2389 0 : })
2390 0 : .delete("/control/v1/node/:node_id/fill", |r| {
2391 0 : named_request_span(
2392 0 : r,
2393 : handle_cancel_node_fill,
2394 0 : RequestName("control_v1_cancel_node_fill"),
2395 : )
2396 0 : })
2397 : // Metadata health operations
2398 0 : .post("/control/v1/metadata_health/update", |r| {
2399 0 : named_request_span(
2400 0 : r,
2401 : handle_metadata_health_update,
2402 0 : RequestName("control_v1_metadata_health_update"),
2403 : )
2404 0 : })
2405 0 : .get("/control/v1/metadata_health/unhealthy", |r| {
2406 0 : named_request_span(
2407 0 : r,
2408 : handle_metadata_health_list_unhealthy,
2409 0 : RequestName("control_v1_metadata_health_list_unhealthy"),
2410 : )
2411 0 : })
2412 0 : .post("/control/v1/metadata_health/outdated", |r| {
2413 0 : named_request_span(
2414 0 : r,
2415 : handle_metadata_health_list_outdated,
2416 0 : RequestName("control_v1_metadata_health_list_outdated"),
2417 : )
2418 0 : })
2419 : // Safekeepers
2420 0 : .get("/control/v1/safekeeper", |r| {
2421 0 : named_request_span(
2422 0 : r,
2423 : handle_safekeeper_list,
2424 0 : RequestName("control_v1_safekeeper_list"),
2425 : )
2426 0 : })
2427 0 : .get("/control/v1/safekeeper/:id", |r| {
2428 0 : named_request_span(r, handle_get_safekeeper, RequestName("v1_safekeeper"))
2429 0 : })
2430 0 : .post("/control/v1/safekeeper/:id", |r| {
2431 : // id is in the body
2432 0 : named_request_span(
2433 0 : r,
2434 : handle_upsert_safekeeper,
2435 0 : RequestName("v1_safekeeper_post"),
2436 : )
2437 0 : })
2438 0 : .post("/control/v1/safekeeper/:id/scheduling_policy", |r| {
2439 0 : named_request_span(
2440 0 : r,
2441 : handle_safekeeper_scheduling_policy,
2442 0 : RequestName("v1_safekeeper_scheduling_policy"),
2443 : )
2444 0 : })
2445 : // Tenant Shard operations
2446 0 : .put("/control/v1/tenant/:tenant_shard_id/migrate", |r| {
2447 0 : tenant_service_handler(
2448 0 : r,
2449 : handle_tenant_shard_migrate,
2450 0 : RequestName("control_v1_tenant_migrate"),
2451 : )
2452 0 : })
2453 0 : .put(
2454 : "/control/v1/tenant/:tenant_shard_id/migrate_secondary",
2455 0 : |r| {
2456 0 : tenant_service_handler(
2457 0 : r,
2458 : handle_tenant_shard_migrate_secondary,
2459 0 : RequestName("control_v1_tenant_migrate_secondary"),
2460 : )
2461 0 : },
2462 : )
2463 0 : .put(
2464 : "/control/v1/tenant/:tenant_shard_id/cancel_reconcile",
2465 0 : |r| {
2466 0 : tenant_service_handler(
2467 0 : r,
2468 : handle_tenant_shard_cancel_reconcile,
2469 0 : RequestName("control_v1_tenant_cancel_reconcile"),
2470 : )
2471 0 : },
2472 : )
2473 0 : .put("/control/v1/tenant/:tenant_id/shard_split", |r| {
2474 0 : tenant_service_handler(
2475 0 : r,
2476 : handle_tenant_shard_split,
2477 0 : RequestName("control_v1_tenant_shard_split"),
2478 : )
2479 0 : })
2480 0 : .get("/control/v1/tenant/:tenant_id", |r| {
2481 0 : tenant_service_handler(
2482 0 : r,
2483 : handle_tenant_describe,
2484 0 : RequestName("control_v1_tenant_describe"),
2485 : )
2486 0 : })
2487 0 : .get("/control/v1/tenant", |r| {
2488 0 : tenant_service_handler(r, handle_tenant_list, RequestName("control_v1_tenant_list"))
2489 0 : })
2490 0 : .put("/control/v1/tenant/:tenant_id/policy", |r| {
2491 0 : named_request_span(
2492 0 : r,
2493 : handle_tenant_update_policy,
2494 0 : RequestName("control_v1_tenant_policy"),
2495 : )
2496 0 : })
2497 0 : .put("/control/v1/preferred_azs", |r| {
2498 0 : named_request_span(
2499 0 : r,
2500 : handle_update_preferred_azs,
2501 0 : RequestName("control_v1_preferred_azs"),
2502 : )
2503 0 : })
2504 0 : .put("/control/v1/step_down", |r| {
2505 0 : named_request_span(r, handle_step_down, RequestName("control_v1_step_down"))
2506 0 : })
2507 : // Tenant operations
2508 : // The ^/v1/ endpoints act as a "Virtual Pageserver", enabling shard-naive clients to call into
2509 : // this service to manage tenants that actually consist of many tenant shards, as if they are a single entity.
2510 0 : .post("/v1/tenant", |r| {
2511 0 : tenant_service_handler(r, handle_tenant_create, RequestName("v1_tenant"))
2512 0 : })
2513 0 : .delete("/v1/tenant/:tenant_id", |r| {
2514 0 : tenant_service_handler(r, handle_tenant_delete, RequestName("v1_tenant"))
2515 0 : })
2516 0 : .patch("/v1/tenant/config", |r| {
2517 0 : tenant_service_handler(
2518 0 : r,
2519 : handle_tenant_config_patch,
2520 0 : RequestName("v1_tenant_config"),
2521 : )
2522 0 : })
2523 0 : .put("/v1/tenant/config", |r| {
2524 0 : tenant_service_handler(r, handle_tenant_config_set, RequestName("v1_tenant_config"))
2525 0 : })
2526 0 : .get("/v1/tenant/:tenant_id/config", |r| {
2527 0 : tenant_service_handler(r, handle_tenant_config_get, RequestName("v1_tenant_config"))
2528 0 : })
2529 0 : .put("/v1/tenant/:tenant_shard_id/location_config", |r| {
2530 0 : tenant_service_handler(
2531 0 : r,
2532 : handle_tenant_location_config,
2533 0 : RequestName("v1_tenant_location_config"),
2534 : )
2535 0 : })
2536 0 : .put("/v1/tenant/:tenant_id/time_travel_remote_storage", |r| {
2537 0 : tenant_service_handler(
2538 0 : r,
2539 : handle_tenant_time_travel_remote_storage,
2540 0 : RequestName("v1_tenant_time_travel_remote_storage"),
2541 : )
2542 0 : })
2543 0 : .post("/v1/tenant/:tenant_id/secondary/download", |r| {
2544 0 : tenant_service_handler(
2545 0 : r,
2546 : handle_tenant_secondary_download,
2547 0 : RequestName("v1_tenant_secondary_download"),
2548 : )
2549 0 : })
2550 : // Timeline operations
2551 0 : .get("/control/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
2552 0 : tenant_service_handler(
2553 0 : r,
2554 : handle_tenant_timeline_describe,
2555 0 : RequestName("v1_tenant_timeline_describe"),
2556 : )
2557 0 : })
2558 0 : .delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
2559 0 : tenant_service_handler(
2560 0 : r,
2561 : handle_tenant_timeline_delete,
2562 0 : RequestName("v1_tenant_timeline"),
2563 : )
2564 0 : })
2565 0 : .post("/v1/tenant/:tenant_id/timeline", |r| {
2566 0 : tenant_service_handler(
2567 0 : r,
2568 : handle_tenant_timeline_create,
2569 0 : RequestName("v1_tenant_timeline"),
2570 : )
2571 0 : })
2572 0 : .put(
2573 : "/v1/tenant/:tenant_id/timeline/:timeline_id/archival_config",
2574 0 : |r| {
2575 0 : tenant_service_handler(
2576 0 : r,
2577 : handle_tenant_timeline_archival_config,
2578 0 : RequestName("v1_tenant_timeline_archival_config"),
2579 : )
2580 0 : },
2581 : )
2582 0 : .put(
2583 : "/v1/tenant/:tenant_id/timeline/:timeline_id/detach_ancestor",
2584 0 : |r| {
2585 0 : tenant_service_handler(
2586 0 : r,
2587 : handle_tenant_timeline_detach_ancestor,
2588 0 : RequestName("v1_tenant_timeline_detach_ancestor"),
2589 : )
2590 0 : },
2591 : )
2592 0 : .post(
2593 : "/v1/tenant/:tenant_id/timeline/:timeline_id/block_gc",
2594 0 : |r| {
2595 0 : tenant_service_handler(
2596 0 : r,
2597 0 : |s, r| handle_tenant_timeline_block_unblock_gc(s, r, BlockUnblock::Block),
2598 0 : RequestName("v1_tenant_timeline_block_unblock_gc"),
2599 : )
2600 0 : },
2601 : )
2602 0 : .post(
2603 : "/v1/tenant/:tenant_id/timeline/:timeline_id/unblock_gc",
2604 0 : |r| {
2605 0 : tenant_service_handler(
2606 0 : r,
2607 0 : |s, r| handle_tenant_timeline_block_unblock_gc(s, r, BlockUnblock::Unblock),
2608 0 : RequestName("v1_tenant_timeline_block_unblock_gc"),
2609 : )
2610 0 : },
2611 : )
2612 0 : .post(
2613 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/download_heatmap_layers",
2614 0 : |r| {
2615 0 : tenant_service_handler(
2616 0 : r,
2617 : handle_tenant_timeline_download_heatmap_layers,
2618 0 : RequestName("v1_tenant_timeline_download_heatmap_layers"),
2619 : )
2620 0 : },
2621 : )
2622 0 : .post(
2623 : "/v1/tenant/:tenant_id/timeline/:timeline_id/safekeeper_migrate",
2624 0 : |r| {
2625 0 : tenant_service_handler(
2626 0 : r,
2627 : handle_tenant_timeline_safekeeper_migrate,
2628 0 : RequestName("v1_tenant_timeline_safekeeper_migrate"),
2629 : )
2630 0 : },
2631 : )
2632 : // LSN lease passthrough to all shards
2633 0 : .post(
2634 : "/v1/tenant/:tenant_id/timeline/:timeline_id/lsn_lease",
2635 0 : |r| {
2636 0 : tenant_service_handler(
2637 0 : r,
2638 : handle_tenant_timeline_lsn_lease,
2639 0 : RequestName("v1_tenant_timeline_lsn_lease"),
2640 : )
2641 0 : },
2642 : )
2643 : // Tenant timeline mark_invisible passthrough to shard zero
2644 0 : .put(
2645 : "/v1/tenant/:tenant_id/timeline/:timeline_id/mark_invisible",
2646 0 : |r| {
2647 0 : tenant_service_handler(
2648 0 : r,
2649 : handle_tenant_timeline_passthrough,
2650 0 : RequestName("v1_tenant_timeline_mark_invisible_passthrough"),
2651 : )
2652 0 : },
2653 : )
2654 : // Tenant detail GET passthrough to shard zero:
2655 0 : .get("/v1/tenant/:tenant_id", |r| {
2656 0 : tenant_service_handler(
2657 0 : r,
2658 : handle_tenant_timeline_passthrough,
2659 0 : RequestName("v1_tenant_passthrough"),
2660 : )
2661 0 : })
2662 : // The `*` in the URL is a wildcard: any tenant/timeline GET APIs on the pageserver
2663 : // are implicitly exposed here. This must be last in the list to avoid
2664 : // taking precedence over other GET methods we might implement by hand.
2665 0 : .get("/v1/tenant/:tenant_id/*", |r| {
2666 0 : tenant_service_handler(
2667 0 : r,
2668 : handle_tenant_timeline_passthrough,
2669 0 : RequestName("v1_tenant_passthrough"),
2670 : )
2671 0 : })
2672 0 : }
2673 :
2674 : #[cfg(test)]
2675 : mod test {
2676 :
2677 : use super::path_without_ids;
2678 :
2679 : #[test]
2680 1 : fn test_path_without_ids() {
2681 1 : assert_eq!(
2682 1 : path_without_ids(
2683 1 : "/v1/tenant/1a2b3344556677881122334455667788/timeline/AA223344556677881122334455667788"
2684 : ),
2685 : "/v1/tenant//timeline/"
2686 : );
2687 1 : assert_eq!(
2688 1 : path_without_ids(
2689 1 : "/v1/tenant/1a2b3344556677881122334455667788-0108/timeline/AA223344556677881122334455667788"
2690 : ),
2691 : "/v1/tenant//timeline/"
2692 : );
2693 1 : assert_eq!(
2694 1 : path_without_ids(
2695 1 : "/v1/tenant/1a2b3344556677881122334455667788-0108/timeline/AA223344556677881122334455667788?parameter=foo"
2696 : ),
2697 : "/v1/tenant//timeline/"
2698 : );
2699 1 : }
2700 : }