LCOV - differential code coverage report
Current view: top level - storage_broker/src/bin - storage_broker.rs (source / functions) Coverage Total Hit UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 91.5 % 377 345 32 345
Current Date: 2023-10-19 02:04:12 Functions: 69.0 % 71 49 22 49
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : //! Simple pub-sub based on grpc (tonic) and Tokio broadcast channel for storage
       2                 : //! nodes messaging.
       3                 : //!
       4                 : //! Subscriptions to 1) single timeline 2) all timelines are possible. We could
       5                 : //! add subscription to the set of timelines to save grpc streams, but testing
       6                 : //! shows many individual streams is also ok.
       7                 : //!
       8                 : //! Message is dropped if subscriber can't consume it, not affecting other
       9                 : //! subscribers.
      10                 : //!
      11                 : //! Only safekeeper message is supported, but it is not hard to add something
      12                 : //! else with generics.
      13                 : use clap::{command, Parser};
      14                 : use futures_core::Stream;
      15                 : use futures_util::StreamExt;
      16                 : use hyper::header::CONTENT_TYPE;
      17                 : use hyper::server::conn::AddrStream;
      18                 : use hyper::service::{make_service_fn, service_fn};
      19                 : use hyper::{Body, Method, StatusCode};
      20                 : use parking_lot::RwLock;
      21                 : use std::collections::HashMap;
      22                 : use std::convert::Infallible;
      23                 : use std::net::SocketAddr;
      24                 : use std::pin::Pin;
      25                 : use std::sync::Arc;
      26                 : use std::time::Duration;
      27                 : use tokio::sync::broadcast;
      28                 : use tokio::sync::broadcast::error::RecvError;
      29                 : use tokio::time;
      30                 : use tonic::codegen::Service;
      31                 : use tonic::transport::server::Connected;
      32                 : use tonic::Code;
      33                 : use tonic::{Request, Response, Status};
      34                 : use tracing::*;
      35                 : use utils::signals::ShutdownSignals;
      36                 : 
      37                 : use metrics::{Encoder, TextEncoder};
      38                 : use storage_broker::metrics::{NUM_PUBS, NUM_SUBS_ALL, NUM_SUBS_TIMELINE};
      39                 : use storage_broker::proto::broker_service_server::{BrokerService, BrokerServiceServer};
      40                 : use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey as ProtoSubscriptionKey;
      41                 : use storage_broker::proto::{SafekeeperTimelineInfo, SubscribeSafekeeperInfoRequest};
      42                 : use storage_broker::{
      43                 :     parse_proto_ttid, EitherBody, DEFAULT_KEEPALIVE_INTERVAL, DEFAULT_LISTEN_ADDR,
      44                 : };
      45                 : use utils::id::TenantTimelineId;
      46                 : use utils::logging::{self, LogFormat};
      47                 : use utils::project_git_version;
      48                 : use utils::sentry_init::init_sentry;
      49                 : 
      50                 : project_git_version!(GIT_VERSION);
      51                 : 
      52                 : const DEFAULT_CHAN_SIZE: usize = 32;
      53                 : const DEFAULT_ALL_KEYS_CHAN_SIZE: usize = 16384;
      54                 : 
      55 CBC         704 : #[derive(Parser, Debug)]
      56                 : #[command(version = GIT_VERSION, about = "Broker for neon storage nodes communication", long_about = None)]
      57                 : struct Args {
      58                 :     /// Endpoint to listen on.
      59                 :     #[arg(short, long, default_value = DEFAULT_LISTEN_ADDR)]
      60 UBC           0 :     listen_addr: SocketAddr,
      61                 :     /// Size of the queue to the per timeline subscriber.
      62 CBC         352 :     #[arg(long, default_value_t = DEFAULT_CHAN_SIZE)]
      63 UBC           0 :     timeline_chan_size: usize,
      64               0 :     /// Size of the queue to the all keys subscriber.
      65 CBC         352 :     #[arg(long, default_value_t = DEFAULT_ALL_KEYS_CHAN_SIZE)]
      66 UBC           0 :     all_keys_chan_size: usize,
      67               0 :     /// HTTP/2 keepalive interval.
      68                 :     #[arg(long, value_parser= humantime::parse_duration, default_value = DEFAULT_KEEPALIVE_INTERVAL)]
      69               0 :     http2_keepalive_interval: Duration,
      70                 :     /// Format for logging, either 'plain' or 'json'.
      71                 :     #[arg(long, default_value = "plain")]
      72               0 :     log_format: String,
      73                 : }
      74                 : 
      75                 : type PubId = u64; // id of publisher for registering in maps
      76                 : type SubId = u64; // id of subscriber for registering in maps
      77                 : 
      78 CBC        3222 : #[derive(Copy, Clone, Debug)]
      79                 : enum SubscriptionKey {
      80                 :     All,
      81                 :     Timeline(TenantTimelineId),
      82                 : }
      83                 : 
      84                 : impl SubscriptionKey {
      85                 :     // Parse protobuf subkey (protobuf doesn't have fixed size bytes, we get vectors).
      86            1611 :     pub fn from_proto_subscription_key(key: ProtoSubscriptionKey) -> Result<Self, Status> {
      87            1611 :         match key {
      88             500 :             ProtoSubscriptionKey::All(_) => Ok(SubscriptionKey::All),
      89            1111 :             ProtoSubscriptionKey::TenantTimelineId(proto_ttid) => {
      90            1111 :                 Ok(SubscriptionKey::Timeline(parse_proto_ttid(&proto_ttid)?))
      91                 :             }
      92                 :         }
      93            1611 :     }
      94                 : }
      95                 : 
      96                 : // Channel to timeline subscribers.
      97                 : struct ChanToTimelineSub {
      98                 :     chan: broadcast::Sender<SafekeeperTimelineInfo>,
      99                 :     // Tracked separately to know when delete the shmem entry. receiver_count()
     100                 :     // is unhandy for that as unregistering and dropping the receiver side
     101                 :     // happens at different moments.
     102                 :     num_subscribers: u64,
     103                 : }
     104                 : 
     105                 : struct SharedState {
     106                 :     next_pub_id: PubId,
     107                 :     num_pubs: i64,
     108                 :     next_sub_id: SubId,
     109                 :     num_subs_to_timelines: i64,
     110                 :     chans_to_timeline_subs: HashMap<TenantTimelineId, ChanToTimelineSub>,
     111                 :     num_subs_to_all: i64,
     112                 :     chan_to_all_subs: broadcast::Sender<SafekeeperTimelineInfo>,
     113                 : }
     114                 : 
     115                 : impl SharedState {
     116             353 :     pub fn new(all_keys_chan_size: usize) -> Self {
     117             353 :         SharedState {
     118             353 :             next_pub_id: 0,
     119             353 :             num_pubs: 0,
     120             353 :             next_sub_id: 0,
     121             353 :             num_subs_to_timelines: 0,
     122             353 :             chans_to_timeline_subs: HashMap::new(),
     123             353 :             num_subs_to_all: 0,
     124             353 :             chan_to_all_subs: broadcast::channel(all_keys_chan_size).0,
     125             353 :         }
     126             353 :     }
     127                 : 
     128                 :     // Register new publisher.
     129             501 :     pub fn register_publisher(&mut self) -> PubId {
     130             501 :         let pub_id = self.next_pub_id;
     131             501 :         self.next_pub_id += 1;
     132             501 :         self.num_pubs += 1;
     133             501 :         NUM_PUBS.set(self.num_pubs);
     134             501 :         pub_id
     135             501 :     }
     136                 : 
     137                 :     // Unregister publisher.
     138             501 :     pub fn unregister_publisher(&mut self) {
     139             501 :         self.num_pubs -= 1;
     140             501 :         NUM_PUBS.set(self.num_pubs);
     141             501 :     }
     142                 : 
     143                 :     // Register new subscriber.
     144            1613 :     pub fn register_subscriber(
     145            1613 :         &mut self,
     146            1613 :         sub_key: SubscriptionKey,
     147            1613 :         timeline_chan_size: usize,
     148            1613 :     ) -> (SubId, broadcast::Receiver<SafekeeperTimelineInfo>) {
     149            1613 :         let sub_id = self.next_sub_id;
     150            1613 :         self.next_sub_id += 1;
     151            1613 :         let sub_rx = match sub_key {
     152                 :             SubscriptionKey::All => {
     153             501 :                 self.num_subs_to_all += 1;
     154             501 :                 NUM_SUBS_ALL.set(self.num_subs_to_all);
     155             501 :                 self.chan_to_all_subs.subscribe()
     156                 :             }
     157            1112 :             SubscriptionKey::Timeline(ttid) => {
     158            1112 :                 self.num_subs_to_timelines += 1;
     159            1112 :                 NUM_SUBS_TIMELINE.set(self.num_subs_to_timelines);
     160            1112 :                 // Create new broadcast channel for this key, or subscriber to
     161            1112 :                 // the existing one.
     162            1112 :                 let chan_to_timeline_sub =
     163            1112 :                     self.chans_to_timeline_subs
     164            1112 :                         .entry(ttid)
     165            1112 :                         .or_insert(ChanToTimelineSub {
     166            1112 :                             chan: broadcast::channel(timeline_chan_size).0,
     167            1112 :                             num_subscribers: 0,
     168            1112 :                         });
     169            1112 :                 chan_to_timeline_sub.num_subscribers += 1;
     170            1112 :                 chan_to_timeline_sub.chan.subscribe()
     171                 :             }
     172                 :         };
     173            1613 :         (sub_id, sub_rx)
     174            1613 :     }
     175                 : 
     176                 :     // Unregister the subscriber.
     177            1613 :     pub fn unregister_subscriber(&mut self, sub_key: SubscriptionKey) {
     178            1613 :         match sub_key {
     179             501 :             SubscriptionKey::All => {
     180             501 :                 self.num_subs_to_all -= 1;
     181             501 :                 NUM_SUBS_ALL.set(self.num_subs_to_all);
     182             501 :             }
     183            1112 :             SubscriptionKey::Timeline(ttid) => {
     184            1112 :                 self.num_subs_to_timelines -= 1;
     185            1112 :                 NUM_SUBS_TIMELINE.set(self.num_subs_to_timelines);
     186            1112 : 
     187            1112 :                 // Remove from the map, destroying the channel, if we are the
     188            1112 :                 // last subscriber to this timeline.
     189            1112 : 
     190            1112 :                 // Missing entry is a bug; we must have registered.
     191            1112 :                 let chan_to_timeline_sub = self
     192            1112 :                     .chans_to_timeline_subs
     193            1112 :                     .get_mut(&ttid)
     194            1112 :                     .expect("failed to find sub entry in shmem during unregister");
     195            1112 :                 chan_to_timeline_sub.num_subscribers -= 1;
     196            1112 :                 if chan_to_timeline_sub.num_subscribers == 0 {
     197            1100 :                     self.chans_to_timeline_subs.remove(&ttid);
     198            1100 :                 }
     199                 :             }
     200                 :         }
     201            1613 :     }
     202                 : }
     203                 : 
     204                 : // SharedState wrapper.
     205            2466 : #[derive(Clone)]
     206                 : struct Registry {
     207                 :     shared_state: Arc<RwLock<SharedState>>,
     208                 :     timeline_chan_size: usize,
     209                 : }
     210                 : 
     211                 : impl Registry {
     212                 :     // Register new publisher in shared state.
     213             501 :     pub fn register_publisher(&self, remote_addr: SocketAddr) -> Publisher {
     214             501 :         let pub_id = self.shared_state.write().register_publisher();
     215             501 :         info!("publication started id={} addr={:?}", pub_id, remote_addr);
     216             501 :         Publisher {
     217             501 :             id: pub_id,
     218             501 :             registry: self.clone(),
     219             501 :             remote_addr,
     220             501 :         }
     221             501 :     }
     222                 : 
     223             501 :     pub fn unregister_publisher(&self, publisher: &Publisher) {
     224             501 :         self.shared_state.write().unregister_publisher();
     225             501 :         info!(
     226             500 :             "publication ended id={} addr={:?}",
     227             500 :             publisher.id, publisher.remote_addr
     228             500 :         );
     229             501 :     }
     230                 : 
     231                 :     // Register new subscriber in shared state.
     232            1613 :     pub fn register_subscriber(
     233            1613 :         &self,
     234            1613 :         sub_key: SubscriptionKey,
     235            1613 :         remote_addr: SocketAddr,
     236            1613 :     ) -> Subscriber {
     237            1613 :         let (sub_id, sub_rx) = self
     238            1613 :             .shared_state
     239            1613 :             .write()
     240            1613 :             .register_subscriber(sub_key, self.timeline_chan_size);
     241            1613 :         info!(
     242            1611 :             "subscription started id={}, key={:?}, addr={:?}",
     243            1611 :             sub_id, sub_key, remote_addr
     244            1611 :         );
     245            1613 :         Subscriber {
     246            1613 :             id: sub_id,
     247            1613 :             key: sub_key,
     248            1613 :             sub_rx,
     249            1613 :             registry: self.clone(),
     250            1613 :             remote_addr,
     251            1613 :         }
     252            1613 :     }
     253                 : 
     254                 :     // Unregister the subscriber
     255            1613 :     pub fn unregister_subscriber(&self, subscriber: &Subscriber) {
     256            1613 :         self.shared_state
     257            1613 :             .write()
     258            1613 :             .unregister_subscriber(subscriber.key);
     259            1613 :         info!(
     260            1611 :             "subscription ended id={}, key={:?}, addr={:?}",
     261            1611 :             subscriber.id, subscriber.key, subscriber.remote_addr
     262            1611 :         );
     263            1613 :     }
     264                 : }
     265                 : 
     266                 : // Private subscriber state.
     267                 : struct Subscriber {
     268                 :     id: SubId,
     269                 :     key: SubscriptionKey,
     270                 :     // Subscriber receives messages from publishers here.
     271                 :     sub_rx: broadcast::Receiver<SafekeeperTimelineInfo>,
     272                 :     // to unregister itself from shared state in Drop
     273                 :     registry: Registry,
     274                 :     // for logging
     275                 :     remote_addr: SocketAddr,
     276                 : }
     277                 : 
     278                 : impl Drop for Subscriber {
     279            1613 :     fn drop(&mut self) {
     280            1613 :         self.registry.unregister_subscriber(self);
     281            1613 :     }
     282                 : }
     283                 : 
     284                 : // Private publisher state
     285                 : struct Publisher {
     286                 :     id: PubId,
     287                 :     registry: Registry,
     288                 :     // for logging
     289                 :     remote_addr: SocketAddr,
     290                 : }
     291                 : 
     292                 : impl Publisher {
     293                 :     // Send msg to relevant subscribers.
     294            6678 :     pub fn send_msg(&mut self, msg: &SafekeeperTimelineInfo) -> Result<(), Status> {
     295            6678 :         // send message to subscribers for everything
     296            6678 :         let shared_state = self.registry.shared_state.read();
     297            6678 :         // Err means there is no subscribers, it is fine.
     298            6678 :         shared_state.chan_to_all_subs.send(msg.clone()).ok();
     299                 : 
     300                 :         // send message to per timeline subscribers
     301            6678 :         let ttid =
     302            6678 :             parse_proto_ttid(msg.tenant_timeline_id.as_ref().ok_or_else(|| {
     303 UBC           0 :                 Status::new(Code::InvalidArgument, "missing tenant_timeline_id")
     304 CBC        6678 :             })?)?;
     305            6678 :         if let Some(subs) = shared_state.chans_to_timeline_subs.get(&ttid) {
     306            5805 :             // Err can't happen here, as tx is destroyed only after removing
     307            5805 :             // from the map the last subscriber along with tx.
     308            5805 :             subs.chan
     309            5805 :                 .send(msg.clone())
     310            5805 :                 .expect("rx is still in the map with zero subscribers");
     311            5805 :         }
     312            6678 :         Ok(())
     313            6678 :     }
     314                 : }
     315                 : 
     316                 : impl Drop for Publisher {
     317             501 :     fn drop(&mut self) {
     318             501 :         self.registry.unregister_publisher(self);
     319             501 :     }
     320                 : }
     321                 : 
     322                 : struct Broker {
     323                 :     registry: Registry,
     324                 : }
     325                 : 
     326                 : #[tonic::async_trait]
     327                 : impl BrokerService for Broker {
     328             500 :     async fn publish_safekeeper_info(
     329             500 :         &self,
     330             500 :         request: Request<tonic::Streaming<SafekeeperTimelineInfo>>,
     331             500 :     ) -> Result<Response<()>, Status> {
     332             500 :         let remote_addr = request
     333             500 :             .remote_addr()
     334             500 :             .expect("TCPConnectInfo inserted by handler");
     335             500 :         let mut publisher = self.registry.register_publisher(remote_addr);
     336             500 : 
     337             500 :         let mut stream = request.into_inner();
     338                 : 
     339                 :         loop {
     340            7176 :             match stream.next().await {
     341            6676 :                 Some(Ok(msg)) => publisher.send_msg(&msg)?,
     342             500 :                 Some(Err(e)) => return Err(e), // grpc error from the stream
     343 UBC           0 :                 None => break,                 // closed stream
     344               0 :             }
     345               0 :         }
     346               0 : 
     347               0 :         Ok(Response::new(()))
     348 CBC        1000 :     }
     349                 : 
     350                 :     type SubscribeSafekeeperInfoStream =
     351                 :         Pin<Box<dyn Stream<Item = Result<SafekeeperTimelineInfo, Status>> + Send + 'static>>;
     352                 : 
     353            1611 :     async fn subscribe_safekeeper_info(
     354            1611 :         &self,
     355            1611 :         request: Request<SubscribeSafekeeperInfoRequest>,
     356            1611 :     ) -> Result<Response<Self::SubscribeSafekeeperInfoStream>, Status> {
     357            1611 :         let remote_addr = request
     358            1611 :             .remote_addr()
     359            1611 :             .expect("TCPConnectInfo inserted by handler");
     360            1611 :         let proto_key = request
     361            1611 :             .into_inner()
     362            1611 :             .subscription_key
     363            1611 :             .ok_or_else(|| Status::new(Code::InvalidArgument, "missing subscription key"))?;
     364            1611 :         let sub_key = SubscriptionKey::from_proto_subscription_key(proto_key)?;
     365            1611 :         let mut subscriber = self.registry.register_subscriber(sub_key, remote_addr);
     366            1611 : 
     367            1611 :         // transform rx into stream with item = Result, as method result demands
     368            1611 :         let output = async_stream::try_stream! {
     369            1611 :             let mut warn_interval = time::interval(Duration::from_millis(1000));
     370            1611 :             let mut missed_msgs: u64 = 0;
     371            1611 :             loop {
     372           28135 :                 match subscriber.sub_rx.recv().await {
     373           15593 :                     Ok(info) => yield info,
     374            1611 :                     Err(RecvError::Lagged(skipped_msg)) => {
     375 UBC           0 :                         missed_msgs += skipped_msg;
     376               0 :                         if (futures::poll!(Box::pin(warn_interval.tick()))).is_ready() {
     377 CBC        1611 :                             warn!("subscription id={}, key={:?} addr={:?} dropped {} messages, channel is full",
     378 UBC           0 :                                 subscriber.id, subscriber.key, subscriber.remote_addr, missed_msgs);
     379 CBC        1611 :                             missed_msgs = 0;
     380            1611 :                         }
     381            1611 :                     }
     382            1611 :                     Err(RecvError::Closed) => {
     383            1611 :                         // can't happen, we never drop the channel while there is a subscriber
     384            1611 :                         Err(Status::new(Code::Internal, "channel unexpectantly closed"))?;
     385            1611 :                     }
     386            1611 :                 }
     387            1611 :             }
     388            1611 :         };
     389            1611 : 
     390            1611 :         Ok(Response::new(
     391            1611 :             Box::pin(output) as Self::SubscribeSafekeeperInfoStream
     392            1611 :         ))
     393            3222 :     }
     394                 : }
     395                 : 
     396                 : // We serve only metrics and healthcheck through http1.
     397               5 : async fn http1_handler(
     398               5 :     req: hyper::Request<hyper::body::Body>,
     399               5 : ) -> Result<hyper::Response<Body>, Infallible> {
     400               5 :     let resp = match (req.method(), req.uri().path()) {
     401               5 :         (&Method::GET, "/metrics") => {
     402 UBC           0 :             let mut buffer = vec![];
     403               0 :             let metrics = metrics::gather();
     404               0 :             let encoder = TextEncoder::new();
     405               0 :             encoder.encode(&metrics, &mut buffer).unwrap();
     406               0 : 
     407               0 :             hyper::Response::builder()
     408               0 :                 .status(StatusCode::OK)
     409               0 :                 .header(CONTENT_TYPE, encoder.format_type())
     410               0 :                 .body(Body::from(buffer))
     411               0 :                 .unwrap()
     412                 :         }
     413 CBC           5 :         (&Method::GET, "/status") => hyper::Response::builder()
     414               5 :             .status(StatusCode::OK)
     415               5 :             .body(Body::empty())
     416               5 :             .unwrap(),
     417 UBC           0 :         _ => hyper::Response::builder()
     418               0 :             .status(StatusCode::NOT_FOUND)
     419               0 :             .body(Body::empty())
     420               0 :             .unwrap(),
     421                 :     };
     422 CBC           5 :     Ok(resp)
     423               5 : }
     424                 : 
     425                 : #[tokio::main]
     426             352 : async fn main() -> Result<(), Box<dyn std::error::Error>> {
     427             352 :     let args = Args::parse();
     428             352 : 
     429             352 :     // important to keep the order of:
     430             352 :     // 1. init logging
     431             352 :     // 2. tracing panic hook
     432             352 :     // 3. sentry
     433             352 :     logging::init(
     434             352 :         LogFormat::from_config(&args.log_format)?,
     435             352 :         logging::TracingErrorLayerEnablement::Disabled,
     436 UBC           0 :     )?;
     437 CBC         352 :     logging::replace_panic_hook_with_tracing_panic_hook().forget();
     438             352 :     // initialize sentry if SENTRY_DSN is provided
     439             352 :     let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
     440             352 :     info!("version: {GIT_VERSION}");
     441             352 :     ::metrics::set_build_info_metric(GIT_VERSION);
     442             352 : 
     443             352 :     // On any shutdown signal, log receival and exit.
     444             352 :     std::thread::spawn(move || {
     445             352 :         ShutdownSignals::handle(|signal| {
     446             350 :             info!("received {}, terminating", signal.name());
     447             350 :             std::process::exit(0);
     448             352 :         })
     449             352 :     });
     450             352 : 
     451             352 :     let registry = Registry {
     452             352 :         shared_state: Arc::new(RwLock::new(SharedState::new(args.all_keys_chan_size))),
     453             352 :         timeline_chan_size: args.timeline_chan_size,
     454             352 :     };
     455             352 :     let storage_broker_impl = Broker {
     456             352 :         registry: registry.clone(),
     457             352 :     };
     458             352 :     let storage_broker_server = BrokerServiceServer::new(storage_broker_impl);
     459             352 : 
     460             352 :     info!("listening on {}", &args.listen_addr);
     461                 : 
     462                 :     // grpc is served along with http1 for metrics on a single port, hence we
     463                 :     // don't use tonic's Server.
     464             352 :     hyper::Server::bind(&args.listen_addr)
     465             352 :         .http2_keep_alive_interval(Some(args.http2_keepalive_interval))
     466            1525 :         .serve(make_service_fn(move |conn: &AddrStream| {
     467            1525 :             let storage_broker_server_cloned = storage_broker_server.clone();
     468            1525 :             let connect_info = conn.connect_info();
     469            1525 :             async move {
     470            2119 :                 Ok::<_, Infallible>(service_fn(move |mut req| {
     471            2119 :                     // That's what tonic's MakeSvc.call does to pass conninfo to
     472            2119 :                     // the request handler (and where its request.remote_addr()
     473            2119 :                     // expects it to find).
     474            2119 :                     req.extensions_mut().insert(connect_info.clone());
     475            2119 : 
     476            2119 :                     // Technically this second clone is not needed, but consume
     477            2119 :                     // by async block is apparently unavoidable. BTW, error
     478            2119 :                     // message is enigmatic, see
     479            2119 :                     // https://github.com/rust-lang/rust/issues/68119
     480            2119 :                     //
     481            2119 :                     // We could get away without async block at all, but then we
     482            2119 :                     // need to resort to futures::Either to merge the result,
     483            2119 :                     // which doesn't caress an eye as well.
     484            2119 :                     let mut storage_broker_server_svc = storage_broker_server_cloned.clone();
     485            2119 :                     async move {
     486            2119 :                         if req.headers().get("content-type").map(|x| x.as_bytes())
     487            2119 :                             == Some(b"application/grpc")
     488                 :                         {
     489            5829 :                             let res_resp = storage_broker_server_svc.call(req).await;
     490                 :                             // Grpc and http1 handlers have slightly different
     491                 :                             // Response types: it is UnsyncBoxBody for the
     492                 :                             // former one (not sure why) and plain hyper::Body
     493                 :                             // for the latter. Both implement HttpBody though,
     494                 :                             // and EitherBody is used to merge them.
     495            2114 :                             res_resp.map(|resp| resp.map(EitherBody::Left))
     496                 :                         } else {
     497               5 :                             let res_resp = http1_handler(req).await;
     498               5 :                             res_resp.map(|resp| resp.map(EitherBody::Right))
     499                 :                         }
     500            2119 :                     }
     501            2119 :                 }))
     502            1525 :             }
     503            1525 :         }))
     504            1051 :         .await?;
     505 UBC           0 :     Ok(())
     506                 : }
     507                 : 
     508                 : #[cfg(test)]
     509                 : mod tests {
     510                 :     use super::*;
     511                 :     use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
     512                 :     use tokio::sync::broadcast::error::TryRecvError;
     513                 :     use utils::id::{TenantId, TimelineId};
     514                 : 
     515 CBC           2 :     fn msg(timeline_id: Vec<u8>) -> SafekeeperTimelineInfo {
     516               2 :         SafekeeperTimelineInfo {
     517               2 :             safekeeper_id: 1,
     518               2 :             tenant_timeline_id: Some(ProtoTenantTimelineId {
     519               2 :                 tenant_id: vec![0x00; 16],
     520               2 :                 timeline_id,
     521               2 :             }),
     522               2 :             term: 0,
     523               2 :             last_log_term: 0,
     524               2 :             flush_lsn: 1,
     525               2 :             commit_lsn: 2,
     526               2 :             backup_lsn: 3,
     527               2 :             remote_consistent_lsn: 4,
     528               2 :             peer_horizon_lsn: 5,
     529               2 :             safekeeper_connstr: "neon-1-sk-1.local:7676".to_owned(),
     530               2 :             http_connstr: "neon-1-sk-1.local:7677".to_owned(),
     531               2 :             local_start_lsn: 0,
     532               2 :             availability_zone: None,
     533               2 :         }
     534               2 :     }
     535                 : 
     536               3 :     fn tli_from_u64(i: u64) -> Vec<u8> {
     537               3 :         let mut timeline_id = vec![0xFF; 8];
     538               3 :         timeline_id.extend_from_slice(&i.to_be_bytes());
     539               3 :         timeline_id
     540               3 :     }
     541                 : 
     542               3 :     fn mock_addr() -> SocketAddr {
     543               3 :         "127.0.0.1:8080".parse().unwrap()
     544               3 :     }
     545                 : 
     546               1 :     #[tokio::test]
     547               1 :     async fn test_registry() {
     548               1 :         let registry = Registry {
     549               1 :             shared_state: Arc::new(RwLock::new(SharedState::new(16))),
     550               1 :             timeline_chan_size: 16,
     551               1 :         };
     552               1 : 
     553               1 :         // subscribe to timeline 2
     554               1 :         let ttid_2 = TenantTimelineId {
     555               1 :             tenant_id: TenantId::from_slice(&[0x00; 16]).unwrap(),
     556               1 :             timeline_id: TimelineId::from_slice(&tli_from_u64(2)).unwrap(),
     557               1 :         };
     558               1 :         let sub_key_2 = SubscriptionKey::Timeline(ttid_2);
     559               1 :         let mut subscriber_2 = registry.register_subscriber(sub_key_2, mock_addr());
     560               1 :         let mut subscriber_all = registry.register_subscriber(SubscriptionKey::All, mock_addr());
     561               1 : 
     562               1 :         // send two messages with different keys
     563               1 :         let msg_1 = msg(tli_from_u64(1));
     564               1 :         let msg_2 = msg(tli_from_u64(2));
     565               1 :         let mut publisher = registry.register_publisher(mock_addr());
     566               1 :         publisher.send_msg(&msg_1).expect("failed to send msg");
     567               1 :         publisher.send_msg(&msg_2).expect("failed to send msg");
     568               1 : 
     569               1 :         // msg with key 2 should arrive to subscriber_2
     570               1 :         assert_eq!(subscriber_2.sub_rx.try_recv().unwrap(), msg_2);
     571                 : 
     572                 :         // but nothing more
     573               1 :         assert_eq!(
     574               1 :             subscriber_2.sub_rx.try_recv().unwrap_err(),
     575               1 :             TryRecvError::Empty
     576               1 :         );
     577                 : 
     578                 :         // subscriber_all should receive both messages
     579               1 :         assert_eq!(subscriber_all.sub_rx.try_recv().unwrap(), msg_1);
     580               1 :         assert_eq!(subscriber_all.sub_rx.try_recv().unwrap(), msg_2);
     581               1 :         assert_eq!(
     582               1 :             subscriber_all.sub_rx.try_recv().unwrap_err(),
     583               1 :             TryRecvError::Empty
     584               1 :         );
     585                 :     }
     586                 : }
        

Generated by: LCOV version 2.1-beta