LCOV - code coverage report
Current view: top level - safekeeper/src/http - routes.rs (source / functions) Coverage Total Hit
Test: 4be46b1c0003aa3bbac9ade362c676b419df4c20.info Lines: 2.1 % 580 12
Test Date: 2025-07-22 17:50:06 Functions: 1.1 % 90 1

            Line data    Source code
       1              : use std::collections::HashMap;
       2              : use std::fmt;
       3              : use std::io::Write as _;
       4              : use std::str::FromStr;
       5              : use std::sync::Arc;
       6              : 
       7              : use http_utils::endpoint::{
       8              :     self, ChannelWriter, auth_middleware, check_permission_with, profile_cpu_handler,
       9              :     profile_heap_handler, prometheus_metrics_handler, request_span,
      10              : };
      11              : use http_utils::error::ApiError;
      12              : use http_utils::failpoints::failpoints_handler;
      13              : use http_utils::json::{json_request, json_response};
      14              : use http_utils::request::{ensure_no_body, parse_query_param, parse_request_param};
      15              : use http_utils::{RequestExt, RouterBuilder};
      16              : use hyper::{Body, Request, Response, StatusCode};
      17              : use pem::Pem;
      18              : use postgres_ffi::WAL_SEGMENT_SIZE;
      19              : use safekeeper_api::models::{
      20              :     AcceptorStateStatus, PullTimelineRequest, SafekeeperStatus, SkTimelineInfo, TenantDeleteResult,
      21              :     TermSwitchApiEntry, TimelineCopyRequest, TimelineCreateRequest, TimelineDeleteResult,
      22              :     TimelineStatus, TimelineTermBumpRequest,
      23              : };
      24              : use safekeeper_api::{ServerInfo, membership, models};
      25              : use storage_broker::proto::{SafekeeperTimelineInfo, TenantTimelineId as ProtoTenantTimelineId};
      26              : use tokio::sync::mpsc;
      27              : use tokio::task;
      28              : use tokio_stream::wrappers::ReceiverStream;
      29              : use tokio_util::sync::CancellationToken;
      30              : use tracing::{Instrument, info_span};
      31              : use utils::auth::SwappableJwtAuth;
      32              : use utils::id::{TenantId, TenantTimelineId, TimelineId};
      33              : use utils::lsn::Lsn;
      34              : 
      35              : use crate::debug_dump::TimelineDigestRequest;
      36              : use crate::hadron::{get_filesystem_capacity, get_filesystem_usage};
      37              : use crate::safekeeper::TermLsn;
      38              : use crate::timelines_global_map::DeleteOrExclude;
      39              : use crate::{
      40              :     GlobalTimelines, SafeKeeperConf, copy_timeline, debug_dump, patch_control_file, pull_timeline,
      41              : };
      42              : use serde_json::json;
      43              : 
      44              : /// Healthcheck handler.
      45            0 : async fn status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
      46            0 :     check_permission(&request, None)?;
      47            0 :     let conf = get_conf(&request);
      48            0 :     let status = SafekeeperStatus { id: conf.my_id };
      49            0 :     json_response(StatusCode::OK, status)
      50            0 : }
      51              : 
      52            0 : fn get_conf(request: &Request<Body>) -> &SafeKeeperConf {
      53            0 :     request
      54            0 :         .data::<Arc<SafeKeeperConf>>()
      55            0 :         .expect("unknown state type")
      56            0 :         .as_ref()
      57            0 : }
      58              : 
      59            0 : fn get_global_timelines(request: &Request<Body>) -> Arc<GlobalTimelines> {
      60            0 :     request
      61            0 :         .data::<Arc<GlobalTimelines>>()
      62            0 :         .expect("unknown state type")
      63            0 :         .clone()
      64            0 : }
      65              : 
      66            0 : fn check_permission(request: &Request<Body>, tenant_id: Option<TenantId>) -> Result<(), ApiError> {
      67            0 :     check_permission_with(request, |claims| {
      68            0 :         crate::auth::check_permission(claims, tenant_id)
      69            0 :     })
      70            0 : }
      71              : 
      72              : /// Deactivates all timelines for the tenant and removes its data directory.
      73              : /// See `timeline_delete_handler`.
      74            0 : async fn tenant_delete_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
      75            0 :     let tenant_id = parse_request_param(&request, "tenant_id")?;
      76            0 :     let only_local = parse_query_param(&request, "only_local")?.unwrap_or(false);
      77            0 :     check_permission(&request, Some(tenant_id))?;
      78            0 :     ensure_no_body(&mut request).await?;
      79            0 :     let global_timelines = get_global_timelines(&request);
      80            0 :     let action = if only_local {
      81            0 :         DeleteOrExclude::DeleteLocal
      82              :     } else {
      83            0 :         DeleteOrExclude::Delete
      84              :     };
      85            0 :     let delete_info = global_timelines
      86            0 :         .delete_all_for_tenant(&tenant_id, action)
      87            0 :         .await
      88            0 :         .map_err(ApiError::InternalServerError)?;
      89            0 :     let response_body: TenantDeleteResult = delete_info
      90            0 :         .iter()
      91            0 :         .map(|(ttid, resp)| (format!("{}", ttid.timeline_id), *resp))
      92            0 :         .collect::<HashMap<String, TimelineDeleteResult>>();
      93            0 :     json_response(StatusCode::OK, response_body)
      94            0 : }
      95              : 
      96            0 : async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
      97            0 :     let request_data: TimelineCreateRequest = json_request(&mut request).await?;
      98              : 
      99            0 :     let ttid = TenantTimelineId {
     100            0 :         tenant_id: request_data.tenant_id,
     101            0 :         timeline_id: request_data.timeline_id,
     102            0 :     };
     103            0 :     check_permission(&request, Some(ttid.tenant_id))?;
     104              : 
     105            0 :     let server_info = ServerInfo {
     106            0 :         pg_version: request_data.pg_version,
     107            0 :         system_id: request_data.system_id.unwrap_or(0),
     108            0 :         wal_seg_size: request_data.wal_seg_size.unwrap_or(WAL_SEGMENT_SIZE as u32),
     109            0 :     };
     110            0 :     let global_timelines = get_global_timelines(&request);
     111            0 :     global_timelines
     112            0 :         .create(
     113            0 :             ttid,
     114            0 :             request_data.mconf,
     115            0 :             server_info,
     116            0 :             request_data.start_lsn,
     117            0 :             request_data.commit_lsn.unwrap_or(request_data.start_lsn),
     118            0 :         )
     119            0 :         .await
     120            0 :         .map_err(ApiError::InternalServerError)?;
     121              : 
     122            0 :     json_response(StatusCode::OK, ())
     123            0 : }
     124              : 
     125            0 : async fn utilization_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
     126            0 :     check_permission(&request, None)?;
     127            0 :     let global_timelines = get_global_timelines(&request);
     128            0 :     let utilization = global_timelines.get_timeline_counts();
     129            0 :     json_response(StatusCode::OK, utilization)
     130            0 : }
     131              : 
     132              : /// Returns filesystem capacity and current utilization for the safekeeper data directory.
     133            0 : async fn filesystem_usage_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
     134            0 :     check_permission(&request, None)?;
     135            0 :     let conf = get_conf(&request);
     136            0 :     let path = conf.workdir.as_std_path();
     137            0 :     let capacity = get_filesystem_capacity(path).map_err(ApiError::InternalServerError)?;
     138            0 :     let usage = get_filesystem_usage(path);
     139            0 :     let resp = json!({
     140            0 :         "data_dir": path,
     141            0 :         "capacity_bytes": capacity,
     142            0 :         "usage_bytes": usage,
     143              :     });
     144            0 :     json_response(StatusCode::OK, resp)
     145            0 : }
     146              : 
     147              : /// List all (not deleted) timelines.
     148              : /// Note: it is possible to do the same with debug_dump.
     149            0 : async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
     150            0 :     check_permission(&request, None)?;
     151            0 :     let global_timelines = get_global_timelines(&request);
     152            0 :     let res: Vec<TenantTimelineId> = global_timelines
     153            0 :         .get_all()
     154            0 :         .iter()
     155            0 :         .map(|tli| tli.ttid)
     156            0 :         .collect();
     157            0 :     json_response(StatusCode::OK, res)
     158            0 : }
     159              : 
     160              : impl From<TermSwitchApiEntry> for TermLsn {
     161            0 :     fn from(api_val: TermSwitchApiEntry) -> Self {
     162            0 :         TermLsn {
     163            0 :             term: api_val.term,
     164            0 :             lsn: api_val.lsn,
     165            0 :         }
     166            0 :     }
     167              : }
     168              : 
     169              : /// Report info about timeline.
     170            0 : async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
     171            0 :     let ttid = TenantTimelineId::new(
     172            0 :         parse_request_param(&request, "tenant_id")?,
     173            0 :         parse_request_param(&request, "timeline_id")?,
     174              :     );
     175            0 :     check_permission(&request, Some(ttid.tenant_id))?;
     176              : 
     177            0 :     let global_timelines = get_global_timelines(&request);
     178            0 :     let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
     179            0 :     let (inmem, state) = tli.get_state().await;
     180            0 :     let flush_lsn = tli.get_flush_lsn().await;
     181              : 
     182            0 :     let last_log_term = state.acceptor_state.get_last_log_term(flush_lsn);
     183            0 :     let term_history = state
     184            0 :         .acceptor_state
     185            0 :         .term_history
     186            0 :         .0
     187            0 :         .into_iter()
     188            0 :         .map(|ts| TermSwitchApiEntry {
     189            0 :             term: ts.term,
     190            0 :             lsn: ts.lsn,
     191            0 :         })
     192            0 :         .collect();
     193            0 :     let acc_state = AcceptorStateStatus {
     194            0 :         term: state.acceptor_state.term,
     195            0 :         epoch: last_log_term,
     196            0 :         term_history,
     197            0 :     };
     198              : 
     199            0 :     let conf = get_conf(&request);
     200              :     // Note: we report in memory values which can be lost.
     201            0 :     let status = TimelineStatus {
     202            0 :         tenant_id: ttid.tenant_id,
     203            0 :         timeline_id: ttid.timeline_id,
     204            0 :         mconf: state.mconf,
     205            0 :         acceptor_state: acc_state,
     206            0 :         pg_info: state.server,
     207            0 :         flush_lsn,
     208            0 :         timeline_start_lsn: state.timeline_start_lsn,
     209            0 :         local_start_lsn: state.local_start_lsn,
     210            0 :         commit_lsn: inmem.commit_lsn,
     211            0 :         backup_lsn: inmem.backup_lsn,
     212            0 :         peer_horizon_lsn: inmem.peer_horizon_lsn,
     213            0 :         remote_consistent_lsn: inmem.remote_consistent_lsn,
     214            0 :         peers: tli.get_peers(conf).await,
     215            0 :         walsenders: tli.get_walsenders().get_all_public(),
     216            0 :         walreceivers: tli.get_walreceivers().get_all(),
     217              :     };
     218            0 :     json_response(StatusCode::OK, status)
     219            0 : }
     220              : 
     221              : /// Deactivates the timeline and removes its data directory.
     222            0 : async fn timeline_delete_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
     223            0 :     let ttid = TenantTimelineId::new(
     224            0 :         parse_request_param(&request, "tenant_id")?,
     225            0 :         parse_request_param(&request, "timeline_id")?,
     226              :     );
     227            0 :     let only_local = parse_query_param(&request, "only_local")?.unwrap_or(false);
     228            0 :     check_permission(&request, Some(ttid.tenant_id))?;
     229            0 :     ensure_no_body(&mut request).await?;
     230            0 :     let global_timelines = get_global_timelines(&request);
     231            0 :     let action = if only_local {
     232            0 :         DeleteOrExclude::DeleteLocal
     233              :     } else {
     234            0 :         DeleteOrExclude::Delete
     235              :     };
     236            0 :     let resp = global_timelines
     237            0 :         .delete_or_exclude(&ttid, action)
     238            0 :         .await
     239            0 :         .map_err(ApiError::from)?;
     240            0 :     json_response(StatusCode::OK, resp)
     241            0 : }
     242              : 
     243              : /// Pull timeline from peer safekeeper instances.
     244            0 : async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
     245            0 :     check_permission(&request, None)?;
     246              : 
     247            0 :     let data: PullTimelineRequest = json_request(&mut request).await?;
     248            0 :     let conf = get_conf(&request);
     249            0 :     let global_timelines = get_global_timelines(&request);
     250              : 
     251            0 :     let ca_certs = conf
     252            0 :         .ssl_ca_certs
     253            0 :         .iter()
     254            0 :         .map(Pem::contents)
     255            0 :         .map(reqwest::Certificate::from_der)
     256            0 :         .collect::<Result<Vec<_>, _>>()
     257            0 :         .map_err(|e| {
     258            0 :             ApiError::InternalServerError(anyhow::anyhow!("failed to parse CA certs: {e}"))
     259            0 :         })?;
     260              : 
     261            0 :     let resp = pull_timeline::handle_request(
     262            0 :         data,
     263            0 :         conf.sk_auth_token.clone(),
     264            0 :         ca_certs,
     265            0 :         global_timelines,
     266            0 :         false,
     267            0 :     )
     268            0 :     .await?;
     269            0 :     json_response(StatusCode::OK, resp)
     270            0 : }
     271              : 
     272              : /// Stream tar archive with all timeline data.
     273            0 : async fn timeline_snapshot_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
     274            0 :     let destination = parse_request_param(&request, "destination_id")?;
     275            0 :     let ttid = TenantTimelineId::new(
     276            0 :         parse_request_param(&request, "tenant_id")?,
     277            0 :         parse_request_param(&request, "timeline_id")?,
     278              :     );
     279            0 :     check_permission(&request, Some(ttid.tenant_id))?;
     280              : 
     281            0 :     let global_timelines = get_global_timelines(&request);
     282            0 :     let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
     283            0 :     let storage = global_timelines.get_wal_backup().get_storage();
     284              : 
     285              :     // To stream the body use wrap_stream which wants Stream of Result<Bytes>,
     286              :     // so create the chan and write to it in another task.
     287            0 :     let (tx, rx) = mpsc::channel(1);
     288              : 
     289            0 :     let conf = get_conf(&request);
     290            0 :     task::spawn(pull_timeline::stream_snapshot(
     291            0 :         tli,
     292            0 :         conf.my_id,
     293            0 :         destination,
     294            0 :         tx,
     295            0 :         storage,
     296              :     ));
     297              : 
     298            0 :     let rx_stream = ReceiverStream::new(rx);
     299            0 :     let body = Body::wrap_stream(rx_stream);
     300              : 
     301            0 :     let response = Response::builder()
     302            0 :         .status(200)
     303            0 :         .header(hyper::header::CONTENT_TYPE, "application/octet-stream")
     304            0 :         .body(body)
     305            0 :         .unwrap();
     306              : 
     307            0 :     Ok(response)
     308            0 : }
     309              : 
     310              : /// Error type for delete_or_exclude: either generation conflict or something
     311              : /// internal.
     312              : #[derive(thiserror::Error, Debug)]
     313              : pub enum DeleteOrExcludeError {
     314              :     #[error("refused to switch into excluding mconf {requested}, current: {current}")]
     315              :     Conflict {
     316              :         requested: membership::Configuration,
     317              :         current: membership::Configuration,
     318              :     },
     319              :     #[error(transparent)]
     320              :     Other(#[from] anyhow::Error),
     321              : }
     322              : 
     323              : /// Convert DeleteOrExcludeError to ApiError.
     324              : impl From<DeleteOrExcludeError> for ApiError {
     325            0 :     fn from(de: DeleteOrExcludeError) -> ApiError {
     326            0 :         match de {
     327              :             DeleteOrExcludeError::Conflict {
     328              :                 requested: _,
     329              :                 current: _,
     330            0 :             } => ApiError::Conflict(de.to_string()),
     331            0 :             DeleteOrExcludeError::Other(e) => ApiError::InternalServerError(e),
     332              :         }
     333            0 :     }
     334              : }
     335              : 
     336              : /// Remove timeline locally after this node has been excluded from the
     337              : /// membership configuration. The body is the same as in the membership endpoint
     338              : /// -- conf where node is excluded -- and in principle single ep could be used
     339              : /// for both actions, but since this is a data deletion op let's keep them
     340              : /// separate.
     341            0 : async fn timeline_exclude_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
     342            0 :     let ttid = TenantTimelineId::new(
     343            0 :         parse_request_param(&request, "tenant_id")?,
     344            0 :         parse_request_param(&request, "timeline_id")?,
     345              :     );
     346            0 :     check_permission(&request, Some(ttid.tenant_id))?;
     347              : 
     348            0 :     let global_timelines = get_global_timelines(&request);
     349            0 :     let data: models::TimelineMembershipSwitchRequest = json_request(&mut request).await?;
     350            0 :     let my_id = get_conf(&request).my_id;
     351              :     // If request doesn't exclude us, membership switch endpoint should be used
     352              :     // instead.
     353            0 :     if data.mconf.contains(my_id) {
     354            0 :         return Err(ApiError::Forbidden(format!(
     355            0 :             "refused to switch into {}, node {} is member of it",
     356            0 :             data.mconf, my_id
     357            0 :         )));
     358            0 :     }
     359            0 :     let action = DeleteOrExclude::Exclude(data.mconf);
     360              : 
     361            0 :     let resp = global_timelines
     362            0 :         .delete_or_exclude(&ttid, action)
     363            0 :         .await
     364            0 :         .map_err(ApiError::from)?;
     365            0 :     json_response(StatusCode::OK, resp)
     366            0 : }
     367              : 
     368              : /// Consider switching timeline membership configuration to the provided one.
     369            0 : async fn timeline_membership_handler(
     370            0 :     mut request: Request<Body>,
     371            0 : ) -> Result<Response<Body>, ApiError> {
     372            0 :     let ttid = TenantTimelineId::new(
     373            0 :         parse_request_param(&request, "tenant_id")?,
     374            0 :         parse_request_param(&request, "timeline_id")?,
     375              :     );
     376            0 :     check_permission(&request, Some(ttid.tenant_id))?;
     377              : 
     378            0 :     let global_timelines = get_global_timelines(&request);
     379            0 :     let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
     380              : 
     381            0 :     let data: models::TimelineMembershipSwitchRequest = json_request(&mut request).await?;
     382            0 :     let my_id = get_conf(&request).my_id;
     383              :     // If request excludes us, exclude endpoint should be used instead.
     384            0 :     if !data.mconf.contains(my_id) {
     385            0 :         return Err(ApiError::Forbidden(format!(
     386            0 :             "refused to switch into {}, node {} is not a member of it",
     387            0 :             data.mconf, my_id
     388            0 :         )));
     389            0 :     }
     390            0 :     let req_gen = data.mconf.generation;
     391            0 :     let response = tli
     392            0 :         .membership_switch(data.mconf)
     393            0 :         .await
     394            0 :         .map_err(ApiError::InternalServerError)?;
     395              : 
     396              :     // Return 409 if request was ignored.
     397            0 :     if req_gen == response.current_conf.generation {
     398            0 :         json_response(StatusCode::OK, response)
     399              :     } else {
     400            0 :         Err(ApiError::Conflict(format!(
     401            0 :             "request to switch into {} ignored, current generation {}",
     402            0 :             req_gen, response.current_conf.generation
     403            0 :         )))
     404              :     }
     405            0 : }
     406              : 
     407            0 : async fn timeline_copy_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
     408            0 :     check_permission(&request, None)?;
     409              : 
     410            0 :     let request_data: TimelineCopyRequest = json_request(&mut request).await?;
     411            0 :     let source_ttid = TenantTimelineId::new(
     412            0 :         parse_request_param(&request, "tenant_id")?,
     413            0 :         parse_request_param(&request, "source_timeline_id")?,
     414              :     );
     415              : 
     416            0 :     let global_timelines = get_global_timelines(&request);
     417            0 :     let wal_backup = global_timelines.get_wal_backup();
     418            0 :     let storage = wal_backup
     419            0 :         .get_storage()
     420            0 :         .ok_or(ApiError::BadRequest(anyhow::anyhow!(
     421            0 :             "Remote Storage is not configured"
     422            0 :         )))?;
     423              : 
     424            0 :     copy_timeline::handle_request(copy_timeline::Request{
     425            0 :         source_ttid,
     426            0 :         until_lsn: request_data.until_lsn,
     427            0 :         destination_ttid: TenantTimelineId::new(source_ttid.tenant_id, request_data.target_timeline_id),
     428            0 :     }, global_timelines, storage)
     429            0 :         .instrument(info_span!("copy_timeline", from=%source_ttid, to=%request_data.target_timeline_id, until_lsn=%request_data.until_lsn))
     430            0 :         .await
     431            0 :         .map_err(ApiError::InternalServerError)?;
     432              : 
     433            0 :     json_response(StatusCode::OK, ())
     434            0 : }
     435              : 
     436            0 : async fn patch_control_file_handler(
     437            0 :     mut request: Request<Body>,
     438            0 : ) -> Result<Response<Body>, ApiError> {
     439            0 :     check_permission(&request, None)?;
     440              : 
     441            0 :     let ttid = TenantTimelineId::new(
     442            0 :         parse_request_param(&request, "tenant_id")?,
     443            0 :         parse_request_param(&request, "timeline_id")?,
     444              :     );
     445              : 
     446            0 :     let global_timelines = get_global_timelines(&request);
     447            0 :     let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
     448              : 
     449            0 :     let patch_request: patch_control_file::Request = json_request(&mut request).await?;
     450            0 :     let response = patch_control_file::handle_request(tli, patch_request)
     451            0 :         .await
     452            0 :         .map_err(ApiError::InternalServerError)?;
     453              : 
     454            0 :     json_response(StatusCode::OK, response)
     455            0 : }
     456              : 
     457              : /// Force persist control file.
     458            0 : async fn timeline_checkpoint_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
     459            0 :     check_permission(&request, None)?;
     460              : 
     461            0 :     let ttid = TenantTimelineId::new(
     462            0 :         parse_request_param(&request, "tenant_id")?,
     463            0 :         parse_request_param(&request, "timeline_id")?,
     464              :     );
     465              : 
     466            0 :     let global_timelines = get_global_timelines(&request);
     467            0 :     let tli = global_timelines.get(ttid)?;
     468            0 :     tli.write_shared_state()
     469            0 :         .await
     470              :         .sk
     471            0 :         .state_mut()
     472            0 :         .flush()
     473            0 :         .await
     474            0 :         .map_err(ApiError::InternalServerError)?;
     475            0 :     json_response(StatusCode::OK, ())
     476            0 : }
     477              : 
     478            0 : async fn timeline_digest_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
     479            0 :     let ttid = TenantTimelineId::new(
     480            0 :         parse_request_param(&request, "tenant_id")?,
     481            0 :         parse_request_param(&request, "timeline_id")?,
     482              :     );
     483            0 :     check_permission(&request, Some(ttid.tenant_id))?;
     484              : 
     485            0 :     let global_timelines = get_global_timelines(&request);
     486            0 :     let from_lsn: Option<Lsn> = parse_query_param(&request, "from_lsn")?;
     487            0 :     let until_lsn: Option<Lsn> = parse_query_param(&request, "until_lsn")?;
     488              : 
     489            0 :     let request = TimelineDigestRequest {
     490            0 :         from_lsn: from_lsn.ok_or(ApiError::BadRequest(anyhow::anyhow!(
     491            0 :             "from_lsn is required"
     492            0 :         )))?,
     493            0 :         until_lsn: until_lsn.ok_or(ApiError::BadRequest(anyhow::anyhow!(
     494            0 :             "until_lsn is required"
     495            0 :         )))?,
     496              :     };
     497              : 
     498            0 :     let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
     499            0 :     let tli = tli
     500            0 :         .wal_residence_guard()
     501            0 :         .await
     502            0 :         .map_err(ApiError::InternalServerError)?;
     503              : 
     504            0 :     let response = debug_dump::calculate_digest(&tli, request)
     505            0 :         .await
     506            0 :         .map_err(ApiError::InternalServerError)?;
     507            0 :     json_response(StatusCode::OK, response)
     508            0 : }
     509              : 
     510              : /// Unevict timeline and remove uploaded partial segment(s) from the remote storage.
     511              : /// Successfull response returns list of segments existed before the deletion.
     512              : /// Aimed for one-off usage not normally needed.
     513            0 : async fn timeline_backup_partial_reset(request: Request<Body>) -> Result<Response<Body>, ApiError> {
     514            0 :     let ttid = TenantTimelineId::new(
     515            0 :         parse_request_param(&request, "tenant_id")?,
     516            0 :         parse_request_param(&request, "timeline_id")?,
     517              :     );
     518            0 :     check_permission(&request, Some(ttid.tenant_id))?;
     519              : 
     520            0 :     let global_timelines = get_global_timelines(&request);
     521            0 :     let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
     522              : 
     523            0 :     let response = tli
     524            0 :         .backup_partial_reset()
     525            0 :         .await
     526            0 :         .map_err(ApiError::InternalServerError)?;
     527            0 :     json_response(StatusCode::OK, response)
     528            0 : }
     529              : 
     530              : /// Make the term at least as high as the one in the request. If the one in the
     531              : /// request is None, increment the current one.
     532            0 : async fn timeline_term_bump_handler(
     533            0 :     mut request: Request<Body>,
     534            0 : ) -> Result<Response<Body>, ApiError> {
     535            0 :     let ttid = TenantTimelineId::new(
     536            0 :         parse_request_param(&request, "tenant_id")?,
     537            0 :         parse_request_param(&request, "timeline_id")?,
     538              :     );
     539            0 :     check_permission(&request, Some(ttid.tenant_id))?;
     540              : 
     541            0 :     let request_data: TimelineTermBumpRequest = json_request(&mut request).await?;
     542              : 
     543            0 :     let global_timelines = get_global_timelines(&request);
     544            0 :     let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
     545            0 :     let response = tli
     546            0 :         .term_bump(request_data.term)
     547            0 :         .await
     548            0 :         .map_err(ApiError::InternalServerError)?;
     549              : 
     550            0 :     json_response(StatusCode::OK, response)
     551            0 : }
     552              : 
     553              : /// Used only in tests to hand-craft the required data.
     554            0 : async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
     555            0 :     let ttid = TenantTimelineId::new(
     556            0 :         parse_request_param(&request, "tenant_id")?,
     557            0 :         parse_request_param(&request, "timeline_id")?,
     558              :     );
     559            0 :     check_permission(&request, Some(ttid.tenant_id))?;
     560            0 :     let sk_info: SkTimelineInfo = json_request(&mut request).await?;
     561            0 :     let proto_sk_info = SafekeeperTimelineInfo {
     562              :         safekeeper_id: 0,
     563            0 :         tenant_timeline_id: Some(ProtoTenantTimelineId {
     564            0 :             tenant_id: ttid.tenant_id.as_ref().to_owned(),
     565            0 :             timeline_id: ttid.timeline_id.as_ref().to_owned(),
     566            0 :         }),
     567            0 :         term: sk_info.term.unwrap_or(0),
     568            0 :         last_log_term: sk_info.last_log_term.unwrap_or(0),
     569            0 :         flush_lsn: sk_info.flush_lsn.0,
     570            0 :         commit_lsn: sk_info.commit_lsn.0,
     571            0 :         remote_consistent_lsn: sk_info.remote_consistent_lsn.0,
     572            0 :         peer_horizon_lsn: sk_info.peer_horizon_lsn.0,
     573            0 :         safekeeper_connstr: sk_info.safekeeper_connstr.unwrap_or_else(|| "".to_owned()),
     574            0 :         http_connstr: sk_info.http_connstr.unwrap_or_else(|| "".to_owned()),
     575            0 :         https_connstr: sk_info.https_connstr,
     576            0 :         backup_lsn: sk_info.backup_lsn.0,
     577            0 :         local_start_lsn: sk_info.local_start_lsn.0,
     578            0 :         availability_zone: None,
     579            0 :         standby_horizon: sk_info.standby_horizon.0,
     580              :     };
     581              : 
     582            0 :     let global_timelines = get_global_timelines(&request);
     583            0 :     let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
     584            0 :     tli.record_safekeeper_info(proto_sk_info)
     585            0 :         .await
     586            0 :         .map_err(ApiError::InternalServerError)?;
     587              : 
     588            0 :     json_response(StatusCode::OK, ())
     589            0 : }
     590              : 
     591            0 : fn parse_kv_str<E: fmt::Display, T: FromStr<Err = E>>(k: &str, v: &str) -> Result<T, ApiError> {
     592            0 :     v.parse()
     593            0 :         .map_err(|e| ApiError::BadRequest(anyhow::anyhow!("cannot parse {k}: {e}")))
     594            0 : }
     595              : 
     596              : /// Dump debug info about all available safekeeper state.
     597            0 : async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
     598            0 :     check_permission(&request, None)?;
     599            0 :     ensure_no_body(&mut request).await?;
     600              : 
     601            0 :     let mut dump_all: Option<bool> = None;
     602            0 :     let mut dump_control_file: Option<bool> = None;
     603            0 :     let mut dump_memory: Option<bool> = None;
     604            0 :     let mut dump_disk_content: Option<bool> = None;
     605            0 :     let mut dump_term_history: Option<bool> = None;
     606            0 :     let mut dump_wal_last_modified: Option<bool> = None;
     607            0 :     let mut tenant_id: Option<TenantId> = None;
     608            0 :     let mut timeline_id: Option<TimelineId> = None;
     609              : 
     610            0 :     let query = request.uri().query().unwrap_or("");
     611            0 :     let mut values = url::form_urlencoded::parse(query.as_bytes());
     612              : 
     613            0 :     for (k, v) in &mut values {
     614            0 :         match k.as_ref() {
     615            0 :             "dump_all" => dump_all = Some(parse_kv_str(&k, &v)?),
     616            0 :             "dump_control_file" => dump_control_file = Some(parse_kv_str(&k, &v)?),
     617            0 :             "dump_memory" => dump_memory = Some(parse_kv_str(&k, &v)?),
     618            0 :             "dump_disk_content" => dump_disk_content = Some(parse_kv_str(&k, &v)?),
     619            0 :             "dump_term_history" => dump_term_history = Some(parse_kv_str(&k, &v)?),
     620            0 :             "dump_wal_last_modified" => dump_wal_last_modified = Some(parse_kv_str(&k, &v)?),
     621            0 :             "tenant_id" => tenant_id = Some(parse_kv_str(&k, &v)?),
     622            0 :             "timeline_id" => timeline_id = Some(parse_kv_str(&k, &v)?),
     623            0 :             _ => Err(ApiError::BadRequest(anyhow::anyhow!(
     624            0 :                 "Unknown query parameter: {}",
     625            0 :                 k
     626            0 :             )))?,
     627              :         }
     628              :     }
     629              : 
     630            0 :     let dump_all = dump_all.unwrap_or(false);
     631            0 :     let dump_control_file = dump_control_file.unwrap_or(dump_all);
     632            0 :     let dump_memory = dump_memory.unwrap_or(dump_all);
     633            0 :     let dump_disk_content = dump_disk_content.unwrap_or(dump_all);
     634            0 :     let dump_term_history = dump_term_history.unwrap_or(true);
     635            0 :     let dump_wal_last_modified = dump_wal_last_modified.unwrap_or(dump_all);
     636              : 
     637            0 :     let global_timelines = get_global_timelines(&request);
     638              : 
     639            0 :     let args = debug_dump::Args {
     640            0 :         dump_all,
     641            0 :         dump_control_file,
     642            0 :         dump_memory,
     643            0 :         dump_disk_content,
     644            0 :         dump_term_history,
     645            0 :         dump_wal_last_modified,
     646            0 :         tenant_id,
     647            0 :         timeline_id,
     648            0 :     };
     649              : 
     650            0 :     let resp = debug_dump::build(args, global_timelines)
     651            0 :         .await
     652            0 :         .map_err(ApiError::InternalServerError)?;
     653              : 
     654            0 :     let started_at = std::time::Instant::now();
     655              : 
     656            0 :     let (tx, rx) = mpsc::channel(1);
     657              : 
     658            0 :     let body = Body::wrap_stream(ReceiverStream::new(rx));
     659              : 
     660            0 :     let mut writer = ChannelWriter::new(128 * 1024, tx);
     661              : 
     662            0 :     let response = Response::builder()
     663            0 :         .status(200)
     664            0 :         .header(hyper::header::CONTENT_TYPE, "application/octet-stream")
     665            0 :         .body(body)
     666            0 :         .unwrap();
     667              : 
     668            0 :     let span = info_span!("blocking");
     669            0 :     tokio::task::spawn_blocking(move || {
     670            0 :         let _span = span.entered();
     671              : 
     672            0 :         let res = serde_json::to_writer(&mut writer, &resp)
     673            0 :             .map_err(std::io::Error::from)
     674            0 :             .and_then(|_| writer.flush());
     675              : 
     676            0 :         match res {
     677              :             Ok(()) => {
     678            0 :                 tracing::info!(
     679            0 :                     bytes = writer.flushed_bytes(),
     680            0 :                     elapsed_ms = started_at.elapsed().as_millis(),
     681            0 :                     "responded /v1/debug_dump"
     682              :                 );
     683              :             }
     684            0 :             Err(e) => {
     685            0 :                 tracing::warn!("failed to write out /v1/debug_dump response: {e:#}");
     686              :                 // semantics of this error are quite... unclear. we want to error the stream out to
     687              :                 // abort the response to somehow notify the client that we failed.
     688              :                 //
     689              :                 // though, most likely the reason for failure is that the receiver is already gone.
     690            0 :                 drop(
     691            0 :                     writer
     692            0 :                         .tx
     693            0 :                         .blocking_send(Err(std::io::ErrorKind::BrokenPipe.into())),
     694              :                 );
     695              :             }
     696              :         }
     697            0 :     });
     698              : 
     699            0 :     Ok(response)
     700            0 : }
     701              : 
     702              : /// Safekeeper http router.
     703            0 : pub fn make_router(
     704            0 :     conf: Arc<SafeKeeperConf>,
     705            0 :     global_timelines: Arc<GlobalTimelines>,
     706            0 : ) -> RouterBuilder<hyper::Body, ApiError> {
     707            0 :     let mut router = endpoint::make_router();
     708            0 :     if conf.http_auth.is_some() {
     709            0 :         router = router.middleware(auth_middleware(|request| {
     710              :             const ALLOWLIST_ROUTES: &[&str] =
     711              :                 &["/v1/status", "/metrics", "/profile/cpu", "/profile/heap"];
     712            0 :             if ALLOWLIST_ROUTES.contains(&request.uri().path()) {
     713            0 :                 None
     714              :             } else {
     715              :                 // Option<Arc<SwappableJwtAuth>> is always provided as data below, hence unwrap().
     716            0 :                 request
     717            0 :                     .data::<Option<Arc<SwappableJwtAuth>>>()
     718            0 :                     .unwrap()
     719            0 :                     .as_deref()
     720              :             }
     721            0 :         }))
     722            0 :     }
     723              : 
     724            0 :     let force_metric_collection_on_scrape = conf.force_metric_collection_on_scrape;
     725              : 
     726            0 :     let prometheus_metrics_handler_wrapper =
     727            0 :         move |req| prometheus_metrics_handler(req, force_metric_collection_on_scrape);
     728              : 
     729              :     // NB: on any changes do not forget to update the OpenAPI spec
     730              :     // located nearby (/safekeeper/src/http/openapi_spec.yaml).
     731            0 :     let auth = conf.http_auth.clone();
     732            0 :     router
     733            0 :         .data(conf)
     734            0 :         .data(global_timelines)
     735            0 :         .data(auth)
     736            0 :         .get("/metrics", move |r| {
     737            0 :             request_span(r, prometheus_metrics_handler_wrapper)
     738            0 :         })
     739            0 :         .get("/profile/cpu", |r| request_span(r, profile_cpu_handler))
     740            0 :         .get("/profile/heap", |r| request_span(r, profile_heap_handler))
     741            0 :         .get("/v1/status", |r| request_span(r, status_handler))
     742            0 :         .put("/v1/failpoints", |r| {
     743            0 :             request_span(r, move |r| async {
     744            0 :                 check_permission(&r, None)?;
     745            0 :                 let cancel = CancellationToken::new();
     746            0 :                 failpoints_handler(r, cancel).await
     747            0 :             })
     748            0 :         })
     749            0 :         .get("/v1/utilization", |r| request_span(r, utilization_handler))
     750              :         /* BEGIN_HADRON */
     751            0 :         .get("/v1/debug/filesystem_usage", |r| {
     752            0 :             request_span(r, filesystem_usage_handler)
     753            0 :         })
     754              :         /* END_HADRON */
     755            0 :         .delete("/v1/tenant/:tenant_id", |r| {
     756            0 :             request_span(r, tenant_delete_handler)
     757            0 :         })
     758              :         // Will be used in the future instead of implicit timeline creation
     759            0 :         .post("/v1/tenant/timeline", |r| {
     760            0 :             request_span(r, timeline_create_handler)
     761            0 :         })
     762            0 :         .get("/v1/tenant/timeline", |r| {
     763            0 :             request_span(r, timeline_list_handler)
     764            0 :         })
     765            0 :         .get("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
     766            0 :             request_span(r, timeline_status_handler)
     767            0 :         })
     768            0 :         .delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
     769            0 :             request_span(r, timeline_delete_handler)
     770            0 :         })
     771            0 :         .post("/v1/pull_timeline", |r| {
     772            0 :             request_span(r, timeline_pull_handler)
     773            0 :         })
     774            0 :         .put("/v1/tenant/:tenant_id/timeline/:timeline_id/exclude", |r| {
     775            0 :             request_span(r, timeline_exclude_handler)
     776            0 :         })
     777            0 :         .get(
     778              :             "/v1/tenant/:tenant_id/timeline/:timeline_id/snapshot/:destination_id",
     779            0 :             |r| request_span(r, timeline_snapshot_handler),
     780              :         )
     781            0 :         .put(
     782              :             "/v1/tenant/:tenant_id/timeline/:timeline_id/membership",
     783            0 :             |r| request_span(r, timeline_membership_handler),
     784              :         )
     785            0 :         .post(
     786              :             "/v1/tenant/:tenant_id/timeline/:source_timeline_id/copy",
     787            0 :             |r| request_span(r, timeline_copy_handler),
     788              :         )
     789            0 :         .patch(
     790              :             "/v1/tenant/:tenant_id/timeline/:timeline_id/control_file",
     791            0 :             |r| request_span(r, patch_control_file_handler),
     792              :         )
     793            0 :         .post(
     794              :             "/v1/tenant/:tenant_id/timeline/:timeline_id/checkpoint",
     795            0 :             |r| request_span(r, timeline_checkpoint_handler),
     796              :         )
     797            0 :         .get("/v1/tenant/:tenant_id/timeline/:timeline_id/digest", |r| {
     798            0 :             request_span(r, timeline_digest_handler)
     799            0 :         })
     800            0 :         .post(
     801              :             "/v1/tenant/:tenant_id/timeline/:timeline_id/backup_partial_reset",
     802            0 :             |r| request_span(r, timeline_backup_partial_reset),
     803              :         )
     804            0 :         .post(
     805              :             "/v1/tenant/:tenant_id/timeline/:timeline_id/term_bump",
     806            0 :             |r| request_span(r, timeline_term_bump_handler),
     807              :         )
     808            0 :         .post("/v1/record_safekeeper_info/:tenant_id/:timeline_id", |r| {
     809            0 :             request_span(r, record_safekeeper_info)
     810            0 :         })
     811            0 :         .get("/v1/debug_dump", |r| request_span(r, dump_debug_handler))
     812            0 : }
     813              : 
     814              : #[cfg(test)]
     815              : mod tests {
     816              :     use super::*;
     817              : 
     818              :     #[test]
     819            1 :     fn test_term_switch_entry_api_serialize() {
     820            1 :         let state = AcceptorStateStatus {
     821            1 :             term: 1,
     822            1 :             epoch: 1,
     823            1 :             term_history: vec![TermSwitchApiEntry {
     824            1 :                 term: 1,
     825            1 :                 lsn: Lsn(0x16FFDDDD),
     826            1 :             }],
     827            1 :         };
     828            1 :         let json = serde_json::to_string(&state).unwrap();
     829            1 :         assert_eq!(
     830              :             json,
     831              :             "{\"term\":1,\"epoch\":1,\"term_history\":[{\"term\":1,\"lsn\":\"0/16FFDDDD\"}]}"
     832              :         );
     833            1 :     }
     834              : }
        

Generated by: LCOV version 2.1-beta