Line data Source code
1 : //!
2 : //! Management HTTP API
3 : //!
4 : use std::collections::HashMap;
5 : use std::sync::Arc;
6 :
7 : use anyhow::{anyhow, Context, Result};
8 : use hyper::StatusCode;
9 : use hyper::{Body, Request, Response, Uri};
10 : use metrics::launch_timestamp::LaunchTimestamp;
11 : use pageserver_api::models::{DownloadRemoteLayersTaskSpawnRequest, TenantAttachRequest};
12 : use remote_storage::GenericRemoteStorage;
13 : use storage_broker::BrokerClientChannel;
14 : use tenant_size_model::{SizeResult, StorageModel};
15 : use tokio_util::sync::CancellationToken;
16 : use tracing::*;
17 : use utils::http::endpoint::request_span;
18 : use utils::http::json::json_request_or_empty_body;
19 : use utils::http::request::{get_request_param, must_get_query_param, parse_query_param};
20 :
21 : use super::models::{
22 : StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TenantInfo,
23 : TimelineCreateRequest, TimelineGcRequest, TimelineInfo,
24 : };
25 : use crate::context::{DownloadBehavior, RequestContext};
26 : use crate::metrics::{StorageTimeOperation, STORAGE_TIME_GLOBAL};
27 : use crate::pgdatadir_mapping::LsnForTimestamp;
28 : use crate::task_mgr::TaskKind;
29 : use crate::tenant::config::TenantConfOpt;
30 : use crate::tenant::mgr::{
31 : GetTenantError, SetNewTenantConfigError, TenantMapInsertError, TenantStateError,
32 : };
33 : use crate::tenant::size::ModelInputs;
34 : use crate::tenant::storage_layer::LayerAccessStatsReset;
35 : use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError, Timeline};
36 : use crate::{config::PageServerConf, tenant::mgr};
37 : use crate::{disk_usage_eviction_task, tenant};
38 : use utils::{
39 : auth::JwtAuth,
40 : http::{
41 : endpoint::{self, attach_openapi_ui, auth_middleware, check_permission_with},
42 : error::{ApiError, HttpErrorBody},
43 : json::{json_request, json_response},
44 : request::parse_request_param,
45 : RequestExt, RouterBuilder,
46 : },
47 : id::{TenantId, TimelineId},
48 : lsn::Lsn,
49 : };
50 :
51 : // Imports only used for testing APIs
52 : use super::models::ConfigureFailpointsRequest;
53 :
54 : struct State {
55 : conf: &'static PageServerConf,
56 : auth: Option<Arc<JwtAuth>>,
57 : allowlist_routes: Vec<Uri>,
58 : remote_storage: Option<GenericRemoteStorage>,
59 : broker_client: storage_broker::BrokerClientChannel,
60 : disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
61 : }
62 :
63 : impl State {
64 575 : fn new(
65 575 : conf: &'static PageServerConf,
66 575 : auth: Option<Arc<JwtAuth>>,
67 575 : remote_storage: Option<GenericRemoteStorage>,
68 575 : broker_client: storage_broker::BrokerClientChannel,
69 575 : disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
70 575 : ) -> anyhow::Result<Self> {
71 575 : let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml"]
72 575 : .iter()
73 1725 : .map(|v| v.parse().unwrap())
74 575 : .collect::<Vec<_>>();
75 575 : Ok(Self {
76 575 : conf,
77 575 : auth,
78 575 : allowlist_routes,
79 575 : remote_storage,
80 575 : broker_client,
81 575 : disk_usage_eviction_state,
82 575 : })
83 575 : }
84 : }
85 :
86 : #[inline(always)]
87 2292 : fn get_state(request: &Request<Body>) -> &State {
88 2292 : request
89 2292 : .data::<Arc<State>>()
90 2292 : .expect("unknown state type")
91 2292 : .as_ref()
92 2292 : }
93 :
94 : #[inline(always)]
95 579 : fn get_config(request: &Request<Body>) -> &'static PageServerConf {
96 579 : get_state(request).conf
97 579 : }
98 :
99 : /// Check that the requester is authorized to operate on given tenant
100 6019 : fn check_permission(request: &Request<Body>, tenant_id: Option<TenantId>) -> Result<(), ApiError> {
101 6019 : check_permission_with(request, |claims| {
102 50 : crate::auth::check_permission(claims, tenant_id)
103 6019 : })
104 6019 : }
105 :
106 : impl From<PageReconstructError> for ApiError {
107 0 : fn from(pre: PageReconstructError) -> ApiError {
108 0 : match pre {
109 0 : PageReconstructError::Other(pre) => ApiError::InternalServerError(pre),
110 : PageReconstructError::NeedsDownload(_, _) => {
111 : // This shouldn't happen, because we use a RequestContext that requests to
112 : // download any missing layer files on-demand.
113 0 : ApiError::InternalServerError(anyhow::anyhow!("need to download remote layer file"))
114 : }
115 : PageReconstructError::Cancelled => {
116 0 : ApiError::InternalServerError(anyhow::anyhow!("request was cancelled"))
117 : }
118 : PageReconstructError::AncestorStopping(_) => {
119 0 : ApiError::InternalServerError(anyhow::Error::new(pre))
120 : }
121 0 : PageReconstructError::WalRedo(pre) => {
122 0 : ApiError::InternalServerError(anyhow::Error::new(pre))
123 : }
124 : }
125 0 : }
126 : }
127 :
128 : impl From<TenantMapInsertError> for ApiError {
129 11 : fn from(tmie: TenantMapInsertError) -> ApiError {
130 11 : match tmie {
131 : TenantMapInsertError::StillInitializing | TenantMapInsertError::ShuttingDown => {
132 0 : ApiError::InternalServerError(anyhow::Error::new(tmie))
133 : }
134 8 : TenantMapInsertError::TenantAlreadyExists(id, state) => {
135 8 : ApiError::Conflict(format!("tenant {id} already exists, state: {state:?}"))
136 : }
137 3 : TenantMapInsertError::Closure(e) => ApiError::InternalServerError(e),
138 : }
139 11 : }
140 : }
141 :
142 : impl From<TenantStateError> for ApiError {
143 3 : fn from(tse: TenantStateError) -> ApiError {
144 3 : match tse {
145 3 : TenantStateError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid).into()),
146 0 : _ => ApiError::InternalServerError(anyhow::Error::new(tse)),
147 : }
148 3 : }
149 : }
150 :
151 : impl From<GetTenantError> for ApiError {
152 94 : fn from(tse: GetTenantError) -> ApiError {
153 94 : match tse {
154 91 : GetTenantError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid).into()),
155 3 : e @ GetTenantError::NotActive(_) => {
156 3 : // Why is this not `ApiError::NotFound`?
157 3 : // Because we must be careful to never return 404 for a tenant if it does
158 3 : // in fact exist locally. If we did, the caller could draw the conclusion
159 3 : // that it can attach the tenant to another PS and we'd be in split-brain.
160 3 : //
161 3 : // (We can produce this variant only in `mgr::get_tenant(..., active=true)` calls).
162 3 : ApiError::InternalServerError(anyhow::Error::new(e))
163 : }
164 : }
165 94 : }
166 : }
167 :
168 : impl From<SetNewTenantConfigError> for ApiError {
169 0 : fn from(e: SetNewTenantConfigError) -> ApiError {
170 0 : match e {
171 0 : SetNewTenantConfigError::GetTenant(tid) => {
172 0 : ApiError::NotFound(anyhow!("tenant {}", tid).into())
173 : }
174 0 : e @ SetNewTenantConfigError::Persist(_) => {
175 0 : ApiError::InternalServerError(anyhow::Error::new(e))
176 : }
177 : }
178 0 : }
179 : }
180 :
181 : impl From<crate::tenant::DeleteTimelineError> for ApiError {
182 17 : fn from(value: crate::tenant::DeleteTimelineError) -> Self {
183 17 : use crate::tenant::DeleteTimelineError::*;
184 17 : match value {
185 1 : NotFound => ApiError::NotFound(anyhow::anyhow!("timeline not found").into()),
186 1 : HasChildren(children) => ApiError::PreconditionFailed(
187 1 : format!("Cannot delete timeline which has child timelines: {children:?}")
188 1 : .into_boxed_str(),
189 1 : ),
190 3 : a @ AlreadyInProgress(_) => ApiError::Conflict(a.to_string()),
191 12 : Other(e) => ApiError::InternalServerError(e),
192 : }
193 17 : }
194 : }
195 :
196 : impl From<crate::tenant::mgr::DeleteTimelineError> for ApiError {
197 : fn from(value: crate::tenant::mgr::DeleteTimelineError) -> Self {
198 : use crate::tenant::mgr::DeleteTimelineError::*;
199 1 : match value {
200 : // Report Precondition failed so client can distinguish between
201 : // "tenant is missing" case from "timeline is missing"
202 1 : Tenant(GetTenantError::NotFound(..)) => ApiError::PreconditionFailed(
203 1 : "Requested tenant is missing".to_owned().into_boxed_str(),
204 1 : ),
205 0 : Tenant(t) => ApiError::from(t),
206 17 : Timeline(t) => ApiError::from(t),
207 : }
208 18 : }
209 : }
210 :
211 : impl From<crate::tenant::delete::DeleteTenantError> for ApiError {
212 28 : fn from(value: crate::tenant::delete::DeleteTenantError) -> Self {
213 28 : use crate::tenant::delete::DeleteTenantError::*;
214 28 : match value {
215 4 : Get(g) => ApiError::from(g),
216 0 : e @ AlreadyInProgress => ApiError::Conflict(e.to_string()),
217 0 : Timeline(t) => ApiError::from(t),
218 24 : Other(o) => ApiError::InternalServerError(o),
219 0 : e @ InvalidState(_) => ApiError::PreconditionFailed(e.to_string().into_boxed_str()),
220 : }
221 28 : }
222 : }
223 :
224 : // Helper function to construct a TimelineInfo struct for a timeline
225 1443 : async fn build_timeline_info(
226 1443 : timeline: &Arc<Timeline>,
227 1443 : include_non_incremental_logical_size: bool,
228 1443 : ctx: &RequestContext,
229 1443 : ) -> anyhow::Result<TimelineInfo> {
230 1443 : crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
231 :
232 1443 : let mut info = build_timeline_info_common(timeline, ctx).await?;
233 1443 : if include_non_incremental_logical_size {
234 : // XXX we should be using spawn_ondemand_logical_size_calculation here.
235 : // Otherwise, if someone deletes the timeline / detaches the tenant while
236 : // we're executing this function, we will outlive the timeline on-disk state.
237 : info.current_logical_size_non_incremental = Some(
238 19 : timeline
239 19 : .get_current_logical_size_non_incremental(
240 19 : info.last_record_lsn,
241 19 : CancellationToken::new(),
242 19 : ctx,
243 19 : )
244 654 : .await?,
245 : );
246 1424 : }
247 1442 : Ok(info)
248 1443 : }
249 :
250 2337 : async fn build_timeline_info_common(
251 2337 : timeline: &Arc<Timeline>,
252 2337 : ctx: &RequestContext,
253 2337 : ) -> anyhow::Result<TimelineInfo> {
254 2337 : crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
255 2337 : let last_record_lsn = timeline.get_last_record_lsn();
256 2337 : let (wal_source_connstr, last_received_msg_lsn, last_received_msg_ts) = {
257 2337 : let guard = timeline.last_received_wal.lock().unwrap();
258 2337 : if let Some(info) = guard.as_ref() {
259 1234 : (
260 1234 : Some(format!("{:?}", info.wal_source_connconf)), // Password is hidden, but it's for statistics only.
261 1234 : Some(info.last_received_msg_lsn),
262 1234 : Some(info.last_received_msg_ts),
263 1234 : )
264 : } else {
265 1103 : (None, None, None)
266 : }
267 : };
268 :
269 2337 : let ancestor_timeline_id = timeline.get_ancestor_timeline_id();
270 2337 : let ancestor_lsn = match timeline.get_ancestor_lsn() {
271 1770 : Lsn(0) => None,
272 567 : lsn @ Lsn(_) => Some(lsn),
273 : };
274 2337 : let current_logical_size = match timeline.get_current_logical_size(ctx) {
275 2337 : Ok((size, _)) => Some(size),
276 0 : Err(err) => {
277 0 : error!("Timeline info creation failed to get current logical size: {err:?}");
278 0 : None
279 : }
280 : };
281 2337 : let current_physical_size = Some(timeline.layer_size_sum().await);
282 2337 : let state = timeline.current_state();
283 2337 : let remote_consistent_lsn = timeline.get_remote_consistent_lsn().unwrap_or(Lsn(0));
284 2337 :
285 2337 : let info = TimelineInfo {
286 2337 : tenant_id: timeline.tenant_id,
287 2337 : timeline_id: timeline.timeline_id,
288 2337 : ancestor_timeline_id,
289 2337 : ancestor_lsn,
290 2337 : disk_consistent_lsn: timeline.get_disk_consistent_lsn(),
291 2337 : remote_consistent_lsn,
292 2337 : last_record_lsn,
293 2337 : prev_record_lsn: Some(timeline.get_prev_record_lsn()),
294 2337 : latest_gc_cutoff_lsn: *timeline.get_latest_gc_cutoff_lsn(),
295 2337 : current_logical_size,
296 2337 : current_physical_size,
297 2337 : current_logical_size_non_incremental: None,
298 2337 : timeline_dir_layer_file_size_sum: None,
299 2337 : wal_source_connstr,
300 2337 : last_received_msg_lsn,
301 2337 : last_received_msg_ts,
302 2337 : pg_version: timeline.pg_version,
303 2337 :
304 2337 : state,
305 2337 : };
306 2337 : Ok(info)
307 2337 : }
308 :
309 : // healthcheck handler
310 579 : async fn status_handler(
311 579 : request: Request<Body>,
312 579 : _cancel: CancellationToken,
313 579 : ) -> Result<Response<Body>, ApiError> {
314 579 : check_permission(&request, None)?;
315 579 : let config = get_config(&request);
316 579 : json_response(StatusCode::OK, StatusResponse { id: config.id })
317 579 : }
318 :
319 907 : async fn timeline_create_handler(
320 907 : mut request: Request<Body>,
321 907 : _cancel: CancellationToken,
322 907 : ) -> Result<Response<Body>, ApiError> {
323 907 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
324 907 : let request_data: TimelineCreateRequest = json_request(&mut request).await?;
325 907 : check_permission(&request, Some(tenant_id))?;
326 :
327 906 : let new_timeline_id = request_data.new_timeline_id;
328 906 :
329 906 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Error);
330 906 :
331 906 : let state = get_state(&request);
332 906 :
333 906 : async {
334 906 : let tenant = mgr::get_tenant(tenant_id, true).await?;
335 906 : match tenant.create_timeline(
336 906 : new_timeline_id,
337 906 : request_data.ancestor_timeline_id.map(TimelineId::from),
338 906 : request_data.ancestor_start_lsn,
339 906 : request_data.pg_version.unwrap_or(crate::DEFAULT_PG_VERSION),
340 906 : state.broker_client.clone(),
341 906 : &ctx,
342 906 : )
343 3211326 : .await {
344 894 : Ok(new_timeline) => {
345 : // Created. Construct a TimelineInfo for it.
346 894 : let timeline_info = build_timeline_info_common(&new_timeline, &ctx)
347 0 : .await
348 894 : .map_err(ApiError::InternalServerError)?;
349 894 : json_response(StatusCode::CREATED, timeline_info)
350 : }
351 : Err(tenant::CreateTimelineError::AlreadyExists) => {
352 1 : json_response(StatusCode::CONFLICT, ())
353 : }
354 8 : Err(tenant::CreateTimelineError::AncestorLsn(err)) => {
355 8 : json_response(StatusCode::NOT_ACCEPTABLE, HttpErrorBody::from_msg(
356 8 : format!("{err:#}")
357 8 : ))
358 : }
359 2 : Err(tenant::CreateTimelineError::Other(err)) => Err(ApiError::InternalServerError(err)),
360 : }
361 905 : }
362 906 : .instrument(info_span!("timeline_create", %tenant_id, timeline_id = %new_timeline_id, lsn=?request_data.ancestor_start_lsn, pg_version=?request_data.pg_version))
363 3211326 : .await
364 906 : }
365 :
366 51 : async fn timeline_list_handler(
367 51 : request: Request<Body>,
368 51 : _cancel: CancellationToken,
369 51 : ) -> Result<Response<Body>, ApiError> {
370 51 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
371 51 : let include_non_incremental_logical_size: Option<bool> =
372 51 : parse_query_param(&request, "include-non-incremental-logical-size")?;
373 51 : check_permission(&request, Some(tenant_id))?;
374 :
375 51 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
376 :
377 51 : let response_data = async {
378 51 : let tenant = mgr::get_tenant(tenant_id, true).await?;
379 51 : let timelines = tenant.list_timelines();
380 51 :
381 51 : let mut response_data = Vec::with_capacity(timelines.len());
382 145 : for timeline in timelines {
383 94 : let timeline_info = build_timeline_info(
384 94 : &timeline,
385 94 : include_non_incremental_logical_size.unwrap_or(false),
386 94 : &ctx,
387 94 : )
388 94 : .instrument(info_span!("build_timeline_info", timeline_id = %timeline.timeline_id))
389 0 : .await
390 94 : .context("Failed to convert tenant timeline {timeline_id} into the local one: {e:?}")
391 94 : .map_err(ApiError::InternalServerError)?;
392 :
393 94 : response_data.push(timeline_info);
394 : }
395 51 : Ok::<Vec<TimelineInfo>, ApiError>(response_data)
396 51 : }
397 51 : .instrument(info_span!("timeline_list", %tenant_id))
398 0 : .await?;
399 :
400 51 : json_response(StatusCode::OK, response_data)
401 51 : }
402 :
403 1440 : async fn timeline_detail_handler(
404 1440 : request: Request<Body>,
405 1440 : _cancel: CancellationToken,
406 1440 : ) -> Result<Response<Body>, ApiError> {
407 1440 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
408 1440 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
409 1440 : let include_non_incremental_logical_size: Option<bool> =
410 1440 : parse_query_param(&request, "include-non-incremental-logical-size")?;
411 1440 : check_permission(&request, Some(tenant_id))?;
412 :
413 : // Logical size calculation needs downloading.
414 1440 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
415 :
416 1440 : let timeline_info = async {
417 1440 : let tenant = mgr::get_tenant(tenant_id, true).await?;
418 :
419 1437 : let timeline = tenant
420 1437 : .get_timeline(timeline_id, false)
421 1437 : .map_err(|e| ApiError::NotFound(e.into()))?;
422 :
423 1349 : let timeline_info = build_timeline_info(
424 1349 : &timeline,
425 1349 : include_non_incremental_logical_size.unwrap_or(false),
426 1349 : &ctx,
427 1349 : )
428 667 : .await
429 1349 : .context("get local timeline info")
430 1349 : .map_err(ApiError::InternalServerError)?;
431 :
432 1348 : Ok::<_, ApiError>(timeline_info)
433 1440 : }
434 1440 : .instrument(info_span!("timeline_detail", %tenant_id, %timeline_id))
435 667 : .await?;
436 :
437 1348 : json_response(StatusCode::OK, timeline_info)
438 1440 : }
439 :
440 12 : async fn get_lsn_by_timestamp_handler(
441 12 : request: Request<Body>,
442 12 : _cancel: CancellationToken,
443 12 : ) -> Result<Response<Body>, ApiError> {
444 12 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
445 12 : check_permission(&request, Some(tenant_id))?;
446 :
447 12 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
448 12 : let timestamp_raw = must_get_query_param(&request, "timestamp")?;
449 12 : let timestamp = humantime::parse_rfc3339(×tamp_raw)
450 12 : .with_context(|| format!("Invalid time: {:?}", timestamp_raw))
451 12 : .map_err(ApiError::BadRequest)?;
452 12 : let timestamp_pg = postgres_ffi::to_pg_timestamp(timestamp);
453 12 :
454 12 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
455 12 : let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
456 662 : let result = timeline.find_lsn_for_timestamp(timestamp_pg, &ctx).await?;
457 :
458 12 : let result = match result {
459 10 : LsnForTimestamp::Present(lsn) => format!("{lsn}"),
460 1 : LsnForTimestamp::Future(_lsn) => "future".into(),
461 1 : LsnForTimestamp::Past(_lsn) => "past".into(),
462 0 : LsnForTimestamp::NoData(_lsn) => "nodata".into(),
463 : };
464 12 : json_response(StatusCode::OK, result)
465 12 : }
466 :
467 51 : async fn tenant_attach_handler(
468 51 : mut request: Request<Body>,
469 51 : _cancel: CancellationToken,
470 51 : ) -> Result<Response<Body>, ApiError> {
471 51 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
472 51 : check_permission(&request, Some(tenant_id))?;
473 :
474 51 : let maybe_body: Option<TenantAttachRequest> = json_request_or_empty_body(&mut request).await?;
475 48 : let tenant_conf = match maybe_body {
476 46 : Some(request) => TenantConfOpt::try_from(&*request.config).map_err(ApiError::BadRequest)?,
477 2 : None => TenantConfOpt::default(),
478 : };
479 :
480 48 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
481 :
482 48 : info!("Handling tenant attach {tenant_id}");
483 :
484 48 : let state = get_state(&request);
485 :
486 48 : if let Some(remote_storage) = &state.remote_storage {
487 48 : mgr::attach_tenant(
488 48 : state.conf,
489 48 : tenant_id,
490 48 : tenant_conf,
491 48 : state.broker_client.clone(),
492 48 : remote_storage.clone(),
493 48 : &ctx,
494 48 : )
495 48 : .instrument(info_span!("tenant_attach", %tenant_id))
496 9 : .await?;
497 : } else {
498 0 : return Err(ApiError::BadRequest(anyhow!(
499 0 : "attach_tenant is not possible because pageserver was configured without remote storage"
500 0 : )));
501 : }
502 :
503 39 : json_response(StatusCode::ACCEPTED, ())
504 51 : }
505 :
506 121 : async fn timeline_delete_handler(
507 121 : request: Request<Body>,
508 121 : _cancel: CancellationToken,
509 121 : ) -> Result<Response<Body>, ApiError> {
510 121 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
511 121 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
512 121 : check_permission(&request, Some(tenant_id))?;
513 :
514 121 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
515 121 :
516 121 : mgr::delete_timeline(tenant_id, timeline_id, &ctx)
517 121 : .instrument(info_span!("timeline_delete", %tenant_id, %timeline_id))
518 543 : .await?;
519 :
520 103 : json_response(StatusCode::ACCEPTED, ())
521 121 : }
522 :
523 40 : async fn tenant_detach_handler(
524 40 : request: Request<Body>,
525 40 : _cancel: CancellationToken,
526 40 : ) -> Result<Response<Body>, ApiError> {
527 40 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
528 40 : check_permission(&request, Some(tenant_id))?;
529 40 : let detach_ignored: Option<bool> = parse_query_param(&request, "detach_ignored")?;
530 :
531 40 : let state = get_state(&request);
532 40 : let conf = state.conf;
533 40 : mgr::detach_tenant(conf, tenant_id, detach_ignored.unwrap_or(false))
534 40 : .instrument(info_span!("tenant_detach", %tenant_id))
535 270 : .await?;
536 :
537 37 : json_response(StatusCode::OK, ())
538 40 : }
539 :
540 7 : async fn tenant_load_handler(
541 7 : request: Request<Body>,
542 7 : _cancel: CancellationToken,
543 7 : ) -> Result<Response<Body>, ApiError> {
544 7 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
545 7 : check_permission(&request, Some(tenant_id))?;
546 :
547 7 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
548 7 :
549 7 : let state = get_state(&request);
550 7 : mgr::load_tenant(
551 7 : state.conf,
552 7 : tenant_id,
553 7 : state.broker_client.clone(),
554 7 : state.remote_storage.clone(),
555 7 : &ctx,
556 7 : )
557 7 : .instrument(info_span!("load", %tenant_id))
558 1 : .await?;
559 :
560 6 : json_response(StatusCode::ACCEPTED, ())
561 7 : }
562 :
563 8 : async fn tenant_ignore_handler(
564 8 : request: Request<Body>,
565 8 : _cancel: CancellationToken,
566 8 : ) -> Result<Response<Body>, ApiError> {
567 8 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
568 8 : check_permission(&request, Some(tenant_id))?;
569 :
570 8 : let state = get_state(&request);
571 8 : let conf = state.conf;
572 8 : mgr::ignore_tenant(conf, tenant_id)
573 8 : .instrument(info_span!("ignore_tenant", %tenant_id))
574 34 : .await?;
575 :
576 8 : json_response(StatusCode::OK, ())
577 8 : }
578 :
579 77 : async fn tenant_list_handler(
580 77 : request: Request<Body>,
581 77 : _cancel: CancellationToken,
582 77 : ) -> Result<Response<Body>, ApiError> {
583 77 : check_permission(&request, None)?;
584 :
585 77 : let response_data = mgr::list_tenants()
586 77 : .instrument(info_span!("tenant_list"))
587 0 : .await
588 77 : .map_err(anyhow::Error::new)
589 77 : .map_err(ApiError::InternalServerError)?
590 77 : .iter()
591 132 : .map(|(id, state)| TenantInfo {
592 132 : id: *id,
593 132 : state: state.clone(),
594 132 : current_physical_size: None,
595 132 : attachment_status: state.attachment_status(),
596 132 : })
597 77 : .collect::<Vec<TenantInfo>>();
598 77 :
599 77 : json_response(StatusCode::OK, response_data)
600 77 : }
601 :
602 504 : async fn tenant_status(
603 504 : request: Request<Body>,
604 504 : _cancel: CancellationToken,
605 504 : ) -> Result<Response<Body>, ApiError> {
606 504 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
607 504 : check_permission(&request, Some(tenant_id))?;
608 :
609 504 : let tenant_info = async {
610 504 : let tenant = mgr::get_tenant(tenant_id, false).await?;
611 :
612 : // Calculate total physical size of all timelines
613 417 : let mut current_physical_size = 0;
614 444 : for timeline in tenant.list_timelines().iter() {
615 444 : current_physical_size += timeline.layer_size_sum().await;
616 : }
617 :
618 417 : let state = tenant.current_state();
619 417 : Result::<_, ApiError>::Ok(TenantInfo {
620 417 : id: tenant_id,
621 417 : state: state.clone(),
622 417 : current_physical_size: Some(current_physical_size),
623 417 : attachment_status: state.attachment_status(),
624 417 : })
625 504 : }
626 504 : .instrument(info_span!("tenant_status_handler", %tenant_id))
627 87 : .await?;
628 :
629 417 : json_response(StatusCode::OK, tenant_info)
630 504 : }
631 :
632 132 : async fn tenant_delete_handler(
633 132 : request: Request<Body>,
634 132 : _cancel: CancellationToken,
635 132 : ) -> Result<Response<Body>, ApiError> {
636 : // TODO openapi spec
637 132 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
638 132 : check_permission(&request, Some(tenant_id))?;
639 :
640 132 : let state = get_state(&request);
641 132 :
642 132 : mgr::delete_tenant(state.conf, state.remote_storage.clone(), tenant_id)
643 132 : .instrument(info_span!("tenant_delete_handler", %tenant_id))
644 675 : .await?;
645 :
646 104 : json_response(StatusCode::ACCEPTED, ())
647 132 : }
648 :
649 : /// HTTP endpoint to query the current tenant_size of a tenant.
650 : ///
651 : /// This is not used by consumption metrics under [`crate::consumption_metrics`], but can be used
652 : /// to debug any of the calculations. Requires `tenant_id` request parameter, supports
653 : /// `inputs_only=true|false` (default false) which supports debugging failure to calculate model
654 : /// values.
655 : ///
656 : /// 'retention_period' query parameter overrides the cutoff that is used to calculate the size
657 : /// (only if it is shorter than the real cutoff).
658 : ///
659 : /// Note: we don't update the cached size and prometheus metric here.
660 : /// The retention period might be different, and it's nice to have a method to just calculate it
661 : /// without modifying anything anyway.
662 53 : async fn tenant_size_handler(
663 53 : request: Request<Body>,
664 53 : _cancel: CancellationToken,
665 53 : ) -> Result<Response<Body>, ApiError> {
666 53 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
667 53 : check_permission(&request, Some(tenant_id))?;
668 53 : let inputs_only: Option<bool> = parse_query_param(&request, "inputs_only")?;
669 53 : let retention_period: Option<u64> = parse_query_param(&request, "retention_period")?;
670 53 : let headers = request.headers();
671 53 :
672 53 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
673 53 : let tenant = mgr::get_tenant(tenant_id, true).await?;
674 :
675 : // this can be long operation
676 53 : let inputs = tenant
677 53 : .gather_size_inputs(
678 53 : retention_period,
679 53 : LogicalSizeCalculationCause::TenantSizeHandler,
680 53 : &ctx,
681 53 : )
682 37 : .await
683 53 : .map_err(ApiError::InternalServerError)?;
684 :
685 53 : let mut sizes = None;
686 53 : if !inputs_only.unwrap_or(false) {
687 53 : let storage_model = inputs
688 53 : .calculate_model()
689 53 : .map_err(ApiError::InternalServerError)?;
690 53 : let size = storage_model.calculate();
691 53 :
692 53 : // If request header expects html, return html
693 53 : if headers["Accept"] == "text/html" {
694 18 : return synthetic_size_html_response(inputs, storage_model, size);
695 35 : }
696 35 : sizes = Some(size);
697 0 : } else if headers["Accept"] == "text/html" {
698 0 : return Err(ApiError::BadRequest(anyhow!(
699 0 : "inputs_only parameter is incompatible with html output request"
700 0 : )));
701 0 : }
702 :
703 : /// The type resides in the pageserver not to expose `ModelInputs`.
704 : #[serde_with::serde_as]
705 35 : #[derive(serde::Serialize)]
706 : struct TenantHistorySize {
707 : #[serde_as(as = "serde_with::DisplayFromStr")]
708 : id: TenantId,
709 : /// Size is a mixture of WAL and logical size, so the unit is bytes.
710 : ///
711 : /// Will be none if `?inputs_only=true` was given.
712 : size: Option<u64>,
713 : /// Size of each segment used in the model.
714 : /// Will be null if `?inputs_only=true` was given.
715 : segment_sizes: Option<Vec<tenant_size_model::SegmentSizeResult>>,
716 : inputs: crate::tenant::size::ModelInputs,
717 : }
718 :
719 35 : json_response(
720 35 : StatusCode::OK,
721 35 : TenantHistorySize {
722 35 : id: tenant_id,
723 35 : size: sizes.as_ref().map(|x| x.total_size),
724 35 : segment_sizes: sizes.map(|x| x.segments),
725 35 : inputs,
726 35 : },
727 35 : )
728 53 : }
729 :
730 101 : async fn layer_map_info_handler(
731 101 : request: Request<Body>,
732 101 : _cancel: CancellationToken,
733 101 : ) -> Result<Response<Body>, ApiError> {
734 101 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
735 101 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
736 101 : let reset: LayerAccessStatsReset =
737 101 : parse_query_param(&request, "reset")?.unwrap_or(LayerAccessStatsReset::NoReset);
738 101 :
739 101 : check_permission(&request, Some(tenant_id))?;
740 :
741 101 : let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
742 101 : let layer_map_info = timeline.layer_map_info(reset).await;
743 :
744 101 : json_response(StatusCode::OK, layer_map_info)
745 101 : }
746 :
747 6 : async fn layer_download_handler(
748 6 : request: Request<Body>,
749 6 : _cancel: CancellationToken,
750 6 : ) -> Result<Response<Body>, ApiError> {
751 6 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
752 6 : check_permission(&request, Some(tenant_id))?;
753 6 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
754 6 : let layer_file_name = get_request_param(&request, "layer_file_name")?;
755 6 : check_permission(&request, Some(tenant_id))?;
756 :
757 6 : let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
758 6 : let downloaded = timeline
759 6 : .download_layer(layer_file_name)
760 6 : .await
761 6 : .map_err(ApiError::InternalServerError)?;
762 :
763 6 : match downloaded {
764 6 : Some(true) => json_response(StatusCode::OK, ()),
765 0 : Some(false) => json_response(StatusCode::NOT_MODIFIED, ()),
766 0 : None => json_response(
767 0 : StatusCode::BAD_REQUEST,
768 0 : format!("Layer {tenant_id}/{timeline_id}/{layer_file_name} not found"),
769 0 : ),
770 : }
771 6 : }
772 :
773 25 : async fn evict_timeline_layer_handler(
774 25 : request: Request<Body>,
775 25 : _cancel: CancellationToken,
776 25 : ) -> Result<Response<Body>, ApiError> {
777 25 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
778 25 : check_permission(&request, Some(tenant_id))?;
779 25 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
780 25 : let layer_file_name = get_request_param(&request, "layer_file_name")?;
781 :
782 25 : let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
783 25 : let evicted = timeline
784 25 : .evict_layer(layer_file_name)
785 2 : .await
786 25 : .map_err(ApiError::InternalServerError)?;
787 :
788 25 : match evicted {
789 25 : Some(true) => json_response(StatusCode::OK, ()),
790 0 : Some(false) => json_response(StatusCode::NOT_MODIFIED, ()),
791 0 : None => json_response(
792 0 : StatusCode::BAD_REQUEST,
793 0 : format!("Layer {tenant_id}/{timeline_id}/{layer_file_name} not found"),
794 0 : ),
795 : }
796 25 : }
797 :
798 : /// Get tenant_size SVG graph along with the JSON data.
799 18 : fn synthetic_size_html_response(
800 18 : inputs: ModelInputs,
801 18 : storage_model: StorageModel,
802 18 : sizes: SizeResult,
803 18 : ) -> Result<Response<Body>, ApiError> {
804 18 : let mut timeline_ids: Vec<String> = Vec::new();
805 18 : let mut timeline_map: HashMap<TimelineId, usize> = HashMap::new();
806 26 : for (index, ti) in inputs.timeline_inputs.iter().enumerate() {
807 26 : timeline_map.insert(ti.timeline_id, index);
808 26 : timeline_ids.push(ti.timeline_id.to_string());
809 26 : }
810 18 : let seg_to_branch: Vec<usize> = inputs
811 18 : .segments
812 18 : .iter()
813 66 : .map(|seg| *timeline_map.get(&seg.timeline_id).unwrap())
814 18 : .collect();
815 :
816 18 : let svg =
817 18 : tenant_size_model::svg::draw_svg(&storage_model, &timeline_ids, &seg_to_branch, &sizes)
818 18 : .map_err(ApiError::InternalServerError)?;
819 :
820 18 : let mut response = String::new();
821 18 :
822 18 : use std::fmt::Write;
823 18 : write!(response, "<html>\n<body>\n").unwrap();
824 18 : write!(response, "<div>\n{svg}\n</div>").unwrap();
825 18 : writeln!(response, "Project size: {}", sizes.total_size).unwrap();
826 18 : writeln!(response, "<pre>").unwrap();
827 18 : writeln!(
828 18 : response,
829 18 : "{}",
830 18 : serde_json::to_string_pretty(&inputs).unwrap()
831 18 : )
832 18 : .unwrap();
833 18 : writeln!(
834 18 : response,
835 18 : "{}",
836 18 : serde_json::to_string_pretty(&sizes.segments).unwrap()
837 18 : )
838 18 : .unwrap();
839 18 : writeln!(response, "</pre>").unwrap();
840 18 : write!(response, "</body>\n</html>\n").unwrap();
841 18 :
842 18 : html_response(StatusCode::OK, response)
843 18 : }
844 :
845 18 : pub fn html_response(status: StatusCode, data: String) -> Result<Response<Body>, ApiError> {
846 18 : let response = Response::builder()
847 18 : .status(status)
848 18 : .header(hyper::header::CONTENT_TYPE, "text/html")
849 18 : .body(Body::from(data.as_bytes().to_vec()))
850 18 : .map_err(|e| ApiError::InternalServerError(e.into()))?;
851 18 : Ok(response)
852 18 : }
853 :
854 481 : async fn tenant_create_handler(
855 481 : mut request: Request<Body>,
856 481 : _cancel: CancellationToken,
857 481 : ) -> Result<Response<Body>, ApiError> {
858 481 : let request_data: TenantCreateRequest = json_request(&mut request).await?;
859 481 : let target_tenant_id = request_data.new_tenant_id;
860 481 : check_permission(&request, None)?;
861 :
862 480 : let _timer = STORAGE_TIME_GLOBAL
863 480 : .get_metric_with_label_values(&[StorageTimeOperation::CreateTenant.into()])
864 480 : .expect("bug")
865 480 : .start_timer();
866 :
867 480 : let tenant_conf =
868 480 : TenantConfOpt::try_from(&request_data.config).map_err(ApiError::BadRequest)?;
869 :
870 480 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
871 480 :
872 480 : let state = get_state(&request);
873 :
874 480 : let new_tenant = mgr::create_tenant(
875 480 : state.conf,
876 480 : tenant_conf,
877 480 : target_tenant_id,
878 480 : state.broker_client.clone(),
879 480 : state.remote_storage.clone(),
880 480 : &ctx,
881 480 : )
882 480 : .instrument(info_span!("tenant_create", tenant_id = %target_tenant_id))
883 1 : .await?;
884 :
885 : // We created the tenant. Existing API semantics are that the tenant
886 : // is Active when this function returns.
887 686 : if let res @ Err(_) = new_tenant.wait_to_become_active().await {
888 : // This shouldn't happen because we just created the tenant directory
889 : // in tenant::mgr::create_tenant, and there aren't any remote timelines
890 : // to load, so, nothing can really fail during load.
891 : // Don't do cleanup because we don't know how we got here.
892 : // The tenant will likely be in `Broken` state and subsequent
893 : // calls will fail.
894 0 : res.context("created tenant failed to become active")
895 0 : .map_err(ApiError::InternalServerError)?;
896 479 : }
897 :
898 479 : json_response(
899 479 : StatusCode::CREATED,
900 479 : TenantCreateResponse(new_tenant.tenant_id()),
901 479 : )
902 481 : }
903 :
904 40 : async fn get_tenant_config_handler(
905 40 : request: Request<Body>,
906 40 : _cancel: CancellationToken,
907 40 : ) -> Result<Response<Body>, ApiError> {
908 40 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
909 40 : check_permission(&request, Some(tenant_id))?;
910 :
911 40 : let tenant = mgr::get_tenant(tenant_id, false).await?;
912 :
913 40 : let response = HashMap::from([
914 : (
915 : "tenant_specific_overrides",
916 40 : serde_json::to_value(tenant.tenant_specific_overrides())
917 40 : .context("serializing tenant specific overrides")
918 40 : .map_err(ApiError::InternalServerError)?,
919 : ),
920 : (
921 40 : "effective_config",
922 40 : serde_json::to_value(tenant.effective_config())
923 40 : .context("serializing effective config")
924 40 : .map_err(ApiError::InternalServerError)?,
925 : ),
926 : ]);
927 :
928 40 : json_response(StatusCode::OK, response)
929 40 : }
930 :
931 27 : async fn update_tenant_config_handler(
932 27 : mut request: Request<Body>,
933 27 : _cancel: CancellationToken,
934 27 : ) -> Result<Response<Body>, ApiError> {
935 27 : let request_data: TenantConfigRequest = json_request(&mut request).await?;
936 27 : let tenant_id = request_data.tenant_id;
937 27 : check_permission(&request, Some(tenant_id))?;
938 :
939 27 : let tenant_conf =
940 27 : TenantConfOpt::try_from(&request_data.config).map_err(ApiError::BadRequest)?;
941 :
942 27 : let state = get_state(&request);
943 27 : mgr::set_new_tenant_config(state.conf, tenant_conf, tenant_id)
944 27 : .instrument(info_span!("tenant_config", %tenant_id))
945 0 : .await?;
946 :
947 27 : json_response(StatusCode::OK, ())
948 27 : }
949 :
950 : /// Testing helper to transition a tenant to [`crate::tenant::TenantState::Broken`].
951 2 : async fn handle_tenant_break(
952 2 : r: Request<Body>,
953 2 : _cancel: CancellationToken,
954 2 : ) -> Result<Response<Body>, ApiError> {
955 2 : let tenant_id: TenantId = parse_request_param(&r, "tenant_id")?;
956 :
957 2 : let tenant = crate::tenant::mgr::get_tenant(tenant_id, true)
958 0 : .await
959 2 : .map_err(|_| ApiError::Conflict(String::from("no active tenant found")))?;
960 :
961 2 : tenant.set_broken("broken from test".to_owned()).await;
962 :
963 2 : json_response(StatusCode::OK, ())
964 2 : }
965 :
966 246 : async fn failpoints_handler(
967 246 : mut request: Request<Body>,
968 246 : _cancel: CancellationToken,
969 246 : ) -> Result<Response<Body>, ApiError> {
970 246 : if !fail::has_failpoints() {
971 0 : return Err(ApiError::BadRequest(anyhow!(
972 0 : "Cannot manage failpoints because pageserver was compiled without failpoints support"
973 0 : )));
974 246 : }
975 :
976 246 : let failpoints: ConfigureFailpointsRequest = json_request(&mut request).await?;
977 497 : for fp in failpoints {
978 251 : info!("cfg failpoint: {} {}", fp.name, fp.actions);
979 :
980 : // We recognize one extra "action" that's not natively recognized
981 : // by the failpoints crate: exit, to immediately kill the process
982 251 : let cfg_result = crate::failpoint_support::apply_failpoint(&fp.name, &fp.actions);
983 :
984 251 : if let Err(err_msg) = cfg_result {
985 0 : return Err(ApiError::BadRequest(anyhow!(
986 0 : "Failed to configure failpoints: {err_msg}"
987 0 : )));
988 251 : }
989 : }
990 :
991 246 : json_response(StatusCode::OK, ())
992 246 : }
993 :
994 : // Run GC immediately on given timeline.
995 505 : async fn timeline_gc_handler(
996 505 : mut request: Request<Body>,
997 505 : _cancel: CancellationToken,
998 505 : ) -> Result<Response<Body>, ApiError> {
999 505 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
1000 505 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1001 505 : check_permission(&request, Some(tenant_id))?;
1002 :
1003 505 : let gc_req: TimelineGcRequest = json_request(&mut request).await?;
1004 :
1005 505 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
1006 505 : let wait_task_done = mgr::immediate_gc(tenant_id, timeline_id, gc_req, &ctx).await?;
1007 504 : let gc_result = wait_task_done
1008 504 : .await
1009 504 : .context("wait for gc task")
1010 504 : .map_err(ApiError::InternalServerError)?
1011 504 : .map_err(ApiError::InternalServerError)?;
1012 :
1013 503 : json_response(StatusCode::OK, gc_result)
1014 505 : }
1015 :
1016 : // Run compaction immediately on given timeline.
1017 123 : async fn timeline_compact_handler(
1018 123 : request: Request<Body>,
1019 123 : cancel: CancellationToken,
1020 123 : ) -> Result<Response<Body>, ApiError> {
1021 123 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
1022 123 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1023 123 : check_permission(&request, Some(tenant_id))?;
1024 :
1025 123 : async {
1026 123 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
1027 123 : let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
1028 123 : timeline
1029 123 : .compact(&cancel, &ctx)
1030 11894 : .await
1031 123 : .map_err(ApiError::InternalServerError)?;
1032 123 : json_response(StatusCode::OK, ())
1033 123 : }
1034 123 : .instrument(info_span!("manual_compaction", %tenant_id, %timeline_id))
1035 11894 : .await
1036 123 : }
1037 :
1038 : // Run checkpoint immediately on given timeline.
1039 694 : async fn timeline_checkpoint_handler(
1040 694 : request: Request<Body>,
1041 694 : cancel: CancellationToken,
1042 694 : ) -> Result<Response<Body>, ApiError> {
1043 694 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
1044 694 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1045 694 : check_permission(&request, Some(tenant_id))?;
1046 694 : async {
1047 694 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
1048 694 : let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
1049 694 : timeline
1050 694 : .freeze_and_flush()
1051 775 : .await
1052 694 : .map_err(ApiError::InternalServerError)?;
1053 694 : timeline
1054 694 : .compact(&cancel, &ctx)
1055 241577 : .await
1056 694 : .map_err(ApiError::InternalServerError)?;
1057 :
1058 693 : json_response(StatusCode::OK, ())
1059 694 : }
1060 694 : .instrument(info_span!("manual_checkpoint", %tenant_id, %timeline_id))
1061 242352 : .await
1062 694 : }
1063 :
1064 3 : async fn timeline_download_remote_layers_handler_post(
1065 3 : mut request: Request<Body>,
1066 3 : _cancel: CancellationToken,
1067 3 : ) -> Result<Response<Body>, ApiError> {
1068 3 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
1069 3 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1070 3 : let body: DownloadRemoteLayersTaskSpawnRequest = json_request(&mut request).await?;
1071 3 : check_permission(&request, Some(tenant_id))?;
1072 :
1073 3 : let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
1074 3 : match timeline.spawn_download_all_remote_layers(body).await {
1075 3 : Ok(st) => json_response(StatusCode::ACCEPTED, st),
1076 0 : Err(st) => json_response(StatusCode::CONFLICT, st),
1077 : }
1078 3 : }
1079 :
1080 21 : async fn timeline_download_remote_layers_handler_get(
1081 21 : request: Request<Body>,
1082 21 : _cancel: CancellationToken,
1083 21 : ) -> Result<Response<Body>, ApiError> {
1084 21 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
1085 21 : check_permission(&request, Some(tenant_id))?;
1086 21 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1087 :
1088 21 : let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
1089 21 : let info = timeline
1090 21 : .get_download_all_remote_layers_task_info()
1091 21 : .context("task never started since last pageserver process start")
1092 21 : .map_err(|e| ApiError::NotFound(e.into()))?;
1093 21 : json_response(StatusCode::OK, info)
1094 21 : }
1095 :
1096 985 : async fn active_timeline_of_active_tenant(
1097 985 : tenant_id: TenantId,
1098 985 : timeline_id: TimelineId,
1099 985 : ) -> Result<Arc<Timeline>, ApiError> {
1100 985 : let tenant = mgr::get_tenant(tenant_id, true).await?;
1101 985 : tenant
1102 985 : .get_timeline(timeline_id, true)
1103 985 : .map_err(|e| ApiError::NotFound(e.into()))
1104 985 : }
1105 :
1106 0 : async fn always_panic_handler(
1107 0 : req: Request<Body>,
1108 0 : _cancel: CancellationToken,
1109 0 : ) -> Result<Response<Body>, ApiError> {
1110 0 : // Deliberately cause a panic to exercise the panic hook registered via std::panic::set_hook().
1111 0 : // For pageserver, the relevant panic hook is `tracing_panic_hook` , and the `sentry` crate's wrapper around it.
1112 0 : // Use catch_unwind to ensure that tokio nor hyper are distracted by our panic.
1113 0 : let query = req.uri().query();
1114 0 : let _ = std::panic::catch_unwind(|| {
1115 0 : panic!("unconditional panic for testing panic hook integration; request query: {query:?}")
1116 0 : });
1117 0 : json_response(StatusCode::NO_CONTENT, ())
1118 0 : }
1119 :
1120 5 : async fn disk_usage_eviction_run(
1121 5 : mut r: Request<Body>,
1122 5 : _cancel: CancellationToken,
1123 5 : ) -> Result<Response<Body>, ApiError> {
1124 5 : check_permission(&r, None)?;
1125 :
1126 26 : #[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)]
1127 : struct Config {
1128 : /// How many bytes to evict before reporting that pressure is relieved.
1129 : evict_bytes: u64,
1130 : }
1131 :
1132 26 : #[derive(Debug, Clone, Copy, serde::Serialize)]
1133 : struct Usage {
1134 : // remains unchanged after instantiation of the struct
1135 : config: Config,
1136 : // updated by `add_available_bytes`
1137 : freed_bytes: u64,
1138 : }
1139 :
1140 : impl crate::disk_usage_eviction_task::Usage for Usage {
1141 113 : fn has_pressure(&self) -> bool {
1142 113 : self.config.evict_bytes > self.freed_bytes
1143 113 : }
1144 :
1145 210 : fn add_available_bytes(&mut self, bytes: u64) {
1146 210 : self.freed_bytes += bytes;
1147 210 : }
1148 : }
1149 :
1150 5 : let config = json_request::<Config>(&mut r)
1151 1 : .await
1152 5 : .map_err(|_| ApiError::BadRequest(anyhow::anyhow!("invalid JSON body")))?;
1153 :
1154 5 : let usage = Usage {
1155 5 : config,
1156 5 : freed_bytes: 0,
1157 5 : };
1158 5 :
1159 5 : let (tx, rx) = tokio::sync::oneshot::channel();
1160 5 :
1161 5 : let state = get_state(&r);
1162 :
1163 5 : let Some(storage) = state.remote_storage.clone() else {
1164 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
1165 0 : "remote storage not configured, cannot run eviction iteration"
1166 0 : )));
1167 : };
1168 :
1169 5 : let state = state.disk_usage_eviction_state.clone();
1170 5 :
1171 5 : let cancel = CancellationToken::new();
1172 5 : let child_cancel = cancel.clone();
1173 5 : let _g = cancel.drop_guard();
1174 5 :
1175 5 : crate::task_mgr::spawn(
1176 5 : crate::task_mgr::BACKGROUND_RUNTIME.handle(),
1177 5 : TaskKind::DiskUsageEviction,
1178 5 : None,
1179 5 : None,
1180 5 : "ondemand disk usage eviction",
1181 5 : false,
1182 5 : async move {
1183 5 : let res = crate::disk_usage_eviction_task::disk_usage_eviction_task_iteration_impl(
1184 5 : &state,
1185 5 : &storage,
1186 5 : usage,
1187 5 : &child_cancel,
1188 5 : )
1189 0 : .await;
1190 :
1191 5 : info!(?res, "disk_usage_eviction_task_iteration_impl finished");
1192 :
1193 5 : let _ = tx.send(res);
1194 5 : Ok(())
1195 5 : }
1196 5 : .in_current_span(),
1197 5 : );
1198 :
1199 5 : let response = rx.await.unwrap().map_err(ApiError::InternalServerError)?;
1200 :
1201 5 : json_response(StatusCode::OK, response)
1202 5 : }
1203 :
1204 0 : async fn handler_404(_: Request<Body>) -> Result<Response<Body>, ApiError> {
1205 0 : json_response(
1206 0 : StatusCode::NOT_FOUND,
1207 0 : HttpErrorBody::from_msg("page not found".to_owned()),
1208 0 : )
1209 0 : }
1210 :
1211 5 : async fn post_tracing_event_handler(
1212 5 : mut r: Request<Body>,
1213 5 : _cancel: CancellationToken,
1214 5 : ) -> Result<Response<Body>, ApiError> {
1215 10 : #[derive(Debug, serde::Deserialize)]
1216 : #[serde(rename_all = "lowercase")]
1217 : enum Level {
1218 : Error,
1219 : Warn,
1220 : Info,
1221 : Debug,
1222 : Trace,
1223 : }
1224 25 : #[derive(Debug, serde::Deserialize)]
1225 : struct Request {
1226 : level: Level,
1227 : message: String,
1228 : }
1229 5 : let body: Request = json_request(&mut r)
1230 0 : .await
1231 5 : .map_err(|_| ApiError::BadRequest(anyhow::anyhow!("invalid JSON body")))?;
1232 :
1233 5 : match body.level {
1234 2 : Level::Error => tracing::error!(?body.message),
1235 2 : Level::Warn => tracing::warn!(?body.message),
1236 2 : Level::Info => tracing::info!(?body.message),
1237 1 : Level::Debug => tracing::debug!(?body.message),
1238 1 : Level::Trace => tracing::trace!(?body.message),
1239 : }
1240 :
1241 5 : json_response(StatusCode::OK, ())
1242 5 : }
1243 :
1244 : /// Common functionality of all the HTTP API handlers.
1245 : ///
1246 : /// - Adds a tracing span to each request (by `request_span`)
1247 : /// - Logs the request depending on the request method (by `request_span`)
1248 : /// - Logs the response if it was not successful (by `request_span`
1249 : /// - Shields the handler function from async cancellations. Hyper can drop the handler
1250 : /// Future if the connection to the client is lost, but most of the pageserver code is
1251 : /// not async cancellation safe. This converts the dropped future into a graceful cancellation
1252 : /// request with a CancellationToken.
1253 6266 : async fn api_handler<R, H>(request: Request<Body>, handler: H) -> Result<Response<Body>, ApiError>
1254 6266 : where
1255 6266 : R: std::future::Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
1256 6266 : H: FnOnce(Request<Body>, CancellationToken) -> R + Send + Sync + 'static,
1257 6266 : {
1258 6266 : // Spawn a new task to handle the request, to protect the handler from unexpected
1259 6266 : // async cancellations. Most pageserver functions are not async cancellation safe.
1260 6266 : // We arm a drop-guard, so that if Hyper drops the Future, we signal the task
1261 6266 : // with the cancellation token.
1262 6266 : let token = CancellationToken::new();
1263 6266 : let cancel_guard = token.clone().drop_guard();
1264 6266 : let result = request_span(request, move |r| async {
1265 6266 : let handle = tokio::spawn(
1266 6266 : async {
1267 6266 : let token_cloned = token.clone();
1268 3469830 : let result = handler(r, token).await;
1269 6265 : if token_cloned.is_cancelled() {
1270 1 : info!("Cancelled request finished");
1271 6264 : }
1272 6265 : result
1273 6266 : }
1274 6266 : .in_current_span(),
1275 6266 : );
1276 6266 :
1277 6431 : match handle.await {
1278 6264 : Ok(result) => result,
1279 0 : Err(e) => {
1280 0 : // The handler task panicked. We have a global panic handler that logs the
1281 0 : // panic with its backtrace, so no need to log that here. Only log a brief
1282 0 : // message to make it clear that we returned the error to the client.
1283 0 : error!("HTTP request handler task panicked: {e:#}");
1284 :
1285 : // Don't return an Error here, because then fallback error handler that was
1286 : // installed in make_router() will print the error. Instead, construct the
1287 : // HTTP error response and return that.
1288 0 : Ok(
1289 0 : ApiError::InternalServerError(anyhow!("HTTP request handler task panicked"))
1290 0 : .into_response(),
1291 0 : )
1292 : }
1293 : }
1294 6266 : })
1295 6431 : .await;
1296 :
1297 6264 : cancel_guard.disarm();
1298 6264 :
1299 6264 : result
1300 6264 : }
1301 :
1302 : /// Like api_handler, but returns an error response if the server is built without
1303 : /// the 'testing' feature.
1304 1070 : async fn testing_api_handler<R, H>(
1305 1070 : desc: &str,
1306 1070 : request: Request<Body>,
1307 1070 : handler: H,
1308 1070 : ) -> Result<Response<Body>, ApiError>
1309 1070 : where
1310 1070 : R: std::future::Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
1311 1070 : H: FnOnce(Request<Body>, CancellationToken) -> R + Send + Sync + 'static,
1312 1070 : {
1313 1070 : if cfg!(feature = "testing") {
1314 1070 : api_handler(request, handler).await
1315 : } else {
1316 0 : std::future::ready(Err(ApiError::BadRequest(anyhow!(
1317 0 : "Cannot {desc} because pageserver was compiled without testing APIs",
1318 0 : ))))
1319 0 : .await
1320 : }
1321 1070 : }
1322 :
1323 575 : pub fn make_router(
1324 575 : conf: &'static PageServerConf,
1325 575 : launch_ts: &'static LaunchTimestamp,
1326 575 : auth: Option<Arc<JwtAuth>>,
1327 575 : broker_client: BrokerClientChannel,
1328 575 : remote_storage: Option<GenericRemoteStorage>,
1329 575 : disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
1330 575 : ) -> anyhow::Result<RouterBuilder<hyper::Body, ApiError>> {
1331 575 : let spec = include_bytes!("openapi_spec.yml");
1332 575 : let mut router = attach_openapi_ui(endpoint::make_router(), spec, "/swagger.yml", "/v1/doc");
1333 575 : if auth.is_some() {
1334 60 : router = router.middleware(auth_middleware(|request| {
1335 60 : let state = get_state(request);
1336 60 : if state.allowlist_routes.contains(request.uri()) {
1337 10 : None
1338 : } else {
1339 50 : state.auth.as_deref()
1340 : }
1341 60 : }))
1342 566 : }
1343 :
1344 575 : router = router.middleware(
1345 575 : endpoint::add_response_header_middleware(
1346 575 : "PAGESERVER_LAUNCH_TIMESTAMP",
1347 575 : &launch_ts.to_string(),
1348 575 : )
1349 575 : .expect("construct launch timestamp header middleware"),
1350 575 : );
1351 575 :
1352 575 : Ok(router
1353 575 : .data(Arc::new(
1354 575 : State::new(
1355 575 : conf,
1356 575 : auth,
1357 575 : remote_storage,
1358 575 : broker_client,
1359 575 : disk_usage_eviction_state,
1360 575 : )
1361 575 : .context("Failed to initialize router state")?,
1362 : ))
1363 579 : .get("/v1/status", |r| api_handler(r, status_handler))
1364 575 : .put("/v1/failpoints", |r| {
1365 246 : testing_api_handler("manage failpoints", r, failpoints_handler)
1366 575 : })
1367 575 : .get("/v1/tenant", |r| api_handler(r, tenant_list_handler))
1368 575 : .post("/v1/tenant", |r| api_handler(r, tenant_create_handler))
1369 575 : .get("/v1/tenant/:tenant_id", |r| api_handler(r, tenant_status))
1370 575 : .delete("/v1/tenant/:tenant_id", |r| {
1371 132 : api_handler(r, tenant_delete_handler)
1372 575 : })
1373 575 : .get("/v1/tenant/:tenant_id/synthetic_size", |r| {
1374 53 : api_handler(r, tenant_size_handler)
1375 575 : })
1376 575 : .put("/v1/tenant/config", |r| {
1377 27 : api_handler(r, update_tenant_config_handler)
1378 575 : })
1379 575 : .get("/v1/tenant/:tenant_id/config", |r| {
1380 40 : api_handler(r, get_tenant_config_handler)
1381 575 : })
1382 575 : .get("/v1/tenant/:tenant_id/timeline", |r| {
1383 51 : api_handler(r, timeline_list_handler)
1384 575 : })
1385 907 : .post("/v1/tenant/:tenant_id/timeline", |r| {
1386 907 : api_handler(r, timeline_create_handler)
1387 907 : })
1388 575 : .post("/v1/tenant/:tenant_id/attach", |r| {
1389 51 : api_handler(r, tenant_attach_handler)
1390 575 : })
1391 575 : .post("/v1/tenant/:tenant_id/detach", |r| {
1392 40 : api_handler(r, tenant_detach_handler)
1393 575 : })
1394 575 : .post("/v1/tenant/:tenant_id/load", |r| {
1395 7 : api_handler(r, tenant_load_handler)
1396 575 : })
1397 575 : .post("/v1/tenant/:tenant_id/ignore", |r| {
1398 8 : api_handler(r, tenant_ignore_handler)
1399 575 : })
1400 1440 : .get("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
1401 1440 : api_handler(r, timeline_detail_handler)
1402 1440 : })
1403 575 : .get(
1404 575 : "/v1/tenant/:tenant_id/timeline/:timeline_id/get_lsn_by_timestamp",
1405 575 : |r| api_handler(r, get_lsn_by_timestamp_handler),
1406 575 : )
1407 575 : .put("/v1/tenant/:tenant_id/timeline/:timeline_id/do_gc", |r| {
1408 505 : api_handler(r, timeline_gc_handler)
1409 575 : })
1410 575 : .put("/v1/tenant/:tenant_id/timeline/:timeline_id/compact", |r| {
1411 123 : testing_api_handler("run timeline compaction", r, timeline_compact_handler)
1412 575 : })
1413 575 : .put(
1414 575 : "/v1/tenant/:tenant_id/timeline/:timeline_id/checkpoint",
1415 694 : |r| testing_api_handler("run timeline checkpoint", r, timeline_checkpoint_handler),
1416 575 : )
1417 575 : .post(
1418 575 : "/v1/tenant/:tenant_id/timeline/:timeline_id/download_remote_layers",
1419 575 : |r| api_handler(r, timeline_download_remote_layers_handler_post),
1420 575 : )
1421 575 : .get(
1422 575 : "/v1/tenant/:tenant_id/timeline/:timeline_id/download_remote_layers",
1423 575 : |r| api_handler(r, timeline_download_remote_layers_handler_get),
1424 575 : )
1425 575 : .delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
1426 121 : api_handler(r, timeline_delete_handler)
1427 575 : })
1428 575 : .get("/v1/tenant/:tenant_id/timeline/:timeline_id/layer", |r| {
1429 101 : api_handler(r, layer_map_info_handler)
1430 575 : })
1431 575 : .get(
1432 575 : "/v1/tenant/:tenant_id/timeline/:timeline_id/layer/:layer_file_name",
1433 575 : |r| api_handler(r, layer_download_handler),
1434 575 : )
1435 575 : .delete(
1436 575 : "/v1/tenant/:tenant_id/timeline/:timeline_id/layer/:layer_file_name",
1437 575 : |r| api_handler(r, evict_timeline_layer_handler),
1438 575 : )
1439 575 : .put("/v1/disk_usage_eviction/run", |r| {
1440 5 : api_handler(r, disk_usage_eviction_run)
1441 575 : })
1442 575 : .put("/v1/tenant/:tenant_id/break", |r| {
1443 2 : testing_api_handler("set tenant state to broken", r, handle_tenant_break)
1444 575 : })
1445 575 : .get("/v1/panic", |r| api_handler(r, always_panic_handler))
1446 575 : .post("/v1/tracing/event", |r| {
1447 5 : testing_api_handler("emit a tracing event", r, post_tracing_event_handler)
1448 575 : })
1449 575 : .any(handler_404))
1450 575 : }
|