LCOV - differential code coverage report
Current view: top level - proxy/src - http.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 98.1 % 104 102 2 102
Current Date: 2024-01-09 02:06:09 Functions: 72.7 % 22 16 6 16
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : //! HTTP client and server impls.
       2                 : //! Other modules should use stuff from this module instead of
       3                 : //! directly relying on deps like `reqwest` (think loose coupling).
       4                 : 
       5                 : pub mod health_server;
       6                 : 
       7                 : use std::{sync::Arc, time::Duration};
       8                 : 
       9                 : use futures::FutureExt;
      10                 : pub use reqwest::{Request, Response, StatusCode};
      11                 : pub use reqwest_middleware::{ClientWithMiddleware, Error};
      12                 : pub use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
      13                 : use tokio::time::Instant;
      14                 : use tracing::trace;
      15                 : 
      16                 : use crate::{metrics::CONSOLE_REQUEST_LATENCY, rate_limiter, url::ApiUrl};
      17                 : use reqwest_middleware::RequestBuilder;
      18                 : 
      19                 : /// This is the preferred way to create new http clients,
      20                 : /// because it takes care of observability (OpenTelemetry).
      21                 : /// We deliberately don't want to replace this with a public static.
      22 CBC           2 : pub fn new_client(rate_limiter_config: rate_limiter::RateLimiterConfig) -> ClientWithMiddleware {
      23               2 :     let client = reqwest::ClientBuilder::new()
      24               2 :         .dns_resolver(Arc::new(GaiResolver::default()))
      25               2 :         .connection_verbose(true)
      26               2 :         .build()
      27               2 :         .expect("Failed to create http client");
      28               2 : 
      29               2 :     reqwest_middleware::ClientBuilder::new(client)
      30               2 :         .with(reqwest_tracing::TracingMiddleware::default())
      31               2 :         .with(rate_limiter::Limiter::new(rate_limiter_config))
      32               2 :         .build()
      33               2 : }
      34                 : 
      35               1 : pub fn new_client_with_timeout(default_timout: Duration) -> ClientWithMiddleware {
      36               1 :     let timeout_client = reqwest::ClientBuilder::new()
      37               1 :         .dns_resolver(Arc::new(GaiResolver::default()))
      38               1 :         .connection_verbose(true)
      39               1 :         .timeout(default_timout)
      40               1 :         .build()
      41               1 :         .expect("Failed to create http client with timeout");
      42               1 : 
      43               1 :     let retry_policy =
      44               1 :         ExponentialBackoff::builder().build_with_total_retry_duration(default_timout);
      45               1 : 
      46               1 :     reqwest_middleware::ClientBuilder::new(timeout_client)
      47               1 :         .with(reqwest_tracing::TracingMiddleware::default())
      48               1 :         // As per docs, "This middleware always errors when given requests with streaming bodies".
      49               1 :         // That's all right because we only use this client to send `serde_json::RawValue`, which
      50               1 :         // is not a stream.
      51               1 :         //
      52               1 :         // ex-maintainer note:
      53               1 :         // this limitation can be fixed if streaming is necessary.
      54               1 :         // retries will still not be performed, but it wont error immediately
      55               1 :         .with(RetryTransientMiddleware::new_with_policy(retry_policy))
      56               1 :         .build()
      57               1 : }
      58                 : 
      59                 : /// Thin convenience wrapper for an API provided by an http endpoint.
      60 UBC           0 : #[derive(Debug, Clone)]
      61                 : pub struct Endpoint {
      62                 :     /// API's base URL.
      63                 :     endpoint: ApiUrl,
      64                 :     /// Connection manager with built-in pooling.
      65                 :     client: ClientWithMiddleware,
      66                 : }
      67                 : 
      68                 : impl Endpoint {
      69                 :     /// Construct a new HTTP endpoint wrapper.
      70                 :     /// Http client is not constructed under the hood so that it can be shared.
      71 CBC           3 :     pub fn new(endpoint: ApiUrl, client: impl Into<ClientWithMiddleware>) -> Self {
      72               3 :         Self {
      73               3 :             endpoint,
      74               3 :             client: client.into(),
      75               3 :         }
      76               3 :     }
      77                 : 
      78                 :     #[inline(always)]
      79               1 :     pub fn url(&self) -> &ApiUrl {
      80               1 :         &self.endpoint
      81               1 :     }
      82                 : 
      83                 :     /// Return a [builder](RequestBuilder) for a `GET` request,
      84                 :     /// appending a single `path` segment to the base endpoint URL.
      85               6 :     pub fn get(&self, path: &str) -> RequestBuilder {
      86               6 :         let mut url = self.endpoint.clone();
      87               6 :         url.path_segments_mut().push(path);
      88               6 :         self.client.get(url.into_inner())
      89               6 :     }
      90                 : 
      91                 :     /// Execute a [request](reqwest::Request).
      92               4 :     pub async fn execute(&self, request: Request) -> Result<Response, Error> {
      93               4 :         let path = request.url().path().to_string();
      94               4 :         let start = Instant::now();
      95              12 :         let res = self.client.execute(request).await;
      96               4 :         CONSOLE_REQUEST_LATENCY
      97               4 :             .with_label_values(&[&path])
      98               4 :             .observe(start.elapsed().as_secs_f64());
      99               4 :         res
     100               4 :     }
     101                 : }
     102                 : 
     103                 : /// https://docs.rs/reqwest/0.11.18/src/reqwest/dns/gai.rs.html
     104                 : use hyper::{
     105                 :     client::connect::dns::{GaiResolver as HyperGaiResolver, Name},
     106                 :     service::Service,
     107                 : };
     108                 : use reqwest::dns::{Addrs, Resolve, Resolving};
     109 UBC           0 : #[derive(Debug)]
     110                 : pub struct GaiResolver(HyperGaiResolver);
     111                 : 
     112                 : impl Default for GaiResolver {
     113 CBC           3 :     fn default() -> Self {
     114               3 :         Self(HyperGaiResolver::new())
     115               3 :     }
     116                 : }
     117                 : 
     118                 : impl Resolve for GaiResolver {
     119               5 :     fn resolve(&self, name: Name) -> Resolving {
     120               5 :         let this = &mut self.0.clone();
     121               5 :         let start = Instant::now();
     122               5 :         Box::pin(
     123               5 :             Service::<Name>::call(this, name.clone()).map(move |result| {
     124               5 :                 let resolve_duration = start.elapsed();
     125               5 :                 trace!(duration = ?resolve_duration, addr = %name, "resolve host complete");
     126               5 :                 result
     127               5 :                     .map(|addrs| -> Addrs { Box::new(addrs) })
     128               5 :                     .map_err(|err| -> Box<dyn std::error::Error + Send + Sync> { Box::new(err) })
     129               5 :             }),
     130               5 :         )
     131               5 :     }
     132                 : }
     133                 : 
     134                 : #[cfg(test)]
     135                 : mod tests {
     136                 :     use super::*;
     137                 :     use reqwest::Client;
     138                 : 
     139               1 :     #[test]
     140               1 :     fn optional_query_params() -> anyhow::Result<()> {
     141               1 :         let url = "http://example.com".parse()?;
     142               1 :         let endpoint = Endpoint::new(url, Client::new());
     143                 : 
     144                 :         // Validate that this pattern makes sense.
     145               1 :         let req = endpoint
     146               1 :             .get("frobnicate")
     147               1 :             .query(&[
     148               1 :                 ("foo", Some("10")), // should be just `foo=10`
     149               1 :                 ("bar", None),       // shouldn't be passed at all
     150               1 :             ])
     151               1 :             .build()?;
     152                 : 
     153               1 :         assert_eq!(req.url().as_str(), "http://example.com/frobnicate?foo=10");
     154                 : 
     155               1 :         Ok(())
     156               1 :     }
     157                 : 
     158               1 :     #[test]
     159               1 :     fn uuid_params() -> anyhow::Result<()> {
     160               1 :         let url = "http://example.com".parse()?;
     161               1 :         let endpoint = Endpoint::new(url, Client::new());
     162                 : 
     163               1 :         let req = endpoint
     164               1 :             .get("frobnicate")
     165               1 :             .query(&[("session_id", uuid::Uuid::nil())])
     166               1 :             .build()?;
     167                 : 
     168               1 :         assert_eq!(
     169               1 :             req.url().as_str(),
     170               1 :             "http://example.com/frobnicate?session_id=00000000-0000-0000-0000-000000000000"
     171               1 :         );
     172                 : 
     173               1 :         Ok(())
     174               1 :     }
     175                 : }
        

Generated by: LCOV version 2.1-beta