LCOV - differential code coverage report
Current view: top level - proxy/src - http.rs (source / functions) Coverage Total Hit UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 91.8 % 97 89 8 89
Current Date: 2023-10-19 02:04:12 Functions: 54.5 % 22 12 10 12
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : //! HTTP client and server impls.
       2                 : //! Other modules should use stuff from this module instead of
       3                 : //! directly relying on deps like `reqwest` (think loose coupling).
       4                 : 
       5                 : pub mod conn_pool;
       6                 : pub mod server;
       7                 : pub mod sql_over_http;
       8                 : pub mod websocket;
       9                 : 
      10                 : use std::{sync::Arc, time::Duration};
      11                 : 
      12                 : use futures::FutureExt;
      13                 : pub use reqwest::{Request, Response, StatusCode};
      14                 : pub use reqwest_middleware::{ClientWithMiddleware, Error};
      15                 : pub use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
      16                 : use tokio::time::Instant;
      17                 : use tracing::trace;
      18                 : 
      19                 : use crate::url::ApiUrl;
      20                 : use reqwest_middleware::RequestBuilder;
      21                 : 
      22                 : /// This is the preferred way to create new http clients,
      23                 : /// because it takes care of observability (OpenTelemetry).
      24                 : /// We deliberately don't want to replace this with a public static.
      25 CBC           1 : pub fn new_client() -> ClientWithMiddleware {
      26               1 :     let client = reqwest::ClientBuilder::new()
      27               1 :         .dns_resolver(Arc::new(GaiResolver::default()))
      28               1 :         .connection_verbose(true)
      29               1 :         .build()
      30               1 :         .expect("Failed to create http client");
      31               1 : 
      32               1 :     reqwest_middleware::ClientBuilder::new(client)
      33               1 :         .with(reqwest_tracing::TracingMiddleware::default())
      34               1 :         .build()
      35               1 : }
      36                 : 
      37               1 : pub fn new_client_with_timeout(default_timout: Duration) -> ClientWithMiddleware {
      38               1 :     let timeout_client = reqwest::ClientBuilder::new()
      39               1 :         .dns_resolver(Arc::new(GaiResolver::default()))
      40               1 :         .connection_verbose(true)
      41               1 :         .timeout(default_timout)
      42               1 :         .build()
      43               1 :         .expect("Failed to create http client with timeout");
      44               1 : 
      45               1 :     let retry_policy =
      46               1 :         ExponentialBackoff::builder().build_with_total_retry_duration(default_timout);
      47               1 : 
      48               1 :     reqwest_middleware::ClientBuilder::new(timeout_client)
      49               1 :         .with(reqwest_tracing::TracingMiddleware::default())
      50               1 :         // As per docs, "This middleware always errors when given requests with streaming bodies".
      51               1 :         // That's all right because we only use this client to send `serde_json::RawValue`, which
      52               1 :         // is not a stream.
      53               1 :         //
      54               1 :         // ex-maintainer note:
      55               1 :         // this limitation can be fixed if streaming is necessary.
      56               1 :         // retries will still not be performed, but it wont error immediately
      57               1 :         .with(RetryTransientMiddleware::new_with_policy(retry_policy))
      58               1 :         .build()
      59               1 : }
      60                 : 
      61                 : /// Thin convenience wrapper for an API provided by an http endpoint.
      62 UBC           0 : #[derive(Debug, Clone)]
      63                 : pub struct Endpoint {
      64                 :     /// API's base URL.
      65                 :     endpoint: ApiUrl,
      66                 :     /// Connection manager with built-in pooling.
      67                 :     client: ClientWithMiddleware,
      68                 : }
      69                 : 
      70                 : impl Endpoint {
      71                 :     /// Construct a new HTTP endpoint wrapper.
      72                 :     /// Http client is not constructed under the hood so that it can be shared.
      73 CBC           2 :     pub fn new(endpoint: ApiUrl, client: impl Into<ClientWithMiddleware>) -> Self {
      74               2 :         Self {
      75               2 :             endpoint,
      76               2 :             client: client.into(),
      77               2 :         }
      78               2 :     }
      79                 : 
      80                 :     #[inline(always)]
      81 UBC           0 :     pub fn url(&self) -> &ApiUrl {
      82               0 :         &self.endpoint
      83               0 :     }
      84                 : 
      85                 :     /// Return a [builder](RequestBuilder) for a `GET` request,
      86                 :     /// appending a single `path` segment to the base endpoint URL.
      87 CBC           2 :     pub fn get(&self, path: &str) -> RequestBuilder {
      88               2 :         let mut url = self.endpoint.clone();
      89               2 :         url.path_segments_mut().push(path);
      90               2 :         self.client.get(url.into_inner())
      91               2 :     }
      92                 : 
      93                 :     /// Execute a [request](reqwest::Request).
      94 UBC           0 :     pub async fn execute(&self, request: Request) -> Result<Response, Error> {
      95               0 :         self.client.execute(request).await
      96               0 :     }
      97                 : }
      98                 : 
      99                 : /// https://docs.rs/reqwest/0.11.18/src/reqwest/dns/gai.rs.html
     100                 : use hyper::{
     101                 :     client::connect::dns::{GaiResolver as HyperGaiResolver, Name},
     102                 :     service::Service,
     103                 : };
     104                 : use reqwest::dns::{Addrs, Resolve, Resolving};
     105               0 : #[derive(Debug)]
     106                 : pub struct GaiResolver(HyperGaiResolver);
     107                 : 
     108                 : impl Default for GaiResolver {
     109 CBC           2 :     fn default() -> Self {
     110               2 :         Self(HyperGaiResolver::new())
     111               2 :     }
     112                 : }
     113                 : 
     114                 : impl Resolve for GaiResolver {
     115               2 :     fn resolve(&self, name: Name) -> Resolving {
     116               2 :         let this = &mut self.0.clone();
     117               2 :         let start = Instant::now();
     118               2 :         Box::pin(
     119               2 :             Service::<Name>::call(this, name.clone()).map(move |result| {
     120               2 :                 let resolve_duration = start.elapsed();
     121               2 :                 trace!(duration = ?resolve_duration, addr = %name, "resolve host complete");
     122               2 :                 result
     123               2 :                     .map(|addrs| -> Addrs { Box::new(addrs) })
     124               2 :                     .map_err(|err| -> Box<dyn std::error::Error + Send + Sync> { Box::new(err) })
     125               2 :             }),
     126               2 :         )
     127               2 :     }
     128                 : }
     129                 : 
     130                 : #[cfg(test)]
     131                 : mod tests {
     132                 :     use super::*;
     133                 :     use reqwest::Client;
     134                 : 
     135               1 :     #[test]
     136               1 :     fn optional_query_params() -> anyhow::Result<()> {
     137               1 :         let url = "http://example.com".parse()?;
     138               1 :         let endpoint = Endpoint::new(url, Client::new());
     139                 : 
     140                 :         // Validate that this pattern makes sense.
     141               1 :         let req = endpoint
     142               1 :             .get("frobnicate")
     143               1 :             .query(&[
     144               1 :                 ("foo", Some("10")), // should be just `foo=10`
     145               1 :                 ("bar", None),       // shouldn't be passed at all
     146               1 :             ])
     147               1 :             .build()?;
     148                 : 
     149               1 :         assert_eq!(req.url().as_str(), "http://example.com/frobnicate?foo=10");
     150                 : 
     151               1 :         Ok(())
     152               1 :     }
     153                 : 
     154               1 :     #[test]
     155               1 :     fn uuid_params() -> anyhow::Result<()> {
     156               1 :         let url = "http://example.com".parse()?;
     157               1 :         let endpoint = Endpoint::new(url, Client::new());
     158                 : 
     159               1 :         let req = endpoint
     160               1 :             .get("frobnicate")
     161               1 :             .query(&[("session_id", uuid::Uuid::nil())])
     162               1 :             .build()?;
     163                 : 
     164               1 :         assert_eq!(
     165               1 :             req.url().as_str(),
     166               1 :             "http://example.com/frobnicate?session_id=00000000-0000-0000-0000-000000000000"
     167               1 :         );
     168                 : 
     169               1 :         Ok(())
     170               1 :     }
     171                 : }
        

Generated by: LCOV version 2.1-beta