TLA Line data Source code
1 : use hyper::body::HttpBody;
2 : use std::pin::Pin;
3 : use std::task::{Context, Poll};
4 : use std::time::Duration;
5 : use tonic::codegen::StdError;
6 : use tonic::transport::{ClientTlsConfig, Endpoint};
7 : use tonic::{transport::Channel, Code, Status};
8 : use utils::id::{TenantId, TenantTimelineId, TimelineId};
9 :
10 : use proto::{
11 : broker_service_client::BrokerServiceClient, TenantTimelineId as ProtoTenantTimelineId,
12 : };
13 :
14 : // Code generated by protobuf.
15 : pub mod proto {
16 : // Tonic does derives as `#[derive(Clone, PartialEq, ::prost::Message)]`
17 : // we don't use these types for anything but broker data transmission,
18 : // so it's ok to ignore this one.
19 : #![allow(clippy::derive_partial_eq_without_eq)]
20 : tonic::include_proto!("storage_broker");
21 : }
22 :
23 : pub mod metrics;
24 :
25 : // Re-exports to avoid direct tonic dependency in user crates.
26 : pub use tonic::Request;
27 : pub use tonic::Streaming;
28 :
29 : pub use hyper::Uri;
30 :
31 : pub const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:50051";
32 : pub const DEFAULT_ENDPOINT: &str = const_format::formatcp!("http://{DEFAULT_LISTEN_ADDR}");
33 :
34 : pub const DEFAULT_KEEPALIVE_INTERVAL: &str = "5000 ms";
35 : pub const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_millis(5000);
36 :
37 : // BrokerServiceClient charged with tonic provided Channel transport; helps to
38 : // avoid depending on tonic directly in user crates.
39 : pub type BrokerClientChannel = BrokerServiceClient<Channel>;
40 :
41 : // Create connection object configured to run TLS if schema starts with https://
42 : // and plain text otherwise. Connection is lazy, only endpoint sanity is
43 : // validated here.
44 : //
45 : // NB: this function is not async, but still must be run on a tokio runtime thread
46 : // because that's a requirement of tonic_endpoint.connect_lazy()'s Channel::new call.
47 CBC 1560 : pub fn connect<U>(endpoint: U, keepalive_interval: Duration) -> anyhow::Result<BrokerClientChannel>
48 1560 : where
49 1560 : U: std::convert::TryInto<Uri>,
50 1560 : U::Error: std::error::Error + Send + Sync + 'static,
51 1560 : {
52 1560 : let uri: Uri = endpoint.try_into()?;
53 1560 : let mut tonic_endpoint: Endpoint = uri.into();
54 : // If schema starts with https, start encrypted connection; do plain text
55 : // otherwise.
56 1560 : if let Some("https") = tonic_endpoint.uri().scheme_str() {
57 UBC 0 : let tls = ClientTlsConfig::new();
58 0 : tonic_endpoint = tonic_endpoint.tls_config(tls)?;
59 CBC 1560 : }
60 1560 : tonic_endpoint = tonic_endpoint
61 1560 : .http2_keep_alive_interval(keepalive_interval)
62 1560 : .keep_alive_while_idle(true)
63 1560 : .connect_timeout(DEFAULT_CONNECT_TIMEOUT);
64 1560 : // keep_alive_timeout is 20s by default on both client and server side
65 1560 : let channel = tonic_endpoint.connect_lazy();
66 1560 : Ok(BrokerClientChannel::new(channel))
67 1560 : }
68 :
69 : impl BrokerClientChannel {
70 : /// Create a new client to the given endpoint, but don't actually connect until the first request.
71 UBC 0 : pub async fn connect_lazy<D>(dst: D) -> Result<Self, tonic::transport::Error>
72 0 : where
73 0 : D: std::convert::TryInto<tonic::transport::Endpoint>,
74 0 : D::Error: Into<StdError>,
75 0 : {
76 0 : let conn = tonic::transport::Endpoint::new(dst)?.connect_lazy();
77 0 : Ok(Self::new(conn))
78 0 : }
79 : }
80 :
81 : // parse variable length bytes from protobuf
82 CBC 17529 : pub fn parse_proto_ttid(proto_ttid: &ProtoTenantTimelineId) -> Result<TenantTimelineId, Status> {
83 17529 : let tenant_id = TenantId::from_slice(&proto_ttid.tenant_id)
84 17529 : .map_err(|e| Status::new(Code::InvalidArgument, format!("malformed tenant_id: {}", e)))?;
85 17529 : let timeline_id = TimelineId::from_slice(&proto_ttid.timeline_id).map_err(|e| {
86 UBC 0 : Status::new(
87 0 : Code::InvalidArgument,
88 0 : format!("malformed timeline_id: {}", e),
89 0 : )
90 CBC 17529 : })?;
91 17529 : Ok(TenantTimelineId {
92 17529 : tenant_id,
93 17529 : timeline_id,
94 17529 : })
95 17529 : }
96 :
97 : // These several usages don't justify anyhow dependency, though it would work as
98 : // well.
99 : type AnyError = Box<dyn std::error::Error + Send + Sync + 'static>;
100 :
101 : // Provides impl HttpBody for two different types implementing it. Inspired by
102 : // https://github.com/hyperium/tonic/blob/master/examples/src/hyper_warp/server.rs
103 : pub enum EitherBody<A, B> {
104 : Left(A),
105 : Right(B),
106 : }
107 :
108 : impl<A, B> HttpBody for EitherBody<A, B>
109 : where
110 : A: HttpBody + Send + Unpin,
111 : B: HttpBody<Data = A::Data> + Send + Unpin,
112 : A::Error: Into<AnyError>,
113 : B::Error: Into<AnyError>,
114 : {
115 : type Data = A::Data;
116 : type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
117 :
118 17712 : fn is_end_stream(&self) -> bool {
119 17712 : match self {
120 17707 : EitherBody::Left(b) => b.is_end_stream(),
121 5 : EitherBody::Right(b) => b.is_end_stream(),
122 : }
123 17712 : }
124 :
125 45339 : fn poll_data(
126 45339 : self: Pin<&mut Self>,
127 45339 : cx: &mut Context<'_>,
128 45339 : ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
129 45339 : match self.get_mut() {
130 45339 : EitherBody::Left(b) => Pin::new(b).poll_data(cx).map(map_option_err),
131 UBC 0 : EitherBody::Right(b) => Pin::new(b).poll_data(cx).map(map_option_err),
132 : }
133 CBC 45339 : }
134 :
135 UBC 0 : fn poll_trailers(
136 0 : self: Pin<&mut Self>,
137 0 : cx: &mut Context<'_>,
138 0 : ) -> Poll<Result<Option<hyper::HeaderMap>, Self::Error>> {
139 0 : match self.get_mut() {
140 0 : EitherBody::Left(b) => Pin::new(b).poll_trailers(cx).map_err(Into::into),
141 0 : EitherBody::Right(b) => Pin::new(b).poll_trailers(cx).map_err(Into::into),
142 : }
143 0 : }
144 : }
145 :
146 CBC 15593 : fn map_option_err<T, U: Into<AnyError>>(err: Option<Result<T, U>>) -> Option<Result<T, AnyError>> {
147 15593 : err.map(|e| e.map_err(Into::into))
148 15593 : }
|