LCOV - code coverage report
Current view: top level - safekeeper/src/http - routes.rs (source / functions) Coverage Total Hit
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info Lines: 97.0 % 402 390
Test Date: 2024-02-07 07:37:29 Functions: 72.1 % 104 75

            Line data    Source code
       1              : use hyper::{Body, Request, Response, StatusCode, Uri};
       2              : 
       3              : use once_cell::sync::Lazy;
       4              : use postgres_ffi::WAL_SEGMENT_SIZE;
       5              : use safekeeper_api::models::{SkTimelineInfo, TimelineCopyRequest};
       6              : use serde::{Deserialize, Serialize};
       7              : use std::collections::{HashMap, HashSet};
       8              : use std::fmt;
       9              : use std::str::FromStr;
      10              : use std::sync::Arc;
      11              : use storage_broker::proto::SafekeeperTimelineInfo;
      12              : use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
      13              : use tokio::fs::File;
      14              : use tokio::io::AsyncReadExt;
      15              : use tokio_util::sync::CancellationToken;
      16              : use utils::failpoint_support::failpoints_handler;
      17              : use utils::http::request::parse_query_param;
      18              : 
      19              : use std::io::Write as _;
      20              : use tokio::sync::mpsc;
      21              : use tokio_stream::wrappers::ReceiverStream;
      22              : use tracing::{info_span, Instrument};
      23              : use utils::http::endpoint::{request_span, ChannelWriter};
      24              : 
      25              : use crate::debug_dump::TimelineDigestRequest;
      26              : use crate::receive_wal::WalReceiverState;
      27              : use crate::safekeeper::Term;
      28              : use crate::safekeeper::{ServerInfo, TermLsn};
      29              : use crate::send_wal::WalSenderState;
      30              : use crate::timeline::PeerInfo;
      31              : use crate::{copy_timeline, debug_dump, patch_control_file, pull_timeline};
      32              : 
      33              : use crate::timelines_global_map::TimelineDeleteForceResult;
      34              : use crate::GlobalTimelines;
      35              : use crate::SafeKeeperConf;
      36              : use utils::{
      37              :     auth::SwappableJwtAuth,
      38              :     http::{
      39              :         endpoint::{self, auth_middleware, check_permission_with},
      40              :         error::ApiError,
      41              :         json::{json_request, json_response},
      42              :         request::{ensure_no_body, parse_request_param},
      43              :         RequestExt, RouterBuilder,
      44              :     },
      45              :     id::{NodeId, TenantId, TenantTimelineId, TimelineId},
      46              :     lsn::Lsn,
      47              : };
      48              : 
      49              : use super::models::TimelineCreateRequest;
      50              : 
      51         1023 : #[derive(Debug, Serialize)]
      52              : struct SafekeeperStatus {
      53              :     id: NodeId,
      54              : }
      55              : 
      56              : /// Healthcheck handler.
      57         1023 : async fn status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
      58         1023 :     check_permission(&request, None)?;
      59         1023 :     let conf = get_conf(&request);
      60         1023 :     let status = SafekeeperStatus { id: conf.my_id };
      61         1023 :     json_response(StatusCode::OK, status)
      62         1023 : }
      63              : 
      64         1275 : fn get_conf(request: &Request<Body>) -> &SafeKeeperConf {
      65         1275 :     request
      66         1275 :         .data::<Arc<SafeKeeperConf>>()
      67         1275 :         .expect("unknown state type")
      68         1275 :         .as_ref()
      69         1275 : }
      70              : 
      71              : /// Same as TermLsn, but serializes LSN using display serializer
      72              : /// in Postgres format, i.e. 0/FFFFFFFF. Used only for the API response.
      73          630 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
      74              : pub struct TermSwitchApiEntry {
      75              :     pub term: Term,
      76              :     pub lsn: Lsn,
      77              : }
      78              : 
      79              : impl From<TermSwitchApiEntry> for TermLsn {
      80            2 :     fn from(api_val: TermSwitchApiEntry) -> Self {
      81            2 :         TermLsn {
      82            2 :             term: api_val.term,
      83            2 :             lsn: api_val.lsn,
      84            2 :         }
      85            2 :     }
      86              : }
      87              : 
      88              : /// Augment AcceptorState with epoch for convenience
      89          254 : #[derive(Debug, Serialize, Deserialize)]
      90              : pub struct AcceptorStateStatus {
      91              :     pub term: Term,
      92              :     pub epoch: Term,
      93              :     pub term_history: Vec<TermSwitchApiEntry>,
      94              : }
      95              : 
      96              : /// Info about timeline on safekeeper ready for reporting.
      97          252 : #[derive(Debug, Serialize, Deserialize)]
      98              : pub struct TimelineStatus {
      99              :     pub tenant_id: TenantId,
     100              :     pub timeline_id: TimelineId,
     101              :     pub acceptor_state: AcceptorStateStatus,
     102              :     pub pg_info: ServerInfo,
     103              :     pub flush_lsn: Lsn,
     104              :     pub timeline_start_lsn: Lsn,
     105              :     pub local_start_lsn: Lsn,
     106              :     pub commit_lsn: Lsn,
     107              :     pub backup_lsn: Lsn,
     108              :     pub peer_horizon_lsn: Lsn,
     109              :     pub remote_consistent_lsn: Lsn,
     110              :     pub peers: Vec<PeerInfo>,
     111              :     pub walsenders: Vec<WalSenderState>,
     112              :     pub walreceivers: Vec<WalReceiverState>,
     113              : }
     114              : 
     115         1426 : fn check_permission(request: &Request<Body>, tenant_id: Option<TenantId>) -> Result<(), ApiError> {
     116         1426 :     check_permission_with(request, |claims| {
     117           29 :         crate::auth::check_permission(claims, tenant_id)
     118         1426 :     })
     119         1426 : }
     120              : 
     121              : /// Report info about timeline.
     122          253 : async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
     123          253 :     let ttid = TenantTimelineId::new(
     124          253 :         parse_request_param(&request, "tenant_id")?,
     125          253 :         parse_request_param(&request, "timeline_id")?,
     126              :     );
     127          253 :     check_permission(&request, Some(ttid.tenant_id))?;
     128              : 
     129          252 :     let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
     130          252 :     let (inmem, state) = tli.get_state().await;
     131          252 :     let flush_lsn = tli.get_flush_lsn().await;
     132              : 
     133          252 :     let epoch = state.acceptor_state.get_epoch(flush_lsn);
     134          252 :     let term_history = state
     135          252 :         .acceptor_state
     136          252 :         .term_history
     137          252 :         .0
     138          252 :         .into_iter()
     139          628 :         .map(|ts| TermSwitchApiEntry {
     140          628 :             term: ts.term,
     141          628 :             lsn: ts.lsn,
     142          628 :         })
     143          252 :         .collect();
     144          252 :     let acc_state = AcceptorStateStatus {
     145          252 :         term: state.acceptor_state.term,
     146          252 :         epoch,
     147          252 :         term_history,
     148          252 :     };
     149          252 : 
     150          252 :     let conf = get_conf(&request);
     151              :     // Note: we report in memory values which can be lost.
     152          252 :     let status = TimelineStatus {
     153          252 :         tenant_id: ttid.tenant_id,
     154          252 :         timeline_id: ttid.timeline_id,
     155          252 :         acceptor_state: acc_state,
     156          252 :         pg_info: state.server,
     157          252 :         flush_lsn,
     158          252 :         timeline_start_lsn: state.timeline_start_lsn,
     159          252 :         local_start_lsn: state.local_start_lsn,
     160          252 :         commit_lsn: inmem.commit_lsn,
     161          252 :         backup_lsn: inmem.backup_lsn,
     162          252 :         peer_horizon_lsn: inmem.peer_horizon_lsn,
     163          252 :         remote_consistent_lsn: inmem.remote_consistent_lsn,
     164          252 :         peers: tli.get_peers(conf).await,
     165          252 :         walsenders: tli.get_walsenders().get_all(),
     166          252 :         walreceivers: tli.get_walreceivers().get_all(),
     167          252 :     };
     168          252 :     json_response(StatusCode::OK, status)
     169          253 : }
     170              : 
     171            3 : async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
     172            3 :     let request_data: TimelineCreateRequest = json_request(&mut request).await?;
     173              : 
     174            3 :     let ttid = TenantTimelineId {
     175            3 :         tenant_id: request_data.tenant_id,
     176            3 :         timeline_id: request_data.timeline_id,
     177            3 :     };
     178            3 :     check_permission(&request, Some(ttid.tenant_id))?;
     179              : 
     180            3 :     let server_info = ServerInfo {
     181            3 :         pg_version: request_data.pg_version,
     182            3 :         system_id: request_data.system_id.unwrap_or(0),
     183            3 :         wal_seg_size: request_data.wal_seg_size.unwrap_or(WAL_SEGMENT_SIZE as u32),
     184            3 :     };
     185            3 :     let local_start_lsn = request_data.local_start_lsn.unwrap_or_else(|| {
     186            3 :         request_data
     187            3 :             .commit_lsn
     188            3 :             .segment_lsn(server_info.wal_seg_size as usize)
     189            3 :     });
     190            3 :     GlobalTimelines::create(ttid, server_info, request_data.commit_lsn, local_start_lsn)
     191           15 :         .await
     192            3 :         .map_err(ApiError::InternalServerError)?;
     193              : 
     194            3 :     json_response(StatusCode::OK, ())
     195            3 : }
     196              : 
     197              : /// Pull timeline from peer safekeeper instances.
     198            1 : async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
     199            1 :     check_permission(&request, None)?;
     200              : 
     201            1 :     let data: pull_timeline::Request = json_request(&mut request).await?;
     202              : 
     203            1 :     let resp = pull_timeline::handle_request(data)
     204          111 :         .await
     205            1 :         .map_err(ApiError::InternalServerError)?;
     206            1 :     json_response(StatusCode::OK, resp)
     207            1 : }
     208              : 
     209           48 : async fn timeline_copy_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
     210           48 :     check_permission(&request, None)?;
     211              : 
     212           48 :     let request_data: TimelineCopyRequest = json_request(&mut request).await?;
     213           48 :     let ttid = TenantTimelineId::new(
     214           48 :         parse_request_param(&request, "tenant_id")?,
     215           48 :         parse_request_param(&request, "source_timeline_id")?,
     216              :     );
     217              : 
     218           48 :     let source = GlobalTimelines::get(ttid)?;
     219              : 
     220           48 :     copy_timeline::handle_request(copy_timeline::Request{
     221           48 :         source,
     222           48 :         until_lsn: request_data.until_lsn,
     223           48 :         destination_ttid: TenantTimelineId::new(ttid.tenant_id, request_data.target_timeline_id),
     224           48 :     })
     225           48 :         .instrument(info_span!("copy_timeline", from=%ttid, to=%request_data.target_timeline_id, until_lsn=%request_data.until_lsn))
     226         2683 :         .await
     227           48 :         .map_err(ApiError::InternalServerError)?;
     228              : 
     229           48 :     json_response(StatusCode::OK, ())
     230           48 : }
     231              : 
     232           64 : async fn timeline_digest_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
     233           64 :     let ttid = TenantTimelineId::new(
     234           64 :         parse_request_param(&request, "tenant_id")?,
     235           64 :         parse_request_param(&request, "timeline_id")?,
     236              :     );
     237           64 :     check_permission(&request, Some(ttid.tenant_id))?;
     238              : 
     239           64 :     let from_lsn: Option<Lsn> = parse_query_param(&request, "from_lsn")?;
     240           64 :     let until_lsn: Option<Lsn> = parse_query_param(&request, "until_lsn")?;
     241              : 
     242           64 :     let request = TimelineDigestRequest {
     243           64 :         from_lsn: from_lsn.ok_or(ApiError::BadRequest(anyhow::anyhow!(
     244           64 :             "from_lsn is required"
     245           64 :         )))?,
     246           64 :         until_lsn: until_lsn.ok_or(ApiError::BadRequest(anyhow::anyhow!(
     247           64 :             "until_lsn is required"
     248           64 :         )))?,
     249              :     };
     250              : 
     251           64 :     let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
     252              : 
     253           64 :     let response = debug_dump::calculate_digest(&tli, request)
     254         3288 :         .await
     255           64 :         .map_err(ApiError::InternalServerError)?;
     256           64 :     json_response(StatusCode::OK, response)
     257           64 : }
     258              : 
     259              : /// Download a file from the timeline directory.
     260              : // TODO: figure out a better way to copy files between safekeepers
     261            3 : async fn timeline_files_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
     262            3 :     let ttid = TenantTimelineId::new(
     263            3 :         parse_request_param(&request, "tenant_id")?,
     264            3 :         parse_request_param(&request, "timeline_id")?,
     265              :     );
     266            3 :     check_permission(&request, Some(ttid.tenant_id))?;
     267              : 
     268            3 :     let filename: String = parse_request_param(&request, "filename")?;
     269              : 
     270            3 :     let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
     271              : 
     272            3 :     let filepath = tli.timeline_dir.join(filename);
     273            3 :     let mut file = File::open(&filepath)
     274            3 :         .await
     275            3 :         .map_err(|e| ApiError::InternalServerError(e.into()))?;
     276              : 
     277            3 :     let mut content = Vec::new();
     278            3 :     // TODO: don't store files in memory
     279            3 :     file.read_to_end(&mut content)
     280           55 :         .await
     281            3 :         .map_err(|e| ApiError::InternalServerError(e.into()))?;
     282              : 
     283            3 :     Response::builder()
     284            3 :         .status(StatusCode::OK)
     285            3 :         .header("Content-Type", "application/octet-stream")
     286            3 :         .body(Body::from(content))
     287            3 :         .map_err(|e| ApiError::InternalServerError(e.into()))
     288            3 : }
     289              : 
     290              : /// Deactivates the timeline and removes its data directory.
     291           15 : async fn timeline_delete_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
     292           15 :     let ttid = TenantTimelineId::new(
     293           15 :         parse_request_param(&request, "tenant_id")?,
     294           15 :         parse_request_param(&request, "timeline_id")?,
     295              :     );
     296           15 :     let only_local = parse_query_param(&request, "only_local")?.unwrap_or(false);
     297           15 :     check_permission(&request, Some(ttid.tenant_id))?;
     298           14 :     ensure_no_body(&mut request).await?;
     299              :     // FIXME: `delete_force` can fail from both internal errors and bad requests. Add better
     300              :     // error handling here when we're able to.
     301           14 :     let resp = GlobalTimelines::delete(&ttid, only_local)
     302           30 :         .await
     303           14 :         .map_err(ApiError::InternalServerError)?;
     304           14 :     json_response(StatusCode::OK, resp)
     305           15 : }
     306              : 
     307              : /// Deactivates all timelines for the tenant and removes its data directory.
     308              : /// See `timeline_delete_handler`.
     309            5 : async fn tenant_delete_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
     310            5 :     let tenant_id = parse_request_param(&request, "tenant_id")?;
     311            5 :     let only_local = parse_query_param(&request, "only_local")?.unwrap_or(false);
     312            5 :     check_permission(&request, Some(tenant_id))?;
     313            4 :     ensure_no_body(&mut request).await?;
     314              :     // FIXME: `delete_force_all_for_tenant` can return an error for multiple different reasons;
     315              :     // Using an `InternalServerError` should be fixed when the types support it
     316            4 :     let delete_info = GlobalTimelines::delete_force_all_for_tenant(&tenant_id, only_local)
     317           16 :         .await
     318            4 :         .map_err(ApiError::InternalServerError)?;
     319            4 :     json_response(
     320            4 :         StatusCode::OK,
     321            4 :         delete_info
     322            4 :             .iter()
     323           16 :             .map(|(ttid, resp)| (format!("{}", ttid.timeline_id), *resp))
     324            4 :             .collect::<HashMap<String, TimelineDeleteForceResult>>(),
     325            4 :     )
     326            5 : }
     327              : 
     328              : /// Used only in tests to hand craft required data.
     329            4 : async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
     330            4 :     let ttid = TenantTimelineId::new(
     331            4 :         parse_request_param(&request, "tenant_id")?,
     332            4 :         parse_request_param(&request, "timeline_id")?,
     333              :     );
     334            4 :     check_permission(&request, Some(ttid.tenant_id))?;
     335            3 :     let sk_info: SkTimelineInfo = json_request(&mut request).await?;
     336            3 :     let proto_sk_info = SafekeeperTimelineInfo {
     337            3 :         safekeeper_id: 0,
     338            3 :         tenant_timeline_id: Some(ProtoTenantTimelineId {
     339            3 :             tenant_id: ttid.tenant_id.as_ref().to_owned(),
     340            3 :             timeline_id: ttid.timeline_id.as_ref().to_owned(),
     341            3 :         }),
     342            3 :         term: sk_info.term.unwrap_or(0),
     343            3 :         last_log_term: sk_info.last_log_term.unwrap_or(0),
     344            3 :         flush_lsn: sk_info.flush_lsn.0,
     345            3 :         commit_lsn: sk_info.commit_lsn.0,
     346            3 :         remote_consistent_lsn: sk_info.remote_consistent_lsn.0,
     347            3 :         peer_horizon_lsn: sk_info.peer_horizon_lsn.0,
     348            3 :         safekeeper_connstr: sk_info.safekeeper_connstr.unwrap_or_else(|| "".to_owned()),
     349            3 :         http_connstr: sk_info.http_connstr.unwrap_or_else(|| "".to_owned()),
     350            3 :         backup_lsn: sk_info.backup_lsn.0,
     351            3 :         local_start_lsn: sk_info.local_start_lsn.0,
     352            3 :         availability_zone: None,
     353            3 :         standby_horizon: sk_info.standby_horizon.0,
     354            3 :     };
     355              : 
     356            3 :     let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
     357            3 :     tli.record_safekeeper_info(proto_sk_info)
     358            9 :         .await
     359            3 :         .map_err(ApiError::InternalServerError)?;
     360              : 
     361            3 :     json_response(StatusCode::OK, ())
     362            4 : }
     363              : 
     364            9 : fn parse_kv_str<E: fmt::Display, T: FromStr<Err = E>>(k: &str, v: &str) -> Result<T, ApiError> {
     365            9 :     v.parse()
     366            9 :         .map_err(|e| ApiError::BadRequest(anyhow::anyhow!("cannot parse {k}: {e}")))
     367            9 : }
     368              : 
     369              : /// Dump debug info about all available safekeeper state.
     370            6 : async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
     371            6 :     check_permission(&request, None)?;
     372            6 :     ensure_no_body(&mut request).await?;
     373              : 
     374            6 :     let mut dump_all: Option<bool> = None;
     375            6 :     let mut dump_control_file: Option<bool> = None;
     376            6 :     let mut dump_memory: Option<bool> = None;
     377            6 :     let mut dump_disk_content: Option<bool> = None;
     378            6 :     let mut dump_term_history: Option<bool> = None;
     379            6 :     let mut tenant_id: Option<TenantId> = None;
     380            6 :     let mut timeline_id: Option<TimelineId> = None;
     381            6 : 
     382            6 :     let query = request.uri().query().unwrap_or("");
     383            6 :     let mut values = url::form_urlencoded::parse(query.as_bytes());
     384              : 
     385           15 :     for (k, v) in &mut values {
     386            9 :         match k.as_ref() {
     387            9 :             "dump_all" => dump_all = Some(parse_kv_str(&k, &v)?),
     388            4 :             "dump_control_file" => dump_control_file = Some(parse_kv_str(&k, &v)?),
     389            3 :             "dump_memory" => dump_memory = Some(parse_kv_str(&k, &v)?),
     390            3 :             "dump_disk_content" => dump_disk_content = Some(parse_kv_str(&k, &v)?),
     391            3 :             "dump_term_history" => dump_term_history = Some(parse_kv_str(&k, &v)?),
     392            3 :             "tenant_id" => tenant_id = Some(parse_kv_str(&k, &v)?),
     393            2 :             "timeline_id" => timeline_id = Some(parse_kv_str(&k, &v)?),
     394            0 :             _ => Err(ApiError::BadRequest(anyhow::anyhow!(
     395            0 :                 "Unknown query parameter: {}",
     396            0 :                 k
     397            0 :             )))?,
     398              :         }
     399              :     }
     400              : 
     401            6 :     let dump_all = dump_all.unwrap_or(false);
     402            6 :     let dump_control_file = dump_control_file.unwrap_or(dump_all);
     403            6 :     let dump_memory = dump_memory.unwrap_or(dump_all);
     404            6 :     let dump_disk_content = dump_disk_content.unwrap_or(dump_all);
     405            6 :     let dump_term_history = dump_term_history.unwrap_or(true);
     406            6 : 
     407            6 :     let args = debug_dump::Args {
     408            6 :         dump_all,
     409            6 :         dump_control_file,
     410            6 :         dump_memory,
     411            6 :         dump_disk_content,
     412            6 :         dump_term_history,
     413            6 :         tenant_id,
     414            6 :         timeline_id,
     415            6 :     };
     416              : 
     417            6 :     let resp = debug_dump::build(args)
     418            0 :         .await
     419            6 :         .map_err(ApiError::InternalServerError)?;
     420              : 
     421            6 :     let started_at = std::time::Instant::now();
     422            6 : 
     423            6 :     let (tx, rx) = mpsc::channel(1);
     424            6 : 
     425            6 :     let body = Body::wrap_stream(ReceiverStream::new(rx));
     426            6 : 
     427            6 :     let mut writer = ChannelWriter::new(128 * 1024, tx);
     428            6 : 
     429            6 :     let response = Response::builder()
     430            6 :         .status(200)
     431            6 :         .header(hyper::header::CONTENT_TYPE, "application/octet-stream")
     432            6 :         .body(body)
     433            6 :         .unwrap();
     434              : 
     435            6 :     let span = info_span!("blocking");
     436            6 :     tokio::task::spawn_blocking(move || {
     437            6 :         let _span = span.entered();
     438            6 : 
     439            6 :         let res = serde_json::to_writer(&mut writer, &resp)
     440            6 :             .map_err(std::io::Error::from)
     441            6 :             .and_then(|_| writer.flush());
     442            6 : 
     443            6 :         match res {
     444              :             Ok(()) => {
     445            6 :                 tracing::info!(
     446            6 :                     bytes = writer.flushed_bytes(),
     447            6 :                     elapsed_ms = started_at.elapsed().as_millis(),
     448            6 :                     "responded /v1/debug_dump"
     449            6 :                 );
     450              :             }
     451            0 :             Err(e) => {
     452            0 :                 tracing::warn!("failed to write out /v1/debug_dump response: {e:#}");
     453              :                 // semantics of this error are quite... unclear. we want to error the stream out to
     454              :                 // abort the response to somehow notify the client that we failed.
     455              :                 //
     456              :                 // though, most likely the reason for failure is that the receiver is already gone.
     457            0 :                 drop(
     458            0 :                     writer
     459            0 :                         .tx
     460            0 :                         .blocking_send(Err(std::io::ErrorKind::BrokenPipe.into())),
     461            0 :                 );
     462              :             }
     463              :         }
     464            6 :     });
     465            6 : 
     466            6 :     Ok(response)
     467            6 : }
     468              : 
     469            1 : async fn patch_control_file_handler(
     470            1 :     mut request: Request<Body>,
     471            1 : ) -> Result<Response<Body>, ApiError> {
     472            1 :     check_permission(&request, None)?;
     473              : 
     474            1 :     let ttid = TenantTimelineId::new(
     475            1 :         parse_request_param(&request, "tenant_id")?,
     476            1 :         parse_request_param(&request, "timeline_id")?,
     477              :     );
     478              : 
     479            1 :     let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
     480              : 
     481            1 :     let patch_request: patch_control_file::Request = json_request(&mut request).await?;
     482            1 :     let response = patch_control_file::handle_request(tli, patch_request)
     483            3 :         .await
     484            1 :         .map_err(ApiError::InternalServerError)?;
     485              : 
     486            1 :     json_response(StatusCode::OK, response)
     487            1 : }
     488              : 
     489              : /// Safekeeper http router.
     490          510 : pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError> {
     491          510 :     let mut router = endpoint::make_router();
     492          510 :     if conf.http_auth.is_some() {
     493           78 :         router = router.middleware(auth_middleware(|request| {
     494           78 :             #[allow(clippy::mutable_key_type)]
     495           78 :             static ALLOWLIST_ROUTES: Lazy<HashSet<Uri>> = Lazy::new(|| {
     496           20 :                 ["/v1/status", "/metrics"]
     497           20 :                     .iter()
     498           40 :                     .map(|v| v.parse().unwrap())
     499           20 :                     .collect()
     500           20 :             });
     501           78 :             if ALLOWLIST_ROUTES.contains(request.uri()) {
     502           44 :                 None
     503              :             } else {
     504              :                 // Option<Arc<SwappableJwtAuth>> is always provided as data below, hence unwrap().
     505           34 :                 request
     506           34 :                     .data::<Option<Arc<SwappableJwtAuth>>>()
     507           34 :                     .unwrap()
     508           34 :                     .as_deref()
     509              :             }
     510           78 :         }))
     511          490 :     }
     512              : 
     513              :     // NB: on any changes do not forget to update the OpenAPI spec
     514              :     // located nearby (/safekeeper/src/http/openapi_spec.yaml).
     515          510 :     let auth = conf.http_auth.clone();
     516          510 :     router
     517          510 :         .data(Arc::new(conf))
     518          510 :         .data(auth)
     519         1023 :         .get("/v1/status", |r| request_span(r, status_handler))
     520          510 :         .put("/v1/failpoints", |r| {
     521            1 :             request_span(r, move |r| async {
     522            1 :                 let cancel = CancellationToken::new();
     523            1 :                 failpoints_handler(r, cancel).await
     524            1 :             })
     525          510 :         })
     526          510 :         // Will be used in the future instead of implicit timeline creation
     527          510 :         .post("/v1/tenant/timeline", |r| {
     528            3 :             request_span(r, timeline_create_handler)
     529          510 :         })
     530          510 :         .get("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
     531          253 :             request_span(r, timeline_status_handler)
     532          510 :         })
     533          510 :         .delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
     534           15 :             request_span(r, timeline_delete_handler)
     535          510 :         })
     536          510 :         .delete("/v1/tenant/:tenant_id", |r| {
     537            5 :             request_span(r, tenant_delete_handler)
     538          510 :         })
     539          510 :         .post("/v1/pull_timeline", |r| {
     540            1 :             request_span(r, timeline_pull_handler)
     541          510 :         })
     542          510 :         .get(
     543          510 :             "/v1/tenant/:tenant_id/timeline/:timeline_id/file/:filename",
     544          510 :             |r| request_span(r, timeline_files_handler),
     545          510 :         )
     546          510 :         .post(
     547          510 :             "/v1/tenant/:tenant_id/timeline/:source_timeline_id/copy",
     548          510 :             |r| request_span(r, timeline_copy_handler),
     549          510 :         )
     550          510 :         .patch(
     551          510 :             "/v1/tenant/:tenant_id/timeline/:timeline_id/control_file",
     552          510 :             |r| request_span(r, patch_control_file_handler),
     553          510 :         )
     554          510 :         // for tests
     555          510 :         .post("/v1/record_safekeeper_info/:tenant_id/:timeline_id", |r| {
     556            4 :             request_span(r, record_safekeeper_info)
     557          510 :         })
     558          510 :         .get("/v1/debug_dump", |r| request_span(r, dump_debug_handler))
     559          510 :         .get("/v1/tenant/:tenant_id/timeline/:timeline_id/digest", |r| {
     560           64 :             request_span(r, timeline_digest_handler)
     561          510 :         })
     562          510 : }
     563              : 
     564              : #[cfg(test)]
     565              : mod tests {
     566              :     use super::*;
     567              : 
     568            2 :     #[test]
     569            2 :     fn test_term_switch_entry_api_serialize() {
     570            2 :         let state = AcceptorStateStatus {
     571            2 :             term: 1,
     572            2 :             epoch: 1,
     573            2 :             term_history: vec![TermSwitchApiEntry {
     574            2 :                 term: 1,
     575            2 :                 lsn: Lsn(0x16FFDDDD),
     576            2 :             }],
     577            2 :         };
     578            2 :         let json = serde_json::to_string(&state).unwrap();
     579            2 :         assert_eq!(
     580            2 :             json,
     581            2 :             "{\"term\":1,\"epoch\":1,\"term_history\":[{\"term\":1,\"lsn\":\"0/16FFDDDD\"}]}"
     582            2 :         );
     583            2 :     }
     584              : }
        

Generated by: LCOV version 2.1-beta