LCOV - code coverage report
Current view: top level - control_plane/attachment_service/src - http.rs (source / functions)
Test:         aca8877be6ceba750c1be359ed71bc1799d52b30.info
Test Date:    2024-02-14 18:05:35
Coverage:     Lines: 95.2 % (314 of 330)    Functions: 93.5 % (115 of 123)

            Line data    Source code
       1              : use crate::reconciler::ReconcileError;
       2              : use crate::service::{Service, STARTUP_RECONCILE_TIMEOUT};
       3              : use hyper::{Body, Request, Response};
       4              : use hyper::{StatusCode, Uri};
       5              : use pageserver_api::models::{
       6              :     TenantCreateRequest, TenantLocationConfigRequest, TenantShardSplitRequest,
       7              :     TimelineCreateRequest,
       8              : };
       9              : use pageserver_api::shard::TenantShardId;
      10              : use pageserver_client::mgmt_api;
      11              : use std::sync::Arc;
      12              : use std::time::{Duration, Instant};
      13              : use utils::auth::SwappableJwtAuth;
      14              : use utils::http::endpoint::{auth_middleware, request_span};
      15              : use utils::http::request::parse_request_param;
      16              : use utils::id::{TenantId, TimelineId};
      17              : 
      18              : use utils::{
      19              :     http::{
      20              :         endpoint::{self},
      21              :         error::ApiError,
      22              :         json::{json_request, json_response},
      23              :         RequestExt, RouterBuilder,
      24              :     },
      25              :     id::NodeId,
      26              : };
      27              : 
      28              : use pageserver_api::control_api::{ReAttachRequest, ValidateRequest};
      29              : 
      30              : use control_plane::attachment_service::{
      31              :     AttachHookRequest, InspectRequest, NodeConfigureRequest, NodeRegisterRequest,
      32              :     TenantShardMigrateRequest,
      33              : };
      34              : 
      35              : /// State available to HTTP request handlers
      36            0 : #[derive(Clone)]
      37              : pub struct HttpState {
      38              :     service: Arc<crate::service::Service>,
      39              :     auth: Option<Arc<SwappableJwtAuth>>,
      40              :     allowlist_routes: Vec<Uri>,
      41              : }
      42              : 
      43              : impl HttpState {
      44          366 :     pub fn new(service: Arc<crate::service::Service>, auth: Option<Arc<SwappableJwtAuth>>) -> Self {
      45          366 :         let allowlist_routes = ["/status", "/ready", "/metrics"]
      46          366 :             .iter()
      47         1098 :             .map(|v| v.parse().unwrap())
      48          366 :             .collect::<Vec<_>>();
      49          366 :         Self {
      50          366 :             service,
      51          366 :             auth,
      52          366 :             allowlist_routes,
      53          366 :         }
      54          366 :     }
      55              : }
      56              : 
      57              : #[inline(always)]
      58         4203 : fn get_state(request: &Request<Body>) -> &HttpState {
      59         4203 :     request
      60         4203 :         .data::<Arc<HttpState>>()
      61         4203 :         .expect("unknown state type")
      62         4203 :         .as_ref()
      63         4203 : }
      64              : 
      65              : /// Pageserver calls into this on startup, to learn which tenants it should attach
      66          624 : async fn handle_re_attach(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
      67          624 :     let reattach_req = json_request::<ReAttachRequest>(&mut req).await?;
      68          624 :     let state = get_state(&req);
      69          624 :     json_response(
      70          624 :         StatusCode::OK,
      71          624 :         state
      72          624 :             .service
      73          624 :             .re_attach(reattach_req)
      74          624 :             .await
      75          624 :             .map_err(ApiError::InternalServerError)?,
      76              :     )
      77          624 : }
      78              : 
      79              : /// Pageserver calls into this before doing deletions, to confirm that it still
      80              : /// holds the latest generation for the tenants with deletions enqueued
      81          441 : async fn handle_validate(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
      82          441 :     let validate_req = json_request::<ValidateRequest>(&mut req).await?;
      83          441 :     let state = get_state(&req);
      84          441 :     json_response(StatusCode::OK, state.service.validate(validate_req))
      85          441 : }
      86              : 
      87              : /// Call into this before attaching a tenant to a pageserver, to acquire a generation number
      88              : /// (in the real control plane this is unnecessary, because the same program is managing
      89              : ///  generation numbers and doing attachments).
      90          207 : async fn handle_attach_hook(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
      91          207 :     let attach_req = json_request::<AttachHookRequest>(&mut req).await?;
      92          207 :     let state = get_state(&req);
      93          207 : 
      94          207 :     json_response(
      95          207 :         StatusCode::OK,
      96          207 :         state
      97          207 :             .service
      98          207 :             .attach_hook(attach_req)
      99          239 :             .await
     100          207 :             .map_err(ApiError::InternalServerError)?,
     101              :     )
     102          207 : }
     103              : 
     104           73 : async fn handle_inspect(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
     105           73 :     let inspect_req = json_request::<InspectRequest>(&mut req).await?;
     106              : 
     107           73 :     let state = get_state(&req);
     108           73 : 
     109           73 :     json_response(StatusCode::OK, state.service.inspect(inspect_req))
     110           73 : }
     111              : 
     112          463 : async fn handle_tenant_create(
     113          463 :     service: Arc<Service>,
     114          463 :     mut req: Request<Body>,
     115          463 : ) -> Result<Response<Body>, ApiError> {
     116          463 :     let create_req = json_request::<TenantCreateRequest>(&mut req).await?;
     117          935 :     json_response(StatusCode::OK, service.tenant_create(create_req).await?)
     118          463 : }
     119              : 
     120              : // For tenant and timeline deletions, which both implement an "initially return 202, then 404 once
     121              : // we're done" semantic, we wrap with a retry loop to expose a simpler API upstream.  This avoids
     122              : // needing to track a "deleting" state for tenants.
     123            7 : async fn deletion_wrapper<R, F>(service: Arc<Service>, f: F) -> Result<Response<Body>, ApiError>
     124            7 : where
     125            7 :     R: std::future::Future<Output = Result<StatusCode, ApiError>> + Send + 'static,
     126            7 :     F: Fn(Arc<Service>) -> R + Send + Sync + 'static,
     127            7 : {
     128            7 :     let started_at = Instant::now();
     129            7 :     // To keep deletion reasonably snappy for small tenants, initially check after 1 second if deletion
     130            7 :     // completed.
     131            7 :     let mut retry_period = Duration::from_secs(1);
     132            7 :     // On subsequent retries, wait longer.
     133            7 :     let max_retry_period = Duration::from_secs(5);
     134            7 :     // Enable callers with a 30-second request timeout to reliably get a response
     135            7 :     let max_wait = Duration::from_secs(25);
     136              : 
     137              :     loop {
     138          118 :         let status = f(service.clone()).await?;
     139           14 :         match status {
     140              :             StatusCode::ACCEPTED => {
     141            7 :                 tracing::info!("Deletion accepted, waiting to try again...");
     142            7 :                 tokio::time::sleep(retry_period).await;
     143            7 :                 retry_period = max_retry_period;
     144              :             }
     145              :             StatusCode::NOT_FOUND => {
     146            7 :                 tracing::info!("Deletion complete");
     147            7 :                 return json_response(StatusCode::OK, ());
     148              :             }
     149              :             _ => {
     150            0 :                 tracing::warn!("Unexpected status {status}");
     151            0 :                 return json_response(status, ());
     152              :             }
     153              :         }
     154              : 
     155            7 :         let now = Instant::now();
     156            7 :         if now + retry_period > started_at + max_wait {
     157            0 :             tracing::info!("Deletion timed out waiting for 404");
     158              :             // REQUEST_TIMEOUT would be more appropriate, but CONFLICT is already part of
     159              :             // the pageserver's swagger definition for this endpoint, and has the same desired
     160              :             // effect of causing the control plane to retry later.
     161            0 :             return json_response(StatusCode::CONFLICT, ());
     162            7 :         }
     163              :     }
     164            7 : }
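                      : 
                      : // A client-side sketch of consuming this wrapper's contract, assuming a
                      : // reqwest::Client and a tenant deletion URL (names hypothetical, not part
                      : // of this service): 200 means deletion completed, 409 means the wrapper
                      : // timed out internally and the caller should retry later.
                      : //
                      : //     async fn delete_until_done(client: &reqwest::Client, url: &str) -> anyhow::Result<()> {
                      : //         loop {
                      : //             match client.delete(url).send().await?.status() {
                      : //                 reqwest::StatusCode::OK => return Ok(()),
                      : //                 reqwest::StatusCode::CONFLICT => {
                      : //                     tokio::time::sleep(std::time::Duration::from_secs(5)).await
                      : //                 }
                      : //                 other => anyhow::bail!("unexpected status {other}"),
                      : //             }
                      : //         }
                      : //     }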
     165              : 
     166            2 : async fn handle_tenant_location_config(
     167            2 :     service: Arc<Service>,
     168            2 :     mut req: Request<Body>,
     169            2 : ) -> Result<Response<Body>, ApiError> {
     170            2 :     let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
     171            2 :     let config_req = json_request::<TenantLocationConfigRequest>(&mut req).await?;
     172              :     json_response(
     173              :         StatusCode::OK,
     174            2 :         service
     175            2 :             .tenant_location_config(tenant_id, config_req)
     176            2 :             .await?,
     177              :     )
     178            2 : }
     179              : 
     180            6 : async fn handle_tenant_delete(
     181            6 :     service: Arc<Service>,
     182            6 :     req: Request<Body>,
     183            6 : ) -> Result<Response<Body>, ApiError> {
     184            6 :     let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
     185              : 
     186           12 :     deletion_wrapper(service, move |service| async move {
     187          102 :         service.tenant_delete(tenant_id).await
     188           24 :     })
     189          108 :     .await
     190            6 : }
     191              : 
     192          797 : async fn handle_tenant_timeline_create(
     193          797 :     service: Arc<Service>,
     194          797 :     mut req: Request<Body>,
     195          797 : ) -> Result<Response<Body>, ApiError> {
     196          797 :     let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
     197          797 :     let create_req = json_request::<TimelineCreateRequest>(&mut req).await?;
     198              :     json_response(
     199              :         StatusCode::OK,
     200          797 :         service
     201          797 :             .tenant_timeline_create(tenant_id, create_req)
     202         3275 :             .await?,
     203              :     )
     204          797 : }
     205              : 
     206            1 : async fn handle_tenant_timeline_delete(
     207            1 :     service: Arc<Service>,
     208            1 :     req: Request<Body>,
     209            1 : ) -> Result<Response<Body>, ApiError> {
     210            1 :     let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
     211            1 :     let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
     212              : 
     213            2 :     deletion_wrapper(service, move |service| async move {
     214           16 :         service.tenant_timeline_delete(tenant_id, timeline_id).await
     215            4 :     })
     216           17 :     .await
     217            1 : }
     218              : 
     219           11 : async fn handle_tenant_timeline_passthrough(
     220           11 :     service: Arc<Service>,
     221           11 :     req: Request<Body>,
     222           11 : ) -> Result<Response<Body>, ApiError> {
     223           11 :     let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
     224              : 
     225           11 :     let Some(path) = req.uri().path_and_query() else {
     226              :         // This should never happen: our request router only calls us if there is a path
     227            0 :         return Err(ApiError::BadRequest(anyhow::anyhow!("Missing path")));
     228              :     };
     229              : 
     230           11 :     tracing::info!("Proxying request for tenant {} ({})", tenant_id, path);
     231              : 
     232              :     // Find the node that holds shard zero
     233           11 :     let (base_url, tenant_shard_id) = service.tenant_shard0_baseurl(tenant_id)?;
     234              : 
     235              :     // Callers will always pass an unsharded tenant ID.  Before proxying, we must
     236              :     // rewrite this to a shard-aware shard zero ID.
     237            5 :     let path = format!("{}", path);
     238            5 :     let tenant_str = tenant_id.to_string();
     239            5 :     let tenant_shard_str = format!("{}", tenant_shard_id);
     240            5 :     let path = path.replace(&tenant_str, &tenant_shard_str);
     241            5 : 
     242            5 :     let client = mgmt_api::Client::new(base_url, service.get_config().jwt_token.as_deref());
     243           20 :     let resp = client.get_raw(path).await.map_err(|_e|
     244              :     // FIXME: give ApiError a proper Unavailable variant.  We return 503 here because
     245              :         // if we can't successfully send a request to the pageserver, we aren't available.
     246            5 :         ApiError::ShuttingDown)?;
     247              : 
     248              :     // We have a reqwest::Response; we would like an http::Response
     249            5 :     let mut builder = hyper::Response::builder()
     250            5 :         .status(resp.status())
     251            5 :         .version(resp.version());
     252           25 :     for (k, v) in resp.headers() {
     253           25 :         builder = builder.header(k, v);
     254           25 :     }
     255              : 
     256            5 :     let response = builder
     257            5 :         .body(Body::wrap_stream(resp.bytes_stream()))
     258            5 :         .map_err(|e| ApiError::InternalServerError(e.into()))?;
     259              : 
     260            5 :     Ok(response)
     261           11 : }
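                      : 
                      : // Worked example of the rewrite above, with a hypothetical tenant ID. A
                      : // TenantShardId's string form appends a shard number / shard count suffix
                      : // to the tenant ID (shown here as "-0004" for shard zero of a four-shard
                      : // tenant; the exact rendering is assumed for illustration):
                      : //
                      : //     let path = "/v1/tenant/1f359dd625e519a1a4e8d7509690f6fc/timeline";
                      : //     let tenant_str = "1f359dd625e519a1a4e8d7509690f6fc";
                      : //     let tenant_shard_str = "1f359dd625e519a1a4e8d7509690f6fc-0004";
                      : //     assert_eq!(
                      : //         path.replace(tenant_str, tenant_shard_str),
                      : //         "/v1/tenant/1f359dd625e519a1a4e8d7509690f6fc-0004/timeline"
                      : //     );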
     262              : 
     263          841 : async fn handle_tenant_locate(
     264          841 :     service: Arc<Service>,
     265          841 :     req: Request<Body>,
     266          841 : ) -> Result<Response<Body>, ApiError> {
     267          841 :     let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
     268          841 :     json_response(StatusCode::OK, service.tenant_locate(tenant_id)?)
     269          841 : }
     270              : 
     271          624 : async fn handle_node_register(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
     272          624 :     let register_req = json_request::<NodeRegisterRequest>(&mut req).await?;
     273          624 :     let state = get_state(&req);
     274          624 :     state.service.node_register(register_req).await?;
     275          624 :     json_response(StatusCode::OK, ())
     276          624 : }
     277              : 
     278            5 : async fn handle_node_list(req: Request<Body>) -> Result<Response<Body>, ApiError> {
     279            5 :     let state = get_state(&req);
     280            5 :     json_response(StatusCode::OK, state.service.node_list().await?)
     281            5 : }
     282              : 
     283            1 : async fn handle_node_drop(req: Request<Body>) -> Result<Response<Body>, ApiError> {
     284            1 :     let state = get_state(&req);
     285            1 :     let node_id: NodeId = parse_request_param(&req, "node_id")?;
     286            1 :     json_response(StatusCode::OK, state.service.node_drop(node_id).await?)
     287            1 : }
     288              : 
     289            4 : async fn handle_node_configure(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
     290            4 :     let node_id: NodeId = parse_request_param(&req, "node_id")?;
     291            4 :     let config_req = json_request::<NodeConfigureRequest>(&mut req).await?;
     292            4 :     if node_id != config_req.node_id {
     293            0 :         return Err(ApiError::BadRequest(anyhow::anyhow!(
     294            0 :             "Path and body node_id differ"
     295            0 :         )));
     296            4 :     }
     297            4 :     let state = get_state(&req);
     298            4 : 
     299            4 :     json_response(StatusCode::OK, state.service.node_configure(config_req)?)
     300            4 : }
     301              : 
     302            2 : async fn handle_tenant_shard_split(
     303            2 :     service: Arc<Service>,
     304            2 :     mut req: Request<Body>,
     305            2 : ) -> Result<Response<Body>, ApiError> {
     306            2 :     let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
     307            2 :     let split_req = json_request::<TenantShardSplitRequest>(&mut req).await?;
     308              : 
     309              :     json_response(
     310              :         StatusCode::OK,
     311           30 :         service.tenant_shard_split(tenant_id, split_req).await?,
     312              :     )
     313            2 : }
     314              : 
     315            4 : async fn handle_tenant_shard_migrate(
     316            4 :     service: Arc<Service>,
     317            4 :     mut req: Request<Body>,
     318            4 : ) -> Result<Response<Body>, ApiError> {
     319            4 :     let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
     320            4 :     let migrate_req = json_request::<TenantShardMigrateRequest>(&mut req).await?;
     321              :     json_response(
     322              :         StatusCode::OK,
     323            4 :         service
     324            4 :             .tenant_shard_migrate(tenant_shard_id, migrate_req)
     325            4 :             .await?,
     326              :     )
     327            4 : }
     328              : 
     329            1 : async fn handle_tenant_drop(req: Request<Body>) -> Result<Response<Body>, ApiError> {
     330            1 :     let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
     331            1 :     let state = get_state(&req);
     332            1 : 
     333            1 :     json_response(StatusCode::OK, state.service.tenant_drop(tenant_id).await?)
     334            1 : }
     335              : 
     336              : /// Status endpoint is just used for checking that our HTTP listener is up
     337          366 : async fn handle_status(_req: Request<Body>) -> Result<Response<Body>, ApiError> {
     338          366 :     json_response(StatusCode::OK, ())
     339          366 : }
     340              : 
     341              : /// Readiness endpoint indicates when we're done doing startup I/O (e.g. reconciling
     342              : /// with remote pageserver nodes).  This is intended for use as a Kubernetes readiness probe.
     343            8 : async fn handle_ready(req: Request<Body>) -> Result<Response<Body>, ApiError> {
     344            8 :     let state = get_state(&req);
     345            8 :     if state.service.startup_complete.is_ready() {
     346            1 :         json_response(StatusCode::OK, ())
     347              :     } else {
     348            7 :         json_response(StatusCode::SERVICE_UNAVAILABLE, ())
     349              :     }
     350            8 : }
     351              : 
     352              : impl From<ReconcileError> for ApiError {
     353            0 :     fn from(value: ReconcileError) -> Self {
     354            0 :         ApiError::Conflict(format!("Reconciliation error: {}", value))
     355            0 :     }
     356              : }
     357              : 
     358              : /// Common wrapper for request handlers that call into Service and will operate on tenants: they must only
     359              : /// be allowed to run if Service has finished its initial reconciliation.
     360         2127 : async fn tenant_service_handler<R, H>(request: Request<Body>, handler: H) -> R::Output
     361         2127 : where
     362         2127 :     R: std::future::Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
     363         2127 :     H: FnOnce(Arc<Service>, Request<Body>) -> R + Send + Sync + 'static,
     364         2127 : {
     365         2127 :     let state = get_state(&request);
     366         2127 :     let service = state.service.clone();
     367         2127 : 
     368         2127 :     let startup_complete = service.startup_complete.clone();
     369         2127 :     if tokio::time::timeout(STARTUP_RECONCILE_TIMEOUT, startup_complete.wait())
     370            0 :         .await
     371         2127 :         .is_err()
     372              :     {
     373              :         // This shouldn't happen: it is the responsibility of [`Service::startup_reconcile`] to use appropriate
     374              :         // timeouts around its remote calls, to bound its runtime.
     375            0 :         return Err(ApiError::Timeout(
     376            0 :             "Timed out waiting for service readiness".into(),
     377            0 :         ));
     378         2127 :     }
     379         2127 : 
     380         2127 :     request_span(
     381         2127 :         request,
     382         4391 :         |request| async move { handler(service, request).await },
     383         2127 :     )
     384         4391 :     .await
     385         2127 : }
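                      : 
                      : // The gating pattern above in isolation: a minimal sketch with a toy
                      : // readiness future (the real code waits on the service's startup_complete
                      : // handle). tokio::time::timeout turns a stalled wait into an error instead
                      : // of hanging the request indefinitely.
                      : //
                      : //     async fn gate(ready: impl std::future::Future<Output = ()>) -> Result<(), ApiError> {
                      : //         tokio::time::timeout(STARTUP_RECONCILE_TIMEOUT, ready)
                      : //             .await
                      : //             .map_err(|_elapsed| ApiError::Timeout("not ready".into()))
                      : //     }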
     386              : 
     387          366 : pub fn make_router(
     388          366 :     service: Arc<Service>,
     389          366 :     auth: Option<Arc<SwappableJwtAuth>>,
     390          366 : ) -> RouterBuilder<hyper::Body, ApiError> {
     391          366 :     let mut router = endpoint::make_router();
     392          366 :     if auth.is_some() {
     393           88 :         router = router.middleware(auth_middleware(|request| {
     394           88 :             let state = get_state(request);
     395           88 :             if state.allowlist_routes.contains(request.uri()) {
     396           11 :                 None
     397              :             } else {
     398           77 :                 state.auth.as_deref()
     399              :             }
     400           88 :         }))
     401          355 :     }
     402              : 
     403          366 :     router
     404          366 :         .data(Arc::new(HttpState::new(service, auth)))
     405          366 :         // Non-prefixed generic endpoints (status, metrics)
     406          366 :         .get("/status", |r| request_span(r, handle_status))
     407          366 :         .get("/ready", |r| request_span(r, handle_ready))
     408          366 :         // Upcalls for the pageserver: point the pageserver's `control_plane_api` config to this prefix
     409          624 :         .post("/upcall/v1/re-attach", |r| {
     410          624 :             request_span(r, handle_re_attach)
     411          624 :         })
     412          441 :         .post("/upcall/v1/validate", |r| request_span(r, handle_validate))
     413          366 :         // Test/dev/debug endpoints
     414          366 :         .post("/debug/v1/attach-hook", |r| {
     415          207 :             request_span(r, handle_attach_hook)
     416          366 :         })
     417          366 :         .post("/debug/v1/inspect", |r| request_span(r, handle_inspect))
     418          366 :         .post("/debug/v1/tenant/:tenant_id/drop", |r| {
     419            1 :             request_span(r, handle_tenant_drop)
     420          366 :         })
     421          366 :         .post("/debug/v1/node/:node_id/drop", |r| {
     422            1 :             request_span(r, handle_node_drop)
     423          366 :         })
     424          841 :         .get("/control/v1/tenant/:tenant_id/locate", |r| {
     425          841 :             tenant_service_handler(r, handle_tenant_locate)
     426          841 :         })
     427          366 :         // Node operations
     428          624 :         .post("/control/v1/node", |r| {
     429          624 :             request_span(r, handle_node_register)
     430          624 :         })
     431          366 :         .get("/control/v1/node", |r| request_span(r, handle_node_list))
     432          366 :         .put("/control/v1/node/:node_id/config", |r| {
     433            4 :             request_span(r, handle_node_configure)
     434          366 :         })
     435          366 :         // Tenant Shard operations
     436          366 :         .put("/control/v1/tenant/:tenant_shard_id/migrate", |r| {
     437            4 :             tenant_service_handler(r, handle_tenant_shard_migrate)
     438          366 :         })
     439          366 :         .put("/control/v1/tenant/:tenant_id/shard_split", |r| {
     440            2 :             tenant_service_handler(r, handle_tenant_shard_split)
     441          366 :         })
     442          366 :         // Tenant operations
     443          366 :         // The ^/v1/ endpoints act as a "Virtual Pageserver", enabling shard-naive clients to call into
     444          366 :         // this service to manage tenants that actually consist of many tenant shards, as if they are a single entity.
     445          463 :         .post("/v1/tenant", |r| {
     446          463 :             tenant_service_handler(r, handle_tenant_create)
     447          463 :         })
     448          366 :         .delete("/v1/tenant/:tenant_id", |r| {
     449            6 :             tenant_service_handler(r, handle_tenant_delete)
     450          366 :         })
     451          366 :         .put("/v1/tenant/:tenant_id/location_config", |r| {
     452            2 :             tenant_service_handler(r, handle_tenant_location_config)
     453          366 :         })
     454          366 :         // Timeline operations
     455          366 :         .delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
     456            1 :             tenant_service_handler(r, handle_tenant_timeline_delete)
     457          366 :         })
     458          797 :         .post("/v1/tenant/:tenant_id/timeline", |r| {
     459          797 :             tenant_service_handler(r, handle_tenant_timeline_create)
     460          797 :         })
     461          366 :         // Tenant detail GET passthrough to shard zero
     462          366 :         .get("/v1/tenant/:tenant_id", |r| {
     463            7 :             tenant_service_handler(r, handle_tenant_timeline_passthrough)
     464          366 :         })
     465          366 :         // Timeline GET passthrough to shard zero.  Note that the `*` in the URL is a wildcard: any future
     466          366 :         // timeline GET APIs will be implicitly included.
     467          366 :         .get("/v1/tenant/:tenant_id/timeline*", |r| {
     468            4 :             tenant_service_handler(r, handle_tenant_timeline_passthrough)
     469          366 :         })
     470          366 : }
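                      : 
                      : // A quick check of the allowlist decision used by the auth middleware
                      : // above, extracted into a hypothetical helper on plain types (hyper::Uri
                      : // implements PartialEq, so Vec::contains compares parsed URIs directly):
                      : //
                      : //     fn requires_auth(allowlist: &[Uri], uri: &Uri) -> bool {
                      : //         !allowlist.contains(uri)
                      : //     }
                      : //
                      : //     let allowlist: Vec<Uri> = ["/status", "/ready", "/metrics"]
                      : //         .iter()
                      : //         .map(|v| v.parse().unwrap())
                      : //         .collect();
                      : //     assert!(!requires_auth(&allowlist, &"/status".parse().unwrap()));
                      : //     assert!(requires_auth(&allowlist, &"/v1/tenant".parse().unwrap()));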
        

Generated by: LCOV version 2.1-beta