LCOV - code coverage report
Current view: top level - storage_broker/src/bin - storage_broker.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 91.5 % 377 345
Test Date: 2023-09-06 10:18:01 Functions: 69.0 % 71 49

            Line data    Source code
       1              : //! Simple pub-sub based on grpc (tonic) and Tokio broadcast channel for storage
       2              : //! nodes messaging.
       3              : //!
       4              : //! Subscriptions to 1) single timeline 2) all timelines are possible. We could
       5              : //! add subscription to the set of timelines to save grpc streams, but testing
       6              : //! shows many individual streams is also ok.
       7              : //!
       8              : //! Message is dropped if subscriber can't consume it, not affecting other
       9              : //! subscribers.
      10              : //!
      11              : //! Only safekeeper message is supported, but it is not hard to add something
      12              : //! else with generics.
      13              : use clap::{command, Parser};
      14              : use futures_core::Stream;
      15              : use futures_util::StreamExt;
      16              : use hyper::header::CONTENT_TYPE;
      17              : use hyper::server::conn::AddrStream;
      18              : use hyper::service::{make_service_fn, service_fn};
      19              : use hyper::{Body, Method, StatusCode};
      20              : use parking_lot::RwLock;
      21              : use std::collections::HashMap;
      22              : use std::convert::Infallible;
      23              : use std::net::SocketAddr;
      24              : use std::pin::Pin;
      25              : use std::sync::Arc;
      26              : use std::time::Duration;
      27              : use tokio::sync::broadcast;
      28              : use tokio::sync::broadcast::error::RecvError;
      29              : use tokio::time;
      30              : use tonic::codegen::Service;
      31              : use tonic::transport::server::Connected;
      32              : use tonic::Code;
      33              : use tonic::{Request, Response, Status};
      34              : use tracing::*;
      35              : use utils::signals::ShutdownSignals;
      36              : 
      37              : use metrics::{Encoder, TextEncoder};
      38              : use storage_broker::metrics::{NUM_PUBS, NUM_SUBS_ALL, NUM_SUBS_TIMELINE};
      39              : use storage_broker::proto::broker_service_server::{BrokerService, BrokerServiceServer};
      40              : use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey as ProtoSubscriptionKey;
      41              : use storage_broker::proto::{SafekeeperTimelineInfo, SubscribeSafekeeperInfoRequest};
      42              : use storage_broker::{
      43              :     parse_proto_ttid, EitherBody, DEFAULT_KEEPALIVE_INTERVAL, DEFAULT_LISTEN_ADDR,
      44              : };
      45              : use utils::id::TenantTimelineId;
      46              : use utils::logging::{self, LogFormat};
      47              : use utils::project_git_version;
      48              : use utils::sentry_init::init_sentry;
      49              : 
      50              : project_git_version!(GIT_VERSION);
      51              : 
      52              : const DEFAULT_CHAN_SIZE: usize = 32;
      53              : const DEFAULT_ALL_KEYS_CHAN_SIZE: usize = 16384;
      54              : 
      55          744 : #[derive(Parser, Debug)]
      56              : #[command(version = GIT_VERSION, about = "Broker for neon storage nodes communication", long_about = None)]
      57              : struct Args {
      58              :     /// Endpoint to listen on.
      59              :     #[arg(short, long, default_value = DEFAULT_LISTEN_ADDR)]
      60            0 :     listen_addr: SocketAddr,
      61              :     /// Size of the queue to the per timeline subscriber.
      62          372 :     #[arg(long, default_value_t = DEFAULT_CHAN_SIZE)]
      63            0 :     timeline_chan_size: usize,
      64            0 :     /// Size of the queue to the all keys subscriber.
      65          372 :     #[arg(long, default_value_t = DEFAULT_ALL_KEYS_CHAN_SIZE)]
      66            0 :     all_keys_chan_size: usize,
      67            0 :     /// HTTP/2 keepalive interval.
      68              :     #[arg(long, value_parser= humantime::parse_duration, default_value = DEFAULT_KEEPALIVE_INTERVAL)]
      69            0 :     http2_keepalive_interval: Duration,
      70              :     /// Format for logging, either 'plain' or 'json'.
      71              :     #[arg(long, default_value = "plain")]
      72            0 :     log_format: String,
      73              : }
      74              : 
      75              : type PubId = u64; // id of publisher for registering in maps
      76              : type SubId = u64; // id of subscriber for registering in maps
      77              : 
      78         3430 : #[derive(Copy, Clone, Debug)]
      79              : enum SubscriptionKey {
      80              :     All,
      81              :     Timeline(TenantTimelineId),
      82              : }
      83              : 
      84              : impl SubscriptionKey {
      85              :     // Parse protobuf subkey (protobuf doesn't have fixed size bytes, we get vectors).
      86         1715 :     pub fn from_proto_subscription_key(key: ProtoSubscriptionKey) -> Result<Self, Status> {
      87         1715 :         match key {
      88          517 :             ProtoSubscriptionKey::All(_) => Ok(SubscriptionKey::All),
      89         1198 :             ProtoSubscriptionKey::TenantTimelineId(proto_ttid) => {
      90         1198 :                 Ok(SubscriptionKey::Timeline(parse_proto_ttid(&proto_ttid)?))
      91              :             }
      92              :         }
      93         1715 :     }
      94              : }
      95              : 
      96              : // Channel to timeline subscribers.
      97              : struct ChanToTimelineSub {
      98              :     chan: broadcast::Sender<SafekeeperTimelineInfo>,
      99              :     // Tracked separately to know when delete the shmem entry. receiver_count()
     100              :     // is unhandy for that as unregistering and dropping the receiver side
     101              :     // happens at different moments.
     102              :     num_subscribers: u64,
     103              : }
     104              : 
     105              : struct SharedState {
     106              :     next_pub_id: PubId,
     107              :     num_pubs: i64,
     108              :     next_sub_id: SubId,
     109              :     num_subs_to_timelines: i64,
     110              :     chans_to_timeline_subs: HashMap<TenantTimelineId, ChanToTimelineSub>,
     111              :     num_subs_to_all: i64,
     112              :     chan_to_all_subs: broadcast::Sender<SafekeeperTimelineInfo>,
     113              : }
     114              : 
     115              : impl SharedState {
     116          373 :     pub fn new(all_keys_chan_size: usize) -> Self {
     117          373 :         SharedState {
     118          373 :             next_pub_id: 0,
     119          373 :             num_pubs: 0,
     120          373 :             next_sub_id: 0,
     121          373 :             num_subs_to_timelines: 0,
     122          373 :             chans_to_timeline_subs: HashMap::new(),
     123          373 :             num_subs_to_all: 0,
     124          373 :             chan_to_all_subs: broadcast::channel(all_keys_chan_size).0,
     125          373 :         }
     126          373 :     }
     127              : 
     128              :     // Register new publisher.
     129          518 :     pub fn register_publisher(&mut self) -> PubId {
     130          518 :         let pub_id = self.next_pub_id;
     131          518 :         self.next_pub_id += 1;
     132          518 :         self.num_pubs += 1;
     133          518 :         NUM_PUBS.set(self.num_pubs);
     134          518 :         pub_id
     135          518 :     }
     136              : 
     137              :     // Unregister publisher.
     138          518 :     pub fn unregister_publisher(&mut self) {
     139          518 :         self.num_pubs -= 1;
     140          518 :         NUM_PUBS.set(self.num_pubs);
     141          518 :     }
     142              : 
     143              :     // Register new subscriber.
     144         1717 :     pub fn register_subscriber(
     145         1717 :         &mut self,
     146         1717 :         sub_key: SubscriptionKey,
     147         1717 :         timeline_chan_size: usize,
     148         1717 :     ) -> (SubId, broadcast::Receiver<SafekeeperTimelineInfo>) {
     149         1717 :         let sub_id = self.next_sub_id;
     150         1717 :         self.next_sub_id += 1;
     151         1717 :         let sub_rx = match sub_key {
     152              :             SubscriptionKey::All => {
     153          518 :                 self.num_subs_to_all += 1;
     154          518 :                 NUM_SUBS_ALL.set(self.num_subs_to_all);
     155          518 :                 self.chan_to_all_subs.subscribe()
     156              :             }
     157         1199 :             SubscriptionKey::Timeline(ttid) => {
     158         1199 :                 self.num_subs_to_timelines += 1;
     159         1199 :                 NUM_SUBS_TIMELINE.set(self.num_subs_to_timelines);
     160         1199 :                 // Create new broadcast channel for this key, or subscriber to
     161         1199 :                 // the existing one.
     162         1199 :                 let chan_to_timeline_sub =
     163         1199 :                     self.chans_to_timeline_subs
     164         1199 :                         .entry(ttid)
     165         1199 :                         .or_insert(ChanToTimelineSub {
     166         1199 :                             chan: broadcast::channel(timeline_chan_size).0,
     167         1199 :                             num_subscribers: 0,
     168         1199 :                         });
     169         1199 :                 chan_to_timeline_sub.num_subscribers += 1;
     170         1199 :                 chan_to_timeline_sub.chan.subscribe()
     171              :             }
     172              :         };
     173         1717 :         (sub_id, sub_rx)
     174         1717 :     }
     175              : 
     176              :     // Unregister the subscriber.
     177         1717 :     pub fn unregister_subscriber(&mut self, sub_key: SubscriptionKey) {
     178         1717 :         match sub_key {
     179          518 :             SubscriptionKey::All => {
     180          518 :                 self.num_subs_to_all -= 1;
     181          518 :                 NUM_SUBS_ALL.set(self.num_subs_to_all);
     182          518 :             }
     183         1199 :             SubscriptionKey::Timeline(ttid) => {
     184         1199 :                 self.num_subs_to_timelines -= 1;
     185         1199 :                 NUM_SUBS_TIMELINE.set(self.num_subs_to_timelines);
     186         1199 : 
     187         1199 :                 // Remove from the map, destroying the channel, if we are the
     188         1199 :                 // last subscriber to this timeline.
     189         1199 : 
     190         1199 :                 // Missing entry is a bug; we must have registered.
     191         1199 :                 let chan_to_timeline_sub = self
     192         1199 :                     .chans_to_timeline_subs
     193         1199 :                     .get_mut(&ttid)
     194         1199 :                     .expect("failed to find sub entry in shmem during unregister");
     195         1199 :                 chan_to_timeline_sub.num_subscribers -= 1;
     196         1199 :                 if chan_to_timeline_sub.num_subscribers == 0 {
     197         1187 :                     self.chans_to_timeline_subs.remove(&ttid);
     198         1187 :                 }
     199              :             }
     200              :         }
     201         1717 :     }
     202              : }
     203              : 
     204              : // SharedState wrapper.
     205         2607 : #[derive(Clone)]
     206              : struct Registry {
     207              :     shared_state: Arc<RwLock<SharedState>>,
     208              :     timeline_chan_size: usize,
     209              : }
     210              : 
     211              : impl Registry {
     212              :     // Register new publisher in shared state.
     213          518 :     pub fn register_publisher(&self, remote_addr: SocketAddr) -> Publisher {
     214          518 :         let pub_id = self.shared_state.write().register_publisher();
     215          518 :         info!("publication started id={} addr={:?}", pub_id, remote_addr);
     216          518 :         Publisher {
     217          518 :             id: pub_id,
     218          518 :             registry: self.clone(),
     219          518 :             remote_addr,
     220          518 :         }
     221          518 :     }
     222              : 
     223          518 :     pub fn unregister_publisher(&self, publisher: &Publisher) {
     224          518 :         self.shared_state.write().unregister_publisher();
     225          518 :         info!(
     226          517 :             "publication ended id={} addr={:?}",
     227          517 :             publisher.id, publisher.remote_addr
     228          517 :         );
     229          518 :     }
     230              : 
     231              :     // Register new subscriber in shared state.
     232         1717 :     pub fn register_subscriber(
     233         1717 :         &self,
     234         1717 :         sub_key: SubscriptionKey,
     235         1717 :         remote_addr: SocketAddr,
     236         1717 :     ) -> Subscriber {
     237         1717 :         let (sub_id, sub_rx) = self
     238         1717 :             .shared_state
     239         1717 :             .write()
     240         1717 :             .register_subscriber(sub_key, self.timeline_chan_size);
     241         1717 :         info!(
     242         1715 :             "subscription started id={}, key={:?}, addr={:?}",
     243         1715 :             sub_id, sub_key, remote_addr
     244         1715 :         );
     245         1717 :         Subscriber {
     246         1717 :             id: sub_id,
     247         1717 :             key: sub_key,
     248         1717 :             sub_rx,
     249         1717 :             registry: self.clone(),
     250         1717 :             remote_addr,
     251         1717 :         }
     252         1717 :     }
     253              : 
     254              :     // Unregister the subscriber
     255         1717 :     pub fn unregister_subscriber(&self, subscriber: &Subscriber) {
     256         1717 :         self.shared_state
     257         1717 :             .write()
     258         1717 :             .unregister_subscriber(subscriber.key);
     259         1717 :         info!(
     260         1715 :             "subscription ended id={}, key={:?}, addr={:?}",
     261         1715 :             subscriber.id, subscriber.key, subscriber.remote_addr
     262         1715 :         );
     263         1717 :     }
     264              : }
     265              : 
     266              : // Private subscriber state.
     267              : struct Subscriber {
     268              :     id: SubId,
     269              :     key: SubscriptionKey,
     270              :     // Subscriber receives messages from publishers here.
     271              :     sub_rx: broadcast::Receiver<SafekeeperTimelineInfo>,
     272              :     // to unregister itself from shared state in Drop
     273              :     registry: Registry,
     274              :     // for logging
     275              :     remote_addr: SocketAddr,
     276              : }
     277              : 
     278              : impl Drop for Subscriber {
     279         1717 :     fn drop(&mut self) {
     280         1717 :         self.registry.unregister_subscriber(self);
     281         1717 :     }
     282              : }
     283              : 
     284              : // Private publisher state
     285              : struct Publisher {
     286              :     id: PubId,
     287              :     registry: Registry,
     288              :     // for logging
     289              :     remote_addr: SocketAddr,
     290              : }
     291              : 
     292              : impl Publisher {
     293              :     // Send msg to relevant subscribers.
     294         6310 :     pub fn send_msg(&mut self, msg: &SafekeeperTimelineInfo) -> Result<(), Status> {
     295         6310 :         // send message to subscribers for everything
     296         6310 :         let shared_state = self.registry.shared_state.read();
     297         6310 :         // Err means there is no subscribers, it is fine.
     298         6310 :         shared_state.chan_to_all_subs.send(msg.clone()).ok();
     299              : 
     300              :         // send message to per timeline subscribers
     301         6310 :         let ttid =
     302         6310 :             parse_proto_ttid(msg.tenant_timeline_id.as_ref().ok_or_else(|| {
     303            0 :                 Status::new(Code::InvalidArgument, "missing tenant_timeline_id")
     304         6310 :             })?)?;
     305         6310 :         if let Some(subs) = shared_state.chans_to_timeline_subs.get(&ttid) {
     306         5720 :             // Err can't happen here, as tx is destroyed only after removing
     307         5720 :             // from the map the last subscriber along with tx.
     308         5720 :             subs.chan
     309         5720 :                 .send(msg.clone())
     310         5720 :                 .expect("rx is still in the map with zero subscribers");
     311         5720 :         }
     312         6310 :         Ok(())
     313         6310 :     }
     314              : }
     315              : 
     316              : impl Drop for Publisher {
     317          518 :     fn drop(&mut self) {
     318          518 :         self.registry.unregister_publisher(self);
     319          518 :     }
     320              : }
     321              : 
     322              : struct Broker {
     323              :     registry: Registry,
     324              : }
     325              : 
     326              : #[tonic::async_trait]
     327              : impl BrokerService for Broker {
     328          517 :     async fn publish_safekeeper_info(
     329          517 :         &self,
     330          517 :         request: Request<tonic::Streaming<SafekeeperTimelineInfo>>,
     331          517 :     ) -> Result<Response<()>, Status> {
     332          517 :         let remote_addr = request
     333          517 :             .remote_addr()
     334          517 :             .expect("TCPConnectInfo inserted by handler");
     335          517 :         let mut publisher = self.registry.register_publisher(remote_addr);
     336          517 : 
     337          517 :         let mut stream = request.into_inner();
     338              : 
     339              :         loop {
     340         6825 :             match stream.next().await {
     341         6308 :                 Some(Ok(msg)) => publisher.send_msg(&msg)?,
     342          517 :                 Some(Err(e)) => return Err(e), // grpc error from the stream
     343            0 :                 None => break,                 // closed stream
     344            0 :             }
     345            0 :         }
     346            0 : 
     347            0 :         Ok(Response::new(()))
     348         1034 :     }
     349              : 
     350              :     type SubscribeSafekeeperInfoStream =
     351              :         Pin<Box<dyn Stream<Item = Result<SafekeeperTimelineInfo, Status>> + Send + 'static>>;
     352              : 
     353         1715 :     async fn subscribe_safekeeper_info(
     354         1715 :         &self,
     355         1715 :         request: Request<SubscribeSafekeeperInfoRequest>,
     356         1715 :     ) -> Result<Response<Self::SubscribeSafekeeperInfoStream>, Status> {
     357         1715 :         let remote_addr = request
     358         1715 :             .remote_addr()
     359         1715 :             .expect("TCPConnectInfo inserted by handler");
     360         1715 :         let proto_key = request
     361         1715 :             .into_inner()
     362         1715 :             .subscription_key
     363         1715 :             .ok_or_else(|| Status::new(Code::InvalidArgument, "missing subscription key"))?;
     364         1715 :         let sub_key = SubscriptionKey::from_proto_subscription_key(proto_key)?;
     365         1715 :         let mut subscriber = self.registry.register_subscriber(sub_key, remote_addr);
     366         1715 : 
     367         1715 :         // transform rx into stream with item = Result, as method result demands
     368         1715 :         let output = async_stream::try_stream! {
     369         1715 :             let mut warn_interval = time::interval(Duration::from_millis(1000));
     370         1715 :             let mut missed_msgs: u64 = 0;
     371         1715 :             loop {
     372        26531 :                 match subscriber.sub_rx.recv().await {
     373        14829 :                     Ok(info) => yield info,
     374         1715 :                     Err(RecvError::Lagged(skipped_msg)) => {
     375            0 :                         missed_msgs += skipped_msg;
     376            0 :                         if (futures::poll!(Box::pin(warn_interval.tick()))).is_ready() {
     377         1715 :                             warn!("subscription id={}, key={:?} addr={:?} dropped {} messages, channel is full",
     378            0 :                                 subscriber.id, subscriber.key, subscriber.remote_addr, missed_msgs);
     379         1715 :                             missed_msgs = 0;
     380         1715 :                         }
     381         1715 :                     }
     382         1715 :                     Err(RecvError::Closed) => {
     383         1715 :                         // can't happen, we never drop the channel while there is a subscriber
     384         1715 :                         Err(Status::new(Code::Internal, "channel unexpectantly closed"))?;
     385         1715 :                     }
     386         1715 :                 }
     387         1715 :             }
     388         1715 :         };
     389         1715 : 
     390         1715 :         Ok(Response::new(
     391         1715 :             Box::pin(output) as Self::SubscribeSafekeeperInfoStream
     392         1715 :         ))
     393         3430 :     }
     394              : }
     395              : 
     396              : // We serve only metrics and healthcheck through http1.
     397            4 : async fn http1_handler(
     398            4 :     req: hyper::Request<hyper::body::Body>,
     399            4 : ) -> Result<hyper::Response<Body>, Infallible> {
     400            4 :     let resp = match (req.method(), req.uri().path()) {
     401            4 :         (&Method::GET, "/metrics") => {
     402            0 :             let mut buffer = vec![];
     403            0 :             let metrics = metrics::gather();
     404            0 :             let encoder = TextEncoder::new();
     405            0 :             encoder.encode(&metrics, &mut buffer).unwrap();
     406            0 : 
     407            0 :             hyper::Response::builder()
     408            0 :                 .status(StatusCode::OK)
     409            0 :                 .header(CONTENT_TYPE, encoder.format_type())
     410            0 :                 .body(Body::from(buffer))
     411            0 :                 .unwrap()
     412              :         }
     413            4 :         (&Method::GET, "/status") => hyper::Response::builder()
     414            4 :             .status(StatusCode::OK)
     415            4 :             .body(Body::empty())
     416            4 :             .unwrap(),
     417            0 :         _ => hyper::Response::builder()
     418            0 :             .status(StatusCode::NOT_FOUND)
     419            0 :             .body(Body::empty())
     420            0 :             .unwrap(),
     421              :     };
     422            4 :     Ok(resp)
     423            4 : }
     424              : 
     425              : #[tokio::main]
     426          372 : async fn main() -> Result<(), Box<dyn std::error::Error>> {
     427          372 :     let args = Args::parse();
     428          372 : 
     429          372 :     // important to keep the order of:
     430          372 :     // 1. init logging
     431          372 :     // 2. tracing panic hook
     432          372 :     // 3. sentry
     433          372 :     logging::init(
     434          372 :         LogFormat::from_config(&args.log_format)?,
     435          372 :         logging::TracingErrorLayerEnablement::Disabled,
     436            0 :     )?;
     437          372 :     logging::replace_panic_hook_with_tracing_panic_hook().forget();
     438          372 :     // initialize sentry if SENTRY_DSN is provided
     439          372 :     let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
     440          372 :     info!("version: {GIT_VERSION}");
     441          372 :     ::metrics::set_build_info_metric(GIT_VERSION);
     442          372 : 
     443          372 :     // On any shutdown signal, log receival and exit.
     444          372 :     std::thread::spawn(move || {
     445          372 :         ShutdownSignals::handle(|signal| {
     446          371 :             info!("received {}, terminating", signal.name());
     447          371 :             std::process::exit(0);
     448          372 :         })
     449          372 :     });
     450          372 : 
     451          372 :     let registry = Registry {
     452          372 :         shared_state: Arc::new(RwLock::new(SharedState::new(args.all_keys_chan_size))),
     453          372 :         timeline_chan_size: args.timeline_chan_size,
     454          372 :     };
     455          372 :     let storage_broker_impl = Broker {
     456          372 :         registry: registry.clone(),
     457          372 :     };
     458          372 :     let storage_broker_server = BrokerServiceServer::new(storage_broker_impl);
     459          372 : 
     460          372 :     info!("listening on {}", &args.listen_addr);
     461              : 
     462              :     // grpc is served along with http1 for metrics on a single port, hence we
     463              :     // don't use tonic's Server.
     464          372 :     hyper::Server::bind(&args.listen_addr)
     465          372 :         .http2_keep_alive_interval(Some(args.http2_keepalive_interval))
     466         1576 :         .serve(make_service_fn(move |conn: &AddrStream| {
     467         1576 :             let storage_broker_server_cloned = storage_broker_server.clone();
     468         1576 :             let connect_info = conn.connect_info();
     469         1576 :             async move {
     470         2237 :                 Ok::<_, Infallible>(service_fn(move |mut req| {
     471         2237 :                     // That's what tonic's MakeSvc.call does to pass conninfo to
     472         2237 :                     // the request handler (and where its request.remote_addr()
     473         2237 :                     // expects it to find).
     474         2237 :                     req.extensions_mut().insert(connect_info.clone());
     475         2237 : 
     476         2237 :                     // Technically this second clone is not needed, but consume
     477         2237 :                     // by async block is apparently unavoidable. BTW, error
     478         2237 :                     // message is enigmatic, see
     479         2237 :                     // https://github.com/rust-lang/rust/issues/68119
     480         2237 :                     //
     481         2237 :                     // We could get away without async block at all, but then we
     482         2237 :                     // need to resort to futures::Either to merge the result,
     483         2237 :                     // which doesn't caress an eye as well.
     484         2237 :                     let mut storage_broker_server_svc = storage_broker_server_cloned.clone();
     485         2237 :                     async move {
     486         2237 :                         if req.headers().get("content-type").map(|x| x.as_bytes())
     487         2237 :                             == Some(b"application/grpc")
     488              :                         {
     489         5446 :                             let res_resp = storage_broker_server_svc.call(req).await;
     490              :                             // Grpc and http1 handlers have slightly different
     491              :                             // Response types: it is UnsyncBoxBody for the
     492              :                             // former one (not sure why) and plain hyper::Body
     493              :                             // for the latter. Both implement HttpBody though,
     494              :                             // and EitherBody is used to merge them.
     495         2233 :                             res_resp.map(|resp| resp.map(EitherBody::Left))
     496              :                         } else {
     497            4 :                             let res_resp = http1_handler(req).await;
     498            4 :                             res_resp.map(|resp| resp.map(EitherBody::Right))
     499              :                         }
     500         2237 :                     }
     501         2237 :                 }))
     502         1576 :             }
     503         1576 :         }))
     504         1089 :         .await?;
     505            0 :     Ok(())
     506              : }
     507              : 
     508              : #[cfg(test)]
     509              : mod tests {
     510              :     use super::*;
     511              :     use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
     512              :     use tokio::sync::broadcast::error::TryRecvError;
     513              :     use utils::id::{TenantId, TimelineId};
     514              : 
     515            2 :     fn msg(timeline_id: Vec<u8>) -> SafekeeperTimelineInfo {
     516            2 :         SafekeeperTimelineInfo {
     517            2 :             safekeeper_id: 1,
     518            2 :             tenant_timeline_id: Some(ProtoTenantTimelineId {
     519            2 :                 tenant_id: vec![0x00; 16],
     520            2 :                 timeline_id,
     521            2 :             }),
     522            2 :             term: 0,
     523            2 :             last_log_term: 0,
     524            2 :             flush_lsn: 1,
     525            2 :             commit_lsn: 2,
     526            2 :             backup_lsn: 3,
     527            2 :             remote_consistent_lsn: 4,
     528            2 :             peer_horizon_lsn: 5,
     529            2 :             safekeeper_connstr: "neon-1-sk-1.local:7676".to_owned(),
     530            2 :             http_connstr: "neon-1-sk-1.local:7677".to_owned(),
     531            2 :             local_start_lsn: 0,
     532            2 :             availability_zone: None,
     533            2 :         }
     534            2 :     }
     535              : 
     536            3 :     fn tli_from_u64(i: u64) -> Vec<u8> {
     537            3 :         let mut timeline_id = vec![0xFF; 8];
     538            3 :         timeline_id.extend_from_slice(&i.to_be_bytes());
     539            3 :         timeline_id
     540            3 :     }
     541              : 
     542            3 :     fn mock_addr() -> SocketAddr {
     543            3 :         "127.0.0.1:8080".parse().unwrap()
     544            3 :     }
     545              : 
     546            1 :     #[tokio::test]
     547            1 :     async fn test_registry() {
     548            1 :         let registry = Registry {
     549            1 :             shared_state: Arc::new(RwLock::new(SharedState::new(16))),
     550            1 :             timeline_chan_size: 16,
     551            1 :         };
     552            1 : 
     553            1 :         // subscribe to timeline 2
     554            1 :         let ttid_2 = TenantTimelineId {
     555            1 :             tenant_id: TenantId::from_slice(&[0x00; 16]).unwrap(),
     556            1 :             timeline_id: TimelineId::from_slice(&tli_from_u64(2)).unwrap(),
     557            1 :         };
     558            1 :         let sub_key_2 = SubscriptionKey::Timeline(ttid_2);
     559            1 :         let mut subscriber_2 = registry.register_subscriber(sub_key_2, mock_addr());
     560            1 :         let mut subscriber_all = registry.register_subscriber(SubscriptionKey::All, mock_addr());
     561            1 : 
     562            1 :         // send two messages with different keys
     563            1 :         let msg_1 = msg(tli_from_u64(1));
     564            1 :         let msg_2 = msg(tli_from_u64(2));
     565            1 :         let mut publisher = registry.register_publisher(mock_addr());
     566            1 :         publisher.send_msg(&msg_1).expect("failed to send msg");
     567            1 :         publisher.send_msg(&msg_2).expect("failed to send msg");
     568            1 : 
     569            1 :         // msg with key 2 should arrive to subscriber_2
     570            1 :         assert_eq!(subscriber_2.sub_rx.try_recv().unwrap(), msg_2);
     571              : 
     572              :         // but nothing more
     573            1 :         assert_eq!(
     574            1 :             subscriber_2.sub_rx.try_recv().unwrap_err(),
     575            1 :             TryRecvError::Empty
     576            1 :         );
     577              : 
     578              :         // subscriber_all should receive both messages
     579            1 :         assert_eq!(subscriber_all.sub_rx.try_recv().unwrap(), msg_1);
     580            1 :         assert_eq!(subscriber_all.sub_rx.try_recv().unwrap(), msg_2);
     581            1 :         assert_eq!(
     582            1 :             subscriber_all.sub_rx.try_recv().unwrap_err(),
     583            1 :             TryRecvError::Empty
     584            1 :         );
     585              :     }
     586              : }
        

Generated by: LCOV version 2.1-beta