TLA Line data Source code
1 : //!
2 : //! Management HTTP API
3 : //!
4 : use std::collections::HashMap;
5 : use std::str::FromStr;
6 : use std::sync::Arc;
7 : use std::time::Duration;
8 :
9 : use anyhow::{anyhow, Context, Result};
10 : use enumset::EnumSet;
11 : use futures::TryFutureExt;
12 : use humantime::format_rfc3339;
13 : use hyper::header;
14 : use hyper::StatusCode;
15 : use hyper::{Body, Request, Response, Uri};
16 : use metrics::launch_timestamp::LaunchTimestamp;
17 : use pageserver_api::models::TenantDetails;
18 : use pageserver_api::models::{
19 : DownloadRemoteLayersTaskSpawnRequest, LocationConfigMode, TenantAttachRequest,
20 : TenantLoadRequest, TenantLocationConfigRequest,
21 : };
22 : use pageserver_api::shard::TenantShardId;
23 : use remote_storage::GenericRemoteStorage;
24 : use tenant_size_model::{SizeResult, StorageModel};
25 : use tokio_util::sync::CancellationToken;
26 : use tracing::*;
27 : use utils::auth::JwtAuth;
28 : use utils::failpoint_support::failpoints_handler;
29 : use utils::http::endpoint::request_span;
30 : use utils::http::json::json_request_or_empty_body;
31 : use utils::http::request::{get_request_param, must_get_query_param, parse_query_param};
32 :
33 : use crate::context::{DownloadBehavior, RequestContext};
34 : use crate::deletion_queue::DeletionQueueClient;
35 : use crate::metrics::{StorageTimeOperation, STORAGE_TIME_GLOBAL};
36 : use crate::pgdatadir_mapping::LsnForTimestamp;
37 : use crate::task_mgr::TaskKind;
38 : use crate::tenant::config::{LocationConf, TenantConfOpt};
39 : use crate::tenant::mgr::GetActiveTenantError;
40 : use crate::tenant::mgr::{
41 : GetTenantError, SetNewTenantConfigError, TenantManager, TenantMapError, TenantMapInsertError,
42 : TenantSlotError, TenantSlotUpsertError, TenantStateError,
43 : };
44 : use crate::tenant::secondary::SecondaryController;
45 : use crate::tenant::size::ModelInputs;
46 : use crate::tenant::storage_layer::LayerAccessStatsReset;
47 : use crate::tenant::timeline::CompactFlags;
48 : use crate::tenant::timeline::Timeline;
49 : use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError, TenantSharedResources};
50 : use crate::{config::PageServerConf, tenant::mgr};
51 : use crate::{disk_usage_eviction_task, tenant};
52 : use pageserver_api::models::{
53 : StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TenantInfo,
54 : TimelineCreateRequest, TimelineGcRequest, TimelineInfo,
55 : };
56 : use utils::{
57 : auth::SwappableJwtAuth,
58 : generation::Generation,
59 : http::{
60 : endpoint::{self, attach_openapi_ui, auth_middleware, check_permission_with},
61 : error::{ApiError, HttpErrorBody},
62 : json::{json_request, json_response},
63 : request::parse_request_param,
64 : RequestExt, RouterBuilder,
65 : },
66 : id::{TenantId, TimelineId},
67 : lsn::Lsn,
68 : };
69 :
70 : // For APIs that require an Active tenant, how long should we block waiting for that state?
71 : // This is not functionally necessary (clients will retry), but avoids generating a lot of
72 : // failed API calls while tenants are activating.
73 : const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(5000);
74 :
75 : pub struct State {
76 : conf: &'static PageServerConf,
77 : tenant_manager: Arc<TenantManager>,
78 : auth: Option<Arc<SwappableJwtAuth>>,
79 : allowlist_routes: Vec<Uri>,
80 : remote_storage: Option<GenericRemoteStorage>,
81 : broker_client: storage_broker::BrokerClientChannel,
82 : disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
83 : deletion_queue_client: DeletionQueueClient,
84 : secondary_controller: SecondaryController,
85 : }
86 :
87 : impl State {
88 : #[allow(clippy::too_many_arguments)]
89 CBC 557 : pub fn new(
90 557 : conf: &'static PageServerConf,
91 557 : tenant_manager: Arc<TenantManager>,
92 557 : auth: Option<Arc<SwappableJwtAuth>>,
93 557 : remote_storage: Option<GenericRemoteStorage>,
94 557 : broker_client: storage_broker::BrokerClientChannel,
95 557 : disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
96 557 : deletion_queue_client: DeletionQueueClient,
97 557 : secondary_controller: SecondaryController,
98 557 : ) -> anyhow::Result<Self> {
99 557 : let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml", "/metrics"]
100 557 : .iter()
101 2228 : .map(|v| v.parse().unwrap())
102 557 : .collect::<Vec<_>>();
103 557 : Ok(Self {
104 557 : conf,
105 557 : tenant_manager,
106 557 : auth,
107 557 : allowlist_routes,
108 557 : remote_storage,
109 557 : broker_client,
110 557 : disk_usage_eviction_state,
111 557 : deletion_queue_client,
112 557 : secondary_controller,
113 557 : })
114 557 : }
115 :
116 475 : fn tenant_resources(&self) -> TenantSharedResources {
117 475 : TenantSharedResources {
118 475 : broker_client: self.broker_client.clone(),
119 475 : remote_storage: self.remote_storage.clone(),
120 475 : deletion_queue_client: self.deletion_queue_client.clone(),
121 475 : }
122 475 : }
123 : }
124 :
125 : #[inline(always)]
126 2385 : fn get_state(request: &Request<Body>) -> &State {
127 2385 : request
128 2385 : .data::<Arc<State>>()
129 2385 : .expect("unknown state type")
130 2385 : .as_ref()
131 2385 : }
132 :
133 : #[inline(always)]
134 569 : fn get_config(request: &Request<Body>) -> &'static PageServerConf {
135 569 : get_state(request).conf
136 569 : }
137 :
138 : /// Check that the requester is authorized to operate on given tenant
139 8719 : fn check_permission(request: &Request<Body>, tenant_id: Option<TenantId>) -> Result<(), ApiError> {
140 8719 : check_permission_with(request, |claims| {
141 88 : crate::auth::check_permission(claims, tenant_id)
142 8719 : })
143 8719 : }
144 :
145 : impl From<PageReconstructError> for ApiError {
146 2 : fn from(pre: PageReconstructError) -> ApiError {
147 2 : match pre {
148 2 : PageReconstructError::Other(pre) => ApiError::InternalServerError(pre),
149 : PageReconstructError::Cancelled => {
150 UBC 0 : ApiError::InternalServerError(anyhow::anyhow!("request was cancelled"))
151 : }
152 : PageReconstructError::AncestorStopping(_) => {
153 0 : ApiError::ResourceUnavailable(format!("{pre}").into())
154 : }
155 0 : PageReconstructError::AncestorLsnTimeout(e) => ApiError::Timeout(format!("{e}").into()),
156 0 : PageReconstructError::WalRedo(pre) => ApiError::InternalServerError(pre),
157 : }
158 CBC 2 : }
159 : }
160 :
161 : impl From<TenantMapInsertError> for ApiError {
162 17 : fn from(tmie: TenantMapInsertError) -> ApiError {
163 17 : match tmie {
164 14 : TenantMapInsertError::SlotError(e) => e.into(),
165 UBC 0 : TenantMapInsertError::SlotUpsertError(e) => e.into(),
166 CBC 3 : TenantMapInsertError::Other(e) => ApiError::InternalServerError(e),
167 : }
168 17 : }
169 : }
170 :
171 : impl From<TenantSlotError> for ApiError {
172 18 : fn from(e: TenantSlotError) -> ApiError {
173 18 : use TenantSlotError::*;
174 18 : match e {
175 4 : NotFound(tenant_id) => {
176 4 : ApiError::NotFound(anyhow::anyhow!("NotFound: tenant {tenant_id}").into())
177 : }
178 14 : e @ (AlreadyExists(_, _) | Conflict(_)) => ApiError::Conflict(format!("{e}")),
179 : InProgress => {
180 UBC 0 : ApiError::ResourceUnavailable("Tenant is being modified concurrently".into())
181 : }
182 0 : MapState(e) => e.into(),
183 : }
184 CBC 18 : }
185 : }
186 :
187 : impl From<TenantSlotUpsertError> for ApiError {
188 UBC 0 : fn from(e: TenantSlotUpsertError) -> ApiError {
189 0 : use TenantSlotUpsertError::*;
190 0 : match e {
191 0 : InternalError(e) => ApiError::InternalServerError(anyhow::anyhow!("{e}")),
192 0 : MapState(e) => e.into(),
193 : }
194 0 : }
195 : }
196 :
197 : impl From<TenantMapError> for ApiError {
198 0 : fn from(e: TenantMapError) -> ApiError {
199 0 : use TenantMapError::*;
200 0 : match e {
201 : StillInitializing | ShuttingDown => {
202 0 : ApiError::ResourceUnavailable(format!("{e}").into())
203 0 : }
204 0 : }
205 0 : }
206 : }
207 :
208 : impl From<TenantStateError> for ApiError {
209 CBC 3 : fn from(tse: TenantStateError) -> ApiError {
210 3 : match tse {
211 : TenantStateError::IsStopping(_) => {
212 UBC 0 : ApiError::ResourceUnavailable("Tenant is stopping".into())
213 : }
214 CBC 3 : TenantStateError::SlotError(e) => e.into(),
215 UBC 0 : TenantStateError::SlotUpsertError(e) => e.into(),
216 0 : TenantStateError::Other(e) => ApiError::InternalServerError(anyhow!(e)),
217 : }
218 CBC 3 : }
219 : }
220 :
221 : impl From<GetTenantError> for ApiError {
222 71 : fn from(tse: GetTenantError) -> ApiError {
223 71 : match tse {
224 65 : GetTenantError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid).into()),
225 UBC 0 : GetTenantError::Broken(reason) => {
226 0 : ApiError::InternalServerError(anyhow!("tenant is broken: {}", reason))
227 : }
228 : GetTenantError::NotActive(_) => {
229 : // Why is this not `ApiError::NotFound`?
230 : // Because we must be careful to never return 404 for a tenant if it does
231 : // in fact exist locally. If we did, the caller could draw the conclusion
232 : // that it can attach the tenant to another PS and we'd be in split-brain.
233 : //
234 : // (We can produce this variant only in `mgr::get_tenant(..., active=true)` calls).
235 CBC 6 : ApiError::ResourceUnavailable("Tenant not yet active".into())
236 : }
237 UBC 0 : GetTenantError::MapState(e) => ApiError::ResourceUnavailable(format!("{e}").into()),
238 : }
239 CBC 71 : }
240 : }
241 :
242 : impl From<GetActiveTenantError> for ApiError {
243 UBC 0 : fn from(e: GetActiveTenantError) -> ApiError {
244 0 : match e {
245 0 : GetActiveTenantError::WillNotBecomeActive(_) => ApiError::Conflict(format!("{}", e)),
246 0 : GetActiveTenantError::Cancelled => ApiError::ShuttingDown,
247 0 : GetActiveTenantError::NotFound(gte) => gte.into(),
248 : GetActiveTenantError::WaitForActiveTimeout { .. } => {
249 0 : ApiError::ResourceUnavailable(format!("{}", e).into())
250 : }
251 : }
252 0 : }
253 : }
254 :
255 : impl From<SetNewTenantConfigError> for ApiError {
256 0 : fn from(e: SetNewTenantConfigError) -> ApiError {
257 0 : match e {
258 0 : SetNewTenantConfigError::GetTenant(tid) => {
259 0 : ApiError::NotFound(anyhow!("tenant {}", tid).into())
260 : }
261 0 : e @ SetNewTenantConfigError::Persist(_) => {
262 0 : ApiError::InternalServerError(anyhow::Error::new(e))
263 : }
264 : }
265 0 : }
266 : }
267 :
268 : impl From<crate::tenant::DeleteTimelineError> for ApiError {
269 CBC 9 : fn from(value: crate::tenant::DeleteTimelineError) -> Self {
270 9 : use crate::tenant::DeleteTimelineError::*;
271 9 : match value {
272 1 : NotFound => ApiError::NotFound(anyhow::anyhow!("timeline not found").into()),
273 1 : HasChildren(children) => ApiError::PreconditionFailed(
274 1 : format!("Cannot delete timeline which has child timelines: {children:?}")
275 1 : .into_boxed_str(),
276 1 : ),
277 3 : a @ AlreadyInProgress(_) => ApiError::Conflict(a.to_string()),
278 4 : Other(e) => ApiError::InternalServerError(e),
279 : }
280 9 : }
281 : }
282 :
283 : impl From<crate::tenant::mgr::DeleteTimelineError> for ApiError {
284 UBC 0 : fn from(value: crate::tenant::mgr::DeleteTimelineError) -> Self {
285 : use crate::tenant::mgr::DeleteTimelineError::*;
286 0 : match value {
287 : // Report Precondition failed so client can distinguish between
288 : // "tenant is missing" case from "timeline is missing"
289 0 : Tenant(GetTenantError::NotFound(..)) => ApiError::PreconditionFailed(
290 0 : "Requested tenant is missing".to_owned().into_boxed_str(),
291 0 : ),
292 0 : Tenant(t) => ApiError::from(t),
293 0 : Timeline(t) => ApiError::from(t),
294 : }
295 0 : }
296 : }
297 :
298 : impl From<crate::tenant::delete::DeleteTenantError> for ApiError {
299 CBC 19 : fn from(value: crate::tenant::delete::DeleteTenantError) -> Self {
300 19 : use crate::tenant::delete::DeleteTenantError::*;
301 19 : match value {
302 UBC 0 : Get(g) => ApiError::from(g),
303 0 : e @ AlreadyInProgress => ApiError::Conflict(e.to_string()),
304 0 : Timeline(t) => ApiError::from(t),
305 0 : NotAttached => ApiError::NotFound(anyhow::anyhow!("Tenant is not attached").into()),
306 CBC 1 : SlotError(e) => e.into(),
307 UBC 0 : SlotUpsertError(e) => e.into(),
308 CBC 16 : Other(o) => ApiError::InternalServerError(o),
309 2 : e @ InvalidState(_) => ApiError::PreconditionFailed(e.to_string().into_boxed_str()),
310 UBC 0 : Cancelled => ApiError::ShuttingDown,
311 : }
312 CBC 19 : }
313 : }
314 :
315 : // Helper function to construct a TimelineInfo struct for a timeline
316 2234 : async fn build_timeline_info(
317 2234 : timeline: &Arc<Timeline>,
318 2234 : include_non_incremental_logical_size: bool,
319 2234 : ctx: &RequestContext,
320 2234 : ) -> anyhow::Result<TimelineInfo> {
321 2234 : crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
322 :
323 2234 : let mut info = build_timeline_info_common(timeline, ctx).await?;
324 2234 : if include_non_incremental_logical_size {
325 : // XXX we should be using spawn_ondemand_logical_size_calculation here.
326 : // Otherwise, if someone deletes the timeline / detaches the tenant while
327 : // we're executing this function, we will outlive the timeline on-disk state.
328 : info.current_logical_size_non_incremental = Some(
329 35 : timeline
330 35 : .get_current_logical_size_non_incremental(info.last_record_lsn, ctx)
331 2210 : .await?,
332 : );
333 2199 : }
334 2234 : Ok(info)
335 2234 : }
336 :
337 2993 : async fn build_timeline_info_common(
338 2993 : timeline: &Arc<Timeline>,
339 2993 : ctx: &RequestContext,
340 2993 : ) -> anyhow::Result<TimelineInfo> {
341 2993 : crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
342 2993 : let initdb_lsn = timeline.initdb_lsn;
343 2993 : let last_record_lsn = timeline.get_last_record_lsn();
344 2993 : let (wal_source_connstr, last_received_msg_lsn, last_received_msg_ts) = {
345 2993 : let guard = timeline.last_received_wal.lock().unwrap();
346 2993 : if let Some(info) = guard.as_ref() {
347 1853 : (
348 1853 : Some(format!("{:?}", info.wal_source_connconf)), // Password is hidden, but it's for statistics only.
349 1853 : Some(info.last_received_msg_lsn),
350 1853 : Some(info.last_received_msg_ts),
351 1853 : )
352 : } else {
353 1140 : (None, None, None)
354 : }
355 : };
356 :
357 2993 : let ancestor_timeline_id = timeline.get_ancestor_timeline_id();
358 2993 : let ancestor_lsn = match timeline.get_ancestor_lsn() {
359 2271 : Lsn(0) => None,
360 722 : lsn @ Lsn(_) => Some(lsn),
361 : };
362 2993 : let current_logical_size =
363 2993 : timeline.get_current_logical_size(tenant::timeline::GetLogicalSizePriority::User, ctx);
364 2993 : let current_physical_size = Some(timeline.layer_size_sum().await);
365 2993 : let state = timeline.current_state();
366 2993 : let remote_consistent_lsn_projected = timeline
367 2993 : .get_remote_consistent_lsn_projected()
368 2993 : .unwrap_or(Lsn(0));
369 2993 : let remote_consistent_lsn_visible = timeline
370 2993 : .get_remote_consistent_lsn_visible()
371 2993 : .unwrap_or(Lsn(0));
372 2993 :
373 2993 : let walreceiver_status = timeline.walreceiver_status();
374 :
375 2993 : let info = TimelineInfo {
376 2993 : tenant_id: timeline.tenant_shard_id,
377 2993 : timeline_id: timeline.timeline_id,
378 2993 : ancestor_timeline_id,
379 2993 : ancestor_lsn,
380 2993 : disk_consistent_lsn: timeline.get_disk_consistent_lsn(),
381 2993 : remote_consistent_lsn: remote_consistent_lsn_projected,
382 2993 : remote_consistent_lsn_visible,
383 2993 : initdb_lsn,
384 2993 : last_record_lsn,
385 2993 : prev_record_lsn: Some(timeline.get_prev_record_lsn()),
386 2993 : latest_gc_cutoff_lsn: *timeline.get_latest_gc_cutoff_lsn(),
387 2993 : current_logical_size: current_logical_size.size_dont_care_about_accuracy(),
388 2993 : current_logical_size_is_accurate: match current_logical_size.accuracy() {
389 427 : tenant::timeline::logical_size::Accuracy::Approximate => false,
390 2566 : tenant::timeline::logical_size::Accuracy::Exact => true,
391 : },
392 2993 : current_physical_size,
393 2993 : current_logical_size_non_incremental: None,
394 2993 : timeline_dir_layer_file_size_sum: None,
395 2993 : wal_source_connstr,
396 2993 : last_received_msg_lsn,
397 2993 : last_received_msg_ts,
398 2993 : pg_version: timeline.pg_version,
399 2993 :
400 2993 : state,
401 2993 :
402 2993 : walreceiver_status,
403 2993 : };
404 2993 : Ok(info)
405 2993 : }
406 :
407 : // healthcheck handler
408 563 : async fn status_handler(
409 563 : request: Request<Body>,
410 563 : _cancel: CancellationToken,
411 563 : ) -> Result<Response<Body>, ApiError> {
412 563 : check_permission(&request, None)?;
413 563 : let config = get_config(&request);
414 563 : json_response(StatusCode::OK, StatusResponse { id: config.id })
415 563 : }
416 :
417 6 : async fn reload_auth_validation_keys_handler(
418 6 : request: Request<Body>,
419 6 : _cancel: CancellationToken,
420 6 : ) -> Result<Response<Body>, ApiError> {
421 6 : check_permission(&request, None)?;
422 6 : let config = get_config(&request);
423 6 : let state = get_state(&request);
424 6 : let Some(shared_auth) = &state.auth else {
425 UBC 0 : return json_response(StatusCode::BAD_REQUEST, ());
426 : };
427 : // unwrap is ok because check is performed when creating config, so path is set and exists
428 CBC 6 : let key_path = config.auth_validation_public_key_path.as_ref().unwrap();
429 6 : info!("Reloading public key(s) for verifying JWT tokens from {key_path:?}");
430 :
431 6 : match JwtAuth::from_key_path(key_path) {
432 6 : Ok(new_auth) => {
433 6 : shared_auth.swap(new_auth);
434 6 : json_response(StatusCode::OK, ())
435 : }
436 UBC 0 : Err(e) => {
437 0 : warn!("Error reloading public keys from {key_path:?}: {e:}");
438 0 : json_response(StatusCode::INTERNAL_SERVER_ERROR, ())
439 : }
440 : }
441 CBC 6 : }
442 :
443 784 : async fn timeline_create_handler(
444 784 : mut request: Request<Body>,
445 784 : _cancel: CancellationToken,
446 784 : ) -> Result<Response<Body>, ApiError> {
447 784 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
448 784 : let request_data: TimelineCreateRequest = json_request(&mut request).await?;
449 784 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
450 :
451 783 : let new_timeline_id = request_data.new_timeline_id;
452 783 :
453 783 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Error);
454 783 :
455 783 : let state = get_state(&request);
456 :
457 783 : async {
458 783 : let tenant = state.tenant_manager.get_attached_tenant_shard(tenant_shard_id, false)?;
459 :
460 783 : tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
461 :
462 783 : match tenant.create_timeline(
463 783 : new_timeline_id,
464 783 : request_data.ancestor_timeline_id.map(TimelineId::from),
465 783 : request_data.ancestor_start_lsn,
466 783 : request_data.pg_version.unwrap_or(crate::DEFAULT_PG_VERSION),
467 783 : request_data.existing_initdb_timeline_id,
468 783 : state.broker_client.clone(),
469 783 : &ctx,
470 783 : )
471 6922135 : .await {
472 759 : Ok(new_timeline) => {
473 : // Created. Construct a TimelineInfo for it.
474 759 : let timeline_info = build_timeline_info_common(&new_timeline, &ctx)
475 UBC 0 : .await
476 CBC 759 : .map_err(ApiError::InternalServerError)?;
477 759 : json_response(StatusCode::CREATED, timeline_info)
478 : }
479 : Err(tenant::CreateTimelineError::Conflict | tenant::CreateTimelineError::AlreadyCreating) => {
480 UBC 0 : json_response(StatusCode::CONFLICT, ())
481 : }
482 CBC 8 : Err(tenant::CreateTimelineError::AncestorLsn(err)) => {
483 8 : json_response(StatusCode::NOT_ACCEPTABLE, HttpErrorBody::from_msg(
484 8 : format!("{err:#}")
485 8 : ))
486 : }
487 6 : Err(e @ tenant::CreateTimelineError::AncestorNotActive) => {
488 6 : json_response(StatusCode::SERVICE_UNAVAILABLE, HttpErrorBody::from_msg(e.to_string()))
489 : }
490 : Err(tenant::CreateTimelineError::ShuttingDown) => {
491 UBC 0 : json_response(StatusCode::SERVICE_UNAVAILABLE,HttpErrorBody::from_msg("tenant shutting down".to_string()))
492 : }
493 CBC 4 : Err(tenant::CreateTimelineError::Other(err)) => Err(ApiError::InternalServerError(err)),
494 : }
495 777 : }
496 : .instrument(info_span!("timeline_create",
497 : tenant_id = %tenant_shard_id.tenant_id,
498 783 : shard = %tenant_shard_id.shard_slug(),
499 : timeline_id = %new_timeline_id, lsn=?request_data.ancestor_start_lsn, pg_version=?request_data.pg_version))
500 6922136 : .await
501 778 : }
502 :
503 45 : async fn timeline_list_handler(
504 45 : request: Request<Body>,
505 45 : _cancel: CancellationToken,
506 45 : ) -> Result<Response<Body>, ApiError> {
507 45 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
508 45 : let include_non_incremental_logical_size: Option<bool> =
509 45 : parse_query_param(&request, "include-non-incremental-logical-size")?;
510 45 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
511 :
512 45 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
513 :
514 45 : let response_data = async {
515 45 : let tenant = mgr::get_tenant(tenant_shard_id, true)?;
516 45 : let timelines = tenant.list_timelines();
517 45 :
518 45 : let mut response_data = Vec::with_capacity(timelines.len());
519 123 : for timeline in timelines {
520 78 : let timeline_info = build_timeline_info(
521 78 : &timeline,
522 78 : include_non_incremental_logical_size.unwrap_or(false),
523 78 : &ctx,
524 78 : )
525 78 : .instrument(info_span!("build_timeline_info", timeline_id = %timeline.timeline_id))
526 UBC 0 : .await
527 CBC 78 : .context("Failed to convert tenant timeline {timeline_id} into the local one: {e:?}")
528 78 : .map_err(ApiError::InternalServerError)?;
529 :
530 78 : response_data.push(timeline_info);
531 : }
532 45 : Ok::<Vec<TimelineInfo>, ApiError>(response_data)
533 45 : }
534 : .instrument(info_span!("timeline_list",
535 : tenant_id = %tenant_shard_id.tenant_id,
536 45 : shard_id = %tenant_shard_id.shard_slug()))
537 UBC 0 : .await?;
538 :
539 CBC 45 : json_response(StatusCode::OK, response_data)
540 45 : }
541 :
542 2215 : async fn timeline_detail_handler(
543 2215 : request: Request<Body>,
544 2215 : _cancel: CancellationToken,
545 2215 : ) -> Result<Response<Body>, ApiError> {
546 2215 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
547 2215 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
548 2215 : let include_non_incremental_logical_size: Option<bool> =
549 2215 : parse_query_param(&request, "include-non-incremental-logical-size")?;
550 2215 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
551 :
552 : // Logical size calculation needs downloading.
553 2215 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
554 :
555 2215 : let timeline_info = async {
556 2215 : let tenant = mgr::get_tenant(tenant_shard_id, true)?;
557 :
558 2205 : let timeline = tenant
559 2205 : .get_timeline(timeline_id, false)
560 2205 : .map_err(|e| ApiError::NotFound(e.into()))?;
561 :
562 2156 : let timeline_info = build_timeline_info(
563 2156 : &timeline,
564 2156 : include_non_incremental_logical_size.unwrap_or(false),
565 2156 : &ctx,
566 2156 : )
567 2228 : .await
568 2156 : .context("get local timeline info")
569 2156 : .map_err(ApiError::InternalServerError)?;
570 :
571 2156 : Ok::<_, ApiError>(timeline_info)
572 2215 : }
573 : .instrument(info_span!("timeline_detail",
574 : tenant_id = %tenant_shard_id.tenant_id,
575 2215 : shard_id = %tenant_shard_id.shard_slug(),
576 : %timeline_id))
577 2228 : .await?;
578 :
579 2156 : json_response(StatusCode::OK, timeline_info)
580 2215 : }
581 :
582 13 : async fn get_lsn_by_timestamp_handler(
583 13 : request: Request<Body>,
584 13 : cancel: CancellationToken,
585 13 : ) -> Result<Response<Body>, ApiError> {
586 13 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
587 13 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
588 :
589 13 : if !tenant_shard_id.is_zero() {
590 : // Requires SLRU contents, which are only stored on shard zero
591 UBC 0 : return Err(ApiError::BadRequest(anyhow!(
592 0 : "Size calculations are only available on shard zero"
593 0 : )));
594 CBC 13 : }
595 :
596 13 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
597 13 : let timestamp_raw = must_get_query_param(&request, "timestamp")?;
598 13 : let timestamp = humantime::parse_rfc3339(×tamp_raw)
599 13 : .with_context(|| format!("Invalid time: {:?}", timestamp_raw))
600 13 : .map_err(ApiError::BadRequest)?;
601 13 : let timestamp_pg = postgres_ffi::to_pg_timestamp(timestamp);
602 13 :
603 13 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
604 13 : let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?;
605 13 : let result = timeline
606 13 : .find_lsn_for_timestamp(timestamp_pg, &cancel, &ctx)
607 1315 : .await?;
608 13 : #[derive(serde::Serialize)]
609 : struct Result {
610 : lsn: Lsn,
611 : kind: &'static str,
612 : }
613 13 : let (lsn, kind) = match result {
614 10 : LsnForTimestamp::Present(lsn) => (lsn, "present"),
615 1 : LsnForTimestamp::Future(lsn) => (lsn, "future"),
616 2 : LsnForTimestamp::Past(lsn) => (lsn, "past"),
617 UBC 0 : LsnForTimestamp::NoData(lsn) => (lsn, "nodata"),
618 : };
619 CBC 13 : json_response(StatusCode::OK, Result { lsn, kind })
620 13 : }
621 :
622 12 : async fn get_timestamp_of_lsn_handler(
623 12 : request: Request<Body>,
624 12 : _cancel: CancellationToken,
625 12 : ) -> Result<Response<Body>, ApiError> {
626 12 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
627 12 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
628 :
629 12 : if !tenant_shard_id.is_zero() {
630 : // Requires SLRU contents, which are only stored on shard zero
631 UBC 0 : return Err(ApiError::BadRequest(anyhow!(
632 0 : "Size calculations are only available on shard zero"
633 0 : )));
634 CBC 12 : }
635 :
636 12 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
637 :
638 12 : let lsn_str = must_get_query_param(&request, "lsn")?;
639 12 : let lsn = Lsn::from_str(&lsn_str)
640 12 : .with_context(|| format!("Invalid LSN: {lsn_str:?}"))
641 12 : .map_err(ApiError::BadRequest)?;
642 :
643 12 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
644 12 : let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?;
645 80 : let result = timeline.get_timestamp_for_lsn(lsn, &ctx).await?;
646 :
647 10 : match result {
648 10 : Some(time) => {
649 10 : let time = format_rfc3339(postgres_ffi::from_pg_timestamp(time)).to_string();
650 10 : json_response(StatusCode::OK, time)
651 : }
652 UBC 0 : None => json_response(StatusCode::NOT_FOUND, ()),
653 : }
654 CBC 12 : }
655 :
656 47 : async fn tenant_attach_handler(
657 47 : mut request: Request<Body>,
658 47 : _cancel: CancellationToken,
659 47 : ) -> Result<Response<Body>, ApiError> {
660 47 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
661 47 : check_permission(&request, Some(tenant_id))?;
662 :
663 47 : let maybe_body: Option<TenantAttachRequest> = json_request_or_empty_body(&mut request).await?;
664 44 : let tenant_conf = match &maybe_body {
665 44 : Some(request) => TenantConfOpt::try_from(&*request.config).map_err(ApiError::BadRequest)?,
666 UBC 0 : None => TenantConfOpt::default(),
667 : };
668 :
669 CBC 44 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
670 :
671 44 : info!("Handling tenant attach {tenant_id}");
672 :
673 44 : let state = get_state(&request);
674 :
675 44 : let generation = get_request_generation(state, maybe_body.as_ref().and_then(|r| r.generation))?;
676 :
677 44 : if state.remote_storage.is_none() {
678 UBC 0 : return Err(ApiError::BadRequest(anyhow!(
679 0 : "attach_tenant is not possible because pageserver was configured without remote storage"
680 0 : )));
681 CBC 44 : }
682 44 :
683 44 : mgr::attach_tenant(
684 44 : state.conf,
685 44 : tenant_id,
686 44 : generation,
687 44 : tenant_conf,
688 44 : state.tenant_resources(),
689 44 : &ctx,
690 44 : )
691 44 : .instrument(info_span!("tenant_attach", %tenant_id))
692 58 : .await?;
693 :
694 29 : json_response(StatusCode::ACCEPTED, ())
695 47 : }
696 :
697 61 : async fn timeline_delete_handler(
698 61 : request: Request<Body>,
699 61 : _cancel: CancellationToken,
700 61 : ) -> Result<Response<Body>, ApiError> {
701 61 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
702 61 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
703 61 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
704 :
705 61 : let state = get_state(&request);
706 :
707 61 : let tenant = state
708 61 : .tenant_manager
709 61 : .get_attached_tenant_shard(tenant_shard_id, false)
710 61 : .map_err(|e| {
711 1 : match e {
712 : // GetTenantError has a built-in conversion to ApiError, but in this context we don't
713 : // want to treat missing tenants as 404, to avoid ambiguity with successful deletions.
714 1 : GetTenantError::NotFound(_) => ApiError::PreconditionFailed(
715 1 : "Requested tenant is missing".to_string().into_boxed_str(),
716 1 : ),
717 UBC 0 : e => e.into(),
718 : }
719 CBC 61 : })?;
720 60 : tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
721 60 : tenant.delete_timeline(timeline_id).instrument(info_span!("timeline_delete", tenant_id=%tenant_shard_id.tenant_id, shard=%tenant_shard_id.shard_slug(), %timeline_id))
722 346 : .await?;
723 :
724 51 : json_response(StatusCode::ACCEPTED, ())
725 61 : }
726 :
727 32 : async fn tenant_detach_handler(
728 32 : request: Request<Body>,
729 32 : _cancel: CancellationToken,
730 32 : ) -> Result<Response<Body>, ApiError> {
731 32 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
732 32 : check_permission(&request, Some(tenant_id))?;
733 32 : let detach_ignored: Option<bool> = parse_query_param(&request, "detach_ignored")?;
734 :
735 : // This is a legacy API (`/location_conf` is the replacement). It only supports unsharded tenants
736 32 : let tenant_shard_id = TenantShardId::unsharded(tenant_id);
737 32 :
738 32 : let state = get_state(&request);
739 32 : let conf = state.conf;
740 32 : mgr::detach_tenant(
741 32 : conf,
742 32 : tenant_shard_id,
743 32 : detach_ignored.unwrap_or(false),
744 32 : &state.deletion_queue_client,
745 32 : )
746 32 : .instrument(info_span!("tenant_detach", %tenant_id))
747 151 : .await?;
748 :
749 29 : json_response(StatusCode::OK, ())
750 32 : }
751 :
752 2 : async fn tenant_reset_handler(
753 2 : request: Request<Body>,
754 2 : _cancel: CancellationToken,
755 2 : ) -> Result<Response<Body>, ApiError> {
756 2 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
757 2 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
758 :
759 2 : let drop_cache: Option<bool> = parse_query_param(&request, "drop_cache")?;
760 :
761 2 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
762 2 : let state = get_state(&request);
763 2 : state
764 2 : .tenant_manager
765 2 : .reset_tenant(tenant_shard_id, drop_cache.unwrap_or(false), ctx)
766 6 : .await
767 2 : .map_err(ApiError::InternalServerError)?;
768 :
769 2 : json_response(StatusCode::OK, ())
770 2 : }
771 :
772 5 : async fn tenant_load_handler(
773 5 : mut request: Request<Body>,
774 5 : _cancel: CancellationToken,
775 5 : ) -> Result<Response<Body>, ApiError> {
776 5 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
777 5 : check_permission(&request, Some(tenant_id))?;
778 :
779 5 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
780 :
781 5 : let maybe_body: Option<TenantLoadRequest> = json_request_or_empty_body(&mut request).await?;
782 :
783 5 : let state = get_state(&request);
784 :
785 : // The /load request is only usable when control_plane_api is not set. Once it is set, callers
786 : // should always use /attach instead.
787 5 : let generation = get_request_generation(state, maybe_body.as_ref().and_then(|r| r.generation))?;
788 :
789 5 : mgr::load_tenant(
790 5 : state.conf,
791 5 : tenant_id,
792 5 : generation,
793 5 : state.broker_client.clone(),
794 5 : state.remote_storage.clone(),
795 5 : state.deletion_queue_client.clone(),
796 5 : &ctx,
797 5 : )
798 5 : .instrument(info_span!("load", %tenant_id))
799 8 : .await?;
800 :
801 4 : json_response(StatusCode::ACCEPTED, ())
802 5 : }
803 :
804 6 : async fn tenant_ignore_handler(
805 6 : request: Request<Body>,
806 6 : _cancel: CancellationToken,
807 6 : ) -> Result<Response<Body>, ApiError> {
808 6 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
809 6 : check_permission(&request, Some(tenant_id))?;
810 :
811 6 : let state = get_state(&request);
812 6 : let conf = state.conf;
813 6 : mgr::ignore_tenant(conf, tenant_id)
814 6 : .instrument(info_span!("ignore_tenant", %tenant_id))
815 21 : .await?;
816 :
817 6 : json_response(StatusCode::OK, ())
818 6 : }
819 :
820 165 : async fn tenant_list_handler(
821 165 : request: Request<Body>,
822 165 : _cancel: CancellationToken,
823 165 : ) -> Result<Response<Body>, ApiError> {
824 165 : check_permission(&request, None)?;
825 :
826 165 : let response_data = mgr::list_tenants()
827 165 : .instrument(info_span!("tenant_list"))
828 UBC 0 : .await
829 CBC 165 : .map_err(|_| {
830 UBC 0 : ApiError::ResourceUnavailable("Tenant map is initializing or shutting down".into())
831 CBC 165 : })?
832 165 : .iter()
833 165 : .map(|(id, state)| TenantInfo {
834 160 : id: *id,
835 160 : state: state.clone(),
836 160 : current_physical_size: None,
837 160 : attachment_status: state.attachment_status(),
838 165 : })
839 165 : .collect::<Vec<TenantInfo>>();
840 165 :
841 165 : json_response(StatusCode::OK, response_data)
842 165 : }
843 :
844 527 : async fn tenant_status(
845 527 : request: Request<Body>,
846 527 : _cancel: CancellationToken,
847 527 : ) -> Result<Response<Body>, ApiError> {
848 527 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
849 527 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
850 :
851 527 : let tenant_info = async {
852 527 : let tenant = mgr::get_tenant(tenant_shard_id, false)?;
853 :
854 : // Calculate total physical size of all timelines
855 466 : let mut current_physical_size = 0;
856 466 : for timeline in tenant.list_timelines().iter() {
857 450 : current_physical_size += timeline.layer_size_sum().await;
858 : }
859 :
860 466 : let state = tenant.current_state();
861 466 : Result::<_, ApiError>::Ok(TenantDetails {
862 466 : tenant_info: TenantInfo {
863 466 : id: tenant_shard_id,
864 466 : state: state.clone(),
865 466 : current_physical_size: Some(current_physical_size),
866 466 : attachment_status: state.attachment_status(),
867 466 : },
868 466 : timelines: tenant.list_timeline_ids(),
869 466 : })
870 527 : }
871 : .instrument(info_span!("tenant_status_handler",
872 : tenant_id = %tenant_shard_id.tenant_id,
873 527 : shard_id = %tenant_shard_id.shard_slug()))
874 61 : .await?;
875 :
876 466 : json_response(StatusCode::OK, tenant_info)
877 527 : }
878 :
879 91 : async fn tenant_delete_handler(
880 91 : request: Request<Body>,
881 91 : _cancel: CancellationToken,
882 91 : ) -> Result<Response<Body>, ApiError> {
883 : // TODO openapi spec
884 91 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
885 91 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
886 :
887 91 : let state = get_state(&request);
888 91 :
889 91 : state
890 91 : .tenant_manager
891 91 : .delete_tenant(tenant_shard_id, ACTIVE_TENANT_TIMEOUT)
892 : .instrument(info_span!("tenant_delete_handler",
893 : tenant_id = %tenant_shard_id.tenant_id,
894 91 : shard = %tenant_shard_id.shard_slug()
895 : ))
896 448 : .await?;
897 :
898 72 : json_response(StatusCode::ACCEPTED, ())
899 91 : }
900 :
901 : /// HTTP endpoint to query the current tenant_size of a tenant.
902 : ///
903 : /// This is not used by consumption metrics under [`crate::consumption_metrics`], but can be used
904 : /// to debug any of the calculations. Requires `tenant_id` request parameter, supports
905 : /// `inputs_only=true|false` (default false) which supports debugging failure to calculate model
906 : /// values.
907 : ///
908 : /// 'retention_period' query parameter overrides the cutoff that is used to calculate the size
909 : /// (only if it is shorter than the real cutoff).
910 : ///
911 : /// Note: we don't update the cached size and prometheus metric here.
912 : /// The retention period might be different, and it's nice to have a method to just calculate it
913 : /// without modifying anything anyway.
914 55 : async fn tenant_size_handler(
915 55 : request: Request<Body>,
916 55 : cancel: CancellationToken,
917 55 : ) -> Result<Response<Body>, ApiError> {
918 55 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
919 55 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
920 55 : let inputs_only: Option<bool> = parse_query_param(&request, "inputs_only")?;
921 55 : let retention_period: Option<u64> = parse_query_param(&request, "retention_period")?;
922 55 : let headers = request.headers();
923 55 :
924 55 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
925 55 : let tenant = mgr::get_tenant(tenant_shard_id, true)?;
926 :
927 55 : if !tenant_shard_id.is_zero() {
928 UBC 0 : return Err(ApiError::BadRequest(anyhow!(
929 0 : "Size calculations are only available on shard zero"
930 0 : )));
931 CBC 55 : }
932 :
933 : // this can be long operation
934 55 : let inputs = tenant
935 55 : .gather_size_inputs(
936 55 : retention_period,
937 55 : LogicalSizeCalculationCause::TenantSizeHandler,
938 55 : &cancel,
939 55 : &ctx,
940 55 : )
941 45 : .await
942 55 : .map_err(ApiError::InternalServerError)?;
943 :
944 55 : let mut sizes = None;
945 55 : let accepts_html = headers
946 55 : .get(header::ACCEPT)
947 55 : .map(|v| v == "text/html")
948 55 : .unwrap_or_default();
949 55 : if !inputs_only.unwrap_or(false) {
950 55 : let storage_model = inputs
951 55 : .calculate_model()
952 55 : .map_err(ApiError::InternalServerError)?;
953 55 : let size = storage_model.calculate();
954 55 :
955 55 : // If request header expects html, return html
956 55 : if accepts_html {
957 19 : return synthetic_size_html_response(inputs, storage_model, size);
958 36 : }
959 36 : sizes = Some(size);
960 UBC 0 : } else if accepts_html {
961 0 : return Err(ApiError::BadRequest(anyhow!(
962 0 : "inputs_only parameter is incompatible with html output request"
963 0 : )));
964 0 : }
965 :
966 : /// The type resides in the pageserver not to expose `ModelInputs`.
967 CBC 36 : #[derive(serde::Serialize)]
968 : struct TenantHistorySize {
969 : id: TenantId,
970 : /// Size is a mixture of WAL and logical size, so the unit is bytes.
971 : ///
972 : /// Will be none if `?inputs_only=true` was given.
973 : size: Option<u64>,
974 : /// Size of each segment used in the model.
975 : /// Will be null if `?inputs_only=true` was given.
976 : segment_sizes: Option<Vec<tenant_size_model::SegmentSizeResult>>,
977 : inputs: crate::tenant::size::ModelInputs,
978 : }
979 :
980 36 : json_response(
981 36 : StatusCode::OK,
982 36 : TenantHistorySize {
983 36 : id: tenant_shard_id.tenant_id,
984 36 : size: sizes.as_ref().map(|x| x.total_size),
985 36 : segment_sizes: sizes.map(|x| x.segments),
986 36 : inputs,
987 36 : },
988 36 : )
989 55 : }
990 :
991 83 : async fn layer_map_info_handler(
992 83 : request: Request<Body>,
993 83 : _cancel: CancellationToken,
994 83 : ) -> Result<Response<Body>, ApiError> {
995 83 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
996 83 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
997 83 : let reset: LayerAccessStatsReset =
998 83 : parse_query_param(&request, "reset")?.unwrap_or(LayerAccessStatsReset::NoReset);
999 83 :
1000 83 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1001 :
1002 83 : let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?;
1003 83 : let layer_map_info = timeline.layer_map_info(reset).await;
1004 :
1005 83 : json_response(StatusCode::OK, layer_map_info)
1006 83 : }
1007 :
1008 2 : async fn layer_download_handler(
1009 2 : request: Request<Body>,
1010 2 : _cancel: CancellationToken,
1011 2 : ) -> Result<Response<Body>, ApiError> {
1012 2 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1013 2 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1014 2 : let layer_file_name = get_request_param(&request, "layer_file_name")?;
1015 2 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1016 :
1017 2 : let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?;
1018 2 : let downloaded = timeline
1019 2 : .download_layer(layer_file_name)
1020 6 : .await
1021 2 : .map_err(ApiError::InternalServerError)?;
1022 :
1023 2 : match downloaded {
1024 2 : Some(true) => json_response(StatusCode::OK, ()),
1025 UBC 0 : Some(false) => json_response(StatusCode::NOT_MODIFIED, ()),
1026 0 : None => json_response(
1027 0 : StatusCode::BAD_REQUEST,
1028 0 : format!("Layer {tenant_shard_id}/{timeline_id}/{layer_file_name} not found"),
1029 0 : ),
1030 : }
1031 CBC 2 : }
1032 :
1033 2245 : async fn evict_timeline_layer_handler(
1034 2245 : request: Request<Body>,
1035 2245 : _cancel: CancellationToken,
1036 2245 : ) -> Result<Response<Body>, ApiError> {
1037 2245 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1038 2245 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1039 2245 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1040 2245 : let layer_file_name = get_request_param(&request, "layer_file_name")?;
1041 :
1042 2245 : let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?;
1043 2245 : let evicted = timeline
1044 2245 : .evict_layer(layer_file_name)
1045 2181 : .await
1046 2245 : .map_err(ApiError::InternalServerError)?;
1047 :
1048 2245 : match evicted {
1049 2245 : Some(true) => json_response(StatusCode::OK, ()),
1050 UBC 0 : Some(false) => json_response(StatusCode::NOT_MODIFIED, ()),
1051 0 : None => json_response(
1052 0 : StatusCode::BAD_REQUEST,
1053 0 : format!("Layer {tenant_shard_id}/{timeline_id}/{layer_file_name} not found"),
1054 0 : ),
1055 : }
1056 CBC 2245 : }
1057 :
1058 : /// Get tenant_size SVG graph along with the JSON data.
1059 19 : fn synthetic_size_html_response(
1060 19 : inputs: ModelInputs,
1061 19 : storage_model: StorageModel,
1062 19 : sizes: SizeResult,
1063 19 : ) -> Result<Response<Body>, ApiError> {
1064 19 : let mut timeline_ids: Vec<String> = Vec::new();
1065 19 : let mut timeline_map: HashMap<TimelineId, usize> = HashMap::new();
1066 27 : for (index, ti) in inputs.timeline_inputs.iter().enumerate() {
1067 27 : timeline_map.insert(ti.timeline_id, index);
1068 27 : timeline_ids.push(ti.timeline_id.to_string());
1069 27 : }
1070 19 : let seg_to_branch: Vec<usize> = inputs
1071 19 : .segments
1072 19 : .iter()
1073 68 : .map(|seg| *timeline_map.get(&seg.timeline_id).unwrap())
1074 19 : .collect();
1075 :
1076 19 : let svg =
1077 19 : tenant_size_model::svg::draw_svg(&storage_model, &timeline_ids, &seg_to_branch, &sizes)
1078 19 : .map_err(ApiError::InternalServerError)?;
1079 :
1080 19 : let mut response = String::new();
1081 19 :
1082 19 : use std::fmt::Write;
1083 19 : write!(response, "<html>\n<body>\n").unwrap();
1084 19 : write!(response, "<div>\n{svg}\n</div>").unwrap();
1085 19 : writeln!(response, "Project size: {}", sizes.total_size).unwrap();
1086 19 : writeln!(response, "<pre>").unwrap();
1087 19 : writeln!(
1088 19 : response,
1089 19 : "{}",
1090 19 : serde_json::to_string_pretty(&inputs).unwrap()
1091 19 : )
1092 19 : .unwrap();
1093 19 : writeln!(
1094 19 : response,
1095 19 : "{}",
1096 19 : serde_json::to_string_pretty(&sizes.segments).unwrap()
1097 19 : )
1098 19 : .unwrap();
1099 19 : writeln!(response, "</pre>").unwrap();
1100 19 : write!(response, "</body>\n</html>\n").unwrap();
1101 19 :
1102 19 : html_response(StatusCode::OK, response)
1103 19 : }
1104 :
1105 19 : pub fn html_response(status: StatusCode, data: String) -> Result<Response<Body>, ApiError> {
1106 19 : let response = Response::builder()
1107 19 : .status(status)
1108 19 : .header(header::CONTENT_TYPE, "text/html")
1109 19 : .body(Body::from(data.as_bytes().to_vec()))
1110 19 : .map_err(|e| ApiError::InternalServerError(e.into()))?;
1111 19 : Ok(response)
1112 19 : }
1113 :
1114 : /// Helper for requests that may take a generation, which is mandatory
1115 : /// when control_plane_api is set, but otherwise defaults to Generation::none()
1116 480 : fn get_request_generation(state: &State, req_gen: Option<u32>) -> Result<Generation, ApiError> {
1117 480 : if state.conf.control_plane_api.is_some() {
1118 479 : req_gen
1119 479 : .map(Generation::new)
1120 479 : .ok_or(ApiError::BadRequest(anyhow!(
1121 479 : "generation attribute missing"
1122 479 : )))
1123 : } else {
1124 : // Legacy mode: all tenants operate with no generation
1125 1 : Ok(Generation::none())
1126 : }
1127 480 : }
1128 :
1129 432 : async fn tenant_create_handler(
1130 432 : mut request: Request<Body>,
1131 432 : _cancel: CancellationToken,
1132 432 : ) -> Result<Response<Body>, ApiError> {
1133 432 : let request_data: TenantCreateRequest = json_request(&mut request).await?;
1134 432 : let target_tenant_id = request_data.new_tenant_id;
1135 432 : check_permission(&request, None)?;
1136 :
1137 431 : let _timer = STORAGE_TIME_GLOBAL
1138 431 : .get_metric_with_label_values(&[StorageTimeOperation::CreateTenant.into()])
1139 431 : .expect("bug")
1140 431 : .start_timer();
1141 :
1142 431 : let tenant_conf =
1143 431 : TenantConfOpt::try_from(&request_data.config).map_err(ApiError::BadRequest)?;
1144 :
1145 431 : let state = get_state(&request);
1146 :
1147 431 : let generation = get_request_generation(state, request_data.generation)?;
1148 :
1149 431 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
1150 :
1151 431 : let new_tenant = mgr::create_tenant(
1152 431 : state.conf,
1153 431 : tenant_conf,
1154 431 : target_tenant_id,
1155 431 : generation,
1156 431 : state.tenant_resources(),
1157 431 : &ctx,
1158 431 : )
1159 431 : .instrument(info_span!("tenant_create", tenant_id = %target_tenant_id))
1160 861 : .await?;
1161 :
1162 : // We created the tenant. Existing API semantics are that the tenant
1163 : // is Active when this function returns.
1164 430 : if let res @ Err(_) = new_tenant
1165 430 : .wait_to_become_active(ACTIVE_TENANT_TIMEOUT)
1166 601 : .await
1167 : {
1168 : // This shouldn't happen because we just created the tenant directory
1169 : // in tenant::mgr::create_tenant, and there aren't any remote timelines
1170 : // to load, so, nothing can really fail during load.
1171 : // Don't do cleanup because we don't know how we got here.
1172 : // The tenant will likely be in `Broken` state and subsequent
1173 : // calls will fail.
1174 UBC 0 : res.context("created tenant failed to become active")
1175 0 : .map_err(ApiError::InternalServerError)?;
1176 CBC 430 : }
1177 :
1178 430 : json_response(
1179 430 : StatusCode::CREATED,
1180 430 : TenantCreateResponse(new_tenant.tenant_id()),
1181 430 : )
1182 432 : }
1183 :
1184 45 : async fn get_tenant_config_handler(
1185 45 : request: Request<Body>,
1186 45 : _cancel: CancellationToken,
1187 45 : ) -> Result<Response<Body>, ApiError> {
1188 45 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1189 45 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1190 :
1191 45 : let tenant = mgr::get_tenant(tenant_shard_id, false)?;
1192 :
1193 45 : let response = HashMap::from([
1194 : (
1195 : "tenant_specific_overrides",
1196 45 : serde_json::to_value(tenant.tenant_specific_overrides())
1197 45 : .context("serializing tenant specific overrides")
1198 45 : .map_err(ApiError::InternalServerError)?,
1199 : ),
1200 : (
1201 45 : "effective_config",
1202 45 : serde_json::to_value(tenant.effective_config())
1203 45 : .context("serializing effective config")
1204 45 : .map_err(ApiError::InternalServerError)?,
1205 : ),
1206 : ]);
1207 :
1208 45 : json_response(StatusCode::OK, response)
1209 45 : }
1210 :
1211 30 : async fn update_tenant_config_handler(
1212 30 : mut request: Request<Body>,
1213 30 : _cancel: CancellationToken,
1214 30 : ) -> Result<Response<Body>, ApiError> {
1215 30 : let request_data: TenantConfigRequest = json_request(&mut request).await?;
1216 30 : let tenant_id = request_data.tenant_id;
1217 30 : check_permission(&request, Some(tenant_id))?;
1218 :
1219 30 : let tenant_conf =
1220 30 : TenantConfOpt::try_from(&request_data.config).map_err(ApiError::BadRequest)?;
1221 :
1222 30 : let state = get_state(&request);
1223 30 : mgr::set_new_tenant_config(state.conf, tenant_conf, tenant_id)
1224 30 : .instrument(info_span!("tenant_config", %tenant_id))
1225 60 : .await?;
1226 :
1227 30 : json_response(StatusCode::OK, ())
1228 30 : }
1229 :
1230 162 : async fn put_tenant_location_config_handler(
1231 162 : mut request: Request<Body>,
1232 162 : _cancel: CancellationToken,
1233 162 : ) -> Result<Response<Body>, ApiError> {
1234 162 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1235 :
1236 162 : let request_data: TenantLocationConfigRequest = json_request(&mut request).await?;
1237 162 : let flush = parse_query_param(&request, "flush_ms")?.map(Duration::from_millis);
1238 162 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1239 :
1240 162 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
1241 162 : let state = get_state(&request);
1242 162 : let conf = state.conf;
1243 162 :
1244 162 : // The `Detached` state is special, it doesn't upsert a tenant, it removes
1245 162 : // its local disk content and drops it from memory.
1246 162 : if let LocationConfigMode::Detached = request_data.config.mode {
1247 9 : if let Err(e) =
1248 45 : mgr::detach_tenant(conf, tenant_shard_id, true, &state.deletion_queue_client)
1249 : .instrument(info_span!("tenant_detach",
1250 : tenant_id = %tenant_shard_id.tenant_id,
1251 45 : shard = %tenant_shard_id.shard_slug()
1252 : ))
1253 175 : .await
1254 : {
1255 9 : match e {
1256 9 : TenantStateError::SlotError(TenantSlotError::NotFound(_)) => {
1257 9 : // This API is idempotent: a NotFound on a detach is fine.
1258 9 : }
1259 UBC 0 : _ => return Err(e.into()),
1260 : }
1261 CBC 36 : }
1262 45 : return json_response(StatusCode::OK, ());
1263 117 : }
1264 :
1265 117 : let location_conf =
1266 117 : LocationConf::try_from(&request_data.config).map_err(ApiError::BadRequest)?;
1267 :
1268 117 : state
1269 117 : .tenant_manager
1270 117 : .upsert_location(tenant_shard_id, location_conf, flush, &ctx)
1271 402 : .await
1272 : // TODO: badrequest assumes the caller was asking for something unreasonable, but in
1273 : // principle we might have hit something like concurrent API calls to the same tenant,
1274 : // which is not a 400 but a 409.
1275 117 : .map_err(ApiError::BadRequest)?;
1276 :
1277 117 : if let Some(_flush_ms) = flush {
1278 1 : match state
1279 1 : .secondary_controller
1280 1 : .upload_tenant(tenant_shard_id)
1281 1 : .await
1282 : {
1283 : Ok(()) => {
1284 1 : tracing::info!("Uploaded heatmap during flush");
1285 : }
1286 UBC 0 : Err(e) => {
1287 0 : tracing::warn!("Failed to flush heatmap: {e}");
1288 : }
1289 : }
1290 : } else {
1291 CBC 116 : tracing::info!("No flush requested when configuring");
1292 : }
1293 :
1294 117 : json_response(StatusCode::OK, ())
1295 162 : }
1296 :
1297 : /// Testing helper to transition a tenant to [`crate::tenant::TenantState::Broken`].
1298 2 : async fn handle_tenant_break(
1299 2 : r: Request<Body>,
1300 2 : _cancel: CancellationToken,
1301 2 : ) -> Result<Response<Body>, ApiError> {
1302 2 : let tenant_shard_id: TenantShardId = parse_request_param(&r, "tenant_shard_id")?;
1303 :
1304 2 : let tenant = crate::tenant::mgr::get_tenant(tenant_shard_id, true)
1305 2 : .map_err(|_| ApiError::Conflict(String::from("no active tenant found")))?;
1306 :
1307 2 : tenant.set_broken("broken from test".to_owned()).await;
1308 :
1309 2 : json_response(StatusCode::OK, ())
1310 2 : }
1311 :
1312 : // Run GC immediately on given timeline.
1313 322 : async fn timeline_gc_handler(
1314 322 : mut request: Request<Body>,
1315 322 : cancel: CancellationToken,
1316 322 : ) -> Result<Response<Body>, ApiError> {
1317 322 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1318 322 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1319 322 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1320 :
1321 322 : let gc_req: TimelineGcRequest = json_request(&mut request).await?;
1322 :
1323 322 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
1324 321 : let wait_task_done =
1325 322 : mgr::immediate_gc(tenant_shard_id, timeline_id, gc_req, cancel, &ctx).await?;
1326 321 : let gc_result = wait_task_done
1327 321 : .await
1328 321 : .context("wait for gc task")
1329 321 : .map_err(ApiError::InternalServerError)?
1330 321 : .map_err(ApiError::InternalServerError)?;
1331 :
1332 320 : json_response(StatusCode::OK, gc_result)
1333 322 : }
1334 :
1335 : // Run compaction immediately on given timeline.
1336 124 : async fn timeline_compact_handler(
1337 124 : request: Request<Body>,
1338 124 : cancel: CancellationToken,
1339 124 : ) -> Result<Response<Body>, ApiError> {
1340 124 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1341 124 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1342 124 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1343 :
1344 124 : let mut flags = EnumSet::empty();
1345 124 : if Some(true) == parse_query_param::<_, bool>(&request, "force_repartition")? {
1346 2 : flags |= CompactFlags::ForceRepartition;
1347 122 : }
1348 124 : async {
1349 124 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
1350 124 : let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?;
1351 124 : timeline
1352 124 : .compact(&cancel, flags, &ctx)
1353 18147 : .await
1354 124 : .map_err(|e| ApiError::InternalServerError(e.into()))?;
1355 124 : json_response(StatusCode::OK, ())
1356 124 : }
1357 124 : .instrument(info_span!("manual_compaction", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
1358 18147 : .await
1359 124 : }
1360 :
1361 : // Run checkpoint immediately on given timeline.
1362 576 : async fn timeline_checkpoint_handler(
1363 576 : request: Request<Body>,
1364 576 : cancel: CancellationToken,
1365 576 : ) -> Result<Response<Body>, ApiError> {
1366 576 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1367 576 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1368 576 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1369 :
1370 576 : let mut flags = EnumSet::empty();
1371 576 : if Some(true) == parse_query_param::<_, bool>(&request, "force_repartition")? {
1372 2 : flags |= CompactFlags::ForceRepartition;
1373 574 : }
1374 576 : async {
1375 576 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
1376 576 : let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?;
1377 576 : timeline
1378 576 : .freeze_and_flush()
1379 696 : .await
1380 576 : .map_err(ApiError::InternalServerError)?;
1381 576 : timeline
1382 576 : .compact(&cancel, flags, &ctx)
1383 341190 : .await
1384 575 : .map_err(|e| ApiError::InternalServerError(e.into()))?;
1385 :
1386 575 : json_response(StatusCode::OK, ())
1387 575 : }
1388 576 : .instrument(info_span!("manual_checkpoint", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
1389 341886 : .await
1390 575 : }
1391 :
1392 2 : async fn timeline_download_remote_layers_handler_post(
1393 2 : mut request: Request<Body>,
1394 2 : _cancel: CancellationToken,
1395 2 : ) -> Result<Response<Body>, ApiError> {
1396 2 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1397 2 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1398 2 : let body: DownloadRemoteLayersTaskSpawnRequest = json_request(&mut request).await?;
1399 2 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1400 :
1401 2 : let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?;
1402 2 : match timeline.spawn_download_all_remote_layers(body).await {
1403 2 : Ok(st) => json_response(StatusCode::ACCEPTED, st),
1404 UBC 0 : Err(st) => json_response(StatusCode::CONFLICT, st),
1405 : }
1406 CBC 2 : }
1407 :
1408 57 : async fn timeline_download_remote_layers_handler_get(
1409 57 : request: Request<Body>,
1410 57 : _cancel: CancellationToken,
1411 57 : ) -> Result<Response<Body>, ApiError> {
1412 57 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1413 57 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1414 57 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1415 :
1416 57 : let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?;
1417 57 : let info = timeline
1418 57 : .get_download_all_remote_layers_task_info()
1419 57 : .context("task never started since last pageserver process start")
1420 57 : .map_err(|e| ApiError::NotFound(e.into()))?;
1421 57 : json_response(StatusCode::OK, info)
1422 57 : }
1423 :
1424 19 : async fn deletion_queue_flush(
1425 19 : r: Request<Body>,
1426 19 : cancel: CancellationToken,
1427 19 : ) -> Result<Response<Body>, ApiError> {
1428 19 : let state = get_state(&r);
1429 19 :
1430 19 : if state.remote_storage.is_none() {
1431 : // Nothing to do if remote storage is disabled.
1432 UBC 0 : return json_response(StatusCode::OK, ());
1433 CBC 19 : }
1434 :
1435 19 : let execute = parse_query_param(&r, "execute")?.unwrap_or(false);
1436 19 :
1437 19 : let flush = async {
1438 19 : if execute {
1439 33 : state.deletion_queue_client.flush_execute().await
1440 : } else {
1441 8 : state.deletion_queue_client.flush().await
1442 : }
1443 19 : }
1444 : // DeletionQueueError's only case is shutting down.
1445 19 : .map_err(|_| ApiError::ShuttingDown);
1446 19 :
1447 60 : tokio::select! {
1448 19 : res = flush => {
1449 19 : res.map(|()| json_response(StatusCode::OK, ()))?
1450 : }
1451 : _ = cancel.cancelled() => {
1452 : Err(ApiError::ShuttingDown)
1453 : }
1454 : }
1455 19 : }
1456 :
1457 : /// Try if `GetPage@Lsn` is successful, useful for manual debugging.
1458 UBC 0 : async fn getpage_at_lsn_handler(
1459 0 : request: Request<Body>,
1460 0 : _cancel: CancellationToken,
1461 0 : ) -> Result<Response<Body>, ApiError> {
1462 0 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1463 0 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1464 0 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1465 :
1466 : struct Key(crate::repository::Key);
1467 :
1468 : impl std::str::FromStr for Key {
1469 : type Err = anyhow::Error;
1470 :
1471 0 : fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
1472 0 : crate::repository::Key::from_hex(s).map(Key)
1473 0 : }
1474 : }
1475 :
1476 0 : let key: Key = parse_query_param(&request, "key")?
1477 0 : .ok_or_else(|| ApiError::BadRequest(anyhow!("missing 'key' query parameter")))?;
1478 0 : let lsn: Lsn = parse_query_param(&request, "lsn")?
1479 0 : .ok_or_else(|| ApiError::BadRequest(anyhow!("missing 'lsn' query parameter")))?;
1480 :
1481 0 : async {
1482 0 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
1483 0 : let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?;
1484 :
1485 0 : let page = timeline.get(key.0, lsn, &ctx).await?;
1486 :
1487 0 : Result::<_, ApiError>::Ok(
1488 0 : Response::builder()
1489 0 : .status(StatusCode::OK)
1490 0 : .header(header::CONTENT_TYPE, "application/octet-stream")
1491 0 : .body(hyper::Body::from(page))
1492 0 : .unwrap(),
1493 0 : )
1494 0 : }
1495 0 : .instrument(info_span!("timeline_get", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
1496 0 : .await
1497 0 : }
1498 :
1499 0 : async fn timeline_collect_keyspace(
1500 0 : request: Request<Body>,
1501 0 : _cancel: CancellationToken,
1502 0 : ) -> Result<Response<Body>, ApiError> {
1503 0 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1504 0 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1505 0 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1506 :
1507 0 : let at_lsn: Option<Lsn> = parse_query_param(&request, "at_lsn")?;
1508 :
1509 0 : async {
1510 0 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
1511 0 : let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?;
1512 0 : let at_lsn = at_lsn.unwrap_or_else(|| timeline.get_last_record_lsn());
1513 0 : let keys = timeline
1514 0 : .collect_keyspace(at_lsn, &ctx)
1515 0 : .await
1516 0 : .map_err(|e| ApiError::InternalServerError(e.into()))?;
1517 :
1518 0 : let res = pageserver_api::models::partitioning::Partitioning { keys, at_lsn };
1519 0 :
1520 0 : json_response(StatusCode::OK, res)
1521 0 : }
1522 0 : .instrument(info_span!("timeline_collect_keyspace", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
1523 0 : .await
1524 0 : }
1525 :
1526 CBC 3114 : async fn active_timeline_of_active_tenant(
1527 3114 : tenant_shard_id: TenantShardId,
1528 3114 : timeline_id: TimelineId,
1529 3114 : ) -> Result<Arc<Timeline>, ApiError> {
1530 3114 : let tenant = mgr::get_tenant(tenant_shard_id, true)?;
1531 3114 : tenant
1532 3114 : .get_timeline(timeline_id, true)
1533 3114 : .map_err(|e| ApiError::NotFound(e.into()))
1534 3114 : }
1535 :
1536 UBC 0 : async fn always_panic_handler(
1537 0 : req: Request<Body>,
1538 0 : _cancel: CancellationToken,
1539 0 : ) -> Result<Response<Body>, ApiError> {
1540 0 : // Deliberately cause a panic to exercise the panic hook registered via std::panic::set_hook().
1541 0 : // For pageserver, the relevant panic hook is `tracing_panic_hook` , and the `sentry` crate's wrapper around it.
1542 0 : // Use catch_unwind to ensure that tokio nor hyper are distracted by our panic.
1543 0 : let query = req.uri().query();
1544 0 : let _ = std::panic::catch_unwind(|| {
1545 0 : panic!("unconditional panic for testing panic hook integration; request query: {query:?}")
1546 0 : });
1547 0 : json_response(StatusCode::NO_CONTENT, ())
1548 0 : }
1549 :
1550 CBC 10 : async fn disk_usage_eviction_run(
1551 10 : mut r: Request<Body>,
1552 10 : cancel: CancellationToken,
1553 10 : ) -> Result<Response<Body>, ApiError> {
1554 10 : check_permission(&r, None)?;
1555 :
1556 48 : #[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)]
1557 : struct Config {
1558 : /// How many bytes to evict before reporting that pressure is relieved.
1559 : evict_bytes: u64,
1560 :
1561 : #[serde(default)]
1562 : eviction_order: crate::disk_usage_eviction_task::EvictionOrder,
1563 : }
1564 :
1565 49 : #[derive(Debug, Clone, Copy, serde::Serialize)]
1566 : struct Usage {
1567 : // remains unchanged after instantiation of the struct
1568 : evict_bytes: u64,
1569 : // updated by `add_available_bytes`
1570 : freed_bytes: u64,
1571 : }
1572 :
1573 : impl crate::disk_usage_eviction_task::Usage for Usage {
1574 232 : fn has_pressure(&self) -> bool {
1575 232 : self.evict_bytes > self.freed_bytes
1576 232 : }
1577 :
1578 430 : fn add_available_bytes(&mut self, bytes: u64) {
1579 430 : self.freed_bytes += bytes;
1580 430 : }
1581 : }
1582 :
1583 10 : let config = json_request::<Config>(&mut r).await?;
1584 :
1585 10 : let usage = Usage {
1586 10 : evict_bytes: config.evict_bytes,
1587 10 : freed_bytes: 0,
1588 10 : };
1589 10 :
1590 10 : let state = get_state(&r);
1591 :
1592 10 : let Some(storage) = state.remote_storage.as_ref() else {
1593 UBC 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
1594 0 : "remote storage not configured, cannot run eviction iteration"
1595 0 : )));
1596 : };
1597 :
1598 CBC 10 : let state = state.disk_usage_eviction_state.clone();
1599 :
1600 10 : let res = crate::disk_usage_eviction_task::disk_usage_eviction_task_iteration_impl(
1601 10 : &state,
1602 10 : storage,
1603 10 : usage,
1604 10 : config.eviction_order,
1605 10 : &cancel,
1606 10 : )
1607 273 : .await;
1608 :
1609 10 : info!(?res, "disk_usage_eviction_task_iteration_impl finished");
1610 :
1611 10 : let res = res.map_err(ApiError::InternalServerError)?;
1612 :
1613 10 : json_response(StatusCode::OK, res)
1614 10 : }
1615 :
1616 5 : async fn secondary_upload_handler(
1617 5 : request: Request<Body>,
1618 5 : _cancel: CancellationToken,
1619 5 : ) -> Result<Response<Body>, ApiError> {
1620 5 : let state = get_state(&request);
1621 5 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1622 5 : state
1623 5 : .secondary_controller
1624 5 : .upload_tenant(tenant_shard_id)
1625 5 : .await
1626 5 : .map_err(ApiError::InternalServerError)?;
1627 :
1628 5 : json_response(StatusCode::OK, ())
1629 5 : }
1630 :
1631 4 : async fn secondary_download_handler(
1632 4 : request: Request<Body>,
1633 4 : _cancel: CancellationToken,
1634 4 : ) -> Result<Response<Body>, ApiError> {
1635 4 : let state = get_state(&request);
1636 4 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1637 4 : state
1638 4 : .secondary_controller
1639 4 : .download_tenant(tenant_shard_id)
1640 4 : .await
1641 4 : .map_err(ApiError::InternalServerError)?;
1642 :
1643 4 : json_response(StatusCode::OK, ())
1644 4 : }
1645 :
1646 UBC 0 : async fn handler_404(_: Request<Body>) -> Result<Response<Body>, ApiError> {
1647 0 : json_response(
1648 0 : StatusCode::NOT_FOUND,
1649 0 : HttpErrorBody::from_msg("page not found".to_owned()),
1650 0 : )
1651 0 : }
1652 :
1653 CBC 5 : async fn post_tracing_event_handler(
1654 5 : mut r: Request<Body>,
1655 5 : _cancel: CancellationToken,
1656 5 : ) -> Result<Response<Body>, ApiError> {
1657 10 : #[derive(Debug, serde::Deserialize)]
1658 : #[serde(rename_all = "lowercase")]
1659 : enum Level {
1660 : Error,
1661 : Warn,
1662 : Info,
1663 : Debug,
1664 : Trace,
1665 : }
1666 25 : #[derive(Debug, serde::Deserialize)]
1667 : struct Request {
1668 : level: Level,
1669 : message: String,
1670 : }
1671 5 : let body: Request = json_request(&mut r)
1672 GBC 3 : .await
1673 CBC 5 : .map_err(|_| ApiError::BadRequest(anyhow::anyhow!("invalid JSON body")))?;
1674 :
1675 5 : match body.level {
1676 2 : Level::Error => tracing::error!(?body.message),
1677 2 : Level::Warn => tracing::warn!(?body.message),
1678 2 : Level::Info => tracing::info!(?body.message),
1679 1 : Level::Debug => tracing::debug!(?body.message),
1680 1 : Level::Trace => tracing::trace!(?body.message),
1681 : }
1682 :
1683 5 : json_response(StatusCode::OK, ())
1684 5 : }
1685 :
/// Common functionality of all the HTTP API handlers.
///
/// - Adds a tracing span to each request (by `request_span`)
/// - Logs the request depending on the request method (by `request_span`)
/// - Logs the response if it was not successful (by `request_span`)
/// - Shields the handler function from async cancellations. Hyper can drop the handler
///   Future if the connection to the client is lost, but most of the pageserver code is
///   not async cancellation safe. This converts the dropped future into a graceful cancellation
///   request with a CancellationToken.
///
/// The handler runs in its own tokio task. If that task panics, the panic is converted
/// into an `ApiError::InternalServerError` response (returned as `Ok(..)`) instead of
/// being propagated.
async fn api_handler<R, H>(request: Request<Body>, handler: H) -> Result<Response<Body>, ApiError>
where
    R: std::future::Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
    H: FnOnce(Request<Body>, CancellationToken) -> R + Send + Sync + 'static,
{
    // Spawn a new task to handle the request, to protect the handler from unexpected
    // async cancellations. Most pageserver functions are not async cancellation safe.
    // We arm a drop-guard, so that if Hyper drops the Future, we signal the task
    // with the cancellation token.
    let token = CancellationToken::new();
    let cancel_guard = token.clone().drop_guard();
    let result = request_span(request, move |r| async {
        let handle = tokio::spawn(
            async {
                // Keep a second handle: `token` itself is moved into the handler, but we
                // still need to ask afterwards whether cancellation was requested.
                let token_cloned = token.clone();
                let result = handler(r, token).await;
                if token_cloned.is_cancelled() {
                    // dropguard has executed: we will never turn this result into response.
                    //
                    // at least temporarily do {:?} logging; these failures are rare enough but
                    // could hide difficult errors.
                    match &result {
                        Ok(response) => {
                            let status = response.status();
                            info!(%status, "Cancelled request finished successfully")
                        }
                        Err(e) => error!("Cancelled request finished with an error: {e:?}"),
                    }
                }
                // only logging for cancelled panicked request handlers is the tracing_panic_hook,
                // which should suffice.
                //
                // there is still a chance to lose the result due to race between
                // returning from here and the actual connection closing happening
                // before outer task gets to execute. leaving that up for #5815.
                result
            }
            // Run the spawned task inside the request's span so its logs carry it.
            .in_current_span(),
        );

        match handle.await {
            // TODO: never actually return Err from here, always Ok(...) so that we can log
            // spanned errors. Call api_error_handler instead and return appropriate Body.
            Ok(result) => result,
            Err(e) => {
                // The handler task panicked. We have a global panic handler that logs the
                // panic with its backtrace, so no need to log that here. Only log a brief
                // message to make it clear that we returned the error to the client.
                error!("HTTP request handler task panicked: {e:#}");

                // Don't return an Error here, because then fallback error handler that was
                // installed in make_router() will print the error. Instead, construct the
                // HTTP error response and return that.
                Ok(
                    ApiError::InternalServerError(anyhow!("HTTP request handler task panicked"))
                        .into_response(),
                )
            }
        }
    })
    .await;

    // Reaching this point means Hyper did not drop us mid-request: disarm the guard so
    // that dropping it does not cancel the token.
    cancel_guard.disarm();

    result
}
1761 :
1762 : /// Like api_handler, but returns an error response if the server is built without
1763 : /// the 'testing' feature.
1764 881 : async fn testing_api_handler<R, H>(
1765 881 : desc: &str,
1766 881 : request: Request<Body>,
1767 881 : handler: H,
1768 881 : ) -> Result<Response<Body>, ApiError>
1769 881 : where
1770 881 : R: std::future::Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
1771 881 : H: FnOnce(Request<Body>, CancellationToken) -> R + Send + Sync + 'static,
1772 881 : {
1773 881 : if cfg!(feature = "testing") {
1774 884 : api_handler(request, handler).await
1775 : } else {
1776 UBC 0 : std::future::ready(Err(ApiError::BadRequest(anyhow!(
1777 0 : "Cannot {desc} because pageserver was compiled without testing APIs",
1778 0 : ))))
1779 0 : .await
1780 : }
1781 CBC 880 : }
1782 :
/// Build the pageserver management API router.
///
/// - Serves the OpenAPI spec (`openapi_spec.yml`) at `/swagger.yml` and a UI at `/v1/doc`.
/// - When `auth` is `Some`, installs the auth middleware. Routes listed in
///   `state.allowlist_routes` bypass authentication; all others are validated with
///   `state.auth`.
/// - Adds a `PAGESERVER_LAUNCH_TIMESTAMP` header (from `launch_ts`) to every response.
/// - Registers every endpoint, each wrapped in `api_handler`, or in `testing_api_handler`
///   for endpoints that require the `testing` feature.
/// - Requests that match no route fall through to `handler_404`.
///
/// Panics if the launch-timestamp header middleware cannot be constructed.
pub fn make_router(
    state: Arc<State>,
    launch_ts: &'static LaunchTimestamp,
    auth: Option<Arc<SwappableJwtAuth>>,
) -> anyhow::Result<RouterBuilder<hyper::Body, ApiError>> {
    let spec = include_bytes!("openapi_spec.yml");
    let mut router = attach_openapi_ui(endpoint::make_router(), spec, "/swagger.yml", "/v1/doc");
    // `auth` is only consulted for presence; the middleware itself reads `state.auth`.
    // NOTE(review): presumably both are built from the same config — confirm at the caller.
    if auth.is_some() {
        router = router.middleware(auth_middleware(|request| {
            let state = get_state(request);
            if state.allowlist_routes.contains(request.uri()) {
                // Allowlisted route: no authentication required.
                None
            } else {
                state.auth.as_deref()
            }
        }))
    }

    router = router.middleware(
        endpoint::add_response_header_middleware(
            "PAGESERVER_LAUNCH_TIMESTAMP",
            &launch_ts.to_string(),
        )
        .expect("construct launch timestamp header middleware"),
    );

    // NOTE(review): keep the registration order as-is; the router may match routes in
    // the order they are registered.
    Ok(router
        .data(state)
        .get("/v1/status", |r| api_handler(r, status_handler))
        .put("/v1/failpoints", |r| {
            testing_api_handler("manage failpoints", r, failpoints_handler)
        })
        .post("/v1/reload_auth_validation_keys", |r| {
            api_handler(r, reload_auth_validation_keys_handler)
        })
        // Tenant-level endpoints.
        .get("/v1/tenant", |r| api_handler(r, tenant_list_handler))
        .post("/v1/tenant", |r| api_handler(r, tenant_create_handler))
        .get("/v1/tenant/:tenant_shard_id", |r| {
            api_handler(r, tenant_status)
        })
        .delete("/v1/tenant/:tenant_shard_id", |r| {
            api_handler(r, tenant_delete_handler)
        })
        .get("/v1/tenant/:tenant_shard_id/synthetic_size", |r| {
            api_handler(r, tenant_size_handler)
        })
        .put("/v1/tenant/config", |r| {
            api_handler(r, update_tenant_config_handler)
        })
        .get("/v1/tenant/:tenant_shard_id/config", |r| {
            api_handler(r, get_tenant_config_handler)
        })
        .put("/v1/tenant/:tenant_shard_id/location_config", |r| {
            api_handler(r, put_tenant_location_config_handler)
        })
        .get("/v1/tenant/:tenant_shard_id/timeline", |r| {
            api_handler(r, timeline_list_handler)
        })
        .post("/v1/tenant/:tenant_shard_id/timeline", |r| {
            api_handler(r, timeline_create_handler)
        })
        .post("/v1/tenant/:tenant_id/attach", |r| {
            api_handler(r, tenant_attach_handler)
        })
        .post("/v1/tenant/:tenant_id/detach", |r| {
            api_handler(r, tenant_detach_handler)
        })
        .post("/v1/tenant/:tenant_shard_id/reset", |r| {
            api_handler(r, tenant_reset_handler)
        })
        .post("/v1/tenant/:tenant_id/load", |r| {
            api_handler(r, tenant_load_handler)
        })
        .post("/v1/tenant/:tenant_id/ignore", |r| {
            api_handler(r, tenant_ignore_handler)
        })
        // Timeline-level endpoints.
        .get("/v1/tenant/:tenant_shard_id/timeline/:timeline_id", |r| {
            api_handler(r, timeline_detail_handler)
        })
        .get(
            "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/get_lsn_by_timestamp",
            |r| api_handler(r, get_lsn_by_timestamp_handler),
        )
        .get(
            "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/get_timestamp_of_lsn",
            |r| api_handler(r, get_timestamp_of_lsn_handler),
        )
        .put(
            "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/do_gc",
            |r| api_handler(r, timeline_gc_handler),
        )
        .put(
            "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/compact",
            |r| testing_api_handler("run timeline compaction", r, timeline_compact_handler),
        )
        .put(
            "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/checkpoint",
            |r| testing_api_handler("run timeline checkpoint", r, timeline_checkpoint_handler),
        )
        .post(
            "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/download_remote_layers",
            |r| api_handler(r, timeline_download_remote_layers_handler_post),
        )
        .get(
            "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/download_remote_layers",
            |r| api_handler(r, timeline_download_remote_layers_handler_get),
        )
        .delete("/v1/tenant/:tenant_shard_id/timeline/:timeline_id", |r| {
            api_handler(r, timeline_delete_handler)
        })
        .get(
            "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/layer",
            |r| api_handler(r, layer_map_info_handler),
        )
        .get(
            "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/layer/:layer_file_name",
            |r| api_handler(r, layer_download_handler),
        )
        .delete(
            "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/layer/:layer_file_name",
            |r| api_handler(r, evict_timeline_layer_handler),
        )
        // Secondary locations, eviction and deletion queue.
        .post("/v1/tenant/:tenant_shard_id/heatmap_upload", |r| {
            api_handler(r, secondary_upload_handler)
        })
        .put("/v1/disk_usage_eviction/run", |r| {
            api_handler(r, disk_usage_eviction_run)
        })
        .put("/v1/deletion_queue/flush", |r| {
            api_handler(r, deletion_queue_flush)
        })
        .post("/v1/tenant/:tenant_shard_id/secondary/download", |r| {
            api_handler(r, secondary_download_handler)
        })
        // Debugging / testing endpoints.
        .put("/v1/tenant/:tenant_shard_id/break", |r| {
            testing_api_handler("set tenant state to broken", r, handle_tenant_break)
        })
        // NOTE(review): `/v1/panic` uses plain `api_handler`, so it is reachable even
        // without the `testing` feature — confirm this is intentional.
        .get("/v1/panic", |r| api_handler(r, always_panic_handler))
        .post("/v1/tracing/event", |r| {
            testing_api_handler("emit a tracing event", r, post_tracing_event_handler)
        })
        .get(
            "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/getpage",
            |r| testing_api_handler("getpage@lsn", r, getpage_at_lsn_handler),
        )
        .get(
            "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/keyspace",
            |r| testing_api_handler("read out the keyspace", r, timeline_collect_keyspace),
        )
        // Fallback for anything not matched above.
        .any(handler_404))
}
|