Line data Source code
1 : //! HTTP client and server impls.
2 : //! Other modules should use stuff from this module instead of
3 : //! directly relying on deps like `reqwest` (think loose coupling).
4 :
5 : pub mod health_server;
6 :
7 : use std::{str::FromStr, sync::Arc, time::Duration};
8 :
9 : use futures::FutureExt;
10 : pub use reqwest::{Request, Response, StatusCode};
11 : pub use reqwest_middleware::{ClientWithMiddleware, Error};
12 : pub use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
13 : use tokio::time::Instant;
14 : use tracing::trace;
15 :
16 : use crate::{
17 : metrics::{ConsoleRequest, Metrics},
18 : url::ApiUrl,
19 : };
20 : use reqwest_middleware::RequestBuilder;
21 :
22 : /// This is the preferred way to create new http clients,
23 : /// because it takes care of observability (OpenTelemetry).
24 : /// We deliberately don't want to replace this with a public static.
25 2 : pub fn new_client() -> ClientWithMiddleware {
26 2 : let client = reqwest::ClientBuilder::new()
27 2 : .dns_resolver(Arc::new(GaiResolver::default()))
28 2 : .connection_verbose(true)
29 2 : .build()
30 2 : .expect("Failed to create http client");
31 2 :
32 2 : reqwest_middleware::ClientBuilder::new(client)
33 2 : .with(reqwest_tracing::TracingMiddleware::default())
34 2 : .build()
35 2 : }
36 :
37 0 : pub fn new_client_with_timeout(default_timout: Duration) -> ClientWithMiddleware {
38 0 : let timeout_client = reqwest::ClientBuilder::new()
39 0 : .dns_resolver(Arc::new(GaiResolver::default()))
40 0 : .connection_verbose(true)
41 0 : .timeout(default_timout)
42 0 : .build()
43 0 : .expect("Failed to create http client with timeout");
44 0 :
45 0 : let retry_policy =
46 0 : ExponentialBackoff::builder().build_with_total_retry_duration(default_timout);
47 0 :
48 0 : reqwest_middleware::ClientBuilder::new(timeout_client)
49 0 : .with(reqwest_tracing::TracingMiddleware::default())
50 0 : // As per docs, "This middleware always errors when given requests with streaming bodies".
51 0 : // That's all right because we only use this client to send `serde_json::RawValue`, which
52 0 : // is not a stream.
53 0 : //
54 0 : // ex-maintainer note:
55 0 : // this limitation can be fixed if streaming is necessary.
56 0 : // retries will still not be performed, but it wont error immediately
57 0 : .with(RetryTransientMiddleware::new_with_policy(retry_policy))
58 0 : .build()
59 0 : }
60 :
61 : /// Thin convenience wrapper for an API provided by an http endpoint.
62 : #[derive(Debug, Clone)]
63 : pub struct Endpoint {
64 : /// API's base URL.
65 : endpoint: ApiUrl,
66 : /// Connection manager with built-in pooling.
67 : client: ClientWithMiddleware,
68 : }
69 :
70 : impl Endpoint {
71 : /// Construct a new HTTP endpoint wrapper.
72 : /// Http client is not constructed under the hood so that it can be shared.
73 4 : pub fn new(endpoint: ApiUrl, client: impl Into<ClientWithMiddleware>) -> Self {
74 4 : Self {
75 4 : endpoint,
76 4 : client: client.into(),
77 4 : }
78 4 : }
79 :
80 : #[inline(always)]
81 0 : pub fn url(&self) -> &ApiUrl {
82 0 : &self.endpoint
83 0 : }
84 :
85 : /// Return a [builder](RequestBuilder) for a `GET` request,
86 : /// appending a single `path` segment to the base endpoint URL.
87 4 : pub fn get(&self, path: &str) -> RequestBuilder {
88 4 : let mut url = self.endpoint.clone();
89 4 : url.path_segments_mut().push(path);
90 4 : self.client.get(url.into_inner())
91 4 : }
92 :
93 : /// Execute a [request](reqwest::Request).
94 0 : pub async fn execute(&self, request: Request) -> Result<Response, Error> {
95 0 : let _timer = Metrics::get()
96 0 : .proxy
97 0 : .console_request_latency
98 0 : .start_timer(ConsoleRequest {
99 0 : request: request.url().path(),
100 0 : });
101 0 :
102 0 : self.client.execute(request).await
103 0 : }
104 : }
105 :
106 : use hyper_util::client::legacy::connect::dns::{
107 : GaiResolver as HyperGaiResolver, Name as HyperName,
108 : };
109 : use reqwest::dns::{Addrs, Name, Resolve, Resolving};
// For reference, reqwest's own GAI resolver source:
// https://docs.rs/reqwest/0.11.18/src/reqwest/dns/gai.rs.html
111 : use tower_service::Service;
112 : #[derive(Debug)]
113 : pub struct GaiResolver(HyperGaiResolver);
114 :
115 : impl Default for GaiResolver {
116 2 : fn default() -> Self {
117 2 : Self(HyperGaiResolver::new())
118 2 : }
119 : }
120 :
121 : impl Resolve for GaiResolver {
122 0 : fn resolve(&self, name: Name) -> Resolving {
123 0 : let this = &mut self.0.clone();
124 0 : let hyper_name = HyperName::from_str(name.as_str()).expect("name should be valid");
125 0 : let start = Instant::now();
126 0 : Box::pin(
127 0 : Service::<HyperName>::call(this, hyper_name).map(move |result| {
128 0 : let resolve_duration = start.elapsed();
129 0 : trace!(duration = ?resolve_duration, addr = %name.as_str(), "resolve host complete");
130 0 : result
131 0 : .map(|addrs| -> Addrs { Box::new(addrs) })
132 0 : .map_err(|err| -> Box<dyn std::error::Error + Send + Sync> { Box::new(err) })
133 0 : }),
134 0 : )
135 0 : }
136 : }
137 :
#[cfg(test)]
mod tests {
    use super::*;
    use reqwest::Client;

    /// Build an [`Endpoint`] rooted at `http://example.com` with a plain client.
    fn example_endpoint() -> anyhow::Result<Endpoint> {
        let base: ApiUrl = "http://example.com".parse()?;
        Ok(Endpoint::new(base, Client::new()))
    }

    /// `None`-valued query parameters must be omitted from the final URL.
    #[test]
    fn optional_query_params() -> anyhow::Result<()> {
        let endpoint = example_endpoint()?;

        // Validate that this pattern makes sense.
        let params = [
            ("foo", Some("10")), // should be just `foo=10`
            ("bar", None),       // shouldn't be passed at all
        ];
        let req = endpoint.get("frobnicate").query(&params).build()?;

        assert_eq!(req.url().as_str(), "http://example.com/frobnicate?foo=10");

        Ok(())
    }

    /// UUID query parameters are rendered in their hyphenated textual form.
    #[test]
    fn uuid_params() -> anyhow::Result<()> {
        let endpoint = example_endpoint()?;

        let params = [("session_id", uuid::Uuid::nil())];
        let req = endpoint.get("frobnicate").query(&params).build()?;

        let expected =
            "http://example.com/frobnicate?session_id=00000000-0000-0000-0000-000000000000";
        assert_eq!(req.url().as_str(), expected);

        Ok(())
    }
}
|