LCOV - code coverage report
Current view: top level - proxy/src - http.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 80.4 % 97 78
Test Date: 2023-09-06 10:18:01 Functions: 50.0 % 22 11

            Line data    Source code
       1              : //! HTTP client and server impls.
       2              : //! Other modules should use stuff from this module instead of
       3              : //! directly relying on deps like `reqwest` (think loose coupling).
       4              : 
       5              : pub mod conn_pool;
       6              : pub mod server;
       7              : pub mod sql_over_http;
       8              : pub mod websocket;
       9              : 
      10              : use std::{sync::Arc, time::Duration};
      11              : 
      12              : use futures::FutureExt;
      13              : pub use reqwest::{Request, Response, StatusCode};
      14              : pub use reqwest_middleware::{ClientWithMiddleware, Error};
      15              : pub use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
      16              : use tokio::time::Instant;
      17              : use tracing::trace;
      18              : 
      19              : use crate::url::ApiUrl;
      20              : use reqwest_middleware::RequestBuilder;
      21              : 
      22              : /// This is the preferred way to create new http clients,
      23              : /// because it takes care of observability (OpenTelemetry).
      24              : /// We deliberately don't want to replace this with a public static.
      25            0 : pub fn new_client() -> ClientWithMiddleware {
      26            0 :     let client = reqwest::ClientBuilder::new()
      27            0 :         .dns_resolver(Arc::new(GaiResolver::default()))
      28            0 :         .connection_verbose(true)
      29            0 :         .build()
      30            0 :         .expect("Failed to create http client");
      31            0 : 
      32            0 :     reqwest_middleware::ClientBuilder::new(client)
      33            0 :         .with(reqwest_tracing::TracingMiddleware::default())
      34            0 :         .build()
      35            0 : }
      36              : 
      37            1 : pub fn new_client_with_timeout(default_timout: Duration) -> ClientWithMiddleware {
      38            1 :     let timeout_client = reqwest::ClientBuilder::new()
      39            1 :         .dns_resolver(Arc::new(GaiResolver::default()))
      40            1 :         .connection_verbose(true)
      41            1 :         .timeout(default_timout)
      42            1 :         .build()
      43            1 :         .expect("Failed to create http client with timeout");
      44            1 : 
      45            1 :     let retry_policy =
      46            1 :         ExponentialBackoff::builder().build_with_total_retry_duration(default_timout);
      47            1 : 
      48            1 :     reqwest_middleware::ClientBuilder::new(timeout_client)
      49            1 :         .with(reqwest_tracing::TracingMiddleware::default())
      50            1 :         // As per docs, "This middleware always errors when given requests with streaming bodies".
      51            1 :         // That's all right because we only use this client to send `serde_json::RawValue`, which
      52            1 :         // is not a stream.
      53            1 :         //
      54            1 :         // ex-maintainer note:
      55            1 :         // this limitation can be fixed if streaming is necessary.
      56            1 :         // retries will still not be performed, but it wont error immediately
      57            1 :         .with(RetryTransientMiddleware::new_with_policy(retry_policy))
      58            1 :         .build()
      59            1 : }
      60              : 
      61              : /// Thin convenience wrapper for an API provided by an http endpoint.
      62            0 : #[derive(Debug, Clone)]
      63              : pub struct Endpoint {
      64              :     /// API's base URL.
      65              :     endpoint: ApiUrl,
      66              :     /// Connection manager with built-in pooling.
      67              :     client: ClientWithMiddleware,
      68              : }
      69              : 
      70              : impl Endpoint {
      71              :     /// Construct a new HTTP endpoint wrapper.
      72              :     /// Http client is not constructed under the hood so that it can be shared.
      73            2 :     pub fn new(endpoint: ApiUrl, client: impl Into<ClientWithMiddleware>) -> Self {
      74            2 :         Self {
      75            2 :             endpoint,
      76            2 :             client: client.into(),
      77            2 :         }
      78            2 :     }
      79              : 
      80              :     #[inline(always)]
      81            0 :     pub fn url(&self) -> &ApiUrl {
      82            0 :         &self.endpoint
      83            0 :     }
      84              : 
      85              :     /// Return a [builder](RequestBuilder) for a `GET` request,
      86              :     /// appending a single `path` segment to the base endpoint URL.
      87            2 :     pub fn get(&self, path: &str) -> RequestBuilder {
      88            2 :         let mut url = self.endpoint.clone();
      89            2 :         url.path_segments_mut().push(path);
      90            2 :         self.client.get(url.into_inner())
      91            2 :     }
      92              : 
      93              :     /// Execute a [request](reqwest::Request).
      94            0 :     pub async fn execute(&self, request: Request) -> Result<Response, Error> {
      95            0 :         self.client.execute(request).await
      96            0 :     }
      97              : }
      98              : 
      99              : /// https://docs.rs/reqwest/0.11.18/src/reqwest/dns/gai.rs.html
     100              : use hyper::{
     101              :     client::connect::dns::{GaiResolver as HyperGaiResolver, Name},
     102              :     service::Service,
     103              : };
     104              : use reqwest::dns::{Addrs, Resolve, Resolving};
     105            0 : #[derive(Debug)]
     106              : pub struct GaiResolver(HyperGaiResolver);
     107              : 
     108              : impl Default for GaiResolver {
     109            1 :     fn default() -> Self {
     110            1 :         Self(HyperGaiResolver::new())
     111            1 :     }
     112              : }
     113              : 
     114              : impl Resolve for GaiResolver {
     115            2 :     fn resolve(&self, name: Name) -> Resolving {
     116            2 :         let this = &mut self.0.clone();
     117            2 :         let start = Instant::now();
     118            2 :         Box::pin(
     119            2 :             Service::<Name>::call(this, name.clone()).map(move |result| {
     120            2 :                 let resolve_duration = start.elapsed();
     121            2 :                 trace!(duration = ?resolve_duration, addr = %name, "resolve host complete");
     122            2 :                 result
     123            2 :                     .map(|addrs| -> Addrs { Box::new(addrs) })
     124            2 :                     .map_err(|err| -> Box<dyn std::error::Error + Send + Sync> { Box::new(err) })
     125            2 :             }),
     126            2 :         )
     127            2 :     }
     128              : }
     129              : 
     130              : #[cfg(test)]
     131              : mod tests {
     132              :     use super::*;
     133              :     use reqwest::Client;
     134              : 
     135            1 :     #[test]
     136            1 :     fn optional_query_params() -> anyhow::Result<()> {
     137            1 :         let url = "http://example.com".parse()?;
     138            1 :         let endpoint = Endpoint::new(url, Client::new());
     139              : 
     140              :         // Validate that this pattern makes sense.
     141            1 :         let req = endpoint
     142            1 :             .get("frobnicate")
     143            1 :             .query(&[
     144            1 :                 ("foo", Some("10")), // should be just `foo=10`
     145            1 :                 ("bar", None),       // shouldn't be passed at all
     146            1 :             ])
     147            1 :             .build()?;
     148              : 
     149            1 :         assert_eq!(req.url().as_str(), "http://example.com/frobnicate?foo=10");
     150              : 
     151            1 :         Ok(())
     152            1 :     }
     153              : 
     154            1 :     #[test]
     155            1 :     fn uuid_params() -> anyhow::Result<()> {
     156            1 :         let url = "http://example.com".parse()?;
     157            1 :         let endpoint = Endpoint::new(url, Client::new());
     158              : 
     159            1 :         let req = endpoint
     160            1 :             .get("frobnicate")
     161            1 :             .query(&[("session_id", uuid::Uuid::nil())])
     162            1 :             .build()?;
     163              : 
     164            1 :         assert_eq!(
     165            1 :             req.url().as_str(),
     166            1 :             "http://example.com/frobnicate?session_id=00000000-0000-0000-0000-000000000000"
     167            1 :         );
     168              : 
     169            1 :         Ok(())
     170            1 :     }
     171              : }
        

Generated by: LCOV version 2.1-beta