LCOV - code coverage report
Current view: top level - proxy/src - http.rs (source / functions) Coverage Total Hit
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info Lines: 98.1 % 104 102
Test Date: 2024-02-07 07:37:29 Functions: 72.7 % 22 16

            Line data    Source code
       1              : //! HTTP client and server impls.
       2              : //! Other modules should use stuff from this module instead of
       3              : //! directly relying on deps like `reqwest` (think loose coupling).
       4              : 
       5              : pub mod health_server;
       6              : 
       7              : use std::{sync::Arc, time::Duration};
       8              : 
       9              : use futures::FutureExt;
      10              : pub use reqwest::{Request, Response, StatusCode};
      11              : pub use reqwest_middleware::{ClientWithMiddleware, Error};
      12              : pub use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
      13              : use tokio::time::Instant;
      14              : use tracing::trace;
      15              : 
      16              : use crate::{metrics::CONSOLE_REQUEST_LATENCY, rate_limiter, url::ApiUrl};
      17              : use reqwest_middleware::RequestBuilder;
      18              : 
      19              : /// This is the preferred way to create new http clients,
      20              : /// because it takes care of observability (OpenTelemetry).
      21              : /// We deliberately don't want to replace this with a public static.
      22            3 : pub fn new_client(rate_limiter_config: rate_limiter::RateLimiterConfig) -> ClientWithMiddleware {
      23            3 :     let client = reqwest::ClientBuilder::new()
      24            3 :         .dns_resolver(Arc::new(GaiResolver::default()))
      25            3 :         .connection_verbose(true)
      26            3 :         .build()
      27            3 :         .expect("Failed to create http client");
      28            3 : 
      29            3 :     reqwest_middleware::ClientBuilder::new(client)
      30            3 :         .with(reqwest_tracing::TracingMiddleware::default())
      31            3 :         .with(rate_limiter::Limiter::new(rate_limiter_config))
      32            3 :         .build()
      33            3 : }
      34              : 
      35            1 : pub fn new_client_with_timeout(default_timout: Duration) -> ClientWithMiddleware {
      36            1 :     let timeout_client = reqwest::ClientBuilder::new()
      37            1 :         .dns_resolver(Arc::new(GaiResolver::default()))
      38            1 :         .connection_verbose(true)
      39            1 :         .timeout(default_timout)
      40            1 :         .build()
      41            1 :         .expect("Failed to create http client with timeout");
      42            1 : 
      43            1 :     let retry_policy =
      44            1 :         ExponentialBackoff::builder().build_with_total_retry_duration(default_timout);
      45            1 : 
      46            1 :     reqwest_middleware::ClientBuilder::new(timeout_client)
      47            1 :         .with(reqwest_tracing::TracingMiddleware::default())
      48            1 :         // As per docs, "This middleware always errors when given requests with streaming bodies".
      49            1 :         // That's all right because we only use this client to send `serde_json::RawValue`, which
      50            1 :         // is not a stream.
      51            1 :         //
      52            1 :         // ex-maintainer note:
      53            1 :         // this limitation can be fixed if streaming is necessary.
      54            1 :         // retries will still not be performed, but it wont error immediately
      55            1 :         .with(RetryTransientMiddleware::new_with_policy(retry_policy))
      56            1 :         .build()
      57            1 : }
      58              : 
      59              : /// Thin convenience wrapper for an API provided by an http endpoint.
      60            0 : #[derive(Debug, Clone)]
      61              : pub struct Endpoint {
      62              :     /// API's base URL.
      63              :     endpoint: ApiUrl,
      64              :     /// Connection manager with built-in pooling.
      65              :     client: ClientWithMiddleware,
      66              : }
      67              : 
      68              : impl Endpoint {
      69              :     /// Construct a new HTTP endpoint wrapper.
      70              :     /// Http client is not constructed under the hood so that it can be shared.
      71            5 :     pub fn new(endpoint: ApiUrl, client: impl Into<ClientWithMiddleware>) -> Self {
      72            5 :         Self {
      73            5 :             endpoint,
      74            5 :             client: client.into(),
      75            5 :         }
      76            5 :     }
      77              : 
      78              :     #[inline(always)]
      79            1 :     pub fn url(&self) -> &ApiUrl {
      80            1 :         &self.endpoint
      81            1 :     }
      82              : 
      83              :     /// Return a [builder](RequestBuilder) for a `GET` request,
      84              :     /// appending a single `path` segment to the base endpoint URL.
      85            8 :     pub fn get(&self, path: &str) -> RequestBuilder {
      86            8 :         let mut url = self.endpoint.clone();
      87            8 :         url.path_segments_mut().push(path);
      88            8 :         self.client.get(url.into_inner())
      89            8 :     }
      90              : 
      91              :     /// Execute a [request](reqwest::Request).
      92            4 :     pub async fn execute(&self, request: Request) -> Result<Response, Error> {
      93            4 :         let path = request.url().path().to_string();
      94            4 :         let start = Instant::now();
      95           12 :         let res = self.client.execute(request).await;
      96            4 :         CONSOLE_REQUEST_LATENCY
      97            4 :             .with_label_values(&[&path])
      98            4 :             .observe(start.elapsed().as_secs_f64());
      99            4 :         res
     100            4 :     }
     101              : }
     102              : 
     103              : /// https://docs.rs/reqwest/0.11.18/src/reqwest/dns/gai.rs.html
     104              : use hyper::{
     105              :     client::connect::dns::{GaiResolver as HyperGaiResolver, Name},
     106              :     service::Service,
     107              : };
     108              : use reqwest::dns::{Addrs, Resolve, Resolving};
     109            0 : #[derive(Debug)]
     110              : pub struct GaiResolver(HyperGaiResolver);
     111              : 
     112              : impl Default for GaiResolver {
     113            4 :     fn default() -> Self {
     114            4 :         Self(HyperGaiResolver::new())
     115            4 :     }
     116              : }
     117              : 
     118              : impl Resolve for GaiResolver {
     119            5 :     fn resolve(&self, name: Name) -> Resolving {
     120            5 :         let this = &mut self.0.clone();
     121            5 :         let start = Instant::now();
     122            5 :         Box::pin(
     123            5 :             Service::<Name>::call(this, name.clone()).map(move |result| {
     124            5 :                 let resolve_duration = start.elapsed();
     125            5 :                 trace!(duration = ?resolve_duration, addr = %name, "resolve host complete");
     126            5 :                 result
     127            5 :                     .map(|addrs| -> Addrs { Box::new(addrs) })
     128            5 :                     .map_err(|err| -> Box<dyn std::error::Error + Send + Sync> { Box::new(err) })
     129            5 :             }),
     130            5 :         )
     131            5 :     }
     132              : }
     133              : 
     134              : #[cfg(test)]
     135              : mod tests {
     136              :     use super::*;
     137              :     use reqwest::Client;
     138              : 
     139            2 :     #[test]
     140            2 :     fn optional_query_params() -> anyhow::Result<()> {
     141            2 :         let url = "http://example.com".parse()?;
     142            2 :         let endpoint = Endpoint::new(url, Client::new());
     143              : 
     144              :         // Validate that this pattern makes sense.
     145            2 :         let req = endpoint
     146            2 :             .get("frobnicate")
     147            2 :             .query(&[
     148            2 :                 ("foo", Some("10")), // should be just `foo=10`
     149            2 :                 ("bar", None),       // shouldn't be passed at all
     150            2 :             ])
     151            2 :             .build()?;
     152              : 
     153            2 :         assert_eq!(req.url().as_str(), "http://example.com/frobnicate?foo=10");
     154              : 
     155            2 :         Ok(())
     156            2 :     }
     157              : 
     158            2 :     #[test]
     159            2 :     fn uuid_params() -> anyhow::Result<()> {
     160            2 :         let url = "http://example.com".parse()?;
     161            2 :         let endpoint = Endpoint::new(url, Client::new());
     162              : 
     163            2 :         let req = endpoint
     164            2 :             .get("frobnicate")
     165            2 :             .query(&[("session_id", uuid::Uuid::nil())])
     166            2 :             .build()?;
     167              : 
     168            2 :         assert_eq!(
     169            2 :             req.url().as_str(),
     170            2 :             "http://example.com/frobnicate?session_id=00000000-0000-0000-0000-000000000000"
     171            2 :         );
     172              : 
     173            2 :         Ok(())
     174            2 :     }
     175              : }
        

Generated by: LCOV version 2.1-beta