Line data Source code
1 : use std::{error::Error, sync::Arc};
2 :
3 : use futures::StreamExt;
4 : use futures::stream::FuturesUnordered;
5 : use hyper0::Body;
6 : use hyper0::server::conn::Http;
7 : use metrics::{IntCounterVec, register_int_counter_vec};
8 : use once_cell::sync::Lazy;
9 : use routerify::{RequestService, RequestServiceBuilder};
10 : use tokio::io::{AsyncRead, AsyncWrite};
11 : use tokio_rustls::TlsAcceptor;
12 : use tokio_util::sync::CancellationToken;
13 : use tracing::{error, info};
14 :
15 : use crate::error::ApiError;
16 :
17 : /// A simple HTTP server over hyper library.
18 : /// You may want to use it instead of [`hyper0::server::Server`] because:
19 : /// 1. hyper0's Server was removed from hyper v1.
20 : /// It's recommended to replace hyepr0's Server with a manual loop, which is done here.
21 : /// 2. hyper0's Server doesn't support TLS out of the box, and there is no way
22 : /// to support it efficiently with the Accept trait that hyper0's Server uses.
23 : /// That's one of the reasons why it was removed from v1.
24 : /// <https://github.com/hyperium/hyper/blob/115339d3df50f20c8717680aa35f48858e9a6205/docs/ROADMAP.md#higher-level-client-and-server-problems>
25 : pub struct Server {
26 : request_service: Arc<RequestServiceBuilder<Body, ApiError>>,
27 : listener: tokio::net::TcpListener,
28 : tls_acceptor: Option<TlsAcceptor>,
29 : }
30 :
31 0 : static CONNECTION_STARTED_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
32 0 : register_int_counter_vec!(
33 0 : "http_server_connection_started_total",
34 0 : "Number of established http/https connections",
35 0 : &["scheme"]
36 0 : )
37 0 : .expect("failed to define a metric")
38 0 : });
39 :
40 0 : static CONNECTION_ERROR_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
41 0 : register_int_counter_vec!(
42 0 : "http_server_connection_errors_total",
43 0 : "Number of occured connection errors by type",
44 0 : &["type"]
45 0 : )
46 0 : .expect("failed to define a metric")
47 0 : });
48 :
49 : impl Server {
50 0 : pub fn new(
51 0 : request_service: Arc<RequestServiceBuilder<Body, ApiError>>,
52 0 : listener: std::net::TcpListener,
53 0 : tls_acceptor: Option<TlsAcceptor>,
54 0 : ) -> anyhow::Result<Self> {
55 0 : // Note: caller of from_std is responsible for setting nonblocking mode.
56 0 : listener.set_nonblocking(true)?;
57 0 : let listener = tokio::net::TcpListener::from_std(listener)?;
58 :
59 0 : Ok(Self {
60 0 : request_service,
61 0 : listener,
62 0 : tls_acceptor,
63 0 : })
64 0 : }
65 :
66 0 : pub async fn serve(self, cancel: CancellationToken) -> anyhow::Result<()> {
67 0 : fn suppress_io_error(err: &std::io::Error) -> bool {
68 : use std::io::ErrorKind::*;
69 0 : matches!(err.kind(), ConnectionReset | ConnectionAborted | BrokenPipe)
70 0 : }
71 0 : fn suppress_hyper_error(err: &hyper0::Error) -> bool {
72 0 : if err.is_incomplete_message() || err.is_closed() || err.is_timeout() {
73 0 : return true;
74 0 : }
75 0 : if let Some(inner) = err.source() {
76 0 : if let Some(io) = inner.downcast_ref::<std::io::Error>() {
77 0 : return suppress_io_error(io);
78 0 : }
79 0 : }
80 0 : false
81 0 : }
82 :
83 0 : let tcp_error_cnt = CONNECTION_ERROR_COUNT.with_label_values(&["tcp"]);
84 0 : let tls_error_cnt = CONNECTION_ERROR_COUNT.with_label_values(&["tls"]);
85 0 : let http_error_cnt = CONNECTION_ERROR_COUNT.with_label_values(&["http"]);
86 0 : let https_error_cnt = CONNECTION_ERROR_COUNT.with_label_values(&["https"]);
87 0 : let panic_error_cnt = CONNECTION_ERROR_COUNT.with_label_values(&["panic"]);
88 0 :
89 0 : let http_connection_cnt = CONNECTION_STARTED_COUNT.with_label_values(&["http"]);
90 0 : let https_connection_cnt = CONNECTION_STARTED_COUNT.with_label_values(&["https"]);
91 0 :
92 0 : let mut connections = FuturesUnordered::new();
93 : loop {
94 0 : tokio::select! {
95 0 : stream = self.listener.accept() => {
96 0 : let (tcp_stream, remote_addr) = match stream {
97 0 : Ok(stream) => stream,
98 0 : Err(err) => {
99 0 : tcp_error_cnt.inc();
100 0 : if !suppress_io_error(&err) {
101 0 : info!("Failed to accept TCP connection: {err:#}");
102 0 : }
103 0 : continue;
104 : }
105 : };
106 :
107 0 : let service = self.request_service.build(remote_addr);
108 0 : let tls_acceptor = self.tls_acceptor.clone();
109 0 : let cancel = cancel.clone();
110 0 :
111 0 : let tls_error_cnt = tls_error_cnt.clone();
112 0 : let http_error_cnt = http_error_cnt.clone();
113 0 : let https_error_cnt = https_error_cnt.clone();
114 0 : let http_connection_cnt = http_connection_cnt.clone();
115 0 : let https_connection_cnt = https_connection_cnt.clone();
116 0 :
117 0 : connections.push(tokio::spawn(
118 0 : async move {
119 0 : match tls_acceptor {
120 0 : Some(tls_acceptor) => {
121 0 : // Handle HTTPS connection.
122 0 : https_connection_cnt.inc();
123 0 : let tls_stream = tokio::select! {
124 0 : tls_stream = tls_acceptor.accept(tcp_stream) => tls_stream,
125 0 : _ = cancel.cancelled() => return,
126 : };
127 0 : let tls_stream = match tls_stream {
128 0 : Ok(tls_stream) => tls_stream,
129 0 : Err(err) => {
130 0 : tls_error_cnt.inc();
131 0 : if !suppress_io_error(&err) {
132 0 : info!(%remote_addr, "Failed to accept TLS connection: {err:#}");
133 0 : }
134 0 : return;
135 : }
136 : };
137 0 : if let Err(err) = Self::serve_connection(tls_stream, service, cancel).await {
138 0 : https_error_cnt.inc();
139 0 : if !suppress_hyper_error(&err) {
140 0 : info!(%remote_addr, "Failed to serve HTTPS connection: {err:#}");
141 0 : }
142 0 : }
143 : }
144 : None => {
145 : // Handle HTTP connection.
146 0 : http_connection_cnt.inc();
147 0 : if let Err(err) = Self::serve_connection(tcp_stream, service, cancel).await {
148 0 : http_error_cnt.inc();
149 0 : if !suppress_hyper_error(&err) {
150 0 : info!(%remote_addr, "Failed to serve HTTP connection: {err:#}");
151 0 : }
152 0 : }
153 : }
154 : };
155 0 : }));
156 0 : }
157 0 : Some(conn) = connections.next() => {
158 0 : if let Err(err) = conn {
159 0 : panic_error_cnt.inc();
160 0 : error!("Connection panicked: {err:#}");
161 0 : }
162 : }
163 0 : _ = cancel.cancelled() => {
164 : // Wait for graceful shutdown of all connections.
165 0 : while let Some(conn) = connections.next().await {
166 0 : if let Err(err) = conn {
167 0 : panic_error_cnt.inc();
168 0 : error!("Connection panicked: {err:#}");
169 0 : }
170 : }
171 0 : break;
172 0 : }
173 0 : }
174 0 : }
175 0 : Ok(())
176 0 : }
177 :
178 : /// Serves HTTP connection with graceful shutdown.
179 0 : async fn serve_connection<I>(
180 0 : io: I,
181 0 : service: RequestService<Body, ApiError>,
182 0 : cancel: CancellationToken,
183 0 : ) -> Result<(), hyper0::Error>
184 0 : where
185 0 : I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
186 0 : {
187 0 : let mut conn = Http::new().serve_connection(io, service).with_upgrades();
188 0 :
189 0 : tokio::select! {
190 0 : res = &mut conn => res,
191 0 : _ = cancel.cancelled() => {
192 0 : Pin::new(&mut conn).graceful_shutdown();
193 0 : // Note: connection should still be awaited for graceful shutdown to complete.
194 0 : conn.await
195 : }
196 : }
197 0 : }
198 : }
|