TLA Line data Source code
1 : //! HTTP client and server impls.
2 : //! Other modules should use stuff from this module instead of
3 : //! directly relying on deps like `reqwest` (think loose coupling).
4 :
5 : pub mod conn_pool;
6 : pub mod server;
7 : pub mod sql_over_http;
8 : pub mod websocket;
9 :
10 : use std::{sync::Arc, time::Duration};
11 :
12 : use futures::FutureExt;
13 : pub use reqwest::{Request, Response, StatusCode};
14 : pub use reqwest_middleware::{ClientWithMiddleware, Error};
15 : pub use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
16 : use tokio::time::Instant;
17 : use tracing::trace;
18 :
19 : use crate::url::ApiUrl;
20 : use reqwest_middleware::RequestBuilder;
21 :
22 : /// This is the preferred way to create new http clients,
23 : /// because it takes care of observability (OpenTelemetry).
24 : /// We deliberately don't want to replace this with a public static.
25 CBC 1 : pub fn new_client() -> ClientWithMiddleware {
26 1 : let client = reqwest::ClientBuilder::new()
27 1 : .dns_resolver(Arc::new(GaiResolver::default()))
28 1 : .connection_verbose(true)
29 1 : .build()
30 1 : .expect("Failed to create http client");
31 1 :
32 1 : reqwest_middleware::ClientBuilder::new(client)
33 1 : .with(reqwest_tracing::TracingMiddleware::default())
34 1 : .build()
35 1 : }
36 :
37 1 : pub fn new_client_with_timeout(default_timout: Duration) -> ClientWithMiddleware {
38 1 : let timeout_client = reqwest::ClientBuilder::new()
39 1 : .dns_resolver(Arc::new(GaiResolver::default()))
40 1 : .connection_verbose(true)
41 1 : .timeout(default_timout)
42 1 : .build()
43 1 : .expect("Failed to create http client with timeout");
44 1 :
45 1 : let retry_policy =
46 1 : ExponentialBackoff::builder().build_with_total_retry_duration(default_timout);
47 1 :
48 1 : reqwest_middleware::ClientBuilder::new(timeout_client)
49 1 : .with(reqwest_tracing::TracingMiddleware::default())
50 1 : // As per docs, "This middleware always errors when given requests with streaming bodies".
51 1 : // That's all right because we only use this client to send `serde_json::RawValue`, which
52 1 : // is not a stream.
53 1 : //
54 1 : // ex-maintainer note:
55 1 : // this limitation can be fixed if streaming is necessary.
56 1 : // retries will still not be performed, but it wont error immediately
57 1 : .with(RetryTransientMiddleware::new_with_policy(retry_policy))
58 1 : .build()
59 1 : }
60 :
61 : /// Thin convenience wrapper for an API provided by an http endpoint.
62 UBC 0 : #[derive(Debug, Clone)]
63 : pub struct Endpoint {
64 : /// API's base URL.
65 : endpoint: ApiUrl,
66 : /// Connection manager with built-in pooling.
67 : client: ClientWithMiddleware,
68 : }
69 :
70 : impl Endpoint {
71 : /// Construct a new HTTP endpoint wrapper.
72 : /// Http client is not constructed under the hood so that it can be shared.
73 CBC 2 : pub fn new(endpoint: ApiUrl, client: impl Into<ClientWithMiddleware>) -> Self {
74 2 : Self {
75 2 : endpoint,
76 2 : client: client.into(),
77 2 : }
78 2 : }
79 :
80 : #[inline(always)]
81 UBC 0 : pub fn url(&self) -> &ApiUrl {
82 0 : &self.endpoint
83 0 : }
84 :
85 : /// Return a [builder](RequestBuilder) for a `GET` request,
86 : /// appending a single `path` segment to the base endpoint URL.
87 CBC 2 : pub fn get(&self, path: &str) -> RequestBuilder {
88 2 : let mut url = self.endpoint.clone();
89 2 : url.path_segments_mut().push(path);
90 2 : self.client.get(url.into_inner())
91 2 : }
92 :
93 : /// Execute a [request](reqwest::Request).
94 UBC 0 : pub async fn execute(&self, request: Request) -> Result<Response, Error> {
95 0 : self.client.execute(request).await
96 0 : }
97 : }
98 :
// Based on reqwest's built-in GAI resolver, see
// <https://docs.rs/reqwest/0.11.18/src/reqwest/dns/gai.rs.html>.
100 : use hyper::{
101 : client::connect::dns::{GaiResolver as HyperGaiResolver, Name},
102 : service::Service,
103 : };
104 : use reqwest::dns::{Addrs, Resolve, Resolving};
105 0 : #[derive(Debug)]
106 : pub struct GaiResolver(HyperGaiResolver);
107 :
108 : impl Default for GaiResolver {
109 CBC 2 : fn default() -> Self {
110 2 : Self(HyperGaiResolver::new())
111 2 : }
112 : }
113 :
114 : impl Resolve for GaiResolver {
115 2 : fn resolve(&self, name: Name) -> Resolving {
116 2 : let this = &mut self.0.clone();
117 2 : let start = Instant::now();
118 2 : Box::pin(
119 2 : Service::<Name>::call(this, name.clone()).map(move |result| {
120 2 : let resolve_duration = start.elapsed();
121 2 : trace!(duration = ?resolve_duration, addr = %name, "resolve host complete");
122 2 : result
123 2 : .map(|addrs| -> Addrs { Box::new(addrs) })
124 2 : .map_err(|err| -> Box<dyn std::error::Error + Send + Sync> { Box::new(err) })
125 2 : }),
126 2 : )
127 2 : }
128 : }
129 :
#[cfg(test)]
mod tests {
    use super::*;
    use reqwest::Client;

    /// Endpoint rooted at `http://example.com`, backed by a plain `reqwest` client.
    fn example_endpoint() -> anyhow::Result<Endpoint> {
        let base = "http://example.com".parse()?;
        Ok(Endpoint::new(base, Client::new()))
    }

    /// `None` query values are omitted entirely; `Some` values are emitted as `key=value`.
    #[test]
    fn optional_query_params() -> anyhow::Result<()> {
        let endpoint = example_endpoint()?;

        // Validate that this pattern makes sense.
        let params = [
            ("foo", Some("10")), // should be just `foo=10`
            ("bar", None),       // shouldn't be passed at all
        ];
        let request = endpoint.get("frobnicate").query(&params).build()?;

        assert_eq!(
            request.url().as_str(),
            "http://example.com/frobnicate?foo=10"
        );

        Ok(())
    }

    /// A UUID query value is rendered in its hyphenated textual form.
    #[test]
    fn uuid_params() -> anyhow::Result<()> {
        let endpoint = example_endpoint()?;

        let params = [("session_id", uuid::Uuid::nil())];
        let request = endpoint.get("frobnicate").query(&params).build()?;

        assert_eq!(
            request.url().as_str(),
            "http://example.com/frobnicate?session_id=00000000-0000-0000-0000-000000000000"
        );

        Ok(())
    }
}
|