Line data Source code
1 : use crate::reconciler::ReconcileError;
2 : use crate::service::{Service, STARTUP_RECONCILE_TIMEOUT};
3 : use hyper::{Body, Request, Response};
4 : use hyper::{StatusCode, Uri};
5 : use pageserver_api::models::{
6 : TenantCreateRequest, TenantLocationConfigRequest, TenantShardSplitRequest,
7 : TenantTimeTravelRequest, TimelineCreateRequest,
8 : };
9 : use pageserver_api::shard::TenantShardId;
10 : use pageserver_client::mgmt_api;
11 : use std::sync::Arc;
12 : use std::time::{Duration, Instant};
13 : use utils::auth::SwappableJwtAuth;
14 : use utils::http::endpoint::{auth_middleware, request_span};
15 : use utils::http::request::{must_get_query_param, parse_request_param};
16 : use utils::id::{TenantId, TimelineId};
17 :
18 : use utils::{
19 : http::{
20 : endpoint::{self},
21 : error::ApiError,
22 : json::{json_request, json_response},
23 : RequestExt, RouterBuilder,
24 : },
25 : id::NodeId,
26 : };
27 :
28 : use pageserver_api::control_api::{ReAttachRequest, ValidateRequest};
29 :
30 : use control_plane::attachment_service::{
31 : AttachHookRequest, InspectRequest, NodeConfigureRequest, NodeRegisterRequest,
32 : TenantShardMigrateRequest,
33 : };
34 :
35 : /// State available to HTTP request handlers
36 0 : #[derive(Clone)]
37 : pub struct HttpState {
38 : service: Arc<crate::service::Service>,
39 : auth: Option<Arc<SwappableJwtAuth>>,
40 : allowlist_routes: Vec<Uri>,
41 : }
42 :
43 : impl HttpState {
44 0 : pub fn new(service: Arc<crate::service::Service>, auth: Option<Arc<SwappableJwtAuth>>) -> Self {
45 0 : let allowlist_routes = ["/status", "/ready", "/metrics"]
46 0 : .iter()
47 0 : .map(|v| v.parse().unwrap())
48 0 : .collect::<Vec<_>>();
49 0 : Self {
50 0 : service,
51 0 : auth,
52 0 : allowlist_routes,
53 0 : }
54 0 : }
55 : }
56 :
57 : #[inline(always)]
58 0 : fn get_state(request: &Request<Body>) -> &HttpState {
59 0 : request
60 0 : .data::<Arc<HttpState>>()
61 0 : .expect("unknown state type")
62 0 : .as_ref()
63 0 : }
64 :
65 : /// Pageserver calls into this on startup, to learn which tenants it should attach
66 0 : async fn handle_re_attach(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
67 0 : let reattach_req = json_request::<ReAttachRequest>(&mut req).await?;
68 0 : let state = get_state(&req);
69 0 : json_response(StatusCode::OK, state.service.re_attach(reattach_req).await?)
70 0 : }
71 :
72 : /// Pageserver calls into this before doing deletions, to confirm that it still
73 : /// holds the latest generation for the tenants with deletions enqueued
74 0 : async fn handle_validate(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
75 0 : let validate_req = json_request::<ValidateRequest>(&mut req).await?;
76 0 : let state = get_state(&req);
77 0 : json_response(StatusCode::OK, state.service.validate(validate_req))
78 0 : }
79 :
80 : /// Call into this before attaching a tenant to a pageserver, to acquire a generation number
81 : /// (in the real control plane this is unnecessary, because the same program is managing
82 : /// generation numbers and doing attachments).
83 0 : async fn handle_attach_hook(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
84 0 : let attach_req = json_request::<AttachHookRequest>(&mut req).await?;
85 0 : let state = get_state(&req);
86 0 :
87 0 : json_response(
88 0 : StatusCode::OK,
89 0 : state
90 0 : .service
91 0 : .attach_hook(attach_req)
92 0 : .await
93 0 : .map_err(ApiError::InternalServerError)?,
94 : )
95 0 : }
96 :
97 0 : async fn handle_inspect(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
98 0 : let inspect_req = json_request::<InspectRequest>(&mut req).await?;
99 :
100 0 : let state = get_state(&req);
101 0 :
102 0 : json_response(StatusCode::OK, state.service.inspect(inspect_req))
103 0 : }
104 :
105 0 : async fn handle_tenant_create(
106 0 : service: Arc<Service>,
107 0 : mut req: Request<Body>,
108 0 : ) -> Result<Response<Body>, ApiError> {
109 0 : let create_req = json_request::<TenantCreateRequest>(&mut req).await?;
110 : json_response(
111 : StatusCode::CREATED,
112 0 : service.tenant_create(create_req).await?,
113 : )
114 0 : }
115 :
116 : // For tenant and timeline deletions, which both implement an "initially return 202, then 404 once
117 : // we're done" semantic, we wrap with a retry loop to expose a simpler API upstream. This avoids
118 : // needing to track a "deleting" state for tenants.
119 0 : async fn deletion_wrapper<R, F>(service: Arc<Service>, f: F) -> Result<Response<Body>, ApiError>
120 0 : where
121 0 : R: std::future::Future<Output = Result<StatusCode, ApiError>> + Send + 'static,
122 0 : F: Fn(Arc<Service>) -> R + Send + Sync + 'static,
123 0 : {
124 0 : let started_at = Instant::now();
125 0 : // To keep deletion reasonably snappy for small tenants, initially check after 1 second if deletion
126 0 : // completed.
127 0 : let mut retry_period = Duration::from_secs(1);
128 0 : // On subsequent retries, wait longer.
129 0 : let max_retry_period = Duration::from_secs(5);
130 0 : // Enable callers with a 30 second request timeout to reliably get a response
131 0 : let max_wait = Duration::from_secs(25);
132 :
133 : loop {
134 0 : let status = f(service.clone()).await?;
135 0 : match status {
136 : StatusCode::ACCEPTED => {
137 0 : tracing::info!("Deletion accepted, waiting to try again...");
138 0 : tokio::time::sleep(retry_period).await;
139 0 : retry_period = max_retry_period;
140 : }
141 : StatusCode::NOT_FOUND => {
142 0 : tracing::info!("Deletion complete");
143 0 : return json_response(StatusCode::OK, ());
144 : }
145 : _ => {
146 0 : tracing::warn!("Unexpected status {status}");
147 0 : return json_response(status, ());
148 : }
149 : }
150 :
151 0 : let now = Instant::now();
152 0 : if now + retry_period > started_at + max_wait {
153 0 : tracing::info!("Deletion timed out waiting for 404");
154 : // REQUEST_TIMEOUT would be more appropriate, but CONFLICT is already part of
155 : // the pageserver's swagger definition for this endpoint, and has the same desired
156 : // effect of causing the control plane to retry later.
157 0 : return json_response(StatusCode::CONFLICT, ());
158 0 : }
159 : }
160 0 : }
161 :
162 0 : async fn handle_tenant_location_config(
163 0 : service: Arc<Service>,
164 0 : mut req: Request<Body>,
165 0 : ) -> Result<Response<Body>, ApiError> {
166 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
167 0 : let config_req = json_request::<TenantLocationConfigRequest>(&mut req).await?;
168 : json_response(
169 : StatusCode::OK,
170 0 : service
171 0 : .tenant_location_config(tenant_id, config_req)
172 0 : .await?,
173 : )
174 0 : }
175 :
176 0 : async fn handle_tenant_time_travel_remote_storage(
177 0 : service: Arc<Service>,
178 0 : mut req: Request<Body>,
179 0 : ) -> Result<Response<Body>, ApiError> {
180 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
181 0 : let time_travel_req = json_request::<TenantTimeTravelRequest>(&mut req).await?;
182 :
183 0 : let timestamp_raw = must_get_query_param(&req, "travel_to")?;
184 0 : let _timestamp = humantime::parse_rfc3339(×tamp_raw).map_err(|_e| {
185 0 : ApiError::BadRequest(anyhow::anyhow!(
186 0 : "Invalid time for travel_to: {timestamp_raw:?}"
187 0 : ))
188 0 : })?;
189 :
190 0 : let done_if_after_raw = must_get_query_param(&req, "done_if_after")?;
191 0 : let _done_if_after = humantime::parse_rfc3339(&done_if_after_raw).map_err(|_e| {
192 0 : ApiError::BadRequest(anyhow::anyhow!(
193 0 : "Invalid time for done_if_after: {done_if_after_raw:?}"
194 0 : ))
195 0 : })?;
196 :
197 0 : service
198 0 : .tenant_time_travel_remote_storage(
199 0 : &time_travel_req,
200 0 : tenant_id,
201 0 : timestamp_raw,
202 0 : done_if_after_raw,
203 0 : )
204 0 : .await?;
205 :
206 0 : json_response(StatusCode::OK, ())
207 0 : }
208 :
209 0 : async fn handle_tenant_delete(
210 0 : service: Arc<Service>,
211 0 : req: Request<Body>,
212 0 : ) -> Result<Response<Body>, ApiError> {
213 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
214 :
215 0 : deletion_wrapper(service, move |service| async move {
216 0 : service.tenant_delete(tenant_id).await
217 0 : })
218 0 : .await
219 0 : }
220 :
221 0 : async fn handle_tenant_timeline_create(
222 0 : service: Arc<Service>,
223 0 : mut req: Request<Body>,
224 0 : ) -> Result<Response<Body>, ApiError> {
225 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
226 0 : let create_req = json_request::<TimelineCreateRequest>(&mut req).await?;
227 : json_response(
228 : StatusCode::CREATED,
229 0 : service
230 0 : .tenant_timeline_create(tenant_id, create_req)
231 0 : .await?,
232 : )
233 0 : }
234 :
235 0 : async fn handle_tenant_timeline_delete(
236 0 : service: Arc<Service>,
237 0 : req: Request<Body>,
238 0 : ) -> Result<Response<Body>, ApiError> {
239 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
240 0 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
241 :
242 0 : deletion_wrapper(service, move |service| async move {
243 0 : service.tenant_timeline_delete(tenant_id, timeline_id).await
244 0 : })
245 0 : .await
246 0 : }
247 :
248 0 : async fn handle_tenant_timeline_passthrough(
249 0 : service: Arc<Service>,
250 0 : req: Request<Body>,
251 0 : ) -> Result<Response<Body>, ApiError> {
252 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
253 :
254 0 : let Some(path) = req.uri().path_and_query() else {
255 : // This should never happen, our request router only calls us if there is a path
256 0 : return Err(ApiError::BadRequest(anyhow::anyhow!("Missing path")));
257 : };
258 :
259 0 : tracing::info!("Proxying request for tenant {} ({})", tenant_id, path);
260 :
261 : // Find the node that holds shard zero
262 0 : let (base_url, tenant_shard_id) = service.tenant_shard0_baseurl(tenant_id)?;
263 :
264 : // Callers will always pass an unsharded tenant ID. Before proxying, we must
265 : // rewrite this to a shard-aware shard zero ID.
266 0 : let path = format!("{}", path);
267 0 : let tenant_str = tenant_id.to_string();
268 0 : let tenant_shard_str = format!("{}", tenant_shard_id);
269 0 : let path = path.replace(&tenant_str, &tenant_shard_str);
270 0 :
271 0 : let client = mgmt_api::Client::new(base_url, service.get_config().jwt_token.as_deref());
272 0 : let resp = client.get_raw(path).await.map_err(|_e|
273 : // FIXME: give APiError a proper Unavailable variant. We return 503 here because
274 : // if we can't successfully send a request to the pageserver, we aren't available.
275 0 : ApiError::ShuttingDown)?;
276 :
277 : // We have a reqest::Response, would like a http::Response
278 0 : let mut builder = hyper::Response::builder()
279 0 : .status(resp.status())
280 0 : .version(resp.version());
281 0 : for (k, v) in resp.headers() {
282 0 : builder = builder.header(k, v);
283 0 : }
284 :
285 0 : let response = builder
286 0 : .body(Body::wrap_stream(resp.bytes_stream()))
287 0 : .map_err(|e| ApiError::InternalServerError(e.into()))?;
288 :
289 0 : Ok(response)
290 0 : }
291 :
292 0 : async fn handle_tenant_locate(
293 0 : service: Arc<Service>,
294 0 : req: Request<Body>,
295 0 : ) -> Result<Response<Body>, ApiError> {
296 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
297 0 : json_response(StatusCode::OK, service.tenant_locate(tenant_id)?)
298 0 : }
299 :
300 0 : async fn handle_node_register(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
301 0 : let register_req = json_request::<NodeRegisterRequest>(&mut req).await?;
302 0 : let state = get_state(&req);
303 0 : state.service.node_register(register_req).await?;
304 0 : json_response(StatusCode::OK, ())
305 0 : }
306 :
307 0 : async fn handle_node_list(req: Request<Body>) -> Result<Response<Body>, ApiError> {
308 0 : let state = get_state(&req);
309 0 : json_response(StatusCode::OK, state.service.node_list().await?)
310 0 : }
311 :
312 0 : async fn handle_node_drop(req: Request<Body>) -> Result<Response<Body>, ApiError> {
313 0 : let state = get_state(&req);
314 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
315 0 : json_response(StatusCode::OK, state.service.node_drop(node_id).await?)
316 0 : }
317 :
318 0 : async fn handle_node_configure(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
319 0 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
320 0 : let config_req = json_request::<NodeConfigureRequest>(&mut req).await?;
321 0 : if node_id != config_req.node_id {
322 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
323 0 : "Path and body node_id differ"
324 0 : )));
325 0 : }
326 0 : let state = get_state(&req);
327 0 :
328 0 : json_response(
329 0 : StatusCode::OK,
330 0 : state.service.node_configure(config_req).await?,
331 : )
332 0 : }
333 :
334 0 : async fn handle_tenant_shard_split(
335 0 : service: Arc<Service>,
336 0 : mut req: Request<Body>,
337 0 : ) -> Result<Response<Body>, ApiError> {
338 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
339 0 : let split_req = json_request::<TenantShardSplitRequest>(&mut req).await?;
340 :
341 : json_response(
342 : StatusCode::OK,
343 0 : service.tenant_shard_split(tenant_id, split_req).await?,
344 : )
345 0 : }
346 :
347 0 : async fn handle_tenant_shard_migrate(
348 0 : service: Arc<Service>,
349 0 : mut req: Request<Body>,
350 0 : ) -> Result<Response<Body>, ApiError> {
351 0 : let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
352 0 : let migrate_req = json_request::<TenantShardMigrateRequest>(&mut req).await?;
353 : json_response(
354 : StatusCode::OK,
355 0 : service
356 0 : .tenant_shard_migrate(tenant_shard_id, migrate_req)
357 0 : .await?,
358 : )
359 0 : }
360 :
361 0 : async fn handle_tenant_drop(req: Request<Body>) -> Result<Response<Body>, ApiError> {
362 0 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
363 0 : let state = get_state(&req);
364 0 :
365 0 : json_response(StatusCode::OK, state.service.tenant_drop(tenant_id).await?)
366 0 : }
367 :
368 0 : async fn handle_tenants_dump(req: Request<Body>) -> Result<Response<Body>, ApiError> {
369 0 : let state = get_state(&req);
370 0 : state.service.tenants_dump()
371 0 : }
372 :
373 0 : async fn handle_scheduler_dump(req: Request<Body>) -> Result<Response<Body>, ApiError> {
374 0 : let state = get_state(&req);
375 0 : state.service.scheduler_dump()
376 0 : }
377 :
378 0 : async fn handle_consistency_check(req: Request<Body>) -> Result<Response<Body>, ApiError> {
379 0 : let state = get_state(&req);
380 0 :
381 0 : json_response(StatusCode::OK, state.service.consistency_check().await?)
382 0 : }
383 :
384 : /// Status endpoint is just used for checking that our HTTP listener is up
385 0 : async fn handle_status(_req: Request<Body>) -> Result<Response<Body>, ApiError> {
386 0 : json_response(StatusCode::OK, ())
387 0 : }
388 :
389 : /// Readiness endpoint indicates when we're done doing startup I/O (e.g. reconciling
390 : /// with remote pageserver nodes). This is intended for use as a kubernetes readiness probe.
391 0 : async fn handle_ready(req: Request<Body>) -> Result<Response<Body>, ApiError> {
392 0 : let state = get_state(&req);
393 0 : if state.service.startup_complete.is_ready() {
394 0 : json_response(StatusCode::OK, ())
395 : } else {
396 0 : json_response(StatusCode::SERVICE_UNAVAILABLE, ())
397 : }
398 0 : }
399 :
400 : impl From<ReconcileError> for ApiError {
401 0 : fn from(value: ReconcileError) -> Self {
402 0 : ApiError::Conflict(format!("Reconciliation error: {}", value))
403 0 : }
404 : }
405 :
406 : /// Common wrapper for request handlers that call into Service and will operate on tenants: they must only
407 : /// be allowed to run if Service has finished its initial reconciliation.
408 0 : async fn tenant_service_handler<R, H>(request: Request<Body>, handler: H) -> R::Output
409 0 : where
410 0 : R: std::future::Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
411 0 : H: FnOnce(Arc<Service>, Request<Body>) -> R + Send + Sync + 'static,
412 0 : {
413 0 : let state = get_state(&request);
414 0 : let service = state.service.clone();
415 0 :
416 0 : let startup_complete = service.startup_complete.clone();
417 0 : if tokio::time::timeout(STARTUP_RECONCILE_TIMEOUT, startup_complete.wait())
418 0 : .await
419 0 : .is_err()
420 : {
421 : // This shouldn't happen: it is the responsibilty of [`Service::startup_reconcile`] to use appropriate
422 : // timeouts around its remote calls, to bound its runtime.
423 0 : return Err(ApiError::Timeout(
424 0 : "Timed out waiting for service readiness".into(),
425 0 : ));
426 0 : }
427 0 :
428 0 : request_span(
429 0 : request,
430 0 : |request| async move { handler(service, request).await },
431 0 : )
432 0 : .await
433 0 : }
434 :
435 0 : pub fn make_router(
436 0 : service: Arc<Service>,
437 0 : auth: Option<Arc<SwappableJwtAuth>>,
438 0 : ) -> RouterBuilder<hyper::Body, ApiError> {
439 0 : let mut router = endpoint::make_router();
440 0 : if auth.is_some() {
441 0 : router = router.middleware(auth_middleware(|request| {
442 0 : let state = get_state(request);
443 0 : if state.allowlist_routes.contains(request.uri()) {
444 0 : None
445 : } else {
446 0 : state.auth.as_deref()
447 : }
448 0 : }))
449 0 : }
450 :
451 0 : router
452 0 : .data(Arc::new(HttpState::new(service, auth)))
453 0 : // Non-prefixed generic endpoints (status, metrics)
454 0 : .get("/status", |r| request_span(r, handle_status))
455 0 : .get("/ready", |r| request_span(r, handle_ready))
456 0 : // Upcalls for the pageserver: point the pageserver's `control_plane_api` config to this prefix
457 0 : .post("/upcall/v1/re-attach", |r| {
458 0 : request_span(r, handle_re_attach)
459 0 : })
460 0 : .post("/upcall/v1/validate", |r| request_span(r, handle_validate))
461 0 : // Test/dev/debug endpoints
462 0 : .post("/debug/v1/attach-hook", |r| {
463 0 : request_span(r, handle_attach_hook)
464 0 : })
465 0 : .post("/debug/v1/inspect", |r| request_span(r, handle_inspect))
466 0 : .post("/debug/v1/tenant/:tenant_id/drop", |r| {
467 0 : request_span(r, handle_tenant_drop)
468 0 : })
469 0 : .post("/debug/v1/node/:node_id/drop", |r| {
470 0 : request_span(r, handle_node_drop)
471 0 : })
472 0 : .get("/debug/v1/tenant", |r| request_span(r, handle_tenants_dump))
473 0 : .get("/debug/v1/scheduler", |r| {
474 0 : request_span(r, handle_scheduler_dump)
475 0 : })
476 0 : .post("/debug/v1/consistency_check", |r| {
477 0 : request_span(r, handle_consistency_check)
478 0 : })
479 0 : .get("/control/v1/tenant/:tenant_id/locate", |r| {
480 0 : tenant_service_handler(r, handle_tenant_locate)
481 0 : })
482 0 : // Node operations
483 0 : .post("/control/v1/node", |r| {
484 0 : request_span(r, handle_node_register)
485 0 : })
486 0 : .get("/control/v1/node", |r| request_span(r, handle_node_list))
487 0 : .put("/control/v1/node/:node_id/config", |r| {
488 0 : request_span(r, handle_node_configure)
489 0 : })
490 0 : // Tenant Shard operations
491 0 : .put("/control/v1/tenant/:tenant_shard_id/migrate", |r| {
492 0 : tenant_service_handler(r, handle_tenant_shard_migrate)
493 0 : })
494 0 : .put("/control/v1/tenant/:tenant_id/shard_split", |r| {
495 0 : tenant_service_handler(r, handle_tenant_shard_split)
496 0 : })
497 0 : // Tenant operations
498 0 : // The ^/v1/ endpoints act as a "Virtual Pageserver", enabling shard-naive clients to call into
499 0 : // this service to manage tenants that actually consist of many tenant shards, as if they are a single entity.
500 0 : .post("/v1/tenant", |r| {
501 0 : tenant_service_handler(r, handle_tenant_create)
502 0 : })
503 0 : .delete("/v1/tenant/:tenant_id", |r| {
504 0 : tenant_service_handler(r, handle_tenant_delete)
505 0 : })
506 0 : .put("/v1/tenant/:tenant_id/location_config", |r| {
507 0 : tenant_service_handler(r, handle_tenant_location_config)
508 0 : })
509 0 : .put("/v1/tenant/:tenant_id/time_travel_remote_storage", |r| {
510 0 : tenant_service_handler(r, handle_tenant_time_travel_remote_storage)
511 0 : })
512 0 : // Timeline operations
513 0 : .delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
514 0 : tenant_service_handler(r, handle_tenant_timeline_delete)
515 0 : })
516 0 : .post("/v1/tenant/:tenant_id/timeline", |r| {
517 0 : tenant_service_handler(r, handle_tenant_timeline_create)
518 0 : })
519 0 : // Tenant detail GET passthrough to shard zero
520 0 : .get("/v1/tenant/:tenant_id", |r| {
521 0 : tenant_service_handler(r, handle_tenant_timeline_passthrough)
522 0 : })
523 0 : // Timeline GET passthrough to shard zero. Note that the `*` in the URL is a wildcard: any future
524 0 : // timeline GET APIs will be implicitly included.
525 0 : .get("/v1/tenant/:tenant_id/timeline*", |r| {
526 0 : tenant_service_handler(r, handle_tenant_timeline_passthrough)
527 0 : })
528 0 : }
|