LCOV - code coverage report
Current view: top level - safekeeper/src/http - routes.rs (source / functions) Coverage Total Hit
Test: f8d8f5b90fa487a9e82c42da223f012f5d4fece7.info Lines: 3.1 % 482 15
Test Date: 2024-09-19 20:36:02 Functions: 1.0 % 99 1

            Line data    Source code
       1              : use hyper::{Body, Request, Response, StatusCode, Uri};
       2              : use once_cell::sync::Lazy;
       3              : use serde::{Deserialize, Serialize};
       4              : use std::collections::{HashMap, HashSet};
       5              : use std::fmt;
       6              : use std::io::Write as _;
       7              : use std::str::FromStr;
       8              : use std::sync::Arc;
       9              : use storage_broker::proto::SafekeeperTimelineInfo;
      10              : use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
      11              : use tokio::sync::mpsc;
      12              : use tokio::task;
      13              : use tokio_stream::wrappers::ReceiverStream;
      14              : use tokio_util::sync::CancellationToken;
      15              : use tracing::{info_span, Instrument};
      16              : use utils::failpoint_support::failpoints_handler;
      17              : use utils::http::endpoint::{prometheus_metrics_handler, request_span, ChannelWriter};
      18              : use utils::http::request::parse_query_param;
      19              : 
      20              : use postgres_ffi::WAL_SEGMENT_SIZE;
      21              : use safekeeper_api::models::{SkTimelineInfo, TimelineCopyRequest};
      22              : use safekeeper_api::models::{TimelineCreateRequest, TimelineTermBumpRequest};
      23              : use utils::{
      24              :     auth::SwappableJwtAuth,
      25              :     http::{
      26              :         endpoint::{self, auth_middleware, check_permission_with},
      27              :         error::ApiError,
      28              :         json::{json_request, json_response},
      29              :         request::{ensure_no_body, parse_request_param},
      30              :         RequestExt, RouterBuilder,
      31              :     },
      32              :     id::{NodeId, TenantId, TenantTimelineId, TimelineId},
      33              :     lsn::Lsn,
      34              : };
      35              : 
      36              : use crate::debug_dump::TimelineDigestRequest;
      37              : use crate::receive_wal::WalReceiverState;
      38              : use crate::safekeeper::Term;
      39              : use crate::safekeeper::{ServerInfo, TermLsn};
      40              : use crate::send_wal::WalSenderState;
      41              : use crate::timeline::PeerInfo;
      42              : use crate::timelines_global_map::TimelineDeleteForceResult;
      43              : use crate::GlobalTimelines;
      44              : use crate::SafeKeeperConf;
      45              : use crate::{copy_timeline, debug_dump, patch_control_file, pull_timeline};
      46              : 
      47              : #[derive(Debug, Serialize)]
      48              : struct SafekeeperStatus {
      49              :     id: NodeId,
      50              : }
      51              : 
      52              : /// Healthcheck handler.
      53            0 : async fn status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
      54            0 :     check_permission(&request, None)?;
      55            0 :     let conf = get_conf(&request);
      56            0 :     let status = SafekeeperStatus { id: conf.my_id };
      57            0 :     json_response(StatusCode::OK, status)
      58            0 : }
      59              : 
      60            0 : fn get_conf(request: &Request<Body>) -> &SafeKeeperConf {
      61            0 :     request
      62            0 :         .data::<Arc<SafeKeeperConf>>()
      63            0 :         .expect("unknown state type")
      64            0 :         .as_ref()
      65            0 : }
      66              : 
      67              : /// Same as TermLsn, but serializes LSN using display serializer
      68              : /// in Postgres format, i.e. 0/FFFFFFFF. Used only for the API response.
      69            0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
      70              : pub struct TermSwitchApiEntry {
      71              :     pub term: Term,
      72              :     pub lsn: Lsn,
      73              : }
      74              : 
      75              : impl From<TermSwitchApiEntry> for TermLsn {
      76            0 :     fn from(api_val: TermSwitchApiEntry) -> Self {
      77            0 :         TermLsn {
      78            0 :             term: api_val.term,
      79            0 :             lsn: api_val.lsn,
      80            0 :         }
      81            0 :     }
      82              : }
      83              : 
      84              : /// Augment AcceptorState with last_log_term for convenience
      85            0 : #[derive(Debug, Serialize, Deserialize)]
      86              : pub struct AcceptorStateStatus {
      87              :     pub term: Term,
      88              :     pub epoch: Term, // aka last_log_term
      89              :     pub term_history: Vec<TermSwitchApiEntry>,
      90              : }
      91              : 
      92              : /// Info about timeline on safekeeper ready for reporting.
      93            0 : #[derive(Debug, Serialize, Deserialize)]
      94              : pub struct TimelineStatus {
      95              :     pub tenant_id: TenantId,
      96              :     pub timeline_id: TimelineId,
      97              :     pub acceptor_state: AcceptorStateStatus,
      98              :     pub pg_info: ServerInfo,
      99              :     pub flush_lsn: Lsn,
     100              :     pub timeline_start_lsn: Lsn,
     101              :     pub local_start_lsn: Lsn,
     102              :     pub commit_lsn: Lsn,
     103              :     pub backup_lsn: Lsn,
     104              :     pub peer_horizon_lsn: Lsn,
     105              :     pub remote_consistent_lsn: Lsn,
     106              :     pub peers: Vec<PeerInfo>,
     107              :     pub walsenders: Vec<WalSenderState>,
     108              :     pub walreceivers: Vec<WalReceiverState>,
     109              : }
     110              : 
     111            0 : fn check_permission(request: &Request<Body>, tenant_id: Option<TenantId>) -> Result<(), ApiError> {
     112            0 :     check_permission_with(request, |claims| {
     113            0 :         crate::auth::check_permission(claims, tenant_id)
     114            0 :     })
     115            0 : }
     116              : 
     117              : /// Deactivates all timelines for the tenant and removes its data directory.
     118              : /// See `timeline_delete_handler`.
     119            0 : async fn tenant_delete_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
     120            0 :     let tenant_id = parse_request_param(&request, "tenant_id")?;
     121            0 :     let only_local = parse_query_param(&request, "only_local")?.unwrap_or(false);
     122            0 :     check_permission(&request, Some(tenant_id))?;
     123            0 :     ensure_no_body(&mut request).await?;
     124              :     // FIXME: `delete_force_all_for_tenant` can return an error for multiple different reasons;
     125              :     // Using an `InternalServerError` should be fixed when the types support it
     126            0 :     let delete_info = GlobalTimelines::delete_force_all_for_tenant(&tenant_id, only_local)
     127            0 :         .await
     128            0 :         .map_err(ApiError::InternalServerError)?;
     129            0 :     json_response(
     130            0 :         StatusCode::OK,
     131            0 :         delete_info
     132            0 :             .iter()
     133            0 :             .map(|(ttid, resp)| (format!("{}", ttid.timeline_id), *resp))
     134            0 :             .collect::<HashMap<String, TimelineDeleteForceResult>>(),
     135            0 :     )
     136            0 : }
     137              : 
     138            0 : async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
     139            0 :     let request_data: TimelineCreateRequest = json_request(&mut request).await?;
     140              : 
     141            0 :     let ttid = TenantTimelineId {
     142            0 :         tenant_id: request_data.tenant_id,
     143            0 :         timeline_id: request_data.timeline_id,
     144            0 :     };
     145            0 :     check_permission(&request, Some(ttid.tenant_id))?;
     146              : 
     147            0 :     let server_info = ServerInfo {
     148            0 :         pg_version: request_data.pg_version,
     149            0 :         system_id: request_data.system_id.unwrap_or(0),
     150            0 :         wal_seg_size: request_data.wal_seg_size.unwrap_or(WAL_SEGMENT_SIZE as u32),
     151            0 :     };
     152            0 :     let local_start_lsn = request_data.local_start_lsn.unwrap_or_else(|| {
     153            0 :         request_data
     154            0 :             .commit_lsn
     155            0 :             .segment_lsn(server_info.wal_seg_size as usize)
     156            0 :     });
     157            0 :     GlobalTimelines::create(ttid, server_info, request_data.commit_lsn, local_start_lsn)
     158            0 :         .await
     159            0 :         .map_err(ApiError::InternalServerError)?;
     160              : 
     161            0 :     json_response(StatusCode::OK, ())
     162            0 : }
     163              : 
     164              : /// List all (not deleted) timelines.
     165              : /// Note: it is possible to do the same with debug_dump.
     166            0 : async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
     167            0 :     check_permission(&request, None)?;
     168            0 :     let res: Vec<TenantTimelineId> = GlobalTimelines::get_all()
     169            0 :         .iter()
     170            0 :         .map(|tli| tli.ttid)
     171            0 :         .collect();
     172            0 :     json_response(StatusCode::OK, res)
     173            0 : }
     174              : 
     175              : /// Report info about timeline.
     176            0 : async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
     177            0 :     let ttid = TenantTimelineId::new(
     178            0 :         parse_request_param(&request, "tenant_id")?,
     179            0 :         parse_request_param(&request, "timeline_id")?,
     180              :     );
     181            0 :     check_permission(&request, Some(ttid.tenant_id))?;
     182              : 
     183            0 :     let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
     184            0 :     let (inmem, state) = tli.get_state().await;
     185            0 :     let flush_lsn = tli.get_flush_lsn().await;
     186              : 
     187            0 :     let last_log_term = state.acceptor_state.get_last_log_term(flush_lsn);
     188            0 :     let term_history = state
     189            0 :         .acceptor_state
     190            0 :         .term_history
     191            0 :         .0
     192            0 :         .into_iter()
     193            0 :         .map(|ts| TermSwitchApiEntry {
     194            0 :             term: ts.term,
     195            0 :             lsn: ts.lsn,
     196            0 :         })
     197            0 :         .collect();
     198            0 :     let acc_state = AcceptorStateStatus {
     199            0 :         term: state.acceptor_state.term,
     200            0 :         epoch: last_log_term,
     201            0 :         term_history,
     202            0 :     };
     203            0 : 
     204            0 :     let conf = get_conf(&request);
     205              :     // Note: we report in memory values which can be lost.
     206            0 :     let status = TimelineStatus {
     207            0 :         tenant_id: ttid.tenant_id,
     208            0 :         timeline_id: ttid.timeline_id,
     209            0 :         acceptor_state: acc_state,
     210            0 :         pg_info: state.server,
     211            0 :         flush_lsn,
     212            0 :         timeline_start_lsn: state.timeline_start_lsn,
     213            0 :         local_start_lsn: state.local_start_lsn,
     214            0 :         commit_lsn: inmem.commit_lsn,
     215            0 :         backup_lsn: inmem.backup_lsn,
     216            0 :         peer_horizon_lsn: inmem.peer_horizon_lsn,
     217            0 :         remote_consistent_lsn: inmem.remote_consistent_lsn,
     218            0 :         peers: tli.get_peers(conf).await,
     219            0 :         walsenders: tli.get_walsenders().get_all(),
     220            0 :         walreceivers: tli.get_walreceivers().get_all(),
     221            0 :     };
     222            0 :     json_response(StatusCode::OK, status)
     223            0 : }
     224              : 
     225              : /// Deactivates the timeline and removes its data directory.
     226            0 : async fn timeline_delete_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
     227            0 :     let ttid = TenantTimelineId::new(
     228            0 :         parse_request_param(&request, "tenant_id")?,
     229            0 :         parse_request_param(&request, "timeline_id")?,
     230              :     );
     231            0 :     let only_local = parse_query_param(&request, "only_local")?.unwrap_or(false);
     232            0 :     check_permission(&request, Some(ttid.tenant_id))?;
     233            0 :     ensure_no_body(&mut request).await?;
     234              :     // FIXME: `delete_force` can fail from both internal errors and bad requests. Add better
     235              :     // error handling here when we're able to.
     236            0 :     let resp = GlobalTimelines::delete(&ttid, only_local)
     237            0 :         .await
     238            0 :         .map_err(ApiError::InternalServerError)?;
     239            0 :     json_response(StatusCode::OK, resp)
     240            0 : }
     241              : 
     242              : /// Pull timeline from peer safekeeper instances.
     243            0 : async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
     244            0 :     check_permission(&request, None)?;
     245              : 
     246            0 :     let data: pull_timeline::Request = json_request(&mut request).await?;
     247            0 :     let conf = get_conf(&request);
     248              : 
     249            0 :     let resp = pull_timeline::handle_request(data, conf.sk_auth_token.clone())
     250            0 :         .await
     251            0 :         .map_err(ApiError::InternalServerError)?;
     252            0 :     json_response(StatusCode::OK, resp)
     253            0 : }
     254              : 
     255              : /// Stream tar archive with all timeline data.
     256            0 : async fn timeline_snapshot_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
     257            0 :     let destination = parse_request_param(&request, "destination_id")?;
     258            0 :     let ttid = TenantTimelineId::new(
     259            0 :         parse_request_param(&request, "tenant_id")?,
     260            0 :         parse_request_param(&request, "timeline_id")?,
     261              :     );
     262            0 :     check_permission(&request, Some(ttid.tenant_id))?;
     263              : 
     264            0 :     let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
     265              :     // Note: with evicted timelines it should work better than de-evict them and
     266              :     // stream; probably start_snapshot would copy partial s3 file to dest path
     267              :     // and stream control file, or return WalResidentTimeline if timeline is not
     268              :     // evicted.
     269            0 :     let tli = tli
     270            0 :         .wal_residence_guard()
     271            0 :         .await
     272            0 :         .map_err(ApiError::InternalServerError)?;
     273              : 
     274              :     // To stream the body use wrap_stream which wants Stream of Result<Bytes>,
     275              :     // so create the chan and write to it in another task.
     276            0 :     let (tx, rx) = mpsc::channel(1);
     277            0 : 
     278            0 :     let conf = get_conf(&request);
     279            0 :     task::spawn(pull_timeline::stream_snapshot(
     280            0 :         tli,
     281            0 :         conf.my_id,
     282            0 :         destination,
     283            0 :         tx,
     284            0 :     ));
     285            0 : 
     286            0 :     let rx_stream = ReceiverStream::new(rx);
     287            0 :     let body = Body::wrap_stream(rx_stream);
     288            0 : 
     289            0 :     let response = Response::builder()
     290            0 :         .status(200)
     291            0 :         .header(hyper::header::CONTENT_TYPE, "application/octet-stream")
     292            0 :         .body(body)
     293            0 :         .unwrap();
     294            0 : 
     295            0 :     Ok(response)
     296            0 : }
     297              : 
     298            0 : async fn timeline_copy_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
     299            0 :     check_permission(&request, None)?;
     300              : 
     301            0 :     let request_data: TimelineCopyRequest = json_request(&mut request).await?;
     302            0 :     let ttid = TenantTimelineId::new(
     303            0 :         parse_request_param(&request, "tenant_id")?,
     304            0 :         parse_request_param(&request, "source_timeline_id")?,
     305              :     );
     306              : 
     307            0 :     let source = GlobalTimelines::get(ttid)?;
     308              : 
     309            0 :     copy_timeline::handle_request(copy_timeline::Request{
     310            0 :         source,
     311            0 :         until_lsn: request_data.until_lsn,
     312            0 :         destination_ttid: TenantTimelineId::new(ttid.tenant_id, request_data.target_timeline_id),
     313            0 :     })
     314            0 :         .instrument(info_span!("copy_timeline", from=%ttid, to=%request_data.target_timeline_id, until_lsn=%request_data.until_lsn))
     315            0 :         .await
     316            0 :         .map_err(ApiError::InternalServerError)?;
     317              : 
     318            0 :     json_response(StatusCode::OK, ())
     319            0 : }
     320              : 
     321            0 : async fn patch_control_file_handler(
     322            0 :     mut request: Request<Body>,
     323            0 : ) -> Result<Response<Body>, ApiError> {
     324            0 :     check_permission(&request, None)?;
     325              : 
     326            0 :     let ttid = TenantTimelineId::new(
     327            0 :         parse_request_param(&request, "tenant_id")?,
     328            0 :         parse_request_param(&request, "timeline_id")?,
     329              :     );
     330              : 
     331            0 :     let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
     332              : 
     333            0 :     let patch_request: patch_control_file::Request = json_request(&mut request).await?;
     334            0 :     let response = patch_control_file::handle_request(tli, patch_request)
     335            0 :         .await
     336            0 :         .map_err(ApiError::InternalServerError)?;
     337              : 
     338            0 :     json_response(StatusCode::OK, response)
     339            0 : }
     340              : 
     341              : /// Force persist control file.
     342            0 : async fn timeline_checkpoint_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
     343            0 :     check_permission(&request, None)?;
     344              : 
     345            0 :     let ttid = TenantTimelineId::new(
     346            0 :         parse_request_param(&request, "tenant_id")?,
     347            0 :         parse_request_param(&request, "timeline_id")?,
     348              :     );
     349              : 
     350            0 :     let tli = GlobalTimelines::get(ttid)?;
     351            0 :     tli.write_shared_state()
     352            0 :         .await
     353              :         .sk
     354            0 :         .state_mut()
     355            0 :         .flush()
     356            0 :         .await
     357            0 :         .map_err(ApiError::InternalServerError)?;
     358            0 :     json_response(StatusCode::OK, ())
     359            0 : }
     360              : 
     361            0 : async fn timeline_digest_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
     362            0 :     let ttid = TenantTimelineId::new(
     363            0 :         parse_request_param(&request, "tenant_id")?,
     364            0 :         parse_request_param(&request, "timeline_id")?,
     365              :     );
     366            0 :     check_permission(&request, Some(ttid.tenant_id))?;
     367              : 
     368            0 :     let from_lsn: Option<Lsn> = parse_query_param(&request, "from_lsn")?;
     369            0 :     let until_lsn: Option<Lsn> = parse_query_param(&request, "until_lsn")?;
     370              : 
     371            0 :     let request = TimelineDigestRequest {
     372            0 :         from_lsn: from_lsn.ok_or(ApiError::BadRequest(anyhow::anyhow!(
     373            0 :             "from_lsn is required"
     374            0 :         )))?,
     375            0 :         until_lsn: until_lsn.ok_or(ApiError::BadRequest(anyhow::anyhow!(
     376            0 :             "until_lsn is required"
     377            0 :         )))?,
     378              :     };
     379              : 
     380            0 :     let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
     381            0 :     let tli = tli
     382            0 :         .wal_residence_guard()
     383            0 :         .await
     384            0 :         .map_err(ApiError::InternalServerError)?;
     385              : 
     386            0 :     let response = debug_dump::calculate_digest(&tli, request)
     387            0 :         .await
     388            0 :         .map_err(ApiError::InternalServerError)?;
     389            0 :     json_response(StatusCode::OK, response)
     390            0 : }
     391              : 
     392              : /// Unevict timeline and remove uploaded partial segment(s) from the remote storage.
     393              : /// A successful response returns the list of segments that existed before the deletion.
     394              : /// Intended for one-off use; not normally needed.
     395            0 : async fn timeline_backup_partial_reset(request: Request<Body>) -> Result<Response<Body>, ApiError> {
     396            0 :     let ttid = TenantTimelineId::new(
     397            0 :         parse_request_param(&request, "tenant_id")?,
     398            0 :         parse_request_param(&request, "timeline_id")?,
     399              :     );
     400            0 :     check_permission(&request, Some(ttid.tenant_id))?;
     401              : 
     402            0 :     let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
     403              : 
     404            0 :     let response = tli
     405            0 :         .backup_partial_reset()
     406            0 :         .await
     407            0 :         .map_err(ApiError::InternalServerError)?;
     408            0 :     json_response(StatusCode::OK, response)
     409            0 : }
     410              : 
     411              : /// Make term at least as high as one in request. If one in request is None,
     412              : /// increment current one.
     413            0 : async fn timeline_term_bump_handler(
     414            0 :     mut request: Request<Body>,
     415            0 : ) -> Result<Response<Body>, ApiError> {
     416            0 :     let ttid = TenantTimelineId::new(
     417            0 :         parse_request_param(&request, "tenant_id")?,
     418            0 :         parse_request_param(&request, "timeline_id")?,
     419              :     );
     420            0 :     check_permission(&request, Some(ttid.tenant_id))?;
     421              : 
     422            0 :     let request_data: TimelineTermBumpRequest = json_request(&mut request).await?;
     423              : 
     424            0 :     let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
     425            0 :     let response = tli
     426            0 :         .term_bump(request_data.term)
     427            0 :         .await
     428            0 :         .map_err(ApiError::InternalServerError)?;
     429              : 
     430            0 :     json_response(StatusCode::OK, response)
     431            0 : }
     432              : 
     433              : /// Used only in tests to hand craft required data.
     434            0 : async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
     435            0 :     let ttid = TenantTimelineId::new(
     436            0 :         parse_request_param(&request, "tenant_id")?,
     437            0 :         parse_request_param(&request, "timeline_id")?,
     438              :     );
     439            0 :     check_permission(&request, Some(ttid.tenant_id))?;
     440            0 :     let sk_info: SkTimelineInfo = json_request(&mut request).await?;
     441            0 :     let proto_sk_info = SafekeeperTimelineInfo {
     442            0 :         safekeeper_id: 0,
     443            0 :         tenant_timeline_id: Some(ProtoTenantTimelineId {
     444            0 :             tenant_id: ttid.tenant_id.as_ref().to_owned(),
     445            0 :             timeline_id: ttid.timeline_id.as_ref().to_owned(),
     446            0 :         }),
     447            0 :         term: sk_info.term.unwrap_or(0),
     448            0 :         last_log_term: sk_info.last_log_term.unwrap_or(0),
     449            0 :         flush_lsn: sk_info.flush_lsn.0,
     450            0 :         commit_lsn: sk_info.commit_lsn.0,
     451            0 :         remote_consistent_lsn: sk_info.remote_consistent_lsn.0,
     452            0 :         peer_horizon_lsn: sk_info.peer_horizon_lsn.0,
     453            0 :         safekeeper_connstr: sk_info.safekeeper_connstr.unwrap_or_else(|| "".to_owned()),
     454            0 :         http_connstr: sk_info.http_connstr.unwrap_or_else(|| "".to_owned()),
     455            0 :         backup_lsn: sk_info.backup_lsn.0,
     456            0 :         local_start_lsn: sk_info.local_start_lsn.0,
     457            0 :         availability_zone: None,
     458            0 :         standby_horizon: sk_info.standby_horizon.0,
     459            0 :     };
     460              : 
     461            0 :     let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
     462            0 :     tli.record_safekeeper_info(proto_sk_info)
     463            0 :         .await
     464            0 :         .map_err(ApiError::InternalServerError)?;
     465              : 
     466            0 :     json_response(StatusCode::OK, ())
     467            0 : }
     468              : 
     469            0 : fn parse_kv_str<E: fmt::Display, T: FromStr<Err = E>>(k: &str, v: &str) -> Result<T, ApiError> {
     470            0 :     v.parse()
     471            0 :         .map_err(|e| ApiError::BadRequest(anyhow::anyhow!("cannot parse {k}: {e}")))
     472            0 : }
     473              : 
     474              : /// Dump debug info about all available safekeeper state.
     475            0 : async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
     476            0 :     check_permission(&request, None)?;
     477            0 :     ensure_no_body(&mut request).await?;
     478              : 
     479            0 :     let mut dump_all: Option<bool> = None;
     480            0 :     let mut dump_control_file: Option<bool> = None;
     481            0 :     let mut dump_memory: Option<bool> = None;
     482            0 :     let mut dump_disk_content: Option<bool> = None;
     483            0 :     let mut dump_term_history: Option<bool> = None;
     484            0 :     let mut dump_wal_last_modified: Option<bool> = None;
     485            0 :     let mut tenant_id: Option<TenantId> = None;
     486            0 :     let mut timeline_id: Option<TimelineId> = None;
     487            0 : 
     488            0 :     let query = request.uri().query().unwrap_or("");
     489            0 :     let mut values = url::form_urlencoded::parse(query.as_bytes());
     490              : 
     491            0 :     for (k, v) in &mut values {
     492            0 :         match k.as_ref() {
     493            0 :             "dump_all" => dump_all = Some(parse_kv_str(&k, &v)?),
     494            0 :             "dump_control_file" => dump_control_file = Some(parse_kv_str(&k, &v)?),
     495            0 :             "dump_memory" => dump_memory = Some(parse_kv_str(&k, &v)?),
     496            0 :             "dump_disk_content" => dump_disk_content = Some(parse_kv_str(&k, &v)?),
     497            0 :             "dump_term_history" => dump_term_history = Some(parse_kv_str(&k, &v)?),
     498            0 :             "dump_wal_last_modified" => dump_wal_last_modified = Some(parse_kv_str(&k, &v)?),
     499            0 :             "tenant_id" => tenant_id = Some(parse_kv_str(&k, &v)?),
     500            0 :             "timeline_id" => timeline_id = Some(parse_kv_str(&k, &v)?),
     501            0 :             _ => Err(ApiError::BadRequest(anyhow::anyhow!(
     502            0 :                 "Unknown query parameter: {}",
     503            0 :                 k
     504            0 :             )))?,
     505              :         }
     506              :     }
     507              : 
     508            0 :     let dump_all = dump_all.unwrap_or(false);
     509            0 :     let dump_control_file = dump_control_file.unwrap_or(dump_all);
     510            0 :     let dump_memory = dump_memory.unwrap_or(dump_all);
     511            0 :     let dump_disk_content = dump_disk_content.unwrap_or(dump_all);
     512            0 :     let dump_term_history = dump_term_history.unwrap_or(true);
     513            0 :     let dump_wal_last_modified = dump_wal_last_modified.unwrap_or(dump_all);
     514            0 : 
     515            0 :     let args = debug_dump::Args {
     516            0 :         dump_all,
     517            0 :         dump_control_file,
     518            0 :         dump_memory,
     519            0 :         dump_disk_content,
     520            0 :         dump_term_history,
     521            0 :         dump_wal_last_modified,
     522            0 :         tenant_id,
     523            0 :         timeline_id,
     524            0 :     };
     525              : 
     526            0 :     let resp = debug_dump::build(args)
     527            0 :         .await
     528            0 :         .map_err(ApiError::InternalServerError)?;
     529              : 
     530            0 :     let started_at = std::time::Instant::now();
     531            0 : 
     532            0 :     let (tx, rx) = mpsc::channel(1);
     533            0 : 
     534            0 :     let body = Body::wrap_stream(ReceiverStream::new(rx));
     535            0 : 
     536            0 :     let mut writer = ChannelWriter::new(128 * 1024, tx);
     537            0 : 
     538            0 :     let response = Response::builder()
     539            0 :         .status(200)
     540            0 :         .header(hyper::header::CONTENT_TYPE, "application/octet-stream")
     541            0 :         .body(body)
     542            0 :         .unwrap();
     543              : 
     544            0 :     let span = info_span!("blocking");
     545            0 :     tokio::task::spawn_blocking(move || {
     546            0 :         let _span = span.entered();
     547            0 : 
     548            0 :         let res = serde_json::to_writer(&mut writer, &resp)
     549            0 :             .map_err(std::io::Error::from)
     550            0 :             .and_then(|_| writer.flush());
     551            0 : 
     552            0 :         match res {
     553              :             Ok(()) => {
     554            0 :                 tracing::info!(
     555            0 :                     bytes = writer.flushed_bytes(),
     556            0 :                     elapsed_ms = started_at.elapsed().as_millis(),
     557            0 :                     "responded /v1/debug_dump"
     558              :                 );
     559              :             }
     560            0 :             Err(e) => {
     561            0 :                 tracing::warn!("failed to write out /v1/debug_dump response: {e:#}");
     562              :                 // semantics of this error are quite... unclear. we want to error the stream out to
     563              :                 // abort the response to somehow notify the client that we failed.
     564              :                 //
     565              :                 // though, most likely the reason for failure is that the receiver is already gone.
     566            0 :                 drop(
     567            0 :                     writer
     568            0 :                         .tx
     569            0 :                         .blocking_send(Err(std::io::ErrorKind::BrokenPipe.into())),
     570            0 :                 );
     571              :             }
     572              :         }
     573            0 :     });
     574            0 : 
     575            0 :     Ok(response)
     576            0 : }
     577              : 
     578              : /// Safekeeper http router.
     579            0 : pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError> {
     580            0 :     let mut router = endpoint::make_router();
     581            0 :     if conf.http_auth.is_some() {
     582            0 :         router = router.middleware(auth_middleware(|request| {
     583              :             #[allow(clippy::mutable_key_type)]
     584            0 :             static ALLOWLIST_ROUTES: Lazy<HashSet<Uri>> = Lazy::new(|| {
     585            0 :                 ["/v1/status", "/metrics"]
     586            0 :                     .iter()
     587            0 :                     .map(|v| v.parse().unwrap())
     588            0 :                     .collect()
     589            0 :             });
     590            0 :             if ALLOWLIST_ROUTES.contains(request.uri()) {
     591            0 :                 None
     592              :             } else {
     593              :                 // Option<Arc<SwappableJwtAuth>> is always provided as data below, hence unwrap().
     594            0 :                 request
     595            0 :                     .data::<Option<Arc<SwappableJwtAuth>>>()
     596            0 :                     .unwrap()
     597            0 :                     .as_deref()
     598              :             }
     599            0 :         }))
     600            0 :     }
     601              : 
     602              :     // NB: on any changes do not forget to update the OpenAPI spec
     603              :     // located nearby (/safekeeper/src/http/openapi_spec.yaml).
     604            0 :     let auth = conf.http_auth.clone();
     605            0 :     router
     606            0 :         .data(Arc::new(conf))
     607            0 :         .data(auth)
     608            0 :         .get("/metrics", |r| request_span(r, prometheus_metrics_handler))
     609            0 :         .get("/v1/status", |r| request_span(r, status_handler))
     610            0 :         .put("/v1/failpoints", |r| {
     611            0 :             request_span(r, move |r| async {
     612            0 :                 check_permission(&r, None)?;
     613            0 :                 let cancel = CancellationToken::new();
     614            0 :                 failpoints_handler(r, cancel).await
     615            0 :             })
     616            0 :         })
     617            0 :         .delete("/v1/tenant/:tenant_id", |r| {
     618            0 :             request_span(r, tenant_delete_handler)
     619            0 :         })
     620            0 :         // Will be used in the future instead of implicit timeline creation
     621            0 :         .post("/v1/tenant/timeline", |r| {
     622            0 :             request_span(r, timeline_create_handler)
     623            0 :         })
     624            0 :         .get("/v1/tenant/timeline", |r| {
     625            0 :             request_span(r, timeline_list_handler)
     626            0 :         })
     627            0 :         .get("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
     628            0 :             request_span(r, timeline_status_handler)
     629            0 :         })
     630            0 :         .delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
     631            0 :             request_span(r, timeline_delete_handler)
     632            0 :         })
     633            0 :         .post("/v1/pull_timeline", |r| {
     634            0 :             request_span(r, timeline_pull_handler)
     635            0 :         })
     636            0 :         .get(
     637            0 :             "/v1/tenant/:tenant_id/timeline/:timeline_id/snapshot/:destination_id",
     638            0 :             |r| request_span(r, timeline_snapshot_handler),
     639            0 :         )
     640            0 :         .post(
     641            0 :             "/v1/tenant/:tenant_id/timeline/:source_timeline_id/copy",
     642            0 :             |r| request_span(r, timeline_copy_handler),
     643            0 :         )
     644            0 :         .patch(
     645            0 :             "/v1/tenant/:tenant_id/timeline/:timeline_id/control_file",
     646            0 :             |r| request_span(r, patch_control_file_handler),
     647            0 :         )
     648            0 :         .post(
     649            0 :             "/v1/tenant/:tenant_id/timeline/:timeline_id/checkpoint",
     650            0 :             |r| request_span(r, timeline_checkpoint_handler),
     651            0 :         )
     652            0 :         .get("/v1/tenant/:tenant_id/timeline/:timeline_id/digest", |r| {
     653            0 :             request_span(r, timeline_digest_handler)
     654            0 :         })
     655            0 :         .post(
     656            0 :             "/v1/tenant/:tenant_id/timeline/:timeline_id/backup_partial_reset",
     657            0 :             |r| request_span(r, timeline_backup_partial_reset),
     658            0 :         )
     659            0 :         .post(
     660            0 :             "/v1/tenant/:tenant_id/timeline/:timeline_id/term_bump",
     661            0 :             |r| request_span(r, timeline_term_bump_handler),
     662            0 :         )
     663            0 :         .post("/v1/record_safekeeper_info/:tenant_id/:timeline_id", |r| {
     664            0 :             request_span(r, record_safekeeper_info)
     665            0 :         })
     666            0 :         .get("/v1/debug_dump", |r| request_span(r, dump_debug_handler))
     667            0 : }
     668              : 
     669              : #[cfg(test)]
     670              : mod tests {
     671              :     use super::*;
     672              : 
     673              :     #[test]
     674            1 :     fn test_term_switch_entry_api_serialize() {
     675            1 :         let state = AcceptorStateStatus {
     676            1 :             term: 1,
     677            1 :             epoch: 1,
     678            1 :             term_history: vec![TermSwitchApiEntry {
     679            1 :                 term: 1,
     680            1 :                 lsn: Lsn(0x16FFDDDD),
     681            1 :             }],
     682            1 :         };
     683            1 :         let json = serde_json::to_string(&state).unwrap();
     684            1 :         assert_eq!(
     685            1 :             json,
     686            1 :             "{\"term\":1,\"epoch\":1,\"term_history\":[{\"term\":1,\"lsn\":\"0/16FFDDDD\"}]}"
     687            1 :         );
     688            1 :     }
     689              : }
        

Generated by: LCOV version 2.1-beta