LCOV - differential code coverage report
Current view: top level - safekeeper/src/http - routes.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 95.8 % 385 369 16 369
Current Date: 2024-01-09 02:06:09 Functions: 69.3 % 101 70 31 70
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : use hyper::{Body, Request, Response, StatusCode, Uri};
       2                 : 
       3                 : use once_cell::sync::Lazy;
       4                 : use postgres_ffi::WAL_SEGMENT_SIZE;
       5                 : use safekeeper_api::models::{SkTimelineInfo, TimelineCopyRequest};
       6                 : use serde::{Deserialize, Serialize};
       7                 : use std::collections::{HashMap, HashSet};
       8                 : use std::fmt;
       9                 : use std::str::FromStr;
      10                 : use std::sync::Arc;
      11                 : use storage_broker::proto::SafekeeperTimelineInfo;
      12                 : use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
      13                 : use tokio::fs::File;
      14                 : use tokio::io::AsyncReadExt;
      15                 : use tokio_util::sync::CancellationToken;
      16                 : use utils::failpoint_support::failpoints_handler;
      17                 : use utils::http::request::parse_query_param;
      18                 : 
      19                 : use std::io::Write as _;
      20                 : use tokio::sync::mpsc;
      21                 : use tokio_stream::wrappers::ReceiverStream;
      22                 : use tracing::{info_span, Instrument};
      23                 : use utils::http::endpoint::{request_span, ChannelWriter};
      24                 : 
      25                 : use crate::debug_dump::TimelineDigestRequest;
      26                 : use crate::receive_wal::WalReceiverState;
      27                 : use crate::safekeeper::Term;
      28                 : use crate::safekeeper::{ServerInfo, TermLsn};
      29                 : use crate::send_wal::WalSenderState;
      30                 : use crate::timeline::PeerInfo;
      31                 : use crate::{copy_timeline, debug_dump, pull_timeline};
      32                 : 
      33                 : use crate::timelines_global_map::TimelineDeleteForceResult;
      34                 : use crate::GlobalTimelines;
      35                 : use crate::SafeKeeperConf;
      36                 : use utils::{
      37                 :     auth::SwappableJwtAuth,
      38                 :     http::{
      39                 :         endpoint::{self, auth_middleware, check_permission_with},
      40                 :         error::ApiError,
      41                 :         json::{json_request, json_response},
      42                 :         request::{ensure_no_body, parse_request_param},
      43                 :         RequestExt, RouterBuilder,
      44                 :     },
      45                 :     id::{NodeId, TenantId, TenantTimelineId, TimelineId},
      46                 :     lsn::Lsn,
      47                 : };
      48                 : 
      49                 : use super::models::TimelineCreateRequest;
      50                 : 
      51 CBC         973 : #[derive(Debug, Serialize)]
      52                 : struct SafekeeperStatus {
      53                 :     id: NodeId,
      54                 : }
      55                 : 
      56                 : /// Healthcheck handler.
      57             973 : async fn status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
      58             973 :     check_permission(&request, None)?;
      59             973 :     let conf = get_conf(&request);
      60             973 :     let status = SafekeeperStatus { id: conf.my_id };
      61             973 :     json_response(StatusCode::OK, status)
      62             973 : }
      63                 : 
      64            1220 : fn get_conf(request: &Request<Body>) -> &SafeKeeperConf {
      65            1220 :     request
      66            1220 :         .data::<Arc<SafeKeeperConf>>()
      67            1220 :         .expect("unknown state type")
      68            1220 :         .as_ref()
      69            1220 : }
      70                 : 
      71                 : /// Same as TermLsn, but serializes LSN using display serializer
      72                 : /// in Postgres format, i.e. 0/FFFFFFFF. Used only for the API response.
      73             639 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
      74                 : pub struct TermSwitchApiEntry {
      75                 :     pub term: Term,
      76                 :     pub lsn: Lsn,
      77                 : }
      78                 : 
      79                 : impl From<TermSwitchApiEntry> for TermLsn {
      80               2 :     fn from(api_val: TermSwitchApiEntry) -> Self {
      81               2 :         TermLsn {
      82               2 :             term: api_val.term,
      83               2 :             lsn: api_val.lsn,
      84               2 :         }
      85               2 :     }
      86                 : }
      87                 : 
      88                 : /// Augment AcceptorState with epoch for convenience
      89             248 : #[derive(Debug, Serialize, Deserialize)]
      90                 : pub struct AcceptorStateStatus {
      91                 :     pub term: Term,
      92                 :     pub epoch: Term,
      93                 :     pub term_history: Vec<TermSwitchApiEntry>,
      94                 : }
      95                 : 
      96                 : /// Info about timeline on safekeeper ready for reporting.
      97             247 : #[derive(Debug, Serialize, Deserialize)]
      98                 : pub struct TimelineStatus {
      99                 :     pub tenant_id: TenantId,
     100                 :     pub timeline_id: TimelineId,
     101                 :     pub acceptor_state: AcceptorStateStatus,
     102                 :     pub pg_info: ServerInfo,
     103                 :     pub flush_lsn: Lsn,
     104                 :     pub timeline_start_lsn: Lsn,
     105                 :     pub local_start_lsn: Lsn,
     106                 :     pub commit_lsn: Lsn,
     107                 :     pub backup_lsn: Lsn,
     108                 :     pub peer_horizon_lsn: Lsn,
     109                 :     pub remote_consistent_lsn: Lsn,
     110                 :     pub peers: Vec<PeerInfo>,
     111                 :     pub walsenders: Vec<WalSenderState>,
     112                 :     pub walreceivers: Vec<WalReceiverState>,
     113                 : }
     114                 : 
     115            1366 : fn check_permission(request: &Request<Body>, tenant_id: Option<TenantId>) -> Result<(), ApiError> {
     116            1366 :     check_permission_with(request, |claims| {
     117              43 :         crate::auth::check_permission(claims, tenant_id)
     118            1366 :     })
     119            1366 : }
     120                 : 
     121                 : /// Report info about timeline.
     122             248 : async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
     123             248 :     let ttid = TenantTimelineId::new(
     124             248 :         parse_request_param(&request, "tenant_id")?,
     125             248 :         parse_request_param(&request, "timeline_id")?,
     126                 :     );
     127             248 :     check_permission(&request, Some(ttid.tenant_id))?;
     128                 : 
     129             247 :     let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
     130             247 :     let (inmem, state) = tli.get_state().await;
     131             247 :     let flush_lsn = tli.get_flush_lsn().await;
     132                 : 
     133             247 :     let epoch = state.acceptor_state.get_epoch(flush_lsn);
     134             247 :     let term_history = state
     135             247 :         .acceptor_state
     136             247 :         .term_history
     137             247 :         .0
     138             247 :         .into_iter()
     139             638 :         .map(|ts| TermSwitchApiEntry {
     140             638 :             term: ts.term,
     141             638 :             lsn: ts.lsn,
     142             638 :         })
     143             247 :         .collect();
     144             247 :     let acc_state = AcceptorStateStatus {
     145             247 :         term: state.acceptor_state.term,
     146             247 :         epoch,
     147             247 :         term_history,
     148             247 :     };
     149             247 : 
     150             247 :     let conf = get_conf(&request);
     151                 :     // Note: we report in memory values which can be lost.
     152             247 :     let status = TimelineStatus {
     153             247 :         tenant_id: ttid.tenant_id,
     154             247 :         timeline_id: ttid.timeline_id,
     155             247 :         acceptor_state: acc_state,
     156             247 :         pg_info: state.server,
     157             247 :         flush_lsn,
     158             247 :         timeline_start_lsn: state.timeline_start_lsn,
     159             247 :         local_start_lsn: state.local_start_lsn,
     160             247 :         commit_lsn: inmem.commit_lsn,
     161             247 :         backup_lsn: inmem.backup_lsn,
     162             247 :         peer_horizon_lsn: inmem.peer_horizon_lsn,
     163             247 :         remote_consistent_lsn: tli.get_walsenders().get_remote_consistent_lsn(),
     164             247 :         peers: tli.get_peers(conf).await,
     165             247 :         walsenders: tli.get_walsenders().get_all(),
     166             247 :         walreceivers: tli.get_walreceivers().get_all(),
     167             247 :     };
     168             247 :     json_response(StatusCode::OK, status)
     169             248 : }
     170                 : 
     171               3 : async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
     172               3 :     let request_data: TimelineCreateRequest = json_request(&mut request).await?;
     173                 : 
     174               3 :     let ttid = TenantTimelineId {
     175               3 :         tenant_id: request_data.tenant_id,
     176               3 :         timeline_id: request_data.timeline_id,
     177               3 :     };
     178               3 :     check_permission(&request, Some(ttid.tenant_id))?;
     179                 : 
     180               3 :     let server_info = ServerInfo {
     181               3 :         pg_version: request_data.pg_version,
     182               3 :         system_id: request_data.system_id.unwrap_or(0),
     183               3 :         wal_seg_size: request_data.wal_seg_size.unwrap_or(WAL_SEGMENT_SIZE as u32),
     184               3 :     };
     185               3 :     let local_start_lsn = request_data.local_start_lsn.unwrap_or_else(|| {
     186               3 :         request_data
     187               3 :             .commit_lsn
     188               3 :             .segment_lsn(server_info.wal_seg_size as usize)
     189               3 :     });
     190               3 :     GlobalTimelines::create(ttid, server_info, request_data.commit_lsn, local_start_lsn)
     191              15 :         .await
     192               3 :         .map_err(ApiError::InternalServerError)?;
     193                 : 
     194               3 :     json_response(StatusCode::OK, ())
     195               3 : }
     196                 : 
     197                 : /// Pull timeline from peer safekeeper instances.
     198               1 : async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
     199               1 :     check_permission(&request, None)?;
     200                 : 
     201               1 :     let data: pull_timeline::Request = json_request(&mut request).await?;
     202                 : 
     203               1 :     let resp = pull_timeline::handle_request(data)
     204             112 :         .await
     205               1 :         .map_err(ApiError::InternalServerError)?;
     206               1 :     json_response(StatusCode::OK, resp)
     207               1 : }
     208                 : 
     209              48 : async fn timeline_copy_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
     210              48 :     check_permission(&request, None)?;
     211                 : 
     212              48 :     let request_data: TimelineCopyRequest = json_request(&mut request).await?;
     213              48 :     let ttid = TenantTimelineId::new(
     214              48 :         parse_request_param(&request, "tenant_id")?,
     215              48 :         parse_request_param(&request, "source_timeline_id")?,
     216                 :     );
     217                 : 
     218              48 :     let source = GlobalTimelines::get(ttid)?;
     219                 : 
     220              48 :     copy_timeline::handle_request(copy_timeline::Request{
     221              48 :         source,
     222              48 :         until_lsn: request_data.until_lsn,
     223              48 :         destination_ttid: TenantTimelineId::new(ttid.tenant_id, request_data.target_timeline_id),
     224              48 :     })
     225              48 :         .instrument(info_span!("copy_timeline", from=%ttid, to=%request_data.target_timeline_id, until_lsn=%request_data.until_lsn))
     226            2436 :         .await
     227              48 :         .map_err(ApiError::InternalServerError)?;
     228                 : 
     229              48 :     json_response(StatusCode::OK, ())
     230              48 : }
     231                 : 
     232              64 : async fn timeline_digest_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
     233              64 :     let ttid = TenantTimelineId::new(
     234              64 :         parse_request_param(&request, "tenant_id")?,
     235              64 :         parse_request_param(&request, "timeline_id")?,
     236                 :     );
     237              64 :     check_permission(&request, Some(ttid.tenant_id))?;
     238                 : 
     239              64 :     let from_lsn: Option<Lsn> = parse_query_param(&request, "from_lsn")?;
     240              64 :     let until_lsn: Option<Lsn> = parse_query_param(&request, "until_lsn")?;
     241                 : 
     242              64 :     let request = TimelineDigestRequest {
     243              64 :         from_lsn: from_lsn.ok_or(ApiError::BadRequest(anyhow::anyhow!(
     244              64 :             "from_lsn is required"
     245              64 :         )))?,
     246              64 :         until_lsn: until_lsn.ok_or(ApiError::BadRequest(anyhow::anyhow!(
     247              64 :             "until_lsn is required"
     248              64 :         )))?,
     249                 :     };
     250                 : 
     251              64 :     let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
     252                 : 
     253              64 :     let response = debug_dump::calculate_digest(&tli, request)
     254            3392 :         .await
     255              64 :         .map_err(ApiError::InternalServerError)?;
     256              64 :     json_response(StatusCode::OK, response)
     257              64 : }
     258                 : 
     259                 : /// Download a file from the timeline directory.
     260                 : // TODO: figure out a better way to copy files between safekeepers
     261               3 : async fn timeline_files_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
     262               3 :     let ttid = TenantTimelineId::new(
     263               3 :         parse_request_param(&request, "tenant_id")?,
     264               3 :         parse_request_param(&request, "timeline_id")?,
     265                 :     );
     266               3 :     check_permission(&request, Some(ttid.tenant_id))?;
     267                 : 
     268               3 :     let filename: String = parse_request_param(&request, "filename")?;
     269                 : 
     270               3 :     let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
     271                 : 
     272               3 :     let filepath = tli.timeline_dir.join(filename);
     273               3 :     let mut file = File::open(&filepath)
     274               3 :         .await
     275               3 :         .map_err(|e| ApiError::InternalServerError(e.into()))?;
     276                 : 
     277               3 :     let mut content = Vec::new();
     278               3 :     // TODO: don't store files in memory
     279               3 :     file.read_to_end(&mut content)
     280              55 :         .await
     281               3 :         .map_err(|e| ApiError::InternalServerError(e.into()))?;
     282                 : 
     283               3 :     Response::builder()
     284               3 :         .status(StatusCode::OK)
     285               3 :         .header("Content-Type", "application/octet-stream")
     286               3 :         .body(Body::from(content))
     287               3 :         .map_err(|e| ApiError::InternalServerError(e.into()))
     288               3 : }
     289                 : 
     290                 : /// Deactivates the timeline and removes its data directory.
     291              12 : async fn timeline_delete_force_handler(
     292              12 :     mut request: Request<Body>,
     293              12 : ) -> Result<Response<Body>, ApiError> {
     294              12 :     let ttid = TenantTimelineId::new(
     295              12 :         parse_request_param(&request, "tenant_id")?,
     296              12 :         parse_request_param(&request, "timeline_id")?,
     297                 :     );
     298              12 :     check_permission(&request, Some(ttid.tenant_id))?;
     299              11 :     ensure_no_body(&mut request).await?;
     300                 :     // FIXME: `delete_force` can fail from both internal errors and bad requests. Add better
     301                 :     // error handling here when we're able to.
     302              11 :     let resp = GlobalTimelines::delete_force(&ttid)
     303               9 :         .await
     304              11 :         .map_err(ApiError::InternalServerError)?;
     305              11 :     json_response(StatusCode::OK, resp)
     306              12 : }
     307                 : 
     308                 : /// Deactivates all timelines for the tenant and removes its data directory.
     309                 : /// See `timeline_delete_force_handler`.
     310               5 : async fn tenant_delete_force_handler(
     311               5 :     mut request: Request<Body>,
     312               5 : ) -> Result<Response<Body>, ApiError> {
     313               5 :     let tenant_id = parse_request_param(&request, "tenant_id")?;
     314               5 :     check_permission(&request, Some(tenant_id))?;
     315               4 :     ensure_no_body(&mut request).await?;
     316                 :     // FIXME: `delete_force_all_for_tenant` can return an error for multiple different reasons;
     317                 :     // Using an `InternalServerError` should be fixed when the types support it
     318               4 :     let delete_info = GlobalTimelines::delete_force_all_for_tenant(&tenant_id)
     319              16 :         .await
     320               4 :         .map_err(ApiError::InternalServerError)?;
     321               4 :     json_response(
     322               4 :         StatusCode::OK,
     323               4 :         delete_info
     324               4 :             .iter()
     325              16 :             .map(|(ttid, resp)| (format!("{}", ttid.timeline_id), *resp))
     326               4 :             .collect::<HashMap<String, TimelineDeleteForceResult>>(),
     327               4 :     )
     328               5 : }
     329                 : 
     330                 : /// Used only in tests to hand craft required data.
     331               4 : async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
     332               4 :     let ttid = TenantTimelineId::new(
     333               4 :         parse_request_param(&request, "tenant_id")?,
     334               4 :         parse_request_param(&request, "timeline_id")?,
     335                 :     );
     336               4 :     check_permission(&request, Some(ttid.tenant_id))?;
     337               3 :     let sk_info: SkTimelineInfo = json_request(&mut request).await?;
     338               3 :     let proto_sk_info = SafekeeperTimelineInfo {
     339               3 :         safekeeper_id: 0,
     340               3 :         tenant_timeline_id: Some(ProtoTenantTimelineId {
     341               3 :             tenant_id: ttid.tenant_id.as_ref().to_owned(),
     342               3 :             timeline_id: ttid.timeline_id.as_ref().to_owned(),
     343               3 :         }),
     344               3 :         term: sk_info.term.unwrap_or(0),
     345               3 :         last_log_term: sk_info.last_log_term.unwrap_or(0),
     346               3 :         flush_lsn: sk_info.flush_lsn.0,
     347               3 :         commit_lsn: sk_info.commit_lsn.0,
     348               3 :         remote_consistent_lsn: sk_info.remote_consistent_lsn.0,
     349               3 :         peer_horizon_lsn: sk_info.peer_horizon_lsn.0,
     350               3 :         safekeeper_connstr: sk_info.safekeeper_connstr.unwrap_or_else(|| "".to_owned()),
     351               3 :         http_connstr: sk_info.http_connstr.unwrap_or_else(|| "".to_owned()),
     352               3 :         backup_lsn: sk_info.backup_lsn.0,
     353               3 :         local_start_lsn: sk_info.local_start_lsn.0,
     354               3 :         availability_zone: None,
     355               3 :     };
     356                 : 
     357               3 :     let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
     358               3 :     tli.record_safekeeper_info(proto_sk_info)
     359               9 :         .await
     360               3 :         .map_err(ApiError::InternalServerError)?;
     361                 : 
     362               3 :     json_response(StatusCode::OK, ())
     363               4 : }
     364                 : 
     365               7 : fn parse_kv_str<E: fmt::Display, T: FromStr<Err = E>>(k: &str, v: &str) -> Result<T, ApiError> {
     366               7 :     v.parse()
     367               7 :         .map_err(|e| ApiError::BadRequest(anyhow::anyhow!("cannot parse {k}: {e}")))
     368               7 : }
     369                 : 
     370                 : /// Dump debug info about all available safekeeper state.
     371               5 : async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
     372               5 :     check_permission(&request, None)?;
     373               5 :     ensure_no_body(&mut request).await?;
     374                 : 
     375               5 :     let mut dump_all: Option<bool> = None;
     376               5 :     let mut dump_control_file: Option<bool> = None;
     377               5 :     let mut dump_memory: Option<bool> = None;
     378               5 :     let mut dump_disk_content: Option<bool> = None;
     379               5 :     let mut dump_term_history: Option<bool> = None;
     380               5 :     let mut tenant_id: Option<TenantId> = None;
     381               5 :     let mut timeline_id: Option<TimelineId> = None;
     382               5 : 
     383               5 :     let query = request.uri().query().unwrap_or("");
     384               5 :     let mut values = url::form_urlencoded::parse(query.as_bytes());
     385                 : 
     386              12 :     for (k, v) in &mut values {
     387               7 :         match k.as_ref() {
     388               7 :             "dump_all" => dump_all = Some(parse_kv_str(&k, &v)?),
     389               2 :             "dump_control_file" => dump_control_file = Some(parse_kv_str(&k, &v)?),
     390               2 :             "dump_memory" => dump_memory = Some(parse_kv_str(&k, &v)?),
     391               2 :             "dump_disk_content" => dump_disk_content = Some(parse_kv_str(&k, &v)?),
     392               2 :             "dump_term_history" => dump_term_history = Some(parse_kv_str(&k, &v)?),
     393               2 :             "tenant_id" => tenant_id = Some(parse_kv_str(&k, &v)?),
     394               1 :             "timeline_id" => timeline_id = Some(parse_kv_str(&k, &v)?),
     395 UBC           0 :             _ => Err(ApiError::BadRequest(anyhow::anyhow!(
     396               0 :                 "Unknown query parameter: {}",
     397               0 :                 k
     398               0 :             )))?,
     399                 :         }
     400                 :     }
     401                 : 
     402 CBC           5 :     let dump_all = dump_all.unwrap_or(false);
     403               5 :     let dump_control_file = dump_control_file.unwrap_or(dump_all);
     404               5 :     let dump_memory = dump_memory.unwrap_or(dump_all);
     405               5 :     let dump_disk_content = dump_disk_content.unwrap_or(dump_all);
     406               5 :     let dump_term_history = dump_term_history.unwrap_or(true);
     407               5 : 
     408               5 :     let args = debug_dump::Args {
     409               5 :         dump_all,
     410               5 :         dump_control_file,
     411               5 :         dump_memory,
     412               5 :         dump_disk_content,
     413               5 :         dump_term_history,
     414               5 :         tenant_id,
     415               5 :         timeline_id,
     416               5 :     };
     417                 : 
     418               5 :     let resp = debug_dump::build(args)
     419 UBC           0 :         .await
     420 CBC           5 :         .map_err(ApiError::InternalServerError)?;
     421                 : 
     422               5 :     let started_at = std::time::Instant::now();
     423               5 : 
     424               5 :     let (tx, rx) = mpsc::channel(1);
     425               5 : 
     426               5 :     let body = Body::wrap_stream(ReceiverStream::new(rx));
     427               5 : 
     428               5 :     let mut writer = ChannelWriter::new(128 * 1024, tx);
     429               5 : 
     430               5 :     let response = Response::builder()
     431               5 :         .status(200)
     432               5 :         .header(hyper::header::CONTENT_TYPE, "application/octet-stream")
     433               5 :         .body(body)
     434               5 :         .unwrap();
     435                 : 
     436               5 :     let span = info_span!("blocking");
     437               5 :     tokio::task::spawn_blocking(move || {
     438               5 :         let _span = span.entered();
     439               5 : 
     440               5 :         let res = serde_json::to_writer(&mut writer, &resp)
     441               5 :             .map_err(std::io::Error::from)
     442               5 :             .and_then(|_| writer.flush());
     443               5 : 
     444               5 :         match res {
     445                 :             Ok(()) => {
     446               5 :                 tracing::info!(
     447               5 :                     bytes = writer.flushed_bytes(),
     448               5 :                     elapsed_ms = started_at.elapsed().as_millis(),
     449               5 :                     "responded /v1/debug_dump"
     450               5 :                 );
     451                 :             }
     452 UBC           0 :             Err(e) => {
     453               0 :                 tracing::warn!("failed to write out /v1/debug_dump response: {e:#}");
     454                 :                 // semantics of this error are quite... unclear. we want to error the stream out to
     455                 :                 // abort the response to somehow notify the client that we failed.
     456                 :                 //
     457                 :                 // though, most likely the reason for failure is that the receiver is already gone.
     458               0 :                 drop(
     459               0 :                     writer
     460               0 :                         .tx
     461               0 :                         .blocking_send(Err(std::io::ErrorKind::BrokenPipe.into())),
     462               0 :                 );
     463                 :             }
     464                 :         }
     465 CBC           5 :     });
     466               5 : 
     467               5 :     Ok(response)
     468               5 : }
     469                 : 
     470                 : /// Safekeeper http router.
     471             485 : pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError> {
     472             485 :     let mut router = endpoint::make_router();
     473             485 :     if conf.http_auth.is_some() {
     474              92 :         router = router.middleware(auth_middleware(|request| {
     475              92 :             #[allow(clippy::mutable_key_type)]
     476              92 :             static ALLOWLIST_ROUTES: Lazy<HashSet<Uri>> = Lazy::new(|| {
     477              20 :                 ["/v1/status", "/metrics"]
     478              20 :                     .iter()
     479              40 :                     .map(|v| v.parse().unwrap())
     480              20 :                     .collect()
     481              20 :             });
     482              92 :             if ALLOWLIST_ROUTES.contains(request.uri()) {
     483              44 :                 None
     484                 :             } else {
     485                 :                 // Option<Arc<SwappableJwtAuth>> is always provided as data below, hence unwrap().
     486              48 :                 request
     487              48 :                     .data::<Option<Arc<SwappableJwtAuth>>>()
     488              48 :                     .unwrap()
     489              48 :                     .as_deref()
     490                 :             }
     491              92 :         }))
     492             465 :     }
     493                 : 
     494                 :     // NB: on any changes do not forget to update the OpenAPI spec
     495                 :     // located nearby (/safekeeper/src/http/openapi_spec.yaml).
     496             485 :     let auth = conf.http_auth.clone();
     497             485 :     router
     498             485 :         .data(Arc::new(conf))
     499             485 :         .data(auth)
     500             973 :         .get("/v1/status", |r| request_span(r, status_handler))
     501             485 :         .put("/v1/failpoints", |r| {
     502 UBC           0 :             request_span(r, move |r| async {
     503               0 :                 let cancel = CancellationToken::new();
     504               0 :                 failpoints_handler(r, cancel).await
     505               0 :             })
     506 CBC         485 :         })
     507             485 :         // Will be used in the future instead of implicit timeline creation
     508             485 :         .post("/v1/tenant/timeline", |r| {
     509               3 :             request_span(r, timeline_create_handler)
     510             485 :         })
     511             485 :         .get("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
     512             248 :             request_span(r, timeline_status_handler)
     513             485 :         })
     514             485 :         .delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
     515              12 :             request_span(r, timeline_delete_force_handler)
     516             485 :         })
     517             485 :         .delete("/v1/tenant/:tenant_id", |r| {
     518               5 :             request_span(r, tenant_delete_force_handler)
     519             485 :         })
     520             485 :         .post("/v1/pull_timeline", |r| {
     521               1 :             request_span(r, timeline_pull_handler)
     522             485 :         })
     523             485 :         .get(
     524             485 :             "/v1/tenant/:tenant_id/timeline/:timeline_id/file/:filename",
     525             485 :             |r| request_span(r, timeline_files_handler),
     526             485 :         )
     527             485 :         .post(
     528             485 :             "/v1/tenant/:tenant_id/timeline/:source_timeline_id/copy",
     529             485 :             |r| request_span(r, timeline_copy_handler),
     530             485 :         )
     531             485 :         // for tests
     532             485 :         .post("/v1/record_safekeeper_info/:tenant_id/:timeline_id", |r| {
     533               4 :             request_span(r, record_safekeeper_info)
     534             485 :         })
     535             485 :         .get("/v1/debug_dump", |r| request_span(r, dump_debug_handler))
     536             485 :         .get("/v1/tenant/:tenant_id/timeline/:timeline_id/digest", |r| {
     537              64 :             request_span(r, timeline_digest_handler)
     538             485 :         })
     539             485 : }
     540                 : 
     541                 : #[cfg(test)]
     542                 : mod tests {
     543                 :     use super::*;
     544                 : 
     545               1 :     #[test]
     546               1 :     fn test_term_switch_entry_api_serialize() {
     547               1 :         let state = AcceptorStateStatus {
     548               1 :             term: 1,
     549               1 :             epoch: 1,
     550               1 :             term_history: vec![TermSwitchApiEntry {
     551               1 :                 term: 1,
     552               1 :                 lsn: Lsn(0x16FFDDDD),
     553               1 :             }],
     554               1 :         };
     555               1 :         let json = serde_json::to_string(&state).unwrap();
     556               1 :         assert_eq!(
     557               1 :             json,
     558               1 :             "{\"term\":1,\"epoch\":1,\"term_history\":[{\"term\":1,\"lsn\":\"0/16FFDDDD\"}]}"
     559               1 :         );
     560               1 :     }
     561                 : }
        

Generated by: LCOV version 2.1-beta