LCOV - code coverage report
Current view: top level - pageserver/src/http - routes.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 91.0 % 1071 975
Test Date: 2023-09-06 10:18:01 Functions: 72.5 % 378 274

            Line data    Source code
       1              : //!
       2              : //! Management HTTP API
       3              : //!
       4              : use std::collections::HashMap;
       5              : use std::sync::Arc;
       6              : 
       7              : use anyhow::{anyhow, Context, Result};
       8              : use hyper::StatusCode;
       9              : use hyper::{Body, Request, Response, Uri};
      10              : use metrics::launch_timestamp::LaunchTimestamp;
      11              : use pageserver_api::models::{DownloadRemoteLayersTaskSpawnRequest, TenantAttachRequest};
      12              : use remote_storage::GenericRemoteStorage;
      13              : use storage_broker::BrokerClientChannel;
      14              : use tenant_size_model::{SizeResult, StorageModel};
      15              : use tokio_util::sync::CancellationToken;
      16              : use tracing::*;
      17              : use utils::http::endpoint::request_span;
      18              : use utils::http::json::json_request_or_empty_body;
      19              : use utils::http::request::{get_request_param, must_get_query_param, parse_query_param};
      20              : 
      21              : use super::models::{
      22              :     StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TenantInfo,
      23              :     TimelineCreateRequest, TimelineGcRequest, TimelineInfo,
      24              : };
      25              : use crate::context::{DownloadBehavior, RequestContext};
      26              : use crate::metrics::{StorageTimeOperation, STORAGE_TIME_GLOBAL};
      27              : use crate::pgdatadir_mapping::LsnForTimestamp;
      28              : use crate::task_mgr::TaskKind;
      29              : use crate::tenant::config::TenantConfOpt;
      30              : use crate::tenant::mgr::{
      31              :     GetTenantError, SetNewTenantConfigError, TenantMapInsertError, TenantStateError,
      32              : };
      33              : use crate::tenant::size::ModelInputs;
      34              : use crate::tenant::storage_layer::LayerAccessStatsReset;
      35              : use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError, Timeline};
      36              : use crate::{config::PageServerConf, tenant::mgr};
      37              : use crate::{disk_usage_eviction_task, tenant};
      38              : use utils::{
      39              :     auth::JwtAuth,
      40              :     http::{
      41              :         endpoint::{self, attach_openapi_ui, auth_middleware, check_permission_with},
      42              :         error::{ApiError, HttpErrorBody},
      43              :         json::{json_request, json_response},
      44              :         request::parse_request_param,
      45              :         RequestExt, RouterBuilder,
      46              :     },
      47              :     id::{TenantId, TimelineId},
      48              :     lsn::Lsn,
      49              : };
      50              : 
      51              : // Imports only used for testing APIs
      52              : use super::models::ConfigureFailpointsRequest;
      53              : 
      54              : struct State {
      55              :     conf: &'static PageServerConf,
      56              :     auth: Option<Arc<JwtAuth>>,
      57              :     allowlist_routes: Vec<Uri>,
      58              :     remote_storage: Option<GenericRemoteStorage>,
      59              :     broker_client: storage_broker::BrokerClientChannel,
      60              :     disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
      61              : }
      62              : 
      63              : impl State {
      64          575 :     fn new(
      65          575 :         conf: &'static PageServerConf,
      66          575 :         auth: Option<Arc<JwtAuth>>,
      67          575 :         remote_storage: Option<GenericRemoteStorage>,
      68          575 :         broker_client: storage_broker::BrokerClientChannel,
      69          575 :         disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
      70          575 :     ) -> anyhow::Result<Self> {
      71          575 :         let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml"]
      72          575 :             .iter()
      73         1725 :             .map(|v| v.parse().unwrap())
      74          575 :             .collect::<Vec<_>>();
      75          575 :         Ok(Self {
      76          575 :             conf,
      77          575 :             auth,
      78          575 :             allowlist_routes,
      79          575 :             remote_storage,
      80          575 :             broker_client,
      81          575 :             disk_usage_eviction_state,
      82          575 :         })
      83          575 :     }
      84              : }
      85              : 
      86              : #[inline(always)]
      87         2292 : fn get_state(request: &Request<Body>) -> &State {
      88         2292 :     request
      89         2292 :         .data::<Arc<State>>()
      90         2292 :         .expect("unknown state type")
      91         2292 :         .as_ref()
      92         2292 : }
      93              : 
      94              : #[inline(always)]
      95          579 : fn get_config(request: &Request<Body>) -> &'static PageServerConf {
      96          579 :     get_state(request).conf
      97          579 : }
      98              : 
      99              : /// Check that the requester is authorized to operate on given tenant
     100         6019 : fn check_permission(request: &Request<Body>, tenant_id: Option<TenantId>) -> Result<(), ApiError> {
     101         6019 :     check_permission_with(request, |claims| {
     102           50 :         crate::auth::check_permission(claims, tenant_id)
     103         6019 :     })
     104         6019 : }
     105              : 
     106              : impl From<PageReconstructError> for ApiError {
     107            0 :     fn from(pre: PageReconstructError) -> ApiError {
     108            0 :         match pre {
     109            0 :             PageReconstructError::Other(pre) => ApiError::InternalServerError(pre),
     110              :             PageReconstructError::NeedsDownload(_, _) => {
     111              :                 // This shouldn't happen, because we use a RequestContext that requests to
     112              :                 // download any missing layer files on-demand.
     113            0 :                 ApiError::InternalServerError(anyhow::anyhow!("need to download remote layer file"))
     114              :             }
     115              :             PageReconstructError::Cancelled => {
     116            0 :                 ApiError::InternalServerError(anyhow::anyhow!("request was cancelled"))
     117              :             }
     118              :             PageReconstructError::AncestorStopping(_) => {
     119            0 :                 ApiError::InternalServerError(anyhow::Error::new(pre))
     120              :             }
     121            0 :             PageReconstructError::WalRedo(pre) => {
     122            0 :                 ApiError::InternalServerError(anyhow::Error::new(pre))
     123              :             }
     124              :         }
     125            0 :     }
     126              : }
     127              : 
     128              : impl From<TenantMapInsertError> for ApiError {
     129           11 :     fn from(tmie: TenantMapInsertError) -> ApiError {
     130           11 :         match tmie {
     131              :             TenantMapInsertError::StillInitializing | TenantMapInsertError::ShuttingDown => {
     132            0 :                 ApiError::InternalServerError(anyhow::Error::new(tmie))
     133              :             }
     134            8 :             TenantMapInsertError::TenantAlreadyExists(id, state) => {
     135            8 :                 ApiError::Conflict(format!("tenant {id} already exists, state: {state:?}"))
     136              :             }
     137            3 :             TenantMapInsertError::Closure(e) => ApiError::InternalServerError(e),
     138              :         }
     139           11 :     }
     140              : }
     141              : 
     142              : impl From<TenantStateError> for ApiError {
     143            3 :     fn from(tse: TenantStateError) -> ApiError {
     144            3 :         match tse {
     145            3 :             TenantStateError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid).into()),
     146            0 :             _ => ApiError::InternalServerError(anyhow::Error::new(tse)),
     147              :         }
     148            3 :     }
     149              : }
     150              : 
     151              : impl From<GetTenantError> for ApiError {
     152           94 :     fn from(tse: GetTenantError) -> ApiError {
     153           94 :         match tse {
     154           91 :             GetTenantError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid).into()),
     155            3 :             e @ GetTenantError::NotActive(_) => {
     156            3 :                 // Why is this not `ApiError::NotFound`?
     157            3 :                 // Because we must be careful to never return 404 for a tenant if it does
     158            3 :                 // in fact exist locally. If we did, the caller could draw the conclusion
     159            3 :                 // that it can attach the tenant to another PS and we'd be in split-brain.
     160            3 :                 //
     161            3 :                 // (We can produce this variant only in `mgr::get_tenant(..., active=true)` calls).
     162            3 :                 ApiError::InternalServerError(anyhow::Error::new(e))
     163              :             }
     164              :         }
     165           94 :     }
     166              : }
     167              : 
     168              : impl From<SetNewTenantConfigError> for ApiError {
     169            0 :     fn from(e: SetNewTenantConfigError) -> ApiError {
     170            0 :         match e {
     171            0 :             SetNewTenantConfigError::GetTenant(tid) => {
     172            0 :                 ApiError::NotFound(anyhow!("tenant {}", tid).into())
     173              :             }
     174            0 :             e @ SetNewTenantConfigError::Persist(_) => {
     175            0 :                 ApiError::InternalServerError(anyhow::Error::new(e))
     176              :             }
     177              :         }
     178            0 :     }
     179              : }
     180              : 
     181              : impl From<crate::tenant::DeleteTimelineError> for ApiError {
     182           17 :     fn from(value: crate::tenant::DeleteTimelineError) -> Self {
     183           17 :         use crate::tenant::DeleteTimelineError::*;
     184           17 :         match value {
     185            1 :             NotFound => ApiError::NotFound(anyhow::anyhow!("timeline not found").into()),
     186            1 :             HasChildren(children) => ApiError::PreconditionFailed(
     187            1 :                 format!("Cannot delete timeline which has child timelines: {children:?}")
     188            1 :                     .into_boxed_str(),
     189            1 :             ),
     190            3 :             a @ AlreadyInProgress(_) => ApiError::Conflict(a.to_string()),
     191           12 :             Other(e) => ApiError::InternalServerError(e),
     192              :         }
     193           17 :     }
     194              : }
     195              : 
     196              : impl From<crate::tenant::mgr::DeleteTimelineError> for ApiError {
     197              :     fn from(value: crate::tenant::mgr::DeleteTimelineError) -> Self {
     198              :         use crate::tenant::mgr::DeleteTimelineError::*;
     199            1 :         match value {
     200              :             // Report Precondition failed so client can distinguish between
     201              :             // "tenant is missing" case from "timeline is missing"
     202            1 :             Tenant(GetTenantError::NotFound(..)) => ApiError::PreconditionFailed(
     203            1 :                 "Requested tenant is missing".to_owned().into_boxed_str(),
     204            1 :             ),
     205            0 :             Tenant(t) => ApiError::from(t),
     206           17 :             Timeline(t) => ApiError::from(t),
     207              :         }
     208           18 :     }
     209              : }
     210              : 
     211              : impl From<crate::tenant::delete::DeleteTenantError> for ApiError {
     212           28 :     fn from(value: crate::tenant::delete::DeleteTenantError) -> Self {
     213           28 :         use crate::tenant::delete::DeleteTenantError::*;
     214           28 :         match value {
     215            4 :             Get(g) => ApiError::from(g),
     216            0 :             e @ AlreadyInProgress => ApiError::Conflict(e.to_string()),
     217            0 :             Timeline(t) => ApiError::from(t),
     218           24 :             Other(o) => ApiError::InternalServerError(o),
     219            0 :             e @ InvalidState(_) => ApiError::PreconditionFailed(e.to_string().into_boxed_str()),
     220              :         }
     221           28 :     }
     222              : }
     223              : 
     224              : // Helper function to construct a TimelineInfo struct for a timeline
     225         1443 : async fn build_timeline_info(
     226         1443 :     timeline: &Arc<Timeline>,
     227         1443 :     include_non_incremental_logical_size: bool,
     228         1443 :     ctx: &RequestContext,
     229         1443 : ) -> anyhow::Result<TimelineInfo> {
     230         1443 :     crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
     231              : 
     232         1443 :     let mut info = build_timeline_info_common(timeline, ctx).await?;
     233         1443 :     if include_non_incremental_logical_size {
     234              :         // XXX we should be using spawn_ondemand_logical_size_calculation here.
     235              :         // Otherwise, if someone deletes the timeline / detaches the tenant while
     236              :         // we're executing this function, we will outlive the timeline on-disk state.
     237              :         info.current_logical_size_non_incremental = Some(
     238           19 :             timeline
     239           19 :                 .get_current_logical_size_non_incremental(
     240           19 :                     info.last_record_lsn,
     241           19 :                     CancellationToken::new(),
     242           19 :                     ctx,
     243           19 :                 )
     244          654 :                 .await?,
     245              :         );
     246         1424 :     }
     247         1442 :     Ok(info)
     248         1443 : }
     249              : 
     250         2337 : async fn build_timeline_info_common(
     251         2337 :     timeline: &Arc<Timeline>,
     252         2337 :     ctx: &RequestContext,
     253         2337 : ) -> anyhow::Result<TimelineInfo> {
     254         2337 :     crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
     255         2337 :     let last_record_lsn = timeline.get_last_record_lsn();
     256         2337 :     let (wal_source_connstr, last_received_msg_lsn, last_received_msg_ts) = {
     257         2337 :         let guard = timeline.last_received_wal.lock().unwrap();
     258         2337 :         if let Some(info) = guard.as_ref() {
     259         1234 :             (
     260         1234 :                 Some(format!("{:?}", info.wal_source_connconf)), // Password is hidden, but it's for statistics only.
     261         1234 :                 Some(info.last_received_msg_lsn),
     262         1234 :                 Some(info.last_received_msg_ts),
     263         1234 :             )
     264              :         } else {
     265         1103 :             (None, None, None)
     266              :         }
     267              :     };
     268              : 
     269         2337 :     let ancestor_timeline_id = timeline.get_ancestor_timeline_id();
     270         2337 :     let ancestor_lsn = match timeline.get_ancestor_lsn() {
     271         1770 :         Lsn(0) => None,
     272          567 :         lsn @ Lsn(_) => Some(lsn),
     273              :     };
     274         2337 :     let current_logical_size = match timeline.get_current_logical_size(ctx) {
     275         2337 :         Ok((size, _)) => Some(size),
     276            0 :         Err(err) => {
     277            0 :             error!("Timeline info creation failed to get current logical size: {err:?}");
     278            0 :             None
     279              :         }
     280              :     };
     281         2337 :     let current_physical_size = Some(timeline.layer_size_sum().await);
     282         2337 :     let state = timeline.current_state();
     283         2337 :     let remote_consistent_lsn = timeline.get_remote_consistent_lsn().unwrap_or(Lsn(0));
     284         2337 : 
     285         2337 :     let info = TimelineInfo {
     286         2337 :         tenant_id: timeline.tenant_id,
     287         2337 :         timeline_id: timeline.timeline_id,
     288         2337 :         ancestor_timeline_id,
     289         2337 :         ancestor_lsn,
     290         2337 :         disk_consistent_lsn: timeline.get_disk_consistent_lsn(),
     291         2337 :         remote_consistent_lsn,
     292         2337 :         last_record_lsn,
     293         2337 :         prev_record_lsn: Some(timeline.get_prev_record_lsn()),
     294         2337 :         latest_gc_cutoff_lsn: *timeline.get_latest_gc_cutoff_lsn(),
     295         2337 :         current_logical_size,
     296         2337 :         current_physical_size,
     297         2337 :         current_logical_size_non_incremental: None,
     298         2337 :         timeline_dir_layer_file_size_sum: None,
     299         2337 :         wal_source_connstr,
     300         2337 :         last_received_msg_lsn,
     301         2337 :         last_received_msg_ts,
     302         2337 :         pg_version: timeline.pg_version,
     303         2337 : 
     304         2337 :         state,
     305         2337 :     };
     306         2337 :     Ok(info)
     307         2337 : }
     308              : 
     309              : // healthcheck handler
     310          579 : async fn status_handler(
     311          579 :     request: Request<Body>,
     312          579 :     _cancel: CancellationToken,
     313          579 : ) -> Result<Response<Body>, ApiError> {
     314          579 :     check_permission(&request, None)?;
     315          579 :     let config = get_config(&request);
     316          579 :     json_response(StatusCode::OK, StatusResponse { id: config.id })
     317          579 : }
     318              : 
     319          907 : async fn timeline_create_handler(
     320          907 :     mut request: Request<Body>,
     321          907 :     _cancel: CancellationToken,
     322          907 : ) -> Result<Response<Body>, ApiError> {
     323          907 :     let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
     324          907 :     let request_data: TimelineCreateRequest = json_request(&mut request).await?;
     325          907 :     check_permission(&request, Some(tenant_id))?;
     326              : 
     327          906 :     let new_timeline_id = request_data.new_timeline_id;
     328          906 : 
     329          906 :     let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Error);
     330          906 : 
     331          906 :     let state = get_state(&request);
     332          906 : 
     333          906 :     async {
     334          906 :         let tenant = mgr::get_tenant(tenant_id, true).await?;
     335          906 :         match tenant.create_timeline(
     336          906 :             new_timeline_id,
     337          906 :             request_data.ancestor_timeline_id.map(TimelineId::from),
     338          906 :             request_data.ancestor_start_lsn,
     339          906 :             request_data.pg_version.unwrap_or(crate::DEFAULT_PG_VERSION),
     340          906 :             state.broker_client.clone(),
     341          906 :             &ctx,
     342          906 :         )
     343      3211326 :         .await {
     344          894 :             Ok(new_timeline) => {
     345              :                 // Created. Construct a TimelineInfo for it.
     346          894 :                 let timeline_info = build_timeline_info_common(&new_timeline, &ctx)
     347            0 :                     .await
     348          894 :                     .map_err(ApiError::InternalServerError)?;
     349          894 :                 json_response(StatusCode::CREATED, timeline_info)
     350              :             }
     351              :             Err(tenant::CreateTimelineError::AlreadyExists) => {
     352            1 :                 json_response(StatusCode::CONFLICT, ())
     353              :             }
     354            8 :             Err(tenant::CreateTimelineError::AncestorLsn(err)) => {
     355            8 :                 json_response(StatusCode::NOT_ACCEPTABLE, HttpErrorBody::from_msg(
     356            8 :                     format!("{err:#}")
     357            8 :                 ))
     358              :             }
     359            2 :             Err(tenant::CreateTimelineError::Other(err)) => Err(ApiError::InternalServerError(err)),
     360              :         }
     361          905 :     }
     362          906 :     .instrument(info_span!("timeline_create", %tenant_id, timeline_id = %new_timeline_id, lsn=?request_data.ancestor_start_lsn, pg_version=?request_data.pg_version))
     363      3211326 :     .await
     364          906 : }
     365              : 
     366           51 : async fn timeline_list_handler(
     367           51 :     request: Request<Body>,
     368           51 :     _cancel: CancellationToken,
     369           51 : ) -> Result<Response<Body>, ApiError> {
     370           51 :     let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
     371           51 :     let include_non_incremental_logical_size: Option<bool> =
     372           51 :         parse_query_param(&request, "include-non-incremental-logical-size")?;
     373           51 :     check_permission(&request, Some(tenant_id))?;
     374              : 
     375           51 :     let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
     376              : 
     377           51 :     let response_data = async {
     378           51 :         let tenant = mgr::get_tenant(tenant_id, true).await?;
     379           51 :         let timelines = tenant.list_timelines();
     380           51 : 
     381           51 :         let mut response_data = Vec::with_capacity(timelines.len());
     382          145 :         for timeline in timelines {
     383           94 :             let timeline_info = build_timeline_info(
     384           94 :                 &timeline,
     385           94 :                 include_non_incremental_logical_size.unwrap_or(false),
     386           94 :                 &ctx,
     387           94 :             )
     388           94 :             .instrument(info_span!("build_timeline_info", timeline_id = %timeline.timeline_id))
     389            0 :             .await
     390           94 :             .context("Failed to convert tenant timeline {timeline_id} into the local one: {e:?}")
     391           94 :             .map_err(ApiError::InternalServerError)?;
     392              : 
     393           94 :             response_data.push(timeline_info);
     394              :         }
     395           51 :         Ok::<Vec<TimelineInfo>, ApiError>(response_data)
     396           51 :     }
     397           51 :     .instrument(info_span!("timeline_list", %tenant_id))
     398            0 :     .await?;
     399              : 
     400           51 :     json_response(StatusCode::OK, response_data)
     401           51 : }
     402              : 
     403         1440 : async fn timeline_detail_handler(
     404         1440 :     request: Request<Body>,
     405         1440 :     _cancel: CancellationToken,
     406         1440 : ) -> Result<Response<Body>, ApiError> {
     407         1440 :     let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
     408         1440 :     let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
     409         1440 :     let include_non_incremental_logical_size: Option<bool> =
     410         1440 :         parse_query_param(&request, "include-non-incremental-logical-size")?;
     411         1440 :     check_permission(&request, Some(tenant_id))?;
     412              : 
     413              :     // Logical size calculation needs downloading.
     414         1440 :     let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
     415              : 
     416         1440 :     let timeline_info = async {
     417         1440 :         let tenant = mgr::get_tenant(tenant_id, true).await?;
     418              : 
     419         1437 :         let timeline = tenant
     420         1437 :             .get_timeline(timeline_id, false)
     421         1437 :             .map_err(|e| ApiError::NotFound(e.into()))?;
     422              : 
     423         1349 :         let timeline_info = build_timeline_info(
     424         1349 :             &timeline,
     425         1349 :             include_non_incremental_logical_size.unwrap_or(false),
     426         1349 :             &ctx,
     427         1349 :         )
     428          667 :         .await
     429         1349 :         .context("get local timeline info")
     430         1349 :         .map_err(ApiError::InternalServerError)?;
     431              : 
     432         1348 :         Ok::<_, ApiError>(timeline_info)
     433         1440 :     }
     434         1440 :     .instrument(info_span!("timeline_detail", %tenant_id, %timeline_id))
     435          667 :     .await?;
     436              : 
     437         1348 :     json_response(StatusCode::OK, timeline_info)
     438         1440 : }
     439              : 
     440           12 : async fn get_lsn_by_timestamp_handler(
     441           12 :     request: Request<Body>,
     442           12 :     _cancel: CancellationToken,
     443           12 : ) -> Result<Response<Body>, ApiError> {
     444           12 :     let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
     445           12 :     check_permission(&request, Some(tenant_id))?;
     446              : 
     447           12 :     let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
     448           12 :     let timestamp_raw = must_get_query_param(&request, "timestamp")?;
     449           12 :     let timestamp = humantime::parse_rfc3339(&timestamp_raw)
     450           12 :         .with_context(|| format!("Invalid time: {:?}", timestamp_raw))
     451           12 :         .map_err(ApiError::BadRequest)?;
     452           12 :     let timestamp_pg = postgres_ffi::to_pg_timestamp(timestamp);
     453           12 : 
     454           12 :     let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
     455           12 :     let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
     456          662 :     let result = timeline.find_lsn_for_timestamp(timestamp_pg, &ctx).await?;
     457              : 
     458           12 :     let result = match result {
     459           10 :         LsnForTimestamp::Present(lsn) => format!("{lsn}"),
     460            1 :         LsnForTimestamp::Future(_lsn) => "future".into(),
     461            1 :         LsnForTimestamp::Past(_lsn) => "past".into(),
     462            0 :         LsnForTimestamp::NoData(_lsn) => "nodata".into(),
     463              :     };
     464           12 :     json_response(StatusCode::OK, result)
     465           12 : }
     466              : 
     467           51 : async fn tenant_attach_handler(
     468           51 :     mut request: Request<Body>,
     469           51 :     _cancel: CancellationToken,
     470           51 : ) -> Result<Response<Body>, ApiError> {
     471           51 :     let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
     472           51 :     check_permission(&request, Some(tenant_id))?;
     473              : 
     474           51 :     let maybe_body: Option<TenantAttachRequest> = json_request_or_empty_body(&mut request).await?;
     475           48 :     let tenant_conf = match maybe_body {
     476           46 :         Some(request) => TenantConfOpt::try_from(&*request.config).map_err(ApiError::BadRequest)?,
     477            2 :         None => TenantConfOpt::default(),
     478              :     };
     479              : 
     480           48 :     let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
     481              : 
     482           48 :     info!("Handling tenant attach {tenant_id}");
     483              : 
     484           48 :     let state = get_state(&request);
     485              : 
     486           48 :     if let Some(remote_storage) = &state.remote_storage {
     487           48 :         mgr::attach_tenant(
     488           48 :             state.conf,
     489           48 :             tenant_id,
     490           48 :             tenant_conf,
     491           48 :             state.broker_client.clone(),
     492           48 :             remote_storage.clone(),
     493           48 :             &ctx,
     494           48 :         )
     495           48 :         .instrument(info_span!("tenant_attach", %tenant_id))
     496            9 :         .await?;
     497              :     } else {
     498            0 :         return Err(ApiError::BadRequest(anyhow!(
     499            0 :             "attach_tenant is not possible because pageserver was configured without remote storage"
     500            0 :         )));
     501              :     }
     502              : 
     503           39 :     json_response(StatusCode::ACCEPTED, ())
     504           51 : }
     505              : 
     506          121 : async fn timeline_delete_handler(
     507          121 :     request: Request<Body>,
     508          121 :     _cancel: CancellationToken,
     509          121 : ) -> Result<Response<Body>, ApiError> {
     510          121 :     let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
     511          121 :     let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
     512          121 :     check_permission(&request, Some(tenant_id))?;
     513              : 
     514          121 :     let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
     515          121 : 
     516          121 :     mgr::delete_timeline(tenant_id, timeline_id, &ctx)
     517          121 :         .instrument(info_span!("timeline_delete", %tenant_id, %timeline_id))
     518          543 :         .await?;
     519              : 
     520          103 :     json_response(StatusCode::ACCEPTED, ())
     521          121 : }
     522              : 
     523           40 : async fn tenant_detach_handler(
     524           40 :     request: Request<Body>,
     525           40 :     _cancel: CancellationToken,
     526           40 : ) -> Result<Response<Body>, ApiError> {
     527           40 :     let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
     528           40 :     check_permission(&request, Some(tenant_id))?;
     529           40 :     let detach_ignored: Option<bool> = parse_query_param(&request, "detach_ignored")?;
     530              : 
     531           40 :     let state = get_state(&request);
     532           40 :     let conf = state.conf;
     533           40 :     mgr::detach_tenant(conf, tenant_id, detach_ignored.unwrap_or(false))
     534           40 :         .instrument(info_span!("tenant_detach", %tenant_id))
     535          270 :         .await?;
     536              : 
     537           37 :     json_response(StatusCode::OK, ())
     538           40 : }
     539              : 
     540            7 : async fn tenant_load_handler(
     541            7 :     request: Request<Body>,
     542            7 :     _cancel: CancellationToken,
     543            7 : ) -> Result<Response<Body>, ApiError> {
     544            7 :     let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
     545            7 :     check_permission(&request, Some(tenant_id))?;
     546              : 
     547            7 :     let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
     548            7 : 
     549            7 :     let state = get_state(&request);
     550            7 :     mgr::load_tenant(
     551            7 :         state.conf,
     552            7 :         tenant_id,
     553            7 :         state.broker_client.clone(),
     554            7 :         state.remote_storage.clone(),
     555            7 :         &ctx,
     556            7 :     )
     557            7 :     .instrument(info_span!("load", %tenant_id))
     558            1 :     .await?;
     559              : 
     560            6 :     json_response(StatusCode::ACCEPTED, ())
     561            7 : }
     562              : 
     563            8 : async fn tenant_ignore_handler(
     564            8 :     request: Request<Body>,
     565            8 :     _cancel: CancellationToken,
     566            8 : ) -> Result<Response<Body>, ApiError> {
     567            8 :     let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
     568            8 :     check_permission(&request, Some(tenant_id))?;
     569              : 
     570            8 :     let state = get_state(&request);
     571            8 :     let conf = state.conf;
     572            8 :     mgr::ignore_tenant(conf, tenant_id)
     573            8 :         .instrument(info_span!("ignore_tenant", %tenant_id))
     574           34 :         .await?;
     575              : 
     576            8 :     json_response(StatusCode::OK, ())
     577            8 : }
     578              : 
     579           77 : async fn tenant_list_handler(
     580           77 :     request: Request<Body>,
     581           77 :     _cancel: CancellationToken,
     582           77 : ) -> Result<Response<Body>, ApiError> {
     583           77 :     check_permission(&request, None)?;
     584              : 
     585           77 :     let response_data = mgr::list_tenants()
     586           77 :         .instrument(info_span!("tenant_list"))
     587            0 :         .await
     588           77 :         .map_err(anyhow::Error::new)
     589           77 :         .map_err(ApiError::InternalServerError)?
     590           77 :         .iter()
     591          132 :         .map(|(id, state)| TenantInfo {
     592          132 :             id: *id,
     593          132 :             state: state.clone(),
     594          132 :             current_physical_size: None,
     595          132 :             attachment_status: state.attachment_status(),
     596          132 :         })
     597           77 :         .collect::<Vec<TenantInfo>>();
     598           77 : 
     599           77 :     json_response(StatusCode::OK, response_data)
     600           77 : }
     601              : 
     602          504 : async fn tenant_status(
     603          504 :     request: Request<Body>,
     604          504 :     _cancel: CancellationToken,
     605          504 : ) -> Result<Response<Body>, ApiError> {
     606          504 :     let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
     607          504 :     check_permission(&request, Some(tenant_id))?;
     608              : 
     609          504 :     let tenant_info = async {
     610          504 :         let tenant = mgr::get_tenant(tenant_id, false).await?;
     611              : 
     612              :         // Calculate total physical size of all timelines
     613          417 :         let mut current_physical_size = 0;
     614          444 :         for timeline in tenant.list_timelines().iter() {
     615          444 :             current_physical_size += timeline.layer_size_sum().await;
     616              :         }
     617              : 
     618          417 :         let state = tenant.current_state();
     619          417 :         Result::<_, ApiError>::Ok(TenantInfo {
     620          417 :             id: tenant_id,
     621          417 :             state: state.clone(),
     622          417 :             current_physical_size: Some(current_physical_size),
     623          417 :             attachment_status: state.attachment_status(),
     624          417 :         })
     625          504 :     }
     626          504 :     .instrument(info_span!("tenant_status_handler", %tenant_id))
     627           87 :     .await?;
     628              : 
     629          417 :     json_response(StatusCode::OK, tenant_info)
     630          504 : }
     631              : 
     632          132 : async fn tenant_delete_handler(
     633          132 :     request: Request<Body>,
     634          132 :     _cancel: CancellationToken,
     635          132 : ) -> Result<Response<Body>, ApiError> {
     636              :     // TODO openapi spec
     637          132 :     let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
     638          132 :     check_permission(&request, Some(tenant_id))?;
     639              : 
     640          132 :     let state = get_state(&request);
     641          132 : 
     642          132 :     mgr::delete_tenant(state.conf, state.remote_storage.clone(), tenant_id)
     643          132 :         .instrument(info_span!("tenant_delete_handler", %tenant_id))
     644          675 :         .await?;
     645              : 
     646          104 :     json_response(StatusCode::ACCEPTED, ())
     647          132 : }
     648              : 
     649              : /// HTTP endpoint to query the current tenant_size of a tenant.
     650              : ///
     651              : /// This is not used by consumption metrics under [`crate::consumption_metrics`], but can be used
     652              : /// to debug any of the calculations. Requires `tenant_id` request parameter, supports
     653              : /// `inputs_only=true|false` (default false) which supports debugging failure to calculate model
     654              : /// values.
     655              : ///
     656              : /// 'retention_period' query parameter overrides the cutoff that is used to calculate the size
     657              : /// (only if it is shorter than the real cutoff).
     658              : ///
     659              : /// Note: we don't update the cached size and prometheus metric here.
     660              : /// The retention period might be different, and it's nice to have a method to just calculate it
     661              : /// without modifying anything anyway.
     662           53 : async fn tenant_size_handler(
     663           53 :     request: Request<Body>,
     664           53 :     _cancel: CancellationToken,
     665           53 : ) -> Result<Response<Body>, ApiError> {
     666           53 :     let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
     667           53 :     check_permission(&request, Some(tenant_id))?;
     668           53 :     let inputs_only: Option<bool> = parse_query_param(&request, "inputs_only")?;
     669           53 :     let retention_period: Option<u64> = parse_query_param(&request, "retention_period")?;
     670           53 :     let headers = request.headers();
     671           53 : 
     672           53 :     let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
     673           53 :     let tenant = mgr::get_tenant(tenant_id, true).await?;
     674              : 
     675              :     // this can be long operation
     676           53 :     let inputs = tenant
     677           53 :         .gather_size_inputs(
     678           53 :             retention_period,
     679           53 :             LogicalSizeCalculationCause::TenantSizeHandler,
     680           53 :             &ctx,
     681           53 :         )
     682           37 :         .await
     683           53 :         .map_err(ApiError::InternalServerError)?;
     684              : 
     685           53 :     let mut sizes = None;
     686           53 :     if !inputs_only.unwrap_or(false) {
     687           53 :         let storage_model = inputs
     688           53 :             .calculate_model()
     689           53 :             .map_err(ApiError::InternalServerError)?;
     690           53 :         let size = storage_model.calculate();
     691           53 : 
     692           53 :         // If request header expects html, return html
     693           53 :         if headers["Accept"] == "text/html" {
     694           18 :             return synthetic_size_html_response(inputs, storage_model, size);
     695           35 :         }
     696           35 :         sizes = Some(size);
     697            0 :     } else if headers["Accept"] == "text/html" {
     698            0 :         return Err(ApiError::BadRequest(anyhow!(
     699            0 :             "inputs_only parameter is incompatible with html output request"
     700            0 :         )));
     701            0 :     }
     702              : 
     703              :     /// The type resides in the pageserver not to expose `ModelInputs`.
     704              :     #[serde_with::serde_as]
     705           35 :     #[derive(serde::Serialize)]
     706              :     struct TenantHistorySize {
     707              :         #[serde_as(as = "serde_with::DisplayFromStr")]
     708              :         id: TenantId,
     709              :         /// Size is a mixture of WAL and logical size, so the unit is bytes.
     710              :         ///
     711              :         /// Will be none if `?inputs_only=true` was given.
     712              :         size: Option<u64>,
     713              :         /// Size of each segment used in the model.
     714              :         /// Will be null if `?inputs_only=true` was given.
     715              :         segment_sizes: Option<Vec<tenant_size_model::SegmentSizeResult>>,
     716              :         inputs: crate::tenant::size::ModelInputs,
     717              :     }
     718              : 
     719           35 :     json_response(
     720           35 :         StatusCode::OK,
     721           35 :         TenantHistorySize {
     722           35 :             id: tenant_id,
     723           35 :             size: sizes.as_ref().map(|x| x.total_size),
     724           35 :             segment_sizes: sizes.map(|x| x.segments),
     725           35 :             inputs,
     726           35 :         },
     727           35 :     )
     728           53 : }
     729              : 
     730          101 : async fn layer_map_info_handler(
     731          101 :     request: Request<Body>,
     732          101 :     _cancel: CancellationToken,
     733          101 : ) -> Result<Response<Body>, ApiError> {
     734          101 :     let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
     735          101 :     let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
     736          101 :     let reset: LayerAccessStatsReset =
     737          101 :         parse_query_param(&request, "reset")?.unwrap_or(LayerAccessStatsReset::NoReset);
     738          101 : 
     739          101 :     check_permission(&request, Some(tenant_id))?;
     740              : 
     741          101 :     let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
     742          101 :     let layer_map_info = timeline.layer_map_info(reset).await;
     743              : 
     744          101 :     json_response(StatusCode::OK, layer_map_info)
     745          101 : }
     746              : 
     747            6 : async fn layer_download_handler(
     748            6 :     request: Request<Body>,
     749            6 :     _cancel: CancellationToken,
     750            6 : ) -> Result<Response<Body>, ApiError> {
     751            6 :     let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
     752            6 :     check_permission(&request, Some(tenant_id))?;
     753            6 :     let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
     754            6 :     let layer_file_name = get_request_param(&request, "layer_file_name")?;
     755            6 :     check_permission(&request, Some(tenant_id))?;
     756              : 
     757            6 :     let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
     758            6 :     let downloaded = timeline
     759            6 :         .download_layer(layer_file_name)
     760            6 :         .await
     761            6 :         .map_err(ApiError::InternalServerError)?;
     762              : 
     763            6 :     match downloaded {
     764            6 :         Some(true) => json_response(StatusCode::OK, ()),
     765            0 :         Some(false) => json_response(StatusCode::NOT_MODIFIED, ()),
     766            0 :         None => json_response(
     767            0 :             StatusCode::BAD_REQUEST,
     768            0 :             format!("Layer {tenant_id}/{timeline_id}/{layer_file_name} not found"),
     769            0 :         ),
     770              :     }
     771            6 : }
     772              : 
     773           25 : async fn evict_timeline_layer_handler(
     774           25 :     request: Request<Body>,
     775           25 :     _cancel: CancellationToken,
     776           25 : ) -> Result<Response<Body>, ApiError> {
     777           25 :     let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
     778           25 :     check_permission(&request, Some(tenant_id))?;
     779           25 :     let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
     780           25 :     let layer_file_name = get_request_param(&request, "layer_file_name")?;
     781              : 
     782           25 :     let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
     783           25 :     let evicted = timeline
     784           25 :         .evict_layer(layer_file_name)
     785            2 :         .await
     786           25 :         .map_err(ApiError::InternalServerError)?;
     787              : 
     788           25 :     match evicted {
     789           25 :         Some(true) => json_response(StatusCode::OK, ()),
     790            0 :         Some(false) => json_response(StatusCode::NOT_MODIFIED, ()),
     791            0 :         None => json_response(
     792            0 :             StatusCode::BAD_REQUEST,
     793            0 :             format!("Layer {tenant_id}/{timeline_id}/{layer_file_name} not found"),
     794            0 :         ),
     795              :     }
     796           25 : }
     797              : 
     798              : /// Get tenant_size SVG graph along with the JSON data.
     799           18 : fn synthetic_size_html_response(
     800           18 :     inputs: ModelInputs,
     801           18 :     storage_model: StorageModel,
     802           18 :     sizes: SizeResult,
     803           18 : ) -> Result<Response<Body>, ApiError> {
     804           18 :     let mut timeline_ids: Vec<String> = Vec::new();
     805           18 :     let mut timeline_map: HashMap<TimelineId, usize> = HashMap::new();
     806           26 :     for (index, ti) in inputs.timeline_inputs.iter().enumerate() {
     807           26 :         timeline_map.insert(ti.timeline_id, index);
     808           26 :         timeline_ids.push(ti.timeline_id.to_string());
     809           26 :     }
     810           18 :     let seg_to_branch: Vec<usize> = inputs
     811           18 :         .segments
     812           18 :         .iter()
     813           66 :         .map(|seg| *timeline_map.get(&seg.timeline_id).unwrap())
     814           18 :         .collect();
     815              : 
     816           18 :     let svg =
     817           18 :         tenant_size_model::svg::draw_svg(&storage_model, &timeline_ids, &seg_to_branch, &sizes)
     818           18 :             .map_err(ApiError::InternalServerError)?;
     819              : 
     820           18 :     let mut response = String::new();
     821           18 : 
     822           18 :     use std::fmt::Write;
     823           18 :     write!(response, "<html>\n<body>\n").unwrap();
     824           18 :     write!(response, "<div>\n{svg}\n</div>").unwrap();
     825           18 :     writeln!(response, "Project size: {}", sizes.total_size).unwrap();
     826           18 :     writeln!(response, "<pre>").unwrap();
     827           18 :     writeln!(
     828           18 :         response,
     829           18 :         "{}",
     830           18 :         serde_json::to_string_pretty(&inputs).unwrap()
     831           18 :     )
     832           18 :     .unwrap();
     833           18 :     writeln!(
     834           18 :         response,
     835           18 :         "{}",
     836           18 :         serde_json::to_string_pretty(&sizes.segments).unwrap()
     837           18 :     )
     838           18 :     .unwrap();
     839           18 :     writeln!(response, "</pre>").unwrap();
     840           18 :     write!(response, "</body>\n</html>\n").unwrap();
     841           18 : 
     842           18 :     html_response(StatusCode::OK, response)
     843           18 : }
     844              : 
     845           18 : pub fn html_response(status: StatusCode, data: String) -> Result<Response<Body>, ApiError> {
     846           18 :     let response = Response::builder()
     847           18 :         .status(status)
     848           18 :         .header(hyper::header::CONTENT_TYPE, "text/html")
     849           18 :         .body(Body::from(data.as_bytes().to_vec()))
     850           18 :         .map_err(|e| ApiError::InternalServerError(e.into()))?;
     851           18 :     Ok(response)
     852           18 : }
     853              : 
     854          481 : async fn tenant_create_handler(
     855          481 :     mut request: Request<Body>,
     856          481 :     _cancel: CancellationToken,
     857          481 : ) -> Result<Response<Body>, ApiError> {
     858          481 :     let request_data: TenantCreateRequest = json_request(&mut request).await?;
     859          481 :     let target_tenant_id = request_data.new_tenant_id;
     860          481 :     check_permission(&request, None)?;
     861              : 
     862          480 :     let _timer = STORAGE_TIME_GLOBAL
     863          480 :         .get_metric_with_label_values(&[StorageTimeOperation::CreateTenant.into()])
     864          480 :         .expect("bug")
     865          480 :         .start_timer();
     866              : 
     867          480 :     let tenant_conf =
     868          480 :         TenantConfOpt::try_from(&request_data.config).map_err(ApiError::BadRequest)?;
     869              : 
     870          480 :     let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
     871          480 : 
     872          480 :     let state = get_state(&request);
     873              : 
     874          480 :     let new_tenant = mgr::create_tenant(
     875          480 :         state.conf,
     876          480 :         tenant_conf,
     877          480 :         target_tenant_id,
     878          480 :         state.broker_client.clone(),
     879          480 :         state.remote_storage.clone(),
     880          480 :         &ctx,
     881          480 :     )
     882          480 :     .instrument(info_span!("tenant_create", tenant_id = %target_tenant_id))
     883            1 :     .await?;
     884              : 
     885              :     // We created the tenant. Existing API semantics are that the tenant
     886              :     // is Active when this function returns.
     887          686 :     if let res @ Err(_) = new_tenant.wait_to_become_active().await {
     888              :         // This shouldn't happen because we just created the tenant directory
     889              :         // in tenant::mgr::create_tenant, and there aren't any remote timelines
     890              :         // to load, so, nothing can really fail during load.
     891              :         // Don't do cleanup because we don't know how we got here.
     892              :         // The tenant will likely be in `Broken` state and subsequent
     893              :         // calls will fail.
     894            0 :         res.context("created tenant failed to become active")
     895            0 :             .map_err(ApiError::InternalServerError)?;
     896          479 :     }
     897              : 
     898          479 :     json_response(
     899          479 :         StatusCode::CREATED,
     900          479 :         TenantCreateResponse(new_tenant.tenant_id()),
     901          479 :     )
     902          481 : }
     903              : 
     904           40 : async fn get_tenant_config_handler(
     905           40 :     request: Request<Body>,
     906           40 :     _cancel: CancellationToken,
     907           40 : ) -> Result<Response<Body>, ApiError> {
     908           40 :     let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
     909           40 :     check_permission(&request, Some(tenant_id))?;
     910              : 
     911           40 :     let tenant = mgr::get_tenant(tenant_id, false).await?;
     912              : 
     913           40 :     let response = HashMap::from([
     914              :         (
     915              :             "tenant_specific_overrides",
     916           40 :             serde_json::to_value(tenant.tenant_specific_overrides())
     917           40 :                 .context("serializing tenant specific overrides")
     918           40 :                 .map_err(ApiError::InternalServerError)?,
     919              :         ),
     920              :         (
     921           40 :             "effective_config",
     922           40 :             serde_json::to_value(tenant.effective_config())
     923           40 :                 .context("serializing effective config")
     924           40 :                 .map_err(ApiError::InternalServerError)?,
     925              :         ),
     926              :     ]);
     927              : 
     928           40 :     json_response(StatusCode::OK, response)
     929           40 : }
     930              : 
     931           27 : async fn update_tenant_config_handler(
     932           27 :     mut request: Request<Body>,
     933           27 :     _cancel: CancellationToken,
     934           27 : ) -> Result<Response<Body>, ApiError> {
     935           27 :     let request_data: TenantConfigRequest = json_request(&mut request).await?;
     936           27 :     let tenant_id = request_data.tenant_id;
     937           27 :     check_permission(&request, Some(tenant_id))?;
     938              : 
     939           27 :     let tenant_conf =
     940           27 :         TenantConfOpt::try_from(&request_data.config).map_err(ApiError::BadRequest)?;
     941              : 
     942           27 :     let state = get_state(&request);
     943           27 :     mgr::set_new_tenant_config(state.conf, tenant_conf, tenant_id)
     944           27 :         .instrument(info_span!("tenant_config", %tenant_id))
     945            0 :         .await?;
     946              : 
     947           27 :     json_response(StatusCode::OK, ())
     948           27 : }
     949              : 
     950              : /// Testing helper to transition a tenant to [`crate::tenant::TenantState::Broken`].
     951            2 : async fn handle_tenant_break(
     952            2 :     r: Request<Body>,
     953            2 :     _cancel: CancellationToken,
     954            2 : ) -> Result<Response<Body>, ApiError> {
     955            2 :     let tenant_id: TenantId = parse_request_param(&r, "tenant_id")?;
     956              : 
     957            2 :     let tenant = crate::tenant::mgr::get_tenant(tenant_id, true)
     958            0 :         .await
     959            2 :         .map_err(|_| ApiError::Conflict(String::from("no active tenant found")))?;
     960              : 
     961            2 :     tenant.set_broken("broken from test".to_owned()).await;
     962              : 
     963            2 :     json_response(StatusCode::OK, ())
     964            2 : }
     965              : 
     966          246 : async fn failpoints_handler(
     967          246 :     mut request: Request<Body>,
     968          246 :     _cancel: CancellationToken,
     969          246 : ) -> Result<Response<Body>, ApiError> {
     970          246 :     if !fail::has_failpoints() {
     971            0 :         return Err(ApiError::BadRequest(anyhow!(
     972            0 :             "Cannot manage failpoints because pageserver was compiled without failpoints support"
     973            0 :         )));
     974          246 :     }
     975              : 
     976          246 :     let failpoints: ConfigureFailpointsRequest = json_request(&mut request).await?;
     977          497 :     for fp in failpoints {
     978          251 :         info!("cfg failpoint: {} {}", fp.name, fp.actions);
     979              : 
     980              :         // We recognize one extra "action" that's not natively recognized
     981              :         // by the failpoints crate: exit, to immediately kill the process
     982          251 :         let cfg_result = crate::failpoint_support::apply_failpoint(&fp.name, &fp.actions);
     983              : 
     984          251 :         if let Err(err_msg) = cfg_result {
     985            0 :             return Err(ApiError::BadRequest(anyhow!(
     986            0 :                 "Failed to configure failpoints: {err_msg}"
     987            0 :             )));
     988          251 :         }
     989              :     }
     990              : 
     991          246 :     json_response(StatusCode::OK, ())
     992          246 : }
     993              : 
     994              : // Run GC immediately on given timeline.
     995          505 : async fn timeline_gc_handler(
     996          505 :     mut request: Request<Body>,
     997          505 :     _cancel: CancellationToken,
     998          505 : ) -> Result<Response<Body>, ApiError> {
     999          505 :     let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
    1000          505 :     let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
    1001          505 :     check_permission(&request, Some(tenant_id))?;
    1002              : 
    1003          505 :     let gc_req: TimelineGcRequest = json_request(&mut request).await?;
    1004              : 
    1005          505 :     let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
    1006          505 :     let wait_task_done = mgr::immediate_gc(tenant_id, timeline_id, gc_req, &ctx).await?;
    1007          504 :     let gc_result = wait_task_done
    1008          504 :         .await
    1009          504 :         .context("wait for gc task")
    1010          504 :         .map_err(ApiError::InternalServerError)?
    1011          504 :         .map_err(ApiError::InternalServerError)?;
    1012              : 
    1013          503 :     json_response(StatusCode::OK, gc_result)
    1014          505 : }
    1015              : 
    1016              : // Run compaction immediately on given timeline.
    1017          123 : async fn timeline_compact_handler(
    1018          123 :     request: Request<Body>,
    1019          123 :     cancel: CancellationToken,
    1020          123 : ) -> Result<Response<Body>, ApiError> {
    1021          123 :     let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
    1022          123 :     let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
    1023          123 :     check_permission(&request, Some(tenant_id))?;
    1024              : 
    1025          123 :     async {
    1026          123 :         let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
    1027          123 :         let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
    1028          123 :         timeline
    1029          123 :             .compact(&cancel, &ctx)
    1030        11894 :             .await
    1031          123 :             .map_err(ApiError::InternalServerError)?;
    1032          123 :         json_response(StatusCode::OK, ())
    1033          123 :     }
    1034          123 :     .instrument(info_span!("manual_compaction", %tenant_id, %timeline_id))
    1035        11894 :     .await
    1036          123 : }
    1037              : 
    1038              : // Run checkpoint immediately on given timeline.
    1039          694 : async fn timeline_checkpoint_handler(
    1040          694 :     request: Request<Body>,
    1041          694 :     cancel: CancellationToken,
    1042          694 : ) -> Result<Response<Body>, ApiError> {
    1043          694 :     let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
    1044          694 :     let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
    1045          694 :     check_permission(&request, Some(tenant_id))?;
    1046          694 :     async {
    1047          694 :         let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
    1048          694 :         let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
    1049          694 :         timeline
    1050          694 :             .freeze_and_flush()
    1051          775 :             .await
    1052          694 :             .map_err(ApiError::InternalServerError)?;
    1053          694 :         timeline
    1054          694 :             .compact(&cancel, &ctx)
    1055       241577 :             .await
    1056          694 :             .map_err(ApiError::InternalServerError)?;
    1057              : 
    1058          693 :         json_response(StatusCode::OK, ())
    1059          694 :     }
    1060          694 :     .instrument(info_span!("manual_checkpoint", %tenant_id, %timeline_id))
    1061       242352 :     .await
    1062          694 : }
    1063              : 
    1064            3 : async fn timeline_download_remote_layers_handler_post(
    1065            3 :     mut request: Request<Body>,
    1066            3 :     _cancel: CancellationToken,
    1067            3 : ) -> Result<Response<Body>, ApiError> {
    1068            3 :     let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
    1069            3 :     let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
    1070            3 :     let body: DownloadRemoteLayersTaskSpawnRequest = json_request(&mut request).await?;
    1071            3 :     check_permission(&request, Some(tenant_id))?;
    1072              : 
    1073            3 :     let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
    1074            3 :     match timeline.spawn_download_all_remote_layers(body).await {
    1075            3 :         Ok(st) => json_response(StatusCode::ACCEPTED, st),
    1076            0 :         Err(st) => json_response(StatusCode::CONFLICT, st),
    1077              :     }
    1078            3 : }
    1079              : 
    1080           21 : async fn timeline_download_remote_layers_handler_get(
    1081           21 :     request: Request<Body>,
    1082           21 :     _cancel: CancellationToken,
    1083           21 : ) -> Result<Response<Body>, ApiError> {
    1084           21 :     let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
    1085           21 :     check_permission(&request, Some(tenant_id))?;
    1086           21 :     let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
    1087              : 
    1088           21 :     let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
    1089           21 :     let info = timeline
    1090           21 :         .get_download_all_remote_layers_task_info()
    1091           21 :         .context("task never started since last pageserver process start")
    1092           21 :         .map_err(|e| ApiError::NotFound(e.into()))?;
    1093           21 :     json_response(StatusCode::OK, info)
    1094           21 : }
    1095              : 
    1096          985 : async fn active_timeline_of_active_tenant(
    1097          985 :     tenant_id: TenantId,
    1098          985 :     timeline_id: TimelineId,
    1099          985 : ) -> Result<Arc<Timeline>, ApiError> {
    1100          985 :     let tenant = mgr::get_tenant(tenant_id, true).await?;
    1101          985 :     tenant
    1102          985 :         .get_timeline(timeline_id, true)
    1103          985 :         .map_err(|e| ApiError::NotFound(e.into()))
    1104          985 : }
    1105              : 
    1106            0 : async fn always_panic_handler(
    1107            0 :     req: Request<Body>,
    1108            0 :     _cancel: CancellationToken,
    1109            0 : ) -> Result<Response<Body>, ApiError> {
    1110            0 :     // Deliberately cause a panic to exercise the panic hook registered via std::panic::set_hook().
    1111            0 :     // For pageserver, the relevant panic hook is `tracing_panic_hook` , and the `sentry` crate's wrapper around it.
    1112            0 :     // Use catch_unwind to ensure that tokio nor hyper are distracted by our panic.
    1113            0 :     let query = req.uri().query();
    1114            0 :     let _ = std::panic::catch_unwind(|| {
    1115            0 :         panic!("unconditional panic for testing panic hook integration; request query: {query:?}")
    1116            0 :     });
    1117            0 :     json_response(StatusCode::NO_CONTENT, ())
    1118            0 : }
    1119              : 
    1120            5 : async fn disk_usage_eviction_run(
    1121            5 :     mut r: Request<Body>,
    1122            5 :     _cancel: CancellationToken,
    1123            5 : ) -> Result<Response<Body>, ApiError> {
    1124            5 :     check_permission(&r, None)?;
    1125              : 
    1126           26 :     #[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)]
    1127              :     struct Config {
    1128              :         /// How many bytes to evict before reporting that pressure is relieved.
    1129              :         evict_bytes: u64,
    1130              :     }
    1131              : 
    1132           26 :     #[derive(Debug, Clone, Copy, serde::Serialize)]
    1133              :     struct Usage {
    1134              :         // remains unchanged after instantiation of the struct
    1135              :         config: Config,
    1136              :         // updated by `add_available_bytes`
    1137              :         freed_bytes: u64,
    1138              :     }
    1139              : 
    1140              :     impl crate::disk_usage_eviction_task::Usage for Usage {
    1141          113 :         fn has_pressure(&self) -> bool {
    1142          113 :             self.config.evict_bytes > self.freed_bytes
    1143          113 :         }
    1144              : 
    1145          210 :         fn add_available_bytes(&mut self, bytes: u64) {
    1146          210 :             self.freed_bytes += bytes;
    1147          210 :         }
    1148              :     }
    1149              : 
    1150            5 :     let config = json_request::<Config>(&mut r)
    1151            1 :         .await
    1152            5 :         .map_err(|_| ApiError::BadRequest(anyhow::anyhow!("invalid JSON body")))?;
    1153              : 
    1154            5 :     let usage = Usage {
    1155            5 :         config,
    1156            5 :         freed_bytes: 0,
    1157            5 :     };
    1158            5 : 
    1159            5 :     let (tx, rx) = tokio::sync::oneshot::channel();
    1160            5 : 
    1161            5 :     let state = get_state(&r);
    1162              : 
    1163            5 :     let Some(storage) = state.remote_storage.clone() else {
    1164            0 :         return Err(ApiError::InternalServerError(anyhow::anyhow!(
    1165            0 :             "remote storage not configured, cannot run eviction iteration"
    1166            0 :         )));
    1167              :     };
    1168              : 
    1169            5 :     let state = state.disk_usage_eviction_state.clone();
    1170            5 : 
    1171            5 :     let cancel = CancellationToken::new();
    1172            5 :     let child_cancel = cancel.clone();
    1173            5 :     let _g = cancel.drop_guard();
    1174            5 : 
    1175            5 :     crate::task_mgr::spawn(
    1176            5 :         crate::task_mgr::BACKGROUND_RUNTIME.handle(),
    1177            5 :         TaskKind::DiskUsageEviction,
    1178            5 :         None,
    1179            5 :         None,
    1180            5 :         "ondemand disk usage eviction",
    1181            5 :         false,
    1182            5 :         async move {
    1183            5 :             let res = crate::disk_usage_eviction_task::disk_usage_eviction_task_iteration_impl(
    1184            5 :                 &state,
    1185            5 :                 &storage,
    1186            5 :                 usage,
    1187            5 :                 &child_cancel,
    1188            5 :             )
    1189            0 :             .await;
    1190              : 
    1191            5 :             info!(?res, "disk_usage_eviction_task_iteration_impl finished");
    1192              : 
    1193            5 :             let _ = tx.send(res);
    1194            5 :             Ok(())
    1195            5 :         }
    1196            5 :         .in_current_span(),
    1197            5 :     );
    1198              : 
    1199            5 :     let response = rx.await.unwrap().map_err(ApiError::InternalServerError)?;
    1200              : 
    1201            5 :     json_response(StatusCode::OK, response)
    1202            5 : }
    1203              : 
    1204            0 : async fn handler_404(_: Request<Body>) -> Result<Response<Body>, ApiError> {
    1205            0 :     json_response(
    1206            0 :         StatusCode::NOT_FOUND,
    1207            0 :         HttpErrorBody::from_msg("page not found".to_owned()),
    1208            0 :     )
    1209            0 : }
    1210              : 
    1211            5 : async fn post_tracing_event_handler(
    1212            5 :     mut r: Request<Body>,
    1213            5 :     _cancel: CancellationToken,
    1214            5 : ) -> Result<Response<Body>, ApiError> {
    1215           10 :     #[derive(Debug, serde::Deserialize)]
    1216              :     #[serde(rename_all = "lowercase")]
    1217              :     enum Level {
    1218              :         Error,
    1219              :         Warn,
    1220              :         Info,
    1221              :         Debug,
    1222              :         Trace,
    1223              :     }
    1224           25 :     #[derive(Debug, serde::Deserialize)]
    1225              :     struct Request {
    1226              :         level: Level,
    1227              :         message: String,
    1228              :     }
    1229            5 :     let body: Request = json_request(&mut r)
    1230            0 :         .await
    1231            5 :         .map_err(|_| ApiError::BadRequest(anyhow::anyhow!("invalid JSON body")))?;
    1232              : 
    1233            5 :     match body.level {
    1234            2 :         Level::Error => tracing::error!(?body.message),
    1235            2 :         Level::Warn => tracing::warn!(?body.message),
    1236            2 :         Level::Info => tracing::info!(?body.message),
    1237            1 :         Level::Debug => tracing::debug!(?body.message),
    1238            1 :         Level::Trace => tracing::trace!(?body.message),
    1239              :     }
    1240              : 
    1241            5 :     json_response(StatusCode::OK, ())
    1242            5 : }
    1243              : 
    1244              : /// Common functionality of all the HTTP API handlers.
    1245              : ///
    1246              : /// - Adds a tracing span to each request (by `request_span`)
    1247              : /// - Logs the request depending on the request method (by `request_span`)
    1248              : /// - Logs the response if it was not successful (by `request_span`
    1249              : /// - Shields the handler function from async cancellations. Hyper can drop the handler
    1250              : ///   Future if the connection to the client is lost, but most of the pageserver code is
    1251              : ///   not async cancellation safe. This converts the dropped future into a graceful cancellation
    1252              : ///   request with a CancellationToken.
    1253         6266 : async fn api_handler<R, H>(request: Request<Body>, handler: H) -> Result<Response<Body>, ApiError>
    1254         6266 : where
    1255         6266 :     R: std::future::Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
    1256         6266 :     H: FnOnce(Request<Body>, CancellationToken) -> R + Send + Sync + 'static,
    1257         6266 : {
    1258         6266 :     // Spawn a new task to handle the request, to protect the handler from unexpected
    1259         6266 :     // async cancellations. Most pageserver functions are not async cancellation safe.
    1260         6266 :     // We arm a drop-guard, so that if Hyper drops the Future, we signal the task
    1261         6266 :     // with the cancellation token.
    1262         6266 :     let token = CancellationToken::new();
    1263         6266 :     let cancel_guard = token.clone().drop_guard();
    1264         6266 :     let result = request_span(request, move |r| async {
    1265         6266 :         let handle = tokio::spawn(
    1266         6266 :             async {
    1267         6266 :                 let token_cloned = token.clone();
    1268      3469830 :                 let result = handler(r, token).await;
    1269         6265 :                 if token_cloned.is_cancelled() {
    1270            1 :                     info!("Cancelled request finished");
    1271         6264 :                 }
    1272         6265 :                 result
    1273         6266 :             }
    1274         6266 :             .in_current_span(),
    1275         6266 :         );
    1276         6266 : 
    1277         6431 :         match handle.await {
    1278         6264 :             Ok(result) => result,
    1279            0 :             Err(e) => {
    1280            0 :                 // The handler task panicked. We have a global panic handler that logs the
    1281            0 :                 // panic with its backtrace, so no need to log that here. Only log a brief
    1282            0 :                 // message to make it clear that we returned the error to the client.
    1283            0 :                 error!("HTTP request handler task panicked: {e:#}");
    1284              : 
    1285              :                 // Don't return an Error here, because then fallback error handler that was
    1286              :                 // installed in make_router() will print the error. Instead, construct the
    1287              :                 // HTTP error response and return that.
    1288            0 :                 Ok(
    1289            0 :                     ApiError::InternalServerError(anyhow!("HTTP request handler task panicked"))
    1290            0 :                         .into_response(),
    1291            0 :                 )
    1292              :             }
    1293              :         }
    1294         6266 :     })
    1295         6431 :     .await;
    1296              : 
    1297         6264 :     cancel_guard.disarm();
    1298         6264 : 
    1299         6264 :     result
    1300         6264 : }
    1301              : 
    1302              : /// Like api_handler, but returns an error response if the server is built without
    1303              : /// the 'testing' feature.
    1304         1070 : async fn testing_api_handler<R, H>(
    1305         1070 :     desc: &str,
    1306         1070 :     request: Request<Body>,
    1307         1070 :     handler: H,
    1308         1070 : ) -> Result<Response<Body>, ApiError>
    1309         1070 : where
    1310         1070 :     R: std::future::Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
    1311         1070 :     H: FnOnce(Request<Body>, CancellationToken) -> R + Send + Sync + 'static,
    1312         1070 : {
    1313         1070 :     if cfg!(feature = "testing") {
    1314         1070 :         api_handler(request, handler).await
    1315              :     } else {
    1316            0 :         std::future::ready(Err(ApiError::BadRequest(anyhow!(
    1317            0 :             "Cannot {desc} because pageserver was compiled without testing APIs",
    1318            0 :         ))))
    1319            0 :         .await
    1320              :     }
    1321         1070 : }
    1322              : 
    1323          575 : pub fn make_router(
    1324          575 :     conf: &'static PageServerConf,
    1325          575 :     launch_ts: &'static LaunchTimestamp,
    1326          575 :     auth: Option<Arc<JwtAuth>>,
    1327          575 :     broker_client: BrokerClientChannel,
    1328          575 :     remote_storage: Option<GenericRemoteStorage>,
    1329          575 :     disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
    1330          575 : ) -> anyhow::Result<RouterBuilder<hyper::Body, ApiError>> {
    1331          575 :     let spec = include_bytes!("openapi_spec.yml");
    1332          575 :     let mut router = attach_openapi_ui(endpoint::make_router(), spec, "/swagger.yml", "/v1/doc");
    1333          575 :     if auth.is_some() {
    1334           60 :         router = router.middleware(auth_middleware(|request| {
    1335           60 :             let state = get_state(request);
    1336           60 :             if state.allowlist_routes.contains(request.uri()) {
    1337           10 :                 None
    1338              :             } else {
    1339           50 :                 state.auth.as_deref()
    1340              :             }
    1341           60 :         }))
    1342          566 :     }
    1343              : 
    1344          575 :     router = router.middleware(
    1345          575 :         endpoint::add_response_header_middleware(
    1346          575 :             "PAGESERVER_LAUNCH_TIMESTAMP",
    1347          575 :             &launch_ts.to_string(),
    1348          575 :         )
    1349          575 :         .expect("construct launch timestamp header middleware"),
    1350          575 :     );
    1351          575 : 
    1352          575 :     Ok(router
    1353          575 :         .data(Arc::new(
    1354          575 :             State::new(
    1355          575 :                 conf,
    1356          575 :                 auth,
    1357          575 :                 remote_storage,
    1358          575 :                 broker_client,
    1359          575 :                 disk_usage_eviction_state,
    1360          575 :             )
    1361          575 :             .context("Failed to initialize router state")?,
    1362              :         ))
    1363          579 :         .get("/v1/status", |r| api_handler(r, status_handler))
    1364          575 :         .put("/v1/failpoints", |r| {
    1365          246 :             testing_api_handler("manage failpoints", r, failpoints_handler)
    1366          575 :         })
    1367          575 :         .get("/v1/tenant", |r| api_handler(r, tenant_list_handler))
    1368          575 :         .post("/v1/tenant", |r| api_handler(r, tenant_create_handler))
    1369          575 :         .get("/v1/tenant/:tenant_id", |r| api_handler(r, tenant_status))
    1370          575 :         .delete("/v1/tenant/:tenant_id", |r| {
    1371          132 :             api_handler(r, tenant_delete_handler)
    1372          575 :         })
    1373          575 :         .get("/v1/tenant/:tenant_id/synthetic_size", |r| {
    1374           53 :             api_handler(r, tenant_size_handler)
    1375          575 :         })
    1376          575 :         .put("/v1/tenant/config", |r| {
    1377           27 :             api_handler(r, update_tenant_config_handler)
    1378          575 :         })
    1379          575 :         .get("/v1/tenant/:tenant_id/config", |r| {
    1380           40 :             api_handler(r, get_tenant_config_handler)
    1381          575 :         })
    1382          575 :         .get("/v1/tenant/:tenant_id/timeline", |r| {
    1383           51 :             api_handler(r, timeline_list_handler)
    1384          575 :         })
    1385          907 :         .post("/v1/tenant/:tenant_id/timeline", |r| {
    1386          907 :             api_handler(r, timeline_create_handler)
    1387          907 :         })
    1388          575 :         .post("/v1/tenant/:tenant_id/attach", |r| {
    1389           51 :             api_handler(r, tenant_attach_handler)
    1390          575 :         })
    1391          575 :         .post("/v1/tenant/:tenant_id/detach", |r| {
    1392           40 :             api_handler(r, tenant_detach_handler)
    1393          575 :         })
    1394          575 :         .post("/v1/tenant/:tenant_id/load", |r| {
    1395            7 :             api_handler(r, tenant_load_handler)
    1396          575 :         })
    1397          575 :         .post("/v1/tenant/:tenant_id/ignore", |r| {
    1398            8 :             api_handler(r, tenant_ignore_handler)
    1399          575 :         })
    1400         1440 :         .get("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
    1401         1440 :             api_handler(r, timeline_detail_handler)
    1402         1440 :         })
    1403          575 :         .get(
    1404          575 :             "/v1/tenant/:tenant_id/timeline/:timeline_id/get_lsn_by_timestamp",
    1405          575 :             |r| api_handler(r, get_lsn_by_timestamp_handler),
    1406          575 :         )
    1407          575 :         .put("/v1/tenant/:tenant_id/timeline/:timeline_id/do_gc", |r| {
    1408          505 :             api_handler(r, timeline_gc_handler)
    1409          575 :         })
    1410          575 :         .put("/v1/tenant/:tenant_id/timeline/:timeline_id/compact", |r| {
    1411          123 :             testing_api_handler("run timeline compaction", r, timeline_compact_handler)
    1412          575 :         })
    1413          575 :         .put(
    1414          575 :             "/v1/tenant/:tenant_id/timeline/:timeline_id/checkpoint",
    1415          694 :             |r| testing_api_handler("run timeline checkpoint", r, timeline_checkpoint_handler),
    1416          575 :         )
    1417          575 :         .post(
    1418          575 :             "/v1/tenant/:tenant_id/timeline/:timeline_id/download_remote_layers",
    1419          575 :             |r| api_handler(r, timeline_download_remote_layers_handler_post),
    1420          575 :         )
    1421          575 :         .get(
    1422          575 :             "/v1/tenant/:tenant_id/timeline/:timeline_id/download_remote_layers",
    1423          575 :             |r| api_handler(r, timeline_download_remote_layers_handler_get),
    1424          575 :         )
    1425          575 :         .delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
    1426          121 :             api_handler(r, timeline_delete_handler)
    1427          575 :         })
    1428          575 :         .get("/v1/tenant/:tenant_id/timeline/:timeline_id/layer", |r| {
    1429          101 :             api_handler(r, layer_map_info_handler)
    1430          575 :         })
    1431          575 :         .get(
    1432          575 :             "/v1/tenant/:tenant_id/timeline/:timeline_id/layer/:layer_file_name",
    1433          575 :             |r| api_handler(r, layer_download_handler),
    1434          575 :         )
    1435          575 :         .delete(
    1436          575 :             "/v1/tenant/:tenant_id/timeline/:timeline_id/layer/:layer_file_name",
    1437          575 :             |r| api_handler(r, evict_timeline_layer_handler),
    1438          575 :         )
    1439          575 :         .put("/v1/disk_usage_eviction/run", |r| {
    1440            5 :             api_handler(r, disk_usage_eviction_run)
    1441          575 :         })
    1442          575 :         .put("/v1/tenant/:tenant_id/break", |r| {
    1443            2 :             testing_api_handler("set tenant state to broken", r, handle_tenant_break)
    1444          575 :         })
    1445          575 :         .get("/v1/panic", |r| api_handler(r, always_panic_handler))
    1446          575 :         .post("/v1/tracing/event", |r| {
    1447            5 :             testing_api_handler("emit a tracing event", r, post_tracing_event_handler)
    1448          575 :         })
    1449          575 :         .any(handler_404))
    1450          575 : }
        

Generated by: LCOV version 2.1-beta