Line data Source code
1 : use crate::http;
2 : use crate::metrics::{
3 : HttpRequestLatencyLabelGroup, HttpRequestStatusLabelGroup, PageserverRequestLabelGroup,
4 : METRICS_REGISTRY,
5 : };
6 : use crate::persistence::SafekeeperPersistence;
7 : use crate::reconciler::ReconcileError;
8 : use crate::service::{LeadershipStatus, Service, RECONCILE_TIMEOUT, STARTUP_RECONCILE_TIMEOUT};
9 : use anyhow::Context;
10 : use futures::Future;
11 : use hyper::header::CONTENT_TYPE;
12 : use hyper::{Body, Request, Response};
13 : use hyper::{StatusCode, Uri};
14 : use metrics::{BuildInfo, NeonMetrics};
15 : use pageserver_api::controller_api::{
16 : MetadataHealthListOutdatedRequest, MetadataHealthListOutdatedResponse,
17 : MetadataHealthListUnhealthyResponse, MetadataHealthUpdateRequest, MetadataHealthUpdateResponse,
18 : ShardsPreferredAzsRequest, TenantCreateRequest,
19 : };
20 : use pageserver_api::models::{
21 : TenantConfigRequest, TenantLocationConfigRequest, TenantShardSplitRequest,
22 : TenantTimeTravelRequest, TimelineArchivalConfigRequest, TimelineCreateRequest,
23 : };
24 : use pageserver_api::shard::TenantShardId;
25 : use pageserver_client::{mgmt_api, BlockUnblock};
26 : use std::str::FromStr;
27 : use std::sync::Arc;
28 : use std::time::{Duration, Instant};
29 : use tokio_util::sync::CancellationToken;
30 : use utils::auth::{Scope, SwappableJwtAuth};
31 : use utils::failpoint_support::failpoints_handler;
32 : use utils::http::endpoint::{auth_middleware, check_permission_with, request_span};
33 : use utils::http::request::{must_get_query_param, parse_query_param, parse_request_param};
34 : use utils::id::{TenantId, TimelineId};
35 :
36 : use utils::{
37 : http::{
38 : endpoint::{self},
39 : error::ApiError,
40 : json::{json_request, json_response},
41 : RequestExt, RouterBuilder,
42 : },
43 : id::NodeId,
44 : };
45 :
46 : use pageserver_api::controller_api::{
47 : NodeAvailability, NodeConfigureRequest, NodeRegisterRequest, TenantPolicyRequest,
48 : TenantShardMigrateRequest,
49 : };
50 : use pageserver_api::upcall_api::{ReAttachRequest, ValidateRequest};
51 :
52 : use control_plane::storage_controller::{AttachHookRequest, InspectRequest};
53 :
54 : use routerify::Middleware;
55 :
56 : /// State available to HTTP request handlers
57 : pub struct HttpState {
58 : service: Arc<crate::service::Service>,
59 : auth: Option<Arc<SwappableJwtAuth>>,
60 : neon_metrics: NeonMetrics,
61 : allowlist_routes: Vec<Uri>,
62 : }
63 :
64 : impl HttpState {
65 0 : pub fn new(
66 0 : service: Arc<crate::service::Service>,
67 0 : auth: Option<Arc<SwappableJwtAuth>>,
68 0 : build_info: BuildInfo,
69 0 : ) -> Self {
70 0 : let allowlist_routes = ["/status", "/ready", "/metrics"]
71 0 : .iter()
72 0 : .map(|v| v.parse().unwrap())
73 0 : .collect::<Vec<_>>();
74 0 : Self {
75 0 : service,
76 0 : auth,
77 0 : neon_metrics: NeonMetrics::new(build_info),
78 0 : allowlist_routes,
79 0 : }
80 0 : }
81 : }
82 :
83 : #[inline(always)]
84 0 : fn get_state(request: &Request<Body>) -> &HttpState {
85 0 : request
86 0 : .data::<Arc<HttpState>>()
87 0 : .expect("unknown state type")
88 0 : .as_ref()
89 0 : }
90 :
91 : /// Pageserver calls into this on startup, to learn which tenants it should attach
92 0 : async fn handle_re_attach(req: Request<Body>) -> Result<Response<Body>, ApiError> {
93 0 : check_permissions(&req, Scope::GenerationsApi)?;
94 :
95 0 : let mut req = match maybe_forward(req).await {
96 0 : ForwardOutcome::Forwarded(res) => {
97 0 : return res;
98 : }
99 0 : ForwardOutcome::NotForwarded(req) => req,
100 : };
101 :
102 0 : let reattach_req = json_request::<ReAttachRequest>(&mut req).await?;
103 0 : let state = get_state(&req);
104 0 : json_response(StatusCode::OK, state.service.re_attach(reattach_req).await?)
105 0 : }
106 :
107 : /// Pageserver calls into this before doing deletions, to confirm that it still
108 : /// holds the latest generation for the tenants with deletions enqueued
109 0 : async fn handle_validate(req: Request<Body>) -> Result<Response<Body>, ApiError> {
110 0 : check_permissions(&req, Scope::GenerationsApi)?;
111 :
112 0 : let mut req = match maybe_forward(req).await {
113 0 : ForwardOutcome::Forwarded(res) => {
114 0 : return res;
115 : }
116 0 : ForwardOutcome::NotForwarded(req) => req,
117 : };
118 :
119 0 : let validate_req = json_request::<ValidateRequest>(&mut req).await?;
120 0 : let state = get_state(&req);
121 0 : json_response(StatusCode::OK, state.service.validate(validate_req).await?)
122 0 : }
123 :
124 : /// Call into this before attaching a tenant to a pageserver, to acquire a generation number
125 : /// (in the real control plane this is unnecessary, because the same program is managing
126 : /// generation numbers and doing attachments).
127 0 : async fn handle_attach_hook(req: Request<Body>) -> Result<Response<Body>, ApiError> {
128 0 : check_permissions(&req, Scope::Admin)?;
129 :
130 0 : let mut req = match maybe_forward(req).await {
131 0 : ForwardOutcome::Forwarded(res) => {
132 0 : return res;
133 : }
134 0 : ForwardOutcome::NotForwarded(req) => req,
135 : };
136 :
137 0 : let attach_req = json_request::<AttachHookRequest>(&mut req).await?;
138 0 : let state = get_state(&req);
139 0 :
140 0 : json_response(
141 0 : StatusCode::OK,
142 0 : state
143 0 : .service
144 0 : .attach_hook(attach_req)
145 0 : .await
146 0 : .map_err(ApiError::InternalServerError)?,
147 : )
148 0 : }
149 :
150 0 : async fn handle_inspect(req: Request<Body>) -> Result<Response<Body>, ApiError> {
151 0 : check_permissions(&req, Scope::Admin)?;
152 :
153 0 : let mut req = match maybe_forward(req).await {
154 0 : ForwardOutcome::Forwarded(res) => {
155 0 : return res;
156 : }
157 0 : ForwardOutcome::NotForwarded(req) => req,
158 : };
159 :
160 0 : let inspect_req = json_request::<InspectRequest>(&mut req).await?;
161 :
162 0 : let state = get_state(&req);
163 0 :
164 0 : json_response(StatusCode::OK, state.service.inspect(inspect_req))
165 0 : }
166 :
167 0 : async fn handle_tenant_create(
168 0 : service: Arc<Service>,
169 0 : req: Request<Body>,
170 0 : ) -> Result<Response<Body>, ApiError> {
171 0 : check_permissions(&req, Scope::PageServerApi)?;
172 :
173 0 : let mut req = match maybe_forward(req).await {
174 0 : ForwardOutcome::Forwarded(res) => {
175 0 : return res;
176 : }
177 0 : ForwardOutcome::NotForwarded(req) => req,
178 : };
179 :
180 0 : let create_req = json_request::<TenantCreateRequest>(&mut req).await?;
181 :
182 : json_response(
183 : StatusCode::CREATED,
184 0 : service.tenant_create(create_req).await?,
185 : )
186 0 : }
187 :
188 0 : async fn handle_tenant_location_config(
189 0 : service: Arc<Service>,
190 0 : req: Request<Body>,
191 0 : ) -> Result<Response<Body>, ApiError> {
192 0 : let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
193 0 : check_permissions(&req, Scope::PageServerApi)?;
194 :
195 0 : let mut req = match maybe_forward(req).await {
196 0 : ForwardOutcome::Forwarded(res) => {
197 0 : return res;
198 : }
199 0 : ForwardOutcome::NotForwarded(req) => req,
200 : };
201 :
202 0 : let config_req = json_request::<TenantLocationConfigRequest>(&mut req).await?;
203 : json_response(
204 : StatusCode::OK,
205 0 : service
206 0 : .tenant_location_config(tenant_shard_id, config_req)
207 0 : .await?,
208 : )
209 0 : }
210 :
211 0 : async fn handle_tenant_config_set(
212 0 : service: Arc<Service>,
213 0 : req: Request<Body>,
214 0 : ) -> Result<Response<Body>, ApiError> {
215 0 : check_permissions(&req, Scope::PageServerApi)?;
216 :
217 0 : let mut req = match maybe_forward(req).await {
218 0 : ForwardOutcome::Forwarded(res) => {
219 0 : return res;
220 : }
221 0 : ForwardOutcome::NotForwarded(req) => req,
222 : };
223 :
224 0 : let config_req = json_request::<TenantConfigRequest>(&mut req).await?;
225 :
226 0 : json_response(StatusCode::OK, service.tenant_config_set(config_req).await?)
227 0 : }
228 :
229 0 : async fn handle_tenant_config_get(
230 0 : service: Arc<Service>,
231 0 : req: Request<Body>,
232 0 : ) -> Result<Response<Body>, ApiError> {
233 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
234 0 : check_permissions(&req, Scope::PageServerApi)?;
235 :
236 0 : match maybe_forward(req).await {
237 0 : ForwardOutcome::Forwarded(res) => {
238 0 : return res;
239 : }
240 0 : ForwardOutcome::NotForwarded(_req) => {}
241 0 : };
242 0 :
243 0 : json_response(StatusCode::OK, service.tenant_config_get(tenant_id)?)
244 0 : }
245 :
246 0 : async fn handle_tenant_time_travel_remote_storage(
247 0 : service: Arc<Service>,
248 0 : req: Request<Body>,
249 0 : ) -> Result<Response<Body>, ApiError> {
250 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
251 0 : check_permissions(&req, Scope::PageServerApi)?;
252 :
253 0 : let mut req = match maybe_forward(req).await {
254 0 : ForwardOutcome::Forwarded(res) => {
255 0 : return res;
256 : }
257 0 : ForwardOutcome::NotForwarded(req) => req,
258 : };
259 :
260 0 : let time_travel_req = json_request::<TenantTimeTravelRequest>(&mut req).await?;
261 :
262 0 : let timestamp_raw = must_get_query_param(&req, "travel_to")?;
263 0 : let _timestamp = humantime::parse_rfc3339(&timestamp_raw).map_err(|_e| {
264 0 : ApiError::BadRequest(anyhow::anyhow!(
265 0 : "Invalid time for travel_to: {timestamp_raw:?}"
266 0 : ))
267 0 : })?;
268 :
269 0 : let done_if_after_raw = must_get_query_param(&req, "done_if_after")?;
270 0 : let _done_if_after = humantime::parse_rfc3339(&done_if_after_raw).map_err(|_e| {
271 0 : ApiError::BadRequest(anyhow::anyhow!(
272 0 : "Invalid time for done_if_after: {done_if_after_raw:?}"
273 0 : ))
274 0 : })?;
275 :
276 0 : service
277 0 : .tenant_time_travel_remote_storage(
278 0 : &time_travel_req,
279 0 : tenant_id,
280 0 : timestamp_raw,
281 0 : done_if_after_raw,
282 0 : )
283 0 : .await?;
284 0 : json_response(StatusCode::OK, ())
285 0 : }
286 :
287 0 : fn map_reqwest_hyper_status(status: reqwest::StatusCode) -> Result<hyper::StatusCode, ApiError> {
288 0 : hyper::StatusCode::from_u16(status.as_u16())
289 0 : .context("invalid status code")
290 0 : .map_err(ApiError::InternalServerError)
291 0 : }
292 :
293 0 : async fn handle_tenant_secondary_download(
294 0 : service: Arc<Service>,
295 0 : req: Request<Body>,
296 0 : ) -> Result<Response<Body>, ApiError> {
297 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
298 0 : let wait = parse_query_param(&req, "wait_ms")?.map(Duration::from_millis);
299 0 :
300 0 : match maybe_forward(req).await {
301 0 : ForwardOutcome::Forwarded(res) => {
302 0 : return res;
303 : }
304 0 : ForwardOutcome::NotForwarded(_req) => {}
305 : };
306 :
307 0 : let (status, progress) = service.tenant_secondary_download(tenant_id, wait).await?;
308 0 : json_response(map_reqwest_hyper_status(status)?, progress)
309 0 : }
310 :
311 0 : async fn handle_tenant_delete(
312 0 : service: Arc<Service>,
313 0 : req: Request<Body>,
314 0 : ) -> Result<Response<Body>, ApiError> {
315 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
316 0 : check_permissions(&req, Scope::PageServerApi)?;
317 :
318 0 : match maybe_forward(req).await {
319 0 : ForwardOutcome::Forwarded(res) => {
320 0 : return res;
321 : }
322 0 : ForwardOutcome::NotForwarded(_req) => {}
323 : };
324 :
325 0 : let status_code = service
326 0 : .tenant_delete(tenant_id)
327 0 : .await
328 0 : .and_then(map_reqwest_hyper_status)?;
329 :
330 0 : if status_code == StatusCode::NOT_FOUND {
331 : // The pageserver uses 404 for successful deletion, but we use 200
332 0 : json_response(StatusCode::OK, ())
333 : } else {
334 0 : json_response(status_code, ())
335 : }
336 0 : }
337 :
338 0 : async fn handle_tenant_timeline_create(
339 0 : service: Arc<Service>,
340 0 : req: Request<Body>,
341 0 : ) -> Result<Response<Body>, ApiError> {
342 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
343 0 : check_permissions(&req, Scope::PageServerApi)?;
344 :
345 0 : let mut req = match maybe_forward(req).await {
346 0 : ForwardOutcome::Forwarded(res) => {
347 0 : return res;
348 : }
349 0 : ForwardOutcome::NotForwarded(req) => req,
350 : };
351 :
352 0 : let create_req = json_request::<TimelineCreateRequest>(&mut req).await?;
353 : json_response(
354 : StatusCode::CREATED,
355 0 : service
356 0 : .tenant_timeline_create(tenant_id, create_req)
357 0 : .await?,
358 : )
359 0 : }
360 :
361 0 : async fn handle_tenant_timeline_delete(
362 0 : service: Arc<Service>,
363 0 : req: Request<Body>,
364 0 : ) -> Result<Response<Body>, ApiError> {
365 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
366 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
367 :
368 0 : check_permissions(&req, Scope::PageServerApi)?;
369 :
370 0 : match maybe_forward(req).await {
371 0 : ForwardOutcome::Forwarded(res) => {
372 0 : return res;
373 : }
374 0 : ForwardOutcome::NotForwarded(_req) => {}
375 : };
376 :
377 : // For timeline deletions, which implement an "initially return 202, then 404 once
378 : // we're done" semantic, we wrap with a retry loop to expose a simpler API upstream.
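// Illustrative sequence as seen by the wrapper below (assuming a typical deletion):
// the first poll returns 202 ACCEPTED, later polls may keep returning 202 while the
// pageserver works, and a final 404 NOT_FOUND signals completion, which the wrapper
// translates into a single 200 OK for the caller.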
379 0 : async fn deletion_wrapper<R, F>(service: Arc<Service>, f: F) -> Result<Response<Body>, ApiError>
380 0 : where
381 0 : R: std::future::Future<Output = Result<StatusCode, ApiError>> + Send + 'static,
382 0 : F: Fn(Arc<Service>) -> R + Send + Sync + 'static,
383 0 : {
384 0 : let started_at = Instant::now();
385 0 : // To keep deletion reasonably snappy for small tenants, initially check after 1 second if deletion
386 0 : // completed.
387 0 : let mut retry_period = Duration::from_secs(1);
388 0 : // On subsequent retries, wait longer.
389 0 : let max_retry_period = Duration::from_secs(5);
390 0 : // Enable callers with a 30 second request timeout to reliably get a response
391 0 : let max_wait = Duration::from_secs(25);
392 :
393 : loop {
394 0 : let status = f(service.clone()).await?;
395 0 : match status {
396 : StatusCode::ACCEPTED => {
397 0 : tracing::info!("Deletion accepted, waiting to try again...");
398 0 : tokio::time::sleep(retry_period).await;
399 0 : retry_period = max_retry_period;
400 : }
401 : StatusCode::NOT_FOUND => {
402 0 : tracing::info!("Deletion complete");
403 0 : return json_response(StatusCode::OK, ());
404 : }
405 : _ => {
406 0 : tracing::warn!("Unexpected status {status}");
407 0 : return json_response(status, ());
408 : }
409 : }
410 :
411 0 : let now = Instant::now();
412 0 : if now + retry_period > started_at + max_wait {
413 0 : tracing::info!("Deletion timed out waiting for 404");
414 : // REQUEST_TIMEOUT would be more appropriate, but CONFLICT is already part of
415 : // the pageserver's swagger definition for this endpoint, and has the same desired
416 : // effect of causing the control plane to retry later.
417 0 : return json_response(StatusCode::CONFLICT, ());
418 0 : }
419 : }
420 0 : }
421 :
422 0 : deletion_wrapper(service, move |service| async move {
423 0 : service
424 0 : .tenant_timeline_delete(tenant_id, timeline_id)
425 0 : .await
426 0 : .and_then(map_reqwest_hyper_status)
427 0 : })
428 0 : .await
429 0 : }
430 :
431 0 : async fn handle_tenant_timeline_archival_config(
432 0 : service: Arc<Service>,
433 0 : req: Request<Body>,
434 0 : ) -> Result<Response<Body>, ApiError> {
435 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
436 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
437 :
438 0 : check_permissions(&req, Scope::PageServerApi)?;
439 :
440 0 : let mut req = match maybe_forward(req).await {
441 0 : ForwardOutcome::Forwarded(res) => {
442 0 : return res;
443 : }
444 0 : ForwardOutcome::NotForwarded(req) => req,
445 : };
446 :
447 0 : let create_req = json_request::<TimelineArchivalConfigRequest>(&mut req).await?;
448 :
449 0 : service
450 0 : .tenant_timeline_archival_config(tenant_id, timeline_id, create_req)
451 0 : .await?;
452 :
453 0 : json_response(StatusCode::OK, ())
454 0 : }
455 :
456 0 : async fn handle_tenant_timeline_detach_ancestor(
457 0 : service: Arc<Service>,
458 0 : req: Request<Body>,
459 0 : ) -> Result<Response<Body>, ApiError> {
460 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
461 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
462 :
463 0 : check_permissions(&req, Scope::PageServerApi)?;
464 :
465 0 : match maybe_forward(req).await {
466 0 : ForwardOutcome::Forwarded(res) => {
467 0 : return res;
468 : }
469 0 : ForwardOutcome::NotForwarded(_req) => {}
470 : };
471 :
472 0 : let res = service
473 0 : .tenant_timeline_detach_ancestor(tenant_id, timeline_id)
474 0 : .await?;
475 :
476 0 : json_response(StatusCode::OK, res)
477 0 : }
478 :
479 0 : async fn handle_tenant_timeline_block_unblock_gc(
480 0 : service: Arc<Service>,
481 0 : req: Request<Body>,
482 0 : dir: BlockUnblock,
483 0 : ) -> Result<Response<Body>, ApiError> {
484 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
485 0 : check_permissions(&req, Scope::PageServerApi)?;
486 :
487 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
488 :
489 0 : service
490 0 : .tenant_timeline_block_unblock_gc(tenant_id, timeline_id, dir)
491 0 : .await?;
492 :
493 0 : json_response(StatusCode::OK, ())
494 0 : }
495 :
496 0 : async fn handle_tenant_timeline_passthrough(
497 0 : service: Arc<Service>,
498 0 : req: Request<Body>,
499 0 : ) -> Result<Response<Body>, ApiError> {
500 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
501 0 : check_permissions(&req, Scope::PageServerApi)?;
502 :
503 0 : let req = match maybe_forward(req).await {
504 0 : ForwardOutcome::Forwarded(res) => {
505 0 : return res;
506 : }
507 0 : ForwardOutcome::NotForwarded(req) => req,
508 : };
509 :
510 0 : let Some(path) = req.uri().path_and_query() else {
511 : // This should never happen: our request router only calls us if there is a path
512 0 : return Err(ApiError::BadRequest(anyhow::anyhow!("Missing path")));
513 : };
514 :
515 0 : tracing::info!("Proxying request for tenant {} ({})", tenant_id, path);
516 :
517 : // Find the node that holds shard zero
518 0 : let (node, tenant_shard_id) = service.tenant_shard0_node(tenant_id)?;
519 :
520 : // Callers will always pass an unsharded tenant ID. Before proxying, we must
521 : // rewrite this to a shard-aware shard zero ID.
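// For example (illustrative, assuming the usual `<tenant_id>-<shard_number><shard_count>`
// rendering of TenantShardId): `/v1/tenant/<tenant_id>/timeline` would become
// `/v1/tenant/<tenant_id>-0008/timeline` for shard zero of an eight-shard tenant.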
522 0 : let path = format!("{}", path);
523 0 : let tenant_str = tenant_id.to_string();
524 0 : let tenant_shard_str = format!("{}", tenant_shard_id);
525 0 : let path = path.replace(&tenant_str, &tenant_shard_str);
526 0 :
527 0 : let latency = &METRICS_REGISTRY
528 0 : .metrics_group
529 0 : .storage_controller_passthrough_request_latency;
530 0 :
531 0 : // This is a bit awkward. We remove the param from the request
532 0 : // and join the words by '_' to get a label for the request.
533 0 : let just_path = path.replace(&tenant_shard_str, "");
534 0 : let path_label = just_path
535 0 : .split('/')
536 0 : .filter(|token| !token.is_empty())
537 0 : .collect::<Vec<_>>()
538 0 : .join("_");
539 0 : let labels = PageserverRequestLabelGroup {
540 0 : pageserver_id: &node.get_id().to_string(),
541 0 : path: &path_label,
542 0 : method: crate::metrics::Method::Get,
543 0 : };
544 0 :
545 0 : let _timer = latency.start_timer(labels.clone());
546 0 :
547 0 : let client = mgmt_api::Client::new(node.base_url(), service.get_config().jwt_token.as_deref());
548 0 : let resp = client.get_raw(path).await.map_err(|_e|
549 : // FIXME: give APiError a proper Unavailable variant. We return 503 here because
550 : // if we can't successfully send a request to the pageserver, we aren't available.
551 0 : ApiError::ShuttingDown)?;
552 :
553 0 : if !resp.status().is_success() {
554 0 : let error_counter = &METRICS_REGISTRY
555 0 : .metrics_group
556 0 : .storage_controller_passthrough_request_error;
557 0 : error_counter.inc(labels);
558 0 : }
559 :
560 : // We have a reqwest::Response, but would like an http::Response
561 0 : let mut builder = hyper::Response::builder().status(map_reqwest_hyper_status(resp.status())?);
562 0 : for (k, v) in resp.headers() {
563 0 : builder = builder.header(k.as_str(), v.as_bytes());
564 0 : }
565 :
566 0 : let response = builder
567 0 : .body(Body::wrap_stream(resp.bytes_stream()))
568 0 : .map_err(|e| ApiError::InternalServerError(e.into()))?;
569 :
570 0 : Ok(response)
571 0 : }
572 :
573 0 : async fn handle_tenant_locate(
574 0 : service: Arc<Service>,
575 0 : req: Request<Body>,
576 0 : ) -> Result<Response<Body>, ApiError> {
577 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
578 :
579 0 : check_permissions(&req, Scope::Admin)?;
580 :
581 0 : match maybe_forward(req).await {
582 0 : ForwardOutcome::Forwarded(res) => {
583 0 : return res;
584 : }
585 0 : ForwardOutcome::NotForwarded(_req) => {}
586 0 : };
587 0 :
588 0 : json_response(StatusCode::OK, service.tenant_locate(tenant_id)?)
589 0 : }
590 :
591 0 : async fn handle_tenant_describe(
592 0 : service: Arc<Service>,
593 0 : req: Request<Body>,
594 0 : ) -> Result<Response<Body>, ApiError> {
595 0 : check_permissions(&req, Scope::Scrubber)?;
596 :
597 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
598 :
599 0 : match maybe_forward(req).await {
600 0 : ForwardOutcome::Forwarded(res) => {
601 0 : return res;
602 : }
603 0 : ForwardOutcome::NotForwarded(_req) => {}
604 0 : };
605 0 :
606 0 : json_response(StatusCode::OK, service.tenant_describe(tenant_id)?)
607 0 : }
608 :
609 0 : async fn handle_tenant_list(
610 0 : service: Arc<Service>,
611 0 : req: Request<Body>,
612 0 : ) -> Result<Response<Body>, ApiError> {
613 0 : check_permissions(&req, Scope::Admin)?;
614 :
615 0 : match maybe_forward(req).await {
616 0 : ForwardOutcome::Forwarded(res) => {
617 0 : return res;
618 : }
619 0 : ForwardOutcome::NotForwarded(_req) => {}
620 0 : };
621 0 :
622 0 : json_response(StatusCode::OK, service.tenant_list())
623 0 : }
624 :
625 0 : async fn handle_node_register(req: Request<Body>) -> Result<Response<Body>, ApiError> {
626 0 : check_permissions(&req, Scope::Admin)?;
627 :
628 0 : let mut req = match maybe_forward(req).await {
629 0 : ForwardOutcome::Forwarded(res) => {
630 0 : return res;
631 : }
632 0 : ForwardOutcome::NotForwarded(req) => req,
633 : };
634 :
635 0 : let register_req = json_request::<NodeRegisterRequest>(&mut req).await?;
636 0 : let state = get_state(&req);
637 0 : state.service.node_register(register_req).await?;
638 0 : json_response(StatusCode::OK, ())
639 0 : }
640 :
641 0 : async fn handle_node_list(req: Request<Body>) -> Result<Response<Body>, ApiError> {
642 0 : check_permissions(&req, Scope::Admin)?;
643 :
644 0 : let req = match maybe_forward(req).await {
645 0 : ForwardOutcome::Forwarded(res) => {
646 0 : return res;
647 : }
648 0 : ForwardOutcome::NotForwarded(req) => req,
649 0 : };
650 0 :
651 0 : let state = get_state(&req);
652 0 : let nodes = state.service.node_list().await?;
653 0 : let api_nodes = nodes.into_iter().map(|n| n.describe()).collect::<Vec<_>>();
654 0 :
655 0 : json_response(StatusCode::OK, api_nodes)
656 0 : }
657 :
658 0 : async fn handle_node_drop(req: Request<Body>) -> Result<Response<Body>, ApiError> {
659 0 : check_permissions(&req, Scope::Admin)?;
660 :
661 0 : let req = match maybe_forward(req).await {
662 0 : ForwardOutcome::Forwarded(res) => {
663 0 : return res;
664 : }
665 0 : ForwardOutcome::NotForwarded(req) => req,
666 0 : };
667 0 :
668 0 : let state = get_state(&req);
669 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
670 0 : json_response(StatusCode::OK, state.service.node_drop(node_id).await?)
671 0 : }
672 :
673 0 : async fn handle_node_delete(req: Request<Body>) -> Result<Response<Body>, ApiError> {
674 0 : check_permissions(&req, Scope::Admin)?;
675 :
676 0 : let req = match maybe_forward(req).await {
677 0 : ForwardOutcome::Forwarded(res) => {
678 0 : return res;
679 : }
680 0 : ForwardOutcome::NotForwarded(req) => req,
681 0 : };
682 0 :
683 0 : let state = get_state(&req);
684 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
685 0 : json_response(StatusCode::OK, state.service.node_delete(node_id).await?)
686 0 : }
687 :
688 0 : async fn handle_node_configure(req: Request<Body>) -> Result<Response<Body>, ApiError> {
689 0 : check_permissions(&req, Scope::Admin)?;
690 :
691 0 : let mut req = match maybe_forward(req).await {
692 0 : ForwardOutcome::Forwarded(res) => {
693 0 : return res;
694 : }
695 0 : ForwardOutcome::NotForwarded(req) => req,
696 : };
697 :
698 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
699 0 : let config_req = json_request::<NodeConfigureRequest>(&mut req).await?;
700 0 : if node_id != config_req.node_id {
701 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
702 0 : "Path and body node_id differ"
703 0 : )));
704 0 : }
705 0 : let state = get_state(&req);
706 0 :
707 0 : json_response(
708 0 : StatusCode::OK,
709 0 : state
710 0 : .service
711 0 : .external_node_configure(
712 0 : config_req.node_id,
713 0 : config_req.availability.map(NodeAvailability::from),
714 0 : config_req.scheduling,
715 0 : )
716 0 : .await?,
717 : )
718 0 : }
719 :
720 0 : async fn handle_node_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
721 0 : check_permissions(&req, Scope::Admin)?;
722 :
723 0 : let req = match maybe_forward(req).await {
724 0 : ForwardOutcome::Forwarded(res) => {
725 0 : return res;
726 : }
727 0 : ForwardOutcome::NotForwarded(req) => req,
728 0 : };
729 0 :
730 0 : let state = get_state(&req);
731 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
732 :
733 0 : let node_status = state.service.get_node(node_id).await?;
734 :
735 0 : json_response(StatusCode::OK, node_status)
736 0 : }
737 :
738 0 : async fn handle_node_shards(req: Request<Body>) -> Result<Response<Body>, ApiError> {
739 0 : check_permissions(&req, Scope::Admin)?;
740 :
741 0 : let state = get_state(&req);
742 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
743 :
744 0 : let node_status = state.service.get_node_shards(node_id).await?;
745 :
746 0 : json_response(StatusCode::OK, node_status)
747 0 : }
748 :
749 0 : async fn handle_get_leader(req: Request<Body>) -> Result<Response<Body>, ApiError> {
750 0 : check_permissions(&req, Scope::Admin)?;
751 :
752 0 : let req = match maybe_forward(req).await {
753 0 : ForwardOutcome::Forwarded(res) => {
754 0 : return res;
755 : }
756 0 : ForwardOutcome::NotForwarded(req) => req,
757 0 : };
758 0 :
759 0 : let state = get_state(&req);
760 0 : let leader = state.service.get_leader().await.map_err(|err| {
761 0 : ApiError::InternalServerError(anyhow::anyhow!(
762 0 : "Failed to read leader from database: {err}"
763 0 : ))
764 0 : })?;
765 :
766 0 : json_response(StatusCode::OK, leader)
767 0 : }
768 :
769 0 : async fn handle_node_drain(req: Request<Body>) -> Result<Response<Body>, ApiError> {
770 0 : check_permissions(&req, Scope::Admin)?;
771 :
772 0 : let req = match maybe_forward(req).await {
773 0 : ForwardOutcome::Forwarded(res) => {
774 0 : return res;
775 : }
776 0 : ForwardOutcome::NotForwarded(req) => req,
777 0 : };
778 0 :
779 0 : let state = get_state(&req);
780 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
781 :
782 0 : state.service.start_node_drain(node_id).await?;
783 :
784 0 : json_response(StatusCode::ACCEPTED, ())
785 0 : }
786 :
787 0 : async fn handle_cancel_node_drain(req: Request<Body>) -> Result<Response<Body>, ApiError> {
788 0 : check_permissions(&req, Scope::Admin)?;
789 :
790 0 : let req = match maybe_forward(req).await {
791 0 : ForwardOutcome::Forwarded(res) => {
792 0 : return res;
793 : }
794 0 : ForwardOutcome::NotForwarded(req) => req,
795 0 : };
796 0 :
797 0 : let state = get_state(&req);
798 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
799 :
800 0 : state.service.cancel_node_drain(node_id).await?;
801 :
802 0 : json_response(StatusCode::ACCEPTED, ())
803 0 : }
804 :
805 0 : async fn handle_node_fill(req: Request<Body>) -> Result<Response<Body>, ApiError> {
806 0 : check_permissions(&req, Scope::Admin)?;
807 :
808 0 : let req = match maybe_forward(req).await {
809 0 : ForwardOutcome::Forwarded(res) => {
810 0 : return res;
811 : }
812 0 : ForwardOutcome::NotForwarded(req) => req,
813 0 : };
814 0 :
815 0 : let state = get_state(&req);
816 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
817 :
818 0 : state.service.start_node_fill(node_id).await?;
819 :
820 0 : json_response(StatusCode::ACCEPTED, ())
821 0 : }
822 :
823 0 : async fn handle_cancel_node_fill(req: Request<Body>) -> Result<Response<Body>, ApiError> {
824 0 : check_permissions(&req, Scope::Admin)?;
825 :
826 0 : let req = match maybe_forward(req).await {
827 0 : ForwardOutcome::Forwarded(res) => {
828 0 : return res;
829 : }
830 0 : ForwardOutcome::NotForwarded(req) => req,
831 0 : };
832 0 :
833 0 : let state = get_state(&req);
834 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
835 :
836 0 : state.service.cancel_node_fill(node_id).await?;
837 :
838 0 : json_response(StatusCode::ACCEPTED, ())
839 0 : }
840 :
841 0 : async fn handle_metadata_health_update(req: Request<Body>) -> Result<Response<Body>, ApiError> {
842 0 : check_permissions(&req, Scope::Scrubber)?;
843 :
844 0 : let mut req = match maybe_forward(req).await {
845 0 : ForwardOutcome::Forwarded(res) => {
846 0 : return res;
847 : }
848 0 : ForwardOutcome::NotForwarded(req) => req,
849 : };
850 :
851 0 : let update_req = json_request::<MetadataHealthUpdateRequest>(&mut req).await?;
852 0 : let state = get_state(&req);
853 0 :
854 0 : state.service.metadata_health_update(update_req).await?;
855 :
856 0 : json_response(StatusCode::OK, MetadataHealthUpdateResponse {})
857 0 : }
858 :
859 0 : async fn handle_metadata_health_list_unhealthy(
860 0 : req: Request<Body>,
861 0 : ) -> Result<Response<Body>, ApiError> {
862 0 : check_permissions(&req, Scope::Admin)?;
863 :
864 0 : let req = match maybe_forward(req).await {
865 0 : ForwardOutcome::Forwarded(res) => {
866 0 : return res;
867 : }
868 0 : ForwardOutcome::NotForwarded(req) => req,
869 0 : };
870 0 :
871 0 : let state = get_state(&req);
872 0 : let unhealthy_tenant_shards = state.service.metadata_health_list_unhealthy().await?;
873 :
874 0 : json_response(
875 0 : StatusCode::OK,
876 0 : MetadataHealthListUnhealthyResponse {
877 0 : unhealthy_tenant_shards,
878 0 : },
879 0 : )
880 0 : }
881 :
882 0 : async fn handle_metadata_health_list_outdated(
883 0 : req: Request<Body>,
884 0 : ) -> Result<Response<Body>, ApiError> {
885 0 : check_permissions(&req, Scope::Admin)?;
886 :
887 0 : let mut req = match maybe_forward(req).await {
888 0 : ForwardOutcome::Forwarded(res) => {
889 0 : return res;
890 : }
891 0 : ForwardOutcome::NotForwarded(req) => req,
892 : };
893 :
894 0 : let list_outdated_req = json_request::<MetadataHealthListOutdatedRequest>(&mut req).await?;
895 0 : let state = get_state(&req);
896 0 : let health_records = state
897 0 : .service
898 0 : .metadata_health_list_outdated(list_outdated_req.not_scrubbed_for)
899 0 : .await?;
900 :
901 0 : json_response(
902 0 : StatusCode::OK,
903 0 : MetadataHealthListOutdatedResponse { health_records },
904 0 : )
905 0 : }
906 :
907 0 : async fn handle_tenant_shard_split(
908 0 : service: Arc<Service>,
909 0 : req: Request<Body>,
910 0 : ) -> Result<Response<Body>, ApiError> {
911 0 : check_permissions(&req, Scope::Admin)?;
912 :
913 0 : let mut req = match maybe_forward(req).await {
914 0 : ForwardOutcome::Forwarded(res) => {
915 0 : return res;
916 : }
917 0 : ForwardOutcome::NotForwarded(req) => req,
918 : };
919 :
920 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
921 0 : let split_req = json_request::<TenantShardSplitRequest>(&mut req).await?;
922 :
923 : json_response(
924 : StatusCode::OK,
925 0 : service.tenant_shard_split(tenant_id, split_req).await?,
926 : )
927 0 : }
928 :
929 0 : async fn handle_tenant_shard_migrate(
930 0 : service: Arc<Service>,
931 0 : req: Request<Body>,
932 0 : ) -> Result<Response<Body>, ApiError> {
933 0 : check_permissions(&req, Scope::Admin)?;
934 :
935 0 : let mut req = match maybe_forward(req).await {
936 0 : ForwardOutcome::Forwarded(res) => {
937 0 : return res;
938 : }
939 0 : ForwardOutcome::NotForwarded(req) => req,
940 : };
941 :
942 0 : let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
943 0 : let migrate_req = json_request::<TenantShardMigrateRequest>(&mut req).await?;
944 : json_response(
945 : StatusCode::OK,
946 0 : service
947 0 : .tenant_shard_migrate(tenant_shard_id, migrate_req)
948 0 : .await?,
949 : )
950 0 : }
951 :
952 0 : async fn handle_tenant_update_policy(req: Request<Body>) -> Result<Response<Body>, ApiError> {
953 0 : check_permissions(&req, Scope::Admin)?;
954 :
955 0 : let mut req = match maybe_forward(req).await {
956 0 : ForwardOutcome::Forwarded(res) => {
957 0 : return res;
958 : }
959 0 : ForwardOutcome::NotForwarded(req) => req,
960 : };
961 :
962 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
963 0 : let update_req = json_request::<TenantPolicyRequest>(&mut req).await?;
964 0 : let state = get_state(&req);
965 0 :
966 0 : json_response(
967 0 : StatusCode::OK,
968 0 : state
969 0 : .service
970 0 : .tenant_update_policy(tenant_id, update_req)
971 0 : .await?,
972 : )
973 0 : }
974 :
975 0 : async fn handle_update_preferred_azs(req: Request<Body>) -> Result<Response<Body>, ApiError> {
976 0 : check_permissions(&req, Scope::Admin)?;
977 :
978 0 : let mut req = match maybe_forward(req).await {
979 0 : ForwardOutcome::Forwarded(res) => {
980 0 : return res;
981 : }
982 0 : ForwardOutcome::NotForwarded(req) => req,
983 : };
984 :
985 0 : let azs_req = json_request::<ShardsPreferredAzsRequest>(&mut req).await?;
986 0 : let state = get_state(&req);
987 0 :
988 0 : json_response(
989 0 : StatusCode::OK,
990 0 : state.service.update_shards_preferred_azs(azs_req).await?,
991 : )
992 0 : }
993 :
994 0 : async fn handle_step_down(req: Request<Body>) -> Result<Response<Body>, ApiError> {
995 0 : check_permissions(&req, Scope::Admin)?;
996 :
997 0 : let req = match maybe_forward(req).await {
998 0 : ForwardOutcome::Forwarded(res) => {
999 0 : return res;
1000 : }
1001 0 : ForwardOutcome::NotForwarded(req) => req,
1002 0 : };
1003 0 :
1004 0 : let state = get_state(&req);
1005 0 : json_response(StatusCode::OK, state.service.step_down().await)
1006 0 : }
1007 :
1008 0 : async fn handle_tenant_drop(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1009 0 : check_permissions(&req, Scope::PageServerApi)?;
1010 :
1011 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
1012 :
1013 0 : let req = match maybe_forward(req).await {
1014 0 : ForwardOutcome::Forwarded(res) => {
1015 0 : return res;
1016 : }
1017 0 : ForwardOutcome::NotForwarded(req) => req,
1018 0 : };
1019 0 :
1020 0 : let state = get_state(&req);
1021 0 :
1022 0 : json_response(StatusCode::OK, state.service.tenant_drop(tenant_id).await?)
1023 0 : }
1024 :
1025 0 : async fn handle_tenant_import(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1026 0 : check_permissions(&req, Scope::PageServerApi)?;
1027 :
1028 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
1029 :
1030 0 : let req = match maybe_forward(req).await {
1031 0 : ForwardOutcome::Forwarded(res) => {
1032 0 : return res;
1033 : }
1034 0 : ForwardOutcome::NotForwarded(req) => req,
1035 0 : };
1036 0 :
1037 0 : let state = get_state(&req);
1038 0 :
1039 0 : json_response(
1040 0 : StatusCode::OK,
1041 0 : state.service.tenant_import(tenant_id).await?,
1042 : )
1043 0 : }
1044 :
1045 0 : async fn handle_tenants_dump(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1046 0 : check_permissions(&req, Scope::Admin)?;
1047 :
1048 0 : let req = match maybe_forward(req).await {
1049 0 : ForwardOutcome::Forwarded(res) => {
1050 0 : return res;
1051 : }
1052 0 : ForwardOutcome::NotForwarded(req) => req,
1053 0 : };
1054 0 :
1055 0 : let state = get_state(&req);
1056 0 : state.service.tenants_dump()
1057 0 : }
1058 :
1059 0 : async fn handle_scheduler_dump(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1060 0 : check_permissions(&req, Scope::Admin)?;
1061 :
1062 0 : let req = match maybe_forward(req).await {
1063 0 : ForwardOutcome::Forwarded(res) => {
1064 0 : return res;
1065 : }
1066 0 : ForwardOutcome::NotForwarded(req) => req,
1067 0 : };
1068 0 :
1069 0 : let state = get_state(&req);
1070 0 : state.service.scheduler_dump()
1071 0 : }
1072 :
1073 0 : async fn handle_consistency_check(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1074 0 : check_permissions(&req, Scope::Admin)?;
1075 :
1076 0 : let req = match maybe_forward(req).await {
1077 0 : ForwardOutcome::Forwarded(res) => {
1078 0 : return res;
1079 : }
1080 0 : ForwardOutcome::NotForwarded(req) => req,
1081 0 : };
1082 0 :
1083 0 : let state = get_state(&req);
1084 0 :
1085 0 : json_response(StatusCode::OK, state.service.consistency_check().await?)
1086 0 : }
1087 :
1088 0 : async fn handle_reconcile_all(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1089 0 : check_permissions(&req, Scope::Admin)?;
1090 :
1091 0 : let req = match maybe_forward(req).await {
1092 0 : ForwardOutcome::Forwarded(res) => {
1093 0 : return res;
1094 : }
1095 0 : ForwardOutcome::NotForwarded(req) => req,
1096 0 : };
1097 0 :
1098 0 : let state = get_state(&req);
1099 0 :
1100 0 : json_response(StatusCode::OK, state.service.reconcile_all_now().await?)
1101 0 : }
1102 :
1103 : /// Status endpoint is just used for checking that our HTTP listener is up
1104 0 : async fn handle_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1105 0 : match maybe_forward(req).await {
1106 0 : ForwardOutcome::Forwarded(res) => {
1107 0 : return res;
1108 : }
1109 0 : ForwardOutcome::NotForwarded(_req) => {}
1110 0 : };
1111 0 :
1112 0 : json_response(StatusCode::OK, ())
1113 0 : }
1114 :
1115 : /// Readiness endpoint indicates when we're done doing startup I/O (e.g. reconciling
1116 : /// with remote pageserver nodes). This is intended for use as a kubernetes readiness probe.
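///
/// For example, a readinessProbe pointed at `GET /ready` sees 200 once startup
/// reconciliation has completed, and 503 (SERVICE_UNAVAILABLE) before that.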
1117 0 : async fn handle_ready(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1118 0 : let req = match maybe_forward(req).await {
1119 0 : ForwardOutcome::Forwarded(res) => {
1120 0 : return res;
1121 : }
1122 0 : ForwardOutcome::NotForwarded(req) => req,
1123 0 : };
1124 0 :
1125 0 : let state = get_state(&req);
1126 0 : if state.service.startup_complete.is_ready() {
1127 0 : json_response(StatusCode::OK, ())
1128 : } else {
1129 0 : json_response(StatusCode::SERVICE_UNAVAILABLE, ())
1130 : }
1131 0 : }
1132 :
1133 : impl From<ReconcileError> for ApiError {
1134 0 : fn from(value: ReconcileError) -> Self {
1135 0 : ApiError::Conflict(format!("Reconciliation error: {}", value))
1136 0 : }
1137 : }
1138 :
1139 : /// Return the safekeeper record by instance id, or 404.
1140 : ///
1141 : /// Not used by anything except manual testing.
1142 0 : async fn handle_get_safekeeper(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1143 0 : check_permissions(&req, Scope::Admin)?;
1144 :
1145 0 : let id = parse_request_param::<i64>(&req, "id")?;
1146 :
1147 0 : let req = match maybe_forward(req).await {
1148 0 : ForwardOutcome::Forwarded(res) => {
1149 0 : return res;
1150 : }
1151 0 : ForwardOutcome::NotForwarded(req) => req,
1152 0 : };
1153 0 :
1154 0 : let state = get_state(&req);
1155 :
1156 0 : let res = state.service.get_safekeeper(id).await;
1157 :
1158 0 : match res {
1159 0 : Ok(b) => json_response(StatusCode::OK, b),
1160 : Err(crate::persistence::DatabaseError::Query(diesel::result::Error::NotFound)) => {
1161 0 : Err(ApiError::NotFound("unknown instance_id".into()))
1162 : }
1163 0 : Err(other) => Err(other.into()),
1164 : }
1165 0 : }
1166 :
1167 : /// Used as part of deployment scripts.
1168 : ///
1169 : /// Assumes information is only relayed to the storage controller after first selecting a unique id on
1170 : /// control plane database, which means we have an id field in the request and payload.
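///
/// For example (illustrative), an upsert for safekeeper 42 must use `42` as the `id` path
/// parameter and also carry `"id": 42` in the JSON body; mismatches are rejected with a 400 below.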
1171 0 : async fn handle_upsert_safekeeper(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
1172 0 : check_permissions(&req, Scope::Admin)?;
1173 :
1174 0 : let body = json_request::<SafekeeperPersistence>(&mut req).await?;
1175 0 : let id = parse_request_param::<i64>(&req, "id")?;
1176 :
1177 0 : if id != body.id {
1178 : // it should be repeated
1179 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
1180 0 : "id mismatch: url={id:?}, body={:?}",
1181 0 : body.id
1182 0 : )));
1183 0 : }
1184 :
1185 0 : let req = match maybe_forward(req).await {
1186 0 : ForwardOutcome::Forwarded(res) => {
1187 0 : return res;
1188 : }
1189 0 : ForwardOutcome::NotForwarded(req) => req,
1190 0 : };
1191 0 :
1192 0 : let state = get_state(&req);
1193 0 :
1194 0 : state.service.upsert_safekeeper(body).await?;
1195 :
1196 0 : Ok(Response::builder()
1197 0 : .status(StatusCode::NO_CONTENT)
1198 0 : .body(Body::empty())
1199 0 : .unwrap())
1200 0 : }
1201 :
1202 : /// Common wrapper for request handlers that call into Service and will operate on tenants: they must only
1203 : /// be allowed to run if Service has finished its initial reconciliation.
1204 0 : async fn tenant_service_handler<R, H>(
1205 0 : request: Request<Body>,
1206 0 : handler: H,
1207 0 : request_name: RequestName,
1208 0 : ) -> R::Output
1209 0 : where
1210 0 : R: std::future::Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
1211 0 : H: FnOnce(Arc<Service>, Request<Body>) -> R + Send + Sync + 'static,
1212 0 : {
1213 0 : let state = get_state(&request);
1214 0 : let service = state.service.clone();
1215 0 :
1216 0 : let startup_complete = service.startup_complete.clone();
1217 0 : if tokio::time::timeout(STARTUP_RECONCILE_TIMEOUT, startup_complete.wait())
1218 0 : .await
1219 0 : .is_err()
1220 : {
1221 : // This shouldn't happen: it is the responsibility of [`Service::startup_reconcile`] to use appropriate
1222 : // timeouts around its remote calls, to bound its runtime.
1223 0 : return Err(ApiError::Timeout(
1224 0 : "Timed out waiting for service readiness".into(),
1225 0 : ));
1226 0 : }
1227 0 :
1228 0 : named_request_span(
1229 0 : request,
1230 0 : |request| async move { handler(service, request).await },
1231 0 : request_name,
1232 0 : )
1233 0 : .await
1234 0 : }
1235 :
1236 : /// Check if the required scope is held in the request's token, or if the request has
1237 : /// a token with 'admin' scope then always permit it.
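///
/// For example, `check_permissions(&req, Scope::PageServerApi)` succeeds when the token's
/// claims grant either `Scope::PageServerApi` or `Scope::Admin`, and fails otherwise.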
1238 0 : fn check_permissions(request: &Request<Body>, required_scope: Scope) -> Result<(), ApiError> {
1239 0 : check_permission_with(request, |claims| {
1240 0 : match crate::auth::check_permission(claims, required_scope) {
1241 0 : Err(e) => match crate::auth::check_permission(claims, Scope::Admin) {
1242 0 : Ok(()) => Ok(()),
1243 0 : Err(_) => Err(e),
1244 : },
1245 0 : Ok(()) => Ok(()),
1246 : }
1247 0 : })
1248 0 : }
1249 :
1250 : #[derive(Clone, Debug)]
1251 : struct RequestMeta {
1252 : method: hyper::http::Method,
1253 : at: Instant,
1254 : }
1255 :
1256 0 : pub fn prologue_leadership_status_check_middleware<
1257 0 : B: hyper::body::HttpBody + Send + Sync + 'static,
1258 0 : >() -> Middleware<B, ApiError> {
1259 0 : Middleware::pre(move |req| async move {
1260 0 : let state = get_state(&req);
1261 0 : let leadership_status = state.service.get_leadership_status();
1262 :
1263 : enum AllowedRoutes<'a> {
1264 : All,
1265 : Some(Vec<&'a str>),
1266 : }
1267 :
1268 0 : let allowed_routes = match leadership_status {
1269 0 : LeadershipStatus::Leader => AllowedRoutes::All,
1270 0 : LeadershipStatus::SteppedDown => AllowedRoutes::All,
1271 : LeadershipStatus::Candidate => {
1272 0 : AllowedRoutes::Some(["/ready", "/status", "/metrics"].to_vec())
1273 : }
1274 : };
1275 :
1276 0 : let uri = req.uri().to_string();
1277 0 : match allowed_routes {
1278 0 : AllowedRoutes::All => Ok(req),
1279 0 : AllowedRoutes::Some(allowed) if allowed.contains(&uri.as_str()) => Ok(req),
1280 : _ => {
1281 0 : tracing::info!(
1282 0 : "Request {} not allowed due to current leadership state",
1283 0 : req.uri()
1284 : );
1285 :
1286 0 : Err(ApiError::ResourceUnavailable(
1287 0 : format!("Current leadership status is {leadership_status}").into(),
1288 0 : ))
1289 : }
1290 : }
1291 0 : })
1292 0 : }
1293 :
1294 0 : fn prologue_metrics_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>(
1295 0 : ) -> Middleware<B, ApiError> {
1296 0 : Middleware::pre(move |req| async move {
1297 0 : let meta = RequestMeta {
1298 0 : method: req.method().clone(),
1299 0 : at: Instant::now(),
1300 0 : };
1301 0 :
1302 0 : req.set_context(meta);
1303 0 :
1304 0 : Ok(req)
1305 0 : })
1306 0 : }
1307 :
1308 0 : fn epilogue_metrics_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>(
1309 0 : ) -> Middleware<B, ApiError> {
1310 0 : Middleware::post_with_info(move |resp, req_info| async move {
1311 0 : let request_name = match req_info.context::<RequestName>() {
1312 0 : Some(name) => name,
1313 : None => {
1314 0 : return Ok(resp);
1315 : }
1316 : };
1317 :
1318 0 : if let Some(meta) = req_info.context::<RequestMeta>() {
1319 0 : let status = &crate::metrics::METRICS_REGISTRY
1320 0 : .metrics_group
1321 0 : .storage_controller_http_request_status;
1322 0 : let latency = &crate::metrics::METRICS_REGISTRY
1323 0 : .metrics_group
1324 0 : .storage_controller_http_request_latency;
1325 0 :
1326 0 : status.inc(HttpRequestStatusLabelGroup {
1327 0 : path: request_name.0,
1328 0 : method: meta.method.clone().into(),
1329 0 : status: crate::metrics::StatusCode(resp.status()),
1330 0 : });
1331 0 :
1332 0 : latency.observe(
1333 0 : HttpRequestLatencyLabelGroup {
1334 0 : path: request_name.0,
1335 0 : method: meta.method.into(),
1336 0 : },
1337 0 : meta.at.elapsed().as_secs_f64(),
1338 0 : );
1339 0 : }
1340 0 : Ok(resp)
1341 0 : })
1342 0 : }
1343 :
1344 0 : pub async fn measured_metrics_handler(req: Request<Body>) -> Result<Response<Body>, ApiError> {
1345 : pub const TEXT_FORMAT: &str = "text/plain; version=0.0.4";
1346 :
1347 0 : let req = match maybe_forward(req).await {
1348 0 : ForwardOutcome::Forwarded(res) => {
1349 0 : return res;
1350 : }
1351 0 : ForwardOutcome::NotForwarded(req) => req,
1352 0 : };
1353 0 :
1354 0 : let state = get_state(&req);
1355 0 : let payload = crate::metrics::METRICS_REGISTRY.encode(&state.neon_metrics);
1356 0 : let response = Response::builder()
1357 0 : .status(200)
1358 0 : .header(CONTENT_TYPE, TEXT_FORMAT)
1359 0 : .body(payload.into())
1360 0 : .unwrap();
1361 0 :
1362 0 : Ok(response)
1363 0 : }
1364 :
1365 : #[derive(Clone)]
1366 : struct RequestName(&'static str);
1367 :
1368 0 : async fn named_request_span<R, H>(
1369 0 : request: Request<Body>,
1370 0 : handler: H,
1371 0 : name: RequestName,
1372 0 : ) -> R::Output
1373 0 : where
1374 0 : R: Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
1375 0 : H: FnOnce(Request<Body>) -> R + Send + Sync + 'static,
1376 0 : {
1377 0 : request.set_context(name);
1378 0 : request_span(request, handler).await
1379 0 : }
1380 :
1381 : enum ForwardOutcome {
1382 : Forwarded(Result<Response<Body>, ApiError>),
1383 : NotForwarded(Request<Body>),
1384 : }
1385 :
1386 : /// Potentially forward the request to the current storage controller leader.
1387 : /// More specifically we forward when:
1388 : /// 1. Request is not one of ["/control/v1/step_down", "/status", "/ready", "/metrics"]
1389 : /// 2. Current instance is in [`LeadershipStatus::SteppedDown`] state
1390 : /// 3. There is a leader in the database to forward to
1391 : /// 4. Leader from step (3) is not the current instance
1392 : ///
1393 : /// Why forward?
1394 : /// It turns out that we can't rely on external orchestration to promptly route traffic to the
1395 : /// new leader. This is downtime-inducing. Forwarding provides a safe way out.
1396 : ///
1397 : /// Why is it safe?
1398 : /// If a storcon instance is persisted in the database, then we know that it is the current leader.
1399 : /// There's one exception: time between handling step-down request and the new leader updating the
1400 : /// database.
1401 : ///
1402 : /// Let's treat the happy case first. The stepped down node does not produce any side effects,
1403 : /// since all request handling happens on the leader.
1404 : ///
1405 : /// As for the edge case, we are guaranteed to always have a maximum of two running instances.
1406 : /// Hence, if we are in the edge case scenario the leader persisted in the database is the
1407 : /// stepped down instance that received the request. Condition (4) above covers this scenario.
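///
/// A rough sketch of the decision made below (pseudocode, for illustration only):
///
/// ```text
/// if uri in NOT_FOR_FORWARD or leadership != SteppedDown  => handle locally
/// else if no leader recorded in the database              => 503 ResourceUnavailable
/// else if the recorded leader is this instance            => 500 InternalServerError
/// else                                                    => proxy the request to the leader
/// ```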
1408 0 : async fn maybe_forward(req: Request<Body>) -> ForwardOutcome {
1409 : const NOT_FOR_FORWARD: [&str; 4] = ["/control/v1/step_down", "/status", "/ready", "/metrics"];
1410 :
1411 0 : let uri = req.uri().to_string();
1412 0 : let uri_for_forward = !NOT_FOR_FORWARD.contains(&uri.as_str());
1413 0 :
1414 0 : let state = get_state(&req);
1415 0 : let leadership_status = state.service.get_leadership_status();
1416 0 :
1417 0 : if leadership_status != LeadershipStatus::SteppedDown || !uri_for_forward {
1418 0 : return ForwardOutcome::NotForwarded(req);
1419 0 : }
1420 :
1421 0 : let leader = state.service.get_leader().await;
1422 0 : let leader = {
1423 0 : match leader {
1424 0 : Ok(Some(leader)) => leader,
1425 : Ok(None) => {
1426 0 : return ForwardOutcome::Forwarded(Err(ApiError::ResourceUnavailable(
1427 0 : "No leader to forward to while in stepped down state".into(),
1428 0 : )));
1429 : }
1430 0 : Err(err) => {
1431 0 : return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(
1432 0 : anyhow::anyhow!(
1433 0 : "Failed to get leader for forwarding while in stepped down state: {err}"
1434 0 : ),
1435 0 : )));
1436 : }
1437 : }
1438 : };
1439 :
1440 0 : let cfg = state.service.get_config();
1441 0 : if let Some(ref self_addr) = cfg.address_for_peers {
1442 0 : let leader_addr = match Uri::from_str(leader.address.as_str()) {
1443 0 : Ok(uri) => uri,
1444 0 : Err(err) => {
1445 0 : return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(
1446 0 : anyhow::anyhow!(
1447 0 : "Failed to parse leader uri for forwarding while in stepped down state: {err}"
1448 0 : ),
1449 0 : )));
1450 : }
1451 : };
1452 :
1453 0 : if *self_addr == leader_addr {
1454 0 : return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(anyhow::anyhow!(
1455 0 : "Leader is stepped down instance"
1456 0 : ))));
1457 0 : }
1458 0 : }
1459 :
1460 0 : tracing::info!("Forwarding {} to leader at {}", uri, leader.address);
1461 :
1462 : // Use [`RECONCILE_TIMEOUT`] as the maximum amount of time a request should block for,
1463 : // plus some leeway, to derive the timeout for proxied requests.
1464 : const PROXIED_REQUEST_TIMEOUT: Duration = Duration::from_secs(RECONCILE_TIMEOUT.as_secs() + 10);
1465 0 : let client = reqwest::ClientBuilder::new()
1466 0 : .timeout(PROXIED_REQUEST_TIMEOUT)
1467 0 : .build();
1468 0 : let client = match client {
1469 0 : Ok(client) => client,
1470 0 : Err(err) => {
1471 0 : return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(anyhow::anyhow!(
1472 0 : "Failed to build leader client for forwarding while in stepped down state: {err}"
1473 0 : ))));
1474 : }
1475 : };
1476 :
1477 0 : let request: reqwest::Request = match convert_request(req, &client, leader.address).await {
1478 0 : Ok(r) => r,
1479 0 : Err(err) => {
1480 0 : return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(anyhow::anyhow!(
1481 0 : "Failed to convert request for forwarding while in stepped down state: {err}"
1482 0 : ))));
1483 : }
1484 : };
1485 :
1486 0 : let response = match client.execute(request).await {
1487 0 : Ok(r) => r,
1488 0 : Err(err) => {
1489 0 : return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(anyhow::anyhow!(
1490 0 : "Failed to forward while in stepped down state: {err}"
1491 0 : ))));
1492 : }
1493 : };
1494 :
1495 0 : ForwardOutcome::Forwarded(convert_response(response).await)
1496 0 : }
1497 :
1498 : /// Convert a [`reqwest::Response`] to a [`hyper::Response`] by passing through
1499 : /// a stable representation (string, bytes or integer)
1500 : ///
1501 : /// Ideally, we would not have to do this since both types use the http crate
1502 : /// under the hood. However, they use different versions of the crate and keeping
1503 : /// second order dependencies in sync is difficult.
1504 0 : async fn convert_response(resp: reqwest::Response) -> Result<hyper::Response<Body>, ApiError> {
1505 : use std::str::FromStr;
1506 :
1507 0 : let mut builder = hyper::Response::builder().status(resp.status().as_u16());
1508 0 : for (key, value) in resp.headers().into_iter() {
1509 0 : let key = hyper::header::HeaderName::from_str(key.as_str()).map_err(|err| {
1510 0 : ApiError::InternalServerError(anyhow::anyhow!("Response conversion failed: {err}"))
1511 0 : })?;
1512 :
1513 0 : let value = hyper::header::HeaderValue::from_bytes(value.as_bytes()).map_err(|err| {
1514 0 : ApiError::InternalServerError(anyhow::anyhow!("Response conversion failed: {err}"))
1515 0 : })?;
1516 :
1517 0 : builder = builder.header(key, value);
1518 : }
1519 :
1520 0 : let body = http::Body::wrap_stream(resp.bytes_stream());
1521 0 :
1522 0 : builder.body(body).map_err(|err| {
1523 0 : ApiError::InternalServerError(anyhow::anyhow!("Response conversion failed: {err}"))
1524 0 : })
1525 0 : }
1526 :
1527 : /// Convert a [`reqwest::Request`] to a [`hyper::Request`] by passing through
1528 : /// a stable representation (string, bytes or integer)
1529 : ///
1530 : /// See [`convert_response`] for why we are doing it this way.
1531 0 : async fn convert_request(
1532 0 : req: hyper::Request<Body>,
1533 0 : client: &reqwest::Client,
1534 0 : to_address: String,
1535 0 : ) -> Result<reqwest::Request, ApiError> {
1536 : use std::str::FromStr;
1537 :
1538 0 : let (parts, body) = req.into_parts();
1539 0 : let method = reqwest::Method::from_str(parts.method.as_str()).map_err(|err| {
1540 0 : ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
1541 0 : })?;
1542 :
1543 0 : let path_and_query = parts.uri.path_and_query().ok_or_else(|| {
1544 0 : ApiError::InternalServerError(anyhow::anyhow!(
1545 0 : "Request conversion failed: no path and query"
1546 0 : ))
1547 0 : })?;
1548 :
1549 0 : let uri = reqwest::Url::from_str(
1550 0 : format!(
1551 0 : "{}{}",
1552 0 : to_address.trim_end_matches("/"),
1553 0 : path_and_query.as_str()
1554 0 : )
1555 0 : .as_str(),
1556 0 : )
1557 0 : .map_err(|err| {
1558 0 : ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
1559 0 : })?;
1560 :
1561 0 : let mut headers = reqwest::header::HeaderMap::new();
1562 0 : for (key, value) in parts.headers.into_iter() {
1563 0 : let key = match key {
1564 0 : Some(k) => k,
1565 : None => {
1566 0 : continue;
1567 : }
1568 : };
1569 :
1570 0 : let key = reqwest::header::HeaderName::from_str(key.as_str()).map_err(|err| {
1571 0 : ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
1572 0 : })?;
1573 :
1574 0 : let value = reqwest::header::HeaderValue::from_bytes(value.as_bytes()).map_err(|err| {
1575 0 : ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
1576 0 : })?;
1577 :
1578 0 : headers.insert(key, value);
1579 : }
1580 :
1581 0 : let body = hyper::body::to_bytes(body).await.map_err(|err| {
1582 0 : ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
1583 0 : })?;
1584 :
1585 0 : client
1586 0 : .request(method, uri)
1587 0 : .headers(headers)
1588 0 : .body(body)
1589 0 : .build()
1590 0 : .map_err(|err| {
1591 0 : ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
1592 0 : })
1593 0 : }
1594 :
1595 0 : pub fn make_router(
1596 0 : service: Arc<Service>,
1597 0 : auth: Option<Arc<SwappableJwtAuth>>,
1598 0 : build_info: BuildInfo,
1599 0 : ) -> RouterBuilder<hyper::Body, ApiError> {
1600 0 : let mut router = endpoint::make_router()
1601 0 : .middleware(prologue_leadership_status_check_middleware())
1602 0 : .middleware(prologue_metrics_middleware())
1603 0 : .middleware(epilogue_metrics_middleware());
1604 0 : if auth.is_some() {
1605 0 : router = router.middleware(auth_middleware(|request| {
1606 0 : let state = get_state(request);
1607 0 : if state.allowlist_routes.contains(request.uri()) {
1608 0 : None
1609 : } else {
1610 0 : state.auth.as_deref()
1611 : }
1612 0 : }));
1613 0 : }
1614 :
1615 0 : router
1616 0 : .data(Arc::new(HttpState::new(service, auth, build_info)))
1617 0 : .get("/metrics", |r| {
1618 0 : named_request_span(r, measured_metrics_handler, RequestName("metrics"))
1619 0 : })
1620 0 : // Non-prefixed generic endpoints (status, metrics)
1621 0 : .get("/status", |r| {
1622 0 : named_request_span(r, handle_status, RequestName("status"))
1623 0 : })
1624 0 : .get("/ready", |r| {
1625 0 : named_request_span(r, handle_ready, RequestName("ready"))
1626 0 : })
1627 0 : // Upcalls for the pageserver: point the pageserver's `control_plane_api` config to this prefix
1628 0 : .post("/upcall/v1/re-attach", |r| {
1629 0 : named_request_span(r, handle_re_attach, RequestName("upcall_v1_reattach"))
1630 0 : })
1631 0 : .post("/upcall/v1/validate", |r| {
1632 0 : named_request_span(r, handle_validate, RequestName("upcall_v1_validate"))
1633 0 : })
1634 0 : // Test/dev/debug endpoints
1635 0 : .post("/debug/v1/attach-hook", |r| {
1636 0 : named_request_span(r, handle_attach_hook, RequestName("debug_v1_attach_hook"))
1637 0 : })
1638 0 : .post("/debug/v1/inspect", |r| {
1639 0 : named_request_span(r, handle_inspect, RequestName("debug_v1_inspect"))
1640 0 : })
1641 0 : .post("/debug/v1/tenant/:tenant_id/drop", |r| {
1642 0 : named_request_span(r, handle_tenant_drop, RequestName("debug_v1_tenant_drop"))
1643 0 : })
1644 0 : .post("/debug/v1/node/:node_id/drop", |r| {
1645 0 : named_request_span(r, handle_node_drop, RequestName("debug_v1_node_drop"))
1646 0 : })
1647 0 : .post("/debug/v1/tenant/:tenant_id/import", |r| {
1648 0 : named_request_span(
1649 0 : r,
1650 0 : handle_tenant_import,
1651 0 : RequestName("debug_v1_tenant_import"),
1652 0 : )
1653 0 : })
1654 0 : .get("/debug/v1/tenant", |r| {
1655 0 : named_request_span(r, handle_tenants_dump, RequestName("debug_v1_tenant"))
1656 0 : })
1657 0 : .get("/debug/v1/tenant/:tenant_id/locate", |r| {
1658 0 : tenant_service_handler(
1659 0 : r,
1660 0 : handle_tenant_locate,
1661 0 : RequestName("debug_v1_tenant_locate"),
1662 0 : )
1663 0 : })
1664 0 : .get("/debug/v1/scheduler", |r| {
1665 0 : named_request_span(r, handle_scheduler_dump, RequestName("debug_v1_scheduler"))
1666 0 : })
1667 0 : .post("/debug/v1/consistency_check", |r| {
1668 0 : named_request_span(
1669 0 : r,
1670 0 : handle_consistency_check,
1671 0 : RequestName("debug_v1_consistency_check"),
1672 0 : )
1673 0 : })
1674 0 : .post("/debug/v1/reconcile_all", |r| {
1675 0 : request_span(r, handle_reconcile_all)
1676 0 : })
1677 0 : .put("/debug/v1/failpoints", |r| {
1678 0 : request_span(r, |r| failpoints_handler(r, CancellationToken::new()))
1679 0 : })
1680 0 : // Node operations
1681 0 : .post("/control/v1/node", |r| {
1682 0 : named_request_span(r, handle_node_register, RequestName("control_v1_node"))
1683 0 : })
1684 0 : .delete("/control/v1/node/:node_id", |r| {
1685 0 : named_request_span(r, handle_node_delete, RequestName("control_v1_node_delete"))
1686 0 : })
1687 0 : .get("/control/v1/node", |r| {
1688 0 : named_request_span(r, handle_node_list, RequestName("control_v1_node"))
1689 0 : })
1690 0 : .put("/control/v1/node/:node_id/config", |r| {
1691 0 : named_request_span(
1692 0 : r,
1693 0 : handle_node_configure,
1694 0 : RequestName("control_v1_node_config"),
1695 0 : )
1696 0 : })
1697 0 : .get("/control/v1/node/:node_id", |r| {
1698 0 : named_request_span(r, handle_node_status, RequestName("control_v1_node_status"))
1699 0 : })
1700 0 : .get("/control/v1/node/:node_id/shards", |r| {
1701 0 : named_request_span(
1702 0 : r,
1703 0 : handle_node_shards,
1704 0 : RequestName("control_v1_node_describe"),
1705 0 : )
1706 0 : })
1707 0 : .get("/control/v1/leader", |r| {
1708 0 : named_request_span(r, handle_get_leader, RequestName("control_v1_get_leader"))
1709 0 : })
1710 0 : .put("/control/v1/node/:node_id/drain", |r| {
1711 0 : named_request_span(r, handle_node_drain, RequestName("control_v1_node_drain"))
1712 0 : })
1713 0 : .delete("/control/v1/node/:node_id/drain", |r| {
1714 0 : named_request_span(
1715 0 : r,
1716 0 : handle_cancel_node_drain,
1717 0 : RequestName("control_v1_cancel_node_drain"),
1718 0 : )
1719 0 : })
1720 0 : .put("/control/v1/node/:node_id/fill", |r| {
1721 0 : named_request_span(r, handle_node_fill, RequestName("control_v1_node_fill"))
1722 0 : })
1723 0 : .delete("/control/v1/node/:node_id/fill", |r| {
1724 0 : named_request_span(
1725 0 : r,
1726 0 : handle_cancel_node_fill,
1727 0 : RequestName("control_v1_cancel_node_fill"),
1728 0 : )
1729 0 : })
1730 0 : // Metadata health operations
1731 0 : .post("/control/v1/metadata_health/update", |r| {
1732 0 : named_request_span(
1733 0 : r,
1734 0 : handle_metadata_health_update,
1735 0 : RequestName("control_v1_metadata_health_update"),
1736 0 : )
1737 0 : })
1738 0 : .get("/control/v1/metadata_health/unhealthy", |r| {
1739 0 : named_request_span(
1740 0 : r,
1741 0 : handle_metadata_health_list_unhealthy,
1742 0 : RequestName("control_v1_metadata_health_list_unhealthy"),
1743 0 : )
1744 0 : })
1745 0 : .post("/control/v1/metadata_health/outdated", |r| {
1746 0 : named_request_span(
1747 0 : r,
1748 0 : handle_metadata_health_list_outdated,
1749 0 : RequestName("control_v1_metadata_health_list_outdated"),
1750 0 : )
1751 0 : })
1752 0 : // Tenant Shard operations
1753 0 : .put("/control/v1/tenant/:tenant_shard_id/migrate", |r| {
1754 0 : tenant_service_handler(
1755 0 : r,
1756 0 : handle_tenant_shard_migrate,
1757 0 : RequestName("control_v1_tenant_migrate"),
1758 0 : )
1759 0 : })
1760 0 : .put("/control/v1/tenant/:tenant_id/shard_split", |r| {
1761 0 : tenant_service_handler(
1762 0 : r,
1763 0 : handle_tenant_shard_split,
1764 0 : RequestName("control_v1_tenant_shard_split"),
1765 0 : )
1766 0 : })
1767 0 : .get("/control/v1/tenant/:tenant_id", |r| {
1768 0 : tenant_service_handler(
1769 0 : r,
1770 0 : handle_tenant_describe,
1771 0 : RequestName("control_v1_tenant_describe"),
1772 0 : )
1773 0 : })
1774 0 : .get("/control/v1/tenant", |r| {
1775 0 : tenant_service_handler(r, handle_tenant_list, RequestName("control_v1_tenant_list"))
1776 0 : })
1777 0 : .put("/control/v1/tenant/:tenant_id/policy", |r| {
1778 0 : named_request_span(
1779 0 : r,
1780 0 : handle_tenant_update_policy,
1781 0 : RequestName("control_v1_tenant_policy"),
1782 0 : )
1783 0 : })
1784 0 : .put("/control/v1/preferred_azs", |r| {
1785 0 : named_request_span(
1786 0 : r,
1787 0 : handle_update_preferred_azs,
1788 0 : RequestName("control_v1_preferred_azs"),
1789 0 : )
1790 0 : })
1791 0 : .put("/control/v1/step_down", |r| {
1792 0 : named_request_span(r, handle_step_down, RequestName("control_v1_step_down"))
1793 0 : })
1794 0 : .get("/control/v1/safekeeper/:id", |r| {
1795 0 : named_request_span(r, handle_get_safekeeper, RequestName("v1_safekeeper"))
1796 0 : })
1797 0 : .post("/control/v1/safekeeper/:id", |r| {
1798 0 : // id is in the body
1799 0 : named_request_span(r, handle_upsert_safekeeper, RequestName("v1_safekeeper"))
1800 0 : })
1801 0 : // Tenant operations
1802 0 : // The ^/v1/ endpoints act as a "Virtual Pageserver", enabling shard-naive clients to call into
1803 0 :         // this service to manage tenants that actually consist of many tenant shards, as if they are a single entity (see the illustrative client sketch after this function).
1804 0 : .post("/v1/tenant", |r| {
1805 0 : tenant_service_handler(r, handle_tenant_create, RequestName("v1_tenant"))
1806 0 : })
1807 0 : .delete("/v1/tenant/:tenant_id", |r| {
1808 0 : tenant_service_handler(r, handle_tenant_delete, RequestName("v1_tenant"))
1809 0 : })
1810 0 : .put("/v1/tenant/config", |r| {
1811 0 : tenant_service_handler(r, handle_tenant_config_set, RequestName("v1_tenant_config"))
1812 0 : })
1813 0 : .get("/v1/tenant/:tenant_id/config", |r| {
1814 0 : tenant_service_handler(r, handle_tenant_config_get, RequestName("v1_tenant_config"))
1815 0 : })
1816 0 : .put("/v1/tenant/:tenant_shard_id/location_config", |r| {
1817 0 : tenant_service_handler(
1818 0 : r,
1819 0 : handle_tenant_location_config,
1820 0 : RequestName("v1_tenant_location_config"),
1821 0 : )
1822 0 : })
1823 0 : .put("/v1/tenant/:tenant_id/time_travel_remote_storage", |r| {
1824 0 : tenant_service_handler(
1825 0 : r,
1826 0 : handle_tenant_time_travel_remote_storage,
1827 0 : RequestName("v1_tenant_time_travel_remote_storage"),
1828 0 : )
1829 0 : })
1830 0 : .post("/v1/tenant/:tenant_id/secondary/download", |r| {
1831 0 : tenant_service_handler(
1832 0 : r,
1833 0 : handle_tenant_secondary_download,
1834 0 : RequestName("v1_tenant_secondary_download"),
1835 0 : )
1836 0 : })
1837 0 : // Timeline operations
1838 0 : .delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
1839 0 : tenant_service_handler(
1840 0 : r,
1841 0 : handle_tenant_timeline_delete,
1842 0 : RequestName("v1_tenant_timeline"),
1843 0 : )
1844 0 : })
1845 0 : .post("/v1/tenant/:tenant_id/timeline", |r| {
1846 0 : tenant_service_handler(
1847 0 : r,
1848 0 : handle_tenant_timeline_create,
1849 0 : RequestName("v1_tenant_timeline"),
1850 0 : )
1851 0 : })
1852 0 : .put(
1853 0 : "/v1/tenant/:tenant_id/timeline/:timeline_id/archival_config",
1854 0 : |r| {
1855 0 : tenant_service_handler(
1856 0 : r,
1857 0 : handle_tenant_timeline_archival_config,
1858 0 : RequestName("v1_tenant_timeline_archival_config"),
1859 0 : )
1860 0 : },
1861 0 : )
1862 0 : .put(
1863 0 : "/v1/tenant/:tenant_id/timeline/:timeline_id/detach_ancestor",
1864 0 : |r| {
1865 0 : tenant_service_handler(
1866 0 : r,
1867 0 : handle_tenant_timeline_detach_ancestor,
1868 0 : RequestName("v1_tenant_timeline_detach_ancestor"),
1869 0 : )
1870 0 : },
1871 0 : )
1872 0 : .post(
1873 0 : "/v1/tenant/:tenant_id/timeline/:timeline_id/block_gc",
1874 0 : |r| {
1875 0 : tenant_service_handler(
1876 0 : r,
1877 0 : |s, r| handle_tenant_timeline_block_unblock_gc(s, r, BlockUnblock::Block),
1878 0 : RequestName("v1_tenant_timeline_block_unblock_gc"),
1879 0 : )
1880 0 : },
1881 0 : )
1882 0 : .post(
1883 0 : "/v1/tenant/:tenant_id/timeline/:timeline_id/unblock_gc",
1884 0 : |r| {
1885 0 : tenant_service_handler(
1886 0 : r,
1887 0 : |s, r| handle_tenant_timeline_block_unblock_gc(s, r, BlockUnblock::Unblock),
1888 0 : RequestName("v1_tenant_timeline_block_unblock_gc"),
1889 0 : )
1890 0 : },
1891 0 : )
1892 0 : // Tenant detail GET passthrough to shard zero:
1893 0 : .get("/v1/tenant/:tenant_id", |r| {
1894 0 : tenant_service_handler(
1895 0 : r,
1896 0 : handle_tenant_timeline_passthrough,
1897 0 : RequestName("v1_tenant_passthrough"),
1898 0 : )
1899 0 : })
1900 0 : // The `*` in the URL is a wildcard: any tenant/timeline GET APIs on the pageserver
1901 0 : // are implicitly exposed here. This must be last in the list to avoid
1902 0 : // taking precedence over other GET methods we might implement by hand.
1903 0 : .get("/v1/tenant/:tenant_id/*", |r| {
1904 0 : tenant_service_handler(
1905 0 : r,
1906 0 : handle_tenant_timeline_passthrough,
1907 0 : RequestName("v1_tenant_passthrough"),
1908 0 : )
1909 0 : })
1910 0 : }
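
// --- Illustrative example (editorial addition, not part of the original source) ---
// One possible way to serve the router built by `make_router`, assuming routerify's
// `RouterService` and a hyper 0.14 server. The bind address is a placeholder; the
// real binary may wire this up differently (listener reuse, TLS, graceful shutdown).
async fn serve_router_example(
    service: Arc<Service>,
    auth: Option<Arc<SwappableJwtAuth>>,
    build_info: BuildInfo,
) -> anyhow::Result<()> {
    // Finish the builder into a routerify Router, then wrap it in a hyper-compatible service.
    let router = make_router(service, auth, build_info)
        .build()
        .map_err(|e| anyhow::anyhow!("building router: {e}"))?;
    let router_service = routerify::RouterService::new(router)
        .map_err(|e| anyhow::anyhow!("creating router service: {e}"))?;

    let addr: std::net::SocketAddr = "127.0.0.1:1234".parse()?;
    hyper::Server::bind(&addr).serve(router_service).await?;
    Ok(())
}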
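
// --- Illustrative example (editorial addition, not part of the original source) ---
// Because the ^/v1/ routes mirror the pageserver HTTP API, a shard-naive client can
// issue the same request it would send to a pageserver directly. A GET on
// /v1/tenant/{tenant_id} is routed to `handle_tenant_timeline_passthrough`, which
// proxies to shard zero. The base URL, tenant id and JWT below are placeholders.
async fn virtual_pageserver_get_example(
    base_url: &str,
    tenant_id: &str,
    jwt: &str,
) -> anyhow::Result<()> {
    let client = reqwest::Client::new();
    let resp = client
        .get(format!("{base_url}/v1/tenant/{tenant_id}"))
        .bearer_auth(jwt)
        .send()
        .await?;
    anyhow::ensure!(
        resp.status().is_success(),
        "unexpected status: {}",
        resp.status()
    );
    println!("{}", resp.text().await?);
    Ok(())
}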