TLA Line data Source code
1 : //! Simple pub-sub based on grpc (tonic) and Tokio broadcast channel for storage
2 : //! nodes messaging.
3 : //!
4 : //! Subscriptions to 1) single timeline 2) all timelines are possible. We could
5 : //! add subscription to the set of timelines to save grpc streams, but testing
6 : //! shows many individual streams is also ok.
7 : //!
8 : //! Message is dropped if subscriber can't consume it, not affecting other
9 : //! subscribers.
10 : //!
11 : //! Only safekeeper message is supported, but it is not hard to add something
12 : //! else with generics.
13 : use clap::{command, Parser};
14 : use futures_core::Stream;
15 : use futures_util::StreamExt;
16 : use hyper::header::CONTENT_TYPE;
17 : use hyper::server::conn::AddrStream;
18 : use hyper::service::{make_service_fn, service_fn};
19 : use hyper::{Body, Method, StatusCode};
20 : use parking_lot::RwLock;
21 : use std::collections::HashMap;
22 : use std::convert::Infallible;
23 : use std::net::SocketAddr;
24 : use std::pin::Pin;
25 : use std::sync::Arc;
26 : use std::time::Duration;
27 : use tokio::sync::broadcast;
28 : use tokio::sync::broadcast::error::RecvError;
29 : use tokio::time;
30 : use tonic::codegen::Service;
31 : use tonic::transport::server::Connected;
32 : use tonic::Code;
33 : use tonic::{Request, Response, Status};
34 : use tracing::*;
35 : use utils::signals::ShutdownSignals;
36 :
37 : use metrics::{Encoder, TextEncoder};
38 : use storage_broker::metrics::{NUM_PUBS, NUM_SUBS_ALL, NUM_SUBS_TIMELINE};
39 : use storage_broker::proto::broker_service_server::{BrokerService, BrokerServiceServer};
40 : use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey as ProtoSubscriptionKey;
41 : use storage_broker::proto::{SafekeeperTimelineInfo, SubscribeSafekeeperInfoRequest};
42 : use storage_broker::{
43 : parse_proto_ttid, EitherBody, DEFAULT_KEEPALIVE_INTERVAL, DEFAULT_LISTEN_ADDR,
44 : };
45 : use utils::id::TenantTimelineId;
46 : use utils::logging::{self, LogFormat};
47 : use utils::project_git_version;
48 : use utils::sentry_init::init_sentry;
49 :
50 : project_git_version!(GIT_VERSION);
51 :
52 : const DEFAULT_CHAN_SIZE: usize = 32;
53 : const DEFAULT_ALL_KEYS_CHAN_SIZE: usize = 16384;
54 :
55 CBC 704 : #[derive(Parser, Debug)]
56 : #[command(version = GIT_VERSION, about = "Broker for neon storage nodes communication", long_about = None)]
57 : struct Args {
58 : /// Endpoint to listen on.
59 : #[arg(short, long, default_value = DEFAULT_LISTEN_ADDR)]
60 UBC 0 : listen_addr: SocketAddr,
61 : /// Size of the queue to the per timeline subscriber.
62 CBC 352 : #[arg(long, default_value_t = DEFAULT_CHAN_SIZE)]
63 UBC 0 : timeline_chan_size: usize,
64 0 : /// Size of the queue to the all keys subscriber.
65 CBC 352 : #[arg(long, default_value_t = DEFAULT_ALL_KEYS_CHAN_SIZE)]
66 UBC 0 : all_keys_chan_size: usize,
67 0 : /// HTTP/2 keepalive interval.
68 : #[arg(long, value_parser= humantime::parse_duration, default_value = DEFAULT_KEEPALIVE_INTERVAL)]
69 0 : http2_keepalive_interval: Duration,
70 : /// Format for logging, either 'plain' or 'json'.
71 : #[arg(long, default_value = "plain")]
72 0 : log_format: String,
73 : }
74 :
75 : type PubId = u64; // id of publisher for registering in maps
76 : type SubId = u64; // id of subscriber for registering in maps
77 :
78 CBC 3222 : #[derive(Copy, Clone, Debug)]
79 : enum SubscriptionKey {
80 : All,
81 : Timeline(TenantTimelineId),
82 : }
83 :
84 : impl SubscriptionKey {
85 : // Parse protobuf subkey (protobuf doesn't have fixed size bytes, we get vectors).
86 1611 : pub fn from_proto_subscription_key(key: ProtoSubscriptionKey) -> Result<Self, Status> {
87 1611 : match key {
88 500 : ProtoSubscriptionKey::All(_) => Ok(SubscriptionKey::All),
89 1111 : ProtoSubscriptionKey::TenantTimelineId(proto_ttid) => {
90 1111 : Ok(SubscriptionKey::Timeline(parse_proto_ttid(&proto_ttid)?))
91 : }
92 : }
93 1611 : }
94 : }
95 :
96 : // Channel to timeline subscribers.
97 : struct ChanToTimelineSub {
98 : chan: broadcast::Sender<SafekeeperTimelineInfo>,
99 : // Tracked separately to know when delete the shmem entry. receiver_count()
100 : // is unhandy for that as unregistering and dropping the receiver side
101 : // happens at different moments.
102 : num_subscribers: u64,
103 : }
104 :
105 : struct SharedState {
106 : next_pub_id: PubId,
107 : num_pubs: i64,
108 : next_sub_id: SubId,
109 : num_subs_to_timelines: i64,
110 : chans_to_timeline_subs: HashMap<TenantTimelineId, ChanToTimelineSub>,
111 : num_subs_to_all: i64,
112 : chan_to_all_subs: broadcast::Sender<SafekeeperTimelineInfo>,
113 : }
114 :
115 : impl SharedState {
116 353 : pub fn new(all_keys_chan_size: usize) -> Self {
117 353 : SharedState {
118 353 : next_pub_id: 0,
119 353 : num_pubs: 0,
120 353 : next_sub_id: 0,
121 353 : num_subs_to_timelines: 0,
122 353 : chans_to_timeline_subs: HashMap::new(),
123 353 : num_subs_to_all: 0,
124 353 : chan_to_all_subs: broadcast::channel(all_keys_chan_size).0,
125 353 : }
126 353 : }
127 :
128 : // Register new publisher.
129 501 : pub fn register_publisher(&mut self) -> PubId {
130 501 : let pub_id = self.next_pub_id;
131 501 : self.next_pub_id += 1;
132 501 : self.num_pubs += 1;
133 501 : NUM_PUBS.set(self.num_pubs);
134 501 : pub_id
135 501 : }
136 :
137 : // Unregister publisher.
138 501 : pub fn unregister_publisher(&mut self) {
139 501 : self.num_pubs -= 1;
140 501 : NUM_PUBS.set(self.num_pubs);
141 501 : }
142 :
143 : // Register new subscriber.
144 1613 : pub fn register_subscriber(
145 1613 : &mut self,
146 1613 : sub_key: SubscriptionKey,
147 1613 : timeline_chan_size: usize,
148 1613 : ) -> (SubId, broadcast::Receiver<SafekeeperTimelineInfo>) {
149 1613 : let sub_id = self.next_sub_id;
150 1613 : self.next_sub_id += 1;
151 1613 : let sub_rx = match sub_key {
152 : SubscriptionKey::All => {
153 501 : self.num_subs_to_all += 1;
154 501 : NUM_SUBS_ALL.set(self.num_subs_to_all);
155 501 : self.chan_to_all_subs.subscribe()
156 : }
157 1112 : SubscriptionKey::Timeline(ttid) => {
158 1112 : self.num_subs_to_timelines += 1;
159 1112 : NUM_SUBS_TIMELINE.set(self.num_subs_to_timelines);
160 1112 : // Create new broadcast channel for this key, or subscriber to
161 1112 : // the existing one.
162 1112 : let chan_to_timeline_sub =
163 1112 : self.chans_to_timeline_subs
164 1112 : .entry(ttid)
165 1112 : .or_insert(ChanToTimelineSub {
166 1112 : chan: broadcast::channel(timeline_chan_size).0,
167 1112 : num_subscribers: 0,
168 1112 : });
169 1112 : chan_to_timeline_sub.num_subscribers += 1;
170 1112 : chan_to_timeline_sub.chan.subscribe()
171 : }
172 : };
173 1613 : (sub_id, sub_rx)
174 1613 : }
175 :
176 : // Unregister the subscriber.
177 1613 : pub fn unregister_subscriber(&mut self, sub_key: SubscriptionKey) {
178 1613 : match sub_key {
179 501 : SubscriptionKey::All => {
180 501 : self.num_subs_to_all -= 1;
181 501 : NUM_SUBS_ALL.set(self.num_subs_to_all);
182 501 : }
183 1112 : SubscriptionKey::Timeline(ttid) => {
184 1112 : self.num_subs_to_timelines -= 1;
185 1112 : NUM_SUBS_TIMELINE.set(self.num_subs_to_timelines);
186 1112 :
187 1112 : // Remove from the map, destroying the channel, if we are the
188 1112 : // last subscriber to this timeline.
189 1112 :
190 1112 : // Missing entry is a bug; we must have registered.
191 1112 : let chan_to_timeline_sub = self
192 1112 : .chans_to_timeline_subs
193 1112 : .get_mut(&ttid)
194 1112 : .expect("failed to find sub entry in shmem during unregister");
195 1112 : chan_to_timeline_sub.num_subscribers -= 1;
196 1112 : if chan_to_timeline_sub.num_subscribers == 0 {
197 1100 : self.chans_to_timeline_subs.remove(&ttid);
198 1100 : }
199 : }
200 : }
201 1613 : }
202 : }
203 :
204 : // SharedState wrapper.
205 2466 : #[derive(Clone)]
206 : struct Registry {
207 : shared_state: Arc<RwLock<SharedState>>,
208 : timeline_chan_size: usize,
209 : }
210 :
211 : impl Registry {
212 : // Register new publisher in shared state.
213 501 : pub fn register_publisher(&self, remote_addr: SocketAddr) -> Publisher {
214 501 : let pub_id = self.shared_state.write().register_publisher();
215 501 : info!("publication started id={} addr={:?}", pub_id, remote_addr);
216 501 : Publisher {
217 501 : id: pub_id,
218 501 : registry: self.clone(),
219 501 : remote_addr,
220 501 : }
221 501 : }
222 :
223 501 : pub fn unregister_publisher(&self, publisher: &Publisher) {
224 501 : self.shared_state.write().unregister_publisher();
225 501 : info!(
226 500 : "publication ended id={} addr={:?}",
227 500 : publisher.id, publisher.remote_addr
228 500 : );
229 501 : }
230 :
231 : // Register new subscriber in shared state.
232 1613 : pub fn register_subscriber(
233 1613 : &self,
234 1613 : sub_key: SubscriptionKey,
235 1613 : remote_addr: SocketAddr,
236 1613 : ) -> Subscriber {
237 1613 : let (sub_id, sub_rx) = self
238 1613 : .shared_state
239 1613 : .write()
240 1613 : .register_subscriber(sub_key, self.timeline_chan_size);
241 1613 : info!(
242 1611 : "subscription started id={}, key={:?}, addr={:?}",
243 1611 : sub_id, sub_key, remote_addr
244 1611 : );
245 1613 : Subscriber {
246 1613 : id: sub_id,
247 1613 : key: sub_key,
248 1613 : sub_rx,
249 1613 : registry: self.clone(),
250 1613 : remote_addr,
251 1613 : }
252 1613 : }
253 :
254 : // Unregister the subscriber
255 1613 : pub fn unregister_subscriber(&self, subscriber: &Subscriber) {
256 1613 : self.shared_state
257 1613 : .write()
258 1613 : .unregister_subscriber(subscriber.key);
259 1613 : info!(
260 1611 : "subscription ended id={}, key={:?}, addr={:?}",
261 1611 : subscriber.id, subscriber.key, subscriber.remote_addr
262 1611 : );
263 1613 : }
264 : }
265 :
266 : // Private subscriber state.
267 : struct Subscriber {
268 : id: SubId,
269 : key: SubscriptionKey,
270 : // Subscriber receives messages from publishers here.
271 : sub_rx: broadcast::Receiver<SafekeeperTimelineInfo>,
272 : // to unregister itself from shared state in Drop
273 : registry: Registry,
274 : // for logging
275 : remote_addr: SocketAddr,
276 : }
277 :
278 : impl Drop for Subscriber {
279 1613 : fn drop(&mut self) {
280 1613 : self.registry.unregister_subscriber(self);
281 1613 : }
282 : }
283 :
284 : // Private publisher state
285 : struct Publisher {
286 : id: PubId,
287 : registry: Registry,
288 : // for logging
289 : remote_addr: SocketAddr,
290 : }
291 :
292 : impl Publisher {
293 : // Send msg to relevant subscribers.
294 6678 : pub fn send_msg(&mut self, msg: &SafekeeperTimelineInfo) -> Result<(), Status> {
295 6678 : // send message to subscribers for everything
296 6678 : let shared_state = self.registry.shared_state.read();
297 6678 : // Err means there is no subscribers, it is fine.
298 6678 : shared_state.chan_to_all_subs.send(msg.clone()).ok();
299 :
300 : // send message to per timeline subscribers
301 6678 : let ttid =
302 6678 : parse_proto_ttid(msg.tenant_timeline_id.as_ref().ok_or_else(|| {
303 UBC 0 : Status::new(Code::InvalidArgument, "missing tenant_timeline_id")
304 CBC 6678 : })?)?;
305 6678 : if let Some(subs) = shared_state.chans_to_timeline_subs.get(&ttid) {
306 5805 : // Err can't happen here, as tx is destroyed only after removing
307 5805 : // from the map the last subscriber along with tx.
308 5805 : subs.chan
309 5805 : .send(msg.clone())
310 5805 : .expect("rx is still in the map with zero subscribers");
311 5805 : }
312 6678 : Ok(())
313 6678 : }
314 : }
315 :
316 : impl Drop for Publisher {
317 501 : fn drop(&mut self) {
318 501 : self.registry.unregister_publisher(self);
319 501 : }
320 : }
321 :
322 : struct Broker {
323 : registry: Registry,
324 : }
325 :
326 : #[tonic::async_trait]
327 : impl BrokerService for Broker {
328 500 : async fn publish_safekeeper_info(
329 500 : &self,
330 500 : request: Request<tonic::Streaming<SafekeeperTimelineInfo>>,
331 500 : ) -> Result<Response<()>, Status> {
332 500 : let remote_addr = request
333 500 : .remote_addr()
334 500 : .expect("TCPConnectInfo inserted by handler");
335 500 : let mut publisher = self.registry.register_publisher(remote_addr);
336 500 :
337 500 : let mut stream = request.into_inner();
338 :
339 : loop {
340 7176 : match stream.next().await {
341 6676 : Some(Ok(msg)) => publisher.send_msg(&msg)?,
342 500 : Some(Err(e)) => return Err(e), // grpc error from the stream
343 UBC 0 : None => break, // closed stream
344 0 : }
345 0 : }
346 0 :
347 0 : Ok(Response::new(()))
348 CBC 1000 : }
349 :
350 : type SubscribeSafekeeperInfoStream =
351 : Pin<Box<dyn Stream<Item = Result<SafekeeperTimelineInfo, Status>> + Send + 'static>>;
352 :
353 1611 : async fn subscribe_safekeeper_info(
354 1611 : &self,
355 1611 : request: Request<SubscribeSafekeeperInfoRequest>,
356 1611 : ) -> Result<Response<Self::SubscribeSafekeeperInfoStream>, Status> {
357 1611 : let remote_addr = request
358 1611 : .remote_addr()
359 1611 : .expect("TCPConnectInfo inserted by handler");
360 1611 : let proto_key = request
361 1611 : .into_inner()
362 1611 : .subscription_key
363 1611 : .ok_or_else(|| Status::new(Code::InvalidArgument, "missing subscription key"))?;
364 1611 : let sub_key = SubscriptionKey::from_proto_subscription_key(proto_key)?;
365 1611 : let mut subscriber = self.registry.register_subscriber(sub_key, remote_addr);
366 1611 :
367 1611 : // transform rx into stream with item = Result, as method result demands
368 1611 : let output = async_stream::try_stream! {
369 1611 : let mut warn_interval = time::interval(Duration::from_millis(1000));
370 1611 : let mut missed_msgs: u64 = 0;
371 1611 : loop {
372 28135 : match subscriber.sub_rx.recv().await {
373 15593 : Ok(info) => yield info,
374 1611 : Err(RecvError::Lagged(skipped_msg)) => {
375 UBC 0 : missed_msgs += skipped_msg;
376 0 : if (futures::poll!(Box::pin(warn_interval.tick()))).is_ready() {
377 CBC 1611 : warn!("subscription id={}, key={:?} addr={:?} dropped {} messages, channel is full",
378 UBC 0 : subscriber.id, subscriber.key, subscriber.remote_addr, missed_msgs);
379 CBC 1611 : missed_msgs = 0;
380 1611 : }
381 1611 : }
382 1611 : Err(RecvError::Closed) => {
383 1611 : // can't happen, we never drop the channel while there is a subscriber
384 1611 : Err(Status::new(Code::Internal, "channel unexpectantly closed"))?;
385 1611 : }
386 1611 : }
387 1611 : }
388 1611 : };
389 1611 :
390 1611 : Ok(Response::new(
391 1611 : Box::pin(output) as Self::SubscribeSafekeeperInfoStream
392 1611 : ))
393 3222 : }
394 : }
395 :
396 : // We serve only metrics and healthcheck through http1.
397 5 : async fn http1_handler(
398 5 : req: hyper::Request<hyper::body::Body>,
399 5 : ) -> Result<hyper::Response<Body>, Infallible> {
400 5 : let resp = match (req.method(), req.uri().path()) {
401 5 : (&Method::GET, "/metrics") => {
402 UBC 0 : let mut buffer = vec![];
403 0 : let metrics = metrics::gather();
404 0 : let encoder = TextEncoder::new();
405 0 : encoder.encode(&metrics, &mut buffer).unwrap();
406 0 :
407 0 : hyper::Response::builder()
408 0 : .status(StatusCode::OK)
409 0 : .header(CONTENT_TYPE, encoder.format_type())
410 0 : .body(Body::from(buffer))
411 0 : .unwrap()
412 : }
413 CBC 5 : (&Method::GET, "/status") => hyper::Response::builder()
414 5 : .status(StatusCode::OK)
415 5 : .body(Body::empty())
416 5 : .unwrap(),
417 UBC 0 : _ => hyper::Response::builder()
418 0 : .status(StatusCode::NOT_FOUND)
419 0 : .body(Body::empty())
420 0 : .unwrap(),
421 : };
422 CBC 5 : Ok(resp)
423 5 : }
424 :
425 : #[tokio::main]
426 352 : async fn main() -> Result<(), Box<dyn std::error::Error>> {
427 352 : let args = Args::parse();
428 352 :
429 352 : // important to keep the order of:
430 352 : // 1. init logging
431 352 : // 2. tracing panic hook
432 352 : // 3. sentry
433 352 : logging::init(
434 352 : LogFormat::from_config(&args.log_format)?,
435 352 : logging::TracingErrorLayerEnablement::Disabled,
436 UBC 0 : )?;
437 CBC 352 : logging::replace_panic_hook_with_tracing_panic_hook().forget();
438 352 : // initialize sentry if SENTRY_DSN is provided
439 352 : let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
440 352 : info!("version: {GIT_VERSION}");
441 352 : ::metrics::set_build_info_metric(GIT_VERSION);
442 352 :
443 352 : // On any shutdown signal, log receival and exit.
444 352 : std::thread::spawn(move || {
445 352 : ShutdownSignals::handle(|signal| {
446 350 : info!("received {}, terminating", signal.name());
447 350 : std::process::exit(0);
448 352 : })
449 352 : });
450 352 :
451 352 : let registry = Registry {
452 352 : shared_state: Arc::new(RwLock::new(SharedState::new(args.all_keys_chan_size))),
453 352 : timeline_chan_size: args.timeline_chan_size,
454 352 : };
455 352 : let storage_broker_impl = Broker {
456 352 : registry: registry.clone(),
457 352 : };
458 352 : let storage_broker_server = BrokerServiceServer::new(storage_broker_impl);
459 352 :
460 352 : info!("listening on {}", &args.listen_addr);
461 :
462 : // grpc is served along with http1 for metrics on a single port, hence we
463 : // don't use tonic's Server.
464 352 : hyper::Server::bind(&args.listen_addr)
465 352 : .http2_keep_alive_interval(Some(args.http2_keepalive_interval))
466 1525 : .serve(make_service_fn(move |conn: &AddrStream| {
467 1525 : let storage_broker_server_cloned = storage_broker_server.clone();
468 1525 : let connect_info = conn.connect_info();
469 1525 : async move {
470 2119 : Ok::<_, Infallible>(service_fn(move |mut req| {
471 2119 : // That's what tonic's MakeSvc.call does to pass conninfo to
472 2119 : // the request handler (and where its request.remote_addr()
473 2119 : // expects it to find).
474 2119 : req.extensions_mut().insert(connect_info.clone());
475 2119 :
476 2119 : // Technically this second clone is not needed, but consume
477 2119 : // by async block is apparently unavoidable. BTW, error
478 2119 : // message is enigmatic, see
479 2119 : // https://github.com/rust-lang/rust/issues/68119
480 2119 : //
481 2119 : // We could get away without async block at all, but then we
482 2119 : // need to resort to futures::Either to merge the result,
483 2119 : // which doesn't caress an eye as well.
484 2119 : let mut storage_broker_server_svc = storage_broker_server_cloned.clone();
485 2119 : async move {
486 2119 : if req.headers().get("content-type").map(|x| x.as_bytes())
487 2119 : == Some(b"application/grpc")
488 : {
489 5829 : let res_resp = storage_broker_server_svc.call(req).await;
490 : // Grpc and http1 handlers have slightly different
491 : // Response types: it is UnsyncBoxBody for the
492 : // former one (not sure why) and plain hyper::Body
493 : // for the latter. Both implement HttpBody though,
494 : // and EitherBody is used to merge them.
495 2114 : res_resp.map(|resp| resp.map(EitherBody::Left))
496 : } else {
497 5 : let res_resp = http1_handler(req).await;
498 5 : res_resp.map(|resp| resp.map(EitherBody::Right))
499 : }
500 2119 : }
501 2119 : }))
502 1525 : }
503 1525 : }))
504 1051 : .await?;
505 UBC 0 : Ok(())
506 : }
507 :
508 : #[cfg(test)]
509 : mod tests {
510 : use super::*;
511 : use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
512 : use tokio::sync::broadcast::error::TryRecvError;
513 : use utils::id::{TenantId, TimelineId};
514 :
515 CBC 2 : fn msg(timeline_id: Vec<u8>) -> SafekeeperTimelineInfo {
516 2 : SafekeeperTimelineInfo {
517 2 : safekeeper_id: 1,
518 2 : tenant_timeline_id: Some(ProtoTenantTimelineId {
519 2 : tenant_id: vec![0x00; 16],
520 2 : timeline_id,
521 2 : }),
522 2 : term: 0,
523 2 : last_log_term: 0,
524 2 : flush_lsn: 1,
525 2 : commit_lsn: 2,
526 2 : backup_lsn: 3,
527 2 : remote_consistent_lsn: 4,
528 2 : peer_horizon_lsn: 5,
529 2 : safekeeper_connstr: "neon-1-sk-1.local:7676".to_owned(),
530 2 : http_connstr: "neon-1-sk-1.local:7677".to_owned(),
531 2 : local_start_lsn: 0,
532 2 : availability_zone: None,
533 2 : }
534 2 : }
535 :
536 3 : fn tli_from_u64(i: u64) -> Vec<u8> {
537 3 : let mut timeline_id = vec![0xFF; 8];
538 3 : timeline_id.extend_from_slice(&i.to_be_bytes());
539 3 : timeline_id
540 3 : }
541 :
542 3 : fn mock_addr() -> SocketAddr {
543 3 : "127.0.0.1:8080".parse().unwrap()
544 3 : }
545 :
546 1 : #[tokio::test]
547 1 : async fn test_registry() {
548 1 : let registry = Registry {
549 1 : shared_state: Arc::new(RwLock::new(SharedState::new(16))),
550 1 : timeline_chan_size: 16,
551 1 : };
552 1 :
553 1 : // subscribe to timeline 2
554 1 : let ttid_2 = TenantTimelineId {
555 1 : tenant_id: TenantId::from_slice(&[0x00; 16]).unwrap(),
556 1 : timeline_id: TimelineId::from_slice(&tli_from_u64(2)).unwrap(),
557 1 : };
558 1 : let sub_key_2 = SubscriptionKey::Timeline(ttid_2);
559 1 : let mut subscriber_2 = registry.register_subscriber(sub_key_2, mock_addr());
560 1 : let mut subscriber_all = registry.register_subscriber(SubscriptionKey::All, mock_addr());
561 1 :
562 1 : // send two messages with different keys
563 1 : let msg_1 = msg(tli_from_u64(1));
564 1 : let msg_2 = msg(tli_from_u64(2));
565 1 : let mut publisher = registry.register_publisher(mock_addr());
566 1 : publisher.send_msg(&msg_1).expect("failed to send msg");
567 1 : publisher.send_msg(&msg_2).expect("failed to send msg");
568 1 :
569 1 : // msg with key 2 should arrive to subscriber_2
570 1 : assert_eq!(subscriber_2.sub_rx.try_recv().unwrap(), msg_2);
571 :
572 : // but nothing more
573 1 : assert_eq!(
574 1 : subscriber_2.sub_rx.try_recv().unwrap_err(),
575 1 : TryRecvError::Empty
576 1 : );
577 :
578 : // subscriber_all should receive both messages
579 1 : assert_eq!(subscriber_all.sub_rx.try_recv().unwrap(), msg_1);
580 1 : assert_eq!(subscriber_all.sub_rx.try_recv().unwrap(), msg_2);
581 1 : assert_eq!(
582 1 : subscriber_all.sub_rx.try_recv().unwrap_err(),
583 1 : TryRecvError::Empty
584 1 : );
585 : }
586 : }
|