1 : //!
2 : //! Management HTTP API
3 : //!
4 : use std::collections::HashMap;
5 : use std::str::FromStr;
6 : use std::sync::Arc;
7 : use std::time::Duration;
8 :
9 : use anyhow::{anyhow, Context, Result};
10 : use enumset::EnumSet;
11 : use futures::TryFutureExt;
12 : use humantime::format_rfc3339;
13 : use hyper::header;
14 : use hyper::StatusCode;
15 : use hyper::{Body, Request, Response, Uri};
16 : use metrics::launch_timestamp::LaunchTimestamp;
17 : use pageserver_api::models::LocationConfigListResponse;
18 : use pageserver_api::models::ShardParameters;
19 : use pageserver_api::models::TenantDetails;
20 : use pageserver_api::models::TenantLocationConfigResponse;
21 : use pageserver_api::models::TenantShardLocation;
22 : use pageserver_api::models::TenantState;
23 : use pageserver_api::models::{
24 : DownloadRemoteLayersTaskSpawnRequest, LocationConfigMode, TenantAttachRequest,
25 : TenantLoadRequest, TenantLocationConfigRequest,
26 : };
27 : use pageserver_api::shard::TenantShardId;
28 : use remote_storage::GenericRemoteStorage;
29 : use remote_storage::TimeTravelError;
30 : use tenant_size_model::{SizeResult, StorageModel};
31 : use tokio_util::sync::CancellationToken;
32 : use tracing::*;
33 : use utils::auth::JwtAuth;
34 : use utils::failpoint_support::failpoints_handler;
35 : use utils::http::endpoint::request_span;
36 : use utils::http::json::json_request_or_empty_body;
37 : use utils::http::request::{get_request_param, must_get_query_param, parse_query_param};
38 :
39 : use crate::context::{DownloadBehavior, RequestContext};
40 : use crate::deletion_queue::DeletionQueueClient;
41 : use crate::metrics::{StorageTimeOperation, STORAGE_TIME_GLOBAL};
42 : use crate::pgdatadir_mapping::LsnForTimestamp;
43 : use crate::task_mgr::TaskKind;
44 : use crate::tenant::config::{LocationConf, TenantConfOpt};
45 : use crate::tenant::mgr::GetActiveTenantError;
46 : use crate::tenant::mgr::{
47 : GetTenantError, SetNewTenantConfigError, TenantManager, TenantMapError, TenantMapInsertError,
48 : TenantSlotError, TenantSlotUpsertError, TenantStateError,
49 : };
50 : use crate::tenant::mgr::{TenantSlot, UpsertLocationError};
51 : use crate::tenant::remote_timeline_client;
52 : use crate::tenant::secondary::SecondaryController;
53 : use crate::tenant::size::ModelInputs;
54 : use crate::tenant::storage_layer::LayerAccessStatsReset;
55 : use crate::tenant::timeline::CompactFlags;
56 : use crate::tenant::timeline::Timeline;
57 : use crate::tenant::SpawnMode;
58 : use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError};
59 : use crate::{config::PageServerConf, tenant::mgr};
60 : use crate::{disk_usage_eviction_task, tenant};
61 : use pageserver_api::models::{
62 : StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TenantInfo,
63 : TimelineCreateRequest, TimelineGcRequest, TimelineInfo,
64 : };
65 : use utils::{
66 : auth::SwappableJwtAuth,
67 : generation::Generation,
68 : http::{
69 : endpoint::{self, attach_openapi_ui, auth_middleware, check_permission_with},
70 : error::{ApiError, HttpErrorBody},
71 : json::{json_request, json_response},
72 : request::parse_request_param,
73 : RequestExt, RouterBuilder,
74 : },
75 : id::{TenantId, TimelineId},
76 : lsn::Lsn,
77 : };
78 :
79 : // For APIs that require an Active tenant, how long should we block waiting for that state?
80 : // This is not functionally necessary (clients will retry), but avoids generating a lot of
81 : // failed API calls while tenants are activating.
82 : #[cfg(not(feature = "testing"))]
83 : const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(5000);
84 :
85 : // Tests run on slow/oversubscribed nodes, and may need to wait much longer for tenants to
86 : // finish attaching, if calls to remote storage are slow.
87 : #[cfg(feature = "testing")]
88 : const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
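// Editor's note: a sketch (not in the original source) of the call pattern
// these timeouts exist for, as used verbatim by handlers below:
//
//     let tenant = state.tenant_manager.get_attached_tenant_shard(tenant_shard_id, false)?;
//     tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
//
// If the wait fails, the `From<GetActiveTenantError> for ApiError` impl below
// turns the error into a retryable response instead of a hard failure.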
89 :
90 : pub struct State {
91 : conf: &'static PageServerConf,
92 : tenant_manager: Arc<TenantManager>,
93 : auth: Option<Arc<SwappableJwtAuth>>,
94 : allowlist_routes: Vec<Uri>,
95 : remote_storage: Option<GenericRemoteStorage>,
96 : broker_client: storage_broker::BrokerClientChannel,
97 : disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
98 : deletion_queue_client: DeletionQueueClient,
99 : secondary_controller: SecondaryController,
100 : }
101 :
102 : impl State {
103 : #[allow(clippy::too_many_arguments)]
104 604 : pub fn new(
105 604 : conf: &'static PageServerConf,
106 604 : tenant_manager: Arc<TenantManager>,
107 604 : auth: Option<Arc<SwappableJwtAuth>>,
108 604 : remote_storage: Option<GenericRemoteStorage>,
109 604 : broker_client: storage_broker::BrokerClientChannel,
110 604 : disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
111 604 : deletion_queue_client: DeletionQueueClient,
112 604 : secondary_controller: SecondaryController,
113 604 : ) -> anyhow::Result<Self> {
114 604 : let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml", "/metrics"]
115 604 : .iter()
116 2416 : .map(|v| v.parse().unwrap())
117 604 : .collect::<Vec<_>>();
118 604 : Ok(Self {
119 604 : conf,
120 604 : tenant_manager,
121 604 : auth,
122 604 : allowlist_routes,
123 604 : remote_storage,
124 604 : broker_client,
125 604 : disk_usage_eviction_state,
126 604 : deletion_queue_client,
127 604 : secondary_controller,
128 604 : })
129 604 : }
130 : }
131 :
132 : #[inline(always)]
133 2778 : fn get_state(request: &Request<Body>) -> &State {
134 2778 : request
135 2778 : .data::<Arc<State>>()
136 2778 : .expect("unknown state type")
137 2778 : .as_ref()
138 2778 : }
139 :
140 : #[inline(always)]
141 616 : fn get_config(request: &Request<Body>) -> &'static PageServerConf {
142 616 : get_state(request).conf
143 616 : }
144 :
145 : /// Check that the requester is authorized to operate on the given tenant
146 8957 : fn check_permission(request: &Request<Body>, tenant_id: Option<TenantId>) -> Result<(), ApiError> {
147 8957 : check_permission_with(request, |claims| {
148 83 : crate::auth::check_permission(claims, tenant_id)
149 8957 : })
150 8957 : }
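// Editor's note: handlers below call this before doing any work, e.g.:
//
//     let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
//     check_permission(&request, Some(tenant_shard_id.tenant_id))?;
//
// When auth is disabled there are no claims to check; otherwise the claims
// must cover the given tenant (or be pageserver-wide when `None` is passed).
// This is a reading of `crate::auth::check_permission`, not a new contract.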
151 :
152 : impl From<PageReconstructError> for ApiError {
153 2 : fn from(pre: PageReconstructError) -> ApiError {
154 2 : match pre {
155 2 : PageReconstructError::Other(pre) => ApiError::InternalServerError(pre),
156 : PageReconstructError::Cancelled => {
157 0 : ApiError::InternalServerError(anyhow::anyhow!("request was cancelled"))
158 : }
159 : PageReconstructError::AncestorStopping(_) => {
160 0 : ApiError::ResourceUnavailable(format!("{pre}").into())
161 : }
162 0 : PageReconstructError::AncestorLsnTimeout(e) => ApiError::Timeout(format!("{e}").into()),
163 0 : PageReconstructError::WalRedo(pre) => ApiError::InternalServerError(pre),
164 : }
165 2 : }
166 : }
167 :
168 : impl From<TenantMapInsertError> for ApiError {
169 1 : fn from(tmie: TenantMapInsertError) -> ApiError {
170 1 : match tmie {
171 1 : TenantMapInsertError::SlotError(e) => e.into(),
172 0 : TenantMapInsertError::SlotUpsertError(e) => e.into(),
173 0 : TenantMapInsertError::Other(e) => ApiError::InternalServerError(e),
174 : }
175 1 : }
176 : }
177 :
178 : impl From<TenantSlotError> for ApiError {
179 20 : fn from(e: TenantSlotError) -> ApiError {
180 20 : use TenantSlotError::*;
181 20 : match e {
182 17 : NotFound(tenant_id) => {
183 17 : ApiError::NotFound(anyhow::anyhow!("NotFound: tenant {tenant_id}").into())
184 : }
185 1 : e @ AlreadyExists(_, _) => ApiError::Conflict(format!("{e}")),
186 : InProgress => {
187 2 : ApiError::ResourceUnavailable("Tenant is being modified concurrently".into())
188 : }
189 0 : MapState(e) => e.into(),
190 : }
191 20 : }
192 : }
193 :
194 : impl From<TenantSlotUpsertError> for ApiError {
195 0 : fn from(e: TenantSlotUpsertError) -> ApiError {
196 0 : use TenantSlotUpsertError::*;
197 0 : match e {
198 0 : InternalError(e) => ApiError::InternalServerError(anyhow::anyhow!("{e}")),
199 0 : MapState(e) => e.into(),
200 0 : ShuttingDown(_) => ApiError::ShuttingDown,
201 : }
202 0 : }
203 : }
204 :
205 : impl From<UpsertLocationError> for ApiError {
206 76 : fn from(e: UpsertLocationError) -> ApiError {
207 76 : use UpsertLocationError::*;
208 76 : match e {
209 32 : BadRequest(e) => ApiError::BadRequest(e),
210 25 : Unavailable(_) => ApiError::ShuttingDown,
211 18 : e @ InProgress => ApiError::Conflict(format!("{e}")),
212 1 : Flush(e) | Other(e) => ApiError::InternalServerError(e),
213 : }
214 76 : }
215 : }
216 :
217 : impl From<TenantMapError> for ApiError {
218 0 : fn from(e: TenantMapError) -> ApiError {
219 0 : use TenantMapError::*;
220 0 : match e {
221 : StillInitializing | ShuttingDown => {
222 0 : ApiError::ResourceUnavailable(format!("{e}").into())
223 0 : }
224 0 : }
225 0 : }
226 : }
227 :
228 : impl From<TenantStateError> for ApiError {
229 3 : fn from(tse: TenantStateError) -> ApiError {
230 3 : match tse {
231 : TenantStateError::IsStopping(_) => {
232 0 : ApiError::ResourceUnavailable("Tenant is stopping".into())
233 : }
234 3 : TenantStateError::SlotError(e) => e.into(),
235 0 : TenantStateError::SlotUpsertError(e) => e.into(),
236 0 : TenantStateError::Other(e) => ApiError::InternalServerError(anyhow!(e)),
237 : }
238 3 : }
239 : }
240 :
241 : impl From<GetTenantError> for ApiError {
242 75 : fn from(tse: GetTenantError) -> ApiError {
243 75 : match tse {
244 68 : GetTenantError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid).into()),
245 0 : GetTenantError::Broken(reason) => {
246 0 : ApiError::InternalServerError(anyhow!("tenant is broken: {}", reason))
247 : }
248 : GetTenantError::NotActive(_) => {
249 : // Why is this not `ApiError::NotFound`?
250 : // Because we must be careful to never return 404 for a tenant if it does
251 : // in fact exist locally. If we did, the caller could draw the conclusion
252 : // that it can attach the tenant to another PS and we'd be in split-brain.
253 : //
254 : // (We can produce this variant only in `mgr::get_tenant(..., active=true)` calls).
255 7 : ApiError::ResourceUnavailable("Tenant not yet active".into())
256 : }
257 0 : GetTenantError::MapState(e) => ApiError::ResourceUnavailable(format!("{e}").into()),
258 : }
259 75 : }
260 : }
261 :
262 : impl From<GetActiveTenantError> for ApiError {
263 5 : fn from(e: GetActiveTenantError) -> ApiError {
264 5 : match e {
265 1 : GetActiveTenantError::WillNotBecomeActive(_) => ApiError::Conflict(format!("{}", e)),
266 4 : GetActiveTenantError::Cancelled => ApiError::ShuttingDown,
267 0 : GetActiveTenantError::NotFound(gte) => gte.into(),
268 : GetActiveTenantError::WaitForActiveTimeout { .. } => {
269 0 : ApiError::ResourceUnavailable(format!("{}", e).into())
270 : }
271 : }
272 5 : }
273 : }
274 :
275 : impl From<SetNewTenantConfigError> for ApiError {
276 0 : fn from(e: SetNewTenantConfigError) -> ApiError {
277 0 : match e {
278 0 : SetNewTenantConfigError::GetTenant(tid) => {
279 0 : ApiError::NotFound(anyhow!("tenant {}", tid).into())
280 : }
281 0 : e @ (SetNewTenantConfigError::Persist(_) | SetNewTenantConfigError::Other(_)) => {
282 0 : ApiError::InternalServerError(anyhow::Error::new(e))
283 : }
284 : }
285 0 : }
286 : }
287 :
288 : impl From<crate::tenant::DeleteTimelineError> for ApiError {
289 11 : fn from(value: crate::tenant::DeleteTimelineError) -> Self {
290 11 : use crate::tenant::DeleteTimelineError::*;
291 11 : match value {
292 3 : NotFound => ApiError::NotFound(anyhow::anyhow!("timeline not found").into()),
293 1 : HasChildren(children) => ApiError::PreconditionFailed(
294 1 : format!("Cannot delete timeline which has child timelines: {children:?}")
295 1 : .into_boxed_str(),
296 1 : ),
297 3 : a @ AlreadyInProgress(_) => ApiError::Conflict(a.to_string()),
298 4 : Other(e) => ApiError::InternalServerError(e),
299 : }
300 11 : }
301 : }
302 :
303 : impl From<crate::tenant::mgr::DeleteTimelineError> for ApiError {
304 0 : fn from(value: crate::tenant::mgr::DeleteTimelineError) -> Self {
305 : use crate::tenant::mgr::DeleteTimelineError::*;
306 0 : match value {
307 : // Report Precondition Failed so the client can distinguish the
308 : // "tenant is missing" case from the "timeline is missing" case
309 0 : Tenant(GetTenantError::NotFound(..)) => ApiError::PreconditionFailed(
310 0 : "Requested tenant is missing".to_owned().into_boxed_str(),
311 0 : ),
312 0 : Tenant(t) => ApiError::from(t),
313 0 : Timeline(t) => ApiError::from(t),
314 : }
315 0 : }
316 : }
317 :
318 : impl From<crate::tenant::delete::DeleteTenantError> for ApiError {
319 34 : fn from(value: crate::tenant::delete::DeleteTenantError) -> Self {
320 34 : use crate::tenant::delete::DeleteTenantError::*;
321 34 : match value {
322 0 : Get(g) => ApiError::from(g),
323 0 : e @ AlreadyInProgress => ApiError::Conflict(e.to_string()),
324 0 : Timeline(t) => ApiError::from(t),
325 0 : NotAttached => ApiError::NotFound(anyhow::anyhow!("Tenant is not attached").into()),
326 16 : SlotError(e) => e.into(),
327 0 : SlotUpsertError(e) => e.into(),
328 16 : Other(o) => ApiError::InternalServerError(o),
329 2 : e @ InvalidState(_) => ApiError::PreconditionFailed(e.to_string().into_boxed_str()),
330 0 : Cancelled => ApiError::ShuttingDown,
331 : }
332 34 : }
333 : }
334 :
335 : // Helper function to construct a TimelineInfo struct for a timeline
336 2176 : async fn build_timeline_info(
337 2176 : timeline: &Arc<Timeline>,
338 2176 : include_non_incremental_logical_size: bool,
339 2176 : force_await_initial_logical_size: bool,
340 2176 : ctx: &RequestContext,
341 2176 : ) -> anyhow::Result<TimelineInfo> {
342 2176 : crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
343 2176 :
344 2176 : if force_await_initial_logical_size {
345 11 : timeline.clone().await_initial_logical_size().await
346 2165 : }
347 :
348 2176 : let mut info = build_timeline_info_common(
349 2176 : timeline,
350 2176 : ctx,
351 2176 : tenant::timeline::GetLogicalSizePriority::Background,
352 2176 : )
353 11 : .await?;
354 2176 : if include_non_incremental_logical_size {
355 : // XXX we should be using spawn_ondemand_logical_size_calculation here.
356 : // Otherwise, if someone deletes the timeline / detaches the tenant while
357 : // we're executing this function, we will outlive the timeline on-disk state.
358 : info.current_logical_size_non_incremental = Some(
359 37 : timeline
360 37 : .get_current_logical_size_non_incremental(info.last_record_lsn, ctx)
361 2798 : .await?,
362 : );
363 2139 : }
364 2176 : Ok(info)
365 2176 : }
366 :
367 3042 : async fn build_timeline_info_common(
368 3042 : timeline: &Arc<Timeline>,
369 3042 : ctx: &RequestContext,
370 3042 : logical_size_task_priority: tenant::timeline::GetLogicalSizePriority,
371 3042 : ) -> anyhow::Result<TimelineInfo> {
372 3042 : crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
373 3042 : let initdb_lsn = timeline.initdb_lsn;
374 3042 : let last_record_lsn = timeline.get_last_record_lsn();
375 3042 : let (wal_source_connstr, last_received_msg_lsn, last_received_msg_ts) = {
376 3042 : let guard = timeline.last_received_wal.lock().unwrap();
377 3042 : if let Some(info) = guard.as_ref() {
378 1829 : (
379 1829 : Some(format!("{:?}", info.wal_source_connconf)), // Password is hidden, but it's for statistics only.
380 1829 : Some(info.last_received_msg_lsn),
381 1829 : Some(info.last_received_msg_ts),
382 1829 : )
383 : } else {
384 1213 : (None, None, None)
385 : }
386 : };
387 :
388 3042 : let ancestor_timeline_id = timeline.get_ancestor_timeline_id();
389 3042 : let ancestor_lsn = match timeline.get_ancestor_lsn() {
390 2305 : Lsn(0) => None,
391 737 : lsn @ Lsn(_) => Some(lsn),
392 : };
393 3042 : let current_logical_size = timeline.get_current_logical_size(logical_size_task_priority, ctx);
394 3042 : let current_physical_size = Some(timeline.layer_size_sum().await);
395 3042 : let state = timeline.current_state();
396 3042 : let remote_consistent_lsn_projected = timeline
397 3042 : .get_remote_consistent_lsn_projected()
398 3042 : .unwrap_or(Lsn(0));
399 3042 : let remote_consistent_lsn_visible = timeline
400 3042 : .get_remote_consistent_lsn_visible()
401 3042 : .unwrap_or(Lsn(0));
402 3042 :
403 3042 : let walreceiver_status = timeline.walreceiver_status();
404 :
405 3042 : let info = TimelineInfo {
406 3042 : tenant_id: timeline.tenant_shard_id,
407 3042 : timeline_id: timeline.timeline_id,
408 3042 : ancestor_timeline_id,
409 3042 : ancestor_lsn,
410 3042 : disk_consistent_lsn: timeline.get_disk_consistent_lsn(),
411 3042 : remote_consistent_lsn: remote_consistent_lsn_projected,
412 3042 : remote_consistent_lsn_visible,
413 3042 : initdb_lsn,
414 3042 : last_record_lsn,
415 3042 : prev_record_lsn: Some(timeline.get_prev_record_lsn()),
416 3042 : latest_gc_cutoff_lsn: *timeline.get_latest_gc_cutoff_lsn(),
417 3042 : current_logical_size: current_logical_size.size_dont_care_about_accuracy(),
418 3042 : current_logical_size_is_accurate: match current_logical_size.accuracy() {
419 504 : tenant::timeline::logical_size::Accuracy::Approximate => false,
420 2538 : tenant::timeline::logical_size::Accuracy::Exact => true,
421 : },
422 3042 : current_physical_size,
423 3042 : current_logical_size_non_incremental: None,
424 3042 : timeline_dir_layer_file_size_sum: None,
425 3042 : wal_source_connstr,
426 3042 : last_received_msg_lsn,
427 3042 : last_received_msg_ts,
428 3042 : pg_version: timeline.pg_version,
429 3042 :
430 3042 : state,
431 3042 :
432 3042 : walreceiver_status,
433 3042 : };
434 3042 : Ok(info)
435 3042 : }
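// Editor's note: an illustrative, trimmed rendering of the `TimelineInfo`
// built above (field names from the struct literal; values made up):
//
//     {
//       "tenant_id": "<tenant shard id>",
//       "timeline_id": "<timeline id>",
//       "last_record_lsn": "0/169AD58",
//       "remote_consistent_lsn": "0/169AD58",
//       "current_logical_size": 41943040,
//       "current_logical_size_is_accurate": true,
//       "state": "Active",
//       ...
//     }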
436 :
437 : // healthcheck handler
438 610 : async fn status_handler(
439 610 : request: Request<Body>,
440 610 : _cancel: CancellationToken,
441 610 : ) -> Result<Response<Body>, ApiError> {
442 610 : check_permission(&request, None)?;
443 610 : let config = get_config(&request);
444 610 : json_response(StatusCode::OK, StatusResponse { id: config.id })
445 610 : }
446 :
447 6 : async fn reload_auth_validation_keys_handler(
448 6 : request: Request<Body>,
449 6 : _cancel: CancellationToken,
450 6 : ) -> Result<Response<Body>, ApiError> {
451 6 : check_permission(&request, None)?;
452 6 : let config = get_config(&request);
453 6 : let state = get_state(&request);
454 6 : let Some(shared_auth) = &state.auth else {
455 0 : return json_response(StatusCode::BAD_REQUEST, ());
456 : };
457 : // unwrap is ok because the check is performed when creating the config, so the path is set and exists
458 6 : let key_path = config.auth_validation_public_key_path.as_ref().unwrap();
459 6 : info!("Reloading public key(s) for verifying JWT tokens from {key_path:?}");
460 :
461 6 : match JwtAuth::from_key_path(key_path) {
462 6 : Ok(new_auth) => {
463 6 : shared_auth.swap(new_auth);
464 6 : json_response(StatusCode::OK, ())
465 : }
466 0 : Err(e) => {
467 0 : warn!("Error reloading public keys from {key_path:?}: {e:}");
468 0 : json_response(StatusCode::INTERNAL_SERVER_ERROR, ())
469 : }
470 : }
471 6 : }
472 :
473 897 : async fn timeline_create_handler(
474 897 : mut request: Request<Body>,
475 897 : _cancel: CancellationToken,
476 897 : ) -> Result<Response<Body>, ApiError> {
477 897 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
478 897 : let request_data: TimelineCreateRequest = json_request(&mut request).await?;
479 897 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
480 :
481 896 : let new_timeline_id = request_data.new_timeline_id;
482 896 :
483 896 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Error);
484 896 :
485 896 : let state = get_state(&request);
486 :
487 896 : async {
488 896 : let tenant = state.tenant_manager.get_attached_tenant_shard(tenant_shard_id, false)?;
489 :
490 893 : tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
491 :
492 893 : if let Some(ancestor_id) = request_data.ancestor_timeline_id.as_ref() {
493 270 : tracing::info!(%ancestor_id, "starting to branch");
494 : } else {
495 623 : tracing::info!("bootstrapping");
496 : }
497 :
498 893 : match tenant.create_timeline(
499 893 : new_timeline_id,
500 893 : request_data.ancestor_timeline_id.map(TimelineId::from),
501 893 : request_data.ancestor_start_lsn,
502 893 : request_data.pg_version.unwrap_or(crate::DEFAULT_PG_VERSION),
503 893 : request_data.existing_initdb_timeline_id,
504 893 : state.broker_client.clone(),
505 893 : &ctx,
506 893 : )
507 7640938 : .await {
508 866 : Ok(new_timeline) => {
509 : // Created. Construct a TimelineInfo for it.
510 866 : let timeline_info = build_timeline_info_common(&new_timeline, &ctx, tenant::timeline::GetLogicalSizePriority::User)
511 0 : .await
512 866 : .map_err(ApiError::InternalServerError)?;
513 866 : json_response(StatusCode::CREATED, timeline_info)
514 : }
515 21 : Err(_) if tenant.cancel.is_cancelled() => {
516 3 : // In case we get some ugly error type during shutdown, cast it into a clean 503.
517 3 : json_response(StatusCode::SERVICE_UNAVAILABLE, HttpErrorBody::from_msg("Tenant shutting down".to_string()))
518 : }
519 : Err(tenant::CreateTimelineError::Conflict | tenant::CreateTimelineError::AlreadyCreating) => {
520 1 : json_response(StatusCode::CONFLICT, ())
521 : }
522 8 : Err(tenant::CreateTimelineError::AncestorLsn(err)) => {
523 8 : json_response(StatusCode::NOT_ACCEPTABLE, HttpErrorBody::from_msg(
524 8 : format!("{err:#}")
525 8 : ))
526 : }
527 6 : Err(e @ tenant::CreateTimelineError::AncestorNotActive) => {
528 6 : json_response(StatusCode::SERVICE_UNAVAILABLE, HttpErrorBody::from_msg(e.to_string()))
529 : }
530 : Err(tenant::CreateTimelineError::ShuttingDown) => {
531 0 : json_response(StatusCode::SERVICE_UNAVAILABLE, HttpErrorBody::from_msg("tenant shutting down".to_string()))
532 : }
533 3 : Err(tenant::CreateTimelineError::Other(err)) => Err(ApiError::InternalServerError(err)),
534 : }
535 890 : }
536 : .instrument(info_span!("timeline_create",
537 : tenant_id = %tenant_shard_id.tenant_id,
538 896 : shard_id = %tenant_shard_id.shard_slug(),
539 : timeline_id = %new_timeline_id, lsn=?request_data.ancestor_start_lsn, pg_version=?request_data.pg_version))
540 7641072 : .await
541 891 : }
542 :
543 52 : async fn timeline_list_handler(
544 52 : request: Request<Body>,
545 52 : _cancel: CancellationToken,
546 52 : ) -> Result<Response<Body>, ApiError> {
547 52 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
548 52 : let include_non_incremental_logical_size: Option<bool> =
549 52 : parse_query_param(&request, "include-non-incremental-logical-size")?;
550 52 : let force_await_initial_logical_size: Option<bool> =
551 52 : parse_query_param(&request, "force-await-initial-logical-size")?;
552 52 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
553 :
554 52 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
555 :
556 52 : let response_data = async {
557 52 : let tenant = mgr::get_tenant(tenant_shard_id, true)?;
558 52 : let timelines = tenant.list_timelines();
559 52 :
560 52 : let mut response_data = Vec::with_capacity(timelines.len());
561 146 : for timeline in timelines {
562 94 : let timeline_info = build_timeline_info(
563 94 : &timeline,
564 94 : include_non_incremental_logical_size.unwrap_or(false),
565 94 : force_await_initial_logical_size.unwrap_or(false),
566 94 : &ctx,
567 94 : )
568 94 : .instrument(info_span!("build_timeline_info", timeline_id = %timeline.timeline_id))
569 0 : .await
570 94 : .context("Failed to convert tenant timeline {timeline_id} into the local one: {e:?}")
571 94 : .map_err(ApiError::InternalServerError)?;
572 :
573 94 : response_data.push(timeline_info);
574 : }
575 52 : Ok::<Vec<TimelineInfo>, ApiError>(response_data)
576 52 : }
577 : .instrument(info_span!("timeline_list",
578 : tenant_id = %tenant_shard_id.tenant_id,
579 52 : shard_id = %tenant_shard_id.shard_slug()))
580 0 : .await?;
581 :
582 52 : json_response(StatusCode::OK, response_data)
583 52 : }
584 :
585 2 : async fn timeline_preserve_initdb_handler(
586 2 : request: Request<Body>,
587 2 : _cancel: CancellationToken,
588 2 : ) -> Result<Response<Body>, ApiError> {
589 2 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
590 2 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
591 2 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
592 :
593 : // Part of the process for disaster recovery from safekeeper-stored WAL:
594 : // If we don't recover into a new timeline but want to keep the timeline ID,
595 : // then the initdb archive is deleted. This endpoint copies it to a different
596 : // location where timeline recreation cand find it.
597 :
598 2 : async {
599 2 : let tenant = mgr::get_tenant(tenant_shard_id, true)?;
600 :
601 2 : let timeline = tenant
602 2 : .get_timeline(timeline_id, false)
603 2 : .map_err(|e| ApiError::NotFound(e.into()))?;
604 :
605 2 : timeline
606 2 : .preserve_initdb_archive()
607 2 : .await
608 2 : .context("preserving initdb archive")
609 2 : .map_err(ApiError::InternalServerError)?;
610 :
611 2 : Ok::<_, ApiError>(())
612 2 : }
613 : .instrument(info_span!("timeline_preserve_initdb_archive",
614 : tenant_id = %tenant_shard_id.tenant_id,
615 2 : shard_id = %tenant_shard_id.shard_slug(),
616 : %timeline_id))
617 2 : .await?;
618 :
619 2 : json_response(StatusCode::OK, ())
620 2 : }
621 :
622 2141 : async fn timeline_detail_handler(
623 2141 : request: Request<Body>,
624 2141 : _cancel: CancellationToken,
625 2141 : ) -> Result<Response<Body>, ApiError> {
626 2141 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
627 2141 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
628 2141 : let include_non_incremental_logical_size: Option<bool> =
629 2141 : parse_query_param(&request, "include-non-incremental-logical-size")?;
630 2141 : let force_await_initial_logical_size: Option<bool> =
631 2141 : parse_query_param(&request, "force-await-initial-logical-size")?;
632 2141 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
633 :
634 : // Logical size calculation needs downloading.
635 2141 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
636 :
637 2141 : let timeline_info = async {
638 2141 : let tenant = mgr::get_tenant(tenant_shard_id, true)?;
639 :
640 2132 : let timeline = tenant
641 2132 : .get_timeline(timeline_id, false)
642 2132 : .map_err(|e| ApiError::NotFound(e.into()))?;
643 :
644 2082 : let timeline_info = build_timeline_info(
645 2082 : &timeline,
646 2082 : include_non_incremental_logical_size.unwrap_or(false),
647 2082 : force_await_initial_logical_size.unwrap_or(false),
648 2082 : &ctx,
649 2082 : )
650 2814 : .await
651 2082 : .context("get local timeline info")
652 2082 : .map_err(ApiError::InternalServerError)?;
653 :
654 2082 : Ok::<_, ApiError>(timeline_info)
655 2141 : }
656 : .instrument(info_span!("timeline_detail",
657 : tenant_id = %tenant_shard_id.tenant_id,
658 2141 : shard_id = %tenant_shard_id.shard_slug(),
659 : %timeline_id))
660 2814 : .await?;
661 :
662 2082 : json_response(StatusCode::OK, timeline_info)
663 2141 : }
664 :
665 13 : async fn get_lsn_by_timestamp_handler(
666 13 : request: Request<Body>,
667 13 : cancel: CancellationToken,
668 13 : ) -> Result<Response<Body>, ApiError> {
669 13 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
670 13 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
671 :
672 13 : if !tenant_shard_id.is_zero() {
673 : // Requires SLRU contents, which are only stored on shard zero
674 0 : return Err(ApiError::BadRequest(anyhow!(
675 0 : "Size calculations are only available on shard zero"
676 0 : )));
677 13 : }
678 :
679 13 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
680 13 : let timestamp_raw = must_get_query_param(&request, "timestamp")?;
681 13 : let timestamp = humantime::parse_rfc3339(&timestamp_raw)
682 13 : .with_context(|| format!("Invalid time: {:?}", timestamp_raw))
683 13 : .map_err(ApiError::BadRequest)?;
684 13 : let timestamp_pg = postgres_ffi::to_pg_timestamp(timestamp);
685 13 :
686 13 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
687 13 : let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?;
688 13 : let result = timeline
689 13 : .find_lsn_for_timestamp(timestamp_pg, &cancel, &ctx)
690 1342 : .await?;
691 13 : #[derive(serde::Serialize, Debug)]
692 : struct Result {
693 : lsn: Lsn,
694 : kind: &'static str,
695 : }
696 13 : let (lsn, kind) = match result {
697 10 : LsnForTimestamp::Present(lsn) => (lsn, "present"),
698 1 : LsnForTimestamp::Future(lsn) => (lsn, "future"),
699 2 : LsnForTimestamp::Past(lsn) => (lsn, "past"),
700 0 : LsnForTimestamp::NoData(lsn) => (lsn, "nodata"),
701 : };
702 13 : let result = Result { lsn, kind };
703 13 : tracing::info!(
704 13 : lsn=?result.lsn,
705 13 : kind=%result.kind,
706 13 : timestamp=%timestamp_raw,
707 13 : "lsn_by_timestamp finished"
708 13 : );
709 13 : json_response(StatusCode::OK, result)
710 13 : }
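// Editor's sketch of the wire format (the route itself is registered
// elsewhere; shown only to make the parameters concrete):
//
//     GET .../tenant/:tenant_shard_id/timeline/:timeline_id/get_lsn_by_timestamp?timestamp=2024-01-01T00:00:00Z
//     200 OK  {"lsn": "0/169AD58", "kind": "present"}
//
// `kind` is one of "present" | "future" | "past" | "nodata", per the match above.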
711 :
712 12 : async fn get_timestamp_of_lsn_handler(
713 12 : request: Request<Body>,
714 12 : _cancel: CancellationToken,
715 12 : ) -> Result<Response<Body>, ApiError> {
716 12 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
717 12 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
718 :
719 12 : if !tenant_shard_id.is_zero() {
720 : // Requires SLRU contents, which are only stored on shard zero
721 0 : return Err(ApiError::BadRequest(anyhow!(
722 0 : "Size calculations are only available on shard zero"
723 0 : )));
724 12 : }
725 :
726 12 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
727 :
728 12 : let lsn_str = must_get_query_param(&request, "lsn")?;
729 12 : let lsn = Lsn::from_str(&lsn_str)
730 12 : .with_context(|| format!("Invalid LSN: {lsn_str:?}"))
731 12 : .map_err(ApiError::BadRequest)?;
732 :
733 12 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
734 12 : let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?;
735 81 : let result = timeline.get_timestamp_for_lsn(lsn, &ctx).await?;
736 :
737 10 : match result {
738 10 : Some(time) => {
739 10 : let time = format_rfc3339(postgres_ffi::from_pg_timestamp(time)).to_string();
740 10 : json_response(StatusCode::OK, time)
741 : }
742 0 : None => json_response(StatusCode::NOT_FOUND, ()),
743 : }
744 12 : }
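// Editor's sketch: the inverse lookup of `get_lsn_by_timestamp` above
// (route name assumed):
//
//     GET .../tenant/:tenant_shard_id/timeline/:timeline_id/get_timestamp_of_lsn?lsn=0/169AD58
//     200 OK  "2024-01-01T00:00:00.000000Z"
//
// Returns 404 when no timestamp is stored for the given LSN (the `None` arm).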
745 :
746 41 : async fn tenant_attach_handler(
747 41 : mut request: Request<Body>,
748 41 : _cancel: CancellationToken,
749 41 : ) -> Result<Response<Body>, ApiError> {
750 41 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
751 41 : check_permission(&request, Some(tenant_id))?;
752 :
753 41 : let maybe_body: Option<TenantAttachRequest> = json_request_or_empty_body(&mut request).await?;
754 38 : let tenant_conf = match &maybe_body {
755 38 : Some(request) => TenantConfOpt::try_from(&*request.config).map_err(ApiError::BadRequest)?,
756 0 : None => TenantConfOpt::default(),
757 : };
758 :
759 38 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
760 :
761 38 : info!("Handling tenant attach {tenant_id}");
762 :
763 38 : let state = get_state(&request);
764 :
765 38 : let generation = get_request_generation(state, maybe_body.as_ref().and_then(|r| r.generation))?;
766 :
767 38 : if state.remote_storage.is_none() {
768 0 : return Err(ApiError::BadRequest(anyhow!(
769 0 : "attach_tenant is not possible because pageserver was configured without remote storage"
770 0 : )));
771 38 : }
772 38 :
773 38 : let tenant_shard_id = TenantShardId::unsharded(tenant_id);
774 38 : let shard_params = ShardParameters::default();
775 38 : let location_conf = LocationConf::attached_single(tenant_conf, generation, &shard_params);
776 :
777 38 : let tenant = state
778 38 : .tenant_manager
779 38 : .upsert_location(
780 38 : tenant_shard_id,
781 38 : location_conf,
782 38 : None,
783 38 : SpawnMode::Normal,
784 38 : &ctx,
785 38 : )
786 108 : .await?;
787 :
788 38 : let Some(tenant) = tenant else {
789 : // This should never happen: indicates a bug in upsert_location
790 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
791 0 : "Upsert succeeded but didn't return tenant!"
792 0 : )));
793 : };
794 :
795 : // We might have successfully constructed a Tenant, but it could still
796 : // end up in a broken state:
797 : if let TenantState::Broken {
798 6 : reason,
799 : backtrace: _,
800 38 : } = tenant.current_state()
801 : {
802 6 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
803 6 : "Tenant state is Broken: {reason}"
804 6 : )));
805 32 : }
806 32 :
807 32 : json_response(StatusCode::ACCEPTED, ())
808 41 : }
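// Editor's sketch: an attach request body, assuming the serialized shape of
// `TenantAttachRequest` (an empty body is also accepted, per
// `json_request_or_empty_body`):
//
//     {"config": {}, "generation": 42}
//
// `generation` is mandatory whenever `control_plane_api` is configured; see
// `get_request_generation` below.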
809 :
810 65 : async fn timeline_delete_handler(
811 65 : request: Request<Body>,
812 65 : _cancel: CancellationToken,
813 65 : ) -> Result<Response<Body>, ApiError> {
814 65 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
815 65 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
816 65 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
817 :
818 65 : let state = get_state(&request);
819 :
820 65 : let tenant = state
821 65 : .tenant_manager
822 65 : .get_attached_tenant_shard(tenant_shard_id, false)
823 65 : .map_err(|e| {
824 1 : match e {
825 : // GetTenantError has a built-in conversion to ApiError, but in this context we don't
826 : // want to treat missing tenants as 404, to avoid ambiguity with successful deletions.
827 1 : GetTenantError::NotFound(_) => ApiError::PreconditionFailed(
828 1 : "Requested tenant is missing".to_string().into_boxed_str(),
829 1 : ),
830 0 : e => e.into(),
831 : }
832 65 : })?;
833 64 : tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
834 64 : tenant.delete_timeline(timeline_id).instrument(info_span!("timeline_delete", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id))
835 378 : .await?;
836 :
837 53 : json_response(StatusCode::ACCEPTED, ())
838 65 : }
839 :
840 38 : async fn tenant_detach_handler(
841 38 : request: Request<Body>,
842 38 : _cancel: CancellationToken,
843 38 : ) -> Result<Response<Body>, ApiError> {
844 38 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
845 38 : check_permission(&request, Some(tenant_id))?;
846 38 : let detach_ignored: Option<bool> = parse_query_param(&request, "detach_ignored")?;
847 :
848 : // This is a legacy API (`/location_conf` is the replacement). It only supports unsharded tenants
849 38 : let tenant_shard_id = TenantShardId::unsharded(tenant_id);
850 38 :
851 38 : let state = get_state(&request);
852 38 : let conf = state.conf;
853 38 : mgr::detach_tenant(
854 38 : conf,
855 38 : tenant_shard_id,
856 38 : detach_ignored.unwrap_or(false),
857 38 : &state.deletion_queue_client,
858 38 : )
859 38 : .instrument(info_span!("tenant_detach", %tenant_id, shard_id=%tenant_shard_id.shard_slug()))
860 183 : .await?;
861 :
862 35 : json_response(StatusCode::OK, ())
863 38 : }
864 :
865 2 : async fn tenant_reset_handler(
866 2 : request: Request<Body>,
867 2 : _cancel: CancellationToken,
868 2 : ) -> Result<Response<Body>, ApiError> {
869 2 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
870 2 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
871 :
872 2 : let drop_cache: Option<bool> = parse_query_param(&request, "drop_cache")?;
873 :
874 2 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
875 2 : let state = get_state(&request);
876 2 : state
877 2 : .tenant_manager
878 2 : .reset_tenant(tenant_shard_id, drop_cache.unwrap_or(false), ctx)
879 6 : .await
880 2 : .map_err(ApiError::InternalServerError)?;
881 :
882 2 : json_response(StatusCode::OK, ())
883 2 : }
884 :
885 4 : async fn tenant_load_handler(
886 4 : mut request: Request<Body>,
887 4 : _cancel: CancellationToken,
888 4 : ) -> Result<Response<Body>, ApiError> {
889 4 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
890 4 : check_permission(&request, Some(tenant_id))?;
891 :
892 4 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
893 :
894 4 : let maybe_body: Option<TenantLoadRequest> = json_request_or_empty_body(&mut request).await?;
895 :
896 4 : let state = get_state(&request);
897 :
898 : // The /load request is only usable when control_plane_api is not set. Once it is set, callers
899 : // should always use /attach instead.
900 4 : let generation = get_request_generation(state, maybe_body.as_ref().and_then(|r| r.generation))?;
901 :
902 4 : mgr::load_tenant(
903 4 : state.conf,
904 4 : tenant_id,
905 4 : generation,
906 4 : state.broker_client.clone(),
907 4 : state.remote_storage.clone(),
908 4 : state.deletion_queue_client.clone(),
909 4 : &ctx,
910 4 : )
911 4 : .instrument(info_span!("load", %tenant_id))
912 6 : .await?;
913 :
914 3 : json_response(StatusCode::ACCEPTED, ())
915 4 : }
916 :
917 5 : async fn tenant_ignore_handler(
918 5 : request: Request<Body>,
919 5 : _cancel: CancellationToken,
920 5 : ) -> Result<Response<Body>, ApiError> {
921 5 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
922 5 : check_permission(&request, Some(tenant_id))?;
923 :
924 5 : let state = get_state(&request);
925 5 : let conf = state.conf;
926 5 : mgr::ignore_tenant(conf, tenant_id)
927 5 : .instrument(info_span!("ignore_tenant", %tenant_id))
928 15 : .await?;
929 :
930 5 : json_response(StatusCode::OK, ())
931 5 : }
932 :
933 164 : async fn tenant_list_handler(
934 164 : request: Request<Body>,
935 164 : _cancel: CancellationToken,
936 164 : ) -> Result<Response<Body>, ApiError> {
937 164 : check_permission(&request, None)?;
938 :
939 164 : let response_data = mgr::list_tenants()
940 164 : .instrument(info_span!("tenant_list"))
941 0 : .await
942 164 : .map_err(|_| {
943 0 : ApiError::ResourceUnavailable("Tenant map is initializing or shutting down".into())
944 164 : })?
945 164 : .iter()
946 164 : .map(|(id, state, gen)| TenantInfo {
947 158 : id: *id,
948 158 : state: state.clone(),
949 158 : current_physical_size: None,
950 158 : attachment_status: state.attachment_status(),
951 158 : generation: (*gen).into(),
952 164 : })
953 164 : .collect::<Vec<TenantInfo>>();
954 164 :
955 164 : json_response(StatusCode::OK, response_data)
956 164 : }
957 :
958 549 : async fn tenant_status(
959 549 : request: Request<Body>,
960 549 : _cancel: CancellationToken,
961 549 : ) -> Result<Response<Body>, ApiError> {
962 549 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
963 549 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
964 :
965 549 : let tenant_info = async {
966 549 : let tenant = mgr::get_tenant(tenant_shard_id, false)?;
967 :
968 : // Calculate total physical size of all timelines
969 486 : let mut current_physical_size = 0;
970 486 : for timeline in tenant.list_timelines().iter() {
971 471 : current_physical_size += timeline.layer_size_sum().await;
972 : }
973 :
974 486 : let state = tenant.current_state();
975 486 : Result::<_, ApiError>::Ok(TenantDetails {
976 486 : tenant_info: TenantInfo {
977 486 : id: tenant_shard_id,
978 486 : state: state.clone(),
979 486 : current_physical_size: Some(current_physical_size),
980 486 : attachment_status: state.attachment_status(),
981 486 : generation: tenant.generation().into(),
982 486 : },
983 486 : walredo: tenant.wal_redo_manager_status(),
984 486 : timelines: tenant.list_timeline_ids(),
985 486 : })
986 549 : }
987 : .instrument(info_span!("tenant_status_handler",
988 : tenant_id = %tenant_shard_id.tenant_id,
989 549 : shard_id = %tenant_shard_id.shard_slug()))
990 63 : .await?;
991 :
992 486 : json_response(StatusCode::OK, tenant_info)
993 549 : }
994 :
995 120 : async fn tenant_delete_handler(
996 120 : request: Request<Body>,
997 120 : _cancel: CancellationToken,
998 120 : ) -> Result<Response<Body>, ApiError> {
999 : // TODO openapi spec
1000 120 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1001 120 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1002 :
1003 120 : let state = get_state(&request);
1004 120 :
1005 120 : state
1006 120 : .tenant_manager
1007 120 : .delete_tenant(tenant_shard_id, ACTIVE_TENANT_TIMEOUT)
1008 : .instrument(info_span!("tenant_delete_handler",
1009 : tenant_id = %tenant_shard_id.tenant_id,
1010 120 : shard_id = %tenant_shard_id.shard_slug()
1011 : ))
1012 530 : .await?;
1013 :
1014 86 : json_response(StatusCode::ACCEPTED, ())
1015 120 : }
1016 :
1017 : /// HTTP endpoint to query the current tenant_size of a tenant.
1018 : ///
1019 : /// This is not used by consumption metrics under [`crate::consumption_metrics`], but can be used
1020 : /// to debug any of the calculations. Requires the `tenant_id` request parameter; supports
1021 : /// `inputs_only=true|false` (default false), which helps debug failures to calculate model
1022 : /// values.
1023 : ///
1024 : /// 'retention_period' query parameter overrides the cutoff that is used to calculate the size
1025 : /// (only if it is shorter than the real cutoff).
1026 : ///
1027 : /// Note: we don't update the cached size and prometheus metric here.
1028 : /// The retention period might be different, and it's nice to have a method to just calculate it
1029 : /// without modifying anything anyway.
1030 53 : async fn tenant_size_handler(
1031 53 : request: Request<Body>,
1032 53 : cancel: CancellationToken,
1033 53 : ) -> Result<Response<Body>, ApiError> {
1034 53 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1035 53 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1036 53 : let inputs_only: Option<bool> = parse_query_param(&request, "inputs_only")?;
1037 53 : let retention_period: Option<u64> = parse_query_param(&request, "retention_period")?;
1038 53 : let headers = request.headers();
1039 53 :
1040 53 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
1041 53 : let tenant = mgr::get_tenant(tenant_shard_id, true)?;
1042 :
1043 53 : if !tenant_shard_id.is_zero() {
1044 0 : return Err(ApiError::BadRequest(anyhow!(
1045 0 : "Size calculations are only available on shard zero"
1046 0 : )));
1047 53 : }
1048 :
1049 : // this can be a long operation
1050 53 : let inputs = tenant
1051 53 : .gather_size_inputs(
1052 53 : retention_period,
1053 53 : LogicalSizeCalculationCause::TenantSizeHandler,
1054 53 : &cancel,
1055 53 : &ctx,
1056 53 : )
1057 48 : .await
1058 53 : .map_err(ApiError::InternalServerError)?;
1059 :
1060 53 : let mut sizes = None;
1061 53 : let accepts_html = headers
1062 53 : .get(header::ACCEPT)
1063 53 : .map(|v| v == "text/html")
1064 53 : .unwrap_or_default();
1065 53 : if !inputs_only.unwrap_or(false) {
1066 53 : let storage_model = inputs
1067 53 : .calculate_model()
1068 53 : .map_err(ApiError::InternalServerError)?;
1069 53 : let size = storage_model.calculate();
1070 53 :
1071 53 : // If request header expects html, return html
1072 53 : if accepts_html {
1073 18 : return synthetic_size_html_response(inputs, storage_model, size);
1074 35 : }
1075 35 : sizes = Some(size);
1076 0 : } else if accepts_html {
1077 0 : return Err(ApiError::BadRequest(anyhow!(
1078 0 : "inputs_only parameter is incompatible with html output request"
1079 0 : )));
1080 0 : }
1081 :
1082 : /// The type resides in the pageserver so as not to expose `ModelInputs`.
1083 35 : #[derive(serde::Serialize)]
1084 : struct TenantHistorySize {
1085 : id: TenantId,
1086 : /// Size is a mixture of WAL and logical size, so the unit is bytes.
1087 : ///
1088 : /// Will be none if `?inputs_only=true` was given.
1089 : size: Option<u64>,
1090 : /// Size of each segment used in the model.
1091 : /// Will be null if `?inputs_only=true` was given.
1092 : segment_sizes: Option<Vec<tenant_size_model::SegmentSizeResult>>,
1093 : inputs: crate::tenant::size::ModelInputs,
1094 : }
1095 :
1096 35 : json_response(
1097 35 : StatusCode::OK,
1098 35 : TenantHistorySize {
1099 35 : id: tenant_shard_id.tenant_id,
1100 35 : size: sizes.as_ref().map(|x| x.total_size),
1101 35 : segment_sizes: sizes.map(|x| x.segments),
1102 35 : inputs,
1103 35 : },
1104 35 : )
1105 53 : }
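// Editor's sketch of a synthetic size request (route name assumed; parameters
// per the doc comment above; values illustrative):
//
//     GET .../tenant/:tenant_shard_id/synthetic_size?retention_period=3600
//     200 OK  {"id": "...", "size": 12345678, "segment_sizes": [...], "inputs": {...}}
//
// Sending `Accept: text/html` returns the same data as an SVG+JSON page via
// `synthetic_size_html_response` below.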
1106 :
1107 96 : async fn layer_map_info_handler(
1108 96 : request: Request<Body>,
1109 96 : _cancel: CancellationToken,
1110 96 : ) -> Result<Response<Body>, ApiError> {
1111 96 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1112 96 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1113 96 : let reset: LayerAccessStatsReset =
1114 96 : parse_query_param(&request, "reset")?.unwrap_or(LayerAccessStatsReset::NoReset);
1115 96 :
1116 96 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1117 :
1118 96 : let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?;
1119 96 : let layer_map_info = timeline.layer_map_info(reset).await;
1120 :
1121 96 : json_response(StatusCode::OK, layer_map_info)
1122 96 : }
1123 :
1124 2 : async fn layer_download_handler(
1125 2 : request: Request<Body>,
1126 2 : _cancel: CancellationToken,
1127 2 : ) -> Result<Response<Body>, ApiError> {
1128 2 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1129 2 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1130 2 : let layer_file_name = get_request_param(&request, "layer_file_name")?;
1131 2 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1132 :
1133 2 : let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?;
1134 2 : let downloaded = timeline
1135 2 : .download_layer(layer_file_name)
1136 6 : .await
1137 2 : .map_err(ApiError::InternalServerError)?;
1138 :
1139 2 : match downloaded {
1140 2 : Some(true) => json_response(StatusCode::OK, ()),
1141 0 : Some(false) => json_response(StatusCode::NOT_MODIFIED, ()),
1142 0 : None => json_response(
1143 0 : StatusCode::BAD_REQUEST,
1144 0 : format!("Layer {tenant_shard_id}/{timeline_id}/{layer_file_name} not found"),
1145 0 : ),
1146 : }
1147 2 : }
1148 :
1149 2252 : async fn evict_timeline_layer_handler(
1150 2252 : request: Request<Body>,
1151 2252 : _cancel: CancellationToken,
1152 2252 : ) -> Result<Response<Body>, ApiError> {
1153 2252 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1154 2252 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1155 2252 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1156 2252 : let layer_file_name = get_request_param(&request, "layer_file_name")?;
1157 :
1158 2252 : let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?;
1159 2252 : let evicted = timeline
1160 2252 : .evict_layer(layer_file_name)
1161 2208 : .await
1162 2252 : .map_err(ApiError::InternalServerError)?;
1163 :
1164 2252 : match evicted {
1165 2252 : Some(true) => json_response(StatusCode::OK, ()),
1166 0 : Some(false) => json_response(StatusCode::NOT_MODIFIED, ()),
1167 0 : None => json_response(
1168 0 : StatusCode::BAD_REQUEST,
1169 0 : format!("Layer {tenant_shard_id}/{timeline_id}/{layer_file_name} not found"),
1170 0 : ),
1171 : }
1172 2252 : }
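// Editor's note on the two layer endpoints above: both key on
// `layer_file_name` and share a response convention, namely 200 when the
// download/eviction changed state, 304 Not Modified when the layer was
// already in the requested state, and 400 when the layer is unknown.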
1173 :
1174 : /// Get tenant_size SVG graph along with the JSON data.
1175 18 : fn synthetic_size_html_response(
1176 18 : inputs: ModelInputs,
1177 18 : storage_model: StorageModel,
1178 18 : sizes: SizeResult,
1179 18 : ) -> Result<Response<Body>, ApiError> {
1180 18 : let mut timeline_ids: Vec<String> = Vec::new();
1181 18 : let mut timeline_map: HashMap<TimelineId, usize> = HashMap::new();
1182 26 : for (index, ti) in inputs.timeline_inputs.iter().enumerate() {
1183 26 : timeline_map.insert(ti.timeline_id, index);
1184 26 : timeline_ids.push(ti.timeline_id.to_string());
1185 26 : }
1186 18 : let seg_to_branch: Vec<usize> = inputs
1187 18 : .segments
1188 18 : .iter()
1189 66 : .map(|seg| *timeline_map.get(&seg.timeline_id).unwrap())
1190 18 : .collect();
1191 :
1192 18 : let svg =
1193 18 : tenant_size_model::svg::draw_svg(&storage_model, &timeline_ids, &seg_to_branch, &sizes)
1194 18 : .map_err(ApiError::InternalServerError)?;
1195 :
1196 18 : let mut response = String::new();
1197 18 :
1198 18 : use std::fmt::Write;
1199 18 : write!(response, "<html>\n<body>\n").unwrap();
1200 18 : write!(response, "<div>\n{svg}\n</div>").unwrap();
1201 18 : writeln!(response, "Project size: {}", sizes.total_size).unwrap();
1202 18 : writeln!(response, "<pre>").unwrap();
1203 18 : writeln!(
1204 18 : response,
1205 18 : "{}",
1206 18 : serde_json::to_string_pretty(&inputs).unwrap()
1207 18 : )
1208 18 : .unwrap();
1209 18 : writeln!(
1210 18 : response,
1211 18 : "{}",
1212 18 : serde_json::to_string_pretty(&sizes.segments).unwrap()
1213 18 : )
1214 18 : .unwrap();
1215 18 : writeln!(response, "</pre>").unwrap();
1216 18 : write!(response, "</body>\n</html>\n").unwrap();
1217 18 :
1218 18 : html_response(StatusCode::OK, response)
1219 18 : }
1220 :
1221 18 : pub fn html_response(status: StatusCode, data: String) -> Result<Response<Body>, ApiError> {
1222 18 : let response = Response::builder()
1223 18 : .status(status)
1224 18 : .header(header::CONTENT_TYPE, "text/html")
1225 18 : .body(Body::from(data.as_bytes().to_vec()))
1226 18 : .map_err(|e| ApiError::InternalServerError(e.into()))?;
1227 18 : Ok(response)
1228 18 : }
1229 :
1230 : /// Helper for requests that may take a generation, which is mandatory
1231 : /// when control_plane_api is set, but otherwise defaults to Generation::none()
1232 165 : fn get_request_generation(state: &State, req_gen: Option<u32>) -> Result<Generation, ApiError> {
1233 165 : if state.conf.control_plane_api.is_some() {
1234 165 : req_gen
1235 165 : .map(Generation::new)
1236 165 : .ok_or(ApiError::BadRequest(anyhow!(
1237 165 : "generation attribute missing"
1238 165 : )))
1239 : } else {
1240 : // Legacy mode: all tenants operate with no generation
1241 0 : Ok(Generation::none())
1242 : }
1243 165 : }
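// Editor's note: the behaviour above, as a small truth table:
//
//     control_plane_api set,   req_gen == Some(g) -> Ok(Generation::new(g))
//     control_plane_api set,   req_gen == None    -> 400 "generation attribute missing"
//     control_plane_api unset, req_gen == _       -> Ok(Generation::none())  // legacy mode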
1244 :
1245 124 : async fn tenant_create_handler(
1246 124 : mut request: Request<Body>,
1247 124 : _cancel: CancellationToken,
1248 124 : ) -> Result<Response<Body>, ApiError> {
1249 124 : let request_data: TenantCreateRequest = json_request(&mut request).await?;
1250 124 : let target_tenant_id = request_data.new_tenant_id;
1251 124 : check_permission(&request, None)?;
1252 :
1253 123 : let _timer = STORAGE_TIME_GLOBAL
1254 123 : .get_metric_with_label_values(&[StorageTimeOperation::CreateTenant.into()])
1255 123 : .expect("bug")
1256 123 : .start_timer();
1257 :
1258 123 : let tenant_conf =
1259 123 : TenantConfOpt::try_from(&request_data.config).map_err(ApiError::BadRequest)?;
1260 :
1261 123 : let state = get_state(&request);
1262 :
1263 123 : let generation = get_request_generation(state, request_data.generation)?;
1264 :
1265 123 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
1266 123 :
1267 123 : let location_conf =
1268 123 : LocationConf::attached_single(tenant_conf, generation, &request_data.shard_parameters);
1269 :
1270 123 : let new_tenant = state
1271 123 : .tenant_manager
1272 123 : .upsert_location(
1273 123 : target_tenant_id,
1274 123 : location_conf,
1275 123 : None,
1276 123 : SpawnMode::Create,
1277 123 : &ctx,
1278 123 : )
1279 277 : .await?;
1280 :
1281 48 : let Some(new_tenant) = new_tenant else {
1282 : // This should never happen: indicates a bug in upsert_location
1283 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
1284 0 : "Upsert succeeded but didn't return tenant!"
1285 0 : )));
1286 : };
1287 : // We created the tenant. Existing API semantics are that the tenant
1288 : // is Active when this function returns.
1289 48 : new_tenant
1290 48 : .wait_to_become_active(ACTIVE_TENANT_TIMEOUT)
1291 63 : .await?;
1292 :
1293 43 : json_response(
1294 43 : StatusCode::CREATED,
1295 43 : TenantCreateResponse(new_tenant.tenant_shard_id().tenant_id),
1296 43 : )
1297 124 : }
1298 :
1299 45 : async fn get_tenant_config_handler(
1300 45 : request: Request<Body>,
1301 45 : _cancel: CancellationToken,
1302 45 : ) -> Result<Response<Body>, ApiError> {
1303 45 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1304 45 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1305 :
1306 45 : let tenant = mgr::get_tenant(tenant_shard_id, false)?;
1307 :
1308 45 : let response = HashMap::from([
1309 : (
1310 : "tenant_specific_overrides",
1311 45 : serde_json::to_value(tenant.tenant_specific_overrides())
1312 45 : .context("serializing tenant specific overrides")
1313 45 : .map_err(ApiError::InternalServerError)?,
1314 : ),
1315 : (
1316 45 : "effective_config",
1317 45 : serde_json::to_value(tenant.effective_config())
1318 45 : .context("serializing effective config")
1319 45 : .map_err(ApiError::InternalServerError)?,
1320 : ),
1321 : ]);
1322 :
1323 45 : json_response(StatusCode::OK, response)
1324 45 : }
1325 :
1326 30 : async fn update_tenant_config_handler(
1327 30 : mut request: Request<Body>,
1328 30 : _cancel: CancellationToken,
1329 30 : ) -> Result<Response<Body>, ApiError> {
1330 30 : let request_data: TenantConfigRequest = json_request(&mut request).await?;
1331 30 : let tenant_id = request_data.tenant_id;
1332 30 : check_permission(&request, Some(tenant_id))?;
1333 :
1334 30 : let tenant_conf =
1335 30 : TenantConfOpt::try_from(&request_data.config).map_err(ApiError::BadRequest)?;
1336 :
1337 30 : let state = get_state(&request);
1338 30 : mgr::set_new_tenant_config(state.conf, tenant_conf, tenant_id)
1339 30 : .instrument(info_span!("tenant_config", %tenant_id))
1340 60 : .await?;
1341 :
1342 30 : json_response(StatusCode::OK, ())
1343 30 : }
1344 :
1345 663 : async fn put_tenant_location_config_handler(
1346 663 : mut request: Request<Body>,
1347 663 : _cancel: CancellationToken,
1348 663 : ) -> Result<Response<Body>, ApiError> {
1349 663 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1350 :
1351 663 : let request_data: TenantLocationConfigRequest = json_request(&mut request).await?;
1352 663 : let flush = parse_query_param(&request, "flush_ms")?.map(Duration::from_millis);
1353 663 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1354 :
1355 663 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
1356 663 : let state = get_state(&request);
1357 663 : let conf = state.conf;
1358 663 :
1359 663 : // The `Detached` state is special: it doesn't upsert a tenant; it removes
1360 663 : // its local disk content and drops it from memory.
1361 663 : if let LocationConfigMode::Detached = request_data.config.mode {
1362 7 : if let Err(e) =
1363 50 : mgr::detach_tenant(conf, tenant_shard_id, true, &state.deletion_queue_client)
1364 : .instrument(info_span!("tenant_detach",
1365 : tenant_id = %tenant_shard_id.tenant_id,
1366 50 : shard_id = %tenant_shard_id.shard_slug()
1367 : ))
1368 210 : .await
1369 : {
1370 7 : match e {
1371 7 : TenantStateError::SlotError(TenantSlotError::NotFound(_)) => {
1372 7 : // This API is idempotent: a NotFound on a detach is fine.
1373 7 : }
1374 0 : _ => return Err(e.into()),
1375 : }
1376 43 : }
1377 50 : return json_response(StatusCode::OK, ());
1378 613 : }
1379 :
1380 613 : let location_conf =
1381 613 : LocationConf::try_from(&request_data.config).map_err(ApiError::BadRequest)?;
1382 :
1383 613 : let attached = state
1384 613 : .tenant_manager
1385 613 : .upsert_location(
1386 613 : tenant_shard_id,
1387 613 : location_conf,
1388 613 : flush,
1389 613 : tenant::SpawnMode::Normal,
1390 613 : &ctx,
1391 613 : )
1392 1885 : .await?
1393 612 : .is_some();
1394 :
1395 612 : if let Some(_flush_ms) = flush {
1396 1 : match state
1397 1 : .secondary_controller
1398 1 : .upload_tenant(tenant_shard_id)
1399 1 : .await
1400 : {
1401 : Ok(()) => {
1402 1 : tracing::info!("Uploaded heatmap during flush");
1403 : }
1404 0 : Err(e) => {
1405 0 : tracing::warn!("Failed to flush heatmap: {e}");
1406 : }
1407 : }
1408 : } else {
1409 611 : tracing::info!("No flush requested when configuring");
1410 : }
1411 :
1412 : // This API returns a vector of pageservers where the tenant is attached: this is
1413 : // primarily for use in the sharding service. For compatibility, we also return this
1414 : // when called directly on a pageserver, but the payload is always zero or one shards.
1415 612 : let mut response = TenantLocationConfigResponse { shards: Vec::new() };
1416 612 : if attached {
1417 581 : response.shards.push(TenantShardLocation {
1418 581 : shard_id: tenant_shard_id,
1419 581 : node_id: state.conf.id,
1420 581 : })
1421 31 : }
1422 :
1423 612 : json_response(StatusCode::OK, response)
1424 663 : }
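:
: // Illustrative usage sketch (hypothetical values; the body shape is defined by
: // `TenantLocationConfigRequest` in `pageserver_api::models`). The optional
: // `flush_ms` query parameter additionally triggers a heatmap upload once the
: // location has been configured:
: //
: //   curl -X PUT \
: //     'http://127.0.0.1:9898/v1/tenant/<tenant_shard_id>/location_config?flush_ms=5000' \
: //     -H 'Content-Type: application/json' -d @location_config.json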
1425 :
1426 5 : async fn list_location_config_handler(
1427 5 : request: Request<Body>,
1428 5 : _cancel: CancellationToken,
1429 5 : ) -> Result<Response<Body>, ApiError> {
1430 5 : let state = get_state(&request);
1431 5 : let slots = state.tenant_manager.list();
1432 5 : let result = LocationConfigListResponse {
1433 5 : tenant_shards: slots
1434 5 : .into_iter()
1435 5 : .map(|(tenant_shard_id, slot)| {
1436 4 : let v = match slot {
1437 4 : TenantSlot::Attached(t) => Some(t.get_location_conf()),
1438 0 : TenantSlot::Secondary(s) => Some(s.get_location_conf()),
1439 0 : TenantSlot::InProgress(_) => None,
1440 : };
1441 4 : (tenant_shard_id, v)
1442 5 : })
1443 5 : .collect(),
1444 5 : };
1445 5 : json_response(StatusCode::OK, result)
1446 5 : }
1447 :
1448 : // Do a time travel recovery on the given tenant/tenant shard. The tenant must be
1449 : // detached from all pageservers, because the recovery invalidates consistency assumptions.
1450 1 : async fn tenant_time_travel_remote_storage_handler(
1451 1 : request: Request<Body>,
1452 1 : cancel: CancellationToken,
1453 1 : ) -> Result<Response<Body>, ApiError> {
1454 1 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1455 :
1456 1 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1457 :
1458 1 : let timestamp_raw = must_get_query_param(&request, "travel_to")?;
1459 1 : let timestamp = humantime::parse_rfc3339(&timestamp_raw)
1460 1 : .with_context(|| format!("Invalid time for travel_to: {timestamp_raw:?}"))
1461 1 : .map_err(ApiError::BadRequest)?;
1462 :
1463 1 : let done_if_after_raw = must_get_query_param(&request, "done_if_after")?;
1464 1 : let done_if_after = humantime::parse_rfc3339(&done_if_after_raw)
1465 1 : .with_context(|| format!("Invalid time for done_if_after: {done_if_after_raw:?}"))
1466 1 : .map_err(ApiError::BadRequest)?;
1467 :
1468 : // This is just a sanity check to fend off naive misuse of the API:
1469 : // the tenant needs to be detached *everywhere*.
1470 1 : let state = get_state(&request);
1471 1 : let we_manage_tenant = state.tenant_manager.manages_tenant_shard(tenant_shard_id);
1472 1 : if we_manage_tenant {
1473 0 : return Err(ApiError::BadRequest(anyhow!(
1474 0 : "Tenant {tenant_shard_id} is already attached at this pageserver"
1475 0 : )));
1476 1 : }
1477 :
1478 1 : let Some(storage) = state.remote_storage.as_ref() else {
1479 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
1480 0 : "remote storage not configured, cannot run time travel"
1481 0 : )));
1482 : };
1483 :
1484 1 : if timestamp > done_if_after {
1485 0 : return Err(ApiError::BadRequest(anyhow!(
1486 0 : "The done_if_after timestamp comes before the timestamp to recover to"
1487 0 : )));
1488 1 : }
1489 :
1490 1 : tracing::info!("Issuing time travel request internally. timestamp={timestamp_raw}, done_if_after={done_if_after_raw}");
1491 :
1492 1 : remote_timeline_client::upload::time_travel_recover_tenant(
1493 1 : storage,
1494 1 : &tenant_shard_id,
1495 1 : timestamp,
1496 1 : done_if_after,
1497 1 : &cancel,
1498 1 : )
1499 131 : .await
1500 1 : .map_err(|e| match e {
1501 0 : TimeTravelError::BadInput(e) => {
1502 0 : warn!("bad input error: {e}");
1503 0 : ApiError::BadRequest(anyhow!("bad input error"))
1504 : }
1505 : TimeTravelError::Unimplemented => {
1506 0 : ApiError::BadRequest(anyhow!("unimplemented for the configured remote storage"))
1507 : }
1508 0 : TimeTravelError::Cancelled => ApiError::InternalServerError(anyhow!("cancelled")),
1509 : TimeTravelError::TooManyVersions => {
1510 0 : ApiError::InternalServerError(anyhow!("too many versions in remote storage"))
1511 : }
1512 0 : TimeTravelError::Other(e) => {
1513 0 : warn!("internal error: {e}");
1514 0 : ApiError::InternalServerError(anyhow!("internal error"))
1515 : }
1516 1 : })?;
1517 :
1518 1 : json_response(StatusCode::OK, ())
1519 1 : }
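:
: // Illustrative usage sketch (hypothetical host/port and timestamps). Both query
: // parameters are RFC 3339 timestamps, and `travel_to` must not be later than
: // `done_if_after`:
: //
: //   curl -X PUT 'http://127.0.0.1:9898/v1/tenant/<tenant_shard_id>/time_travel_remote_storage?travel_to=2024-01-10T00:00:00Z&done_if_after=2024-01-11T00:00:00Z'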
1520 :
1521 : /// Testing helper to transition a tenant to [`crate::tenant::TenantState::Broken`].
1522 2 : async fn handle_tenant_break(
1523 2 : r: Request<Body>,
1524 2 : _cancel: CancellationToken,
1525 2 : ) -> Result<Response<Body>, ApiError> {
1526 2 : let tenant_shard_id: TenantShardId = parse_request_param(&r, "tenant_shard_id")?;
1527 :
1528 2 : let tenant = crate::tenant::mgr::get_tenant(tenant_shard_id, true)
1529 2 : .map_err(|_| ApiError::Conflict(String::from("no active tenant found")))?;
1530 :
1531 2 : tenant.set_broken("broken from test".to_owned()).await;
1532 :
1533 2 : json_response(StatusCode::OK, ())
1534 2 : }
1535 :
1536 : // Run GC immediately on the given timeline.
1537 251 : async fn timeline_gc_handler(
1538 251 : mut request: Request<Body>,
1539 251 : cancel: CancellationToken,
1540 251 : ) -> Result<Response<Body>, ApiError> {
1541 251 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1542 251 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1543 251 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1544 :
1545 251 : let gc_req: TimelineGcRequest = json_request(&mut request).await?;
1546 :
1547 251 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
1548 250 : let wait_task_done =
1549 251 : mgr::immediate_gc(tenant_shard_id, timeline_id, gc_req, cancel, &ctx).await?;
1550 250 : let gc_result = wait_task_done
1551 250 : .await
1552 250 : .context("wait for gc task")
1553 250 : .map_err(ApiError::InternalServerError)?
1554 250 : .map_err(ApiError::InternalServerError)?;
1555 :
1556 249 : json_response(StatusCode::OK, gc_result)
1557 251 : }
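:
: // Illustrative usage sketch (hypothetical host/port; assumes the optional
: // `gc_horizon` override field of `TimelineGcRequest`):
: //
: //   curl -X PUT \
: //     'http://127.0.0.1:9898/v1/tenant/<tenant_shard_id>/timeline/<timeline_id>/do_gc' \
: //     -H 'Content-Type: application/json' -d '{"gc_horizon": 0}'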
1558 :
1559 : // Run compaction immediately on the given timeline.
1560 124 : async fn timeline_compact_handler(
1561 124 : request: Request<Body>,
1562 124 : cancel: CancellationToken,
1563 124 : ) -> Result<Response<Body>, ApiError> {
1564 124 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1565 124 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1566 124 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1567 :
1568 124 : let mut flags = EnumSet::empty();
1569 124 : if Some(true) == parse_query_param::<_, bool>(&request, "force_repartition")? {
1570 2 : flags |= CompactFlags::ForceRepartition;
1571 122 : }
1572 124 : async {
1573 124 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
1574 124 : let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?;
1575 124 : timeline
1576 124 : .compact(&cancel, flags, &ctx)
1577 19381 : .await
1578 124 : .map_err(|e| ApiError::InternalServerError(e.into()))?;
1579 124 : json_response(StatusCode::OK, ())
1580 124 : }
1581 124 : .instrument(info_span!("manual_compaction", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
1582 19381 : .await
1583 124 : }
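:
: // Illustrative usage sketch (hypothetical host/port). Passing
: // `force_repartition=true` sets `CompactFlags::ForceRepartition` for this run:
: //
: //   curl -X PUT 'http://127.0.0.1:9898/v1/tenant/<tenant_shard_id>/timeline/<timeline_id>/compact?force_repartition=true'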
1584 :
1585 : // Run checkpoint immediately on the given timeline.
1586 530 : async fn timeline_checkpoint_handler(
1587 530 : request: Request<Body>,
1588 530 : cancel: CancellationToken,
1589 530 : ) -> Result<Response<Body>, ApiError> {
1590 530 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1591 530 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1592 530 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1593 :
1594 530 : let mut flags = EnumSet::empty();
1595 530 : if Some(true) == parse_query_param::<_, bool>(&request, "force_repartition")? {
1596 2 : flags |= CompactFlags::ForceRepartition;
1597 528 : }
1598 530 : async {
1599 530 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
1600 530 : let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?;
1601 530 : timeline
1602 530 : .freeze_and_flush()
1603 620 : .await
1604 530 : .map_err(ApiError::InternalServerError)?;
1605 530 : timeline
1606 530 : .compact(&cancel, flags, &ctx)
1607 423476 : .await
1608 529 : .map_err(|e| ApiError::InternalServerError(e.into()))?;
1609 :
1610 529 : json_response(StatusCode::OK, ())
1611 529 : }
1612 530 : .instrument(info_span!("manual_checkpoint", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
1613 424096 : .await
1614 529 : }
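:
: // Illustrative usage sketch (hypothetical host/port). A checkpoint first
: // freezes and flushes the open in-memory layer, then runs compaction:
: //
: //   curl -X PUT 'http://127.0.0.1:9898/v1/tenant/<tenant_shard_id>/timeline/<timeline_id>/checkpoint'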
1615 :
1616 2 : async fn timeline_download_remote_layers_handler_post(
1617 2 : mut request: Request<Body>,
1618 2 : _cancel: CancellationToken,
1619 2 : ) -> Result<Response<Body>, ApiError> {
1620 2 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1621 2 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1622 2 : let body: DownloadRemoteLayersTaskSpawnRequest = json_request(&mut request).await?;
1623 2 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1624 :
1625 2 : let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?;
1626 2 : match timeline.spawn_download_all_remote_layers(body).await {
1627 2 : Ok(st) => json_response(StatusCode::ACCEPTED, st),
1628 0 : Err(st) => json_response(StatusCode::CONFLICT, st),
1629 : }
1630 2 : }
1631 :
1632 50 : async fn timeline_download_remote_layers_handler_get(
1633 50 : request: Request<Body>,
1634 50 : _cancel: CancellationToken,
1635 50 : ) -> Result<Response<Body>, ApiError> {
1636 50 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1637 50 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1638 50 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1639 :
1640 50 : let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?;
1641 50 : let info = timeline
1642 50 : .get_download_all_remote_layers_task_info()
1643 50 : .context("task never started since last pageserver process start")
1644 50 : .map_err(|e| ApiError::NotFound(e.into()))?;
1645 50 : json_response(StatusCode::OK, info)
1646 50 : }
1647 :
1648 20 : async fn deletion_queue_flush(
1649 20 : r: Request<Body>,
1650 20 : cancel: CancellationToken,
1651 20 : ) -> Result<Response<Body>, ApiError> {
1652 20 : let state = get_state(&r);
1653 20 :
1654 20 : if state.remote_storage.is_none() {
1655 : // Nothing to do if remote storage is disabled.
1656 0 : return json_response(StatusCode::OK, ());
1657 20 : }
1658 :
1659 20 : let execute = parse_query_param(&r, "execute")?.unwrap_or(false);
1660 20 :
1661 20 : let flush = async {
1662 20 : if execute {
1663 36 : state.deletion_queue_client.flush_execute().await
1664 : } else {
1665 8 : state.deletion_queue_client.flush().await
1666 : }
1667 20 : }
1668 : // DeletionQueueError's only case is shutting down.
1669 20 : .map_err(|_| ApiError::ShuttingDown);
1670 20 :
1671 64 : tokio::select! {
1672 20 : res = flush => {
1673 20 : res.map(|()| json_response(StatusCode::OK, ()))?
1674 : }
1675 : _ = cancel.cancelled() => {
1676 : Err(ApiError::ShuttingDown)
1677 : }
1678 : }
1679 20 : }
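:
: // Illustrative usage sketch (hypothetical host/port). With `execute=true` the
: // call waits until queued deletions have actually been executed; otherwise it
: // only waits for the queue to flush:
: //
: //   curl -X PUT 'http://127.0.0.1:9898/v1/deletion_queue/flush?execute=true'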
1680 :
1681 : /// Test whether a `GetPage@Lsn` request succeeds; useful for manual debugging.
1682 0 : async fn getpage_at_lsn_handler(
1683 0 : request: Request<Body>,
1684 0 : _cancel: CancellationToken,
1685 0 : ) -> Result<Response<Body>, ApiError> {
1686 0 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1687 0 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1688 0 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1689 :
1690 : struct Key(crate::repository::Key);
1691 :
1692 : impl std::str::FromStr for Key {
1693 : type Err = anyhow::Error;
1694 :
1695 0 : fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
1696 0 : crate::repository::Key::from_hex(s).map(Key)
1697 0 : }
1698 : }
1699 :
1700 0 : let key: Key = parse_query_param(&request, "key")?
1701 0 : .ok_or_else(|| ApiError::BadRequest(anyhow!("missing 'key' query parameter")))?;
1702 0 : let lsn: Lsn = parse_query_param(&request, "lsn")?
1703 0 : .ok_or_else(|| ApiError::BadRequest(anyhow!("missing 'lsn' query parameter")))?;
1704 :
1705 0 : async {
1706 0 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
1707 0 : let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?;
1708 :
1709 0 : let page = timeline.get(key.0, lsn, &ctx).await?;
1710 :
1711 0 : Result::<_, ApiError>::Ok(
1712 0 : Response::builder()
1713 0 : .status(StatusCode::OK)
1714 0 : .header(header::CONTENT_TYPE, "application/octet-stream")
1715 0 : .body(hyper::Body::from(page))
1716 0 : .unwrap(),
1717 0 : )
1718 0 : }
1719 0 : .instrument(info_span!("timeline_get", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
1720 0 : .await
1721 0 : }
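:
: // Illustrative usage sketch (hypothetical host/port and values). `key` is a
: // hex-encoded key (see `Key::from_hex`) and `lsn` uses the textual `X/Y` LSN
: // form accepted by `Lsn`'s `FromStr`:
: //
: //   curl -o page.bin 'http://127.0.0.1:9898/v1/tenant/<tenant_shard_id>/timeline/<timeline_id>/getpage?key=<hex key>&lsn=0/16B9188'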
1722 :
1723 0 : async fn timeline_collect_keyspace(
1724 0 : request: Request<Body>,
1725 0 : _cancel: CancellationToken,
1726 0 : ) -> Result<Response<Body>, ApiError> {
1727 0 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1728 0 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1729 0 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1730 :
1731 0 : let at_lsn: Option<Lsn> = parse_query_param(&request, "at_lsn")?;
1732 :
1733 0 : async {
1734 0 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
1735 0 : let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?;
1736 0 : let at_lsn = at_lsn.unwrap_or_else(|| timeline.get_last_record_lsn());
1737 0 : let keys = timeline
1738 0 : .collect_keyspace(at_lsn, &ctx)
1739 0 : .await
1740 0 : .map_err(|e| ApiError::InternalServerError(e.into()))?;
1741 :
1742 0 : let res = pageserver_api::models::partitioning::Partitioning { keys, at_lsn };
1743 0 :
1744 0 : json_response(StatusCode::OK, res)
1745 0 : }
1746 0 : .instrument(info_span!("timeline_collect_keyspace", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
1747 0 : .await
1748 0 : }
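:
: // Illustrative usage sketch (hypothetical host/port). `at_lsn` is optional and
: // defaults to the timeline's last record LSN:
: //
: //   curl 'http://127.0.0.1:9898/v1/tenant/<tenant_shard_id>/timeline/<timeline_id>/keyspace?at_lsn=0/16B9188'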
1749 :
1750 3081 : async fn active_timeline_of_active_tenant(
1751 3081 : tenant_shard_id: TenantShardId,
1752 3081 : timeline_id: TimelineId,
1753 3081 : ) -> Result<Arc<Timeline>, ApiError> {
1754 3081 : let tenant = mgr::get_tenant(tenant_shard_id, true)?;
1755 3081 : tenant
1756 3081 : .get_timeline(timeline_id, true)
1757 3081 : .map_err(|e| ApiError::NotFound(e.into()))
1758 3081 : }
1759 :
1760 0 : async fn always_panic_handler(
1761 0 : req: Request<Body>,
1762 0 : _cancel: CancellationToken,
1763 0 : ) -> Result<Response<Body>, ApiError> {
1764 0 : // Deliberately cause a panic to exercise the panic hook registered via std::panic::set_hook().
1765 0 : // For pageserver, the relevant panic hook is `tracing_panic_hook`, plus the `sentry` crate's wrapper around it.
1766 0 : // Use catch_unwind to ensure that neither tokio nor hyper is affected by our panic.
1767 0 : let query = req.uri().query();
1768 0 : let _ = std::panic::catch_unwind(|| {
1769 0 : panic!("unconditional panic for testing panic hook integration; request query: {query:?}")
1770 0 : });
1771 0 : json_response(StatusCode::NO_CONTENT, ())
1772 0 : }
1773 :
1774 13 : async fn disk_usage_eviction_run(
1775 13 : mut r: Request<Body>,
1776 13 : cancel: CancellationToken,
1777 13 : ) -> Result<Response<Body>, ApiError> {
1778 13 : check_permission(&r, None)?;
1779 :
1780 63 : #[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)]
1781 : struct Config {
1782 : /// How many bytes to evict before reporting that pressure is relieved.
1783 : evict_bytes: u64,
1784 :
1785 : #[serde(default)]
1786 : eviction_order: crate::disk_usage_eviction_task::EvictionOrder,
1787 : }
1788 :
1789 61 : #[derive(Debug, Clone, Copy, serde::Serialize)]
1790 : struct Usage {
1791 : // remains unchanged after instantiation of the struct
1792 : evict_bytes: u64,
1793 : // updated by `add_available_bytes`
1794 : freed_bytes: u64,
1795 : }
1796 :
1797 : impl crate::disk_usage_eviction_task::Usage for Usage {
1798 254 : fn has_pressure(&self) -> bool {
1799 254 : self.evict_bytes > self.freed_bytes
1800 254 : }
1801 :
1802 462 : fn add_available_bytes(&mut self, bytes: u64) {
1803 462 : self.freed_bytes += bytes;
1804 462 : }
1805 : }
1806 :
1807 13 : let config = json_request::<Config>(&mut r).await?;
1808 :
1809 13 : let usage = Usage {
1810 13 : evict_bytes: config.evict_bytes,
1811 13 : freed_bytes: 0,
1812 13 : };
1813 13 :
1814 13 : let state = get_state(&r);
1815 :
1816 13 : let Some(storage) = state.remote_storage.as_ref() else {
1817 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
1818 0 : "remote storage not configured, cannot run eviction iteration"
1819 0 : )));
1820 : };
1821 :
1822 13 : let eviction_state = state.disk_usage_eviction_state.clone();
1823 :
1824 13 : let res = crate::disk_usage_eviction_task::disk_usage_eviction_task_iteration_impl(
1825 13 : &eviction_state,
1826 13 : storage,
1827 13 : usage,
1828 13 : &state.tenant_manager,
1829 13 : config.eviction_order,
1830 13 : &cancel,
1831 13 : )
1832 301 : .await;
1833 :
1834 13 : info!(?res, "disk_usage_eviction_task_iteration_impl finished");
1835 :
1836 13 : let res = res.map_err(ApiError::InternalServerError)?;
1837 :
1838 13 : json_response(StatusCode::OK, res)
1839 13 : }
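:
: // Illustrative usage sketch (hypothetical host/port and size). `eviction_order`
: // is optional and falls back to its serde default:
: //
: //   curl -X PUT 'http://127.0.0.1:9898/v1/disk_usage_eviction/run' \
: //     -H 'Content-Type: application/json' -d '{"evict_bytes": 1073741824}'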
1840 :
1841 7 : async fn secondary_upload_handler(
1842 7 : request: Request<Body>,
1843 7 : _cancel: CancellationToken,
1844 7 : ) -> Result<Response<Body>, ApiError> {
1845 7 : let state = get_state(&request);
1846 7 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1847 7 : state
1848 7 : .secondary_controller
1849 7 : .upload_tenant(tenant_shard_id)
1850 7 : .await
1851 7 : .map_err(ApiError::InternalServerError)?;
1852 :
1853 7 : json_response(StatusCode::OK, ())
1854 7 : }
1855 :
1856 6 : async fn secondary_download_handler(
1857 6 : request: Request<Body>,
1858 6 : _cancel: CancellationToken,
1859 6 : ) -> Result<Response<Body>, ApiError> {
1860 6 : let state = get_state(&request);
1861 6 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1862 6 : state
1863 6 : .secondary_controller
1864 6 : .download_tenant(tenant_shard_id)
1865 6 : .await
1866 6 : .map_err(ApiError::InternalServerError)?;
1867 :
1868 6 : json_response(StatusCode::OK, ())
1869 6 : }
1870 :
1871 0 : async fn handler_404(_: Request<Body>) -> Result<Response<Body>, ApiError> {
1872 0 : json_response(
1873 0 : StatusCode::NOT_FOUND,
1874 0 : HttpErrorBody::from_msg("page not found".to_owned()),
1875 0 : )
1876 0 : }
1877 :
1878 0 : async fn post_tracing_event_handler(
1879 0 : mut r: Request<Body>,
1880 0 : _cancel: CancellationToken,
1881 0 : ) -> Result<Response<Body>, ApiError> {
1882 0 : #[derive(Debug, serde::Deserialize)]
1883 : #[serde(rename_all = "lowercase")]
1884 : enum Level {
1885 : Error,
1886 : Warn,
1887 : Info,
1888 : Debug,
1889 : Trace,
1890 : }
1891 0 : #[derive(Debug, serde::Deserialize)]
1892 : struct Request {
1893 : level: Level,
1894 : message: String,
1895 : }
1896 0 : let body: Request = json_request(&mut r)
1897 0 : .await
1898 0 : .map_err(|_| ApiError::BadRequest(anyhow::anyhow!("invalid JSON body")))?;
1899 :
1900 0 : match body.level {
1901 0 : Level::Error => tracing::error!(?body.message),
1902 0 : Level::Warn => tracing::warn!(?body.message),
1903 0 : Level::Info => tracing::info!(?body.message),
1904 0 : Level::Debug => tracing::debug!(?body.message),
1905 0 : Level::Trace => tracing::trace!(?body.message),
1906 : }
1907 :
1908 0 : json_response(StatusCode::OK, ())
1909 0 : }
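:
: // Illustrative usage sketch (hypothetical host/port). Levels are lowercase, per
: // the serde rename above:
: //
: //   curl -X POST 'http://127.0.0.1:9898/v1/tracing/event' \
: //     -H 'Content-Type: application/json' -d '{"level": "info", "message": "hello"}'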
1910 :
1911 : /// Common functionality of all the HTTP API handlers.
1912 : ///
1913 : /// - Adds a tracing span to each request (by `request_span`)
1914 : /// - Logs the request depending on the request method (by `request_span`)
1915 : /// - Logs the response if it was not successful (by `request_span`)
1916 : /// - Shields the handler function from async cancellations. Hyper can drop the handler
1917 : /// Future if the connection to the client is lost, but most of the pageserver code is
1918 : /// not async cancellation safe. This converts the dropped future into a graceful cancellation
1919 : /// request with a CancellationToken.
1920 9176 : async fn api_handler<R, H>(request: Request<Body>, handler: H) -> Result<Response<Body>, ApiError>
1921 9176 : where
1922 9176 : R: std::future::Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
1923 9176 : H: FnOnce(Request<Body>, CancellationToken) -> R + Send + Sync + 'static,
1924 9176 : {
1925 9176 : // Spawn a new task to handle the request, to protect the handler from unexpected
1926 9176 : // async cancellations. Most pageserver functions are not async cancellation safe.
1927 9176 : // We arm a drop-guard, so that if Hyper drops the Future, we signal the task
1928 9176 : // with the cancellation token.
1929 9176 : let token = CancellationToken::new();
1930 9176 : let cancel_guard = token.clone().drop_guard();
1931 9176 : let result = request_span(request, move |r| async {
1932 9176 : let handle = tokio::spawn(
1933 9176 : async {
1934 9176 : let token_cloned = token.clone();
1935 8095588 : let result = handler(r, token).await;
1936 9169 : if token_cloned.is_cancelled() {
1937 : // dropguard has executed: we will never turn this result into response.
1938 : //
1939 : // at least temporarily, log with {:?}; these failures are rare enough, but
1940 : // they could hide difficult errors.
1941 2 : match &result {
1942 2 : Ok(response) => {
1943 2 : let status = response.status();
1944 2 : info!(%status, "Cancelled request finished successfully")
1945 : }
1946 0 : Err(e) => error!("Cancelled request finished with an error: {e:?}"),
1947 : }
1948 9167 : }
1949 : // the only logging for cancelled, panicked request handlers is the tracing_panic_hook,
1950 : // which should suffice.
1951 : //
1952 : // there is still a chance to lose the result due to a race between
1953 : // returning from here and the actual connection closing
1954 : // before the outer task gets to execute; that is left for #5815.
1955 9169 : result
1956 9176 : }
1957 9176 : .in_current_span(),
1958 9176 : );
1959 9176 :
1960 9244 : match handle.await {
1961 : // TODO: never actually return Err from here, always Ok(...) so that we can log
1962 : // spanned errors. Call api_error_handler instead and return appropriate Body.
1963 9167 : Ok(result) => result,
1964 0 : Err(e) => {
1965 0 : // The handler task panicked. We have a global panic handler that logs the
1966 0 : // panic with its backtrace, so no need to log that here. Only log a brief
1967 0 : // message to make it clear that we returned the error to the client.
1968 0 : error!("HTTP request handler task panicked: {e:#}");
1969 :
1970 : // Don't return an Error here, because then fallback error handler that was
1971 : // installed in make_router() will print the error. Instead, construct the
1972 : // HTTP error response and return that.
1973 0 : Ok(
1974 0 : ApiError::InternalServerError(anyhow!("HTTP request handler task panicked"))
1975 0 : .into_response(),
1976 0 : )
1977 : }
1978 : }
1979 9176 : })
1980 9244 : .await;
1981 :
1982 9167 : cancel_guard.disarm();
1983 9167 :
1984 9167 : result
1985 9167 : }
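:
: // A minimal standalone sketch of the drop-guard pattern used above, assuming
: // only the `tokio` and `tokio-util` crates (`do_work` is a hypothetical
: // placeholder):
: //
: //   use tokio_util::sync::CancellationToken;
: //
: //   async fn guarded() {
: //       let token = CancellationToken::new();
: //       let guard = token.clone().drop_guard();
: //       let handle = tokio::spawn(async move {
: //           tokio::select! {
: //               // If the caller's future is dropped, `guard` cancels `token`,
: //               // turning the drop into a graceful shutdown request instead of
: //               // an abort at an arbitrary await point.
: //               _ = token.cancelled() => { /* clean up */ }
: //               _ = do_work() => { /* completed normally */ }
: //           }
: //       });
: //       let _ = handle.await;
: //       // Normal exit: disarm so dropping `guard` does not cancel the token.
: //       guard.disarm();
: //   }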
1986 :
1987 : /// Like api_handler, but returns an error response if the server is built without
1988 : /// the 'testing' feature.
1989 835 : async fn testing_api_handler<R, H>(
1990 835 : desc: &str,
1991 835 : request: Request<Body>,
1992 835 : handler: H,
1993 835 : ) -> Result<Response<Body>, ApiError>
1994 835 : where
1995 835 : R: std::future::Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
1996 835 : H: FnOnce(Request<Body>, CancellationToken) -> R + Send + Sync + 'static,
1997 835 : {
1998 835 : if cfg!(feature = "testing") {
1999 835 : api_handler(request, handler).await
2000 : } else {
2001 0 : std::future::ready(Err(ApiError::BadRequest(anyhow!(
2002 0 : "Cannot {desc} because pageserver was compiled without testing APIs",
2003 0 : ))))
2004 0 : .await
2005 : }
2006 834 : }
2007 :
2008 604 : pub fn make_router(
2009 604 : state: Arc<State>,
2010 604 : launch_ts: &'static LaunchTimestamp,
2011 604 : auth: Option<Arc<SwappableJwtAuth>>,
2012 604 : ) -> anyhow::Result<RouterBuilder<hyper::Body, ApiError>> {
2013 604 : let spec = include_bytes!("openapi_spec.yml");
2014 604 : let mut router = attach_openapi_ui(endpoint::make_router(), spec, "/swagger.yml", "/v1/doc");
2015 604 : if auth.is_some() {
2016 120 : router = router.middleware(auth_middleware(|request| {
2017 120 : let state = get_state(request);
2018 120 : if state.allowlist_routes.contains(request.uri()) {
2019 34 : None
2020 : } else {
2021 86 : state.auth.as_deref()
2022 : }
2023 120 : }))
2024 593 : }
2025 :
2026 604 : router = router.middleware(
2027 604 : endpoint::add_response_header_middleware(
2028 604 : "PAGESERVER_LAUNCH_TIMESTAMP",
2029 604 : &launch_ts.to_string(),
2030 604 : )
2031 604 : .expect("construct launch timestamp header middleware"),
2032 604 : );
2033 604 :
2034 604 : Ok(router
2035 604 : .data(state)
2036 610 : .get("/v1/status", |r| api_handler(r, status_handler))
2037 604 : .put("/v1/failpoints", |r| {
2038 179 : testing_api_handler("manage failpoints", r, failpoints_handler)
2039 604 : })
2040 604 : .post("/v1/reload_auth_validation_keys", |r| {
2041 6 : api_handler(r, reload_auth_validation_keys_handler)
2042 604 : })
2043 604 : .get("/v1/tenant", |r| api_handler(r, tenant_list_handler))
2044 604 : .post("/v1/tenant", |r| api_handler(r, tenant_create_handler))
2045 604 : .get("/v1/tenant/:tenant_shard_id", |r| {
2046 549 : api_handler(r, tenant_status)
2047 604 : })
2048 604 : .delete("/v1/tenant/:tenant_shard_id", |r| {
2049 120 : api_handler(r, tenant_delete_handler)
2050 604 : })
2051 604 : .get("/v1/tenant/:tenant_shard_id/synthetic_size", |r| {
2052 53 : api_handler(r, tenant_size_handler)
2053 604 : })
2054 604 : .put("/v1/tenant/config", |r| {
2055 30 : api_handler(r, update_tenant_config_handler)
2056 604 : })
2057 604 : .get("/v1/tenant/:tenant_shard_id/config", |r| {
2058 45 : api_handler(r, get_tenant_config_handler)
2059 604 : })
2060 663 : .put("/v1/tenant/:tenant_shard_id/location_config", |r| {
2061 663 : api_handler(r, put_tenant_location_config_handler)
2062 663 : })
2063 604 : .get("/v1/location_config", |r| {
2064 5 : api_handler(r, list_location_config_handler)
2065 604 : })
2066 604 : .put(
2067 604 : "/v1/tenant/:tenant_shard_id/time_travel_remote_storage",
2068 604 : |r| api_handler(r, tenant_time_travel_remote_storage_handler),
2069 604 : )
2070 604 : .get("/v1/tenant/:tenant_shard_id/timeline", |r| {
2071 52 : api_handler(r, timeline_list_handler)
2072 604 : })
2073 897 : .post("/v1/tenant/:tenant_shard_id/timeline", |r| {
2074 897 : api_handler(r, timeline_create_handler)
2075 897 : })
2076 604 : .post("/v1/tenant/:tenant_id/attach", |r| {
2077 41 : api_handler(r, tenant_attach_handler)
2078 604 : })
2079 604 : .post("/v1/tenant/:tenant_id/detach", |r| {
2080 38 : api_handler(r, tenant_detach_handler)
2081 604 : })
2082 604 : .post("/v1/tenant/:tenant_shard_id/reset", |r| {
2083 2 : api_handler(r, tenant_reset_handler)
2084 604 : })
2085 604 : .post("/v1/tenant/:tenant_id/load", |r| {
2086 4 : api_handler(r, tenant_load_handler)
2087 604 : })
2088 604 : .post("/v1/tenant/:tenant_id/ignore", |r| {
2089 5 : api_handler(r, tenant_ignore_handler)
2090 604 : })
2091 604 : .post(
2092 604 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/preserve_initdb_archive",
2093 604 : |r| api_handler(r, timeline_preserve_initdb_handler),
2094 604 : )
2095 2141 : .get("/v1/tenant/:tenant_shard_id/timeline/:timeline_id", |r| {
2096 2141 : api_handler(r, timeline_detail_handler)
2097 2141 : })
2098 604 : .get(
2099 604 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/get_lsn_by_timestamp",
2100 604 : |r| api_handler(r, get_lsn_by_timestamp_handler),
2101 604 : )
2102 604 : .get(
2103 604 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/get_timestamp_of_lsn",
2104 604 : |r| api_handler(r, get_timestamp_of_lsn_handler),
2105 604 : )
2106 604 : .put(
2107 604 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/do_gc",
2108 604 : |r| api_handler(r, timeline_gc_handler),
2109 604 : )
2110 604 : .put(
2111 604 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/compact",
2112 604 : |r| testing_api_handler("run timeline compaction", r, timeline_compact_handler),
2113 604 : )
2114 604 : .put(
2115 604 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/checkpoint",
2116 604 : |r| testing_api_handler("run timeline checkpoint", r, timeline_checkpoint_handler),
2117 604 : )
2118 604 : .post(
2119 604 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/download_remote_layers",
2120 604 : |r| api_handler(r, timeline_download_remote_layers_handler_post),
2121 604 : )
2122 604 : .get(
2123 604 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/download_remote_layers",
2124 604 : |r| api_handler(r, timeline_download_remote_layers_handler_get),
2125 604 : )
2126 604 : .delete("/v1/tenant/:tenant_shard_id/timeline/:timeline_id", |r| {
2127 65 : api_handler(r, timeline_delete_handler)
2128 604 : })
2129 604 : .get(
2130 604 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/layer",
2131 604 : |r| api_handler(r, layer_map_info_handler),
2132 604 : )
2133 604 : .get(
2134 604 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/layer/:layer_file_name",
2135 604 : |r| api_handler(r, layer_download_handler),
2136 604 : )
2137 604 : .delete(
2138 604 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/layer/:layer_file_name",
2139 2252 : |r| api_handler(r, evict_timeline_layer_handler),
2140 604 : )
2141 604 : .post("/v1/tenant/:tenant_shard_id/heatmap_upload", |r| {
2142 7 : api_handler(r, secondary_upload_handler)
2143 604 : })
2144 604 : .put("/v1/disk_usage_eviction/run", |r| {
2145 13 : api_handler(r, disk_usage_eviction_run)
2146 604 : })
2147 604 : .put("/v1/deletion_queue/flush", |r| {
2148 20 : api_handler(r, deletion_queue_flush)
2149 604 : })
2150 604 : .post("/v1/tenant/:tenant_shard_id/secondary/download", |r| {
2151 6 : api_handler(r, secondary_download_handler)
2152 604 : })
2153 604 : .put("/v1/tenant/:tenant_shard_id/break", |r| {
2154 2 : testing_api_handler("set tenant state to broken", r, handle_tenant_break)
2155 604 : })
2156 604 : .get("/v1/panic", |r| api_handler(r, always_panic_handler))
2157 604 : .post("/v1/tracing/event", |r| {
2158 0 : testing_api_handler("emit a tracing event", r, post_tracing_event_handler)
2159 604 : })
2160 604 : .get(
2161 604 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/getpage",
2162 604 : |r| testing_api_handler("getpage@lsn", r, getpage_at_lsn_handler),
2163 604 : )
2164 604 : .get(
2165 604 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/keyspace",
2166 604 : |r| testing_api_handler("read out the keyspace", r, timeline_collect_keyspace),
2167 604 : )
2168 604 : .any(handler_404))
2169 604 : }