LCOV - code coverage report
Current view: top level - storage_broker/src - lib.rs (source / functions) Coverage Total Hit
Test: f8d8f5b90fa487a9e82c42da223f012f5d4fece7.info Lines: 15.4 % 65 10
Test Date: 2024-09-19 20:36:02 Functions: 5.3 % 19 1

            Line data    Source code
       1              : use hyper::body::HttpBody;
       2              : use std::pin::Pin;
       3              : use std::task::{Context, Poll};
       4              : use std::time::Duration;
       5              : use tonic::codegen::StdError;
       6              : use tonic::transport::{ClientTlsConfig, Endpoint};
       7              : use tonic::{transport::Channel, Status};
       8              : use utils::id::{TenantId, TenantTimelineId, TimelineId};
       9              : 
      10              : use proto::{
      11              :     broker_service_client::BrokerServiceClient, TenantTimelineId as ProtoTenantTimelineId,
      12              : };
      13              : 
      14              : // Code generated by protobuf.
      15              : pub mod proto {
      16              :     // Tonic does derives as `#[derive(Clone, PartialEq, ::prost::Message)]`
      17              :     // we don't use these types for anything but broker data transmission,
      18              :     // so it's ok to ignore this one.
      19              :     #![allow(clippy::derive_partial_eq_without_eq)]
      20              :     tonic::include_proto!("storage_broker");
      21              : }
      22              : 
      23              : pub mod metrics;
      24              : 
      25              : // Re-exports to avoid direct tonic dependency in user crates.
      26              : pub use tonic::Code;
      27              : pub use tonic::Request;
      28              : pub use tonic::Streaming;
      29              : 
      30              : pub use hyper::Uri;
      31              : 
/// Default `host:port` address for the broker.
pub const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:50051";
/// Default plain-text (`http://`) endpoint URL, built at compile time from
/// [`DEFAULT_LISTEN_ADDR`].
pub const DEFAULT_ENDPOINT: &str = const_format::formatcp!("http://{DEFAULT_LISTEN_ADDR}");
      34              : 
      35              : pub const DEFAULT_KEEPALIVE_INTERVAL: &str = "5000 ms";
      36              : pub const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_millis(5000);
      37              : 
/// `BrokerServiceClient` bound to the tonic-provided [`Channel`] transport.
///
/// Exposing the alias helps user crates avoid depending on tonic directly.
pub type BrokerClientChannel = BrokerServiceClient<Channel>;
      41              : 
      42              : // Create connection object configured to run TLS if schema starts with https://
      43              : // and plain text otherwise. Connection is lazy, only endpoint sanity is
      44              : // validated here.
      45              : //
      46              : // NB: this function is not async, but still must be run on a tokio runtime thread
      47              : // because that's a requirement of tonic_endpoint.connect_lazy()'s Channel::new call.
      48            0 : pub fn connect<U>(endpoint: U, keepalive_interval: Duration) -> anyhow::Result<BrokerClientChannel>
      49            0 : where
      50            0 :     U: std::convert::TryInto<Uri>,
      51            0 :     U::Error: std::error::Error + Send + Sync + 'static,
      52            0 : {
      53            0 :     let uri: Uri = endpoint.try_into()?;
      54            0 :     let mut tonic_endpoint: Endpoint = uri.into();
      55              :     // If schema starts with https, start encrypted connection; do plain text
      56              :     // otherwise.
      57            0 :     if let Some("https") = tonic_endpoint.uri().scheme_str() {
      58            0 :         let tls = ClientTlsConfig::new();
      59            0 :         tonic_endpoint = tonic_endpoint.tls_config(tls)?;
      60            0 :     }
      61            0 :     tonic_endpoint = tonic_endpoint
      62            0 :         .http2_keep_alive_interval(keepalive_interval)
      63            0 :         .keep_alive_while_idle(true)
      64            0 :         .connect_timeout(DEFAULT_CONNECT_TIMEOUT);
      65            0 :     //  keep_alive_timeout is 20s by default on both client and server side
      66            0 :     let channel = tonic_endpoint.connect_lazy();
      67            0 :     Ok(BrokerClientChannel::new(channel))
      68            0 : }
      69              : 
      70              : impl BrokerClientChannel {
      71              :     /// Create a new client to the given endpoint, but don't actually connect until the first request.
      72            0 :     pub async fn connect_lazy<D>(dst: D) -> Result<Self, tonic::transport::Error>
      73            0 :     where
      74            0 :         D: std::convert::TryInto<tonic::transport::Endpoint>,
      75            0 :         D::Error: Into<StdError>,
      76            0 :     {
      77            0 :         let conn = tonic::transport::Endpoint::new(dst)?.connect_lazy();
      78            0 :         Ok(Self::new(conn))
      79            0 :     }
      80              : }
      81              : 
      82              : // parse variable length bytes from protobuf
      83            2 : pub fn parse_proto_ttid(proto_ttid: &ProtoTenantTimelineId) -> Result<TenantTimelineId, Status> {
      84            2 :     let tenant_id = TenantId::from_slice(&proto_ttid.tenant_id)
      85            2 :         .map_err(|e| Status::new(Code::InvalidArgument, format!("malformed tenant_id: {}", e)))?;
      86            2 :     let timeline_id = TimelineId::from_slice(&proto_ttid.timeline_id).map_err(|e| {
      87            0 :         Status::new(
      88            0 :             Code::InvalidArgument,
      89            0 :             format!("malformed timeline_id: {}", e),
      90            0 :         )
      91            2 :     })?;
      92            2 :     Ok(TenantTimelineId {
      93            2 :         tenant_id,
      94            2 :         timeline_id,
      95            2 :     })
      96            2 : }
      97              : 
