TLA Line data Source code
1 : //!
2 : //! Management HTTP API
3 : //!
4 : use std::collections::HashMap;
5 : use std::str::FromStr;
6 : use std::sync::Arc;
7 :
8 : use anyhow::{anyhow, Context, Result};
9 : use futures::TryFutureExt;
10 : use humantime::format_rfc3339;
11 : use hyper::header::CONTENT_TYPE;
12 : use hyper::StatusCode;
13 : use hyper::{Body, Request, Response, Uri};
14 : use metrics::launch_timestamp::LaunchTimestamp;
15 : use pageserver_api::models::{
16 : DownloadRemoteLayersTaskSpawnRequest, LocationConfigMode, TenantAttachRequest,
17 : TenantLoadRequest, TenantLocationConfigRequest,
18 : };
19 : use remote_storage::GenericRemoteStorage;
20 : use tenant_size_model::{SizeResult, StorageModel};
21 : use tokio_util::sync::CancellationToken;
22 : use tracing::*;
23 : use utils::http::endpoint::request_span;
24 : use utils::http::json::json_request_or_empty_body;
25 : use utils::http::request::{get_request_param, must_get_query_param, parse_query_param};
26 :
27 : use super::models::{
28 : StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TenantInfo,
29 : TimelineCreateRequest, TimelineGcRequest, TimelineInfo,
30 : };
31 : use crate::context::{DownloadBehavior, RequestContext};
32 : use crate::deletion_queue::DeletionQueueClient;
33 : use crate::metrics::{StorageTimeOperation, STORAGE_TIME_GLOBAL};
34 : use crate::pgdatadir_mapping::LsnForTimestamp;
35 : use crate::task_mgr::TaskKind;
36 : use crate::tenant::config::{LocationConf, TenantConfOpt};
37 : use crate::tenant::mgr::{
38 : GetTenantError, SetNewTenantConfigError, TenantMapInsertError, TenantStateError,
39 : };
40 : use crate::tenant::size::ModelInputs;
41 : use crate::tenant::storage_layer::LayerAccessStatsReset;
42 : use crate::tenant::timeline::Timeline;
43 : use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError, TenantSharedResources};
44 : use crate::{config::PageServerConf, tenant::mgr};
45 : use crate::{disk_usage_eviction_task, tenant};
46 : use utils::{
47 : auth::JwtAuth,
48 : generation::Generation,
49 : http::{
50 : endpoint::{self, attach_openapi_ui, auth_middleware, check_permission_with},
51 : error::{ApiError, HttpErrorBody},
52 : json::{json_request, json_response},
53 : request::parse_request_param,
54 : RequestExt, RouterBuilder,
55 : },
56 : id::{TenantId, TimelineId},
57 : lsn::Lsn,
58 : };
59 :
60 : // Imports only used for testing APIs
61 : use super::models::ConfigureFailpointsRequest;
62 :
63 : pub struct State {
64 : conf: &'static PageServerConf,
65 : auth: Option<Arc<JwtAuth>>,
66 : allowlist_routes: Vec<Uri>,
67 : remote_storage: Option<GenericRemoteStorage>,
68 : broker_client: storage_broker::BrokerClientChannel,
69 : disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
70 : deletion_queue_client: DeletionQueueClient,
71 : }
72 ECB (558) :
73 (558) : impl State {
74 CBC 560 : pub fn new(
75 560 : conf: &'static PageServerConf,
76 560 : auth: Option<Arc<JwtAuth>>,
77 560 : remote_storage: Option<GenericRemoteStorage>,
78 560 : broker_client: storage_broker::BrokerClientChannel,
79 560 : disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
80 560 : deletion_queue_client: DeletionQueueClient,
81 560 : ) -> anyhow::Result<Self> {
82 560 : let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml", "/metrics"]
83 560 : .iter()
84 2240 : .map(|v| v.parse().unwrap())
85 560 : .collect::<Vec<_>>();
86 560 : Ok(Self {
87 560 : conf,
88 560 : auth,
89 560 : allowlist_routes,
90 560 : remote_storage,
91 560 : broker_client,
92 560 : disk_usage_eviction_state,
93 560 : deletion_queue_client,
94 GIC 560 : })
95 CBC 560 : }
96 ECB (505) :
97 CBC 507 : fn tenant_resources(&self) -> TenantSharedResources {
98 507 : TenantSharedResources {
99 507 : broker_client: self.broker_client.clone(),
100 507 : remote_storage: self.remote_storage.clone(),
101 507 : deletion_queue_client: self.deletion_queue_client.clone(),
102 GIC 507 : }
103 507 : }
104 : }
105 ECB (2163) :
106 (2163) : #[inline(always)]
107 CBC 2181 : fn get_state(request: &Request<Body>) -> &State {
108 2181 : request
109 2181 : .data::<Arc<State>>()
110 2181 : .expect("unknown state type")
111 GIC 2181 : .as_ref()
112 2181 : }
113 ECB (562) :
114 (562) : #[inline(always)]
115 CBC 564 : fn get_config(request: &Request<Body>) -> &'static PageServerConf {
116 GIC 564 : get_state(request).conf
117 564 : }
118 ECB (6414) :
119 (6414) : /// Check that the requester is authorized to operate on given tenant
120 CBC 6392 : fn check_permission(request: &Request<Body>, tenant_id: Option<TenantId>) -> Result<(), ApiError> {
121 6392 : check_permission_with(request, |claims| {
122 60 : crate::auth::check_permission(claims, tenant_id)
123 GIC 6392 : })
124 6392 : }
125 EUB :
126 : impl From<PageReconstructError> for ApiError {
127 GBC 2 : fn from(pre: PageReconstructError) -> ApiError {
128 GIC 2 : match pre {
129 2 : PageReconstructError::Other(pre) => ApiError::InternalServerError(pre),
130 : PageReconstructError::NeedsDownload(_, _) => {
131 EUB : // This shouldn't happen, because we use a RequestContext that requests to
132 : // download any missing layer files on-demand.
133 UIC 0 : ApiError::InternalServerError(anyhow::anyhow!("need to download remote layer file"))
134 EUB : }
135 : PageReconstructError::Cancelled => {
136 UIC 0 : ApiError::InternalServerError(anyhow::anyhow!("request was cancelled"))
137 EUB : }
138 : PageReconstructError::AncestorStopping(_) => {
139 UBC 0 : ApiError::ResourceUnavailable(format!("{pre}").into())
140 : }
141 0 : PageReconstructError::WalRedo(pre) => ApiError::InternalServerError(pre),
142 : }
143 GIC 2 : }
144 : }
145 ECB (17) :
146 (17) : impl From<TenantMapInsertError> for ApiError {
147 GIC 17 : fn from(tmie: TenantMapInsertError) -> ApiError {
148 GBC 17 : match tmie {
149 : TenantMapInsertError::StillInitializing | TenantMapInsertError::ShuttingDown => {
150 LBC (14) : ApiError::ResourceUnavailable(format!("{tmie}").into())
151 ECB (14) : }
152 GIC 14 : TenantMapInsertError::TenantAlreadyExists(id, state) => {
153 GBC 14 : ApiError::Conflict(format!("tenant {id} already exists, state: {state:?}"))
154 EUB : }
155 UIC 0 : TenantMapInsertError::TenantExistsSecondary(id) => {
156 LBC (3) : ApiError::Conflict(format!("tenant {id} already exists as secondary"))
157 : }
158 CBC 3 : TenantMapInsertError::Other(e) => ApiError::InternalServerError(e),
159 : }
160 GIC 17 : }
161 : }
162 ECB (3) :
163 (3) : impl From<TenantStateError> for ApiError {
164 CBC 3 : fn from(tse: TenantStateError) -> ApiError {
165 GIC 3 : match tse {
166 GBC 3 : TenantStateError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid).into()),
167 : TenantStateError::IsStopping(_) => {
168 UBC 0 : ApiError::ResourceUnavailable("Tenant is stopping".into())
169 : }
170 LBC (3) : _ => ApiError::InternalServerError(anyhow::Error::new(tse)),
171 : }
172 GIC 3 : }
173 : }
174 ECB (84) :
175 (84) : impl From<GetTenantError> for ApiError {
176 CBC 79 : fn from(tse: GetTenantError) -> ApiError {
177 GBC 79 : match tse {
178 65 : GetTenantError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid).into()),
179 UIC 0 : GetTenantError::Broken(reason) => {
180 0 : ApiError::InternalServerError(anyhow!("tenant is broken: {}", reason))
181 : }
182 : GetTenantError::NotActive(_) => {
183 : // Why is this not `ApiError::NotFound`?
184 : // Because we must be careful to never return 404 for a tenant if it does
185 : // in fact exist locally. If we did, the caller could draw the conclusion
186 : // that it can attach the tenant to another PS and we'd be in split-brain.
187 ECB (19) : //
188 : // (We can produce this variant only in `mgr::get_tenant(..., active=true)` calls).
189 GIC 14 : ApiError::ResourceUnavailable("Tenant not yet active".into())
190 ECB (84) : }
191 : }
192 GIC 79 : }
193 : }
194 EUB :
195 : impl From<SetNewTenantConfigError> for ApiError {
196 UBC 0 : fn from(e: SetNewTenantConfigError) -> ApiError {
197 0 : match e {
198 UIC 0 : SetNewTenantConfigError::GetTenant(tid) => {
199 UBC 0 : ApiError::NotFound(anyhow!("tenant {}", tid).into())
200 EUB : }
201 UIC 0 : e @ SetNewTenantConfigError::Persist(_) => {
202 0 : ApiError::InternalServerError(anyhow::Error::new(e))
203 EUB : }
204 : }
205 UIC 0 : }
206 : }
207 ECB (13) :
208 (13) : impl From<crate::tenant::DeleteTimelineError> for ApiError {
209 CBC 13 : fn from(value: crate::tenant::DeleteTimelineError) -> Self {
210 13 : use crate::tenant::DeleteTimelineError::*;
211 13 : match value {
212 1 : NotFound => ApiError::NotFound(anyhow::anyhow!("timeline not found").into()),
213 1 : HasChildren(children) => ApiError::PreconditionFailed(
214 1 : format!("Cannot delete timeline which has child timelines: {children:?}")
215 1 : .into_boxed_str(),
216 1 : ),
217 GIC 3 : a @ AlreadyInProgress(_) => ApiError::Conflict(a.to_string()),
218 CBC 8 : Other(e) => ApiError::InternalServerError(e),
219 : }
220 GIC 13 : }
221 : }
222 :
223 : impl From<crate::tenant::mgr::DeleteTimelineError> for ApiError {
224 ECB (1) : fn from(value: crate::tenant::mgr::DeleteTimelineError) -> Self {
225 : use crate::tenant::mgr::DeleteTimelineError::*;
226 GIC 1 : match value {
227 ECB (1) : // Report Precondition failed so client can distinguish between
228 (1) : // "tenant is missing" case from "timeline is missing"
229 CBC 1 : Tenant(GetTenantError::NotFound(..)) => ApiError::PreconditionFailed(
230 GBC 1 : "Requested tenant is missing".to_owned().into_boxed_str(),
231 CBC 1 : ),
232 UIC 0 : Tenant(t) => ApiError::from(t),
233 CBC 13 : Timeline(t) => ApiError::from(t),
234 : }
235 GIC 14 : }
236 : }
237 ECB (19) :
238 (19) : impl From<crate::tenant::delete::DeleteTenantError> for ApiError {
239 CBC 19 : fn from(value: crate::tenant::delete::DeleteTenantError) -> Self {
240 19 : use crate::tenant::delete::DeleteTenantError::*;
241 GBC 19 : match value {
242 3 : Get(g) => ApiError::from(g),
243 LBC (16) : e @ AlreadyInProgress => ApiError::Conflict(e.to_string()),
244 UBC 0 : Timeline(t) => ApiError::from(t),
245 GIC 16 : Other(o) => ApiError::InternalServerError(o),
246 LBC (19) : e @ InvalidState(_) => ApiError::PreconditionFailed(e.to_string().into_boxed_str()),
247 : }
248 GIC 19 : }
249 : }
250 ECB (1841) :
251 (1841) : // Helper function to construct a TimelineInfo struct for a timeline
252 CBC 1858 : async fn build_timeline_info(
253 1858 : timeline: &Arc<Timeline>,
254 1858 : include_non_incremental_logical_size: bool,
255 1858 : ctx: &RequestContext,
256 GIC 1858 : ) -> anyhow::Result<TimelineInfo> {
257 CBC 1858 : crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
258 ECB (1841) :
259 GIC 1858 : let mut info = build_timeline_info_common(timeline, ctx).await?;
260 1858 : if include_non_incremental_logical_size {
261 : // XXX we should be using spawn_ondemand_logical_size_calculation here.
262 : // Otherwise, if someone deletes the timeline / detaches the tenant while
263 ECB (20) : // we're executing this function, we will outlive the timeline on-disk state.
264 (20) : info.current_logical_size_non_incremental = Some(
265 CBC 19 : timeline
266 19 : .get_current_logical_size_non_incremental(
267 19 : info.last_record_lsn,
268 19 : CancellationToken::new(),
269 19 : ctx,
270 GIC 19 : )
271 CBC 1200 : .await?,
272 ECB (1840) : );
273 CBC 1839 : }
274 GIC 1857 : Ok(info)
275 CBC 1858 : }
276 ECB (2651) :
277 CBC 2671 : async fn build_timeline_info_common(
278 2671 : timeline: &Arc<Timeline>,
279 2671 : ctx: &RequestContext,
280 2671 : ) -> anyhow::Result<TimelineInfo> {
281 2671 : crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
282 2671 : let last_record_lsn = timeline.get_last_record_lsn();
283 2671 : let (wal_source_connstr, last_received_msg_lsn, last_received_msg_ts) = {
284 2671 : let guard = timeline.last_received_wal.lock().unwrap();
285 2671 : if let Some(info) = guard.as_ref() {
286 1420 : (
287 1420 : Some(format!("{:?}", info.wal_source_connconf)), // Password is hidden, but it's for statistics only.
288 1420 : Some(info.last_received_msg_lsn),
289 GIC 1420 : Some(info.last_received_msg_ts),
290 CBC 1420 : )
291 : } else {
292 GIC 1251 : (None, None, None)
293 : }
294 ECB (2651) : };
295 (2651) :
296 CBC 2671 : let ancestor_timeline_id = timeline.get_ancestor_timeline_id();
297 2671 : let ancestor_lsn = match timeline.get_ancestor_lsn() {
298 GIC 1988 : Lsn(0) => None,
299 CBC 683 : lsn @ Lsn(_) => Some(lsn),
300 ECB (2651) : };
301 GBC 2671 : let current_logical_size = match timeline.get_current_logical_size(ctx) {
302 2671 : Ok((size, _)) => Some(size),
303 UBC 0 : Err(err) => {
304 UIC 0 : error!("Timeline info creation failed to get current logical size: {err:?}");
305 0 : None
306 ECB (2651) : }
307 (2651) : };
308 CBC 2671 : let current_physical_size = Some(timeline.layer_size_sum().await);
309 2671 : let state = timeline.current_state();
310 2671 : let remote_consistent_lsn_projected = timeline
311 2671 : .get_remote_consistent_lsn_projected()
312 2671 : .unwrap_or(Lsn(0));
313 2671 : let remote_consistent_lsn_visible = timeline
314 2671 : .get_remote_consistent_lsn_visible()
315 2671 : .unwrap_or(Lsn(0));
316 2671 :
317 2671 : let walreceiver_status = timeline.walreceiver_status();
318 2671 :
319 2671 : let info = TimelineInfo {
320 2671 : tenant_id: timeline.tenant_id,
321 2671 : timeline_id: timeline.timeline_id,
322 2671 : ancestor_timeline_id,
323 2671 : ancestor_lsn,
324 2671 : disk_consistent_lsn: timeline.get_disk_consistent_lsn(),
325 2671 : remote_consistent_lsn: remote_consistent_lsn_projected,
326 2671 : remote_consistent_lsn_visible,
327 2671 : last_record_lsn,
328 2671 : prev_record_lsn: Some(timeline.get_prev_record_lsn()),
329 2671 : latest_gc_cutoff_lsn: *timeline.get_latest_gc_cutoff_lsn(),
330 2671 : current_logical_size,
331 2671 : current_physical_size,
332 2671 : current_logical_size_non_incremental: None,
333 2671 : timeline_dir_layer_file_size_sum: None,
334 2671 : wal_source_connstr,
335 2671 : last_received_msg_lsn,
336 2671 : last_received_msg_ts,
337 2671 : pg_version: timeline.pg_version,
338 2671 :
339 2671 : state,
340 2671 :
341 2671 : walreceiver_status,
342 2671 : };
343 GIC 2671 : Ok(info)
344 2671 : }
345 ECB (562) :
346 (562) : // healthcheck handler
347 CBC 564 : async fn status_handler(
348 564 : request: Request<Body>,
349 564 : _cancel: CancellationToken,
350 564 : ) -> Result<Response<Body>, ApiError> {
351 564 : check_permission(&request, None)?;
352 564 : let config = get_config(&request);
353 GIC 564 : json_response(StatusCode::OK, StatusResponse { id: config.id })
354 CBC 564 : }
355 ECB (833) :
356 CBC 848 : async fn timeline_create_handler(
357 848 : mut request: Request<Body>,
358 848 : _cancel: CancellationToken,
359 848 : ) -> Result<Response<Body>, ApiError> {
360 848 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
361 GIC 848 : let request_data: TimelineCreateRequest = json_request(&mut request).await?;
362 CBC 848 : check_permission(&request, Some(tenant_id))?;
363 ECB (832) :
364 CBC 847 : let new_timeline_id = request_data.new_timeline_id;
365 847 :
366 847 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Error);
367 847 :
368 847 : let state = get_state(&request);
369 847 :
370 847 : async {
371 847 : let tenant = mgr::get_tenant(tenant_id, true).await?;
372 847 : match tenant.create_timeline(
373 847 : new_timeline_id,
374 847 : request_data.ancestor_timeline_id.map(TimelineId::from),
375 847 : request_data.ancestor_start_lsn,
376 847 : request_data.pg_version.unwrap_or(crate::DEFAULT_PG_VERSION),
377 847 : state.broker_client.clone(),
378 847 : &ctx,
379 847 : )
380 GIC 2891018 : .await {
381 CBC 813 : Ok(new_timeline) => {
382 EUB : // Created. Construct a TimelineInfo for it.
383 CBC 813 : let timeline_info = build_timeline_info_common(&new_timeline, &ctx)
384 LBC (810) : .await
385 GIC 813 : .map_err(ApiError::InternalServerError)?;
386 813 : json_response(StatusCode::CREATED, timeline_info)
387 ECB (1) : }
388 : Err(tenant::CreateTimelineError::AlreadyExists) => {
389 CBC 1 : json_response(StatusCode::CONFLICT, ())
390 ECB (8) : }
391 CBC 8 : Err(tenant::CreateTimelineError::AncestorLsn(err)) => {
392 8 : json_response(StatusCode::NOT_ACCEPTABLE, HttpErrorBody::from_msg(
393 GIC 8 : format!("{err:#}")
394 CBC 8 : ))
395 ECB (6) : }
396 GIC 18 : Err(e @ tenant::CreateTimelineError::AncestorNotActive) => {
397 CBC 18 : json_response(StatusCode::SERVICE_UNAVAILABLE, HttpErrorBody::from_msg(e.to_string()))
398 : }
399 2 : Err(tenant::CreateTimelineError::Other(err)) => Err(ApiError::InternalServerError(err)),
400 ECB (832) : }
401 CBC 842 : }
402 847 : .instrument(info_span!("timeline_create", %tenant_id, timeline_id = %new_timeline_id, lsn=?request_data.ancestor_start_lsn, pg_version=?request_data.pg_version))
403 GIC 2891018 : .await
404 CBC 843 : }
405 ECB (47) :
406 CBC 47 : async fn timeline_list_handler(
407 47 : request: Request<Body>,
408 47 : _cancel: CancellationToken,
409 47 : ) -> Result<Response<Body>, ApiError> {
410 47 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
411 47 : let include_non_incremental_logical_size: Option<bool> =
412 GIC 47 : parse_query_param(&request, "include-non-incremental-logical-size")?;
413 CBC 47 : check_permission(&request, Some(tenant_id))?;
414 :
415 47 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
416 ECB (47) :
417 CBC 47 : let response_data = async {
418 47 : let tenant = mgr::get_tenant(tenant_id, true).await?;
419 47 : let timelines = tenant.list_timelines();
420 47 :
421 47 : let mut response_data = Vec::with_capacity(timelines.len());
422 128 : for timeline in timelines {
423 81 : let timeline_info = build_timeline_info(
424 81 : &timeline,
425 81 : include_non_incremental_logical_size.unwrap_or(false),
426 81 : &ctx,
427 GBC 81 : )
428 CBC 81 : .instrument(info_span!("build_timeline_info", timeline_id = %timeline.timeline_id))
429 LBC (81) : .await
430 GIC 81 : .context("Failed to convert tenant timeline {timeline_id} into the local one: {e:?}")
431 CBC 81 : .map_err(ApiError::InternalServerError)?;
432 :
433 81 : response_data.push(timeline_info);
434 ECB (47) : }
435 CBC 47 : Ok::<Vec<TimelineInfo>, ApiError>(response_data)
436 GBC 47 : }
437 GIC 47 : .instrument(info_span!("timeline_list", %tenant_id))
438 LBC (47) : .await?;
439 ECB (47) :
440 GIC 47 : json_response(StatusCode::OK, response_data)
441 CBC 47 : }
442 ECB (1856) :
443 CBC 1868 : async fn timeline_detail_handler(
444 1868 : request: Request<Body>,
445 1868 : _cancel: CancellationToken,
446 1868 : ) -> Result<Response<Body>, ApiError> {
447 1868 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
448 1868 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
449 1868 : let include_non_incremental_logical_size: Option<bool> =
450 GIC 1868 : parse_query_param(&request, "include-non-incremental-logical-size")?;
451 1868 : check_permission(&request, Some(tenant_id))?;
452 ECB (1856) :
453 : // Logical size calculation needs downloading.
454 CBC 1868 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
455 ECB (1856) :
456 GIC 1868 : let timeline_info = async {
457 CBC 1868 : let tenant = mgr::get_tenant(tenant_id, true).await?;
458 ECB (1837) :
459 CBC 1854 : let timeline = tenant
460 GIC 1854 : .get_timeline(timeline_id, false)
461 CBC 1854 : .map_err(|e| ApiError::NotFound(e.into()))?;
462 ECB (1760) :
463 CBC 1777 : let timeline_info = build_timeline_info(
464 1777 : &timeline,
465 1777 : include_non_incremental_logical_size.unwrap_or(false),
466 1777 : &ctx,
467 1777 : )
468 1216 : .await
469 GIC 1777 : .context("get local timeline info")
470 CBC 1777 : .map_err(ApiError::InternalServerError)?;
471 ECB (1856) :
472 CBC 1776 : Ok::<_, ApiError>(timeline_info)
473 1868 : }
474 GIC 1868 : .instrument(info_span!("timeline_detail", %tenant_id, %timeline_id))
475 CBC 1216 : .await?;
476 ECB (1856) :
477 GIC 1776 : json_response(StatusCode::OK, timeline_info)
478 CBC 1868 : }
479 ECB (12) :
480 CBC 12 : async fn get_lsn_by_timestamp_handler(
481 12 : request: Request<Body>,
482 12 : _cancel: CancellationToken,
483 12 : ) -> Result<Response<Body>, ApiError> {
484 GIC 12 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
485 CBC 12 : check_permission(&request, Some(tenant_id))?;
486 ECB (12) :
487 CBC 12 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
488 12 : let timestamp_raw = must_get_query_param(&request, "timestamp")?;
489 12 : let timestamp = humantime::parse_rfc3339(×tamp_raw)
490 12 : .with_context(|| format!("Invalid time: {:?}", timestamp_raw))
491 12 : .map_err(ApiError::BadRequest)?;
492 12 : let timestamp_pg = postgres_ffi::to_pg_timestamp(timestamp);
493 12 :
494 12 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
495 GIC 12 : let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
496 CBC 1314 : let result = timeline.find_lsn_for_timestamp(timestamp_pg, &ctx).await?;
497 ECB (10) :
498 CBC 12 : let result = match result {
499 10 : LsnForTimestamp::Present(lsn) => format!("{lsn}"),
500 GBC 1 : LsnForTimestamp::Future(_lsn) => "future".into(),
501 GIC 1 : LsnForTimestamp::Past(_lsn) => "past".into(),
502 LBC (12) : LsnForTimestamp::NoData(_lsn) => "nodata".into(),
503 ECB (12) : };
504 GIC 12 : json_response(StatusCode::OK, result)
505 CBC 12 : }
506 ECB (60) :
507 CBC 12 : async fn get_timestamp_of_lsn_handler(
508 12 : request: Request<Body>,
509 12 : _cancel: CancellationToken,
510 12 : ) -> Result<Response<Body>, ApiError> {
511 GIC 12 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
512 CBC 12 : check_permission(&request, Some(tenant_id))?;
513 ECB (57) :
514 CBC 12 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
515 ECB (2) :
516 GIC 12 : let lsn_str = must_get_query_param(&request, "lsn")?;
517 12 : let lsn = Lsn::from_str(&lsn_str)
518 CBC 12 : .with_context(|| format!("Invalid LSN: {lsn_str:?}"))
519 GIC 12 : .map_err(ApiError::BadRequest)?;
520 ECB (57) :
521 GIC 12 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
522 CBC 12 : let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
523 GIC 80 : let result = timeline.get_timestamp_for_lsn(lsn, &ctx).await?;
524 ECB (57) :
525 GIC 10 : match result {
526 CBC 10 : Some(time) => {
527 GBC 10 : let time = format_rfc3339(postgres_ffi::from_pg_timestamp(time)).to_string();
528 10 : json_response(StatusCode::OK, time)
529 EUB : }
530 LBC (57) : None => json_response(StatusCode::NOT_FOUND, ()),
531 ECB (57) : }
532 CBC 12 : }
533 ECB (57) :
534 CBC 60 : async fn tenant_attach_handler(
535 60 : mut request: Request<Body>,
536 60 : _cancel: CancellationToken,
537 60 : ) -> Result<Response<Body>, ApiError> {
538 60 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
539 60 : check_permission(&request, Some(tenant_id))?;
540 ECB (57) :
541 CBC 60 : let maybe_body: Option<TenantAttachRequest> = json_request_or_empty_body(&mut request).await?;
542 GIC 57 : let tenant_conf = match &maybe_body {
543 CBC 55 : Some(request) => TenantConfOpt::try_from(&*request.config).map_err(ApiError::BadRequest)?,
544 2 : None => TenantConfOpt::default(),
545 : };
546 ECB (99) :
547 CBC 57 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
548 ECB (99) :
549 CBC 57 : info!("Handling tenant attach {tenant_id}");
550 ECB (99) :
551 CBC 57 : let state = get_state(&request);
552 ECB (99) :
553 GIC 57 : let generation = get_request_generation(state, maybe_body.as_ref().and_then(|r| r.generation))?;
554 ECB (99) :
555 CBC 57 : if state.remote_storage.is_none() {
556 LBC (99) : return Err(ApiError::BadRequest(anyhow!(
557 (99) : "attach_tenant is not possible because pageserver was configured without remote storage"
558 (623) : )));
559 GIC 57 : }
560 CBC 57 :
561 57 : mgr::attach_tenant(
562 GIC 57 : state.conf,
563 CBC 57 : tenant_id,
564 57 : generation,
565 57 : tenant_conf,
566 57 : state.tenant_resources(),
567 57 : &ctx,
568 57 : )
569 57 : .instrument(info_span!("tenant_attach", %tenant_id))
570 GIC 15 : .await?;
571 ECB (38) :
572 CBC 42 : json_response(StatusCode::ACCEPTED, ())
573 60 : }
574 ECB (38) :
575 CBC 99 : async fn timeline_delete_handler(
576 99 : request: Request<Body>,
577 99 : _cancel: CancellationToken,
578 99 : ) -> Result<Response<Body>, ApiError> {
579 99 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
580 99 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
581 GIC 99 : check_permission(&request, Some(tenant_id))?;
582 ECB (35) :
583 CBC 99 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
584 GIC 99 :
585 CBC 99 : mgr::delete_timeline(tenant_id, timeline_id, &ctx)
586 99 : .instrument(info_span!("timeline_delete", %tenant_id, %timeline_id))
587 605 : .await?;
588 ECB (6) :
589 CBC 85 : json_response(StatusCode::ACCEPTED, ())
590 99 : }
591 :
592 38 : async fn tenant_detach_handler(
593 GIC 38 : request: Request<Body>,
594 CBC 38 : _cancel: CancellationToken,
595 GIC 38 : ) -> Result<Response<Body>, ApiError> {
596 CBC 38 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
597 GIC 38 : check_permission(&request, Some(tenant_id))?;
598 38 : let detach_ignored: Option<bool> = parse_query_param(&request, "detach_ignored")?;
599 :
600 CBC 38 : let state = get_state(&request);
601 GIC 38 : let conf = state.conf;
602 CBC 38 : mgr::detach_tenant(
603 38 : conf,
604 38 : tenant_id,
605 38 : detach_ignored.unwrap_or(false),
606 38 : &state.deletion_queue_client,
607 38 : )
608 38 : .instrument(info_span!("tenant_detach", %tenant_id))
609 217 : .await?;
610 ECB (6) :
611 CBC 35 : json_response(StatusCode::OK, ())
612 38 : }
613 :
614 6 : async fn tenant_load_handler(
615 6 : mut request: Request<Body>,
616 GIC 6 : _cancel: CancellationToken,
617 CBC 6 : ) -> Result<Response<Body>, ApiError> {
618 6 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
619 6 : check_permission(&request, Some(tenant_id))?;
620 ECB (7) :
621 CBC 6 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
622 ECB (7) :
623 GIC 6 : let maybe_body: Option<TenantLoadRequest> = json_request_or_empty_body(&mut request).await?;
624 ECB (7) :
625 CBC 6 : let state = get_state(&request);
626 ECB (7) :
627 (7) : // The /load request is only usable when control_plane_api is not set. Once it is set, callers
628 (29) : // should always use /attach instead.
629 GIC 6 : let generation = get_request_generation(state, maybe_body.as_ref().and_then(|r| r.generation))?;
630 ECB (7) :
631 CBC 6 : mgr::load_tenant(
632 GIC 6 : state.conf,
633 CBC 6 : tenant_id,
634 6 : generation,
635 6 : state.broker_client.clone(),
636 6 : state.remote_storage.clone(),
637 6 : state.deletion_queue_client.clone(),
638 GIC 6 : &ctx,
639 CBC 6 : )
640 6 : .instrument(info_span!("load", %tenant_id))
641 GBC 1 : .await?;
642 ECB (67) :
643 GBC 5 : json_response(StatusCode::ACCEPTED, ())
644 CBC 6 : }
645 ECB (67) :
646 CBC 7 : async fn tenant_ignore_handler(
647 7 : request: Request<Body>,
648 7 : _cancel: CancellationToken,
649 7 : ) -> Result<Response<Body>, ApiError> {
650 7 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
651 7 : check_permission(&request, Some(tenant_id))?;
652 ECB (67) :
653 CBC 7 : let state = get_state(&request);
654 7 : let conf = state.conf;
655 7 : mgr::ignore_tenant(conf, tenant_id)
656 GIC 7 : .instrument(info_span!("ignore_tenant", %tenant_id))
657 CBC 27 : .await?;
658 ECB (519) :
659 CBC 7 : json_response(StatusCode::OK, ())
660 7 : }
661 ECB (519) :
662 CBC 66 : async fn tenant_list_handler(
663 GIC 66 : request: Request<Body>,
664 CBC 66 : _cancel: CancellationToken,
665 66 : ) -> Result<Response<Body>, ApiError> {
666 GIC 66 : check_permission(&request, None)?;
667 :
668 CBC 66 : let response_data = mgr::list_tenants()
669 66 : .instrument(info_span!("tenant_list"))
670 LBC (489) : .await
671 GIC 66 : .map_err(|_| {
672 UIC 0 : ApiError::ResourceUnavailable("Tenant map is initializing or shutting down".into())
673 CBC 66 : })?
674 66 : .iter()
675 112 : .map(|(id, state)| TenantInfo {
676 112 : id: *id,
677 112 : state: state.clone(),
678 112 : current_physical_size: None,
679 112 : attachment_status: state.attachment_status(),
680 112 : })
681 66 : .collect::<Vec<TenantInfo>>();
682 66 :
683 GIC 66 : json_response(StatusCode::OK, response_data)
684 CBC 66 : }
685 ECB (519) :
686 GIC 498 : async fn tenant_status(
687 CBC 498 : request: Request<Body>,
688 498 : _cancel: CancellationToken,
689 498 : ) -> Result<Response<Body>, ApiError> {
690 498 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
691 GIC 498 : check_permission(&request, Some(tenant_id))?;
692 ECB (91) :
693 CBC 498 : let tenant_info = async {
694 GIC 498 : let tenant = mgr::get_tenant(tenant_id, false).await?;
695 ECB (91) :
696 (91) : // Calculate total physical size of all timelines
697 CBC 436 : let mut current_physical_size = 0;
698 466 : for timeline in tenant.list_timelines().iter() {
699 466 : current_physical_size += timeline.layer_size_sum().await;
700 : }
701 ECB (72) :
702 CBC 436 : let state = tenant.current_state();
703 GIC 436 : Result::<_, ApiError>::Ok(TenantInfo {
704 436 : id: tenant_id,
705 436 : state: state.clone(),
706 436 : current_physical_size: Some(current_physical_size),
707 436 : attachment_status: state.attachment_status(),
708 436 : })
709 498 : }
710 498 : .instrument(info_span!("tenant_status_handler", %tenant_id))
711 62 : .await?;
712 :
713 436 : json_response(StatusCode::OK, tenant_info)
714 498 : }
715 :
716 91 : async fn tenant_delete_handler(
717 CBC 91 : request: Request<Body>,
718 91 : _cancel: CancellationToken,
719 91 : ) -> Result<Response<Body>, ApiError> {
720 ECB (53) : // TODO openapi spec
721 CBC 91 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
722 91 : check_permission(&request, Some(tenant_id))?;
723 ECB (53) :
724 CBC 91 : let state = get_state(&request);
725 91 :
726 91 : mgr::delete_tenant(state.conf, state.remote_storage.clone(), tenant_id)
727 91 : .instrument(info_span!("tenant_delete_handler", %tenant_id))
728 547 : .await?;
729 :
730 GIC 72 : json_response(StatusCode::ACCEPTED, ())
731 CBC 91 : }
732 ECB (53) :
733 (53) : /// HTTP endpoint to query the current tenant_size of a tenant.
734 (53) : ///
735 (53) : /// This is not used by consumption metrics under [`crate::consumption_metrics`], but can be used
736 (53) : /// to debug any of the calculations. Requires `tenant_id` request parameter, supports
737 (46) : /// `inputs_only=true|false` (default false) which supports debugging failure to calculate model
738 (53) : /// values.
739 : ///
740 (53) : /// 'retention_period' query parameter overrides the cutoff that is used to calculate the size
741 (53) : /// (only if it is shorter than the real cutoff).
742 (53) : ///
743 (53) : /// Note: we don't update the cached size and prometheus metric here.
744 (53) : /// The retention period might be different, and it's nice to have a method to just calculate it
745 (53) : /// without modifying anything anyway.
746 CBC 53 : async fn tenant_size_handler(
747 53 : request: Request<Body>,
748 53 : _cancel: CancellationToken,
749 53 : ) -> Result<Response<Body>, ApiError> {
750 53 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
751 53 : check_permission(&request, Some(tenant_id))?;
752 GBC 53 : let inputs_only: Option<bool> = parse_query_param(&request, "inputs_only")?;
753 53 : let retention_period: Option<u64> = parse_query_param(&request, "retention_period")?;
754 53 : let headers = request.headers();
755 53 :
756 53 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
757 GIC 53 : let tenant = mgr::get_tenant(tenant_id, true).await?;
758 :
759 : // this can be long operation
760 CBC 53 : let inputs = tenant
761 GIC 53 : .gather_size_inputs(
762 53 : retention_period,
763 53 : LogicalSizeCalculationCause::TenantSizeHandler,
764 53 : &ctx,
765 53 : )
766 47 : .await
767 53 : .map_err(ApiError::InternalServerError)?;
768 :
769 53 : let mut sizes = None;
770 53 : if !inputs_only.unwrap_or(false) {
771 53 : let storage_model = inputs
772 53 : .calculate_model()
773 53 : .map_err(ApiError::InternalServerError)?;
774 CBC 53 : let size = storage_model.calculate();
775 53 :
776 53 : // If request header expects html, return html
777 53 : if headers["Accept"] == "text/html" {
778 18 : return synthetic_size_html_response(inputs, storage_model, size);
779 35 : }
780 35 : sizes = Some(size);
781 LBC (35) : } else if headers["Accept"] == "text/html" {
782 (35) : return Err(ApiError::BadRequest(anyhow!(
783 (53) : "inputs_only parameter is incompatible with html output request"
784 UIC 0 : )));
785 LBC (86) : }
786 ECB (86) :
787 (86) : /// The type resides in the pageserver not to expose `ModelInputs`.
788 (86) : #[serde_with::serde_as]
789 CBC 35 : #[derive(serde::Serialize)]
790 ECB (86) : struct TenantHistorySize {
791 (86) : #[serde_as(as = "serde_with::DisplayFromStr")]
792 (86) : id: TenantId,
793 (86) : /// Size is a mixture of WAL and logical size, so the unit is bytes.
794 (86) : ///
795 : /// Will be none if `?inputs_only=true` was given.
796 (86) : size: Option<u64>,
797 (86) : /// Size of each segment used in the model.
798 : /// Will be null if `?inputs_only=true` was given.
799 (86) : segment_sizes: Option<Vec<tenant_size_model::SegmentSizeResult>>,
800 (86) : inputs: crate::tenant::size::ModelInputs,
801 : }
802 (5) :
803 CBC 35 : json_response(
804 35 : StatusCode::OK,
805 35 : TenantHistorySize {
806 35 : id: tenant_id,
807 35 : size: sizes.as_ref().map(|x| x.total_size),
808 35 : segment_sizes: sizes.map(|x| x.segments),
809 35 : inputs,
810 35 : },
811 GIC 35 : )
812 CBC 53 : }
813 ECB (5) :
814 CBC 85 : async fn layer_map_info_handler(
815 85 : request: Request<Body>,
816 85 : _cancel: CancellationToken,
817 GIC 85 : ) -> Result<Response<Body>, ApiError> {
818 CBC 85 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
819 85 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
820 GBC 85 : let reset: LayerAccessStatsReset =
821 85 : parse_query_param(&request, "reset")?.unwrap_or(LayerAccessStatsReset::NoReset);
822 85 :
823 85 : check_permission(&request, Some(tenant_id))?;
824 EUB :
825 GIC 85 : let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
826 CBC 85 : let layer_map_info = timeline.layer_map_info(reset).await;
827 :
828 85 : json_response(StatusCode::OK, layer_map_info)
829 85 : }
830 ECB (416) :
831 CBC 6 : async fn layer_download_handler(
832 6 : request: Request<Body>,
833 6 : _cancel: CancellationToken,
834 6 : ) -> Result<Response<Body>, ApiError> {
835 6 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
836 GIC 6 : check_permission(&request, Some(tenant_id))?;
837 CBC 6 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
838 6 : let layer_file_name = get_request_param(&request, "layer_file_name")?;
839 6 : check_permission(&request, Some(tenant_id))?;
840 ECB (10) :
841 CBC 6 : let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
842 GIC 6 : let downloaded = timeline
843 CBC 6 : .download_layer(layer_file_name)
844 6 : .await
845 GBC 6 : .map_err(ApiError::InternalServerError)?;
846 EUB :
847 GBC 6 : match downloaded {
848 6 : Some(true) => json_response(StatusCode::OK, ()),
849 UBC 0 : Some(false) => json_response(StatusCode::NOT_MODIFIED, ()),
850 UIC 0 : None => json_response(
851 LBC (416) : StatusCode::BAD_REQUEST,
852 UIC 0 : format!("Layer {tenant_id}/{timeline_id}/{layer_file_name} not found"),
853 0 : ),
854 ECB (18) : }
855 CBC 6 : }
856 ECB (18) :
857 CBC 407 : async fn evict_timeline_layer_handler(
858 407 : request: Request<Body>,
859 407 : _cancel: CancellationToken,
860 407 : ) -> Result<Response<Body>, ApiError> {
861 407 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
862 407 : check_permission(&request, Some(tenant_id))?;
863 407 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
864 407 : let layer_file_name = get_request_param(&request, "layer_file_name")?;
865 ECB (18) :
866 CBC 407 : let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
867 407 : let evicted = timeline
868 407 : .evict_layer(layer_file_name)
869 6 : .await
870 GIC 407 : .map_err(ApiError::InternalServerError)?;
871 ECB (18) :
872 CBC 407 : match evicted {
873 407 : Some(true) => json_response(StatusCode::OK, ()),
874 UIC 0 : Some(false) => json_response(StatusCode::NOT_MODIFIED, ()),
875 LBC (18) : None => json_response(
876 (18) : StatusCode::BAD_REQUEST,
877 (18) : format!("Layer {tenant_id}/{timeline_id}/{layer_file_name} not found"),
878 (18) : ),
879 ECB (18) : }
880 CBC 407 : }
881 ECB (18) :
882 (18) : /// Get tenant_size SVG graph along with the JSON data.
883 CBC 18 : fn synthetic_size_html_response(
884 18 : inputs: ModelInputs,
885 18 : storage_model: StorageModel,
886 18 : sizes: SizeResult,
887 18 : ) -> Result<Response<Body>, ApiError> {
888 18 : let mut timeline_ids: Vec<String> = Vec::new();
889 18 : let mut timeline_map: HashMap<TimelineId, usize> = HashMap::new();
890 26 : for (index, ti) in inputs.timeline_inputs.iter().enumerate() {
891 26 : timeline_map.insert(ti.timeline_id, index);
892 26 : timeline_ids.push(ti.timeline_id.to_string());
893 26 : }
894 18 : let seg_to_branch: Vec<usize> = inputs
895 18 : .segments
896 18 : .iter()
897 66 : .map(|seg| *timeline_map.get(&seg.timeline_id).unwrap())
898 18 : .collect();
899 :
900 18 : let svg =
901 18 : tenant_size_model::svg::draw_svg(&storage_model, &timeline_ids, &seg_to_branch, &sizes)
902 18 : .map_err(ApiError::InternalServerError)?;
903 ECB (18) :
904 CBC 18 : let mut response = String::new();
905 18 :
906 18 : use std::fmt::Write;
907 18 : write!(response, "<html>\n<body>\n").unwrap();
908 GIC 18 : write!(response, "<div>\n{svg}\n</div>").unwrap();
909 18 : writeln!(response, "Project size: {}", sizes.total_size).unwrap();
910 18 : writeln!(response, "<pre>").unwrap();
911 CBC 18 : writeln!(
912 18 : response,
913 18 : "{}",
914 18 : serde_json::to_string_pretty(&inputs).unwrap()
915 18 : )
916 18 : .unwrap();
917 18 : writeln!(
918 GIC 18 : response,
919 18 : "{}",
920 CBC 18 : serde_json::to_string_pretty(&sizes.segments).unwrap()
921 GIC 18 : )
922 CBC 18 : .unwrap();
923 GIC 18 : writeln!(response, "</pre>").unwrap();
924 CBC 18 : write!(response, "</body>\n</html>\n").unwrap();
925 18 :
926 18 : html_response(StatusCode::OK, response)
927 18 : }
928 ECB (449) :
929 CBC 18 : pub fn html_response(status: StatusCode, data: String) -> Result<Response<Body>, ApiError> {
930 18 : let response = Response::builder()
931 GIC 18 : .status(status)
932 CBC 18 : .header(hyper::header::CONTENT_TYPE, "text/html")
933 18 : .body(Body::from(data.as_bytes().to_vec()))
934 18 : .map_err(|e| ApiError::InternalServerError(e.into()))?;
935 18 : Ok(response)
936 GIC 18 : }
937 ECB (448) :
938 (448) : /// Helper for requests that may take a generation, which is mandatory
939 : /// when control_plane_api is set, but otherwise defaults to Generation::none()
940 CBC 513 : fn get_request_generation(state: &State, req_gen: Option<u32>) -> Result<Generation, ApiError> {
941 GIC 513 : if state.conf.control_plane_api.is_some() {
942 CBC 20 : req_gen
943 GIC 20 : .map(Generation::new)
944 CBC 20 : .ok_or(ApiError::BadRequest(anyhow!(
945 GIC 20 : "generation attribute missing"
946 CBC 20 : )))
947 ECB (448) : } else {
948 (448) : // Legacy mode: all tenants operate with no generation
949 CBC 493 : Ok(Generation::none())
950 ECB (448) : }
951 CBC 513 : }
952 ECB (448) :
953 CBC 451 : async fn tenant_create_handler(
954 451 : mut request: Request<Body>,
955 451 : _cancel: CancellationToken,
956 GIC 451 : ) -> Result<Response<Body>, ApiError> {
957 451 : let request_data: TenantCreateRequest = json_request(&mut request).await?;
958 451 : let target_tenant_id = request_data.new_tenant_id;
959 CBC 451 : check_permission(&request, None)?;
960 :
961 GIC 450 : let _timer = STORAGE_TIME_GLOBAL
962 450 : .get_metric_with_label_values(&[StorageTimeOperation::CreateTenant.into()])
963 450 : .expect("bug")
964 450 : .start_timer();
965 :
966 GBC 450 : let tenant_conf =
967 450 : TenantConfOpt::try_from(&request_data.config).map_err(ApiError::BadRequest)?;
968 ECB (447) :
969 GIC 450 : let state = get_state(&request);
970 ECB (447) :
971 CBC 450 : let generation = get_request_generation(state, request_data.generation)?;
972 ECB (447) :
973 CBC 450 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
974 ECB (449) :
975 GIC 450 : let new_tenant = mgr::create_tenant(
976 CBC 450 : state.conf,
977 450 : tenant_conf,
978 450 : target_tenant_id,
979 450 : generation,
980 450 : state.tenant_resources(),
981 450 : &ctx,
982 GIC 450 : )
983 CBC 450 : .instrument(info_span!("tenant_create", tenant_id = %target_tenant_id))
984 GIC 1 : .await?;
985 ECB (40) :
986 : // We created the tenant. Existing API semantics are that the tenant
987 : // is Active when this function returns.
988 CBC 640 : if let res @ Err(_) = new_tenant.wait_to_become_active().await {
989 ECB (40) : // This shouldn't happen because we just created the tenant directory
990 (40) : // in tenant::mgr::create_tenant, and there aren't any remote timelines
991 : // to load, so, nothing can really fail during load.
992 : // Don't do cleanup because we don't know how we got here.
993 (40) : // The tenant will likely be in `Broken` state and subsequent
994 (40) : // calls will fail.
995 LBC (40) : res.context("created tenant failed to become active")
996 (40) : .map_err(ApiError::InternalServerError)?;
997 GIC 449 : }
998 :
999 449 : json_response(
1000 CBC 449 : StatusCode::CREATED,
1001 449 : TenantCreateResponse(new_tenant.tenant_id()),
1002 GIC 449 : )
1003 CBC 451 : }
1004 ECB (27) :
1005 CBC 40 : async fn get_tenant_config_handler(
1006 40 : request: Request<Body>,
1007 40 : _cancel: CancellationToken,
1008 40 : ) -> Result<Response<Body>, ApiError> {
1009 40 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
1010 GIC 40 : check_permission(&request, Some(tenant_id))?;
1011 ECB (27) :
1012 CBC 40 : let tenant = mgr::get_tenant(tenant_id, false).await?;
1013 :
1014 40 : let response = HashMap::from([
1015 ECB (27) : (
1016 (27) : "tenant_specific_overrides",
1017 GBC 40 : serde_json::to_value(tenant.tenant_specific_overrides())
1018 GIC 40 : .context("serializing tenant specific overrides")
1019 CBC 40 : .map_err(ApiError::InternalServerError)?,
1020 ECB (27) : ),
1021 : (
1022 GBC 40 : "effective_config",
1023 40 : serde_json::to_value(tenant.effective_config())
1024 40 : .context("serializing effective config")
1025 40 : .map_err(ApiError::InternalServerError)?,
1026 EUB : ),
1027 : ]);
1028 :
1029 GIC 40 : json_response(StatusCode::OK, response)
1030 GBC 40 : }
1031 EUB :
1032 GBC 27 : async fn update_tenant_config_handler(
1033 27 : mut request: Request<Body>,
1034 27 : _cancel: CancellationToken,
1035 27 : ) -> Result<Response<Body>, ApiError> {
1036 27 : let request_data: TenantConfigRequest = json_request(&mut request).await?;
1037 27 : let tenant_id = request_data.tenant_id;
1038 27 : check_permission(&request, Some(tenant_id))?;
1039 EUB :
1040 GIC 27 : let tenant_conf =
1041 GBC 27 : TenantConfOpt::try_from(&request_data.config).map_err(ApiError::BadRequest)?;
1042 EUB :
1043 GBC 27 : let state = get_state(&request);
1044 27 : mgr::set_new_tenant_config(state.conf, tenant_conf, tenant_id)
1045 27 : .instrument(info_span!("tenant_config", %tenant_id))
1046 UIC 0 : .await?;
1047 EUB :
1048 GBC 27 : json_response(StatusCode::OK, ())
1049 27 : }
1050 :
1051 UBC 0 : async fn put_tenant_location_config_handler(
1052 0 : mut request: Request<Body>,
1053 UIC 0 : _cancel: CancellationToken,
1054 UBC 0 : ) -> Result<Response<Body>, ApiError> {
1055 0 : let request_data: TenantLocationConfigRequest = json_request(&mut request).await?;
1056 0 : let tenant_id = request_data.tenant_id;
1057 0 : check_permission(&request, Some(tenant_id))?;
1058 EUB :
1059 UBC 0 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
1060 0 : let state = get_state(&request);
1061 0 : let conf = state.conf;
1062 0 :
1063 0 : // The `Detached` state is special, it doesn't upsert a tenant, it removes
1064 UIC 0 : // its local disk content and drops it from memory.
1065 0 : if let LocationConfigMode::Detached = request_data.config.mode {
1066 0 : if let Err(e) = mgr::detach_tenant(conf, tenant_id, true, &state.deletion_queue_client)
1067 UBC 0 : .instrument(info_span!("tenant_detach", %tenant_id))
1068 UIC 0 : .await
1069 EUB : {
1070 UBC 0 : match e {
1071 UIC 0 : TenantStateError::NotFound(_) => {
1072 0 : // This API is idempotent: a NotFound on a detach is fine.
1073 LBC (2) : }
1074 (2) : _ => return Err(e.into()),
1075 ECB (2) : }
1076 LBC (2) : }
1077 (2) : return json_response(StatusCode::OK, ());
1078 UIC 0 : }
1079 ECB (2) :
1080 UBC 0 : let location_conf =
1081 LBC (2) : LocationConf::try_from(&request_data.config).map_err(ApiError::BadRequest)?;
1082 :
1083 (2) : mgr::upsert_location(
1084 UIC 0 : state.conf,
1085 LBC (2) : tenant_id,
1086 (2) : location_conf,
1087 UIC 0 : state.broker_client.clone(),
1088 LBC (204) : state.remote_storage.clone(),
1089 (204) : state.deletion_queue_client.clone(),
1090 (204) : &ctx,
1091 (204) : )
1092 (204) : .await
1093 EUB : // TODO: badrequest assumes the caller was asking for something unreasonable, but in
1094 : // principle we might have hit something like concurrent API calls to the same tenant,
1095 : // which is not a 400 but a 409.
1096 LBC (204) : .map_err(ApiError::BadRequest)?;
1097 :
1098 (204) : json_response(StatusCode::OK, ())
1099 (415) : }
1100 ECB (211) :
1101 : /// Testing helper to transition a tenant to [`crate::tenant::TenantState::Broken`].
1102 GIC 2 : async fn handle_tenant_break(
1103 2 : r: Request<Body>,
1104 CBC 2 : _cancel: CancellationToken,
1105 GIC 2 : ) -> Result<Response<Body>, ApiError> {
1106 CBC 2 : let tenant_id: TenantId = parse_request_param(&r, "tenant_id")?;
1107 EUB :
1108 GBC 2 : let tenant = crate::tenant::mgr::get_tenant(tenant_id, true)
1109 UBC 0 : .await
1110 CBC 2 : .map_err(|_| ApiError::Conflict(String::from("no active tenant found")))?;
1111 :
1112 GIC 2 : tenant.set_broken("broken from test".to_owned()).await;
1113 ECB (204) :
1114 CBC 2 : json_response(StatusCode::OK, ())
1115 GIC 2 : }
1116 :
1117 CBC 206 : async fn failpoints_handler(
1118 206 : mut request: Request<Body>,
1119 206 : _cancel: CancellationToken,
1120 206 : ) -> Result<Response<Body>, ApiError> {
1121 206 : if !fail::has_failpoints() {
1122 LBC (387) : return Err(ApiError::BadRequest(anyhow!(
1123 (387) : "Cannot manage failpoints because pageserver was compiled without failpoints support"
1124 UIC 0 : )));
1125 CBC 206 : }
1126 :
1127 206 : let failpoints: ConfigureFailpointsRequest = json_request(&mut request).await?;
1128 419 : for fp in failpoints {
1129 213 : info!("cfg failpoint: {} {}", fp.name, fp.actions);
1130 ECB (386) :
1131 (386) : // We recognize one extra "action" that's not natively recognized
1132 (386) : // by the failpoints crate: exit, to immediately kill the process
1133 CBC 213 : let cfg_result = crate::failpoint_support::apply_failpoint(&fp.name, &fp.actions);
1134 :
1135 213 : if let Err(err_msg) = cfg_result {
1136 LBC (387) : return Err(ApiError::BadRequest(anyhow!(
1137 UIC 0 : "Failed to configure failpoints: {err_msg}"
1138 0 : )));
1139 CBC 213 : }
1140 ECB (123) : }
1141 (123) :
1142 CBC 206 : json_response(StatusCode::OK, ())
1143 206 : }
1144 ECB (123) :
1145 (123) : // Run GC immediately on given timeline.
1146 GIC 371 : async fn timeline_gc_handler(
1147 CBC 371 : mut request: Request<Body>,
1148 371 : _cancel: CancellationToken,
1149 371 : ) -> Result<Response<Body>, ApiError> {
1150 371 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
1151 371 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1152 371 : check_permission(&request, Some(tenant_id))?;
1153 ECB (123) :
1154 CBC 371 : let gc_req: TimelineGcRequest = json_request(&mut request).await?;
1155 ECB (123) :
1156 CBC 371 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
1157 371 : let wait_task_done = mgr::immediate_gc(tenant_id, timeline_id, gc_req, &ctx).await?;
1158 370 : let gc_result = wait_task_done
1159 GIC 370 : .await
1160 370 : .context("wait for gc task")
1161 CBC 370 : .map_err(ApiError::InternalServerError)?
1162 370 : .map_err(ApiError::InternalServerError)?;
1163 ECB (593) :
1164 CBC 369 : json_response(StatusCode::OK, gc_result)
1165 371 : }
1166 ECB (593) :
1167 (593) : // Run compaction immediately on given timeline.
1168 CBC 123 : async fn timeline_compact_handler(
1169 123 : request: Request<Body>,
1170 123 : cancel: CancellationToken,
1171 123 : ) -> Result<Response<Body>, ApiError> {
1172 123 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
1173 123 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1174 123 : check_permission(&request, Some(tenant_id))?;
1175 ECB (593) :
1176 CBC 123 : async {
1177 123 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
1178 123 : let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
1179 GIC 123 : timeline
1180 CBC 123 : .compact(&cancel, &ctx)
1181 23107 : .await
1182 123 : .map_err(ApiError::InternalServerError)?;
1183 123 : json_response(StatusCode::OK, ())
1184 123 : }
1185 GIC 123 : .instrument(info_span!("manual_compaction", %tenant_id, %timeline_id))
1186 CBC 23107 : .await
1187 123 : }
1188 ECB (3) :
1189 (3) : // Run checkpoint immediately on given timeline.
1190 CBC 577 : async fn timeline_checkpoint_handler(
1191 577 : request: Request<Body>,
1192 577 : cancel: CancellationToken,
1193 577 : ) -> Result<Response<Body>, ApiError> {
1194 GIC 577 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
1195 CBC 577 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1196 577 : check_permission(&request, Some(tenant_id))?;
1197 577 : async {
1198 GBC 577 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
1199 GIC 577 : let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
1200 CBC 577 : timeline
1201 GIC 577 : .freeze_and_flush()
1202 CBC 625 : .await
1203 577 : .map_err(ApiError::InternalServerError)?;
1204 577 : timeline
1205 577 : .compact(&cancel, &ctx)
1206 426014 : .await
1207 576 : .map_err(ApiError::InternalServerError)?;
1208 ECB (25) :
1209 GIC 575 : json_response(StatusCode::OK, ())
1210 CBC 576 : }
1211 577 : .instrument(info_span!("manual_checkpoint", %tenant_id, %timeline_id))
1212 426639 : .await
1213 576 : }
1214 ECB (25) :
1215 CBC 3 : async fn timeline_download_remote_layers_handler_post(
1216 3 : mut request: Request<Body>,
1217 GIC 3 : _cancel: CancellationToken,
1218 CBC 3 : ) -> Result<Response<Body>, ApiError> {
1219 3 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
1220 3 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1221 3 : let body: DownloadRemoteLayersTaskSpawnRequest = json_request(&mut request).await?;
1222 3 : check_permission(&request, Some(tenant_id))?;
1223 ECB (19) :
1224 CBC 3 : let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
1225 GIC 3 : match timeline.spawn_download_all_remote_layers(body).await {
1226 GBC 3 : Ok(st) => json_response(StatusCode::ACCEPTED, st),
1227 LBC (19) : Err(st) => json_response(StatusCode::CONFLICT, st),
1228 : }
1229 CBC 3 : }
1230 ECB (19) :
1231 CBC 22 : async fn timeline_download_remote_layers_handler_get(
1232 22 : request: Request<Body>,
1233 22 : _cancel: CancellationToken,
1234 GIC 22 : ) -> Result<Response<Body>, ApiError> {
1235 CBC 22 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
1236 GIC 22 : check_permission(&request, Some(tenant_id))?;
1237 CBC 22 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1238 :
1239 22 : let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
1240 22 : let info = timeline
1241 22 : .get_download_all_remote_layers_task_info()
1242 22 : .context("task never started since last pageserver process start")
1243 22 : .map_err(|e| ApiError::NotFound(e.into()))?;
1244 GIC 22 : json_response(StatusCode::OK, info)
1245 22 : }
1246 :
1247 19 : async fn deletion_queue_flush(
1248 19 : r: Request<Body>,
1249 CBC 19 : cancel: CancellationToken,
1250 GIC 19 : ) -> Result<Response<Body>, ApiError> {
1251 19 : let state = get_state(&r);
1252 GBC 19 :
1253 19 : if state.remote_storage.is_none() {
1254 EUB : // Nothing to do if remote storage is disabled.
1255 UBC 0 : return json_response(StatusCode::OK, ());
1256 GBC 19 : }
1257 EUB :
1258 GBC 19 : let execute = parse_query_param(&r, "execute")?.unwrap_or(false);
1259 GIC 19 :
1260 19 : let flush = async {
1261 19 : if execute {
1262 33 : state.deletion_queue_client.flush_execute().await
1263 : } else {
1264 8 : state.deletion_queue_client.flush().await
1265 EUB : }
1266 GBC 19 : }
1267 EUB : // DeletionQueueError's only case is shutting down.
1268 GIC 19 : .map_err(|_| ApiError::ShuttingDown);
1269 19 :
1270 GBC 60 : tokio::select! {
1271 19 : res = flush => {
1272 19 : res.map(|()| json_response(StatusCode::OK, ()))?
1273 EUB : }
1274 : _ = cancel.cancelled() => {
1275 : Err(ApiError::ShuttingDown)
1276 : }
1277 : }
1278 GIC 19 : }
1279 EUB :
1280 : /// Try if `GetPage@Lsn` is successful, useful for manual debugging.
1281 UBC 0 : async fn getpage_at_lsn_handler(
1282 0 : request: Request<Body>,
1283 0 : _cancel: CancellationToken,
1284 0 : ) -> Result<Response<Body>, ApiError> {
1285 0 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
1286 0 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1287 0 : check_permission(&request, Some(tenant_id))?;
1288 EUB :
1289 : struct Key(crate::repository::Key);
1290 :
1291 : impl std::str::FromStr for Key {
1292 : type Err = anyhow::Error;
1293 :
1294 UBC 0 : fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
1295 0 : crate::repository::Key::from_hex(s).map(Key)
1296 0 : }
1297 EUB : }
1298 :
1299 UBC 0 : let key: Key = parse_query_param(&request, "key")?
1300 UIC 0 : .ok_or_else(|| ApiError::BadRequest(anyhow!("missing 'key' query parameter")))?;
1301 0 : let lsn: Lsn = parse_query_param(&request, "lsn")?
1302 0 : .ok_or_else(|| ApiError::BadRequest(anyhow!("missing 'lsn' query parameter")))?;
1303 :
1304 0 : async {
1305 0 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
1306 0 : let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
1307 :
1308 UBC 0 : let page = timeline.get(key.0, lsn, &ctx).await?;
1309 EUB :
1310 UBC 0 : Result::<_, ApiError>::Ok(
1311 0 : Response::builder()
1312 UIC 0 : .status(StatusCode::OK)
1313 UBC 0 : .header(CONTENT_TYPE, "application/octet-stream")
1314 0 : .body(hyper::Body::from(page))
1315 0 : .unwrap(),
1316 0 : )
1317 0 : }
1318 0 : .instrument(info_span!("timeline_get", %tenant_id, %timeline_id))
1319 0 : .await
1320 UIC 0 : }
1321 :
1322 0 : async fn timeline_collect_keyspace(
1323 0 : request: Request<Body>,
1324 0 : _cancel: CancellationToken,
1325 UBC 0 : ) -> Result<Response<Body>, ApiError> {
1326 0 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
1327 0 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1328 0 : check_permission(&request, Some(tenant_id))?;
1329 EUB :
1330 : struct Partitioning {
1331 : keys: crate::keyspace::KeySpace,
1332 :
1333 : at_lsn: Lsn,
1334 : }
1335 :
1336 : impl serde::Serialize for Partitioning {
1337 UBC 0 : fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
1338 0 : where
1339 0 : S: serde::Serializer,
1340 UIC 0 : {
1341 EUB : use serde::ser::SerializeMap;
1342 UBC 0 : let mut map = serializer.serialize_map(Some(2))?;
1343 0 : map.serialize_key("keys")?;
1344 UIC 0 : map.serialize_value(&KeySpace(&self.keys))?;
1345 UBC 0 : map.serialize_key("at_lsn")?;
1346 0 : map.serialize_value(&WithDisplay(&self.at_lsn))?;
1347 UIC 0 : map.end()
1348 0 : }
1349 : }
1350 :
1351 : struct WithDisplay<'a, T>(&'a T);
1352 EUB :
1353 : impl<'a, T: std::fmt::Display> serde::Serialize for WithDisplay<'a, T> {
1354 UBC 0 : fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
1355 0 : where
1356 UIC 0 : S: serde::Serializer,
1357 UBC 0 : {
1358 0 : serializer.collect_str(&self.0)
1359 0 : }
1360 EUB : }
1361 :
1362 : struct KeySpace<'a>(&'a crate::keyspace::KeySpace);
1363 :
1364 : impl<'a> serde::Serialize for KeySpace<'a> {
1365 UIC 0 : fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
1366 UBC 0 : where
1367 0 : S: serde::Serializer,
1368 0 : {
1369 EUB : use serde::ser::SerializeSeq;
1370 UBC 0 : let mut seq = serializer.serialize_seq(Some(self.0.ranges.len()))?;
1371 0 : for kr in &self.0.ranges {
1372 0 : seq.serialize_element(&KeyRange(kr))?;
1373 EUB : }
1374 UIC 0 : seq.end()
1375 UBC 0 : }
1376 EUB : }
1377 :
1378 : struct KeyRange<'a>(&'a std::ops::Range<crate::repository::Key>);
1379 :
1380 : impl<'a> serde::Serialize for KeyRange<'a> {
1381 LBC (1263) : fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1382 (1263) : where
1383 (1263) : S: serde::Serializer,
1384 (1263) : {
1385 ECB (1263) : use serde::ser::SerializeTuple;
1386 LBC (1263) : let mut t = serializer.serialize_tuple(2)?;
1387 (1263) : t.serialize_element(&WithDisplay(&self.0.start))?;
1388 (1263) : t.serialize_element(&WithDisplay(&self.0.end))?;
1389 (1263) : t.end()
1390 UIC 0 : }
1391 EUB : }
1392 :
1393 UBC 0 : let at_lsn: Option<Lsn> = parse_query_param(&request, "at_lsn")?;
1394 EUB :
1395 UBC 0 : async {
1396 0 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
1397 0 : let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
1398 0 : let at_lsn = at_lsn.unwrap_or_else(|| timeline.get_last_record_lsn());
1399 0 : let keys = timeline
1400 0 : .collect_keyspace(at_lsn, &ctx)
1401 0 : .await
1402 0 : .map_err(ApiError::InternalServerError)?;
1403 EUB :
1404 UIC 0 : json_response(StatusCode::OK, Partitioning { keys, at_lsn })
1405 LBC (5) : }
1406 (5) : .instrument(info_span!("timeline_collect_keyspace", %tenant_id, %timeline_id))
1407 (5) : .await
1408 (5) : }
1409 ECB (5) :
1410 GIC 1247 : async fn active_timeline_of_active_tenant(
1411 CBC 1247 : tenant_id: TenantId,
1412 GIC 1247 : timeline_id: TimelineId,
1413 1247 : ) -> Result<Arc<Timeline>, ApiError> {
1414 1247 : let tenant = mgr::get_tenant(tenant_id, true).await?;
1415 1247 : tenant
1416 1247 : .get_timeline(timeline_id, true)
1417 CBC 1247 : .map_err(|e| ApiError::NotFound(e.into()))
1418 GIC 1247 : }
1419 :
1420 UIC 0 : async fn always_panic_handler(
1421 0 : req: Request<Body>,
1422 0 : _cancel: CancellationToken,
1423 0 : ) -> Result<Response<Body>, ApiError> {
1424 0 : // Deliberately cause a panic to exercise the panic hook registered via std::panic::set_hook().
1425 0 : // For pageserver, the relevant panic hook is `tracing_panic_hook` , and the `sentry` crate's wrapper around it.
1426 LBC (114) : // Use catch_unwind to ensure that tokio nor hyper are distracted by our panic.
1427 (114) : let query = req.uri().query();
1428 (114) : let _ = std::panic::catch_unwind(|| {
1429 UIC 0 : panic!("unconditional panic for testing panic hook integration; request query: {query:?}")
1430 LBC (212) : });
1431 (212) : json_response(StatusCode::NO_CONTENT, ())
1432 (212) : }
1433 :
1434 GIC 5 : async fn disk_usage_eviction_run(
1435 CBC 5 : mut r: Request<Body>,
1436 GBC 5 : _cancel: CancellationToken,
1437 CBC 5 : ) -> Result<Response<Body>, ApiError> {
1438 GIC 5 : check_permission(&r, None)?;
1439 ECB (5) :
1440 CBC 26 : #[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)]
1441 ECB (5) : struct Config {
1442 (5) : /// How many bytes to evict before reporting that pressure is relieved.
1443 (5) : evict_bytes: u64,
1444 (5) : }
1445 (5) :
1446 CBC 26 : #[derive(Debug, Clone, Copy, serde::Serialize)]
1447 : struct Usage {
1448 ECB (5) : // remains unchanged after instantiation of the struct
1449 EUB : config: Config,
1450 : // updated by `add_available_bytes`
1451 : freed_bytes: u64,
1452 : }
1453 :
1454 ECB (5) : impl crate::disk_usage_eviction_task::Usage for Usage {
1455 CBC 113 : fn has_pressure(&self) -> bool {
1456 113 : self.config.evict_bytes > self.freed_bytes
1457 113 : }
1458 ECB (5) :
1459 CBC 210 : fn add_available_bytes(&mut self, bytes: u64) {
1460 210 : self.freed_bytes += bytes;
1461 210 : }
1462 ECB (5) : }
1463 (5) :
1464 CBC 5 : let config = json_request::<Config>(&mut r)
1465 LBC (5) : .await
1466 CBC 5 : .map_err(|_| ApiError::BadRequest(anyhow::anyhow!("invalid JSON body")))?;
1467 ECB (5) :
1468 CBC 5 : let usage = Usage {
1469 5 : config,
1470 5 : freed_bytes: 0,
1471 5 : };
1472 5 :
1473 5 : let (tx, rx) = tokio::sync::oneshot::channel();
1474 GBC 5 :
1475 GIC 5 : let state = get_state(&r);
1476 ECB (5) :
1477 GIC 5 : let Some(storage) = state.remote_storage.clone() else {
1478 LBC (5) : return Err(ApiError::InternalServerError(anyhow::anyhow!(
1479 (5) : "remote storage not configured, cannot run eviction iteration"
1480 (5) : )));
1481 ECB (5) : };
1482 (5) :
1483 GIC 5 : let state = state.disk_usage_eviction_state.clone();
1484 CBC 5 :
1485 GIC 5 : let cancel = CancellationToken::new();
1486 CBC 5 : let child_cancel = cancel.clone();
1487 5 : let _g = cancel.drop_guard();
1488 GIC 5 :
1489 GBC 5 : crate::task_mgr::spawn(
1490 5 : crate::task_mgr::BACKGROUND_RUNTIME.handle(),
1491 5 : TaskKind::DiskUsageEviction,
1492 5 : None,
1493 5 : None,
1494 5 : "ondemand disk usage eviction",
1495 GIC 5 : false,
1496 CBC 5 : async move {
1497 5 : let res = crate::disk_usage_eviction_task::disk_usage_eviction_task_iteration_impl(
1498 5 : &state,
1499 5 : &storage,
1500 5 : usage,
1501 GIC 5 : &child_cancel,
1502 5 : )
1503 UIC 0 : .await;
1504 :
1505 GIC 5 : info!(?res, "disk_usage_eviction_task_iteration_impl finished");
1506 :
1507 5 : let _ = tx.send(res);
1508 5 : Ok(())
1509 CBC 5 : }
1510 GIC 5 : .in_current_span(),
1511 5 : );
1512 :
1513 5 : let response = rx.await.unwrap().map_err(ApiError::InternalServerError)?;
1514 ECB (5) :
1515 GBC 5 : json_response(StatusCode::OK, response)
1516 CBC 5 : }
1517 :
1518 LBC (5) : async fn handler_404(_: Request<Body>) -> Result<Response<Body>, ApiError> {
1519 (2) : json_response(
1520 (2) : StatusCode::NOT_FOUND,
1521 (2) : HttpErrorBody::from_msg("page not found".to_owned()),
1522 (1) : )
1523 (1) : }
1524 :
1525 GIC 5 : async fn post_tracing_event_handler(
1526 CBC 5 : mut r: Request<Body>,
1527 5 : _cancel: CancellationToken,
1528 GIC 5 : ) -> Result<Response<Body>, ApiError> {
1529 10 : #[derive(Debug, serde::Deserialize)]
1530 : #[serde(rename_all = "lowercase")]
1531 : enum Level {
1532 : Error,
1533 : Warn,
1534 : Info,
1535 : Debug,
1536 : Trace,
1537 : }
1538 CBC 25 : #[derive(Debug, serde::Deserialize)]
1539 ECB (6639) : struct Request {
1540 (6639) : level: Level,
1541 (6639) : message: String,
1542 (6639) : }
1543 CBC 5 : let body: Request = json_request(&mut r)
1544 1 : .await
1545 5 : .map_err(|_| ApiError::BadRequest(anyhow::anyhow!("invalid JSON body")))?;
1546 ECB (6639) :
1547 CBC 5 : match body.level {
1548 2 : Level::Error => tracing::error!(?body.message),
1549 2 : Level::Warn => tracing::warn!(?body.message),
1550 2 : Level::Info => tracing::info!(?body.message),
1551 1 : Level::Debug => tracing::debug!(?body.message),
1552 1 : Level::Trace => tracing::trace!(?body.message),
1553 ECB (3347541) : }
1554 (6633) :
1555 CBC 5 : json_response(StatusCode::OK, ())
1556 5 : }
1557 ECB (6633) :
1558 (6639) : /// Common functionality of all the HTTP API handlers.
1559 (6639) : ///
1560 (6639) : /// - Adds a tracing span to each request (by `request_span`)
1561 (6639) : /// - Logs the request depending on the request method (by `request_span`)
1562 (6762) : /// - Logs the response if it was not successful (by `request_span`
1563 (6632) : /// - Shields the handler function from async cancellations. Hyper can drop the handler
1564 EUB : /// Future if the connection to the client is lost, but most of the pageserver code is
1565 : /// not async cancellation safe. This converts the dropped future into a graceful cancellation
1566 : /// request with a CancellationToken.
1567 GBC 6618 : async fn api_handler<R, H>(request: Request<Body>, handler: H) -> Result<Response<Body>, ApiError>
1568 6618 : where
1569 GIC 6618 : R: std::future::Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
1570 6618 : H: FnOnce(Request<Body>, CancellationToken) -> R + Send + Sync + 'static,
1571 6618 : {
1572 6618 : // Spawn a new task to handle the request, to protect the handler from unexpected
1573 GBC 6618 : // async cancellations. Most pageserver functions are not async cancellation safe.
1574 6618 : // We arm a drop-guard, so that if Hyper drops the Future, we signal the task
1575 6618 : // with the cancellation token.
1576 6618 : let token = CancellationToken::new();
1577 GIC 6618 : let cancel_guard = token.clone().drop_guard();
1578 6618 : let result = request_span(request, move |r| async {
1579 CBC 6618 : let handle = tokio::spawn(
1580 6618 : async {
1581 GIC 6618 : let token_cloned = token.clone();
1582 CBC 3346008 : let result = handler(r, token).await;
1583 6612 : if token_cloned.is_cancelled() {
1584 1 : info!("Cancelled request finished");
1585 6611 : }
1586 GIC 6612 : result
1587 6618 : }
1588 6618 : .in_current_span(),
1589 CBC 6618 : );
1590 6618 :
1591 6734 : match handle.await {
1592 6611 : Ok(result) => result,
1593 LBC (927) : Err(e) => {
1594 (927) : // The handler task panicked. We have a global panic handler that logs the
1595 (927) : // panic with its backtrace, so no need to log that here. Only log a brief
1596 (927) : // message to make it clear that we returned the error to the client.
1597 (927) : error!("HTTP request handler task panicked: {e:#}");
1598 ECB (927) :
1599 (929) : // Don't return an Error here, because then fallback error handler that was
1600 : // installed in make_router() will print the error. Instead, construct the
1601 EUB : // HTTP error response and return that.
1602 UBC 0 : Ok(
1603 0 : ApiError::InternalServerError(anyhow!("HTTP request handler task panicked"))
1604 0 : .into_response(),
1605 UIC 0 : )
1606 ECB (926) : }
1607 : }
1608 CBC 6618 : })
1609 6734 : .await;
1610 ECB (558) :
1611 CBC 6611 : cancel_guard.disarm();
1612 6611 :
1613 6611 : result
1614 6611 : }
1615 ECB (558) :
1616 (71) : /// Like api_handler, but returns an error response if the server is built without
1617 (71) : /// the 'testing' feature.
1618 CBC 913 : async fn testing_api_handler<R, H>(
1619 913 : desc: &str,
1620 GIC 913 : request: Request<Body>,
1621 CBC 913 : handler: H,
1622 GIC 913 : ) -> Result<Response<Body>, ApiError>
1623 CBC 913 : where
1624 913 : R: std::future::Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
1625 GIC 913 : H: FnOnce(Request<Body>, CancellationToken) -> R + Send + Sync + 'static,
1626 CBC 913 : {
1627 913 : if cfg!(feature = "testing") {
1628 913 : api_handler(request, handler).await
1629 ECB (558) : } else {
1630 LBC (558) : std::future::ready(Err(ApiError::BadRequest(anyhow!(
1631 (558) : "Cannot {desc} because pageserver was compiled without testing APIs",
1632 (558) : ))))
1633 (558) : .await
1634 ECB (558) : }
1635 CBC 912 : }
1636 ECB (562) :
1637 CBC 560 : pub fn make_router(
1638 560 : state: Arc<State>,
1639 560 : launch_ts: &'static LaunchTimestamp,
1640 560 : auth: Option<Arc<JwtAuth>>,
1641 560 : ) -> anyhow::Result<RouterBuilder<hyper::Body, ApiError>> {
1642 560 : let spec = include_bytes!("openapi_spec.yml");
1643 560 : let mut router = attach_openapi_ui(endpoint::make_router(), spec, "/swagger.yml", "/v1/doc");
1644 560 : if auth.is_some() {
1645 70 : router = router.middleware(auth_middleware(|request| {
1646 70 : let state = get_state(request);
1647 70 : if state.allowlist_routes.contains(request.uri()) {
1648 10 : None
1649 ECB (558) : } else {
1650 CBC 60 : state.auth.as_deref()
1651 ECB (558) : }
1652 CBC 70 : }))
1653 551 : }
1654 ECB (558) :
1655 CBC 560 : router = router.middleware(
1656 GBC 560 : endpoint::add_response_header_middleware(
1657 CBC 560 : "PAGESERVER_LAUNCH_TIMESTAMP",
1658 560 : &launch_ts.to_string(),
1659 560 : )
1660 560 : .expect("construct launch timestamp header middleware"),
1661 560 : );
1662 560 :
1663 560 : Ok(router
1664 560 : .data(state)
1665 564 : .get("/v1/status", |r| api_handler(r, status_handler))
1666 560 : .put("/v1/failpoints", |r| {
1667 206 : testing_api_handler("manage failpoints", r, failpoints_handler)
1668 560 : })
1669 560 : .get("/v1/tenant", |r| api_handler(r, tenant_list_handler))
1670 560 : .post("/v1/tenant", |r| api_handler(r, tenant_create_handler))
1671 560 : .get("/v1/tenant/:tenant_id", |r| api_handler(r, tenant_status))
1672 560 : .delete("/v1/tenant/:tenant_id", |r| {
1673 91 : api_handler(r, tenant_delete_handler)
1674 560 : })
1675 560 : .get("/v1/tenant/:tenant_id/synthetic_size", |r| {
1676 53 : api_handler(r, tenant_size_handler)
1677 560 : })
1678 560 : .put("/v1/tenant/config", |r| {
1679 27 : api_handler(r, update_tenant_config_handler)
1680 560 : })
1681 560 : .get("/v1/tenant/:tenant_id/config", |r| {
1682 40 : api_handler(r, get_tenant_config_handler)
1683 560 : })
1684 560 : .put("/v1/tenant/:tenant_id/location_config", |r| {
1685 LBC (558) : api_handler(r, put_tenant_location_config_handler)
1686 CBC 560 : })
1687 560 : .get("/v1/tenant/:tenant_id/timeline", |r| {
1688 47 : api_handler(r, timeline_list_handler)
1689 560 : })
1690 848 : .post("/v1/tenant/:tenant_id/timeline", |r| {
1691 848 : api_handler(r, timeline_create_handler)
1692 848 : })
1693 560 : .post("/v1/tenant/:tenant_id/attach", |r| {
1694 60 : api_handler(r, tenant_attach_handler)
1695 560 : })
1696 560 : .post("/v1/tenant/:tenant_id/detach", |r| {
1697 38 : api_handler(r, tenant_detach_handler)
1698 560 : })
1699 560 : .post("/v1/tenant/:tenant_id/load", |r| {
1700 6 : api_handler(r, tenant_load_handler)
1701 560 : })
1702 560 : .post("/v1/tenant/:tenant_id/ignore", |r| {
1703 7 : api_handler(r, tenant_ignore_handler)
1704 560 : })
1705 1868 : .get("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
1706 1868 : api_handler(r, timeline_detail_handler)
1707 1868 : })
1708 560 : .get(
1709 560 : "/v1/tenant/:tenant_id/timeline/:timeline_id/get_lsn_by_timestamp",
1710 560 : |r| api_handler(r, get_lsn_by_timestamp_handler),
1711 560 : )
1712 560 : .get(
1713 560 : "/v1/tenant/:tenant_id/timeline/:timeline_id/get_timestamp_of_lsn",
1714 560 : |r| api_handler(r, get_timestamp_of_lsn_handler),
1715 560 : )
1716 560 : .put("/v1/tenant/:tenant_id/timeline/:timeline_id/do_gc", |r| {
1717 371 : api_handler(r, timeline_gc_handler)
1718 560 : })
1719 560 : .put("/v1/tenant/:tenant_id/timeline/:timeline_id/compact", |r| {
1720 123 : testing_api_handler("run timeline compaction", r, timeline_compact_handler)
1721 560 : })
1722 560 : .put(
1723 560 : "/v1/tenant/:tenant_id/timeline/:timeline_id/checkpoint",
1724 577 : |r| testing_api_handler("run timeline checkpoint", r, timeline_checkpoint_handler),
1725 560 : )
1726 560 : .post(
1727 560 : "/v1/tenant/:tenant_id/timeline/:timeline_id/download_remote_layers",
1728 560 : |r| api_handler(r, timeline_download_remote_layers_handler_post),
1729 GBC 560 : )
1730 CBC 560 : .get(
1731 560 : "/v1/tenant/:tenant_id/timeline/:timeline_id/download_remote_layers",
1732 560 : |r| api_handler(r, timeline_download_remote_layers_handler_get),
1733 560 : )
1734 560 : .delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
1735 99 : api_handler(r, timeline_delete_handler)
1736 560 : })
1737 GIC 560 : .get("/v1/tenant/:tenant_id/timeline/:timeline_id/layer", |r| {
1738 85 : api_handler(r, layer_map_info_handler)
1739 560 : })
1740 560 : .get(
1741 560 : "/v1/tenant/:tenant_id/timeline/:timeline_id/layer/:layer_file_name",
1742 560 : |r| api_handler(r, layer_download_handler),
1743 560 : )
1744 560 : .delete(
1745 560 : "/v1/tenant/:tenant_id/timeline/:timeline_id/layer/:layer_file_name",
1746 560 : |r| api_handler(r, evict_timeline_layer_handler),
1747 560 : )
1748 560 : .put("/v1/disk_usage_eviction/run", |r| {
1749 5 : api_handler(r, disk_usage_eviction_run)
1750 560 : })
1751 560 : .put("/v1/deletion_queue/flush", |r| {
1752 19 : api_handler(r, deletion_queue_flush)
1753 560 : })
1754 560 : .put("/v1/tenant/:tenant_id/break", |r| {
1755 2 : testing_api_handler("set tenant state to broken", r, handle_tenant_break)
1756 560 : })
1757 560 : .get("/v1/panic", |r| api_handler(r, always_panic_handler))
1758 560 : .post("/v1/tracing/event", |r| {
1759 5 : testing_api_handler("emit a tracing event", r, post_tracing_event_handler)
1760 560 : })
1761 560 : .get("/v1/tenant/:tenant_id/timeline/:timeline_id/getpage", |r| {
1762 UIC 0 : testing_api_handler("getpage@lsn", r, getpage_at_lsn_handler)
1763 GIC 560 : })
1764 560 : .get(
1765 560 : "/v1/tenant/:tenant_id/timeline/:timeline_id/keyspace",
1766 560 : |r| testing_api_handler("read out the keyspace", r, timeline_collect_keyspace),
1767 560 : )
1768 560 : .any(handler_404))
1769 560 : }
|