Line data Source code
1 : //! HTTP client and server impls.
2 : //! Other modules should use stuff from this module instead of
3 : //! directly relying on deps like `reqwest` (think loose coupling).
4 :
5 : pub mod health_server;
6 :
7 : use std::time::Duration;
8 :
9 : use anyhow::bail;
10 : use bytes::Bytes;
11 : use http_body_util::BodyExt;
12 : use hyper1::body::Body;
13 : use serde::de::DeserializeOwned;
14 :
15 : pub(crate) use reqwest::{Request, Response};
16 : pub(crate) use reqwest_middleware::{ClientWithMiddleware, Error};
17 : pub(crate) use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
18 :
19 : use crate::{
20 : metrics::{ConsoleRequest, Metrics},
21 : url::ApiUrl,
22 : };
23 : use reqwest_middleware::RequestBuilder;
24 :
25 : /// This is the preferred way to create new http clients,
26 : /// because it takes care of observability (OpenTelemetry).
27 : /// We deliberately don't want to replace this with a public static.
28 1 : pub fn new_client() -> ClientWithMiddleware {
29 1 : let client = reqwest::ClientBuilder::new()
30 1 : .build()
31 1 : .expect("Failed to create http client");
32 1 :
33 1 : reqwest_middleware::ClientBuilder::new(client)
34 1 : .with(reqwest_tracing::TracingMiddleware::default())
35 1 : .build()
36 1 : }
37 :
38 0 : pub(crate) fn new_client_with_timeout(
39 0 : request_timeout: Duration,
40 0 : total_retry_duration: Duration,
41 0 : ) -> ClientWithMiddleware {
42 0 : let timeout_client = reqwest::ClientBuilder::new()
43 0 : .timeout(request_timeout)
44 0 : .build()
45 0 : .expect("Failed to create http client with timeout");
46 0 :
47 0 : let retry_policy =
48 0 : ExponentialBackoff::builder().build_with_total_retry_duration(total_retry_duration);
49 0 :
50 0 : reqwest_middleware::ClientBuilder::new(timeout_client)
51 0 : .with(reqwest_tracing::TracingMiddleware::default())
52 0 : // As per docs, "This middleware always errors when given requests with streaming bodies".
53 0 : // That's all right because we only use this client to send `serde_json::RawValue`, which
54 0 : // is not a stream.
55 0 : //
56 0 : // ex-maintainer note:
57 0 : // this limitation can be fixed if streaming is necessary.
58 0 : // retries will still not be performed, but it wont error immediately
59 0 : .with(RetryTransientMiddleware::new_with_policy(retry_policy))
60 0 : .build()
61 0 : }
62 :
63 : /// Thin convenience wrapper for an API provided by an http endpoint.
64 : #[derive(Debug, Clone)]
65 : pub struct Endpoint {
66 : /// API's base URL.
67 : endpoint: ApiUrl,
68 : /// Connection manager with built-in pooling.
69 : client: ClientWithMiddleware,
70 : }
71 :
72 : impl Endpoint {
73 : /// Construct a new HTTP endpoint wrapper.
74 : /// Http client is not constructed under the hood so that it can be shared.
75 2 : pub fn new(endpoint: ApiUrl, client: impl Into<ClientWithMiddleware>) -> Self {
76 2 : Self {
77 2 : endpoint,
78 2 : client: client.into(),
79 2 : }
80 2 : }
81 :
82 : #[inline(always)]
83 0 : pub(crate) fn url(&self) -> &ApiUrl {
84 0 : &self.endpoint
85 0 : }
86 :
87 : /// Return a [builder](RequestBuilder) for a `GET` request,
88 : /// appending a single `path` segment to the base endpoint URL.
89 2 : pub(crate) fn get(&self, path: &str) -> RequestBuilder {
90 2 : let mut url = self.endpoint.clone();
91 2 : url.path_segments_mut().push(path);
92 2 : self.client.get(url.into_inner())
93 2 : }
94 :
95 : /// Execute a [request](reqwest::Request).
96 0 : pub(crate) async fn execute(&self, request: Request) -> Result<Response, Error> {
97 0 : let _timer = Metrics::get()
98 0 : .proxy
99 0 : .console_request_latency
100 0 : .start_timer(ConsoleRequest {
101 0 : request: request.url().path(),
102 0 : });
103 0 :
104 0 : self.client.execute(request).await
105 0 : }
106 : }
107 :
108 2 : pub(crate) async fn parse_json_body_with_limit<D: DeserializeOwned>(
109 2 : mut b: impl Body<Data = Bytes, Error = reqwest::Error> + Unpin,
110 2 : limit: usize,
111 2 : ) -> anyhow::Result<D> {
112 : // We could use `b.limited().collect().await.to_bytes()` here
113 : // but this ends up being slightly more efficient as far as I can tell.
114 :
115 : // check the lower bound of the size hint.
116 : // in reqwest, this value is influenced by the Content-Length header.
117 2 : let lower_bound = match usize::try_from(b.size_hint().lower()) {
118 2 : Ok(bound) if bound <= limit => bound,
119 0 : _ => bail!("Content length exceeds limit of {limit} bytes"),
120 : };
121 2 : let mut bytes = Vec::with_capacity(lower_bound);
122 :
123 4 : while let Some(frame) = b.frame().await.transpose()? {
124 2 : if let Ok(data) = frame.into_data() {
125 2 : if bytes.len() + data.len() > limit {
126 0 : bail!("Content length exceeds limit of {limit} bytes")
127 2 : }
128 2 : bytes.extend_from_slice(&data);
129 0 : }
130 : }
131 :
132 2 : Ok(serde_json::from_slice::<D>(&bytes)?)
133 2 : }
134 :
#[cfg(test)]
mod tests {
    use super::*;
    use reqwest::Client;

    /// Optional query parameters: `Some` values are serialized bare,
    /// `None` values are left out of the URL entirely.
    #[test]
    fn optional_query_params() -> anyhow::Result<()> {
        let endpoint = Endpoint::new("http://example.com".parse()?, Client::new());

        // Validate that this pattern makes sense.
        let params = [
            ("foo", Some("10")), // should be just `foo=10`
            ("bar", None),       // shouldn't be passed at all
        ];
        let request = endpoint.get("frobnicate").query(&params).build()?;

        assert_eq!(
            request.url().as_str(),
            "http://example.com/frobnicate?foo=10"
        );

        Ok(())
    }

    /// A UUID query parameter is rendered in its hyphenated textual form.
    #[test]
    fn uuid_params() -> anyhow::Result<()> {
        let endpoint = Endpoint::new("http://example.com".parse()?, Client::new());

        let params = [("session_id", uuid::Uuid::nil())];
        let request = endpoint.get("frobnicate").query(&params).build()?;

        assert_eq!(
            request.url().as_str(),
            "http://example.com/frobnicate?session_id=00000000-0000-0000-0000-000000000000"
        );

        Ok(())
    }
}
|