LCOV - differential code coverage report
Current view: top level - storage_broker/src - lib.rs (source / functions) Coverage Total Hit UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 64.6 % 65 42 23 42
Current Date: 2023-10-19 02:04:12 Functions: 31.6 % 19 6 13 6
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : use hyper::body::HttpBody;
       2                 : use std::pin::Pin;
       3                 : use std::task::{Context, Poll};
       4                 : use std::time::Duration;
       5                 : use tonic::codegen::StdError;
       6                 : use tonic::transport::{ClientTlsConfig, Endpoint};
       7                 : use tonic::{transport::Channel, Code, Status};
       8                 : use utils::id::{TenantId, TenantTimelineId, TimelineId};
       9                 : 
      10                 : use proto::{
      11                 :     broker_service_client::BrokerServiceClient, TenantTimelineId as ProtoTenantTimelineId,
      12                 : };
      13                 : 
      14                 : // Code generated by protobuf.
      15                 : pub mod proto {
      16                 :     // Tonic does derives as `#[derive(Clone, PartialEq, ::prost::Message)]`
      17                 :     // we don't use these types for anything but broker data transmission,
      18                 :     // so it's ok to ignore this one.
      19                 :     #![allow(clippy::derive_partial_eq_without_eq)]
      20                 :     tonic::include_proto!("storage_broker");
      21                 : }
      22                 : 
      23                 : pub mod metrics;
      24                 : 
      25                 : // Re-exports to avoid direct tonic dependency in user crates.
      26                 : pub use tonic::Request;
      27                 : pub use tonic::Streaming;
      28                 : 
      29                 : pub use hyper::Uri;
      30                 : 
      31                 : pub const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:50051";
      32                 : pub const DEFAULT_ENDPOINT: &str = const_format::formatcp!("http://{DEFAULT_LISTEN_ADDR}");
      33                 : 
      34                 : pub const DEFAULT_KEEPALIVE_INTERVAL: &str = "5000 ms";
      35                 : pub const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_millis(5000);
      36                 : 
      37                 : // BrokerServiceClient charged with tonic provided Channel transport; helps to
      38                 : // avoid depending on tonic directly in user crates.
      39                 : pub type BrokerClientChannel = BrokerServiceClient<Channel>;
      40                 : 
      41                 : // Create connection object configured to run TLS if schema starts with https://
      42                 : // and plain text otherwise. Connection is lazy, only endpoint sanity is
      43                 : // validated here.
      44                 : //
      45                 : // NB: this function is not async, but still must be run on a tokio runtime thread
      46                 : // because that's a requirement of tonic_endpoint.connect_lazy()'s Channel::new call.
      47 CBC        1560 : pub fn connect<U>(endpoint: U, keepalive_interval: Duration) -> anyhow::Result<BrokerClientChannel>
      48            1560 : where
      49            1560 :     U: std::convert::TryInto<Uri>,
      50            1560 :     U::Error: std::error::Error + Send + Sync + 'static,
      51            1560 : {
      52            1560 :     let uri: Uri = endpoint.try_into()?;
      53            1560 :     let mut tonic_endpoint: Endpoint = uri.into();
      54                 :     // If schema starts with https, start encrypted connection; do plain text
      55                 :     // otherwise.
      56            1560 :     if let Some("https") = tonic_endpoint.uri().scheme_str() {
      57 UBC           0 :         let tls = ClientTlsConfig::new();
      58               0 :         tonic_endpoint = tonic_endpoint.tls_config(tls)?;
      59 CBC        1560 :     }
      60            1560 :     tonic_endpoint = tonic_endpoint
      61            1560 :         .http2_keep_alive_interval(keepalive_interval)
      62            1560 :         .keep_alive_while_idle(true)
      63            1560 :         .connect_timeout(DEFAULT_CONNECT_TIMEOUT);
      64            1560 :     //  keep_alive_timeout is 20s by default on both client and server side
      65            1560 :     let channel = tonic_endpoint.connect_lazy();
      66            1560 :     Ok(BrokerClientChannel::new(channel))
      67            1560 : }
      68                 : 
      69                 : impl BrokerClientChannel {
      70                 :     /// Create a new client to the given endpoint, but don't actually connect until the first request.
      71 UBC           0 :     pub async fn connect_lazy<D>(dst: D) -> Result<Self, tonic::transport::Error>
      72               0 :     where
      73               0 :         D: std::convert::TryInto<tonic::transport::Endpoint>,
      74               0 :         D::Error: Into<StdError>,
      75               0 :     {
      76               0 :         let conn = tonic::transport::Endpoint::new(dst)?.connect_lazy();
      77               0 :         Ok(Self::new(conn))
      78               0 :     }
      79                 : }
      80                 : 
      81                 : // parse variable length bytes from protobuf
      82 CBC       17529 : pub fn parse_proto_ttid(proto_ttid: &ProtoTenantTimelineId) -> Result<TenantTimelineId, Status> {
      83           17529 :     let tenant_id = TenantId::from_slice(&proto_ttid.tenant_id)
      84           17529 :         .map_err(|e| Status::new(Code::InvalidArgument, format!("malformed tenant_id: {}", e)))?;
      85           17529 :     let timeline_id = TimelineId::from_slice(&proto_ttid.timeline_id).map_err(|e| {
      86 UBC           0 :         Status::new(
      87               0 :             Code::InvalidArgument,
      88               0 :             format!("malformed timeline_id: {}", e),
      89               0 :         )
      90 CBC       17529 :     })?;
      91           17529 :     Ok(TenantTimelineId {
      92           17529 :         tenant_id,
      93           17529 :         timeline_id,
      94           17529 :     })
      95           17529 : }
      96                 : 
      97                 : // These several usages don't justify anyhow dependency, though it would work as
      98                 : // well.
      99                 : type AnyError = Box<dyn std::error::Error + Send + Sync + 'static>;
     100                 : 
     101                 : // Provides impl HttpBody for two different types implementing it. Inspired by
     102                 : // https://github.com/hyperium/tonic/blob/master/examples/src/hyper_warp/server.rs
     103                 : pub enum EitherBody<A, B> {
     104                 :     Left(A),
     105                 :     Right(B),
     106                 : }
     107                 : 
     108                 : impl<A, B> HttpBody for EitherBody<A, B>
     109                 : where
     110                 :     A: HttpBody + Send + Unpin,
     111                 :     B: HttpBody<Data = A::Data> + Send + Unpin,
     112                 :     A::Error: Into<AnyError>,
     113                 :     B::Error: Into<AnyError>,
     114                 : {
     115                 :     type Data = A::Data;
     116                 :     type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
     117                 : 
     118           17712 :     fn is_end_stream(&self) -> bool {
     119           17712 :         match self {
     120           17707 :             EitherBody::Left(b) => b.is_end_stream(),
     121               5 :             EitherBody::Right(b) => b.is_end_stream(),
     122                 :         }
     123           17712 :     }
     124                 : 
     125           45339 :     fn poll_data(
     126           45339 :         self: Pin<&mut Self>,
     127           45339 :         cx: &mut Context<'_>,
     128           45339 :     ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
     129           45339 :         match self.get_mut() {
     130           45339 :             EitherBody::Left(b) => Pin::new(b).poll_data(cx).map(map_option_err),
     131 UBC           0 :             EitherBody::Right(b) => Pin::new(b).poll_data(cx).map(map_option_err),
     132                 :         }
     133 CBC       45339 :     }
     134                 : 
     135 UBC           0 :     fn poll_trailers(
     136               0 :         self: Pin<&mut Self>,
     137               0 :         cx: &mut Context<'_>,
     138               0 :     ) -> Poll<Result<Option<hyper::HeaderMap>, Self::Error>> {
     139               0 :         match self.get_mut() {
     140               0 :             EitherBody::Left(b) => Pin::new(b).poll_trailers(cx).map_err(Into::into),
     141               0 :             EitherBody::Right(b) => Pin::new(b).poll_trailers(cx).map_err(Into::into),
     142                 :         }
     143               0 :     }
     144                 : }
     145                 : 
     146 CBC       15593 : fn map_option_err<T, U: Into<AnyError>>(err: Option<Result<T, U>>) -> Option<Result<T, AnyError>> {
     147           15593 :     err.map(|e| e.map_err(Into::into))
     148           15593 : }
        

Generated by: LCOV version 2.1-beta