LCOV - differential code coverage report
Current view: top level - safekeeper/src/http - routes.rs (source / functions) Coverage Total Hit UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 98.3 % 290 285 5 285
Current Date: 2023-10-19 02:04:12 Functions: 61.0 % 105 64 41 64
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : use hyper::{Body, Request, Response, StatusCode, Uri};
       2                 : 
       3                 : use once_cell::sync::Lazy;
       4                 : use postgres_ffi::WAL_SEGMENT_SIZE;
       5                 : use safekeeper_api::models::SkTimelineInfo;
       6                 : use serde::{Deserialize, Serialize};
       7                 : use serde_with::{serde_as, DisplayFromStr};
       8                 : use std::collections::{HashMap, HashSet};
       9                 : use std::fmt;
      10                 : use std::str::FromStr;
      11                 : use std::sync::Arc;
      12                 : use storage_broker::proto::SafekeeperTimelineInfo;
      13                 : use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
      14                 : use tokio::fs::File;
      15                 : use tokio::io::AsyncReadExt;
      16                 : use utils::http::endpoint::request_span;
      17                 : 
      18                 : use crate::receive_wal::WalReceiverState;
      19                 : use crate::safekeeper::ServerInfo;
      20                 : use crate::safekeeper::Term;
      21                 : use crate::send_wal::WalSenderState;
      22                 : use crate::timeline::PeerInfo;
      23                 : use crate::{debug_dump, pull_timeline};
      24                 : 
      25                 : use crate::timelines_global_map::TimelineDeleteForceResult;
      26                 : use crate::GlobalTimelines;
      27                 : use crate::SafeKeeperConf;
      28                 : use utils::{
      29                 :     auth::JwtAuth,
      30                 :     http::{
      31                 :         endpoint::{self, auth_middleware, check_permission_with},
      32                 :         error::ApiError,
      33                 :         json::{json_request, json_response},
      34                 :         request::{ensure_no_body, parse_request_param},
      35                 :         RequestExt, RouterBuilder,
      36                 :     },
      37                 :     id::{NodeId, TenantId, TenantTimelineId, TimelineId},
      38                 :     lsn::Lsn,
      39                 : };
      40                 : 
      41                 : use super::models::TimelineCreateRequest;
      42                 : 
      43 CBC         997 : #[derive(Debug, Serialize)]
      44                 : struct SafekeeperStatus {
      45                 :     id: NodeId,
      46                 : }
      47                 : 
      48                 : /// Healthcheck handler.
      49             997 : async fn status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
      50             997 :     check_permission(&request, None)?;
      51             997 :     let conf = get_conf(&request);
      52             997 :     let status = SafekeeperStatus { id: conf.my_id };
      53             997 :     json_response(StatusCode::OK, status)
      54             997 : }
      55                 : 
      56            1209 : fn get_conf(request: &Request<Body>) -> &SafeKeeperConf {
      57            1209 :     request
      58            1209 :         .data::<Arc<SafeKeeperConf>>()
      59            1209 :         .expect("unknown state type")
      60            1209 :         .as_ref()
      61            1209 : }
      62                 : 
      63                 : /// Same as TermSwitchEntry, but serializes LSN using display serializer
      64                 : /// in Postgres format, i.e. 0/FFFFFFFF. Used only for the API response.
      65                 : #[serde_as]
      66             293 : #[derive(Debug, Serialize, Deserialize)]
      67                 : pub struct TermSwitchApiEntry {
      68                 :     pub term: Term,
      69                 :     #[serde_as(as = "DisplayFromStr")]
      70                 :     pub lsn: Lsn,
      71                 : }
      72                 : 
      73                 : /// Augment AcceptorState with epoch for convenience
      74             213 : #[derive(Debug, Serialize, Deserialize)]
      75                 : pub struct AcceptorStateStatus {
      76                 :     pub term: Term,
      77                 :     pub epoch: Term,
      78                 :     pub term_history: Vec<TermSwitchApiEntry>,
      79                 : }
      80                 : 
      81                 : /// Info about timeline on safekeeper ready for reporting.
      82                 : #[serde_as]
      83             212 : #[derive(Debug, Serialize, Deserialize)]
      84                 : pub struct TimelineStatus {
      85                 :     #[serde_as(as = "DisplayFromStr")]
      86                 :     pub tenant_id: TenantId,
      87                 :     #[serde_as(as = "DisplayFromStr")]
      88                 :     pub timeline_id: TimelineId,
      89                 :     pub acceptor_state: AcceptorStateStatus,
      90                 :     pub pg_info: ServerInfo,
      91                 :     #[serde_as(as = "DisplayFromStr")]
      92                 :     pub flush_lsn: Lsn,
      93                 :     #[serde_as(as = "DisplayFromStr")]
      94                 :     pub timeline_start_lsn: Lsn,
      95                 :     #[serde_as(as = "DisplayFromStr")]
      96                 :     pub local_start_lsn: Lsn,
      97                 :     #[serde_as(as = "DisplayFromStr")]
      98                 :     pub commit_lsn: Lsn,
      99                 :     #[serde_as(as = "DisplayFromStr")]
     100                 :     pub backup_lsn: Lsn,
     101                 :     #[serde_as(as = "DisplayFromStr")]
     102                 :     pub peer_horizon_lsn: Lsn,
     103                 :     #[serde_as(as = "DisplayFromStr")]
     104                 :     pub remote_consistent_lsn: Lsn,
     105                 :     pub peers: Vec<PeerInfo>,
     106                 :     pub walsenders: Vec<WalSenderState>,
     107                 :     pub walreceivers: Vec<WalReceiverState>,
     108                 : }
     109                 : 
     110            1257 : fn check_permission(request: &Request<Body>, tenant_id: Option<TenantId>) -> Result<(), ApiError> {
     111            1257 :     check_permission_with(request, |claims| {
     112              26 :         crate::auth::check_permission(claims, tenant_id)
     113            1257 :     })
     114            1257 : }
     115                 : 
     116                 : /// Report info about timeline.
     117             213 : async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
     118             213 :     let ttid = TenantTimelineId::new(
     119             213 :         parse_request_param(&request, "tenant_id")?,
     120             213 :         parse_request_param(&request, "timeline_id")?,
     121                 :     );
     122             213 :     check_permission(&request, Some(ttid.tenant_id))?;
     123                 : 
     124             212 :     let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
     125             212 :     let (inmem, state) = tli.get_state().await;
     126             212 :     let flush_lsn = tli.get_flush_lsn().await;
     127                 : 
     128             212 :     let epoch = state.acceptor_state.get_epoch(flush_lsn);
     129             212 :     let term_history = state
     130             212 :         .acceptor_state
     131             212 :         .term_history
     132             212 :         .0
     133             212 :         .into_iter()
     134             292 :         .map(|ts| TermSwitchApiEntry {
     135             292 :             term: ts.term,
     136             292 :             lsn: ts.lsn,
     137             292 :         })
     138             212 :         .collect();
     139             212 :     let acc_state = AcceptorStateStatus {
     140             212 :         term: state.acceptor_state.term,
     141             212 :         epoch,
     142             212 :         term_history,
     143             212 :     };
     144             212 : 
     145             212 :     let conf = get_conf(&request);
     146                 :     // Note: we report in memory values which can be lost.
     147             212 :     let status = TimelineStatus {
     148             212 :         tenant_id: ttid.tenant_id,
     149             212 :         timeline_id: ttid.timeline_id,
     150             212 :         acceptor_state: acc_state,
     151             212 :         pg_info: state.server,
     152             212 :         flush_lsn,
     153             212 :         timeline_start_lsn: state.timeline_start_lsn,
     154             212 :         local_start_lsn: state.local_start_lsn,
     155             212 :         commit_lsn: inmem.commit_lsn,
     156             212 :         backup_lsn: inmem.backup_lsn,
     157             212 :         peer_horizon_lsn: inmem.peer_horizon_lsn,
     158             212 :         remote_consistent_lsn: tli.get_walsenders().get_remote_consistent_lsn(),
     159             212 :         peers: tli.get_peers(conf).await,
     160             212 :         walsenders: tli.get_walsenders().get_all(),
     161             212 :         walreceivers: tli.get_walreceivers().get_all(),
     162             212 :     };
     163             212 :     json_response(StatusCode::OK, status)
     164             213 : }
     165                 : 
     166               9 : async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
     167               9 :     let request_data: TimelineCreateRequest = json_request(&mut request).await?;
     168                 : 
     169               9 :     let ttid = TenantTimelineId {
     170               9 :         tenant_id: request_data.tenant_id,
     171               9 :         timeline_id: request_data.timeline_id,
     172               9 :     };
     173               9 :     check_permission(&request, Some(ttid.tenant_id))?;
     174                 : 
     175               9 :     let server_info = ServerInfo {
     176               9 :         pg_version: request_data.pg_version,
     177               9 :         system_id: request_data.system_id.unwrap_or(0),
     178               9 :         wal_seg_size: request_data.wal_seg_size.unwrap_or(WAL_SEGMENT_SIZE as u32),
     179               9 :     };
     180               9 :     let local_start_lsn = request_data.local_start_lsn.unwrap_or_else(|| {
     181               9 :         request_data
     182               9 :             .commit_lsn
     183               9 :             .segment_lsn(server_info.wal_seg_size as usize)
     184               9 :     });
     185               9 :     GlobalTimelines::create(ttid, server_info, request_data.commit_lsn, local_start_lsn)
     186              45 :         .await
     187               9 :         .map_err(ApiError::InternalServerError)?;
     188                 : 
     189               9 :     json_response(StatusCode::OK, ())
     190               9 : }
     191                 : 
     192                 : /// Pull timeline from peer safekeeper instances.
     193               1 : async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
     194               1 :     check_permission(&request, None)?;
     195                 : 
     196               1 :     let data: pull_timeline::Request = json_request(&mut request).await?;
     197                 : 
     198               1 :     let resp = pull_timeline::handle_request(data)
     199             113 :         .await
     200               1 :         .map_err(ApiError::InternalServerError)?;
     201               1 :     json_response(StatusCode::OK, resp)
     202               1 : }
     203                 : 
     204                 : /// Download a file from the timeline directory.
     205                 : // TODO: figure out a better way to copy files between safekeepers
     206               3 : async fn timeline_files_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
     207               3 :     let ttid = TenantTimelineId::new(
     208               3 :         parse_request_param(&request, "tenant_id")?,
     209               3 :         parse_request_param(&request, "timeline_id")?,
     210                 :     );
     211               3 :     check_permission(&request, Some(ttid.tenant_id))?;
     212                 : 
     213               3 :     let filename: String = parse_request_param(&request, "filename")?;
     214                 : 
     215               3 :     let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
     216                 : 
     217               3 :     let filepath = tli.timeline_dir.join(filename);
     218               3 :     let mut file = File::open(&filepath)
     219               3 :         .await
     220               3 :         .map_err(|e| ApiError::InternalServerError(e.into()))?;
     221                 : 
     222               3 :     let mut content = Vec::new();
     223               3 :     // TODO: don't store files in memory
     224               3 :     file.read_to_end(&mut content)
     225              55 :         .await
     226               3 :         .map_err(|e| ApiError::InternalServerError(e.into()))?;
     227                 : 
     228               3 :     Response::builder()
     229               3 :         .status(StatusCode::OK)
     230               3 :         .header("Content-Type", "application/octet-stream")
     231               3 :         .body(Body::from(content))
     232               3 :         .map_err(|e| ApiError::InternalServerError(e.into()))
     233               3 : }
     234                 : 
     235                 : /// Deactivates the timeline and removes its data directory.
     236              18 : async fn timeline_delete_force_handler(
     237              18 :     mut request: Request<Body>,
     238              18 : ) -> Result<Response<Body>, ApiError> {
     239              18 :     let ttid = TenantTimelineId::new(
     240              18 :         parse_request_param(&request, "tenant_id")?,
     241              18 :         parse_request_param(&request, "timeline_id")?,
     242                 :     );
     243              18 :     check_permission(&request, Some(ttid.tenant_id))?;
     244              17 :     ensure_no_body(&mut request).await?;
     245                 :     // FIXME: `delete_force` can fail from both internal errors and bad requests. Add better
     246                 :     // error handling here when we're able to.
     247              17 :     let resp = GlobalTimelines::delete_force(&ttid)
     248              15 :         .await
     249              17 :         .map_err(ApiError::InternalServerError)?;
     250              17 :     json_response(StatusCode::OK, resp)
     251              18 : }
     252                 : 
     253                 : /// Deactivates all timelines for the tenant and removes its data directory.
     254                 : /// See `timeline_delete_force_handler`.
     255               5 : async fn tenant_delete_force_handler(
     256               5 :     mut request: Request<Body>,
     257               5 : ) -> Result<Response<Body>, ApiError> {
     258               5 :     let tenant_id = parse_request_param(&request, "tenant_id")?;
     259               5 :     check_permission(&request, Some(tenant_id))?;
     260               4 :     ensure_no_body(&mut request).await?;
     261                 :     // FIXME: `delete_force_all_for_tenant` can return an error for multiple different reasons;
     262                 :     // Using an `InternalServerError` should be fixed when the types support it
     263               4 :     let delete_info = GlobalTimelines::delete_force_all_for_tenant(&tenant_id)
     264              16 :         .await
     265               4 :         .map_err(ApiError::InternalServerError)?;
     266               4 :     json_response(
     267               4 :         StatusCode::OK,
     268               4 :         delete_info
     269               4 :             .iter()
     270              16 :             .map(|(ttid, resp)| (format!("{}", ttid.timeline_id), *resp))
     271               4 :             .collect::<HashMap<String, TimelineDeleteForceResult>>(),
     272               4 :     )
     273               5 : }
     274                 : 
     275                 : /// Used only in tests to hand craft required data.
     276               6 : async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
     277               6 :     let ttid = TenantTimelineId::new(
     278               6 :         parse_request_param(&request, "tenant_id")?,
     279               6 :         parse_request_param(&request, "timeline_id")?,
     280                 :     );
     281               6 :     check_permission(&request, Some(ttid.tenant_id))?;
     282               5 :     let sk_info: SkTimelineInfo = json_request(&mut request).await?;
     283               5 :     let proto_sk_info = SafekeeperTimelineInfo {
     284               5 :         safekeeper_id: 0,
     285               5 :         tenant_timeline_id: Some(ProtoTenantTimelineId {
     286               5 :             tenant_id: ttid.tenant_id.as_ref().to_owned(),
     287               5 :             timeline_id: ttid.timeline_id.as_ref().to_owned(),
     288               5 :         }),
     289               5 :         term: sk_info.term.unwrap_or(0),
     290               5 :         last_log_term: sk_info.last_log_term.unwrap_or(0),
     291               5 :         flush_lsn: sk_info.flush_lsn.0,
     292               5 :         commit_lsn: sk_info.commit_lsn.0,
     293               5 :         remote_consistent_lsn: sk_info.remote_consistent_lsn.0,
     294               5 :         peer_horizon_lsn: sk_info.peer_horizon_lsn.0,
     295               5 :         safekeeper_connstr: sk_info.safekeeper_connstr.unwrap_or_else(|| "".to_owned()),
     296               5 :         http_connstr: sk_info.http_connstr.unwrap_or_else(|| "".to_owned()),
     297               5 :         backup_lsn: sk_info.backup_lsn.0,
     298               5 :         local_start_lsn: sk_info.local_start_lsn.0,
     299               5 :         availability_zone: None,
     300               5 :     };
     301                 : 
     302               5 :     let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
     303               5 :     tli.record_safekeeper_info(proto_sk_info)
     304              15 :         .await
     305               5 :         .map_err(ApiError::InternalServerError)?;
     306                 : 
     307               5 :     json_response(StatusCode::OK, ())
     308               6 : }
     309                 : 
     310               7 : fn parse_kv_str<E: fmt::Display, T: FromStr<Err = E>>(k: &str, v: &str) -> Result<T, ApiError> {
     311               7 :     v.parse()
     312               7 :         .map_err(|e| ApiError::BadRequest(anyhow::anyhow!("cannot parse {k}: {e}")))
     313               7 : }
     314                 : 
     315                 : /// Dump debug info about all available safekeeper state.
     316               5 : async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
     317               5 :     check_permission(&request, None)?;
     318               5 :     ensure_no_body(&mut request).await?;
     319                 : 
     320               5 :     let mut dump_all: Option<bool> = None;
     321               5 :     let mut dump_control_file: Option<bool> = None;
     322               5 :     let mut dump_memory: Option<bool> = None;
     323               5 :     let mut dump_disk_content: Option<bool> = None;
     324               5 :     let mut dump_term_history: Option<bool> = None;
     325               5 :     let mut tenant_id: Option<TenantId> = None;
     326               5 :     let mut timeline_id: Option<TimelineId> = None;
     327               5 : 
     328               5 :     let query = request.uri().query().unwrap_or("");
     329               5 :     let mut values = url::form_urlencoded::parse(query.as_bytes());
     330                 : 
     331              12 :     for (k, v) in &mut values {
     332               7 :         match k.as_ref() {
     333               7 :             "dump_all" => dump_all = Some(parse_kv_str(&k, &v)?),
     334               2 :             "dump_control_file" => dump_control_file = Some(parse_kv_str(&k, &v)?),
     335               2 :             "dump_memory" => dump_memory = Some(parse_kv_str(&k, &v)?),
     336               2 :             "dump_disk_content" => dump_disk_content = Some(parse_kv_str(&k, &v)?),
     337               2 :             "dump_term_history" => dump_term_history = Some(parse_kv_str(&k, &v)?),
     338               2 :             "tenant_id" => tenant_id = Some(parse_kv_str(&k, &v)?),
     339               1 :             "timeline_id" => timeline_id = Some(parse_kv_str(&k, &v)?),
     340 UBC           0 :             _ => Err(ApiError::BadRequest(anyhow::anyhow!(
     341               0 :                 "Unknown query parameter: {}",
     342               0 :                 k
     343               0 :             )))?,
     344                 :         }
     345                 :     }
     346                 : 
     347 CBC           5 :     let dump_all = dump_all.unwrap_or(false);
     348               5 :     let dump_control_file = dump_control_file.unwrap_or(dump_all);
     349               5 :     let dump_memory = dump_memory.unwrap_or(dump_all);
     350               5 :     let dump_disk_content = dump_disk_content.unwrap_or(dump_all);
     351               5 :     let dump_term_history = dump_term_history.unwrap_or(true);
     352               5 : 
     353               5 :     let args = debug_dump::Args {
     354               5 :         dump_all,
     355               5 :         dump_control_file,
     356               5 :         dump_memory,
     357               5 :         dump_disk_content,
     358               5 :         dump_term_history,
     359               5 :         tenant_id,
     360               5 :         timeline_id,
     361               5 :     };
     362                 : 
     363               5 :     let resp = debug_dump::build(args)
     364 UBC           0 :         .await
     365 CBC           5 :         .map_err(ApiError::InternalServerError)?;
     366                 : 
     367                 :     // TODO: use streaming response
     368               5 :     json_response(StatusCode::OK, resp)
     369               5 : }
     370                 : 
     371                 : /// Safekeeper http router.
     372             500 : pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError> {
     373             500 :     let mut router = endpoint::make_router();
     374             500 :     if conf.http_auth.is_some() {
     375              71 :         router = router.middleware(auth_middleware(|request| {
     376              71 :             #[allow(clippy::mutable_key_type)]
     377              71 :             static ALLOWLIST_ROUTES: Lazy<HashSet<Uri>> = Lazy::new(|| {
     378              18 :                 ["/v1/status", "/metrics"]
     379              18 :                     .iter()
     380              36 :                     .map(|v| v.parse().unwrap())
     381              18 :                     .collect()
     382              18 :             });
     383              71 :             if ALLOWLIST_ROUTES.contains(request.uri()) {
     384              40 :                 None
     385                 :             } else {
     386                 :                 // Option<Arc<JwtAuth>> is always provided as data below, hence unwrap().
     387              31 :                 request.data::<Option<Arc<JwtAuth>>>().unwrap().as_deref()
     388                 :             }
     389              71 :         }))
     390             482 :     }
     391                 : 
     392                 :     // NB: on any changes do not forget to update the OpenAPI spec
     393                 :     // located nearby (/safekeeper/src/http/openapi_spec.yaml).
     394             500 :     let auth = conf.http_auth.clone();
     395             500 :     router
     396             500 :         .data(Arc::new(conf))
     397             500 :         .data(auth)
     398             997 :         .get("/v1/status", |r| request_span(r, status_handler))
     399             500 :         // Will be used in the future instead of implicit timeline creation
     400             500 :         .post("/v1/tenant/timeline", |r| {
     401               9 :             request_span(r, timeline_create_handler)
     402             500 :         })
     403             500 :         .get("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
     404             213 :             request_span(r, timeline_status_handler)
     405             500 :         })
     406             500 :         .delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
     407              18 :             request_span(r, timeline_delete_force_handler)
     408             500 :         })
     409             500 :         .delete("/v1/tenant/:tenant_id", |r| {
     410               5 :             request_span(r, tenant_delete_force_handler)
     411             500 :         })
     412             500 :         .post("/v1/pull_timeline", |r| {
     413               1 :             request_span(r, timeline_pull_handler)
     414             500 :         })
     415             500 :         .get(
     416             500 :             "/v1/tenant/:tenant_id/timeline/:timeline_id/file/:filename",
     417             500 :             |r| request_span(r, timeline_files_handler),
     418             500 :         )
     419             500 :         // for tests
     420             500 :         .post("/v1/record_safekeeper_info/:tenant_id/:timeline_id", |r| {
     421               6 :             request_span(r, record_safekeeper_info)
     422             500 :         })
     423             500 :         .get("/v1/debug_dump", |r| request_span(r, dump_debug_handler))
     424             500 : }
     425                 : 
     426                 : #[cfg(test)]
     427                 : mod tests {
     428                 :     use super::*;
     429                 : 
     430               1 :     #[test]
     431               1 :     fn test_term_switch_entry_api_serialize() {
     432               1 :         let state = AcceptorStateStatus {
     433               1 :             term: 1,
     434               1 :             epoch: 1,
     435               1 :             term_history: vec![TermSwitchApiEntry {
     436               1 :                 term: 1,
     437               1 :                 lsn: Lsn(0x16FFDDDD),
     438               1 :             }],
     439               1 :         };
     440               1 :         let json = serde_json::to_string(&state).unwrap();
     441               1 :         assert_eq!(
     442               1 :             json,
     443               1 :             "{\"term\":1,\"epoch\":1,\"term_history\":[{\"term\":1,\"lsn\":\"0/16FFDDDD\"}]}"
     444               1 :         );
     445               1 :     }
     446                 : }
        

Generated by: LCOV version 2.1-beta