LCOV - differential code coverage report
Current view: top level - storage_broker/src/bin - storage_broker.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 69.2 % 504 349 155 349
Current Date: 2024-01-09 02:06:09 Functions: 58.2 % 91 53 38 53
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : //! Simple pub-sub based on grpc (tonic) and Tokio broadcast channel for storage
       2                 : //! nodes messaging.
       3                 : //!
       4                 : //! Subscriptions to 1) single timeline 2) all timelines are possible. We could
       5                 : //! add subscription to the set of timelines to save grpc streams, but testing
       6                 : //! shows many individual streams is also ok.
       7                 : //!
       8                 : //! Message is dropped if subscriber can't consume it, not affecting other
       9                 : //! subscribers.
      10                 : //!
      11                 : //! Only safekeeper messages are supported, but it is not hard to add something
      12                 : //! else with generics.
      13                 : use clap::{command, Parser};
      14                 : use futures_core::Stream;
      15                 : use futures_util::StreamExt;
      16                 : use hyper::header::CONTENT_TYPE;
      17                 : use hyper::server::conn::AddrStream;
      18                 : use hyper::service::{make_service_fn, service_fn};
      19                 : use hyper::{Body, Method, StatusCode};
      20                 : use parking_lot::RwLock;
      21                 : use std::collections::HashMap;
      22                 : use std::convert::Infallible;
      23                 : use std::net::SocketAddr;
      24                 : use std::pin::Pin;
      25                 : use std::sync::Arc;
      26                 : use std::time::Duration;
      27                 : use tokio::sync::broadcast;
      28                 : use tokio::sync::broadcast::error::RecvError;
      29                 : use tokio::time;
      30                 : use tonic::codegen::Service;
      31                 : use tonic::transport::server::Connected;
      32                 : use tonic::Code;
      33                 : use tonic::{Request, Response, Status};
      34                 : use tracing::*;
      35                 : use utils::signals::ShutdownSignals;
      36                 : 
      37                 : use metrics::{Encoder, TextEncoder};
      38                 : use storage_broker::metrics::{
      39                 :     BROADCASTED_MESSAGES_TOTAL, BROADCAST_DROPPED_MESSAGES_TOTAL, NUM_PUBS, NUM_SUBS_ALL,
      40                 :     NUM_SUBS_TIMELINE, PROCESSED_MESSAGES_TOTAL, PUBLISHED_ONEOFF_MESSAGES_TOTAL,
      41                 : };
      42                 : use storage_broker::proto::broker_service_server::{BrokerService, BrokerServiceServer};
      43                 : use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey as ProtoSubscriptionKey;
      44                 : use storage_broker::proto::{
      45                 :     FilterTenantTimelineId, MessageType, SafekeeperDiscoveryRequest, SafekeeperDiscoveryResponse,
      46                 :     SafekeeperTimelineInfo, SubscribeByFilterRequest, SubscribeSafekeeperInfoRequest, TypedMessage,
      47                 : };
      48                 : use storage_broker::{
      49                 :     parse_proto_ttid, EitherBody, DEFAULT_KEEPALIVE_INTERVAL, DEFAULT_LISTEN_ADDR,
      50                 : };
      51                 : use utils::id::TenantTimelineId;
      52                 : use utils::logging::{self, LogFormat};
      53                 : use utils::sentry_init::init_sentry;
      54                 : use utils::{project_build_tag, project_git_version};
      55                 : 
      56                 : project_git_version!(GIT_VERSION);
      57                 : project_build_tag!(BUILD_TAG);
      58                 : 
      59                 : const DEFAULT_CHAN_SIZE: usize = 32;
      60                 : const DEFAULT_ALL_KEYS_CHAN_SIZE: usize = 16384;
      61                 : 
      62 CBC         336 : #[derive(Parser, Debug)]
      63                 : #[command(version = GIT_VERSION, about = "Broker for neon storage nodes communication", long_about = None)]
      64                 : struct Args {
      65                 :     /// Endpoint to listen on.
      66                 :     #[arg(short, long, default_value = DEFAULT_LISTEN_ADDR)]
      67 UBC           0 :     listen_addr: SocketAddr,
      68                 :     /// Size of the queue to the per timeline subscriber.
      69 CBC         336 :     #[arg(long, default_value_t = DEFAULT_CHAN_SIZE)]
      70 UBC           0 :     timeline_chan_size: usize,
      71                 :     /// Size of the queue to the all keys subscriber.
      72 CBC         336 :     #[arg(long, default_value_t = DEFAULT_ALL_KEYS_CHAN_SIZE)]
      73 UBC           0 :     all_keys_chan_size: usize,
      74                 :     /// HTTP/2 keepalive interval.
      75                 :     #[arg(long, value_parser= humantime::parse_duration, default_value = DEFAULT_KEEPALIVE_INTERVAL)]
      76               0 :     http2_keepalive_interval: Duration,
      77                 :     /// Format for logging, either 'plain' or 'json'.
      78                 :     #[arg(long, default_value = "plain")]
      79               0 :     log_format: String,
      80                 : }
      81                 : 
      82                 : /// Id of publisher for registering in maps
      83                 : type PubId = u64;
      84                 : 
      85                 : /// Id of subscriber for registering in maps
      86                 : type SubId = u64;
      87                 : 
      88                 : /// Single enum type for all messages.
      89 CBC       37682 : #[derive(Clone, Debug, PartialEq)]
      90                 : #[allow(clippy::enum_variant_names)]
      91                 : enum Message {
      92                 :     SafekeeperTimelineInfo(SafekeeperTimelineInfo),
      93                 :     SafekeeperDiscoveryRequest(SafekeeperDiscoveryRequest),
      94                 :     SafekeeperDiscoveryResponse(SafekeeperDiscoveryResponse),
      95                 : }
      96                 : 
      97                 : impl Message {
      98                 :     /// Convert proto message to internal message.
      99 UBC           0 :     pub fn from(proto_msg: TypedMessage) -> Result<Self, Status> {
     100               0 :         match proto_msg.r#type() {
     101                 :             MessageType::SafekeeperTimelineInfo => Ok(Message::SafekeeperTimelineInfo(
     102               0 :                 proto_msg.safekeeper_timeline_info.ok_or_else(|| {
     103               0 :                     Status::new(Code::InvalidArgument, "missing safekeeper_timeline_info")
     104               0 :                 })?,
     105                 :             )),
     106                 :             MessageType::SafekeeperDiscoveryRequest => Ok(Message::SafekeeperDiscoveryRequest(
     107               0 :                 proto_msg.safekeeper_discovery_request.ok_or_else(|| {
     108               0 :                     Status::new(
     109               0 :                         Code::InvalidArgument,
     110               0 :                         "missing safekeeper_discovery_request",
     111               0 :                     )
     112               0 :                 })?,
     113                 :             )),
     114                 :             MessageType::SafekeeperDiscoveryResponse => Ok(Message::SafekeeperDiscoveryResponse(
     115               0 :                 proto_msg.safekeeper_discovery_response.ok_or_else(|| {
     116               0 :                     Status::new(
     117               0 :                         Code::InvalidArgument,
     118               0 :                         "missing safekeeper_discovery_response",
     119               0 :                     )
     120               0 :                 })?,
     121                 :             )),
     122               0 :             MessageType::Unknown => Err(Status::new(
     123               0 :                 Code::InvalidArgument,
     124               0 :                 format!("invalid message type: {:?}", proto_msg.r#type),
     125               0 :             )),
     126                 :         }
     127               0 :     }
     128                 : 
     129                 :     /// Get the tenant_timeline_id from the message.
     130 CBC        8842 :     pub fn tenant_timeline_id(&self) -> Result<Option<TenantTimelineId>, Status> {
     131            8842 :         match self {
     132            8842 :             Message::SafekeeperTimelineInfo(msg) => Ok(msg
     133            8842 :                 .tenant_timeline_id
     134            8842 :                 .as_ref()
     135            8842 :                 .map(parse_proto_ttid)
     136            8842 :                 .transpose()?),
     137 UBC           0 :             Message::SafekeeperDiscoveryRequest(msg) => Ok(msg
     138               0 :                 .tenant_timeline_id
     139               0 :                 .as_ref()
     140               0 :                 .map(parse_proto_ttid)
     141               0 :                 .transpose()?),
     142               0 :             Message::SafekeeperDiscoveryResponse(msg) => Ok(msg
     143               0 :                 .tenant_timeline_id
     144               0 :                 .as_ref()
     145               0 :                 .map(parse_proto_ttid)
     146               0 :                 .transpose()?),
     147                 :         }
     148 CBC        8842 :     }
     149                 : 
     150                 :     /// Convert internal message to the protobuf struct.
     151 UBC           0 :     pub fn as_typed_message(&self) -> TypedMessage {
     152               0 :         let mut res = TypedMessage {
     153               0 :             r#type: self.message_type() as i32,
     154               0 :             ..Default::default()
     155               0 :         };
     156               0 :         match self {
     157               0 :             Message::SafekeeperTimelineInfo(msg) => {
     158               0 :                 res.safekeeper_timeline_info = Some(msg.clone())
     159                 :             }
     160               0 :             Message::SafekeeperDiscoveryRequest(msg) => {
     161               0 :                 res.safekeeper_discovery_request = Some(msg.clone())
     162                 :             }
     163               0 :             Message::SafekeeperDiscoveryResponse(msg) => {
     164               0 :                 res.safekeeper_discovery_response = Some(msg.clone())
     165                 :             }
     166                 :         }
     167               0 :         res
     168               0 :     }
     169                 : 
     170                 :     /// Get the message type.
     171               0 :     pub fn message_type(&self) -> MessageType {
     172               0 :         match self {
     173               0 :             Message::SafekeeperTimelineInfo(_) => MessageType::SafekeeperTimelineInfo,
     174               0 :             Message::SafekeeperDiscoveryRequest(_) => MessageType::SafekeeperDiscoveryRequest,
     175               0 :             Message::SafekeeperDiscoveryResponse(_) => MessageType::SafekeeperDiscoveryResponse,
     176                 :         }
     177               0 :     }
     178                 : }
     179                 : 
     180 CBC        3180 : #[derive(Copy, Clone, Debug)]
     181                 : enum SubscriptionKey {
     182                 :     All,
     183                 :     Timeline(TenantTimelineId),
     184                 : }
     185                 : 
     186                 : impl SubscriptionKey {
     187                 :     /// Parse protobuf subkey (protobuf doesn't have fixed size bytes, we get vectors).
     188            1590 :     pub fn from_proto_subscription_key(key: ProtoSubscriptionKey) -> Result<Self, Status> {
     189            1590 :         match key {
     190             488 :             ProtoSubscriptionKey::All(_) => Ok(SubscriptionKey::All),
     191            1102 :             ProtoSubscriptionKey::TenantTimelineId(proto_ttid) => {
     192            1102 :                 Ok(SubscriptionKey::Timeline(parse_proto_ttid(&proto_ttid)?))
     193                 :             }
     194                 :         }
     195            1590 :     }
     196                 : 
     197                 :     /// Parse from FilterTenantTimelineId
     198 UBC           0 :     pub fn from_proto_filter_tenant_timeline_id(
     199               0 :         f: &FilterTenantTimelineId,
     200               0 :     ) -> Result<Self, Status> {
     201               0 :         if !f.enabled {
     202               0 :             return Ok(SubscriptionKey::All);
     203               0 :         }
     204                 : 
     205               0 :         let ttid =
     206               0 :             parse_proto_ttid(f.tenant_timeline_id.as_ref().ok_or_else(|| {
     207               0 :                 Status::new(Code::InvalidArgument, "missing tenant_timeline_id")
     208               0 :             })?)?;
     209               0 :         Ok(SubscriptionKey::Timeline(ttid))
     210               0 :     }
     211                 : }
     212                 : 
     213                 : /// Channel to timeline subscribers.
     214                 : struct ChanToTimelineSub {
     215                 :     chan: broadcast::Sender<Message>,
     216                 :     /// Tracked separately to know when to delete the shmem entry. receiver_count()
     217                 :     /// is unhandy for that as unregistering and dropping the receiver side
     218                 :     /// happens at different moments.
     219                 :     num_subscribers: u64,
     220                 : }
     221                 : 
     222                 : struct SharedState {
     223                 :     next_pub_id: PubId,
     224                 :     num_pubs: i64,
     225                 :     next_sub_id: SubId,
     226                 :     num_subs_to_timelines: i64,
     227                 :     chans_to_timeline_subs: HashMap<TenantTimelineId, ChanToTimelineSub>,
     228                 :     num_subs_to_all: i64,
     229                 :     chan_to_all_subs: broadcast::Sender<Message>,
     230                 : }
     231                 : 
     232                 : impl SharedState {
     233 CBC         337 :     pub fn new(all_keys_chan_size: usize) -> Self {
     234             337 :         SharedState {
     235             337 :             next_pub_id: 0,
     236             337 :             num_pubs: 0,
     237             337 :             next_sub_id: 0,
     238             337 :             num_subs_to_timelines: 0,
     239             337 :             chans_to_timeline_subs: HashMap::new(),
     240             337 :             num_subs_to_all: 0,
     241             337 :             chan_to_all_subs: broadcast::channel(all_keys_chan_size).0,
     242             337 :         }
     243             337 :     }
     244                 : 
     245                 :     // Register new publisher.
     246             489 :     pub fn register_publisher(&mut self) -> PubId {
     247             489 :         let pub_id = self.next_pub_id;
     248             489 :         self.next_pub_id += 1;
     249             489 :         self.num_pubs += 1;
     250             489 :         NUM_PUBS.set(self.num_pubs);
     251             489 :         pub_id
     252             489 :     }
     253                 : 
     254                 :     // Unregister publisher.
     255             489 :     pub fn unregister_publisher(&mut self) {
     256             489 :         self.num_pubs -= 1;
     257             489 :         NUM_PUBS.set(self.num_pubs);
     258             489 :     }
     259                 : 
     260                 :     // Register new subscriber.
     261            1592 :     pub fn register_subscriber(
     262            1592 :         &mut self,
     263            1592 :         sub_key: SubscriptionKey,
     264            1592 :         timeline_chan_size: usize,
     265            1592 :     ) -> (SubId, broadcast::Receiver<Message>) {
     266            1592 :         let sub_id = self.next_sub_id;
     267            1592 :         self.next_sub_id += 1;
     268            1592 :         let sub_rx = match sub_key {
     269                 :             SubscriptionKey::All => {
     270             489 :                 self.num_subs_to_all += 1;
     271             489 :                 NUM_SUBS_ALL.set(self.num_subs_to_all);
     272             489 :                 self.chan_to_all_subs.subscribe()
     273                 :             }
     274            1103 :             SubscriptionKey::Timeline(ttid) => {
     275            1103 :                 self.num_subs_to_timelines += 1;
     276            1103 :                 NUM_SUBS_TIMELINE.set(self.num_subs_to_timelines);
     277            1103 :                 // Create new broadcast channel for this key, or subscribe to
     278            1103 :                 // the existing one.
     279            1103 :                 let chan_to_timeline_sub =
     280            1103 :                     self.chans_to_timeline_subs
     281            1103 :                         .entry(ttid)
     282            1103 :                         .or_insert(ChanToTimelineSub {
     283            1103 :                             chan: broadcast::channel(timeline_chan_size).0,
     284            1103 :                             num_subscribers: 0,
     285            1103 :                         });
     286            1103 :                 chan_to_timeline_sub.num_subscribers += 1;
     287            1103 :                 chan_to_timeline_sub.chan.subscribe()
     288                 :             }
     289                 :         };
     290            1592 :         (sub_id, sub_rx)
     291            1592 :     }
     292                 : 
     293                 :     // Unregister the subscriber.
     294            1592 :     pub fn unregister_subscriber(&mut self, sub_key: SubscriptionKey) {
     295            1592 :         match sub_key {
     296             489 :             SubscriptionKey::All => {
     297             489 :                 self.num_subs_to_all -= 1;
     298             489 :                 NUM_SUBS_ALL.set(self.num_subs_to_all);
     299             489 :             }
     300            1103 :             SubscriptionKey::Timeline(ttid) => {
     301            1103 :                 self.num_subs_to_timelines -= 1;
     302            1103 :                 NUM_SUBS_TIMELINE.set(self.num_subs_to_timelines);
     303            1103 : 
     304            1103 :                 // Remove from the map, destroying the channel, if we are the
     305            1103 :                 // last subscriber to this timeline.
     306            1103 : 
     307            1103 :                 // Missing entry is a bug; we must have registered.
     308            1103 :                 let chan_to_timeline_sub = self
     309            1103 :                     .chans_to_timeline_subs
     310            1103 :                     .get_mut(&ttid)
     311            1103 :                     .expect("failed to find sub entry in shmem during unregister");
     312            1103 :                 chan_to_timeline_sub.num_subscribers -= 1;
     313            1103 :                 if chan_to_timeline_sub.num_subscribers == 0 {
     314            1027 :                     self.chans_to_timeline_subs.remove(&ttid);
     315            1027 :                 }
     316                 :             }
     317                 :         }
     318            1592 :     }
     319                 : }
     320                 : 
     321                 : // SharedState wrapper.
     322            2417 : #[derive(Clone)]
     323                 : struct Registry {
     324                 :     shared_state: Arc<RwLock<SharedState>>,
     325                 :     timeline_chan_size: usize,
     326                 : }
     327                 : 
     328                 : impl Registry {
     329                 :     // Register new publisher in shared state.
     330             489 :     pub fn register_publisher(&self, remote_addr: SocketAddr) -> Publisher {
     331             489 :         let pub_id = self.shared_state.write().register_publisher();
     332             489 :         info!("publication started id={} addr={:?}", pub_id, remote_addr);
     333             489 :         Publisher {
     334             489 :             id: pub_id,
     335             489 :             registry: self.clone(),
     336             489 :             remote_addr,
     337             489 :         }
     338             489 :     }
     339                 : 
     340             489 :     pub fn unregister_publisher(&self, publisher: &Publisher) {
     341             489 :         self.shared_state.write().unregister_publisher();
     342             489 :         info!(
     343             488 :             "publication ended id={} addr={:?}",
     344             488 :             publisher.id, publisher.remote_addr
     345             488 :         );
     346             489 :     }
     347                 : 
     348                 :     // Register new subscriber in shared state.
     349            1592 :     pub fn register_subscriber(
     350            1592 :         &self,
     351            1592 :         sub_key: SubscriptionKey,
     352            1592 :         remote_addr: SocketAddr,
     353            1592 :     ) -> Subscriber {
     354            1592 :         let (sub_id, sub_rx) = self
     355            1592 :             .shared_state
     356            1592 :             .write()
     357            1592 :             .register_subscriber(sub_key, self.timeline_chan_size);
     358            1592 :         info!(
     359            1590 :             "subscription started id={}, key={:?}, addr={:?}",
     360            1590 :             sub_id, sub_key, remote_addr
     361            1590 :         );
     362            1592 :         Subscriber {
     363            1592 :             id: sub_id,
     364            1592 :             key: sub_key,
     365            1592 :             sub_rx,
     366            1592 :             registry: self.clone(),
     367            1592 :             remote_addr,
     368            1592 :         }
     369            1592 :     }
     370                 : 
     371                 :     // Unregister the subscriber
     372            1592 :     pub fn unregister_subscriber(&self, subscriber: &Subscriber) {
     373            1592 :         self.shared_state
     374            1592 :             .write()
     375            1592 :             .unregister_subscriber(subscriber.key);
     376            1592 :         info!(
     377            1590 :             "subscription ended id={}, key={:?}, addr={:?}",
     378            1590 :             subscriber.id, subscriber.key, subscriber.remote_addr
     379            1590 :         );
     380            1592 :     }
     381                 : 
     382                 :     /// Send msg to relevant subscribers.
     383            8842 :     pub fn send_msg(&self, msg: &Message) -> Result<(), Status> {
     384            8842 :         PROCESSED_MESSAGES_TOTAL.inc();
     385            8842 : 
     386            8842 :         // send message to subscribers for everything
     387            8842 :         let shared_state = self.shared_state.read();
     388            8842 :         // Err means there are no subscribers, it is fine.
     389            8842 :         shared_state.chan_to_all_subs.send(msg.clone()).ok();
     390                 : 
     391                 :         // send message to per timeline subscribers, if there is ttid
     392            8842 :         let ttid = msg.tenant_timeline_id()?;
     393            8842 :         if let Some(ttid) = ttid {
     394            8842 :             if let Some(subs) = shared_state.chans_to_timeline_subs.get(&ttid) {
     395            8081 :                 // Err can't happen here, as tx is destroyed only after removing
     396            8081 :                 // from the map the last subscriber along with tx.
     397            8081 :                 subs.chan
     398            8081 :                     .send(msg.clone())
     399            8081 :                     .expect("rx is still in the map with zero subscribers");
     400            8081 :             }
     401 UBC           0 :         }
     402 CBC        8842 :         Ok(())
     403            8842 :     }
     404                 : }
     405                 : 
     406                 : // Private subscriber state.
     407                 : struct Subscriber {
     408                 :     id: SubId,
     409                 :     key: SubscriptionKey,
     410                 :     // Subscriber receives messages from publishers here.
     411                 :     sub_rx: broadcast::Receiver<Message>,
     412                 :     // to unregister itself from shared state in Drop
     413                 :     registry: Registry,
     414                 :     // for logging
     415                 :     remote_addr: SocketAddr,
     416                 : }
     417                 : 
     418                 : impl Drop for Subscriber {
     419            1592 :     fn drop(&mut self) {
     420            1592 :         self.registry.unregister_subscriber(self);
     421            1592 :     }
     422                 : }
     423                 : 
     424                 : // Private publisher state
     425                 : struct Publisher {
     426                 :     id: PubId,
     427                 :     registry: Registry,
     428                 :     // for logging
     429                 :     remote_addr: SocketAddr,
     430                 : }
     431                 : 
     432                 : impl Publisher {
     433                 :     /// Send msg to relevant subscribers.
     434            8842 :     pub fn send_msg(&mut self, msg: &Message) -> Result<(), Status> {
     435            8842 :         self.registry.send_msg(msg)
     436            8842 :     }
     437                 : }
     438                 : 
     439                 : impl Drop for Publisher {
     440             489 :     fn drop(&mut self) {
     441             489 :         self.registry.unregister_publisher(self);
     442             489 :     }
     443                 : }
     444                 : 
     445                 : struct Broker {
     446                 :     registry: Registry,
     447                 : }
     448                 : 
     449                 : #[tonic::async_trait]
     450                 : impl BrokerService for Broker {
     451             488 :     async fn publish_safekeeper_info(
     452             488 :         &self,
     453             488 :         request: Request<tonic::Streaming<SafekeeperTimelineInfo>>,
     454             488 :     ) -> Result<Response<()>, Status> {
     455             488 :         let remote_addr = request
     456             488 :             .remote_addr()
     457             488 :             .expect("TCPConnectInfo inserted by handler");
     458             488 :         let mut publisher = self.registry.register_publisher(remote_addr);
     459             488 : 
     460             488 :         let mut stream = request.into_inner();
     461                 : 
     462                 :         loop {
     463            9328 :             match stream.next().await {
     464            8840 :                 Some(Ok(msg)) => publisher.send_msg(&Message::SafekeeperTimelineInfo(msg))?,
     465             488 :                 Some(Err(e)) => return Err(e), // grpc error from the stream
     466 UBC           0 :                 None => break,                 // closed stream
     467               0 :             }
     468               0 :         }
     469               0 : 
     470               0 :         Ok(Response::new(()))
     471 CBC         976 :     }
     472                 : 
     473                 :     type SubscribeSafekeeperInfoStream =
     474                 :         Pin<Box<dyn Stream<Item = Result<SafekeeperTimelineInfo, Status>> + Send + 'static>>;
     475                 : 
     476            1590 :     async fn subscribe_safekeeper_info(
     477            1590 :         &self,
     478            1590 :         request: Request<SubscribeSafekeeperInfoRequest>,
     479            1590 :     ) -> Result<Response<Self::SubscribeSafekeeperInfoStream>, Status> {
     480            1590 :         let remote_addr = request
     481            1590 :             .remote_addr()
     482            1590 :             .expect("TCPConnectInfo inserted by handler");
     483            1590 :         let proto_key = request
     484            1590 :             .into_inner()
     485            1590 :             .subscription_key
     486            1590 :             .ok_or_else(|| Status::new(Code::InvalidArgument, "missing subscription key"))?;
     487            1590 :         let sub_key = SubscriptionKey::from_proto_subscription_key(proto_key)?;
     488            1590 :         let mut subscriber = self.registry.register_subscriber(sub_key, remote_addr);
     489            1590 : 
     490            1590 :         // transform rx into stream with item = Result, as method result demands
     491            1590 :         let output = async_stream::try_stream! {
     492            1590 :             let mut warn_interval = time::interval(Duration::from_millis(1000));
     493            1590 :             let mut missed_msgs: u64 = 0;
     494                 :             loop {
     495           36624 :                 match subscriber.sub_rx.recv().await {
     496           20756 :                     Ok(info) => {
     497           20756 :                         match info {
     498           20756 :                             Message::SafekeeperTimelineInfo(info) => yield info,
     499 UBC           0 :                             _ => {},
     500                 :                         }
     501 CBC       20756 :                         BROADCASTED_MESSAGES_TOTAL.inc();
     502                 :                     },
     503 UBC           0 :                     Err(RecvError::Lagged(skipped_msg)) => {
     504               0 :                         BROADCAST_DROPPED_MESSAGES_TOTAL.inc_by(skipped_msg);
     505               0 :                         missed_msgs += skipped_msg;
     506               0 :                         if (futures::poll!(Box::pin(warn_interval.tick()))).is_ready() {
     507               0 :                             warn!("subscription id={}, key={:?} addr={:?} dropped {} messages, channel is full",
     508               0 :                                 subscriber.id, subscriber.key, subscriber.remote_addr, missed_msgs);
     509               0 :                             missed_msgs = 0;
     510               0 :                         }
     511                 :                     }
     512                 :                     Err(RecvError::Closed) => {
     513                 :                         // can't happen, we never drop the channel while there is a subscriber
     514               0 :                         Err(Status::new(Code::Internal, "channel unexpectantly closed"))?;
     515                 :                     }
     516                 :                 }
     517                 :             }
     518                 :         };
     519                 : 
     520 CBC        1590 :         Ok(Response::new(
     521            1590 :             Box::pin(output) as Self::SubscribeSafekeeperInfoStream
     522            1590 :         ))
     523            3180 :     }
     524                 : 
     525                 :     type SubscribeByFilterStream =
     526                 :         Pin<Box<dyn Stream<Item = Result<TypedMessage, Status>> + Send + 'static>>;
     527                 : 
     528                 :     /// Subscribe to all messages, limited by a filter.
     529 UBC           0 :     async fn subscribe_by_filter(
     530               0 :         &self,
     531               0 :         request: Request<SubscribeByFilterRequest>,
     532               0 :     ) -> std::result::Result<Response<Self::SubscribeByFilterStream>, Status> {
     533               0 :         let remote_addr = request
     534               0 :             .remote_addr()
     535               0 :             .expect("TCPConnectInfo inserted by handler");
     536               0 :         let proto_filter = request.into_inner();
     537               0 :         let ttid_filter = proto_filter
     538               0 :             .tenant_timeline_id
     539               0 :             .as_ref()
     540               0 :             .ok_or_else(|| Status::new(Code::InvalidArgument, "missing tenant_timeline_id"))?;
     541                 : 
     542               0 :         let sub_key = SubscriptionKey::from_proto_filter_tenant_timeline_id(ttid_filter)?;
     543               0 :         let types_set = proto_filter
     544               0 :             .types
     545               0 :             .iter()
     546               0 :             .map(|t| t.r#type)
     547               0 :             .collect::<std::collections::HashSet<_>>();
     548               0 : 
     549               0 :         let mut subscriber = self.registry.register_subscriber(sub_key, remote_addr);
     550               0 : 
     551               0 :         // transform rx into stream with item = Result, as method result demands
     552               0 :         let output = async_stream::try_stream! {
     553               0 :             let mut warn_interval = time::interval(Duration::from_millis(1000));
     554               0 :             let mut missed_msgs: u64 = 0;
     555                 :             loop {
     556               0 :                 match subscriber.sub_rx.recv().await {
     557               0 :                     Ok(msg) => {
     558               0 :                         let msg_type = msg.message_type() as i32;
     559               0 :                         if types_set.contains(&msg_type) {
     560               0 :                             yield msg.as_typed_message();
     561               0 :                             BROADCASTED_MESSAGES_TOTAL.inc();
     562               0 :                         }
     563                 :                     },
     564               0 :                     Err(RecvError::Lagged(skipped_msg)) => {
     565               0 :                         BROADCAST_DROPPED_MESSAGES_TOTAL.inc_by(skipped_msg);
     566               0 :                         missed_msgs += skipped_msg;
     567               0 :                         if (futures::poll!(Box::pin(warn_interval.tick()))).is_ready() {
     568               0 :                             warn!("subscription id={}, key={:?} addr={:?} dropped {} messages, channel is full",
     569               0 :                                 subscriber.id, subscriber.key, subscriber.remote_addr, missed_msgs);
     570               0 :                             missed_msgs = 0;
     571               0 :                         }
     572                 :                     }
     573                 :                     Err(RecvError::Closed) => {
     574                 :                         // can't happen, we never drop the channel while there is a subscriber
     575               0 :                         Err(Status::new(Code::Internal, "channel unexpectantly closed"))?;
     576                 :                     }
     577                 :                 }
     578                 :             }
     579                 :         };
     580                 : 
     581               0 :         Ok(Response::new(
     582               0 :             Box::pin(output) as Self::SubscribeByFilterStream
     583               0 :         ))
     584               0 :     }
     585                 : 
     586                 :     /// Publish one message.
     587               0 :     async fn publish_one(
     588               0 :         &self,
     589               0 :         request: Request<TypedMessage>,
     590               0 :     ) -> std::result::Result<Response<()>, Status> {
     591               0 :         let msg = Message::from(request.into_inner())?;
     592               0 :         PUBLISHED_ONEOFF_MESSAGES_TOTAL.inc();
     593               0 :         self.registry.send_msg(&msg)?;
     594               0 :         Ok(Response::new(()))
     595               0 :     }
     596                 : }
     597                 : 
     598                 : // We serve only metrics and healthcheck through http1.
     599 CBC           3 : async fn http1_handler(
     600               3 :     req: hyper::Request<hyper::body::Body>,
     601               3 : ) -> Result<hyper::Response<Body>, Infallible> {
     602               3 :     let resp = match (req.method(), req.uri().path()) {
     603               3 :         (&Method::GET, "/metrics") => {
     604 UBC           0 :             let mut buffer = vec![];
     605               0 :             let metrics = metrics::gather();
     606               0 :             let encoder = TextEncoder::new();
     607               0 :             encoder.encode(&metrics, &mut buffer).unwrap();
     608               0 : 
     609               0 :             hyper::Response::builder()
     610               0 :                 .status(StatusCode::OK)
     611               0 :                 .header(CONTENT_TYPE, encoder.format_type())
     612               0 :                 .body(Body::from(buffer))
     613               0 :                 .unwrap()
     614                 :         }
     615 CBC           3 :         (&Method::GET, "/status") => hyper::Response::builder()
     616               3 :             .status(StatusCode::OK)
     617               3 :             .body(Body::empty())
     618               3 :             .unwrap(),
     619 UBC           0 :         _ => hyper::Response::builder()
     620               0 :             .status(StatusCode::NOT_FOUND)
     621               0 :             .body(Body::empty())
     622               0 :             .unwrap(),
     623                 :     };
     624 CBC           3 :     Ok(resp)
     625               3 : }
     626                 : 
     627                 : #[tokio::main]
     628             336 : async fn main() -> Result<(), Box<dyn std::error::Error>> {
     629             336 :     let args = Args::parse();
     630             336 : 
     631             336 :     // important to keep the order of:
     632             336 :     // 1. init logging
     633             336 :     // 2. tracing panic hook
     634             336 :     // 3. sentry
     635             336 :     logging::init(
     636             336 :         LogFormat::from_config(&args.log_format)?,
     637             336 :         logging::TracingErrorLayerEnablement::Disabled,
     638             336 :         logging::Output::Stdout,
     639 UBC           0 :     )?;
     640 CBC         336 :     logging::replace_panic_hook_with_tracing_panic_hook().forget();
     641             336 :     // initialize sentry if SENTRY_DSN is provided
     642             336 :     let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
     643             336 :     info!("version: {GIT_VERSION}");
     644             336 :     info!("build_tag: {BUILD_TAG}");
     645             336 :     metrics::set_build_info_metric(GIT_VERSION, BUILD_TAG);
     646             336 : 
     647             336 :     // On any shutdown signal, log receipt of it and exit.
     648             336 :     std::thread::spawn(move || {
     649             336 :         ShutdownSignals::handle(|signal| {
     650             334 :             info!("received {}, terminating", signal.name());
     651             334 :             std::process::exit(0);
     652             336 :         })
     653             336 :     });
     654             336 : 
     655             336 :     let registry = Registry {
     656             336 :         shared_state: Arc::new(RwLock::new(SharedState::new(args.all_keys_chan_size))),
     657             336 :         timeline_chan_size: args.timeline_chan_size,
     658             336 :     };
     659             336 :     let storage_broker_impl = Broker {
     660             336 :         registry: registry.clone(),
     661             336 :     };
     662             336 :     let storage_broker_server = BrokerServiceServer::new(storage_broker_impl);
     663             336 : 
     664             336 :     info!("listening on {}", &args.listen_addr);
     665                 : 
     666                 :     // grpc is served along with http1 for metrics on a single port, hence we
     667                 :     // don't use tonic's Server.
     668             336 :     hyper::Server::bind(&args.listen_addr)
     669             336 :         .http2_keep_alive_interval(Some(args.http2_keepalive_interval))
     670            1485 :         .serve(make_service_fn(move |conn: &AddrStream| {
     671            1485 :             let storage_broker_server_cloned = storage_broker_server.clone();
     672            1485 :             let connect_info = conn.connect_info();
     673            1485 :             async move {
     674            2083 :                 Ok::<_, Infallible>(service_fn(move |mut req| {
     675            2083 :                     // That's what tonic's MakeSvc.call does to pass conninfo to
     676            2083 :                     // the request handler (and where its request.remote_addr()
     677            2083 :                     // expects to find it).
     678            2083 :                     req.extensions_mut().insert(connect_info.clone());
     679            2083 : 
     680            2083 :                     // Technically this second clone is not needed, but having the
     681            2083 :                     // async block consume the value is apparently unavoidable. BTW,
     682            2083 :                     // the error message is enigmatic, see
     683            2083 :                     // https://github.com/rust-lang/rust/issues/68119
     684            2083 :                     //
     685            2083 :                     // We could get away without the async block at all, but then we
     686            2083 :                     // would need to resort to futures::Either to merge the results,
     687            2083 :                     // which is not easy on the eye either.
     688            2083 :                     let mut storage_broker_server_svc = storage_broker_server_cloned.clone();
     689            2083 :                     async move {
     690            2083 :                         if req.headers().get("content-type").map(|x| x.as_bytes())
     691            2083 :                             == Some(b"application/grpc")
     692                 :                         {
     693            7094 :                             let res_resp = storage_broker_server_svc.call(req).await;
     694                 :                             // Grpc and http1 handlers have slightly different
     695                 :                             // Response types: it is UnsyncBoxBody for the
     696                 :                             // former one (not sure why) and plain hyper::Body
     697                 :                             // for the latter. Both implement HttpBody though,
     698                 :                             // and EitherBody is used to merge them.
     699            2080 :                             res_resp.map(|resp| resp.map(EitherBody::Left))
     700                 :                         } else {
     701               3 :                             let res_resp = http1_handler(req).await;
     702               3 :                             res_resp.map(|resp| resp.map(EitherBody::Right))
     703                 :                         }
     704            2083 :                     }
     705            2083 :                 }))
     706            1485 :             }
     707            1485 :         }))
     708            1033 :         .await?;
     709 UBC           0 :     Ok(())
     710                 : }
     711                 : 
     712                 : #[cfg(test)]
     713                 : mod tests {
     714                 :     use super::*;
     715                 :     use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
     716                 :     use tokio::sync::broadcast::error::TryRecvError;
     717                 :     use utils::id::{TenantId, TimelineId};
     718                 : 
     719 CBC           2 :     fn msg(timeline_id: Vec<u8>) -> Message {
     720               2 :         Message::SafekeeperTimelineInfo(SafekeeperTimelineInfo {
     721               2 :             safekeeper_id: 1,
     722               2 :             tenant_timeline_id: Some(ProtoTenantTimelineId {
     723               2 :                 tenant_id: vec![0x00; 16],
     724               2 :                 timeline_id,
     725               2 :             }),
     726               2 :             term: 0,
     727               2 :             last_log_term: 0,
     728               2 :             flush_lsn: 1,
     729               2 :             commit_lsn: 2,
     730               2 :             backup_lsn: 3,
     731               2 :             remote_consistent_lsn: 4,
     732               2 :             peer_horizon_lsn: 5,
     733               2 :             safekeeper_connstr: "neon-1-sk-1.local:7676".to_owned(),
     734               2 :             http_connstr: "neon-1-sk-1.local:7677".to_owned(),
     735               2 :             local_start_lsn: 0,
     736               2 :             availability_zone: None,
     737               2 :         })
     738               2 :     }
     739                 : 
     740               3 :     fn tli_from_u64(i: u64) -> Vec<u8> {
     741               3 :         let mut timeline_id = vec![0xFF; 8];
     742               3 :         timeline_id.extend_from_slice(&i.to_be_bytes());
     743               3 :         timeline_id
     744               3 :     }
     745                 : 
     746               3 :     fn mock_addr() -> SocketAddr {
     747               3 :         "127.0.0.1:8080".parse().unwrap()
     748               3 :     }
     749                 : 
     750               1 :     #[tokio::test]
     751               1 :     async fn test_registry() {
     752               1 :         let registry = Registry {
     753               1 :             shared_state: Arc::new(RwLock::new(SharedState::new(16))),
     754               1 :             timeline_chan_size: 16,
     755               1 :         };
     756               1 : 
     757               1 :         // subscribe to timeline 2
     758               1 :         let ttid_2 = TenantTimelineId {
     759               1 :             tenant_id: TenantId::from_slice(&[0x00; 16]).unwrap(),
     760               1 :             timeline_id: TimelineId::from_slice(&tli_from_u64(2)).unwrap(),
     761               1 :         };
     762               1 :         let sub_key_2 = SubscriptionKey::Timeline(ttid_2);
     763               1 :         let mut subscriber_2 = registry.register_subscriber(sub_key_2, mock_addr());
     764               1 :         let mut subscriber_all = registry.register_subscriber(SubscriptionKey::All, mock_addr());
     765               1 : 
     766               1 :         // send two messages with different keys
     767               1 :         let msg_1 = msg(tli_from_u64(1));
     768               1 :         let msg_2 = msg(tli_from_u64(2));
     769               1 :         let mut publisher = registry.register_publisher(mock_addr());
     770               1 :         publisher.send_msg(&msg_1).expect("failed to send msg");
     771               1 :         publisher.send_msg(&msg_2).expect("failed to send msg");
     772               1 : 
     773               1 :         // msg with key 2 should arrive to subscriber_2
     774               1 :         assert_eq!(subscriber_2.sub_rx.try_recv().unwrap(), msg_2);
     775                 : 
     776                 :         // but nothing more
     777               1 :         assert_eq!(
     778               1 :             subscriber_2.sub_rx.try_recv().unwrap_err(),
     779               1 :             TryRecvError::Empty
     780               1 :         );
     781                 : 
     782                 :         // subscriber_all should receive both messages
     783               1 :         assert_eq!(subscriber_all.sub_rx.try_recv().unwrap(), msg_1);
     784               1 :         assert_eq!(subscriber_all.sub_rx.try_recv().unwrap(), msg_2);
     785               1 :         assert_eq!(
     786               1 :             subscriber_all.sub_rx.try_recv().unwrap_err(),
     787               1 :             TryRecvError::Empty
     788               1 :         );
     789                 :     }
     790                 : }
        

Generated by: LCOV version 2.1-beta