Line data Source code
1 : use std::time::Duration;
2 :
3 : use proto::TenantTimelineId as ProtoTenantTimelineId;
4 : use proto::broker_service_client::BrokerServiceClient;
5 : use tonic::Status;
6 : use tonic::codegen::StdError;
7 : use tonic::transport::{Channel, Endpoint};
8 : use utils::id::{TenantId, TenantTimelineId, TimelineId};
9 :
10 : // Code generated by protobuf.
11 : pub mod proto {
12 : // Tonic does derives as `#[derive(Clone, PartialEq, ::prost::Message)]`
13 : // we don't use these types for anything but broker data transmission,
14 : // so it's ok to ignore this one.
15 : #![allow(clippy::derive_partial_eq_without_eq)]
16 : tonic::include_proto!("storage_broker");
17 : }
18 :
19 : pub mod metrics;
20 :
21 : // Re-exports to avoid direct tonic dependency in user crates.
22 : pub use hyper::Uri;
23 : pub use tonic::transport::{Certificate, ClientTlsConfig};
24 : pub use tonic::{Code, Request, Streaming};
25 :
26 : pub const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:50051";
27 : pub const DEFAULT_ENDPOINT: &str = const_format::formatcp!("http://{DEFAULT_LISTEN_ADDR}");
28 :
/// Default keepalive interval, kept as a human-readable string.
// NOTE(review): presumably parsed into a `Duration` by callers (not visible in
// this file) — confirm the expected format before changing the literal.
pub const DEFAULT_KEEPALIVE_INTERVAL: &str = "5000 ms";
/// Connect timeout (5 seconds) that `connect` applies to every endpoint it builds.
pub const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
31 :
32 : // `BrokerServiceClient` specialized to the tonic-provided `Channel` transport;
33 : // this alias helps user crates avoid depending on tonic directly.
34 : pub type BrokerClientChannel = BrokerServiceClient<Channel>;
35 :
36 : // Create connection object configured to run TLS if schema starts with https://
37 : // and plain text otherwise. Connection is lazy, only endpoint sanity is
38 : // validated here.
39 : //
40 : // NB: this function is not async, but still must be run on a tokio runtime thread
41 : // because that's a requirement of tonic_endpoint.connect_lazy()'s Channel::new call.
42 0 : pub fn connect<U>(
43 0 : endpoint: U,
44 0 : keepalive_interval: Duration,
45 0 : tls_config: ClientTlsConfig,
46 0 : ) -> anyhow::Result<BrokerClientChannel>
47 0 : where
48 0 : U: std::convert::TryInto<Uri>,
49 0 : U::Error: std::error::Error + Send + Sync + 'static,
50 0 : {
51 0 : let uri: Uri = endpoint.try_into()?;
52 0 : let mut tonic_endpoint: Endpoint = uri.into();
53 : // If schema starts with https, start encrypted connection; do plain text
54 : // otherwise.
55 0 : if let Some("https") = tonic_endpoint.uri().scheme_str() {
56 : // if there's no default provider and both ring+aws-lc-rs are enabled
57 : // this the tls settings on tonic will not work.
58 : // erroring is ok.
59 0 : rustls::crypto::ring::default_provider()
60 0 : .install_default()
61 0 : .ok();
62 0 : tonic_endpoint = tonic_endpoint.tls_config(tls_config)?;
63 0 : }
64 0 : tonic_endpoint = tonic_endpoint
65 0 : .http2_keep_alive_interval(keepalive_interval)
66 0 : .keep_alive_while_idle(true)
67 0 : .connect_timeout(DEFAULT_CONNECT_TIMEOUT);
68 0 : // keep_alive_timeout is 20s by default on both client and server side
69 0 : let channel = tonic_endpoint.connect_lazy();
70 0 : Ok(BrokerClientChannel::new(channel))
71 0 : }
72 :
73 : impl BrokerClientChannel {
74 : /// Create a new client to the given endpoint, but don't actually connect until the first request.
75 0 : pub async fn connect_lazy<D>(dst: D) -> Result<Self, tonic::transport::Error>
76 0 : where
77 0 : D: std::convert::TryInto<tonic::transport::Endpoint>,
78 0 : D::Error: Into<StdError>,
79 0 : {
80 0 : let conn = tonic::transport::Endpoint::new(dst)?.connect_lazy();
81 0 : Ok(Self::new(conn))
82 0 : }
83 : }
84 :
85 : // parse variable length bytes from protobuf
86 : #[allow(clippy::result_large_err, reason = "TODO")]
87 2 : pub fn parse_proto_ttid(proto_ttid: &ProtoTenantTimelineId) -> Result<TenantTimelineId, Status> {
88 2 : let tenant_id = TenantId::from_slice(&proto_ttid.tenant_id)
89 2 : .map_err(|e| Status::new(Code::InvalidArgument, format!("malformed tenant_id: {}", e)))?;
90 2 : let timeline_id = TimelineId::from_slice(&proto_ttid.timeline_id).map_err(|e| {
91 0 : Status::new(
92 0 : Code::InvalidArgument,
93 0 : format!("malformed timeline_id: {}", e),
94 0 : )
95 2 : })?;
96 2 : Ok(TenantTimelineId {
97 2 : tenant_id,
98 2 : timeline_id,
99 2 : })
100 2 : }
|