LCOV - code coverage report
Current view: top level - storage_broker/src/bin - storage_broker.rs (source / functions) Coverage Total Hit
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info Lines: 69.3 % 505 350
Test Date: 2024-02-07 07:37:29 Functions: 58.2 % 91 53

            Line data    Source code
       1              : //! Simple pub-sub based on grpc (tonic) and Tokio broadcast channel for storage
       2              : //! nodes messaging.
       3              : //!
       4              : //! Subscriptions to 1) a single timeline or 2) all timelines are possible. We
       5              : //! could add subscription to a set of timelines to save grpc streams, but
       6              : //! testing shows that many individual streams are also ok.
       7              : //!
       8              : //! Message is dropped if subscriber can't consume it, not affecting other
       9              : //! subscribers.
      10              : //!
      11              : //! Only safekeeper messages are supported, but it is not hard to add something
      12              : //! else with generics.
      13              : use clap::{command, Parser};
      14              : use futures_core::Stream;
      15              : use futures_util::StreamExt;
      16              : use hyper::header::CONTENT_TYPE;
      17              : use hyper::server::conn::AddrStream;
      18              : use hyper::service::{make_service_fn, service_fn};
      19              : use hyper::{Body, Method, StatusCode};
      20              : use parking_lot::RwLock;
      21              : use std::collections::HashMap;
      22              : use std::convert::Infallible;
      23              : use std::net::SocketAddr;
      24              : use std::pin::Pin;
      25              : use std::sync::Arc;
      26              : use std::time::Duration;
      27              : use tokio::sync::broadcast;
      28              : use tokio::sync::broadcast::error::RecvError;
      29              : use tokio::time;
      30              : use tonic::codegen::Service;
      31              : use tonic::transport::server::Connected;
      32              : use tonic::Code;
      33              : use tonic::{Request, Response, Status};
      34              : use tracing::*;
      35              : use utils::signals::ShutdownSignals;
      36              : 
      37              : use metrics::{Encoder, TextEncoder};
      38              : use storage_broker::metrics::{
      39              :     BROADCASTED_MESSAGES_TOTAL, BROADCAST_DROPPED_MESSAGES_TOTAL, NUM_PUBS, NUM_SUBS_ALL,
      40              :     NUM_SUBS_TIMELINE, PROCESSED_MESSAGES_TOTAL, PUBLISHED_ONEOFF_MESSAGES_TOTAL,
      41              : };
      42              : use storage_broker::proto::broker_service_server::{BrokerService, BrokerServiceServer};
      43              : use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey as ProtoSubscriptionKey;
      44              : use storage_broker::proto::{
      45              :     FilterTenantTimelineId, MessageType, SafekeeperDiscoveryRequest, SafekeeperDiscoveryResponse,
      46              :     SafekeeperTimelineInfo, SubscribeByFilterRequest, SubscribeSafekeeperInfoRequest, TypedMessage,
      47              : };
      48              : use storage_broker::{
      49              :     parse_proto_ttid, EitherBody, DEFAULT_KEEPALIVE_INTERVAL, DEFAULT_LISTEN_ADDR,
      50              : };
      51              : use utils::id::TenantTimelineId;
      52              : use utils::logging::{self, LogFormat};
      53              : use utils::sentry_init::init_sentry;
      54              : use utils::{project_build_tag, project_git_version};
      55              : 
      56              : project_git_version!(GIT_VERSION);
      57              : project_build_tag!(BUILD_TAG);
      58              : 
      59              : const DEFAULT_CHAN_SIZE: usize = 32;
      60              : const DEFAULT_ALL_KEYS_CHAN_SIZE: usize = 16384;
      61              : 
      62            3 : #[derive(Parser, Debug)]
      63              : #[command(version = GIT_VERSION, about = "Broker for neon storage nodes communication", long_about = None)]
      64              : struct Args {
      65              :     /// Endpoint to listen on.
      66              :     #[arg(short, long, default_value = DEFAULT_LISTEN_ADDR)]
      67            0 :     listen_addr: SocketAddr,
      68              :     /// Size of the queue to the per timeline subscriber.
      69            3 :     #[arg(long, default_value_t = DEFAULT_CHAN_SIZE)]
      70            0 :     timeline_chan_size: usize,
      71              :     /// Size of the queue to the all keys subscriber.
      72            3 :     #[arg(long, default_value_t = DEFAULT_ALL_KEYS_CHAN_SIZE)]
      73            0 :     all_keys_chan_size: usize,
      74              :     /// HTTP/2 keepalive interval.
      75              :     #[arg(long, value_parser= humantime::parse_duration, default_value = DEFAULT_KEEPALIVE_INTERVAL)]
      76            0 :     http2_keepalive_interval: Duration,
      77              :     /// Format for logging, either 'plain' or 'json'.
      78              :     #[arg(long, default_value = "plain")]
      79            0 :     log_format: String,
      80              : }
      81              : 
      82              : /// Id of publisher for registering in maps
      83              : type PubId = u64;
      84              : 
      85              : /// Id of subscriber for registering in maps
      86              : type SubId = u64;
      87              : 
      88              : /// Single enum type for all messages.
      89           20 : #[derive(Clone, Debug, PartialEq)]
      90              : #[allow(clippy::enum_variant_names)]
      91              : enum Message {
      92              :     SafekeeperTimelineInfo(SafekeeperTimelineInfo),
      93              :     SafekeeperDiscoveryRequest(SafekeeperDiscoveryRequest),
      94              :     SafekeeperDiscoveryResponse(SafekeeperDiscoveryResponse),
      95              : }
      96              : 
      97              : impl Message {
      98              :     /// Convert proto message to internal message.
      99            0 :     pub fn from(proto_msg: TypedMessage) -> Result<Self, Status> {
     100            0 :         match proto_msg.r#type() {
     101              :             MessageType::SafekeeperTimelineInfo => Ok(Message::SafekeeperTimelineInfo(
     102            0 :                 proto_msg.safekeeper_timeline_info.ok_or_else(|| {
     103            0 :                     Status::new(Code::InvalidArgument, "missing safekeeper_timeline_info")
     104            0 :                 })?,
     105              :             )),
     106              :             MessageType::SafekeeperDiscoveryRequest => Ok(Message::SafekeeperDiscoveryRequest(
     107            0 :                 proto_msg.safekeeper_discovery_request.ok_or_else(|| {
     108            0 :                     Status::new(
     109            0 :                         Code::InvalidArgument,
     110            0 :                         "missing safekeeper_discovery_request",
     111            0 :                     )
     112            0 :                 })?,
     113              :             )),
     114              :             MessageType::SafekeeperDiscoveryResponse => Ok(Message::SafekeeperDiscoveryResponse(
     115            0 :                 proto_msg.safekeeper_discovery_response.ok_or_else(|| {
     116            0 :                     Status::new(
     117            0 :                         Code::InvalidArgument,
     118            0 :                         "missing safekeeper_discovery_response",
     119            0 :                     )
     120            0 :                 })?,
     121              :             )),
     122            0 :             MessageType::Unknown => Err(Status::new(
     123            0 :                 Code::InvalidArgument,
     124            0 :                 format!("invalid message type: {:?}", proto_msg.r#type),
     125            0 :             )),
     126              :         }
     127            0 :     }
     128              : 
     129              :     /// Get the tenant_timeline_id from the message.
     130            6 :     pub fn tenant_timeline_id(&self) -> Result<Option<TenantTimelineId>, Status> {
     131            6 :         match self {
     132            6 :             Message::SafekeeperTimelineInfo(msg) => Ok(msg
     133            6 :                 .tenant_timeline_id
     134            6 :                 .as_ref()
     135            6 :                 .map(parse_proto_ttid)
     136            6 :                 .transpose()?),
     137            0 :             Message::SafekeeperDiscoveryRequest(msg) => Ok(msg
     138            0 :                 .tenant_timeline_id
     139            0 :                 .as_ref()
     140            0 :                 .map(parse_proto_ttid)
     141            0 :                 .transpose()?),
     142            0 :             Message::SafekeeperDiscoveryResponse(msg) => Ok(msg
     143            0 :                 .tenant_timeline_id
     144            0 :                 .as_ref()
     145            0 :                 .map(parse_proto_ttid)
     146            0 :                 .transpose()?),
     147              :         }
     148            6 :     }
     149              : 
     150              :     /// Convert internal message to the protobuf struct.
     151            0 :     pub fn as_typed_message(&self) -> TypedMessage {
     152            0 :         let mut res = TypedMessage {
     153            0 :             r#type: self.message_type() as i32,
     154            0 :             ..Default::default()
     155            0 :         };
     156            0 :         match self {
     157            0 :             Message::SafekeeperTimelineInfo(msg) => {
     158            0 :                 res.safekeeper_timeline_info = Some(msg.clone())
     159              :             }
     160            0 :             Message::SafekeeperDiscoveryRequest(msg) => {
     161            0 :                 res.safekeeper_discovery_request = Some(msg.clone())
     162              :             }
     163            0 :             Message::SafekeeperDiscoveryResponse(msg) => {
     164            0 :                 res.safekeeper_discovery_response = Some(msg.clone())
     165              :             }
     166              :         }
     167            0 :         res
     168            0 :     }
     169              : 
     170              :     /// Get the message type.
     171            0 :     pub fn message_type(&self) -> MessageType {
     172            0 :         match self {
     173            0 :             Message::SafekeeperTimelineInfo(_) => MessageType::SafekeeperTimelineInfo,
     174            0 :             Message::SafekeeperDiscoveryRequest(_) => MessageType::SafekeeperDiscoveryRequest,
     175            0 :             Message::SafekeeperDiscoveryResponse(_) => MessageType::SafekeeperDiscoveryResponse,
     176              :         }
     177            0 :     }
     178              : }
     179              : 
     180            6 : #[derive(Copy, Clone, Debug)]
     181              : enum SubscriptionKey {
     182              :     All,
     183              :     Timeline(TenantTimelineId),
     184              : }
     185              : 
     186              : impl SubscriptionKey {
     187              :     /// Parse protobuf subkey (protobuf doesn't have fixed size bytes, we get vectors).
     188            3 :     pub fn from_proto_subscription_key(key: ProtoSubscriptionKey) -> Result<Self, Status> {
     189            3 :         match key {
     190            1 :             ProtoSubscriptionKey::All(_) => Ok(SubscriptionKey::All),
     191            2 :             ProtoSubscriptionKey::TenantTimelineId(proto_ttid) => {
     192            2 :                 Ok(SubscriptionKey::Timeline(parse_proto_ttid(&proto_ttid)?))
     193              :             }
     194              :         }
     195            3 :     }
     196              : 
     197              :     /// Parse from FilterTenantTimelineId
     198            0 :     pub fn from_proto_filter_tenant_timeline_id(
     199            0 :         f: &FilterTenantTimelineId,
     200            0 :     ) -> Result<Self, Status> {
     201            0 :         if !f.enabled {
     202            0 :             return Ok(SubscriptionKey::All);
     203            0 :         }
     204              : 
     205            0 :         let ttid =
     206            0 :             parse_proto_ttid(f.tenant_timeline_id.as_ref().ok_or_else(|| {
     207            0 :                 Status::new(Code::InvalidArgument, "missing tenant_timeline_id")
     208            0 :             })?)?;
     209            0 :         Ok(SubscriptionKey::Timeline(ttid))
     210            0 :     }
     211              : }
     212              : 
     213              : /// Channel to timeline subscribers.
     214              : struct ChanToTimelineSub {
     215              :     chan: broadcast::Sender<Message>,
     216              :     /// Tracked separately to know when to delete the shmem entry. receiver_count()
     217              :     /// is unsuitable for that, as unregistering and dropping the receiver side
     218              :     /// happen at different moments.
     219              :     num_subscribers: u64,
     220              : }
     221              : 
     222              : struct SharedState {
     223              :     next_pub_id: PubId,
     224              :     num_pubs: i64,
     225              :     next_sub_id: SubId,
     226              :     num_subs_to_timelines: i64,
     227              :     chans_to_timeline_subs: HashMap<TenantTimelineId, ChanToTimelineSub>,
     228              :     num_subs_to_all: i64,
     229              :     chan_to_all_subs: broadcast::Sender<Message>,
     230              : }
     231              : 
     232              : impl SharedState {
     233            5 :     pub fn new(all_keys_chan_size: usize) -> Self {
     234            5 :         SharedState {
     235            5 :             next_pub_id: 0,
     236            5 :             num_pubs: 0,
     237            5 :             next_sub_id: 0,
     238            5 :             num_subs_to_timelines: 0,
     239            5 :             chans_to_timeline_subs: HashMap::new(),
     240            5 :             num_subs_to_all: 0,
     241            5 :             chan_to_all_subs: broadcast::channel(all_keys_chan_size).0,
     242            5 :         }
     243            5 :     }
     244              : 
     245              :     // Register new publisher.
     246            3 :     pub fn register_publisher(&mut self) -> PubId {
     247            3 :         let pub_id = self.next_pub_id;
     248            3 :         self.next_pub_id += 1;
     249            3 :         self.num_pubs += 1;
     250            3 :         NUM_PUBS.set(self.num_pubs);
     251            3 :         pub_id
     252            3 :     }
     253              : 
     254              :     // Unregister publisher.
     255            3 :     pub fn unregister_publisher(&mut self) {
     256            3 :         self.num_pubs -= 1;
     257            3 :         NUM_PUBS.set(self.num_pubs);
     258            3 :     }
     259              : 
     260              :     // Register new subscriber.
     261            7 :     pub fn register_subscriber(
     262            7 :         &mut self,
     263            7 :         sub_key: SubscriptionKey,
     264            7 :         timeline_chan_size: usize,
     265            7 :     ) -> (SubId, broadcast::Receiver<Message>) {
     266            7 :         let sub_id = self.next_sub_id;
     267            7 :         self.next_sub_id += 1;
     268            7 :         let sub_rx = match sub_key {
     269              :             SubscriptionKey::All => {
     270            3 :                 self.num_subs_to_all += 1;
     271            3 :                 NUM_SUBS_ALL.set(self.num_subs_to_all);
     272            3 :                 self.chan_to_all_subs.subscribe()
     273              :             }
     274            4 :             SubscriptionKey::Timeline(ttid) => {
     275            4 :                 self.num_subs_to_timelines += 1;
     276            4 :                 NUM_SUBS_TIMELINE.set(self.num_subs_to_timelines);
     277            4 :                 // Create a new broadcast channel for this key, or subscribe to
     278            4 :                 // the existing one.
     279            4 :                 let chan_to_timeline_sub =
     280            4 :                     self.chans_to_timeline_subs
     281            4 :                         .entry(ttid)
     282            4 :                         .or_insert(ChanToTimelineSub {
     283            4 :                             chan: broadcast::channel(timeline_chan_size).0,
     284            4 :                             num_subscribers: 0,
     285            4 :                         });
     286            4 :                 chan_to_timeline_sub.num_subscribers += 1;
     287            4 :                 chan_to_timeline_sub.chan.subscribe()
     288              :             }
     289              :         };
     290            7 :         (sub_id, sub_rx)
     291            7 :     }
     292              : 
     293              :     // Unregister the subscriber.
     294            7 :     pub fn unregister_subscriber(&mut self, sub_key: SubscriptionKey) {
     295            7 :         match sub_key {
     296            3 :             SubscriptionKey::All => {
     297            3 :                 self.num_subs_to_all -= 1;
     298            3 :                 NUM_SUBS_ALL.set(self.num_subs_to_all);
     299            3 :             }
     300            4 :             SubscriptionKey::Timeline(ttid) => {
     301            4 :                 self.num_subs_to_timelines -= 1;
     302            4 :                 NUM_SUBS_TIMELINE.set(self.num_subs_to_timelines);
     303            4 : 
     304            4 :                 // Remove from the map, destroying the channel, if we are the
     305            4 :                 // last subscriber to this timeline.
     306            4 : 
     307            4 :                 // Missing entry is a bug; we must have registered.
     308            4 :                 let chan_to_timeline_sub = self
     309            4 :                     .chans_to_timeline_subs
     310            4 :                     .get_mut(&ttid)
     311            4 :                     .expect("failed to find sub entry in shmem during unregister");
     312            4 :                 chan_to_timeline_sub.num_subscribers -= 1;
     313            4 :                 if chan_to_timeline_sub.num_subscribers == 0 {
     314            4 :                     self.chans_to_timeline_subs.remove(&ttid);
     315            4 :                 }
     316              :             }
     317              :         }
     318            7 :     }
     319              : }
     320              : 
     321              : // SharedState wrapper.
     322           13 : #[derive(Clone)]
     323              : struct Registry {
     324              :     shared_state: Arc<RwLock<SharedState>>,
     325              :     timeline_chan_size: usize,
     326              : }
     327              : 
     328              : impl Registry {
     329              :     // Register new publisher in shared state.
     330            3 :     pub fn register_publisher(&self, remote_addr: SocketAddr) -> Publisher {
     331            3 :         let pub_id = self.shared_state.write().register_publisher();
     332            3 :         info!("publication started id={} addr={:?}", pub_id, remote_addr);
     333            3 :         Publisher {
     334            3 :             id: pub_id,
     335            3 :             registry: self.clone(),
     336            3 :             remote_addr,
     337            3 :         }
     338            3 :     }
     339              : 
     340            3 :     pub fn unregister_publisher(&self, publisher: &Publisher) {
     341            3 :         self.shared_state.write().unregister_publisher();
     342            3 :         info!(
     343            1 :             "publication ended id={} addr={:?}",
     344            1 :             publisher.id, publisher.remote_addr
     345            1 :         );
     346            3 :     }
     347              : 
     348              :     // Register new subscriber in shared state.
     349            7 :     pub fn register_subscriber(
     350            7 :         &self,
     351            7 :         sub_key: SubscriptionKey,
     352            7 :         remote_addr: SocketAddr,
     353            7 :     ) -> Subscriber {
     354            7 :         let (sub_id, sub_rx) = self
     355            7 :             .shared_state
     356            7 :             .write()
     357            7 :             .register_subscriber(sub_key, self.timeline_chan_size);
     358            7 :         info!(
     359            3 :             "subscription started id={}, key={:?}, addr={:?}",
     360            3 :             sub_id, sub_key, remote_addr
     361            3 :         );
     362            7 :         Subscriber {
     363            7 :             id: sub_id,
     364            7 :             key: sub_key,
     365            7 :             sub_rx,
     366            7 :             registry: self.clone(),
     367            7 :             remote_addr,
     368            7 :         }
     369            7 :     }
     370              : 
     371              :     // Unregister the subscriber
     372            7 :     pub fn unregister_subscriber(&self, subscriber: &Subscriber) {
     373            7 :         self.shared_state
     374            7 :             .write()
     375            7 :             .unregister_subscriber(subscriber.key);
     376            7 :         info!(
     377            3 :             "subscription ended id={}, key={:?}, addr={:?}",
     378            3 :             subscriber.id, subscriber.key, subscriber.remote_addr
     379            3 :         );
     380            7 :     }
     381              : 
     382              :     /// Send msg to relevant subscribers.
     383            6 :     pub fn send_msg(&self, msg: &Message) -> Result<(), Status> {
     384            6 :         PROCESSED_MESSAGES_TOTAL.inc();
     385            6 : 
     386            6 :         // send message to subscribers for everything
     387            6 :         let shared_state = self.shared_state.read();
     388            6 :         // Err means there are no subscribers, which is fine.
     389            6 :         shared_state.chan_to_all_subs.send(msg.clone()).ok();
     390              : 
     391              :         // send message to per timeline subscribers, if there is ttid
     392            6 :         let ttid = msg.tenant_timeline_id()?;
     393            6 :         if let Some(ttid) = ttid {
     394            6 :             if let Some(subs) = shared_state.chans_to_timeline_subs.get(&ttid) {
     395            4 :                 // Err can't happen here: tx is destroyed only when the last
     396            4 :                 // subscriber is removed from the map, along with tx itself.
     397            4 :                 subs.chan
     398            4 :                     .send(msg.clone())
     399            4 :                     .expect("rx is still in the map with zero subscribers");
     400            4 :             }
     401            0 :         }
     402            6 :         Ok(())
     403            6 :     }
     404              : }
     405              : 
     406              : // Private subscriber state.
     407              : struct Subscriber {
     408              :     id: SubId,
     409              :     key: SubscriptionKey,
     410              :     // Subscriber receives messages from publishers here.
     411              :     sub_rx: broadcast::Receiver<Message>,
     412              :     // to unregister itself from shared state in Drop
     413              :     registry: Registry,
     414              :     // for logging
     415              :     remote_addr: SocketAddr,
     416              : }
     417              : 
     418              : impl Drop for Subscriber {
     419            7 :     fn drop(&mut self) {
     420            7 :         self.registry.unregister_subscriber(self);
     421            7 :     }
     422              : }
     423              : 
     424              : // Private publisher state
     425              : struct Publisher {
     426              :     id: PubId,
     427              :     registry: Registry,
     428              :     // for logging
     429              :     remote_addr: SocketAddr,
     430              : }
     431              : 
     432              : impl Publisher {
     433              :     /// Send msg to relevant subscribers.
     434            6 :     pub fn send_msg(&mut self, msg: &Message) -> Result<(), Status> {
     435            6 :         self.registry.send_msg(msg)
     436            6 :     }
     437              : }
     438              : 
     439              : impl Drop for Publisher {
     440            3 :     fn drop(&mut self) {
     441            3 :         self.registry.unregister_publisher(self);
     442            3 :     }
     443              : }
     444              : 
     445              : struct Broker {
     446              :     registry: Registry,
     447              : }
     448              : 
     449              : #[tonic::async_trait]
     450              : impl BrokerService for Broker {
     451            1 :     async fn publish_safekeeper_info(
     452            1 :         &self,
     453            1 :         request: Request<tonic::Streaming<SafekeeperTimelineInfo>>,
     454            1 :     ) -> Result<Response<()>, Status> {
     455            1 :         let remote_addr = request
     456            1 :             .remote_addr()
     457            1 :             .expect("TCPConnectInfo inserted by handler");
     458            1 :         let mut publisher = self.registry.register_publisher(remote_addr);
     459            1 : 
     460            1 :         let mut stream = request.into_inner();
     461              : 
     462              :         loop {
     463            3 :             match stream.next().await {
     464            2 :                 Some(Ok(msg)) => publisher.send_msg(&Message::SafekeeperTimelineInfo(msg))?,
     465            1 :                 Some(Err(e)) => return Err(e), // grpc error from the stream
     466            0 :                 None => break,                 // closed stream
     467            0 :             }
     468            0 :         }
     469            0 : 
     470            0 :         Ok(Response::new(()))
     471            2 :     }
     472              : 
     473              :     type SubscribeSafekeeperInfoStream =
     474              :         Pin<Box<dyn Stream<Item = Result<SafekeeperTimelineInfo, Status>> + Send + 'static>>;
     475              : 
     476            3 :     async fn subscribe_safekeeper_info(
     477            3 :         &self,
     478            3 :         request: Request<SubscribeSafekeeperInfoRequest>,
     479            3 :     ) -> Result<Response<Self::SubscribeSafekeeperInfoStream>, Status> {
     480            3 :         let remote_addr = request
     481            3 :             .remote_addr()
     482            3 :             .expect("TCPConnectInfo inserted by handler");
     483            3 :         let proto_key = request
     484            3 :             .into_inner()
     485            3 :             .subscription_key
     486            3 :             .ok_or_else(|| Status::new(Code::InvalidArgument, "missing subscription key"))?;
     487            3 :         let sub_key = SubscriptionKey::from_proto_subscription_key(proto_key)?;
     488            3 :         let mut subscriber = self.registry.register_subscriber(sub_key, remote_addr);
     489            3 : 
     490            3 :         // transform rx into stream with item = Result, as method result demands
     491            3 :         let output = async_stream::try_stream! {
     492            3 :             let mut warn_interval = time::interval(Duration::from_millis(1000));
     493            3 :             let mut missed_msgs: u64 = 0;
     494              :             loop {
     495            7 :                 match subscriber.sub_rx.recv().await {
     496            4 :                     Ok(info) => {
     497            4 :                         match info {
     498            4 :                             Message::SafekeeperTimelineInfo(info) => yield info,
     499            0 :                             _ => {},
     500              :                         }
     501            4 :                         BROADCASTED_MESSAGES_TOTAL.inc();
     502              :                     },
     503            0 :                     Err(RecvError::Lagged(skipped_msg)) => {
     504            0 :                         BROADCAST_DROPPED_MESSAGES_TOTAL.inc_by(skipped_msg);
     505            0 :                         missed_msgs += skipped_msg;
     506            0 :                         if (futures::poll!(Box::pin(warn_interval.tick()))).is_ready() {
     507            0 :                             warn!("subscription id={}, key={:?} addr={:?} dropped {} messages, channel is full",
     508            0 :                                 subscriber.id, subscriber.key, subscriber.remote_addr, missed_msgs);
     509            0 :                             missed_msgs = 0;
     510            0 :                         }
     511              :                     }
     512              :                     Err(RecvError::Closed) => {
     513              :                         // can't happen, we never drop the channel while there is a subscriber
     514            0 :                         Err(Status::new(Code::Internal, "channel unexpectantly closed"))?;
     515              :                     }
     516              :                 }
     517              :             }
     518              :         };
     519              : 
     520            3 :         Ok(Response::new(
     521            3 :             Box::pin(output) as Self::SubscribeSafekeeperInfoStream
     522            3 :         ))
     523            6 :     }
     524              : 
     525              :     type SubscribeByFilterStream =
     526              :         Pin<Box<dyn Stream<Item = Result<TypedMessage, Status>> + Send + 'static>>;
     527              : 
     528              :     /// Subscribe to all messages, limited by a filter.
     529            0 :     async fn subscribe_by_filter(
     530            0 :         &self,
     531            0 :         request: Request<SubscribeByFilterRequest>,
     532            0 :     ) -> std::result::Result<Response<Self::SubscribeByFilterStream>, Status> {
     533            0 :         let remote_addr = request
     534            0 :             .remote_addr()
     535            0 :             .expect("TCPConnectInfo inserted by handler");
     536            0 :         let proto_filter = request.into_inner();
     537            0 :         let ttid_filter = proto_filter
     538            0 :             .tenant_timeline_id
     539            0 :             .as_ref()
     540            0 :             .ok_or_else(|| Status::new(Code::InvalidArgument, "missing tenant_timeline_id"))?;
     541              : 
     542            0 :         let sub_key = SubscriptionKey::from_proto_filter_tenant_timeline_id(ttid_filter)?;
     543            0 :         let types_set = proto_filter
     544            0 :             .types
     545            0 :             .iter()
     546            0 :             .map(|t| t.r#type)
     547            0 :             .collect::<std::collections::HashSet<_>>();
     548            0 : 
     549            0 :         let mut subscriber = self.registry.register_subscriber(sub_key, remote_addr);
     550            0 : 
     551            0 :         // transform rx into stream with item = Result, as method result demands
     552            0 :         let output = async_stream::try_stream! {
     553            0 :             let mut warn_interval = time::interval(Duration::from_millis(1000));
     554            0 :             let mut missed_msgs: u64 = 0;
     555              :             loop {
     556            0 :                 match subscriber.sub_rx.recv().await {
     557            0 :                     Ok(msg) => {
     558            0 :                         let msg_type = msg.message_type() as i32;
     559            0 :                         if types_set.contains(&msg_type) {
     560            0 :                             yield msg.as_typed_message();
     561            0 :                             BROADCASTED_MESSAGES_TOTAL.inc();
     562            0 :                         }
     563              :                     },
     564            0 :                     Err(RecvError::Lagged(skipped_msg)) => {
     565            0 :                         BROADCAST_DROPPED_MESSAGES_TOTAL.inc_by(skipped_msg);
     566            0 :                         missed_msgs += skipped_msg;
     567            0 :                         if (futures::poll!(Box::pin(warn_interval.tick()))).is_ready() {
     568            0 :                             warn!("subscription id={}, key={:?} addr={:?} dropped {} messages, channel is full",
     569            0 :                                 subscriber.id, subscriber.key, subscriber.remote_addr, missed_msgs);
     570            0 :                             missed_msgs = 0;
     571            0 :                         }
     572              :                     }
     573              :                     Err(RecvError::Closed) => {
     574              :                         // can't happen, we never drop the channel while there is a subscriber
     575            0 :                         Err(Status::new(Code::Internal, "channel unexpectantly closed"))?;
     576              :                     }
     577              :                 }
     578              :             }
     579              :         };
     580              : 
     581            0 :         Ok(Response::new(
     582            0 :             Box::pin(output) as Self::SubscribeByFilterStream
     583            0 :         ))
     584            0 :     }
     585              : 
     586              :     /// Publish one message.
     587            0 :     async fn publish_one(
     588            0 :         &self,
     589            0 :         request: Request<TypedMessage>,
     590            0 :     ) -> std::result::Result<Response<()>, Status> {
     591            0 :         let msg = Message::from(request.into_inner())?;
     592            0 :         PUBLISHED_ONEOFF_MESSAGES_TOTAL.inc();
     593            0 :         self.registry.send_msg(&msg)?;
     594            0 :         Ok(Response::new(()))
     595            0 :     }
     596              : }
     597              : 
     598              : // We serve only metrics and healthcheck through http1.
     599            1 : async fn http1_handler(
     600            1 :     req: hyper::Request<hyper::body::Body>,
     601            1 : ) -> Result<hyper::Response<Body>, Infallible> {
     602            1 :     let resp = match (req.method(), req.uri().path()) {
     603            1 :         (&Method::GET, "/metrics") => {
     604            0 :             let mut buffer = vec![];
     605            0 :             let metrics = metrics::gather();
     606            0 :             let encoder = TextEncoder::new();
     607            0 :             encoder.encode(&metrics, &mut buffer).unwrap();
     608            0 : 
     609            0 :             hyper::Response::builder()
     610            0 :                 .status(StatusCode::OK)
     611            0 :                 .header(CONTENT_TYPE, encoder.format_type())
     612            0 :                 .body(Body::from(buffer))
     613            0 :                 .unwrap()
     614              :         }
     615            1 :         (&Method::GET, "/status") => hyper::Response::builder()
     616            1 :             .status(StatusCode::OK)
     617            1 :             .body(Body::empty())
     618            1 :             .unwrap(),
     619            0 :         _ => hyper::Response::builder()
     620            0 :             .status(StatusCode::NOT_FOUND)
     621            0 :             .body(Body::empty())
     622            0 :             .unwrap(),
     623              :     };
     624            1 :     Ok(resp)
     625            1 : }
     626              : 
     627              : #[tokio::main]
     628            3 : async fn main() -> Result<(), Box<dyn std::error::Error>> {
     629            3 :     let args = Args::parse();
     630            3 : 
     631            3 :     // important to keep the order of:
     632            3 :     // 1. init logging
     633            3 :     // 2. tracing panic hook
     634            3 :     // 3. sentry
     635            3 :     logging::init(
     636            3 :         LogFormat::from_config(&args.log_format)?,
     637            3 :         logging::TracingErrorLayerEnablement::Disabled,
     638            3 :         logging::Output::Stdout,
     639            0 :     )?;
     640            3 :     logging::replace_panic_hook_with_tracing_panic_hook().forget();
     641            3 :     // initialize sentry if SENTRY_DSN is provided
     642            3 :     let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
     643            3 :     info!("version: {GIT_VERSION}");
     644            3 :     info!("build_tag: {BUILD_TAG}");
     645            3 :     metrics::set_build_info_metric(GIT_VERSION, BUILD_TAG);
     646            3 : 
     647            3 :     // On any shutdown signal, log receival and exit.
     648            3 :     std::thread::spawn(move || {
     649            3 :         ShutdownSignals::handle(|signal| {
     650            1 :             info!("received {}, terminating", signal.name());
     651            1 :             std::process::exit(0);
     652            3 :         })
     653            3 :     });
     654            3 : 
     655            3 :     let registry = Registry {
     656            3 :         shared_state: Arc::new(RwLock::new(SharedState::new(args.all_keys_chan_size))),
     657            3 :         timeline_chan_size: args.timeline_chan_size,
     658            3 :     };
     659            3 :     let storage_broker_impl = Broker {
     660            3 :         registry: registry.clone(),
     661            3 :     };
     662            3 :     let storage_broker_server = BrokerServiceServer::new(storage_broker_impl);
     663            3 : 
     664            3 :     info!("listening on {}", &args.listen_addr);
     665              : 
     666              :     // grpc is served along with http1 for metrics on a single port, hence we
     667              :     // don't use tonic's Server.
     668            3 :     hyper::Server::bind(&args.listen_addr)
     669            3 :         .http2_keep_alive_interval(Some(args.http2_keepalive_interval))
     670            4 :         .serve(make_service_fn(move |conn: &AddrStream| {
     671            4 :             let storage_broker_server_cloned = storage_broker_server.clone();
     672            4 :             let connect_info = conn.connect_info();
     673            4 :             async move {
     674            5 :                 Ok::<_, Infallible>(service_fn(move |mut req| {
     675            5 :                     // That's what tonic's MakeSvc.call does to pass conninfo to
     676            5 :                     // the request handler (and where its request.remote_addr()
     677            5 :                     // expects it to find).
     678            5 :                     req.extensions_mut().insert(connect_info.clone());
     679            5 : 
     680            5 :                     // Technically this second clone is not needed, but consume
     681            5 :                     // by async block is apparently unavoidable. BTW, error
     682            5 :                     // message is enigmatic, see
     683            5 :                     // https://github.com/rust-lang/rust/issues/68119
     684            5 :                     //
     685            5 :                     // We could get away without async block at all, but then we
     686            5 :                     // need to resort to futures::Either to merge the result,
     687            5 :                     // which doesn't caress an eye as well.
     688            5 :                     let mut storage_broker_server_svc = storage_broker_server_cloned.clone();
     689            5 :                     async move {
     690            5 :                         if req.headers().get("content-type").map(|x| x.as_bytes())
     691            5 :                             == Some(b"application/grpc")
     692              :                         {
     693            4 :                             let res_resp = storage_broker_server_svc.call(req).await;
     694              :                             // Grpc and http1 handlers have slightly different
     695              :                             // Response types: it is UnsyncBoxBody for the
     696              :                             // former one (not sure why) and plain hyper::Body
     697              :                             // for the latter. Both implement HttpBody though,
     698              :                             // and EitherBody is used to merge them.
     699            4 :                             res_resp.map(|resp| resp.map(EitherBody::Left))
     700              :                         } else {
     701            1 :                             let res_resp = http1_handler(req).await;
     702            1 :                             res_resp.map(|resp| resp.map(EitherBody::Right))
     703              :                         }
     704            5 :                     }
     705            5 :                 }))
     706            4 :             }
     707            4 :         }))
     708            4 :         .await?;
     709            0 :     Ok(())
     710              : }
     711              : 
     712              : #[cfg(test)]
     713              : mod tests {
     714              :     use super::*;
     715              :     use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
     716              :     use tokio::sync::broadcast::error::TryRecvError;
     717              :     use utils::id::{TenantId, TimelineId};
     718              : 
     719            4 :     fn msg(timeline_id: Vec<u8>) -> Message {
     720            4 :         Message::SafekeeperTimelineInfo(SafekeeperTimelineInfo {
     721            4 :             safekeeper_id: 1,
     722            4 :             tenant_timeline_id: Some(ProtoTenantTimelineId {
     723            4 :                 tenant_id: vec![0x00; 16],
     724            4 :                 timeline_id,
     725            4 :             }),
     726            4 :             term: 0,
     727            4 :             last_log_term: 0,
     728            4 :             flush_lsn: 1,
     729            4 :             commit_lsn: 2,
     730            4 :             backup_lsn: 3,
     731            4 :             remote_consistent_lsn: 4,
     732            4 :             peer_horizon_lsn: 5,
     733            4 :             safekeeper_connstr: "neon-1-sk-1.local:7676".to_owned(),
     734            4 :             http_connstr: "neon-1-sk-1.local:7677".to_owned(),
     735            4 :             local_start_lsn: 0,
     736            4 :             availability_zone: None,
     737            4 :             standby_horizon: 0,
     738            4 :         })
     739            4 :     }
     740              : 
     741            6 :     fn tli_from_u64(i: u64) -> Vec<u8> {
     742            6 :         let mut timeline_id = vec![0xFF; 8];
     743            6 :         timeline_id.extend_from_slice(&i.to_be_bytes());
     744            6 :         timeline_id
     745            6 :     }
     746              : 
     747            6 :     fn mock_addr() -> SocketAddr {
     748            6 :         "127.0.0.1:8080".parse().unwrap()
     749            6 :     }
     750              : 
     751            2 :     #[tokio::test]
     752            2 :     async fn test_registry() {
     753            2 :         let registry = Registry {
     754            2 :             shared_state: Arc::new(RwLock::new(SharedState::new(16))),
     755            2 :             timeline_chan_size: 16,
     756            2 :         };
     757            2 : 
     758            2 :         // subscribe to timeline 2
     759            2 :         let ttid_2 = TenantTimelineId {
     760            2 :             tenant_id: TenantId::from_slice(&[0x00; 16]).unwrap(),
     761            2 :             timeline_id: TimelineId::from_slice(&tli_from_u64(2)).unwrap(),
     762            2 :         };
     763            2 :         let sub_key_2 = SubscriptionKey::Timeline(ttid_2);
     764            2 :         let mut subscriber_2 = registry.register_subscriber(sub_key_2, mock_addr());
     765            2 :         let mut subscriber_all = registry.register_subscriber(SubscriptionKey::All, mock_addr());
     766            2 : 
     767            2 :         // send two messages with different keys
     768            2 :         let msg_1 = msg(tli_from_u64(1));
     769            2 :         let msg_2 = msg(tli_from_u64(2));
     770            2 :         let mut publisher = registry.register_publisher(mock_addr());
     771            2 :         publisher.send_msg(&msg_1).expect("failed to send msg");
     772            2 :         publisher.send_msg(&msg_2).expect("failed to send msg");
     773            2 : 
     774            2 :         // msg with key 2 should arrive to subscriber_2
     775            2 :         assert_eq!(subscriber_2.sub_rx.try_recv().unwrap(), msg_2);
     776              : 
     777              :         // but nothing more
     778            2 :         assert_eq!(
     779            2 :             subscriber_2.sub_rx.try_recv().unwrap_err(),
     780            2 :             TryRecvError::Empty
     781            2 :         );
     782              : 
     783              :         // subscriber_all should receive both messages
     784            2 :         assert_eq!(subscriber_all.sub_rx.try_recv().unwrap(), msg_1);
     785            2 :         assert_eq!(subscriber_all.sub_rx.try_recv().unwrap(), msg_2);
     786            2 :         assert_eq!(
     787            2 :             subscriber_all.sub_rx.try_recv().unwrap_err(),
     788            2 :             TryRecvError::Empty
     789            2 :         );
     790              :     }
     791              : }
        

Generated by: LCOV version 2.1-beta