LCOV - code coverage report
Current view: top level - proxy/src/serverless - mod.rs (source / functions) Coverage Total Hit
Test: 4be46b1c0003aa3bbac9ade362c676b419df4c20.info Lines: 0.0 % 332 0
Test Date: 2025-07-22 17:50:06 Functions: 0.0 % 33 0

            Line data    Source code
       1              : //! Routers for our serverless APIs
       2              : //!
       3              : //! Handles both SQL over HTTP and SQL over Websockets.
       4              : 
       5              : mod backend;
       6              : pub mod cancel_set;
       7              : mod conn_pool;
       8              : mod conn_pool_lib;
       9              : mod error;
      10              : mod http_conn_pool;
      11              : mod http_util;
      12              : mod json;
      13              : mod local_conn_pool;
      14              : #[cfg(feature = "rest_broker")]
      15              : pub mod rest;
      16              : mod sql_over_http;
      17              : mod websocket;
      18              : 
      19              : use std::net::{IpAddr, SocketAddr};
      20              : use std::pin::{Pin, pin};
      21              : use std::sync::Arc;
      22              : 
      23              : use anyhow::Context;
      24              : use arc_swap::ArcSwapOption;
      25              : use async_trait::async_trait;
      26              : use atomic_take::AtomicTake;
      27              : use bytes::Bytes;
      28              : pub use conn_pool_lib::GlobalConnPoolOptions;
      29              : use futures::TryFutureExt;
      30              : use futures::future::{Either, select};
      31              : use http::{Method, Response, StatusCode};
      32              : use http_body_util::combinators::BoxBody;
      33              : use http_body_util::{BodyExt, Empty};
      34              : use http_util::{NEON_REQUEST_ID, uuid_to_header_value};
      35              : use http_utils::error::ApiError;
      36              : use hyper::body::Incoming;
      37              : use hyper_util::rt::TokioExecutor;
      38              : use hyper_util::server::conn::auto::Builder;
      39              : use rand::SeedableRng;
      40              : use rand::rngs::StdRng;
      41              : use tokio::io::{AsyncRead, AsyncWrite};
      42              : use tokio::net::{TcpListener, TcpStream};
      43              : use tokio::time::timeout;
      44              : use tokio_rustls::TlsAcceptor;
      45              : use tokio_util::sync::CancellationToken;
      46              : use tokio_util::task::TaskTracker;
      47              : use tracing::{Instrument, info, warn};
      48              : 
      49              : use crate::cancellation::CancellationHandler;
      50              : use crate::config::{ProxyConfig, ProxyProtocolV2};
      51              : use crate::context::RequestContext;
      52              : use crate::ext::TaskExt;
      53              : use crate::metrics::Metrics;
      54              : use crate::protocol2::{ConnectHeader, ConnectionInfo, read_proxy_protocol};
      55              : use crate::rate_limiter::EndpointRateLimiter;
      56              : use crate::serverless::backend::PoolingBackend;
      57              : use crate::serverless::http_util::{api_error_into_response, json_response};
      58              : use crate::util::run_until_cancelled;
      59              : 
                        /// Crate-visible constant with the value `"api"`.
                        /// NOTE(review): judging by the name this is the SNI label used by the serverless
                        /// driver; its users are outside this listing, so confirm before relying on that.
      60              : pub(crate) const SERVERLESS_DRIVER_SNI: &str = "api";
                        /// Crate-visible constant with the value `"apiauth"`.
                        /// NOTE(review): judging by the name this is the SNI label used by the auth broker;
                        /// its users are outside this listing, so confirm before relying on that.
      61              : pub(crate) const AUTH_BROKER_SNI: &str = "apiauth";
      62              : 
                        /// Runs the accept loop for the serverless listener.
                        ///
                        /// Builds the local connection pool and two global pools, wraps them in a shared
                        /// [`PoolingBackend`], then accepts TCP connections from `ws_listener` until
                        /// `run_until_cancelled` yields `None` (presumably once `cancellation_token` is
                        /// cancelled - confirm in `crate::util`). Each accepted connection is handled in its
                        /// own task tracked by the `connections` tracker; once the loop ends the function
                        /// waits for all tracked tasks before returning `Ok(())`.
                        ///
                        /// # Errors
                        ///
                        /// Returns an error (with context "could not accept TCP stream") as soon as `accept`
                        /// fails. That early return goes through `?`, so it does not wait for tracked tasks.
      63            0 : pub async fn task_main(
      64            0 :     config: &'static ProxyConfig,
      65            0 :     auth_backend: &'static crate::auth::Backend<'static, ()>,
      66            0 :     ws_listener: TcpListener,
      67            0 :     cancellation_token: CancellationToken,
      68            0 :     cancellation_handler: Arc<CancellationHandler>,
      69            0 :     endpoint_rate_limiter: Arc<EndpointRateLimiter>,
      70            0 : ) -> anyhow::Result<()> {
                            // Runs when this scope exits, so the message is logged on every return path.
      71            0 :     scopeguard::defer! {
      72              :         info!("websocket server has shut down");
      73              :     }
      74              : 
      75            0 :     let local_pool = local_conn_pool::LocalConnPool::new(&config.http_config);
      76            0 :     let conn_pool = conn_pool_lib::GlobalConnPool::new(&config.http_config);
      77              :     {
                                // Detached background task running the pool's `gc_worker`; the join handle
                                // returned by `tokio::spawn` is dropped.
      78            0 :         let conn_pool = Arc::clone(&conn_pool);
      79            0 :         tokio::spawn(async move {
      80            0 :             conn_pool.gc_worker(StdRng::from_os_rng()).await;
      81            0 :         });
      82              :     }
      83              : 
      84              :     // On cancellation, shut down the connection pool. `shutdown()` is executed through
                            // `spawn_blocking`, i.e. off the async worker threads.
      85            0 :     tokio::spawn({
      86            0 :         let cancellation_token = cancellation_token.clone();
      87            0 :         let conn_pool = conn_pool.clone();
      88            0 :         async move {
      89            0 :             cancellation_token.cancelled().await;
      90            0 :             tokio::task::spawn_blocking(move || conn_pool.shutdown())
      91            0 :                 .await
      92            0 :                 .propagate_task_panic();
      93            0 :         }
      94              :     });
      95              : 
                            // Second global pool, set up exactly like `conn_pool` above; it is handed to the
                            // backend as `http_conn_pool`.
      96            0 :     let http_conn_pool = conn_pool_lib::GlobalConnPool::new(&config.http_config);
      97              :     {
      98            0 :         let http_conn_pool = Arc::clone(&http_conn_pool);
      99            0 :         tokio::spawn(async move {
     100            0 :             http_conn_pool.gc_worker(StdRng::from_os_rng()).await;
     101            0 :         });
     102              :     }
     103              : 
     104              :     // On cancellation, shut down the HTTP connection pool (same pattern as above).
     105            0 :     tokio::spawn({
     106            0 :         let cancellation_token = cancellation_token.clone();
     107            0 :         let http_conn_pool = http_conn_pool.clone();
     108            0 :         async move {
     109            0 :             cancellation_token.cancelled().await;
     110            0 :             tokio::task::spawn_blocking(move || http_conn_pool.shutdown())
     111            0 :                 .await
     112            0 :                 .propagate_task_panic();
     113            0 :         }
     114              :     });
     115              : 
                            // Single backend instance shared (via `Arc`) by every connection task.
     116            0 :     let backend = Arc::new(PoolingBackend {
     117            0 :         http_conn_pool: Arc::clone(&http_conn_pool),
     118            0 :         local_pool,
     119            0 :         pool: Arc::clone(&conn_pool),
     120            0 :         config,
     121            0 :         auth_backend,
     122            0 :         endpoint_rate_limiter: Arc::clone(&endpoint_rate_limiter),
     123            0 :     });
                            // The reference to the TLS config is used as a `MaybeTlsAcceptor` trait object,
                            // so the config is consulted on each `accept` call rather than captured once.
     124            0 :     let tls_acceptor: Arc<dyn MaybeTlsAcceptor> = Arc::new(&config.tls_config);
     125              : 
     126            0 :     let connections = tokio_util::task::task_tracker::TaskTracker::new();
     127            0 :     connections.close(); // allows `connections.wait()` to complete
     128              : 
                            // NOTE(review): this tracker is cloned into every connection but is never closed
                            // or waited on inside this function.
     129            0 :     let cancellations = tokio_util::task::task_tracker::TaskTracker::new();
     130            0 :     while let Some(res) = run_until_cancelled(ws_listener.accept(), &cancellation_token).await {
     131            0 :         let (conn, peer_addr) = res.context("could not accept TCP stream")?;
                                // A connection on which TCP_NODELAY cannot be set is dropped, not served.
     132            0 :         if let Err(e) = conn.set_nodelay(true) {
     133            0 :             tracing::error!("could not set nodelay: {e}");
     134            0 :             continue;
     135            0 :         }
     136            0 :         let conn_id = uuid::Uuid::new_v4();
     137            0 :         let http_conn_span = tracing::info_span!("http_conn", ?conn_id);
     138              : 
                                // Load shedding: when the sampled number of HTTP client connections exceeds
                                // the configured threshold, take one token out of the cancel set (if any) and
                                // cancel it.
     139            0 :         let n_connections = Metrics::get()
     140            0 :             .proxy
     141            0 :             .client_connections
     142            0 :             .sample(crate::metrics::Protocol::Http);
     143            0 :         tracing::trace!(?n_connections, threshold = ?config.http_config.client_conn_threshold, "check");
     144            0 :         if n_connections > config.http_config.client_conn_threshold {
     145            0 :             tracing::trace!("attempting to cancel a random connection");
     146            0 :             if let Some(token) = config.http_config.cancel_set.take() {
     147            0 :                 tracing::debug!("cancelling a random connection");
     148            0 :                 token.cancel();
     149            0 :             }
     150            0 :         }
     151              : 
                                // Per-connection child token: cancelled when the server-wide token is
                                // cancelled, and individually cancellable through the cancel set.
     152            0 :         let conn_token = cancellation_token.child_token();
     153            0 :         let tls_acceptor = tls_acceptor.clone();
     154            0 :         let backend = backend.clone();
     155            0 :         let connections2 = connections.clone();
     156            0 :         let cancellation_handler = cancellation_handler.clone();
     157            0 :         let endpoint_rate_limiter = endpoint_rate_limiter.clone();
     158            0 :         let cancellations = cancellations.clone();
     159            0 :         connections.spawn(
     160            0 :             async move {
                                        // Register this connection's token in the cancel set. The returned
                                        // value is held until the task ends - presumably it unregisters the
                                        // entry on drop (TODO confirm in `cancel_set`).
     161            0 :                 let conn_token2 = conn_token.clone();
     162            0 :                 let _cancel_guard = config.http_config.cancel_set.insert(conn_id, conn_token2);
     163              : 
     164            0 :                 let session_id = uuid::Uuid::new_v4();
     165              : 
                                        // Held for the lifetime of the task; presumably keeps the
                                        // `client_connections` gauge incremented until dropped (TODO confirm).
     166            0 :                 let _gauge = Metrics::get()
     167            0 :                     .proxy
     168            0 :                     .client_connections
     169            0 :                     .guard(crate::metrics::Protocol::Http);
     170              : 
                                        // PROXY protocol + TLS handshake; `None` means the connection is done
                                        // (failure or healthcheck) and there is nothing to serve.
     171            0 :                 let startup_result = Box::pin(connection_startup(
     172            0 :                     config,
     173            0 :                     tls_acceptor,
     174            0 :                     session_id,
     175            0 :                     conn,
     176            0 :                     peer_addr,
     177            0 :                 ))
     178            0 :                 .await;
     179            0 :                 let Some((conn, conn_info)) = startup_result else {
     180            0 :                     return;
     181              :                 };
     182              : 
                                        // Serve HTTP on the established stream until it closes or is cancelled.
     183            0 :                 Box::pin(connection_handler(
     184            0 :                     config,
     185            0 :                     backend,
     186            0 :                     connections2,
     187            0 :                     cancellations,
     188            0 :                     cancellation_handler,
     189            0 :                     endpoint_rate_limiter,
     190            0 :                     conn_token,
     191            0 :                     conn,
     192            0 :                     conn_info,
     193            0 :                     session_id,
     194            0 :                 ))
     195            0 :                 .await;
     196            0 :             }
     197            0 :             .instrument(http_conn_span),
     198              :         );
     199              :     }
     200              : 
                            // The tracker was closed above, so this resolves once every tracked task
                            // (connections, plus the request and websocket tasks spawned on clones of it)
                            // has finished.
     201            0 :     connections.wait().await;
     202              : 
     203            0 :     Ok(())
     204            0 : }
     205              : 
                        /// Marker trait combining `AsyncRead + AsyncWrite + Send + 'static`, so a single trait
                        /// object type can stand for any such stream.
     206              : pub(crate) trait AsyncReadWrite: AsyncRead + AsyncWrite + Send + 'static {}
                        // Blanket impl: every type meeting the bounds is an `AsyncReadWrite`.
     207              : impl<T: AsyncRead + AsyncWrite + Send + 'static> AsyncReadWrite for T {}
                        /// Pinned, boxed, type-erased bidirectional stream. In this file it is either a plain
                        /// `TcpStream` or a TLS stream wrapping one (see `MaybeTlsAcceptor::accept`).
     208              : pub(crate) type AsyncRW = Pin<Box<dyn AsyncReadWrite>>;
     209              : 
                        /// Turns an accepted `TcpStream` into the stream that HTTP is served on; implementors
                        /// may or may not perform a TLS handshake.
     210              : #[async_trait]
     211              : trait MaybeTlsAcceptor: Send + Sync + 'static {
                            /// Consumes `conn` and returns the type-erased stream, or the I/O error
                            /// produced while setting it up.
     212              :     async fn accept(&self, conn: TcpStream) -> std::io::Result<AsyncRW>;
     213              : }
     214              : 
                        /// Acceptor backed by a swappable, optional TLS config. The config is loaded on every
                        /// call, so a config swapped in later applies to subsequently accepted connections.
     215              : #[async_trait]
     216              : impl MaybeTlsAcceptor for &'static ArcSwapOption<crate::config::TlsConfig> {
     217            0 :     async fn accept(&self, conn: TcpStream) -> std::io::Result<AsyncRW> {
     218            0 :         match &*self.load() {
                                    // TLS configured: handshake using the config's `http_config`; a
                                    // handshake error is returned to the caller via `?`.
     219            0 :             Some(config) => Ok(Box::pin(
     220            0 :                 TlsAcceptor::from(config.http_config.clone())
     221            0 :                     .accept(conn)
     222            0 :                     .await?,
     223              :             )),
                                    // No TLS config present: serve the raw TCP stream unchanged.
     224            0 :             None => Ok(Box::pin(conn)),
     225              :         }
     226            0 :     }
     227              : }
     228              : 
     229              : /// Handles the TCP startup lifecycle.
     230              : /// 1. Parses PROXY protocol V2
     231              : /// 2. Handles TLS handshake
                        ///
                        /// Returns the (possibly TLS-wrapped) stream together with the connection info, or
                        /// `None` when the connection should not be served: the PROXY header could not be
                        /// read, the header was a `Local` one (treated as a healthcheck), or the TLS accept
                        /// failed or exceeded `config.handshake_timeout`. All of these cases are only logged.
     232            0 : async fn connection_startup(
     233            0 :     config: &ProxyConfig,
     234            0 :     tls_acceptor: Arc<dyn MaybeTlsAcceptor>,
     235            0 :     session_id: uuid::Uuid,
     236            0 :     conn: TcpStream,
     237            0 :     peer_addr: SocketAddr,
     238            0 : ) -> Option<(AsyncRW, ConnectionInfo)> {
     239              :     // handle PROXY protocol
     240            0 :     let (conn, conn_info) = match config.proxy_protocol_v2 {
     241              :         ProxyProtocolV2::Required => {
     242            0 :             match read_proxy_protocol(conn).await {
     243            0 :                 Err(e) => {
     244            0 :                     warn!("per-client task finished with an error: {e:#}");
     245            0 :                     return None;
     246              :                 }
     247              :                 // our load balancers will not send any more data. let's just exit immediately
     248            0 :                 Ok((_conn, ConnectHeader::Local)) => {
     249            0 :                     tracing::debug!("healthcheck received");
     250            0 :                     return None;
     251              :                 }
                                        // Use the connection info carried in the PROXY header rather than
                                        // the socket's peer address.
     252            0 :                 Ok((conn, ConnectHeader::Proxy(info))) => (conn, info),
     253              :             }
     254              :         }
     255              :         // ignore the header - it cannot be confused for a postgres or http connection so will
     256              :         // error later.
     257            0 :         ProxyProtocolV2::Rejected => (
     258            0 :             conn,
     259            0 :             ConnectionInfo {
     260            0 :                 addr: peer_addr,
     261            0 :                 extra: None,
     262            0 :             },
     263            0 :         ),
     264              :     };
     265              : 
                            // Only decides whether TLS failures below are counted in the metric.
                            // NOTE(review): every IPv6 peer is classified as non-private here.
     266            0 :     let has_private_peer_addr = match conn_info.addr.ip() {
     267            0 :         IpAddr::V4(ip) => ip.is_private(),
     268            0 :         IpAddr::V6(_) => false,
     269              :     };
     270            0 :     info!(?session_id, %conn_info, "accepted new TCP connection");
     271              : 
     272              :     // try upgrade to TLS, but with a timeout.
     273            0 :     let conn = match timeout(config.handshake_timeout, tls_acceptor.accept(conn)).await {
     274            0 :         Ok(Ok(conn)) => {
     275            0 :             info!(?session_id, %conn_info, "accepted new TLS connection");
     276            0 :             conn
     277              :         }
     278              :         // The handshake failed
     279            0 :         Ok(Err(e)) => {
                                    // Failures from private IPv4 peers are not counted.
     280            0 :             if !has_private_peer_addr {
     281            0 :                 Metrics::get().proxy.tls_handshake_failures.inc();
     282            0 :             }
     283            0 :             warn!(?session_id, %conn_info, "failed to accept TLS connection: {e:?}");
     284            0 :             return None;
     285              :         }
     286              :         // The handshake timed out
                                // (handled the same way as a failure; `e` here is the timeout's elapsed error)
     287            0 :         Err(e) => {
     288            0 :             if !has_private_peer_addr {
     289            0 :                 Metrics::get().proxy.tls_handshake_failures.inc();
     290            0 :             }
     291            0 :             warn!(?session_id, %conn_info, "failed to accept TLS connection: {e:?}");
     292            0 :             return None;
     293              :         }
     294              :     };
     295              : 
     296            0 :     Some((conn, conn_info))
     297            0 : }
     298              : 
     299              : /// Handles HTTP connection
     300              : /// 1. With graceful shutdowns
     301              : /// 2. With graceful request cancellation with connection failure
     302              : /// 3. With websocket upgrade support.
                        ///
                        /// Serves `conn` with a hyper-util auto `Builder` (upgrades enabled). Every request is
                        /// run as its own task spawned on `connections`, so dropping the per-request future
                        /// does not drop the handler mid-flight. The function returns once the connection has
                        /// finished, either on its own or after a graceful shutdown triggered by
                        /// `cancellation_token`; the outcome is only logged.
     303              : #[allow(clippy::too_many_arguments)]
     304            0 : async fn connection_handler(
     305            0 :     config: &'static ProxyConfig,
     306            0 :     backend: Arc<PoolingBackend>,
     307            0 :     connections: TaskTracker,
     308            0 :     cancellations: TaskTracker,
     309            0 :     cancellation_handler: Arc<CancellationHandler>,
     310            0 :     endpoint_rate_limiter: Arc<EndpointRateLimiter>,
     311            0 :     cancellation_token: CancellationToken,
     312            0 :     conn: AsyncRW,
     313            0 :     conn_info: ConnectionInfo,
     314            0 :     session_id: uuid::Uuid,
     315            0 : ) {
                            // Wrapped so the service closure below can hand the connection's session ID to
                            // exactly one request (`take()` yields it once, then `None`).
     316            0 :     let session_id = AtomicTake::new(session_id);
     317              : 
     318              :     // Cancel all current inflight HTTP requests if the HTTP connection is closed.
                            // The drop guard cancels the token when this function's scope ends.
     319            0 :     let http_cancellation_token = CancellationToken::new();
     320            0 :     let _cancel_connection = http_cancellation_token.clone().drop_guard();
     321              : 
                            // Separate clone for the service closure; `conn_info` itself is still needed for
                            // the log lines at the end of this function.
     322            0 :     let conn_info2 = conn_info.clone();
     323            0 :     let server = Builder::new(TokioExecutor::new());
     324            0 :     let conn = server.serve_connection_with_upgrades(
     325            0 :         hyper_util::rt::TokioIo::new(conn),
     326            0 :         hyper::service::service_fn(move |req: hyper::Request<Incoming>| {
     327              :             // First HTTP request shares the same session ID
                                // (later requests on the same connection get a freshly generated one).
     328            0 :             let mut session_id = session_id.take().unwrap_or_else(uuid::Uuid::new_v4);
     329              : 
                                // With the `Local` auth backend only, a request ID supplied by the client
                                // overrides the generated one, provided the header parses as a UUID.
     330            0 :             if matches!(backend.auth_backend, crate::auth::Backend::Local(_)) {
     331              :                 // take session_id from request, if given.
     332            0 :                 if let Some(id) = req
     333            0 :                     .headers()
     334            0 :                     .get(&NEON_REQUEST_ID)
     335            0 :                     .and_then(|id| uuid::Uuid::try_parse_ascii(id.as_bytes()).ok())
     336            0 :                 {
     337            0 :                     session_id = id;
     338            0 :                 }
     339            0 :             }
     340              : 
     341              :             // Cancel the current inflight HTTP request if the request stream is closed.
     342              :             // This is slightly different to `_cancel_connection` in that
     343              :             // h2 can cancel individual requests with a `RST_STREAM`.
     344            0 :             let http_request_token = http_cancellation_token.child_token();
     345            0 :             let cancel_request = http_request_token.clone().drop_guard();
     346              : 
     347              :             // `request_handler` is not cancel safe. It expects to be cancelled only at specific times.
     348              :             // By spawning the future, we ensure it never gets cancelled until it decides to.
     349            0 :             let cancellations = cancellations.clone();
     350            0 :             let handler = connections.spawn(
     351            0 :                 request_handler(
     352            0 :                     req,
     353            0 :                     config,
     354            0 :                     backend.clone(),
     355            0 :                     connections.clone(),
     356            0 :                     cancellation_handler.clone(),
     357            0 :                     session_id,
     358            0 :                     conn_info2.clone(),
     359            0 :                     http_request_token,
     360            0 :                     endpoint_rate_limiter.clone(),
     361            0 :                     cancellations,
     362              :                 )
     363            0 :                 .in_current_span()
                                    // Convert an `ApiError` into an error response so the task always
                                    // yields a response value.
     364            0 :                 .map_ok_or_else(api_error_into_response, |r| r),
     365              :             );
     366            0 :             async move {
                                    // `res` is the join result of the spawned task.
     367            0 :                 let mut res = handler.await;
                                    // The handler has finished: disarm the guard so that dropping it no
                                    // longer cancels the request token. If this future is dropped before
                                    // reaching this point, the guard fires and cancels the request.
     368            0 :                 cancel_request.disarm();
     369              : 
     370              :                 // add the session ID to the response
     371            0 :                 if let Ok(resp) = &mut res {
     372            0 :                     resp.headers_mut()
     373            0 :                         .append(&NEON_REQUEST_ID, uuid_to_header_value(session_id));
     374            0 :                 }
     375              : 
     376            0 :                 res
     377            0 :             }
     378            0 :         }),
     379              :     );
     380              : 
     381              :     // On cancellation, trigger the HTTP connection handler to shut down.
                            // After requesting the graceful shutdown the connection future is still awaited
                            // to completion.
     382            0 :     let res = match select(pin!(cancellation_token.cancelled()), pin!(conn)).await {
     383            0 :         Either::Left((_cancelled, mut conn)) => {
     384            0 :             tracing::debug!(%conn_info, "cancelling connection");
     385            0 :             conn.as_mut().graceful_shutdown();
     386            0 :             conn.await
     387              :         }
     388            0 :         Either::Right((res, _)) => res,
     389              :     };
     390              : 
     391            0 :     match res {
     392            0 :         Ok(()) => tracing::info!(%conn_info, "HTTP connection closed"),
     393            0 :         Err(e) => tracing::warn!(%conn_info, "HTTP connection error {e}"),
     394              :     }
     395            0 : }
     396              : 
     397              : #[allow(clippy::too_many_arguments)]
                        /// Routes a single HTTP request.
                        ///
                        /// In order of precedence:
                        /// 1. Websocket upgrade requests (only when `config.http_config.accept_websockets` is
                        ///    set): performs the upgrade, spawns the websocket session on `ws_connections`,
                        ///    and returns the upgrade response.
                        /// 2. `POST /sql`: handled by `sql_over_http::handle`.
                        /// 3. `OPTIONS /sql`: answers with a fixed `200 OK` carrying CORS headers.
                        /// 4. With the `rest_broker` feature and `config.rest_config.is_rest_broker` set,
                        ///    paths whose second segment starts with `rest`: handled by `rest::handle`.
                        /// 5. Anything else: `400 Bad Request` with the message "query is not supported".
                        ///
                        /// # Errors
                        ///
                        /// Returns `ApiError::BadRequest` when the websocket upgrade fails,
                        /// `ApiError::InternalServerError` when the `OPTIONS` response cannot be built, and
                        /// otherwise whatever the delegated handler returns.
     398            0 : async fn request_handler(
     399            0 :     mut request: hyper::Request<Incoming>,
     400            0 :     config: &'static ProxyConfig,
     401            0 :     backend: Arc<PoolingBackend>,
     402            0 :     ws_connections: TaskTracker,
     403            0 :     cancellation_handler: Arc<CancellationHandler>,
     404            0 :     session_id: uuid::Uuid,
     405            0 :     conn_info: ConnectionInfo,
     406            0 :     // used to cancel in-flight HTTP requests. not used to cancel websockets
     407            0 :     http_cancellation_token: CancellationToken,
     408            0 :     endpoint_rate_limiter: Arc<EndpointRateLimiter>,
     409            0 :     cancellations: TaskTracker,
     410            0 : ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, ApiError> {
                            // Host header with everything from the first ':' onwards removed; only the
                            // websocket branch uses it.
                            // NOTE(review): a bracketed IPv6 literal such as "[::1]:443" is cut down to "[".
     411            0 :     let host = request
     412            0 :         .headers()
     413            0 :         .get("host")
     414            0 :         .and_then(|h| h.to_str().ok())
     415            0 :         .and_then(|h| h.split(':').next())
     416            0 :         .map(|s| s.to_string());
     417              : 
     418              :     // Check if the request is a websocket upgrade request.
     419            0 :     if config.http_config.accept_websockets
     420            0 :         && framed_websockets::upgrade::is_upgrade_request(&request)
     421              :     {
     422            0 :         let ctx = RequestContext::new(session_id, conn_info, crate::metrics::Protocol::Ws);
     423              : 
                                // A User-Agent that is missing or not valid as a string is recorded as `None`.
     424            0 :         ctx.set_user_agent(
     425            0 :             request
     426            0 :                 .headers()
     427            0 :                 .get(hyper::header::USER_AGENT)
     428            0 :                 .and_then(|h| h.to_str().ok())
     429            0 :                 .map(Into::into),
     430              :         );
     431              : 
     432            0 :         let span = ctx.span();
     433            0 :         info!(parent: &span, "performing websocket upgrade");
     434              : 
     435            0 :         let (response, websocket) = framed_websockets::upgrade::upgrade(&mut request)
     436            0 :             .map_err(|e| ApiError::BadRequest(e.into()))?;
     437              : 
                                // The websocket session runs as its own tracked task and outlives this
                                // request handler; its errors are only logged.
     438            0 :         let cancellations = cancellations.clone();
     439            0 :         ws_connections.spawn(
     440            0 :             async move {
     441            0 :                 if let Err(e) = websocket::serve_websocket(
     442            0 :                     config,
     443            0 :                     backend.auth_backend,
     444            0 :                     ctx,
     445            0 :                     websocket,
     446            0 :                     cancellation_handler,
     447            0 :                     endpoint_rate_limiter,
     448            0 :                     host,
     449            0 :                     cancellations,
     450              :                 )
     451            0 :                 .await
     452              :                 {
     453            0 :                     warn!("error in websocket connection: {e:#}");
     454            0 :                 }
     455            0 :             }
     456            0 :             .instrument(span),
     457              :         );
     458              : 
     459              :         // Return the response so the spawned future can continue.
                                // (`match x {}` covers an uninhabited error type, so no error can occur here.)
     460            0 :         Ok(response.map(|b| b.map_err(|x| match x {}).boxed()))
     461            0 :     } else if request.uri().path() == "/sql" && *request.method() == Method::POST {
     462            0 :         let ctx = RequestContext::new(session_id, conn_info, crate::metrics::Protocol::Http);
     463            0 :         let span = ctx.span();
     464              : 
                                // Optional query ID taken from the `X-Neon-Query-ID` header; it is logged and
                                // stored on the request context when present and valid as a string.
     465            0 :         let testodrome_id = request
     466            0 :             .headers()
     467            0 :             .get("X-Neon-Query-ID")
     468            0 :             .and_then(|value| value.to_str().ok())
     469            0 :             .map(|s| s.to_string());
     470              : 
     471            0 :         if let Some(query_id) = testodrome_id {
                                    // NOTE(review): this calls `ctx.span()` again although `span` is already
                                    // available; the REST branch below logs with `&span` instead.
     472            0 :             info!(parent: &ctx.span(), "testodrome query ID: {query_id}");
     473            0 :             ctx.set_testodrome_id(query_id.into());
     474            0 :         }
     475              : 
     476            0 :         sql_over_http::handle(config, ctx, request, backend, http_cancellation_token)
     477            0 :             .instrument(span)
     478            0 :             .await
     479            0 :     } else if request.uri().path() == "/sql" && *request.method() == Method::OPTIONS {
                                // Fixed response to `OPTIONS /sql` (CORS preflight): any origin is allowed, a
                                // fixed header list is allowed, and the result may be cached for 24 hours.
     480            0 :         Response::builder()
     481            0 :             .header("Allow", "OPTIONS, POST")
     482            0 :             .header("Access-Control-Allow-Origin", "*")
     483            0 :             .header(
     484              :                 "Access-Control-Allow-Headers",
     485              :                 "Authorization, Neon-Connection-String, Neon-Raw-Text-Output, Neon-Array-Mode, Neon-Pool-Opt-In, Neon-Batch-Read-Only, Neon-Batch-Isolation-Level",
     486              :             )
     487            0 :             .header("Access-Control-Max-Age", "86400" /* 24 hours */)
     488            0 :             .status(StatusCode::OK) // 204 is also valid, but see: https://developer.mozilla.org/en-US/docs/Web/HTTP/Methods/OPTIONS#status_code
     489            0 :             .body(Empty::new().map_err(|x| match x {}).boxed())
     490            0 :             .map_err(|e| ApiError::InternalServerError(e.into()))
     491              :     } else {
     492              :         #[cfg(feature = "rest_broker")]
     493              :         {
     494            0 :             if config.rest_config.is_rest_broker
     495              :             // we are testing for the path to be /database_name/rest/...
                                    // Splitting a path that begins with '/' yields an empty first element, so
                                    // `nth(2)` is the second path segment.
                                    // NOTE(review): `starts_with` also accepts segments such as "restful".
     496            0 :                 && request
     497            0 :                     .uri()
     498            0 :                     .path()
     499            0 :                     .split('/')
     500            0 :                     .nth(2)
     501            0 :                     .is_some_and(|part| part.starts_with("rest"))
     502              :             {
     503            0 :                 let ctx =
     504            0 :                     RequestContext::new(session_id, conn_info, crate::metrics::Protocol::Http);
     505            0 :                 let span = ctx.span();
     506              : 
                                        // Same optional query-ID handling as in the `/sql` branch.
     507            0 :                 let testodrome_id = request
     508            0 :                     .headers()
     509            0 :                     .get("X-Neon-Query-ID")
     510            0 :                     .and_then(|value| value.to_str().ok())
     511            0 :                     .map(|s| s.to_string());
     512              : 
     513            0 :                 if let Some(query_id) = testodrome_id {
     514            0 :                     info!(parent: &span, "testodrome query ID: {query_id}");
     515            0 :                     ctx.set_testodrome_id(query_id.into());
     516            0 :                 }
     517              : 
     518            0 :                 rest::handle(config, ctx, request, backend, http_cancellation_token)
     519            0 :                     .instrument(span)
     520            0 :                     .await
     521              :             } else {
     522            0 :                 json_response(StatusCode::BAD_REQUEST, "query is not supported")
     523              :             }
     524              :         }
                                // Without the `rest_broker` feature every other request is rejected.
     525              :         #[cfg(not(feature = "rest_broker"))]
     526              :         {
     527              :             json_response(StatusCode::BAD_REQUEST, "query is not supported")
     528              :         }
     529              :     }
     530            0 : }
        

Generated by: LCOV version 2.1-beta