LCOV - code coverage report
Current view: top level - libs/http-utils/src - server.rs (source / functions) Coverage Total Hit
Test: 98683a8629f0f7f0031d02e04512998d589d76ea.info Lines: 0.0 % 141 0
Test Date: 2025-04-11 16:58:57 Functions: 0.0 % 12 0

            Line data    Source code
       1              : use std::{error::Error, sync::Arc};
       2              : 
       3              : use futures::StreamExt;
       4              : use futures::stream::FuturesUnordered;
       5              : use hyper0::Body;
       6              : use hyper0::server::conn::Http;
       7              : use metrics::{IntCounterVec, register_int_counter_vec};
       8              : use once_cell::sync::Lazy;
       9              : use routerify::{RequestService, RequestServiceBuilder};
      10              : use tokio::io::{AsyncRead, AsyncWrite};
      11              : use tokio_rustls::TlsAcceptor;
      12              : use tokio_util::sync::CancellationToken;
      13              : use tracing::{error, info};
      14              : 
      15              : use crate::error::ApiError;
      16              : 
      17              : /// A simple HTTP server over hyper library.
      18              : /// You may want to use it instead of [`hyper0::server::Server`] because:
      19              : /// 1. hyper0's Server was removed from hyper v1.
      20              : ///    It's recommended to replace hyepr0's Server with a manual loop, which is done here.
      21              : /// 2. hyper0's Server doesn't support TLS out of the box, and there is no way
      22              : ///    to support it efficiently with the Accept trait that hyper0's Server uses.
      23              : ///    That's one of the reasons why it was removed from v1.
      24              : ///    <https://github.com/hyperium/hyper/blob/115339d3df50f20c8717680aa35f48858e9a6205/docs/ROADMAP.md#higher-level-client-and-server-problems>
      25              : pub struct Server {
      26              :     request_service: Arc<RequestServiceBuilder<Body, ApiError>>,
      27              :     listener: tokio::net::TcpListener,
      28              :     tls_acceptor: Option<TlsAcceptor>,
      29              : }
      30              : 
      31            0 : static CONNECTION_STARTED_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
      32            0 :     register_int_counter_vec!(
      33            0 :         "http_server_connection_started_total",
      34            0 :         "Number of established http/https connections",
      35            0 :         &["scheme"]
      36            0 :     )
      37            0 :     .expect("failed to define a metric")
      38            0 : });
      39              : 
      40            0 : static CONNECTION_ERROR_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
      41            0 :     register_int_counter_vec!(
      42            0 :         "http_server_connection_errors_total",
      43            0 :         "Number of occured connection errors by type",
      44            0 :         &["type"]
      45            0 :     )
      46            0 :     .expect("failed to define a metric")
      47            0 : });
      48              : 
      49              : impl Server {
      50            0 :     pub fn new(
      51            0 :         request_service: Arc<RequestServiceBuilder<Body, ApiError>>,
      52            0 :         listener: std::net::TcpListener,
      53            0 :         tls_acceptor: Option<TlsAcceptor>,
      54            0 :     ) -> anyhow::Result<Self> {
      55            0 :         // Note: caller of from_std is responsible for setting nonblocking mode.
      56            0 :         listener.set_nonblocking(true)?;
      57            0 :         let listener = tokio::net::TcpListener::from_std(listener)?;
      58              : 
      59            0 :         Ok(Self {
      60            0 :             request_service,
      61            0 :             listener,
      62            0 :             tls_acceptor,
      63            0 :         })
      64            0 :     }
      65              : 
      66            0 :     pub async fn serve(self, cancel: CancellationToken) -> anyhow::Result<()> {
      67            0 :         fn suppress_io_error(err: &std::io::Error) -> bool {
      68              :             use std::io::ErrorKind::*;
      69            0 :             matches!(err.kind(), ConnectionReset | ConnectionAborted | BrokenPipe)
      70            0 :         }
      71            0 :         fn suppress_hyper_error(err: &hyper0::Error) -> bool {
      72            0 :             if err.is_incomplete_message() || err.is_closed() || err.is_timeout() {
      73            0 :                 return true;
      74            0 :             }
      75            0 :             if let Some(inner) = err.source() {
      76            0 :                 if let Some(io) = inner.downcast_ref::<std::io::Error>() {
      77            0 :                     return suppress_io_error(io);
      78            0 :                 }
      79            0 :             }
      80            0 :             false
      81            0 :         }
      82              : 
      83            0 :         let tcp_error_cnt = CONNECTION_ERROR_COUNT.with_label_values(&["tcp"]);
      84            0 :         let tls_error_cnt = CONNECTION_ERROR_COUNT.with_label_values(&["tls"]);
      85            0 :         let http_error_cnt = CONNECTION_ERROR_COUNT.with_label_values(&["http"]);
      86            0 :         let https_error_cnt = CONNECTION_ERROR_COUNT.with_label_values(&["https"]);
      87            0 :         let panic_error_cnt = CONNECTION_ERROR_COUNT.with_label_values(&["panic"]);
      88            0 : 
      89            0 :         let http_connection_cnt = CONNECTION_STARTED_COUNT.with_label_values(&["http"]);
      90            0 :         let https_connection_cnt = CONNECTION_STARTED_COUNT.with_label_values(&["https"]);
      91            0 : 
      92            0 :         let mut connections = FuturesUnordered::new();
      93              :         loop {
      94            0 :             tokio::select! {
      95            0 :                 stream = self.listener.accept() => {
      96            0 :                     let (tcp_stream, remote_addr) = match stream {
      97            0 :                         Ok(stream) => stream,
      98            0 :                         Err(err) => {
      99            0 :                             tcp_error_cnt.inc();
     100            0 :                             if !suppress_io_error(&err) {
     101            0 :                                 info!("Failed to accept TCP connection: {err:#}");
     102            0 :                             }
     103            0 :                             continue;
     104              :                         }
     105              :                     };
     106              : 
     107            0 :                     let service = self.request_service.build(remote_addr);
     108            0 :                     let tls_acceptor = self.tls_acceptor.clone();
     109            0 :                     let cancel = cancel.clone();
     110            0 : 
     111            0 :                     let tls_error_cnt = tls_error_cnt.clone();
     112            0 :                     let http_error_cnt = http_error_cnt.clone();
     113            0 :                     let https_error_cnt = https_error_cnt.clone();
     114            0 :                     let http_connection_cnt = http_connection_cnt.clone();
     115            0 :                     let https_connection_cnt = https_connection_cnt.clone();
     116            0 : 
     117            0 :                     connections.push(tokio::spawn(
     118            0 :                         async move {
     119            0 :                             match tls_acceptor {
     120            0 :                                 Some(tls_acceptor) => {
     121            0 :                                     // Handle HTTPS connection.
     122            0 :                                     https_connection_cnt.inc();
     123            0 :                                     let tls_stream = tokio::select! {
     124            0 :                                         tls_stream = tls_acceptor.accept(tcp_stream) => tls_stream,
     125            0 :                                         _ = cancel.cancelled() => return,
     126              :                                     };
     127            0 :                                     let tls_stream = match tls_stream {
     128            0 :                                         Ok(tls_stream) => tls_stream,
     129            0 :                                         Err(err) => {
     130            0 :                                             tls_error_cnt.inc();
     131            0 :                                             if !suppress_io_error(&err) {
     132            0 :                                                 info!(%remote_addr, "Failed to accept TLS connection: {err:#}");
     133            0 :                                             }
     134            0 :                                             return;
     135              :                                         }
     136              :                                     };
     137            0 :                                     if let Err(err) = Self::serve_connection(tls_stream, service, cancel).await {
     138            0 :                                         https_error_cnt.inc();
     139            0 :                                         if !suppress_hyper_error(&err) {
     140            0 :                                             info!(%remote_addr, "Failed to serve HTTPS connection: {err:#}");
     141            0 :                                         }
     142            0 :                                     }
     143              :                                 }
     144              :                                 None => {
     145              :                                     // Handle HTTP connection.
     146            0 :                                     http_connection_cnt.inc();
     147            0 :                                     if let Err(err) = Self::serve_connection(tcp_stream, service, cancel).await {
     148            0 :                                         http_error_cnt.inc();
     149            0 :                                         if !suppress_hyper_error(&err) {
     150            0 :                                             info!(%remote_addr, "Failed to serve HTTP connection: {err:#}");
     151            0 :                                         }
     152            0 :                                     }
     153              :                                 }
     154              :                             };
     155            0 :                         }));
     156            0 :                  }
     157            0 :                 Some(conn) = connections.next() => {
     158            0 :                     if let Err(err) = conn {
     159            0 :                         panic_error_cnt.inc();
     160            0 :                         error!("Connection panicked: {err:#}");
     161            0 :                     }
     162              :                 }
     163            0 :                 _ = cancel.cancelled() => {
     164              :                     // Wait for graceful shutdown of all connections.
     165            0 :                     while let Some(conn) = connections.next().await {
     166            0 :                         if let Err(err) = conn {
     167            0 :                             panic_error_cnt.inc();
     168            0 :                             error!("Connection panicked: {err:#}");
     169            0 :                         }
     170              :                     }
     171            0 :                     break;
     172            0 :                 }
     173            0 :             }
     174            0 :         }
     175            0 :         Ok(())
     176            0 :     }
     177              : 
     178              :     /// Serves HTTP connection with graceful shutdown.
     179            0 :     async fn serve_connection<I>(
     180            0 :         io: I,
     181            0 :         service: RequestService<Body, ApiError>,
     182            0 :         cancel: CancellationToken,
     183            0 :     ) -> Result<(), hyper0::Error>
     184            0 :     where
     185            0 :         I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
     186            0 :     {
     187            0 :         let mut conn = Http::new().serve_connection(io, service).with_upgrades();
     188            0 : 
     189            0 :         tokio::select! {
     190            0 :             res = &mut conn => res,
     191            0 :             _ = cancel.cancelled() => {
     192            0 :                 Pin::new(&mut conn).graceful_shutdown();
     193            0 :                 // Note: connection should still be awaited for graceful shutdown to complete.
     194            0 :                 conn.await
     195              :             }
     196              :         }
     197            0 :     }
     198              : }
        

Generated by: LCOV version 2.1-beta