use crate::reconciler::ReconcileError;
use crate::service::{Service, STARTUP_RECONCILE_TIMEOUT};
use hyper::{Body, Request, Response};
use hyper::{StatusCode, Uri};
use pageserver_api::models::{
    TenantCreateRequest, TenantLocationConfigRequest, TenantShardSplitRequest,
    TimelineCreateRequest,
};
use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api;
use std::sync::Arc;
use std::time::{Duration, Instant};
use utils::auth::SwappableJwtAuth;
use utils::http::endpoint::{auth_middleware, request_span};
use utils::http::request::parse_request_param;
use utils::id::{TenantId, TimelineId};

use utils::{
    http::{
        endpoint::{self},
        error::ApiError,
        json::{json_request, json_response},
        RequestExt, RouterBuilder,
    },
    id::NodeId,
};

use pageserver_api::control_api::{ReAttachRequest, ValidateRequest};

use control_plane::attachment_service::{
    AttachHookRequest, InspectRequest, NodeConfigureRequest, NodeRegisterRequest,
    TenantShardMigrateRequest,
};

/// State available to HTTP request handlers
#[derive(Clone)]
pub struct HttpState {
    service: Arc<crate::service::Service>,
    auth: Option<Arc<SwappableJwtAuth>>,
    allowlist_routes: Vec<Uri>,
}

impl HttpState {
    pub fn new(service: Arc<crate::service::Service>, auth: Option<Arc<SwappableJwtAuth>>) -> Self {
        let allowlist_routes = ["/status", "/ready", "/metrics"]
            .iter()
            .map(|v| v.parse().unwrap())
            .collect::<Vec<_>>();
        Self {
            service,
            auth,
            allowlist_routes,
        }
    }
}

#[inline(always)]
fn get_state(request: &Request<Body>) -> &HttpState {
    request
        .data::<Arc<HttpState>>()
        .expect("unknown state type")
        .as_ref()
}

/// Pageserver calls into this on startup, to learn which tenants it should attach
async fn handle_re_attach(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
    let reattach_req = json_request::<ReAttachRequest>(&mut req).await?;
    let state = get_state(&req);
    json_response(
        StatusCode::OK,
        state
            .service
            .re_attach(reattach_req)
            .await
            .map_err(ApiError::InternalServerError)?,
    )
}

/// Pageserver calls into this before doing deletions, to confirm that it still
/// holds the latest generation for the tenants with deletions enqueued
async fn handle_validate(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
    let validate_req = json_request::<ValidateRequest>(&mut req).await?;
    let state = get_state(&req);
    json_response(StatusCode::OK, state.service.validate(validate_req))
}

/// Call into this before attaching a tenant to a pageserver, to acquire a generation number
/// (in the real control plane this is unnecessary, because the same program is managing
/// generation numbers and doing attachments).
async fn handle_attach_hook(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
    let attach_req = json_request::<AttachHookRequest>(&mut req).await?;
    let state = get_state(&req);

    json_response(
        StatusCode::OK,
        state
            .service
            .attach_hook(attach_req)
            .await
            .map_err(ApiError::InternalServerError)?,
    )
}

async fn handle_inspect(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
    let inspect_req = json_request::<InspectRequest>(&mut req).await?;

    let state = get_state(&req);

    json_response(StatusCode::OK, state.service.inspect(inspect_req))
}

async fn handle_tenant_create(
    service: Arc<Service>,
    mut req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
    let create_req = json_request::<TenantCreateRequest>(&mut req).await?;
    json_response(StatusCode::OK, service.tenant_create(create_req).await?)
}

// For tenant and timeline deletions, which both implement an "initially return 202, then 404 once
// we're done" semantic, we wrap with a retry loop to expose a simpler API upstream. This avoids
// needing to track a "deleting" state for tenants.
async fn deletion_wrapper<R, F>(service: Arc<Service>, f: F) -> Result<Response<Body>, ApiError>
where
    R: std::future::Future<Output = Result<StatusCode, ApiError>> + Send + 'static,
    F: Fn(Arc<Service>) -> R + Send + Sync + 'static,
{
    let started_at = Instant::now();
    // To keep deletion reasonably snappy for small tenants, initially check after 1 second if deletion
    // completed.
    let mut retry_period = Duration::from_secs(1);
    // On subsequent retries, wait longer.
    let max_retry_period = Duration::from_secs(5);
    // Enable callers with a 30 second request timeout to reliably get a response
    let max_wait = Duration::from_secs(25);

    loop {
        let status = f(service.clone()).await?;
        match status {
            StatusCode::ACCEPTED => {
                tracing::info!("Deletion accepted, waiting to try again...");
                tokio::time::sleep(retry_period).await;
                retry_period = max_retry_period;
            }
            StatusCode::NOT_FOUND => {
                tracing::info!("Deletion complete");
                return json_response(StatusCode::OK, ());
            }
            _ => {
                tracing::warn!("Unexpected status {status}");
                return json_response(status, ());
            }
        }

        let now = Instant::now();
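        // Bail out if another retry cycle (roughly one more retry_period) would take us
        // past max_wait, so callers get a response within their request timeout.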
        if now + retry_period > started_at + max_wait {
            tracing::info!("Deletion timed out waiting for 404");
            // REQUEST_TIMEOUT would be more appropriate, but CONFLICT is already part of
            // the pageserver's swagger definition for this endpoint, and has the same desired
            // effect of causing the control plane to retry later.
            return json_response(StatusCode::CONFLICT, ());
        }
    }
}

async fn handle_tenant_location_config(
    service: Arc<Service>,
    mut req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
    let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
    let config_req = json_request::<TenantLocationConfigRequest>(&mut req).await?;
    json_response(
        StatusCode::OK,
        service
            .tenant_location_config(tenant_id, config_req)
            .await?,
    )
}

async fn handle_tenant_delete(
    service: Arc<Service>,
    req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
    let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;

    deletion_wrapper(service, move |service| async move {
        service.tenant_delete(tenant_id).await
    })
    .await
}

async fn handle_tenant_timeline_create(
    service: Arc<Service>,
    mut req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
    let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
    let create_req = json_request::<TimelineCreateRequest>(&mut req).await?;
    json_response(
        StatusCode::OK,
        service
            .tenant_timeline_create(tenant_id, create_req)
            .await?,
    )
}

async fn handle_tenant_timeline_delete(
    service: Arc<Service>,
    req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
    let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
    let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;

    deletion_wrapper(service, move |service| async move {
        service.tenant_timeline_delete(tenant_id, timeline_id).await
    })
    .await
}

async fn handle_tenant_timeline_passthrough(
    service: Arc<Service>,
    req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
    let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;

    let Some(path) = req.uri().path_and_query() else {
        // This should never happen: our request router only calls us if there is a path
        return Err(ApiError::BadRequest(anyhow::anyhow!("Missing path")));
    };

    tracing::info!("Proxying request for tenant {} ({})", tenant_id, path);

    // Find the node that holds shard zero
    let (base_url, tenant_shard_id) = service.tenant_shard0_baseurl(tenant_id)?;

    // Callers will always pass an unsharded tenant ID. Before proxying, we must
    // rewrite this to a shard-aware shard zero ID.
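    // (Illustrative: for a four-shard tenant, a path like `/v1/tenant/<tenant_id>/timeline`
    // would become `/v1/tenant/<tenant_id>-0004/timeline`, where `0004` encodes shard
    // number zero and a shard count of four in TenantShardId's string form.)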
    let path = format!("{}", path);
    let tenant_str = tenant_id.to_string();
    let tenant_shard_str = format!("{}", tenant_shard_id);
    let path = path.replace(&tenant_str, &tenant_shard_str);

    let client = mgmt_api::Client::new(base_url, service.get_config().jwt_token.as_deref());
    let resp = client.get_raw(path).await.map_err(|_e|
        // FIXME: give ApiError a proper Unavailable variant. We return 503 here because
        // if we can't successfully send a request to the pageserver, we aren't available.
        ApiError::ShuttingDown)?;

    // We have a reqwest::Response, and would like an http::Response
    let mut builder = hyper::Response::builder()
        .status(resp.status())
        .version(resp.version());
    for (k, v) in resp.headers() {
        builder = builder.header(k, v);
    }

    let response = builder
        .body(Body::wrap_stream(resp.bytes_stream()))
        .map_err(|e| ApiError::InternalServerError(e.into()))?;

    Ok(response)
}

async fn handle_tenant_locate(
    service: Arc<Service>,
    req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
    let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
    json_response(StatusCode::OK, service.tenant_locate(tenant_id)?)
}

async fn handle_node_register(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
    let register_req = json_request::<NodeRegisterRequest>(&mut req).await?;
    let state = get_state(&req);
    state.service.node_register(register_req).await?;
    json_response(StatusCode::OK, ())
}

async fn handle_node_list(req: Request<Body>) -> Result<Response<Body>, ApiError> {
    let state = get_state(&req);
    json_response(StatusCode::OK, state.service.node_list().await?)
}

async fn handle_node_drop(req: Request<Body>) -> Result<Response<Body>, ApiError> {
    let state = get_state(&req);
    let node_id: NodeId = parse_request_param(&req, "node_id")?;
    json_response(StatusCode::OK, state.service.node_drop(node_id).await?)
}

async fn handle_node_configure(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
    let node_id: NodeId = parse_request_param(&req, "node_id")?;
    let config_req = json_request::<NodeConfigureRequest>(&mut req).await?;
    if node_id != config_req.node_id {
        return Err(ApiError::BadRequest(anyhow::anyhow!(
            "Path and body node_id differ"
        )));
    }
    let state = get_state(&req);

    json_response(StatusCode::OK, state.service.node_configure(config_req)?)
}

async fn handle_tenant_shard_split(
    service: Arc<Service>,
    mut req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
    let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
    let split_req = json_request::<TenantShardSplitRequest>(&mut req).await?;

    json_response(
        StatusCode::OK,
        service.tenant_shard_split(tenant_id, split_req).await?,
    )
}

async fn handle_tenant_shard_migrate(
    service: Arc<Service>,
    mut req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
    let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
    let migrate_req = json_request::<TenantShardMigrateRequest>(&mut req).await?;
    json_response(
        StatusCode::OK,
        service
            .tenant_shard_migrate(tenant_shard_id, migrate_req)
            .await?,
    )
}

async fn handle_tenant_drop(req: Request<Body>) -> Result<Response<Body>, ApiError> {
    let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
    let state = get_state(&req);

    json_response(StatusCode::OK, state.service.tenant_drop(tenant_id).await?)
}

/// Status endpoint is just used for checking that our HTTP listener is up
async fn handle_status(_req: Request<Body>) -> Result<Response<Body>, ApiError> {
    json_response(StatusCode::OK, ())
}

/// Readiness endpoint indicates when we're done doing startup I/O (e.g. reconciling
/// with remote pageserver nodes). This is intended for use as a kubernetes readiness probe.
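/// Returns 200 once `startup_complete` is signalled and 503 while startup reconciliation
/// is still in progress, so probes simply retry until the service is ready.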
async fn handle_ready(req: Request<Body>) -> Result<Response<Body>, ApiError> {
    let state = get_state(&req);
    if state.service.startup_complete.is_ready() {
        json_response(StatusCode::OK, ())
    } else {
        json_response(StatusCode::SERVICE_UNAVAILABLE, ())
    }
}

impl From<ReconcileError> for ApiError {
    fn from(value: ReconcileError) -> Self {
        ApiError::Conflict(format!("Reconciliation error: {}", value))
    }
}

/// Common wrapper for request handlers that call into Service and will operate on tenants: they must only
/// be allowed to run if Service has finished its initial reconciliation.
async fn tenant_service_handler<R, H>(request: Request<Body>, handler: H) -> R::Output
where
    R: std::future::Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
    H: FnOnce(Arc<Service>, Request<Body>) -> R + Send + Sync + 'static,
{
    let state = get_state(&request);
    let service = state.service.clone();

    let startup_complete = service.startup_complete.clone();
    if tokio::time::timeout(STARTUP_RECONCILE_TIMEOUT, startup_complete.wait())
        .await
        .is_err()
    {
        // This shouldn't happen: it is the responsibility of [`Service::startup_reconcile`] to use appropriate
        // timeouts around its remote calls, to bound its runtime.
        return Err(ApiError::Timeout(
            "Timed out waiting for service readiness".into(),
        ));
    }

    request_span(
        request,
        |request| async move { handler(service, request).await },
    )
    .await
}

pub fn make_router(
    service: Arc<Service>,
    auth: Option<Arc<SwappableJwtAuth>>,
) -> RouterBuilder<hyper::Body, ApiError> {
    let mut router = endpoint::make_router();
    if auth.is_some() {
        router = router.middleware(auth_middleware(|request| {
            let state = get_state(request);
            if state.allowlist_routes.contains(request.uri()) {
                None
            } else {
                state.auth.as_deref()
            }
        }))
    }

    router
        .data(Arc::new(HttpState::new(service, auth)))
        // Non-prefixed generic endpoints (status, metrics)
        .get("/status", |r| request_span(r, handle_status))
        .get("/ready", |r| request_span(r, handle_ready))
        // Upcalls for the pageserver: point the pageserver's `control_plane_api` config to this prefix
        .post("/upcall/v1/re-attach", |r| {
            request_span(r, handle_re_attach)
        })
        .post("/upcall/v1/validate", |r| request_span(r, handle_validate))
        // Test/dev/debug endpoints
        .post("/debug/v1/attach-hook", |r| {
            request_span(r, handle_attach_hook)
        })
        .post("/debug/v1/inspect", |r| request_span(r, handle_inspect))
        .post("/debug/v1/tenant/:tenant_id/drop", |r| {
            request_span(r, handle_tenant_drop)
        })
        .post("/debug/v1/node/:node_id/drop", |r| {
            request_span(r, handle_node_drop)
        })
        .get("/control/v1/tenant/:tenant_id/locate", |r| {
            tenant_service_handler(r, handle_tenant_locate)
        })
        // Node operations
        .post("/control/v1/node", |r| {
            request_span(r, handle_node_register)
        })
        .get("/control/v1/node", |r| request_span(r, handle_node_list))
        .put("/control/v1/node/:node_id/config", |r| {
            request_span(r, handle_node_configure)
        })
        // Tenant Shard operations
        .put("/control/v1/tenant/:tenant_shard_id/migrate", |r| {
            tenant_service_handler(r, handle_tenant_shard_migrate)
        })
        .put("/control/v1/tenant/:tenant_id/shard_split", |r| {
            tenant_service_handler(r, handle_tenant_shard_split)
        })
        // Tenant operations
        // The ^/v1/ endpoints act as a "Virtual Pageserver", enabling shard-naive clients to call into
        // this service to manage tenants that actually consist of many tenant shards, as if they were a single entity.
        .post("/v1/tenant", |r| {
            tenant_service_handler(r, handle_tenant_create)
        })
        .delete("/v1/tenant/:tenant_id", |r| {
            tenant_service_handler(r, handle_tenant_delete)
        })
        .put("/v1/tenant/:tenant_id/location_config", |r| {
            tenant_service_handler(r, handle_tenant_location_config)
        })
        // Timeline operations
        .delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
            tenant_service_handler(r, handle_tenant_timeline_delete)
        })
        .post("/v1/tenant/:tenant_id/timeline", |r| {
            tenant_service_handler(r, handle_tenant_timeline_create)
        })
        // Tenant detail GET passthrough to shard zero
        .get("/v1/tenant/:tenant_id", |r| {
            tenant_service_handler(r, handle_tenant_timeline_passthrough)
        })
        // Timeline GET passthrough to shard zero. Note that the `*` in the URL is a wildcard: any future
        // timeline GET APIs will be implicitly included.
        .get("/v1/tenant/:tenant_id/timeline*", |r| {
            tenant_service_handler(r, handle_tenant_timeline_passthrough)
        })
}
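
// Illustrative only, not used by this module: a minimal sketch of serving this router,
// assuming hyper 0.14 and routerify (the address, `serve` name, and error handling here
// are placeholder assumptions, not this crate's actual startup code).
//
// async fn serve(service: Arc<Service>, auth: Option<Arc<SwappableJwtAuth>>) -> anyhow::Result<()> {
//     let router = make_router(service, auth).build().map_err(|e| anyhow::anyhow!(e))?;
//     let api_service = routerify::RouterService::new(router).map_err(|e| anyhow::anyhow!(e))?;
//     let addr: std::net::SocketAddr = "127.0.0.1:1234".parse()?;
//     hyper::Server::bind(&addr).serve(api_service).await?;
//     Ok(())
// }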