Line data Source code
1 : use crate::reconciler::ReconcileError;
2 : use crate::service::{Service, STARTUP_RECONCILE_TIMEOUT};
3 : use hyper::{Body, Request, Response};
4 : use hyper::{StatusCode, Uri};
5 : use pageserver_api::models::{
6 : TenantCreateRequest, TenantLocationConfigRequest, TimelineCreateRequest,
7 : };
8 : use pageserver_api::shard::TenantShardId;
9 : use pageserver_client::mgmt_api;
10 : use std::sync::Arc;
11 : use std::time::{Duration, Instant};
12 : use utils::auth::SwappableJwtAuth;
13 : use utils::http::endpoint::{auth_middleware, request_span};
14 : use utils::http::request::parse_request_param;
15 : use utils::id::{TenantId, TimelineId};
16 :
17 : use utils::{
18 : http::{
19 : endpoint::{self},
20 : error::ApiError,
21 : json::{json_request, json_response},
22 : RequestExt, RouterBuilder,
23 : },
24 : id::NodeId,
25 : };
26 :
27 : use pageserver_api::control_api::{ReAttachRequest, ValidateRequest};
28 :
29 : use control_plane::attachment_service::{
30 : AttachHookRequest, InspectRequest, NodeConfigureRequest, NodeRegisterRequest,
31 : TenantShardMigrateRequest,
32 : };
33 :
34 : /// State available to HTTP request handlers
35 0 : #[derive(Clone)]
36 : pub struct HttpState {
37 : service: Arc<crate::service::Service>,
38 : auth: Option<Arc<SwappableJwtAuth>>,
39 : allowlist_routes: Vec<Uri>,
40 : }
41 :
42 : impl HttpState {
43 361 : pub fn new(service: Arc<crate::service::Service>, auth: Option<Arc<SwappableJwtAuth>>) -> Self {
44 361 : let allowlist_routes = ["/status"]
45 361 : .iter()
46 361 : .map(|v| v.parse().unwrap())
47 361 : .collect::<Vec<_>>();
48 361 : Self {
49 361 : service,
50 361 : auth,
51 361 : allowlist_routes,
52 361 : }
53 361 : }
54 : }
55 :
56 : #[inline(always)]
57 4092 : fn get_state(request: &Request<Body>) -> &HttpState {
58 4092 : request
59 4092 : .data::<Arc<HttpState>>()
60 4092 : .expect("unknown state type")
61 4092 : .as_ref()
62 4092 : }
63 :
64 : /// Pageserver calls into this on startup, to learn which tenants it should attach
65 603 : async fn handle_re_attach(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
66 603 : let reattach_req = json_request::<ReAttachRequest>(&mut req).await?;
67 603 : let state = get_state(&req);
68 603 : json_response(
69 603 : StatusCode::OK,
70 603 : state
71 603 : .service
72 603 : .re_attach(reattach_req)
73 603 : .await
74 603 : .map_err(ApiError::InternalServerError)?,
75 : )
76 603 : }
77 :
78 : /// Pageserver calls into this before doing deletions, to confirm that it still
79 : /// holds the latest generation for the tenants with deletions enqueued
80 411 : async fn handle_validate(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
81 411 : let validate_req = json_request::<ValidateRequest>(&mut req).await?;
82 411 : let state = get_state(&req);
83 411 : json_response(StatusCode::OK, state.service.validate(validate_req))
84 411 : }
85 :
86 : /// Call into this before attaching a tenant to a pageserver, to acquire a generation number
87 : /// (in the real control plane this is unnecessary, because the same program is managing
88 : /// generation numbers and doing attachments).
89 212 : async fn handle_attach_hook(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
90 212 : let attach_req = json_request::<AttachHookRequest>(&mut req).await?;
91 212 : let state = get_state(&req);
92 212 :
93 212 : json_response(
94 212 : StatusCode::OK,
95 212 : state
96 212 : .service
97 212 : .attach_hook(attach_req)
98 236 : .await
99 212 : .map_err(ApiError::InternalServerError)?,
100 : )
101 212 : }
102 :
103 72 : async fn handle_inspect(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
104 72 : let inspect_req = json_request::<InspectRequest>(&mut req).await?;
105 :
106 72 : let state = get_state(&req);
107 72 :
108 72 : json_response(StatusCode::OK, state.service.inspect(inspect_req))
109 72 : }
110 :
111 459 : async fn handle_tenant_create(
112 459 : service: Arc<Service>,
113 459 : mut req: Request<Body>,
114 459 : ) -> Result<Response<Body>, ApiError> {
115 459 : let create_req = json_request::<TenantCreateRequest>(&mut req).await?;
116 927 : json_response(StatusCode::OK, service.tenant_create(create_req).await?)
117 459 : }
118 :
119 : // For tenant and timeline deletions, which both implement an "initially return 202, then 404 once
120 : // we're done" semantic, we wrap with a retry loop to expose a simpler API upstream. This avoids
121 : // needing to track a "deleting" state for tenants.
122 7 : async fn deletion_wrapper<R, F>(service: Arc<Service>, f: F) -> Result<Response<Body>, ApiError>
123 7 : where
124 7 : R: std::future::Future<Output = Result<StatusCode, ApiError>> + Send + 'static,
125 7 : F: Fn(Arc<Service>) -> R + Send + Sync + 'static,
126 7 : {
127 7 : let started_at = Instant::now();
128 7 : // To keep deletion reasonably snappy for small tenants, initially check after 1 second if deletion
129 7 : // completed.
130 7 : let mut retry_period = Duration::from_secs(1);
131 7 : // On subsequent retries, wait longer.
132 7 : let max_retry_period = Duration::from_secs(5);
133 7 : // Enable callers with a 30 second request timeout to reliably get a response
134 7 : let max_wait = Duration::from_secs(25);
135 :
136 : loop {
137 116 : let status = f(service.clone()).await?;
138 14 : match status {
139 : StatusCode::ACCEPTED => {
140 7 : tracing::info!("Deletion accepted, waiting to try again...");
141 7 : tokio::time::sleep(retry_period).await;
142 7 : retry_period = max_retry_period;
143 : }
144 : StatusCode::NOT_FOUND => {
145 7 : tracing::info!("Deletion complete");
146 7 : return json_response(StatusCode::OK, ());
147 : }
148 : _ => {
149 0 : tracing::warn!("Unexpected status {status}");
150 0 : return json_response(status, ());
151 : }
152 : }
153 :
154 7 : let now = Instant::now();
155 7 : if now + retry_period > started_at + max_wait {
156 0 : tracing::info!("Deletion timed out waiting for 404");
157 : // REQUEST_TIMEOUT would be more appropriate, but CONFLICT is already part of
158 : // the pageserver's swagger definition for this endpoint, and has the same desired
159 : // effect of causing the control plane to retry later.
160 0 : return json_response(StatusCode::CONFLICT, ());
161 7 : }
162 : }
163 7 : }
164 :
165 2 : async fn handle_tenant_location_config(
166 2 : service: Arc<Service>,
167 2 : mut req: Request<Body>,
168 2 : ) -> Result<Response<Body>, ApiError> {
169 2 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
170 2 : let config_req = json_request::<TenantLocationConfigRequest>(&mut req).await?;
171 : json_response(
172 : StatusCode::OK,
173 2 : service
174 2 : .tenant_location_config(tenant_id, config_req)
175 2 : .await?,
176 : )
177 2 : }
178 :
179 6 : async fn handle_tenant_delete(
180 6 : service: Arc<Service>,
181 6 : req: Request<Body>,
182 6 : ) -> Result<Response<Body>, ApiError> {
183 6 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
184 :
185 12 : deletion_wrapper(service, move |service| async move {
186 100 : service.tenant_delete(tenant_id).await
187 12 : })
188 106 : .await
189 6 : }
190 :
191 797 : async fn handle_tenant_timeline_create(
192 797 : service: Arc<Service>,
193 797 : mut req: Request<Body>,
194 797 : ) -> Result<Response<Body>, ApiError> {
195 797 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
196 797 : let create_req = json_request::<TimelineCreateRequest>(&mut req).await?;
197 : json_response(
198 : StatusCode::OK,
199 797 : service
200 797 : .tenant_timeline_create(tenant_id, create_req)
201 3204 : .await?,
202 : )
203 797 : }
204 :
205 1 : async fn handle_tenant_timeline_delete(
206 1 : service: Arc<Service>,
207 1 : req: Request<Body>,
208 1 : ) -> Result<Response<Body>, ApiError> {
209 1 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
210 1 : let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
211 :
212 2 : deletion_wrapper(service, move |service| async move {
213 16 : service.tenant_timeline_delete(tenant_id, timeline_id).await
214 2 : })
215 17 : .await
216 1 : }
217 :
218 11 : async fn handle_tenant_timeline_passthrough(
219 11 : service: Arc<Service>,
220 11 : req: Request<Body>,
221 11 : ) -> Result<Response<Body>, ApiError> {
222 11 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
223 :
224 11 : let Some(path) = req.uri().path_and_query() else {
225 : // This should never happen, our request router only calls us if there is a path
226 0 : return Err(ApiError::BadRequest(anyhow::anyhow!("Missing path")));
227 : };
228 :
229 11 : tracing::info!("Proxying request for tenant {} ({})", tenant_id, path);
230 :
231 : // Find the node that holds shard zero
232 11 : let (base_url, tenant_shard_id) = service.tenant_shard0_baseurl(tenant_id)?;
233 :
234 : // Callers will always pass an unsharded tenant ID. Before proxying, we must
235 : // rewrite this to a shard-aware shard zero ID.
236 5 : let path = format!("{}", path);
237 5 : let tenant_str = tenant_id.to_string();
238 5 : let tenant_shard_str = format!("{}", tenant_shard_id);
239 5 : let path = path.replace(&tenant_str, &tenant_shard_str);
240 5 :
241 5 : let client = mgmt_api::Client::new(base_url, service.get_config().jwt_token.as_deref());
242 19 : let resp = client.get_raw(path).await.map_err(|_e|
243 : // FIXME: give APiError a proper Unavailable variant. We return 503 here because
244 : // if we can't successfully send a request to the pageserver, we aren't available.
245 5 : ApiError::ShuttingDown)?;
246 :
247 : // We have a reqest::Response, would like a http::Response
248 5 : let mut builder = hyper::Response::builder()
249 5 : .status(resp.status())
250 5 : .version(resp.version());
251 25 : for (k, v) in resp.headers() {
252 25 : builder = builder.header(k, v);
253 25 : }
254 :
255 5 : let response = builder
256 5 : .body(Body::wrap_stream(resp.bytes_stream()))
257 5 : .map_err(|e| ApiError::InternalServerError(e.into()))?;
258 :
259 5 : Ok(response)
260 11 : }
261 :
262 823 : async fn handle_tenant_locate(
263 823 : service: Arc<Service>,
264 823 : req: Request<Body>,
265 823 : ) -> Result<Response<Body>, ApiError> {
266 823 : let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
267 823 : json_response(StatusCode::OK, service.tenant_locate(tenant_id)?)
268 823 : }
269 :
270 603 : async fn handle_node_register(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
271 603 : let register_req = json_request::<NodeRegisterRequest>(&mut req).await?;
272 603 : let state = get_state(&req);
273 603 : state.service.node_register(register_req).await?;
274 603 : json_response(StatusCode::OK, ())
275 603 : }
276 :
277 2 : async fn handle_node_list(req: Request<Body>) -> Result<Response<Body>, ApiError> {
278 2 : let state = get_state(&req);
279 2 : json_response(StatusCode::OK, state.service.node_list().await?)
280 2 : }
281 :
282 4 : async fn handle_node_configure(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
283 4 : let node_id: NodeId = parse_request_param(&req, "node_id")?;
284 4 : let config_req = json_request::<NodeConfigureRequest>(&mut req).await?;
285 4 : if node_id != config_req.node_id {
286 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
287 0 : "Path and body node_id differ"
288 0 : )));
289 4 : }
290 4 : let state = get_state(&req);
291 4 :
292 4 : json_response(StatusCode::OK, state.service.node_configure(config_req)?)
293 4 : }
294 :
295 0 : async fn handle_tenant_shard_migrate(
296 0 : service: Arc<Service>,
297 0 : mut req: Request<Body>,
298 0 : ) -> Result<Response<Body>, ApiError> {
299 0 : let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
300 0 : let migrate_req = json_request::<TenantShardMigrateRequest>(&mut req).await?;
301 : json_response(
302 : StatusCode::OK,
303 0 : service
304 0 : .tenant_shard_migrate(tenant_shard_id, migrate_req)
305 0 : .await?,
306 : )
307 0 : }
308 :
309 : /// Status endpoint is just used for checking that our HTTP listener is up
310 361 : async fn handle_status(_req: Request<Body>) -> Result<Response<Body>, ApiError> {
311 361 : json_response(StatusCode::OK, ())
312 361 : }
313 :
314 : impl From<ReconcileError> for ApiError {
315 0 : fn from(value: ReconcileError) -> Self {
316 0 : ApiError::Conflict(format!("Reconciliation error: {}", value))
317 0 : }
318 : }
319 :
320 : /// Common wrapper for request handlers that call into Service and will operate on tenants: they must only
321 : /// be allowed to run if Service has finished its initial reconciliation.
322 2099 : async fn tenant_service_handler<R, H>(request: Request<Body>, handler: H) -> R::Output
323 2099 : where
324 2099 : R: std::future::Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
325 2099 : H: FnOnce(Arc<Service>, Request<Body>) -> R + Send + Sync + 'static,
326 2099 : {
327 2099 : let state = get_state(&request);
328 2099 : let service = state.service.clone();
329 2099 :
330 2099 : let startup_complete = service.startup_complete.clone();
331 2099 : if tokio::time::timeout(STARTUP_RECONCILE_TIMEOUT, startup_complete.wait())
332 0 : .await
333 2099 : .is_err()
334 : {
335 : // This shouldn't happen: it is the responsibilty of [`Service::startup_reconcile`] to use appropriate
336 : // timeouts around its remote calls, to bound its runtime.
337 0 : return Err(ApiError::Timeout(
338 0 : "Timed out waiting for service readiness".into(),
339 0 : ));
340 2099 : }
341 2099 :
342 2099 : request_span(
343 2099 : request,
344 4275 : |request| async move { handler(service, request).await },
345 2099 : )
346 4275 : .await
347 2099 : }
348 :
349 361 : pub fn make_router(
350 361 : service: Arc<Service>,
351 361 : auth: Option<Arc<SwappableJwtAuth>>,
352 361 : ) -> RouterBuilder<hyper::Body, ApiError> {
353 361 : let mut router = endpoint::make_router();
354 361 : if auth.is_some() {
355 86 : router = router.middleware(auth_middleware(|request| {
356 86 : let state = get_state(request);
357 86 : if state.allowlist_routes.contains(request.uri()) {
358 11 : None
359 : } else {
360 75 : state.auth.as_deref()
361 : }
362 86 : }))
363 350 : }
364 :
365 361 : router
366 361 : .data(Arc::new(HttpState::new(service, auth)))
367 361 : // Non-prefixed generic endpoints (status, metrics)
368 361 : .get("/status", |r| request_span(r, handle_status))
369 361 : // Upcalls for the pageserver: point the pageserver's `control_plane_api` config to this prefix
370 603 : .post("/upcall/v1/re-attach", |r| {
371 603 : request_span(r, handle_re_attach)
372 603 : })
373 411 : .post("/upcall/v1/validate", |r| request_span(r, handle_validate))
374 361 : // Test/dev/debug endpoints
375 361 : .post("/debug/v1/attach-hook", |r| {
376 212 : request_span(r, handle_attach_hook)
377 361 : })
378 361 : .post("/debug/v1/inspect", |r| request_span(r, handle_inspect))
379 823 : .get("/control/v1/tenant/:tenant_id/locate", |r| {
380 823 : tenant_service_handler(r, handle_tenant_locate)
381 823 : })
382 361 : // Node operations
383 603 : .post("/control/v1/node", |r| {
384 603 : request_span(r, handle_node_register)
385 603 : })
386 361 : .get("/control/v1/node", |r| request_span(r, handle_node_list))
387 361 : .put("/control/v1/node/:node_id/config", |r| {
388 4 : request_span(r, handle_node_configure)
389 361 : })
390 361 : // Tenant Shard operations
391 361 : .put("/control/v1/tenant/:tenant_shard_id/migrate", |r| {
392 0 : tenant_service_handler(r, handle_tenant_shard_migrate)
393 361 : })
394 361 : // Tenant operations
395 361 : // The ^/v1/ endpoints act as a "Virtual Pageserver", enabling shard-naive clients to call into
396 361 : // this service to manage tenants that actually consist of many tenant shards, as if they are a single entity.
397 459 : .post("/v1/tenant", |r| {
398 459 : tenant_service_handler(r, handle_tenant_create)
399 459 : })
400 361 : .delete("/v1/tenant/:tenant_id", |r| {
401 6 : tenant_service_handler(r, handle_tenant_delete)
402 361 : })
403 361 : .put("/v1/tenant/:tenant_id/location_config", |r| {
404 2 : tenant_service_handler(r, handle_tenant_location_config)
405 361 : })
406 361 : // Timeline operations
407 361 : .delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
408 1 : tenant_service_handler(r, handle_tenant_timeline_delete)
409 361 : })
410 797 : .post("/v1/tenant/:tenant_id/timeline", |r| {
411 797 : tenant_service_handler(r, handle_tenant_timeline_create)
412 797 : })
413 361 : // Tenant detail GET passthrough to shard zero
414 361 : .get("/v1/tenant/:tenant_id", |r| {
415 7 : tenant_service_handler(r, handle_tenant_timeline_passthrough)
416 361 : })
417 361 : // Timeline GET passthrough to shard zero. Note that the `*` in the URL is a wildcard: any future
418 361 : // timeline GET APIs will be implicitly included.
419 361 : .get("/v1/tenant/:tenant_id/timeline*", |r| {
420 4 : tenant_service_handler(r, handle_tenant_timeline_passthrough)
421 361 : })
422 361 : }
|