// Boxed, thread-safe, `'static` error type. It is the common target that the
// error types of both `EitherBody` sides are converted into.
type AnyError = Box<dyn std::error::Error + Send + Sync + 'static>;
     101              : 
     102              : // Provides impl HttpBody for two different types implementing it. Inspired by
     103              : // https://github.com/hyperium/tonic/blob/master/examples/src/hyper_warp/server.rs
     104              : pub enum EitherBody<A, B> {
     105              :     Left(A),
     106              :     Right(B),
     107              : }
     108              : 
     109              : impl<A, B> HttpBody for EitherBody<A, B>
     110              : where
     111              :     A: HttpBody + Send + Unpin,
     112              :     B: HttpBody<Data = A::Data> + Send + Unpin,
     113              :     A::Error: Into<AnyError>,
     114              :     B::Error: Into<AnyError>,
     115              : {
     116              :     type Data = A::Data;
     117              :     type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
     118              : 
     119            0 :     fn is_end_stream(&self) -> bool {
     120            0 :         match self {
     121            0 :             EitherBody::Left(b) => b.is_end_stream(),
     122            0 :             EitherBody::Right(b) => b.is_end_stream(),
     123              :         }
     124            0 :     }
     125              : 
     126            0 :     fn poll_data(
     127            0 :         self: Pin<&mut Self>,
     128            0 :         cx: &mut Context<'_>,
     129            0 :     ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
     130            0 :         match self.get_mut() {
     131            0 :             EitherBody::Left(b) => Pin::new(b).poll_data(cx).map(map_option_err),
     132            0 :             EitherBody::Right(b) => Pin::new(b).poll_data(cx).map(map_option_err),
     133              :         }
     134            0 :     }
     135              : 
     136            0 :     fn poll_trailers(
     137            0 :         self: Pin<&mut Self>,
     138            0 :         cx: &mut Context<'_>,
     139            0 :     ) -> Poll<Result<Option<hyper::HeaderMap>, Self::Error>> {
     140            0 :         match self.get_mut() {
     141            0 :             EitherBody::Left(b) => Pin::new(b).poll_trailers(cx).map_err(Into::into),
     142            0 :             EitherBody::Right(b) => Pin::new(b).poll_trailers(cx).map_err(Into::into),
     143              :         }
     144            0 :     }
     145              : }
     146              : 
     147            0 : fn map_option_err<T, U: Into<AnyError>>(err: Option<Result<T, U>>) -> Option<Result<T, AnyError>> {
     148            0 :     err.map(|e| e.map_err(Into::into))
     149            0 : }
        

Generated by: LCOV version 2.1-beta