Line data Source code
1 : use hyper::body::HttpBody;
2 : use std::pin::Pin;
3 : use std::task::{Context, Poll};
4 : use std::time::Duration;
5 : use tonic::codegen::StdError;
6 : use tonic::transport::{ClientTlsConfig, Endpoint};
7 : use tonic::{transport::Channel, Status};
8 : use utils::id::{TenantId, TenantTimelineId, TimelineId};
9 :
10 : use proto::{
11 : broker_service_client::BrokerServiceClient, TenantTimelineId as ProtoTenantTimelineId,
12 : };
13 :
14 : // Code generated by protobuf.
15 : pub mod proto {
16 : // Tonic does derives as `#[derive(Clone, PartialEq, ::prost::Message)]`
17 : // we don't use these types for anything but broker data transmission,
18 : // so it's ok to ignore this one.
19 : #![allow(clippy::derive_partial_eq_without_eq)]
20 : tonic::include_proto!("storage_broker");
21 : }
22 :
23 : pub mod metrics;
24 :
25 : // Re-exports to avoid direct tonic dependency in user crates.
26 : pub use tonic::Code;
27 : pub use tonic::Request;
28 : pub use tonic::Streaming;
29 :
30 : pub use hyper::Uri;
31 :
32 : pub const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:50051";
33 : pub const DEFAULT_ENDPOINT: &str = const_format::formatcp!("http://{DEFAULT_LISTEN_ADDR}");
34 :
35 : pub const DEFAULT_KEEPALIVE_INTERVAL: &str = "5000 ms";
36 : pub const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_millis(5000);
37 :
38 : // BrokerServiceClient charged with tonic provided Channel transport; helps to
39 : // avoid depending on tonic directly in user crates.
40 : pub type BrokerClientChannel = BrokerServiceClient<Channel>;
41 :
42 : // Create connection object configured to run TLS if schema starts with https://
43 : // and plain text otherwise. Connection is lazy, only endpoint sanity is
44 : // validated here.
45 : //
46 : // NB: this function is not async, but still must be run on a tokio runtime thread
47 : // because that's a requirement of tonic_endpoint.connect_lazy()'s Channel::new call.
48 0 : pub fn connect<U>(endpoint: U, keepalive_interval: Duration) -> anyhow::Result<BrokerClientChannel>
49 0 : where
50 0 : U: std::convert::TryInto<Uri>,
51 0 : U::Error: std::error::Error + Send + Sync + 'static,
52 0 : {
53 0 : let uri: Uri = endpoint.try_into()?;
54 0 : let mut tonic_endpoint: Endpoint = uri.into();
55 : // If schema starts with https, start encrypted connection; do plain text
56 : // otherwise.
57 0 : if let Some("https") = tonic_endpoint.uri().scheme_str() {
58 0 : let tls = ClientTlsConfig::new();
59 0 : tonic_endpoint = tonic_endpoint.tls_config(tls)?;
60 0 : }
61 0 : tonic_endpoint = tonic_endpoint
62 0 : .http2_keep_alive_interval(keepalive_interval)
63 0 : .keep_alive_while_idle(true)
64 0 : .connect_timeout(DEFAULT_CONNECT_TIMEOUT);
65 0 : // keep_alive_timeout is 20s by default on both client and server side
66 0 : let channel = tonic_endpoint.connect_lazy();
67 0 : Ok(BrokerClientChannel::new(channel))
68 0 : }
69 :
70 : impl BrokerClientChannel {
71 : /// Create a new client to the given endpoint, but don't actually connect until the first request.
72 0 : pub async fn connect_lazy<D>(dst: D) -> Result<Self, tonic::transport::Error>
73 0 : where
74 0 : D: std::convert::TryInto<tonic::transport::Endpoint>,
75 0 : D::Error: Into<StdError>,
76 0 : {
77 0 : let conn = tonic::transport::Endpoint::new(dst)?.connect_lazy();
78 0 : Ok(Self::new(conn))
79 0 : }
80 : }
81 :
82 : // parse variable length bytes from protobuf
83 2 : pub fn parse_proto_ttid(proto_ttid: &ProtoTenantTimelineId) -> Result<TenantTimelineId, Status> {
84 2 : let tenant_id = TenantId::from_slice(&proto_ttid.tenant_id)
85 2 : .map_err(|e| Status::new(Code::InvalidArgument, format!("malformed tenant_id: {}", e)))?;
86 2 : let timeline_id = TimelineId::from_slice(&proto_ttid.timeline_id).map_err(|e| {
87 0 : Status::new(
88 0 : Code::InvalidArgument,
89 0 : format!("malformed timeline_id: {}", e),
90 0 : )
91 2 : })?;
92 2 : Ok(TenantTimelineId {
93 2 : tenant_id,
94 2 : timeline_id,
95 2 : })
96 2 : }
97 :
98 : // These several usages don't justify anyhow dependency, though it would work as
99 : // well.
100 : type AnyError = Box<dyn std::error::Error + Send + Sync + 'static>;
101 :
102 : // Provides impl HttpBody for two different types implementing it. Inspired by
103 : // https://github.com/hyperium/tonic/blob/master/examples/src/hyper_warp/server.rs
104 : pub enum EitherBody<A, B> {
105 : Left(A),
106 : Right(B),
107 : }
108 :
109 : impl<A, B> HttpBody for EitherBody<A, B>
110 : where
111 : A: HttpBody + Send + Unpin,
112 : B: HttpBody<Data = A::Data> + Send + Unpin,
113 : A::Error: Into<AnyError>,
114 : B::Error: Into<AnyError>,
115 : {
116 : type Data = A::Data;
117 : type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
118 :
119 0 : fn is_end_stream(&self) -> bool {
120 0 : match self {
121 0 : EitherBody::Left(b) => b.is_end_stream(),
122 0 : EitherBody::Right(b) => b.is_end_stream(),
123 : }
124 0 : }
125 :
126 0 : fn poll_data(
127 0 : self: Pin<&mut Self>,
128 0 : cx: &mut Context<'_>,
129 0 : ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
130 0 : match self.get_mut() {
131 0 : EitherBody::Left(b) => Pin::new(b).poll_data(cx).map(map_option_err),
132 0 : EitherBody::Right(b) => Pin::new(b).poll_data(cx).map(map_option_err),
133 : }
134 0 : }
135 :
136 0 : fn poll_trailers(
137 0 : self: Pin<&mut Self>,
138 0 : cx: &mut Context<'_>,
139 0 : ) -> Poll<Result<Option<hyper::HeaderMap>, Self::Error>> {
140 0 : match self.get_mut() {
141 0 : EitherBody::Left(b) => Pin::new(b).poll_trailers(cx).map_err(Into::into),
142 0 : EitherBody::Right(b) => Pin::new(b).poll_trailers(cx).map_err(Into::into),
143 : }
144 0 : }
145 : }
146 :
147 0 : fn map_option_err<T, U: Into<AnyError>>(err: Option<Result<T, U>>) -> Option<Result<T, AnyError>> {
148 0 : err.map(|e| e.map_err(Into::into))
149 0 : }
|