LCOV - code coverage report
Current view: top level - control_plane/attachment_service/src - http.rs (source / functions) Coverage Total Hit
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info Lines: 90.8 % 295 268
Test Date: 2024-02-07 07:37:29 Functions: 85.6 % 97 83

            Line data    Source code
       1              : use crate::reconciler::ReconcileError;
       2              : use crate::service::{Service, STARTUP_RECONCILE_TIMEOUT};
       3              : use hyper::{Body, Request, Response};
       4              : use hyper::{StatusCode, Uri};
       5              : use pageserver_api::models::{
       6              :     TenantCreateRequest, TenantLocationConfigRequest, TimelineCreateRequest,
       7              : };
       8              : use pageserver_api::shard::TenantShardId;
       9              : use pageserver_client::mgmt_api;
      10              : use std::sync::Arc;
      11              : use std::time::{Duration, Instant};
      12              : use utils::auth::SwappableJwtAuth;
      13              : use utils::http::endpoint::{auth_middleware, request_span};
      14              : use utils::http::request::parse_request_param;
      15              : use utils::id::{TenantId, TimelineId};
      16              : 
      17              : use utils::{
      18              :     http::{
      19              :         endpoint::{self},
      20              :         error::ApiError,
      21              :         json::{json_request, json_response},
      22              :         RequestExt, RouterBuilder,
      23              :     },
      24              :     id::NodeId,
      25              : };
      26              : 
      27              : use pageserver_api::control_api::{ReAttachRequest, ValidateRequest};
      28              : 
      29              : use control_plane::attachment_service::{
      30              :     AttachHookRequest, InspectRequest, NodeConfigureRequest, NodeRegisterRequest,
      31              :     TenantShardMigrateRequest,
      32              : };
      33              : 
      34              : /// State available to HTTP request handlers
      35            0 : #[derive(Clone)]
      36              : pub struct HttpState {
      37              :     service: Arc<crate::service::Service>,
      38              :     auth: Option<Arc<SwappableJwtAuth>>,
      39              :     allowlist_routes: Vec<Uri>,
      40              : }
      41              : 
      42              : impl HttpState {
      43          361 :     pub fn new(service: Arc<crate::service::Service>, auth: Option<Arc<SwappableJwtAuth>>) -> Self {
      44          361 :         let allowlist_routes = ["/status"]
      45          361 :             .iter()
      46          361 :             .map(|v| v.parse().unwrap())
      47          361 :             .collect::<Vec<_>>();
      48          361 :         Self {
      49          361 :             service,
      50          361 :             auth,
      51          361 :             allowlist_routes,
      52          361 :         }
      53          361 :     }
      54              : }
      55              : 
      56              : #[inline(always)]
      57         4092 : fn get_state(request: &Request<Body>) -> &HttpState {
      58         4092 :     request
      59         4092 :         .data::<Arc<HttpState>>()
      60         4092 :         .expect("unknown state type")
      61         4092 :         .as_ref()
      62         4092 : }
      63              : 
      64              : /// Pageserver calls into this on startup, to learn which tenants it should attach
      65          603 : async fn handle_re_attach(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
      66          603 :     let reattach_req = json_request::<ReAttachRequest>(&mut req).await?;
      67          603 :     let state = get_state(&req);
      68          603 :     json_response(
      69          603 :         StatusCode::OK,
      70          603 :         state
      71          603 :             .service
      72          603 :             .re_attach(reattach_req)
      73          603 :             .await
      74          603 :             .map_err(ApiError::InternalServerError)?,
      75              :     )
      76          603 : }
      77              : 
      78              : /// Pageserver calls into this before doing deletions, to confirm that it still
      79              : /// holds the latest generation for the tenants with deletions enqueued
      80          411 : async fn handle_validate(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
      81          411 :     let validate_req = json_request::<ValidateRequest>(&mut req).await?;
      82          411 :     let state = get_state(&req);
      83          411 :     json_response(StatusCode::OK, state.service.validate(validate_req))
      84          411 : }
      85              : 
      86              : /// Call into this before attaching a tenant to a pageserver, to acquire a generation number
      87              : /// (in the real control plane this is unnecessary, because the same program is managing
      88              : ///  generation numbers and doing attachments).
      89          212 : async fn handle_attach_hook(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
      90          212 :     let attach_req = json_request::<AttachHookRequest>(&mut req).await?;
      91          212 :     let state = get_state(&req);
      92          212 : 
      93          212 :     json_response(
      94          212 :         StatusCode::OK,
      95          212 :         state
      96          212 :             .service
      97          212 :             .attach_hook(attach_req)
      98          236 :             .await
      99          212 :             .map_err(ApiError::InternalServerError)?,
     100              :     )
     101          212 : }
     102              : 
     103           72 : async fn handle_inspect(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
     104           72 :     let inspect_req = json_request::<InspectRequest>(&mut req).await?;
     105              : 
     106           72 :     let state = get_state(&req);
     107           72 : 
     108           72 :     json_response(StatusCode::OK, state.service.inspect(inspect_req))
     109           72 : }
     110              : 
     111          459 : async fn handle_tenant_create(
     112          459 :     service: Arc<Service>,
     113          459 :     mut req: Request<Body>,
     114          459 : ) -> Result<Response<Body>, ApiError> {
     115          459 :     let create_req = json_request::<TenantCreateRequest>(&mut req).await?;
     116          927 :     json_response(StatusCode::OK, service.tenant_create(create_req).await?)
     117          459 : }
     118              : 
     119              : // For tenant and timeline deletions, which both implement an "initially return 202, then 404 once
     120              : // we're done" semantic, we wrap with a retry loop to expose a simpler API upstream.  This avoids
     121              : // needing to track a "deleting" state for tenants.
     122            7 : async fn deletion_wrapper<R, F>(service: Arc<Service>, f: F) -> Result<Response<Body>, ApiError>
     123            7 : where
     124            7 :     R: std::future::Future<Output = Result<StatusCode, ApiError>> + Send + 'static,
     125            7 :     F: Fn(Arc<Service>) -> R + Send + Sync + 'static,
     126            7 : {
     127            7 :     let started_at = Instant::now();
     128            7 :     // To keep deletion reasonably snappy for small tenants, initially check after 1 second if deletion
     129            7 :     // completed.
     130            7 :     let mut retry_period = Duration::from_secs(1);
     131            7 :     // On subsequent retries, wait longer.
     132            7 :     let max_retry_period = Duration::from_secs(5);
     133            7 :     // Enable callers with a 30 second request timeout to reliably get a response
     134            7 :     let max_wait = Duration::from_secs(25);
     135              : 
     136              :     loop {
     137          116 :         let status = f(service.clone()).await?;
     138           14 :         match status {
     139              :             StatusCode::ACCEPTED => {
     140            7 :                 tracing::info!("Deletion accepted, waiting to try again...");
     141            7 :                 tokio::time::sleep(retry_period).await;
     142            7 :                 retry_period = max_retry_period;
     143              :             }
     144              :             StatusCode::NOT_FOUND => {
     145            7 :                 tracing::info!("Deletion complete");
     146            7 :                 return json_response(StatusCode::OK, ());
     147              :             }
     148              :             _ => {
     149            0 :                 tracing::warn!("Unexpected status {status}");
     150            0 :                 return json_response(status, ());
     151              :             }
     152              :         }
     153              : 
     154            7 :         let now = Instant::now();
     155            7 :         if now + retry_period > started_at + max_wait {
     156            0 :             tracing::info!("Deletion timed out waiting for 404");
     157              :             // REQUEST_TIMEOUT would be more appropriate, but CONFLICT is already part of
     158              :             // the pageserver's swagger definition for this endpoint, and has the same desired
     159              :             // effect of causing the control plane to retry later.
     160            0 :             return json_response(StatusCode::CONFLICT, ());
     161            7 :         }
     162              :     }
     163            7 : }
     164              : 
     165            2 : async fn handle_tenant_location_config(
     166            2 :     service: Arc<Service>,
     167            2 :     mut req: Request<Body>,
     168            2 : ) -> Result<Response<Body>, ApiError> {
     169            2 :     let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
     170            2 :     let config_req = json_request::<TenantLocationConfigRequest>(&mut req).await?;
     171              :     json_response(
     172              :         StatusCode::OK,
     173            2 :         service
     174            2 :             .tenant_location_config(tenant_id, config_req)
     175            2 :             .await?,
     176              :     )
     177            2 : }
     178              : 
     179            6 : async fn handle_tenant_delete(
     180            6 :     service: Arc<Service>,
     181            6 :     req: Request<Body>,
     182            6 : ) -> Result<Response<Body>, ApiError> {
     183            6 :     let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
     184              : 
     185           12 :     deletion_wrapper(service, move |service| async move {
     186          100 :         service.tenant_delete(tenant_id).await
     187           12 :     })
     188          106 :     .await
     189            6 : }
     190              : 
     191          797 : async fn handle_tenant_timeline_create(
     192          797 :     service: Arc<Service>,
     193          797 :     mut req: Request<Body>,
     194          797 : ) -> Result<Response<Body>, ApiError> {
     195          797 :     let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
     196          797 :     let create_req = json_request::<TimelineCreateRequest>(&mut req).await?;
     197              :     json_response(
     198              :         StatusCode::OK,
     199          797 :         service
     200          797 :             .tenant_timeline_create(tenant_id, create_req)
     201         3204 :             .await?,
     202              :     )
     203          797 : }
     204              : 
     205            1 : async fn handle_tenant_timeline_delete(
     206            1 :     service: Arc<Service>,
     207            1 :     req: Request<Body>,
     208            1 : ) -> Result<Response<Body>, ApiError> {
     209            1 :     let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
     210            1 :     let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
     211              : 
     212            2 :     deletion_wrapper(service, move |service| async move {
     213           16 :         service.tenant_timeline_delete(tenant_id, timeline_id).await
     214            2 :     })
     215           17 :     .await
     216            1 : }
     217              : 
     218           11 : async fn handle_tenant_timeline_passthrough(
     219           11 :     service: Arc<Service>,
     220           11 :     req: Request<Body>,
     221           11 : ) -> Result<Response<Body>, ApiError> {
     222           11 :     let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
     223              : 
     224           11 :     let Some(path) = req.uri().path_and_query() else {
     225              :         // This should never happen, our request router only calls us if there is a path
     226            0 :         return Err(ApiError::BadRequest(anyhow::anyhow!("Missing path")));
     227              :     };
     228              : 
     229           11 :     tracing::info!("Proxying request for tenant {} ({})", tenant_id, path);
     230              : 
     231              :     // Find the node that holds shard zero
     232           11 :     let (base_url, tenant_shard_id) = service.tenant_shard0_baseurl(tenant_id)?;
     233              : 
     234              :     // Callers will always pass an unsharded tenant ID.  Before proxying, we must
     235              :     // rewrite this to a shard-aware shard zero ID.
     236            5 :     let path = format!("{}", path);
     237            5 :     let tenant_str = tenant_id.to_string();
     238            5 :     let tenant_shard_str = format!("{}", tenant_shard_id);
     239            5 :     let path = path.replace(&tenant_str, &tenant_shard_str);
     240            5 : 
     241            5 :     let client = mgmt_api::Client::new(base_url, service.get_config().jwt_token.as_deref());
     242           19 :     let resp = client.get_raw(path).await.map_err(|_e|
     243              :         // FIXME: give APiError a proper Unavailable variant.  We return 503 here because
     244              :         // if we can't successfully send a request to the pageserver, we aren't available.
     245            5 :         ApiError::ShuttingDown)?;
     246              : 
     247              :     // We have a reqest::Response, would like a http::Response
     248            5 :     let mut builder = hyper::Response::builder()
     249            5 :         .status(resp.status())
     250            5 :         .version(resp.version());
     251           25 :     for (k, v) in resp.headers() {
     252           25 :         builder = builder.header(k, v);
     253           25 :     }
     254              : 
     255            5 :     let response = builder
     256            5 :         .body(Body::wrap_stream(resp.bytes_stream()))
     257            5 :         .map_err(|e| ApiError::InternalServerError(e.into()))?;
     258              : 
     259            5 :     Ok(response)
     260           11 : }
     261              : 
     262          823 : async fn handle_tenant_locate(
     263          823 :     service: Arc<Service>,
     264          823 :     req: Request<Body>,
     265          823 : ) -> Result<Response<Body>, ApiError> {
     266          823 :     let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
     267          823 :     json_response(StatusCode::OK, service.tenant_locate(tenant_id)?)
     268          823 : }
     269              : 
     270          603 : async fn handle_node_register(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
     271          603 :     let register_req = json_request::<NodeRegisterRequest>(&mut req).await?;
     272          603 :     let state = get_state(&req);
     273          603 :     state.service.node_register(register_req).await?;
     274          603 :     json_response(StatusCode::OK, ())
     275          603 : }
     276              : 
     277            2 : async fn handle_node_list(req: Request<Body>) -> Result<Response<Body>, ApiError> {
     278            2 :     let state = get_state(&req);
     279            2 :     json_response(StatusCode::OK, state.service.node_list().await?)
     280            2 : }
     281              : 
     282            4 : async fn handle_node_configure(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
     283            4 :     let node_id: NodeId = parse_request_param(&req, "node_id")?;
     284            4 :     let config_req = json_request::<NodeConfigureRequest>(&mut req).await?;
     285            4 :     if node_id != config_req.node_id {
     286            0 :         return Err(ApiError::BadRequest(anyhow::anyhow!(
     287            0 :             "Path and body node_id differ"
     288            0 :         )));
     289            4 :     }
     290            4 :     let state = get_state(&req);
     291            4 : 
     292            4 :     json_response(StatusCode::OK, state.service.node_configure(config_req)?)
     293            4 : }
     294              : 
     295            0 : async fn handle_tenant_shard_migrate(
     296            0 :     service: Arc<Service>,
     297            0 :     mut req: Request<Body>,
     298            0 : ) -> Result<Response<Body>, ApiError> {
     299            0 :     let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
     300            0 :     let migrate_req = json_request::<TenantShardMigrateRequest>(&mut req).await?;
     301              :     json_response(
     302              :         StatusCode::OK,
     303            0 :         service
     304            0 :             .tenant_shard_migrate(tenant_shard_id, migrate_req)
     305            0 :             .await?,
     306              :     )
     307            0 : }
     308              : 
     309              : /// Status endpoint is just used for checking that our HTTP listener is up
     310          361 : async fn handle_status(_req: Request<Body>) -> Result<Response<Body>, ApiError> {
     311          361 :     json_response(StatusCode::OK, ())
     312          361 : }
     313              : 
     314              : impl From<ReconcileError> for ApiError {
     315            0 :     fn from(value: ReconcileError) -> Self {
     316            0 :         ApiError::Conflict(format!("Reconciliation error: {}", value))
     317            0 :     }
     318              : }
     319              : 
     320              : /// Common wrapper for request handlers that call into Service and will operate on tenants: they must only
     321              : /// be allowed to run if Service has finished its initial reconciliation.
     322         2099 : async fn tenant_service_handler<R, H>(request: Request<Body>, handler: H) -> R::Output
     323         2099 : where
     324         2099 :     R: std::future::Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
     325         2099 :     H: FnOnce(Arc<Service>, Request<Body>) -> R + Send + Sync + 'static,
     326         2099 : {
     327         2099 :     let state = get_state(&request);
     328         2099 :     let service = state.service.clone();
     329         2099 : 
     330         2099 :     let startup_complete = service.startup_complete.clone();
     331         2099 :     if tokio::time::timeout(STARTUP_RECONCILE_TIMEOUT, startup_complete.wait())
     332            0 :         .await
     333         2099 :         .is_err()
     334              :     {
     335              :         // This shouldn't happen: it is the responsibilty of [`Service::startup_reconcile`] to use appropriate
     336              :         // timeouts around its remote calls, to bound its runtime.
     337            0 :         return Err(ApiError::Timeout(
     338            0 :             "Timed out waiting for service readiness".into(),
     339            0 :         ));
     340         2099 :     }
     341         2099 : 
     342         2099 :     request_span(
     343         2099 :         request,
     344         4275 :         |request| async move { handler(service, request).await },
     345         2099 :     )
     346         4275 :     .await
     347         2099 : }
     348              : 
     349          361 : pub fn make_router(
     350          361 :     service: Arc<Service>,
     351          361 :     auth: Option<Arc<SwappableJwtAuth>>,
     352          361 : ) -> RouterBuilder<hyper::Body, ApiError> {
     353          361 :     let mut router = endpoint::make_router();
     354          361 :     if auth.is_some() {
     355           86 :         router = router.middleware(auth_middleware(|request| {
     356           86 :             let state = get_state(request);
     357           86 :             if state.allowlist_routes.contains(request.uri()) {
     358           11 :                 None
     359              :             } else {
     360           75 :                 state.auth.as_deref()
     361              :             }
     362           86 :         }))
     363          350 :     }
     364              : 
     365          361 :     router
     366          361 :         .data(Arc::new(HttpState::new(service, auth)))
     367          361 :         // Non-prefixed generic endpoints (status, metrics)
     368          361 :         .get("/status", |r| request_span(r, handle_status))
     369          361 :         // Upcalls for the pageserver: point the pageserver's `control_plane_api` config to this prefix
     370          603 :         .post("/upcall/v1/re-attach", |r| {
     371          603 :             request_span(r, handle_re_attach)
     372          603 :         })
     373          411 :         .post("/upcall/v1/validate", |r| request_span(r, handle_validate))
     374          361 :         // Test/dev/debug endpoints
     375          361 :         .post("/debug/v1/attach-hook", |r| {
     376          212 :             request_span(r, handle_attach_hook)
     377          361 :         })
     378          361 :         .post("/debug/v1/inspect", |r| request_span(r, handle_inspect))
     379          823 :         .get("/control/v1/tenant/:tenant_id/locate", |r| {
     380          823 :             tenant_service_handler(r, handle_tenant_locate)
     381          823 :         })
     382          361 :         // Node operations
     383          603 :         .post("/control/v1/node", |r| {
     384          603 :             request_span(r, handle_node_register)
     385          603 :         })
     386          361 :         .get("/control/v1/node", |r| request_span(r, handle_node_list))
     387          361 :         .put("/control/v1/node/:node_id/config", |r| {
     388            4 :             request_span(r, handle_node_configure)
     389          361 :         })
     390          361 :         // Tenant Shard operations
     391          361 :         .put("/control/v1/tenant/:tenant_shard_id/migrate", |r| {
     392            0 :             tenant_service_handler(r, handle_tenant_shard_migrate)
     393          361 :         })
     394          361 :         // Tenant operations
     395          361 :         // The ^/v1/ endpoints act as a "Virtual Pageserver", enabling shard-naive clients to call into
     396          361 :         // this service to manage tenants that actually consist of many tenant shards, as if they are a single entity.
     397          459 :         .post("/v1/tenant", |r| {
     398          459 :             tenant_service_handler(r, handle_tenant_create)
     399          459 :         })
     400          361 :         .delete("/v1/tenant/:tenant_id", |r| {
     401            6 :             tenant_service_handler(r, handle_tenant_delete)
     402          361 :         })
     403          361 :         .put("/v1/tenant/:tenant_id/location_config", |r| {
     404            2 :             tenant_service_handler(r, handle_tenant_location_config)
     405          361 :         })
     406          361 :         // Timeline operations
     407          361 :         .delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
     408            1 :             tenant_service_handler(r, handle_tenant_timeline_delete)
     409          361 :         })
     410          797 :         .post("/v1/tenant/:tenant_id/timeline", |r| {
     411          797 :             tenant_service_handler(r, handle_tenant_timeline_create)
     412          797 :         })
     413          361 :         // Tenant detail GET passthrough to shard zero
     414          361 :         .get("/v1/tenant/:tenant_id", |r| {
     415            7 :             tenant_service_handler(r, handle_tenant_timeline_passthrough)
     416          361 :         })
     417          361 :         // Timeline GET passthrough to shard zero.  Note that the `*` in the URL is a wildcard: any future
     418          361 :         // timeline GET APIs will be implicitly included.
     419          361 :         .get("/v1/tenant/:tenant_id/timeline*", |r| {
     420            4 :             tenant_service_handler(r, handle_tenant_timeline_passthrough)
     421          361 :         })
     422          361 : }
        

Generated by: LCOV version 2.1-beta