LCOV - code coverage report
Current view: top level - safekeeper/src/http - routes.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 98.3 % 286 281
Test Date: 2023-09-06 10:18:01 Functions: 61.0 % 105 64

            Line data    Source code
       1              : use hyper::{Body, Request, Response, StatusCode, Uri};
       2              : 
       3              : use once_cell::sync::Lazy;
       4              : use postgres_ffi::WAL_SEGMENT_SIZE;
       5              : use safekeeper_api::models::SkTimelineInfo;
       6              : use serde::{Deserialize, Serialize};
       7              : use serde_with::{serde_as, DisplayFromStr};
       8              : use std::collections::{HashMap, HashSet};
       9              : use std::fmt;
      10              : use std::str::FromStr;
      11              : use std::sync::Arc;
      12              : use storage_broker::proto::SafekeeperTimelineInfo;
      13              : use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
      14              : use tokio::fs::File;
      15              : use tokio::io::AsyncReadExt;
      16              : use utils::http::endpoint::request_span;
      17              : 
      18              : use crate::receive_wal::WalReceiverState;
      19              : use crate::safekeeper::ServerInfo;
      20              : use crate::safekeeper::Term;
      21              : use crate::send_wal::WalSenderState;
      22              : use crate::timeline::PeerInfo;
      23              : use crate::{debug_dump, pull_timeline};
      24              : 
      25              : use crate::timelines_global_map::TimelineDeleteForceResult;
      26              : use crate::GlobalTimelines;
      27              : use crate::SafeKeeperConf;
      28              : use utils::{
      29              :     auth::JwtAuth,
      30              :     http::{
      31              :         endpoint::{self, auth_middleware, check_permission_with},
      32              :         error::ApiError,
      33              :         json::{json_request, json_response},
      34              :         request::{ensure_no_body, parse_request_param},
      35              :         RequestExt, RouterBuilder,
      36              :     },
      37              :     id::{NodeId, TenantId, TenantTimelineId, TimelineId},
      38              :     lsn::Lsn,
      39              : };
      40              : 
      41              : use super::models::TimelineCreateRequest;
      42              : 
      43         1033 : #[derive(Debug, Serialize)]
      44              : struct SafekeeperStatus {
      45              :     id: NodeId,
      46              : }
      47              : 
      48              : /// Healthcheck handler.
      49         1033 : async fn status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
      50         1033 :     check_permission(&request, None)?;
      51         1033 :     let conf = get_conf(&request);
      52         1033 :     let status = SafekeeperStatus { id: conf.my_id };
      53         1033 :     json_response(StatusCode::OK, status)
      54         1033 : }
      55              : 
      56         1232 : fn get_conf(request: &Request<Body>) -> &SafeKeeperConf {
      57         1232 :     request
      58         1232 :         .data::<Arc<SafeKeeperConf>>()
      59         1232 :         .expect("unknown state type")
      60         1232 :         .as_ref()
      61         1232 : }
      62              : 
      63              : /// Same as TermSwitchEntry, but serializes LSN using display serializer
      64              : /// in Postgres format, i.e. 0/FFFFFFFF. Used only for the API response.
      65              : #[serde_as]
      66          252 : #[derive(Debug, Serialize, Deserialize)]
      67              : pub struct TermSwitchApiEntry {
      68              :     pub term: Term,
      69              :     #[serde_as(as = "DisplayFromStr")]
      70              :     pub lsn: Lsn,
      71              : }
      72              : 
      73              : /// Augment AcceptorState with epoch for convenience
      74          200 : #[derive(Debug, Serialize, Deserialize)]
      75              : pub struct AcceptorStateStatus {
      76              :     pub term: Term,
      77              :     pub epoch: Term,
      78              :     pub term_history: Vec<TermSwitchApiEntry>,
      79              : }
      80              : 
      81              : /// Info about timeline on safekeeper ready for reporting.
      82              : #[serde_as]
      83          199 : #[derive(Debug, Serialize, Deserialize)]
      84              : pub struct TimelineStatus {
      85              :     #[serde_as(as = "DisplayFromStr")]
      86              :     pub tenant_id: TenantId,
      87              :     #[serde_as(as = "DisplayFromStr")]
      88              :     pub timeline_id: TimelineId,
      89              :     pub acceptor_state: AcceptorStateStatus,
      90              :     pub pg_info: ServerInfo,
      91              :     #[serde_as(as = "DisplayFromStr")]
      92              :     pub flush_lsn: Lsn,
      93              :     #[serde_as(as = "DisplayFromStr")]
      94              :     pub timeline_start_lsn: Lsn,
      95              :     #[serde_as(as = "DisplayFromStr")]
      96              :     pub local_start_lsn: Lsn,
      97              :     #[serde_as(as = "DisplayFromStr")]
      98              :     pub commit_lsn: Lsn,
      99              :     #[serde_as(as = "DisplayFromStr")]
     100              :     pub backup_lsn: Lsn,
     101              :     #[serde_as(as = "DisplayFromStr")]
     102              :     pub peer_horizon_lsn: Lsn,
     103              :     #[serde_as(as = "DisplayFromStr")]
     104              :     pub remote_consistent_lsn: Lsn,
     105              :     pub peers: Vec<PeerInfo>,
     106              :     pub walsenders: Vec<WalSenderState>,
     107              :     pub walreceivers: Vec<WalReceiverState>,
     108              : }
     109              : 
     110         1280 : fn check_permission(request: &Request<Body>, tenant_id: Option<TenantId>) -> Result<(), ApiError> {
     111         1280 :     check_permission_with(request, |claims| {
     112           24 :         crate::auth::check_permission(claims, tenant_id)
     113         1280 :     })
     114         1280 : }
     115              : 
     116              : /// Report info about timeline.
     117          200 : async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
     118          200 :     let ttid = TenantTimelineId::new(
     119          200 :         parse_request_param(&request, "tenant_id")?,
     120          200 :         parse_request_param(&request, "timeline_id")?,
     121              :     );
     122          200 :     check_permission(&request, Some(ttid.tenant_id))?;
     123              : 
     124          199 :     let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
     125          199 :     let (inmem, state) = tli.get_state().await;
     126          199 :     let flush_lsn = tli.get_flush_lsn().await;
     127              : 
     128          199 :     let epoch = state.acceptor_state.get_epoch(flush_lsn);
     129          199 :     let term_history = state
     130          199 :         .acceptor_state
     131          199 :         .term_history
     132          199 :         .0
     133          199 :         .into_iter()
     134          251 :         .map(|ts| TermSwitchApiEntry {
     135          251 :             term: ts.term,
     136          251 :             lsn: ts.lsn,
     137          251 :         })
     138          199 :         .collect();
     139          199 :     let acc_state = AcceptorStateStatus {
     140          199 :         term: state.acceptor_state.term,
     141          199 :         epoch,
     142          199 :         term_history,
     143          199 :     };
     144          199 : 
     145          199 :     let conf = get_conf(&request);
     146              :     // Note: we report in memory values which can be lost.
     147          199 :     let status = TimelineStatus {
     148          199 :         tenant_id: ttid.tenant_id,
     149          199 :         timeline_id: ttid.timeline_id,
     150          199 :         acceptor_state: acc_state,
     151          199 :         pg_info: state.server,
     152          199 :         flush_lsn,
     153          199 :         timeline_start_lsn: state.timeline_start_lsn,
     154          199 :         local_start_lsn: state.local_start_lsn,
     155          199 :         commit_lsn: inmem.commit_lsn,
     156          199 :         backup_lsn: inmem.backup_lsn,
     157          199 :         peer_horizon_lsn: inmem.peer_horizon_lsn,
     158          199 :         remote_consistent_lsn: tli.get_walsenders().get_remote_consistent_lsn(),
     159          199 :         peers: tli.get_peers(conf).await,
     160          199 :         walsenders: tli.get_walsenders().get_all(),
     161          199 :         walreceivers: tli.get_walreceivers().get_all(),
     162          199 :     };
     163          199 :     json_response(StatusCode::OK, status)
     164          200 : }
     165              : 
     166            9 : async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
     167            9 :     let request_data: TimelineCreateRequest = json_request(&mut request).await?;
     168              : 
     169            9 :     let ttid = TenantTimelineId {
     170            9 :         tenant_id: request_data.tenant_id,
     171            9 :         timeline_id: request_data.timeline_id,
     172            9 :     };
     173            9 :     check_permission(&request, Some(ttid.tenant_id))?;
     174              : 
     175            9 :     let server_info = ServerInfo {
     176            9 :         pg_version: request_data.pg_version,
     177            9 :         system_id: request_data.system_id.unwrap_or(0),
     178            9 :         wal_seg_size: request_data.wal_seg_size.unwrap_or(WAL_SEGMENT_SIZE as u32),
     179            9 :     };
     180            9 :     let local_start_lsn = request_data.local_start_lsn.unwrap_or_else(|| {
     181            9 :         request_data
     182            9 :             .commit_lsn
     183            9 :             .segment_lsn(server_info.wal_seg_size as usize)
     184            9 :     });
     185            9 :     GlobalTimelines::create(ttid, server_info, request_data.commit_lsn, local_start_lsn)
     186           46 :         .await
     187            9 :         .map_err(ApiError::InternalServerError)?;
     188              : 
     189            9 :     json_response(StatusCode::OK, ())
     190            9 : }
     191              : 
     192              : /// Pull timeline from peer safekeeper instances.
     193            1 : async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
     194            1 :     check_permission(&request, None)?;
     195              : 
     196            1 :     let data: pull_timeline::Request = json_request(&mut request).await?;
     197              : 
     198            1 :     let resp = pull_timeline::handle_request(data)
     199          107 :         .await
     200            1 :         .map_err(ApiError::InternalServerError)?;
     201            1 :     json_response(StatusCode::OK, resp)
     202            1 : }
     203              : 
     204              : /// Download a file from the timeline directory.
     205              : // TODO: figure out a better way to copy files between safekeepers
     206            3 : async fn timeline_files_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
     207            3 :     let ttid = TenantTimelineId::new(
     208            3 :         parse_request_param(&request, "tenant_id")?,
     209            3 :         parse_request_param(&request, "timeline_id")?,
     210              :     );
     211            3 :     check_permission(&request, Some(ttid.tenant_id))?;
     212              : 
     213            3 :     let filename: String = parse_request_param(&request, "filename")?;
     214              : 
     215            3 :     let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
     216              : 
     217            3 :     let filepath = tli.timeline_dir.join(filename);
     218            3 :     let mut file = File::open(&filepath)
     219            3 :         .await
     220            3 :         .map_err(|e| ApiError::InternalServerError(e.into()))?;
     221              : 
     222            3 :     let mut content = Vec::new();
     223            3 :     // TODO: don't store files in memory
     224            3 :     file.read_to_end(&mut content)
     225           55 :         .await
     226            3 :         .map_err(|e| ApiError::InternalServerError(e.into()))?;
     227              : 
     228            3 :     Response::builder()
     229            3 :         .status(StatusCode::OK)
     230            3 :         .header("Content-Type", "application/octet-stream")
     231            3 :         .body(Body::from(content))
     232            3 :         .map_err(|e| ApiError::InternalServerError(e.into()))
     233            3 : }
     234              : 
     235              : /// Deactivates the timeline and removes its data directory.
     236           18 : async fn timeline_delete_force_handler(
     237           18 :     mut request: Request<Body>,
     238           18 : ) -> Result<Response<Body>, ApiError> {
     239           18 :     let ttid = TenantTimelineId::new(
     240           18 :         parse_request_param(&request, "tenant_id")?,
     241           18 :         parse_request_param(&request, "timeline_id")?,
     242              :     );
     243           18 :     check_permission(&request, Some(ttid.tenant_id))?;
     244           17 :     ensure_no_body(&mut request).await?;
     245              :     // FIXME: `delete_force` can fail from both internal errors and bad requests. Add better
     246              :     // error handling here when we're able to.
     247           17 :     let resp = GlobalTimelines::delete_force(&ttid)
     248           15 :         .await
     249           17 :         .map_err(ApiError::InternalServerError)?;
     250           17 :     json_response(StatusCode::OK, resp)
     251           18 : }
     252              : 
     253              : /// Deactivates all timelines for the tenant and removes its data directory.
     254              : /// See `timeline_delete_force_handler`.
     255            5 : async fn tenant_delete_force_handler(
     256            5 :     mut request: Request<Body>,
     257            5 : ) -> Result<Response<Body>, ApiError> {
     258            5 :     let tenant_id = parse_request_param(&request, "tenant_id")?;
     259            5 :     check_permission(&request, Some(tenant_id))?;
     260            4 :     ensure_no_body(&mut request).await?;
     261              :     // FIXME: `delete_force_all_for_tenant` can return an error for multiple different reasons;
     262              :     // Using an `InternalServerError` should be fixed when the types support it
     263            4 :     let delete_info = GlobalTimelines::delete_force_all_for_tenant(&tenant_id)
     264           16 :         .await
     265            4 :         .map_err(ApiError::InternalServerError)?;
     266            4 :     json_response(
     267            4 :         StatusCode::OK,
     268            4 :         delete_info
     269            4 :             .iter()
     270           16 :             .map(|(ttid, resp)| (format!("{}", ttid.timeline_id), *resp))
     271            4 :             .collect::<HashMap<String, TimelineDeleteForceResult>>(),
     272            4 :     )
     273            5 : }
     274              : 
     275              : /// Used only in tests to hand craft required data.
     276            6 : async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
     277            6 :     let ttid = TenantTimelineId::new(
     278            6 :         parse_request_param(&request, "tenant_id")?,
     279            6 :         parse_request_param(&request, "timeline_id")?,
     280              :     );
     281            6 :     check_permission(&request, Some(ttid.tenant_id))?;
     282            5 :     let sk_info: SkTimelineInfo = json_request(&mut request).await?;
     283            5 :     let proto_sk_info = SafekeeperTimelineInfo {
     284            5 :         safekeeper_id: 0,
     285            5 :         tenant_timeline_id: Some(ProtoTenantTimelineId {
     286            5 :             tenant_id: ttid.tenant_id.as_ref().to_owned(),
     287            5 :             timeline_id: ttid.timeline_id.as_ref().to_owned(),
     288            5 :         }),
     289            5 :         term: sk_info.term.unwrap_or(0),
     290            5 :         last_log_term: sk_info.last_log_term.unwrap_or(0),
     291            5 :         flush_lsn: sk_info.flush_lsn.0,
     292            5 :         commit_lsn: sk_info.commit_lsn.0,
     293            5 :         remote_consistent_lsn: sk_info.remote_consistent_lsn.0,
     294            5 :         peer_horizon_lsn: sk_info.peer_horizon_lsn.0,
     295            5 :         safekeeper_connstr: sk_info.safekeeper_connstr.unwrap_or_else(|| "".to_owned()),
     296            5 :         http_connstr: sk_info.http_connstr.unwrap_or_else(|| "".to_owned()),
     297            5 :         backup_lsn: sk_info.backup_lsn.0,
     298            5 :         local_start_lsn: sk_info.local_start_lsn.0,
     299            5 :         availability_zone: None,
     300            5 :     };
     301              : 
     302            5 :     let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
     303            5 :     tli.record_safekeeper_info(proto_sk_info)
     304           15 :         .await
     305            5 :         .map_err(ApiError::InternalServerError)?;
     306              : 
     307            5 :     json_response(StatusCode::OK, ())
     308            6 : }
     309              : 
     310            7 : fn parse_kv_str<E: fmt::Display, T: FromStr<Err = E>>(k: &str, v: &str) -> Result<T, ApiError> {
     311            7 :     v.parse()
     312            7 :         .map_err(|e| ApiError::BadRequest(anyhow::anyhow!("cannot parse {k}: {e}")))
     313            7 : }
     314              : 
     315              : /// Dump debug info about all available safekeeper state.
     316            5 : async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
     317            5 :     check_permission(&request, None)?;
     318            5 :     ensure_no_body(&mut request).await?;
     319              : 
     320            5 :     let mut dump_all: Option<bool> = None;
     321            5 :     let mut dump_control_file: Option<bool> = None;
     322            5 :     let mut dump_memory: Option<bool> = None;
     323            5 :     let mut dump_disk_content: Option<bool> = None;
     324            5 :     let mut dump_term_history: Option<bool> = None;
     325            5 :     let mut tenant_id: Option<TenantId> = None;
     326            5 :     let mut timeline_id: Option<TimelineId> = None;
     327            5 : 
     328            5 :     let query = request.uri().query().unwrap_or("");
     329            5 :     let mut values = url::form_urlencoded::parse(query.as_bytes());
     330              : 
     331           12 :     for (k, v) in &mut values {
     332            7 :         match k.as_ref() {
     333            7 :             "dump_all" => dump_all = Some(parse_kv_str(&k, &v)?),
     334            2 :             "dump_control_file" => dump_control_file = Some(parse_kv_str(&k, &v)?),
     335            2 :             "dump_memory" => dump_memory = Some(parse_kv_str(&k, &v)?),
     336            2 :             "dump_disk_content" => dump_disk_content = Some(parse_kv_str(&k, &v)?),
     337            2 :             "dump_term_history" => dump_term_history = Some(parse_kv_str(&k, &v)?),
     338            2 :             "tenant_id" => tenant_id = Some(parse_kv_str(&k, &v)?),
     339            1 :             "timeline_id" => timeline_id = Some(parse_kv_str(&k, &v)?),
     340            0 :             _ => Err(ApiError::BadRequest(anyhow::anyhow!(
     341            0 :                 "Unknown query parameter: {}",
     342            0 :                 k
     343            0 :             )))?,
     344              :         }
     345              :     }
     346              : 
     347            5 :     let dump_all = dump_all.unwrap_or(false);
     348            5 :     let dump_control_file = dump_control_file.unwrap_or(dump_all);
     349            5 :     let dump_memory = dump_memory.unwrap_or(dump_all);
     350            5 :     let dump_disk_content = dump_disk_content.unwrap_or(dump_all);
     351            5 :     let dump_term_history = dump_term_history.unwrap_or(true);
     352            5 : 
     353            5 :     let args = debug_dump::Args {
     354            5 :         dump_all,
     355            5 :         dump_control_file,
     356            5 :         dump_memory,
     357            5 :         dump_disk_content,
     358            5 :         dump_term_history,
     359            5 :         tenant_id,
     360            5 :         timeline_id,
     361            5 :     };
     362              : 
     363            5 :     let resp = debug_dump::build(args)
     364            0 :         .await
     365            5 :         .map_err(ApiError::InternalServerError)?;
     366              : 
     367              :     // TODO: use streaming response
     368            5 :     json_response(StatusCode::OK, resp)
     369            5 : }
     370              : 
     371              : /// Safekeeper http router.
     372          517 : pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError> {
     373          517 :     let mut router = endpoint::make_router();
     374          517 :     if conf.http_auth.is_some() {
     375           69 :         router = router.middleware(auth_middleware(|request| {
     376           69 :             #[allow(clippy::mutable_key_type)]
     377           69 :             static ALLOWLIST_ROUTES: Lazy<HashSet<Uri>> =
     378           69 :                 Lazy::new(|| ["/v1/status"].iter().map(|v| v.parse().unwrap()).collect());
     379           69 :             if ALLOWLIST_ROUTES.contains(request.uri()) {
     380           40 :                 None
     381              :             } else {
     382              :                 // Option<Arc<JwtAuth>> is always provided as data below, hence unwrap().
     383           29 :                 request.data::<Option<Arc<JwtAuth>>>().unwrap().as_deref()
     384              :             }
     385           69 :         }))
     386          499 :     }
     387              : 
     388              :     // NB: on any changes do not forget to update the OpenAPI spec
     389              :     // located nearby (/safekeeper/src/http/openapi_spec.yaml).
     390          517 :     let auth = conf.http_auth.clone();
     391          517 :     router
     392          517 :         .data(Arc::new(conf))
     393          517 :         .data(auth)
     394         1033 :         .get("/v1/status", |r| request_span(r, status_handler))
     395          517 :         // Will be used in the future instead of implicit timeline creation
     396          517 :         .post("/v1/tenant/timeline", |r| {
     397            9 :             request_span(r, timeline_create_handler)
     398          517 :         })
     399          517 :         .get("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
     400          200 :             request_span(r, timeline_status_handler)
     401          517 :         })
     402          517 :         .delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
     403           18 :             request_span(r, timeline_delete_force_handler)
     404          517 :         })
     405          517 :         .delete("/v1/tenant/:tenant_id", |r| {
     406            5 :             request_span(r, tenant_delete_force_handler)
     407          517 :         })
     408          517 :         .post("/v1/pull_timeline", |r| {
     409            1 :             request_span(r, timeline_pull_handler)
     410          517 :         })
     411          517 :         .get(
     412          517 :             "/v1/tenant/:tenant_id/timeline/:timeline_id/file/:filename",
     413          517 :             |r| request_span(r, timeline_files_handler),
     414          517 :         )
     415          517 :         // for tests
     416          517 :         .post("/v1/record_safekeeper_info/:tenant_id/:timeline_id", |r| {
     417            6 :             request_span(r, record_safekeeper_info)
     418          517 :         })
     419          517 :         .get("/v1/debug_dump", |r| request_span(r, dump_debug_handler))
     420          517 : }
     421              : 
     422              : #[cfg(test)]
     423              : mod tests {
     424              :     use super::*;
     425              : 
     426            1 :     #[test]
     427            1 :     fn test_term_switch_entry_api_serialize() {
     428            1 :         let state = AcceptorStateStatus {
     429            1 :             term: 1,
     430            1 :             epoch: 1,
     431            1 :             term_history: vec![TermSwitchApiEntry {
     432            1 :                 term: 1,
     433            1 :                 lsn: Lsn(0x16FFDDDD),
     434            1 :             }],
     435            1 :         };
     436            1 :         let json = serde_json::to_string(&state).unwrap();
     437            1 :         assert_eq!(
     438            1 :             json,
     439            1 :             "{\"term\":1,\"epoch\":1,\"term_history\":[{\"term\":1,\"lsn\":\"0/16FFDDDD\"}]}"
     440            1 :         );
     441            1 :     }
     442              : }
        

Generated by: LCOV version 2.1-beta