Line data Source code
1 : //!
2 : //! Management HTTP API
3 : //!
4 : use std::cmp::Reverse;
5 : use std::collections::BinaryHeap;
6 : use std::collections::HashMap;
7 : use std::str::FromStr;
8 : use std::sync::Arc;
9 : use std::time::Duration;
10 :
11 : use anyhow::{anyhow, Context, Result};
12 : use enumset::EnumSet;
13 : use futures::StreamExt;
14 : use futures::TryFutureExt;
15 : use humantime::format_rfc3339;
16 : use hyper::header;
17 : use hyper::StatusCode;
18 : use hyper::{Body, Request, Response, Uri};
19 : use metrics::launch_timestamp::LaunchTimestamp;
20 : use pageserver_api::models::virtual_file::IoMode;
21 : use pageserver_api::models::DownloadRemoteLayersTaskSpawnRequest;
22 : use pageserver_api::models::IngestAuxFilesRequest;
23 : use pageserver_api::models::ListAuxFilesRequest;
24 : use pageserver_api::models::LocationConfig;
25 : use pageserver_api::models::LocationConfigListResponse;
26 : use pageserver_api::models::LocationConfigMode;
27 : use pageserver_api::models::LsnLease;
28 : use pageserver_api::models::LsnLeaseRequest;
29 : use pageserver_api::models::OffloadedTimelineInfo;
30 : use pageserver_api::models::ShardParameters;
31 : use pageserver_api::models::TenantDetails;
32 : use pageserver_api::models::TenantLocationConfigRequest;
33 : use pageserver_api::models::TenantLocationConfigResponse;
34 : use pageserver_api::models::TenantScanRemoteStorageResponse;
35 : use pageserver_api::models::TenantScanRemoteStorageShard;
36 : use pageserver_api::models::TenantShardLocation;
37 : use pageserver_api::models::TenantShardSplitRequest;
38 : use pageserver_api::models::TenantShardSplitResponse;
39 : use pageserver_api::models::TenantSorting;
40 : use pageserver_api::models::TimelineArchivalConfigRequest;
41 : use pageserver_api::models::TimelineCreateRequestMode;
42 : use pageserver_api::models::TimelinesInfoAndOffloaded;
43 : use pageserver_api::models::TopTenantShardItem;
44 : use pageserver_api::models::TopTenantShardsRequest;
45 : use pageserver_api::models::TopTenantShardsResponse;
46 : use pageserver_api::shard::ShardCount;
47 : use pageserver_api::shard::TenantShardId;
48 : use remote_storage::DownloadError;
49 : use remote_storage::GenericRemoteStorage;
50 : use remote_storage::TimeTravelError;
51 : use tenant_size_model::{svg::SvgBranchKind, SizeResult, StorageModel};
52 : use tokio_util::io::StreamReader;
53 : use tokio_util::sync::CancellationToken;
54 : use tracing::*;
55 : use utils::auth::JwtAuth;
56 : use utils::failpoint_support::failpoints_handler;
57 : use utils::http::endpoint::prometheus_metrics_handler;
58 : use utils::http::endpoint::request_span;
59 : use utils::http::request::must_parse_query_param;
60 : use utils::http::request::{get_request_param, must_get_query_param, parse_query_param};
61 :
62 : use crate::config::PageServerConf;
63 : use crate::context::{DownloadBehavior, RequestContext};
64 : use crate::deletion_queue::DeletionQueueClient;
65 : use crate::pgdatadir_mapping::LsnForTimestamp;
66 : use crate::task_mgr::TaskKind;
67 : use crate::tenant::config::{LocationConf, TenantConfOpt};
68 : use crate::tenant::mgr::GetActiveTenantError;
69 : use crate::tenant::mgr::{
70 : GetTenantError, TenantManager, TenantMapError, TenantMapInsertError, TenantSlotError,
71 : TenantSlotUpsertError, TenantStateError,
72 : };
73 : use crate::tenant::mgr::{TenantSlot, UpsertLocationError};
74 : use crate::tenant::remote_timeline_client;
75 : use crate::tenant::remote_timeline_client::download_index_part;
76 : use crate::tenant::remote_timeline_client::list_remote_tenant_shards;
77 : use crate::tenant::remote_timeline_client::list_remote_timelines;
78 : use crate::tenant::secondary::SecondaryController;
79 : use crate::tenant::size::ModelInputs;
80 : use crate::tenant::storage_layer::LayerAccessStatsReset;
81 : use crate::tenant::storage_layer::LayerName;
82 : use crate::tenant::timeline::offload::offload_timeline;
83 : use crate::tenant::timeline::CompactFlags;
84 : use crate::tenant::timeline::CompactionError;
85 : use crate::tenant::timeline::Timeline;
86 : use crate::tenant::GetTimelineError;
87 : use crate::tenant::OffloadedTimeline;
88 : use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError};
89 : use crate::DEFAULT_PG_VERSION;
90 : use crate::{disk_usage_eviction_task, tenant};
91 : use pageserver_api::models::{
92 : StatusResponse, TenantConfigRequest, TenantInfo, TimelineCreateRequest, TimelineGcRequest,
93 : TimelineInfo,
94 : };
95 : use utils::{
96 : auth::SwappableJwtAuth,
97 : generation::Generation,
98 : http::{
99 : endpoint::{self, attach_openapi_ui, auth_middleware, check_permission_with},
100 : error::{ApiError, HttpErrorBody},
101 : json::{json_request, json_response},
102 : request::parse_request_param,
103 : RequestExt, RouterBuilder,
104 : },
105 : id::{TenantId, TimelineId},
106 : lsn::Lsn,
107 : };
108 :
109 : // For APIs that require an Active tenant, how long should we block waiting for that state?
110 : // This is not functionally necessary (clients will retry), but avoids generating a lot of
111 : // failed API calls while tenants are activating.
112 : #[cfg(not(feature = "testing"))]
113 : pub(crate) const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(5000);
114 :
115 : // Tests run on slow/oversubscribed nodes, and may need to wait much longer for tenants to
116 : // finish attaching, if calls to remote storage are slow.
117 : #[cfg(feature = "testing")]
118 : pub(crate) const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
119 :
120 : pub struct State {
121 : conf: &'static PageServerConf,
122 : tenant_manager: Arc<TenantManager>,
123 : auth: Option<Arc<SwappableJwtAuth>>,
124 : allowlist_routes: Vec<Uri>,
125 : remote_storage: GenericRemoteStorage,
126 : broker_client: storage_broker::BrokerClientChannel,
127 : disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
128 : deletion_queue_client: DeletionQueueClient,
129 : secondary_controller: SecondaryController,
130 : latest_utilization: tokio::sync::Mutex<Option<(std::time::Instant, bytes::Bytes)>>,
131 : }
132 :
133 : impl State {
134 : #[allow(clippy::too_many_arguments)]
135 0 : pub fn new(
136 0 : conf: &'static PageServerConf,
137 0 : tenant_manager: Arc<TenantManager>,
138 0 : auth: Option<Arc<SwappableJwtAuth>>,
139 0 : remote_storage: GenericRemoteStorage,
140 0 : broker_client: storage_broker::BrokerClientChannel,
141 0 : disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
142 0 : deletion_queue_client: DeletionQueueClient,
143 0 : secondary_controller: SecondaryController,
144 0 : ) -> anyhow::Result<Self> {
145 0 : let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml", "/metrics"]
146 0 : .iter()
147 0 : .map(|v| v.parse().unwrap())
148 0 : .collect::<Vec<_>>();
149 0 : Ok(Self {
150 0 : conf,
151 0 : tenant_manager,
152 0 : auth,
153 0 : allowlist_routes,
154 0 : remote_storage,
155 0 : broker_client,
156 0 : disk_usage_eviction_state,
157 0 : deletion_queue_client,
158 0 : secondary_controller,
159 0 : latest_utilization: Default::default(),
160 0 : })
161 0 : }
162 : }
163 :
164 : #[inline(always)]
165 0 : fn get_state(request: &Request<Body>) -> &State {
166 0 : request
167 0 : .data::<Arc<State>>()
168 0 : .expect("unknown state type")
169 0 : .as_ref()
170 0 : }
171 :
172 : #[inline(always)]
173 0 : fn get_config(request: &Request<Body>) -> &'static PageServerConf {
174 0 : get_state(request).conf
175 0 : }
176 :
177 : /// Check that the requester is authorized to operate on given tenant
178 0 : fn check_permission(request: &Request<Body>, tenant_id: Option<TenantId>) -> Result<(), ApiError> {
179 0 : check_permission_with(request, |claims| {
180 0 : crate::auth::check_permission(claims, tenant_id)
181 0 : })
182 0 : }
183 :
184 : impl From<PageReconstructError> for ApiError {
185 0 : fn from(pre: PageReconstructError) -> ApiError {
186 0 : match pre {
187 0 : PageReconstructError::Other(other) => ApiError::InternalServerError(other),
188 0 : PageReconstructError::MissingKey(e) => ApiError::InternalServerError(e.into()),
189 0 : PageReconstructError::Cancelled => ApiError::Cancelled,
190 0 : PageReconstructError::AncestorLsnTimeout(e) => ApiError::Timeout(format!("{e}").into()),
191 0 : PageReconstructError::WalRedo(pre) => ApiError::InternalServerError(pre),
192 : }
193 0 : }
194 : }
195 :
196 : impl From<TenantMapInsertError> for ApiError {
197 0 : fn from(tmie: TenantMapInsertError) -> ApiError {
198 0 : match tmie {
199 0 : TenantMapInsertError::SlotError(e) => e.into(),
200 0 : TenantMapInsertError::SlotUpsertError(e) => e.into(),
201 0 : TenantMapInsertError::Other(e) => ApiError::InternalServerError(e),
202 : }
203 0 : }
204 : }
205 :
206 : impl From<TenantSlotError> for ApiError {
207 0 : fn from(e: TenantSlotError) -> ApiError {
208 : use TenantSlotError::*;
209 0 : match e {
210 0 : NotFound(tenant_id) => {
211 0 : ApiError::NotFound(anyhow::anyhow!("NotFound: tenant {tenant_id}").into())
212 : }
213 : InProgress => {
214 0 : ApiError::ResourceUnavailable("Tenant is being modified concurrently".into())
215 : }
216 0 : MapState(e) => e.into(),
217 : }
218 0 : }
219 : }
220 :
221 : impl From<TenantSlotUpsertError> for ApiError {
222 0 : fn from(e: TenantSlotUpsertError) -> ApiError {
223 : use TenantSlotUpsertError::*;
224 0 : match e {
225 0 : InternalError(e) => ApiError::InternalServerError(anyhow::anyhow!("{e}")),
226 0 : MapState(e) => e.into(),
227 0 : ShuttingDown(_) => ApiError::ShuttingDown,
228 : }
229 0 : }
230 : }
231 :
232 : impl From<UpsertLocationError> for ApiError {
233 0 : fn from(e: UpsertLocationError) -> ApiError {
234 : use UpsertLocationError::*;
235 0 : match e {
236 0 : BadRequest(e) => ApiError::BadRequest(e),
237 0 : Unavailable(_) => ApiError::ShuttingDown,
238 0 : e @ InProgress => ApiError::Conflict(format!("{e}")),
239 0 : Flush(e) | InternalError(e) => ApiError::InternalServerError(e),
240 : }
241 0 : }
242 : }
243 :
244 : impl From<TenantMapError> for ApiError {
245 0 : fn from(e: TenantMapError) -> ApiError {
246 : use TenantMapError::*;
247 0 : match e {
248 : StillInitializing | ShuttingDown => {
249 0 : ApiError::ResourceUnavailable(format!("{e}").into())
250 0 : }
251 0 : }
252 0 : }
253 : }
254 :
255 : impl From<TenantStateError> for ApiError {
256 0 : fn from(tse: TenantStateError) -> ApiError {
257 0 : match tse {
258 : TenantStateError::IsStopping(_) => {
259 0 : ApiError::ResourceUnavailable("Tenant is stopping".into())
260 : }
261 0 : TenantStateError::SlotError(e) => e.into(),
262 0 : TenantStateError::SlotUpsertError(e) => e.into(),
263 0 : TenantStateError::Other(e) => ApiError::InternalServerError(anyhow!(e)),
264 : }
265 0 : }
266 : }
267 :
268 : impl From<GetTenantError> for ApiError {
269 0 : fn from(tse: GetTenantError) -> ApiError {
270 0 : match tse {
271 0 : GetTenantError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid).into()),
272 : GetTenantError::NotActive(_) => {
273 : // Why is this not `ApiError::NotFound`?
274 : // Because we must be careful to never return 404 for a tenant if it does
275 : // in fact exist locally. If we did, the caller could draw the conclusion
276 : // that it can attach the tenant to another PS and we'd be in split-brain.
277 0 : ApiError::ResourceUnavailable("Tenant not yet active".into())
278 : }
279 0 : GetTenantError::MapState(e) => ApiError::ResourceUnavailable(format!("{e}").into()),
280 : }
281 0 : }
282 : }
283 :
284 : impl From<GetTimelineError> for ApiError {
285 0 : fn from(gte: GetTimelineError) -> Self {
286 0 : // Rationale: tenant is activated only after eligible timelines activate
287 0 : ApiError::NotFound(gte.into())
288 0 : }
289 : }
290 :
291 : impl From<GetActiveTenantError> for ApiError {
292 0 : fn from(e: GetActiveTenantError) -> ApiError {
293 0 : match e {
294 0 : GetActiveTenantError::Broken(reason) => {
295 0 : ApiError::InternalServerError(anyhow!("tenant is broken: {}", reason))
296 : }
297 0 : GetActiveTenantError::WillNotBecomeActive(_) => ApiError::Conflict(format!("{}", e)),
298 0 : GetActiveTenantError::Cancelled => ApiError::ShuttingDown,
299 0 : GetActiveTenantError::NotFound(gte) => gte.into(),
300 : GetActiveTenantError::WaitForActiveTimeout { .. } => {
301 0 : ApiError::ResourceUnavailable(format!("{}", e).into())
302 : }
303 : GetActiveTenantError::SwitchedTenant => {
304 : // in our HTTP handlers, this error doesn't happen
305 : // TODO: separate error types
306 0 : ApiError::ResourceUnavailable("switched tenant".into())
307 : }
308 : }
309 0 : }
310 : }
311 :
312 : impl From<crate::tenant::DeleteTimelineError> for ApiError {
313 0 : fn from(value: crate::tenant::DeleteTimelineError) -> Self {
314 : use crate::tenant::DeleteTimelineError::*;
315 0 : match value {
316 0 : NotFound => ApiError::NotFound(anyhow::anyhow!("timeline not found").into()),
317 0 : HasChildren(children) => ApiError::PreconditionFailed(
318 0 : format!("Cannot delete timeline which has child timelines: {children:?}")
319 0 : .into_boxed_str(),
320 0 : ),
321 0 : a @ AlreadyInProgress(_) => ApiError::Conflict(a.to_string()),
322 0 : Other(e) => ApiError::InternalServerError(e),
323 : }
324 0 : }
325 : }
326 :
327 : impl From<crate::tenant::TimelineArchivalError> for ApiError {
328 0 : fn from(value: crate::tenant::TimelineArchivalError) -> Self {
329 : use crate::tenant::TimelineArchivalError::*;
330 0 : match value {
331 0 : NotFound => ApiError::NotFound(anyhow::anyhow!("timeline not found").into()),
332 0 : Timeout => ApiError::Timeout("hit pageserver internal timeout".into()),
333 0 : Cancelled => ApiError::ShuttingDown,
334 0 : e @ HasArchivedParent(_) => {
335 0 : ApiError::PreconditionFailed(e.to_string().into_boxed_str())
336 : }
337 0 : HasUnarchivedChildren(children) => ApiError::PreconditionFailed(
338 0 : format!(
339 0 : "Cannot archive timeline which has non-archived child timelines: {children:?}"
340 0 : )
341 0 : .into_boxed_str(),
342 0 : ),
343 0 : a @ AlreadyInProgress => ApiError::Conflict(a.to_string()),
344 0 : Other(e) => ApiError::InternalServerError(e),
345 : }
346 0 : }
347 : }
348 :
349 : impl From<crate::tenant::mgr::DeleteTimelineError> for ApiError {
350 0 : fn from(value: crate::tenant::mgr::DeleteTimelineError) -> Self {
351 : use crate::tenant::mgr::DeleteTimelineError::*;
352 0 : match value {
353 : // Report Precondition failed so the client can distinguish the
354 : // "tenant is missing" case from the "timeline is missing" case
355 0 : Tenant(GetTenantError::NotFound(..)) => ApiError::PreconditionFailed(
356 0 : "Requested tenant is missing".to_owned().into_boxed_str(),
357 0 : ),
358 0 : Tenant(t) => ApiError::from(t),
359 0 : Timeline(t) => ApiError::from(t),
360 : }
361 0 : }
362 : }
363 :
364 : impl From<crate::tenant::mgr::DeleteTenantError> for ApiError {
365 0 : fn from(value: crate::tenant::mgr::DeleteTenantError) -> Self {
366 : use crate::tenant::mgr::DeleteTenantError::*;
367 0 : match value {
368 0 : SlotError(e) => e.into(),
369 0 : Other(o) => ApiError::InternalServerError(o),
370 0 : Cancelled => ApiError::ShuttingDown,
371 : }
372 0 : }
373 : }
374 :
375 : // Helper function to construct a TimelineInfo struct for a timeline
376 0 : async fn build_timeline_info(
377 0 : timeline: &Arc<Timeline>,
378 0 : include_non_incremental_logical_size: bool,
379 0 : force_await_initial_logical_size: bool,
380 0 : ctx: &RequestContext,
381 0 : ) -> anyhow::Result<TimelineInfo> {
382 0 : crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
383 0 :
384 0 : if force_await_initial_logical_size {
385 0 : timeline.clone().await_initial_logical_size().await
386 0 : }
387 :
388 0 : let mut info = build_timeline_info_common(
389 0 : timeline,
390 0 : ctx,
391 0 : tenant::timeline::GetLogicalSizePriority::Background,
392 0 : )
393 0 : .await?;
394 0 : if include_non_incremental_logical_size {
395 : // XXX we should be using spawn_ondemand_logical_size_calculation here.
396 : // Otherwise, if someone deletes the timeline / detaches the tenant while
397 : // we're executing this function, we will outlive the timeline on-disk state.
398 : info.current_logical_size_non_incremental = Some(
399 0 : timeline
400 0 : .get_current_logical_size_non_incremental(info.last_record_lsn, ctx)
401 0 : .await?,
402 : );
403 0 : }
404 0 : Ok(info)
405 0 : }
406 :
407 0 : async fn build_timeline_info_common(
408 0 : timeline: &Arc<Timeline>,
409 0 : ctx: &RequestContext,
410 0 : logical_size_task_priority: tenant::timeline::GetLogicalSizePriority,
411 0 : ) -> anyhow::Result<TimelineInfo> {
412 0 : crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
413 0 : let initdb_lsn = timeline.initdb_lsn;
414 0 : let last_record_lsn = timeline.get_last_record_lsn();
415 0 : let (wal_source_connstr, last_received_msg_lsn, last_received_msg_ts) = {
416 0 : let guard = timeline.last_received_wal.lock().unwrap();
417 0 : if let Some(info) = guard.as_ref() {
418 0 : (
419 0 : Some(format!("{}", info.wal_source_connconf)), // Password is hidden, but it's for statistics only.
420 0 : Some(info.last_received_msg_lsn),
421 0 : Some(info.last_received_msg_ts),
422 0 : )
423 : } else {
424 0 : (None, None, None)
425 : }
426 : };
427 :
428 0 : let ancestor_timeline_id = timeline.get_ancestor_timeline_id();
429 0 : let ancestor_lsn = match timeline.get_ancestor_lsn() {
430 0 : Lsn(0) => None,
431 0 : lsn @ Lsn(_) => Some(lsn),
432 : };
433 0 : let current_logical_size = timeline.get_current_logical_size(logical_size_task_priority, ctx);
434 0 : let current_physical_size = Some(timeline.layer_size_sum().await);
435 0 : let state = timeline.current_state();
436 0 : // Report is_archived = false if the timeline is still loading
437 0 : let is_archived = timeline.is_archived().unwrap_or(false);
438 0 : let remote_consistent_lsn_projected = timeline
439 0 : .get_remote_consistent_lsn_projected()
440 0 : .unwrap_or(Lsn(0));
441 0 : let remote_consistent_lsn_visible = timeline
442 0 : .get_remote_consistent_lsn_visible()
443 0 : .unwrap_or(Lsn(0));
444 0 :
445 0 : let walreceiver_status = timeline.walreceiver_status();
446 0 :
447 0 : let (pitr_history_size, within_ancestor_pitr) = timeline.get_pitr_history_stats();
448 :
449 0 : let info = TimelineInfo {
450 0 : tenant_id: timeline.tenant_shard_id,
451 0 : timeline_id: timeline.timeline_id,
452 0 : ancestor_timeline_id,
453 0 : ancestor_lsn,
454 0 : disk_consistent_lsn: timeline.get_disk_consistent_lsn(),
455 0 : remote_consistent_lsn: remote_consistent_lsn_projected,
456 0 : remote_consistent_lsn_visible,
457 0 : initdb_lsn,
458 0 : last_record_lsn,
459 0 : prev_record_lsn: Some(timeline.get_prev_record_lsn()),
460 0 : latest_gc_cutoff_lsn: *timeline.get_latest_gc_cutoff_lsn(),
461 0 : current_logical_size: current_logical_size.size_dont_care_about_accuracy(),
462 0 : current_logical_size_is_accurate: match current_logical_size.accuracy() {
463 0 : tenant::timeline::logical_size::Accuracy::Approximate => false,
464 0 : tenant::timeline::logical_size::Accuracy::Exact => true,
465 : },
466 0 : directory_entries_counts: timeline.get_directory_metrics().to_vec(),
467 0 : current_physical_size,
468 0 : current_logical_size_non_incremental: None,
469 0 : pitr_history_size,
470 0 : within_ancestor_pitr,
471 0 : timeline_dir_layer_file_size_sum: None,
472 0 : wal_source_connstr,
473 0 : last_received_msg_lsn,
474 0 : last_received_msg_ts,
475 0 : pg_version: timeline.pg_version,
476 0 :
477 0 : state,
478 0 : is_archived: Some(is_archived),
479 0 :
480 0 : walreceiver_status,
481 0 : };
482 0 : Ok(info)
483 0 : }
484 :
485 0 : fn build_timeline_offloaded_info(offloaded: &Arc<OffloadedTimeline>) -> OffloadedTimelineInfo {
486 0 : let &OffloadedTimeline {
487 0 : tenant_shard_id,
488 0 : timeline_id,
489 0 : ancestor_retain_lsn,
490 0 : ancestor_timeline_id,
491 0 : archived_at,
492 0 : ..
493 0 : } = offloaded.as_ref();
494 0 : OffloadedTimelineInfo {
495 0 : tenant_id: tenant_shard_id,
496 0 : timeline_id,
497 0 : ancestor_retain_lsn,
498 0 : ancestor_timeline_id,
499 0 : archived_at: archived_at.and_utc(),
500 0 : }
501 0 : }
502 :
503 : // healthcheck handler
504 0 : async fn status_handler(
505 0 : request: Request<Body>,
506 0 : _cancel: CancellationToken,
507 0 : ) -> Result<Response<Body>, ApiError> {
508 0 : check_permission(&request, None)?;
509 0 : let config = get_config(&request);
510 0 : json_response(StatusCode::OK, StatusResponse { id: config.id })
511 0 : }
512 :
513 0 : async fn reload_auth_validation_keys_handler(
514 0 : request: Request<Body>,
515 0 : _cancel: CancellationToken,
516 0 : ) -> Result<Response<Body>, ApiError> {
517 0 : check_permission(&request, None)?;
518 0 : let config = get_config(&request);
519 0 : let state = get_state(&request);
520 0 : let Some(shared_auth) = &state.auth else {
521 0 : return json_response(StatusCode::BAD_REQUEST, ());
522 : };
523 : // unwrap is ok because check is performed when creating config, so path is set and exists
524 0 : let key_path = config.auth_validation_public_key_path.as_ref().unwrap();
525 0 : info!("Reloading public key(s) for verifying JWT tokens from {key_path:?}");
526 :
527 0 : match JwtAuth::from_key_path(key_path) {
528 0 : Ok(new_auth) => {
529 0 : shared_auth.swap(new_auth);
530 0 : json_response(StatusCode::OK, ())
531 : }
532 0 : Err(e) => {
533 0 : let err_msg = "Error reloading public keys";
534 0 : warn!("Error reloading public keys from {key_path:?}: {e:}");
535 0 : json_response(
536 0 : StatusCode::INTERNAL_SERVER_ERROR,
537 0 : HttpErrorBody::from_msg(err_msg.to_string()),
538 0 : )
539 : }
540 : }
541 0 : }
542 :
543 0 : async fn timeline_create_handler(
544 0 : mut request: Request<Body>,
545 0 : _cancel: CancellationToken,
546 0 : ) -> Result<Response<Body>, ApiError> {
547 0 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
548 0 : let request_data: TimelineCreateRequest = json_request(&mut request).await?;
549 0 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
550 :
551 0 : let new_timeline_id = request_data.new_timeline_id;
552 : // fill in the default pg_version if not provided & convert request into domain model
553 0 : let params: tenant::CreateTimelineParams = match request_data.mode {
554 : TimelineCreateRequestMode::Bootstrap {
555 0 : existing_initdb_timeline_id,
556 0 : pg_version,
557 0 : } => tenant::CreateTimelineParams::Bootstrap(tenant::CreateTimelineParamsBootstrap {
558 0 : new_timeline_id,
559 0 : existing_initdb_timeline_id,
560 0 : pg_version: pg_version.unwrap_or(DEFAULT_PG_VERSION),
561 0 : }),
562 : TimelineCreateRequestMode::Branch {
563 0 : ancestor_timeline_id,
564 0 : ancestor_start_lsn,
565 0 : pg_version: _,
566 0 : } => tenant::CreateTimelineParams::Branch(tenant::CreateTimelineParamsBranch {
567 0 : new_timeline_id,
568 0 : ancestor_timeline_id,
569 0 : ancestor_start_lsn,
570 0 : }),
571 : };
572 :
573 0 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Error);
574 0 :
575 0 : let state = get_state(&request);
576 :
577 0 : async {
578 0 : let tenant = state
579 0 : .tenant_manager
580 0 : .get_attached_tenant_shard(tenant_shard_id)?;
581 :
582 0 : tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
583 :
584 : // earlier versions of the code had pg_version and ancestor_lsn in the span
585 : // => continue to provide that information, but through a log message that doesn't require us to destructure
586 0 : tracing::info!(?params, "creating timeline");
587 :
588 0 : match tenant
589 0 : .create_timeline(params, state.broker_client.clone(), &ctx)
590 0 : .await
591 : {
592 0 : Ok(new_timeline) => {
593 : // Created. Construct a TimelineInfo for it.
594 0 : let timeline_info = build_timeline_info_common(
595 0 : &new_timeline,
596 0 : &ctx,
597 0 : tenant::timeline::GetLogicalSizePriority::User,
598 0 : )
599 0 : .await
600 0 : .map_err(ApiError::InternalServerError)?;
601 0 : json_response(StatusCode::CREATED, timeline_info)
602 : }
603 0 : Err(_) if tenant.cancel.is_cancelled() => {
604 0 : // In case we get some ugly error type during shutdown, cast it into a clean 503.
605 0 : json_response(
606 0 : StatusCode::SERVICE_UNAVAILABLE,
607 0 : HttpErrorBody::from_msg("Tenant shutting down".to_string()),
608 0 : )
609 : }
610 0 : Err(e @ tenant::CreateTimelineError::Conflict) => {
611 0 : json_response(StatusCode::CONFLICT, HttpErrorBody::from_msg(e.to_string()))
612 : }
613 0 : Err(e @ tenant::CreateTimelineError::AlreadyCreating) => json_response(
614 0 : StatusCode::TOO_MANY_REQUESTS,
615 0 : HttpErrorBody::from_msg(e.to_string()),
616 0 : ),
617 0 : Err(tenant::CreateTimelineError::AncestorLsn(err)) => json_response(
618 0 : StatusCode::NOT_ACCEPTABLE,
619 0 : HttpErrorBody::from_msg(format!("{err:#}")),
620 0 : ),
621 0 : Err(e @ tenant::CreateTimelineError::AncestorNotActive) => json_response(
622 0 : StatusCode::SERVICE_UNAVAILABLE,
623 0 : HttpErrorBody::from_msg(e.to_string()),
624 0 : ),
625 0 : Err(e @ tenant::CreateTimelineError::AncestorArchived) => json_response(
626 0 : StatusCode::NOT_ACCEPTABLE,
627 0 : HttpErrorBody::from_msg(e.to_string()),
628 0 : ),
629 0 : Err(tenant::CreateTimelineError::ShuttingDown) => json_response(
630 0 : StatusCode::SERVICE_UNAVAILABLE,
631 0 : HttpErrorBody::from_msg("tenant shutting down".to_string()),
632 0 : ),
633 0 : Err(tenant::CreateTimelineError::Other(err)) => Err(ApiError::InternalServerError(err)),
634 : }
635 0 : }
636 0 : .instrument(info_span!("timeline_create",
637 : tenant_id = %tenant_shard_id.tenant_id,
638 0 : shard_id = %tenant_shard_id.shard_slug(),
639 : timeline_id = %new_timeline_id,
640 : ))
641 0 : .await
642 0 : }
643 :
644 0 : async fn timeline_list_handler(
645 0 : request: Request<Body>,
646 0 : _cancel: CancellationToken,
647 0 : ) -> Result<Response<Body>, ApiError> {
648 0 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
649 0 : let include_non_incremental_logical_size: Option<bool> =
650 0 : parse_query_param(&request, "include-non-incremental-logical-size")?;
651 0 : let force_await_initial_logical_size: Option<bool> =
652 0 : parse_query_param(&request, "force-await-initial-logical-size")?;
653 0 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
654 :
655 0 : let state = get_state(&request);
656 0 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
657 :
658 0 : let response_data = async {
659 0 : let tenant = state
660 0 : .tenant_manager
661 0 : .get_attached_tenant_shard(tenant_shard_id)?;
662 :
663 0 : tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
664 :
665 0 : let timelines = tenant.list_timelines();
666 0 :
667 0 : let mut response_data = Vec::with_capacity(timelines.len());
668 0 : for timeline in timelines {
669 0 : let timeline_info = build_timeline_info(
670 0 : &timeline,
671 0 : include_non_incremental_logical_size.unwrap_or(false),
672 0 : force_await_initial_logical_size.unwrap_or(false),
673 0 : &ctx,
674 0 : )
675 0 : .instrument(info_span!("build_timeline_info", timeline_id = %timeline.timeline_id))
676 0 : .await
677 0 : .context("Failed to build timeline info")
678 0 : .map_err(ApiError::InternalServerError)?;
679 :
680 0 : response_data.push(timeline_info);
681 : }
682 0 : Ok::<Vec<TimelineInfo>, ApiError>(response_data)
683 0 : }
684 0 : .instrument(info_span!("timeline_list",
685 : tenant_id = %tenant_shard_id.tenant_id,
686 0 : shard_id = %tenant_shard_id.shard_slug()))
687 0 : .await?;
688 :
689 0 : json_response(StatusCode::OK, response_data)
690 0 : }
691 :
692 0 : async fn timeline_and_offloaded_list_handler(
693 0 : request: Request<Body>,
694 0 : _cancel: CancellationToken,
695 0 : ) -> Result<Response<Body>, ApiError> {
696 0 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
697 0 : let include_non_incremental_logical_size: Option<bool> =
698 0 : parse_query_param(&request, "include-non-incremental-logical-size")?;
699 0 : let force_await_initial_logical_size: Option<bool> =
700 0 : parse_query_param(&request, "force-await-initial-logical-size")?;
701 0 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
702 :
703 0 : let state = get_state(&request);
704 0 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
705 :
706 0 : let response_data = async {
707 0 : let tenant = state
708 0 : .tenant_manager
709 0 : .get_attached_tenant_shard(tenant_shard_id)?;
710 :
711 0 : tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
712 :
713 0 : let (timelines, offloadeds) = tenant.list_timelines_and_offloaded();
714 0 :
715 0 : let mut timeline_infos = Vec::with_capacity(timelines.len());
716 0 : for timeline in timelines {
717 0 : let timeline_info = build_timeline_info(
718 0 : &timeline,
719 0 : include_non_incremental_logical_size.unwrap_or(false),
720 0 : force_await_initial_logical_size.unwrap_or(false),
721 0 : &ctx,
722 0 : )
723 0 : .instrument(info_span!("build_timeline_info", timeline_id = %timeline.timeline_id))
724 0 : .await
725 0 : .context("Failed to build timeline info")
726 0 : .map_err(ApiError::InternalServerError)?;
727 :
728 0 : timeline_infos.push(timeline_info);
729 : }
730 0 : let offloaded_infos = offloadeds
731 0 : .into_iter()
732 0 : .map(|offloaded| build_timeline_offloaded_info(&offloaded))
733 0 : .collect::<Vec<_>>();
734 0 : let res = TimelinesInfoAndOffloaded {
735 0 : timelines: timeline_infos,
736 0 : offloaded: offloaded_infos,
737 0 : };
738 0 : Ok::<TimelinesInfoAndOffloaded, ApiError>(res)
739 0 : }
740 0 : .instrument(info_span!("timeline_and_offloaded_list",
741 : tenant_id = %tenant_shard_id.tenant_id,
742 0 : shard_id = %tenant_shard_id.shard_slug()))
743 0 : .await?;
744 :
745 0 : json_response(StatusCode::OK, response_data)
746 0 : }
747 :
748 0 : async fn timeline_preserve_initdb_handler(
749 0 : request: Request<Body>,
750 0 : _cancel: CancellationToken,
751 0 : ) -> Result<Response<Body>, ApiError> {
752 0 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
753 0 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
754 0 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
755 0 : let state = get_state(&request);
756 :
757 : // Part of the process for disaster recovery from safekeeper-stored WAL:
758 : // If we don't recover into a new timeline but want to keep the timeline ID,
759 : // then the initdb archive is deleted. This endpoint copies it to a different
760 : // location where timeline recreation can find it.
761 :
762 0 : async {
763 0 : let tenant = state
764 0 : .tenant_manager
765 0 : .get_attached_tenant_shard(tenant_shard_id)?;
766 :
767 0 : let timeline = tenant.get_timeline(timeline_id, false)?;
768 :
769 0 : timeline
770 0 : .preserve_initdb_archive()
771 0 : .await
772 0 : .context("preserving initdb archive")
773 0 : .map_err(ApiError::InternalServerError)?;
774 :
775 0 : Ok::<_, ApiError>(())
776 0 : }
777 0 : .instrument(info_span!("timeline_preserve_initdb_archive",
778 : tenant_id = %tenant_shard_id.tenant_id,
779 0 : shard_id = %tenant_shard_id.shard_slug(),
780 : %timeline_id))
781 0 : .await?;
782 :
783 0 : json_response(StatusCode::OK, ())
784 0 : }
785 :
786 0 : async fn timeline_archival_config_handler(
787 0 : mut request: Request<Body>,
788 0 : _cancel: CancellationToken,
789 0 : ) -> Result<Response<Body>, ApiError> {
790 0 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
791 0 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
792 :
793 0 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
794 :
795 0 : let request_data: TimelineArchivalConfigRequest = json_request(&mut request).await?;
796 0 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
797 0 : let state = get_state(&request);
798 :
799 0 : async {
800 0 : let tenant = state
801 0 : .tenant_manager
802 0 : .get_attached_tenant_shard(tenant_shard_id)?;
803 :
804 0 : tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
805 :
806 0 : tenant
807 0 : .apply_timeline_archival_config(
808 0 : timeline_id,
809 0 : request_data.state,
810 0 : state.broker_client.clone(),
811 0 : ctx,
812 0 : )
813 0 : .await?;
814 0 : Ok::<_, ApiError>(())
815 0 : }
816 0 : .instrument(info_span!("timeline_archival_config",
817 : tenant_id = %tenant_shard_id.tenant_id,
818 0 : shard_id = %tenant_shard_id.shard_slug(),
819 : state = ?request_data.state,
820 : %timeline_id))
821 0 : .await?;
822 :
823 0 : json_response(StatusCode::OK, ())
824 0 : }
825 :
826 0 : async fn timeline_detail_handler(
827 0 : request: Request<Body>,
828 0 : _cancel: CancellationToken,
829 0 : ) -> Result<Response<Body>, ApiError> {
830 0 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
831 0 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
832 0 : let include_non_incremental_logical_size: Option<bool> =
833 0 : parse_query_param(&request, "include-non-incremental-logical-size")?;
834 0 : let force_await_initial_logical_size: Option<bool> =
835 0 : parse_query_param(&request, "force-await-initial-logical-size")?;
836 0 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
837 :
838 : // Logical size calculation needs downloading.
839 0 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
840 0 : let state = get_state(&request);
841 :
842 0 : let timeline_info = async {
843 0 : let tenant = state
844 0 : .tenant_manager
845 0 : .get_attached_tenant_shard(tenant_shard_id)?;
846 :
847 0 : tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
848 :
849 0 : let timeline = tenant.get_timeline(timeline_id, false)?;
850 :
851 0 : let timeline_info = build_timeline_info(
852 0 : &timeline,
853 0 : include_non_incremental_logical_size.unwrap_or(false),
854 0 : force_await_initial_logical_size.unwrap_or(false),
855 0 : &ctx,
856 0 : )
857 0 : .await
858 0 : .context("get local timeline info")
859 0 : .map_err(ApiError::InternalServerError)?;
860 :
861 0 : Ok::<_, ApiError>(timeline_info)
862 0 : }
863 0 : .instrument(info_span!("timeline_detail",
864 : tenant_id = %tenant_shard_id.tenant_id,
865 0 : shard_id = %tenant_shard_id.shard_slug(),
866 : %timeline_id))
867 0 : .await?;
868 :
869 0 : json_response(StatusCode::OK, timeline_info)
870 0 : }
871 :
872 0 : async fn get_lsn_by_timestamp_handler(
873 0 : request: Request<Body>,
874 0 : cancel: CancellationToken,
875 0 : ) -> Result<Response<Body>, ApiError> {
876 0 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
877 0 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
878 0 : let state = get_state(&request);
879 0 :
880 0 : if !tenant_shard_id.is_shard_zero() {
881 : // Requires SLRU contents, which are only stored on shard zero
882 0 : return Err(ApiError::BadRequest(anyhow!(
883 0 : "Size calculations are only available on shard zero"
884 0 : )));
885 0 : }
886 :
887 0 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
888 0 : let timestamp_raw = must_get_query_param(&request, "timestamp")?;
889 0 : let timestamp = humantime::parse_rfc3339(&timestamp_raw)
890 0 : .with_context(|| format!("Invalid time: {:?}", timestamp_raw))
891 0 : .map_err(ApiError::BadRequest)?;
892 0 : let timestamp_pg = postgres_ffi::to_pg_timestamp(timestamp);
893 :
894 0 : let with_lease = parse_query_param(&request, "with_lease")?.unwrap_or(false);
895 0 :
896 0 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
897 :
898 0 : let timeline =
899 0 : active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
900 0 : .await?;
901 0 : let result = timeline
902 0 : .find_lsn_for_timestamp(timestamp_pg, &cancel, &ctx)
903 0 : .await?;
904 :
905 : #[derive(serde::Serialize, Debug)]
906 : struct Result {
907 : lsn: Lsn,
908 : kind: &'static str,
909 : #[serde(default)]
910 : #[serde(skip_serializing_if = "Option::is_none")]
911 : #[serde(flatten)]
912 : lease: Option<LsnLease>,
913 : }
914 0 : let (lsn, kind) = match result {
915 0 : LsnForTimestamp::Present(lsn) => (lsn, "present"),
916 0 : LsnForTimestamp::Future(lsn) => (lsn, "future"),
917 0 : LsnForTimestamp::Past(lsn) => (lsn, "past"),
918 0 : LsnForTimestamp::NoData(lsn) => (lsn, "nodata"),
919 : };
920 :
921 0 : let lease = if with_lease {
922 0 : timeline
923 0 : .init_lsn_lease(lsn, timeline.get_lsn_lease_length_for_ts(), &ctx)
924 0 : .inspect_err(|_| {
925 0 : warn!("fail to grant a lease to {}", lsn);
926 0 : })
927 0 : .ok()
928 : } else {
929 0 : None
930 : };
931 :
932 0 : let result = Result { lsn, kind, lease };
933 0 : let valid_until = result
934 0 : .lease
935 0 : .as_ref()
936 0 : .map(|l| humantime::format_rfc3339_millis(l.valid_until).to_string());
937 0 : tracing::info!(
938 : lsn=?result.lsn,
939 : kind=%result.kind,
940 : timestamp=%timestamp_raw,
941 : valid_until=?valid_until,
942 0 : "lsn_by_timestamp finished"
943 : );
944 0 : json_response(StatusCode::OK, result)
945 0 : }
946 :
947 0 : async fn get_timestamp_of_lsn_handler(
948 0 : request: Request<Body>,
949 0 : _cancel: CancellationToken,
950 0 : ) -> Result<Response<Body>, ApiError> {
951 0 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
952 0 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
953 0 : let state = get_state(&request);
954 0 :
955 0 : if !tenant_shard_id.is_shard_zero() {
956 : // Requires SLRU contents, which are only stored on shard zero
957 0 : return Err(ApiError::BadRequest(anyhow!(
958 0 : "Size calculations are only available on shard zero"
959 0 : )));
960 0 : }
961 :
962 0 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
963 :
964 0 : let lsn_str = must_get_query_param(&request, "lsn")?;
965 0 : let lsn = Lsn::from_str(&lsn_str)
966 0 : .with_context(|| format!("Invalid LSN: {lsn_str:?}"))
967 0 : .map_err(ApiError::BadRequest)?;
968 :
969 0 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
970 0 : let timeline =
971 0 : active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
972 0 : .await?;
973 0 : let result = timeline.get_timestamp_for_lsn(lsn, &ctx).await?;
974 :
975 0 : match result {
976 0 : Some(time) => {
977 0 : let time = format_rfc3339(
978 0 : postgres_ffi::try_from_pg_timestamp(time).map_err(ApiError::InternalServerError)?,
979 : )
980 0 : .to_string();
981 0 : json_response(StatusCode::OK, time)
982 : }
983 0 : None => Err(ApiError::NotFound(
984 0 : anyhow::anyhow!("Timestamp for lsn {} not found", lsn).into(),
985 0 : )),
986 : }
987 0 : }
988 :
989 0 : async fn timeline_delete_handler(
990 0 : request: Request<Body>,
991 0 : _cancel: CancellationToken,
992 0 : ) -> Result<Response<Body>, ApiError> {
993 0 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
994 0 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
995 0 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
996 :
997 0 : let state = get_state(&request);
998 :
999 0 : let tenant = state
1000 0 : .tenant_manager
1001 0 : .get_attached_tenant_shard(tenant_shard_id)
1002 0 : .map_err(|e| {
1003 0 : match e {
1004 : // GetTenantError has a built-in conversion to ApiError, but in this context we don't
1005 : // want to treat missing tenants as 404, to avoid ambiguity with successful deletions.
1006 0 : GetTenantError::NotFound(_) => ApiError::PreconditionFailed(
1007 0 : "Requested tenant is missing".to_string().into_boxed_str(),
1008 0 : ),
1009 0 : e => e.into(),
1010 : }
1011 0 : })?;
1012 0 : tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
1013 0 : tenant.delete_timeline(timeline_id).instrument(info_span!("timeline_delete", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id))
1014 0 : .await?;
1015 :
1016 0 : json_response(StatusCode::ACCEPTED, ())
1017 0 : }
1018 :
1019 0 : async fn tenant_reset_handler(
1020 0 : request: Request<Body>,
1021 0 : _cancel: CancellationToken,
1022 0 : ) -> Result<Response<Body>, ApiError> {
1023 0 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1024 0 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1025 :
1026 0 : let drop_cache: Option<bool> = parse_query_param(&request, "drop_cache")?;
1027 :
1028 0 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
1029 0 : let state = get_state(&request);
1030 0 : state
1031 0 : .tenant_manager
1032 0 : .reset_tenant(tenant_shard_id, drop_cache.unwrap_or(false), &ctx)
1033 0 : .await
1034 0 : .map_err(ApiError::InternalServerError)?;
1035 :
1036 0 : json_response(StatusCode::OK, ())
1037 0 : }
1038 :
1039 0 : async fn tenant_list_handler(
1040 0 : request: Request<Body>,
1041 0 : _cancel: CancellationToken,
1042 0 : ) -> Result<Response<Body>, ApiError> {
1043 0 : check_permission(&request, None)?;
1044 0 : let state = get_state(&request);
1045 :
1046 0 : let response_data = state
1047 0 : .tenant_manager
1048 0 : .list_tenants()
1049 0 : .map_err(|_| {
1050 0 : ApiError::ResourceUnavailable("Tenant map is initializing or shutting down".into())
1051 0 : })?
1052 0 : .iter()
1053 0 : .map(|(id, state, gen)| TenantInfo {
1054 0 : id: *id,
1055 0 : state: state.clone(),
1056 0 : current_physical_size: None,
1057 0 : attachment_status: state.attachment_status(),
1058 0 : generation: (*gen)
1059 0 : .into()
1060 0 : .expect("Tenants are always attached with a generation"),
1061 0 : gc_blocking: None,
1062 0 : })
1063 0 : .collect::<Vec<TenantInfo>>();
1064 0 :
1065 0 : json_response(StatusCode::OK, response_data)
1066 0 : }
1067 :
1068 0 : async fn tenant_status(
1069 0 : request: Request<Body>,
1070 0 : _cancel: CancellationToken,
1071 0 : ) -> Result<Response<Body>, ApiError> {
1072 0 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1073 0 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1074 0 : let state = get_state(&request);
1075 0 :
1076 0 : // In tests, sometimes we want to query the state of a tenant without auto-activating it if it's currently waiting.
1077 0 : let activate = true;
1078 : #[cfg(feature = "testing")]
1079 0 : let activate = parse_query_param(&request, "activate")?.unwrap_or(activate);
1080 :
1081 0 : let tenant_info = async {
1082 0 : let tenant = state
1083 0 : .tenant_manager
1084 0 : .get_attached_tenant_shard(tenant_shard_id)?;
1085 :
1086 0 : if activate {
1087 : // This is advisory: we prefer to let the tenant activate on-demand when this function is
1088 : // called, but it is still valid to return 200 and describe the current state of the tenant
1089 : // if it doesn't make it into an active state.
1090 0 : tenant
1091 0 : .wait_to_become_active(ACTIVE_TENANT_TIMEOUT)
1092 0 : .await
1093 0 : .ok();
1094 0 : }
1095 :
1096 : // Calculate total physical size of all timelines
1097 0 : let mut current_physical_size = 0;
1098 0 : for timeline in tenant.list_timelines().iter() {
1099 0 : current_physical_size += timeline.layer_size_sum().await;
1100 : }
1101 :
1102 0 : let state = tenant.current_state();
1103 0 : Result::<_, ApiError>::Ok(TenantDetails {
1104 0 : tenant_info: TenantInfo {
1105 0 : id: tenant_shard_id,
1106 0 : state: state.clone(),
1107 0 : current_physical_size: Some(current_physical_size),
1108 0 : attachment_status: state.attachment_status(),
1109 0 : generation: tenant
1110 0 : .generation()
1111 0 : .into()
1112 0 : .expect("Tenants are always attached with a generation"),
1113 0 : gc_blocking: tenant.gc_block.summary().map(|x| format!("{x:?}")),
1114 0 : },
1115 0 : walredo: tenant.wal_redo_manager_status(),
1116 0 : timelines: tenant.list_timeline_ids(),
1117 0 : })
1118 0 : }
1119 0 : .instrument(info_span!("tenant_status_handler",
1120 : tenant_id = %tenant_shard_id.tenant_id,
1121 0 : shard_id = %tenant_shard_id.shard_slug()))
1122 0 : .await?;
1123 :
1124 0 : json_response(StatusCode::OK, tenant_info)
1125 0 : }
1126 :
1127 0 : async fn tenant_delete_handler(
1128 0 : request: Request<Body>,
1129 0 : _cancel: CancellationToken,
1130 0 : ) -> Result<Response<Body>, ApiError> {
1131 : // TODO openapi spec
1132 0 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1133 0 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1134 :
1135 0 : let state = get_state(&request);
1136 0 :
1137 0 : state
1138 0 : .tenant_manager
1139 0 : .delete_tenant(tenant_shard_id)
1140 0 : .instrument(info_span!("tenant_delete_handler",
1141 : tenant_id = %tenant_shard_id.tenant_id,
1142 0 : shard_id = %tenant_shard_id.shard_slug()
1143 : ))
1144 0 : .await?;
1145 :
1146 0 : json_response(StatusCode::OK, ())
1147 0 : }
1148 :
1149 : /// HTTP endpoint to query the current tenant_size of a tenant.
1150 : ///
1151 : /// This is not used by consumption metrics under [`crate::consumption_metrics`], but can be used
1152 : /// to debug any of the calculations. Requires `tenant_id` request parameter, supports
1153 : /// `inputs_only=true|false` (default false) which supports debugging failure to calculate model
1154 : /// values.
1155 : ///
1156 : /// 'retention_period' query parameter overrides the cutoff that is used to calculate the size
1157 : /// (only if it is shorter than the real cutoff).
1158 : ///
1159 : /// Note: we don't update the cached size and prometheus metric here.
1160 : /// The retention period might be different, and it's nice to have a method to just calculate it
1161 : /// without modifying anything anyway.
1162 0 : async fn tenant_size_handler(
1163 0 : request: Request<Body>,
1164 0 : cancel: CancellationToken,
1165 0 : ) -> Result<Response<Body>, ApiError> {
1166 0 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1167 0 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1168 0 : let inputs_only: Option<bool> = parse_query_param(&request, "inputs_only")?;
1169 0 : let retention_period: Option<u64> = parse_query_param(&request, "retention_period")?;
1170 0 : let headers = request.headers();
1171 0 : let state = get_state(&request);
1172 0 :
1173 0 : if !tenant_shard_id.is_shard_zero() {
1174 0 : return Err(ApiError::BadRequest(anyhow!(
1175 0 : "Size calculations are only available on shard zero"
1176 0 : )));
1177 0 : }
1178 0 :
1179 0 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
1180 0 : let tenant = state
1181 0 : .tenant_manager
1182 0 : .get_attached_tenant_shard(tenant_shard_id)?;
1183 0 : tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
1184 :
1185 : // this can be long operation
1186 0 : let inputs = tenant
1187 0 : .gather_size_inputs(
1188 0 : retention_period,
1189 0 : LogicalSizeCalculationCause::TenantSizeHandler,
1190 0 : &cancel,
1191 0 : &ctx,
1192 0 : )
1193 0 : .await
1194 0 : .map_err(|e| match e {
1195 0 : crate::tenant::size::CalculateSyntheticSizeError::Cancelled => ApiError::ShuttingDown,
1196 0 : other => ApiError::InternalServerError(anyhow::anyhow!(other)),
1197 0 : })?;
1198 :
1199 0 : let mut sizes = None;
1200 0 : let accepts_html = headers
1201 0 : .get(header::ACCEPT)
1202 0 : .map(|v| v == "text/html")
1203 0 : .unwrap_or_default();
1204 0 : if !inputs_only.unwrap_or(false) {
1205 0 : let storage_model = inputs.calculate_model();
1206 0 : let size = storage_model.calculate();
1207 0 :
1208 0 : // If request header expects html, return html
1209 0 : if accepts_html {
1210 0 : return synthetic_size_html_response(inputs, storage_model, size);
1211 0 : }
1212 0 : sizes = Some(size);
1213 0 : } else if accepts_html {
1214 0 : return Err(ApiError::BadRequest(anyhow!(
1215 0 : "inputs_only parameter is incompatible with html output request"
1216 0 : )));
1217 0 : }
1218 :
1219 : /// The type resides in the pageserver so as not to expose `ModelInputs`.
1220 : #[derive(serde::Serialize)]
1221 : struct TenantHistorySize {
1222 : id: TenantId,
1223 : /// Size is a mixture of WAL and logical size, so the unit is bytes.
1224 : ///
1225 : /// Will be none if `?inputs_only=true` was given.
1226 : size: Option<u64>,
1227 : /// Size of each segment used in the model.
1228 : /// Will be null if `?inputs_only=true` was given.
1229 : segment_sizes: Option<Vec<tenant_size_model::SegmentSizeResult>>,
1230 : inputs: crate::tenant::size::ModelInputs,
1231 : }
1232 :
1233 0 : json_response(
1234 0 : StatusCode::OK,
1235 0 : TenantHistorySize {
1236 0 : id: tenant_shard_id.tenant_id,
1237 0 : size: sizes.as_ref().map(|x| x.total_size),
1238 0 : segment_sizes: sizes.map(|x| x.segments),
1239 0 : inputs,
1240 0 : },
1241 0 : )
1242 0 : }
1243 :
1244 0 : async fn tenant_shard_split_handler(
1245 0 : mut request: Request<Body>,
1246 0 : _cancel: CancellationToken,
1247 0 : ) -> Result<Response<Body>, ApiError> {
1248 0 : let req: TenantShardSplitRequest = json_request(&mut request).await?;
1249 :
1250 0 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1251 0 : let state = get_state(&request);
1252 0 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
1253 :
1254 0 : let tenant = state
1255 0 : .tenant_manager
1256 0 : .get_attached_tenant_shard(tenant_shard_id)?;
1257 0 : tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
1258 :
1259 0 : let new_shards = state
1260 0 : .tenant_manager
1261 0 : .shard_split(
1262 0 : tenant,
1263 0 : ShardCount::new(req.new_shard_count),
1264 0 : req.new_stripe_size,
1265 0 : &ctx,
1266 0 : )
1267 0 : .await
1268 0 : .map_err(ApiError::InternalServerError)?;
1269 :
1270 0 : json_response(StatusCode::OK, TenantShardSplitResponse { new_shards })
1271 0 : }
1272 :
1273 0 : async fn layer_map_info_handler(
1274 0 : request: Request<Body>,
1275 0 : _cancel: CancellationToken,
1276 0 : ) -> Result<Response<Body>, ApiError> {
1277 0 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1278 0 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1279 0 : let reset: LayerAccessStatsReset =
1280 0 : parse_query_param(&request, "reset")?.unwrap_or(LayerAccessStatsReset::NoReset);
1281 0 : let state = get_state(&request);
1282 0 :
1283 0 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1284 :
1285 0 : let timeline =
1286 0 : active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
1287 0 : .await?;
1288 0 : let layer_map_info = timeline
1289 0 : .layer_map_info(reset)
1290 0 : .await
1291 0 : .map_err(|_shutdown| ApiError::ShuttingDown)?;
1292 :
1293 0 : json_response(StatusCode::OK, layer_map_info)
1294 0 : }
1295 :
1296 0 : async fn layer_download_handler(
1297 0 : request: Request<Body>,
1298 0 : _cancel: CancellationToken,
1299 0 : ) -> Result<Response<Body>, ApiError> {
1300 0 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1301 0 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1302 0 : let layer_file_name = get_request_param(&request, "layer_file_name")?;
1303 0 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1304 0 : let layer_name = LayerName::from_str(layer_file_name)
1305 0 : .map_err(|s| ApiError::BadRequest(anyhow::anyhow!(s)))?;
1306 0 : let state = get_state(&request);
1307 :
1308 0 : let timeline =
1309 0 : active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
1310 0 : .await?;
1311 0 : let downloaded = timeline
1312 0 : .download_layer(&layer_name)
1313 0 : .await
1314 0 : .map_err(ApiError::InternalServerError)?;
1315 :
1316 0 : match downloaded {
1317 0 : Some(true) => json_response(StatusCode::OK, ()),
1318 0 : Some(false) => json_response(StatusCode::NOT_MODIFIED, ()),
1319 0 : None => json_response(
1320 0 : StatusCode::BAD_REQUEST,
1321 0 : format!("Layer {tenant_shard_id}/{timeline_id}/{layer_file_name} not found"),
1322 0 : ),
1323 : }
1324 0 : }
1325 :
1326 0 : async fn evict_timeline_layer_handler(
1327 0 : request: Request<Body>,
1328 0 : _cancel: CancellationToken,
1329 0 : ) -> Result<Response<Body>, ApiError> {
1330 0 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1331 0 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1332 0 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1333 0 : let layer_file_name = get_request_param(&request, "layer_file_name")?;
1334 0 : let state = get_state(&request);
1335 :
1336 0 : let layer_name = LayerName::from_str(layer_file_name)
1337 0 : .map_err(|s| ApiError::BadRequest(anyhow::anyhow!(s)))?;
1338 :
1339 0 : let timeline =
1340 0 : active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
1341 0 : .await?;
1342 0 : let evicted = timeline
1343 0 : .evict_layer(&layer_name)
1344 0 : .await
1345 0 : .map_err(ApiError::InternalServerError)?;
1346 :
1347 0 : match evicted {
1348 0 : Some(true) => json_response(StatusCode::OK, ()),
1349 0 : Some(false) => json_response(StatusCode::NOT_MODIFIED, ()),
1350 0 : None => json_response(
1351 0 : StatusCode::BAD_REQUEST,
1352 0 : format!("Layer {tenant_shard_id}/{timeline_id}/{layer_file_name} not found"),
1353 0 : ),
1354 : }
1355 0 : }
1356 :
1357 0 : async fn timeline_gc_blocking_handler(
1358 0 : request: Request<Body>,
1359 0 : _cancel: CancellationToken,
1360 0 : ) -> Result<Response<Body>, ApiError> {
1361 0 : block_or_unblock_gc(request, true).await
1362 0 : }
1363 :
1364 0 : async fn timeline_gc_unblocking_handler(
1365 0 : request: Request<Body>,
1366 0 : _cancel: CancellationToken,
1367 0 : ) -> Result<Response<Body>, ApiError> {
1368 0 : block_or_unblock_gc(request, false).await
1369 0 : }
1370 :
1371 : /// Adding a block is `POST ../block_gc`, removing a block is `POST ../unblock_gc`.
1372 : ///
1373 : /// Both are technically unsafe because they might fire off index uploads, thus they are POST.
1374 0 : async fn block_or_unblock_gc(
1375 0 : request: Request<Body>,
1376 0 : block: bool,
1377 0 : ) -> Result<Response<Body>, ApiError> {
1378 : use crate::tenant::{
1379 : remote_timeline_client::WaitCompletionError, upload_queue::NotInitialized,
1380 : };
1381 0 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1382 0 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1383 0 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1384 0 : let state = get_state(&request);
1385 :
1386 0 : let tenant = state
1387 0 : .tenant_manager
1388 0 : .get_attached_tenant_shard(tenant_shard_id)?;
1389 :
1390 0 : tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
1391 :
1392 0 : let timeline = tenant.get_timeline(timeline_id, true)?;
1393 :
1394 0 : let fut = async {
1395 0 : if block {
1396 0 : timeline.block_gc(&tenant).await.map(|_| ())
1397 : } else {
1398 0 : timeline.unblock_gc(&tenant).await
1399 : }
1400 0 : };
1401 :
1402 0 : let span = tracing::info_span!(
1403 : "block_or_unblock_gc",
1404 : tenant_id = %tenant_shard_id.tenant_id,
1405 0 : shard_id = %tenant_shard_id.shard_slug(),
1406 : timeline_id = %timeline_id,
1407 : block = block,
1408 : );
1409 :
1410 0 : let res = fut.instrument(span).await;
1411 :
1412 0 : res.map_err(|e| {
1413 0 : if e.is::<NotInitialized>() || e.is::<WaitCompletionError>() {
1414 0 : ApiError::ShuttingDown
1415 : } else {
1416 0 : ApiError::InternalServerError(e)
1417 : }
1418 0 : })?;
1419 :
1420 0 : json_response(StatusCode::OK, ())
1421 0 : }
1422 :
1423 : /// Get tenant_size SVG graph along with the JSON data.
1424 0 : fn synthetic_size_html_response(
1425 0 : inputs: ModelInputs,
1426 0 : storage_model: StorageModel,
1427 0 : sizes: SizeResult,
1428 0 : ) -> Result<Response<Body>, ApiError> {
1429 0 : let mut timeline_ids: Vec<String> = Vec::new();
1430 0 : let mut timeline_map: HashMap<TimelineId, usize> = HashMap::new();
1431 0 : for (index, ti) in inputs.timeline_inputs.iter().enumerate() {
1432 0 : timeline_map.insert(ti.timeline_id, index);
1433 0 : timeline_ids.push(ti.timeline_id.to_string());
1434 0 : }
1435 0 : let seg_to_branch: Vec<(usize, SvgBranchKind)> = inputs
1436 0 : .segments
1437 0 : .iter()
1438 0 : .map(|seg| {
1439 0 : (
1440 0 : *timeline_map.get(&seg.timeline_id).unwrap(),
1441 0 : seg.kind.into(),
1442 0 : )
1443 0 : })
1444 0 : .collect();
1445 :
1446 0 : let svg =
1447 0 : tenant_size_model::svg::draw_svg(&storage_model, &timeline_ids, &seg_to_branch, &sizes)
1448 0 : .map_err(ApiError::InternalServerError)?;
1449 :
1450 0 : let mut response = String::new();
1451 :
1452 : use std::fmt::Write;
1453 0 : write!(response, "<html>\n<body>\n").unwrap();
1454 0 : write!(response, "<div>\n{svg}\n</div>").unwrap();
1455 0 : writeln!(response, "Project size: {}", sizes.total_size).unwrap();
1456 0 : writeln!(response, "<pre>").unwrap();
1457 0 : writeln!(
1458 0 : response,
1459 0 : "{}",
1460 0 : serde_json::to_string_pretty(&inputs).unwrap()
1461 0 : )
1462 0 : .unwrap();
1463 0 : writeln!(
1464 0 : response,
1465 0 : "{}",
1466 0 : serde_json::to_string_pretty(&sizes.segments).unwrap()
1467 0 : )
1468 0 : .unwrap();
1469 0 : writeln!(response, "</pre>").unwrap();
1470 0 : write!(response, "</body>\n</html>\n").unwrap();
1471 0 :
1472 0 : html_response(StatusCode::OK, response)
1473 0 : }
1474 :
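 : /// Build an HTTP response with the given status code and a `text/html` body.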
1475 0 : pub fn html_response(status: StatusCode, data: String) -> Result<Response<Body>, ApiError> {
1476 0 : let response = Response::builder()
1477 0 : .status(status)
1478 0 : .header(header::CONTENT_TYPE, "text/html")
1479 0 : .body(Body::from(data.as_bytes().to_vec()))
1480 0 : .map_err(|e| ApiError::InternalServerError(e.into()))?;
1481 0 : Ok(response)
1482 0 : }
1483 :
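 : /// Return the tenant's configuration as JSON: both the tenant-specific overrides and the
 : /// resulting effective config.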
1484 0 : async fn get_tenant_config_handler(
1485 0 : request: Request<Body>,
1486 0 : _cancel: CancellationToken,
1487 0 : ) -> Result<Response<Body>, ApiError> {
1488 0 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1489 0 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1490 0 : let state = get_state(&request);
1491 :
1492 0 : let tenant = state
1493 0 : .tenant_manager
1494 0 : .get_attached_tenant_shard(tenant_shard_id)?;
1495 :
1496 0 : let response = HashMap::from([
1497 : (
1498 : "tenant_specific_overrides",
1499 0 : serde_json::to_value(tenant.tenant_specific_overrides())
1500 0 : .context("serializing tenant specific overrides")
1501 0 : .map_err(ApiError::InternalServerError)?,
1502 : ),
1503 : (
1504 0 : "effective_config",
1505 0 : serde_json::to_value(tenant.effective_config())
1506 0 : .context("serializing effective config")
1507 0 : .map_err(ApiError::InternalServerError)?,
1508 : ),
1509 : ]);
1510 :
1511 0 : json_response(StatusCode::OK, response)
1512 0 : }
1513 :
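 : /// Legacy API: replace the tenant-specific config of an (unsharded) attached tenant,
 : /// persisting it to disk and applying it in memory.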
1514 0 : async fn update_tenant_config_handler(
1515 0 : mut request: Request<Body>,
1516 0 : _cancel: CancellationToken,
1517 0 : ) -> Result<Response<Body>, ApiError> {
1518 0 : let request_data: TenantConfigRequest = json_request(&mut request).await?;
1519 0 : let tenant_id = request_data.tenant_id;
1520 0 : check_permission(&request, Some(tenant_id))?;
1521 :
1522 0 : let new_tenant_conf =
1523 0 : TenantConfOpt::try_from(&request_data.config).map_err(ApiError::BadRequest)?;
1524 :
1525 0 : let state = get_state(&request);
1526 0 :
1527 0 : let tenant_shard_id = TenantShardId::unsharded(tenant_id);
1528 :
1529 0 : let tenant = state
1530 0 : .tenant_manager
1531 0 : .get_attached_tenant_shard(tenant_shard_id)?;
1532 0 : tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
1533 :
1534 : // This is a legacy API that only operates on attached tenants: the preferred
1535 : // API to use is the location_config/ endpoint, which lets the caller provide
1536 : // the full LocationConf.
1537 0 : let location_conf = LocationConf::attached_single(
1538 0 : new_tenant_conf.clone(),
1539 0 : tenant.get_generation(),
1540 0 : &ShardParameters::default(),
1541 0 : );
1542 0 :
1543 0 : crate::tenant::Tenant::persist_tenant_config(state.conf, &tenant_shard_id, &location_conf)
1544 0 : .await
1545 0 : .map_err(|e| ApiError::InternalServerError(anyhow::anyhow!(e)))?;
1546 0 : tenant.set_new_tenant_config(new_tenant_conf);
1547 0 :
1548 0 : json_response(StatusCode::OK, ())
1549 0 : }
1550 :
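 : /// Apply a location config to a tenant shard. `Detached` mode removes the tenant from this
 : /// pageserver; all other modes upsert the location and report where the shard is attached.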
1551 0 : async fn put_tenant_location_config_handler(
1552 0 : mut request: Request<Body>,
1553 0 : _cancel: CancellationToken,
1554 0 : ) -> Result<Response<Body>, ApiError> {
1555 0 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1556 :
1557 0 : let request_data: TenantLocationConfigRequest = json_request(&mut request).await?;
1558 0 : let flush = parse_query_param(&request, "flush_ms")?.map(Duration::from_millis);
1559 0 : let lazy = parse_query_param(&request, "lazy")?.unwrap_or(false);
1560 0 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1561 :
1562 0 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
1563 0 : let state = get_state(&request);
1564 0 : let conf = state.conf;
1565 0 :
1566 0 : // The `Detached` state is special, it doesn't upsert a tenant, it removes
1567 0 : // its local disk content and drops it from memory.
1568 0 : if let LocationConfigMode::Detached = request_data.config.mode {
1569 0 : if let Err(e) = state
1570 0 : .tenant_manager
1571 0 : .detach_tenant(conf, tenant_shard_id, &state.deletion_queue_client)
1572 0 : .instrument(info_span!("tenant_detach",
1573 : tenant_id = %tenant_shard_id.tenant_id,
1574 0 : shard_id = %tenant_shard_id.shard_slug()
1575 : ))
1576 0 : .await
1577 : {
1578 0 : match e {
1579 0 : TenantStateError::SlotError(TenantSlotError::NotFound(_)) => {
1580 0 : // This API is idempotent: a NotFound on a detach is fine.
1581 0 : }
1582 0 : _ => return Err(e.into()),
1583 : }
1584 0 : }
1585 0 : return json_response(StatusCode::OK, ());
1586 0 : }
1587 :
1588 0 : let location_conf =
1589 0 : LocationConf::try_from(&request_data.config).map_err(ApiError::BadRequest)?;
1590 :
1591 : // lazy==true queues the tenant up for activation, or it jumps the queue as usual when a
1592 : // compute connects, similar to the activation ordering used at startup.
1593 0 : let spawn_mode = if lazy {
1594 0 : tenant::SpawnMode::Lazy
1595 : } else {
1596 0 : tenant::SpawnMode::Eager
1597 : };
1598 :
1599 0 : let tenant = state
1600 0 : .tenant_manager
1601 0 : .upsert_location(tenant_shard_id, location_conf, flush, spawn_mode, &ctx)
1602 0 : .await?;
1603 0 : let stripe_size = tenant.as_ref().map(|t| t.get_shard_stripe_size());
1604 0 : let attached = tenant.is_some();
1605 :
1606 0 : if let Some(_flush_ms) = flush {
1607 0 : match state
1608 0 : .secondary_controller
1609 0 : .upload_tenant(tenant_shard_id)
1610 0 : .await
1611 : {
1612 : Ok(()) => {
1613 0 : tracing::info!("Uploaded heatmap during flush");
1614 : }
1615 0 : Err(e) => {
1616 0 : tracing::warn!("Failed to flush heatmap: {e}");
1617 : }
1618 : }
1619 : } else {
1620 0 : tracing::info!("No flush requested when configuring");
1621 : }
1622 :
1623 : // This API returns a vector of pageservers where the tenant is attached: this is
1624 : // primarily for use in the sharding service. For compatibilty, we also return this
1625 : // when called directly on a pageserver, but the payload is always zero or one shards.
1626 0 : let mut response = TenantLocationConfigResponse {
1627 0 : shards: Vec::new(),
1628 0 : stripe_size: None,
1629 0 : };
1630 0 : if attached {
1631 0 : response.shards.push(TenantShardLocation {
1632 0 : shard_id: tenant_shard_id,
1633 0 : node_id: state.conf.id,
1634 0 : });
1635 0 : if tenant_shard_id.shard_count.count() > 1 {
1636 : // Stripe size should be set if we are attached
1637 0 : debug_assert!(stripe_size.is_some());
1638 0 : response.stripe_size = stripe_size;
1639 0 : }
1640 0 : }
1641 :
1642 0 : json_response(StatusCode::OK, response)
1643 0 : }
1644 :
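 : /// List the location config of every tenant shard slot on this pageserver (`None` for slots
 : /// that are currently in progress).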
1645 0 : async fn list_location_config_handler(
1646 0 : request: Request<Body>,
1647 0 : _cancel: CancellationToken,
1648 0 : ) -> Result<Response<Body>, ApiError> {
1649 0 : let state = get_state(&request);
1650 0 : let slots = state.tenant_manager.list();
1651 0 : let result = LocationConfigListResponse {
1652 0 : tenant_shards: slots
1653 0 : .into_iter()
1654 0 : .map(|(tenant_shard_id, slot)| {
1655 0 : let v = match slot {
1656 0 : TenantSlot::Attached(t) => Some(t.get_location_conf()),
1657 0 : TenantSlot::Secondary(s) => Some(s.get_location_conf()),
1658 0 : TenantSlot::InProgress(_) => None,
1659 : };
1660 0 : (tenant_shard_id, v)
1661 0 : })
1662 0 : .collect(),
1663 0 : };
1664 0 : json_response(StatusCode::OK, result)
1665 0 : }
1666 :
1667 0 : async fn get_location_config_handler(
1668 0 : request: Request<Body>,
1669 0 : _cancel: CancellationToken,
1670 0 : ) -> Result<Response<Body>, ApiError> {
1671 0 : let state = get_state(&request);
1672 0 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1673 0 : let slot = state.tenant_manager.get(tenant_shard_id);
1674 :
1675 0 : let Some(slot) = slot else {
1676 0 : return Err(ApiError::NotFound(
1677 0 : anyhow::anyhow!("Tenant shard not found").into(),
1678 0 : ));
1679 : };
1680 :
1681 0 : let result: Option<LocationConfig> = match slot {
1682 0 : TenantSlot::Attached(t) => Some(t.get_location_conf()),
1683 0 : TenantSlot::Secondary(s) => Some(s.get_location_conf()),
1684 0 : TenantSlot::InProgress(_) => None,
1685 : };
1686 :
1687 0 : json_response(StatusCode::OK, result)
1688 0 : }
1689 :
1690 : // Do a time travel recovery on the given tenant/tenant shard. The tenant needs to be detached
1691 : // (from all pageservers), as the recovery invalidates consistency assumptions.
1692 0 : async fn tenant_time_travel_remote_storage_handler(
1693 0 : request: Request<Body>,
1694 0 : cancel: CancellationToken,
1695 0 : ) -> Result<Response<Body>, ApiError> {
1696 0 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1697 :
1698 0 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1699 :
1700 0 : let timestamp_raw = must_get_query_param(&request, "travel_to")?;
1701 0 : let timestamp = humantime::parse_rfc3339(×tamp_raw)
1702 0 : .with_context(|| format!("Invalid time for travel_to: {timestamp_raw:?}"))
1703 0 : .map_err(ApiError::BadRequest)?;
1704 :
1705 0 : let done_if_after_raw = must_get_query_param(&request, "done_if_after")?;
1706 0 : let done_if_after = humantime::parse_rfc3339(&done_if_after_raw)
1707 0 : .with_context(|| format!("Invalid time for done_if_after: {done_if_after_raw:?}"))
1708 0 : .map_err(ApiError::BadRequest)?;
1709 :
1710 : // This is just a sanity check to fend off naive wrong usages of the API:
1711 : // the tenant needs to be detached *everywhere*
1712 0 : let state = get_state(&request);
1713 0 : let we_manage_tenant = state.tenant_manager.manages_tenant_shard(tenant_shard_id);
1714 0 : if we_manage_tenant {
1715 0 : return Err(ApiError::BadRequest(anyhow!(
1716 0 : "Tenant {tenant_shard_id} is already attached at this pageserver"
1717 0 : )));
1718 0 : }
1719 0 :
1720 0 : if timestamp > done_if_after {
1721 0 : return Err(ApiError::BadRequest(anyhow!(
1722 0 : "The done_if_after timestamp comes before the timestamp to recover to"
1723 0 : )));
1724 0 : }
1725 0 :
1726 0 : tracing::info!("Issuing time travel request internally. timestamp={timestamp_raw}, done_if_after={done_if_after_raw}");
1727 :
1728 0 : remote_timeline_client::upload::time_travel_recover_tenant(
1729 0 : &state.remote_storage,
1730 0 : &tenant_shard_id,
1731 0 : timestamp,
1732 0 : done_if_after,
1733 0 : &cancel,
1734 0 : )
1735 0 : .await
1736 0 : .map_err(|e| match e {
1737 0 : TimeTravelError::BadInput(e) => {
1738 0 : warn!("bad input error: {e}");
1739 0 : ApiError::BadRequest(anyhow!("bad input error"))
1740 : }
1741 : TimeTravelError::Unimplemented => {
1742 0 : ApiError::BadRequest(anyhow!("unimplemented for the configured remote storage"))
1743 : }
1744 0 : TimeTravelError::Cancelled => ApiError::InternalServerError(anyhow!("cancelled")),
1745 : TimeTravelError::TooManyVersions => {
1746 0 : ApiError::InternalServerError(anyhow!("too many versions in remote storage"))
1747 : }
1748 0 : TimeTravelError::Other(e) => {
1749 0 : warn!("internal error: {e}");
1750 0 : ApiError::InternalServerError(anyhow!("internal error"))
1751 : }
1752 0 : })?;
1753 :
1754 0 : json_response(StatusCode::OK, ())
1755 0 : }
1756 :
1757 : /// Testing helper to transition a tenant to [`crate::tenant::TenantState::Broken`].
1758 0 : async fn handle_tenant_break(
1759 0 : r: Request<Body>,
1760 0 : _cancel: CancellationToken,
1761 0 : ) -> Result<Response<Body>, ApiError> {
1762 0 : let tenant_shard_id: TenantShardId = parse_request_param(&r, "tenant_shard_id")?;
1763 :
1764 0 : let state = get_state(&r);
1765 0 : state
1766 0 : .tenant_manager
1767 0 : .get_attached_tenant_shard(tenant_shard_id)?
1768 0 : .set_broken("broken from test".to_owned())
1769 0 : .await;
1770 :
1771 0 : json_response(StatusCode::OK, ())
1772 0 : }
1773 :
1774 : // Obtain an LSN lease on the given timeline.
1775 0 : async fn lsn_lease_handler(
1776 0 : mut request: Request<Body>,
1777 0 : _cancel: CancellationToken,
1778 0 : ) -> Result<Response<Body>, ApiError> {
1779 0 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1780 0 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1781 0 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1782 0 : let lsn = json_request::<LsnLeaseRequest>(&mut request).await?.lsn;
1783 :
1784 0 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
1785 0 :
1786 0 : let state = get_state(&request);
1787 :
1788 0 : let timeline =
1789 0 : active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
1790 0 : .await?;
1791 :
1792 0 : let result = async {
1793 0 : timeline
1794 0 : .init_lsn_lease(lsn, timeline.get_lsn_lease_length(), &ctx)
1795 0 : .map_err(|e| {
1796 0 : ApiError::InternalServerError(
1797 0 : e.context(format!("invalid lsn lease request at {lsn}")),
1798 0 : )
1799 0 : })
1800 0 : }
1801 0 : .instrument(info_span!("init_lsn_lease", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
1802 0 : .await?;
1803 :
1804 0 : json_response(StatusCode::OK, result)
1805 0 : }
1806 :
1807 : // Run GC immediately on given timeline.
1808 0 : async fn timeline_gc_handler(
1809 0 : mut request: Request<Body>,
1810 0 : cancel: CancellationToken,
1811 0 : ) -> Result<Response<Body>, ApiError> {
1812 0 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1813 0 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1814 0 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1815 :
1816 0 : let gc_req: TimelineGcRequest = json_request(&mut request).await?;
1817 :
1818 0 : let state = get_state(&request);
1819 0 :
1820 0 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
1821 0 : let gc_result = state
1822 0 : .tenant_manager
1823 0 : .immediate_gc(tenant_shard_id, timeline_id, gc_req, cancel, &ctx)
1824 0 : .await?;
1825 :
1826 0 : json_response(StatusCode::OK, gc_result)
1827 0 : }
1828 :
1829 : // Run compaction immediately on given timeline.
1830 0 : async fn timeline_compact_handler(
1831 0 : request: Request<Body>,
1832 0 : cancel: CancellationToken,
1833 0 : ) -> Result<Response<Body>, ApiError> {
1834 0 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1835 0 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1836 0 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1837 :
1838 0 : let state = get_state(&request);
1839 0 :
1840 0 : let mut flags = EnumSet::empty();
1841 0 :
1842 0 : if Some(true) == parse_query_param::<_, bool>(&request, "force_l0_compaction")? {
1843 0 : flags |= CompactFlags::ForceL0Compaction;
1844 0 : }
1845 0 : if Some(true) == parse_query_param::<_, bool>(&request, "force_repartition")? {
1846 0 : flags |= CompactFlags::ForceRepartition;
1847 0 : }
1848 0 : if Some(true) == parse_query_param::<_, bool>(&request, "force_image_layer_creation")? {
1849 0 : flags |= CompactFlags::ForceImageLayerCreation;
1850 0 : }
1851 0 : if Some(true) == parse_query_param::<_, bool>(&request, "enhanced_gc_bottom_most_compaction")? {
1852 0 : flags |= CompactFlags::EnhancedGcBottomMostCompaction;
1853 0 : }
1854 0 : if Some(true) == parse_query_param::<_, bool>(&request, "dry_run")? {
1855 0 : flags |= CompactFlags::DryRun;
1856 0 : }
1857 :
1858 0 : let wait_until_uploaded =
1859 0 : parse_query_param::<_, bool>(&request, "wait_until_uploaded")?.unwrap_or(false);
1860 :
1861 0 : async {
1862 0 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
1863 0 : let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?;
1864 0 : timeline
1865 0 : .compact(&cancel, flags, &ctx)
1866 0 : .await
1867 0 : .map_err(|e| ApiError::InternalServerError(e.into()))?;
1868 0 : if wait_until_uploaded {
1869 0 : timeline.remote_client.wait_completion().await
1870 : // XXX map to correct ApiError for the cases where it's due to shutdown
1871 0 : .context("wait completion").map_err(ApiError::InternalServerError)?;
1872 0 : }
1873 0 : json_response(StatusCode::OK, ())
1874 0 : }
1875 0 : .instrument(info_span!("manual_compaction", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
1876 0 : .await
1877 0 : }
1878 :
1879 : // Run offload immediately on given timeline.
1880 0 : async fn timeline_offload_handler(
1881 0 : request: Request<Body>,
1882 0 : _cancel: CancellationToken,
1883 0 : ) -> Result<Response<Body>, ApiError> {
1884 0 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1885 0 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1886 0 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1887 :
1888 0 : let state = get_state(&request);
1889 :
1890 0 : async {
1891 0 : let tenant = state
1892 0 : .tenant_manager
1893 0 : .get_attached_tenant_shard(tenant_shard_id)?;
1894 :
1895 0 : if tenant.get_offloaded_timeline(timeline_id).is_ok() {
1896 0 : return json_response(StatusCode::OK, ());
1897 0 : }
1898 0 : let timeline =
1899 0 : active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
1900 0 : .await?;
1901 :
1902 0 : if !tenant.timeline_has_no_attached_children(timeline_id) {
1903 0 : return Err(ApiError::PreconditionFailed(
1904 0 : "timeline has attached children".into(),
1905 0 : ));
1906 0 : }
1907 0 : if !timeline.can_offload() {
1908 0 : return Err(ApiError::PreconditionFailed(
1909 0 : "Timeline::can_offload() returned false".into(),
1910 0 : ));
1911 0 : }
1912 0 : offload_timeline(&tenant, &timeline)
1913 0 : .await
1914 0 : .map_err(ApiError::InternalServerError)?;
1915 :
1916 0 : json_response(StatusCode::OK, ())
1917 0 : }
1918 0 : .instrument(info_span!("manual_timeline_offload", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
1919 0 : .await
1920 0 : }
1921 :
1922 : // Run checkpoint immediately on given timeline.
1923 0 : async fn timeline_checkpoint_handler(
1924 0 : request: Request<Body>,
1925 0 : cancel: CancellationToken,
1926 0 : ) -> Result<Response<Body>, ApiError> {
1927 0 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1928 0 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1929 0 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1930 :
1931 0 : let state = get_state(&request);
1932 0 :
1933 0 : let mut flags = EnumSet::empty();
1934 0 : if Some(true) == parse_query_param::<_, bool>(&request, "force_l0_compaction")? {
1935 0 : flags |= CompactFlags::ForceL0Compaction;
1936 0 : }
1937 0 : if Some(true) == parse_query_param::<_, bool>(&request, "force_repartition")? {
1938 0 : flags |= CompactFlags::ForceRepartition;
1939 0 : }
1940 0 : if Some(true) == parse_query_param::<_, bool>(&request, "force_image_layer_creation")? {
1941 0 : flags |= CompactFlags::ForceImageLayerCreation;
1942 0 : }
1943 :
1944 : // By default, checkpoints come with a compaction, but this may be optionally disabled by tests that just want to flush + upload.
1945 0 : let compact = parse_query_param::<_, bool>(&request, "compact")?.unwrap_or(true);
1946 :
1947 0 : let wait_until_uploaded =
1948 0 : parse_query_param::<_, bool>(&request, "wait_until_uploaded")?.unwrap_or(false);
1949 :
1950 0 : async {
1951 0 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
1952 0 : let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?;
1953 0 : timeline
1954 0 : .freeze_and_flush()
1955 0 : .await
1956 0 : .map_err(|e| {
1957 0 : match e {
1958 0 : tenant::timeline::FlushLayerError::Cancelled => ApiError::ShuttingDown,
1959 0 : other => ApiError::InternalServerError(other.into()),
1960 :
1961 : }
1962 0 : })?;
1963 0 : if compact {
1964 0 : timeline
1965 0 : .compact(&cancel, flags, &ctx)
1966 0 : .await
1967 0 : .map_err(|e|
1968 0 : match e {
1969 0 : CompactionError::ShuttingDown => ApiError::ShuttingDown,
1970 0 : CompactionError::Other(e) => ApiError::InternalServerError(e)
1971 0 : }
1972 0 : )?;
1973 0 : }
1974 :
1975 0 : if wait_until_uploaded {
1976 0 : tracing::info!("Waiting for uploads to complete...");
1977 0 : timeline.remote_client.wait_completion().await
1978 : // XXX map to correct ApiError for the cases where it's due to shutdown
1979 0 : .context("wait completion").map_err(ApiError::InternalServerError)?;
1980 0 : tracing::info!("Uploads completed up to {}", timeline.get_remote_consistent_lsn_projected().unwrap_or(Lsn(0)));
1981 0 : }
1982 :
1983 0 : json_response(StatusCode::OK, ())
1984 0 : }
1985 0 : .instrument(info_span!("manual_checkpoint", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
1986 0 : .await
1987 0 : }
1988 :
1989 0 : async fn timeline_download_remote_layers_handler_post(
1990 0 : mut request: Request<Body>,
1991 0 : _cancel: CancellationToken,
1992 0 : ) -> Result<Response<Body>, ApiError> {
1993 0 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
1994 0 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
1995 0 : let body: DownloadRemoteLayersTaskSpawnRequest = json_request(&mut request).await?;
1996 0 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
1997 :
1998 0 : let state = get_state(&request);
1999 :
2000 0 : let timeline =
2001 0 : active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
2002 0 : .await?;
2003 0 : match timeline.spawn_download_all_remote_layers(body).await {
2004 0 : Ok(st) => json_response(StatusCode::ACCEPTED, st),
2005 0 : Err(st) => json_response(StatusCode::CONFLICT, st),
2006 : }
2007 0 : }
2008 :
2009 0 : async fn timeline_download_remote_layers_handler_get(
2010 0 : request: Request<Body>,
2011 0 : _cancel: CancellationToken,
2012 0 : ) -> Result<Response<Body>, ApiError> {
2013 0 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
2014 0 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
2015 0 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
2016 0 : let state = get_state(&request);
2017 :
2018 0 : let timeline =
2019 0 : active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
2020 0 : .await?;
2021 0 : let info = timeline
2022 0 : .get_download_all_remote_layers_task_info()
2023 0 : .context("task never started since last pageserver process start")
2024 0 : .map_err(|e| ApiError::NotFound(e.into()))?;
2025 0 : json_response(StatusCode::OK, info)
2026 0 : }
2027 :
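 : /// Detach a timeline from its ancestor: prepare the detach, complete it, and report which
 : /// timelines were reparented in the process.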
2028 0 : async fn timeline_detach_ancestor_handler(
2029 0 : request: Request<Body>,
2030 0 : _cancel: CancellationToken,
2031 0 : ) -> Result<Response<Body>, ApiError> {
2032 : use crate::tenant::timeline::detach_ancestor;
2033 : use pageserver_api::models::detach_ancestor::AncestorDetached;
2034 :
2035 0 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
2036 0 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
2037 0 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
2038 :
2039 0 : let span = tracing::info_span!("detach_ancestor", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id);
2040 :
2041 0 : async move {
2042 0 : let mut options = detach_ancestor::Options::default();
2043 :
2044 0 : let rewrite_concurrency =
2045 0 : parse_query_param::<_, std::num::NonZeroUsize>(&request, "rewrite_concurrency")?;
2046 0 : let copy_concurrency =
2047 0 : parse_query_param::<_, std::num::NonZeroUsize>(&request, "copy_concurrency")?;
2048 :
2049 0 : [
2050 0 : (&mut options.rewrite_concurrency, rewrite_concurrency),
2051 0 : (&mut options.copy_concurrency, copy_concurrency),
2052 0 : ]
2053 0 : .into_iter()
2054 0 : .filter_map(|(target, val)| val.map(|val| (target, val)))
2055 0 : .for_each(|(target, val)| *target = val);
2056 0 :
2057 0 : let state = get_state(&request);
2058 :
2059 0 : let tenant = state
2060 0 : .tenant_manager
2061 0 : .get_attached_tenant_shard(tenant_shard_id)?;
2062 :
2063 0 : tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
2064 :
2065 0 : let ctx = RequestContext::new(TaskKind::DetachAncestor, DownloadBehavior::Download);
2066 0 : let ctx = &ctx;
2067 :
2068 0 : let timeline = tenant.get_timeline(timeline_id, true)?;
2069 :
2070 0 : let progress = timeline
2071 0 : .prepare_to_detach_from_ancestor(&tenant, options, ctx)
2072 0 : .await?;
2073 :
2074 : // uncomment to allow Tenant::drop to happen as early as possible
2075 : // drop(tenant);
2076 :
2077 0 : let resp = match progress {
2078 0 : detach_ancestor::Progress::Prepared(attempt, prepared) => {
2079 : // it would be great to tag the guard on to the tenant activation future
2080 0 : let reparented_timelines = state
2081 0 : .tenant_manager
2082 0 : .complete_detaching_timeline_ancestor(
2083 0 : tenant_shard_id,
2084 0 : timeline_id,
2085 0 : prepared,
2086 0 : attempt,
2087 0 : ctx,
2088 0 : )
2089 0 : .await?;
2090 :
2091 0 : AncestorDetached {
2092 0 : reparented_timelines,
2093 0 : }
2094 : }
2095 0 : detach_ancestor::Progress::Done(resp) => resp,
2096 : };
2097 :
2098 0 : json_response(StatusCode::OK, resp)
2099 0 : }
2100 0 : .instrument(span)
2101 0 : .await
2102 0 : }
2103 :
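 : /// Flush the deletion queue; with `?execute=true`, wait for the queued deletions to actually
 : /// be executed.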
2104 0 : async fn deletion_queue_flush(
2105 0 : r: Request<Body>,
2106 0 : cancel: CancellationToken,
2107 0 : ) -> Result<Response<Body>, ApiError> {
2108 0 : let state = get_state(&r);
2109 :
2110 0 : let execute = parse_query_param(&r, "execute")?.unwrap_or(false);
2111 0 :
2112 0 : let flush = async {
2113 0 : if execute {
2114 0 : state.deletion_queue_client.flush_execute().await
2115 : } else {
2116 0 : state.deletion_queue_client.flush().await
2117 : }
2118 0 : }
2119 : // DeletionQueueError's only case is shutting down.
2120 0 : .map_err(|_| ApiError::ShuttingDown);
2121 0 :
2122 0 : tokio::select! {
2123 0 : res = flush => {
2124 0 : res.map(|()| json_response(StatusCode::OK, ()))?
2125 : }
2126 0 : _ = cancel.cancelled() => {
2127 0 : Err(ApiError::ShuttingDown)
2128 : }
2129 : }
2130 0 : }
2131 :
2132 : /// Test whether `GetPage@Lsn` succeeds; useful for manual debugging.
2133 0 : async fn getpage_at_lsn_handler(
2134 0 : request: Request<Body>,
2135 0 : _cancel: CancellationToken,
2136 0 : ) -> Result<Response<Body>, ApiError> {
2137 0 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
2138 0 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
2139 0 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
2140 0 : let state = get_state(&request);
2141 :
2142 : struct Key(crate::repository::Key);
2143 :
2144 : impl std::str::FromStr for Key {
2145 : type Err = anyhow::Error;
2146 :
2147 0 : fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
2148 0 : crate::repository::Key::from_hex(s).map(Key)
2149 0 : }
2150 : }
2151 :
2152 0 : let key: Key = parse_query_param(&request, "key")?
2153 0 : .ok_or_else(|| ApiError::BadRequest(anyhow!("missing 'key' query parameter")))?;
2154 0 : let lsn: Lsn = parse_query_param(&request, "lsn")?
2155 0 : .ok_or_else(|| ApiError::BadRequest(anyhow!("missing 'lsn' query parameter")))?;
2156 :
2157 0 : async {
2158 0 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
2159 0 : let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?;
2160 :
2161 0 : let page = timeline.get(key.0, lsn, &ctx).await?;
2162 :
2163 0 : Result::<_, ApiError>::Ok(
2164 0 : Response::builder()
2165 0 : .status(StatusCode::OK)
2166 0 : .header(header::CONTENT_TYPE, "application/octet-stream")
2167 0 : .body(hyper::Body::from(page))
2168 0 : .unwrap(),
2169 0 : )
2170 0 : }
2171 0 : .instrument(info_span!("timeline_get", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
2172 0 : .await
2173 0 : }
2174 :
2175 0 : async fn timeline_collect_keyspace(
2176 0 : request: Request<Body>,
2177 0 : _cancel: CancellationToken,
2178 0 : ) -> Result<Response<Body>, ApiError> {
2179 0 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
2180 0 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
2181 0 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
2182 0 : let state = get_state(&request);
2183 :
2184 0 : let at_lsn: Option<Lsn> = parse_query_param(&request, "at_lsn")?;
2185 :
2186 0 : async {
2187 0 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
2188 0 : let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?;
2189 0 : let at_lsn = at_lsn.unwrap_or_else(|| timeline.get_last_record_lsn());
2190 0 : let (dense_ks, sparse_ks) = timeline
2191 0 : .collect_keyspace(at_lsn, &ctx)
2192 0 : .await
2193 0 : .map_err(|e| ApiError::InternalServerError(e.into()))?;
2194 :
2195 : // This API is currently used by pagebench. Pagebench will iterate all keys within the keyspace.
2196 : // Therefore, we split dense/sparse keys in this API.
2197 0 : let res = pageserver_api::models::partitioning::Partitioning { keys: dense_ks, sparse_keys: sparse_ks, at_lsn };
2198 0 :
2199 0 : json_response(StatusCode::OK, res)
2200 0 : }
2201 0 : .instrument(info_span!("timeline_collect_keyspace", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
2202 0 : .await
2203 0 : }
2204 :
2205 0 : async fn active_timeline_of_active_tenant(
2206 0 : tenant_manager: &TenantManager,
2207 0 : tenant_shard_id: TenantShardId,
2208 0 : timeline_id: TimelineId,
2209 0 : ) -> Result<Arc<Timeline>, ApiError> {
2210 0 : let tenant = tenant_manager.get_attached_tenant_shard(tenant_shard_id)?;
2211 :
2212 0 : tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
2213 :
2214 0 : Ok(tenant.get_timeline(timeline_id, true)?)
2215 0 : }
2216 :
2217 0 : async fn always_panic_handler(
2218 0 : req: Request<Body>,
2219 0 : _cancel: CancellationToken,
2220 0 : ) -> Result<Response<Body>, ApiError> {
2221 0 : // Deliberately cause a panic to exercise the panic hook registered via std::panic::set_hook().
2222 0 : // For pageserver, the relevant panic hook is `tracing_panic_hook`, and the `sentry` crate's wrapper around it.
2223 0 : // Use catch_unwind to ensure that neither tokio nor hyper is distracted by our panic.
2224 0 : let query = req.uri().query();
2225 0 : let _ = std::panic::catch_unwind(|| {
2226 0 : panic!("unconditional panic for testing panic hook integration; request query: {query:?}")
2227 0 : });
2228 0 : json_response(StatusCode::NO_CONTENT, ())
2229 0 : }
2230 :
2231 0 : async fn disk_usage_eviction_run(
2232 0 : mut r: Request<Body>,
2233 0 : cancel: CancellationToken,
2234 0 : ) -> Result<Response<Body>, ApiError> {
2235 0 : check_permission(&r, None)?;
2236 :
2237 0 : #[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)]
2238 : struct Config {
2239 : /// How many bytes to evict before reporting that pressure is relieved.
2240 : evict_bytes: u64,
2241 :
2242 : #[serde(default)]
2243 : eviction_order: pageserver_api::config::EvictionOrder,
2244 : }
2245 :
2246 : #[derive(Debug, Clone, Copy, serde::Serialize)]
2247 : struct Usage {
2248 : // remains unchanged after instantiation of the struct
2249 : evict_bytes: u64,
2250 : // updated by `add_available_bytes`
2251 : freed_bytes: u64,
2252 : }
2253 :
2254 : impl crate::disk_usage_eviction_task::Usage for Usage {
2255 0 : fn has_pressure(&self) -> bool {
2256 0 : self.evict_bytes > self.freed_bytes
2257 0 : }
2258 :
2259 0 : fn add_available_bytes(&mut self, bytes: u64) {
2260 0 : self.freed_bytes += bytes;
2261 0 : }
2262 : }
2263 :
2264 0 : let config = json_request::<Config>(&mut r).await?;
2265 :
2266 0 : let usage = Usage {
2267 0 : evict_bytes: config.evict_bytes,
2268 0 : freed_bytes: 0,
2269 0 : };
2270 0 :
2271 0 : let state = get_state(&r);
2272 0 : let eviction_state = state.disk_usage_eviction_state.clone();
2273 :
2274 0 : let res = crate::disk_usage_eviction_task::disk_usage_eviction_task_iteration_impl(
2275 0 : &eviction_state,
2276 0 : &state.remote_storage,
2277 0 : usage,
2278 0 : &state.tenant_manager,
2279 0 : config.eviction_order.into(),
2280 0 : &cancel,
2281 0 : )
2282 0 : .await;
2283 :
2284 0 : info!(?res, "disk_usage_eviction_task_iteration_impl finished");
2285 :
2286 0 : let res = res.map_err(ApiError::InternalServerError)?;
2287 :
2288 0 : json_response(StatusCode::OK, res)
2289 0 : }
2290 :
2291 0 : async fn secondary_upload_handler(
2292 0 : request: Request<Body>,
2293 0 : _cancel: CancellationToken,
2294 0 : ) -> Result<Response<Body>, ApiError> {
2295 0 : let state = get_state(&request);
2296 0 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
2297 0 : state
2298 0 : .secondary_controller
2299 0 : .upload_tenant(tenant_shard_id)
2300 0 : .await
2301 0 : .map_err(ApiError::InternalServerError)?;
2302 :
2303 0 : json_response(StatusCode::OK, ())
2304 0 : }
2305 :
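 : /// Scan remote storage for all shards of the given tenant, returning each shard together with
 : /// the highest generation found among its timelines' index parts.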
2306 0 : async fn tenant_scan_remote_handler(
2307 0 : request: Request<Body>,
2308 0 : cancel: CancellationToken,
2309 0 : ) -> Result<Response<Body>, ApiError> {
2310 0 : let state = get_state(&request);
2311 0 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
2312 :
2313 0 : let mut response = TenantScanRemoteStorageResponse::default();
2314 :
2315 0 : let (shards, _other_keys) =
2316 0 : list_remote_tenant_shards(&state.remote_storage, tenant_id, cancel.clone())
2317 0 : .await
2318 0 : .map_err(|e| ApiError::InternalServerError(anyhow::anyhow!(e)))?;
2319 :
2320 0 : for tenant_shard_id in shards {
2321 0 : let (timeline_ids, _other_keys) =
2322 0 : list_remote_timelines(&state.remote_storage, tenant_shard_id, cancel.clone())
2323 0 : .await
2324 0 : .map_err(|e| ApiError::InternalServerError(anyhow::anyhow!(e)))?;
2325 :
2326 0 : let mut generation = Generation::none();
2327 0 : for timeline_id in timeline_ids {
2328 0 : match download_index_part(
2329 0 : &state.remote_storage,
2330 0 : &tenant_shard_id,
2331 0 : &timeline_id,
2332 0 : Generation::MAX,
2333 0 : &cancel,
2334 0 : )
2335 0 : .instrument(info_span!("download_index_part",
2336 : tenant_id=%tenant_shard_id.tenant_id,
2337 0 : shard_id=%tenant_shard_id.shard_slug(),
2338 : %timeline_id))
2339 0 : .await
2340 : {
2341 0 : Ok((index_part, index_generation, _index_mtime)) => {
2342 0 : tracing::info!("Found timeline {tenant_shard_id}/{timeline_id} metadata (gen {index_generation:?}, {} layers, {} consistent LSN)",
2343 0 : index_part.layer_metadata.len(), index_part.metadata.disk_consistent_lsn());
2344 0 : generation = std::cmp::max(generation, index_generation);
2345 : }
2346 : Err(DownloadError::NotFound) => {
2347 : // This is normal for tenants that were created with multiple shards: they have an unsharded path
2348 : // containing the timeline's initdb tarball but no index. Otherwise it is a bit strange.
2349 0 : tracing::info!("Timeline path {tenant_shard_id}/{timeline_id} exists in remote storage but has no index, skipping");
2350 0 : continue;
2351 : }
2352 0 : Err(e) => {
2353 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(e)));
2354 : }
2355 : };
2356 : }
2357 :
2358 0 : response.shards.push(TenantScanRemoteStorageShard {
2359 0 : tenant_shard_id,
2360 0 : generation: generation.into(),
2361 0 : });
2362 : }
2363 :
2364 0 : if response.shards.is_empty() {
2365 0 : return Err(ApiError::NotFound(
2366 0 : anyhow::anyhow!("No shards found for tenant ID {tenant_id}").into(),
2367 0 : ));
2368 0 : }
2369 0 :
2370 0 : json_response(StatusCode::OK, response)
2371 0 : }
2372 :
2373 0 : async fn secondary_download_handler(
2374 0 : request: Request<Body>,
2375 0 : _cancel: CancellationToken,
2376 0 : ) -> Result<Response<Body>, ApiError> {
2377 0 : let state = get_state(&request);
2378 0 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
2379 0 : let wait = parse_query_param(&request, "wait_ms")?.map(Duration::from_millis);
2380 :
2381 : // We don't need this to issue the download request, but:
2382 : // - it enables us to cleanly return 404 if we get a request for an absent shard
2383 : // - we will use this to provide status feedback in the response
2384 0 : let Some(secondary_tenant) = state
2385 0 : .tenant_manager
2386 0 : .get_secondary_tenant_shard(tenant_shard_id)
2387 : else {
2388 0 : return Err(ApiError::NotFound(
2389 0 : anyhow::anyhow!("Shard {} not found", tenant_shard_id).into(),
2390 0 : ));
2391 : };
2392 :
2393 0 : let timeout = wait.unwrap_or(Duration::MAX);
2394 :
2395 0 : let result = tokio::time::timeout(
2396 0 : timeout,
2397 0 : state.secondary_controller.download_tenant(tenant_shard_id),
2398 0 : )
2399 0 : .await;
2400 :
2401 0 : let progress = secondary_tenant.progress.lock().unwrap().clone();
2402 :
2403 0 : let status = match result {
2404 : Ok(Ok(())) => {
2405 0 : if progress.layers_downloaded >= progress.layers_total {
2406 : // Download job ran to completion
2407 0 : StatusCode::OK
2408 : } else {
2409 : // Download dropped out without errors because it ran out of time budget
2410 0 : StatusCode::ACCEPTED
2411 : }
2412 : }
2413 : // Edge case: downloads aren't usually fallible: things like a missing heatmap are considered
2414 : // okay. We could get an error here in the unlikely edge case that the tenant
2415 : // was detached between our check above and executing the download job.
2416 0 : Ok(Err(e)) => return Err(ApiError::InternalServerError(e)),
2417 : // A timeout is not an error: we have started the download, we're just not done
2418 : // yet. The caller will get a response body indicating status.
2419 0 : Err(_) => StatusCode::ACCEPTED,
2420 : };
2421 :
2422 0 : json_response(status, progress)
2423 0 : }
2424 :
2425 0 : async fn secondary_status_handler(
2426 0 : request: Request<Body>,
2427 0 : _cancel: CancellationToken,
2428 0 : ) -> Result<Response<Body>, ApiError> {
2429 0 : let state = get_state(&request);
2430 0 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
2431 :
2432 0 : let Some(secondary_tenant) = state
2433 0 : .tenant_manager
2434 0 : .get_secondary_tenant_shard(tenant_shard_id)
2435 : else {
2436 0 : return Err(ApiError::NotFound(
2437 0 : anyhow::anyhow!("Shard {} not found", tenant_shard_id).into(),
2438 0 : ));
2439 : };
2440 :
2441 0 : let progress = secondary_tenant.progress.lock().unwrap().clone();
2442 0 :
2443 0 : json_response(StatusCode::OK, progress)
2444 0 : }
2445 :
2446 0 : async fn handler_404(_: Request<Body>) -> Result<Response<Body>, ApiError> {
2447 0 : json_response(
2448 0 : StatusCode::NOT_FOUND,
2449 0 : HttpErrorBody::from_msg("page not found".to_owned()),
2450 0 : )
2451 0 : }
2452 :
2453 0 : async fn post_tracing_event_handler(
2454 0 : mut r: Request<Body>,
2455 0 : _cancel: CancellationToken,
2456 0 : ) -> Result<Response<Body>, ApiError> {
2457 0 : #[derive(Debug, serde::Deserialize)]
2458 : #[serde(rename_all = "lowercase")]
2459 : enum Level {
2460 : Error,
2461 : Warn,
2462 : Info,
2463 : Debug,
2464 : Trace,
2465 : }
2466 0 : #[derive(Debug, serde::Deserialize)]
2467 : struct Request {
2468 : level: Level,
2469 : message: String,
2470 : }
2471 0 : let body: Request = json_request(&mut r)
2472 0 : .await
2473 0 : .map_err(|_| ApiError::BadRequest(anyhow::anyhow!("invalid JSON body")))?;
2474 :
2475 0 : match body.level {
2476 0 : Level::Error => tracing::error!(?body.message),
2477 0 : Level::Warn => tracing::warn!(?body.message),
2478 0 : Level::Info => tracing::info!(?body.message),
2479 0 : Level::Debug => tracing::debug!(?body.message),
2480 0 : Level::Trace => tracing::trace!(?body.message),
2481 : }
2482 :
2483 0 : json_response(StatusCode::OK, ())
2484 0 : }
2485 :
2486 0 : async fn put_io_engine_handler(
2487 0 : mut r: Request<Body>,
2488 0 : _cancel: CancellationToken,
2489 0 : ) -> Result<Response<Body>, ApiError> {
2490 0 : check_permission(&r, None)?;
2491 0 : let kind: crate::virtual_file::IoEngineKind = json_request(&mut r).await?;
2492 0 : crate::virtual_file::io_engine::set(kind);
2493 0 : json_response(StatusCode::OK, ())
2494 0 : }
2495 :
2496 0 : async fn put_io_mode_handler(
2497 0 : mut r: Request<Body>,
2498 0 : _cancel: CancellationToken,
2499 0 : ) -> Result<Response<Body>, ApiError> {
2500 0 : check_permission(&r, None)?;
2501 0 : let mode: IoMode = json_request(&mut r).await?;
2502 0 : crate::virtual_file::set_io_mode(mode);
2503 0 : json_response(StatusCode::OK, ())
2504 0 : }
2505 :
2506 : /// Polled by control plane.
2507 : ///
2508 : /// See [`crate::utilization`].
2509 0 : async fn get_utilization(
2510 0 : r: Request<Body>,
2511 0 : _cancel: CancellationToken,
2512 0 : ) -> Result<Response<Body>, ApiError> {
2513 0 : fail::fail_point!("get-utilization-http-handler", |_| {
2514 0 : Err(ApiError::ResourceUnavailable("failpoint".into()))
2515 0 : });
2516 :
2517 : // this probably could be completely public, but let's make that change later.
2518 0 : check_permission(&r, None)?;
2519 :
2520 0 : let state = get_state(&r);
2521 0 : let mut g = state.latest_utilization.lock().await;
2522 :
2523 0 : let regenerate_every = Duration::from_secs(1);
2524 0 : let still_valid = g
2525 0 : .as_ref()
2526 0 : .is_some_and(|(captured_at, _)| captured_at.elapsed() < regenerate_every);
2527 0 :
2528 0 : // avoid needless statvfs calls even though those should be non-blocking fast.
2529 0 : // regenerate at most 1Hz to allow polling at any rate.
2530 0 : if !still_valid {
2531 0 : let path = state.conf.tenants_path();
2532 0 : let doc =
2533 0 : crate::utilization::regenerate(state.conf, path.as_std_path(), &state.tenant_manager)
2534 0 : .map_err(ApiError::InternalServerError)?;
2535 :
2536 0 : let mut buf = Vec::new();
2537 0 : serde_json::to_writer(&mut buf, &doc)
2538 0 : .context("serialize")
2539 0 : .map_err(ApiError::InternalServerError)?;
2540 :
2541 0 : let body = bytes::Bytes::from(buf);
2542 0 :
2543 0 : *g = Some((std::time::Instant::now(), body));
2544 0 : }
2545 :
2546 : // hyper 0.14 doesn't yet have Response::clone so this is a bit of extra legwork
2547 0 : let cached = g.as_ref().expect("just set").1.clone();
2548 0 :
2549 0 : Response::builder()
2550 0 : .header(hyper::http::header::CONTENT_TYPE, "application/json")
2551 0 : // thought of using http date header, but that is second precision which does not give any
2552 0 : // debugging aid
2553 0 : .status(StatusCode::OK)
2554 0 : .body(hyper::Body::from(cached))
2555 0 : .context("build response")
2556 0 : .map_err(ApiError::InternalServerError)
2557 0 : }
2558 :
2559 0 : async fn list_aux_files(
2560 0 : mut request: Request<Body>,
2561 0 : _cancel: CancellationToken,
2562 0 : ) -> Result<Response<Body>, ApiError> {
2563 0 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
2564 0 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
2565 0 : let body: ListAuxFilesRequest = json_request(&mut request).await?;
2566 0 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
2567 :
2568 0 : let state = get_state(&request);
2569 :
2570 0 : let timeline =
2571 0 : active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
2572 0 : .await?;
2573 :
2574 0 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
2575 0 : let files = timeline.list_aux_files(body.lsn, &ctx).await?;
2576 0 : json_response(StatusCode::OK, files)
2577 0 : }
2578 :
2579 0 : async fn perf_info(
2580 0 : request: Request<Body>,
2581 0 : _cancel: CancellationToken,
2582 0 : ) -> Result<Response<Body>, ApiError> {
2583 0 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
2584 0 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
2585 0 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
2586 :
2587 0 : let state = get_state(&request);
2588 :
2589 0 : let timeline =
2590 0 : active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
2591 0 : .await?;
2592 :
2593 0 : let result = timeline.perf_info().await;
2594 :
2595 0 : json_response(StatusCode::OK, result)
2596 0 : }
2597 :
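 : /// Ingest the provided aux files into the timeline, committing the modification at
 : /// `last_record_lsn + 8`.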
2598 0 : async fn ingest_aux_files(
2599 0 : mut request: Request<Body>,
2600 0 : _cancel: CancellationToken,
2601 0 : ) -> Result<Response<Body>, ApiError> {
2602 0 : let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
2603 0 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
2604 0 : let body: IngestAuxFilesRequest = json_request(&mut request).await?;
2605 0 : check_permission(&request, Some(tenant_shard_id.tenant_id))?;
2606 :
2607 0 : let state = get_state(&request);
2608 :
2609 0 : let timeline =
2610 0 : active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
2611 0 : .await?;
2612 :
2613 0 : let mut modification = timeline.begin_modification(
2614 0 : Lsn(timeline.get_last_record_lsn().0 + 8), /* advance LSN by 8 */
2615 0 : );
2616 0 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
2617 0 : for (fname, content) in body.aux_files {
2618 0 : modification
2619 0 : .put_file(&fname, content.as_bytes(), &ctx)
2620 0 : .await
2621 0 : .map_err(ApiError::InternalServerError)?;
2622 : }
2623 0 : modification
2624 0 : .commit(&ctx)
2625 0 : .await
2626 0 : .map_err(ApiError::InternalServerError)?;
2627 :
2628 0 : json_response(StatusCode::OK, ())
2629 0 : }
2630 :
2631 : /// Report on the largest tenants on this pageserver, for the storage controller to identify
2632 : /// candidates for splitting
2633 0 : async fn post_top_tenants(
2634 0 : mut r: Request<Body>,
2635 0 : _cancel: CancellationToken,
2636 0 : ) -> Result<Response<Body>, ApiError> {
2637 0 : check_permission(&r, None)?;
2638 0 : let request: TopTenantShardsRequest = json_request(&mut r).await?;
2639 0 : let state = get_state(&r);
2640 :
2641 0 : fn get_size_metric(sizes: &TopTenantShardItem, order_by: &TenantSorting) -> u64 {
2642 0 : match order_by {
2643 0 : TenantSorting::ResidentSize => sizes.resident_size,
2644 0 : TenantSorting::MaxLogicalSize => sizes.max_logical_size,
2645 : }
2646 0 : }
2647 :
2648 : #[derive(Eq, PartialEq)]
2649 : struct HeapItem {
2650 : metric: u64,
2651 : sizes: TopTenantShardItem,
2652 : }
2653 :
2654 : impl PartialOrd for HeapItem {
2655 0 : fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
2656 0 : Some(self.cmp(other))
2657 0 : }
2658 : }
2659 :
2660 : /// Heap items have reverse ordering on their metric: this enables using BinaryHeap, which
2661 : /// supports popping the greatest item but not the smallest.
2662 : impl Ord for HeapItem {
2663 0 : fn cmp(&self, other: &Self) -> std::cmp::Ordering {
2664 0 : Reverse(self.metric).cmp(&Reverse(other.metric))
2665 0 : }
2666 : }
2667 :
2668 0 : let mut top_n: BinaryHeap<HeapItem> = BinaryHeap::with_capacity(request.limit);
2669 :
2670 : // FIXME: this is a lot of clones to take this tenant list
2671 0 : for (tenant_shard_id, tenant_slot) in state.tenant_manager.list() {
2672 0 : if let Some(shards_lt) = request.where_shards_lt {
2673 : // Ignore tenants which already have >= this many shards
2674 0 : if tenant_shard_id.shard_count >= shards_lt {
2675 0 : continue;
2676 0 : }
2677 0 : }
2678 :
2679 0 : let sizes = match tenant_slot {
2680 0 : TenantSlot::Attached(tenant) => tenant.get_sizes(),
2681 : TenantSlot::Secondary(_) | TenantSlot::InProgress(_) => {
2682 0 : continue;
2683 : }
2684 : };
2685 0 : let metric = get_size_metric(&sizes, &request.order_by);
2686 :
2687 0 : if let Some(gt) = request.where_gt {
2688 : // Ignore tenants whose metric is <= the lower size threshold, to do less sorting work
2689 0 : if metric <= gt {
2690 0 : continue;
2691 0 : }
2692 0 : };
2693 :
2694 0 : match top_n.peek() {
2695 0 : None => {
2696 0 : // Top N list is empty: candidate becomes first member
2697 0 : top_n.push(HeapItem { metric, sizes });
2698 0 : }
2699 0 : Some(i) if i.metric > metric && top_n.len() < request.limit => {
2700 0 : // Lowest item in list is greater than our candidate, but we aren't at limit yet: push to end
2701 0 : top_n.push(HeapItem { metric, sizes });
2702 0 : }
2703 0 : Some(i) if i.metric > metric => {
2704 0 : // List is at limit and lowest value is greater than our candidate, drop it.
2705 0 : }
2706 0 : Some(_) => top_n.push(HeapItem { metric, sizes }),
2707 : }
2708 :
2709 0 : while top_n.len() > request.limit {
2710 0 : top_n.pop();
2711 0 : }
2712 : }
2713 :
2714 0 : json_response(
2715 0 : StatusCode::OK,
2716 0 : TopTenantShardsResponse {
2717 0 : shards: top_n.into_iter().map(|i| i.sizes).collect(),
2718 0 : },
2719 0 : )
2720 0 : }
2721 :
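 : /// Create an empty timeline at `base_lsn` and populate it from a basebackup tarball streamed
 : /// in the request body.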
2722 0 : async fn put_tenant_timeline_import_basebackup(
2723 0 : request: Request<Body>,
2724 0 : _cancel: CancellationToken,
2725 0 : ) -> Result<Response<Body>, ApiError> {
2726 0 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
2727 0 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
2728 0 : let base_lsn: Lsn = must_parse_query_param(&request, "base_lsn")?;
2729 0 : let end_lsn: Lsn = must_parse_query_param(&request, "end_lsn")?;
2730 0 : let pg_version: u32 = must_parse_query_param(&request, "pg_version")?;
2731 :
2732 0 : check_permission(&request, Some(tenant_id))?;
2733 :
2734 0 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
2735 :
2736 0 : let span = info_span!("import_basebackup", tenant_id=%tenant_id, timeline_id=%timeline_id, base_lsn=%base_lsn, end_lsn=%end_lsn, pg_version=%pg_version);
2737 0 : async move {
2738 0 : let state = get_state(&request);
2739 0 : let tenant = state
2740 0 : .tenant_manager
2741 0 : .get_attached_tenant_shard(TenantShardId::unsharded(tenant_id))?;
2742 :
2743 0 : let broker_client = state.broker_client.clone();
2744 0 :
2745 0 : let mut body = StreamReader::new(request.into_body().map(|res| {
2746 0 : res.map_err(|error| {
2747 0 : std::io::Error::new(std::io::ErrorKind::Other, anyhow::anyhow!(error))
2748 0 : })
2749 0 : }));
2750 0 :
2751 0 : tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
2752 :
2753 0 : let timeline = tenant
2754 0 : .create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)
2755 0 : .map_err(ApiError::InternalServerError)
2756 0 : .await?;
2757 :
2758 : // TODO mark timeline as not ready until it reaches end_lsn.
2759 : // We might have some wal to import as well, and we should prevent compute
2760 : // from connecting before that and writing conflicting wal.
2761 : //
2762 : // This is not relevant for pageserver->pageserver migrations, since there's
2763 : // no wal to import. But should be fixed if we want to import from postgres.
2764 :
2765 : // TODO leave clean state on error. For now you can use detach to clean
2766 : // up broken state from a failed import.
2767 :
2768 : // Import basebackup provided via CopyData
2769 0 : info!("importing basebackup");
2770 :
2771 0 : timeline
2772 0 : .import_basebackup_from_tar(tenant.clone(), &mut body, base_lsn, broker_client, &ctx)
2773 0 : .await
2774 0 : .map_err(ApiError::InternalServerError)?;
2775 :
2776 : // Read the end of the tar archive.
2777 0 : read_tar_eof(body)
2778 0 : .await
2779 0 : .map_err(ApiError::InternalServerError)?;
2780 :
2781 : // TODO check checksum
2782 : // Meanwhile you can verify client-side by taking fullbackup
2783 : // and checking that it matches in size with what was imported.
2784 : // It wouldn't work if base came from vanilla postgres though,
2785 : // since we discard some log files.
2786 :
2787 0 : info!("done");
2788 0 : json_response(StatusCode::OK, ())
2789 0 : }
2790 0 : .instrument(span)
2791 0 : .await
2792 0 : }
2793 :
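 : /// Import WAL from a tarball in the request body, advancing the timeline from `start_lsn` to
 : /// `end_lsn`, then flush the resulting layers to disk and remote storage.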
2794 0 : async fn put_tenant_timeline_import_wal(
2795 0 : request: Request<Body>,
2796 0 : _cancel: CancellationToken,
2797 0 : ) -> Result<Response<Body>, ApiError> {
2798 0 : let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
2799 0 : let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
2800 0 : let start_lsn: Lsn = must_parse_query_param(&request, "start_lsn")?;
2801 0 : let end_lsn: Lsn = must_parse_query_param(&request, "end_lsn")?;
2802 :
2803 0 : check_permission(&request, Some(tenant_id))?;
2804 :
2805 0 : let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
2806 :
2807 0 : let span = info_span!("import_wal", tenant_id=%tenant_id, timeline_id=%timeline_id, start_lsn=%start_lsn, end_lsn=%end_lsn);
2808 0 : async move {
2809 0 : let state = get_state(&request);
2810 :
2811 0 : let timeline = active_timeline_of_active_tenant(&state.tenant_manager, TenantShardId::unsharded(tenant_id), timeline_id).await?;
2812 :
2813 0 : let mut body = StreamReader::new(request.into_body().map(|res| {
2814 0 : res.map_err(|error| {
2815 0 : std::io::Error::new(std::io::ErrorKind::Other, anyhow::anyhow!(error))
2816 0 : })
2817 0 : }));
2818 0 :
2819 0 : let last_record_lsn = timeline.get_last_record_lsn();
2820 0 : if last_record_lsn != start_lsn {
2821 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!("Cannot import WAL from Lsn {start_lsn} because timeline does not start from the same lsn: {last_record_lsn}")));
2822 0 : }
2823 0 :
2824 0 : // TODO leave clean state on error. For now you can use detach to clean
2825 0 : // up broken state from a failed import.
2826 0 :
2827 0 : // Import wal provided via CopyData
2828 0 : info!("importing wal");
2829 0 : crate::import_datadir::import_wal_from_tar(&timeline, &mut body, start_lsn, end_lsn, &ctx).await.map_err(ApiError::InternalServerError)?;
2830 0 : info!("wal import complete");
2831 :
2832 : // Read the end of the tar archive.
2833 0 : read_tar_eof(body).await.map_err(ApiError::InternalServerError)?;
2834 :
2835 : // TODO Does it make sense to overshoot?
2836 0 : if timeline.get_last_record_lsn() < end_lsn {
2837 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!("WAL import from {start_lsn} did not reach end_lsn {end_lsn}: timeline is at {}", timeline.get_last_record_lsn())));
2838 0 : }
2839 0 :
2840 0 : // Flush data to disk, then upload to s3. No need for a forced checkpoint.
2841 0 : // We only want to persist the data, and it doesn't matter if it's in the
2842 0 : // shape of deltas or images.
2843 0 : info!("flushing layers");
2844 0 : timeline.freeze_and_flush().await.map_err(|e| match e {
2845 0 : tenant::timeline::FlushLayerError::Cancelled => ApiError::ShuttingDown,
2846 0 : other => ApiError::InternalServerError(anyhow::anyhow!(other)),
2847 0 : })?;
2848 :
2849 0 : info!("done");
2850 :
2851 0 : json_response(StatusCode::OK, ())
2852 0 : }.instrument(span).await
2853 0 : }
2854 :
2855 : /// Read the end of a tar archive.
2856 : ///
2857 : /// A tar archive normally ends with two consecutive blocks of zeros, 512 bytes each.
2858 : /// `tokio_tar` already read the first such block. Read the second all-zeros block,
2859 : /// and check that there is no more data after the EOF marker.
2860 : ///
2861 : /// The 'tar' command can also write extra blocks of zeros, up to a record
2862 : /// size, controlled by the --record-size argument. Ignore them too.
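 : ///
 : /// For example, assuming `tokio_tar` has already consumed the first all-zero block, a
 : /// well-formed stream handed to this function consists of one 512-byte zero block, optionally
 : /// followed by further all-zero 512-byte blocks of record padding, and then EOF.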
2863 0 : async fn read_tar_eof(mut reader: (impl tokio::io::AsyncRead + Unpin)) -> anyhow::Result<()> {
2864 : use tokio::io::AsyncReadExt;
2865 0 : let mut buf = [0u8; 512];
2866 0 :
2867 0 : // Read the all-zeros block, and verify it
2868 0 : let mut total_bytes = 0;
2869 0 : while total_bytes < 512 {
2870 0 : let nbytes = reader.read(&mut buf[total_bytes..]).await?;
2871 0 : total_bytes += nbytes;
2872 0 : if nbytes == 0 {
2873 0 : break;
2874 0 : }
2875 : }
2876 0 : if total_bytes < 512 {
2877 0 : anyhow::bail!("incomplete or invalid tar EOF marker");
2878 0 : }
2879 0 : if !buf.iter().all(|&x| x == 0) {
2880 0 : anyhow::bail!("invalid tar EOF marker");
2881 0 : }
2882 0 :
2883 0 : // Drain any extra zero-blocks after the EOF marker
2884 0 : let mut trailing_bytes = 0;
2885 0 : let mut seen_nonzero_bytes = false;
2886 : loop {
2887 0 : let nbytes = reader.read(&mut buf).await?;
2888 0 : trailing_bytes += nbytes;
2889 0 : if !buf.iter().all(|&x| x == 0) {
2890 0 : seen_nonzero_bytes = true;
2891 0 : }
2892 0 : if nbytes == 0 {
2893 0 : break;
2894 0 : }
2895 : }
2896 0 : if seen_nonzero_bytes {
2897 0 : anyhow::bail!("unexpected non-zero bytes after the tar archive");
2898 0 : }
2899 0 : if trailing_bytes % 512 != 0 {
2900 0 : anyhow::bail!("unexpected number of zeros ({trailing_bytes}), not divisible by tar block size (512 bytes), after the tar archive");
2901 0 : }
2902 0 : Ok(())
2903 0 : }
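// Illustrative only: a sketch (not part of the actual test suite; assumes tokio's test
// macros are available) of the contract that read_tar_eof enforces. After the tar reader
// has consumed the first all-zeros block, the rest of the stream must consist solely of
// zero blocks, i.e. be all zeros and a multiple of 512 bytes long.
#[cfg(test)]
mod read_tar_eof_sketch {
    use super::read_tar_eof;

    #[tokio::test]
    async fn accepts_trailing_zero_blocks() {
        // The mandatory second zero block plus two extra record-padding blocks.
        let trailer = vec![0u8; 512 * 3];
        read_tar_eof(trailer.as_slice()).await.unwrap();
    }

    #[tokio::test]
    async fn rejects_data_after_eof_marker() {
        let mut trailer = vec![0u8; 512];
        trailer.extend_from_slice(b"unexpected trailing bytes");
        assert!(read_tar_eof(trailer.as_slice()).await.is_err());
    }
}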
2904 :
2905 : /// Common functionality of all the HTTP API handlers.
2906 : ///
2907 : /// - Adds a tracing span to each request (by `request_span`)
2908 : /// - Logs the request depending on the request method (by `request_span`)
2909 : /// - Logs the response if it was not successful (by `request_span`)
2910 : /// - Shields the handler function from async cancellations. Hyper can drop the handler
2911 : /// Future if the connection to the client is lost, but most of the pageserver code is
2912 : /// not async cancellation safe. This converts the dropped future into a graceful cancellation
2913 : ///   request with a CancellationToken. A standalone sketch of this pattern follows the function.
2914 0 : async fn api_handler<R, H>(request: Request<Body>, handler: H) -> Result<Response<Body>, ApiError>
2915 0 : where
2916 0 : R: std::future::Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
2917 0 : H: FnOnce(Request<Body>, CancellationToken) -> R + Send + Sync + 'static,
2918 0 : {
2919 0 : if request.uri() != &"/v1/failpoints".parse::<Uri>().unwrap() {
2920 0 : fail::fail_point!("api-503", |_| Err(ApiError::ResourceUnavailable(
2921 0 : "failpoint".into()
2922 0 : )));
2923 :
2924 0 : fail::fail_point!("api-500", |_| Err(ApiError::InternalServerError(
2925 0 : anyhow::anyhow!("failpoint")
2926 0 : )));
2927 0 : }
2928 :
2929 : // Spawn a new task to handle the request, to protect the handler from unexpected
2930 : // async cancellations. Most pageserver functions are not async cancellation safe.
2931 : // We arm a drop-guard, so that if Hyper drops the Future, we signal the task
2932 : // with the cancellation token.
2933 0 : let token = CancellationToken::new();
2934 0 : let cancel_guard = token.clone().drop_guard();
2935 0 : let result = request_span(request, move |r| async {
2936 0 : let handle = tokio::spawn(
2937 0 : async {
2938 0 : let token_cloned = token.clone();
2939 0 : let result = handler(r, token).await;
2940 0 : if token_cloned.is_cancelled() {
2941 : // the drop guard has executed: we will never turn this result into a response.
2942 : //
2943 : // at least temporarily, do {:?} logging here; these failures are rare enough,
2944 : // but they could hide difficult errors.
2945 0 : match &result {
2946 0 : Ok(response) => {
2947 0 : let status = response.status();
2948 0 : info!(%status, "Cancelled request finished successfully")
2949 : }
2950 0 : Err(e) => error!("Cancelled request finished with an error: {e:?}"),
2951 : }
2952 0 : }
2953 : // The only logging for cancelled, panicked request handlers is the tracing_panic_hook,
2954 : // which should suffice.
2955 : //
2956 : // There is still a chance to lose the result due to a race between returning from
2957 : // here and the connection actually closing before the outer task gets to execute;
2958 : // leaving that for #5815.
2959 0 : result
2960 0 : }
2961 0 : .in_current_span(),
2962 0 : );
2963 0 :
2964 0 : match handle.await {
2965 : // TODO: never actually return Err from here, always Ok(...) so that we can log
2966 : // spanned errors. Call api_error_handler instead and return appropriate Body.
2967 0 : Ok(result) => result,
2968 0 : Err(e) => {
2969 0 : // The handler task panicked. We have a global panic handler that logs the
2970 0 : // panic with its backtrace, so no need to log that here. Only log a brief
2971 0 : // message to make it clear that we returned the error to the client.
2972 0 : error!("HTTP request handler task panicked: {e:#}");
2973 :
2974 : // Don't return an Error here, because then the fallback error handler that was
2975 : // installed in make_router() will print the error. Instead, construct the
2976 : // HTTP error response and return that.
2977 0 : Ok(
2978 0 : ApiError::InternalServerError(anyhow!("HTTP request handler task panicked"))
2979 0 : .into_response(),
2980 0 : )
2981 : }
2982 : }
2983 0 : })
2984 0 : .await;
2985 :
2986 0 : cancel_guard.disarm();
2987 0 :
2988 0 : result
2989 0 : }
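// Illustrative only: a self-contained sketch of the drop-guard pattern used above
// (assumes tokio's rt, macros and time features). The handler work runs in a spawned
// task; if the caller's future is dropped before disarm(), the guard cancels the token
// so the spawned task can shut down gracefully instead of being cut off at an arbitrary
// await point.
#[allow(dead_code)]
async fn drop_guard_pattern_sketch() -> &'static str {
    let token = CancellationToken::new();
    // Dropping this guard (e.g. because this future itself is dropped) cancels `token`.
    let cancel_guard = token.clone().drop_guard();

    let task = tokio::spawn({
        let token = token.clone();
        async move {
            tokio::select! {
                _ = token.cancelled() => "cancelled",
                _ = tokio::time::sleep(Duration::from_millis(10)) => "finished",
            }
        }
    });

    let outcome = task.await.expect("task panicked");
    // We reached the end normally, so disarm the guard and leave the token un-cancelled.
    cancel_guard.disarm();
    outcome
}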
2990 :
2991 : /// Like api_handler, but returns an error response if the server is built without
2992 : /// the 'testing' feature.
2993 0 : async fn testing_api_handler<R, H>(
2994 0 : desc: &str,
2995 0 : request: Request<Body>,
2996 0 : handler: H,
2997 0 : ) -> Result<Response<Body>, ApiError>
2998 0 : where
2999 0 : R: std::future::Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
3000 0 : H: FnOnce(Request<Body>, CancellationToken) -> R + Send + Sync + 'static,
3001 0 : {
3002 0 : if cfg!(feature = "testing") {
3003 0 : api_handler(request, handler).await
3004 : } else {
3005 0 : std::future::ready(Err(ApiError::BadRequest(anyhow!(
3006 0 : "Cannot {desc} because pageserver was compiled without testing APIs",
3007 0 : ))))
3008 0 : .await
3009 : }
3010 0 : }
3011 :
3012 0 : pub fn make_router(
3013 0 : state: Arc<State>,
3014 0 : launch_ts: &'static LaunchTimestamp,
3015 0 : auth: Option<Arc<SwappableJwtAuth>>,
3016 0 : ) -> anyhow::Result<RouterBuilder<hyper::Body, ApiError>> {
3017 0 : let spec = include_bytes!("openapi_spec.yml");
3018 0 : let mut router = attach_openapi_ui(endpoint::make_router(), spec, "/swagger.yml", "/v1/doc");
3019 0 : if auth.is_some() {
3020 0 : router = router.middleware(auth_middleware(|request| {
3021 0 : let state = get_state(request);
3022 0 : if state.allowlist_routes.contains(request.uri()) {
3023 0 : None
3024 : } else {
3025 0 : state.auth.as_deref()
3026 : }
3027 0 : }))
3028 0 : }
3029 :
3030 0 : router = router.middleware(
3031 0 : endpoint::add_response_header_middleware(
3032 0 : "PAGESERVER_LAUNCH_TIMESTAMP",
3033 0 : &launch_ts.to_string(),
3034 0 : )
3035 0 : .expect("construct launch timestamp header middleware"),
3036 0 : );
3037 0 :
3038 0 : Ok(router
3039 0 : .data(state)
3040 0 : .get("/metrics", |r| request_span(r, prometheus_metrics_handler))
3041 0 : .get("/v1/status", |r| api_handler(r, status_handler))
3042 0 : .put("/v1/failpoints", |r| {
3043 0 : testing_api_handler("manage failpoints", r, failpoints_handler)
3044 0 : })
3045 0 : .post("/v1/reload_auth_validation_keys", |r| {
3046 0 : api_handler(r, reload_auth_validation_keys_handler)
3047 0 : })
3048 0 : .get("/v1/tenant", |r| api_handler(r, tenant_list_handler))
3049 0 : .get("/v1/tenant/:tenant_shard_id", |r| {
3050 0 : api_handler(r, tenant_status)
3051 0 : })
3052 0 : .delete("/v1/tenant/:tenant_shard_id", |r| {
3053 0 : api_handler(r, tenant_delete_handler)
3054 0 : })
3055 0 : .get("/v1/tenant/:tenant_shard_id/synthetic_size", |r| {
3056 0 : api_handler(r, tenant_size_handler)
3057 0 : })
3058 0 : .put("/v1/tenant/config", |r| {
3059 0 : api_handler(r, update_tenant_config_handler)
3060 0 : })
3061 0 : .put("/v1/tenant/:tenant_shard_id/shard_split", |r| {
3062 0 : api_handler(r, tenant_shard_split_handler)
3063 0 : })
3064 0 : .get("/v1/tenant/:tenant_shard_id/config", |r| {
3065 0 : api_handler(r, get_tenant_config_handler)
3066 0 : })
3067 0 : .put("/v1/tenant/:tenant_shard_id/location_config", |r| {
3068 0 : api_handler(r, put_tenant_location_config_handler)
3069 0 : })
3070 0 : .get("/v1/location_config", |r| {
3071 0 : api_handler(r, list_location_config_handler)
3072 0 : })
3073 0 : .get("/v1/location_config/:tenant_shard_id", |r| {
3074 0 : api_handler(r, get_location_config_handler)
3075 0 : })
3076 0 : .put(
3077 0 : "/v1/tenant/:tenant_shard_id/time_travel_remote_storage",
3078 0 : |r| api_handler(r, tenant_time_travel_remote_storage_handler),
3079 0 : )
3080 0 : .get("/v1/tenant/:tenant_shard_id/timeline", |r| {
3081 0 : api_handler(r, timeline_list_handler)
3082 0 : })
3083 0 : .get("/v1/tenant/:tenant_shard_id/timeline_and_offloaded", |r| {
3084 0 : api_handler(r, timeline_and_offloaded_list_handler)
3085 0 : })
3086 0 : .post("/v1/tenant/:tenant_shard_id/timeline", |r| {
3087 0 : api_handler(r, timeline_create_handler)
3088 0 : })
3089 0 : .post("/v1/tenant/:tenant_shard_id/reset", |r| {
3090 0 : api_handler(r, tenant_reset_handler)
3091 0 : })
3092 0 : .post(
3093 0 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/preserve_initdb_archive",
3094 0 : |r| api_handler(r, timeline_preserve_initdb_handler),
3095 0 : )
3096 0 : .put(
3097 0 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/archival_config",
3098 0 : |r| api_handler(r, timeline_archival_config_handler),
3099 0 : )
3100 0 : .get("/v1/tenant/:tenant_shard_id/timeline/:timeline_id", |r| {
3101 0 : api_handler(r, timeline_detail_handler)
3102 0 : })
3103 0 : .get(
3104 0 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/get_lsn_by_timestamp",
3105 0 : |r| api_handler(r, get_lsn_by_timestamp_handler),
3106 0 : )
3107 0 : .get(
3108 0 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/get_timestamp_of_lsn",
3109 0 : |r| api_handler(r, get_timestamp_of_lsn_handler),
3110 0 : )
3111 0 : .post(
3112 0 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/lsn_lease",
3113 0 : |r| api_handler(r, lsn_lease_handler),
3114 0 : )
3115 0 : .put(
3116 0 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/do_gc",
3117 0 : |r| api_handler(r, timeline_gc_handler),
3118 0 : )
3119 0 : .put(
3120 0 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/compact",
3121 0 : |r| api_handler(r, timeline_compact_handler),
3122 0 : )
3123 0 : .put(
3124 0 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/offload",
3125 0 : |r| testing_api_handler("attempt timeline offload", r, timeline_offload_handler),
3126 0 : )
3127 0 : .put(
3128 0 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/checkpoint",
3129 0 : |r| testing_api_handler("run timeline checkpoint", r, timeline_checkpoint_handler),
3130 0 : )
3131 0 : .post(
3132 0 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/download_remote_layers",
3133 0 : |r| api_handler(r, timeline_download_remote_layers_handler_post),
3134 0 : )
3135 0 : .get(
3136 0 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/download_remote_layers",
3137 0 : |r| api_handler(r, timeline_download_remote_layers_handler_get),
3138 0 : )
3139 0 : .put(
3140 0 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/detach_ancestor",
3141 0 : |r| api_handler(r, timeline_detach_ancestor_handler),
3142 0 : )
3143 0 : .delete("/v1/tenant/:tenant_shard_id/timeline/:timeline_id", |r| {
3144 0 : api_handler(r, timeline_delete_handler)
3145 0 : })
3146 0 : .get(
3147 0 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/layer",
3148 0 : |r| api_handler(r, layer_map_info_handler),
3149 0 : )
3150 0 : .get(
3151 0 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/layer/:layer_file_name",
3152 0 : |r| api_handler(r, layer_download_handler),
3153 0 : )
3154 0 : .delete(
3155 0 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/layer/:layer_file_name",
3156 0 : |r| api_handler(r, evict_timeline_layer_handler),
3157 0 : )
3158 0 : .post(
3159 0 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/block_gc",
3160 0 : |r| api_handler(r, timeline_gc_blocking_handler),
3161 0 : )
3162 0 : .post(
3163 0 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/unblock_gc",
3164 0 : |r| api_handler(r, timeline_gc_unblocking_handler),
3165 0 : )
3166 0 : .post("/v1/tenant/:tenant_shard_id/heatmap_upload", |r| {
3167 0 : api_handler(r, secondary_upload_handler)
3168 0 : })
3169 0 : .get("/v1/tenant/:tenant_id/scan_remote_storage", |r| {
3170 0 : api_handler(r, tenant_scan_remote_handler)
3171 0 : })
3172 0 : .put("/v1/disk_usage_eviction/run", |r| {
3173 0 : api_handler(r, disk_usage_eviction_run)
3174 0 : })
3175 0 : .put("/v1/deletion_queue/flush", |r| {
3176 0 : api_handler(r, deletion_queue_flush)
3177 0 : })
3178 0 : .get("/v1/tenant/:tenant_shard_id/secondary/status", |r| {
3179 0 : api_handler(r, secondary_status_handler)
3180 0 : })
3181 0 : .post("/v1/tenant/:tenant_shard_id/secondary/download", |r| {
3182 0 : api_handler(r, secondary_download_handler)
3183 0 : })
3184 0 : .put("/v1/tenant/:tenant_shard_id/break", |r| {
3185 0 : testing_api_handler("set tenant state to broken", r, handle_tenant_break)
3186 0 : })
3187 0 : .get("/v1/panic", |r| api_handler(r, always_panic_handler))
3188 0 : .post("/v1/tracing/event", |r| {
3189 0 : testing_api_handler("emit a tracing event", r, post_tracing_event_handler)
3190 0 : })
3191 0 : .get(
3192 0 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/getpage",
3193 0 : |r| testing_api_handler("getpage@lsn", r, getpage_at_lsn_handler),
3194 0 : )
3195 0 : .get(
3196 0 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/keyspace",
3197 0 : |r| api_handler(r, timeline_collect_keyspace),
3198 0 : )
3199 0 : .put("/v1/io_engine", |r| api_handler(r, put_io_engine_handler))
3200 0 : .put("/v1/io_mode", |r| api_handler(r, put_io_mode_handler))
3201 0 : .get("/v1/utilization", |r| api_handler(r, get_utilization))
3202 0 : .post(
3203 0 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/ingest_aux_files",
3204 0 : |r| testing_api_handler("ingest_aux_files", r, ingest_aux_files),
3205 0 : )
3206 0 : .post(
3207 0 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/list_aux_files",
3208 0 : |r| testing_api_handler("list_aux_files", r, list_aux_files),
3209 0 : )
3210 0 : .post("/v1/top_tenants", |r| api_handler(r, post_top_tenants))
3211 0 : .post(
3212 0 : "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/perf_info",
3213 0 : |r| testing_api_handler("perf_info", r, perf_info),
3214 0 : )
3215 0 : .put(
3216 0 : "/v1/tenant/:tenant_id/timeline/:timeline_id/import_basebackup",
3217 0 : |r| api_handler(r, put_tenant_timeline_import_basebackup),
3218 0 : )
3219 0 : .put(
3220 0 : "/v1/tenant/:tenant_id/timeline/:timeline_id/import_wal",
3221 0 : |r| api_handler(r, put_tenant_timeline_import_wal),
3222 0 : )
3223 0 : .any(handler_404))
3224 0 : }