//!
//! Management HTTP API
//!
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use anyhow::{anyhow, Context, Result};
use enumset::EnumSet;
use futures::TryFutureExt;
use humantime::format_rfc3339;
use hyper::header;
use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
use metrics::launch_timestamp::LaunchTimestamp;
use pageserver_api::models::LocationConfigListResponse;
use pageserver_api::models::ShardParameters;
use pageserver_api::models::TenantDetails;
use pageserver_api::models::TenantLocationConfigResponse;
use pageserver_api::models::TenantShardLocation;
use pageserver_api::models::TenantShardSplitRequest;
use pageserver_api::models::TenantShardSplitResponse;
use pageserver_api::models::TenantState;
use pageserver_api::models::{
    DownloadRemoteLayersTaskSpawnRequest, LocationConfigMode, TenantAttachRequest,
    TenantLoadRequest, TenantLocationConfigRequest,
};
use pageserver_api::shard::ShardCount;
use pageserver_api::shard::TenantShardId;
use remote_storage::GenericRemoteStorage;
use remote_storage::TimeTravelError;
use tenant_size_model::{SizeResult, StorageModel};
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::auth::JwtAuth;
use utils::failpoint_support::failpoints_handler;
use utils::http::endpoint::request_span;
use utils::http::json::json_request_or_empty_body;
use utils::http::request::{get_request_param, must_get_query_param, parse_query_param};

use crate::context::{DownloadBehavior, RequestContext};
use crate::deletion_queue::DeletionQueueClient;
use crate::metrics::{StorageTimeOperation, STORAGE_TIME_GLOBAL};
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::task_mgr::TaskKind;
use crate::tenant::config::{LocationConf, TenantConfOpt};
use crate::tenant::mgr::GetActiveTenantError;
use crate::tenant::mgr::{
    GetTenantError, SetNewTenantConfigError, TenantManager, TenantMapError, TenantMapInsertError,
    TenantSlotError, TenantSlotUpsertError, TenantStateError,
};
use crate::tenant::mgr::{TenantSlot, UpsertLocationError};
use crate::tenant::remote_timeline_client;
use crate::tenant::secondary::SecondaryController;
use crate::tenant::size::ModelInputs;
use crate::tenant::storage_layer::LayerAccessStatsReset;
use crate::tenant::timeline::CompactFlags;
use crate::tenant::timeline::Timeline;
use crate::tenant::SpawnMode;
use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError};
use crate::{config::PageServerConf, tenant::mgr};
use crate::{disk_usage_eviction_task, tenant};
use pageserver_api::models::{
    StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TenantInfo,
    TimelineCreateRequest, TimelineGcRequest, TimelineInfo,
};
use utils::{
    auth::SwappableJwtAuth,
    generation::Generation,
    http::{
        endpoint::{self, attach_openapi_ui, auth_middleware, check_permission_with},
        error::{ApiError, HttpErrorBody},
        json::{json_request, json_response},
        request::parse_request_param,
        RequestExt, RouterBuilder,
    },
    id::{TenantId, TimelineId},
    lsn::Lsn,
};

// For APIs that require an Active tenant, how long should we block waiting for that state?
// This is not functionally necessary (clients will retry), but avoids generating a lot of
// failed API calls while tenants are activating.
#[cfg(not(feature = "testing"))]
const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(5000);

// Tests run on slow/oversubscribed nodes, and may need to wait much longer for tenants to
// finish attaching, if calls to remote storage are slow.
#[cfg(feature = "testing")]
const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);

pub struct State {
    conf: &'static PageServerConf,
    tenant_manager: Arc<TenantManager>,
    auth: Option<Arc<SwappableJwtAuth>>,
    allowlist_routes: Vec<Uri>,
    remote_storage: Option<GenericRemoteStorage>,
    broker_client: storage_broker::BrokerClientChannel,
    disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
    deletion_queue_client: DeletionQueueClient,
    secondary_controller: SecondaryController,
}

impl State {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        conf: &'static PageServerConf,
        tenant_manager: Arc<TenantManager>,
        auth: Option<Arc<SwappableJwtAuth>>,
        remote_storage: Option<GenericRemoteStorage>,
        broker_client: storage_broker::BrokerClientChannel,
        disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
        deletion_queue_client: DeletionQueueClient,
        secondary_controller: SecondaryController,
    ) -> anyhow::Result<Self> {
        let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml", "/metrics"]
            .iter()
            .map(|v| v.parse().unwrap())
            .collect::<Vec<_>>();
        Ok(Self {
            conf,
            tenant_manager,
            auth,
            allowlist_routes,
            remote_storage,
            broker_client,
            disk_usage_eviction_state,
            deletion_queue_client,
            secondary_controller,
        })
    }
}

#[inline(always)]
fn get_state(request: &Request<Body>) -> &State {
    request
        .data::<Arc<State>>()
        .expect("unknown state type")
        .as_ref()
}

#[inline(always)]
fn get_config(request: &Request<Body>) -> &'static PageServerConf {
    get_state(request).conf
}

/// Check that the requester is authorized to operate on given tenant
fn check_permission(request: &Request<Body>, tenant_id: Option<TenantId>) -> Result<(), ApiError> {
    check_permission_with(request, |claims| {
        crate::auth::check_permission(claims, tenant_id)
    })
}

impl From<PageReconstructError> for ApiError {
    fn from(pre: PageReconstructError) -> ApiError {
        match pre {
            PageReconstructError::Other(pre) => ApiError::InternalServerError(pre),
            PageReconstructError::Cancelled => {
                ApiError::InternalServerError(anyhow::anyhow!("request was cancelled"))
            }
            PageReconstructError::AncestorStopping(_) => {
                ApiError::ResourceUnavailable(format!("{pre}").into())
            }
            PageReconstructError::AncestorLsnTimeout(e) => ApiError::Timeout(format!("{e}").into()),
            PageReconstructError::WalRedo(pre) => ApiError::InternalServerError(pre),
        }
    }
}

impl From<TenantMapInsertError> for ApiError {
    fn from(tmie: TenantMapInsertError) -> ApiError {
        match tmie {
            TenantMapInsertError::SlotError(e) => e.into(),
            TenantMapInsertError::SlotUpsertError(e) => e.into(),
            TenantMapInsertError::Other(e) => ApiError::InternalServerError(e),
        }
    }
}

impl From<TenantSlotError> for ApiError {
    fn from(e: TenantSlotError) -> ApiError {
        use TenantSlotError::*;
        match e {
            NotFound(tenant_id) => {
                ApiError::NotFound(anyhow::anyhow!("NotFound: tenant {tenant_id}").into())
            }
            e @ AlreadyExists(_, _) => ApiError::Conflict(format!("{e}")),
            InProgress => {
                ApiError::ResourceUnavailable("Tenant is being modified concurrently".into())
            }
            MapState(e) => e.into(),
        }
    }
}

impl From<TenantSlotUpsertError> for ApiError {
    fn from(e: TenantSlotUpsertError) -> ApiError {
        use TenantSlotUpsertError::*;
        match e {
            InternalError(e) => ApiError::InternalServerError(anyhow::anyhow!("{e}")),
            MapState(e) => e.into(),
            ShuttingDown(_) => ApiError::ShuttingDown,
        }
    }
}

impl From<UpsertLocationError> for ApiError {
    fn from(e: UpsertLocationError) -> ApiError {
        use UpsertLocationError::*;
        match e {
            BadRequest(e) => ApiError::BadRequest(e),
            Unavailable(_) => ApiError::ShuttingDown,
            e @ InProgress => ApiError::Conflict(format!("{e}")),
            Flush(e) | Other(e) => ApiError::InternalServerError(e),
        }
    }
}

impl From<TenantMapError> for ApiError {
    fn from(e: TenantMapError) -> ApiError {
        use TenantMapError::*;
        match e {
            StillInitializing | ShuttingDown => {
                ApiError::ResourceUnavailable(format!("{e}").into())
            }
        }
    }
}

impl From<TenantStateError> for ApiError {
    fn from(tse: TenantStateError) -> ApiError {
        match tse {
            TenantStateError::IsStopping(_) => {
                ApiError::ResourceUnavailable("Tenant is stopping".into())
            }
            TenantStateError::SlotError(e) => e.into(),
            TenantStateError::SlotUpsertError(e) => e.into(),
            TenantStateError::Other(e) => ApiError::InternalServerError(anyhow!(e)),
        }
    }
}

impl From<GetTenantError> for ApiError {
    fn from(tse: GetTenantError) -> ApiError {
        match tse {
            GetTenantError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid).into()),
            GetTenantError::Broken(reason) => {
                ApiError::InternalServerError(anyhow!("tenant is broken: {}", reason))
            }
            GetTenantError::NotActive(_) => {
                // Why is this not `ApiError::NotFound`?
                // Because we must be careful to never return 404 for a tenant if it does
                // in fact exist locally. If we did, the caller could draw the conclusion
                // that it can attach the tenant to another PS and we'd be in split-brain.
                //
                // (We can produce this variant only in `mgr::get_tenant(..., active=true)` calls).
                ApiError::ResourceUnavailable("Tenant not yet active".into())
            }
            GetTenantError::MapState(e) => ApiError::ResourceUnavailable(format!("{e}").into()),
        }
    }
}

impl From<GetActiveTenantError> for ApiError {
    fn from(e: GetActiveTenantError) -> ApiError {
        match e {
            GetActiveTenantError::WillNotBecomeActive(_) => ApiError::Conflict(format!("{}", e)),
            GetActiveTenantError::Cancelled => ApiError::ShuttingDown,
            GetActiveTenantError::NotFound(gte) => gte.into(),
            GetActiveTenantError::WaitForActiveTimeout { .. } => {
                ApiError::ResourceUnavailable(format!("{}", e).into())
            }
        }
    }
}

impl From<SetNewTenantConfigError> for ApiError {
    fn from(e: SetNewTenantConfigError) -> ApiError {
        match e {
            SetNewTenantConfigError::GetTenant(tid) => {
                ApiError::NotFound(anyhow!("tenant {}", tid).into())
            }
            e @ (SetNewTenantConfigError::Persist(_) | SetNewTenantConfigError::Other(_)) => {
                ApiError::InternalServerError(anyhow::Error::new(e))
            }
        }
    }
}

impl From<crate::tenant::DeleteTimelineError> for ApiError {
    fn from(value: crate::tenant::DeleteTimelineError) -> Self {
        use crate::tenant::DeleteTimelineError::*;
        match value {
            NotFound => ApiError::NotFound(anyhow::anyhow!("timeline not found").into()),
            HasChildren(children) => ApiError::PreconditionFailed(
                format!("Cannot delete timeline which has child timelines: {children:?}")
                    .into_boxed_str(),
            ),
            a @ AlreadyInProgress(_) => ApiError::Conflict(a.to_string()),
            Other(e) => ApiError::InternalServerError(e),
        }
    }
}

impl From<crate::tenant::mgr::DeleteTimelineError> for ApiError {
    fn from(value: crate::tenant::mgr::DeleteTimelineError) -> Self {
        use crate::tenant::mgr::DeleteTimelineError::*;
        match value {
            // Report Precondition Failed so the client can distinguish the
            // "tenant is missing" case from the "timeline is missing" case.
            Tenant(GetTenantError::NotFound(..)) => ApiError::PreconditionFailed(
                "Requested tenant is missing".to_owned().into_boxed_str(),
            ),
            Tenant(t) => ApiError::from(t),
            Timeline(t) => ApiError::from(t),
        }
    }
}

impl From<crate::tenant::delete::DeleteTenantError> for ApiError {
    fn from(value: crate::tenant::delete::DeleteTenantError) -> Self {
        use crate::tenant::delete::DeleteTenantError::*;
        match value {
            Get(g) => ApiError::from(g),
            e @ AlreadyInProgress => ApiError::Conflict(e.to_string()),
            Timeline(t) => ApiError::from(t),
            NotAttached => ApiError::NotFound(anyhow::anyhow!("Tenant is not attached").into()),
            SlotError(e) => e.into(),
            SlotUpsertError(e) => e.into(),
            Other(o) => ApiError::InternalServerError(o),
            e @ InvalidState(_) => ApiError::PreconditionFailed(e.to_string().into_boxed_str()),
            Cancelled => ApiError::ShuttingDown,
        }
    }
}

// Helper function to construct a TimelineInfo struct for a timeline
async fn build_timeline_info(
    timeline: &Arc<Timeline>,
    include_non_incremental_logical_size: bool,
    force_await_initial_logical_size: bool,
    ctx: &RequestContext,
) -> anyhow::Result<TimelineInfo> {
    crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();

    if force_await_initial_logical_size {
        timeline.clone().await_initial_logical_size().await
    }

    let mut info = build_timeline_info_common(
        timeline,
        ctx,
        tenant::timeline::GetLogicalSizePriority::Background,
    )
    .await?;
    if include_non_incremental_logical_size {
        // XXX we should be using spawn_ondemand_logical_size_calculation here.
        // Otherwise, if someone deletes the timeline / detaches the tenant while
        // we're executing this function, we will outlive the timeline on-disk state.
        info.current_logical_size_non_incremental = Some(
            timeline
                .get_current_logical_size_non_incremental(info.last_record_lsn, ctx)
                .await?,
        );
    }
    Ok(info)
}

async fn build_timeline_info_common(
    timeline: &Arc<Timeline>,
    ctx: &RequestContext,
    logical_size_task_priority: tenant::timeline::GetLogicalSizePriority,
) -> anyhow::Result<TimelineInfo> {
    crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
    let initdb_lsn = timeline.initdb_lsn;
    let last_record_lsn = timeline.get_last_record_lsn();
    let (wal_source_connstr, last_received_msg_lsn, last_received_msg_ts) = {
        let guard = timeline.last_received_wal.lock().unwrap();
        if let Some(info) = guard.as_ref() {
            (
                Some(format!("{:?}", info.wal_source_connconf)), // Password is hidden, but it's for statistics only.
                Some(info.last_received_msg_lsn),
                Some(info.last_received_msg_ts),
            )
        } else {
            (None, None, None)
        }
    };

    let ancestor_timeline_id = timeline.get_ancestor_timeline_id();
    let ancestor_lsn = match timeline.get_ancestor_lsn() {
        Lsn(0) => None,
        lsn @ Lsn(_) => Some(lsn),
    };
    let current_logical_size = timeline.get_current_logical_size(logical_size_task_priority, ctx);
    let current_physical_size = Some(timeline.layer_size_sum().await);
    let state = timeline.current_state();
    let remote_consistent_lsn_projected = timeline
        .get_remote_consistent_lsn_projected()
        .unwrap_or(Lsn(0));
    let remote_consistent_lsn_visible = timeline
        .get_remote_consistent_lsn_visible()
        .unwrap_or(Lsn(0));

    let walreceiver_status = timeline.walreceiver_status();

    let info = TimelineInfo {
        tenant_id: timeline.tenant_shard_id,
        timeline_id: timeline.timeline_id,
        ancestor_timeline_id,
        ancestor_lsn,
        disk_consistent_lsn: timeline.get_disk_consistent_lsn(),
        remote_consistent_lsn: remote_consistent_lsn_projected,
        remote_consistent_lsn_visible,
        initdb_lsn,
        last_record_lsn,
        prev_record_lsn: Some(timeline.get_prev_record_lsn()),
        latest_gc_cutoff_lsn: *timeline.get_latest_gc_cutoff_lsn(),
        current_logical_size: current_logical_size.size_dont_care_about_accuracy(),
        current_logical_size_is_accurate: match current_logical_size.accuracy() {
            tenant::timeline::logical_size::Accuracy::Approximate => false,
            tenant::timeline::logical_size::Accuracy::Exact => true,
        },
        current_physical_size,
        current_logical_size_non_incremental: None,
        timeline_dir_layer_file_size_sum: None,
        wal_source_connstr,
        last_received_msg_lsn,
        last_received_msg_ts,
        pg_version: timeline.pg_version,

        state,

        walreceiver_status,
    };
    Ok(info)
}

// healthcheck handler
442 630 : request: Request<Body>,
443 630 : _cancel: CancellationToken,
444 630 : ) -> Result<Response<Body>, ApiError> {
445 630 : check_permission(&request, None)?;
446 630 : let config = get_config(&request);
447 630 : json_response(StatusCode::OK, StatusResponse { id: config.id })
448 630 : }
449 :
450 6 : async fn reload_auth_validation_keys_handler(
451 6 : request: Request<Body>,
452 6 : _cancel: CancellationToken,
453 6 : ) -> Result<Response<Body>, ApiError> {
454 6 : check_permission(&request, None)?;
455 6 : let config = get_config(&request);
456 6 : let state = get_state(&request);
457 6 : let Some(shared_auth) = &state.auth else {
458 0 : return json_response(StatusCode::BAD_REQUEST, ());
459 : };
460 : // unwrap is ok because check is performed when creating config, so path is set and exists
461 6 : let key_path = config.auth_validation_public_key_path.as_ref().unwrap();
462 6 : info!("Reloading public key(s) for verifying JWT tokens from {key_path:?}");
463 :
464 6 : match JwtAuth::from_key_path(key_path) {
465 6 : Ok(new_auth) => {
466 6 : shared_auth.swap(new_auth);
467 6 : json_response(StatusCode::OK, ())
468 : }
469 0 : Err(e) => {
470 0 : warn!("Error reloading public keys from {key_path:?}: {e:}");
471 0 : json_response(StatusCode::INTERNAL_SERVER_ERROR, ())
472 : }
473 : }
474 6 : }
475 :
476 900 : async fn timeline_create_handler(
477 900 : mut request: Request<Body>,
478 900 : _cancel: CancellationToken,
479 900 : ) -> Result<Response<Body>, ApiError> {
480 900 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
481 900 : let request_data: TimelineCreateRequest = json_request(&mut request).await?;
482 900 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
483 :
484 899 : let new_timeline_id = request_data.new_timeline_id;
485 899 :
486 899 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Error);
487 899 :
488 899 : let state = get_state(&request);
489 :
490 899 : async {
491 899 : let tenant = state
492 899 : .tenant_manager
493 899 : .get_attached_tenant_shard(tenant_shard_id, false)?;
494 :
495 896 : tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
496 :
497 894 : if let Some(ancestor_id) = request_data.ancestor_timeline_id.as_ref() {
498 268 : tracing::info!(%ancestor_id, "starting to branch");
499 : } else {
500 626 : tracing::info!("bootstrapping");
501 : }
502 :
503 894 : match tenant
504 894 : .create_timeline(
505 894 : new_timeline_id,
506 894 : request_data.ancestor_timeline_id,
507 894 : request_data.ancestor_start_lsn,
508 894 : request_data.pg_version.unwrap_or(crate::DEFAULT_PG_VERSION),
509 894 : request_data.existing_initdb_timeline_id,
510 894 : state.broker_client.clone(),
511 894 : &ctx,
512 894 : )
513 7683365 : .await
514 : {
515 868 : Ok(new_timeline) => {
516 : // Created. Construct a TimelineInfo for it.
517 868 : let timeline_info = build_timeline_info_common(
518 868 : &new_timeline,
519 868 : &ctx,
520 868 : tenant::timeline::GetLogicalSizePriority::User,
521 868 : )
522 0 : .await
523 868 : .map_err(ApiError::InternalServerError)?;
524 868 : json_response(StatusCode::CREATED, timeline_info)
525 : }
526 20 : Err(_) if tenant.cancel.is_cancelled() => {
527 3 : // In case we get some ugly error type during shutdown, cast it into a clean 503.
528 3 : json_response(
529 3 : StatusCode::SERVICE_UNAVAILABLE,
530 3 : HttpErrorBody::from_msg("Tenant shutting down".to_string()),
531 3 : )
532 : }
533 : Err(
534 : tenant::CreateTimelineError::Conflict
535 : | tenant::CreateTimelineError::AlreadyCreating,
536 0 : ) => json_response(StatusCode::CONFLICT, ()),
537 8 : Err(tenant::CreateTimelineError::AncestorLsn(err)) => json_response(
538 8 : StatusCode::NOT_ACCEPTABLE,
539 8 : HttpErrorBody::from_msg(format!("{err:#}")),
540 8 : ),
541 6 : Err(e @ tenant::CreateTimelineError::AncestorNotActive) => json_response(
542 6 : StatusCode::SERVICE_UNAVAILABLE,
543 6 : HttpErrorBody::from_msg(e.to_string()),
544 6 : ),
545 0 : Err(tenant::CreateTimelineError::ShuttingDown) => json_response(
546 0 : StatusCode::SERVICE_UNAVAILABLE,
547 0 : HttpErrorBody::from_msg("tenant shutting down".to_string()),
548 0 : ),
549 3 : Err(tenant::CreateTimelineError::Other(err)) => Err(ApiError::InternalServerError(err)),
550 : }
551 893 : }
552 : .instrument(info_span!("timeline_create",
553 : tenant_id = %tenant_shard_id.tenant_id,
554 899 : shard_id = %tenant_shard_id.shard_slug(),
555 : timeline_id = %new_timeline_id,
556 : lsn=?request_data.ancestor_start_lsn,
557 : pg_version=?request_data.pg_version
558 : ))
559 7683511 : .await
560 894 : }
561 :
562 64 : async fn timeline_list_handler(
563 64 : request: Request<Body>,
564 64 : _cancel: CancellationToken,
565 64 : ) -> Result<Response<Body>, ApiError> {
566 64 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
567 64 : let include_non_incremental_logical_size: Option<bool> =
568 64 : parse_query_param(&request, "include-non-incremental-logical-size")?;
569 64 : let force_await_initial_logical_size: Option<bool> =
570 64 : parse_query_param(&request, "force-await-initial-logical-size")?;
571 64 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
572 :
573 64 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
574 :
575 64 : let response_data = async {
576 64 : let tenant = mgr::get_tenant(tenant_shard_id, true)?;
577 60 : let timelines = tenant.list_timelines();
578 60 :
579 60 : let mut response_data = Vec::with_capacity(timelines.len());
580 162 : for timeline in timelines {
581 102 : let timeline_info = build_timeline_info(
582 102 : &timeline,
583 102 : include_non_incremental_logical_size.unwrap_or(false),
584 102 : force_await_initial_logical_size.unwrap_or(false),
585 102 : &ctx,
586 102 : )
587 102 : .instrument(info_span!("build_timeline_info", timeline_id = %timeline.timeline_id))
588 0 : .await
            .with_context(|| {
                format!(
                    "Failed to convert tenant timeline {} into the local one",
                    timeline.timeline_id
                )
            })
            .map_err(ApiError::InternalServerError)?;

            response_data.push(timeline_info);
        }
        Ok::<Vec<TimelineInfo>, ApiError>(response_data)
    }
    .instrument(info_span!("timeline_list",
            tenant_id = %tenant_shard_id.tenant_id,
            shard_id = %tenant_shard_id.shard_slug()))
    .await?;

    json_response(StatusCode::OK, response_data)
}

async fn timeline_preserve_initdb_handler(
    request: Request<Body>,
    _cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
    let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
    let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
    check_permission(&request, Some(tenant_shard_id.tenant_id))?;

    // Part of the process for disaster recovery from safekeeper-stored WAL:
    // If we don't recover into a new timeline but want to keep the timeline ID,
    // then the initdb archive is deleted. This endpoint copies it to a different
    // location where timeline recreation can find it.

    async {
        let tenant = mgr::get_tenant(tenant_shard_id, true)?;

        let timeline = tenant
            .get_timeline(timeline_id, false)
            .map_err(|e| ApiError::NotFound(e.into()))?;

        timeline
            .preserve_initdb_archive()
            .await
            .context("preserving initdb archive")
            .map_err(ApiError::InternalServerError)?;

        Ok::<_, ApiError>(())
    }
    .instrument(info_span!("timeline_preserve_initdb_archive",
        tenant_id = %tenant_shard_id.tenant_id,
        shard_id = %tenant_shard_id.shard_slug(),
        %timeline_id))
    .await?;

    json_response(StatusCode::OK, ())
}

async fn timeline_detail_handler(
    request: Request<Body>,
    _cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
    let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
    let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
    let include_non_incremental_logical_size: Option<bool> =
        parse_query_param(&request, "include-non-incremental-logical-size")?;
    let force_await_initial_logical_size: Option<bool> =
        parse_query_param(&request, "force-await-initial-logical-size")?;
    check_permission(&request, Some(tenant_shard_id.tenant_id))?;

    // Logical size calculation needs downloading.
    let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);

    let timeline_info = async {
        let tenant = mgr::get_tenant(tenant_shard_id, true)?;

        let timeline = tenant
            .get_timeline(timeline_id, false)
            .map_err(|e| ApiError::NotFound(e.into()))?;

        let timeline_info = build_timeline_info(
            &timeline,
            include_non_incremental_logical_size.unwrap_or(false),
            force_await_initial_logical_size.unwrap_or(false),
            &ctx,
        )
        .await
        .context("get local timeline info")
        .map_err(ApiError::InternalServerError)?;

        Ok::<_, ApiError>(timeline_info)
    }
    .instrument(info_span!("timeline_detail",
        tenant_id = %tenant_shard_id.tenant_id,
        shard_id = %tenant_shard_id.shard_slug(),
        %timeline_id))
    .await?;

    json_response(StatusCode::OK, timeline_info)
}

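/// Find the LSN at which the timeline contained a given wall-clock timestamp.
///
/// A hypothetical request, for illustration only (the route path is registered
/// elsewhere and is assumed here; `timestamp` must be RFC 3339, as parsed below):
///
/// ```text
/// GET /v1/tenant/:tenant_shard_id/timeline/:timeline_id/get_lsn_by_timestamp?timestamp=2024-01-01T00:00:00Z
/// ```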
async fn get_lsn_by_timestamp_handler(
    request: Request<Body>,
    cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
    let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
    check_permission(&request, Some(tenant_shard_id.tenant_id))?;

    if !tenant_shard_id.is_zero() {
        // Requires SLRU contents, which are only stored on shard zero
        return Err(ApiError::BadRequest(anyhow!(
            "Size calculations are only available on shard zero"
        )));
    }

    let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
    let timestamp_raw = must_get_query_param(&request, "timestamp")?;
    let timestamp = humantime::parse_rfc3339(&timestamp_raw)
        .with_context(|| format!("Invalid time: {:?}", timestamp_raw))
        .map_err(ApiError::BadRequest)?;
    let timestamp_pg = postgres_ffi::to_pg_timestamp(timestamp);

    let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
    let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?;
    let result = timeline
        .find_lsn_for_timestamp(timestamp_pg, &cancel, &ctx)
        .await?;
    #[derive(serde::Serialize, Debug)]
    struct Result {
        lsn: Lsn,
        kind: &'static str,
    }
    let (lsn, kind) = match result {
        LsnForTimestamp::Present(lsn) => (lsn, "present"),
        LsnForTimestamp::Future(lsn) => (lsn, "future"),
        LsnForTimestamp::Past(lsn) => (lsn, "past"),
        LsnForTimestamp::NoData(lsn) => (lsn, "nodata"),
    };
    let result = Result { lsn, kind };
    tracing::info!(
        lsn=?result.lsn,
        kind=%result.kind,
        timestamp=%timestamp_raw,
        "lsn_by_timestamp finished"
    );
    json_response(StatusCode::OK, result)
}

732 12 : request: Request<Body>,
733 12 : _cancel: CancellationToken,
734 12 : ) -> Result<Response<Body>, ApiError> {
735 12 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
736 12 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
737 :
738 12 : if !tenant_shard_id.is_zero() {
739 : // Requires SLRU contents, which are only stored on shard zero
740 0 : return Err(ApiError::BadRequest(anyhow!(
741 0 : "Size calculations are only available on shard zero"
742 0 : )));
743 12 : }
744 :
745 12 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
746 :
747 12 : let lsn_str = must_get_query_param(&request, "lsn")?;
748 12 : let lsn = Lsn::from_str(&lsn_str)
749 12 : .with_context(|| format!("Invalid LSN: {lsn_str:?}"))
750 12 : .map_err(ApiError::BadRequest)?;
751 :
752 12 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
753 12 : let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?;
754 80 : let result = timeline.get_timestamp_for_lsn(lsn, &ctx).await?;
755 :
756 10 : match result {
757 10 : Some(time) => {
758 10 : let time = format_rfc3339(postgres_ffi::from_pg_timestamp(time)).to_string();
759 10 : json_response(StatusCode::OK, time)
760 : }
761 0 : None => json_response(StatusCode::NOT_FOUND, ()),
762 : }
763 12 : }
764 :
765 41 : async fn tenant_attach_handler(
766 41 : mut request: Request<Body>,
767 41 : _cancel: CancellationToken,
768 41 : ) -> Result<Response<Body>, ApiError> {
769 41 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
770 41 : check_permission(&request, Some(tenant_id))?;
771 :
772 41 : let maybe_body: Option<TenantAttachRequest> = json_request_or_empty_body(&mut request).await?;
773 38 : let tenant_conf = match &maybe_body {
774 38 : Some(request) => TenantConfOpt::try_from(&*request.config).map_err(ApiError::BadRequest)?,
775 0 : None => TenantConfOpt::default(),
776 : };
777 :
778 38 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
779 :
780 38 : info!("Handling tenant attach {tenant_id}");
781 :
782 38 : let state = get_state(&request);
783 :
784 38 : let generation = get_request_generation(state, maybe_body.as_ref().and_then(|r| r.generation))?;
785 :
786 38 : if state.remote_storage.is_none() {
787 0 : return Err(ApiError::BadRequest(anyhow!(
788 0 : "attach_tenant is not possible because pageserver was configured without remote storage"
789 0 : )));
790 38 : }
791 38 :
792 38 : let tenant_shard_id = TenantShardId::unsharded(tenant_id);
793 38 : let shard_params = ShardParameters::default();
794 38 : let location_conf = LocationConf::attached_single(tenant_conf, generation, &shard_params);
795 :
796 38 : let tenant = state
797 38 : .tenant_manager
798 38 : .upsert_location(
799 38 : tenant_shard_id,
800 38 : location_conf,
801 38 : None,
802 38 : SpawnMode::Normal,
803 38 : &ctx,
804 38 : )
805 108 : .await?;
806 :
807 38 : let Some(tenant) = tenant else {
808 : // This should never happen: indicates a bug in upsert_location
809 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
810 0 : "Upsert succeeded but didn't return tenant!"
811 0 : )));
812 : };
813 :
814 : // We might have successfully constructed a Tenant, but it could still
815 : // end up in a broken state:
816 : if let TenantState::Broken {
817 6 : reason,
818 : backtrace: _,
819 38 : } = tenant.current_state()
820 : {
821 6 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
822 6 : "Tenant state is Broken: {reason}"
823 6 : )));
824 32 : }
825 32 :
826 32 : json_response(StatusCode::ACCEPTED, ())
827 41 : }
828 :
829 65 : async fn timeline_delete_handler(
830 65 : request: Request<Body>,
831 65 : _cancel: CancellationToken,
832 65 : ) -> Result<Response<Body>, ApiError> {
833 65 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
834 65 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
835 65 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
836 :
837 65 : let state = get_state(&request);
838 :
839 65 : let tenant = state
840 65 : .tenant_manager
841 65 : .get_attached_tenant_shard(tenant_shard_id, false)
842 65 : .map_err(|e| {
843 1 : match e {
844 : // GetTenantError has a built-in conversion to ApiError, but in this context we don't
845 : // want to treat missing tenants as 404, to avoid ambiguity with successful deletions.
846 1 : GetTenantError::NotFound(_) => ApiError::PreconditionFailed(
847 1 : "Requested tenant is missing".to_string().into_boxed_str(),
848 1 : ),
849 0 : e => e.into(),
850 : }
851 65 : })?;
852 64 : tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
853 64 : tenant.delete_timeline(timeline_id).instrument(info_span!("timeline_delete", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id))
854 370 : .await?;
855 :
856 53 : json_response(StatusCode::ACCEPTED, ())
857 65 : }
858 :
859 38 : async fn tenant_detach_handler(
860 38 : request: Request<Body>,
861 38 : _cancel: CancellationToken,
862 38 : ) -> Result<Response<Body>, ApiError> {
863 38 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
864 38 : check_permission(&request, Some(tenant_id))?;
865 38 : let detach_ignored: Option<bool> = parse_query_param(&request, "detach_ignored")?;
866 :
867 : // This is a legacy API (`/location_conf` is the replacement). It only supports unsharded tenants
868 38 : let tenant_shard_id = TenantShardId::unsharded(tenant_id);
869 38 :
870 38 : let state = get_state(&request);
871 38 : let conf = state.conf;
872 38 : mgr::detach_tenant(
873 38 : conf,
874 38 : tenant_shard_id,
875 38 : detach_ignored.unwrap_or(false),
876 38 : &state.deletion_queue_client,
877 38 : )
878 38 : .instrument(info_span!("tenant_detach", %tenant_id, shard_id=%tenant_shard_id.shard_slug()))
879 183 : .await?;
880 :
881 35 : json_response(StatusCode::OK, ())
882 38 : }
883 :
884 2 : async fn tenant_reset_handler(
885 2 : request: Request<Body>,
886 2 : _cancel: CancellationToken,
887 2 : ) -> Result<Response<Body>, ApiError> {
888 2 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
889 2 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
890 :
891 2 : let drop_cache: Option<bool> = parse_query_param(&request, "drop_cache")?;
892 :
893 2 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
894 2 : let state = get_state(&request);
895 2 : state
896 2 : .tenant_manager
897 2 : .reset_tenant(tenant_shard_id, drop_cache.unwrap_or(false), &ctx)
898 6 : .await
899 2 : .map_err(ApiError::InternalServerError)?;
900 :
901 2 : json_response(StatusCode::OK, ())
902 2 : }
903 :
904 4 : async fn tenant_load_handler(
905 4 : mut request: Request<Body>,
906 4 : _cancel: CancellationToken,
907 4 : ) -> Result<Response<Body>, ApiError> {
908 4 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
909 4 : check_permission(&request, Some(tenant_id))?;
910 :
911 4 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
912 :
913 4 : let maybe_body: Option<TenantLoadRequest> = json_request_or_empty_body(&mut request).await?;
914 :
915 4 : let state = get_state(&request);
916 :
917 : // The /load request is only usable when control_plane_api is not set. Once it is set, callers
918 : // should always use /attach instead.
919 4 : let generation = get_request_generation(state, maybe_body.as_ref().and_then(|r| r.generation))?;
920 :
921 4 : mgr::load_tenant(
922 4 : state.conf,
923 4 : tenant_id,
924 4 : generation,
925 4 : state.broker_client.clone(),
926 4 : state.remote_storage.clone(),
927 4 : state.deletion_queue_client.clone(),
928 4 : &ctx,
929 4 : )
930 4 : .instrument(info_span!("load", %tenant_id))
931 6 : .await?;
932 :
933 3 : json_response(StatusCode::ACCEPTED, ())
934 4 : }
935 :
936 5 : async fn tenant_ignore_handler(
937 5 : request: Request<Body>,
938 5 : _cancel: CancellationToken,
939 5 : ) -> Result<Response<Body>, ApiError> {
940 5 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
941 5 : check_permission(&request, Some(tenant_id))?;
942 :
943 5 : let state = get_state(&request);
944 5 : let conf = state.conf;
945 5 : mgr::ignore_tenant(conf, tenant_id)
946 5 : .instrument(info_span!("ignore_tenant", %tenant_id))
947 16 : .await?;
948 :
949 5 : json_response(StatusCode::OK, ())
950 5 : }
951 :
952 183 : async fn tenant_list_handler(
953 183 : request: Request<Body>,
954 183 : _cancel: CancellationToken,
955 183 : ) -> Result<Response<Body>, ApiError> {
956 183 : check_permission(&request, None)?;
957 :
958 183 : let response_data = mgr::list_tenants()
959 183 : .instrument(info_span!("tenant_list"))
960 0 : .await
961 183 : .map_err(|_| {
962 0 : ApiError::ResourceUnavailable("Tenant map is initializing or shutting down".into())
963 183 : })?
964 183 : .iter()
965 183 : .map(|(id, state, gen)| TenantInfo {
966 181 : id: *id,
967 181 : state: state.clone(),
968 181 : current_physical_size: None,
969 181 : attachment_status: state.attachment_status(),
970 181 : generation: (*gen).into(),
971 183 : })
972 183 : .collect::<Vec<TenantInfo>>();
973 183 :
974 183 : json_response(StatusCode::OK, response_data)
975 183 : }
976 :
977 542 : async fn tenant_status(
978 542 : request: Request<Body>,
979 542 : _cancel: CancellationToken,
980 542 : ) -> Result<Response<Body>, ApiError> {
981 542 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
982 542 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
983 :
984 542 : let tenant_info = async {
985 542 : let tenant = mgr::get_tenant(tenant_shard_id, false)?;
986 :
987 : // Calculate total physical size of all timelines
988 479 : let mut current_physical_size = 0;
989 480 : for timeline in tenant.list_timelines().iter() {
990 480 : current_physical_size += timeline.layer_size_sum().await;
991 : }
992 :
993 479 : let state = tenant.current_state();
994 479 : Result::<_, ApiError>::Ok(TenantDetails {
995 479 : tenant_info: TenantInfo {
996 479 : id: tenant_shard_id,
997 479 : state: state.clone(),
998 479 : current_physical_size: Some(current_physical_size),
999 479 : attachment_status: state.attachment_status(),
1000 479 : generation: tenant.generation().into(),
1001 479 : },
1002 479 : walredo: tenant.wal_redo_manager_status(),
1003 479 : timelines: tenant.list_timeline_ids(),
1004 479 : })
1005 542 : }
1006 : .instrument(info_span!("tenant_status_handler",
1007 : tenant_id = %tenant_shard_id.tenant_id,
1008 542 : shard_id = %tenant_shard_id.shard_slug()))
1009 63 : .await?;
1010 :
1011 479 : json_response(StatusCode::OK, tenant_info)
1012 542 : }
1013 :
1014 121 : async fn tenant_delete_handler(
1015 121 : request: Request<Body>,
1016 121 : _cancel: CancellationToken,
1017 121 : ) -> Result<Response<Body>, ApiError> {
1018 : // TODO openapi spec
1019 121 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1020 121 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1021 :
1022 121 : let state = get_state(&request);
1023 121 :
1024 121 : state
1025 121 : .tenant_manager
1026 121 : .delete_tenant(tenant_shard_id, ACTIVE_TENANT_TIMEOUT)
1027 : .instrument(info_span!("tenant_delete_handler",
1028 : tenant_id = %tenant_shard_id.tenant_id,
1029 121 : shard_id = %tenant_shard_id.shard_slug()
1030 : ))
1031 516 : .await?;
1032 :
1033 86 : json_response(StatusCode::ACCEPTED, ())
1034 121 : }
1035 :
1036 : /// HTTP endpoint to query the current tenant_size of a tenant.
1037 : ///
1038 : /// This is not used by consumption metrics under [`crate::consumption_metrics`], but can be used
/// to debug any of the calculations. Requires the `tenant_id` request parameter and supports
/// `inputs_only=true|false` (default false), which helps debug failures to calculate the model
/// values.
///
/// 'retention_period' query parameter overrides the cutoff that is used to calculate the size
/// (only if it is shorter than the real cutoff).
///
/// Note: we don't update the cached size and prometheus metric here.
/// The retention period might be different, and it's nice to have a method to just calculate it
/// without modifying anything anyway.
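///
/// A hypothetical request, for illustration only (the route path is assumed; an
/// `Accept: text/html` header switches the response to the SVG/HTML report built below):
///
/// ```text
/// GET /v1/tenant/:tenant_shard_id/synthetic_size?inputs_only=false&retention_period=3600
/// ```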
async fn tenant_size_handler(
    request: Request<Body>,
    cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
    let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
    check_permission(&request, Some(tenant_shard_id.tenant_id))?;
    let inputs_only: Option<bool> = parse_query_param(&request, "inputs_only")?;
    let retention_period: Option<u64> = parse_query_param(&request, "retention_period")?;
    let headers = request.headers();

    let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
    let tenant = mgr::get_tenant(tenant_shard_id, true)?;

    if !tenant_shard_id.is_zero() {
        return Err(ApiError::BadRequest(anyhow!(
            "Size calculations are only available on shard zero"
        )));
    }

    // this can be long operation
    let inputs = tenant
        .gather_size_inputs(
            retention_period,
            LogicalSizeCalculationCause::TenantSizeHandler,
            &cancel,
            &ctx,
        )
        .await
        .map_err(ApiError::InternalServerError)?;

    let mut sizes = None;
    let accepts_html = headers
        .get(header::ACCEPT)
        .map(|v| v == "text/html")
        .unwrap_or_default();
    if !inputs_only.unwrap_or(false) {
        let storage_model = inputs
            .calculate_model()
            .map_err(ApiError::InternalServerError)?;
        let size = storage_model.calculate();

        // If request header expects html, return html
        if accepts_html {
            return synthetic_size_html_response(inputs, storage_model, size);
        }
        sizes = Some(size);
    } else if accepts_html {
        return Err(ApiError::BadRequest(anyhow!(
            "inputs_only parameter is incompatible with html output request"
        )));
    }

    /// The type resides in the pageserver, so as not to expose `ModelInputs` in the public API.
    #[derive(serde::Serialize)]
    struct TenantHistorySize {
        id: TenantId,
        /// Size is a mixture of WAL and logical size, so the unit is bytes.
        ///
        /// Will be none if `?inputs_only=true` was given.
        size: Option<u64>,
        /// Size of each segment used in the model.
        /// Will be null if `?inputs_only=true` was given.
        segment_sizes: Option<Vec<tenant_size_model::SegmentSizeResult>>,
        inputs: crate::tenant::size::ModelInputs,
    }

    json_response(
        StatusCode::OK,
        TenantHistorySize {
            id: tenant_shard_id.tenant_id,
            size: sizes.as_ref().map(|x| x.total_size),
            segment_sizes: sizes.map(|x| x.segments),
            inputs,
        },
    )
}

async fn tenant_shard_split_handler(
    mut request: Request<Body>,
    _cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
    let req: TenantShardSplitRequest = json_request(&mut request).await?;

    let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
    let state = get_state(&request);
    let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);

    let new_shards = state
        .tenant_manager
        .shard_split(tenant_shard_id, ShardCount(req.new_shard_count), &ctx)
        .await
        .map_err(ApiError::InternalServerError)?;

    json_response(StatusCode::OK, TenantShardSplitResponse { new_shards })
}

async fn layer_map_info_handler(
    request: Request<Body>,
    _cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
    let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
    let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
    let reset: LayerAccessStatsReset =
        parse_query_param(&request, "reset")?.unwrap_or(LayerAccessStatsReset::NoReset);

    check_permission(&request, Some(tenant_shard_id.tenant_id))?;

    let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?;
    let layer_map_info = timeline.layer_map_info(reset).await;

    json_response(StatusCode::OK, layer_map_info)
}

async fn layer_download_handler(
    request: Request<Body>,
    _cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
    let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
    let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
    let layer_file_name = get_request_param(&request, "layer_file_name")?;
    check_permission(&request, Some(tenant_shard_id.tenant_id))?;

    let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?;
    let downloaded = timeline
        .download_layer(layer_file_name)
        .await
        .map_err(ApiError::InternalServerError)?;

    match downloaded {
        Some(true) => json_response(StatusCode::OK, ()),
        Some(false) => json_response(StatusCode::NOT_MODIFIED, ()),
        None => json_response(
            StatusCode::BAD_REQUEST,
            format!("Layer {tenant_shard_id}/{timeline_id}/{layer_file_name} not found"),
        ),
    }
}

async fn evict_timeline_layer_handler(
    request: Request<Body>,
    _cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
    let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
    check_permission(&request, Some(tenant_shard_id.tenant_id))?;
    let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
    let layer_file_name = get_request_param(&request, "layer_file_name")?;

    let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?;
    let evicted = timeline
        .evict_layer(layer_file_name)
        .await
        .map_err(ApiError::InternalServerError)?;

    match evicted {
        Some(true) => json_response(StatusCode::OK, ()),
        Some(false) => json_response(StatusCode::NOT_MODIFIED, ()),
        None => json_response(
            StatusCode::BAD_REQUEST,
            format!("Layer {tenant_shard_id}/{timeline_id}/{layer_file_name} not found"),
        ),
    }
}

/// Get tenant_size SVG graph along with the JSON data.
fn synthetic_size_html_response(
    inputs: ModelInputs,
    storage_model: StorageModel,
    sizes: SizeResult,
) -> Result<Response<Body>, ApiError> {
    let mut timeline_ids: Vec<String> = Vec::new();
    let mut timeline_map: HashMap<TimelineId, usize> = HashMap::new();
    for (index, ti) in inputs.timeline_inputs.iter().enumerate() {
        timeline_map.insert(ti.timeline_id, index);
        timeline_ids.push(ti.timeline_id.to_string());
    }
    let seg_to_branch: Vec<usize> = inputs
        .segments
        .iter()
        .map(|seg| *timeline_map.get(&seg.timeline_id).unwrap())
        .collect();

    let svg =
        tenant_size_model::svg::draw_svg(&storage_model, &timeline_ids, &seg_to_branch, &sizes)
            .map_err(ApiError::InternalServerError)?;

    let mut response = String::new();

    use std::fmt::Write;
    write!(response, "<html>\n<body>\n").unwrap();
    write!(response, "<div>\n{svg}\n</div>").unwrap();
    writeln!(response, "Project size: {}", sizes.total_size).unwrap();
    writeln!(response, "<pre>").unwrap();
    writeln!(
        response,
        "{}",
        serde_json::to_string_pretty(&inputs).unwrap()
    )
    .unwrap();
    writeln!(
        response,
        "{}",
        serde_json::to_string_pretty(&sizes.segments).unwrap()
    )
    .unwrap();
    writeln!(response, "</pre>").unwrap();
    write!(response, "</body>\n</html>\n").unwrap();

    html_response(StatusCode::OK, response)
}

pub fn html_response(status: StatusCode, data: String) -> Result<Response<Body>, ApiError> {
    let response = Response::builder()
        .status(status)
        .header(header::CONTENT_TYPE, "text/html")
        .body(Body::from(data.as_bytes().to_vec()))
        .map_err(|e| ApiError::InternalServerError(e.into()))?;
    Ok(response)
}

/// Helper for requests that may take a generation, which is mandatory
/// when control_plane_api is set, but otherwise defaults to Generation::none()
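///
/// A minimal behavior sketch (illustrative only, not compiled here):
///
/// ```ignore
/// // With control_plane_api configured, the generation is mandatory:
/// get_request_generation(state, None);    // -> Err(ApiError::BadRequest(...))
/// get_request_generation(state, Some(3)); // -> Ok(Generation::new(3))
/// // Without control_plane_api, any input yields Generation::none().
/// ```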
fn get_request_generation(state: &State, req_gen: Option<u32>) -> Result<Generation, ApiError> {
    if state.conf.control_plane_api.is_some() {
        req_gen
            .map(Generation::new)
            .ok_or(ApiError::BadRequest(anyhow!(
                "generation attribute missing"
            )))
    } else {
        // Legacy mode: all tenants operate with no generation
        Ok(Generation::none())
    }
}

async fn tenant_create_handler(
    mut request: Request<Body>,
    _cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
    let request_data: TenantCreateRequest = json_request(&mut request).await?;
    let target_tenant_id = request_data.new_tenant_id;
    check_permission(&request, None)?;

    let _timer = STORAGE_TIME_GLOBAL
        .get_metric_with_label_values(&[StorageTimeOperation::CreateTenant.into()])
        .expect("bug")
        .start_timer();

    let tenant_conf =
        TenantConfOpt::try_from(&request_data.config).map_err(ApiError::BadRequest)?;

    let state = get_state(&request);

    let generation = get_request_generation(state, request_data.generation)?;

    let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);

    let location_conf =
        LocationConf::attached_single(tenant_conf, generation, &request_data.shard_parameters);

    let new_tenant = state
        .tenant_manager
        .upsert_location(
            target_tenant_id,
            location_conf,
            None,
            SpawnMode::Create,
            &ctx,
        )
        .await?;

    let Some(new_tenant) = new_tenant else {
        // This should never happen: indicates a bug in upsert_location
        return Err(ApiError::InternalServerError(anyhow::anyhow!(
            "Upsert succeeded but didn't return tenant!"
        )));
    };
    // We created the tenant. Existing API semantics are that the tenant
    // is Active when this function returns.
    new_tenant
        .wait_to_become_active(ACTIVE_TENANT_TIMEOUT)
        .await?;

    json_response(
        StatusCode::CREATED,
        TenantCreateResponse(new_tenant.tenant_shard_id().tenant_id),
    )
}

async fn get_tenant_config_handler(
    request: Request<Body>,
    _cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
    let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
    check_permission(&request, Some(tenant_shard_id.tenant_id))?;

    let tenant = mgr::get_tenant(tenant_shard_id, false)?;

    let response = HashMap::from([
        (
            "tenant_specific_overrides",
            serde_json::to_value(tenant.tenant_specific_overrides())
                .context("serializing tenant specific overrides")
                .map_err(ApiError::InternalServerError)?,
        ),
        (
            "effective_config",
            serde_json::to_value(tenant.effective_config())
                .context("serializing effective config")
                .map_err(ApiError::InternalServerError)?,
        ),
    ]);

    json_response(StatusCode::OK, response)
}

async fn update_tenant_config_handler(
    mut request: Request<Body>,
    _cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
    let request_data: TenantConfigRequest = json_request(&mut request).await?;
    let tenant_id = request_data.tenant_id;
    check_permission(&request, Some(tenant_id))?;

    let tenant_conf =
        TenantConfOpt::try_from(&request_data.config).map_err(ApiError::BadRequest)?;

    let state = get_state(&request);
    mgr::set_new_tenant_config(state.conf, tenant_conf, tenant_id)
        .instrument(info_span!("tenant_config", %tenant_id))
        .await?;

    json_response(StatusCode::OK, ())
}

async fn put_tenant_location_config_handler(
    mut request: Request<Body>,
    _cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
    let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;

    let request_data: TenantLocationConfigRequest = json_request(&mut request).await?;
    let flush = parse_query_param(&request, "flush_ms")?.map(Duration::from_millis);
    check_permission(&request, Some(tenant_shard_id.tenant_id))?;

    let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
    let state = get_state(&request);
    let conf = state.conf;

    // The `Detached` state is special, it doesn't upsert a tenant, it removes
    // its local disk content and drops it from memory.
    if let LocationConfigMode::Detached = request_data.config.mode {
        if let Err(e) =
            mgr::detach_tenant(conf, tenant_shard_id, true, &state.deletion_queue_client)
                .instrument(info_span!("tenant_detach",
                    tenant_id = %tenant_shard_id.tenant_id,
                    shard_id = %tenant_shard_id.shard_slug()
                ))
                .await
        {
            match e {
                TenantStateError::SlotError(TenantSlotError::NotFound(_)) => {
                    // This API is idempotent: a NotFound on a detach is fine.
                }
                _ => return Err(e.into()),
            }
        }
        return json_response(StatusCode::OK, ());
    }

    let location_conf =
        LocationConf::try_from(&request_data.config).map_err(ApiError::BadRequest)?;

    let attached = state
        .tenant_manager
        .upsert_location(
            tenant_shard_id,
            location_conf,
            flush,
            tenant::SpawnMode::Normal,
            &ctx,
        )
        .await?
        .is_some();

    if let Some(_flush_ms) = flush {
        match state
            .secondary_controller
            .upload_tenant(tenant_shard_id)
            .await
        {
            Ok(()) => {
                tracing::info!("Uploaded heatmap during flush");
            }
            Err(e) => {
                tracing::warn!("Failed to flush heatmap: {e}");
            }
        }
    } else {
        tracing::info!("No flush requested when configuring");
    }

    // This API returns a vector of pageservers where the tenant is attached: this is
1451 : // primarily for use in the sharding service. For compatibility, we also return this
1452 : // when called directly on a pageserver, but the payload is always zero or one shards.
1453 628 : let mut response = TenantLocationConfigResponse { shards: Vec::new() };
1454 628 : if attached {
1455 594 : response.shards.push(TenantShardLocation {
1456 594 : shard_id: tenant_shard_id,
1457 594 : node_id: state.conf.id,
1458 594 : })
1459 34 : }
1460 :
1461 628 : json_response(StatusCode::OK, response)
1462 685 : }
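
The `Detached` branch above swallows `NotFound`, which is what makes repeated detach calls succeed. A minimal standalone sketch of that idempotency pattern (the types are simplified stand-ins, not the real `TenantStateError`/`TenantSlotError`):

    #[derive(Debug)]
    enum SlotError {
        NotFound(String),
        Other(String),
    }

    fn detach(tenant: &str) -> Result<(), SlotError> {
        // Stand-in for mgr::detach_tenant: pretend the tenant is already gone.
        Err(SlotError::NotFound(tenant.to_owned()))
    }

    fn idempotent_detach(tenant: &str) -> Result<(), SlotError> {
        match detach(tenant) {
            // A NotFound on a detach is fine: the end state is the same.
            Ok(()) | Err(SlotError::NotFound(_)) => Ok(()),
            Err(e) => Err(e),
        }
    }

    fn main() {
        assert!(idempotent_detach("example-tenant").is_ok());
    }
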
1463 :
1464 6 : async fn list_location_config_handler(
1465 6 : request: Request<Body>,
1466 6 : _cancel: CancellationToken,
1467 6 : ) -> Result<Response<Body>, ApiError> {
1468 6 : let state = get_state(&request);
1469 6 : let slots = state.tenant_manager.list();
1470 6 : let result = LocationConfigListResponse {
1471 6 : tenant_shards: slots
1472 6 : .into_iter()
1473 6 : .map(|(tenant_shard_id, slot)| {
1474 5 : let v = match slot {
1475 5 : TenantSlot::Attached(t) => Some(t.get_location_conf()),
1476 0 : TenantSlot::Secondary(s) => Some(s.get_location_conf()),
1477 0 : TenantSlot::InProgress(_) => None,
1478 : };
1479 5 : (tenant_shard_id, v)
1480 6 : })
1481 6 : .collect(),
1482 6 : };
1483 6 : json_response(StatusCode::OK, result)
1484 6 : }
1485 :
1486 : // Do a time-travel recovery on the given tenant/tenant shard. The tenant needs to be detached
1487 : // (from all pageservers), as the operation invalidates consistency assumptions.
1488 1 : async fn tenant_time_travel_remote_storage_handler(
1489 1 : request: Request<Body>,
1490 1 : cancel: CancellationToken,
1491 1 : ) -> Result<Response<Body>, ApiError> {
1492 1 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1493 :
1494 1 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1495 :
1496 1 : let timestamp_raw = must_get_query_param(&request, "travel_to")?;
1497 1 : let timestamp = humantime::parse_rfc3339(&timestamp_raw)
1498 1 : .with_context(|| format!("Invalid time for travel_to: {timestamp_raw:?}"))
1499 1 : .map_err(ApiError::BadRequest)?;
1500 :
1501 1 : let done_if_after_raw = must_get_query_param(&request, "done_if_after")?;
1502 1 : let done_if_after = humantime::parse_rfc3339(&done_if_after_raw)
1503 1 : .with_context(|| format!("Invalid time for done_if_after: {done_if_after_raw:?}"))
1504 1 : .map_err(ApiError::BadRequest)?;
1505 :
1506 : // This is just a sanity check to fend off naive misuse of the API:
1507 : // the tenant needs to be detached *everywhere*
1508 1 : let state = get_state(&request);
1509 1 : let we_manage_tenant = state.tenant_manager.manages_tenant_shard(tenant_shard_id);
1510 1 : if we_manage_tenant {
1511 0 : return Err(ApiError::BadRequest(anyhow!(
1512 0 : "Tenant {tenant_shard_id} is already attached at this pageserver"
1513 0 : )));
1514 1 : }
1515 :
1516 1 : let Some(storage) = state.remote_storage.as_ref() else {
1517 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
1518 0 : "remote storage not configured, cannot run time travel"
1519 0 : )));
1520 : };
1521 :
1522 1 : if timestamp > done_if_after {
1523 0 : return Err(ApiError::BadRequest(anyhow!(
1524 0 : "The done_if_after timestamp comes before the timestamp to recover to"
1525 0 : )));
1526 1 : }
1527 :
1528 1 : tracing::info!("Issuing time travel request internally. timestamp={timestamp_raw}, done_if_after={done_if_after_raw}");
1529 :
1530 1 : remote_timeline_client::upload::time_travel_recover_tenant(
1531 1 : storage,
1532 1 : &tenant_shard_id,
1533 1 : timestamp,
1534 1 : done_if_after,
1535 1 : &cancel,
1536 1 : )
1537 144 : .await
1538 1 : .map_err(|e| match e {
1539 0 : TimeTravelError::BadInput(e) => {
1540 0 : warn!("bad input error: {e}");
1541 0 : ApiError::BadRequest(anyhow!("bad input error"))
1542 : }
1543 : TimeTravelError::Unimplemented => {
1544 0 : ApiError::BadRequest(anyhow!("unimplemented for the configured remote storage"))
1545 : }
1546 0 : TimeTravelError::Cancelled => ApiError::InternalServerError(anyhow!("cancelled")),
1547 : TimeTravelError::TooManyVersions => {
1548 0 : ApiError::InternalServerError(anyhow!("too many versions in remote storage"))
1549 : }
1550 0 : TimeTravelError::Other(e) => {
1551 0 : warn!("internal error: {e}");
1552 0 : ApiError::InternalServerError(anyhow!("internal error"))
1553 : }
1554 1 : })?;
1555 :
1556 1 : json_response(StatusCode::OK, ())
1557 1 : }
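
Both timestamps are parsed and cross-checked before any remote-storage work starts. A minimal sketch of that validation step, using the same `humantime::parse_rfc3339` call (the helper name is illustrative):

    use std::time::SystemTime;

    fn validate_window(
        travel_to: &str,
        done_if_after: &str,
    ) -> anyhow::Result<(SystemTime, SystemTime)> {
        let timestamp = humantime::parse_rfc3339(travel_to)?;
        let done_if_after = humantime::parse_rfc3339(done_if_after)?;
        // Reject windows where the cutoff precedes the recovery point.
        anyhow::ensure!(
            timestamp <= done_if_after,
            "The done_if_after timestamp comes before the timestamp to recover to"
        );
        Ok((timestamp, done_if_after))
    }

    fn main() -> anyhow::Result<()> {
        let (t, d) = validate_window("2024-01-01T00:00:00Z", "2024-01-02T00:00:00Z")?;
        assert!(t <= d);
        Ok(())
    }
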
1558 :
1559 : /// Testing helper to transition a tenant to [`crate::tenant::TenantState::Broken`].
1560 2 : async fn handle_tenant_break(
1561 2 : r: Request<Body>,
1562 2 : _cancel: CancellationToken,
1563 2 : ) -> Result<Response<Body>, ApiError> {
1564 2 : let tenant_shard_id: TenantShardId = parse_request_param(&r, "tenant_shard_id")?;
1565 :
1566 2 : let tenant = crate::tenant::mgr::get_tenant(tenant_shard_id, true)
1567 2 : .map_err(|_| ApiError::Conflict(String::from("no active tenant found")))?;
1568 :
1569 2 : tenant.set_broken("broken from test".to_owned()).await;
1570 :
1571 2 : json_response(StatusCode::OK, ())
1572 2 : }
1573 :
1574 : // Run GC immediately on given timeline.
1575 374 : async fn timeline_gc_handler(
1576 374 : mut request: Request<Body>,
1577 374 : cancel: CancellationToken,
1578 374 : ) -> Result<Response<Body>, ApiError> {
1579 374 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1580 374 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1581 374 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1582 :
1583 374 : let gc_req: TimelineGcRequest = json_request(&mut request).await?;
1584 :
1585 374 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
1586 373 : let wait_task_done =
1587 374 : mgr::immediate_gc(tenant_shard_id, timeline_id, gc_req, cancel, &ctx).await?;
1588 373 : let gc_result = wait_task_done
1589 373 : .await
1590 373 : .context("wait for gc task")
1591 373 : .map_err(ApiError::InternalServerError)?
1592 373 : .map_err(ApiError::InternalServerError)?;
1593 :
1594 372 : json_response(StatusCode::OK, gc_result)
1595 374 : }
1596 :
1597 : // Run compaction immediately on given timeline.
1598 125 : async fn timeline_compact_handler(
1599 125 : request: Request<Body>,
1600 125 : cancel: CancellationToken,
1601 125 : ) -> Result<Response<Body>, ApiError> {
1602 125 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1603 125 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1604 125 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1605 :
1606 125 : let mut flags = EnumSet::empty();
1607 125 : if Some(true) == parse_query_param::<_, bool>(&request, "force_repartition")? {
1608 2 : flags |= CompactFlags::ForceRepartition;
1609 123 : }
1610 125 : async {
1611 125 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
1612 125 : let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?;
1613 125 : timeline
1614 125 : .compact(&cancel, flags, &ctx)
1615 19377 : .await
1616 125 : .map_err(|e| ApiError::InternalServerError(e.into()))?;
1617 125 : json_response(StatusCode::OK, ())
1618 125 : }
1619 125 : .instrument(info_span!("manual_compaction", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
1620 19377 : .await
1621 125 : }
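
The `force_repartition` query parameter above is folded into an `EnumSet` of compaction flags. A small self-contained sketch of that enumset pattern, with a hypothetical flag type standing in for `CompactFlags`:

    use enumset::{EnumSet, EnumSetType};

    // Hypothetical flag enum; the real one is crate::tenant::timeline::CompactFlags.
    #[derive(EnumSetType, Debug)]
    enum Flag {
        ForceRepartition,
        Verbose,
    }

    fn main() {
        let force_repartition = Some(true); // pretend this came from parse_query_param
        let mut flags = EnumSet::empty();
        if force_repartition == Some(true) {
            flags |= Flag::ForceRepartition;
        }
        assert!(flags.contains(Flag::ForceRepartition));
        assert!(!flags.contains(Flag::Verbose));
    }
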
1622 :
1623 : // Run checkpoint immediately on given timeline.
1624 663 : async fn timeline_checkpoint_handler(
1625 663 : request: Request<Body>,
1626 663 : cancel: CancellationToken,
1627 663 : ) -> Result<Response<Body>, ApiError> {
1628 663 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1629 663 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1630 663 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1631 :
1632 663 : let mut flags = EnumSet::empty();
1633 663 : if Some(true) == parse_query_param::<_, bool>(&request, "force_repartition")? {
1634 2 : flags |= CompactFlags::ForceRepartition;
1635 661 : }
1636 663 : async {
1637 663 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
1638 663 : let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?;
1639 663 : timeline
1640 663 : .freeze_and_flush()
1641 775 : .await
1642 663 : .map_err(ApiError::InternalServerError)?;
1643 663 : timeline
1644 663 : .compact(&cancel, flags, &ctx)
1645 403764 : .await
1646 662 : .map_err(|e| ApiError::InternalServerError(e.into()))?;
1647 :
1648 662 : json_response(StatusCode::OK, ())
1649 662 : }
1650 663 : .instrument(info_span!("manual_checkpoint", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
1651 404539 : .await
1652 662 : }
1653 :
1654 2 : async fn timeline_download_remote_layers_handler_post(
1655 2 : mut request: Request<Body>,
1656 2 : _cancel: CancellationToken,
1657 2 : ) -> Result<Response<Body>, ApiError> {
1658 2 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1659 2 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1660 2 : let body: DownloadRemoteLayersTaskSpawnRequest = json_request(&mut request).await?;
1661 2 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1662 :
1663 2 : let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?;
1664 2 : match timeline.spawn_download_all_remote_layers(body).await {
1665 2 : Ok(st) => json_response(StatusCode::ACCEPTED, st),
1666 0 : Err(st) => json_response(StatusCode::CONFLICT, st),
1667 : }
1668 2 : }
1669 :
1670 49 : async fn timeline_download_remote_layers_handler_get(
1671 49 : request: Request<Body>,
1672 49 : _cancel: CancellationToken,
1673 49 : ) -> Result<Response<Body>, ApiError> {
1674 49 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1675 49 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1676 49 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1677 :
1678 49 : let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?;
1679 49 : let info = timeline
1680 49 : .get_download_all_remote_layers_task_info()
1681 49 : .context("task never started since last pageserver process start")
1682 49 : .map_err(|e| ApiError::NotFound(e.into()))?;
1683 49 : json_response(StatusCode::OK, info)
1684 49 : }
1685 :
1686 20 : async fn deletion_queue_flush(
1687 20 : r: Request<Body>,
1688 20 : cancel: CancellationToken,
1689 20 : ) -> Result<Response<Body>, ApiError> {
1690 20 : let state = get_state(&r);
1691 20 :
1692 20 : if state.remote_storage.is_none() {
1693 : // Nothing to do if remote storage is disabled.
1694 0 : return json_response(StatusCode::OK, ());
1695 20 : }
1696 :
1697 20 : let execute = parse_query_param(&r, "execute")?.unwrap_or(false);
1698 20 :
1699 20 : let flush = async {
1700 20 : if execute {
1701 36 : state.deletion_queue_client.flush_execute().await
1702 : } else {
1703 8 : state.deletion_queue_client.flush().await
1704 : }
1705 20 : }
1706 : // DeletionQueueError's only case is shutting down.
1707 20 : .map_err(|_| ApiError::ShuttingDown);
1708 20 :
1709 64 : tokio::select! {
1710 20 : res = flush => {
1711 20 : res.map(|()| json_response(StatusCode::OK, ()))?
1712 : }
1713 : _ = cancel.cancelled() => {
1714 : Err(ApiError::ShuttingDown)
1715 : }
1716 : }
1717 20 : }
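
The `tokio::select!` above races the flush against the cancellation token so that shutdown wins. A generic sketch of that shape (the helper is illustrative, not part of the codebase):

    use tokio_util::sync::CancellationToken;

    // Run `fut` to completion unless `cancel` fires first.
    async fn run_unless_cancelled<F, T>(fut: F, cancel: &CancellationToken) -> Option<T>
    where
        F: std::future::Future<Output = T>,
    {
        tokio::select! {
            res = fut => Some(res),
            _ = cancel.cancelled() => None, // shutting down
        }
    }

    #[tokio::main]
    async fn main() {
        let cancel = CancellationToken::new();
        let done = run_unless_cancelled(async { 42 }, &cancel).await;
        assert_eq!(done, Some(42));
    }
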
1718 :
1719 : /// Test whether a `GetPage@Lsn` request succeeds; useful for manual debugging.
1720 0 : async fn getpage_at_lsn_handler(
1721 0 : request: Request<Body>,
1722 0 : _cancel: CancellationToken,
1723 0 : ) -> Result<Response<Body>, ApiError> {
1724 0 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1725 0 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1726 0 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1727 :
1728 : struct Key(crate::repository::Key);
1729 :
1730 : impl std::str::FromStr for Key {
1731 : type Err = anyhow::Error;
1732 :
1733 0 : fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
1734 0 : crate::repository::Key::from_hex(s).map(Key)
1735 0 : }
1736 : }
1737 :
1738 0 : let key: Key = parse_query_param(&request, "key")?
1739 0 : .ok_or_else(|| ApiError::BadRequest(anyhow!("missing 'key' query parameter")))?;
1740 0 : let lsn: Lsn = parse_query_param(&request, "lsn")?
1741 0 : .ok_or_else(|| ApiError::BadRequest(anyhow!("missing 'lsn' query parameter")))?;
1742 :
1743 0 : async {
1744 0 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
1745 0 : let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?;
1746 :
1747 0 : let page = timeline.get(key.0, lsn, &ctx).await?;
1748 :
1749 0 : Result::<_, ApiError>::Ok(
1750 0 : Response::builder()
1751 0 : .status(StatusCode::OK)
1752 0 : .header(header::CONTENT_TYPE, "application/octet-stream")
1753 0 : .body(hyper::Body::from(page))
1754 0 : .unwrap(),
1755 0 : )
1756 0 : }
1757 0 : .instrument(info_span!("timeline_get", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
1758 0 : .await
1759 0 : }
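
`parse_query_param` drives parsing through `FromStr`, which is why the local `Key` newtype above only needs a `from_str` impl. A rough standalone sketch of that mechanism (simplified; not the real `utils::http::request` implementation):

    use std::str::FromStr;

    // Find `name` in a raw query string and parse its value as T.
    fn query_param<T: FromStr>(query: &str, name: &str) -> Option<Result<T, T::Err>> {
        query.split('&').find_map(|pair| {
            let (k, v) = pair.split_once('=')?;
            (k == name).then(|| v.parse::<T>())
        })
    }

    fn main() {
        let q = "key=0102&lsn=5";
        let key: Option<Result<String, _>> = query_param(q, "key");
        assert_eq!(key.unwrap().unwrap(), "0102");
        let lsn: Option<Result<u64, _>> = query_param(q, "lsn");
        assert_eq!(lsn.unwrap().unwrap(), 5);
    }
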
1760 :
1761 0 : async fn timeline_collect_keyspace(
1762 0 : request: Request<Body>,
1763 0 : _cancel: CancellationToken,
1764 0 : ) -> Result<Response<Body>, ApiError> {
1765 0 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1766 0 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1767 0 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1768 :
1769 0 : let at_lsn: Option<Lsn> = parse_query_param(&request, "at_lsn")?;
1770 :
1771 0 : async {
1772 0 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
1773 0 : let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?;
1774 0 : let at_lsn = at_lsn.unwrap_or_else(|| timeline.get_last_record_lsn());
1775 0 : let keys = timeline
1776 0 : .collect_keyspace(at_lsn, &ctx)
1777 0 : .await
1778 0 : .map_err(|e| ApiError::InternalServerError(e.into()))?;
1779 :
1780 0 : let res = pageserver_api::models::partitioning::Partitioning { keys, at_lsn };
1781 0 :
1782 0 : json_response(StatusCode::OK, res)
1783 0 : }
1784 0 : .instrument(info_span!("timeline_collect_keyspace", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
1785 0 : .await
1786 0 : }
1787 :
1788 3224 : async fn active_timeline_of_active_tenant(
1789 3224 : tenant_shard_id: TenantShardId,
1790 3224 : timeline_id: TimelineId,
1791 3224 : ) -> Result<Arc<Timeline>, ApiError> {
1792 3224 : let tenant = mgr::get_tenant(tenant_shard_id, true)?;
1793 3224 : tenant
1794 3224 : .get_timeline(timeline_id, true)
1795 3224 : .map_err(|e| ApiError::NotFound(e.into()))
1796 3224 : }
1797 :
1798 0 : async fn always_panic_handler(
1799 0 : req: Request<Body>,
1800 0 : _cancel: CancellationToken,
1801 0 : ) -> Result<Response<Body>, ApiError> {
1802 0 : // Deliberately cause a panic to exercise the panic hook registered via std::panic::set_hook().
1803 0 : // For pageserver, the relevant panic hook is `tracing_panic_hook`, and the `sentry` crate's wrapper around it.
1804 0 : // Use catch_unwind to ensure that neither tokio nor hyper is distracted by our panic.
1805 0 : let query = req.uri().query();
1806 0 : let _ = std::panic::catch_unwind(|| {
1807 0 : panic!("unconditional panic for testing panic hook integration; request query: {query:?}")
1808 0 : });
1809 0 : json_response(StatusCode::NO_CONTENT, ())
1810 0 : }
1811 :
1812 13 : async fn disk_usage_eviction_run(
1813 13 : mut r: Request<Body>,
1814 13 : cancel: CancellationToken,
1815 13 : ) -> Result<Response<Body>, ApiError> {
1816 13 : check_permission(&r, None)?;
1817 :
1818 63 : #[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)]
1819 : struct Config {
1820 : /// How many bytes to evict before reporting that pressure is relieved.
1821 : evict_bytes: u64,
1822 :
1823 : #[serde(default)]
1824 : eviction_order: crate::disk_usage_eviction_task::EvictionOrder,
1825 : }
1826 :
1827 61 : #[derive(Debug, Clone, Copy, serde::Serialize)]
1828 : struct Usage {
1829 : // remains unchanged after instantiation of the struct
1830 : evict_bytes: u64,
1831 : // updated by `add_available_bytes`
1832 : freed_bytes: u64,
1833 : }
1834 :
1835 : impl crate::disk_usage_eviction_task::Usage for Usage {
1836 253 : fn has_pressure(&self) -> bool {
1837 253 : self.evict_bytes > self.freed_bytes
1838 253 : }
1839 :
1840 460 : fn add_available_bytes(&mut self, bytes: u64) {
1841 460 : self.freed_bytes += bytes;
1842 460 : }
1843 : }
1844 :
1845 13 : let config = json_request::<Config>(&mut r).await?;
1846 :
1847 13 : let usage = Usage {
1848 13 : evict_bytes: config.evict_bytes,
1849 13 : freed_bytes: 0,
1850 13 : };
1851 13 :
1852 13 : let state = get_state(&r);
1853 :
1854 13 : let Some(storage) = state.remote_storage.as_ref() else {
1855 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
1856 0 : "remote storage not configured, cannot run eviction iteration"
1857 0 : )));
1858 : };
1859 :
1860 13 : let eviction_state = state.disk_usage_eviction_state.clone();
1861 :
1862 13 : let res = crate::disk_usage_eviction_task::disk_usage_eviction_task_iteration_impl(
1863 13 : &eviction_state,
1864 13 : storage,
1865 13 : usage,
1866 13 : &state.tenant_manager,
1867 13 : config.eviction_order,
1868 13 : &cancel,
1869 13 : )
1870 326 : .await;
1871 :
1872 13 : info!(?res, "disk_usage_eviction_task_iteration_impl finished");
1873 :
1874 13 : let res = res.map_err(ApiError::InternalServerError)?;
1875 :
1876 13 : json_response(StatusCode::OK, res)
1877 13 : }
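
The `Usage` impl above is what lets the eviction iteration decide when to stop: it keeps freeing space while `has_pressure()` holds. A toy model of that control flow (the layer sizes are invented; the real loop lives in `disk_usage_eviction_task`):

    struct Usage {
        evict_bytes: u64,
        freed_bytes: u64,
    }

    impl Usage {
        fn has_pressure(&self) -> bool {
            self.evict_bytes > self.freed_bytes
        }
        fn add_available_bytes(&mut self, bytes: u64) {
            self.freed_bytes += bytes;
        }
    }

    fn main() {
        let mut usage = Usage { evict_bytes: 300, freed_bytes: 0 };
        for layer_size in [128u64, 128, 128, 128] {
            if !usage.has_pressure() {
                break; // pressure relieved, stop evicting
            }
            usage.add_available_bytes(layer_size); // pretend we evicted a layer
        }
        assert_eq!(usage.freed_bytes, 384); // stopped after crossing 300
    }
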
1878 :
1879 7 : async fn secondary_upload_handler(
1880 7 : request: Request<Body>,
1881 7 : _cancel: CancellationToken,
1882 7 : ) -> Result<Response<Body>, ApiError> {
1883 7 : let state = get_state(&request);
1884 7 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1885 7 : state
1886 7 : .secondary_controller
1887 7 : .upload_tenant(tenant_shard_id)
1888 7 : .await
1889 7 : .map_err(ApiError::InternalServerError)?;
1890 :
1891 7 : json_response(StatusCode::OK, ())
1892 7 : }
1893 :
1894 6 : async fn secondary_download_handler(
1895 6 : request: Request<Body>,
1896 6 : _cancel: CancellationToken,
1897 6 : ) -> Result<Response<Body>, ApiError> {
1898 6 : let state = get_state(&request);
1899 6 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1900 6 : state
1901 6 : .secondary_controller
1902 6 : .download_tenant(tenant_shard_id)
1903 6 : .await
1904 6 : .map_err(ApiError::InternalServerError)?;
1905 :
1906 6 : json_response(StatusCode::OK, ())
1907 6 : }
1908 :
1909 0 : async fn handler_404(_: Request<Body>) -> Result<Response<Body>, ApiError> {
1910 0 : json_response(
1911 0 : StatusCode::NOT_FOUND,
1912 0 : HttpErrorBody::from_msg("page not found".to_owned()),
1913 0 : )
1914 0 : }
1915 :
1916 0 : async fn post_tracing_event_handler(
1917 0 : mut r: Request<Body>,
1918 0 : _cancel: CancellationToken,
1919 0 : ) -> Result<Response<Body>, ApiError> {
1920 0 : #[derive(Debug, serde::Deserialize)]
1921 : #[serde(rename_all = "lowercase")]
1922 : enum Level {
1923 : Error,
1924 : Warn,
1925 : Info,
1926 : Debug,
1927 : Trace,
1928 : }
1929 0 : #[derive(Debug, serde::Deserialize)]
1930 : struct Request {
1931 : level: Level,
1932 : message: String,
1933 : }
1934 0 : let body: Request = json_request(&mut r)
1935 0 : .await
1936 0 : .map_err(|_| ApiError::BadRequest(anyhow::anyhow!("invalid JSON body")))?;
1937 :
1938 0 : match body.level {
1939 0 : Level::Error => tracing::error!(?body.message),
1940 0 : Level::Warn => tracing::warn!(?body.message),
1941 0 : Level::Info => tracing::info!(?body.message),
1942 0 : Level::Debug => tracing::debug!(?body.message),
1943 0 : Level::Trace => tracing::trace!(?body.message),
1944 : }
1945 :
1946 0 : json_response(StatusCode::OK, ())
1947 0 : }
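
The request body accepted here is plain JSON such as `{"level": "warn", "message": "..."}`. A standalone round-trip showing why the lowercase names deserialize:

    use serde::Deserialize;

    #[derive(Debug, Deserialize, PartialEq)]
    #[serde(rename_all = "lowercase")]
    enum Level {
        Error,
        Warn,
        Info,
        Debug,
        Trace,
    }

    #[derive(Debug, Deserialize)]
    struct Event {
        level: Level,
        message: String,
    }

    fn main() {
        // `rename_all = "lowercase"` maps "warn" -> Level::Warn, etc.
        let event: Event =
            serde_json::from_str(r#"{"level": "warn", "message": "hello"}"#).unwrap();
        assert_eq!(event.level, Level::Warn);
        assert_eq!(event.message, "hello");
    }
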
1948 :
1949 0 : async fn put_io_engine_handler(
1950 0 : mut r: Request<Body>,
1951 0 : _cancel: CancellationToken,
1952 0 : ) -> Result<Response<Body>, ApiError> {
1953 0 : let kind: crate::virtual_file::IoEngineKind = json_request(&mut r).await?;
1954 0 : crate::virtual_file::io_engine::set(kind);
1955 0 : json_response(StatusCode::OK, ())
1956 0 : }
1957 :
1958 : /// Common functionality of all the HTTP API handlers.
1959 : ///
1960 : /// - Adds a tracing span to each request (by `request_span`)
1961 : /// - Logs the request depending on the request method (by `request_span`)
1962 : /// - Logs the response if it was not successful (by `request_span`)
1963 : /// - Shields the handler function from async cancellations. Hyper can drop the handler
1964 : /// Future if the connection to the client is lost, but most of the pageserver code is
1965 : /// not async cancellation safe. This converts the dropped future into a graceful cancellation
1966 : /// request with a CancellationToken.
1967 9570 : async fn api_handler<R, H>(request: Request<Body>, handler: H) -> Result<Response<Body>, ApiError>
1968 9570 : where
1969 9570 : R: std::future::Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
1970 9570 : H: FnOnce(Request<Body>, CancellationToken) -> R + Send + Sync + 'static,
1971 9570 : {
1972 9570 : // Spawn a new task to handle the request, to protect the handler from unexpected
1973 9570 : // async cancellations. Most pageserver functions are not async cancellation safe.
1974 9570 : // We arm a drop-guard, so that if Hyper drops the Future, we signal the task
1975 9570 : // with the cancellation token.
1976 9570 : let token = CancellationToken::new();
1977 9570 : let cancel_guard = token.clone().drop_guard();
1978 9570 : let result = request_span(request, move |r| async {
1979 9570 : let handle = tokio::spawn(
1980 9570 : async {
1981 9570 : let token_cloned = token.clone();
1982 8118448 : let result = handler(r, token).await;
1983 9563 : if token_cloned.is_cancelled() {
1984 : // dropguard has executed: we will never turn this result into response.
1985 : //
1986 : // at least temporarily do {:?} logging; these failures are rare enough but
1987 : // could hide difficult errors.
1988 2 : match &result {
1989 2 : Ok(response) => {
1990 2 : let status = response.status();
1991 2 : info!(%status, "Cancelled request finished successfully")
1992 : }
1993 0 : Err(e) => error!("Cancelled request finished with an error: {e:?}"),
1994 : }
1995 9561 : }
1996 : // the only logging for cancelled, panicked request handlers is the tracing_panic_hook,
1997 : // which should suffice.
1998 : //
1999 : // there is still a chance to lose the result due to a race between
2000 : // returning from here and the actual connection closing happening
2001 : // before the outer task gets to execute; leaving that up to #5815.
2002 9563 : result
2003 9570 : }
2004 9570 : .in_current_span(),
2005 9570 : );
2006 9570 :
2007 9658 : match handle.await {
2008 : // TODO: never actually return Err from here, always Ok(...) so that we can log
2009 : // spanned errors. Call api_error_handler instead and return appropriate Body.
2010 9561 : Ok(result) => result,
2011 0 : Err(e) => {
2012 0 : // The handler task panicked. We have a global panic handler that logs the
2013 0 : // panic with its backtrace, so no need to log that here. Only log a brief
2014 0 : // message to make it clear that we returned the error to the client.
2015 0 : error!("HTTP request handler task panicked: {e:#}");
2016 :
2017 : // Don't return an Error here, because then fallback error handler that was
2018 : // installed in make_router() will print the error. Instead, construct the
2019 : // HTTP error response and return that.
2020 0 : Ok(
2021 0 : ApiError::InternalServerError(anyhow!("HTTP request handler task panicked"))
2022 0 : .into_response(),
2023 0 : )
2024 : }
2025 : }
2026 19131 : })
2027 9658 : .await;
2028 :
2029 9561 : cancel_guard.disarm();
2030 9561 :
2031 9561 : result
2032 9561 : }
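
A condensed sketch of the shielding pattern implemented above, reduced to its essentials (no request span, no panic-to-response conversion):

    use tokio_util::sync::CancellationToken;

    async fn shielded<F, T>(handler: impl FnOnce(CancellationToken) -> F) -> T
    where
        F: std::future::Future<Output = T> + Send + 'static,
        T: Send + 'static,
    {
        let token = CancellationToken::new();
        // If this future is dropped before `disarm`, the guard trips the token,
        // turning an abrupt drop into a graceful cancellation request.
        let cancel_guard = token.clone().drop_guard();
        // The handler runs on its own task, so dropping us does not drop it.
        let task = tokio::spawn(handler(token));
        let result = task.await.expect("handler task panicked");
        cancel_guard.disarm(); // finished normally: no cancellation signal
        result
    }

    #[tokio::main]
    async fn main() {
        let n = shielded(|_cancel| async { 7 }).await;
        assert_eq!(n, 7);
    }
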
2033 :
2034 : /// Like api_handler, but returns an error response if the server is built without
2035 : /// the 'testing' feature.
2036 969 : async fn testing_api_handler<R, H>(
2037 969 : desc: &str,
2038 969 : request: Request<Body>,
2039 969 : handler: H,
2040 969 : ) -> Result<Response<Body>, ApiError>
2041 969 : where
2042 969 : R: std::future::Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
2043 969 : H: FnOnce(Request<Body>, CancellationToken) -> R + Send + Sync + 'static,
2044 969 : {
2045 969 : if cfg!(feature = "testing") {
2046 969 : api_handler(request, handler).await
2047 : } else {
2048 0 : std::future::ready(Err(ApiError::BadRequest(anyhow!(
2049 0 : "Cannot {desc} because pageserver was compiled without testing APIs",
2050 0 : ))))
2051 0 : .await
2052 : }
2053 968 : }
2054 :
2055 624 : pub fn make_router(
2056 624 : state: Arc<State>,
2057 624 : launch_ts: &'static LaunchTimestamp,
2058 624 : auth: Option<Arc<SwappableJwtAuth>>,
2059 624 : ) -> anyhow::Result<RouterBuilder<hyper::Body, ApiError>> {
2060 624 : let spec = include_bytes!("openapi_spec.yml");
2061 624 : let mut router = attach_openapi_ui(endpoint::make_router(), spec, "/swagger.yml", "/v1/doc");
2062 624 : if auth.is_some() {
2063 118 : router = router.middleware(auth_middleware(|request| {
2064 118 : let state = get_state(request);
2065 118 : if state.allowlist_routes.contains(request.uri()) {
2066 34 : None
2067 : } else {
2068 84 : state.auth.as_deref()
2069 : }
2070 118 : }))
2071 613 : }
2072 :
2073 624 : router = router.middleware(
2074 624 : endpoint::add_response_header_middleware(
2075 624 : "PAGESERVER_LAUNCH_TIMESTAMP",
2076 624 : &launch_ts.to_string(),
2077 624 : )
2078 624 : .expect("construct launch timestamp header middleware"),
2079 624 : );
2080 624 :
2081 624 : Ok(router
2082 624 : .data(state)
2083 630 : .get("/v1/status", |r| api_handler(r, status_handler))
2084 624 : .put("/v1/failpoints", |r| {
2085 179 : testing_api_handler("manage failpoints", r, failpoints_handler)
2086 624 : })
2087 624 : .post("/v1/reload_auth_validation_keys", |r| {
2088 6 : api_handler(r, reload_auth_validation_keys_handler)
2089 624 : })
2090 624 : .get("/v1/tenant", |r| api_handler(r, tenant_list_handler))
2091 624 : .post("/v1/tenant", |r| api_handler(r, tenant_create_handler))
2092 624 : .get("/v1/tenant/:tenant_shard_id", |r| {
2093 542 : api_handler(r, tenant_status)
2094 624 : })
2095 624 : .delete("/v1/tenant/:tenant_shard_id", |r| {
2096 121 : api_handler(r, tenant_delete_handler)
2097 624 : })
2098 624 : .get("/v1/tenant/:tenant_shard_id/synthetic_size", |r| {
2099 55 : api_handler(r, tenant_size_handler)
2100 624 : })
2101 624 : .put("/v1/tenant/config", |r| {
2102 30 : api_handler(r, update_tenant_config_handler)
2103 624 : })
2104 624 : .put("/v1/tenant/:tenant_shard_id/shard_split", |r| {
2105 5 : api_handler(r, tenant_shard_split_handler)
2106 624 : })
2107 624 : .get("/v1/tenant/:tenant_shard_id/config", |r| {
2108 45 : api_handler(r, get_tenant_config_handler)
2109 624 : })
2110 685 : .put("/v1/tenant/:tenant_shard_id/location_config", |r| {
2111 685 : api_handler(r, put_tenant_location_config_handler)
2112 685 : })
2113 624 : .get("/v1/location_config", |r| {
2114 6 : api_handler(r, list_location_config_handler)
2115 624 : })
2116 624 : .put(
2117 624 : "/v1/tenant/:tenant_shard_id/time_travel_remote_storage",
2118 624 : |r| api_handler(r, tenant_time_travel_remote_storage_handler),
2119 624 : )
2120 624 : .get("/v1/tenant/:tenant_shard_id/timeline", |r| {
2121 64 : api_handler(r, timeline_list_handler)
2122 624 : })
2123 900 : .post("/v1/tenant/:tenant_shard_id/timeline", |r| {
2124 900 : api_handler(r, timeline_create_handler)
2125 900 : })
2126 624 : .post("/v1/tenant/:tenant_id/attach", |r| {
2127 41 : api_handler(r, tenant_attach_handler)
2128 624 : })
2129 624 : .post("/v1/tenant/:tenant_id/detach", |r| {
2130 38 : api_handler(r, tenant_detach_handler)
2131 624 : })
2132 624 : .post("/v1/tenant/:tenant_shard_id/reset", |r| {
2133 2 : api_handler(r, tenant_reset_handler)
2134 624 : })
2135 624 : .post("/v1/tenant/:tenant_id/load", |r| {
2136 4 : api_handler(r, tenant_load_handler)
2137 624 : })
2138 624 : .post("/v1/tenant/:tenant_id/ignore", |r| {
2139 5 : api_handler(r, tenant_ignore_handler)
2140 624 : })
2141 624 : .post(
2142 624 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/preserve_initdb_archive",
2143 624 : |r| api_handler(r, timeline_preserve_initdb_handler),
2144 624 : )
2145 2179 : .get("/v1/tenant/:tenant_shard_id/timeline/:timeline_id", |r| {
2146 2179 : api_handler(r, timeline_detail_handler)
2147 2179 : })
2148 624 : .get(
2149 624 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/get_lsn_by_timestamp",
2150 624 : |r| api_handler(r, get_lsn_by_timestamp_handler),
2151 624 : )
2152 624 : .get(
2153 624 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/get_timestamp_of_lsn",
2154 624 : |r| api_handler(r, get_timestamp_of_lsn_handler),
2155 624 : )
2156 624 : .put(
2157 624 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/do_gc",
2158 624 : |r| api_handler(r, timeline_gc_handler),
2159 624 : )
2160 624 : .put(
2161 624 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/compact",
2162 624 : |r| testing_api_handler("run timeline compaction", r, timeline_compact_handler),
2163 624 : )
2164 624 : .put(
2165 624 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/checkpoint",
2166 663 : |r| testing_api_handler("run timeline checkpoint", r, timeline_checkpoint_handler),
2167 624 : )
2168 624 : .post(
2169 624 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/download_remote_layers",
2170 624 : |r| api_handler(r, timeline_download_remote_layers_handler_post),
2171 624 : )
2172 624 : .get(
2173 624 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/download_remote_layers",
2174 624 : |r| api_handler(r, timeline_download_remote_layers_handler_get),
2175 624 : )
2176 624 : .delete("/v1/tenant/:tenant_shard_id/timeline/:timeline_id", |r| {
2177 65 : api_handler(r, timeline_delete_handler)
2178 624 : })
2179 624 : .get(
2180 624 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/layer",
2181 624 : |r| api_handler(r, layer_map_info_handler),
2182 624 : )
2183 624 : .get(
2184 624 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/layer/:layer_file_name",
2185 624 : |r| api_handler(r, layer_download_handler),
2186 624 : )
2187 624 : .delete(
2188 624 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/layer/:layer_file_name",
2189 2242 : |r| api_handler(r, evict_timeline_layer_handler),
2190 624 : )
2191 624 : .post("/v1/tenant/:tenant_shard_id/heatmap_upload", |r| {
2192 7 : api_handler(r, secondary_upload_handler)
2193 624 : })
2194 624 : .put("/v1/disk_usage_eviction/run", |r| {
2195 13 : api_handler(r, disk_usage_eviction_run)
2196 624 : })
2197 624 : .put("/v1/deletion_queue/flush", |r| {
2198 20 : api_handler(r, deletion_queue_flush)
2199 624 : })
2200 624 : .post("/v1/tenant/:tenant_shard_id/secondary/download", |r| {
2201 6 : api_handler(r, secondary_download_handler)
2202 624 : })
2203 624 : .put("/v1/tenant/:tenant_shard_id/break", |r| {
2204 2 : testing_api_handler("set tenant state to broken", r, handle_tenant_break)
2205 624 : })
2206 624 : .get("/v1/panic", |r| api_handler(r, always_panic_handler))
2207 624 : .post("/v1/tracing/event", |r| {
2208 0 : testing_api_handler("emit a tracing event", r, post_tracing_event_handler)
2209 624 : })
2210 624 : .get(
2211 624 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/getpage",
2212 624 : |r| testing_api_handler("getpage@lsn", r, getpage_at_lsn_handler),
2213 624 : )
2214 624 : .get(
2215 624 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/keyspace",
2216 624 : |r| testing_api_handler("read out the keyspace", r, timeline_collect_keyspace),
2217 624 : )
2218 624 : .put("/v1/io_engine", |r| api_handler(r, put_io_engine_handler))
2219 624 : .any(handler_404))
2220 624 : }