Line data Source code
1 : //! HTTP client and server impls.
2 : //! Other modules should use stuff from this module instead of
3 : //! directly relying on deps like `reqwest` (think loose coupling).
4 :
5 : pub mod health_server;
6 :
7 : use std::{sync::Arc, time::Duration};
8 :
9 : use futures::FutureExt;
10 : pub use reqwest::{Request, Response, StatusCode};
11 : pub use reqwest_middleware::{ClientWithMiddleware, Error};
12 : pub use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
13 : use tokio::time::Instant;
14 : use tracing::trace;
15 :
16 : use crate::{metrics::CONSOLE_REQUEST_LATENCY, rate_limiter, url::ApiUrl};
17 : use reqwest_middleware::RequestBuilder;
18 :
19 : /// This is the preferred way to create new http clients,
20 : /// because it takes care of observability (OpenTelemetry).
21 : /// We deliberately don't want to replace this with a public static.
22 3 : pub fn new_client(rate_limiter_config: rate_limiter::RateLimiterConfig) -> ClientWithMiddleware {
23 3 : let client = reqwest::ClientBuilder::new()
24 3 : .dns_resolver(Arc::new(GaiResolver::default()))
25 3 : .connection_verbose(true)
26 3 : .build()
27 3 : .expect("Failed to create http client");
28 3 :
29 3 : reqwest_middleware::ClientBuilder::new(client)
30 3 : .with(reqwest_tracing::TracingMiddleware::default())
31 3 : .with(rate_limiter::Limiter::new(rate_limiter_config))
32 3 : .build()
33 3 : }
34 :
35 1 : pub fn new_client_with_timeout(default_timout: Duration) -> ClientWithMiddleware {
36 1 : let timeout_client = reqwest::ClientBuilder::new()
37 1 : .dns_resolver(Arc::new(GaiResolver::default()))
38 1 : .connection_verbose(true)
39 1 : .timeout(default_timout)
40 1 : .build()
41 1 : .expect("Failed to create http client with timeout");
42 1 :
43 1 : let retry_policy =
44 1 : ExponentialBackoff::builder().build_with_total_retry_duration(default_timout);
45 1 :
46 1 : reqwest_middleware::ClientBuilder::new(timeout_client)
47 1 : .with(reqwest_tracing::TracingMiddleware::default())
48 1 : // As per docs, "This middleware always errors when given requests with streaming bodies".
49 1 : // That's all right because we only use this client to send `serde_json::RawValue`, which
50 1 : // is not a stream.
51 1 : //
52 1 : // ex-maintainer note:
53 1 : // this limitation can be fixed if streaming is necessary.
54 1 : // retries will still not be performed, but it wont error immediately
55 1 : .with(RetryTransientMiddleware::new_with_policy(retry_policy))
56 1 : .build()
57 1 : }
58 :
59 : /// Thin convenience wrapper for an API provided by an http endpoint.
60 0 : #[derive(Debug, Clone)]
61 : pub struct Endpoint {
62 : /// API's base URL.
63 : endpoint: ApiUrl,
64 : /// Connection manager with built-in pooling.
65 : client: ClientWithMiddleware,
66 : }
67 :
68 : impl Endpoint {
69 : /// Construct a new HTTP endpoint wrapper.
70 : /// Http client is not constructed under the hood so that it can be shared.
71 5 : pub fn new(endpoint: ApiUrl, client: impl Into<ClientWithMiddleware>) -> Self {
72 5 : Self {
73 5 : endpoint,
74 5 : client: client.into(),
75 5 : }
76 5 : }
77 :
78 : #[inline(always)]
79 1 : pub fn url(&self) -> &ApiUrl {
80 1 : &self.endpoint
81 1 : }
82 :
83 : /// Return a [builder](RequestBuilder) for a `GET` request,
84 : /// appending a single `path` segment to the base endpoint URL.
85 8 : pub fn get(&self, path: &str) -> RequestBuilder {
86 8 : let mut url = self.endpoint.clone();
87 8 : url.path_segments_mut().push(path);
88 8 : self.client.get(url.into_inner())
89 8 : }
90 :
91 : /// Execute a [request](reqwest::Request).
92 4 : pub async fn execute(&self, request: Request) -> Result<Response, Error> {
93 4 : let path = request.url().path().to_string();
94 4 : let start = Instant::now();
95 12 : let res = self.client.execute(request).await;
96 4 : CONSOLE_REQUEST_LATENCY
97 4 : .with_label_values(&[&path])
98 4 : .observe(start.elapsed().as_secs_f64());
99 4 : res
100 4 : }
101 : }
102 :
103 : /// https://docs.rs/reqwest/0.11.18/src/reqwest/dns/gai.rs.html
104 : use hyper::{
105 : client::connect::dns::{GaiResolver as HyperGaiResolver, Name},
106 : service::Service,
107 : };
108 : use reqwest::dns::{Addrs, Resolve, Resolving};
109 0 : #[derive(Debug)]
110 : pub struct GaiResolver(HyperGaiResolver);
111 :
112 : impl Default for GaiResolver {
113 4 : fn default() -> Self {
114 4 : Self(HyperGaiResolver::new())
115 4 : }
116 : }
117 :
118 : impl Resolve for GaiResolver {
119 5 : fn resolve(&self, name: Name) -> Resolving {
120 5 : let this = &mut self.0.clone();
121 5 : let start = Instant::now();
122 5 : Box::pin(
123 5 : Service::<Name>::call(this, name.clone()).map(move |result| {
124 5 : let resolve_duration = start.elapsed();
125 5 : trace!(duration = ?resolve_duration, addr = %name, "resolve host complete");
126 5 : result
127 5 : .map(|addrs| -> Addrs { Box::new(addrs) })
128 5 : .map_err(|err| -> Box<dyn std::error::Error + Send + Sync> { Box::new(err) })
129 5 : }),
130 5 : )
131 5 : }
132 : }
133 :
#[cfg(test)]
mod tests {
    use super::*;
    use reqwest::Client;

    /// `None` query values must be dropped from the URL entirely, while
    /// `Some` values are serialized as plain `key=value` pairs.
    #[test]
    fn optional_query_params() -> anyhow::Result<()> {
        let endpoint = Endpoint::new("http://example.com".parse()?, Client::new());

        // Validate that this pattern makes sense.
        let params = [
            ("foo", Some("10")), // should be just `foo=10`
            ("bar", None),       // shouldn't be passed at all
        ];
        let request = endpoint.get("frobnicate").query(&params).build()?;

        assert_eq!(
            request.url().as_str(),
            "http://example.com/frobnicate?foo=10"
        );

        Ok(())
    }

    /// A UUID query value is rendered in its hyphenated textual form.
    #[test]
    fn uuid_params() -> anyhow::Result<()> {
        let endpoint = Endpoint::new("http://example.com".parse()?, Client::new());

        let params = [("session_id", uuid::Uuid::nil())];
        let request = endpoint.get("frobnicate").query(&params).build()?;

        let expected =
            "http://example.com/frobnicate?session_id=00000000-0000-0000-0000-000000000000";
        assert_eq!(request.url().as_str(), expected);

        Ok(())
    }
}
|