Line data Source code
1 : use std::str::FromStr;
2 : use std::sync::{Arc, LazyLock};
3 : use std::time::{Duration, Instant};
4 :
5 : use anyhow::Context;
6 : use control_plane::storage_controller::{AttachHookRequest, InspectRequest};
7 : use futures::Future;
8 : use http_utils::endpoint::{
9 : self, auth_middleware, check_permission_with, profile_cpu_handler, profile_heap_handler,
10 : request_span,
11 : };
12 : use http_utils::error::ApiError;
13 : use http_utils::failpoints::failpoints_handler;
14 : use http_utils::json::{json_request, json_response};
15 : use http_utils::request::{must_get_query_param, parse_query_param, parse_request_param};
16 : use http_utils::{RequestExt, RouterBuilder};
17 : use hyper::header::CONTENT_TYPE;
18 : use hyper::{Body, Request, Response, StatusCode, Uri};
19 : use metrics::{BuildInfo, NeonMetrics};
20 : use pageserver_api::controller_api::{
21 : MetadataHealthListOutdatedRequest, MetadataHealthListOutdatedResponse,
22 : MetadataHealthListUnhealthyResponse, MetadataHealthUpdateRequest, MetadataHealthUpdateResponse,
23 : NodeAvailability, NodeConfigureRequest, NodeRegisterRequest, SafekeeperSchedulingPolicyRequest,
24 : ShardsPreferredAzsRequest, TenantCreateRequest, TenantPolicyRequest, TenantShardMigrateRequest,
25 : TimelineImportRequest, TimelineSafekeeperMigrateRequest,
26 : };
27 : use pageserver_api::models::{
28 : DetachBehavior, LsnLeaseRequest, TenantConfigPatchRequest, TenantConfigRequest,
29 : TenantLocationConfigRequest, TenantShardSplitRequest, TenantTimeTravelRequest,
30 : TimelineArchivalConfigRequest, TimelineCreateRequest,
31 : };
32 : use pageserver_api::shard::TenantShardId;
33 : use pageserver_api::upcall_api::{
34 : PutTimelineImportStatusRequest, ReAttachRequest, TimelineImportStatusRequest, ValidateRequest,
35 : };
36 : use pageserver_client::{BlockUnblock, mgmt_api};
37 :
38 : use routerify::Middleware;
39 : use tokio_util::sync::CancellationToken;
40 : use tracing::warn;
41 : use utils::auth::{Scope, SwappableJwtAuth};
42 : use utils::id::{NodeId, TenantId, TimelineId};
43 :
44 : use crate::http;
45 : use crate::metrics::{
46 : HttpRequestLatencyLabelGroup, HttpRequestStatusLabelGroup, METRICS_REGISTRY,
47 : PageserverRequestLabelGroup,
48 : };
49 : use crate::persistence::SafekeeperUpsert;
50 : use crate::reconciler::ReconcileError;
51 : use crate::service::{
52 : LeadershipStatus, RECONCILE_TIMEOUT, STARTUP_RECONCILE_TIMEOUT, Service,
53 : TenantMutationLocations,
54 : };
55 :
56 : /// State available to HTTP request handlers
57 : pub struct HttpState {
58 : service: Arc<crate::service::Service>,
59 : auth: Option<Arc<SwappableJwtAuth>>,
60 : rate_limiter: governor::DefaultKeyedRateLimiter<TenantId>,
61 : neon_metrics: NeonMetrics,
62 : allowlist_routes: &'static [&'static str],
63 : }
64 :
65 : impl HttpState {
66 0 : pub fn new(
67 0 : service: Arc<crate::service::Service>,
68 0 : auth: Option<Arc<SwappableJwtAuth>>,
69 0 : build_info: BuildInfo,
70 0 : ) -> Self {
71 0 : let quota = governor::Quota::per_second(service.get_config().tenant_rate_limit);
72 0 : Self {
73 0 : service,
74 0 : auth,
75 0 : rate_limiter: governor::RateLimiter::keyed(quota),
76 0 : neon_metrics: NeonMetrics::new(build_info),
77 0 : allowlist_routes: &[
78 0 : "/status",
79 0 : "/live",
80 0 : "/ready",
81 0 : "/metrics",
82 0 : "/profile/cpu",
83 0 : "/profile/heap",
84 0 : ],
85 0 : }
86 0 : }
87 : }
88 :
89 : #[inline(always)]
90 0 : fn get_state(request: &Request<Body>) -> &HttpState {
91 0 : request
92 0 : .data::<Arc<HttpState>>()
93 0 : .expect("unknown state type")
94 0 : .as_ref()
95 0 : }
96 :
97 : /// Rate limits tenant requests.
98 : ///
99 : /// TODO: this should be a request middleware, but requires us to extract the tenant ID from
100 : /// different URLs in a systematic way.
101 : ///
102 : /// TODO: consider returning a 429 response if these start piling up.
103 0 : async fn maybe_rate_limit(request: &Request<Body>, tenant_id: TenantId) {
104 : // Check if the tenant should be rate-limited.
105 0 : let rate_limiter = &get_state(request).rate_limiter;
106 0 : if rate_limiter.check_key(&tenant_id).is_ok() {
107 0 : return;
108 0 : }
109 :
110 : // Measure the rate limiting delay.
111 0 : let _timer = METRICS_REGISTRY
112 0 : .metrics_group
113 0 : .storage_controller_http_request_rate_limited
114 0 : .start_timer();
115 :
116 : // Log rate limited tenants once every 10 seconds.
117 : static LOG_RATE_LIMITER: LazyLock<governor::DefaultKeyedRateLimiter<TenantId>> =
118 0 : LazyLock::new(|| {
119 0 : let quota = governor::Quota::with_period(Duration::from_secs(10)).unwrap();
120 0 : governor::RateLimiter::keyed(quota)
121 0 : });
122 :
123 0 : if LOG_RATE_LIMITER.check_key(&tenant_id).is_ok() {
124 0 : warn!("tenant {tenant_id} is rate limited")
125 0 : }
126 :
127 : // Wait for quota.
128 0 : rate_limiter.until_key_ready(&tenant_id).await;
129 0 : }
130 :
131 : /// Pageserver calls into this on startup, to learn which tenants it should attach
132 0 : async fn handle_re_attach(req: Request<Body>) -> Result<Response<Body>, ApiError> {
133 0 : check_permissions(&req, Scope::GenerationsApi)?;
134 :
135 0 : let mut req = match maybe_forward(req).await {
136 0 : ForwardOutcome::Forwarded(res) => {
137 0 : return res;
138 : }
139 0 : ForwardOutcome::NotForwarded(req) => req,
140 : };
141 :
142 0 : let reattach_req = json_request::<ReAttachRequest>(&mut req).await?;
143 0 : let state = get_state(&req);
144 0 : json_response(StatusCode::OK, state.service.re_attach(reattach_req).await?)
145 0 : }
146 :
147 : /// Pageserver calls into this before doing deletions, to confirm that it still
148 : /// holds the latest generation for the tenants with deletions enqueued
149 0 : async fn handle_validate(req: Request<Body>) -> Result<Response<Body>, ApiError> {
150 0 : check_permissions(&req, Scope::GenerationsApi)?;
151 :
152 0 : let mut req = match maybe_forward(req).await {
153 0 : ForwardOutcome::Forwarded(res) => {
154 0 : return res;
155 : }
156 0 : ForwardOutcome::NotForwarded(req) => req,
157 : };
158 :
159 0 : let validate_req = json_request::<ValidateRequest>(&mut req).await?;
160 0 : let state = get_state(&req);
161 0 : json_response(StatusCode::OK, state.service.validate(validate_req).await?)
162 0 : }
163 :
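: /// Pageserver upcall: query the controller for the progress of an in-flight timeline shard import.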
164 0 : async fn handle_get_timeline_import_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
165 0 : check_permissions(&req, Scope::GenerationsApi)?;
166 :
167 0 : let mut req = match maybe_forward(req).await {
168 0 : ForwardOutcome::Forwarded(res) => {
169 0 : return res;
170 : }
171 0 : ForwardOutcome::NotForwarded(req) => req,
172 : };
173 :
174 0 : let get_req = json_request::<TimelineImportStatusRequest>(&mut req).await?;
175 :
176 0 : let state = get_state(&req);
177 :
178 0 : json_response(
179 : StatusCode::OK,
180 0 : state
181 0 : .service
182 0 : .handle_timeline_shard_import_progress(get_req)
183 0 : .await?,
184 : )
185 0 : }
186 :
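: /// Pageserver upcall: record the latest progress of a timeline shard import.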
187 0 : async fn handle_put_timeline_import_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
188 0 : check_permissions(&req, Scope::GenerationsApi)?;
189 :
190 0 : let mut req = match maybe_forward(req).await {
191 0 : ForwardOutcome::Forwarded(res) => {
192 0 : return res;
193 : }
194 0 : ForwardOutcome::NotForwarded(req) => req,
195 : };
196 :
197 0 : let put_req = json_request::<PutTimelineImportStatusRequest>(&mut req).await?;
198 :
199 0 : let state = get_state(&req);
200 0 : json_response(
201 : StatusCode::OK,
202 0 : state
203 0 : .service
204 0 : .handle_timeline_shard_import_progress_upcall(put_req)
205 0 : .await?,
206 : )
207 0 : }
208 :
209 : /// Call into this before attaching a tenant to a pageserver, to acquire a generation number
210 : /// (in the real control plane this is unnecessary, because the same program is managing
211 : /// generation numbers and doing attachments).
212 0 : async fn handle_attach_hook(req: Request<Body>) -> Result<Response<Body>, ApiError> {
213 0 : check_permissions(&req, Scope::Admin)?;
214 :
215 0 : let mut req = match maybe_forward(req).await {
216 0 : ForwardOutcome::Forwarded(res) => {
217 0 : return res;
218 : }
219 0 : ForwardOutcome::NotForwarded(req) => req,
220 : };
221 :
222 0 : let attach_req = json_request::<AttachHookRequest>(&mut req).await?;
223 0 : let state = get_state(&req);
224 :
225 0 : json_response(
226 : StatusCode::OK,
227 0 : state
228 0 : .service
229 0 : .attach_hook(attach_req)
230 0 : .await
231 0 : .map_err(ApiError::InternalServerError)?,
232 : )
233 0 : }
234 :
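: /// Test/debug companion to the attach hook: report the controller's current state for a tenant shard.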
235 0 : async fn handle_inspect(req: Request<Body>) -> Result<Response<Body>, ApiError> {
236 0 : check_permissions(&req, Scope::Admin)?;
237 :
238 0 : let mut req = match maybe_forward(req).await {
239 0 : ForwardOutcome::Forwarded(res) => {
240 0 : return res;
241 : }
242 0 : ForwardOutcome::NotForwarded(req) => req,
243 : };
244 :
245 0 : let inspect_req = json_request::<InspectRequest>(&mut req).await?;
246 :
247 0 : let state = get_state(&req);
248 :
249 0 : json_response(StatusCode::OK, state.service.inspect(inspect_req))
250 0 : }
251 :
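: /// Create a tenant and schedule its shards onto pageservers; responds 201 on success.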
252 0 : async fn handle_tenant_create(
253 0 : service: Arc<Service>,
254 0 : req: Request<Body>,
255 0 : ) -> Result<Response<Body>, ApiError> {
256 0 : check_permissions(&req, Scope::PageServerApi)?;
257 :
258 0 : let mut req = match maybe_forward(req).await {
259 0 : ForwardOutcome::Forwarded(res) => {
260 0 : return res;
261 : }
262 0 : ForwardOutcome::NotForwarded(req) => req,
263 : };
264 :
265 0 : let create_req = json_request::<TenantCreateRequest>(&mut req).await?;
266 :
267 0 : json_response(
268 : StatusCode::CREATED,
269 0 : service.tenant_create(create_req).await?,
270 : )
271 0 : }
272 :
273 0 : async fn handle_tenant_location_config(
274 0 : service: Arc<Service>,
275 0 : req: Request<Body>,
276 0 : ) -> Result<Response<Body>, ApiError> {
277 0 : let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
278 0 : check_permissions(&req, Scope::PageServerApi)?;
279 :
280 0 : let mut req = match maybe_forward(req).await {
281 0 : ForwardOutcome::Forwarded(res) => {
282 0 : return res;
283 : }
284 0 : ForwardOutcome::NotForwarded(req) => req,
285 : };
286 :
287 0 : let config_req = json_request::<TenantLocationConfigRequest>(&mut req).await?;
288 0 : json_response(
289 : StatusCode::OK,
290 0 : service
291 0 : .tenant_location_config(tenant_shard_id, config_req)
292 0 : .await?,
293 : )
294 0 : }
295 :
296 0 : async fn handle_tenant_config_patch(
297 0 : service: Arc<Service>,
298 0 : req: Request<Body>,
299 0 : ) -> Result<Response<Body>, ApiError> {
300 0 : check_permissions(&req, Scope::PageServerApi)?;
301 :
302 0 : let mut req = match maybe_forward(req).await {
303 0 : ForwardOutcome::Forwarded(res) => {
304 0 : return res;
305 : }
306 0 : ForwardOutcome::NotForwarded(req) => req,
307 : };
308 :
309 0 : let config_req = json_request::<TenantConfigPatchRequest>(&mut req).await?;
310 :
311 0 : json_response(
312 : StatusCode::OK,
313 0 : service.tenant_config_patch(config_req).await?,
314 : )
315 0 : }
316 :
317 0 : async fn handle_tenant_config_set(
318 0 : service: Arc<Service>,
319 0 : req: Request<Body>,
320 0 : ) -> Result<Response<Body>, ApiError> {
321 0 : check_permissions(&req, Scope::PageServerApi)?;
322 :
323 0 : let mut req = match maybe_forward(req).await {
324 0 : ForwardOutcome::Forwarded(res) => {
325 0 : return res;
326 : }
327 0 : ForwardOutcome::NotForwarded(req) => req,
328 : };
329 :
330 0 : let config_req = json_request::<TenantConfigRequest>(&mut req).await?;
331 :
332 0 : json_response(StatusCode::OK, service.tenant_config_set(config_req).await?)
333 0 : }
334 :
335 0 : async fn handle_tenant_config_get(
336 0 : service: Arc<Service>,
337 0 : req: Request<Body>,
338 0 : ) -> Result<Response<Body>, ApiError> {
339 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
340 0 : check_permissions(&req, Scope::PageServerApi)?;
341 0 : maybe_rate_limit(&req, tenant_id).await;
342 :
343 0 : match maybe_forward(req).await {
344 0 : ForwardOutcome::Forwarded(res) => {
345 0 : return res;
346 : }
347 0 : ForwardOutcome::NotForwarded(_req) => {}
348 : };
349 :
350 0 : json_response(StatusCode::OK, service.tenant_config_get(tenant_id)?)
351 0 : }
352 :
353 0 : async fn handle_tenant_time_travel_remote_storage(
354 0 : service: Arc<Service>,
355 0 : req: Request<Body>,
356 0 : ) -> Result<Response<Body>, ApiError> {
357 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
358 0 : check_permissions(&req, Scope::PageServerApi)?;
359 0 : maybe_rate_limit(&req, tenant_id).await;
360 :
361 0 : let mut req = match maybe_forward(req).await {
362 0 : ForwardOutcome::Forwarded(res) => {
363 0 : return res;
364 : }
365 0 : ForwardOutcome::NotForwarded(req) => req,
366 : };
367 :
368 0 : let time_travel_req = json_request::<TenantTimeTravelRequest>(&mut req).await?;
369 :
370 0 : let timestamp_raw = must_get_query_param(&req, "travel_to")?;
371 0 : let _timestamp = humantime::parse_rfc3339(&timestamp_raw).map_err(|_e| {
372 0 : ApiError::BadRequest(anyhow::anyhow!(
373 0 : "Invalid time for travel_to: {timestamp_raw:?}"
374 0 : ))
375 0 : })?;
376 :
377 0 : let done_if_after_raw = must_get_query_param(&req, "done_if_after")?;
378 0 : let _done_if_after = humantime::parse_rfc3339(&done_if_after_raw).map_err(|_e| {
379 0 : ApiError::BadRequest(anyhow::anyhow!(
380 0 : "Invalid time for done_if_after: {done_if_after_raw:?}"
381 0 : ))
382 0 : })?;
383 :
384 0 : service
385 0 : .tenant_time_travel_remote_storage(
386 0 : &time_travel_req,
387 0 : tenant_id,
388 0 : timestamp_raw,
389 0 : done_if_after_raw,
390 0 : )
391 0 : .await?;
392 0 : json_response(StatusCode::OK, ())
393 0 : }
394 :
395 0 : fn map_reqwest_hyper_status(status: reqwest::StatusCode) -> Result<hyper::StatusCode, ApiError> {
396 0 : hyper::StatusCode::from_u16(status.as_u16())
397 0 : .context("invalid status code")
398 0 : .map_err(ApiError::InternalServerError)
399 0 : }
400 :
401 0 : async fn handle_tenant_secondary_download(
402 0 : service: Arc<Service>,
403 0 : req: Request<Body>,
404 0 : ) -> Result<Response<Body>, ApiError> {
405 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
406 0 : let wait = parse_query_param(&req, "wait_ms")?.map(Duration::from_millis);
407 0 : maybe_rate_limit(&req, tenant_id).await;
408 :
409 0 : match maybe_forward(req).await {
410 0 : ForwardOutcome::Forwarded(res) => {
411 0 : return res;
412 : }
413 0 : ForwardOutcome::NotForwarded(_req) => {}
414 : };
415 :
416 0 : let (status, progress) = service.tenant_secondary_download(tenant_id, wait).await?;
417 0 : json_response(map_reqwest_hyper_status(status)?, progress)
418 0 : }
419 :
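: /// Delete a tenant across all of its shards.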
420 0 : async fn handle_tenant_delete(
421 0 : service: Arc<Service>,
422 0 : req: Request<Body>,
423 0 : ) -> Result<Response<Body>, ApiError> {
424 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
425 0 : check_permissions(&req, Scope::PageServerApi)?;
426 0 : maybe_rate_limit(&req, tenant_id).await;
427 :
428 0 : match maybe_forward(req).await {
429 0 : ForwardOutcome::Forwarded(res) => {
430 0 : return res;
431 : }
432 0 : ForwardOutcome::NotForwarded(_req) => {}
433 : };
434 :
435 0 : let status_code = service
436 0 : .tenant_delete(tenant_id)
437 0 : .await
438 0 : .and_then(map_reqwest_hyper_status)?;
439 :
440 0 : if status_code == StatusCode::NOT_FOUND {
441 : // The pageserver uses 404 for successful deletion, but we use 200
442 0 : json_response(StatusCode::OK, ())
443 : } else {
444 0 : json_response(status_code, ())
445 : }
446 0 : }
447 :
448 0 : async fn handle_tenant_timeline_create(
449 0 : service: Arc<Service>,
450 0 : req: Request<Body>,
451 0 : ) -> Result<Response<Body>, ApiError> {
452 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
453 0 : check_permissions(&req, Scope::PageServerApi)?;
454 0 : maybe_rate_limit(&req, tenant_id).await;
455 :
456 0 : let mut req = match maybe_forward(req).await {
457 0 : ForwardOutcome::Forwarded(res) => {
458 0 : return res;
459 : }
460 0 : ForwardOutcome::NotForwarded(req) => req,
461 : };
462 :
463 0 : let create_req = json_request::<TimelineCreateRequest>(&mut req).await?;
464 0 : json_response(
465 : StatusCode::CREATED,
466 0 : service
467 0 : .tenant_timeline_create(tenant_id, create_req)
468 0 : .await?,
469 : )
470 0 : }
471 :
472 0 : async fn handle_tenant_timeline_delete(
473 0 : service: Arc<Service>,
474 0 : req: Request<Body>,
475 0 : ) -> Result<Response<Body>, ApiError> {
476 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
477 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
478 :
479 0 : check_permissions(&req, Scope::PageServerApi)?;
480 0 : maybe_rate_limit(&req, tenant_id).await;
481 :
482 0 : match maybe_forward(req).await {
483 0 : ForwardOutcome::Forwarded(res) => {
484 0 : return res;
485 : }
486 0 : ForwardOutcome::NotForwarded(_req) => {}
487 : };
488 :
489 0 : service
490 0 : .maybe_delete_timeline_import(tenant_id, timeline_id)
491 0 : .await?;
492 :
493 : // For timeline deletions, which implement an "initially return 202, then 404 once
494 : // we're done" semantic, we wrap with a retry loop to expose a simpler API upstream.
495 0 : async fn deletion_wrapper<R, F>(service: Arc<Service>, f: F) -> Result<Response<Body>, ApiError>
496 0 : where
497 0 : R: std::future::Future<Output = Result<StatusCode, ApiError>> + Send + 'static,
498 0 : F: Fn(Arc<Service>) -> R + Send + Sync + 'static,
499 0 : {
500 : // On subsequent retries, wait longer.
501 : // Enable callers with a 25 second request timeout to reliably get a response
502 : const MAX_WAIT: Duration = Duration::from_secs(25);
503 : const MAX_RETRY_PERIOD: Duration = Duration::from_secs(5);
504 :
505 0 : let started_at = Instant::now();
506 :
507 : // To keep deletion reasonably snappy for small tenants, initially check after 1 second whether deletion
508 : // has completed.
509 0 : let mut retry_period = Duration::from_secs(1);
510 :
511 : loop {
512 0 : let status = f(service.clone()).await?;
513 0 : match status {
514 : StatusCode::ACCEPTED => {
515 0 : tracing::info!("Deletion accepted, waiting to try again...");
516 0 : tokio::time::sleep(retry_period).await;
517 0 : retry_period = MAX_RETRY_PERIOD;
518 : }
519 : StatusCode::CONFLICT => {
520 0 : tracing::info!("Deletion already in progress, waiting to try again...");
521 0 : tokio::time::sleep(retry_period).await;
522 : }
523 : StatusCode::NOT_FOUND => {
524 0 : tracing::info!("Deletion complete");
525 0 : return json_response(StatusCode::OK, ());
526 : }
527 : _ => {
528 0 : tracing::warn!("Unexpected status {status}");
529 0 : return json_response(status, ());
530 : }
531 : }
532 :
533 0 : let now = Instant::now();
534 0 : if now + retry_period > started_at + MAX_WAIT {
535 0 : tracing::info!("Deletion timed out waiting for 404");
536 : // REQUEST_TIMEOUT would be more appropriate, but CONFLICT is already part of
537 : // the pageserver's swagger definition for this endpoint, and has the same desired
538 : // effect of causing the control plane to retry later.
539 0 : return json_response(StatusCode::CONFLICT, ());
540 0 : }
541 : }
542 0 : }
543 :
544 0 : deletion_wrapper(service, move |service| async move {
545 0 : service
546 0 : .tenant_timeline_delete(tenant_id, timeline_id)
547 0 : .await
548 0 : .and_then(map_reqwest_hyper_status)
549 0 : })
550 0 : .await
551 0 : }
552 :
553 0 : async fn handle_tenant_timeline_archival_config(
554 0 : service: Arc<Service>,
555 0 : req: Request<Body>,
556 0 : ) -> Result<Response<Body>, ApiError> {
557 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
558 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
559 :
560 0 : check_permissions(&req, Scope::PageServerApi)?;
561 0 : maybe_rate_limit(&req, tenant_id).await;
562 :
563 0 : let mut req = match maybe_forward(req).await {
564 0 : ForwardOutcome::Forwarded(res) => {
565 0 : return res;
566 : }
567 0 : ForwardOutcome::NotForwarded(req) => req,
568 : };
569 :
570 0 : let create_req = json_request::<TimelineArchivalConfigRequest>(&mut req).await?;
571 :
572 0 : service
573 0 : .tenant_timeline_archival_config(tenant_id, timeline_id, create_req)
574 0 : .await?;
575 :
576 0 : json_response(StatusCode::OK, ())
577 0 : }
578 :
579 0 : async fn handle_tenant_timeline_detach_ancestor(
580 0 : service: Arc<Service>,
581 0 : req: Request<Body>,
582 0 : ) -> Result<Response<Body>, ApiError> {
583 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
584 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
585 0 : let behavior: Option<DetachBehavior> = parse_query_param(&req, "detach_behavior")?;
586 :
587 0 : check_permissions(&req, Scope::PageServerApi)?;
588 0 : maybe_rate_limit(&req, tenant_id).await;
589 :
590 0 : match maybe_forward(req).await {
591 0 : ForwardOutcome::Forwarded(res) => {
592 0 : return res;
593 : }
594 0 : ForwardOutcome::NotForwarded(_req) => {}
595 : };
596 :
597 0 : let res = service
598 0 : .tenant_timeline_detach_ancestor(tenant_id, timeline_id, behavior)
599 0 : .await?;
600 :
601 0 : json_response(StatusCode::OK, res)
602 0 : }
603 :
604 0 : async fn handle_tenant_timeline_block_unblock_gc(
605 0 : service: Arc<Service>,
606 0 : req: Request<Body>,
607 0 : dir: BlockUnblock,
608 0 : ) -> Result<Response<Body>, ApiError> {
609 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
610 0 : check_permissions(&req, Scope::PageServerApi)?;
611 0 : maybe_rate_limit(&req, tenant_id).await;
612 :
613 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
614 :
615 0 : service
616 0 : .tenant_timeline_block_unblock_gc(tenant_id, timeline_id, dir)
617 0 : .await?;
618 :
619 0 : json_response(StatusCode::OK, ())
620 0 : }
621 :
622 0 : async fn handle_tenant_timeline_download_heatmap_layers(
623 0 : service: Arc<Service>,
624 0 : req: Request<Body>,
625 0 : ) -> Result<Response<Body>, ApiError> {
626 0 : let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
627 :
628 0 : check_permissions(&req, Scope::PageServerApi)?;
629 0 : maybe_rate_limit(&req, tenant_shard_id.tenant_id).await;
630 :
631 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
632 0 : let concurrency: Option<usize> = parse_query_param(&req, "concurrency")?;
633 0 : let recurse = parse_query_param(&req, "recurse")?.unwrap_or(false);
634 :
635 0 : service
636 0 : .tenant_timeline_download_heatmap_layers(tenant_shard_id, timeline_id, concurrency, recurse)
637 0 : .await?;
638 :
639 0 : json_response(StatusCode::OK, ())
640 0 : }
641 :
642 0 : async fn handle_tenant_timeline_safekeeper_migrate(
643 0 : service: Arc<Service>,
644 0 : req: Request<Body>,
645 0 : ) -> Result<Response<Body>, ApiError> {
646 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
647 0 : check_permissions(&req, Scope::PageServerApi)?;
648 0 : maybe_rate_limit(&req, tenant_id).await;
649 :
650 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
651 :
652 0 : let mut req = match maybe_forward(req).await {
653 0 : ForwardOutcome::Forwarded(res) => {
654 0 : return res;
655 : }
656 0 : ForwardOutcome::NotForwarded(req) => req,
657 : };
658 :
659 0 : let migrate_req = json_request::<TimelineSafekeeperMigrateRequest>(&mut req).await?;
660 :
661 0 : service
662 0 : .tenant_timeline_safekeeper_migrate(tenant_id, timeline_id, migrate_req)
663 0 : .await?;
664 :
665 0 : json_response(StatusCode::OK, ())
666 0 : }
667 :
668 0 : async fn handle_tenant_timeline_lsn_lease(
669 0 : service: Arc<Service>,
670 0 : req: Request<Body>,
671 0 : ) -> Result<Response<Body>, ApiError> {
672 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
673 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
674 :
675 0 : check_permissions(&req, Scope::PageServerApi)?;
676 0 : maybe_rate_limit(&req, tenant_id).await;
677 :
678 0 : let mut req = match maybe_forward(req).await {
679 0 : ForwardOutcome::Forwarded(res) => {
680 0 : return res;
681 : }
682 0 : ForwardOutcome::NotForwarded(req) => req,
683 : };
684 :
685 0 : let lsn_lease_request = json_request::<LsnLeaseRequest>(&mut req).await?;
686 :
687 0 : service
688 0 : .tenant_timeline_lsn_lease(tenant_id, timeline_id, lsn_lease_request.lsn)
689 0 : .await?;
690 :
691 0 : json_response(StatusCode::OK, ())
692 0 : }
693 :
694 : // For metric labels where we would like to include the approximate path, but exclude high-cardinality fields like query parameters
695 : // and tenant/timeline IDs. Since we are proxying to arbitrary paths, we don't have routing templates to
696 : // compare to, so we can just filter out our well-known ID format with regexes.
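: // For example: path_without_ids("/v1/tenant/3fa85f6457174562b3fc2c963f66afa6-0004/timeline?wait_ms=250")
: // returns "/v1/tenant//timeline": the regex strips the (possibly shard-suffixed) ID and the query string.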
697 3 : fn path_without_ids(path: &str) -> String {
698 : static ID_REGEX: std::sync::OnceLock<regex::Regex> = std::sync::OnceLock::new();
699 3 : ID_REGEX
700 3 : .get_or_init(|| regex::Regex::new(r"([0-9a-fA-F]{32}(-[0-9]{4})?|\?.*)").unwrap())
701 3 : .replace_all(path, "")
702 3 : .to_string()
703 3 : }
704 :
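: /// Proxy an arbitrary tenant/timeline request to the pageserver holding the tenant (shard zero if
: /// the caller passed an unsharded tenant ID), rewriting the tenant ID in the path as needed.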
705 0 : async fn handle_tenant_timeline_passthrough(
706 0 : service: Arc<Service>,
707 0 : req: Request<Body>,
708 0 : ) -> Result<Response<Body>, ApiError> {
709 0 : let tenant_or_shard_id: TenantShardId = parse_request_param(&req, "tenant_id")?;
710 0 : check_permissions(&req, Scope::PageServerApi)?;
711 0 : maybe_rate_limit(&req, tenant_or_shard_id.tenant_id).await;
712 :
713 0 : let req = match maybe_forward(req).await {
714 0 : ForwardOutcome::Forwarded(res) => {
715 0 : return res;
716 : }
717 0 : ForwardOutcome::NotForwarded(req) => req,
718 : };
719 :
720 0 : let Some(path) = req.uri().path_and_query() else {
721 : // This should never happen; our request router only calls us if there is a path
722 0 : return Err(ApiError::BadRequest(anyhow::anyhow!("Missing path")));
723 : };
724 :
725 0 : let method = match *req.method() {
726 0 : hyper::Method::GET => reqwest::Method::GET,
727 0 : hyper::Method::POST => reqwest::Method::POST,
728 0 : hyper::Method::PUT => reqwest::Method::PUT,
729 0 : hyper::Method::DELETE => reqwest::Method::DELETE,
730 0 : hyper::Method::PATCH => reqwest::Method::PATCH,
731 0 : _ => return Err(ApiError::BadRequest(anyhow::anyhow!("Unsupported method"))),
732 : };
733 :
734 0 : tracing::info!(
735 0 : "Proxying request for tenant {} ({})",
736 : tenant_or_shard_id.tenant_id,
737 : path
738 : );
739 :
740 0 : let tenant_shard_id = if tenant_or_shard_id.is_unsharded() {
741 : // If the request contains only tenant ID, find the node that holds shard zero
742 0 : let (_, shard_id) = service
743 0 : .tenant_shard0_node(tenant_or_shard_id.tenant_id)
744 0 : .await?;
745 0 : shard_id
746 : } else {
747 0 : tenant_or_shard_id
748 : };
749 :
750 0 : let service_inner = service.clone();
751 :
752 0 : service.tenant_shard_remote_mutation(tenant_shard_id, |locations| async move {
753 0 : let TenantMutationLocations(locations) = locations;
754 0 : if locations.is_empty() {
755 0 : return Err(ApiError::NotFound(anyhow::anyhow!("Tenant {} not found", tenant_or_shard_id.tenant_id).into()));
756 0 : }
757 :
758 0 : let (tenant_or_shard_id, locations) = locations.into_iter().next().unwrap();
759 0 : let node = locations.latest.node;
760 :
761 : // Callers will always pass an unsharded tenant ID. Before proxying, we must
762 : // rewrite this to a shard-aware shard zero ID.
763 0 : let path = format!("{path}");
764 0 : let tenant_str = tenant_or_shard_id.tenant_id.to_string();
765 0 : let tenant_shard_str = format!("{tenant_shard_id}");
766 0 : let path = path.replace(&tenant_str, &tenant_shard_str);
767 :
768 0 : let latency = &METRICS_REGISTRY
769 0 : .metrics_group
770 0 : .storage_controller_passthrough_request_latency;
771 :
772 0 : let path_label = path_without_ids(&path)
773 0 : .split('/')
774 0 : .filter(|token| !token.is_empty())
775 0 : .collect::<Vec<_>>()
776 0 : .join("_");
777 0 : let labels = PageserverRequestLabelGroup {
778 0 : pageserver_id: &node.get_id().to_string(),
779 0 : path: &path_label,
780 0 : method: crate::metrics::Method::Get,
781 0 : };
782 :
783 0 : let _timer = latency.start_timer(labels.clone());
784 :
785 0 : let client = mgmt_api::Client::new(
786 0 : service_inner.get_http_client().clone(),
787 0 : node.base_url(),
788 0 : service_inner.get_config().pageserver_jwt_token.as_deref(),
789 : );
790 0 : let resp = client.op_raw(method, path).await.map_err(|e|
791 : // We return 503 here because if we can't successfully send a request to the pageserver,
792 : // either we aren't available or the pageserver is unavailable.
793 0 : ApiError::ResourceUnavailable(format!("Error sending pageserver API request to {node}: {e}").into()))?;
794 :
795 0 : if !resp.status().is_success() {
796 0 : let error_counter = &METRICS_REGISTRY
797 0 : .metrics_group
798 0 : .storage_controller_passthrough_request_error;
799 0 : error_counter.inc(labels);
800 0 : }
801 0 : let resp_status = resp.status();
802 :
803 : // We have a reqwest::Response; we would like an http::Response
804 0 : let mut builder = hyper::Response::builder().status(map_reqwest_hyper_status(resp_status)?);
805 0 : for (k, v) in resp.headers() {
806 0 : builder = builder.header(k.as_str(), v.as_bytes());
807 0 : }
808 0 : let resp_bytes = resp
809 0 : .bytes()
810 0 : .await
811 0 : .map_err(|e| ApiError::InternalServerError(e.into()))?;
812 : // Inspect 404 errors: at this point, we know that the tenant exists, but the pageserver we route
813 : // the request to might not yet be ready. Therefore, if it is a _tenant_ not found error, we can
814 : // convert it into a 503. TODO: we should make this part of the check in `tenant_shard_remote_mutation`.
815 : // However, `tenant_shard_remote_mutation` currently cannot inspect the HTTP error response body,
816 : // so we have to do it here instead.
817 0 : if resp_staus == reqwest::StatusCode::NOT_FOUND {
818 0 : let resp_str = std::str::from_utf8(&resp_bytes)
819 0 : .map_err(|e| ApiError::InternalServerError(e.into()))?;
820 : // We only handle "tenant not found" errors; other 404s like timeline not found should
821 : // be forwarded as-is.
822 0 : if Service::is_tenant_not_found_error(resp_str, tenant_or_shard_id.tenant_id) {
823 : // Rather than retry here, send the client a 503 to prompt a retry: this matches
824 : // the pageserver's use of 503, and all clients calling this API should retry on 503.
825 0 : return Err(ApiError::ResourceUnavailable(
826 0 : format!(
827 0 : "Pageserver {node} returned tenant 404 due to ongoing migration, retry later"
828 0 : )
829 0 : .into(),
830 0 : ));
831 0 : }
832 0 : }
833 0 : let response = builder
834 0 : .body(Body::from(resp_bytes))
835 0 : .map_err(|e| ApiError::InternalServerError(e.into()))?;
836 0 : Ok(response)
837 0 : }).await?
838 0 : }
839 :
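: /// Admin/debug: report which pageserver hosts each shard of a tenant.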
840 0 : async fn handle_tenant_locate(
841 0 : service: Arc<Service>,
842 0 : req: Request<Body>,
843 0 : ) -> Result<Response<Body>, ApiError> {
844 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
845 :
846 0 : check_permissions(&req, Scope::Admin)?;
847 : // NB: don't rate limit: admin operation.
848 :
849 0 : match maybe_forward(req).await {
850 0 : ForwardOutcome::Forwarded(res) => {
851 0 : return res;
852 : }
853 0 : ForwardOutcome::NotForwarded(_req) => {}
854 : };
855 :
856 0 : json_response(StatusCode::OK, service.tenant_locate(tenant_id)?)
857 0 : }
858 :
859 0 : async fn handle_tenant_describe(
860 0 : service: Arc<Service>,
861 0 : req: Request<Body>,
862 0 : ) -> Result<Response<Body>, ApiError> {
863 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
864 0 : check_permissions(&req, Scope::Scrubber)?;
865 : // NB: don't rate limit: scrubber operation.
866 :
867 0 : match maybe_forward(req).await {
868 0 : ForwardOutcome::Forwarded(res) => {
869 0 : return res;
870 : }
871 0 : ForwardOutcome::NotForwarded(_req) => {}
872 : };
873 :
874 0 : json_response(StatusCode::OK, service.tenant_describe(tenant_id)?)
875 0 : }
876 :
877 : /* BEGIN_HADRON */
878 0 : async fn handle_tenant_timeline_describe(
879 0 : service: Arc<Service>,
880 0 : req: Request<Body>,
881 0 : ) -> Result<Response<Body>, ApiError> {
882 0 : check_permissions(&req, Scope::Scrubber)?;
883 :
884 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
885 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
886 0 : match maybe_forward(req).await {
887 0 : ForwardOutcome::Forwarded(res) => {
888 0 : return res;
889 : }
890 0 : ForwardOutcome::NotForwarded(_req) => {}
891 : };
892 :
893 0 : json_response(
894 : StatusCode::OK,
895 0 : service
896 0 : .tenant_timeline_describe(tenant_id, timeline_id)
897 0 : .await?,
898 : )
899 0 : }
900 : /* END_HADRON */
901 :
902 0 : async fn handle_tenant_list(
903 0 : service: Arc<Service>,
904 0 : req: Request<Body>,
905 0 : ) -> Result<Response<Body>, ApiError> {
906 0 : check_permissions(&req, Scope::Admin)?;
907 :
908 0 : let limit: Option<usize> = parse_query_param(&req, "limit")?;
909 0 : let start_after: Option<TenantId> = parse_query_param(&req, "start_after")?;
910 0 : tracing::info!("start_after: {:?}", start_after);
911 :
912 0 : match maybe_forward(req).await {
913 0 : ForwardOutcome::Forwarded(res) => {
914 0 : return res;
915 : }
916 0 : ForwardOutcome::NotForwarded(_req) => {}
917 : };
918 :
919 0 : json_response(StatusCode::OK, service.tenant_list(limit, start_after))
920 0 : }
921 :
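: /// Register a pageserver node with the controller.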
922 0 : async fn handle_node_register(req: Request<Body>) -> Result<Response<Body>, ApiError> {
923 0 : check_permissions(&req, Scope::Infra)?;
924 :
925 0 : let mut req = match maybe_forward(req).await {
926 0 : ForwardOutcome::Forwarded(res) => {
927 0 : return res;
928 : }
929 0 : ForwardOutcome::NotForwarded(req) => req,
930 : };
931 :
932 0 : let register_req = json_request::<NodeRegisterRequest>(&mut req).await?;
933 0 : let state = get_state(&req);
934 0 : state.service.node_register(register_req).await?;
935 0 : json_response(StatusCode::OK, ())
936 0 : }
937 :
938 0 : async fn handle_node_list(req: Request<Body>) -> Result<Response<Body>, ApiError> {
939 0 : check_permissions(&req, Scope::Infra)?;
940 :
941 0 : let req = match maybe_forward(req).await {
942 0 : ForwardOutcome::Forwarded(res) => {
943 0 : return res;
944 : }
945 0 : ForwardOutcome::NotForwarded(req) => req,
946 : };
947 :
948 0 : let state = get_state(&req);
949 0 : let mut nodes = state.service.node_list().await?;
950 0 : nodes.sort_by_key(|n| n.get_id());
951 0 : let api_nodes = nodes.into_iter().map(|n| n.describe()).collect::<Vec<_>>();
952 :
953 0 : json_response(StatusCode::OK, api_nodes)
954 0 : }
955 :
956 0 : async fn handle_node_drop(req: Request<Body>) -> Result<Response<Body>, ApiError> {
957 0 : check_permissions(&req, Scope::Admin)?;
958 :
959 0 : let req = match maybe_forward(req).await {
960 0 : ForwardOutcome::Forwarded(res) => {
961 0 : return res;
962 : }
963 0 : ForwardOutcome::NotForwarded(req) => req,
964 : };
965 :
966 0 : let state = get_state(&req);
967 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
968 0 : json_response(StatusCode::OK, state.service.node_drop(node_id).await?)
969 0 : }
970 :
971 0 : async fn handle_node_delete_old(req: Request<Body>) -> Result<Response<Body>, ApiError> {
972 0 : check_permissions(&req, Scope::Admin)?;
973 :
974 0 : let req = match maybe_forward(req).await {
975 0 : ForwardOutcome::Forwarded(res) => {
976 0 : return res;
977 : }
978 0 : ForwardOutcome::NotForwarded(req) => req,
979 : };
980 :
981 0 : let state = get_state(&req);
982 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
983 0 : json_response(
984 : StatusCode::OK,
985 0 : state.service.node_delete_old(node_id).await?,
986 : )
987 0 : }
988 :
989 0 : async fn handle_tombstone_list(req: Request<Body>) -> Result<Response<Body>, ApiError> {
990 0 : check_permissions(&req, Scope::Admin)?;
991 :
992 0 : let req = match maybe_forward(req).await {
993 0 : ForwardOutcome::Forwarded(res) => {
994 0 : return res;
995 : }
996 0 : ForwardOutcome::NotForwarded(req) => req,
997 : };
998 :
999 0 : let state = get_state(&req);
1000 0 : let mut nodes = state.service.tombstone_list().await?;
1001 0 : nodes.sort_by_key(|n| n.get_id());
1002 0 : let api_nodes = nodes.into_iter().map(|n| n.describe()).collect::<Vec<_>>();
1003 :
1004 0 : json_response(StatusCode::OK, api_nodes)
1005 0 : }
1006 :
1007 0 : async fn handle_tombstone_delete(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1008 0 : check_permissions(&req, Scope::Admin)?;
1009 :
1010 0 : let req = match maybe_forward(req).await {
1011 0 : ForwardOutcome::Forwarded(res) => {
1012 0 : return res;
1013 : }
1014 0 : ForwardOutcome::NotForwarded(req) => req,
1015 : };
1016 :
1017 0 : let state = get_state(&req);
1018 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
1019 0 : json_response(
1020 : StatusCode::OK,
1021 0 : state.service.tombstone_delete(node_id).await?,
1022 : )
1023 0 : }
1024 :
1025 0 : async fn handle_node_configure(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1026 0 : check_permissions(&req, Scope::Admin)?;
1027 :
1028 0 : let mut req = match maybe_forward(req).await {
1029 0 : ForwardOutcome::Forwarded(res) => {
1030 0 : return res;
1031 : }
1032 0 : ForwardOutcome::NotForwarded(req) => req,
1033 : };
1034 :
1035 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
1036 0 : let config_req = json_request::<NodeConfigureRequest>(&mut req).await?;
1037 0 : if node_id != config_req.node_id {
1038 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
1039 0 : "Path and body node_id differ"
1040 0 : )));
1041 0 : }
1042 0 : let state = get_state(&req);
1043 :
1044 0 : json_response(
1045 : StatusCode::OK,
1046 0 : state
1047 0 : .service
1048 0 : .external_node_configure(
1049 0 : config_req.node_id,
1050 0 : config_req.availability.map(NodeAvailability::from),
1051 0 : config_req.scheduling,
1052 0 : )
1053 0 : .await?,
1054 : )
1055 0 : }
1056 :
1057 0 : async fn handle_node_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1058 0 : check_permissions(&req, Scope::Infra)?;
1059 :
1060 0 : let req = match maybe_forward(req).await {
1061 0 : ForwardOutcome::Forwarded(res) => {
1062 0 : return res;
1063 : }
1064 0 : ForwardOutcome::NotForwarded(req) => req,
1065 : };
1066 :
1067 0 : let state = get_state(&req);
1068 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
1069 :
1070 0 : let node_status = state.service.get_node(node_id).await?.describe();
1071 :
1072 0 : json_response(StatusCode::OK, node_status)
1073 0 : }
1074 :
1075 0 : async fn handle_node_shards(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1076 0 : check_permissions(&req, Scope::Admin)?;
1077 :
1078 0 : let state = get_state(&req);
1079 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
1080 :
1081 0 : let node_status = state.service.get_node_shards(node_id).await?;
1082 :
1083 0 : json_response(StatusCode::OK, node_status)
1084 0 : }
1085 :
1086 0 : async fn handle_get_leader(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1087 0 : check_permissions(&req, Scope::Admin)?;
1088 :
1089 0 : let req = match maybe_forward(req).await {
1090 0 : ForwardOutcome::Forwarded(res) => {
1091 0 : return res;
1092 : }
1093 0 : ForwardOutcome::NotForwarded(req) => req,
1094 : };
1095 :
1096 0 : let state = get_state(&req);
1097 0 : let leader = state.service.get_leader().await.map_err(|err| {
1098 0 : ApiError::InternalServerError(anyhow::anyhow!(
1099 0 : "Failed to read leader from database: {err}"
1100 0 : ))
1101 0 : })?;
1102 :
1103 0 : json_response(StatusCode::OK, leader)
1104 0 : }
1105 :
1106 0 : async fn handle_node_delete(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1107 0 : check_permissions(&req, Scope::Admin)?;
1108 :
1109 0 : let req = match maybe_forward(req).await {
1110 0 : ForwardOutcome::Forwarded(res) => {
1111 0 : return res;
1112 : }
1113 0 : ForwardOutcome::NotForwarded(req) => req,
1114 : };
1115 :
1116 0 : let state = get_state(&req);
1117 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
1118 0 : let force: bool = parse_query_param(&req, "force")?.unwrap_or(false);
1119 0 : json_response(
1120 : StatusCode::OK,
1121 0 : state.service.start_node_delete(node_id, force).await?,
1122 : )
1123 0 : }
1124 :
1125 0 : async fn handle_cancel_node_delete(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1126 0 : check_permissions(&req, Scope::Infra)?;
1127 :
1128 0 : let req = match maybe_forward(req).await {
1129 0 : ForwardOutcome::Forwarded(res) => {
1130 0 : return res;
1131 : }
1132 0 : ForwardOutcome::NotForwarded(req) => req,
1133 : };
1134 :
1135 0 : let state = get_state(&req);
1136 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
1137 0 : json_response(
1138 : StatusCode::ACCEPTED,
1139 0 : state.service.cancel_node_delete(node_id).await?,
1140 : )
1141 0 : }
1142 :
1143 0 : async fn handle_node_drain(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1144 0 : check_permissions(&req, Scope::Infra)?;
1145 :
1146 0 : let req = match maybe_forward(req).await {
1147 0 : ForwardOutcome::Forwarded(res) => {
1148 0 : return res;
1149 : }
1150 0 : ForwardOutcome::NotForwarded(req) => req,
1151 : };
1152 :
1153 0 : let state = get_state(&req);
1154 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
1155 :
1156 0 : state.service.start_node_drain(node_id).await?;
1157 :
1158 0 : json_response(StatusCode::ACCEPTED, ())
1159 0 : }
1160 :
1161 0 : async fn handle_cancel_node_drain(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1162 0 : check_permissions(&req, Scope::Infra)?;
1163 :
1164 0 : let req = match maybe_forward(req).await {
1165 0 : ForwardOutcome::Forwarded(res) => {
1166 0 : return res;
1167 : }
1168 0 : ForwardOutcome::NotForwarded(req) => req,
1169 : };
1170 :
1171 0 : let state = get_state(&req);
1172 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
1173 :
1174 0 : state.service.cancel_node_drain(node_id).await?;
1175 :
1176 0 : json_response(StatusCode::ACCEPTED, ())
1177 0 : }
1178 :
1179 0 : async fn handle_node_fill(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1180 0 : check_permissions(&req, Scope::Infra)?;
1181 :
1182 0 : let req = match maybe_forward(req).await {
1183 0 : ForwardOutcome::Forwarded(res) => {
1184 0 : return res;
1185 : }
1186 0 : ForwardOutcome::NotForwarded(req) => req,
1187 : };
1188 :
1189 0 : let state = get_state(&req);
1190 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
1191 :
1192 0 : state.service.start_node_fill(node_id).await?;
1193 :
1194 0 : json_response(StatusCode::ACCEPTED, ())
1195 0 : }
1196 :
1197 0 : async fn handle_cancel_node_fill(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1198 0 : check_permissions(&req, Scope::Infra)?;
1199 :
1200 0 : let req = match maybe_forward(req).await {
1201 0 : ForwardOutcome::Forwarded(res) => {
1202 0 : return res;
1203 : }
1204 0 : ForwardOutcome::NotForwarded(req) => req,
1205 : };
1206 :
1207 0 : let state = get_state(&req);
1208 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
1209 :
1210 0 : state.service.cancel_node_fill(node_id).await?;
1211 :
1212 0 : json_response(StatusCode::ACCEPTED, ())
1213 0 : }
1214 :
1215 0 : async fn handle_safekeeper_list(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1216 0 : check_permissions(&req, Scope::Infra)?;
1217 :
1218 0 : let req = match maybe_forward(req).await {
1219 0 : ForwardOutcome::Forwarded(res) => {
1220 0 : return res;
1221 : }
1222 0 : ForwardOutcome::NotForwarded(req) => req,
1223 : };
1224 :
1225 0 : let state = get_state(&req);
1226 0 : let safekeepers = state.service.safekeepers_list().await?;
1227 0 : json_response(StatusCode::OK, safekeepers)
1228 0 : }
1229 :
1230 0 : async fn handle_metadata_health_update(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1231 0 : check_permissions(&req, Scope::Scrubber)?;
1232 :
1233 0 : let mut req = match maybe_forward(req).await {
1234 0 : ForwardOutcome::Forwarded(res) => {
1235 0 : return res;
1236 : }
1237 0 : ForwardOutcome::NotForwarded(req) => req,
1238 : };
1239 :
1240 0 : let update_req = json_request::<MetadataHealthUpdateRequest>(&mut req).await?;
1241 0 : let state = get_state(&req);
1242 :
1243 0 : state.service.metadata_health_update(update_req).await?;
1244 :
1245 0 : json_response(StatusCode::OK, MetadataHealthUpdateResponse {})
1246 0 : }
1247 :
1248 0 : async fn handle_metadata_health_list_unhealthy(
1249 0 : req: Request<Body>,
1250 0 : ) -> Result<Response<Body>, ApiError> {
1251 0 : check_permissions(&req, Scope::Admin)?;
1252 :
1253 0 : let req = match maybe_forward(req).await {
1254 0 : ForwardOutcome::Forwarded(res) => {
1255 0 : return res;
1256 : }
1257 0 : ForwardOutcome::NotForwarded(req) => req,
1258 : };
1259 :
1260 0 : let state = get_state(&req);
1261 0 : let unhealthy_tenant_shards = state.service.metadata_health_list_unhealthy().await?;
1262 :
1263 0 : json_response(
1264 : StatusCode::OK,
1265 0 : MetadataHealthListUnhealthyResponse {
1266 0 : unhealthy_tenant_shards,
1267 0 : },
1268 : )
1269 0 : }
1270 :
1271 0 : async fn handle_metadata_health_list_outdated(
1272 0 : req: Request<Body>,
1273 0 : ) -> Result<Response<Body>, ApiError> {
1274 0 : check_permissions(&req, Scope::Admin)?;
1275 :
1276 0 : let mut req = match maybe_forward(req).await {
1277 0 : ForwardOutcome::Forwarded(res) => {
1278 0 : return res;
1279 : }
1280 0 : ForwardOutcome::NotForwarded(req) => req,
1281 : };
1282 :
1283 0 : let list_outdated_req = json_request::<MetadataHealthListOutdatedRequest>(&mut req).await?;
1284 0 : let state = get_state(&req);
1285 0 : let health_records = state
1286 0 : .service
1287 0 : .metadata_health_list_outdated(list_outdated_req.not_scrubbed_for)
1288 0 : .await?;
1289 :
1290 0 : json_response(
1291 : StatusCode::OK,
1292 0 : MetadataHealthListOutdatedResponse { health_records },
1293 : )
1294 0 : }
1295 :
1296 0 : async fn handle_tenant_shard_split(
1297 0 : service: Arc<Service>,
1298 0 : req: Request<Body>,
1299 0 : ) -> Result<Response<Body>, ApiError> {
1300 0 : check_permissions(&req, Scope::Admin)?;
1301 : // NB: don't rate limit: admin operation.
1302 :
1303 0 : let mut req = match maybe_forward(req).await {
1304 0 : ForwardOutcome::Forwarded(res) => {
1305 0 : return res;
1306 : }
1307 0 : ForwardOutcome::NotForwarded(req) => req,
1308 : };
1309 :
1310 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
1311 0 : let split_req = json_request::<TenantShardSplitRequest>(&mut req).await?;
1312 :
1313 0 : json_response(
1314 : StatusCode::OK,
1315 0 : service.tenant_shard_split(tenant_id, split_req).await?,
1316 : )
1317 0 : }
1318 :
1319 0 : async fn handle_tenant_shard_migrate(
1320 0 : service: Arc<Service>,
1321 0 : req: Request<Body>,
1322 0 : ) -> Result<Response<Body>, ApiError> {
1323 0 : check_permissions(&req, Scope::Admin)?;
1324 : // NB: don't rate limit: admin operation.
1325 :
1326 0 : let mut req = match maybe_forward(req).await {
1327 0 : ForwardOutcome::Forwarded(res) => {
1328 0 : return res;
1329 : }
1330 0 : ForwardOutcome::NotForwarded(req) => req,
1331 : };
1332 :
1333 0 : let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
1334 0 : let migrate_req = json_request::<TenantShardMigrateRequest>(&mut req).await?;
1335 0 : json_response(
1336 : StatusCode::OK,
1337 0 : service
1338 0 : .tenant_shard_migrate(tenant_shard_id, migrate_req)
1339 0 : .await?,
1340 : )
1341 0 : }
1342 :
1343 0 : async fn handle_tenant_shard_migrate_secondary(
1344 0 : service: Arc<Service>,
1345 0 : req: Request<Body>,
1346 0 : ) -> Result<Response<Body>, ApiError> {
1347 0 : check_permissions(&req, Scope::Admin)?;
1348 : // NB: don't rate limit: admin operation.
1349 :
1350 0 : let mut req = match maybe_forward(req).await {
1351 0 : ForwardOutcome::Forwarded(res) => {
1352 0 : return res;
1353 : }
1354 0 : ForwardOutcome::NotForwarded(req) => req,
1355 : };
1356 :
1357 0 : let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
1358 0 : let migrate_req = json_request::<TenantShardMigrateRequest>(&mut req).await?;
1359 0 : json_response(
1360 : StatusCode::OK,
1361 0 : service
1362 0 : .tenant_shard_migrate_secondary(tenant_shard_id, migrate_req)
1363 0 : .await?,
1364 : )
1365 0 : }
1366 :
1367 0 : async fn handle_tenant_shard_cancel_reconcile(
1368 0 : service: Arc<Service>,
1369 0 : req: Request<Body>,
1370 0 : ) -> Result<Response<Body>, ApiError> {
1371 0 : check_permissions(&req, Scope::Admin)?;
1372 : // NB: don't rate limit: admin operation.
1373 :
1374 0 : let req = match maybe_forward(req).await {
1375 0 : ForwardOutcome::Forwarded(res) => {
1376 0 : return res;
1377 : }
1378 0 : ForwardOutcome::NotForwarded(req) => req,
1379 : };
1380 :
1381 0 : let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
1382 0 : json_response(
1383 : StatusCode::OK,
1384 0 : service
1385 0 : .tenant_shard_cancel_reconcile(tenant_shard_id)
1386 0 : .await?,
1387 : )
1388 0 : }
1389 :
1390 0 : async fn handle_tenant_update_policy(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1391 0 : check_permissions(&req, Scope::Admin)?;
1392 : // NB: don't rate limit: admin operation.
1393 :
1394 0 : let mut req = match maybe_forward(req).await {
1395 0 : ForwardOutcome::Forwarded(res) => {
1396 0 : return res;
1397 : }
1398 0 : ForwardOutcome::NotForwarded(req) => req,
1399 : };
1400 :
1401 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
1402 0 : let update_req = json_request::<TenantPolicyRequest>(&mut req).await?;
1403 0 : let state = get_state(&req);
1404 :
1405 0 : json_response(
1406 : StatusCode::OK,
1407 0 : state
1408 0 : .service
1409 0 : .tenant_update_policy(tenant_id, update_req)
1410 0 : .await?,
1411 : )
1412 0 : }
1413 :
1414 0 : async fn handle_update_preferred_azs(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1415 0 : check_permissions(&req, Scope::Admin)?;
1416 :
1417 0 : let mut req = match maybe_forward(req).await {
1418 0 : ForwardOutcome::Forwarded(res) => {
1419 0 : return res;
1420 : }
1421 0 : ForwardOutcome::NotForwarded(req) => req,
1422 : };
1423 :
1424 0 : let azs_req = json_request::<ShardsPreferredAzsRequest>(&mut req).await?;
1425 0 : let state = get_state(&req);
1426 :
1427 0 : json_response(
1428 : StatusCode::OK,
1429 0 : state.service.update_shards_preferred_azs(azs_req).await?,
1430 : )
1431 0 : }
1432 :
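: /// Called by a peer storage controller during leadership handover: ask this instance to step down.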
1433 0 : async fn handle_step_down(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1434 0 : check_permissions(&req, Scope::ControllerPeer)?;
1435 :
1436 0 : let req = match maybe_forward(req).await {
1437 0 : ForwardOutcome::Forwarded(res) => {
1438 0 : return res;
1439 : }
1440 0 : ForwardOutcome::NotForwarded(req) => req,
1441 : };
1442 :
1443 0 : let state = get_state(&req);
1444 0 : let result = state.service.step_down().await;
1445 :
1446 0 : json_response(StatusCode::OK, result)
1447 0 : }
1448 :
1449 0 : async fn handle_tenant_drop(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1450 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
1451 0 : check_permissions(&req, Scope::PageServerApi)?;
1452 0 : maybe_rate_limit(&req, tenant_id).await;
1453 :
1454 0 : let req = match maybe_forward(req).await {
1455 0 : ForwardOutcome::Forwarded(res) => {
1456 0 : return res;
1457 : }
1458 0 : ForwardOutcome::NotForwarded(req) => req,
1459 : };
1460 :
1461 0 : let state = get_state(&req);
1462 :
1463 0 : json_response(StatusCode::OK, state.service.tenant_drop(tenant_id).await?)
1464 0 : }
1465 :
1466 0 : async fn handle_tenant_import(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1467 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
1468 0 : check_permissions(&req, Scope::PageServerApi)?;
1469 0 : maybe_rate_limit(&req, tenant_id).await;
1470 :
1471 0 : let req = match maybe_forward(req).await {
1472 0 : ForwardOutcome::Forwarded(res) => {
1473 0 : return res;
1474 : }
1475 0 : ForwardOutcome::NotForwarded(req) => req,
1476 : };
1477 :
1478 0 : let state = get_state(&req);
1479 :
1480 0 : json_response(
1481 : StatusCode::OK,
1482 0 : state.service.tenant_import(tenant_id).await?,
1483 : )
1484 0 : }
1485 :
1486 0 : async fn handle_timeline_import(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1487 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
1488 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
1489 0 : check_permissions(&req, Scope::PageServerApi)?;
1490 0 : maybe_rate_limit(&req, tenant_id).await;
1491 :
1492 0 : let mut req = match maybe_forward(req).await {
1493 0 : ForwardOutcome::Forwarded(res) => {
1494 0 : return res;
1495 : }
1496 0 : ForwardOutcome::NotForwarded(req) => req,
1497 : };
1498 :
1499 0 : let import_req = json_request::<TimelineImportRequest>(&mut req).await?;
1500 :
1501 0 : let state = get_state(&req);
1502 :
1503 0 : if import_req.tenant_id != tenant_id || import_req.timeline_id != timeline_id {
1504 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
1505 0 : "tenant id or timeline id mismatch: url={tenant_id}/{timeline_id}, body={}/{}",
1506 0 : import_req.tenant_id,
1507 0 : import_req.timeline_id
1508 0 : )));
1509 0 : }
1510 :
1511 0 : json_response(
1512 : StatusCode::OK,
1513 0 : state.service.timeline_import(import_req).await?,
1514 : )
1515 0 : }
1516 :
1517 0 : async fn handle_tenant_timeline_locate(
1518 0 : service: Arc<Service>,
1519 0 : req: Request<Body>,
1520 0 : ) -> Result<Response<Body>, ApiError> {
1521 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
1522 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
1523 :
1524 0 : check_permissions(&req, Scope::Admin)?;
1525 0 : maybe_rate_limit(&req, tenant_id).await;
1526 :
1527 0 : match maybe_forward(req).await {
1528 0 : ForwardOutcome::Forwarded(res) => {
1529 0 : return res;
1530 : }
1531 0 : ForwardOutcome::NotForwarded(_req) => {}
1532 : };
1533 :
1534 0 : json_response(
1535 : StatusCode::OK,
1536 0 : service
1537 0 : .tenant_timeline_locate(tenant_id, timeline_id)
1538 0 : .await?,
1539 : )
1540 0 : }
1541 :
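: /// Admin/debug: dump the controller's in-memory tenant state.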
1542 0 : async fn handle_tenants_dump(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1543 0 : check_permissions(&req, Scope::Admin)?;
1544 :
1545 0 : let req = match maybe_forward(req).await {
1546 0 : ForwardOutcome::Forwarded(res) => {
1547 0 : return res;
1548 : }
1549 0 : ForwardOutcome::NotForwarded(req) => req,
1550 : };
1551 :
1552 0 : let state = get_state(&req);
1553 0 : state.service.tenants_dump()
1554 0 : }
1555 :
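: /// Admin/debug: dump the scheduler's internal state.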
1556 0 : async fn handle_scheduler_dump(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1557 0 : check_permissions(&req, Scope::Admin)?;
1558 :
1559 0 : let req = match maybe_forward(req).await {
1560 0 : ForwardOutcome::Forwarded(res) => {
1561 0 : return res;
1562 : }
1563 0 : ForwardOutcome::NotForwarded(req) => req,
1564 : };
1565 :
1566 0 : let state = get_state(&req);
1567 0 : state.service.scheduler_dump()
1568 0 : }
1569 :
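: /// Admin/debug: verify that the controller's in-memory state matches the database.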
1570 0 : async fn handle_consistency_check(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1571 0 : check_permissions(&req, Scope::Admin)?;
1572 :
1573 0 : let req = match maybe_forward(req).await {
1574 0 : ForwardOutcome::Forwarded(res) => {
1575 0 : return res;
1576 : }
1577 0 : ForwardOutcome::NotForwarded(req) => req,
1578 : };
1579 :
1580 0 : let state = get_state(&req);
1581 :
1582 0 : json_response(StatusCode::OK, state.service.consistency_check().await?)
1583 0 : }
1584 :
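: /// Admin/debug: trigger an immediate reconciliation pass across all shards.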
1585 0 : async fn handle_reconcile_all(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1586 0 : check_permissions(&req, Scope::Admin)?;
1587 :
1588 0 : let req = match maybe_forward(req).await {
1589 0 : ForwardOutcome::Forwarded(res) => {
1590 0 : return res;
1591 : }
1592 0 : ForwardOutcome::NotForwarded(req) => req,
1593 : };
1594 :
1595 0 : let state = get_state(&req);
1596 :
1597 0 : json_response(StatusCode::OK, state.service.reconcile_all_now().await?)
1598 0 : }
1599 :
1600 : /// Status endpoint is just used for checking that our HTTP listener is up
1601 : ///
1602 : /// This serves as our k8s startup probe.
1603 0 : async fn handle_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1604 0 : match maybe_forward(req).await {
1605 0 : ForwardOutcome::Forwarded(res) => {
1606 0 : return res;
1607 : }
1608 0 : ForwardOutcome::NotForwarded(_req) => {}
1609 : };
1610 :
1611 0 : json_response(StatusCode::OK, ())
1612 0 : }
1613 :
1614 : /// Liveness endpoint indicates that this storage controller is in a state
1615 : /// where it can fulfill its responsibilities. Namely, startup has finished
1616 : /// and it is the current leader.
1617 : ///
1618 : /// This serves as our k8s liveness probe.
1619 0 : async fn handle_live(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1620 0 : let req = match maybe_forward(req).await {
1621 0 : ForwardOutcome::Forwarded(res) => {
1622 0 : return res;
1623 : }
1624 0 : ForwardOutcome::NotForwarded(req) => req,
1625 : };
1626 :
1627 0 : let state = get_state(&req);
1628 0 : let live = state.service.startup_complete.is_ready()
1629 0 : && state.service.get_leadership_status() == LeadershipStatus::Leader;
1630 :
1631 0 : if live {
1632 0 : json_response(StatusCode::OK, ())
1633 : } else {
1634 0 : json_response(StatusCode::SERVICE_UNAVAILABLE, ())
1635 : }
1636 0 : }
1637 :
1638 : /// Readiness endpoint indicates when we're done doing startup I/O (e.g. reconciling
1639 : /// with remote pageserver nodes). This is intended for use as a kubernetes readiness probe.
1640 0 : async fn handle_ready(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1641 0 : let req = match maybe_forward(req).await {
1642 0 : ForwardOutcome::Forwarded(res) => {
1643 0 : return res;
1644 : }
1645 0 : ForwardOutcome::NotForwarded(req) => req,
1646 : };
1647 :
1648 0 : let state = get_state(&req);
1649 0 : if state.service.startup_complete.is_ready() {
1650 0 : json_response(StatusCode::OK, ())
1651 : } else {
1652 0 : json_response(StatusCode::SERVICE_UNAVAILABLE, ())
1653 : }
1654 0 : }
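
// A hedged sketch (not part of this file) of how an external kubernetes-style
// prober might interpret the three probe endpoints above. The client, base URL
// and tuple return are assumptions for illustration only.
async fn probe_sketch(client: &reqwest::Client, base: &str) -> anyhow::Result<(bool, bool, bool)> {
    // /status: 200 as soon as the HTTP listener is up (startup probe).
    let up = client.get(format!("{base}/status")).send().await?.status().is_success();
    // /ready: 200 once startup reconciliation has finished, leader or not (readiness probe).
    let ready = client.get(format!("{base}/ready")).send().await?.status().is_success();
    // /live: 200 only when startup is complete AND this instance is the leader (liveness probe).
    let live = client.get(format!("{base}/live")).send().await?.status().is_success();
    Ok((up, ready, live))
}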
1655 :
1656 : impl From<ReconcileError> for ApiError {
1657 0 : fn from(value: ReconcileError) -> Self {
1658 0 : ApiError::Conflict(format!("Reconciliation error: {value}"))
1659 0 : }
1660 : }
1661 :
1662 : /// Return the safekeeper record by instance id, or 404.
1663 : ///
1664 : /// Not used by anything except manual testing.
1665 0 : async fn handle_get_safekeeper(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1666 0 : check_permissions(&req, Scope::Infra)?;
1667 :
1668 0 : let id = parse_request_param::<i64>(&req, "id")?;
1669 :
1670 0 : let req = match maybe_forward(req).await {
1671 0 : ForwardOutcome::Forwarded(res) => {
1672 0 : return res;
1673 : }
1674 0 : ForwardOutcome::NotForwarded(req) => req,
1675 : };
1676 :
1677 0 : let state = get_state(&req);
1678 :
1679 0 : let res = state.service.get_safekeeper(id).await;
1680 :
1681 0 : match res {
1682 0 : Ok(b) => json_response(StatusCode::OK, b),
1683 : Err(crate::persistence::DatabaseError::Query(diesel::result::Error::NotFound)) => {
1684 0 : Err(ApiError::NotFound("unknown instance id".into()))
1685 : }
1686 0 : Err(other) => Err(other.into()),
1687 : }
1688 0 : }
1689 :
1690 : /// Used as part of deployment scripts.
1691 : ///
1692 : /// Assumes information is only relayed to the storage controller after first selecting a unique id in the
1693 : /// control plane database, which means the id is present both in the URL and in the request payload.
1694 0 : async fn handle_upsert_safekeeper(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
1695 0 : check_permissions(&req, Scope::Infra)?;
1696 :
1697 0 : let body = json_request::<SafekeeperUpsert>(&mut req).await?;
1698 0 : let id = parse_request_param::<i64>(&req, "id")?;
1699 :
1700 0 : if id != body.id {
1701 : // the id is deliberately duplicated: the path parameter and the body must agree
1702 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
1703 0 : "id mismatch: url={id:?}, body={:?}",
1704 0 : body.id
1705 0 : )));
1706 0 : }
1707 :
1708 0 : if id <= 0 {
1709 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
1710 0 : "id not allowed to be zero or negative: {id}"
1711 0 : )));
1712 0 : }
1713 :
1714 0 : let req = match maybe_forward(req).await {
1715 0 : ForwardOutcome::Forwarded(res) => {
1716 0 : return res;
1717 : }
1718 0 : ForwardOutcome::NotForwarded(req) => req,
1719 : };
1720 :
1721 0 : let state = get_state(&req);
1722 :
1723 0 : state.service.upsert_safekeeper(body).await?;
1724 :
1725 0 : Ok(Response::builder()
1726 0 : .status(StatusCode::NO_CONTENT)
1727 0 : .body(Body::empty())
1728 0 : .unwrap())
1729 0 : }
1730 :
1731 : /// Sets the scheduling policy of the specified safekeeper
1732 0 : async fn handle_safekeeper_scheduling_policy(
1733 0 : mut req: Request<Body>,
1734 0 : ) -> Result<Response<Body>, ApiError> {
1735 0 : check_permissions(&req, Scope::Admin)?;
1736 :
1737 0 : let body = json_request::<SafekeeperSchedulingPolicyRequest>(&mut req).await?;
1738 0 : let id = parse_request_param::<i64>(&req, "id")?;
1739 :
1740 0 : let req = match maybe_forward(req).await {
1741 0 : ForwardOutcome::Forwarded(res) => {
1742 0 : return res;
1743 : }
1744 0 : ForwardOutcome::NotForwarded(req) => req,
1745 : };
1746 :
1747 0 : let state = get_state(&req);
1748 :
1749 0 : state
1750 0 : .service
1751 0 : .set_safekeeper_scheduling_policy(id, body.scheduling_policy)
1752 0 : .await?;
1753 :
1754 0 : json_response(StatusCode::OK, ())
1755 0 : }
1756 :
1757 : /// Common wrapper for request handlers that call into Service and operate on tenants:
1758 : /// such handlers must only run once Service has finished its initial reconciliation.
1759 0 : async fn tenant_service_handler<R, H>(
1760 0 : request: Request<Body>,
1761 0 : handler: H,
1762 0 : request_name: RequestName,
1763 0 : ) -> R::Output
1764 0 : where
1765 0 : R: std::future::Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
1766 0 : H: FnOnce(Arc<Service>, Request<Body>) -> R + Send + Sync + 'static,
1767 0 : {
1768 0 : let state = get_state(&request);
1769 0 : let service = state.service.clone();
1770 :
1771 0 : let startup_complete = service.startup_complete.clone();
1772 0 : if tokio::time::timeout(STARTUP_RECONCILE_TIMEOUT, startup_complete.wait())
1773 0 : .await
1774 0 : .is_err()
1775 : {
1776 : // This shouldn't happen: it is the responsibility of [`Service::startup_reconcile`] to use appropriate
1777 : // timeouts around its remote calls, to bound its runtime.
1778 0 : return Err(ApiError::Timeout(
1779 0 : "Timed out waiting for service readiness".into(),
1780 0 : ));
1781 0 : }
1782 :
1783 0 : named_request_span(
1784 0 : request,
1785 0 : |request| async move { handler(service, request).await },
1786 0 : request_name,
1787 : )
1788 0 : .await
1789 0 : }
1790 :
1791 : /// Check that the required scope is held in the request's token; a request whose
1792 : /// token carries the 'admin' scope is always permitted.
1793 0 : fn check_permissions(request: &Request<Body>, required_scope: Scope) -> Result<(), ApiError> {
1794 0 : check_permission_with(request, |claims| {
1795 0 : match crate::auth::check_permission(claims, required_scope) {
1796 0 : Err(e) => match crate::auth::check_permission(claims, Scope::Admin) {
1797 0 : Ok(()) => Ok(()),
1798 0 : Err(_) => Err(e),
1799 : },
1800 0 : Ok(()) => Ok(()),
1801 : }
1802 0 : })
1803 0 : }
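
// A minimal sketch of the rule implemented above, reduced to booleans: a
// request passes if its token carries the required scope, or unconditionally
// if it carries Scope::Admin. The two flags are illustrative stand-ins for the
// real claim checks, not this crate's `Claims` type.
fn permitted_sketch(has_required_scope: bool, has_admin_scope: bool) -> bool {
    has_required_scope || has_admin_scope
}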
1804 :
1805 : #[derive(Clone, Debug)]
1806 : struct RequestMeta {
1807 : method: hyper::http::Method,
1808 : at: Instant,
1809 : }
1810 :
1811 0 : pub fn prologue_leadership_status_check_middleware<
1812 0 : B: hyper::body::HttpBody + Send + Sync + 'static,
1813 0 : >() -> Middleware<B, ApiError> {
1814 0 : Middleware::pre(move |req| async move {
1815 0 : let state = get_state(&req);
1816 0 : let leadership_status = state.service.get_leadership_status();
1817 :
1818 : enum AllowedRoutes {
1819 : All,
1820 : Some(&'static [&'static str]),
1821 : }
1822 :
1823 0 : let allowed_routes = match leadership_status {
1824 0 : LeadershipStatus::Leader => AllowedRoutes::All,
1825 0 : LeadershipStatus::SteppedDown => AllowedRoutes::All,
1826 0 : LeadershipStatus::Candidate => AllowedRoutes::Some(&[
1827 0 : "/ready",
1828 0 : "/status",
1829 0 : "/metrics",
1830 0 : "/profile/cpu",
1831 0 : "/profile/heap",
1832 0 : ]),
1833 : };
1834 :
1835 0 : match allowed_routes {
1836 0 : AllowedRoutes::All => Ok(req),
1837 0 : AllowedRoutes::Some(allowed) if allowed.contains(&req.uri().path()) => Ok(req),
1838 : _ => {
1839 0 : tracing::info!(
1840 0 : "Request {} not allowed due to current leadership state",
1841 0 : req.uri()
1842 : );
1843 :
1844 0 : Err(ApiError::ResourceUnavailable(
1845 0 : format!("Current leadership status is {leadership_status}").into(),
1846 0 : ))
1847 : }
1848 : }
1849 0 : })
1850 0 : }
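
// A hedged restatement of the gate above as a pure predicate; the variant set
// mirrors the match in the middleware.
fn request_allowed_sketch(status: LeadershipStatus, path: &str) -> bool {
    const CANDIDATE_ALLOWED: &[&str] =
        &["/ready", "/status", "/metrics", "/profile/cpu", "/profile/heap"];
    match status {
        // Leaders serve everything; stepped-down instances also accept all
        // routes here because `maybe_forward` later proxies them to the leader.
        LeadershipStatus::Leader | LeadershipStatus::SteppedDown => true,
        LeadershipStatus::Candidate => CANDIDATE_ALLOWED.contains(&path),
    }
}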
1851 :
1852 0 : fn prologue_metrics_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>()
1853 0 : -> Middleware<B, ApiError> {
1854 0 : Middleware::pre(move |req| async move {
1855 0 : let meta = RequestMeta {
1856 0 : method: req.method().clone(),
1857 0 : at: Instant::now(),
1858 0 : };
1859 :
1860 0 : req.set_context(meta);
1861 :
1862 0 : Ok(req)
1863 0 : })
1864 0 : }
1865 :
1866 0 : fn epilogue_metrics_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>()
1867 0 : -> Middleware<B, ApiError> {
1868 0 : Middleware::post_with_info(move |resp, req_info| async move {
1869 0 : let request_name = match req_info.context::<RequestName>() {
1870 0 : Some(name) => name,
1871 : None => {
1872 0 : return Ok(resp);
1873 : }
1874 : };
1875 :
1876 0 : if let Some(meta) = req_info.context::<RequestMeta>() {
1877 0 : let status = &crate::metrics::METRICS_REGISTRY
1878 0 : .metrics_group
1879 0 : .storage_controller_http_request_status;
1880 0 : let latency = &crate::metrics::METRICS_REGISTRY
1881 0 : .metrics_group
1882 0 : .storage_controller_http_request_latency;
1883 0 :
1884 0 : status.inc(HttpRequestStatusLabelGroup {
1885 0 : path: request_name.0,
1886 0 : method: meta.method.clone().into(),
1887 0 : status: crate::metrics::StatusCode(resp.status()),
1888 0 : });
1889 0 :
1890 0 : latency.observe(
1891 0 : HttpRequestLatencyLabelGroup {
1892 0 : path: request_name.0,
1893 0 : method: meta.method.into(),
1894 0 : },
1895 0 : meta.at.elapsed().as_secs_f64(),
1896 0 : );
1897 0 : }
1898 0 : Ok(resp)
1899 0 : })
1900 0 : }
1901 :
1902 0 : pub async fn measured_metrics_handler(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1903 : pub const TEXT_FORMAT: &str = "text/plain; version=0.0.4";
1904 :
1905 0 : let req = match maybe_forward(req).await {
1906 0 : ForwardOutcome::Forwarded(res) => {
1907 0 : return res;
1908 : }
1909 0 : ForwardOutcome::NotForwarded(req) => req,
1910 : };
1911 :
1912 0 : let state = get_state(&req);
1913 0 : let payload = crate::metrics::METRICS_REGISTRY.encode(&state.neon_metrics);
1914 0 : let response = Response::builder()
1915 0 : .status(200)
1916 0 : .header(CONTENT_TYPE, TEXT_FORMAT)
1917 0 : .body(payload.into())
1918 0 : .unwrap();
1919 :
1920 0 : Ok(response)
1921 0 : }
1922 :
1923 : #[derive(Clone)]
1924 : struct RequestName(&'static str);
1925 :
1926 0 : async fn named_request_span<R, H>(
1927 0 : request: Request<Body>,
1928 0 : handler: H,
1929 0 : name: RequestName,
1930 0 : ) -> R::Output
1931 0 : where
1932 0 : R: Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
1933 0 : H: FnOnce(Request<Body>) -> R + Send + Sync + 'static,
1934 0 : {
1935 0 : request.set_context(name);
1936 0 : request_span(request, handler).await
1937 0 : }
1938 :
1939 : enum ForwardOutcome {
1940 : Forwarded(Result<Response<Body>, ApiError>),
1941 : NotForwarded(Request<Body>),
1942 : }
1943 :
1944 : /// Potentially forward the request to the current storage controller leader.
1945 : /// More specifically we forward when:
1946 : /// 1. Request is not one of:
1947 : /// ["/control/v1/step_down", "/status", "/ready", "/metrics", "/profile/cpu", "/profile/heap"]
1948 : /// 2. Current instance is in [`LeadershipStatus::SteppedDown`] state
1949 : /// 3. There is a leader in the database to forward to
1950 : /// 4. Leader from step (3) is not the current instance
1951 : ///
1952 : /// Why forward?
1953 : /// It turns out that we can't rely on external orchestration to promptly route traffic to the
1954 : /// new leader. This is downtime-inducing. Forwarding provides a safe way out.
1955 : ///
1956 : /// Why is it safe?
1957 : /// If a storcon instance is persisted in the database, then we know that it is the current leader.
1958 : /// There's one exception: the window between handling a step-down request and the new leader updating the
1959 : /// database.
1960 : ///
1961 : /// Let's treat the happy case first. The stepped down node does not produce any side effects,
1962 : /// since all request handling happens on the leader.
1963 : ///
1964 : /// As for the edge case, we are guaranteed to always have a maximum of two running instances.
1965 : /// Hence, if we are in the edge case scenario the leader persisted in the database is the
1966 : /// stepped down instance that received the request. Condition (4) above covers this scenario.
1967 0 : async fn maybe_forward(req: Request<Body>) -> ForwardOutcome {
1968 : const NOT_FOR_FORWARD: &[&str] = &[
1969 : "/control/v1/step_down",
1970 : "/status",
1971 : "/live",
1972 : "/ready",
1973 : "/metrics",
1974 : "/profile/cpu",
1975 : "/profile/heap",
1976 : ];
1977 :
1978 0 : let uri = req.uri();
1979 0 : let uri_for_forward = !NOT_FOR_FORWARD.contains(&uri.path());
1980 :
1981 : // Fast return before trying to take any Service locks, if we will never forward anyway
1982 0 : if !uri_for_forward {
1983 0 : return ForwardOutcome::NotForwarded(req);
1984 0 : }
1985 :
1986 0 : let state = get_state(&req);
1987 0 : let leadership_status = state.service.get_leadership_status();
1988 :
1989 0 : if leadership_status != LeadershipStatus::SteppedDown {
1990 0 : return ForwardOutcome::NotForwarded(req);
1991 0 : }
1992 :
1993 0 : let leader = state.service.get_leader().await;
1994 0 : let leader = {
1995 0 : match leader {
1996 0 : Ok(Some(leader)) => leader,
1997 : Ok(None) => {
1998 0 : return ForwardOutcome::Forwarded(Err(ApiError::ResourceUnavailable(
1999 0 : "No leader to forward to while in stepped down state".into(),
2000 0 : )));
2001 : }
2002 0 : Err(err) => {
2003 0 : return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(
2004 0 : anyhow::anyhow!(
2005 0 : "Failed to get leader for forwarding while in stepped down state: {err}"
2006 0 : ),
2007 0 : )));
2008 : }
2009 : }
2010 : };
2011 :
2012 0 : let cfg = state.service.get_config();
2013 0 : if let Some(ref self_addr) = cfg.address_for_peers {
2014 0 : let leader_addr = match Uri::from_str(leader.address.as_str()) {
2015 0 : Ok(uri) => uri,
2016 0 : Err(err) => {
2017 0 : return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(
2018 0 : anyhow::anyhow!(
2019 0 : "Failed to parse leader uri for forwarding while in stepped down state: {err}"
2020 0 : ),
2021 0 : )));
2022 : }
2023 : };
2024 :
2025 0 : if *self_addr == leader_addr {
2026 0 : return ForwardOutcome::Forwarded(Err(ApiError::ResourceUnavailable(
2027 0 : "Leader is stepped down instance".into(),
2028 0 : )));
2029 0 : }
2030 0 : }
2031 :
2032 0 : tracing::info!("Forwarding {} to leader at {}", uri, leader.address);
2033 :
2034 : // Use [`RECONCILE_TIMEOUT`] as the maximum time a request should block for, plus
2035 : // some leeway, as the timeout for proxied requests.
2036 : const PROXIED_REQUEST_TIMEOUT: Duration = Duration::from_secs(RECONCILE_TIMEOUT.as_secs() + 10);
2037 :
2038 0 : let client = state.service.get_http_client().clone();
2039 :
2040 0 : let request: reqwest::Request = match convert_request(
2041 0 : req,
2042 0 : &client,
2043 0 : leader.address,
2044 : PROXIED_REQUEST_TIMEOUT,
2045 : )
2046 0 : .await
2047 : {
2048 0 : Ok(r) => r,
2049 0 : Err(err) => {
2050 0 : return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(anyhow::anyhow!(
2051 0 : "Failed to convert request for forwarding while in stepped down state: {err}"
2052 0 : ))));
2053 : }
2054 : };
2055 :
2056 0 : let response = match client.execute(request).await {
2057 0 : Ok(r) => r,
2058 0 : Err(err) => {
2059 0 : return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(anyhow::anyhow!(
2060 0 : "Failed to forward while in stepped down state: {err}"
2061 0 : ))));
2062 : }
2063 : };
2064 :
2065 0 : ForwardOutcome::Forwarded(convert_response(response).await)
2066 0 : }
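
// The decision order above, condensed into a side-effect-free sketch. The
// parameters stand in for the real URI inspection, leadership lookup and
// database read; this is illustrative, not a drop-in refactor.
fn would_forward_sketch(path: &str, status: LeadershipStatus, leader_is_self: bool) -> bool {
    const EXEMPT: &[&str] = &[
        "/control/v1/step_down",
        "/status",
        "/live",
        "/ready",
        "/metrics",
        "/profile/cpu",
        "/profile/heap",
    ];
    // Conditions (1), (2) and (4) from the doc comment; condition (3), the
    // presence of a leader record, is an error path rather than a boolean here.
    !EXEMPT.contains(&path)
        && status == LeadershipStatus::SteppedDown
        && !leader_is_self
}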
2067 :
2068 : /// Convert a [`reqwest::Response`] to a [`hyper::Response`] by passing through
2069 : /// a stable representation (string, bytes or integer)
2070 : ///
2071 : /// Ideally, we would not have to do this since both types use the http crate
2072 : /// under the hood. However, they use different versions of the crate and keeping
2073 : /// second order dependencies in sync is difficult.
2074 0 : async fn convert_response(resp: reqwest::Response) -> Result<hyper::Response<Body>, ApiError> {
2075 : use std::str::FromStr;
2076 :
2077 0 : let mut builder = hyper::Response::builder().status(resp.status().as_u16());
2078 0 : for (key, value) in resp.headers().into_iter() {
2079 0 : let key = hyper::header::HeaderName::from_str(key.as_str()).map_err(|err| {
2080 0 : ApiError::InternalServerError(anyhow::anyhow!("Response conversion failed: {err}"))
2081 0 : })?;
2082 :
2083 0 : let value = hyper::header::HeaderValue::from_bytes(value.as_bytes()).map_err(|err| {
2084 0 : ApiError::InternalServerError(anyhow::anyhow!("Response conversion failed: {err}"))
2085 0 : })?;
2086 :
2087 0 : builder = builder.header(key, value);
2088 : }
2089 :
2090 0 : let body = http::Body::wrap_stream(resp.bytes_stream());
2091 :
2092 0 : builder.body(body).map_err(|err| {
2093 0 : ApiError::InternalServerError(anyhow::anyhow!("Response conversion failed: {err}"))
2094 0 : })
2095 0 : }
2096 :
2097 : /// Convert a [`reqwest::Request`] to a [`hyper::Request`] by passing through
2098 : /// a stable representation (string, bytes or integer)
2099 : ///
2100 : /// See [`convert_response`] for why we are doing it this way.
2101 0 : async fn convert_request(
2102 0 : req: hyper::Request<Body>,
2103 0 : client: &reqwest::Client,
2104 0 : to_address: String,
2105 0 : timeout: Duration,
2106 0 : ) -> Result<reqwest::Request, ApiError> {
2107 : use std::str::FromStr;
2108 :
2109 0 : let (parts, body) = req.into_parts();
2110 0 : let method = reqwest::Method::from_str(parts.method.as_str()).map_err(|err| {
2111 0 : ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
2112 0 : })?;
2113 :
2114 0 : let path_and_query = parts.uri.path_and_query().ok_or_else(|| {
2115 0 : ApiError::InternalServerError(anyhow::anyhow!(
2116 0 : "Request conversion failed: no path and query"
2117 0 : ))
2118 0 : })?;
2119 :
2120 0 : let uri = reqwest::Url::from_str(
2121 0 : format!(
2122 0 : "{}{}",
2123 0 : to_address.trim_end_matches("/"),
2124 0 : path_and_query.as_str()
2125 0 : )
2126 0 : .as_str(),
2127 : )
2128 0 : .map_err(|err| {
2129 0 : ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
2130 0 : })?;
2131 :
2132 0 : let mut headers = reqwest::header::HeaderMap::new();
2133 0 : for (key, value) in parts.headers.into_iter() {
2134 0 : let key = match key {
2135 0 : Some(k) => k,
2136 : None => {
2137 0 : continue;
2138 : }
2139 : };
2140 :
2141 0 : let key = reqwest::header::HeaderName::from_str(key.as_str()).map_err(|err| {
2142 0 : ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
2143 0 : })?;
2144 :
2145 0 : let value = reqwest::header::HeaderValue::from_bytes(value.as_bytes()).map_err(|err| {
2146 0 : ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
2147 0 : })?;
2148 :
2149 0 : headers.insert(key, value);
2150 : }
2151 :
2152 0 : let body = hyper::body::to_bytes(body).await.map_err(|err| {
2153 0 : ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
2154 0 : })?;
2155 :
2156 0 : client
2157 0 : .request(method, uri)
2158 0 : .headers(headers)
2159 0 : .body(body)
2160 0 : .timeout(timeout)
2161 0 : .build()
2162 0 : .map_err(|err| {
2163 0 : ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
2164 0 : })
2165 0 : }
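
// A small illustration of the address-joining rule used in `convert_request`:
// trailing slashes on the leader address are trimmed so that prepending it to
// the original path-and-query never yields a double slash.
fn join_forward_url(to_address: &str, path_and_query: &str) -> String {
    format!("{}{}", to_address.trim_end_matches('/'), path_and_query)
}
// join_forward_url("http://leader:1234/", "/v1/tenant?x=1") == "http://leader:1234/v1/tenant?x=1"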
2166 :
2167 0 : pub fn make_router(
2168 0 : service: Arc<Service>,
2169 0 : auth: Option<Arc<SwappableJwtAuth>>,
2170 0 : build_info: BuildInfo,
2171 0 : ) -> RouterBuilder<hyper::Body, ApiError> {
2172 0 : let mut router = endpoint::make_router()
2173 0 : .middleware(prologue_leadership_status_check_middleware())
2174 0 : .middleware(prologue_metrics_middleware())
2175 0 : .middleware(epilogue_metrics_middleware());
2176 0 : if auth.is_some() {
2177 0 : router = router.middleware(auth_middleware(|request| {
2178 0 : let state = get_state(request);
2179 0 : if state.allowlist_routes.contains(&request.uri().path()) {
2180 0 : None
2181 : } else {
2182 0 : state.auth.as_deref()
2183 : }
2184 0 : }));
2185 0 : }
2186 :
2187 0 : router
2188 0 : .data(Arc::new(HttpState::new(service, auth, build_info)))
2189 : // Non-prefixed generic endpoints (status, metrics, profiling)
2190 0 : .get("/metrics", |r| {
2191 0 : named_request_span(r, measured_metrics_handler, RequestName("metrics"))
2192 0 : })
2193 0 : .get("/status", |r| {
2194 0 : named_request_span(r, handle_status, RequestName("status"))
2195 0 : })
2196 0 : .get("/live", |r| {
2197 0 : named_request_span(r, handle_live, RequestName("live"))
2198 0 : })
2199 0 : .get("/ready", |r| {
2200 0 : named_request_span(r, handle_ready, RequestName("ready"))
2201 0 : })
2202 0 : .get("/profile/cpu", |r| {
2203 0 : named_request_span(r, profile_cpu_handler, RequestName("profile_cpu"))
2204 0 : })
2205 0 : .get("/profile/heap", |r| {
2206 0 : named_request_span(r, profile_heap_handler, RequestName("profile_heap"))
2207 0 : })
2208 : // Upcalls for the pageserver: point the pageserver's `control_plane_api` config to this prefix
2209 0 : .post("/upcall/v1/re-attach", |r| {
2210 0 : named_request_span(r, handle_re_attach, RequestName("upcall_v1_reattach"))
2211 0 : })
2212 0 : .post("/upcall/v1/validate", |r| {
2213 0 : named_request_span(r, handle_validate, RequestName("upcall_v1_validate"))
2214 0 : })
2215 0 : .get("/upcall/v1/timeline_import_status", |r| {
2216 0 : named_request_span(
2217 0 : r,
2218 : handle_get_timeline_import_status,
2219 0 : RequestName("upcall_v1_timeline_import_status"),
2220 : )
2221 0 : })
2222 0 : .post("/upcall/v1/timeline_import_status", |r| {
2223 0 : named_request_span(
2224 0 : r,
2225 : handle_put_timeline_import_status,
2226 0 : RequestName("upcall_v1_timeline_import_status"),
2227 : )
2228 0 : })
2229 : // Test/dev/debug endpoints
2230 0 : .post("/debug/v1/attach-hook", |r| {
2231 0 : named_request_span(r, handle_attach_hook, RequestName("debug_v1_attach_hook"))
2232 0 : })
2233 0 : .post("/debug/v1/inspect", |r| {
2234 0 : named_request_span(r, handle_inspect, RequestName("debug_v1_inspect"))
2235 0 : })
2236 0 : .post("/debug/v1/tenant/:tenant_id/drop", |r| {
2237 0 : named_request_span(r, handle_tenant_drop, RequestName("debug_v1_tenant_drop"))
2238 0 : })
2239 0 : .post("/debug/v1/node/:node_id/drop", |r| {
2240 0 : named_request_span(r, handle_node_drop, RequestName("debug_v1_node_drop"))
2241 0 : })
2242 0 : .delete("/debug/v1/tombstone/:node_id", |r| {
2243 0 : named_request_span(
2244 0 : r,
2245 : handle_tombstone_delete,
2246 0 : RequestName("debug_v1_tombstone_delete"),
2247 : )
2248 0 : })
2249 0 : .get("/debug/v1/tombstone", |r| {
2250 0 : named_request_span(
2251 0 : r,
2252 : handle_tombstone_list,
2253 0 : RequestName("debug_v1_tombstone_list"),
2254 : )
2255 0 : })
2256 0 : .post("/debug/v1/tenant/:tenant_id/import", |r| {
2257 0 : named_request_span(
2258 0 : r,
2259 : handle_tenant_import,
2260 0 : RequestName("debug_v1_tenant_import"),
2261 : )
2262 0 : })
2263 0 : .get("/debug/v1/tenant", |r| {
2264 0 : named_request_span(r, handle_tenants_dump, RequestName("debug_v1_tenant"))
2265 0 : })
2266 0 : .get("/debug/v1/tenant/:tenant_id/locate", |r| {
2267 0 : tenant_service_handler(
2268 0 : r,
2269 : handle_tenant_locate,
2270 0 : RequestName("debug_v1_tenant_locate"),
2271 : )
2272 0 : })
2273 0 : .post(
2274 : "/debug/v1/tenant/:tenant_id/timeline/:timeline_id/import",
2275 0 : |r| {
2276 0 : named_request_span(
2277 0 : r,
2278 : handle_timeline_import,
2279 0 : RequestName("debug_v1_timeline_import"),
2280 : )
2281 0 : },
2282 : )
2283 0 : .get(
2284 : "/debug/v1/tenant/:tenant_id/timeline/:timeline_id/locate",
2285 0 : |r| {
2286 0 : tenant_service_handler(
2287 0 : r,
2288 : handle_tenant_timeline_locate,
2289 0 : RequestName("v1_tenant_timeline_locate"),
2290 : )
2291 0 : },
2292 : )
2293 0 : .get("/debug/v1/scheduler", |r| {
2294 0 : named_request_span(r, handle_scheduler_dump, RequestName("debug_v1_scheduler"))
2295 0 : })
2296 0 : .post("/debug/v1/consistency_check", |r| {
2297 0 : named_request_span(
2298 0 : r,
2299 : handle_consistency_check,
2300 0 : RequestName("debug_v1_consistency_check"),
2301 : )
2302 0 : })
2303 0 : .post("/debug/v1/reconcile_all", |r| {
2304 0 : request_span(r, handle_reconcile_all)
2305 0 : })
2306 0 : .put("/debug/v1/failpoints", |r| {
2307 0 : request_span(r, |r| failpoints_handler(r, CancellationToken::new()))
2308 0 : })
2309 : // Node operations
2310 0 : .post("/control/v1/node", |r| {
2311 0 : named_request_span(r, handle_node_register, RequestName("control_v1_node"))
2312 0 : })
2313 : // This endpoint is deprecated and will be removed in a future version.
2314 : // Use PUT /control/v1/node/:node_id/delete instead.
2315 0 : .delete("/control/v1/node/:node_id", |r| {
2316 0 : named_request_span(
2317 0 : r,
2318 : handle_node_delete_old,
2319 0 : RequestName("control_v1_node_delete"),
2320 : )
2321 0 : })
2322 0 : .get("/control/v1/node", |r| {
2323 0 : named_request_span(r, handle_node_list, RequestName("control_v1_node"))
2324 0 : })
2325 0 : .put("/control/v1/node/:node_id/config", |r| {
2326 0 : named_request_span(
2327 0 : r,
2328 : handle_node_configure,
2329 0 : RequestName("control_v1_node_config"),
2330 : )
2331 0 : })
2332 0 : .get("/control/v1/node/:node_id", |r| {
2333 0 : named_request_span(r, handle_node_status, RequestName("control_v1_node_status"))
2334 0 : })
2335 0 : .get("/control/v1/node/:node_id/shards", |r| {
2336 0 : named_request_span(
2337 0 : r,
2338 : handle_node_shards,
2339 0 : RequestName("control_v1_node_describe"),
2340 : )
2341 0 : })
2342 0 : .get("/control/v1/leader", |r| {
2343 0 : named_request_span(r, handle_get_leader, RequestName("control_v1_get_leader"))
2344 0 : })
2345 0 : .put("/control/v1/node/:node_id/delete", |r| {
2346 0 : named_request_span(
2347 0 : r,
2348 : handle_node_delete,
2349 0 : RequestName("control_v1_start_node_delete"),
2350 : )
2351 0 : })
2352 0 : .delete("/control/v1/node/:node_id/delete", |r| {
2353 0 : named_request_span(
2354 0 : r,
2355 : handle_cancel_node_delete,
2356 0 : RequestName("control_v1_cancel_node_delete"),
2357 : )
2358 0 : })
2359 0 : .put("/control/v1/node/:node_id/drain", |r| {
2360 0 : named_request_span(r, handle_node_drain, RequestName("control_v1_node_drain"))
2361 0 : })
2362 0 : .delete("/control/v1/node/:node_id/drain", |r| {
2363 0 : named_request_span(
2364 0 : r,
2365 : handle_cancel_node_drain,
2366 0 : RequestName("control_v1_cancel_node_drain"),
2367 : )
2368 0 : })
2369 0 : .put("/control/v1/node/:node_id/fill", |r| {
2370 0 : named_request_span(r, handle_node_fill, RequestName("control_v1_node_fill"))
2371 0 : })
2372 0 : .delete("/control/v1/node/:node_id/fill", |r| {
2373 0 : named_request_span(
2374 0 : r,
2375 : handle_cancel_node_fill,
2376 0 : RequestName("control_v1_cancel_node_fill"),
2377 : )
2378 0 : })
2379 : // Metadata health operations
2380 0 : .post("/control/v1/metadata_health/update", |r| {
2381 0 : named_request_span(
2382 0 : r,
2383 : handle_metadata_health_update,
2384 0 : RequestName("control_v1_metadata_health_update"),
2385 : )
2386 0 : })
2387 0 : .get("/control/v1/metadata_health/unhealthy", |r| {
2388 0 : named_request_span(
2389 0 : r,
2390 : handle_metadata_health_list_unhealthy,
2391 0 : RequestName("control_v1_metadata_health_list_unhealthy"),
2392 : )
2393 0 : })
2394 0 : .post("/control/v1/metadata_health/outdated", |r| {
2395 0 : named_request_span(
2396 0 : r,
2397 : handle_metadata_health_list_outdated,
2398 0 : RequestName("control_v1_metadata_health_list_outdated"),
2399 : )
2400 0 : })
2401 : // Safekeepers
2402 0 : .get("/control/v1/safekeeper", |r| {
2403 0 : named_request_span(
2404 0 : r,
2405 : handle_safekeeper_list,
2406 0 : RequestName("control_v1_safekeeper_list"),
2407 : )
2408 0 : })
2409 0 : .get("/control/v1/safekeeper/:id", |r| {
2410 0 : named_request_span(r, handle_get_safekeeper, RequestName("v1_safekeeper"))
2411 0 : })
2412 0 : .post("/control/v1/safekeeper/:id", |r| {
2413 : // id is in the body
2414 0 : named_request_span(
2415 0 : r,
2416 : handle_upsert_safekeeper,
2417 0 : RequestName("v1_safekeeper_post"),
2418 : )
2419 0 : })
2420 0 : .post("/control/v1/safekeeper/:id/scheduling_policy", |r| {
2421 0 : named_request_span(
2422 0 : r,
2423 : handle_safekeeper_scheduling_policy,
2424 0 : RequestName("v1_safekeeper_scheduling_policy"),
2425 : )
2426 0 : })
2427 : // Tenant Shard operations
2428 0 : .put("/control/v1/tenant/:tenant_shard_id/migrate", |r| {
2429 0 : tenant_service_handler(
2430 0 : r,
2431 : handle_tenant_shard_migrate,
2432 0 : RequestName("control_v1_tenant_migrate"),
2433 : )
2434 0 : })
2435 0 : .put(
2436 : "/control/v1/tenant/:tenant_shard_id/migrate_secondary",
2437 0 : |r| {
2438 0 : tenant_service_handler(
2439 0 : r,
2440 : handle_tenant_shard_migrate_secondary,
2441 0 : RequestName("control_v1_tenant_migrate_secondary"),
2442 : )
2443 0 : },
2444 : )
2445 0 : .put(
2446 : "/control/v1/tenant/:tenant_shard_id/cancel_reconcile",
2447 0 : |r| {
2448 0 : tenant_service_handler(
2449 0 : r,
2450 : handle_tenant_shard_cancel_reconcile,
2451 0 : RequestName("control_v1_tenant_cancel_reconcile"),
2452 : )
2453 0 : },
2454 : )
2455 0 : .put("/control/v1/tenant/:tenant_id/shard_split", |r| {
2456 0 : tenant_service_handler(
2457 0 : r,
2458 : handle_tenant_shard_split,
2459 0 : RequestName("control_v1_tenant_shard_split"),
2460 : )
2461 0 : })
2462 0 : .get("/control/v1/tenant/:tenant_id", |r| {
2463 0 : tenant_service_handler(
2464 0 : r,
2465 : handle_tenant_describe,
2466 0 : RequestName("control_v1_tenant_describe"),
2467 : )
2468 0 : })
2469 0 : .get("/control/v1/tenant", |r| {
2470 0 : tenant_service_handler(r, handle_tenant_list, RequestName("control_v1_tenant_list"))
2471 0 : })
2472 0 : .put("/control/v1/tenant/:tenant_id/policy", |r| {
2473 0 : named_request_span(
2474 0 : r,
2475 : handle_tenant_update_policy,
2476 0 : RequestName("control_v1_tenant_policy"),
2477 : )
2478 0 : })
2479 0 : .put("/control/v1/preferred_azs", |r| {
2480 0 : named_request_span(
2481 0 : r,
2482 : handle_update_preferred_azs,
2483 0 : RequestName("control_v1_preferred_azs"),
2484 : )
2485 0 : })
2486 0 : .put("/control/v1/step_down", |r| {
2487 0 : named_request_span(r, handle_step_down, RequestName("control_v1_step_down"))
2488 0 : })
2489 : // Tenant operations
2490 : // The ^/v1/ endpoints act as a "Virtual Pageserver", enabling shard-naive clients to call into
2491 : // this service to manage tenants that actually consist of many tenant shards, as if they are a single entity.
2492 0 : .post("/v1/tenant", |r| {
2493 0 : tenant_service_handler(r, handle_tenant_create, RequestName("v1_tenant"))
2494 0 : })
2495 0 : .delete("/v1/tenant/:tenant_id", |r| {
2496 0 : tenant_service_handler(r, handle_tenant_delete, RequestName("v1_tenant"))
2497 0 : })
2498 0 : .patch("/v1/tenant/config", |r| {
2499 0 : tenant_service_handler(
2500 0 : r,
2501 : handle_tenant_config_patch,
2502 0 : RequestName("v1_tenant_config"),
2503 : )
2504 0 : })
2505 0 : .put("/v1/tenant/config", |r| {
2506 0 : tenant_service_handler(r, handle_tenant_config_set, RequestName("v1_tenant_config"))
2507 0 : })
2508 0 : .get("/v1/tenant/:tenant_id/config", |r| {
2509 0 : tenant_service_handler(r, handle_tenant_config_get, RequestName("v1_tenant_config"))
2510 0 : })
2511 0 : .put("/v1/tenant/:tenant_shard_id/location_config", |r| {
2512 0 : tenant_service_handler(
2513 0 : r,
2514 : handle_tenant_location_config,
2515 0 : RequestName("v1_tenant_location_config"),
2516 : )
2517 0 : })
2518 0 : .put("/v1/tenant/:tenant_id/time_travel_remote_storage", |r| {
2519 0 : tenant_service_handler(
2520 0 : r,
2521 : handle_tenant_time_travel_remote_storage,
2522 0 : RequestName("v1_tenant_time_travel_remote_storage"),
2523 : )
2524 0 : })
2525 0 : .post("/v1/tenant/:tenant_id/secondary/download", |r| {
2526 0 : tenant_service_handler(
2527 0 : r,
2528 : handle_tenant_secondary_download,
2529 0 : RequestName("v1_tenant_secondary_download"),
2530 : )
2531 0 : })
2532 : // Timeline operations
2533 0 : .get("/control/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
2534 0 : tenant_service_handler(
2535 0 : r,
2536 : handle_tenant_timeline_describe,
2537 0 : RequestName("v1_tenant_timeline_describe"),
2538 : )
2539 0 : })
2540 0 : .delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
2541 0 : tenant_service_handler(
2542 0 : r,
2543 : handle_tenant_timeline_delete,
2544 0 : RequestName("v1_tenant_timeline"),
2545 : )
2546 0 : })
2547 0 : .post("/v1/tenant/:tenant_id/timeline", |r| {
2548 0 : tenant_service_handler(
2549 0 : r,
2550 : handle_tenant_timeline_create,
2551 0 : RequestName("v1_tenant_timeline"),
2552 : )
2553 0 : })
2554 0 : .put(
2555 : "/v1/tenant/:tenant_id/timeline/:timeline_id/archival_config",
2556 0 : |r| {
2557 0 : tenant_service_handler(
2558 0 : r,
2559 : handle_tenant_timeline_archival_config,
2560 0 : RequestName("v1_tenant_timeline_archival_config"),
2561 : )
2562 0 : },
2563 : )
2564 0 : .put(
2565 : "/v1/tenant/:tenant_id/timeline/:timeline_id/detach_ancestor",
2566 0 : |r| {
2567 0 : tenant_service_handler(
2568 0 : r,
2569 : handle_tenant_timeline_detach_ancestor,
2570 0 : RequestName("v1_tenant_timeline_detach_ancestor"),
2571 : )
2572 0 : },
2573 : )
2574 0 : .post(
2575 : "/v1/tenant/:tenant_id/timeline/:timeline_id/block_gc",
2576 0 : |r| {
2577 0 : tenant_service_handler(
2578 0 : r,
2579 0 : |s, r| handle_tenant_timeline_block_unblock_gc(s, r, BlockUnblock::Block),
2580 0 : RequestName("v1_tenant_timeline_block_unblock_gc"),
2581 : )
2582 0 : },
2583 : )
2584 0 : .post(
2585 : "/v1/tenant/:tenant_id/timeline/:timeline_id/unblock_gc",
2586 0 : |r| {
2587 0 : tenant_service_handler(
2588 0 : r,
2589 0 : |s, r| handle_tenant_timeline_block_unblock_gc(s, r, BlockUnblock::Unblock),
2590 0 : RequestName("v1_tenant_timeline_block_unblock_gc"),
2591 : )
2592 0 : },
2593 : )
2594 0 : .post(
2595 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/download_heatmap_layers",
2596 0 : |r| {
2597 0 : tenant_service_handler(
2598 0 : r,
2599 : handle_tenant_timeline_download_heatmap_layers,
2600 0 : RequestName("v1_tenant_timeline_download_heatmap_layers"),
2601 : )
2602 0 : },
2603 : )
2604 0 : .post(
2605 : "/v1/tenant/:tenant_id/timeline/:timeline_id/safekeeper_migrate",
2606 0 : |r| {
2607 0 : tenant_service_handler(
2608 0 : r,
2609 : handle_tenant_timeline_safekeeper_migrate,
2610 0 : RequestName("v1_tenant_timeline_safekeeper_migrate"),
2611 : )
2612 0 : },
2613 : )
2614 : // LSN lease passthrough to all shards
2615 0 : .post(
2616 : "/v1/tenant/:tenant_id/timeline/:timeline_id/lsn_lease",
2617 0 : |r| {
2618 0 : tenant_service_handler(
2619 0 : r,
2620 : handle_tenant_timeline_lsn_lease,
2621 0 : RequestName("v1_tenant_timeline_lsn_lease"),
2622 : )
2623 0 : },
2624 : )
2625 : // Tenant timeline mark_invisible passthrough to shard zero
2626 0 : .put(
2627 : "/v1/tenant/:tenant_id/timeline/:timeline_id/mark_invisible",
2628 0 : |r| {
2629 0 : tenant_service_handler(
2630 0 : r,
2631 : handle_tenant_timeline_passthrough,
2632 0 : RequestName("v1_tenant_timeline_mark_invisible_passthrough"),
2633 : )
2634 0 : },
2635 : )
2636 : // Tenant detail GET passthrough to shard zero:
2637 0 : .get("/v1/tenant/:tenant_id", |r| {
2638 0 : tenant_service_handler(
2639 0 : r,
2640 : handle_tenant_timeline_passthrough,
2641 0 : RequestName("v1_tenant_passthrough"),
2642 : )
2643 0 : })
2644 : // The `*` in the URL is a wildcard: any tenant/timeline GET APIs on the pageserver
2645 : // are implicitly exposed here. This must be last in the list to avoid
2646 : // taking precedence over other GET methods we might implement by hand.
2647 0 : .get("/v1/tenant/:tenant_id/*", |r| {
2648 0 : tenant_service_handler(
2649 0 : r,
2650 : handle_tenant_timeline_passthrough,
2651 0 : RequestName("v1_tenant_passthrough"),
2652 : )
2653 0 : })
2654 0 : }
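
// A hedged sketch of serving the router built above; the routerify service
// wrapper and the bind address are assumptions drawn from routerify's usual
// API, not code from this crate.
async fn serve_sketch(
    service: Arc<Service>,
    build_info: BuildInfo,
    addr: std::net::SocketAddr,
) -> anyhow::Result<()> {
    let router = make_router(service, None, build_info)
        .build()
        .map_err(|e| anyhow::anyhow!("router build failed: {e}"))?;
    let api = routerify::RouterService::new(router)
        .map_err(|e| anyhow::anyhow!("service wrap failed: {e}"))?;
    hyper::Server::bind(&addr).serve(api).await?;
    Ok(())
}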
2655 :
2656 : #[cfg(test)]
2657 : mod test {
2658 :
2659 : use super::path_without_ids;
2660 :
2661 : #[test]
2662 1 : fn test_path_without_ids() {
2663 1 : assert_eq!(
2664 1 : path_without_ids(
2665 1 : "/v1/tenant/1a2b3344556677881122334455667788/timeline/AA223344556677881122334455667788"
2666 : ),
2667 : "/v1/tenant//timeline/"
2668 : );
2669 1 : assert_eq!(
2670 1 : path_without_ids(
2671 1 : "/v1/tenant/1a2b3344556677881122334455667788-0108/timeline/AA223344556677881122334455667788"
2672 : ),
2673 : "/v1/tenant//timeline/"
2674 : );
2675 1 : assert_eq!(
2676 1 : path_without_ids(
2677 1 : "/v1/tenant/1a2b3344556677881122334455667788-0108/timeline/AA223344556677881122334455667788?parameter=foo"
2678 : ),
2679 : "/v1/tenant//timeline/"
2680 : );
2681 1 : }
2682 : }