Line data Source code
1 : //!
2 : //! Management HTTP API
3 : //!
4 : use std::collections::HashMap;
5 : use std::str::FromStr;
6 : use std::sync::Arc;
7 : use std::time::Duration;
8 :
9 : use anyhow::{anyhow, Context, Result};
10 : use enumset::EnumSet;
11 : use futures::TryFutureExt;
12 : use humantime::format_rfc3339;
13 : use hyper::header;
14 : use hyper::StatusCode;
15 : use hyper::{Body, Request, Response, Uri};
16 : use metrics::launch_timestamp::LaunchTimestamp;
17 : use pageserver_api::models::LocationConfigListResponse;
18 : use pageserver_api::models::ShardParameters;
19 : use pageserver_api::models::TenantDetails;
20 : use pageserver_api::models::TenantLocationConfigResponse;
21 : use pageserver_api::models::TenantShardLocation;
22 : use pageserver_api::models::TenantShardSplitRequest;
23 : use pageserver_api::models::TenantShardSplitResponse;
24 : use pageserver_api::models::TenantState;
25 : use pageserver_api::models::{
26 : DownloadRemoteLayersTaskSpawnRequest, LocationConfigMode, TenantAttachRequest,
27 : TenantLoadRequest, TenantLocationConfigRequest,
28 : };
29 : use pageserver_api::shard::ShardCount;
30 : use pageserver_api::shard::TenantShardId;
31 : use remote_storage::GenericRemoteStorage;
32 : use remote_storage::TimeTravelError;
33 : use tenant_size_model::{SizeResult, StorageModel};
34 : use tokio_util::sync::CancellationToken;
35 : use tracing::*;
36 : use utils::auth::JwtAuth;
37 : use utils::failpoint_support::failpoints_handler;
38 : use utils::http::endpoint::request_span;
39 : use utils::http::json::json_request_or_empty_body;
40 : use utils::http::request::{get_request_param, must_get_query_param, parse_query_param};
41 :
42 : use crate::context::{DownloadBehavior, RequestContext};
43 : use crate::deletion_queue::DeletionQueueClient;
44 : use crate::metrics::{StorageTimeOperation, STORAGE_TIME_GLOBAL};
45 : use crate::pgdatadir_mapping::LsnForTimestamp;
46 : use crate::task_mgr::TaskKind;
47 : use crate::tenant::config::{LocationConf, TenantConfOpt};
48 : use crate::tenant::mgr::GetActiveTenantError;
49 : use crate::tenant::mgr::{
50 : GetTenantError, SetNewTenantConfigError, TenantManager, TenantMapError, TenantMapInsertError,
51 : TenantSlotError, TenantSlotUpsertError, TenantStateError,
52 : };
53 : use crate::tenant::mgr::{TenantSlot, UpsertLocationError};
54 : use crate::tenant::remote_timeline_client;
55 : use crate::tenant::secondary::SecondaryController;
56 : use crate::tenant::size::ModelInputs;
57 : use crate::tenant::storage_layer::LayerAccessStatsReset;
58 : use crate::tenant::timeline::CompactFlags;
59 : use crate::tenant::timeline::Timeline;
60 : use crate::tenant::SpawnMode;
61 : use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError};
62 : use crate::{config::PageServerConf, tenant::mgr};
63 : use crate::{disk_usage_eviction_task, tenant};
64 : use pageserver_api::models::{
65 : StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TenantInfo,
66 : TimelineCreateRequest, TimelineGcRequest, TimelineInfo,
67 : };
68 : use utils::{
69 : auth::SwappableJwtAuth,
70 : generation::Generation,
71 : http::{
72 : endpoint::{self, attach_openapi_ui, auth_middleware, check_permission_with},
73 : error::{ApiError, HttpErrorBody},
74 : json::{json_request, json_response},
75 : request::parse_request_param,
76 : RequestExt, RouterBuilder,
77 : },
78 : id::{TenantId, TimelineId},
79 : lsn::Lsn,
80 : };
81 :
82 : // For APIs that require an Active tenant, how long should we block waiting for that state?
83 : // This is not functionally necessary (clients will retry), but avoids generating a lot of
84 : // failed API calls while tenants are activating.
85 : #[cfg(not(feature = "testing"))]
86 : const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(5000);
87 :
88 : // Tests run on slow/oversubscribed nodes, and may need to wait much longer for tenants to
89 : // finish attaching, if calls to remote storage are slow.
90 : #[cfg(feature = "testing")]
91 : const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
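A minimal sketch (not part of the original file) of how the handlers below consume this constant: resolve the attached shard, then block for at most ACTIVE_TENANT_TIMEOUT until it becomes Active. The `?` conversions rely on the `From<GetTenantError>` and `From<GetActiveTenantError>` impls defined later in this file; the return type is simplified here.

    async fn example_wait_for_active(
        state: &State,
        tenant_shard_id: TenantShardId,
    ) -> Result<(), ApiError> {
        // Fail fast if this pageserver does not hold the shard at all.
        let tenant = state
            .tenant_manager
            .get_attached_tenant_shard(tenant_shard_id, false)?;
        // Bounded wait: on timeout the client receives a retryable error
        // instead of the server blocking indefinitely during activation.
        tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
        Ok(())
    }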
92 :
93 : pub struct State {
94 : conf: &'static PageServerConf,
95 : tenant_manager: Arc<TenantManager>,
96 : auth: Option<Arc<SwappableJwtAuth>>,
97 : allowlist_routes: Vec<Uri>,
98 : remote_storage: Option<GenericRemoteStorage>,
99 : broker_client: storage_broker::BrokerClientChannel,
100 : disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
101 : deletion_queue_client: DeletionQueueClient,
102 : secondary_controller: SecondaryController,
103 : }
104 :
105 : impl State {
106 : #[allow(clippy::too_many_arguments)]
107 625 : pub fn new(
108 625 : conf: &'static PageServerConf,
109 625 : tenant_manager: Arc<TenantManager>,
110 625 : auth: Option<Arc<SwappableJwtAuth>>,
111 625 : remote_storage: Option<GenericRemoteStorage>,
112 625 : broker_client: storage_broker::BrokerClientChannel,
113 625 : disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
114 625 : deletion_queue_client: DeletionQueueClient,
115 625 : secondary_controller: SecondaryController,
116 625 : ) -> anyhow::Result<Self> {
117 625 : let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml", "/metrics"]
118 625 : .iter()
119 2500 : .map(|v| v.parse().unwrap())
120 625 : .collect::<Vec<_>>();
121 625 : Ok(Self {
122 625 : conf,
123 625 : tenant_manager,
124 625 : auth,
125 625 : allowlist_routes,
126 625 : remote_storage,
127 625 : broker_client,
128 625 : disk_usage_eviction_state,
129 625 : deletion_queue_client,
130 625 : secondary_controller,
131 625 : })
132 625 : }
133 : }
134 :
135 : #[inline(always)]
136 2836 : fn get_state(request: &Request<Body>) -> &State {
137 2836 : request
138 2836 : .data::<Arc<State>>()
139 2836 : .expect("unknown state type")
140 2836 : .as_ref()
141 2836 : }
142 :
143 : #[inline(always)]
144 637 : fn get_config(request: &Request<Body>) -> &'static PageServerConf {
145 637 : get_state(request).conf
146 637 : }
147 :
148 : /// Check that the requester is authorized to operate on given tenant
149 9284 : fn check_permission(request: &Request<Body>, tenant_id: Option<TenantId>) -> Result<(), ApiError> {
150 9284 : check_permission_with(request, |claims| {
151 90 : crate::auth::check_permission(claims, tenant_id)
152 9284 : })
153 9284 : }
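For orientation: almost every handler below opens with the same two-line pattern, parsing the shard id from the URL and then scoping the permission check to that tenant (taken verbatim from the handlers in this file):

    let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
    check_permission(&request, Some(tenant_shard_id.tenant_id))?;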
154 :
155 : impl From<PageReconstructError> for ApiError {
156 2 : fn from(pre: PageReconstructError) -> ApiError {
157 2 : match pre {
158 2 : PageReconstructError::Other(pre) => ApiError::InternalServerError(pre),
159 : PageReconstructError::Cancelled => {
160 0 : ApiError::InternalServerError(anyhow::anyhow!("request was cancelled"))
161 : }
162 : PageReconstructError::AncestorStopping(_) => {
163 0 : ApiError::ResourceUnavailable(format!("{pre}").into())
164 : }
165 0 : PageReconstructError::AncestorLsnTimeout(e) => ApiError::Timeout(format!("{e}").into()),
166 0 : PageReconstructError::WalRedo(pre) => ApiError::InternalServerError(pre),
167 : }
168 2 : }
169 : }
170 :
171 : impl From<TenantMapInsertError> for ApiError {
172 1 : fn from(tmie: TenantMapInsertError) -> ApiError {
173 1 : match tmie {
174 1 : TenantMapInsertError::SlotError(e) => e.into(),
175 0 : TenantMapInsertError::SlotUpsertError(e) => e.into(),
176 0 : TenantMapInsertError::Other(e) => ApiError::InternalServerError(e),
177 : }
178 1 : }
179 : }
180 :
181 : impl From<TenantSlotError> for ApiError {
182 20 : fn from(e: TenantSlotError) -> ApiError {
183 20 : use TenantSlotError::*;
184 20 : match e {
185 17 : NotFound(tenant_id) => {
186 17 : ApiError::NotFound(anyhow::anyhow!("NotFound: tenant {tenant_id}").into())
187 : }
188 1 : e @ AlreadyExists(_, _) => ApiError::Conflict(format!("{e}")),
189 : InProgress => {
190 2 : ApiError::ResourceUnavailable("Tenant is being modified concurrently".into())
191 : }
192 0 : MapState(e) => e.into(),
193 : }
194 20 : }
195 : }
196 :
197 : impl From<TenantSlotUpsertError> for ApiError {
198 0 : fn from(e: TenantSlotUpsertError) -> ApiError {
199 0 : use TenantSlotUpsertError::*;
200 0 : match e {
201 0 : InternalError(e) => ApiError::InternalServerError(anyhow::anyhow!("{e}")),
202 0 : MapState(e) => e.into(),
203 0 : ShuttingDown(_) => ApiError::ShuttingDown,
204 : }
205 0 : }
206 : }
207 :
208 : impl From<UpsertLocationError> for ApiError {
209 74 : fn from(e: UpsertLocationError) -> ApiError {
210 74 : use UpsertLocationError::*;
211 74 : match e {
212 37 : BadRequest(e) => ApiError::BadRequest(e),
213 18 : Unavailable(_) => ApiError::ShuttingDown,
214 18 : e @ InProgress => ApiError::Conflict(format!("{e}")),
215 1 : Flush(e) | Other(e) => ApiError::InternalServerError(e),
216 : }
217 74 : }
218 : }
219 :
220 : impl From<TenantMapError> for ApiError {
221 0 : fn from(e: TenantMapError) -> ApiError {
222 0 : use TenantMapError::*;
223 0 : match e {
224 : StillInitializing | ShuttingDown => {
225 0 : ApiError::ResourceUnavailable(format!("{e}").into())
226 0 : }
227 0 : }
228 0 : }
229 : }
230 :
231 : impl From<TenantStateError> for ApiError {
232 3 : fn from(tse: TenantStateError) -> ApiError {
233 3 : match tse {
234 : TenantStateError::IsStopping(_) => {
235 0 : ApiError::ResourceUnavailable("Tenant is stopping".into())
236 : }
237 3 : TenantStateError::SlotError(e) => e.into(),
238 0 : TenantStateError::SlotUpsertError(e) => e.into(),
239 0 : TenantStateError::Other(e) => ApiError::InternalServerError(anyhow!(e)),
240 : }
241 3 : }
242 : }
243 :
244 : impl From<GetTenantError> for ApiError {
245 84 : fn from(tse: GetTenantError) -> ApiError {
246 84 : match tse {
247 69 : GetTenantError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid).into()),
248 0 : GetTenantError::Broken(reason) => {
249 0 : ApiError::InternalServerError(anyhow!("tenant is broken: {}", reason))
250 : }
251 : GetTenantError::NotActive(_) => {
252 : // Why is this not `ApiError::NotFound`?
253 : // Because we must be careful to never return 404 for a tenant if it does
254 : // in fact exist locally. If we did, the caller could draw the conclusion
255 : // that it can attach the tenant to another PS and we'd be in split-brain.
256 : //
257 : // (We can produce this variant only in `mgr::get_tenant(..., active=true)` calls).
258 15 : ApiError::ResourceUnavailable("Tenant not yet active".into())
259 : }
260 0 : GetTenantError::MapState(e) => ApiError::ResourceUnavailable(format!("{e}").into()),
261 : }
262 84 : }
263 : }
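To make the 404-versus-503 contract above concrete, here is a hedged sketch of caller-side handling (hypothetical client code, not from this repository; it assumes `ApiError::ResourceUnavailable` renders as 503):

    if response_status == StatusCode::NOT_FOUND {
        // The pageserver affirmatively does not have this tenant:
        // it is safe to consider attaching it to another pageserver.
    } else if response_status == StatusCode::SERVICE_UNAVAILABLE {
        // The tenant exists here but is not Active yet: retry later.
        // Treating this as "not found" could cause a double attachment,
        // i.e. the split-brain scenario described above.
    }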
264 :
265 : impl From<GetActiveTenantError> for ApiError {
266 9 : fn from(e: GetActiveTenantError) -> ApiError {
267 9 : match e {
268 0 : GetActiveTenantError::WillNotBecomeActive(_) => ApiError::Conflict(format!("{}", e)),
269 9 : GetActiveTenantError::Cancelled => ApiError::ShuttingDown,
270 0 : GetActiveTenantError::NotFound(gte) => gte.into(),
271 : GetActiveTenantError::WaitForActiveTimeout { .. } => {
272 0 : ApiError::ResourceUnavailable(format!("{}", e).into())
273 : }
274 : }
275 9 : }
276 : }
277 :
278 : impl From<SetNewTenantConfigError> for ApiError {
279 0 : fn from(e: SetNewTenantConfigError) -> ApiError {
280 0 : match e {
281 0 : SetNewTenantConfigError::GetTenant(tid) => {
282 0 : ApiError::NotFound(anyhow!("tenant {}", tid).into())
283 : }
284 0 : e @ (SetNewTenantConfigError::Persist(_) | SetNewTenantConfigError::Other(_)) => {
285 0 : ApiError::InternalServerError(anyhow::Error::new(e))
286 : }
287 : }
288 0 : }
289 : }
290 :
291 : impl From<crate::tenant::DeleteTimelineError> for ApiError {
292 11 : fn from(value: crate::tenant::DeleteTimelineError) -> Self {
293 11 : use crate::tenant::DeleteTimelineError::*;
294 11 : match value {
295 3 : NotFound => ApiError::NotFound(anyhow::anyhow!("timeline not found").into()),
296 1 : HasChildren(children) => ApiError::PreconditionFailed(
297 1 : format!("Cannot delete timeline which has child timelines: {children:?}")
298 1 : .into_boxed_str(),
299 1 : ),
300 3 : a @ AlreadyInProgress(_) => ApiError::Conflict(a.to_string()),
301 4 : Other(e) => ApiError::InternalServerError(e),
302 : }
303 11 : }
304 : }
305 :
306 : impl From<crate::tenant::mgr::DeleteTimelineError> for ApiError {
307 0 : fn from(value: crate::tenant::mgr::DeleteTimelineError) -> Self {
308 : use crate::tenant::mgr::DeleteTimelineError::*;
309 0 : match value {
310 : // Report Precondition Failed so the client can distinguish the
311 : // "tenant is missing" case from the "timeline is missing" one.
312 0 : Tenant(GetTenantError::NotFound(..)) => ApiError::PreconditionFailed(
313 0 : "Requested tenant is missing".to_owned().into_boxed_str(),
314 0 : ),
315 0 : Tenant(t) => ApiError::from(t),
316 0 : Timeline(t) => ApiError::from(t),
317 : }
318 0 : }
319 : }
320 :
321 : impl From<crate::tenant::delete::DeleteTenantError> for ApiError {
322 34 : fn from(value: crate::tenant::delete::DeleteTenantError) -> Self {
323 34 : use crate::tenant::delete::DeleteTenantError::*;
324 34 : match value {
325 0 : Get(g) => ApiError::from(g),
326 0 : e @ AlreadyInProgress => ApiError::Conflict(e.to_string()),
327 0 : Timeline(t) => ApiError::from(t),
328 0 : NotAttached => ApiError::NotFound(anyhow::anyhow!("Tenant is not attached").into()),
329 16 : SlotError(e) => e.into(),
330 0 : SlotUpsertError(e) => e.into(),
331 16 : Other(o) => ApiError::InternalServerError(o),
332 2 : e @ InvalidState(_) => ApiError::PreconditionFailed(e.to_string().into_boxed_str()),
333 0 : Cancelled => ApiError::ShuttingDown,
334 : }
335 34 : }
336 : }
337 :
338 : // Helper function to construct a TimelineInfo struct for a timeline
339 2320 : async fn build_timeline_info(
340 2320 : timeline: &Arc<Timeline>,
341 2320 : include_non_incremental_logical_size: bool,
342 2320 : force_await_initial_logical_size: bool,
343 2320 : ctx: &RequestContext,
344 2320 : ) -> anyhow::Result<TimelineInfo> {
345 2320 : crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
346 2320 :
347 2320 : if force_await_initial_logical_size {
348 11 : timeline.clone().await_initial_logical_size().await
349 2309 : }
350 :
351 2320 : let mut info = build_timeline_info_common(
352 2320 : timeline,
353 2320 : ctx,
354 2320 : tenant::timeline::GetLogicalSizePriority::Background,
355 2320 : )
356 16 : .await?;
357 2320 : if include_non_incremental_logical_size {
358 : // XXX we should be using spawn_ondemand_logical_size_calculation here.
359 : // Otherwise, if someone deletes the timeline / detaches the tenant while
360 : // we're executing this function, we will outlive the timeline on-disk state.
361 : info.current_logical_size_non_incremental = Some(
362 37 : timeline
363 37 : .get_current_logical_size_non_incremental(info.last_record_lsn, ctx)
364 2425 : .await?,
365 : );
366 2283 : }
367 2320 : Ok(info)
368 2320 : }
369 :
370 3185 : async fn build_timeline_info_common(
371 3185 : timeline: &Arc<Timeline>,
372 3185 : ctx: &RequestContext,
373 3185 : logical_size_task_priority: tenant::timeline::GetLogicalSizePriority,
374 3185 : ) -> anyhow::Result<TimelineInfo> {
375 3185 : crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
376 3185 : let initdb_lsn = timeline.initdb_lsn;
377 3185 : let last_record_lsn = timeline.get_last_record_lsn();
378 3185 : let (wal_source_connstr, last_received_msg_lsn, last_received_msg_ts) = {
379 3185 : let guard = timeline.last_received_wal.lock().unwrap();
380 3185 : if let Some(info) = guard.as_ref() {
381 1952 : (
382 1952 : Some(format!("{:?}", info.wal_source_connconf)), // Debug formatting hides the password; this string is informational only.
383 1952 : Some(info.last_received_msg_lsn),
384 1952 : Some(info.last_received_msg_ts),
385 1952 : )
386 : } else {
387 1233 : (None, None, None)
388 : }
389 : };
390 :
391 3185 : let ancestor_timeline_id = timeline.get_ancestor_timeline_id();
392 3185 : let ancestor_lsn = match timeline.get_ancestor_lsn() {
393 2436 : Lsn(0) => None,
394 749 : lsn @ Lsn(_) => Some(lsn),
395 : };
396 3185 : let current_logical_size = timeline.get_current_logical_size(logical_size_task_priority, ctx);
397 3185 : let current_physical_size = Some(timeline.layer_size_sum().await);
398 3185 : let state = timeline.current_state();
399 3185 : let remote_consistent_lsn_projected = timeline
400 3185 : .get_remote_consistent_lsn_projected()
401 3185 : .unwrap_or(Lsn(0));
402 3185 : let remote_consistent_lsn_visible = timeline
403 3185 : .get_remote_consistent_lsn_visible()
404 3185 : .unwrap_or(Lsn(0));
405 3185 :
406 3185 : let walreceiver_status = timeline.walreceiver_status();
407 :
408 3185 : let info = TimelineInfo {
409 3185 : tenant_id: timeline.tenant_shard_id,
410 3185 : timeline_id: timeline.timeline_id,
411 3185 : ancestor_timeline_id,
412 3185 : ancestor_lsn,
413 3185 : disk_consistent_lsn: timeline.get_disk_consistent_lsn(),
414 3185 : remote_consistent_lsn: remote_consistent_lsn_projected,
415 3185 : remote_consistent_lsn_visible,
416 3185 : initdb_lsn,
417 3185 : last_record_lsn,
418 3185 : prev_record_lsn: Some(timeline.get_prev_record_lsn()),
419 3185 : latest_gc_cutoff_lsn: *timeline.get_latest_gc_cutoff_lsn(),
420 3185 : current_logical_size: current_logical_size.size_dont_care_about_accuracy(),
421 3185 : current_logical_size_is_accurate: match current_logical_size.accuracy() {
422 620 : tenant::timeline::logical_size::Accuracy::Approximate => false,
423 2565 : tenant::timeline::logical_size::Accuracy::Exact => true,
424 : },
425 3185 : directory_entries_counts: timeline.get_directory_metrics().to_vec(),
426 3185 : current_physical_size,
427 3185 : current_logical_size_non_incremental: None,
428 3185 : timeline_dir_layer_file_size_sum: None,
429 3185 : wal_source_connstr,
430 3185 : last_received_msg_lsn,
431 3185 : last_received_msg_ts,
432 3185 : pg_version: timeline.pg_version,
433 3185 :
434 3185 : state,
435 3185 :
436 3185 : walreceiver_status,
437 3185 : };
438 3185 : Ok(info)
439 3185 : }
440 :
441 : // healthcheck handler
442 631 : async fn status_handler(
443 631 : request: Request<Body>,
444 631 : _cancel: CancellationToken,
445 631 : ) -> Result<Response<Body>, ApiError> {
446 631 : check_permission(&request, None)?;
447 631 : let config = get_config(&request);
448 631 : json_response(StatusCode::OK, StatusResponse { id: config.id })
449 631 : }
450 :
451 6 : async fn reload_auth_validation_keys_handler(
452 6 : request: Request<Body>,
453 6 : _cancel: CancellationToken,
454 6 : ) -> Result<Response<Body>, ApiError> {
455 6 : check_permission(&request, None)?;
456 6 : let config = get_config(&request);
457 6 : let state = get_state(&request);
458 6 : let Some(shared_auth) = &state.auth else {
459 0 : return json_response(StatusCode::BAD_REQUEST, ());
460 : };
461 : // unwrap is ok because check is performed when creating config, so path is set and exists
462 6 : let key_path = config.auth_validation_public_key_path.as_ref().unwrap();
463 6 : info!("Reloading public key(s) for verifying JWT tokens from {key_path:?}");
464 :
465 6 : match JwtAuth::from_key_path(key_path) {
466 6 : Ok(new_auth) => {
467 6 : shared_auth.swap(new_auth);
468 6 : json_response(StatusCode::OK, ())
469 : }
470 0 : Err(e) => {
471 0 : warn!("Error reloading public keys from {key_path:?}: {e:}");
472 0 : json_response(StatusCode::INTERNAL_SERVER_ERROR, ())
473 : }
474 : }
475 6 : }
476 :
477 897 : async fn timeline_create_handler(
478 897 : mut request: Request<Body>,
479 897 : _cancel: CancellationToken,
480 897 : ) -> Result<Response<Body>, ApiError> {
481 897 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
482 897 : let request_data: TimelineCreateRequest = json_request(&mut request).await?;
483 897 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
484 :
485 896 : let new_timeline_id = request_data.new_timeline_id;
486 896 :
487 896 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Error);
488 896 :
489 896 : let state = get_state(&request);
490 :
491 896 : async {
492 896 : let tenant = state
493 896 : .tenant_manager
494 896 : .get_attached_tenant_shard(tenant_shard_id, false)?;
495 :
496 892 : tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
497 :
498 892 : if let Some(ancestor_id) = request_data.ancestor_timeline_id.as_ref() {
499 268 : tracing::info!(%ancestor_id, "starting to branch");
500 : } else {
501 624 : tracing::info!("bootstrapping");
502 : }
503 :
504 892 : match tenant
505 892 : .create_timeline(
506 892 : new_timeline_id,
507 892 : request_data.ancestor_timeline_id,
508 892 : request_data.ancestor_start_lsn,
509 892 : request_data.pg_version.unwrap_or(crate::DEFAULT_PG_VERSION),
510 892 : request_data.existing_initdb_timeline_id,
511 892 : state.broker_client.clone(),
512 892 : &ctx,
513 892 : )
514 7752708 : .await
515 : {
516 865 : Ok(new_timeline) => {
517 : // Created. Construct a TimelineInfo for it.
518 865 : let timeline_info = build_timeline_info_common(
519 865 : &new_timeline,
520 865 : &ctx,
521 865 : tenant::timeline::GetLogicalSizePriority::User,
522 865 : )
523 0 : .await
524 865 : .map_err(ApiError::InternalServerError)?;
525 865 : json_response(StatusCode::CREATED, timeline_info)
526 : }
527 21 : Err(_) if tenant.cancel.is_cancelled() => {
528 3 : // In case we get some ugly error type during shutdown, cast it into a clean 503.
529 3 : json_response(
530 3 : StatusCode::SERVICE_UNAVAILABLE,
531 3 : HttpErrorBody::from_msg("Tenant shutting down".to_string()),
532 3 : )
533 : }
534 : Err(
535 : tenant::CreateTimelineError::Conflict
536 : | tenant::CreateTimelineError::AlreadyCreating,
537 1 : ) => json_response(StatusCode::CONFLICT, ()),
538 8 : Err(tenant::CreateTimelineError::AncestorLsn(err)) => json_response(
539 8 : StatusCode::NOT_ACCEPTABLE,
540 8 : HttpErrorBody::from_msg(format!("{err:#}")),
541 8 : ),
542 6 : Err(e @ tenant::CreateTimelineError::AncestorNotActive) => json_response(
543 6 : StatusCode::SERVICE_UNAVAILABLE,
544 6 : HttpErrorBody::from_msg(e.to_string()),
545 6 : ),
546 0 : Err(tenant::CreateTimelineError::ShuttingDown) => json_response(
547 0 : StatusCode::SERVICE_UNAVAILABLE,
548 0 : HttpErrorBody::from_msg("tenant shutting down".to_string()),
549 0 : ),
550 3 : Err(tenant::CreateTimelineError::Other(err)) => Err(ApiError::InternalServerError(err)),
551 : }
552 890 : }
553 : .instrument(info_span!("timeline_create",
554 : tenant_id = %tenant_shard_id.tenant_id,
555 896 : shard_id = %tenant_shard_id.shard_slug(),
556 : timeline_id = %new_timeline_id,
557 : lsn=?request_data.ancestor_start_lsn,
558 : pg_version=?request_data.pg_version
559 : ))
560 7752839 : .await
561 891 : }
562 :
563 64 : async fn timeline_list_handler(
564 64 : request: Request<Body>,
565 64 : _cancel: CancellationToken,
566 64 : ) -> Result<Response<Body>, ApiError> {
567 64 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
568 64 : let include_non_incremental_logical_size: Option<bool> =
569 64 : parse_query_param(&request, "include-non-incremental-logical-size")?;
570 64 : let force_await_initial_logical_size: Option<bool> =
571 64 : parse_query_param(&request, "force-await-initial-logical-size")?;
572 64 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
573 :
574 64 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
575 :
576 64 : let response_data = async {
577 64 : let tenant = mgr::get_tenant(tenant_shard_id, true)?;
578 60 : let timelines = tenant.list_timelines();
579 60 :
580 60 : let mut response_data = Vec::with_capacity(timelines.len());
581 162 : for timeline in timelines {
582 102 : let timeline_info = build_timeline_info(
583 102 : &timeline,
584 102 : include_non_incremental_logical_size.unwrap_or(false),
585 102 : force_await_initial_logical_size.unwrap_or(false),
586 102 : &ctx,
587 102 : )
588 102 : .instrument(info_span!("build_timeline_info", timeline_id = %timeline.timeline_id))
589 0 : .await
590 102 : .context("Failed to convert tenant timeline {timeline_id} into the local one: {e:?}")
591 102 : .map_err(ApiError::InternalServerError)?;
592 :
593 102 : response_data.push(timeline_info);
594 : }
595 60 : Ok::<Vec<TimelineInfo>, ApiError>(response_data)
596 64 : }
597 : .instrument(info_span!("timeline_list",
598 : tenant_id = %tenant_shard_id.tenant_id,
599 64 : shard_id = %tenant_shard_id.shard_slug()))
600 4 : .await?;
601 :
602 60 : json_response(StatusCode::OK, response_data)
603 64 : }
604 :
605 2 : async fn timeline_preserve_initdb_handler(
606 2 : request: Request<Body>,
607 2 : _cancel: CancellationToken,
608 2 : ) -> Result<Response<Body>, ApiError> {
609 2 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
610 2 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
611 2 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
612 :
613 : // Part of the process for disaster recovery from safekeeper-stored WAL:
614 : // If we don't recover into a new timeline but want to keep the timeline ID,
615 : // then the initdb archive is deleted. This endpoint copies it to a different
616 : // location where timeline recreation can find it.
617 :
618 2 : async {
619 2 : let tenant = mgr::get_tenant(tenant_shard_id, true)?;
620 :
621 2 : let timeline = tenant
622 2 : .get_timeline(timeline_id, false)
623 2 : .map_err(|e| ApiError::NotFound(e.into()))?;
624 :
625 2 : timeline
626 2 : .preserve_initdb_archive()
627 2 : .await
628 2 : .context("preserving initdb archive")
629 2 : .map_err(ApiError::InternalServerError)?;
630 :
631 2 : Ok::<_, ApiError>(())
632 2 : }
633 : .instrument(info_span!("timeline_preserve_initdb_archive",
634 : tenant_id = %tenant_shard_id.tenant_id,
635 2 : shard_id = %tenant_shard_id.shard_slug(),
636 : %timeline_id))
637 2 : .await?;
638 :
639 2 : json_response(StatusCode::OK, ())
640 2 : }
641 :
642 2280 : async fn timeline_detail_handler(
643 2280 : request: Request<Body>,
644 2280 : _cancel: CancellationToken,
645 2280 : ) -> Result<Response<Body>, ApiError> {
646 2280 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
647 2280 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
648 2280 : let include_non_incremental_logical_size: Option<bool> =
649 2280 : parse_query_param(&request, "include-non-incremental-logical-size")?;
650 2280 : let force_await_initial_logical_size: Option<bool> =
651 2280 : parse_query_param(&request, "force-await-initial-logical-size")?;
652 2280 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
653 :
654 : // Logical size calculation needs downloading.
655 2280 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
656 :
657 2280 : let timeline_info = async {
658 2280 : let tenant = mgr::get_tenant(tenant_shard_id, true)?;
659 :
660 2268 : let timeline = tenant
661 2268 : .get_timeline(timeline_id, false)
662 2268 : .map_err(|e| ApiError::NotFound(e.into()))?;
663 :
664 2218 : let timeline_info = build_timeline_info(
665 2218 : &timeline,
666 2218 : include_non_incremental_logical_size.unwrap_or(false),
667 2218 : force_await_initial_logical_size.unwrap_or(false),
668 2218 : &ctx,
669 2218 : )
670 2447 : .await
671 2218 : .context("get local timeline info")
672 2218 : .map_err(ApiError::InternalServerError)?;
673 :
674 2218 : Ok::<_, ApiError>(timeline_info)
675 2280 : }
676 : .instrument(info_span!("timeline_detail",
677 : tenant_id = %tenant_shard_id.tenant_id,
678 2280 : shard_id = %tenant_shard_id.shard_slug(),
679 : %timeline_id))
680 2447 : .await?;
681 :
682 2218 : json_response(StatusCode::OK, timeline_info)
683 2280 : }
684 :
685 13 : async fn get_lsn_by_timestamp_handler(
686 13 : request: Request<Body>,
687 13 : cancel: CancellationToken,
688 13 : ) -> Result<Response<Body>, ApiError> {
689 13 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
690 13 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
691 :
692 13 : if !tenant_shard_id.is_zero() {
693 : // Requires SLRU contents, which are only stored on shard zero
694 0 : return Err(ApiError::BadRequest(anyhow!(
695 0 : "Size calculations are only available on shard zero"
696 0 : )));
697 13 : }
698 :
699 13 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
700 13 : let timestamp_raw = must_get_query_param(&request, "timestamp")?;
701 13 : let timestamp = humantime::parse_rfc3339(&timestamp_raw)
702 13 : .with_context(|| format!("Invalid time: {:?}", timestamp_raw))
703 13 : .map_err(ApiError::BadRequest)?;
704 13 : let timestamp_pg = postgres_ffi::to_pg_timestamp(timestamp);
705 13 :
706 13 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
707 13 : let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?;
708 13 : let result = timeline
709 13 : .find_lsn_for_timestamp(timestamp_pg, &cancel, &ctx)
710 1351 : .await?;
711 13 : #[derive(serde::Serialize, Debug)]
712 : struct Result {
713 : lsn: Lsn,
714 : kind: &'static str,
715 : }
716 13 : let (lsn, kind) = match result {
717 10 : LsnForTimestamp::Present(lsn) => (lsn, "present"),
718 1 : LsnForTimestamp::Future(lsn) => (lsn, "future"),
719 2 : LsnForTimestamp::Past(lsn) => (lsn, "past"),
720 0 : LsnForTimestamp::NoData(lsn) => (lsn, "nodata"),
721 : };
722 13 : let result = Result { lsn, kind };
723 13 : tracing::info!(
724 13 : lsn=?result.lsn,
725 13 : kind=%result.kind,
726 13 : timestamp=%timestamp_raw,
727 13 : "lsn_by_timestamp finished"
728 13 : );
729 13 : json_response(StatusCode::OK, result)
730 13 : }
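For reference, a sketch of the body this handler serializes (values illustrative; this assumes `Lsn` serializes in its usual `hi/lo` hex form): a successful lookup returns something like {"lsn": "0/169C3D8", "kind": "present"}, where `kind` is one of `present`, `future`, `past`, or `nodata` per the match above.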
731 :
732 12 : async fn get_timestamp_of_lsn_handler(
733 12 : request: Request<Body>,
734 12 : _cancel: CancellationToken,
735 12 : ) -> Result<Response<Body>, ApiError> {
736 12 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
737 12 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
738 :
739 12 : if !tenant_shard_id.is_zero() {
740 : // Requires SLRU contents, which are only stored on shard zero
741 0 : return Err(ApiError::BadRequest(anyhow!(
742 0 : "Size calculations are only available on shard zero"
743 0 : )));
744 12 : }
745 :
746 12 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
747 :
748 12 : let lsn_str = must_get_query_param(&request, "lsn")?;
749 12 : let lsn = Lsn::from_str(&lsn_str)
750 12 : .with_context(|| format!("Invalid LSN: {lsn_str:?}"))
751 12 : .map_err(ApiError::BadRequest)?;
752 :
753 12 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
754 12 : let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?;
755 82 : let result = timeline.get_timestamp_for_lsn(lsn, &ctx).await?;
756 :
757 10 : match result {
758 10 : Some(time) => {
759 10 : let time = format_rfc3339(postgres_ffi::from_pg_timestamp(time)).to_string();
760 10 : json_response(StatusCode::OK, time)
761 : }
762 0 : None => json_response(StatusCode::NOT_FOUND, ()),
763 : }
764 12 : }
765 :
766 41 : async fn tenant_attach_handler(
767 41 : mut request: Request<Body>,
768 41 : _cancel: CancellationToken,
769 41 : ) -> Result<Response<Body>, ApiError> {
770 41 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
771 41 : check_permission(&request, Some(tenant_id))?;
772 :
773 41 : let maybe_body: Option<TenantAttachRequest> = json_request_or_empty_body(&mut request).await?;
774 38 : let tenant_conf = match &maybe_body {
775 38 : Some(request) => TenantConfOpt::try_from(&*request.config).map_err(ApiError::BadRequest)?,
776 0 : None => TenantConfOpt::default(),
777 : };
778 :
779 38 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
780 :
781 38 : info!("Handling tenant attach {tenant_id}");
782 :
783 38 : let state = get_state(&request);
784 :
785 38 : let generation = get_request_generation(state, maybe_body.as_ref().and_then(|r| r.generation))?;
786 :
787 38 : if state.remote_storage.is_none() {
788 0 : return Err(ApiError::BadRequest(anyhow!(
789 0 : "attach_tenant is not possible because pageserver was configured without remote storage"
790 0 : )));
791 38 : }
792 38 :
793 38 : let tenant_shard_id = TenantShardId::unsharded(tenant_id);
794 38 : let shard_params = ShardParameters::default();
795 38 : let location_conf = LocationConf::attached_single(tenant_conf, generation, &shard_params);
796 :
797 38 : let tenant = state
798 38 : .tenant_manager
799 38 : .upsert_location(
800 38 : tenant_shard_id,
801 38 : location_conf,
802 38 : None,
803 38 : SpawnMode::Normal,
804 38 : &ctx,
805 38 : )
806 108 : .await?;
807 :
808 38 : let Some(tenant) = tenant else {
809 : // This should never happen: indicates a bug in upsert_location
810 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
811 0 : "Upsert succeeded but didn't return tenant!"
812 0 : )));
813 : };
814 :
815 : // We might have successfully constructed a Tenant, but it could still
816 : // end up in a broken state:
817 : if let TenantState::Broken {
818 6 : reason,
819 : backtrace: _,
820 38 : } = tenant.current_state()
821 : {
822 6 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
823 6 : "Tenant state is Broken: {reason}"
824 6 : )));
825 32 : }
826 32 :
827 32 : json_response(StatusCode::ACCEPTED, ())
828 41 : }
829 :
830 65 : async fn timeline_delete_handler(
831 65 : request: Request<Body>,
832 65 : _cancel: CancellationToken,
833 65 : ) -> Result<Response<Body>, ApiError> {
834 65 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
835 65 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
836 65 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
837 :
838 65 : let state = get_state(&request);
839 :
840 65 : let tenant = state
841 65 : .tenant_manager
842 65 : .get_attached_tenant_shard(tenant_shard_id, false)
843 65 : .map_err(|e| {
844 1 : match e {
845 : // GetTenantError has a built-in conversion to ApiError, but in this context we don't
846 : // want to treat missing tenants as 404, to avoid ambiguity with successful deletions.
847 1 : GetTenantError::NotFound(_) => ApiError::PreconditionFailed(
848 1 : "Requested tenant is missing".to_string().into_boxed_str(),
849 1 : ),
850 0 : e => e.into(),
851 : }
852 65 : })?;
853 64 : tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
854 64 : tenant.delete_timeline(timeline_id).instrument(info_span!("timeline_delete", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id))
855 370 : .await?;
856 :
857 53 : json_response(StatusCode::ACCEPTED, ())
858 65 : }
859 :
860 38 : async fn tenant_detach_handler(
861 38 : request: Request<Body>,
862 38 : _cancel: CancellationToken,
863 38 : ) -> Result<Response<Body>, ApiError> {
864 38 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
865 38 : check_permission(&request, Some(tenant_id))?;
866 38 : let detach_ignored: Option<bool> = parse_query_param(&request, "detach_ignored")?;
867 :
868 : // This is a legacy API (`/location_conf` is the replacement). It only supports unsharded tenants
869 38 : let tenant_shard_id = TenantShardId::unsharded(tenant_id);
870 38 :
871 38 : let state = get_state(&request);
872 38 : let conf = state.conf;
873 38 : mgr::detach_tenant(
874 38 : conf,
875 38 : tenant_shard_id,
876 38 : detach_ignored.unwrap_or(false),
877 38 : &state.deletion_queue_client,
878 38 : )
879 38 : .instrument(info_span!("tenant_detach", %tenant_id, shard_id=%tenant_shard_id.shard_slug()))
880 177 : .await?;
881 :
882 35 : json_response(StatusCode::OK, ())
883 38 : }
884 :
885 2 : async fn tenant_reset_handler(
886 2 : request: Request<Body>,
887 2 : _cancel: CancellationToken,
888 2 : ) -> Result<Response<Body>, ApiError> {
889 2 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
890 2 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
891 :
892 2 : let drop_cache: Option<bool> = parse_query_param(&request, "drop_cache")?;
893 :
894 2 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
895 2 : let state = get_state(&request);
896 2 : state
897 2 : .tenant_manager
898 2 : .reset_tenant(tenant_shard_id, drop_cache.unwrap_or(false), &ctx)
899 6 : .await
900 2 : .map_err(ApiError::InternalServerError)?;
901 :
902 2 : json_response(StatusCode::OK, ())
903 2 : }
904 :
905 4 : async fn tenant_load_handler(
906 4 : mut request: Request<Body>,
907 4 : _cancel: CancellationToken,
908 4 : ) -> Result<Response<Body>, ApiError> {
909 4 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
910 4 : check_permission(&request, Some(tenant_id))?;
911 :
912 4 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
913 :
914 4 : let maybe_body: Option<TenantLoadRequest> = json_request_or_empty_body(&mut request).await?;
915 :
916 4 : let state = get_state(&request);
917 :
918 : // The /load request is only usable when control_plane_api is not set. Once it is set, callers
919 : // should always use /attach instead.
920 4 : let generation = get_request_generation(state, maybe_body.as_ref().and_then(|r| r.generation))?;
921 :
922 4 : mgr::load_tenant(
923 4 : state.conf,
924 4 : tenant_id,
925 4 : generation,
926 4 : state.broker_client.clone(),
927 4 : state.remote_storage.clone(),
928 4 : state.deletion_queue_client.clone(),
929 4 : &ctx,
930 4 : )
931 4 : .instrument(info_span!("load", %tenant_id))
932 6 : .await?;
933 :
934 3 : json_response(StatusCode::ACCEPTED, ())
935 4 : }
936 :
937 5 : async fn tenant_ignore_handler(
938 5 : request: Request<Body>,
939 5 : _cancel: CancellationToken,
940 5 : ) -> Result<Response<Body>, ApiError> {
941 5 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
942 5 : check_permission(&request, Some(tenant_id))?;
943 :
944 5 : let state = get_state(&request);
945 5 : let conf = state.conf;
946 5 : mgr::ignore_tenant(conf, tenant_id)
947 5 : .instrument(info_span!("ignore_tenant", %tenant_id))
948 17 : .await?;
949 :
950 5 : json_response(StatusCode::OK, ())
951 5 : }
952 :
953 178 : async fn tenant_list_handler(
954 178 : request: Request<Body>,
955 178 : _cancel: CancellationToken,
956 178 : ) -> Result<Response<Body>, ApiError> {
957 178 : check_permission(&request, None)?;
958 :
959 178 : let response_data = mgr::list_tenants()
960 178 : .instrument(info_span!("tenant_list"))
961 0 : .await
962 178 : .map_err(|_| {
963 0 : ApiError::ResourceUnavailable("Tenant map is initializing or shutting down".into())
964 178 : })?
965 178 : .iter()
966 178 : .map(|(id, state, gen)| TenantInfo {
967 176 : id: *id,
968 176 : state: state.clone(),
969 176 : current_physical_size: None,
970 176 : attachment_status: state.attachment_status(),
971 176 : generation: (*gen).into(),
972 178 : })
973 178 : .collect::<Vec<TenantInfo>>();
974 178 :
975 178 : json_response(StatusCode::OK, response_data)
976 178 : }
977 :
978 546 : async fn tenant_status(
979 546 : request: Request<Body>,
980 546 : _cancel: CancellationToken,
981 546 : ) -> Result<Response<Body>, ApiError> {
982 546 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
983 546 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
984 :
985 546 : let tenant_info = async {
986 546 : let tenant = mgr::get_tenant(tenant_shard_id, false)?;
987 :
988 : // Calculate total physical size of all timelines
989 482 : let mut current_physical_size = 0;
990 482 : for timeline in tenant.list_timelines().iter() {
991 476 : current_physical_size += timeline.layer_size_sum().await;
992 : }
993 :
994 482 : let state = tenant.current_state();
995 482 : Result::<_, ApiError>::Ok(TenantDetails {
996 482 : tenant_info: TenantInfo {
997 482 : id: tenant_shard_id,
998 482 : state: state.clone(),
999 482 : current_physical_size: Some(current_physical_size),
1000 482 : attachment_status: state.attachment_status(),
1001 482 : generation: tenant.generation().into(),
1002 482 : },
1003 482 : walredo: tenant.wal_redo_manager_status(),
1004 482 : timelines: tenant.list_timeline_ids(),
1005 482 : })
1006 546 : }
1007 : .instrument(info_span!("tenant_status_handler",
1008 : tenant_id = %tenant_shard_id.tenant_id,
1009 546 : shard_id = %tenant_shard_id.shard_slug()))
1010 64 : .await?;
1011 :
1012 482 : json_response(StatusCode::OK, tenant_info)
1013 546 : }
1014 :
1015 121 : async fn tenant_delete_handler(
1016 121 : request: Request<Body>,
1017 121 : _cancel: CancellationToken,
1018 121 : ) -> Result<Response<Body>, ApiError> {
1019 : // TODO openapi spec
1020 121 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1021 121 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1022 :
1023 121 : let state = get_state(&request);
1024 121 :
1025 121 : state
1026 121 : .tenant_manager
1027 121 : .delete_tenant(tenant_shard_id, ACTIVE_TENANT_TIMEOUT)
1028 : .instrument(info_span!("tenant_delete_handler",
1029 : tenant_id = %tenant_shard_id.tenant_id,
1030 121 : shard_id = %tenant_shard_id.shard_slug()
1031 : ))
1032 529 : .await?;
1033 :
1034 87 : json_response(StatusCode::ACCEPTED, ())
1035 121 : }
1036 :
1037 : /// HTTP endpoint to query the current tenant_size of a tenant.
1038 : ///
1039 : /// This is not used by consumption metrics under [`crate::consumption_metrics`], but can be used
1040 : /// to debug any of the calculations. Requires `tenant_id` request parameter, supports
1041 : /// `inputs_only=true|false` (default false) which supports debugging failure to calculate model
1042 : /// values.
1043 : ///
1044 : /// 'retention_period' query parameter overrides the cutoff that is used to calculate the size
1045 : /// (only if it is shorter than the real cutoff).
1046 : ///
1047 : /// Note: we don't update the cached size and prometheus metric here.
1048 : /// The retention period might be different, and it's nice to have a method to just calculate it
1049 : /// without modifying anything anyway.
1050 55 : async fn tenant_size_handler(
1051 55 : request: Request<Body>,
1052 55 : cancel: CancellationToken,
1053 55 : ) -> Result<Response<Body>, ApiError> {
1054 55 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1055 55 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1056 55 : let inputs_only: Option<bool> = parse_query_param(&request, "inputs_only")?;
1057 55 : let retention_period: Option<u64> = parse_query_param(&request, "retention_period")?;
1058 55 : let headers = request.headers();
1059 55 :
1060 55 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
1061 55 : let tenant = mgr::get_tenant(tenant_shard_id, true)?;
1062 :
1063 55 : if !tenant_shard_id.is_zero() {
1064 0 : return Err(ApiError::BadRequest(anyhow!(
1065 0 : "Size calculations are only available on shard zero"
1066 0 : )));
1067 55 : }
1068 :
1069 : // this can be long operation
1070 55 : let inputs = tenant
1071 55 : .gather_size_inputs(
1072 55 : retention_period,
1073 55 : LogicalSizeCalculationCause::TenantSizeHandler,
1074 55 : &cancel,
1075 55 : &ctx,
1076 55 : )
1077 48 : .await
1078 55 : .map_err(ApiError::InternalServerError)?;
1079 :
1080 55 : let mut sizes = None;
1081 55 : let accepts_html = headers
1082 55 : .get(header::ACCEPT)
1083 55 : .map(|v| v == "text/html")
1084 55 : .unwrap_or_default();
1085 55 : if !inputs_only.unwrap_or(false) {
1086 55 : let storage_model = inputs
1087 55 : .calculate_model()
1088 55 : .map_err(ApiError::InternalServerError)?;
1089 55 : let size = storage_model.calculate();
1090 55 :
1091 55 : // If request header expects html, return html
1092 55 : if accepts_html {
1093 19 : return synthetic_size_html_response(inputs, storage_model, size);
1094 36 : }
1095 36 : sizes = Some(size);
1096 0 : } else if accepts_html {
1097 0 : return Err(ApiError::BadRequest(anyhow!(
1098 0 : "inputs_only parameter is incompatible with html output request"
1099 0 : )));
1100 0 : }
1101 :
1102 : /// The type resides in the pageserver not to expose `ModelInputs`.
1103 36 : #[derive(serde::Serialize)]
1104 : struct TenantHistorySize {
1105 : id: TenantId,
1106 : /// Size is a mixture of WAL and logical size, so the unit is bytes.
1107 : ///
1108 : /// Will be none if `?inputs_only=true` was given.
1109 : size: Option<u64>,
1110 : /// Size of each segment used in the model.
1111 : /// Will be null if `?inputs_only=true` was given.
1112 : segment_sizes: Option<Vec<tenant_size_model::SegmentSizeResult>>,
1113 : inputs: crate::tenant::size::ModelInputs,
1114 : }
1115 :
1116 36 : json_response(
1117 36 : StatusCode::OK,
1118 36 : TenantHistorySize {
1119 36 : id: tenant_shard_id.tenant_id,
1120 36 : size: sizes.as_ref().map(|x| x.total_size),
1121 36 : segment_sizes: sizes.map(|x| x.segments),
1122 36 : inputs,
1123 36 : },
1124 36 : )
1125 55 : }
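A sketch of the JSON shape this returns, with field names taken from `TenantHistorySize` above and illustrative values: {"id": "<tenant id>", "size": 123456, "segment_sizes": [...], "inputs": {...}}. With `?inputs_only=true`, `size` and `segment_sizes` are null; with `Accept: text/html`, the handler instead short-circuits into the SVG report built by `synthetic_size_html_response` below.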
1126 :
1127 5 : async fn tenant_shard_split_handler(
1128 5 : mut request: Request<Body>,
1129 5 : _cancel: CancellationToken,
1130 5 : ) -> Result<Response<Body>, ApiError> {
1131 5 : let req: TenantShardSplitRequest = json_request(&mut request).await?;
1132 :
1133 5 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1134 5 : let state = get_state(&request);
1135 5 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
1136 :
1137 5 : let new_shards = state
1138 5 : .tenant_manager
1139 5 : .shard_split(tenant_shard_id, ShardCount(req.new_shard_count), &ctx)
1140 74 : .await
1141 5 : .map_err(ApiError::InternalServerError)?;
1142 :
1143 5 : json_response(StatusCode::OK, TenantShardSplitResponse { new_shards })
1144 5 : }
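A hedged request/response sketch, with field names from the types used here and serialization details assumed: the caller sends a body like {"new_shard_count": 4} and receives {"new_shards": [...]} listing the resulting `TenantShardId`s.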
1145 :
1146 97 : async fn layer_map_info_handler(
1147 97 : request: Request<Body>,
1148 97 : _cancel: CancellationToken,
1149 97 : ) -> Result<Response<Body>, ApiError> {
1150 97 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1151 97 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1152 97 : let reset: LayerAccessStatsReset =
1153 97 : parse_query_param(&request, "reset")?.unwrap_or(LayerAccessStatsReset::NoReset);
1154 97 :
1155 97 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1156 :
1157 97 : let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?;
1158 97 : let layer_map_info = timeline.layer_map_info(reset).await;
1159 :
1160 97 : json_response(StatusCode::OK, layer_map_info)
1161 97 : }
1162 :
1163 2 : async fn layer_download_handler(
1164 2 : request: Request<Body>,
1165 2 : _cancel: CancellationToken,
1166 2 : ) -> Result<Response<Body>, ApiError> {
1167 2 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1168 2 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1169 2 : let layer_file_name = get_request_param(&request, "layer_file_name")?;
1170 2 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1171 :
1172 2 : let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?;
1173 2 : let downloaded = timeline
1174 2 : .download_layer(layer_file_name)
1175 6 : .await
1176 2 : .map_err(ApiError::InternalServerError)?;
1177 :
1178 2 : match downloaded {
1179 2 : Some(true) => json_response(StatusCode::OK, ()),
1180 0 : Some(false) => json_response(StatusCode::NOT_MODIFIED, ()),
1181 0 : None => json_response(
1182 0 : StatusCode::BAD_REQUEST,
1183 0 : format!("Layer {tenant_shard_id}/{timeline_id}/{layer_file_name} not found"),
1184 0 : ),
1185 : }
1186 2 : }
1187 :
1188 2206 : async fn evict_timeline_layer_handler(
1189 2206 : request: Request<Body>,
1190 2206 : _cancel: CancellationToken,
1191 2206 : ) -> Result<Response<Body>, ApiError> {
1192 2206 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1193 2206 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1194 2206 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1195 2206 : let layer_file_name = get_request_param(&request, "layer_file_name")?;
1196 :
1197 2206 : let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?;
1198 2206 : let evicted = timeline
1199 2206 : .evict_layer(layer_file_name)
1200 2161 : .await
1201 2206 : .map_err(ApiError::InternalServerError)?;
1202 :
1203 2206 : match evicted {
1204 2206 : Some(true) => json_response(StatusCode::OK, ()),
1205 0 : Some(false) => json_response(StatusCode::NOT_MODIFIED, ()),
1206 0 : None => json_response(
1207 0 : StatusCode::BAD_REQUEST,
1208 0 : format!("Layer {tenant_shard_id}/{timeline_id}/{layer_file_name} not found"),
1209 0 : ),
1210 : }
1211 2206 : }
1212 :
1213 : /// Get tenant_size SVG graph along with the JSON data.
1214 19 : fn synthetic_size_html_response(
1215 19 : inputs: ModelInputs,
1216 19 : storage_model: StorageModel,
1217 19 : sizes: SizeResult,
1218 19 : ) -> Result<Response<Body>, ApiError> {
1219 19 : let mut timeline_ids: Vec<String> = Vec::new();
1220 19 : let mut timeline_map: HashMap<TimelineId, usize> = HashMap::new();
1221 27 : for (index, ti) in inputs.timeline_inputs.iter().enumerate() {
1222 27 : timeline_map.insert(ti.timeline_id, index);
1223 27 : timeline_ids.push(ti.timeline_id.to_string());
1224 27 : }
1225 19 : let seg_to_branch: Vec<usize> = inputs
1226 19 : .segments
1227 19 : .iter()
1228 68 : .map(|seg| *timeline_map.get(&seg.timeline_id).unwrap())
1229 19 : .collect();
1230 :
1231 19 : let svg =
1232 19 : tenant_size_model::svg::draw_svg(&storage_model, &timeline_ids, &seg_to_branch, &sizes)
1233 19 : .map_err(ApiError::InternalServerError)?;
1234 :
1235 19 : let mut response = String::new();
1236 19 :
1237 19 : use std::fmt::Write;
1238 19 : write!(response, "<html>\n<body>\n").unwrap();
1239 19 : write!(response, "<div>\n{svg}\n</div>").unwrap();
1240 19 : writeln!(response, "Project size: {}", sizes.total_size).unwrap();
1241 19 : writeln!(response, "<pre>").unwrap();
1242 19 : writeln!(
1243 19 : response,
1244 19 : "{}",
1245 19 : serde_json::to_string_pretty(&inputs).unwrap()
1246 19 : )
1247 19 : .unwrap();
1248 19 : writeln!(
1249 19 : response,
1250 19 : "{}",
1251 19 : serde_json::to_string_pretty(&sizes.segments).unwrap()
1252 19 : )
1253 19 : .unwrap();
1254 19 : writeln!(response, "</pre>").unwrap();
1255 19 : write!(response, "</body>\n</html>\n").unwrap();
1256 19 :
1257 19 : html_response(StatusCode::OK, response)
1258 19 : }
1259 :
1260 19 : pub fn html_response(status: StatusCode, data: String) -> Result<Response<Body>, ApiError> {
1261 19 : let response = Response::builder()
1262 19 : .status(status)
1263 19 : .header(header::CONTENT_TYPE, "text/html")
1264 19 : .body(Body::from(data.as_bytes().to_vec()))
1265 19 : .map_err(|e| ApiError::InternalServerError(e.into()))?;
1266 19 : Ok(response)
1267 19 : }
1268 :
1269 : /// Helper for requests that may take a generation, which is mandatory
1270 : /// when control_plane_api is set, but otherwise defaults to Generation::none()
1271 163 : fn get_request_generation(state: &State, req_gen: Option<u32>) -> Result<Generation, ApiError> {
1272 163 : if state.conf.control_plane_api.is_some() {
1273 163 : req_gen
1274 163 : .map(Generation::new)
1275 163 : .ok_or(ApiError::BadRequest(anyhow!(
1276 163 : "generation attribute missing"
1277 163 : )))
1278 : } else {
1279 : // Legacy mode: all tenants operate with no generation
1280 0 : Ok(Generation::none())
1281 : }
1282 163 : }
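A minimal illustration of the two modes, read straight from the function above (assumes `Generation` implements `PartialEq`; `state` construction elided):

    // With control_plane_api set: the generation attribute is mandatory.
    assert_eq!(get_request_generation(state, Some(7))?, Generation::new(7));
    // get_request_generation(state, None) would be a 400 BadRequest here.
    //
    // Without control_plane_api (legacy mode): the argument is ignored and
    // every tenant operates with Generation::none().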
1283 :
1284 122 : async fn tenant_create_handler(
1285 122 : mut request: Request<Body>,
1286 122 : _cancel: CancellationToken,
1287 122 : ) -> Result<Response<Body>, ApiError> {
1288 122 : let request_data: TenantCreateRequest = json_request(&mut request).await?;
1289 122 : let target_tenant_id = request_data.new_tenant_id;
1290 122 : check_permission(&request, None)?;
1291 :
1292 121 : let _timer = STORAGE_TIME_GLOBAL
1293 121 : .get_metric_with_label_values(&[StorageTimeOperation::CreateTenant.into()])
1294 121 : .expect("bug")
1295 121 : .start_timer();
1296 :
1297 121 : let tenant_conf =
1298 121 : TenantConfOpt::try_from(&request_data.config).map_err(ApiError::BadRequest)?;
1299 :
1300 121 : let state = get_state(&request);
1301 :
1302 121 : let generation = get_request_generation(state, request_data.generation)?;
1303 :
1304 121 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
1305 121 :
1306 121 : let location_conf =
1307 121 : LocationConf::attached_single(tenant_conf, generation, &request_data.shard_parameters);
1308 :
1309 121 : let new_tenant = state
1310 121 : .tenant_manager
1311 121 : .upsert_location(
1312 121 : target_tenant_id,
1313 121 : location_conf,
1314 121 : None,
1315 121 : SpawnMode::Create,
1316 121 : &ctx,
1317 121 : )
1318 275 : .await?;
1319 :
1320 48 : let Some(new_tenant) = new_tenant else {
1321 : // This should never happen: indicates a bug in upsert_location
1322 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
1323 0 : "Upsert succeeded but didn't return tenant!"
1324 0 : )));
1325 : };
1326 : // We created the tenant. Existing API semantics are that the tenant
1327 : // is Active when this function returns.
1328 48 : new_tenant
1329 48 : .wait_to_become_active(ACTIVE_TENANT_TIMEOUT)
1330 63 : .await?;
1331 :
1332 39 : json_response(
1333 39 : StatusCode::CREATED,
1334 39 : TenantCreateResponse(new_tenant.tenant_shard_id().tenant_id),
1335 39 : )
1336 122 : }
1337 :
1338 45 : async fn get_tenant_config_handler(
1339 45 : request: Request<Body>,
1340 45 : _cancel: CancellationToken,
1341 45 : ) -> Result<Response<Body>, ApiError> {
1342 45 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1343 45 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1344 :
1345 45 : let tenant = mgr::get_tenant(tenant_shard_id, false)?;
1346 :
1347 45 : let response = HashMap::from([
1348 : (
1349 : "tenant_specific_overrides",
1350 45 : serde_json::to_value(tenant.tenant_specific_overrides())
1351 45 : .context("serializing tenant specific overrides")
1352 45 : .map_err(ApiError::InternalServerError)?,
1353 : ),
1354 : (
1355 45 : "effective_config",
1356 45 : serde_json::to_value(tenant.effective_config())
1357 45 : .context("serializing effective config")
1358 45 : .map_err(ApiError::InternalServerError)?,
1359 : ),
1360 : ]);
1361 :
1362 45 : json_response(StatusCode::OK, response)
1363 45 : }
1364 :
1365 30 : async fn update_tenant_config_handler(
1366 30 : mut request: Request<Body>,
1367 30 : _cancel: CancellationToken,
1368 30 : ) -> Result<Response<Body>, ApiError> {
1369 30 : let request_data: TenantConfigRequest = json_request(&mut request).await?;
1370 30 : let tenant_id = request_data.tenant_id;
1371 30 : check_permission(&request, Some(tenant_id))?;
1372 :
1373 30 : let tenant_conf =
1374 30 : TenantConfOpt::try_from(&request_data.config).map_err(ApiError::BadRequest)?;
1375 :
1376 30 : let state = get_state(&request);
1377 30 : mgr::set_new_tenant_config(state.conf, tenant_conf, tenant_id)
1378 30 : .instrument(info_span!("tenant_config", %tenant_id))
1379 60 : .await?;
1380 :
1381 30 : json_response(StatusCode::OK, ())
1382 30 : }
1383 :
1384 687 : async fn put_tenant_location_config_handler(
1385 687 : mut request: Request<Body>,
1386 687 : _cancel: CancellationToken,
1387 687 : ) -> Result<Response<Body>, ApiError> {
1388 687 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1389 :
1390 687 : let request_data: TenantLocationConfigRequest = json_request(&mut request).await?;
1391 687 : let flush = parse_query_param(&request, "flush_ms")?.map(Duration::from_millis);
1392 687 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1393 :
1394 687 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
1395 687 : let state = get_state(&request);
1396 687 : let conf = state.conf;
1397 687 :
1398 687 : // The `Detached` state is special: rather than upserting a tenant, it removes
1399 687 : // its local disk content and drops it from memory.
1400 687 : if let LocationConfigMode::Detached = request_data.config.mode {
1401 10 : if let Err(e) =
1402 56 : mgr::detach_tenant(conf, tenant_shard_id, true, &state.deletion_queue_client)
1403 : .instrument(info_span!("tenant_detach",
1404 : tenant_id = %tenant_shard_id.tenant_id,
1405 56 : shard_id = %tenant_shard_id.shard_slug()
1406 : ))
1407 219 : .await
1408 : {
1409 10 : match e {
1410 10 : TenantStateError::SlotError(TenantSlotError::NotFound(_)) => {
1411 10 : // This API is idempotent: a NotFound on a detach is fine.
1412 10 : }
1413 0 : _ => return Err(e.into()),
1414 : }
1415 46 : }
1416 56 : return json_response(StatusCode::OK, ());
1417 631 : }
1418 :
1419 631 : let location_conf =
1420 631 : LocationConf::try_from(&request_data.config).map_err(ApiError::BadRequest)?;
1421 :
1422 631 : let attached = state
1423 631 : .tenant_manager
1424 631 : .upsert_location(
1425 631 : tenant_shard_id,
1426 631 : location_conf,
1427 631 : flush,
1428 631 : tenant::SpawnMode::Normal,
1429 631 : &ctx,
1430 631 : )
1431 1954 : .await?
1432 630 : .is_some();
1433 :
1434 630 : if let Some(_flush_ms) = flush {
1435 5 : match state
1436 5 : .secondary_controller
1437 5 : .upload_tenant(tenant_shard_id)
1438 5 : .await
1439 : {
1440 : Ok(()) => {
1441 5 : tracing::info!("Uploaded heatmap during flush");
1442 : }
1443 0 : Err(e) => {
1444 0 : tracing::warn!("Failed to flush heatmap: {e}");
1445 : }
1446 : }
1447 : } else {
1448 625 : tracing::info!("No flush requested when configuring");
1449 : }
1450 :
1451 : // This API returns a vector of pageservers where the tenant is attached: this is
1452 : // primarily for use in the sharding service. For compatibility, we also return this
1453 : // when called directly on a pageserver, but the payload always contains zero or one shards.
1454 630 : let mut response = TenantLocationConfigResponse { shards: Vec::new() };
1455 630 : if attached {
1456 596 : response.shards.push(TenantShardLocation {
1457 596 : shard_id: tenant_shard_id,
1458 596 : node_id: state.conf.id,
1459 596 : })
1460 34 : }
1461 :
1462 630 : json_response(StatusCode::OK, response)
1463 687 : }
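// Illustrative response for the handler above (a sketch based on
// `TenantLocationConfigResponse`; when called directly on a pageserver the
// `shards` vector holds at most one entry):
//
//   { "shards": [ { "shard_id": "<tenant_shard_id>", "node_id": <pageserver id> } ] }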
1464 :
1465 6 : async fn list_location_config_handler(
1466 6 : request: Request<Body>,
1467 6 : _cancel: CancellationToken,
1468 6 : ) -> Result<Response<Body>, ApiError> {
1469 6 : let state = get_state(&request);
1470 6 : let slots = state.tenant_manager.list();
1471 6 : let result = LocationConfigListResponse {
1472 6 : tenant_shards: slots
1473 6 : .into_iter()
1474 6 : .map(|(tenant_shard_id, slot)| {
1475 5 : let v = match slot {
1476 5 : TenantSlot::Attached(t) => Some(t.get_location_conf()),
1477 0 : TenantSlot::Secondary(s) => Some(s.get_location_conf()),
1478 0 : TenantSlot::InProgress(_) => None,
1479 : };
1480 5 : (tenant_shard_id, v)
1481 6 : })
1482 6 : .collect(),
1483 6 : };
1484 6 : json_response(StatusCode::OK, result)
1485 6 : }
1486 :
1487 : // Do a time travel recovery on the given tenant/tenant shard. The tenant must be
1488 : // detached from all pageservers, because the operation invalidates consistency assumptions.
1489 1 : async fn tenant_time_travel_remote_storage_handler(
1490 1 : request: Request<Body>,
1491 1 : cancel: CancellationToken,
1492 1 : ) -> Result<Response<Body>, ApiError> {
1493 1 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1494 :
1495 1 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1496 :
1497 1 : let timestamp_raw = must_get_query_param(&request, "travel_to")?;
1498 1 : let timestamp = humantime::parse_rfc3339(&timestamp_raw)
1499 1 : .with_context(|| format!("Invalid time for travel_to: {timestamp_raw:?}"))
1500 1 : .map_err(ApiError::BadRequest)?;
1501 :
1502 1 : let done_if_after_raw = must_get_query_param(&request, "done_if_after")?;
1503 1 : let done_if_after = humantime::parse_rfc3339(&done_if_after_raw)
1504 1 : .with_context(|| format!("Invalid time for done_if_after: {done_if_after_raw:?}"))
1505 1 : .map_err(ApiError::BadRequest)?;
1506 :
1507 : // This is just a sanity check to fend off naive misuse of the API:
1508 : // the tenant must be detached *everywhere*.
1509 1 : let state = get_state(&request);
1510 1 : let we_manage_tenant = state.tenant_manager.manages_tenant_shard(tenant_shard_id);
1511 1 : if we_manage_tenant {
1512 0 : return Err(ApiError::BadRequest(anyhow!(
1513 0 : "Tenant {tenant_shard_id} is already attached at this pageserver"
1514 0 : )));
1515 1 : }
1516 :
1517 1 : let Some(storage) = state.remote_storage.as_ref() else {
1518 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
1519 0 : "remote storage not configured, cannot run time travel"
1520 0 : )));
1521 : };
1522 :
1523 1 : if timestamp > done_if_after {
1524 0 : return Err(ApiError::BadRequest(anyhow!(
1525 0 : "The done_if_after timestamp comes before the timestamp to recover to"
1526 0 : )));
1527 1 : }
1528 :
1529 1 : tracing::info!("Issuing time travel request internally. timestamp={timestamp_raw}, done_if_after={done_if_after_raw}");
1530 :
1531 1 : remote_timeline_client::upload::time_travel_recover_tenant(
1532 1 : storage,
1533 1 : &tenant_shard_id,
1534 1 : timestamp,
1535 1 : done_if_after,
1536 1 : &cancel,
1537 1 : )
1538 151 : .await
1539 1 : .map_err(|e| match e {
1540 0 : TimeTravelError::BadInput(e) => {
1541 0 : warn!("bad input error: {e}");
1542 0 : ApiError::BadRequest(anyhow!("bad input error"))
1543 : }
1544 : TimeTravelError::Unimplemented => {
1545 0 : ApiError::BadRequest(anyhow!("unimplemented for the configured remote storage"))
1546 : }
1547 0 : TimeTravelError::Cancelled => ApiError::InternalServerError(anyhow!("cancelled")),
1548 : TimeTravelError::TooManyVersions => {
1549 0 : ApiError::InternalServerError(anyhow!("too many versions in remote storage"))
1550 : }
1551 0 : TimeTravelError::Other(e) => {
1552 0 : warn!("internal error: {e}");
1553 0 : ApiError::InternalServerError(anyhow!("internal error"))
1554 : }
1555 1 : })?;
1556 :
1557 1 : json_response(StatusCode::OK, ())
1558 1 : }
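// Illustrative invocation of the handler above (a sketch; both query
// parameters are RFC 3339 timestamps, and `done_if_after` must not precede
// `travel_to`):
//
//   PUT /v1/tenant/:tenant_shard_id/time_travel_remote_storage?travel_to=2024-01-01T00:00:00Z&done_if_after=2024-01-02T00:00:00Z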
1559 :
1560 : /// Testing helper to transition a tenant to [`crate::tenant::TenantState::Broken`].
1561 2 : async fn handle_tenant_break(
1562 2 : r: Request<Body>,
1563 2 : _cancel: CancellationToken,
1564 2 : ) -> Result<Response<Body>, ApiError> {
1565 2 : let tenant_shard_id: TenantShardId = parse_request_param(&r, "tenant_shard_id")?;
1566 :
1567 2 : let tenant = crate::tenant::mgr::get_tenant(tenant_shard_id, true)
1568 2 : .map_err(|_| ApiError::Conflict(String::from("no active tenant found")))?;
1569 :
1570 2 : tenant.set_broken("broken from test".to_owned()).await;
1571 :
1572 2 : json_response(StatusCode::OK, ())
1573 2 : }
1574 :
1575 : // Run GC immediately on the given timeline.
1576 324 : async fn timeline_gc_handler(
1577 324 : mut request: Request<Body>,
1578 324 : cancel: CancellationToken,
1579 324 : ) -> Result<Response<Body>, ApiError> {
1580 324 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1581 324 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1582 324 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1583 :
1584 324 : let gc_req: TimelineGcRequest = json_request(&mut request).await?;
1585 :
1586 324 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
1587 323 : let wait_task_done =
1588 324 : mgr::immediate_gc(tenant_shard_id, timeline_id, gc_req, cancel, &ctx).await?;
1589 323 : let gc_result = wait_task_done
1590 323 : .await
1591 323 : .context("wait for gc task")
1592 323 : .map_err(ApiError::InternalServerError)?
1593 323 : .map_err(ApiError::InternalServerError)?;
1594 :
1595 322 : json_response(StatusCode::OK, gc_result)
1596 324 : }
1597 :
1598 : // Run compaction immediately on the given timeline.
1599 125 : async fn timeline_compact_handler(
1600 125 : request: Request<Body>,
1601 125 : cancel: CancellationToken,
1602 125 : ) -> Result<Response<Body>, ApiError> {
1603 125 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1604 125 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1605 125 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1606 :
1607 125 : let mut flags = EnumSet::empty();
1608 125 : if Some(true) == parse_query_param::<_, bool>(&request, "force_repartition")? {
1609 2 : flags |= CompactFlags::ForceRepartition;
1610 123 : }
1611 125 : async {
1612 125 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
1613 125 : let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?;
1614 125 : timeline
1615 125 : .compact(&cancel, flags, &ctx)
1616 19172 : .await
1617 125 : .map_err(|e| ApiError::InternalServerError(e.into()))?;
1618 125 : json_response(StatusCode::OK, ())
1619 125 : }
1620 125 : .instrument(info_span!("manual_compaction", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
1621 19172 : .await
1622 125 : }
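// Illustrative invocation of the compaction handler above (a sketch; the
// route is registered in make_router below and gated behind the 'testing'
// feature, with `force_repartition` as an optional boolean query flag):
//
//   PUT /v1/tenant/:tenant_shard_id/timeline/:timeline_id/compact?force_repartition=true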
1623 :
1624 : // Run a checkpoint immediately on the given timeline.
1625 614 : async fn timeline_checkpoint_handler(
1626 614 : request: Request<Body>,
1627 614 : cancel: CancellationToken,
1628 614 : ) -> Result<Response<Body>, ApiError> {
1629 614 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1630 614 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1631 614 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1632 :
1633 614 : let mut flags = EnumSet::empty();
1634 614 : if Some(true) == parse_query_param::<_, bool>(&request, "force_repartition")? {
1635 2 : flags |= CompactFlags::ForceRepartition;
1636 612 : }
1637 614 : async {
1638 614 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
1639 614 : let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?;
1640 614 : timeline
1641 614 : .freeze_and_flush()
1642 711 : .await
1643 614 : .map_err(ApiError::InternalServerError)?;
1644 614 : timeline
1645 614 : .compact(&cancel, flags, &ctx)
1646 400956 : .await
1647 613 : .map_err(|e| ApiError::InternalServerError(e.into()))?;
1648 :
1649 613 : json_response(StatusCode::OK, ())
1650 613 : }
1651 614 : .instrument(info_span!("manual_checkpoint", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
1652 401667 : .await
1653 613 : }
1654 :
1655 2 : async fn timeline_download_remote_layers_handler_post(
1656 2 : mut request: Request<Body>,
1657 2 : _cancel: CancellationToken,
1658 2 : ) -> Result<Response<Body>, ApiError> {
1659 2 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1660 2 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1661 2 : let body: DownloadRemoteLayersTaskSpawnRequest = json_request(&mut request).await?;
1662 2 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1663 :
1664 2 : let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?;
1665 2 : match timeline.spawn_download_all_remote_layers(body).await {
1666 2 : Ok(st) => json_response(StatusCode::ACCEPTED, st),
1667 0 : Err(st) => json_response(StatusCode::CONFLICT, st),
1668 : }
1669 2 : }
1670 :
1671 55 : async fn timeline_download_remote_layers_handler_get(
1672 55 : request: Request<Body>,
1673 55 : _cancel: CancellationToken,
1674 55 : ) -> Result<Response<Body>, ApiError> {
1675 55 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1676 55 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1677 55 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1678 :
1679 55 : let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?;
1680 55 : let info = timeline
1681 55 : .get_download_all_remote_layers_task_info()
1682 55 : .context("task never started since last pageserver process start")
1683 55 : .map_err(|e| ApiError::NotFound(e.into()))?;
1684 55 : json_response(StatusCode::OK, info)
1685 55 : }
1686 :
1687 20 : async fn deletion_queue_flush(
1688 20 : r: Request<Body>,
1689 20 : cancel: CancellationToken,
1690 20 : ) -> Result<Response<Body>, ApiError> {
1691 20 : let state = get_state(&r);
1692 20 :
1693 20 : if state.remote_storage.is_none() {
1694 : // Nothing to do if remote storage is disabled.
1695 0 : return json_response(StatusCode::OK, ());
1696 20 : }
1697 :
1698 20 : let execute = parse_query_param(&r, "execute")?.unwrap_or(false);
1699 20 :
1700 20 : let flush = async {
1701 20 : if execute {
1702 36 : state.deletion_queue_client.flush_execute().await
1703 : } else {
1704 8 : state.deletion_queue_client.flush().await
1705 : }
1706 20 : }
1707 : // DeletionQueueError's only case is shutting down.
1708 20 : .map_err(|_| ApiError::ShuttingDown);
1709 20 :
1710 64 : tokio::select! {
1711 20 : res = flush => {
1712 20 : res.map(|()| json_response(StatusCode::OK, ()))?
1713 : }
1714 : _ = cancel.cancelled() => {
1715 : Err(ApiError::ShuttingDown)
1716 : }
1717 : }
1718 20 : }
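// Illustrative invocation of the handler above (a sketch): without the
// `execute` flag the handler awaits `flush()`; with `execute=true` it awaits
// `flush_execute()` instead.
//
//   PUT /v1/deletion_queue/flush?execute=true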
1719 :
1720 : /// Test whether a `GetPage@Lsn` request succeeds; useful for manual debugging.
1721 0 : async fn getpage_at_lsn_handler(
1722 0 : request: Request<Body>,
1723 0 : _cancel: CancellationToken,
1724 0 : ) -> Result<Response<Body>, ApiError> {
1725 0 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1726 0 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1727 0 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1728 :
1729 : struct Key(crate::repository::Key);
1730 :
1731 : impl std::str::FromStr for Key {
1732 : type Err = anyhow::Error;
1733 :
1734 0 : fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
1735 0 : crate::repository::Key::from_hex(s).map(Key)
1736 0 : }
1737 : }
1738 :
1739 0 : let key: Key = parse_query_param(&request, "key")?
1740 0 : .ok_or_else(|| ApiError::BadRequest(anyhow!("missing 'key' query parameter")))?;
1741 0 : let lsn: Lsn = parse_query_param(&request, "lsn")?
1742 0 : .ok_or_else(|| ApiError::BadRequest(anyhow!("missing 'lsn' query parameter")))?;
1743 :
1744 0 : async {
1745 0 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
1746 0 : let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?;
1747 :
1748 0 : let page = timeline.get(key.0, lsn, &ctx).await?;
1749 :
1750 0 : Result::<_, ApiError>::Ok(
1751 0 : Response::builder()
1752 0 : .status(StatusCode::OK)
1753 0 : .header(header::CONTENT_TYPE, "application/octet-stream")
1754 0 : .body(hyper::Body::from(page))
1755 0 : .unwrap(),
1756 0 : )
1757 0 : }
1758 0 : .instrument(info_span!("timeline_get", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
1759 0 : .await
1760 0 : }
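// Illustrative invocation of the debugging handler above (a sketch; `key` is
// a hex key as accepted by `Key::from_hex`, `lsn` is an LSN such as
// `0/169C3C8`, and the page comes back as application/octet-stream):
//
//   GET /v1/tenant/:tenant_shard_id/timeline/:timeline_id/getpage?key=<hex-key>&lsn=0/169C3C8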
1761 :
1762 0 : async fn timeline_collect_keyspace(
1763 0 : request: Request<Body>,
1764 0 : _cancel: CancellationToken,
1765 0 : ) -> Result<Response<Body>, ApiError> {
1766 0 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1767 0 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1768 0 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1769 :
1770 0 : let at_lsn: Option<Lsn> = parse_query_param(&request, "at_lsn")?;
1771 :
1772 0 : async {
1773 0 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
1774 0 : let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?;
1775 0 : let at_lsn = at_lsn.unwrap_or_else(|| timeline.get_last_record_lsn());
1776 0 : let keys = timeline
1777 0 : .collect_keyspace(at_lsn, &ctx)
1778 0 : .await
1779 0 : .map_err(|e| ApiError::InternalServerError(e.into()))?;
1780 :
1781 0 : let res = pageserver_api::models::partitioning::Partitioning { keys, at_lsn };
1782 0 :
1783 0 : json_response(StatusCode::OK, res)
1784 0 : }
1785 0 : .instrument(info_span!("timeline_collect_keyspace", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
1786 0 : .await
1787 0 : }
1788 :
1789 3126 : async fn active_timeline_of_active_tenant(
1790 3126 : tenant_shard_id: TenantShardId,
1791 3126 : timeline_id: TimelineId,
1792 3126 : ) -> Result<Arc<Timeline>, ApiError> {
1793 3126 : let tenant = mgr::get_tenant(tenant_shard_id, true)?;
1794 3126 : tenant
1795 3126 : .get_timeline(timeline_id, true)
1796 3126 : .map_err(|e| ApiError::NotFound(e.into()))
1797 3126 : }
1798 :
1799 0 : async fn always_panic_handler(
1800 0 : req: Request<Body>,
1801 0 : _cancel: CancellationToken,
1802 0 : ) -> Result<Response<Body>, ApiError> {
1803 0 : // Deliberately cause a panic to exercise the panic hook registered via std::panic::set_hook().
1804 0 : // For pageserver, the relevant panic hook is `tracing_panic_hook`, plus the `sentry` crate's wrapper around it.
1805 0 : // Use catch_unwind to ensure that neither tokio nor hyper is distracted by our panic.
1806 0 : let query = req.uri().query();
1807 0 : let _ = std::panic::catch_unwind(|| {
1808 0 : panic!("unconditional panic for testing panic hook integration; request query: {query:?}")
1809 0 : });
1810 0 : json_response(StatusCode::NO_CONTENT, ())
1811 0 : }
1812 :
1813 14 : async fn disk_usage_eviction_run(
1814 14 : mut r: Request<Body>,
1815 14 : cancel: CancellationToken,
1816 14 : ) -> Result<Response<Body>, ApiError> {
1817 14 : check_permission(&r, None)?;
1818 :
1819 66 : #[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)]
1820 : struct Config {
1821 : /// How many bytes to evict before reporting that pressure is relieved.
1822 : evict_bytes: u64,
1823 :
1824 : #[serde(default)]
1825 : eviction_order: crate::disk_usage_eviction_task::EvictionOrder,
1826 : }
1827 :
1828 65 : #[derive(Debug, Clone, Copy, serde::Serialize)]
1829 : struct Usage {
1830 : // remains unchanged after instantiation of the struct
1831 : evict_bytes: u64,
1832 : // updated by `add_available_bytes`
1833 : freed_bytes: u64,
1834 : }
1835 :
1836 : impl crate::disk_usage_eviction_task::Usage for Usage {
1837 269 : fn has_pressure(&self) -> bool {
1838 269 : self.evict_bytes > self.freed_bytes
1839 269 : }
1840 :
1841 488 : fn add_available_bytes(&mut self, bytes: u64) {
1842 488 : self.freed_bytes += bytes;
1843 488 : }
1844 : }
1845 :
1846 14 : let config = json_request::<Config>(&mut r).await?;
1847 :
1848 14 : let usage = Usage {
1849 14 : evict_bytes: config.evict_bytes,
1850 14 : freed_bytes: 0,
1851 14 : };
1852 14 :
1853 14 : let state = get_state(&r);
1854 :
1855 14 : let Some(storage) = state.remote_storage.as_ref() else {
1856 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
1857 0 : "remote storage not configured, cannot run eviction iteration"
1858 0 : )));
1859 : };
1860 :
1861 14 : let eviction_state = state.disk_usage_eviction_state.clone();
1862 :
1863 14 : let res = crate::disk_usage_eviction_task::disk_usage_eviction_task_iteration_impl(
1864 14 : &eviction_state,
1865 14 : storage,
1866 14 : usage,
1867 14 : &state.tenant_manager,
1868 14 : config.eviction_order,
1869 14 : &cancel,
1870 14 : )
1871 305 : .await;
1872 :
1873 14 : info!(?res, "disk_usage_eviction_task_iteration_impl finished");
1874 :
1875 14 : let res = res.map_err(ApiError::InternalServerError)?;
1876 :
1877 14 : json_response(StatusCode::OK, res)
1878 14 : }
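// Illustrative request body for the handler above (a sketch based on the
// local `Config` struct; `eviction_order` may be omitted thanks to
// #[serde(default)]):
//
//   PUT /v1/disk_usage_eviction/run
//   { "evict_bytes": 1073741824 }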
1879 :
1880 7 : async fn secondary_upload_handler(
1881 7 : request: Request<Body>,
1882 7 : _cancel: CancellationToken,
1883 7 : ) -> Result<Response<Body>, ApiError> {
1884 7 : let state = get_state(&request);
1885 7 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1886 7 : state
1887 7 : .secondary_controller
1888 7 : .upload_tenant(tenant_shard_id)
1889 7 : .await
1890 7 : .map_err(ApiError::InternalServerError)?;
1891 :
1892 7 : json_response(StatusCode::OK, ())
1893 7 : }
1894 :
1895 6 : async fn secondary_download_handler(
1896 6 : request: Request<Body>,
1897 6 : _cancel: CancellationToken,
1898 6 : ) -> Result<Response<Body>, ApiError> {
1899 6 : let state = get_state(&request);
1900 6 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1901 6 : state
1902 6 : .secondary_controller
1903 6 : .download_tenant(tenant_shard_id)
1904 6 : .await
1905 6 : .map_err(ApiError::InternalServerError)?;
1906 :
1907 6 : json_response(StatusCode::OK, ())
1908 6 : }
1909 :
1910 0 : async fn handler_404(_: Request<Body>) -> Result<Response<Body>, ApiError> {
1911 0 : json_response(
1912 0 : StatusCode::NOT_FOUND,
1913 0 : HttpErrorBody::from_msg("page not found".to_owned()),
1914 0 : )
1915 0 : }
1916 :
1917 0 : async fn post_tracing_event_handler(
1918 0 : mut r: Request<Body>,
1919 0 : _cancel: CancellationToken,
1920 0 : ) -> Result<Response<Body>, ApiError> {
1921 0 : #[derive(Debug, serde::Deserialize)]
1922 : #[serde(rename_all = "lowercase")]
1923 : enum Level {
1924 : Error,
1925 : Warn,
1926 : Info,
1927 : Debug,
1928 : Trace,
1929 : }
1930 0 : #[derive(Debug, serde::Deserialize)]
1931 : struct Request {
1932 : level: Level,
1933 : message: String,
1934 : }
1935 0 : let body: Request = json_request(&mut r)
1936 0 : .await
1937 0 : .map_err(|_| ApiError::BadRequest(anyhow::anyhow!("invalid JSON body")))?;
1938 :
1939 0 : match body.level {
1940 0 : Level::Error => tracing::error!(?body.message),
1941 0 : Level::Warn => tracing::warn!(?body.message),
1942 0 : Level::Info => tracing::info!(?body.message),
1943 0 : Level::Debug => tracing::debug!(?body.message),
1944 0 : Level::Trace => tracing::trace!(?body.message),
1945 : }
1946 :
1947 0 : json_response(StatusCode::OK, ())
1948 0 : }
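// Illustrative request body for the testing handler above (a sketch; `level`
// must be lowercase per the serde rename):
//
//   POST /v1/tracing/event
//   { "level": "info", "message": "hello from the management API" }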
1949 :
1950 0 : async fn put_io_engine_handler(
1951 0 : mut r: Request<Body>,
1952 0 : _cancel: CancellationToken,
1953 0 : ) -> Result<Response<Body>, ApiError> {
1954 0 : let kind: crate::virtual_file::IoEngineKind = json_request(&mut r).await?;
1955 0 : crate::virtual_file::io_engine::set(kind);
1956 0 : json_response(StatusCode::OK, ())
1957 0 : }
1958 :
1959 : /// Common functionality of all the HTTP API handlers.
1960 : ///
1961 : /// - Adds a tracing span to each request (by `request_span`)
1962 : /// - Logs the request depending on the request method (by `request_span`)
1963 : /// - Logs the response if it was not successful (by `request_span`)
1964 : /// - Shields the handler function from async cancellations. Hyper can drop the handler
1965 : /// Future if the connection to the client is lost, but most of the pageserver code is
1966 : /// not async cancellation safe. This converts the dropped future into a graceful cancellation
1967 : /// request with a CancellationToken.
1968 9509 : async fn api_handler<R, H>(request: Request<Body>, handler: H) -> Result<Response<Body>, ApiError>
1969 9509 : where
1970 9509 : R: std::future::Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
1971 9509 : H: FnOnce(Request<Body>, CancellationToken) -> R + Send + Sync + 'static,
1972 9509 : {
1973 9509 : // Spawn a new task to handle the request, to protect the handler from unexpected
1974 9509 : // async cancellations. Most pageserver functions are not async cancellation safe.
1975 9509 : // We arm a drop-guard, so that if Hyper drops the Future, we signal the task
1976 9509 : // with the cancellation token.
1977 9509 : let token = CancellationToken::new();
1978 9509 : let cancel_guard = token.clone().drop_guard();
1979 9509 : let result = request_span(request, move |r| async {
1980 9509 : let handle = tokio::spawn(
1981 9509 : async {
1982 9509 : let token_cloned = token.clone();
1983 8184561 : let result = handler(r, token).await;
1984 9502 : if token_cloned.is_cancelled() {
1985 : // The drop guard has executed: we will never turn this result into a response.
1986 : //
1987 : // At least temporarily, use {:?} logging; these failures are rare, but
1988 : // could hide difficult errors.
1989 2 : match &result {
1990 2 : Ok(response) => {
1991 2 : let status = response.status();
1992 2 : info!(%status, "Cancelled request finished successfully")
1993 : }
1994 0 : Err(e) => error!("Cancelled request finished with an error: {e:?}"),
1995 : }
1996 9500 : }
1997 : // The only logging for cancelled, panicked request handlers is the tracing_panic_hook,
1998 : // which should suffice.
1999 : //
2000 : // There is still a chance to lose the result in a race between returning
2001 : // from here and the connection closing before the outer task gets to
2002 : // execute; that is left for #5815.
2003 9502 : result
2004 9509 : }
2005 9509 : .in_current_span(),
2006 9509 : );
2007 9509 :
2008 9591 : match handle.await {
2009 : // TODO: never actually return Err from here, always Ok(...) so that we can log
2010 : // spanned errors. Call api_error_handler instead and return an appropriate Body.
2011 9500 : Ok(result) => result,
2012 0 : Err(e) => {
2013 0 : // The handler task panicked. We have a global panic handler that logs the
2014 0 : // panic with its backtrace, so no need to log that here. Only log a brief
2015 0 : // message to make it clear that we returned the error to the client.
2016 0 : error!("HTTP request handler task panicked: {e:#}");
2017 :
2018 : // Don't return an Error here, because then the fallback error handler that was
2019 : // installed in make_router() will print the error. Instead, construct the
2020 : // HTTP error response and return that.
2021 0 : Ok(
2022 0 : ApiError::InternalServerError(anyhow!("HTTP request handler task panicked"))
2023 0 : .into_response(),
2024 0 : )
2025 : }
2026 : }
2027 19009 : })
2028 9591 : .await;
2029 :
2030 9500 : cancel_guard.disarm();
2031 9500 :
2032 9500 : result
2033 9500 : }
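// A minimal, self-contained sketch (not part of the pageserver source; all
// names here are illustrative) of the drop-guard pattern `api_handler` uses
// above: the work runs on a detached task, and if the caller's future is
// dropped mid-flight, the armed guard cancels the token so the task can shut
// down gracefully instead of being torn down mid-await.
async fn shielded_sketch<F, Fut, T>(work: F) -> Result<T, tokio::task::JoinError>
where
    F: FnOnce(tokio_util::sync::CancellationToken) -> Fut,
    Fut: std::future::Future<Output = T> + Send + 'static,
    T: Send + 'static,
{
    let token = tokio_util::sync::CancellationToken::new();
    // Arm the guard: if this future is dropped before we disarm, `token`
    // is cancelled and the spawned task can observe that and wind down.
    let guard = token.clone().drop_guard();
    let result = tokio::spawn(work(token)).await;
    // Reached the end normally: don't signal cancellation.
    guard.disarm();
    result
}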
2034 :
2035 : /// Like api_handler, but returns an error response if the server is built without
2036 : /// the 'testing' feature.
2037 920 : async fn testing_api_handler<R, H>(
2038 920 : desc: &str,
2039 920 : request: Request<Body>,
2040 920 : handler: H,
2041 920 : ) -> Result<Response<Body>, ApiError>
2042 920 : where
2043 920 : R: std::future::Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
2044 920 : H: FnOnce(Request<Body>, CancellationToken) -> R + Send + Sync + 'static,
2045 920 : {
2046 920 : if cfg!(feature = "testing") {
2047 920 : api_handler(request, handler).await
2048 : } else {
2049 0 : std::future::ready(Err(ApiError::BadRequest(anyhow!(
2050 0 : "Cannot {desc} because pageserver was compiled without testing APIs",
2051 0 : ))))
2052 0 : .await
2053 : }
2054 919 : }
2055 :
2056 625 : pub fn make_router(
2057 625 : state: Arc<State>,
2058 625 : launch_ts: &'static LaunchTimestamp,
2059 625 : auth: Option<Arc<SwappableJwtAuth>>,
2060 625 : ) -> anyhow::Result<RouterBuilder<hyper::Body, ApiError>> {
2061 625 : let spec = include_bytes!("openapi_spec.yml");
2062 625 : let mut router = attach_openapi_ui(endpoint::make_router(), spec, "/swagger.yml", "/v1/doc");
2063 625 : if auth.is_some() {
2064 127 : router = router.middleware(auth_middleware(|request| {
2065 127 : let state = get_state(request);
2066 127 : if state.allowlist_routes.contains(request.uri()) {
2067 34 : None
2068 : } else {
2069 93 : state.auth.as_deref()
2070 : }
2071 127 : }))
2072 614 : }
2073 :
2074 625 : router = router.middleware(
2075 625 : endpoint::add_response_header_middleware(
2076 625 : "PAGESERVER_LAUNCH_TIMESTAMP",
2077 625 : &launch_ts.to_string(),
2078 625 : )
2079 625 : .expect("construct launch timestamp header middleware"),
2080 625 : );
2081 625 :
2082 625 : Ok(router
2083 625 : .data(state)
2084 631 : .get("/v1/status", |r| api_handler(r, status_handler))
2085 625 : .put("/v1/failpoints", |r| {
2086 179 : testing_api_handler("manage failpoints", r, failpoints_handler)
2087 625 : })
2088 625 : .post("/v1/reload_auth_validation_keys", |r| {
2089 6 : api_handler(r, reload_auth_validation_keys_handler)
2090 625 : })
2091 625 : .get("/v1/tenant", |r| api_handler(r, tenant_list_handler))
2092 625 : .post("/v1/tenant", |r| api_handler(r, tenant_create_handler))
2093 625 : .get("/v1/tenant/:tenant_shard_id", |r| {
2094 546 : api_handler(r, tenant_status)
2095 625 : })
2096 625 : .delete("/v1/tenant/:tenant_shard_id", |r| {
2097 121 : api_handler(r, tenant_delete_handler)
2098 625 : })
2099 625 : .get("/v1/tenant/:tenant_shard_id/synthetic_size", |r| {
2100 55 : api_handler(r, tenant_size_handler)
2101 625 : })
2102 625 : .put("/v1/tenant/config", |r| {
2103 30 : api_handler(r, update_tenant_config_handler)
2104 625 : })
2105 625 : .put("/v1/tenant/:tenant_shard_id/shard_split", |r| {
2106 5 : api_handler(r, tenant_shard_split_handler)
2107 625 : })
2108 625 : .get("/v1/tenant/:tenant_shard_id/config", |r| {
2109 45 : api_handler(r, get_tenant_config_handler)
2110 625 : })
2111 687 : .put("/v1/tenant/:tenant_shard_id/location_config", |r| {
2112 687 : api_handler(r, put_tenant_location_config_handler)
2113 687 : })
2114 625 : .get("/v1/location_config", |r| {
2115 6 : api_handler(r, list_location_config_handler)
2116 625 : })
2117 625 : .put(
2118 625 : "/v1/tenant/:tenant_shard_id/time_travel_remote_storage",
2119 625 : |r| api_handler(r, tenant_time_travel_remote_storage_handler),
2120 625 : )
2121 625 : .get("/v1/tenant/:tenant_shard_id/timeline", |r| {
2122 64 : api_handler(r, timeline_list_handler)
2123 625 : })
2124 897 : .post("/v1/tenant/:tenant_shard_id/timeline", |r| {
2125 897 : api_handler(r, timeline_create_handler)
2126 897 : })
2127 625 : .post("/v1/tenant/:tenant_id/attach", |r| {
2128 41 : api_handler(r, tenant_attach_handler)
2129 625 : })
2130 625 : .post("/v1/tenant/:tenant_id/detach", |r| {
2131 38 : api_handler(r, tenant_detach_handler)
2132 625 : })
2133 625 : .post("/v1/tenant/:tenant_shard_id/reset", |r| {
2134 2 : api_handler(r, tenant_reset_handler)
2135 625 : })
2136 625 : .post("/v1/tenant/:tenant_id/load", |r| {
2137 4 : api_handler(r, tenant_load_handler)
2138 625 : })
2139 625 : .post("/v1/tenant/:tenant_id/ignore", |r| {
2140 5 : api_handler(r, tenant_ignore_handler)
2141 625 : })
2142 625 : .post(
2143 625 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/preserve_initdb_archive",
2144 625 : |r| api_handler(r, timeline_preserve_initdb_handler),
2145 625 : )
2146 2280 : .get("/v1/tenant/:tenant_shard_id/timeline/:timeline_id", |r| {
2147 2280 : api_handler(r, timeline_detail_handler)
2148 2280 : })
2149 625 : .get(
2150 625 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/get_lsn_by_timestamp",
2151 625 : |r| api_handler(r, get_lsn_by_timestamp_handler),
2152 625 : )
2153 625 : .get(
2154 625 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/get_timestamp_of_lsn",
2155 625 : |r| api_handler(r, get_timestamp_of_lsn_handler),
2156 625 : )
2157 625 : .put(
2158 625 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/do_gc",
2159 625 : |r| api_handler(r, timeline_gc_handler),
2160 625 : )
2161 625 : .put(
2162 625 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/compact",
2163 625 : |r| testing_api_handler("run timeline compaction", r, timeline_compact_handler),
2164 625 : )
2165 625 : .put(
2166 625 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/checkpoint",
2167 625 : |r| testing_api_handler("run timeline checkpoint", r, timeline_checkpoint_handler),
2168 625 : )
2169 625 : .post(
2170 625 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/download_remote_layers",
2171 625 : |r| api_handler(r, timeline_download_remote_layers_handler_post),
2172 625 : )
2173 625 : .get(
2174 625 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/download_remote_layers",
2175 625 : |r| api_handler(r, timeline_download_remote_layers_handler_get),
2176 625 : )
2177 625 : .delete("/v1/tenant/:tenant_shard_id/timeline/:timeline_id", |r| {
2178 65 : api_handler(r, timeline_delete_handler)
2179 625 : })
2180 625 : .get(
2181 625 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/layer",
2182 625 : |r| api_handler(r, layer_map_info_handler),
2183 625 : )
2184 625 : .get(
2185 625 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/layer/:layer_file_name",
2186 625 : |r| api_handler(r, layer_download_handler),
2187 625 : )
2188 625 : .delete(
2189 625 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/layer/:layer_file_name",
2190 2206 : |r| api_handler(r, evict_timeline_layer_handler),
2191 625 : )
2192 625 : .post("/v1/tenant/:tenant_shard_id/heatmap_upload", |r| {
2193 7 : api_handler(r, secondary_upload_handler)
2194 625 : })
2195 625 : .put("/v1/disk_usage_eviction/run", |r| {
2196 14 : api_handler(r, disk_usage_eviction_run)
2197 625 : })
2198 625 : .put("/v1/deletion_queue/flush", |r| {
2199 20 : api_handler(r, deletion_queue_flush)
2200 625 : })
2201 625 : .post("/v1/tenant/:tenant_shard_id/secondary/download", |r| {
2202 6 : api_handler(r, secondary_download_handler)
2203 625 : })
2204 625 : .put("/v1/tenant/:tenant_shard_id/break", |r| {
2205 2 : testing_api_handler("set tenant state to broken", r, handle_tenant_break)
2206 625 : })
2207 625 : .get("/v1/panic", |r| api_handler(r, always_panic_handler))
2208 625 : .post("/v1/tracing/event", |r| {
2209 0 : testing_api_handler("emit a tracing event", r, post_tracing_event_handler)
2210 625 : })
2211 625 : .get(
2212 625 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/getpage",
2213 625 : |r| testing_api_handler("getpage@lsn", r, getpage_at_lsn_handler),
2214 625 : )
2215 625 : .get(
2216 625 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/keyspace",
2217 625 : |r| api_handler(r, timeline_collect_keyspace),
2218 625 : )
2219 625 : .put("/v1/io_engine", |r| api_handler(r, put_io_engine_handler))
2220 625 : .any(handler_404))
2221 625 : }