LCOV - code coverage report
Current view: top level - proxy/src - http.rs (source / functions) Coverage Total Hit
Test: 02e8c57acd6e2b986849f552ca30280d54699b79.info Lines: 50.5 % 101 51
Test Date: 2024-06-26 17:13:54 Functions: 37.5 % 16 6

            Line data    Source code
       1              : //! HTTP client and server impls.
       2              : //! Other modules should use stuff from this module instead of
       3              : //! directly relying on deps like `reqwest` (think loose coupling).
       4              : 
       5              : pub mod health_server;
       6              : 
       7              : use std::{str::FromStr, sync::Arc, time::Duration};
       8              : 
       9              : use futures::FutureExt;
      10              : pub use reqwest::{Request, Response, StatusCode};
      11              : pub use reqwest_middleware::{ClientWithMiddleware, Error};
      12              : pub use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
      13              : use tokio::time::Instant;
      14              : use tracing::trace;
      15              : 
      16              : use crate::{
      17              :     metrics::{ConsoleRequest, Metrics},
      18              :     url::ApiUrl,
      19              : };
      20              : use reqwest_middleware::RequestBuilder;
      21              : 
      22              : /// This is the preferred way to create new http clients,
      23              : /// because it takes care of observability (OpenTelemetry).
      24              : /// We deliberately don't want to replace this with a public static.
      25            2 : pub fn new_client() -> ClientWithMiddleware {
      26            2 :     let client = reqwest::ClientBuilder::new()
      27            2 :         .dns_resolver(Arc::new(GaiResolver::default()))
      28            2 :         .connection_verbose(true)
      29            2 :         .build()
      30            2 :         .expect("Failed to create http client");
      31            2 : 
      32            2 :     reqwest_middleware::ClientBuilder::new(client)
      33            2 :         .with(reqwest_tracing::TracingMiddleware::default())
      34            2 :         .build()
      35            2 : }
      36              : 
      37            0 : pub fn new_client_with_timeout(default_timout: Duration) -> ClientWithMiddleware {
      38            0 :     let timeout_client = reqwest::ClientBuilder::new()
      39            0 :         .dns_resolver(Arc::new(GaiResolver::default()))
      40            0 :         .connection_verbose(true)
      41            0 :         .timeout(default_timout)
      42            0 :         .build()
      43            0 :         .expect("Failed to create http client with timeout");
      44            0 : 
      45            0 :     let retry_policy =
      46            0 :         ExponentialBackoff::builder().build_with_total_retry_duration(default_timout);
      47            0 : 
      48            0 :     reqwest_middleware::ClientBuilder::new(timeout_client)
      49            0 :         .with(reqwest_tracing::TracingMiddleware::default())
      50            0 :         // As per docs, "This middleware always errors when given requests with streaming bodies".
      51            0 :         // That's all right because we only use this client to send `serde_json::RawValue`, which
      52            0 :         // is not a stream.
      53            0 :         //
      54            0 :         // ex-maintainer note:
      55            0 :         // this limitation can be fixed if streaming is necessary.
      56            0 :         // retries will still not be performed, but it wont error immediately
      57            0 :         .with(RetryTransientMiddleware::new_with_policy(retry_policy))
      58            0 :         .build()
      59            0 : }
      60              : 
      61              : /// Thin convenience wrapper for an API provided by an http endpoint.
      62              : #[derive(Debug, Clone)]
      63              : pub struct Endpoint {
      64              :     /// API's base URL.
      65              :     endpoint: ApiUrl,
      66              :     /// Connection manager with built-in pooling.
      67              :     client: ClientWithMiddleware,
      68              : }
      69              : 
      70              : impl Endpoint {
      71              :     /// Construct a new HTTP endpoint wrapper.
      72              :     /// Http client is not constructed under the hood so that it can be shared.
      73            4 :     pub fn new(endpoint: ApiUrl, client: impl Into<ClientWithMiddleware>) -> Self {
      74            4 :         Self {
      75            4 :             endpoint,
      76            4 :             client: client.into(),
      77            4 :         }
      78            4 :     }
      79              : 
      80              :     #[inline(always)]
      81            0 :     pub fn url(&self) -> &ApiUrl {
      82            0 :         &self.endpoint
      83            0 :     }
      84              : 
      85              :     /// Return a [builder](RequestBuilder) for a `GET` request,
      86              :     /// appending a single `path` segment to the base endpoint URL.
      87            4 :     pub fn get(&self, path: &str) -> RequestBuilder {
      88            4 :         let mut url = self.endpoint.clone();
      89            4 :         url.path_segments_mut().push(path);
      90            4 :         self.client.get(url.into_inner())
      91            4 :     }
      92              : 
      93              :     /// Execute a [request](reqwest::Request).
      94            0 :     pub async fn execute(&self, request: Request) -> Result<Response, Error> {
      95            0 :         let _timer = Metrics::get()
      96            0 :             .proxy
      97            0 :             .console_request_latency
      98            0 :             .start_timer(ConsoleRequest {
      99            0 :                 request: request.url().path(),
     100            0 :             });
     101            0 : 
     102            0 :         self.client.execute(request).await
     103            0 :     }
     104              : }
     105              : 
     106              : use hyper_util::client::legacy::connect::dns::{
     107              :     GaiResolver as HyperGaiResolver, Name as HyperName,
     108              : };
     109              : use reqwest::dns::{Addrs, Name, Resolve, Resolving};
     110              : /// https://docs.rs/reqwest/0.11.18/src/reqwest/dns/gai.rs.html
     111              : use tower_service::Service;
     112              : #[derive(Debug)]
     113              : pub struct GaiResolver(HyperGaiResolver);
     114              : 
     115              : impl Default for GaiResolver {
     116            2 :     fn default() -> Self {
     117            2 :         Self(HyperGaiResolver::new())
     118            2 :     }
     119              : }
     120              : 
     121              : impl Resolve for GaiResolver {
     122            0 :     fn resolve(&self, name: Name) -> Resolving {
     123            0 :         let this = &mut self.0.clone();
     124            0 :         let hyper_name = HyperName::from_str(name.as_str()).expect("name should be valid");
     125            0 :         let start = Instant::now();
     126            0 :         Box::pin(
     127            0 :             Service::<HyperName>::call(this, hyper_name).map(move |result| {
     128            0 :                 let resolve_duration = start.elapsed();
     129            0 :                 trace!(duration = ?resolve_duration, addr = %name.as_str(), "resolve host complete");
     130            0 :                 result
     131            0 :                     .map(|addrs| -> Addrs { Box::new(addrs) })
     132            0 :                     .map_err(|err| -> Box<dyn std::error::Error + Send + Sync> { Box::new(err) })
     133            0 :             }),
     134            0 :         )
     135            0 :     }
     136              : }
     137              : 
     138              : #[cfg(test)]
     139              : mod tests {
     140              :     use super::*;
     141              :     use reqwest::Client;
     142              : 
     143              :     #[test]
     144            2 :     fn optional_query_params() -> anyhow::Result<()> {
     145            2 :         let url = "http://example.com".parse()?;
     146            2 :         let endpoint = Endpoint::new(url, Client::new());
     147              : 
     148              :         // Validate that this pattern makes sense.
     149            2 :         let req = endpoint
     150            2 :             .get("frobnicate")
     151            2 :             .query(&[
     152            2 :                 ("foo", Some("10")), // should be just `foo=10`
     153            2 :                 ("bar", None),       // shouldn't be passed at all
     154            2 :             ])
     155            2 :             .build()?;
     156              : 
     157            2 :         assert_eq!(req.url().as_str(), "http://example.com/frobnicate?foo=10");
     158              : 
     159            2 :         Ok(())
     160            2 :     }
     161              : 
     162              :     #[test]
     163            2 :     fn uuid_params() -> anyhow::Result<()> {
     164            2 :         let url = "http://example.com".parse()?;
     165            2 :         let endpoint = Endpoint::new(url, Client::new());
     166              : 
     167            2 :         let req = endpoint
     168            2 :             .get("frobnicate")
     169            2 :             .query(&[("session_id", uuid::Uuid::nil())])
     170            2 :             .build()?;
     171              : 
     172            2 :         assert_eq!(
     173            2 :             req.url().as_str(),
     174            2 :             "http://example.com/frobnicate?session_id=00000000-0000-0000-0000-000000000000"
     175            2 :         );
     176              : 
     177            2 :         Ok(())
     178            2 :     }
     179              : }
        

Generated by: LCOV version 2.1-beta